1 import sys
2 import xml.dom.minidom
3 from collections.abc import Iterable
4 from enum import Enum
5 from typing import Any, TypeVar, cast
6 from xml.dom.minidom import parseString
7
8 import pcs.cli.constraint_order.command as order_command
9 from pcs import utils
10 from pcs.cli.common import parse_args
11 from pcs.cli.common.errors import (
12 CmdLineInputError,
13 raise_command_replaced,
14 )
15 from pcs.cli.common.output import (
16 INDENT_STEP,
17 lines_to_str,
18 )
19 from pcs.cli.constraint.location.command import (
20 RESOURCE_TYPE_REGEXP,
21 RESOURCE_TYPE_RESOURCE,
22 )
23 from pcs.cli.constraint.output import (
24 CibConstraintLocationAnyDto,
25 filter_constraints_by_rule_expired_status,
26 location,
27 print_config,
28 )
29 from pcs.cli.reports import process_library_reports
30 from pcs.cli.reports.output import (
31 deprecation_warning,
32 print_to_stderr,
33 warn,
34 )
35 from pcs.common import (
36 const,
37 pacemaker,
38 reports,
39 )
40 from pcs.common.pacemaker.constraint import (
41 CibConstraintColocationSetDto,
42 CibConstraintLocationSetDto,
43 CibConstraintOrderSetDto,
44 CibConstraintsDto,
45 CibConstraintTicketSetDto,
46 get_all_constraints_ids,
47 )
48 from pcs.common.pacemaker.resource.list import CibResourcesDto
49 from pcs.common.pacemaker.types import CibResourceDiscovery
50 from pcs.common.reports import ReportItem
51 from pcs.common.str_tools import (
52 format_list,
53 indent,
54 )
55 from pcs.common.types import (
56 StringCollection,
57 StringIterable,
58 StringSequence,
59 )
60 from pcs.lib.cib.constraint.order import ATTRIB as order_attrib
61 from pcs.lib.node import get_existing_nodes_names
62 from pcs.lib.pacemaker.values import (
63 SCORE_INFINITY,
64 is_true,
65 sanitize_id,
66 )
67
68 # pylint: disable=invalid-name
69 # pylint: disable=too-many-branches
70 # pylint: disable=too-many-lines
71 # pylint: disable=too-many-locals
72 # pylint: disable=too-many-statements
73
# Fallback action and role used in this module when a constraint does not
# name one (both when building constraints and when comparing duplicates).
DEFAULT_ACTION = const.PCMK_ACTION_START
DEFAULT_ROLE = const.PCMK_ROLE_STARTED

# Values accepted for the "symmetrical" option of an order constraint.
OPTIONS_SYMMETRICAL = order_attrib["symmetrical"]

# Warning printed when the check that a node exists in the cluster is skipped.
LOCATION_NODE_VALIDATION_SKIP_MSG = (
    "Validation for node existence in the cluster will be skipped"
)
# Deprecation text printed when a score is given as a bare positional value.
STANDALONE_SCORE_MSG = (
    "Specifying score as a standalone value is deprecated and "
    "might be removed in a future release, use score=value instead"
)
86
87
class CrmRuleReturnCode(Enum):
    """
    Numeric codes describing the state of a rule.

    NOTE(review): judging by the name these are presumably exit codes of
    pacemaker's crm_rule tool - confirm against the code that consumes them.
    """

    IN_EFFECT = 0
    EXPIRED = 110
    TO_BE_IN_EFFECT = 111
92
93
def constraint_order_cmd(lib, argv, modifiers):
    """
    Dispatch "constraint order" subcommands to their handlers.

    The first item of argv is consumed as the subcommand; an empty argv means
    "config". A word which is not a known subcommand is put back in front of
    the remaining arguments and everything is handed over to order_start.
    CmdLineInputError raised by a handler is passed to
    utils.exit_on_cmdline_input_error.
    """
    sub_cmd = argv.pop(0) if argv else "config"

    try:
        if sub_cmd == "config":
            order_command.config_cmd(lib, argv, modifiers)
        elif sub_cmd == "set":
            order_command.create_with_set(lib, argv, modifiers)
        elif sub_cmd in ("remove", "delete"):
            order_rm(lib, argv, modifiers)
        elif sub_cmd == "show":
            raise_command_replaced(
                ["pcs constraint order config"], pcs_version="0.12"
            )
        else:
            order_start(lib, [sub_cmd] + argv, modifiers)
    except CmdLineInputError as e:
        utils.exit_on_cmdline_input_error(e, "constraint", ["order", sub_cmd])
112
113
def config_cmd(
    lib: Any, argv: list[str], modifiers: parse_args.InputModifiers
) -> None:
    """
    Print all constraints obtained from the library.

    No positional arguments are accepted; any argument raises
    CmdLineInputError.

    Options:
      * -f
      * --output-format
      * --full
      * --all
    """
    modifiers.ensure_only_supported("-f", "--output-format", "--full", "--all")
    if argv:
        raise CmdLineInputError()

    # rules are evaluated by the library so the printer can tell their state
    constraints_dto = cast(
        CibConstraintsDto,
        lib.constraint.get_config(evaluate_rules=True),
    )
    print_config(constraints_dto, modifiers)
128
129
def _validate_constraint_resource(cib_dom, resource_id):
    """
    Report an error via utils.err when the resource fails validation.

    cib_dom -- CIB to look the resource up in
    resource_id -- id of the resource a constraint is being created for
    """
    is_valid, error_message, _ = utils.validate_constraint_resource(
        cib_dom, resource_id
    )
    if is_valid:
        return
    utils.err(error_message)
138
139
def _validate_resources_not_in_same_group(cib_dom, resource1, resource2):
    """
    Report an error via utils.err when the check of both resources for
    sharing a group fails.

    cib_dom -- CIB to look the resources up in
    resource1, resource2 -- ids of the resources to be ordered
    """
    in_different_groups = utils.validate_resources_not_in_same_group(
        cib_dom, resource1, resource2
    )
    if in_different_groups:
        return
    utils.err(
        "Cannot create an order constraint for resources in the same group"
    )
147
148
149 # Syntax: colocation add [role] <src> with [role] <tgt> [score] [options]
150 # possible commands:
151 # <src> with <tgt> [score] [options]
152 # <src> with <role> <tgt> [score] [options]
153 # <role> <src> with <tgt> [score] [options]
154 # <role> <src> with <role> <tgt> [score] [options]
155 # Specifying score as a single argument is deprecated, though. The correct way
156 # is score=value in options.
def colocation_add(lib, argv, modifiers):  # noqa: PLR0912, PLR0915
    """
    Create a colocation constraint for a pair of resources.

    argv -- [role] <src> with [role] <tgt> [score] [name=value ...]; giving
        the score as a standalone value is deprecated, score=value is the
        supported form

    Options:
      * -f - CIB file
      * --force - allow constraint on any resource, allow duplicate constraints
    """

    def _split_score_and_options(args):
        """
        Separate an optional leading standalone score from name=value options.

        The first argument is taken as the score when it contains no '='.
        Returns a tuple (score or None, list of (name, value) tuples).

        Commandline options: no options
        """
        if not args:
            return None, []
        standalone_score = None
        if "=" not in args[0]:
            standalone_score = args.pop(0)
            # TODO added to pcs in the first 0.12.x version
            deprecation_warning(STANDALONE_SCORE_MSG)
        name_value_list = [
            parse_args.split_option(item, allow_empty_value=False)
            for item in args
        ]
        return standalone_score, name_value_list

    def _role_for_cib(roles_new_style, role):
        """
        Validate a role typed by the user and convert it for the CIB.

        Returns an empty string when no role was given.
        """
        if role is None:
            return ""
        normalized_role = role.lower().capitalize()
        if normalized_role not in const.PCMK_ROLES:
            utils.err(
                "invalid role value '{0}', allowed values are: {1}".format(
                    role, format_list(const.PCMK_ROLES)
                )
            )
        return pacemaker.role.get_value_for_cib(
            normalized_role, roles_new_style
        )

    def _describe_constraint(attributes):
        """
        Render attributes of a colocation element as one line of text.
        """
        score_text = "(score:" + attributes.get("score", "") + ")"
        described = [
            attributes.get("rsc", ""),
            "with",
            attributes.get("with-rsc", ""),
            score_text,
        ]
        for option in sorted(attributes.items()):
            if option[0] in ("rsc", "with-rsc", "id", "score"):
                continue
            described.append(f"({option[0]}:{option[1]})")
        constraint_id = attributes.get("id", "")
        described.append(f"(id:{constraint_id})")
        return " ".join(described)

    del lib
    modifiers.ensure_only_supported("-f", "--force")
    if len(argv) < 3:
        raise CmdLineInputError()

    source_role = None
    target_role = None

    # "with" being the third word means the first word is a role
    if argv[2] == "with":
        source_role = argv.pop(0)
        source_id = argv.pop(0)
    elif argv[1] == "with":
        source_id = argv.pop(0)
    else:
        raise CmdLineInputError()

    keyword = argv.pop(0)
    if keyword != "with":
        raise CmdLineInputError()
    if "with" in argv:
        raise CmdLineInputError(
            message="Multiple 'with's cannot be specified.",
            hint=(
                "Use the 'pcs constraint colocation set' command if you want "
                "to create a constraint for more than two resources."
            ),
            show_both_usage_and_message=True,
        )

    if not argv:
        raise CmdLineInputError()
    # two leading words, the second not being a score or an option, mean
    # "<role> <resource>"
    if len(argv) > 1 and not utils.is_score_or_opt(argv[1]):
        target_role = argv.pop(0)
    target_id = argv.pop(0)

    score, nv_pairs = _split_score_and_options(argv)
    wants_influence = "influence" in [name for name, _ in nv_pairs]

    if wants_influence:
        cib_dom = utils.cluster_upgrade_to_version(
            const.PCMK_COLOCATION_INFLUENCE_CIB_VERSION
        )
    else:
        cib_dom = utils.get_cib_dom()
    roles_new_style = utils.isCibVersionSatisfied(
        cib_dom, const.PCMK_NEW_ROLES_CIB_VERSION
    )

    source_cib_role = _role_for_cib(roles_new_style, source_role)
    target_cib_role = _role_for_cib(roles_new_style, target_role)
    _validate_constraint_resource(cib_dom, source_id)
    _validate_constraint_resource(cib_dom, target_id)

    custom_id_given = False
    for name, value in nv_pairs:
        if name == "score":
            # score=value takes precedence over a standalone score
            score = value
        elif name == "id":
            id_valid, id_error = utils.validate_xml_id(value, "constraint id")
            if not id_valid:
                utils.err(id_error)
            if utils.does_id_exist(cib_dom, value):
                utils.err(
                    "id '%s' is already in use, please specify another one"
                    % value
                )
            custom_id_given = True
    if score is None:
        score = SCORE_INFINITY
    if not custom_id_given:
        generated_id = utils.find_unique_id(
            cib_dom,
            "colocation-%s-%s-%s" % (source_id, target_id, score),
        )
        nv_pairs.append(("id", generated_id))

    (dom, constraints_el) = getCurrentConstraints(cib_dom)

    # If one role is specified, the other should default to "started"
    if source_cib_role == "" and target_cib_role != "":
        source_cib_role = DEFAULT_ROLE
    elif target_cib_role == "" and source_cib_role != "":
        target_cib_role = DEFAULT_ROLE

    new_el = dom.createElement("rsc_colocation")
    new_el.setAttribute("rsc", source_id)
    new_el.setAttribute("with-rsc", target_id)
    new_el.setAttribute("score", score)
    if source_cib_role != "":
        new_el.setAttribute("rsc-role", source_cib_role)
    if target_cib_role != "":
        new_el.setAttribute("with-rsc-role", target_cib_role)
    for name, value in nv_pairs:
        new_el.setAttribute(name, value)

    if not modifiers.get("--force"):
        duplicates = colocation_find_duplicates(constraints_el, new_el)
        if duplicates:
            # NOTE(review): the indent prefix below is copied from a listing
            # with collapsed whitespace - confirm its exact width
            duplicate_lines = [
                " " + _describe_constraint(dict(dup.attributes.items()))
                for dup in duplicates
            ]
            utils.err(
                "duplicate constraint already exists, use --force to override\n"
                + "\n".join(duplicate_lines)
            )
    constraints_el.appendChild(new_el)
    utils.replace_cib_configuration(dom)
335
336
def colocation_find_duplicates(dom, constraint_el):
    """
    Find colocation constraints equivalent to the given one.

    Two constraints are equivalent when their "rsc", "with-rsc" and both role
    attributes match; a missing role counts as DEFAULT_ROLE. Constraints
    containing a resource_set and constraint_el itself are never returned.

    dom -- DOM node to search for rsc_colocation elements
    constraint_el -- rsc_colocation element to compare the others to

    Commandline options: no options
    """
    roles_new_style = utils.isCibVersionSatisfied(
        dom, const.PCMK_NEW_ROLES_CIB_VERSION
    )

    def _cib_role(element, attr_name):
        return pacemaker.role.get_value_for_cib(
            element.getAttribute(attr_name).capitalize() or DEFAULT_ROLE,
            roles_new_style,
        )

    def _signature(element):
        return (
            element.getAttribute("rsc"),
            element.getAttribute("with-rsc"),
            _cib_role(element, "rsc-role"),
            _cib_role(element, "with-rsc-role"),
        )

    wanted = _signature(constraint_el)
    duplicates = []
    for candidate in dom.getElementsByTagName("rsc_colocation"):
        if candidate is constraint_el:
            continue
        if candidate.getElementsByTagName("resource_set"):
            continue
        if _signature(candidate) == wanted:
            duplicates.append(candidate)
    return duplicates
368
369
def order_rm(lib, argv, modifiers):
    """
    Remove the given resources from order constraints.

    For each resource id in argv:
      * every rsc_order whose "first" or "then" attribute equals the id is
        deleted
      * the id is taken out of resource sets of rsc_order constraints; a set
        left without resources is deleted and so is a constraint left
        without sets

    Only rsc_order elements are searched for resource sets. Sets of other
    constraint types (colocation, ticket, location) are left untouched.
    utils.err is called when none of the resources is found.

    Options:
      * -f - CIB file
    """
    del lib
    modifiers.ensure_only_supported("-f")
    if not argv:
        raise CmdLineInputError()

    element_found = False
    (dom, constraints_el) = getCurrentConstraints()

    for resource in argv:
        # iterate over a copy, elements are removed inside the loop
        for order_el in constraints_el.getElementsByTagName("rsc_order")[:]:
            if resource in (
                order_el.getAttribute("first"),
                order_el.getAttribute("then"),
            ):
                constraints_el.removeChild(order_el)
                element_found = True

        # collect first, remove afterwards, so the tree is not modified
        # while it is being searched
        refs_to_remove = [
            resource_ref
            for order_el in constraints_el.getElementsByTagName("rsc_order")
            for resource_ref in order_el.getElementsByTagName("resource_ref")
            if resource_ref.getAttribute("id") == resource
        ]

        for resource_ref in refs_to_remove:
            element_found = True
            resource_set = resource_ref.parentNode
            order_el = resource_set.parentNode

            resource_set.removeChild(resource_ref)
            if not resource_set.getElementsByTagName("resource_ref"):
                order_el.removeChild(resource_set)
                if not order_el.getElementsByTagName("resource_set"):
                    order_el.parentNode.removeChild(order_el)

    if element_found:
        utils.replace_cib_configuration(dom)
    else:
        utils.err("No matching resources found in ordering list")
412
413
def order_start(lib, argv, modifiers):
    """
    Parse "[action] <first> then [action] <then> [options]" and create the
    order constraint by calling _order_add.

    An action is recognized by being listed in const.PCMK_ACTIONS; a missing
    action defaults to DEFAULT_ACTION. Both actions are passed on as
    "first-action" and "then-action" options.

    Options:
      * -f - CIB file
      * --force - allow constraint for any resource, allow duplicate constraints
    """
    del lib
    modifiers.ensure_only_supported("-f", "--force")
    if len(argv) < 3:
        raise CmdLineInputError()

    first_action = (
        argv.pop(0) if argv[0] in const.PCMK_ACTIONS else DEFAULT_ACTION
    )
    first_resource = argv.pop(0)
    keyword = argv.pop(0)
    if keyword != "then":
        raise CmdLineInputError()

    if not argv:
        raise CmdLineInputError()
    then_action = (
        argv.pop(0) if argv[0] in const.PCMK_ACTIONS else DEFAULT_ACTION
    )

    if not argv:
        raise CmdLineInputError()
    then_resource = argv.pop(0)

    # whatever is left are options of the constraint
    order_options = list(argv)
    if "then" in order_options:
        raise CmdLineInputError(
            message="Multiple 'then's cannot be specified.",
            hint=(
                "Use the 'pcs constraint order set' command if you want to "
                "create a constraint for more than two resources."
            ),
            show_both_usage_and_message=True,
        )

    order_options += [
        "first-action=" + first_action,
        "then-action=" + then_action,
    ]
    _order_add(first_resource, then_resource, order_options, modifiers)
464
465
def _order_add(resource1, resource2, options_list, modifiers):  # noqa: PLR0912, PLR0915
    """
    Add an order constraint "resource1 then resource2" to the CIB.

    resource1 -- id of the "first" resource
    resource2 -- id of the "then" resource
    options_list -- "symmetrical", "nonsymmetrical" or name=value strings
    modifiers -- command line modifiers, only --force is read here

    Commandline options:
      * -f - CIB file
      * --force - allow constraint for any resource, allow duplicate constraints
    """

    def _describe_constraint(attributes):
        """
        Render attributes of an rsc_order element as one line of text.
        """
        kind = attributes.get("kind", "")
        score = attributes.get("score", "")
        if kind != "":
            score_text = "(kind:" + kind + ")"
        elif score == "":
            score_text = "(kind:Mandatory)"
        else:
            score_text = "(score:" + score + ")"

        symmetry_text = ""
        if "symmetrical" in attributes and not is_true(
            attributes.get("symmetrical", "false")
        ):
            symmetry_text = "(non-symmetrical)"

        already_processed_options = (
            "first",
            "then",
            "first-action",
            "then-action",
            "id",
            "score",
            "kind",
            "symmetrical",
        )
        other_options = " ".join(
            [
                f"{name}={value}"
                for name, value in attributes.items()
                if name not in already_processed_options
            ]
        )
        if other_options:
            other_options = "(Options: " + other_options + ")"

        parts = [
            attributes.get("first-action", ""),
            attributes.get("first", ""),
            "then",
            attributes.get("then-action", ""),
            attributes.get("then", ""),
            score_text,
            symmetry_text,
            other_options,
            "(id:" + attributes.get("id", "") + ")",
        ]
        # empty parts are left out so no double spaces appear
        return " ".join(filter(None, parts))

    cib_dom = utils.get_cib_dom()
    _validate_constraint_resource(cib_dom, resource1)
    _validate_constraint_resource(cib_dom, resource2)

    _validate_resources_not_in_same_group(cib_dom, resource1, resource2)

    order_options = []
    id_specified = False
    symmetrical = None
    for arg in options_list:
        if arg in ("symmetrical", "nonsymmetrical"):
            symmetrical = "true" if arg == "symmetrical" else "false"
            continue
        name, value = parse_args.split_option(arg, allow_empty_value=False)
        if name == "symmetrical":
            lowered_value = value.lower()
            if lowered_value in OPTIONS_SYMMETRICAL:
                symmetrical = lowered_value
            else:
                utils.err(
                    "invalid symmetrical value '%s', allowed values are: %s"
                    % (value, ", ".join(OPTIONS_SYMMETRICAL))
                )
            continue
        if name == "id":
            id_valid, id_error = utils.validate_xml_id(value, "constraint id")
            if not id_valid:
                utils.err(id_error)
            if utils.does_id_exist(cib_dom, value):
                utils.err(
                    "id '%s' is already in use, please specify another one"
                    % value
                )
            id_specified = True
        order_options.append((name, value))
    if symmetrical:
        order_options.append(("symmetrical", symmetrical))

    options = ""
    if order_options:
        options = " (Options: %s)" % " ".join(
            [
                "%s=%s" % (name, value)
                for name, value in order_options
                if name not in ("kind", "score")
            ]
        )

    # the first "score" or "kind" option decides the summary and id suffix
    scorekind = "kind: Mandatory"
    id_suffix = "mandatory"
    decisive_option = next(
        (opt for opt in order_options if opt[0] in ("score", "kind")), None
    )
    if decisive_option is not None:
        id_suffix = decisive_option[1]
        if decisive_option[0] == "score":
            scorekind = "score: " + decisive_option[1]
            # TODO deprecated in pacemaker 2, to be removed in pacemaker 3
            # added to pcs after 0.11.7
            deprecation_warning(
                reports.messages.DeprecatedOption(
                    decisive_option[0], []
                ).message
            )
        else:
            scorekind = "kind: " + decisive_option[1]

    if not id_specified:
        order_id = utils.find_unique_id(
            cib_dom,
            "order-" + resource1 + "-" + resource2 + "-" + id_suffix,
        )
        order_options.append(("id", order_id))

    (dom, constraints_el) = getCurrentConstraints()
    new_el = dom.createElement("rsc_order")
    new_el.setAttribute("first", resource1)
    new_el.setAttribute("then", resource2)
    for name, value in order_options:
        new_el.setAttribute(name, value)
    # appended before looking for duplicates; the search skips new_el itself
    constraints_el.appendChild(new_el)

    if not modifiers.get("--force"):
        duplicates = order_find_duplicates(constraints_el, new_el)
        if duplicates:
            # NOTE(review): the indent prefix below is copied from a listing
            # with collapsed whitespace - confirm its exact width
            duplicate_lines = [
                " " + _describe_constraint(dict(dup.attributes.items()))
                for dup in duplicates
            ]
            utils.err(
                "duplicate constraint already exists, use --force to override\n"
                + "\n".join(duplicate_lines)
            )
    print_to_stderr(f"Adding {resource1} {resource2} ({scorekind}){options}")
    utils.replace_cib_configuration(dom)
631
632
def order_find_duplicates(dom, constraint_el):
    """
    Find order constraints equivalent to the given one.

    Two constraints are equivalent when their "first", "then" and lowercased
    "first-action" and "then-action" match; a missing action counts as
    DEFAULT_ACTION. Constraints containing a resource_set and constraint_el
    itself are never returned.

    dom -- DOM node to search for rsc_order elements
    constraint_el -- rsc_order element to compare the others to

    Commandline options: no options
    """

    def _signature(element):
        first_action = element.getAttribute("first-action").lower()
        then_action = element.getAttribute("then-action").lower()
        return (
            element.getAttribute("first"),
            element.getAttribute("then"),
            first_action or DEFAULT_ACTION,
            then_action or DEFAULT_ACTION,
        )

    wanted = _signature(constraint_el)
    duplicates = []
    for candidate in dom.getElementsByTagName("rsc_order"):
        if candidate is constraint_el:
            continue
        if candidate.getElementsByTagName("resource_set"):
            continue
        if _signature(candidate) == wanted:
            duplicates.append(candidate)
    return duplicates
655
656
# Type variable restricted to the DTOs of constraints with resource sets, so
# that a function annotated with it returns the same DTO type it was given.
_SetConstraint = TypeVar(
    "_SetConstraint",
    CibConstraintLocationSetDto,
    CibConstraintColocationSetDto,
    CibConstraintOrderSetDto,
    CibConstraintTicketSetDto,
)
664
665
def _filter_set_constraints_by_resources(
    constraints_dto: Iterable[_SetConstraint], resources: set[str]
) -> list[_SetConstraint]:
    """
    Keep the set constraints which reference at least one of the resources.

    constraints_dto -- constraints with resource sets to choose from
    resources -- ids of resources to look for in the resource sets
    """
    matching = []
    for constraint_set_dto in constraints_dto:
        for resource_set in constraint_set_dto.resource_sets:
            if not resources.isdisjoint(resource_set.resources_ids):
                matching.append(constraint_set_dto)
                # one matching set is enough, avoid adding it twice
                break
    return matching
677
678
def _filter_constraints_by_resources(
    constraints_dto: CibConstraintsDto,
    resources: StringIterable,
    patterns: StringIterable,
) -> CibConstraintsDto:
    """
    Keep the constraints which reference any of the given resources.

    constraints_dto -- constraints to choose from
    resources -- ids of resources; matched against every constraint type
    patterns -- resource patterns; matched against the resource_pattern of
        plain location constraints only
    """
    wanted_resources = set(resources)
    wanted_patterns = set(patterns)

    def _location_matches(constraint_dto) -> bool:
        if (
            constraint_dto.resource_id is not None
            and constraint_dto.resource_id in wanted_resources
        ):
            return True
        return (
            constraint_dto.resource_pattern is not None
            and constraint_dto.resource_pattern in wanted_patterns
        )

    def _any_wanted(*resource_ids) -> bool:
        return not wanted_resources.isdisjoint(resource_ids)

    location = [
        dto for dto in constraints_dto.location if _location_matches(dto)
    ]
    colocation = [
        dto
        for dto in constraints_dto.colocation
        if _any_wanted(dto.resource_id, dto.with_resource_id)
    ]
    order = [
        dto
        for dto in constraints_dto.order
        if _any_wanted(dto.first_resource_id, dto.then_resource_id)
    ]
    ticket = [
        dto
        for dto in constraints_dto.ticket
        if dto.resource_id in wanted_resources
    ]
    return CibConstraintsDto(
        location=location,
        location_set=_filter_set_constraints_by_resources(
            constraints_dto.location_set, wanted_resources
        ),
        colocation=colocation,
        colocation_set=_filter_set_constraints_by_resources(
            constraints_dto.colocation_set, wanted_resources
        ),
        order=order,
        order_set=_filter_set_constraints_by_resources(
            constraints_dto.order_set, wanted_resources
        ),
        ticket=ticket,
        ticket_set=_filter_set_constraints_by_resources(
            constraints_dto.ticket_set, wanted_resources
        ),
    )
732
733
def _filter_location_by_node_base(
    constraint_dtos: Iterable[CibConstraintLocationAnyDto],
    nodes: StringCollection,
) -> list[CibConstraintLocationAnyDto]:
    """
    Keep the location constraints whose node is one of the given nodes.

    Constraints with no node set (attributes.node is None) are dropped.

    constraint_dtos -- location constraints to choose from
    nodes -- names of the nodes to keep constraints for
    """
    selected = []
    for constraint_dto in constraint_dtos:
        if constraint_dto.attributes.node is None:
            continue
        if constraint_dto.attributes.node in nodes:
            selected.append(constraint_dto)
    return selected
744
745
def location_config_cmd(
    lib: Any, argv: parse_args.Argv, modifiers: parse_args.InputModifiers
) -> None:
    """
    Print location constraints, optionally grouped and filtered.

    argv -- empty, or a keyword "resources" or "nodes" followed by items to
        filter by; with a keyword the output is grouped by resource or by
        node and only the text output format is accepted

    Options:
      * --all - print expired constraints
      * --full - print all details
      * -f - CIB file
    """
    modifiers.ensure_only_supported("-f", "--output-format", "--full", "--all")
    filter_type: str | None = None
    filter_items: parse_args.Argv = []
    if argv:
        filter_type, *filter_items = argv
        allowed_types = ("resources", "nodes")
        if filter_type not in allowed_types:
            raise CmdLineInputError(
                f"Unknown keyword '{filter_type}'. Allowed keywords: "
                f"{format_list(allowed_types)}"
            )
        if modifiers.get_output_format() != parse_args.OUTPUT_FORMAT_VALUE_TEXT:
            raise CmdLineInputError(
                "Output formats other than 'text' are not supported together "
                "with grouping and filtering by nodes or resources"
            )

    all_constraints_dto = filter_constraints_by_rule_expired_status(
        lib.constraint.get_config(evaluate_rules=True),
        modifiers.is_specified("--all"),
    )
    # only location constraints are of interest here
    constraints_dto = CibConstraintsDto(
        location=all_constraints_dto.location,
        location_set=all_constraints_dto.location_set,
    )

    if filter_type == "resources":
        if filter_items:
            resources = []
            patterns = []
            for item in filter_items:
                item_type, item_value = parse_args.parse_typed_arg(
                    item,
                    [RESOURCE_TYPE_RESOURCE, RESOURCE_TYPE_REGEXP],
                    RESOURCE_TYPE_RESOURCE,
                )
                if item_type == RESOURCE_TYPE_RESOURCE:
                    resources.append(item_value)
                elif item_type == RESOURCE_TYPE_REGEXP:
                    patterns.append(item_value)
            constraints_dto = _filter_constraints_by_resources(
                constraints_dto, resources, patterns
            )
        grouped_lines = location.constraints_to_grouped_by_resource_text(
            constraints_dto.location,
            modifiers.is_specified("--full"),
        )
    elif filter_type == "nodes":
        if filter_items:
            constraints_dto = CibConstraintsDto(
                location=_filter_location_by_node_base(
                    constraints_dto.location, filter_items
                ),
                location_set=_filter_location_by_node_base(
                    constraints_dto.location_set, filter_items
                ),
            )
        grouped_lines = location.constraints_to_grouped_by_node_text(
            constraints_dto.location,
            modifiers.is_specified("--full"),
        )
    else:
        print_config(constraints_dto, modifiers)
        return

    # nothing at all is printed when there are no lines to show
    if grouped_lines:
        print("Location Constraints:")
        print(lines_to_str(indent(grouped_lines, indent_step=INDENT_STEP)))
830
831
def _verify_node_name(node, existing_nodes):
    """
    Return a list of reports about the node not being among existing nodes.

    The list is empty when the node is known, otherwise it holds one
    forceable NodeNotFound error.

    node -- name of the node to check
    existing_nodes -- names of known nodes
    """
    if node in existing_nodes:
        return []
    return [
        ReportItem.error(
            reports.messages.NodeNotFound(node),
            force_code=reports.codes.FORCE,
        )
    ]
842
843
def _verify_score(score):
    """
    Report an error via utils.err when utils.is_score rejects the score.

    score -- value to check
    """
    if utils.is_score(score):
        return
    utils.err(
        "invalid score '%s', use integer or INFINITY or -INFINITY" % score
    )
849
850
def location_prefer(  # noqa: PLR0912
    lib: Any, argv: parse_args.Argv, modifiers: parse_args.InputModifiers
) -> None:
    """
    Create location constraints from "<resource> prefers|avoids <node>[=score]...".

    One constraint is created per node by calling location_add. Without an
    explicit score, "prefers" means INFINITY and "avoids" means -INFINITY.
    With "avoids", the sign of an explicit score is flipped.

    argv -- resource, keyword "prefers" or "avoids", then node[=score] items;
        CmdLineInputError is raised when the resource or the keyword is
        missing or the keyword is not recognized

    Options:
      * --force - allow unknown options, allow constraint for any resource type
      * -f - CIB file
    """

    def _negated(score: str) -> str:
        # Flip the sign of a score which has already passed _verify_score.
        # An explicit "+" must be replaced, not prefixed, otherwise "+10"
        # would become "-+10". NOTE(review): this presumes utils.is_score
        # lets a leading "+" through - harmless if it does not.
        if score.startswith("-"):
            return score[1:]
        if score.startswith("+"):
            return "-" + score[1:]
        return "-" + score

    modifiers.ensure_only_supported("--force", "-f")
    # both the resource and the prefers/avoids keyword are mandatory
    if len(argv) < 2:
        raise CmdLineInputError()
    rsc = argv.pop(0)
    prefer_option = argv.pop(0)

    dummy_rsc_type, rsc_value = parse_args.parse_typed_arg(
        rsc,
        [RESOURCE_TYPE_RESOURCE, RESOURCE_TYPE_REGEXP],
        RESOURCE_TYPE_RESOURCE,
    )

    if prefer_option == "prefers":
        prefer = True
    elif prefer_option == "avoids":
        prefer = False
    else:
        raise CmdLineInputError()

    skip_node_check = False
    existing_nodes: list[str] = []
    if modifiers.is_specified("-f") or modifiers.get("--force"):
        skip_node_check = True
        warn(LOCATION_NODE_VALIDATION_SKIP_MSG)
    else:
        lib_env = utils.get_lib_env()
        existing_nodes, report_list = get_existing_nodes_names(
            corosync_conf=lib_env.get_corosync_conf(),
            cib=lib_env.get_cib(),
        )
        if report_list:
            process_library_reports(report_list)

    # validate everything first, create constraints only afterwards
    report_list = []
    parameters_list = []
    for nodeconf in argv:
        nodeconf_a = nodeconf.split("=", 1)
        node = nodeconf_a[0]
        if not skip_node_check:
            report_list += _verify_node_name(node, existing_nodes)
        if len(nodeconf_a) == 1:
            score = "INFINITY" if prefer else "-INFINITY"
        else:
            score = nodeconf_a[1]
            _verify_score(score)
            if not prefer:
                score = _negated(score)

        parameters_list.append(
            [
                sanitize_id(f"location-{rsc_value}-{node}-{score}"),
                rsc,
                node,
                f"score={score}",
            ]
        )

    if report_list:
        process_library_reports(report_list)

    modifiers = modifiers.get_subset("--force", "-f")

    for parameters in parameters_list:
        # nodes and scores have been checked above already
        location_add(lib, parameters, modifiers, skip_score_and_node_check=True)
921
922
def location_add(  # noqa: PLR0912, PLR0915
    lib: Any,
    argv: parse_args.Argv,
    modifiers: parse_args.InputModifiers,
    skip_score_and_node_check: bool = False,
) -> None:
    """
    Create a location constraint, replacing a matching existing one.

    argv -- <id> <resource or pattern> <node> [score] [name=value ...]; a
        standalone score is deprecated, score=value is the supported form
    skip_score_and_node_check -- do not validate the node and the score

    Options:
      * --force - allow unknown options, allow constraint for any resource type
      * -f - CIB file
    """
    del lib
    modifiers.ensure_only_supported("--force", "-f")
    if len(argv) < 4:
        raise CmdLineInputError()

    constraint_id = argv.pop(0)
    rsc_type, rsc_value = parse_args.parse_typed_arg(
        argv.pop(0),
        [RESOURCE_TYPE_RESOURCE, RESOURCE_TYPE_REGEXP],
        RESOURCE_TYPE_RESOURCE,
    )
    node = argv.pop(0)
    score = None
    if "=" not in argv[0]:
        score = argv.pop(0)
        # TODO added to pcs in the first 0.12.x version
        deprecation_warning(STANDALONE_SCORE_MSG)

    # For now we only allow setting resource-discovery and score
    extra_attributes = []
    for arg in argv:
        name, value = parse_args.split_option(arg, allow_empty_value=False)
        if name == "score":
            score = value
        elif name != "resource-discovery":
            if modifiers.get("--force"):
                extra_attributes.append([name, value])
            else:
                utils.err("bad option '%s', use --force to override" % name)
        else:
            if not modifiers.get("--force"):
                allowed_discovery = [
                    str(discovery)
                    for discovery in (
                        CibResourceDiscovery.ALWAYS,
                        CibResourceDiscovery.EXCLUSIVE,
                        CibResourceDiscovery.NEVER,
                    )
                ]
                if value not in allowed_discovery:
                    utils.err(
                        (
                            "invalid {0} value '{1}', allowed values are: {2}"
                            ", use --force to override"
                        ).format(name, value, format_list(allowed_discovery))
                    )
            extra_attributes.append([name, value])
    if score is None:
        score = "INFINITY"

    # Verify that specified node exists in the cluster and score is valid
    if not skip_score_and_node_check:
        if modifiers.is_specified("-f") or modifiers.get("--force"):
            warn(LOCATION_NODE_VALIDATION_SKIP_MSG)
        else:
            lib_env = utils.get_lib_env()
            existing_nodes, report_list = get_existing_nodes_names(
                corosync_conf=lib_env.get_corosync_conf(),
                cib=lib_env.get_cib(),
            )
            report_list += _verify_node_name(node, existing_nodes)
            if report_list:
                process_library_reports(report_list)
        _verify_score(score)

    id_valid, id_error = utils.validate_xml_id(constraint_id, "constraint id")
    if not id_valid:
        utils.err(id_error)

    dom = utils.get_cib_dom()

    # name of the attribute holding the resource or the pattern
    if rsc_type == RESOURCE_TYPE_RESOURCE:
        rsc_attr = "rsc"
    elif rsc_type == RESOURCE_TYPE_REGEXP:
        rsc_attr = "rsc-pattern"
    else:
        rsc_attr = None

    if rsc_type == RESOURCE_TYPE_RESOURCE:
        rsc_valid, rsc_error, _ = utils.validate_constraint_resource(
            dom, rsc_value
        )
        if not rsc_valid:
            utils.err(rsc_error)

    def _is_replaced(rsc_loc):
        # If the id matches, or the rsc & node match, then we replace/remove
        if rsc_loc.getAttribute("id") == constraint_id:
            return True
        return (
            rsc_loc.getAttribute("node") == node
            and rsc_attr is not None
            and rsc_loc.getAttribute(rsc_attr) == rsc_value
        )

    # Verify current constraint doesn't already exist
    # If it does we replace it with the new constraint
    dummy_dom, constraints_el = getCurrentConstraints(dom)
    obsolete_elements = [
        rsc_loc
        for rsc_loc in constraints_el.getElementsByTagName("rsc_location")
        if _is_replaced(rsc_loc)
    ]
    for obsolete_el in obsolete_elements:
        constraints_el.removeChild(obsolete_el)

    new_el = dom.createElement("rsc_location")
    new_el.setAttribute("id", constraint_id)
    if rsc_attr is not None:
        new_el.setAttribute(rsc_attr, rsc_value)
    new_el.setAttribute("node", node)
    new_el.setAttribute("score", score)
    for name, value in extra_attributes:
        new_el.setAttribute(name, value)
    constraints_el.appendChild(new_el)

    utils.replace_cib_configuration(dom)
1053
1054
1055 # Grabs the current constraints and returns the dom and constraint element
def getCurrentConstraints(passed_dom=None):
    """
    Return a tuple (dom, first "constraints" element found in the dom).

    passed_dom -- DOM to use; when not given, the constraints section is read
        by utils.get_cib_xpath and parsed into a new DOM

    Commandline options:
      * -f - CIB file, only if passed_dom is None
    """
    dom = passed_dom
    if not dom:
        constraints_xml = utils.get_cib_xpath("//constraints")
        if constraints_xml == "":
            utils.err("unable to process cib")
        # NOTE(security): static analysis flags this call (XML external entity
        # processing), the stdlib xml parsers are not hardened against
        # maliciously constructed documents. The text parsed here is what
        # utils.get_cib_xpath returned; if that can ever carry untrusted
        # content, switch to a hardened parser such as defusedxml.
        dom = parseString(constraints_xml)

    return (dom, dom.getElementsByTagName("constraints")[0])
1073
1074
1075 # If returnStatus is set, then we don't error out, we just print the error
1076 # and return false
def constraint_rm(  # noqa: PLR0912
    lib,
    argv,
    modifiers,
    returnStatus=False,
    constraintsElement=None,
    passed_dom=None,
):
    """
    Remove constraints, or rules inside constraints, by their ids.

    With more than one id, each id is processed by a separate call and the
    process exits with 1 if any of them was not found. With one id, direct
    children of the constraints element are searched first; only when none
    matches are rules searched, and a constraint left without rules is
    removed as well.

    returnStatus -- on a missing id return False instead of exiting; on
        success return True (unless passed_dom is given)
    constraintsElement -- element to search; when not given it is obtained by
        getCurrentConstraints and the changed CIB is pushed afterwards
    passed_dom -- DOM handed over to getCurrentConstraints; when given, the
        changes are not pushed and the dom obtained from
        getCurrentConstraints is returned on success

    Options:
      * -f - CIB file, effective only if passed_dom is None
    """
    if passed_dom is None:
        modifiers.ensure_only_supported("-f")
    if not argv:
        raise CmdLineInputError()

    if len(argv) != 1:
        any_failed = False
        # every id is tried even if an earlier one was not found
        for constraint_id in argv:
            removed = constraint_rm(
                lib,
                [constraint_id],
                modifiers,
                returnStatus=True,
                passed_dom=passed_dom,
            )
            if not removed:
                any_failed = True
        if any_failed:
            sys.exit(1)
        return None

    c_id = argv.pop(0)
    dom = None
    use_cibadmin = False
    if not constraintsElement:
        (dom, constraintsElement) = getCurrentConstraints(passed_dom)
        use_cibadmin = True

    found = False
    # iterate over a copy, children are removed inside the loop
    for child in constraintsElement.childNodes[:]:
        if (
            child.nodeType == xml.dom.Node.ELEMENT_NODE
            and child.getAttribute("id") == c_id
        ):
            constraintsElement.removeChild(child)
            found = True

    if not found:
        for rule in constraintsElement.getElementsByTagName("rule")[:]:
            if rule.getAttribute("id") != c_id:
                continue
            found = True
            rule_owner = rule.parentNode
            rule_owner.removeChild(rule)
            if not rule_owner.getElementsByTagName("rule"):
                rule_owner.parentNode.removeChild(rule_owner)

    if not found:
        utils.err("Unable to find constraint - '%s'" % c_id, False)
        if returnStatus:
            return False
        sys.exit(1)

    if passed_dom:
        return dom
    if use_cibadmin:
        utils.replace_cib_configuration(dom)
    if returnStatus:
        return True
    return None
1142
1143
def _split_set_constraints(
    constraints_dto: CibConstraintsDto,
) -> tuple[CibConstraintsDto, CibConstraintsDto]:
    """
    Split constraints into two DTOs: one holding only the location,
    colocation, order and ticket constraints, the other holding only their
    *_set counterparts.

    constraints_dto -- constraints to be split
    """
    without_sets = CibConstraintsDto(
        location=constraints_dto.location,
        colocation=constraints_dto.colocation,
        order=constraints_dto.order,
        ticket=constraints_dto.ticket,
    )
    with_sets = CibConstraintsDto(
        location_set=constraints_dto.location_set,
        colocation_set=constraints_dto.colocation_set,
        order_set=constraints_dto.order_set,
        ticket_set=constraints_dto.ticket_set,
    )
    return (without_sets, with_sets)
1161
1162
def _find_constraints_containing_resource(
    resources_dto: CibResourcesDto,
    constraints_dto: CibConstraintsDto,
    resource_id: str,
) -> CibConstraintsDto:
    """
    Filter constraints down to those selected for the given resource id.

    If resource_id is the id of a primitive, the id of the first clone whose
    member_id equals resource_id is added to the filter as well.

    resources_dto -- resources used to look up primitives and clones
    constraints_dto -- constraints to be filtered
    resource_id -- id of the resource to search for
    """
    resources_filter = [resource_id]
    # Original implementation only included parent resource only if resource_id
    # was referring to a primitive resource, ignoring groups. This may change in
    # the future if necessary.
    is_primitive = any(
        primitive_dto.id == resource_id
        for primitive_dto in resources_dto.primitives
    )
    if is_primitive:
        parent_clone_ids = [
            clone_dto.id
            for clone_dto in resources_dto.clones
            if clone_dto.member_id == resource_id
        ]
        # Only the first matching clone is taken into account
        resources_filter.extend(parent_clone_ids[:1])
    return _filter_constraints_by_resources(
        constraints_dto, resources_filter, []
    )
1183
1184
def ref(
    lib: Any, argv: list[str], modifiers: parse_args.InputModifiers
) -> None:
    """
    Print, for each given resource id, ids of constraints found for it.

    Resource ids are deduplicated and processed in sorted order. Constraint
    ids are printed sorted and indented, one per line.

    Options:
      * -f - CIB file
    """
    modifiers.ensure_only_supported("-f")
    if not argv:
        raise CmdLineInputError()

    resources_dto = cast(
        CibResourcesDto, lib.resource.get_configured_resources()
    )
    constraints_dto = cast(
        CibConstraintsDto,
        lib.constraint.get_config(evaluate_rules=False),
    )

    for resource_id in sorted(set(argv)):
        matching_dto = _find_constraints_containing_resource(
            resources_dto, constraints_dto, resource_id
        )
        constraint_ids = get_all_constraints_ids(matching_dto)
        print(f"Resource: {resource_id}")
        if not constraint_ids:
            # NOTE(review): the reviewed copy of this file had its whitespace
            # collapsed - confirm the leading spaces of this message
            print(" No Matches")
            continue
        id_lines = indent(sorted(constraint_ids), indent_step=INDENT_STEP)
        print("\n".join(id_lines))
1219
1220
def remove_constraints_containing(
    resource_id: str, output=False, constraints_element=None, passed_dom=None
):
    """
    Remove constraints found for the given resource and remove the resource
    from constraint sets.

    Constraints without sets are removed one by one using constraint_rm. If
    any constraint with sets is found for the resource, all resource_ref
    elements with the resource's id are removed; a set left without any
    resource_ref is removed and so is a constraint left without any
    resource_set.

    resource_id -- id of the resource
    output -- if set, print a message for each removed constraint and for
        each removal of the resource from a set
    constraints_element -- passed on to constraint_rm if not None
    passed_dom -- dom to work on; if given, and sets were processed, the dom
        is returned instead of being written back

    Commandline options:
      * -f - CIB file, effective only if passed_dom is None
    """
    lib = utils.get_library_wrapper()
    modifiers = utils.get_input_modifiers()
    resources_dto = cast(
        CibResourcesDto, lib.resource.get_configured_resources()
    )
    plain_dto, with_sets_dto = _split_set_constraints(
        cast(
            CibConstraintsDto,
            lib.constraint.get_config(evaluate_rules=False),
        )
    )

    def _sorted_constraint_ids(dto):
        # Sorted ids of constraints in dto found for resource_id
        return sorted(
            get_all_constraints_ids(
                _find_constraints_containing_resource(
                    resources_dto, dto, resource_id
                )
            )
        )

    plain_ids = _sorted_constraint_ids(plain_dto)
    set_ids = _sorted_constraint_ids(with_sets_dto)

    for constraint_id in plain_ids:
        if output:
            print_to_stderr(f"Removing Constraint - {constraint_id}")
        if constraints_element is None:
            constraint_rm(
                lib, [constraint_id], modifiers, passed_dom=passed_dom
            )
        else:
            constraint_rm(
                lib,
                [constraint_id],
                modifiers,
                True,
                constraints_element,
                passed_dom=passed_dom,
            )

    if not set_ids:
        return None

    dom, constraints_root = getCurrentConstraints(passed_dom)
    # Iterate over a copy of the node list, nodes get removed in the loop
    for ref_el in constraints_root.getElementsByTagName("resource_ref")[:]:
        # If resource id is in a set, remove it from the set, if the set
        # is empty, then we remove the set, if the parent of the set
        # is empty then we remove it
        if ref_el.getAttribute("id") != resource_id:
            continue
        set_el = ref_el.parentNode
        set_el.removeChild(ref_el)
        if output:
            print_to_stderr(
                "Removing {} from set {}".format(
                    resource_id, set_el.getAttribute("id")
                )
            )
        if set_el.getElementsByTagName("resource_ref"):
            continue
        # These two messages are printed regardless of the output flag
        print_to_stderr("Removing set {}".format(set_el.getAttribute("id")))
        constraint_el = set_el.parentNode
        constraint_el.removeChild(set_el)
        if not constraint_el.getElementsByTagName("resource_set"):
            constraint_el.parentNode.removeChild(constraint_el)
            print_to_stderr(
                "Removing constraint {}".format(
                    constraint_el.getAttribute("id")
                )
            )

    if passed_dom:
        return dom
    utils.replace_cib_configuration(dom)
    return None
1306
1307
# Point constraints referencing a resource to the resource's parent (a clone
# or master) instead
def constraint_resource_update(old_id, dom):
    """
    Replace old_id with the id of its clone / master parent in the "rsc",
    "first", "then" and "with-rsc" attributes of rsc_location, rsc_order and
    rsc_colocation elements. The dom is modified in place and returned; it is
    returned unchanged if no parent with an id is found.

    old_id -- id of the resource currently referenced by constraints
    dom -- dom to be searched and updated

    Commandline options: no options
    """
    parent_el = utils.dom_get_resource_clone_ms_parent(dom, old_id)
    new_id = parent_el.getAttribute("id") if parent_el else None
    if not new_id:
        return dom

    constraint_tags = ("rsc_location", "rsc_order", "rsc_colocation")
    attrs_to_update = ("rsc", "first", "then", "with-rsc")
    for tag in constraint_tags:
        for constraint in dom.getElementsByTagName(tag):
            for attr in attrs_to_update:
                if constraint.getAttribute(attr) == old_id:
                    constraint.setAttribute(attr, new_id)
    return dom
1329