Coverage for /Users/buh/.pyenv/versions/3.12.2/envs/es-testbed/lib/python3.12/site-packages/es_testbed/helpers/utils.py: 88%
72 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-27 23:45 -0600
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-27 23:45 -0600
1"""Utility helper functions"""
3import typing as t
4import random
5import string
6import logging
7from datetime import datetime, timezone
8from ..defaults import ilm_force_merge, ilm_phase, MAPPING, TIER
9from ..exceptions import TestbedMisconfig
12def build_ilm_phase(
13 tier: str,
14 actions: t.Union[t.Dict, None] = None,
15 repository: t.Union[str, None] = None,
16) -> t.Dict:
17 """Build a single ILM policy step based on tier"""
18 phase = ilm_phase(tier)
19 if tier in ['cold', 'frozen']:
20 if repository:
21 phase[tier]['actions']['searchable_snapshot'] = {
22 'snapshot_repository': repository
23 }
24 else:
25 msg = (
26 f'Unable to build ILM phase for {tier} tier. Value for repository not '
27 f'provided'
28 )
29 raise TestbedMisconfig(msg)
30 if actions:
31 phase[tier]['actions'].update(actions)
32 return phase
35def build_ilm_policy(
36 tiers: list = None,
37 forcemerge: bool = False,
38 max_num_segments: int = 1,
39 repository: str = None,
40) -> t.Dict:
41 """
42 Build a full ILM policy based on the provided tiers.
43 Put forcemerge in the last tier before cold or frozen (whichever comes first)
44 """
45 if not tiers:
46 tiers = ['hot', 'delete']
47 phases = {}
48 if ('cold' in tiers or 'frozen' in tiers) and not repository:
49 raise TestbedMisconfig('Cannot build cold or frozen phase without repository')
50 for tier in tiers:
51 phases.update(build_ilm_phase(tier, repository=repository))
52 if forcemerge:
53 phases['hot']['actions'].update(
54 ilm_force_merge(max_num_segments=max_num_segments)
55 )
56 return {'phases': phases}
59def doc_gen(
60 count: int = 10, start_at: int = 0, match: bool = True
61) -> t.Generator[t.Dict, None, None]:
62 """Create this doc for each count"""
63 keys = ['message', 'nested', 'deep']
64 # Start with an empty map
65 matchmap = {}
66 # Iterate over each key
67 for key in keys:
68 # If match is True
69 if match: 69 ↛ 74line 69 didn't jump to line 74, because the condition on line 69 was never false
70 # Set matchmap[key] to key
71 matchmap[key] = key
72 else:
73 # Otherwise matchmap[key] will have a random string value
74 matchmap[key] = randomstr()
76 # This is where count and start_at matter
77 for num in range(start_at, start_at + count):
78 yield {
79 '@timestamp': iso8601_now(),
80 'message': f'{matchmap["message"]}{num}', # message# or randomstr#
81 'number': (
82 num if match else random.randint(1001, 32767)
83 ), # value of num or random int
84 'nested': {'key': f'{matchmap["nested"]}{num}'}, # nested#
85 'deep': {'l1': {'l2': {'l3': f'{matchmap["deep"]}{num}'}}}, # deep#
86 }
89def getlogger(name: str) -> logging.getLogger:
90 """Return a named logger"""
91 return logging.getLogger(name)
94def get_routing(tier='hot') -> t.Dict:
95 """Return the routing allocation tier preference"""
96 try:
97 pref = TIER[tier]['pref']
98 except KeyError:
99 # Fallback value
100 pref = 'data_content'
101 return {'index.routing.allocation.include._tier_preference': pref}
104def iso8601_now() -> str:
105 """
106 :returns: An ISO8601 timestamp based on now
107 :rtype: str
108 """
109 # Because Python 3.12 now requires non-naive timezone declarations, we must change.
110 #
111 # ## Example:
112 # ## The new way:
113 # ## datetime.now(timezone.utc).isoformat()
114 # ## Result: 2024-04-16T16:00:00+00:00
115 # ## End Example
116 #
117 # Note that the +00:00 is appended now where we affirmatively declare the
118 # UTC timezone
119 #
120 # As a result, we will use this function to prune away the timezone if it is
121 # +00:00 and replace it with Z, which is shorter Zulu notation for UTC (which
122 # Elasticsearch uses)
123 #
124 # We are MANUALLY, FORCEFULLY declaring timezone.utc, so it should ALWAYS be
125 # +00:00, but could in theory sometime show up as a Z, so we test for that.
127 parts = datetime.now(timezone.utc).isoformat().split('+')
128 if len(parts) == 1: 128 ↛ 129line 128 didn't jump to line 129, because the condition on line 128 was never true
129 if parts[0][-1] == 'Z':
130 return parts[0] # Our ISO8601 already ends with a Z for Zulu/UTC time
131 return f'{parts[0]}Z' # It doesn't end with a Z so we put one there
132 if parts[1] == '00:00': 132 ↛ 134line 132 didn't jump to line 134, because the condition on line 132 was never false
133 return f'{parts[0]}Z' # It doesn't end with a Z so we put one there
134 return f'{parts[0]}+{parts[1]}' # Fallback publishes the +TZ, whatever that was
137def mapping_component() -> t.Dict:
138 """Return a mappings component template"""
139 return {'mappings': MAPPING}
142def mounted_name(index: str, tier: str):
143 """Return a value for renamed_index for mounting a searchable snapshot index"""
144 return f'{TIER[tier]["prefix"]}-{index}'
147def randomstr(length: int = 16, lowercase: bool = False) -> str:
148 """Generate a random string"""
149 letters = string.ascii_uppercase
150 if lowercase: 150 ↛ 152line 150 didn't jump to line 152, because the condition on line 150 was never false
151 letters = string.ascii_lowercase
152 return str(''.join(random.choices(letters + string.digits, k=length)))
155def setting_component(
156 ilm_policy: t.Union[str, None] = None, rollover_alias: t.Union[str, None] = None
157) -> t.Dict[str, t.Any]:
158 """Return a settings component template"""
159 val = {'settings': {'index.number_of_replicas': 0}}
160 if ilm_policy:
161 val['settings']['index.lifecycle.name'] = ilm_policy
162 if rollover_alias:
163 val['settings']['index.lifecycle.rollover_alias'] = rollover_alias
164 return val
167def storage_type(tier: str) -> t.Dict:
168 """Return the storage type of a searchable snapshot by tier"""
169 return TIER[tier]["storage"]