Message Queue(MQ)

Message Queue

Message Queue(MQ)

消息队列中间件是分布式系统中的重要组件,主要解决应用耦合,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性的架构。

使用较多的消息队列有ActiveMQ,RabbitMQ,Kafka,MetaMQ,ZeroMQ ,roketMQ等  

JMS规范

提供了操作MQ的接口

有两套消息模型

点对点模式:一个消息只能让一个消费者消费,并且消费完成后消息会从mq中删除

发布订阅模式:一个消息可以让多个消费者消费,消费完成后消息不会删除

JMS开发

(1) ConnectionFactory

创建Connection对象的工厂,针对两种不同的jms消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种。可以通过JNDI来查找ConnectionFactory对象。

(2) Destination  Queue(点对点) Topic(发布订阅)

Destination的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic);对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。

所以,Destination实际上就是两种类型的对象:Queue、Topic可以通过JNDI来查找Destination。

(3) Connection

Connection表示在客户端和JMS系统之间建立的链接(对TCP/IP socket的包装)。Connection可以产生一个或多个Session。跟ConnectionFactory一样,Connection也有两种类型:QueueConnection和TopicConnection。

(4) Session

Session是操作消息的接口。可以通过session创建生产者、消费者、消息等。Session提供了事务的功能。当需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。同样,也分QueueSession和TopicSession。

(5) 消息的生产者 Producer

消息生产者由Session创建,并用于将消息发送到Destination。同样,消息生产者分两种类型:QueueSender和TopicPublisher。可以调用消息生产者的方法(send或publish方法)发送消息。

(6) 消息消费者 Consumer

消息消费者由Session创建,用于接收被发送到Destination的消息。两种类型:QueueReceiver和TopicSubscriber。可分别通过session的createReceiver(Queue)或createSubscriber(Topic)来创建。当然,也可以session的creatDurableSubscriber方法来创建持久化的订阅者。

(7) MessageListener 针对消费者

消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。

(8) 消息的类型:

TextMessage–一个字符串对象

MapMessage–一套名称-值对

ObjectMessage–一个序列化的 Java 对象

BytesMessage–一个字节的数据流

StreamMessage — Java 原始值的数据流

activeMQ的使用
1.导入依赖
<dependencies>
   <dependency>
       <groupId>org.apache.activemq</groupId>
       <artifactId>activemq-all</artifactId>
       <version>5.11.2</version>
   </dependency>
</dependencies>
2.生产者代码:
// 生产者代码
public class Producer {
   public static void main(String[] args) throws Exception {
       // 创建连接工厂
       ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
       // 获取连接
       Connection connection = connectionFactory.createConnection();
       // 启动连接
       connection.start();
       // 创建session对象 p1:是否开启事务  p2:事务的类型
       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
       // 可以通过session创建生产者 消费者 消息等
       // 创建一个目标 点对点
       Destination destination = session.createQueue("queue_test");
       // TODO 创建一个目标 订阅模式
       //  Destination destination = session.createTopic("topic_test");

       // 创建生产者
       MessageProducer producer = session.createProducer(destination);
       // 创建消息
       TextMessage textMessage = session.createTextMessage("this is my first test");
       // 发送消息
       producer.send(textMessage);
       //  9、关闭资源
       producer.close();
       session.close();
       connection.close();
       System.out.println("message is send successfully");
   }
}
3.消费者代码
// 消费者代码
public class Consume {
   public static void main(String[] args) throws Exception {
       // 创建连接工厂
       ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
       // 获取连接
       Connection connection = connectionFactory.createConnection();
       // 启动连接
       connection.start();
       // 创建session对象 p1:是否开启事务  p2:事务的类型
       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
       // 可以通过session创建生产者 消费者 消息等
       // 创建一个目标 点对点
       Destination destination = session.createQueue("queue_test");
       // TODO 创建一个目标 订阅模式
       //  Destination destination = session.createTopic("topic_test");

       // 创建消费者
       MessageConsumer consumer = session.createConsumer(destination);
       // 创建消费消息
       consumer.setMessageListener(new MessageListener() {
           public void onMessage(Message message) {
               TextMessage textMessage = (TextMessage) message; // 因为生产者 session.createTextMessage 所有这里需要强转赋值一下
               try {
                   String text = textMessage.getText();
                   System.out.println("receive message:"+text);
               } catch (JMSException e) {
                   e.printStackTrace();
               }
           }
       });
       System.in.read();//等待控制台输入回车,如果不回车 以下代码不会执行
       //  9、关闭资源
       consumer.close();
       session.close();
       connection.close();
   }
}
Spring整合JMS

场景,商品上下架的时候要更新索引库和静态页面

1.导入依赖

<dependency>
 <groupId>org.springframework</groupId>
 <artifactId>spring-jms</artifactId>
 <version>${spring.version}</version>
</dependency>
<dependency>
 <groupId>org.apache.activemq</groupId>
 <artifactId>activemq-all</artifactId>
 <version>${activemq.version}</version>
</dependency>

2.生产者的配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans  
 http://www.springframework.org/schema/beans/spring-beans.xsd
 http://www.springframework.org/schema/context  
 http://www.springframework.org/schema/context/spring-context.xsd
 http://www.springframework.org/schema/jms
 http://www.springframework.org/schema/jms/spring-jms.xsd">
<!--<context:component-scan base-package="fun.chenqi.demo"/>-->
   <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->  
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
    <property name="brokerURL" value="tcp://192.168.25.128:61616"/>
</bean>          
   <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->  
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">  
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->  
    <property name="targetConnectionFactory" ref="targetConnectionFactory"/>  
</bean>  

   <!-- Spring提供的JMS工具类,它可以进行消息发送等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
    <property name="connectionFactory" ref="connectionFactory"/>  
</bean>


   <!-- 这个是队列目的地  点对点的  文本信息 -->
<!-- <bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue">
 <constructor-arg value="queue_text"/>
</bean> -->
 
<!-- 这个是队列目的地  发布订阅模式  文本信息 -->
<bean id="solrItempageUpdate" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg value="solr_itempage_update"/>
</bean>

<bean id="solrItempageDelete" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg value="solr_itempage_delete"/>
</bean>
</beans>
    @Autowired
   private JmsTemplate jmsTemplate;  
   @Autowired
   @Qualifier("solrItempageUpdate")
   private Destination solrItempageUpdate;
   @Autowired
   @Qualifier("solrItempageDelete")
   private Destination solrItempageDelete;

// 商家上下架商品
@Override
public void updateMarket(Long[] ids, String market) {
   for (Long id : ids) {
       Map map = new HashMap();
       map.put("id", id);
       map.put("market", market);
       goodsMapper.updateMarket(map);
       if (market.equals("1")) {
           // 上架
           // 如果商品上架 把商品的Id传入消息队列
           jmsTemplate.send(solrItempageUpdate, new MessageCreator() {
               @Override
               public Message createMessage(Session session) throws JMSException {
                   Message message = session.createTextMessage(String.valueOf(id));
                   return message;
               }
           });
       } else {
           // 下架
           jmsTemplate.send(solrItempageDelete, new MessageCreator() {
               @Override
               public Message createMessage(Session session) throws JMSException {
                   Message message = session.createTextMessage(String.valueOf(id));
                   return message;
               }
           });
       }

   }
}
消费者

配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
      xmlns:jms="http://www.springframework.org/schema/jms"
      xsi:schemaLocation="http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans.xsd
     http://www.springframework.org/schema/context  
     http://www.springframework.org/schema/context/spring-context.xsd
     http://www.springframework.org/schema/jms
     http://www.springframework.org/schema/jms/spring-jms.xsd">
   <!--<context:component-scan base-package="fun.chenqi.demo"/>-->
   <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
   <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
       <property name="brokerURL" value="tcp://192.168.25.128:61616"/>
   </bean>
   <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
   <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
       <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
       <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
   </bean>


   <!-- Spring提供的JMS工具类,它可以进行消息发送等 -->
   <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
       <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
       <property name="connectionFactory" ref="connectionFactory"/>
   </bean>


   <!-- 这个是队列目的地  点对点的  文本信息 -->
   <!-- <bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue">
       <constructor-arg value="queue_text"/>
   </bean> -->

   <!-- 这个是队列目的地  发布订阅模式  文本信息 -->
   <bean id="solrItempageUpdate" class="org.apache.activemq.command.ActiveMQTopic">
       <constructor-arg value="solr_itempage_update"/>
   </bean>
   <!-- 这个是队列目的地  发布订阅模式  文本信息 -->
   <bean id="solrItempageDelete" class="org.apache.activemq.command.ActiveMQTopic">
       <constructor-arg value="solr_itempage_delete"/>
   </bean>

   <!--  更新索引监听类 -->
   <bean id="updateListener" class="com.pyg.search.mq.SolrUpdateConsumer"/>
   <!-- 更新索引消息监听容器 -->
   <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
       <property name="connectionFactory" ref="connectionFactory"/>
       <property name="destination" ref="solrItempageUpdate"/>
       <property name="messageListener" ref="updateListener"/>
   </bean>

   <!--  删除索引监听类 -->
   <bean id="deleteListener" class="com.pyg.search.mq.SolrDeleteConsumer"/>
   <!--  删除索引消息监听容器 -->
   <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
       <property name="connectionFactory" ref="connectionFactory"/>
       <property name="destination" ref="solrItempageDelete"/>
       <property name="messageListener" ref="deleteListener"/>
   </bean>

</beans>
// SolrUpdateConsumer.java
public class SolrUpdateConsumer implements MessageListener {

   @Autowired
   private SolrTemplate solrTemplate;

   @Autowired
   private TbItemMapper itemMapper;

   @Override
   public void onMessage(Message message) {
       TextMessage textMessage = (TextMessage) message;

       try {
           String goodsId = textMessage.getText(); // 获取到商品的ID了
           TbItemExample example = new TbItemExample();
           example.createCriteria().andGoodsIdEqualTo(Long.valueOf(goodsId));
           List<TbItem> tbItems = itemMapper.selectByExample(example);
           for (TbItem item : tbItems) {
               String spec = item.getSpec(); // {"网络":"移动4G","机身内存":"64G"}
               Map<String, String> map = JSON.parseObject(spec, Map.class);
               item.setMap(map);
           }
           solrTemplate.saveBeans(tbItems);
           solrTemplate.commit();
           System.out.println("solr update is ok!!");

       } catch (JMSException e) {
           e.printStackTrace();
       }
   }
}
// SolrDeleteConsumer.java
public class SolrDeleteConsumer implements MessageListener {
   @Autowired
   private SolrTemplate solrTemplate;
   @Override
   public void onMessage(Message message) {
       TextMessage textMessage = (TextMessage) message;
       try {
           String goodsId = textMessage.getText(); // 获取到商品的ID了

           // TODO
           SolrDataQuery query = new SimpleQuery("item_goodsid:" + goodsId);
           solrTemplate.delete(query);
           solrTemplate.commit();
           System.out.println("solr delete is ok!!");
       } catch (JMSException e) {
           e.printStackTrace();
       }
   }
}

发表评论