Coverage for /Users/buh/.pyenv/versions/3.12.2/envs/es-testbed/lib/python3.12/site-packages/es_testbed/mgrs/indexmgr.py: 92%
77 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-27 23:45 -0600
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-27 23:45 -0600
1"""Index Entity Manager Class"""
3import typing as t
4from .entitymgr import EntityMgr
5from .snapshotmgr import SnapshotMgr
6from ..entities import Alias, Index
7from ..helpers.es_api import create_index, fill_index
8from ..helpers.utils import getlogger, setting_component
11if t.TYPE_CHECKING: 11 ↛ 12line 11 didn't jump to line 12, because the condition on line 11 was never true
12 from elasticsearch8 import Elasticsearch
13 from dotmap import DotMap
15# pylint: disable=missing-docstring
class IndexMgr(EntityMgr):
    """Manage the indices described by a test plan.

    Indices are created either as plain indices or, when the plan defines a
    ``rollover_alias``, as a chain of indices behind that alias. Every index
    that gets created is wrapped in an ``Index`` entity and appended to
    ``entity_list``.

    NOTE(review): ``name``, ``last``, ``entity_list`` and ``success`` are not
    defined in this class; they are presumably supplied by ``EntityMgr`` --
    confirm against the base class.
    """

    kind = 'index'
    listname = 'indices'

    def __init__(
        self,
        client: t.Union['Elasticsearch', None] = None,
        plan: t.Union['DotMap', None] = None,
        autobuild: t.Optional[bool] = True,
        snapmgr: t.Union[SnapshotMgr, None] = None,
    ):
        """Initialize local state, then hand off to ``EntityMgr.__init__``.

        :param client: Elasticsearch client used for all index operations.
        :param plan: The test plan; ``entities``, ``rollover_alias`` and
            ``ilm_policies`` are read from it by this class.
        :param autobuild: Passed through unchanged to the base initializer.
        :param snapmgr: Snapshot manager handed to each tracked ``Index``.
        """
        # These attributes are assigned before calling the base initializer.
        # NOTE(review): presumably because the base initializer may invoke
        # setup() when autobuild is set -- confirm against EntityMgr.
        self.doc_incr = 0
        self.snapmgr = snapmgr
        self.alias = None  # Only used for tracking the rollover alias
        self.logger = getlogger('es_testbed.IndexMgr')
        super().__init__(client=client, plan=plan, autobuild=autobuild)

    @property
    def indexlist(self) -> t.Sequence[str]:
        """Return a list of index names currently being managed"""
        return [x.name for x in self.entity_list]

    @property
    def policy_name(self) -> t.Union[str, None]:
        """Return the name of the ILM policy, if it exists"""
        if len(self.plan.ilm_policies) > 0:
            # The most recently added policy is the one in use
            return self.plan.ilm_policies[-1]
        return None

    def _rollover_path(self) -> None:
        """This is the execution path for rollover indices"""
        if not self.entity_list:
            # Nothing tracked yet: create the first index with the rollover
            # alias attached as its write index, then start tracking the alias.
            kw = {
                'ilm_policy': self.policy_name,
                'rollover_alias': self.plan.rollover_alias,
            }
            cfg = setting_component(**kw)['settings']
            acfg = {self.plan.rollover_alias: {'is_write_index': True}}
            self.logger.debug(
                'No indices created yet. Starting with a rollover alias index...'
            )
            create_index(self.client, self.name, aliases=acfg, settings=cfg)
            self.logger.debug(
                'Created %s with rollover alias %s', self.name, self.plan.rollover_alias
            )
            self.track_alias()
        else:
            # At least one index is already tracked, so roll the alias over.
            self.alias.rollover()
            if self.policy_name:  # We have an ILM policy
                self.logger.debug('Going to wait now...')
                self.last.ilm_tracker.wait4complete()
                self.logger.debug('The wait is over!')

    def add(self, value) -> None:
        """Create a single index"""
        # In this case, value is a single array element from plan.entities
        self.logger.debug('Creating index: %s', value)
        create_index(self.client, value)

    def add_indices(self) -> None:
        """Add indices according to plan"""
        for scheme in self.plan.entities:
            if self.plan.rollover_alias:
                self._rollover_path()
            else:
                self.add(self.name)
            self.filler(scheme)
            self.track_index(self.name)
        self.logger.debug('Created indices: %s', self.indexlist)
        if self.plan.rollover_alias and not self.alias.verify(self.indexlist):
            # The alias name is passed so the "%s" placeholder is filled in
            self.logger.error(
                'Unable to confirm rollover of alias "%s" was successfully executed',
                self.plan.rollover_alias,
            )

    def filler(self, scheme) -> None:
        """If the scheme from the TestPlan says to write docs, do it"""
        # scheme is a single array element from plan.entities
        self.logger.debug('Adding docs to %s', self.name)
        if scheme['docs'] > 0:
            fill_index(
                self.client,
                name=self.name,
                count=scheme['docs'],
                start_num=self.doc_incr,
                match=scheme['match'],
            )
        # Advance the running offset so the next fill starts after these docs
        self.doc_incr += scheme['docs']

    def searchable(self) -> None:
        """If the indices were marked as searchable snapshots, we do that now"""
        # entity_list and plan.entities are paired by position
        for idx, scheme in enumerate(self.plan.entities):
            self.entity_list[idx].mount_ss(scheme)

    def setup(self) -> None:
        """Setup the entity manager"""
        self.logger.debug('Beginning setup...')
        if self.plan.rollover_alias:
            self.logger.debug('rollover_alias is True...')
        self.add_indices()
        self.searchable()
        self.logger.info('Successfully created indices: %s', self.indexlist)
        self.success = True

    def track_alias(self) -> None:
        """Track a rollover alias"""
        self.logger.debug('Tracking alias: %s', self.plan.rollover_alias)
        self.alias = Alias(client=self.client, name=self.plan.rollover_alias)

    def track_index(self, name: str) -> None:
        """Track an index and append that tracking entity to entity_list"""
        self.logger.debug('Tracking index: %s', name)
        entity = Index(
            client=self.client,
            name=name,
            snapmgr=self.snapmgr,
            policy_name=self.policy_name,
        )
        self.entity_list.append(entity)