1 import sys
2 import xml.dom.minidom
3 from enum import Enum
4 from typing import (
5 Any,
6 Iterable,
7 Optional,
8 Set,
9 TypeVar,
10 cast,
11 )
12 from xml.dom.minidom import parseString
13
14 import pcs.cli.constraint_order.command as order_command
15 from pcs import rule as rule_utils
16 from pcs import utils
17 from pcs.cli.common import parse_args
18 from pcs.cli.common.errors import CmdLineInputError
19 from pcs.cli.common.output import (
20 INDENT_STEP,
21 lines_to_str,
22 )
23 from pcs.cli.constraint.location import command as location_command
24 from pcs.cli.constraint.output import (
25 CibConstraintLocationAnyDto,
26 filter_constraints_by_rule_expired_status,
27 location,
28 print_config,
29 )
30 from pcs.cli.reports import process_library_reports
31 from pcs.cli.reports.output import (
32 deprecation_warning,
33 print_to_stderr,
34 warn,
35 )
36 from pcs.common import (
37 const,
38 pacemaker,
39 reports,
40 )
41 from pcs.common.pacemaker.constraint import (
42 CibConstraintColocationSetDto,
43 CibConstraintLocationSetDto,
44 CibConstraintOrderSetDto,
45 CibConstraintsDto,
46 CibConstraintTicketSetDto,
47 get_all_constraints_ids,
48 )
49 from pcs.common.pacemaker.resource.list import CibResourcesDto
50 from pcs.common.pacemaker.types import CibResourceDiscovery
51 from pcs.common.reports import ReportItem
52 from pcs.common.str_tools import (
53 format_list,
54 indent,
55 )
56 from pcs.common.types import (
57 StringCollection,
58 StringIterable,
59 StringSequence,
60 )
61 from pcs.lib.cib.constraint.order import ATTRIB as order_attrib
62 from pcs.lib.node import get_existing_nodes_names
63 from pcs.lib.pacemaker.values import (
64 SCORE_INFINITY,
65 is_true,
66 sanitize_id,
67 )
68
69 # pylint: disable=invalid-name
70 # pylint: disable=too-many-branches
71 # pylint: disable=too-many-lines
72 # pylint: disable=too-many-locals
73 # pylint: disable=too-many-nested-blocks
74 # pylint: disable=too-many-statements
75
76 DEFAULT_ACTION = const.PCMK_ACTION_START
77 DEFAULT_ROLE = const.PCMK_ROLE_STARTED
78
79 OPTIONS_SYMMETRICAL = order_attrib["symmetrical"]
80
81 LOCATION_NODE_VALIDATION_SKIP_MSG = (
82 "Validation for node existence in the cluster will be skipped"
83 )
84
85 RESOURCE_TYPE_RESOURCE = "resource"
86 RESOURCE_TYPE_REGEXP = "regexp"
87
88
89 class CrmRuleReturnCode(Enum):
90 IN_EFFECT = 0
91 EXPIRED = 110
92 TO_BE_IN_EFFECT = 111
93
94
95 def constraint_location_cmd(lib, argv, modifiers):
96 sub_cmd = "config" if not argv else argv.pop(0)
97
98 try:
99 if sub_cmd == "add":
100 location_add(lib, argv, modifiers)
101 elif sub_cmd in ["remove", "delete"]:
102 location_command.remove(lib, argv, modifiers)
103 elif sub_cmd == "show":
104 location_show(lib, argv, modifiers)
105 elif sub_cmd == "config":
106 location_config_cmd(lib, argv, modifiers)
107 elif len(argv) >= 2:
108 if argv[0] == "rule":
109 location_rule(lib, [sub_cmd] + argv, modifiers)
110 else:
111 location_prefer(lib, [sub_cmd] + argv, modifiers)
112 else:
113 raise CmdLineInputError()
114 except CmdLineInputError as e:
115 utils.exit_on_cmdline_input_error(
116 e, "constraint", ["location", sub_cmd]
117 )
118
119
120 def constraint_order_cmd(lib, argv, modifiers):
121 sub_cmd = "config" if not argv else argv.pop(0)
122
123 try:
124 if sub_cmd == "set":
125 order_command.create_with_set(lib, argv, modifiers)
126 elif sub_cmd in ["remove", "delete"]:
127 order_rm(lib, argv, modifiers)
128 elif sub_cmd == "show":
129 order_command.show(lib, argv, modifiers)
130 elif sub_cmd == "config":
131 order_command.config_cmd(lib, argv, modifiers)
132 else:
133 order_start(lib, [sub_cmd] + argv, modifiers)
134 except CmdLineInputError as e:
135 utils.exit_on_cmdline_input_error(e, "constraint", ["order", sub_cmd])
136
137
138 def constraint_show(lib, argv, modifiers):
139 deprecation_warning(
140 "This command is deprecated and will be removed. "
141 "Please use 'pcs constraint config' instead."
142 )
143 config_cmd(lib, argv, modifiers)
144
145
146 def config_cmd(
147 lib: Any, argv: list[str], modifiers: parse_args.InputModifiers
148 ) -> None:
149 modifiers.ensure_only_supported("-f", "--output-format", "--full", "--all")
150 if argv:
151 raise CmdLineInputError()
152
153 print_config(
154 cast(
155 CibConstraintsDto,
156 lib.constraint.get_config(evaluate_rules=True),
157 ),
158 modifiers,
159 )
160
161
162 def _validate_constraint_resource(cib_dom, resource_id):
163 (
164 resource_valid,
165 resource_error,
166 dummy_correct_id,
167 ) = utils.validate_constraint_resource(cib_dom, resource_id)
168 if not resource_valid:
169 utils.err(resource_error)
170
171
172 def _validate_resources_not_in_same_group(cib_dom, resource1, resource2):
173 if not utils.validate_resources_not_in_same_group(
174 cib_dom, resource1, resource2
175 ):
176 utils.err(
177 "Cannot create an order constraint for resources in the same group"
178 )
179
180
181 # Syntax: colocation add [role] <src> with [role] <tgt> [score] [options]
182 # possible commands:
183 # <src> with <tgt> [score] [options]
184 # <src> with <role> <tgt> [score] [options]
185 # <role> <src> with <tgt> [score] [options]
186 # <role> <src> with <role> <tgt> [score] [options]
187 def colocation_add(lib, argv, modifiers): # noqa: PLR0912, PLR0915
188 """
189 Options:
190 * -f - CIB file
191 * --force - allow constraint on any resource, allow duplicate constraints
192 """
193
194 def _parse_score_options(argv):
195 # When passed an array of arguments if the first argument doesn't have
196 # an '=' then it's the score, otherwise they're all arguments. Return a
197 # tuple with the score and array of name,value pairs
198 """
199 Commandline options: no options
200 """
201 if not argv:
202 return SCORE_INFINITY, []
203 score = SCORE_INFINITY if "=" in argv[0] else argv.pop(0)
204 # create a list of 2-tuples (name, value)
205 arg_array = [
206 parse_args.split_option(arg, allow_empty_value=False)
207 for arg in argv
208 ]
209 return score, arg_array
210
211 del lib
212 modifiers.ensure_only_supported("-f", "--force")
213 if len(argv) < 3:
214 raise CmdLineInputError()
215
216 role1 = ""
217 role2 = ""
218
219 cib_dom = utils.get_cib_dom()
220 new_roles_supported = utils.isCibVersionSatisfied(
221 cib_dom, const.PCMK_NEW_ROLES_CIB_VERSION
222 )
223
224 def _validate_and_prepare_role(role):
225 role_cleaned = role.lower().capitalize()
226 if role_cleaned not in const.PCMK_ROLES:
227 utils.err(
228 "invalid role value '{0}', allowed values are: {1}".format(
229 role, format_list(const.PCMK_ROLES)
230 )
231 )
232 utils.print_deprecation_warning_for_legacy_roles(role)
233 return pacemaker.role.get_value_for_cib(
234 role_cleaned, new_roles_supported
235 )
236
237 if argv[2] == "with":
238 role1 = _validate_and_prepare_role(argv.pop(0))
239 resource1 = argv.pop(0)
240 elif argv[1] == "with":
241 resource1 = argv.pop(0)
242 else:
243 raise CmdLineInputError()
244
245 if argv.pop(0) != "with":
246 raise CmdLineInputError()
247 if "with" in argv:
248 raise CmdLineInputError(
249 message="Multiple 'with's cannot be specified.",
250 hint=(
251 "Use the 'pcs constraint colocation set' command if you want "
252 "to create a constraint for more than two resources."
253 ),
254 show_both_usage_and_message=True,
255 )
256
257 if not argv:
258 raise CmdLineInputError()
259 if len(argv) == 1 or utils.is_score_or_opt(argv[1]):
260 resource2 = argv.pop(0)
261 else:
262 role2 = _validate_and_prepare_role(argv.pop(0))
263 resource2 = argv.pop(0)
264
265 score, nv_pairs = _parse_score_options(argv)
266
267 _validate_constraint_resource(cib_dom, resource1)
268 _validate_constraint_resource(cib_dom, resource2)
269
270 id_in_nvpairs = None
271 for name, value in nv_pairs:
272 if name == "id":
273 id_valid, id_error = utils.validate_xml_id(value, "constraint id")
274 if not id_valid:
275 utils.err(id_error)
276 if utils.does_id_exist(cib_dom, value):
277 utils.err(
278 "id '%s' is already in use, please specify another one"
279 % value
280 )
281 id_in_nvpairs = True
282 if not id_in_nvpairs:
283 nv_pairs.append(
284 (
285 "id",
286 utils.find_unique_id(
287 cib_dom,
288 "colocation-%s-%s-%s" % (resource1, resource2, score),
289 ),
290 )
291 )
292
293 (dom, constraintsElement) = getCurrentConstraints(cib_dom)
294
295 # If one role is specified, the other should default to "started"
296 if role1 != "" and role2 == "":
297 role2 = DEFAULT_ROLE
298 if role2 != "" and role1 == "":
299 role1 = DEFAULT_ROLE
300 element = dom.createElement("rsc_colocation")
301 element.setAttribute("rsc", resource1)
302 element.setAttribute("with-rsc", resource2)
303 element.setAttribute("score", score)
304 if role1 != "":
305 element.setAttribute("rsc-role", role1)
306 if role2 != "":
307 element.setAttribute("with-rsc-role", role2)
308 for nv_pair in nv_pairs:
309 element.setAttribute(nv_pair[0], nv_pair[1])
310 if not modifiers.get("--force"):
311
312 def _constraint_export(constraint_info):
313 options_dict = constraint_info["options"]
314 co_resource1 = options_dict.get("rsc", "")
315 co_resource2 = options_dict.get("with-rsc", "")
316 co_id = options_dict.get("id", "")
317 co_score = options_dict.get("score", "")
318 score_text = "(score:" + co_score + ")"
319 console_option_list = [
320 f"({option[0]}:{option[1]})"
321 for option in sorted(options_dict.items())
322 if option[0] not in ("rsc", "with-rsc", "id", "score")
323 ]
324 console_option_list.append(f"(id:{co_id})")
325 return " ".join(
326 [co_resource1, "with", co_resource2, score_text]
327 + console_option_list
328 )
329
330 duplicates = colocation_find_duplicates(constraintsElement, element)
331 if duplicates:
332 utils.err(
333 "duplicate constraint already exists, use --force to override\n"
334 + "\n".join(
335 [
336 " "
337 + _constraint_export(
338 {"options": dict(dup.attributes.items())}
339 )
340 for dup in duplicates
341 ]
342 )
343 )
344 constraintsElement.appendChild(element)
345 utils.replace_cib_configuration(dom)
346
347
348 def colocation_find_duplicates(dom, constraint_el):
349 """
350 Commandline options: no options
351 """
352 new_roles_supported = utils.isCibVersionSatisfied(
353 dom, const.PCMK_NEW_ROLES_CIB_VERSION
354 )
355
356 def normalize(const_el):
357 return (
358 const_el.getAttribute("rsc"),
359 const_el.getAttribute("with-rsc"),
360 pacemaker.role.get_value_for_cib(
361 const_el.getAttribute("rsc-role").capitalize() or DEFAULT_ROLE,
362 new_roles_supported,
363 ),
364 pacemaker.role.get_value_for_cib(
365 const_el.getAttribute("with-rsc-role").capitalize()
366 or DEFAULT_ROLE,
367 new_roles_supported,
368 ),
369 )
370
371 normalized_el = normalize(constraint_el)
372 return [
373 other_el
374 for other_el in dom.getElementsByTagName("rsc_colocation")
375 if not other_el.getElementsByTagName("resource_set")
376 and constraint_el is not other_el
377 and normalized_el == normalize(other_el)
378 ]
379
380
381 def order_rm(lib, argv, modifiers):
382 """
383 Options:
384 * -f - CIB file
385 """
386 del lib
387 modifiers.ensure_only_supported("-f")
388 if not argv:
389 raise CmdLineInputError()
390
391 elementFound = False
392 (dom, constraintsElement) = getCurrentConstraints()
393
394 for resource in argv:
395 for ord_loc in constraintsElement.getElementsByTagName("rsc_order")[:]:
396 if (
397 ord_loc.getAttribute("first") == resource
398 or ord_loc.getAttribute("then") == resource
399 ):
400 constraintsElement.removeChild(ord_loc)
401 elementFound = True
402
403 resource_refs_to_remove = []
404 for ord_set in constraintsElement.getElementsByTagName("resource_ref"):
405 if ord_set.getAttribute("id") == resource:
406 resource_refs_to_remove.append(ord_set)
407 elementFound = True
408
409 for res_ref in resource_refs_to_remove:
410 res_set = res_ref.parentNode
411 res_order = res_set.parentNode
412
413 res_ref.parentNode.removeChild(res_ref)
414 if not res_set.getElementsByTagName("resource_ref"):
415 res_set.parentNode.removeChild(res_set)
416 if not res_order.getElementsByTagName("resource_set"):
417 res_order.parentNode.removeChild(res_order)
418
419 if elementFound:
420 utils.replace_cib_configuration(dom)
421 else:
422 utils.err("No matching resources found in ordering list")
423
424
425 def order_start(lib, argv, modifiers):
426 """
427 Options:
428 * -f - CIB file
429 * --force - allow constraint for any resource, allow duplicate constraints
430 """
431 del lib
432 modifiers.ensure_only_supported("-f", "--force")
433 if len(argv) < 3:
434 raise CmdLineInputError()
435
436 first_action = DEFAULT_ACTION
437 then_action = DEFAULT_ACTION
438 action = argv[0]
439 if action in const.PCMK_ACTIONS:
440 first_action = action
441 argv.pop(0)
442
443 resource1 = argv.pop(0)
444 if argv.pop(0) != "then":
445 raise CmdLineInputError()
446
447 if not argv:
448 raise CmdLineInputError()
449
450 action = argv[0]
451 if action in const.PCMK_ACTIONS:
452 then_action = action
453 argv.pop(0)
454
455 if not argv:
456 raise CmdLineInputError()
457 resource2 = argv.pop(0)
458
459 order_options = []
460 if argv:
461 order_options = order_options + argv[:]
462 if "then" in order_options:
463 raise CmdLineInputError(
464 message="Multiple 'then's cannot be specified.",
465 hint=(
466 "Use the 'pcs constraint order set' command if you want to "
467 "create a constraint for more than two resources."
468 ),
469 show_both_usage_and_message=True,
470 )
471
472 order_options.append("first-action=" + first_action)
473 order_options.append("then-action=" + then_action)
474 _order_add(resource1, resource2, order_options, modifiers)
475
476
477 def _order_add(resource1, resource2, options_list, modifiers): # noqa: PLR0912, PLR0915
478 """
479 Commandline options:
480 * -f - CIB file
481 * --force - allow constraint for any resource, allow duplicate constraints
482 """
483 cib_dom = utils.get_cib_dom()
484 _validate_constraint_resource(cib_dom, resource1)
485 _validate_constraint_resource(cib_dom, resource2)
486
487 _validate_resources_not_in_same_group(cib_dom, resource1, resource2)
488
489 order_options = []
490 id_specified = False
491 sym = None
492 for arg in options_list:
493 if arg == "symmetrical":
494 sym = "true"
495 elif arg == "nonsymmetrical":
496 sym = "false"
497 else:
498 name, value = parse_args.split_option(arg, allow_empty_value=False)
499 if name == "id":
500 id_valid, id_error = utils.validate_xml_id(
501 value, "constraint id"
502 )
503 if not id_valid:
504 utils.err(id_error)
505 if utils.does_id_exist(cib_dom, value):
506 utils.err(
507 "id '%s' is already in use, please specify another one"
508 % value
509 )
510 id_specified = True
511 order_options.append((name, value))
512 elif name == "symmetrical":
513 if value.lower() in OPTIONS_SYMMETRICAL:
514 sym = value.lower()
515 else:
516 utils.err(
517 "invalid symmetrical value '%s', allowed values are: %s"
518 % (value, ", ".join(OPTIONS_SYMMETRICAL))
519 )
520 else:
521 order_options.append((name, value))
522 if sym:
523 order_options.append(("symmetrical", sym))
524
525 options = ""
526 if order_options:
527 options = " (Options: %s)" % " ".join(
528 [
529 "%s=%s" % (name, value)
530 for name, value in order_options
531 if name not in ("kind", "score")
532 ]
533 )
534
535 scorekind = "kind: Mandatory"
536 id_suffix = "mandatory"
537 for opt in order_options:
538 if opt[0] == "score":
539 scorekind = "score: " + opt[1]
540 id_suffix = opt[1]
541 # TODO deprecated in pacemaker 2, to be removed in pacemaker 3
542 # added to pcs after 0.11.7
543 deprecation_warning(
544 reports.messages.DeprecatedOption(opt[0], []).message
545 )
546 break
547 if opt[0] == "kind":
548 scorekind = "kind: " + opt[1]
549 id_suffix = opt[1]
550 break
551
552 if not id_specified:
553 order_id = "order-" + resource1 + "-" + resource2 + "-" + id_suffix
554 order_id = utils.find_unique_id(cib_dom, order_id)
555 order_options.append(("id", order_id))
556
557 (dom, constraintsElement) = getCurrentConstraints()
558 element = dom.createElement("rsc_order")
559 element.setAttribute("first", resource1)
560 element.setAttribute("then", resource2)
561 for order_opt in order_options:
562 element.setAttribute(order_opt[0], order_opt[1])
563 constraintsElement.appendChild(element)
564 if not modifiers.get("--force"):
565
566 def _constraint_export(constraint_info):
567 options = constraint_info["options"]
568 oc_resource1 = options.get("first", "")
569 oc_resource2 = options.get("then", "")
570 first_action = options.get("first-action", "")
571 then_action = options.get("then-action", "")
572 oc_id = options.get("id", "")
573 oc_score = options.get("score", "")
574 oc_kind = options.get("kind", "")
575 oc_sym = ""
576 oc_id_out = ""
577 oc_options = ""
578 if "symmetrical" in options and not is_true(
579 options.get("symmetrical", "false")
580 ):
581 oc_sym = "(non-symmetrical)"
582 if oc_kind != "":
583 score_text = "(kind:" + oc_kind + ")"
584 elif oc_kind == "" and oc_score == "":
585 score_text = "(kind:Mandatory)"
586 else:
587 score_text = "(score:" + oc_score + ")"
588 oc_id_out = "(id:" + oc_id + ")"
589 already_processed_options = (
590 "first",
591 "then",
592 "first-action",
593 "then-action",
594 "id",
595 "score",
596 "kind",
597 "symmetrical",
598 )
599 oc_options = " ".join(
600 [
601 f"{name}={value}"
602 for name, value in options.items()
603 if name not in already_processed_options
604 ]
605 )
606 if oc_options:
607 oc_options = "(Options: " + oc_options + ")"
608 return " ".join(
609 [
610 arg
611 for arg in [
612 first_action,
613 oc_resource1,
614 "then",
615 then_action,
616 oc_resource2,
617 score_text,
618 oc_sym,
619 oc_options,
620 oc_id_out,
621 ]
622 if arg
623 ]
624 )
625
626 duplicates = order_find_duplicates(constraintsElement, element)
627 if duplicates:
628 utils.err(
629 "duplicate constraint already exists, use --force to override\n"
630 + "\n".join(
631 [
632 " "
633 + _constraint_export(
634 {"options": dict(dup.attributes.items())}
635 )
636 for dup in duplicates
637 ]
638 )
639 )
640 print_to_stderr(f"Adding {resource1} {resource2} ({scorekind}){options}")
641 utils.replace_cib_configuration(dom)
642
643
644 def order_find_duplicates(dom, constraint_el):
645 """
646 Commandline options: no options
647 """
648
649 def normalize(constraint_el):
650 return (
651 constraint_el.getAttribute("first"),
652 constraint_el.getAttribute("then"),
653 constraint_el.getAttribute("first-action").lower()
654 or DEFAULT_ACTION,
655 constraint_el.getAttribute("then-action").lower() or DEFAULT_ACTION,
656 )
657
658 normalized_el = normalize(constraint_el)
659 return [
660 other_el
661 for other_el in dom.getElementsByTagName("rsc_order")
662 if not other_el.getElementsByTagName("resource_set")
663 and constraint_el is not other_el
664 and normalized_el == normalize(other_el)
665 ]
666
667
668 def location_show(lib, argv, modifiers):
669 deprecation_warning(
670 "This command is deprecated and will be removed. "
671 "Please use 'pcs constraint location config' instead."
672 )
673 return location_config_cmd(lib, argv, modifiers)
674
675
676 _SetConstraint = TypeVar(
677 "_SetConstraint",
678 CibConstraintLocationSetDto,
679 CibConstraintColocationSetDto,
680 CibConstraintOrderSetDto,
681 CibConstraintTicketSetDto,
682 )
683
684
685 def _filter_set_constraints_by_resources(
686 constraints_dto: Iterable[_SetConstraint], resources: Set[str]
687 ) -> list[_SetConstraint]:
688 return [
689 constraint_set_dto
690 for constraint_set_dto in constraints_dto
691 if any(
692 set(resource_set.resources_ids) & resources
693 for resource_set in constraint_set_dto.resource_sets
694 )
695 ]
696
697
698 def _filter_constraints_by_resources(
699 constraints_dto: CibConstraintsDto,
700 resources: StringIterable,
701 patterns: StringIterable,
702 ) -> CibConstraintsDto:
703 required_resources_set = set(resources)
704 required_patterns_set = set(patterns)
705 return CibConstraintsDto(
706 location=[
707 constraint_dto
708 for constraint_dto in constraints_dto.location
709 if (
710 constraint_dto.resource_id is not None
711 and constraint_dto.resource_id in required_resources_set
712 )
713 or (
714 constraint_dto.resource_pattern is not None
715 and constraint_dto.resource_pattern in required_patterns_set
716 )
717 ],
718 location_set=_filter_set_constraints_by_resources(
719 constraints_dto.location_set, required_resources_set
720 ),
721 colocation=[
722 constraint_dto
723 for constraint_dto in constraints_dto.colocation
724 if {constraint_dto.resource_id, constraint_dto.with_resource_id}
725 & required_resources_set
726 ],
727 colocation_set=_filter_set_constraints_by_resources(
728 constraints_dto.colocation_set, required_resources_set
729 ),
730 order=[
731 constraint_dto
732 for constraint_dto in constraints_dto.order
733 if {
734 constraint_dto.first_resource_id,
735 constraint_dto.then_resource_id,
736 }
737 & required_resources_set
738 ],
739 order_set=_filter_set_constraints_by_resources(
740 constraints_dto.order_set, required_resources_set
741 ),
742 ticket=[
743 constraint_dto
744 for constraint_dto in constraints_dto.ticket
745 if constraint_dto.resource_id in required_resources_set
746 ],
747 ticket_set=_filter_set_constraints_by_resources(
748 constraints_dto.ticket_set, required_resources_set
749 ),
750 )
751
752
753 def _filter_location_by_node_base(
754 constraint_dtos: Iterable[CibConstraintLocationAnyDto],
755 nodes: StringCollection,
756 ) -> list[CibConstraintLocationAnyDto]:
757 return [
758 constraint_dto
759 for constraint_dto in constraint_dtos
760 if constraint_dto.attributes.node is not None
761 and constraint_dto.attributes.node in nodes
762 ]
763
764
765 def location_config_cmd(
766 lib: Any, argv: StringSequence, modifiers: parse_args.InputModifiers
767 ) -> None:
768 """
769 Options:
770 * --all - print expired constraints
771 * --full - print all details
772 * -f - CIB file
773 """
774 modifiers.ensure_only_supported("-f", "--output-format", "--full", "--all")
775 filter_type: Optional[str] = None
776 filter_items: parse_args.Argv = []
777 if argv:
778 filter_type, *filter_items = argv
779 allowed_types = ("resources", "nodes")
780 if filter_type not in allowed_types:
781 raise CmdLineInputError(
782 f"Unknown keyword '{filter_type}'. Allowed keywords: "
783 f"{format_list(allowed_types)}"
784 )
785 if modifiers.get_output_format() != parse_args.OUTPUT_FORMAT_VALUE_TEXT:
786 raise CmdLineInputError(
787 "Output formats other than 'text' are not supported together "
788 "with grouping and filtering by nodes or resources"
789 )
790
791 constraints_dto = filter_constraints_by_rule_expired_status(
792 lib.constraint.get_config(evaluate_rules=True),
793 modifiers.is_specified("--all"),
794 )
795
796 constraints_dto = CibConstraintsDto(
797 location=constraints_dto.location,
798 location_set=constraints_dto.location_set,
799 )
800
801 def _print_lines(lines: StringSequence) -> None:
802 if lines:
803 print("Location Constraints:")
804 print(lines_to_str(indent(lines, indent_step=INDENT_STEP)))
805
806 if filter_type == "resources":
807 if filter_items:
808 resources = []
809 patterns = []
810 for item in filter_items:
811 item_type, item_value = parse_args.parse_typed_arg(
812 item,
813 [RESOURCE_TYPE_RESOURCE, RESOURCE_TYPE_REGEXP],
814 RESOURCE_TYPE_RESOURCE,
815 )
816 if item_type == RESOURCE_TYPE_RESOURCE:
817 resources.append(item_value)
818 elif item_type == RESOURCE_TYPE_REGEXP:
819 patterns.append(item_value)
820 constraints_dto = _filter_constraints_by_resources(
821 constraints_dto, resources, patterns
822 )
823 _print_lines(
824 location.constraints_to_grouped_by_resource_text(
825 constraints_dto.location,
826 modifiers.is_specified("--full"),
827 )
828 )
829 return
830 if filter_type == "nodes":
831 if filter_items:
832 constraints_dto = CibConstraintsDto(
833 location=_filter_location_by_node_base(
834 constraints_dto.location, filter_items
835 ),
836 location_set=_filter_location_by_node_base(
837 constraints_dto.location_set, filter_items
838 ),
839 )
840 _print_lines(
841 location.constraints_to_grouped_by_node_text(
842 constraints_dto.location,
843 modifiers.is_specified("--full"),
844 )
845 )
846 return
847
848 print_config(constraints_dto, modifiers)
849
850
851 def _verify_node_name(node, existing_nodes):
852 report_list = []
853 if node not in existing_nodes:
854 report_list.append(
855 ReportItem.error(
856 reports.messages.NodeNotFound(node),
857 force_code=reports.codes.FORCE,
858 )
859 )
860 return report_list
861
862
863 def _verify_score(score):
864 if not utils.is_score(score):
865 utils.err(
866 "invalid score '%s', use integer or INFINITY or -INFINITY" % score
867 )
868
869
870 def location_prefer( # noqa: PLR0912
871 lib: Any, argv: parse_args.Argv, modifiers: parse_args.InputModifiers
872 ) -> None:
873 """
874 Options:
875 * --force - allow unknown options, allow constraint for any resource type
876 * -f - CIB file
877 """
878 modifiers.ensure_only_supported("--force", "-f")
879 rsc = argv.pop(0)
880 prefer_option = argv.pop(0)
881
882 dummy_rsc_type, rsc_value = parse_args.parse_typed_arg(
883 rsc,
884 [RESOURCE_TYPE_RESOURCE, RESOURCE_TYPE_REGEXP],
885 RESOURCE_TYPE_RESOURCE,
886 )
887
888 if prefer_option == "prefers":
889 prefer = True
890 elif prefer_option == "avoids":
891 prefer = False
892 else:
893 raise CmdLineInputError()
894
895 skip_node_check = False
896 existing_nodes: list[str] = []
897 if modifiers.is_specified("-f") or modifiers.get("--force"):
898 skip_node_check = True
899 warn(LOCATION_NODE_VALIDATION_SKIP_MSG)
900 else:
901 lib_env = utils.get_lib_env()
902 existing_nodes, report_list = get_existing_nodes_names(
903 corosync_conf=lib_env.get_corosync_conf(),
904 cib=lib_env.get_cib(),
905 )
906 if report_list:
907 process_library_reports(report_list)
908
909 report_list = []
910 parameters_list = []
911 for nodeconf in argv:
912 nodeconf_a = nodeconf.split("=", 1)
913 node = nodeconf_a[0]
914 if not skip_node_check:
915 report_list += _verify_node_name(node, existing_nodes)
916 if len(nodeconf_a) == 1:
917 score = "INFINITY" if prefer else "-INFINITY"
918 else:
919 score = nodeconf_a[1]
920 _verify_score(score)
921 if not prefer:
922 score = score[1:] if score[0] == "-" else "-" + score
923
924 parameters_list.append(
925 [
926 sanitize_id(f"location-{rsc_value}-{node}-{score}"),
927 rsc,
928 node,
929 score,
930 ]
931 )
932
933 if report_list:
934 process_library_reports(report_list)
935
936 modifiers = modifiers.get_subset("--force", "-f")
937
938 for parameters in parameters_list:
939 location_add(lib, parameters, modifiers, skip_score_and_node_check=True)
940
941
942 def location_add( # noqa: PLR0912, PLR0915
943 lib: Any,
944 argv: parse_args.Argv,
945 modifiers: parse_args.InputModifiers,
946 skip_score_and_node_check: bool = False,
947 ) -> None:
948 """
949 Options:
950 * --force - allow unknown options, allow constraint for any resource type
951 * -f - CIB file
952 """
953 del lib
954 modifiers.ensure_only_supported("--force", "-f")
955 if len(argv) < 4:
956 raise CmdLineInputError()
957
958 constraint_id = argv.pop(0)
959 rsc_type, rsc_value = parse_args.parse_typed_arg(
960 argv.pop(0),
961 [RESOURCE_TYPE_RESOURCE, RESOURCE_TYPE_REGEXP],
962 RESOURCE_TYPE_RESOURCE,
963 )
964 node = argv.pop(0)
965 score = argv.pop(0)
966 options = []
967 # For now we only allow setting resource-discovery
968 if argv:
969 for arg in argv:
970 if "=" in arg:
971 name, value = arg.split("=", 1)
972 if name == "resource-discovery":
973 if not modifiers.get("--force"):
974 allowed_discovery = list(
975 map(
976 str,
977 [
978 CibResourceDiscovery.ALWAYS,
979 CibResourceDiscovery.EXCLUSIVE,
980 CibResourceDiscovery.NEVER,
981 ],
982 )
983 )
984 if value not in allowed_discovery:
985 utils.err(
986 (
987 "invalid {0} value '{1}', allowed values are: "
988 "{2}, use --force to override"
989 ).format(
990 name, value, format_list(allowed_discovery)
991 )
992 )
993 options.append([name, value])
994 else:
995 raise CmdLineInputError(f"bad option '{arg}'")
996 if options[-1][0] != "resource-discovery" and not modifiers.get(
997 "--force"
998 ):
999 utils.err(
1000 "bad option '%s', use --force to override" % options[-1][0]
1001 )
1002
1003 # Verify that specified node exists in the cluster and score is valid
1004 if not skip_score_and_node_check:
1005 if modifiers.is_specified("-f") or modifiers.get("--force"):
1006 warn(LOCATION_NODE_VALIDATION_SKIP_MSG)
1007 else:
1008 lib_env = utils.get_lib_env()
1009 existing_nodes, report_list = get_existing_nodes_names(
1010 corosync_conf=lib_env.get_corosync_conf(),
1011 cib=lib_env.get_cib(),
1012 )
1013 report_list += _verify_node_name(node, existing_nodes)
1014 if report_list:
1015 process_library_reports(report_list)
1016 _verify_score(score)
1017
1018 id_valid, id_error = utils.validate_xml_id(constraint_id, "constraint id")
1019 if not id_valid:
1020 utils.err(id_error)
1021
1022 dom = utils.get_cib_dom()
1023
1024 if rsc_type == RESOURCE_TYPE_RESOURCE:
1025 (
1026 rsc_valid,
1027 rsc_error,
1028 dummy_correct_id,
1029 ) = utils.validate_constraint_resource(dom, rsc_value)
1030 if not rsc_valid:
1031 utils.err(rsc_error)
1032
1033 # Verify current constraint doesn't already exist
1034 # If it does we replace it with the new constraint
1035 dummy_dom, constraintsElement = getCurrentConstraints(dom)
1036 # If the id matches, or the rsc & node match, then we replace/remove
1037 elementsToRemove = [
1038 rsc_loc
1039 for rsc_loc in constraintsElement.getElementsByTagName("rsc_location")
1040 # pylint: disable=too-many-boolean-expressions
1041 if rsc_loc.getAttribute("id") == constraint_id
1042 or (
1043 rsc_loc.getAttribute("node") == node
1044 and (
1045 (
1046 rsc_type == RESOURCE_TYPE_RESOURCE
1047 and rsc_loc.getAttribute("rsc") == rsc_value
1048 )
1049 or (
1050 rsc_type == RESOURCE_TYPE_REGEXP
1051 and rsc_loc.getAttribute("rsc-pattern") == rsc_value
1052 )
1053 )
1054 )
1055 ]
1056 for etr in elementsToRemove:
1057 constraintsElement.removeChild(etr)
1058
1059 element = dom.createElement("rsc_location")
1060 element.setAttribute("id", constraint_id)
1061 if rsc_type == RESOURCE_TYPE_RESOURCE:
1062 element.setAttribute("rsc", rsc_value)
1063 elif rsc_type == RESOURCE_TYPE_REGEXP:
1064 element.setAttribute("rsc-pattern", rsc_value)
1065 element.setAttribute("node", node)
1066 element.setAttribute("score", score)
1067 for option in options:
1068 element.setAttribute(option[0], option[1])
1069 constraintsElement.appendChild(element)
1070
1071 utils.replace_cib_configuration(dom)
1072
1073
1074 def location_rule(lib, argv, modifiers): # noqa: PLR0912
1075 """
1076 Options:
1077 * -f - CIB file
1078 * --force - allow constraint on any resource type, allow duplicate
1079 constraints
1080 """
1081 del lib
1082 modifiers.ensure_only_supported("-f", "--force")
1083 if len(argv) < 3:
1084 raise CmdLineInputError()
1085
1086 rsc_type, rsc_value = parse_args.parse_typed_arg(
1087 argv.pop(0),
1088 [RESOURCE_TYPE_RESOURCE, RESOURCE_TYPE_REGEXP],
1089 RESOURCE_TYPE_RESOURCE,
1090 )
1091 argv.pop(0) # pop "rule"
1092 options, rule_argv = rule_utils.parse_argv(
1093 argv,
1094 {
1095 "constraint-id": None,
1096 "resource-discovery": None,
1097 },
1098 )
1099 resource_discovery = (
1100 "resource-discovery" in options and options["resource-discovery"]
1101 )
1102
1103 try:
1104 # Parse the rule to see if we need to upgrade CIB schema. All errors
1105 # would be properly reported by a validator called bellow, so we can
1106 # safely ignore them here.
1107 parsed_rule = rule_utils.RuleParser().parse(
1108 rule_utils.TokenPreprocessor().run(rule_argv)
1109 )
1110 if rule_utils.has_node_attr_expr_with_type_integer(parsed_rule):
1111 utils.checkAndUpgradeCIB(
1112 const.PCMK_RULES_NODE_ATTR_EXPR_WITH_INT_TYPE_CIB_VERSION
1113 )
1114 except (rule_utils.ParserException, rule_utils.CibBuilderException):
1115 pass
1116
1117 dom = utils.get_cib_dom()
1118
1119 if rsc_type == RESOURCE_TYPE_RESOURCE:
1120 (
1121 rsc_valid,
1122 rsc_error,
1123 dummy_correct_id,
1124 ) = utils.validate_constraint_resource(dom, rsc_value)
1125 if not rsc_valid:
1126 utils.err(rsc_error)
1127
1128 cib, constraints = getCurrentConstraints(dom)
1129 lc = cib.createElement("rsc_location")
1130
1131 # If resource-discovery is specified, we use it with the rsc_location
1132 # element not the rule
1133 if resource_discovery:
1134 if not modifiers.get("--force"):
1135 allowed_discovery = list(
1136 map(
1137 str,
1138 [
1139 CibResourceDiscovery.ALWAYS,
1140 CibResourceDiscovery.EXCLUSIVE,
1141 CibResourceDiscovery.NEVER,
1142 ],
1143 )
1144 )
1145 if resource_discovery not in allowed_discovery:
1146 utils.err(
1147 (
1148 "invalid {0} value '{1}', allowed values are: {2}, "
1149 "use --force to override"
1150 ).format(
1151 "resource-discovery",
1152 resource_discovery,
1153 format_list(allowed_discovery),
1154 )
1155 )
1156 lc.setAttribute("resource-discovery", options.pop("resource-discovery"))
1157
1158 constraints.appendChild(lc)
1159 if options.get("constraint-id"):
1160 id_valid, id_error = utils.validate_xml_id(
1161 options["constraint-id"], "constraint id"
1162 )
1163 if not id_valid:
1164 utils.err(id_error)
1165 if utils.does_id_exist(dom, options["constraint-id"]):
1166 utils.err(
1167 "id '%s' is already in use, please specify another one"
1168 % options["constraint-id"]
1169 )
1170 lc.setAttribute("id", options["constraint-id"])
1171 del options["constraint-id"]
1172 else:
1173 lc.setAttribute(
1174 "id",
1175 utils.find_unique_id(dom, sanitize_id("location-" + rsc_value)),
1176 )
1177 if rsc_type == RESOURCE_TYPE_RESOURCE:
1178 lc.setAttribute("rsc", rsc_value)
1179 elif rsc_type == RESOURCE_TYPE_REGEXP:
1180 lc.setAttribute("rsc-pattern", rsc_value)
1181
1182 rule_utils.dom_rule_add(
1183 lc, options, rule_argv, utils.getValidateWithVersion(cib)
1184 )
1185 location_rule_check_duplicates(constraints, lc, modifiers.get("--force"))
1186 utils.replace_cib_configuration(cib)
1187
1188
1189 def location_rule_check_duplicates(dom, constraint_el, force):
1190 """
1191 Commandline options: no options
1192 """
1193 if not force:
1194 duplicates = location_rule_find_duplicates(dom, constraint_el)
1195 if duplicates:
1196 lines = []
1197 for dup in duplicates:
1198 lines.append(" Constraint: %s" % dup.getAttribute("id"))
1199 lines.extend(
1200 rule_utils.ExportDetailed().get_string(
1201 dup_rule, False, True, indent=" "
1202 )
1203 for dup_rule in utils.dom_get_children_by_tag_name(
1204 dup, "rule"
1205 )
1206 )
1207 utils.err(
1208 "duplicate constraint already exists, use --force to override\n"
1209 + "\n".join(lines)
1210 )
1211
1212
1213 def location_rule_find_duplicates(dom, constraint_el):
1214 """
1215 Commandline options: no options
1216 """
1217
1218 def normalize(constraint_el):
1219 if constraint_el.hasAttribute("rsc-pattern"):
1220 rsc = (
1221 RESOURCE_TYPE_REGEXP,
1222 constraint_el.getAttribute("rsc-pattern"),
1223 )
1224 else:
1225 rsc = (RESOURCE_TYPE_RESOURCE, constraint_el.getAttribute("rsc"))
1226 return (
1227 rsc,
1228 [
1229 rule_utils.ExportAsExpression().get_string(rule_el, True)
1230 for rule_el in constraint_el.getElementsByTagName("rule")
1231 ],
1232 )
1233
1234 normalized_el = normalize(constraint_el)
1235 return [
1236 other_el
1237 for other_el in dom.getElementsByTagName("rsc_location")
1238 if other_el.getElementsByTagName("rule")
1239 and constraint_el is not other_el
1240 and normalized_el == normalize(other_el)
1241 ]
1242
1243
1244 # Grabs the current constraints and returns the dom and constraint element
1245 def getCurrentConstraints(passed_dom=None):
1246 """
1247 Commandline options:
1248 * -f - CIB file, only if passed_dom is None
1249 """
1250 if passed_dom:
1251 dom = passed_dom
1252 else:
1253 current_constraints_xml = utils.get_cib_xpath("//constraints")
1254 if current_constraints_xml == "":
1255 utils.err("unable to process cib")
1256 # Verify current constraint doesn't already exist
1257 # If it does we replace it with the new constraint
|
(1) Event Sigma main event: |
The application uses Python's built in `xml` module which does not properly handle erroneous or maliciously constructed data, making the application vulnerable to one or more types of XML attacks. |
|
(2) Event remediation: |
Avoid using the `xml` module. Consider using the `defusedxml` module or similar which safely prevents all XML entity attacks. |
1258 dom = parseString(current_constraints_xml)
1259
1260 constraintsElement = dom.getElementsByTagName("constraints")[0]
1261 return (dom, constraintsElement)
1262
1263
1264 # If returnStatus is set, then we don't error out, we just print the error
1265 # and return false
1266 def constraint_rm( # noqa: PLR0912
1267 lib,
1268 argv,
1269 modifiers,
1270 returnStatus=False,
1271 constraintsElement=None,
1272 passed_dom=None,
1273 ):
1274 """
1275 Options:
1276 * -f - CIB file, effective only if passed_dom is None
1277 """
1278 if passed_dom is None:
1279 modifiers.ensure_only_supported("-f")
1280 if not argv:
1281 raise CmdLineInputError()
1282
1283 bad_constraint = False
1284 if len(argv) != 1:
1285 for arg in argv:
1286 if not constraint_rm(
1287 lib, [arg], modifiers, returnStatus=True, passed_dom=passed_dom
1288 ):
1289 bad_constraint = True
1290 if bad_constraint:
1291 sys.exit(1)
1292 return None
1293
1294 c_id = argv.pop(0)
1295 elementFound = False
1296 dom = None
1297 use_cibadmin = False
1298 if not constraintsElement:
1299 (dom, constraintsElement) = getCurrentConstraints(passed_dom)
1300 use_cibadmin = True
1301
1302 for co in constraintsElement.childNodes[:]:
1303 if co.nodeType != xml.dom.Node.ELEMENT_NODE:
1304 continue
1305 if co.getAttribute("id") == c_id:
1306 constraintsElement.removeChild(co)
1307 elementFound = True
1308
1309 if not elementFound:
1310 for rule in constraintsElement.getElementsByTagName("rule")[:]:
1311 if rule.getAttribute("id") == c_id:
1312 elementFound = True
1313 parent = rule.parentNode
1314 parent.removeChild(rule)
1315 if not parent.getElementsByTagName("rule"):
1316 parent.parentNode.removeChild(parent)
1317
1318 if elementFound:
1319 if passed_dom:
1320 return dom
1321 if use_cibadmin:
1322 utils.replace_cib_configuration(dom)
1323 if returnStatus:
1324 return True
1325 else:
1326 utils.err("Unable to find constraint - '%s'" % c_id, False)
1327 if returnStatus:
1328 return False
1329 sys.exit(1)
1330 return None
1331
1332
1333 def _split_set_constraints(
1334 constraints_dto: CibConstraintsDto,
1335 ) -> tuple[CibConstraintsDto, CibConstraintsDto]:
1336 return (
1337 CibConstraintsDto(
1338 location=constraints_dto.location,
1339 colocation=constraints_dto.colocation,
1340 order=constraints_dto.order,
1341 ticket=constraints_dto.ticket,
1342 ),
1343 CibConstraintsDto(
1344 location_set=constraints_dto.location_set,
1345 colocation_set=constraints_dto.colocation_set,
1346 order_set=constraints_dto.order_set,
1347 ticket_set=constraints_dto.ticket_set,
1348 ),
1349 )
1350
1351
1352 def _find_constraints_containing_resource(
1353 resources_dto: CibResourcesDto,
1354 constraints_dto: CibConstraintsDto,
1355 resource_id: str,
1356 ) -> CibConstraintsDto:
1357 resources_filter = [resource_id]
1358 # Original implementation only included parent resource only if resource_id
1359 # was referring to a primitive resource, ignoring groups. This may change in
1360 # the future if necessary.
1361 if any(
1362 primitive_dto.id == resource_id
1363 for primitive_dto in resources_dto.primitives
1364 ):
1365 for clone_dto in resources_dto.clones:
1366 if clone_dto.member_id == resource_id:
1367 resources_filter.append(clone_dto.id)
1368 break
1369 return _filter_constraints_by_resources(
1370 constraints_dto, resources_filter, []
1371 )
1372
1373
1374 def ref(
1375 lib: Any, argv: list[str], modifiers: parse_args.InputModifiers
1376 ) -> None:
1377 modifiers.ensure_only_supported("-f")
1378 if not argv:
1379 raise CmdLineInputError()
1380
1381 resources_dto = cast(
1382 CibResourcesDto, lib.resource.get_configured_resources()
1383 )
1384
1385 constraints_dto = cast(
1386 CibConstraintsDto,
1387 lib.constraint.get_config(evaluate_rules=False),
1388 )
1389
1390 for resource_id in sorted(set(argv)):
1391 constraint_ids = get_all_constraints_ids(
1392 _find_constraints_containing_resource(
1393 resources_dto, constraints_dto, resource_id
1394 )
1395 )
1396 print(f"Resource: {resource_id}")
1397 if constraint_ids:
1398 print(
1399 "\n".join(
1400 indent(
1401 sorted(constraint_ids),
1402 indent_step=INDENT_STEP,
1403 )
1404 )
1405 )
1406 else:
1407 print(" No Matches")
1408
1409
1410 def remove_constraints_containing(
1411 resource_id: str, output=False, constraints_element=None, passed_dom=None
1412 ):
1413 """
1414 Commandline options:
1415 * -f - CIB file, effective only if passed_dom is None
1416 """
1417 lib = utils.get_library_wrapper()
1418 modifiers = utils.get_input_modifiers()
1419 resources_dto = cast(
1420 CibResourcesDto, lib.resource.get_configured_resources()
1421 )
1422
1423 constraints_dto, set_constraints_dto = _split_set_constraints(
1424 cast(
1425 CibConstraintsDto,
1426 lib.constraint.get_config(evaluate_rules=False),
1427 )
1428 )
1429 constraints = sorted(
1430 get_all_constraints_ids(
1431 _find_constraints_containing_resource(
1432 resources_dto, constraints_dto, resource_id
1433 )
1434 )
1435 )
1436 set_constraints = sorted(
1437 get_all_constraints_ids(
1438 _find_constraints_containing_resource(
1439 resources_dto, set_constraints_dto, resource_id
1440 )
1441 )
1442 )
1443 for c in constraints:
1444 if output:
1445 print_to_stderr(f"Removing Constraint - {c}")
1446 if constraints_element is not None:
1447 constraint_rm(
1448 lib,
1449 [c],
1450 modifiers,
1451 True,
1452 constraints_element,
1453 passed_dom=passed_dom,
1454 )
1455 else:
1456 constraint_rm(lib, [c], modifiers, passed_dom=passed_dom)
1457
1458 if set_constraints:
1459 (dom, constraintsElement) = getCurrentConstraints(passed_dom)
1460 for set_c in constraintsElement.getElementsByTagName("resource_ref")[:]:
1461 # If resource id is in a set, remove it from the set, if the set
1462 # is empty, then we remove the set, if the parent of the set
1463 # is empty then we remove it
1464 if set_c.getAttribute("id") == resource_id:
1465 parent_node = set_c.parentNode
1466 parent_node.removeChild(set_c)
1467 if output:
1468 print_to_stderr(
1469 "Removing {} from set {}".format(
1470 resource_id, parent_node.getAttribute("id")
1471 )
1472 )
1473 if parent_node.getElementsByTagName("resource_ref").length == 0:
1474 print_to_stderr(
1475 "Removing set {}".format(parent_node.getAttribute("id"))
1476 )
1477 parent_node_2 = parent_node.parentNode
1478 parent_node_2.removeChild(parent_node)
1479 if (
1480 parent_node_2.getElementsByTagName(
1481 "resource_set"
1482 ).length
1483 == 0
1484 ):
1485 parent_node_2.parentNode.removeChild(parent_node_2)
1486 print_to_stderr(
1487 "Removing constraint {}".format(
1488 parent_node_2.getAttribute("id")
1489 )
1490 )
1491 if passed_dom:
1492 return dom
1493 utils.replace_cib_configuration(dom)
1494 return None
1495
1496
1497 # Re-assign any constraints referencing a resource to its parent (a clone
1498 # or master)
1499 def constraint_resource_update(old_id, dom):
1500 """
1501 Commandline options: no options
1502 """
1503 new_id = None
1504 clone_ms_parent = utils.dom_get_resource_clone_ms_parent(dom, old_id)
1505 if clone_ms_parent:
1506 new_id = clone_ms_parent.getAttribute("id")
1507
1508 if new_id:
1509 constraints = dom.getElementsByTagName("rsc_location")
1510 constraints += dom.getElementsByTagName("rsc_order")
1511 constraints += dom.getElementsByTagName("rsc_colocation")
1512 attrs_to_update = ["rsc", "first", "then", "with-rsc"]
1513 for constraint in constraints:
1514 for attr in attrs_to_update:
1515 if constraint.getAttribute(attr) == old_id:
1516 constraint.setAttribute(attr, new_id)
1517 return dom
1518
1519
1520 def constraint_rule_add(lib, argv, modifiers):
1521 """
1522 Options:
1523 * -f - CIB file
1524 * --force - allow duplicate constraints
1525 """
1526 # deprecated in pacemaker 2, removed in pacemaker 3
1527 # added to pcs after 0.11.8
1528 # the whole command removed in pcs-0.12
1529 deprecation_warning(
1530 "The possibility of defining multiple rules in a single location "
1531 "constraint is deprecated and will be removed."
1532 )
1533 del lib
1534 modifiers.ensure_only_supported("-f", "--force")
1535 if not argv:
1536 raise CmdLineInputError()
1537 constraint_id = argv.pop(0)
1538 options, rule_argv = rule_utils.parse_argv(argv)
1539 try:
1540 # Parse the rule to see if we need to upgrade CIB schema. All errors
1541 # would be properly reported by a validator called bellow, so we can
1542 # safely ignore them here.
1543 parsed_rule = rule_utils.RuleParser().parse(
1544 rule_utils.TokenPreprocessor().run(rule_argv)
1545 )
1546 if rule_utils.has_node_attr_expr_with_type_integer(parsed_rule):
1547 utils.checkAndUpgradeCIB(
1548 const.PCMK_RULES_NODE_ATTR_EXPR_WITH_INT_TYPE_CIB_VERSION
1549 )
1550 except (rule_utils.ParserException, rule_utils.CibBuilderException):
1551 pass
1552 cib = utils.get_cib_dom()
1553 constraint = utils.dom_get_element_with_id(
1554 cib.getElementsByTagName("constraints")[0],
1555 "rsc_location",
1556 constraint_id,
1557 )
1558 if not constraint:
1559 utils.err("Unable to find constraint: " + constraint_id)
1560 rule_utils.dom_rule_add(
1561 constraint,
1562 options,
1563 rule_argv,
1564 utils.getValidateWithVersion(cib),
1565 )
1566 location_rule_check_duplicates(cib, constraint, modifiers.get("--force"))
1567 utils.replace_cib_configuration(cib)
1568