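1. Download ActiveMQ from the Apache website: http://activemq.apache.org/download.html
Like Tomcat, ActiveMQ can run as a standalone server. Starting it brings up a message service that covers client connection management and message storage settings; for more complex systems it can also be set up with hot standby, load balancing, and so on.
ActiveMQ has a management console where you can check client connections and the send/receive status of each message queue, and also create new messages directly.
Run bin/activemq.bat (on Windows) to start the ActiveMQ service.
conf/activemq.xml is the configuration file; it is where you set performance-related parameters, the message storage type and size, and so on.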
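Before moving on to the Spring configuration, it can be handy to confirm that the broker is actually listening. The following is only a minimal sketch using plain JDK sockets: the class BrokerPortCheck and its method isListening are hypothetical names invented for this illustration (they are not part of ActiveMQ), and it assumes the default ports 61616 (broker) and 8161 (management console) that appear elsewhere in this post. It only proves that a TCP connection can be opened, not that the broker is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of ActiveMQ: checks whether anything
// accepts TCP connections on the given host and port.
class BrokerPortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: the broker runs locally on its default ports.
        System.out.println("broker port 61616 open: " + isListening("127.0.0.1", 61616, 1000));
        System.out.println("admin port 8161 open:   " + isListening("127.0.0.1", 8161, 1000));
    }
}
```

If the first line prints false right after running bin/activemq.bat, the broker may still be starting up, or conf/activemq.xml may configure a different port.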
2. Spring configuration for MQ
The Spring configuration file for ActiveMQ is as follows:
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <!-- property name="brokerURL" value="tcp://127.0.0.1:61616" / -->
    <property name="brokerURL" value="failover://(tcp://127.0.0.1:61616)?initialReconnectDelay=100" />
    <property name="useAsyncSend" value="true" />
    <property name="maxThreadPoolSize" value="50" />
</bean>
<!-- <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
    <property name="connectionFactory" ref="connectionFactory" />
</bean> -->
<bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    <property name="targetConnectionFactory" ref="connectionFactory" />
</bean>
<bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <!-- Name of the message queue -->
    <constructor-arg index="0" value="myqueue0"/>
</bean>
<!-- Destinations that messages are sent to (topics) -->
<bean id="vehPassTopic" class="org.apache.activemq.command.ActiveMQTopic">
    <!-- Name of the topic -->
    <constructor-arg index="0" value="vehPass.messages" />
</bean>
<bean id="vehAlarmTopic" class="org.apache.activemq.command.ActiveMQTopic">
    <!-- Name of the topic -->
    <constructor-arg index="0" value="vehAlarm.messages" />
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <!-- <property name="connectionFactory" ref="connectionFactory"/> -->
    <property name="connectionFactory" ref="singleConnectionFactory" />
    <!-- <property name="defaultDestination" ref="destination"/> -->
    <!-- Selects the messaging mode: false is point-to-point (queues), true is publish/subscribe (topics).
         It only matters when destinations are resolved by name. -->
    <!-- <property name="pubSubDomain" value="true" /> -->
</bean>
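Note that this line is critical:
<property name="brokerURL" value="failover://(tcp://127.0.0.1:61616)?initialReconnectDelay=100" />
It makes the client reconnect automatically after the connection drops. Without it, if a network error occurs at any time after the program has connected to the ActiveMQ service, the program can no longer communicate with ActiveMQ.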
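To make the structure of the failover URL easier to see, here is a small sketch that assembles one from a list of broker addresses and a set of options. FailoverUrlBuilder and its build method are hypothetical names made up for this illustration, and the second broker address and the maxReconnectDelay value in the usage example are assumptions, not part of the configuration in this post.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper, not part of ActiveMQ: builds a failover transport URL
// of the form failover://(uri1,uri2,...)?option1=value1&option2=value2
class FailoverUrlBuilder {

    static String build(List<String> brokerUris, Map<String, String> options) {
        // Broker URIs go inside the parentheses, separated by commas.
        StringBuilder url = new StringBuilder("failover://(")
                .append(String.join(",", brokerUris))
                .append(")");
        // Transport options follow as a query string.
        if (!options.isEmpty()) {
            String query = options.entrySet().stream()
                    .map(e -> e.getKey() + "=" + e.getValue())
                    .collect(Collectors.joining("&"));
            url.append("?").append(query);
        }
        return url.toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the options in insertion order.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("initialReconnectDelay", "100");
        options.put("maxReconnectDelay", "5000"); // assumed value, for illustration only
        // The second broker address is an assumption; this post uses a single broker.
        System.out.println(build(
                Arrays.asList("tcp://127.0.0.1:61616", "tcp://127.0.0.1:61617"), options));
    }
}
```

When a URL with more than one option is pasted into the Spring XML file, the & between options has to be written as &amp; so that the XML stays well-formed.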
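3. Test programs
Message sender:
import javax.jms.Destination;

import org.apache.log4j.Logger; // assumed: log4j 1.x
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;

// MyApplicationContextUtil and MyMessageCreator are application-specific helper classes whose source is not shown.
public class MessageSender {
    protected Logger log = Logger.getLogger(MessageSender.class.getName());
    private static JmsTemplate jmsTemplate = (JmsTemplate) MyApplicationContextUtil.context.getBean("jmsTemplate");
    private static Destination vehAlarmTopic = (Destination) MyApplicationContextUtil.context.getBean("vehAlarmTopic");
    private static Destination vehPassTopic = (Destination) MyApplicationContextUtil.context.getBean("vehPassTopic");

    /**
     * Publishes a message to the vehPass.messages topic.
     * jmsTemplate.send throws an unchecked JmsException if sending fails.
     */
    public static boolean sendLiveMessage(Object msg) {
        // Use a local creator for each call so concurrent senders do not share state.
        MyMessageCreator mc = new MyMessageCreator(msg);
        jmsTemplate.send(vehPassTopic, mc);
        return true;
    }

    /**
     * Publishes a message to the vehAlarm.messages topic.
     */
    public static boolean sendBKMessage(Object msg) {
        MyMessageCreator mc = new MyMessageCreator(msg);
        jmsTemplate.send(vehAlarmTopic, mc);
        return true;
    }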
    public static void main(String[] args) throws Exception {
        ApplicationContext context = new ClassPathXmlApplicationContext(
                new String[] { "applicationContext-mq.xml" });
        JmsTemplate jmsTemplate = (JmsTemplate) context.getBean("jmsTemplate");
        // The Spring configuration defines no "destinationTopic" bean, so look up vehPassTopic.
        Destination destinationTopic = (Destination) context.getBean("vehPassTopic");
        for (int i = 1; i < 10; i++) {
            MyMessageCreator mc = new MyMessageCreator("test"); // create the message
            jmsTemplate.send(destinationTopic, mc);
            Thread.sleep(1000); // wait 1 second before sending the next message
        }
    }
}
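Message receiver:
import java.util.ArrayList;
import java.util.List;

import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.TextMessage;

import org.apache.log4j.Logger; // assumed: log4j 1.x, since the code calls log.debug
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;

public class MessageReceiver {
    protected Logger log = Logger.getLogger(MessageReceiver.class.getName());
    private static JmsTemplate jmsTemplate = (JmsTemplate) MyApplicationContextUtil.context.getBean("jmsTemplate");
    private static Destination vehPassTopic = (Destination) MyApplicationContextUtil.context.getBean("vehPassTopic");

    /**
     * Receives up to 100 messages from the vehPass.messages topic.
     * jmsTemplate.receive blocks until a message arrives unless a receiveTimeout
     * is set on the JmsTemplate; with a timeout it returns null when nothing arrives.
     *
     * @return the messages received, possibly an empty list
     */
    public static List<Message> receiveMessage() {
        List<Message> msgList = new ArrayList<Message>();
        for (int i = 0; i < 100; i++) { // receive at most 100 messages per call
            Message message = jmsTemplate.receive(vehPassTopic);
            if (message == null) {
                break; // nothing arrived within the receive timeout
            }
            msgList.add(message);
            /* if (message instanceof TextMessage) {
                TextMessage msg = (TextMessage) message;
                log.debug("Received message: " + msg.getText());
            } */
        }
        return msgList;
    }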
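    public static void main(String[] args) throws Exception {
        ApplicationContext context = new ClassPathXmlApplicationContext(
                new String[] { "applicationContext-mq.xml" });
        JmsTemplate jmsTemplate = (JmsTemplate) context.getBean("jmsTemplate");
        // The Spring configuration defines no "destinationTopic" bean, so look up vehPassTopic.
        Destination destinationTopic = (Destination) context.getBean("vehPassTopic");
        // With a non-durable topic subscription, messages published while no
        // consumer is open (for example during the sleep below) are not delivered.
        for (int i = 0; i < 10; i++) {
            Message received = jmsTemplate.receive(destinationTopic);
            // Guard against a null result (receive timeout) and non-text messages.
            if (received instanceof TextMessage) {
                System.out.println("Received message: " + ((TextMessage) received).getText());
            }
            Thread.sleep(1000);
        }
    }
}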
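The batching logic of receiveMessage (collect up to a fixed number of messages and stop as soon as a receive call comes back empty) can be tried out without a broker. The sketch below is only an illustration under stated assumptions: a java.util.concurrent.BlockingQueue stands in for the JMS destination, poll with a timeout plays the role of jmsTemplate.receive with a receive timeout configured, and the names BatchReceiveSketch and receiveBatch as well as the batch size and timeout values are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the receive loop: a BlockingQueue replaces the JMS destination.
class BatchReceiveSketch {

    // Collects at most maxMessages items; stops early when a poll times out.
    static <T> List<T> receiveBatch(BlockingQueue<T> source, int maxMessages, long timeoutMillis)
            throws InterruptedException {
        List<T> batch = new ArrayList<>();
        while (batch.size() < maxMessages) {
            // poll returns null if nothing arrives within the timeout.
            T message = source.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (message == null) {
                break;
            }
            batch.add(message);
        }
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 1; i <= 5; i++) {
            queue.add("message " + i);
        }
        // The first call fills a batch of 3; the second returns the remaining 2
        // after one timed-out poll.
        System.out.println(receiveBatch(queue, 3, 200));
        System.out.println(receiveBatch(queue, 3, 200));
    }
}
```

The point of the timeout is that the loop always terminates: either the batch is full or a poll returns nothing. With JmsTemplate the same effect needs a receive timeout to be set, since by default receive waits indefinitely.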
The details of what was sent and received can all be checked in the ActiveMQ management console.
The management console is available at: http://127.0.0.1:8161/admin/