from deepsearcher.agent.base import BaseAgent, describe_class
from deepsearcher.embedding.base import BaseEmbedding
from deepsearcher.llm.base import BaseLLM
from deepsearcher.utils import log
from deepsearcher.vector_db import RetrievalResult
from deepsearcher.vector_db.base import BaseVectorDB, deduplicate

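# High-level flow implemented by the DeepSearch agent below:
#   1. Break the user query into sub-queries (SUB_QUERY_PROMPT).
#   2. Optionally route each search to relevant collections (COLLECTION_ROUTE_PROMPT).
#   3. Retrieve chunks from the vector DB and rerank them with the LLM (RERANK_PROMPT).
#   4. Reflect on coverage and generate gap queries for further iterations (REFLECT_PROMPT).
#   5. Summarize the accepted chunks into a final answer (SUMMARY_PROMPT).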

COLLECTION_ROUTE_PROMPT = """
I provide you with collection_name(s) and corresponding collection_description(s).
Please select the collection names that may be related to the question and return a Python list of str.
If there is no collection related to the question, you can return an empty list.

"QUESTION": {question}
"COLLECTION_INFO": {collection_info}

When you return, you can ONLY return a JSON-convertible Python list of str, WITHOUT any other additional content.
Your selected collection name list is:
"""

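# Illustrative sketch of the expected routing exchange (the collection names here are
# hypothetical, not part of this repo): given COLLECTION_INFO like
#   [{"collection_name": "milvus_docs", "collection_description": "Milvus user guide"},
#    {"collection_name": "recipes", "collection_description": "Cooking recipes"}]
# and a question about Milvus indexing, the model is expected to reply with a bare
# JSON-convertible list such as ["milvus_docs"], which llm.literal_eval() parses in invoke().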

SUB_QUERY_PROMPT = """
To answer this question more comprehensively, please break down the original question into a small number of sub-questions
(the fewer the better, but add more if necessary to ensure full coverage of the original question).
If the question is very simple and no decomposition is necessary, keep only the original question.
Make sure each sub-question is clear, concise and atomic.
Return a Python-style list of str that is JSON-convertible.

Original Question: {original_query}


<EXAMPLE>
Example input:
"Explain deep learning"

Example output:
[
    "What is deep learning?",
    "What is the difference between deep learning and machine learning?",
    "What is the history of deep learning?"
]
</EXAMPLE>

Provide your response as a Python list of str:
"""


RERANK_PROMPT = """
Based on the query and the retrieved chunks, quickly judge whether each chunk is helpful in answering the query.
For each chunk, you must return "YES" or "NO", as a Python-style list without any other information.

Query: {query}

Retrieved Chunks:
{retrieved_chunks}

Respond with a list of "YES" or "NO" values, one for each chunk, in the same order as the chunks are listed.
For example, if there is a list of four chunks, the answer could be: ["YES", "NO", "YES", "YES"]
"""


REFLECT_PROMPT = """
Determine whether additional search queries are needed based on the original query, the previous sub-queries, and all retrieved document chunks.
If the returned chunks do not cover all previous sub-queries, it means no related documents could be retrieved for some of them.
In this case, try generating queries that are similar to, but slightly different from, those previous sub-queries.
If further research is needed based on the new information those chunks provide, generate additional queries that build on it.
(This is preferred even when the previous sub-queries are already well answered by the retrieved chunks, but ultimately use your own judgment.)
If no further research is needed, return an empty list.

Original Query: {question}

Previous Sub Queries: {mini_questions}

Related Chunks:
{mini_chunk_str}

Respond exclusively with a valid Python list of str, without any other text."""

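# Illustrative expected replies to REFLECT_PROMPT (hypothetical content): either a list of
# follow-up queries such as ["How is the reranking step configured?", "Which index types are supported?"]
# when coverage gaps remain, or [] when no further research is needed.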

SUMMARY_PROMPT = """
You are an AI content analysis expert.
Please generate a long, specific and detailed answer or report based on the previous queries and the retrieved document chunks.
If the chunks are not enough to answer the query, or additional information is needed to enhance the content, answer with your own knowledge.
In that case, mark the parts generated from your own knowledge with <unref>your knowledge here</unref>
(Don't isolate the <unref></unref> parts into their own paragraph; insert them at the proper places in the report.)

Original Query: {question}

Previous Sub Queries: {mini_questions}

Related Chunks:
{mini_chunk_str}
"""

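# Illustrative use of the <unref> convention (hypothetical sentence): "The corpus describes the
# ingestion pipeline in detail, <unref>and comparable systems usually cache embeddings to reduce
# cost</unref>." Unreferenced knowledge is marked inline rather than in a separate paragraph.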

@describe_class(
    "This agent is suitable for handling general and simple queries, such as given a topic and then writing a report, survey, or article."
)
class DeepSearch(BaseAgent):
    """
    Deep Search agent implementation for comprehensive information retrieval.

    This agent performs a thorough search through the knowledge base, analyzing
    multiple aspects of the query to provide comprehensive and detailed answers.
    """

    def __init__(
        self,
        llm: BaseLLM,
        embedding_model: BaseEmbedding,
        vector_db: BaseVectorDB,
        max_iter: int = 3,
        route_collection: bool = False,
        text_window_splitter: bool = True,
        **kwargs,
    ):
        """
        Initialize the DeepSearch agent.

        Args:
            llm: The language model to use for generating answers.
            embedding_model: The embedding model to use for query embedding.
            vector_db: The vector database to search for relevant documents.
            max_iter: The maximum number of iterations for the search process.
            route_collection: Whether to use a collection router for search.
            text_window_splitter: Whether to use the text window splitter.
            **kwargs: Additional keyword arguments for customization.
        """
        self.llm = llm
        self.embedding_model = embedding_model
        self.vector_db = vector_db
        self.max_iter = max_iter
        self.route_collection = route_collection
        self.all_collections = [
            collection_info.collection_name
            for collection_info in self.vector_db.list_collections(dim=embedding_model.dimension)
        ]
        self.text_window_splitter = text_window_splitter

    def invoke(self, query: str, dim: int, **kwargs) -> list[str]:
        """
        Determine which collections are relevant for the given query.

        This method analyzes the query content and selects collections that are
        most likely to contain information relevant to answering the query.

        Args:
            query (str): The query to analyze.
            dim (int): The dimension of the vector space to search in.

        Returns:
            List[str]: A list of selected collection names.
        """
        collection_infos = self.vector_db.list_collections(dim=dim)
        if len(collection_infos) == 0:
            log.color_print(
                "No collections found in the vector database. Please check the database connection."
            )
            return []
        if len(collection_infos) == 1:
            the_only_collection = collection_infos[0].collection_name
            log.color_print(
                f"<think> Perform search [{query}] on the vector DB collection: {the_only_collection} </think>\n"
            )
            return [the_only_collection]
        vector_db_search_prompt = COLLECTION_ROUTE_PROMPT.format(
            question=query,
            collection_info=[
                {
                    "collection_name": collection_info.collection_name,
                    "collection_description": collection_info.description,
                }
                for collection_info in collection_infos
            ],
        )
        response = self.llm.chat(
            messages=[{"role": "user", "content": vector_db_search_prompt}]
        )
        selected_collections = self.llm.literal_eval(response)

        for collection_info in collection_infos:
            # Always include collections without a description, since the router
            # cannot judge their relevance from an empty description
            if not collection_info.description:
                selected_collections.append(collection_info.collection_name)
            # Always include the default collection if it exists
            if self.vector_db.default_collection == collection_info.collection_name:
                selected_collections.append(collection_info.collection_name)
        selected_collections = list(set(selected_collections))
        log.color_print(
            f"<think> Perform search [{query}] on the vector DB collections: {selected_collections} </think>\n"
        )
        return selected_collections

    def _generate_sub_queries(self, original_query: str) -> list[str]:
        content = self.llm.chat(
            messages=[
                {"role": "user", "content": SUB_QUERY_PROMPT.format(original_query=original_query)}
            ]
        )
        content = self.llm.remove_think(content)
        return self.llm.literal_eval(content)

    def _search_chunks_from_vectordb(self, query: str):
        if self.route_collection:
            selected_collections = self.invoke(
                query=query, dim=self.embedding_model.dimension
            )
        else:
            selected_collections = self.all_collections

        all_retrieved_results = []
        query_vector = self.embedding_model.embed_query(query)
        for collection in selected_collections:
            log.color_print(f"<search> Search [{query}] in [{collection}]... </search>\n")
            retrieved_results = self.vector_db.search_data(
                collection=collection, vector=query_vector, query_text=query
            )
            if not retrieved_results or len(retrieved_results) == 0:
                log.color_print(
                    f"<search> No relevant document chunks found in '{collection}'! </search>\n"
                )
                continue

            # Format all chunks for batch processing
            formatted_chunks = ""
            for i, retrieved_result in enumerate(retrieved_results):
                formatted_chunks += f'''
<chunk_{i + 1}>\n{retrieved_result.text}\n</chunk_{i + 1}>\n
<reference_{i + 1}>\n{retrieved_result.reference}\n</reference_{i + 1}>
'''

            # Batch process all chunks with a single LLM call
            content = self.llm.chat(
                messages=[
                    {
                        "role": "user",
                        "content": RERANK_PROMPT.format(
                            query=query,
                            retrieved_chunks=formatted_chunks,
                        ),
                    }
                ]
            )
            content = self.llm.remove_think(content).strip()

            # Parse the response to determine which chunks are relevant
            try:
                relevance_list = self.llm.literal_eval(content)
                if not isinstance(relevance_list, list):
                    raise ValueError("Response is not a list")
            except (ValueError, SyntaxError):
                # Fallback: if parsing fails, treat all chunks as relevant
                log.color_print(
                    f"Warning: Failed to parse relevance response. Treating all chunks as relevant. Response was: {content}"
                )
                relevance_list = ["YES"] * len(retrieved_results)

            # Ensure we have enough relevance judgments for all chunks
            while len(relevance_list) < len(retrieved_results):
                relevance_list.append("YES")  # Default to relevant if no judgment provided

            # Filter relevant chunks based on LLM response
            accepted_chunk_num = 0
            references = set()
            for i, retrieved_result in enumerate(retrieved_results):
                # Use the relevance judgment for this chunk if available; default to relevant otherwise
                is_relevant = (
                    "YES" in relevance_list[i].upper() and "NO" not in relevance_list[i].upper()
                    if i < len(relevance_list)
                    else True
                )
                if is_relevant:
                    all_retrieved_results.append(retrieved_result)
                    accepted_chunk_num += 1
                    references.add(retrieved_result.reference)

            if accepted_chunk_num > 0:
                log.color_print(
                    f"<search> Accept {accepted_chunk_num} document chunk(s) from references: {list(references)} </search>\n"
                )
            else:
                log.color_print(
                    f"<search> No document chunk accepted from '{collection}'! </search>\n"
                )
        return all_retrieved_results

    def _generate_gap_queries(
        self, original_query: str, all_sub_queries: list[str], all_chunks: list[RetrievalResult]
    ) -> list[str]:
        reflect_prompt = REFLECT_PROMPT.format(
            question=original_query,
            mini_questions=all_sub_queries,
            mini_chunk_str=self._format_chunk_texts([chunk.text for chunk in all_chunks])
            if len(all_chunks) > 0
            else "NO RELATED CHUNKS FOUND.",
        )
        response = self.llm.chat([{"role": "user", "content": reflect_prompt}])
        response = self.llm.remove_think(response)
        return self.llm.literal_eval(response)

    def retrieve(self, original_query: str, **kwargs) -> tuple[list[RetrievalResult], dict]:
        """
        Retrieve relevant documents from the knowledge base for the given query.

        This method performs a deep search through the vector database to find
        the most relevant documents for answering the query.

        Args:
            original_query (str): The query to search for.
            **kwargs: Additional keyword arguments for customizing the retrieval.

        Returns:
            Tuple[List[RetrievalResult], dict]: A tuple containing:
                - A list of retrieved document results
                - Additional information about the retrieval process
        """
        max_iter = kwargs.pop("max_iter", self.max_iter)
        ### SUB QUERIES ###
        log.color_print(f"<query> {original_query} </query>\n")
        all_search_res = []
        all_sub_queries = []

        sub_queries = self._generate_sub_queries(original_query)
        if not sub_queries:
            log.color_print("No sub queries were generated by the LLM. Exiting.")
            return [], {}
        else:
            log.color_print(
                f"</think> Break down the original query into new sub queries: {sub_queries} "
            )
        all_sub_queries.extend(sub_queries)
        sub_gap_queries = sub_queries

        for iter in range(max_iter):
            log.color_print(f">> Iteration: {iter + 1}\n")
            search_res_from_vectordb = []
            # search_res_from_internet = []  # TODO

            # Execute all search tasks sequentially
            for query in sub_gap_queries:
                result = self._search_chunks_from_vectordb(query)
                search_res_from_vectordb.extend(result)
            undedup_len = len(search_res_from_vectordb)
            search_res_from_vectordb = deduplicate(search_res_from_vectordb)
            deduped_len = len(search_res_from_vectordb)
            if undedup_len - deduped_len != 0:
                log.color_print(
                    f"<search> Removed {undedup_len - deduped_len} duplicates </search>"
                )
            # search_res_from_internet = deduplicate_results(search_res_from_internet)
            # all_search_res.extend(search_res_from_vectordb + search_res_from_internet)
            all_search_res.extend(search_res_from_vectordb)
            if iter == max_iter - 1:
                log.color_print("</think> Exceeded maximum iterations. Exiting. ")
                break
            ### REFLECTION & GET GAP QUERIES ###
            log.color_print("</think> Reflecting on the search results... ")
            sub_gap_queries = self._generate_gap_queries(
                original_query, all_sub_queries, all_search_res
            )
            if not sub_gap_queries or len(sub_gap_queries) == 0:
                log.color_print("</think> No new search queries were generated. Exiting. ")
                break
            else:
                log.color_print(
                    f"</think> New search queries for next iteration: {sub_gap_queries} "
                )
                all_sub_queries.extend(sub_gap_queries)

        all_search_res = deduplicate(all_search_res)
        additional_info = {"all_sub_queries": all_sub_queries}
        return all_search_res, additional_info

    def query(self, query: str, **kwargs) -> tuple[str, list[RetrievalResult]]:
        """
        Query the agent and generate an answer based on retrieved documents.

        This method retrieves relevant documents and uses the language model
        to generate a comprehensive answer to the query.

        Args:
            query (str): The query to answer.
            **kwargs: Additional keyword arguments for customizing the query process.

        Returns:
            Tuple[str, List[RetrievalResult]]: A tuple containing:
                - The generated answer
                - A list of retrieved document results
        """
        all_retrieved_results, additional_info = self.retrieve(query, **kwargs)
        if not all_retrieved_results or len(all_retrieved_results) == 0:
            return f"No relevant information found for query '{query}'.", []
        all_sub_queries = additional_info["all_sub_queries"]
        chunk_texts = []
        for chunk in all_retrieved_results:
            if self.text_window_splitter and "wider_text" in chunk.metadata:
                chunk_texts.append(chunk.metadata["wider_text"])
            else:
                chunk_texts.append(chunk.text)
        log.color_print(
            f"<think> Summarize answer from all {len(all_retrieved_results)} retrieved chunks... </think>\n"
        )
        summary_prompt = SUMMARY_PROMPT.format(
            question=query,
            mini_questions=all_sub_queries,
            mini_chunk_str=self._format_chunk_texts(chunk_texts),
        )
        response = self.llm.chat([{"role": "user", "content": summary_prompt}])
        log.color_print("\n==== FINAL ANSWER ====\n")
        log.color_print(self.llm.remove_think(response))
        return self.llm.remove_think(response), all_retrieved_results

    def _format_chunk_texts(self, chunk_texts: list[str]) -> str:
        chunk_str = ""
        for i, chunk in enumerate(chunk_texts):
            chunk_str += f"""<chunk_{i + 1}>\n{chunk}\n</chunk_{i + 1}>\n"""
        return chunk_str
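

# Minimal usage sketch (assumes concrete BaseLLM / BaseEmbedding / BaseVectorDB
# implementations are already configured elsewhere in deepsearcher; the variable
# names below are placeholders, not APIs defined in this file):
#
#   agent = DeepSearch(
#       llm=my_llm,                    # any BaseLLM implementation
#       embedding_model=my_embedding,  # any BaseEmbedding implementation
#       vector_db=my_vector_db,        # any BaseVectorDB implementation
#       max_iter=3,
#       route_collection=True,
#   )
#   answer, retrieved = agent.query("Write a short report on retrieval-augmented generation.")
#   results, info = agent.retrieve("Write a short report on RAG.", max_iter=2)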