# 消息队列传输工具 **Repository Path**: zhougaoping/queue-tools ## Basic Information - **Project Name**: 消息队列传输工具 - **Description**: 消息队列传输工具 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-08-15 - **Last Updated**: 2025-08-15 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # Data Flow - Kafka 与 RabbitMQ 数据传输工具 ## 项目概述 Data Flow 是一个用 Go 语言开发的高性能数据传输工具,用于在 Kafka 与 RabbitMQ 之间进行稳定、高并发、可观测的数据转发。支持单向与双向传输,具备详细中文日志与 Prometheus 指标输出。 ## 主要特性 - **双向传输**: 支持 Kafka → RabbitMQ、RabbitMQ → Kafka、双向 - **高性能**: 基于 goroutine 的并发消费与生产 - **可观测性**: 内置 Prometheus 指标、健康检查、状态/统计接口 - **配置灵活**: YAML 配置文件;支持转换与过滤,可快速按需启用 - **日志完善**: zap 日志;支持控制台/文件/双通道输出,中文字段 - **健壮可靠**: 断线自动重连、优雅关闭、错误统计 ## 快速开始 ### 1. 环境准备 - Go 1.21+ - Kafka、RabbitMQ 实例(示例地址:Kafka `192.168.168.23:9092`,RabbitMQ `amqp://guest:guest@192.168.168.23:5672/`) ### 2. 安装依赖 ```bash # 在项目根目录 go mod tidy ``` ### 3. 配置文件 - 复制或直接编辑 `config/config.yaml`,核心字段: ```yaml kafka: brokers: - "192.168.168.23:9092" version: "2.8.0" consumer: group_id: "data-flow-consumer" topics: - "source-topic" auto_offset_reset: "latest" max_wait_time: 1000 producer: topic: "target-topic" required_acks: 1 timeout: 5000 retry_max: 3 batch_size: 1000 linger_ms: 10 rabbitmq: url: "amqp://guest:guest@192.168.168.23:5672/" exchange: "data-flow-exchange" exchange_type: "topic" routing_key: "data-flow.#" queue: "data-flow-queue" durable: true auto_delete: false internal: false no_wait: false arguments: {} consumer: prefetch_count: 10 auto_ack: false producer: mandatory: false immediate: false transfer: direction: "kafka_to_rabbitmq" # kafka_to_rabbitmq | rabbitmq_to_kafka | bidirectional batch_size: 100 batch_timeout: 1000 retry: max_attempts: 3 backoff_ms: 1000 transform: enabled: false mode: "none" # none | uppercase | lowercase | pretty_json prefix: "" suffix: "" filter: enabled: false include_rules: [] # 命中任意则通过(为空表示不过滤) exclude_rules: [] # 命中任意则过滤 content_type: "body" # 预留:body/header/key(目前仅 body 生效) monitoring: enabled: true metrics_port: 8080 prometheus: enabled: true path: "/metrics" logging: level: "info" # debug | info | warn | error format: "json" # json | console output: "both" # console | file | both log_file: "logs/data-flow.log" rotation: "daily" # size | daily(daily 时按日期生成文件名,如 data-flow-20250814.log) max_size: 100 max_age: 30 max_backups: 10 compress: true ``` ### 4. 启动服务 ```bash # 正常模式(需要能连通 Kafka/RabbitMQ) go run ./cmd --config config/config.yaml # DEV 模式(跳过外部连接,仅启动 HTTP 服务以便调试接口) # Windows PowerShell $env:DEV_MODE=1; go run ./cmd --config config/config.yaml ``` ### 5. 健康与观测 - 健康: `GET http://localhost:8080/health` - 状态: `GET http://localhost:8080/status` - 统计: `GET http://localhost:8080/stats` - 指标: `GET http://localhost:8080/metrics` - UI: `GET http://localhost:8080/ui`(离线可用) ## 转换与过滤(可选) - 转换(`transfer.transform`) - `enabled`: 开关 - `mode`: none|uppercase|lowercase|pretty_json - `prefix`/`suffix`: 为转换后的消息增加前后缀 - 过滤(`transfer.filter`) - `enabled`: 开关 - `include_rules`: 命中任意则通过;支持子串或正则(用 `/.../` 包裹) - `exclude_rules`: 命中任意则过滤;同上 - `content_type`: 预留;当前仅对消息体(body)匹配 示例: ```yaml transfer: transform: enabled: true mode: pretty_json filter: enabled: true include_rules: - "/order_id\\":\\s*\\"[0-9]+\\"/" exclude_rules: - "/error|exception|failed/" ``` ## 方向配置(非常重要) - 通过 `transfer.direction` 区分传输方向: - `kafka_to_rabbitmq`:Kafka → RabbitMQ - `rabbitmq_to_kafka`:RabbitMQ → Kafka - `bidirectional`:双向同时进行 - 示例配置文件: - Kafka → RabbitMQ:`config/config.kafka_to_rabbitmq.yaml` - RabbitMQ → Kafka:`config/config.rabbitmq_to_kafka.yaml` - 日志文件名会自动追加后缀,便于区分: - k2r:Kafka→RabbitMQ(如 `data-flow-k2r.log`) - r2k:RabbitMQ→Kafka(如 `data-flow-r2k.log`) - bi:双向(如 `data-flow-bi.log`) - 若 `rotation: daily`,还会追加日期(如 `data-flow-k2r-YYYYMMDD.log`) ## 内置生产者工具 用于向 Kafka 写入测试消息,位于 `tools/producer/`。 - 单条消息: ```bash go run ./tools/producer --brokers 192.168.168.23:9092 --topic source-topic --message '{"hello":"world"}' ``` - 连发 N 条、间隔发送: ```bash go run ./tools/producer --brokers 192.168.168.23:9092 --topic source-topic --message test --count 10 --interval-ms 200 ``` - 从 stdin 批量发送(每行一条): ```bash # Windows PowerShell type .\messages.txt | go run ./tools/producer --brokers 192.168.168.23:9092 --topic source-topic --stdin ``` 可选参数:`--version`(默认 2.8.0)、`--key`、`--acks`(0/1/-1)、`--timeout-ms`。 ## 常见问题 - 无法连接 Kafka(`dial tcp ...:9092` 超时) - 检查网络/防火墙 - Kafka 配置需对外可达: - `listeners=PLAINTEXT://0.0.0.0:9092` - `advertised.listeners=PLAINTEXT://<服务器IP>:9092` - `listener.security.protocol.map=PLAINTEXT:PLAINTEXT` - `/metrics` 404:确认 `monitoring.prometheus.enabled: true`,`path` 正确 - 本地无依赖想先看接口:启用 `DEV_MODE=1` ## 架构 ``` ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ Kafka │ │ Data Flow │ │ RabbitMQ │ │ Consumer │◄──►│ Engine │◄──►│ Producer │ └─────────────┘ └─────────────┘ └─────────────┘ │ ▼ ┌─────────────┐ │ Monitoring │ │ & Logs │ └─────────────┘ ``` ## Docker 运行(可选) - 本仓库包含 `docker-compose.yml`(Kafka、ZooKeeper、RabbitMQ、Prometheus、Grafana、Data Flow)。 - 你的本机未安装 Docker/Compose 可跳过本节。 ## 开发与贡献 - 代码风格:清晰命名、中文日志、必要时中文注释 - 欢迎提交 Issue / PR ## 许可证 MIT License