1 import sys
2 import xml.dom.minidom
3 from enum import Enum
4 from typing import (
5 Any,
6 Iterable,
7 Optional,
8 Set,
9 TypeVar,
10 cast,
11 )
12 from xml.dom.minidom import parseString
13
14 import pcs.cli.constraint_order.command as order_command
15 from pcs import utils
16 from pcs.cli.common import parse_args
17 from pcs.cli.common.errors import (
18 CmdLineInputError,
19 raise_command_replaced,
20 )
21 from pcs.cli.common.output import (
22 INDENT_STEP,
23 lines_to_str,
24 )
25 from pcs.cli.constraint.location.command import (
26 RESOURCE_TYPE_REGEXP,
27 RESOURCE_TYPE_RESOURCE,
28 )
29 from pcs.cli.constraint.output import (
30 CibConstraintLocationAnyDto,
31 filter_constraints_by_rule_expired_status,
32 location,
33 print_config,
34 )
35 from pcs.cli.reports import process_library_reports
36 from pcs.cli.reports.output import (
37 deprecation_warning,
38 print_to_stderr,
39 warn,
40 )
41 from pcs.common import (
42 const,
43 pacemaker,
44 reports,
45 )
46 from pcs.common.pacemaker.constraint import (
47 CibConstraintColocationSetDto,
48 CibConstraintLocationSetDto,
49 CibConstraintOrderSetDto,
50 CibConstraintsDto,
51 CibConstraintTicketSetDto,
52 get_all_constraints_ids,
53 )
54 from pcs.common.pacemaker.resource.list import CibResourcesDto
55 from pcs.common.pacemaker.types import CibResourceDiscovery
56 from pcs.common.reports import ReportItem
57 from pcs.common.str_tools import (
58 format_list,
59 indent,
60 )
61 from pcs.common.types import (
62 StringCollection,
63 StringIterable,
64 StringSequence,
65 )
66 from pcs.lib.cib.constraint.order import ATTRIB as order_attrib
67 from pcs.lib.node import get_existing_nodes_names
68 from pcs.lib.pacemaker.values import (
69 SCORE_INFINITY,
70 is_true,
71 sanitize_id,
72 )
73
74 # pylint: disable=invalid-name
75 # pylint: disable=too-many-branches
76 # pylint: disable=too-many-lines
77 # pylint: disable=too-many-locals
78 # pylint: disable=too-many-statements
79
# Action / role used when the command line does not specify one.
DEFAULT_ACTION = const.PCMK_ACTION_START
DEFAULT_ROLE = const.PCMK_ROLE_STARTED

# Values accepted for the "symmetrical" option of an order constraint.
OPTIONS_SYMMETRICAL = order_attrib["symmetrical"]

# Warning printed when the node existence check is skipped (-f or --force).
LOCATION_NODE_VALIDATION_SKIP_MSG = (
    "Validation for node existence in the cluster will be skipped"
)
# Deprecation text printed when a score is given as a bare positional value.
STANDALONE_SCORE_MSG = (
    "Specifying score as a standalone value is deprecated and "
    "might be removed in a future release, use score=value instead"
)
92
93
class CrmRuleReturnCode(Enum):
    """
    Numeric codes describing the state of a rule.

    NOTE(review): judging by the class name these are presumably exit codes
    of pacemaker's crm_rule tool -- confirm against its documentation.
    """

    IN_EFFECT = 0
    EXPIRED = 110
    TO_BE_IN_EFFECT = 111
98
99
def constraint_order_cmd(lib, argv, modifiers):
    """
    Route "constraint order ..." to the handler of the requested subcommand.

    Without arguments the "config" subcommand is used. A first argument which
    is not a known subcommand is treated as the beginning of an order
    definition and is handed to order_start together with the rest of argv.
    """
    if argv:
        sub_cmd = argv.pop(0)
    else:
        sub_cmd = "config"

    try:
        if sub_cmd == "set":
            order_command.create_with_set(lib, argv, modifiers)
        elif sub_cmd == "config":
            order_command.config_cmd(lib, argv, modifiers)
        elif sub_cmd == "show":
            # "show" has been replaced by "config"
            raise_command_replaced(
                ["pcs constraint order config"], pcs_version="0.12"
            )
        elif sub_cmd in ("remove", "delete"):
            order_rm(lib, argv, modifiers)
        else:
            order_start(lib, [sub_cmd, *argv], modifiers)
    except CmdLineInputError as e:
        utils.exit_on_cmdline_input_error(e, "constraint", ["order", sub_cmd])
118
119
def config_cmd(
    lib: Any, argv: list[str], modifiers: parse_args.InputModifiers
) -> None:
    """
    Print the constraint configuration returned by the library.

    No positional arguments are accepted. Supported modifiers are -f,
    --output-format, --full and --all.
    """
    modifiers.ensure_only_supported("-f", "--output-format", "--full", "--all")
    if argv:
        raise CmdLineInputError()

    constraints_dto = cast(
        CibConstraintsDto,
        lib.constraint.get_config(evaluate_rules=True),
    )
    print_config(constraints_dto, modifiers)
134
135
def _validate_constraint_resource(cib_dom, resource_id):
    """
    Report an error through utils.err when utils.validate_constraint_resource
    rejects resource_id.
    """
    is_valid, error_message, _ = utils.validate_constraint_resource(
        cib_dom, resource_id
    )
    if is_valid:
        return
    utils.err(error_message)
144
145
def _validate_resources_not_in_same_group(cib_dom, resource1, resource2):
    """
    Report an error through utils.err when
    utils.validate_resources_not_in_same_group fails for the two resources.
    """
    not_in_same_group = utils.validate_resources_not_in_same_group(
        cib_dom, resource1, resource2
    )
    if not_in_same_group:
        return
    utils.err(
        "Cannot create an order constraint for resources in the same group"
    )
153
154
155 # Syntax: colocation add [role] <src> with [role] <tgt> [score] [options]
156 # possible commands:
157 # <src> with <tgt> [score] [options]
158 # <src> with <role> <tgt> [score] [options]
159 # <role> <src> with <tgt> [score] [options]
160 # <role> <src> with <role> <tgt> [score] [options]
161 # Specifying score as a single argument is deprecated, though. The correct way
162 # is score=value in options.
def colocation_add(lib, argv, modifiers):  # noqa: PLR0912, PLR0915
    """
    Create a colocation constraint between two resources.

    argv is expected in the form
    "[role] <src> with [role] <tgt> [score] [options]". The new rsc_colocation
    element is appended to the constraints section and the configuration is
    pushed with utils.replace_cib_configuration.

    Options:
      * -f - CIB file
      * --force - allow constraint on any resource, allow duplicate constraints
    """

    def _parse_score_options(argv):
        # When passed an array of arguments if the first argument doesn't have
        # an '=' then it's the score, otherwise they're all arguments. Return a
        # tuple with the score and array of name,value pairs
        """
        Commandline options: no options
        """
        if not argv:
            return None, []
        score = None
        if "=" not in argv[0]:
            score = argv.pop(0)
            # TODO added to pcs in the first 0.12.x version
            deprecation_warning(STANDALONE_SCORE_MSG)

        # create a list of 2-tuples (name, value)
        arg_array = [
            parse_args.split_option(arg, allow_empty_value=False)
            for arg in argv
        ]
        return score, arg_array

    def _validate_and_prepare_role(new_roles_supported, role):
        # No role given -> empty string, which later means "do not set the
        # role attribute" unless the other side has a role.
        if role is None:
            return ""
        role_cleaned = role.lower().capitalize()
        if role_cleaned not in const.PCMK_ROLES:
            utils.err(
                "invalid role value '{0}', allowed values are: {1}".format(
                    role, format_list(const.PCMK_ROLES)
                )
            )
        return pacemaker.role.get_value_for_cib(
            role_cleaned, new_roles_supported
        )

    # The library is not used here, everything goes through utils.
    del lib
    modifiers.ensure_only_supported("-f", "--force")
    if len(argv) < 3:
        raise CmdLineInputError()

    role1_candidate = None
    role2_candidate = None

    # Position of the "with" keyword tells whether a source role was given.
    if argv[2] == "with":
        role1_candidate = argv.pop(0)
        resource1 = argv.pop(0)
    elif argv[1] == "with":
        resource1 = argv.pop(0)
    else:
        raise CmdLineInputError()

    if argv.pop(0) != "with":
        raise CmdLineInputError()
    if "with" in argv:
        raise CmdLineInputError(
            message="Multiple 'with's cannot be specified.",
            hint=(
                "Use the 'pcs constraint colocation set' command if you want "
                "to create a constraint for more than two resources."
            ),
            show_both_usage_and_message=True,
        )

    if not argv:
        raise CmdLineInputError()
    # If only one argument is left, or the one after it is a score / option,
    # the first one is the target resource. Otherwise it is the target role
    # followed by the target resource.
    if len(argv) == 1 or utils.is_score_or_opt(argv[1]):
        resource2 = argv.pop(0)
    else:
        role2_candidate = argv.pop(0)
        resource2 = argv.pop(0)

    score, nv_pairs = _parse_score_options(argv)
    influence_attr_set = any(name == "influence" for name, _ in nv_pairs)

    # The CIB is upgraded only when the "influence" option is used.
    cib_dom = (
        utils.cluster_upgrade_to_version(
            const.PCMK_COLOCATION_INFLUENCE_CIB_VERSION
        )
        if influence_attr_set
        else utils.get_cib_dom()
    )
    new_roles_supported = utils.isCibVersionSatisfied(
        cib_dom, const.PCMK_NEW_ROLES_CIB_VERSION
    )

    role1 = _validate_and_prepare_role(new_roles_supported, role1_candidate)
    role2 = _validate_and_prepare_role(new_roles_supported, role2_candidate)
    _validate_constraint_resource(cib_dom, resource1)
    _validate_constraint_resource(cib_dom, resource2)

    id_in_nvpairs = None
    for name, value in nv_pairs:
        if name == "id":
            id_valid, id_error = utils.validate_xml_id(value, "constraint id")
            if not id_valid:
                utils.err(id_error)
            if utils.does_id_exist(cib_dom, value):
                utils.err(
                    "id '%s' is already in use, please specify another one"
                    % value
                )
            id_in_nvpairs = True
        elif name == "score":
            # score=value overrides a standalone score
            score = value
    if score is None:
        score = SCORE_INFINITY
    if not id_in_nvpairs:
        # No id given by the user, generate a unique one.
        nv_pairs.append(
            (
                "id",
                utils.find_unique_id(
                    cib_dom,
                    "colocation-%s-%s-%s" % (resource1, resource2, score),
                ),
            )
        )

    (dom, constraintsElement) = getCurrentConstraints(cib_dom)

    # If one role is specified, the other should default to "started"
    if role1 != "" and role2 == "":
        role2 = DEFAULT_ROLE
    if role2 != "" and role1 == "":
        role1 = DEFAULT_ROLE
    element = dom.createElement("rsc_colocation")
    element.setAttribute("rsc", resource1)
    element.setAttribute("with-rsc", resource2)
    element.setAttribute("score", score)
    if role1 != "":
        element.setAttribute("rsc-role", role1)
    if role2 != "":
        element.setAttribute("with-rsc-role", role2)
    for nv_pair in nv_pairs:
        element.setAttribute(nv_pair[0], nv_pair[1])
    if not modifiers.get("--force"):

        def _constraint_export(constraint_info):
            # Build a one-line text description of a constraint from its
            # attributes; used only in the duplicate error message below.
            options_dict = constraint_info["options"]
            co_resource1 = options_dict.get("rsc", "")
            co_resource2 = options_dict.get("with-rsc", "")
            co_id = options_dict.get("id", "")
            co_score = options_dict.get("score", "")
            score_text = "(score:" + co_score + ")"
            console_option_list = [
                f"({option[0]}:{option[1]})"
                for option in sorted(options_dict.items())
                if option[0] not in ("rsc", "with-rsc", "id", "score")
            ]
            console_option_list.append(f"(id:{co_id})")
            return " ".join(
                [co_resource1, "with", co_resource2, score_text]
                + console_option_list
            )

        # The new element is not in the tree yet, so it cannot match itself.
        duplicates = colocation_find_duplicates(constraintsElement, element)
        if duplicates:
            utils.err(
                "duplicate constraint already exists, use --force to override\n"
                + "\n".join(
                    [
                        "  "
                        + _constraint_export(
                            {"options": dict(dup.attributes.items())}
                        )
                        for dup in duplicates
                    ]
                )
            )
    constraintsElement.appendChild(element)
    utils.replace_cib_configuration(dom)
341
342
def colocation_find_duplicates(dom, constraint_el):
    """
    Return rsc_colocation elements found in dom which have no resource_set
    and share resources and roles with constraint_el.

    A missing role is compared as DEFAULT_ROLE. constraint_el itself is never
    part of the result.

    Commandline options: no options
    """
    new_roles_supported = utils.isCibVersionSatisfied(
        dom, const.PCMK_NEW_ROLES_CIB_VERSION
    )

    def _role_for_cib(element, attr_name):
        role = element.getAttribute(attr_name).capitalize()
        if not role:
            role = DEFAULT_ROLE
        return pacemaker.role.get_value_for_cib(role, new_roles_supported)

    def _signature(element):
        return (
            element.getAttribute("rsc"),
            element.getAttribute("with-rsc"),
            _role_for_cib(element, "rsc-role"),
            _role_for_cib(element, "with-rsc-role"),
        )

    wanted = _signature(constraint_el)
    duplicates = []
    for candidate in dom.getElementsByTagName("rsc_colocation"):
        # constraints with resource sets are not compared
        if candidate.getElementsByTagName("resource_set"):
            continue
        if candidate is constraint_el:
            continue
        if wanted == _signature(candidate):
            duplicates.append(candidate)
    return duplicates
374
375
def order_rm(lib, argv, modifiers):
    """
    Remove the resources listed in argv from order constraints.

    Plain order constraints referencing a resource as "first" or "then" are
    removed entirely. In order constraints with resource sets only the
    reference to the resource is removed; sets and constraints left empty by
    that are removed as well. Other constraint types are not touched.

    An error is reported through utils.err if none of the resources is used
    in any order constraint.

    Options:
      * -f - CIB file
    """
    del lib
    modifiers.ensure_only_supported("-f")
    if not argv:
        raise CmdLineInputError()

    (dom, constraints_el) = getCurrentConstraints()

    element_found = False
    for resource in argv:
        if _remove_resource_from_order_constraints(constraints_el, resource):
            element_found = True

    if element_found:
        utils.replace_cib_configuration(dom)
    else:
        utils.err("No matching resources found in ordering list")


def _remove_resource_from_order_constraints(constraints_el, resource):
    """
    Drop every reference to resource from rsc_order elements.

    Return True if anything was removed from constraints_el, False otherwise.
    """
    found = False

    # Plain constraints: the resource is referenced by an attribute.
    for order_el in constraints_el.getElementsByTagName("rsc_order")[:]:
        if resource in (
            order_el.getAttribute("first"),
            order_el.getAttribute("then"),
        ):
            constraints_el.removeChild(order_el)
            found = True

    # Set constraints: look for resource_ref only inside rsc_order elements.
    # Searching the whole constraints section would also strip the resource
    # from colocation, location and ticket sets.
    refs_to_remove = [
        res_ref
        for order_el in constraints_el.getElementsByTagName("rsc_order")
        for res_ref in order_el.getElementsByTagName("resource_ref")
        if res_ref.getAttribute("id") == resource
    ]
    for res_ref in refs_to_remove:
        res_set = res_ref.parentNode
        res_order = res_set.parentNode

        res_set.removeChild(res_ref)
        found = True
        # Clean up containers which became empty.
        if not res_set.getElementsByTagName("resource_ref"):
            res_order.removeChild(res_set)
            if not res_order.getElementsByTagName("resource_set"):
                res_order.parentNode.removeChild(res_order)

    return found
418
419
def order_start(lib, argv, modifiers):
    """
    Parse "[action] <rsc1> then [action] <rsc2> [options]" and create the
    order constraint by calling _order_add.

    An action which is not specified defaults to DEFAULT_ACTION.

    Options:
      * -f - CIB file
      * --force - allow constraint for any resource, allow duplicate constraints
    """
    del lib
    modifiers.ensure_only_supported("-f", "--force")
    if len(argv) < 3:
        raise CmdLineInputError()

    # optional action of the first resource
    first_action = DEFAULT_ACTION
    if argv[0] in const.PCMK_ACTIONS:
        first_action = argv.pop(0)

    resource1 = argv.pop(0)
    keyword = argv.pop(0)
    if keyword != "then":
        raise CmdLineInputError()
    if not argv:
        raise CmdLineInputError()

    # optional action of the second resource
    then_action = DEFAULT_ACTION
    if argv[0] in const.PCMK_ACTIONS:
        then_action = argv.pop(0)

    if not argv:
        raise CmdLineInputError()
    resource2 = argv.pop(0)

    # whatever is left are options of the constraint
    order_options = list(argv)
    if "then" in order_options:
        raise CmdLineInputError(
            message="Multiple 'then's cannot be specified.",
            hint=(
                "Use the 'pcs constraint order set' command if you want to "
                "create a constraint for more than two resources."
            ),
            show_both_usage_and_message=True,
        )

    order_options.extend(
        ["first-action=" + first_action, "then-action=" + then_action]
    )
    _order_add(resource1, resource2, order_options, modifiers)
470
471
def _order_add(resource1, resource2, options_list, modifiers):  # noqa: PLR0912, PLR0915
    """
    Create an rsc_order constraint "resource1 then resource2".

    options_list holds "name=value" strings plus the bare keywords
    "symmetrical" / "nonsymmetrical". The new element is added to the
    constraints section and the configuration is pushed with
    utils.replace_cib_configuration.

    Commandline options:
      * -f - CIB file
      * --force - allow constraint for any resource, allow duplicate constraints
    """
    cib_dom = utils.get_cib_dom()
    _validate_constraint_resource(cib_dom, resource1)
    _validate_constraint_resource(cib_dom, resource2)

    _validate_resources_not_in_same_group(cib_dom, resource1, resource2)

    order_options = []
    id_specified = False
    sym = None
    for arg in options_list:
        # bare keywords are shortcuts for symmetrical=true / false
        if arg == "symmetrical":
            sym = "true"
        elif arg == "nonsymmetrical":
            sym = "false"
        else:
            name, value = parse_args.split_option(arg, allow_empty_value=False)
            if name == "id":
                id_valid, id_error = utils.validate_xml_id(
                    value, "constraint id"
                )
                if not id_valid:
                    utils.err(id_error)
                if utils.does_id_exist(cib_dom, value):
                    utils.err(
                        "id '%s' is already in use, please specify another one"
                        % value
                    )
                id_specified = True
                order_options.append((name, value))
            elif name == "symmetrical":
                if value.lower() in OPTIONS_SYMMETRICAL:
                    sym = value.lower()
                else:
                    utils.err(
                        "invalid symmetrical value '%s', allowed values are: %s"
                        % (value, ", ".join(OPTIONS_SYMMETRICAL))
                    )
            else:
                order_options.append((name, value))
    # "symmetrical" is stored once, the last occurrence wins
    if sym:
        order_options.append(("symmetrical", sym))

    # Text of the options for the message printed at the end; kind and score
    # are left out here as they are printed separately as scorekind.
    options = ""
    if order_options:
        options = " (Options: %s)" % " ".join(
            [
                "%s=%s" % (name, value)
                for name, value in order_options
                if name not in ("kind", "score")
            ]
        )

    # The first "score" or "kind" option found determines both the message
    # text and the suffix of a generated id.
    scorekind = "kind: Mandatory"
    id_suffix = "mandatory"
    for opt in order_options:
        if opt[0] == "score":
            scorekind = "score: " + opt[1]
            id_suffix = opt[1]
            # TODO deprecated in pacemaker 2, to be removed in pacemaker 3
            # added to pcs after 0.11.7
            deprecation_warning(
                reports.messages.DeprecatedOption(opt[0], []).message
            )
            break
        if opt[0] == "kind":
            scorekind = "kind: " + opt[1]
            id_suffix = opt[1]
            break

    if not id_specified:
        order_id = "order-" + resource1 + "-" + resource2 + "-" + id_suffix
        order_id = utils.find_unique_id(cib_dom, order_id)
        order_options.append(("id", order_id))

    (dom, constraintsElement) = getCurrentConstraints()
    element = dom.createElement("rsc_order")
    element.setAttribute("first", resource1)
    element.setAttribute("then", resource2)
    for order_opt in order_options:
        element.setAttribute(order_opt[0], order_opt[1])
    # The element is added before the duplicate check; order_find_duplicates
    # skips the element it is given.
    constraintsElement.appendChild(element)
    if not modifiers.get("--force"):

        def _constraint_export(constraint_info):
            # Build a one-line text description of an order constraint from
            # its attributes; used only in the duplicate error message below.
            options = constraint_info["options"]
            oc_resource1 = options.get("first", "")
            oc_resource2 = options.get("then", "")
            first_action = options.get("first-action", "")
            then_action = options.get("then-action", "")
            oc_id = options.get("id", "")
            oc_score = options.get("score", "")
            oc_kind = options.get("kind", "")
            oc_sym = ""
            oc_id_out = ""
            oc_options = ""
            if "symmetrical" in options and not is_true(
                options.get("symmetrical", "false")
            ):
                oc_sym = "(non-symmetrical)"
            # kind has priority over score; Mandatory is shown if neither is set
            if oc_kind != "":
                score_text = "(kind:" + oc_kind + ")"
            elif oc_kind == "" and oc_score == "":
                score_text = "(kind:Mandatory)"
            else:
                score_text = "(score:" + oc_score + ")"
            oc_id_out = "(id:" + oc_id + ")"
            already_processed_options = (
                "first",
                "then",
                "first-action",
                "then-action",
                "id",
                "score",
                "kind",
                "symmetrical",
            )
            oc_options = " ".join(
                [
                    f"{name}={value}"
                    for name, value in options.items()
                    if name not in already_processed_options
                ]
            )
            if oc_options:
                oc_options = "(Options: " + oc_options + ")"
            # empty parts are left out of the resulting text
            return " ".join(
                [
                    arg
                    for arg in [
                        first_action,
                        oc_resource1,
                        "then",
                        then_action,
                        oc_resource2,
                        score_text,
                        oc_sym,
                        oc_options,
                        oc_id_out,
                    ]
                    if arg
                ]
            )

        duplicates = order_find_duplicates(constraintsElement, element)
        if duplicates:
            utils.err(
                "duplicate constraint already exists, use --force to override\n"
                + "\n".join(
                    [
                        "  "
                        + _constraint_export(
                            {"options": dict(dup.attributes.items())}
                        )
                        for dup in duplicates
                    ]
                )
            )
    print_to_stderr(f"Adding {resource1} {resource2} ({scorekind}){options}")
    utils.replace_cib_configuration(dom)
637
638
def order_find_duplicates(dom, constraint_el):
    """
    Return rsc_order elements found in dom which have no resource_set and
    share resources and actions with constraint_el.

    Actions are compared lowercased, a missing action is compared as
    DEFAULT_ACTION. constraint_el itself is never part of the result.

    Commandline options: no options
    """

    def _action(element, attr_name):
        return element.getAttribute(attr_name).lower() or DEFAULT_ACTION

    def _signature(element):
        return (
            element.getAttribute("first"),
            element.getAttribute("then"),
            _action(element, "first-action"),
            _action(element, "then-action"),
        )

    wanted = _signature(constraint_el)
    duplicates = []
    for candidate in dom.getElementsByTagName("rsc_order"):
        # constraints with resource sets are not compared
        if candidate.getElementsByTagName("resource_set"):
            continue
        if candidate is constraint_el:
            continue
        if wanted == _signature(candidate):
            duplicates.append(candidate)
    return duplicates
661
662
# Type variable restricted to the DTOs of constraints with resource sets, so
# that a function taking one of these types returns the very same type.
_SetConstraint = TypeVar(
    "_SetConstraint",
    CibConstraintLocationSetDto,
    CibConstraintColocationSetDto,
    CibConstraintOrderSetDto,
    CibConstraintTicketSetDto,
)
670
671
def _filter_set_constraints_by_resources(
    constraints_dto: Iterable[_SetConstraint], resources: Set[str]
) -> list[_SetConstraint]:
    """
    Return the set constraints in which at least one resource set contains
    any of the given resources.
    """

    def _uses_any_resource(constraint: _SetConstraint) -> bool:
        for resource_set in constraint.resource_sets:
            if set(resource_set.resources_ids) & resources:
                return True
        return False

    return list(filter(_uses_any_resource, constraints_dto))
683
684
def _filter_constraints_by_resources(
    constraints_dto: CibConstraintsDto,
    resources: StringIterable,
    patterns: StringIterable,
) -> CibConstraintsDto:
    """
    Return a new DTO holding only constraints which reference any of the
    given resources.

    Plain location constraints are additionally kept when their resource
    pattern is one of patterns. Patterns are not applied to any other
    constraint type.
    """
    wanted_ids = set(resources)
    wanted_patterns = set(patterns)

    def _location_matches(location_dto) -> bool:
        if (
            location_dto.resource_id is not None
            and location_dto.resource_id in wanted_ids
        ):
            return True
        return (
            location_dto.resource_pattern is not None
            and location_dto.resource_pattern in wanted_patterns
        )

    def _sets_matching(set_constraints):
        return _filter_set_constraints_by_resources(set_constraints, wanted_ids)

    return CibConstraintsDto(
        location=[
            dto for dto in constraints_dto.location if _location_matches(dto)
        ],
        location_set=_sets_matching(constraints_dto.location_set),
        colocation=[
            dto
            for dto in constraints_dto.colocation
            if wanted_ids & {dto.resource_id, dto.with_resource_id}
        ],
        colocation_set=_sets_matching(constraints_dto.colocation_set),
        order=[
            dto
            for dto in constraints_dto.order
            if wanted_ids & {dto.first_resource_id, dto.then_resource_id}
        ],
        order_set=_sets_matching(constraints_dto.order_set),
        ticket=[
            dto
            for dto in constraints_dto.ticket
            if dto.resource_id in wanted_ids
        ],
        ticket_set=_sets_matching(constraints_dto.ticket_set),
    )
738
739
def _filter_location_by_node_base(
    constraint_dtos: Iterable[CibConstraintLocationAnyDto],
    nodes: StringCollection,
) -> list[CibConstraintLocationAnyDto]:
    """
    Return the location constraints whose node attribute is set and is one of
    the given nodes.
    """
    matching = []
    for location_dto in constraint_dtos:
        node = location_dto.attributes.node
        if node is None:
            continue
        if node in nodes:
            matching.append(location_dto)
    return matching
750
751
def location_config_cmd(
    lib: Any, argv: parse_args.Argv, modifiers: parse_args.InputModifiers
) -> None:
    """
    Print location constraints, optionally grouped and filtered.

    argv is either empty or "resources [<resource>...]" or "nodes [<node>...]".
    Grouping is available only with the text output format.

    Options:
      * --all - print expired constraints
      * --full - print all details
      * -f - CIB file
    """
    modifiers.ensure_only_supported("-f", "--output-format", "--full", "--all")
    filter_type: Optional[str] = None
    filter_items: parse_args.Argv = []
    if argv:
        filter_type = argv[0]
        filter_items = list(argv[1:])
        allowed_types = ("resources", "nodes")
        if filter_type not in allowed_types:
            raise CmdLineInputError(
                f"Unknown keyword '{filter_type}'. Allowed keywords: "
                f"{format_list(allowed_types)}"
            )
        if modifiers.get_output_format() != parse_args.OUTPUT_FORMAT_VALUE_TEXT:
            raise CmdLineInputError(
                "Output formats other than 'text' are not supported together "
                "with grouping and filtering by nodes or resources"
            )

    all_constraints = filter_constraints_by_rule_expired_status(
        lib.constraint.get_config(evaluate_rules=True),
        modifiers.is_specified("--all"),
    )
    # only location constraints are of interest here
    constraints_dto = CibConstraintsDto(
        location=all_constraints.location,
        location_set=all_constraints.location_set,
    )

    def _print_lines(lines: StringSequence) -> None:
        if not lines:
            return
        print("Location Constraints:")
        print(lines_to_str(indent(lines, indent_step=INDENT_STEP)))

    if filter_type == "resources":
        if filter_items:
            resources = []
            patterns = []
            for item in filter_items:
                item_type, item_value = parse_args.parse_typed_arg(
                    item,
                    [RESOURCE_TYPE_RESOURCE, RESOURCE_TYPE_REGEXP],
                    RESOURCE_TYPE_RESOURCE,
                )
                if item_type == RESOURCE_TYPE_RESOURCE:
                    resources.append(item_value)
                elif item_type == RESOURCE_TYPE_REGEXP:
                    patterns.append(item_value)
            constraints_dto = _filter_constraints_by_resources(
                constraints_dto, resources, patterns
            )
        grouped_lines = location.constraints_to_grouped_by_resource_text(
            constraints_dto.location,
            modifiers.is_specified("--full"),
        )
        _print_lines(grouped_lines)
        return

    if filter_type == "nodes":
        if filter_items:
            constraints_dto = CibConstraintsDto(
                location=_filter_location_by_node_base(
                    constraints_dto.location, filter_items
                ),
                location_set=_filter_location_by_node_base(
                    constraints_dto.location_set, filter_items
                ),
            )
        grouped_lines = location.constraints_to_grouped_by_node_text(
            constraints_dto.location,
            modifiers.is_specified("--full"),
        )
        _print_lines(grouped_lines)
        return

    # no grouping requested
    print_config(constraints_dto, modifiers)
836
837
838 def _verify_node_name(node, existing_nodes):
839 report_list = []
840 if node not in existing_nodes:
841 report_list.append(
842 ReportItem.error(
843 reports.messages.NodeNotFound(node),
844 force_code=reports.codes.FORCE,
845 )
846 )
847 return report_list
848
849
def _verify_score(score):
    """Report an error through utils.err if utils.is_score rejects score."""
    if utils.is_score(score):
        return
    utils.err(
        "invalid score '%s', use integer or INFINITY or -INFINITY" % score
    )
855
856
def location_prefer(  # noqa: PLR0912
    lib: Any, argv: parse_args.Argv, modifiers: parse_args.InputModifiers
) -> None:
    """
    Create location constraints from "<rsc> prefers|avoids <node>[=<score>]...".

    One constraint is created per node by calling location_add. For "avoids"
    the sign of the score is inverted. Without a score, INFINITY is used for
    "prefers" and -INFINITY for "avoids".

    Raises CmdLineInputError if the resource or the prefers / avoids keyword
    is missing or if the keyword is not recognized.

    Options:
      * --force - allow unknown options, allow constraint for any resource type
      * -f - CIB file
    """
    modifiers.ensure_only_supported("--force", "-f")
    # Both the resource and the keyword are mandatory; without this check the
    # pops below would fail with IndexError instead of a usage error.
    if len(argv) < 2:
        raise CmdLineInputError()
    rsc = argv.pop(0)
    prefer_option = argv.pop(0)

    # Only the resource value is needed here (for generated ids); the original
    # typed argument is passed on to location_add.
    dummy_rsc_type, rsc_value = parse_args.parse_typed_arg(
        rsc,
        [RESOURCE_TYPE_RESOURCE, RESOURCE_TYPE_REGEXP],
        RESOURCE_TYPE_RESOURCE,
    )

    if prefer_option == "prefers":
        prefer = True
    elif prefer_option == "avoids":
        prefer = False
    else:
        raise CmdLineInputError()

    skip_node_check = False
    existing_nodes: list[str] = []
    if modifiers.is_specified("-f") or modifiers.get("--force"):
        skip_node_check = True
        warn(LOCATION_NODE_VALIDATION_SKIP_MSG)
    else:
        lib_env = utils.get_lib_env()
        existing_nodes, report_list = get_existing_nodes_names(
            corosync_conf=lib_env.get_corosync_conf(),
            cib=lib_env.get_cib(),
        )
        if report_list:
            process_library_reports(report_list)

    report_list = []
    parameters_list = []
    for nodeconf in argv:
        nodeconf_a = nodeconf.split("=", 1)
        node = nodeconf_a[0]
        if not skip_node_check:
            report_list += _verify_node_name(node, existing_nodes)
        if len(nodeconf_a) == 1:
            score = "INFINITY" if prefer else "-INFINITY"
        else:
            score = nodeconf_a[1]
            _verify_score(score)
            if not prefer:
                # Invert the sign. An explicit "+" (in case the validation
                # above lets it through) has to be replaced, not prefixed,
                # otherwise the result would be an invalid "-+<value>".
                if score[0] == "-":
                    score = score[1:]
                elif score[0] == "+":
                    score = "-" + score[1:]
                else:
                    score = "-" + score

        parameters_list.append(
            [
                sanitize_id(f"location-{rsc_value}-{node}-{score}"),
                rsc,
                node,
                f"score={score}",
            ]
        )

    # All nodes are checked first so that every unknown node gets reported.
    if report_list:
        process_library_reports(report_list)

    modifiers = modifiers.get_subset("--force", "-f")

    # Nodes and scores have been verified above already.
    for parameters in parameters_list:
        location_add(lib, parameters, modifiers, skip_score_and_node_check=True)
927
928
def location_add(  # noqa: PLR0912, PLR0915
    lib: Any,
    argv: parse_args.Argv,
    modifiers: parse_args.InputModifiers,
    skip_score_and_node_check: bool = False,
) -> None:
    """
    Create a location constraint, replacing a matching existing one.

    argv is expected in the form "<id> <rsc> <node> [score] [options]" where
    the standalone score is deprecated in favor of score=value. An existing
    rsc_location with the same id, or with the same node and resource /
    resource pattern, is removed before the new one is added.

    skip_score_and_node_check -- do not verify the node and the score; used
        by location_prefer which does the verification itself

    Options:
      * --force - allow unknown options, allow constraint for any resource type
      * -f - CIB file
    """
    del lib
    modifiers.ensure_only_supported("--force", "-f")
    if len(argv) < 4:
        raise CmdLineInputError()

    constraint_id = argv.pop(0)
    rsc_type, rsc_value = parse_args.parse_typed_arg(
        argv.pop(0),
        [RESOURCE_TYPE_RESOURCE, RESOURCE_TYPE_REGEXP],
        RESOURCE_TYPE_RESOURCE,
    )
    node = argv.pop(0)
    score = None
    # An argument without "=" right after the node is a standalone score.
    if "=" not in argv[0]:
        score = argv.pop(0)
        # TODO added to pcs in the first 0.12.x version
        deprecation_warning(STANDALONE_SCORE_MSG)
    options = []
    # For now we only allow setting resource-discovery and score
    for arg in argv:
        name, value = parse_args.split_option(arg, allow_empty_value=False)
        if name == "score":
            # score=value overrides a standalone score
            score = value
        elif name == "resource-discovery":
            # the value is checked only when not forced
            if not modifiers.get("--force"):
                allowed_discovery = list(
                    map(
                        str,
                        [
                            CibResourceDiscovery.ALWAYS,
                            CibResourceDiscovery.EXCLUSIVE,
                            CibResourceDiscovery.NEVER,
                        ],
                    )
                )
                if value not in allowed_discovery:
                    utils.err(
                        (
                            "invalid {0} value '{1}', allowed values are: {2}"
                            ", use --force to override"
                        ).format(name, value, format_list(allowed_discovery))
                    )
            options.append([name, value])
        elif modifiers.get("--force"):
            # unknown options are accepted only when forced
            options.append([name, value])
        else:
            utils.err("bad option '%s', use --force to override" % name)
    if score is None:
        score = "INFINITY"

    # Verify that specified node exists in the cluster and score is valid
    if not skip_score_and_node_check:
        if modifiers.is_specified("-f") or modifiers.get("--force"):
            warn(LOCATION_NODE_VALIDATION_SKIP_MSG)
        else:
            lib_env = utils.get_lib_env()
            existing_nodes, report_list = get_existing_nodes_names(
                corosync_conf=lib_env.get_corosync_conf(),
                cib=lib_env.get_cib(),
            )
            report_list += _verify_node_name(node, existing_nodes)
            if report_list:
                process_library_reports(report_list)
        _verify_score(score)

    id_valid, id_error = utils.validate_xml_id(constraint_id, "constraint id")
    if not id_valid:
        utils.err(id_error)

    dom = utils.get_cib_dom()

    # Resource patterns are not validated, only resource ids are.
    if rsc_type == RESOURCE_TYPE_RESOURCE:
        (
            rsc_valid,
            rsc_error,
            dummy_correct_id,
        ) = utils.validate_constraint_resource(dom, rsc_value)
        if not rsc_valid:
            utils.err(rsc_error)

    # Verify current constraint doesn't already exist
    # If it does we replace it with the new constraint
    dummy_dom, constraintsElement = getCurrentConstraints(dom)
    # If the id matches, or the rsc & node match, then we replace/remove
    elementsToRemove = [
        rsc_loc
        for rsc_loc in constraintsElement.getElementsByTagName("rsc_location")
        # pylint: disable=too-many-boolean-expressions
        if rsc_loc.getAttribute("id") == constraint_id
        or (
            rsc_loc.getAttribute("node") == node
            and (
                (
                    rsc_type == RESOURCE_TYPE_RESOURCE
                    and rsc_loc.getAttribute("rsc") == rsc_value
                )
                or (
                    rsc_type == RESOURCE_TYPE_REGEXP
                    and rsc_loc.getAttribute("rsc-pattern") == rsc_value
                )
            )
        )
    ]
    for etr in elementsToRemove:
        constraintsElement.removeChild(etr)

    element = dom.createElement("rsc_location")
    element.setAttribute("id", constraint_id)
    if rsc_type == RESOURCE_TYPE_RESOURCE:
        element.setAttribute("rsc", rsc_value)
    elif rsc_type == RESOURCE_TYPE_REGEXP:
        element.setAttribute("rsc-pattern", rsc_value)
    element.setAttribute("node", node)
    element.setAttribute("score", score)
    for option in options:
        element.setAttribute(option[0], option[1])
    constraintsElement.appendChild(element)

    utils.replace_cib_configuration(dom)
1059
1060
1061 # Grabs the current constraints and returns the dom and constraint element
def getCurrentConstraints(passed_dom=None):
    """
    Return a tuple (dom, constraints element).

    If passed_dom is given, it is used as the dom. Otherwise the constraints
    section is obtained with utils.get_cib_xpath and parsed into a new dom.

    Commandline options:
      * -f - CIB file, only if passed_dom is None
    """
    if passed_dom:
        dom = passed_dom
    else:
        current_constraints_xml = utils.get_cib_xpath("//constraints")
        if current_constraints_xml == "":
            utils.err("unable to process cib")
        # NOTE(security): static analysis flags this call because Python's
        # built-in xml module does not protect against maliciously constructed
        # XML (entity attacks). The suggested remediation is a hardened parser
        # such as defusedxml; that is a third-party package, so the parser is
        # left unchanged here. The parsed text is whatever utils.get_cib_xpath
        # returns -- review whether it can ever hold untrusted data.
        dom = parseString(current_constraints_xml)

    constraintsElement = dom.getElementsByTagName("constraints")[0]
    return (dom, constraintsElement)
1079
1080
1081 # If returnStatus is set, then we don't error out, we just print the error
1082 # and return false
def constraint_rm(  # noqa: PLR0912
    lib,
    argv,
    modifiers,
    returnStatus=False,
    constraintsElement=None,
    passed_dom=None,
):
    """
    Remove constraints (or constraint rules) with the ids given in argv.

    With more than one id the function calls itself for each id with
    returnStatus=True and exits with code 1 if any of the calls failed.

    For a single id, a direct child of the constraints element with that id
    is removed. If there is none, a rule with that id is removed, together
    with its parent element if no rule is left in it.

    returnStatus -- return True / False instead of exiting when the id is
        not found
    constraintsElement -- element to search in; if not given, it is obtained
        by getCurrentConstraints(passed_dom)
    passed_dom -- if given, the dom is returned on success instead of being
        pushed to the cluster

    Options:
      * -f - CIB file, effective only if passed_dom is None
    """
    if passed_dom is None:
        modifiers.ensure_only_supported("-f")
    if not argv:
        raise CmdLineInputError()

    bad_constraint = False
    if len(argv) != 1:
        # Process the ids one by one so that all missing ids get reported.
        for arg in argv:
            if not constraint_rm(
                lib, [arg], modifiers, returnStatus=True, passed_dom=passed_dom
            ):
                bad_constraint = True
        if bad_constraint:
            sys.exit(1)
        return None

    c_id = argv.pop(0)
    elementFound = False
    dom = None
    use_cibadmin = False
    # NOTE(review): dom stays None when the caller supplies
    # constraintsElement, so "return dom" below returns None in that case.
    if not constraintsElement:
        (dom, constraintsElement) = getCurrentConstraints(passed_dom)
        use_cibadmin = True

    # Iterate over a copy since children are removed inside the loop.
    for co in constraintsElement.childNodes[:]:
        if co.nodeType != xml.dom.Node.ELEMENT_NODE:
            continue
        if co.getAttribute("id") == c_id:
            constraintsElement.removeChild(co)
            elementFound = True

    # No constraint with such id, try rules.
    if not elementFound:
        for rule in constraintsElement.getElementsByTagName("rule")[:]:
            if rule.getAttribute("id") == c_id:
                elementFound = True
                parent = rule.parentNode
                parent.removeChild(rule)
                # remove the parent as well if this was its last rule
                if not parent.getElementsByTagName("rule"):
                    parent.parentNode.removeChild(parent)

    if elementFound:
        if passed_dom:
            return dom
        if use_cibadmin:
            utils.replace_cib_configuration(dom)
        if returnStatus:
            return True
    else:
        # presumably the second argument keeps utils.err from exiting, the
        # exit is handled right below -- verify against utils.err
        utils.err("Unable to find constraint - '%s'" % c_id, False)
        if returnStatus:
            return False
        sys.exit(1)
    return None
1148
1149
def _split_set_constraints(
    constraints_dto: CibConstraintsDto,
) -> tuple[CibConstraintsDto, CibConstraintsDto]:
    """
    Split constraints into two DTOs: one without sets and one with sets only.

    constraints_dto -- constraints to be split

    The first DTO holds only the location, colocation, order and ticket
    constraints, the second one only their *_set counterparts. Fields not
    passed to a DTO keep the default value defined by CibConstraintsDto.
    """
    plain_constraints = CibConstraintsDto(
        location=constraints_dto.location,
        colocation=constraints_dto.colocation,
        order=constraints_dto.order,
        ticket=constraints_dto.ticket,
    )
    constraints_with_sets = CibConstraintsDto(
        location_set=constraints_dto.location_set,
        colocation_set=constraints_dto.colocation_set,
        order_set=constraints_dto.order_set,
        ticket_set=constraints_dto.ticket_set,
    )
    return (plain_constraints, constraints_with_sets)
1167
1168
def _find_constraints_containing_resource(
    resources_dto: CibResourcesDto,
    constraints_dto: CibConstraintsDto,
    resource_id: str,
) -> CibConstraintsDto:
    """
    Filter constraints, keep those matching the resource or its parent clone.

    resources_dto -- configured resources, used to look up the parent clone
    constraints_dto -- constraints to be filtered
    resource_id -- id of the resource to look for
    """
    wanted_ids = [resource_id]
    # Original implementation only included parent resource only if resource_id
    # was referring to a primitive resource, ignoring groups. This may change in
    # the future if necessary.
    is_primitive = any(
        primitive_dto.id == resource_id
        for primitive_dto in resources_dto.primitives
    )
    if is_primitive:
        # Only the first clone wrapping the primitive is taken into account.
        parent_clone = next(
            (
                clone_dto
                for clone_dto in resources_dto.clones
                if clone_dto.member_id == resource_id
            ),
            None,
        )
        if parent_clone is not None:
            wanted_ids.append(parent_clone.id)
    return _filter_constraints_by_resources(constraints_dto, wanted_ids, [])
1189
1190
def ref(
    lib: Any, argv: list[str], modifiers: parse_args.InputModifiers
) -> None:
    """
    Print ids of constraints referencing each of the specified resources.

    lib -- library wrapper providing resources and constraints
    argv -- resource ids; duplicates are ignored, output is sorted by id
    modifiers -- command line modifiers

    Options:
      * -f - CIB file
    """
    modifiers.ensure_only_supported("-f")
    if not argv:
        raise CmdLineInputError()

    resources_dto = cast(
        CibResourcesDto, lib.resource.get_configured_resources()
    )
    constraints_dto = cast(
        CibConstraintsDto,
        lib.constraint.get_config(evaluate_rules=False),
    )

    for resource_id in sorted(set(argv)):
        matching_dto = _find_constraints_containing_resource(
            resources_dto, constraints_dto, resource_id
        )
        constraint_ids = get_all_constraints_ids(matching_dto)
        print(f"Resource: {resource_id}")
        if not constraint_ids:
            print(" No Matches")
            continue
        id_lines = indent(sorted(constraint_ids), indent_step=INDENT_STEP)
        print("\n".join(id_lines))
1225
1226
def remove_constraints_containing(
    resource_id: str, output=False, constraints_element=None, passed_dom=None
):
    """
    Remove the specified resource from all constraints referencing it.

    Constraints without resource sets are removed by constraint_rm.
    In constraints with resource sets, references to the resource are removed
    from the sets; a set left with no resources is removed and so is
    a constraint left with no sets.

    resource_id -- id of the resource whose constraints are to be removed
    output -- if set, print what is being removed to stderr
    constraints_element -- constraints element handed over to constraint_rm
    passed_dom -- DOM to work with instead of loading the CIB

    The DOM is returned only if constraints with resource sets referencing
    the resource were found and passed_dom is set. None is returned otherwise.

    Commandline options:
      * -f - CIB file, effective only if passed_dom is None
    """
    lib = utils.get_library_wrapper()
    modifiers = utils.get_input_modifiers()
    resources_dto = cast(
        CibResourcesDto, lib.resource.get_configured_resources()
    )
    plain_dto, with_sets_dto = _split_set_constraints(
        cast(
            CibConstraintsDto,
            lib.constraint.get_config(evaluate_rules=False),
        )
    )

    def _ids_referencing_resource(constraints_dto):
        # sorted ids of constraints from the dto referencing the resource
        return sorted(
            get_all_constraints_ids(
                _find_constraints_containing_resource(
                    resources_dto, constraints_dto, resource_id
                )
            )
        )

    constraints = _ids_referencing_resource(plain_dto)
    set_constraints = _ids_referencing_resource(with_sets_dto)

    for constraint_id in constraints:
        if output:
            print_to_stderr(f"Removing Constraint - {constraint_id}")
        if constraints_element is None:
            constraint_rm(
                lib, [constraint_id], modifiers, passed_dom=passed_dom
            )
        else:
            constraint_rm(
                lib,
                [constraint_id],
                modifiers,
                returnStatus=True,
                constraintsElement=constraints_element,
                passed_dom=passed_dom,
            )

    if not set_constraints:
        return None

    (dom, constraintsElement) = getCurrentConstraints(passed_dom)
    for rsc_ref in constraintsElement.getElementsByTagName("resource_ref")[:]:
        if rsc_ref.getAttribute("id") != resource_id:
            continue
        # Remove the resource from the set.
        rsc_set = rsc_ref.parentNode
        rsc_set.removeChild(rsc_ref)
        if output:
            print_to_stderr(
                "Removing {} from set {}".format(
                    resource_id, rsc_set.getAttribute("id")
                )
            )
        if rsc_set.getElementsByTagName("resource_ref"):
            continue
        # The set is empty now, remove it.
        # NOTE(review): the two messages below are printed even if output is
        # not set, unlike the messages above -- confirm this is intended.
        print_to_stderr("Removing set {}".format(rsc_set.getAttribute("id")))
        set_owner = rsc_set.parentNode
        set_owner.removeChild(rsc_set)
        if set_owner.getElementsByTagName("resource_set"):
            continue
        # The constraint has no sets left, remove it as well.
        set_owner.parentNode.removeChild(set_owner)
        print_to_stderr(
            "Removing constraint {}".format(set_owner.getAttribute("id"))
        )

    if passed_dom:
        return dom
    utils.replace_cib_configuration(dom)
    return None
1312
1313
1314 # Re-assign any constraints referencing a resource to its parent (a clone
1315 # or master)
def constraint_resource_update(old_id, dom):
    """
    Point constraints referencing a resource to its clone / master parent.

    old_id -- id of the resource currently referenced by constraints
    dom -- DOM to be updated in place; the very same object is returned

    Nothing is changed if utils.dom_get_resource_clone_ms_parent finds no
    parent of the resource or the parent has no id.

    Commandline options: no options
    """
    clone_ms_parent = utils.dom_get_resource_clone_ms_parent(dom, old_id)
    new_id = clone_ms_parent.getAttribute("id") if clone_ms_parent else None
    if not new_id:
        return dom

    constraint_tags = ("rsc_location", "rsc_order", "rsc_colocation")
    # attributes of constraints which may hold an id of a resource
    reference_attrs = ("rsc", "first", "then", "with-rsc")
    for tag in constraint_tags:
        for constraint in dom.getElementsByTagName(tag):
            for attr in reference_attrs:
                if constraint.getAttribute(attr) == old_id:
                    constraint.setAttribute(attr, new_id)
    return dom
1335