# worker **Repository Path**: gomods/worker ## Basic Information - **Project Name**: worker - **Description**: No description available - **Primary Language**: Go - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2021-09-26 - **Last Updated**: 2025-12-22 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # Worker 消费者管理器,支持多种消息中间件:kafka, rabbitmq, redis... ## 安装 ```shell script go get gitee.com/gomods/worker ``` ## 配置 ```json { "enabled": { "kafka": false, "rabbitmq": false, "redis": true, "delay": true }, "kafka": [ { "consumerGroup": "demo_consumer", "consumerCount": 1, "host": [ "10.11.11.1:9092", "10.11.11.2:9092", "10.11.11.3:9092" ], "sasl": { "enabled": false, "user": "", "password": "" }, "topic": "udc_message", "failTopic": "" }, { "consumerGroup": "demo_consumer", "consumerCount": 1, "host": [ "10.11.11.1:9092", "10.11.11.2:9092", "10.11.11.3:9092" ], "sasl": { "enabled": false, "user": "", "password": "" }, "topic": "udc_public", "failTopic": "" } ], "redis": [ { "consumerCount": 1, "addr": "127.0.0.1:6379", "password": "", "db": 0, "queue": "message_list", "failQueue": "", "tplName": "echo" } ], "delay": [ { "consumerCount": 1, "addr": "127.0.0.1:6379", "password": "", "db": 0, "queue": "message_delay", "failQueue": "", "tplName": "echo" } ] } ``` - enabled控制消息队列的入口 - kafka/redis/delay,是一个数组,可以支持多个不同topic消费 - consumerGroup:消费组 - consumerCount:消费者数量 - host:broker地址 - sasl:kafka安全认证 - topic:消费的topic - failTopic:失败消息队列 - tplName:消费逻辑name,若为空,默认为topic名称 - tplMode:两种模式 > 1:消费topic; 2:消费某一个topic下某个tag(默认消息体按空格切分,第一个元素为tag) ## 初始化 ```go import ( "gitee.com/gomods/config" "gitee.com/gomods/logger" "gitee.com/gomods/worker" "gitee.com/gomods/worker/bootstrap" ) // init logger appCfg := config.GetConfStringMap("App") s := worker.NewWorker() s.AddBeforeServerStartFunc(bootstrap.InitLogger(appCfg["env"], appCfg["name"], "udc", version.TAG), s.RegisterHandle(app.HandleMap()), s.InitConsumer()) s.AddAfterServerStopFunc(s.CloseConsumer()) err := s.Serve() if err != nil { log.Printf("worker stop err:%v", err) } else { log.Printf("worker exit") } ``` > 默认消费者处理器放在 `app/handle.go` ## 服务骨架示例 https://gitee.com/gomods/worker-demo > 新项目可以直接克隆,替换名称并直接使用。