Oracle GoldenGate to Confluent with Kafka Connect
Confluent is a company founded by the team that built Apache Kafka. It builds a platform around Kafka that enables companies to easily access data as real-time streams.
Confluent offers three different ways to get started with Kafka.
- Confluent Open Source
- Confluent Enterprise
- Confluent Cloud
While we in this series of Kafka Tutorial discuss much about Confluent Open Source, you may check the other two ways based on your requirement and interest.
While comparing Confluent Open Source with Apache Kafka, we get the following capabilities or tools in addition to standard Apache Kafka :
- Additional Clients: Supports C, C++, Python, .NET and several other non-Java Clients.
- REST Proxy – Provides universal access to Kafka from any network connected device via HTTP
- Schema Registry – Central registry for the format of Kafka data – guarantees all data is always consumable
- Pre-Built Connectors – HDFS, JDBC, Elasticsearch, Amazon S3 and other connectors fully certified and supported by Confluent
Confluent installation:
1.Unzip Confluent
2.curl -L https://cnfl.io/cli | sh -s -- -b /u01/confluent-5.3.1/bin
3.set java_home: /etc/alternatives/jre_1.8.0 Oracle Linux 7
4.Start server
[oracle@instance-20191202-1420 ~]$ $CONFLUENT_HOME/bin/confluent local start
The local commands are intended for a single-node development environment
only, NOT for production usage. https://docs.confluent.io/current/cli/index.html
Using CONFLUENT_CURRENT: /tmp/confluent.Vn0uJJY4
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
Starting control-center
control-center is [UP]
5.OGG for conluent kafka connect
Confluent version:5.3.1
OGG4BD:Version 19.1.0.0.2 OGGCORE_OGGADP.19.1.0.0.2_PLATFORMS_190916.0039
Error msg:
org.springframework.beans.factory.BeanCreationException: Error creating bean wit
h name 'userExitDataSource' defined in class path resource [oracle/goldengate/da
tasource/DataSource-context.xml]: Bean instantiation via factory method failed;
nested exception is org.springframework.beans.BeanInstantiationException: Failed
to instantiate [oracle.goldengate.datasource.GGDataSource]: Factory method 'get
DataSource' threw exception; nested exception is org.apache.kafka.common.config.
ConfigException: Missing required configuration "converter.type" which has no de
fault value.
Workaround:
0. Add below 3 lines into kafkaconnect.properties
converter.type=key
converter.type=value
converter.type=header
https://support.oracle.com/epmos/faces/DocumentDisplay?_afrLoop=184647734022308&parent=EXTERNAL_SEARCH&sourceId=PROBLEM&id=2455697.1&_afrWindowMode=0&_adf.ctrl-state=ifht4s4f7_4
What we have right now:
[oracle@instance-20191202-1420 dirprm]$ cat kafkaconnect.properties
bootstrap.servers=localhost:9092
acks=1
#JSON Converter Settings
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
#Avro Converter Settings
#key.converter=io.confluent.connect.avro.AvroConverter
#value.converter=io.confluent.connect.avro.AvroConverter
#key.converter.schema.registry.url=http://localhost:8081
#value.converter.schema.registry.url=http://localhost:8081
converter.type=key
converter.type=value
converter.type=header
#Adjust for performance
buffer.memory=33554432
batch.size=16384
linger.ms=0
1. Enable listner(Need to confirm if it is a must):
<Confluent_home>/etc/kafka/server.properties
Update:listeners=PLAINTEXT://localhost:9092
Restart the kafka-server
2. Review the kc.props
[oracle@instance-20191202-1420 dirprm]$ cat kc.props
gg.handlerlist=kafkaconnect
#The handler properties
gg.handler.kafkaconnect.type=kafkaconnect
gg.handler.kafkaconnect.kafkaProducerConfigFile=kafkaconnect.properties
gg.handler.kafkaconnect.mode=op
#The following selects the topic name based on the fully qualified table name
gg.handler.kafkaconnect.topicMappingTemplate=ogg_topic
#The following selects the message key using the concatenated primary keys
gg.handler.kafkaconnect.keyMappingTemplate=${primaryKeys}
gg.handler.kafkahandler.MetaHeaderTemplate=${alltokens}
#The formatter properties
gg.handler.kafkaconnect.messageFormatting=row
gg.handler.kafkaconnect.insertOpKey=I
gg.handler.kafkaconnect.updateOpKey=U
gg.handler.kafkaconnect.deleteOpKey=D
gg.handler.kafkaconnect.truncateOpKey=T
gg.handler.kafkaconnect.treatAllColumnsAsStrings=false
gg.handler.kafkaconnect.iso8601Format=false
gg.handler.kafkaconnect.pkUpdateHandling=abend
gg.handler.kafkaconnect.includeTableName=true
gg.handler.kafkaconnect.includeOpType=true
gg.handler.kafkaconnect.includeOpTimestamp=true
gg.handler.kafkaconnect.includeCurrentTimestamp=true
gg.handler.kafkaconnect.includePosition=true
gg.handler.kafkaconnect.includePrimaryKeys=false
gg.handler.kafkaconnect.includeTokens=false
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec
#Apache Kafka Classpath
#gg.classpath={Kafka install dir}/libs
#gg.classpath=/u01/confluent-5.3.1/share/java/schema-registry
#Confluent IO classpath
#gg.classpath={Confluent install dir}/share/java/kafka-serde-tools/*:{Confluent install dir}/share/java/kafka/*:{Confluent install dir}/share/java/confluent-common/*
gg.classpath=/u01/confluent-5.3.1/share/java/kafka-serde-tools/*:/u01/confluent-5.3.1/share/java/kafka/*:/u01/confluent-5.3.1/share/java/confluent-common/*
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=.:ggjava/ggjava.jar:./dirprm
3. Test
GGSCI (instance-20191202-1420) 1> stats kc
Sending STATS request to REPLICAT KC ...
Start of Statistics at 2019-12-02 09:54:55.
Replicating from QASOURCE.TCUSTMER to QASOURCE.TCUSTMER:
*** Total statistics since 2019-12-02 09:44:32 ***
Total inserts 5.00
Total updates 1.00
Total deletes 0.00
Total upserts 0.00
Total discards 0.00
Total operations 6.00
最新文章
- VS Code 配置Python
- CentOS6.5安装Eclipse
- OC基础--Property
- Python 中的数据结构总结(一)
- relative 和 absolute
- 一、HTML和CSS基础--开发工具--Sublime前端开发工具技巧介绍
- PostgreSQL Replication之第十一章 使用Skytools(2)
- php中json_decode返回数组或对象的实例
- String.IsNullOrWhiteSpace和String.IsNullOrEmpty的区别
- [转]C++宏定义详解
- Eddy&#39;s爱好 hdu2204
- mybatis-plus的代码生成器
- ___树形菜单Ztree.js显示.
- Knockout示例:User数据CRUD
- C# 哈希表HashTable的简单使用
- 解决启动vs2010 未能找到自动配置的设置文件
- mac nginx compile
- vue.js 2.0 官方文档学习笔记 —— 01. vue 介绍
- 精心挑选的HTML5/CSS3应用及源码
- 【Linux_Unix系统编程】chapter6 进程