Coverage for /Users/buh/.pyenv/versions/3.12.2/envs/es-testbed/lib/python3.12/site-packages/es_testbed/mgrs/entitymgr.py: 91%
46 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-27 22:51 -0600
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-27 22:51 -0600
"""Entity Class Definition"""

import typing as t
from ..defaults import NAMEMAPPER

if t.TYPE_CHECKING:
    # Needed only for the quoted annotations below; this branch is never
    # taken at runtime, so these packages are not imported when the module runs.
    from elasticsearch8 import Elasticsearch
    from dotmap import DotMap

# pylint: disable=missing-docstring,broad-exception-caught,too-many-instance-attributes
class EntityMgr:
    """Base manager that names and tracks a list of entities held in a plan.

    The entity list lives inside ``plan`` under the key ``listname``; names
    for new entities are built from the plan's ``prefix`` and ``uniq``
    attributes, the identifier returned by :meth:`ident`, and a zero-padded
    counter derived from the current list length.
    """

    # Default key passed to NAMEMAPPER by ident() when no key is supplied.
    kind = 'entity_type'
    # Key in ``plan`` under which the entity list is stored.
    listname = 'entity_mgrs'

    def __init__(
        self,
        client: t.Union['Elasticsearch', None] = None,
        plan: t.Union['DotMap', None] = None,
        autobuild: t.Optional[bool] = True,
    ):
        """Store ``client`` and ``plan`` and optionally run :meth:`setup`.

        :param client: Stored as ``self.client``; not otherwise used here.
        :param plan: Mapping-like object read by the properties of this
            class via both item access (``plan[listname]``) and attribute
            access (``plan.prefix``, ``plan.uniq``).
        :param autobuild: When truthy, :meth:`setup` is called immediately.
        """
        self.client = client
        self.plan = plan
        self.success = False
        if autobuild:
            self.setup()

    @property
    def entity_list(self) -> t.List:
        """Return the stored list of entities"""
        return self.plan[self.listname]

    @entity_list.setter
    def entity_list(self, value: t.Sequence) -> None:
        """Replace the stored list of entities in the plan with ``value``"""
        self.plan[self.listname] = value

    @property
    def entity_root(self) -> str:
        """The entity root name builder"""
        return f'{self.plan.prefix}-{self.ident()}-{self.plan.uniq}'

    @property
    def indexlist(self) -> t.Sequence[str]:
        """Empty attribute/property waiting to be overridden"""
        return []

    @property
    def last(self) -> str:
        """Return the most recently appended entity

        Indexes ``entity_list[-1]``, so an empty list raises ``IndexError``.
        """
        return self.entity_list[-1]

    @property
    def name(self) -> str:
        """Return the full, incrementing name of a not yet appended entity"""
        return f'{self.entity_root}{self.suffix}'

    @property
    def pattern(self) -> str:
        """Return the search pattern for the managed entity"""
        return f'*{self.entity_root}*'

    @property
    def suffix(self) -> str:
        """Return the incrementing index suffix

        The counter is one more than the current length of ``entity_list``,
        zero-padded to six digits and prefixed with a hyphen (``-000001``).
        """
        return f'-{len(self.entity_list) + 1:06}'

    def appender(self, name: str) -> None:
        """Append an item to entity_list"""
        self.entity_list.append(name)

    def ident(self, dkey=None) -> str:
        """Get the formatted name string of the managed entity

        :param dkey: Key to look up in ``NAMEMAPPER``. Any falsy value
            (including the default ``None``) falls back to ``self.kind``.
        :returns: ``NAMEMAPPER[dkey]``.
        """
        if not dkey:
            dkey = self.kind
        return NAMEMAPPER[dkey]

    def setup(self) -> None:
        """Setup the entity manager"""

    def track_index(self, name: str) -> None:
        """Track an index and append that tracking entity to entity_list"""