参考程序员DD大佬的文章,自己新建demo学习学习,由于需要消息回执,看到了@SendTo这个注解能够实现,下面开始学习demo,新建两个项目cloud-stream-consumer消费端 和 cloud-stream-consumer 生产端

public interface StreamReceive {

    @Input("MQRece")
SubscribableChannel mqReceive();
}

添加一个StreamReceive接口,定义@input通道

@Component
@Slf4j
public class ReceiveListener { @StreamListener("MQRece")
public byte[] receive(byte[] bytes){
log.info("接受消息:"+new String(bytes));
return "ok".getBytes();
} }

添加消息监听,接受消息定义为byte[]

添加application.properties配置文件信息

spring.cloud.stream.rocketmq.binder.namesrv-addr= 192.168.211.11:9876
spring.cloud.stream.bindings.MQRece.destination=message-topic
spring.cloud.stream.bindings.MQRece.group=rece-group
server.port=19999

为MQRece通道添加主题message-topic,组名rece-group

到此Stream 客户端消费就完成了,本节需要把@SendTo注解用起来,需要新建一个MessageChannel进行产生消息

public interface MsgBackPush {

    @Output("back-push")
MessageChannel backPush();
}

然后在ReceiveListener添加@SendTo

@Component
@Slf4j
public class ReceiveListener { @StreamListener("MQRece")
@SendTo("back-push")
public byte[] receive(byte[] bytes){
log.info("接受消息:"+new String(bytes));
return "ok".getBytes();
} }

新增通道配置application.properties

spring.cloud.stream.bindings.back-push.destination=back-topic
spring.cloud.stream.bindings.back-push.group=back-group

SpringBoot启动类记得添加 @EnableBinding(value = {StreamReceive.class,MsgBackPush.class})

@SpringBootApplication
@EnableBinding(value = {StreamReceive.class,MsgBackPush.class})
public class CloudStreamConsumerApplication { public static void main(String[] args) {
SpringApplication.run(CloudStreamConsumerApplication.class, args);
} }

到此,cloud-stream-consumer这个demo就完成了

接下来看看 cloud-stream-producer

public interface StreamPush {

    @Output("MQPush")
MessageChannel mqPush();
}

定义一个通道名为MQPush,进行消息生产

public interface ProducerReceive {

    @Input("producer-receive")
SubscribableChannel producerReceive();
}

定义一个通道名为producer-receive,进行回执消息的消费

@Component
@Slf4j
public class ProducerListener {
@StreamListener("producer-receive")
public void producerReceive(byte[] bytes){
log.info("come back message:"+new String(bytes));
}
}

具体回执消息处理逻辑,再来看看application.properties

spring.cloud.stream.rocketmq.binder.namesrv-addr= 192.168.214.191:9876
spring.cloud.stream.bindings.MQPush.destination=message-topic
spring.cloud.stream.bindings.MQPush.group=push-group spring.cloud.stream.bindings.producer-receive.destination=back-topic
spring.cloud.stream.bindings.producer-receive.group=back-group server.port=20000

为通道设置topic和group,新建一个Http接口测试一下成果

@SpringBootApplication
@EnableBinding(value = {StreamPush.class,ProducerReceive.class})
@RestController
public class CloudStreamProducerApplication { public static void main(String[] args) {
SpringApplication.run(CloudStreamProducerApplication.class, args);
} @Autowired
private StreamPush streamPush; @GetMapping("/sendMessage")
public String sendMessage(){
streamPush.mqPush().send(MessageBuilder.withPayload("message body".getBytes()).build());
return "ok";
}
}

访问http://localhost:20000/sendMessage,结果图如下

cloud-stream-consumer日志输出

cloud-stream-producer日志输出

学习@ServiceActivator这个注解,上面的项目cloud-stream-consumer ReceiveListener类中添加

@Component
@Slf4j
public class ReceiveListener { @StreamListener("MQRece")
@SendTo("back-push")
public byte[] receive(byte[] bytes){
log.info("接受消息:"+new String(bytes));
// 抛出异常
if(1==1){
throw new RuntimeException("Message consumer failed!");
}
return "ok".getBytes();
} @Autowired
private MsgBackPush msgBackPush; @ServiceActivator(inputChannel = "message-topic.rece-group.errors")
public void error(Message<?> message){
log.info("消费者消费消息失败:"+message);
msgBackPush.backPush().send(MessageBuilder.withPayload("消息消费失败".getBytes()).build());
}
}

通过使用@ServiceActivator(inputChannel = "test-topic.stream-exception-handler.errors")指定了某个通道的错误处理映射。其中,inputChannel的配置中对应关系如下:

  • message-topic:消息通道对应的目标(destination,即:spring.cloud.stream.bindings.MQRece.destination的配置)
  • rece-group:消息通道对应的消费组(group,即:spring.cloud.stream.bindings.MQRece.group的配置)

访问http://localhost:20000/sendMessage,结果图如下

cloud-stream-consumer日志输出

cloud-stream-producer日志输出

个人联系方式QQ:944484545,欢迎大家的加入,分享学习是一件开心事

最新文章

  1. c语言选择排序
  2. 在IIS中部署ASP.NET 5应用程序遭遇的问题
  3. Linux操作系统学习笔记
  4. Android APK 文件自动安装
  5. Cisco IOS basic system management command reference
  6. 黄聪:jquery mobile通过a标签页面跳转后,样式丢失、js失效的解决方法
  7. 如何将开源项目部分代码作为private放在github上?
  8. Java将整个文件夹里的文本中的字符串替换成另外一个字符串(可用于项目复制,变成另一个项目)
  9. java堆栈 (转)
  10. JS将时间戳转换为JS Date类型
  11. hdu4602(矩阵快速幂)
  12. 【JVM命令系列】javap
  13. Database Design Guidelines
  14. 使用sessionStorage、localStorage存储数组与对象
  15. 微信网页浏览器打开链接后跳转到其他浏览器下载APK文件包
  16. json、demjson
  17. Python2.X和Python3.X的w7同时安装使用
  18. python基础--------字符串的调用详解(2)
  19. Deepin 15.4 更改为 阿里云源
  20. bzoj 2527 Meteors - 整体二分 - 树状数组

热门文章

  1. jq动画和停止动画
  2. HDU 1114 完全背包问题
  3. 最短路径Dijkstra算法和Floyd算法整理、
  4. 利用谓词实现List&lt;&gt;的Find等高级操作
  5. Native memory allocation (mmap) failed to map 142606336 bytes for committing reserved memory.
  6. MySQL高级配置
  7. Mysql怎样控制replace替换的次数?
  8. 插播一条 WMI修复教程
  9. 21个项目玩转深度学习:基于TensorFlow的实践详解02—CIFAR10图像识别
  10. Moq基础 判断方法被执行