# datahub-flink-connector
**Repository Path**: geshihui/datahub-flink-connector
## Basic Information
- **Project Name**: datahub-flink-connector
- **Description**: Flink Connector for Aliyun DataHub.
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 1
- **Created**: 2022-01-10
- **Last Updated**: 2022-12-30
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# DataHub Flink Connector
## 概述
GitHub:https://github.com/CharleyWuCL/datahub-flink-connector
Author: Charley Wu
E-Mail: charleywu@aliyun.com
Blog: www.charleywu.com
## 简介
我们在阿里云上使用DataHub作为Flink程序输入输出的消息队列,使用成本比较低,但由于是阿里云的云产品,周边生态做的不是很好,Flink Stream的Connector并没有开源出来。因此本人参照RocketMQ Flink Connector写了DataHub的Flink Connector。
DataHub主要提供两个SDK,`aliyun-sdk-datahub`和`datahub-client-library`;前者是较为基础的SDK,提供了丰富的DataHub操作接口,使用较为复杂,需要对DataHub有较为深入的了解;后者在前者的基础上进行了封装,提供Consumer和Producer进行协同消费及生产。本SDK使用的是后者。
## 依赖介绍
- Flink:org.apache.flink:flink-streaming-java_2.12:1.13.1
- datahub-client-library: com.aliyun.datahub:datahub-client-library:1.2.0-public
- aliyun-sdk-datahub: com.aliyun.datahub:aliyun-sdk-datahub:2.21.6-public
```xml
UTF-8
1.8
1.18.22
1.13.1
2.12
2.21.6-public
1.2.0-public
org.projectlombok
lombok
${lombok.version}
org.apache.flink
flink-streaming-java_${scala.binary.version}
${flink.version}
compile
com.aliyun.datahub
datahub-client-library
${datahub-library.version}
com.aliyun.datahub
aliyun-sdk-datahub
com.aliyun.datahub
aliyun-sdk-datahub
${datahub-sdk.version}
```
## Connector使用
### 依赖
目前还没有上传中央仓库,需使用者自行下载代码进行编译。
```xml
charley.wu
datahub-flink-connector
1.0-SNAPSHOT
```
### Source
```java
/**
* DataHub Source
*
* @return new DataHubSource.
*/
public DataHubSource createDataHubSource() {
// Validate common params.
Validate.isTrue(props.containsKey(DataHubConfig.ENDPOINT), "DataHub endpoint can not be null");
Validate.isTrue(props.containsKey(DataHubConfig.ACCESS_ID), "DataHub accessId can not be null");
Validate.isTrue(props.containsKey(DataHubConfig.ACCESS_KEY), "DataHub accessKey can not be null");
// Validate source params.
Validate.isTrue(props.containsKey(DataHubConfig.SOURCE_PROJECT), "DataHub project can not be null");
Validate.isTrue(props.containsKey(DataHubConfig.SOURCE_TOPIC), "DataHub topic can not be null");
Validate.isTrue(props.containsKey(DataHubConfig.SOURCE_SUBID), "DataHub subId can not be null");
// Return new DataHub Source.
return new DataHubSource<>(props, new DataHubSourceDeserializer());
}
```
### Sink
```java
public DataHubSink