bbm25_haystack
1# SPDX-FileCopyrightText: 2024-present Yuxuan Wang <wangy49@seas.upenn.edu> 2# 3# SPDX-License-Identifier: Apache-2.0 4from bbm25_haystack.bbm25_retriever import BetterBM25Retriever 5from bbm25_haystack.bbm25_store import BetterBM25DocumentStore 6from bbm25_haystack.filters import apply_filters_to_document 7 8__all__ = [ 9 "BetterBM25DocumentStore", 10 "BetterBM25Retriever", 11 "apply_filters_to_document", 12]
class BetterBM25DocumentStore:
    """
    An in-memory BM25 document store intended to improve the default
    ``InMemoryDocumentStore`` shipped with Haystack.

    Documents are tokenized with a ``SentencePiece`` model, expanded into
    n-grams, and scored against queries with the BM25+ formula. The corpus
    statistics (per-n-gram document frequency and average document length)
    are maintained incrementally on every write and delete.
    """

    default_sp_file: Final = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), "default.model"
    )
    """@private"""

    def __init__(
        self,
        *,
        k: float = 1.5,
        b: float = 0.75,
        delta: float = 1.0,
        sp_file: Optional[str] = None,
        n_grams: Union[int, tuple[int, int]] = 1,
        haystack_filter_logic: bool = True,
    ) -> None:
        """
        Creates a new ``BetterBM25DocumentStore`` instance.

        :param k: k1 parameter in BM25+ formula.
        :type k: ``float``
        :param b: b parameter in BM25+ formula.
        :type b: ``float``
        :param delta: delta parameter in BM25+ formula.
        :type delta: ``float``
        :param sp_file: ``SentencePiece`` tokenizer ``.model`` file to
            use. A default from LLaMA-2-32K is used if not provided.
        :type sp_file: ``Optional[str]``
        :param n_grams: The n-gram window size. Can be a range of n-grams
            to include in text representation. If a single integer is
            provided, it will be treated as the maximum n-gram window size,
            which is equivalent to ``(1, n_grams)``.
        :type n_grams: ``Union[int, tuple[int, int]]``
        :param haystack_filter_logic: Whether to use the Haystack filter
            logic or the one implemented in this store.
        :type haystack_filter_logic: ``bool``

        :raises ValueError: If ``n_grams`` is not a positive integer or an
            ordered pair of positive integers.
        """
        self.k = k
        """@private"""

        self.b = b
        """@private"""

        self.delta = delta / (self.k + 1.0)
        """@private

        Adjust the delta value so that we can bring the ``(k1 + 1)``
        term out of the 'term frequency' term in BM25+ formula and
        delete it; this will not affect the ranking.
        """

        self._parse_sp_file(sp_file=sp_file)
        self._parse_n_grams(n_grams=n_grams)

        self._haystack_filter_logic = haystack_filter_logic
        self._filter_func = (
            document_matches_filter
            if self._haystack_filter_logic
            else apply_filters_to_document
        )

        # Corpus statistics, updated incrementally by write/delete.
        self._avg_doc_len: float = 0.0
        self._freq_doc: Counter = Counter()
        # doc id -> (document, n-gram counts of the document, n-gram total)
        self._index: dict[str, tuple[Document, dict[tuple[str], int], int]] = {}

    def _parse_sp_file(self, sp_file: Optional[str]) -> None:
        """
        Load the ``SentencePiece`` tokenizer, falling back to the bundled
        default model when ``sp_file`` is missing or cannot be loaded.

        The value passed by the caller is remembered unchanged so that
        ``to_dict`` can serialize it.

        :param sp_file: Path to a tokenizer ``.model`` file, or ``None``.
        :type sp_file: ``Optional[str]``
        """
        self._sp_file = sp_file

        if sp_file is None:
            self._sp_inst = SentencePieceProcessor(model_file=self.default_sp_file)
            return

        # ``os.path.isfile`` is already ``False`` for non-existent paths.
        if not os.path.isfile(sp_file):
            msg = (
                f"Tokenizer model file '{sp_file}' not accessible; "
                f"fallback to default {self.default_sp_file}."
            )
            logger.warning(msg)
            self._sp_inst = SentencePieceProcessor(model_file=self.default_sp_file)
            return

        try:
            self._sp_inst = SentencePieceProcessor(model_file=sp_file)
        except Exception as exc:
            # Deliberate best-effort: any load failure falls back to the default.
            msg = (
                f"Failed to load tokenizer model file '{sp_file}': {exc}; "
                f"fallback to default {self.default_sp_file}."
            )
            logger.error(msg)
            self._sp_inst = SentencePieceProcessor(model_file=self.default_sp_file)

    def _parse_n_grams(self, n_grams: Optional[Union[int, tuple[int, int]]]) -> None:
        """
        Validate ``n_grams`` and derive the inclusive n-gram size range.

        A two-element list is accepted as well as a tuple, because a
        serialization round trip may turn the tuple into a list.

        :param n_grams: Maximum n-gram size, or a ``(min, max)`` pair.
        :type n_grams: ``Union[int, tuple[int, int]]``

        :raises ValueError: If the value has the wrong type or shape, or
            the range is not ``1 <= min <= max``.
        """
        if isinstance(n_grams, int):
            n_min, n_max = 1, n_grams
        elif isinstance(n_grams, (tuple, list)):
            # Validate before unpacking so that malformed pairs get a clear error.
            if len(n_grams) != 2 or not all(isinstance(n, int) for n in n_grams):
                msg = f"Invalid n-gram window size: {n_grams}."
                raise ValueError(msg)
            n_min, n_max = n_grams
        else:
            msg = f"Invalid n-gram window size: {n_grams}; expected int or tuple."
            raise ValueError(msg)

        # An empty or non-positive range would silently produce no n-grams.
        if n_min < 1 or n_max < n_min:
            msg = f"Invalid n-gram window size: {n_grams}."
            raise ValueError(msg)

        self._n_grams = n_grams
        self._n_grams_min = n_min
        self._n_grams_max = n_max

    def _tokenize(self, texts: Union[str, list[str]]) -> list[list[tuple[str]]]:
        """
        Tokenize input text using SentencePiece model.

        The input text can either be a single string or a list of strings,
        such as a single user query or a group of raw documents. Each token
        sequence is expanded into the n-grams of every configured size.

        :param texts: Input text to tokenize, queries or documents.
        :type texts: ``Union[str, list[str]]``

        :return: Tokenized and n-gram augmented texts, one list per input.
        :rtype: ``list[list[tuple[str]]]``
        """

        def _augment_to_n_grams(tokens: list[str]) -> list[tuple[str]]:
            sizes = range(self._n_grams_min, self._n_grams_max + 1)
            return list(chain.from_iterable(_n_grams(tokens, n) for n in sizes))

        if isinstance(texts, str):
            texts = [texts]
        return [
            _augment_to_n_grams(tokens)
            for tokens in self._sp_inst.encode(texts, out_type=str)
        ]

    def _compute_bm25plus(
        self,
        query: str,
        documents: list[Document],
    ) -> list[tuple[Document, float]]:
        """
        Calculate the BM25+ score of the query against the given documents.

        The inverse document frequency is computed over the whole index,
        not only over the filtered ``documents``.

        :param query: Query to calculate the BM25+ score for.
        :type query: ``str``
        :param documents: Filtered pool of documents to retrieve from; each
            must be present in the index.
        :type documents: ``list[Document]``

        :return: Documents and corresponding BM25+ scores, in input order.
        :rtype: ``list[tuple[Document, float]]``
        """
        n_docs = len(self._index)
        idf = {}
        for ng in self._tokenize(query)[0]:
            n_containing = self._freq_doc.get(ng, 0)
            idf[ng] = math.log(1 + (n_docs - n_containing + 0.5) / (n_containing + 0.5))

        sim = []
        for doc in documents:
            _, freq, doc_len = self._index[doc.id]

            # The average is 0 when every stored document is empty; treat the
            # document as average-length instead of dividing by zero.
            if self._avg_doc_len > 0:
                doc_len_scaled = doc_len / self._avg_doc_len
            else:
                doc_len_scaled = 1.0

            # Depends only on the document, so compute it once per document.
            freq_damp = self.k * (1 + self.b * (doc_len_scaled - 1))

            scr = 0.0
            for token, idf_val in idf.items():
                freq_term = freq.get(token, 0.0)
                denom = freq_term + freq_damp
                # denom is 0 only when the term is absent and the damping
                # vanishes (e.g. empty document with b == 1).
                tf_val = (freq_term / denom if denom else 0.0) + self.delta
                scr += idf_val * tf_val

            sim.append((doc, scr))

        return sim

    def _retrieval(
        self,
        query: str,
        *,
        filters: Optional[dict[str, Any]] = None,
        top_k: Optional[int] = None,
    ) -> list[tuple[Document, float]]:
        """
        Retrieve documents from the store using the given query.

        :param query: Query to search for.
        :type query: ``str``
        :param filters: Filters to apply to the document list.
        :type filters: ``Optional[dict[str, Any]]``
        :param top_k: Number of documents to return; ``None`` returns all
            matching documents.
        :type top_k: ``Optional[int]``

        :return: Top ``k`` documents and corresponding BM25+ scores, sorted
            by descending score.
        :rtype: ``list[tuple[Document, float]]``
        """
        documents = self.filter_documents(filters)
        if not documents:
            return []

        sim = self._compute_bm25plus(query, documents)
        if top_k is None:
            return sorted(sim, key=lambda x: x[1], reverse=True)
        return heapq.nlargest(top_k, sim, key=lambda x: x[1])

    def count_documents(self) -> int:
        """
        Returns how many documents are present in this store.

        :return: Number of documents in the store.
        :rtype: ``int``
        """
        return len(self._index)

    def filter_documents(
        self, filters: Optional[dict[str, Any]] = None
    ) -> list[Document]:
        """
        Filter documents in the store using the given filters.

        :param filters: Filters to apply to the document list. ``None`` or
            an empty dictionary returns every document.
        :type filters: ``Optional[dict[str, Any]]``

        :return: List of documents that match the given filters.
        :rtype: ``list[Document]``
        """
        if not filters:
            return [doc for doc, _, _ in self._index.values()]
        return [
            doc
            for doc, _, _ in self._index.values()
            if self._filter_func(filters, doc)
        ]

    def write_documents(
        self,
        documents: list[Document],
        policy: DuplicatePolicy = DuplicatePolicy.NONE,
    ) -> int:
        """
        Writes (or overwrites) documents into the store.

        :param documents: List of documents to write.
        :type documents: ``list[Document]``
        :param policy: Documents with the same ``Document.id`` count as
            duplicates. When duplicates are met, the store can:
             - ``SKIP``: keep the existing document and ignore the new one.
             - ``OVERWRITE``: remove the old document and write the new one.
             - ``FAIL``: an error is raised (default behavior if not specified)
        :type policy: ``DuplicatePolicy``

        :raises ValueError: Raised on an invalid duplicate policy or when
            an item of ``documents`` is not a ``Document``.
        :raises DuplicateDocumentError: Raised on a duplicate document if
            ``policy=DuplicatePolicy.FAIL``.

        :return: Number of documents written.
        :rtype: ``int``
        """
        # ``x in EnumClass`` raises TypeError for non-members before Python
        # 3.12, so use isinstance to reliably raise the documented ValueError.
        if not isinstance(policy, DuplicatePolicy):
            msg = f"Invalid duplicate policy: {policy}."
            raise ValueError(msg)

        if policy == DuplicatePolicy.NONE:
            policy = DuplicatePolicy.FAIL

        n_written = 0
        for doc in documents:
            if not isinstance(doc, Document):
                msg = f"Expected document type, got '{doc}' of type '{type(doc)}'."
                raise ValueError(msg)

            if doc.id in self._index:
                if policy == DuplicatePolicy.SKIP:
                    continue
                if policy == DuplicatePolicy.FAIL:
                    msg = f"Document with ID '{doc.id}' already exists in the store."
                    raise DuplicateDocumentError(msg)

                # Overwrite if exists; delete first to keep the statistics consistent
                logger.debug(
                    "Document '%s' already exists in the store, overwriting.", doc.id
                )
                self.delete_documents([doc.id])

            # Fall back to a CSV dump of the dataframe when there is no text.
            content = doc.content or ""
            if content == "" and isinstance(doc.dataframe, pd.DataFrame):
                content = doc.dataframe.astype(str).to_csv(index=False)

            tokens = self._tokenize(content)[0]

            # Take the document count BEFORE inserting: the running average
            # is (old_avg * n_old + new_len) / (n_old + 1).
            n_before = len(self._index)

            self._index[doc.id] = (doc, Counter(tokens), len(tokens))
            self._freq_doc.update(set(tokens))
            self._avg_doc_len = (len(tokens) + self._avg_doc_len * n_before) / (
                n_before + 1
            )

            logger.debug("Document '%s' written to store.", doc.id)
            n_written += 1

        return n_written

    def delete_documents(self, document_ids: list[str]) -> int:
        """
        Deletes all documents with a matching ID.

        :param document_ids: List of ``object_id`` to delete
        :type document_ids: ``list[str]``

        :raises MissingDocumentError: Triggered on document not found.
            Documents listed before the missing one are already deleted.

        :return: Number of documents deleted.
        :rtype: ``int``
        """
        n_removal = 0
        for doc_id in document_ids:
            try:
                _, freq, doc_len = self._index.pop(doc_id)
            except KeyError as exc:
                msg = f"Document with ID '{doc_id}' not found, cannot delete it."
                raise MissingDocumentError(msg) from exc

            # Decrement document frequencies and drop exhausted entries so
            # that the vocabulary counter does not grow without bound.
            for ng in freq:
                remaining = self._freq_doc[ng] - 1
                if remaining > 0:
                    self._freq_doc[ng] = remaining
                else:
                    del self._freq_doc[ng]

            # ``len(self._index)`` is already the count after removal.
            n_left = len(self._index)
            if n_left:
                self._avg_doc_len = (
                    self._avg_doc_len * (n_left + 1) - doc_len
                ) / n_left
            else:
                self._avg_doc_len = 0.0

            logger.debug("Document '%s' deleted from store.", doc_id)
            n_removal += 1

        return n_removal

    def to_dict(self) -> dict[str, Any]:
        """
        Serializes this store to a dictionary.

        Only the constructor parameters are serialized; ``delta`` is scaled
        back to the value originally passed to ``__init__``.
        """
        return default_to_dict(
            self,
            k=self.k,
            b=self.b,
            delta=self.delta * (self.k + 1.0),  # Because we scaled it on init
            sp_file=self._sp_file,
            n_grams=self._n_grams,
            haystack_filter_logic=self._haystack_filter_logic,
        )

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "BetterBM25DocumentStore":
        """Deserializes the store from a dictionary."""
        return default_from_dict(cls, data)
An in-memory BM25 document store intended to improve the default
InMemoryDocumentStore shipped with Haystack.
def __init__(
    self,
    *,
    k: float = 1.5,
    b: float = 0.75,
    delta: float = 1.0,
    sp_file: Optional[str] = None,
    n_grams: Union[int, tuple[int, int]] = 1,
    haystack_filter_logic: bool = True,
) -> None:
    """
    Build an empty ``BetterBM25DocumentStore``.

    :param k: The ``k1`` parameter of the BM25+ formula.
    :type k: ``float``
    :param b: The ``b`` parameter of the BM25+ formula.
    :type b: ``float``
    :param delta: The ``delta`` parameter of the BM25+ formula.
    :type delta: ``float``
    :param sp_file: Path to the ``SentencePiece`` tokenizer ``.model``
        file. The bundled default (from LLaMA-2-32K) is used if omitted.
    :type sp_file: ``Optional[str]``
    :param n_grams: The n-gram window size, either a ``(min, max)`` range
        or a single integer; an integer ``n`` is shorthand for ``(1, n)``.
    :type n_grams: ``Union[int, tuple[int, int]]``
    :param haystack_filter_logic: ``True`` to filter with Haystack's own
        logic, ``False`` to use the logic implemented in this package.
    :type haystack_filter_logic: ``bool``
    """
    # Start with an empty index and neutral corpus statistics.
    self._index: dict[str, tuple[Document, dict[tuple[str], int], int]] = {}
    self._freq_doc: Counter = Counter()
    self._avg_doc_len: float = 0.0

    self.k = k
    """@private"""

    self.b = b
    """@private"""

    self.delta = delta / (k + 1.0)
    """@private

    Adjust the delta value so that we can bring the ``(k1 + 1)``
    term out of the 'term frequency' term in BM25+ formula and
    delete it; this will not affect the ranking.
    """

    self._parse_sp_file(sp_file=sp_file)
    self._parse_n_grams(n_grams=n_grams)

    self._haystack_filter_logic = haystack_filter_logic
    if haystack_filter_logic:
        self._filter_func = document_matches_filter
    else:
        self._filter_func = apply_filters_to_document
Creates a new BetterBM25DocumentStore instance.
Parameters
- k: k1 parameter in BM25+ formula.
- b: b parameter in BM25+ formula.
- delta: delta parameter in BM25+ formula.
- sp_file: `SentencePiece` tokenizer `.model` file to use. A default from LLaMA-2-32K is used if not provided.
- n_grams: The n-gram window size. Can be a range of n-grams to include in the text representation. If a single integer is provided, it is treated as the maximum n-gram window size, which is equivalent to `(1, n_grams)`.
- haystack_filter_logic: Whether to use the Haystack filter logic or the one implemented in this store.
def count_documents(self) -> int:
    """
    Report the number of documents currently held by the store.

    :return: Number of documents in the store.
    :rtype: ``int``
    """
    # One index entry exists per stored document.
    return len(self._index)
Returns how many documents are present in this store.
Returns
Number of documents in the store.
def filter_documents(
    self, filters: Optional[dict[str, Any]] = None
) -> list[Document]:
    """
    Return the stored documents that satisfy ``filters``.

    :param filters: Filters to apply to the document list. ``None`` or an
        empty dictionary selects every document.
    :type filters: ``Optional[dict[str, Any]]``

    :return: List of documents that match the given filters.
    :rtype: ``list[Document]``
    """
    keep_all = not filters
    selected = []
    for doc, _freq, _doc_len in self._index.values():
        # The filter function is only consulted when filters were given.
        if keep_all or self._filter_func(filters, doc):
            selected.append(doc)
    return selected
Filter documents in the store using the given filters.
Parameters
- filters: Filters to apply to the document list.
Returns
List of documents that match the given filters.
def write_documents(
    self,
    documents: list[Document],
    policy: DuplicatePolicy = DuplicatePolicy.NONE,
) -> int:
    """
    Writes (or overwrites) documents into the store.

    :param documents: List of documents to write.
    :type documents: ``list[Document]``
    :param policy: Documents with the same ``Document.id`` count as
        duplicates. When duplicates are met, the store can:
         - ``SKIP``: keep the existing document and ignore the new one.
         - ``OVERWRITE``: remove the old document and write the new one.
         - ``FAIL``: an error is raised (default behavior if not specified)
    :type policy: ``DuplicatePolicy``

    :raises ValueError: Raised on an invalid duplicate policy or when an
        item of ``documents`` is not a ``Document``.
    :raises DuplicateDocumentError: Raised on a duplicate document if
        ``policy=DuplicatePolicy.FAIL``.

    :return: Number of documents written.
    :rtype: ``int``
    """
    # ``x in EnumClass`` raises TypeError for non-members before Python 3.12,
    # so use isinstance to reliably raise the documented ValueError.
    if not isinstance(policy, DuplicatePolicy):
        msg = f"Invalid duplicate policy: {policy}."
        raise ValueError(msg)

    if policy == DuplicatePolicy.NONE:
        policy = DuplicatePolicy.FAIL

    n_written = 0
    for doc in documents:
        if not isinstance(doc, Document):
            msg = f"Expected document type, got '{doc}' of type '{type(doc)}'."
            raise ValueError(msg)

        if doc.id in self._index:
            if policy == DuplicatePolicy.SKIP:
                continue
            if policy == DuplicatePolicy.FAIL:
                msg = f"Document with ID '{doc.id}' already exists in the store."
                raise DuplicateDocumentError(msg)

            # Overwrite if exists; delete first to keep the statistics consistent
            logger.debug(
                "Document '%s' already exists in the store, overwriting.", doc.id
            )
            self.delete_documents([doc.id])

        # Fall back to a CSV dump of the dataframe when there is no text.
        content = doc.content or ""
        if content == "" and isinstance(doc.dataframe, pd.DataFrame):
            content = doc.dataframe.astype(str).to_csv(index=False)

        tokens = self._tokenize(content)[0]

        # Take the document count BEFORE inserting: the running average is
        # (old_avg * n_old + new_len) / (n_old + 1). Using the post-insert
        # count here skews the average (the first document would get len / 2).
        n_before = len(self._index)

        self._index[doc.id] = (doc, Counter(tokens), len(tokens))
        self._freq_doc.update(set(tokens))
        self._avg_doc_len = (len(tokens) + self._avg_doc_len * n_before) / (
            n_before + 1
        )

        logger.debug("Document '%s' written to store.", doc.id)
        n_written += 1

    return n_written
Writes (or overwrites) documents into the store.
Parameters
- documents: List of documents to write.
- policy: Documents with the same `Document.id` count as duplicates. When duplicates are met, the store can:
  - `SKIP`: keep the existing document and ignore the new one.
  - `OVERWRITE`: remove the old document and write the new one.
  - `FAIL`: raise an error (default behavior if not specified).
Raises
- ValueError: Raised on an invalid duplicate policy.
- DuplicateDocumentError: Raised on a duplicate document if `policy=DuplicatePolicy.FAIL`.
Returns
Number of documents written.
def delete_documents(self, document_ids: list[str]) -> int:
    """
    Remove the documents with the given IDs and update corpus statistics.

    :param document_ids: List of ``object_id`` to delete
    :type document_ids: ``list[str]``

    :raises MissingDocumentError: Triggered on document not found.

    :return: Number of documents deleted.
    :rtype: ``int``
    """
    deleted = 0
    for doc_id in document_ids:
        try:
            _, ngram_counts, n_tokens = self._index.pop(doc_id)
        except KeyError as exc:
            msg = f"Document with ID '{doc_id}' not found, cannot delete it."
            raise MissingDocumentError(msg) from exc

        # Each distinct n-gram of the document lowers its document frequency by one.
        self._freq_doc.subtract(list(ngram_counts))

        # The index has already shrunk, so the previous size is ``remaining + 1``.
        remaining = len(self._index)
        if remaining == 0:
            self._avg_doc_len = 0
        else:
            total_len = self._avg_doc_len * (remaining + 1) - n_tokens
            self._avg_doc_len = total_len / remaining

        logger.debug(f"Document '{doc_id}' deleted from store.")
        deleted += 1

    return deleted
Deletes all documents with a matching ID.
Parameters
- document_ids: List of `object_id` to delete.
Raises
- MissingDocumentError: Triggered on document not found.
Returns
Number of documents deleted.
def to_dict(self) -> dict[str, Any]:
    """
    Serialize the constructor parameters of this store to a dictionary.

    :return: Dictionary produced by ``default_to_dict``.
    :rtype: ``dict[str, Any]``
    """
    init_params = {
        "k": self.k,
        "b": self.b,
        # ``delta`` was divided by ``k + 1`` on init; undo that scaling here.
        "delta": self.delta * (self.k + 1.0),
        "sp_file": self._sp_file,
        "n_grams": self._n_grams,
        "haystack_filter_logic": self._haystack_filter_logic,
    }
    return default_to_dict(self, **init_params)
Serializes this store to a dictionary.
@component
class BetterBM25Retriever:
    """
    A component for retrieving documents from a ``BetterBM25DocumentStore``.
    """

    def __init__(
        self,
        document_store: BetterBM25DocumentStore,
        *,
        filters: Optional[dict[str, Any]] = None,
        top_k: int = 10,
        set_score: bool = True,
    ) -> None:
        """
        Create a ``BetterBM25Retriever`` component.

        :param document_store: A ``BetterBM25DocumentStore`` instance.
        :type document_store: ``BetterBM25DocumentStore``
        :param filters: Haystack filters, a dictionary with filters to
            narrow down the search space. The filters are applied
            **before** similarity retrieval.
        :type filters: ``Optional[dict[str, Any]]``
        :param top_k: The maximum number of documents to return.
        :type top_k: ``int``
        :param set_score: Whether to set the similarity scores on returned
            documents under the ``Document.score`` attribute. This is useful
            in hybrid retrieval settings where you may want to merge results.
            Note that returned documents are **copies** so that the original
            instances in the document store are not modified.
        :type set_score: ``bool``

        :raises ValueError: If the ``filters`` or ``top_k`` is invalid.
        :raises TypeError: If the ``document_store`` is not an instance of
            ``BetterBM25DocumentStore``.
        """
        _validate_search_params(filters, top_k)

        self.filters = filters
        """@private"""

        self.top_k = top_k
        """@private"""

        self.set_score = set_score
        """@private"""

        if not isinstance(document_store, BetterBM25DocumentStore):
            msg = "'document_store' must of type 'BetterBM25DocumentStore'"
            raise TypeError(msg)

        self.document_store = document_store
        """@private"""

    @component.output_types(documents=list[Document])
    def run(
        self,
        query: str,
        *,
        filters: Optional[dict[str, Any]] = None,
        top_k: Optional[int] = None,
    ) -> dict[str, list[Document]]:
        """
        Run the Retriever on the given query. This method always returns
        copies of the documents retrieved from the document store.

        :param query: The text search term.
        :type query: ``str``
        :param filters: Haystack filters, a dictionary with filters to
            narrow down the search space. The filters are applied
            **before** similarity retrieval. A falsy value (``None`` or an
            empty dictionary) falls back to the filters given at init.
        :type filters: ``Optional[dict[str, Any]]``
        :param top_k: The maximum number of documents to return. A falsy
            value (``None`` or ``0``) falls back to the value given at init.
        :type top_k: ``Optional[int]``

        :return: The retrieved documents in a dictionary with key "documents".
        """
        filters = filters or self.filters
        top_k = top_k or self.top_k

        _validate_search_params(filters, top_k)

        sim = self.document_store._retrieval(query, filters=filters, top_k=top_k)

        ret = []
        for doc, scr in sim:
            # Round-trip through a dict so the stored instance is never mutated.
            data = doc.to_dict()
            if self.set_score:
                data["score"] = scr
            ret.append(Document.from_dict(data))

        return {"documents": ret}

    def to_dict(self) -> dict[str, Any]:
        """Serializes the component to a dictionary."""
        return default_to_dict(
            self,
            filters=self.filters,
            top_k=self.top_k,
            document_store=self.document_store.to_dict(),
            set_score=self.set_score,
        )

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "BetterBM25Retriever":
        """
        Deserializes the retriever from a dictionary.

        The input dictionary is left untouched, so the same serialized
        payload can be deserialized more than once.

        :param data: Serialized retriever, as produced by ``to_dict``.
        :type data: ``dict[str, Any]``

        :raises DeserializationError: If the document store entry or its
            ``type`` is missing from the serialization data.

        :return: The deserialized retriever.
        :rtype: ``BetterBM25Retriever``
        """
        init_params = data.get("init_parameters") or {}
        doc_store_params = init_params.get("document_store")
        if doc_store_params is None:
            msg = "Missing 'document_store' in serialization data"
            raise DeserializationError(msg)

        if doc_store_params.get("type") is None:
            msg = "Missing 'type' in document store's serialization data"
            raise DeserializationError(msg)

        # Build a new payload instead of patching ``data`` in place; in-place
        # replacement would leave a store instance inside the caller's dict
        # and break a second ``from_dict`` call on the same data.
        init_params = {
            **init_params,
            "document_store": BetterBM25DocumentStore.from_dict(doc_store_params),
        }
        return default_from_dict(cls, {**data, "init_parameters": init_params})
A component for retrieving documents from a BetterBM25DocumentStore.
def __init__(
    self,
    document_store: BetterBM25DocumentStore,
    *,
    filters: Optional[dict[str, Any]] = None,
    top_k: int = 10,
    set_score: bool = True,
) -> None:
    """
    Set up a ``BetterBM25Retriever`` bound to a document store.

    :param document_store: The ``BetterBM25DocumentStore`` to search.
    :type document_store: ``BetterBM25DocumentStore``
    :param filters: Haystack filters narrowing down the search space;
        they are applied **before** similarity retrieval.
    :type filters: ``Optional[dict[str, Any]]``
    :param top_k: The maximum number of documents to return.
    :type top_k: ``int``
    :param set_score: Whether to store the similarity score of each
        returned document under ``Document.score``. Returned documents
        are **copies**, so the instances in the store are not modified.
    :type set_score: ``bool``

    :raises ValueError: If the ``filters`` or ``top_k`` is invalid.
    :raises TypeError: If the ``document_store`` is not an instance of
        ``BetterBM25DocumentStore``.
    """
    # Search parameters are validated first, then the store type.
    _validate_search_params(filters, top_k)
    if not isinstance(document_store, BetterBM25DocumentStore):
        msg = "'document_store' must of type 'BetterBM25DocumentStore'"
        raise TypeError(msg)

    self.document_store = document_store
    """@private"""

    self.filters = filters
    """@private"""

    self.top_k = top_k
    """@private"""

    self.set_score = set_score
    """@private"""
Create a BetterBM25Retriever component.
Parameters
- document_store: A `BetterBM25DocumentStore` instance.
- filters: Haystack filters, a dictionary with filters to narrow down the search space. The filters are applied before similarity retrieval.
- top_k: The maximum number of documents to return.
- set_score: Whether to set the similarity scores on returned documents under the `Document.score` attribute. This is useful in hybrid retrieval settings where you may want to merge results. Note that returned documents are copies, so the original instances in the document store are not modified.
Raises
- ValueError: If `filters` or `top_k` is invalid.
- TypeError: If `document_store` is not an instance of `BetterBM25DocumentStore`.
@component.output_types(documents=list[Document])
def run(
    self,
    query: str,
    *,
    filters: Optional[dict[str, Any]] = None,
    top_k: Optional[int] = None,
) -> dict[str, list[Document]]:
    """
    Retrieve the best-matching documents for ``query``. The returned
    documents are always copies of those held by the document store.

    :param query: The text search term.
    :type query: ``str``
    :param filters: Haystack filters narrowing down the search space,
        applied **before** similarity retrieval. A falsy value falls
        back to the filters given at construction time.
    :type filters: ``Optional[dict[str, Any]]``
    :param top_k: The maximum number of documents to return. A falsy
        value falls back to the value given at construction time.
    :type top_k: ``Optional[int]``

    :return: The retrieved documents in a dictionary with key "documents".
    """
    active_filters = filters or self.filters
    limit = top_k or self.top_k
    _validate_search_params(active_filters, limit)

    scored = self.document_store._retrieval(
        query, filters=active_filters, top_k=limit
    )

    def _copy_with_score(doc: Document, score: float) -> Document:
        # Round-trip through a dict so the stored instance stays untouched.
        payload = doc.to_dict()
        if self.set_score:
            payload["score"] = score
        return Document.from_dict(payload)

    return {"documents": [_copy_with_score(doc, score) for doc, score in scored]}
Run the Retriever on the given query. This method always returns copies of the documents retrieved from the document store.
Parameters
- query: The text search term.
- filters: Haystack filters, a dictionary with filters to narrow down the search space. The filters are applied before similarity retrieval.
- top_k: The maximum number of documents to return.
Returns
The retrieved documents in a dictionary with key "documents".
def to_dict(self) -> dict[str, Any]:
    """
    Serialize the retriever, including its document store, to a dictionary.

    :return: Dictionary produced by ``default_to_dict``.
    :rtype: ``dict[str, Any]``
    """
    init_params = {
        "filters": self.filters,
        "top_k": self.top_k,
        # The store is nested in its own serialized form.
        "document_store": self.document_store.to_dict(),
        "set_score": self.set_score,
    }
    return default_to_dict(self, **init_params)
Serializes the component to a dictionary.
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "BetterBM25Retriever":
    """
    Deserializes the retriever from a dictionary.

    The input dictionary is left untouched, so the same serialized payload
    can be deserialized more than once.

    :param data: Serialized retriever, as produced by ``to_dict``.
    :type data: ``dict[str, Any]``

    :raises DeserializationError: If the document store entry or its
        ``type`` is missing from the serialization data.

    :return: The deserialized retriever.
    :rtype: ``BetterBM25Retriever``
    """
    init_params = data.get("init_parameters") or {}
    doc_store_params = init_params.get("document_store")
    if doc_store_params is None:
        msg = "Missing 'document_store' in serialization data"
        raise DeserializationError(msg)

    if doc_store_params.get("type") is None:
        msg = "Missing 'type' in document store's serialization data"
        raise DeserializationError(msg)

    # Build a new payload instead of patching ``data`` in place; in-place
    # replacement would leave a store instance inside the caller's dict and
    # break a second ``from_dict`` call on the same data.
    init_params = {
        **init_params,
        "document_store": BetterBM25DocumentStore.from_dict(doc_store_params),
    }
    return default_from_dict(cls, {**data, "init_parameters": init_params})
Deserializes the retriever from a dictionary.
def apply_filters_to_document(
    filters: Optional[dict[str, Any]], document: Document
) -> bool:
    """
    Decide whether a document passes the given filters. This differs
    from the official Haystack implementation as follows:

    - Any comparison involving ``None``, i.e., a missing value, always
        returns ``False``, regardless of whether the document attribute
        value or the filter value is the missing one.
    - Comparison with ``pandas.DataFrame`` is always prohibited to
        reduce surprises.
    - No implicit ``datetime`` conversion from string values.
    - ``in`` and ``not in`` allow any ``Iterable`` as filter value,
        without the ``list`` constraint.

    :param filters: The filters to apply to the document. ``None`` or an
        empty dictionary lets every document pass.
    :type filters: ``Optional[dict[str, Any]]``
    :param document: The document to apply the filters to.
    :type document: ``Document``

    :return: ``True`` if the document passes the filters.
    :rtype: ``bool``
    """
    if filters:
        return _run_comparison_condition(filters, document)
    # Nothing to check against: the document trivially passes.
    return True
Apply filters to a document. Differences with the official Haystack implementation:
- Any comparison involving `None`, i.e., a missing value, always returns `False`, regardless of whether the document attribute value or the filter value is missing.
- Comparison with `pandas.DataFrame` is always prohibited to reduce surprises.
- No implicit `datetime` conversion from string values.
- `in` and `not in` allow any `Iterable` as the filter value, without the `list` constraint.
Parameters
- filters: The filters to apply to the document.
- document: The document to apply the filters to.
Returns
`True` if the document passes the filters.