1    	import logging
2    	import os
3    	from unittest import mock
4    	
5    	from lxml import etree
6    	
7    	from pcs import settings
8    	from pcs.lib.external import CommandRunner
9    	
10   	from pcs_test.tools.assertions import AssertPcsMixin
11   	from pcs_test.tools.bin_mock import get_mock_settings
12   	from pcs_test.tools.custom_mock import MockLibraryReportProcessor
13   	from pcs_test.tools.misc import (
14   	    get_test_resource,
15   	    get_tmp_file,
16   	    write_file_to_tmpfile,
17   	)
18   	from pcs_test.tools.pcs_runner import PcsRunner
19   	from pcs_test.tools.xml import etree_to_str
20   	
21   	
22   	class CachedCibFixture(AssertPcsMixin):
23   	    def __init__(self, cache_name, empty_cib_path):
24   	        self._empty_cib_path = empty_cib_path
25   	        self._cache_name = cache_name
26   	        self._cache_path = None
27   	        self._pcs_runner = None
28   	
29   	    def _setup_cib(self):
30   	        raise NotImplementedError()
31   	
32   	    def set_up(self):
33   	        fixture_dir = get_test_resource("temp_fixtures")
34   	        os.makedirs(fixture_dir, exist_ok=True)
35   	        self._cache_path = os.path.join(fixture_dir, self._cache_name)
36   	        self._pcs_runner = PcsRunner(self._cache_path)
37   	        self._pcs_runner.mock_settings = get_mock_settings()
38   	
39   	        with (
40   	            open(self._empty_cib_path, "r") as template_file,
41   	            open(self.cache_path, "w") as cache_file,
42   	        ):
43   	            cache_file.write(template_file.read())
44   	        self._setup_cib()
45   	
46   	    def clean_up(self):
47   	        if os.path.isfile(self.cache_path):
48   	            os.unlink(self.cache_path)
49   	
50   	    @property
51   	    def cache_path(self):
52   	        if self._cache_path is None:
53   	            raise AssertionError("Cache has not been initialized")
54   	        return self._cache_path
55   	
56   	    # methods for supporting assert_pcs_success
57   	    @property
58   	    def pcs_runner(self):
59   	        if self._pcs_runner is None:
60   	            raise AssertionError("Cache has not been initialized")
61   	        return self._pcs_runner
62   	
63   	    def assertEqual(self, first, second, msg=None):
64   	        # pylint: disable=invalid-name
65   	        # pylint: disable=no-self-use
66   	        if first != second:
67   	            raise AssertionError(
68   	                f"{msg}\n{first} != {second}" if msg else f"{first} != {second}"
69   	            )
70   	
71   	
72   	def wrap_element_by_master(cib_file, resource_id, master_id=None):
73   	    cib_file.seek(0)
(1) Event Sigma main event: The Python application enables entity expansion by setting the `lxml.etree.XMLParser` value `resolve_entities` to `true` or `internal` (the default value). If untrusted XML is parsed with entity expansion enabled, a malicious attacker could submit a document that contains very deeply nested entity definitions (known as a Billion Laughs Attack), causing the parser to use large amounts of memory and processing power resulting in a denial of service (DoS) condition.
(2) Event remediation: Explicitly set `resolve_entities` argument to `False`.
74   	    cib_tree = etree.parse(cib_file, etree.XMLParser(huge_tree=True)).getroot()
75   	    element = cib_tree.find(f'.//*[@id="{resource_id}"]')
76   	    final_master_id = (
77   	        master_id if master_id is not None else f"{resource_id}-master"
78   	    )
79   	    master_element = _xml_to_element(
80   	        f"""
81   	        <master id="{final_master_id}">
82   	        </master>
83   	    """
84   	    )
85   	    element.getparent().append(master_element)
86   	    master_element.append(element)
87   	    final_xml = etree_to_str(cib_tree)
88   	
89   	    environ = dict(os.environ)
90   	    environ["CIB_file"] = cib_file.name
91   	    runner = CommandRunner(
92   	        mock.MagicMock(logging.Logger), MockLibraryReportProcessor(), environ
93   	    )
94   	    stdout, stderr, retval = runner.run(
95   	        [
96   	            settings.cibadmin_exec,
97   	            "--replace",
98   	            "--scope",
99   	            "resources",
100  	            "--xml-pipe",
101  	        ],
102  	        stdin_string=final_xml,
103  	    )
104  	    assert retval == 0, (
105  	        "Error running wrap_element_by_master:\n" + stderr + "\n" + stdout
106  	    )
107  	
108  	
109  	def wrap_element_by_master_file(filepath, resource_id, master_id=None):
110  	    cib_tmp = get_tmp_file("wrap_by_master")
111  	    write_file_to_tmpfile(filepath, cib_tmp)
112  	    wrap_element_by_master(cib_tmp, resource_id, master_id=master_id)
113  	    cib_tmp.seek(0)
114  	    with open(filepath, "w") as target:
115  	        target.write(cib_tmp.read())
116  	    cib_tmp.close()
117  	
118  	
119  	def fixture_master_xml(name, all_ops=True, meta_dict=None):
120  	    default_ops = f"""
121  	            <op id="{name}-notify-interval-0s" interval="0s" name="notify"
122  	                timeout="5"
123  	            />
124  	            <op id="{name}-start-interval-0s" interval="0s" name="start"
125  	                timeout="20"
126  	            />
127  	            <op id="{name}-stop-interval-0s" interval="0s" name="stop"
128  	                timeout="20"
129  	            />
130  	    """
131  	    meta_xml = ""
132  	    if meta_dict:
133  	        meta_lines = (
134  	            [f'<meta_attributes id="{name}-master-meta_attributes">']
135  	            + [
136  	                f'<nvpair id="{name}-master-meta_attributes-{key}" name="{key}" value="{val}"/>'
137  	                for key, val in meta_dict.items()
138  	            ]
139  	            + ["</meta_attributes>"]
140  	        )
141  	        meta_xml = "\n".join(meta_lines)
142  	    master = f"""
143  	      <master id="{name}-master">
144  	        <primitive class="ocf" id="{name}" provider="pcsmock" type="stateful">
145  	          <operations>
146  	            <op id="{name}-monitor-interval-10" interval="10" name="monitor"
147  	                role="Master" timeout="20"
148  	            />
149  	            <op id="{name}-monitor-interval-11" interval="11" name="monitor"
150  	                role="Slave" timeout="20"
151  	            />
152  	    """
153  	    if all_ops:
154  	        master += default_ops
155  	    master += f"""
156  	          </operations>
157  	        </primitive>
158  	        {meta_xml}
159  	      </master>
160  	    """
161  	    return master
162  	
163  	
164  	def fixture_to_cib(cib_file, xml):
165  	    environ = dict(os.environ)
166  	    environ["CIB_file"] = cib_file
167  	    runner = CommandRunner(
168  	        mock.MagicMock(logging.Logger), MockLibraryReportProcessor(), environ
169  	    )
170  	    stdout, stderr, retval = runner.run(
171  	        [
172  	            settings.cibadmin_exec,
173  	            "--create",
174  	            "--scope",
175  	            "resources",
176  	            "--xml-text",
177  	            xml,
178  	        ]
179  	    )
180  	    assert retval == 0, (
181  	        "Error running fixture_to_cib:\n" + stderr + "\n" + stdout
182  	    )
183  	
184  	
185  	def _replace(element_to_replace, new_element):
186  	    parent = element_to_replace.getparent()
187  	    for child in parent:
188  	        if element_to_replace == child:
189  	            index = list(parent).index(child)
190  	            parent.remove(child)
191  	            parent.insert(index, new_element)
192  	            return
193  	
194  	
195  	def _xml_to_element(xml):
196  	    try:
197  	        new_element = etree.fromstring(xml)
198  	    except etree.XMLSyntaxError as e:
199  	        raise AssertionError(
200  	            "Cannot put to the cib a non-xml fragment:\n'{0}'".format(xml)
201  	        ) from e
202  	    return new_element
203  	
204  	
205  	def _find_all_in(cib_tree, element_xpath):
206  	    element_list = cib_tree.xpath(element_xpath)
207  	    if not element_list:
208  	        raise AssertionError(
209  	            "Cannot find '{0}' in given cib:\n{1}".format(
210  	                element_xpath, etree_to_str(cib_tree)
211  	            )
212  	        )
213  	    return element_list
214  	
215  	
216  	def _find_in(cib_tree, element_xpath):
217  	    element_list = _find_all_in(cib_tree, element_xpath)
218  	    if len(element_list) > 1:
219  	        raise AssertionError(
220  	            "Found more than one '{0}' in given cib:\n{1}".format(
221  	                element_xpath, etree_to_str(cib_tree)
222  	            )
223  	        )
224  	    return element_list[0]
225  	
226  	
227  	def remove(element_xpath):
228  	    def _remove(cib_tree):
229  	        xpath_list = (
230  	            [element_xpath] if isinstance(element_xpath, str) else element_xpath
231  	        )
232  	        for xpath in xpath_list:
233  	            for element_to_remove in _find_all_in(cib_tree, xpath):
234  	                element_to_remove.getparent().remove(element_to_remove)
235  	
236  	    return _remove
237  	
238  	
239  	def put_or_replace(parent_xpath, new_content):
240  	    # This transformation makes sense in "configuration" section only. In this
241  	    # section there are sub-tags (optional or mandatory) that can occur max 1x.
242  	    #
243  	    # In other sections it is possible to have more occurrences of sub-tags. For
244  	    # such cases it is better to use `replace_all` - the difference is that in
245  	    # `replace_all` the element to be replaced is specified by full xpath
246  	    # whilst in `put_or_replace` the xpath to the parent element is specified.
247  	    def replace_optional(cib_tree):
248  	        element = _xml_to_element(new_content)
249  	        parent = _find_in(cib_tree, parent_xpath)
250  	        current_elements = parent.findall(element.tag)
251  	
252  	        if len(current_elements) > 1:
253  	            raise _cannot_multireplace(element.tag, parent_xpath, cib_tree)
254  	
255  	        if current_elements:
256  	            _replace(current_elements[0], element)
257  	        else:
258  	            parent.append(element)
259  	
260  	    return replace_optional
261  	
262  	
263  	def replace_all(replacements):
264  	    """
265  	    Return a function that replace more elements (defined by replacement_dict)
266  	    in the cib_tree with new_content.
267  	
268  	    dict replacemens -- contains more replacements:
269  	        key is xpath - its destination must be one element: replacement is
270  	        applied only on the first occurrence
271  	        value is new content -contains a content that have to be placed instead
272  	        of an element found by element_xpath
273  	    """
274  	
275  	    def replace(cib_tree):
276  	        for xpath, new_content in replacements.items():
277  	            _replace(_find_in(cib_tree, xpath), _xml_to_element(new_content))
278  	
279  	    return replace
280  	
281  	
282  	def append_all(append_map):
283  	    """
284  	    Return a function that appends more elements after specified (xpath) element
285  	    dict append_map -- a key is an xpath pointing to a target element (for
286  	        appending), value is appended content
287  	    """
288  	
289  	    def append(cib_tree):
290  	        for xpath, new_content in append_map.items():
291  	            _find_in(cib_tree, xpath).append(_xml_to_element(new_content))
292  	
293  	    return append
294  	
295  	
296  	# Possible modifier shortcuts are defined here.
297  	# Keep in mind that every key will be named parameter in config function
298  	# (see modifier_shortcuts param in some of pcs_test.tools.command_env.config_*
299  	# modules)
300  	#
301  	# DO NOT USE CONFLICTING KEYS HERE!
302  	# 1) args of pcs_test.tools.command_env.calls#CallListBuilder.place:
303  	#  name, before, instead
304  	# 2) args of pcs_test.tools.command_env.mock_runner#Call.__init__
305  	#  command, stdout, stderr, returncode, check_stdin
306  	# 3) special args of pcs_test.tools.command_env.config_*
307  	#  modifiers, filename, load_key, wait, exception
308  	# It would be not applied. Not even mention that the majority of these names do
309  	# not make sense for a cib modifying ;)
310  	MODIFIER_GENERATORS = {
311  	    "remove": remove,
312  	    "replace": replace_all,
313  	    "append": append_all,
314  	    "resources": lambda xml: replace_all({"./configuration/resources": xml}),
315  	    "nodes": lambda xml: replace_all({"./configuration/nodes": xml}),
316  	    "constraints": lambda xml: replace_all(
317  	        {"./configuration/constraints": xml}
318  	    ),
319  	    "crm_config": lambda xml: replace_all({"./configuration/crm_config": xml}),
320  	    "fencing_topology": lambda xml: put_or_replace("./configuration", xml),
321  	    "status": lambda xml: put_or_replace(".", xml),
322  	    "tags": lambda xml: put_or_replace("./configuration", xml),
323  	    "optional_in_conf": lambda xml: put_or_replace("./configuration", xml),
324  	    # common modifier `put_or_replace` makes not sense - see explanation inside
325  	    # this function - all occurrences should be satisfied by `optional_in_conf`
326  	}
327  	
328  	
329  	def create_modifiers(**modifier_shortcuts):
330  	    """
331  	    Return list of modifiers: list of functions that transform cib
332  	
333  	    dict modifier_shortcuts -- a new modifier is generated from each modifier
334  	        shortcut.
335  	        As key there can be keys of MODIFIER_GENERATORS.
336  	        Value is passed into appropriate generator from MODIFIER_GENERATORS.
337  	
338  	    """
339  	    unknown_shortcuts = set(modifier_shortcuts.keys()) - set(
340  	        MODIFIER_GENERATORS.keys()
341  	    )
342  	    if unknown_shortcuts:
343  	        raise AssertionError(
344  	            "Unknown modifier shortcuts '{0}', available are: '{1}'".format(
345  	                "', '".join(list(unknown_shortcuts)),
346  	                "', '".join(MODIFIER_GENERATORS.keys()),
347  	            )
348  	        )
349  	
350  	    return [
351  	        MODIFIER_GENERATORS[name](param)
352  	        for name, param in modifier_shortcuts.items()
353  	    ]
354  	
355  	
356  	def modify_cib(cib_xml, modifiers=None, **modifier_shortcuts):
357  	    """
358  	    Apply modifiers to cib_xml and return the result cib_xml
359  	
360  	    string cib_xml -- initial cib
361  	    list of callable modifiers -- each takes cib (etree.Element)
362  	    dict modifier_shortcuts -- a new modifier is generated from each modifier
363  	        shortcut.
364  	        As key there can be keys of MODIFIER_GENERATORS.
365  	        Value is passed into appropriate generator from MODIFIER_GENERATORS.
366  	    """
367  	    modifiers = modifiers if modifiers else []
368  	    all_modifiers = modifiers + create_modifiers(**modifier_shortcuts)
369  	
370  	    if not all_modifiers:
371  	        return cib_xml
372  	
373  	    cib_tree = etree.fromstring(cib_xml)
374  	    for modify in all_modifiers:
375  	        modify(cib_tree)
376  	
377  	    return etree_to_str(cib_tree)
378  	
379  	
380  	def modify_cib_file(file_path, **modifiers_shortcuts):
381  	    with open(file_path, "r") as file:
382  	        return modify_cib(file.read(), **modifiers_shortcuts)
383  	
384  	
385  	def _cannot_multireplace(tag, parent_xpath, cib_tree):
386  	    return AssertionError(
387  	        (
388  	            "Cannot replace '{element}' in '{parent}' because '{parent}'"
389  	            " contains more than one '{element}' in given cib:\n{cib}"
390  	        ).format(element=tag, parent=parent_xpath, cib=etree_to_str(cib_tree))
391  	    )
392