1    	import sys
2    	import xml.dom.minidom
3    	from enum import Enum
4    	from typing import (
5    	    Any,
6    	    Iterable,
7    	    Optional,
8    	    Set,
9    	    TypeVar,
10   	    cast,
11   	)
12   	from xml.dom.minidom import parseString
13   	
14   	import pcs.cli.constraint_order.command as order_command
15   	from pcs import utils
16   	from pcs.cli.common import parse_args
17   	from pcs.cli.common.errors import (
18   	    CmdLineInputError,
19   	    raise_command_replaced,
20   	)
21   	from pcs.cli.common.output import (
22   	    INDENT_STEP,
23   	    lines_to_str,
24   	)
25   	from pcs.cli.constraint.location.command import (
26   	    RESOURCE_TYPE_REGEXP,
27   	    RESOURCE_TYPE_RESOURCE,
28   	)
29   	from pcs.cli.constraint.output import (
30   	    CibConstraintLocationAnyDto,
31   	    filter_constraints_by_rule_expired_status,
32   	    location,
33   	    print_config,
34   	)
35   	from pcs.cli.reports import process_library_reports
36   	from pcs.cli.reports.output import (
37   	    deprecation_warning,
38   	    print_to_stderr,
39   	    warn,
40   	)
41   	from pcs.common import (
42   	    const,
43   	    pacemaker,
44   	    reports,
45   	)
46   	from pcs.common.pacemaker.constraint import (
47   	    CibConstraintColocationSetDto,
48   	    CibConstraintLocationSetDto,
49   	    CibConstraintOrderSetDto,
50   	    CibConstraintsDto,
51   	    CibConstraintTicketSetDto,
52   	    get_all_constraints_ids,
53   	)
54   	from pcs.common.pacemaker.resource.list import CibResourcesDto
55   	from pcs.common.pacemaker.types import CibResourceDiscovery
56   	from pcs.common.reports import ReportItem
57   	from pcs.common.str_tools import (
58   	    format_list,
59   	    indent,
60   	)
61   	from pcs.common.types import (
62   	    StringCollection,
63   	    StringIterable,
64   	    StringSequence,
65   	)
66   	from pcs.lib.cib.constraint.order import ATTRIB as order_attrib
67   	from pcs.lib.node import get_existing_nodes_names
68   	from pcs.lib.pacemaker.values import (
69   	    SCORE_INFINITY,
70   	    is_true,
71   	    sanitize_id,
72   	)
73   	
74   	# pylint: disable=invalid-name
75   	# pylint: disable=too-many-branches
76   	# pylint: disable=too-many-lines
77   	# pylint: disable=too-many-locals
78   	# pylint: disable=too-many-statements
79   	
80   	DEFAULT_ACTION = const.PCMK_ACTION_START
81   	DEFAULT_ROLE = const.PCMK_ROLE_STARTED
82   	
83   	OPTIONS_SYMMETRICAL = order_attrib["symmetrical"]
84   	
85   	LOCATION_NODE_VALIDATION_SKIP_MSG = (
86   	    "Validation for node existence in the cluster will be skipped"
87   	)
88   	STANDALONE_SCORE_MSG = (
89   	    "Specifying score as a standalone value is deprecated and "
90   	    "might be removed in a future release, use score=value instead"
91   	)
92   	
93   	
94   	class CrmRuleReturnCode(Enum):
95   	    IN_EFFECT = 0
96   	    EXPIRED = 110
97   	    TO_BE_IN_EFFECT = 111
98   	
99   	
100  	def constraint_order_cmd(lib, argv, modifiers):
101  	    sub_cmd = "config" if not argv else argv.pop(0)
102  	
103  	    try:
104  	        if sub_cmd == "set":
105  	            order_command.create_with_set(lib, argv, modifiers)
106  	        elif sub_cmd in ["remove", "delete"]:
107  	            order_rm(lib, argv, modifiers)
108  	        elif sub_cmd == "show":
109  	            raise_command_replaced(
110  	                ["pcs constraint order config"], pcs_version="0.12"
111  	            )
112  	        elif sub_cmd == "config":
113  	            order_command.config_cmd(lib, argv, modifiers)
114  	        else:
115  	            order_start(lib, [sub_cmd] + argv, modifiers)
116  	    except CmdLineInputError as e:
117  	        utils.exit_on_cmdline_input_error(e, "constraint", ["order", sub_cmd])
118  	
119  	
120  	def config_cmd(
121  	    lib: Any, argv: list[str], modifiers: parse_args.InputModifiers
122  	) -> None:
123  	    modifiers.ensure_only_supported("-f", "--output-format", "--full", "--all")
124  	    if argv:
125  	        raise CmdLineInputError()
126  	
127  	    print_config(
128  	        cast(
129  	            CibConstraintsDto,
130  	            lib.constraint.get_config(evaluate_rules=True),
131  	        ),
132  	        modifiers,
133  	    )
134  	
135  	
136  	def _validate_constraint_resource(cib_dom, resource_id):
137  	    (
138  	        resource_valid,
139  	        resource_error,
140  	        dummy_correct_id,
141  	    ) = utils.validate_constraint_resource(cib_dom, resource_id)
142  	    if not resource_valid:
143  	        utils.err(resource_error)
144  	
145  	
146  	def _validate_resources_not_in_same_group(cib_dom, resource1, resource2):
147  	    if not utils.validate_resources_not_in_same_group(
148  	        cib_dom, resource1, resource2
149  	    ):
150  	        utils.err(
151  	            "Cannot create an order constraint for resources in the same group"
152  	        )
153  	
154  	
155  	# Syntax: colocation add [role] <src> with [role] <tgt> [score] [options]
156  	# possible commands:
157  	#        <src> with        <tgt> [score] [options]
158  	#        <src> with <role> <tgt> [score] [options]
159  	# <role> <src> with        <tgt> [score] [options]
160  	# <role> <src> with <role> <tgt> [score] [options]
161  	# Specifying score as a single argument is deprecated, though. The correct way
162  	# is score=value in options.
163  	def colocation_add(lib, argv, modifiers):  # noqa: PLR0912, PLR0915
164  	    """
165  	    Options:
166  	      * -f - CIB file
167  	      * --force - allow constraint on any resource, allow duplicate constraints
168  	    """
169  	
170  	    def _parse_score_options(argv):
171  	        # When passed an array of arguments if the first argument doesn't have
172  	        # an '=' then it's the score, otherwise they're all arguments. Return a
173  	        # tuple with the score and array of name,value pairs
174  	        """
175  	        Commandline options: no options
176  	        """
177  	        if not argv:
178  	            return None, []
179  	        score = None
180  	        if "=" not in argv[0]:
181  	            score = argv.pop(0)
182  	            # TODO added to pcs in the first 0.12.x version
183  	            deprecation_warning(STANDALONE_SCORE_MSG)
184  	
185  	        # create a list of 2-tuples (name, value)
186  	        arg_array = [
187  	            parse_args.split_option(arg, allow_empty_value=False)
188  	            for arg in argv
189  	        ]
190  	        return score, arg_array
191  	
192  	    del lib
193  	    modifiers.ensure_only_supported("-f", "--force")
194  	    if len(argv) < 3:
195  	        raise CmdLineInputError()
196  	
197  	    role1 = ""
198  	    role2 = ""
199  	
200  	    cib_dom = utils.get_cib_dom()
201  	    new_roles_supported = utils.isCibVersionSatisfied(
202  	        cib_dom, const.PCMK_NEW_ROLES_CIB_VERSION
203  	    )
204  	
205  	    def _validate_and_prepare_role(role):
206  	        role_cleaned = role.lower().capitalize()
207  	        if role_cleaned not in const.PCMK_ROLES:
208  	            utils.err(
209  	                "invalid role value '{0}', allowed values are: {1}".format(
210  	                    role, format_list(const.PCMK_ROLES)
211  	                )
212  	            )
213  	        return pacemaker.role.get_value_for_cib(
214  	            role_cleaned, new_roles_supported
215  	        )
216  	
217  	    if argv[2] == "with":
218  	        role1 = _validate_and_prepare_role(argv.pop(0))
219  	        resource1 = argv.pop(0)
220  	    elif argv[1] == "with":
221  	        resource1 = argv.pop(0)
222  	    else:
223  	        raise CmdLineInputError()
224  	
225  	    if argv.pop(0) != "with":
226  	        raise CmdLineInputError()
227  	    if "with" in argv:
228  	        raise CmdLineInputError(
229  	            message="Multiple 'with's cannot be specified.",
230  	            hint=(
231  	                "Use the 'pcs constraint colocation set' command if you want "
232  	                "to create a constraint for more than two resources."
233  	            ),
234  	            show_both_usage_and_message=True,
235  	        )
236  	
237  	    if not argv:
238  	        raise CmdLineInputError()
239  	    if len(argv) == 1 or utils.is_score_or_opt(argv[1]):
240  	        resource2 = argv.pop(0)
241  	    else:
242  	        role2 = _validate_and_prepare_role(argv.pop(0))
243  	        resource2 = argv.pop(0)
244  	
245  	    score, nv_pairs = _parse_score_options(argv)
246  	
247  	    _validate_constraint_resource(cib_dom, resource1)
248  	    _validate_constraint_resource(cib_dom, resource2)
249  	
250  	    id_in_nvpairs = None
251  	    for name, value in nv_pairs:
252  	        if name == "id":
253  	            id_valid, id_error = utils.validate_xml_id(value, "constraint id")
254  	            if not id_valid:
255  	                utils.err(id_error)
256  	            if utils.does_id_exist(cib_dom, value):
257  	                utils.err(
258  	                    "id '%s' is already in use, please specify another one"
259  	                    % value
260  	                )
261  	            id_in_nvpairs = True
262  	        elif name == "score":
263  	            score = value
264  	    if score is None:
265  	        score = SCORE_INFINITY
266  	    if not id_in_nvpairs:
267  	        nv_pairs.append(
268  	            (
269  	                "id",
270  	                utils.find_unique_id(
271  	                    cib_dom,
272  	                    "colocation-%s-%s-%s" % (resource1, resource2, score),
273  	                ),
274  	            )
275  	        )
276  	
277  	    (dom, constraintsElement) = getCurrentConstraints(cib_dom)
278  	
279  	    # If one role is specified, the other should default to "started"
280  	    if role1 != "" and role2 == "":
281  	        role2 = DEFAULT_ROLE
282  	    if role2 != "" and role1 == "":
283  	        role1 = DEFAULT_ROLE
284  	    element = dom.createElement("rsc_colocation")
285  	    element.setAttribute("rsc", resource1)
286  	    element.setAttribute("with-rsc", resource2)
287  	    element.setAttribute("score", score)
288  	    if role1 != "":
289  	        element.setAttribute("rsc-role", role1)
290  	    if role2 != "":
291  	        element.setAttribute("with-rsc-role", role2)
292  	    for nv_pair in nv_pairs:
293  	        element.setAttribute(nv_pair[0], nv_pair[1])
294  	    if not modifiers.get("--force"):
295  	
296  	        def _constraint_export(constraint_info):
297  	            options_dict = constraint_info["options"]
298  	            co_resource1 = options_dict.get("rsc", "")
299  	            co_resource2 = options_dict.get("with-rsc", "")
300  	            co_id = options_dict.get("id", "")
301  	            co_score = options_dict.get("score", "")
302  	            score_text = "(score:" + co_score + ")"
303  	            console_option_list = [
304  	                f"({option[0]}:{option[1]})"
305  	                for option in sorted(options_dict.items())
306  	                if option[0] not in ("rsc", "with-rsc", "id", "score")
307  	            ]
308  	            console_option_list.append(f"(id:{co_id})")
309  	            return " ".join(
310  	                [co_resource1, "with", co_resource2, score_text]
311  	                + console_option_list
312  	            )
313  	
314  	        duplicates = colocation_find_duplicates(constraintsElement, element)
315  	        if duplicates:
316  	            utils.err(
317  	                "duplicate constraint already exists, use --force to override\n"
318  	                + "\n".join(
319  	                    [
320  	                        "  "
321  	                        + _constraint_export(
322  	                            {"options": dict(dup.attributes.items())}
323  	                        )
324  	                        for dup in duplicates
325  	                    ]
326  	                )
327  	            )
328  	    constraintsElement.appendChild(element)
329  	    utils.replace_cib_configuration(dom)
330  	
331  	
332  	def colocation_find_duplicates(dom, constraint_el):
333  	    """
334  	    Commandline options: no options
335  	    """
336  	    new_roles_supported = utils.isCibVersionSatisfied(
337  	        dom, const.PCMK_NEW_ROLES_CIB_VERSION
338  	    )
339  	
340  	    def normalize(const_el):
341  	        return (
342  	            const_el.getAttribute("rsc"),
343  	            const_el.getAttribute("with-rsc"),
344  	            pacemaker.role.get_value_for_cib(
345  	                const_el.getAttribute("rsc-role").capitalize() or DEFAULT_ROLE,
346  	                new_roles_supported,
347  	            ),
348  	            pacemaker.role.get_value_for_cib(
349  	                const_el.getAttribute("with-rsc-role").capitalize()
350  	                or DEFAULT_ROLE,
351  	                new_roles_supported,
352  	            ),
353  	        )
354  	
355  	    normalized_el = normalize(constraint_el)
356  	    return [
357  	        other_el
358  	        for other_el in dom.getElementsByTagName("rsc_colocation")
359  	        if not other_el.getElementsByTagName("resource_set")
360  	        and constraint_el is not other_el
361  	        and normalized_el == normalize(other_el)
362  	    ]
363  	
364  	
365  	def order_rm(lib, argv, modifiers):
366  	    """
367  	    Options:
368  	      * -f - CIB file
369  	    """
370  	    del lib
371  	    modifiers.ensure_only_supported("-f")
372  	    if not argv:
373  	        raise CmdLineInputError()
374  	
375  	    elementFound = False
376  	    (dom, constraintsElement) = getCurrentConstraints()
377  	
378  	    for resource in argv:
379  	        for ord_loc in constraintsElement.getElementsByTagName("rsc_order")[:]:
380  	            if (
381  	                ord_loc.getAttribute("first") == resource
382  	                or ord_loc.getAttribute("then") == resource
383  	            ):
384  	                constraintsElement.removeChild(ord_loc)
385  	                elementFound = True
386  	
387  	        resource_refs_to_remove = []
388  	        for ord_set in constraintsElement.getElementsByTagName("resource_ref"):
389  	            if ord_set.getAttribute("id") == resource:
390  	                resource_refs_to_remove.append(ord_set)
391  	                elementFound = True
392  	
393  	        for res_ref in resource_refs_to_remove:
394  	            res_set = res_ref.parentNode
395  	            res_order = res_set.parentNode
396  	
397  	            res_ref.parentNode.removeChild(res_ref)
398  	            if not res_set.getElementsByTagName("resource_ref"):
399  	                res_set.parentNode.removeChild(res_set)
400  	                if not res_order.getElementsByTagName("resource_set"):
401  	                    res_order.parentNode.removeChild(res_order)
402  	
403  	    if elementFound:
404  	        utils.replace_cib_configuration(dom)
405  	    else:
406  	        utils.err("No matching resources found in ordering list")
407  	
408  	
409  	def order_start(lib, argv, modifiers):
410  	    """
411  	    Options:
412  	      * -f - CIB file
413  	      * --force - allow constraint for any resource, allow duplicate constraints
414  	    """
415  	    del lib
416  	    modifiers.ensure_only_supported("-f", "--force")
417  	    if len(argv) < 3:
418  	        raise CmdLineInputError()
419  	
420  	    first_action = DEFAULT_ACTION
421  	    then_action = DEFAULT_ACTION
422  	    action = argv[0]
423  	    if action in const.PCMK_ACTIONS:
424  	        first_action = action
425  	        argv.pop(0)
426  	
427  	    resource1 = argv.pop(0)
428  	    if argv.pop(0) != "then":
429  	        raise CmdLineInputError()
430  	
431  	    if not argv:
432  	        raise CmdLineInputError()
433  	
434  	    action = argv[0]
435  	    if action in const.PCMK_ACTIONS:
436  	        then_action = action
437  	        argv.pop(0)
438  	
439  	    if not argv:
440  	        raise CmdLineInputError()
441  	    resource2 = argv.pop(0)
442  	
443  	    order_options = []
444  	    if argv:
445  	        order_options = order_options + argv[:]
446  	    if "then" in order_options:
447  	        raise CmdLineInputError(
448  	            message="Multiple 'then's cannot be specified.",
449  	            hint=(
450  	                "Use the 'pcs constraint order set' command if you want to "
451  	                "create a constraint for more than two resources."
452  	            ),
453  	            show_both_usage_and_message=True,
454  	        )
455  	
456  	    order_options.append("first-action=" + first_action)
457  	    order_options.append("then-action=" + then_action)
458  	    _order_add(resource1, resource2, order_options, modifiers)
459  	
460  	
461  	def _order_add(resource1, resource2, options_list, modifiers):  # noqa: PLR0912, PLR0915
462  	    """
463  	    Commandline options:
464  	      * -f - CIB file
465  	      * --force - allow constraint for any resource, allow duplicate constraints
466  	    """
467  	    cib_dom = utils.get_cib_dom()
468  	    _validate_constraint_resource(cib_dom, resource1)
469  	    _validate_constraint_resource(cib_dom, resource2)
470  	
471  	    _validate_resources_not_in_same_group(cib_dom, resource1, resource2)
472  	
473  	    order_options = []
474  	    id_specified = False
475  	    sym = None
476  	    for arg in options_list:
477  	        if arg == "symmetrical":
478  	            sym = "true"
479  	        elif arg == "nonsymmetrical":
480  	            sym = "false"
481  	        else:
482  	            name, value = parse_args.split_option(arg, allow_empty_value=False)
483  	            if name == "id":
484  	                id_valid, id_error = utils.validate_xml_id(
485  	                    value, "constraint id"
486  	                )
487  	                if not id_valid:
488  	                    utils.err(id_error)
489  	                if utils.does_id_exist(cib_dom, value):
490  	                    utils.err(
491  	                        "id '%s' is already in use, please specify another one"
492  	                        % value
493  	                    )
494  	                id_specified = True
495  	                order_options.append((name, value))
496  	            elif name == "symmetrical":
497  	                if value.lower() in OPTIONS_SYMMETRICAL:
498  	                    sym = value.lower()
499  	                else:
500  	                    utils.err(
501  	                        "invalid symmetrical value '%s', allowed values are: %s"
502  	                        % (value, ", ".join(OPTIONS_SYMMETRICAL))
503  	                    )
504  	            else:
505  	                order_options.append((name, value))
506  	    if sym:
507  	        order_options.append(("symmetrical", sym))
508  	
509  	    options = ""
510  	    if order_options:
511  	        options = " (Options: %s)" % " ".join(
512  	            [
513  	                "%s=%s" % (name, value)
514  	                for name, value in order_options
515  	                if name not in ("kind", "score")
516  	            ]
517  	        )
518  	
519  	    scorekind = "kind: Mandatory"
520  	    id_suffix = "mandatory"
521  	    for opt in order_options:
522  	        if opt[0] == "score":
523  	            scorekind = "score: " + opt[1]
524  	            id_suffix = opt[1]
525  	            # TODO deprecated in pacemaker 2, to be removed in pacemaker 3
526  	            # added to pcs after 0.11.7
527  	            deprecation_warning(
528  	                reports.messages.DeprecatedOption(opt[0], []).message
529  	            )
530  	            break
531  	        if opt[0] == "kind":
532  	            scorekind = "kind: " + opt[1]
533  	            id_suffix = opt[1]
534  	            break
535  	
536  	    if not id_specified:
537  	        order_id = "order-" + resource1 + "-" + resource2 + "-" + id_suffix
538  	        order_id = utils.find_unique_id(cib_dom, order_id)
539  	        order_options.append(("id", order_id))
540  	
541  	    (dom, constraintsElement) = getCurrentConstraints()
542  	    element = dom.createElement("rsc_order")
543  	    element.setAttribute("first", resource1)
544  	    element.setAttribute("then", resource2)
545  	    for order_opt in order_options:
546  	        element.setAttribute(order_opt[0], order_opt[1])
547  	    constraintsElement.appendChild(element)
548  	    if not modifiers.get("--force"):
549  	
550  	        def _constraint_export(constraint_info):
551  	            options = constraint_info["options"]
552  	            oc_resource1 = options.get("first", "")
553  	            oc_resource2 = options.get("then", "")
554  	            first_action = options.get("first-action", "")
555  	            then_action = options.get("then-action", "")
556  	            oc_id = options.get("id", "")
557  	            oc_score = options.get("score", "")
558  	            oc_kind = options.get("kind", "")
559  	            oc_sym = ""
560  	            oc_id_out = ""
561  	            oc_options = ""
562  	            if "symmetrical" in options and not is_true(
563  	                options.get("symmetrical", "false")
564  	            ):
565  	                oc_sym = "(non-symmetrical)"
566  	            if oc_kind != "":
567  	                score_text = "(kind:" + oc_kind + ")"
568  	            elif oc_kind == "" and oc_score == "":
569  	                score_text = "(kind:Mandatory)"
570  	            else:
571  	                score_text = "(score:" + oc_score + ")"
572  	            oc_id_out = "(id:" + oc_id + ")"
573  	            already_processed_options = (
574  	                "first",
575  	                "then",
576  	                "first-action",
577  	                "then-action",
578  	                "id",
579  	                "score",
580  	                "kind",
581  	                "symmetrical",
582  	            )
583  	            oc_options = " ".join(
584  	                [
585  	                    f"{name}={value}"
586  	                    for name, value in options.items()
587  	                    if name not in already_processed_options
588  	                ]
589  	            )
590  	            if oc_options:
591  	                oc_options = "(Options: " + oc_options + ")"
592  	            return " ".join(
593  	                [
594  	                    arg
595  	                    for arg in [
596  	                        first_action,
597  	                        oc_resource1,
598  	                        "then",
599  	                        then_action,
600  	                        oc_resource2,
601  	                        score_text,
602  	                        oc_sym,
603  	                        oc_options,
604  	                        oc_id_out,
605  	                    ]
606  	                    if arg
607  	                ]
608  	            )
609  	
610  	        duplicates = order_find_duplicates(constraintsElement, element)
611  	        if duplicates:
612  	            utils.err(
613  	                "duplicate constraint already exists, use --force to override\n"
614  	                + "\n".join(
615  	                    [
616  	                        "  "
617  	                        + _constraint_export(
618  	                            {"options": dict(dup.attributes.items())}
619  	                        )
620  	                        for dup in duplicates
621  	                    ]
622  	                )
623  	            )
624  	    print_to_stderr(f"Adding {resource1} {resource2} ({scorekind}){options}")
625  	    utils.replace_cib_configuration(dom)
626  	
627  	
628  	def order_find_duplicates(dom, constraint_el):
629  	    """
630  	    Commandline options: no options
631  	    """
632  	
633  	    def normalize(constraint_el):
634  	        return (
635  	            constraint_el.getAttribute("first"),
636  	            constraint_el.getAttribute("then"),
637  	            constraint_el.getAttribute("first-action").lower()
638  	            or DEFAULT_ACTION,
639  	            constraint_el.getAttribute("then-action").lower() or DEFAULT_ACTION,
640  	        )
641  	
642  	    normalized_el = normalize(constraint_el)
643  	    return [
644  	        other_el
645  	        for other_el in dom.getElementsByTagName("rsc_order")
646  	        if not other_el.getElementsByTagName("resource_set")
647  	        and constraint_el is not other_el
648  	        and normalized_el == normalize(other_el)
649  	    ]
650  	
651  	
652  	_SetConstraint = TypeVar(
653  	    "_SetConstraint",
654  	    CibConstraintLocationSetDto,
655  	    CibConstraintColocationSetDto,
656  	    CibConstraintOrderSetDto,
657  	    CibConstraintTicketSetDto,
658  	)
659  	
660  	
661  	def _filter_set_constraints_by_resources(
662  	    constraints_dto: Iterable[_SetConstraint], resources: Set[str]
663  	) -> list[_SetConstraint]:
664  	    return [
665  	        constraint_set_dto
666  	        for constraint_set_dto in constraints_dto
667  	        if any(
668  	            set(resource_set.resources_ids) & resources
669  	            for resource_set in constraint_set_dto.resource_sets
670  	        )
671  	    ]
672  	
673  	
674  	def _filter_constraints_by_resources(
675  	    constraints_dto: CibConstraintsDto,
676  	    resources: StringIterable,
677  	    patterns: StringIterable,
678  	) -> CibConstraintsDto:
679  	    required_resources_set = set(resources)
680  	    required_patterns_set = set(patterns)
681  	    return CibConstraintsDto(
682  	        location=[
683  	            constraint_dto
684  	            for constraint_dto in constraints_dto.location
685  	            if (
686  	                constraint_dto.resource_id is not None
687  	                and constraint_dto.resource_id in required_resources_set
688  	            )
689  	            or (
690  	                constraint_dto.resource_pattern is not None
691  	                and constraint_dto.resource_pattern in required_patterns_set
692  	            )
693  	        ],
694  	        location_set=_filter_set_constraints_by_resources(
695  	            constraints_dto.location_set, required_resources_set
696  	        ),
697  	        colocation=[
698  	            constraint_dto
699  	            for constraint_dto in constraints_dto.colocation
700  	            if {constraint_dto.resource_id, constraint_dto.with_resource_id}
701  	            & required_resources_set
702  	        ],
703  	        colocation_set=_filter_set_constraints_by_resources(
704  	            constraints_dto.colocation_set, required_resources_set
705  	        ),
706  	        order=[
707  	            constraint_dto
708  	            for constraint_dto in constraints_dto.order
709  	            if {
710  	                constraint_dto.first_resource_id,
711  	                constraint_dto.then_resource_id,
712  	            }
713  	            & required_resources_set
714  	        ],
715  	        order_set=_filter_set_constraints_by_resources(
716  	            constraints_dto.order_set, required_resources_set
717  	        ),
718  	        ticket=[
719  	            constraint_dto
720  	            for constraint_dto in constraints_dto.ticket
721  	            if constraint_dto.resource_id in required_resources_set
722  	        ],
723  	        ticket_set=_filter_set_constraints_by_resources(
724  	            constraints_dto.ticket_set, required_resources_set
725  	        ),
726  	    )
727  	
728  	
729  	def _filter_location_by_node_base(
730  	    constraint_dtos: Iterable[CibConstraintLocationAnyDto],
731  	    nodes: StringCollection,
732  	) -> list[CibConstraintLocationAnyDto]:
733  	    return [
734  	        constraint_dto
735  	        for constraint_dto in constraint_dtos
736  	        if constraint_dto.attributes.node is not None
737  	        and constraint_dto.attributes.node in nodes
738  	    ]
739  	
740  	
741  	def location_config_cmd(
742  	    lib: Any, argv: parse_args.Argv, modifiers: parse_args.InputModifiers
743  	) -> None:
744  	    """
745  	    Options:
746  	      * --all - print expired constraints
747  	      * --full - print all details
748  	      * -f - CIB file
749  	    """
750  	    modifiers.ensure_only_supported("-f", "--output-format", "--full", "--all")
751  	    filter_type: Optional[str] = None
752  	    filter_items: parse_args.Argv = []
753  	    if argv:
754  	        filter_type, *filter_items = argv
755  	        allowed_types = ("resources", "nodes")
756  	        if filter_type not in allowed_types:
757  	            raise CmdLineInputError(
758  	                f"Unknown keyword '{filter_type}'. Allowed keywords: "
759  	                f"{format_list(allowed_types)}"
760  	            )
761  	        if modifiers.get_output_format() != parse_args.OUTPUT_FORMAT_VALUE_TEXT:
762  	            raise CmdLineInputError(
763  	                "Output formats other than 'text' are not supported together "
764  	                "with grouping and filtering by nodes or resources"
765  	            )
766  	
767  	    constraints_dto = filter_constraints_by_rule_expired_status(
768  	        lib.constraint.get_config(evaluate_rules=True),
769  	        modifiers.is_specified("--all"),
770  	    )
771  	
772  	    constraints_dto = CibConstraintsDto(
773  	        location=constraints_dto.location,
774  	        location_set=constraints_dto.location_set,
775  	    )
776  	
777  	    def _print_lines(lines: StringSequence) -> None:
778  	        if lines:
779  	            print("Location Constraints:")
780  	            print(lines_to_str(indent(lines, indent_step=INDENT_STEP)))
781  	
782  	    if filter_type == "resources":
783  	        if filter_items:
784  	            resources = []
785  	            patterns = []
786  	            for item in filter_items:
787  	                item_type, item_value = parse_args.parse_typed_arg(
788  	                    item,
789  	                    [RESOURCE_TYPE_RESOURCE, RESOURCE_TYPE_REGEXP],
790  	                    RESOURCE_TYPE_RESOURCE,
791  	                )
792  	                if item_type == RESOURCE_TYPE_RESOURCE:
793  	                    resources.append(item_value)
794  	                elif item_type == RESOURCE_TYPE_REGEXP:
795  	                    patterns.append(item_value)
796  	            constraints_dto = _filter_constraints_by_resources(
797  	                constraints_dto, resources, patterns
798  	            )
799  	        _print_lines(
800  	            location.constraints_to_grouped_by_resource_text(
801  	                constraints_dto.location,
802  	                modifiers.is_specified("--full"),
803  	            )
804  	        )
805  	        return
806  	    if filter_type == "nodes":
807  	        if filter_items:
808  	            constraints_dto = CibConstraintsDto(
809  	                location=_filter_location_by_node_base(
810  	                    constraints_dto.location, filter_items
811  	                ),
812  	                location_set=_filter_location_by_node_base(
813  	                    constraints_dto.location_set, filter_items
814  	                ),
815  	            )
816  	        _print_lines(
817  	            location.constraints_to_grouped_by_node_text(
818  	                constraints_dto.location,
819  	                modifiers.is_specified("--full"),
820  	            )
821  	        )
822  	        return
823  	
824  	    print_config(constraints_dto, modifiers)
825  	
826  	
827  	def _verify_node_name(node, existing_nodes):
828  	    report_list = []
829  	    if node not in existing_nodes:
830  	        report_list.append(
831  	            ReportItem.error(
832  	                reports.messages.NodeNotFound(node),
833  	                force_code=reports.codes.FORCE,
834  	            )
835  	        )
836  	    return report_list
837  	
838  	
839  	def _verify_score(score):
840  	    if not utils.is_score(score):
841  	        utils.err(
842  	            "invalid score '%s', use integer or INFINITY or -INFINITY" % score
843  	        )
844  	
845  	
846  	def location_prefer(  # noqa: PLR0912
847  	    lib: Any, argv: parse_args.Argv, modifiers: parse_args.InputModifiers
848  	) -> None:
849  	    """
850  	    Options:
851  	      * --force - allow unknown options, allow constraint for any resource type
852  	      * -f - CIB file
853  	    """
854  	    modifiers.ensure_only_supported("--force", "-f")
855  	    rsc = argv.pop(0)
856  	    prefer_option = argv.pop(0)
857  	
858  	    dummy_rsc_type, rsc_value = parse_args.parse_typed_arg(
859  	        rsc,
860  	        [RESOURCE_TYPE_RESOURCE, RESOURCE_TYPE_REGEXP],
861  	        RESOURCE_TYPE_RESOURCE,
862  	    )
863  	
864  	    if prefer_option == "prefers":
865  	        prefer = True
866  	    elif prefer_option == "avoids":
867  	        prefer = False
868  	    else:
869  	        raise CmdLineInputError()
870  	
871  	    skip_node_check = False
872  	    existing_nodes: list[str] = []
873  	    if modifiers.is_specified("-f") or modifiers.get("--force"):
874  	        skip_node_check = True
875  	        warn(LOCATION_NODE_VALIDATION_SKIP_MSG)
876  	    else:
877  	        lib_env = utils.get_lib_env()
878  	        existing_nodes, report_list = get_existing_nodes_names(
879  	            corosync_conf=lib_env.get_corosync_conf(),
880  	            cib=lib_env.get_cib(),
881  	        )
882  	        if report_list:
883  	            process_library_reports(report_list)
884  	
885  	    report_list = []
886  	    parameters_list = []
887  	    for nodeconf in argv:
888  	        nodeconf_a = nodeconf.split("=", 1)
889  	        node = nodeconf_a[0]
890  	        if not skip_node_check:
891  	            report_list += _verify_node_name(node, existing_nodes)
892  	        if len(nodeconf_a) == 1:
893  	            score = "INFINITY" if prefer else "-INFINITY"
894  	        else:
895  	            score = nodeconf_a[1]
896  	            _verify_score(score)
897  	            if not prefer:
898  	                score = score[1:] if score[0] == "-" else "-" + score
899  	
900  	        parameters_list.append(
901  	            [
902  	                sanitize_id(f"location-{rsc_value}-{node}-{score}"),
903  	                rsc,
904  	                node,
905  	                f"score={score}",
906  	            ]
907  	        )
908  	
909  	    if report_list:
910  	        process_library_reports(report_list)
911  	
912  	    modifiers = modifiers.get_subset("--force", "-f")
913  	
914  	    for parameters in parameters_list:
915  	        location_add(lib, parameters, modifiers, skip_score_and_node_check=True)
916  	
917  	
918  	def location_add(  # noqa: PLR0912, PLR0915
919  	    lib: Any,
920  	    argv: parse_args.Argv,
921  	    modifiers: parse_args.InputModifiers,
922  	    skip_score_and_node_check: bool = False,
923  	) -> None:
924  	    """
925  	    Options:
926  	      * --force - allow unknown options, allow constraint for any resource type
927  	      * -f - CIB file
928  	    """
929  	    del lib
930  	    modifiers.ensure_only_supported("--force", "-f")
931  	    if len(argv) < 4:
932  	        raise CmdLineInputError()
933  	
934  	    constraint_id = argv.pop(0)
935  	    rsc_type, rsc_value = parse_args.parse_typed_arg(
936  	        argv.pop(0),
937  	        [RESOURCE_TYPE_RESOURCE, RESOURCE_TYPE_REGEXP],
938  	        RESOURCE_TYPE_RESOURCE,
939  	    )
940  	    node = argv.pop(0)
941  	    score = None
942  	    if "=" not in argv[0]:
943  	        score = argv.pop(0)
944  	        # TODO added to pcs in the first 0.12.x version
945  	        deprecation_warning(STANDALONE_SCORE_MSG)
946  	    options = []
947  	    # For now we only allow setting resource-discovery and score
948  	    for arg in argv:
949  	        name, value = parse_args.split_option(arg, allow_empty_value=False)
950  	        if name == "score":
951  	            score = value
952  	        elif name == "resource-discovery":
953  	            if not modifiers.get("--force"):
954  	                allowed_discovery = list(
955  	                    map(
956  	                        str,
957  	                        [
958  	                            CibResourceDiscovery.ALWAYS,
959  	                            CibResourceDiscovery.EXCLUSIVE,
960  	                            CibResourceDiscovery.NEVER,
961  	                        ],
962  	                    )
963  	                )
964  	                if value not in allowed_discovery:
965  	                    utils.err(
966  	                        (
967  	                            "invalid {0} value '{1}', allowed values are: {2}"
968  	                            ", use --force to override"
969  	                        ).format(name, value, format_list(allowed_discovery))
970  	                    )
971  	            options.append([name, value])
972  	        elif modifiers.get("--force"):
973  	            options.append([name, value])
974  	        else:
975  	            utils.err("bad option '%s', use --force to override" % name)
976  	    if score is None:
977  	        score = "INFINITY"
978  	
979  	    # Verify that specified node exists in the cluster and score is valid
980  	    if not skip_score_and_node_check:
981  	        if modifiers.is_specified("-f") or modifiers.get("--force"):
982  	            warn(LOCATION_NODE_VALIDATION_SKIP_MSG)
983  	        else:
984  	            lib_env = utils.get_lib_env()
985  	            existing_nodes, report_list = get_existing_nodes_names(
986  	                corosync_conf=lib_env.get_corosync_conf(),
987  	                cib=lib_env.get_cib(),
988  	            )
989  	            report_list += _verify_node_name(node, existing_nodes)
990  	            if report_list:
991  	                process_library_reports(report_list)
992  	        _verify_score(score)
993  	
994  	    id_valid, id_error = utils.validate_xml_id(constraint_id, "constraint id")
995  	    if not id_valid:
996  	        utils.err(id_error)
997  	
998  	    dom = utils.get_cib_dom()
999  	
1000 	    if rsc_type == RESOURCE_TYPE_RESOURCE:
1001 	        (
1002 	            rsc_valid,
1003 	            rsc_error,
1004 	            dummy_correct_id,
1005 	        ) = utils.validate_constraint_resource(dom, rsc_value)
1006 	        if not rsc_valid:
1007 	            utils.err(rsc_error)
1008 	
1009 	    # Verify current constraint doesn't already exist
1010 	    # If it does we replace it with the new constraint
1011 	    dummy_dom, constraintsElement = getCurrentConstraints(dom)
1012 	    # If the id matches, or the rsc & node match, then we replace/remove
1013 	    elementsToRemove = [
1014 	        rsc_loc
1015 	        for rsc_loc in constraintsElement.getElementsByTagName("rsc_location")
1016 	        # pylint: disable=too-many-boolean-expressions
1017 	        if rsc_loc.getAttribute("id") == constraint_id
1018 	        or (
1019 	            rsc_loc.getAttribute("node") == node
1020 	            and (
1021 	                (
1022 	                    rsc_type == RESOURCE_TYPE_RESOURCE
1023 	                    and rsc_loc.getAttribute("rsc") == rsc_value
1024 	                )
1025 	                or (
1026 	                    rsc_type == RESOURCE_TYPE_REGEXP
1027 	                    and rsc_loc.getAttribute("rsc-pattern") == rsc_value
1028 	                )
1029 	            )
1030 	        )
1031 	    ]
1032 	    for etr in elementsToRemove:
1033 	        constraintsElement.removeChild(etr)
1034 	
1035 	    element = dom.createElement("rsc_location")
1036 	    element.setAttribute("id", constraint_id)
1037 	    if rsc_type == RESOURCE_TYPE_RESOURCE:
1038 	        element.setAttribute("rsc", rsc_value)
1039 	    elif rsc_type == RESOURCE_TYPE_REGEXP:
1040 	        element.setAttribute("rsc-pattern", rsc_value)
1041 	    element.setAttribute("node", node)
1042 	    element.setAttribute("score", score)
1043 	    for option in options:
1044 	        element.setAttribute(option[0], option[1])
1045 	    constraintsElement.appendChild(element)
1046 	
1047 	    utils.replace_cib_configuration(dom)
1048 	
1049 	
1050 	# Grabs the current constraints and returns the dom and constraint element
1051 	def getCurrentConstraints(passed_dom=None):
1052 	    """
1053 	    Commandline options:
1054 	      * -f - CIB file, only if passed_dom is None
1055 	    """
1056 	    if passed_dom:
1057 	        dom = passed_dom
1058 	    else:
1059 	        current_constraints_xml = utils.get_cib_xpath("//constraints")
1060 	        if current_constraints_xml == "":
1061 	            utils.err("unable to process cib")
1062 	        # Verify current constraint doesn't already exist
1063 	        # If it does we replace it with the new constraint
(1) Event Sigma main event: The application uses Python's built in `xml` module which does not properly handle erroneous or maliciously constructed data, making the application vulnerable to one or more types of XML attacks.
(2) Event remediation: Avoid using the `xml` module. Consider using the `defusedxml` module or similar which safely prevents all XML entity attacks.
1064 	        dom = parseString(current_constraints_xml)
1065 	
1066 	    constraintsElement = dom.getElementsByTagName("constraints")[0]
1067 	    return (dom, constraintsElement)
1068 	
1069 	
1070 	# If returnStatus is set, then we don't error out, we just print the error
1071 	# and return false
1072 	def constraint_rm(  # noqa: PLR0912
1073 	    lib,
1074 	    argv,
1075 	    modifiers,
1076 	    returnStatus=False,
1077 	    constraintsElement=None,
1078 	    passed_dom=None,
1079 	):
1080 	    """
1081 	    Options:
1082 	      * -f - CIB file, effective only if passed_dom is None
1083 	    """
1084 	    if passed_dom is None:
1085 	        modifiers.ensure_only_supported("-f")
1086 	    if not argv:
1087 	        raise CmdLineInputError()
1088 	
1089 	    bad_constraint = False
1090 	    if len(argv) != 1:
1091 	        for arg in argv:
1092 	            if not constraint_rm(
1093 	                lib, [arg], modifiers, returnStatus=True, passed_dom=passed_dom
1094 	            ):
1095 	                bad_constraint = True
1096 	        if bad_constraint:
1097 	            sys.exit(1)
1098 	        return None
1099 	
1100 	    c_id = argv.pop(0)
1101 	    elementFound = False
1102 	    dom = None
1103 	    use_cibadmin = False
1104 	    if not constraintsElement:
1105 	        (dom, constraintsElement) = getCurrentConstraints(passed_dom)
1106 	        use_cibadmin = True
1107 	
1108 	    for co in constraintsElement.childNodes[:]:
1109 	        if co.nodeType != xml.dom.Node.ELEMENT_NODE:
1110 	            continue
1111 	        if co.getAttribute("id") == c_id:
1112 	            constraintsElement.removeChild(co)
1113 	            elementFound = True
1114 	
1115 	    if not elementFound:
1116 	        for rule in constraintsElement.getElementsByTagName("rule")[:]:
1117 	            if rule.getAttribute("id") == c_id:
1118 	                elementFound = True
1119 	                parent = rule.parentNode
1120 	                parent.removeChild(rule)
1121 	                if not parent.getElementsByTagName("rule"):
1122 	                    parent.parentNode.removeChild(parent)
1123 	
1124 	    if elementFound:
1125 	        if passed_dom:
1126 	            return dom
1127 	        if use_cibadmin:
1128 	            utils.replace_cib_configuration(dom)
1129 	        if returnStatus:
1130 	            return True
1131 	    else:
1132 	        utils.err("Unable to find constraint - '%s'" % c_id, False)
1133 	        if returnStatus:
1134 	            return False
1135 	        sys.exit(1)
1136 	    return None
1137 	
1138 	
1139 	def _split_set_constraints(
1140 	    constraints_dto: CibConstraintsDto,
1141 	) -> tuple[CibConstraintsDto, CibConstraintsDto]:
1142 	    return (
1143 	        CibConstraintsDto(
1144 	            location=constraints_dto.location,
1145 	            colocation=constraints_dto.colocation,
1146 	            order=constraints_dto.order,
1147 	            ticket=constraints_dto.ticket,
1148 	        ),
1149 	        CibConstraintsDto(
1150 	            location_set=constraints_dto.location_set,
1151 	            colocation_set=constraints_dto.colocation_set,
1152 	            order_set=constraints_dto.order_set,
1153 	            ticket_set=constraints_dto.ticket_set,
1154 	        ),
1155 	    )
1156 	
1157 	
1158 	def _find_constraints_containing_resource(
1159 	    resources_dto: CibResourcesDto,
1160 	    constraints_dto: CibConstraintsDto,
1161 	    resource_id: str,
1162 	) -> CibConstraintsDto:
1163 	    resources_filter = [resource_id]
1164 	    # Original implementation only included parent resource only if resource_id
1165 	    # was referring to a primitive resource, ignoring groups. This may change in
1166 	    # the future if necessary.
1167 	    if any(
1168 	        primitive_dto.id == resource_id
1169 	        for primitive_dto in resources_dto.primitives
1170 	    ):
1171 	        for clone_dto in resources_dto.clones:
1172 	            if clone_dto.member_id == resource_id:
1173 	                resources_filter.append(clone_dto.id)
1174 	                break
1175 	    return _filter_constraints_by_resources(
1176 	        constraints_dto, resources_filter, []
1177 	    )
1178 	
1179 	
1180 	def ref(
1181 	    lib: Any, argv: list[str], modifiers: parse_args.InputModifiers
1182 	) -> None:
1183 	    modifiers.ensure_only_supported("-f")
1184 	    if not argv:
1185 	        raise CmdLineInputError()
1186 	
1187 	    resources_dto = cast(
1188 	        CibResourcesDto, lib.resource.get_configured_resources()
1189 	    )
1190 	
1191 	    constraints_dto = cast(
1192 	        CibConstraintsDto,
1193 	        lib.constraint.get_config(evaluate_rules=False),
1194 	    )
1195 	
1196 	    for resource_id in sorted(set(argv)):
1197 	        constraint_ids = get_all_constraints_ids(
1198 	            _find_constraints_containing_resource(
1199 	                resources_dto, constraints_dto, resource_id
1200 	            )
1201 	        )
1202 	        print(f"Resource: {resource_id}")
1203 	        if constraint_ids:
1204 	            print(
1205 	                "\n".join(
1206 	                    indent(
1207 	                        sorted(constraint_ids),
1208 	                        indent_step=INDENT_STEP,
1209 	                    )
1210 	                )
1211 	            )
1212 	        else:
1213 	            print("  No Matches")
1214 	
1215 	
1216 	def remove_constraints_containing(
1217 	    resource_id: str, output=False, constraints_element=None, passed_dom=None
1218 	):
1219 	    """
1220 	    Commandline options:
1221 	      * -f - CIB file, effective only if passed_dom is None
1222 	    """
1223 	    lib = utils.get_library_wrapper()
1224 	    modifiers = utils.get_input_modifiers()
1225 	    resources_dto = cast(
1226 	        CibResourcesDto, lib.resource.get_configured_resources()
1227 	    )
1228 	
1229 	    constraints_dto, set_constraints_dto = _split_set_constraints(
1230 	        cast(
1231 	            CibConstraintsDto,
1232 	            lib.constraint.get_config(evaluate_rules=False),
1233 	        )
1234 	    )
1235 	    constraints = sorted(
1236 	        get_all_constraints_ids(
1237 	            _find_constraints_containing_resource(
1238 	                resources_dto, constraints_dto, resource_id
1239 	            )
1240 	        )
1241 	    )
1242 	    set_constraints = sorted(
1243 	        get_all_constraints_ids(
1244 	            _find_constraints_containing_resource(
1245 	                resources_dto, set_constraints_dto, resource_id
1246 	            )
1247 	        )
1248 	    )
1249 	    for c in constraints:
1250 	        if output:
1251 	            print_to_stderr(f"Removing Constraint - {c}")
1252 	        if constraints_element is not None:
1253 	            constraint_rm(
1254 	                lib,
1255 	                [c],
1256 	                modifiers,
1257 	                True,
1258 	                constraints_element,
1259 	                passed_dom=passed_dom,
1260 	            )
1261 	        else:
1262 	            constraint_rm(lib, [c], modifiers, passed_dom=passed_dom)
1263 	
1264 	    if set_constraints:
1265 	        (dom, constraintsElement) = getCurrentConstraints(passed_dom)
1266 	        for set_c in constraintsElement.getElementsByTagName("resource_ref")[:]:
1267 	            # If resource id is in a set, remove it from the set, if the set
1268 	            # is empty, then we remove the set, if the parent of the set
1269 	            # is empty then we remove it
1270 	            if set_c.getAttribute("id") == resource_id:
1271 	                parent_node = set_c.parentNode
1272 	                parent_node.removeChild(set_c)
1273 	                if output:
1274 	                    print_to_stderr(
1275 	                        "Removing {} from set {}".format(
1276 	                            resource_id, parent_node.getAttribute("id")
1277 	                        )
1278 	                    )
1279 	                if parent_node.getElementsByTagName("resource_ref").length == 0:
1280 	                    print_to_stderr(
1281 	                        "Removing set {}".format(parent_node.getAttribute("id"))
1282 	                    )
1283 	                    parent_node_2 = parent_node.parentNode
1284 	                    parent_node_2.removeChild(parent_node)
1285 	                    if (
1286 	                        parent_node_2.getElementsByTagName(
1287 	                            "resource_set"
1288 	                        ).length
1289 	                        == 0
1290 	                    ):
1291 	                        parent_node_2.parentNode.removeChild(parent_node_2)
1292 	                        print_to_stderr(
1293 	                            "Removing constraint {}".format(
1294 	                                parent_node_2.getAttribute("id")
1295 	                            )
1296 	                        )
1297 	        if passed_dom:
1298 	            return dom
1299 	        utils.replace_cib_configuration(dom)
1300 	    return None
1301 	
1302 	
1303 	# Re-assign any constraints referencing a resource to its parent (a clone
1304 	# or master)
1305 	def constraint_resource_update(old_id, dom):
1306 	    """
1307 	    Commandline options: no options
1308 	    """
1309 	    new_id = None
1310 	    clone_ms_parent = utils.dom_get_resource_clone_ms_parent(dom, old_id)
1311 	    if clone_ms_parent:
1312 	        new_id = clone_ms_parent.getAttribute("id")
1313 	
1314 	    if new_id:
1315 	        constraints = dom.getElementsByTagName("rsc_location")
1316 	        constraints += dom.getElementsByTagName("rsc_order")
1317 	        constraints += dom.getElementsByTagName("rsc_colocation")
1318 	        attrs_to_update = ["rsc", "first", "then", "with-rsc"]
1319 	        for constraint in constraints:
1320 	            for attr in attrs_to_update:
1321 	                if constraint.getAttribute(attr) == old_id:
1322 	                    constraint.setAttribute(attr, new_id)
1323 	    return dom
1324