Flink 系列文章
一、Flink 专栏
Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。
-
1、Flink 部署系列
本部分介绍Flink的部署、配置相关基础内容。 -
2、Flink基础系列
本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 -
3、Flik Table API和SQL基础系列
本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。 -
4、Flik Table API和SQL提高与应用系列
本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。 -
5、Flink 监控系列
本部分和实际的运维、监控工作相关。
二、Flink 示例专栏
Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。
两专栏的所有文章入口点击:Flink 系列文章汇总索引
文章目录
- Flink 系列文章
- 一、Flink 的 Formats
- 二、CSV Format
- 1、maven 依赖
- 2、Flink sql client 建表示例
- 3、table api建表示例
- 4、Format 参数
- 5、数据类型映射
- 三、JSON Format
- 1、maven 依赖
- 2、Flink sql client 建表示例
- 3、table api 建表示例
- 4、Format 参数
- 5、数据类型映射关系
本文介绍了Flink 支持的数据格式中的csv和json,并分别以sql和table api作为示例进行了说明。
本文依赖flink、kafka集群能正常使用。
本文分为3个部分,即概述、CSV和JSON Format。
本文的示例是在Flink 1.17版本(flink 集群和maven均是Flink 1.17)中运行。
一、Flink 的 Formats
Flink 提供了一套与表连接器(table connector)一起使用的表格式(table format)。表格式是一种存储格式,定义了如何把二进制数据映射到表的列上。
Flink 支持以下格式:
二、CSV Format
CSV Format 允许我们基于 CSV schema 进行解析和生成 CSV 数据。 目前 CSV schema 是基于 table schema 推断而来的。
1、maven 依赖
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-csv</artifactId> <version>1.17.1</version> </dependency>
复制
2、Flink sql client 建表示例
以下是一个使用 Kafka 连接器和 CSV 格式创建表的示例。
CREATE TABLE Alan_KafkaTable ( `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', `partition` BIGINT METADATA VIRTUAL, `offset` BIGINT METADATA VIRTUAL, `user_id` BIGINT, `item_id` BIGINT, `behavior` STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', 'properties.bootstrap.servers' = 'server1:9092,server2:9092,server3:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' ); Flink SQL> CREATE TABLE Alan_KafkaTable ( > `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', > `partition` BIGINT METADATA VIRTUAL, > `offset` BIGINT METADATA VIRTUAL, > `user_id` BIGINT, > `item_id` BIGINT, > `behavior` STRING > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'Alan_KafkaTable', > 'properties.bootstrap.servers' = 'server1:9092,server2:9092,server3:9092', > 'properties.group.id' = 'testGroup', > 'scan.startup.mode' = 'earliest-offset', > 'format' = 'csv' > ); [INFO] Execute statement succeed. # kafka客户端命令行输入数据 [alanchan@server1 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic Alan_KafkaTable >1,1001,login >1,2001,p_read > # flink sql client 查询数据即可 Flink SQL> select * from Alan_KafkaTable; +----+-------------------------+----------------------+----------------------+----------------------+----------------------+--------------------------------+ | op | event_time | partition | offset | user_id | item_id | behavior | +----+-------------------------+----------------------+----------------------+----------------------+----------------------+--------------------------------+ | +I | 2023-11-15 15:53:17.925 | 0 | 0 | 1 | 1001 | login | | +I | 2023-11-15 15:53:45.839 | 0 | 1 | 1 | 2001 | p_read |
复制
3、table api建表示例
通过table api建表,参考文章:
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
import static org.apache.flink.table.api.Expressions.$; import static org.apache.flink.table.api.Expressions.and; import static org.apache.flink.table.api.Expressions.lit; import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall; import java.sql.Timestamp; import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Over; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableDescriptor; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.Tumble; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.catalog.CatalogDatabaseImpl; import org.apache.flink.table.catalog.CatalogView; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.ResolvedCatalogView; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.flink.table.functions.BuiltInFunctionDefinitions; import org.apache.flink.types.Row; import com.google.common.collect.Lists; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; /** * @author alanchan * */ public class TestTableAPIDemo { /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { testCreateTableBySQLAndAPI(); } static void testCreateTableBySQLAndAPI() throws Exception { // EnvironmentSettings env = EnvironmentSettings.newInstance().inStreamingMode().build(); // TableEnvironment tenv = TableEnvironment.create(env); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tenv = StreamTableEnvironment.create(env); // SQL 创建输入表 // String sourceSql = "CREATE TABLE Alan_KafkaTable (\r\n" + // " `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',\r\n" + // " `partition` BIGINT METADATA VIRTUAL,\r\n" + // " `offset` BIGINT METADATA VIRTUAL,\r\n" + // " `user_id` BIGINT,\r\n" + // " `item_id` BIGINT,\r\n" + // " `behavior` STRING\r\n" + // ") WITH (\r\n" + // " 'connector' = 'kafka',\r\n" + // " 'topic' = 'user_behavior',\r\n" + // " 'properties.bootstrap.servers' = 'server1:9092,server2:9092,server3:9092',\r\n" + // " 'properties.group.id' = 'testGroup',\r\n" + // " 'scan.startup.mode' = 'earliest-offset',\r\n" + // " 'format' = 'csv'\r\n" + // ");"; // tenv.executeSql(sourceSql); //API创建表 Schema schema = Schema.newBuilder() .columnByMetadata("event_time", DataTypes.TIME(3), "timestamp") .columnByMetadata("partition", DataTypes.BIGINT(), true) .columnByMetadata("offset", DataTypes.BIGINT(), true) .column("user_id", DataTypes.BIGINT()) .column("item_id", DataTypes.BIGINT()) .column("behavior", DataTypes.STRING()) .build(); TableDescriptor kafkaDescriptor = TableDescriptor.forConnector("kafka") .comment("kafka source table") .schema(schema) .option(KafkaConnectorOptions.TOPIC, Lists.newArrayList("user_behavior")) .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, "server1:9092,server2:9092,server3:9092") .option(KafkaConnectorOptions.PROPS_GROUP_ID, "testGroup") .option("scan.startup.mode", "earliest-offset") .format("csv") .build(); tenv.createTemporaryTable("Alan_KafkaTable", kafkaDescriptor); //查询 String sql = "select * from Alan_KafkaTable "; Table resultQuery = tenv.sqlQuery(sql); DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(resultQuery, Row.class); // 6、sink resultDS.print(); // 7、执行 env.execute(); //kafka中输入测试数据 // 1,1001,login // 1,2001,p_read //程序运行控制台输入如下 // 11> (true,+I[16:32:19.923, 0, 0, 1, 1001, login]) // 11> (true,+I[16:32:32.258, 0, 1, 1, 2001, p_read]) } @Data @NoArgsConstructor @AllArgsConstructor public static class User { private long id; private String name; private int age; private Long rowtime; } }
复制
4、Format 参数
5、数据类型映射
目前 CSV 的 schema 都是从 table schema 推断而来的。显式地定义 CSV schema 暂不支持。 Flink 的 CSV Format 数据使用 jackson databind API 去解析 CSV 字符串。
下面的表格列出了flink数据和CSV数据的对应关系。
三、JSON Format
JSON Format 能读写 JSON 格式的数据。当前,JSON schema 是从 table schema 中自动推导而得的。
1、maven 依赖
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>1.17.1</version> </dependency>
复制
2、Flink sql client 建表示例
以下是一个利用 Kafka 以及 JSON Format 构建表的例子。
CREATE TABLE Alan_KafkaTable_json ( `id` INT, name string, age BIGINT, t_insert_time TIMESTAMP(3) METADATA FROM 'timestamp', WATERMARK FOR t_insert_time as t_insert_time - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'Alan_KafkaTable_json', 'scan.startup.mode' = 'earliest-offset', 'properties.bootstrap.servers' = 'server1:9092,server2:9092,server3:9092', 'format' = 'json' ); Flink SQL> CREATE TABLE Alan_KafkaTable_json ( > `id` INT, > name string, > age BIGINT, > t_insert_time TIMESTAMP(3) METADATA FROM 'timestamp', > WATERMARK FOR t_insert_time as t_insert_time - INTERVAL '5' SECOND > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'Alan_KafkaTable_json', > 'scan.startup.mode' = 'earliest-offset', > 'properties.bootstrap.servers' = 'server1:9092,server2:9092,server3:9092', > 'format' = 'json' > ); [INFO] Execute statement succeed. # kafka 客户端命令输入数据 [alanchan@server1 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic Alan_KafkaTable_json >{ "id":"1" ,"name":"alan","age":"12" } >{ "id":"2" ,"name":"alanchan","age":"22" } >{ "id":"3" ,"name":"alanchanchan","age":"32" } >{ "id":"4" ,"name":"alan_chan","age":"42" } >{ "id":"5" ,"name":"alan_chan_chn","age":"52" } > # flink sql client查询数据 Flink SQL> select * from Alan_KafkaTable_json; +----+-------------+--------------------------------+----------------------+-------------------------+ | op | id | name | age | t_insert_time | +----+-------------+--------------------------------+----------------------+-------------------------+ | +I | 1 | alan | 12 | 2023-11-15 16:03:49.805 | | +I | 2 | alanchan | 22 | 2023-11-15 16:04:02.632 | | +I | 3 | alanchanchan | 32 | 2023-11-15 16:04:08.810 | | +I | 4 | alan_chan | 42 | 2023-11-15 16:04:15.132 | | +I | 5 | alan_chan_chn | 52 | 2023-11-15 16:04:21.146 |
复制
3、table api 建表示例
通过table api建表,参考文章:
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
参考上文中关于CSV Format的table api 建表示例,变化的是json的格式参数。
4、Format 参数
5、数据类型映射关系
当前,JSON schema 将会自动从 table schema 之中自动推导得到。不支持显式地定义 JSON schema。
在 Flink 中,JSON Format 使用 jackson databind API 去解析和生成 JSON。
下表列出了 Flink 中的数据类型与 JSON 中的数据类型的映射关系。
以上,介绍了Flink 支持的数据格式中的csv和json,并分别以sql和table api作为示例进行了说明。