Browse Source

feat: 简化消息队列

main
tanxing 5 days ago
parent
commit
cfaf7b9126
  1. 37
      deepsearcher/agent/deep_search.py
  2. 51
      deepsearcher/backend/templates/index.html
  3. 2
      deepsearcher/llm/openai_llm.py
  4. 56
      deepsearcher/utils/message_stream.py
  5. 197
      docs/message_stream_system.md
  6. 60
      main.py
  7. 180
      simplified_message_format.md
  8. 38
      test_fix.py

37
deepsearcher/agent/deep_search.py

@ -2,7 +2,7 @@ from deepsearcher.agent.base import BaseAgent, describe_class
from deepsearcher.embedding.base import BaseEmbedding
from deepsearcher.llm.base import BaseLLM
from deepsearcher.utils import log
from deepsearcher.utils.message_stream import send_search, send_think, send_answer
from deepsearcher.utils.message_stream import send_info, send_answer
from deepsearcher.vector_db import RetrievalResult
from deepsearcher.vector_db.base import BaseVectorDB, deduplicate
from collections import defaultdict
@ -22,7 +22,8 @@ COLLECTION_ROUTE_PROMPT = """
SUB_QUERY_PROMPT = """
为了能够全面的回答这个问题请你尝试把原本的问题拆分或扩展为多个子问题
为了能够全面的回答这个问题请你尝试把原本的问题拆分或扩展为几个子问题
不可以太多但是也不可以太少请根据问题复杂程度来决定子问题的数量
如果原问题本身非常简单没有必要进行拆分则保留输出原问题本身
需要保证每个子问题都具体清晰不可分原子性最终返回一个字符串列表
@ -180,7 +181,7 @@ class DeepSearch(BaseAgent):
if len(collection_infos) == 1:
the_only_collection = collection_infos[0].collection_name
log.color_print(
f"<think> Perform search [{query}] on the vector DB collection: {the_only_collection} </think>\n"
f"Perform search [{query}] on the vector DB collection: {the_only_collection}\n"
)
return [the_only_collection]
vector_db_search_prompt = COLLECTION_ROUTE_PROMPT.format(
@ -207,7 +208,7 @@ class DeepSearch(BaseAgent):
selected_collections.append(collection_info.collection_name)
selected_collections = list(set(selected_collections))
log.color_print(
f"<think> Perform search [{query}] on the vector DB collections: {selected_collections} </think>\n"
f"Perform search [{query}] on the vector DB collections: {selected_collections}\n"
)
return selected_collections
@ -231,12 +232,12 @@ class DeepSearch(BaseAgent):
all_retrieved_results = []
query_vector = self.embedding_model.embed_query(query)
for collection in selected_collections:
send_search(f"Search [{query}] in [{collection}]...")
send_info(f"正在 [{collection}] 中搜索 [{query}] ...")
retrieved_results = self.vector_db.search_data(
collection=collection, vector=query_vector, query_text=query
)
if not retrieved_results or len(retrieved_results) == 0:
send_search(f"No relevant document chunks found in '{collection}'!")
send_info(f"'{collection}' 中没有找到相关文档!")
continue
# Format all chunks for batch processing
@ -287,9 +288,9 @@ class DeepSearch(BaseAgent):
references.add(retrieved_result.reference)
if accepted_chunk_num > 0:
send_search(f"Accept {accepted_chunk_num} document chunk(s) from references: {list(references)}")
send_info(f"采纳 {accepted_chunk_num} 个文档片段,来源:{list(references)}")
else:
send_search(f"No document chunk accepted from '{collection}'!")
send_info(f"没有采纳任何 '{collection}' 中找到的文档片段!")
return all_retrieved_results
def _generate_more_sub_queries(
@ -327,7 +328,6 @@ class DeepSearch(BaseAgent):
max_iter = kwargs.get('max_iter', self.max_iter)
### SUB QUERIES ###
send_think(f"<query> {original_query} </query>")
all_search_results = []
all_sub_queries = []
@ -336,11 +336,11 @@ class DeepSearch(BaseAgent):
log.color_print("No sub queries were generated by the LLM. Exiting.")
return [], {}
else:
send_think(f"Break down the original query into new sub queries: {sub_queries}")
send_info(f"原问题被拆分为这些子问题: {sub_queries}")
all_sub_queries.extend(sub_queries)
for it in range(max_iter):
send_think(f">> Iteration: {it + 1}")
send_info(f" {it + 1} 轮搜索:")
# Execute all search tasks sequentially
for query in sub_queries:
@ -350,25 +350,25 @@ class DeepSearch(BaseAgent):
all_search_results = deduplicate(all_search_results)
deduped_len = len(all_search_results)
if undeduped_len - deduped_len != 0:
send_search(f"Remove {undeduped_len - deduped_len} duplicates")
send_info(f"移除 {undeduped_len - deduped_len} 个重复文档片段")
# search_res_from_internet = deduplicate_results(search_res_from_internet)
# all_search_res.extend(search_res_from_vectordb + search_res_from_internet)
### REFLECTION & GET MORE SUB QUERIES ###
# Only generate more queries if we haven't reached the maximum iterations
if it + 1 < max_iter:
send_think("Reflecting on the search results...")
send_info("正在根据文档片段思考 ...")
sub_queries = self._generate_more_sub_queries(
original_query, all_sub_queries, all_search_results
)
if not sub_queries or len(sub_queries) == 0:
send_think("No new search queries were generated. Exiting.")
send_info("没能生成更多的子问题,正在退出 ....")
break
else:
send_think(f"New search queries for next iteration: {sub_queries}")
send_info(f"下一轮搜索的子问题: {sub_queries}")
all_sub_queries.extend(sub_queries)
else:
send_think("Reached maximum iterations. Exiting.")
send_info("已达到最大搜索轮数,正在退出 ...")
break
all_search_results = deduplicate(all_search_results)
@ -392,10 +392,10 @@ class DeepSearch(BaseAgent):
"""
all_retrieved_results, all_sub_queries = self.retrieve(original_query, **kwargs)
if not all_retrieved_results or len(all_retrieved_results) == 0:
send_think(f"No relevant information found for query '{original_query}'.")
send_info(f"'{original_query}'没能找到更多信息!")
return "", []
chunks = self._format_chunks(all_retrieved_results)
send_think(f"Summarize answer from all {len(all_retrieved_results)} retrieved chunks...")
send_info(f"正在总结 {len(all_retrieved_results)} 个查找到的文档片段")
summary_prompt = SUMMARY_PROMPT.format(
original_query=original_query,
all_sub_queries=all_sub_queries,
@ -403,7 +403,6 @@ class DeepSearch(BaseAgent):
)
response = self.llm.chat([{"role": "user", "content": summary_prompt}])
final_answer = self.llm.remove_think(response)
# 直接发送最终答案,不发送占位符
send_answer(final_answer)
return self.llm.remove_think(response), all_retrieved_results

51
deepsearcher/backend/templates/index.html

@ -214,12 +214,12 @@
line-height: 1.4;
}
.message-search {
.message-start {
background-color: #f0f9ff;
border-left-color: var(--info-color);
}
.message-think {
.message-info {
background-color: #fef3c7;
border-left-color: var(--warning-color);
}
@ -229,6 +229,16 @@
border-left-color: var(--success-color);
}
.message-complete {
background-color: #dbeafe;
border-left-color: var(--primary-color);
}
.message-error {
background-color: #fee2e2;
border-left-color: var(--error-color);
}
.message-timestamp {
font-size: 12px;
color: var(--text-secondary);
@ -473,13 +483,15 @@
case 'heartbeat':
// 心跳消息,不需要处理
break;
case 'query_start':
console.log('Query started:', message.query);
showStatus('queryStatus', '查询已开始,正在处理...', 'loading');
case 'start':
console.log('Query started:', message.content);
showStatus('queryStatus', '正在处理...', 'loading');
addMessageToContainer(message);
break;
case 'complete':
console.log('Query completed - closing connection');
showStatus('queryStatus', '查询完成', 'success');
addMessageToContainer(message);
// 关闭EventSource连接
if (window.currentEventSource) {
console.log('Closing currentEventSource');
@ -490,20 +502,10 @@
setButtonLoading(document.getElementById('queryBtn'), false);
console.log('Query completed - connection closed, isStreaming set to false');
break;
case 'query_error':
console.error('Query error:', message.error);
showStatus('queryStatus', `查询失败: ${message.error}`, 'error');
// 关闭EventSource连接
if (window.currentEventSource) {
window.currentEventSource.close();
window.currentEventSource = null;
}
isStreaming = false;
setButtonLoading(document.getElementById('queryBtn'), false);
break;
case 'stream_error':
console.error('Stream error:', message.error);
showStatus('queryStatus', `流错误: ${message.error}`, 'error');
case 'error':
console.error('Error:', message.content);
showStatus('queryStatus', message.content, 'error');
addMessageToContainer(message);
// 关闭EventSource连接
if (window.currentEventSource) {
window.currentEventSource.close();
@ -512,15 +514,14 @@
isStreaming = false;
setButtonLoading(document.getElementById('queryBtn'), false);
break;
case 'search':
case 'think':
// 处理常规消息
console.log('Processing message type:', message.type, 'with content:', message.content.substring(0, 100) + '...');
case 'info':
// 处理信息消息
console.log('Processing info message:', message.content.substring(0, 100) + '...');
addMessageToContainer(message);
break;
case 'answer':
// 处理answer类型,显示查询结果
console.log('Processing message type:', message.type, 'with content:', message.content.substring(0, 100) + '...');
console.log('Processing answer message:', message.content.substring(0, 100) + '...');
// 将结果内容显示在结果区域
if (message.content && message.content !== "==== FINAL ANSWER====") {
document.getElementById('resultText').textContent = message.content;
@ -720,7 +721,7 @@
eventSource.onopen = function(event) {
console.log('EventSource connection opened for query');
showStatus('queryStatus', '查询已开始,正在处理...', 'loading');
showStatus('queryStatus', '正在处理...', 'loading');
};
eventSource.onmessage = function(event) {

2
deepsearcher/llm/openai_llm.py

@ -52,7 +52,7 @@ class OpenAILLM(BaseLLM):
top_p=0.8,
presence_penalty=1.2
) as stream:
# stream仅做测试,不需要发送到前端
# stream到控制台测试
content = ""
reasoning_content = ""
for chunk in stream:

56
deepsearcher/utils/message_stream.py

@ -7,16 +7,17 @@ from datetime import datetime
class MessageType(Enum):
"""消息类型枚举"""
SEARCH = "search"
THINK = "think"
ANSWER = "answer"
COMPLETE = "complete"
"""简化的消息类型枚举"""
START = "start" # 开始消息
INFO = "info" # 信息消息
ANSWER = "answer" # 答案消息
COMPLETE = "complete" # 完成消息
ERROR = "error" # 错误消息
@dataclass
class Message:
"""消息数据结构"""
"""简化的消息数据结构"""
type: MessageType
content: str
timestamp: float
@ -37,7 +38,7 @@ class Message:
class MessageStream:
"""消息流管理器"""
"""简化的消息流管理器"""
def __init__(self):
self._messages: List[Message] = []
@ -82,21 +83,25 @@ class MessageStream:
except Exception as e:
print(f"Error in message callback: {e}")
def send_search(self, content: str, metadata: Optional[Dict[str, Any]] = None):
"""发送搜索消息"""
self.send_message(MessageType.SEARCH, content, metadata)
def send_start(self, content: str, metadata: Optional[Dict[str, Any]] = None):
"""发送开始消息"""
self.send_message(MessageType.START, content, metadata)
def send_think(self, content: str, metadata: Optional[Dict[str, Any]] = None):
"""发送思考消息"""
self.send_message(MessageType.THINK, content, metadata)
def send_info(self, content: str, metadata: Optional[Dict[str, Any]] = None):
"""发送信息消息"""
self.send_message(MessageType.INFO, content, metadata)
def send_answer(self, content: str, metadata: Optional[Dict[str, Any]] = None):
"""发送答案消息"""
self.send_message(MessageType.ANSWER, content, metadata)
def send_complete(self, metadata: Optional[Dict[str, Any]] = None):
def send_complete(self, content: str = "查询完成", metadata: Optional[Dict[str, Any]] = None):
"""发送完成消息"""
self.send_message(MessageType.COMPLETE, "", metadata)
self.send_message(MessageType.COMPLETE, content, metadata)
def send_error(self, content: str, metadata: Optional[Dict[str, Any]] = None):
"""发送错误消息"""
self.send_message(MessageType.ERROR, content, metadata)
def get_messages(self) -> List[Message]:
"""获取所有消息"""
@ -123,14 +128,14 @@ class MessageStream:
message_stream = MessageStream()
def send_search(content: str, metadata: Optional[Dict[str, Any]] = None):
"""全局搜索消息发送函数"""
message_stream.send_search(content, metadata)
def send_start(content: str, metadata: Optional[Dict[str, Any]] = None):
"""全局开始消息发送函数"""
message_stream.send_start(content, metadata)
def send_think(content: str, metadata: Optional[Dict[str, Any]] = None):
"""全局思考消息发送函数"""
message_stream.send_think(content, metadata)
def send_info(content: str, metadata: Optional[Dict[str, Any]] = None):
"""全局信息消息发送函数"""
message_stream.send_info(content, metadata)
def send_answer(content: str, metadata: Optional[Dict[str, Any]] = None):
@ -138,9 +143,14 @@ def send_answer(content: str, metadata: Optional[Dict[str, Any]] = None):
message_stream.send_answer(content, metadata)
def send_complete(metadata: Optional[Dict[str, Any]] = None):
def send_complete(content: str = "查询完成", metadata: Optional[Dict[str, Any]] = None):
"""全局完成消息发送函数"""
message_stream.send_complete(metadata)
message_stream.send_complete(content, metadata)
def send_error(content: str, metadata: Optional[Dict[str, Any]] = None):
"""全局错误消息发送函数"""
message_stream.send_error(content, metadata)
def get_message_stream() -> MessageStream:

197
docs/message_stream_system.md

@ -1,197 +0,0 @@
# 消息流系统 (Message Stream System)
## 概述
DeepSearcher 的消息流系统是一个新的消息传输机制,用于替代原来的日志传输方式。该系统支持三种类型的消息:`search`、`think` 和 `answer`,并提供了灵活的消息管理和传输功能。
## 消息类型
### 1. Search 消息
- **类型**: `search`
- **用途**: 表示搜索相关的操作和状态
- **示例**:
- "在向量数据库中搜索相关信息..."
- "找到5个相关文档片段"
- "搜索人工智能的定义..."
### 2. Think 消息
- **类型**: `think`
- **用途**: 表示思考和推理过程
- **示例**:
- "开始处理查询: 什么是人工智能?"
- "分析搜索结果..."
- "生成子查询: 人工智能的定义、历史、应用"
### 3. Answer 消息
- **类型**: `answer`
- **用途**: 表示最终答案和结果
- **示例**:
- "==== FINAL ANSWER===="
- "人工智能是计算机科学的一个分支..."
## 核心组件
### MessageStream 类
主要的消息流管理器,提供以下功能:
```python
from deepsearcher.utils.message_stream import MessageStream
# 创建消息流实例
message_stream = MessageStream()
# 发送消息
message_stream.send_search("搜索内容...")
message_stream.send_think("思考内容...")
message_stream.send_answer("答案内容...")
# 获取消息
messages = message_stream.get_messages()
messages_dict = message_stream.get_messages_as_dicts()
messages_json = message_stream.get_messages_as_json()
```
### 全局函数
为了方便使用,提供了全局函数:
```python
from deepsearcher.utils.message_stream import send_search, send_think, send_answer
# 直接发送消息
send_search("搜索内容...")
send_think("思考内容...")
send_answer("答案内容...")
```
## API 接口
### 1. 获取消息
```
GET /messages/
```
返回所有消息的列表:
```json
{
"messages": [
{
"type": "search",
"content": "搜索内容...",
"timestamp": 1755043653.9606102,
"metadata": {}
}
]
}
```
### 2. 清空消息
```
POST /clear-messages/
```
清空所有消息并返回成功状态:
```json
{
"message": "Messages cleared successfully"
}
```
### 3. 查询接口(已更新)
```
GET /query/?original_query=<query>&max_iter=<iterations>
```
现在返回结果包含消息流:
```json
{
"result": "最终答案...",
"messages": [
{
"type": "search",
"content": "搜索内容...",
"timestamp": 1755043653.9606102,
"metadata": {}
}
]
}
```
## 前端集成
前端界面现在包含一个消息流显示区域,实时显示处理过程中的各种消息:
### CSS 样式
- `.message-search`: 搜索消息样式(蓝色边框)
- `.message-think`: 思考消息样式(黄色边框)
- `.message-answer`: 答案消息样式(绿色边框)
### JavaScript 功能
- `displayMessages(messages)`: 显示消息流
- 自动滚动到最新消息
- 时间戳显示
## 使用示例
### 后端使用
```python
from deepsearcher.utils.message_stream import send_search, send_think, send_answer
# 在搜索过程中发送消息
send_think("开始处理查询...")
send_search("在数据库中搜索...")
send_search("找到相关文档...")
send_think("分析结果...")
send_answer("最终答案...")
```
### 前端使用
```javascript
// 获取消息
const response = await fetch('/query/?original_query=test&max_iter=3');
const data = await response.json();
// 显示消息流
if (data.messages && data.messages.length > 0) {
displayMessages(data.messages);
}
```
## 优势
1. **结构化数据**: 消息包含类型、内容、时间戳和元数据
2. **类型安全**: 使用枚举确保消息类型的一致性
3. **灵活传输**: 支持多种输出格式(字典、JSON)
4. **实时显示**: 前端可以实时显示处理过程
5. **易于扩展**: 可以轻松添加新的消息类型和功能
## 迁移说明
从原来的日志系统迁移到新的消息流系统:
### 原来的代码
```python
log.color_print(f"<search> Search [{query}] in [{collection}]... </search>\n")
log.color_print(f"<think> Summarize answer... </think>\n")
log.color_print(f"<answer> Final answer... </answer>\n")
```
### 新的代码
```python
send_search(f"Search [{query}] in [{collection}]...")
send_think("Summarize answer...")
send_answer("Final answer...")
```
## 测试
运行测试脚本验证系统功能:
```bash
python test_message_stream.py
```
这将测试消息流的基本功能,包括消息发送、获取和格式化。

60
main.py

@ -228,7 +228,6 @@ def perform_query(
# 清空之前的消息
message_stream = get_message_stream()
message_stream.clear_messages()
result_text, _ = query(original_query, max_iter)
return {
@ -270,13 +269,13 @@ async def perform_query_stream(
# 清空之前的消息
message_stream = get_message_stream()
message_stream.clear_messages()
# 发送查询开始消息
yield f"data: {json.dumps({'type': 'query_start', 'query': original_query, 'max_iter': max_iter}, ensure_ascii=False)}\n\n"
yield f"data: {json.dumps({'type': 'start', 'content': '开始查询'}, ensure_ascii=False)}\n\n"
# 创建一个线程安全的队列来接收消息
message_queue = queue.Queue()
def message_callback(message):
"""消息回调函数"""
try:
@ -284,26 +283,29 @@ async def perform_query_stream(
message_queue.put(message)
except Exception as e:
print(f"Error in message callback: {e}")
# 注册回调函数
message_stream.add_callback(message_callback)
# 在后台线程中执行查询
def run_query():
try:
print(f"Starting query: {original_query} with max_iter: {max_iter}")
result_text, _ = query(original_query, max_iter)
result_text, retrieval_results = query(original_query, max_iter)
print(f"Query completed with result length: {len(result_text) if result_text else 0}")
print(f"Retrieved {len(retrieval_results) if retrieval_results else 0} documents")
return result_text, None
except Exception as e:
import traceback
print(f"Query failed with error: {e}")
print(f"Traceback: {traceback.format_exc()}")
return None, str(e)
# 使用线程池执行查询
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as executor:
query_future = executor.submit(run_query)
# 监听消息和查询结果
while True:
try:
@ -312,17 +314,17 @@ async def perform_query_stream(
result_text, error = query_future.result()
if error:
print(f"Query error: {error}")
yield f"data: {json.dumps({'type': 'query_error', 'error': error}, ensure_ascii=False)}\n\n"
yield f"data: {json.dumps({'type': 'error', 'content': f'查询失败: {error}'}, ensure_ascii=False)}\n\n"
return
# 成功完成时,发送complete消息并结束
print("Query completed successfully, sending complete message")
yield f"data: {json.dumps({'type': 'complete'}, ensure_ascii=False)}\n\n"
yield f"data: {json.dumps({'type': 'complete', 'content': '查询完成'}, ensure_ascii=False)}\n\n"
print("Complete message sent, ending stream")
return
# 尝试从队列获取消息,设置超时
try:
message = message_queue.get(timeout=0.5)
message = message_queue.get(timeout=2.0)
message_data = {
'type': message.type.value,
'content': message.content,
@ -333,19 +335,19 @@ async def perform_query_stream(
except queue.Empty:
# 超时,继续循环
continue
except Exception as e:
print(f"Error in stream loop: {e}")
yield f"data: {json.dumps({'type': 'stream_error', 'error': str(e)}, ensure_ascii=False)}\n\n"
yield f"data: {json.dumps({'type': 'error', 'content': f'流处理错误: {str(e)}'}, ensure_ascii=False)}\n\n"
return
except Exception as e:
yield f"data: {json.dumps({'type': 'stream_error', 'error': str(e)}, ensure_ascii=False)}\n\n"
yield f"data: {json.dumps({'type': 'error', 'content': f'查询错误: {str(e)}'}, ensure_ascii=False)}\n\n"
finally:
# 清理回调函数
if message_callback:
message_stream.remove_callback(message_callback)
return StreamingResponse(
query_stream_generator(),
media_type="text/event-stream",
@ -379,7 +381,7 @@ def get_messages():
async def stream_messages() -> StreamingResponse:
"""
Stream messages in real-time using Server-Sent Events.
Returns:
StreamingResponse: A streaming response with real-time messages.
"""
@ -387,7 +389,7 @@ async def stream_messages() -> StreamingResponse:
"""生成SSE事件流"""
# 创建一个队列来接收消息
message_queue = asyncio.Queue()
def message_callback(message):
"""消息回调函数"""
try:
@ -395,21 +397,21 @@ async def stream_messages() -> StreamingResponse:
asyncio.create_task(message_queue.put(message))
except Exception as e:
print(f"Error in message callback: {e}")
# 注册回调函数
message_callbacks.append(message_callback)
message_stream = get_message_stream()
message_stream.add_callback(message_callback)
try:
# 发送连接建立消息
yield f"data: {json.dumps({'type': 'connection', 'message': 'Connected to message stream'}, ensure_ascii=False)}\n\n"
while True:
try:
# 等待新消息,设置超时以便能够检查连接状态
message = await asyncio.wait_for(message_queue.get(), timeout=1.0)
message = await asyncio.wait_for(message_queue.get(), timeout=2.0)
# 发送消息数据
message_data = {
'type': message.type.value,
@ -418,11 +420,11 @@ async def stream_messages() -> StreamingResponse:
'metadata': message.metadata or {}
}
yield f"data: {json.dumps(message_data, ensure_ascii=False)}\n\n"
except asyncio.TimeoutError:
# 发送心跳保持连接
yield f"data: {json.dumps({'type': 'heartbeat', 'timestamp': asyncio.get_event_loop().time()}, ensure_ascii=False)}\n\n"
except Exception as e:
print(f"Error in event generator: {e}")
finally:
@ -430,7 +432,7 @@ async def stream_messages() -> StreamingResponse:
if message_callback in message_callbacks:
message_callbacks.remove(message_callback)
message_stream.remove_callback(message_callback)
return StreamingResponse(
event_generator(),
media_type="text/event-stream",

180
simplified_message_format.md

@ -0,0 +1,180 @@
# 简化消息格式
## 概述
为了简化前后端的消息通信,我们将原来的多种消息类型统一为简单的 `type` + `content` 格式。
## 消息格式
所有消息都遵循以下统一格式:
```json
{
"type": "消息类型",
"content": "消息内容",
"timestamp": 时间戳,
"metadata": {}
}
```
## 消息类型
### 1. start - 开始消息
- **用途**: 表示查询或操作的开始
- **示例**:
```json
{
"type": "start",
"content": "开始处理查询: 请写一篇关于Milvus向量数据库的报告"
}
```
### 2. info - 信息消息
- **用途**: 表示处理过程中的信息、状态更新、迭代信息等
- **示例**:
```json
{
"type": "info",
"content": "iteration 1: 正在搜索相关文档..."
}
```
### 3. answer - 答案消息
- **用途**: 表示最终答案或重要结果
- **示例**:
```json
{
"type": "answer",
"content": "Milvus的详细报告: ..."
}
```
### 4. complete - 完成消息
- **用途**: 表示操作完成
- **示例**:
```json
{
"type": "complete",
"content": "查询完成"
}
```
### 5. error - 错误消息
- **用途**: 表示错误信息
- **示例**:
```json
{
"type": "error",
"content": "查询失败: 无法连接到数据库"
}
```
## 使用示例
### 后端使用
```python
from deepsearcher.utils.message_stream import (
send_start, send_info, send_answer, send_complete, send_error
)
# 发送开始消息
send_start("开始处理查询: 请写一篇关于Milvus的报告")
# 发送信息消息
send_info("iteration 1: 正在搜索相关文档...")
send_info("找到5个相关文档片段")
# 发送答案消息
send_answer("Milvus的详细报告: ...")
# 发送完成消息
send_complete("查询完成")
# 发送错误消息
send_error("查询失败: 无法连接到数据库")
```
### 前端处理
```javascript
// 处理消息流
function handleStreamMessage(data) {
const message = JSON.parse(data);
switch (message.type) {
case 'start':
console.log('开始:', message.content);
break;
case 'info':
console.log('信息:', message.content);
break;
case 'answer':
console.log('答案:', message.content);
break;
case 'complete':
console.log('完成:', message.content);
break;
case 'error':
console.error('错误:', message.content);
break;
}
}
```
## 优势
1. **简化格式**: 统一的消息格式,易于理解和处理
2. **类型清晰**: 5种基本类型覆盖所有使用场景
3. **易于扩展**: 可以轻松添加新的消息类型
4. **前端友好**: 前端处理逻辑更加简单
5. **调试方便**: 消息格式清晰,便于调试和日志记录
## 迁移说明
### 从旧格式迁移
| 旧类型 | 新类型 | 说明 |
|--------|--------|------|
| `search` | `info` | 搜索相关信息 |
| `think` | `info` | 思考过程信息 |
| `answer` | `answer` | 保持不变 |
| `complete` | `complete` | 保持不变 |
| `query_start` | `start` | 查询开始 |
| `query_error` | `error` | 查询错误 |
| `stream_error` | `error` | 流错误 |
### 代码更新
所有使用旧消息类型的代码都需要更新:
```python
# 旧代码
send_search("搜索内容...")
send_think("思考内容...")
# 新代码
send_info("搜索内容...")
send_info("思考内容...")
```
## 测试
可以使用以下方式测试消息格式:
```python
from deepsearcher.utils.message_stream import get_message_stream
# 获取消息流实例
message_stream = get_message_stream()
# 获取所有消息
messages = message_stream.get_messages_as_dicts()
# 验证格式
for msg in messages:
assert 'type' in msg
assert 'content' in msg
assert 'timestamp' in msg
print(f"类型: {msg['type']}, 内容: {msg['content']}")
```

38
test_fix.py

@ -0,0 +1,38 @@
#!/usr/bin/env python3
"""
测试修复后的查询功能
"""
import sys
import os
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from deepsearcher.configuration import Configuration, init_config
from deepsearcher.online_query import query
def test_query_fix():
    """Run one single-iteration query and print a short report.

    A default ``Configuration`` is created and passed to ``init_config``;
    ``query`` is then called with a fixed question and ``max_iter=1``.
    The answer length, the number of retrieval results and at most the
    first 200 characters of the answer are printed.  Any exception raised
    while querying is caught and printed together with its traceback
    rather than propagated.
    """
    print("=== 测试修复后的查询功能 ===")

    # Initialise the configuration before issuing the query.
    init_config(Configuration())

    try:
        print("开始查询...")
        answer, hits = query("什么是Milvus?", max_iter=1)
        print(f"查询完成!")

        # Falsy results (None / empty) are reported as length 0.
        answer_len = len(answer) if answer else 0
        print(f"结果长度: {answer_len}")
        hit_count = len(hits) if hits else 0
        print(f"检索结果数量: {hit_count}")

        if answer:
            preview = answer[:200]
            print(f"结果预览: {preview}...")
    except Exception as exc:
        import traceback

        # Report the failure instead of letting the script crash.
        print(f"查询失败: {exc}")
        print(f"错误详情: {traceback.format_exc()}")


if __name__ == "__main__":
    test_query_fix()
Loading…
Cancel
Save