diff --git a/deepsearcher/agent/deep_search.py b/deepsearcher/agent/deep_search.py
index 61d2b4d..95ee890 100644
--- a/deepsearcher/agent/deep_search.py
+++ b/deepsearcher/agent/deep_search.py
@@ -2,7 +2,7 @@ from deepsearcher.agent.base import BaseAgent, describe_class
from deepsearcher.embedding.base import BaseEmbedding
from deepsearcher.llm.base import BaseLLM
from deepsearcher.utils import log
-from deepsearcher.utils.message_stream import send_search, send_think, send_answer
+from deepsearcher.utils.message_stream import send_info, send_answer
from deepsearcher.vector_db import RetrievalResult
from deepsearcher.vector_db.base import BaseVectorDB, deduplicate
from collections import defaultdict
@@ -22,7 +22,8 @@ COLLECTION_ROUTE_PROMPT = """
SUB_QUERY_PROMPT = """
-为了能够全面的回答这个问题,请你尝试把原本的问题拆分或扩展为多个子问题
+为了能够全面的回答这个问题,请你尝试把原本的问题拆分或扩展为几个子问题
+不可以太多,但是也不可以太少,请根据问题复杂程度来决定子问题的数量
如果原问题本身非常简单,没有必要进行拆分,则保留输出原问题本身
需要保证每个子问题都具体、清晰、不可分(原子性),最终返回一个字符串列表
@@ -180,7 +181,7 @@ class DeepSearch(BaseAgent):
if len(collection_infos) == 1:
the_only_collection = collection_infos[0].collection_name
log.color_print(
- f" Perform search [{query}] on the vector DB collection: {the_only_collection} \n"
+ f"Perform search [{query}] on the vector DB collection: {the_only_collection}\n"
)
return [the_only_collection]
vector_db_search_prompt = COLLECTION_ROUTE_PROMPT.format(
@@ -207,7 +208,7 @@ class DeepSearch(BaseAgent):
selected_collections.append(collection_info.collection_name)
selected_collections = list(set(selected_collections))
log.color_print(
- f" Perform search [{query}] on the vector DB collections: {selected_collections} \n"
+ f"Perform search [{query}] on the vector DB collections: {selected_collections}\n"
)
return selected_collections
@@ -231,12 +232,12 @@ class DeepSearch(BaseAgent):
all_retrieved_results = []
query_vector = self.embedding_model.embed_query(query)
for collection in selected_collections:
- send_search(f"Search [{query}] in [{collection}]...")
+ send_info(f"正在 [{collection}] 中搜索 [{query}] ...")
retrieved_results = self.vector_db.search_data(
collection=collection, vector=query_vector, query_text=query
)
if not retrieved_results or len(retrieved_results) == 0:
- send_search(f"No relevant document chunks found in '{collection}'!")
+ send_info(f"'{collection}' 中没有找到相关文档!")
continue
# Format all chunks for batch processing
@@ -287,9 +288,9 @@ class DeepSearch(BaseAgent):
references.add(retrieved_result.reference)
if accepted_chunk_num > 0:
- send_search(f"Accept {accepted_chunk_num} document chunk(s) from references: {list(references)}")
+ send_info(f"采纳 {accepted_chunk_num} 个文档片段,来源:{list(references)}")
else:
- send_search(f"No document chunk accepted from '{collection}'!")
+ send_info(f"没有采纳任何 '{collection}' 中找到的文档片段!")
return all_retrieved_results
def _generate_more_sub_queries(
@@ -327,7 +328,6 @@ class DeepSearch(BaseAgent):
max_iter = kwargs.get('max_iter', self.max_iter)
### SUB QUERIES ###
- send_think(f" {original_query} ")
all_search_results = []
all_sub_queries = []
@@ -336,11 +336,11 @@ class DeepSearch(BaseAgent):
log.color_print("No sub queries were generated by the LLM. Exiting.")
return [], {}
else:
- send_think(f"Break down the original query into new sub queries: {sub_queries}")
+ send_info(f"原问题被拆分为这些子问题: {sub_queries}")
all_sub_queries.extend(sub_queries)
for it in range(max_iter):
- send_think(f">> Iteration: {it + 1}")
+ send_info(f"第 {it + 1} 轮搜索:")
# Execute all search tasks sequentially
for query in sub_queries:
@@ -350,25 +350,25 @@ class DeepSearch(BaseAgent):
all_search_results = deduplicate(all_search_results)
deduped_len = len(all_search_results)
if undeduped_len - deduped_len != 0:
- send_search(f"Remove {undeduped_len - deduped_len} duplicates")
+ send_info(f"移除 {undeduped_len - deduped_len} 个重复文档片段")
# search_res_from_internet = deduplicate_results(search_res_from_internet)
# all_search_res.extend(search_res_from_vectordb + search_res_from_internet)
### REFLECTION & GET MORE SUB QUERIES ###
# Only generate more queries if we haven't reached the maximum iterations
if it + 1 < max_iter:
- send_think("Reflecting on the search results...")
+ send_info("正在根据文档片段思考 ...")
sub_queries = self._generate_more_sub_queries(
original_query, all_sub_queries, all_search_results
)
if not sub_queries or len(sub_queries) == 0:
- send_think("No new search queries were generated. Exiting.")
+ send_info("没能生成更多的子问题,正在退出 ....")
break
else:
- send_think(f"New search queries for next iteration: {sub_queries}")
+ send_info(f"下一轮搜索的子问题: {sub_queries}")
all_sub_queries.extend(sub_queries)
else:
- send_think("Reached maximum iterations. Exiting.")
+ send_info("已达到最大搜索轮数,正在退出 ...")
break
all_search_results = deduplicate(all_search_results)
@@ -392,10 +392,10 @@ class DeepSearch(BaseAgent):
"""
all_retrieved_results, all_sub_queries = self.retrieve(original_query, **kwargs)
if not all_retrieved_results or len(all_retrieved_results) == 0:
- send_think(f"No relevant information found for query '{original_query}'.")
+ send_info(f"'{original_query}'没能找到更多信息!")
return "", []
chunks = self._format_chunks(all_retrieved_results)
- send_think(f"Summarize answer from all {len(all_retrieved_results)} retrieved chunks...")
+ send_info(f"正在总结 {len(all_retrieved_results)} 个查找到的文档片段")
summary_prompt = SUMMARY_PROMPT.format(
original_query=original_query,
all_sub_queries=all_sub_queries,
@@ -403,7 +403,6 @@ class DeepSearch(BaseAgent):
)
response = self.llm.chat([{"role": "user", "content": summary_prompt}])
final_answer = self.llm.remove_think(response)
- # 直接发送最终答案,不发送占位符
send_answer(final_answer)
return self.llm.remove_think(response), all_retrieved_results
diff --git a/deepsearcher/backend/templates/index.html b/deepsearcher/backend/templates/index.html
index 607d467..45714a8 100644
--- a/deepsearcher/backend/templates/index.html
+++ b/deepsearcher/backend/templates/index.html
@@ -214,12 +214,12 @@
line-height: 1.4;
}
- .message-search {
+ .message-start {
background-color: #f0f9ff;
border-left-color: var(--info-color);
}
- .message-think {
+ .message-info {
background-color: #fef3c7;
border-left-color: var(--warning-color);
}
@@ -229,6 +229,16 @@
border-left-color: var(--success-color);
}
+ .message-complete {
+ background-color: #dbeafe;
+ border-left-color: var(--primary-color);
+ }
+
+ .message-error {
+ background-color: #fee2e2;
+ border-left-color: var(--error-color);
+ }
+
.message-timestamp {
font-size: 12px;
color: var(--text-secondary);
@@ -473,13 +483,15 @@
case 'heartbeat':
// 心跳消息,不需要处理
break;
- case 'query_start':
- console.log('Query started:', message.query);
- showStatus('queryStatus', '查询已开始,正在处理...', 'loading');
+ case 'start':
+ console.log('Query started:', message.content);
+ showStatus('queryStatus', '正在处理...', 'loading');
+ addMessageToContainer(message);
break;
case 'complete':
console.log('Query completed - closing connection');
showStatus('queryStatus', '查询完成', 'success');
+ addMessageToContainer(message);
// 关闭EventSource连接
if (window.currentEventSource) {
console.log('Closing currentEventSource');
@@ -490,20 +502,10 @@
setButtonLoading(document.getElementById('queryBtn'), false);
console.log('Query completed - connection closed, isStreaming set to false');
break;
- case 'query_error':
- console.error('Query error:', message.error);
- showStatus('queryStatus', `查询失败: ${message.error}`, 'error');
- // 关闭EventSource连接
- if (window.currentEventSource) {
- window.currentEventSource.close();
- window.currentEventSource = null;
- }
- isStreaming = false;
- setButtonLoading(document.getElementById('queryBtn'), false);
- break;
- case 'stream_error':
- console.error('Stream error:', message.error);
- showStatus('queryStatus', `流错误: ${message.error}`, 'error');
+ case 'error':
+ console.error('Error:', message.content);
+ showStatus('queryStatus', message.content, 'error');
+ addMessageToContainer(message);
// 关闭EventSource连接
if (window.currentEventSource) {
window.currentEventSource.close();
@@ -512,15 +514,14 @@
isStreaming = false;
setButtonLoading(document.getElementById('queryBtn'), false);
break;
- case 'search':
- case 'think':
- // 处理常规消息
- console.log('Processing message type:', message.type, 'with content:', message.content.substring(0, 100) + '...');
+ case 'info':
+ // 处理信息消息
+ console.log('Processing info message:', message.content.substring(0, 100) + '...');
addMessageToContainer(message);
break;
case 'answer':
// 处理answer类型,显示查询结果
- console.log('Processing message type:', message.type, 'with content:', message.content.substring(0, 100) + '...');
+ console.log('Processing answer message:', message.content.substring(0, 100) + '...');
// 将结果内容显示在结果区域
if (message.content && message.content !== "==== FINAL ANSWER====") {
document.getElementById('resultText').textContent = message.content;
@@ -720,7 +721,7 @@
eventSource.onopen = function(event) {
console.log('EventSource connection opened for query');
- showStatus('queryStatus', '查询已开始,正在处理...', 'loading');
+ showStatus('queryStatus', '正在处理...', 'loading');
};
eventSource.onmessage = function(event) {
diff --git a/deepsearcher/llm/openai_llm.py b/deepsearcher/llm/openai_llm.py
index dd4f378..1af6046 100644
--- a/deepsearcher/llm/openai_llm.py
+++ b/deepsearcher/llm/openai_llm.py
@@ -52,7 +52,7 @@ class OpenAILLM(BaseLLM):
top_p=0.8,
presence_penalty=1.2
) as stream:
- # stream仅做测试,不需要发送到前端
+ # stream到控制台测试
content = ""
reasoning_content = ""
for chunk in stream:
diff --git a/deepsearcher/utils/message_stream.py b/deepsearcher/utils/message_stream.py
index dfc364d..61b06bb 100644
--- a/deepsearcher/utils/message_stream.py
+++ b/deepsearcher/utils/message_stream.py
@@ -7,16 +7,17 @@ from datetime import datetime
class MessageType(Enum):
- """消息类型枚举"""
- SEARCH = "search"
- THINK = "think"
- ANSWER = "answer"
- COMPLETE = "complete"
+ """简化的消息类型枚举"""
+ START = "start" # 开始消息
+ INFO = "info" # 信息消息
+ ANSWER = "answer" # 答案消息
+ COMPLETE = "complete" # 完成消息
+ ERROR = "error" # 错误消息
@dataclass
class Message:
- """消息数据结构"""
+ """简化的消息数据结构"""
type: MessageType
content: str
timestamp: float
@@ -37,7 +38,7 @@ class Message:
class MessageStream:
- """消息流管理器"""
+ """简化的消息流管理器"""
def __init__(self):
self._messages: List[Message] = []
@@ -82,21 +83,25 @@ class MessageStream:
except Exception as e:
print(f"Error in message callback: {e}")
- def send_search(self, content: str, metadata: Optional[Dict[str, Any]] = None):
- """发送搜索消息"""
- self.send_message(MessageType.SEARCH, content, metadata)
+ def send_start(self, content: str, metadata: Optional[Dict[str, Any]] = None):
+ """发送开始消息"""
+ self.send_message(MessageType.START, content, metadata)
- def send_think(self, content: str, metadata: Optional[Dict[str, Any]] = None):
- """发送思考消息"""
- self.send_message(MessageType.THINK, content, metadata)
+ def send_info(self, content: str, metadata: Optional[Dict[str, Any]] = None):
+ """发送信息消息"""
+ self.send_message(MessageType.INFO, content, metadata)
def send_answer(self, content: str, metadata: Optional[Dict[str, Any]] = None):
"""发送答案消息"""
self.send_message(MessageType.ANSWER, content, metadata)
- def send_complete(self, metadata: Optional[Dict[str, Any]] = None):
+ def send_complete(self, content: str = "查询完成", metadata: Optional[Dict[str, Any]] = None):
"""发送完成消息"""
- self.send_message(MessageType.COMPLETE, "", metadata)
+ self.send_message(MessageType.COMPLETE, content, metadata)
+
+ def send_error(self, content: str, metadata: Optional[Dict[str, Any]] = None):
+ """发送错误消息"""
+ self.send_message(MessageType.ERROR, content, metadata)
def get_messages(self) -> List[Message]:
"""获取所有消息"""
@@ -123,14 +128,14 @@ class MessageStream:
message_stream = MessageStream()
-def send_search(content: str, metadata: Optional[Dict[str, Any]] = None):
- """全局搜索消息发送函数"""
- message_stream.send_search(content, metadata)
+def send_start(content: str, metadata: Optional[Dict[str, Any]] = None):
+ """全局开始消息发送函数"""
+ message_stream.send_start(content, metadata)
-def send_think(content: str, metadata: Optional[Dict[str, Any]] = None):
- """全局思考消息发送函数"""
- message_stream.send_think(content, metadata)
+def send_info(content: str, metadata: Optional[Dict[str, Any]] = None):
+ """全局信息消息发送函数"""
+ message_stream.send_info(content, metadata)
def send_answer(content: str, metadata: Optional[Dict[str, Any]] = None):
@@ -138,9 +143,14 @@ def send_answer(content: str, metadata: Optional[Dict[str, Any]] = None):
message_stream.send_answer(content, metadata)
-def send_complete(metadata: Optional[Dict[str, Any]] = None):
+def send_complete(content: str = "查询完成", metadata: Optional[Dict[str, Any]] = None):
"""全局完成消息发送函数"""
- message_stream.send_complete(metadata)
+ message_stream.send_complete(content, metadata)
+
+
+def send_error(content: str, metadata: Optional[Dict[str, Any]] = None):
+ """全局错误消息发送函数"""
+ message_stream.send_error(content, metadata)
def get_message_stream() -> MessageStream:
diff --git a/docs/message_stream_system.md b/docs/message_stream_system.md
deleted file mode 100644
index b572003..0000000
--- a/docs/message_stream_system.md
+++ /dev/null
@@ -1,197 +0,0 @@
-# 消息流系统 (Message Stream System)
-
-## 概述
-
-DeepSearcher 的消息流系统是一个新的消息传输机制,用于替代原来的日志传输方式。该系统支持三种类型的消息:`search`、`think` 和 `answer`,并提供了灵活的消息管理和传输功能。
-
-## 消息类型
-
-### 1. Search 消息
-- **类型**: `search`
-- **用途**: 表示搜索相关的操作和状态
-- **示例**:
- - "在向量数据库中搜索相关信息..."
- - "找到5个相关文档片段"
- - "搜索人工智能的定义..."
-
-### 2. Think 消息
-- **类型**: `think`
-- **用途**: 表示思考和推理过程
-- **示例**:
- - "开始处理查询: 什么是人工智能?"
- - "分析搜索结果..."
- - "生成子查询: 人工智能的定义、历史、应用"
-
-### 3. Answer 消息
-- **类型**: `answer`
-- **用途**: 表示最终答案和结果
-- **示例**:
- - "==== FINAL ANSWER===="
- - "人工智能是计算机科学的一个分支..."
-
-## 核心组件
-
-### MessageStream 类
-
-主要的消息流管理器,提供以下功能:
-
-```python
-from deepsearcher.utils.message_stream import MessageStream
-
-# 创建消息流实例
-message_stream = MessageStream()
-
-# 发送消息
-message_stream.send_search("搜索内容...")
-message_stream.send_think("思考内容...")
-message_stream.send_answer("答案内容...")
-
-# 获取消息
-messages = message_stream.get_messages()
-messages_dict = message_stream.get_messages_as_dicts()
-messages_json = message_stream.get_messages_as_json()
-```
-
-### 全局函数
-
-为了方便使用,提供了全局函数:
-
-```python
-from deepsearcher.utils.message_stream import send_search, send_think, send_answer
-
-# 直接发送消息
-send_search("搜索内容...")
-send_think("思考内容...")
-send_answer("答案内容...")
-```
-
-## API 接口
-
-### 1. 获取消息
-```
-GET /messages/
-```
-
-返回所有消息的列表:
-```json
-{
- "messages": [
- {
- "type": "search",
- "content": "搜索内容...",
- "timestamp": 1755043653.9606102,
- "metadata": {}
- }
- ]
-}
-```
-
-### 2. 清空消息
-```
-POST /clear-messages/
-```
-
-清空所有消息并返回成功状态:
-```json
-{
- "message": "Messages cleared successfully"
-}
-```
-
-### 3. 查询接口(已更新)
-```
-GET /query/?original_query=&max_iter=
-```
-
-现在返回结果包含消息流:
-```json
-{
- "result": "最终答案...",
- "messages": [
- {
- "type": "search",
- "content": "搜索内容...",
- "timestamp": 1755043653.9606102,
- "metadata": {}
- }
- ]
-}
-```
-
-## 前端集成
-
-前端界面现在包含一个消息流显示区域,实时显示处理过程中的各种消息:
-
-### CSS 样式
-- `.message-search`: 搜索消息样式(蓝色边框)
-- `.message-think`: 思考消息样式(黄色边框)
-- `.message-answer`: 答案消息样式(绿色边框)
-
-### JavaScript 功能
-- `displayMessages(messages)`: 显示消息流
-- 自动滚动到最新消息
-- 时间戳显示
-
-## 使用示例
-
-### 后端使用
-
-```python
-from deepsearcher.utils.message_stream import send_search, send_think, send_answer
-
-# 在搜索过程中发送消息
-send_think("开始处理查询...")
-send_search("在数据库中搜索...")
-send_search("找到相关文档...")
-send_think("分析结果...")
-send_answer("最终答案...")
-```
-
-### 前端使用
-
-```javascript
-// 获取消息
-const response = await fetch('/query/?original_query=test&max_iter=3');
-const data = await response.json();
-
-// 显示消息流
-if (data.messages && data.messages.length > 0) {
- displayMessages(data.messages);
-}
-```
-
-## 优势
-
-1. **结构化数据**: 消息包含类型、内容、时间戳和元数据
-2. **类型安全**: 使用枚举确保消息类型的一致性
-3. **灵活传输**: 支持多种输出格式(字典、JSON)
-4. **实时显示**: 前端可以实时显示处理过程
-5. **易于扩展**: 可以轻松添加新的消息类型和功能
-
-## 迁移说明
-
-从原来的日志系统迁移到新的消息流系统:
-
-### 原来的代码
-```python
-log.color_print(f" Search [{query}] in [{collection}]... \n")
-log.color_print(f" Summarize answer... \n")
-log.color_print(f" Final answer... \n")
-```
-
-### 新的代码
-```python
-send_search(f"Search [{query}] in [{collection}]...")
-send_think("Summarize answer...")
-send_answer("Final answer...")
-```
-
-## 测试
-
-运行测试脚本验证系统功能:
-
-```bash
-python test_message_stream.py
-```
-
-这将测试消息流的基本功能,包括消息发送、获取和格式化。
diff --git a/main.py b/main.py
index e276550..58fd9f2 100644
--- a/main.py
+++ b/main.py
@@ -228,7 +228,6 @@ def perform_query(
# 清空之前的消息
message_stream = get_message_stream()
message_stream.clear_messages()
-
result_text, _ = query(original_query, max_iter)
return {
@@ -270,13 +269,13 @@ async def perform_query_stream(
# 清空之前的消息
message_stream = get_message_stream()
message_stream.clear_messages()
-
+
# 发送查询开始消息
- yield f"data: {json.dumps({'type': 'query_start', 'query': original_query, 'max_iter': max_iter}, ensure_ascii=False)}\n\n"
-
+ yield f"data: {json.dumps({'type': 'start', 'content': '开始查询'}, ensure_ascii=False)}\n\n"
+
# 创建一个线程安全的队列来接收消息
message_queue = queue.Queue()
-
+
def message_callback(message):
"""消息回调函数"""
try:
@@ -284,26 +283,29 @@ async def perform_query_stream(
message_queue.put(message)
except Exception as e:
print(f"Error in message callback: {e}")
-
+
# 注册回调函数
message_stream.add_callback(message_callback)
-
+
# 在后台线程中执行查询
def run_query():
try:
print(f"Starting query: {original_query} with max_iter: {max_iter}")
- result_text, _ = query(original_query, max_iter)
+ result_text, retrieval_results = query(original_query, max_iter)
print(f"Query completed with result length: {len(result_text) if result_text else 0}")
+ print(f"Retrieved {len(retrieval_results) if retrieval_results else 0} documents")
return result_text, None
except Exception as e:
+ import traceback
print(f"Query failed with error: {e}")
+ print(f"Traceback: {traceback.format_exc()}")
return None, str(e)
-
+
# 使用线程池执行查询
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as executor:
query_future = executor.submit(run_query)
-
+
# 监听消息和查询结果
while True:
try:
@@ -312,17 +314,17 @@ async def perform_query_stream(
result_text, error = query_future.result()
if error:
print(f"Query error: {error}")
- yield f"data: {json.dumps({'type': 'query_error', 'error': error}, ensure_ascii=False)}\n\n"
+ yield f"data: {json.dumps({'type': 'error', 'content': f'查询失败: {error}'}, ensure_ascii=False)}\n\n"
return
# 成功完成时,发送complete消息并结束
print("Query completed successfully, sending complete message")
- yield f"data: {json.dumps({'type': 'complete'}, ensure_ascii=False)}\n\n"
+ yield f"data: {json.dumps({'type': 'complete', 'content': '查询完成'}, ensure_ascii=False)}\n\n"
print("Complete message sent, ending stream")
return
-
+
# 尝试从队列获取消息,设置超时
try:
- message = message_queue.get(timeout=0.5)
+ message = message_queue.get(timeout=2.0)
message_data = {
'type': message.type.value,
'content': message.content,
@@ -333,19 +335,19 @@ async def perform_query_stream(
except queue.Empty:
# 超时,继续循环
continue
-
+
except Exception as e:
print(f"Error in stream loop: {e}")
- yield f"data: {json.dumps({'type': 'stream_error', 'error': str(e)}, ensure_ascii=False)}\n\n"
+ yield f"data: {json.dumps({'type': 'error', 'content': f'流处理错误: {str(e)}'}, ensure_ascii=False)}\n\n"
return
-
+
except Exception as e:
- yield f"data: {json.dumps({'type': 'stream_error', 'error': str(e)}, ensure_ascii=False)}\n\n"
+ yield f"data: {json.dumps({'type': 'error', 'content': f'查询错误: {str(e)}'}, ensure_ascii=False)}\n\n"
finally:
# 清理回调函数
if message_callback:
message_stream.remove_callback(message_callback)
-
+
return StreamingResponse(
query_stream_generator(),
media_type="text/event-stream",
@@ -379,7 +381,7 @@ def get_messages():
async def stream_messages() -> StreamingResponse:
"""
Stream messages in real-time using Server-Sent Events.
-
+
Returns:
StreamingResponse: A streaming response with real-time messages.
"""
@@ -387,7 +389,7 @@ async def stream_messages() -> StreamingResponse:
"""生成SSE事件流"""
# 创建一个队列来接收消息
message_queue = asyncio.Queue()
-
+
def message_callback(message):
"""消息回调函数"""
try:
@@ -395,21 +397,21 @@ async def stream_messages() -> StreamingResponse:
asyncio.create_task(message_queue.put(message))
except Exception as e:
print(f"Error in message callback: {e}")
-
+
# 注册回调函数
message_callbacks.append(message_callback)
message_stream = get_message_stream()
message_stream.add_callback(message_callback)
-
+
try:
# 发送连接建立消息
yield f"data: {json.dumps({'type': 'connection', 'message': 'Connected to message stream'}, ensure_ascii=False)}\n\n"
-
+
while True:
try:
# 等待新消息,设置超时以便能够检查连接状态
- message = await asyncio.wait_for(message_queue.get(), timeout=1.0)
-
+ message = await asyncio.wait_for(message_queue.get(), timeout=2.0)
+
# 发送消息数据
message_data = {
'type': message.type.value,
@@ -418,11 +420,11 @@ async def stream_messages() -> StreamingResponse:
'metadata': message.metadata or {}
}
yield f"data: {json.dumps(message_data, ensure_ascii=False)}\n\n"
-
+
except asyncio.TimeoutError:
# 发送心跳保持连接
yield f"data: {json.dumps({'type': 'heartbeat', 'timestamp': asyncio.get_event_loop().time()}, ensure_ascii=False)}\n\n"
-
+
except Exception as e:
print(f"Error in event generator: {e}")
finally:
@@ -430,7 +432,7 @@ async def stream_messages() -> StreamingResponse:
if message_callback in message_callbacks:
message_callbacks.remove(message_callback)
message_stream.remove_callback(message_callback)
-
+
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
diff --git a/simplified_message_format.md b/simplified_message_format.md
new file mode 100644
index 0000000..a0440e2
--- /dev/null
+++ b/simplified_message_format.md
@@ -0,0 +1,180 @@
+# 简化消息格式
+
+## 概述
+
+为了简化前后端的消息通信,我们将原来的多种消息类型统一为简单的 `type` 和 `content` 格式。
+
+## 消息格式
+
+所有消息都遵循以下统一格式:
+
+```json
+{
+ "type": "消息类型",
+ "content": "消息内容",
+ "timestamp": 时间戳,
+ "metadata": {}
+}
+```
+
+## 消息类型
+
+### 1. start - 开始消息
+- **用途**: 表示查询或操作的开始
+- **示例**:
+```json
+{
+ "type": "start",
+ "content": "开始处理查询: 请写一篇关于Milvus向量数据库的报告"
+}
+```
+
+### 2. info - 信息消息
+- **用途**: 表示处理过程中的信息、状态更新、迭代信息等
+- **示例**:
+```json
+{
+ "type": "info",
+ "content": "iteration 1: 正在搜索相关文档..."
+}
+```
+
+### 3. answer - 答案消息
+- **用途**: 表示最终答案或重要结果
+- **示例**:
+```json
+{
+ "type": "answer",
+ "content": "Milvus的详细报告: ..."
+}
+```
+
+### 4. complete - 完成消息
+- **用途**: 表示操作完成
+- **示例**:
+```json
+{
+ "type": "complete",
+ "content": "查询完成"
+}
+```
+
+### 5. error - 错误消息
+- **用途**: 表示错误信息
+- **示例**:
+```json
+{
+ "type": "error",
+ "content": "查询失败: 无法连接到数据库"
+}
+```
+
+## 使用示例
+
+### 后端使用
+
+```python
+from deepsearcher.utils.message_stream import (
+ send_start, send_info, send_answer, send_complete, send_error
+)
+
+# 发送开始消息
+send_start("开始处理查询: 请写一篇关于Milvus的报告")
+
+# 发送信息消息
+send_info("iteration 1: 正在搜索相关文档...")
+send_info("找到5个相关文档片段")
+
+# 发送答案消息
+send_answer("Milvus的详细报告: ...")
+
+# 发送完成消息
+send_complete("查询完成")
+
+# 发送错误消息
+send_error("查询失败: 无法连接到数据库")
+```
+
+### 前端处理
+
+```javascript
+// 处理消息流
+function handleStreamMessage(data) {
+ const message = JSON.parse(data);
+
+ switch (message.type) {
+ case 'start':
+ console.log('开始:', message.content);
+ break;
+ case 'info':
+ console.log('信息:', message.content);
+ break;
+ case 'answer':
+ console.log('答案:', message.content);
+ break;
+ case 'complete':
+ console.log('完成:', message.content);
+ break;
+ case 'error':
+ console.error('错误:', message.content);
+ break;
+ }
+}
+```
+
+## 优势
+
+1. **简化格式**: 统一的消息格式,易于理解和处理
+2. **类型清晰**: 5种基本类型覆盖所有使用场景
+3. **易于扩展**: 可以轻松添加新的消息类型
+4. **前端友好**: 前端处理逻辑更加简单
+5. **调试方便**: 消息格式清晰,便于调试和日志记录
+
+## 迁移说明
+
+### 从旧格式迁移
+
+| 旧类型 | 新类型 | 说明 |
+|--------|--------|------|
+| `search` | `info` | 搜索相关信息 |
+| `think` | `info` | 思考过程信息 |
+| `answer` | `answer` | 保持不变 |
+| `complete` | `complete` | 保持不变 |
+| `query_start` | `start` | 查询开始 |
+| `query_error` | `error` | 查询错误 |
+| `stream_error` | `error` | 流错误 |
+
+### 代码更新
+
+所有使用旧消息类型的代码都需要更新:
+
+```python
+# 旧代码
+send_search("搜索内容...")
+send_think("思考内容...")
+
+# 新代码
+send_info("搜索内容...")
+send_info("思考内容...")
+```
+
+## 测试
+
+可以使用以下方式测试消息格式:
+
+```python
+from deepsearcher.utils.message_stream import get_message_stream
+
+# 获取消息流实例
+message_stream = get_message_stream()
+
+# 获取所有消息
+messages = message_stream.get_messages_as_dicts()
+
+# 验证格式
+for msg in messages:
+ assert 'type' in msg
+ assert 'content' in msg
+ assert 'timestamp' in msg
+ print(f"类型: {msg['type']}, 内容: {msg['content']}")
+```
diff --git a/test_fix.py b/test_fix.py
new file mode 100644
index 0000000..5fb3044
--- /dev/null
+++ b/test_fix.py
@@ -0,0 +1,38 @@
+#!/usr/bin/env python3
+"""
+测试修复后的查询功能
+"""
+
+import sys
+import os
+sys.path.append(os.path.dirname(os.path.abspath(__file__)))
+
+from deepsearcher.configuration import Configuration, init_config
+from deepsearcher.online_query import query
+
+def test_query_fix():
+ """测试修复后的查询功能"""
+ print("=== 测试修复后的查询功能 ===")
+
+ # 初始化配置
+ config = Configuration()
+ init_config(config)
+
+ try:
+ print("开始查询...")
+ result_text, retrieval_results = query("什么是Milvus?", max_iter=1)
+
+ print(f"查询完成!")
+ print(f"结果长度: {len(result_text) if result_text else 0}")
+ print(f"检索结果数量: {len(retrieval_results) if retrieval_results else 0}")
+
+ if result_text:
+ print(f"结果预览: {result_text[:200]}...")
+
+ except Exception as e:
+ import traceback
+ print(f"查询失败: {e}")
+ print(f"错误详情: {traceback.format_exc()}")
+
+if __name__ == "__main__":
+ test_query_fix()