版本:

scala:2.11.8
spark:2.11
hbase:1.2.0-cdh5.14.0

报错信息:

java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions

分析原因:

从指定的主题或者分区获取数据,在poll之前,你没有订阅任何主题或分区是不行的,每一次poll,消费者都会尝试使用最后一次消费的offset作为接下来获取数据的start offset,最后一次消费的offset也可以通过seek(TopicPartition, long)设置或者自动设置
通过源码可以找到:
public ConsumerRecords<K, V> poll(long timeout) {
acquire();
try {
if (timeout < 0)
throw new IllegalArgumentException("Timeout must not be negative");
// 如果没有任何订阅,抛出异常
if (this.subscriptions.hasNoSubscriptionOrUserAssignment())
throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions"); // 一直poll新数据直到超时
long start = time.milliseconds();
// 距离超时还剩余多少时间
long remaining = timeout;
do {
// 获取数据,如果自动提交,则进行偏移量自动提交,如果设置offset重置,则进行offset重置
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
if (!records.isEmpty()) {
// 再返回结果之前,我们可以进行下一轮的fetch请求,避免阻塞等待
fetcher.sendFetches();
client.pollNoWakeup();
// 如果有拦截器进行拦截,没有直接返回
if (this.interceptors == null)
return new ConsumerRecords<>(records);
else
return this.interceptors.onConsume(new ConsumerRecords<>(records));
} long elapsed = time.milliseconds() - start;
remaining = timeout - elapsed;
} while (remaining > 0); return ConsumerRecords.empty();
} finally {
release();
}
}
因此,需要订阅当前的topic才能消费,我之前使用的api是:(适用于非新--已经被消费者消费过的)
val inputDStream1 = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Assign[String, String](
fromOffsets.keys,kafkaParams,fromOffsets)
)
修改:(全新的topic,没有被消费者消费过)
val inputDStream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)

  

最新文章

  1. Python 【第五章】:线程、进程和协程
  2. sublime自动生成头部注释
  3. HTML5移动Web开发(二)——配置移动开发环境以及简单示例
  4. xcode报错,svn : is not a workingCopy
  5. 前端性能优化(DOM篇)
  6. 样式其他与JS脚本语言
  7. Cortex-M3/4的Hard Fault调试方法
  8. symbolicatecrash位置
  9. discuz2.0升级后不能自动跳转问题
  10. cmd&amp;Linux 下使用mysql全记录
  11. EMA计算的C#实现(c# Exponential Moving Average (EMA) indicator )
  12. 你确定你是一个合格的.Net开发人员吗?
  13. NPM实用指北
  14. 学unity3d需要什么基础
  15. Angular结构型指令,模块和样式
  16. 2019-oo-第二次总结
  17. 【消灭代办】第5周 - null拷贝,input自适应,进度条加载,颜色随机值
  18. Restful架构学习
  19. jvm常见的面试题
  20. Django登陆以后重定向到请求登陆的页面

热门文章

  1. 030_CORS深究
  2. 漏洞扫描工具Nessu的安装和简单使用
  3. Java替换中使用正则表达式实现中间模糊匹配
  4. elasticsearch6.3.1 安装以及配置IK 使用
  5. ACM-ICPC 2018 焦作赛区网络预赛 G Give Candies
  6. 原来商家登录系统的commonjs
  7. Hibernatede 一对多映射配置
  8. 【Java】「深入理解Java虚拟机」学习笔记(2)- JVM内存区域
  9. clock gen sdk 代码笔记
  10. servlet 会话管理