Coverage for /Users/buh/.pyenv/versions/3.12.2/envs/es-testbed/lib/python3.12/site-packages/es_testbed/mgrs/entitymgr.py: 91%

46 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-27 22:51 -0600

1"""Entity Class Definition""" 

2 

3import typing as t 

4from ..defaults import NAMEMAPPER 

5 

6if t.TYPE_CHECKING: 6 ↛ 7line 6 didn't jump to line 7, because the condition on line 6 was never true

7 from elasticsearch8 import Elasticsearch 

8 from dotmap import DotMap 

9 

10# pylint: disable=missing-docstring,broad-exception-caught,too-many-instance-attributes 

11 

12 

class EntityMgr:
    """Base manager that names and tracks entities recorded in a plan.

    Tracked entity names are stored in ``plan[listname]``.  New names are
    assembled from ``plan.prefix``, the ``NAMEMAPPER`` entry for ``kind``,
    ``plan.uniq`` and a zero-padded, 1-based counter derived from the current
    length of the tracked list.

    ``setup`` and ``track_index`` do nothing in this class and ``indexlist``
    is always empty; presumably subclasses override them (confirm against
    the subclasses).
    """

    # Key looked up in NAMEMAPPER by ident() when no explicit key is given.
    kind = 'entity_type'
    # Key in ``plan`` under which the list of tracked entities is stored.
    listname = 'entity_mgrs'

    def __init__(
        self,
        client: t.Union['Elasticsearch', None] = None,
        plan: t.Union['DotMap', None] = None,
        autobuild: t.Optional[bool] = True,
    ):
        """Store the client and plan, then optionally run ``setup``.

        :param client: Client object kept on the instance; not used by this
            class itself.
        :param plan: Mapping-like object holding ``prefix``, ``uniq`` and the
            tracked list under ``listname``.
        :param autobuild: When truthy, ``setup()`` is called immediately.
        """
        self.client = client
        self.plan = plan
        self.success = False
        if autobuild:
            self.setup()

    @property
    def entity_list(self) -> t.List:
        """Return the stored list of entities"""
        return self.plan[self.listname]

    @entity_list.setter
    def entity_list(self, value: t.Sequence) -> None:
        """Replace the stored list of entities in the plan"""
        self.plan[self.listname] = value

    @property
    def entity_root(self) -> str:
        """The entity root name builder"""
        return f'{self.plan.prefix}-{self.ident()}-{self.plan.uniq}'

    @property
    def indexlist(self) -> t.Sequence[str]:
        """Empty attribute/property waiting to be overridden"""
        return []

    @property
    def last(self) -> str:
        """Return the most recently appended entity

        Raises ``IndexError`` if nothing has been appended yet.
        """
        return self.entity_list[-1]

    @property
    def name(self) -> str:
        """Return the full, incrementing name of a not yet appended entity"""
        return f'{self.entity_root}{self.suffix}'

    @property
    def pattern(self) -> str:
        """Return the search pattern for the managed entity"""
        return f'*{self.entity_root}*'

    @property
    def suffix(self) -> str:
        """Return the incrementing index suffix

        The counter is the current list length plus one, zero-padded to six
        digits and prefixed with a hyphen (e.g. ``-000001`` for an empty list).
        """
        return f'-{len(self.entity_list) + 1:06}'

    def appender(self, name: str) -> None:
        """Append an item to entity_list"""
        self.entity_list.append(name)

    def ident(self, dkey: t.Optional[str] = None) -> str:
        """Get the formatted name string of the managed entity

        :param dkey: Key to look up in ``NAMEMAPPER``; any falsy value falls
            back to ``self.kind``.
        :raises KeyError: If the resolved key is not present in ``NAMEMAPPER``.
        """
        return NAMEMAPPER[dkey or self.kind]

    def setup(self) -> None:
        """Setup the entity manager"""

    def track_index(self, name: str) -> None:
        """Track an index and append that tracking entity to entity_list"""