引入MQ进行异步分析

This commit is contained in:
brian 2023-07-24 19:19:59 +08:00
parent 1d1d195a66
commit 00ebe067c1
11 changed files with 344 additions and 66 deletions

View File

@ -121,6 +121,11 @@
<artifactId>spring-boot-starter-test</artifactId> <artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<!--RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency> <dependency>
<groupId>com.yucongming</groupId> <groupId>com.yucongming</groupId>
<artifactId>yucongming-java-sdk</artifactId> <artifactId>yucongming-java-sdk</artifactId>

View File

@ -0,0 +1,100 @@
/*
* @(#)BiMessageProducer.java
*
* Copyright © 2023 YunPeng Corporation.
*/
package top.peng.answerbi.bizmq;
import com.rabbitmq.client.Channel;
import javax.annotation.Resource;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import top.peng.answerbi.common.ErrorCode;
import top.peng.answerbi.constant.BiConstant;
import top.peng.answerbi.exception.BusinessException;
import top.peng.answerbi.manager.AiManager;
import top.peng.answerbi.model.entity.Chart;
import top.peng.answerbi.model.enums.BiTaskStatusEnum;
import top.peng.answerbi.model.vo.BiResponse;
import top.peng.answerbi.service.ChartService;
import top.peng.answerbi.utils.bizutils.BiUtils;
/**
* BiMessageProducer
*
* @author yunpeng
* @version 1.0 2023/7/24
*/
@Component
@Slf4j
public class BiMessageConsumer {
@Resource
private ChartService chartService;
@Resource
private AiManager aiManager;
/**
* 接收消息
* @param message
* @param channel
* @param deliveryTag @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag 是一个方法参数注解用于从消息头中获取投递标签
* 在RabbitMQ中每条消息者都会被分配一个唯一的投递标签用于标识该消息在通道中的投递状态和顺序
*/
@SneakyThrows
@RabbitListener(queues = {BiMqConstant.BI_QUEUE_NAME}, ackMode = "MANUAL")
public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag){
log.info("receiveMessage message = {}", message);
if (StringUtils.isBlank(message)){
//如果更新失败未拿到chartId, 拒绝当前消息 让消息重新进入队列
channel.basicNack(deliveryTag, false, false);
throw new BusinessException(ErrorCode.SYSTEM_ERROR);
}
long chartId = Long.parseLong(message);
Chart chart = chartService.getById(chartId);
if (chart == null){
//如果更新失败未拿到chartId, 拒绝当前消息 让消息重新进入队列
channel.basicNack(deliveryTag, false, false);
throw new BusinessException(ErrorCode.NOT_FOUND_ERROR, "图表为空");
}
//先修改图表任务状态为执行中;
boolean updateRunningRes = chartService.updateChartStatus(chart.getId(), BiTaskStatusEnum.RUNNING.getValue(), null);
if (!updateRunningRes){
channel.basicNack(deliveryTag, false, false);
chartService.updateChartStatus(chart.getId(), BiTaskStatusEnum.FAILED.getValue(), "更新图表执行中状态失败");
throw new BusinessException(ErrorCode.NOT_FOUND_ERROR, "图表为空");
}
//调用AI
String aiResult = aiManager.doChat(BiConstant.BI_MODEL_ID, BiUtils.buildUserInputForAi(chart));
BiResponse biResponse;
try {
biResponse = aiManager.aiAnsToBiResp(aiResult);
} catch (BusinessException e) {
channel.basicNack(deliveryTag, false, false);
//执行失败状态修改为失败,记录任务失败信息
chartService.updateChartStatus(chart.getId(),BiTaskStatusEnum.FAILED.getValue(), e.getMessage());
throw e;
}
//执行成功后修改为已完成保存执行结果
biResponse.setChartId(chart.getId());
boolean updateSucceedRes = chartService.updateChartSucceedResult(biResponse);
if (!updateSucceedRes){
channel.basicNack(deliveryTag, false, false);
chartService.updateChartStatus(chart.getId(), BiTaskStatusEnum.FAILED.getValue(), "更新图表成功状态失败");
}
//收到确认消息的接收
//投递标签 deliveryTag 是一个数字标识它在消息消费者接收到消息之后用于向RabbitMq确认消息的处理状态
//通过将投递标签传递给channel.basicAck(deliveryTag,false)方法可以告知RabbitMQ该消息已经成功处理可以进行确认和从队列中删除
channel.basicAck(deliveryTag, false);
}
}

View File

@ -0,0 +1,26 @@
/*
* @(#)BiMessageProducer.java
*
* Copyright © 2023 YunPeng Corporation.
*/
package top.peng.answerbi.bizmq;
import javax.annotation.Resource;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
/**
* BiMessageProducer
*
* @author yunpeng
* @version 1.0 2023/7/24
*/
@Component
public class BiMessageProducer {
@Resource
private RabbitTemplate rabbitTemplate;
public void sendMessage(String message){
rabbitTemplate.convertAndSend(BiMqConstant.BI_EXCHANGE_NAME,BiMqConstant.BI_ROUTING_KEY,message);
}
}

View File

@ -0,0 +1,20 @@
/*
* @(#)BiMqConstant.java
*
* Copyright © 2023 YunPeng Corporation.
*/
package top.peng.answerbi.bizmq;
/**
* BiMqConstant
*
* @author yunpeng
* @version 1.0 2023/7/24
*/
public interface BiMqConstant {
String BI_EXCHANGE_NAME = "bi_exchange";
String BI_QUEUE_NAME = "bi_queue";
String BI_ROUTING_KEY = "bi_routingKey";
}

View File

@ -0,0 +1,37 @@
/*
* @(#)BiMqInitMain.java
*
* Copyright © 2023 YunPeng Corporation.
*/
package top.peng.answerbi.bizmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* BiMqInitMain 用于创建测试程序用到的交换机和队列只用在程序启动前执行一次
*
* @author yunpeng
* @version 1.0 2023/7/24
*/
public class BiMqInitMain {
public static void main(String[] args) {
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = BiMqConstant.BI_EXCHANGE_NAME;
channel.exchangeDeclare(exchangeName, "direct");
//创建队列
String queueName = BiMqConstant.BI_QUEUE_NAME;
channel.queueDeclare(queueName,true,false,false,null);
channel.queueBind(queueName, exchangeName, BiMqConstant.BI_ROUTING_KEY);
} catch (Exception e) {
e.printStackTrace();
}
}
}

View File

@ -2,11 +2,8 @@ package top.peng.answerbi.controller;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Resource; import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -21,6 +18,7 @@ import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile; import org.springframework.web.multipart.MultipartFile;
import top.peng.answerbi.annotation.AuthCheck; import top.peng.answerbi.annotation.AuthCheck;
import top.peng.answerbi.annotation.RedissonRateLimiter; import top.peng.answerbi.annotation.RedissonRateLimiter;
import top.peng.answerbi.bizmq.BiMessageProducer;
import top.peng.answerbi.common.CommonResponse; import top.peng.answerbi.common.CommonResponse;
import top.peng.answerbi.common.DeleteRequest; import top.peng.answerbi.common.DeleteRequest;
import top.peng.answerbi.common.ErrorCode; import top.peng.answerbi.common.ErrorCode;
@ -43,6 +41,7 @@ import top.peng.answerbi.service.ChartService;
import top.peng.answerbi.service.UserService; import top.peng.answerbi.service.UserService;
import top.peng.answerbi.utils.ExcelUtils; import top.peng.answerbi.utils.ExcelUtils;
import top.peng.answerbi.utils.ValidUtils; import top.peng.answerbi.utils.ValidUtils;
import top.peng.answerbi.utils.bizutils.BiUtils;
/** /**
* 图表接口 * 图表接口
@ -67,6 +66,9 @@ public class ChartController {
@Resource @Resource
private ThreadPoolExecutor threadPoolExecutor; private ThreadPoolExecutor threadPoolExecutor;
@Resource
private BiMessageProducer biMessageProducer;
// region 增删改查 // region 增删改查
/** /**
@ -242,9 +244,8 @@ public class ChartController {
GenChartByAiRequest genChartByAiRequest, HttpServletRequest request) { GenChartByAiRequest genChartByAiRequest, HttpServletRequest request) {
//生成参数 //生成参数
Map<String, Object> aiMsgAndChartData = genAiMsgAndChartData(genChartByAiRequest, multipartFile, request); Chart chart = preHandleGenChartRequest(genChartByAiRequest, multipartFile, request);
String userInput = (String) aiMsgAndChartData.get("userInput"); String userInput = BiUtils.buildUserInputForAi(chart);
Chart chart = (Chart) aiMsgAndChartData.get("chartObj");
//调用AI //调用AI
String aiResult = aiManager.doChat(BiConstant.BI_MODEL_ID, userInput); String aiResult = aiManager.doChat(BiConstant.BI_MODEL_ID, userInput);
@ -274,20 +275,18 @@ public class ChartController {
GenChartByAiRequest genChartByAiRequest, HttpServletRequest request) { GenChartByAiRequest genChartByAiRequest, HttpServletRequest request) {
//生成参数 //生成参数
Map<String, Object> aiMsgAndChartData = genAiMsgAndChartData(genChartByAiRequest, multipartFile, request); Chart chart = preHandleGenChartRequest(genChartByAiRequest, multipartFile, request);
String userInput = (String) aiMsgAndChartData.get("userInput"); String userInput = BiUtils.buildUserInputForAi(chart);
Chart chart = (Chart) aiMsgAndChartData.get("chartObj");
//先插入数据库, 状态为排队中 //先插入数据库, 状态为排队中
chart.setStatus(BiTaskStatusEnum.WAIT.getValue()); chart.setStatus(BiTaskStatusEnum.WAIT.getValue());
boolean saveResult = chartService.save(chart); boolean saveResult = chartService.save(chart);
ThrowUtils.throwIf(!saveResult, ErrorCode.SYSTEM_ERROR, "图表保存失败"); ThrowUtils.throwIf(!saveResult, ErrorCode.SYSTEM_ERROR, "图表保存失败");
AtomicReference<BiResponse> ResBiResponse = new AtomicReference<>(new BiResponse());
//创建线程任务 //创建线程任务
CompletableFuture.runAsync(() -> { CompletableFuture.runAsync(() -> {
//先修改图表任务状态为执行中; //先修改图表任务状态为执行中;
handleChartStatusUpdate(chart.getId(),BiTaskStatusEnum.RUNNING.getValue(), ""); chartService.updateChartStatus(chart.getId(),BiTaskStatusEnum.RUNNING.getValue(), "");
//调用AI //调用AI
String aiResult = aiManager.doChat(BiConstant.BI_MODEL_ID, userInput); String aiResult = aiManager.doChat(BiConstant.BI_MODEL_ID, userInput);
@ -296,27 +295,56 @@ public class ChartController {
biResponse = aiManager.aiAnsToBiResp(aiResult); biResponse = aiManager.aiAnsToBiResp(aiResult);
} catch (BusinessException e) { } catch (BusinessException e) {
//执行失败状态修改为失败,记录任务失败信息 //执行失败状态修改为失败,记录任务失败信息
handleChartStatusUpdate(chart.getId(),BiTaskStatusEnum.FAILED.getValue(), e.getMessage()); chartService.updateChartStatus(chart.getId(),BiTaskStatusEnum.FAILED.getValue(), e.getMessage());
throw e; throw e;
} }
//执行成功后修改为已完成保存执行结果 //执行成功后修改为已完成保存执行结果
biResponse.setChartId(chart.getId()); biResponse.setChartId(chart.getId());
handleChartSuccessStatusUpdate(biResponse); chartService.updateChartSucceedResult(biResponse);
ResBiResponse.set(biResponse);
}, threadPoolExecutor); }, threadPoolExecutor);
return ResultUtils.success(ResBiResponse.get()); BiResponse biResponse = new BiResponse();
biResponse.setChartId(chart.getId());
return ResultUtils.success(biResponse);
} }
/** /**
* 根据用户输入构建 要发送给AI的消息 要存入数据库的 Chart 对象 * 智能分析 (异步消息队列)
*
* @param multipartFile
* @param genChartByAiRequest
* @param request
* @return
*/
@PostMapping("/gen/async/mq")
@RedissonRateLimiter(qps = 1)
public CommonResponse<BiResponse> genChartByAiAsyncMq(@RequestPart("file") MultipartFile multipartFile,
GenChartByAiRequest genChartByAiRequest, HttpServletRequest request) {
//生成参数
Chart chart = preHandleGenChartRequest(genChartByAiRequest, multipartFile, request);
//先插入数据库, 状态为排队中
chart.setStatus(BiTaskStatusEnum.WAIT.getValue());
boolean saveResult = chartService.save(chart);
ThrowUtils.throwIf(!saveResult, ErrorCode.SYSTEM_ERROR, "图表保存失败");
biMessageProducer.sendMessage(String.valueOf(chart.getId()));
BiResponse biResponse = new BiResponse();
biResponse.setChartId(chart.getId());
return ResultUtils.success(biResponse);
}
/**
* 预处理请求 根据用户输入构建 要存入数据库的 Chart 对象
* *
* @param genChartByAiRequest * @param genChartByAiRequest
* @param multipartFile * @param multipartFile
* @param request * @param request
* @return * @return
*/ */
private Map<String,Object> genAiMsgAndChartData(GenChartByAiRequest genChartByAiRequest,MultipartFile multipartFile, HttpServletRequest request){ private Chart preHandleGenChartRequest(GenChartByAiRequest genChartByAiRequest,MultipartFile multipartFile, HttpServletRequest request){
//通过request对象拿到用户id(必须登录才能使用) //通过request对象拿到用户id(必须登录才能使用)
User loginUser = userService.getLoginUser(request); User loginUser = userService.getLoginUser(request);
@ -332,19 +360,7 @@ public class ChartController {
ThrowUtils.throwIf(StringUtils.isNotBlank(chartName) && chartName.length() > 100,ErrorCode.PARAMS_ERROR,"图表名称过长"); ThrowUtils.throwIf(StringUtils.isNotBlank(chartName) && chartName.length() > 100,ErrorCode.PARAMS_ERROR,"图表名称过长");
ValidUtils.validFile(multipartFile, 1, Arrays.asList("xls","xlsx")); ValidUtils.validFile(multipartFile, 1, Arrays.asList("xls","xlsx"));
//用户输入
StringBuilder userInput = new StringBuilder();
userInput.append("分析需求:").append("\n");
//拼接分析目标
String userGoal = goal;
if (StringUtils.isNotBlank(chartType)){
userGoal += ",请使用" + chartType;
}
userInput.append(userGoal).append("\n");
userInput.append("原始数据:").append("\n");
//压缩后的数据
String csvData = ExcelUtils.excelToCsv(multipartFile); String csvData = ExcelUtils.excelToCsv(multipartFile);
userInput.append(csvData).append("\n");
Chart chart = new Chart(); Chart chart = new Chart();
chart.setChartName(chartName); chart.setChartName(chartName);
@ -352,34 +368,7 @@ public class ChartController {
chart.setChartType(chartType); chart.setChartType(chartType);
chart.setChartData(csvData); chart.setChartData(csvData);
chart.setUserId(loginUser.getId()); chart.setUserId(loginUser.getId());
return chart;
Map<String, Object> result = new HashMap<>();
result.put("userInput",userInput.toString());
result.put("chartObj", chart);
return result;
}
private void handleChartStatusUpdate(long chartId,String status,String execMessage){
Chart updateChart = new Chart();
updateChart.setId(chartId);
updateChart.setStatus(status);
updateChart.setExecMessage(execMessage);
boolean updateResult = chartService.updateById(updateChart);
if (!updateResult){
log.error("更新图表[{}]状态失败", chartId);
}
}
private void handleChartSuccessStatusUpdate(BiResponse biResponse){
Chart updateChart = new Chart();
updateChart.setId(biResponse.getChartId());
updateChart.setStatus(BiTaskStatusEnum.SUCCEED.getValue());
updateChart.setGenChart(biResponse.getGenChart());
updateChart.setGenResult(biResponse.getGenResult());
boolean updateResult = chartService.updateById(updateChart);
if (!updateResult){
log.error("更新图表[{}]结果失败", biResponse.getChartId());
}
} }
} }

View File

@ -1,11 +1,10 @@
package top.peng.answerbi.service; package top.peng.answerbi.service;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import top.peng.answerbi.model.dto.chart.ChartQueryRequest;
import top.peng.answerbi.model.dto.post.PostQueryRequest;
import top.peng.answerbi.model.entity.Chart;
import com.baomidou.mybatisplus.extension.service.IService; import com.baomidou.mybatisplus.extension.service.IService;
import top.peng.answerbi.model.entity.Post; import top.peng.answerbi.model.dto.chart.ChartQueryRequest;
import top.peng.answerbi.model.entity.Chart;
import top.peng.answerbi.model.vo.BiResponse;
/** /**
* @author yunpeng.zhang * @author yunpeng.zhang
@ -14,6 +13,21 @@ import top.peng.answerbi.model.entity.Post;
*/ */
public interface ChartService extends IService<Chart> { public interface ChartService extends IService<Chart> {
/**
* 更新图表状态
* @param chartId
* @param status
* @param execMessage
*/
boolean updateChartStatus(long chartId,String status,String execMessage);
/**
* 更新图表生成成功结果
* @param biResponse
*/
boolean updateChartSucceedResult(BiResponse biResponse);
/** /**
* 获取查询条件 * 获取查询条件
* *

View File

@ -2,14 +2,18 @@ package top.peng.answerbi.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import top.peng.answerbi.constant.CommonConstant; import top.peng.answerbi.constant.CommonConstant;
import top.peng.answerbi.mapper.ChartMapper;
import top.peng.answerbi.model.dto.chart.ChartQueryRequest; import top.peng.answerbi.model.dto.chart.ChartQueryRequest;
import top.peng.answerbi.model.entity.Chart; import top.peng.answerbi.model.entity.Chart;
import top.peng.answerbi.model.enums.BiTaskStatusEnum;
import top.peng.answerbi.model.vo.BiResponse;
import top.peng.answerbi.service.ChartService; import top.peng.answerbi.service.ChartService;
import top.peng.answerbi.mapper.ChartMapper;
import org.springframework.stereotype.Service;
import top.peng.answerbi.utils.SqlUtils; import top.peng.answerbi.utils.SqlUtils;
/** /**
@ -18,9 +22,51 @@ import top.peng.answerbi.utils.SqlUtils;
* @createDate 2023-07-10 16:45:42 * @createDate 2023-07-10 16:45:42
*/ */
@Service @Service
@Slf4j
public class ChartServiceImpl extends ServiceImpl<ChartMapper, Chart> public class ChartServiceImpl extends ServiceImpl<ChartMapper, Chart>
implements ChartService{ implements ChartService{
/**
* 更新图表状态
*
* @param chartId
* @param status
* @param execMessage
*/
@Override
@Transactional(rollbackFor = { Exception.class })
public boolean updateChartStatus(long chartId, String status, String execMessage) {
Chart updateChart = new Chart();
updateChart.setId(chartId);
updateChart.setStatus(status);
updateChart.setExecMessage(execMessage);
boolean updateResult = this.updateById(updateChart);
if (!updateResult){
log.error("更新图表[{}]状态失败", chartId);
}
return updateResult;
}
/**
* 更新图表生成成功结果
*
* @param biResponse
*/
@Override
@Transactional(rollbackFor = { Exception.class })
public boolean updateChartSucceedResult(BiResponse biResponse) {
Chart updateChart = new Chart();
updateChart.setId(biResponse.getChartId());
updateChart.setStatus(BiTaskStatusEnum.SUCCEED.getValue());
updateChart.setGenChart(biResponse.getGenChart());
updateChart.setGenResult(biResponse.getGenResult());
boolean updateResult = this.updateById(updateChart);
if (!updateResult){
log.error("更新图表[{}]结果失败", biResponse.getChartId());
}
return updateResult;
}
/** /**
* 获取查询条件 * 获取查询条件
* *

View File

@ -6,13 +6,10 @@
package top.peng.answerbi.utils; package top.peng.answerbi.utils;
import cn.hutool.core.io.FileUtil; import cn.hutool.core.io.FileUtil;
import java.util.Arrays;
import java.util.List; import java.util.List;
import org.springframework.web.multipart.MultipartFile; import org.springframework.web.multipart.MultipartFile;
import top.peng.answerbi.common.ErrorCode; import top.peng.answerbi.common.ErrorCode;
import top.peng.answerbi.exception.BusinessException;
import top.peng.answerbi.exception.ThrowUtils; import top.peng.answerbi.exception.ThrowUtils;
import top.peng.answerbi.model.enums.FileUploadBizEnum;
/** /**
* ValidUtils 校验工具类 * ValidUtils 校验工具类

View File

@ -0,0 +1,39 @@
/*
* @(#)BiUtils.java
*
* Copyright © 2023 YunPeng Corporation.
*/
package top.peng.answerbi.utils.bizutils;
import org.apache.commons.lang3.StringUtils;
import top.peng.answerbi.model.entity.Chart;
/**
* BiUtils 业务工具
*
* @author yunpeng
* @version 1.0 2023/7/24
*/
public class BiUtils {
/**
* 构建AI可以识别的用户输入
*/
public static String buildUserInputForAi(Chart chart){
String goal = chart.getGoal();
String chartType = chart.getChartType();
String csvData = chart.getChartData();
//用户输入
StringBuilder userInput = new StringBuilder();
userInput.append("分析需求:").append("\n");
//拼接分析目标
String userGoal = goal;
if (StringUtils.isNotBlank(chartType)){
userGoal += ",请使用" + chartType;
}
userInput.append(userGoal).append("\n");
userInput.append("原始数据:").append("\n");
userInput.append(csvData).append("\n");
return userInput.toString();
}
}

View File

@ -39,6 +39,11 @@ spring:
multipart: multipart:
# 大小限制 # 大小限制
max-file-size: 10MB max-file-size: 10MB
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
server: server:
address: 0.0.0.0 address: 0.0.0.0
port: 8101 port: 8101