Commit 5435092e authored by jiangyz's avatar jiangyz

代码提交

parent da58786b
package com.infoepoch.pms.agent.controller.request;
/**
* 精准关爱智能体流式会话请求。
*
* @param sessionId 会话唯一标识,用于串联短会话上下文
* @param message 用户发送给精准关爱智能体的自然语言内容
*/
public record CareAgentChatRequest(String sessionId, String message) {
}
package com.infoepoch.pms.agent.domain.care.log;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.Collection;
import java.util.Map;
import java.util.StringJoiner;
import java.util.UUID;
/**
* 精准关爱日志辅助
*/
public final class CareTraceLogSupport {
private static final int MAX_PREVIEW_LENGTH = 60;
private CareTraceLogSupport() {
}
/**
* 生成单次请求唯一 traceId。
*/
public static String newTraceId() {
return UUID.randomUUID().toString().replace("-", "");
}
/**
* 统一拼接日志前缀。
*/
public static String format(String traceId, String sessionId, String stage, String detail) {
return "[链路ID=%s] [会话ID=%s] [阶段=%s] %s".formatted(
safeValue(traceId),
safeValue(sessionId),
safeValue(stage),
safeValue(detail)
);
}
/**
* 统一拼接完整模型请求日志,保留原始换行便于排查 prompt 结构。
*/
public static String formatModelPrompt(String traceId, String sessionId, String stage, String prompt) {
return "[链路ID=%s] [会话ID=%s] [阶段=%s] 模型请求内容:%n%s".formatted(
safeValue(traceId),
safeValue(sessionId),
safeValue(stage),
prompt == null ? "" : prompt
);
}
/**
* 统一拼接完整模型返回日志,保留原始换行便于排查响应结构。
*/
public static String formatModelResponse(String traceId, String sessionId, String stage, String response) {
return "[链路ID=%s] [会话ID=%s] [阶段=%s] 模型返回内容:%n%s".formatted(
safeValue(traceId),
safeValue(sessionId),
safeValue(stage),
response == null ? "" : response
);
}
/**
* 统一拼接流式模型分片日志,便于查看模型逐块返回的原始内容。
*/
public static String formatModelStreamChunk(String traceId, String sessionId, String stage, int index, String chunk) {
return "[链路ID=%s] [会话ID=%s] [阶段=%s] chunkIndex=%s, chunkLength=%s, chunk内容:%n%s".formatted(
safeValue(traceId),
safeValue(sessionId),
safeValue(stage),
index,
chunk == null ? 0 : chunk.length(),
chunk == null ? "" : chunk
);
}
/**
* 返回 prompt 的原始长度,便于判断上下文体积。
*/
public static int promptLength(String prompt) {
return prompt == null ? 0 : prompt.length();
}
/**
* 生成安全的消息摘要。
*/
public static String safeMessageSummary(String message) {
if (!StringUtils.hasText(message)) {
return "长度=0";
}
String normalized = normalizeText(message);
return "长度=%d 摘要=%s".formatted(message.length(), abbreviate(normalized));
}
/**
* 生成安全的条件摘要。
*/
public static String safeConditionsSummary(Map<String, Object> conditions) {
if (conditions == null || conditions.isEmpty()) {
return "空";
}
StringJoiner joiner = new StringJoiner(", ", "{", "}");
conditions.forEach((key, value) -> joiner.add(key + "=" + summarizeValue(value)));
return joiner.toString();
}
/**
* 生成安全的文本摘要。
*/
public static String safeText(String text) {
if (!StringUtils.hasText(text)) {
return "";
}
return abbreviate(normalizeText(text));
}
/**
* 生成安全的集合条数摘要。
*/
public static String safeCount(String label, Collection<?> values) {
int size = CollectionUtils.isEmpty(values) ? 0 : values.size();
return label + "=" + size;
}
/**
* 兼容多种对象值的安全摘要。
*/
private static String summarizeValue(Object value) {
if (value == null) {
return "空";
}
if (value instanceof Collection<?> collection) {
return "数量=" + collection.size();
}
if (value instanceof Map<?, ?> map) {
return "数量=" + map.size();
}
return abbreviate(normalizeText(value.toString()));
}
/**
* 归一化文本中的连续空白。
*/
private static String normalizeText(String text) {
return text.replaceAll("\\s+", " ").trim();
}
/**
* 截断过长文本,避免日志膨胀。
*/
private static String abbreviate(String text) {
if (!StringUtils.hasText(text) || text.length() <= MAX_PREVIEW_LENGTH) {
return text;
}
return text.substring(0, MAX_PREVIEW_LENGTH) + "...";
}
/**
* 将空值统一显示为短横线。
*/
private static String safeValue(String value) {
return StringUtils.hasText(value) ? value : "-";
}
}
package com.infoepoch.pms.agent.domain.care.model;
/**
* 活动匹配用户规则。
*
* @param activityKey 活动关键词,用于匹配活动语义
* @param sex 性别编码
* @param minAge 最小年龄
* @param maxAge 最大年龄
* @param bmi BMI 分类
* @param hobby 兴趣爱好,多个值使用英文逗号拼接
* @param specialty 特长标签,多个值使用英文逗号拼接
*/
public record ActivityMatchUserRule(
String activityKey,
String sex,
Integer minAge,
Integer maxAge,
String bmi,
String hobby,
String specialty
) {
public ActivityMatchUserRule {
activityKey = trim(activityKey);
sex = trim(sex);
bmi = trim(bmi);
hobby = trim(hobby);
specialty = trim(specialty);
}
private static String trim(String value) {
return value == null ? "" : value.trim();
}
}
package com.infoepoch.pms.agent.domain.care.model;
import java.util.Collections;
import java.util.List;
/**
* 活动摘要。
*
* @param activityId 活动唯一标识,用于推荐结果去重和续轮定位
* @param activityName 活动名称,供模型理解和客户端展示
* @param activityType 活动类型,如体育、健康、节日等
* @param suitableFor 活动适合人群描述,由上游接口或摘要规则提供
* @param tags 活动标签列表,用于补充活动主题特征
* @param location 活动地点或举办区域
* @param timeWindow 活动时间范围,通常为开始时间到结束时间
* @param summary 活动摘要信息,包含描述、主办方、管理员、规模等补充内容
*/
public record ActivitySummary(
String activityId,
String activityName,
String activityType,
String suitableFor,
List<String> tags,
String location,
String timeWindow,
String summary
) {
public ActivitySummary {
activityId = activityId == null ? "" : activityId.trim();
activityName = activityName == null ? "" : activityName.trim();
activityType = activityType == null ? "" : activityType.trim();
suitableFor = suitableFor == null ? "" : suitableFor.trim();
location = location == null ? "" : location.trim();
timeWindow = timeWindow == null ? "" : timeWindow.trim();
summary = summary == null ? "" : summary.trim();
tags = tags == null ? Collections.emptyList() : List.copyOf(tags);
}
}
package com.infoepoch.pms.agent.domain.care.model;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
/**
* 会话短记忆。
*
* @param sessionId 会话唯一标识,由调用方透传
* @param taskType 当前会话对应的任务类型
* @param requestedCount 当前轮次希望返回的推荐数量
* @param activityIntent 当前会话识别出的活动语义或活动名称
* @param preferenceSummary 当前会话识别出的偏好摘要
* @param searchConditions 当前会话累计的筛选条件
* @param nextPageNo 推荐用户场景下下一次继续检索的分页页码
* @param deliveredIds 当前会话已经下发给客户端的活动或用户 ID 集合
* @param updatedAt 会话状态最后更新时间
*/
public record CareConversationState(
String sessionId,
CareTaskType taskType,
int requestedCount,
String activityIntent,
String preferenceSummary,
Map<String, Object> searchConditions,
int nextPageNo,
Set<String> deliveredIds,
LocalDateTime updatedAt
) {
public CareConversationState {
searchConditions = sanitize(searchConditions);
deliveredIds = deliveredIds == null ? Collections.emptySet() : Collections.unmodifiableSet(new LinkedHashSet<>(deliveredIds));
activityIntent = activityIntent == null ? "" : activityIntent.trim();
preferenceSummary = preferenceSummary == null ? "" : preferenceSummary.trim();
updatedAt = updatedAt == null ? LocalDateTime.now() : updatedAt;
}
/**
* 基于当前查询创建一份初始会话状态。
*/
public static CareConversationState initial(String sessionId, CareQuery query) {
return new CareConversationState(
sessionId,
query.taskType(),
query.requestedCount(),
query.activityIntent(),
query.preferenceSummary(),
query.searchConditions(),
1,
Collections.emptySet(),
LocalDateTime.now()
);
}
/**
* 推进会话游标,并累计本轮已经下发的推荐对象。
*/
public CareConversationState advance(int nextPageNo, Collection<String> newDeliveredIds) {
LinkedHashSet<String> ids = new LinkedHashSet<>(deliveredIds);
if (newDeliveredIds != null) {
ids.addAll(newDeliveredIds);
}
return new CareConversationState(
sessionId,
taskType,
requestedCount,
activityIntent,
preferenceSummary,
searchConditions,
nextPageNo,
ids,
LocalDateTime.now()
);
}
/**
* 使用新的理解结果覆盖当前会话的任务语义,并重置分页与已下发记录。
*/
public CareConversationState rewriteFromQuery(CareQuery query) {
return new CareConversationState(
sessionId,
query.taskType(),
query.requestedCount(),
query.activityIntent(),
query.preferenceSummary(),
query.searchConditions(),
1,
Collections.emptySet(),
LocalDateTime.now()
);
}
/**
* 清洗筛选条件,仅保留非空键值对。
*/
private static Map<String, Object> sanitize(Map<String, Object> searchConditions) {
if (searchConditions == null || searchConditions.isEmpty()) {
return Collections.emptyMap();
}
LinkedHashMap<String, Object> sanitized = new LinkedHashMap<>();
searchConditions.forEach((key, value) -> {
if (key != null && value != null) {
sanitized.put(key, value);
}
});
return Map.copyOf(sanitized);
}
}
package com.infoepoch.pms.agent.domain.care.model;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* 用户问题理解结果。
*
* @param taskType 当前输入归属的任务类型
* @param requestedCount 本轮希望返回的推荐数量
* @param continuation 当前输入是否属于“再来一些”等续轮请求
* @param originalMessage 用户本轮原始输入
* @param activityIntent 从输入中识别出的活动语义或活动名称
* @param preferenceSummary 从输入中提取的偏好摘要
* @param searchConditions 可直接用于工具检索的筛选条件
* @param directReply 预留的直接回复内容
*/
public record CareQuery(
CareTaskType taskType,
int requestedCount,
boolean continuation,
String originalMessage,
String activityIntent,
String preferenceSummary,
Map<String, Object> searchConditions,
String directReply
) {
public CareQuery {
searchConditions = sanitize(searchConditions);
activityIntent = activityIntent == null ? "" : activityIntent.trim();
preferenceSummary = preferenceSummary == null ? "" : preferenceSummary.trim();
directReply = directReply == null ? "" : directReply.trim();
}
/**
* 清洗筛选条件,仅保留非空键值对。
*/
private static Map<String, Object> sanitize(Map<String, Object> searchConditions) {
if (searchConditions == null || searchConditions.isEmpty()) {
return Collections.emptyMap();
}
LinkedHashMap<String, Object> sanitized = new LinkedHashMap<>();
searchConditions.forEach((key, value) -> {
if (key != null && value != null) {
sanitized.put(key, value);
}
});
return Map.copyOf(sanitized);
}
}
package com.infoepoch.pms.agent.domain.care.model;
/**
* 智能体任务类型
*/
public enum CareTaskType {
/** 为当前用户推荐适合参加的活动。 */
RECOMMEND_ACTIVITIES,
/** 根据活动语义推荐适合参加的用户。 */
RECOMMEND_USERS,
/** 处理自然对话场景。 */
GENERAL_CHAT;
/**
* 判断当前任务是否属于推荐链路。
*/
public boolean isRecommendation() {
return this == RECOMMEND_ACTIVITIES || this == RECOMMEND_USERS;
}
}
package com.infoepoch.pms.agent.domain.care.model;
import java.util.Collections;
import java.util.List;
/**
* 分页结果。
*
* @param records 当前页记录列表
* @param total 当前查询命中的总记录数
* @param hasNext 是否存在下一页数据
*/
public record PagedResult<T>(
List<T> records,
long total,
boolean hasNext
) {
public PagedResult {
records = records == null ? Collections.emptyList() : List.copyOf(records);
}
}
package com.infoepoch.pms.agent.domain.care.model;
import java.util.Collections;
import java.util.List;
/**
* 用户画像摘要。
*
* @param userId 用户唯一标识,用于推荐结果去重和续轮定位
* @param userName 用户名称,供模型理解和客户端展示
* @param age 用户年龄
* @param gender 用户性别
* @param interests 用户兴趣爱好列表
* @param tags 用户标签列表,如特长、画像标签等
* @param region 用户所属区域、部门或组织归属
* @param summary 用户补充摘要,通常包含健康指标或其他辅助说明
*/
public record UserProfileSummary(
String userId,
String userName,
Integer age,
String gender,
List<String> interests,
List<String> tags,
String region,
String summary
) {
public UserProfileSummary {
userId = userId == null ? "" : userId.trim();
userName = userName == null ? "" : userName.trim();
gender = gender == null ? "" : gender.trim();
region = region == null ? "" : region.trim();
summary = summary == null ? "" : summary.trim();
interests = interests == null ? Collections.emptyList() : List.copyOf(interests);
tags = tags == null ? Collections.emptyList() : List.copyOf(tags);
}
}
package com.infoepoch.pms.agent.domain.care.orchestrator;
import com.infoepoch.pms.agent.common.LogHelper;
import com.infoepoch.pms.agent.domain.care.log.CareTraceLogSupport;
import com.infoepoch.pms.agent.domain.care.model.CareConversationState;
import com.infoepoch.pms.agent.domain.care.model.CareQuery;
import com.infoepoch.pms.agent.domain.care.model.CareTaskType;
import com.infoepoch.pms.agent.domain.care.state.CareConversationStateService;
import com.infoepoch.pms.agent.domain.care.understanding.CareQueryUnderstandingService;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
/**
* 推荐编排入口
*/
@Component
public class CareRecommendationOrchestrator {
private final CareQueryUnderstandingService understandingService;
private final CareConversationStateService conversationStateService;
private final ActivityRecommendationExecutor activityRecommendationExecutor;
private final UserRecommendationExecutor userRecommendationExecutor;
private final GeneralConversationExecutor generalConversationExecutor;
public CareRecommendationOrchestrator(CareQueryUnderstandingService understandingService,
CareConversationStateService conversationStateService,
ActivityRecommendationExecutor activityRecommendationExecutor,
UserRecommendationExecutor userRecommendationExecutor,
GeneralConversationExecutor generalConversationExecutor) {
this.understandingService = understandingService;
this.conversationStateService = conversationStateService;
this.activityRecommendationExecutor = activityRecommendationExecutor;
this.userRecommendationExecutor = userRecommendationExecutor;
this.generalConversationExecutor = generalConversationExecutor;
}
/**
* 统一编排推荐请求:理解用户意图后分发到具体执行器。
*/
public Flux<String> stream(String traceId, String sessionId, String message) {
CareConversationState currentState = conversationStateService.load(sessionId).orElse(null);
CareQuery query = understandingService.understand(traceId, sessionId, message, currentState);
LogHelper.info(this, CareTraceLogSupport.format(
traceId,
sessionId,
"路由分发结果",
"任务类型=%s,是否续轮=%s".formatted(
toChineseTaskType(query.taskType()),
query.continuation()
)
));
if (query.taskType() == CareTaskType.GENERAL_CHAT) {
return generalConversationExecutor.execute(traceId, sessionId, query);
}
if (query.taskType() == CareTaskType.RECOMMEND_USERS) {
return userRecommendationExecutor.execute(traceId, sessionId, query, currentState);
}
return activityRecommendationExecutor.execute(traceId, sessionId, query, currentState);
}
/**
* 将任务类型转换成中文展示。
*/
private String toChineseTaskType(CareTaskType taskType) {
if (taskType == null) {
return "未知";
}
return switch (taskType) {
case RECOMMEND_ACTIVITIES -> "推荐活动";
case RECOMMEND_USERS -> "推荐用户";
case GENERAL_CHAT -> "通用对话";
};
}
}
package com.infoepoch.pms.agent.domain.care.orchestrator;
import com.alibaba.cloud.ai.graph.agent.ReactAgent;
import com.alibaba.cloud.ai.graph.streaming.OutputType;
import com.alibaba.cloud.ai.graph.streaming.StreamingOutput;
import com.infoepoch.pms.agent.common.LogHelper;
import com.infoepoch.pms.agent.domain.care.log.CareTraceLogSupport;
import com.infoepoch.pms.agent.domain.care.model.CareQuery;
import com.infoepoch.pms.agent.domain.care.stream.CareStreamingResponseAssembler;
import org.springframework.ai.chat.messages.AbstractMessage;
import org.springframework.ai.chat.messages.AssistantMessage;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 通用对话执行器
*/
@Component
public class GeneralConversationExecutor {
private final CareStreamingResponseAssembler responseAssembler;
private final ReactAgent careAgent;
public GeneralConversationExecutor(CareStreamingResponseAssembler responseAssembler,
@Qualifier("CareAgent") ReactAgent careAgent) {
this.responseAssembler = responseAssembler;
this.careAgent = careAgent;
}
/**
* 执行通用对话。
*/
public Flux<String> execute(String traceId, String sessionId, CareQuery query) {
String prompt = query.originalMessage();
LogHelper.info(this, CareTraceLogSupport.format(
traceId,
sessionId,
"通用对话开始",
"用户输入=%s".formatted(CareTraceLogSupport.safeMessageSummary(query.originalMessage()))
));
LogHelper.info(this, CareTraceLogSupport.format(
traceId,
sessionId,
"模型调用开始",
"场景=通用对话,prompt长度=%s,提示词摘要=%s".formatted(
CareTraceLogSupport.promptLength(prompt),
CareTraceLogSupport.safeMessageSummary(prompt)
)
));
LogHelper.info(this, CareTraceLogSupport.formatModelPrompt(
traceId,
sessionId,
"通用对话模型请求",
prompt
));
try {
StringBuilder responseBuffer = new StringBuilder();
AtomicInteger chunkIndex = new AtomicInteger(0);
return careAgent.stream(prompt)
.filter(StreamingOutput.class::isInstance)
.map(StreamingOutput.class::cast)
.filter(output -> output.getOutputType() == OutputType.AGENT_MODEL_STREAMING)
.map(StreamingOutput::message)
.filter(AssistantMessage.class::isInstance)
.map(AssistantMessage.class::cast)
.map(AbstractMessage::getText)
.filter(StringUtils::hasText)
.switchIfEmpty(Flux.defer(() -> {
LogHelper.info(this, CareTraceLogSupport.format(
traceId,
sessionId,
"通用对话兜底",
"模型未返回有效内容"
));
return Flux.just(responseAssembler.buildGeneralFallbackReply());
}))
.doOnComplete(() -> {
LogHelper.info(this, CareTraceLogSupport.formatModelResponse(
traceId,
sessionId,
"通用对话模型完整返回",
responseBuffer.toString()
));
LogHelper.info(this, CareTraceLogSupport.format(
traceId,
sessionId,
"通用对话完成",
"通用对话处理完成"
));
})
.doOnError(error -> LogHelper.error(this, CareTraceLogSupport.format(
traceId,
sessionId,
"流式请求异常",
"通用对话失败,异常=" + CareTraceLogSupport.safeText(error.getMessage()) +
",已接收chunk数=%s".formatted(chunkIndex.get())
), error));
} catch (Exception e) {
LogHelper.error(this, CareTraceLogSupport.format(
traceId,
sessionId,
"流式请求异常",
"通用对话立即失败,异常=" + CareTraceLogSupport.safeText(e.getMessage())
), e);
return Flux.error(new IllegalArgumentException("调用模型失败"));
}
}
}
package com.infoepoch.pms.agent.domain.care.orchestrator;
import com.infoepoch.pms.agent.common.LogHelper;
import com.infoepoch.pms.agent.domain.care.log.CareTraceLogSupport;
import com.infoepoch.pms.agent.domain.care.model.CareConversationState;
import com.infoepoch.pms.agent.domain.care.model.CareQuery;
import com.infoepoch.pms.agent.domain.care.model.PagedResult;
import com.infoepoch.pms.agent.domain.care.model.UserProfileSummary;
import com.infoepoch.pms.agent.domain.care.state.CareConversationStateService;
import com.infoepoch.pms.agent.domain.care.stream.CareStreamingResponseAssembler;
import com.infoepoch.pms.agent.properties.CareAgentProperties;
import com.infoepoch.pms.agent.tool.union.care.UserSearchTool;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
/**
* 推荐用户执行器
*/
@Component
public class UserRecommendationExecutor {
private final UserSearchTool userSearchTool;
private final CareConversationStateService conversationStateService;
private final CareStreamingResponseAssembler responseAssembler;
private final CareAgentProperties properties;
public UserRecommendationExecutor(UserSearchTool userSearchTool,
CareConversationStateService conversationStateService,
CareStreamingResponseAssembler responseAssembler,
CareAgentProperties properties) {
this.userSearchTool = userSearchTool;
this.conversationStateService = conversationStateService;
this.responseAssembler = responseAssembler;
this.properties = properties;
}
/**
* 执行用户推荐:按页拉取、去重、累计并持续输出。
*/
public Flux<String> execute(String traceId, String sessionId, CareQuery query, CareConversationState state) {
LogHelper.info(this, CareTraceLogSupport.format(
traceId,
sessionId,
"用户推荐开始",
"目标数量=%s,是否续轮=%s,筛选条件=%s".formatted(
query.requestedCount(),
query.continuation(),
CareTraceLogSupport.safeConditionsSummary(query.searchConditions())
)
));
try {
UserRecommendationPlan plan = prepareUserRecommendationPlan(traceId, sessionId, query, state);
Duration itemDelay = Duration.ofMillis(Math.max(0, properties.getUserStreamItemDelayMillis()));
CareConversationState currentState = state == null
? CareConversationState.initial(sessionId, query)
: state.rewriteFromQuery(query);
CareConversationState nextState = currentState.advance(plan.nextPageNo(), plan.deliveredIds());
return Flux.concat(
Flux.just(responseAssembler.buildUserIntro(query)),
buildDelayedUserItemsFlux(plan.userChunks(), itemDelay),
buildClosingFlux(
responseAssembler.buildUserClosing(plan.emittedCount(), query.requestedCount()),
itemDelay,
!plan.userChunks().isEmpty()
)
)
.doOnComplete(() -> {
conversationStateService.save(nextState);
LogHelper.info(this, CareTraceLogSupport.format(
traceId,
sessionId,
"用户推荐完成",
"输出数量=%s,累计已下发=%s,下一页=%s,结束原因=%s".formatted(
plan.emittedCount(),
plan.deliveredIds().size(),
plan.nextPageNo(),
plan.stopReason()
)
));
})
.doOnError(error -> LogHelper.error(this, CareTraceLogSupport.format(
traceId,
sessionId,
"流式请求异常",
"用户推荐失败,异常=" + CareTraceLogSupport.safeText(error.getMessage())
), error));
} catch (Exception exception) {
LogHelper.error(this, CareTraceLogSupport.format(
traceId,
sessionId,
"流式请求异常",
"用户推荐失败,异常=" + CareTraceLogSupport.safeText(exception.getMessage())
), exception);
return Flux.error(new IllegalArgumentException("用户推荐过程中出现异常"));
}
}
/**
* 同步完成分页检索与去重,生成本轮输出计划。
*/
private UserRecommendationPlan prepareUserRecommendationPlan(String traceId,
String sessionId,
CareQuery query,
CareConversationState state) {
int nextPageNo = state != null && query.continuation() ? state.nextPageNo() : 1;
LinkedHashSet<String> deliveredIds = state == null
? new LinkedHashSet<>()
: new LinkedHashSet<>(state.deliveredIds());
List<String> userChunks = new ArrayList<>();
int emittedCount = 0;
String stopReason = "正常完成";
while (emittedCount < query.requestedCount()) {
PagedResult<UserProfileSummary> page = userSearchTool.execute(
traceId,
sessionId,
query.searchConditions(),
nextPageNo,
properties.getUserPageSize()
);
if (page.records().isEmpty()) {
LogHelper.info(this, CareTraceLogSupport.format(
traceId,
sessionId,
"用户分页已加载",
"页码=%s,每页数量=%s,返回数量=0,总数=%s".formatted(
nextPageNo,
properties.getUserPageSize(),
page.total()
)
));
stopReason = "空页结束";
break;
}
List<UserProfileSummary> freshUsers = page.records().stream()
.filter(user -> !deliveredIds.contains(user.userId()))
.limit(query.requestedCount() - emittedCount)
.toList();
LogHelper.info(this, CareTraceLogSupport.format(
traceId,
sessionId,
"用户分页已加载",
"页码=%s,每页数量=%s,返回数量=%s,去重后新增=%s,总数=%s,输出前累计=%s".formatted(
nextPageNo,
properties.getUserPageSize(),
page.records().size(),
freshUsers.size(),
page.total(),
emittedCount
)
));
userChunks.addAll(responseAssembler.buildUserItems(freshUsers, emittedCount + 1, query));
for (UserProfileSummary user : freshUsers) {
deliveredIds.add(user.userId());
}
emittedCount += freshUsers.size();
nextPageNo++;
if (emittedCount >= query.requestedCount()) {
stopReason = "达到目标数量";
break;
}
if (!page.hasNext()) {
stopReason = "无下一页";
break;
}
}
return new UserRecommendationPlan(userChunks, emittedCount, nextPageNo, deliveredIds, stopReason);
}
/**
* 将用户推荐条目转换成逐条延迟输出的假流式。
*/
private Flux<String> buildDelayedUserItemsFlux(List<String> userChunks, Duration itemDelay) {
if (userChunks.isEmpty()) {
return Flux.empty();
}
if (itemDelay.isZero() || itemDelay.isNegative()) {
return Flux.fromIterable(userChunks);
}
return Flux.fromIterable(userChunks)
.concatMap(chunk -> Mono.just(chunk).delayElement(itemDelay));
}
/**
* 生成收尾文案,若已输出用户则沿用同样节奏延后显示。
*/
private Flux<String> buildClosingFlux(String closing, Duration itemDelay, boolean hasUserChunks) {
if (!hasUserChunks || itemDelay.isZero() || itemDelay.isNegative()) {
return Flux.just(closing);
}
return Mono.just(closing)
.delaySubscription(itemDelay)
.flux();
}
private record UserRecommendationPlan(
List<String> userChunks,
int emittedCount,
int nextPageNo,
LinkedHashSet<String> deliveredIds,
String stopReason
) {
}
}
package com.infoepoch.pms.agent.domain.care.state;
import com.infoepoch.pms.agent.common.LogHelper;
import com.infoepoch.pms.agent.config.JsonUtils;
import com.infoepoch.pms.agent.domain.care.model.CareConversationState;
import com.infoepoch.pms.agent.properties.CareAgentProperties;
import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.time.Duration;
import java.util.Optional;
/**
* 会话状态缓存
*/
@Service
@RequiredArgsConstructor
public class CareConversationStateService {
private static final String CARE_STATE_KEY_PREFIX = "care:state:";
private final RedisTemplate<String, Object> redisTemplate;
private final CareAgentProperties properties;
/**
* 按 sessionId 读取短会话状态。
*/
public Optional<CareConversationState> load(String sessionId) {
if (!StringUtils.hasText(sessionId)) {
return Optional.empty();
}
Object cached = redisTemplate.opsForValue().get(buildKey(sessionId));
if (cached == null) {
return Optional.empty();
}
String cachedType = cached.getClass().getName();
try {
String json = JsonUtils.objectToJson(cached);
if (!StringUtils.hasText(json)) {
LogHelper.error(this, "读取会话状态转换失败,sessionId={},cachedType={},原因=对象转JSON为空", sessionId, cachedType);
return Optional.empty();
}
CareConversationState state = JsonUtils.jsonToObject(json, CareConversationState.class);
return Optional.ofNullable(state);
} catch (RuntimeException exception) {
LogHelper.error(
this,
"读取会话状态转换失败,sessionId={},cachedType={},异常={}",
sessionId,
cachedType,
exception.getMessage()
);
return Optional.empty();
}
}
/**
* 写入短会话状态并刷新 TTL。
*/
public void save(CareConversationState state) {
if (state == null || !StringUtils.hasText(state.sessionId())) {
return;
}
redisTemplate.opsForValue().set(
buildKey(state.sessionId()),
state,
Duration.ofMinutes(properties.getConversationTtlMinutes())
);
}
/**
* 生成 Redis key。
*/
private String buildKey(String sessionId) {
return CARE_STATE_KEY_PREFIX + sessionId;
}
}
package com.infoepoch.pms.agent.domain.care.stream;
import com.infoepoch.pms.agent.domain.care.model.CareQuery;
import com.infoepoch.pms.agent.domain.care.model.CareTaskType;
import com.infoepoch.pms.agent.domain.care.model.UserProfileSummary;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.StringJoiner;
/**
* 用户可见文案组装
*/
@Component
public class CareStreamingResponseAssembler {
/**
* 将直接回复转换成可直接流式返回的文本片段。
*/
public List<String> directReplyChunks(String message) {
return List.of(ensureTrailingLineBreak(message));
}
/**
* 生成通用对话场景的兜底回复。
*/
public String buildGeneralFallbackReply() {
return """
你好,我是精准关爱智能体。
请换一种说法,或者补充更多信息后再试一次。
""";
}
/**
* 生成活动推荐场景的标题和开场文案。
*/
public String buildActivityIntro(CareQuery query) {
if (query.continuation()) {
return """
### 活动推荐
我继续为你补充更匹配的活动:
""";
}
return """
### 活动推荐
已为你整理出以下更匹配的活动:
""";
}
/**
* 生成活动推荐场景的收尾文案。
*/
public String buildActivityClosing(int deliveredCount, int requestedCount) {
if (deliveredCount <= 0) {
return "暂时没有查询到可推荐的活动,建议稍后重试或补充更具体的偏好条件。\n";
}
if (deliveredCount < requestedCount) {
return "\n目前先为你整理到这些更匹配的活动,如需继续收窄偏好,也可以直接告诉我。\n";
}
return "\n如需继续推荐更多活动,可以直接说“再来一些”。\n";
}
/**
* 生成用户推荐场景的标题和开场文案。
*/
public String buildUserIntro(CareQuery query) {
if (query.continuation()) {
return """
### 用户推荐
我继续为你补充更匹配的用户:
""";
}
if (StringUtils.hasText(query.activityIntent())) {
return """
### 用户推荐
已为你整理出以下和“%s”更匹配的用户:
""".formatted(query.activityIntent());
}
return """
### 用户推荐
已为你整理出以下更匹配的用户:
""";
}
/**
* 把用户列表转换成连续编号的 Markdown 展示文案。
*/
public List<String> buildUserItems(List<UserProfileSummary> users, int startIndex, CareQuery query) {
List<String> lines = new ArrayList<>();
int counter = startIndex;
for (UserProfileSummary user : users) {
lines.add(formatUserLine(counter++, user, query));
}
return lines;
}
/**
* 生成用户推荐场景的收尾文案。
*/
public String buildUserClosing(int deliveredCount, int requestedCount) {
if (deliveredCount <= 0) {
return "暂时没有整理到符合条件的用户,建议补充更具体的活动描述或筛选条件。\n";
}
if (deliveredCount < requestedCount) {
return "\n目前先为你整理到这些更匹配的用户,如需继续收窄条件,也可以直接告诉我。\n";
}
return "\n如需继续补充更多用户,可以直接说“再来一些”。\n";
}
/**
* 生成活动候选为空时的兜底提示。
*/
public String buildNoActivityDataMessage() {
return "暂时没有查询到可推荐的活动,建议稍后重试或补充更具体的偏好条件。\n";
}
/**
* 生成按任务类型区分的通用错误文案。
*/
public String buildGenericErrorMessage(CareTaskType taskType) {
if (taskType == CareTaskType.RECOMMEND_USERS) {
return "用户推荐过程中出现异常,请稍后重试。\n";
}
if (taskType == CareTaskType.RECOMMEND_ACTIVITIES) {
return "活动推荐过程中出现异常,请稍后重试。\n";
}
return "智能体回复过程中出现异常,请稍后重试。\n";
}
/**
* 生成活动推荐条目为空时的模板化兜底提示。
*/
public String buildNoActivityTemplate(CareQuery query) {
return buildActivityIntro(query) + buildNoActivityDataMessage();
}
/**
* 将单个用户格式化成 Markdown 推荐条目。
*/
private String formatUserLine(int index, UserProfileSummary user, CareQuery query) {
StringBuilder builder = new StringBuilder();
builder.append(index)
.append(". **")
.append(resolveDisplayName(user))
.append("**\n");
String basicInfo = buildUserBasicInfo(user);
if (StringUtils.hasText(basicInfo)) {
builder.append(" 基本信息:").append(basicInfo).append('\n');
}
builder.append(" 匹配理由:").append(buildUserReason(user, query)).append('\n');
if (StringUtils.hasText(user.summary())) {
builder.append(" 补充说明:").append(user.summary()).append('\n');
}
builder.append('\n');
return builder.toString();
}
/**
* 生成用户展示名称。
*/
private String resolveDisplayName(UserProfileSummary user) {
if (StringUtils.hasText(user.userName())) {
return user.userName();
}
if (StringUtils.hasText(user.userId())) {
return "用户" + user.userId();
}
return "未命名用户";
}
/**
* 汇总用户基础信息为一行展示文本。
*/
private String buildUserBasicInfo(UserProfileSummary user) {
StringJoiner joiner = new StringJoiner(" / ");
if (user.age() != null) {
joiner.add(user.age() + "岁");
}
if (StringUtils.hasText(user.gender())) {
joiner.add(user.gender());
}
if (StringUtils.hasText(user.region())) {
joiner.add(user.region());
}
return joiner.toString();
}
/**
* 生成用户推荐条目的匹配理由。
*/
private String buildUserReason(UserProfileSummary user, CareQuery query) {
List<String> reasonParts = new ArrayList<>();
String target = StringUtils.hasText(query.activityIntent())
? "“" + query.activityIntent() + "”"
: "当前活动需求";
reasonParts.add("结合" + target + "的需求,该用户与当前筛选方向较为匹配");
if (!CollectionUtils.isEmpty(user.interests())) {
reasonParts.add("兴趣偏好为" + String.join("、", user.interests()));
}
if (!CollectionUtils.isEmpty(user.tags())) {
reasonParts.add("特征标签为" + String.join("、", user.tags()));
} else if (StringUtils.hasText(user.region())) {
reasonParts.add("所在区域为" + user.region());
}
return String.join(",", reasonParts) + "。";
}
/**
* 确保返回给前端的片段以换行结束,便于连续阅读。
*/
private String ensureTrailingLineBreak(String message) {
if (!StringUtils.hasText(message)) {
return "\n";
}
return message.endsWith("\n") ? message : message + "\n";
}
}
package com.infoepoch.pms.agent.properties;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* 精准关爱智能体参数
*/
@Getter
@Setter
@ConfigurationProperties(prefix = "pms.care.agent")
public class CareAgentProperties {
private int defaultCount = 10;
private int maxRequestedCount = 500;
private int userPageSize = 100;
private int activityPageSize = 50;
private int activityCandidateLimit = 40;
private long userStreamItemDelayMillis = 250;
private long conversationTtlMinutes = 120;
}
package com.infoepoch.pms.agent.properties;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* 精准关爱业务接口配置
*/
@Getter
@Setter
@ConfigurationProperties(prefix = "pms.care.business")
public class CareBusinessProperties {
private String baseUrl;
private String currentUserPath;
private String eligibleActivitiesPath;
private String userSearchPath;
private String activityMatchUserRulesPath;
}
package com.infoepoch.pms.agent.tool.union.care;
import com.infoepoch.pms.agent.common.LogHelper;
import com.infoepoch.pms.agent.domain.care.log.CareTraceLogSupport;
import com.infoepoch.pms.agent.domain.care.model.UserProfileSummary;
import com.infoepoch.pms.agent.tool.union.care.provider.CareBusinessDataProvider;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
/**
* 当前用户信息工具
*/
@Component
@RequiredArgsConstructor
public class CurrentUserProfileTool {
private final CareBusinessDataProvider careBusinessDataProvider;
/**
* 查询当前会话绑定用户的基础画像信息。
*/
public UserProfileSummary execute(String traceId, String sessionId) {
LogHelper.info(this, CareTraceLogSupport.format(
traceId,
sessionId,
"当前用户工具分发",
"调用当前用户信息工具"
));
return careBusinessDataProvider.getCurrentUserProfile(traceId, sessionId);
}
}
package com.infoepoch.pms.agent.tool.union.care;
import com.infoepoch.pms.agent.common.LogHelper;
import com.infoepoch.pms.agent.domain.care.log.CareTraceLogSupport;
import com.infoepoch.pms.agent.domain.care.model.ActivitySummary;
import com.infoepoch.pms.agent.tool.union.care.provider.CareBusinessDataProvider;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* 可参加活动查询工具
*/
@Component
@RequiredArgsConstructor
public class EligibleActivitiesTool {
private final CareBusinessDataProvider careBusinessDataProvider;
/**
* 根据会话用户一次性查询可推荐活动列表。
*/
public List<ActivitySummary> execute(String traceId, String sessionId) {
LogHelper.info(this, CareTraceLogSupport.format(
traceId,
sessionId,
"活动工具分发",
"调用可推荐活动工具"
));
return careBusinessDataProvider.searchEligibleActivities(traceId, sessionId);
}
}
package com.infoepoch.pms.agent.tool.union.care;
import com.infoepoch.pms.agent.common.LogHelper;
import com.infoepoch.pms.agent.domain.care.log.CareTraceLogSupport;
import com.infoepoch.pms.agent.domain.care.model.PagedResult;
import com.infoepoch.pms.agent.domain.care.model.UserProfileSummary;
import com.infoepoch.pms.agent.tool.union.care.provider.CareBusinessDataProvider;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* 用户检索工具
*/
@Component
@RequiredArgsConstructor
public class UserSearchTool {
private final CareBusinessDataProvider careBusinessDataProvider;
/**
* 按接口文档支持的筛选条件分页检索用户。
*/
public PagedResult<UserProfileSummary> execute(String traceId,
String sessionId,
Map<String, Object> conditions,
int pageNo,
int pageSize) {
LogHelper.info(this, CareTraceLogSupport.format(
traceId,
sessionId,
"用户检索工具分发",
"页码=%s,每页数量=%s,筛选条件=%s".formatted(
pageNo,
pageSize,
CareTraceLogSupport.safeConditionsSummary(conditions)
)
));
return careBusinessDataProvider.searchUsersByConditions(traceId, sessionId, conditions, pageNo, pageSize);
}
}
package com.infoepoch.pms.agent.tool.union.care.provider;
import com.infoepoch.pms.agent.domain.care.model.ActivitySummary;
import com.infoepoch.pms.agent.domain.care.model.ActivityMatchUserRule;
import com.infoepoch.pms.agent.domain.care.model.PagedResult;
import com.infoepoch.pms.agent.domain.care.model.UserProfileSummary;
import java.util.List;
import java.util.Map;
/**
* 精准关爱业务数据提供者
*/
public interface CareBusinessDataProvider {
UserProfileSummary getCurrentUserProfile(String traceId, String sessionId);
List<ActivitySummary> searchEligibleActivities(String traceId, String sessionId);
List<ActivityMatchUserRule> listActivityMatchUserRules(String traceId, String sessionId);
PagedResult<UserProfileSummary> searchUsersByConditions(String traceId,
String sessionId,
Map<String, Object> conditions,
int pageNo,
int pageSize);
}
package com.infoepoch.pms.agent.domain.care.orchestrator;
import com.alibaba.cloud.ai.graph.agent.ReactAgent;
import com.infoepoch.pms.agent.domain.care.model.ActivitySummary;
import com.infoepoch.pms.agent.domain.care.model.CareConversationState;
import com.infoepoch.pms.agent.domain.care.model.CareQuery;
import com.infoepoch.pms.agent.domain.care.model.CareTaskType;
import com.infoepoch.pms.agent.domain.care.model.UserProfileSummary;
import com.infoepoch.pms.agent.domain.care.state.CareConversationStateService;
import com.infoepoch.pms.agent.domain.care.stream.CareStreamingResponseAssembler;
import com.infoepoch.pms.agent.properties.CareAgentProperties;
import com.infoepoch.pms.agent.tool.union.care.CurrentUserProfileTool;
import com.infoepoch.pms.agent.tool.union.care.EligibleActivitiesTool;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import reactor.core.publisher.Flux;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class ActivityRecommendationExecutorTest {
@Mock
private CurrentUserProfileTool currentUserProfileTool;
@Mock
private EligibleActivitiesTool eligibleActivitiesTool;
@Mock
private CareConversationStateService conversationStateService;
@Mock
private ReactAgent careAgent;
private ActivityRecommendationExecutor executor;
@BeforeEach
void setUp() {
CareAgentProperties properties = new CareAgentProperties();
properties.setActivityCandidateLimit(4);
executor = new ActivityRecommendationExecutor(
currentUserProfileTool,
eligibleActivitiesTool,
conversationStateService,
new CareStreamingResponseAssembler(),
properties,
careAgent
);
}
@Test
void shouldSaveOnlyDisplayedActivitiesAfterSuccessfulCompletion() throws Exception {
CareQuery query = new CareQuery(
CareTaskType.RECOMMEND_ACTIVITIES,
2,
false,
"推荐活动",
"",
"推荐活动",
java.util.Map.of(),
""
);
CareConversationState state = new CareConversationState(
"session-1",
CareTaskType.RECOMMEND_ACTIVITIES,
2,
"",
"推荐活动",
java.util.Map.of(),
1,
Set.of("old-1"),
LocalDateTime.now()
);
when(currentUserProfileTool.execute("trace-1", "session-1"))
.thenReturn(new UserProfileSummary("u-1", "张三", 68, "男", List.of("健步"), List.of("活跃"), "浦东", ""));
when(eligibleActivitiesTool.execute("trace-1", "session-1"))
.thenReturn(List.of(
activity("a-1"),
activity("a-2"),
activity("a-3"),
activity("a-4")
));
when(careAgent.stream(anyString())).thenReturn((Flux) Flux.empty());
executor.execute("trace-1", "session-1", query, state).collectList().block();
ArgumentCaptor<CareConversationState> captor = ArgumentCaptor.forClass(CareConversationState.class);
verify(conversationStateService).save(captor.capture());
assertEquals(Set.of("old-1", "a-1", "a-2"), captor.getValue().deliveredIds());
}
@Test
void shouldNotSaveDeliveredActivitiesWhenStreamingFails() throws Exception {
CareQuery query = new CareQuery(
CareTaskType.RECOMMEND_ACTIVITIES,
2,
false,
"推荐活动",
"",
"推荐活动",
java.util.Map.of(),
""
);
when(currentUserProfileTool.execute("trace-1", "session-1"))
.thenReturn(new UserProfileSummary("u-1", "张三", 68, "男", List.of(), List.of(), "浦东", ""));
when(eligibleActivitiesTool.execute("trace-1", "session-1"))
.thenReturn(List.of(activity("a-1"), activity("a-2"), activity("a-3")));
when(careAgent.stream(anyString())).thenReturn((Flux) Flux.error(new RuntimeException("boom")));
assertThrows(RuntimeException.class,
() -> executor.execute("trace-1", "session-1", query, null).collectList().block());
verify(conversationStateService, never()).save(org.mockito.ArgumentMatchers.any());
}
private ActivitySummary activity(String id) {
return new ActivitySummary(id, "活动" + id, "运动", "老人", List.of("户外"), "浦东", "周三", "摘要");
}
}
package com.infoepoch.pms.agent.domain.care.orchestrator;
import com.infoepoch.pms.agent.domain.care.stream.CareStreamingResponseAssembler;
import org.junit.jupiter.api.Test;
import java.lang.reflect.Method;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
class GeneralConversationExecutorTest {
@Test
void shouldBuildPromptWithOnlyRoleAndUserMessage() throws Exception {
GeneralConversationExecutor executor = new GeneralConversationExecutor(
new CareStreamingResponseAssembler(),
null
);
Method method = GeneralConversationExecutor.class.getDeclaredMethod("buildPrompt", String.class);
method.setAccessible(true);
String prompt = (String) method.invoke(executor, "活动历史有哪些");
assertTrue(prompt.contains("请以精准关爱智能体的身份回复用户。"));
assertTrue(prompt.contains("用户输入:活动历史有哪些"));
assertFalse(prompt.contains("你只有以下三个技能"));
assertFalse(prompt.contains("有限通用对话"));
assertFalse(prompt.contains("暂无推荐"));
assertFalse(prompt.contains("不扩展成百科问答或开放世界知识回答"));
assertFalse(prompt.contains("引导用户改问"));
}
}
package com.infoepoch.pms.agent.domain.care.orchestrator;
import com.infoepoch.pms.agent.domain.care.model.CareConversationState;
import com.infoepoch.pms.agent.domain.care.model.CareQuery;
import com.infoepoch.pms.agent.domain.care.model.CareTaskType;
import com.infoepoch.pms.agent.domain.care.model.PagedResult;
import com.infoepoch.pms.agent.domain.care.model.UserProfileSummary;
import com.infoepoch.pms.agent.domain.care.state.CareConversationStateService;
import com.infoepoch.pms.agent.domain.care.stream.CareStreamingResponseAssembler;
import com.infoepoch.pms.agent.properties.CareAgentProperties;
import com.infoepoch.pms.agent.tool.union.care.UserSearchTool;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import reactor.test.StepVerifier;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class UserRecommendationExecutorTest {
@Mock
private UserSearchTool userSearchTool;
@Mock
private CareConversationStateService conversationStateService;
private UserRecommendationExecutor executor;
@BeforeEach
void setUp() {
CareAgentProperties properties = new CareAgentProperties();
properties.setUserPageSize(100);
properties.setUserStreamItemDelayMillis(250);
executor = new UserRecommendationExecutor(
userSearchTool,
conversationStateService,
new CareStreamingResponseAssembler(),
properties
);
}
@Test
void shouldKeepDifferentUsersWhenNameMatchesButMobileDiffers() {
CareQuery query = new CareQuery(
CareTaskType.RECOMMEND_USERS,
2,
false,
"推荐用户",
"健步走",
"推荐用户",
Map.of(),
""
);
when(userSearchTool.execute(eq("trace-1"), eq("session-1"), anyMap(), eq(1), anyInt()))
.thenReturn(new PagedResult<>(
List.of(
user("user:zhang-138", "张三", "13800000000"),
user("user:zhang-139", "张三", "13900000000")
),
2,
false
));
StepVerifier.withVirtualTime(() -> executor.execute("trace-1", "session-1", query, null))
.expectSubscription()
.expectNextMatches(chunk -> chunk.contains("### 用户推荐"))
.expectNoEvent(Duration.ofMillis(249))
.thenAwait(Duration.ofMillis(1))
.expectNextMatches(chunk -> chunk.contains("1. **张三**"))
.expectNoEvent(Duration.ofMillis(249))
.thenAwait(Duration.ofMillis(1))
.expectNextMatches(chunk -> chunk.contains("2. **张三**"))
.expectNoEvent(Duration.ofMillis(249))
.thenAwait(Duration.ofMillis(1))
.expectNextMatches(chunk -> chunk.contains("如需继续补充更多用户"))
.verifyComplete();
ArgumentCaptor<CareConversationState> captor = ArgumentCaptor.forClass(CareConversationState.class);
verify(conversationStateService).save(captor.capture());
assertEquals(Set.of("user:zhang-138", "user:zhang-139"), captor.getValue().deliveredIds());
}
@Test
void shouldDeduplicateUsersWhenNameAndMobileBothMatch() {
CareQuery query = new CareQuery(
CareTaskType.RECOMMEND_USERS,
2,
true,
"再来一些",
"健步走",
"推荐用户",
Map.of(),
""
);
CareConversationState state = new CareConversationState(
"session-1",
CareTaskType.RECOMMEND_USERS,
2,
"健步走",
"推荐用户",
Map.of(),
2,
Set.of("user:zhang-138"),
LocalDateTime.now()
);
when(userSearchTool.execute(eq("trace-1"), eq("session-1"), anyMap(), eq(2), anyInt()))
.thenReturn(new PagedResult<>(
List.of(
user("user:zhang-138", "张三", "13800000000"),
user("user:li-139", "李四", "13900000000")
),
2,
false
));
StepVerifier.withVirtualTime(() -> executor.execute("trace-1", "session-1", query, state))
.expectSubscription()
.expectNextMatches(chunk -> chunk.contains("### 用户推荐"))
.expectNoEvent(Duration.ofMillis(249))
.thenAwait(Duration.ofMillis(1))
.expectNextMatches(chunk -> chunk.contains("1. **李四**"))
.expectNoEvent(Duration.ofMillis(249))
.thenAwait(Duration.ofMillis(1))
.expectNextMatches(chunk -> chunk.contains("目前先为你整理到这些更匹配的用户"))
.verifyComplete();
ArgumentCaptor<CareConversationState> captor = ArgumentCaptor.forClass(CareConversationState.class);
verify(conversationStateService).save(captor.capture());
assertEquals(Set.of("user:zhang-138", "user:li-139"), captor.getValue().deliveredIds());
}
private UserProfileSummary user(String userId, String name, String mobile) {
return new UserProfileSummary(
userId,
name,
65,
"男",
List.of(),
List.of(),
"部门A",
"手机号:" + mobile
);
}
}
package com.infoepoch.pms.agent.domain.care.state;
import com.infoepoch.pms.agent.domain.care.model.CareConversationState;
import com.infoepoch.pms.agent.domain.care.model.CareQuery;
import com.infoepoch.pms.agent.domain.care.model.CareTaskType;
import com.infoepoch.pms.agent.domain.care.understanding.CareQueryUnderstandingService;
import com.infoepoch.pms.agent.properties.CareAgentProperties;
import com.infoepoch.pms.agent.tool.union.care.provider.CareBusinessDataProvider;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.ai.chat.model.ChatModel;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import java.time.LocalDateTime;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class CareConversationStateServiceTest {
@Mock
private RedisTemplate<String, Object> redisTemplate;
@Mock
private ValueOperations<String, Object> valueOperations;
private CareConversationStateService service;
@BeforeEach
void setUp() {
CareAgentProperties properties = new CareAgentProperties();
properties.setConversationTtlMinutes(120);
when(redisTemplate.opsForValue()).thenReturn(valueOperations);
service = new CareConversationStateService(redisTemplate, properties);
}
@Test
void shouldLoadTypedConversationStateDirectly() {
CareConversationState expectedState = createState();
when(valueOperations.get("care:state:session-1")).thenReturn(expectedState);
Optional<CareConversationState> loaded = service.load("session-1");
assertTrue(loaded.isPresent());
assertEquals(expectedState, loaded.orElseThrow());
}
@Test
void shouldConvertLinkedHashMapToConversationState() {
CareConversationState expectedState = createState();
when(valueOperations.get("care:state:session-1")).thenReturn(toCachedMap(expectedState));
Optional<CareConversationState> loaded = service.load("session-1");
assertTrue(loaded.isPresent());
CareConversationState actual = loaded.orElseThrow();
assertEquals(expectedState.sessionId(), actual.sessionId());
assertEquals(expectedState.taskType(), actual.taskType());
assertEquals(expectedState.requestedCount(), actual.requestedCount());
assertEquals(expectedState.activityIntent(), actual.activityIntent());
assertEquals(expectedState.preferenceSummary(), actual.preferenceSummary());
assertEquals(expectedState.searchConditions(), actual.searchConditions());
assertEquals(expectedState.nextPageNo(), actual.nextPageNo());
assertEquals(expectedState.deliveredIds(), actual.deliveredIds());
assertEquals(expectedState.updatedAt(), actual.updatedAt());
}
@Test
void shouldReturnEmptyWhenCachedObjectCannotConvertToConversationState() {
LinkedHashMap<String, Object> invalidState = new LinkedHashMap<>();
invalidState.put("sessionId", "session-1");
invalidState.put("taskType", "RECOMMEND_USERS");
invalidState.put("updatedAt", "not-a-date");
when(valueOperations.get("care:state:session-1")).thenReturn(invalidState);
Optional<CareConversationState> loaded = service.load("session-1");
assertFalse(loaded.isPresent());
}
@Test
void shouldReuseConvertedStateForContinuationQuery() {
when(valueOperations.get("care:state:session-1")).thenReturn(toCachedMap(createState()));
Optional<CareConversationState> loaded = service.load("session-1");
ChatModel chatModel = mock(ChatModel.class);
CareBusinessDataProvider dataProvider = mock(CareBusinessDataProvider.class);
CareAgentProperties properties = new CareAgentProperties();
properties.setDefaultCount(10);
properties.setMaxRequestedCount(500);
CareQueryUnderstandingService understandingService =
new CareQueryUnderstandingService(chatModel, properties, dataProvider);
CareQuery query = understandingService.understand("trace-1", "session-1", "再来一些", loaded.orElse(null));
assertTrue(loaded.isPresent());
assertTrue(query.continuation());
assertEquals(CareTaskType.RECOMMEND_USERS, query.taskType());
assertEquals("健步走活动", query.activityIntent());
assertEquals(Map.of("regionName", "浦东新区"), query.searchConditions());
verify(chatModel, never()).call(org.mockito.ArgumentMatchers.anyString());
}
private CareConversationState createState() {
return new CareConversationState(
"session-1",
CareTaskType.RECOMMEND_USERS,
25,
"健步走活动",
"推荐更多适合健步走的用户",
Map.of("regionName", "浦东新区"),
2,
Set.of("u-1", "u-2"),
LocalDateTime.of(2026, 4, 8, 14, 38, 5)
);
}
private Map<String, Object> toCachedMap(CareConversationState state) {
LinkedHashMap<String, Object> cached = new LinkedHashMap<>();
cached.put("sessionId", state.sessionId());
cached.put("taskType", state.taskType().name());
cached.put("requestedCount", state.requestedCount());
cached.put("activityIntent", state.activityIntent());
cached.put("preferenceSummary", state.preferenceSummary());
cached.put("searchConditions", new LinkedHashMap<>(state.searchConditions()));
cached.put("nextPageNo", state.nextPageNo());
cached.put("deliveredIds", List.copyOf(state.deliveredIds()));
cached.put("updatedAt", state.updatedAt().toString());
return cached;
}
}
package com.infoepoch.pms.agent.tool.union.care.provider;
import com.infoepoch.pms.agent.domain.care.model.PagedResult;
import com.infoepoch.pms.agent.domain.care.model.UserProfileSummary;
import com.infoepoch.pms.agent.properties.CareBusinessProperties;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
class HttpCareBusinessDataProviderTest {
private HttpServer server;
@AfterEach
void tearDown() {
if (server != null) {
server.stop(0);
}
}
@Test
void shouldDifferentiateUsersWithSameMobileButDifferentNames() throws IOException {
HttpCareBusinessDataProvider provider = createProvider("""
{
"code": 1,
"msg": "ok",
"data": {
"totalCount": 2,
"dataList": [
{ "name": "张三", "mobile": "13800000000", "departmentName": "一部" },
{ "name": "李四", "mobile": "13800000000", "departmentName": "二部" }
]
}
}
""");
PagedResult<UserProfileSummary> result = provider.searchUsersByConditions(
"trace-1",
"session-1",
Map.of(),
1,
10
);
assertEquals(2, result.records().size());
assertNotEquals(result.records().get(0).userId(), result.records().get(1).userId());
}
@Test
void shouldIgnoreDepartmentWhenMobileIsBlank() throws IOException {
HttpCareBusinessDataProvider provider = createProvider("""
{
"code": 1,
"msg": "ok",
"data": {
"totalCount": 2,
"dataList": [
{ "name": "张三", "mobile": "", "departmentName": "一部" },
{ "name": "张三", "mobile": "", "departmentName": "二部" }
]
}
}
""");
PagedResult<UserProfileSummary> result = provider.searchUsersByConditions(
"trace-1",
"session-1",
Map.of(),
1,
10
);
assertEquals(2, result.records().size());
assertEquals(result.records().get(0).userId(), result.records().get(1).userId());
}
@Test
void shouldDifferentiateUsersWithSameNameButDifferentMobiles() throws IOException {
HttpCareBusinessDataProvider provider = createProvider("""
{
"code": 1,
"msg": "ok",
"data": {
"totalCount": 2,
"dataList": [
{ "name": "张三", "mobile": "13800000000", "departmentName": "一部" },
{ "name": "张三", "mobile": "13900000000", "departmentName": "一部" }
]
}
}
""");
PagedResult<UserProfileSummary> result = provider.searchUsersByConditions(
"trace-1",
"session-1",
Map.of(),
1,
10
);
assertEquals(2, result.records().size());
assertNotEquals(result.records().get(0).userId(), result.records().get(1).userId());
}
private HttpCareBusinessDataProvider createProvider(String responseBody) throws IOException {
server = HttpServer.create(new InetSocketAddress(0), 0);
server.createContext("/union-js/api/functionCallTools/getUserInfoList", exchange -> writeJsonResponse(exchange, responseBody));
server.start();
CareBusinessProperties properties = new CareBusinessProperties();
properties.setBaseUrl("http://127.0.0.1:" + server.getAddress().getPort());
properties.setUserSearchPath("/union-js/api/functionCallTools/getUserInfoList");
return new HttpCareBusinessDataProvider(properties);
}
private void writeJsonResponse(HttpExchange exchange, String responseBody) throws IOException {
byte[] body = responseBody.getBytes(StandardCharsets.UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/json; charset=UTF-8");
exchange.sendResponseHeaders(200, body.length);
try (OutputStream outputStream = exchange.getResponseBody()) {
outputStream.write(body);
} finally {
exchange.close();
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment