from deepsearcher.agent.base import BaseAgent, describe_class
from deepsearcher.embedding.base import BaseEmbedding
from deepsearcher.llm.base import BaseLLM
from deepsearcher.utils import log
from deepsearcher.vector_db import RetrievalResult
from deepsearcher.vector_db.base import BaseVectorDB, deduplicate
COLLECTION_ROUTE_PROMPT = """
I provide you with collection_name(s) and corresponding collection_description(s).
Please select the collection names that may be related to the question and return a python list of str.
If there is no collection related to the question, you can return an empty list.
"QUESTION": {question}
"COLLECTION_INFO": {collection_info}
When you return, you can ONLY return a JSON-convertible Python list of str, WITHOUT any other additional content.
Your selected collection name list is:
"""
SUB_QUERY_PROMPT = """
To answer this question more comprehensively, please break down the original question into a few sub-questions (more if necessary).
If this is a very simple question and no decomposition is necessary, then keep only the original question.
Make sure each sub-question is clear, concise and atomic.
Return a Python-style list of str that is JSON-convertible.
Original Question: {original_query}
<EXAMPLE>
Example input:
"Explain deep learning"
Example output:
[
"What is deep learning?",
"What is the difference between deep learning and machine learning?",
"What is the history of deep learning?"
]
</EXAMPLE>
Provide your response in a python code list of str format:
"""
RERANK_PROMPT = """
Based on the query questions and the retrieved chunks, determine whether each chunk is helpful in answering any of the query questions.
For each chunk, you must return "YES" or "NO" without any other information.
Query Questions: {query}
Retrieved Chunks:
{retrieved_chunks}
Respond with a list of "YES" or "NO" values, one for each chunk, in the same order as the chunks are listed.
For example, if there is a list of four chunks, the answer could be: ["YES", "NO", "YES", "YES"]
"""
REFLECT_PROMPT = """
Determine whether additional search queries are needed based on the original query, previous sub queries, and all retrieved document chunks.
If the returned chunks do not cover all of the previous sub-queries, it means no related documents could be retrieved for some of them.
In that case, try to generate queries that are similar to, but slightly different from, those previous sub-queries.
If further research is needed based on the new information provided by those chunks, generate additional queries that build on it.
(This is preferred, even when the previous sub-queries are already well answered by the retrieved chunks, but ultimately use your own judgment.)
If no further research is needed, return an empty list.
Original Query: {question}
Previous Sub Queries: {mini_questions}
Related Chunks:
{mini_chunk_str}
Respond exclusively in valid List of str format without any other text."""
SUMMARY_PROMPT = """
You are an AI content analysis expert.
Please generate a long, specific and detailed answer or report based on the previous queries and the retrieved document chunks.
If the chunks are not enough to answer the query or additional information is needed to enhance the content, you should answer with your own knowledge.
In this case, mark the part(s) generated from your own knowledge with <unref>your knowledge here</unref>.
(Do not place the <unref></unref> part(s) in a standalone paragraph; insert them at the proper places within the report.)
Original Query: {question}
Previous Sub Queries: {mini_questions}
Related Chunks:
{mini_chunk_str}
"""
@describe_class(
"This agent is suitable for handling general and simple queries, such as given a topic and then writing a report, survey, or article."
)
class DeepSearch(BaseAgent):
"""
Deep Search agent implementation for comprehensive information retrieval.
This agent performs a thorough search through the knowledge base, analyzing
multiple aspects of the query to provide comprehensive and detailed answers.
"""
def __init__(
self,
llm: BaseLLM,
embedding_model: BaseEmbedding,
vector_db: BaseVectorDB,
max_iter: int = 3,
route_collection: bool = True,
text_window_splitter: bool = True,
**kwargs,
):
"""
Initialize the DeepSearch agent.
Args:
llm: The language model to use for generating answers.
embedding_model: The embedding model to use for query embedding.
vector_db: The vector database to search for relevant documents.
max_iter: The maximum number of iterations for the search process.
route_collection: Whether to use a collection router for search.
text_window_splitter: Whether to use the text window splitter (prefer the wider_text chunk metadata when available).
**kwargs: Additional keyword arguments for customization.
"""
self.llm = llm
self.embedding_model = embedding_model
self.vector_db = vector_db
self.max_iter = max_iter
self.route_collection = route_collection
self.all_collections = [
collection_info.collection_name
for collection_info in self.vector_db.list_collections(dim=embedding_model.dimension)
]
self.text_window_splitter = text_window_splitter
def invoke(self, query: str, dim: int, **kwargs) -> list[str]:
"""
Determine which collections are relevant for the given query.
This method analyzes the query content and selects collections that are
most likely to contain information relevant to answering the query.
Args:
query (str): The query to analyze.
dim (int): The dimension of the vector space to search in.
Returns:
List[str]: A list of selected collection names
"""
collection_infos = self.vector_db.list_collections(dim=dim)
if len(collection_infos) == 0:
log.color_print(
"No collections found in the vector database. Please check the database connection."
)
return []
if len(collection_infos) == 1:
the_only_collection = collection_infos[0].collection_name
log.color_print(
f"<think> Perform search [{query}] on the vector DB collection: {the_only_collection} </think>\n"
)
return [the_only_collection]
vector_db_search_prompt = COLLECTION_ROUTE_PROMPT.format(
question=query,
collection_info=[
{
"collection_name": collection_info.collection_name,
"collection_description": collection_info.description,
}
for collection_info in collection_infos
],
)
response = self.llm.chat(
messages=[{"role": "user", "content": vector_db_search_prompt}]
)
selected_collections = self.llm.literal_eval(response)
for collection_info in collection_infos:
# If a collection has no description, always include it in the search
if not collection_info.description:
selected_collections.append(collection_info.collection_name)
# Always include the default collection, if one is configured
if self.vector_db.default_collection == collection_info.collection_name:
selected_collections.append(collection_info.collection_name)
selected_collections = list(set(selected_collections))
log.color_print(
f"<think> Perform search [{query}] on the vector DB collections: {selected_collections} </think>\n"
)
return selected_collections
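# Usage sketch (hypothetical values): when route_collection is enabled, collection routing
# can also be exercised directly on an existing agent instance, e.g.:
#
#   selected = agent.invoke(
#       query="What is deep learning?",
#       dim=agent.embedding_model.dimension,
#   )
#   # -> e.g. ["deep_learning_papers"]; the result depends on your collections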
def _generate_sub_queries(self, original_query: str) -> list[str]:
content = self.llm.chat(
messages=[
{"role": "user", "content": SUB_QUERY_PROMPT.format(original_query=original_query)}
]
)
content = self.llm.remove_think(content)
return self.llm.literal_eval(content)
def _search_chunks_from_vectordb(self, query: str):
if self.route_collection:
selected_collections = self.invoke(
query=query, dim=self.embedding_model.dimension
)
else:
selected_collections = self.all_collections
all_retrieved_results = []
query_vector = self.embedding_model.embed_query(query)
for collection in selected_collections:
log.color_print(f"<search> Search [{query}] in [{collection}]... </search>\n")
retrieved_results = self.vector_db.search_data(
collection=collection, vector=query_vector, query_text=query
)
if not retrieved_results or len(retrieved_results) == 0:
log.color_print(
f"<search> No relevant document chunks found in '{collection}'! </search>\n"
)
continue
# Format all chunks for batch processing
formatted_chunks = ""
for i, retrieved_result in enumerate(retrieved_results):
formatted_chunks += f"<chunk_{i + 1}>\n{retrieved_result.text}\n</chunk_{i + 1}>\n"
# Batch process all chunks with a single LLM call
content = self.llm.chat(
messages=[
{
"role": "user",
"content": RERANK_PROMPT.format(
query=query,
retrieved_chunks=formatted_chunks,
),
}
]
)
content = self.llm.remove_think(content).strip()
# Parse the response to determine which chunks are relevant
try:
relevance_list = self.llm.literal_eval(content)
if not isinstance(relevance_list, list):
raise ValueError("Response is not a list")
except (ValueError, SyntaxError):
# Fallback: if parsing fails, treat all chunks as relevant
log.color_print(f"Warning: Failed to parse relevance response. Treating all chunks as relevant. Response was: {content}")
relevance_list = ["YES"] * len(retrieved_results)
# Ensure we have enough relevance judgments for all chunks
while len(relevance_list) < len(retrieved_results):
relevance_list.append("YES") # Default to relevant if no judgment provided
# Filter relevant chunks based on LLM response
accepted_chunk_num = 0
references = set()
for i, retrieved_result in enumerate(retrieved_results):
# Check if we have a relevance judgment for this chunk
is_relevant = (
    "YES" in relevance_list[i].upper() and "NO" not in relevance_list[i].upper()
    if i < len(relevance_list)
    else True  # default to relevant when no judgment was returned for this chunk
)
if is_relevant:
all_retrieved_results.append(retrieved_result)
accepted_chunk_num += 1
references.add(retrieved_result.reference)
if accepted_chunk_num > 0:
log.color_print(
f"<search> Accept {accepted_chunk_num} document chunk(s) from references: {list(references)} </search>\n"
)
else:
log.color_print(
f"<search> No document chunk accepted from '{collection}'! </search>\n"
)
return all_retrieved_results
def _generate_gap_queries(
self, original_query: str, all_sub_queries: list[str], all_chunks: list[RetrievalResult]
) -> list[str]:
reflect_prompt = REFLECT_PROMPT.format(
question=original_query,
mini_questions=all_sub_queries,
mini_chunk_str=self._format_chunk_texts([chunk.text for chunk in all_chunks])
if len(all_chunks) > 0
else "NO RELATED CHUNKS FOUND.",
)
response = self.llm.chat([{"role": "user", "content": reflect_prompt}])
response = self.llm.remove_think(response)
return self.llm.literal_eval(response)
def retrieve(self, original_query: str, **kwargs) -> tuple[list[RetrievalResult], dict]:
"""
Retrieve relevant documents from the knowledge base for the given query.
This method performs a deep search through the vector database to find
the most relevant documents for answering the query.
Args:
original_query (str): The query to search for.
**kwargs: Additional keyword arguments for customizing the retrieval.
Returns:
Tuple[List[RetrievalResult], dict]: A tuple containing:
- A list of retrieved document results
- Additional information about the retrieval process
"""
max_iter = kwargs.pop("max_iter", self.max_iter)
### SUB QUERIES ###
log.color_print(f"<query> {original_query} </query>\n")
all_search_res = []
all_sub_queries = []
sub_queries = self._generate_sub_queries(original_query)
if not sub_queries:
log.color_print("No sub queries were generated by the LLM. Exiting.")
return [], {}
else:
log.color_print(
f"</think> Break down the original query into new sub queries: {sub_queries} ")
all_sub_queries.extend(sub_queries)
sub_gap_queries = sub_queries
for iteration in range(max_iter):
log.color_print(f">> Iteration: {iteration + 1}\n")
search_res_from_vectordb = []
search_res_from_internet = [] # TODO
# Execute all search tasks sequentially
for query in sub_gap_queries:
result = self._search_chunks_from_vectordb(query)
search_res_from_vectordb.extend(result)
search_res_from_vectordb = deduplicate(search_res_from_vectordb)
# search_res_from_internet = deduplicate_results(search_res_from_internet)
all_search_res.extend(search_res_from_vectordb + search_res_from_internet)
if iteration == max_iter - 1:
log.color_print("</think> Reached the maximum number of iterations. Exiting. ")
break
### REFLECTION & GET GAP QUERIES ###
log.color_print("</think> Reflecting on the search results... ")
sub_gap_queries = self._generate_gap_queries(
original_query, all_sub_queries, all_search_res
)
if not sub_gap_queries or len(sub_gap_queries) == 0:
log.color_print("</think> No new search queries were generated. Exiting. ")
break
else:
log.color_print(
f"</think> New search queries for next iteration: {sub_gap_queries} ")
all_sub_queries.extend(sub_gap_queries)
all_search_res = deduplicate(all_search_res)
additional_info = {"all_sub_queries": all_sub_queries}
return all_search_res, additional_info
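# Usage sketch (hypothetical values): retrieve() can be used on its own when only the
# supporting chunks and sub-queries are needed, without the final summarization step:
#
#   results, info = agent.retrieve("How does deep learning work?", max_iter=2)
#   print(len(results), info.get("all_sub_queries"))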
def query(self, query: str, **kwargs) -> tuple[str, list[RetrievalResult]]:
"""
Query the agent and generate an answer based on retrieved documents.
This method retrieves relevant documents and uses the language model
to generate a comprehensive answer to the query.
Args:
query (str): The query to answer.
**kwargs: Additional keyword arguments for customizing the query process.
Returns:
Tuple[str, List[RetrievalResult]]: A tuple containing:
- The generated answer
- A list of retrieved document results
"""
all_retrieved_results, additional_info = self.retrieve(query, **kwargs)
if not all_retrieved_results or len(all_retrieved_results) == 0:
return f"No relevant information found for query '{query}'.", []
all_sub_queries = additional_info["all_sub_queries"]
chunk_texts = []
for chunk in all_retrieved_results:
if self.text_window_splitter and "wider_text" in chunk.metadata:
chunk_texts.append(chunk.metadata["wider_text"])
else:
chunk_texts.append(chunk.text)
log.color_print(
f"<think> Summarize answer from all {len(all_retrieved_results)} retrieved chunks... </think>\n"
)
summary_prompt = SUMMARY_PROMPT.format(
question=query,
mini_questions=all_sub_queries,
mini_chunk_str=self._format_chunk_texts(chunk_texts),
)
response = self.llm.chat([{"role": "user", "content": summary_prompt}])
log.color_print("\n==== FINAL ANSWER====\n")
log.color_print(self.llm.remove_think(response))
return self.llm.remove_think(response), all_retrieved_results
def _format_chunk_texts(self, chunk_texts: list[str]) -> str:
chunk_str = ""
for i, chunk in enumerate(chunk_texts):
chunk_str += f"""<chunk_{i}>\n{chunk}\n</chunk_{i}>\n"""
return chunk_str
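# End-to-end usage sketch. The concrete objects below are placeholders for whichever
# BaseLLM, BaseEmbedding, and BaseVectorDB implementations are configured in your
# deployment; substitute your own and make sure the vector DB already contains
# indexed collections.
#
#   llm = ...              # any BaseLLM implementation
#   embedding_model = ...  # any BaseEmbedding implementation
#   vector_db = ...        # any BaseVectorDB implementation
#
#   agent = DeepSearch(
#       llm=llm,
#       embedding_model=embedding_model,
#       vector_db=vector_db,
#       max_iter=3,
#       route_collection=True,
#       text_window_splitter=True,
#   )
#   answer, retrieved_results = agent.query("Write a short survey on deep learning.")
#   print(answer)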