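# kafka-sink-sdk

**Repository Path**: ssp198/kafka-sink-sdk

## Basic Information

- **Project Name**: kafka-sink-sdk
- **Description**: kafka-sink-sdk, an implementation of Kafka sink operations. Supported sink targets: Kafka and RESTful APIs. Supports sinking one input stream to one or many outputs, with multiple outputs running in parallel.
- **Primary Language**: Java
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 1
- **Forks**: 0
- **Created**: 2023-05-14
- **Last Updated**: 2023-05-16

## Categories & Tags

**Categories**: Uncategorized

**Tags**: sink, Kafka, Java

## README

#### kafka-sink-kafka-sdk: Kafka sink to Kafka

##### 1. Features

One-to-many Kafka sink: one input stream is sunk to one or more outputs. Each record is polled once and consumed multiple times.

- [x] Upstream consumption tasks that read the same topic from the same broker are automatically grouped and merged, and the data is broadcast (as events) to the downstream sinks
- [x] Sink task flows use a chain-of-responsibility mechanism to build data processing, transformation, and filtering chains
- [x] Parallel sink task flows
- [ ] Merge the producers of sink tasks that write to the same downstream broker
- [ ] Merge Kafka connection pools for the same source (implemented inside kafka-clients)

##### 2. Implementation notes

- [x] Kafka consumer and producer integrated at the head and tail of each sink task flow
- [x] Lightweight Handler + Pipeline chain of responsibility for customizing the data processing flow
- [x] Broadcaster: a data broadcaster that delivers data to multiple downstream pipelines
- [x] Handler:
  - [x] converter: POJO-to-POJO converter
  - [x] filter: conditional filter that stops a message from propagating further
- [ ] Public/private handlers

##### 3. Thread model: synchronous Kafka message I/O + asynchronous processing \[asynchronous, parallel chain-of-responsibility flow]

* One Kafka consumer thread group (parallelism <= topic.partition) + synchronous broadcast (sync-broadcast)
* Asynchronous, parallel sink chain-of-responsibility flow + synchronous message submission (sync-submit) + asynchronous batched push (kafka-clients producer, which supports asynchronous batching)

##### 4. Usage example

**Building and starting a sink task flow**

```java
final String brokers = "127.0.0.1:6667,127.0.0.1:6668,127.0.0.1:6669";
// records in the upstream topic: [{"val":1}, {"val":-1}, {"val":2}, ...]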
final TopicMeta fromTopic = new TopicMeta("id_topic_test_from_0", "TOPIC_FORM", brokers);
final TopicMeta toTopic = new TopicMeta("id_topic_test_to_0", "TOPIC_TO", brokers);

SinkTask.builder()
        .upStreamTopic(ConsumerCfg.defaultCfg()          // upstream Kafka consumer (source) config
                .topicMeta(fromTopic)
                .build())
        .downStreamPipeline(DefaultPipeline.pipelines()  // processing chain in between
                .addLast(new LogHandler())
                .addLast(new StringToJsonObjConverter())
                .addLast(new ConditionalMsgFilter(msg -> {
                    final Integer val = (Integer) msg.get("val");
                    return val >= 0;                     // filter condition: val >= 0
                }))
                .addLast(new ObjToBytesConverter())
        )
        .downStreamTopic(ProducerCfg.defaultCfg()        // downstream Kafka producer (target) config
                .topicMeta(toTopic)
                .build())
        .build()   // build the sink task
        .start();  // start the sink task
```

**Upstream Kafka consumer configuration (all parameters)**

> See [ConsumerCfg](src/main/java/com/cnpc/bds/sdk/sink/kafka/meta/ConsumerCfg.java).

**Downstream Kafka producer configuration (all parameters)**

> See [ProducerCfg](src/main/java/com/cnpc/bds/sdk/sink/kafka/meta/ProducerCfg.java).
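Section 2 describes handlers chained through a Pipeline, with a Broadcaster delivering each record to several pipelines. The following is a minimal plain-Java sketch of that chain-of-responsibility plus fan-out idea, not the SDK's own code: `MsgHandler`, `FnConverter`, `CondFilter`, `SimplePipeline`, and `SimpleBroadcaster` are hypothetical names, and an in-memory list stands in for the downstream Kafka producer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical handler contract: return the (possibly converted) message,
// or Optional.empty() to stop it from reaching later handlers.
interface MsgHandler {
    Optional<Object> handle(Object msg);
}

// Converter handler: maps one object to another (POJO to POJO).
final class FnConverter implements MsgHandler {
    private final Function<Object, Object> fn;

    FnConverter(Function<Object, Object> fn) { this.fn = fn; }

    @Override
    public Optional<Object> handle(Object msg) { return Optional.ofNullable(fn.apply(msg)); }
}

// Filter handler: passes the message on only when the condition holds.
final class CondFilter implements MsgHandler {
    private final Predicate<Object> cond;

    CondFilter(Predicate<Object> cond) { this.cond = cond; }

    @Override
    public Optional<Object> handle(Object msg) {
        return cond.test(msg) ? Optional.of(msg) : Optional.empty();
    }
}

// Ordered chain of handlers; each handler's output is the next handler's input.
final class SimplePipeline {
    private final List<MsgHandler> handlers = new ArrayList<>();
    private final List<Object> sunk = new ArrayList<>(); // stands in for the downstream producer

    SimplePipeline addLast(MsgHandler handler) {
        handlers.add(handler);
        return this;
    }

    void fire(Object msg) {
        Object current = msg;
        for (MsgHandler handler : handlers) {
            Optional<Object> next = handler.handle(current);
            if (!next.isPresent()) {
                return; // blocked by a filter (or a converter returned null)
            }
            current = next.get();
        }
        sunk.add(current);
    }

    List<Object> sunk() { return sunk; }
}

// Delivers every record to each registered pipeline (one poll, many consumers).
final class SimpleBroadcaster {
    private final List<SimplePipeline> pipelines = new ArrayList<>();

    SimpleBroadcaster register(SimplePipeline pipeline) {
        pipelines.add(pipeline);
        return this;
    }

    void broadcast(Object record) {
        for (SimplePipeline pipeline : pipelines) {
            pipeline.fire(record);
        }
    }
}
```

For example, broadcasting the records 1, -1, 2 to one pipeline built with `new CondFilter(m -> (Integer) m >= 0)` and to another built with `new FnConverter(m -> (Integer) m * 10)` leaves `[1, 2]` in the first pipeline's sink and `[10, -10, 20]` in the second's: the filter blocks -1 in one pipeline only, because each pipeline processes the broadcast record independently.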
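Section 3 splits the work between a Kafka consumer thread group that broadcasts each record synchronously and sink pipelines that then run asynchronously and in parallel. A minimal sketch of that split, assuming one consumer thread and one single-thread executor per pipeline; it is an illustration, not the SDK's implementation. `AsyncPipeline` and `ConsumerLoop` are hypothetical names, a `List` stands in for the records a real `KafkaConsumer.poll` would return, and a synchronized list stands in for the downstream producer.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.UnaryOperator;

// Hypothetical asynchronous pipeline. One single-thread executor per pipeline keeps
// records in order within the pipeline while separate pipelines run in parallel.
final class AsyncPipeline {
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final UnaryOperator<Integer> step; // returns null to drop a record
    // Stands in for the downstream Kafka producer.
    private final List<Integer> out = Collections.synchronizedList(new ArrayList<>());

    AsyncPipeline(UnaryOperator<Integer> step) { this.step = step; }

    // Called on the consumer thread: only enqueues, never waits for processing.
    void submit(Integer record) {
        worker.submit(() -> {
            Integer result = step.apply(record);
            if (result != null) {
                out.add(result);
            }
        });
    }

    // Stops accepting work and waits for queued records to finish.
    void close() throws InterruptedException {
        worker.shutdown();
        if (!worker.awaitTermination(5, TimeUnit.SECONDS)) {
            worker.shutdownNow();
        }
    }

    List<Integer> output() { return out; }
}

final class ConsumerLoop {
    // One consumer thread hands each polled record to every pipeline in turn
    // (synchronous broadcast); processing happens on the pipelines' own threads.
    static void run(List<Integer> polled, List<AsyncPipeline> pipelines) {
        Thread consumer = new Thread(() -> {
            for (Integer record : polled) {
                for (AsyncPipeline pipeline : pipelines) {
                    pipeline.submit(record);
                }
            }
        }, "kafka-consumer-0");
        consumer.start();
        try {
            consumer.join();
            for (AsyncPipeline pipeline : pipelines) {
                pipeline.close();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while draining pipelines", e);
        }
    }
}
```

With the records 1, -1, 2, a doubling pipeline (`x -> x * 2`) ends with `[2, -2, 4]` and a pipeline that drops negatives (`x -> x >= 0 ? x : null`) ends with `[1, 2]`. A single-thread executor per pipeline trades throughput for ordering; a shared thread pool would process faster but could reorder records inside a pipeline.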