一. 使用原生Api

1.RabbitMQ 相关

      <dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.0</version>
</dependency>
class RabbitMqContex {
private static final String host = "127.0.0.1";
private static final int port = 5672; public static ConnectionFactory getFactory() {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername("guest");
factory.setPassword("guest"); return factory;
} public static Connection getFactoryConnection() throws IOException, TimeoutException {
return getFactory().newConnection();
} public static Channel getChannel() throws IOException, TimeoutException {
return getFactoryConnection().createChannel();
}
}
class DelayProducer {
public static void publish(String exchange, String content, int delayMillseconds) throws Exception {
Channel channel = null;
String delayExchangeName = exchange + "_delay";
String delayQueueName = delayExchangeName + "->queue_delay";
String delayRouteKey = "dead";
//设置死信队列参数
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", exchange);
try {
channel = RabbitMqContex.getChannel();
channel.exchangeDeclare(exchange, "fanout", true, false, null);
channel.exchangeDeclare(delayExchangeName, "direct", true, false, null);
channel.queueDeclare(delayQueueName, true, false, false, arguments);
channel.queueBind(delayQueueName, delayExchangeName, delayRouteKey);
//设计消息超时参数
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
AMQP.BasicProperties properties = builder.expiration(Integer.toString(delayMillseconds)).build();
channel.basicPublish(delayExchangeName, delayRouteKey, properties, content.getBytes());
} catch (Exception ex) {
ex.printStackTrace();
} finally {
if (null != channel) {
try {
channel.close();
} catch (Exception ex) { }
}
}
}
}

2.调用如下

  public static void main(String[] args) throws Exception {

        DelayProducer.publish("rewardSuccess", "{\"customerId\":123032}", 60 * 1000);
}

二使用spring的 RabbitTemplate 和 RabbitAdmin

1.注入bean

import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; /**
* 系统配置
*
* @author
* @date 2017/10/19
*/
@Configuration
public class RabbitMqConfig { @Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private int port;
@Value("${spring.rabbitmq.username:guest}")
private String username;
@Value("${spring.rabbitmq.password:guest}")
private String password; @Bean(name = "rabbitMq.connectionFactory")
public ConnectionFactory getConnectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
return factory;
} @Bean
public RabbitTemplate getRabbitTemplate(
@Qualifier("rabbitMq.connectionFactory") ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
} @Bean
public RabbitAdmin getRabbitAdmin(
@Qualifier("rabbitMq.connectionFactory") ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
} @Bean
public MessageProperties getRabbitTemplateMessageProperties() {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
messageProperties.setHeader("content_encoding", "JSON");
return messageProperties;
}
}

2.发送工具类

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration; import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j; /**
* 功能描述:
*
* @version 1.0
* @author:
* @createDate: 2017/10/19
*/
@Configuration
@Slf4j
public class RabbitMqSender {
/**
* rabbitTemplate
*/
@Autowired
private RabbitTemplate rabbitTemplate; /**
* rabbitAdmin
*/
@Autowired
private RabbitAdmin rabbitAdmin; /**
* messageProperties
*/
@Autowired
private MessageProperties messageProperties; /**
* init
*/
@PostConstruct
public void init() {
} private void declareExchange(String exchange) {
rabbitAdmin.declareExchange(new FanoutExchange(exchange, true, false, null));
} private MessageProperties getMessageProperties(Map<String, String> header) {
if (header == null) {
return this.messageProperties;
} MessageProperties customMessageProperties = new MessageProperties();
customMessageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
customMessageProperties.setHeader("content_encoding", "JSON");
for (Map.Entry<String, String> item : header.entrySet()) {
customMessageProperties.setHeader(item.getKey(), item.getValue());
}
return customMessageProperties;
} /**
* sendMessage
*/
public void publish(String exchange, String content) {
publish(exchange, content, (Map) null);
} /**
* sendMessage
*/
public void publish(String exchange, String content, Map<String, String> properties) {
publish(exchange, content, getMessageProperties(properties));
} /**
* sendMessage
*/
private void publish(String exchange, String content, MessageProperties messageProperties) {
declareExchange(exchange);
Message message = new Message(content.getBytes(StandardCharsets.UTF_8), messageProperties);
try {
rabbitTemplate.send(exchange, "", message);
log.debug("推送给exchange:{},消息体:{} 成功", exchange, content);
} catch (Exception e) {
log.error("推送给exchange:{},消息体:{} 失败!!", exchange, content, e);
throw e;
}
} public void delayPublish(String exchange, String content, int millseconds) {
String delayExchangeName = exchange + "_delay";
String delayQueueName = delayExchangeName + "->queue_delay";
String delayRouteKey = "dead";
Map<String, Object> arguments = new HashMap<>();
arguments.putIfAbsent("x-dead-letter-exchange", exchange);
declareExchange(exchange);
declareExchange(delayExchangeName);
rabbitAdmin.declareQueue(new Queue(delayQueueName, true, false, false, arguments));
rabbitAdmin.declareBinding(new Binding(delayQueueName, Binding.DestinationType.QUEUE, delayExchangeName,
delayRouteKey, Collections.emptyMap()));
MessageProperties messageProperties = getMessageProperties(Collections.emptyMap());
messageProperties.setExpiration(Integer.toString(millseconds));
publish(delayExchangeName, content, messageProperties);
}
}

最新文章

  1. 添加事件及Event对象的兼容写法
  2. Redis主从在线互相切换
  3. linux包之gdb之gdb命令与core文件产生
  4. angular.js_$scope
  5. 从mysql数据库取一条记录里的某个字段的值
  6. 软件161A 张慧敏
  7. 【spring源码分析】IOC容器初始化(五)
  8. 计蒜客 2019 蓝桥杯省赛 B 组模拟赛(三)数字拆分
  9. 前后端分离djangorestframework——路由组件
  10. 谈一谈java里面的反射机制
  11. windows服务器的误解
  12. 第五天 py if使用
  13. 用HTML5 video标签插入视频,在谷歌浏览器上播放视频时,右下角显示的下载按钮怎么去掉
  14. 【机器学习算法-python实现】採样算法的简单实现
  15. 最全Java学习路线图——Java学习指南
  16. 关于location.href赋值的php用法
  17. js正则表达式test方法、exec方法与字符串search方法区别
  18. [GO]使用select实现斐波那契
  19. C++的引用本质上等同于C的指针
  20. 代码阅读:AFNetworking背后的思想

热门文章

  1. [洛谷P4172] WC2006 水管局长
  2. 51nod 1514 美妙的序列 分治NTT + 容斥
  3. 虚拟机CentOS7安装docker并搭建Gitlab私服
  4. python之-框架
  5. PHP 三元运算符?:的小坑
  6. linux使用apache发布静态html网页
  7. DR 项目小结
  8. 《图解设计模式》读书笔记3-3 Builder模式
  9. delphi 牛逼 了 app (已在软件界掀起波澜)10分钟10行代码做出让人惊叹的程序
  10. websocket初体验