1 import sys
2 import xml.dom.minidom
3 from enum import Enum
4 from typing import (
5 Any,
6 Iterable,
7 Optional,
8 Set,
9 TypeVar,
10 cast,
11 )
12 from xml.dom.minidom import parseString
13
14 import pcs.cli.constraint_order.command as order_command
15 from pcs import utils
16 from pcs.cli.common import parse_args
17 from pcs.cli.common.errors import (
18 CmdLineInputError,
19 raise_command_replaced,
20 )
21 from pcs.cli.common.output import (
22 INDENT_STEP,
23 lines_to_str,
24 )
25 from pcs.cli.constraint.location.command import (
26 RESOURCE_TYPE_REGEXP,
27 RESOURCE_TYPE_RESOURCE,
28 )
29 from pcs.cli.constraint.output import (
30 CibConstraintLocationAnyDto,
31 filter_constraints_by_rule_expired_status,
32 location,
33 print_config,
34 )
35 from pcs.cli.reports import process_library_reports
36 from pcs.cli.reports.output import (
37 deprecation_warning,
38 print_to_stderr,
39 warn,
40 )
41 from pcs.common import (
42 const,
43 pacemaker,
44 reports,
45 )
46 from pcs.common.pacemaker.constraint import (
47 CibConstraintColocationSetDto,
48 CibConstraintLocationSetDto,
49 CibConstraintOrderSetDto,
50 CibConstraintsDto,
51 CibConstraintTicketSetDto,
52 get_all_constraints_ids,
53 )
54 from pcs.common.pacemaker.resource.list import CibResourcesDto
55 from pcs.common.pacemaker.types import CibResourceDiscovery
56 from pcs.common.reports import ReportItem
57 from pcs.common.str_tools import (
58 format_list,
59 indent,
60 )
61 from pcs.common.types import (
62 StringCollection,
63 StringIterable,
64 StringSequence,
65 )
66 from pcs.lib.cib.constraint.order import ATTRIB as order_attrib
67 from pcs.lib.node import get_existing_nodes_names
68 from pcs.lib.pacemaker.values import (
69 SCORE_INFINITY,
70 is_true,
71 sanitize_id,
72 )
73
74 # pylint: disable=invalid-name
75 # pylint: disable=too-many-branches
76 # pylint: disable=too-many-lines
77 # pylint: disable=too-many-locals
78 # pylint: disable=too-many-statements
79
# Action used for either side of an order constraint when the command line
# does not name one.
DEFAULT_ACTION = const.PCMK_ACTION_START
# Role used for the other side of a colocation constraint when only one role
# is specified, and when comparing constraints with no role set.
DEFAULT_ROLE = const.PCMK_ROLE_STARTED

# Values accepted for the "symmetrical" option of an order constraint.
OPTIONS_SYMMETRICAL = order_attrib["symmetrical"]

# Warning printed when the node existence check is skipped (-f or --force).
LOCATION_NODE_VALIDATION_SKIP_MSG = (
    "Validation for node existence in the cluster will be skipped"
)
# Deprecation warning printed when a score is given as a bare positional
# argument instead of score=value.
STANDALONE_SCORE_MSG = (
    "Specifying score as a standalone value is deprecated and "
    "might be removed in a future release, use score=value instead"
)
92
93
class CrmRuleReturnCode(Enum):
    # NOTE(review): judging by the name, these look like exit codes of
    # pacemaker's crm_rule tool describing whether a rule is in effect,
    # expired, or not yet in effect - confirm against the callers; the enum is
    # not used in the visible part of this file.
    IN_EFFECT = 0
    EXPIRED = 110
    TO_BE_IN_EFFECT = 111
98
99
def constraint_order_cmd(lib, argv, modifiers):
    """
    Route a 'constraint order' command to the function implementing it

    The first item of argv selects the subcommand ("config" when argv is
    empty). Anything which is not a known subcommand is treated as the
    beginning of an order constraint definition and handed to order_start.
    Command line errors are passed to utils.exit_on_cmdline_input_error.
    """
    if argv:
        sub_cmd = argv.pop(0)
    else:
        sub_cmd = "config"

    try:
        if sub_cmd == "config":
            order_command.config_cmd(lib, argv, modifiers)
        elif sub_cmd == "set":
            order_command.create_with_set(lib, argv, modifiers)
        elif sub_cmd == "show":
            raise_command_replaced(
                ["pcs constraint order config"], pcs_version="0.12"
            )
        elif sub_cmd in ("remove", "delete"):
            order_rm(lib, argv, modifiers)
        else:
            # not a subcommand, so it is the first word of a constraint
            order_start(lib, [sub_cmd, *argv], modifiers)
    except CmdLineInputError as error:
        utils.exit_on_cmdline_input_error(
            error, "constraint", ["order", sub_cmd]
        )
118
119
def config_cmd(
    lib: Any, argv: list[str], modifiers: parse_args.InputModifiers
) -> None:
    """
    Print the constraint configuration obtained from the library

    Accepts no positional arguments; CmdLineInputError is raised otherwise.

    Supported modifiers: -f, --output-format, --full, --all
    """
    modifiers.ensure_only_supported("-f", "--output-format", "--full", "--all")
    if argv:
        raise CmdLineInputError()

    constraints_dto = cast(
        CibConstraintsDto,
        lib.constraint.get_config(evaluate_rules=True),
    )
    print_config(constraints_dto, modifiers)
134
135
def _validate_constraint_resource(cib_dom, resource_id):
    """
    Report an error using utils.err if the resource fails validation

    cib_dom -- CIB passed to utils.validate_constraint_resource
    resource_id -- id of the resource to be used in a constraint
    """
    is_valid, error_message, _ = utils.validate_constraint_resource(
        cib_dom, resource_id
    )
    if is_valid:
        return
    utils.err(error_message)
144
145
def _validate_resources_not_in_same_group(cib_dom, resource1, resource2):
    """
    Report an error using utils.err if the same-group validation fails

    cib_dom -- CIB passed to utils.validate_resources_not_in_same_group
    resource1, resource2 -- ids of the resources to be ordered
    """
    validation_passed = utils.validate_resources_not_in_same_group(
        cib_dom, resource1, resource2
    )
    if validation_passed:
        return
    utils.err(
        "Cannot create an order constraint for resources in the same group"
    )
153
154
155 # Syntax: colocation add [role] <src> with [role] <tgt> [score] [options]
156 # possible commands:
157 # <src> with <tgt> [score] [options]
158 # <src> with <role> <tgt> [score] [options]
159 # <role> <src> with <tgt> [score] [options]
160 # <role> <src> with <role> <tgt> [score] [options]
161 # Specifying score as a single argument is deprecated, though. The correct way
162 # is score=value in options.
def colocation_add(lib, argv, modifiers):  # noqa: PLR0912, PLR0915
    """
    Create a colocation constraint between two resources

    Expected argv: [role] <src> with [role] <tgt> [score] [name=value...]
    The constraint is built as a "rsc_colocation" element, appended to the
    constraints section of the CIB and the CIB is then replaced.

    lib -- not used
    argv -- command line arguments; consumed (popped) while being parsed
    modifiers -- command line options

    Raises CmdLineInputError if argv does not match the expected syntax.
    Validation failures (roles, resources, ids, duplicates) are reported using
    utils.err.

    Options:
      * -f - CIB file
      * --force - allow constraint on any resource, allow duplicate constraints
    """

    def _parse_score_options(argv):
        # When passed an array of arguments if the first argument doesn't have
        # an '=' then it's the score, otherwise they're all arguments. Return a
        # tuple with the score and array of name,value pairs
        """
        Commandline options: no options
        """
        if not argv:
            return None, []
        score = None
        if "=" not in argv[0]:
            score = argv.pop(0)
            # TODO added to pcs in the first 0.12.x version
            deprecation_warning(STANDALONE_SCORE_MSG)

        # create a list of 2-tuples (name, value)
        arg_array = [
            parse_args.split_option(arg, allow_empty_value=False)
            for arg in argv
        ]
        return score, arg_array

    # lib is not used by this command
    del lib
    modifiers.ensure_only_supported("-f", "--force")
    # the shortest valid form is: <src> with <tgt>
    if len(argv) < 3:
        raise CmdLineInputError()

    # empty string means "role not specified"
    role1 = ""
    role2 = ""

    cib_dom = utils.get_cib_dom()
    new_roles_supported = utils.isCibVersionSatisfied(
        cib_dom, const.PCMK_NEW_ROLES_CIB_VERSION
    )

    def _validate_and_prepare_role(role):
        # Normalize letter case, check the role is a known one and translate
        # it to the value to be written to the CIB
        role_cleaned = role.lower().capitalize()
        if role_cleaned not in const.PCMK_ROLES:
            utils.err(
                "invalid role value '{0}', allowed values are: {1}".format(
                    role, format_list(const.PCMK_ROLES)
                )
            )
        return pacemaker.role.get_value_for_cib(
            role_cleaned, new_roles_supported
        )

    # The position of the "with" keyword tells whether a role precedes the
    # source resource
    if argv[2] == "with":
        role1 = _validate_and_prepare_role(argv.pop(0))
        resource1 = argv.pop(0)
    elif argv[1] == "with":
        resource1 = argv.pop(0)
    else:
        raise CmdLineInputError()

    if argv.pop(0) != "with":
        raise CmdLineInputError()
    if "with" in argv:
        raise CmdLineInputError(
            message="Multiple 'with's cannot be specified.",
            hint=(
                "Use the 'pcs constraint colocation set' command if you want "
                "to create a constraint for more than two resources."
            ),
            show_both_usage_and_message=True,
        )

    if not argv:
        raise CmdLineInputError()
    # If the target is the last argument or it is followed by a score or an
    # option, no role was specified for it; otherwise the first word is a role
    if len(argv) == 1 or utils.is_score_or_opt(argv[1]):
        resource2 = argv.pop(0)
    else:
        role2 = _validate_and_prepare_role(argv.pop(0))
        resource2 = argv.pop(0)

    score, nv_pairs = _parse_score_options(argv)

    _validate_constraint_resource(cib_dom, resource1)
    _validate_constraint_resource(cib_dom, resource2)

    # A user specified id must be valid and unused; score=value in options
    # overrides a standalone score
    id_in_nvpairs = None
    for name, value in nv_pairs:
        if name == "id":
            id_valid, id_error = utils.validate_xml_id(value, "constraint id")
            if not id_valid:
                utils.err(id_error)
            if utils.does_id_exist(cib_dom, value):
                utils.err(
                    "id '%s' is already in use, please specify another one"
                    % value
                )
            id_in_nvpairs = True
        elif name == "score":
            score = value
    if score is None:
        score = SCORE_INFINITY
    # generate an id if the user did not specify any
    if not id_in_nvpairs:
        nv_pairs.append(
            (
                "id",
                utils.find_unique_id(
                    cib_dom,
                    "colocation-%s-%s-%s" % (resource1, resource2, score),
                ),
            )
        )

    (dom, constraintsElement) = getCurrentConstraints(cib_dom)

    # If one role is specified, the other should default to "started"
    if role1 != "" and role2 == "":
        role2 = DEFAULT_ROLE
    if role2 != "" and role1 == "":
        role1 = DEFAULT_ROLE
    element = dom.createElement("rsc_colocation")
    element.setAttribute("rsc", resource1)
    element.setAttribute("with-rsc", resource2)
    element.setAttribute("score", score)
    if role1 != "":
        element.setAttribute("rsc-role", role1)
    if role2 != "":
        element.setAttribute("with-rsc-role", role2)
    # remaining options, including the id, are stored as attributes
    for nv_pair in nv_pairs:
        element.setAttribute(nv_pair[0], nv_pair[1])
    if not modifiers.get("--force"):

        def _constraint_export(constraint_info):
            # Format an existing constraint as one line of text for the
            # duplicate constraint error message
            options_dict = constraint_info["options"]
            co_resource1 = options_dict.get("rsc", "")
            co_resource2 = options_dict.get("with-rsc", "")
            co_id = options_dict.get("id", "")
            co_score = options_dict.get("score", "")
            score_text = "(score:" + co_score + ")"
            console_option_list = [
                f"({option[0]}:{option[1]})"
                for option in sorted(options_dict.items())
                if option[0] not in ("rsc", "with-rsc", "id", "score")
            ]
            console_option_list.append(f"(id:{co_id})")
            return " ".join(
                [co_resource1, "with", co_resource2, score_text]
                + console_option_list
            )

        # The new element has not been added to the CIB yet, so every match
        # is an already existing constraint
        duplicates = colocation_find_duplicates(constraintsElement, element)
        if duplicates:
            utils.err(
                "duplicate constraint already exists, use --force to override\n"
                + "\n".join(
                    [
                        "  "
                        + _constraint_export(
                            {"options": dict(dup.attributes.items())}
                        )
                        for dup in duplicates
                    ]
                )
            )
    constraintsElement.appendChild(element)
    utils.replace_cib_configuration(dom)
330
331
def colocation_find_duplicates(dom, constraint_el):
    """
    Find plain colocation constraints equal to the specified one

    Constraints are considered equal when their resources and roles match;
    a role which is not set is treated as DEFAULT_ROLE. Constraints containing
    resource sets and constraint_el itself are never reported.

    dom -- element to search for "rsc_colocation" elements in
    constraint_el -- constraint to look for duplicates of

    Commandline options: no options
    """
    new_roles_supported = utils.isCibVersionSatisfied(
        dom, const.PCMK_NEW_ROLES_CIB_VERSION
    )

    def _role_for_cib(element, attribute_name):
        role = element.getAttribute(attribute_name).capitalize() or DEFAULT_ROLE
        return pacemaker.role.get_value_for_cib(role, new_roles_supported)

    def _signature(element):
        return (
            element.getAttribute("rsc"),
            element.getAttribute("with-rsc"),
            _role_for_cib(element, "rsc-role"),
            _role_for_cib(element, "with-rsc-role"),
        )

    wanted_signature = _signature(constraint_el)
    duplicates = []
    for candidate in dom.getElementsByTagName("rsc_colocation"):
        if candidate.getElementsByTagName("resource_set"):
            # constraints with resource sets are not compared
            continue
        if candidate is constraint_el:
            continue
        if _signature(candidate) == wanted_signature:
            duplicates.append(candidate)
    return duplicates
363
364
def order_rm(lib, argv, modifiers):
    """
    Remove the specified resources from order constraints

    Plain order constraints naming a resource as "first" or "then" are removed
    entirely. In order constraints with resource sets only the reference to
    the resource is removed; a set left without resources is removed and so is
    a constraint left without sets. Constraints of other types are not
    touched.

    lib -- not used
    argv -- ids of resources to be removed from order constraints
    modifiers -- command line options

    Raises CmdLineInputError if no resource is specified. If none of the
    resources is found in any order constraint, an error is reported using
    utils.err and the CIB is not changed.

    Options:
      * -f - CIB file
    """
    del lib
    modifiers.ensure_only_supported("-f")
    if not argv:
        raise CmdLineInputError()

    elementFound = False
    (dom, constraintsElement) = getCurrentConstraints()

    for resource in argv:
        # iterate over a copy, matching elements are removed from the tree
        for ord_loc in constraintsElement.getElementsByTagName("rsc_order")[:]:
            if (
                ord_loc.getAttribute("first") == resource
                or ord_loc.getAttribute("then") == resource
            ):
                constraintsElement.removeChild(ord_loc)
                elementFound = True

        # Search for resource references in order constraints only. Searching
        # the whole constraints section would also strip the resource from
        # sets of colocation, location and ticket constraints.
        resource_refs_to_remove = [
            res_ref
            for ord_constraint in constraintsElement.getElementsByTagName(
                "rsc_order"
            )
            for res_ref in ord_constraint.getElementsByTagName("resource_ref")
            if res_ref.getAttribute("id") == resource
        ]
        if resource_refs_to_remove:
            elementFound = True

        for res_ref in resource_refs_to_remove:
            res_set = res_ref.parentNode
            res_order = res_set.parentNode

            res_set.removeChild(res_ref)
            # drop sets and constraints which became empty
            if not res_set.getElementsByTagName("resource_ref"):
                res_order.removeChild(res_set)
                if not res_order.getElementsByTagName("resource_set"):
                    res_order.parentNode.removeChild(res_order)

    if elementFound:
        utils.replace_cib_configuration(dom)
    else:
        utils.err("No matching resources found in ordering list")
407
408
def order_start(lib, argv, modifiers):
    """
    Parse a plain order constraint definition and create the constraint

    Expected argv: [action] <first> then [action] <then> [options...]
    An action which is not specified defaults to DEFAULT_ACTION. Parsed values
    are handed over to _order_add.

    lib -- not used
    argv -- command line arguments; parsed items are popped from it
    modifiers -- command line options

    Raises CmdLineInputError if argv does not match the expected syntax.

    Options:
      * -f - CIB file
      * --force - allow constraint for any resource, allow duplicate constraints
    """
    del lib
    modifiers.ensure_only_supported("-f", "--force")
    if len(argv) < 3:
        raise CmdLineInputError()

    def _pop_action():
        # consume the leading word if it is an action, use default otherwise
        if argv[0] in const.PCMK_ACTIONS:
            return argv.pop(0)
        return DEFAULT_ACTION

    first_action = _pop_action()
    resource1 = argv.pop(0)
    keyword = argv.pop(0)
    if keyword != "then":
        raise CmdLineInputError()
    if not argv:
        raise CmdLineInputError()

    then_action = _pop_action()
    if not argv:
        raise CmdLineInputError()
    resource2 = argv.pop(0)

    # whatever is left are options of the constraint
    order_options = list(argv)
    if "then" in order_options:
        raise CmdLineInputError(
            message="Multiple 'then's cannot be specified.",
            hint=(
                "Use the 'pcs constraint order set' command if you want to "
                "create a constraint for more than two resources."
            ),
            show_both_usage_and_message=True,
        )

    order_options.extend(
        ["first-action=" + first_action, "then-action=" + then_action]
    )
    _order_add(resource1, resource2, order_options, modifiers)
459
460
def _order_add(resource1, resource2, options_list, modifiers):  # noqa: PLR0912, PLR0915
    """
    Create a plain order constraint and store it in the CIB

    resource1 -- id of the resource to be put in the "first" attribute
    resource2 -- id of the resource to be put in the "then" attribute
    options_list -- constraint options, each either "name=value" or one of
        the keywords "symmetrical" / "nonsymmetrical"
    modifiers -- command line options

    Validation failures (resources, ids, symmetrical value, duplicates) are
    reported using utils.err.

    Commandline options:
      * -f - CIB file
      * --force - allow constraint for any resource, allow duplicate constraints
    """
    cib_dom = utils.get_cib_dom()
    _validate_constraint_resource(cib_dom, resource1)
    _validate_constraint_resource(cib_dom, resource2)

    _validate_resources_not_in_same_group(cib_dom, resource1, resource2)

    # Translate options_list to a list of (name, value) tuples. "symmetrical"
    # is collected separately so that it is added just once, whichever form it
    # was specified in.
    order_options = []
    id_specified = False
    sym = None
    for arg in options_list:
        if arg == "symmetrical":
            sym = "true"
        elif arg == "nonsymmetrical":
            sym = "false"
        else:
            name, value = parse_args.split_option(arg, allow_empty_value=False)
            if name == "id":
                # a user specified id must be valid and unused
                id_valid, id_error = utils.validate_xml_id(
                    value, "constraint id"
                )
                if not id_valid:
                    utils.err(id_error)
                if utils.does_id_exist(cib_dom, value):
                    utils.err(
                        "id '%s' is already in use, please specify another one"
                        % value
                    )
                id_specified = True
                order_options.append((name, value))
            elif name == "symmetrical":
                if value.lower() in OPTIONS_SYMMETRICAL:
                    sym = value.lower()
                else:
                    utils.err(
                        "invalid symmetrical value '%s', allowed values are: %s"
                        % (value, ", ".join(OPTIONS_SYMMETRICAL))
                    )
            else:
                order_options.append((name, value))
    if sym:
        order_options.append(("symmetrical", sym))

    # Text describing the options in the "Adding ..." message printed below;
    # kind and score are reported separately in scorekind. It is built before
    # a generated id is added to order_options.
    options = ""
    if order_options:
        options = " (Options: %s)" % " ".join(
            [
                "%s=%s" % (name, value)
                for name, value in order_options
                if name not in ("kind", "score")
            ]
        )

    # The first "score" or "kind" option found determines both the text in the
    # message and the suffix of a generated constraint id
    scorekind = "kind: Mandatory"
    id_suffix = "mandatory"
    for opt in order_options:
        if opt[0] == "score":
            scorekind = "score: " + opt[1]
            id_suffix = opt[1]
            # TODO deprecated in pacemaker 2, to be removed in pacemaker 3
            # added to pcs after 0.11.7
            deprecation_warning(
                reports.messages.DeprecatedOption(opt[0], []).message
            )
            break
        if opt[0] == "kind":
            scorekind = "kind: " + opt[1]
            id_suffix = opt[1]
            break

    # generate an id if the user did not specify any
    if not id_specified:
        order_id = "order-" + resource1 + "-" + resource2 + "-" + id_suffix
        order_id = utils.find_unique_id(cib_dom, order_id)
        order_options.append(("id", order_id))

    (dom, constraintsElement) = getCurrentConstraints()
    element = dom.createElement("rsc_order")
    element.setAttribute("first", resource1)
    element.setAttribute("then", resource2)
    for order_opt in order_options:
        element.setAttribute(order_opt[0], order_opt[1])
    # The element is added before looking for duplicates;
    # order_find_duplicates does not report the element passed to it
    constraintsElement.appendChild(element)
    if not modifiers.get("--force"):

        def _constraint_export(constraint_info):
            # Format an existing constraint as one line of text for the
            # duplicate constraint error message
            options = constraint_info["options"]
            oc_resource1 = options.get("first", "")
            oc_resource2 = options.get("then", "")
            first_action = options.get("first-action", "")
            then_action = options.get("then-action", "")
            oc_id = options.get("id", "")
            oc_score = options.get("score", "")
            oc_kind = options.get("kind", "")
            oc_sym = ""
            oc_id_out = ""
            oc_options = ""
            if "symmetrical" in options and not is_true(
                options.get("symmetrical", "false")
            ):
                oc_sym = "(non-symmetrical)"
            # kind takes precedence over score; with neither of them set the
            # constraint is presented as Mandatory
            if oc_kind != "":
                score_text = "(kind:" + oc_kind + ")"
            elif oc_kind == "" and oc_score == "":
                score_text = "(kind:Mandatory)"
            else:
                score_text = "(score:" + oc_score + ")"
            oc_id_out = "(id:" + oc_id + ")"
            already_processed_options = (
                "first",
                "then",
                "first-action",
                "then-action",
                "id",
                "score",
                "kind",
                "symmetrical",
            )
            oc_options = " ".join(
                [
                    f"{name}={value}"
                    for name, value in options.items()
                    if name not in already_processed_options
                ]
            )
            if oc_options:
                oc_options = "(Options: " + oc_options + ")"
            # empty parts are left out of the resulting text
            return " ".join(
                [
                    arg
                    for arg in [
                        first_action,
                        oc_resource1,
                        "then",
                        then_action,
                        oc_resource2,
                        score_text,
                        oc_sym,
                        oc_options,
                        oc_id_out,
                    ]
                    if arg
                ]
            )

        duplicates = order_find_duplicates(constraintsElement, element)
        if duplicates:
            utils.err(
                "duplicate constraint already exists, use --force to override\n"
                + "\n".join(
                    [
                        "  "
                        + _constraint_export(
                            {"options": dict(dup.attributes.items())}
                        )
                        for dup in duplicates
                    ]
                )
            )
    print_to_stderr(f"Adding {resource1} {resource2} ({scorekind}){options}")
    utils.replace_cib_configuration(dom)
626
627
def order_find_duplicates(dom, constraint_el):
    """
    Find plain order constraints equal to the specified one

    Constraints are considered equal when their "first" and "then" resources
    and their lowercased actions match; an action which is not set is treated
    as DEFAULT_ACTION. Constraints containing resource sets and constraint_el
    itself are never reported.

    dom -- element to search for "rsc_order" elements in
    constraint_el -- constraint to look for duplicates of

    Commandline options: no options
    """

    def _signature(element):
        first_action = element.getAttribute("first-action").lower()
        then_action = element.getAttribute("then-action").lower()
        return (
            element.getAttribute("first"),
            element.getAttribute("then"),
            first_action or DEFAULT_ACTION,
            then_action or DEFAULT_ACTION,
        )

    wanted_signature = _signature(constraint_el)
    duplicates = []
    for candidate in dom.getElementsByTagName("rsc_order"):
        if candidate.getElementsByTagName("resource_set"):
            # constraints with resource sets are not compared
            continue
        if candidate is constraint_el:
            continue
        if _signature(candidate) == wanted_signature:
            duplicates.append(candidate)
    return duplicates
650
651
# Type variable restricted to the DTOs of constraints with resource sets. It
# lets _filter_set_constraints_by_resources return a list of the same DTO type
# it was given.
_SetConstraint = TypeVar(
    "_SetConstraint",
    CibConstraintLocationSetDto,
    CibConstraintColocationSetDto,
    CibConstraintOrderSetDto,
    CibConstraintTicketSetDto,
)
659
660
def _filter_set_constraints_by_resources(
    constraints_dto: Iterable[_SetConstraint], resources: Set[str]
) -> list[_SetConstraint]:
    """
    Keep set constraints referencing at least one of the specified resources

    constraints_dto -- constraints with resource sets to be filtered
    resources -- ids of resources to look for in the resource sets
    """

    def _references_wanted_resource(constraint_set_dto: _SetConstraint) -> bool:
        for resource_set in constraint_set_dto.resource_sets:
            if set(resource_set.resources_ids) & resources:
                return True
        return False

    return list(filter(_references_wanted_resource, constraints_dto))
672
673
def _filter_constraints_by_resources(
    constraints_dto: CibConstraintsDto,
    resources: StringIterable,
    patterns: StringIterable,
) -> CibConstraintsDto:
    """
    Keep constraints referencing any of the specified resources or patterns

    constraints_dto -- constraints to be filtered
    resources -- ids of resources; matched against resources of all
        constraint types including those in resource sets
    patterns -- resource patterns; matched against patterns of plain location
        constraints only
    """
    wanted_resources = set(resources)
    wanted_patterns = set(patterns)

    def _location_matches(constraint_dto) -> bool:
        if (
            constraint_dto.resource_id is not None
            and constraint_dto.resource_id in wanted_resources
        ):
            return True
        return (
            constraint_dto.resource_pattern is not None
            and constraint_dto.resource_pattern in wanted_patterns
        )

    def _any_wanted(*resource_ids) -> bool:
        return bool(set(resource_ids) & wanted_resources)

    return CibConstraintsDto(
        location=[
            constraint_dto
            for constraint_dto in constraints_dto.location
            if _location_matches(constraint_dto)
        ],
        location_set=_filter_set_constraints_by_resources(
            constraints_dto.location_set, wanted_resources
        ),
        colocation=[
            constraint_dto
            for constraint_dto in constraints_dto.colocation
            if _any_wanted(
                constraint_dto.resource_id, constraint_dto.with_resource_id
            )
        ],
        colocation_set=_filter_set_constraints_by_resources(
            constraints_dto.colocation_set, wanted_resources
        ),
        order=[
            constraint_dto
            for constraint_dto in constraints_dto.order
            if _any_wanted(
                constraint_dto.first_resource_id,
                constraint_dto.then_resource_id,
            )
        ],
        order_set=_filter_set_constraints_by_resources(
            constraints_dto.order_set, wanted_resources
        ),
        ticket=[
            constraint_dto
            for constraint_dto in constraints_dto.ticket
            if constraint_dto.resource_id in wanted_resources
        ],
        ticket_set=_filter_set_constraints_by_resources(
            constraints_dto.ticket_set, wanted_resources
        ),
    )
727
728
def _filter_location_by_node_base(
    constraint_dtos: Iterable[CibConstraintLocationAnyDto],
    nodes: StringCollection,
) -> list[CibConstraintLocationAnyDto]:
    """
    Keep location constraints whose node is one of the specified nodes

    constraint_dtos -- location constraints to be filtered
    nodes -- names of nodes to keep constraints for
    """
    matching_dtos = []
    for constraint_dto in constraint_dtos:
        node = constraint_dto.attributes.node
        # constraints with no node set never match
        if node is None or node not in nodes:
            continue
        matching_dtos.append(constraint_dto)
    return matching_dtos
739
740
def location_config_cmd(
    lib: Any, argv: parse_args.Argv, modifiers: parse_args.InputModifiers
) -> None:
    """
    Print location constraints, optionally grouped by resources or nodes

    argv -- empty, or a grouping keyword ("resources" or "nodes") optionally
        followed by resources / nodes the output should be limited to

    Raises CmdLineInputError for an unknown keyword and when grouping is
    requested together with an output format other than text.

    Options:
      * --all - print expired constraints
      * --full - print all details
      * -f - CIB file
    """
    modifiers.ensure_only_supported("-f", "--output-format", "--full", "--all")

    grouping: Optional[str] = None
    wanted_items: parse_args.Argv = []
    if argv:
        grouping, *wanted_items = argv
        known_groupings = ("resources", "nodes")
        if grouping not in known_groupings:
            raise CmdLineInputError(
                f"Unknown keyword '{grouping}'. Allowed keywords: "
                f"{format_list(known_groupings)}"
            )
        if modifiers.get_output_format() != parse_args.OUTPUT_FORMAT_VALUE_TEXT:
            raise CmdLineInputError(
                "Output formats other than 'text' are not supported together "
                "with grouping and filtering by nodes or resources"
            )

    all_constraints_dto = filter_constraints_by_rule_expired_status(
        lib.constraint.get_config(evaluate_rules=True),
        modifiers.is_specified("--all"),
    )
    # drop everything but location constraints
    location_dto = CibConstraintsDto(
        location=all_constraints_dto.location,
        location_set=all_constraints_dto.location_set,
    )

    if grouping == "resources":
        if wanted_items:
            resources = []
            patterns = []
            for item in wanted_items:
                item_type, item_value = parse_args.parse_typed_arg(
                    item,
                    [RESOURCE_TYPE_RESOURCE, RESOURCE_TYPE_REGEXP],
                    RESOURCE_TYPE_RESOURCE,
                )
                if item_type == RESOURCE_TYPE_RESOURCE:
                    resources.append(item_value)
                elif item_type == RESOURCE_TYPE_REGEXP:
                    patterns.append(item_value)
            location_dto = _filter_constraints_by_resources(
                location_dto, resources, patterns
            )
        lines = location.constraints_to_grouped_by_resource_text(
            location_dto.location,
            modifiers.is_specified("--full"),
        )
    elif grouping == "nodes":
        if wanted_items:
            location_dto = CibConstraintsDto(
                location=_filter_location_by_node_base(
                    location_dto.location, wanted_items
                ),
                location_set=_filter_location_by_node_base(
                    location_dto.location_set, wanted_items
                ),
            )
        lines = location.constraints_to_grouped_by_node_text(
            location_dto.location,
            modifiers.is_specified("--full"),
        )
    else:
        # no grouping requested, use the common constraint output
        print_config(location_dto, modifiers)
        return

    if lines:
        print("Location Constraints:")
        print(lines_to_str(indent(lines, indent_step=INDENT_STEP)))
825
826
827 def _verify_node_name(node, existing_nodes):
828 report_list = []
829 if node not in existing_nodes:
830 report_list.append(
831 ReportItem.error(
832 reports.messages.NodeNotFound(node),
833 force_code=reports.codes.FORCE,
834 )
835 )
836 return report_list
837
838
def _verify_score(score):
    """
    Report an error using utils.err if utils.is_score rejects the score
    """
    if utils.is_score(score):
        return
    utils.err(
        "invalid score '%s', use integer or INFINITY or -INFINITY" % score
    )
844
845
def location_prefer(  # noqa: PLR0912
    lib: Any, argv: parse_args.Argv, modifiers: parse_args.InputModifiers
) -> None:
    """
    Create location constraints making a resource prefer or avoid nodes

    Expected argv: <resource> prefers|avoids <node>[=<score>]...
    One constraint is created by location_add for each node. If no score is
    given, INFINITY is used for "prefers" and -INFINITY for "avoids"; with
    "avoids", a score which is given is negated.

    lib -- passed to location_add
    argv -- command line arguments; the resource and the keyword are popped
    modifiers -- command line options

    Raises CmdLineInputError if the resource or the keyword is missing or the
    keyword is neither "prefers" nor "avoids".

    Options:
      * --force - allow unknown options, allow constraint for any resource type
      * -f - CIB file
    """
    modifiers.ensure_only_supported("--force", "-f")
    # both the resource and the prefers / avoids keyword are required
    if len(argv) < 2:
        raise CmdLineInputError()
    rsc = argv.pop(0)
    prefer_option = argv.pop(0)

    # only the value is needed here (for the constraint id), the typed
    # argument itself is passed to location_add unchanged
    dummy_rsc_type, rsc_value = parse_args.parse_typed_arg(
        rsc,
        [RESOURCE_TYPE_RESOURCE, RESOURCE_TYPE_REGEXP],
        RESOURCE_TYPE_RESOURCE,
    )

    if prefer_option == "prefers":
        prefer = True
    elif prefer_option == "avoids":
        prefer = False
    else:
        raise CmdLineInputError()

    skip_node_check = False
    existing_nodes: list[str] = []
    if modifiers.is_specified("-f") or modifiers.get("--force"):
        skip_node_check = True
        warn(LOCATION_NODE_VALIDATION_SKIP_MSG)
    else:
        lib_env = utils.get_lib_env()
        existing_nodes, report_list = get_existing_nodes_names(
            corosync_conf=lib_env.get_corosync_conf(),
            cib=lib_env.get_cib(),
        )
        if report_list:
            process_library_reports(report_list)

    # Validate all nodes and scores first, so that no constraint is created
    # if any of them is not valid
    report_list = []
    parameters_list = []
    for nodeconf in argv:
        nodeconf_a = nodeconf.split("=", 1)
        node = nodeconf_a[0]
        if not skip_node_check:
            report_list += _verify_node_name(node, existing_nodes)
        if len(nodeconf_a) == 1:
            score = "INFINITY" if prefer else "-INFINITY"
        else:
            score = nodeconf_a[1]
            _verify_score(score)
            if not prefer:
                # Negate the score. An explicit sign has to be replaced, not
                # prefixed, otherwise "+5" would become an invalid "-+5".
                if score.startswith("-"):
                    score = score[1:]
                elif score.startswith("+"):
                    score = "-" + score[1:]
                else:
                    score = "-" + score

        parameters_list.append(
            [
                sanitize_id(f"location-{rsc_value}-{node}-{score}"),
                rsc,
                node,
                f"score={score}",
            ]
        )

    if report_list:
        process_library_reports(report_list)

    modifiers = modifiers.get_subset("--force", "-f")

    # nodes and scores have been checked above already
    for parameters in parameters_list:
        location_add(lib, parameters, modifiers, skip_score_and_node_check=True)
916
917
def location_add(  # noqa: PLR0912, PLR0915
    lib: Any,
    argv: parse_args.Argv,
    modifiers: parse_args.InputModifiers,
    skip_score_and_node_check: bool = False,
) -> None:
    """
    Create a location constraint with a node and a score

    Expected argv: <id> <resource> <node> [score] [name=value...]
    Existing location constraints with the same id or with the same resource
    and node are replaced by the new constraint.

    lib -- not used
    argv -- command line arguments; parsed items are popped from it
    modifiers -- command line options
    skip_score_and_node_check -- do not check node existence and score
        validity; set by location_prefer, which runs both checks itself

    Raises CmdLineInputError if fewer than four arguments are specified.
    Validation failures are reported using utils.err and
    process_library_reports.

    Options:
      * --force - allow unknown options, allow constraint for any resource type
      * -f - CIB file
    """
    del lib
    modifiers.ensure_only_supported("--force", "-f")
    if len(argv) < 4:
        raise CmdLineInputError()

    constraint_id = argv.pop(0)
    rsc_type, rsc_value = parse_args.parse_typed_arg(
        argv.pop(0),
        [RESOURCE_TYPE_RESOURCE, RESOURCE_TYPE_REGEXP],
        RESOURCE_TYPE_RESOURCE,
    )
    node = argv.pop(0)
    score = None
    # an argument with no "=" following the node is a standalone score
    if "=" not in argv[0]:
        score = argv.pop(0)
        # TODO added to pcs in the first 0.12.x version
        deprecation_warning(STANDALONE_SCORE_MSG)
    options = []
    # For now we only allow setting resource-discovery and score
    for arg in argv:
        name, value = parse_args.split_option(arg, allow_empty_value=False)
        if name == "score":
            # score=value overrides a standalone score
            score = value
        elif name == "resource-discovery":
            if not modifiers.get("--force"):
                allowed_discovery = list(
                    map(
                        str,
                        [
                            CibResourceDiscovery.ALWAYS,
                            CibResourceDiscovery.EXCLUSIVE,
                            CibResourceDiscovery.NEVER,
                        ],
                    )
                )
                if value not in allowed_discovery:
                    utils.err(
                        (
                            "invalid {0} value '{1}', allowed values are: {2}"
                            ", use --force to override"
                        ).format(name, value, format_list(allowed_discovery))
                    )
            options.append([name, value])
        elif modifiers.get("--force"):
            # unknown options are accepted only when forced
            options.append([name, value])
        else:
            utils.err("bad option '%s', use --force to override" % name)
    if score is None:
        score = "INFINITY"

    # Verify that specified node exists in the cluster and score is valid
    if not skip_score_and_node_check:
        if modifiers.is_specified("-f") or modifiers.get("--force"):
            warn(LOCATION_NODE_VALIDATION_SKIP_MSG)
        else:
            lib_env = utils.get_lib_env()
            existing_nodes, report_list = get_existing_nodes_names(
                corosync_conf=lib_env.get_corosync_conf(),
                cib=lib_env.get_cib(),
            )
            report_list += _verify_node_name(node, existing_nodes)
            if report_list:
                process_library_reports(report_list)
        _verify_score(score)

    id_valid, id_error = utils.validate_xml_id(constraint_id, "constraint id")
    if not id_valid:
        utils.err(id_error)

    dom = utils.get_cib_dom()

    # a regular expression does not refer to one resource, so only a resource
    # id can be validated
    if rsc_type == RESOURCE_TYPE_RESOURCE:
        (
            rsc_valid,
            rsc_error,
            dummy_correct_id,
        ) = utils.validate_constraint_resource(dom, rsc_value)
        if not rsc_valid:
            utils.err(rsc_error)

    # Verify current constraint doesn't already exist
    # If it does we replace it with the new constraint
    dummy_dom, constraintsElement = getCurrentConstraints(dom)
    # If the id matches, or the rsc & node match, then we replace/remove
    elementsToRemove = [
        rsc_loc
        for rsc_loc in constraintsElement.getElementsByTagName("rsc_location")
        # pylint: disable=too-many-boolean-expressions
        if rsc_loc.getAttribute("id") == constraint_id
        or (
            rsc_loc.getAttribute("node") == node
            and (
                (
                    rsc_type == RESOURCE_TYPE_RESOURCE
                    and rsc_loc.getAttribute("rsc") == rsc_value
                )
                or (
                    rsc_type == RESOURCE_TYPE_REGEXP
                    and rsc_loc.getAttribute("rsc-pattern") == rsc_value
                )
            )
        )
    ]
    for etr in elementsToRemove:
        constraintsElement.removeChild(etr)

    element = dom.createElement("rsc_location")
    element.setAttribute("id", constraint_id)
    # the resource is referenced either by its id or by a pattern
    if rsc_type == RESOURCE_TYPE_RESOURCE:
        element.setAttribute("rsc", rsc_value)
    elif rsc_type == RESOURCE_TYPE_REGEXP:
        element.setAttribute("rsc-pattern", rsc_value)
    element.setAttribute("node", node)
    element.setAttribute("score", score)
    for option in options:
        element.setAttribute(option[0], option[1])
    constraintsElement.appendChild(element)

    utils.replace_cib_configuration(dom)
1048
1049
1050 # Grabs the current constraints and returns the dom and constraint element
def getCurrentConstraints(passed_dom=None):
    """
    Return a DOM and the first "constraints" element found in it

    passed_dom -- an already parsed DOM to be used; if not specified, the
        constraints are obtained using utils.get_cib_xpath and parsed

    Returns a tuple (dom, constraints element). If the constraints cannot be
    obtained, an error is reported using utils.err.

    Commandline options:
      * -f - CIB file, only if passed_dom is None
    """
    if passed_dom:
        dom = passed_dom
    else:
        current_constraints_xml = utils.get_cib_xpath("//constraints")
        if current_constraints_xml == "":
            utils.err("unable to process cib")
        # NOTE(review): a static analysis scan flagged this use of the
        # standard library "xml" parser as vulnerable to XML entity attacks
        # and suggested using "defusedxml" instead. The parsed text comes from
        # utils.get_cib_xpath - presumably the cluster's own CIB, i.e. not
        # untrusted input; confirm before changing the parser, defusedxml
        # would be a new dependency.
        dom = parseString(current_constraints_xml)

    constraintsElement = dom.getElementsByTagName("constraints")[0]
    return (dom, constraintsElement)
1068
1069
1070 # If returnStatus is set, then we don't error out, we just print the error
1071 # and return false
def constraint_rm(  # noqa: PLR0912
    lib,
    argv,
    modifiers,
    returnStatus=False,
    constraintsElement=None,
    passed_dom=None,
):
    """
    Remove constraints or rules with the specified ids

    lib -- only passed on to recursive calls
    argv -- ids to be removed; if more than one id is specified, the function
        calls itself for each id separately
    modifiers -- command line options; checked only if passed_dom is None
    returnStatus -- for a single id: return True / False instead of exiting
        when the id is not found (the error is printed anyway)
    constraintsElement -- element to search in; if not specified, it is
        obtained from getCurrentConstraints
    passed_dom -- DOM handed over to getCurrentConstraints; if specified and
        the id is found, the CIB is not replaced and the local "dom" is
        returned (the DOM from getCurrentConstraints, or None when
        constraintsElement was passed in as well)

    Raises CmdLineInputError if no id is specified.

    Options:
      * -f - CIB file, effective only if passed_dom is None
    """
    if passed_dom is None:
        modifiers.ensure_only_supported("-f")
    if not argv:
        raise CmdLineInputError()

    bad_constraint = False
    if len(argv) != 1:
        # Process ids one by one; all of them are tried even if some are not
        # found. constraintsElement is not passed on to these calls.
        for arg in argv:
            if not constraint_rm(
                lib, [arg], modifiers, returnStatus=True, passed_dom=passed_dom
            ):
                bad_constraint = True
        if bad_constraint:
            sys.exit(1)
        return None

    c_id = argv.pop(0)
    elementFound = False
    dom = None
    # set when the constraints were loaded here, which means the CIB has to be
    # replaced here as well
    use_cibadmin = False
    if not constraintsElement:
        (dom, constraintsElement) = getCurrentConstraints(passed_dom)
        use_cibadmin = True

    # look for a constraint with the id among direct children; iterate over
    # a copy, matching elements are removed from the tree
    for co in constraintsElement.childNodes[:]:
        if co.nodeType != xml.dom.Node.ELEMENT_NODE:
            continue
        if co.getAttribute("id") == c_id:
            constraintsElement.removeChild(co)
            elementFound = True

    # no constraint has the id, look for a rule with the id
    if not elementFound:
        for rule in constraintsElement.getElementsByTagName("rule")[:]:
            if rule.getAttribute("id") == c_id:
                elementFound = True
                parent = rule.parentNode
                parent.removeChild(rule)
                # remove the parent as well if it has no rules left
                if not parent.getElementsByTagName("rule"):
                    parent.parentNode.removeChild(parent)

    if elementFound:
        if passed_dom:
            return dom
        if use_cibadmin:
            utils.replace_cib_configuration(dom)
        if returnStatus:
            return True
    else:
        # NOTE(review): the second argument presumably stops utils.err from
        # exiting, as the code below handles the exit itself - confirm
        utils.err("Unable to find constraint - '%s'" % c_id, False)
        if returnStatus:
            return False
        sys.exit(1)
    return None
1137
1138
def _split_set_constraints(
    constraints_dto: CibConstraintsDto,
) -> tuple[CibConstraintsDto, CibConstraintsDto]:
    """
    Split constraints into two DTOs: plain constraints and set constraints.

    constraints_dto -- constraints to be split

    Returns a pair of new DTOs. The first one is created from the location,
    colocation, order and ticket fields of constraints_dto, the second one
    from its location_set, colocation_set, order_set and ticket_set fields.
    """
    plain_constraints_dto = CibConstraintsDto(
        location=constraints_dto.location,
        colocation=constraints_dto.colocation,
        order=constraints_dto.order,
        ticket=constraints_dto.ticket,
    )
    set_constraints_dto = CibConstraintsDto(
        location_set=constraints_dto.location_set,
        colocation_set=constraints_dto.colocation_set,
        order_set=constraints_dto.order_set,
        ticket_set=constraints_dto.ticket_set,
    )
    return plain_constraints_dto, set_constraints_dto
1156
1157
def _find_constraints_containing_resource(
    resources_dto: CibResourcesDto,
    constraints_dto: CibConstraintsDto,
    resource_id: str,
) -> CibConstraintsDto:
    """
    Filter constraints by a resource and, for a cloned primitive, its clone.

    resources_dto -- resources used for looking up the parent clone
    constraints_dto -- constraints to be filtered
    resource_id -- id of the resource to filter the constraints by

    Returns the result of _filter_constraints_by_resources called with the
    resource id, extended by the id of the first clone whose member is the
    resource if the resource is a primitive.
    """
    wanted_ids = [resource_id]
    # A parent is looked up only when resource_id refers to a primitive.
    # Groups are left out to match the original implementation. This may
    # change in the future if necessary.
    primitive_ids = (primitive.id for primitive in resources_dto.primitives)
    if resource_id in primitive_ids:
        parent_clone_id = next(
            (
                clone.id
                for clone in resources_dto.clones
                if clone.member_id == resource_id
            ),
            None,
        )
        if parent_clone_id is not None:
            wanted_ids.append(parent_clone_id)
    return _filter_constraints_by_resources(constraints_dto, wanted_ids, [])
1178
1179
def ref(
    lib: Any, argv: list[str], modifiers: parse_args.InputModifiers
) -> None:
    """
    Print ids of constraints found for each of the specified resources.

    Resource ids are deduplicated and processed in sorted order. For each
    of them a "Resource: <id>" header is printed, followed by sorted and
    indented constraint ids or by a "No Matches" line.

    Raises CmdLineInputError if no resource id is specified.

    Options:
      * -f - CIB file
    """
    modifiers.ensure_only_supported("-f")
    if not argv:
        raise CmdLineInputError()

    resources_dto = cast(
        CibResourcesDto, lib.resource.get_configured_resources()
    )
    constraints_dto = cast(
        CibConstraintsDto,
        lib.constraint.get_config(evaluate_rules=False),
    )

    for resource_id in sorted(set(argv)):
        matching_constraints_dto = _find_constraints_containing_resource(
            resources_dto, constraints_dto, resource_id
        )
        found_ids = get_all_constraints_ids(matching_constraints_dto)
        print(f"Resource: {resource_id}")
        if not found_ids:
            print(" No Matches")
            continue
        id_lines = indent(sorted(found_ids), indent_step=INDENT_STEP)
        print("\n".join(id_lines))
1214
1215
def remove_constraints_containing(
    resource_id: str, output=False, constraints_element=None, passed_dom=None
):
    """
    Remove constraints referencing a resource, take it out of resource sets.

    Constraints without resource sets found for the resource are removed
    using constraint_rm. If any constraint with resource sets is found for
    the resource, all "resource_ref" elements with the resource id are
    removed. A set left without any resource_ref is removed as well, and so
    is its parent element once it holds no resource_set.

    resource_id -- id of the resource whose references are being removed
    output -- if True, print removed constraints and removed set members to
        stderr
    constraints_element -- handed over to constraint_rm as the element to
        remove constraints from; not used when processing resource sets
    passed_dom -- handed over to constraint_rm and getCurrentConstraints; if
        specified, the dom with updated sets is returned instead of being
        sent to utils.replace_cib_configuration

    Returns the dom obtained from getCurrentConstraints if sets were
    processed and passed_dom is specified, None otherwise.

    Commandline options:
      * -f - CIB file, effective only if passed_dom is None
    """
    lib = utils.get_library_wrapper()
    modifiers = utils.get_input_modifiers()
    resources_dto = cast(
        CibResourcesDto, lib.resource.get_configured_resources()
    )
    all_constraints_dto = cast(
        CibConstraintsDto,
        lib.constraint.get_config(evaluate_rules=False),
    )
    plain_dto, with_sets_dto = _split_set_constraints(all_constraints_dto)

    plain_constraint_ids = sorted(
        get_all_constraints_ids(
            _find_constraints_containing_resource(
                resources_dto, plain_dto, resource_id
            )
        )
    )
    set_constraint_ids = sorted(
        get_all_constraints_ids(
            _find_constraints_containing_resource(
                resources_dto, with_sets_dto, resource_id
            )
        )
    )

    # constraint_rm is asked to return a status instead of exiting only when
    # the caller supplied the element to remove the constraints from.
    report_status = constraints_element is not None
    for constraint_id in plain_constraint_ids:
        if output:
            print_to_stderr(f"Removing Constraint - {constraint_id}")
        constraint_rm(
            lib,
            [constraint_id],
            modifiers,
            returnStatus=report_status,
            constraintsElement=constraints_element,
            passed_dom=passed_dom,
        )

    if not set_constraint_ids:
        return None

    dom, current_constraints = getCurrentConstraints(passed_dom)
    # Iterate over a copy, elements are being removed inside the loop.
    for resource_ref in list(
        current_constraints.getElementsByTagName("resource_ref")
    ):
        if resource_ref.getAttribute("id") != resource_id:
            continue
        # Take the resource out of the set.
        resource_set = resource_ref.parentNode
        resource_set.removeChild(resource_ref)
        if output:
            print_to_stderr(
                "Removing {} from set {}".format(
                    resource_id, resource_set.getAttribute("id")
                )
            )
        if resource_set.getElementsByTagName("resource_ref").length != 0:
            continue
        # The set is empty now, remove it. Note that this message and the
        # one below are printed regardless of the output flag.
        print_to_stderr(
            "Removing set {}".format(resource_set.getAttribute("id"))
        )
        set_parent = resource_set.parentNode
        set_parent.removeChild(resource_set)
        if set_parent.getElementsByTagName("resource_set").length != 0:
            continue
        # The parent of the set holds no sets anymore, remove it as well.
        set_parent.parentNode.removeChild(set_parent)
        print_to_stderr(
            "Removing constraint {}".format(set_parent.getAttribute("id"))
        )

    if passed_dom:
        return dom
    utils.replace_cib_configuration(dom)
    return None
1301
1302
1303 # Re-assign any constraints referencing a resource to its parent (a clone
1304 # or master)
def constraint_resource_update(old_id, dom):
    """
    Make constraints referencing a resource reference its parent instead.

    The parent is looked up by utils.dom_get_resource_clone_ms_parent. If
    there is one and it has a non-empty id, the "rsc", "first", "then" and
    "with-rsc" attributes equal to old_id are set to the id of the parent in
    all rsc_location, rsc_order and rsc_colocation elements.

    old_id -- id of the resource being referenced by the constraints
    dom -- dom to look for the parent and to update the constraints in

    Returns the dom passed in.

    Commandline options: no options
    """
    clone_ms_parent = utils.dom_get_resource_clone_ms_parent(dom, old_id)
    new_id = clone_ms_parent.getAttribute("id") if clone_ms_parent else None
    if not new_id:
        return dom

    # Collect all the constraints first, update them afterwards.
    constraint_list = [
        constraint
        for tag_name in ("rsc_location", "rsc_order", "rsc_colocation")
        for constraint in dom.getElementsByTagName(tag_name)
    ]
    reference_attrs = ("rsc", "first", "then", "with-rsc")
    for constraint in constraint_list:
        for attr_name in reference_attrs:
            if constraint.getAttribute(attr_name) == old_id:
                constraint.setAttribute(attr_name, new_id)
    return dom
1324