diff --git a/src/urpc/umq/umq_ub/umq_ub_impl.c b/src/urpc/umq/umq_ub/umq_ub_impl.c index 175dcfec5cb2d304167b148927e7b4b91106af50..a744f1c1b70b2640d91a36cb201207a711a29596 100644 --- a/src/urpc/umq/umq_ub/umq_ub_impl.c +++ b/src/urpc/umq/umq_ub/umq_ub_impl.c @@ -3308,6 +3308,12 @@ static int umq_ub_poll_rx(uint64_t umqh, umq_buf_t **buf, uint32_t buf_count) uint32_t max_batch = buf_count > UMQ_POST_POLL_BATCH ? UMQ_POST_POLL_BATCH : buf_count; ub_queue_t *queue = (ub_queue_t *)(uintptr_t)umqh; umq_inc_ref(queue->dev_ctx->io_lock_free, &queue->ref_cnt, 1); + int32_t qbuf_cnt = 0; + if (queue->state == QUEUE_STATE_ERR) { + qbuf_cnt = umq_report_incomplete_rx(queue, max_batch, buf); + umq_dec_ref(queue->dev_ctx->io_lock_free, &queue->ref_cnt, 1); + return qbuf_cnt; + } urma_cr_t cr[max_batch]; uint64_t start_timestmap = umq_perf_get_start_timestamp_with_feature(queue->dev_ctx->feature); @@ -3318,7 +3324,7 @@ static int umq_ub_poll_rx(uint64_t umqh, umq_buf_t **buf, uint32_t buf_count) UMQ_LIMIT_VLOG_ERR("UB RX reports rx_cr_cnt[%d]\n", rx_cr_cnt); return rx_cr_cnt; } - int32_t qbuf_cnt = 0; + int ret = 0; umq_buf_status_t qbuf_status; for (int i = 0; i < rx_cr_cnt; i++) { @@ -3344,7 +3350,6 @@ static int umq_ub_poll_rx(uint64_t umqh, umq_buf_t **buf, uint32_t buf_count) } ++qbuf_cnt; } - qbuf_cnt += umq_report_incomplete_rx(queue, max_batch - rx_cr_cnt, buf + qbuf_cnt); umq_dec_ref(queue->dev_ctx->io_lock_free, &queue->ref_cnt, 1); return qbuf_cnt; @@ -4053,13 +4058,18 @@ static int umq_report_incomplete_and_merge_rx( static int umq_ub_dequeue_with_poll_rx(ub_queue_t *queue, urma_cr_t *cr, umq_buf_t **buf) { int qbuf_cnt = 0; + // merge rx buffer + umq_buf_t *previous_last = NULL; + if (queue->state == QUEUE_STATE_ERR) { + return umq_report_incomplete_and_merge_rx(queue, UMQ_POST_POLL_BATCH, buf, &previous_last); + } + int rx_cr_cnt = urma_poll_jfc(queue->jfr_jfc, UMQ_POST_POLL_BATCH, cr); if (rx_cr_cnt < 0) { UMQ_LIMIT_VLOG_ERR("UB RX reports rx_cr_cnt[%d]\n", rx_cr_cnt); return rx_cr_cnt; } - // merge rx buffer - umq_buf_t *previous_last = NULL; + for (int i = 0; i < rx_cr_cnt; i++) { buf[i] = umq_get_buf_by_user_ctx(queue, cr[i].user_ctx); buf[i]->io_direction = UMQ_IO_RX; @@ -4077,10 +4087,8 @@ static int umq_ub_dequeue_with_poll_rx(ub_queue_t *queue, urma_cr_t *cr, umq_buf } } umq_ub_merge_rx_buffer(buf[i], &previous_last); + qbuf_cnt++; } - qbuf_cnt = rx_cr_cnt; - qbuf_cnt += - umq_report_incomplete_and_merge_rx(queue, UMQ_POST_POLL_BATCH - rx_cr_cnt, buf + qbuf_cnt, &previous_last); return qbuf_cnt; } @@ -4117,14 +4125,19 @@ static inline int process_imm_msg(uint64_t umqh_tp, umq_buf_t *buf, urma_cr_t *c static int umq_ub_dequeue_plus_with_poll_rx(uint64_t umqh_tp, urma_cr_t *cr, umq_buf_t **buf) { ub_queue_t *queue = (ub_queue_t *)(uintptr_t)umqh_tp; + // merge rx buffer + umq_buf_t *previous_last = NULL; + if (queue->state == QUEUE_STATE_ERR) { + return umq_report_incomplete_and_merge_rx(queue, UMQ_POST_POLL_BATCH, buf, &previous_last); + } + int qbuf_cnt = 0; int rx_cr_cnt = urma_poll_jfc(queue->jfr_jfc, UMQ_POST_POLL_BATCH, cr); if (rx_cr_cnt < 0) { UMQ_LIMIT_VLOG_ERR("UB RX reports rx_cr_cnt[%d]\n", rx_cr_cnt); return rx_cr_cnt; } - // merge rx buffer - umq_buf_t *previous_last = NULL; + for (int i = 0; i < rx_cr_cnt; i++) { buf[qbuf_cnt] = umq_get_buf_by_user_ctx(queue, cr[i].user_ctx); if (process_imm_msg(umqh_tp, buf[qbuf_cnt], cr + i) == UMQ_CONTINUE_FLAG) { @@ -4150,8 +4163,6 @@ static int umq_ub_dequeue_plus_with_poll_rx(uint64_t umqh_tp, urma_cr_t *cr, umq if (rx_cr_cnt != 0) { umq_ub_fill_rx_buffer(queue, rx_cr_cnt); } - qbuf_cnt += - umq_report_incomplete_and_merge_rx(queue, UMQ_POST_POLL_BATCH - rx_cr_cnt, buf + qbuf_cnt, &previous_last); return qbuf_cnt; }