You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

431 lines
18 KiB

from deepsearcher.agent.base import BaseAgent, describe_class
2 weeks ago
from deepsearcher.embedding.base import BaseEmbedding
from deepsearcher.llm.base import BaseLLM
from deepsearcher.utils import log
from deepsearcher.vector_db import RetrievalResult
from deepsearcher.vector_db.base import BaseVectorDB, deduplicate
from collections import defaultdict
2 weeks ago
7 days ago
COLLECTION_ROUTE_PROMPT = """
我现在给你提供collection_name(s)和对应的collection_description(s).
请你选择可能与问题相关的合集名称并返回一个字符串列表
如果没有相关合集请返回一个空列表
"问题": {query}
"合集信息": {collection_info}
使用的语言与问题相同
你需要返回的格式是 a python list of str without any addtional content:
"""
7 days ago
SUB_QUERY_PROMPT = """
为了能够全面的回答这个问题请你尝试把原本的问题拆分或扩展为多个子问题
如果原问题本身非常简单没有必要进行拆分则保留输出原问题本身
需要保证每个子问题都具体清晰不可分原子性最终返回一个字符串列表
2 weeks ago
原问题: {original_query}
2 weeks ago
<EXAMPLE>
示例输入:
"请解释机器学习"
2 weeks ago
示例输出例子中的数量不是要求:
2 weeks ago
[
"什么是机器学习?",
"机器学习的使用目的是什么?",
"机器学习和深度学习的区别是什么?",
"机器学习的历史演进过程?"
2 weeks ago
]
2 weeks ago
</EXAMPLE>
使用的语言与原问题相同
你需要返回的是 a python list of str without any addtional content:
2 weeks ago
"""
7 days ago
RERANK_PROMPT = """
根据当前的问题和获取到的文档片段
请你对当前获取到的文档是否能帮助回答这个问题直接或间接全面或部分都可以给出一个快速判断
对于每一个文档片段你只应该返回"YES"或者"NO"需要注意顺序和数量
2 weeks ago
问题: {query}
2 weeks ago
检索到的文档片段:
{chunks}
例如给定4个chunks实际检索到的文档片段不一定是4个返回: ["YES", "NO", "YES", "YES"]
使用的语言与问题相同
你需要返回的是 a python list of str without any addtional content:
"""
2 weeks ago
REFLECT_PROMPT = """
根据原问题和子问题以及获取到的文档片段请你决定是否要生成更多的问题
如果已经获得的文档片段没能覆盖所有的子问题这意味着这些文档无法被检索到
你可以尝试生成相似但些许不同的问题来尝试重新检索但是也可以根据获得到的文档片段进行批评思考生成新的问题来保证原问题的回答的准确和全面
如果没有真的必要继续研究取决于你的判断返回一个空列表
2 weeks ago
原问题: {original_query}
2 weeks ago
子问题: {all_sub_queries}
2 weeks ago
检索到的文档片段:
{chunks}
2 weeks ago
使用的语言与原问题相同
你需要返回的是 a python list of str without any addtional content:
"""
2 weeks ago
SUMMARY_PROMPT = """
你是一个内容分析专家请你根据提供的问题和检索到的信息生成详尽的长文回答
如果检索到的信息不足以回答问题或者必须添加额外信息才能能回答你应该使用你的知识来进行补充
这种情况下你自己提供的信息需要使用"<unreferenced>your knowledge here</unreferenced>"括起来
添加多个<unreferenced></unreferenced>块时不需要序号并且<unreferenced></unreferenced>块不应该被单独分到一个段落中而是放到回答的各处
同时你应该根据提供的信息生成文内引用和文末参考资料列表
如果多个片段是相同的来源或者一个片段可以回答多个问题文内引用可以引用多次但文末只引用一次来源即文末的引用列表中不能有重复的来源
例子:
<EXAMPLE>
文内引用示例使用markdown和html的混合语法格式必须和例子一致:
"XGBoost是非常强大的集成学习模型[<sup>[2]</sup>](#2)"
(必须使用 "[<sup>[index]</sup>](#index)" 而不是 "[index]"注意不要遗漏"(#index)"这里的index是chunk对应的reference的id)
文末引用示例 (需要href与前文参考中的一致不需要对每个chunk分配一个引用而是每一个referecen共用一个引用):
<div id="2"><a href="files/docs/chap_001_003_models.md" style="text-decoration: none;>[2] chap_001_models.md</a></div>
</EXAMPLE>
2 weeks ago
原问题: {original_query}
2 weeks ago
子问题: {all_sub_queries}
2 weeks ago
检索到的文档片段:
{chunks}
2 weeks ago
注意你需要使用与原始问题的相同的语言来回答
2 weeks ago
"""
@describe_class(
"This agent is suitable for handling general and simple queries, such as given a topic and then writing a report, survey, or article."
)
class DeepSearch(BaseAgent):
2 weeks ago
"""
Deep Search agent implementation for comprehensive information retrieval.
This agent performs a thorough search through the knowledge base, analyzing
multiple aspects of the query to provide comprehensive and detailed answers.
"""
def __init__(
self,
llm: BaseLLM,
embedding_model: BaseEmbedding,
vector_db: BaseVectorDB,
max_iter: int,
7 days ago
route_collection: bool = False,
2 weeks ago
text_window_splitter: bool = True,
**kwargs,
):
"""
Initialize the DeepSearch agent.
Args:
llm: The language model to use for generating answers.
embedding_model: The embedding model to use for query embedding.
vector_db: The vector database to search for relevant documents.
max_iter: The maximum number of iterations for the search process.
route_collection: Whether to use a collection router for search.
text_window_splitter: Whether to use text_window splitter.
**kwargs: Additional keyword arguments for customization.
"""
self.llm = llm
self.embedding_model = embedding_model
self.vector_db = vector_db
self.max_iter = max_iter
self.route_collection = route_collection
self.all_collections = [
collection_info.collection_name
for collection_info in self.vector_db.list_collections(dim=embedding_model.dimension)
]
2 weeks ago
self.text_window_splitter = text_window_splitter
def invoke(self, query: str, dim: int, **kwargs) -> list[str]:
"""
Determine which collections are relevant for the given query.
This method analyzes the query content and selects collections that are
most likely to contain information relevant to answering the query.
Args:
query (str): The query to analyze.
dim (int): The dimension of the vector space to search in.
Returns:
List[str]: A list of selected collection names
"""
collection_infos = self.vector_db.list_collections(dim=dim)
if len(collection_infos) == 0:
log.color_print(
"No collection found in the vector database!"
)
return []
if len(collection_infos) == 1:
the_only_collection = collection_infos[0].collection_name
log.color_print(
f"<think> Perform search [{query}] on the vector DB collection: {the_only_collection} </think>\n"
)
return [the_only_collection]
vector_db_search_prompt = COLLECTION_ROUTE_PROMPT.format(
query=query,
collection_info=[
{
"collection_name": collection_info.collection_name,
"collection_description": collection_info.description,
}
for collection_info in collection_infos
],
)
response = self.llm.chat(
messages=[{"role": "user", "content": vector_db_search_prompt}]
)
selected_collections = self.llm.literal_eval(response)
for collection_info in collection_infos:
# If a collection description is not provided, use the query as the search query
if not collection_info.description:
selected_collections.append(collection_info.collection_name)
# If the default collection exists, use the query as the search query
if self.vector_db.default_collection == collection_info.collection_name:
selected_collections.append(collection_info.collection_name)
selected_collections = list(set(selected_collections))
log.color_print(
f"<think> Perform search [{query}] on the vector DB collections: {selected_collections} </think>\n"
)
return selected_collections
def _generate_sub_queries(self, original_query: str) -> tuple[list[str], int]:
content = self.llm.chat(
2 weeks ago
messages=[
{"role": "user", "content": SUB_QUERY_PROMPT.format(original_query=original_query)}
]
)
content = self.llm.remove_think(content)
return self.llm.literal_eval(content)
2 weeks ago
def _search_chunks_from_vectordb(self, query: str):
2 weeks ago
if self.route_collection:
selected_collections = self.invoke(
2 weeks ago
query=query, dim=self.embedding_model.dimension
)
else:
selected_collections = self.all_collections
2 weeks ago
all_retrieved_results = []
query_vector = self.embedding_model.embed_query(query)
for collection in selected_collections:
log.color_print(f"<search> Search [{query}] in [{collection}]... </search>\n")
retrieved_results = self.vector_db.search_data(
collection=collection, vector=query_vector, query_text=query
)
if not retrieved_results or len(retrieved_results) == 0:
log.color_print(
f"<search> No relevant document chunks found in '{collection}'! </search>\n"
)
continue
# Format all chunks for batch processing
chunks = self._format_chunks(retrieved_results)
# Batch process all chunks with a single LLM call
content = self.llm.chat(
messages=[
{
"role": "user",
"content": RERANK_PROMPT.format(
query=query,
chunks=chunks,
),
}
]
)
content = self.llm.remove_think(content).strip()
# Parse the response to determine which chunks are relevant
try:
relevance_list = self.llm.literal_eval(content)
if not isinstance(relevance_list, list):
raise ValueError("Response is not a list")
except (ValueError, SyntaxError):
# Fallback: if parsing fails, treat all chunks as relevant
log.color_print(f"Warning: Failed to parse relevance response. Treating all chunks as relevant. Response was: {content}")
relevance_list = ["YES"] * len(retrieved_results)
# Ensure we have enough relevance judgments for all chunks
while len(relevance_list) < len(retrieved_results):
relevance_list.append("YES") # Default to relevant if no judgment provided
# Filter relevant chunks based on LLM response
2 weeks ago
accepted_chunk_num = 0
references = set()
for i, retrieved_result in enumerate(retrieved_results):
# Check if we have a relevance judgment for this chunk
is_relevant = (
i < len(relevance_list) and
"YES" in relevance_list[i].upper() and
"NO" not in relevance_list[i].upper()) if i < len(relevance_list
) else True
if is_relevant:
2 weeks ago
all_retrieved_results.append(retrieved_result)
accepted_chunk_num += 1
references.add(retrieved_result.reference)
2 weeks ago
if accepted_chunk_num > 0:
log.color_print(
f"<search> Accept {accepted_chunk_num} document chunk(s) from references: {list(references)} </search>\n"
)
else:
log.color_print(
f"<search> No document chunk accepted from '{collection}'! </search>\n"
)
return all_retrieved_results
2 weeks ago
def _generate_more_sub_queries(
self, original_query: str, all_sub_queries: list[str], all_retrieved_results: list[RetrievalResult]
) -> list[str]:
chunks = self._format_chunks(all_retrieved_results)
2 weeks ago
reflect_prompt = REFLECT_PROMPT.format(
original_query=original_query,
all_sub_queries=all_sub_queries,
chunks=chunks
if len(all_retrieved_results) > 0
2 weeks ago
else "NO RELATED CHUNKS FOUND.",
)
response = self.llm.chat([{"role": "user", "content": reflect_prompt}])
response = self.llm.remove_think(response)
return self.llm.literal_eval(response)
2 weeks ago
def retrieve(self, original_query: str, **kwargs) -> tuple[list[RetrievalResult], list[str]]:
2 weeks ago
"""
Retrieve relevant documents from the knowledge base for the given query.
This method performs a deep search through the vector database to find
the most relevant documents for answering the query.
Args:
original_query (str): The query to search for.
**kwargs: Additional keyword arguments for customizing the retrieval.
Returns:
Tuple[List[RetrievalResult], int, dict]: A tuple containing:
- A list of retrieved document results
- Additional information about the retrieval process
"""
max_iter = kwargs.pop("max_iter", self.max_iter)
### SUB QUERIES ###
log.color_print(f"<query> {original_query} </query>\n")
all_search_results = []
2 weeks ago
all_sub_queries = []
sub_queries = self._generate_sub_queries(original_query)
2 weeks ago
if not sub_queries:
log.color_print("No sub queries were generated by the LLM. Exiting.")
return [], {}
2 weeks ago
else:
log.color_print(f"</think> Break down the original query into new sub queries: {sub_queries} ")
2 weeks ago
all_sub_queries.extend(sub_queries)
for it in range(max_iter):
log.color_print(f">> Iteration: {it + 1}\n")
# Execute all search tasks sequentially
for query in sub_queries:
result = self._search_chunks_from_vectordb(query)
all_search_results.extend(result)
undeduped_len = len(all_search_results)
all_search_results = deduplicate(all_search_results)
deduped_len = len(all_search_results)
if undeduped_len - deduped_len != 0:
log.color_print(
f"<search> Remove {undeduped_len - deduped_len} duplicates </search> "
)
2 weeks ago
# search_res_from_internet = deduplicate_results(search_res_from_internet)
# all_search_res.extend(search_res_from_vectordb + search_res_from_internet)
if it + 1 >= max_iter:
log.color_print("</think> Exceeded maximum iterations. Exiting. ")
2 weeks ago
break
### REFLECTION & GET MORE SUB QUERIES ###
log.color_print("</think> Reflecting on the search results... ")
sub_queries = self._generate_more_sub_queries(
original_query, all_sub_queries, all_search_results
2 weeks ago
)
if not sub_queries or len(sub_queries) == 0:
log.color_print("</think> No new search queries were generated. Exiting. ")
2 weeks ago
break
else:
log.color_print(
f"</think> New search queries for next iteration: {sub_queries} ")
all_sub_queries.extend(sub_queries)
2 weeks ago
all_search_results = deduplicate(all_search_results)
return all_search_results, all_sub_queries
2 weeks ago
def query(self, original_query: str, **kwargs) -> tuple[str, list[RetrievalResult]]:
2 weeks ago
"""
Query the agent and generate an answer based on retrieved documents.
This method retrieves relevant documents and uses the language model
to generate a comprehensive answer to the query.
Args:
query (str): The query to answer.
**kwargs: Additional keyword arguments for customizing the query process.
Returns:
Tuple[str, List[RetrievalResult], int]: A tuple containing:
- The generated answer
- A list of retrieved document results
"""
all_retrieved_results, all_sub_queries = self.retrieve(original_query, **kwargs)
2 weeks ago
if not all_retrieved_results or len(all_retrieved_results) == 0:
log.color_print(f"No relevant information found for query '{original_query}'.")
return "", []
chunks = self._format_chunks(all_retrieved_results)
2 weeks ago
log.color_print(
f"<think> Summarize answer from all {len(all_retrieved_results)} retrieved chunks... </think>\n"
)
summary_prompt = SUMMARY_PROMPT.format(
original_query=original_query,
all_sub_queries=all_sub_queries,
chunks=chunks
2 weeks ago
)
response = self.llm.chat([{"role": "user", "content": summary_prompt}])
2 weeks ago
log.color_print("\n==== FINAL ANSWER====\n")
log.color_print(self.llm.remove_think(response))
return self.llm.remove_think(response), all_retrieved_results
def _format_chunks(self, retrieved_results: list[RetrievalResult]):
# 以referecen为key,把chunk放到字典中
references = defaultdict(list)
for result in retrieved_results:
references[result.reference].append(result.text)
chunks = []
chunk_count = 0
for i, reference in enumerate(references):
formated = f"<reference id='{i + 1}' href='{reference}'>\n" + "".join(
[
f"<chunk id='{j + 1 + chunk_count}'>\n{chunk}\n</chunk>\n"
for j, chunk in enumerate(references[reference])
]
) + "</reference>\n"
chunks.append(formated)
chunk_count += len(references[reference])
return "".join(chunks)