正文前先来一波福利推荐:

福利一:

百万年薪架构师视频,该视频可以学到很多东西,是本人花钱买的VIP课程,学习消化了一年,为了支持一下女朋友公众号也方便大家学习,共享给大家。

福利二:

毕业答辩以及工作上各种答辩,平时积累了不少精品PPT,现在共享给大家,大大小小加起来有几千套,总有适合你的一款,很多是网上是下载不到。

获取方式:

微信关注 精品3分钟 ,id为 jingpin3mins,关注后回复   百万年薪架构师 ,精品收藏PPT  获取云盘链接,谢谢大家支持!

------------------------正文开始---------------------------

消费者:

---------------------- 构造初始化:

public RabbitMqReceiver(String host, int port, String username, String password) 
{
connFactory = new ConnectionFactory();
connFactory.setHost(host);
connFactory.setPort(port);
connFactory.setUsername(username);
connFactory.setPassword(password);
}
******************************************************************************** ---------------------- 构造初始化:
    public Channel createChannel() throws IOException {
getConnection();
Channel channel = connection.createChannel();
if (channel != null) {
           int prefetchCount = 1;
           channel.basicQos(prefetchCount);//最多为当前接收方发送一条消息。如果接收方还未处理完毕消息,还没有回发确认,就不要再给他分配消息了,应该把当前消息分配给其它空闲接收方。
        boolean durable = true; //Server端的Queue持久化
        channel.queueDeclare("task_queue", durable, false, false, null); 
logger.info(mqInfo.getAddress() + ":" + mqInfo.getPort() + " MQ Receiver成功创建Channel");
} else {
logger.info(mqInfo.getAddress() + ":" + mqInfo.getPort() + " MQ Receiver创建Channel失败");
} return channel;
}
********************************************************************************
---------------------- 取得connection实例:
private void getConnection() throws IOException
{
synchronized (this) {
if (connection == null || !connection.isOpen()) {
connection = connFactory.newConnection();
if (connection != null) {
logger.info(mqInfo.getAddress() + ":" + mqInfo.getPort() + " MQ Receiver成功获取连接");
} else {
logger.info(mqInfo.getAddress() + ":" + mqInfo.getPort() + " MQ Receiver获取连接失败");
}
} else {
logger.info(mqInfo.getAddress() + ":" + mqInfo.getPort() + " MQ Receiver连接已存在,复用此连接");
}
}
}
********************************************************************************
----------------------获取Consumer实例:
public QueueingConsumer createConsumer(Channel channel, String queueName) throws IOException {
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer); //自动消息确认打开,默认开启了消息确认(接收方接收到消息后,立即向服务器发回确认)。消息接收方处理完消息后,向服务器发送消息确认,服务器再删除该消息。 return consumer;
}
********************************************************************************
----------------------从从rabbitMQ提取消息并转换为对象:
private String getMessageFromMQ() {
String message = StringUtils.EMPTY;
String source = StringUtils.EMPTY;
try {
message = receiver.nextMessage(checkNotNull(consumer), 1000);
source = message;
} catch (ShutdownSignalException e) {
logger.error("", e);
} catch (ConsumerCancelledException e) {
logger.error("consumer exception", e);
} catch (InterruptedException e) {
logger.error("timeout exception", e);
}
try {
if (StringUtils.isNotBlank(message)) {
message = checkNotNull(StringUtils.substringAfter(message, "yyy:"), "xxx");
message = checkNotNull(StringEscapeUtils.unescapeJava(message), "unescape error");
int size = message.length();
if (size > 1) {
message = checkNotNull(message.substring(0, message.length() - 1), "get json-data error");// 去掉末尾的”
} else {
logger.warn(String.format("数据异常,message=%s", source));
}
}
} catch (Throwable e) {
logger.error(String.format("数据异常,message=%s", source), e);
}
return message;
}
********************************************************************************
----------------------每次读取一条消息:

public String nextMessage(QueueingConsumer consumer, long timeOut) throws ShutdownSignalException, ConsumerCancelledException, InterruptedException {
    QueueingConsumer.Delivery delivery;
if (timeOut > 0) {
delivery = consumer.nextDelivery(timeOut);
} else {
delivery = consumer.nextDelivery();
}
if (delivery == null) {
return StringUtils.EMPTY;
} String message = new String(delivery.getBody());
return message;
}
********************************************************************************

---------------------- 在storm中创建mq实例:
SpoutOutputCollector collector;
RabbitMqReceiver receiver;
Channel channel;
QueueingConsumer consumer;
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)  //初始化调用一次
{
this.collector = collector;
receiver = checkNotNull(new RabbitMqReceiver(conf.get("crash.mq.host").toString(),
Integer.valueOf(conf.get("crash.mq.port").toString()), conf.get("crash.mq.user").toString(),
conf.get("crash.mq.pwd").toString()), "receiver is null");
try {
channel = checkNotNull(receiver.createChannel(), "channel is null");
consumer = checkNotNull(receiver.createConsumer(channel, conf.get("crash.mq.channel").toString()),
"comsumer is null");
} catch (Exception e) {
logger.error("init mq-client error:", e);
}
}
---------------------- 在storm中循环执行获得消息实例:
@Override
public void nextTuple()
{
String message = getMessageFromMQ();
} 生产者:
--------------------------------------------------: private final static String QUEUE_NAME = "hello2";// 队列名不能重复 之前已有就会失败
public class Producer {  

    private final static String QUEUE_NAME = "hello2";// 队列名不能重复 之前已有就会失败  

    public static void main(String[] argv) throws java.io.IOException {  

        /* 使用工厂类建立Connection和Channel,并且设置参数 */
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.111");// MQ的IP
factory.setPort(5672);// MQ端口
factory.setUsername("asdf");// MQ用户名
factory.setPassword("123456");// MQ密码
Connection connection = factory.newConnection();
Channel channel = connection.createChannel(); /* 创建消息队列,并且发送消息 */
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "消息2";
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); //Message持久化
System.out.println("生产了个'" + message + "'"); /* 关闭连接 */
channel.close();
connection.close();
} }

  

												

最新文章

  1. TStringList的bug问题
  2. java--接口和抽象类
  3. P1092 虫食算 NOIP2002
  4. sql compact 使用EF无法更新的问题?
  5. Velocity原理
  6. mono for andorid 引用外部的dll问题
  7. 二:ZooKeeper术语概念
  8. DeleteDC() 与 ReleaseDC() 的区别 [转]
  9. UVA 674 Coin Change (DP)
  10. Mybatis代码生成器 xml配置文件 连接SQL SERVER 2005
  11. 玩转iOS 9的UIDynamics(转)
  12. jmeter监控服务资源
  13. 使用MyBatis 框架犯的错误
  14. 20164305徐广皓 - Exp1 PC平台逆向破解(5)M
  15. Yii2基本概念之——行为(Behavior)
  16. Win10下JDK环境变量的设置
  17. 最小生成树之克鲁斯卡尔(kruskal)算法
  18. Web Services的学习一
  19. Java:concurrent包下面的Collection接口框架图( CopyOnWriteArraySet, CopyOnWriteArrayList,ConcurrentLinkedQueue,BlockingQueue)
  20. HDU1505 City Game 悬线法

热门文章

  1. [Svelte 3] Use an onMount lifecycle method to fetch and render data in Svelte 3
  2. LDA的参数确定和主题数确定方法
  3. 用JavaScript实现快排
  4. IntelliJ IDEA 运行项目的时候提示 Command line is too long 错误
  5. 第02组团队Git现场编程实战
  6. 泛目录程序(莲花泛目录程序/黑帽SEO/寄生虫/莲花泛目录解析/泛目录软件)
  7. C#中如何去掉字"/0"
  8. KDC添加加密
  9. Java 标准 IO 流编程一览笔录( 下 )
  10. HTML5调用本地摄像头画面,拍照,上传服务器