Coverage for /Users/buh/.pyenv/versions/3.12.2/envs/es-testbed/lib/python3.12/site-packages/es_testbed/_base.py: 83%
119 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-27 23:52 -0600
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-27 23:52 -0600
1"""Base TestBed Class"""
3import typing as t
4from datetime import datetime, timezone
5from dotmap import DotMap
6from .exceptions import ResultNotExpected
7from .defaults import NAMEMAPPER
8from .helpers.es_api import delete, get
9from .helpers.utils import getlogger
10from ._plan import PlanBuilder
11from .mgrs import (
12 ComponentMgr,
13 DataStreamMgr,
14 IlmMgr,
15 IndexMgr,
16 SnapshotMgr,
17 TemplateMgr,
18)
20if t.TYPE_CHECKING: 20 ↛ 21line 20 didn't jump to line 21, because the condition on line 20 was never true
21 from elasticsearch8 import Elasticsearch
23# pylint: disable=R0902,W0719
26class TestBed:
27 """TestBed Class"""
29 __test__ = False
31 def __init__(
32 self,
33 client: 'Elasticsearch' = None,
34 plan: DotMap = None,
35 autobuild: t.Optional[bool] = False,
36 ):
37 self.logger = getlogger('es_testbed.TestBed')
38 self.client = client
39 if plan is None: 39 ↛ 40line 39 didn't jump to line 40, because the condition on line 39 was never true
40 plan = PlanBuilder().plan # Use defaults
41 self.plan = plan
43 # Set up for tracking
44 self.ilmmgr = None
45 self.componentmgr = None
46 self.templatemgr = None
47 self.snapshotmgr = None
48 self.indexmgr = None
49 self.data_streammgr = None
51 if autobuild: 51 ↛ 52line 51 didn't jump to line 52, because the condition on line 51 was never true
52 self.setup()
54 def _erase(self, kind: str, lst: t.Sequence[str]) -> None:
55 overall_success = True
56 if not lst:
57 self.logger.debug('%s: nothing to delete.', kind)
58 return True
59 if kind == 'ilm': # ILM policies can't be batch deleted
60 ilm = [self._while(kind, x) for x in lst]
61 overall_success = False not in ilm # No False values == True
62 else:
63 overall_success = self._while(kind, ','.join(lst))
64 return overall_success
66 def _fodder_generator(
67 self,
68 ) -> t.Generator[str, t.Sequence[str], None]:
69 """Method to delete everything matching our pattern(s)"""
70 items = ['index', 'data_stream', 'snapshot', 'template', 'component', 'ilm']
71 for i in items:
72 if i == 'snapshot' and self.plan.repository is None: 72 ↛ 73line 72 didn't jump to line 73, because the condition on line 72 was never true
73 self.logger.debug('No repository, no snapshots.')
74 continue
75 # self.logger.debug('Generating a list of type "%s"', i)
76 pattern = f'*{self.plan.prefix}-{NAMEMAPPER[i]}-{self.plan.uniq}*'
77 entities = get(self.client, i, pattern, repository=self.plan.repository)
78 yield (i, entities)
80 def _while(self, kind: str, item: str) -> bool:
81 count = 1
82 success = False
83 exc = None
84 while count < 4 and not success: 84 ↛ 94line 84 didn't jump to line 94, because the condition on line 84 was never false
85 try:
86 success = delete(
87 self.client, kind, item, repository=self.plan.repository
88 )
89 break
90 except ResultNotExpected as err:
91 self.logger.debug('Tried deleting "%s" %s time(s)', item, count)
92 exc = err
93 count += 1
94 if not success: 94 ↛ 95line 94 didn't jump to line 95, because the condition on line 94 was never true
95 self.logger.warning(
96 'Failed to delete "%s" after %s tries. Final error: %s',
97 item,
98 count - 1,
99 exc,
100 )
101 return success
103 def get_ilm_polling(self) -> None:
104 """
105 Get current ILM polling settings and store them in self.plan.polling_interval
106 """
107 self.logger.info('Storing current ILM polling settings, if any...')
108 try:
109 res = self.client.cluster.get_settings()
110 self.logger.debug('Cluster settings: %s', res)
111 except Exception as err:
112 self.logger.critical('Unable to get persistent cluster settings')
113 self.logger.critical('This could be permissions, or something larger.')
114 self.logger.critical('Exception: %s', err)
115 self.logger.critical('Exiting.')
116 raise Exception from err
117 try:
118 retval = res['persistent']['indices']['lifecycle']['poll_interval']
119 except KeyError:
120 self.logger.debug(
121 'No setting for indices.lifecycle.poll_interval. Must be default'
122 )
123 retval = None # Must be an actual value to go into a DotMap
124 if retval == '1s': 124 ↛ 125line 124 didn't jump to line 125
125 msg = (
126 'ILM polling already set at 1s. A previous run most likely did not '
127 'tear down properly. Resetting to null after this run'
128 )
129 self.logger.warning(msg)
130 retval = None # Must be an actual value to go into a DotMap
131 self.logger.debug('retval = %s', retval)
132 self.plan.ilm_polling_interval = retval
133 self.logger.info('Stored ILM Polling Interval: %s', retval)
135 def ilm_polling(self, interval: t.Union[str, None] = None) -> t.Dict:
136 """Return persistent cluster settings to speed up ILM polling during testing"""
137 return {'indices.lifecycle.poll_interval': interval}
139 def setup(self) -> None:
140 """Setup the instance"""
141 start = datetime.now(timezone.utc)
142 self.get_ilm_polling()
143 self.logger.info('Setting: %s', self.ilm_polling(interval='1s'))
144 self.client.cluster.put_settings(persistent=self.ilm_polling(interval='1s'))
145 self.setup_entitymgrs()
146 end = datetime.now(timezone.utc)
147 self.logger.info(
148 'Testbed setup elapsed time: %s', (end - start).total_seconds()
149 )
151 def setup_entitymgrs(self) -> None:
152 """
153 Setup each EntityMgr child class
154 """
155 kw = {'client': self.client, 'plan': self.plan}
156 self.ilmmgr = IlmMgr(**kw)
157 self.componentmgr = ComponentMgr(**kw)
158 self.templatemgr = TemplateMgr(**kw)
159 self.snapshotmgr = SnapshotMgr(**kw)
160 if self.plan.type == 'indices':
161 self.indexmgr = IndexMgr(**kw, snapmgr=self.snapshotmgr)
162 if self.plan.type == 'data_stream':
163 self.data_streammgr = DataStreamMgr(**kw, snapmgr=self.snapshotmgr)
165 def teardown(self) -> None:
166 """Tear down anything we created"""
167 start = datetime.now(timezone.utc)
168 successful = True
169 for kind, list_of_kind in self._fodder_generator():
170 if not self._erase(kind, list_of_kind): 170 ↛ 171line 170 didn't jump to line 171, because the condition on line 170 was never true
171 successful = False
172 persist = self.ilm_polling(interval=self.plan.ilm_polling_interval)
173 self.logger.info(
174 'Restoring ILM polling to previous value: %s',
175 self.plan.ilm_polling_interval,
176 )
177 self.client.cluster.put_settings(persistent=persist)
178 end = datetime.now(timezone.utc)
179 self.logger.info(
180 'Testbed teardown elapsed time: %s', (end - start).total_seconds()
181 )
182 if successful: 182 ↛ 185line 182 didn't jump to line 185, because the condition on line 182 was never false
183 self.logger.info('Cleanup successful')
184 else:
185 self.logger.error('Cleanup was unsuccessful/incomplete')
186 self.plan.cleanup = successful