From cfaf7b912638b2967a31ce8e5a6778d5870fe0e1 Mon Sep 17 00:00:00 2001 From: tanxing Date: Wed, 13 Aug 2025 15:12:53 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E7=AE=80=E5=8C=96=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E9=98=9F=E5=88=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- deepsearcher/agent/deep_search.py | 37 ++-- deepsearcher/backend/templates/index.html | 51 +++--- deepsearcher/llm/openai_llm.py | 2 +- deepsearcher/utils/message_stream.py | 56 +++--- docs/message_stream_system.md | 197 ---------------------- main.py | 60 +++---- simplified_message_format.md | 180 ++++++++++++++++++++ test_fix.py | 38 +++++ 8 files changed, 327 insertions(+), 294 deletions(-) delete mode 100644 docs/message_stream_system.md create mode 100644 simplified_message_format.md create mode 100644 test_fix.py diff --git a/deepsearcher/agent/deep_search.py b/deepsearcher/agent/deep_search.py index 61d2b4d..95ee890 100644 --- a/deepsearcher/agent/deep_search.py +++ b/deepsearcher/agent/deep_search.py @@ -2,7 +2,7 @@ from deepsearcher.agent.base import BaseAgent, describe_class from deepsearcher.embedding.base import BaseEmbedding from deepsearcher.llm.base import BaseLLM from deepsearcher.utils import log -from deepsearcher.utils.message_stream import send_search, send_think, send_answer +from deepsearcher.utils.message_stream import send_info, send_answer from deepsearcher.vector_db import RetrievalResult from deepsearcher.vector_db.base import BaseVectorDB, deduplicate from collections import defaultdict @@ -22,7 +22,8 @@ COLLECTION_ROUTE_PROMPT = """ SUB_QUERY_PROMPT = """ -为了能够全面的回答这个问题,请你尝试把原本的问题拆分或扩展为多个子问题 +为了能够全面的回答这个问题,请你尝试把原本的问题拆分或扩展为几个子问题 +不可以太多,但是也不可以太少,请根据问题复杂程度来决定子问题的数量 如果原问题本身非常简单,没有必要进行拆分,则保留输出原问题本身 需要保证每个子问题都具体、清晰、不可分(原子性),最终返回一个字符串列表 @@ -180,7 +181,7 @@ class DeepSearch(BaseAgent): if len(collection_infos) == 1: the_only_collection = collection_infos[0].collection_name log.color_print( - f" Perform search [{query}] on the vector DB collection: {the_only_collection} \n" + f"Perform search [{query}] on the vector DB collection: {the_only_collection}\n" ) return [the_only_collection] vector_db_search_prompt = COLLECTION_ROUTE_PROMPT.format( @@ -207,7 +208,7 @@ class DeepSearch(BaseAgent): selected_collections.append(collection_info.collection_name) selected_collections = list(set(selected_collections)) log.color_print( - f" Perform search [{query}] on the vector DB collections: {selected_collections} \n" + f"Perform search [{query}] on the vector DB collections: {selected_collections}\n" ) return selected_collections @@ -231,12 +232,12 @@ class DeepSearch(BaseAgent): all_retrieved_results = [] query_vector = self.embedding_model.embed_query(query) for collection in selected_collections: - send_search(f"Search [{query}] in [{collection}]...") + send_info(f"正在 [{collection}] 中搜索 [{query}] ...") retrieved_results = self.vector_db.search_data( collection=collection, vector=query_vector, query_text=query ) if not retrieved_results or len(retrieved_results) == 0: - send_search(f"No relevant document chunks found in '{collection}'!") + send_info(f"'{collection}' 中没有找到相关文档!") continue # Format all chunks for batch processing @@ -287,9 +288,9 @@ class DeepSearch(BaseAgent): references.add(retrieved_result.reference) if accepted_chunk_num > 0: - send_search(f"Accept {accepted_chunk_num} document chunk(s) from references: {list(references)}") + send_info(f"采纳 {accepted_chunk_num} 个文档片段,来源:{list(references)}") else: - send_search(f"No document chunk accepted from '{collection}'!") + send_info(f"没有采纳任何 '{collection}' 中找到的文档片段!") return all_retrieved_results def _generate_more_sub_queries( @@ -327,7 +328,6 @@ class DeepSearch(BaseAgent): max_iter = kwargs.get('max_iter', self.max_iter) ### SUB QUERIES ### - send_think(f" {original_query} ") all_search_results = [] all_sub_queries = [] @@ -336,11 +336,11 @@ class DeepSearch(BaseAgent): log.color_print("No sub queries were generated by the LLM. Exiting.") return [], {} else: - send_think(f"Break down the original query into new sub queries: {sub_queries}") + send_info(f"原问题被拆分为这些子问题: {sub_queries}") all_sub_queries.extend(sub_queries) for it in range(max_iter): - send_think(f">> Iteration: {it + 1}") + send_info(f"第 {it + 1} 轮搜索:") # Execute all search tasks sequentially for query in sub_queries: @@ -350,25 +350,25 @@ class DeepSearch(BaseAgent): all_search_results = deduplicate(all_search_results) deduped_len = len(all_search_results) if undeduped_len - deduped_len != 0: - send_search(f"Remove {undeduped_len - deduped_len} duplicates") + send_info(f"移除 {undeduped_len - deduped_len} 个重复文档片段") # search_res_from_internet = deduplicate_results(search_res_from_internet) # all_search_res.extend(search_res_from_vectordb + search_res_from_internet) ### REFLECTION & GET MORE SUB QUERIES ### # Only generate more queries if we haven't reached the maximum iterations if it + 1 < max_iter: - send_think("Reflecting on the search results...") + send_info("正在根据文档片段思考 ...") sub_queries = self._generate_more_sub_queries( original_query, all_sub_queries, all_search_results ) if not sub_queries or len(sub_queries) == 0: - send_think("No new search queries were generated. Exiting.") + send_info("没能生成更多的子问题,正在退出 ....") break else: - send_think(f"New search queries for next iteration: {sub_queries}") + send_info(f"下一轮搜索的子问题: {sub_queries}") all_sub_queries.extend(sub_queries) else: - send_think("Reached maximum iterations. Exiting.") + send_info("已达到最大搜索轮数,正在退出 ...") break all_search_results = deduplicate(all_search_results) @@ -392,10 +392,10 @@ class DeepSearch(BaseAgent): """ all_retrieved_results, all_sub_queries = self.retrieve(original_query, **kwargs) if not all_retrieved_results or len(all_retrieved_results) == 0: - send_think(f"No relevant information found for query '{original_query}'.") + send_info(f"'{original_query}'没能找到更多信息!") return "", [] chunks = self._format_chunks(all_retrieved_results) - send_think(f"Summarize answer from all {len(all_retrieved_results)} retrieved chunks...") + send_info(f"正在总结 {len(all_retrieved_results)} 个查找到的文档片段") summary_prompt = SUMMARY_PROMPT.format( original_query=original_query, all_sub_queries=all_sub_queries, @@ -403,7 +403,6 @@ class DeepSearch(BaseAgent): ) response = self.llm.chat([{"role": "user", "content": summary_prompt}]) final_answer = self.llm.remove_think(response) - # 直接发送最终答案,不发送占位符 send_answer(final_answer) return self.llm.remove_think(response), all_retrieved_results diff --git a/deepsearcher/backend/templates/index.html b/deepsearcher/backend/templates/index.html index 607d467..45714a8 100644 --- a/deepsearcher/backend/templates/index.html +++ b/deepsearcher/backend/templates/index.html @@ -214,12 +214,12 @@ line-height: 1.4; } - .message-search { + .message-start { background-color: #f0f9ff; border-left-color: var(--info-color); } - .message-think { + .message-info { background-color: #fef3c7; border-left-color: var(--warning-color); } @@ -229,6 +229,16 @@ border-left-color: var(--success-color); } + .message-complete { + background-color: #dbeafe; + border-left-color: var(--primary-color); + } + + .message-error { + background-color: #fee2e2; + border-left-color: var(--error-color); + } + .message-timestamp { font-size: 12px; color: var(--text-secondary); @@ -473,13 +483,15 @@ case 'heartbeat': // 心跳消息,不需要处理 break; - case 'query_start': - console.log('Query started:', message.query); - showStatus('queryStatus', '查询已开始,正在处理...', 'loading'); + case 'start': + console.log('Query started:', message.content); + showStatus('queryStatus', '正在处理...', 'loading'); + addMessageToContainer(message); break; case 'complete': console.log('Query completed - closing connection'); showStatus('queryStatus', '查询完成', 'success'); + addMessageToContainer(message); // 关闭EventSource连接 if (window.currentEventSource) { console.log('Closing currentEventSource'); @@ -490,20 +502,10 @@ setButtonLoading(document.getElementById('queryBtn'), false); console.log('Query completed - connection closed, isStreaming set to false'); break; - case 'query_error': - console.error('Query error:', message.error); - showStatus('queryStatus', `查询失败: ${message.error}`, 'error'); - // 关闭EventSource连接 - if (window.currentEventSource) { - window.currentEventSource.close(); - window.currentEventSource = null; - } - isStreaming = false; - setButtonLoading(document.getElementById('queryBtn'), false); - break; - case 'stream_error': - console.error('Stream error:', message.error); - showStatus('queryStatus', `流错误: ${message.error}`, 'error'); + case 'error': + console.error('Error:', message.content); + showStatus('queryStatus', message.content, 'error'); + addMessageToContainer(message); // 关闭EventSource连接 if (window.currentEventSource) { window.currentEventSource.close(); @@ -512,15 +514,14 @@ isStreaming = false; setButtonLoading(document.getElementById('queryBtn'), false); break; - case 'search': - case 'think': - // 处理常规消息 - console.log('Processing message type:', message.type, 'with content:', message.content.substring(0, 100) + '...'); + case 'info': + // 处理信息消息 + console.log('Processing info message:', message.content.substring(0, 100) + '...'); addMessageToContainer(message); break; case 'answer': // 处理answer类型,显示查询结果 - console.log('Processing message type:', message.type, 'with content:', message.content.substring(0, 100) + '...'); + console.log('Processing answer message:', message.content.substring(0, 100) + '...'); // 将结果内容显示在结果区域 if (message.content && message.content !== "==== FINAL ANSWER====") { document.getElementById('resultText').textContent = message.content; @@ -720,7 +721,7 @@ eventSource.onopen = function(event) { console.log('EventSource connection opened for query'); - showStatus('queryStatus', '查询已开始,正在处理...', 'loading'); + showStatus('queryStatus', '正在处理...', 'loading'); }; eventSource.onmessage = function(event) { diff --git a/deepsearcher/llm/openai_llm.py b/deepsearcher/llm/openai_llm.py index dd4f378..1af6046 100644 --- a/deepsearcher/llm/openai_llm.py +++ b/deepsearcher/llm/openai_llm.py @@ -52,7 +52,7 @@ class OpenAILLM(BaseLLM): top_p=0.8, presence_penalty=1.2 ) as stream: - # stream仅做测试,不需要发送到前端 + # stream到控制台测试 content = "" reasoning_content = "" for chunk in stream: diff --git a/deepsearcher/utils/message_stream.py b/deepsearcher/utils/message_stream.py index dfc364d..61b06bb 100644 --- a/deepsearcher/utils/message_stream.py +++ b/deepsearcher/utils/message_stream.py @@ -7,16 +7,17 @@ from datetime import datetime class MessageType(Enum): - """消息类型枚举""" - SEARCH = "search" - THINK = "think" - ANSWER = "answer" - COMPLETE = "complete" + """简化的消息类型枚举""" + START = "start" # 开始消息 + INFO = "info" # 信息消息 + ANSWER = "answer" # 答案消息 + COMPLETE = "complete" # 完成消息 + ERROR = "error" # 错误消息 @dataclass class Message: - """消息数据结构""" + """简化的消息数据结构""" type: MessageType content: str timestamp: float @@ -37,7 +38,7 @@ class Message: class MessageStream: - """消息流管理器""" + """简化的消息流管理器""" def __init__(self): self._messages: List[Message] = [] @@ -82,21 +83,25 @@ class MessageStream: except Exception as e: print(f"Error in message callback: {e}") - def send_search(self, content: str, metadata: Optional[Dict[str, Any]] = None): - """发送搜索消息""" - self.send_message(MessageType.SEARCH, content, metadata) + def send_start(self, content: str, metadata: Optional[Dict[str, Any]] = None): + """发送开始消息""" + self.send_message(MessageType.START, content, metadata) - def send_think(self, content: str, metadata: Optional[Dict[str, Any]] = None): - """发送思考消息""" - self.send_message(MessageType.THINK, content, metadata) + def send_info(self, content: str, metadata: Optional[Dict[str, Any]] = None): + """发送信息消息""" + self.send_message(MessageType.INFO, content, metadata) def send_answer(self, content: str, metadata: Optional[Dict[str, Any]] = None): """发送答案消息""" self.send_message(MessageType.ANSWER, content, metadata) - def send_complete(self, metadata: Optional[Dict[str, Any]] = None): + def send_complete(self, content: str = "查询完成", metadata: Optional[Dict[str, Any]] = None): """发送完成消息""" - self.send_message(MessageType.COMPLETE, "", metadata) + self.send_message(MessageType.COMPLETE, content, metadata) + + def send_error(self, content: str, metadata: Optional[Dict[str, Any]] = None): + """发送错误消息""" + self.send_message(MessageType.ERROR, content, metadata) def get_messages(self) -> List[Message]: """获取所有消息""" @@ -123,14 +128,14 @@ class MessageStream: message_stream = MessageStream() -def send_search(content: str, metadata: Optional[Dict[str, Any]] = None): - """全局搜索消息发送函数""" - message_stream.send_search(content, metadata) +def send_start(content: str, metadata: Optional[Dict[str, Any]] = None): + """全局开始消息发送函数""" + message_stream.send_start(content, metadata) -def send_think(content: str, metadata: Optional[Dict[str, Any]] = None): - """全局思考消息发送函数""" - message_stream.send_think(content, metadata) +def send_info(content: str, metadata: Optional[Dict[str, Any]] = None): + """全局信息消息发送函数""" + message_stream.send_info(content, metadata) def send_answer(content: str, metadata: Optional[Dict[str, Any]] = None): @@ -138,9 +143,14 @@ def send_answer(content: str, metadata: Optional[Dict[str, Any]] = None): message_stream.send_answer(content, metadata) -def send_complete(metadata: Optional[Dict[str, Any]] = None): +def send_complete(content: str = "查询完成", metadata: Optional[Dict[str, Any]] = None): """全局完成消息发送函数""" - message_stream.send_complete(metadata) + message_stream.send_complete(content, metadata) + + +def send_error(content: str, metadata: Optional[Dict[str, Any]] = None): + """全局错误消息发送函数""" + message_stream.send_error(content, metadata) def get_message_stream() -> MessageStream: diff --git a/docs/message_stream_system.md b/docs/message_stream_system.md deleted file mode 100644 index b572003..0000000 --- a/docs/message_stream_system.md +++ /dev/null @@ -1,197 +0,0 @@ -# 消息流系统 (Message Stream System) - -## 概述 - -DeepSearcher 的消息流系统是一个新的消息传输机制,用于替代原来的日志传输方式。该系统支持三种类型的消息:`search`、`think` 和 `answer`,并提供了灵活的消息管理和传输功能。 - -## 消息类型 - -### 1. Search 消息 -- **类型**: `search` -- **用途**: 表示搜索相关的操作和状态 -- **示例**: - - "在向量数据库中搜索相关信息..." - - "找到5个相关文档片段" - - "搜索人工智能的定义..." - -### 2. Think 消息 -- **类型**: `think` -- **用途**: 表示思考和推理过程 -- **示例**: - - "开始处理查询: 什么是人工智能?" - - "分析搜索结果..." - - "生成子查询: 人工智能的定义、历史、应用" - -### 3. Answer 消息 -- **类型**: `answer` -- **用途**: 表示最终答案和结果 -- **示例**: - - "==== FINAL ANSWER====" - - "人工智能是计算机科学的一个分支..." - -## 核心组件 - -### MessageStream 类 - -主要的消息流管理器,提供以下功能: - -```python -from deepsearcher.utils.message_stream import MessageStream - -# 创建消息流实例 -message_stream = MessageStream() - -# 发送消息 -message_stream.send_search("搜索内容...") -message_stream.send_think("思考内容...") -message_stream.send_answer("答案内容...") - -# 获取消息 -messages = message_stream.get_messages() -messages_dict = message_stream.get_messages_as_dicts() -messages_json = message_stream.get_messages_as_json() -``` - -### 全局函数 - -为了方便使用,提供了全局函数: - -```python -from deepsearcher.utils.message_stream import send_search, send_think, send_answer - -# 直接发送消息 -send_search("搜索内容...") -send_think("思考内容...") -send_answer("答案内容...") -``` - -## API 接口 - -### 1. 获取消息 -``` -GET /messages/ -``` - -返回所有消息的列表: -```json -{ - "messages": [ - { - "type": "search", - "content": "搜索内容...", - "timestamp": 1755043653.9606102, - "metadata": {} - } - ] -} -``` - -### 2. 清空消息 -``` -POST /clear-messages/ -``` - -清空所有消息并返回成功状态: -```json -{ - "message": "Messages cleared successfully" -} -``` - -### 3. 查询接口(已更新) -``` -GET /query/?original_query=&max_iter= -``` - -现在返回结果包含消息流: -```json -{ - "result": "最终答案...", - "messages": [ - { - "type": "search", - "content": "搜索内容...", - "timestamp": 1755043653.9606102, - "metadata": {} - } - ] -} -``` - -## 前端集成 - -前端界面现在包含一个消息流显示区域,实时显示处理过程中的各种消息: - -### CSS 样式 -- `.message-search`: 搜索消息样式(蓝色边框) -- `.message-think`: 思考消息样式(黄色边框) -- `.message-answer`: 答案消息样式(绿色边框) - -### JavaScript 功能 -- `displayMessages(messages)`: 显示消息流 -- 自动滚动到最新消息 -- 时间戳显示 - -## 使用示例 - -### 后端使用 - -```python -from deepsearcher.utils.message_stream import send_search, send_think, send_answer - -# 在搜索过程中发送消息 -send_think("开始处理查询...") -send_search("在数据库中搜索...") -send_search("找到相关文档...") -send_think("分析结果...") -send_answer("最终答案...") -``` - -### 前端使用 - -```javascript -// 获取消息 -const response = await fetch('/query/?original_query=test&max_iter=3'); -const data = await response.json(); - -// 显示消息流 -if (data.messages && data.messages.length > 0) { - displayMessages(data.messages); -} -``` - -## 优势 - -1. **结构化数据**: 消息包含类型、内容、时间戳和元数据 -2. **类型安全**: 使用枚举确保消息类型的一致性 -3. **灵活传输**: 支持多种输出格式(字典、JSON) -4. **实时显示**: 前端可以实时显示处理过程 -5. **易于扩展**: 可以轻松添加新的消息类型和功能 - -## 迁移说明 - -从原来的日志系统迁移到新的消息流系统: - -### 原来的代码 -```python -log.color_print(f" Search [{query}] in [{collection}]... \n") -log.color_print(f" Summarize answer... \n") -log.color_print(f" Final answer... \n") -``` - -### 新的代码 -```python -send_search(f"Search [{query}] in [{collection}]...") -send_think("Summarize answer...") -send_answer("Final answer...") -``` - -## 测试 - -运行测试脚本验证系统功能: - -```bash -python test_message_stream.py -``` - -这将测试消息流的基本功能,包括消息发送、获取和格式化。 diff --git a/main.py b/main.py index e276550..58fd9f2 100644 --- a/main.py +++ b/main.py @@ -228,7 +228,6 @@ def perform_query( # 清空之前的消息 message_stream = get_message_stream() message_stream.clear_messages() - result_text, _ = query(original_query, max_iter) return { @@ -270,13 +269,13 @@ async def perform_query_stream( # 清空之前的消息 message_stream = get_message_stream() message_stream.clear_messages() - + # 发送查询开始消息 - yield f"data: {json.dumps({'type': 'query_start', 'query': original_query, 'max_iter': max_iter}, ensure_ascii=False)}\n\n" - + yield f"data: {json.dumps({'type': 'start', 'content': '开始查询'}, ensure_ascii=False)}\n\n" + # 创建一个线程安全的队列来接收消息 message_queue = queue.Queue() - + def message_callback(message): """消息回调函数""" try: @@ -284,26 +283,29 @@ async def perform_query_stream( message_queue.put(message) except Exception as e: print(f"Error in message callback: {e}") - + # 注册回调函数 message_stream.add_callback(message_callback) - + # 在后台线程中执行查询 def run_query(): try: print(f"Starting query: {original_query} with max_iter: {max_iter}") - result_text, _ = query(original_query, max_iter) + result_text, retrieval_results = query(original_query, max_iter) print(f"Query completed with result length: {len(result_text) if result_text else 0}") + print(f"Retrieved {len(retrieval_results) if retrieval_results else 0} documents") return result_text, None except Exception as e: + import traceback print(f"Query failed with error: {e}") + print(f"Traceback: {traceback.format_exc()}") return None, str(e) - + # 使用线程池执行查询 import concurrent.futures with concurrent.futures.ThreadPoolExecutor() as executor: query_future = executor.submit(run_query) - + # 监听消息和查询结果 while True: try: @@ -312,17 +314,17 @@ async def perform_query_stream( result_text, error = query_future.result() if error: print(f"Query error: {error}") - yield f"data: {json.dumps({'type': 'query_error', 'error': error}, ensure_ascii=False)}\n\n" + yield f"data: {json.dumps({'type': 'error', 'content': f'查询失败: {error}'}, ensure_ascii=False)}\n\n" return # 成功完成时,发送complete消息并结束 print("Query completed successfully, sending complete message") - yield f"data: {json.dumps({'type': 'complete'}, ensure_ascii=False)}\n\n" + yield f"data: {json.dumps({'type': 'complete', 'content': '查询完成'}, ensure_ascii=False)}\n\n" print("Complete message sent, ending stream") return - + # 尝试从队列获取消息,设置超时 try: - message = message_queue.get(timeout=0.5) + message = message_queue.get(timeout=2.0) message_data = { 'type': message.type.value, 'content': message.content, @@ -333,19 +335,19 @@ async def perform_query_stream( except queue.Empty: # 超时,继续循环 continue - + except Exception as e: print(f"Error in stream loop: {e}") - yield f"data: {json.dumps({'type': 'stream_error', 'error': str(e)}, ensure_ascii=False)}\n\n" + yield f"data: {json.dumps({'type': 'error', 'content': f'流处理错误: {str(e)}'}, ensure_ascii=False)}\n\n" return - + except Exception as e: - yield f"data: {json.dumps({'type': 'stream_error', 'error': str(e)}, ensure_ascii=False)}\n\n" + yield f"data: {json.dumps({'type': 'error', 'content': f'查询错误: {str(e)}'}, ensure_ascii=False)}\n\n" finally: # 清理回调函数 if message_callback: message_stream.remove_callback(message_callback) - + return StreamingResponse( query_stream_generator(), media_type="text/event-stream", @@ -379,7 +381,7 @@ def get_messages(): async def stream_messages() -> StreamingResponse: """ Stream messages in real-time using Server-Sent Events. - + Returns: StreamingResponse: A streaming response with real-time messages. """ @@ -387,7 +389,7 @@ async def stream_messages() -> StreamingResponse: """生成SSE事件流""" # 创建一个队列来接收消息 message_queue = asyncio.Queue() - + def message_callback(message): """消息回调函数""" try: @@ -395,21 +397,21 @@ async def stream_messages() -> StreamingResponse: asyncio.create_task(message_queue.put(message)) except Exception as e: print(f"Error in message callback: {e}") - + # 注册回调函数 message_callbacks.append(message_callback) message_stream = get_message_stream() message_stream.add_callback(message_callback) - + try: # 发送连接建立消息 yield f"data: {json.dumps({'type': 'connection', 'message': 'Connected to message stream'}, ensure_ascii=False)}\n\n" - + while True: try: # 等待新消息,设置超时以便能够检查连接状态 - message = await asyncio.wait_for(message_queue.get(), timeout=1.0) - + message = await asyncio.wait_for(message_queue.get(), timeout=2.0) + # 发送消息数据 message_data = { 'type': message.type.value, @@ -418,11 +420,11 @@ async def stream_messages() -> StreamingResponse: 'metadata': message.metadata or {} } yield f"data: {json.dumps(message_data, ensure_ascii=False)}\n\n" - + except asyncio.TimeoutError: # 发送心跳保持连接 yield f"data: {json.dumps({'type': 'heartbeat', 'timestamp': asyncio.get_event_loop().time()}, ensure_ascii=False)}\n\n" - + except Exception as e: print(f"Error in event generator: {e}") finally: @@ -430,7 +432,7 @@ async def stream_messages() -> StreamingResponse: if message_callback in message_callbacks: message_callbacks.remove(message_callback) message_stream.remove_callback(message_callback) - + return StreamingResponse( event_generator(), media_type="text/event-stream", diff --git a/simplified_message_format.md b/simplified_message_format.md new file mode 100644 index 0000000..a0440e2 --- /dev/null +++ b/simplified_message_format.md @@ -0,0 +1,180 @@ +# 简化消息格式 + +## 概述 + +为了简化前后端的消息通信,我们将原来的多种消息类型统一为简单的 `type` 和 `content` 格式。 + +## 消息格式 + +所有消息都遵循以下统一格式: + +```json +{ + "type": "消息类型", + "content": "消息内容", + "timestamp": 时间戳, + "metadata": {} +} +``` + +## 消息类型 + +### 1. start - 开始消息 +- **用途**: 表示查询或操作的开始 +- **示例**: +```json +{ + "type": "start", + "content": "开始处理查询: 请写一篇关于Milvus向量数据库的报告" +} +``` + +### 2. info - 信息消息 +- **用途**: 表示处理过程中的信息、状态更新、迭代信息等 +- **示例**: +```json +{ + "type": "info", + "content": "iteration 1: 正在搜索相关文档..." +} +``` + +### 3. answer - 答案消息 +- **用途**: 表示最终答案或重要结果 +- **示例**: +```json +{ + "type": "answer", + "content": "Milvus的详细报告: ..." +} +``` + +### 4. complete - 完成消息 +- **用途**: 表示操作完成 +- **示例**: +```json +{ + "type": "complete", + "content": "查询完成" +} +``` + +### 5. error - 错误消息 +- **用途**: 表示错误信息 +- **示例**: +```json +{ + "type": "error", + "content": "查询失败: 无法连接到数据库" +} +``` + +## 使用示例 + +### 后端使用 + +```python +from deepsearcher.utils.message_stream import ( + send_start, send_info, send_answer, send_complete, send_error +) + +# 发送开始消息 +send_start("开始处理查询: 请写一篇关于Milvus的报告") + +# 发送信息消息 +send_info("iteration 1: 正在搜索相关文档...") +send_info("找到5个相关文档片段") + +# 发送答案消息 +send_answer("Milvus的详细报告: ...") + +# 发送完成消息 +send_complete("查询完成") + +# 发送错误消息 +send_error("查询失败: 无法连接到数据库") +``` + +### 前端处理 + +```javascript +// 处理消息流 +function handleStreamMessage(data) { + const message = JSON.parse(data); + + switch (message.type) { + case 'start': + console.log('开始:', message.content); + break; + case 'info': + console.log('信息:', message.content); + break; + case 'answer': + console.log('答案:', message.content); + break; + case 'complete': + console.log('完成:', message.content); + break; + case 'error': + console.error('错误:', message.content); + break; + } +} +``` + +## 优势 + +1. **简化格式**: 统一的消息格式,易于理解和处理 +2. **类型清晰**: 5种基本类型覆盖所有使用场景 +3. **易于扩展**: 可以轻松添加新的消息类型 +4. **前端友好**: 前端处理逻辑更加简单 +5. **调试方便**: 消息格式清晰,便于调试和日志记录 + +## 迁移说明 + +### 从旧格式迁移 + +| 旧类型 | 新类型 | 说明 | +|--------|--------|------| +| `search` | `info` | 搜索相关信息 | +| `think` | `info` | 思考过程信息 | +| `answer` | `answer` | 保持不变 | +| `complete` | `complete` | 保持不变 | +| `query_start` | `start` | 查询开始 | +| `query_error` | `error` | 查询错误 | +| `stream_error` | `error` | 流错误 | + +### 代码更新 + +所有使用旧消息类型的代码都需要更新: + +```python +# 旧代码 +send_search("搜索内容...") +send_think("思考内容...") + +# 新代码 +send_info("搜索内容...") +send_info("思考内容...") +``` + +## 测试 + +可以使用以下方式测试消息格式: + +```python +from deepsearcher.utils.message_stream import get_message_stream + +# 获取消息流实例 +message_stream = get_message_stream() + +# 获取所有消息 +messages = message_stream.get_messages_as_dicts() + +# 验证格式 +for msg in messages: + assert 'type' in msg + assert 'content' in msg + assert 'timestamp' in msg + print(f"类型: {msg['type']}, 内容: {msg['content']}") +``` diff --git a/test_fix.py b/test_fix.py new file mode 100644 index 0000000..5fb3044 --- /dev/null +++ b/test_fix.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python3 +""" +测试修复后的查询功能 +""" + +import sys +import os +sys.path.append(os.path.dirname(os.path.abspath(__file__))) + +from deepsearcher.configuration import Configuration, init_config +from deepsearcher.online_query import query + +def test_query_fix(): + """测试修复后的查询功能""" + print("=== 测试修复后的查询功能 ===") + + # 初始化配置 + config = Configuration() + init_config(config) + + try: + print("开始查询...") + result_text, retrieval_results = query("什么是Milvus?", max_iter=1) + + print(f"查询完成!") + print(f"结果长度: {len(result_text) if result_text else 0}") + print(f"检索结果数量: {len(retrieval_results) if retrieval_results else 0}") + + if result_text: + print(f"结果预览: {result_text[:200]}...") + + except Exception as e: + import traceback + print(f"查询失败: {e}") + print(f"错误详情: {traceback.format_exc()}") + +if __name__ == "__main__": + test_query_fix()