Coverage for /Users/buh/.pyenv/versions/3.12.2/envs/es-testbed/lib/python3.12/site-packages/es_testbed/entities/entity.py: 90%

15 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-27 23:45 -0600

1"""Base Entity Class""" 

2 

3import typing as t 

4from ..helpers.es_api import find_write_index, get_ds_current, resolver 

5 

6if t.TYPE_CHECKING: 6 ↛ 7line 6 didn't jump to line 7, because the condition on line 6 was never true

7 from elasticsearch8 import Elasticsearch 

8 

9# pylint: disable=missing-docstring,too-many-arguments,R0903 

10 

11 

class Entity:
    """Base class holding a client, the entity's current name, and its aliases."""

    def __init__(
        self,
        client: 'Elasticsearch',
        name: t.Union[str, None] = None,
    ):
        """Store the client and the optional initial name.

        :param client: Client object passed through to the ``es_api`` helpers.
        :param name: Current name of the entity, or ``None`` if not yet known.
        """
        self.client = client
        self.name = name  # This will change with entity name changes
        self.aka = []  # Aliases, in other words

    @property
    def am_i_write_idx(self) -> bool:
        """Report whether ``self.name`` is currently the write index.

        Names starting with ``.ds-`` are treated as data stream backing
        indices: the owning data stream is looked up via ``resolver`` and
        the name is compared with the result of ``get_ds_current``. Any other
        name is compared with the result of ``find_write_index``.

        :returns: ``True`` when the helper result equals ``self.name``;
            ``False`` otherwise, including when no name has been set.
        """
        # ``name`` defaults to None; without this guard the ``startswith``
        # call below would raise AttributeError for an unnamed entity.
        if self.name is None:
            return False
        if self.name.startswith('.ds-'):  # Datastream
            # NOTE(review): assumes the resolver response always contains at
            # least one entry under 'indices' for this name -- confirm.
            ds = resolver(self.client, self.name)['indices'][0]['data_stream']
            return bool(self.name == get_ds_current(self.client, ds))
        return bool(self.name == find_write_index(self.client, self.name))