diff --git a/README.md b/README.md index 8f84fd9..27a7cd8 100644 --- a/README.md +++ b/README.md @@ -139,13 +139,14 @@ yarn run dev ## 后续计划 - [x] 使用死信队列处理异常情况,将图表生成任务置为失败 -- [x] 引入Guava RateLimiter(单机) 和 Redisson RateLimiter(分布式) 两种限流机制 +- [x] 引入Guava RateLimiter(单机) 和 Redisson RateLimiter(分布式) 两种限流机制 (在请求方法上添加注解即可限流,方便快捷) - [x] 支持用户对失败的图表进行手动重试 - [x] 引入redis缓存提高加载速度 - [x] 引入jasypt encryption 对配置文件加密、解密 -- [ ] 图表数据分表存储,提高查询灵活性和性能 -- [ ] 给任务执行增加 guava Retrying重试机制,保证系统可靠性 +- [x] 给任务执行增加 guava Retrying重试机制,保证系统可靠性 + (guava Retrying 要使用 AttemptTimeLimiters.fixedTimeLimit()设置固定时间的超时限制 时需要 保证 guava版本在22或22以下) - [ ] 定时任务把失败状态的图表放到队列中(补偿机制) - [ ] 给任务的执行增加超时时间,超时自动标记为失败(超时控制) +- [ ] 图表数据分表存储,提高查询灵活性和性能 - [ ] 任务执行结果通过websocket实时通知给用户 - [ ] 我的图表管理页增加一个刷新、定时刷新的按钮,保证获取到图表的最新状态(前端轮询) diff --git a/pom.xml b/pom.xml index 1583315..d5455d7 100644 --- a/pom.xml +++ b/pom.xml @@ -55,12 +55,24 @@ com.github.binarywang wx-java-mp-spring-boot-starter 4.4.0 + + + guava + com.google.guava + + com.github.xiaoymin knife4j-spring-boot-starter 3.0.3 + + + guava + com.google.guava + + @@ -77,7 +89,7 @@ com.google.code.gson gson - 2.9.1 + 2.9.0 @@ -89,7 +101,7 @@ cn.hutool hutool-all - 5.8.8 + 5.8.9 org.springframework.boot @@ -130,7 +142,7 @@ com.google.guava guava - 31.1-jre + 22.0 org.redisson @@ -143,6 +155,17 @@ jasypt-spring-boot-starter 3.0.5 + + com.github.rholder + guava-retrying + 2.0.0 + + + guava + com.google.guava + + + diff --git a/src/main/java/top/peng/answerbi/bizmq/BiMessageConsumer.java b/src/main/java/top/peng/answerbi/bizmq/BiMessageConsumer.java index 2237ce9..3fbfdcb 100644 --- a/src/main/java/top/peng/answerbi/bizmq/BiMessageConsumer.java +++ b/src/main/java/top/peng/answerbi/bizmq/BiMessageConsumer.java @@ -72,11 +72,10 @@ public class BiMessageConsumer { chartService.updateChartStatus(chart.getId(), BiTaskStatusEnum.FAILED.getValue(), "更新图表执行中状态失败"); throw new BusinessException(ErrorCode.NOT_FOUND_ERROR, "图表为空"); } - - //调用AI - String aiResult = aiManager.doChat(BiConstant.BI_MODEL_ID, BiUtils.buildUserInputForAi(chart)); BiResponse biResponse; try { + //调用AI + String aiResult = aiManager.doChat(BiConstant.BI_MODEL_ID, BiUtils.buildUserInputForAi(chart)); biResponse = aiManager.aiAnsToBiResp(aiResult); } catch (BusinessException e) { channel.basicNack(deliveryTag, false, false); diff --git a/src/main/java/top/peng/answerbi/common/ErrorCode.java b/src/main/java/top/peng/answerbi/common/ErrorCode.java index 8742a60..760fd5a 100644 --- a/src/main/java/top/peng/answerbi/common/ErrorCode.java +++ b/src/main/java/top/peng/answerbi/common/ErrorCode.java @@ -15,6 +15,7 @@ public enum ErrorCode { NOT_FOUND_ERROR(40400, "请求数据不存在"), TOO_MANY_REQUEST(42900,"请求过于频繁"), FORBIDDEN_ERROR(40300, "禁止访问"), + REQUEST_TIME_OUT(40300, "请求超时"), SYSTEM_ERROR(50000, "系统内部异常"), OPERATION_ERROR(50001, "操作失败"); diff --git a/src/main/java/top/peng/answerbi/controller/ChartController.java b/src/main/java/top/peng/answerbi/controller/ChartController.java index 6ff0734..2d0ec32 100644 --- a/src/main/java/top/peng/answerbi/controller/ChartController.java +++ b/src/main/java/top/peng/answerbi/controller/ChartController.java @@ -283,10 +283,12 @@ public class ChartController { boolean saveResult = chartService.save(chart); ThrowUtils.throwIf(!saveResult, ErrorCode.SYSTEM_ERROR, "图表保存失败"); + + //创建线程任务 CompletableFuture.runAsync(() -> { //先修改图表任务状态为“执行中”; - chartService.updateChartStatus(chart.getId(),BiTaskStatusEnum.RUNNING.getValue(), ""); + chartService.updateChartStatus(chart.getId(),BiTaskStatusEnum.RUNNING.getValue(), null); //调用AI String aiResult = aiManager.doChat(BiConstant.BI_MODEL_ID, userInput); @@ -295,13 +297,17 @@ public class ChartController { biResponse = aiManager.aiAnsToBiResp(aiResult); } catch (BusinessException e) { //执行失败,状态修改为“失败”,记录任务失败信息 - chartService.updateChartStatus(chart.getId(),BiTaskStatusEnum.FAILED.getValue(), e.getMessage()); + chartService.updateChartStatus(chart.getId(), BiTaskStatusEnum.FAILED.getValue(), "AI生成错误"); throw e; } //执行成功后,修改为“已完成”、保存执行结果 biResponse.setChartId(chart.getId()); chartService.updateChartSucceedResult(biResponse); - }, threadPoolExecutor); + }, threadPoolExecutor).exceptionally((e) -> { + log.error("AI生成错误 chartId = {} userId = {} error = {}", chart.getUserId(), chart.getUserId(), e.getMessage()); + chartService.updateChartStatus(chart.getId(), BiTaskStatusEnum.FAILED.getValue(), "AI生成错误"); + return null; + }); BiResponse biResponse = new BiResponse(); biResponse.setChartId(chart.getId()); diff --git a/src/main/java/top/peng/answerbi/listener/RetryLogListener.java b/src/main/java/top/peng/answerbi/listener/RetryLogListener.java new file mode 100644 index 0000000..81930e4 --- /dev/null +++ b/src/main/java/top/peng/answerbi/listener/RetryLogListener.java @@ -0,0 +1,44 @@ +/* + * @(#)RetryLogListener.java + * + * Copyright © 2023 YunPeng Corporation. + */ +package top.peng.answerbi.listener; + + +import com.github.rholder.retry.Attempt; +import com.github.rholder.retry.RetryListener; +import lombok.extern.slf4j.Slf4j; + +/** + * RetryLogListener 重试监听器 + * + * @author yunpeng + * @version 1.0 2023/7/31 + */ +@Slf4j +public class RetryLogListener implements RetryListener { + + @Override + public void onRetry(Attempt attempt) { + // 第几次重试,(注意:第一次重试其实是第一次调用) + log.info("===== get ai response retry time : [{}] =====", attempt.getAttemptNumber()); + + // 距离第一次重试的延迟 + log.info("retry delay : [{}]", attempt.getDelaySinceFirstAttempt()); + + // 重试结果: 是异常终止, 还是正常返回 + log.info("hasException={}", attempt.hasException()); + log.info("hasResult={}", attempt.hasResult()); + + // 是什么原因导致异常 + if (attempt.hasException()) { + log.info("causeBy={}" , attempt.getExceptionCause().toString()); + } else { + // 正常返回时的结果 + log.info("result={}" , attempt.getResult()); + } + + log.info("===== log listen over. ====="); + } +} diff --git a/src/main/java/top/peng/answerbi/manager/AiManager.java b/src/main/java/top/peng/answerbi/manager/AiManager.java index f9eeebc..a02f4cb 100644 --- a/src/main/java/top/peng/answerbi/manager/AiManager.java +++ b/src/main/java/top/peng/answerbi/manager/AiManager.java @@ -5,11 +5,15 @@ */ package top.peng.answerbi.manager; +import com.github.rholder.retry.RetryException; +import com.github.rholder.retry.Retryer; import com.yupi.yucongming.dev.client.YuCongMingClient; import com.yupi.yucongming.dev.common.BaseResponse; import com.yupi.yucongming.dev.model.DevChatRequest; import com.yupi.yucongming.dev.model.DevChatResponse; +import java.util.concurrent.ExecutionException; import javax.annotation.Resource; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import top.peng.answerbi.common.ErrorCode; import top.peng.answerbi.constant.BiConstant; @@ -24,11 +28,15 @@ import top.peng.answerbi.model.vo.BiResponse; * @version 1.0 2023/7/14 */ @Service +@Slf4j public class AiManager { @Resource private YuCongMingClient yuCongMingClient; + @Resource + private AiRetryerBuilder aiRetryerBuilder; + /** * AI 对话 * @@ -41,9 +49,15 @@ public class AiManager { devChatRequest.setModelId(modelId); devChatRequest.setMessage(message); - BaseResponse response = yuCongMingClient.doChat(devChatRequest); + Retryer> retryer = aiRetryerBuilder.build(); + BaseResponse response = null; + try { + response = retryer.call(() -> yuCongMingClient.doChat(devChatRequest)); + } catch (ExecutionException | RetryException e) { + log.error("调用AI重试 错误: {}", e.getMessage()); + } - ThrowUtils.throwIf(response == null, ErrorCode.SYSTEM_ERROR,"AI响应错误"); + ThrowUtils.throwIf(response == null || response.getData() == null , ErrorCode.SYSTEM_ERROR,"AI响应错误"); return response.getData().getContent(); } diff --git a/src/main/java/top/peng/answerbi/manager/AiRetryerBuilder.java b/src/main/java/top/peng/answerbi/manager/AiRetryerBuilder.java new file mode 100644 index 0000000..86e7b52 --- /dev/null +++ b/src/main/java/top/peng/answerbi/manager/AiRetryerBuilder.java @@ -0,0 +1,50 @@ +/* + * @(#)GuavaRetryingConfig.java + * + * Copyright © 2023 YunPeng Corporation. + */ +package top.peng.answerbi.manager; + +import com.github.rholder.retry.AttemptTimeLimiters; +import com.github.rholder.retry.Retryer; +import com.github.rholder.retry.RetryerBuilder; +import com.github.rholder.retry.StopStrategies; +import com.github.rholder.retry.WaitStrategies; +import com.google.common.base.Predicates; +import com.yupi.yucongming.dev.common.BaseResponse; +import com.yupi.yucongming.dev.model.DevChatResponse; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import org.springframework.stereotype.Component; +import top.peng.answerbi.listener.RetryLogListener; +import top.peng.answerbi.utils.SpinBlockStrategy; + +/** + * BiRetryerBuilder Bi智能分析业务重试机制 + * + * @author yunpeng + * @version 1.0 2023/7/31 + */ +@Component +public class AiRetryerBuilder { + + public Retryer> build(){ + return RetryerBuilder.>newBuilder() + .retryIfResult(Predicates.isNull()) + //发生IO异常时重试 + .retryIfExceptionOfType(IOException.class) + //发生运行时异常时重试 + .retryIfRuntimeException() + //重试策略 递增等待时长策略(降频重试) 依次在失败后的第5s、15s进行降频重试。 + .withWaitStrategy(WaitStrategies.incrementingWait(5, TimeUnit.SECONDS,5,TimeUnit.SECONDS)) + //最多执行3次(首次执行+最多重试2次) + .withStopStrategy(StopStrategies.stopAfterAttempt(3)) + //超时限制 超时则中断执行,继续重试。 + .withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(90,TimeUnit.SECONDS)) + //自定义阻塞策略:自旋锁 + .withBlockStrategy(new SpinBlockStrategy()) + //重试监听器 + .withRetryListener(new RetryLogListener()) + .build(); + } +} diff --git a/src/main/java/top/peng/answerbi/utils/SpinBlockStrategy.java b/src/main/java/top/peng/answerbi/utils/SpinBlockStrategy.java new file mode 100644 index 0000000..e45c8c6 --- /dev/null +++ b/src/main/java/top/peng/answerbi/utils/SpinBlockStrategy.java @@ -0,0 +1,42 @@ +/* + * @(#)SpinBlockStrategy.java + * + * Copyright © 2023 YunPeng Corporation. + */ +package top.peng.answerbi.utils; + + +import com.github.rholder.retry.BlockStrategy; +import java.time.Duration; +import java.time.LocalDateTime; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +/** + * SpinBlockStrategy 自旋锁的实现, 不响应线程中断 + * + * @author yunpeng + * @version 1.0 2023/7/31 + */ +@Slf4j +@NoArgsConstructor +public class SpinBlockStrategy implements BlockStrategy { + + @Override + public void block(long sleepTime) { + LocalDateTime startTime = LocalDateTime.now(); + + long start = System.currentTimeMillis(); + long end = start; + log.info("[SpinBlockStrategy]...begin wait."); + + while (end - start <= sleepTime) { + end = System.currentTimeMillis(); + } + + //使用Java8新增的Duration计算时间间隔 + Duration duration = Duration.between(startTime, LocalDateTime.now()); + + log.info("[SpinBlockStrategy]...end wait.duration={}", duration.toMillis()); + } +}