是什么

Pulsar 是一个用于服务器到服务器的消息系统,具有多租户、高性能等优势。详见

安装

本文主练习怎么用,本地搭建了一个单机版,无非就是wget、tar、start这些命令,详见

Java客户端

1.引入GAV

        <!-- https://mvnrepository.com/artifact/org.apache.pulsar/pulsar-client -->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>2.8.0</version>
</dependency>

2.创建配置项

用于连接Pulsar等配置

  • Yml配置

    pulsar:
    url: 10.20.30.228:6650
    # url: 10.20.30.228:6650,10.20.30.228:6651 #集群配置
  • 新增配置类

    package com.project.pulsar.conf;
    
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.PulsarClientException;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration; @Configuration
    public class PulsarConf {
    @Value("${pulsar.url}")
    String url; @Bean
    public PulsarClient pulsarFactory(){
    PulsarClient client = null;
    try {
    client = PulsarClient.builder()
    .serviceUrl("pulsar://"+url)
    .build();
    } catch (PulsarClientException e) {
    }
    return client;
    }
    }

3.验证测试

​ 通过简单生产-消费测试配置是否正常

  • 创建BaseController

    注意,subscriptionName要保证唯一

    package com.project.pulsar.base;
    
    import com.project.pulsar.conf.PulsarConf;
    import org.apache.pulsar.client.api.*;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController; import java.util.HashMap;
    import java.util.Map; @RestController
    public class BaseController {
    @Autowired
    PulsarConf pulsarConf; /**
    * 生产消息
    * @param msg
    * @throws PulsarClientException
    */
    @GetMapping("/base/sendMsg")
    public MessageId sendMsg(String msg) throws PulsarClientException {
    PulsarClient pulsarFactory = pulsarConf.pulsarFactory(); Producer<byte[]> producer1 = pulsarFactory.newProducer()
    .topic("my-topic") .create();
    // 然后你就可以发送消息到指定的broker 和topic上:
    return producer1.send(msg.getBytes());
    } /**
    * 手动执行获取消息
    * @throws PulsarClientException
    */
    @GetMapping("/base/comsumer")
    public void comsumerByArtificial() throws PulsarClientException {
    PulsarClient pulsarFactory = pulsarConf.pulsarFactory();
    Consumer<byte[]> consumer = pulsarFactory.newConsumer()
    .topic("my-topic")
    .subscriptionName("my-subscription")
    .subscribe();
    Message<byte[]> receive = consumer.receive();
    System.out.println(new String(receive.getData()));
    consumer.acknowledge(receive);//确认消息被消费
    consumer.close();
    } /**
    * 自动监听消费消息
    * @throws PulsarClientException
    */
    @Bean
    public void comsumerByListener() throws PulsarClientException {
    MessageListener myMessageListener = (consumer, msg) -> {
    try {
    System.out.println("Message received: " + new String(msg.getData()));
    consumer.acknowledge(msg);
    } catch (Exception e) {
    consumer.negativeAcknowledge(msg);
    }
    };
    PulsarClient pulsarFactory = pulsarConf.pulsarFactory();
    pulsarFactory.newConsumer()
    .topic("my-topic")
    .subscriptionName("my-subscriptionByListener")
    .messageListener(myMessageListener)
    .subscribe();
    } }
  • 生产消息

    [127.0.0.1:9999/base/sendMsg?msg=Hello RB](http://127.0.0.1:9999/base/sendMsg?msg=Hello RB)

  • 消费消息

    • 在生产后,如果采用监听模式,会自动消费
    • 在生产后,如果采用手动模式,执行127.0.0.1:9999/base/comsumer会被消费,如队列中无消费,则会阻塞等待

其他及代码下载

  • topic不用显式创建,当消息发送或消费者建立连接时,如未创建会自动创建
  • 代码见此Base包下

最新文章

  1. javascript中通过匿名函数进行事件绑定
  2. Hibernate(八)__级联操作、struts+hibernate+接口编程架构
  3. Oracle 常用操作【02】数据库特性
  4. MVC5-10 ModleBinder那点事
  5. Hadoop中几个基本命令行命令
  6. angularJS ng-grid 配置
  7. SpringMVC序列化Long转成String
  8. mysql 的数据类型
  9. storm的安装配置
  10. SystemParametersInfo
  11. git config --global core.excludesfile配置gitignore全局文件
  12. linux文件属性
  13. MVC验证06-自定义错误信息
  14. CSS 中的内联元素、块级元素、display的各个属性的特点
  15. 【原创】Linux基础之sudo
  16. css实现div左侧突出一个带边框的三角形
  17. mint linux 18.3 遇到“已安装的 post-installation 脚本 返回了错误号 127 ”问题的解决
  18. slecte下拉框的多选操作及获取值的 变化
  19. Python基础-TypeError:takes 2 positional arguments but 3 were given
  20. python基础易错题

热门文章

  1. elasticsearch查询之大数据集分页查询
  2. Halcon视觉入门芯片识别
  3. JVM垃圾收集器(八)
  4. JVM学习九-(复习)HotSpot 垃圾收集器
  5. docker构建镜像 (3)
  6. js获取高度
  7. 通过版本号来判断用户是否是第一次登陆----By张秀清
  8. 02 前端基础之CSS
  9. C# 字符串计算MD5
  10. 使用java程序完成大量文件目录拷贝工作