首页 技术 正文
技术 2022年11月18日
0 收藏 835 点赞 3,768 浏览 6450 个字

配置:

spring:  rabbitmq:    addresses:    connection-timeout:    username: guest    password: guest    publisher-confirms: true    publisher-returns: true

依赖:

  <!--rabbitmq -->        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-amqp</artifactId>        </dependency>
<dependency>    <groupId>org.projectlombok</groupId>    <artifactId>lombok</artifactId></dependency>

配置类:

package com.jds.rabbitmq;import com.jds.common.constant.QueueEnum;import org.springframework.amqp.core.*;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;@Configurationpublic class MQConfig {    public static final String RBA_QUEUE = "rba.queue";/*    public static final String QUEUE = "queue";    public static final String TOPIC_QUEUE1 = "topic.queue1";    public static final String TOPIC_QUEUE2 = "topic.queue2";    public static final String HEADER_QUEUE = "header.queue";    public static final String TOPIC_EXCHANGE = "topicExchage";    public static final String FANOUT_EXCHANGE = "fanoutxchage";    public static final String HEADERS_EXCHANGE = "headersExchage";    *//**     * Direct模式 交换机Exchange     * *//*    @Bean    public Queue queue() {        return new Queue(QUEUE, true);    }    *//**     * Topic模式 交换机Exchange     * *//*    @Bean    public Queue topicQueue1() {        return new Queue(TOPIC_QUEUE1, true);    }    @Bean    public Queue topicQueue2() {        return new Queue(TOPIC_QUEUE2, true);    }    @Bean    public TopicExchange topicExchage(){        return new TopicExchange(TOPIC_EXCHANGE);    }    @Bean    public Binding topicBinding1() {        return BindingBuilder.bind(topicQueue1()).to(topicExchage()).with("topic.key1");    }    @Bean    public Binding topicBinding2() {        return BindingBuilder.bind(topicQueue2()).to(topicExchage()).with("topic.#");    }    *//**     * Fanout模式 交换机Exchange     * *//*    @Bean    public FanoutExchange fanoutExchage(){        return new FanoutExchange(FANOUT_EXCHANGE);    }    @Bean    public Binding FanoutBinding1() {        return BindingBuilder.bind(topicQueue1()).to(fanoutExchage());    }    @Bean    public Binding FanoutBinding2() {        return BindingBuilder.bind(topicQueue2()).to(fanoutExchage());    }    *//**     * Header模式 交换机Exchange     * *//*    @Bean    public HeadersExchange headersExchage(){        return new HeadersExchange(HEADERS_EXCHANGE);    }    @Bean    public Queue headerQueue1() {        return new Queue(HEADER_QUEUE, true);    }    @Bean    public Binding headerBinding() {        Map<String, Object> map = new HashMap<String, Object>();        map.put("header1", "value1");        map.put("header2", "value2");        return BindingBuilder.bind(headerQueue1()).to(headersExchage()).whereAll(map).match();    }*/    /**     * 订单消息实际消费队列所绑定的交换机     */    @Bean    DirectExchange orderDirect() {        return (DirectExchange) ExchangeBuilder                .directExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange())                .durable(true)                .build();    }    /**     * 订单延迟队列队列所绑定的交换机     */    @Bean    DirectExchange orderTtlDirect() {        return (DirectExchange) ExchangeBuilder                .directExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange())                .durable(true)                .build();    }    /**     * 订单实际消费队列     */    @Bean    public Queue orderQueue() {        return new Queue(QueueEnum.QUEUE_ORDER_CANCEL.getName());    }    /**     * 订单延迟队列(死信队列)     */    @Bean    public Queue orderTtlQueue() {        return QueueBuilder                .durable(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getName())                .withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange())//到期后转发的交换机                .withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey())//到期后转发的路由键                .build();    }    /**     * 将订单队列绑定到交换机     */    @Bean    Binding orderBinding(DirectExchange orderDirect,Queue orderQueue){        return BindingBuilder                .bind(orderQueue)                .to(orderDirect)                .with(QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey());    }    /**     * 将订单延迟队列绑定到交换机     */    @Bean    Binding orderTtlBinding(DirectExchange orderTtlDirect,Queue orderTtlQueue){        return BindingBuilder                .bind(orderTtlQueue)                .to(orderTtlDirect)                .with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey());    }}

发送者:

package com.jds.rabbitmq;/** * @program: red-bag-activity->CancelOrderSender * @description: * @author: cxy * @create: 2019-12-20 17:57 **/import com.jds.common.constant.QueueEnum;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.AmqpException;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessagePostProcessor;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;/** * 取消订单消息的发出者 * */@Componentpublic class CancelOrderSender {    private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderSender.class);    @Autowired    private AmqpTemplate amqpTemplate;    public void sendMessage(Long orderId,final long delayTimes){        //给延迟队列发送消息        amqpTemplate.convertAndSend(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() {            @Override            public Message postProcessMessage(Message message) throws AmqpException {                //给消息设置延迟毫秒值                message.getMessageProperties().setExpiration(String.valueOf(delayTimes));                return message;            }        });        LOGGER.info("send delay message orderId:{}",orderId);    }}

接受者:

package com.jds.rabbitmq;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;/** * 取消订单消息的处理 */@Component@RabbitListener(queues = "mall.order.cancel")public class CancelOrderReceiver {    private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderReceiver.class);    @RabbitHandler    public void handle(Long orderId){        System.err.println(System.currentTimeMillis());        LOGGER.info("receive delay message orderId:{}",orderId);        System.err.println(System.currentTimeMillis());        System.err.println(orderId);        System.out.println("大傻逼222222");    }}

配置调用类:

package com.jds.controller;import com.jds.rabbitmq.CancelOrderSender;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.*;/** * @program: red-bag-activity->DelayQController * @description: * @author: cxy * @create: 2019-12-20 18:00 **/@RestControllerpublic class DelayQController {    @Autowired    CancelOrderSender cancelOrderSender;    @RequestMapping(value = "/id", method = RequestMethod.GET)    @ResponseBody    public void detail() {         * ;        //发送延迟消息        cancelOrderSender.sendMessage(12333333L, delayTimes);        System.out.println(12333333L);        System.err.println(delayTimes);        System.out.println(System.currentTimeMillis());        System.out.println("  大傻逼");        }}

测试结果:

-- ::-exec-] com.jds.rabbitmq.CancelOrderSender [] -| send delay message orderId:  大傻逼-- ::-] com.jds.rabbitmq.CancelOrderReceiver [] -| receive delay message orderId:大傻逼222222

调用:

http://localhost:8082/id,可以看到时间会延迟三秒。

实现主要地方:

new MessagePostProcessor() {            @Override            public Message postProcessMessage(Message message) throws AmqpException {                //给消息设置延迟毫秒值                message.getMessageProperties().setExpiration(String.valueOf(delayTimes));                return message;            }

给参数设置延迟时间

相关推荐
python开发_常用的python模块及安装方法
adodb:我们领导推荐的数据库连接组件bsddb3:BerkeleyDB的连接组件Cheetah-1.0:我比较喜欢这个版本的cheeta…
日期:2022-11-24 点赞:878 阅读:8,964
Educational Codeforces Round 11 C. Hard Process 二分
C. Hard Process题目连接:http://www.codeforces.com/contest/660/problem/CDes…
日期:2022-11-24 点赞:807 阅读:5,486
下载Ubuntn 17.04 内核源代码
zengkefu@server1:/usr/src$ uname -aLinux server1 4.10.0-19-generic #21…
日期:2022-11-24 点赞:569 阅读:6,331
可用Active Desktop Calendar V7.86 注册码序列号
可用Active Desktop Calendar V7.86 注册码序列号Name: www.greendown.cn Code: &nb…
日期:2022-11-24 点赞:733 阅读:6,114
Android调用系统相机、自定义相机、处理大图片
Android调用系统相机和自定义相机实例本博文主要是介绍了android上使用相机进行拍照并显示的两种方式,并且由于涉及到要把拍到的照片显…
日期:2022-11-24 点赞:512 阅读:7,747
Struts的使用
一、Struts2的获取  Struts的官方网站为:http://struts.apache.org/  下载完Struts2的jar包,…
日期:2022-11-24 点赞:671 阅读:4,781