Coverage for /Users/buh/.pyenv/versions/3.12.2/envs/es-testbed/lib/python3.12/site-packages/es_testbed/helpers/es_api.py: 73%

232 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-27 23:52 -0600

1"""Functions that make Elasticsearch API Calls""" 

2 

3import typing as t 

4from os import getenv 

5from elasticsearch8.exceptions import NotFoundError 

6from es_wait import Exists, Snapshot 

7from ..defaults import MAPPING, PAUSE_DEFAULT, PAUSE_ENVVAR 

8from ..exceptions import ( 

9 NameChanged, 

10 ResultNotExpected, 

11 TestbedFailure, 

12 TestbedMisconfig, 

13 TimeoutException, 

14) 

15from ..helpers.utils import ( 

16 doc_gen, 

17 get_routing, 

18 getlogger, 

19 mounted_name, 

20 storage_type, 

21) 

22 

23if t.TYPE_CHECKING: 23 ↛ 24line 23 didn't jump to line 24, because the condition on line 23 was never true

24 from elasticsearch8 import Elasticsearch 

25 

# Module-level logger, obtained from the project's getlogger helper
LOGGER = getlogger(__name__)
# Pause value handed to the es_wait ``Exists`` checks in this module. It is read
# from the environment variable named by PAUSE_ENVVAR, falling back to
# PAUSE_DEFAULT, and converted to float once at import time.
PAUSE_VALUE = float(getenv(PAUSE_ENVVAR, default=PAUSE_DEFAULT))
# pylint: disable=broad-except,W0707

29 

30 

def emap(kind: str, es: 'Elasticsearch', value=None) -> t.Dict[str, t.Any]:
    """
    Return the API bindings for one kind of Elasticsearch entity

    A lookup table is built with one entry per supported kind (``alias``,
    ``data_stream``, ``index``, ``template``, ``ilm``, ``component`` and
    ``snapshot``) and the entry for ``kind`` is returned.

    :param kind: Which entry of the table to return
    :param es: Client object whose bound API methods populate the table
    :param value: Name or pattern stored in the entry's ``kwargs`` mapping

    :returns: A dict holding the ``delete``, ``exists`` and ``get`` callables, the
        ``kwargs`` to use with ``get``, a human readable ``plural`` label and, for
        some kinds, the response ``key`` under which results are listed
    :raises KeyError: If ``kind`` is not one of the supported kinds
    """
    indices, ilm, cluster, snapshot = es.indices, es.ilm, es.cluster, es.snapshot

    def any_state() -> t.List[str]:
        # A fresh list per entry so that no two kwargs mappings share state
        return ['open', 'closed']

    table = {
        'alias': dict(
            delete=indices.delete_alias,
            exists=indices.exists_alias,
            get=indices.get_alias,
            kwargs=dict(index=value, expand_wildcards=any_state()),
            plural='alias(es)',
        ),
        'data_stream': dict(
            delete=indices.delete_data_stream,
            exists=indices.exists,
            get=indices.get_data_stream,
            kwargs=dict(name=value, expand_wildcards=any_state()),
            plural='data_stream(s)',
            key='data_streams',
        ),
        'index': dict(
            delete=indices.delete,
            exists=indices.exists,
            get=indices.get,
            kwargs=dict(index=value, expand_wildcards=any_state()),
            plural='index(es)',
        ),
        'template': dict(
            delete=indices.delete_index_template,
            exists=indices.exists_index_template,
            get=indices.get_index_template,
            kwargs=dict(name=value),
            plural='index template(s)',
            key='index_templates',
        ),
        # ILM and snapshots have no dedicated exists call here, so 'exists' is
        # bound to the same callable as 'get'
        'ilm': dict(
            delete=ilm.delete_lifecycle,
            exists=ilm.get_lifecycle,
            get=ilm.get_lifecycle,
            kwargs=dict(name=value),
            plural='ilm policy(ies)',
        ),
        'component': dict(
            delete=cluster.delete_component_template,
            exists=cluster.exists_component_template,
            get=cluster.get_component_template,
            kwargs=dict(name=value),
            plural='component template(s)',
            key='component_templates',
        ),
        'snapshot': dict(
            delete=snapshot.delete,
            exists=snapshot.get,
            get=snapshot.get,
            kwargs=dict(snapshot=value),
            plural='snapshot(s)',
        ),
    }
    return table[kind]

88 

89 

def change_ds(client: 'Elasticsearch', actions: t.Union[str, None] = None) -> None:
    """
    Submit a modify-data-stream request

    :param client: Client object providing ``indices.modify_data_stream``
    :param actions: Forwarded unchanged as the ``actions`` argument of the API call

    :raises ResultNotExpected: If the API call raises any exception
    """
    request = {'actions': actions, 'body': None}
    try:
        client.indices.modify_data_stream(**request)
    except Exception as err:
        raise ResultNotExpected(f'Unable to modify datastreams. {err}') from err

96 

97 

def create_data_stream(client: 'Elasticsearch', name: str) -> None:
    """
    Create the named data stream and wait until it is reported to exist

    :param client: Client object providing ``indices.create_data_stream``
    :param name: Name of the data stream to create

    :raises TestbedFailure: If the create call or the subsequent wait raises any
        exception
    """
    try:
        client.indices.create_data_stream(name=name)
        waiter = Exists(client, name=name, kind='datastream', pause=PAUSE_VALUE)
        waiter.wait_for_it()
    except Exception as err:
        msg = f'Unable to create datastream {name}. Error: {err}'
        raise TestbedFailure(msg) from err

108 

109 

def create_index(
    client: 'Elasticsearch',
    name: str,
    aliases: t.Union[t.Dict, None] = None,
    settings: t.Union[t.Dict, None] = None,
    tier: str = 'hot',
) -> bool:
    """
    Create the named index, wait for it to appear, and report whether it exists

    The routing settings returned by ``get_routing(tier=tier)`` are always applied.
    When ``settings`` is supplied, the routing keys are layered on top of it in a
    new dict, so the caller's ``settings`` mapping is left untouched.

    :param client: Client object providing ``indices.create``
    :param name: Name of the index to create
    :param aliases: Passed through as the ``aliases`` argument of the create call
    :param settings: Extra index settings; routing settings override clashing keys
    :param tier: Tier name handed to ``get_routing``

    :returns: The result of ``exists(client, 'index', name)`` after creation
    :raises ResultNotExpected: If waiting for the index raises ``TimeoutException``
    """
    routing = get_routing(tier=tier)
    # Merge into a fresh dict rather than calling settings.update(), which would
    # silently modify the dictionary owned by the caller
    merged = {**settings, **routing} if settings else routing
    client.indices.create(
        index=name, aliases=aliases, mappings=MAPPING, settings=merged
    )
    try:
        test = Exists(client, name=name, kind='index', pause=PAUSE_VALUE)
        test.wait_for_it()
    except TimeoutException as err:
        raise ResultNotExpected(f'Failed to create index {name}') from err
    return exists(client, 'index', name)

131 

132 

def verify(
    client: 'Elasticsearch',
    kind: str,
    name: str,
    repository: t.Union[str, None] = None,
) -> bool:
    """
    Verify that whatever was deleted is actually deleted

    ``name`` may hold several comma-separated names; each one is checked with
    ``exists``. Empty segments are skipped, and a ``None`` or empty ``name`` means
    there is nothing that could still be present.

    :param client: Client object handed to ``exists``
    :param kind: The kind of entity, as understood by ``exists``
    :param name: One name, or several names joined by commas
    :param repository: Snapshot repository, forwarded to ``exists``

    :returns: ``True`` if none of the named items exist any more, ``False`` if at
        least one is still present
    """
    if not name:
        return True
    # The separator is the argument of split(): name.split(','), not ','.split(name)
    leftovers = [
        item
        for item in name.split(',')
        if item and exists(client, kind, item, repository=repository)
    ]
    return not leftovers

147 

148 

def delete(
    client: 'Elasticsearch',
    kind: str,
    name: str,
    repository: t.Union[str, None] = None,
) -> bool:
    """
    Delete the named object of type kind

    The delete callable is taken from ``emap``. Snapshots are deleted with
    ``snapshot=``/``repository=``, indices with ``index=``, and every other kind
    with ``name=``.

    :param client: Client object handed to ``emap`` and ``verify``
    :param kind: The kind of entity to delete (a key of the ``emap`` table)
    :param name: The name to delete; ``None`` is logged and nothing is deleted
    :param repository: Snapshot repository, only used when ``kind`` is ``snapshot``

    :returns: ``True`` if the deletion was acknowledged or the object was already
        absent; otherwise the result of ``verify``. ``False`` when ``name`` is
        ``None``.
    :raises ResultNotExpected: If the delete call raises anything other than
        ``NotFoundError``
    """
    which = emap(kind, client)
    func = which['delete']
    if name is None:  # Typically only with ilm
        LOGGER.debug('"%s" has a None value for name', kind)
        return False
    try:
        if kind == 'snapshot':
            res = func(snapshot=name, repository=repository)
        elif kind == 'index':
            res = func(index=name)
        else:
            res = func(name=name)
    except NotFoundError as err:
        LOGGER.warning('%s named %s not found: %s', kind, name, err)
        # Return here: there is no response to inspect, and falling through would
        # read the never-assigned ``res``
        return True
    except Exception as err:
        raise ResultNotExpected(f'Unexpected result: {err}') from err
    if 'acknowledged' in res and res['acknowledged']:
        LOGGER.info('Deleted %s: "%s"', which['plural'], name)
        return True
    # Not acknowledged: check directly whether the object is gone
    return verify(client, kind, name, repository=repository)

180 

181 

def do_snap(
    client: 'Elasticsearch', repo: str, snap: str, idx: str, tier: str = 'cold'
) -> None:
    """
    Snapshot an index, mount the snapshot, and repoint the original name

    Steps, in order: create the snapshot, wait for it via ``Snapshot.wait_for_it``,
    mount it as a searchable snapshot under ``mounted_name(idx, tier)``, then call
    ``fix_aliases`` with the original and mounted names.

    :param client: Client object providing the ``snapshot`` and
        ``searchable_snapshots`` APIs
    :param repo: Snapshot repository name
    :param snap: Snapshot name
    :param idx: Name of the index to snapshot and mount
    :param tier: Tier name handed to ``get_routing``, ``mounted_name`` and
        ``storage_type``
    """
    client.snapshot.create(repository=repo, snapshot=snap, indices=idx)
    waiter = Snapshot(
        client, action='snapshot', snapshot=snap, repository=repo, pause=1, timeout=60
    )
    waiter.wait_for_it()

    # Settings for mounting the snapshot on the requested tier
    mount_args = {
        'repository': repo,
        'snapshot': snap,
        'index': idx,
        'index_settings': get_routing(tier=tier),
        'renamed_index': mounted_name(idx, tier),
        'storage': storage_type(tier),
        'wait_for_completion': True,
    }
    client.searchable_snapshots.mount(**mount_args)
    # Hand the original and the mounted index names to fix_aliases
    fix_aliases(client, idx, mounted_name(idx, tier))

204 

205 

def exists(
    client: 'Elasticsearch', kind: str, name: str, repository: t.Union[str, None] = None
) -> bool:
    """
    Return boolean existence of the named kind of object

    The callable to use comes from the ``exists`` entry of ``emap``. For some
    kinds that entry is a GET call which returns a response body rather than a
    boolean, so the result is always coerced with ``bool`` before it is returned.

    :param client: Client object handed to ``emap``
    :param kind: The kind of entity (a key of the ``emap`` table)
    :param name: The name to look for; ``None`` is reported as not existing
    :param repository: Snapshot repository, only used when ``kind`` is ``snapshot``

    :returns: ``True`` if the call result is truthy, ``False`` if it is falsy, if
        ``NotFoundError`` is raised, or if ``name`` is ``None``
    :raises ResultNotExpected: If the call raises anything other than
        ``NotFoundError``
    """
    if name is None:
        return False
    func = emap(kind, client)['exists']
    try:
        if kind == 'snapshot':
            found = func(snapshot=name, repository=repository)
        elif kind in ('index', 'data_stream'):
            found = func(index=name)
        else:
            # alias, template, component and ilm all take ``name=``
            found = func(name=name)
    except NotFoundError:
        return False
    except Exception as err:
        raise ResultNotExpected(f'Unexpected result: {err}') from err
    return bool(found)

228 

229 

def fill_index(
    client: 'Elasticsearch',
    name: t.Union[str, None] = None,
    count: t.Union[int, None] = None,
    start_num: t.Union[int, None] = None,
    match: bool = True,
) -> None:
    """
    Index generated documents into the named index, then flush and refresh it

    Documents come from ``doc_gen`` and are sent one at a time with
    ``client.index``.

    :param client: ES client
    :param name: Index name
    :param count: The number of docs to create (forwarded to ``doc_gen``)
    :param start_num: Where to start the incrementing number (forwarded to
        ``doc_gen`` as ``start_at``)
    :param match: Whether to use the default values for key (True) or random strings
        (False); forwarded to ``doc_gen``

    :type client: es
    :type name: str
    :type count: int
    :type start_num: int
    :type match: bool

    :rtype: None
    :returns: No return value
    """
    documents = doc_gen(count=count, start_at=start_num, match=match)
    for document in documents:
        client.index(index=name, document=document)
    # Flush first, then refresh, once all documents have been sent
    for finish in (client.indices.flush, client.indices.refresh):
        finish(index=name)

260 

261 

def find_write_index(client: 'Elasticsearch', name: str) -> t.Union[str, None]:
    """
    Find the write_index for an alias by searching any index the alias points to

    Each alias reported by ``get_aliases`` for ``name`` is passed to
    ``get_write_index`` until one of them yields a write index.

    :param client: Client object handed to ``get_aliases`` and ``get_write_index``
    :param name: Name whose aliases should be inspected

    :returns: The first truthy result of ``get_write_index``, or ``None`` if there
        are no aliases or none of them has a write index
    """
    # get_aliases returns None when no aliases could be read; treat that the same
    # as an empty list instead of failing on iteration
    for alias in get_aliases(client, name) or []:
        found = get_write_index(client, alias)
        if found:
            return found
    return None

270 

271 

def fix_aliases(client: 'Elasticsearch', oldidx: str, newidx: str) -> None:
    """
    Replace an index with an alias of the same name on another index

    The index ``oldidx`` is deleted first; afterwards ``oldidx`` is added as an
    alias pointing at ``newidx``.

    :param client: Client object providing ``indices.delete`` and
        ``indices.put_alias``
    :param oldidx: Index to delete, and the alias name to create
    :param newidx: Index that receives the alias
    """
    indices = client.indices
    # The name must be free before it can be reused as an alias
    indices.delete(index=oldidx)
    alias_args = {'index': f'{newidx}', 'name': oldidx}
    indices.put_alias(**alias_args)

278 

279 

def get(
    client: 'Elasticsearch',
    kind: str,
    pattern: str,
    repository: t.Union[str, None] = None,
) -> t.Sequence[str]:
    """
    Return the names of all objects of type kind that match pattern

    The GET callable and its keyword arguments come from ``emap``. How names are
    pulled out of the response depends on ``kind``: snapshots are read from the
    ``snapshots`` list, data streams and both template kinds from the list stored
    under the entry's ``key``, and everything else from the top-level keys of the
    response.

    :param client: Client object handed to ``emap``
    :param kind: The kind of entity (a key of the ``emap`` table)
    :param pattern: Name or pattern to look up
    :param repository: Snapshot repository, only used when ``kind`` is ``snapshot``

    :returns: A list of names; empty if the call raises ``NotFoundError``
    :raises TestbedMisconfig: If ``pattern`` is ``None``
    :raises ResultNotExpected: If the call raises anything other than
        ``NotFoundError``
    """
    if pattern is None:
        msg = f'"{kind}" has a None value for pattern'
        LOGGER.error(msg)
        raise TestbedMisconfig(msg)
    entry = emap(kind, client, value=pattern)
    fetch = entry['get']
    call_args = entry['kwargs']
    is_snapshot = kind == 'snapshot'
    if is_snapshot:
        call_args['repository'] = repository
    try:
        response = fetch(**call_args)
    except NotFoundError:
        LOGGER.debug('%s pattern "%s" had zero matches', kind, pattern)
        return []
    except Exception as err:
        raise ResultNotExpected(f'Unexpected result: {err}') from err
    if is_snapshot:
        return [found['snapshot'] for found in response['snapshots']]
    if kind in ['data_stream', 'template', 'component']:
        return [found['name'] for found in response[entry['key']]]
    # ['alias', 'ilm', 'index'] are keyed by name at the top level
    return list(response.keys())

311 

312 

def get_aliases(client: 'Elasticsearch', name: str) -> t.Sequence[str]:
    """
    List the alias names attached to index 'name'

    :param client: Client object providing ``indices.get``
    :param name: Index name, used both for the request and as the response key

    :returns: A list of alias names, or ``None`` when the response has no
        ``name`` entry or that entry has no ``aliases`` key
    """
    response = client.indices.get(index=name)
    try:
        aliases = response[name]['aliases']
    except KeyError:
        return None
    return list(aliases.keys())

321 

322 

def get_backing_indices(client: 'Elasticsearch', name: str) -> t.Sequence[str]:
    """
    Return the backing indices of the data_stream that ``name`` resolves to

    :param client: Client object handed to ``resolver``
    :param name: Data stream name or pattern

    :returns: The ``backing_indices`` value of the single resolved data stream, or
        an empty list when nothing resolves to a data stream
    :raises ResultNotExpected: If ``name`` resolves to more than one data stream
    """
    streams = resolver(client, name)['data_streams']
    if not streams:
        return []
    if len(streams) > 1:
        raise ResultNotExpected(
            f'Expected only a single data_stream matching {name}'
        )
    only_stream = streams[0]
    return only_stream['backing_indices']

335 

336 

def get_ds_current(client: 'Elasticsearch', name: str) -> str:
    """
    Return the current 'write' index of the data stream

    The last entry of the backing indices list is taken to be the write index.

    :param client: Client object handed to ``get_backing_indices``
    :param name: Data stream name

    :returns: The last backing index, or ``None`` when there are no backing indices
    """
    backers = get_backing_indices(client, name)
    return backers[-1] if backers else None

347 

348 

def get_ilm(client: 'Elasticsearch', pattern: str) -> t.Union[t.Dict[str, str], None]:
    """
    Fetch the ILM lifecycle(s) whose name matches pattern

    :param client: Client object providing ``ilm.get_lifecycle``
    :param pattern: Passed as the ``name`` argument of the API call

    :returns: The response of ``ilm.get_lifecycle``
    :raises ResultNotExpected: If the API call raises any exception; the failure
        is logged at critical level first
    """
    try:
        lifecycles = client.ilm.get_lifecycle(name=pattern)
    except Exception as err:
        msg = f'Unable to get ILM lifecycle matching {pattern}. Error: {err}'
        LOGGER.critical(msg)
        raise ResultNotExpected(msg) from err
    else:
        return lifecycles

357 

358 

def get_ilm_phases(client: 'Elasticsearch', name: str) -> t.Dict:
    """
    Return the ``policy.phases`` section of the ILM policy called 'name'

    :param client: Client object handed to ``get_ilm``
    :param name: Policy name, used both for the lookup and as the response key

    :returns: The value stored at ``[name]['policy']['phases']`` in the response
    :raises ResultNotExpected: If any of those keys is missing; the failure is
        logged at critical level first
    """
    lifecycles = get_ilm(client, name)
    try:
        phases = lifecycles[name]['policy']['phases']
    except KeyError as err:
        msg = f'Unable to get ILM lifecycle named {name}. Error: {err}'
        LOGGER.critical(msg)
        raise ResultNotExpected(msg) from err
    else:
        return phases

368 

369 

def get_write_index(client: 'Elasticsearch', name: str) -> str:
    """
    Calls :py:meth:`~.elasticsearch.client.IndicesClient.get_alias`

    The alias name is sent as the ``index`` argument of the call. Every index in
    the response is then checked for an ``aliases[name]['is_write_index']`` flag;
    indices lacking any of those keys are skipped.

    :param client: A client connection object
    :param name: An alias name

    :type client: :py:class:`~.elasticsearch.Elasticsearch`

    :returns: The name of the first index whose entry for the alias has a truthy
        ``is_write_index``, or ``None`` if there is no such index
    """
    response = client.indices.get_alias(index=name)
    for candidate in response.keys():
        try:
            is_writer = response[candidate]['aliases'][name]['is_write_index']
        except KeyError:
            continue
        if is_writer:
            return candidate
    return None

392 

393 

def snapshot_name(client: 'Elasticsearch', name: str) -> t.Union[t.AnyStr, None]:
    """
    Get the name of the snapshot behind the mounted index data

    :param client: Client object providing ``indices.get``
    :param name: Index name

    :returns: The value at ``store.snapshot.snapshot_name`` in the index settings,
        or ``None`` (after logging an error naming the index) when the index does
        not exist or has no such setting
    """
    res = {}
    if exists(client, 'index', name):  # Can jump straight to nested keys if it exists
        res = client.indices.get(index=name)[name]['settings']['index']
    try:
        return res['store']['snapshot']['snapshot_name']
    except KeyError:
        # Supply the index name for the %s placeholder so the log line names it
        LOGGER.error('%s is not a searchable snapshot', name)
        return None

405 

406 

def ilm_explain(client: 'Elasticsearch', name: str) -> t.Union[t.Dict, None]:
    """
    Return the ILM Explain API result for the named index

    When the response has no entry for ``name``, the first index listed in a
    repeated response is looked up instead.

    :param client: Client object providing ``ilm.explain_lifecycle``
    :param name: Index name

    :returns: The ``indices`` entry for ``name``, or for the first listed index
        when ``name`` itself is not a key of the response
    :raises NameChanged: If the initial call raises ``NotFoundError``
    :raises ResultNotExpected: If the initial call raises any other exception
        apart from ``KeyError``
    """
    try:
        return client.ilm.explain_lifecycle(index=name)['indices'][name]
    except KeyError:
        LOGGER.debug('Index name changed')
        # NOTE: exceptions raised by the two calls below are not handled by the
        # except clauses that follow; they propagate unchanged
        listed = client.ilm.explain_lifecycle(index=name)['indices']
        actual = list(listed.keys())[0]
        return client.ilm.explain_lifecycle(index=actual)['indices'][actual]
    except NotFoundError as err:
        LOGGER.warning('Datastream/Index Name changed. %s was not found', name)
        raise NameChanged(f'{name} was not found, likely due to a name change') from err
    except Exception as err:
        msg = f'Unable to get ILM information for index {name}'
        LOGGER.critical(msg)
        raise ResultNotExpected(f'{msg}. Exception: {err}') from err

423 

424 

def ilm_move(
    client: 'Elasticsearch', name: str, current_step: t.Dict, next_step: t.Dict
) -> None:
    """
    Move index 'name' from the current step to the next step

    :param client: Client object providing ``ilm.move_to_step``
    :param name: Index name
    :param current_step: Passed as the ``current_step`` argument of the API call
    :param next_step: Passed as the ``next_step`` argument of the API call

    :raises ResultNotExpected: If the API call raises any exception; the original
        exception is chained as the cause and the failure is logged at critical
        level first
    """
    try:
        client.ilm.move_to_step(
            index=name, current_step=current_step, next_step=next_step
        )
    except Exception as err:
        # Report the requested step (next_step), not the builtin ``next``
        msg = (
            f'Unable to move index {name} to ILM next step: {next_step}. '
            f'Error: {err}'
        )
        LOGGER.critical(msg)
        raise ResultNotExpected(msg) from err

437 

438 

def put_comp_tmpl(client: 'Elasticsearch', name: str, component: t.Dict) -> None:
    """
    Create a component template and wait until it is reported to exist

    The request is sent with ``create=True``.

    :param client: Client object providing ``cluster.put_component_template``
    :param name: Name of the component template
    :param component: Passed as the ``template`` argument of the API call

    :raises TestbedFailure: If the put call or the subsequent wait raises any
        exception
    """
    try:
        client.cluster.put_component_template(
            name=name, template=component, create=True
        )
        waiter = Exists(client, name=name, kind='component', pause=PAUSE_VALUE)
        waiter.wait_for_it()
    except Exception as err:
        msg = f'Unable to create component template {name}. Error: {err}'
        raise TestbedFailure(msg) from err

451 

452 

def put_idx_tmpl(
    client: 'Elasticsearch',
    name: str,
    index_patterns: list,
    components: list,
    data_stream: t.Union[t.Dict, None] = None,
) -> None:
    """
    Create an index template and wait until it is reported to exist

    The request is sent with ``create=True``.

    :param client: Client object providing ``indices.put_index_template``
    :param name: Name of the index template
    :param index_patterns: Passed as the ``index_patterns`` argument
    :param components: Passed as the ``composed_of`` argument
    :param data_stream: Passed as the ``data_stream`` argument

    :raises TestbedFailure: If the put call or the subsequent wait raises any
        exception
    """
    template_args = {
        'name': name,
        'composed_of': components,
        'data_stream': data_stream,
        'index_patterns': index_patterns,
        'create': True,
    }
    try:
        client.indices.put_index_template(**template_args)
        waiter = Exists(client, name=name, kind='template', pause=PAUSE_VALUE)
        waiter.wait_for_it()
    except Exception as err:
        msg = f'Unable to create index template {name}. Error: {err}'
        raise TestbedFailure(msg) from err

475 

476 

def put_ilm(
    client: 'Elasticsearch', name: str, policy: t.Union[t.Dict, None] = None
) -> None:
    """
    Store an ILM policy under the given name

    :param client: Client object providing ``ilm.put_lifecycle``
    :param name: Name of the policy
    :param policy: Passed as the ``policy`` argument of the API call

    :raises TestbedFailure: If the API call raises any exception
    """
    lifecycle_args = {'name': name, 'policy': policy}
    try:
        client.ilm.put_lifecycle(**lifecycle_args)
    except Exception as err:
        msg = f'Unable to put index lifecycle policy {name}. Error: {err}'
        raise TestbedFailure(msg) from err

487 

488 

def resolver(client: 'Elasticsearch', name: str) -> dict:
    """
    Resolve details about the entity, be it an index, alias, or data_stream

    Search patterns and aliases are accepted as ``name``, so every element of the
    response is an array:

        {'indices': [], 'aliases': [], 'data_streams': []}

    Resolving a single index or data stream still yields a 1-element list.

    :param client: Client object providing ``indices.resolve_index``
    :param name: Name or pattern to resolve

    :returns: The response of ``indices.resolve_index``, requested with
        ``expand_wildcards`` set to open and closed
    """
    wildcard_states = ['open', 'closed']
    resolved = client.indices.resolve_index(
        name=name, expand_wildcards=wildcard_states
    )
    return resolved

502 

503 

def rollover(client: 'Elasticsearch', name: str) -> None:
    """
    Roll over the alias or data stream called 'name'

    The request asks to wait for all shards to be active.

    :param client: Client object providing ``indices.rollover``
    :param name: Passed as the ``alias`` argument of the API call
    """
    rollover_args = {'alias': name, 'wait_for_active_shards': 'all'}
    client.indices.rollover(**rollover_args)