Coverage for /Users/buh/.pyenv/versions/3.12.2/envs/es-testbed/lib/python3.12/site-packages/es_testbed/_base.py: 83%

119 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-27 23:52 -0600

1"""Base TestBed Class""" 

2 

3import typing as t 

4from datetime import datetime, timezone 

5from dotmap import DotMap 

6from .exceptions import ResultNotExpected 

7from .defaults import NAMEMAPPER 

8from .helpers.es_api import delete, get 

9from .helpers.utils import getlogger 

10from ._plan import PlanBuilder 

11from .mgrs import ( 

12 ComponentMgr, 

13 DataStreamMgr, 

14 IlmMgr, 

15 IndexMgr, 

16 SnapshotMgr, 

17 TemplateMgr, 

18) 

19 

20if t.TYPE_CHECKING: 20 ↛ 21line 20 didn't jump to line 21, because the condition on line 20 was never true

21 from elasticsearch8 import Elasticsearch 

22 

23# pylint: disable=R0902,W0719 

24 

25 

26class TestBed: 

27 """TestBed Class""" 

28 

29 __test__ = False 

30 

31 def __init__( 

32 self, 

33 client: 'Elasticsearch' = None, 

34 plan: DotMap = None, 

35 autobuild: t.Optional[bool] = False, 

36 ): 

37 self.logger = getlogger('es_testbed.TestBed') 

38 self.client = client 

39 if plan is None: 39 ↛ 40line 39 didn't jump to line 40, because the condition on line 39 was never true

40 plan = PlanBuilder().plan # Use defaults 

41 self.plan = plan 

42 

43 # Set up for tracking 

44 self.ilmmgr = None 

45 self.componentmgr = None 

46 self.templatemgr = None 

47 self.snapshotmgr = None 

48 self.indexmgr = None 

49 self.data_streammgr = None 

50 

51 if autobuild: 51 ↛ 52line 51 didn't jump to line 52, because the condition on line 51 was never true

52 self.setup() 

53 

54 def _erase(self, kind: str, lst: t.Sequence[str]) -> None: 

55 overall_success = True 

56 if not lst: 

57 self.logger.debug('%s: nothing to delete.', kind) 

58 return True 

59 if kind == 'ilm': # ILM policies can't be batch deleted 

60 ilm = [self._while(kind, x) for x in lst] 

61 overall_success = False not in ilm # No False values == True 

62 else: 

63 overall_success = self._while(kind, ','.join(lst)) 

64 return overall_success 

65 

66 def _fodder_generator( 

67 self, 

68 ) -> t.Generator[str, t.Sequence[str], None]: 

69 """Method to delete everything matching our pattern(s)""" 

70 items = ['index', 'data_stream', 'snapshot', 'template', 'component', 'ilm'] 

71 for i in items: 

72 if i == 'snapshot' and self.plan.repository is None: 72 ↛ 73line 72 didn't jump to line 73, because the condition on line 72 was never true

73 self.logger.debug('No repository, no snapshots.') 

74 continue 

75 # self.logger.debug('Generating a list of type "%s"', i) 

76 pattern = f'*{self.plan.prefix}-{NAMEMAPPER[i]}-{self.plan.uniq}*' 

77 entities = get(self.client, i, pattern, repository=self.plan.repository) 

78 yield (i, entities) 

79 

80 def _while(self, kind: str, item: str) -> bool: 

81 count = 1 

82 success = False 

83 exc = None 

84 while count < 4 and not success: 84 ↛ 94line 84 didn't jump to line 94, because the condition on line 84 was never false

85 try: 

86 success = delete( 

87 self.client, kind, item, repository=self.plan.repository 

88 ) 

89 break 

90 except ResultNotExpected as err: 

91 self.logger.debug('Tried deleting "%s" %s time(s)', item, count) 

92 exc = err 

93 count += 1 

94 if not success: 94 ↛ 95line 94 didn't jump to line 95, because the condition on line 94 was never true

95 self.logger.warning( 

96 'Failed to delete "%s" after %s tries. Final error: %s', 

97 item, 

98 count - 1, 

99 exc, 

100 ) 

101 return success 

102 

103 def get_ilm_polling(self) -> None: 

104 """ 

105 Get current ILM polling settings and store them in self.plan.polling_interval 

106 """ 

107 self.logger.info('Storing current ILM polling settings, if any...') 

108 try: 

109 res = self.client.cluster.get_settings() 

110 self.logger.debug('Cluster settings: %s', res) 

111 except Exception as err: 

112 self.logger.critical('Unable to get persistent cluster settings') 

113 self.logger.critical('This could be permissions, or something larger.') 

114 self.logger.critical('Exception: %s', err) 

115 self.logger.critical('Exiting.') 

116 raise Exception from err 

117 try: 

118 retval = res['persistent']['indices']['lifecycle']['poll_interval'] 

119 except KeyError: 

120 self.logger.debug( 

121 'No setting for indices.lifecycle.poll_interval. Must be default' 

122 ) 

123 retval = None # Must be an actual value to go into a DotMap 

124 if retval == '1s': 124 ↛ 125line 124 didn't jump to line 125

125 msg = ( 

126 'ILM polling already set at 1s. A previous run most likely did not ' 

127 'tear down properly. Resetting to null after this run' 

128 ) 

129 self.logger.warning(msg) 

130 retval = None # Must be an actual value to go into a DotMap 

131 self.logger.debug('retval = %s', retval) 

132 self.plan.ilm_polling_interval = retval 

133 self.logger.info('Stored ILM Polling Interval: %s', retval) 

134 

135 def ilm_polling(self, interval: t.Union[str, None] = None) -> t.Dict: 

136 """Return persistent cluster settings to speed up ILM polling during testing""" 

137 return {'indices.lifecycle.poll_interval': interval} 

138 

139 def setup(self) -> None: 

140 """Setup the instance""" 

141 start = datetime.now(timezone.utc) 

142 self.get_ilm_polling() 

143 self.logger.info('Setting: %s', self.ilm_polling(interval='1s')) 

144 self.client.cluster.put_settings(persistent=self.ilm_polling(interval='1s')) 

145 self.setup_entitymgrs() 

146 end = datetime.now(timezone.utc) 

147 self.logger.info( 

148 'Testbed setup elapsed time: %s', (end - start).total_seconds() 

149 ) 

150 

151 def setup_entitymgrs(self) -> None: 

152 """ 

153 Setup each EntityMgr child class 

154 """ 

155 kw = {'client': self.client, 'plan': self.plan} 

156 self.ilmmgr = IlmMgr(**kw) 

157 self.componentmgr = ComponentMgr(**kw) 

158 self.templatemgr = TemplateMgr(**kw) 

159 self.snapshotmgr = SnapshotMgr(**kw) 

160 if self.plan.type == 'indices': 

161 self.indexmgr = IndexMgr(**kw, snapmgr=self.snapshotmgr) 

162 if self.plan.type == 'data_stream': 

163 self.data_streammgr = DataStreamMgr(**kw, snapmgr=self.snapshotmgr) 

164 

165 def teardown(self) -> None: 

166 """Tear down anything we created""" 

167 start = datetime.now(timezone.utc) 

168 successful = True 

169 for kind, list_of_kind in self._fodder_generator(): 

170 if not self._erase(kind, list_of_kind): 170 ↛ 171line 170 didn't jump to line 171, because the condition on line 170 was never true

171 successful = False 

172 persist = self.ilm_polling(interval=self.plan.ilm_polling_interval) 

173 self.logger.info( 

174 'Restoring ILM polling to previous value: %s', 

175 self.plan.ilm_polling_interval, 

176 ) 

177 self.client.cluster.put_settings(persistent=persist) 

178 end = datetime.now(timezone.utc) 

179 self.logger.info( 

180 'Testbed teardown elapsed time: %s', (end - start).total_seconds() 

181 ) 

182 if successful: 182 ↛ 185line 182 didn't jump to line 185, because the condition on line 182 was never false

183 self.logger.info('Cleanup successful') 

184 else: 

185 self.logger.error('Cleanup was unsuccessful/incomplete') 

186 self.plan.cleanup = successful