Commit 4f8cf0b6 authored by jiangyz's avatar jiangyz

日志

parent 5435092e
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
<parent> <parent>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId> <artifactId>spring-boot-starter-parent</artifactId>
<version>3.5.8</version> <version>3.5.13</version>
<relativePath/> <relativePath/>
</parent> </parent>
<groupId>com.infoepoch.pms</groupId> <groupId>com.infoepoch.pms</groupId>
...@@ -15,7 +15,9 @@ ...@@ -15,7 +15,9 @@
<description>pms-agent</description> <description>pms-agent</description>
<properties> <properties>
<java.version>17</java.version> <java.version>21</java.version>
<lombok.version>1.18.38</lombok.version>
<redisson.version>3.45.0</redisson.version>
</properties> </properties>
<dependencyManagement> <dependencyManagement>
<dependencies> <dependencies>
...@@ -47,6 +49,10 @@ ...@@ -47,6 +49,10 @@
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId> <artifactId>spring-boot-starter-web</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId> <artifactId>spring-boot-starter-tomcat</artifactId>
...@@ -72,8 +78,14 @@ ...@@ -72,8 +78,14 @@
<dependency> <dependency>
<groupId>org.projectlombok</groupId> <groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId> <artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<optional>true</optional> <optional>true</optional>
</dependency> </dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>${redisson.version}</version>
</dependency>
<dependency> <dependency>
<groupId>com.jcraft</groupId> <groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId> <artifactId>jsch</artifactId>
...@@ -152,6 +164,7 @@ ...@@ -152,6 +164,7 @@
<path> <path>
<groupId>org.projectlombok</groupId> <groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId> <artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</path> </path>
</annotationProcessorPaths> </annotationProcessorPaths>
</configuration> </configuration>
......
---
name: spring-ai-alibaba-hooks-interceptors
description: Build, review, or explain Spring AI Alibaba hook and interceptor customizations for ReactAgent and related agent workflows. Use when requests mention ReactAgent.builder().hooks(...), .interceptors(...), MessagesModelHook, ModelHook, AgentHook, ModelInterceptor, ToolInterceptor, HumanInTheLoopHook, SummarizationHook, ModelCallLimitHook, ToolRetryInterceptor, TodoListInterceptor, ToolSelectionInterceptor, ToolEmulatorInterceptor, ContextEditingInterceptor, or when Codex must decide whether a behavior belongs in a hook or an interceptor.
---
# Spring AI Alibaba Hooks And Interceptors
Use this skill to implement or explain how Spring AI Alibaba customizes agent execution around `ReactAgent`. Read [references/hooks-interceptors.md](./references/hooks-interceptors.md) when exact built-in options, execution order, or API shapes matter.
## Workflow
1. Start from the control goal.
Classify the request as one of: observe, transform, enforce, limit, pause for approval, or short-circuit execution. Pick the extension point from that goal instead of starting from class names.
2. Choose the narrowest extension point that can solve the problem.
- Use `MessagesModelHook` for simple message-list changes before or after model calls.
- Use `ModelHook` when logic depends on `OverAllState`, custom state keys, or cross-call coordination.
- Use `AgentHook` when behavior belongs to overall agent start or finish.
- Use `ModelInterceptor` when wrapping or rewriting the model request or response.
- Use `ToolInterceptor` when wrapping, retrying, caching, monitoring, or rewriting tool execution.
3. Prefer built-ins before custom code.
Reach for the official implementations first: `SummarizationHook`, `HumanInTheLoopHook`, `ModelCallLimitHook`, `PIIDetectionHook`, `ToolRetryInterceptor`, `TodoListInterceptor`, `ToolSelectionInterceptor`, `ToolEmulatorInterceptor`, and `ContextEditingInterceptor`.
4. Keep each hook or interceptor single-purpose.
Compose several small units instead of one large class that does logging, moderation, retries, and prompt rewriting together.
5. Respect ordering rules.
`before*` hooks run in registration order, `after*` hooks run in reverse order, and interceptors wrap each other like nested middleware. Put broad outer concerns first and narrow inner concerns later.
## Selection Rules
- Need pause/resume, approval, or agent lifecycle control: use a `Hook`.
- Need request/response interception without resumable control flow: use an `Interceptor`.
- Need only message trimming, message filtering, or system-message injection: prefer `MessagesModelHook`.
- Need full state access, counters, shared state, or custom state updates: use `ModelHook`.
- Need behavior only at agent start/end: use `AgentHook`.
- Need dynamic tools, moderation, model-side logging, or response rewriting: use `ModelInterceptor`.
- Need tool retry, caching, failure wrapping, or per-tool telemetry: use `ToolInterceptor`.
## Implementation Defaults
- Prefer `MessagesModelHook` over `ModelHook` for message-only work. It is the recommended and simpler API.
- Use `RunnableConfig.context()` for transient counters and metrics shared within one agent run.
- Namespace custom context keys defensively, for example with double underscores such as `__model_call_count__`.
- Add a saver or checkpointer when using `HumanInTheLoopHook`; the official guide calls out that interrupted execution needs persisted state.
- Keep `ReactAgent` assembly in the service layer, not controllers, to stay aligned with this repository's Spring Boot structure.
- Read [references/hooks-interceptors.md](./references/hooks-interceptors.md) before inventing a custom class; an official built-in often already covers the need.
## Minimal Assembly Pattern
```java
ReactAgent agent = ReactAgent.builder()
.name("my_agent")
.model(chatModel)
.tools(tools)
.hooks(summarizationHook, modelCallLimitHook)
.interceptors(toolRetryInterceptor, contextEditingInterceptor)
.build();
```
## References
- Read [references/hooks-interceptors.md](./references/hooks-interceptors.md) for the extracted tutorial guidance on built-ins, custom extension points, execution order, and example patterns.
interface:
display_name: "Spring AI Alibaba Hooks & Interceptors"
short_description: "Build hooks and interceptor extensions"
default_prompt: "Use $spring-ai-alibaba-hooks-interceptors to build or explain Spring AI Alibaba hooks and interceptors in this Java project using the official tutorial patterns."
# Spring AI Alibaba Hooks And Interceptors Reference
This reference is distilled from the official Spring AI Alibaba Hooks and Interceptors tutorial and its linked example source. Use it when implementing, reviewing, or explaining customization points around `ReactAgent`.
## Sources
- Tutorial: https://java2ai.com/docs/frameworks/agent-framework/tutorials/hooks/
- Example code: https://github.com/alibaba/spring-ai-alibaba/blob/main/examples/documentation/src/main/java/com/alibaba/cloud/ai/examples/documentation/framework/tutorials/HooksExample.java
The tutorial page was last updated on January 19, 2026 according to the site footer.
## Mental Model
- Hooks expose agent lifecycle checkpoints around agent execution and model calls.
- Interceptors wrap model or tool calls like middleware.
- Use hooks when you need lifecycle awareness or interruptible control flow.
- Use interceptors when you need request/response wrapping, mutation, logging, retry, fallback, or caching.
## Built-In Implementations
### Built-in hooks
- `SummarizationHook`
Use for long-running conversations approaching token limits.
Key settings: `model`, `maxTokensBeforeSummary`, `messagesToKeep`.
- `HumanInTheLoopHook`
Use when selected tools require approval or editing before execution.
Important: pair it with a saver or checkpointer such as `RedisSaver` so paused execution can resume.
- `ModelCallLimitHook`
Use to cap model-call loops and enforce budget limits.
- `PIIDetectionHook`
Use to detect or redact sensitive user data in input or output.
Common settings include `piiType`, `strategy`, and `applyToInput`.
### Built-in interceptors
- `ToolRetryInterceptor`
Use to retry failed tool calls with configurable retry count and failure behavior.
- `TodoListInterceptor`
Use to force a planning step before tool execution for multi-step tasks.
- `ToolSelectionInterceptor`
Use when multiple tools overlap and an LLM should choose the best one for the current request.
- `ToolEmulatorInterceptor`
Use to simulate tool outputs with an LLM during demos, testing, or low-risk development.
- `ContextEditingInterceptor`
Use to trim, inject, or rewrite context before it reaches the model.
## Custom Extension Points
### `MessagesModelHook`
Use this first for message-only transformations. It works directly on `List<Message>` and returns `AgentCommand`, which is simpler than editing `OverAllState`.
Typical fit:
- message trimming
- system message injection
- message filtering
- lightweight summary or context shaping
Useful details:
- `UpdatePolicy.REPLACE` replaces the full message list
- `UpdatePolicy.APPEND` appends new messages
- `canJumpTo()` plus `JumpTo.end` can short-circuit execution before the model call
### `ModelHook`
Use when hook logic must read or update more than just messages.
Typical fit:
- stateful counters
- custom status flags
- global decision logic based on `OverAllState`
- coordinated before/after behavior around the model call
Useful details:
- methods typically return `CompletableFuture<Map<String, Object>>`
- use `ReplaceAllWith` when replacing message collections
- use `RemoveByHash` carefully when deleting messages while preserving message order
### `AgentHook`
Use for logic that belongs to the entire agent run rather than a single model call.
Typical fit:
- start and stop telemetry
- resource setup and cleanup
- total execution timing
- run-level initialization
Common hook positions:
- `HookPosition.BEFORE_AGENT`
- `HookPosition.AFTER_AGENT`
### `ModelInterceptor`
Use to wrap the model request/response path.
Typical fit:
- logging and performance timing
- moderation on input and output
- request rewriting
- dynamic tool injection or tool filtering
Useful details:
- `ModelRequest.builder(request)` can clone and mutate the incoming request
- the tutorial explicitly shows dynamic tool control through `dynamicToolCallbacks(...)` and `tools(...)`
### `ToolInterceptor`
Use to wrap actual tool execution.
Typical fit:
- retries
- failure translation
- caching
- per-tool performance metrics
- graceful fallback messages
Useful details:
- the call boundary is `handler.call(request)`
- interceptors can return a synthetic `ToolCallResponse` instead of propagating an exception
## Decision Guide
- Need resumable human approval: `HumanInTheLoopHook`
- Need automatic history compaction: `SummarizationHook`
- Need loop limits or stateful guards: `ModelCallLimitHook` or custom `ModelHook`
- Need request/response moderation: `ModelInterceptor`
- Need dynamic tool availability: `ModelInterceptor`
- Need tool retries or tool cache: `ToolInterceptor`
- Need to modify only messages: `MessagesModelHook`
- Need run-level timing or setup/teardown: `AgentHook`
## Execution Order
When several extensions are registered:
1. `beforeAgent` hooks run in registration order.
2. `beforeModel` hooks run in registration order.
3. model interceptors wrap the model call in registration order.
4. `afterModel` hooks run in reverse order.
5. tool interceptors wrap tool execution in registration order when tools are called.
6. `afterAgent` hooks run in reverse order.
Practical consequence:
- put broad, outer concerns first
- keep related before/after state in the same hook class
- avoid relying on accidental ordering between unrelated concerns
## RunnableConfig Context
`RunnableConfig.context()` is the shared per-run scratchpad shown in the tutorial for:
- counting model calls
- storing start timestamps
- accumulating total duration
- enforcing limits across several hook invocations
Guidelines:
- treat it as per-run transient state, not durable storage
- choose stable key names such as `__total_model_time__`
- manage types carefully because stored values are generic objects
## Example Patterns To Reuse
- content moderation with `ModelInterceptor`
- model and tool performance timing with interceptors
- tool result caching with `ToolInterceptor`
- early exit from `MessagesModelHook` with `JumpTo.end`
- call counting and limit enforcement with `ModelHook` plus `RunnableConfig.context()`
## Minimal Builder Example
```java
ReactAgent agent = ReactAgent.builder()
.name("resilient_agent")
.model(chatModel)
.tools(searchTool, databaseTool)
.hooks(ModelCallLimitHook.builder().runLimit(5).build())
.interceptors(ToolRetryInterceptor.builder().maxRetries(2).build())
.build();
```
package com.infoepoch.pms.agent.aiMode; package com.infoepoch.pms.agent.aiMode;
import com.alibaba.cloud.ai.graph.agent.ReactAgent; import com.alibaba.cloud.ai.graph.agent.ReactAgent;
import com.infoepoch.pms.agent.observability.ai.ObservedReactAgentFactory;
import org.springframework.ai.chat.model.ChatModel; import org.springframework.ai.chat.model.ChatModel;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
...@@ -14,11 +15,12 @@ import org.springframework.context.annotation.Configuration; ...@@ -14,11 +15,12 @@ import org.springframework.context.annotation.Configuration;
public class CareAgentConfiguration { public class CareAgentConfiguration {
@Bean @Bean
ReactAgent CareAgent(@Qualifier("SiliconFlowChatModel") ChatModel chatModel) { ReactAgent CareAgent(@Qualifier("SiliconFlowChatModel") ChatModel chatModel,
return ReactAgent.builder() ObservedReactAgentFactory observedReactAgentFactory) {
.name("careAgent") return observedReactAgentFactory.create(
.model(chatModel) "careAgent",
.systemPrompt(""" chatModel,
"""
你是精准关爱智能体。 你是精准关爱智能体。
当用户需要推荐活动时,结合调用方提供的真实用户、活动和偏好信息给出推荐和简短理由。 当用户需要推荐活动时,结合调用方提供的真实用户、活动和偏好信息给出推荐和简短理由。
...@@ -33,7 +35,7 @@ public class CareAgentConfiguration { ...@@ -33,7 +35,7 @@ public class CareAgentConfiguration {
- 保持自然、友好、简洁 - 保持自然、友好、简洁
- 绝不能提到分页、页码、后台处理中、接口调用、候选集、模型或任何技术实现细节 - 绝不能提到分页、页码、后台处理中、接口调用、候选集、模型或任何技术实现细节
- 当结果不足时,只输出实际合适的结果,并自然说明 - 当结果不足时,只输出实际合适的结果,并自然说明
""") """
.build(); );
} }
} }
package com.infoepoch.pms.agent.config;
import com.infoepoch.pms.agent.observability.ai.AiModelInvoker;
import com.infoepoch.pms.agent.observability.ai.AiObservationFilter;
import com.infoepoch.pms.agent.observability.ai.AiObservationLogger;
import com.infoepoch.pms.agent.observability.ai.AiObservationSupport;
import com.infoepoch.pms.agent.observability.ai.ObservedReactAgentFactory;
import com.infoepoch.pms.agent.observability.ai.ObservedRestClientFactory;
import com.infoepoch.pms.agent.observability.ai.hook.AiAgentLifecycleObservationHook;
import com.infoepoch.pms.agent.observability.ai.hook.AiModelMessageObservationHook;
import com.infoepoch.pms.agent.properties.AiObservabilityProperties;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.web.servlet.FilterRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* AI 观测基础设施配置。
*/
@Configuration
public class AiObservabilityConfiguration {
@Bean
AiObservationSupport aiObservationSupport(AiObservabilityProperties properties,
@Value("${spring.application.name:pms-agent}") String serviceName) {
return new AiObservationSupport(properties, serviceName);
}
@Bean
AiObservationLogger aiObservationLogger(AiObservationSupport support) {
return new AiObservationLogger(support);
}
@Bean
AiModelInvoker aiModelInvoker(AiObservationLogger logger, AiObservationSupport support) {
return new AiModelInvoker(logger, support);
}
@Bean
ObservedRestClientFactory observedRestClientFactory(AiObservationSupport support) {
return new ObservedRestClientFactory(support);
}
@Bean
AiAgentLifecycleObservationHook aiAgentLifecycleObservationHook(AiObservationLogger logger,
AiObservationSupport support) {
return new AiAgentLifecycleObservationHook(logger, support);
}
@Bean
AiModelMessageObservationHook aiModelMessageObservationHook(AiObservationLogger logger,
AiObservationSupport support) {
return new AiModelMessageObservationHook(logger, support);
}
@Bean
ObservedReactAgentFactory observedReactAgentFactory(AiAgentLifecycleObservationHook agentHook,
AiModelMessageObservationHook modelHook) {
return new ObservedReactAgentFactory(agentHook, modelHook);
}
@Bean
FilterRegistrationBean<AiObservationFilter> aiObservationFilterRegistration(AiObservationSupport support) {
FilterRegistrationBean<AiObservationFilter> registrationBean = new FilterRegistrationBean<>();
registrationBean.setFilter(new AiObservationFilter(support));
registrationBean.addUrlPatterns("/api/agent/*");
registrationBean.addUrlPatterns("/api/chat/*");
registrationBean.addUrlPatterns("/api/admin/*");
registrationBean.setOrder(Integer.MIN_VALUE);
return registrationBean;
}
}
...@@ -36,15 +36,15 @@ public class AgentChatController { ...@@ -36,15 +36,15 @@ public class AgentChatController {
LogHelper.info(this, CareTraceLogSupport.format( LogHelper.info(this, CareTraceLogSupport.format(
traceId, traceId,
sessionId, sessionId,
"流式请求开始", "care_stream_start",
"收到请求," + CareTraceLogSupport.safeMessageSummary(message) "request received, " + CareTraceLogSupport.safeMessageSummary(message)
)); ));
if (!StringUtils.hasText(sessionId)) { if (!StringUtils.hasText(sessionId)) {
LogHelper.info(this, CareTraceLogSupport.format( LogHelper.info(this, CareTraceLogSupport.format(
traceId, traceId,
sessionId, sessionId,
"参数校验失败", "care_stream_validation_failed",
"sessionId为空" "sessionId is blank"
)); ));
return Flux.just(buildEvent("error", "sessionId不能为空")); return Flux.just(buildEvent("error", "sessionId不能为空"));
} }
...@@ -54,14 +54,14 @@ public class AgentChatController { ...@@ -54,14 +54,14 @@ public class AgentChatController {
.doOnComplete(() -> LogHelper.info(this, CareTraceLogSupport.format( .doOnComplete(() -> LogHelper.info(this, CareTraceLogSupport.format(
traceId, traceId,
sessionId, sessionId,
"流式请求完成", "care_stream_complete",
"SSE输出完成" "sse completed"
))) )))
.doOnError(error -> LogHelper.error(this, CareTraceLogSupport.format( .doOnError(error -> LogHelper.error(this, CareTraceLogSupport.format(
traceId, traceId,
sessionId, sessionId,
"流式请求异常", "care_stream_error",
"SSE输出失败,异常=" + CareTraceLogSupport.safeText(resolveErrorMessage(error)) "sse failed, error=" + CareTraceLogSupport.safeText(resolveErrorMessage(error))
), error)) ), error))
.onErrorResume(error -> Flux.just(buildEvent("error", resolveErrorMessage(error)))); .onErrorResume(error -> Flux.just(buildEvent("error", resolveErrorMessage(error))));
} }
...@@ -83,4 +83,3 @@ public class AgentChatController { ...@@ -83,4 +83,3 @@ public class AgentChatController {
return StringUtils.hasText(error.getMessage()) ? error.getMessage() : "careAgent流式调用失败"; return StringUtils.hasText(error.getMessage()) ? error.getMessage() : "careAgent流式调用失败";
} }
} }
package com.infoepoch.pms.agent.domain.care.log; package com.infoepoch.pms.agent.domain.care.log;
import com.alibaba.cloud.ai.graph.RunnableConfig;
import org.springframework.ai.chat.messages.AbstractMessage;
import org.springframework.ai.chat.messages.Message;
import org.springframework.ai.chat.messages.MessageType;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import java.util.List;
import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.StringJoiner; import java.util.StringJoiner;
...@@ -14,6 +19,9 @@ import java.util.UUID; ...@@ -14,6 +19,9 @@ import java.util.UUID;
public final class CareTraceLogSupport { public final class CareTraceLogSupport {
private static final int MAX_PREVIEW_LENGTH = 60; private static final int MAX_PREVIEW_LENGTH = 60;
public static final String METADATA_TRACE_ID = "traceId";
public static final String METADATA_SESSION_ID = "sessionId";
public static final String METADATA_SCENE = "scene";
private CareTraceLogSupport() { private CareTraceLogSupport() {
} }
...@@ -75,6 +83,92 @@ public final class CareTraceLogSupport { ...@@ -75,6 +83,92 @@ public final class CareTraceLogSupport {
); );
} }
/**
* 构造 care agent 调用使用的统一运行时配置。
*/
public static RunnableConfig buildAgentRunnableConfig(String traceId, String sessionId, String scene) {
return RunnableConfig.builder()
.threadId(sessionId)
.addMetadata(METADATA_TRACE_ID, traceId)
.addMetadata(METADATA_SESSION_ID, sessionId)
.addMetadata(METADATA_SCENE, scene)
.build();
}
/**
* 根据 RunnableConfig 读取 traceId。
*/
public static String resolveTraceId(RunnableConfig runnableConfig) {
return metadataValue(runnableConfig, METADATA_TRACE_ID);
}
/**
* 根据 RunnableConfig 读取 sessionId,缺失时回退到 threadId。
*/
public static String resolveSessionId(RunnableConfig runnableConfig) {
String sessionId = metadataValue(runnableConfig, METADATA_SESSION_ID);
if (StringUtils.hasText(sessionId)) {
return sessionId;
}
return runnableConfig == null ? null : runnableConfig.threadId().orElse(null);
}
/**
* 根据 RunnableConfig 读取业务场景。
*/
public static String resolveScene(RunnableConfig runnableConfig) {
return metadataValue(runnableConfig, METADATA_SCENE);
}
/**
* 将真实消息列表渲染成便于排查的日志文本。
*/
public static String formatModelMessages(String traceId, String sessionId, String stage, List<Message> messages) {
String lineSeparator = System.lineSeparator();
StringBuilder builder = new StringBuilder("[链路ID=%s] [会话ID=%s] [阶段=%s] 模型请求消息:%s".formatted(
safeValue(traceId),
safeValue(sessionId),
safeValue(stage),
lineSeparator
));
if (CollectionUtils.isEmpty(messages)) {
return builder.append("(空)").toString();
}
for (int index = 0; index < messages.size(); index++) {
Message message = messages.get(index);
builder.append('[')
.append(index + 1)
.append("] role=")
.append(resolveMessageRole(message))
.append(lineSeparator)
.append(resolveMessageText(message));
if (index < messages.size() - 1) {
builder.append(lineSeparator);
}
}
return builder.toString();
}
/**
* 提取消息列表中的最后一条 assistant 文本。
*/
public static String findLastAssistantText(List<Message> messages) {
if (CollectionUtils.isEmpty(messages)) {
return "";
}
for (int index = messages.size() - 1; index >= 0; index--) {
Message message = messages.get(index);
if (message == null || message.getMessageType() != MessageType.ASSISTANT) {
continue;
}
String text = resolveMessageText(message);
if (text != null) {
return text;
}
}
return "";
}
/** /**
* 返回 prompt 的原始长度,便于判断上下文体积。 * 返回 prompt 的原始长度,便于判断上下文体积。
*/ */
...@@ -139,6 +233,38 @@ public final class CareTraceLogSupport { ...@@ -139,6 +233,38 @@ public final class CareTraceLogSupport {
return abbreviate(normalizeText(value.toString())); return abbreviate(normalizeText(value.toString()));
} }
/**
* 读取 RunnableConfig metadata 中的文本值。
*/
private static String metadataValue(RunnableConfig runnableConfig, String key) {
if (runnableConfig == null || !StringUtils.hasText(key)) {
return null;
}
return runnableConfig.metadata(key)
.map(Object::toString)
.orElse(null);
}
/**
* 提取消息角色。
*/
private static String resolveMessageRole(Message message) {
if (message == null || message.getMessageType() == null) {
return "unknown";
}
return message.getMessageType().getValue();
}
/**
* 提取消息文本。
*/
private static String resolveMessageText(Message message) {
if (message instanceof AbstractMessage abstractMessage) {
return abstractMessage.getText() == null ? "" : abstractMessage.getText();
}
return message == null ? "" : message.toString();
}
/** /**
* 归一化文本中的连续空白。 * 归一化文本中的连续空白。
*/ */
......
package com.infoepoch.pms.agent.domain.care.log.hook;
import com.alibaba.cloud.ai.graph.OverAllState;
import com.alibaba.cloud.ai.graph.RunnableConfig;
import com.alibaba.cloud.ai.graph.agent.hook.AgentHook;
import com.alibaba.cloud.ai.graph.agent.hook.HookPosition;
import com.infoepoch.pms.agent.common.LogHelper;
import com.infoepoch.pms.agent.domain.care.log.CareTraceLogSupport;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
/**
* care agent 生命周期日志 hook。
*/
public class CareAgentLifecycleLogHook extends AgentHook {
private static final String HOOK_NAME = "careAgentLifecycleLogHook";
@Override
public String getName() {
return HOOK_NAME;
}
@Override
public int getOrder() {
return 0;
}
@Override
public HookPosition[] getHookPositions() {
return new HookPosition[]{HookPosition.BEFORE_AGENT, HookPosition.AFTER_AGENT};
}
@Override
public CompletableFuture<Map<String, Object>> beforeAgent(OverAllState state, RunnableConfig runnableConfig) {
LogHelper.info(this, CareTraceLogSupport.format(
CareTraceLogSupport.resolveTraceId(runnableConfig),
CareTraceLogSupport.resolveSessionId(runnableConfig),
"Agent开始",
"场景=%s,agent=%s".formatted(
CareTraceLogSupport.resolveScene(runnableConfig),
getAgentName()
)
));
return CompletableFuture.completedFuture(Collections.emptyMap());
}
@Override
public CompletableFuture<Map<String, Object>> afterAgent(OverAllState state, RunnableConfig runnableConfig) {
LogHelper.info(this, CareTraceLogSupport.format(
CareTraceLogSupport.resolveTraceId(runnableConfig),
CareTraceLogSupport.resolveSessionId(runnableConfig),
"Agent完成",
"场景=%s,agent=%s".formatted(
CareTraceLogSupport.resolveScene(runnableConfig),
getAgentName()
)
));
return CompletableFuture.completedFuture(Collections.emptyMap());
}
}
package com.infoepoch.pms.agent.domain.care.log.hook;
import com.alibaba.cloud.ai.graph.RunnableConfig;
import com.alibaba.cloud.ai.graph.agent.hook.HookPosition;
import com.alibaba.cloud.ai.graph.agent.hook.messages.AgentCommand;
import com.alibaba.cloud.ai.graph.agent.hook.messages.MessagesModelHook;
import com.alibaba.cloud.ai.graph.agent.hook.messages.UpdatePolicy;
import com.infoepoch.pms.agent.common.LogHelper;
import com.infoepoch.pms.agent.domain.care.log.CareTraceLogSupport;
import org.springframework.ai.chat.messages.Message;
import java.util.List;
/**
* care agent 模型请求与响应日志 hook。
*/
public class CareModelMessageLogHook extends MessagesModelHook {
private static final String HOOK_NAME = "careModelMessageLogHook";
@Override
public String getName() {
return HOOK_NAME;
}
@Override
public int getOrder() {
return 100;
}
@Override
public HookPosition[] getHookPositions() {
return new HookPosition[]{HookPosition.BEFORE_MODEL, HookPosition.AFTER_MODEL};
}
@Override
public AgentCommand beforeModel(List<Message> messages, RunnableConfig runnableConfig) {
String scene = CareTraceLogSupport.resolveScene(runnableConfig);
LogHelper.info(this, CareTraceLogSupport.formatModelMessages(
CareTraceLogSupport.resolveTraceId(runnableConfig),
CareTraceLogSupport.resolveSessionId(runnableConfig),
scene + "模型请求",
messages
));
return new AgentCommand(messages, UpdatePolicy.REPLACE);
}
@Override
public AgentCommand afterModel(List<Message> messages, RunnableConfig runnableConfig) {
String scene = CareTraceLogSupport.resolveScene(runnableConfig);
LogHelper.info(this, CareTraceLogSupport.formatModelResponse(
CareTraceLogSupport.resolveTraceId(runnableConfig),
CareTraceLogSupport.resolveSessionId(runnableConfig),
scene + "模型完整返回",
CareTraceLogSupport.findLastAssistantText(messages)
));
return new AgentCommand(messages, UpdatePolicy.REPLACE);
}
}
...@@ -3,6 +3,7 @@ package com.infoepoch.pms.agent.domain.care.orchestrator; ...@@ -3,6 +3,7 @@ package com.infoepoch.pms.agent.domain.care.orchestrator;
import com.alibaba.cloud.ai.graph.agent.ReactAgent; import com.alibaba.cloud.ai.graph.agent.ReactAgent;
import com.alibaba.cloud.ai.graph.streaming.OutputType; import com.alibaba.cloud.ai.graph.streaming.OutputType;
import com.alibaba.cloud.ai.graph.streaming.StreamingOutput; import com.alibaba.cloud.ai.graph.streaming.StreamingOutput;
import com.alibaba.cloud.ai.graph.RunnableConfig;
import com.infoepoch.pms.agent.common.LogHelper; import com.infoepoch.pms.agent.common.LogHelper;
import com.infoepoch.pms.agent.domain.care.log.CareTraceLogSupport; import com.infoepoch.pms.agent.domain.care.log.CareTraceLogSupport;
import com.infoepoch.pms.agent.domain.care.model.ActivitySummary; import com.infoepoch.pms.agent.domain.care.model.ActivitySummary;
...@@ -11,6 +12,9 @@ import com.infoepoch.pms.agent.domain.care.model.CareQuery; ...@@ -11,6 +12,9 @@ import com.infoepoch.pms.agent.domain.care.model.CareQuery;
import com.infoepoch.pms.agent.domain.care.model.UserProfileSummary; import com.infoepoch.pms.agent.domain.care.model.UserProfileSummary;
import com.infoepoch.pms.agent.domain.care.state.CareConversationStateService; import com.infoepoch.pms.agent.domain.care.state.CareConversationStateService;
import com.infoepoch.pms.agent.domain.care.stream.CareStreamingResponseAssembler; import com.infoepoch.pms.agent.domain.care.stream.CareStreamingResponseAssembler;
import com.infoepoch.pms.agent.observability.ai.AiComponentType;
import com.infoepoch.pms.agent.observability.ai.AiObservationContext;
import com.infoepoch.pms.agent.observability.ai.AiObservationSupport;
import com.infoepoch.pms.agent.properties.CareAgentProperties; import com.infoepoch.pms.agent.properties.CareAgentProperties;
import com.infoepoch.pms.agent.tool.union.care.CurrentUserProfileTool; import com.infoepoch.pms.agent.tool.union.care.CurrentUserProfileTool;
import com.infoepoch.pms.agent.tool.union.care.EligibleActivitiesTool; import com.infoepoch.pms.agent.tool.union.care.EligibleActivitiesTool;
...@@ -37,19 +41,22 @@ public class ActivityRecommendationExecutor { ...@@ -37,19 +41,22 @@ public class ActivityRecommendationExecutor {
private final CareStreamingResponseAssembler responseAssembler; private final CareStreamingResponseAssembler responseAssembler;
private final CareAgentProperties properties; private final CareAgentProperties properties;
private final ReactAgent careAgent; private final ReactAgent careAgent;
private final AiObservationSupport observationSupport;
public ActivityRecommendationExecutor(CurrentUserProfileTool currentUserProfileTool, public ActivityRecommendationExecutor(CurrentUserProfileTool currentUserProfileTool,
EligibleActivitiesTool eligibleActivitiesTool, EligibleActivitiesTool eligibleActivitiesTool,
CareConversationStateService conversationStateService, CareConversationStateService conversationStateService,
CareStreamingResponseAssembler responseAssembler, CareStreamingResponseAssembler responseAssembler,
CareAgentProperties properties, CareAgentProperties properties,
@Qualifier("CareAgent") ReactAgent careAgent) { @Qualifier("CareAgent") ReactAgent careAgent,
AiObservationSupport observationSupport) {
this.currentUserProfileTool = currentUserProfileTool; this.currentUserProfileTool = currentUserProfileTool;
this.eligibleActivitiesTool = eligibleActivitiesTool; this.eligibleActivitiesTool = eligibleActivitiesTool;
this.conversationStateService = conversationStateService; this.conversationStateService = conversationStateService;
this.responseAssembler = responseAssembler; this.responseAssembler = responseAssembler;
this.properties = properties; this.properties = properties;
this.careAgent = careAgent; this.careAgent = careAgent;
this.observationSupport = observationSupport;
} }
/** /**
...@@ -116,28 +123,12 @@ public class ActivityRecommendationExecutor { ...@@ -116,28 +123,12 @@ public class ActivityRecommendationExecutor {
} }
String prompt = buildPrompt(currentUser, query, promptCandidates); String prompt = buildPrompt(currentUser, query, promptCandidates);
LogHelper.info(this, CareTraceLogSupport.format( RunnableConfig runnableConfig = buildRunnableConfig(traceId, sessionId, "活动推荐");
traceId,
sessionId,
"模型调用开始",
"场景=活动推荐,候选数量=%s,prompt长度=%s,提示词摘要=%s".formatted(
promptCandidates.size(),
CareTraceLogSupport.promptLength(prompt),
CareTraceLogSupport.safeMessageSummary(prompt)
)
));
LogHelper.info(this, CareTraceLogSupport.formatModelPrompt(
traceId,
sessionId,
"活动推荐模型请求",
prompt
));
try { try {
StringBuilder responseBuffer = new StringBuilder();
AtomicInteger chunkIndex = new AtomicInteger(0); AtomicInteger chunkIndex = new AtomicInteger(0);
return Flux.concat( return Flux.concat(
Flux.just(responseAssembler.buildActivityIntro(query)), Flux.just(responseAssembler.buildActivityIntro(query)),
careAgent.stream(prompt) careAgent.stream(prompt, runnableConfig)
.filter(StreamingOutput.class::isInstance) .filter(StreamingOutput.class::isInstance)
.map(StreamingOutput.class::cast) .map(StreamingOutput.class::cast)
.filter(output -> output.getOutputType() == OutputType.AGENT_MODEL_STREAMING) .filter(output -> output.getOutputType() == OutputType.AGENT_MODEL_STREAMING)
...@@ -146,20 +137,11 @@ public class ActivityRecommendationExecutor { ...@@ -146,20 +137,11 @@ public class ActivityRecommendationExecutor {
.map(AssistantMessage.class::cast) .map(AssistantMessage.class::cast)
.map(AbstractMessage::getText) .map(AbstractMessage::getText)
.filter(Objects::nonNull) .filter(Objects::nonNull)
.doOnNext(chunk -> { .doOnNext(chunk -> chunkIndex.incrementAndGet()),
chunkIndex.incrementAndGet();
responseBuffer.append(chunk);
}),
Flux.just(responseAssembler.buildActivityClosing(deliveredCount, query.requestedCount())) Flux.just(responseAssembler.buildActivityClosing(deliveredCount, query.requestedCount()))
) )
.doOnComplete(() -> { .doOnComplete(() -> {
conversationStateService.save(resolveNextState(sessionId, query, state, displayedActivityIds)); conversationStateService.save(resolveNextState(sessionId, query, state, displayedActivityIds));
LogHelper.info(this, CareTraceLogSupport.formatModelResponse(
traceId,
sessionId,
"活动推荐模型完整返回",
responseBuffer.toString()
));
LogHelper.info(this, CareTraceLogSupport.format( LogHelper.info(this, CareTraceLogSupport.format(
traceId, traceId,
sessionId, sessionId,
...@@ -189,6 +171,23 @@ public class ActivityRecommendationExecutor { ...@@ -189,6 +171,23 @@ public class ActivityRecommendationExecutor {
} }
} }
private RunnableConfig buildRunnableConfig(String traceId, String sessionId, String scene) {
AiObservationContext currentContext = observationSupport.currentContext();
String requestId = StringUtils.hasText(currentContext.requestId()) ? currentContext.requestId() : traceId;
String flowId = StringUtils.hasText(currentContext.flowId()) ? currentContext.flowId() : requestId;
AiObservationContext context = currentContext.toBuilder()
.traceId(traceId)
.requestId(requestId)
.sessionId(sessionId)
.flowId(flowId)
.scene(scene)
.componentType(AiComponentType.AGENT)
.componentName("careAgent")
.agentName("careAgent")
.build();
return observationSupport.buildRunnableConfig(sessionId, context);
}
/** /**
* 生成下一轮会话状态,记录已下发活动。 * 生成下一轮会话状态,记录已下发活动。
*/ */
...@@ -198,10 +197,30 @@ public class ActivityRecommendationExecutor { ...@@ -198,10 +197,30 @@ public class ActivityRecommendationExecutor {
Set<String> deliveredIds) { Set<String> deliveredIds) {
CareConversationState currentState = state == null CareConversationState currentState = state == null
? CareConversationState.initial(sessionId, query) ? CareConversationState.initial(sessionId, query)
: state.rewriteFromQuery(query); : rebuildStateForActivityRecommendation(state, query);
return currentState.advance(1, deliveredIds); return currentState.advance(1, deliveredIds);
} }
/**
* 为活动推荐重建下一轮状态,同任务场景下保留历史已下发活动。
*/
private CareConversationState rebuildStateForActivityRecommendation(CareConversationState state, CareQuery query) {
if (state.taskType() != query.taskType()) {
return CareConversationState.initial(state.sessionId(), query);
}
return new CareConversationState(
state.sessionId(),
query.taskType(),
query.requestedCount(),
query.activityIntent(),
query.preferenceSummary(),
query.searchConditions(),
1,
state.deliveredIds(),
null
);
}
/** /**
* 提取本轮实际视为已交付的活动 ID。 * 提取本轮实际视为已交付的活动 ID。
*/ */
......
...@@ -3,10 +3,14 @@ package com.infoepoch.pms.agent.domain.care.orchestrator; ...@@ -3,10 +3,14 @@ package com.infoepoch.pms.agent.domain.care.orchestrator;
import com.alibaba.cloud.ai.graph.agent.ReactAgent; import com.alibaba.cloud.ai.graph.agent.ReactAgent;
import com.alibaba.cloud.ai.graph.streaming.OutputType; import com.alibaba.cloud.ai.graph.streaming.OutputType;
import com.alibaba.cloud.ai.graph.streaming.StreamingOutput; import com.alibaba.cloud.ai.graph.streaming.StreamingOutput;
import com.alibaba.cloud.ai.graph.RunnableConfig;
import com.infoepoch.pms.agent.common.LogHelper; import com.infoepoch.pms.agent.common.LogHelper;
import com.infoepoch.pms.agent.domain.care.log.CareTraceLogSupport; import com.infoepoch.pms.agent.domain.care.log.CareTraceLogSupport;
import com.infoepoch.pms.agent.domain.care.model.CareQuery; import com.infoepoch.pms.agent.domain.care.model.CareQuery;
import com.infoepoch.pms.agent.domain.care.stream.CareStreamingResponseAssembler; import com.infoepoch.pms.agent.domain.care.stream.CareStreamingResponseAssembler;
import com.infoepoch.pms.agent.observability.ai.AiComponentType;
import com.infoepoch.pms.agent.observability.ai.AiObservationContext;
import com.infoepoch.pms.agent.observability.ai.AiObservationSupport;
import org.springframework.ai.chat.messages.AbstractMessage; import org.springframework.ai.chat.messages.AbstractMessage;
import org.springframework.ai.chat.messages.AssistantMessage; import org.springframework.ai.chat.messages.AssistantMessage;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
...@@ -24,11 +28,14 @@ public class GeneralConversationExecutor { ...@@ -24,11 +28,14 @@ public class GeneralConversationExecutor {
private final CareStreamingResponseAssembler responseAssembler; private final CareStreamingResponseAssembler responseAssembler;
private final ReactAgent careAgent; private final ReactAgent careAgent;
private final AiObservationSupport observationSupport;
public GeneralConversationExecutor(CareStreamingResponseAssembler responseAssembler, public GeneralConversationExecutor(CareStreamingResponseAssembler responseAssembler,
@Qualifier("CareAgent") ReactAgent careAgent) { @Qualifier("CareAgent") ReactAgent careAgent,
AiObservationSupport observationSupport) {
this.responseAssembler = responseAssembler; this.responseAssembler = responseAssembler;
this.careAgent = careAgent; this.careAgent = careAgent;
this.observationSupport = observationSupport;
} }
/** /**
...@@ -42,25 +49,10 @@ public class GeneralConversationExecutor { ...@@ -42,25 +49,10 @@ public class GeneralConversationExecutor {
"通用对话开始", "通用对话开始",
"用户输入=%s".formatted(CareTraceLogSupport.safeMessageSummary(query.originalMessage())) "用户输入=%s".formatted(CareTraceLogSupport.safeMessageSummary(query.originalMessage()))
)); ));
LogHelper.info(this, CareTraceLogSupport.format( RunnableConfig runnableConfig = buildRunnableConfig(traceId, sessionId, "通用对话");
traceId,
sessionId,
"模型调用开始",
"场景=通用对话,prompt长度=%s,提示词摘要=%s".formatted(
CareTraceLogSupport.promptLength(prompt),
CareTraceLogSupport.safeMessageSummary(prompt)
)
));
LogHelper.info(this, CareTraceLogSupport.formatModelPrompt(
traceId,
sessionId,
"通用对话模型请求",
prompt
));
try { try {
StringBuilder responseBuffer = new StringBuilder();
AtomicInteger chunkIndex = new AtomicInteger(0); AtomicInteger chunkIndex = new AtomicInteger(0);
return careAgent.stream(prompt) return careAgent.stream(prompt, runnableConfig)
.filter(StreamingOutput.class::isInstance) .filter(StreamingOutput.class::isInstance)
.map(StreamingOutput.class::cast) .map(StreamingOutput.class::cast)
.filter(output -> output.getOutputType() == OutputType.AGENT_MODEL_STREAMING) .filter(output -> output.getOutputType() == OutputType.AGENT_MODEL_STREAMING)
...@@ -69,6 +61,7 @@ public class GeneralConversationExecutor { ...@@ -69,6 +61,7 @@ public class GeneralConversationExecutor {
.map(AssistantMessage.class::cast) .map(AssistantMessage.class::cast)
.map(AbstractMessage::getText) .map(AbstractMessage::getText)
.filter(StringUtils::hasText) .filter(StringUtils::hasText)
.doOnNext(chunk -> chunkIndex.incrementAndGet())
.switchIfEmpty(Flux.defer(() -> { .switchIfEmpty(Flux.defer(() -> {
LogHelper.info(this, CareTraceLogSupport.format( LogHelper.info(this, CareTraceLogSupport.format(
traceId, traceId,
...@@ -79,12 +72,6 @@ public class GeneralConversationExecutor { ...@@ -79,12 +72,6 @@ public class GeneralConversationExecutor {
return Flux.just(responseAssembler.buildGeneralFallbackReply()); return Flux.just(responseAssembler.buildGeneralFallbackReply());
})) }))
.doOnComplete(() -> { .doOnComplete(() -> {
LogHelper.info(this, CareTraceLogSupport.formatModelResponse(
traceId,
sessionId,
"通用对话模型完整返回",
responseBuffer.toString()
));
LogHelper.info(this, CareTraceLogSupport.format( LogHelper.info(this, CareTraceLogSupport.format(
traceId, traceId,
sessionId, sessionId,
...@@ -109,5 +96,21 @@ public class GeneralConversationExecutor { ...@@ -109,5 +96,21 @@ public class GeneralConversationExecutor {
return Flux.error(new IllegalArgumentException("调用模型失败")); return Flux.error(new IllegalArgumentException("调用模型失败"));
} }
} }
}
private RunnableConfig buildRunnableConfig(String traceId, String sessionId, String scene) {
AiObservationContext currentContext = observationSupport.currentContext();
String requestId = StringUtils.hasText(currentContext.requestId()) ? currentContext.requestId() : traceId;
String flowId = StringUtils.hasText(currentContext.flowId()) ? currentContext.flowId() : requestId;
AiObservationContext context = currentContext.toBuilder()
.traceId(traceId)
.requestId(requestId)
.sessionId(sessionId)
.flowId(flowId)
.scene(scene)
.componentType(AiComponentType.AGENT)
.componentName("careAgent")
.agentName("careAgent")
.build();
return observationSupport.buildRunnableConfig(sessionId, context);
}
}
...@@ -8,6 +8,10 @@ import com.infoepoch.pms.agent.domain.care.log.CareTraceLogSupport; ...@@ -8,6 +8,10 @@ import com.infoepoch.pms.agent.domain.care.log.CareTraceLogSupport;
import com.infoepoch.pms.agent.domain.care.model.CareConversationState; import com.infoepoch.pms.agent.domain.care.model.CareConversationState;
import com.infoepoch.pms.agent.domain.care.model.CareQuery; import com.infoepoch.pms.agent.domain.care.model.CareQuery;
import com.infoepoch.pms.agent.domain.care.model.CareTaskType; import com.infoepoch.pms.agent.domain.care.model.CareTaskType;
import com.infoepoch.pms.agent.observability.ai.AiComponentType;
import com.infoepoch.pms.agent.observability.ai.AiModelInvoker;
import com.infoepoch.pms.agent.observability.ai.AiObservationContext;
import com.infoepoch.pms.agent.observability.ai.AiObservationSupport;
import com.infoepoch.pms.agent.properties.CareAgentProperties; import com.infoepoch.pms.agent.properties.CareAgentProperties;
import com.infoepoch.pms.agent.tool.union.care.provider.CareBusinessDataProvider; import com.infoepoch.pms.agent.tool.union.care.provider.CareBusinessDataProvider;
import org.springframework.ai.chat.model.ChatModel; import org.springframework.ai.chat.model.ChatModel;
...@@ -41,13 +45,19 @@ public class CareQueryUnderstandingService { ...@@ -41,13 +45,19 @@ public class CareQueryUnderstandingService {
private final ChatModel chatModel; private final ChatModel chatModel;
private final CareAgentProperties properties; private final CareAgentProperties properties;
private final CareBusinessDataProvider careBusinessDataProvider; private final CareBusinessDataProvider careBusinessDataProvider;
private final AiModelInvoker aiModelInvoker;
private final AiObservationSupport observationSupport;
public CareQueryUnderstandingService(@Qualifier("SiliconFlowChatModel") ChatModel chatModel, public CareQueryUnderstandingService(@Qualifier("SiliconFlowChatModel") ChatModel chatModel,
CareAgentProperties properties, CareAgentProperties properties,
CareBusinessDataProvider careBusinessDataProvider) { CareBusinessDataProvider careBusinessDataProvider,
AiModelInvoker aiModelInvoker,
AiObservationSupport observationSupport) {
this.chatModel = chatModel; this.chatModel = chatModel;
this.properties = properties; this.properties = properties;
this.careBusinessDataProvider = careBusinessDataProvider; this.careBusinessDataProvider = careBusinessDataProvider;
this.aiModelInvoker = aiModelInvoker;
this.observationSupport = observationSupport;
} }
/** /**
...@@ -270,27 +280,13 @@ public class CareQueryUnderstandingService { ...@@ -270,27 +280,13 @@ public class CareQueryUnderstandingService {
用户输入:%s 用户输入:%s
""".formatted(message); """.formatted(message);
LogHelper.info(this, CareTraceLogSupport.format(
traceId,
sessionId,
"模型调用开始",
"场景=任务分类,prompt长度=%s".formatted(CareTraceLogSupport.promptLength(classificationPrompt))
));
LogHelper.info(this, CareTraceLogSupport.formatModelPrompt(
traceId,
sessionId,
"任务分类模型请求",
classificationPrompt
));
try { try {
String modelDecision = chatModel.call(classificationPrompt); String modelDecision = aiModelInvoker.call(
buildModelContext(traceId, sessionId, "任务分类", "care.intentClassifier"),
classificationPrompt,
chatModel
);
String normalizedDecision = modelDecision == null ? "" : modelDecision.trim(); String normalizedDecision = modelDecision == null ? "" : modelDecision.trim();
LogHelper.info(this, CareTraceLogSupport.formatModelResponse(
traceId,
sessionId,
"任务分类模型返回",
normalizedDecision
));
LogHelper.info(this, CareTraceLogSupport.format( LogHelper.info(this, CareTraceLogSupport.format(
traceId, traceId,
sessionId, sessionId,
...@@ -304,12 +300,12 @@ public class CareQueryUnderstandingService { ...@@ -304,12 +300,12 @@ public class CareQueryUnderstandingService {
return CareTaskType.RECOMMEND_ACTIVITIES; return CareTaskType.RECOMMEND_ACTIVITIES;
} }
} catch (RuntimeException exception) { } catch (RuntimeException exception) {
LogHelper.error(this, CareTraceLogSupport.format( LogHelper.info(this, CareTraceLogSupport.format(
traceId, traceId,
sessionId, sessionId,
"任务分类模型异常", "任务分类模型降级",
"分类失败,降级为通用对话,异常=" + CareTraceLogSupport.safeText(exception.getMessage()) "分类失败,降级为通用对话"
), exception); ));
} }
return CareTaskType.GENERAL_CHAT; return CareTaskType.GENERAL_CHAT;
} }
...@@ -344,30 +340,12 @@ public class CareQueryUnderstandingService { ...@@ -344,30 +340,12 @@ public class CareQueryUnderstandingService {
"searchConditions": {} "searchConditions": {}
} }
""".formatted(taskType.name(), stateSummary, message); """.formatted(taskType.name(), stateSummary, message);
LogHelper.info(this, CareTraceLogSupport.format(
traceId,
sessionId,
"模型调用开始",
"场景=意图理解,任务类型=%s,会话摘要=%s,prompt长度=%s".formatted(
toChineseTaskType(taskType),
CareTraceLogSupport.safeText(stateSummary),
CareTraceLogSupport.promptLength(understandingPrompt)
)
));
LogHelper.info(this, CareTraceLogSupport.formatModelPrompt(
traceId,
sessionId,
"意图理解模型请求",
understandingPrompt
));
try { try {
String response = chatModel.call(understandingPrompt); String response = aiModelInvoker.call(
LogHelper.info(this, CareTraceLogSupport.formatModelResponse( buildModelContext(traceId, sessionId, "意图理解", "care.queryUnderstanding"),
traceId, understandingPrompt,
sessionId, chatModel
"意图理解模型返回", );
response
));
String sanitized = sanitizeJson(response); String sanitized = sanitizeJson(response);
if (!StringUtils.hasText(sanitized)) { if (!StringUtils.hasText(sanitized)) {
LogHelper.info(this, CareTraceLogSupport.format( LogHelper.info(this, CareTraceLogSupport.format(
...@@ -384,16 +362,33 @@ public class CareQueryUnderstandingService { ...@@ -384,16 +362,33 @@ public class CareQueryUnderstandingService {
} }
return result; return result;
} catch (RuntimeException exception) { } catch (RuntimeException exception) {
LogHelper.error(this, CareTraceLogSupport.format( LogHelper.info(this, CareTraceLogSupport.format(
traceId, traceId,
sessionId, sessionId,
"意图识别模型异常", "意图识别模型降级",
"模型理解调用或解析失败,异常=" + CareTraceLogSupport.safeText(exception.getMessage()) "模型理解失败,回退保守结果"
), exception); ));
return fallbackUnderstanding(taskType, message, state); return fallbackUnderstanding(taskType, message, state);
} }
} }
private AiObservationContext buildModelContext(String traceId, String sessionId, String scene, String componentName) {
AiObservationContext currentContext = observationSupport.currentContext();
String requestId = StringUtils.hasText(currentContext.requestId()) ? currentContext.requestId() : traceId;
String flowId = StringUtils.hasText(currentContext.flowId()) ? currentContext.flowId() : requestId;
return currentContext.toBuilder()
.traceId(traceId)
.requestId(requestId)
.sessionId(sessionId)
.flowId(flowId)
.scene(scene)
.componentType(AiComponentType.MODEL)
.componentName(componentName)
.modelProvider("siliconflow")
.modelName(chatModel.getClass().getSimpleName())
.build();
}
/** /**
* 构造供模型参考的短会话摘要。 * 构造供模型参考的短会话摘要。
*/ */
......
package com.infoepoch.pms.agent.observability.ai;
/**
* AI 观测组件类型。
*/
public enum AiComponentType {
HTTP_INBOUND,
AGENT,
MODEL,
TOOL,
HTTP_OUTBOUND,
ORCHESTRATOR
}
package com.infoepoch.pms.agent.observability.ai;
import org.springframework.ai.chat.model.ChatModel;
import org.springframework.util.StringUtils;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* 统一模型调用入口。
*/
public class AiModelInvoker {
private final AiObservationLogger logger;
private final AiObservationSupport support;
public AiModelInvoker(AiObservationLogger logger, AiObservationSupport support) {
this.logger = logger;
this.support = support;
}
public String call(AiObservationContext context, String prompt, ChatModel chatModel) {
String requestStage = AiReadableLogSupport.stageWithScene(context, "模型请求");
logger.info(
AiModelInvoker.class,
AiReadableLogSupport.formatModelPrompt(context, requestStage, prompt),
context,
buildRequestFields(prompt, context)
);
try {
String response = support.withObservationContext(context, () -> chatModel.call(prompt));
String responseStage = AiReadableLogSupport.stageWithScene(context, "模型完整返回");
logger.info(
AiModelInvoker.class,
AiReadableLogSupport.formatModelResponse(context, responseStage, response),
context,
buildResponseFields(response)
);
return response;
} catch (RuntimeException exception) {
String errorStage = AiReadableLogSupport.stageWithScene(context, "模型异常");
logger.error(
AiModelInvoker.class,
AiReadableLogSupport.format(
context,
errorStage,
"模型调用失败,异常=%s".formatted(resolveErrorMessage(exception))
),
context,
Map.of(
"status", "failed",
"error.type", exception.getClass().getName(),
"error.message", resolveErrorMessage(exception)
),
exception
);
throw exception;
}
}
private Map<String, Object> buildRequestFields(String prompt, AiObservationContext context) {
Map<String, Object> fields = new LinkedHashMap<>();
fields.put("status", "started");
fields.put("ai.input.preview", support.renderPayload(prompt));
fields.put("ai.input.length", prompt == null ? 0 : prompt.length());
if (StringUtils.hasText(context.modelProvider())) {
fields.put("ai.model.provider", context.modelProvider());
}
if (StringUtils.hasText(context.modelName())) {
fields.put("ai.model.name", context.modelName());
}
return fields;
}
private Map<String, Object> buildResponseFields(String response) {
return Map.of(
"status", "finished",
"ai.output.preview", support.renderPayload(response),
"ai.output.length", response == null ? 0 : response.length()
);
}
private String resolveErrorMessage(Throwable error) {
return error == null || !StringUtils.hasText(error.getMessage()) ? "unknown" : error.getMessage();
}
}
package com.infoepoch.pms.agent.observability.ai;
/**
* AI 观测上下文。
*/
public record AiObservationContext(
String traceId,
String requestId,
String sessionId,
String flowId,
String scene,
AiComponentType componentType,
String componentName,
String agentName,
String modelProvider,
String modelName,
String toolName
) {
public static Builder builder() {
return new Builder();
}
public Builder toBuilder() {
return new Builder()
.traceId(traceId)
.requestId(requestId)
.sessionId(sessionId)
.flowId(flowId)
.scene(scene)
.componentType(componentType)
.componentName(componentName)
.agentName(agentName)
.modelProvider(modelProvider)
.modelName(modelName)
.toolName(toolName);
}
public static final class Builder {
private String traceId;
private String requestId;
private String sessionId;
private String flowId;
private String scene;
private AiComponentType componentType;
private String componentName;
private String agentName;
private String modelProvider;
private String modelName;
private String toolName;
private Builder() {
}
public Builder traceId(String traceId) {
this.traceId = traceId;
return this;
}
public Builder requestId(String requestId) {
this.requestId = requestId;
return this;
}
public Builder sessionId(String sessionId) {
this.sessionId = sessionId;
return this;
}
public Builder flowId(String flowId) {
this.flowId = flowId;
return this;
}
public Builder scene(String scene) {
this.scene = scene;
return this;
}
public Builder componentType(AiComponentType componentType) {
this.componentType = componentType;
return this;
}
public Builder componentName(String componentName) {
this.componentName = componentName;
return this;
}
public Builder agentName(String agentName) {
this.agentName = agentName;
return this;
}
public Builder modelProvider(String modelProvider) {
this.modelProvider = modelProvider;
return this;
}
public Builder modelName(String modelName) {
this.modelName = modelName;
return this;
}
public Builder toolName(String toolName) {
this.toolName = toolName;
return this;
}
public AiObservationContext build() {
return new AiObservationContext(
traceId,
requestId,
sessionId,
flowId,
scene,
componentType,
componentName,
agentName,
modelProvider,
modelName,
toolName
);
}
}
}
package com.infoepoch.pms.agent.observability.ai;
import jakarta.servlet.FilterChain;
import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.springframework.web.filter.OncePerRequestFilter;
import java.io.IOException;
/**
* 统一注入 AI 观测链路上下文。
*/
public class AiObservationFilter extends OncePerRequestFilter {
public static final String HEADER_TRACE_ID = "X-Trace-Id";
public static final String HEADER_REQUEST_ID = "X-Request-Id";
public static final String REQUEST_ATTRIBUTE_TRACE_ID = "aiObservationTraceId";
public static final String REQUEST_ATTRIBUTE_REQUEST_ID = "aiObservationRequestId";
private final AiObservationSupport support;
public AiObservationFilter(AiObservationSupport support) {
this.support = support;
}
@Override
protected boolean shouldNotFilter(HttpServletRequest request) {
String uri = request.getRequestURI();
return uri == null || !uri.contains("/api/agent/");
}
@Override
protected void doFilterInternal(HttpServletRequest request,
HttpServletResponse response,
FilterChain filterChain) throws ServletException, IOException {
if (!support.isEnabled()) {
filterChain.doFilter(request, response);
return;
}
String traceId = headerOrGenerate(request, HEADER_TRACE_ID);
String requestId = headerOrGenerate(request, HEADER_REQUEST_ID);
request.setAttribute(REQUEST_ATTRIBUTE_TRACE_ID, traceId);
request.setAttribute(REQUEST_ATTRIBUTE_REQUEST_ID, requestId);
AiObservationContext context = AiObservationContext.builder()
.traceId(traceId)
.requestId(requestId)
.flowId(requestId)
.componentType(AiComponentType.HTTP_INBOUND)
.componentName(request.getRequestURI())
.build();
try {
support.withObservationContext(context, () -> {
try {
filterChain.doFilter(request, response);
return null;
} catch (IOException exception) {
throw new RuntimeIoException(exception);
} catch (ServletException exception) {
throw new RuntimeServletException(exception);
}
});
} catch (RuntimeIoException exception) {
throw (IOException) exception.getCause();
} catch (RuntimeServletException exception) {
throw (ServletException) exception.getCause();
}
}
private String headerOrGenerate(HttpServletRequest request, String headerName) {
String headerValue = request.getHeader(headerName);
return headerValue != null && !headerValue.isBlank() ? headerValue.trim() : AiObservationSupport.newObservationId();
}
private static final class RuntimeIoException extends RuntimeException {
private RuntimeIoException(IOException cause) {
super(cause);
}
}
private static final class RuntimeServletException extends RuntimeException {
private RuntimeServletException(ServletException cause) {
super(cause);
}
}
}
package com.infoepoch.pms.agent.observability.ai;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import java.util.Map;
/**
* AI 观测日志输出器。
*/
public class AiObservationLogger {
private final AiObservationSupport support;
public AiObservationLogger(AiObservationSupport support) {
this.support = support;
}
public void info(Class<?> source, String message, AiObservationContext context, Map<String, ?> fields) {
log(source, message, context, fields, null);
}
public void error(Class<?> source, String message, AiObservationContext context, Map<String, ?> fields, Throwable error) {
log(source, message, context, fields, error);
}
private void log(Class<?> source, String message, AiObservationContext context, Map<String, ?> fields, Throwable error) {
if (!support.isEnabled()) {
return;
}
Logger logger = LoggerFactory.getLogger(source);
support.withObservationContext(context, () -> {
applyFields(fields);
try {
if (error == null) {
logger.info(message);
} else {
logger.error(message, error);
}
return null;
} finally {
clearFields(fields);
}
});
}
private void applyFields(Map<String, ?> fields) {
if (fields == null || fields.isEmpty()) {
return;
}
fields.forEach((key, value) -> {
if (key != null && value != null) {
MDC.put(key, value.toString());
}
});
}
private void clearFields(Map<String, ?> fields) {
if (fields == null || fields.isEmpty()) {
return;
}
fields.keySet().forEach(MDC::remove);
}
}
package com.infoepoch.pms.agent.observability.ai;
import com.alibaba.cloud.ai.graph.RunnableConfig;
import com.infoepoch.pms.agent.properties.AiObservabilityProperties;
import org.slf4j.MDC;
import org.springframework.util.StringUtils;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Supplier;
/**
* AI 观测公共能力。
*/
public class AiObservationSupport {
private static final int PREVIEW_LIMIT = 160;
public static final String MDC_SERVICE_NAME = "service.name";
public static final String MDC_TRACE_ID = "trace.id";
public static final String MDC_REQUEST_ID = "request.id";
public static final String MDC_SESSION_ID = "session.id";
public static final String MDC_FLOW_ID = "ai.flow.id";
public static final String MDC_AI_SCENE = "ai.scene";
public static final String MDC_COMPONENT_TYPE = "ai.component.type";
public static final String MDC_COMPONENT_NAME = "ai.component.name";
public static final String MDC_AGENT_NAME = "ai.agent.name";
public static final String MDC_MODEL_PROVIDER = "ai.model.provider";
public static final String MDC_MODEL_NAME = "ai.model.name";
public static final String MDC_TOOL_NAME = "ai.tool.name";
private final AiObservabilityProperties properties;
private final String serviceName;
public AiObservationSupport(AiObservabilityProperties properties, String serviceName) {
this.properties = properties;
this.serviceName = serviceName;
}
public <T> T withObservationContext(AiObservationContext context, Supplier<T> supplier) {
Map<String, String> previous = MDC.getCopyOfContextMap();
try {
applyContext(context);
return supplier.get();
} finally {
restore(previous);
}
}
public String renderPayload(String payload) {
if (properties.getPayloadMode() == AiObservabilityProperties.PayloadMode.OFF) {
return "off";
}
if (payload == null) {
return "长度=0 摘要=";
}
if (properties.getPayloadMode() == AiObservabilityProperties.PayloadMode.FULL) {
return payload;
}
String normalized = payload.replaceAll("\\s+", " ").trim();
if (normalized.length() > PREVIEW_LIMIT) {
normalized = normalized.substring(0, PREVIEW_LIMIT) + "...";
}
return "长度=%s 摘要=%s".formatted(payload.length(), normalized);
}
public AiObservationContext currentContext() {
return AiObservationContext.builder()
.traceId(MDC.get(MDC_TRACE_ID))
.requestId(MDC.get(MDC_REQUEST_ID))
.sessionId(MDC.get(MDC_SESSION_ID))
.flowId(MDC.get(MDC_FLOW_ID))
.scene(MDC.get(MDC_AI_SCENE))
.componentType(resolveComponentType(MDC.get(MDC_COMPONENT_TYPE)))
.componentName(MDC.get(MDC_COMPONENT_NAME))
.agentName(MDC.get(MDC_AGENT_NAME))
.modelProvider(MDC.get(MDC_MODEL_PROVIDER))
.modelName(MDC.get(MDC_MODEL_NAME))
.toolName(MDC.get(MDC_TOOL_NAME))
.build();
}
public Map<String, Object> toMetadata(AiObservationContext context) {
Map<String, Object> metadata = new LinkedHashMap<>();
put(metadata, "traceId", context.traceId());
put(metadata, "requestId", context.requestId());
put(metadata, "sessionId", context.sessionId());
put(metadata, "flowId", context.flowId());
put(metadata, "scene", context.scene());
put(metadata, "componentType", context.componentType() == null ? null : context.componentType().name());
put(metadata, "componentName", context.componentName());
put(metadata, "agentName", context.agentName());
put(metadata, "modelProvider", context.modelProvider());
put(metadata, "modelName", context.modelName());
put(metadata, "toolName", context.toolName());
return metadata;
}
public RunnableConfig buildRunnableConfig(String threadId, AiObservationContext context) {
RunnableConfig.Builder builder = RunnableConfig.builder();
if (StringUtils.hasText(threadId)) {
builder.threadId(threadId);
}
toMetadata(context).forEach(builder::addMetadata);
return builder.build();
}
public AiObservationContext fromMetadata(Map<String, ?> metadata, String threadId) {
if (metadata == null || metadata.isEmpty()) {
return AiObservationContext.builder().sessionId(threadId).build();
}
return AiObservationContext.builder()
.traceId(stringValue(metadata.get("traceId")))
.requestId(stringValue(metadata.get("requestId")))
.sessionId(StringUtils.hasText(stringValue(metadata.get("sessionId"))) ? stringValue(metadata.get("sessionId")) : threadId)
.flowId(stringValue(metadata.get("flowId")))
.scene(stringValue(metadata.get("scene")))
.componentType(resolveComponentType(stringValue(metadata.get("componentType"))))
.componentName(stringValue(metadata.get("componentName")))
.agentName(stringValue(metadata.get("agentName")))
.modelProvider(stringValue(metadata.get("modelProvider")))
.modelName(stringValue(metadata.get("modelName")))
.toolName(stringValue(metadata.get("toolName")))
.build();
}
public String serviceName() {
return serviceName;
}
public boolean isEnabled() {
return properties.isEnabled();
}
public boolean isHttpHeaderPropagationEnabled() {
return properties.getHttp().isPropagateHeaders();
}
public static String newObservationId() {
return UUID.randomUUID().toString().replace("-", "");
}
private void applyContext(AiObservationContext context) {
if (context == null) {
return;
}
putMdc(MDC_SERVICE_NAME, serviceName);
putMdc(MDC_TRACE_ID, context.traceId());
putMdc(MDC_REQUEST_ID, context.requestId());
putMdc(MDC_SESSION_ID, context.sessionId());
putMdc(MDC_FLOW_ID, context.flowId());
putMdc(MDC_AI_SCENE, context.scene());
putMdc(MDC_COMPONENT_TYPE, context.componentType() == null ? null : context.componentType().name());
putMdc(MDC_COMPONENT_NAME, context.componentName());
putMdc(MDC_AGENT_NAME, context.agentName());
putMdc(MDC_MODEL_PROVIDER, context.modelProvider());
putMdc(MDC_MODEL_NAME, context.modelName());
putMdc(MDC_TOOL_NAME, context.toolName());
}
private void restore(Map<String, String> previous) {
MDC.clear();
if (previous != null && !previous.isEmpty()) {
MDC.setContextMap(previous);
}
}
private void putMdc(String key, String value) {
if (!StringUtils.hasText(key)) {
return;
}
if (StringUtils.hasText(value)) {
MDC.put(key, value);
return;
}
MDC.remove(key);
}
private void put(Map<String, Object> map, String key, Object value) {
if (value != null) {
map.put(key, value);
}
}
private String stringValue(Object value) {
return value == null ? null : value.toString();
}
private AiComponentType resolveComponentType(String value) {
if (!StringUtils.hasText(value)) {
return null;
}
try {
return AiComponentType.valueOf(value);
} catch (IllegalArgumentException exception) {
return null;
}
}
}
package com.infoepoch.pms.agent.observability.ai;
import org.springframework.ai.chat.messages.AbstractMessage;
import org.springframework.ai.chat.messages.Message;
import org.springframework.ai.chat.messages.MessageType;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.List;
/**
* AI 观测可读日志格式化工具。
*/
public final class AiReadableLogSupport {
private AiReadableLogSupport() {
}
public static String format(AiObservationContext context, String stage, String detail) {
return "[链路ID=%s] [会话ID=%s] [阶段=%s] %s".formatted(
safeValue(context == null ? null : context.traceId()),
safeValue(context == null ? null : context.sessionId()),
safeValue(stage),
safeValue(detail)
);
}
public static String formatModelPrompt(AiObservationContext context, String stage, String prompt) {
return "[链路ID=%s] [会话ID=%s] [阶段=%s] 模型请求内容:%n%s".formatted(
safeValue(context == null ? null : context.traceId()),
safeValue(context == null ? null : context.sessionId()),
safeValue(stage),
prompt == null ? "" : prompt
);
}
public static String formatModelMessages(AiObservationContext context, String stage, List<Message> messages) {
String lineSeparator = System.lineSeparator();
StringBuilder builder = new StringBuilder("[链路ID=%s] [会话ID=%s] [阶段=%s] 模型请求消息:%s".formatted(
safeValue(context == null ? null : context.traceId()),
safeValue(context == null ? null : context.sessionId()),
safeValue(stage),
lineSeparator
));
if (CollectionUtils.isEmpty(messages)) {
return builder.append("(空)").toString();
}
for (int index = 0; index < messages.size(); index++) {
Message message = messages.get(index);
builder.append('[')
.append(index + 1)
.append("] role=")
.append(resolveRole(message))
.append(lineSeparator)
.append(resolveText(message));
if (index < messages.size() - 1) {
builder.append(lineSeparator);
}
}
return builder.toString();
}
public static String formatModelResponse(AiObservationContext context, String stage, String response) {
return "[链路ID=%s] [会话ID=%s] [阶段=%s] 模型返回内容:%n%s".formatted(
safeValue(context == null ? null : context.traceId()),
safeValue(context == null ? null : context.sessionId()),
safeValue(stage),
response == null ? "" : response
);
}
public static String findLastAssistantText(List<Message> messages) {
if (CollectionUtils.isEmpty(messages)) {
return "";
}
for (int index = messages.size() - 1; index >= 0; index--) {
Message message = messages.get(index);
if (message != null && message.getMessageType() == MessageType.ASSISTANT) {
return resolveText(message);
}
}
return "";
}
public static String stageWithScene(AiObservationContext context, String suffix) {
if (context != null && StringUtils.hasText(context.scene())) {
return context.scene() + suffix;
}
return suffix;
}
private static String resolveRole(Message message) {
if (message == null || message.getMessageType() == null) {
return "unknown";
}
return message.getMessageType().getValue();
}
private static String resolveText(Message message) {
if (message instanceof AbstractMessage abstractMessage) {
return abstractMessage.getText() == null ? "" : abstractMessage.getText();
}
return message == null ? "" : message.toString();
}
private static String safeValue(String value) {
return StringUtils.hasText(value) ? value : "-";
}
}
package com.infoepoch.pms.agent.observability.ai;
import com.alibaba.cloud.ai.graph.agent.ReactAgent;
import com.infoepoch.pms.agent.observability.ai.hook.AiAgentLifecycleObservationHook;
import com.infoepoch.pms.agent.observability.ai.hook.AiModelMessageObservationHook;
import org.springframework.ai.chat.model.ChatModel;
/**
* 带通用观测 hooks 的 ReactAgent 工厂。
*/
public class ObservedReactAgentFactory {
private final AiAgentLifecycleObservationHook agentLifecycleObservationHook;
private final AiModelMessageObservationHook modelMessageObservationHook;
public ObservedReactAgentFactory(AiAgentLifecycleObservationHook agentLifecycleObservationHook,
AiModelMessageObservationHook modelMessageObservationHook) {
this.agentLifecycleObservationHook = agentLifecycleObservationHook;
this.modelMessageObservationHook = modelMessageObservationHook;
}
public ReactAgent create(String agentName, ChatModel chatModel, String systemPrompt) {
return ReactAgent.builder()
.name(agentName)
.model(chatModel)
.hooks(agentLifecycleObservationHook, modelMessageObservationHook)
.systemPrompt(systemPrompt)
.build();
}
}
package com.infoepoch.pms.agent.observability.ai;
import org.springframework.http.HttpRequest;
import org.springframework.http.client.BufferingClientHttpRequestFactory;
import org.springframework.http.client.ClientHttpRequestExecution;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.http.client.ClientHttpRequestInterceptor;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.util.StringUtils;
import org.springframework.web.client.RestClient;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* 带观测能力的 RestClient 工厂。
*/
public class ObservedRestClientFactory {
public static final String HEADER_TRACE_ID = "X-Trace-Id";
public static final String HEADER_REQUEST_ID = "X-Request-Id";
public static final String HEADER_SESSION_ID = "X-Session-Id";
private final AiObservationSupport support;
public ObservedRestClientFactory(AiObservationSupport support) {
this.support = support;
}
public RestClient create(String baseUrl, ClientHttpRequestFactory requestFactory, String componentName) {
RestClient.Builder builder = RestClient.builder()
.requestFactory(requestFactory == null ? defaultRequestFactory() : requestFactory)
.requestInterceptor(new ObservationClientHttpRequestInterceptor(componentName));
if (StringUtils.hasText(baseUrl)) {
builder.baseUrl(baseUrl);
}
return builder.build();
}
public ClientHttpRequestFactory defaultRequestFactory() {
return new BufferingClientHttpRequestFactory(new SimpleClientHttpRequestFactory());
}
private final class ObservationClientHttpRequestInterceptor implements ClientHttpRequestInterceptor {
private final String componentName;
private ObservationClientHttpRequestInterceptor(String componentName) {
this.componentName = componentName;
}
@Override
public ClientHttpResponse intercept(HttpRequest request,
byte[] body,
ClientHttpRequestExecution execution) throws IOException {
AiObservationContext currentContext = support.currentContext();
if (support.isHttpHeaderPropagationEnabled() && StringUtils.hasText(currentContext.traceId())) {
request.getHeaders().set(HEADER_TRACE_ID, currentContext.traceId());
}
if (support.isHttpHeaderPropagationEnabled() && StringUtils.hasText(currentContext.requestId())) {
request.getHeaders().set(HEADER_REQUEST_ID, currentContext.requestId());
}
if (support.isHttpHeaderPropagationEnabled() && StringUtils.hasText(currentContext.sessionId())) {
request.getHeaders().set(HEADER_SESSION_ID, currentContext.sessionId());
}
AiObservationContext outboundContext = currentContext.toBuilder()
.componentType(AiComponentType.HTTP_OUTBOUND)
.componentName(componentName)
.build();
Map<String, Object> requestFields = new LinkedHashMap<>();
requestFields.put("http.method", request.getMethod());
requestFields.put("http.url", request.getURI().toString());
requestFields.put("status", "started");
AiObservationLogger logger = new AiObservationLogger(support);
logger.info(ObservedRestClientFactory.class, "ai.http.client.request", outboundContext, requestFields);
try {
ClientHttpResponse response = support.withObservationContext(outboundContext, () -> {
try {
return execution.execute(request, body);
} catch (IOException exception) {
throw new RuntimeIoException(exception);
}
});
logger.info(
ObservedRestClientFactory.class,
"ai.http.client.response",
outboundContext,
Map.of(
"http.status_code", response.getStatusCode().value(),
"status", "finished"
)
);
return response;
} catch (RuntimeIoException exception) {
logger.error(
ObservedRestClientFactory.class,
"ai.request.failed",
outboundContext,
Map.of(
"status", "failed",
"error.type", IOException.class.getName(),
"error.message", exception.getCause().getMessage()
),
exception.getCause()
);
throw (IOException) exception.getCause();
}
}
}
private static final class RuntimeIoException extends RuntimeException {
private RuntimeIoException(IOException cause) {
super(cause);
}
}
}
package com.infoepoch.pms.agent.observability.ai.hook;
import com.alibaba.cloud.ai.graph.OverAllState;
import com.alibaba.cloud.ai.graph.RunnableConfig;
import com.alibaba.cloud.ai.graph.agent.hook.AgentHook;
import com.alibaba.cloud.ai.graph.agent.hook.HookPosition;
import com.infoepoch.pms.agent.observability.ai.AiComponentType;
import com.infoepoch.pms.agent.observability.ai.AiObservationContext;
import com.infoepoch.pms.agent.observability.ai.AiObservationLogger;
import com.infoepoch.pms.agent.observability.ai.AiObservationSupport;
import com.infoepoch.pms.agent.observability.ai.AiReadableLogSupport;
import org.springframework.util.StringUtils;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
/**
* 通用 Agent 生命周期观测 hook。
*/
public class AiAgentLifecycleObservationHook extends AgentHook {
private static final String HOOK_NAME = "aiAgentLifecycleObservationHook";
private final AiObservationLogger logger;
private final AiObservationSupport support;
public AiAgentLifecycleObservationHook(AiObservationLogger logger, AiObservationSupport support) {
this.logger = logger;
this.support = support;
}
@Override
public String getName() {
return HOOK_NAME;
}
@Override
public HookPosition[] getHookPositions() {
return new HookPosition[]{HookPosition.BEFORE_AGENT, HookPosition.AFTER_AGENT};
}
@Override
public CompletableFuture<Map<String, Object>> beforeAgent(OverAllState state, RunnableConfig runnableConfig) {
AiObservationContext context = buildContext(runnableConfig);
logger.info(
AiAgentLifecycleObservationHook.class,
AiReadableLogSupport.format(
context,
"Agent开始",
"场景=%s,agent=%s".formatted(
safeValue(context.scene()),
safeValue(context.agentName())
)
),
context,
Map.of("status", "started")
);
return CompletableFuture.completedFuture(Collections.emptyMap());
}
@Override
public CompletableFuture<Map<String, Object>> afterAgent(OverAllState state, RunnableConfig runnableConfig) {
AiObservationContext context = buildContext(runnableConfig);
logger.info(
AiAgentLifecycleObservationHook.class,
AiReadableLogSupport.format(
context,
"Agent完成",
"场景=%s,agent=%s".formatted(
safeValue(context.scene()),
safeValue(context.agentName())
)
),
context,
Map.of("status", "finished")
);
return CompletableFuture.completedFuture(Collections.emptyMap());
}
private AiObservationContext buildContext(RunnableConfig runnableConfig) {
Map<String, Object> metadata = runnableConfig.metadata().orElse(Map.of());
AiObservationContext current = support.fromMetadata(metadata, runnableConfig.threadId().orElse(null));
return current.toBuilder()
.componentType(AiComponentType.AGENT)
.componentName(StringUtils.hasText(current.componentName()) ? current.componentName() : getAgentName())
.agentName(StringUtils.hasText(current.agentName()) ? current.agentName() : getAgentName())
.build();
}
private String safeValue(String value) {
return StringUtils.hasText(value) ? value : "-";
}
}
package com.infoepoch.pms.agent.observability.ai.hook;
import com.alibaba.cloud.ai.graph.RunnableConfig;
import com.alibaba.cloud.ai.graph.agent.hook.HookPosition;
import com.alibaba.cloud.ai.graph.agent.hook.messages.AgentCommand;
import com.alibaba.cloud.ai.graph.agent.hook.messages.MessagesModelHook;
import com.alibaba.cloud.ai.graph.agent.hook.messages.UpdatePolicy;
import com.infoepoch.pms.agent.observability.ai.AiComponentType;
import com.infoepoch.pms.agent.observability.ai.AiObservationContext;
import com.infoepoch.pms.agent.observability.ai.AiObservationLogger;
import com.infoepoch.pms.agent.observability.ai.AiObservationSupport;
import com.infoepoch.pms.agent.observability.ai.AiReadableLogSupport;
import org.springframework.ai.chat.messages.Message;
import org.springframework.util.StringUtils;
import java.util.List;
import java.util.Map;
/**
* 通用模型消息观测 hook。
*/
public class AiModelMessageObservationHook extends MessagesModelHook {
private static final String HOOK_NAME = "aiModelMessageObservationHook";
private final AiObservationLogger logger;
private final AiObservationSupport support;
public AiModelMessageObservationHook(AiObservationLogger logger, AiObservationSupport support) {
this.logger = logger;
this.support = support;
}
@Override
public String getName() {
return HOOK_NAME;
}
@Override
public HookPosition[] getHookPositions() {
return new HookPosition[]{HookPosition.BEFORE_MODEL, HookPosition.AFTER_MODEL};
}
@Override
public AgentCommand beforeModel(List<Message> messages, RunnableConfig runnableConfig) {
AiObservationContext context = buildContext(runnableConfig);
String renderedMessages = AiReadableLogSupport.formatModelMessages(
context,
AiReadableLogSupport.stageWithScene(context, "模型请求"),
messages
);
logger.info(
AiModelMessageObservationHook.class,
renderedMessages,
context,
Map.of(
"status", "started",
"ai.input.preview", renderedMessages,
"ai.input.length", renderedMessages.length()
)
);
return new AgentCommand(messages, UpdatePolicy.REPLACE);
}
@Override
public AgentCommand afterModel(List<Message> messages, RunnableConfig runnableConfig) {
AiObservationContext context = buildContext(runnableConfig);
String response = AiReadableLogSupport.findLastAssistantText(messages);
logger.info(
AiModelMessageObservationHook.class,
AiReadableLogSupport.formatModelResponse(
context,
AiReadableLogSupport.stageWithScene(context, "模型完整返回"),
response
),
context,
Map.of(
"status", "finished",
"ai.output.preview", response == null ? "" : response,
"ai.output.length", response == null ? 0 : response.length()
)
);
return new AgentCommand(messages, UpdatePolicy.REPLACE);
}
private AiObservationContext buildContext(RunnableConfig runnableConfig) {
Map<String, Object> metadata = runnableConfig.metadata().orElse(Map.of());
AiObservationContext current = support.fromMetadata(metadata, runnableConfig.threadId().orElse(null));
return current.toBuilder()
.componentType(AiComponentType.MODEL)
.componentName(StringUtils.hasText(current.componentName())
? current.componentName()
: getAgentName() + ".model")
.agentName(StringUtils.hasText(current.agentName()) ? current.agentName() : getAgentName())
.build();
}
}
package com.infoepoch.pms.agent.properties;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* AI 观测配置。
*/
@ConfigurationProperties(prefix = "pms.observability.ai")
public class AiObservabilityProperties {
private boolean enabled = true;
private PayloadMode payloadMode = PayloadMode.FULL;
private final Http http = new Http();
public boolean isEnabled() {
return enabled;
}
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
public PayloadMode getPayloadMode() {
return payloadMode;
}
public void setPayloadMode(PayloadMode payloadMode) {
this.payloadMode = payloadMode;
}
public Http getHttp() {
return http;
}
public enum PayloadMode {
PREVIEW,
FULL,
OFF
}
public static class Http {
private boolean propagateHeaders = true;
public boolean isPropagateHeaders() {
return propagateHeaders;
}
public void setPropagateHeaders(boolean propagateHeaders) {
this.propagateHeaders = propagateHeaders;
}
}
}
...@@ -8,11 +8,9 @@ import com.infoepoch.pms.agent.domain.care.model.ActivityMatchUserRule; ...@@ -8,11 +8,9 @@ import com.infoepoch.pms.agent.domain.care.model.ActivityMatchUserRule;
import com.infoepoch.pms.agent.domain.care.model.ActivitySummary; import com.infoepoch.pms.agent.domain.care.model.ActivitySummary;
import com.infoepoch.pms.agent.domain.care.model.PagedResult; import com.infoepoch.pms.agent.domain.care.model.PagedResult;
import com.infoepoch.pms.agent.domain.care.model.UserProfileSummary; import com.infoepoch.pms.agent.domain.care.model.UserProfileSummary;
import com.infoepoch.pms.agent.observability.ai.ObservedRestClientFactory;
import com.infoepoch.pms.agent.properties.CareBusinessProperties; import com.infoepoch.pms.agent.properties.CareBusinessProperties;
import org.springframework.http.MediaType; import org.springframework.http.MediaType;
import org.springframework.http.client.BufferingClientHttpRequestFactory;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import org.springframework.web.client.RestClient; import org.springframework.web.client.RestClient;
...@@ -39,13 +37,14 @@ public class HttpCareBusinessDataProvider implements CareBusinessDataProvider { ...@@ -39,13 +37,14 @@ public class HttpCareBusinessDataProvider implements CareBusinessDataProvider {
private final CareBusinessProperties properties; private final CareBusinessProperties properties;
private final RestClient restClient; private final RestClient restClient;
public HttpCareBusinessDataProvider(CareBusinessProperties properties) { public HttpCareBusinessDataProvider(CareBusinessProperties properties,
ObservedRestClientFactory observedRestClientFactory) {
this.properties = properties; this.properties = properties;
RestClient.Builder builder = RestClient.builder().requestFactory(buildRequestFactory()); this.restClient = observedRestClientFactory.create(
if (StringUtils.hasText(properties.getBaseUrl())) { properties.getBaseUrl(),
builder.baseUrl(properties.getBaseUrl()); observedRestClientFactory.defaultRequestFactory(),
} "careBusinessProvider"
this.restClient = builder.build(); );
} }
/** /**
...@@ -224,13 +223,6 @@ public class HttpCareBusinessDataProvider implements CareBusinessDataProvider { ...@@ -224,13 +223,6 @@ public class HttpCareBusinessDataProvider implements CareBusinessDataProvider {
} }
} }
/**
* 创建兼容业务网关响应的 HTTP 客户端请求工厂。
*/
private ClientHttpRequestFactory buildRequestFactory() {
return new BufferingClientHttpRequestFactory(new SimpleClientHttpRequestFactory());
}
/** /**
* 执行 HTTP POST 请求并将返回值解析为 JsonNode。 * 执行 HTTP POST 请求并将返回值解析为 JsonNode。
*/ */
......
...@@ -35,6 +35,12 @@ jasypt: ...@@ -35,6 +35,12 @@ jasypt:
iv-generator-classname: org.jasypt.iv.NoIvGenerator iv-generator-classname: org.jasypt.iv.NoIvGenerator
pms: pms:
observability:
ai:
enabled: true
payload-mode: full
http:
propagate-headers: true
care: care:
agent: agent:
default-count: 10 default-count: 10
......
package com.infoepoch.pms.agent.domain.care.orchestrator; package com.infoepoch.pms.agent.domain.care.orchestrator;
import com.alibaba.cloud.ai.graph.agent.ReactAgent; import com.alibaba.cloud.ai.graph.agent.ReactAgent;
import com.alibaba.cloud.ai.graph.RunnableConfig;
import com.infoepoch.pms.agent.domain.care.model.ActivitySummary; import com.infoepoch.pms.agent.domain.care.model.ActivitySummary;
import com.infoepoch.pms.agent.domain.care.model.CareConversationState; import com.infoepoch.pms.agent.domain.care.model.CareConversationState;
import com.infoepoch.pms.agent.domain.care.model.CareQuery; import com.infoepoch.pms.agent.domain.care.model.CareQuery;
import com.infoepoch.pms.agent.domain.care.model.CareTaskType; import com.infoepoch.pms.agent.domain.care.model.CareTaskType;
import com.infoepoch.pms.agent.domain.care.log.CareTraceLogSupport;
import com.infoepoch.pms.agent.domain.care.model.UserProfileSummary; import com.infoepoch.pms.agent.domain.care.model.UserProfileSummary;
import com.infoepoch.pms.agent.domain.care.state.CareConversationStateService; import com.infoepoch.pms.agent.domain.care.state.CareConversationStateService;
import com.infoepoch.pms.agent.domain.care.stream.CareStreamingResponseAssembler; import com.infoepoch.pms.agent.domain.care.stream.CareStreamingResponseAssembler;
import com.infoepoch.pms.agent.observability.ai.AiObservationLogger;
import com.infoepoch.pms.agent.observability.ai.AiObservationSupport;
import com.infoepoch.pms.agent.properties.AiObservabilityProperties;
import com.infoepoch.pms.agent.properties.CareAgentProperties; import com.infoepoch.pms.agent.properties.CareAgentProperties;
import com.infoepoch.pms.agent.tool.union.care.CurrentUserProfileTool; import com.infoepoch.pms.agent.tool.union.care.CurrentUserProfileTool;
import com.infoepoch.pms.agent.tool.union.care.EligibleActivitiesTool; import com.infoepoch.pms.agent.tool.union.care.EligibleActivitiesTool;
...@@ -26,6 +31,7 @@ import java.util.Set; ...@@ -26,6 +31,7 @@ import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
...@@ -57,7 +63,8 @@ class ActivityRecommendationExecutorTest { ...@@ -57,7 +63,8 @@ class ActivityRecommendationExecutorTest {
conversationStateService, conversationStateService,
new CareStreamingResponseAssembler(), new CareStreamingResponseAssembler(),
properties, properties,
careAgent careAgent,
new AiObservationSupport(new AiObservabilityProperties(), "pms-agent")
); );
} }
...@@ -93,13 +100,23 @@ class ActivityRecommendationExecutorTest { ...@@ -93,13 +100,23 @@ class ActivityRecommendationExecutorTest {
activity("a-3"), activity("a-3"),
activity("a-4") activity("a-4")
)); ));
when(careAgent.stream(anyString())).thenReturn((Flux) Flux.empty()); when(careAgent.stream(anyString(), any(RunnableConfig.class))).thenReturn((Flux) Flux.empty());
executor.execute("trace-1", "session-1", query, state).collectList().block(); executor.execute("trace-1", "session-1", query, state).collectList().block();
ArgumentCaptor<CareConversationState> captor = ArgumentCaptor.forClass(CareConversationState.class); ArgumentCaptor<CareConversationState> captor = ArgumentCaptor.forClass(CareConversationState.class);
verify(conversationStateService).save(captor.capture()); verify(conversationStateService).save(captor.capture());
assertEquals(Set.of("old-1", "a-1", "a-2"), captor.getValue().deliveredIds()); assertEquals(Set.of("old-1", "a-1", "a-2"), captor.getValue().deliveredIds());
ArgumentCaptor<RunnableConfig> configCaptor = ArgumentCaptor.forClass(RunnableConfig.class);
verify(careAgent).stream(anyString(), configCaptor.capture());
RunnableConfig runnableConfig = configCaptor.getValue();
assertEquals("session-1", runnableConfig.threadId().orElseThrow());
assertEquals("trace-1", runnableConfig.metadata().orElseThrow().get(CareTraceLogSupport.METADATA_TRACE_ID));
assertEquals("session-1", runnableConfig.metadata().orElseThrow().get(CareTraceLogSupport.METADATA_SESSION_ID));
assertEquals("活动推荐", runnableConfig.metadata().orElseThrow().get(CareTraceLogSupport.METADATA_SCENE));
assertEquals("trace-1", runnableConfig.metadata().orElseThrow().get("requestId"));
assertEquals("trace-1", runnableConfig.metadata().orElseThrow().get("flowId"));
} }
@Test @Test
...@@ -118,7 +135,8 @@ class ActivityRecommendationExecutorTest { ...@@ -118,7 +135,8 @@ class ActivityRecommendationExecutorTest {
.thenReturn(new UserProfileSummary("u-1", "张三", 68, "男", List.of(), List.of(), "浦东", "")); .thenReturn(new UserProfileSummary("u-1", "张三", 68, "男", List.of(), List.of(), "浦东", ""));
when(eligibleActivitiesTool.execute("trace-1", "session-1")) when(eligibleActivitiesTool.execute("trace-1", "session-1"))
.thenReturn(List.of(activity("a-1"), activity("a-2"), activity("a-3"))); .thenReturn(List.of(activity("a-1"), activity("a-2"), activity("a-3")));
when(careAgent.stream(anyString())).thenReturn((Flux) Flux.error(new RuntimeException("boom"))); when(careAgent.stream(anyString(), any(RunnableConfig.class)))
.thenReturn((Flux) Flux.error(new RuntimeException("boom")));
assertThrows(RuntimeException.class, assertThrows(RuntimeException.class,
() -> executor.execute("trace-1", "session-1", query, null).collectList().block()); () -> executor.execute("trace-1", "session-1", query, null).collectList().block());
......
package com.infoepoch.pms.agent.domain.care.orchestrator; package com.infoepoch.pms.agent.domain.care.orchestrator;
import com.alibaba.cloud.ai.graph.RunnableConfig;
import com.alibaba.cloud.ai.graph.agent.ReactAgent;
import com.infoepoch.pms.agent.domain.care.log.CareTraceLogSupport;
import com.infoepoch.pms.agent.domain.care.model.CareQuery;
import com.infoepoch.pms.agent.domain.care.model.CareTaskType;
import com.infoepoch.pms.agent.domain.care.stream.CareStreamingResponseAssembler; import com.infoepoch.pms.agent.domain.care.stream.CareStreamingResponseAssembler;
import com.infoepoch.pms.agent.observability.ai.AiObservationSupport;
import com.infoepoch.pms.agent.properties.AiObservabilityProperties;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import java.lang.reflect.Method; import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class GeneralConversationExecutorTest { class GeneralConversationExecutorTest {
@Mock
private ReactAgent careAgent;
@Test @Test
void shouldBuildPromptWithOnlyRoleAndUserMessage() throws Exception { void shouldPassRunnableConfigAndUseFallbackWhenAgentReturnsNoContent() throws Exception {
GeneralConversationExecutor executor = new GeneralConversationExecutor( GeneralConversationExecutor executor = new GeneralConversationExecutor(
new CareStreamingResponseAssembler(), new CareStreamingResponseAssembler(),
null careAgent,
new AiObservationSupport(new AiObservabilityProperties(), "pms-agent")
);
CareQuery query = new CareQuery(
CareTaskType.GENERAL_CHAT,
10,
false,
"活动历史有哪些",
"",
"活动历史有哪些",
Map.of(),
""
); );
Method method = GeneralConversationExecutor.class.getDeclaredMethod("buildPrompt", String.class); when(careAgent.stream(anyString(), any(RunnableConfig.class))).thenReturn((Flux) Flux.empty());
method.setAccessible(true);
StepVerifier.create(executor.execute("trace-1", "session-1", query))
String prompt = (String) method.invoke(executor, "活动历史有哪些"); .expectNextMatches(chunk -> chunk.contains("你好,我是精准关爱智能体"))
.verifyComplete();
assertTrue(prompt.contains("请以精准关爱智能体的身份回复用户。"));
assertTrue(prompt.contains("用户输入:活动历史有哪些")); ArgumentCaptor<RunnableConfig> configCaptor = ArgumentCaptor.forClass(RunnableConfig.class);
assertFalse(prompt.contains("你只有以下三个技能")); verify(careAgent).stream(anyString(), configCaptor.capture());
assertFalse(prompt.contains("有限通用对话")); RunnableConfig runnableConfig = configCaptor.getValue();
assertFalse(prompt.contains("暂无推荐")); assertEquals("session-1", runnableConfig.threadId().orElseThrow());
assertFalse(prompt.contains("不扩展成百科问答或开放世界知识回答")); assertEquals("trace-1", runnableConfig.metadata().orElseThrow().get(CareTraceLogSupport.METADATA_TRACE_ID));
assertFalse(prompt.contains("引导用户改问")); assertEquals("session-1", runnableConfig.metadata().orElseThrow().get(CareTraceLogSupport.METADATA_SESSION_ID));
assertEquals("通用对话", runnableConfig.metadata().orElseThrow().get(CareTraceLogSupport.METADATA_SCENE));
assertEquals("trace-1", runnableConfig.metadata().orElseThrow().get("requestId"));
assertEquals("trace-1", runnableConfig.metadata().orElseThrow().get("flowId"));
} }
} }
...@@ -4,6 +4,10 @@ import com.infoepoch.pms.agent.domain.care.model.CareConversationState; ...@@ -4,6 +4,10 @@ import com.infoepoch.pms.agent.domain.care.model.CareConversationState;
import com.infoepoch.pms.agent.domain.care.model.CareQuery; import com.infoepoch.pms.agent.domain.care.model.CareQuery;
import com.infoepoch.pms.agent.domain.care.model.CareTaskType; import com.infoepoch.pms.agent.domain.care.model.CareTaskType;
import com.infoepoch.pms.agent.domain.care.understanding.CareQueryUnderstandingService; import com.infoepoch.pms.agent.domain.care.understanding.CareQueryUnderstandingService;
import com.infoepoch.pms.agent.observability.ai.AiModelInvoker;
import com.infoepoch.pms.agent.observability.ai.AiObservationLogger;
import com.infoepoch.pms.agent.observability.ai.AiObservationSupport;
import com.infoepoch.pms.agent.properties.AiObservabilityProperties;
import com.infoepoch.pms.agent.properties.CareAgentProperties; import com.infoepoch.pms.agent.properties.CareAgentProperties;
import com.infoepoch.pms.agent.tool.union.care.provider.CareBusinessDataProvider; import com.infoepoch.pms.agent.tool.union.care.provider.CareBusinessDataProvider;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
...@@ -103,8 +107,15 @@ class CareConversationStateServiceTest { ...@@ -103,8 +107,15 @@ class CareConversationStateServiceTest {
CareAgentProperties properties = new CareAgentProperties(); CareAgentProperties properties = new CareAgentProperties();
properties.setDefaultCount(10); properties.setDefaultCount(10);
properties.setMaxRequestedCount(500); properties.setMaxRequestedCount(500);
AiObservationSupport support = new AiObservationSupport(new AiObservabilityProperties(), "pms-agent");
CareQueryUnderstandingService understandingService = CareQueryUnderstandingService understandingService =
new CareQueryUnderstandingService(chatModel, properties, dataProvider); new CareQueryUnderstandingService(
chatModel,
properties,
dataProvider,
new AiModelInvoker(new AiObservationLogger(support), support),
support
);
CareQuery query = understandingService.understand("trace-1", "session-1", "再来一些", loaded.orElse(null)); CareQuery query = understandingService.understand("trace-1", "session-1", "再来一些", loaded.orElse(null));
......
...@@ -4,6 +4,10 @@ import com.infoepoch.pms.agent.domain.care.model.ActivityMatchUserRule; ...@@ -4,6 +4,10 @@ import com.infoepoch.pms.agent.domain.care.model.ActivityMatchUserRule;
import com.infoepoch.pms.agent.domain.care.model.CareConversationState; import com.infoepoch.pms.agent.domain.care.model.CareConversationState;
import com.infoepoch.pms.agent.domain.care.model.CareQuery; import com.infoepoch.pms.agent.domain.care.model.CareQuery;
import com.infoepoch.pms.agent.domain.care.model.CareTaskType; import com.infoepoch.pms.agent.domain.care.model.CareTaskType;
import com.infoepoch.pms.agent.observability.ai.AiModelInvoker;
import com.infoepoch.pms.agent.observability.ai.AiObservationLogger;
import com.infoepoch.pms.agent.observability.ai.AiObservationSupport;
import com.infoepoch.pms.agent.properties.AiObservabilityProperties;
import com.infoepoch.pms.agent.properties.CareAgentProperties; import com.infoepoch.pms.agent.properties.CareAgentProperties;
import com.infoepoch.pms.agent.tool.union.care.provider.CareBusinessDataProvider; import com.infoepoch.pms.agent.tool.union.care.provider.CareBusinessDataProvider;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
...@@ -42,7 +46,9 @@ class CareQueryUnderstandingServiceTest { ...@@ -42,7 +46,9 @@ class CareQueryUnderstandingServiceTest {
CareAgentProperties properties = new CareAgentProperties(); CareAgentProperties properties = new CareAgentProperties();
properties.setDefaultCount(10); properties.setDefaultCount(10);
properties.setMaxRequestedCount(500); properties.setMaxRequestedCount(500);
service = new CareQueryUnderstandingService(chatModel, properties, careBusinessDataProvider); AiObservationSupport support = new AiObservationSupport(new AiObservabilityProperties(), "pms-agent");
AiModelInvoker invoker = new AiModelInvoker(new AiObservationLogger(support), support);
service = new CareQueryUnderstandingService(chatModel, properties, careBusinessDataProvider, invoker, support);
} }
@Test @Test
......
package com.infoepoch.pms.agent.observability.ai;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
import com.infoepoch.pms.agent.properties.AiObservabilityProperties;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.slf4j.LoggerFactory;
import org.springframework.ai.chat.model.ChatModel;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.when;
class AiModelInvokerTest {
@Test
void shouldLogReadableModelRequestAndResponse() {
ChatModel chatModel = Mockito.mock(ChatModel.class);
when(chatModel.call(anyString())).thenReturn("GENERAL_CHAT");
AiObservabilityProperties properties = new AiObservabilityProperties();
properties.setPayloadMode(AiObservabilityProperties.PayloadMode.FULL);
AiObservationSupport support = new AiObservationSupport(properties, "pms-agent");
AiObservationLogger logger = new AiObservationLogger(support);
AiModelInvoker invoker = new AiModelInvoker(logger, support);
AiObservationContext context = AiObservationContext.builder()
.traceId("trace-1")
.requestId("request-1")
.sessionId("session-1")
.flowId("flow-1")
.scene("任务分类")
.componentType(AiComponentType.MODEL)
.componentName("intent-classifier")
.modelProvider("siliconflow")
.modelName("deepseek")
.build();
ListAppender<ILoggingEvent> appender = attachAppender(AiModelInvoker.class);
String response = invoker.call(context, "用户输入:你好", chatModel);
assertEquals("GENERAL_CHAT", response);
String logs = joinedLogs(appender);
assertTrue(logs.contains("[链路ID=trace-1]"));
assertTrue(logs.contains("[会话ID=session-1]"));
assertTrue(logs.contains("[阶段=任务分类模型请求]"));
assertTrue(logs.contains("[阶段=任务分类模型完整返回]"));
assertTrue(logs.contains("用户输入:你好"));
assertTrue(logs.contains("GENERAL_CHAT"));
}
private ListAppender<ILoggingEvent> attachAppender(Class<?> loggerClass) {
Logger logger = (Logger) LoggerFactory.getLogger(loggerClass);
ListAppender<ILoggingEvent> appender = new ListAppender<>();
appender.start();
logger.addAppender(appender);
return appender;
}
private String joinedLogs(ListAppender<ILoggingEvent> appender) {
return appender.list.stream()
.map(ILoggingEvent::getFormattedMessage)
.collect(Collectors.joining("\n"));
}
}
package com.infoepoch.pms.agent.observability.ai;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
import com.alibaba.cloud.ai.graph.RunnableConfig;
import com.infoepoch.pms.agent.observability.ai.hook.AiAgentLifecycleObservationHook;
import com.infoepoch.pms.agent.observability.ai.hook.AiModelMessageObservationHook;
import com.infoepoch.pms.agent.properties.AiObservabilityProperties;
import org.junit.jupiter.api.Test;
import org.slf4j.LoggerFactory;
import org.springframework.ai.chat.messages.AssistantMessage;
import org.springframework.ai.chat.messages.Message;
import org.springframework.ai.chat.messages.SystemMessage;
import org.springframework.ai.chat.messages.UserMessage;
import java.util.List;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertTrue;
class AiObservationHookTest {
@Test
void shouldLogReadableAgentLifecycleMessages() {
AiObservationSupport support = new AiObservationSupport(new AiObservabilityProperties(), "pms-agent");
AiObservationLogger logger = new AiObservationLogger(support);
AiAgentLifecycleObservationHook hook = new AiAgentLifecycleObservationHook(logger, support);
ListAppender<ILoggingEvent> appender = attachAppender(AiAgentLifecycleObservationHook.class);
hook.beforeAgent(null, runnableConfig(support));
hook.afterAgent(null, runnableConfig(support));
String logs = joinedLogs(appender);
assertTrue(logs.contains("[链路ID=trace-1]"));
assertTrue(logs.contains("[会话ID=session-1]"));
assertTrue(logs.contains("[阶段=Agent开始]"));
assertTrue(logs.contains("[阶段=Agent完成]"));
assertTrue(logs.contains("场景=通用对话"));
assertTrue(logs.contains("agent=careAgent"));
}
@Test
void shouldLogReadableModelRequestAndResponseMessages() {
AiObservationSupport support = new AiObservationSupport(new AiObservabilityProperties(), "pms-agent");
AiObservationLogger logger = new AiObservationLogger(support);
AiModelMessageObservationHook hook = new AiModelMessageObservationHook(logger, support);
ListAppender<ILoggingEvent> appender = attachAppender(AiModelMessageObservationHook.class);
List<Message> beforeMessages = List.of(
new SystemMessage("你是精准关爱智能体"),
new UserMessage("活动历史有哪些")
);
List<Message> afterMessages = List.of(
new SystemMessage("你是精准关爱智能体"),
new UserMessage("活动历史有哪些"),
new AssistantMessage("这里是模型回复")
);
hook.beforeModel(beforeMessages, runnableConfig(support));
hook.afterModel(afterMessages, runnableConfig(support));
String logs = joinedLogs(appender);
assertTrue(logs.contains("[链路ID=trace-1]"));
assertTrue(logs.contains("[会话ID=session-1]"));
assertTrue(logs.contains("[阶段=通用对话模型请求]"));
assertTrue(logs.contains("[阶段=通用对话模型完整返回]"));
assertTrue(logs.contains("role=system"));
assertTrue(logs.contains("role=user"));
assertTrue(logs.contains("活动历史有哪些"));
assertTrue(logs.contains("这里是模型回复"));
}
private RunnableConfig runnableConfig(AiObservationSupport support) {
AiObservationContext context = AiObservationContext.builder()
.traceId("trace-1")
.requestId("request-1")
.sessionId("session-1")
.flowId("flow-1")
.scene("通用对话")
.componentName("careAgent")
.agentName("careAgent")
.build();
return support.buildRunnableConfig("session-1", context);
}
private ListAppender<ILoggingEvent> attachAppender(Class<?> loggerClass) {
Logger logger = (Logger) LoggerFactory.getLogger(loggerClass);
ListAppender<ILoggingEvent> appender = new ListAppender<>();
appender.start();
logger.addAppender(appender);
return appender;
}
private String joinedLogs(ListAppender<ILoggingEvent> appender) {
return appender.list.stream()
.map(ILoggingEvent::getFormattedMessage)
.collect(Collectors.joining("\n"));
}
}
package com.infoepoch.pms.agent.tool.union.care.provider; package com.infoepoch.pms.agent.tool.union.care.provider;
import com.infoepoch.pms.agent.observability.ai.AiObservationContext;
import com.infoepoch.pms.agent.observability.ai.AiObservationSupport;
import com.infoepoch.pms.agent.observability.ai.ObservedRestClientFactory;
import com.infoepoch.pms.agent.properties.AiObservabilityProperties;
import com.infoepoch.pms.agent.domain.care.model.PagedResult; import com.infoepoch.pms.agent.domain.care.model.PagedResult;
import com.infoepoch.pms.agent.domain.care.model.UserProfileSummary; import com.infoepoch.pms.agent.domain.care.model.UserProfileSummary;
import com.infoepoch.pms.agent.properties.CareBusinessProperties; import com.infoepoch.pms.agent.properties.CareBusinessProperties;
...@@ -12,14 +16,19 @@ import java.io.IOException; ...@@ -12,14 +16,19 @@ import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicReference;
import java.util.Map; import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
class HttpCareBusinessDataProviderTest { class HttpCareBusinessDataProviderTest {
private HttpServer server; private HttpServer server;
private final AtomicReference<String> latestTraceIdHeader = new AtomicReference<>();
private final AtomicReference<String> latestRequestIdHeader = new AtomicReference<>();
private final AtomicReference<String> latestSessionIdHeader = new AtomicReference<>();
@AfterEach @AfterEach
void tearDown() { void tearDown() {
...@@ -112,15 +121,55 @@ class HttpCareBusinessDataProviderTest { ...@@ -112,15 +121,55 @@ class HttpCareBusinessDataProviderTest {
assertNotEquals(result.records().get(0).userId(), result.records().get(1).userId()); assertNotEquals(result.records().get(0).userId(), result.records().get(1).userId());
} }
@Test
void shouldPropagateObservationHeadersWhenSearchingUsers() throws IOException {
HttpCareBusinessDataProvider provider = createProvider("""
{
"code": 1,
"msg": "ok",
"data": {
"totalCount": 0,
"dataList": []
}
}
""");
AiObservationSupport support = new AiObservationSupport(new AiObservabilityProperties(), "pms-agent");
AiObservationContext context = AiObservationContext.builder()
.traceId("trace-1")
.requestId("request-1")
.sessionId("session-1")
.flowId("flow-1")
.scene("用户检索")
.build();
support.withObservationContext(context, () -> {
provider.searchUsersByConditions("trace-1", "session-1", Map.of(), 1, 10);
return null;
});
assertEquals("trace-1", latestTraceIdHeader.get());
assertEquals("request-1", latestRequestIdHeader.get());
assertEquals("session-1", latestSessionIdHeader.get());
}
private HttpCareBusinessDataProvider createProvider(String responseBody) throws IOException { private HttpCareBusinessDataProvider createProvider(String responseBody) throws IOException {
server = HttpServer.create(new InetSocketAddress(0), 0); server = HttpServer.create(new InetSocketAddress(0), 0);
server.createContext("/union-js/api/functionCallTools/getUserInfoList", exchange -> writeJsonResponse(exchange, responseBody)); server.createContext("/union-js/api/functionCallTools/getUserInfoList", exchange -> {
latestTraceIdHeader.set(exchange.getRequestHeaders().getFirst(ObservedRestClientFactory.HEADER_TRACE_ID));
latestRequestIdHeader.set(exchange.getRequestHeaders().getFirst(ObservedRestClientFactory.HEADER_REQUEST_ID));
latestSessionIdHeader.set(exchange.getRequestHeaders().getFirst(ObservedRestClientFactory.HEADER_SESSION_ID));
writeJsonResponse(exchange, responseBody);
});
server.start(); server.start();
CareBusinessProperties properties = new CareBusinessProperties(); CareBusinessProperties properties = new CareBusinessProperties();
properties.setBaseUrl("http://127.0.0.1:" + server.getAddress().getPort()); properties.setBaseUrl("http://127.0.0.1:" + server.getAddress().getPort());
properties.setUserSearchPath("/union-js/api/functionCallTools/getUserInfoList"); properties.setUserSearchPath("/union-js/api/functionCallTools/getUserInfoList");
return new HttpCareBusinessDataProvider(properties); AiObservationSupport support = new AiObservationSupport(new AiObservabilityProperties(), "pms-agent");
return new HttpCareBusinessDataProvider(
properties,
new ObservedRestClientFactory(support)
);
} }
private void writeJsonResponse(HttpExchange exchange, String responseBody) throws IOException { private void writeJsonResponse(HttpExchange exchange, String responseBody) throws IOException {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment