
Commit e4b1032

teburd authored and stephanosio committed
rtio: Use mpsc for submission and completion queue
Rather than the rings, which weren't shared between userspace and kernel space in Zephyr the way they are in Linux with io_uring, use atomic mpsc queues for the submission and completion queues. Most importantly, this removes a potential head-of-line blocker in the submission queue, since an sqe would otherwise be held until its task completed.

As additional bonuses, this avoids some extra locks and restrictions on what can be submitted and where. It also removes the need for two executors, as all chains/transactions are now executed concurrently. Lastly, it opens up the possibility of a common pool of sqe's to allocate from, potentially saving a lot of memory.

Signed-off-by: Tom Burdick <[email protected]>
1 parent 92f0b54 commit e4b1032
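
For reviewers unfamiliar with the resulting API shape, here is a minimal sketch of the pattern the drivers in this commit migrate to: an RTIO context defined without an executor, sqe's acquired from the context, and cqe's consumed and released individually. The names my_rtio, my_iodev, do_transfer and the queue depths are illustrative only, and rtio_sqe_prep_write() stands in for whatever request preparation a real caller needs; the call sequence mirrors the spi_sam changes further down.

```c
#include <zephyr/rtio/rtio.h>

/* RTIO_DEFINE no longer takes an executor, only submission and completion
 * queue depths (compare SPI_SAM_RTIO_DEFINE in drivers/spi/spi_sam.c below).
 */
RTIO_DEFINE(my_rtio, 8, 8);

static int do_transfer(struct rtio_iodev *my_iodev)
{
	static uint8_t tx_data[] = {0xde, 0xad, 0xbe, 0xef};
	int err = 0;

	/* Acquire an sqe; with the mpsc queues this no longer pins a slot in a
	 * shared ring that could head-of-line block later submissions.
	 */
	struct rtio_sqe *sqe = rtio_sqe_acquire(&my_rtio);

	if (sqe == NULL) {
		return -ENOMEM;
	}

	rtio_sqe_prep_write(sqe, my_iodev, RTIO_PRIO_NORM, tx_data, sizeof(tx_data), NULL);

	/* Submit and wait for one completion */
	rtio_submit(&my_rtio, 1);

	/* Consume the completion and hand the cqe back to its pool */
	struct rtio_cqe *cqe = rtio_cqe_consume(&my_rtio);

	err = cqe->result;
	rtio_cqe_release(&my_rtio, cqe);

	return err;
}
```

Note that rtio_cqe_release() now takes the cqe being returned, since completions are pool-backed rather than slots in a shared ring.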

File tree

20 files changed: +545 -1123 lines


drivers/spi/spi_sam.c

+37-31
@@ -21,7 +21,6 @@ LOG_MODULE_REGISTER(spi_sam);
 #include <zephyr/drivers/pinctrl.h>
 #include <zephyr/drivers/clock_control/atmel_sam_pmc.h>
 #include <zephyr/rtio/rtio.h>
-#include <zephyr/rtio/rtio_executor_simple.h>
 #include <zephyr/sys/__assert.h>
 #include <zephyr/sys/util.h>
 #include <soc.h>
@@ -55,8 +54,8 @@ struct spi_sam_data {
 #ifdef CONFIG_SPI_RTIO
 	struct rtio *r; /* context for thread calls */
 	struct rtio_iodev iodev;
-	struct rtio_iodev_sqe *iodev_sqe;
-	struct rtio_sqe *sqe;
+	struct rtio_iodev_sqe *txn_head;
+	struct rtio_iodev_sqe *txn_curr;
 	struct spi_dt_spec dt_spec;
 #endif
 
@@ -304,7 +303,7 @@ static void dma_callback(const struct device *dma_dev, void *user_data,
 	struct spi_sam_data *drv_data = dev->data;
 
 #ifdef CONFIG_SPI_RTIO
-	if (drv_data->iodev_sqe != NULL) {
+	if (drv_data->txn_head != NULL) {
 		spi_sam_iodev_complete(dev, status);
 		return;
 	}
@@ -323,7 +322,7 @@ static int spi_sam_dma_txrx(const struct device *dev,
 	const struct spi_sam_config *drv_cfg = dev->config;
 	struct spi_sam_data *drv_data = dev->data;
 #ifdef CONFIG_SPI_RTIO
-	bool blocking = drv_data->iodev_sqe == NULL;
+	bool blocking = drv_data->txn_head == NULL;
 #else
 	bool blocking = true;
 #endif
@@ -648,12 +647,13 @@ static bool spi_sam_is_regular(const struct spi_buf_set *tx_bufs,
 #else
 
 static void spi_sam_iodev_complete(const struct device *dev, int status);
+static void spi_sam_iodev_next(const struct device *dev, bool completion);
 
 static void spi_sam_iodev_start(const struct device *dev)
 {
 	const struct spi_sam_config *cfg = dev->config;
 	struct spi_sam_data *data = dev->data;
-	struct rtio_sqe *sqe = data->sqe;
+	struct rtio_sqe *sqe = &data->txn_curr->sqe;
 	int ret = 0;
 
 	switch (sqe->op) {
@@ -671,9 +671,10 @@ static void spi_sam_iodev_start(const struct device *dev)
 		break;
 	default:
 		LOG_ERR("Invalid op code %d for submission %p\n", sqe->op, (void *)sqe);
-		rtio_iodev_sqe_err(data->iodev_sqe, -EINVAL);
-		data->iodev_sqe = NULL;
-		data->sqe = NULL;
+		struct rtio_iodev_sqe *txn_head = data->txn_head;
+
+		spi_sam_iodev_next(dev, true);
+		rtio_iodev_sqe_err(txn_head, -EINVAL);
 		ret = 0;
 	}
 	if (ret == 0) {
@@ -687,7 +688,7 @@ static void spi_sam_iodev_next(const struct device *dev, bool completion)
 
 	k_spinlock_key_t key = spi_spin_lock(dev);
 
-	if (!completion && data->iodev_sqe != NULL) {
+	if (!completion && data->txn_curr != NULL) {
 		spi_spin_unlock(dev, key);
 		return;
 	}
@@ -697,17 +698,17 @@ static void spi_sam_iodev_next(const struct device *dev, bool completion)
 	if (next != NULL) {
 		struct rtio_iodev_sqe *next_sqe = CONTAINER_OF(next, struct rtio_iodev_sqe, q);
 
-		data->iodev_sqe = next_sqe;
-		data->sqe = (struct rtio_sqe *)next_sqe->sqe;
+		data->txn_head = next_sqe;
+		data->txn_curr = next_sqe;
 	} else {
-		data->iodev_sqe = NULL;
-		data->sqe = NULL;
+		data->txn_head = NULL;
+		data->txn_curr = NULL;
 	}
 
 	spi_spin_unlock(dev, key);
 
-	if (data->iodev_sqe != NULL) {
-		struct spi_dt_spec *spi_dt_spec = data->sqe->iodev->data;
+	if (data->txn_curr != NULL) {
+		struct spi_dt_spec *spi_dt_spec = data->txn_curr->sqe.iodev->data;
 		struct spi_config *spi_cfg = &spi_dt_spec->config;
 
 		spi_sam_configure(dev, spi_cfg);
@@ -720,15 +721,15 @@ static void spi_sam_iodev_complete(const struct device *dev, int status)
 {
 	struct spi_sam_data *data = dev->data;
 
-	if (data->sqe->flags & RTIO_SQE_TRANSACTION) {
-		data->sqe = rtio_spsc_next(data->iodev_sqe->r->sq, data->sqe);
+	if (data->txn_curr->sqe.flags & RTIO_SQE_TRANSACTION) {
+		data->txn_curr = rtio_txn_next(data->txn_curr);
 		spi_sam_iodev_start(dev);
 	} else {
-		struct rtio_iodev_sqe *iodev_sqe = data->iodev_sqe;
+		struct rtio_iodev_sqe *txn_head = data->txn_head;
 
 		spi_context_cs_control(&data->ctx, false);
 		spi_sam_iodev_next(dev, true);
-		rtio_iodev_sqe_ok(iodev_sqe, status);
+		rtio_iodev_sqe_ok(txn_head, status);
 	}
 }
 
@@ -760,20 +761,27 @@ static int spi_sam_transceive(const struct device *dev,
 
 	dt_spec->config = *config;
 
-	sqe = spi_rtio_copy(data->r, &data->iodev, tx_bufs, rx_bufs);
-	if (sqe == NULL) {
-		err = -ENOMEM;
+	int ret = spi_rtio_copy(data->r, &data->iodev, tx_bufs, rx_bufs, &sqe);
+
+	if (ret < 0) {
+		err = ret;
 		goto done;
 	}
 
 	/* Submit request and wait */
-	rtio_submit(data->r, 1);
+	rtio_submit(data->r, ret);
 
-	cqe = rtio_cqe_consume(data->r);
+	while (ret > 0) {
+		cqe = rtio_cqe_consume(data->r);
 
-	err = cqe->result;
+		if (cqe->result < 0) {
+			err = cqe->result;
+		}
+
+		rtio_cqe_release(data->r, cqe);
 
-	rtio_cqe_release(data->r);
+		ret--;
+	}
 #else
 	const struct spi_sam_config *cfg = dev->config;
 
@@ -905,10 +913,8 @@ static const struct spi_driver_api spi_sam_driver_api = {
 		COND_CODE_1(SPI_SAM_USE_DMA(n), (SPI_DMA_INIT(n)), ()) \
 	}
 
-#define SPI_SAM_RTIO_DEFINE(n) \
-	RTIO_EXECUTOR_SIMPLE_DEFINE(spi_sam_exec_##n); \
-	RTIO_DEFINE(spi_sam_rtio_##n, (struct rtio_executor *)&spi_sam_exec_##n, \
-		    CONFIG_SPI_SAM_RTIO_SQ_SIZE, 1)
+#define SPI_SAM_RTIO_DEFINE(n) RTIO_DEFINE(spi_sam_rtio_##n, CONFIG_SPI_SAM_RTIO_SQ_SIZE, \
+					   CONFIG_SPI_SAM_RTIO_SQ_SIZE)
 
 #define SPI_SAM_DEVICE_INIT(n) \
 	PINCTRL_DT_INST_DEFINE(n); \
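
The txn_head/txn_curr pair introduced above replaces the old iodev_sqe/sqe fields: txn_curr walks a multi-entry transaction one sqe at a time via rtio_txn_next(), and only txn_head is completed (or failed) once the whole transaction has finished. A condensed, hypothetical sketch of that control flow, distilled from spi_sam_iodev_complete() above (struct my_data, my_iodev_start() and my_iodev_next() are placeholders, not part of this change):

```c
/* Called when the sqe currently on the bus has finished */
static void my_iodev_complete(struct my_data *data, int status)
{
	if (data->txn_curr->sqe.flags & RTIO_SQE_TRANSACTION) {
		/* More entries in this transaction: advance and start the next one */
		data->txn_curr = rtio_txn_next(data->txn_curr);
		my_iodev_start(data);
	} else {
		/* Transaction finished: pick up the next submission before
		 * reporting the result against the transaction's head sqe.
		 */
		struct rtio_iodev_sqe *txn_head = data->txn_head;

		my_iodev_next(data, true);
		rtio_iodev_sqe_ok(txn_head, status);
	}
}
```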

include/zephyr/drivers/spi.h

+24-14
@@ -904,7 +904,7 @@ __deprecated static inline int spi_write_async(const struct device *dev,
  */
 static inline void spi_iodev_submit(struct rtio_iodev_sqe *iodev_sqe)
 {
-	const struct spi_dt_spec *dt_spec = iodev_sqe->sqe->iodev->data;
+	const struct spi_dt_spec *dt_spec = iodev_sqe->sqe.iodev->data;
 	const struct device *dev = dt_spec->bus;
 	const struct spi_driver_api *api = (const struct spi_driver_api *)dev->api;
 
@@ -944,26 +944,31 @@ static inline bool spi_is_ready_iodev(const struct rtio_iodev *spi_iodev)
 /**
  * @brief Copy the tx_bufs and rx_bufs into a set of RTIO requests
  *
- * @param r RTIO context
- * @param tx_bufs Transmit buffer set
- * @param rx_bufs Receive buffer set
+ * @param r rtio context
+ * @param iodev iodev to transceive with
+ * @param tx_bufs transmit buffer set
+ * @param rx_bufs receive buffer set
+ * @param sqe[out] Last sqe submitted, NULL if not enough memory
  *
- * @retval sqe Last submission in the queue added
- * @retval NULL Not enough memory in the context to copy the requests
+ * @retval Number of submission queue entries
+ * @retval -ENOMEM out of memory
  */
-static inline struct rtio_sqe *spi_rtio_copy(struct rtio *r,
-					     struct rtio_iodev *iodev,
-					     const struct spi_buf_set *tx_bufs,
-					     const struct spi_buf_set *rx_bufs)
+static inline int spi_rtio_copy(struct rtio *r,
+				struct rtio_iodev *iodev,
+				const struct spi_buf_set *tx_bufs,
+				const struct spi_buf_set *rx_bufs,
+				struct rtio_sqe **last_sqe)
 {
-	struct rtio_sqe *sqe = NULL;
+	int ret = 0;
 	size_t tx_count = tx_bufs ? tx_bufs->count : 0;
 	size_t rx_count = rx_bufs ? rx_bufs->count : 0;
 
 	uint32_t tx = 0, tx_len = 0;
 	uint32_t rx = 0, rx_len = 0;
 	uint8_t *tx_buf, *rx_buf;
 
+	struct rtio_sqe *sqe = NULL;
+
 	if (tx < tx_count) {
 		tx_buf = tx_bufs->buffers[tx].buf;
 		tx_len = tx_bufs->buffers[tx].len;
@@ -985,10 +990,13 @@ static inline struct rtio_sqe *spi_rtio_copy(struct rtio *r,
 		sqe = rtio_sqe_acquire(r);
 
 		if (sqe == NULL) {
-			rtio_spsc_drop_all(r->sq);
-			return NULL;
+			ret = -ENOMEM;
+			rtio_sqe_drop_all(r);
+			goto out;
 		}
 
+		ret++;
+
 		/* If tx/rx len are same, we can do a simple transceive */
 		if (tx_len == rx_len) {
 			if (tx_buf == NULL) {
@@ -1084,9 +1092,11 @@ static inline struct rtio_sqe *spi_rtio_copy(struct rtio *r,
 
 	if (sqe != NULL) {
 		sqe->flags = 0;
+		*last_sqe = sqe;
 	}
 
-	return sqe;
+out:
+	return ret;
 }
 
 #endif /* CONFIG_SPI_RTIO */
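
To make the new contract concrete: spi_rtio_copy() now returns the number of sqe's it queued (or -ENOMEM) and hands back the last sqe through an out-parameter, so a caller submits that many entries and drains the same number of completions. A hedged caller sketch follows, assuming r, iodev and the buffer sets are already set up; the flow mirrors spi_sam_transceive() above and every name here is a placeholder.

```c
#include <zephyr/drivers/spi.h>

static int spi_rtio_transceive_sketch(struct rtio *r, struct rtio_iodev *iodev,
				      const struct spi_buf_set *tx_bufs,
				      const struct spi_buf_set *rx_bufs)
{
	struct rtio_sqe *last_sqe = NULL;
	struct rtio_cqe *cqe;
	int err = 0;

	/* Copy the buffer sets into sqes; returns how many were queued */
	int n = spi_rtio_copy(r, iodev, tx_bufs, rx_bufs, &last_sqe);

	if (n < 0) {
		return n;
	}

	/* last_sqe could be used to adjust flags on the final request;
	 * it is unused in this sketch.
	 */

	/* Submit everything and wait for the same number of completions */
	rtio_submit(r, n);

	for (int i = 0; i < n; i++) {
		cqe = rtio_cqe_consume(r);

		/* Record any error, but always release the cqe back to its pool */
		if (cqe->result < 0) {
			err = cqe->result;
		}
		rtio_cqe_release(r, cqe);
	}

	return err;
}
```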

include/zephyr/linker/common-ram.ld

+2
@@ -115,6 +115,8 @@
 #if defined(CONFIG_RTIO)
 	ITERABLE_SECTION_RAM(rtio, 4)
 	ITERABLE_SECTION_RAM(rtio_iodev, 4)
+	ITERABLE_SECTION_RAM(rtio_sqe_pool, 4)
+	ITERABLE_SECTION_RAM(rtio_cqe_pool, 4)
 #endif /* CONFIG_RTIO */
 
