1 import logging
2 import os
3 from unittest import mock
4
5 from lxml import etree
6
7 from pcs import settings
8 from pcs.lib.external import CommandRunner
9
10 from pcs_test.tools.assertions import AssertPcsMixin
11 from pcs_test.tools.bin_mock import get_mock_settings
12 from pcs_test.tools.custom_mock import MockLibraryReportProcessor
13 from pcs_test.tools.misc import (
14 get_test_resource,
15 get_tmp_file,
16 write_file_to_tmpfile,
17 )
18 from pcs_test.tools.pcs_runner import PcsRunner
19 from pcs_test.tools.xml import etree_to_str
20
21
22 class CachedCibFixture(AssertPcsMixin):
23 def __init__(self, cache_name, empty_cib_path):
24 self._empty_cib_path = empty_cib_path
25 self._cache_name = cache_name
26 self._cache_path = None
27 self._pcs_runner = None
28
29 def _setup_cib(self):
30 raise NotImplementedError()
31
32 def set_up(self):
33 fixture_dir = get_test_resource("temp_fixtures")
34 os.makedirs(fixture_dir, exist_ok=True)
35 self._cache_path = os.path.join(fixture_dir, self._cache_name)
36 self._pcs_runner = PcsRunner(self._cache_path)
37 self._pcs_runner.mock_settings = get_mock_settings()
38
39 with (
40 open(self._empty_cib_path, "r") as template_file,
41 open(self.cache_path, "w") as cache_file,
42 ):
43 cache_file.write(template_file.read())
44 self._setup_cib()
45
46 def clean_up(self):
47 if os.path.isfile(self.cache_path):
48 os.unlink(self.cache_path)
49
50 @property
51 def cache_path(self):
52 if self._cache_path is None:
53 raise AssertionError("Cache has not been initialized")
54 return self._cache_path
55
56 # methods for supporting assert_pcs_success
57 @property
58 def pcs_runner(self):
59 if self._pcs_runner is None:
60 raise AssertionError("Cache has not been initialized")
61 return self._pcs_runner
62
63 def assertEqual(self, first, second, msg=None):
64 # pylint: disable=invalid-name
65 # pylint: disable=no-self-use
66 if first != second:
67 raise AssertionError(
68 f"{msg}\n{first} != {second}" if msg else f"{first} != {second}"
69 )
70
71
72 def wrap_element_by_master(cib_file, resource_id, master_id=None):
73 cib_file.seek(0)
|
(1) Event Sigma main event: |
The Python application enables entity expansion by setting the `lxml.etree.XMLParser` value `resolve_entities` to `true` or `internal` (the default value). If untrusted XML is parsed with entity expansion enabled, a malicious attacker could submit a document that contains very deeply nested entity definitions (known as a Billion Laughs Attack), causing the parser to use large amounts of memory and processing power resulting in a denial of service (DoS) condition. |
|
(2) Event remediation: |
Explicitly set `resolve_entities` argument to `False`. |
74 cib_tree = etree.parse(cib_file, etree.XMLParser(huge_tree=True)).getroot()
75 element = cib_tree.find(f'.//*[@id="{resource_id}"]')
76 final_master_id = (
77 master_id if master_id is not None else f"{resource_id}-master"
78 )
79 master_element = _xml_to_element(
80 f"""
81 <master id="{final_master_id}">
82 </master>
83 """
84 )
85 element.getparent().append(master_element)
86 master_element.append(element)
87 final_xml = etree_to_str(cib_tree)
88
89 environ = dict(os.environ)
90 environ["CIB_file"] = cib_file.name
91 runner = CommandRunner(
92 mock.MagicMock(logging.Logger), MockLibraryReportProcessor(), environ
93 )
94 stdout, stderr, retval = runner.run(
95 [
96 settings.cibadmin_exec,
97 "--replace",
98 "--scope",
99 "resources",
100 "--xml-pipe",
101 ],
102 stdin_string=final_xml,
103 )
104 assert retval == 0, (
105 "Error running wrap_element_by_master:\n" + stderr + "\n" + stdout
106 )
107
108
109 def wrap_element_by_master_file(filepath, resource_id, master_id=None):
110 cib_tmp = get_tmp_file("wrap_by_master")
111 write_file_to_tmpfile(filepath, cib_tmp)
112 wrap_element_by_master(cib_tmp, resource_id, master_id=master_id)
113 cib_tmp.seek(0)
114 with open(filepath, "w") as target:
115 target.write(cib_tmp.read())
116 cib_tmp.close()
117
118
119 def fixture_master_xml(name, all_ops=True, meta_dict=None):
120 default_ops = f"""
121 <op id="{name}-notify-interval-0s" interval="0s" name="notify"
122 timeout="5"
123 />
124 <op id="{name}-start-interval-0s" interval="0s" name="start"
125 timeout="20"
126 />
127 <op id="{name}-stop-interval-0s" interval="0s" name="stop"
128 timeout="20"
129 />
130 """
131 meta_xml = ""
132 if meta_dict:
133 meta_lines = (
134 [f'<meta_attributes id="{name}-master-meta_attributes">']
135 + [
136 f'<nvpair id="{name}-master-meta_attributes-{key}" name="{key}" value="{val}"/>'
137 for key, val in meta_dict.items()
138 ]
139 + ["</meta_attributes>"]
140 )
141 meta_xml = "\n".join(meta_lines)
142 master = f"""
143 <master id="{name}-master">
144 <primitive class="ocf" id="{name}" provider="pcsmock" type="stateful">
145 <operations>
146 <op id="{name}-monitor-interval-10" interval="10" name="monitor"
147 role="Master" timeout="20"
148 />
149 <op id="{name}-monitor-interval-11" interval="11" name="monitor"
150 role="Slave" timeout="20"
151 />
152 """
153 if all_ops:
154 master += default_ops
155 master += f"""
156 </operations>
157 </primitive>
158 {meta_xml}
159 </master>
160 """
161 return master
162
163
164 def fixture_to_cib(cib_file, xml):
165 environ = dict(os.environ)
166 environ["CIB_file"] = cib_file
167 runner = CommandRunner(
168 mock.MagicMock(logging.Logger), MockLibraryReportProcessor(), environ
169 )
170 stdout, stderr, retval = runner.run(
171 [
172 settings.cibadmin_exec,
173 "--create",
174 "--scope",
175 "resources",
176 "--xml-text",
177 xml,
178 ]
179 )
180 assert retval == 0, (
181 "Error running fixture_to_cib:\n" + stderr + "\n" + stdout
182 )
183
184
185 def _replace(element_to_replace, new_element):
186 parent = element_to_replace.getparent()
187 for child in parent:
188 if element_to_replace == child:
189 index = list(parent).index(child)
190 parent.remove(child)
191 parent.insert(index, new_element)
192 return
193
194
195 def _xml_to_element(xml):
196 try:
197 new_element = etree.fromstring(xml)
198 except etree.XMLSyntaxError as e:
199 raise AssertionError(
200 "Cannot put to the cib a non-xml fragment:\n'{0}'".format(xml)
201 ) from e
202 return new_element
203
204
205 def _find_all_in(cib_tree, element_xpath):
206 element_list = cib_tree.xpath(element_xpath)
207 if not element_list:
208 raise AssertionError(
209 "Cannot find '{0}' in given cib:\n{1}".format(
210 element_xpath, etree_to_str(cib_tree)
211 )
212 )
213 return element_list
214
215
216 def _find_in(cib_tree, element_xpath):
217 element_list = _find_all_in(cib_tree, element_xpath)
218 if len(element_list) > 1:
219 raise AssertionError(
220 "Found more than one '{0}' in given cib:\n{1}".format(
221 element_xpath, etree_to_str(cib_tree)
222 )
223 )
224 return element_list[0]
225
226
227 def remove(element_xpath):
228 def _remove(cib_tree):
229 xpath_list = (
230 [element_xpath] if isinstance(element_xpath, str) else element_xpath
231 )
232 for xpath in xpath_list:
233 for element_to_remove in _find_all_in(cib_tree, xpath):
234 element_to_remove.getparent().remove(element_to_remove)
235
236 return _remove
237
238
239 def put_or_replace(parent_xpath, new_content):
240 # This transformation makes sense in "configuration" section only. In this
241 # section there are sub-tags (optional or mandatory) that can occur max 1x.
242 #
243 # In other sections it is possible to have more occurrences of sub-tags. For
244 # such cases it is better to use `replace_all` - the difference is that in
245 # `replace_all` the element to be replaced is specified by full xpath
246 # whilst in `put_or_replace` the xpath to the parent element is specified.
247 def replace_optional(cib_tree):
248 element = _xml_to_element(new_content)
249 parent = _find_in(cib_tree, parent_xpath)
250 current_elements = parent.findall(element.tag)
251
252 if len(current_elements) > 1:
253 raise _cannot_multireplace(element.tag, parent_xpath, cib_tree)
254
255 if current_elements:
256 _replace(current_elements[0], element)
257 else:
258 parent.append(element)
259
260 return replace_optional
261
262
263 def replace_all(replacements):
264 """
265 Return a function that replace more elements (defined by replacement_dict)
266 in the cib_tree with new_content.
267
268 dict replacemens -- contains more replacements:
269 key is xpath - its destination must be one element: replacement is
270 applied only on the first occurrence
271 value is new content -contains a content that have to be placed instead
272 of an element found by element_xpath
273 """
274
275 def replace(cib_tree):
276 for xpath, new_content in replacements.items():
277 _replace(_find_in(cib_tree, xpath), _xml_to_element(new_content))
278
279 return replace
280
281
282 def append_all(append_map):
283 """
284 Return a function that appends more elements after specified (xpath) element
285 dict append_map -- a key is an xpath pointing to a target element (for
286 appending), value is appended content
287 """
288
289 def append(cib_tree):
290 for xpath, new_content in append_map.items():
291 _find_in(cib_tree, xpath).append(_xml_to_element(new_content))
292
293 return append
294
295
296 # Possible modifier shortcuts are defined here.
297 # Keep in mind that every key will be named parameter in config function
298 # (see modifier_shortcuts param in some of pcs_test.tools.command_env.config_*
299 # modules)
300 #
301 # DO NOT USE CONFLICTING KEYS HERE!
302 # 1) args of pcs_test.tools.command_env.calls#CallListBuilder.place:
303 # name, before, instead
304 # 2) args of pcs_test.tools.command_env.mock_runner#Call.__init__
305 # command, stdout, stderr, returncode, check_stdin
306 # 3) special args of pcs_test.tools.command_env.config_*
307 # modifiers, filename, load_key, wait, exception
308 # It would be not applied. Not even mention that the majority of these names do
309 # not make sense for a cib modifying ;)
310 MODIFIER_GENERATORS = {
311 "remove": remove,
312 "replace": replace_all,
313 "append": append_all,
314 "resources": lambda xml: replace_all({"./configuration/resources": xml}),
315 "nodes": lambda xml: replace_all({"./configuration/nodes": xml}),
316 "constraints": lambda xml: replace_all(
317 {"./configuration/constraints": xml}
318 ),
319 "crm_config": lambda xml: replace_all({"./configuration/crm_config": xml}),
320 "fencing_topology": lambda xml: put_or_replace("./configuration", xml),
321 "status": lambda xml: put_or_replace(".", xml),
322 "tags": lambda xml: put_or_replace("./configuration", xml),
323 "optional_in_conf": lambda xml: put_or_replace("./configuration", xml),
324 # common modifier `put_or_replace` makes not sense - see explanation inside
325 # this function - all occurrences should be satisfied by `optional_in_conf`
326 }
327
328
329 def create_modifiers(**modifier_shortcuts):
330 """
331 Return list of modifiers: list of functions that transform cib
332
333 dict modifier_shortcuts -- a new modifier is generated from each modifier
334 shortcut.
335 As key there can be keys of MODIFIER_GENERATORS.
336 Value is passed into appropriate generator from MODIFIER_GENERATORS.
337
338 """
339 unknown_shortcuts = set(modifier_shortcuts.keys()) - set(
340 MODIFIER_GENERATORS.keys()
341 )
342 if unknown_shortcuts:
343 raise AssertionError(
344 "Unknown modifier shortcuts '{0}', available are: '{1}'".format(
345 "', '".join(list(unknown_shortcuts)),
346 "', '".join(MODIFIER_GENERATORS.keys()),
347 )
348 )
349
350 return [
351 MODIFIER_GENERATORS[name](param)
352 for name, param in modifier_shortcuts.items()
353 ]
354
355
356 def modify_cib(cib_xml, modifiers=None, **modifier_shortcuts):
357 """
358 Apply modifiers to cib_xml and return the result cib_xml
359
360 string cib_xml -- initial cib
361 list of callable modifiers -- each takes cib (etree.Element)
362 dict modifier_shortcuts -- a new modifier is generated from each modifier
363 shortcut.
364 As key there can be keys of MODIFIER_GENERATORS.
365 Value is passed into appropriate generator from MODIFIER_GENERATORS.
366 """
367 modifiers = modifiers if modifiers else []
368 all_modifiers = modifiers + create_modifiers(**modifier_shortcuts)
369
370 if not all_modifiers:
371 return cib_xml
372
373 cib_tree = etree.fromstring(cib_xml)
374 for modify in all_modifiers:
375 modify(cib_tree)
376
377 return etree_to_str(cib_tree)
378
379
380 def modify_cib_file(file_path, **modifiers_shortcuts):
381 with open(file_path, "r") as file:
382 return modify_cib(file.read(), **modifiers_shortcuts)
383
384
385 def _cannot_multireplace(tag, parent_xpath, cib_tree):
386 return AssertionError(
387 (
388 "Cannot replace '{element}' in '{parent}' because '{parent}'"
389 " contains more than one '{element}' in given cib:\n{cib}"
390 ).format(element=tag, parent=parent_xpath, cib=etree_to_str(cib_tree))
391 )
392