1    	# pylint: disable=too-many-lines
2    	import sys
3    	import xml.dom.minidom
4    	from io import StringIO
5    	from time import sleep
6    	from unittest import (
7    	    TestCase,
8    	    mock,
9    	)
10   	
11   	from pcs import utils
12   	from pcs.common import const
13   	
14   	from pcs_test.tools.misc import get_test_resource as rc
15   	from pcs_test.tools.xml import dom_get_child_elements
16   	
# CIB fixture files, located through get_test_resource (imported as rc).
empty_cib = rc("cib-empty.xml")
cib_with_nodes = rc("cib-empty-withnodes.xml")

# Set on the unittest base class, so every TestCase subclass inherits it;
# None removes the length limit on diffs shown in assertion failures.
TestCase.maxDiff = None
21   	
22   	
23   	class UtilsTest(TestCase):
24   	    # pylint: disable=too-many-public-methods
25   	    @staticmethod
26   	    def get_cib_empty():
27   	        return xml.dom.minidom.parse(empty_cib)
28   	
29   	    @staticmethod
30   	    def get_cib_with_nodes_minidom():
31   	        return xml.dom.minidom.parse(cib_with_nodes)
32   	
33   	    def get_cib_resources(self):
34   	        cib_dom = self.get_cib_empty()
35   	        new_resources = xml.dom.minidom.parseString(
36   	            """
37   	            <resources>
38   	                  <primitive id="myResource"
39   	                        class="ocf" provider="heartbeat" type="Dummy">
40   	                  </primitive>
41   	                  <clone id="myClone">
42   	                      <primitive id="myClonedResource"
43   	                          class="ocf" provider="heartbeat" type="Dummy">
44   	                      </primitive>
45   	                  </clone>
46   	                  <clone id="myUniqueClone">
47   	                      <primitive id="myUniqueClonedResource"
48   	                          class="ocf" provider="heartbeat" type="Dummy">
49   	                      </primitive>
50   	                      <meta-attributes>
51   	                        <nvpair name="globally-unique" value="true" />
52   	                      </meta-attributes>
53   	                  </clone>
54   	                  <master id="myMaster">
55   	                      <primitive id="myMasteredResource"
56   	                            class="ocf" provider="heartbeat" type="Dummy">
57   	                      </primitive>
58   	                  </master>
59   	                  <group id="myGroup">
60   	                      <primitive id="myGroupedResource"
61   	                            class="ocf" provider="heartbeat" type="Dummy">
62   	                      </primitive>
63   	                  </group>
64   	                  <clone id="myGroupClone">
65   	                      <group id="myClonedGroup">
66   	                          <primitive id="myClonedGroupedResource"
67   	                                class="ocf" provider="heartbeat" type="Dummy">
68   	                          </primitive>
69   	                      </group>
70   	                  </clone>
71   	                  <master id="myGroupMaster">
72   	                      <group id="myMasteredGroup">
73   	                          <primitive id="myMasteredGroupedResource"
74   	                                class="ocf" provider="heartbeat" type="Dummy">
75   	                          </primitive>
76   	                      </group>
77   	                  </master>
78   	                  <bundle id="myBundle">
79   	                      <primitive id="myBundledResource"
80   	                          class="ocf" provider="heartbeat" type="Dummy" />
81   	                  </bundle>
82   	                  <bundle id="myEmptyBundle"/>
83   	            </resources>
84   	        """
85   	        ).documentElement
86   	        resources = cib_dom.getElementsByTagName("resources")[0]
87   	        resources.parentNode.replaceChild(new_resources, resources)
88   	        return cib_dom
89   	
90   	    def test_dom_get_resources(self):  # noqa: PLR0915
91   	        # pylint: disable=too-many-statements
92   	        def test_dom_get(method, dom, ok_ids, bad_ids):
93   	            for element_id in ok_ids:
94   	                self.assert_element_id(method(dom, element_id), element_id)
95   	            for element_id in bad_ids:
96   	                self.assertFalse(method(dom, element_id))
97   	
98   	        cib_dom = self.get_cib_empty()
99   	        self.assertFalse(utils.dom_get_resource(cib_dom, "myResource"))
100  	        self.assertFalse(
101  	            utils.dom_get_resource_clone(cib_dom, "myClonedResource")
102  	        )
103  	        self.assertFalse(
104  	            utils.dom_get_resource_masterslave(cib_dom, "myMasteredResource")
105  	        )
106  	        self.assertFalse(utils.dom_get_group(cib_dom, "myGroup"))
107  	        self.assertFalse(utils.dom_get_group_clone(cib_dom, "myClonedGroup"))
108  	        self.assertFalse(
109  	            utils.dom_get_group_masterslave(cib_dom, "myMasteredGroup")
110  	        )
111  	        self.assertFalse(utils.dom_get_clone(cib_dom, "myClone"))
112  	        self.assertFalse(utils.dom_get_master(cib_dom, "myMaster"))
113  	        self.assertFalse(utils.dom_get_clone_ms_resource(cib_dom, "myClone"))
114  	        self.assertFalse(utils.dom_get_clone_ms_resource(cib_dom, "myMaster"))
115  	        self.assertFalse(
116  	            utils.dom_get_resource_clone_ms_parent(cib_dom, "myClonedResource")
117  	        )
118  	        self.assertFalse(
119  	            utils.dom_get_resource_clone_ms_parent(
120  	                cib_dom, "myMasteredResource"
121  	            )
122  	        )
123  	        self.assertIsNone(utils.dom_get_bundle(cib_dom, "myResource"))
124  	        self.assertIsNone(utils.dom_get_bundle(cib_dom, "notExisting"))
125  	        self.assertIsNone(
126  	            utils.dom_get_resource_bundle_parent(cib_dom, "myBundledResource")
127  	        )
128  	
129  	        cib_dom = self.get_cib_resources()
130  	        all_ids = {
131  	            "none",
132  	            "myResource",
133  	            "myClone",
134  	            "myClonedResource",
135  	            "myUniqueClone",
136  	            "myUniqueClonedResource",
137  	            "myMaster",
138  	            "myMasteredResource",
139  	            "myGroup",
140  	            "myGroupedResource",
141  	            "myGroupClone",
142  	            "myClonedGroup",
143  	            "myClonedGroupedResource",
144  	            "myGroupMaster",
145  	            "myMasteredGroup",
146  	            "myMasteredGroupedResource",
147  	            "myBundledResource",
148  	            "myBundle",
149  	            "myEmptyBundle",
150  	        }
151  	
152  	        resource_ids = {
153  	            "myResource",
154  	            "myClonedResource",
155  	            "myUniqueClonedResource",
156  	            "myGroupedResource",
157  	            "myMasteredResource",
158  	            "myClonedGroupedResource",
159  	            "myMasteredGroupedResource",
160  	            "myBundledResource",
161  	        }
162  	        test_dom_get(
163  	            utils.dom_get_resource,
164  	            cib_dom,
165  	            resource_ids,
166  	            all_ids - resource_ids,
167  	        )
168  	
169  	        cloned_ids = {
170  	            "myClonedResource",
171  	            "myUniqueClonedResource",
172  	            "myClonedGroupedResource",
173  	        }
174  	        test_dom_get(
175  	            utils.dom_get_resource_clone,
176  	            cib_dom,
177  	            cloned_ids,
178  	            all_ids - cloned_ids,
179  	        )
180  	
181  	        mastered_ids = {"myMasteredResource", "myMasteredGroupedResource"}
182  	        test_dom_get(
183  	            utils.dom_get_resource_masterslave,
184  	            cib_dom,
185  	            mastered_ids,
186  	            all_ids - mastered_ids,
187  	        )
188  	
189  	        group_ids = {"myGroup", "myClonedGroup", "myMasteredGroup"}
190  	        test_dom_get(
191  	            utils.dom_get_group, cib_dom, group_ids, all_ids - group_ids
192  	        )
193  	
194  	        cloned_group_ids = {"myClonedGroup"}
195  	        test_dom_get(
196  	            utils.dom_get_group_clone,
197  	            cib_dom,
198  	            cloned_group_ids,
199  	            all_ids - cloned_group_ids,
200  	        )
201  	
202  	        clone_ids = {"myClone", "myUniqueClone", "myGroupClone"}
203  	        test_dom_get(
204  	            utils.dom_get_clone, cib_dom, clone_ids, all_ids - clone_ids
205  	        )
206  	
207  	        mastered_group_ids = {"myMasteredGroup"}
208  	        test_dom_get(
209  	            utils.dom_get_group_masterslave,
210  	            cib_dom,
211  	            mastered_group_ids,
212  	            all_ids - mastered_group_ids,
213  	        )
214  	
215  	        master_ids = {"myMaster", "myGroupMaster"}
216  	        test_dom_get(
217  	            utils.dom_get_master, cib_dom, master_ids, all_ids - master_ids
218  	        )
219  	
220  	        bundle_ids = {"myBundle", "myEmptyBundle"}
221  	        test_dom_get(
222  	            utils.dom_get_bundle, cib_dom, bundle_ids, all_ids - bundle_ids
223  	        )
224  	
225  	        self.assert_element_id(
226  	            utils.dom_get_clone_ms_resource(cib_dom, "myClone"),
227  	            "myClonedResource",
228  	        )
229  	        self.assert_element_id(
230  	            utils.dom_get_clone_ms_resource(cib_dom, "myGroupClone"),
231  	            "myClonedGroup",
232  	        )
233  	        self.assert_element_id(
234  	            utils.dom_get_clone_ms_resource(cib_dom, "myMaster"),
235  	            "myMasteredResource",
236  	        )
237  	        self.assert_element_id(
238  	            utils.dom_get_clone_ms_resource(cib_dom, "myGroupMaster"),
239  	            "myMasteredGroup",
240  	        )
241  	
242  	        self.assert_element_id(
243  	            utils.dom_get_resource_clone_ms_parent(cib_dom, "myClonedResource"),
244  	            "myClone",
245  	        )
246  	        self.assert_element_id(
247  	            utils.dom_get_resource_clone_ms_parent(cib_dom, "myClonedGroup"),
248  	            "myGroupClone",
249  	        )
250  	        self.assert_element_id(
251  	            utils.dom_get_resource_clone_ms_parent(
252  	                cib_dom, "myClonedGroupedResource"
253  	            ),
254  	            "myGroupClone",
255  	        )
256  	        self.assert_element_id(
257  	            utils.dom_get_resource_clone_ms_parent(
258  	                cib_dom, "myMasteredResource"
259  	            ),
260  	            "myMaster",
261  	        )
262  	        self.assert_element_id(
263  	            utils.dom_get_resource_clone_ms_parent(cib_dom, "myMasteredGroup"),
264  	            "myGroupMaster",
265  	        )
266  	        self.assert_element_id(
267  	            utils.dom_get_resource_clone_ms_parent(
268  	                cib_dom, "myMasteredGroupedResource"
269  	            ),
270  	            "myGroupMaster",
271  	        )
272  	        self.assertEqual(
273  	            None, utils.dom_get_resource_clone_ms_parent(cib_dom, "myResource")
274  	        )
275  	        self.assertEqual(
276  	            None, utils.dom_get_resource_clone_ms_parent(cib_dom, "myGroup")
277  	        )
278  	        self.assertEqual(
279  	            None,
280  	            utils.dom_get_resource_clone_ms_parent(
281  	                cib_dom, "myGroupedResource"
282  	            ),
283  	        )
284  	
285  	        self.assertIsNone(
286  	            utils.dom_get_resource_bundle(
287  	                utils.dom_get_bundle(cib_dom, "myEmptyBundle")
288  	            )
289  	        )
290  	        self.assert_element_id(
291  	            utils.dom_get_resource_bundle(
292  	                utils.dom_get_bundle(cib_dom, "myBundle")
293  	            ),
294  	            "myBundledResource",
295  	            "primitive",
296  	        )
297  	
298  	        self.assert_element_id(
299  	            utils.dom_get_resource_bundle_parent(cib_dom, "myBundledResource"),
300  	            "myBundle",
301  	        )
302  	        self.assertIsNone(
303  	            utils.dom_get_resource_bundle_parent(cib_dom, "myResource")
304  	        )
305  	        self.assertIsNone(
306  	            utils.dom_get_resource_bundle_parent(cib_dom, "myClone")
307  	        )
308  	        self.assertIsNone(
309  	            utils.dom_get_resource_bundle_parent(cib_dom, "myClonedResource")
310  	        )
311  	        self.assertIsNone(
312  	            utils.dom_get_resource_bundle_parent(cib_dom, "myMaster")
313  	        )
314  	        self.assertIsNone(
315  	            utils.dom_get_resource_bundle_parent(cib_dom, "myMasteredGroup")
316  	        )
317  	        self.assertIsNone(
318  	            utils.dom_get_resource_bundle_parent(cib_dom, "myGroup")
319  	        )
320  	        self.assertIsNone(
321  	            utils.dom_get_resource_bundle_parent(cib_dom, "myGroupedResource")
322  	        )
323  	        self.assertIsNone(
324  	            utils.dom_get_resource_bundle_parent(cib_dom, "myGroupClone")
325  	        )
326  	        self.assertIsNone(
327  	            utils.dom_get_resource_bundle_parent(cib_dom, "myClonedGroup")
328  	        )
329  	        self.assertIsNone(
330  	            utils.dom_get_resource_bundle_parent(
331  	                cib_dom, "myClonedGroupedResource"
332  	            )
333  	        )
334  	
335  	    def test_dom_get_resource_remote_node_name(self):
336  	        dom = self.get_cib_empty()
337  	        new_resources = xml.dom.minidom.parseString(
338  	            """
339  	            <resources>
340  	                <primitive id="dummy1"
341  	                        class="ocf" provider="heartbeat" type="Dummy">
342  	                </primitive>
343  	                <primitive class="ocf" id="vm-guest1" provider="heartbeat"
344  	                        type="VirtualDomain">
345  	                    <instance_attributes id="vm-guest1-instance_attributes">
346  	                        <nvpair id="vm-guest1-instance_attributes-hypervisor"
347  	                            name="hypervisor" value="qemu:///system"/>
348  	                        <nvpair id="vm-guest1-instance_attributes-config"
349  	                            name="config" value="/root/guest1.xml"/>
350  	                    </instance_attributes>
351  	                    <meta_attributes id="vm-guest1-meta_attributes">
352  	                        <nvpair id="vm-guest1-meta_attributes-remote-node"
353  	                            name="remote-node" value="guest1"/>
354  	                    </meta_attributes>
355  	                </primitive>
356  	                <primitive id="dummy2"
357  	                        class="ocf" provider="heartbeat" type="Dummy">
358  	                    <instance_attributes id="vm-guest1-meta_attributes">
359  	                        <nvpair id="dummy2-remote-node"
360  	                            name="remote-node" value="guest2"/>
361  	                    </instance_attributes>
362  	                </primitive>
363  	                <primitive id="dummy3"
364  	                        class="ocf" provider="pacemaker" type="remote">
365  	                </primitive>
366  	            </resources>
367  	        """
368  	        ).documentElement
369  	        resources = dom.getElementsByTagName("resources")[0]
370  	        resources.parentNode.replaceChild(new_resources, resources)
371  	
372  	        self.assertEqual(
373  	            None,
374  	            utils.dom_get_resource_remote_node_name(
375  	                utils.dom_get_resource(dom, "dummy1")
376  	            ),
377  	        )
378  	        self.assertEqual(
379  	            None,
380  	            utils.dom_get_resource_remote_node_name(
381  	                utils.dom_get_resource(dom, "dummy2")
382  	            ),
383  	        )
384  	        self.assertEqual(
385  	            "guest1",
386  	            utils.dom_get_resource_remote_node_name(
387  	                utils.dom_get_resource(dom, "vm-guest1")
388  	            ),
389  	        )
390  	        self.assertEqual(
391  	            "dummy3",
392  	            utils.dom_get_resource_remote_node_name(
393  	                utils.dom_get_resource(dom, "dummy3")
394  	            ),
395  	        )
396  	
397  	    def test_dom_get_meta_attr_value(self):
398  	        dom = self.get_cib_empty()
(1) Event Sigma main event: The application uses Python's built in `xml` module which does not properly handle erroneous or maliciously constructed data, making the application vulnerable to one or more types of XML attacks.
(2) Event remediation: Avoid using the `xml` module. Consider using the `defusedxml` module or similar which safely prevents all XML entity attacks.
399  	        new_resources = xml.dom.minidom.parseString(
400  	            """
401  	            <resources>
402  	                <primitive id="dummy1"
403  	                        class="ocf" provider="heartbeat" type="Dummy">
404  	                </primitive>
405  	                <primitive class="ocf" id="vm-guest1" provider="heartbeat"
406  	                        type="VirtualDomain">
407  	                    <instance_attributes id="vm-guest1-instance_attributes">
408  	                        <nvpair id="vm-guest1-instance_attributes-hypervisor"
409  	                            name="hypervisor" value="qemu:///system"/>
410  	                        <nvpair id="vm-guest1-instance_attributes-config"
411  	                            name="config" value="/root/guest1.xml"/>
412  	                    </instance_attributes>
413  	                    <meta_attributes id="vm-guest1-meta_attributes">
414  	                        <nvpair id="vm-guest1-meta_attributes-remote-node"
415  	                            name="remote-node" value="guest1"/>
416  	                    </meta_attributes>
417  	                </primitive>
418  	                <primitive id="dummy2"
419  	                        class="ocf" provider="heartbeat" type="Dummy">
420  	                    <instance_attributes id="vm-guest1-meta_attributes">
421  	                        <nvpair id="dummy2-remote-node"
422  	                            name="remote-node" value="guest2"/>
423  	                    </instance_attributes>
424  	                </primitive>
425  	            </resources>
426  	        """
427  	        ).documentElement
428  	        resources = dom.getElementsByTagName("resources")[0]
429  	        resources.parentNode.replaceChild(new_resources, resources)
430  	
431  	        self.assertEqual(
432  	            None,
433  	            utils.dom_get_meta_attr_value(
434  	                utils.dom_get_resource(dom, "dummy1"), "foo"
435  	            ),
436  	        )
437  	        self.assertEqual(
438  	            None,
439  	            utils.dom_get_meta_attr_value(
440  	                utils.dom_get_resource(dom, "dummy2"), "remote-node"
441  	            ),
442  	        )
443  	        self.assertEqual(
444  	            "guest1",
445  	            utils.dom_get_meta_attr_value(
446  	                utils.dom_get_resource(dom, "vm-guest1"), "remote-node"
447  	            ),
448  	        )
449  	        self.assertEqual(
450  	            None,
451  	            utils.dom_get_meta_attr_value(
452  	                utils.dom_get_resource(dom, "vm-guest1"), "foo"
453  	            ),
454  	        )
455  	
456  	    def test_dom_get_parent_by_tag_name(self):
457  	        def dom_get_element_with_id(dom, tag_name, element_id):
458  	            for elem in dom.getElementsByTagName(tag_name):
459  	                if (
460  	                    elem.hasAttribute("id")
461  	                    and elem.getAttribute("id") == element_id
462  	                ):
463  	                    return elem
464  	            return None
465  	
466  	        dom = xml.dom.minidom.parseString(
467  	            """
468  	            <aa id="aa1">
469  	                <bb id="bb1"/>
470  	                <bb id="bb2">
471  	                    <cc id="cc1"/>
472  	                </bb>
473  	                <bb id="bb3">
474  	                    <cc id="cc2"/>
475  	                </bb>
476  	                <dd id="dd1" />
477  	            </aa>
478  	        """
479  	        ).documentElement
480  	        bb1 = dom_get_element_with_id(dom, "bb", "bb1")
481  	        cc1 = dom_get_element_with_id(dom, "cc", "cc1")
482  	
483  	        self.assert_element_id(
484  	            utils.dom_get_parent_by_tag_names(bb1, ["aa"]), "aa1"
485  	        )
486  	        self.assert_element_id(
487  	            utils.dom_get_parent_by_tag_names(cc1, ["aa"]), "aa1"
488  	        )
489  	        self.assert_element_id(
490  	            utils.dom_get_parent_by_tag_names(cc1, ["bb"]), "bb2"
491  	        )
492  	
493  	        self.assertEqual(None, utils.dom_get_parent_by_tag_names(bb1, ["cc"]))
494  	        self.assertEqual(None, utils.dom_get_parent_by_tag_names(cc1, ["dd"]))
495  	        self.assertEqual(None, utils.dom_get_parent_by_tag_names(cc1, ["ee"]))
496  	
497  	    def test_validate_constraint_resource(self):
498  	        dom = self.get_cib_resources()
499  	        self.assertEqual(
500  	            (True, "", "myClone"),
501  	            utils.validate_constraint_resource(dom, "myClone"),
502  	        )
503  	        self.assertEqual(
504  	            (True, "", "myGroupClone"),
505  	            utils.validate_constraint_resource(dom, "myGroupClone"),
506  	        )
507  	        self.assertEqual(
508  	            (True, "", "myMaster"),
509  	            utils.validate_constraint_resource(dom, "myMaster"),
510  	        )
511  	        self.assertEqual(
512  	            (True, "", "myGroupMaster"),
513  	            utils.validate_constraint_resource(dom, "myGroupMaster"),
514  	        )
515  	        self.assertEqual(
516  	            (True, "", "myBundle"),
517  	            utils.validate_constraint_resource(dom, "myBundle"),
518  	        )
519  	        self.assertEqual(
520  	            (True, "", "myEmptyBundle"),
521  	            utils.validate_constraint_resource(dom, "myEmptyBundle"),
522  	        )
523  	        self.assertEqual(
524  	            (True, "", "myResource"),
525  	            utils.validate_constraint_resource(dom, "myResource"),
526  	        )
527  	        self.assertEqual(
528  	            (True, "", "myGroup"),
529  	            utils.validate_constraint_resource(dom, "myGroup"),
530  	        )
531  	        self.assertEqual(
532  	            (True, "", "myGroupedResource"),
533  	            utils.validate_constraint_resource(dom, "myGroupedResource"),
534  	        )
535  	
536  	        self.assertEqual(
537  	            (False, "Resource 'myNonexistent' does not exist", None),
538  	            utils.validate_constraint_resource(dom, "myNonexistent"),
539  	        )
540  	
541  	        message = (
542  	            "%s is a clone resource, you should use the clone id: "
543  	            "%s when adding constraints. Use --force to override."
544  	        )
545  	        self.assertEqual(
546  	            (False, message % ("myClonedResource", "myClone"), "myClone"),
547  	            utils.validate_constraint_resource(dom, "myClonedResource"),
548  	        )
549  	        self.assertEqual(
550  	            (
551  	                False,
552  	                message % ("myClonedGroup", "myGroupClone"),
553  	                "myGroupClone",
554  	            ),
555  	            utils.validate_constraint_resource(dom, "myClonedGroup"),
556  	        )
557  	        self.assertEqual(
558  	            (
559  	                False,
560  	                message % ("myClonedGroupedResource", "myGroupClone"),
561  	                "myGroupClone",
562  	            ),
563  	            utils.validate_constraint_resource(dom, "myClonedGroupedResource"),
564  	        )
565  	
566  	        message = (
567  	            "%s is a clone resource, you should use the clone id: "
568  	            "%s when adding constraints. Use --force to override."
569  	        )
570  	        self.assertEqual(
571  	            (False, message % ("myMasteredResource", "myMaster"), "myMaster"),
572  	            utils.validate_constraint_resource(dom, "myMasteredResource"),
573  	        )
574  	        self.assertEqual(
575  	            (
576  	                False,
577  	                message % ("myMasteredGroup", "myGroupMaster"),
578  	                "myGroupMaster",
579  	            ),
580  	            utils.validate_constraint_resource(dom, "myMasteredGroup"),
581  	        )
582  	        self.assertEqual(
583  	            (
584  	                False,
585  	                message % ("myMasteredGroupedResource", "myGroupMaster"),
586  	                "myGroupMaster",
587  	            ),
588  	            utils.validate_constraint_resource(
589  	                dom, "myMasteredGroupedResource"
590  	            ),
591  	        )
592  	
593  	        message = (
594  	            "%s is a bundle resource, you should use the bundle id: "
595  	            "%s when adding constraints. Use --force to override."
596  	        )
597  	        self.assertEqual(
598  	            (False, message % ("myBundledResource", "myBundle"), "myBundle"),
599  	            utils.validate_constraint_resource(dom, "myBundledResource"),
600  	        )
601  	
602  	        utils.pcs_options["--force"] = True
603  	        self.assertEqual(
604  	            (True, "", "myClone"),
605  	            utils.validate_constraint_resource(dom, "myClonedResource"),
606  	        )
607  	        self.assertEqual(
608  	            (True, "", "myGroupClone"),
609  	            utils.validate_constraint_resource(dom, "myClonedGroup"),
610  	        )
611  	        self.assertEqual(
612  	            (True, "", "myGroupClone"),
613  	            utils.validate_constraint_resource(dom, "myClonedGroupedResource"),
614  	        )
615  	        self.assertEqual(
616  	            (True, "", "myMaster"),
617  	            utils.validate_constraint_resource(dom, "myMasteredResource"),
618  	        )
619  	        self.assertEqual(
620  	            (True, "", "myGroupMaster"),
621  	            utils.validate_constraint_resource(dom, "myMasteredGroup"),
622  	        )
623  	        self.assertEqual(
624  	            (True, "", "myGroupMaster"),
625  	            utils.validate_constraint_resource(
626  	                dom, "myMasteredGroupedResource"
627  	            ),
628  	        )
629  	        self.assertEqual(
630  	            (True, "", "myBundle"),
631  	            utils.validate_constraint_resource(dom, "myBundledResource"),
632  	        )
633  	
634  	    def test_validate_xml_id(self):
635  	        self.assertEqual((True, ""), utils.validate_xml_id("dummy"))
636  	        self.assertEqual((True, ""), utils.validate_xml_id("DUMMY"))
637  	        self.assertEqual((True, ""), utils.validate_xml_id("dUmMy"))
638  	        self.assertEqual((True, ""), utils.validate_xml_id("dummy0"))
639  	        self.assertEqual((True, ""), utils.validate_xml_id("dum0my"))
640  	        self.assertEqual((True, ""), utils.validate_xml_id("dummy-"))
641  	        self.assertEqual((True, ""), utils.validate_xml_id("dum-my"))
642  	        self.assertEqual((True, ""), utils.validate_xml_id("dummy."))
643  	        self.assertEqual((True, ""), utils.validate_xml_id("dum.my"))
644  	        self.assertEqual((True, ""), utils.validate_xml_id("_dummy"))
645  	        self.assertEqual((True, ""), utils.validate_xml_id("dummy_"))
646  	        self.assertEqual((True, ""), utils.validate_xml_id("dum_my"))
647  	
648  	        self.assertEqual(
649  	            (False, "test id cannot be empty"),
650  	            utils.validate_xml_id("", "test id"),
651  	        )
652  	
653  	        msg = "invalid test id '%s', '%s' is not a valid first character for a test id"
654  	        self.assertEqual(
655  	            (False, msg % ("0", "0")), utils.validate_xml_id("0", "test id")
656  	        )
657  	        self.assertEqual(
658  	            (False, msg % ("-", "-")), utils.validate_xml_id("-", "test id")
659  	        )
660  	        self.assertEqual(
661  	            (False, msg % (".", ".")), utils.validate_xml_id(".", "test id")
662  	        )
663  	        self.assertEqual(
664  	            (False, msg % (":", ":")), utils.validate_xml_id(":", "test id")
665  	        )
666  	        self.assertEqual(
667  	            (False, msg % ("0dummy", "0")),
668  	            utils.validate_xml_id("0dummy", "test id"),
669  	        )
670  	        self.assertEqual(
671  	            (False, msg % ("-dummy", "-")),
672  	            utils.validate_xml_id("-dummy", "test id"),
673  	        )
674  	        self.assertEqual(
675  	            (False, msg % (".dummy", ".")),
676  	            utils.validate_xml_id(".dummy", "test id"),
677  	        )
678  	        self.assertEqual(
679  	            (False, msg % (":dummy", ":")),
680  	            utils.validate_xml_id(":dummy", "test id"),
681  	        )
682  	
683  	        msg = (
684  	            "invalid test id '%s', '%s' is not a valid character for a test id"
685  	        )
686  	        self.assertEqual(
687  	            (False, msg % ("dum:my", ":")),
688  	            utils.validate_xml_id("dum:my", "test id"),
689  	        )
690  	        self.assertEqual(
691  	            (False, msg % ("dummy:", ":")),
692  	            utils.validate_xml_id("dummy:", "test id"),
693  	        )
694  	        self.assertEqual(
695  	            (False, msg % ("dum?my", "?")),
696  	            utils.validate_xml_id("dum?my", "test id"),
697  	        )
698  	        self.assertEqual(
699  	            (False, msg % ("dummy?", "?")),
700  	            utils.validate_xml_id("dummy?", "test id"),
701  	        )
702  	
703  	    def test_is_score(self):
704  	        self.assertTrue(utils.is_score("INFINITY"))
705  	        self.assertTrue(utils.is_score("+INFINITY"))
706  	        self.assertTrue(utils.is_score("-INFINITY"))
707  	        self.assertTrue(utils.is_score("0"))
708  	        self.assertTrue(utils.is_score("+0"))
709  	        self.assertTrue(utils.is_score("-0"))
710  	        self.assertTrue(utils.is_score("123"))
711  	        self.assertTrue(utils.is_score("-123"))
712  	        self.assertTrue(utils.is_score("+123"))
713  	
714  	        self.assertFalse(utils.is_score(""))
715  	        self.assertFalse(utils.is_score("abc"))
716  	        self.assertFalse(utils.is_score("+abc"))
717  	        self.assertFalse(utils.is_score("-abc"))
718  	        self.assertFalse(utils.is_score("10a"))
719  	        self.assertFalse(utils.is_score("+10a"))
720  	        self.assertFalse(utils.is_score("-10a"))
721  	        self.assertFalse(utils.is_score("a10"))
722  	        self.assertFalse(utils.is_score("+a10"))
723  	        self.assertFalse(utils.is_score("a-10"))
724  	        self.assertFalse(utils.is_score("infinity"))
725  	        self.assertFalse(utils.is_score("+infinity"))
726  	        self.assertFalse(utils.is_score("-infinity"))
727  	        self.assertFalse(utils.is_score("+InFiNiTy"))
728  	        self.assertFalse(utils.is_score("INFINITY10"))
729  	        self.assertFalse(utils.is_score("INFINITY+10"))
730  	        self.assertFalse(utils.is_score("-INFINITY10"))
731  	        self.assertFalse(utils.is_score("+INFINITY+10"))
732  	        self.assertFalse(utils.is_score("10INFINITY"))
733  	        self.assertFalse(utils.is_score("+10+INFINITY"))
734  	
735  	    def get_cib_status_lrm(self):
736  	        cib_dom = self.get_cib_empty()
737  	        new_status = xml.dom.minidom.parseString(
738  	            """
739  	<status>
740  	  <node_state id="1" uname="rh70-node1">
741  	    <lrm id="1">
742  	      <lrm_resources>
743  	        <lrm_resource id="dummy" type="Dummy" class="ocf" provider="heartbeat">
744  	          <lrm_rsc_op id="dummy_monitor_30000" operation="monitor" call-id="34"
745  	            rc-code="1" on_node="Xrh70-node1X" exit-reason="test" />
746  	          <lrm_rsc_op id="dummy_stop_0" operation="stop" call-id="32"
747  	            rc-code="0" />
748  	          <lrm_rsc_op id="dummy_start_0" operation="start" call-id="33"
749  	            rc-code="0" />
750  	        </lrm_resource>
751  	      </lrm_resources>
752  	    </lrm>
753  	  </node_state>
754  	  <node_state id="2" uname="rh70-node2">
755  	    <lrm id="2">
756  	      <lrm_resources>
757  	        <lrm_resource id="dummy" type="Dummy" class="ocf" provider="heartbeat">
758  	          <lrm_rsc_op id="dummy_monitor_0" operation="monitor" call-id="5"
759  	            rc-code="1" />
760  	        </lrm_resource>
761  	        <lrm_resource id="dummy1" type="Dummy" class="ocf" provider="heartbeat">
762  	          <lrm_rsc_op id="dummy1_monitor_0" operation="monitor" call-id="3"
763  	            rc-code="0" />
764  	        </lrm_resource>
765  	      </lrm_resources>
766  	    </lrm>
767  	  </node_state>
768  	</status>
769  	        """
770  	        ).documentElement
771  	        status = cib_dom.getElementsByTagName("status")[0]
772  	        status.parentNode.replaceChild(new_status, status)
773  	        return cib_dom
774  	
775  	    def test_resource_running_on(self):
776  	        status = xml.dom.minidom.parseString(
777  	            f"""
778  	<crm_mon>
779  	    <summary />
780  	    <nodes />
781  	    <resources>
782  	        <resource id="myResource" role="Started">
783  	            <node name="rh70-node1" />
784  	        </resource>
785  	        <clone id="myClone">
786  	            <resource id="myClonedResource" role="Started">
787  	                <node name="rh70-node1" />
788  	            </resource>
789  	            <resource id="myClonedResource" role="Started">
790  	                <node name="rh70-node2" />
791  	            </resource>
792  	            <resource id="myClonedResource" role="Started">
793  	                <node name="rh70-node3" />
794  	            </resource>
795  	        </clone>
796  	        <clone id="myMaster">
797  	            <resource id="myMasteredResource:1" role="{const.PCMK_ROLE_UNPROMOTED}">
798  	                <node name="rh70-node2" />
799  	            </resource>
800  	            <resource id="myMasteredResource" role="{const.PCMK_ROLE_UNPROMOTED}">
801  	                <node name="rh70-node3" />
802  	            </resource>
803  	            <resource id="myMasteredResource" role="{const.PCMK_ROLE_PROMOTED}">
804  	                <node name="rh70-node1" />
805  	            </resource>
806  	        </clone>
807  	        <group id="myGroup">
808  	             <resource id="myGroupedResource" role="Started">
809  	                 <node name="rh70-node2" />
810  	             </resource>
811  	        </group>
812  	        <clone id="myGroupClone">
813  	            <group id="myClonedGroup:0">
814  	                 <resource id="myClonedGroupedResource" role="Started">
815  	                     <node name="rh70-node1" />
816  	                 </resource>
817  	            </group>
818  	            <group id="myClonedGroup:1">
819  	                 <resource id="myClonedGroupedResource" role="Started">
820  	                     <node name="rh70-node2" />
821  	                 </resource>
822  	            </group>
823  	            <group id="myClonedGroup:2">
824  	                 <resource id="myClonedGroupedResource" role="Started">
825  	                     <node name="rh70-node3" />
826  	                 </resource>
827  	            </group>
828  	            <group id="myClonedGroup:3">
829  	                 <resource id="myClonedGroupedResource" role="Started">
830  	                     <node name="rh70-node3" />
831  	                 </resource>
832  	            </group>
833  	        </clone>
834  	        <clone id="myGroupMaster">
835  	            <group id="myMasteredGroup:0">
836  	                 <resource id="myMasteredGroupedResource" role="{const.PCMK_ROLE_UNPROMOTED}">
837  	                     <node name="rh70-node1" />
838  	                 </resource>
839  	            </group>
840  	            <group id="myMasteredGroup:1">
841  	                 <resource id="myMasteredGroupedResource" role="{const.PCMK_ROLE_PROMOTED}">
842  	                     <node name="rh70-node2" />
843  	                 </resource>
844  	            </group>
845  	            <group id="myMasteredGroup:2">
846  	                 <resource id="myMasteredGroupedResource" role="{const.PCMK_ROLE_UNPROMOTED}">
847  	                     <node name="rh70-node3" />
848  	                 </resource>
849  	            </group>
850  	        </clone>
851  	        <resource id="myStoppedResource" role="Stopped">
852  	        </resource>
853  	    </resources>
854  	</crm_mon>
855  	        """
856  	        ).documentElement
857  	
858  	        self.assertEqual(
859  	            utils.resource_running_on("myResource", status),
860  	            {
861  	                "message": "Resource 'myResource' is running on node rh70-node1.",
862  	                "is_running": True,
863  	            },
864  	        )
865  	        self.assertEqual(
866  	            utils.resource_running_on("myClonedResource", status),
867  	            {
868  	                "message": "Resource 'myClonedResource' is running on nodes "
869  	                "rh70-node1, rh70-node2, rh70-node3.",
870  	                "is_running": True,
871  	            },
872  	        )
873  	        self.assertEqual(
874  	            utils.resource_running_on("myClone", status),
875  	            {
876  	                "message": "Resource 'myClone' is running on nodes "
877  	                "rh70-node1, rh70-node2, rh70-node3.",
878  	                "is_running": True,
879  	            },
880  	        )
881  	        self.assertEqual(
882  	            utils.resource_running_on("myMasteredResource", status),
883  	            {
884  	                "message": (
885  	                    "Resource 'myMasteredResource' is {promoted} on node "
886  	                    "rh70-node1; {unpromoted} on nodes rh70-node2, rh70-node3."
887  	                ).format(
888  	                    promoted=str(const.PCMK_ROLE_PROMOTED).lower(),
889  	                    unpromoted=str(const.PCMK_ROLE_UNPROMOTED).lower(),
890  	                ),
891  	                "is_running": True,
892  	            },
893  	        )
894  	        self.assertEqual(
895  	            utils.resource_running_on("myMaster", status),
896  	            {
897  	                "message": (
898  	                    "Resource 'myMaster' is {promoted} on node "
899  	                    "rh70-node1; {unpromoted} on nodes rh70-node2, rh70-node3."
900  	                ).format(
901  	                    promoted=str(const.PCMK_ROLE_PROMOTED).lower(),
902  	                    unpromoted=str(const.PCMK_ROLE_UNPROMOTED).lower(),
903  	                ),
904  	                "is_running": True,
905  	            },
906  	        )
907  	        self.assertEqual(
908  	            utils.resource_running_on("myGroupedResource", status),
909  	            {
910  	                "message": "Resource 'myGroupedResource' is running on node "
911  	                "rh70-node2.",
912  	                "is_running": True,
913  	            },
914  	        )
915  	        self.assertEqual(
916  	            utils.resource_running_on("myGroup", status),
917  	            {
918  	                "message": "Resource 'myGroup' is running on node rh70-node2.",
919  	                "is_running": True,
920  	            },
921  	        )
922  	        self.assertEqual(
923  	            utils.resource_running_on("myClonedGroupedResource", status),
924  	            {
925  	                "message": "Resource 'myClonedGroupedResource' is running on nodes "
926  	                "rh70-node1, rh70-node2, rh70-node3, rh70-node3.",
927  	                "is_running": True,
928  	            },
929  	        )
930  	        self.assertEqual(
931  	            utils.resource_running_on("myClonedGroup", status),
932  	            {
933  	                "message": "Resource 'myClonedGroup' is running on nodes "
934  	                "rh70-node1, rh70-node2, rh70-node3, rh70-node3.",
935  	                "is_running": True,
936  	            },
937  	        )
938  	        self.assertEqual(
939  	            utils.resource_running_on("myGroupClone", status),
940  	            {
941  	                "message": "Resource 'myGroupClone' is running on nodes "
942  	                "rh70-node1, rh70-node2, rh70-node3, rh70-node3.",
943  	                "is_running": True,
944  	            },
945  	        )
946  	        self.assertEqual(
947  	            utils.resource_running_on("myMasteredGroupedResource", status),
948  	            {
949  	                "message": (
950  	                    "Resource 'myMasteredGroupedResource' is {promoted} on node "
951  	                    "rh70-node2; {unpromoted} on nodes rh70-node1, rh70-node3."
952  	                ).format(
953  	                    promoted=str(const.PCMK_ROLE_PROMOTED).lower(),
954  	                    unpromoted=str(const.PCMK_ROLE_UNPROMOTED).lower(),
955  	                ),
956  	                "is_running": True,
957  	            },
958  	        )
959  	        self.assertEqual(
960  	            utils.resource_running_on("myMasteredGroup", status),
961  	            {
962  	                "message": (
963  	                    "Resource 'myMasteredGroup' is {promoted} on node "
964  	                    "rh70-node2; {unpromoted} on nodes rh70-node1, rh70-node3."
965  	                ).format(
966  	                    promoted=str(const.PCMK_ROLE_PROMOTED).lower(),
967  	                    unpromoted=str(const.PCMK_ROLE_UNPROMOTED).lower(),
968  	                ),
969  	                "is_running": True,
970  	            },
971  	        )
972  	        self.assertEqual(
973  	            utils.resource_running_on("myGroupMaster", status),
974  	            {
975  	                "message": (
976  	                    "Resource 'myGroupMaster' is {promoted} on node "
977  	                    "rh70-node2; {unpromoted} on nodes rh70-node1, rh70-node3."
978  	                ).format(
979  	                    promoted=str(const.PCMK_ROLE_PROMOTED).lower(),
980  	                    unpromoted=str(const.PCMK_ROLE_UNPROMOTED).lower(),
981  	                ),
982  	                "is_running": True,
983  	            },
984  	        )
985  	        self.assertEqual(
986  	            utils.resource_running_on("notMyResource", status),
987  	            {
988  	                "message": "Resource 'notMyResource' is not running on any node",
989  	                "is_running": False,
990  	            },
991  	        )
992  	        self.assertEqual(
993  	            utils.resource_running_on("myStoppedResource", status),
994  	            {
995  	                "message": "Resource 'myStoppedResource' is not running on any node",
996  	                "is_running": False,
997  	            },
998  	        )
999  	
1000 	    def test_get_operations_from_transitions(self):
1001 	        transitions = xml.dom.minidom.parse(rc("transitions01.xml"))
1002 	        self.assertEqual(
1003 	            [
1004 	                {
1005 	                    "id": "dummy",
1006 	                    "long_id": "dummy",
1007 	                    "operation": "stop",
1008 	                    "on_node": "rh7-3",
1009 	                },
1010 	                {
1011 	                    "id": "dummy",
1012 	                    "long_id": "dummy",
1013 	                    "operation": "start",
1014 	                    "on_node": "rh7-2",
1015 	                },
1016 	                {
1017 	                    "id": "d0",
1018 	                    "long_id": "d0:1",
1019 	                    "operation": "stop",
1020 	                    "on_node": "rh7-1",
1021 	                },
1022 	                {
1023 	                    "id": "d0",
1024 	                    "long_id": "d0:1",
1025 	                    "operation": "start",
1026 	                    "on_node": "rh7-2",
1027 	                },
1028 	                {
1029 	                    "id": "state",
1030 	                    "long_id": "state:0",
1031 	                    "operation": "stop",
1032 	                    "on_node": "rh7-3",
1033 	                },
1034 	                {
1035 	                    "id": "state",
1036 	                    "long_id": "state:0",
1037 	                    "operation": "start",
1038 	                    "on_node": "rh7-2",
1039 	                },
1040 	            ],
1041 	            utils.get_operations_from_transitions(transitions),
1042 	        )
1043 	
1044 	        transitions = xml.dom.minidom.parse(rc("transitions02.xml"))
1045 	        self.assertEqual(
1046 	            [
1047 	                {
1048 	                    "id": "RemoteNode",
1049 	                    "long_id": "RemoteNode",
1050 	                    "operation": "stop",
1051 	                    "on_node": "virt-143",
1052 	                },
1053 	                {
1054 	                    "id": "RemoteNode",
1055 	                    "long_id": "RemoteNode",
1056 	                    "operation": "migrate_to",
1057 	                    "on_node": "virt-143",
1058 	                },
1059 	                {
1060 	                    "id": "RemoteNode",
1061 	                    "long_id": "RemoteNode",
1062 	                    "operation": "migrate_from",
1063 	                    "on_node": "virt-142",
1064 	                },
1065 	                {
1066 	                    "id": "dummy8",
1067 	                    "long_id": "dummy8",
1068 	                    "operation": "stop",
1069 	                    "on_node": "virt-143",
1070 	                },
1071 	                {
1072 	                    "id": "dummy8",
1073 	                    "long_id": "dummy8",
1074 	                    "operation": "start",
1075 	                    "on_node": "virt-142",
1076 	                },
1077 	            ],
1078 	            utils.get_operations_from_transitions(transitions),
1079 	        )
1080 	
1081 	    def test_get_resources_location_from_operations(self):
1082 	        cib_dom = self.get_cib_resources()
1083 	
1084 	        operations = []
1085 	        self.assertEqual(
1086 	            {},
1087 	            utils.get_resources_location_from_operations(cib_dom, operations),
1088 	        )
1089 	
1090 	        operations = [
1091 	            {
1092 	                "id": "myResource",
1093 	                "long_id": "myResource",
1094 	                "operation": "start",
1095 	                "on_node": "rh7-1",
1096 	            },
1097 	        ]
1098 	        self.assertEqual(
1099 	            {
1100 	                "myResource": {
1101 	                    "id": "myResource",
1102 	                    "id_for_constraint": "myResource",
1103 	                    "long_id": "myResource",
1104 	                    "start_on_node": "rh7-1",
1105 	                },
1106 	            },
1107 	            utils.get_resources_location_from_operations(cib_dom, operations),
1108 	        )
1109 	
1110 	        operations = [
1111 	            {
1112 	                "id": "myResource",
1113 	                "long_id": "myResource",
1114 	                "operation": "start",
1115 	                "on_node": "rh7-1",
1116 	            },
1117 	            {
1118 	                "id": "myResource",
1119 	                "long_id": "myResource",
1120 	                "operation": "start",
1121 	                "on_node": "rh7-2",
1122 	            },
1123 	            {
1124 	                "id": "myResource",
1125 	                "long_id": "myResource",
1126 	                "operation": "monitor",
1127 	                "on_node": "rh7-3",
1128 	            },
1129 	            {
1130 	                "id": "myResource",
1131 	                "long_id": "myResource",
1132 	                "operation": "stop",
1133 	                "on_node": "rh7-3",
1134 	            },
1135 	        ]
1136 	        self.assertEqual(
1137 	            {
1138 	                "myResource": {
1139 	                    "id": "myResource",
1140 	                    "id_for_constraint": "myResource",
1141 	                    "long_id": "myResource",
1142 	                    "start_on_node": "rh7-2",
1143 	                },
1144 	            },
1145 	            utils.get_resources_location_from_operations(cib_dom, operations),
1146 	        )
1147 	
1148 	        operations = [
1149 	            {
1150 	                "id": "myResource",
1151 	                "long_id": "myResource",
1152 	                "operation": "start",
1153 	                "on_node": "rh7-1",
1154 	            },
1155 	            {
1156 	                "id": "myClonedResource",
1157 	                "long_id": "myClonedResource:0",
1158 	                "operation": "start",
1159 	                "on_node": "rh7-1",
1160 	            },
1161 	            {
1162 	                "id": "myClonedResource",
1163 	                "long_id": "myClonedResource:0",
1164 	                "operation": "start",
1165 	                "on_node": "rh7-2",
1166 	            },
1167 	            {
1168 	                "id": "myClonedResource",
1169 	                "long_id": "myClonedResource:1",
1170 	                "operation": "start",
1171 	                "on_node": "rh7-3",
1172 	            },
1173 	        ]
1174 	        self.assertEqual(
1175 	            {
1176 	                "myResource": {
1177 	                    "id": "myResource",
1178 	                    "id_for_constraint": "myResource",
1179 	                    "long_id": "myResource",
1180 	                    "start_on_node": "rh7-1",
1181 	                },
1182 	                "myClonedResource:0": {
1183 	                    "id": "myClonedResource",
1184 	                    "id_for_constraint": "myClone",
1185 	                    "long_id": "myClonedResource:0",
1186 	                    "start_on_node": "rh7-2",
1187 	                },
1188 	                "myClonedResource:1": {
1189 	                    "id": "myClonedResource",
1190 	                    "id_for_constraint": "myClone",
1191 	                    "long_id": "myClonedResource:1",
1192 	                    "start_on_node": "rh7-3",
1193 	                },
1194 	            },
1195 	            utils.get_resources_location_from_operations(cib_dom, operations),
1196 	        )
1197 	
1198 	        operations = [
1199 	            {
1200 	                "id": "myUniqueClonedResource:0",
1201 	                "long_id": "myUniqueClonedResource:0",
1202 	                "operation": "start",
1203 	                "on_node": "rh7-1",
1204 	            },
1205 	            {
1206 	                "id": "myUniqueClonedResource:1",
1207 	                "long_id": "myUniqueClonedResource:1",
1208 	                "operation": "monitor",
1209 	                "on_node": "rh7-2",
1210 	            },
1211 	            {
1212 	                "id": "myUniqueClonedResource:2",
1213 	                "long_id": "myUniqueClonedResource:2",
1214 	                "operation": "start",
1215 	                "on_node": "rh7-3",
1216 	            },
1217 	        ]
1218 	        self.assertEqual(
1219 	            {
1220 	                "myUniqueClonedResource:0": {
1221 	                    "id": "myUniqueClonedResource:0",
1222 	                    "id_for_constraint": "myUniqueClone",
1223 	                    "long_id": "myUniqueClonedResource:0",
1224 	                    "start_on_node": "rh7-1",
1225 	                },
1226 	                "myUniqueClonedResource:2": {
1227 	                    "id": "myUniqueClonedResource:2",
1228 	                    "id_for_constraint": "myUniqueClone",
1229 	                    "long_id": "myUniqueClonedResource:2",
1230 	                    "start_on_node": "rh7-3",
1231 	                },
1232 	            },
1233 	            utils.get_resources_location_from_operations(cib_dom, operations),
1234 	        )
1235 	
1236 	        operations = [
1237 	            {
1238 	                "id": "myMasteredGroupedResource",
1239 	                "long_id": "myMasteredGroupedResource:0",
1240 	                "operation": "start",
1241 	                "on_node": "rh7-1",
1242 	            },
1243 	            {
1244 	                "id": "myMasteredGroupedResource",
1245 	                "long_id": "myMasteredGroupedResource:1",
1246 	                "operation": "demote",
1247 	                "on_node": "rh7-2",
1248 	            },
1249 	            {
1250 	                "id": "myMasteredGroupedResource",
1251 	                "long_id": "myMasteredGroupedResource:1",
1252 	                "operation": "promote",
1253 	                "on_node": "rh7-3",
1254 	            },
1255 	        ]
1256 	        self.assertEqual(
1257 	            {
1258 	                "myMasteredGroupedResource:0": {
1259 	                    "id": "myMasteredGroupedResource",
1260 	                    "id_for_constraint": "myGroupMaster",
1261 	                    "long_id": "myMasteredGroupedResource:0",
1262 	                    "start_on_node": "rh7-1",
1263 	                },
1264 	                "myMasteredGroupedResource:1": {
1265 	                    "id": "myMasteredGroupedResource",
1266 	                    "id_for_constraint": "myGroupMaster",
1267 	                    "long_id": "myMasteredGroupedResource:1",
1268 	                    "promote_on_node": "rh7-3",
1269 	                },
1270 	            },
1271 	            utils.get_resources_location_from_operations(cib_dom, operations),
1272 	        )
1273 	
1274 	        operations = [
1275 	            {
1276 	                "id": "myResource",
1277 	                "long_id": "myResource",
1278 	                "operation": "stop",
1279 	                "on_node": "rh7-1",
1280 	            },
1281 	            {
1282 	                "id": "myResource",
1283 	                "long_id": "myResource",
1284 	                "operation": "migrate_to",
1285 	                "on_node": "rh7-1",
1286 	            },
1287 	            {
1288 	                "id": "myResource",
1289 	                "long_id": "myResource",
1290 	                "operation": "migrate_from",
1291 	                "on_node": "rh7-2",
1292 	            },
1293 	        ]
1294 	        self.assertEqual(
1295 	            {
1296 	                "myResource": {
1297 	                    "id": "myResource",
1298 	                    "id_for_constraint": "myResource",
1299 	                    "long_id": "myResource",
1300 	                    "start_on_node": "rh7-2",
1301 	                },
1302 	            },
1303 	            utils.get_resources_location_from_operations(cib_dom, operations),
1304 	        )
1305 	
1306 	    def test_is_int(self):
1307 	        self.assertTrue(utils.is_int("-999"))
1308 	        self.assertTrue(utils.is_int("-1"))
1309 	        self.assertTrue(utils.is_int("0"))
1310 	        self.assertTrue(utils.is_int("1"))
1311 	        self.assertTrue(utils.is_int("99999"))
1312 	        self.assertTrue(utils.is_int(" 99999  "))
1313 	        self.assertFalse(utils.is_int("0.0"))
1314 	        self.assertFalse(utils.is_int("-1.0"))
1315 	        self.assertFalse(utils.is_int("-0.1"))
1316 	        self.assertFalse(utils.is_int("0.001"))
1317 	        self.assertFalse(utils.is_int("-999999.1"))
1318 	        self.assertFalse(utils.is_int("0.0001"))
1319 	        self.assertFalse(utils.is_int(""))
1320 	        self.assertFalse(utils.is_int("   "))
1321 	        self.assertFalse(utils.is_int("A"))
1322 	        self.assertFalse(utils.is_int("random 15 47 text  "))
1323 	
1324 	    def test_dom_get_node(self):
1325 	        cib = self.get_cib_with_nodes_minidom()
1326 	        self.assertIsNone(utils.dom_get_node(cib, "non-existing-node"))
1327 	        node = utils.dom_get_node(cib, "rh7-1")
1328 	        self.assertEqual(node.getAttribute("uname"), "rh7-1")
1329 	        self.assertEqual(node.getAttribute("id"), "1")
1330 	
1331 	    def test_dom_prepare_child_element(self):
1332 	        cib = self.get_cib_with_nodes_minidom()
1333 	        node = cib.getElementsByTagName("node")[0]
1334 	        self.assertEqual(len(dom_get_child_elements(node)), 0)
1335 	        child = utils.dom_prepare_child_element(
1336 	            node, "utilization", "rh7-1-utilization"
1337 	        )
1338 	        self.assertEqual(len(dom_get_child_elements(node)), 1)
1339 	        self.assertEqual(child, dom_get_child_elements(node)[0])
1340 	        self.assertEqual(dom_get_child_elements(node)[0].tagName, "utilization")
1341 	        self.assertEqual(
1342 	            dom_get_child_elements(node)[0].getAttribute("id"),
1343 	            "rh7-1-utilization",
1344 	        )
1345 	        child2 = utils.dom_prepare_child_element(
1346 	            node, "utilization", "rh7-1-utilization"
1347 	        )
1348 	        self.assertEqual(len(dom_get_child_elements(node)), 1)
1349 	        self.assertEqual(child, child2)
1350 	
1351 	    def test_dom_update_nv_pair_add(self):
1352 	        nv_set = xml.dom.minidom.parseString("<nvset/>").documentElement
1353 	        utils.dom_update_nv_pair(nv_set, "test_name", "test_val", "prefix-")
1354 	        self.assertEqual(len(dom_get_child_elements(nv_set)), 1)
1355 	        pair = dom_get_child_elements(nv_set)[0]
1356 	        self.assertEqual(pair.getAttribute("name"), "test_name")
1357 	        self.assertEqual(pair.getAttribute("value"), "test_val")
1358 	        self.assertEqual(pair.getAttribute("id"), "prefix-test_name")
1359 	        utils.dom_update_nv_pair(nv_set, "another_name", "value", "prefix2-")
1360 	        self.assertEqual(len(dom_get_child_elements(nv_set)), 2)
1361 	        self.assertEqual(pair, dom_get_child_elements(nv_set)[0])
1362 	        pair = dom_get_child_elements(nv_set)[1]
1363 	        self.assertEqual(pair.getAttribute("name"), "another_name")
1364 	        self.assertEqual(pair.getAttribute("value"), "value")
1365 	        self.assertEqual(pair.getAttribute("id"), "prefix2-another_name")
1366 	
1367 	    def test_dom_update_nv_pair_update(self):
1368 	        nv_set = xml.dom.minidom.parseString(
1369 	            """
1370 	        <nv_set>
1371 	            <nvpair id="prefix-test_name" name="test_name" value="test_val"/>
1372 	            <nvpair id="prefix2-another_name" name="another_name" value="value"/>
1373 	        </nv_set>
1374 	        """
1375 	        ).documentElement
1376 	        utils.dom_update_nv_pair(nv_set, "test_name", "new_value")
1377 	        self.assertEqual(len(dom_get_child_elements(nv_set)), 2)
1378 	        pair1 = dom_get_child_elements(nv_set)[0]
1379 	        pair2 = dom_get_child_elements(nv_set)[1]
1380 	        self.assertEqual(pair1.getAttribute("name"), "test_name")
1381 	        self.assertEqual(pair1.getAttribute("value"), "new_value")
1382 	        self.assertEqual(pair1.getAttribute("id"), "prefix-test_name")
1383 	        self.assertEqual(pair2.getAttribute("name"), "another_name")
1384 	        self.assertEqual(pair2.getAttribute("value"), "value")
1385 	        self.assertEqual(pair2.getAttribute("id"), "prefix2-another_name")
1386 	
1387 	    def test_dom_update_nv_pair_remove(self):
1388 	        nv_set = xml.dom.minidom.parseString(
1389 	            """
1390 	        <nv_set>
1391 	            <nvpair id="prefix-test_name" name="test_name" value="test_val"/>
1392 	            <nvpair id="prefix2-another_name" name="another_name" value="value"/>
1393 	        </nv_set>
1394 	        """
1395 	        ).documentElement
1396 	        utils.dom_update_nv_pair(nv_set, "non_existing_name", "")
1397 	        self.assertEqual(len(dom_get_child_elements(nv_set)), 2)
1398 	        utils.dom_update_nv_pair(nv_set, "another_name", "")
1399 	        self.assertEqual(len(dom_get_child_elements(nv_set)), 1)
1400 	        pair = dom_get_child_elements(nv_set)[0]
1401 	        self.assertEqual(pair.getAttribute("name"), "test_name")
1402 	        self.assertEqual(pair.getAttribute("value"), "test_val")
1403 	        self.assertEqual(pair.getAttribute("id"), "prefix-test_name")
1404 	        utils.dom_update_nv_pair(nv_set, "test_name", "")
1405 	        self.assertEqual(len(dom_get_child_elements(nv_set)), 0)
1406 	
1407 	    def test_convert_args_to_tuples(self):
1408 	        out = utils.convert_args_to_tuples(
1409 	            ["invalid_string", "key=value", "key2=val=ue", "k e y= v a l u e "]
1410 	        )
1411 	        self.assertEqual(
1412 	            out,
1413 	            [("key", "value"), ("key2", "val=ue"), ("k e y", " v a l u e ")],
1414 	        )
1415 	
1416 	    def test_dom_update_utilization_invalid(self):
1417 	        # commands writes to stderr
1418 	        # we want clean test output, so we capture it
1419 	        tmp_stderr = sys.stderr
1420 	        sys.stderr = StringIO()
1421 	
1422 	        el = xml.dom.minidom.parseString(
1423 	            """
1424 	        <resource id="test_id"/>
1425 	        """
1426 	        ).documentElement
1427 	        self.assertRaises(
1428 	            SystemExit,
1429 	            utils.dom_update_utilization,
1430 	            el,
1431 	            {"name": "invalid_val"},
1432 	        )
1433 	
1434 	        self.assertRaises(
1435 	            SystemExit, utils.dom_update_utilization, el, {"name": "0.01"}
1436 	        )
1437 	
1438 	        sys.stderr = tmp_stderr
1439 	
1440 	    def test_dom_update_utilization_add(self):
1441 	        el = xml.dom.minidom.parseString(
1442 	            """
1443 	        <resource id="test_id"/>
1444 	        """
1445 	        ).documentElement
1446 	        utils.dom_update_utilization(
1447 	            el,
1448 	            {
1449 	                "name": "",
1450 	                "key": "-1",
1451 	                "keys": "90",
1452 	            },
1453 	        )
1454 	
1455 	        self.assertEqual(len(dom_get_child_elements(el)), 1)
1456 	        u = dom_get_child_elements(el)[0]  # pylint: disable=invalid-name
1457 	        self.assertEqual(u.tagName, "utilization")
1458 	        self.assertEqual(u.getAttribute("id"), "test_id-utilization")
1459 	        self.assertEqual(len(dom_get_child_elements(u)), 2)
1460 	
1461 	        self.assertEqual(
1462 	            dom_get_child_elements(u)[0].getAttribute("id"),
1463 	            "test_id-utilization-key",
1464 	        )
1465 	        self.assertEqual(
1466 	            dom_get_child_elements(u)[0].getAttribute("name"), "key"
1467 	        )
1468 	        self.assertEqual(
1469 	            dom_get_child_elements(u)[0].getAttribute("value"), "-1"
1470 	        )
1471 	        self.assertEqual(
1472 	            dom_get_child_elements(u)[1].getAttribute("id"),
1473 	            "test_id-utilization-keys",
1474 	        )
1475 	        self.assertEqual(
1476 	            dom_get_child_elements(u)[1].getAttribute("name"), "keys"
1477 	        )
1478 	        self.assertEqual(
1479 	            dom_get_child_elements(u)[1].getAttribute("value"), "90"
1480 	        )
1481 	
1482 	    def test_dom_update_utilization_update_remove(self):
1483 	        el = xml.dom.minidom.parseString(
1484 	            """
1485 	        <resource id="test_id">
1486 	            <utilization id="test_id-utilization">
1487 	                <nvpair id="test_id-utilization-key" name="key" value="-1"/>
1488 	                <nvpair id="test_id-utilization-keys" name="keys" value="90"/>
1489 	            </utilization>
1490 	        </resource>
1491 	        """
1492 	        ).documentElement
1493 	        utils.dom_update_utilization(
1494 	            el,
1495 	            {
1496 	                "key": "100",
1497 	                "keys": "",
1498 	            },
1499 	        )
1500 	
1501 	        u = dom_get_child_elements(el)[0]  # pylint: disable=invalid-name
1502 	        self.assertEqual(len(dom_get_child_elements(u)), 1)
1503 	        self.assertEqual(
1504 	            dom_get_child_elements(u)[0].getAttribute("id"),
1505 	            "test_id-utilization-key",
1506 	        )
1507 	        self.assertEqual(
1508 	            dom_get_child_elements(u)[0].getAttribute("name"), "key"
1509 	        )
1510 	        self.assertEqual(
1511 	            dom_get_child_elements(u)[0].getAttribute("value"), "100"
1512 	        )
1513 	
1514 	    def test_dom_update_meta_attr_add(self):
1515 	        el = xml.dom.minidom.parseString(
1516 	            """
1517 	        <resource id="test_id"/>
1518 	        """
1519 	        ).documentElement
1520 	        utils.dom_update_meta_attr(
1521 	            el, [("name", ""), ("key", "test"), ("key2", "val")]
1522 	        )
1523 	
1524 	        self.assertEqual(len(dom_get_child_elements(el)), 1)
1525 	        u = dom_get_child_elements(el)[0]  # pylint: disable=invalid-name
1526 	        self.assertEqual(u.tagName, "meta_attributes")
1527 	        self.assertEqual(u.getAttribute("id"), "test_id-meta_attributes")
1528 	        self.assertEqual(len(dom_get_child_elements(u)), 2)
1529 	
1530 	        self.assertEqual(
1531 	            dom_get_child_elements(u)[0].getAttribute("id"),
1532 	            "test_id-meta_attributes-key",
1533 	        )
1534 	        self.assertEqual(
1535 	            dom_get_child_elements(u)[0].getAttribute("name"), "key"
1536 	        )
1537 	        self.assertEqual(
1538 	            dom_get_child_elements(u)[0].getAttribute("value"), "test"
1539 	        )
1540 	        self.assertEqual(
1541 	            dom_get_child_elements(u)[1].getAttribute("id"),
1542 	            "test_id-meta_attributes-key2",
1543 	        )
1544 	        self.assertEqual(
1545 	            dom_get_child_elements(u)[1].getAttribute("name"), "key2"
1546 	        )
1547 	        self.assertEqual(
1548 	            dom_get_child_elements(u)[1].getAttribute("value"), "val"
1549 	        )
1550 	
1551 	    def test_dom_update_meta_attr_update_remove(self):
1552 	        el = xml.dom.minidom.parseString(
1553 	            """
1554 	        <resource id="test_id">
1555 	            <meta_attributes id="test_id-utilization">
1556 	                <nvpair id="test_id-meta_attributes-key" name="key" value="test"/>
1557 	                <nvpair id="test_id-meta_attributes-key2" name="key2" value="val"/>
1558 	            </meta_attributes>
1559 	        </resource>
1560 	        """
1561 	        ).documentElement
1562 	        utils.dom_update_meta_attr(el, [("key", "another_val"), ("key2", "")])
1563 	
1564 	        u = dom_get_child_elements(el)[0]  # pylint: disable=invalid-name
1565 	        self.assertEqual(len(dom_get_child_elements(u)), 1)
1566 	        self.assertEqual(
1567 	            dom_get_child_elements(u)[0].getAttribute("id"),
1568 	            "test_id-meta_attributes-key",
1569 	        )
1570 	        self.assertEqual(
1571 	            dom_get_child_elements(u)[0].getAttribute("name"), "key"
1572 	        )
1573 	        self.assertEqual(
1574 	            dom_get_child_elements(u)[0].getAttribute("value"), "another_val"
1575 	        )
1576 	
1577 	    def test_get_utilization(self):
1578 	        el = xml.dom.minidom.parseString(
1579 	            """
1580 	        <resource id="test_id">
1581 	            <utilization id="test_id-utilization">
1582 	                <nvpair id="test_id-utilization-key" name="key" value="-1"/>
1583 	                <nvpair id="test_id-utilization-keys" name="keys" value="90"/>
1584 	            </utilization>
1585 	        </resource>
1586 	        """
1587 	        ).documentElement
1588 	        self.assertEqual({"key": "-1", "keys": "90"}, utils.get_utilization(el))
1589 	
1590 	    def test_get_utilization_str(self):
1591 	        el = xml.dom.minidom.parseString(
1592 	            """
1593 	        <resource id="test_id">
1594 	            <utilization id="test_id-utilization">
1595 	                <nvpair id="test_id-utilization-key" name="key" value="-1"/>
1596 	                <nvpair id="test_id-utilization-keys" name="keys" value="90"/>
1597 	            </utilization>
1598 	        </resource>
1599 	        """
1600 	        ).documentElement
1601 	        self.assertEqual("key=-1 keys=90", utils.get_utilization_str(el))
1602 	
1603 	    def assert_element_id(self, node, node_id, tag=None):
1604 	        self.assertTrue(
1605 	            isinstance(node, xml.dom.minidom.Element),
1606 	            "element with id '%s' not found" % node_id,
1607 	        )
1608 	        self.assertEqual(node.getAttribute("id"), node_id)
1609 	        if tag:
1610 	            self.assertEqual(node.tagName, tag)
1611 	
1612 	
class RunParallelTest(TestCase):
    """Tests of utils.run_parallel."""

    @staticmethod
    def fixture_create_worker(log, name, sleepSeconds=0):
        """
        Build a worker which sleeps and then records its name.

        log -- list the worker appends its name to
        name -- value appended to the log once the worker has run
        sleepSeconds -- how long the worker sleeps before logging
        """
        # pylint: disable=invalid-name

        def run_worker():
            sleep(sleepSeconds)
            log.append(name)

        return run_worker

    def test_run_all_workers(self):
        """Every worker passed to run_parallel is expected to be run."""
        finished = []
        worker_list = [
            self.fixture_create_worker(finished, worker_name)
            for worker_name in ("first", "second")
        ]

        utils.run_parallel(worker_list, wait_seconds=0.1)

        # Completion order is not asserted, hence the sorting.
        self.assertEqual(sorted(finished), sorted(["first", "second"]))
1634 	
1635 	
class NodeActionTaskTest(TestCase):
    """Tests of utils.create_task."""

    def test_can_run_action(self):
        """
        Running the created task is expected to pass the action's result,
        together with the node, to the report callback.
        """
        reported = []

        def action(node, arg, kwarg=None):
            output = ":".join((node, arg, kwarg))
            return (0, output)

        def report(node, returncode, output):
            line = "|".join((node, str(returncode), output))
            reported.append(line)

        run_task = utils.create_task(
            report, action, "node", "arg", kwarg="kwarg"
        )
        run_task()

        self.assertEqual(["node|0|node:arg:kwarg"], reported)
1650 	
1651 	
class TouchCibFile(TestCase):
    """Tests of utils.touch_cib_file."""

    @mock.patch("pcs.utils.os.path.isfile", mock.Mock(return_value=False))
    @mock.patch(
        "pcs.utils.write_empty_cib",
        mock.Mock(side_effect=EnvironmentError("some message")),
    )
    @mock.patch("pcs.utils.err")
    def test_exception_is_transformed_correctly(self, err):
        """
        With isfile patched to return False and write_empty_cib patched to
        raise EnvironmentError, utils.err is expected to be called exactly
        once with a message naming the file and the original error text.
        """
        # pylint: disable=no-self-use
        expected_message = (
            "Unable to write to file: '/fake/filename': 'some message'"
        )

        utils.touch_cib_file("/fake/filename")

        err.assert_called_once_with(expected_message)
1666 	
1667 	
@mock.patch("pcs.utils.is_run_interactive", mock.Mock(return_value=False))
class GetContinueConfirmation(TestCase):
    """
    Tests of utils.get_continue_confirmation with is_run_interactive patched
    to return False and print_to_stderr replaced by a mock.
    """

    def setUp(self):
        self.text = "some warning text"
        stderr_patcher = mock.patch("pcs.cli.reports.output.print_to_stderr")
        self.addCleanup(stderr_patcher.stop)
        self.mock_output = stderr_patcher.start()

    def _confirm(self, yes, force):
        """Call the tested function with the fixture text and given flags."""
        return utils.get_continue_confirmation(self.text, yes, force)

    def test_yes_force(self):
        confirmed = self._confirm(True, True)
        self.assertTrue(confirmed)
        self.mock_output.assert_called_once_with(f"Warning: {self.text}")

    def test_yes(self):
        confirmed = self._confirm(True, False)
        self.assertTrue(confirmed)
        self.mock_output.assert_called_once_with(f"Warning: {self.text}")

    def test_force(self):
        confirmed = self._confirm(False, True)
        self.assertTrue(confirmed)
        # The deprecation notice is expected before the warning itself.
        deprecation_call = mock.call(
            "Deprecation Warning: Using --force to confirm this action "
            "is deprecated and might be removed in a future release, "
            "use --yes instead"
        )
        warning_call = mock.call(f"Warning: {self.text}")
        self.assertEqual(
            self.mock_output.mock_calls,
            [deprecation_call, warning_call],
        )

    def test_nothing(self):
        with self.assertRaises(SystemExit) as raised:
            self.assertFalse(self._confirm(False, False))
        self.assertEqual(raised.exception.code, 1)
        expected_error = f"Error: {self.text}, use --yes to override"
        self.mock_output.assert_called_once_with(expected_error)
1707