1. Create a MySQL table like the following:

CREATE TABLE `test` (
  `a` tinyint(4) NOT NULL DEFAULT '0',
  `b` decimal(12,0) DEFAULT NULL,
  `c` decimal(5,0) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
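
(As background for the errors below: with its default precise decimal handling, Debezium typically maps TINYINT to Connect's INT16, which surfaces as java.lang.Short, and DECIMAL(12,0)/DECIMAL(5,0) to Connect's Decimal logical type, which surfaces as java.math.BigDecimal over BYTES. A rough sketch of the resulting value schema; the struct name is illustrative only.)

import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class TestTableConnectSchema {
    public static void main(String[] args) {
        // TINYINT -> INT16 (java.lang.Short); DECIMAL(12,0) / DECIMAL(5,0) ->
        // Decimal logical type with scale 0 (BYTES + java.math.BigDecimal).
        Schema value = SchemaBuilder.struct().name("test.Value") // illustrative name
                .field("a", Schema.INT16_SCHEMA)
                .field("b", Decimal.builder(0).optional().build())
                .field("c", Decimal.builder(0).optional().build())
                .build();
        System.out.println(value.fields());
    }
}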

2. Start Kafka Connect with the Debezium MySQL connector plugin:

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "root",
    "database.server.id": "223344",
    "database.server.name": "localhost",
    "database.whitelist": "inventory",
    "table.whitelist": "inventory.test",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "schema-changes.inventory",
    "include.schema.changes": "false",
    "transforms": "extractField",
    "transforms.extractField.type": "com.centchain.kafka.connect.mysql.DebeziumMysql$Value",
    "transforms.extractField.field": "after"
  }
}
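
(The transform class com.centchain.kafka.connect.mysql.DebeziumMysql$Value above is a custom SMT whose source is not shown in this post. Assuming it only unwraps the configured field from the Debezium envelope, in the spirit of Kafka's built-in org.apache.kafka.connect.transforms.ExtractField$Value, a rough sketch could look like the following; this is a guess at equivalent behavior, not the actual class.)

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;

// Hypothetical stand-in for DebeziumMysql$Value: pulls one field (here "after")
// out of the Debezium change-event envelope and makes it the record value.
public class ExtractAfterValue<R extends ConnectRecord<R>> implements Transformation<R> {

    private String fieldName;

    @Override
    public void configure(Map<String, ?> configs) {
        Object field = configs.get("field");
        fieldName = field == null ? "after" : field.toString();
    }

    @Override
    public R apply(R record) {
        if (record.value() == null) {
            return record; // tombstones pass through untouched
        }
        Struct envelope = (Struct) record.value();
        Field field = envelope.schema().field(fieldName);
        return record.newRecord(record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(),
                field.schema(), envelope.get(fieldName),
                record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef().define("field", ConfigDef.Type.STRING, "after",
                ConfigDef.Importance.HIGH, "Envelope field to extract");
    }

    @Override
    public void close() {
    }
}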

3. The following errors appear:

[-- ::,] INFO WorkerSourceTask{id=cashier--} flushing  outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:)
[-- ::,] ERROR WorkerSourceTask{id=cashier--} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:)
at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:)
at java.util.concurrent.FutureTask.run(FutureTask.java:)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:)
at java.lang.Thread.run(Thread.java:)
Caused by: org.apache.avro.AvroRuntimeException: Unknown datum class: class java.lang.Short
at org.apache.avro.util.internal.JacksonUtils.toJson(JacksonUtils.java:)
at org.apache.avro.util.internal.JacksonUtils.toJsonNode(JacksonUtils.java:)
at org.apache.avro.Schema$Field.<init>(Schema.java:)
at org.apache.avro.Schema$Field.<init>(Schema.java:)
at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:)
at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:)
at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:)
...
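
The root cause is in the last frames: AvroData hands the Connect field default (a java.lang.Short for the tinyint column) to Avro's Schema.Field constructor, and JacksonUtils.toJson in Avro 1.8.1 has no branch for Short. A minimal repro of that Avro-level failure:

import org.apache.avro.Schema;

public class ReproShortDefault {
    public static void main(String[] args) {
        // Schema.Field converts the default via JacksonUtils.toJsonNode, which
        // (before the patch below) rejects java.lang.Short.
        Schema intSchema = Schema.create(Schema.Type.INT);
        new Schema.Field("a", intSchema, null, Short.valueOf((short) 0));
        // -> org.apache.avro.AvroRuntimeException: Unknown datum class: class java.lang.Short
    }
}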

4. fix

file location: avro-release-1.8.1/lang/java/avro/src/main/java/org/apache/avro/util/internal/JacksonUtils.java

 /**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.avro.util.internal;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.JsonProperties;
import org.apache.avro.Schema;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.util.TokenBuffer;

public class JacksonUtils {
static final String BYTES_CHARSET = "ISO-8859-1";

private JacksonUtils() {
}

public static JsonNode toJsonNode(Object datum) {
if (datum == null) {
return null;
}
try {
TokenBuffer generator = new TokenBuffer(new ObjectMapper());
toJson(datum, generator);
return new ObjectMapper().readTree(generator.asParser());
} catch (IOException e) {
throw new AvroRuntimeException(e);
}
}

@SuppressWarnings(value="unchecked")
static void toJson(Object datum, JsonGenerator generator) throws IOException {
if (datum == JsonProperties.NULL_VALUE) { // null
generator.writeNull();
} else if (datum instanceof Map) { // record, map
generator.writeStartObject();
for (Map.Entry<Object,Object> entry : ((Map<Object,Object>) datum).entrySet()) {
generator.writeFieldName(entry.getKey().toString());
toJson(entry.getValue(), generator);
}
generator.writeEndObject();
} else if (datum instanceof Collection) { // array
generator.writeStartArray();
for (Object element : (Collection<?>) datum) {
toJson(element, generator);
}
generator.writeEndArray();
} else if (datum instanceof byte[]) { // bytes, fixed
generator.writeString(new String((byte[]) datum, BYTES_CHARSET));
} else if (datum instanceof CharSequence || datum instanceof Enum<?>) { // string, enum
generator.writeString(datum.toString());
} else if (datum instanceof Double) { // double
generator.writeNumber((Double) datum);
} else if (datum instanceof Float) { // float
generator.writeNumber((Float) datum);
} else if (datum instanceof Long) { // long
generator.writeNumber((Long) datum);
} else if (datum instanceof Integer) { // int
generator.writeNumber((Integer) datum);
} else if (datum instanceof Short) { // short (added: write the default as an int)
generator.writeNumber(((Short) datum).intValue());
} else if (datum instanceof Boolean) { // boolean
generator.writeBoolean((Boolean) datum);
} else if (datum instanceof BigDecimal) { // decimal (added: write as a JSON number)
generator.writeNumber((BigDecimal) datum);
} else {
throw new AvroRuntimeException("Unknown datum class: " + datum.getClass());
}
}

public static Object toObject(JsonNode jsonNode) {
return toObject(jsonNode, null);
}

public static Object toObject(JsonNode jsonNode, Schema schema) {
if (schema != null && schema.getType().equals(Schema.Type.UNION)) {
return toObject(jsonNode, schema.getTypes().get(0));
}
if (jsonNode == null) {
return null;
} else if (jsonNode.isNull()) {
return JsonProperties.NULL_VALUE;
} else if (jsonNode.isBoolean()) {
return jsonNode.asBoolean();
} else if (jsonNode.isInt()) {
if (schema == null || schema.getType().equals(Schema.Type.INT)) {
return jsonNode.asInt();
} else if (schema.getType().equals(Schema.Type.LONG)) {
return jsonNode.asLong();
}
} else if (jsonNode.isBigDecimal()) { // added: handle big-decimal JSON values
return jsonNode.asDouble();
} else if (jsonNode.isLong()) {
return jsonNode.asLong();
} else if (jsonNode.isDouble()) {
if (schema == null || schema.getType().equals(Schema.Type.DOUBLE)) {
return jsonNode.asDouble();
} else if (schema.getType().equals(Schema.Type.FLOAT)) {
return (float) jsonNode.asDouble();
}
} else if (jsonNode.isTextual()) {
if (schema == null || schema.getType().equals(Schema.Type.STRING) ||
schema.getType().equals(Schema.Type.ENUM)) {
return jsonNode.asText();
} else if (schema.getType().equals(Schema.Type.BYTES)) {
try {
return jsonNode.getTextValue().getBytes(BYTES_CHARSET);
} catch (UnsupportedEncodingException e) {
throw new AvroRuntimeException(e);
}
}
} else if (jsonNode.isArray()) {
List l = new ArrayList();
for (JsonNode node : jsonNode) {
l.add(toObject(node, schema == null ? null : schema.getElementType()));
}
return l;
} else if (jsonNode.isObject()) {
Map m = new LinkedHashMap();
for (Iterator<String> it = jsonNode.getFieldNames(); it.hasNext(); ) {
String key = it.next();
Schema s = null;
if (schema == null) {
s = null;
} else if (schema.getType().equals(Schema.Type.MAP)) {
s = schema.getValueType();
} else if (schema.getType().equals(Schema.Type.RECORD)) {
s = schema.getField(key).schema();
}
Object value = toObject(jsonNode.get(key), s);
m.put(key, value);
}
return m;
}
return null;
}
}

The key changes are:

the "datum instanceof Short" branch in toJson, which fixes the error for Short;

the "datum instanceof BigDecimal" branch in toJson and the "jsonNode.isBigDecimal()" branch in toObject, which fix the error for BigDecimal.
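
A minimal sanity check against the rebuilt Avro jar (JacksonUtils and toJsonNode are public; the class name here is only for illustration):

import java.math.BigDecimal;

import org.apache.avro.util.internal.JacksonUtils;

public class JacksonUtilsFixCheck {
    public static void main(String[] args) {
        // With the added branches, Short and BigDecimal defaults serialize to
        // JSON numbers instead of throwing "Unknown datum class".
        System.out.println(JacksonUtils.toJsonNode((short) 1));            // 1
        System.out.println(JacksonUtils.toJsonNode(new BigDecimal("12"))); // 12
    }
}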

5. result:

5.1  mysql -> kafka

lenmom@M1701:~/workspace/software/confluent-community-5.1.0-2.11$ bin/kafka-avro-console-consumer --bootstrap-server 127.0.0.1:9092 --from-beginning --topic localhost.a.test
{"a":,"b":{"bytes":"\u0001"},"c":{"bytes":"\u0001"},"operation_type":"c","pt_log_d":"","last_update_timestamp":}
{"a":,"b":{"bytes":"\u0001"},"c":{"bytes":"\u0002"},"operation_type":"c","pt_log_d":"","last_update_timestamp":}
{"a":,"b":{"bytes":"\u0001"},"c":{"bytes":"\u0003"},"operation_type":"c","pt_log_d":"","last_update_timestamp":}
{"a":,"b":{"bytes":"\u0001"},"c":{"bytes":"\u0004"},"operation_type":"c","pt_log_d":"","last_update_timestamp":}

5.2 kafka -> hive

Config for the HDFS sink connector:

{
  "name": "hive-sink",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "",
    "topics": "localhost.a.test",
    "hdfs.url": "hdfs://127.0.0.1:9000/",
    "logs.dir": "/logs",
    "topics.dir": "/inventory/",
    "hadoop.conf.dir": "/home/lenmom/workspace/software/hadoop-2.7.3/etc/hadoop/",
    "flush.size": "",
    "rotate.interval.ms": "",
    "hive.integration": true,
    "hive.database": "inventory",
    "partitioner.class": "io.confluent.connect.hdfs.partitioner.FieldPartitioner",
    "partition.field.name": "pt_log_d",
    "hive.metastore.uris": "thrift://127.0.0.1:9083",
    "schema.compatibility": "BACKWARD"
  }
}
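
For completeness, the JSON above can be submitted through the Kafka Connect REST API. A minimal sketch, assuming the default Connect listener port 8083 and a local file named hive-sink.json (both assumptions):

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;

public class RegisterConnector {
    public static void main(String[] args) throws Exception {
        // POST the connector config to the Connect worker's REST endpoint.
        byte[] body = Files.readAllBytes(Paths.get("hive-sink.json"));
        HttpURLConnection conn =
                (HttpURLConnection) new URL("http://localhost:8083/connectors").openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(body);
        }
        System.out.println("HTTP " + conn.getResponseCode());
    }
}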

result:

hive> select * from localhost_a_test;
OK
c -- ::17.029
c -- ::17.029
c -- ::17.029
c -- ::17.029
Time taken: 0.168 seconds, Fetched: row(s)

6. For the following table schema:

CREATE TABLE `decimalTest` (
  `POINTSDAY` decimal(12,0) NOT NULL DEFAULT '0',
  `POINTSMONTH` decimal(12,0) NOT NULL DEFAULT '0',
  `CASHDAY` decimal(12,0) NOT NULL DEFAULT '0',
  `CASHMONTH` decimal(12,0) NOT NULL DEFAULT '0'
);

insert into decimalTest values(1,2,3,4);

if we use the HDFS connector to sink this table to Hive, we get an error like:

[-- ::,] ERROR WorkerSinkTask{id=hive-sink-} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:)
at java.util.concurrent.FutureTask.run(FutureTask.java:)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:)
at java.lang.Thread.run(Thread.java:)
Caused by: org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:)
at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$(WorkerSinkTask.java:)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:)
... more
Caused by: org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "null", schema type: BYTES
at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:)
at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:)
at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:)
... more
[-- ::,] ERROR WorkerSinkTask{id=hive-sink-} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:)

This is caused by the Kafka Avro converter failing to convert the decimal columns' default values when it rebuilds the Connect schema.
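
For context: a DECIMAL(12,0) NOT NULL DEFAULT '0' column becomes a Connect Decimal logical type (over BYTES) whose default is the base64-encoded unscaled value of 0, i.e. the "AA==" visible in the warnings further below. A small sketch using Connect's Decimal helper:

import java.math.BigDecimal;
import java.util.Base64;

import org.apache.kafka.connect.data.Decimal;

public class DecimalDefaultDemo {
    public static void main(String[] args) {
        // The unscaled two's-complement bytes of 0 are a single 0x00 byte,
        // which base64-encodes to "AA==" -- the connect.default value seen in
        // the registered Avro schema.
        byte[] raw = Decimal.fromLogical(Decimal.schema(0), BigDecimal.ZERO);
        System.out.println(Base64.getEncoder().encodeToString(raw)); // AA==
        System.out.println(Decimal.toLogical(Decimal.schema(0), raw)); // 0
    }
}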

To fix this error, I added the following logic:

6.1  /schema-registry-5.1.0/avro-converter/src/main/java/io/confluent/connect/avro/AvroData.java

 private Object defaultValueFromAvro(Schema schema,
org.apache.avro.Schema avroSchema,
Object value,
ToConnectContext toConnectContext) {
// The type will be JsonNode if this default was pulled from a Connect default field, or an
// Object if it's the actual Avro-specified default. If it's a regular Java object, we can
// use our existing conversion tools.
if (!(value instanceof JsonNode)) {
return toConnectData(schema, value, toConnectContext);
}

JsonNode jsonValue = (JsonNode) value;
switch (avroSchema.getType()) {
case INT:
if (schema.type() == Schema.Type.INT8) {
return (byte) jsonValue.getIntValue();
} else if (schema.type() == Schema.Type.INT16) {
return (short) jsonValue.getIntValue();
} else if (schema.type() == Schema.Type.INT32) {
return jsonValue.getIntValue();
} else {
break;
}
case LONG:
return jsonValue.getLongValue();
case FLOAT:
return (float) jsonValue.getDoubleValue();
case DOUBLE:
return jsonValue.getDoubleValue();
case BOOLEAN:
return jsonValue.asBoolean();
case NULL:
return null;
case STRING:
case ENUM:
return jsonValue.asText();
case BYTES: // the fix: return the decimal default via getDecimalValue()
return jsonValue.getDecimalValue();
case FIXED:
try {
return jsonValue.getBinaryValue();
} catch (IOException e) {
throw new DataException("Invalid binary data in default value");
}
// return convertIntegerToBytes(jsonValue.getIntValue());
// return jsonValue.getIntValue();
case ARRAY: {
if (!jsonValue.isArray()) {
throw new DataException("Invalid JSON for array default value: " + jsonValue.toString());
}
List<Object> result = new ArrayList<>(jsonValue.size());
for (JsonNode elem : jsonValue) {
result.add(
defaultValueFromAvro(schema, avroSchema.getElementType(), elem, toConnectContext));
}
return result;
}
case MAP: {
if (!jsonValue.isObject()) {
throw new DataException("Invalid JSON for map default value: " + jsonValue.toString());
}
Map<String, Object> result = new HashMap<>(jsonValue.size());
Iterator<Map.Entry<String, JsonNode>> fieldIt = jsonValue.getFields();
while (fieldIt.hasNext()) {
Map.Entry<String, JsonNode> field = fieldIt.next();
Object converted = defaultValueFromAvro(
schema, avroSchema.getElementType(), field.getValue(), toConnectContext);
result.put(field.getKey(), converted);
}
return result;
}
case RECORD: {
if (!jsonValue.isObject()) {
throw new DataException("Invalid JSON for record default value: " + jsonValue.toString());
}

Struct result = new Struct(schema);
for (org.apache.avro.Schema.Field avroField : avroSchema.getFields()) {
Field field = schema.field(avroField.name());
JsonNode fieldJson = ((JsonNode) value).get(field.name());
Object converted = defaultValueFromAvro(
field.schema(), avroField.schema(), fieldJson, toConnectContext);
result.put(avroField.name(), converted);
}
return result;
}
case UNION: {
// Defaults must match first type
org.apache.avro.Schema memberAvroSchema = avroSchema.getTypes().get(0);
if (memberAvroSchema.getType() == org.apache.avro.Schema.Type.NULL) {
return null;
} else {
return defaultValueFromAvro(
schema.field(unionMemberFieldName(memberAvroSchema)).schema(),
memberAvroSchema,
value,
toConnectContext);
}
}
default: {
return null;
}
}
return null;
}

After the fix, rebuild the jar and replace kafka-connect-avro-converter-5.1.0.jar in the Confluent Kafka installation directory.

The data can then be sunk to Hive:

hive> select * from decimalTest limit ;
[WARNING] Avro: Invalid default for field POINTSDAY: not a {"type":"bytes","scale":,"precision":,"connect.version":,"connect.parameters":{"scale":"","connect.decimal.precision":""},"connect.default":"AA==","connect.name":"org.apache.kafka.connect.data.Decimal","logicalType":"decimal"}
[WARNING] Avro: Invalid default for field POINTSMONTH: not a {"type":"bytes","scale":,"precision":,"connect.version":,"connect.parameters":{"scale":"","connect.decimal.precision":""},"connect.default":"AA==","connect.name":"org.apache.kafka.connect.data.Decimal","logicalType":"decimal"}
[WARNING] Avro: Invalid default for field CASHDAY: not a {"type":"bytes","scale":,"precision":,"connect.version":,"connect.parameters":{"scale":"","connect.decimal.precision":""},"connect.default":"AA==","connect.name":"org.apache.kafka.connect.data.Decimal","logicalType":"decimal"}
[WARNING] Avro: Invalid default for field CASHMONTH: not a {"type":"bytes","scale":,"precision":,"connect.version":,"connect.parameters":{"scale":"","connect.decimal.precision":""},"connect.default":"AA==","connect.name":"org.apache.kafka.connect.data.Decimal","logicalType":"decimal"}

6.2 As we can see above, there are still warnings when querying the data in Hive. To eliminate them, modify:

/avro-release-1.8.1/lang/java/avro/src/main/java/org/apache/avro/Schema.java

 private static boolean isValidDefault(Schema schema, JsonNode defaultValue) {
if (defaultValue == null)
return false;
switch (schema.getType()) {
case BYTES: // the fix: a decimal logical type takes a numeric default
if (schema.logicalType != null
&& (schema.logicalType.getName().equals("decimal")
|| schema.logicalType.getName().equalsIgnoreCase("bigdecimal"))) {
return defaultValue.isBigDecimal();
} else {
return defaultValue.isTextual();
}
case STRING:
case ENUM:
case FIXED:
return defaultValue.isTextual();
case INT:
case LONG:
case FLOAT:
case DOUBLE:
return defaultValue.isNumber();
case BOOLEAN:
return defaultValue.isBoolean();
case NULL:
return defaultValue.isNull();
case ARRAY:
if (!defaultValue.isArray())
return false;
for (JsonNode element : defaultValue)
if (!isValidDefault(schema.getElementType(), element))
return false;
return true;
case MAP:
if (!defaultValue.isObject())
return false;
for (JsonNode value : defaultValue)
if (!isValidDefault(schema.getValueType(), value))
return false;
return true;
case UNION: // union default: first branch
return isValidDefault(schema.getTypes().get(0), defaultValue);
case RECORD:
if (!defaultValue.isObject())
return false;
for (Field field : schema.getFields())
if (!isValidDefault(field.schema(),
defaultValue.has(field.name())
? defaultValue.get(field.name())
: field.defaultValue()))
return false;
return true;
default:
return false;
}
}
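
The patched branch keys off the schema's logical type. For reference, this is how a decimal logical type is attached to a bytes schema in Avro 1.8 (illustrative only; schema.logicalType is the field that getLogicalType() exposes):

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class DecimalLogicalTypeDemo {
    public static void main(String[] args) {
        // The Hive warning prints a bytes schema carrying logicalType "decimal";
        // that name is what the patched isValidDefault checks for.
        Schema bytes = SchemaBuilder.builder().bytesType();
        Schema decimal = LogicalTypes.decimal(12, 0).addToSchema(bytes);
        System.out.println(decimal.getLogicalType().getName()); // decimal
    }
}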

After the fix, rebuild the Avro jar and replace it in the $HIVE_HOME and $CONFLUENT_KAFKA_HOME installation directories.
