# flink-jobs-quickstart **Repository Path**: ZaneGitHome/flink-jobs-quickstart ## Basic Information - **Project Name**: flink-jobs-quickstart - **Description**: flink-jobs应用快速入门 - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 8 - **Created**: 2023-06-12 - **Last Updated**: 2023-06-12 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # flink-jobs-quickstart ## 介绍 flink-jobs-quickstart是一个简单的网页应用,用于带领开发者快速入门[flink-jobs](https://gitee.com/tenmg/flink-jobs),它演示了如何将[flink-jobs](https://gitee.com/tenmg/flink-jobs)集成到现有基于Java应用中(例如JavaWeb等)。该网站只有一个主页面,可以启动和停止一个flink-jobs应用程序或普通flink应用程序。程序被启动后,页面还可以显示一些简单的监控信息。 ## 安装教程 1. 需先准备Flink、Tomcat、MySQL、StarRocks等环境,Flink需要配置`state.savepoints.dir`以便可以通过[flink-jobs-clients](https://gitee.com/tenmg/flink-jobs/tree/master/flink-jobs-clients)安全停止任务。 2. 下载[flink-jobs-quickstart](https://gitee.com/tenmg/flink-jobs-quickstart)并根据实际运行环境修改配置文件`main/resources/flink-jobs.properties`和`main/resources/flink-jobs-clients.properties`后运行`mvn clean package -Dmaven.test.skip=true`打包。 3. 将依赖的JAR包(如mysql-connector-java、flink-connector-starrocks等)上传到Flink的lib目录下,重启Flink。 4. 将WAR包上传到Tomcat的webapps目录下,并启动Tomcat。 5. 浏览器打开主页http://${youraddress}:8080/flink-jobs-quickstart/,可以看到文本框中默认的flink-jobs配置内容。 ![主页.png](assets/index.png) 6. 根据自己的环境,修改jar后点击“启动”按钮,测试是否可以正常提交Flink的官方范例。 7. Flink官方范例跑通后停止测试任务(也可以直接跳过官方范例测试),在两个数据库中分别创建test数据库、test_table表,并在MySQL插入一些数据。 ``` /* MySQL */ /* Create database */ CREATE DATABASE test; /* Create table */ USE test; CREATE TABLE test_table ( ID int NOT NULL, NAME varchar(255), CREATE_TIME datetime, PRIMARY KEY (ID) ); /* Insert records */ INSERT INTO test_table(ID, NAME, CREATE_TIME) VALUES (1, 'Flink Jobs', '2022-09-20 15:13:24'); INSERT INTO test_table(ID, NAME, CREATE_TIME) VALUES (2, 'Flink CDC', '2022-09-20 15:24:01'); INSERT INTO test_table(ID, NAME, CREATE_TIME) VALUES (3, 'MySQL', '2022-09-20 17:12:16'); INSERT INTO test_table(ID, NAME, CREATE_TIME) VALUES (4, 'StarRocks', '2022-09-20 17:12:55'); ``` ``` /* StarRocks */ /* Create database */ CREATE DATABASE test; /* Create table */ USE test; CREATE TABLE test_table ( ID int(11) NOT NULL, NAME varchar(765), CREATE_TIME datetime, EVENT_TIMESTAMP datetime COMMENT "事件时间", ETL_TIMESTAMP datetime COMMENT "清洗时间" ) PRIMARY KEY(ID) DISTRIBUTED BY HASH(ID) BUCKETS 3; ``` 8. 将任务配置改为如下内容并启动任务: ``` ``` 9. 任务正常运行后,对MySQL中的数据进行增、删、改,观察StarRocks中的数据,将看到实时同步数据已经生效。 10. StarRocks的主键模型还存在问题,不适合数据量较大的表的数据同步,可以通过使用[flink-cdc-log-connectors](https://gitee.com/tenmg/flink-cdc-log-connectors)并在更新模型的表中加入`OP`列来解决数据删除问题。 ## 使用说明 1. 打开主页后,可修改配置内容后点击“启动”按钮启动flink-jobs任务,也可以直接点击“启动”按钮启动flink的WordCount样例程序。关于flink-jobs的详细配置详见[https://gitee.com/tenmg/flink-jobs](https://gitee.com/tenmg/flink-jobs)。 ![启动任务.png](assets/start.png.png) 2. 任务启动后,网页便开始计时,并不断请求后台监控任务运行的情况,主页上会显示相关信息。 ![监控任务.png](assets/minitor.png.png) 3. 任务启动后,可以随时关闭任务,关闭任务后会返回保存点,将返回的保存点存储起来,再下次提交任务时作为启动选项提交给[flink-jobs](https://gitee.com/tenmg/flink-jobs)可实现从保存点重新恢复或重启任务的效果。 ![停止任务.png](assets/stop.png) ## 玩转flink 将flink-jobs任务配置内容保存在数据库中,通过flink-jobs-clients启动任务后,将返回的`jobsId`保存到数据库的任务运行日志中,再使用`jobsId`通过flink-jobs-clients(或者flink-clients、Flink REST)提供的接口监控任务可快速将Flink彻底集成到现有系统中,实现统一平台管理。 ![玩转flink](https://images.gitee.com/uploads/images/2021/0824/155531_ef090a42_7920102.png "玩转flink.png") ## 相关链接 flink-jobs开源地址:https://gitee.com/tenmg/flink-jobs flink-cdc-log-connectors开源地址:https://gitee.com/tenmg/flink-cdc-log-connectors DSL开源地址:https://gitee.com/tenmg/dsl Flink官网:https://flink.apache.org Debezuim官网:https://debezium.io