首页 技术 正文
技术 2022年11月16日
0 收藏 597 点赞 4,525 浏览 4551 个字

概要

activemq 支持两种模式:

1.队列模式

2. 发布订阅者模式,topic有一个主题可以有多个订阅者。这种情况可以将一个消息,分发到多个消费者。

比如我有这样一个案例,用户需要同步,而且需要同步到多个系统,那么我们只需要队列添加一个主题,其他的子系统订阅该主题。分别处理自己的同步逻辑。

这样就实现了代码的解耦。

实现代码

1.生产者

import java.util.Map;import javax.annotation.Resource;
import javax.jms.Destination;import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.jms.core.JmsTemplate;public class TopicProducer {
@Resource(name = "topicJmsTemplate")
private JmsTemplate jmsTemplate;
private Map<String,Destination> topicMap=new java.util.concurrent.ConcurrentHashMap<>();
/**
* 说明:发送的时候如果这里没有显示的指定destination.将用spring xml中配置的destination
* @param destination
* @param message
*/
public void sendMqMessage(String topicName, Object model){
Destination dest=null;
if(!topicMap.containsKey(topicName)){
dest=new ActiveMQTopic(topicName);
topicMap.put(topicName, dest);
}
else{
dest=topicMap.get(topicName);
}
jmsTemplate.convertAndSend(dest, model);
}
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
}

生产者XML配置

<!-- topic 连接工厂 -->
<bean id="providerConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="failover:(tcp://${jms.ip}:${jms.port})" />
<property name="useAsyncSend" value="true" />
<property name="clientID" value="providerClienctConnect" />
</bean> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="userOrgTopic"/>
</bean> <!-- 消息发送者客户端 -->
<bean id="topicJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="providerConnectionFactory" />
<property name="defaultDestination" ref="topicDestination" />
<!-- 开启订阅模式 -->
<property name="pubSubDomain" value="true"/>
<property name="receiveTimeout" value="10000" />
<!-- deliveryMode, priority, timeToLive 的开关要生效,必须配置为true,默认false-->
<property name="explicitQosEnabled" value="true"/>
<!-- 发送模式
DeliveryMode.NON_PERSISTENT=1:非持久 ;
DeliveryMode.PERSISTENT=2:持久
-->
<property name="deliveryMode" value="1"/>
</bean> <bean id="topicProducer" class="com.aps.core.jms.TopicProducer"></bean>

deliveryMode :

1:非持久 就是如果消息发送后,没有消费者启动,重启服务后,那么消息将会消失。

2.持久 消息发送后,消息没有被消费,重启服务后,消息依然存在。

2.配置消费者

java 代码

import javax.jms.Message;
import javax.jms.MessageListener;public class ConsumerMessageListener implements MessageListener{ @Override
public void onMessage(Message message) {
// ObjectMessage obj=(ObjectMessage)message;
// try {
// OsUser user=(OsUser) obj.getObject();
// System.out.println(user.getFullname());
// } catch (JMSException e) {
// // TODO Auto-generated catch block
// e.printStackTrace();
// }
// System.out.println("ok");
}}

xml 配置

<!-- 配置JMS连接工厂 -->
<bean id="consumerConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="failover:(tcp://${jms.ip}:${jms.port})" />
<property name="useAsyncSend" value="true" />
<property name="clientID" value="consumerClienctConnect" />
</bean> <!-- 定义消息Destination -->
<bean id="topic1Destination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="mytopic"/>
</bean> <bean id="topic2Destination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="mytopic"/>
</bean> <!-- 配置消息消费监听者 -->
<bean id="consumerMessageListener" class="com.aps.jms.ConsumerMessageListener" /> <!-- 消息订阅客户端1 -->
<bean id="consumerListenerClient1" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="consumerConnectionFactory" />
<!-- 开启订阅模式 -->
<property name="pubSubDomain" value="true"/>
<property name="destination" ref="topic1Destination" />
<property name="subscriptionDurable" value="true"/>
<!---这里是设置接收客户端的ID,在持久化时,但这个客户端不在线时,消息就存在数据库里,直到被这个ID的客户端消费掉-->
<property name="clientId" value="consumerClient1"/>
<property name="messageListener" ref="consumerMessageListener" />
<!-- 消息应答方式
Session.AUTO_ACKNOWLEDGE 消息自动签收
Session.CLIENT_ACKNOWLEDGE 客户端调用acknowledge方法手动签收
Session.DUPS_OK_ACKNOWLEDGE 不必必须签收,消息可能会重复发送
-->
<property name="sessionAcknowledgeMode" value="1"/>
</bean> <!-- 消息订阅客户端2 -->
<bean id="consumerListenerClient2" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="consumerConnectionFactory" />
<!-- 开启订阅模式 -->
<property name="pubSubDomain" value="true"/>
<property name="destination" ref="topic2Destination" />
<property name="subscriptionDurable" value="true"/>
<!---这里是设置接收客户端的ID,在持久化时,但这个客户端不在线时,消息就存在数据库里,直到被这个ID的客户端消费掉-->
<property name="clientId" value="consumerClient2"/>
<property name="messageListener" ref="consumerMessageListener" />
<!-- 消息应答方式
Session.AUTO_ACKNOWLEDGE 消息自动签收
Session.CLIENT_ACKNOWLEDGE 客户端调用acknowledge方法手动签收
Session.DUPS_OK_ACKNOWLEDGE 不必必须签收,消息可能会重复发送
-->
<property name="sessionAcknowledgeMode" value="1"/>
</bean>

这里可以看到,我们配置了两个消费者。

我们测试的时候,发一个消息,可以看到两个收到两次消息。

相关推荐
python开发_常用的python模块及安装方法
adodb:我们领导推荐的数据库连接组件bsddb3:BerkeleyDB的连接组件Cheetah-1.0:我比较喜欢这个版本的cheeta…
日期:2022-11-24 点赞:878 阅读:9,091
Educational Codeforces Round 11 C. Hard Process 二分
C. Hard Process题目连接:http://www.codeforces.com/contest/660/problem/CDes…
日期:2022-11-24 点赞:807 阅读:5,568
下载Ubuntn 17.04 内核源代码
zengkefu@server1:/usr/src$ uname -aLinux server1 4.10.0-19-generic #21…
日期:2022-11-24 点赞:569 阅读:6,416
可用Active Desktop Calendar V7.86 注册码序列号
可用Active Desktop Calendar V7.86 注册码序列号Name: www.greendown.cn Code: &nb…
日期:2022-11-24 点赞:733 阅读:6,188
Android调用系统相机、自定义相机、处理大图片
Android调用系统相机和自定义相机实例本博文主要是介绍了android上使用相机进行拍照并显示的两种方式,并且由于涉及到要把拍到的照片显…
日期:2022-11-24 点赞:512 阅读:7,825
Struts的使用
一、Struts2的获取  Struts的官方网站为:http://struts.apache.org/  下载完Struts2的jar包,…
日期:2022-11-24 点赞:671 阅读:4,908