diff --git a/src/main/java/top/peng/answerbi/bizmq/BiMessageDlxConsumer.java b/src/main/java/top/peng/answerbi/bizmq/BiMessageDlxConsumer.java new file mode 100644 index 0000000..7fd0e83 --- /dev/null +++ b/src/main/java/top/peng/answerbi/bizmq/BiMessageDlxConsumer.java @@ -0,0 +1,62 @@ +/* + * @(#)BiMessageProducer.java + * + * Copyright © 2023 YunPeng Corporation. + */ +package top.peng.answerbi.bizmq; + +import com.rabbitmq.client.Channel; +import javax.annotation.Resource; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.support.AmqpHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.stereotype.Component; +import top.peng.answerbi.common.ErrorCode; +import top.peng.answerbi.exception.BusinessException; +import top.peng.answerbi.model.enums.BiTaskStatusEnum; +import top.peng.answerbi.service.ChartService; + +/** + * BiMessageDlxConsumer + * + * @author yunpeng + * @version 1.0 2023/7/24 + */ +@Component +@Slf4j +public class BiMessageDlxConsumer { + + @Resource + private ChartService chartService; + + + /** + * 接收死信队列消息 + * @param message + * @param channel + * @param deliveryTag @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag 是一个方法参数注解,用于从消息头中获取投递标签 + * 在RabbitMQ中,每条消息者都会被分配一个唯一的投递标签,用于标识该消息在通道中的投递状态和顺序 + */ + @SneakyThrows + @RabbitListener(queues = {BiMqConstant.BI_DLX_QUEUE_NAME}, ackMode = "MANUAL") + public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag){ + log.info("receive dead letter Message message = {}", message); + if (StringUtils.isBlank(message)){ + //如果更新失败,未拿到chartId, 拒绝当前消息, 让消息重新进入队列 + channel.basicNack(deliveryTag, false, false); + throw new BusinessException(ErrorCode.SYSTEM_ERROR); + } + long chartId = Long.parseLong(message); + + //进入死信队列的消息 更新图表状态为失败 + chartService.updateChartStatus(chartId, BiTaskStatusEnum.FAILED.getValue(), "系统繁忙,生成失败!"); + + //收到确认消息的接收 + //投递标签 deliveryTag 是一个数字标识,它在消息消费者接收到消息之后用于向RabbitMq确认消息的处理状态 + //通过将投递标签传递给channel.basicAck(deliveryTag,false)方法,可以告知RabbitMQ该消息已经成功处理,可以进行确认和从队列中删除 + channel.basicAck(deliveryTag, false); + } +} diff --git a/src/main/java/top/peng/answerbi/bizmq/BiMessageProducer.java b/src/main/java/top/peng/answerbi/bizmq/BiMessageProducer.java index 54f88bc..2c9ac12 100644 --- a/src/main/java/top/peng/answerbi/bizmq/BiMessageProducer.java +++ b/src/main/java/top/peng/answerbi/bizmq/BiMessageProducer.java @@ -20,7 +20,12 @@ public class BiMessageProducer { @Resource private RabbitTemplate rabbitTemplate; - public void sendMessage(String message){ - rabbitTemplate.convertAndSend(BiMqConstant.BI_EXCHANGE_NAME,BiMqConstant.BI_ROUTING_KEY,message); + public void sendMessage(String msg){ + rabbitTemplate.convertAndSend(BiMqConstant.BI_EXCHANGE_NAME, BiMqConstant.BI_ROUTING_KEY, + msg, message -> { + //给消息设置延迟毫秒值,如果该消息30s未被消费,会被丢弃或进入死信队列(如果实现了的话) + message.getMessageProperties().setExpiration("30000"); + return message; + }); } } diff --git a/src/main/java/top/peng/answerbi/bizmq/BiMqConfig.java b/src/main/java/top/peng/answerbi/bizmq/BiMqConfig.java new file mode 100644 index 0000000..0bc04df --- /dev/null +++ b/src/main/java/top/peng/answerbi/bizmq/BiMqConfig.java @@ -0,0 +1,77 @@ +/* + * @(#)BiMqConfig.java + * + * Copyright © 2023 YunPeng Corporation. + */ +package top.peng.answerbi.bizmq; + +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.ExchangeBuilder; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.QueueBuilder; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * BiMqConfig + * + * @author yunpeng + * @version 1.0 2023/7/25 + */ +@Configuration +public class BiMqConfig { + + //声明死信队列 + @Bean + public Queue deadLetterQueue(){ + return new Queue(BiMqConstant.BI_DLX_QUEUE_NAME); + } + + //声明死信交换机 + @Bean + public DirectExchange deadLetterExchange() { + return ExchangeBuilder + .directExchange(BiMqConstant.BI_DLX_EXCHANGE_NAME) + .build(); + } + + + //声明Bi分析业务队列 + @Bean + public Queue biQueue(){ + return QueueBuilder + .durable(BiMqConstant.BI_QUEUE_NAME) + //绑定死信交换机 + .deadLetterExchange(BiMqConstant.BI_DLX_EXCHANGE_NAME) + //绑定死信的路由key + .deadLetterRoutingKey(BiMqConstant.BI_DLX_ROUTING_KEY) + .build(); + } + + //声明Bi分析业务交换机 + @Bean + public DirectExchange biExchange() { + return ExchangeBuilder + .directExchange(BiMqConstant.BI_EXCHANGE_NAME) + .build(); + } + + //绑定Bi分析业务队列到Bi分析业务交换机 + @Bean + public Binding biBinding(){ + return BindingBuilder + .bind(biQueue()) + .to(biExchange()) + .with(BiMqConstant.BI_ROUTING_KEY); + } + //绑定Bi分析业务队列到Bi分析业务交换机 + @Bean + public Binding DeadLetterBinding(){ + return BindingBuilder + .bind(deadLetterQueue()) + .to(deadLetterExchange()) + .with(BiMqConstant.BI_DLX_ROUTING_KEY); + } +} diff --git a/src/main/java/top/peng/answerbi/bizmq/BiMqConstant.java b/src/main/java/top/peng/answerbi/bizmq/BiMqConstant.java index c03ac5c..62217f7 100644 --- a/src/main/java/top/peng/answerbi/bizmq/BiMqConstant.java +++ b/src/main/java/top/peng/answerbi/bizmq/BiMqConstant.java @@ -12,6 +12,12 @@ package top.peng.answerbi.bizmq; * @version 1.0 2023/7/24 */ public interface BiMqConstant { + String BI_DLX_EXCHANGE_NAME = "bi_dlx_exchange"; + + String BI_DLX_QUEUE_NAME = "bi_dlx_queue"; + + String BI_DLX_ROUTING_KEY = "bi_dlx_routingKey"; + String BI_EXCHANGE_NAME = "bi_exchange"; String BI_QUEUE_NAME = "bi_queue"; diff --git a/src/main/java/top/peng/answerbi/bizmq/BiMqInitMain.java b/src/main/java/top/peng/answerbi/bizmq/BiMqInitMain.java deleted file mode 100644 index ffaaa74..0000000 --- a/src/main/java/top/peng/answerbi/bizmq/BiMqInitMain.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * @(#)BiMqInitMain.java - * - * Copyright © 2023 YunPeng Corporation. - */ -package top.peng.answerbi.bizmq; - -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; - -/** - * BiMqInitMain 用于创建测试程序用到的交换机和队列(只用在程序启动前执行一次) - * - * @author yunpeng - * @version 1.0 2023/7/24 - */ -public class BiMqInitMain { - - public static void main(String[] args) { - try { - ConnectionFactory factory = new ConnectionFactory(); - factory.setHost("localhost"); - Connection connection = factory.newConnection(); - Channel channel = connection.createChannel(); - String exchangeName = BiMqConstant.BI_EXCHANGE_NAME; - channel.exchangeDeclare(exchangeName, "direct"); - - //创建队列 - String queueName = BiMqConstant.BI_QUEUE_NAME; - channel.queueDeclare(queueName,true,false,false,null); - channel.queueBind(queueName, exchangeName, BiMqConstant.BI_ROUTING_KEY); - } catch (Exception e) { - e.printStackTrace(); - } - } -} diff --git a/src/main/java/top/peng/answerbi/controller/ChartController.java b/src/main/java/top/peng/answerbi/controller/ChartController.java index 378b302..6ff0734 100644 --- a/src/main/java/top/peng/answerbi/controller/ChartController.java +++ b/src/main/java/top/peng/answerbi/controller/ChartController.java @@ -336,6 +336,26 @@ public class ChartController { return ResultUtils.success(biResponse); } + /** + * 重新生成 + * + * @param chartId + * @return + */ + @PostMapping("/regen") + @RedissonRateLimiter(qps = 1) + public CommonResponse regenChartByAiAsyncMq(Long chartId) { + ThrowUtils.throwIf(chartId == null, ErrorCode.PARAMS_ERROR, "数据不存在"); + //更新状态为等待中 + boolean update = chartService.updateChartStatus(chartId, BiTaskStatusEnum.WAIT.getValue(), null); + ThrowUtils.throwIf(!update, ErrorCode.SYSTEM_ERROR, "图表状态更新失败"); + + biMessageProducer.sendMessage(String.valueOf(chartId)); + BiResponse biResponse = new BiResponse(); + biResponse.setChartId(chartId); + return ResultUtils.success(biResponse); + } + /** * 预处理请求 根据用户输入构建 要存入数据库的 Chart 对象 * diff --git a/src/main/java/top/peng/answerbi/service/impl/ChartServiceImpl.java b/src/main/java/top/peng/answerbi/service/impl/ChartServiceImpl.java index b315f3d..4c60c57 100644 --- a/src/main/java/top/peng/answerbi/service/impl/ChartServiceImpl.java +++ b/src/main/java/top/peng/answerbi/service/impl/ChartServiceImpl.java @@ -1,6 +1,7 @@ package top.peng.answerbi.service.impl; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.ObjectUtils; @@ -36,11 +37,11 @@ public class ChartServiceImpl extends ServiceImpl @Override @Transactional(rollbackFor = { Exception.class }) public boolean updateChartStatus(long chartId, String status, String execMessage) { - Chart updateChart = new Chart(); - updateChart.setId(chartId); - updateChart.setStatus(status); - updateChart.setExecMessage(execMessage); - boolean updateResult = this.updateById(updateChart); + LambdaUpdateWrapper wrapper = new LambdaUpdateWrapper() + .set(Chart::getStatus, status) + .set(Chart::getExecMessage,execMessage) + .eq(Chart::getId, chartId); + boolean updateResult = this.update(wrapper); if (!updateResult){ log.error("更新图表[{}]状态失败", chartId); } @@ -55,12 +56,13 @@ public class ChartServiceImpl extends ServiceImpl @Override @Transactional(rollbackFor = { Exception.class }) public boolean updateChartSucceedResult(BiResponse biResponse) { - Chart updateChart = new Chart(); - updateChart.setId(biResponse.getChartId()); - updateChart.setStatus(BiTaskStatusEnum.SUCCEED.getValue()); - updateChart.setGenChart(biResponse.getGenChart()); - updateChart.setGenResult(biResponse.getGenResult()); - boolean updateResult = this.updateById(updateChart); + LambdaUpdateWrapper wrapper = new LambdaUpdateWrapper() + .set(Chart::getStatus, BiTaskStatusEnum.SUCCEED.getValue()) + .set(Chart::getExecMessage,null) + .set(Chart::getGenChart, biResponse.getGenChart()) + .set(Chart::getGenResult, biResponse.getGenResult()) + .eq(Chart::getId, biResponse.getChartId()); + boolean updateResult = this.update(wrapper); if (!updateResult){ log.error("更新图表[{}]结果失败", biResponse.getChartId()); }