You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

4.4 KiB

消息流系统 (Message Stream System)

概述

DeepSearcher 的消息流系统是一个新的消息传输机制,用于替代原来的日志传输方式。该系统支持三种类型的消息:searchthinkanswer,并提供了灵活的消息管理和传输功能。

消息类型

1. Search 消息

  • 类型: search
  • 用途: 表示搜索相关的操作和状态
  • 示例:
    • "在向量数据库中搜索相关信息..."
    • "找到5个相关文档片段"
    • "搜索人工智能的定义..."

2. Think 消息

  • 类型: think
  • 用途: 表示思考和推理过程
  • 示例:
    • "开始处理查询: 什么是人工智能?"
    • "分析搜索结果..."
    • "生成子查询: 人工智能的定义、历史、应用"

3. Answer 消息

  • 类型: answer
  • 用途: 表示最终答案和结果
  • 示例:
    • "==== FINAL ANSWER===="
    • "人工智能是计算机科学的一个分支..."

核心组件

MessageStream 类

主要的消息流管理器,提供以下功能:

from deepsearcher.utils.message_stream import MessageStream

# 创建消息流实例
message_stream = MessageStream()

# 发送消息
message_stream.send_search("搜索内容...")
message_stream.send_think("思考内容...")
message_stream.send_answer("答案内容...")

# 获取消息
messages = message_stream.get_messages()
messages_dict = message_stream.get_messages_as_dicts()
messages_json = message_stream.get_messages_as_json()

全局函数

为了方便使用,提供了全局函数:

from deepsearcher.utils.message_stream import send_search, send_think, send_answer

# 直接发送消息
send_search("搜索内容...")
send_think("思考内容...")
send_answer("答案内容...")

API 接口

1. 获取消息

GET /messages/

返回所有消息的列表:

{
  "messages": [
    {
      "type": "search",
      "content": "搜索内容...",
      "timestamp": 1755043653.9606102,
      "metadata": {}
    }
  ]
}

2. 清空消息

POST /clear-messages/

清空所有消息并返回成功状态:

{
  "message": "Messages cleared successfully"
}

3. 查询接口(已更新)

GET /query/?original_query=<query>&max_iter=<iterations>

现在返回结果包含消息流:

{
  "result": "最终答案...",
  "messages": [
    {
      "type": "search",
      "content": "搜索内容...",
      "timestamp": 1755043653.9606102,
      "metadata": {}
    }
  ]
}

前端集成

前端界面现在包含一个消息流显示区域,实时显示处理过程中的各种消息:

CSS 样式

  • .message-search: 搜索消息样式(蓝色边框)
  • .message-think: 思考消息样式(黄色边框)
  • .message-answer: 答案消息样式(绿色边框)

JavaScript 功能

  • displayMessages(messages): 显示消息流
  • 自动滚动到最新消息
  • 时间戳显示

使用示例

后端使用

from deepsearcher.utils.message_stream import send_search, send_think, send_answer

# 在搜索过程中发送消息
send_think("开始处理查询...")
send_search("在数据库中搜索...")
send_search("找到相关文档...")
send_think("分析结果...")
send_answer("最终答案...")

前端使用

// 获取消息
const response = await fetch('/query/?original_query=test&max_iter=3');
const data = await response.json();

// 显示消息流
if (data.messages && data.messages.length > 0) {
    displayMessages(data.messages);
}

优势

  1. 结构化数据: 消息包含类型、内容、时间戳和元数据
  2. 类型安全: 使用枚举确保消息类型的一致性
  3. 灵活传输: 支持多种输出格式(字典、JSON)
  4. 实时显示: 前端可以实时显示处理过程
  5. 易于扩展: 可以轻松添加新的消息类型和功能

迁移说明

从原来的日志系统迁移到新的消息流系统:

原来的代码

log.color_print(f"<search> Search [{query}] in [{collection}]... </search>\n")
log.color_print(f"<think> Summarize answer... </think>\n")
log.color_print(f"<answer> Final answer... </answer>\n")

新的代码

send_search(f"Search [{query}] in [{collection}]...")
send_think("Summarize answer...")
send_answer("Final answer...")

测试

运行测试脚本验证系统功能:

python test_message_stream.py

这将测试消息流的基本功能,包括消息发送、获取和格式化。