diff --git a/src/urpc/include/umq/umq_types.h b/src/urpc/include/umq/umq_types.h
index 9d5ca7ddaf1baa16c88541071c18258de214dea1..d545445967d8e5e2aed52fdd95dcb645ac7e1ef6 100644
--- a/src/urpc/include/umq/umq_types.h
+++ b/src/urpc/include/umq/umq_types.h
@@ -197,6 +197,9 @@ typedef struct umq_init_cfg {
 #define UMQ_CREATE_FLAG_RX_DEPTH (1 << 2) // enable arg rx_depth when create umq
 #define UMQ_CREATE_FLAG_TX_DEPTH (1 << 3) // enable arg tx_depth when create umq
 #define UMQ_CREATE_FLAG_QUEUE_MODE (1 << 4) // enable arg mode when create umq
+#define UMQ_CREATE_FLAG_SHARE_RQ (1 << 5) // enable arg share_rq_umqh when create umq
+#define UMQ_CREATE_FLAG_UMQ_CTX (1 << 6) // enable arg umq_ctx when create umq
+#define UMQ_CREATE_FLAG_SUB_UMQ (1 << 7) // indicates that the umq is a sub queue
 
 typedef struct umq_create_option {
     /*************Required paramenters start*****************/
@@ -211,6 +214,8 @@ typedef struct umq_create_option {
     uint32_t tx_buf_size;
     uint32_t rx_depth;
     uint32_t tx_depth;
+    uint64_t share_rq_umqh;
+    uint64_t umq_ctx;
     umq_queue_mode_t mode; // mode of queue, QUEUE_MODE_POLLING for default
     /*************Optional paramenters end*******************/
 
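The three new flags are designed to be used together when carving a sub queue out of an existing queue's receive side: UMQ_CREATE_FLAG_SHARE_RQ enables the share_rq_umqh argument, UMQ_CREATE_FLAG_UMQ_CTX attaches a caller-supplied context to the new queue's jetty, and UMQ_CREATE_FLAG_SUB_UMQ merely marks the queue as a sub queue. A minimal usage sketch, assuming a public umq_create() entry point that consumes umq_create_option_t (that entry point, and the my_server pointer, are not part of this diff):

// Sketch: creating a sub queue that shares the primary queue's receive side.
umq_create_option_t opt = {0};
opt.create_flag = UMQ_CREATE_FLAG_RX_DEPTH | UMQ_CREATE_FLAG_QUEUE_MODE;
opt.rx_depth = 256;
opt.mode = QUEUE_MODE_POLLING;
uint64_t primary = umq_create(&opt);          // owns its jfr_ctx, ref_cnt == 1

opt.create_flag |= UMQ_CREATE_FLAG_SHARE_RQ | UMQ_CREATE_FLAG_UMQ_CTX | UMQ_CREATE_FLAG_SUB_UMQ;
opt.share_rq_umqh = primary;                  // rx params must match the primary's
opt.umq_ctx = (uint64_t)(uintptr_t)my_server; // looked up by jetty id on completion
uint64_t sub = umq_create(&opt);              // shares primary's jfr_ctx, ref_cnt == 2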
diff --git a/src/urpc/umq/umq_ub/core/private/umq_pro_ub.c b/src/urpc/umq/umq_ub/core/private/umq_pro_ub.c
index 1e141e7e7ec6bd06856eb93feead1124cd2a1fd7..ca5be2bf76a25a754e373cfa9514016074082fdc 100644
--- a/src/urpc/umq/umq_ub/core/private/umq_pro_ub.c
+++ b/src/urpc/umq/umq_ub/core/private/umq_pro_ub.c
@@ -18,7 +18,7 @@
 
 int rx_buf_ctx_list_init(ub_queue_t *queue)
 {
-    rx_buf_ctx_list_t *rx_buf_ctx_list = &queue->rx_buf_ctx_list;
+    rx_buf_ctx_list_t *rx_buf_ctx_list = &queue->jfr_ctx->rx_buf_ctx_list;
     uint32_t num = queue->rx_depth;
 
     rx_buf_ctx_list->addr = calloc(num, sizeof(rx_buf_ctx_t));
@@ -84,7 +84,7 @@ static inline umq_buf_t *umq_get_buf_by_user_ctx(ub_queue_t *queue, uint64_t use
 {
     rx_buf_ctx_t *rx_buf_ctx = (rx_buf_ctx_t *)(uintptr_t)user_ctx;
     umq_buf_t *buf = rx_buf_ctx->buffer;
-    queue_rx_buf_ctx_put(&queue->rx_buf_ctx_list, rx_buf_ctx);
+    queue_rx_buf_ctx_put(&queue->jfr_ctx->rx_buf_ctx_list, rx_buf_ctx);
     return buf;
 }
 
@@ -342,7 +342,7 @@ static ALWAYS_INLINE void process_bad_wr(ub_queue_t *queue, urma_jfr_wr_t *bad_w
             fail = fail->qbuf_next;
         }
 
-        queue_rx_buf_ctx_put(&queue->rx_buf_ctx_list, rx_buf_ctx);
+        queue_rx_buf_ctx_put(&queue->jfr_ctx->rx_buf_ctx_list, rx_buf_ctx);
         last_fail_end = fail;
         wr = wr->next;
     }
@@ -382,7 +382,7 @@ int umq_ub_post_rx_inner_impl(ub_queue_t *queue, umq_buf_t *qbuf, umq_buf_t **ba
         uint32_t rest_size = buffer->total_data_size;
         uint32_t sge_num = 0;
 
-        rx_buf_ctx = queue_rx_buf_ctx_get(&queue->rx_buf_ctx_list);
+        rx_buf_ctx = queue_rx_buf_ctx_get(&queue->jfr_ctx->rx_buf_ctx_list);
        if (rx_buf_ctx == NULL) {
            UMQ_LIMIT_VLOG_ERR("rx buf ctx is used up\n");
            goto PUT_ALL_RX_CTX;
@@ -454,7 +454,7 @@ int umq_ub_post_rx_inner_impl(ub_queue_t *queue, umq_buf_t *qbuf, umq_buf_t **ba
 PUT_CUR_RX_CTX:
     buffer = rx_buf_ctx->buffer;
     // put rx buf ctx that was not added to recv wr
-    queue_rx_buf_ctx_put(&queue->rx_buf_ctx_list, rx_buf_ctx);
+    queue_rx_buf_ctx_put(&queue->jfr_ctx->rx_buf_ctx_list, rx_buf_ctx);
 
 PUT_ALL_RX_CTX:
     // put rx buf in recv wr
@@ -579,13 +579,13 @@ static int umq_report_incomplete_rx(ub_queue_t *queue, uint32_t max_rx_ctx, umq_
 {
     int buf_cnt = 0;
     if (!queue->tx_flush_done || queue->rx_flush_done ||
-        queue->state != QUEUE_STATE_ERR || queue->jfr->jfr_cfg.trans_mode != URMA_TM_RC) {
+        queue->state != QUEUE_STATE_ERR || queue->jfr_ctx->jfr->jfr_cfg.trans_mode != URMA_TM_RC) {
         return buf_cnt;
     }
 
     rx_buf_ctx_t *rx_buf_ctx;
     for (buf_cnt = 0; buf_cnt < (int)max_rx_ctx; buf_cnt++) {
-        rx_buf_ctx = queue_rx_buf_ctx_flush(&queue->rx_buf_ctx_list);
+        rx_buf_ctx = queue_rx_buf_ctx_flush(&queue->jfr_ctx->rx_buf_ctx_list);
         if (rx_buf_ctx == NULL) {
             break;
         }
@@ -630,7 +630,7 @@ int umq_ub_poll_rx(uint64_t umqh, umq_buf_t **buf, uint32_t buf_count)
     urma_cr_t cr[max_batch];
 
     uint64_t start_timestmap = umq_perf_get_start_timestamp_with_feature(queue->dev_ctx->feature);
-    int rx_cr_cnt = urma_poll_jfc(queue->jfr_jfc, max_batch, cr);
+    int rx_cr_cnt = urma_poll_jfc(queue->jfr_ctx->jfr_jfc, max_batch, cr);
     umq_perf_record_write_poll(UMQ_PERF_RECORD_TRANSPORT_POLL_RX, start_timestmap, queue->dev_ctx->feature, rx_cr_cnt);
     if (rx_cr_cnt < 0) {
         umq_dec_ref(queue->dev_ctx->io_lock_free, &queue->ref_cnt, 1);
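All of the hunks above are the same mechanical change: every access to the receive-buffer context pool now goes through queue->jfr_ctx, because queues that share one RQ must draw from a single pool. The consequence, sketched below with hypothetical primary and sub pointers where sub->jfr_ctx == primary->jfr_ctx, is that a context obtained through one queue may be released through any queue sharing the same jfr_ctx:

// Sketch of the invariant behind these hunks (queue_rx_buf_ctx_get/put are
// the pool accessors used throughout this file).
rx_buf_ctx_t *ctx = queue_rx_buf_ctx_get(&primary->jfr_ctx->rx_buf_ctx_list);
if (ctx != NULL) {
    queue_rx_buf_ctx_put(&sub->jfr_ctx->rx_buf_ctx_list, ctx); // same list object
}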
UMQ_VLOG_ERR("delete jfr failed\n"); + } + if (urma_delete_jfc(queue->jfr_ctx->jfr_jfc) != URMA_SUCCESS) { + UMQ_VLOG_ERR("delete jfr_jfc failed\n"); + } + if (queue->mode == UMQ_MODE_INTERRUPT) { + if (urma_delete_jfce(queue->jfr_ctx->jfr_jfce) != URMA_SUCCESS) { + UMQ_VLOG_ERR("delete jfr_jfce failed\n"); + } + } + rx_buf_ctx_list_uninit(&queue->jfr_ctx->rx_buf_ctx_list); + free(queue->jfr_ctx); + queue->jfr_ctx = NULL; +} + +int umq_ub_jfr_ctx_create(ub_queue_t *queue, umq_ub_ctx_t *dev_ctx, umq_create_option_t *option, + ub_queue_t *share_queue) +{ + if ((option->create_flag & UMQ_CREATE_FLAG_SHARE_RQ) != 0) { + queue->jfr_ctx = share_queue->jfr_ctx; + (void)__atomic_add_fetch(&queue->jfr_ctx->ref_cnt, 1, __ATOMIC_RELAXED); + return UMQ_SUCCESS; + } + bool enable_token = (dev_ctx->feature & UMQ_FEATURE_ENABLE_TOKEN_POLICY) != 0; + uint32_t jetty_token; + if (umq_ub_token_generate(enable_token, &jetty_token) != 0) { + UMQ_VLOG_ERR("generate jetty token failed\n"); + return UMQ_FAIL; + } + queue->jfr_ctx = calloc(1, sizeof(jfr_ctx_t)); + if (queue->jfr_ctx == NULL) { + UMQ_VLOG_ERR("get_jfr_ctx failed, calloc jfr_ctx failed\n"); + return -UMQ_ERR_ENOMEM; + } + // create jfce + if (queue->mode == UMQ_MODE_INTERRUPT) { + queue->jfr_ctx->jfr_jfce = urma_create_jfce(dev_ctx->urma_ctx); + if (queue->jfr_ctx->jfr_jfce == NULL) { + UMQ_VLOG_ERR("create jfr_jfce failed\n"); + goto FREE_JFR_CTX; + } + } + // create jfr_jfc + urma_jfc_cfg_t jfr_jfc_cfg = { + .depth = queue->rx_depth, + .jfce = queue->jfr_ctx->jfr_jfce + }; + queue->jfr_ctx->jfr_jfc = urma_create_jfc(dev_ctx->urma_ctx, &jfr_jfc_cfg); + if (queue->jfr_ctx->jfr_jfc == NULL) { + UMQ_VLOG_ERR("urma create jfr_jfc failed\n"); + goto DELETE_JFR_JFCE; + } + // create jfr + urma_jfr_cfg_t jfr_cfg = { + .flag.bs.token_policy = token_policy_get(enable_token), + .trans_mode = URMA_TM_RC, + .depth = queue->rx_depth, + .max_sge = queue->max_rx_sge, + .min_rnr_timer = queue->min_rnr_timer, + .jfc = queue->jfr_ctx->jfr_jfc, + .token_value = { .token = jetty_token } + }; + jfr_cfg.flag.bs.order_type = dev_ctx->order_type; + queue->jfr_ctx->jfr = urma_create_jfr(dev_ctx->urma_ctx, &jfr_cfg); + if (queue->jfr_ctx->jfr == NULL) { + UMQ_VLOG_ERR("urma create jfr failed\n"); + goto DELETE_JFR_JFC; + } + if (rx_buf_ctx_list_init(queue) != UMQ_SUCCESS) { + UMQ_VLOG_ERR("rx buf ctx list init failed\n"); + goto DELETE_JFR; + } + queue->jfr_ctx->ref_cnt = 1; + UMQ_VLOG_INFO("create jfr_ctx success, eid: " EID_FMT ", jfr_id: %u\n", + EID_ARGS(queue->jfr_ctx->jfr->jfr_id.eid), queue->jfr_ctx->jfr->jfr_id.id); + return UMQ_SUCCESS; + +DELETE_JFR: + (void)urma_delete_jfr(queue->jfr_ctx->jfr); + +DELETE_JFR_JFC: + (void)urma_delete_jfc(queue->jfr_ctx->jfr_jfc); + +DELETE_JFR_JFCE: + if (queue->mode == UMQ_MODE_INTERRUPT) { + (void)urma_delete_jfce(queue->jfr_ctx->jfr_jfce); + } + +FREE_JFR_CTX: + free(queue->jfr_ctx); + queue->jfr_ctx = NULL; + return UMQ_FAIL; +} + uint32_t token_policy_get(bool enable) { return enable ? 
@@ -758,7 +880,7 @@ void handle_async_event_jfc_err(urma_async_event_t *urma_event, umq_async_event_
 
     (void)pthread_rwlock_rdlock(&g_umq_ub_queue_ctx_list.lock);
     URPC_LIST_FOR_EACH(local, qctx_node, &g_umq_ub_queue_ctx_list.queue_list) {
-        if (local->jfs_jfc == urma_event->element.jfc || local->jfr_jfc == urma_event->element.jfc) {
+        if (local->jfs_jfc == urma_event->element.jfc || local->jfr_ctx->jfr_jfc == urma_event->element.jfc) {
             umq_event->element.umqh = local->umqh;
             break;
         }
@@ -774,7 +896,7 @@ void handle_async_event_jfr_err(urma_async_event_t *urma_event, umq_async_event_
 
     (void)pthread_rwlock_rdlock(&g_umq_ub_queue_ctx_list.lock);
     URPC_LIST_FOR_EACH(local, qctx_node, &g_umq_ub_queue_ctx_list.queue_list) {
-        if (local->jfr == urma_event->element.jfr) {
+        if (local->jfr_ctx->jfr == urma_event->element.jfr) {
             umq_event->element.umqh = local->umqh;
             break;
         }
@@ -790,7 +912,7 @@ void handle_async_event_jfr_limit(urma_async_event_t *urma_event, umq_async_even
 
     (void)pthread_rwlock_rdlock(&g_umq_ub_queue_ctx_list.lock);
     URPC_LIST_FOR_EACH(local, qctx_node, &g_umq_ub_queue_ctx_list.queue_list) {
-        if (local->jfr == urma_event->element.jfr) {
+        if (local->jfr_ctx->jfr == urma_event->element.jfr) {
             umq_event->element.umqh = local->umqh;
             break;
         }
@@ -1512,12 +1634,12 @@ static int umq_report_incomplete_and_merge_rx(
 {
     int buf_cnt = 0;
     if (!queue->tx_flush_done || queue->rx_flush_done ||
-        queue->state != QUEUE_STATE_ERR || queue->jfr->jfr_cfg.trans_mode != URMA_TM_RC) {
+        queue->state != QUEUE_STATE_ERR || queue->jfr_ctx->jfr->jfr_cfg.trans_mode != URMA_TM_RC) {
         return buf_cnt;
     }
     rx_buf_ctx_t *rx_buf_ctx;
     for (; buf_cnt < max_rx_ctx; buf_cnt++) {
-        rx_buf_ctx = queue_rx_buf_ctx_flush(&queue->rx_buf_ctx_list);
+        rx_buf_ctx = queue_rx_buf_ctx_flush(&queue->jfr_ctx->rx_buf_ctx_list);
         if (rx_buf_ctx == NULL) {
             break;
         }
@@ -1572,7 +1694,7 @@ static inline umq_buf_t *umq_get_buf_by_user_ctx(ub_queue_t *queue, uint64_t use
     rx_buf_ctx_t *rx_buf_ctx = (rx_buf_ctx_t *)(uintptr_t)user_ctx;
     umq_buf_t *buf = rx_buf_ctx->buffer;
     urpc_list_remove(&rx_buf_ctx->node);
-    urpc_list_push_back(&queue->rx_buf_ctx_list.idle_rx_buf_ctx_list, &rx_buf_ctx->node);
+    urpc_list_push_back(&queue->jfr_ctx->rx_buf_ctx_list.idle_rx_buf_ctx_list, &rx_buf_ctx->node);
     return buf;
 }
 
@@ -1585,7 +1707,7 @@ int umq_ub_dequeue_with_poll_rx(ub_queue_t *queue, urma_cr_t *cr, umq_buf_t **bu
         return umq_report_incomplete_and_merge_rx(queue, UMQ_POST_POLL_BATCH, buf, &previous_last);
     }
 
-    int rx_cr_cnt = urma_poll_jfc(queue->jfr_jfc, UMQ_POST_POLL_BATCH, cr);
+    int rx_cr_cnt = urma_poll_jfc(queue->jfr_ctx->jfr_jfc, UMQ_POST_POLL_BATCH, cr);
     if (rx_cr_cnt < 0) {
         UMQ_LIMIT_VLOG_ERR("UB RX reports rx_cr_cnt[%d]\n", rx_cr_cnt);
         return rx_cr_cnt;
@@ -1623,7 +1745,7 @@ int umq_ub_dequeue_plus_with_poll_rx(uint64_t umqh_tp, urma_cr_t *cr, umq_buf_t 
     }
 
     int qbuf_cnt = 0;
-    int rx_cr_cnt = urma_poll_jfc(queue->jfr_jfc, UMQ_POST_POLL_BATCH, cr);
+    int rx_cr_cnt = urma_poll_jfc(queue->jfr_ctx->jfr_jfc, UMQ_POST_POLL_BATCH, cr);
     if (rx_cr_cnt < 0) {
         UMQ_LIMIT_VLOG_ERR("UB RX reports rx_cr_cnt[%d]\n", rx_cr_cnt);
         return rx_cr_cnt;
diff --git a/src/urpc/umq/umq_ub/core/private/umq_ub_private.h b/src/urpc/umq/umq_ub/core/private/umq_ub_private.h
index 72cb51c1ed3063dffa717e3b30024c09b092c358..c4f757b4d544181d3f3044b6ed78992f70374c88 100644
--- a/src/urpc/umq/umq_ub/core/private/umq_ub_private.h
+++ b/src/urpc/umq/umq_ub/core/private/umq_ub_private.h
@@ -163,6 +163,7 @@ typedef struct umq_ub_ctx {
     urma_target_jetty_t *tjetty;
     umq_trans_info_t trans_info;
     uint64_t remote_notify_addr;
+    uint64_t *umq_ctx_jetty_table;
 } umq_ub_ctx_t;
 
 typedef struct rx_buf_ctx {
@@ -206,20 +207,27 @@ typedef struct ub_bind_ctx {
     uint64_t remote_notify_addr;
 } ub_bind_ctx_t;
 
+typedef struct jfr_ctx {
+    urma_jfr_t *jfr;
+    urma_jfc_t *jfr_jfc;
+    urma_jfce_t *jfr_jfce;
+    volatile uint32_t ref_cnt;
+    rx_buf_ctx_list_t rx_buf_ctx_list;
+} jfr_ctx_t;
+
 typedef struct ub_queue {
     urpc_list_t qctx_node;
     // queue param
     urma_jetty_t *jetty;
+    jfr_ctx_t *jfr_ctx;
     urma_jfc_t *jfs_jfc;
-    urma_jfc_t *jfr_jfc;
-    urma_jfr_t *jfr;
     urma_jfce_t *jfs_jfce;
-    urma_jfce_t *jfr_jfce;
     umq_ub_ctx_t *dev_ctx;
     struct ub_bind_ctx *bind_ctx;
     volatile uint32_t ref_cnt;
     atomic_uint require_rx_count;
     volatile uint32_t tx_outstanding;
+    uint32_t create_flag;
     urma_target_seg_t **imported_tseg_list; // read-only
     uint64_t addr_list[UMQ_MAX_ID_NUM];
@@ -243,9 +251,9 @@ typedef struct ub_queue {
     bool rx_flush_done; // rx buf ctx all report
     umq_queue_mode_t mode; // mode of queue, QUEUE_MODE_POLLING for default
     umq_state_t state;
-    rx_buf_ctx_list_t rx_buf_ctx_list;
     umq_buf_t *notify_buf; // qbuf for manage message exchange, such as mem import/initial flow control window
     uint64_t umqh;
+    uint64_t share_rq_umqh;
 } ub_queue_t;
 
 typedef struct user_ctx {
@@ -309,6 +317,10 @@ urma_jetty_t *umq_create_jetty(ub_queue_t *queue, umq_ub_ctx_t *dev_ctx);
 int check_and_set_param(umq_ub_ctx_t *dev_ctx, umq_create_option_t *option, ub_queue_t *queue);
 int umq_ub_register_seg(umq_ub_ctx_t *ctx, uint8_t mempool_id, void *addr, uint64_t size);
 void umq_ub_unregister_seg(umq_ub_ctx_t *ctx_list, uint32_t ctx_cnt, uint8_t mempool_id);
+int share_rq_param_check(ub_queue_t *queue, ub_queue_t *share_rq);
+void umq_ub_jfr_ctx_destroy(ub_queue_t *queue);
+int umq_ub_jfr_ctx_create(ub_queue_t *queue, umq_ub_ctx_t *dev_ctx, umq_create_option_t *option,
+    ub_queue_t *share_queue);
 
 // hanele async event
 void handle_async_event_jfc_err(urma_async_event_t *urma_event, umq_async_event_t *umq_event);
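The refactored structs split receive-side state cleanly: jfr_ctx_t gathers exactly what can be shared (the JFR, its completion queue, the optional event channel, and the rx buffer context pool), while each ub_queue_t keeps its own jetty and send-side objects. For a primary/sub pair the resulting invariants look like this (a sketch with hypothetical primary and sub pointers, assert() from <assert.h>):

// Sketch of the ownership split after creation of one owner and one sharer.
assert(sub->jfr_ctx == primary->jfr_ctx);  // shared receive side
assert(sub->jetty != primary->jetty);      // per-queue jetty
assert(sub->jfs_jfc != primary->jfs_jfc);  // per-queue send completion queue
assert(sub->jfr_ctx->ref_cnt == 2);        // one owner plus one sharer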
diff --git a/src/urpc/umq/umq_ub/core/umq_ub_impl.c b/src/urpc/umq/umq_ub/core/umq_ub_impl.c
index ee0f3ac803339edd24edf08e217a348a783b1eec..3c57667a42c6eedce399c42c72e64e6ad63c6d16 100644
--- a/src/urpc/umq/umq_ub/core/umq_ub_impl.c
+++ b/src/urpc/umq/umq_ub/core/umq_ub_impl.c
@@ -428,13 +428,22 @@ uint8_t *umq_ub_ctx_init_impl(umq_init_cfg_t *cfg)
         g_ub_ctx[g_ub_ctx_count].remote_imported_info = umq_ub_ctx_imported_info_create();
         if (g_ub_ctx[g_ub_ctx_count].remote_imported_info == NULL) {
             UMQ_VLOG_ERR("imported info create failed\n");
-            goto ROLLBACL_UB_CTX;
+            goto ROLLBACK_UB_CTX;
         }
 
         if (umq_find_ub_device(info, &g_ub_ctx[g_ub_ctx_count]) != UMQ_SUCCESS) {
-            UMQ_VLOG_INFO("find ub device failed\n");
+            UMQ_VLOG_ERR("find ub device failed\n");
             umq_ub_ctx_imported_info_destroy(&g_ub_ctx[g_ub_ctx_count]);
-            goto ROLLBACL_UB_CTX;
+            goto ROLLBACK_UB_CTX;
+        }
+
+        g_ub_ctx[g_ub_ctx_count].umq_ctx_jetty_table = (uint64_t *)calloc(
+            g_ub_ctx[g_ub_ctx_count].dev_attr.dev_cap.max_jetty, sizeof(uint64_t));
+        if (g_ub_ctx[g_ub_ctx_count].umq_ctx_jetty_table == NULL) {
+            UMQ_VLOG_ERR("calloc umq_ctx_jetty_table failed\n");
+            umq_ub_delete_urma_ctx(&g_ub_ctx[g_ub_ctx_count]);
+            umq_ub_ctx_imported_info_destroy(&g_ub_ctx[g_ub_ctx_count]);
+            goto ROLLBACK_UB_CTX;
         }
 
         if (total_io_buf_size == 0) {
@@ -449,11 +458,11 @@ uint8_t *umq_ub_ctx_init_impl(umq_init_cfg_t *cfg)
         ++g_ub_ctx_count;
     }
     if (g_ub_ctx_count == 0) {
-        goto ROLLBACL_UB_CTX;
+        goto ROLLBACK_UB_CTX;
    }
 
     if (umq_io_buf_malloc(cfg->buf_mode, total_io_buf_size) == NULL) {
-        goto ROLLBACL_UB_CTX;
+        goto ROLLBACK_UB_CTX;
     }
 
     qbuf_pool_cfg_t qbuf_cfg = {
@@ -475,10 +484,12 @@ uint8_t *umq_ub_ctx_init_impl(umq_init_cfg_t *cfg)
 
 IO_BUF_FREE:
     umq_io_buf_free();
-ROLLBACL_UB_CTX:
+ROLLBACK_UB_CTX:
     for (uint32_t i = 0; i < g_ub_ctx_count; i++) {
         umq_ub_ctx_imported_info_destroy(&g_ub_ctx[i]);
         umq_ub_delete_urma_ctx(&g_ub_ctx[i]);
+        free(g_ub_ctx[i].umq_ctx_jetty_table);
+        g_ub_ctx[i].umq_ctx_jetty_table = NULL;
     }
     g_ub_ctx_count = 0;
     (void)urma_uninit();
@@ -513,6 +524,8 @@ void umq_ub_ctx_uninit_impl(uint8_t *ctx)
         umq_ub_ctx_imported_info_destroy(&context[i]);
         umq_dec_ref(context[i].io_lock_free, &context[i].ref_cnt, 1);
         urma_delete_context(context[i].urma_ctx);
+        free(context[i].umq_ctx_jetty_table);
+        context[i].umq_ctx_jetty_table = NULL;
     }
 
     umq_qbuf_pool_uninit();
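umq_ctx_jetty_table is allocated per device context with dev_cap.max_jetty slots and indexed by jetty id, which gives an O(1) translation from a jetty seen on the shared receive path back to the caller's umq_ctx. A sketch of the intended lookup; the exact completion-record field carrying the local jetty id is not part of this diff, so local_jetty_id is a stand-in:

// Sketch: demultiplexing a completion from the shared RQ back to the
// caller-supplied context that was registered at create time.
static inline uint64_t umq_ctx_lookup(umq_ub_ctx_t *dev_ctx, uint32_t local_jetty_id)
{
    return dev_ctx->umq_ctx_jetty_table[local_jetty_id]; // 0 when no ctx was registered
}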
@@ -552,23 +565,33 @@ uint64_t umq_ub_create_impl(uint64_t umqh, uint8_t *ctx, umq_create_option_t *op
         goto FREE_QUEUE;
     }
 
+    ub_queue_t *share_rq = NULL;
+    if ((option->create_flag & UMQ_CREATE_FLAG_SHARE_RQ) != 0) {
+        if (option->share_rq_umqh == UMQ_INVALID_HANDLE) {
+            UMQ_VLOG_ERR("the share_rq_umqh is invalid\n");
+            goto FREE_QUEUE;
+        }
+        umq_t *umq = (umq_t *)(uintptr_t)option->share_rq_umqh;
+        share_rq = (ub_queue_t *)(uintptr_t)umq->umqh_tp;
+        if (share_rq_param_check(queue, share_rq) != UMQ_SUCCESS) {
+            goto FREE_QUEUE;
+        }
+    }
+
     if (umq_ub_flow_control_init(&queue->flow_control, queue, dev_ctx->feature, &dev_ctx->flow_control) !=
         UMQ_SUCCESS) {
         goto FREE_QUEUE;
     }
 
-    queue->jfs_jfce = NULL;
-    queue->jfr_jfce = NULL;
+    if (umq_ub_jfr_ctx_create(queue, dev_ctx, option, share_rq) != UMQ_SUCCESS) {
+        goto UNINIT_FLOW_CONTROL;
+    }
+
     if (queue->mode == UMQ_MODE_INTERRUPT) {
         queue->jfs_jfce = urma_create_jfce(dev_ctx->urma_ctx);
         if (queue->jfs_jfce == NULL) {
             UMQ_VLOG_ERR("create jfs_jfce failed\n");
-            goto UNINIT_FLOW_CONTROL;
-        }
-        queue->jfr_jfce = urma_create_jfce(dev_ctx->urma_ctx);
-        if (queue->jfr_jfce == NULL) {
-            UMQ_VLOG_ERR("create jfr_jfce failed\n");
-            goto UNINIT_FLOW_CONTROL;
+            goto DESTROY_JFR_CTX;
         }
     }
 
@@ -582,47 +605,18 @@ uint64_t umq_ub_create_impl(uint64_t umqh, uint8_t *ctx, umq_create_option_t *op
         goto DELETE_JFCE;
     }
 
-    jfc_cfg.depth = queue->rx_depth;
-    urma_jfc_cfg_t jfr_jfc_cfg = {
-        .depth = queue->rx_depth,
-        .jfce = queue->jfr_jfce
-    };
-    queue->jfr_jfc = urma_create_jfc(dev_ctx->urma_ctx, &jfr_jfc_cfg);
-    if (queue->jfr_jfc == NULL) {
-        UMQ_VLOG_ERR("urma create jfr_jfc failed\n");
-        goto DELETE_JFS_JFC;
-    }
-
-    urma_jfr_cfg_t jfr_cfg = {
-        .flag.bs.token_policy = token_policy_get(enable_token),
-        .trans_mode = URMA_TM_RC,
-        .depth = queue->rx_depth,
-        .max_sge = queue->max_rx_sge,
-        .min_rnr_timer = queue->min_rnr_timer,
-        .jfc = queue->jfr_jfc,
-        .token_value = { .token = jetty_token }
-    };
-    jfr_cfg.flag.bs.order_type = dev_ctx->order_type;
-    queue->jfr = urma_create_jfr(dev_ctx->urma_ctx, &jfr_cfg);
-    if (queue->jfr == NULL) {
-        UMQ_VLOG_ERR("urma create jfr failed\n");
-        goto DELETE_JFR_JFC;
-    }
-
     queue->jetty = umq_create_jetty(queue, dev_ctx);
     if (queue->jetty == NULL) {
-        goto DELETE_JFR;
+        goto DELETE_JFS_JFC;
     }
-
-    if (rx_buf_ctx_list_init(queue) != UMQ_SUCCESS) {
-        UMQ_VLOG_ERR("rx buf ctx list init failed\n");
-        goto DELETE_JETTY;
+    if ((option->create_flag & UMQ_CREATE_FLAG_UMQ_CTX) != 0) {
+        dev_ctx->umq_ctx_jetty_table[queue->jetty->jetty_id.id] = option->umq_ctx;
     }
 
     queue->notify_buf = umq_buf_alloc(umq_buf_size_small(), 1, UMQ_INVALID_HANDLE, NULL);
     if (queue->notify_buf == NULL) {
         UMQ_VLOG_ERR("buf alloc failed\n");
-        goto UNINIT_RX_CTX_LIST;
+        goto DELETE_JETTY;
     }
     memset(queue->notify_buf->buf_data, 0, queue->notify_buf->data_size);
 
@@ -634,21 +628,19 @@ uint64_t umq_ub_create_impl(uint64_t umqh, uint8_t *ctx, umq_create_option_t *op
     queue->umqh = umqh;
     umq_ub_queue_ctx_list_push(&queue->qctx_node);
     return (uint64_t)(uintptr_t)queue;
-UNINIT_RX_CTX_LIST:
-    (void)rx_buf_ctx_list_uninit(&queue->rx_buf_ctx_list);
 DELETE_JETTY:
+    if ((option->create_flag & UMQ_CREATE_FLAG_UMQ_CTX) != 0) {
+        dev_ctx->umq_ctx_jetty_table[queue->jetty->jetty_id.id] = 0;
+    }
     (void)urma_delete_jetty(queue->jetty);
-DELETE_JFR:
-    (void)urma_delete_jfr(queue->jfr);
-DELETE_JFR_JFC:
-    (void)urma_delete_jfc(queue->jfr_jfc);
 DELETE_JFS_JFC:
     (void)urma_delete_jfc(queue->jfs_jfc);
 DELETE_JFCE:
     if (queue->mode == UMQ_MODE_INTERRUPT) {
         (void)urma_delete_jfce(queue->jfs_jfce);
-        (void)urma_delete_jfce(queue->jfr_jfce);
     }
+DESTROY_JFR_CTX:
+    umq_ub_jfr_ctx_destroy(queue);
 UNINIT_FLOW_CONTROL:
     umq_ub_flow_control_uninit(&queue->flow_control);
 FREE_QUEUE:
@@ -677,21 +669,14 @@ int32_t umq_ub_destroy_impl(uint64_t umqh)
 
     umq_buf_free(queue->notify_buf);
     umq_ub_flow_control_uninit(&queue->flow_control);
-    rx_buf_ctx_list_uninit(&queue->rx_buf_ctx_list);
-
     UMQ_VLOG_INFO("delete jetty, eid: " EID_FMT ", jetty_id: %u\n",
         EID_ARGS(queue->jetty->jetty_id.eid), queue->jetty->jetty_id.id);
+    if ((queue->create_flag & UMQ_CREATE_FLAG_UMQ_CTX) != 0) {
+        queue->dev_ctx->umq_ctx_jetty_table[queue->jetty->jetty_id.id] = 0;
+    }
     if (urma_delete_jetty(queue->jetty) != URMA_SUCCESS) {
         UMQ_VLOG_ERR("delete jetty failed\n");
     }
-    if (queue->jfr != NULL) {
-        if (urma_delete_jfr(queue->jfr) != URMA_SUCCESS) {
-            UMQ_VLOG_ERR("delete jfr failed\n");
-        }
-    }
-    if (urma_delete_jfc(queue->jfr_jfc) != URMA_SUCCESS) {
-        UMQ_VLOG_ERR("delete jfr_jfc failed\n");
-    }
     if (urma_delete_jfc(queue->jfs_jfc) != URMA_SUCCESS) {
         UMQ_VLOG_ERR("delete jfs_jfc failed\n");
     }
@@ -699,10 +684,8 @@ int32_t umq_ub_destroy_impl(uint64_t umqh)
         if (urma_delete_jfce(queue->jfs_jfce) != URMA_SUCCESS) {
             UMQ_VLOG_ERR("delete jfs_jfce failed\n");
         }
-        if (urma_delete_jfce(queue->jfr_jfce) != URMA_SUCCESS) {
-            UMQ_VLOG_ERR("delete jfr_jfce failed\n");
-        }
     }
+    umq_ub_jfr_ctx_destroy(queue);
     umq_ub_queue_ctx_list_remove(&queue->qctx_node);
     umq_dec_ref(queue->dev_ctx->io_lock_free, &queue->dev_ctx->ref_cnt, 1);
     free(queue);
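share_rq_param_check() pins down what "compatible" means for a shared RQ: the same device context, rx_depth, rx_buf_size, and queue mode, checked before any urma object is created. A sketch of a rejected creation, reusing the hypothetical umq_create() and the primary handle from the earlier sketch:

// Sketch: a sub queue whose rx_depth differs from the primary's is rejected up front.
umq_create_option_t bad_opt = {0};
bad_opt.create_flag = UMQ_CREATE_FLAG_SHARE_RQ | UMQ_CREATE_FLAG_RX_DEPTH | UMQ_CREATE_FLAG_SUB_UMQ;
bad_opt.share_rq_umqh = primary; // primary was created with rx_depth 256
bad_opt.rx_depth = 512;          // mismatch: share_rq_param_check() logs the error,
uint64_t sub = umq_create(&bad_opt); // sets errno to UMQ_ERR_EINVAL, and creation fails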
@@ -722,7 +705,7 @@ void umq_ub_ack_interrupt_impl(uint64_t umqh_tp, uint32_t nevents, umq_interrupt
         return;
     }
     if (option->direction == UMQ_IO_RX) {
-        urma_ack_jfc(&queue->jfr_jfc, &nevents, 1);
+        urma_ack_jfc(&queue->jfr_ctx->jfr_jfc, &nevents, 1);
     } else {
         urma_ack_jfc(&queue->jfs_jfc, &nevents, 1);
     }
@@ -749,7 +732,7 @@ int umq_ub_wait_interrupt_impl(uint64_t wait_umqh_tp, int time_out, umq_interrup
     urma_jfc_t *jfc;
     int cnt = 0;
     if (option->direction == UMQ_IO_RX) {
-        cnt = urma_wait_jfc(queue->jfr_jfce, 1, time_out, &jfc);
+        cnt = urma_wait_jfc(queue->jfr_ctx->jfr_jfce, 1, time_out, &jfc);
     } else {
         cnt = urma_wait_jfc(queue->jfs_jfce, 1, time_out, &jfc);
     }
@@ -773,14 +756,14 @@ int umq_ub_interrupt_fd_get_impl(uint64_t umqh_tp, umq_interrupt_option_t *optio
         return -UMQ_ERR_EINVAL;
     }
     ub_queue_t *queue = (ub_queue_t *)(uintptr_t)umqh_tp;
-    if (queue->jfs_jfce == NULL || queue->jfr_jfce == NULL) {
+    if (queue->jfs_jfce == NULL || queue->jfr_ctx->jfr_jfce == NULL) {
         UMQ_VLOG_ERR("get interrupt fd error, jfce is NULL\n");
         return -UMQ_ERR_EINVAL;
     }
     if (option->direction == UMQ_IO_TX) {
         return queue->jfs_jfce->fd;
     } else {
-        return queue->jfr_jfce->fd;
+        return queue->jfr_ctx->jfr_jfce->fd;
     }
 }
 
@@ -796,7 +779,7 @@ int umq_ub_rearm_impl(uint64_t umqh_tp, bool solicated, umq_interrupt_option_t *
         UMQ_LIMIT_VLOG_ERR("queue mode is not interrupt\n");
         return -UMQ_ERR_EINVAL;
     }
-    urma_jfc_t *jfc = option->direction == UMQ_IO_RX ? queue->jfr_jfc : queue->jfs_jfc;
+    urma_jfc_t *jfc = option->direction == UMQ_IO_RX ? queue->jfr_ctx->jfr_jfc : queue->jfs_jfc;
     urma_status_t status = urma_rearm_jfc(jfc, solicated);
     if (status != URMA_SUCCESS) {
         UMQ_VLOG_ERR("rearm jfc failed\n");
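In interrupt mode the RX direction now resolves to the shared jfr_ctx objects (jfr_jfce for waiting, jfr_jfc for ack and rearm) while TX keeps the per-queue jfs side, so one RX wakeup may carry completions for every queue sharing the RQ. A sketch of fetching both event fds for one queue; the option setup is illustrative:

// Sketch: the RX fd comes from the shared event channel, the TX fd from the
// queue's own one, so sub queues sharing an RQ also share the RX fd.
umq_interrupt_option_t rx_opt = { .direction = UMQ_IO_RX };
umq_interrupt_option_t tx_opt = { .direction = UMQ_IO_TX };
int rx_fd = umq_ub_interrupt_fd_get_impl(umqh_tp, &rx_opt); // shared jfr_jfce fd
int tx_fd = umq_ub_interrupt_fd_get_impl(umqh_tp, &tx_opt); // per-queue jfs_jfce fd
// When rx_fd becomes readable, drain via the shared jfr_ctx->jfr_jfc: the
// completions may belong to any queue attached to this RQ.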