# flink **Repository Path**: wolfcub.com/flink ## Basic Information - **Project Name**: flink - **Description**: No description available - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2021-10-24 - **Last Updated**: 2021-10-24 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # flink > [参考链接](https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/try-flink/table_api/) #### 代码借鉴例子编写 ```java package org.apache.flink.playgrounds.spendreport; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.Tumble; import org.apache.flink.table.expressions.TimeIntervalUnit; import static org.apache.flink.table.api.Expressions.*; public class SpendReport { public static Table report(Table transactions) { return transactions .window(Slide.over(lit(5).hour()).every(lit(1).hour()).on($("transaction_time")).as("log_ts")) .groupBy($("account_id"), $("log_ts")) .select( $("account_id"), $("log_ts").start().as("log_ts"), $("amount").avg().as("amount")); } public static void main(String[] args) throws Exception { EnvironmentSettings settings = EnvironmentSettings.newInstance().build(); TableEnvironment tEnv = TableEnvironment.create(settings); tEnv.executeSql("CREATE TABLE transactions (\n" + " account_id BIGINT,\n" + " amount BIGINT,\n" + " transaction_time TIMESTAMP(3),\n" + " WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" + ") WITH (\n" + " 'connector' = 'kafka',\n" + " 'topic' = 'transactions',\n" + " 'properties.bootstrap.servers' = 'kafka:9092',\n" + " 'format' = 'csv'\n" + ")"); tEnv.executeSql("CREATE TABLE spend_report (\n" + " account_id BIGINT,\n" + " log_ts TIMESTAMP(3),\n" + " amount BIGINT\n," + " PRIMARY KEY (account_id, log_ts) NOT ENFORCED" + ") WITH (\n" + " 'connector' = 'jdbc',\n" + " 'url' = 'jdbc:mysql://mysql:3306/sql-demo',\n" + " 'table-name' = 'spend_report',\n" + " 'driver' = 'com.mysql.jdbc.Driver',\n" + " 'username' = 'sql-demo',\n" + " 'password' = 'demo-sql'\n" + ")"); Table transactions = tEnv.from("transactions"); report(transactions).executeInsert("spend_report"); } } ``` #### 运行结果 ![运行结果](https://images.gitee.com/uploads/images/2021/1024/221706_bff61142_330140.png "屏幕截图.png")