func (f *FreeAskUsecase) freeAskHandle(ctx context.Context, req *FreeAskReq, output *FreeAskResp) { // 上下文控制 ctx, cancel := context.WithTimeout(ctx, time.Minute*2) defer cancel() // 初始化请求管道 modelCh := make(chan *ModelResponse, 10) interruptCh := make(chan struct{}, 1) var msgIdStr string var msg *Message // 前置处理:创建消息、安全审核等 msg, err := f.createInitialMessage(ctx, req) if err != nil { f.log.Errorf("创建初始消息失败: %v", err) output.Status = StatusFailed output.Message = "创建消息失败" return } msgIdStr = msg.Msg.Id output.MsgId = msgIdStr // 获取历史对话记录 chatHistory, err := f.repo.GetRecentChatHistory(ctx, req.TalId, req.SubjectId, 2) if err != nil { f.log.Warnf("获取历史对话记录失败: %v", err) // 继续处理,不影响主流程 } // 构建消息历史 messages := make([]Message, 0, len(chatHistory)*2+1) // 系统提示词 systemPrompt := f.buildSystemPrompt(req.SubjectId, intent, intentDetails) messages = append(messages, Message{ Role: "system", // 系统角色 Content: systemPrompt, }) // 添加历史对话 for _, chat := range chatHistory { // 用户问题 messages = append(messages, Message{ Role: "user", // 用户角色 Content: chat.Question, }) // AI回答 messages = append(messages, Message{ Role: "assistant", // 模型角色 Content: chat.Answer, }) } // 添加当前问题 messages = append(messages, Message{ Role: "user", Content: req.Question, }) // 用户意图识别 intent, intentDetails := f.recognizeUserIntent(ctx, req.Question) f.log.Infof("用户意图识别结果: %s, 详情: %+v", intent, intentDetails) // 根据意图处理特殊请求 if f.handleSpecialIntent(ctx, intent, intentDetails, req, output) { // 如果特殊意图已处理完毕,直接返回 return } // 安全审核 safeResult, err := f.safetyCheck(ctx, req.Question) if err != nil || !safeResult.IsSafe { f.log.Errorf("内容安全审核未通过: %v", err) output.Status = StatusRejected output.Message = "内容包含不安全信息,请修改后重试" // 更新消息状态为拒绝 f.repo.UpdateMessageStatus(ctx, msgIdStr, MessageStatusRejected) return } // 2. 创建中断监听 // 用户可能会打断模型输出 go f.listenForInterruption(ctx, req.TalId, msgIdStr, interruptCh) // 3. 构建模型提示词 // 将用户意图信息添加到提示词中 promptOptions := &PromptOptions{ Intent: intent, IntentDetails: intentDetails, } prompt, err := f.buildPromptWithOptions(ctx, req, promptOptions) if err != nil { f.log.Errorf("构建提示词失败: %v", err) output.Status = StatusFailed output.Message = "系统处理异常" return } // 创建DeepSeek-R1模型请求 modelRequest := &DeepSeekModelRequest{ Model: "deepseek-r1", Messages: messages, MaxTokens: 2048, Temperature: 0.7, Stream: true, // 流式输出 } // 构建模型上下文 modelCtx, modelCancel := context.WithCancel(ctx) defer modelCancel() // 添加中断处理 go func() { select { case <-interruptCh: // 接收到中断信号,取消模型请求 modelCancel() case <-ctx.Done(): // 上下文已结束 return } }() // 创建响应通道 modelCh := make(chan *DeepSeekResponse, 10) // 异步调用模型 go func() { defer close(modelCh) // 调用DeepSeek-R1模型进行流式生成 err := f.deepSeekClient.GenerateStream(modelCtx, modelRequest, func(chunk *DeepSeekChunk) error { if chunk.Error != nil { modelCh <- &DeepSeekResponse{ Error: chunk.Error, } return chunk.Error } // 处理模型流式响应 modelCh <- &DeepSeekResponse{ Content: chunk.Content, IsFinal: chunk.IsFinal, ToolCalls: chunk.ToolCalls, GeneratedText: chunk.GeneratedText, Usage: chunk.Usage, } return nil }) if err != nil && !errors.Is(err, context.Canceled) { f.log.Errorf("DeepSeek-R1模型调用失败: %v", err) modelCh <- &DeepSeekResponse{ Error: err, } } }() // 5. 处理模型响应 var fullContent strings.Builder isFirstChunk := true for { select { case <-ctx.Done(): // 处理超时 f.log.Warnf("请求处理超时: %s", msgIdStr) output.Status = StatusTimeout output.Message = "处理超时,请稍后重试" // 通过SSE发送超时事件 sseWriter.WriteEvent(&SSEEvent{ Event: "timeout", Data: map[string]interface{}{ "msg_id": msgIdStr, "message": "处理超时,请稍后重试", }, }) // 更新消息状态 f.repo.UpdateMessageStatus(ctx, msgIdStr, MessageStatusFailed) return case resp, ok := <-modelCh: if !ok { // 处理响应结束 goto END } // 处理模型返回的错误 if resp.Error != nil { f.log.Errorf("模型返回错误: %v", resp.Error) output.Status = StatusFailed output.Message = "AI生成回答失败" // 通过SSE发送错误事件 sseWriter.WriteEvent(&SSEEvent{ Event: "error", Data: map[string]interface{}{ "msg_id": msgIdStr, "message": "AI生成回答失败", }, }) f.repo.UpdateMessageStatus(ctx, msgIdStr, MessageStatusFailed) return } // 处理模型返回的数据包 // 追加内容、安全检查、发送给客户端等 content := resp.Content // 安全检查每个片段 if len(content) > 0 { safeResult, _ := f.safetyCheck(ctx, content) if !safeResult.IsSafe { f.log.Warnf("模型回复内容存在安全风险: %s", content) content = "对不起,我无法提供这方面的回答。" } } // 追加到完整内容 fullContent.WriteString(content) // 如果是第一个数据包,更新消息状态为进行中 if isFirstChunk { isFirstChunk = false f.repo.UpdateMessageStatus(ctx, msgIdStr, MessageStatusInProgress) // 返回初始响应给客户端 output.Status = StatusSuccess output.AnswerBegin = content // 通过SSE发送开始事件 sseWriter.WriteEvent(&SSEEvent{ Event: "answer_begin", Data: map[string]interface{}{ "msg_id": msgIdStr, "content": content, }, }) } else { // 非首个数据包,通过SSE发送内容片段 sseWriter.WriteEvent(&SSEEvent{ Event: "answer_chunk", Data: map[string]interface{}{ "msg_id": msgIdStr, "content": content, }, }) } // 如果响应中包含特殊标记,处理特殊逻辑 if resp.HasSpecialFunction { f.handleSpecialFunction(ctx, resp.SpecialFunction, msgIdStr, req.TalId, sseWriter) } case _, ok := <-interruptCh: if !ok { continue } // 处理用户中断 f.log.Infof("用户中断请求: %s", msgIdStr) msg.Msg.IsInterrupt = 1 // 通过SSE发送中断事件 sseWriter.WriteEvent(&SSEEvent{ Event: "interrupted", Data: map[string]interface{}{ "msg_id": msgIdStr, }, }) f.handelInterrupt(ctx, msgIdStr, msg.SubjectId) goto END } } END: // 处理结束逻辑 // 生成提示词、更新消息等 finalContent := fullContent.String() // 更新最终消息内容和状态 err = f.repo.UpdateMessageContent(ctx, msgIdStr, finalContent) if err != nil { f.log.Errorf("更新消息内容失败: %v", err) } if msg.Msg.IsInterrupt == 0 { // 正常结束 f.repo.UpdateMessageStatus(ctx, msgIdStr, MessageStatusCompleted) // 发送完成事件 sseWriter.WriteEvent(&SSEEvent{ Event: "answer_complete", Data: map[string]interface{}{ "msg_id": msgIdStr, "content": finalContent, }, }) // 记录会话历史 f.updateSessionHistory(ctx, req.TalId, req.Question, finalContent, msgIdStr) } else { // 中断结束 f.repo.UpdateMessageStatus(ctx, msgIdStr, MessageStatusInterrupted) } // 设置输出信息 output.FullAnswer = finalContent output.Status = StatusSuccess output.Suggestions = suggestions output.Knowledge = knowledge}