from deepsearcher.agent.base import BaseAgent, describe_class
from deepsearcher.embedding.base import BaseEmbedding
from deepsearcher.llm.base import BaseLLM
from deepsearcher.utils import log
from deepsearcher.vector_db import RetrievalResult
from deepsearcher.vector_db.base import BaseVectorDB, deduplicate
COLLECTION_ROUTE_PROMPT = """
I provide you with collection_name(s) and corresponding collection_description(s).
Please select the collection names that may be related to the query and return them as a Python list of str.
If no collection is related to the query, you can return an empty list.
"Query": {query}
"COLLECTION_INFO": {collection_info}
When you respond, return ONLY a JSON-convertible Python list of str, WITHOUT any other additional content.
Your selected collection name list is:
"""
SUB_QUERY_PROMPT = """
To answer this question more comprehensively, please break down the original question into a small number of sub-questions
(the fewer the better, but add more if necessary to ensure the original question can be fully answered).
If the question is very simple and no decomposition is needed, keep only the original question.
Make sure each sub-question is clear, concise and atomic.
Return a Python-style list of str that is JSON convertible.

Original Question: {original_query}
<EXAMPLE>
Example input:
"Explain deep learning"
Example output:
[
"What is deep learning?",
"What is the difference between deep learning and machine learning?",
"What is the history of deep learning?"
]
</EXAMPLE>
Provide your response as a Python list of str:
"""
RERANK_PROMPT = """
Based on the query and the retrieved chunks, quickly judge whether each chunk is helpful in answering the query.
For each chunk, you must return "YES" or "NO" in a Python-style list, without any other information.
Query: {query}
Retrieved Chunks:
{retrieved_chunks}
Respond with a list of "YES" or "NO" values, one for each chunk, in the same order as the chunks are listed.
For example, if there is a list of four chunks, the answer could be: ["YES", "NO", "YES", "YES"]
"""
REFLECT_PROMPT = """
Determine whether additional search queries are needed, based on the original query, the previous sub-queries, and all retrieved document chunks.
If the returned chunks do not cover all previous sub-queries, it means no related documents could be retrieved for some of them.
In that case, generate queries that are similar to, but slightly different from, those previous sub-queries.
If further research is needed based on the new information those chunks provide, generate additional queries building on it
(this is preferred, but ultimately use your own judgment).
If no further research is needed, return an empty list.
Original Query: {original_query}
Previous Sub Queries: {all_sub_queries}
Related Chunks:
{chunks}
Respond exclusively with a valid Python list of str, without any other text."""

SUMMARY_PROMPT = """
You are an AI content analysis expert.
Please generate a long, specific and detailed answer or report based on the previous queries and the retrieved document chunks.
If the chunks are not enough to answer the query, or additional information is needed to enhance the content, answer with your own knowledge.
In this case, mark the part(s) generated from your own knowledge with <unref>your knowledge here</unref>
(do not place the <unref></unref> part(s) in a separate paragraph; insert them at the appropriate places in the report).
You should also quote chunk references in the text and give a list of references at the end of the report.
Here is an example:
<EXAMPLE>
Quote example (an in-text citation anchor; strictly apply the format below):
XGBoost is a powerful ensemble learning method[<sup>[2]</sup>](#2)
Reference list example (should be exactly the same as the corresponding <reference></reference> content):
<div id="2"><a href="MachineLearning.pdf">[2] MachineLearning.pdf</a></div>
</EXAMPLE>
Original Query: {original_query}
Previous Sub Queries: {all_sub_queries}
Related Chunks:
{chunks}
"""
@describe_class(
"This agent is suitable for handling general and simple queries, such as given a topic and then writing a report, survey, or article."
)
class DeepSearch(BaseAgent):
"""
Deep Search agent implementation for comprehensive information retrieval.
This agent performs a thorough search through the knowledge base, analyzing
multiple aspects of the query to provide comprehensive and detailed answers.
"""
def __init__(
self,
llm: BaseLLM,
embedding_model: BaseEmbedding,
vector_db: BaseVectorDB,
max_iter: int,
route_collection: bool = False,
text_window_splitter: bool = True,
**kwargs,
):
"""
Initialize the DeepSearch agent.
Args:
llm: The language model to use for generating answers.
embedding_model: The embedding model to use for query embedding.
vector_db: The vector database to search for relevant documents.
max_iter: The maximum number of iterations for the search process.
route_collection: Whether to use a collection router for search.
text_window_splitter: Whether to use text_window splitter.
**kwargs: Additional keyword arguments for customization.
"""
self.llm = llm
self.embedding_model = embedding_model
self.vector_db = vector_db
self.max_iter = max_iter
self.route_collection = route_collection
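# Cache all collection names up front so searches can fall back to every collection when routing is disabled.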
self.all_collections = [
collection_info.collection_name
for collection_info in self.vector_db.list_collections(dim=embedding_model.dimension)
]
self.text_window_splitter = text_window_splitter
def invoke(self, query: str, dim: int, **kwargs) -> list[str]:
"""
Determine which collections are relevant for the given query.
This method analyzes the query content and selects collections that are
most likely to contain information relevant to answering the query.
Args:
query (str): The query to analyze.
dim (int): The dimension of the vector space to search in.
Returns:
List[str]: A list of selected collection names
"""
collection_infos = self.vector_db.list_collections(dim=dim)
if len(collection_infos) == 0:
log.color_print(
"No collections found in the vector database. Please check the database connection."
)
return []
if len(collection_infos) == 1:
the_only_collection = collection_infos[0].collection_name
log.color_print(
f"<think> Perform search [{query}] on the vector DB collection: {the_only_collection} </think>\n"
)
return [the_only_collection]
vector_db_search_prompt = COLLECTION_ROUTE_PROMPT.format(
query=query,
collection_info=[
{
"collection_name": collection_info.collection_name,
"collection_description": collection_info.description,
}
for collection_info in collection_infos
],
)
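# Ask the LLM which collections look relevant; the response is parsed as a Python list of collection names.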
response = self.llm.chat(
messages=[{"role": "user", "content": vector_db_search_prompt}]
)
selected_collections = self.llm.literal_eval(response)
for collection_info in collection_infos:
# If a collection has no description, include it in the search by default
if not collection_info.description:
selected_collections.append(collection_info.collection_name)
# Always include the default collection if one is configured
if self.vector_db.default_collection == collection_info.collection_name:
selected_collections.append(collection_info.collection_name)
selected_collections = list(set(selected_collections))
log.color_print(
f"<think> Perform search [{query}] on the vector DB collections: {selected_collections} </think>\n"
)
return selected_collections
def _generate_sub_queries(self, original_query: str) -> list[str]:
content = self.llm.chat(
messages=[
{"role": "user", "content": SUB_QUERY_PROMPT.format(original_query=original_query)}
]
)
content = self.llm.remove_think(content)
return self.llm.literal_eval(content)
def _search_chunks_from_vectordb(self, query: str):
if self.route_collection:
selected_collections = self.invoke(
query=query, dim=self.embedding_model.dimension
)
else:
selected_collections = self.all_collections
all_retrieved_results = []
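# Embed the query once and reuse the vector for every selected collection.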
query_vector = self.embedding_model.embed_query(query)
for collection in selected_collections:
log.color_print(f"<search> Search [{query}] in [{collection}]... </search>\n")
retrieved_results = self.vector_db.search_data(
collection=collection, vector=query_vector, query_text=query
)
if not retrieved_results:
log.color_print(
f"<search> No relevant document chunks found in '{collection}'! </search>\n"
)
continue
# Format all chunks for batch processing
formatted_chunks = ""
for i, retrieved_result in enumerate(retrieved_results):
formatted_chunks += f'''
<chunk_{i + 1}>\n{retrieved_result.text}\n</chunk_{i + 1}>\n
<reference_{i + 1}>\n{retrieved_result.reference}\n</reference_{i + 1}>
'''
# Batch process all chunks with a single LLM call
content = self.llm.chat(
messages=[
{
"role": "user",
"content": RERANK_PROMPT.format(
query=query,
retrieved_chunks=formatted_chunks,
),
}
]
)
content = self.llm.remove_think(content).strip()
# Parse the response to determine which chunks are relevant
try:
relevance_list = self.llm.literal_eval(content)
if not isinstance(relevance_list, list):
raise ValueError("Response is not a list")
except (ValueError, SyntaxError):
# Fallback: if parsing fails, treat all chunks as relevant
log.color_print(f"Warning: Failed to parse relevance response. Treating all chunks as relevant. Response was: {content}")
relevance_list = ["YES"] * len(retrieved_results)
# Ensure we have enough relevance judgments for all chunks
while len(relevance_list) < len(retrieved_results):
relevance_list.append("YES") # Default to relevant if no judgment provided
# Filter relevant chunks based on LLM response
accepted_chunk_num = 0
references = set()
for i, retrieved_result in enumerate(retrieved_results):
# Check if we have a relevance judgment for this chunk
is_relevant = (
    "YES" in relevance_list[i].upper() and "NO" not in relevance_list[i].upper()
    if i < len(relevance_list)
    else True
)
if is_relevant:
all_retrieved_results.append(retrieved_result)
accepted_chunk_num += 1
references.add(retrieved_result.reference)
if accepted_chunk_num > 0:
log.color_print(
f"<search> Accept {accepted_chunk_num} document chunk(s) from references: {list(references)} </search>\n"
)
else:
log.color_print(
f"<search> No document chunk accepted from '{collection}'! </search>\n"
)
return all_retrieved_results
def _generate_more_sub_queries(
self, original_query: str, all_sub_queries: list[str], all_retrieved_results: list[RetrievalResult]
) -> list[str]:
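# Collect the text of every retrieved chunk (or its wider window) so the reflection prompt can see what has already been found.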
chunks = []
for i, chunk in enumerate(all_retrieved_results):
if self.text_window_splitter and "wider_text" in chunk.metadata:
chunks.append(chunk.metadata["wider_text"])
else:
chunks.append(f'''<chunk {i + 1}>{chunk.text}</chunk {i + 1}><reference {i + 1}>{chunk.reference}</reference {i + 1}>''')
reflect_prompt = REFLECT_PROMPT.format(
original_query=original_query,
all_sub_queries=all_sub_queries,
chunks="\n".join(chunks)
if len(all_retrieved_results) > 0
else "NO RELATED CHUNKS FOUND.",
)
response = self.llm.chat([{"role": "user", "content": reflect_prompt}])
response = self.llm.remove_think(response)
return self.llm.literal_eval(response)
def retrieve(self, original_query: str, **kwargs) -> tuple[list[RetrievalResult], list[str]]:
"""
Retrieve relevant documents from the knowledge base for the given query.
This method performs a deep search through the vector database to find
the most relevant documents for answering the query.
Args:
original_query (str): The query to search for.
**kwargs: Additional keyword arguments for customizing the retrieval.
Returns:
Tuple[List[RetrievalResult], List[str]]: A tuple containing:
- A list of retrieved document results
- A list of all sub-queries used during retrieval
"""
max_iter = kwargs.pop("max_iter", self.max_iter)
### SUB QUERIES ###
log.color_print(f"<query> {original_query} </query>\n")
all_search_results = []
all_sub_queries = []
sub_queries = self._generate_sub_queries(original_query)
if not sub_queries:
log.color_print("No sub queries were generated by the LLM. Exiting.")
return [], []
else:
log.color_print(f"</think> Break down the original query into new sub queries: {sub_queries} ")
all_sub_queries.extend(sub_queries)
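# Iterative deep-search loop: search each sub-query, deduplicate the results, then reflect on what was found to decide whether new sub-queries are needed.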
for it in range(max_iter):
log.color_print(f">> Iteration: {it + 1}\n")
# Execute all search tasks sequentially
for query in sub_queries:
result = self._search_chunks_from_vectordb(query)
all_search_results.extend(result)
undeduped_len = len(all_search_results)
all_search_results = deduplicate(all_search_results)
deduped_len = len(all_search_results)
if undeduped_len - deduped_len != 0:
log.color_print(
f"<search> Removed {undeduped_len - deduped_len} duplicates </search> "
)
# search_res_from_internet = deduplicate_results(search_res_from_internet)
# all_search_res.extend(search_res_from_vectordb + search_res_from_internet)
if it + 1 >= max_iter:
log.color_print("</think> Exceeded maximum iterations. Exiting. ")
break
### REFLECTION & GET MORE SUB QUERIES ###
log.color_print("</think> Reflecting on the search results... ")
sub_queries = self._generate_more_sub_queries(
original_query, all_sub_queries, all_search_results
)
if not sub_queries:
log.color_print("</think> No new search queries were generated. Exiting. ")
break
else:
log.color_print(
f"</think> New search queries for next iteration: {sub_queries} ")
all_sub_queries.extend(sub_queries)
all_search_results = deduplicate(all_search_results)
return all_search_results, all_sub_queries
def query(self, original_query: str, **kwargs) -> tuple[str, list[RetrievalResult]]:
"""
Query the agent and generate an answer based on retrieved documents.
This method retrieves relevant documents and uses the language model
to generate a comprehensive answer to the query.
Args:
original_query (str): The query to answer.
**kwargs: Additional keyword arguments for customizing the query process.
Returns:
Tuple[str, List[RetrievalResult]]: A tuple containing:
- The generated answer
- A list of retrieved document results
"""
all_retrieved_results, all_sub_queries = self.retrieve(original_query, **kwargs)
if not all_retrieved_results:
return f"No relevant information found for query '{original_query}'.", []
chunks: list[str] = []
for i, chunk in enumerate(all_retrieved_results):
if self.text_window_splitter and "wider_text" in chunk.metadata:
chunks.append(chunk.metadata["wider_text"])
else:
chunks.append(f'''<chunk {i + 1}>{chunk.text}</chunk {i + 1}><reference {i + 1}>{chunk.reference}</reference {i + 1}>''')
log.color_print(
f"<think> Summarize answer from all {len(all_retrieved_results)} retrieved chunks... </think>\n"
)
summary_prompt = SUMMARY_PROMPT.format(
original_query=original_query,
all_sub_queries=all_sub_queries,
chunks="\n".join(chunks)
)
response = self.llm.chat([{"role": "user", "content": summary_prompt}])
log.color_print("\n==== FINAL ANSWER====\n")
log.color_print(self.llm.remove_think(response))
return self.llm.remove_think(response), all_retrieved_results
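
# Example usage (a minimal sketch): the concrete classes below are placeholders
# for whatever BaseLLM / BaseEmbedding / BaseVectorDB implementations your
# deployment provides; their names and constructors are assumptions, not part
# of this module.
#
#     llm = MyLLM()                  # any BaseLLM subclass
#     embedding = MyEmbedding()      # any BaseEmbedding subclass
#     vector_db = MyVectorDB()       # any BaseVectorDB subclass
#     agent = DeepSearch(
#         llm=llm,
#         embedding_model=embedding,
#         vector_db=vector_db,
#         max_iter=3,
#         route_collection=True,
#     )
#     answer, retrieved_results = agent.query("What is deep learning?")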