@@ -1,5 +1,3 @@
-import asyncio
from deepsearcher.agent.base import BaseAgent, describe_class
from deepsearcher.embedding.base import BaseEmbedding
from deepsearcher.llm.base import BaseLLM
@@ -7,20 +5,23 @@ from deepsearcher.utils import log
from deepsearcher.vector_db import RetrievalResult
from deepsearcher.vector_db.base import BaseVectorDB, deduplicate

COLLECTION_ROUTE_PROMPT = """
I provide you with collection_name(s) and corresponding collection_description(s).
-Please select the collection names that may be related to the question and return a python list of str.
-If there is no collection related to the question, you can return an empty list.
+Please select the collection names that may be related to the query and return a python list of str.
+If there is no collection related to the query, you can return an empty list.
-"QUESTION": {question}
+"Query": {query}
" COLLECTION_INFO " : { collection_info }
When you return , you can ONLY return a json convertable python list of str , WITHOUT any other additional content .
Your selected collection name list is :
"""

SUB_QUERY_PROMPT = """
-To answer this question more comprehensively, please break down the original question into few numbers of sub-questions (more if necessary).
+To answer this question more comprehensively, please break down the original question into a small number of sub-questions
+(the fewer the better, but more if necessary to ensure the original question can be answered).
If this is a very simple question and no decomposition is necessary, then keep only the original question.
Make sure each sub-question is clear, concise and atomic.
Return as a list of str in python style and json convertible.
@@ -43,16 +44,18 @@ Example output:
Provide your response in a python code list of str format:
"""

RERANK_PROMPT = """
-Based on the query questions and the retrieved chunks, determine whether each chunk is helpful in answering any of the query questions.
-For each chunk, you must return "YES" or "NO" without any other information.
+Based on the query and the retrieved chunks, quickly judge whether each chunk is helpful in answering the query.
+For each chunk, you must return "YES" or "NO" in a python style list, without any other information.
-Query Questions: {query}
+Query: {query}
Retrieved Chunks:
{retrieved_chunks}
-Respond with a list of "YES" or "NO" values, one for each chunk, in the same order as the chunks are listed. For example a list of chunks of three: ["YES", "NO", "YES"]
+Respond with a list of "YES" or "NO" values, one for each chunk, in the same order as the chunks are listed.
+For example, if there is a list of four chunks, the answer could be: ["YES", "NO", "YES", "YES"]
"""
@@ -60,16 +63,16 @@ REFLECT_PROMPT = """
Determine whether additional search queries are needed based on the original query, previous sub queries, and all retrieved document chunks.
If the returned chunks do not cover all previous sub-queries, it means that no related documents could be retrieved for them.
In this case, try generating similar but slightly different queries from the previous sub-queries.
-And if further research is needed based on the new information, provide a Python list of more queries.
-(which is prefered, even if the previous sub-queries can be well answered by retrieved chunks, but ultimately according to your judge)
+And if further research is needed based on the new information those chunks provide, give more queries that build on them.
+(which is preferred, but ultimately use your own judgment)
If no further research is needed, return an empty list.
-Original Query: {question}
+Original Query: {original_query}
-Previous Sub Queries: {mini_questions}
+Previous Sub Queries: {all_sub_queries}
Related Chunks:
-{mini_chunk_str}
+{chunks}
Respond exclusively in valid List of str format without any other text."""
@@ -79,14 +82,15 @@ You are a AI content analysis expert.
Please generate a long, specific and detailed answer or report based on the previous queries and the retrieved document chunks.
If the chunks are not enough to answer the query or additional information is needed to enhance the content, you should answer with your own knowledge.
In this case, mark the part(s) generated from your own knowledge with <unref>your knowledge here</unref>
-(Don't place <unref></unref> part(s) individually into one paragraph, but insert it the proper place of the context)
+(Don't place <unref></unref> part(s) individually into one paragraph, but insert them at the proper place in the report)
Plus, you should give references in the report where you quote from the chunks using markdown links, and give a list of references at the end of the report.
-Original Query: {question}
+Original Query: {original_query}
-Previous Sub Queries: {mini_questions}
+Previous Sub Queries: {all_sub_queries}
Related Chunks:
-{mini_chunk_str}
+{chunks}
"""
@@ -108,7 +112,7 @@ class DeepSearch(BaseAgent):
        embedding_model: BaseEmbedding,
        vector_db: BaseVectorDB,
        max_iter: int = 3,
-        route_collection: bool = True,
+        route_collection: bool = False,
        text_window_splitter: bool = True,
        **kwargs,
    ):
@@ -162,7 +166,7 @@ class DeepSearch(BaseAgent):
            )
            return [the_only_collection]
        vector_db_search_prompt = COLLECTION_ROUTE_PROMPT.format(
-            question=query,
+            query=query,
            collection_info=[
                {
                    "collection_name": collection_info.collection_name,
@@ -198,7 +202,7 @@ class DeepSearch(BaseAgent):
        content = self.llm.remove_think(content)
        return self.llm.literal_eval(content)

-    async def _search_chunks_from_vectordb(self, query: str):
+    def _search_chunks_from_vectordb(self, query: str):
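+        # This helper is now synchronous: retrieve() calls it once per sub-query in a plain loop instead of gathering asyncio tasks.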
        if self.route_collection:
            selected_collections = self.invoke(
                query=query, dim=self.embedding_model.dimension
@@ -222,7 +226,10 @@ class DeepSearch(BaseAgent):
        # Format all chunks for batch processing
        formatted_chunks = ""
        for i, retrieved_result in enumerate(retrieved_results):
-            formatted_chunks += f"<chunk_{i}>\n{retrieved_result.text}\n</chunk_{i}>\n"
+            formatted_chunks += f'''
+<chunk_{i + 1}>\n{retrieved_result.text}\n</chunk_{i + 1}>\n
+<reference_{i + 1}>\n{retrieved_result.reference}\n</reference_{i + 1}>
+'''
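+            # Each chunk is now numbered from 1 and paired with a <reference_*> tag, so the reranker also sees where each piece of text came from.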
        # Batch process all chunks with a single LLM call
        content = self.llm.chat(
@@ -278,21 +285,27 @@ class DeepSearch(BaseAgent):
        )
        return all_retrieved_results

-    def _generate_gap_queries(
-        self, original_query: str, all_sub_queries: list[str], all_chunks: list[RetrievalResult]
+    def _generate_more_sub_queries(
+        self, original_query: str, all_sub_queries: list[str], all_retrieved_results: list[RetrievalResult]
    ) -> list[str]:
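+        # Build the reflection context: prefer the wider text window when available, otherwise fall back to the tagged chunk text plus its reference.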
+        chunks = []
+        for i, chunk in enumerate(all_retrieved_results):
+            if self.text_window_splitter and "wider_text" in chunk.metadata:
+                chunks.append(chunk.metadata["wider_text"])
+            else:
+                chunks.append(f'''<chunk {i + 1}>{chunk.text}</chunk {i + 1}><reference {i + 1}>{chunk.reference}</reference {i + 1}>''')
        reflect_prompt = REFLECT_PROMPT.format(
-            question=original_query,
-            mini_questions=all_sub_queries,
-            mini_chunk_str=self._format_chunk_texts([chunk.text for chunk in all_chunks])
-            if len(all_chunks) > 0
+            original_query=original_query,
+            all_sub_queries=all_sub_queries,
+            chunks="\n".join(chunks)
+            if len(all_retrieved_results) > 0
            else "NO RELATED CHUNKS FOUND.",
        )
        response = self.llm.chat([{"role": "user", "content": reflect_prompt}])
        response = self.llm.remove_think(response)
        return self.llm.literal_eval(response)

-    def retrieve(self, original_query: str, **kwargs) -> tuple[list[RetrievalResult], dict]:
+    def retrieve(self, original_query: str, **kwargs) -> tuple[list[RetrievalResult], list[str]]:
        """
        Retrieve relevant documents from the knowledge base for the given query.
@@ -308,15 +321,10 @@ class DeepSearch(BaseAgent):
                - A list of retrieved document results
                - Additional information about the retrieval process
        """
-        return asyncio.run(self.async_retrieve(original_query, **kwargs))
-    async def async_retrieve(
-        self, original_query: str, **kwargs
-    ) -> tuple[list[RetrievalResult], dict]:
        max_iter = kwargs.pop("max_iter", self.max_iter)
        ### SUB QUERIES ###
        log.color_print(f"<query>{original_query}</query>\n")
-        all_search_res = []
+        all_search_results = []
        all_sub_queries = []
        sub_queries = self._generate_sub_queries(original_query)
@@ -324,54 +332,46 @@
            log.color_print("No sub queries were generated by the LLM. Exiting.")
            return [], []
        else:
-            log.color_print(
-                f"<think> Break down the original query into new sub queries: {sub_queries} </think>\n"
-            )
+            log.color_print(f"</think> Break down the original query into new sub queries: {sub_queries}")
            all_sub_queries.extend(sub_queries)
-        sub_gap_queries = sub_queries
-        for iter in range(max_iter):
-            log.color_print(f">> Iteration: {iter + 1}\n")
-            search_res_from_vectordb = []
-            search_res_from_internet = []  # TODO
-            # Create all search tasks
-            search_tasks = [
-                self._search_chunks_from_vectordb(query)
-                for query in sub_gap_queries
-            ]
-            # Execute all tasks in parallel and wait for results
-            search_results = await asyncio.gather(*search_tasks)
-            # Merge all results
-            for result in search_results:
-                search_res = result
-                search_res_from_vectordb.extend(search_res)
-            search_res_from_vectordb = deduplicate(search_res_from_vectordb)
+        for it in range(max_iter):
+            log.color_print(f">> Iteration: {it + 1}\n")
+            # Execute all search tasks sequentially
+            for query in sub_queries:
+                result = self._search_chunks_from_vectordb(query)
+                all_search_results.extend(result)
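+            # Deduplicate the accumulated results and log how many duplicates were removed.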
+            undeduped_len = len(all_search_results)
+            all_search_results = deduplicate(all_search_results)
+            deduped_len = len(all_search_results)
+            if undeduped_len - deduped_len != 0:
+                log.color_print(
+                    f"<search> Removed {undeduped_len - deduped_len} duplicates </search>"
+                )
            # search_res_from_internet = deduplicate_results(search_res_from_internet)
-            all_search_res.extend(search_res_from_vectordb + search_res_from_internet)
-            if iter == max_iter - 1:
-                log.color_print("<think> Exceeded maximum iterations. Exiting. </think>\n")
+            # all_search_res.extend(search_res_from_vectordb + search_res_from_internet)
+            if it == max_iter - 1:
+                log.color_print("</think> Exceeded maximum iterations. Exiting.")
                break
-            ### REFLECTION & GET GAP QUERIES ###
-            log.color_print("<think> Reflecting on the search results... </think>\n")
-            sub_gap_queries = self._generate_gap_queries(
-                original_query, all_sub_queries, all_search_res
+            ### REFLECTION & GET MORE SUB QUERIES ###
+            log.color_print("</think> Reflecting on the search results...")
+            sub_queries = self._generate_more_sub_queries(
+                original_query, all_sub_queries, all_search_results
            )
-            if not sub_gap_queries or len(sub_gap_queries) == 0:
-                log.color_print("<think> No new search queries were generated. Exiting. </think>\n")
+            if not sub_queries or len(sub_queries) == 0:
+                log.color_print("</think> No new search queries were generated. Exiting.")
                break
            else:
                log.color_print(
-                    f"<think> New search queries for next iteration: {sub_gap_queries} </think>\n"
-                )
-                all_sub_queries.extend(sub_gap_queries)
+                    f"</think> New search queries for next iteration: {sub_queries}")
+                all_sub_queries.extend(sub_queries)
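+                # These newly generated sub-queries are what the next loop iteration searches for.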
-        all_search_res = deduplicate(all_search_res)
-        additional_info = {"all_sub_queries": all_sub_queries}
-        return all_search_res, additional_info
+        all_search_results = deduplicate(all_search_results)
+        return all_search_results, all_sub_queries
-    def query(self, query: str, **kwargs) -> tuple[str, list[RetrievalResult]]:
+    def query(self, original_query: str, **kwargs) -> tuple[str, list[RetrievalResult]]:
        """
        Query the agent and generate an answer based on retrieved documents.
@@ -387,31 +387,24 @@ class DeepSearch(BaseAgent):
            - The generated answer
            - A list of retrieved document results
        """
-        all_retrieved_results, additional_info = self.retrieve(query, **kwargs)
+        all_retrieved_results, all_sub_queries = self.retrieve(original_query, **kwargs)
        if not all_retrieved_results or len(all_retrieved_results) == 0:
-            return f"No relevant information found for query '{query}'.", []
-        all_sub_queries = additional_info["all_sub_queries"]
-        chunk_texts = []
-        for chunk in all_retrieved_results:
+            return f"No relevant information found for query '{original_query}'.", []
+        chunks = []  # type: list[str]
+        for i, chunk in enumerate(all_retrieved_results):
            if self.text_window_splitter and "wider_text" in chunk.metadata:
-                chunk_texts.append(chunk.metadata["wider_text"])
+                chunks.append(chunk.metadata["wider_text"])
            else:
-                chunk_texts.append(chunk.text)
+                chunks.append(f'''<chunk {i + 1}>{chunk.text}</chunk {i + 1}><reference {i + 1}>{chunk.reference}</reference {i + 1}>''')
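+        # The assembled chunk strings (with numbered reference tags) feed SUMMARY_PROMPT, which is asked to cite them as markdown references.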
        log.color_print(
            f"<think> Summarize answer from all {len(all_retrieved_results)} retrieved chunks... </think>\n"
        )
        summary_prompt = SUMMARY_PROMPT.format(
-            question=query,
-            mini_questions=all_sub_queries,
-            mini_chunk_str=self._format_chunk_texts(chunk_texts),
+            original_query=original_query,
+            all_sub_queries=all_sub_queries,
+            chunks="\n".join(chunks)
        )
        response = self.llm.chat([{"role": "user", "content": summary_prompt}])
        log.color_print("\n==== FINAL ANSWER====\n")
        log.color_print(self.llm.remove_think(response))
        return self.llm.remove_think(response), all_retrieved_results
-    def _format_chunk_texts(self, chunk_texts: list[str]) -> str:
-        chunk_str = ""
-        for i, chunk in enumerate(chunk_texts):
-            chunk_str += f"""<chunk_{i}>\n{chunk}\n</chunk_{i}>\n"""
-        return chunk_str