1 # pylint: disable=too-many-lines
2 import sys
3 import xml.dom.minidom
4 from io import StringIO
5 from time import sleep
6 from unittest import (
7 TestCase,
8 mock,
9 )
10
11 from pcs import utils
12 from pcs.common import const
13
14 from pcs_test.tools.misc import get_test_resource as rc
15 from pcs_test.tools.xml import dom_get_child_elements
16
17 cib_with_nodes = rc("cib-empty-withnodes.xml")
18 empty_cib = rc("cib-empty.xml")
19
20 TestCase.maxDiff = None
21
22
23 class UtilsTest(TestCase):
24 # pylint: disable=too-many-public-methods
25 @staticmethod
26 def get_cib_empty():
27 return xml.dom.minidom.parse(empty_cib)
28
29 @staticmethod
30 def get_cib_with_nodes_minidom():
31 return xml.dom.minidom.parse(cib_with_nodes)
32
33 def get_cib_resources(self):
34 cib_dom = self.get_cib_empty()
35 new_resources = xml.dom.minidom.parseString(
36 """
37 <resources>
38 <primitive id="myResource"
39 class="ocf" provider="heartbeat" type="Dummy">
40 </primitive>
41 <clone id="myClone">
42 <primitive id="myClonedResource"
43 class="ocf" provider="heartbeat" type="Dummy">
44 </primitive>
45 </clone>
46 <clone id="myUniqueClone">
47 <primitive id="myUniqueClonedResource"
48 class="ocf" provider="heartbeat" type="Dummy">
49 </primitive>
50 <meta-attributes>
51 <nvpair name="globally-unique" value="true" />
52 </meta-attributes>
53 </clone>
54 <master id="myMaster">
55 <primitive id="myMasteredResource"
56 class="ocf" provider="heartbeat" type="Dummy">
57 </primitive>
58 </master>
59 <group id="myGroup">
60 <primitive id="myGroupedResource"
61 class="ocf" provider="heartbeat" type="Dummy">
62 </primitive>
63 </group>
64 <clone id="myGroupClone">
65 <group id="myClonedGroup">
66 <primitive id="myClonedGroupedResource"
67 class="ocf" provider="heartbeat" type="Dummy">
68 </primitive>
69 </group>
70 </clone>
71 <master id="myGroupMaster">
72 <group id="myMasteredGroup">
73 <primitive id="myMasteredGroupedResource"
74 class="ocf" provider="heartbeat" type="Dummy">
75 </primitive>
76 </group>
77 </master>
78 <bundle id="myBundle">
79 <primitive id="myBundledResource"
80 class="ocf" provider="heartbeat" type="Dummy" />
81 </bundle>
82 <bundle id="myEmptyBundle"/>
83 </resources>
84 """
85 ).documentElement
86 resources = cib_dom.getElementsByTagName("resources")[0]
87 resources.parentNode.replaceChild(new_resources, resources)
88 return cib_dom
89
90 def test_dom_get_resources(self): # noqa: PLR0915
91 # pylint: disable=too-many-statements
92 def test_dom_get(method, dom, ok_ids, bad_ids):
93 for element_id in ok_ids:
94 self.assert_element_id(method(dom, element_id), element_id)
95 for element_id in bad_ids:
96 self.assertFalse(method(dom, element_id))
97
98 cib_dom = self.get_cib_empty()
99 self.assertFalse(utils.dom_get_resource(cib_dom, "myResource"))
100 self.assertFalse(
101 utils.dom_get_resource_clone(cib_dom, "myClonedResource")
102 )
103 self.assertFalse(
104 utils.dom_get_resource_masterslave(cib_dom, "myMasteredResource")
105 )
106 self.assertFalse(utils.dom_get_group(cib_dom, "myGroup"))
107 self.assertFalse(utils.dom_get_group_clone(cib_dom, "myClonedGroup"))
108 self.assertFalse(
109 utils.dom_get_group_masterslave(cib_dom, "myMasteredGroup")
110 )
111 self.assertFalse(utils.dom_get_clone(cib_dom, "myClone"))
112 self.assertFalse(utils.dom_get_master(cib_dom, "myMaster"))
113 self.assertFalse(utils.dom_get_clone_ms_resource(cib_dom, "myClone"))
114 self.assertFalse(utils.dom_get_clone_ms_resource(cib_dom, "myMaster"))
115 self.assertFalse(
116 utils.dom_get_resource_clone_ms_parent(cib_dom, "myClonedResource")
117 )
118 self.assertFalse(
119 utils.dom_get_resource_clone_ms_parent(
120 cib_dom, "myMasteredResource"
121 )
122 )
123 self.assertIsNone(utils.dom_get_bundle(cib_dom, "myResource"))
124 self.assertIsNone(utils.dom_get_bundle(cib_dom, "notExisting"))
125 self.assertIsNone(
126 utils.dom_get_resource_bundle_parent(cib_dom, "myBundledResource")
127 )
128
129 cib_dom = self.get_cib_resources()
130 all_ids = {
131 "none",
132 "myResource",
133 "myClone",
134 "myClonedResource",
135 "myUniqueClone",
136 "myUniqueClonedResource",
137 "myMaster",
138 "myMasteredResource",
139 "myGroup",
140 "myGroupedResource",
141 "myGroupClone",
142 "myClonedGroup",
143 "myClonedGroupedResource",
144 "myGroupMaster",
145 "myMasteredGroup",
146 "myMasteredGroupedResource",
147 "myBundledResource",
148 "myBundle",
149 "myEmptyBundle",
150 }
151
152 resource_ids = {
153 "myResource",
154 "myClonedResource",
155 "myUniqueClonedResource",
156 "myGroupedResource",
157 "myMasteredResource",
158 "myClonedGroupedResource",
159 "myMasteredGroupedResource",
160 "myBundledResource",
161 }
162 test_dom_get(
163 utils.dom_get_resource,
164 cib_dom,
165 resource_ids,
166 all_ids - resource_ids,
167 )
168
169 cloned_ids = {
170 "myClonedResource",
171 "myUniqueClonedResource",
172 "myClonedGroupedResource",
173 }
174 test_dom_get(
175 utils.dom_get_resource_clone,
176 cib_dom,
177 cloned_ids,
178 all_ids - cloned_ids,
179 )
180
181 mastered_ids = {"myMasteredResource", "myMasteredGroupedResource"}
182 test_dom_get(
183 utils.dom_get_resource_masterslave,
184 cib_dom,
185 mastered_ids,
186 all_ids - mastered_ids,
187 )
188
189 group_ids = {"myGroup", "myClonedGroup", "myMasteredGroup"}
190 test_dom_get(
191 utils.dom_get_group, cib_dom, group_ids, all_ids - group_ids
192 )
193
194 cloned_group_ids = {"myClonedGroup"}
195 test_dom_get(
196 utils.dom_get_group_clone,
197 cib_dom,
198 cloned_group_ids,
199 all_ids - cloned_group_ids,
200 )
201
202 clone_ids = {"myClone", "myUniqueClone", "myGroupClone"}
203 test_dom_get(
204 utils.dom_get_clone, cib_dom, clone_ids, all_ids - clone_ids
205 )
206
207 mastered_group_ids = {"myMasteredGroup"}
208 test_dom_get(
209 utils.dom_get_group_masterslave,
210 cib_dom,
211 mastered_group_ids,
212 all_ids - mastered_group_ids,
213 )
214
215 master_ids = {"myMaster", "myGroupMaster"}
216 test_dom_get(
217 utils.dom_get_master, cib_dom, master_ids, all_ids - master_ids
218 )
219
220 bundle_ids = {"myBundle", "myEmptyBundle"}
221 test_dom_get(
222 utils.dom_get_bundle, cib_dom, bundle_ids, all_ids - bundle_ids
223 )
224
225 self.assert_element_id(
226 utils.dom_get_clone_ms_resource(cib_dom, "myClone"),
227 "myClonedResource",
228 )
229 self.assert_element_id(
230 utils.dom_get_clone_ms_resource(cib_dom, "myGroupClone"),
231 "myClonedGroup",
232 )
233 self.assert_element_id(
234 utils.dom_get_clone_ms_resource(cib_dom, "myMaster"),
235 "myMasteredResource",
236 )
237 self.assert_element_id(
238 utils.dom_get_clone_ms_resource(cib_dom, "myGroupMaster"),
239 "myMasteredGroup",
240 )
241
242 self.assert_element_id(
243 utils.dom_get_resource_clone_ms_parent(cib_dom, "myClonedResource"),
244 "myClone",
245 )
246 self.assert_element_id(
247 utils.dom_get_resource_clone_ms_parent(cib_dom, "myClonedGroup"),
248 "myGroupClone",
249 )
250 self.assert_element_id(
251 utils.dom_get_resource_clone_ms_parent(
252 cib_dom, "myClonedGroupedResource"
253 ),
254 "myGroupClone",
255 )
256 self.assert_element_id(
257 utils.dom_get_resource_clone_ms_parent(
258 cib_dom, "myMasteredResource"
259 ),
260 "myMaster",
261 )
262 self.assert_element_id(
263 utils.dom_get_resource_clone_ms_parent(cib_dom, "myMasteredGroup"),
264 "myGroupMaster",
265 )
266 self.assert_element_id(
267 utils.dom_get_resource_clone_ms_parent(
268 cib_dom, "myMasteredGroupedResource"
269 ),
270 "myGroupMaster",
271 )
272 self.assertEqual(
273 None, utils.dom_get_resource_clone_ms_parent(cib_dom, "myResource")
274 )
275 self.assertEqual(
276 None, utils.dom_get_resource_clone_ms_parent(cib_dom, "myGroup")
277 )
278 self.assertEqual(
279 None,
280 utils.dom_get_resource_clone_ms_parent(
281 cib_dom, "myGroupedResource"
282 ),
283 )
284
285 self.assertIsNone(
286 utils.dom_get_resource_bundle(
287 utils.dom_get_bundle(cib_dom, "myEmptyBundle")
288 )
289 )
290 self.assert_element_id(
291 utils.dom_get_resource_bundle(
292 utils.dom_get_bundle(cib_dom, "myBundle")
293 ),
294 "myBundledResource",
295 "primitive",
296 )
297
298 self.assert_element_id(
299 utils.dom_get_resource_bundle_parent(cib_dom, "myBundledResource"),
300 "myBundle",
301 )
302 self.assertIsNone(
303 utils.dom_get_resource_bundle_parent(cib_dom, "myResource")
304 )
305 self.assertIsNone(
306 utils.dom_get_resource_bundle_parent(cib_dom, "myClone")
307 )
308 self.assertIsNone(
309 utils.dom_get_resource_bundle_parent(cib_dom, "myClonedResource")
310 )
311 self.assertIsNone(
312 utils.dom_get_resource_bundle_parent(cib_dom, "myMaster")
313 )
314 self.assertIsNone(
315 utils.dom_get_resource_bundle_parent(cib_dom, "myMasteredGroup")
316 )
317 self.assertIsNone(
318 utils.dom_get_resource_bundle_parent(cib_dom, "myGroup")
319 )
320 self.assertIsNone(
321 utils.dom_get_resource_bundle_parent(cib_dom, "myGroupedResource")
322 )
323 self.assertIsNone(
324 utils.dom_get_resource_bundle_parent(cib_dom, "myGroupClone")
325 )
326 self.assertIsNone(
327 utils.dom_get_resource_bundle_parent(cib_dom, "myClonedGroup")
328 )
329 self.assertIsNone(
330 utils.dom_get_resource_bundle_parent(
331 cib_dom, "myClonedGroupedResource"
332 )
333 )
334
335 def test_dom_get_resource_remote_node_name(self):
336 dom = self.get_cib_empty()
337 new_resources = xml.dom.minidom.parseString(
338 """
339 <resources>
340 <primitive id="dummy1"
341 class="ocf" provider="heartbeat" type="Dummy">
342 </primitive>
343 <primitive class="ocf" id="vm-guest1" provider="heartbeat"
344 type="VirtualDomain">
345 <instance_attributes id="vm-guest1-instance_attributes">
346 <nvpair id="vm-guest1-instance_attributes-hypervisor"
347 name="hypervisor" value="qemu:///system"/>
348 <nvpair id="vm-guest1-instance_attributes-config"
349 name="config" value="/root/guest1.xml"/>
350 </instance_attributes>
351 <meta_attributes id="vm-guest1-meta_attributes">
352 <nvpair id="vm-guest1-meta_attributes-remote-node"
353 name="remote-node" value="guest1"/>
354 </meta_attributes>
355 </primitive>
356 <primitive id="dummy2"
357 class="ocf" provider="heartbeat" type="Dummy">
358 <instance_attributes id="vm-guest1-meta_attributes">
359 <nvpair id="dummy2-remote-node"
360 name="remote-node" value="guest2"/>
361 </instance_attributes>
362 </primitive>
363 <primitive id="dummy3"
364 class="ocf" provider="pacemaker" type="remote">
365 </primitive>
366 </resources>
367 """
368 ).documentElement
369 resources = dom.getElementsByTagName("resources")[0]
370 resources.parentNode.replaceChild(new_resources, resources)
371
372 self.assertEqual(
373 None,
374 utils.dom_get_resource_remote_node_name(
375 utils.dom_get_resource(dom, "dummy1")
376 ),
377 )
378 self.assertEqual(
379 None,
380 utils.dom_get_resource_remote_node_name(
381 utils.dom_get_resource(dom, "dummy2")
382 ),
383 )
384 self.assertEqual(
385 "guest1",
386 utils.dom_get_resource_remote_node_name(
387 utils.dom_get_resource(dom, "vm-guest1")
388 ),
389 )
390 self.assertEqual(
391 "dummy3",
392 utils.dom_get_resource_remote_node_name(
393 utils.dom_get_resource(dom, "dummy3")
394 ),
395 )
396
397 def test_dom_get_meta_attr_value(self):
398 dom = self.get_cib_empty()
399 new_resources = xml.dom.minidom.parseString(
400 """
401 <resources>
402 <primitive id="dummy1"
403 class="ocf" provider="heartbeat" type="Dummy">
404 </primitive>
405 <primitive class="ocf" id="vm-guest1" provider="heartbeat"
406 type="VirtualDomain">
407 <instance_attributes id="vm-guest1-instance_attributes">
408 <nvpair id="vm-guest1-instance_attributes-hypervisor"
409 name="hypervisor" value="qemu:///system"/>
410 <nvpair id="vm-guest1-instance_attributes-config"
411 name="config" value="/root/guest1.xml"/>
412 </instance_attributes>
413 <meta_attributes id="vm-guest1-meta_attributes">
414 <nvpair id="vm-guest1-meta_attributes-remote-node"
415 name="remote-node" value="guest1"/>
416 </meta_attributes>
417 </primitive>
418 <primitive id="dummy2"
419 class="ocf" provider="heartbeat" type="Dummy">
420 <instance_attributes id="vm-guest1-meta_attributes">
421 <nvpair id="dummy2-remote-node"
422 name="remote-node" value="guest2"/>
423 </instance_attributes>
424 </primitive>
425 </resources>
426 """
427 ).documentElement
428 resources = dom.getElementsByTagName("resources")[0]
429 resources.parentNode.replaceChild(new_resources, resources)
430
431 self.assertEqual(
432 None,
433 utils.dom_get_meta_attr_value(
434 utils.dom_get_resource(dom, "dummy1"), "foo"
435 ),
436 )
437 self.assertEqual(
438 None,
439 utils.dom_get_meta_attr_value(
440 utils.dom_get_resource(dom, "dummy2"), "remote-node"
441 ),
442 )
443 self.assertEqual(
444 "guest1",
445 utils.dom_get_meta_attr_value(
446 utils.dom_get_resource(dom, "vm-guest1"), "remote-node"
447 ),
448 )
449 self.assertEqual(
450 None,
451 utils.dom_get_meta_attr_value(
452 utils.dom_get_resource(dom, "vm-guest1"), "foo"
453 ),
454 )
455
456 def test_dom_get_parent_by_tag_name(self):
457 def dom_get_element_with_id(dom, tag_name, element_id):
458 for elem in dom.getElementsByTagName(tag_name):
459 if (
460 elem.hasAttribute("id")
461 and elem.getAttribute("id") == element_id
462 ):
463 return elem
464 return None
465
466 dom = xml.dom.minidom.parseString(
467 """
468 <aa id="aa1">
469 <bb id="bb1"/>
470 <bb id="bb2">
471 <cc id="cc1"/>
472 </bb>
473 <bb id="bb3">
474 <cc id="cc2"/>
475 </bb>
476 <dd id="dd1" />
477 </aa>
478 """
479 ).documentElement
480 bb1 = dom_get_element_with_id(dom, "bb", "bb1")
481 cc1 = dom_get_element_with_id(dom, "cc", "cc1")
482
483 self.assert_element_id(
484 utils.dom_get_parent_by_tag_names(bb1, ["aa"]), "aa1"
485 )
486 self.assert_element_id(
487 utils.dom_get_parent_by_tag_names(cc1, ["aa"]), "aa1"
488 )
489 self.assert_element_id(
490 utils.dom_get_parent_by_tag_names(cc1, ["bb"]), "bb2"
491 )
492
493 self.assertEqual(None, utils.dom_get_parent_by_tag_names(bb1, ["cc"]))
494 self.assertEqual(None, utils.dom_get_parent_by_tag_names(cc1, ["dd"]))
495 self.assertEqual(None, utils.dom_get_parent_by_tag_names(cc1, ["ee"]))
496
497 def test_validate_constraint_resource(self):
498 dom = self.get_cib_resources()
499 self.assertEqual(
500 (True, "", "myClone"),
501 utils.validate_constraint_resource(dom, "myClone"),
502 )
503 self.assertEqual(
504 (True, "", "myGroupClone"),
505 utils.validate_constraint_resource(dom, "myGroupClone"),
506 )
507 self.assertEqual(
508 (True, "", "myMaster"),
509 utils.validate_constraint_resource(dom, "myMaster"),
510 )
511 self.assertEqual(
512 (True, "", "myGroupMaster"),
513 utils.validate_constraint_resource(dom, "myGroupMaster"),
514 )
515 self.assertEqual(
516 (True, "", "myBundle"),
517 utils.validate_constraint_resource(dom, "myBundle"),
518 )
519 self.assertEqual(
520 (True, "", "myEmptyBundle"),
521 utils.validate_constraint_resource(dom, "myEmptyBundle"),
522 )
523 self.assertEqual(
524 (True, "", "myResource"),
525 utils.validate_constraint_resource(dom, "myResource"),
526 )
527 self.assertEqual(
528 (True, "", "myGroup"),
529 utils.validate_constraint_resource(dom, "myGroup"),
530 )
531 self.assertEqual(
532 (True, "", "myGroupedResource"),
533 utils.validate_constraint_resource(dom, "myGroupedResource"),
534 )
535
536 self.assertEqual(
537 (False, "Resource 'myNonexistent' does not exist", None),
538 utils.validate_constraint_resource(dom, "myNonexistent"),
539 )
540
541 message = (
542 "%s is a clone resource, you should use the clone id: "
543 "%s when adding constraints. Use --force to override."
544 )
545 self.assertEqual(
546 (False, message % ("myClonedResource", "myClone"), "myClone"),
547 utils.validate_constraint_resource(dom, "myClonedResource"),
548 )
549 self.assertEqual(
550 (
551 False,
552 message % ("myClonedGroup", "myGroupClone"),
553 "myGroupClone",
554 ),
555 utils.validate_constraint_resource(dom, "myClonedGroup"),
556 )
557 self.assertEqual(
558 (
559 False,
560 message % ("myClonedGroupedResource", "myGroupClone"),
561 "myGroupClone",
562 ),
563 utils.validate_constraint_resource(dom, "myClonedGroupedResource"),
564 )
565
566 message = (
567 "%s is a clone resource, you should use the clone id: "
568 "%s when adding constraints. Use --force to override."
569 )
570 self.assertEqual(
571 (False, message % ("myMasteredResource", "myMaster"), "myMaster"),
572 utils.validate_constraint_resource(dom, "myMasteredResource"),
573 )
574 self.assertEqual(
575 (
576 False,
577 message % ("myMasteredGroup", "myGroupMaster"),
578 "myGroupMaster",
579 ),
580 utils.validate_constraint_resource(dom, "myMasteredGroup"),
581 )
582 self.assertEqual(
583 (
584 False,
585 message % ("myMasteredGroupedResource", "myGroupMaster"),
586 "myGroupMaster",
587 ),
588 utils.validate_constraint_resource(
589 dom, "myMasteredGroupedResource"
590 ),
591 )
592
593 message = (
594 "%s is a bundle resource, you should use the bundle id: "
595 "%s when adding constraints. Use --force to override."
596 )
597 self.assertEqual(
598 (False, message % ("myBundledResource", "myBundle"), "myBundle"),
599 utils.validate_constraint_resource(dom, "myBundledResource"),
600 )
601
602 utils.pcs_options["--force"] = True
603 self.assertEqual(
604 (True, "", "myClone"),
605 utils.validate_constraint_resource(dom, "myClonedResource"),
606 )
607 self.assertEqual(
608 (True, "", "myGroupClone"),
609 utils.validate_constraint_resource(dom, "myClonedGroup"),
610 )
611 self.assertEqual(
612 (True, "", "myGroupClone"),
613 utils.validate_constraint_resource(dom, "myClonedGroupedResource"),
614 )
615 self.assertEqual(
616 (True, "", "myMaster"),
617 utils.validate_constraint_resource(dom, "myMasteredResource"),
618 )
619 self.assertEqual(
620 (True, "", "myGroupMaster"),
621 utils.validate_constraint_resource(dom, "myMasteredGroup"),
622 )
623 self.assertEqual(
624 (True, "", "myGroupMaster"),
625 utils.validate_constraint_resource(
626 dom, "myMasteredGroupedResource"
627 ),
628 )
629 self.assertEqual(
630 (True, "", "myBundle"),
631 utils.validate_constraint_resource(dom, "myBundledResource"),
632 )
633
634 def test_validate_xml_id(self):
635 self.assertEqual((True, ""), utils.validate_xml_id("dummy"))
636 self.assertEqual((True, ""), utils.validate_xml_id("DUMMY"))
637 self.assertEqual((True, ""), utils.validate_xml_id("dUmMy"))
638 self.assertEqual((True, ""), utils.validate_xml_id("dummy0"))
639 self.assertEqual((True, ""), utils.validate_xml_id("dum0my"))
640 self.assertEqual((True, ""), utils.validate_xml_id("dummy-"))
641 self.assertEqual((True, ""), utils.validate_xml_id("dum-my"))
642 self.assertEqual((True, ""), utils.validate_xml_id("dummy."))
643 self.assertEqual((True, ""), utils.validate_xml_id("dum.my"))
644 self.assertEqual((True, ""), utils.validate_xml_id("_dummy"))
645 self.assertEqual((True, ""), utils.validate_xml_id("dummy_"))
646 self.assertEqual((True, ""), utils.validate_xml_id("dum_my"))
647
648 self.assertEqual(
649 (False, "test id cannot be empty"),
650 utils.validate_xml_id("", "test id"),
651 )
652
653 msg = "invalid test id '%s', '%s' is not a valid first character for a test id"
654 self.assertEqual(
655 (False, msg % ("0", "0")), utils.validate_xml_id("0", "test id")
656 )
657 self.assertEqual(
658 (False, msg % ("-", "-")), utils.validate_xml_id("-", "test id")
659 )
660 self.assertEqual(
661 (False, msg % (".", ".")), utils.validate_xml_id(".", "test id")
662 )
663 self.assertEqual(
664 (False, msg % (":", ":")), utils.validate_xml_id(":", "test id")
665 )
666 self.assertEqual(
667 (False, msg % ("0dummy", "0")),
668 utils.validate_xml_id("0dummy", "test id"),
669 )
670 self.assertEqual(
671 (False, msg % ("-dummy", "-")),
672 utils.validate_xml_id("-dummy", "test id"),
673 )
674 self.assertEqual(
675 (False, msg % (".dummy", ".")),
676 utils.validate_xml_id(".dummy", "test id"),
677 )
678 self.assertEqual(
679 (False, msg % (":dummy", ":")),
680 utils.validate_xml_id(":dummy", "test id"),
681 )
682
683 msg = (
684 "invalid test id '%s', '%s' is not a valid character for a test id"
685 )
686 self.assertEqual(
687 (False, msg % ("dum:my", ":")),
688 utils.validate_xml_id("dum:my", "test id"),
689 )
690 self.assertEqual(
691 (False, msg % ("dummy:", ":")),
692 utils.validate_xml_id("dummy:", "test id"),
693 )
694 self.assertEqual(
695 (False, msg % ("dum?my", "?")),
696 utils.validate_xml_id("dum?my", "test id"),
697 )
698 self.assertEqual(
699 (False, msg % ("dummy?", "?")),
700 utils.validate_xml_id("dummy?", "test id"),
701 )
702
703 def test_is_score(self):
704 self.assertTrue(utils.is_score("INFINITY"))
705 self.assertTrue(utils.is_score("+INFINITY"))
706 self.assertTrue(utils.is_score("-INFINITY"))
707 self.assertTrue(utils.is_score("0"))
708 self.assertTrue(utils.is_score("+0"))
709 self.assertTrue(utils.is_score("-0"))
710 self.assertTrue(utils.is_score("123"))
711 self.assertTrue(utils.is_score("-123"))
712 self.assertTrue(utils.is_score("+123"))
713
714 self.assertFalse(utils.is_score(""))
715 self.assertFalse(utils.is_score("abc"))
716 self.assertFalse(utils.is_score("+abc"))
717 self.assertFalse(utils.is_score("-abc"))
718 self.assertFalse(utils.is_score("10a"))
719 self.assertFalse(utils.is_score("+10a"))
720 self.assertFalse(utils.is_score("-10a"))
721 self.assertFalse(utils.is_score("a10"))
722 self.assertFalse(utils.is_score("+a10"))
723 self.assertFalse(utils.is_score("a-10"))
724 self.assertFalse(utils.is_score("infinity"))
725 self.assertFalse(utils.is_score("+infinity"))
726 self.assertFalse(utils.is_score("-infinity"))
727 self.assertFalse(utils.is_score("+InFiNiTy"))
728 self.assertFalse(utils.is_score("INFINITY10"))
729 self.assertFalse(utils.is_score("INFINITY+10"))
730 self.assertFalse(utils.is_score("-INFINITY10"))
731 self.assertFalse(utils.is_score("+INFINITY+10"))
732 self.assertFalse(utils.is_score("10INFINITY"))
733 self.assertFalse(utils.is_score("+10+INFINITY"))
734
735 def get_cib_status_lrm(self):
736 cib_dom = self.get_cib_empty()
737 new_status = xml.dom.minidom.parseString(
738 """
739 <status>
740 <node_state id="1" uname="rh70-node1">
741 <lrm id="1">
742 <lrm_resources>
743 <lrm_resource id="dummy" type="Dummy" class="ocf" provider="heartbeat">
744 <lrm_rsc_op id="dummy_monitor_30000" operation="monitor" call-id="34"
745 rc-code="1" on_node="Xrh70-node1X" exit-reason="test" />
746 <lrm_rsc_op id="dummy_stop_0" operation="stop" call-id="32"
747 rc-code="0" />
748 <lrm_rsc_op id="dummy_start_0" operation="start" call-id="33"
749 rc-code="0" />
750 </lrm_resource>
751 </lrm_resources>
752 </lrm>
753 </node_state>
754 <node_state id="2" uname="rh70-node2">
755 <lrm id="2">
756 <lrm_resources>
757 <lrm_resource id="dummy" type="Dummy" class="ocf" provider="heartbeat">
758 <lrm_rsc_op id="dummy_monitor_0" operation="monitor" call-id="5"
759 rc-code="1" />
760 </lrm_resource>
761 <lrm_resource id="dummy1" type="Dummy" class="ocf" provider="heartbeat">
762 <lrm_rsc_op id="dummy1_monitor_0" operation="monitor" call-id="3"
763 rc-code="0" />
764 </lrm_resource>
765 </lrm_resources>
766 </lrm>
767 </node_state>
768 </status>
769 """
770 ).documentElement
771 status = cib_dom.getElementsByTagName("status")[0]
772 status.parentNode.replaceChild(new_status, status)
773 return cib_dom
774
775 def test_resource_running_on(self):
776 status = xml.dom.minidom.parseString(
777 f"""
778 <crm_mon>
779 <summary />
780 <nodes />
781 <resources>
782 <resource id="myResource" role="Started">
783 <node name="rh70-node1" />
784 </resource>
785 <clone id="myClone">
786 <resource id="myClonedResource" role="Started">
787 <node name="rh70-node1" />
788 </resource>
789 <resource id="myClonedResource" role="Started">
790 <node name="rh70-node2" />
791 </resource>
792 <resource id="myClonedResource" role="Started">
793 <node name="rh70-node3" />
794 </resource>
795 </clone>
796 <clone id="myMaster">
797 <resource id="myMasteredResource:1" role="{const.PCMK_ROLE_UNPROMOTED}">
798 <node name="rh70-node2" />
799 </resource>
800 <resource id="myMasteredResource" role="{const.PCMK_ROLE_UNPROMOTED}">
801 <node name="rh70-node3" />
802 </resource>
803 <resource id="myMasteredResource" role="{const.PCMK_ROLE_PROMOTED}">
804 <node name="rh70-node1" />
805 </resource>
806 </clone>
807 <group id="myGroup">
808 <resource id="myGroupedResource" role="Started">
809 <node name="rh70-node2" />
810 </resource>
811 </group>
812 <clone id="myGroupClone">
813 <group id="myClonedGroup:0">
814 <resource id="myClonedGroupedResource" role="Started">
815 <node name="rh70-node1" />
816 </resource>
817 </group>
818 <group id="myClonedGroup:1">
819 <resource id="myClonedGroupedResource" role="Started">
820 <node name="rh70-node2" />
821 </resource>
822 </group>
823 <group id="myClonedGroup:2">
824 <resource id="myClonedGroupedResource" role="Started">
825 <node name="rh70-node3" />
826 </resource>
827 </group>
828 <group id="myClonedGroup:3">
829 <resource id="myClonedGroupedResource" role="Started">
830 <node name="rh70-node3" />
831 </resource>
832 </group>
833 </clone>
834 <clone id="myGroupMaster">
835 <group id="myMasteredGroup:0">
836 <resource id="myMasteredGroupedResource" role="{const.PCMK_ROLE_UNPROMOTED}">
837 <node name="rh70-node1" />
838 </resource>
839 </group>
840 <group id="myMasteredGroup:1">
841 <resource id="myMasteredGroupedResource" role="{const.PCMK_ROLE_PROMOTED}">
842 <node name="rh70-node2" />
843 </resource>
844 </group>
845 <group id="myMasteredGroup:2">
846 <resource id="myMasteredGroupedResource" role="{const.PCMK_ROLE_UNPROMOTED}">
847 <node name="rh70-node3" />
848 </resource>
849 </group>
850 </clone>
851 <resource id="myStoppedResource" role="Stopped">
852 </resource>
853 </resources>
854 </crm_mon>
855 """
856 ).documentElement
857
858 self.assertEqual(
859 utils.resource_running_on("myResource", status),
860 {
861 "message": "Resource 'myResource' is running on node rh70-node1.",
862 "is_running": True,
863 },
864 )
865 self.assertEqual(
866 utils.resource_running_on("myClonedResource", status),
867 {
868 "message": "Resource 'myClonedResource' is running on nodes "
869 "rh70-node1, rh70-node2, rh70-node3.",
870 "is_running": True,
871 },
872 )
873 self.assertEqual(
874 utils.resource_running_on("myClone", status),
875 {
876 "message": "Resource 'myClone' is running on nodes "
877 "rh70-node1, rh70-node2, rh70-node3.",
878 "is_running": True,
879 },
880 )
881 self.assertEqual(
882 utils.resource_running_on("myMasteredResource", status),
883 {
884 "message": (
885 "Resource 'myMasteredResource' is {promoted} on node "
886 "rh70-node1; {unpromoted} on nodes rh70-node2, rh70-node3."
887 ).format(
888 promoted=str(const.PCMK_ROLE_PROMOTED).lower(),
889 unpromoted=str(const.PCMK_ROLE_UNPROMOTED).lower(),
890 ),
891 "is_running": True,
892 },
893 )
894 self.assertEqual(
895 utils.resource_running_on("myMaster", status),
896 {
897 "message": (
898 "Resource 'myMaster' is {promoted} on node "
899 "rh70-node1; {unpromoted} on nodes rh70-node2, rh70-node3."
900 ).format(
901 promoted=str(const.PCMK_ROLE_PROMOTED).lower(),
902 unpromoted=str(const.PCMK_ROLE_UNPROMOTED).lower(),
903 ),
904 "is_running": True,
905 },
906 )
907 self.assertEqual(
908 utils.resource_running_on("myGroupedResource", status),
909 {
910 "message": "Resource 'myGroupedResource' is running on node "
911 "rh70-node2.",
912 "is_running": True,
913 },
914 )
915 self.assertEqual(
916 utils.resource_running_on("myGroup", status),
917 {
918 "message": "Resource 'myGroup' is running on node rh70-node2.",
919 "is_running": True,
920 },
921 )
922 self.assertEqual(
923 utils.resource_running_on("myClonedGroupedResource", status),
924 {
925 "message": "Resource 'myClonedGroupedResource' is running on nodes "
926 "rh70-node1, rh70-node2, rh70-node3, rh70-node3.",
927 "is_running": True,
928 },
929 )
930 self.assertEqual(
931 utils.resource_running_on("myClonedGroup", status),
932 {
933 "message": "Resource 'myClonedGroup' is running on nodes "
934 "rh70-node1, rh70-node2, rh70-node3, rh70-node3.",
935 "is_running": True,
936 },
937 )
938 self.assertEqual(
939 utils.resource_running_on("myGroupClone", status),
940 {
941 "message": "Resource 'myGroupClone' is running on nodes "
942 "rh70-node1, rh70-node2, rh70-node3, rh70-node3.",
943 "is_running": True,
944 },
945 )
946 self.assertEqual(
947 utils.resource_running_on("myMasteredGroupedResource", status),
948 {
949 "message": (
950 "Resource 'myMasteredGroupedResource' is {promoted} on node "
951 "rh70-node2; {unpromoted} on nodes rh70-node1, rh70-node3."
952 ).format(
953 promoted=str(const.PCMK_ROLE_PROMOTED).lower(),
954 unpromoted=str(const.PCMK_ROLE_UNPROMOTED).lower(),
955 ),
956 "is_running": True,
957 },
958 )
959 self.assertEqual(
960 utils.resource_running_on("myMasteredGroup", status),
961 {
962 "message": (
963 "Resource 'myMasteredGroup' is {promoted} on node "
964 "rh70-node2; {unpromoted} on nodes rh70-node1, rh70-node3."
965 ).format(
966 promoted=str(const.PCMK_ROLE_PROMOTED).lower(),
967 unpromoted=str(const.PCMK_ROLE_UNPROMOTED).lower(),
968 ),
969 "is_running": True,
970 },
971 )
972 self.assertEqual(
973 utils.resource_running_on("myGroupMaster", status),
974 {
975 "message": (
976 "Resource 'myGroupMaster' is {promoted} on node "
977 "rh70-node2; {unpromoted} on nodes rh70-node1, rh70-node3."
978 ).format(
979 promoted=str(const.PCMK_ROLE_PROMOTED).lower(),
980 unpromoted=str(const.PCMK_ROLE_UNPROMOTED).lower(),
981 ),
982 "is_running": True,
983 },
984 )
985 self.assertEqual(
986 utils.resource_running_on("notMyResource", status),
987 {
988 "message": "Resource 'notMyResource' is not running on any node",
989 "is_running": False,
990 },
991 )
992 self.assertEqual(
993 utils.resource_running_on("myStoppedResource", status),
994 {
995 "message": "Resource 'myStoppedResource' is not running on any node",
996 "is_running": False,
997 },
998 )
999
1000 def test_get_operations_from_transitions(self):
1001 transitions = xml.dom.minidom.parse(rc("transitions01.xml"))
1002 self.assertEqual(
1003 [
1004 {
1005 "id": "dummy",
1006 "long_id": "dummy",
1007 "operation": "stop",
1008 "on_node": "rh7-3",
1009 },
1010 {
1011 "id": "dummy",
1012 "long_id": "dummy",
1013 "operation": "start",
1014 "on_node": "rh7-2",
1015 },
1016 {
1017 "id": "d0",
1018 "long_id": "d0:1",
1019 "operation": "stop",
1020 "on_node": "rh7-1",
1021 },
1022 {
1023 "id": "d0",
1024 "long_id": "d0:1",
1025 "operation": "start",
1026 "on_node": "rh7-2",
1027 },
1028 {
1029 "id": "state",
1030 "long_id": "state:0",
1031 "operation": "stop",
1032 "on_node": "rh7-3",
1033 },
1034 {
1035 "id": "state",
1036 "long_id": "state:0",
1037 "operation": "start",
1038 "on_node": "rh7-2",
1039 },
1040 ],
1041 utils.get_operations_from_transitions(transitions),
1042 )
1043
1044 transitions = xml.dom.minidom.parse(rc("transitions02.xml"))
1045 self.assertEqual(
1046 [
1047 {
1048 "id": "RemoteNode",
1049 "long_id": "RemoteNode",
1050 "operation": "stop",
1051 "on_node": "virt-143",
1052 },
1053 {
1054 "id": "RemoteNode",
1055 "long_id": "RemoteNode",
1056 "operation": "migrate_to",
1057 "on_node": "virt-143",
1058 },
1059 {
1060 "id": "RemoteNode",
1061 "long_id": "RemoteNode",
1062 "operation": "migrate_from",
1063 "on_node": "virt-142",
1064 },
1065 {
1066 "id": "dummy8",
1067 "long_id": "dummy8",
1068 "operation": "stop",
1069 "on_node": "virt-143",
1070 },
1071 {
1072 "id": "dummy8",
1073 "long_id": "dummy8",
1074 "operation": "start",
1075 "on_node": "virt-142",
1076 },
1077 ],
1078 utils.get_operations_from_transitions(transitions),
1079 )
1080
1081 def test_get_resources_location_from_operations(self):
1082 cib_dom = self.get_cib_resources()
1083
1084 operations = []
1085 self.assertEqual(
1086 {},
1087 utils.get_resources_location_from_operations(cib_dom, operations),
1088 )
1089
1090 operations = [
1091 {
1092 "id": "myResource",
1093 "long_id": "myResource",
1094 "operation": "start",
1095 "on_node": "rh7-1",
1096 },
1097 ]
1098 self.assertEqual(
1099 {
1100 "myResource": {
1101 "id": "myResource",
1102 "id_for_constraint": "myResource",
1103 "long_id": "myResource",
1104 "start_on_node": "rh7-1",
1105 },
1106 },
1107 utils.get_resources_location_from_operations(cib_dom, operations),
1108 )
1109
1110 operations = [
1111 {
1112 "id": "myResource",
1113 "long_id": "myResource",
1114 "operation": "start",
1115 "on_node": "rh7-1",
1116 },
1117 {
1118 "id": "myResource",
1119 "long_id": "myResource",
1120 "operation": "start",
1121 "on_node": "rh7-2",
1122 },
1123 {
1124 "id": "myResource",
1125 "long_id": "myResource",
1126 "operation": "monitor",
1127 "on_node": "rh7-3",
1128 },
1129 {
1130 "id": "myResource",
1131 "long_id": "myResource",
1132 "operation": "stop",
1133 "on_node": "rh7-3",
1134 },
1135 ]
1136 self.assertEqual(
1137 {
1138 "myResource": {
1139 "id": "myResource",
1140 "id_for_constraint": "myResource",
1141 "long_id": "myResource",
1142 "start_on_node": "rh7-2",
1143 },
1144 },
1145 utils.get_resources_location_from_operations(cib_dom, operations),
1146 )
1147
1148 operations = [
1149 {
1150 "id": "myResource",
1151 "long_id": "myResource",
1152 "operation": "start",
1153 "on_node": "rh7-1",
1154 },
1155 {
1156 "id": "myClonedResource",
1157 "long_id": "myClonedResource:0",
1158 "operation": "start",
1159 "on_node": "rh7-1",
1160 },
1161 {
1162 "id": "myClonedResource",
1163 "long_id": "myClonedResource:0",
1164 "operation": "start",
1165 "on_node": "rh7-2",
1166 },
1167 {
1168 "id": "myClonedResource",
1169 "long_id": "myClonedResource:1",
1170 "operation": "start",
1171 "on_node": "rh7-3",
1172 },
1173 ]
1174 self.assertEqual(
1175 {
1176 "myResource": {
1177 "id": "myResource",
1178 "id_for_constraint": "myResource",
1179 "long_id": "myResource",
1180 "start_on_node": "rh7-1",
1181 },
1182 "myClonedResource:0": {
1183 "id": "myClonedResource",
1184 "id_for_constraint": "myClone",
1185 "long_id": "myClonedResource:0",
1186 "start_on_node": "rh7-2",
1187 },
1188 "myClonedResource:1": {
1189 "id": "myClonedResource",
1190 "id_for_constraint": "myClone",
1191 "long_id": "myClonedResource:1",
1192 "start_on_node": "rh7-3",
1193 },
1194 },
1195 utils.get_resources_location_from_operations(cib_dom, operations),
1196 )
1197
1198 operations = [
1199 {
1200 "id": "myUniqueClonedResource:0",
1201 "long_id": "myUniqueClonedResource:0",
1202 "operation": "start",
1203 "on_node": "rh7-1",
1204 },
1205 {
1206 "id": "myUniqueClonedResource:1",
1207 "long_id": "myUniqueClonedResource:1",
1208 "operation": "monitor",
1209 "on_node": "rh7-2",
1210 },
1211 {
1212 "id": "myUniqueClonedResource:2",
1213 "long_id": "myUniqueClonedResource:2",
1214 "operation": "start",
1215 "on_node": "rh7-3",
1216 },
1217 ]
1218 self.assertEqual(
1219 {
1220 "myUniqueClonedResource:0": {
1221 "id": "myUniqueClonedResource:0",
1222 "id_for_constraint": "myUniqueClone",
1223 "long_id": "myUniqueClonedResource:0",
1224 "start_on_node": "rh7-1",
1225 },
1226 "myUniqueClonedResource:2": {
1227 "id": "myUniqueClonedResource:2",
1228 "id_for_constraint": "myUniqueClone",
1229 "long_id": "myUniqueClonedResource:2",
1230 "start_on_node": "rh7-3",
1231 },
1232 },
1233 utils.get_resources_location_from_operations(cib_dom, operations),
1234 )
1235
1236 operations = [
1237 {
1238 "id": "myMasteredGroupedResource",
1239 "long_id": "myMasteredGroupedResource:0",
1240 "operation": "start",
1241 "on_node": "rh7-1",
1242 },
1243 {
1244 "id": "myMasteredGroupedResource",
1245 "long_id": "myMasteredGroupedResource:1",
1246 "operation": "demote",
1247 "on_node": "rh7-2",
1248 },
1249 {
1250 "id": "myMasteredGroupedResource",
1251 "long_id": "myMasteredGroupedResource:1",
1252 "operation": "promote",
1253 "on_node": "rh7-3",
1254 },
1255 ]
1256 self.assertEqual(
1257 {
1258 "myMasteredGroupedResource:0": {
1259 "id": "myMasteredGroupedResource",
1260 "id_for_constraint": "myGroupMaster",
1261 "long_id": "myMasteredGroupedResource:0",
1262 "start_on_node": "rh7-1",
1263 },
1264 "myMasteredGroupedResource:1": {
1265 "id": "myMasteredGroupedResource",
1266 "id_for_constraint": "myGroupMaster",
1267 "long_id": "myMasteredGroupedResource:1",
1268 "promote_on_node": "rh7-3",
1269 },
1270 },
1271 utils.get_resources_location_from_operations(cib_dom, operations),
1272 )
1273
1274 operations = [
1275 {
1276 "id": "myResource",
1277 "long_id": "myResource",
1278 "operation": "stop",
1279 "on_node": "rh7-1",
1280 },
1281 {
1282 "id": "myResource",
1283 "long_id": "myResource",
1284 "operation": "migrate_to",
1285 "on_node": "rh7-1",
1286 },
1287 {
1288 "id": "myResource",
1289 "long_id": "myResource",
1290 "operation": "migrate_from",
1291 "on_node": "rh7-2",
1292 },
1293 ]
1294 self.assertEqual(
1295 {
1296 "myResource": {
1297 "id": "myResource",
1298 "id_for_constraint": "myResource",
1299 "long_id": "myResource",
1300 "start_on_node": "rh7-2",
1301 },
1302 },
1303 utils.get_resources_location_from_operations(cib_dom, operations),
1304 )
1305
1306 def test_is_int(self):
1307 self.assertTrue(utils.is_int("-999"))
1308 self.assertTrue(utils.is_int("-1"))
1309 self.assertTrue(utils.is_int("0"))
1310 self.assertTrue(utils.is_int("1"))
1311 self.assertTrue(utils.is_int("99999"))
1312 self.assertTrue(utils.is_int(" 99999 "))
1313 self.assertFalse(utils.is_int("0.0"))
1314 self.assertFalse(utils.is_int("-1.0"))
1315 self.assertFalse(utils.is_int("-0.1"))
1316 self.assertFalse(utils.is_int("0.001"))
1317 self.assertFalse(utils.is_int("-999999.1"))
1318 self.assertFalse(utils.is_int("0.0001"))
1319 self.assertFalse(utils.is_int(""))
1320 self.assertFalse(utils.is_int(" "))
1321 self.assertFalse(utils.is_int("A"))
1322 self.assertFalse(utils.is_int("random 15 47 text "))
1323
1324 def test_dom_get_node(self):
1325 cib = self.get_cib_with_nodes_minidom()
1326 self.assertIsNone(utils.dom_get_node(cib, "non-existing-node"))
1327 node = utils.dom_get_node(cib, "rh7-1")
1328 self.assertEqual(node.getAttribute("uname"), "rh7-1")
1329 self.assertEqual(node.getAttribute("id"), "1")
1330
1331 def test_dom_prepare_child_element(self):
1332 cib = self.get_cib_with_nodes_minidom()
1333 node = cib.getElementsByTagName("node")[0]
1334 self.assertEqual(len(dom_get_child_elements(node)), 0)
1335 child = utils.dom_prepare_child_element(
1336 node, "utilization", "rh7-1-utilization"
1337 )
1338 self.assertEqual(len(dom_get_child_elements(node)), 1)
1339 self.assertEqual(child, dom_get_child_elements(node)[0])
1340 self.assertEqual(dom_get_child_elements(node)[0].tagName, "utilization")
1341 self.assertEqual(
1342 dom_get_child_elements(node)[0].getAttribute("id"),
1343 "rh7-1-utilization",
1344 )
1345 child2 = utils.dom_prepare_child_element(
1346 node, "utilization", "rh7-1-utilization"
1347 )
1348 self.assertEqual(len(dom_get_child_elements(node)), 1)
1349 self.assertEqual(child, child2)
1350
1351 def test_dom_update_nv_pair_add(self):
1352 nv_set = xml.dom.minidom.parseString("<nvset/>").documentElement
1353 utils.dom_update_nv_pair(nv_set, "test_name", "test_val", "prefix-")
1354 self.assertEqual(len(dom_get_child_elements(nv_set)), 1)
1355 pair = dom_get_child_elements(nv_set)[0]
1356 self.assertEqual(pair.getAttribute("name"), "test_name")
1357 self.assertEqual(pair.getAttribute("value"), "test_val")
1358 self.assertEqual(pair.getAttribute("id"), "prefix-test_name")
1359 utils.dom_update_nv_pair(nv_set, "another_name", "value", "prefix2-")
1360 self.assertEqual(len(dom_get_child_elements(nv_set)), 2)
1361 self.assertEqual(pair, dom_get_child_elements(nv_set)[0])
1362 pair = dom_get_child_elements(nv_set)[1]
1363 self.assertEqual(pair.getAttribute("name"), "another_name")
1364 self.assertEqual(pair.getAttribute("value"), "value")
1365 self.assertEqual(pair.getAttribute("id"), "prefix2-another_name")
1366
1367 def test_dom_update_nv_pair_update(self):
|
(1) Event Sigma main event: |
The application uses Python's built in `xml` module which does not properly handle erroneous or maliciously constructed data, making the application vulnerable to one or more types of XML attacks. |
|
(2) Event remediation: |
Avoid using the `xml` module. Consider using the `defusedxml` module or similar which safely prevents all XML entity attacks. |
1368 nv_set = xml.dom.minidom.parseString(
1369 """
1370 <nv_set>
1371 <nvpair id="prefix-test_name" name="test_name" value="test_val"/>
1372 <nvpair id="prefix2-another_name" name="another_name" value="value"/>
1373 </nv_set>
1374 """
1375 ).documentElement
1376 utils.dom_update_nv_pair(nv_set, "test_name", "new_value")
1377 self.assertEqual(len(dom_get_child_elements(nv_set)), 2)
1378 pair1 = dom_get_child_elements(nv_set)[0]
1379 pair2 = dom_get_child_elements(nv_set)[1]
1380 self.assertEqual(pair1.getAttribute("name"), "test_name")
1381 self.assertEqual(pair1.getAttribute("value"), "new_value")
1382 self.assertEqual(pair1.getAttribute("id"), "prefix-test_name")
1383 self.assertEqual(pair2.getAttribute("name"), "another_name")
1384 self.assertEqual(pair2.getAttribute("value"), "value")
1385 self.assertEqual(pair2.getAttribute("id"), "prefix2-another_name")
1386
1387 def test_dom_update_nv_pair_remove(self):
1388 nv_set = xml.dom.minidom.parseString(
1389 """
1390 <nv_set>
1391 <nvpair id="prefix-test_name" name="test_name" value="test_val"/>
1392 <nvpair id="prefix2-another_name" name="another_name" value="value"/>
1393 </nv_set>
1394 """
1395 ).documentElement
1396 utils.dom_update_nv_pair(nv_set, "non_existing_name", "")
1397 self.assertEqual(len(dom_get_child_elements(nv_set)), 2)
1398 utils.dom_update_nv_pair(nv_set, "another_name", "")
1399 self.assertEqual(len(dom_get_child_elements(nv_set)), 1)
1400 pair = dom_get_child_elements(nv_set)[0]
1401 self.assertEqual(pair.getAttribute("name"), "test_name")
1402 self.assertEqual(pair.getAttribute("value"), "test_val")
1403 self.assertEqual(pair.getAttribute("id"), "prefix-test_name")
1404 utils.dom_update_nv_pair(nv_set, "test_name", "")
1405 self.assertEqual(len(dom_get_child_elements(nv_set)), 0)
1406
1407 def test_convert_args_to_tuples(self):
1408 out = utils.convert_args_to_tuples(
1409 ["invalid_string", "key=value", "key2=val=ue", "k e y= v a l u e "]
1410 )
1411 self.assertEqual(
1412 out,
1413 [("key", "value"), ("key2", "val=ue"), ("k e y", " v a l u e ")],
1414 )
1415
1416 def test_dom_update_utilization_invalid(self):
1417 # commands writes to stderr
1418 # we want clean test output, so we capture it
1419 tmp_stderr = sys.stderr
1420 sys.stderr = StringIO()
1421
1422 el = xml.dom.minidom.parseString(
1423 """
1424 <resource id="test_id"/>
1425 """
1426 ).documentElement
1427 self.assertRaises(
1428 SystemExit,
1429 utils.dom_update_utilization,
1430 el,
1431 {"name": "invalid_val"},
1432 )
1433
1434 self.assertRaises(
1435 SystemExit, utils.dom_update_utilization, el, {"name": "0.01"}
1436 )
1437
1438 sys.stderr = tmp_stderr
1439
1440 def test_dom_update_utilization_add(self):
1441 el = xml.dom.minidom.parseString(
1442 """
1443 <resource id="test_id"/>
1444 """
1445 ).documentElement
1446 utils.dom_update_utilization(
1447 el,
1448 {
1449 "name": "",
1450 "key": "-1",
1451 "keys": "90",
1452 },
1453 )
1454
1455 self.assertEqual(len(dom_get_child_elements(el)), 1)
1456 u = dom_get_child_elements(el)[0] # pylint: disable=invalid-name
1457 self.assertEqual(u.tagName, "utilization")
1458 self.assertEqual(u.getAttribute("id"), "test_id-utilization")
1459 self.assertEqual(len(dom_get_child_elements(u)), 2)
1460
1461 self.assertEqual(
1462 dom_get_child_elements(u)[0].getAttribute("id"),
1463 "test_id-utilization-key",
1464 )
1465 self.assertEqual(
1466 dom_get_child_elements(u)[0].getAttribute("name"), "key"
1467 )
1468 self.assertEqual(
1469 dom_get_child_elements(u)[0].getAttribute("value"), "-1"
1470 )
1471 self.assertEqual(
1472 dom_get_child_elements(u)[1].getAttribute("id"),
1473 "test_id-utilization-keys",
1474 )
1475 self.assertEqual(
1476 dom_get_child_elements(u)[1].getAttribute("name"), "keys"
1477 )
1478 self.assertEqual(
1479 dom_get_child_elements(u)[1].getAttribute("value"), "90"
1480 )
1481
1482 def test_dom_update_utilization_update_remove(self):
1483 el = xml.dom.minidom.parseString(
1484 """
1485 <resource id="test_id">
1486 <utilization id="test_id-utilization">
1487 <nvpair id="test_id-utilization-key" name="key" value="-1"/>
1488 <nvpair id="test_id-utilization-keys" name="keys" value="90"/>
1489 </utilization>
1490 </resource>
1491 """
1492 ).documentElement
1493 utils.dom_update_utilization(
1494 el,
1495 {
1496 "key": "100",
1497 "keys": "",
1498 },
1499 )
1500
1501 u = dom_get_child_elements(el)[0] # pylint: disable=invalid-name
1502 self.assertEqual(len(dom_get_child_elements(u)), 1)
1503 self.assertEqual(
1504 dom_get_child_elements(u)[0].getAttribute("id"),
1505 "test_id-utilization-key",
1506 )
1507 self.assertEqual(
1508 dom_get_child_elements(u)[0].getAttribute("name"), "key"
1509 )
1510 self.assertEqual(
1511 dom_get_child_elements(u)[0].getAttribute("value"), "100"
1512 )
1513
1514 def test_dom_update_meta_attr_add(self):
1515 el = xml.dom.minidom.parseString(
1516 """
1517 <resource id="test_id"/>
1518 """
1519 ).documentElement
1520 utils.dom_update_meta_attr(
1521 el, [("name", ""), ("key", "test"), ("key2", "val")]
1522 )
1523
1524 self.assertEqual(len(dom_get_child_elements(el)), 1)
1525 u = dom_get_child_elements(el)[0] # pylint: disable=invalid-name
1526 self.assertEqual(u.tagName, "meta_attributes")
1527 self.assertEqual(u.getAttribute("id"), "test_id-meta_attributes")
1528 self.assertEqual(len(dom_get_child_elements(u)), 2)
1529
1530 self.assertEqual(
1531 dom_get_child_elements(u)[0].getAttribute("id"),
1532 "test_id-meta_attributes-key",
1533 )
1534 self.assertEqual(
1535 dom_get_child_elements(u)[0].getAttribute("name"), "key"
1536 )
1537 self.assertEqual(
1538 dom_get_child_elements(u)[0].getAttribute("value"), "test"
1539 )
1540 self.assertEqual(
1541 dom_get_child_elements(u)[1].getAttribute("id"),
1542 "test_id-meta_attributes-key2",
1543 )
1544 self.assertEqual(
1545 dom_get_child_elements(u)[1].getAttribute("name"), "key2"
1546 )
1547 self.assertEqual(
1548 dom_get_child_elements(u)[1].getAttribute("value"), "val"
1549 )
1550
1551 def test_dom_update_meta_attr_update_remove(self):
1552 el = xml.dom.minidom.parseString(
1553 """
1554 <resource id="test_id">
1555 <meta_attributes id="test_id-utilization">
1556 <nvpair id="test_id-meta_attributes-key" name="key" value="test"/>
1557 <nvpair id="test_id-meta_attributes-key2" name="key2" value="val"/>
1558 </meta_attributes>
1559 </resource>
1560 """
1561 ).documentElement
1562 utils.dom_update_meta_attr(el, [("key", "another_val"), ("key2", "")])
1563
1564 u = dom_get_child_elements(el)[0] # pylint: disable=invalid-name
1565 self.assertEqual(len(dom_get_child_elements(u)), 1)
1566 self.assertEqual(
1567 dom_get_child_elements(u)[0].getAttribute("id"),
1568 "test_id-meta_attributes-key",
1569 )
1570 self.assertEqual(
1571 dom_get_child_elements(u)[0].getAttribute("name"), "key"
1572 )
1573 self.assertEqual(
1574 dom_get_child_elements(u)[0].getAttribute("value"), "another_val"
1575 )
1576
1577 def test_get_utilization(self):
1578 el = xml.dom.minidom.parseString(
1579 """
1580 <resource id="test_id">
1581 <utilization id="test_id-utilization">
1582 <nvpair id="test_id-utilization-key" name="key" value="-1"/>
1583 <nvpair id="test_id-utilization-keys" name="keys" value="90"/>
1584 </utilization>
1585 </resource>
1586 """
1587 ).documentElement
1588 self.assertEqual({"key": "-1", "keys": "90"}, utils.get_utilization(el))
1589
1590 def test_get_utilization_str(self):
1591 el = xml.dom.minidom.parseString(
1592 """
1593 <resource id="test_id">
1594 <utilization id="test_id-utilization">
1595 <nvpair id="test_id-utilization-key" name="key" value="-1"/>
1596 <nvpair id="test_id-utilization-keys" name="keys" value="90"/>
1597 </utilization>
1598 </resource>
1599 """
1600 ).documentElement
1601 self.assertEqual("key=-1 keys=90", utils.get_utilization_str(el))
1602
1603 def assert_element_id(self, node, node_id, tag=None):
1604 self.assertTrue(
1605 isinstance(node, xml.dom.minidom.Element),
1606 "element with id '%s' not found" % node_id,
1607 )
1608 self.assertEqual(node.getAttribute("id"), node_id)
1609 if tag:
1610 self.assertEqual(node.tagName, tag)
1611
1612
1613 class RunParallelTest(TestCase):
1614 @staticmethod
1615 def fixture_create_worker(log, name, sleepSeconds=0):
1616 # pylint: disable=invalid-name
1617 def worker():
1618 sleep(sleepSeconds)
1619 log.append(name)
1620
1621 return worker
1622
1623 def test_run_all_workers(self):
1624 log = []
1625 utils.run_parallel(
1626 [
1627 self.fixture_create_worker(log, "first"),
1628 self.fixture_create_worker(log, "second"),
1629 ],
1630 wait_seconds=0.1,
1631 )
1632
1633 self.assertEqual(sorted(log), sorted(["first", "second"]))
1634
1635
1636 class NodeActionTaskTest(TestCase):
1637 def test_can_run_action(self):
1638 def action(node, arg, kwarg=None):
1639 return (0, ":".join([node, arg, kwarg]))
1640
1641 report_list = []
1642
1643 def report(node, returncode, output):
1644 report_list.append("|".join([node, str(returncode), output]))
1645
1646 task = utils.create_task(report, action, "node", "arg", kwarg="kwarg")
1647 task()
1648
1649 self.assertEqual(["node|0|node:arg:kwarg"], report_list)
1650
1651
1652 class TouchCibFile(TestCase):
1653 @mock.patch("pcs.utils.os.path.isfile", mock.Mock(return_value=False))
1654 @mock.patch(
1655 "pcs.utils.write_empty_cib",
1656 mock.Mock(side_effect=EnvironmentError("some message")),
1657 )
1658 @mock.patch("pcs.utils.err")
1659 def test_exception_is_transformed_correctly(self, err):
1660 # pylint: disable=no-self-use
1661 filename = "/fake/filename"
1662 utils.touch_cib_file(filename)
1663 err.assert_called_once_with(
1664 "Unable to write to file: '/fake/filename': 'some message'"
1665 )
1666
1667
1668 @mock.patch("pcs.utils.is_run_interactive", mock.Mock(return_value=False))
1669 class GetContinueConfirmation(TestCase):
1670 def setUp(self):
1671 self.text = "some warning text"
1672 patcher_output = mock.patch("pcs.cli.reports.output.print_to_stderr")
1673 self.addCleanup(patcher_output.stop)
1674 self.mock_output = patcher_output.start()
1675
1676 def test_yes_force(self):
1677 self.assertTrue(utils.get_continue_confirmation(self.text, True, True))
1678 self.mock_output.assert_called_once_with(f"Warning: {self.text}")
1679
1680 def test_yes(self):
1681 self.assertTrue(utils.get_continue_confirmation(self.text, True, False))
1682 self.mock_output.assert_called_once_with(f"Warning: {self.text}")
1683
1684 def test_force(self):
1685 self.assertTrue(utils.get_continue_confirmation(self.text, False, True))
1686 self.assertEqual(
1687 self.mock_output.mock_calls,
1688 [
1689 mock.call(
1690 "Deprecation Warning: Using --force to confirm this action "
1691 "is deprecated and might be removed in a future release, "
1692 "use --yes instead"
1693 ),
1694 mock.call(f"Warning: {self.text}"),
1695 ],
1696 )
1697
1698 def test_nothing(self):
1699 with self.assertRaises(SystemExit) as cm:
1700 self.assertFalse(
1701 utils.get_continue_confirmation(self.text, False, False)
1702 )
1703 self.assertEqual(cm.exception.code, 1)
1704 self.mock_output.assert_called_once_with(
1705 f"Error: {self.text}, use --yes to override"
1706 )
1707