import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema; /**
* @Auther: Created By gaoxing
* @Date: 2020/4/7 14:07
* @Description: flink-1.10.0版本中几种创建table的方法,即创建数据源的方法
*/
public class CreateView { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); /**
* 第一种方法
* stream to view
*/
DataStreamSource<String> socketLines = env.socketTextStream("localhost", 8888);
tableEnv.createTemporaryView("t_socket", socketLines, "row_data"); /**
* 第二种方法
* 使用flink的connect连接器
* 该方法没有任何的返回值,直接注册一张临时视图出来
*/
tableEnv.connect(
new Kafka()
.version("universal")
.topic("sql-json-test")
.startFromEarliest()
.property("zookeeper.connect", "")
.property("bootstrap.servers", "")
.property("group.id", "flink")
).withFormat(
new Json()
.failOnMissingField(true)
// 如果不指定schema信息,则会自动推断信息,派生出来schema,这个行为默认是生效的,不再需要显式申明
// .schema(new RowTypeInfo(
// new BasicTypeInfo[]{BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO},
// new String[]{"id", "name"}
// ))
).withSchema(
new Schema()
.field("id", DataTypes.STRING())
.field("name", DataTypes.STRING())
).inAppendMode()
.createTemporaryTable("kafka_test"); /**
* 第三种方法
* 使用create table的sSQL语句进行创建
*/ tableEnv.sqlUpdate(
"CREATE TABLE t_kafka(" +
" id int," +
" name string" +
" ) WITH (" +
" 'connector.type' = 'kafka'," +
" 'connector.version' = 'universal'," +
" 'connector.topic' = 'sql-json-test', " +
" 'connector.startup-mode' = 'earliest-offset'," +
" 'connector.properties.zookeeper.connect' = ''," +
" 'connector.properties.bootstrap.servers' = ''," +
" 'update-mode' = 'append'," +
" 'format.type' = 'json'," +
" 'format.derive-schema' = 'true'," +
" 'format.fail-on-missing-field' = 'true'"
); env.execute("CreateTable"); }
}

最新文章

  1. php常用数组函数回顾一
  2. Oracle表空间,用户,用户授权
  3. dissmiss a UISearchBar with an SearchBarController
  4. [iOS]iPhone推送原理
  5. PCB阻抗调节
  6. 《Linux内核修炼之道》 系列
  7. SICP 习题 (2.6) 解题总结:丘奇计数
  8. Learning Ionic中文版本
  9. 安装oh-my-zsh
  10. 版本控制—使用Gradle自动管理应用程序版本
  11. vue-13-swiper组件的使用
  12. python装饰器1:函数装饰器详解
  13. SpringBoot(四)thymeleaf+MyBatis+MySql
  14. _ai_creature
  15. bash-4.2$ bash: /home/test/.bashrc: 权限不够
  16. DES的加密与解密算法(Python实现)
  17. 使用Puppeteer进行数据抓取(四)——图片下载
  18. 一个好玩的计算题目(c++ 位运算)
  19. NOIP2013 D1 T2火柴排队
  20. Codeforces Round #479 (Div. 3)

热门文章

  1. 使用Gateway配置路由以及动态路由
  2. 在npm发布自己造的轮子
  3. PHP array_pop() 函数
  4. PDOStatement::execute
  5. C/C++编程笔记:C++入门知识丨认识C++面向过程编程的特点
  6. bzoj 2839 集合计数 容斥\广义容斥
  7. C++类、函数、指针
  8. 解决Exception in thread &quot;AWT-EventQueue-0&quot; java.lang.UnsatisfiedLinkError: no jogl in java.library.path问题
  9. C语言学习笔记之一个程序弄清&amp;&amp;、||、i++、++i
  10. SonarQube 自定义规则开发