一.Mapper和Scheme

scheme:将kafka传到spout里的数据格式进行转化. record->tuple

mapper:将storm传到kafka的数据格式进行转化.tuple->record

二.为什么要自定义消息格式

在很多需求里, 从kafka传递过来的数据并不是单纯的string, 可以是任意对象.当我们需要根据对象的某个属性进行分组时, 默认的new Fields("bytes")就不太适合.但是消息传递的形式还是string.我们可以在传入kafka之前使用fastJson的转化方法将实体对象转化成jsonString.

到了scheme在转换成实体类对象.

三.怎么更改scheme

构建kafkaSpout时我们要配置很多参数, 可以看一下kafkaConfig代码.

public final BrokerHosts hosts; //用以获取Kafka broker和partition的信息
public final String topic;//从哪个topic读取消息
public final String clientId; // SimpleConsumer所用的client id
public int fetchSizeBytes = 1024 * 1024; //发给Kafka的每个FetchRequest中,用此指定想要的response中总的消息的大小
public int socketTimeoutMs = 10000;//与Kafka broker的连接的socket超时时间
public int fetchMaxWait = 10000; //当服务器没有新消息时,消费者会等待这些时间
public int bufferSizeBytes = 1024 * 1024;//SimpleConsumer所使用的SocketChannel的读缓冲区大小
public MultiScheme scheme = new RawMultiScheme();//从Kafka中取出的byte[],该如何反序列化
public boolean forceFromStart = false;//是否强制从Kafka中offset最小的开始读起
public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();//从何时的offset时间开始读,默认为最旧的offset
public long maxOffsetBehind = Long.MAX_VALUE;//KafkaSpout读取的进度与目标进度相差多少,相差太多,Spout会丢弃中间的消息
public boolean useStartOffsetTimeIfOffsetOutOfRange = true;//如果所请求的offset对应的消息在Kafka中不存在,是否使startOffsetTime

可以看到, 所有的配置项都是public, 所以当我们实例化一个spoutConfig之后, 可以通过直接引用的方式进行更改属性值.

我们可以看构建kafkaspout的代码:

ZkHosts zkHosts = new ZkHosts(zkHost);
// zk对地址有唯一性标识
String zkRoot = "/" + topic;
String id = UUID.randomUUID().toString();
// 构建spoutConfig
SpoutConfig spoutConf = new SpoutConfig(zkHosts, topic, zkRoot, id);
spoutConf.scheme = new SchemeAsMultiScheme(new SensorDataScheme());
spoutConf.startOffsetTime = OffsetRequest.LatestTime();
KafkaSpout kafkaSpout = new KafkaSpout(spoutConf);

四.怎么自定义scheme

我们有这样一个需求,有一个实体类如下:

public class SensorData implements Serializable {
// 设备Id;
private String deviceId;
// 型号id
private String dmPropertiesId;
// 通道名称;
private String channelName;
// 采集的温度值
private double deviceTemp;
// 采集的时间;
private Date date;
}

数据进来kafka到storm消费时, 根据deviceId进行分组.当然, 我们在写入的时候对数据json化, 使用fastjson把实体对象变成字符串, 而不是直接传实体类对象进入kafka(亲测会报错, 无法进行转换).最终数据会在scheme的declare的方法里处理.

Scheme接口:

public interface Scheme extends Serializable {
List<Object> deserialize(ByteBuffer ser);
public Fields getOutputFields();
}

可以看到有两个需要实现的方法, 一个是传过来的byte数据进行转化, 一个是传入下一层bolt的时候以什么字段分组. 跟踪kafka的源码我们可以看到, 他的declare方法最终会调用scheme的方法来确认字段名.

看一下scheme的整体代码:

package dm.scheme;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List; import org.apache.storm.kafka.StringScheme;
import org.apache.storm.spout.Scheme;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils; import com.alibaba.fastjson.JSON; import dm.entity.SensorData;
/**
*
* KafkaRecord 映射 tuple 转化类;
*
* @author chenwen
*
*/
public class SensorDataScheme implements Scheme {
/**
*
*/
private static final long serialVersionUID = 1L;
private static final Charset UTF8_CHARSET = StandardCharsets.UTF_8; /**
*
* 反序列化
*/
@Override
public List<Object> deserialize(ByteBuffer byteBuffer) {
// 将kafka消息转化成jsonString
String sensorDataJson = StringScheme.deserializeString(byteBuffer);
SensorData sensorData = JSON.parseObject(sensorDataJson, SensorData.class);
String id = sensorData.getDeviceId();
return new Values(id, sensorData);
}
public static String deserializeString(ByteBuffer byteBuffer) {
if (byteBuffer.hasArray()) {
int base = byteBuffer.arrayOffset();
return new String(byteBuffer.array(), base + byteBuffer.position(), byteBuffer.remaining());
} else {
return new String(Utils.toByteArray(byteBuffer), UTF8_CHARSET);
}
}
@Override
public Fields getOutputFields() {
return new Fields("deviceId", "sensorData"); // 返回字段及其名称;
}
}

最新文章

  1. Nodejs&#183;网络服务
  2. android target unknown and state offline解决办法
  3. Orchard源码分析(4):Orchard.Environment.OrchardStarter类
  4. Python模块 (xlsxwriter)
  5. ORM之Dapper操作Sql Server和MySql数据库
  6. JAVA网站高并发解决方案
  7. 实现类似 QQ音乐网页版 的单页面总结
  8. 敏捷开发的特点(转自MBAlib)
  9. linux教程之四
  10. POJ 2186 Popular Cows (强联通)
  11. iOS获取程序运行平台
  12. BitCoin p2p通信过程
  13. asp.net core 六 Oracle ORM
  14. 如何合理封装你的轮子、飞机、大炮(以封装OkHttp为例)
  15. TeamViewer试用期满转免费版本方法
  16. Zabbix监控系统部署:前端初始化
  17. Linux 搭建安装Maven环境
  18. Python Oracle连接与操作封装
  19. getElementsByTagName
  20. CentOS 6.5 伪分布安装

热门文章

  1. windows平台vs2010编译64位libiconv与libxml2
  2. JVM调优(二)经验参数设置
  3. [Java]Socket和ServerSocket学习笔记
  4. ORACLE获取SQL绑定变量值的方法总结
  5. Troubleshooting SQL Server RESOURCE_SEMAPHORE Waittype Memory Issues
  6. Linux修改挂载目录名称
  7. 前后端分离djangorestframework——序列化与反序列化数据
  8. Failed to decrypt protected XML node &quot;DTS:Password&quot; with error 0x8009000B &quot;Key not valid for use in specified state.&quot;. You may not be authorized to access this information. This error occurs when t
  9. Linux CFS调度器之队列操作--Linux进程的管理与调度(二十七)
  10. jdk8新特性表达式1