bbm25_haystack

 1# SPDX-FileCopyrightText: 2024-present Yuxuan Wang <wangy49@seas.upenn.edu>
 2#
 3# SPDX-License-Identifier: Apache-2.0
 4from bbm25_haystack.bbm25_retriever import BetterBM25Retriever
 5from bbm25_haystack.bbm25_store import BetterBM25DocumentStore
 6from bbm25_haystack.filters import apply_filters_to_document
 7
 8__all__ = [
 9    "BetterBM25DocumentStore",
10    "BetterBM25Retriever",
11    "apply_filters_to_document",
12]
class BetterBM25DocumentStore:
 51class BetterBM25DocumentStore:
 52    """
 53    An in-memory BM25 document store intended to improve the default
 54    ``InMemoryDocumentStore`` shipped with Haystack.
 55    """
 56
 57    default_sp_file: Final = os.path.join(
 58        os.path.dirname(os.path.abspath(__file__)), "default.model"
 59    )
 60    """@private"""
 61
 62    def __init__(
 63        self,
 64        *,
 65        k: float = 1.5,
 66        b: float = 0.75,
 67        delta: float = 1.0,
 68        sp_file: Optional[str] = None,
 69        n_grams: Union[int, tuple[int, int]] = 1,
 70        haystack_filter_logic: bool = True,
 71    ) -> None:
 72        """
 73        Creates a new ``BetterBM25DocumentStore`` instance.
 74
 75        :param k: k1 parameter in BM25+ formula.
 76        :type k: ``Optional[float]``
 77        :param b: b parameter in BM25+ formula.
 78        :type b: ``Optional[float]``
 79        :param delta: delta parameter in BM25+ formula.
 80        :type delta: ``Optional[float]``
 81        :param sp_file: ``SentencePiece`` tokenizer ``.model`` file to
 82            use. A default from LLaMA-2-32K is used if not provided.
 83        :type sp_file: ``Optional[str]``
 84        :param n_grams: The n-gram window size. Can be a range of n-grams
 85            to include in text representation. If a single integer is
 86            provided, it will be treated as the maximum n-gram window size,
 87            which is equivalent to ``(1, n_grams)``.
 88        :type n_grams: ``Optional[Union[int, tuple[int, int]]]``
 89        :param haystack_filter_logic: Whether to use the Haystack filter
 90            logic or the one implemented in this store.
 91        :type haystack_filter_logic: ``Optional[bool]``
 92        """
 93        self.k = k
 94        """@private"""
 95
 96        self.b = b
 97        """@private"""
 98
 99        self.delta = delta / (self.k + 1.0)
100        """@private
101
102        Adjust the delta value so that we can bring the ``(k1 + 1)``
103        term out of the 'term frequency' term in BM25+ formula and
104        delete it; this will not affect the ranking.
105        """
106
107        self._parse_sp_file(sp_file=sp_file)
108        self._parse_n_grams(n_grams=n_grams)
109
110        self._haystack_filter_logic = haystack_filter_logic
111        self._filter_func = (
112            document_matches_filter
113            if self._haystack_filter_logic
114            else apply_filters_to_document
115        )
116
117        self._avg_doc_len: float = 0.0
118        self._freq_doc: Counter = Counter()
119        self._index: dict[str, tuple[Document, dict[tuple[str], int], int]] = {}
120
121    def _parse_sp_file(self, sp_file: Optional[str]) -> None:
122        self._sp_file = sp_file
123
124        if sp_file is None:
125            self._sp_inst = SentencePieceProcessor(model_file=self.default_sp_file)
126            return
127
128        if not os.path.exists(sp_file) or not os.path.isfile(sp_file):
129            msg = (
130                f"Tokenizer model file '{sp_file}' not accessible; "
131                f"fallback to default {self.default_sp_file}."
132            )
133            logger.warn(msg)
134            self._sp_inst = SentencePieceProcessor(model_file=self.default_sp_file)
135            return
136
137        try:
138            self._sp_inst = SentencePieceProcessor(model_file=sp_file)
139        except Exception as exc:
140            msg = (
141                f"Failed to load tokenizer model file '{sp_file}': {exc}; "
142                f"fallback to default {self.default_sp_file}."
143            )
144            logger.error(msg)
145            self._sp_inst = SentencePieceProcessor(model_file=self.default_sp_file)
146
147    def _parse_n_grams(self, n_grams: Optional[Union[int, tuple[int, int]]]) -> None:
148        self._n_grams = n_grams
149
150        if isinstance(n_grams, int):
151            self._n_grams_min = 1
152            self._n_grams_max = n_grams
153            return
154
155        if isinstance(n_grams, tuple):
156            self._n_grams_min, self._n_grams_max = n_grams
157            if not all(isinstance(n, int) for n in n_grams):
158                msg = f"Invalid n-gram window size: {n_grams}."
159                raise ValueError(msg)
160            return
161
162        msg = f"Invalid n-gram window size: {n_grams}; expected int or tuple."
163        raise ValueError(msg)
164
165    def _tokenize(self, texts: Union[str, list[str]]) -> list[list[tuple[str]]]:
166        """
167        Tokenize input text using SentencePiece model.
168
169        The input text can either be a single string or a list of strings,
170        such as a single user query or a group of raw document. The tokenized
171        text will be augmented into set of n-grams based.
172
173        :param texts: Input text to tokenize, queries or documents.
174        :type texts: ``Union[str, list[str]]``
175
176        :return: Tokenized and n-gram augmented texts.
177        :rtype: ``list[list[tuple[str]]]``
178        """
179
180        def _augment_to_n_grams(tokens: list[str]) -> list[tuple[str]]:
181            it = (
182                _n_grams(tokens, n)
183                for n in range(self._n_grams_min, self._n_grams_max + 1)
184            )
185            return list(chain(*it))
186
187        if isinstance(texts, str):
188            texts = [texts]
189        return [
190            _augment_to_n_grams(tokens)
191            for tokens in self._sp_inst.encode(texts, out_type=str)
192        ]
193
194    def _compute_bm25plus(
195        self,
196        query: str,
197        documents: list[Document],
198    ) -> list[tuple[Document, float]]:
199        """
200        Calculate the BM25+ score for all documents in this index.
201
202        :param query: Query to calculate the BM25+ score for.
203        :type query: ``str``
204        :param documents: Filtered pool of documents retrieve from.
205        :type documents: ``list[Document]``
206
207        :return: Documents and corresponding BM25+ scores.
208        :rtype: ``list[tuple[Document, float]]``
209        """
210        cnt = lambda ng: self._freq_doc.get(ng, 0)
211        idf = {
212            ng: math.log(1 + (len(self._index) - cnt(ng) + 0.5) / (cnt(ng) + 0.5))
213            for ng in self._tokenize(query)[0]
214        }
215
216        sim = []
217        for doc in documents:
218            _, freq, doc_len = self._index[doc.id]
219            doc_len_scaled = doc_len / self._avg_doc_len
220
221            scr = 0.0
222            for token, idf_val in idf.items():
223                freq_term = freq.get(token, 0.0)
224                freq_damp = self.k * (1 + self.b * (doc_len_scaled - 1))
225
226                tf_val = freq_term / (freq_term + freq_damp) + self.delta
227                scr += idf_val * tf_val
228
229            sim.append((doc, scr))
230
231        return sim
232
233    def _retrieval(
234        self,
235        query: str,
236        *,
237        filters: Optional[dict[str, Any]] = None,
238        top_k: Optional[int] = None,
239    ) -> list[tuple[Document, float]]:
240        """
241        Retrieve documents from the store using the given query.
242
243        :param query: Query to search for.
244        :type query: ``str``
245        :param filters: Filters to apply to the document list.
246        :type filters: ``Optional[dict[str, Any]]``
247        :param top_k: Number of documents to return.
248        :type top_k: ``int``
249
250        :return: Top ``k`` documents and corresponding BM25+ scores.
251        :rtype: ``list[tuple[Document, float]]``
252        """
253        documents = self.filter_documents(filters)
254        if not documents:
255            return []
256
257        sim = self._compute_bm25plus(query, documents)
258        if top_k is None:
259            return sorted(sim, key=lambda x: x[1], reverse=True)
260        return heapq.nlargest(top_k, sim, key=lambda x: x[1])
261
262    def count_documents(self) -> int:
263        """
264        Returns how many documents are present in this store.
265
266        :return: Number of documents in the store.
267        :rtype: ``int``
268        """
269        return len(self._index.keys())
270
271    def filter_documents(
272        self, filters: Optional[dict[str, Any]] = None
273    ) -> list[Document]:
274        """
275        Filter documents in the store using the given filters.
276
277        :param filters: Filters to apply to the document list.
278        :type filters: ``Optional[dict[str, Any]]``
279
280        :return: List of documents that match the given filters.
281        :rtype: ``list[Document]``
282        """
283        if filters is None or not filters:
284            return [doc for doc, _, _ in self._index.values()]
285        return [
286            doc
287            for doc, _, _ in self._index.values()
288            if self._filter_func(filters, doc)
289        ]
290
291    def write_documents(
292        self,
293        documents: list[Document],
294        policy: DuplicatePolicy = DuplicatePolicy.NONE,
295    ) -> int:
296        """
297        Writes (or overwrites) documents into the store.
298
299        :param documents: List of documents to write.
300        :type documents: ``list[Document]``
301        :param policy: Documents with the same ``Document.id`` count as
302            duplicates. When duplicates are met, the store can:
303             - ``SKIP``: keep the existing document and ignore the new one.
304             - ``OVERWRITE``: remove the old document and write the new one.
305             - ``FAIL``: an error is raised (default behavior if not specified)
306        :type policy: ``Optional[DuplicatePolicy]``
307
308        :raises ValueError: Exception trigger on invalid duplicate policy.
309        :raises DuplicateDocumentError: Exception trigger on duplicate
310            document if ``policy=DuplicatePolicy.FAIL``
311
312        :return: Number of documents written.
313        :rtype: ``int``
314        """
315        if policy not in DuplicatePolicy:
316            msg = f"Invalid duplicate policy: {policy}."
317            raise ValueError(msg)
318
319        if policy == DuplicatePolicy.NONE:
320            policy = DuplicatePolicy.FAIL
321
322        n_written = 0
323        for doc in documents:
324            if not isinstance(doc, Document):
325                msg = f"Expected document type, got '{doc}' of type '{type(doc)}'."
326                raise ValueError(msg)
327
328            if doc.id in self._index.keys():
329                if policy == DuplicatePolicy.SKIP:
330                    continue
331                elif policy == DuplicatePolicy.FAIL:
332                    msg = f"Document with ID '{doc.id}' already exists in the store."
333                    raise DuplicateDocumentError(msg)
334
335                # Overwrite if exists; delete first to keep the statistics consistent
336                logger.debug(
337                    f"Document '{doc.id}' already exists in the store, overwriting."
338                )
339                self.delete_documents([doc.id])
340
341            content = doc.content or ""
342            if content == "" and isinstance(doc.dataframe, pd.DataFrame):
343                content = doc.dataframe.astype(str).to_csv(index=False)
344
345            tokens = self._tokenize(content)[0]
346
347            self._index[doc.id] = (doc, Counter(tokens), len(tokens))
348            self._freq_doc.update(set(tokens))
349            self._avg_doc_len = (
350                len(tokens) + self._avg_doc_len * len(self._index)
351            ) / (len(self._index) + 1)
352
353            logger.debug(f"Document '{doc.id}' written to store.")
354            n_written += 1
355
356        return n_written
357
358    def delete_documents(self, document_ids: list[str]) -> int:
359        """
360        Deletes all documents with a matching ID.
361
362        :param document_ids: List of ``object_id`` to delete
363        :type document_ids: ``list[str]``
364
365        :raises MissingDocumentError: Triggered on document not found.
366
367        :return: Number of documents deleted.
368        :rtype: ``int``
369        """
370        n_removal = 0
371        for doc_id in document_ids:
372            try:
373                _, freq, doc_len = self._index.pop(doc_id)
374                self._freq_doc.subtract(Counter(freq.keys()))
375                try:
376                    self._avg_doc_len = (
377                        self._avg_doc_len * (len(self._index) + 1) - doc_len
378                    ) / len(self._index)
379                except ZeroDivisionError:
380                    self._avg_doc_len = 0
381
382                logger.debug(f"Document '{doc_id}' deleted from store.")
383                n_removal += 1
384            except KeyError as exc:
385                msg = f"Document with ID '{doc_id}' not found, cannot delete it."
386                raise MissingDocumentError(msg) from exc
387
388        return n_removal
389
390    def to_dict(self) -> dict[str, Any]:
391        """Serializes this store to a dictionary."""
392        return default_to_dict(
393            self,
394            k=self.k,
395            b=self.b,
396            delta=self.delta * (self.k + 1.0),  # Because we scaled it on init
397            sp_file=self._sp_file,
398            n_grams=self._n_grams,
399            haystack_filter_logic=self._haystack_filter_logic,
400        )
401
402    @classmethod
403    def from_dict(cls, data: dict[str, Any]) -> "BetterBM25DocumentStore":
404        """Deserializes the store from a dictionary."""
405        return default_from_dict(cls, data)

An in-memory BM25 document store intended to improve the default InMemoryDocumentStore shipped with Haystack.

BetterBM25DocumentStore( *, k: float = 1.5, b: float = 0.75, delta: float = 1.0, sp_file: Optional[str] = None, n_grams: Union[int, tuple[int, int]] = 1, haystack_filter_logic: bool = True)
 62    def __init__(
 63        self,
 64        *,
 65        k: float = 1.5,
 66        b: float = 0.75,
 67        delta: float = 1.0,
 68        sp_file: Optional[str] = None,
 69        n_grams: Union[int, tuple[int, int]] = 1,
 70        haystack_filter_logic: bool = True,
 71    ) -> None:
 72        """
 73        Creates a new ``BetterBM25DocumentStore`` instance.
 74
 75        :param k: k1 parameter in BM25+ formula.
 76        :type k: ``Optional[float]``
 77        :param b: b parameter in BM25+ formula.
 78        :type b: ``Optional[float]``
 79        :param delta: delta parameter in BM25+ formula.
 80        :type delta: ``Optional[float]``
 81        :param sp_file: ``SentencePiece`` tokenizer ``.model`` file to
 82            use. A default from LLaMA-2-32K is used if not provided.
 83        :type sp_file: ``Optional[str]``
 84        :param n_grams: The n-gram window size. Can be a range of n-grams
 85            to include in text representation. If a single integer is
 86            provided, it will be treated as the maximum n-gram window size,
 87            which is equivalent to ``(1, n_grams)``.
 88        :type n_grams: ``Optional[Union[int, tuple[int, int]]]``
 89        :param haystack_filter_logic: Whether to use the Haystack filter
 90            logic or the one implemented in this store.
 91        :type haystack_filter_logic: ``Optional[bool]``
 92        """
 93        self.k = k
 94        """@private"""
 95
 96        self.b = b
 97        """@private"""
 98
 99        self.delta = delta / (self.k + 1.0)
100        """@private
101
102        Adjust the delta value so that we can bring the ``(k1 + 1)``
103        term out of the 'term frequency' term in BM25+ formula and
104        delete it; this will not affect the ranking.
105        """
106
107        self._parse_sp_file(sp_file=sp_file)
108        self._parse_n_grams(n_grams=n_grams)
109
110        self._haystack_filter_logic = haystack_filter_logic
111        self._filter_func = (
112            document_matches_filter
113            if self._haystack_filter_logic
114            else apply_filters_to_document
115        )
116
117        self._avg_doc_len: float = 0.0
118        self._freq_doc: Counter = Counter()
119        self._index: dict[str, tuple[Document, dict[tuple[str], int], int]] = {}

Creates a new BetterBM25DocumentStore instance.

Parameters
  • k: k1 parameter in BM25+ formula.
  • b: b parameter in BM25+ formula.
  • delta: delta parameter in BM25+ formula.
  • sp_file: SentencePiece tokenizer .model file to use. A default from LLaMA-2-32K is used if not provided.
  • n_grams: The n-gram window size. Can be a range of n-grams to include in text representation. If a single integer is provided, it will be treated as the maximum n-gram window size, which is equivalent to (1, n_grams).
  • haystack_filter_logic: Whether to use the Haystack filter logic or the one implemented in this store.
def count_documents(self) -> int:
262    def count_documents(self) -> int:
263        """
264        Returns how many documents are present in this store.
265
266        :return: Number of documents in the store.
267        :rtype: ``int``
268        """
269        return len(self._index.keys())

Returns how many documents are present in this store.

Returns

Number of documents in the store.

def filter_documents( self, filters: Optional[dict[str, Any]] = None) -> list[haystack.dataclasses.document.Document]:
271    def filter_documents(
272        self, filters: Optional[dict[str, Any]] = None
273    ) -> list[Document]:
274        """
275        Filter documents in the store using the given filters.
276
277        :param filters: Filters to apply to the document list.
278        :type filters: ``Optional[dict[str, Any]]``
279
280        :return: List of documents that match the given filters.
281        :rtype: ``list[Document]``
282        """
283        if filters is None or not filters:
284            return [doc for doc, _, _ in self._index.values()]
285        return [
286            doc
287            for doc, _, _ in self._index.values()
288            if self._filter_func(filters, doc)
289        ]

Filter documents in the store using the given filters.

Parameters
  • filters: Filters to apply to the document list.
Returns

List of documents that match the given filters.

def write_documents( self, documents: list[haystack.dataclasses.document.Document], policy: haystack.document_stores.types.policy.DuplicatePolicy = <DuplicatePolicy.NONE: 'none'>) -> int:
291    def write_documents(
292        self,
293        documents: list[Document],
294        policy: DuplicatePolicy = DuplicatePolicy.NONE,
295    ) -> int:
296        """
297        Writes (or overwrites) documents into the store.
298
299        :param documents: List of documents to write.
300        :type documents: ``list[Document]``
301        :param policy: Documents with the same ``Document.id`` count as
302            duplicates. When duplicates are met, the store can:
303             - ``SKIP``: keep the existing document and ignore the new one.
304             - ``OVERWRITE``: remove the old document and write the new one.
305             - ``FAIL``: an error is raised (default behavior if not specified)
306        :type policy: ``Optional[DuplicatePolicy]``
307
308        :raises ValueError: Exception trigger on invalid duplicate policy.
309        :raises DuplicateDocumentError: Exception trigger on duplicate
310            document if ``policy=DuplicatePolicy.FAIL``
311
312        :return: Number of documents written.
313        :rtype: ``int``
314        """
315        if policy not in DuplicatePolicy:
316            msg = f"Invalid duplicate policy: {policy}."
317            raise ValueError(msg)
318
319        if policy == DuplicatePolicy.NONE:
320            policy = DuplicatePolicy.FAIL
321
322        n_written = 0
323        for doc in documents:
324            if not isinstance(doc, Document):
325                msg = f"Expected document type, got '{doc}' of type '{type(doc)}'."
326                raise ValueError(msg)
327
328            if doc.id in self._index.keys():
329                if policy == DuplicatePolicy.SKIP:
330                    continue
331                elif policy == DuplicatePolicy.FAIL:
332                    msg = f"Document with ID '{doc.id}' already exists in the store."
333                    raise DuplicateDocumentError(msg)
334
335                # Overwrite if exists; delete first to keep the statistics consistent
336                logger.debug(
337                    f"Document '{doc.id}' already exists in the store, overwriting."
338                )
339                self.delete_documents([doc.id])
340
341            content = doc.content or ""
342            if content == "" and isinstance(doc.dataframe, pd.DataFrame):
343                content = doc.dataframe.astype(str).to_csv(index=False)
344
345            tokens = self._tokenize(content)[0]
346
347            self._index[doc.id] = (doc, Counter(tokens), len(tokens))
348            self._freq_doc.update(set(tokens))
349            self._avg_doc_len = (
350                len(tokens) + self._avg_doc_len * len(self._index)
351            ) / (len(self._index) + 1)
352
353            logger.debug(f"Document '{doc.id}' written to store.")
354            n_written += 1
355
356        return n_written

Writes (or overwrites) documents into the store.

Parameters
  • documents: List of documents to write.
  • policy: Documents with the same Document.id count as duplicates. When duplicates are met, the store can:
    • SKIP: keep the existing document and ignore the new one.
    • OVERWRITE: remove the old document and write the new one.
    • FAIL: an error is raised (default behavior if not specified)
Raises
  • ValueError: Raised when the duplicate policy is invalid.
  • DuplicateDocumentError: Raised when a duplicate document is found and policy=DuplicatePolicy.FAIL
Returns

Number of documents written.

def delete_documents(self, document_ids: list[str]) -> int:
358    def delete_documents(self, document_ids: list[str]) -> int:
359        """
360        Deletes all documents with a matching ID.
361
362        :param document_ids: List of ``object_id`` to delete
363        :type document_ids: ``list[str]``
364
365        :raises MissingDocumentError: Triggered on document not found.
366
367        :return: Number of documents deleted.
368        :rtype: ``int``
369        """
370        n_removal = 0
371        for doc_id in document_ids:
372            try:
373                _, freq, doc_len = self._index.pop(doc_id)
374                self._freq_doc.subtract(Counter(freq.keys()))
375                try:
376                    self._avg_doc_len = (
377                        self._avg_doc_len * (len(self._index) + 1) - doc_len
378                    ) / len(self._index)
379                except ZeroDivisionError:
380                    self._avg_doc_len = 0
381
382                logger.debug(f"Document '{doc_id}' deleted from store.")
383                n_removal += 1
384            except KeyError as exc:
385                msg = f"Document with ID '{doc_id}' not found, cannot delete it."
386                raise MissingDocumentError(msg) from exc
387
388        return n_removal

Deletes all documents with a matching ID.

Parameters
  • document_ids: List of object_id to delete
Raises
  • MissingDocumentError: Triggered on document not found.
Returns

Number of documents deleted.

def to_dict(self) -> dict[str, typing.Any]:
390    def to_dict(self) -> dict[str, Any]:
391        """Serializes this store to a dictionary."""
392        return default_to_dict(
393            self,
394            k=self.k,
395            b=self.b,
396            delta=self.delta * (self.k + 1.0),  # Because we scaled it on init
397            sp_file=self._sp_file,
398            n_grams=self._n_grams,
399            haystack_filter_logic=self._haystack_filter_logic,
400        )

Serializes this store to a dictionary.

@classmethod
def from_dict( cls, data: dict[str, typing.Any]) -> BetterBM25DocumentStore:
402    @classmethod
403    def from_dict(cls, data: dict[str, Any]) -> "BetterBM25DocumentStore":
404        """Deserializes the store from a dictionary."""
405        return default_from_dict(cls, data)

Deserializes the store from a dictionary.

@component
class BetterBM25Retriever:
 44@component
 45class BetterBM25Retriever:
 46    """
 47    A component for retrieving documents from a ``BetterBM25DocumentStore``.
 48    """
 49
 50    def __init__(
 51        self,
 52        document_store: BetterBM25DocumentStore,
 53        *,
 54        filters: Optional[dict[str, Any]] = None,
 55        top_k: int = 10,
 56        set_score: bool = True,
 57    ) -> None:
 58        """
 59        Create a ``BetterBM25Retriever`` component.
 60
 61        :param document_store: A ``BetterBM25DocumentStore`` instance.
 62        :type document_store: ``BetterBM25DocumentStore``
 63        :param filters: Haystack filters, a dictionary with filters to
 64            narrow down the search space. The filters are applied
 65            **before** similarity retrieval.
 66        :type filters: ``Optional[dict[str, Any]]``
 67        :param top_k: The maximum number of documents to return.
 68        :type top_k: ``int``
 69        :param set_score: Whether to set the similarity scores to returned
 70            documents under ``Document.score`` attribute. This is useful in
 71            hybrid retrieval setting where you may want to merge results.
 72            Note that returned documents are **copies** so that the original
 73            instances in the document store are not modified.
 74        :type set_score: ``bool``
 75
 76        :raises ValueError: If the ``filters`` or ``top_k`` is invalid.
 77        :raises TypeError: If the ``document_store`` is not an instance of
 78            ``BetterBM25DocumentStore``.
 79        """
 80        _validate_search_params(filters, top_k)
 81
 82        self.filters = filters
 83        """@private"""
 84
 85        self.top_k = top_k
 86        """@private"""
 87
 88        self.set_score = set_score
 89        """@private"""
 90
 91        if not isinstance(document_store, BetterBM25DocumentStore):
 92            msg = "'document_store' must of type 'BetterBM25DocumentStore'"
 93            raise TypeError(msg)
 94
 95        self.document_store = document_store
 96        """@private"""
 97
 98    @component.output_types(documents=list[Document])
 99    def run(
100        self,
101        query: str,
102        *,
103        filters: Optional[dict[str, Any]] = None,
104        top_k: Optional[int] = None,
105    ) -> dict[str, list[Document]]:
106        """
107        Run the Retriever on the given query. This method always return
108        copies of the documents retrieved from the document store.
109
110        :param query: The text search term.
111        :type query: ``str``
112        :param filters: Haystack filters, a dictionary with filters to
113            narrow down the search space. The filters are applied
114            **before** similarity retrieval.
115        :type filters: ``Optional[dict[str, Any]]``
116        :param top_k: The maximum number of documents to return.
117        :type top_k: ``Optional[int]``
118
119        :return: The retrieved documents in a dictionary with key "documents".
120        """
121        filters = filters or self.filters
122        top_k = top_k or self.top_k
123
124        _validate_search_params(filters, top_k)
125
126        sim = self.document_store._retrieval(query, filters=filters, top_k=top_k)
127
128        ret = []
129        for doc, scr in sim:
130            data = doc.to_dict()
131            if self.set_score:
132                data["score"] = scr
133            ret.append(Document.from_dict(data))
134
135        return {"documents": ret}
136
137    def to_dict(self) -> dict[str, Any]:
138        """Serializes the component to a dictionary."""
139        return default_to_dict(
140            self,
141            filters=self.filters,
142            top_k=self.top_k,
143            document_store=self.document_store.to_dict(),
144            set_score=self.set_score,
145        )
146
147    @classmethod
148    def from_dict(cls, data: dict[str, Any]) -> "BetterBM25Retriever":
149        """Deserializes the retriever from a dictionary."""
150        doc_store_params = data["init_parameters"].get("document_store")
151        if doc_store_params is None:
152            msg = "Missing 'document_store' in serialization data"
153            raise DeserializationError(msg)
154
155        if doc_store_params.get("type") is None:
156            msg = "Missing 'type' in document store's serialization data"
157            raise DeserializationError(msg)
158
159        data["init_parameters"]["document_store"] = (
160            BetterBM25DocumentStore.from_dict(doc_store_params)
161        )
162        return default_from_dict(cls, data)

A component for retrieving documents from a BetterBM25DocumentStore.

BetterBM25Retriever( document_store: BetterBM25DocumentStore, *, filters: Optional[dict[str, Any]] = None, top_k: int = 10, set_score: bool = True)
50    def __init__(
51        self,
52        document_store: BetterBM25DocumentStore,
53        *,
54        filters: Optional[dict[str, Any]] = None,
55        top_k: int = 10,
56        set_score: bool = True,
57    ) -> None:
58        """
59        Create a ``BetterBM25Retriever`` component.
60
61        :param document_store: A ``BetterBM25DocumentStore`` instance.
62        :type document_store: ``BetterBM25DocumentStore``
63        :param filters: Haystack filters, a dictionary with filters to
64            narrow down the search space. The filters are applied
65            **before** similarity retrieval.
66        :type filters: ``Optional[dict[str, Any]]``
67        :param top_k: The maximum number of documents to return.
68        :type top_k: ``int``
69        :param set_score: Whether to set the similarity scores to returned
70            documents under ``Document.score`` attribute. This is useful in
71            hybrid retrieval setting where you may want to merge results.
72            Note that returned documents are **copies** so that the original
73            instances in the document store are not modified.
74        :type set_score: ``bool``
75
76        :raises ValueError: If the ``filters`` or ``top_k`` is invalid.
77        :raises TypeError: If the ``document_store`` is not an instance of
78            ``BetterBM25DocumentStore``.
79        """
80        _validate_search_params(filters, top_k)
81
82        self.filters = filters
83        """@private"""
84
85        self.top_k = top_k
86        """@private"""
87
88        self.set_score = set_score
89        """@private"""
90
91        if not isinstance(document_store, BetterBM25DocumentStore):
92            msg = "'document_store' must of type 'BetterBM25DocumentStore'"
93            raise TypeError(msg)
94
95        self.document_store = document_store
96        """@private"""

Create a BetterBM25Retriever component.

Parameters
  • document_store: A BetterBM25DocumentStore instance.
  • filters: Haystack filters, a dictionary with filters to narrow down the search space. The filters are applied before similarity retrieval.
  • top_k: The maximum number of documents to return.
  • set_score: Whether to set the similarity scores to returned documents under Document.score attribute. This is useful in hybrid retrieval setting where you may want to merge results. Note that returned documents are copies so that the original instances in the document store are not modified.
Raises
  • ValueError: If the filters or top_k is invalid.
  • TypeError: If the document_store is not an instance of BetterBM25DocumentStore.
@component.output_types(documents=list[Document])
def run( self, query: str, *, filters: Optional[dict[str, Any]] = None, top_k: Optional[int] = None) -> dict[str, list[haystack.dataclasses.document.Document]]:
 98    @component.output_types(documents=list[Document])
 99    def run(
100        self,
101        query: str,
102        *,
103        filters: Optional[dict[str, Any]] = None,
104        top_k: Optional[int] = None,
105    ) -> dict[str, list[Document]]:
106        """
107        Run the Retriever on the given query. This method always return
108        copies of the documents retrieved from the document store.
109
110        :param query: The text search term.
111        :type query: ``str``
112        :param filters: Haystack filters, a dictionary with filters to
113            narrow down the search space. The filters are applied
114            **before** similarity retrieval.
115        :type filters: ``Optional[dict[str, Any]]``
116        :param top_k: The maximum number of documents to return.
117        :type top_k: ``Optional[int]``
118
119        :return: The retrieved documents in a dictionary with key "documents".
120        """
121        filters = filters or self.filters
122        top_k = top_k or self.top_k
123
124        _validate_search_params(filters, top_k)
125
126        sim = self.document_store._retrieval(query, filters=filters, top_k=top_k)
127
128        ret = []
129        for doc, scr in sim:
130            data = doc.to_dict()
131            if self.set_score:
132                data["score"] = scr
133            ret.append(Document.from_dict(data))
134
135        return {"documents": ret}

Run the Retriever on the given query. This method always returns copies of the documents retrieved from the document store.

Parameters
  • query: The text search term.
  • filters: Haystack filters, a dictionary with filters to narrow down the search space. The filters are applied before similarity retrieval.
  • top_k: The maximum number of documents to return.
Returns

The retrieved documents in a dictionary with key "documents".

def to_dict(self) -> dict[str, typing.Any]:
137    def to_dict(self) -> dict[str, Any]:
138        """Serializes the component to a dictionary."""
139        return default_to_dict(
140            self,
141            filters=self.filters,
142            top_k=self.top_k,
143            document_store=self.document_store.to_dict(),
144            set_score=self.set_score,
145        )

Serializes the component to a dictionary.

@classmethod
def from_dict( cls, data: dict[str, typing.Any]) -> BetterBM25Retriever:
147    @classmethod
148    def from_dict(cls, data: dict[str, Any]) -> "BetterBM25Retriever":
149        """Deserializes the retriever from a dictionary."""
150        doc_store_params = data["init_parameters"].get("document_store")
151        if doc_store_params is None:
152            msg = "Missing 'document_store' in serialization data"
153            raise DeserializationError(msg)
154
155        if doc_store_params.get("type") is None:
156            msg = "Missing 'type' in document store's serialization data"
157            raise DeserializationError(msg)
158
159        data["init_parameters"]["document_store"] = (
160            BetterBM25DocumentStore.from_dict(doc_store_params)
161        )
162        return default_from_dict(cls, data)

Deserializes the retriever from a dictionary.

def apply_filters_to_document( filters: Optional[dict[str, Any]], document: haystack.dataclasses.document.Document) -> bool:
def apply_filters_to_document(
    filters: Optional[dict[str, Any]], document: Document
) -> bool:
    """
    Decide whether a document passes the given filters.

    Missing or empty filters accept every document. Otherwise the
    decision is delegated to ``_run_comparison_condition``. Differences
    with the official Haystack implementation:

    - Any comparison involving ``None``, i.e., a missing value, always
        returns ``False``, regardless of whether the document attribute
        value or the filter value is the one missing.
    - Comparison with ``pandas.DataFrame`` is always prohibited to
        reduce surprises.
    - No implicit ``datetime`` conversion from string values.
    - ``in`` and ``not in`` allow any ``Iterable`` as filter value,
        without the ``list`` constraint.

    :param filters: The filters to apply to the document.
    :type filters: ``Optional[dict[str, Any]]``
    :param document: The document to apply the filters to.
    :type document: ``Document``

    :return: ``True`` if the document passes the filters.
    :rtype: ``bool``
    """
    # ``None`` and ``{}`` are both falsy: nothing to check against.
    has_filters = bool(filters)
    return _run_comparison_condition(filters, document) if has_filters else True

Apply filters to a document. Differences with the official Haystack implementation:

  • Any comparison involving None (i.e., a missing value) always returns False, regardless of whether the document attribute value or the filter value is the one missing.
  • Comparison with pandas.DataFrame is always prohibited to reduce surprises.
  • No implicit datetime conversion from string values.
  • in and not in allow any Iterable as the filter value, without the list constraint.
Parameters
  • filters: The filters to apply to the document.
  • document: The document to apply the filters to.
Returns

True if the document passes the filters.