整个项目目录结构如下:

JmsSpringContext.java

package com.wulj.jms.internal.activisor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.GenericXmlApplicationContext; import com.wulj.jms.message.Message;
import com.wulj.jms.util.AmqUtils; /**
* @author:wulj
* @功能描述: 启动JMS
* @created:2019-8-12
*/
public class JmsSpringContext {

    private static final Logger log = LoggerFactory.getLogger(JmsSpringContext.class);

    /** Shared Spring context; created on first use inside {@link #initContext()}. */
    private static ApplicationContext context;

    /** Monitor guarding creation of {@link #context}; final so the monitor can never be swapped. */
    private static final Object lock = new Object();

    /**
     * Creates the Spring context if it does not exist yet and logs before and after.
     */
    public synchronized void startup() {
        log.info("start instance JmsSpringContext !! ");
        initContext();
        log.info("startup JmsSpringContext success!! ");
    }

    /**
     * Returns the shared Spring context, creating it on the first call.
     *
     * @return the application context loaded from {@code applicationContex.xml}
     */
    public static ApplicationContext getApplicationContext() {
        initContext();
        return context;
    }

    /**
     * Loads the XML context once and then sends a test topic message.
     * The test send calls back into {@link #getApplicationContext()}; that is safe because
     * {@code context} is already assigned and the monitor is re-entrant.
     */
    private static void initContext() {
        synchronized (lock) {
            if (context == null) {
                context = new GenericXmlApplicationContext("classpath*:applicationContex.xml");
                new JmsSpringContext().sendTestTopic();
            }
        }
    }

    /**
     * Sends one test message to the topic {@code test.test1} to check that JMS works.
     * A failure is logged and never propagated, so it cannot abort context start-up.
     */
    private void sendTestTopic() {
        log.info("JMS模块已启动");
        Message m = new Message();
        m.setMessageType("testSendMessage");
        try {
            AmqUtils.sendTopic("test.test1", m);
        } catch (Exception e) {
            // Best effort: report through the logger instead of printing to stderr.
            log.warn("Failed to send the JMS start-up test message", e);
        }
    }
}

JmsExceptionListener.java

package com.wulj.jms.internal.listener;

import javax.jms.ExceptionListener;
import javax.jms.JMSException; import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component; /**
*
* @author wulj
*/
@Component
public class JmsExceptionListener implements ExceptionListener{ private static final Logger logger = LoggerFactory.getLogger(JmsExceptionListener.class); public void onException(JMSException exception) {
logger.error("监听Direcotor的JMS出错:{}" + exception.getMessage(),exception); } // @Override
// public void onException(JMSException exception) {
// logger.error("监听Direcotor的JMS出错:{}" + exception.getMessage(),exception);
// } }

MessageSender.java

package com.wulj.jms.internal.sender;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component; /**
* 消息发送器
*
* @author wulj
*/
@Component
public class MessageSender {

    private static final Logger logger = LoggerFactory.getLogger(MessageSender.class);

    private final JmsTemplate jmsTemplate;

    /**
     * @param jmsTemplate the template used for every send; injected by Spring
     */
    @Autowired
    public MessageSender(final JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    /**
     * Converts {@code message} and publishes it to the given topic.
     * An {@code UncategorizedJmsException} is logged and swallowed; other exceptions propagate.
     *
     * @param topic   destination topic name
     * @param message payload handed to {@code JmsTemplate.convertAndSend}
     */
    public void sendTopic(final String topic, final Object message) {
        // NOTE(review): this flips a flag on the shared template; concurrent sendTopic and
        // sendQueue calls may interleave. Confirm whether callers are single-threaded.
        jmsTemplate.setPubSubDomain(true);
        try {
            jmsTemplate.convertAndSend(topic, message);
        } catch (org.springframework.jms.UncategorizedJmsException e) {
            // Keep the cause so the stack trace reaches the log.
            logger.error("JMS消息服务器连接失败:{}", e.getMessage(), e);
        }
    }

    /**
     * Converts {@code message} and sends it to the given queue. Exceptions propagate.
     *
     * @param queue   destination queue name
     * @param message payload handed to {@code JmsTemplate.convertAndSend}
     */
    public void sendQueue(final String queue, final Object message) {
        jmsTemplate.setPubSubDomain(false);
        jmsTemplate.convertAndSend(queue, message);
    }
}

JmsPropertyPlaceholderConfigurer.java

package com.wulj.jms.internal.spring;

import java.io.File;

import org.springframework.beans.factory.config.PropertyPlaceholderConfigurer;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource; /**
*
* @author 张炎民
*
*/
public class JmsPropertyPlaceholderConfigurer extends PropertyPlaceholderConfigurer { private static final String JMS_FILE_PROPERTIES = "jms.properties"; @Override
public void setLocation(Resource location) {
try {
FileSystemResource resource = new FileSystemResource(getHome() + JMS_FILE_PROPERTIES);
super.setLocation(resource);
} catch (Exception e) {
e.printStackTrace();
}
} private static String getHome() {
String path = "jms";
String osType = System.getProperty("os.name");
String userHome = System.getProperty("user.home");
String fileSeparator = System.getProperty("file.separator");
String homePath;
if (osType.toLowerCase().contains("windows")) {
homePath = userHome + (userHome.endsWith(fileSeparator) ? "" : fileSeparator) + "." + path + fileSeparator;
}
else {
homePath = fileSeparator + "opt" + fileSeparator + path + fileSeparator + "config" + fileSeparator;
} File file = new File(homePath);
if (!file.exists()) {
file.mkdir();
}
return homePath;
}
}

Message.java

package com.wulj.jms.message;

import java.io.Serializable;

/**
* @author wulj
*/
public class Message implements Serializable {

    private static final long serialVersionUID = 4009232658158976386L;

    /** Type tag of the message. */
    private String messageType;

    /** Name of the message. */
    private String messageName;

    /** Free-text description of the message. */
    private String messageDesc;

    public String getMessageType() {
        return messageType;
    }

    public void setMessageType(String messageType) {
        this.messageType = messageType;
    }

    public String getMessageName() {
        return messageName;
    }

    public void setMessageName(String messageName) {
        this.messageName = messageName;
    }

    public String getMessageDesc() {
        return messageDesc;
    }

    public void setMessageDesc(String messageDesc) {
        this.messageDesc = messageDesc;
    }

    /**
     * Lists all three fields so that log lines containing a message are readable
     * instead of showing only the identity hash.
     */
    @Override
    public String toString() {
        return "Message{messageType=" + messageType
                + ", messageName=" + messageName
                + ", messageDesc=" + messageDesc + "}";
    }
}

AmqUtils.java

package com.wulj.jms.util;

import java.io.IOException;
import java.lang.reflect.Field; import javax.jms.JMSException; import org.apache.activemq.command.ActiveMQTextMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import com.wulj.jms.internal.activisor.JmsSpringContext;
import com.wulj.jms.internal.sender.MessageSender;
import com.wulj.jms.message.Message; /**
* ActiveMQ工具类,用于获取消息发送器等
*
* @author wulj 2019-8-12
*/
public class AmqUtils {

    private static final Logger logger = LoggerFactory.getLogger(AmqUtils.class);

    /**
     * Looks up the {@link MessageSender} bean from the shared Spring context.
     *
     * @return the sender bean
     */
    private static MessageSender getSender() {
        return JmsSpringContext.getApplicationContext().getBean(MessageSender.class);
    }

    /**
     * Converts {@code message} to an ActiveMQ text message and publishes it to {@code topic}.
     * Any failure is logged as a warning and never propagated.
     *
     * @param topic   destination topic name
     * @param message the message whose fields become JMS properties
     */
    public static void sendTopic(String topic, Message message) {
        try {
            MessageSender sender = getSender();
            if (sender == null) {
                logger.warn("发送Topic失败:{}", message);
                return;
            }
            ActiveMQTextMessage textMsg = createMsg(message);
            if (textMsg != null) {
                sender.sendTopic(topic, textMsg);
            }
        } catch (Exception e) {
            // Parameterized logging is null-safe, so no nested try/catch is needed here.
            logger.warn("发送Topic失败:{}", message, e);
        }
    }

    /**
     * Same as {@link #sendTopic(String, Message)}; the {@code ip} argument is not used.
     *
     * @param ip      ignored
     * @param topic   destination topic name
     * @param message the message whose fields become JMS properties
     */
    public static void sendTopic(String ip, String topic, Message message) {
        sendTopic(topic, message);
    }

    /**
     * Converts {@code wceMessage} to an ActiveMQ text message and sends it to {@code queue}.
     * Exceptions raised by the sender propagate to the caller.
     *
     * @param queue      destination queue name
     * @param wceMessage the message whose fields become JMS properties
     */
    public static void sendQueue(String queue, Message wceMessage) {
        MessageSender sender = getSender();
        if (sender != null) {
            ActiveMQTextMessage textMsg = createMsg(wceMessage);
            if (textMsg != null) {
                sender.sendQueue(queue, textMsg);
            }
        }
    }

    /**
     * Copies every non-null instance field of {@code message} into a property of a new
     * ActiveMQ text message, using the field name as the property name.
     *
     * @param message the source object; {@code null} yields {@code null}
     * @return the populated text message, or {@code null} if it could not be built
     */
    private static ActiveMQTextMessage createMsg(Message message) {
        if (message == null) {
            logger.warn("Cannot build a JMS message from a null Message");
            return null;
        }
        try {
            ActiveMQTextMessage text = new ActiveMQTextMessage();
            // getDeclaredFields is required: Message keeps its data in private fields,
            // which getFields() (public members only) never returns.
            for (Class<?> type = message.getClass(); type != null && type != Object.class; type = type.getSuperclass()) {
                Field[] fields = type.getDeclaredFields();
                for (int i = 0; i < fields.length; i++) {
                    Field field = fields[i];
                    // Skips serialVersionUID and any other static or compiler-generated member.
                    if (java.lang.reflect.Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
                        continue;
                    }
                    // Must happen before the first read of a private field.
                    field.setAccessible(true);
                    Object value = field.get(message);
                    if (value != null) {
                        text.setProperty(field.getName(), value);
                    }
                }
            }
            return text;
        } catch (IllegalArgumentException e) {
            logger.warn("Failed to convert message to ActiveMQTextMessage", e);
        } catch (IllegalAccessException e) {
            logger.warn("Failed to convert message to ActiveMQTextMessage", e);
        } catch (IOException e) {
            logger.warn("Failed to convert message to ActiveMQTextMessage", e);
        }
        return null;
    }

    /**
     * Publishes a fixed sample message to the topic {@code wulj_test_test}.
     * A {@link JMSException} while filling the properties is logged and swallowed.
     */
    public static void sendTest() {
        MessageSender sender = getSender();
        if (sender != null) {
            try {
                ActiveMQTextMessage text = new ActiveMQTextMessage();
                text.setStringProperty("ComponentType", "Workload");
                text.setStringProperty("ComponentValue", "stoppedWorkload");
                text.setStringProperty("EventText", "你好世界!");
                text.setLongProperty("Progress", 100);
                text.setStringProperty("ResourceId", "2012");
                text.setStringProperty("ResourceName", "虚机");
                String topic = "wulj_test_test";
                sender.sendTopic(topic, text);
            } catch (JMSException e) {
                logger.error("Failed to send the sample test message", e);
            }
        }
    }
}

StringUtil.java

package com.wulj.jms.util;

import java.io.UnsupportedEncodingException;
import java.sql.Blob;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern; import org.apache.commons.lang.StringUtils; public class StringUtil {
/**
* @param str
* @return
*/
public static String replaceNull(String str) {
    // A missing value, an empty value and the literal text "null" all collapse to "".
    boolean blankLike = str == null || str.isEmpty() || "null".equals(str);
    return blankLike ? "" : str;
} /**
* 将Blob类型的值转为字符串
*
* @param cont
* @return
*/
public static String readBlobAsString(Blob cont) {
    if (cont != null) {
        try {
            // Blob positions are 1-based; read the whole content in one call.
            int size = (int) cont.length();
            byte[] raw = cont.getBytes(1L, size);
            // Decoded with the platform default charset.
            return new String(raw);
        } catch (SQLException e) {
            // A read failure is reported to the caller as null, same as a null blob.
        }
    }
    return null;
} /**
* @author:wangyy
* @date:2015-7-10
* @Description:遍历传进来的数,并以传入的分割符进行分割
* @return void: 返回值类型
* @throws
*/
public static String traversalNum(Integer num, String splitStr) {
    // Returns "0<sep>1<sep>...<sep>num-1"; null for a null num, "" when num <= 0.
    if (num == null) {
        return null;
    }
    // A null or empty separator falls back to a comma.
    String separator = (splitStr == null || splitStr.length() == 0) ? "," : splitStr;
    int count = num.intValue();
    // StringBuilder: the buffer never leaves this method, so no synchronization is needed.
    StringBuilder joined = new StringBuilder();
    for (int value = 0; value < count; value++) {
        if (value > 0) {
            joined.append(separator);
        }
        joined.append(value);
    }
    return joined.toString();
} /**
* 根据字节长度截取字串
*
* @param input
* @param n
* @return
*/
public static String substrb(String input, int n) {
    // Returns the longest prefix of input whose encoding in the platform default charset
    // fits in n bytes, never splitting a character. Null input or n <= 0 yields "".
    if (input == null || n <= 0) {
        return "";
    }
    byte[] bytes = input.getBytes();
    if (n >= bytes.length) {
        return input;
    }
    int usedBytes = 0;
    int charIndex = 0;
    while (charIndex < input.length()) {
        // Step by code point so a surrogate pair is measured and kept as one unit.
        int codePoint = input.codePointAt(charIndex);
        int charCount = Character.charCount(codePoint);
        // Measure the real encoded width rather than assuming two bytes per wide character.
        int width = input.substring(charIndex, charIndex + charCount).getBytes().length;
        if (usedBytes + width > n) {
            break;
        }
        usedBytes += width;
        charIndex += charCount;
    }
    return input.substring(0, charIndex);
} /**
* 得到字符串的半角长度
*
* @param strInput
* 待检测的字符串
* @return 字符串的长度
*/ public static int length(String strInput) {
    if (isNull(strInput)) {
        return 0;
    }
    // Count the characters that fall in the range U+4E00..U+9FA5.
    Matcher cjk = Pattern.compile("[\u4E00-\u9FA5]").matcher(strInput);
    int cjkCount = 0;
    while (cjk.find()) {
        cjkCount++;
    }
    // Division binds tighter than addition: the result is length + (cjkCount / 2).
    // NOTE(review): confirm this is intended rather than length + cjkCount.
    return strInput.length() + (cjkCount / 2);
} /**
* 得到字符串是否为空
*
* @param strInput
* 待检测的字符串
* @return 若为空返回 true
*/
@Deprecated
public static boolean isNull(String strInput, String... strings) {
    // True only when strInput and every extra string are null or whitespace-only.
    boolean firstBlank = strInput == null || strInput.trim().equals("");
    if (!firstBlank) {
        return false;
    }
    for (int k = 0; k < strings.length; k++) {
        if (!isNull(strings[k])) {
            return false;
        }
    }
    return true;
} /**
* 得到长度为ilength的格式化字符串
*
* @param strInput
* 待检测的字符串
* @param ilength
* 目标长度
* @param isLeft
* 为true时从左侧加入空格,否则从右加入
* @return 格式化后的字符串
*/ public static String formatToLength(String strInput, int ilength, boolean isLeft) {
    // Pads strInput with spaces up to ilength, measured by length(String).
    // A null or blank input yields ilength spaces; an input already long enough is returned as is.
    boolean blank = isNull(strInput);
    int missing = blank ? ilength : ilength - length(strInput);
    StringBuilder padding = new StringBuilder();
    for (int i = 0; i < missing; i++) {
        padding.append(' ');
    }
    if (blank) {
        return padding.toString();
    }
    if (missing <= 0) {
        return strInput;
    }
    // Both sides now add exactly the missing number of spaces; the left side
    // previously added one extra.
    return isLeft ? padding.toString().concat(strInput) : strInput.concat(padding.toString());
} /**
* 将List<HashMap>中的值,转换为以分隔符分割的字符串
*
* @param list
* 待转换的List
* @param split
* 分割符
* @param preFix
* 添加的前缀
* @param afterFix
* 添加的后缀
* @return 转换后的字符串
*/
public static String conditionConvert(List<?> list, String split, String preFix, String afterFix) {
    // Joins the elements as preFix + element + afterFix, separated by split.
    // A null list yields ""; a null element is rendered as "null" instead of throwing.
    StringBuilder builder = new StringBuilder();
    if (list == null) {
        return builder.toString();
    }
    boolean first = true;
    for (Object item : list) {
        if (!first) {
            builder.append(split);
        }
        first = false;
        builder.append(preFix).append(String.valueOf(item)).append(afterFix);
    }
    return builder.toString();
} /**
* 将List中的值,转换为以逗号分隔的字符串,每个值,以"'"分割
*
* @param list
* 待转换的List<HashMap>
* @return 转换后的字符串
*/
@Deprecated
public static String ConditionConvertStr(List list) {
    // Wraps every element in single quotes and joins the results with commas.
    return conditionConvert(list, ",", "'", "'");
}

public static String convertWithCode(String str, String orgiCode, String newCode) {
    // Re-decodes str: its bytes in orgiCode (ISO8859-1 when blank) are read back as newCode.
    // A blank str or newCode, or an unsupported charset name, returns str unchanged.
    if (isNull(str) || isNull(newCode)) {
        return str;
    }
    String sourceCharset = isNull(orgiCode) ? "ISO8859-1" : orgiCode;
    try {
        byte[] raw = str.getBytes(sourceCharset);
        return new String(raw, newCode);
    } catch (UnsupportedEncodingException e) {
        e.printStackTrace();
        return str;
    }
} /**
* 把String中的特殊字符串(",",<,>,&)转换为转义字符
*
* @param str
* 待转换的String
* @return 转换后的字符串
*/
public static String convertSpecChar(String str) {
    // null and "" are returned untouched.
    if (str == null || str.equals("")) {
        return str;
    }
    StringBuilder escaped = new StringBuilder();
    for (char ch : str.toCharArray()) {
        // Map the five special characters to their entity; anything else is copied as is.
        String entity;
        if (ch == '&') {
            entity = "&amp;";
        } else if (ch == '<') {
            entity = "&lt;";
        } else if (ch == '>') {
            entity = "&gt;";
        } else if (ch == '\'') {
            entity = "&apos;";
        } else if (ch == '"') {
            entity = "&quot;";
        } else {
            entity = null;
        }
        if (entity == null) {
            escaped.append(ch);
        } else {
            escaped.append(entity);
        }
    }
    return escaped.toString();
} /**
* 获取非""值的字符串

applicationContex.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.2.0.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-3.0.xsd"
default-lazy-init="true">
<!-- Scan every class under the given package for annotated components -->
<context:component-scan base-package="com.wulj.jms" /> <!-- Property file loader (disabled); while it stays commented out, the broker address below is hard-coded -->
<!-- <bean id="propertyConfigurer" class="com.wulj.jms.internal.spring.JmsPropertyPlaceholderConfigurer">
<property name="location" value="jms.properties"/>
</bean> --> <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://192.168.215.207:61616?wireFormat.maxInactivityDurationInitalDelay=30000" /> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<!-- Wraps the ActiveMQ factory declared above -->
<constructor-arg ref="amqConnectionFactory" />
<!-- Presumably resolves to the component-scanned JmsExceptionListener under its default bean name; confirm -->
<property name="exceptionListener" ref="jmsExceptionListener" />
<property name="sessionCacheSize" value="10" />
</bean> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- The template is built on the caching connection factory -->
<constructor-arg ref="connectionFactory" />
</bean> </beans>

jms.properties

activemq.ip=192.168.215.207

测试类:

package com.wulj.jms.test;

import com.wulj.jms.message.Message;
import com.wulj.jms.util.AmqUtils;

/**
 * Manual smoke test: publishes one message to a topic through {@link AmqUtils}.
 */
public class TestSendMessage {

    /**
     * Builds a message carrying only a description and publishes it to
     * the topic {@code wulj_test_send_topic}.
     */
    public static void sendMessage() {
        final String topicName = "wulj_test_send_topic";
        Message payload = new Message();
        payload.setMessageDesc("hahahahahahaahaha");
        AmqUtils.sendTopic(topicName, payload);
    }

    /** Entry point; sends the single test message. */
    public static void main(String[] args) {
        sendMessage();
    }
}

登录 http://192.168.215.207:8161/admin,在 Topics 列表中可以看到 Name 为 wulj_test_send_topic 的 Topic。

最新文章

  1. .NET的Actor模型:Orleans
  2. T-SQL—理解CTEs
  3. 【BZOJ 2434】【NOI 2011】阿狸的打字机 fail树
  4. 【vijos1266】搜集环盖
  5. JavaScript中Get和Set访问器的实现
  6. BZOJ1324: Exca王者之剑
  7. 安卓天天练练(十一)用list绑数据
  8. Mysql 基础语法1
  9. 反射型XSS的逆袭之路
  10. inner join 与 left join 之间的区别
  11. 关于css盒子模型和BFC的理解
  12. Python内置函数(39)——locals
  13. thinkphp5 部署注意事项
  14. 【linux】Linux系统SELinux简介
  15. [转] 语音识别基本原理介绍----gmm-hmm中的embedded training (嵌入式训练)
  16. SparkException: Master removed our application
  17. java8新特性(四)_Stream详解
  18. Jquery-plugins-toastr-消息提示
  19. c# list排序的三种实现方式 (转帖)
  20. OpenACC 简单的直方图

热门文章

  1. OpenCascade建模:构建圆环API--BRepPrimAPI_MakeTortus()
  2. CF293E Close Vertices 点分治+树状数组
  3. String Compression
  4. linux 下使用命令查看jvm信息
  5. Java中的Unicode与码点
  6. sublime text3 最新 license注册码分享 2018
  7. Java缓存技术有哪些
  8. 移动端 iphone手机在中文情况下不执行keyup事件
  9. EBS 清除高速缓存
  10. fastjson解析list ,object中含有list, object中含有map