Coverage for /Users/buh/.pyenv/versions/3.12.2/envs/es-testbed/lib/python3.12/site-packages/es_testbed/mgrs/indexmgr.py: 92%

77 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-27 23:45 -0600

1"""Index Entity Manager Class""" 

2 

3import typing as t 

4from .entitymgr import EntityMgr 

5from .snapshotmgr import SnapshotMgr 

6from ..entities import Alias, Index 

7from ..helpers.es_api import create_index, fill_index 

8from ..helpers.utils import getlogger, setting_component 

9 

10 

11if t.TYPE_CHECKING: 11 ↛ 12line 11 didn't jump to line 12, because the condition on line 11 was never true

12 from elasticsearch8 import Elasticsearch 

13 from dotmap import DotMap 

14 

15# pylint: disable=missing-docstring 

16 

17 

class IndexMgr(EntityMgr):
    """Entity manager that creates, fills, and tracks test indices.

    Indices are created either as plain standalone indices or, when the plan
    defines a ``rollover_alias``, as a chain of indices produced by rolling
    that alias over. Every created index is wrapped in an ``Index`` entity and
    appended to ``entity_list``.

    NOTE(review): ``self.name``, ``self.last``, ``self.entity_list``,
    ``self.client``, ``self.plan`` and ``self.success`` are not defined in this
    class; they are presumably provided by ``EntityMgr`` -- confirm there.
    """

    kind = 'index'
    listname = 'indices'

    def __init__(
        self,
        client: t.Union['Elasticsearch', None] = None,
        plan: t.Union['DotMap', None] = None,
        autobuild: t.Optional[bool] = True,
        snapmgr: t.Union[SnapshotMgr, None] = None,
    ):
        """Initialize local tracking state, then hand off to ``EntityMgr``.

        :param client: Elasticsearch client used for all API calls.
        :param plan: The test plan describing the entities to build.
        :param autobuild: Passed through unchanged to ``EntityMgr.__init__``.
        :param snapmgr: Snapshot manager passed to each tracked ``Index``.
        """
        # These attributes are assigned before calling the parent initializer.
        # NOTE(review): presumably because the parent may run setup() right
        # away when autobuild is true -- confirm in EntityMgr.
        self.doc_incr = 0  # Running offset used as start_num when filling docs
        self.snapmgr = snapmgr
        self.alias = None  # Only used for tracking the rollover alias
        self.logger = getlogger('es_testbed.IndexMgr')
        super().__init__(client=client, plan=plan, autobuild=autobuild)

    @property
    def indexlist(self) -> t.Sequence[str]:
        """Return a list of index names currently being managed"""
        return [x.name for x in self.entity_list]

    @property
    def policy_name(self) -> t.Union[str, None]:
        """Return the name of the ILM policy, if it exists

        The last entry of ``plan.ilm_policies`` is used; ``None`` is returned
        when that collection is empty.
        """
        if len(self.plan.ilm_policies) > 0:
            return self.plan.ilm_policies[-1]
        return None

    def _rollover_path(self) -> None:
        """This is the execution path for rollover indices

        On the first call (nothing tracked yet) the initial index is created
        with the rollover alias attached as its write index, and the alias is
        tracked. On later calls the tracked alias is rolled over and, if an ILM
        policy is in use, execution blocks on the last tracked entity's
        ``ilm_tracker.wait4complete()``.
        """
        if not self.entity_list:
            kw = {
                'ilm_policy': self.policy_name,
                'rollover_alias': self.plan.rollover_alias,
            }
            cfg = setting_component(**kw)['settings']
            acfg = {self.plan.rollover_alias: {'is_write_index': True}}
            self.logger.debug(
                'No indices created yet. Starting with a rollover alias index...'
            )
            create_index(self.client, self.name, aliases=acfg, settings=cfg)
            self.logger.debug(
                'Created %s with rollover alias %s', self.name, self.plan.rollover_alias
            )
            self.track_alias()
        else:
            self.alias.rollover()
            if self.policy_name:  # We have an ILM policy
                self.logger.debug('Going to wait now...')
                self.last.ilm_tracker.wait4complete()
                self.logger.debug('The wait is over!')

    def add(self, value) -> None:
        """Create a single index

        :param value: Passed to ``create_index`` as the index to create.
        """
        self.logger.debug('Creating index: %s', value)
        create_index(self.client, value)

    def add_indices(self) -> None:
        """Add indices according to plan

        For every scheme in ``plan.entities`` an index is created (through the
        rollover path when ``plan.rollover_alias`` is set, otherwise directly),
        filled with documents, and tracked. When a rollover alias is in use,
        the alias is verified against the final index list afterwards.
        """
        for scheme in self.plan.entities:
            if self.plan.rollover_alias:
                self._rollover_path()
            else:
                self.add(self.name)
            self.filler(scheme)
            self.track_index(self.name)
        self.logger.debug('Created indices: %s', self.indexlist)
        # Short-circuit keeps verify() from being called without a rollover alias.
        if self.plan.rollover_alias and not self.alias.verify(self.indexlist):
            # The alias name must be supplied here; without it the "%s"
            # placeholder is logged literally.
            self.logger.error(
                'Unable to confirm rollover of alias "%s" was successfully executed',
                self.plan.rollover_alias,
            )

    def filler(self, scheme) -> None:
        """If the scheme from the TestPlan says to write docs, do it

        :param scheme: A single array element from ``plan.entities``. Its
            ``'docs'`` value is the document count, and ``'match'`` is
            forwarded to ``fill_index``.
        """
        self.logger.debug('Adding docs to %s', self.name)
        if scheme['docs'] > 0:
            fill_index(
                self.client,
                name=self.name,
                count=scheme['docs'],
                start_num=self.doc_incr,
                match=scheme['match'],
            )
        # Advance the offset so the next fill starts after these documents.
        self.doc_incr += scheme['docs']

    def searchable(self) -> None:
        """If the indices were marked as searchable snapshots, we do that now

        Each scheme in ``plan.entities`` is paired by position with the entity
        at the same index of ``entity_list``.
        """
        for idx, scheme in enumerate(self.plan.entities):
            self.entity_list[idx].mount_ss(scheme)

    def setup(self) -> None:
        """Setup the entity manager

        Builds all indices, mounts searchable snapshots, and then sets
        ``self.success`` to ``True``.
        """
        self.logger.debug('Beginning setup...')
        if self.plan.rollover_alias:
            self.logger.debug('rollover_alias is True...')
        self.add_indices()
        self.searchable()
        self.logger.info('Successfully created indices: %s', self.indexlist)
        self.success = True

    def track_alias(self) -> None:
        """Track a rollover alias"""
        self.logger.debug('Tracking alias: %s', self.plan.rollover_alias)
        self.alias = Alias(client=self.client, name=self.plan.rollover_alias)

    def track_index(self, name: str) -> None:
        """Track an index and append that tracking entity to entity_list

        :param name: Name of the index to wrap in an ``Index`` entity.
        """
        self.logger.debug('Tracking index: %s', name)
        entity = Index(
            client=self.client,
            name=name,
            snapmgr=self.snapmgr,
            policy_name=self.policy_name,
        )
        self.entity_list.append(entity)