Spaces:
Paused
Paused
File size: 6,492 Bytes
89cbc4d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 |
#####################################################
### DOCUMENT PROCESSOR [MERGER]
#####################################################
# Jonathan Wang
# ABOUT:
# This project creates an app to chat with PDFs.
# This is the MERGER
# which defines how two lists with scores
# should be merged together into one list.
# (Useful for fusing things like keywords or textnodes)
#####################################################
## TODOS:
# We're looping through A/B more than necessary.
#####################################################
## IMPORTS:
from __future__ import annotations
from typing import TYPE_CHECKING, Sequence, TypeVar, Union
import numpy as np
if TYPE_CHECKING:
from numpy.typing import NDArray
#####################################################
## CODE:
GenericType = TypeVar("GenericType")
### TODO(Jonathan Wang): Implement Maximum Marginal Relevance (MMR)
# https://en.wikipedia.org/wiki/Maximum_marginal_relevance
# def mmr(documents, query, scores, lambda_param=0.5):
# """
# Calculate Maximum Marginal Relevance (MMR) for a list of documents.
# Parameters:
# documents (list of np.array): List of document vectors.
# query (np.array): Query vector.
# scores (list of float): Relevance scores for each document.
# lambda_param (float): Trade-off parameter between relevance and diversity.
# Returns:
# list of int: Indices of selected documents in order of selection.
# """
# selected = []
# remaining = list(range(len(documents)))
# while remaining:
# if not selected:
# # Select the document with the highest relevance score
# idx = np.argmax(scores)
# else:
# # Calculate MMR for remaining documents
# mmr_scores = []
# for i in remaining:
# relevance = scores[i]
# diversity = max([np.dot(documents[i], documents[j]) for j in selected])
# mmr_score = lambda_param * relevance - (1 - lambda_param) * diversity
# mmr_scores.append(mmr_score)
# idx = remaining[np.argmax(mmr_scores)]
# selected.append(idx)
# remaining.remove(idx)
# return selected
def _merge_on_scores(
a_list: Sequence[GenericType],
b_list: Sequence[GenericType],
a_scores_input: Sequence[float | np.float64 | None],
b_scores_input: Sequence[float | np.float64 | None],
use_distribution: bool = True,
a_weight: float = 0.5,
top_k: int = 5,
) -> Sequence[GenericType]:
"""
Given two lists of elements with scores, fuse them together using "Distribution-Based Score Fusion".
Elements which have high scores in both lists are given even higher ranking here.
Inputs:
a_list: list of elements for A
a_scores: list of scores for each element in A. Assume higher is better. Share the same index.
b_list: list of elements for B
b_scores: list of scores for each element in B. Assume higher is better. Share the same index.
use_distribution: Whether to fuse using Min-Max Scaling (FALSE) or Distribution Based Score Fusion (TRUE)
Outputs:
List: List of elements that passed the merge.
"""
# Guard Clauses
if ((len(a_list) != len(a_scores_input)) or (len(b_list) != len(b_scores_input))):
msg = (
f"""_merge_on_scores: Differing number of elements and scores!
a_list: {a_list}
a_scores: {a_scores_input}
b_list: {b_list}
b_scores: {b_scores_input}
"""
)
raise ValueError(msg)
if (a_weight > 1 or a_weight < 0):
msg = "_merge_on_scores: weight for the A list should be between 0 and 1."
raise ValueError(msg)
if (top_k < 0): # or top_k > :
# TODO(Jonathan Wang): Find a nice way to get the number of unique elements in a list
# where those elements are potentially unhashable AND unorderable.
# I know about the n^2 solution with two lists and (if not in x), but it's a bit annoying.
msg = "_merge_on_scores: top_k must be between 0 and the total number of elements."
raise ValueError(msg)
# 0. Convert to numpy arrays
# NOTE: When using a SubQuestionQueryEngine, the subanswers are saved as NodesWithScores, but their score is None.
# We want to filter these out, so we get citations when the two texts are very similar.
a_scores: NDArray[np.float64] = np.array(a_scores_input, dtype=np.float64)
b_scores: NDArray[np.float64] = np.array(b_scores_input, dtype=np.float64)
# 1. Calculate mean of scores.
a_mean = np.nanmean(a_scores) # np.nan if empty
b_mean = np.nanmean(b_scores)
# 2. Calculate standard deviations
a_stdev = np.nanstd(a_scores)
b_stdev = np.nanstd(b_scores)
# 3. Get minimum and maximum bands as 3std from mean
# alternatively, use actual min-max scaling
a_min = a_mean - 3 * a_stdev if use_distribution else np.nanmin(a_scores)
a_max = a_mean + 3 * a_stdev if use_distribution else np.nanmax(a_scores)
b_min = b_mean - 3 * b_stdev if use_distribution else np.nanmin(b_scores)
b_max = b_mean + 3 * b_stdev if use_distribution else np.nanmax(b_scores)
# 4. Rescale the distributions
if (a_max > a_min):
a_scores = np.array([
((x - a_min) / (a_max - a_min))
for x in a_scores
], dtype=np.float64)
if (b_max > b_min):
b_scores = np.array([
(x - b_min) / (b_max - b_min)
for x in b_scores
], dtype=np.float64)
# 5. Fuse the scores together
full_dict: list[tuple[GenericType, float]] = []
for index, element in enumerate(a_list):
a_score = a_scores[index]
if (element in b_list):
# In both A and B. Fuse score.
b_score = b_scores[b_list.index(element)]
fused_score = a_weight * a_score + (1-a_weight) * b_score
full_dict.append((element, fused_score))
else:
# Only in A.
full_dict.append((element, a_weight * a_score))
for index, element in enumerate(b_list):
if (element not in a_list):
b_score = b_scores[index]
full_dict.append((element, (1-a_weight) * b_score))
full_dict = sorted(full_dict, key=lambda item: item[1], reverse=True)
output_list = [item[0] for item in full_dict]
if (top_k >= len(full_dict)):
return output_list
# create final response object
return output_list[:top_k]
|