引入死信队列

This commit is contained in:
brian 2023-07-25 18:24:29 +08:00
parent 00ebe067c1
commit f27058ce7a
7 changed files with 185 additions and 50 deletions

View File

@ -0,0 +1,62 @@
/*
* @(#)BiMessageProducer.java
*
* Copyright © 2023 YunPeng Corporation.
*/
package top.peng.answerbi.bizmq;
import com.rabbitmq.client.Channel;
import javax.annotation.Resource;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import top.peng.answerbi.common.ErrorCode;
import top.peng.answerbi.exception.BusinessException;
import top.peng.answerbi.model.enums.BiTaskStatusEnum;
import top.peng.answerbi.service.ChartService;
/**
* BiMessageDlxConsumer
*
* @author yunpeng
* @version 1.0 2023/7/24
*/
@Component
@Slf4j
public class BiMessageDlxConsumer {
@Resource
private ChartService chartService;
/**
* 接收死信队列消息
* @param message
* @param channel
* @param deliveryTag @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag 是一个方法参数注解用于从消息头中获取投递标签
* 在RabbitMQ中每条消息者都会被分配一个唯一的投递标签用于标识该消息在通道中的投递状态和顺序
*/
@SneakyThrows
@RabbitListener(queues = {BiMqConstant.BI_DLX_QUEUE_NAME}, ackMode = "MANUAL")
public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag){
log.info("receive dead letter Message message = {}", message);
if (StringUtils.isBlank(message)){
//如果更新失败未拿到chartId, 拒绝当前消息 让消息重新进入队列
channel.basicNack(deliveryTag, false, false);
throw new BusinessException(ErrorCode.SYSTEM_ERROR);
}
long chartId = Long.parseLong(message);
//进入死信队列的消息 更新图表状态为失败
chartService.updateChartStatus(chartId, BiTaskStatusEnum.FAILED.getValue(), "系统繁忙,生成失败!");
//收到确认消息的接收
//投递标签 deliveryTag 是一个数字标识它在消息消费者接收到消息之后用于向RabbitMq确认消息的处理状态
//通过将投递标签传递给channel.basicAck(deliveryTag,false)方法可以告知RabbitMQ该消息已经成功处理可以进行确认和从队列中删除
channel.basicAck(deliveryTag, false);
}
}

View File

@ -20,7 +20,12 @@ public class BiMessageProducer {
@Resource
private RabbitTemplate rabbitTemplate;
public void sendMessage(String message){
rabbitTemplate.convertAndSend(BiMqConstant.BI_EXCHANGE_NAME,BiMqConstant.BI_ROUTING_KEY,message);
public void sendMessage(String msg){
rabbitTemplate.convertAndSend(BiMqConstant.BI_EXCHANGE_NAME, BiMqConstant.BI_ROUTING_KEY,
msg, message -> {
//给消息设置延迟毫秒值,如果该消息30s未被消费会被丢弃或进入死信队列(如果实现了的话)
message.getMessageProperties().setExpiration("30000");
return message;
});
}
}

View File

@ -0,0 +1,77 @@
/*
* @(#)BiMqConfig.java
*
* Copyright © 2023 YunPeng Corporation.
*/
package top.peng.answerbi.bizmq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* BiMqConfig
*
* @author yunpeng
* @version 1.0 2023/7/25
*/
@Configuration
public class BiMqConfig {
//声明死信队列
@Bean
public Queue deadLetterQueue(){
return new Queue(BiMqConstant.BI_DLX_QUEUE_NAME);
}
//声明死信交换机
@Bean
public DirectExchange deadLetterExchange() {
return ExchangeBuilder
.directExchange(BiMqConstant.BI_DLX_EXCHANGE_NAME)
.build();
}
//声明Bi分析业务队列
@Bean
public Queue biQueue(){
return QueueBuilder
.durable(BiMqConstant.BI_QUEUE_NAME)
//绑定死信交换机
.deadLetterExchange(BiMqConstant.BI_DLX_EXCHANGE_NAME)
//绑定死信的路由key
.deadLetterRoutingKey(BiMqConstant.BI_DLX_ROUTING_KEY)
.build();
}
//声明Bi分析业务交换机
@Bean
public DirectExchange biExchange() {
return ExchangeBuilder
.directExchange(BiMqConstant.BI_EXCHANGE_NAME)
.build();
}
//绑定Bi分析业务队列到Bi分析业务交换机
@Bean
public Binding biBinding(){
return BindingBuilder
.bind(biQueue())
.to(biExchange())
.with(BiMqConstant.BI_ROUTING_KEY);
}
//绑定Bi分析业务队列到Bi分析业务交换机
@Bean
public Binding DeadLetterBinding(){
return BindingBuilder
.bind(deadLetterQueue())
.to(deadLetterExchange())
.with(BiMqConstant.BI_DLX_ROUTING_KEY);
}
}

View File

@ -12,6 +12,12 @@ package top.peng.answerbi.bizmq;
* @version 1.0 2023/7/24
*/
public interface BiMqConstant {
String BI_DLX_EXCHANGE_NAME = "bi_dlx_exchange";
String BI_DLX_QUEUE_NAME = "bi_dlx_queue";
String BI_DLX_ROUTING_KEY = "bi_dlx_routingKey";
String BI_EXCHANGE_NAME = "bi_exchange";
String BI_QUEUE_NAME = "bi_queue";

View File

@ -1,37 +0,0 @@
/*
* @(#)BiMqInitMain.java
*
* Copyright © 2023 YunPeng Corporation.
*/
package top.peng.answerbi.bizmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* BiMqInitMain 用于创建测试程序用到的交换机和队列只用在程序启动前执行一次
*
* @author yunpeng
* @version 1.0 2023/7/24
*/
public class BiMqInitMain {
public static void main(String[] args) {
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = BiMqConstant.BI_EXCHANGE_NAME;
channel.exchangeDeclare(exchangeName, "direct");
//创建队列
String queueName = BiMqConstant.BI_QUEUE_NAME;
channel.queueDeclare(queueName,true,false,false,null);
channel.queueBind(queueName, exchangeName, BiMqConstant.BI_ROUTING_KEY);
} catch (Exception e) {
e.printStackTrace();
}
}
}

View File

@ -336,6 +336,26 @@ public class ChartController {
return ResultUtils.success(biResponse);
}
/**
* 重新生成
*
* @param chartId
* @return
*/
@PostMapping("/regen")
@RedissonRateLimiter(qps = 1)
public CommonResponse<BiResponse> regenChartByAiAsyncMq(Long chartId) {
ThrowUtils.throwIf(chartId == null, ErrorCode.PARAMS_ERROR, "数据不存在");
//更新状态为等待中
boolean update = chartService.updateChartStatus(chartId, BiTaskStatusEnum.WAIT.getValue(), null);
ThrowUtils.throwIf(!update, ErrorCode.SYSTEM_ERROR, "图表状态更新失败");
biMessageProducer.sendMessage(String.valueOf(chartId));
BiResponse biResponse = new BiResponse();
biResponse.setChartId(chartId);
return ResultUtils.success(biResponse);
}
/**
* 预处理请求 根据用户输入构建 要存入数据库的 Chart 对象
*

View File

@ -1,6 +1,7 @@
package top.peng.answerbi.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
@ -36,11 +37,11 @@ public class ChartServiceImpl extends ServiceImpl<ChartMapper, Chart>
@Override
@Transactional(rollbackFor = { Exception.class })
public boolean updateChartStatus(long chartId, String status, String execMessage) {
Chart updateChart = new Chart();
updateChart.setId(chartId);
updateChart.setStatus(status);
updateChart.setExecMessage(execMessage);
boolean updateResult = this.updateById(updateChart);
LambdaUpdateWrapper<Chart> wrapper = new LambdaUpdateWrapper<Chart>()
.set(Chart::getStatus, status)
.set(Chart::getExecMessage,execMessage)
.eq(Chart::getId, chartId);
boolean updateResult = this.update(wrapper);
if (!updateResult){
log.error("更新图表[{}]状态失败", chartId);
}
@ -55,12 +56,13 @@ public class ChartServiceImpl extends ServiceImpl<ChartMapper, Chart>
@Override
@Transactional(rollbackFor = { Exception.class })
public boolean updateChartSucceedResult(BiResponse biResponse) {
Chart updateChart = new Chart();
updateChart.setId(biResponse.getChartId());
updateChart.setStatus(BiTaskStatusEnum.SUCCEED.getValue());
updateChart.setGenChart(biResponse.getGenChart());
updateChart.setGenResult(biResponse.getGenResult());
boolean updateResult = this.updateById(updateChart);
LambdaUpdateWrapper<Chart> wrapper = new LambdaUpdateWrapper<Chart>()
.set(Chart::getStatus, BiTaskStatusEnum.SUCCEED.getValue())
.set(Chart::getExecMessage,null)
.set(Chart::getGenChart, biResponse.getGenChart())
.set(Chart::getGenResult, biResponse.getGenResult())
.eq(Chart::getId, biResponse.getChartId());
boolean updateResult = this.update(wrapper);
if (!updateResult){
log.error("更新图表[{}]结果失败", biResponse.getChartId());
}