Skip to content

RTIO: Cancel multi-shot items on error as Error Handling #93543

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 77 additions & 37 deletions subsys/rtio/rtio_executor.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,12 @@ void rtio_executor_submit(struct rtio *r)
#ifdef CONFIG_ASSERT
bool transaction = iodev_sqe->sqe.flags & RTIO_SQE_TRANSACTION;
bool chained = iodev_sqe->sqe.flags & RTIO_SQE_CHAINED;
bool multishot = iodev_sqe->sqe.flags & RTIO_SQE_MULTISHOT;

__ASSERT(transaction != chained,
"Expected chained or transaction flag, not both");
__ASSERT((transaction ^ chained ^ multishot) &&
!(transaction && chained && multishot),
"Cannot have more than one of these flags"
" enabled: transaction, chained or multishot");
Comment on lines +89 to +92
Copy link
Member Author

@ubieda ubieda Jul 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure why SonarQube says multishot is not used. Maybe it discards __ASSERT()'s regardless?

#endif
node = mpsc_pop(&iodev_sqe->r->sq);

Expand Down Expand Up @@ -122,71 +125,108 @@ void rtio_executor_submit(struct rtio *r)
/**
* @brief Handle common logic when :c:macro:`RTIO_SQE_MULTISHOT` is set
*
* @param[in] r RTIO context
* @param[in] curr Current IODev SQE that's being marked for finished.
* @param[in] is_canceled Whether or not the SQE is canceled
* @param[in] iodev_sqe IODEV SQE that's being marked as finished.
* @param[in] result The result of the latest request iteration
* @param[in] is_ok Whether or not the SQE's result was successful
*/
static inline void rtio_executor_handle_multishot(struct rtio *r, struct rtio_iodev_sqe *curr,
bool is_canceled)
static inline void rtio_executor_handle_multishot(struct rtio_iodev_sqe *iodev_sqe,
int result, bool is_ok)
{
/* Reset the mempool if needed */
if (curr->sqe.op == RTIO_OP_RX && FIELD_GET(RTIO_SQE_MEMPOOL_BUFFER, curr->sqe.flags)) {
if (is_canceled) {
/* Free the memory first since no CQE will be generated */
LOG_DBG("Releasing memory @%p size=%u", (void *)curr->sqe.rx.buf,
curr->sqe.rx.buf_len);
rtio_release_buffer(r, curr->sqe.rx.buf, curr->sqe.rx.buf_len);
}
if (!is_ok) {
		/** Error handling of multi-shot submissions requires
		 * stopping re-submission if something goes wrong.
* Let the application decide what's best for handling
* the corresponding error: whether re-submitting,
* rebooting or anything else.
*/
iodev_sqe->sqe.flags |= RTIO_SQE_CANCELED;
}

struct rtio *r = iodev_sqe->r;
const bool is_canceled = FIELD_GET(RTIO_SQE_CANCELED, iodev_sqe->sqe.flags) == 1;
const bool uses_mempool = FIELD_GET(RTIO_SQE_MEMPOOL_BUFFER, iodev_sqe->sqe.flags) == 1;
const bool requires_response = FIELD_GET(RTIO_SQE_NO_RESPONSE, iodev_sqe->sqe.flags) == 0;
uint32_t cqe_flags = rtio_cqe_compute_flags(iodev_sqe);
void *userdata = iodev_sqe->sqe.userdata;

if (requires_response) {
rtio_cqe_submit(r, result, userdata, cqe_flags);
}

if (iodev_sqe->sqe.op == RTIO_OP_RX && uses_mempool) {
/* Reset the buffer info so the next request can get a new one */
curr->sqe.rx.buf = NULL;
curr->sqe.rx.buf_len = 0;
iodev_sqe->sqe.rx.buf = NULL;
iodev_sqe->sqe.rx.buf_len = 0;
}
if (!is_canceled) {

if (is_canceled) {
LOG_DBG("Releasing memory @%p size=%u", (void *)iodev_sqe->sqe.rx.buf,
iodev_sqe->sqe.rx.buf_len);
rtio_release_buffer(r, iodev_sqe->sqe.rx.buf, iodev_sqe->sqe.rx.buf_len);
rtio_sqe_pool_free(r->sqe_pool, iodev_sqe);
} else {
/* Request was not canceled, put the SQE back in the queue */
mpsc_push(&r->sq, &curr->q);
mpsc_push(&r->sq, &iodev_sqe->q);
rtio_executor_submit(r);
}
}

static inline void rtio_executor_done(struct rtio_iodev_sqe *iodev_sqe, int result, bool is_ok)
/**
* @brief Handle common logic one-shot items
*
* @param[in] iodev_sqe IODEV SQE that's being marked as finished.
* @param[in] result The result of the latest request iteration
* @param[in] is_ok Whether or not the SQE's result was successful
*/
static inline void rtio_executor_handle_oneshot(struct rtio_iodev_sqe *iodev_sqe,
int result, bool is_ok)
{
const bool is_multishot = FIELD_GET(RTIO_SQE_MULTISHOT, iodev_sqe->sqe.flags) == 1;
const bool is_canceled = FIELD_GET(RTIO_SQE_CANCELED, iodev_sqe->sqe.flags) == 1;
struct rtio_iodev_sqe *curr = iodev_sqe;
struct rtio *r = iodev_sqe->r;
struct rtio_iodev_sqe *curr = iodev_sqe, *next;
void *userdata;
uint32_t sqe_flags, cqe_flags;
uint32_t sqe_flags;

/** Single-shot items may be linked as transactions or be chained together.
* Untangle the set of SQEs and act accordingly on each one.
*/
do {
userdata = curr->sqe.userdata;
void *userdata = curr->sqe.userdata;
uint32_t cqe_flags = rtio_cqe_compute_flags(iodev_sqe);
struct rtio_iodev_sqe *next = rtio_iodev_sqe_next(curr);

sqe_flags = curr->sqe.flags;
cqe_flags = rtio_cqe_compute_flags(iodev_sqe);

next = rtio_iodev_sqe_next(curr);
if (is_multishot) {
rtio_executor_handle_multishot(r, curr, is_canceled);
}
if (!is_multishot || is_canceled) {
/* SQE is no longer needed, release it */
rtio_sqe_pool_free(r->sqe_pool, curr);
}
if (!is_canceled && FIELD_GET(RTIO_SQE_NO_RESPONSE, sqe_flags) == 0) {
/* Request was not canceled, generate a CQE */
/* Generate a result back to the client if need be.*/
rtio_cqe_submit(r, result, userdata, cqe_flags);
}

rtio_sqe_pool_free(r->sqe_pool, curr);
curr = next;

if (!is_ok) {
/* This is an error path, so cancel any chained SQEs */
result = -ECANCELED;
}
} while (sqe_flags & RTIO_SQE_TRANSACTION);
} while (FIELD_GET(RTIO_SQE_TRANSACTION, sqe_flags) == 1);

/* curr should now be the last sqe in the transaction if that is what completed */
if (sqe_flags & RTIO_SQE_CHAINED) {
if (FIELD_GET(RTIO_SQE_CHAINED, sqe_flags) == 1) {
rtio_iodev_submit(curr);
}
}

/**
 * @brief Dispatch a finished SQE to the multi-shot or one-shot completion path
 *
 * @param[in] iodev_sqe IODEV SQE that's being marked as finished.
 * @param[in] result The result of the latest request iteration
 * @param[in] is_ok Whether or not the SQE's result was successful
 */
static inline void rtio_executor_done(struct rtio_iodev_sqe *iodev_sqe, int result, bool is_ok)
{
	if (FIELD_GET(RTIO_SQE_MULTISHOT, iodev_sqe->sqe.flags) == 1) {
		rtio_executor_handle_multishot(iodev_sqe, result, is_ok);
	} else {
		rtio_executor_handle_oneshot(iodev_sqe, result, is_ok);
	}
}

/**
* @brief Callback from an iodev describing success
*/
Expand Down
18 changes: 15 additions & 3 deletions tests/subsys/rtio/rtio_api/src/rtio_iodev_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ struct rtio_iodev_test_data {

/* Lock around kicking off next timer */
struct k_spinlock lock;

/* Mocked result to receive by the IODEV */
int result;
};

static void rtio_iodev_test_next(struct rtio_iodev_test_data *data, bool completion)
Expand Down Expand Up @@ -64,6 +67,7 @@ static void rtio_iodev_test_complete(struct rtio_iodev_test_data *data, int stat
if (status < 0) {
rtio_iodev_sqe_err(data->txn_head, status);
rtio_iodev_test_next(data, true);
return;
}

data->txn_curr = rtio_txn_next(data->txn_curr);
Expand All @@ -80,7 +84,7 @@ static void rtio_iodev_await_signaled(struct rtio_iodev_sqe *iodev_sqe, void *us
{
struct rtio_iodev_test_data *data = userdata;

rtio_iodev_test_complete(data, 0);
rtio_iodev_test_complete(data, data->result);
}

static void rtio_iodev_timer_fn(struct k_timer *tm)
Expand All @@ -93,7 +97,7 @@ static void rtio_iodev_timer_fn(struct k_timer *tm)

switch (iodev_sqe->sqe.op) {
case RTIO_OP_NOP:
rtio_iodev_test_complete(data, 0);
rtio_iodev_test_complete(data, data->result);
break;
case RTIO_OP_RX:
rc = rtio_sqe_rx_buf(iodev_sqe, 16, 16, &buf, &buf_len);
Expand All @@ -103,7 +107,7 @@ static void rtio_iodev_timer_fn(struct k_timer *tm)
}
/* For reads the test device copies from the given userdata */
memcpy(buf, ((uint8_t *)iodev_sqe->sqe.userdata), 16);
rtio_iodev_test_complete(data, 0);
rtio_iodev_test_complete(data, data->result);
break;
case RTIO_OP_AWAIT:
rtio_iodev_sqe_await_signal(iodev_sqe, rtio_iodev_await_signaled, data);
Expand Down Expand Up @@ -138,6 +142,14 @@ void rtio_iodev_test_init(struct rtio_iodev *test)
data->txn_head = NULL;
data->txn_curr = NULL;
k_timer_init(&data->timer, rtio_iodev_timer_fn, NULL);
data->result = 0;
}

/* Set the mocked result the test IODEV reports on each completion. */
void rtio_iodev_test_set_result(struct rtio_iodev *test, int result)
{
	struct rtio_iodev_test_data *test_data = test->data;

	test_data->result = result;
}

#define RTIO_IODEV_TEST_DEFINE(name) \
Expand Down
40 changes: 40 additions & 0 deletions tests/subsys/rtio/rtio_api/src/test_rtio_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,46 @@ ZTEST_USER(rtio_api, test_rtio_multishot)
}
}

ZTEST(rtio_api, test_rtio_multishot_cancelled_when_fails)
{
int res;
struct rtio_sqe sqe;
struct rtio_cqe cqe;
struct rtio_sqe *handle;
struct rtio *r = &r_simple;
uint8_t *buffer = NULL;
uint32_t buffer_len = 0;

for (int i = 0 ; i < MEM_BLK_SIZE; i++) {
mempool_data[i] = i;
}

rtio_sqe_prep_read_multishot(&sqe, (struct rtio_iodev *)&iodev_test_simple, 0,
mempool_data);
res = rtio_sqe_copy_in_get_handles(r, &sqe, &handle, 1);
zassert_ok(res);

rtio_iodev_test_set_result(&iodev_test_simple, -EIO);

rtio_submit(r, 1);

/** The multi-shot SQE should fail, transmit the result and then canceled. */
zassert_equal(1, rtio_cqe_copy_out(r, &cqe, 1, K_MSEC(100)));
zassert_equal(cqe.result, -EIO, "Result should be %d but got %d", -EIO, cqe.result);

/* No more CQE's coming as it should be canceled */
zassert_equal(0, rtio_cqe_copy_out(r, &cqe, 1, K_MSEC(100)),
"Should not get more CQEs after the canceled CQE");

rtio_sqe_drop_all(r);

/* Flush any pending CQEs */
while (rtio_cqe_copy_out(r, &cqe, 1, K_MSEC(1000)) != 0) {
rtio_cqe_get_mempool_buffer(r, &cqe, &buffer, &buffer_len);
rtio_release_buffer(r, buffer, buffer_len);
}
}

RTIO_DEFINE(r_transaction, SQE_POOL_SIZE, CQE_POOL_SIZE);

RTIO_IODEV_TEST_DEFINE(iodev_test_transaction0);
Expand Down
Loading