diff --git a/src/ubsocket/brpc/brpc_context.h b/src/ubsocket/brpc/brpc_context.h index cf65fa5be433bca36607c779fe01a708b23ab521..3e53a1b818312e9e4c62c8340931e291f74f9ca1 100644 --- a/src/ubsocket/brpc/brpc_context.h +++ b/src/ubsocket/brpc/brpc_context.h @@ -17,6 +17,7 @@ #include "brpc_iobuf_adapter.h" #include "brpc_dynsym_scanner.h" #include "statistics.h" +#include "umq_types.h" namespace Brpc { @@ -68,6 +69,11 @@ class Context : public Brpc::ConfigSettings { } } + bool IsBonding() + { + return isBonding; + } + private: void RecordAndSetBrpcAllocator() { @@ -150,7 +156,6 @@ class Context : public Brpc::ConfigSettings { } else if ((dev_info = GetDevNameStr()) != nullptr) { umq_config.trans_info[0].mem_cfg.total_size = GetIOTotalSize(); umq_config.trans_info[0].trans_mode = GetTransMode(); - umq_config.trans_info[0].dev_info.assign_mode = UMQ_DEV_ASSIGN_MODE_DEV; ret = sprintf_s(umq_config.trans_info[0].dev_info.dev.dev_name, UMQ_DEV_NAME_SIZE, "%s", dev_info); if (ret < 0 || ret >= UMQ_DEV_NAME_SIZE) { RPC_ADPT_VLOG_ERR("Failed to sprintf_s device name\n"); @@ -158,12 +163,19 @@ class Context : public Brpc::ConfigSettings { SetSocketFdTransMode(SOCKET_FD_TRANS_MODE_TCP); return; } - umq_config.eid_idx = GetEidIdx(); + if (strcmp(umq_config.trans_info[0].dev_info.dev.dev_name, "bonding_dev_0") != 0){ + umq_config.trans_info[0].dev_info.assign_mode = UMQ_DEV_ASSIGN_MODE_DEV; + umq_config.trans_info[0].dev_info.dev.eid_idx = GetEidIdx(); + }else{ + umq_config.trans_info[0].dev_info.assign_mode = UMQ_DEV_ASSIGN_MODE_EID; + umq_config.trans_info[0].dev_info.eid.eid = GetDevSrcEid(); + isBonding = true; + } } ret = umq_init(&umq_config); if(ret != 0){ - RPC_ADPT_VLOG_ERR("Failed to execute umq init"); + RPC_ADPT_VLOG_ERR("Failed to execute umq init\n"); ResetBrpcAllocator(); SetSocketFdTransMode(SOCKET_FD_TRANS_MODE_TCP); return; @@ -206,6 +218,7 @@ class Context : public Brpc::ConfigSettings { IOBuf::blockmem_deallocate_t *m_dealloc_addr = nullptr; // store scanned address of dealloc function IOBuf::blockmem_allocate_t m_alloc_addr_origin = nullptr; // store original alloc function address IOBuf::blockmem_deallocate_t m_dealloc_addr_origin = nullptr; // store original dealloc function address + bool isBonding = false; }; } diff --git a/src/ubsocket/brpc/brpc_file_descriptor.h b/src/ubsocket/brpc/brpc_file_descriptor.h index 2bd06e8585ff9f425762970467fdd6159b86fc78..fe7c0b8ad85d15484a04fa66f787e0aa16230834 100644 --- a/src/ubsocket/brpc/brpc_file_descriptor.h +++ b/src/ubsocket/brpc/brpc_file_descriptor.h @@ -11,7 +11,9 @@ #define BRPC_FILE_DESCRIPTOR_H #include +#include #include +#include #include "umq_pro_api.h" #include "umq_errno.h" #include "brpc_context.h" @@ -25,6 +27,14 @@ #define DIVIDED_NUMBER (2) #define CACHE_LINE_ALIGNMENT (64) +inline bool operator==(const umq_eid_t& a, const umq_eid_t& b) { + return ::memcmp(a.raw, b.raw, sizeof(a.raw)) == 0; +} + +inline bool operator!=(const umq_eid_t& a, const umq_eid_t& b) { + return !(a==b); +} + namespace Brpc { class FallbackTcpMgr { @@ -122,6 +132,36 @@ protected: bool m_stats_enable = false; }; +struct UmqEidHash { + std::size_t operator()(const umq_eid_t& eid) const noexcept { + uint64_t h = *reinterpret_cast(eid.raw); + uint64_t l = *reinterpret_cast(eid.raw + 8); + return std::hash{}(h) ^ (std::hash{}(l) << 1); + } +}; + +class EidRegistry { +public: + bool RegisterEid(const umq_eid_t& eid) { + std::lock_guard lock(m_mutex); + return m_registered_eids.insert(eid).second; + } + + bool IsRegisteredEid(const umq_eid_t& eid) { + std::lock_guard lock(m_mutex); + return m_registered_eids.count(eid) > 0; + } + + bool UnregisterEid(const umq_eid_t& eid) { + std::lock_guard lock(m_mutex); + return m_registered_eids.erase(eid) > 0; + } + +private: + mutable std::mutex m_mutex; + std::unordered_set m_registered_eids; +}; + class SocketFd : public ::SocketFd, public FallbackTcpMgr, public StatsMgr { public: enum error_code { @@ -224,6 +264,44 @@ public: return fd; } + int ConnectExchangeEid(umq_eid_t *connEid){ + Context *context = Context::GetContext(); + if(!context->IsBonding()){ + return 0; + } + umq_eid_t localEid = context->GetDevSrcEid(); + if (SendSocketData(m_fd, &localEid, sizeof(umq_eid_t), CONTROL_PLANE_TIMEOUT_MS) != sizeof(umq_eid_t)) { + RPC_ADPT_VLOG_ERR("Failed to send local eid message in connect, fd: %d\n", m_fd); + return -1; + } + + umq_eid_t remoteEid; + if (RecvSocketData( + m_fd, &remoteEid, sizeof(umq_eid_t), CONTROL_PLANE_TIMEOUT_MS) != sizeof(umq_eid_t)) { + RPC_ADPT_VLOG_ERR("Failed to receive remote eid message in connect, fd: %d\n", m_fd); + return -1; + } + + *connEid = context->GetDevSrcEid(); + if (localEid != remoteEid) { + umq_route_t connRoute; + if (GetDevRouteList(&localEid, &remoteEid, &connRoute) != 0) { + RPC_ADPT_VLOG_ERR("Failed to get route list in connect, fd: %d\n", m_fd); + return -1; + } + + if (RecvSocketData( + m_fd, &connRoute, sizeof(umq_route_t), CONTROL_PLANE_TIMEOUT_MS) != sizeof(umq_route_t)) { + RPC_ADPT_VLOG_ERR("Failed to receive remote eid message in connect, fd: %d\n", m_fd); + return -1; + } + + *connEid = connRoute.dst; + } + + return 0; + } + int DoConnect(void) { CpMsg local_cp_msg; @@ -236,7 +314,13 @@ public: return -1; } - if (CreateLocalUmq() < 0) { + umq_eid_t connEid; + if (ConnectExchangeEid(&connEid) < 0) { + RPC_ADPT_VLOG_ERR("Failed to exchange eid in connect\n"); + return -1; + } + + if (CreateLocalUmq(&connEid) < 0) { RPC_ADPT_VLOG_ERR("Failed to create umq\n"); return -1; } @@ -807,13 +891,17 @@ private: uint8_t queue_bind_info[UMQ_BIND_INFO_SIZE_MAX]; }; - int CreateLocalUmq(void) + int CreateLocalUmq(umq_eid_t *connEid) { if (m_local_umqh != UMQ_INVALID_HANDLE) { return EEXIST; } Context *context = Context::GetContext(); + if (context->IsBonding() && connEid == nullptr){ + RPC_ADPT_VLOG_ERR("Failed to use eid, because eid is null\n"); + return -1; + } umq_create_option_t queue_cfg; memset_s(&queue_cfg, sizeof(queue_cfg), 0, sizeof(queue_cfg)); queue_cfg.trans_mode = context->GetTransMode(); @@ -846,11 +934,17 @@ private: } } } else if (context -> GetDevNameStr() != nullptr) { - queue_cfg.dev_info.assign_mode = UMQ_DEV_ASSIGN_MODE_DEV; if (strcpy_s(queue_cfg.dev_info.dev.dev_name, UMQ_DEV_NAME_SIZE, context->GetDevNameStr()) != EOK) { RPC_ADPT_VLOG_ERR("Failed to strcpy_s device name\n"); return -1; - } + } + if(!context->IsBonding()){ + queue_cfg.dev_info.assign_mode = UMQ_DEV_ASSIGN_MODE_DEV; + } else { + // init use bonding dev + queue_cfg.dev_info.assign_mode = UMQ_DEV_ASSIGN_MODE_EID; + queue_cfg.dev_info.eid.eid = *connEid; + } } m_local_umqh = umq_create(&queue_cfg); @@ -937,6 +1031,45 @@ private: return rx_total_len; } + int AcceptExchangeEid(int new_fd, umq_eid_t *connEid){ + Context *context = Context::GetContext(); + if(!context->IsBonding()){ + return 0; + } + + umq_eid_t remoteEid; + if (RecvSocketData( + new_fd, &remoteEid, sizeof(umq_eid_t), CONTROL_PLANE_TIMEOUT_MS) != sizeof(umq_eid_t)) { + RPC_ADPT_VLOG_ERR("Failed to receive remote eid message in accept, fd: %d\n", new_fd); + return -1; + } + + umq_eid_t localEid = context->GetDevSrcEid(); + if (SendSocketData(new_fd, &localEid, sizeof(umq_eid_t), CONTROL_PLANE_TIMEOUT_MS) != sizeof(umq_eid_t)) { + RPC_ADPT_VLOG_ERR("Failed to send local eid message in accept, fd: %d\n", new_fd); + return -1; + } + + *connEid = context->GetDevSrcEid(); + if (localEid != remoteEid) { + umq_route_t connRoute; + if (GetDevRouteList(&localEid, &remoteEid, &connRoute) != 0) { + RPC_ADPT_VLOG_ERR("Failed to get route list in accept, fd: %d\n", new_fd); + return -1; + } + + if (SendSocketData( + new_fd, &connRoute, sizeof(umq_route_t), CONTROL_PLANE_TIMEOUT_MS) != sizeof(umq_route_t)) { + RPC_ADPT_VLOG_ERR("Failed to send connect eid message in accept, fd: %d\n", new_fd); + return -1; + } + + *connEid = connRoute.src; + } + + return 0; + } + int DoAccept(int new_fd) { CpMsg local_cp_msg; @@ -953,7 +1086,14 @@ private: return -1; } - if (socket_fd_obj->CreateLocalUmq() < 0) { + umq_eid_t connEid; + if (AcceptExchangeEid(new_fd, &connEid) < 0) { + RPC_ADPT_VLOG_ERR("Failed to exchange eid in accept\n"); + delete socket_fd_obj; + return -1; + } + + if (socket_fd_obj->CreateLocalUmq(&connEid) < 0) { RPC_ADPT_VLOG_ERR("Failed to create umq\n"); delete socket_fd_obj; return -1; @@ -1418,6 +1558,46 @@ private: return wr_cnt; } + int GetDevRouteList(const umq_eid_t *srcEid, const umq_eid_t *dstEid, umq_route_t *connRoute) + { + umq_route_t route; + route.flag.bs.rtp = 1; + (void)memcpy_s(&route.src, sizeof(umq_eid_t), srcEid, sizeof(umq_eid_t)); + (void)memcpy_s(&route.dst, sizeof(umq_eid_t), dstEid, sizeof(umq_eid_t)); + + umq_route_list_t route_list; + int ret = umq_get_route_list(&route, UMQ_TRANS_MODE_UB, &route_list); + if (ret!=0) { + RPC_ADPT_VLOG_ERR("Failed to get urma topo\n"); + return -1; + } + + if (route_list.len == 0) { + RPC_ADPT_VLOG_ERR("Failed to get urma topo is zero\n"); + return -1; + } + + *connRoute = route_list.buf[0]; + if(mEidRegistry.IsRegisteredEid(*srcEid)) { + return 0; + } + + for(uint32_t i = 0;i< route_list.len; ++i){ + umq_trans_info_t trans_info; + trans_info.trans_mode = UMQ_TRANS_MODE_UB; + trans_info.dev_info.assign_mode = UMQ_DEV_ASSIGN_MODE_EID; + trans_info.dev_info.eid.eid = route_list.buf[i].src; + ret = umq_dev_add(&trans_info); + if(ret != 0){ + RPC_ADPT_VLOG_ERR("Failed to add umq dev\n"); + return -1; + } + } + + mEidRegistry.RegisterEid(*srcEid); + return 0; + } + //common fields uint64_t m_local_umqh = UMQ_INVALID_HANDLE; uint16_t m_tx_window_capacity = 0; // the capacity of TX window size @@ -1426,6 +1606,7 @@ private: bool m_context_stats_enable = false; std::atomic m_closed{false}; int m_event_fd; + EidRegistry mEidRegistry; // TX fields struct alignas(CACHE_LINE_ALIGNMENT) TxDataPlane { diff --git a/src/ubsocket/configure_settings.cpp b/src/ubsocket/configure_settings.cpp index 0dd3b59a244248445990b921634051d5e1414dc5..470aaad7fff56f2dce83a6b4bf9e10b8e88c9a27 100644 --- a/src/ubsocket/configure_settings.cpp +++ b/src/ubsocket/configure_settings.cpp @@ -100,7 +100,15 @@ int ConfigSettings::ParseEnvVars() return -1; } }else if(strlen(m_dev_name_str) > 0){ - RPC_ADPT_VLOG_INFO("%s: %s\n", ENV_VAR_DEV_DEV_NAME, m_dev_name_str); + RPC_ADPT_VLOG_INFO("%s: %s\n", ENV_VAR_DEV_DEV_NAME, m_dev_name_str); + if(strlen(m_src_eid_str) > 0){ + if(inet_pton(AF_INET6, m_src_eid_str, &(m_src_eid)) == 1){ + RPC_ADPT_VLOG_INFO("%s: %s (eid)\n", ENV_VAR_DEV_SRC_EID, m_src_eid_str); + }else { + RPC_ADPT_VLOG_ERR("Eid is invalid. Please double check your input(%s)\n", m_src_eid_str); + return -1; + } + } }else { RPC_ADPT_VLOG_ERR("No device information is provided, such as device IP or device name\n"); return -1; diff --git a/src/ubsocket/configure_settings.h b/src/ubsocket/configure_settings.h index 7e90e2a1d43cd2aac898ce4138a667cbda8ac082..aa82342998e30e9992352f91edcf22a12f2b6779 100644 --- a/src/ubsocket/configure_settings.h +++ b/src/ubsocket/configure_settings.h @@ -39,11 +39,12 @@ #define ENV_VAR_DEV_DEV_NAME "RPC_ADPT_DEV_NAME" #define ENV_VAR_EID_IDX "RPC_ADPT_EID_IDX" #define ENV_VAR_TX_DEPTH "RPC_ADPT_TX_DEPTH" +#define ENV_VAR_DEV_SRC_EID "RPC_ADPT_SRC_EID" #define ENV_VAR_RX_DEPTH "RPC_ADPT_RX_DEPTH" #define ENV_VAR_STATS "RPC_ADPT_STATS" #define ENV_VAR_BLOCK_TYPE "RPC_ADPT_BLOCK_TYPE" // default, large #define ENV_VAR_POOL_INITIAL_SIZE "RPC_ADPT_POOL_INITIAL_SIZE" // MB -#define ENV_LOG_USE_PRINTF "RPC_ADPT_LOG_USE_PRINTF" // default false +#define ENV_LOG_USE_PRINTF "RPC_ADPT_LOG_USE_PRINTF" // default 0, 0 false; 1 true template class EnvStrConverter { @@ -196,6 +197,11 @@ public: return m_block_type; } + umq_eid_t GetDevSrcEid() + { + return m_src_eid; + } + bool GetLogUse() { return m_log_use_printf; @@ -241,6 +247,10 @@ protected: m_eid_idx = input_eid_idx == 0 ? DEFAULT_EID_IDX : input_eid_idx; } + if((env_ptr = getenv(ENV_VAR_DEV_SRC_EID)) != NULL){ + ReadEnvVar(env_ptr,m_src_eid_str,sizeof(m_src_eid_str)); + } + if((env_ptr = getenv(ENV_VAR_TX_DEPTH))!=NULL){ /* atoi return 0 means (1) failed to transfer input to int; (2) user set 0; both of them use default value directly*/ @@ -299,6 +309,8 @@ protected: char m_stats_str[BOOL_STR_LEN_MAX] = ""; char m_block_type_str[BLOCK_TYPE_STR_LEN_MAX] = ""; uint32_t m_eid_idx = DEFAULT_EID_IDX; + char m_src_eid_str[BLOCK_TYPE_STR_LEN_MAX] = ""; + umq_eid_t m_src_eid; uint32_t m_tx_depth = DEFAULT_TX_DEPTH; uint32_t m_rx_depth = DEFAULT_RX_DEPTH; uint64_t m_io_total_size = DEFAULT_IO_TOTAL_SIZE * IO_SIZE_MB;