springcloud集成kafka
2024-08-31 19:37:49
项目名称:布控预警 水平拆分出来的项目,作为一个单独的可以对外提供服务的项目
项目设计:springcloud,可以集成各个不同平台的一个作为对外提供的微服务项目
项目功能:实现各个平台和本平台之间的布控(对人员和摄像头进行和厂商对接可以进行实时抓拍)和预警(厂商抓拍到之后实时通知给本平台)业务。
先说明一下我的kafka在项目中的应用场景:
由于有多个平台对接本平台,中间有一些消息机制,比如摄像头抓拍到某个人员之后会由视频厂商进行通知到本平台,然后本平台根据业务来源和平台来源分别组建不同的消息进行分平台分+业务形式实现消息分发。
实现方式:
1、pom.xml
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
2、application.yml
cloud:
stream:
kafka:
binder:
brokers: 127.0.0.1:9092 # kafka服务地址和端口
zk-nodes: 127.0.0.1:2181 # ZK的集群配置地址和端口
3、可以根据表中的topic名称自动初始化topic到kafka,后续如果有新增的平台和业务可以动态根据代码自动生成topic
package com.tianque.xueliang.controlalarm.config; import com.tianque.xueliang.controlalarm.domain.vo.TopicConfigVo;
import com.tianque.xueliang.controlalarm.service.TopicConfigService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.binding.BinderAwareChannelResolver;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Component; import javax.annotation.PostConstruct;
import java.util.List; /**
* @Title: TopicConfig
* @Description: 项目启动动态读取kafka的topic配置并创建topic,创建完成回写表状态已在kafka生成topic
* @author: sxs@hztianque.com
* @date: Created in 10:25 2019/8/5
* @Modifired by:
*/
@Component
public class KafkaTopicConfig { private static final Logger logger = LoggerFactory.getLogger(KafkaTopicConfig.class); @Autowired
private TopicConfigService topicConfigService; @Autowired
private BinderAwareChannelResolver resolver; @PostConstruct
public void initKafkaTopic() {
// 获取本地数据表中的topic,只获取kafka中没有的topic
List<TopicConfigVo> list = topicConfigService.getTopicList();
try{
// 循环去生成topic,生成完毕将表中的状态更新为kafka已存在
for (TopicConfigVo topicConfigVo: list) {
String topicName = topicConfigVo.getPlatformId() + topicConfigVo.getWorkName();
// 这行代码是动态去生成topic的,先检查kafka中有没有传入的topic,有就直接返回topic,没有则新建
MessageChannel messageChannel = resolver.resolveDestination(topicName);
if (null != messageChannel) {
// 更新表中的状态为kafka中已存在改topic
topicConfigService.updateTopicStatusById(topicConfigVo.getTopicId());
}
}
}catch (Exception e) {
logger.error("kafka.topic初始化创建失败..", e);
}
}
}
@PostConstruct注解:作用在方法上,表示项目启动自动加载该方法
4、发送消息代码:
package com.tianque.xueliang.controlalarm.stream; import com.tianque.xueliang.controlalarm.domain.msg.Msg;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.binding.BinderAwareChannelResolver;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.MimeTypeUtils; /**
* Created with IntelliJ IDEA.
* @author : sunxuesong
* Date: 2019/3/12
* Time: 上午11:16
* To change this template use File | Settings | File Templates.
* Description:
*/
@Service
public class WarningService {
private Logger logger = LoggerFactory.getLogger(getClass()); @Autowired
private BinderAwareChannelResolver resolver; /**
* 发送预警消息到指定topic,这里的topic是由平台编码+平台名称组成
* 若发现kafka中没有该topic,它会自动创建一个由平台编码+平台名称组成的topic
* @param warnings
* @param topic
* @return
*/
public String sendWarning(final Msg warnings, String topic) {
logger.info("Sending warnings {}", warnings); // 获取预警的topic,然后发送预警消息到kafka的topic
MessageChannel messageChannel = resolver.resolveDestination(topic);
messageChannel.send(MessageBuilder
.withPayload(warnings)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.build()); return "send msg ok";
} /**
* 发送布控消息到指定topic,这里的topic是由平台编码+平台名称组成
* 若发现kafka中没有该topic,它会自动创建一个由平台编码+平台名称组成的topic
* @param msg
* @param topic
* @return
*/
public String sendControl(final Msg msg, String topic) {
logger.info("Sending controlMsg {}", msg);
// 获取布控的topic,然后发送布控消息到kafka的topic
MessageChannel messageChannel = resolver.resolveDestination(topic);
messageChannel.send(MessageBuilder
.withPayload(msg)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.build()); return "send msg ok";
}
}
说明一下:springcloud和springboot集成kafka的方式不一样,虽然都是通过application.yml去做配置,但是jar包和配置还是有区别的。
如有不清楚欢迎留言..
最新文章
- c# Entity DbArithmeticExpression arguments must have a numeric common type
- 文件权限及chmod使用方法
- Linux命令学习总结:dos2unix - unix2dos
- Nodejs学习(四)- express目录的分析
- 使用type=";redirect";重定向,传递List等变量到jsp页面的问题
- c++嵌套类-内存分配
- Android The content of the adapter has changed but ListView did not receive a notification终极解决方法
- gtest的安装和测试[good]
- invalid mode &#39;kCFRunLoopCommonModes&#39; provided to CFRunLoopRunSpecific
- IDEA循环依赖报错解决方案
- leetcode55
- Python语言中的按位运算
- Pandas存储为Excel格式:单个xlsx文件下多sheet存储方法
- android开发 一个更优的listView的写法
- android 相对布局例子代码
- python表单验证封装
- JavaScript 代码小片段
- Linux /dev/null详解
- oracle相关链接
- sort、sorted、heapq、bisect排序