diff --git a/src/urma/lib/urma/bond/bondp_api.h b/src/urma/lib/urma/bond/bondp_api.h index 8f6b22f2a87fbf8e115b4c354c5c4bde622eecba..d52beddbbb601593ea59b6fb81f8b3e51574bb54 100644 --- a/src/urma/lib/urma/bond/bondp_api.h +++ b/src/urma/lib/urma/bond/bondp_api.h @@ -75,4 +75,6 @@ void bondp_ack_async_event(urma_async_event_t *event); /* Other ops */ int bondp_user_ctl(urma_context_t *ctx, urma_user_ctl_in_t *in, urma_user_ctl_out_t *out); + +int bondp_flush_jetty(urma_jetty_t *jetty, int cr_cnt, urma_cr_t *cr); #endif // BONDP_API_H \ No newline at end of file diff --git a/src/urma/lib/urma/bond/bondp_datapath.c b/src/urma/lib/urma/bond/bondp_datapath.c index 0cfcf8112978bd71a27044997070e15b5778c589..bf3939a6f34b1d84be07cd835a12983df85e5da8 100644 --- a/src/urma/lib/urma/bond/bondp_datapath.c +++ b/src/urma/lib/urma/bond/bondp_datapath.c @@ -2228,3 +2228,79 @@ urma_status_t bondp_post_jfr_wr(urma_jfr_t *jfr, urma_jfr_wr_t *wr, urma_jfr_wr_ return bondp_post_recv_wr_and_store(bjetty_ctx, wr, bad_wr); } } + +static int bondp_flush_pjetty(bondp_context_t *bdp_ctx, bondp_comp_t *bdp_jetty, int cr_cnt, + int flush_cnt[], urma_cr_t (*bdp_cr_buf)[URMA_UBAGG_MAX_CR_CNT_PER_DEV]) +{ + int total_flush_cnt = 0; + int remaining_flush = cr_cnt; + uintptr_t last_flush_idx = (uintptr_t)bdp_jetty->comp_ctx; + /* Starting from the next idx of the last obtainable CR from pjetty, prevent pjetty from starving. */ + for (int i = 1; i <= bdp_jetty->dev_num; ++i) { + int dev_id = (last_flush_idx + i) % bdp_jetty->dev_num; + if (remaining_flush <= 0) { + break; + } + if (bdp_jetty->p_jetty[dev_id] == NULL) { + continue; + } + int current_cr_cnt = remaining_flush > URMA_UBAGG_MAX_CR_CNT_PER_DEV ? + URMA_UBAGG_MAX_CR_CNT_PER_DEV : remaining_flush; + flush_cnt[dev_id] = urma_flush_jetty(bdp_jetty->p_jetty[dev_id], current_cr_cnt, bdp_cr_buf[dev_id]); + if (flush_cnt[dev_id] < 0) { + URMA_LOG_ERR("Failed to flush pjetty[%d]: %d\n", dev_id, flush_cnt[dev_id]); + return flush_cnt[dev_id]; + } + if (flush_cnt[dev_id] == 0) { + continue; + } + total_flush_cnt += flush_cnt[dev_id]; + remaining_flush -= flush_cnt[dev_id]; + bdp_jetty->comp_ctx = (void *)(uintptr_t)dev_id; + urma_status_t ret = update_device_valid_state(bdp_ctx, dev_id, flush_cnt[dev_id], bdp_cr_buf[dev_id]); + if (ret) { + URMA_LOG_ERR("Failed to update deivce valid state: %d\n", ret); + return -ret; + } + } + return total_flush_cnt; +} + +int bondp_flush_jetty(urma_jetty_t *jetty, int cr_cnt, urma_cr_t *cr_output_array) +{ + bondp_context_t *bdp_ctx = CONTAINER_OF_FIELD(jetty->urma_ctx, bondp_context_t, v_ctx); + bondp_comp_t *bdp_jetty = CONTAINER_OF_FIELD(jetty, bondp_comp_t, v_jetty); + urma_cr_t bdp_cr_buf[URMA_UBAGG_DEV_MAX_NUM][URMA_UBAGG_MAX_CR_CNT_PER_DEV] = {0}; + int flush_cnt[URMA_UBAGG_DEV_MAX_NUM] = {0}; + + if (!is_valid_bondp_comp(bdp_jetty)) { + return -EINVAL; + } + + /* Get all CR from pjetty and check device status */ + int total_flush_cnt = bondp_flush_pjetty(bdp_ctx, bdp_jetty, cr_cnt, flush_cnt, bdp_cr_buf); + if (total_flush_cnt <= 0) { + return total_flush_cnt; + } + /* Handle each CR */ + int total_cnt = 0; + for (int dev_id = 0; dev_id < bdp_jetty->dev_num; ++dev_id) { + if (bdp_jetty->p_jetty[dev_id] == NULL) { + continue; + } + for (int cr_id = 0; cr_id < flush_cnt[dev_id]; ++cr_id) { + int ret = 0; + if (is_single_dev_mode(&bdp_ctx->v_ctx)) { + ret = bondp_handle_cr_no_store(bdp_ctx, dev_id, &bdp_cr_buf[dev_id][cr_id], + cr_output_array, &total_cnt); + } else { + ret = bondp_handle_cr_with_store(bdp_ctx, total_flush_cnt, cr_cnt, + &bdp_cr_buf[dev_id][cr_id], cr_output_array, &total_cnt); + } + if (ret < 0) { + return total_cnt; + } + } + } + return total_cnt; +} diff --git a/src/urma/lib/urma/bond/bondp_provider_ops.c b/src/urma/lib/urma/bond/bondp_provider_ops.c index bf731580359fe0316a5fc8816a68e4be7fed2fdb..e4544a975673db3978cb7c9d953a88bb54e30e25 100644 --- a/src/urma/lib/urma/bond/bondp_provider_ops.c +++ b/src/urma/lib/urma/bond/bondp_provider_ops.c @@ -62,7 +62,7 @@ static urma_ops_t g_bond_ops = { .create_jetty = bondp_create_jetty, .modify_jetty = bondp_modify_jetty, .query_jetty = NULL, - .flush_jetty = NULL, + .flush_jetty = bondp_flush_jetty, .delete_jetty = bondp_delete_jetty, .import_jetty = bondp_import_jetty, .unimport_jetty = bondp_unimport_jetty,