diff --git a/src/urpc/examples/umq/umq_example_base.c b/src/urpc/examples/umq/umq_example_base.c index 80815d795a41332d36a5dfdfdd35b8caab91c324..abbf330fce7d194e58bc7291e52ef2d089eae62a 100644 --- a/src/urpc/examples/umq/umq_example_base.c +++ b/src/urpc/examples/umq/umq_example_base.c @@ -123,7 +123,10 @@ int run_umq_example_client(struct urpc_example_config *cfg) goto UNBIND; } - if (example_dequeue_data(umqh, EXAMPLE_SERVER_ENQUEUE_DATA, strlen(EXAMPLE_CLIENT_ENQUEUE_DATA)) != 0) { + // Mimic there is a time consuming job. + usleep(cfg->sleep_ms * 1000); + + if (example_dequeue_data(umqh, EXAMPLE_SERVER_ENQUEUE_DATA, strlen(EXAMPLE_SERVER_ENQUEUE_DATA)) != 0) { goto UNBIND; } ret = 0; diff --git a/src/urpc/examples/umq/umq_example_common.c b/src/urpc/examples/umq/umq_example_common.c index 65ea3fb86250567f75d9580c9da40ac7aebc7832..f0858aa0bced57475269a0deb2bdb3accf8d3efa 100644 --- a/src/urpc/examples/umq/umq_example_common.c +++ b/src/urpc/examples/umq/umq_example_common.c @@ -10,6 +10,7 @@ #include #include +#include #include #include #include @@ -56,6 +57,8 @@ static struct option g_long_options[] = { {"client", no_argument, NULL, 'l'}, {"cna", required_argument, NULL, 'C'}, {"deid", required_argument, NULL, 'D'}, + {"timeout", required_argument, NULL, 'x'}, + {"sleep-ms", required_argument, NULL, 'z'}, {NULL, 0, NULL, 0 } }; @@ -82,12 +85,13 @@ uint64_t init_and_create_umq(struct urpc_example_config *cfg, uint8_t *local_bin umq_create_option_t option = { .trans_mode = init_cfg->trans_info[0].trans_mode, .create_flag = UMQ_CREATE_FLAG_RX_BUF_SIZE | UMQ_CREATE_FLAG_TX_BUF_SIZE | UMQ_CREATE_FLAG_RX_DEPTH | - UMQ_CREATE_FLAG_TX_DEPTH | UMQ_CREATE_FLAG_QUEUE_MODE, + UMQ_CREATE_FLAG_TX_DEPTH | UMQ_CREATE_FLAG_QUEUE_MODE | UMQ_CREATE_FLAG_RNDV_TIMEOUT, .rx_buf_size = EXAMPLE_BUFFER_SIZE, .tx_buf_size = EXAMPLE_BUFFER_SIZE, .rx_depth = EXAMPLE_DEPTH, .tx_depth = EXAMPLE_DEPTH, - .mode = (umq_queue_mode_t)cfg->poll_mode + .rndv_timeout = cfg->timeout, + .mode = 
(umq_queue_mode_t)cfg->poll_mode, }; if (cfg->instance_mode == SERVER) { if (sprintf(option.name, "%s", "server") <= 0) { @@ -474,10 +478,15 @@ int example_enqueue_data(uint64_t umqh, const char *data, uint32_t data_size) return -1; } + umq_buf_t *it = buf; + for (uint32_t offset = 0; offset < data_size; ) { + (void)memcpy(it->buf_data, data + offset, it->data_size); + + offset += it->data_size; + it = it->qbuf_next; + } buf->io_direction = UMQ_IO_TX; - (void)memcpy(buf->buf_data, data, data_size); - buf->data_size = data_size; - buf->total_data_size = data_size; + umq_buf_t *bad_buf = NULL; if (umq_enqueue(umqh, buf, &bad_buf) != UMQ_SUCCESS) { LOG_PRINT_ERR("enqueue failed\n"); @@ -488,6 +497,28 @@ int example_enqueue_data(uint64_t umqh, const char *data, uint32_t data_size) return 0; } +static bool umq_buf_memcmp(umq_buf_t *buf, const char *data, uint32_t sz) +{ + if (buf == NULL) { + return false; + } + + if (buf->total_data_size != sz) { + return false; + } + + while (buf) { + if (memcmp(buf->buf_data, data, buf->data_size) != 0) { + return false; + } + + data += buf->data_size; + buf = buf->qbuf_next; + } + + return true; +} + int example_dequeue_data(uint64_t umqh, const char *check_data, uint32_t data_size) { umq_buf_t *buf = NULL; @@ -502,7 +533,7 @@ int example_dequeue_data(uint64_t umqh, const char *check_data, uint32_t data_si return -1; } - if (buf->buf_data == NULL || memcmp((char *)buf->buf_data, check_data, data_size) != 0) { + if (buf->buf_data == NULL || !umq_buf_memcmp(buf, check_data, data_size)) { LOG_PRINT_ERR("dequeue data[%s] doesn't match check data[%s]\n", (char *)buf->buf_data, check_data); umq_buf_free(buf); return -1; @@ -631,6 +662,12 @@ int parse_arguments(int argc, char **argv, struct urpc_example_config *cfg) case 'D': cfg->deid = (uint32_t)strtoul(optarg, NULL, 0); break; + case 'x': + cfg->timeout = (uint32_t)strtoul(optarg, NULL, 0); + break; + case 'z': + cfg->sleep_ms = (uint32_t)strtoul(optarg, NULL, 0); + break; default: 
return -1; } diff --git a/src/urpc/examples/umq/umq_example_common.h b/src/urpc/examples/umq/umq_example_common.h index 288b01e7927f77a869aab8f34aa27e2ca68f32b4..dfd245f3875cb6389e8d3485bbcaff8581bb92a7 100644 --- a/src/urpc/examples/umq/umq_example_common.h +++ b/src/urpc/examples/umq/umq_example_common.h @@ -81,6 +81,8 @@ struct urpc_example_config { uint16_t cna; enum UMQ_TRANS_MODE sub_trans_mode; uint32_t deid; + uint32_t timeout; // Timeout for PULL_MEM_FREE msg. The timer will be timed-out if sleep_ms >= timeout. + uint32_t sleep_ms; // Mimic there is a op between enqueue and dequeue in client }; struct req_cb_arg { diff --git a/src/urpc/include/umq/umq_types.h b/src/urpc/include/umq/umq_types.h index 1794ea51aa3edf24d7c81a84f00fae1eb99e2d59..afdda3dff599b8953548686a4ec6983d0ccaaf52 100644 --- a/src/urpc/include/umq/umq_types.h +++ b/src/urpc/include/umq/umq_types.h @@ -172,6 +172,7 @@ typedef struct umq_init_cfg { #define UMQ_CREATE_FLAG_RX_DEPTH (1 << 2) // enable arg rx_depth when create umq #define UMQ_CREATE_FLAG_TX_DEPTH (1 << 3) // enable arg tx_depth when create umq #define UMQ_CREATE_FLAG_QUEUE_MODE (1 << 4) // enable arg mode when create umq +#define UMQ_CREATE_FLAG_RNDV_TIMEOUT (1 << 5) // enable arg rndv_timeout when create umq typedef struct umq_create_option { /*************Required paramenters start*****************/ @@ -187,6 +188,8 @@ typedef struct umq_create_option { uint32_t rx_depth; uint32_t tx_depth; + uint32_t rndv_timeout; + umq_queue_mode_t mode; // mode of queue, QUEUE_MODE_POLLING for default /*************Optional paramenters end*******************/ } umq_create_option_t; diff --git a/src/urpc/umq/umq_api.c b/src/urpc/umq/umq_api.c index f10b143d362b10ff583ac398048819e8d9422b28..e49c1ad008b8064966829e148ce78c9a07461f6c 100644 --- a/src/urpc/umq/umq_api.c +++ b/src/urpc/umq/umq_api.c @@ -16,6 +16,7 @@ #include "umq_inner.h" #include "umq_qbuf_pool.h" #include "umq_huge_qbuf_pool.h" +#include "umq_timer.h" #include "umq_errno.h" 
#include "urpc_util.h" @@ -419,15 +420,38 @@ int umq_init(umq_init_cfg_t *cfg) goto FW_UNINIT; } + // timer pool + const uint64_t timer_pool_num_block = UMQ_TIMER_TOTAL_COUNT; + const uint64_t timer_pool_block_size = sizeof(umq_timer_t) + UMQ_TIMER_PAYLOAD_SIZE; + void *timer_pool_buf_addr = memalign(_Alignof(max_align_t), timer_pool_num_block * timer_pool_block_size); + if (timer_pool_buf_addr == NULL) { + UMQ_VLOG_ERR("memory alloc for timer pool failed\n"); + goto QBUF_POOL_UNINIT; + } + + umq_timer_pool_cfg_t timer_pool_cfg = { + .buf_addr = timer_pool_buf_addr, + .num_block = timer_pool_num_block, + .block_size = timer_pool_block_size, + }; + if (umq_timer_pool_init(&timer_pool_cfg) != UMQ_SUCCESS) { + UMQ_VLOG_ERR("timer pool init failed\n"); + free(timer_pool_buf_addr); + goto QBUF_POOL_UNINIT; + } + if (umq_dfx_init(cfg) != UMQ_SUCCESS) { UMQ_VLOG_ERR("umq dfx init failed\n"); - goto POOL_UNINIT; + goto TIMER_POOL_UNINIT; } g_umq_inited = true; return UMQ_SUCCESS; -POOL_UNINIT: +TIMER_POOL_UNINIT: + umq_timer_pool_uninit(); + +QBUF_POOL_UNINIT: umq_qbuf_pool_uninit(); FW_UNINIT: diff --git a/src/urpc/umq/umq_timer.c b/src/urpc/umq/umq_timer.c new file mode 100644 index 0000000000000000000000000000000000000000..500d95421694e5670ed8e92157e1c22d4a06608f --- /dev/null +++ b/src/urpc/umq/umq_timer.c @@ -0,0 +1,398 @@ +/** + * SPDX-License-Identifier: MIT + * Copyright (c) Huawei Technologies Co., Ltd. 2025-2025. All rights reserved. 
+ * Description: umq timer + * Create: 2025-11-03 + */ + +#include "umq_timer.h" + +#include +#include + +#include "urpc_util.h" +#include "urpc_thread_closure.h" +#include "umq_errno.h" + +static umq_timer_pool_t g_umq_timer_pool_root = { + .inited = false, + + .parent = NULL, + .mutex = PTHREAD_MUTEX_INITIALIZER, + .count = 0, + + // ops + .alloc = umq_timer_pool_alloc_op, + .alloc_batch = umq_timer_pool_alloc_batch_op, + .free = umq_timer_pool_free_op, + .free_batch = umq_timer_pool_free_batch_op, + .destroy = umq_timer_pool_destroy_op, +}; + +static __thread umq_timer_pool_t g_umq_timer_pool_thread_cache = { + .inited = false, + + .parent = &g_umq_timer_pool_root, + .mutex = PTHREAD_MUTEX_INITIALIZER, // The mutex is unused as we are thread-local + .count = 0, + + // ops + .alloc = umq_timer_pool_cache_alloc_op, + .alloc_batch = umq_timer_pool_cache_alloc_batch_op, + .free = umq_timer_pool_cache_free_op, + .free_batch = umq_timer_pool_cache_free_batch_op, + .destroy = umq_timer_pool_cache_destroy_op, +}; + +umq_timer_t *umq_timer_create(umq_timer_pool_t *pool, void (*func)(void *), void *arg, size_t sz, uint64_t timeout) +{ + if (URPC_UNLIKELY(sz > UMQ_TIMER_PAYLOAD_SIZE)) { + return NULL; + } + + umq_timer_t *timer = pool->alloc(pool); + if (URPC_UNLIKELY(timer == NULL)) { + return NULL; + } + + // Users should guarantee sz is not oversize. 
+ // See umq_timer_pool_cfg_t + (void)memcpy(timer + 1, arg, sz); + + urpc_list_init(&timer->node); + timer->pool = pool; + timer->id = (uint64_t)(uintptr_t)timer; + timer->expiration = get_timestamp_ms() + timeout; + timer->func = func; + timer->arg = (void *)(timer + 1); + return timer; +} + +umq_timer_t *umq_timer_create_with_id(umq_timer_pool_t *pool, void (*func)(void *), void *arg, size_t sz, + uint64_t timeout, uint64_t id) +{ + umq_timer_t *timer = umq_timer_create(pool, func, arg, sz, timeout); + if (URPC_UNLIKELY(timer == NULL)) { + return NULL; + } + + timer->id = id; + return timer; +} + +void umq_timer_destroy(umq_timer_t *timer) +{ + if (URPC_LIKELY(timer != NULL)) { + umq_timer_pool_t *pool = timer->pool; + pool->free(pool, timer); + } +} + +bool umq_timer_expired(const umq_timer_t *timer, uint64_t now) +{ + if (URPC_LIKELY(timer != NULL)) { + return now >= timer->expiration; + } + return false; +} + +void umq_timer_list_init(umq_timer_list_t *list) +{ + urpc_list_init(&list->head); +} + +void umq_timer_list_uninit(umq_timer_list_t *list) +{ + urpc_list_init(&list->head); +} + +umq_timer_t *umq_timer_list_top(umq_timer_list_t *list) +{ + umq_timer_t *timer; + URPC_LIST_FIRST_NODE(timer, node, &list->head); + return timer; +} + +void umq_timer_list_pop(umq_timer_list_t *list) +{ + umq_timer_t *timer; + URPC_LIST_FIRST_NODE(timer, node, &list->head); + urpc_list_remove(&timer->node); +} + +void umq_timer_list_add(umq_timer_list_t *list, umq_timer_t *timer) +{ + const uint64_t expiration = timer->expiration; + + umq_timer_t *it; + URPC_LIST_FOR_EACH(it, node, &list->head) { + if (expiration < it->expiration) { + urpc_list_insert_before(&it->node, &timer->node); + return; + } + } + + urpc_list_insert_before(&it->node, &timer->node); +} + +umq_timer_t *umq_timer_list_remove_by_id(umq_timer_list_t *list, uint64_t id) +{ + umq_timer_t *it; + umq_timer_t *next; + URPC_LIST_FOR_EACH_SAFE(it, next, node, &list->head) { + if (it->id == id) { + 
urpc_list_remove(&it->node); + return it; + } + } + return NULL; +} + +void umq_timer_list_get_expired(umq_timer_list_t *list, umq_timer_list_t *out, uint64_t now) +{ + umq_timer_t *it; + umq_timer_t *next; + URPC_LIST_FOR_EACH_SAFE(it, next, node, &list->head) { + if (!umq_timer_expired(it, now)) { + break; + } + + urpc_list_remove(&it->node); + urpc_list_push_back(&out->head, &it->node); + } +} + +int umq_timer_list_do_expire(umq_timer_list_t *list) +{ + int cnt = 0; + umq_timer_t *it; + umq_timer_t *next; + URPC_LIST_FOR_EACH_SAFE(it, next, node, &list->head) { + ++cnt; + umq_timer_invoke(it); + umq_timer_destroy(it); + } + + return cnt; +} + +int umq_timer_pool_init(const umq_timer_pool_cfg_t *config) +{ + umq_timer_pool_t *root = umq_timer_pool_root(); + + root->inited = true; + root->cfg = *config; + root->parent = NULL; + umq_timer_list_init(&root->freelist); + pthread_mutex_init(&root->mutex, NULL); + root->count = config->num_block; + + for (uint64_t i = 0; i < config->num_block; ++i) { + umq_timer_t *timer = (umq_timer_t *)((char *)config->buf_addr + i * config->block_size); + umq_timer_list_push_back(&root->freelist, timer); + } + + return UMQ_SUCCESS; +} + +int umq_timer_pool_uninit(void) +{ + umq_timer_pool_t *root = umq_timer_pool_root(); + + root->inited = false; + umq_timer_list_uninit(&root->freelist); + pthread_mutex_destroy(&root->mutex); + root->count = 0; + + free(root->cfg.buf_addr); + + return UMQ_SUCCESS; +} + +static void umq_timer_release_thread_cache(uint64_t id) +{ + umq_timer_pool_t *pool = &g_umq_timer_pool_thread_cache; + pool->destroy(pool); +} + +umq_timer_pool_t *umq_timer_pool_thread_cache() +{ + umq_timer_pool_t *pool = &g_umq_timer_pool_thread_cache; + if (URPC_UNLIKELY(!pool->inited)) { + umq_timer_list_init(&pool->freelist); + pool->count = 0; + pool->inited = true; + urpc_thread_closure_register(THREAD_CLOSURE_UMQ_TIMER_POOL, 0, umq_timer_release_thread_cache); + } + return pool; +} + +umq_timer_pool_t *umq_timer_pool_root() 
+{ + return &g_umq_timer_pool_root; +} + +umq_timer_t *umq_timer_pool_alloc_op(umq_timer_pool_t *pool) +{ + if (URPC_UNLIKELY(!pool->inited)) { + return NULL; + } + + umq_timer_t *timer = NULL; + URPC_WITH_LOCK(pool->mutex) { + if (pool->count > 0) { + timer = umq_timer_list_top(&pool->freelist); + umq_timer_list_pop(&pool->freelist); + --pool->count; + } + } + return timer; +} + +int umq_timer_pool_alloc_batch_op(umq_timer_pool_t *pool, umq_timer_list_t *list) +{ + if (URPC_UNLIKELY(!pool->inited)) { + return 0; + } + + size_t cnt = 0; + URPC_WITH_LOCK(pool->mutex) { + cnt = MIN(pool->count, UMQ_TIMER_BATCH_COUNT); + for (size_t i = 0; i < cnt; ++i) { + // Unlink the timer from the freelist BEFORE relinking it into the out + // list: pushing a still-linked node corrupts both intrusive lists and + // keeps handing out the same head node cnt times. + umq_timer_t *timer = umq_timer_list_top(&pool->freelist); + umq_timer_list_pop(&pool->freelist); + umq_timer_list_push_back(list, timer); + } + + pool->count -= cnt; + } + return (int)cnt; +} + +void umq_timer_pool_free_op(umq_timer_pool_t *pool, umq_timer_t *timer) +{ + if (URPC_UNLIKELY(!pool->inited)) { + return; + } + + URPC_WITH_LOCK(pool->mutex) { + umq_timer_list_push_back(&pool->freelist, timer); + ++pool->count; // keep count in sync so alloc_op can hand the timer out again + } +} + +void umq_timer_pool_free_batch_op(umq_timer_pool_t *pool, umq_timer_list_t *list) +{ + if (URPC_UNLIKELY(!pool->inited)) { + return; + } + + URPC_WITH_LOCK(pool->mutex) { + // Move the timers one by one so count stays accurate; a bare splice would + // leave the recycled timers invisible to alloc_op (count never grows). + umq_timer_t *it; + umq_timer_t *next; + URPC_LIST_FOR_EACH_SAFE(it, next, node, &list->head) { + urpc_list_remove(&it->node); + umq_timer_list_push_back(&pool->freelist, it); + ++pool->count; + } + } +} + +void umq_timer_pool_destroy_op(umq_timer_pool_t *pool) +{ + if (URPC_UNLIKELY(!pool->inited)) { + return; + } + + URPC_WITH_LOCK(pool->mutex) { + umq_timer_pool_t *parent = pool->parent; + if (parent == NULL) { + // The memory owner + free(pool->cfg.buf_addr); + pool->cfg.buf_addr = NULL; // avoid double free if umq_timer_pool_uninit runs later + umq_timer_list_init(&pool->freelist); + } else { + parent->free_batch(parent, &pool->freelist); + } + + pool->count = 0; + pool->inited = false; + } +} + +umq_timer_t *umq_timer_pool_cache_alloc_op(umq_timer_pool_t *pool) +{ + if (URPC_UNLIKELY(!pool->inited)) { + return NULL; + } + + // If there is no timer, beg some timers from parent. 
+ if (pool->count == 0) { + umq_timer_pool_t *parent = pool->parent; + if (URPC_UNLIKELY(parent == NULL)) { + return NULL; + } + + umq_timer_list_t list2; + umq_timer_list_init(&list2); + + int n = parent->alloc_batch(parent, &list2); + if (URPC_UNLIKELY(n == 0)) { + return NULL; + } + + urpc_list_splice(&list2.head, &pool->freelist.head); + pool->count += n; + } + + umq_timer_t *timer = umq_timer_list_top(&pool->freelist); + umq_timer_list_pop(&pool->freelist); + --pool->count; + return timer; +} + + +int umq_timer_pool_cache_alloc_batch_op(umq_timer_pool_t *pool, umq_timer_list_t *list) +{ + if (URPC_UNLIKELY(!pool->inited)) { + return 0; + } + + umq_timer_t *timer = umq_timer_pool_cache_alloc_op(pool); + if (URPC_UNLIKELY(timer == NULL)) { + return 0; + } + + umq_timer_list_push_back(list, timer); + return 1; +} + +void umq_timer_pool_cache_free_op(umq_timer_pool_t *pool, umq_timer_t *timer) +{ + if (URPC_UNLIKELY(!pool->inited)) { + return; + } + + umq_timer_list_push_back(&pool->freelist, timer); + ++pool->count; +} + +void umq_timer_pool_cache_free_batch_op(umq_timer_pool_t *pool, umq_timer_list_t *list) +{ + if (URPC_UNLIKELY(!pool->inited)) { + return; + } + + umq_timer_t *it; + umq_timer_t *next; + URPC_LIST_FOR_EACH_SAFE(it, next, node, &list->head) { + umq_timer_list_push_back(&pool->freelist, it); + ++pool->count; + } +} + +void umq_timer_pool_cache_destroy_op(umq_timer_pool_t *pool) +{ + if (URPC_UNLIKELY(!pool->inited)) { + return; + } + + umq_timer_pool_t *parent = pool->parent; + if (URPC_UNLIKELY(parent == NULL)) { + return; + } + + parent->free_batch(parent, &pool->freelist); + pool->count = 0; + pool->inited = false; +} diff --git a/src/urpc/umq/umq_timer.h b/src/urpc/umq/umq_timer.h new file mode 100644 index 0000000000000000000000000000000000000000..29e962e89cccc69ee633bd6878bd4459e02004aa --- /dev/null +++ b/src/urpc/umq/umq_timer.h @@ -0,0 +1,272 @@ +/** + * SPDX-License-Identifier: MIT + * Copyright (c) Huawei Technologies Co., Ltd. 
2025-2025. All rights reserved. + * Description: umq timer + * Create: 2025-11-03 + */ + +#ifndef UMQ_TIMER_H +#define UMQ_TIMER_H + +#include +#include +#include + +#include "urpc_list.h" +#include "urpc_util.h" + +#include + +#define UMQ_TIMER_TLS_MAX_COUNT 2048 +#define UMQ_TIMER_BATCH_COUNT 512 +#define UMQ_TIMER_PAYLOAD_SIZE 32 // The max payload size behind umq_timer +#define UMQ_TIMER_TOTAL_COUNT 1048576 // UMQ has 1M timers at all + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct umq_timer_pool_cfg umq_timer_pool_cfg_t; +typedef struct umq_timer_pool umq_timer_pool_t; + +typedef struct umq_timer { + urpc_list_t node; // The intrusive list node + umq_timer_pool_t *pool; // The timer pool + uint64_t id; // The timer id + uint64_t expiration; // The absolute expiration timestamp + void (*func)(void *); // The callback function + void *arg; // The function arg +} umq_timer_t; + +static_assert(_Alignof(umq_timer_t) == 8, "The payload can be put at the end of the timer"); +static_assert((sizeof(umq_timer_t) + UMQ_TIMER_PAYLOAD_SIZE) % 8 == 0, "The payload can be put at the end of the timer"); + +/** + * @brief Create a timer. @p arg will be deep-copied into timer. + * @param[in] pool The timer pool used to reduce malloc latency during data plane + * @param[in] func The callback + * @param[in] arg The pointer to arg + * @param[in] sz The size of arg + * @param[in] timeout The timeout in milliseconds + * @return umq_timer_t*, or NULL if malloc failed + * @note The id field is filled with the address of @c umq_timer_t itself. + * @seealso umq_timer_create_with_id + */ +umq_timer_t *umq_timer_create(umq_timer_pool_t *pool, void (*func)(void *), void *arg, size_t sz, uint64_t timeout); + +/** + * @brief Create a timer with user specified @p id. @p arg will be deep-copied into timer. 
+ * @param[in] pool The timer pool used to reduce malloc latency during data plane + * @param[in] func The callback + * @param[in] arg The pointer to arg + * @param[in] sz The size of arg + * @param[in] timeout The timeout in milliseconds + * @param[in] id The user given timer id + * @return umq_timer_t*, or NULL if malloc failed + * @note Users should guarantee the @p id is unique. + * @seealso umq_timer_create + */ +umq_timer_t *umq_timer_create_with_id(umq_timer_pool_t *pool, void (*func)(void *), void *arg, size_t sz, + uint64_t timeout, uint64_t id); + +/** + * @brief Destroy the timer which is created by umq_timer_create and umq_timer_create_with_id. + * @param[in] timer The timer will be freed + */ +void umq_timer_destroy(umq_timer_t *timer); + +/** + * @brief Check whether a timer is expired or not + * @param[in] timer The timer + * @return true if the timer is expired, otherwise false + */ +bool umq_timer_expired(const umq_timer_t *timer, uint64_t now); + +/** + * @brief Invoke the timer callback with stored arg. + * @param[in] timer The timer + */ +static inline void umq_timer_invoke(const umq_timer_t *timer) { + if (URPC_LIKELY(timer != NULL)) { + timer->func(timer->arg); + } +} + +typedef struct umq_timer_list { + urpc_list_t head; +} umq_timer_list_t; + +void umq_timer_list_init(umq_timer_list_t *list); +void umq_timer_list_uninit(umq_timer_list_t *list); + +/** + * @brief Get the earliest timer + * @param[in] list The timer list + * @return The timer, or NULL if @p list is empty + */ +umq_timer_t *umq_timer_list_top(umq_timer_list_t *list); + +/** + * @brief Drop the earliest timer + * @param[in] list The timer list + * @pre The @p list is not empty. + * @seealso umq_timer_list_add + * @seealso umq_timer_list_top + */ +void umq_timer_list_pop(umq_timer_list_t *list); + +/** + * @brief Add a new timer to list, all timers are in ascending order by the expiration field. 
+ * @param[in] list The timer list + * @param[in] timer The new timer + */ +void umq_timer_list_add(umq_timer_list_t *list, umq_timer_t *timer); + +/** + * @brief Add a new timer to the end of the list with no ordering count. + * @param[in] list The timer list + * @param[in] timer The new timer + */ +static inline void umq_timer_list_push_back(umq_timer_list_t *list, umq_timer_t *timer) +{ + urpc_list_push_back(&list->head, &timer->node); +} + +/** + * @brief Remove the timer with user given id. + * @param[in] list The timer list + * @param[in] id The user given timer id + * @return The removed timer, or NULL if not found + */ +umq_timer_t *umq_timer_list_remove_by_id(umq_timer_list_t *list, uint64_t id); + +/** + * @brief Get the expired timers into @p out. + * @param[in] list The timer list + * @param[out] out The out timer list + * @param[in] now The current timestamp in milliseconds + * @return The list of expired timers + * @pre The @p out timer list is initialized by umq_timer_list_init. + */ +void umq_timer_list_get_expired(umq_timer_list_t *list, umq_timer_list_t *out, uint64_t now); + +/** + * @brief Do expire the timers. + * @param[in] list The timer list + * @return The count of expired timers + */ +int umq_timer_list_do_expire(umq_timer_list_t *list); + +struct umq_timer_pool_cfg { + void *buf_addr; // The start address of pool + uint64_t num_block; // The number of blocks + uint64_t block_size; // The size of a block +}; + +/** + * @c umq_timer_pool can be chained if the parent is non-null. In this codebase, + * we use two levels of pool: level 1 thread-local timer pool, and level 2 the + * global timer pool. + */ +struct umq_timer_pool { + bool inited; + + umq_timer_pool_cfg_t cfg; // The configuration of timer pool. Only meaningful for memory owner + umq_timer_pool_t *parent; // The parent timer pool. 
The pool is the memory owner if it's NULL + umq_timer_list_t freelist; // The available timers in this pool + pthread_mutex_t mutex; // To protect freelist when concurrent allocation happens + size_t count; // The count of available timers + + // ops + umq_timer_t *(*alloc)(umq_timer_pool_t *); + int (*alloc_batch)(umq_timer_pool_t *, umq_timer_list_t *); + void (*free)(umq_timer_pool_t *, umq_timer_t *); + void (*free_batch)(umq_timer_pool_t *, umq_timer_list_t *); + void (*destroy)(umq_timer_pool_t *); +}; + +int umq_timer_pool_init(const umq_timer_pool_cfg_t *config); +int umq_timer_pool_uninit(void); + +// ops for non-thread-cache timer pool + +/** + * @brief Allocate one timer from head with thread-safety + * @param[in] pool The timer pool + * @return umq_timer_t*, or NULL if there is no space + */ +umq_timer_t *umq_timer_pool_alloc_op(umq_timer_pool_t *pool); + +/** + * @brief Allocate @c UMQ_TIMER_BATCH_COUNT timers from head with thread-safety + * @param[in] pool The timer pool + * @param[out] list The out list containing allocated timers + * @return The count of allocated timers. It can be less than @c UMQ_TIMER_BATCH_COUNT. 
+ */ +int umq_timer_pool_alloc_batch_op(umq_timer_pool_t *pool, umq_timer_list_t *list); + +/** + * @brief Free the @p timer to the pool with thread-safety + * @param[in] pool The timer pool + * @param[in] timer The timer will be freed + */ +void umq_timer_pool_free_op(umq_timer_pool_t *pool, umq_timer_t *timer); + +/** + * @brief Free all timers in @p list to the pool with thread-safety + * @param[in] pool The timer pool + * @param[out] list The timer list will be freed + */ +void umq_timer_pool_free_batch_op(umq_timer_pool_t *pool, umq_timer_list_t *list); + +/** + * @brief Release all timers to OS with thread-safety + * @param[in] pool The timer pool + */ +void umq_timer_pool_destroy_op(umq_timer_pool_t *pool); + +// ops for thread-cache timer pool + +/** + * @brief Allocate one timers from head without thread-safety + * @param[in] pool The timer pool + * @return umq_timer_t*, or NULL if there is no space + */ +umq_timer_t *umq_timer_pool_cache_alloc_op(umq_timer_pool_t *pool); + +/** + * @brief Allocate one timers from head without thread-safety + * @param[in] pool The timer pool + * @param[out] list The timer list contains one timer + * @note Even if it's named alloc_batch, it always allocates one timer. 
+ */ +int umq_timer_pool_cache_alloc_batch_op(umq_timer_pool_t *pool, umq_timer_list_t *list); + +/** + * @brief Free the @p timer to the pool + * @param[in] pool The timer pool + * @param[in] timer The timer will be recycled + */ +void umq_timer_pool_cache_free_op(umq_timer_pool_t *pool, umq_timer_t *timer); + +/** + * @brief Free all timers in @p list to the pool + * @param[in] pool The timer pool + * @param[in] list The timer list will be recycled + */ +void umq_timer_pool_cache_free_batch_op(umq_timer_pool_t *pool, umq_timer_list_t *list); + +/** + * @brief Free all timers to parent timer pool + * @param[in] pool + */ +void umq_timer_pool_cache_destroy_op(umq_timer_pool_t *pool); + +umq_timer_pool_t *umq_timer_pool_thread_cache(void); +umq_timer_pool_t *umq_timer_pool_root(void); + +#ifdef __cplusplus +} +#endif + +#endif // UMQ_TIMER_H diff --git a/src/urpc/umq/umq_ub/umq_ub_impl.c b/src/urpc/umq/umq_ub/umq_ub_impl.c index 959482d6e5ae7feb0486fe461c1e638ab504cb7e..c6af10006d9dc5e41bfcfdf0831581cdec42b8d4 100644 --- a/src/urpc/umq/umq_ub/umq_ub_impl.c +++ b/src/urpc/umq/umq_ub/umq_ub_impl.c @@ -10,6 +10,7 @@ #include #include #include +#include #include "perf.h" #include "urpc_util.h" @@ -23,9 +24,12 @@ #include "util_id_generator.h" #include "umq_ub_imm_data.h" #include "umq_ub_impl.h" +#include "umq_timer.h" #define UMQ_DEFAULT_BUF_SIZE 4096 #define UMQ_DEFAULT_DEPTH 1024 +#define DEFAULT_RNDV_TIMEOUT 1000 // 1000ms +#define DEFAULT_RNDV_TIMEOUT_MIN 100 // 100ms #define DEFAULT_PRIORITY 5 #define DEFAULT_RNR_RETRY 6 // Retry 6 times #define DEFAULT_ERR_TIMEOUT 2 @@ -33,7 +37,7 @@ #define UMQ_MAX_SGE_NUM 16 #define UMQ_REV_PULL_DONE 1 #define UMQ_FLUSH_MAX_RETRY_TIMES 10000 -#define UMQ_MAX_ID_NUM (1 << 16) +#define UMQ_MAX_ID_NUM ((1 << 16) - 1) // 1 .. 
65535 #define UMQ_CONTINUE_FLAG 1 #define UMQ_MAX_QBUF_NUM 1 #define UMQ_MAX_TSEG_NUM 255 @@ -143,6 +147,7 @@ typedef struct ub_queue { uint32_t tx_depth; uint32_t remote_rx_buf_size; + uint32_t rndv_timeout; // the milliseconds of waiting for REVERSE_PULL_MEM_FREE msg (UB PLUS ONLY) uint8_t priority; // priority of the queue uint8_t max_rx_sge; // max sge number of receive array uint8_t max_tx_sge; // max sge number of send array @@ -153,6 +158,10 @@ typedef struct ub_queue { bool rx_flush_done; // rx buf ctx all report umq_queue_mode_t mode; // mode of queue, QUEUE_MODE_POLLING for default umq_state_t state; + umq_timer_list_t tx_timer_list; + pthread_mutex_t tx_timer_list_mutex; + umq_timer_list_t rx_timer_list; + pthread_mutex_t rx_timer_list_mutex; rx_buf_ctx_list_t rx_buf_ctx_list; umq_buf_t *notify_buf; // qbuf for manage message exchange, such as mem import/initial flow control window uint64_t umqh; @@ -260,6 +269,32 @@ typedef struct ub_bind_ctx { uint64_t remote_notify_addr; } ub_bind_ctx_t; +/** + * Server needs a timer with umq_ub_on_pull_mem_done_timeout_ctx_t attached to + * handle cases, e.g. the client is down. + */ +typedef struct umq_ub_on_pull_mem_done_timeout_ctx { + uint16_t msg_id; // Client sends a REVERSE_PULL_MEM meta message containing this msg_id + ub_queue_t *queue; // The ub queue object + umq_buf_t *dst_buf; // The target bufs +} umq_ub_on_pull_mem_done_timeout_ctx_t; + +/** + * Client needs a timer with umq_ub_on_pull_mem_free_timeout_ctx_t attached to + * handle cases, e.g. the server is down. + */ +typedef struct umq_ub_on_pull_mem_free_timeout_ctx { + uint16_t msg_id; // To find the relevant BUFs + ub_queue_t *queue; // The ub queue object +} umq_ub_on_pull_mem_free_timeout_ctx_t; + +// If you add a new timer context, please add it below. 
+typedef union umq_ub_timer_ctx { + umq_ub_on_pull_mem_done_timeout_ctx_t on_pull_mem_done_timeout_ctx; + umq_ub_on_pull_mem_free_timeout_ctx_t on_pull_mem_free_timeout_ctx; +} umq_ub_timer_ctx_t; +static_assert(sizeof(umq_ub_timer_ctx_t) <= UMQ_TIMER_PAYLOAD_SIZE, "The timer context is too large"); + static umq_ub_ctx_t *g_ub_ctx = NULL; static uint32_t g_ub_ctx_count = 0; @@ -1630,6 +1665,10 @@ static int check_and_set_param(umq_ub_ctx_t *dev_ctx, umq_create_option_t *optio dev_ctx->dev_attr.dev_cap.max_jfr_sge : UMQ_MAX_SGE_NUM; queue->max_tx_sge = dev_ctx->dev_attr.dev_cap.max_jfs_sge < UMQ_MAX_SGE_NUM ? dev_ctx->dev_attr.dev_cap.max_jfs_sge : UMQ_MAX_SGE_NUM; + queue->rndv_timeout = + (option->create_flag & UMQ_CREATE_FLAG_RNDV_TIMEOUT) + ? MAX(option->rndv_timeout, DEFAULT_RNDV_TIMEOUT_MIN) + : DEFAULT_RNDV_TIMEOUT; queue->priority = DEFAULT_PRIORITY; queue->err_timeout = DEFAULT_ERR_TIMEOUT; queue->rnr_retry = DEFAULT_RNR_RETRY; @@ -1738,9 +1777,14 @@ uint64_t umq_ub_create_impl(uint64_t umqh, uint8_t *ctx, umq_create_option_t *op goto DELETE_JFR; } + (void)pthread_mutex_init(&queue->tx_timer_list_mutex, NULL); + (void)pthread_mutex_init(&queue->rx_timer_list_mutex, NULL); + umq_timer_list_init(&queue->tx_timer_list); + umq_timer_list_init(&queue->rx_timer_list); + if (rx_buf_ctx_list_init(queue) != UMQ_SUCCESS) { UMQ_VLOG_ERR("rx buf ctx list init failed\n"); - goto DELETE_JETTY; + goto UNINIT_TIMER_LIST; } queue->notify_buf = umq_buf_alloc(1, 1, UMQ_INVALID_HANDLE, NULL); @@ -1763,7 +1807,11 @@ uint64_t umq_ub_create_impl(uint64_t umqh, uint8_t *ctx, umq_create_option_t *op return (uint64_t)(uintptr_t)queue; UNINIT_RX_CTX_LIST: (void)rx_buf_ctx_list_uninit(&queue->rx_buf_ctx_list); -DELETE_JETTY: +UNINIT_TIMER_LIST: + (void)umq_timer_list_uninit(&queue->tx_timer_list); + (void)umq_timer_list_uninit(&queue->rx_timer_list); + (void)pthread_mutex_destroy(&queue->tx_timer_list_mutex); + (void)pthread_mutex_destroy(&queue->rx_timer_list_mutex); 
(void)urma_delete_jetty(queue->jetty); DELETE_JFR: (void)urma_delete_jfr(queue->jfr); @@ -1807,6 +1855,11 @@ int32_t umq_ub_destroy_impl(uint64_t umqh) umq_ub_flow_control_uninit(&queue->flow_control); rx_buf_ctx_list_uninit(&queue->rx_buf_ctx_list); + pthread_mutex_destroy(&queue->tx_timer_list_mutex); + pthread_mutex_destroy(&queue->rx_timer_list_mutex); + umq_timer_list_uninit(&queue->tx_timer_list); + umq_timer_list_uninit(&queue->rx_timer_list); + if (urma_delete_jetty(queue->jetty) != URMA_SUCCESS) { UMQ_VLOG_ERR("delete jetty failed\n"); } @@ -1893,6 +1946,47 @@ void ubmm_fill_big_data_ref_sge(uint64_t umqh_tp, ub_ref_sge_t *ref_sge, fill_big_data_ref_sge(queue, ref_sge, buffer, import_mempool_info, umq_imm_head); } +static void on_pull_mem_free_timeout(void *arg) +{ + umq_ub_on_pull_mem_free_timeout_ctx_t *ctx = (umq_ub_on_pull_mem_free_timeout_ctx_t *)arg; + + const uint16_t msg_id = ctx->msg_id; + if (URPC_LIKELY(msg_id != 0)) { + ub_queue_t *queue = ctx->queue; + umq_buf_t *buffer = (umq_buf_t *)queue->addr_list[msg_id]; + int32_t rest_data_size = (int32_t)buffer->total_data_size; + for (umq_buf_t *current = buffer; current && rest_data_size > 0; current = current->qbuf_next) { + rest_data_size -= (int32_t)current->data_size; + // truncate qbuf list for many packages connected, only release one + // package, can't truncate qbuf list when send, because all qbufs of 128 + // wr are connected, and the address of the first qbuf is placed in the + // user_ctx of the 128th wr, then released + if (rest_data_size <= 0) { + current->qbuf_next = NULL; + break; + } + } + umq_buf_free(buffer); + util_id_allocator_release(&g_umq_ub_id_allocator, msg_id); + } +} + +static int umq_ub_read_done(ub_queue_t *queue, uint16_t msg_id); + +static void on_pull_mem_done_timeout(void *arg) +{ + umq_ub_on_pull_mem_done_timeout_ctx_t *ctx = (umq_ub_on_pull_mem_done_timeout_ctx_t *)arg; + + const uint16_t msg_id = ctx->msg_id; + if (URPC_LIKELY(msg_id != 0)) { + ub_queue_t *queue = 
ctx->queue; + if (URPC_UNLIKELY(umq_ub_read_done(queue, msg_id) != UMQ_SUCCESS)) { + UMQ_LIMIT_VLOG_ERR("umq ub send imm failed\n"); + } + umq_buf_free(ctx->dst_buf); + } +} + static int umq_ub_send_big_data(ub_queue_t *queue, umq_buf_t **buffer) { // apply for one to avoid memory leak @@ -1909,8 +2003,21 @@ static int umq_ub_send_big_data(ub_queue_t *queue, umq_buf_t **buffer) uint16_t msg_id = util_id_allocator_get(&g_umq_ub_id_allocator); queue->addr_list[msg_id] = (uint64_t)(uintptr_t)(*buffer); + // Client: Create the timer for receiving REVERSE_PULL_MEM_FREE msg. + umq_ub_on_pull_mem_free_timeout_ctx_t ctx = { + .msg_id = msg_id, + .queue = queue, + }; + umq_timer_t *timer = + umq_timer_create_with_id(umq_timer_pool_thread_cache(), on_pull_mem_free_timeout, &ctx, sizeof(ctx), + queue->rndv_timeout, msg_id); + if (URPC_UNLIKELY(timer == NULL)) { + UMQ_LIMIT_VLOG_ERR("umq malloc for timer failed\n"); + // Roll back the msg_id reservation and free the header buf; returning + // here without cleanup leaked send_buf and exhausted the id allocator. + queue->addr_list[msg_id] = 0; + util_id_allocator_release(&g_umq_ub_id_allocator, msg_id); + umq_buf_free(send_buf); + return -UMQ_ERR_ENOMEM; + } + umq_imm_head_t *umq_imm_head = (umq_imm_head_t *)(uintptr_t)send_buf->buf_data; - ub_fill_umq_imm_head(umq_imm_head, *buffer); + ub_fill_umq_imm_head(umq_imm_head, *buffer, queue->rndv_timeout); ub_ref_sge_t *ref_sge = (ub_ref_sge_t *)(uintptr_t)(umq_imm_head + 1); ub_import_mempool_info_t import_mempool_info[UMQ_MAX_TSEG_NUM]; @@ -1962,13 +2069,20 @@ .msg_num = (uint16_t)buf_index}}; int ret = umq_ub_send_imm(queue, imm.value, &sge, user_ctx); if (ret != UMQ_SUCCESS) { + umq_timer_destroy(timer); umq_buf_free(send_buf); UMQ_LIMIT_VLOG_ERR("umq_ub_send_imm failed\n"); return ret; } + + URPC_WITH_LOCK(queue->rx_timer_list_mutex) { + umq_timer_list_add(&queue->rx_timer_list, timer); + } + return UMQ_SUCCESS; FREE_BUF: + umq_timer_destroy(timer); umq_buf_free(send_buf); return UMQ_FAIL; } @@ -2390,6 +2504,21 @@ int umq_ub_read(uint64_t umqh_tp, umq_buf_t *rx_buf, umq_ub_imm_t imm) return -UMQ_ERR_ENOMEM; } + + // Server: Create the timer for arrival of all READ CQEs + 
const uint32_t rtt_ms = DEFAULT_RNDV_TIMEOUT_MIN; + umq_ub_on_pull_mem_done_timeout_ctx_t ctx = { + .msg_id = msg_id, + .queue = queue, + .dst_buf = ((user_ctx_t *)(ctx_buf->buf_data))->dst_buf, + }; + umq_timer_t *timer = + umq_timer_create_with_id(umq_timer_pool_thread_cache(), on_pull_mem_done_timeout, &ctx, sizeof(ctx), + umq_imm_head->rndv_timeout - rtt_ms, msg_id); + if (URPC_UNLIKELY(timer == NULL)) { + UMQ_LIMIT_VLOG_ERR("create pull mem done timer failed\n"); + return -UMQ_ERR_ENOMEM; + } + urma_target_seg_t **tseg_list = queue->dev_ctx->tseg_list; user_ctx_t *user_ctx = (user_ctx_t *)ctx_buf->buf_data; umq_buf_t *dst_buf = user_ctx->dst_buf; @@ -2429,11 +2558,17 @@ int umq_ub_read(uint64_t umqh_tp, umq_buf_t *rx_buf, umq_ub_imm_t imm) } } } + + URPC_WITH_LOCK(queue->tx_timer_list_mutex) { + umq_timer_list_add(&queue->tx_timer_list, timer); + } + dst_buf->total_data_size = total_data_size; umq_inc_ref(queue->dev_ctx->io_lock_free, &queue->tx_outstanding, buf_num); return UMQ_SUCCESS; FREE_CTX_BUF: + umq_timer_destroy(timer); umq_ub_read_ctx_destory(ctx_buf); return UMQ_FAIL; } @@ -2558,27 +2693,22 @@ static int process_send_imm(umq_buf_t *rx_buf, umq_ub_imm_t imm, uint64_t umqh) umq_buf_free(rx_buf); // release rx ret = UMQ_CONTINUE_FLAG; } else if (imm.ub_plus.sub_type == IMM_TYPE_REVERSE_PULL_MEM_FREE) { - uint16_t msg_id = (uint16_t)(imm.ub_plus.msg_id); + const uint16_t msg_id = (uint16_t)(imm.ub_plus.msg_id); if (msg_id != 0) { ub_queue_t *queue = (ub_queue_t *)(uintptr_t)umqh; - umq_buf_t *buffer = (umq_buf_t *)queue->addr_list[msg_id]; - int32_t rest_data_size = (int32_t)buffer->total_data_size; - umq_buf_t *tmp_buf = buffer; - while (tmp_buf && rest_data_size > 0) { - rest_data_size -= (int32_t)tmp_buf->data_size; - /* - * truncate qbuf list for many packages connected, only release one package, - * can't truncate qbuf list when send, because all qbufs of 128 wr are connected, - * and the address of the first qbuf is placed in the user_ctx of the 
128th wr, then released - */ - if (rest_data_size <= 0) { - tmp_buf->qbuf_next = NULL; - break; - } - tmp_buf = tmp_buf->qbuf_next; + + umq_timer_t *timer = NULL; + URPC_WITH_LOCK(queue->rx_timer_list_mutex) { + timer = umq_timer_list_remove_by_id(&queue->rx_timer_list, msg_id); + } + + if (timer != NULL) { + // See on_pull_mem_free_timeout + umq_timer_invoke(timer); + umq_timer_destroy(timer); + } else { + UMQ_LIMIT_VLOG_WARN("server read done, but the timer has already been timed-out with msg_id %d\n", msg_id); } - umq_buf_free(buffer); // release tx mem - util_id_allocator_release(&g_umq_ub_id_allocator, msg_id); } umq_buf_free(rx_buf); // release rx ret = UMQ_CONTINUE_FLAG; @@ -2941,6 +3071,18 @@ static void umq_ub_enqueue_with_poll_tx(ub_queue_t *queue, umq_buf_t **buf) static void umq_ub_enqueue_plus_with_poll_tx(ub_queue_t *queue, umq_buf_t **buf) { + // If there are timers expired... + umq_timer_list_t expired; + umq_timer_list_init(&expired); + URPC_WITH_LOCK(queue->tx_timer_list_mutex) { + umq_timer_list_get_expired(&queue->tx_timer_list, &expired, get_timestamp_ms()); + } + + const int cnt = umq_timer_list_do_expire(&expired); + if (cnt > 0) { + UMQ_LIMIT_VLOG_INFO("%d timers are expired.", cnt); + } + urma_cr_t cr[UMQ_POST_POLL_BATCH]; int tx_cr_cnt = urma_poll_jfc(queue->jfs_jfc, UMQ_POST_POLL_BATCH, cr); if (tx_cr_cnt < 0) { @@ -3435,6 +3577,19 @@ static inline int process_imm_msg(uint64_t umqh_tp, umq_buf_t *buf, urma_cr_t *c static int umq_ub_dequeue_plus_with_poll_rx(uint64_t umqh_tp, urma_cr_t *cr, umq_buf_t **buf) { ub_queue_t *queue = (ub_queue_t *)(uintptr_t)umqh_tp; + + // Check for timer expiration. 
+ umq_timer_list_t expired; + umq_timer_list_init(&expired); + URPC_WITH_LOCK(queue->rx_timer_list_mutex) { + umq_timer_list_get_expired(&queue->rx_timer_list, &expired, get_timestamp_ms()); + } + + const int cnt = umq_timer_list_do_expire(&expired); + if (cnt > 0) { + UMQ_LIMIT_VLOG_INFO("%d timers are expired.\n", cnt); + } + int qbuf_cnt = 0; int rx_cr_cnt = urma_poll_jfc(queue->jfr_jfc, UMQ_POST_POLL_BATCH, cr); if (rx_cr_cnt < 0) { @@ -3479,6 +3634,18 @@ static void umq_ub_rev_pull_tx_cqe( user_ctx_t *user_ctx = (user_ctx_t *)cur_tx_buf->buf_data; user_ctx->wr_cnt++; if (user_ctx->wr_cnt == user_ctx->wr_total) { + umq_timer_t *timer = NULL; + URPC_WITH_LOCK(queue->tx_timer_list_mutex) { + timer = umq_timer_list_remove_by_id(&queue->tx_timer_list, user_ctx->msg_id); + } + + if (timer == NULL) { + UMQ_LIMIT_VLOG_WARN("The timer with msg_id %u has already been timed-out\n", user_ctx->msg_id); + return; + } + + umq_timer_destroy(timer); + if (umq_ub_read_done(queue, user_ctx->msg_id) != UMQ_SUCCESS) { UMQ_LIMIT_VLOG_ERR("umq ub send imm failed\n"); } @@ -3517,6 +3684,18 @@ static void umq_ub_non_rev_pull_tx_cqe(ub_queue_t *queue, umq_buf_t *cur_tx_buf, static int umq_ub_dequeue_plus_with_poll_tx(ub_queue_t *queue, urma_cr_t *cr, umq_buf_t **buf, int return_rx_cnt) { + // Check for timer expiration + umq_timer_list_t expired; + umq_timer_list_init(&expired); + URPC_WITH_LOCK(queue->tx_timer_list_mutex) { + umq_timer_list_get_expired(&queue->tx_timer_list, &expired, get_timestamp_ms()); + } + + const int cnt = umq_timer_list_do_expire(&expired); + if (cnt > 0) { + UMQ_LIMIT_VLOG_INFO("%d timers are expired.\n", cnt); + } + umq_buf_t *tx_buf[UMQ_POST_POLL_BATCH]; int tx_cr_cnt = urma_poll_jfc(queue->jfs_jfc, UMQ_POST_POLL_BATCH, cr); if (tx_cr_cnt < 0) { @@ -3870,4 +4049,4 @@ void umq_ub_async_event_ack(umq_async_event_t *event) urma_ack_async_event(urma_event); free(urma_event); event->priv = NULL; -} \ No newline at end of file +} diff --git 
a/src/urpc/umq/umq_ub/umq_ub_impl.h b/src/urpc/umq/umq_ub/umq_ub_impl.h index 2604968a9b8fb54f727914e3fdaadf1713f36f14..9eda4d9cfa86e725cbffaa10e52f81ea48e9c094 100644 --- a/src/urpc/umq/umq_ub/umq_ub_impl.h +++ b/src/urpc/umq/umq_ub/umq_ub_impl.h @@ -38,6 +38,7 @@ typedef struct umq_imm_head { uint32_t mem_interval : 2; uint32_t recv : 6; uint32_t mempool_num : 8; + uint32_t rndv_timeout; } umq_imm_head_t; typedef struct ub_ref_sge { @@ -120,12 +121,13 @@ static inline uint32_t get_mem_interval(uint32_t used_mem_size) return UMQ_SIZE_MID_BIG_INTERVAL; } -static inline void ub_fill_umq_imm_head(umq_imm_head_t *umq_imm_head, umq_buf_t *buffer) +static inline void ub_fill_umq_imm_head(umq_imm_head_t *umq_imm_head, umq_buf_t *buffer, uint32_t timeout) { umq_imm_head->version = UMQ_IMM_VERSION; umq_imm_head->type = IMM_PROTOCAL_TYPE_NONE; umq_imm_head->mempool_num = 0; umq_imm_head->mem_interval = get_mem_interval(buffer->data_size); + umq_imm_head->rndv_timeout = timeout; } void ubmm_fill_big_data_ref_sge(uint64_t umqh_tp, ub_ref_sge_t *ref_sge, diff --git a/src/urpc/umq/util_id_generator.c b/src/urpc/umq/util_id_generator.c index 44ee0c257b32716bea0ea6fbb379d9efd7d61527..19bdf9cdc5508ab7e0695d4cbfcaaf14a118cb10 100644 --- a/src/urpc/umq/util_id_generator.c +++ b/src/urpc/umq/util_id_generator.c @@ -12,7 +12,7 @@ int util_id_allocator_init(util_id_allocator_t *id_allocator, uint32_t max_num, uint32_t start_id) { - if (id_allocator->available_ids != NULL) { + if (id_allocator->available_ids != NULL || id_allocator->recycled_ids != NULL) { return -1; } @@ -21,8 +21,17 @@ int util_id_allocator_init(util_id_allocator_t *id_allocator, uint32_t max_num, return -1; } - id_allocator->num_available = 0; - id_allocator->next_id = start_id; + id_allocator->recycled_ids = (uint32_t *)calloc(max_num, sizeof(uint32_t)); + if (id_allocator->recycled_ids == NULL) { + return -1; + } + + for (uint32_t i = 0; i < max_num; ++i) { + id_allocator->available_ids[i] = start_id++; + } + + 
id_allocator->num_available = max_num; + id_allocator->num_recycled = 0; id_allocator->max_num = max_num; (void)pthread_mutex_init(&id_allocator->lock, NULL); @@ -40,14 +49,15 @@ uint32_t util_id_allocator_get(util_id_allocator_t *id_allocator) { uint32_t id; (void)pthread_mutex_lock(&id_allocator->lock); - if (id_allocator->num_available > 0) { - id = id_allocator->available_ids[--id_allocator->num_available]; - } else { - id = id_allocator->next_id++; - if (id >= id_allocator->max_num) { - id_allocator->next_id--; - } + if (id_allocator->num_available == 0) { + uint32_t *tmp = id_allocator->available_ids; + id_allocator->available_ids = id_allocator->recycled_ids; + id_allocator->recycled_ids = tmp; + + id_allocator->num_available = id_allocator->num_recycled; + id_allocator->num_recycled = 0; } + id = id_allocator->available_ids[--id_allocator->num_available]; (void)pthread_mutex_unlock(&id_allocator->lock); return id; @@ -55,13 +65,11 @@ uint32_t util_id_allocator_get(util_id_allocator_t *id_allocator) void util_id_allocator_release(util_id_allocator_t *id_allocator, uint32_t util_id) { - if (id_allocator->available_ids == NULL) { + if (id_allocator->recycled_ids == NULL) { return; } (void)pthread_mutex_lock(&id_allocator->lock); - if (id_allocator->num_available < id_allocator->max_num) { - id_allocator->available_ids[id_allocator->num_available++] = util_id; - } + id_allocator->recycled_ids[id_allocator->num_recycled++] = util_id; (void)pthread_mutex_unlock(&id_allocator->lock); return; } diff --git a/src/urpc/umq/util_id_generator.h b/src/urpc/umq/util_id_generator.h index c535e4c792cde04a72addd467c2b7c8206f1fd1e..85dfcffd2c1bc562b56f44c8578f6dd62251f438 100644 --- a/src/urpc/umq/util_id_generator.h +++ b/src/urpc/umq/util_id_generator.h @@ -11,7 +11,8 @@ typedef struct util_id_allocator { uint32_t *available_ids; uint32_t num_available; - uint32_t next_id; + uint32_t *recycled_ids; + uint32_t num_recycled; uint32_t max_num; pthread_mutex_t lock; } 
util_id_allocator_t; diff --git a/src/urpc/util/urpc_list.h b/src/urpc/util/urpc_list.h index f9c1552429149e1f67df40014ad7b44712306d8f..1c32e2065b3f13dbbee2ec3c56c9611b28c94a18 100644 --- a/src/urpc/util/urpc_list.h +++ b/src/urpc/util/urpc_list.h @@ -86,6 +86,32 @@ static inline void urpc_list_remove(struct urpc_list *node) node->prev = (struct urpc_list *)LIST_POSION_PREV; } +static inline void urpc_list_splice_impl(struct urpc_list *list, struct urpc_list *prev, struct urpc_list *next) +{ + struct urpc_list *first = list->next; + struct urpc_list *last = list->prev; + + first->prev = prev; + prev->next = first; + + last->next = next; + next->prev = last; +} + +static inline void urpc_list_splice(struct urpc_list *list, struct urpc_list *head) +{ + if (!urpc_list_is_empty(list)) { + urpc_list_splice_impl(list, head, head->next); + } +} + +static inline void urpc_list_splice_tail(struct urpc_list *list, struct urpc_list *head) +{ + if (!urpc_list_is_empty(list)) { + urpc_list_splice_impl(list, head->prev, head); + } +} + static inline struct urpc_list *urpc_list_pop_front(struct urpc_list *list) { struct urpc_list *node = list->next; @@ -132,4 +158,4 @@ static inline size_t urpc_list_size(const struct urpc_list *list) } #endif -#endif \ No newline at end of file +#endif diff --git a/src/urpc/util/urpc_thread_closure.h b/src/urpc/util/urpc_thread_closure.h index 3c4540f2163e4a211a574a5132c83c567f292d99..0b6c835b8e728010d3c533f01cdf4f91e6986fe7 100644 --- a/src/urpc/util/urpc_thread_closure.h +++ b/src/urpc/util/urpc_thread_closure.h @@ -18,6 +18,7 @@ typedef enum urpc_thread_closure_type { THREAD_CLOSURE_PERF, THREAD_CLOSURE_QBUF, THREAD_CLOSURE_POOL, + THREAD_CLOSURE_UMQ_TIMER_POOL, THREAD_CLOSURE_MAX, } urpc_thread_closure_type_t; diff --git a/src/urpc/util/urpc_util.h b/src/urpc/util/urpc_util.h index 3799078069ab2541f45644e6f1e0b51cc01105e9..65c4276d1a3909449764410b41860a105ed4a829 100644 --- a/src/urpc/util/urpc_util.h +++ b/src/urpc/util/urpc_util.h @@ -77,6 
+77,12 @@ extern "C" { #define URPC_RUNNING (1) #define URPC_INVALID_TASK_ID (-1) +#define URPC_WITH_LOCK(mtx) \ + for (int b = (pthread_mutex_lock(&(mtx)), 1); b; b = 0, pthread_mutex_unlock(&(mtx))) \ + +#define URPC_WITH_TRYLOCK(mtx) \ + for (int b = pthread_mutex_trylock(&(mtx)) == 0; b; b = 0, pthread_mutex_unlock(&(mtx))) \ + enum constructor_priority { CONSTRUCTOR_PRIORITY_GLOBAL = 101, CONSTRUCTOR_PRIORITY_DRIVER,