Coverage for /Users/buh/.pyenv/versions/3.12.2/envs/es-testbed/lib/python3.12/site-packages/es_testbed/helpers/es_api.py: 73%
232 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-27 23:52 -0600
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-27 23:52 -0600
1"""Functions that make Elasticsearch API Calls"""
3import typing as t
4from os import getenv
5from elasticsearch8.exceptions import NotFoundError
6from es_wait import Exists, Snapshot
7from ..defaults import MAPPING, PAUSE_DEFAULT, PAUSE_ENVVAR
8from ..exceptions import (
9 NameChanged,
10 ResultNotExpected,
11 TestbedFailure,
12 TestbedMisconfig,
13 TimeoutException,
14)
15from ..helpers.utils import (
16 doc_gen,
17 get_routing,
18 getlogger,
19 mounted_name,
20 storage_type,
21)
23if t.TYPE_CHECKING: 23 ↛ 24line 23 didn't jump to line 24, because the condition on line 23 was never true
24 from elasticsearch8 import Elasticsearch
26LOGGER = getlogger(__name__)
27PAUSE_VALUE = float(getenv(PAUSE_ENVVAR, default=PAUSE_DEFAULT))
28# pylint: disable=broad-except,W0707
31def emap(kind: str, es: 'Elasticsearch', value=None) -> t.Dict[str, t.Any]:
32 """Return a value from a dictionary"""
33 _ = {
34 'alias': {
35 'delete': es.indices.delete_alias,
36 'exists': es.indices.exists_alias,
37 'get': es.indices.get_alias,
38 'kwargs': {'index': value, 'expand_wildcards': ['open', 'closed']},
39 'plural': 'alias(es)',
40 },
41 'data_stream': {
42 'delete': es.indices.delete_data_stream,
43 'exists': es.indices.exists,
44 'get': es.indices.get_data_stream,
45 'kwargs': {'name': value, 'expand_wildcards': ['open', 'closed']},
46 'plural': 'data_stream(s)',
47 'key': 'data_streams',
48 },
49 'index': {
50 'delete': es.indices.delete,
51 'exists': es.indices.exists,
52 'get': es.indices.get,
53 'kwargs': {'index': value, 'expand_wildcards': ['open', 'closed']},
54 'plural': 'index(es)',
55 },
56 'template': {
57 'delete': es.indices.delete_index_template,
58 'exists': es.indices.exists_index_template,
59 'get': es.indices.get_index_template,
60 'kwargs': {'name': value},
61 'plural': 'index template(s)',
62 'key': 'index_templates',
63 },
64 'ilm': {
65 'delete': es.ilm.delete_lifecycle,
66 'exists': es.ilm.get_lifecycle,
67 'get': es.ilm.get_lifecycle,
68 'kwargs': {'name': value},
69 'plural': 'ilm policy(ies)',
70 },
71 'component': {
72 'delete': es.cluster.delete_component_template,
73 'exists': es.cluster.exists_component_template,
74 'get': es.cluster.get_component_template,
75 'kwargs': {'name': value},
76 'plural': 'component template(s)',
77 'key': 'component_templates',
78 },
79 'snapshot': {
80 'delete': es.snapshot.delete,
81 'exists': es.snapshot.get,
82 'get': es.snapshot.get,
83 'kwargs': {'snapshot': value},
84 'plural': 'snapshot(s)',
85 },
86 }
87 return _[kind]
90def change_ds(client: 'Elasticsearch', actions: t.Union[str, None] = None) -> None:
91 """Change/Modify/Update a datastream"""
92 try:
93 client.indices.modify_data_stream(actions=actions, body=None)
94 except Exception as err:
95 raise ResultNotExpected(f'Unable to modify datastreams. {err}') from err
98def create_data_stream(client: 'Elasticsearch', name: str) -> None:
99 """Create a datastream"""
100 try:
101 client.indices.create_data_stream(name=name)
102 test = Exists(client, name=name, kind='datastream', pause=PAUSE_VALUE)
103 test.wait_for_it()
104 except Exception as err:
105 raise TestbedFailure(
106 f'Unable to create datastream {name}. Error: {err}'
107 ) from err
110def create_index(
111 client: 'Elasticsearch',
112 name: str,
113 aliases: t.Union[t.Dict, None] = None,
114 settings: t.Union[t.Dict, None] = None,
115 tier: str = 'hot',
116) -> None:
117 """Create named index"""
118 if not settings:
119 settings = get_routing(tier=tier)
120 else:
121 settings.update(get_routing(tier=tier))
122 client.indices.create(
123 index=name, aliases=aliases, mappings=MAPPING, settings=settings
124 )
125 try:
126 test = Exists(client, name=name, kind='index', pause=PAUSE_VALUE)
127 test.wait_for_it()
128 except TimeoutException as err:
129 raise ResultNotExpected(f'Failed to create index {name}') from err
130 return exists(client, 'index', name)
133def verify(
134 client: 'Elasticsearch',
135 kind: str,
136 name: str,
137 repository: t.Union[str, None] = None,
138) -> bool:
139 """Verify that whatever was deleted is actually deleted"""
140 success = True
141 items = ','.split(name)
142 for item in items:
143 result = exists(client, kind, item, repository=repository)
144 if result: # That means it's still in the cluster
145 success = False
146 return success
149def delete(
150 client: 'Elasticsearch',
151 kind: str,
152 name: str,
153 repository: t.Union[str, None] = None,
154) -> bool:
155 """Delete the named object of type kind"""
156 which = emap(kind, client)
157 func = which['delete']
158 success = False
159 if name is not None: # Typically only with ilm 159 ↛ 178line 159 didn't jump to line 178, because the condition on line 159 was never false
160 try:
161 if kind == 'snapshot':
162 res = func(snapshot=name, repository=repository)
163 elif kind == 'index':
164 res = func(index=name)
165 else:
166 res = func(name=name)
167 except NotFoundError as err: 167 ↛ 168line 167 didn't jump to line 168, because the exception caught by line 167 didn't happen
168 LOGGER.warning('%s named %s not found: %s', kind, name, err)
169 success = True
170 except Exception as err:
171 raise ResultNotExpected(f'Unexpected result: {err}') from err
172 if 'acknowledged' in res and res['acknowledged']: 172 ↛ 176line 172 didn't jump to line 176, because the condition on line 172 was never false
173 success = True
174 LOGGER.info('Deleted %s: "%s"', which['plural'], name)
175 else:
176 success = verify(client, kind, name, repository=repository)
177 else:
178 LOGGER.debug('"%s" has a None value for name', kind)
179 return success
182def do_snap(
183 client: 'Elasticsearch', repo: str, snap: str, idx: str, tier: str = 'cold'
184) -> None:
185 """Perform a snapshot"""
186 client.snapshot.create(repository=repo, snapshot=snap, indices=idx)
187 test = Snapshot(
188 client, action='snapshot', snapshot=snap, repository=repo, pause=1, timeout=60
189 )
190 test.wait_for_it()
192 # Mount the index accordingly
193 client.searchable_snapshots.mount(
194 repository=repo,
195 snapshot=snap,
196 index=idx,
197 index_settings=get_routing(tier=tier),
198 renamed_index=mounted_name(idx, tier),
199 storage=storage_type(tier),
200 wait_for_completion=True,
201 )
202 # Fix aliases
203 fix_aliases(client, idx, mounted_name(idx, tier))
206def exists(
207 client: 'Elasticsearch', kind: str, name: str, repository: t.Union[str, None] = None
208) -> bool:
209 """Return boolean existence of the named kind of object"""
210 if name is None: 210 ↛ 211line 210 didn't jump to line 211, because the condition on line 210 was never true
211 return False
212 retval = True
213 func = emap(kind, client)['exists']
214 try:
215 if kind == 'snapshot': 215 ↛ 216line 215 didn't jump to line 216, because the condition on line 215 was never true
216 retval = func(snapshot=name, repository=repository)
217 elif kind == 'ilm':
218 retval = func(name=name)
219 elif kind in ['index', 'data_stream']:
220 retval = func(index=name)
221 else:
222 retval = func(name=name)
223 except NotFoundError:
224 retval = False
225 except Exception as err:
226 raise ResultNotExpected(f'Unexpected result: {err}') from err
227 return retval
230def fill_index(
231 client: 'Elasticsearch',
232 name: t.Union[str, None] = None,
233 count: t.Union[int, None] = None,
234 start_num: t.Union[int, None] = None,
235 match: bool = True,
236) -> None:
237 """
238 Create and fill the named index with mappings and settings as directed
240 :param client: ES client
241 :param name: Index name
242 :param count: The number of docs to create
243 :param start_number: Where to start the incrementing number
244 :param match: Whether to use the default values for key (True) or random strings
245 (False)
247 :type client: es
248 :type name: str
249 :type count: int
250 :type start_number: int
251 :type match: bool
253 :rtype: None
254 :returns: No return value
255 """
256 for doc in doc_gen(count=count, start_at=start_num, match=match):
257 client.index(index=name, document=doc)
258 client.indices.flush(index=name)
259 client.indices.refresh(index=name)
262def find_write_index(client: 'Elasticsearch', name: str) -> t.AnyStr:
263 """Find the write_index for an alias by searching any index the alias points to"""
264 retval = None
265 for alias in get_aliases(client, name):
266 retval = get_write_index(client, alias)
267 if retval: 267 ↛ 265line 267 didn't jump to line 265, because the condition on line 267 was never false
268 break
269 return retval
272def fix_aliases(client: 'Elasticsearch', oldidx: str, newidx: str) -> None:
273 """Fix aliases using the new and old index names as data"""
274 # Delete the original index
275 client.indices.delete(index=oldidx)
276 # Add the original index name as an alias to the mounted index
277 client.indices.put_alias(index=f'{newidx}', name=oldidx)
280def get(
281 client: 'Elasticsearch',
282 kind: str,
283 pattern: str,
284 repository: t.Union[str, None] = None,
285) -> t.Sequence[str]:
286 """get any/all objects of type kind matching pattern"""
287 if pattern is None: 287 ↛ 288line 287 didn't jump to line 288, because the condition on line 287 was never true
288 msg = f'"{kind}" has a None value for pattern'
289 LOGGER.error(msg)
290 raise TestbedMisconfig(msg)
291 which = emap(kind, client, value=pattern)
292 func = which['get']
293 kwargs = which['kwargs']
294 if kind == 'snapshot':
295 kwargs['repository'] = repository
296 try:
297 result = func(**kwargs)
298 except NotFoundError: 298 ↛ 301line 298 didn't jump to line 301
299 LOGGER.debug('%s pattern "%s" had zero matches', kind, pattern)
300 return []
301 except Exception as err:
302 raise ResultNotExpected(f'Unexpected result: {err}') from err
303 if kind == 'snapshot':
304 retval = [x['snapshot'] for x in result['snapshots']]
305 elif kind in ['data_stream', 'template', 'component']:
306 retval = [x['name'] for x in result[which['key']]]
307 else:
308 # ['alias', 'ilm', 'index']
309 retval = list(result.keys())
310 return retval
313def get_aliases(client: 'Elasticsearch', name: str) -> t.Sequence[str]:
314 """Get aliases from index 'name'"""
315 res = client.indices.get(index=name)
316 try:
317 retval = list(res[name]['aliases'].keys())
318 except KeyError:
319 retval = None
320 return retval
323def get_backing_indices(client: 'Elasticsearch', name: str) -> t.Sequence[str]:
324 """Get the backing indices from the named data_stream"""
325 resp = resolver(client, name)
326 data_streams = resp['data_streams']
327 retval = []
328 if data_streams: 328 ↛ 334line 328 didn't jump to line 334, because the condition on line 328 was never false
329 if len(data_streams) > 1: 329 ↛ 330line 329 didn't jump to line 330, because the condition on line 329 was never true
330 raise ResultNotExpected(
331 f'Expected only a single data_stream matching {name}'
332 )
333 retval = data_streams[0]['backing_indices']
334 return retval
337def get_ds_current(client: 'Elasticsearch', name: str) -> str:
338 """
339 Find which index is the current 'write' index of the datastream
340 This is best accomplished by grabbing the last backing_index
341 """
342 backers = get_backing_indices(client, name)
343 retval = None
344 if backers: 344 ↛ 346line 344 didn't jump to line 346, because the condition on line 344 was never false
345 retval = backers[-1]
346 return retval
349def get_ilm(client: 'Elasticsearch', pattern: str) -> t.Union[t.Dict[str, str], None]:
350 """Get any ILM entity in ES that matches pattern"""
351 try:
352 return client.ilm.get_lifecycle(name=pattern)
353 except Exception as err:
354 msg = f'Unable to get ILM lifecycle matching {pattern}. Error: {err}'
355 LOGGER.critical(msg)
356 raise ResultNotExpected(msg) from err
359def get_ilm_phases(client: 'Elasticsearch', name: str) -> t.Dict:
360 """Return the policy/phases part of the ILM policy identified by 'name'"""
361 ilm = get_ilm(client, name)
362 try:
363 return ilm[name]['policy']['phases']
364 except KeyError as err:
365 msg = f'Unable to get ILM lifecycle named {name}. Error: {err}'
366 LOGGER.critical(msg)
367 raise ResultNotExpected(msg) from err
370def get_write_index(client: 'Elasticsearch', name: str) -> str:
371 """
372 Calls :py:meth:`~.elasticsearch.client.IndicesClient.get_alias`
374 :param client: A client connection object
375 :param name: An alias name
377 :type client: :py:class:`~.elasticsearch.Elasticsearch`
379 :returns: The the index name associated with the alias that is designated
380 ``is_write_index``
381 """
382 response = client.indices.get_alias(index=name)
383 retval = None
384 for index in list(response.keys()): 384 ↛ 391line 384 didn't jump to line 391, because the loop on line 384 didn't complete
385 try:
386 if response[index]['aliases'][name]['is_write_index']:
387 retval = index
388 break
389 except KeyError:
390 continue
391 return retval
394def snapshot_name(client: 'Elasticsearch', name: str) -> t.Union[t.AnyStr, None]:
395 """Get the name of the snapshot behind the mounted index data"""
396 res = {}
397 if exists(client, 'index', name): # Can jump straight to nested keys if it exists 397 ↛ 399line 397 didn't jump to line 399, because the condition on line 397 was never false
398 res = client.indices.get(index=name)[name]['settings']['index']
399 try:
400 retval = res['store']['snapshot']['snapshot_name']
401 except KeyError:
402 LOGGER.error('%s is not a searchable snapshot')
403 retval = None
404 return retval
407def ilm_explain(client: 'Elasticsearch', name: str) -> t.Union[t.Dict, None]:
408 """Return the results from the ILM Explain API call for the named index"""
409 try:
410 retval = client.ilm.explain_lifecycle(index=name)['indices'][name]
411 except KeyError:
412 LOGGER.debug('Index name changed')
413 new = list(client.ilm.explain_lifecycle(index=name)['indices'].keys())[0]
414 retval = client.ilm.explain_lifecycle(index=new)['indices'][new]
415 except NotFoundError as err: 415 ↛ 418line 415 didn't jump to line 418
416 LOGGER.warning('Datastream/Index Name changed. %s was not found', name)
417 raise NameChanged(f'{name} was not found, likely due to a name change') from err
418 except Exception as err:
419 msg = f'Unable to get ILM information for index {name}'
420 LOGGER.critical(msg)
421 raise ResultNotExpected(f'{msg}. Exception: {err}') from err
422 return retval
425def ilm_move(
426 client: 'Elasticsearch', name: str, current_step: t.Dict, next_step: t.Dict
427) -> None:
428 """Move index 'name' from the current step to the next step"""
429 try:
430 client.ilm.move_to_step(
431 index=name, current_step=current_step, next_step=next_step
432 )
433 except Exception as err:
434 msg = f'Unable to move index {name} to ILM next step: {next}. Error: {err}'
435 LOGGER.critical(msg)
436 raise ResultNotExpected(msg, err)
439def put_comp_tmpl(client: 'Elasticsearch', name: str, component: t.Dict) -> None:
440 """Publish a component template"""
441 try:
442 client.cluster.put_component_template(
443 name=name, template=component, create=True
444 )
445 test = Exists(client, name=name, kind='component', pause=PAUSE_VALUE)
446 test.wait_for_it()
447 except Exception as err:
448 raise TestbedFailure(
449 f'Unable to create component template {name}. Error: {err}'
450 ) from err
453def put_idx_tmpl(
454 client,
455 name: str,
456 index_patterns: list,
457 components: list,
458 data_stream: t.Union[t.Dict, None] = None,
459) -> None:
460 """Publish an index template"""
461 try:
462 client.indices.put_index_template(
463 name=name,
464 composed_of=components,
465 data_stream=data_stream,
466 index_patterns=index_patterns,
467 create=True,
468 )
469 test = Exists(client, name=name, kind='template', pause=PAUSE_VALUE)
470 test.wait_for_it()
471 except Exception as err:
472 raise TestbedFailure(
473 f'Unable to create index template {name}. Error: {err}'
474 ) from err
477def put_ilm(
478 client: 'Elasticsearch', name: str, policy: t.Union[t.Dict, None] = None
479) -> None:
480 """Publish an ILM Policy"""
481 try:
482 client.ilm.put_lifecycle(name=name, policy=policy)
483 except Exception as err:
484 raise TestbedFailure(
485 f'Unable to put index lifecycle policy {name}. Error: {err}'
486 ) from err
489def resolver(client: 'Elasticsearch', name: str) -> dict:
490 """
491 Resolve details about the entity, be it an index, alias, or data_stream
493 Because you can pass search patterns and aliases as name, each element comes back
494 as an array:
496 {'indices': [], 'aliases': [], 'data_streams': []}
498 If you only resolve a single index or data stream, you will still have a 1-element
499 list
500 """
501 return client.indices.resolve_index(name=name, expand_wildcards=['open', 'closed'])
504def rollover(client: 'Elasticsearch', name: str) -> None:
505 """Rollover alias or datastream identified by name"""
506 client.indices.rollover(alias=name, wait_for_active_shards='all')