File size: 7,132 Bytes
b699122
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
"""Common classes/functions for tree index operations."""


import asyncio
import logging
from typing import Dict, List, Optional, Sequence, Tuple

from gpt_index.async_utils import run_async_tasks
from gpt_index.data_structs.data_structs_v2 import IndexGraph
from gpt_index.data_structs.node_v2 import Node
from gpt_index.docstore import DocumentStore
from gpt_index.indices.service_context import ServiceContext
from gpt_index.indices.utils import get_sorted_node_list, truncate_text
from gpt_index.prompts.prompts import SummaryPrompt

# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)


class GPTTreeIndexBuilder:
    """GPT tree index builder.

    Helper class to build the tree-structured index,
    or to synthesize an answer.

    Nodes are grouped into chunks of at most ``num_children``. Each chunk is
    summarized through the service context's LLM predictor and the summary
    becomes a parent node. This repeats level by level until at most
    ``num_children`` nodes remain; those are recorded as the root nodes of
    the index graph.

    """

    def __init__(
        self,
        num_children: int,
        summary_prompt: SummaryPrompt,
        service_context: ServiceContext,
        docstore: Optional[DocumentStore] = None,
        use_async: bool = False,
    ) -> None:
        """Initialize with params.

        Args:
            num_children: Maximum number of nodes grouped under one parent.
                Must be at least 2.
            summary_prompt: Prompt handed to the LLM predictor when
                summarizing a chunk of nodes.
            service_context: Supplies the ``prompt_helper``,
                ``llm_predictor`` and ``llama_logger`` used while building.
            docstore: Store that nodes are looked up in and that newly
                created parent nodes are added to. A new ``DocumentStore``
                is created when this is ``None``.
            use_async: When true, ``build_index_from_nodes`` requests the
                summaries via ``apredict`` and ``run_async_tasks`` instead
                of calling ``predict`` once per chunk.

        Raises:
            ValueError: If ``num_children`` is less than 2.

        """
        if num_children < 2:
            raise ValueError("Invalid number of children.")
        self.num_children = num_children
        self.summary_prompt = summary_prompt
        self._service_context = service_context
        self._use_async = use_async
        # Compare against None instead of relying on truthiness, so that a
        # caller-supplied store is never swapped out just because it
        # happens to evaluate falsy.
        self._docstore = docstore if docstore is not None else DocumentStore()

    @property
    def docstore(self) -> DocumentStore:
        """Return docstore."""
        return self._docstore

    def build_from_nodes(
        self,
        nodes: Sequence[Node],
        build_tree: bool = True,
    ) -> IndexGraph:
        """Build an index graph from nodes.

        Every node is inserted into a fresh ``IndexGraph``. When
        ``build_tree`` is true, summary levels are then built on top of
        those nodes via ``build_index_from_nodes``.

        NOTE: this method does not add ``nodes`` to the docstore, while the
        tree build looks nodes up in the docstore by id.

        Args:
            nodes: Nodes to insert into the graph.
            build_tree: Whether to build parent levels above the nodes.

        Returns:
            IndexGraph: graph object consisting of all_nodes, root_nodes

        """
        index_graph = IndexGraph()
        for node in nodes:
            index_graph.insert(node)

        if not build_tree:
            return index_graph
        return self.build_index_from_nodes(
            index_graph, index_graph.all_nodes, index_graph.all_nodes, level=0
        )

    def _prepare_node_and_text_chunks(
        self, cur_node_ids: Dict[int, str]
    ) -> Tuple[List[int], List[List[Node]], List[str]]:
        """Prepare node and text chunks.

        Fetches the nodes named by ``cur_node_ids`` from the docstore,
        orders them with ``get_sorted_node_list`` and slices the result
        into consecutive groups of at most ``num_children`` nodes.

        Args:
            cur_node_ids: Mapping of graph index to node id for the level
                being consolidated.

        Returns:
            Three parallel lists: the start offset of each chunk in the
            sorted node list, the nodes of each chunk, and the text the
            prompt helper produced for each chunk.

        """
        cur_nodes = {
            index: self._docstore.get_node(node_id)
            for index, node_id in cur_node_ids.items()
        }
        cur_node_list = get_sorted_node_list(cur_nodes)
        # Ceiling division: a trailing partial chunk is still a chunk.
        num_chunks = (len(cur_node_list) + self.num_children - 1) // self.num_children
        logger.info(f"> Building index from nodes: {num_chunks} chunks")
        indices: List[int] = []
        cur_nodes_chunks: List[List[Node]] = []
        text_chunks: List[str] = []
        for i in range(0, len(cur_node_list), self.num_children):
            cur_nodes_chunk = cur_node_list[i : i + self.num_children]
            text_chunk = self._service_context.prompt_helper.get_text_from_nodes(
                cur_nodes_chunk, prompt=self.summary_prompt
            )
            indices.append(i)
            cur_nodes_chunks.append(cur_nodes_chunk)
            text_chunks.append(text_chunk)
        return indices, cur_nodes_chunks, text_chunks

    def _construct_parent_nodes(
        self,
        index_graph: IndexGraph,
        indices: List[int],
        cur_nodes_chunks: List[List[Node]],
        summaries: List[str],
    ) -> Dict[int, str]:
        """Construct parent nodes.

        Creates one node per summary, inserts it into ``index_graph`` with
        the matching chunk as its children, and saves it to the docstore.

        Args:
            index_graph: Graph that receives the new parent nodes.
            indices: Start offset of each chunk (used for logging only).
            cur_nodes_chunks: Child nodes of each parent, one list per chunk.
            summaries: Summary text of each chunk, parallel to the chunks.

        Returns:
            Mapping of graph index to node id for the new parent nodes.

        """
        # Denominator for the progress log: number of nodes at this level.
        total_nodes = sum(len(chunk) for chunk in cur_nodes_chunks)
        new_node_dict: Dict[int, str] = {}
        for i, cur_nodes_chunk, new_summary in zip(
            indices, cur_nodes_chunks, summaries
        ):
            logger.debug(
                f"> {i}/{total_nodes}, "
                f"summary: {truncate_text(new_summary, 50)}"
            )
            new_node = Node(
                text=new_summary,
            )
            index_graph.insert(new_node, children_nodes=cur_nodes_chunk)
            index = index_graph.get_index(new_node)
            new_node_dict[index] = new_node.get_doc_id()
            self._docstore.add_documents([new_node], allow_update=False)
        return new_node_dict

    def _summary_tasks(self, text_chunks: List[str]) -> list:
        """Create one ``apredict`` awaitable per text chunk."""
        return [
            self._service_context.llm_predictor.apredict(
                self.summary_prompt, context_str=text_chunk
            )
            for text_chunk in text_chunks
        ]

    def _add_parent_level(
        self,
        index_graph: IndexGraph,
        all_node_ids: Dict[int, str],
        indices: List[int],
        cur_nodes_chunks: List[List[Node]],
        summaries: List[str],
        level: int,
    ) -> Dict[int, str]:
        """Log the summaries of one level and register its parent nodes.

        The new parents are merged into ``all_node_ids`` (mutated in place)
        and set as the graph's current root nodes.

        Returns:
            Mapping of graph index to node id for the new parent nodes.

        """
        self._service_context.llama_logger.add_log(
            {"summaries": summaries, "level": level}
        )
        new_node_dict = self._construct_parent_nodes(
            index_graph, indices, cur_nodes_chunks, summaries
        )
        all_node_ids.update(new_node_dict)
        index_graph.root_nodes = new_node_dict
        return new_node_dict

    def build_index_from_nodes(
        self,
        index_graph: IndexGraph,
        cur_node_ids: Dict[int, str],
        all_node_ids: Dict[int, str],
        level: int = 0,
    ) -> IndexGraph:
        """Consolidates chunks recursively, in a bottoms-up fashion.

        Args:
            index_graph: Graph being built; updated in place.
            cur_node_ids: Mapping of graph index to node id for the level
                to consolidate.
            all_node_ids: Mapping of every node seen so far; updated in
                place with each new parent level.
            level: Depth of the current level, recorded in the llama logger.

        Returns:
            The same ``index_graph``, with ``root_nodes`` set to the top
            level reached.

        """
        if len(cur_node_ids) <= self.num_children:
            # Store a copy: build_from_nodes passes index_graph.all_nodes
            # here, and sharing that object would make any later in-place
            # change to it show up in root_nodes as well.
            index_graph.root_nodes = dict(cur_node_ids)
            return index_graph

        indices, cur_nodes_chunks, text_chunks = self._prepare_node_and_text_chunks(
            cur_node_ids
        )

        # The first element of each prediction result is the summary text.
        if self._use_async:
            outputs: List[Tuple[str, str]] = run_async_tasks(
                self._summary_tasks(text_chunks)
            )
            summaries = [output[0] for output in outputs]
        else:
            summaries = [
                self._service_context.llm_predictor.predict(
                    self.summary_prompt, context_str=text_chunk
                )[0]
                for text_chunk in text_chunks
            ]

        new_node_dict = self._add_parent_level(
            index_graph, all_node_ids, indices, cur_nodes_chunks, summaries, level
        )

        if len(new_node_dict) <= self.num_children:
            return index_graph
        return self.build_index_from_nodes(
            index_graph, new_node_dict, all_node_ids, level=level + 1
        )

    async def abuild_index_from_nodes(
        self,
        index_graph: IndexGraph,
        cur_node_ids: Dict[int, str],
        all_node_ids: Dict[int, str],
        level: int = 0,
    ) -> IndexGraph:
        """Consolidates chunks recursively, in a bottoms-up fashion.

        Coroutine counterpart of ``build_index_from_nodes``: the summaries
        of one level are awaited together with ``asyncio.gather``. The
        ``use_async`` constructor flag is not consulted here.

        Args:
            index_graph: Graph being built; updated in place.
            cur_node_ids: Mapping of graph index to node id for the level
                to consolidate.
            all_node_ids: Mapping of every node seen so far; updated in
                place with each new parent level.
            level: Depth of the current level, recorded in the llama logger.

        Returns:
            The same ``index_graph``, with ``root_nodes`` set to the top
            level reached.

        """
        if len(cur_node_ids) <= self.num_children:
            # Store a copy so root_nodes never shares the caller's mapping
            # object (see build_from_nodes, which passes all_nodes).
            index_graph.root_nodes = dict(cur_node_ids)
            return index_graph

        indices, cur_nodes_chunks, text_chunks = self._prepare_node_and_text_chunks(
            cur_node_ids
        )

        outputs: List[Tuple[str, str]] = await asyncio.gather(
            *self._summary_tasks(text_chunks)
        )
        # The first element of each prediction result is the summary text.
        summaries = [output[0] for output in outputs]

        new_node_dict = self._add_parent_level(
            index_graph, all_node_ids, indices, cur_nodes_chunks, summaries, level
        )

        if len(new_node_dict) <= self.num_children:
            return index_graph
        return await self.abuild_index_from_nodes(
            index_graph, new_node_dict, all_node_ids, level=level + 1
        )