# flink-connector-mqtt **Repository Path**: davidfantasy/flink-connector-mqtt ## Basic Information - **Project Name**: flink-connector-mqtt - **Description**: 基于flink最新的[FLIP-27]架构对MQTT connector的实现 - **Primary Language**: Java - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 4 - **Forks**: 2 - **Created**: 2023-08-21 - **Last Updated**: 2025-01-20 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README [中文版](readme_zh.md) ## flink-connector-mqtt Implemented based on the latest [FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface) architecture of MQTT connector for Flink. The main features are as follows: - Compatible with the latest Flink version (1.17.1). - Supports reading data from multiple topics simultaneously and automatically shards based on topics. - Uses a high-performance MQTT client (hivemq-mqtt). - Supports querying in Flink SQL style. ## Dependency Description - Must use JDK 17 or higher. - Currently only supports MQTT 3 protocol, MQTT 5 will be supported in the future. - Flink version supports 1.17.1 and above. ## Usage 1. Add the dependency: ```xml com.github.davidfantasy flink-connector-mqtt 1.1.0 ``` 2. Example code for using as a streaming data source: ```java public class MqttSourceTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); MqttProperties mqttProp = new MqttProperties(); mqttProp.setHost("broker-cn.emqx.io"); mqttProp.setPort(1883); // mqttProp.setUsername(""); // mqttProp.setPassword(""); List topics = new ArrayList<>(); topics.add(new MqttTopic("/flink-connector/mqtt/source/test", 0)); var source = env.fromSource(new MqttSource(mqttProp, topics), WatermarkStrategy.noWatermarks(), "Mqtt Source"); source.map(v -> { var msg = (MqttMessage) v; return msg.getTopic() + ":" + new String(msg.getPayload()); }).print(); env.execute("MQTT Source Test"); } } ``` Create a table in Flink SQL: ```sql CREATE TABLE mqttTest ( id INTEGER, code STRING ) WITH ( 'connector' = 'mqtt', 'server' = 'broker-cn.emqx.io', 'port' = '1883', 'topic' = '/flink-connector/mqtt/source/test' ) ``` **Note**: When using Flink SQL, the message format for MQTT must be in JSON format. The JSON format corresponding to the table structure above is: ```json {"id":123,"code":"some hello"} ``` Currently, the following configurations can be used in the table: - connector: Fixed as mqtt - server: MQTT broker host, required - port: MQTT broker port, required - username: Authentication username, optional - password: Authentication password, optional - topic: MQTT topic corresponding to this table, required - qos: Quality of Service level for subscription, optional, default is 0