@@ -1,5 +1,4 @@
 import asyncio
-from typing import List, Tuple
 from deepsearcher.agent.base import RAGAgent, describe_class
 from deepsearcher.agent.collection_router import CollectionRouter
@@ -33,7 +32,9 @@ Example output:
 Provide your response in a python code list of str format:
 """

-RERANK_PROMPT = """Based on the query questions and the retrieved chunks, determine whether each chunk is helpful in answering any of the query questions. For each chunk, you must return "YES" or "NO" without any other information.
+RERANK_PROMPT = """
+Based on the query questions and the retrieved chunks, determine whether each chunk is helpful in answering any of the query questions.
+For each chunk, you must return "YES" or "NO" without any other information.
 Query Questions: {query}
@@ -43,7 +44,9 @@ Retrieved Chunks:
 Respond with a list of "YES" or "NO" values, one for each chunk, in the same order as the chunks are listed. For example a list of chunks of three: ["YES", "NO", "YES"]"""

-REFLECT_PROMPT = """Determine whether additional search queries are needed based on the original query, previous sub queries, and all retrieved document chunks. If further research is required, provide a Python list of up to 3 search queries. If no further research is required, return an empty list.
+REFLECT_PROMPT = """
+Determine whether additional search queries are needed based on the original query, previous sub queries, and all retrieved document chunks.
+If further research is required, provide a Python list of up to 3 search queries. If no further research is required, return an empty list.
 If the original query is to write a report, then you prefer to generate some further queries, instead return an empty list.
@@ -51,19 +54,21 @@ Original Query: {question}
 Previous Sub Queries: {mini_questions}
 Related Chunks:
 {mini_chunk_str}
 Respond exclusively in valid List of str format without any other text."""

-SUMMARY_PROMPT = """You are a AI content analysis expert, good at summarizing content. Please summarize a long, specific and detailed answer or report based on the previous queries and the retrieved document chunks.
+SUMMARY_PROMPT = """
+You are a AI content analysis expert, good at summarizing content.
+Please summarize a long, specific and detailed answer or report based on the previous queries and the retrieved document chunks.
 Original Query: {question}
 Previous Sub Queries: {mini_questions}
 Related Chunks:
 {mini_chunk_str}
 """
@@ -112,25 +117,22 @@ class DeepSearch(RAGAgent):
         )
         self.text_window_splitter = text_window_splitter

-    def _generate_sub_queries(self, original_query: str) -> Tuple[List[str], int]:
-        chat_response = self.llm.chat(
+    def _generate_sub_queries(self, original_query: str) -> list[str]:
+        content = self.llm.chat(
             messages=[
                 {"role": "user", "content": SUB_QUERY_PROMPT.format(original_query=original_query)}
             ]
         )
-        response_content = self.llm.remove_think(chat_response.content)
-        return self.llm.literal_eval(response_content), chat_response.total_tokens
+        content = self.llm.remove_think(content)
+        return self.llm.literal_eval(content)

     async def _search_chunks_from_vectordb(self, query: str):
-        consume_tokens = 0
         if self.route_collection:
-            selected_collections, n_token_route = self.collection_router.invoke(
+            selected_collections = self.collection_router.invoke(
                 query=query, dim=self.embedding_model.dimension
             )
         else:
             selected_collections = self.collection_router.all_collections
-            n_token_route = 0
-        consume_tokens += n_token_route

         all_retrieved_results = []
         query_vector = self.embedding_model.embed_query(query)
@@ -144,14 +146,14 @@ class DeepSearch(RAGAgent):
                     f"<search> No relevant document chunks found in '{collection}'! </search>\n"
                 )
                 continue
             # Format all chunks for batch processing
             formatted_chunks = ""
             for i, retrieved_result in enumerate(retrieved_results):
                 formatted_chunks += f"<chunk_{i}>\n{retrieved_result.text}\n</chunk_{i}>\n"

             # Batch process all chunks with a single LLM call
-            chat_response = self.llm.chat(
+            content = self.llm.chat(
                 messages=[
                     {
                         "role": "user",
@@ -162,37 +164,38 @@ class DeepSearch(RAGAgent):
                     }
                 ]
             )
-            consume_tokens += chat_response.total_tokens
-            response_content = self.llm.remove_think(chat_response.content).strip()
+            content = self.llm.remove_think(content).strip()

             # Parse the response to determine which chunks are relevant
             try:
-                relevance_list = self.llm.literal_eval(response_content)
+                relevance_list = self.llm.literal_eval(content)
                 if not isinstance(relevance_list, list):
                     raise ValueError("Response is not a list")
             except (ValueError, SyntaxError):
                 # Fallback: if parsing fails, treat all chunks as relevant
-                log.color_print(f"Warning: Failed to parse relevance response. Treating all chunks as relevant. Response was: {response_content}")
+                log.color_print(f"Warning: Failed to parse relevance response. Treating all chunks as relevant. Response was: {content}")
                 relevance_list = ["YES"] * len(retrieved_results)

             # Ensure we have enough relevance judgments for all chunks
             while len(relevance_list) < len(retrieved_results):
                 relevance_list.append("YES")  # Default to relevant if no judgment provided

             # Filter relevant chunks based on LLM response
             accepted_chunk_num = 0
             references = set()
             for i, retrieved_result in enumerate(retrieved_results):
                 # Check if we have a relevance judgment for this chunk
-                is_relevant = (i < len(relevance_list) and
-                               "YES" in relevance_list[i].upper() and
-                               "NO" not in relevance_list[i].upper()) if i < len(relevance_list) else True
+                is_relevant = (
+                    i < len(relevance_list)
+                    and "YES" in relevance_list[i].upper()
+                    and "NO" not in relevance_list[i].upper()
+                ) if i < len(relevance_list) else True
                 if is_relevant:
                     all_retrieved_results.append(retrieved_result)
                     accepted_chunk_num += 1
                     references.add(retrieved_result.reference)

             if accepted_chunk_num > 0:
                 log.color_print(
                     f"<search> Accept {accepted_chunk_num} document chunk(s) from references: {list(references)} </search>\n"
@@ -201,11 +204,11 @@ class DeepSearch(RAGAgent):
                 log.color_print(
                     f"<search> No document chunk accepted from '{collection}'! </search>\n"
                 )
-        return all_retrieved_results, consume_tokens
+        return all_retrieved_results

     def _generate_gap_queries(
-        self, original_query: str, all_sub_queries: List[str], all_chunks: List[RetrievalResult]
-    ) -> Tuple[List[str], int]:
+        self, original_query: str, all_sub_queries: list[str], all_chunks: list[RetrievalResult]
+    ) -> list[str]:
         reflect_prompt = REFLECT_PROMPT.format(
             question=original_query,
             mini_questions=all_sub_queries,
@@ -213,11 +216,11 @@ class DeepSearch(RAGAgent):
             if len(all_chunks) > 0
             else "NO RELATED CHUNKS FOUND.",
         )
-        chat_response = self.llm.chat([{"role": "user", "content": reflect_prompt}])
-        response_content = self.llm.remove_think(chat_response.content)
-        return self.llm.literal_eval(response_content), chat_response.total_tokens
+        response = self.llm.chat([{"role": "user", "content": reflect_prompt}])
+        response = self.llm.remove_think(response)
+        return self.llm.literal_eval(response)

-    def retrieve(self, original_query: str, **kwargs) -> Tuple[List[RetrievalResult], int, dict]:
+    def retrieve(self, original_query: str, **kwargs) -> tuple[list[RetrievalResult], dict]:
         """
         Retrieve relevant documents from the knowledge base for the given query.
@@ -231,26 +234,23 @@ class DeepSearch(RAGAgent):
         Returns:
             Tuple[List[RetrievalResult], int, dict]: A tuple containing:
                 - A list of retrieved document results
-                - The token usage for the retrieval operation
                 - Additional information about the retrieval process
         """
         return asyncio.run(self.async_retrieve(original_query, **kwargs))

     async def async_retrieve(
         self, original_query: str, **kwargs
-    ) -> Tuple[List[RetrievalResult], int, dict]:
+    ) -> tuple[list[RetrievalResult], dict]:
         max_iter = kwargs.pop("max_iter", self.max_iter)
         ### SUB QUERIES ###
         log.color_print(f"<query> {original_query} </query>\n")
         all_search_res = []
         all_sub_queries = []
-        total_tokens = 0

-        sub_queries, used_token = self._generate_sub_queries(original_query)
-        total_tokens += used_token
+        sub_queries = self._generate_sub_queries(original_query)
         if not sub_queries:
             log.color_print("No sub queries were generated by the LLM. Exiting.")
-            return [], total_tokens, {}
+            return [], {}
         else:
             log.color_print(
                 f"<think> Break down the original query into new sub queries: {sub_queries} </think>\n"
@@ -272,8 +272,7 @@ class DeepSearch(RAGAgent):
             search_results = await asyncio.gather(*search_tasks)
             # Merge all results
             for result in search_results:
-                search_res, consumed_token = result
-                total_tokens += consumed_token
+                search_res = result
                 search_res_from_vectordb.extend(search_res)
             search_res_from_vectordb = deduplicate_results(search_res_from_vectordb)
@@ -284,10 +283,9 @@ class DeepSearch(RAGAgent):
                 break
             ### REFLECTION & GET GAP QUERIES ###
             log.color_print("<think> Reflecting on the search results... </think>\n")
-            sub_gap_queries, consumed_token = self._generate_gap_queries(
+            sub_gap_queries = self._generate_gap_queries(
                 original_query, all_sub_queries, all_search_res
             )
-            total_tokens += consumed_token
             if not sub_gap_queries or len(sub_gap_queries) == 0:
                 log.color_print("<think> No new search queries were generated. Exiting. </think>\n")
                 break
@@ -299,9 +297,9 @@ class DeepSearch(RAGAgent):
         all_search_res = deduplicate_results(all_search_res)
         additional_info = {"all_sub_queries": all_sub_queries}
-        return all_search_res, total_tokens, additional_info
+        return all_search_res, additional_info

-    def query(self, query: str, **kwargs) -> Tuple[str, List[RetrievalResult], int]:
+    def query(self, query: str, **kwargs) -> tuple[str, list[RetrievalResult]]:
         """
         Query the agent and generate an answer based on retrieved documents.
@@ -316,11 +314,10 @@ class DeepSearch(RAGAgent):
             Tuple[str, List[RetrievalResult], int]: A tuple containing:
                 - The generated answer
                 - A list of retrieved document results
-                - The total token usage
         """
-        all_retrieved_results, n_token_retrieval, additional_info = self.retrieve(query, **kwargs)
+        all_retrieved_results, additional_info = self.retrieve(query, **kwargs)
         if not all_retrieved_results or len(all_retrieved_results) == 0:
-            return f"No relevant information found for query '{query}'.", [], n_token_retrieval
+            return f"No relevant information found for query '{query}'.", []
         all_sub_queries = additional_info["all_sub_queries"]
         chunk_texts = []
         for chunk in all_retrieved_results:
@@ -336,16 +333,12 @@ class DeepSearch(RAGAgent):
             mini_questions=all_sub_queries,
             mini_chunk_str=self._format_chunk_texts(chunk_texts),
         )
-        chat_response = self.llm.chat([{"role": "user", "content": summary_prompt}])
+        response = self.llm.chat([{"role": "user", "content": summary_prompt}])
         log.color_print("\n==== FINAL ANSWER====\n")
-        log.color_print(self.llm.remove_think(chat_response.content))
-        return (
-            self.llm.remove_think(chat_response.content),
-            all_retrieved_results,
-            n_token_retrieval + chat_response.total_tokens,
-        )
+        log.color_print(self.llm.remove_think(response))
+        return self.llm.remove_think(response), all_retrieved_results

-    def _format_chunk_texts(self, chunk_texts: List[str]) -> str:
+    def _format_chunk_texts(self, chunk_texts: list[str]) -> str:
         chunk_str = ""
         for i, chunk in enumerate(chunk_texts):
             chunk_str += f"""<chunk_{i}>\n{chunk}\n</chunk_{i}>\n"""
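For reference, a minimal caller-side sketch of the return shapes after this change. The `agent` argument is assumed to be an already-configured DeepSearch instance (its construction, which needs an LLM, embedding model, and vector DB, is not shown); the function name is illustrative only.

def show_new_return_shapes(agent: "DeepSearch") -> None:
    # query() now returns (answer, retrieved_results) instead of
    # (answer, retrieved_results, total_tokens).
    answer, results = agent.query("What is DeepSearcher?")

    # retrieve() now returns (retrieved_results, additional_info) instead of
    # (retrieved_results, total_tokens, additional_info); max_iter is still
    # accepted as a keyword argument.
    results, info = agent.retrieve("What is DeepSearcher?", max_iter=2)

    print(answer)
    print(info["all_sub_queries"])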