# AvenirMQ **Repository Path**: onlyyyy/AvenirMQ ## Basic Information - **Project Name**: AvenirMQ - **Description**: 用Node.js实现一个消息中间件 - **Primary Language**: NodeJS - **License**: GPL-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 34 - **Forks**: 10 - **Created**: 2021-01-08 - **Last Updated**: 2023-11-21 ## Categories & Tags **Categories**: message-server **Tags**: None ## README # AvenirMQ 一个轻量化的消息中间件 ## 安装 **需要较高版本Node.js,推荐V14,支持跨平台** git clone https://gitee.com/onlyyyy/AvenirMQ.git cd AvenirMQ pm2 start AvenirMQ 或先安装pm2:npm i pm2 -g 并提供Nodejs版操作库:https://www.npmjs.com/package/avenirmq ## 代码结构 - AvenirMQ.js 启动的主进程 - dtest.js 测试用函数 - run.ini 配置文件 - user.json 用户文件 - curl.js cli程序 - /core AvenirMQ核心模块 - /AvenirMQ 提供的Nodejs版操作库 - clien1-4.js 测试用的接收方 ## 技术特点 - 1.使用签名进行消息分发,支持用户管理。 - 2.通过routingkey来实现用户的消息绑定。 - 3.支持连接池,并支持智能的连接淘汰策略。 - 4.使用生产者/消费者模型。 - 5.支持消息重发,死信回收 - 6.灵活的策略配置 - 7.提供cli程序进行管理 ## 技术实现 1. 用户管理 用户信息保存在user.json中 ```js async login(data) { if (!data.name || !data.password) { throw ('INVALID_LOGIN'); } let password = delQuotation(libcu.cipher.AesDecode(data.password)); toLog("password = ", password); if (!this.userList[data.name] || this.userList[data.name].password != password) { throw ('INVALID_LOGIN'); } //其他的就成功了 let sign = getSign(data.name, data.password); toLog("生成签名", sign); return sign; } ``` 解析routingkey ```js //解析绑定时的from.to.key parseKey(keys) { let arr = keys.split('.'); toLog('arr = ', arr); if (arr.length != 3) { throw ("BAD_KEYS"); } return { send: arr[0], to: arr[1], type: arr[2], } } ``` 加解密: 使用libcu.cipher.AesEncode/AesDecode函数进行加解密。 密钥参见libcu库。 用户增删改查暂略。 2. 连接池 ```js async add2ConnectPool(data, sign, client) { this.signPool[sign] = { conn: client, name: data.name, createTime: moment().valueOf(), }; throw ({ code: SUCCESS, data: sign }); } ``` 将连接保存到对象中,下次发送消息的时候会优先选用连接发送,失败的话就会更新连接池 3. 消息发送-死信处理 ```js async AvenirMQSend(msg, type) { //20210110先写个简单版的 不用promise.all发送消息 for (let i = 0; i < msg.length; i++) { try { ······ if(type != 'gc') { for (let j = 0; j < bind.length; j++) { //20210116增加对类型的判断 if ((bind[j].type === sub.type || bind[j].type === AvenirMQ_ALL) && (bind[j].receive === sub.to)) { //说明这是要发送的消息 let info = { ip: bind[j].ip, port: bind[j].port, }; let conn = null; if (this.connPoll[bind[j].ip] && this.connPoll[bind[j].ip][bind[j].port]) { conn = this.connPoll[bind[j].ip][bind[j].port].conn; } let newSub = JSON.parse(JSON.stringify(sub)); await this.send(text, conn, info, newSub, type, i); } else { toLog("存在无人接收的信息", msg[i]); } } } else { let conn = null; if (this.connPoll[gcInfo.ip] && this.connPoll[gcInfo.ip][gcInfo.port]) { conn = this.connPoll[gcInfo.ip][gcInfo.port].conn; } await this.send(text, conn, gcInfo, sub, type, i); } ···· } catch (error) { toLog("AvenirMQSend error->",error); } } } ``` 将gc与普通的消息放在一个函数中处理。 ## 接口协议格式 0. 登录获得签名 ```js { type:'login', name:'test', password:'AES', } ``` 返回值 ```js { code:0, message:'success', data:'sign' } ``` 1. 新建用户 ```js //key为send.to.type的结构 表示自己的键值与接收的键值 以及接收的消息类型 //告诉AvenirMQ自己的连接信息 { "type":"addUser", "name":"test", "password":"123456" "key":"a.b.rpc" "ip":"127.0.0.1" "port":13000, } ``` 返回值 ```js { "code":0, "message":"success", } ``` 2. 删除用户 ```js { "type":"deleteUser", "name":"test", } ``` 返回值 ```js { "code":0, "message":"success", } ``` 3. 修改用户信息 ```js //key为send.to.type的结构 send : 接收名称为send的发送方消息,to:发送给谁 type:消息类型 //告诉AvenirMQ自己的连接信息 { "type":"updateUser", "name":"test", "password":"123456" "key":"a.b.rpc" "ip":"127.0.0.1" "port":13000, } ``` 返回值 ```js { "code":0, "message":"success", } ``` 4. 发送消息 ```js //生产者->消费者的概念 { sign:"test", type:"send", data:"hello world" } ``` 返回值 ```js { "code":0, "message":"success", } ``` 5. 修改绑定的键值 ```js { sign:'test', type:'setKey', data: { name:"test", key:"a.b.rpc", } } ``` 返回值 ```js { code:0, message:'success', } ``` 6. 收到消息(无需请求,需要用户方起一个tcp服务器) ```js { code:0, message:'success', sender:'发送方的名字', data:'消息', } ``` 7. 获取用户列表 ```js { type:'userList', } ``` 返回值 ```js { code:0, message:'success', data:[a,b], } ``` ## 配置文件示例 ```ini [main] ip=127.0.0.1 port=52013 [mq] #用户文件路径 userFileName=./user.json #是否输出日志 ifConsoleLog=true #连接超时时间 timeOut=10 #是否为长连接 keepAlive=true #AvenirMQ发消息的超时时间(秒) MQTimeOut=3 #AvenirMQ重发消息的周期(秒) 范围 2-50 MQResend=2 #重试次数 retryTime=5 ``` ## 结语 本项目照着RabbitMQ的思想简单地实现了一个消息中间件,不过没有使用AMQP协议,而只是简单的tcp处理,只能后期再优化了。 不过当这个项目在我脑海中浮现,我就认为我应该通过努力将它编写出来。目前AvenirMQ也达到了能用的程度了,这一路上学到的知识也是久久难忘的。 技术没有高低贵贱之分,脑海中如果有想法的话,我们要做的就是去把它实现。 百舸争流,奋楫者先。 编程之路漫漫修远兮,吾将上下而求索。 谢谢。