# pipe_module

**Repository Path**: gavingqf/pipe_module

## Basic Information

- **Project Name**: pipe_module
- **Description**: pipe with service discovery (interconnects service nodes through service discovery)
- **Primary Language**: C++
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: https://gitee.com/gavingqf
- **GVP Project**: No

## Statistics

- **Stars**: 1
- **Forks**: 0
- **Created**: 2023-04-10
- **Last Updated**: 2025-08-16

## Categories & Tags

**Categories**: Uncategorized

**Tags**: None

## README

# summary

The pipe module connects server nodes to each other. Set the pipe's config (connect or listen mode; see the pipe config section) and the pipe listens or connects to the remote server automatically.
The pipe module provides the following functions:

- Listens or connects automatically according to a local XML file.
- Reconnects to the peer node automatically if the TCP connection is terminated.
- Sends heartbeat messages automatically.

# pipe config

The pipe config XML is as follows:

```xml
>
```

# pipe example

```c++
// Standard headers used below.
#include <chrono>
#include <cstdlib>
#include <ctime>
#include <string>
#include <thread>
#include "log.h"
#include "time_wheel.h"
#include "pipe_module.h"
#include "pipe.h"

// a certain server node.
class CGameServer : public anet::pipe::IPipeMsgHandler {
public:
    CGameServer() : m_pipe(nullptr) {}
    virtual ~CGameServer() = default;

public:
    // called for every message received on the pipe.
    virtual void Handle(const char* msg, int len) override {
        Adebug("message: {}", std::string(msg, len));
    }

    void SetPipe(anet::pipe::IPipe* pipe) {
        m_pipe = pipe;
    }

    // send only while a pipe is attached.
    void SendMsg(const char* msg, int len) {
        if (m_pipe != nullptr) {
            m_pipe->Send(msg, len);
        }
    }

private:
    anet::pipe::IPipe* m_pipe;
};

// server node manager class.
class CServerMgr : public anet::pipe::IPipeReporter {
public:
    CServerMgr() = default;
    virtual ~CServerMgr() = default;

public:
    virtual void OnReport(bool isConnected, anet::pipe::IPipe* pipe) override {
        auto strId = anet::pipe::getPipeInfo(&pipe->GetPipeInfo());
        if (isConnected) {
            // server connected.
            Adebug("{}:({}) is connected", strId.c_str(), pipe->GetPipeId());
            pipe->SetMsgHandler(&m_server);
            m_server.SetPipe(pipe);

            // repeated timer.
            m_timer.add_repeated_timer([this](void* pData) {
                m_server.SendMsg("hello", 5);
            }, 0, 10);
        } else {
            // server disconnected.
            m_timer.kill_timer(0);
            Adebug("{}:({}) is disconnected", strId.c_str(), pipe->GetPipeId());
            m_server.SetPipe(nullptr);
        }
    }

private:
    CGameServer m_server;
    STimeWheelSpace::CTimerRegister m_timer;
};

// define sleep ms.
#define sleepMS(x) std::this_thread::sleep_for(std::chrono::milliseconds(x))

int main(int argc, char* argv[]) {
    // init log module
    anet::log::initLog("./log", "test-net", anet::log::eLogLevel::debug, 1000);

    // rand seed.
    srand((unsigned int)(time(0)));

    // set log level.
    if (argc >= 2) {
        anet::log::setLogLevel(anet::log::eLogLevel(atoi(argv[1])));
    } else {
        anet::log::setLogLevel(anet::log::eLogLevel::debug);
    }

    // set start flag
    Adebug("{}", "=== start ===");

    // define pipe reporter.
    CServerMgr* serverMgr = new CServerMgr();
    auto ret = anet::pipe::CPipeModule::instance()->Init("./pipe_config.xml", serverMgr);
    if (!ret) {
        delete serverMgr;
        return -1;
    }

    // main thread run.
    bool busy = false;
    int runCount = 10000;
    for (;;) {
        // run pipe module.
        busy = anet::pipe::CPipeModule::instance()->Run(runCount);
        if (!busy) {
            sleepMS(1);
        }
    }

    // not reached: the loop above never exits.
    Adebug("{}", "=== end ===");
}
```

# note

This project depends on the anet module and the C++17 standard.

# how to use

- Implement the following callback interfaces:

```c++
// pipe status reporter callback interface.
class IPipeReporter {
public:
    virtual ~IPipeReporter() {}

    // called when a pipe is connected or terminated.
    // isConnected == true means connected; false means terminated.
    // when connected, the pipe is valid and the user must call IPipe's
    // SetMsgHandler interface to set its message handler.
    // when terminated, the pipe is no longer valid.
    virtual void OnReport(bool isConnected, IPipe* pipe) = 0;
};

// server message handler callback interface.
class IPipeMsgHandler {
public:
    virtual ~IPipeMsgHandler() {}

    // called when the pipe receives a message.
    // !!The msg has no header information.
    virtual void Handle(const char* msg, int len) = 0;
};
```

- Get the pipe module instance and initialize the pipe with the following call:

```c++
anet::pipe::CPipeModule::instance()->Init(xmlPath, IPipeReporter*)
```

- Manage the server's connection/disconnection and its message exchange, that is, handle IPipeReporter's OnReport() and IPipeMsgHandler's Handle() callbacks.
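
The callback flow in the steps above can be tried without a network. The following is a minimal sketch, not the module's own code: everything in the `sketch` namespace is a stand-in written for illustration. `IPipeReporter` and `IPipeMsgHandler` copy the shapes shown in this README, `IPipe` is assumed to expose only the `SetMsgHandler` and `Send` calls used in the example, and `LoopbackPipe`, `Node`, `Manager`, and `runLifecycleDemo` are hypothetical names. It shows the rule stated in the `OnReport` comments: keep the pipe pointer only between the connected and terminated reports.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Stand-in declarations for illustration only. The real ones come from
// pipe.h / pipe_module.h in the anet::pipe namespace.
namespace sketch {

class IPipeMsgHandler {
public:
    virtual ~IPipeMsgHandler() {}
    virtual void Handle(const char* msg, int len) = 0;
};

// Assumed minimal pipe: only the two calls used in the README example.
class IPipe {
public:
    virtual ~IPipe() {}
    virtual void SetMsgHandler(IPipeMsgHandler* handler) = 0;
    virtual void Send(const char* msg, int len) = 0;
};

class IPipeReporter {
public:
    virtual ~IPipeReporter() {}
    virtual void OnReport(bool isConnected, IPipe* pipe) = 0;
};

// Hypothetical loopback pipe: every sent message is handed straight back
// to the registered handler, so no socket is needed.
class LoopbackPipe : public IPipe {
public:
    void SetMsgHandler(IPipeMsgHandler* handler) override { m_handler = handler; }
    void Send(const char* msg, int len) override {
        if (m_handler != nullptr) {
            m_handler->Handle(msg, len);
        }
    }

private:
    IPipeMsgHandler* m_handler = nullptr;
};

// Server node: records received payloads, sends only while a pipe is attached.
class Node : public IPipeMsgHandler {
public:
    void Handle(const char* msg, int len) override {
        received.emplace_back(msg, static_cast<std::size_t>(len));
    }
    void SetPipe(IPipe* pipe) { m_pipe = pipe; }
    bool SendMsg(const std::string& text) {
        if (m_pipe == nullptr) {
            return false;  // not connected: the message is dropped
        }
        m_pipe->Send(text.data(), static_cast<int>(text.size()));
        return true;
    }

    std::vector<std::string> received;

private:
    IPipe* m_pipe = nullptr;
};

// Reporter: attaches the node on connect, detaches it on terminate.
class Manager : public IPipeReporter {
public:
    void OnReport(bool isConnected, IPipe* pipe) override {
        if (isConnected) {
            assert(pipe != nullptr);
            pipe->SetMsgHandler(&node);
            node.SetPipe(pipe);
        } else {
            node.SetPipe(nullptr);  // the pipe is invalid from here on
        }
    }

    Node node;
};

// Walks through connect -> send -> terminate -> send and returns how many
// sends were accepted.
inline int runLifecycleDemo(Manager& mgr) {
    LoopbackPipe pipe;
    int accepted = 0;
    mgr.OnReport(true, &pipe);
    if (mgr.node.SendMsg("hello")) ++accepted;
    mgr.OnReport(false, &pipe);
    if (mgr.node.SendMsg("late")) ++accepted;
    return accepted;
}

}  // namespace sketch
```

Calling `sketch::runLifecycleDemo` on a fresh `sketch::Manager` accepts the first send and rejects the second, because the node clears its pipe pointer in the terminated branch of `OnReport`. With the real module the same pattern applies, except that the pipe is created by `CPipeModule` from the XML config rather than by user code.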