Coverage for /Users/buh/.pyenv/versions/3.12.2/envs/es-testbed/lib/python3.12/site-packages/es_testbed/helpers/utils.py: 88%

72 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-27 23:45 -0600

1"""Utility helper functions""" 

2 

3import typing as t 

4import random 

5import string 

6import logging 

7from datetime import datetime, timezone 

8from ..defaults import ilm_force_merge, ilm_phase, MAPPING, TIER 

9from ..exceptions import TestbedMisconfig 

10 

11 

def build_ilm_phase(
    tier: str,
    actions: t.Union[t.Dict, None] = None,
    repository: t.Union[str, None] = None,
) -> t.Dict:
    """
    Assemble the ILM phase definition for a single tier.

    :param tier: Tier name. Passed to ``ilm_phase`` and used as the key into the
        phase it returns.
    :param actions: Optional extra actions, merged over the phase's own actions.
    :param repository: Snapshot repository name. Required when ``tier`` is
        ``cold`` or ``frozen``.
    :returns: The phase from ``ilm_phase(tier)``, with a ``searchable_snapshot``
        action added for cold/frozen tiers and any ``actions`` merged in.
    :raises TestbedMisconfig: If ``tier`` is cold or frozen and no repository
        was provided.
    """
    phase = ilm_phase(tier)
    if tier in ('cold', 'frozen'):
        # Guard first: these tiers cannot be built without a repository
        if not repository:
            raise TestbedMisconfig(
                f'Unable to build ILM phase for {tier} tier. Value for repository not '
                f'provided'
            )
        snapshot_action = {'snapshot_repository': repository}
        phase[tier]['actions']['searchable_snapshot'] = snapshot_action
    if actions:
        phase[tier]['actions'].update(actions)
    return phase

33 

34 

def build_ilm_policy(
    tiers: t.Union[list, None] = None,
    forcemerge: bool = False,
    max_num_segments: int = 1,
    repository: t.Union[str, None] = None,
) -> t.Dict:
    """
    Build a full ILM policy based on the provided tiers.

    When ``forcemerge`` is requested, the force merge action is added to the
    ``hot`` phase, so ``hot`` must be one of the tiers.

    :param tiers: Tier names to build phases for. Defaults to
        ``['hot', 'delete']`` when empty or ``None``.
    :param forcemerge: Whether to add a force merge action to the hot phase.
    :param max_num_segments: Passed through to ``ilm_force_merge``.
    :param repository: Snapshot repository name. Required when ``cold`` or
        ``frozen`` is among the tiers.
    :returns: A dict of the form ``{'phases': {...}}``.
    :raises TestbedMisconfig: If a cold or frozen tier is requested without a
        repository, or if ``forcemerge`` is requested without a hot tier.
    """
    if not tiers:
        tiers = ['hot', 'delete']
    if ('cold' in tiers or 'frozen' in tiers) and not repository:
        raise TestbedMisconfig('Cannot build cold or frozen phase without repository')
    # Validate before building so the caller gets a configuration error rather
    # than a bare KeyError from the phases['hot'] lookup below.
    if forcemerge and 'hot' not in tiers:
        raise TestbedMisconfig('Cannot add forcemerge to an ILM policy without a hot tier')
    phases = {}
    for tier in tiers:
        phases.update(build_ilm_phase(tier, repository=repository))
    if forcemerge:
        phases['hot']['actions'].update(
            ilm_force_merge(max_num_segments=max_num_segments)
        )
    return {'phases': phases}

57 

58 

def doc_gen(
    count: int = 10, start_at: int = 0, match: bool = True
) -> t.Generator[t.Dict, None, None]:
    """
    Yield ``count`` documents numbered from ``start_at``.

    :param count: How many documents to yield.
    :param start_at: The number carried by the first document.
    :param match: When True, the text fields are prefixed with their own field
        name and ``number`` equals the document number. When False, each text
        field gets a random prefix (chosen once per generator) and ``number``
        is a random integer between 1001 and 32767.
    :returns: A generator of document dicts.
    """
    # One prefix per text field, fixed for the lifetime of this generator
    prefixes = {
        field: (field if match else randomstr())
        for field in ('message', 'nested', 'deep')
    }

    stop = start_at + count
    for num in range(start_at, stop):
        yield {
            '@timestamp': iso8601_now(),
            'message': f'{prefixes["message"]}{num}',
            'number': num if match else random.randint(1001, 32767),
            'nested': {'key': f'{prefixes["nested"]}{num}'},
            'deep': {'l1': {'l2': {'l3': f'{prefixes["deep"]}{num}'}}},
        }

87 

88 

def getlogger(name: str) -> logging.Logger:
    """
    Return a named logger.

    :param name: The logger name passed to ``logging.getLogger``.
    :returns: The ``logging.Logger`` instance registered under ``name``.
    """
    return logging.getLogger(name)

92 

93 

def get_routing(tier='hot') -> t.Dict:
    """
    Return the routing allocation tier preference setting for ``tier``.

    :param tier: Key looked up in ``TIER``. Defaults to ``hot``.
    :returns: A single-entry dict keyed by
        ``index.routing.allocation.include._tier_preference``. The value is
        ``TIER[tier]['pref']``, or ``data_content`` when that lookup raises
        ``KeyError``.
    """
    setting = 'index.routing.allocation.include._tier_preference'
    try:
        return {setting: TIER[tier]['pref']}
    except KeyError:
        # Unknown tier (or no 'pref' entry): use the fallback value
        return {setting: 'data_content'}

102 

103 

def iso8601_now() -> str:
    """
    Return the current UTC time as an ISO8601 string with a ``Z`` suffix.

    :returns: An ISO8601 timestamp based on now, e.g. ``2024-04-16T16:00:00Z``
    :rtype: str
    """
    # A timezone-aware UTC datetime always renders its offset as +00:00, e.g.
    # 2024-04-16T16:00:00+00:00. Swap that offset for Z, the shorter Zulu
    # notation for UTC that the original implementation aimed to produce.
    # timezone.utc is declared explicitly here, so no other offset can occur.
    return datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')

135 

136 

def mapping_component() -> t.Dict:
    """
    Return a mappings component template.

    :returns: A dict holding ``MAPPING`` under the ``mappings`` key.
    """
    component = {}
    component['mappings'] = MAPPING
    return component

140 

141 

def mounted_name(index: str, tier: str):
    """
    Return a value for renamed_index for mounting a searchable snapshot index.

    :param index: The index name to be prefixed.
    :param tier: Key into ``TIER``, whose ``prefix`` entry is used.
    :returns: The tier prefix and the index name joined by a hyphen.
    """
    prefix = TIER[tier]["prefix"]
    return f'{prefix}-{index}'

145 

146 

def randomstr(length: int = 16, lowercase: bool = False) -> str:
    """
    Generate a random string of letters and digits.

    :param length: Number of characters to generate.
    :param lowercase: Draw letters from the lowercase ASCII alphabet instead of
        the uppercase one.
    :returns: The random string.
    """
    alphabet = string.ascii_lowercase if lowercase else string.ascii_uppercase
    pool = alphabet + string.digits
    return ''.join(random.choices(pool, k=length))

153 

154 

def setting_component(
    ilm_policy: t.Union[str, None] = None, rollover_alias: t.Union[str, None] = None
) -> t.Dict[str, t.Any]:
    """
    Return a settings component template.

    :param ilm_policy: If truthy, stored as ``index.lifecycle.name``.
    :param rollover_alias: If truthy, stored as
        ``index.lifecycle.rollover_alias``.
    :returns: A dict with a ``settings`` key. The settings always include
        ``index.number_of_replicas`` set to 0.
    """
    settings = {'index.number_of_replicas': 0}
    optional = (
        ('index.lifecycle.name', ilm_policy),
        ('index.lifecycle.rollover_alias', rollover_alias),
    )
    for key, value in optional:
        # Only truthy values make it into the template
        if value:
            settings[key] = value
    return {'settings': settings}

165 

166 

def storage_type(tier: str) -> t.Dict:
    """
    Return the storage type of a searchable snapshot by tier.

    :param tier: Key into ``TIER``.
    :returns: The ``storage`` entry of ``TIER[tier]``.
    """
    tier_config = TIER[tier]
    return tier_config["storage"]

169 return TIER[tier]["storage"]