Table API和SQL捆绑在flink-table Maven工件中。必须将以下依赖项添加到你的项目才能使用Table API和SQL:

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>1.5.0</version>
</dependency>

另外,你需要为Flink的Scala批处理或流式API添加依赖项。对于批量查询,您需要添加:

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.5.0</version>
</dependency>

Table API和SQL程序的结构

Flink的批处理和流处理的Table API和SQL程序遵循相同的模式;

所以我们只需要使用一种来演示即可

要想执行flink的SQL语句,首先需要获取SQL的执行环境:

两种方式(batch和streaming):

// ***************
// STREAMING QUERY
// ***************
val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for streaming queries
val sTableEnv = TableEnvironment.getTableEnvironment(sEnv) // ***********
// BATCH QUERY
// ***********
val bEnv = ExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for batch queries
val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)

通过getTableEnvironment可以获取TableEnviromment;这个TableEnviromment是Table API和SQL集成的核心概念。它负责:

- 在内部目录中注册一个表
- 注册外部目录
- 执行SQL查询
- 注册用户定义的(标量,表格或聚合)函数
- 转换DataStream或DataSet成Table
- 持有一个ExecutionEnvironment或一个参考StreamExecutionEnvironment

在内部目录中注册一个表

TableEnvironment维护一个按名称注册的表的目录。有两种类型的表格,输入表格输出表格

输入表可以在Table API和SQL查询中引用并提供输入数据。输出表可用于将表API或SQL查询的结果发送到外部系统

输入表可以从各种来源注册:

- 现有`Table`对象,通常是表API或SQL查询的结果。
- `TableSource`,它访问外部数据,例如文件,数据库或消息传递系统。
- `DataStream`或`DataSet`来自DataStream或DataSet程序。

输出表可以使用注册TableSink

注册一个表

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env) // register the Table projTable as table "projectedX"
tableEnv.registerTable("projectedTable", projTable) // Table is the result of a simple projection query
val projTable: Table = tableEnv.scan("projectedTable ").select(...)

注册一个tableSource

TableSource提供对存储在诸如数据库(MySQL,HBase等),具有特定编码(CSV,Apache [Parquet,Avro,ORC],...)的文件的存储系统中的外部数据的访问或者消息传送系统(Apache Kafka,RabbitMQ,...)

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// create a TableSource
val csvSource: TableSource = new CsvTableSource("/path/to/file", ...)
// register the TableSource as table "CsvTable" tableEnv.registerTableSource("CsvTable", csvSource)

注册一个tableSink

注册TableSink可用于将表API或SQL查询的结果发送到外部存储系统,如数据库,键值存储,消息队列或文件系统(使用不同的编码,例如CSV,Apache [Parquet ,Avro,ORC],...)

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env) // create a TableSink
val csvSink: TableSink = new CsvTableSink("/path/to/file", ...) // define the field names and types
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG) // register the TableSink as table "CsvSinkTable"
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)

例子:

 //创建batch执行环境
val env = ExecutionEnvironment.getExecutionEnvironment
//创建table环境用于batch查询
val tableEnvironment = TableEnvironment.getTableEnvironment(env)
//加载外部数据
val csvTableSource = CsvTableSource.builder()
.path("data1.csv")//文件路径
.field("id" , Types.INT)//第一列数据
.field("name" , Types.STRING)//第二列数据
.field("age" , Types.INT)//第三列数据
.fieldDelimiter(",")//列分隔符,默认是","
.lineDelimiter("\n")//换行符
.ignoreFirstLine()//忽略第一行
.ignoreParseErrors()//忽略解析错误
.build()
//将外部数据构建成表
tableEnvironment.registerTableSource("tableA" , csvTableSource)
//TODO 1:使用table方式查询数据
val table = tableEnvironment.scan("tableA").select("id , name , age").filter("name == 'lisi'")
//将数据写出去
table.writeToSink(new CsvTableSink("bbb" , "," , 1 , FileSystem.WriteMode.OVERWRITE))
//TODO 2:使用sql方式
// val sqlResult = tableEnvironment.sqlQuery("select id,name,age from tableA where id > 0 order by id limit 2")
//// //将数据写出去
// sqlResult.writeToSink(new CsvTableSink("aaaaaa.csv", ",", 1, FileSystem.WriteMode.OVERWRITE))
env.execute()

最新文章

  1. [LeetCode] Shortest Palindrome 最短回文串
  2. Spring(一)
  3. jmeter中线程之间传递参数
  4. SqlBulkCopy大批量数据插入到sql表中
  5. [转]extern,static存储空间矛盾
  6. (原)编译caffe时提示未定义的引用(undefined reference to)
  7. python 云打码 hhtp接口
  8. python3 使用ssl安全连接发送邮件
  9. CentOS修改SSH端口号和禁止root用户直接登录
  10. python练习题-day17
  11. jqeury-地区三级联动
  12. Android_Fragment和Activity之间的通信
  13. 去掉idea中竖线
  14. Python 爬取 猫眼 top100 电影例子
  15. sys 模块
  16. PHP 弹出文件下载 原理 代码
  17. 修改Python IDLE代码配色及语法高亮主题
  18. django视图缓存的实现
  19. uva 110 Meta-Loopless Sorts 用程序写程序 有点复杂的回溯水题
  20. 使用HTML5构建iOS原生APP(2)

热门文章

  1. 034_nginx报错总结
  2. 神经网络rbf
  3. Laravel 怎么在 blade 视图中将带 HTML 字符原样输出
  4. MySQL将查询出来的一组数据拼装成一个字符串
  5. mybatis xml中不能直接用大于号、小于号要用转义字符
  6. 关于STM32 __IO 的变量定义
  7. STM32L476应用开发之四:触摸屏驱动与数据交互
  8. Java的MVC模式简介
  9. Confluence 6 使用电子邮件可见
  10. 探索一个NSObject对象占用多少内存?