
Optimize the iterative deduplication logic

Branch: main
Author: tanxing, 6 days ago
Commit: 5733d25d65

Changed files:
  1. deepsearcher/agent/deep_search.py (116 lines changed)
  2. deepsearcher/config.yaml (2 lines changed)

deepsearcher/agent/deep_search.py (116 lines changed)

--- a/deepsearcher/agent/deep_search.py
+++ b/deepsearcher/agent/deep_search.py
@@ -8,10 +8,10 @@ from deepsearcher.vector_db.base import BaseVectorDB, deduplicate
 COLLECTION_ROUTE_PROMPT = """
 I provide you with collection_name(s) and corresponding collection_description(s).
-Please select the collection names that may be related to the question and return a python list of str.
-If there is no collection related to the question, you can return an empty list.
+Please select the collection names that may be related to the query and return a python list of str.
+If there is no collection related to the query, you can return an empty list.
-"QUESTION": {question}
+"Query": {query}
 "COLLECTION_INFO": {collection_info}
 When you return, you can ONLY return a json convertible python list of str, WITHOUT any other additional content.
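Reviewer note: the rename also changes the keyword that `str.format` expects, so any call site still passing `question=` would now raise `KeyError` — which is why the call site further down is updated to `query=query`. A minimal sketch, using a trimmed stand-in for the template above:

```python
# Trimmed stand-in for COLLECTION_ROUTE_PROMPT (illustration only).
template = '"Query": {query}\n"COLLECTION_INFO": {collection_info}'

prompt = template.format(
    query="How does the agent deduplicate retrieved chunks?",
    collection_info=[{"collection_name": "docs", "collection_description": "project docs"}],
)
# template.format(question=...) would raise KeyError('query') after this rename.
```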
@@ -21,7 +21,7 @@ Your selected collection name list is:
 SUB_QUERY_PROMPT = """
 To answer this question more comprehensively, please break down the original question into a few sub-questions
-(the less the better, but more if nesscessary to ensure the coverage of answering the original question).
+(the fewer the better, but more if necessary to ensure answering the original question).
 If this is a very simple question and no decomposition is necessary, then keep only the original question.
 Make sure each sub-question is clear, concise and atomic.
 Return as a list of str in python style and json convertible.
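The prompt asks the model for a list of str that is both valid Python and valid JSON; the code below parses it with `self.llm.literal_eval`, whose implementation is not part of this diff. A sketch of what a well-formed reply looks like, with `ast.literal_eval` as a stand-in parser:

```python
import ast

# A well-formed model reply per SUB_QUERY_PROMPT.
raw = '["What is a gap query?", "How are duplicates removed across iterations?"]'
sub_queries = ast.literal_eval(raw)  # parses the Python/JSON list literal
assert all(isinstance(q, str) for q in sub_queries)
```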
@@ -64,15 +64,15 @@ Determine whether additional search queries are needed based on the original query
 If the returned chunks do not cover all previous sub-queries, it means that no related documents could be retrieved.
 In this case, try to generate similar but slightly different queries to the previous sub-queries.
 And if further research is needed based on the new information those chunks provide, give more queries based on them.
-(which is prefered, even if the previous sub-queries can be well answered by retrieved chunks, but ultimately according to your judge)
+(which is preferred, but ultimately according to your judgment)
 If no further research is needed, return an empty list.
-Original Query: {question}
-Previous Sub Queries: {mini_questions}
+Original Query: {original_query}
+Previous Sub Queries: {all_sub_queries}
 Related Chunks:
-{mini_chunk_str}
+{chunks}
 Respond exclusively in valid List of str format without any other text."""
@@ -83,13 +83,14 @@ Please generate a long, specific and detailed answer or report based on the previous
 If the chunks are not enough to answer the query or additional information is needed to enhance the content, you should answer with your own knowledge.
 In this case, mark the part(s) generated from your own knowledge with <unref>your knowledge here</unref>.
 (Don't place <unref></unref> part(s) individually into one paragraph, but insert them at the proper places in the report.)
 Plus, you should give references in the report where you quote from the chunks using markdown links, and give a list of references at the end of the report.
-Original Query: {question}
-Previous Sub Queries: {mini_questions}
+Original Query: {original_query}
+Previous Sub Queries: {all_sub_queries}
 Related Chunks:
-{mini_chunk_str}
+{chunks}
 """
@@ -165,7 +166,7 @@ class DeepSearch(BaseAgent):
             )
             return [the_only_collection]
         vector_db_search_prompt = COLLECTION_ROUTE_PROMPT.format(
-            question=query,
+            query=query,
             collection_info=[
                 {
                     "collection_name": collection_info.collection_name,
@@ -284,21 +285,27 @@ class DeepSearch(BaseAgent):
         )
         return all_retrieved_results

-    def _generate_gap_queries(
-        self, original_query: str, all_sub_queries: list[str], all_chunks: list[RetrievalResult]
+    def _generate_more_sub_queries(
+        self, original_query: str, all_sub_queries: list[str], all_retrieved_results: list[RetrievalResult]
     ) -> list[str]:
+        chunks = []
+        for i, chunk in enumerate(all_retrieved_results):
+            if self.text_window_splitter and "wider_text" in chunk.metadata:
+                chunks.append(chunk.metadata["wider_text"])
+            else:
+                chunks.append(f'''<chunk {i + 1}>{chunk.text}</chunk {i + 1}><reference {i + 1}>{chunk.reference}</reference {i + 1}>''')
         reflect_prompt = REFLECT_PROMPT.format(
-            question=original_query,
-            mini_questions=all_sub_queries,
-            mini_chunk_str=self._format_chunk_texts([chunk.text for chunk in all_chunks])
-            if len(all_chunks) > 0
+            original_query=original_query,
+            all_sub_queries=all_sub_queries,
+            chunks="\n".join(chunks)
+            if len(all_retrieved_results) > 0
             else "NO RELATED CHUNKS FOUND.",
         )
         response = self.llm.chat([{"role": "user", "content": reflect_prompt}])
         response = self.llm.remove_think(response)
         return self.llm.literal_eval(response)
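One readability note on the `REFLECT_PROMPT.format(...)` call: the `if/else` is a conditional expression bound to the `chunks=` keyword, not a statement. An equivalent, arguably clearer form (a suggestion, not part of the commit):

```python
# Equivalent to the chunks=... keyword above, written out explicitly.
chunks_str = "\n".join(chunks) if all_retrieved_results else "NO RELATED CHUNKS FOUND."
reflect_prompt = REFLECT_PROMPT.format(
    original_query=original_query,
    all_sub_queries=all_sub_queries,
    chunks=chunks_str,
)
```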

-    def retrieve(self, original_query: str, **kwargs) -> tuple[list[RetrievalResult], dict]:
+    def retrieve(self, original_query: str, **kwargs) -> tuple[list[RetrievalResult], list[str]]:
         """
         Retrieve relevant documents from the knowledge base for the given query.
@@ -317,7 +324,7 @@ class DeepSearch(BaseAgent):
         max_iter = kwargs.pop("max_iter", self.max_iter)
         ### SUB QUERIES ###
         log.color_print(f"<query> {original_query} </query>\n")
-        all_search_res = []
+        all_search_results = []
         all_sub_queries = []
         sub_queries = self._generate_sub_queries(original_query)
@@ -327,48 +334,44 @@ class DeepSearch(BaseAgent):
         else:
             log.color_print(f"</think> Break down the original query into new sub queries: {sub_queries} ")
         all_sub_queries.extend(sub_queries)
-        sub_gap_queries = sub_queries
-        for iter in range(max_iter):
-            log.color_print(f">> Iteration: {iter + 1}\n")
-            search_res_from_vectordb = []
-            # search_res_from_internet = [] # TODO
+        for it in range(max_iter):
+            log.color_print(f">> Iteration: {it + 1}\n")
             # Execute all search tasks sequentially
-            for query in sub_gap_queries:
+            for query in sub_queries:
                 result = self._search_chunks_from_vectordb(query)
-                search_res_from_vectordb.extend(result)
-            undedup_len = len(search_res_from_vectordb)
-            search_res_from_vectordb = deduplicate(search_res_from_vectordb)
-            deduped_len = len(search_res_from_vectordb)
-            if undedup_len - deduped_len != 0:
+                all_search_results.extend(result)
+            undeduped_len = len(all_search_results)
+            all_search_results = deduplicate(all_search_results)
+            deduped_len = len(all_search_results)
+            if undeduped_len - deduped_len != 0:
                 log.color_print(
-                    f"<search> Removed {undedup_len - deduped_len} duplicates </search>"
+                    f"<search> Removed {undeduped_len - deduped_len} duplicates </search> "
                 )
-            # search_res_from_internet = deduplicate_results(search_res_from_internet)
-            # all_search_res.extend(search_res_from_vectordb + search_res_from_internet)
-            all_search_res.extend(search_res_from_vectordb)
-            if iter == max_iter - 1:
+            if it == max_iter - 1:
                 log.color_print("</think> Exceeded maximum iterations. Exiting. ")
                 break
-            ### REFLECTION & GET GAP QUERIES ###
+            ### REFLECTION & GET MORE SUB QUERIES ###
             log.color_print("</think> Reflecting on the search results... ")
-            sub_gap_queries = self._generate_gap_queries(
-                original_query, all_sub_queries, all_search_res
+            sub_queries = self._generate_more_sub_queries(
+                original_query, all_sub_queries, all_search_results
             )
-            if not sub_gap_queries or len(sub_gap_queries) == 0:
+            if not sub_queries or len(sub_queries) == 0:
                 log.color_print("</think> No new search queries were generated. Exiting. ")
                 break
             else:
                 log.color_print(
-                    f"</think> New search queries for next iteration: {sub_gap_queries} ")
-                all_sub_queries.extend(sub_gap_queries)
+                    f"</think> New search queries for next iteration: {sub_queries} ")
+                all_sub_queries.extend(sub_queries)
-        all_search_res = deduplicate(all_search_res)
-        additional_info = {"all_sub_queries": all_sub_queries}
-        return all_search_res, additional_info
+        all_search_results = deduplicate(all_search_results)
+        return all_search_results, all_sub_queries
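This is the core of the commit: instead of deduplicating each iteration's batch into a separate `search_res_from_vectordb` list and then extending a second accumulator, the loop now extends `all_search_results` directly and deduplicates the whole accumulator every iteration, so chunks re-retrieved by overlapping sub-queries are dropped as soon as they appear. `deduplicate` is imported from `deepsearcher.vector_db.base` and its implementation is not shown in this diff; a plausible minimal version keyed on chunk text might look like:

```python
# Hypothetical sketch only -- the real deduplicate() lives in
# deepsearcher.vector_db.base and may key on other fields (e.g. reference).
def deduplicate(results: list) -> list:
    seen: set[str] = set()
    unique = []
    for r in results:
        if r.text not in seen:  # assumes each RetrievalResult has a .text field
            seen.add(r.text)
            unique.append(r)
    return unique
```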

-    def query(self, query: str, **kwargs) -> tuple[str, list[RetrievalResult]]:
+    def query(self, original_query: str, **kwargs) -> tuple[str, list[RetrievalResult]]:
         """
         Query the agent and generate an answer based on retrieved documents.
@@ -384,31 +387,24 @@ class DeepSearch(BaseAgent):
             - The generated answer
             - A list of retrieved document results
         """
-        all_retrieved_results, additional_info = self.retrieve(query, **kwargs)
+        all_retrieved_results, all_sub_queries = self.retrieve(original_query, **kwargs)
         if not all_retrieved_results or len(all_retrieved_results) == 0:
-            return f"No relevant information found for query '{query}'.", []
-        all_sub_queries = additional_info["all_sub_queries"]
-        chunk_texts = []
-        for chunk in all_retrieved_results:
+            return f"No relevant information found for query '{original_query}'.", []
+        chunks = []  # type: list[str]
+        for i, chunk in enumerate(all_retrieved_results):
             if self.text_window_splitter and "wider_text" in chunk.metadata:
-                chunk_texts.append(chunk.metadata["wider_text"])
+                chunks.append(chunk.metadata["wider_text"])
             else:
-                chunk_texts.append(chunk.text)
+                chunks.append(f'''<chunk {i + 1}>{chunk.text}</chunk {i + 1}><reference {i + 1}>{chunk.reference}</reference {i + 1}>''')
         log.color_print(
             f"<think> Summarize answer from all {len(all_retrieved_results)} retrieved chunks... </think>\n"
         )
         summary_prompt = SUMMARY_PROMPT.format(
-            question=query,
-            mini_questions=all_sub_queries,
-            mini_chunk_str=self._format_chunk_texts(chunk_texts),
+            original_query=original_query,
+            all_sub_queries=all_sub_queries,
+            chunks="\n".join(chunks)
         )
         response = self.llm.chat([{"role": "user", "content": summary_prompt}])
         log.color_print("\n==== FINAL ANSWER====\n")
         log.color_print(self.llm.remove_think(response))
         return self.llm.remove_think(response), all_retrieved_results
-
-    def _format_chunk_texts(self, chunk_texts: list[str]) -> str:
-        chunk_str = ""
-        for i, chunk in enumerate(chunk_texts):
-            chunk_str += f"""<chunk_{i + 1}>\n{chunk}\n</chunk_{i + 1}>\n"""
-        return chunk_str
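With `_format_chunk_texts` removed, chunk tagging now happens inline in both `_generate_more_sub_queries` and `query`, and each chunk carries its reference so the model can emit the markdown citations that SUMMARY_PROMPT asks for. A standalone sketch of the new format, with sample data not taken from the repo:

```python
texts = ["Milvus is a vector database.", "HNSW is a graph-based index."]
refs = ["docs/milvus.md", "docs/hnsw.md"]

# Mirrors the inline f-string above: text and reference share one tagged block.
chunks = [
    f"<chunk {i + 1}>{t}</chunk {i + 1}><reference {i + 1}>{r}</reference {i + 1}>"
    for i, (t, r) in enumerate(zip(texts, refs))
]
print("\n".join(chunks))
```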

deepsearcher/config.yaml (2 lines changed)

--- a/deepsearcher/config.yaml
+++ b/deepsearcher/config.yaml
@@ -80,7 +80,7 @@ provide_settings:
 #    port: 6333
 query_settings:
-  max_iter: 3
+  max_iter: 2
 load_settings:
   chunk_size: 2048
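Lowering the default to `max_iter: 2` caps the reflect-and-search loop at two passes, but callers can still raise it per query because `retrieve()` pops `max_iter` from `**kwargs` (see the hunk at line 317 above). A hypothetical call, assuming an already-configured `DeepSearch` instance named `agent`:

```python
# agent is a hypothetical, already-configured DeepSearch instance.
results, sub_queries = agent.retrieve(
    "How does DeepSearcher deduplicate retrieved chunks?",
    max_iter=4,  # overrides query_settings.max_iter from config.yaml
)
```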
