|
|
|
import json
|
|
|
|
import time
|
|
|
|
from typing import Dict, List, Optional, Callable, Any
|
|
|
|
from enum import Enum
|
|
|
|
from dataclasses import dataclass, asdict
|
|
|
|
from datetime import datetime
|
|
|
|
|
|
|
|
|
|
|
|
class MessageType(Enum):
|
|
|
|
"""简化的消息类型枚举"""
|
|
|
|
START = "start" # 开始消息
|
|
|
|
INFO = "info" # 信息消息
|
|
|
|
ANSWER = "answer" # 答案消息
|
|
|
|
COMPLETE = "complete" # 完成消息
|
|
|
|
ERROR = "error" # 错误消息
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
|
|
|
|
class Message:
|
|
|
|
"""简化的消息数据结构"""
|
|
|
|
type: MessageType
|
|
|
|
content: str
|
|
|
|
timestamp: float
|
|
|
|
metadata: Optional[Dict[str, Any]] = None
|
|
|
|
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
|
|
"""转换为字典格式"""
|
|
|
|
return {
|
|
|
|
"type": self.type.value,
|
|
|
|
"content": self.content,
|
|
|
|
"timestamp": self.timestamp,
|
|
|
|
"metadata": self.metadata or {}
|
|
|
|
}
|
|
|
|
|
|
|
|
def to_json(self) -> str:
|
|
|
|
"""转换为JSON字符串"""
|
|
|
|
return json.dumps(self.to_dict(), ensure_ascii=False)
|
|
|
|
|
|
|
|
|
|
|
|
class MessageStream:
|
|
|
|
"""简化的消息流管理器"""
|
|
|
|
|
|
|
|
def __init__(self):
|
|
|
|
self._messages: List[Message] = []
|
|
|
|
self._callbacks: List[Callable[[Message], None]] = []
|
|
|
|
self._enabled = True
|
|
|
|
|
|
|
|
def add_callback(self, callback: Callable[[Message], None]):
|
|
|
|
"""添加消息回调函数"""
|
|
|
|
self._callbacks.append(callback)
|
|
|
|
|
|
|
|
def remove_callback(self, callback: Callable[[Message], None]):
|
|
|
|
"""移除消息回调函数"""
|
|
|
|
if callback in self._callbacks:
|
|
|
|
self._callbacks.remove(callback)
|
|
|
|
|
|
|
|
def enable(self):
|
|
|
|
"""启用消息流"""
|
|
|
|
self._enabled = True
|
|
|
|
|
|
|
|
def disable(self):
|
|
|
|
"""禁用消息流"""
|
|
|
|
self._enabled = False
|
|
|
|
|
|
|
|
def send_message(self, msg_type: MessageType, content: str, metadata: Optional[Dict[str, Any]] = None):
|
|
|
|
"""发送消息"""
|
|
|
|
if not self._enabled:
|
|
|
|
return
|
|
|
|
|
|
|
|
message = Message(
|
|
|
|
type=msg_type,
|
|
|
|
content=content,
|
|
|
|
timestamp=time.time(),
|
|
|
|
metadata=metadata
|
|
|
|
)
|
|
|
|
|
|
|
|
self._messages.append(message)
|
|
|
|
|
|
|
|
# 调用所有回调函数
|
|
|
|
for callback in self._callbacks:
|
|
|
|
try:
|
|
|
|
callback(message)
|
|
|
|
except Exception as e:
|
|
|
|
print(f"Error in message callback: {e}")
|
|
|
|
|
|
|
|
def send_start(self, content: str, metadata: Optional[Dict[str, Any]] = None):
|
|
|
|
"""发送开始消息"""
|
|
|
|
self.send_message(MessageType.START, content, metadata)
|
|
|
|
|
|
|
|
def send_info(self, content: str, metadata: Optional[Dict[str, Any]] = None):
|
|
|
|
"""发送信息消息"""
|
|
|
|
self.send_message(MessageType.INFO, content, metadata)
|
|
|
|
|
|
|
|
def send_answer(self, content: str, metadata: Optional[Dict[str, Any]] = None):
|
|
|
|
"""发送答案消息"""
|
|
|
|
self.send_message(MessageType.ANSWER, content, metadata)
|
|
|
|
|
|
|
|
def send_complete(self, content: str = "查询完成", metadata: Optional[Dict[str, Any]] = None):
|
|
|
|
"""发送完成消息"""
|
|
|
|
self.send_message(MessageType.COMPLETE, content, metadata)
|
|
|
|
|
|
|
|
def send_error(self, content: str, metadata: Optional[Dict[str, Any]] = None):
|
|
|
|
"""发送错误消息"""
|
|
|
|
self.send_message(MessageType.ERROR, content, metadata)
|
|
|
|
|
|
|
|
def get_messages(self) -> List[Message]:
|
|
|
|
"""获取所有消息"""
|
|
|
|
return self._messages.copy()
|
|
|
|
|
|
|
|
def get_messages_by_type(self, msg_type: MessageType) -> List[Message]:
|
|
|
|
"""根据类型获取消息"""
|
|
|
|
return [msg for msg in self._messages if msg.type == msg_type]
|
|
|
|
|
|
|
|
def clear_messages(self):
|
|
|
|
"""清空所有消息"""
|
|
|
|
self._messages.clear()
|
|
|
|
|
|
|
|
def get_messages_as_dicts(self) -> List[Dict[str, Any]]:
|
|
|
|
"""获取所有消息的字典格式"""
|
|
|
|
return [msg.to_dict() for msg in self._messages]
|
|
|
|
|
|
|
|
def get_messages_as_json(self) -> str:
|
|
|
|
"""获取所有消息的JSON格式"""
|
|
|
|
return json.dumps(self.get_messages_as_dicts(), ensure_ascii=False)
|
|
|
|
|
|
|
|
|
|
|
|
# 全局消息流实例
|
|
|
|
message_stream = MessageStream()
|
|
|
|
|
|
|
|
|
|
|
|
def send_start(content: str, metadata: Optional[Dict[str, Any]] = None):
|
|
|
|
"""全局开始消息发送函数"""
|
|
|
|
message_stream.send_start(content, metadata)
|
|
|
|
|
|
|
|
|
|
|
|
def send_info(content: str, metadata: Optional[Dict[str, Any]] = None):
|
|
|
|
"""全局信息消息发送函数"""
|
|
|
|
message_stream.send_info(content, metadata)
|
|
|
|
|
|
|
|
|
|
|
|
def send_answer(content: str, metadata: Optional[Dict[str, Any]] = None):
|
|
|
|
"""全局答案消息发送函数"""
|
|
|
|
message_stream.send_answer(content, metadata)
|
|
|
|
|
|
|
|
|
|
|
|
def send_complete(content: str = "查询完成", metadata: Optional[Dict[str, Any]] = None):
|
|
|
|
"""全局完成消息发送函数"""
|
|
|
|
message_stream.send_complete(content, metadata)
|
|
|
|
|
|
|
|
|
|
|
|
def send_error(content: str, metadata: Optional[Dict[str, Any]] = None):
|
|
|
|
"""全局错误消息发送函数"""
|
|
|
|
message_stream.send_error(content, metadata)
|
|
|
|
|
|
|
|
|
|
|
|
def get_message_stream() -> MessageStream:
|
|
|
|
"""获取全局消息流实例"""
|
|
|
|
return message_stream
|