Commit 4c9fd737 authored by 赵灿灿's avatar 赵灿灿

修改调用能运地址

parent b0882951
Pipeline #22446 failed with stages
in 3 minutes and 42 seconds
...@@ -25,10 +25,8 @@ import com.infoepoch.pms.dispatchassistant.domain.langchain.qwen.QwenLine; ...@@ -25,10 +25,8 @@ import com.infoepoch.pms.dispatchassistant.domain.langchain.qwen.QwenLine;
import com.infoepoch.pms.dispatchassistant.domain.langchain.record.conversation.Conversation; import com.infoepoch.pms.dispatchassistant.domain.langchain.record.conversation.Conversation;
import com.infoepoch.pms.dispatchassistant.domain.langchain.record.conversation.ConversationCriteria; import com.infoepoch.pms.dispatchassistant.domain.langchain.record.conversation.ConversationCriteria;
import com.infoepoch.pms.dispatchassistant.domain.langchain.record.conversation.ConversationService; import com.infoepoch.pms.dispatchassistant.domain.langchain.record.conversation.ConversationService;
import com.infoepoch.pms.dispatchassistant.domain.langchain.record.conversation.line.ConversationLine; import com.infoepoch.pms.dispatchassistant.domain.langchain.record.conversation.HttpClientPoolConfig;
import com.infoepoch.pms.dispatchassistant.domain.langchain.record.conversation.line.ConversationLineCriteria; import com.infoepoch.pms.dispatchassistant.domain.langchain.record.conversation.line.*;
import com.infoepoch.pms.dispatchassistant.domain.langchain.record.conversation.line.ConversationLineService;
import com.infoepoch.pms.dispatchassistant.domain.langchain.record.conversation.line.QuestionRequest;
import com.infoepoch.pms.dispatchassistant.domain.system.dict.DictDataService; import com.infoepoch.pms.dispatchassistant.domain.system.dict.DictDataService;
import com.infoepoch.pms.dispatchassistant.domain.system.dict.SystemDictData; import com.infoepoch.pms.dispatchassistant.domain.system.dict.SystemDictData;
import io.micrometer.core.instrument.util.StringUtils; import io.micrometer.core.instrument.util.StringUtils;
...@@ -80,6 +78,8 @@ public class LangChainController { ...@@ -80,6 +78,8 @@ public class LangChainController {
private ObjectMapper objectMapper; private ObjectMapper objectMapper;
@Autowired @Autowired
private RedisTool redisTool; private RedisTool redisTool;
@Autowired
private HttpClientPoolConfig httpClientPool;
private static final Logger logger = LoggerFactory.getLogger(LangChainController.class); private static final Logger logger = LoggerFactory.getLogger(LangChainController.class);
...@@ -882,25 +882,19 @@ public class LangChainController { ...@@ -882,25 +882,19 @@ public class LangChainController {
@GetMapping("/sseFusionIntelligent") @GetMapping("/sseFusionIntelligent")
public ResponseEntity<SseEmitter> sseFusionIntelligent(@RequestParam String chatMessage,@RequestParam String dialogId, public ResponseEntity<SseEmitter> sseFusionIntelligent(@RequestParam String chatMessage,@RequestParam String dialogId,
@RequestParam String selectedExpert ,@RequestParam String selectedOrg) { @RequestParam String selectedExpert ,@RequestParam String selectedOrg) {
String condition="";
String regionName= chatService.getAuthRegionName();
// if("内部专家".equals(selectedExpert)) {
// if ("组织内".equals(selectedOrg) && StringUtils.isNotEmpty(regionName)) {
// condition = ",只能查找" + regionName;
// } else if ("组织外".equals(selectedOrg) && StringUtils.isNotEmpty(regionName)) {
// condition = ",排除" + regionName;
// }
// }
String urlAddr = "http://10.32.41.228:44501/scene_gateway/agent/open/5386f9144ee042578d1c0f66c2598d06"; String urlAddr = "http://10.32.41.25:40349/invoke/api/open/sse/unifiedcapability";
//String urlAddr = chatService.getUrl(selectedExpert); SseEmitter emitter = new SseEmitter(0L);
AgentChatParams agentChatParams = new AgentChatParams();
agentChatParams.setKeyword(chatMessage);
agentChatParams.setRequestId(getRequestId());
agentChatParams.setDialogId(dialogId);
agentChatParams.setHistory("5");
AgentChatRequest agentChatRequest = new AgentChatRequest();
agentChatRequest.setCapabilityCode("5386f9144ee042578d1c0f66c2598d06");
agentChatRequest.setCapabilityType("2");
agentChatRequest.setParams(agentChatParams);
//String urlAddr = "http://10.32.41.35:40517/scene_gateway/agent/83f77143b09c461993dd9a7db403eb94";
SseEmitter emitter = new SseEmitter(0L);
QuestionRequest questionRequest=new QuestionRequest();
questionRequest.setKeyword(chatMessage+condition);
questionRequest.setRequestId(getRequestId());
questionRequest.setDialogId(dialogId);
Conversations conversations=chatService.saveConversations(dialogId,chatMessage,selectedExpert); Conversations conversations=chatService.saveConversations(dialogId,chatMessage,selectedExpert);
Messages messagesQusetion=new Messages(); Messages messagesQusetion=new Messages();
messagesQusetion.setRequestId(dialogId); messagesQusetion.setRequestId(dialogId);
...@@ -914,57 +908,53 @@ public class LangChainController { ...@@ -914,57 +908,53 @@ public class LangChainController {
messagesContent.setParentMsgId(messagesQusetion.getId()); messagesContent.setParentMsgId(messagesQusetion.getId());
messagesContent.setSort(messagesQusetion.getSort()+1); messagesContent.setSort(messagesQusetion.getSort()+1);
chatService.insertMessage(conversations,messagesContent); chatService.insertMessage(conversations,messagesContent);
String params = JsonUtils.objectToJson(questionRequest); String params = JsonUtils.objectToJson(agentChatRequest);
StringBuffer content =new StringBuffer(); StringBuffer content =new StringBuffer();
StringBuffer lineCotent =new StringBuffer(); StringBuffer lineCotent =new StringBuffer();
new Thread(() -> { new Thread(() -> {
HttpURLConnection connection = null; CloseableHttpClient closeableHttpClient = httpClientPool.sseHttpClient();
HttpPost httpPost = new HttpPost(urlAddr);
try { try {
URL url = new URL(urlAddr);
// 建立链接 httpPost.setHeader("AuthToken", "4009fe23e6b648539792330c14f5ed8e");
connection = (HttpURLConnection) url.openConnection(); httpPost.setHeader("Content-Type", "application/json");
connection.setRequestMethod("GET");
connection.setRequestProperty("AuthToken", "4009fe23e6b648539792330c14f5ed8e"); httpPost.setEntity(new StringEntity(params, StandardCharsets.UTF_8));
// connection.setRequestProperty("AuthToken", "fc40db5b7abe47dabfe1899e61fde2d7"); logger.info("调用智宇智能体请求体:" + params);
// 允许输入和输出 boolean isSendStop = false;
connection.setDoInput(true); try (CloseableHttpResponse response = closeableHttpClient.execute(httpPost);
connection.setDoOutput(true); InputStream inputStream = response.getEntity().getContent();
// 设置超时为0,表示无限制 BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))){
connection.setConnectTimeout(0); // 检查响应码
connection.setReadTimeout(0); int statusCode = response.getStatusLine().getStatusCode();
try (OutputStream os = connection.getOutputStream()) {
os.write(params.getBytes(StandardCharsets.UTF_8)); // 持续读取 SSE 数据流
os.flush();
}
// 检查响应码
int responseCode = connection.getResponseCode();
if (responseCode != HttpURLConnection.HTTP_OK) {
emitter.completeWithError(new RuntimeException("SSE 连接失败: " + responseCode));
return;
}
// 持续读取 SSE 数据流
try (BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
String line; String line;
while ((line = reader.readLine()) != null) { while ((line = reader.readLine()) != null) {
logger.info(line); logger.info("Response body:" + line);
if (!line.startsWith("data:CALLBACK#")) { if (!line.startsWith("data:CALLBACK#")) {
if (line.startsWith("data:")) { if (line.startsWith("data:")) {
String data = line.substring(5).trim(); String data = line.substring(5).trim();
if("stop".equals(data)) if ("stop".equals(data)) {
{ emitter.send(SseEmitter.event().data("stop"), MediaType.parseMediaType("application/json; charset=UTF-8"));
emitter.send(SseEmitter.event().data("stop"),MediaType.parseMediaType("application/json; charset=UTF-8")); isSendStop = true;
}else } else {
{
lineCotent.append(data); lineCotent.append(data);
String sendData=data.replace("attachment#[]#attachment","").replace("source#[]#source","") String sendData = data.replace("attachment#[]#attachment", "").replace("source#[]#source", "")
.replace("#","").replace("*",""); .replace("#", "").replace("*", "");
emitter.send(SseEmitter.event().data(sendData), MediaType.parseMediaType("application/json; charset=UTF-8")); emitter.send(SseEmitter.event().data(sendData), MediaType.parseMediaType("application/json; charset=UTF-8"));
if(!sendData.startsWith("SUGGEST[")) if (!sendData.startsWith("SUGGEST["))
content.append(sendData); content.append(sendData);
} }
} }
} }
} }
}catch (Exception e) {
emitter.completeWithError(e);
logger.error("调用智宇智能体接口异常: ", e);
}
if (!isSendStop){//如果没有发送stop,发送stop。
emitter.send(SseEmitter.event().data("stop"), MediaType.parseMediaType("application/json; charset=UTF-8"));
} }
emitter.complete(); // 流结束 emitter.complete(); // 流结束
} catch (Exception e) { } catch (Exception e) {
...@@ -973,9 +963,6 @@ public class LangChainController { ...@@ -973,9 +963,6 @@ public class LangChainController {
} finally { } finally {
messagesContent.setContent(content.toString()); messagesContent.setContent(content.toString());
chatService.updateMessage(messagesContent); chatService.updateMessage(messagesContent);
if (connection != null) {
connection.disconnect();
}
logger.info(lineCotent.toString()); logger.info(lineCotent.toString());
} }
}).start(); }).start();
...@@ -984,6 +971,7 @@ public class LangChainController { ...@@ -984,6 +971,7 @@ public class LangChainController {
.header(HttpHeaders.CONTENT_TYPE, MediaType.TEXT_EVENT_STREAM_VALUE) .header(HttpHeaders.CONTENT_TYPE, MediaType.TEXT_EVENT_STREAM_VALUE)
.body(emitter); .body(emitter);
} }
......
package com.infoepoch.pms.dispatchassistant.domain.langchain.record.conversation;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.TimeUnit;
@Configuration
public class HttpClientPoolConfig {
/**
* 注册 SSE 专用的 HttpClient 实例
* (destroyMethod = "close" 确保 Spring 关闭时释放资源)
*/
@Bean(destroyMethod = "close")
public CloseableHttpClient sseHttpClient() {
PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager();
connManager.setMaxTotal(1000); // 最大连接数
connManager.setDefaultMaxPerRoute(1000); // 单路由最大连接数 如果有多个地址,则需要减少
connManager.setValidateAfterInactivity(30_000); // 30秒检测空闲连接
RequestConfig sseConfig = RequestConfig.custom()
.setConnectTimeout(5000) // 连接超时 5s
.setSocketTimeout(0) // 无限等待(SSE 必须)
.setConnectionRequestTimeout(5000) // 从池获取连接的超时
.build();
return HttpClients.custom()
.setConnectionManager(connManager)
.setDefaultRequestConfig(sseConfig)
.disableAutomaticRetries() // 禁用自动重试
.evictExpiredConnections() // 自动清理过期连接
.evictIdleConnections(60, TimeUnit.SECONDS) // 60秒清理空闲连接
.build();
}
}
\ No newline at end of file
package com.infoepoch.pms.dispatchassistant.domain.langchain.record.conversation.line;
import lombok.Getter;
import lombok.Setter;
@Getter
@Setter
public class AgentChatParams {
/**
* keyword 用户对话问题内容 必填
*/
private String keyword;
/**
* 调用流水 时间戳+6位随机数 必填
*/
private String requestId;
/**
* 对话流水 (yyyyMMddHHmmssSSS)+6位随机数 必填
*/
private String dialogId;
/**
* 用户号码 String 选填 13888888888
*/
private String phoneNumber;
/**
* 用户归属地市标识 String 选填history 11,苏州
* 12,淮安
* 13,宿迁
* 14,南京
* 15,连云港
* 16,徐州
* 17,常州
* 18,镇江
* 19,无锡
* 20,南通
* 21,泰州
* 22,盐城
* 23,扬州
* 99,省公司
*/
private String region;
/**
* 多轮标记 选填 0不开启多轮,5.开启多轮,默认不开启
*/
private String history;
}
\ No newline at end of file
package com.infoepoch.pms.dispatchassistant.domain.langchain.record.conversation.line;
import lombok.Getter;
import lombok.Setter;
/**
*
*
* 能运智宇请求体Body
*/
@Getter
@Setter
public class AgentChatRequest {
/**
* 能力编码 所要调用的能力编码,样例如下:
* 0345d9faa1164c609ec847a1cc24d763
*/
private String capabilityCode;
/**
* 能力类型 capabilityType 所要调用的能力的类型
* 1:工作流
* 2:智能体
*/
private String capabilityType;
/**
* 调用参数对象 可自定义参数追加
*/
private AgentChatParams params;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment