百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

用rabbitmq实现消息重发的功能

csdh11 2025-03-12 13:40 21 浏览

前言:

在开发工作中,有很多时候会遇到要把数据同步给其他部门或三方的场景,这个时候光写一个同步接口是不太稳定的,因为有很多因素会导致同步接口运行失败或未运行,比如调接口之前的代码出现了bug,异常被throws或被catch,没有往下走。再比如对方接收代码出现问题,或者网络问题,接口没通,同步失败。

遇到上面同步失败的情况,就会影响到业务的正常使用了,本文只讨论第二种调用失败的情况(第一种情况可以把同步代码封装起来,提供一个接口出来用于手动调用hhhh,很笨但是很救命的办法),所以必须要加入重发机制,来让程序更加的健壮。

因为项目中也用到了rabbitmq,所以第一时间就想到了死信队列,之前的一种实现方式是使用死信队列的超时时间特性,第一次失败后,把参数放入死信队列,但是参数的bean中要有一个记录次数的值,第一次放的时候set为1。在消息超时后放入死信队列,被监听到时,再去调用接口,如果失败了,就按照次数去计算下一次执行的时间,然后重新放入到参数的bean中,再把bean重新放入到正常消费队列中,直到下一次消息超时被死信队列接收。不过这样的是要在代码中设置一个最大循环次数的,否则调用不通的情况下,会一直循环。如果成功了,那么我们就手动调用下 channel.basicAck 去手动签收一下(这里是要在配置中把自动签收改成手动签收)。


spring.rabbitmq.listener.simple.acknowledge-mode=manual 手动签收

这种方式虽然实现了功能,但是确增大了代码量,尤其是需要增添2个队列(一个正常消费队列,一个消费队列绑定的死信队列),不太方便扩展,而且也增加了复杂度,所以不太推荐这样的写法(代码就不贴了,如果有人感兴趣,可以给我留言,到时候我再更新)。

为了优化掉现有代码,于是我就又重新研究了一下,发现不就是重发么,rabbitmq早就给我们准备好了,-_-|| 自己这又是死信又是手动签收的,,,一顿操作,确实浪费了不少功夫。

实现过程:

那下面我们来看一下是如何实现的。

首先写贴下配置

##rabbit地址

spring.rabbitmq.addresses=amqp://guest:guest@localhost:5672

# 开启重发

spring.rabbitmq.listener.simple.retry.enabled=true

# 最大重发次数

spring.rabbitmq.listener.simple.retry.max-attempts=10

# 重试间隔时间 单位毫秒

spring.rabbitmq.listener.simple.retry.initial-interval=3000ms

# 重试最大间隔时间 单位毫秒

spring.rabbitmq.listener.simple.retry.max-interval=86400000ms

# 重发间隔因子 间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间

spring.rabbitmq.listener.simple.retry.multiplier=2

开启rabbitmq的重发机制,并且设置好重试间隔时间(这个间隔时间应该是第一次到第二次的间隔时间,往后的间隔时间是通过间隔因子算的),以及最大间隔时间(避免出现无限重试的问题),还有重要的间隔因子,这样保证了每次的间隔时间是成比例增长的。

配置好后,接下来就要声明我们所用到的队列

import org.springframework.amqp.core.*;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/**

* rabbitmq重发配置

*/

@Configuration

public class RepeatSendRabbitmqConfig {

/**

* 正常队列

*/

public final static String REPEAT_QUEUE = "repeat_queue";

/**

* 交换机

*/

public final static String REPEAT_EXCHANGE = "repeat_exchange";

/**

* 路由键

*/

public final static String REPEAT_ROUTING_KEY = "repeat_routing_key";

/**

* 声明队列

*/

@Bean

Queue repeatQueue() {

return QueueBuilder.durable(REPEAT_QUEUE).build();

}

/**

* 声明交换机

* @return

*/

@Bean

DirectExchange repeatExchange() {

return new DirectExchange(REPEAT_EXCHANGE);

}

/**

* 将队列和交换机进行绑定

* @param repeatQueue

* @param repeatExchange

* @return

*/

@Bean

Binding dlxBinding(Queue repeatQueue, DirectExchange repeatExchange) {

return BindingBuilder.bind(repeatQueue).to(repeatExchange).with(REPEAT_ROUTING_KEY);

}

}

我们把队列,交换机,路由键都声明好后,下一步就要写接收代码了。

首先我们要写好生产者,也就是把消息放入到消息队列中的那步。

import com.google.gson.Gson;

import org.springframework.amqp.core.AmqpTemplate;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

/**

* 生产者,将消息放入队列

*/

@Component

public class MqSender {

Gson gson = new Gson();

@Autowired

private RabbitTemplate rabbitTemplate;

/**

* 将消息放入队列

* @param request

*/

public void repeatSend(RequestBean request){

log.err("repeat request " + gson.toJson(request),"接收时间:" + LocalDateTime.now());

rabbitTemplate.convertAndSend(RepeatSendRabbitmqConfig.REPEAT_EXCHANGE,

RepeatSendRabbitmqConfig.REPEAT_ROUTING_KEY,request);

}

}

注意了,repeatSend这个方法,只运行一次,这里并没有指定超时时间,仅仅是传入了exchange和routing key,通过之前的绑定,就能定位到是哪个队列,然后把参数放到队列中。

(这里有一个小坑,之前是使用的 AmqpTemplate amqpTemplate 这个接口来做数据存放,但是用了其中的方法,在测试的时候偶尔不会消费,,不知道什么原因,为了赶工,就改成使用 RabbitTemplate rabbitTemplate 了,有知道的小伙伴可以留言讨论一下)。

前期准备工作好后,最后一步就是拿来消费了。

import com.google.gson.Gson;

import com.rabbitmq.client.Channel;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

import java.io.IOException;

import java.util.ArrayList;

import java.util.List;

import java.util.Map;

import java.util.Objects;

import java.time.LocalDateTime;

/**

* 接收者,消费者

*/

@Component

public class MqReceiver {

Gson gson = new Gson();

@Autowired

private RemoteService remoteService;

/**

* 同步接口重发队列实现

* @param request

*/

@RabbitListener(queues = RepeatSendRabbitmqConfig.REPEAT_QUEUE)

public void ListenRepeatSend(RequestBean request){

log.err(" ListenRepeatSend request " + gson.toJson(request),

"同步时间 " + LocalDateTime.now());

BaseResp response = remoteService.send(request);

// 重新同步,失败后抛异常重试

if(!"SUCCESS".equals(response.getCode())){

throw new RuntimeException();

}

}

}

使用 @RabbitListener 指定监听队列,那么这个队列就会被这个消费者所监听了。把参数传入我们自己的同步方法中,如果失败了,我们就抛异常出去,不用做其他的任何操作,只需要抛出去,rabbitmq就会按照配置的时间,以及间隔,来重新执行方法了,直到不抛异常,或者超过了配置中的最大时间,就停止重复执行了。

最后,使用生产者时,直接在代码中直接调用一下就好。

相关推荐

教学楼里那种嵌着小石子的水磨石地面,是怎么整出来的? | 有趣的制造

今天的选题是之前小可爱「花凉」在后台发消息问的~看过以后念念不忘,满脑子都是小时候在教学楼冰冷地面上摔的跤,记不起来是不是在这种地面上磕掉的门牙...昨天发了预告后,有小可爱纷纷表示「就是这种地板,像...

教学楼里那种嵌着小石子的水磨石地面,是怎么整出来的?

话说有多少小可爱不想学习时,没事数着水磨石地面的小石子玩,然后互相评比哪颗石子最好看。到头来书又没有背完,课也没好好上,就怪地板有迷幻效果,扰乱了好好学习的坚定意志。(小编觉得即使换成瓷砖,你们也可能...

性能调优实战:Spring Boot 多线程处理SQL IN语句大量值的优化方案

环境:SpringBoot3.4.0...

RMAN备份监控及优化总结(rman全备份)

今天主要介绍一下如何对RMAN备份监控及优化,这里就不讲rman备份的一些原理了,仅供参考。一、监控RMAN备份1、确定备份源与备份设备的最大速度从磁盘读的速度和磁带写的带度、备份的速度不可能超出这两...

记Oracle中快速获取表及其各个字段注释的方法

简述java开发中,用过JPA的道友应该知道,我们可以通过写java代码自动生成对应的数据表;但这有个问题是,列名的注释并没有帮我们一起添加到数据库去,尤其在一些开发测试生产三个环境隔离的,就很不友好...

Oracle 数据库日常巡检之检查数据库cpu、I/O、内存性能

记录数据库的cpu使用、IO、内存等使用情况,使用vmstat,iostat,sar,top等命令进行信息收集并检查这些信息,判断资源使用情况。1.CPU使用情况:...

Oracle案例:ORA-00600: internal error code, arguments: 「4187」

本案例客户来自某省电信,alert日志大量的ORA-00600[4187]报错,已经影响到业务正常运行。...

MySQL索引失效的10大陷阱:从隐式类型转换到索引选择性全面优化

索引是MySQL性能优化的核心武器,但错误的使用场景可能让索引完全失效,导致查询性能断崖式下降。本文通过实际案例,深入剖析索引失效的典型场景及其底层原理,并提供可落地的解决方案。一、索引失效的核心原...

oracle查询语句执行计划分析(oracle如何查看sql执行计划)

1命令行开启配置#显示查询结果setautotraceon#不显示查询结果setautotracetraceonly2执行查询语句...

面试官:说说Oracle数据库result cache的原理是什么?

概述前面已经用实验给大家介绍了ResultCache相关内容,今天主要讨论一下Oracle11gResultCache的深层原理。从参数看,Oracle提供了ClientResultCac...

Oracle817 export 时ORA-06553和ORA-00904处理

现象:数据库版本8.1.7...

Oracle案例:一次gc buffer busy acquire诊断

本案例来自某客户两节点rac的一次生产故障,现象是大面积的gcbufferbusyacquire导致业务瘫痪。...

说文解字:“雪”字本身在造字时就很浪漫!

这是雪山的“雪”字。可是你知道吗?“雪”这个字其实和“山”是没有任何关系的。这个字下半部分“彐”并不是一座翻倒的山,而是一只手的意思。(凡是带“彐”的汉字,其实都和手有关。)“雪”字的商代甲骨文形状,...

应用最广的两类数据库的区别、优势对比、查询优化方法及案例实践

 1、通用数据库分类  1.1关系型数据库  关系型数据库是多个二维数据表的集合,数据以二维数据表的形式进行存储,数据表之间可以通过应用程序或者数据的主、外键建立特定的关联关系,让数据之间存在特定的...

【SQL】SQL 语法差异大全(PgSQL/MySQL/Oracle/TiDB/OceanBase)

以下是针对不同数据库系统的SQL语法差异总结,按功能分类展示:一、基础查询1.分页查询...