1 # pylint: disable=too-many-lines
2 import sys
3 import xml.dom.minidom
4 from io import StringIO
5 from time import sleep
6 from unittest import (
7 TestCase,
8 mock,
9 )
10
11 from pcs import utils
12 from pcs.common import const
13
14 from pcs_test.tools.misc import get_test_resource as rc
15 from pcs_test.tools.xml import dom_get_child_elements
16
17 cib_with_nodes = rc("cib-empty-withnodes.xml")
18 empty_cib = rc("cib-empty.xml")
19
20 TestCase.maxDiff = None
21
22
23 class UtilsTest(TestCase):
24 # pylint: disable=too-many-public-methods
25 @staticmethod
26 def get_cib_empty():
27 return xml.dom.minidom.parse(empty_cib)
28
29 @staticmethod
30 def get_cib_with_nodes_minidom():
31 return xml.dom.minidom.parse(cib_with_nodes)
32
33 def get_cib_resources(self):
34 cib_dom = self.get_cib_empty()
35 new_resources = xml.dom.minidom.parseString(
36 """
37 <resources>
38 <primitive id="myResource"
39 class="ocf" provider="heartbeat" type="Dummy">
40 </primitive>
41 <clone id="myClone">
42 <primitive id="myClonedResource"
43 class="ocf" provider="heartbeat" type="Dummy">
44 </primitive>
45 </clone>
46 <clone id="myUniqueClone">
47 <primitive id="myUniqueClonedResource"
48 class="ocf" provider="heartbeat" type="Dummy">
49 </primitive>
50 <meta-attributes>
51 <nvpair name="globally-unique" value="true" />
52 </meta-attributes>
53 </clone>
54 <master id="myMaster">
55 <primitive id="myMasteredResource"
56 class="ocf" provider="heartbeat" type="Dummy">
57 </primitive>
58 </master>
59 <group id="myGroup">
60 <primitive id="myGroupedResource"
61 class="ocf" provider="heartbeat" type="Dummy">
62 </primitive>
63 </group>
64 <clone id="myGroupClone">
65 <group id="myClonedGroup">
66 <primitive id="myClonedGroupedResource"
67 class="ocf" provider="heartbeat" type="Dummy">
68 </primitive>
69 </group>
70 </clone>
71 <master id="myGroupMaster">
72 <group id="myMasteredGroup">
73 <primitive id="myMasteredGroupedResource"
74 class="ocf" provider="heartbeat" type="Dummy">
75 </primitive>
76 </group>
77 </master>
78 <bundle id="myBundle">
79 <primitive id="myBundledResource"
80 class="ocf" provider="heartbeat" type="Dummy" />
81 </bundle>
82 <bundle id="myEmptyBundle"/>
83 </resources>
84 """
85 ).documentElement
86 resources = cib_dom.getElementsByTagName("resources")[0]
87 resources.parentNode.replaceChild(new_resources, resources)
88 return cib_dom
89
90 def test_dom_get_resources(self): # noqa: PLR0915
91 # pylint: disable=too-many-statements
92 def test_dom_get(method, dom, ok_ids, bad_ids):
93 for element_id in ok_ids:
94 self.assert_element_id(method(dom, element_id), element_id)
95 for element_id in bad_ids:
96 self.assertFalse(method(dom, element_id))
97
98 cib_dom = self.get_cib_empty()
99 self.assertFalse(utils.dom_get_resource(cib_dom, "myResource"))
100 self.assertFalse(
101 utils.dom_get_resource_clone(cib_dom, "myClonedResource")
102 )
103 self.assertFalse(
104 utils.dom_get_resource_masterslave(cib_dom, "myMasteredResource")
105 )
106 self.assertFalse(utils.dom_get_group(cib_dom, "myGroup"))
107 self.assertFalse(utils.dom_get_group_clone(cib_dom, "myClonedGroup"))
108 self.assertFalse(
109 utils.dom_get_group_masterslave(cib_dom, "myMasteredGroup")
110 )
111 self.assertFalse(utils.dom_get_clone(cib_dom, "myClone"))
112 self.assertFalse(utils.dom_get_master(cib_dom, "myMaster"))
113 self.assertFalse(utils.dom_get_clone_ms_resource(cib_dom, "myClone"))
114 self.assertFalse(utils.dom_get_clone_ms_resource(cib_dom, "myMaster"))
115 self.assertFalse(
116 utils.dom_get_resource_clone_ms_parent(cib_dom, "myClonedResource")
117 )
118 self.assertFalse(
119 utils.dom_get_resource_clone_ms_parent(
120 cib_dom, "myMasteredResource"
121 )
122 )
123 self.assertIsNone(utils.dom_get_bundle(cib_dom, "myResource"))
124 self.assertIsNone(utils.dom_get_bundle(cib_dom, "notExisting"))
125 self.assertIsNone(
126 utils.dom_get_resource_bundle_parent(cib_dom, "myBundledResource")
127 )
128
129 cib_dom = self.get_cib_resources()
130 all_ids = {
131 "none",
132 "myResource",
133 "myClone",
134 "myClonedResource",
135 "myUniqueClone",
136 "myUniqueClonedResource",
137 "myMaster",
138 "myMasteredResource",
139 "myGroup",
140 "myGroupedResource",
141 "myGroupClone",
142 "myClonedGroup",
143 "myClonedGroupedResource",
144 "myGroupMaster",
145 "myMasteredGroup",
146 "myMasteredGroupedResource",
147 "myBundledResource",
148 "myBundle",
149 "myEmptyBundle",
150 }
151
152 resource_ids = {
153 "myResource",
154 "myClonedResource",
155 "myUniqueClonedResource",
156 "myGroupedResource",
157 "myMasteredResource",
158 "myClonedGroupedResource",
159 "myMasteredGroupedResource",
160 "myBundledResource",
161 }
162 test_dom_get(
163 utils.dom_get_resource,
164 cib_dom,
165 resource_ids,
166 all_ids - resource_ids,
167 )
168
169 cloned_ids = {
170 "myClonedResource",
171 "myUniqueClonedResource",
172 "myClonedGroupedResource",
173 }
174 test_dom_get(
175 utils.dom_get_resource_clone,
176 cib_dom,
177 cloned_ids,
178 all_ids - cloned_ids,
179 )
180
181 mastered_ids = {"myMasteredResource", "myMasteredGroupedResource"}
182 test_dom_get(
183 utils.dom_get_resource_masterslave,
184 cib_dom,
185 mastered_ids,
186 all_ids - mastered_ids,
187 )
188
189 group_ids = {"myGroup", "myClonedGroup", "myMasteredGroup"}
190 test_dom_get(
191 utils.dom_get_group, cib_dom, group_ids, all_ids - group_ids
192 )
193
194 cloned_group_ids = {"myClonedGroup"}
195 test_dom_get(
196 utils.dom_get_group_clone,
197 cib_dom,
198 cloned_group_ids,
199 all_ids - cloned_group_ids,
200 )
201
202 clone_ids = {"myClone", "myUniqueClone", "myGroupClone"}
203 test_dom_get(
204 utils.dom_get_clone, cib_dom, clone_ids, all_ids - clone_ids
205 )
206
207 mastered_group_ids = {"myMasteredGroup"}
208 test_dom_get(
209 utils.dom_get_group_masterslave,
210 cib_dom,
211 mastered_group_ids,
212 all_ids - mastered_group_ids,
213 )
214
215 master_ids = {"myMaster", "myGroupMaster"}
216 test_dom_get(
217 utils.dom_get_master, cib_dom, master_ids, all_ids - master_ids
218 )
219
220 bundle_ids = {"myBundle", "myEmptyBundle"}
221 test_dom_get(
222 utils.dom_get_bundle, cib_dom, bundle_ids, all_ids - bundle_ids
223 )
224
225 self.assert_element_id(
226 utils.dom_get_clone_ms_resource(cib_dom, "myClone"),
227 "myClonedResource",
228 )
229 self.assert_element_id(
230 utils.dom_get_clone_ms_resource(cib_dom, "myGroupClone"),
231 "myClonedGroup",
232 )
233 self.assert_element_id(
234 utils.dom_get_clone_ms_resource(cib_dom, "myMaster"),
235 "myMasteredResource",
236 )
237 self.assert_element_id(
238 utils.dom_get_clone_ms_resource(cib_dom, "myGroupMaster"),
239 "myMasteredGroup",
240 )
241
242 self.assert_element_id(
243 utils.dom_get_resource_clone_ms_parent(cib_dom, "myClonedResource"),
244 "myClone",
245 )
246 self.assert_element_id(
247 utils.dom_get_resource_clone_ms_parent(cib_dom, "myClonedGroup"),
248 "myGroupClone",
249 )
250 self.assert_element_id(
251 utils.dom_get_resource_clone_ms_parent(
252 cib_dom, "myClonedGroupedResource"
253 ),
254 "myGroupClone",
255 )
256 self.assert_element_id(
257 utils.dom_get_resource_clone_ms_parent(
258 cib_dom, "myMasteredResource"
259 ),
260 "myMaster",
261 )
262 self.assert_element_id(
263 utils.dom_get_resource_clone_ms_parent(cib_dom, "myMasteredGroup"),
264 "myGroupMaster",
265 )
266 self.assert_element_id(
267 utils.dom_get_resource_clone_ms_parent(
268 cib_dom, "myMasteredGroupedResource"
269 ),
270 "myGroupMaster",
271 )
272 self.assertEqual(
273 None, utils.dom_get_resource_clone_ms_parent(cib_dom, "myResource")
274 )
275 self.assertEqual(
276 None, utils.dom_get_resource_clone_ms_parent(cib_dom, "myGroup")
277 )
278 self.assertEqual(
279 None,
280 utils.dom_get_resource_clone_ms_parent(
281 cib_dom, "myGroupedResource"
282 ),
283 )
284
285 self.assertIsNone(
286 utils.dom_get_resource_bundle(
287 utils.dom_get_bundle(cib_dom, "myEmptyBundle")
288 )
289 )
290 self.assert_element_id(
291 utils.dom_get_resource_bundle(
292 utils.dom_get_bundle(cib_dom, "myBundle")
293 ),
294 "myBundledResource",
295 "primitive",
296 )
297
298 self.assert_element_id(
299 utils.dom_get_resource_bundle_parent(cib_dom, "myBundledResource"),
300 "myBundle",
301 )
302 self.assertIsNone(
303 utils.dom_get_resource_bundle_parent(cib_dom, "myResource")
304 )
305 self.assertIsNone(
306 utils.dom_get_resource_bundle_parent(cib_dom, "myClone")
307 )
308 self.assertIsNone(
309 utils.dom_get_resource_bundle_parent(cib_dom, "myClonedResource")
310 )
311 self.assertIsNone(
312 utils.dom_get_resource_bundle_parent(cib_dom, "myMaster")
313 )
314 self.assertIsNone(
315 utils.dom_get_resource_bundle_parent(cib_dom, "myMasteredGroup")
316 )
317 self.assertIsNone(
318 utils.dom_get_resource_bundle_parent(cib_dom, "myGroup")
319 )
320 self.assertIsNone(
321 utils.dom_get_resource_bundle_parent(cib_dom, "myGroupedResource")
322 )
323 self.assertIsNone(
324 utils.dom_get_resource_bundle_parent(cib_dom, "myGroupClone")
325 )
326 self.assertIsNone(
327 utils.dom_get_resource_bundle_parent(cib_dom, "myClonedGroup")
328 )
329 self.assertIsNone(
330 utils.dom_get_resource_bundle_parent(
331 cib_dom, "myClonedGroupedResource"
332 )
333 )
334
335 def test_dom_get_resource_remote_node_name(self):
336 dom = self.get_cib_empty()
337 new_resources = xml.dom.minidom.parseString(
338 """
339 <resources>
340 <primitive id="dummy1"
341 class="ocf" provider="heartbeat" type="Dummy">
342 </primitive>
343 <primitive class="ocf" id="vm-guest1" provider="heartbeat"
344 type="VirtualDomain">
345 <instance_attributes id="vm-guest1-instance_attributes">
346 <nvpair id="vm-guest1-instance_attributes-hypervisor"
347 name="hypervisor" value="qemu:///system"/>
348 <nvpair id="vm-guest1-instance_attributes-config"
349 name="config" value="/root/guest1.xml"/>
350 </instance_attributes>
351 <meta_attributes id="vm-guest1-meta_attributes">
352 <nvpair id="vm-guest1-meta_attributes-remote-node"
353 name="remote-node" value="guest1"/>
354 </meta_attributes>
355 </primitive>
356 <primitive id="dummy2"
357 class="ocf" provider="heartbeat" type="Dummy">
358 <instance_attributes id="vm-guest1-meta_attributes">
359 <nvpair id="dummy2-remote-node"
360 name="remote-node" value="guest2"/>
361 </instance_attributes>
362 </primitive>
363 <primitive id="dummy3"
364 class="ocf" provider="pacemaker" type="remote">
365 </primitive>
366 </resources>
367 """
368 ).documentElement
369 resources = dom.getElementsByTagName("resources")[0]
370 resources.parentNode.replaceChild(new_resources, resources)
371
372 self.assertEqual(
373 None,
374 utils.dom_get_resource_remote_node_name(
375 utils.dom_get_resource(dom, "dummy1")
376 ),
377 )
378 self.assertEqual(
379 None,
380 utils.dom_get_resource_remote_node_name(
381 utils.dom_get_resource(dom, "dummy2")
382 ),
383 )
384 self.assertEqual(
385 "guest1",
386 utils.dom_get_resource_remote_node_name(
387 utils.dom_get_resource(dom, "vm-guest1")
388 ),
389 )
390 self.assertEqual(
391 "dummy3",
392 utils.dom_get_resource_remote_node_name(
393 utils.dom_get_resource(dom, "dummy3")
394 ),
395 )
396
397 def test_dom_get_meta_attr_value(self):
398 dom = self.get_cib_empty()
399 new_resources = xml.dom.minidom.parseString(
400 """
401 <resources>
402 <primitive id="dummy1"
403 class="ocf" provider="heartbeat" type="Dummy">
404 </primitive>
405 <primitive class="ocf" id="vm-guest1" provider="heartbeat"
406 type="VirtualDomain">
407 <instance_attributes id="vm-guest1-instance_attributes">
408 <nvpair id="vm-guest1-instance_attributes-hypervisor"
409 name="hypervisor" value="qemu:///system"/>
410 <nvpair id="vm-guest1-instance_attributes-config"
411 name="config" value="/root/guest1.xml"/>
412 </instance_attributes>
413 <meta_attributes id="vm-guest1-meta_attributes">
414 <nvpair id="vm-guest1-meta_attributes-remote-node"
415 name="remote-node" value="guest1"/>
416 </meta_attributes>
417 </primitive>
418 <primitive id="dummy2"
419 class="ocf" provider="heartbeat" type="Dummy">
420 <instance_attributes id="vm-guest1-meta_attributes">
421 <nvpair id="dummy2-remote-node"
422 name="remote-node" value="guest2"/>
423 </instance_attributes>
424 </primitive>
425 </resources>
426 """
427 ).documentElement
428 resources = dom.getElementsByTagName("resources")[0]
429 resources.parentNode.replaceChild(new_resources, resources)
430
431 self.assertEqual(
432 None,
433 utils.dom_get_meta_attr_value(
434 utils.dom_get_resource(dom, "dummy1"), "foo"
435 ),
436 )
437 self.assertEqual(
438 None,
439 utils.dom_get_meta_attr_value(
440 utils.dom_get_resource(dom, "dummy2"), "remote-node"
441 ),
442 )
443 self.assertEqual(
444 "guest1",
445 utils.dom_get_meta_attr_value(
446 utils.dom_get_resource(dom, "vm-guest1"), "remote-node"
447 ),
448 )
449 self.assertEqual(
450 None,
451 utils.dom_get_meta_attr_value(
452 utils.dom_get_resource(dom, "vm-guest1"), "foo"
453 ),
454 )
455
456 def test_get_element_with_id(self):
457 dom = xml.dom.minidom.parseString(
458 """
459 <aa>
460 <bb id="bb1"/>
461 <bb/>
462 <bb id="bb2">
463 <cc id="cc1"/>
464 </bb>
465 <bb id="bb3">
466 <cc id="cc2"/>
467 </bb>
468 </aa>
469 """
470 ).documentElement
471
472 self.assert_element_id(
473 utils.dom_get_element_with_id(dom, "bb", "bb1"), "bb1"
474 )
475 self.assert_element_id(
476 utils.dom_get_element_with_id(dom, "bb", "bb2"), "bb2"
477 )
478 self.assert_element_id(
479 utils.dom_get_element_with_id(dom, "cc", "cc1"), "cc1"
480 )
481 self.assert_element_id(
482 utils.dom_get_element_with_id(
483 utils.dom_get_element_with_id(dom, "bb", "bb2"), "cc", "cc1"
484 ),
485 "cc1",
486 )
487 self.assertEqual(None, utils.dom_get_element_with_id(dom, "dd", "bb1"))
488 self.assertEqual(None, utils.dom_get_element_with_id(dom, "bb", "bb4"))
489 self.assertEqual(None, utils.dom_get_element_with_id(dom, "bb", "cc1"))
490 self.assertEqual(
491 None,
492 utils.dom_get_element_with_id(
493 utils.dom_get_element_with_id(dom, "bb", "bb2"), "cc", "cc2"
494 ),
495 )
496
497 def test_dom_get_parent_by_tag_name(self):
498 dom = xml.dom.minidom.parseString(
499 """
500 <aa id="aa1">
501 <bb id="bb1"/>
502 <bb id="bb2">
503 <cc id="cc1"/>
504 </bb>
505 <bb id="bb3">
506 <cc id="cc2"/>
507 </bb>
508 <dd id="dd1" />
509 </aa>
510 """
511 ).documentElement
512 bb1 = utils.dom_get_element_with_id(dom, "bb", "bb1")
513 cc1 = utils.dom_get_element_with_id(dom, "cc", "cc1")
514
515 self.assert_element_id(
516 utils.dom_get_parent_by_tag_names(bb1, ["aa"]), "aa1"
517 )
518 self.assert_element_id(
519 utils.dom_get_parent_by_tag_names(cc1, ["aa"]), "aa1"
520 )
521 self.assert_element_id(
522 utils.dom_get_parent_by_tag_names(cc1, ["bb"]), "bb2"
523 )
524
525 self.assertEqual(None, utils.dom_get_parent_by_tag_names(bb1, ["cc"]))
526 self.assertEqual(None, utils.dom_get_parent_by_tag_names(cc1, ["dd"]))
527 self.assertEqual(None, utils.dom_get_parent_by_tag_names(cc1, ["ee"]))
528
529 def test_validate_constraint_resource(self):
530 dom = self.get_cib_resources()
531 self.assertEqual(
532 (True, "", "myClone"),
533 utils.validate_constraint_resource(dom, "myClone"),
534 )
535 self.assertEqual(
536 (True, "", "myGroupClone"),
537 utils.validate_constraint_resource(dom, "myGroupClone"),
538 )
539 self.assertEqual(
540 (True, "", "myMaster"),
541 utils.validate_constraint_resource(dom, "myMaster"),
542 )
543 self.assertEqual(
544 (True, "", "myGroupMaster"),
545 utils.validate_constraint_resource(dom, "myGroupMaster"),
546 )
547 self.assertEqual(
548 (True, "", "myBundle"),
549 utils.validate_constraint_resource(dom, "myBundle"),
550 )
551 self.assertEqual(
552 (True, "", "myEmptyBundle"),
553 utils.validate_constraint_resource(dom, "myEmptyBundle"),
554 )
555 self.assertEqual(
556 (True, "", "myResource"),
557 utils.validate_constraint_resource(dom, "myResource"),
558 )
559 self.assertEqual(
560 (True, "", "myGroup"),
561 utils.validate_constraint_resource(dom, "myGroup"),
562 )
563 self.assertEqual(
564 (True, "", "myGroupedResource"),
565 utils.validate_constraint_resource(dom, "myGroupedResource"),
566 )
567
568 self.assertEqual(
569 (False, "Resource 'myNonexistent' does not exist", None),
570 utils.validate_constraint_resource(dom, "myNonexistent"),
571 )
572
573 message = (
574 "%s is a clone resource, you should use the clone id: "
575 "%s when adding constraints. Use --force to override."
576 )
577 self.assertEqual(
578 (False, message % ("myClonedResource", "myClone"), "myClone"),
579 utils.validate_constraint_resource(dom, "myClonedResource"),
580 )
581 self.assertEqual(
582 (
583 False,
584 message % ("myClonedGroup", "myGroupClone"),
585 "myGroupClone",
586 ),
587 utils.validate_constraint_resource(dom, "myClonedGroup"),
588 )
589 self.assertEqual(
590 (
591 False,
592 message % ("myClonedGroupedResource", "myGroupClone"),
593 "myGroupClone",
594 ),
595 utils.validate_constraint_resource(dom, "myClonedGroupedResource"),
596 )
597
598 message = (
599 "%s is a clone resource, you should use the clone id: "
600 "%s when adding constraints. Use --force to override."
601 )
602 self.assertEqual(
603 (False, message % ("myMasteredResource", "myMaster"), "myMaster"),
604 utils.validate_constraint_resource(dom, "myMasteredResource"),
605 )
606 self.assertEqual(
607 (
608 False,
609 message % ("myMasteredGroup", "myGroupMaster"),
610 "myGroupMaster",
611 ),
612 utils.validate_constraint_resource(dom, "myMasteredGroup"),
613 )
614 self.assertEqual(
615 (
616 False,
617 message % ("myMasteredGroupedResource", "myGroupMaster"),
618 "myGroupMaster",
619 ),
620 utils.validate_constraint_resource(
621 dom, "myMasteredGroupedResource"
622 ),
623 )
624
625 message = (
626 "%s is a bundle resource, you should use the bundle id: "
627 "%s when adding constraints. Use --force to override."
628 )
629 self.assertEqual(
630 (False, message % ("myBundledResource", "myBundle"), "myBundle"),
631 utils.validate_constraint_resource(dom, "myBundledResource"),
632 )
633
634 utils.pcs_options["--force"] = True
635 self.assertEqual(
636 (True, "", "myClone"),
637 utils.validate_constraint_resource(dom, "myClonedResource"),
638 )
639 self.assertEqual(
640 (True, "", "myGroupClone"),
641 utils.validate_constraint_resource(dom, "myClonedGroup"),
642 )
643 self.assertEqual(
644 (True, "", "myGroupClone"),
645 utils.validate_constraint_resource(dom, "myClonedGroupedResource"),
646 )
647 self.assertEqual(
648 (True, "", "myMaster"),
649 utils.validate_constraint_resource(dom, "myMasteredResource"),
650 )
651 self.assertEqual(
652 (True, "", "myGroupMaster"),
653 utils.validate_constraint_resource(dom, "myMasteredGroup"),
654 )
655 self.assertEqual(
656 (True, "", "myGroupMaster"),
657 utils.validate_constraint_resource(
658 dom, "myMasteredGroupedResource"
659 ),
660 )
661 self.assertEqual(
662 (True, "", "myBundle"),
663 utils.validate_constraint_resource(dom, "myBundledResource"),
664 )
665
666 def test_validate_xml_id(self):
667 self.assertEqual((True, ""), utils.validate_xml_id("dummy"))
668 self.assertEqual((True, ""), utils.validate_xml_id("DUMMY"))
669 self.assertEqual((True, ""), utils.validate_xml_id("dUmMy"))
670 self.assertEqual((True, ""), utils.validate_xml_id("dummy0"))
671 self.assertEqual((True, ""), utils.validate_xml_id("dum0my"))
672 self.assertEqual((True, ""), utils.validate_xml_id("dummy-"))
673 self.assertEqual((True, ""), utils.validate_xml_id("dum-my"))
674 self.assertEqual((True, ""), utils.validate_xml_id("dummy."))
675 self.assertEqual((True, ""), utils.validate_xml_id("dum.my"))
676 self.assertEqual((True, ""), utils.validate_xml_id("_dummy"))
677 self.assertEqual((True, ""), utils.validate_xml_id("dummy_"))
678 self.assertEqual((True, ""), utils.validate_xml_id("dum_my"))
679
680 self.assertEqual(
681 (False, "test id cannot be empty"),
682 utils.validate_xml_id("", "test id"),
683 )
684
685 msg = "invalid test id '%s', '%s' is not a valid first character for a test id"
686 self.assertEqual(
687 (False, msg % ("0", "0")), utils.validate_xml_id("0", "test id")
688 )
689 self.assertEqual(
690 (False, msg % ("-", "-")), utils.validate_xml_id("-", "test id")
691 )
692 self.assertEqual(
693 (False, msg % (".", ".")), utils.validate_xml_id(".", "test id")
694 )
695 self.assertEqual(
696 (False, msg % (":", ":")), utils.validate_xml_id(":", "test id")
697 )
698 self.assertEqual(
699 (False, msg % ("0dummy", "0")),
700 utils.validate_xml_id("0dummy", "test id"),
701 )
702 self.assertEqual(
703 (False, msg % ("-dummy", "-")),
704 utils.validate_xml_id("-dummy", "test id"),
705 )
706 self.assertEqual(
707 (False, msg % (".dummy", ".")),
708 utils.validate_xml_id(".dummy", "test id"),
709 )
710 self.assertEqual(
711 (False, msg % (":dummy", ":")),
712 utils.validate_xml_id(":dummy", "test id"),
713 )
714
715 msg = (
716 "invalid test id '%s', '%s' is not a valid character for a test id"
717 )
718 self.assertEqual(
719 (False, msg % ("dum:my", ":")),
720 utils.validate_xml_id("dum:my", "test id"),
721 )
722 self.assertEqual(
723 (False, msg % ("dummy:", ":")),
724 utils.validate_xml_id("dummy:", "test id"),
725 )
726 self.assertEqual(
727 (False, msg % ("dum?my", "?")),
728 utils.validate_xml_id("dum?my", "test id"),
729 )
730 self.assertEqual(
731 (False, msg % ("dummy?", "?")),
732 utils.validate_xml_id("dummy?", "test id"),
733 )
734
735 def test_is_iso8601_date(self):
736 self.assertTrue(utils.is_iso8601_date("2014-07-03"))
737 self.assertTrue(utils.is_iso8601_date("2014-07-03T11:35:14"))
738 self.assertTrue(utils.is_iso8601_date("20140703"))
739 self.assertTrue(utils.is_iso8601_date("2014-W27-4"))
740 self.assertTrue(utils.is_iso8601_date("2014-184"))
741
742 self.assertFalse(utils.is_iso8601_date(""))
743 self.assertFalse(utils.is_iso8601_date("foo"))
744 self.assertFalse(utils.is_iso8601_date("2014-07-32"))
745 self.assertFalse(utils.is_iso8601_date("2014-13-03"))
746 self.assertFalse(utils.is_iso8601_date("2014-W27-8"))
747 self.assertFalse(utils.is_iso8601_date("2014-367"))
748
749 def test_is_score(self):
750 self.assertTrue(utils.is_score("INFINITY"))
751 self.assertTrue(utils.is_score("+INFINITY"))
752 self.assertTrue(utils.is_score("-INFINITY"))
753 self.assertTrue(utils.is_score("0"))
754 self.assertTrue(utils.is_score("+0"))
755 self.assertTrue(utils.is_score("-0"))
756 self.assertTrue(utils.is_score("123"))
757 self.assertTrue(utils.is_score("-123"))
758 self.assertTrue(utils.is_score("+123"))
759
760 self.assertFalse(utils.is_score(""))
761 self.assertFalse(utils.is_score("abc"))
762 self.assertFalse(utils.is_score("+abc"))
763 self.assertFalse(utils.is_score("-abc"))
764 self.assertFalse(utils.is_score("10a"))
765 self.assertFalse(utils.is_score("+10a"))
766 self.assertFalse(utils.is_score("-10a"))
767 self.assertFalse(utils.is_score("a10"))
768 self.assertFalse(utils.is_score("+a10"))
769 self.assertFalse(utils.is_score("a-10"))
770 self.assertFalse(utils.is_score("infinity"))
771 self.assertFalse(utils.is_score("+infinity"))
772 self.assertFalse(utils.is_score("-infinity"))
773 self.assertFalse(utils.is_score("+InFiNiTy"))
774 self.assertFalse(utils.is_score("INFINITY10"))
775 self.assertFalse(utils.is_score("INFINITY+10"))
776 self.assertFalse(utils.is_score("-INFINITY10"))
777 self.assertFalse(utils.is_score("+INFINITY+10"))
778 self.assertFalse(utils.is_score("10INFINITY"))
779 self.assertFalse(utils.is_score("+10+INFINITY"))
780
781 def get_cib_status_lrm(self):
782 cib_dom = self.get_cib_empty()
783 new_status = xml.dom.minidom.parseString(
784 """
785 <status>
786 <node_state id="1" uname="rh70-node1">
787 <lrm id="1">
788 <lrm_resources>
789 <lrm_resource id="dummy" type="Dummy" class="ocf" provider="heartbeat">
790 <lrm_rsc_op id="dummy_monitor_30000" operation="monitor" call-id="34"
791 rc-code="1" on_node="Xrh70-node1X" exit-reason="test" />
792 <lrm_rsc_op id="dummy_stop_0" operation="stop" call-id="32"
793 rc-code="0" />
794 <lrm_rsc_op id="dummy_start_0" operation="start" call-id="33"
795 rc-code="0" />
796 </lrm_resource>
797 </lrm_resources>
798 </lrm>
799 </node_state>
800 <node_state id="2" uname="rh70-node2">
801 <lrm id="2">
802 <lrm_resources>
803 <lrm_resource id="dummy" type="Dummy" class="ocf" provider="heartbeat">
804 <lrm_rsc_op id="dummy_monitor_0" operation="monitor" call-id="5"
805 rc-code="1" />
806 </lrm_resource>
807 <lrm_resource id="dummy1" type="Dummy" class="ocf" provider="heartbeat">
808 <lrm_rsc_op id="dummy1_monitor_0" operation="monitor" call-id="3"
809 rc-code="0" />
810 </lrm_resource>
811 </lrm_resources>
812 </lrm>
813 </node_state>
814 </status>
815 """
816 ).documentElement
817 status = cib_dom.getElementsByTagName("status")[0]
818 status.parentNode.replaceChild(new_status, status)
819 return cib_dom
820
821 def test_resource_running_on(self):
822 status = xml.dom.minidom.parseString(
823 f"""
824 <crm_mon>
825 <summary />
826 <nodes />
827 <resources>
828 <resource id="myResource" role="Started">
829 <node name="rh70-node1" />
830 </resource>
831 <clone id="myClone">
832 <resource id="myClonedResource" role="Started">
833 <node name="rh70-node1" />
834 </resource>
835 <resource id="myClonedResource" role="Started">
836 <node name="rh70-node2" />
837 </resource>
838 <resource id="myClonedResource" role="Started">
839 <node name="rh70-node3" />
840 </resource>
841 </clone>
842 <clone id="myMaster">
843 <resource id="myMasteredResource:1" role="{const.PCMK_ROLE_UNPROMOTED_PRIMARY}">
844 <node name="rh70-node2" />
845 </resource>
846 <resource id="myMasteredResource" role="{const.PCMK_ROLE_UNPROMOTED_PRIMARY}">
847 <node name="rh70-node3" />
848 </resource>
849 <resource id="myMasteredResource" role="{const.PCMK_ROLE_PROMOTED_PRIMARY}">
850 <node name="rh70-node1" />
851 </resource>
852 </clone>
853 <group id="myGroup">
854 <resource id="myGroupedResource" role="Started">
855 <node name="rh70-node2" />
856 </resource>
857 </group>
858 <clone id="myGroupClone">
859 <group id="myClonedGroup:0">
860 <resource id="myClonedGroupedResource" role="Started">
861 <node name="rh70-node1" />
862 </resource>
863 </group>
864 <group id="myClonedGroup:1">
865 <resource id="myClonedGroupedResource" role="Started">
866 <node name="rh70-node2" />
867 </resource>
868 </group>
869 <group id="myClonedGroup:2">
870 <resource id="myClonedGroupedResource" role="Started">
871 <node name="rh70-node3" />
872 </resource>
873 </group>
874 <group id="myClonedGroup:3">
875 <resource id="myClonedGroupedResource" role="Started">
876 <node name="rh70-node3" />
877 </resource>
878 </group>
879 </clone>
880 <clone id="myGroupMaster">
881 <group id="myMasteredGroup:0">
882 <resource id="myMasteredGroupedResource" role="{const.PCMK_ROLE_UNPROMOTED_PRIMARY}">
883 <node name="rh70-node1" />
884 </resource>
885 </group>
886 <group id="myMasteredGroup:1">
887 <resource id="myMasteredGroupedResource" role="{const.PCMK_ROLE_PROMOTED_PRIMARY}">
888 <node name="rh70-node2" />
889 </resource>
890 </group>
891 <group id="myMasteredGroup:2">
892 <resource id="myMasteredGroupedResource" role="{const.PCMK_ROLE_UNPROMOTED_PRIMARY}">
893 <node name="rh70-node3" />
894 </resource>
895 </group>
896 </clone>
897 <resource id="myStoppedResource" role="Stopped">
898 </resource>
899 </resources>
900 </crm_mon>
901 """
902 ).documentElement
903
904 self.assertEqual(
905 utils.resource_running_on("myResource", status),
906 {
907 "message": "Resource 'myResource' is running on node rh70-node1.",
908 "is_running": True,
909 },
910 )
911 self.assertEqual(
912 utils.resource_running_on("myClonedResource", status),
913 {
914 "message": "Resource 'myClonedResource' is running on nodes "
915 "rh70-node1, rh70-node2, rh70-node3.",
916 "is_running": True,
917 },
918 )
919 self.assertEqual(
920 utils.resource_running_on("myClone", status),
921 {
922 "message": "Resource 'myClone' is running on nodes "
923 "rh70-node1, rh70-node2, rh70-node3.",
924 "is_running": True,
925 },
926 )
927 self.assertEqual(
928 utils.resource_running_on("myMasteredResource", status),
929 {
930 "message": (
931 "Resource 'myMasteredResource' is {promoted} on node "
932 "rh70-node1; {unpromoted} on nodes rh70-node2, rh70-node3."
933 ).format(
934 promoted=str(const.PCMK_ROLE_PROMOTED_PRIMARY).lower(),
935 unpromoted=str(const.PCMK_ROLE_UNPROMOTED_PRIMARY).lower(),
936 ),
937 "is_running": True,
938 },
939 )
940 self.assertEqual(
941 utils.resource_running_on("myMaster", status),
942 {
943 "message": (
944 "Resource 'myMaster' is {promoted} on node "
945 "rh70-node1; {unpromoted} on nodes rh70-node2, rh70-node3."
946 ).format(
947 promoted=str(const.PCMK_ROLE_PROMOTED_PRIMARY).lower(),
948 unpromoted=str(const.PCMK_ROLE_UNPROMOTED_PRIMARY).lower(),
949 ),
950 "is_running": True,
951 },
952 )
953 self.assertEqual(
954 utils.resource_running_on("myGroupedResource", status),
955 {
956 "message": "Resource 'myGroupedResource' is running on node "
957 "rh70-node2.",
958 "is_running": True,
959 },
960 )
961 self.assertEqual(
962 utils.resource_running_on("myGroup", status),
963 {
964 "message": "Resource 'myGroup' is running on node rh70-node2.",
965 "is_running": True,
966 },
967 )
968 self.assertEqual(
969 utils.resource_running_on("myClonedGroupedResource", status),
970 {
971 "message": "Resource 'myClonedGroupedResource' is running on nodes "
972 "rh70-node1, rh70-node2, rh70-node3, rh70-node3.",
973 "is_running": True,
974 },
975 )
976 self.assertEqual(
977 utils.resource_running_on("myClonedGroup", status),
978 {
979 "message": "Resource 'myClonedGroup' is running on nodes "
980 "rh70-node1, rh70-node2, rh70-node3, rh70-node3.",
981 "is_running": True,
982 },
983 )
984 self.assertEqual(
985 utils.resource_running_on("myGroupClone", status),
986 {
987 "message": "Resource 'myGroupClone' is running on nodes "
988 "rh70-node1, rh70-node2, rh70-node3, rh70-node3.",
989 "is_running": True,
990 },
991 )
992 self.assertEqual(
993 utils.resource_running_on("myMasteredGroupedResource", status),
994 {
995 "message": (
996 "Resource 'myMasteredGroupedResource' is {promoted} on node "
997 "rh70-node2; {unpromoted} on nodes rh70-node1, rh70-node3."
998 ).format(
999 promoted=str(const.PCMK_ROLE_PROMOTED_PRIMARY).lower(),
1000 unpromoted=str(const.PCMK_ROLE_UNPROMOTED_PRIMARY).lower(),
1001 ),
1002 "is_running": True,
1003 },
1004 )
1005 self.assertEqual(
1006 utils.resource_running_on("myMasteredGroup", status),
1007 {
1008 "message": (
1009 "Resource 'myMasteredGroup' is {promoted} on node "
1010 "rh70-node2; {unpromoted} on nodes rh70-node1, rh70-node3."
1011 ).format(
1012 promoted=str(const.PCMK_ROLE_PROMOTED_PRIMARY).lower(),
1013 unpromoted=str(const.PCMK_ROLE_UNPROMOTED_PRIMARY).lower(),
1014 ),
1015 "is_running": True,
1016 },
1017 )
1018 self.assertEqual(
1019 utils.resource_running_on("myGroupMaster", status),
1020 {
1021 "message": (
1022 "Resource 'myGroupMaster' is {promoted} on node "
1023 "rh70-node2; {unpromoted} on nodes rh70-node1, rh70-node3."
1024 ).format(
1025 promoted=str(const.PCMK_ROLE_PROMOTED_PRIMARY).lower(),
1026 unpromoted=str(const.PCMK_ROLE_UNPROMOTED_PRIMARY).lower(),
1027 ),
1028 "is_running": True,
1029 },
1030 )
1031 self.assertEqual(
1032 utils.resource_running_on("notMyResource", status),
1033 {
1034 "message": "Resource 'notMyResource' is not running on any node",
1035 "is_running": False,
1036 },
1037 )
1038 self.assertEqual(
1039 utils.resource_running_on("myStoppedResource", status),
1040 {
1041 "message": "Resource 'myStoppedResource' is not running on any node",
1042 "is_running": False,
1043 },
1044 )
1045
1046 def test_get_operations_from_transitions(self):
1047 transitions = xml.dom.minidom.parse(rc("transitions01.xml"))
1048 self.assertEqual(
1049 [
1050 {
1051 "id": "dummy",
1052 "long_id": "dummy",
1053 "operation": "stop",
1054 "on_node": "rh7-3",
1055 },
1056 {
1057 "id": "dummy",
1058 "long_id": "dummy",
1059 "operation": "start",
1060 "on_node": "rh7-2",
1061 },
1062 {
1063 "id": "d0",
1064 "long_id": "d0:1",
1065 "operation": "stop",
1066 "on_node": "rh7-1",
1067 },
1068 {
1069 "id": "d0",
1070 "long_id": "d0:1",
1071 "operation": "start",
1072 "on_node": "rh7-2",
1073 },
1074 {
1075 "id": "state",
1076 "long_id": "state:0",
1077 "operation": "stop",
1078 "on_node": "rh7-3",
1079 },
1080 {
1081 "id": "state",
1082 "long_id": "state:0",
1083 "operation": "start",
1084 "on_node": "rh7-2",
1085 },
1086 ],
1087 utils.get_operations_from_transitions(transitions),
1088 )
1089
1090 transitions = xml.dom.minidom.parse(rc("transitions02.xml"))
1091 self.assertEqual(
1092 [
1093 {
1094 "id": "RemoteNode",
1095 "long_id": "RemoteNode",
1096 "operation": "stop",
1097 "on_node": "virt-143",
1098 },
1099 {
1100 "id": "RemoteNode",
1101 "long_id": "RemoteNode",
1102 "operation": "migrate_to",
1103 "on_node": "virt-143",
1104 },
1105 {
1106 "id": "RemoteNode",
1107 "long_id": "RemoteNode",
1108 "operation": "migrate_from",
1109 "on_node": "virt-142",
1110 },
1111 {
1112 "id": "dummy8",
1113 "long_id": "dummy8",
1114 "operation": "stop",
1115 "on_node": "virt-143",
1116 },
1117 {
1118 "id": "dummy8",
1119 "long_id": "dummy8",
1120 "operation": "start",
1121 "on_node": "virt-142",
1122 },
1123 ],
1124 utils.get_operations_from_transitions(transitions),
1125 )
1126
1127 def test_get_resources_location_from_operations(self):
1128 cib_dom = self.get_cib_resources()
1129
1130 operations = []
1131 self.assertEqual(
1132 {},
1133 utils.get_resources_location_from_operations(cib_dom, operations),
1134 )
1135
1136 operations = [
1137 {
1138 "id": "myResource",
1139 "long_id": "myResource",
1140 "operation": "start",
1141 "on_node": "rh7-1",
1142 },
1143 ]
1144 self.assertEqual(
1145 {
1146 "myResource": {
1147 "id": "myResource",
1148 "id_for_constraint": "myResource",
1149 "long_id": "myResource",
1150 "start_on_node": "rh7-1",
1151 },
1152 },
1153 utils.get_resources_location_from_operations(cib_dom, operations),
1154 )
1155
1156 operations = [
1157 {
1158 "id": "myResource",
1159 "long_id": "myResource",
1160 "operation": "start",
1161 "on_node": "rh7-1",
1162 },
1163 {
1164 "id": "myResource",
1165 "long_id": "myResource",
1166 "operation": "start",
1167 "on_node": "rh7-2",
1168 },
1169 {
1170 "id": "myResource",
1171 "long_id": "myResource",
1172 "operation": "monitor",
1173 "on_node": "rh7-3",
1174 },
1175 {
1176 "id": "myResource",
1177 "long_id": "myResource",
1178 "operation": "stop",
1179 "on_node": "rh7-3",
1180 },
1181 ]
1182 self.assertEqual(
1183 {
1184 "myResource": {
1185 "id": "myResource",
1186 "id_for_constraint": "myResource",
1187 "long_id": "myResource",
1188 "start_on_node": "rh7-2",
1189 },
1190 },
1191 utils.get_resources_location_from_operations(cib_dom, operations),
1192 )
1193
1194 operations = [
1195 {
1196 "id": "myResource",
1197 "long_id": "myResource",
1198 "operation": "start",
1199 "on_node": "rh7-1",
1200 },
1201 {
1202 "id": "myClonedResource",
1203 "long_id": "myClonedResource:0",
1204 "operation": "start",
1205 "on_node": "rh7-1",
1206 },
1207 {
1208 "id": "myClonedResource",
1209 "long_id": "myClonedResource:0",
1210 "operation": "start",
1211 "on_node": "rh7-2",
1212 },
1213 {
1214 "id": "myClonedResource",
1215 "long_id": "myClonedResource:1",
1216 "operation": "start",
1217 "on_node": "rh7-3",
1218 },
1219 ]
1220 self.assertEqual(
1221 {
1222 "myResource": {
1223 "id": "myResource",
1224 "id_for_constraint": "myResource",
1225 "long_id": "myResource",
1226 "start_on_node": "rh7-1",
1227 },
1228 "myClonedResource:0": {
1229 "id": "myClonedResource",
1230 "id_for_constraint": "myClone",
1231 "long_id": "myClonedResource:0",
1232 "start_on_node": "rh7-2",
1233 },
1234 "myClonedResource:1": {
1235 "id": "myClonedResource",
1236 "id_for_constraint": "myClone",
1237 "long_id": "myClonedResource:1",
1238 "start_on_node": "rh7-3",
1239 },
1240 },
1241 utils.get_resources_location_from_operations(cib_dom, operations),
1242 )
1243
1244 operations = [
1245 {
1246 "id": "myUniqueClonedResource:0",
1247 "long_id": "myUniqueClonedResource:0",
1248 "operation": "start",
1249 "on_node": "rh7-1",
1250 },
1251 {
1252 "id": "myUniqueClonedResource:1",
1253 "long_id": "myUniqueClonedResource:1",
1254 "operation": "monitor",
1255 "on_node": "rh7-2",
1256 },
1257 {
1258 "id": "myUniqueClonedResource:2",
1259 "long_id": "myUniqueClonedResource:2",
1260 "operation": "start",
1261 "on_node": "rh7-3",
1262 },
1263 ]
1264 self.assertEqual(
1265 {
1266 "myUniqueClonedResource:0": {
1267 "id": "myUniqueClonedResource:0",
1268 "id_for_constraint": "myUniqueClone",
1269 "long_id": "myUniqueClonedResource:0",
1270 "start_on_node": "rh7-1",
1271 },
1272 "myUniqueClonedResource:2": {
1273 "id": "myUniqueClonedResource:2",
1274 "id_for_constraint": "myUniqueClone",
1275 "long_id": "myUniqueClonedResource:2",
1276 "start_on_node": "rh7-3",
1277 },
1278 },
1279 utils.get_resources_location_from_operations(cib_dom, operations),
1280 )
1281
1282 operations = [
1283 {
1284 "id": "myMasteredGroupedResource",
1285 "long_id": "myMasteredGroupedResource:0",
1286 "operation": "start",
1287 "on_node": "rh7-1",
1288 },
1289 {
1290 "id": "myMasteredGroupedResource",
1291 "long_id": "myMasteredGroupedResource:1",
1292 "operation": "demote",
1293 "on_node": "rh7-2",
1294 },
1295 {
1296 "id": "myMasteredGroupedResource",
1297 "long_id": "myMasteredGroupedResource:1",
1298 "operation": "promote",
1299 "on_node": "rh7-3",
1300 },
1301 ]
1302 self.assertEqual(
1303 {
1304 "myMasteredGroupedResource:0": {
1305 "id": "myMasteredGroupedResource",
1306 "id_for_constraint": "myGroupMaster",
1307 "long_id": "myMasteredGroupedResource:0",
1308 "start_on_node": "rh7-1",
1309 },
1310 "myMasteredGroupedResource:1": {
1311 "id": "myMasteredGroupedResource",
1312 "id_for_constraint": "myGroupMaster",
1313 "long_id": "myMasteredGroupedResource:1",
1314 "promote_on_node": "rh7-3",
1315 },
1316 },
1317 utils.get_resources_location_from_operations(cib_dom, operations),
1318 )
1319
1320 operations = [
1321 {
1322 "id": "myResource",
1323 "long_id": "myResource",
1324 "operation": "stop",
1325 "on_node": "rh7-1",
1326 },
1327 {
1328 "id": "myResource",
1329 "long_id": "myResource",
1330 "operation": "migrate_to",
1331 "on_node": "rh7-1",
1332 },
1333 {
1334 "id": "myResource",
1335 "long_id": "myResource",
1336 "operation": "migrate_from",
1337 "on_node": "rh7-2",
1338 },
1339 ]
1340 self.assertEqual(
1341 {
1342 "myResource": {
1343 "id": "myResource",
1344 "id_for_constraint": "myResource",
1345 "long_id": "myResource",
1346 "start_on_node": "rh7-2",
1347 },
1348 },
1349 utils.get_resources_location_from_operations(cib_dom, operations),
1350 )
1351
1352 def test_is_int(self):
1353 self.assertTrue(utils.is_int("-999"))
1354 self.assertTrue(utils.is_int("-1"))
1355 self.assertTrue(utils.is_int("0"))
1356 self.assertTrue(utils.is_int("1"))
1357 self.assertTrue(utils.is_int("99999"))
1358 self.assertTrue(utils.is_int(" 99999 "))
1359 self.assertFalse(utils.is_int("0.0"))
1360 self.assertFalse(utils.is_int("-1.0"))
1361 self.assertFalse(utils.is_int("-0.1"))
1362 self.assertFalse(utils.is_int("0.001"))
1363 self.assertFalse(utils.is_int("-999999.1"))
1364 self.assertFalse(utils.is_int("0.0001"))
1365 self.assertFalse(utils.is_int(""))
1366 self.assertFalse(utils.is_int(" "))
1367 self.assertFalse(utils.is_int("A"))
1368 self.assertFalse(utils.is_int("random 15 47 text "))
1369
1370 def test_dom_get_node(self):
1371 cib = self.get_cib_with_nodes_minidom()
1372 self.assertIsNone(utils.dom_get_node(cib, "non-existing-node"))
1373 node = utils.dom_get_node(cib, "rh7-1")
1374 self.assertEqual(node.getAttribute("uname"), "rh7-1")
1375 self.assertEqual(node.getAttribute("id"), "1")
1376
1377 def test_dom_prepare_child_element(self):
1378 cib = self.get_cib_with_nodes_minidom()
1379 node = cib.getElementsByTagName("node")[0]
1380 self.assertEqual(len(dom_get_child_elements(node)), 0)
1381 child = utils.dom_prepare_child_element(
1382 node, "utilization", "rh7-1-utilization"
1383 )
1384 self.assertEqual(len(dom_get_child_elements(node)), 1)
1385 self.assertEqual(child, dom_get_child_elements(node)[0])
1386 self.assertEqual(dom_get_child_elements(node)[0].tagName, "utilization")
1387 self.assertEqual(
1388 dom_get_child_elements(node)[0].getAttribute("id"),
1389 "rh7-1-utilization",
1390 )
1391 child2 = utils.dom_prepare_child_element(
1392 node, "utilization", "rh7-1-utilization"
1393 )
1394 self.assertEqual(len(dom_get_child_elements(node)), 1)
1395 self.assertEqual(child, child2)
1396
1397 def test_dom_update_nv_pair_add(self):
1398 nv_set = xml.dom.minidom.parseString("<nvset/>").documentElement
1399 utils.dom_update_nv_pair(nv_set, "test_name", "test_val", "prefix-")
1400 self.assertEqual(len(dom_get_child_elements(nv_set)), 1)
1401 pair = dom_get_child_elements(nv_set)[0]
1402 self.assertEqual(pair.getAttribute("name"), "test_name")
1403 self.assertEqual(pair.getAttribute("value"), "test_val")
1404 self.assertEqual(pair.getAttribute("id"), "prefix-test_name")
1405 utils.dom_update_nv_pair(nv_set, "another_name", "value", "prefix2-")
1406 self.assertEqual(len(dom_get_child_elements(nv_set)), 2)
1407 self.assertEqual(pair, dom_get_child_elements(nv_set)[0])
1408 pair = dom_get_child_elements(nv_set)[1]
1409 self.assertEqual(pair.getAttribute("name"), "another_name")
1410 self.assertEqual(pair.getAttribute("value"), "value")
1411 self.assertEqual(pair.getAttribute("id"), "prefix2-another_name")
1412
1413 def test_dom_update_nv_pair_update(self):
1414 nv_set = xml.dom.minidom.parseString(
1415 """
1416 <nv_set>
1417 <nvpair id="prefix-test_name" name="test_name" value="test_val"/>
1418 <nvpair id="prefix2-another_name" name="another_name" value="value"/>
1419 </nv_set>
1420 """
1421 ).documentElement
1422 utils.dom_update_nv_pair(nv_set, "test_name", "new_value")
1423 self.assertEqual(len(dom_get_child_elements(nv_set)), 2)
1424 pair1 = dom_get_child_elements(nv_set)[0]
1425 pair2 = dom_get_child_elements(nv_set)[1]
1426 self.assertEqual(pair1.getAttribute("name"), "test_name")
1427 self.assertEqual(pair1.getAttribute("value"), "new_value")
1428 self.assertEqual(pair1.getAttribute("id"), "prefix-test_name")
1429 self.assertEqual(pair2.getAttribute("name"), "another_name")
1430 self.assertEqual(pair2.getAttribute("value"), "value")
1431 self.assertEqual(pair2.getAttribute("id"), "prefix2-another_name")
1432
1433 def test_dom_update_nv_pair_remove(self):
1434 nv_set = xml.dom.minidom.parseString(
1435 """
1436 <nv_set>
1437 <nvpair id="prefix-test_name" name="test_name" value="test_val"/>
1438 <nvpair id="prefix2-another_name" name="another_name" value="value"/>
1439 </nv_set>
1440 """
1441 ).documentElement
1442 utils.dom_update_nv_pair(nv_set, "non_existing_name", "")
1443 self.assertEqual(len(dom_get_child_elements(nv_set)), 2)
1444 utils.dom_update_nv_pair(nv_set, "another_name", "")
1445 self.assertEqual(len(dom_get_child_elements(nv_set)), 1)
1446 pair = dom_get_child_elements(nv_set)[0]
1447 self.assertEqual(pair.getAttribute("name"), "test_name")
1448 self.assertEqual(pair.getAttribute("value"), "test_val")
1449 self.assertEqual(pair.getAttribute("id"), "prefix-test_name")
1450 utils.dom_update_nv_pair(nv_set, "test_name", "")
1451 self.assertEqual(len(dom_get_child_elements(nv_set)), 0)
1452
1453 def test_convert_args_to_tuples(self):
1454 out = utils.convert_args_to_tuples(
1455 ["invalid_string", "key=value", "key2=val=ue", "k e y= v a l u e "]
1456 )
1457 self.assertEqual(
1458 out,
1459 [("key", "value"), ("key2", "val=ue"), ("k e y", " v a l u e ")],
1460 )
1461
1462 def test_dom_update_utilization_invalid(self):
1463 # commands writes to stderr
1464 # we want clean test output, so we capture it
1465 tmp_stderr = sys.stderr
1466 sys.stderr = StringIO()
1467
|
(1) Event Sigma main event: |
The application uses Python's built in `xml` module which does not properly handle erroneous or maliciously constructed data, making the application vulnerable to one or more types of XML attacks. |
|
(2) Event remediation: |
Avoid using the `xml` module. Consider using the `defusedxml` module or similar which safely prevents all XML entity attacks. |
1468 el = xml.dom.minidom.parseString(
1469 """
1470 <resource id="test_id"/>
1471 """
1472 ).documentElement
1473 self.assertRaises(
1474 SystemExit,
1475 utils.dom_update_utilization,
1476 el,
1477 {"name": "invalid_val"},
1478 )
1479
1480 self.assertRaises(
1481 SystemExit, utils.dom_update_utilization, el, {"name": "0.01"}
1482 )
1483
1484 sys.stderr = tmp_stderr
1485
1486 def test_dom_update_utilization_add(self):
1487 el = xml.dom.minidom.parseString(
1488 """
1489 <resource id="test_id"/>
1490 """
1491 ).documentElement
1492 utils.dom_update_utilization(
1493 el,
1494 {
1495 "name": "",
1496 "key": "-1",
1497 "keys": "90",
1498 },
1499 )
1500
1501 self.assertEqual(len(dom_get_child_elements(el)), 1)
1502 u = dom_get_child_elements(el)[0] # pylint: disable=invalid-name
1503 self.assertEqual(u.tagName, "utilization")
1504 self.assertEqual(u.getAttribute("id"), "test_id-utilization")
1505 self.assertEqual(len(dom_get_child_elements(u)), 2)
1506
1507 self.assertEqual(
1508 dom_get_child_elements(u)[0].getAttribute("id"),
1509 "test_id-utilization-key",
1510 )
1511 self.assertEqual(
1512 dom_get_child_elements(u)[0].getAttribute("name"), "key"
1513 )
1514 self.assertEqual(
1515 dom_get_child_elements(u)[0].getAttribute("value"), "-1"
1516 )
1517 self.assertEqual(
1518 dom_get_child_elements(u)[1].getAttribute("id"),
1519 "test_id-utilization-keys",
1520 )
1521 self.assertEqual(
1522 dom_get_child_elements(u)[1].getAttribute("name"), "keys"
1523 )
1524 self.assertEqual(
1525 dom_get_child_elements(u)[1].getAttribute("value"), "90"
1526 )
1527
1528 def test_dom_update_utilization_update_remove(self):
1529 el = xml.dom.minidom.parseString(
1530 """
1531 <resource id="test_id">
1532 <utilization id="test_id-utilization">
1533 <nvpair id="test_id-utilization-key" name="key" value="-1"/>
1534 <nvpair id="test_id-utilization-keys" name="keys" value="90"/>
1535 </utilization>
1536 </resource>
1537 """
1538 ).documentElement
1539 utils.dom_update_utilization(
1540 el,
1541 {
1542 "key": "100",
1543 "keys": "",
1544 },
1545 )
1546
1547 u = dom_get_child_elements(el)[0] # pylint: disable=invalid-name
1548 self.assertEqual(len(dom_get_child_elements(u)), 1)
1549 self.assertEqual(
1550 dom_get_child_elements(u)[0].getAttribute("id"),
1551 "test_id-utilization-key",
1552 )
1553 self.assertEqual(
1554 dom_get_child_elements(u)[0].getAttribute("name"), "key"
1555 )
1556 self.assertEqual(
1557 dom_get_child_elements(u)[0].getAttribute("value"), "100"
1558 )
1559
1560 def test_dom_update_meta_attr_add(self):
1561 el = xml.dom.minidom.parseString(
1562 """
1563 <resource id="test_id"/>
1564 """
1565 ).documentElement
1566 utils.dom_update_meta_attr(
1567 el, [("name", ""), ("key", "test"), ("key2", "val")]
1568 )
1569
1570 self.assertEqual(len(dom_get_child_elements(el)), 1)
1571 u = dom_get_child_elements(el)[0] # pylint: disable=invalid-name
1572 self.assertEqual(u.tagName, "meta_attributes")
1573 self.assertEqual(u.getAttribute("id"), "test_id-meta_attributes")
1574 self.assertEqual(len(dom_get_child_elements(u)), 2)
1575
1576 self.assertEqual(
1577 dom_get_child_elements(u)[0].getAttribute("id"),
1578 "test_id-meta_attributes-key",
1579 )
1580 self.assertEqual(
1581 dom_get_child_elements(u)[0].getAttribute("name"), "key"
1582 )
1583 self.assertEqual(
1584 dom_get_child_elements(u)[0].getAttribute("value"), "test"
1585 )
1586 self.assertEqual(
1587 dom_get_child_elements(u)[1].getAttribute("id"),
1588 "test_id-meta_attributes-key2",
1589 )
1590 self.assertEqual(
1591 dom_get_child_elements(u)[1].getAttribute("name"), "key2"
1592 )
1593 self.assertEqual(
1594 dom_get_child_elements(u)[1].getAttribute("value"), "val"
1595 )
1596
1597 def test_dom_update_meta_attr_update_remove(self):
1598 el = xml.dom.minidom.parseString(
1599 """
1600 <resource id="test_id">
1601 <meta_attributes id="test_id-utilization">
1602 <nvpair id="test_id-meta_attributes-key" name="key" value="test"/>
1603 <nvpair id="test_id-meta_attributes-key2" name="key2" value="val"/>
1604 </meta_attributes>
1605 </resource>
1606 """
1607 ).documentElement
1608 utils.dom_update_meta_attr(el, [("key", "another_val"), ("key2", "")])
1609
1610 u = dom_get_child_elements(el)[0] # pylint: disable=invalid-name
1611 self.assertEqual(len(dom_get_child_elements(u)), 1)
1612 self.assertEqual(
1613 dom_get_child_elements(u)[0].getAttribute("id"),
1614 "test_id-meta_attributes-key",
1615 )
1616 self.assertEqual(
1617 dom_get_child_elements(u)[0].getAttribute("name"), "key"
1618 )
1619 self.assertEqual(
1620 dom_get_child_elements(u)[0].getAttribute("value"), "another_val"
1621 )
1622
1623 def test_get_utilization(self):
1624 el = xml.dom.minidom.parseString(
1625 """
1626 <resource id="test_id">
1627 <utilization id="test_id-utilization">
1628 <nvpair id="test_id-utilization-key" name="key" value="-1"/>
1629 <nvpair id="test_id-utilization-keys" name="keys" value="90"/>
1630 </utilization>
1631 </resource>
1632 """
1633 ).documentElement
1634 self.assertEqual({"key": "-1", "keys": "90"}, utils.get_utilization(el))
1635
1636 def test_get_utilization_str(self):
1637 el = xml.dom.minidom.parseString(
1638 """
1639 <resource id="test_id">
1640 <utilization id="test_id-utilization">
1641 <nvpair id="test_id-utilization-key" name="key" value="-1"/>
1642 <nvpair id="test_id-utilization-keys" name="keys" value="90"/>
1643 </utilization>
1644 </resource>
1645 """
1646 ).documentElement
1647 self.assertEqual("key=-1 keys=90", utils.get_utilization_str(el))
1648
1649 def assert_element_id(self, node, node_id, tag=None):
1650 self.assertTrue(
1651 isinstance(node, xml.dom.minidom.Element),
1652 "element with id '%s' not found" % node_id,
1653 )
1654 self.assertEqual(node.getAttribute("id"), node_id)
1655 if tag:
1656 self.assertEqual(node.tagName, tag)
1657
1658
1659 class RunParallelTest(TestCase):
1660 @staticmethod
1661 def fixture_create_worker(log, name, sleepSeconds=0):
1662 # pylint: disable=invalid-name
1663 def worker():
1664 sleep(sleepSeconds)
1665 log.append(name)
1666
1667 return worker
1668
1669 def test_run_all_workers(self):
1670 log = []
1671 utils.run_parallel(
1672 [
1673 self.fixture_create_worker(log, "first"),
1674 self.fixture_create_worker(log, "second"),
1675 ],
1676 wait_seconds=0.1,
1677 )
1678
1679 self.assertEqual(sorted(log), sorted(["first", "second"]))
1680
1681
1682 class NodeActionTaskTest(TestCase):
1683 def test_can_run_action(self):
1684 def action(node, arg, kwarg=None):
1685 return (0, ":".join([node, arg, kwarg]))
1686
1687 report_list = []
1688
1689 def report(node, returncode, output):
1690 report_list.append("|".join([node, str(returncode), output]))
1691
1692 task = utils.create_task(report, action, "node", "arg", kwarg="kwarg")
1693 task()
1694
1695 self.assertEqual(["node|0|node:arg:kwarg"], report_list)
1696
1697
1698 class TouchCibFile(TestCase):
1699 @mock.patch("pcs.utils.os.path.isfile", mock.Mock(return_value=False))
1700 @mock.patch(
1701 "pcs.utils.write_empty_cib",
1702 mock.Mock(side_effect=EnvironmentError("some message")),
1703 )
1704 @mock.patch("pcs.utils.err")
1705 def test_exception_is_transformed_correctly(self, err):
1706 # pylint: disable=no-self-use
1707 filename = "/fake/filename"
1708 utils.touch_cib_file(filename)
1709 err.assert_called_once_with(
1710 "Unable to write to file: '/fake/filename': 'some message'"
1711 )
1712