1    	import sys
2    	import xml.dom.minidom
3    	from enum import Enum
4    	from typing import (
5    	    Any,
6    	    Iterable,
7    	    Optional,
8    	    Set,
9    	    TypeVar,
10   	    cast,
11   	)
12   	from xml.dom.minidom import parseString
13   	
14   	import pcs.cli.constraint_order.command as order_command
15   	from pcs import rule as rule_utils
16   	from pcs import utils
17   	from pcs.cli.common import parse_args
18   	from pcs.cli.common.errors import CmdLineInputError
19   	from pcs.cli.common.output import (
20   	    INDENT_STEP,
21   	    lines_to_str,
22   	)
23   	from pcs.cli.constraint.location import command as location_command
24   	from pcs.cli.constraint.output import (
25   	    CibConstraintLocationAnyDto,
26   	    filter_constraints_by_rule_expired_status,
27   	    location,
28   	    print_config,
29   	)
30   	from pcs.cli.reports import process_library_reports
31   	from pcs.cli.reports.output import (
32   	    deprecation_warning,
33   	    print_to_stderr,
34   	    warn,
35   	)
36   	from pcs.common import (
37   	    const,
38   	    pacemaker,
39   	    reports,
40   	)
41   	from pcs.common.pacemaker.constraint import (
42   	    CibConstraintColocationSetDto,
43   	    CibConstraintLocationSetDto,
44   	    CibConstraintOrderSetDto,
45   	    CibConstraintsDto,
46   	    CibConstraintTicketSetDto,
47   	    get_all_constraints_ids,
48   	)
49   	from pcs.common.pacemaker.resource.list import CibResourcesDto
50   	from pcs.common.pacemaker.types import CibResourceDiscovery
51   	from pcs.common.reports import ReportItem
52   	from pcs.common.str_tools import (
53   	    format_list,
54   	    indent,
55   	)
56   	from pcs.common.types import (
57   	    StringCollection,
58   	    StringIterable,
59   	    StringSequence,
60   	)
61   	from pcs.lib.cib.constraint.order import ATTRIB as order_attrib
62   	from pcs.lib.node import get_existing_nodes_names
63   	from pcs.lib.pacemaker.values import (
64   	    SCORE_INFINITY,
65   	    is_true,
66   	    sanitize_id,
67   	)
68   	
69   	# pylint: disable=invalid-name
70   	# pylint: disable=too-many-branches
71   	# pylint: disable=too-many-lines
72   	# pylint: disable=too-many-locals
73   	# pylint: disable=too-many-nested-blocks
74   	# pylint: disable=too-many-statements
75   	
76   	DEFAULT_ACTION = const.PCMK_ACTION_START
77   	DEFAULT_ROLE = const.PCMK_ROLE_STARTED
78   	
79   	OPTIONS_SYMMETRICAL = order_attrib["symmetrical"]
80   	
81   	LOCATION_NODE_VALIDATION_SKIP_MSG = (
82   	    "Validation for node existence in the cluster will be skipped"
83   	)
84   	
85   	RESOURCE_TYPE_RESOURCE = "resource"
86   	RESOURCE_TYPE_REGEXP = "regexp"
87   	
88   	
89   	class CrmRuleReturnCode(Enum):
90   	    IN_EFFECT = 0
91   	    EXPIRED = 110
92   	    TO_BE_IN_EFFECT = 111
93   	
94   	
95   	def constraint_location_cmd(lib, argv, modifiers):
96   	    sub_cmd = "config" if not argv else argv.pop(0)
97   	
98   	    try:
99   	        if sub_cmd == "add":
100  	            location_add(lib, argv, modifiers)
101  	        elif sub_cmd in ["remove", "delete"]:
102  	            location_command.remove(lib, argv, modifiers)
103  	        elif sub_cmd == "show":
104  	            location_show(lib, argv, modifiers)
105  	        elif sub_cmd == "config":
106  	            location_config_cmd(lib, argv, modifiers)
107  	        elif len(argv) >= 2:
108  	            if argv[0] == "rule":
109  	                location_rule(lib, [sub_cmd] + argv, modifiers)
110  	            else:
111  	                location_prefer(lib, [sub_cmd] + argv, modifiers)
112  	        else:
113  	            raise CmdLineInputError()
114  	    except CmdLineInputError as e:
115  	        utils.exit_on_cmdline_input_error(
116  	            e, "constraint", ["location", sub_cmd]
117  	        )
118  	
119  	
120  	def constraint_order_cmd(lib, argv, modifiers):
121  	    sub_cmd = "config" if not argv else argv.pop(0)
122  	
123  	    try:
124  	        if sub_cmd == "set":
125  	            order_command.create_with_set(lib, argv, modifiers)
126  	        elif sub_cmd in ["remove", "delete"]:
127  	            order_rm(lib, argv, modifiers)
128  	        elif sub_cmd == "show":
129  	            order_command.show(lib, argv, modifiers)
130  	        elif sub_cmd == "config":
131  	            order_command.config_cmd(lib, argv, modifiers)
132  	        else:
133  	            order_start(lib, [sub_cmd] + argv, modifiers)
134  	    except CmdLineInputError as e:
135  	        utils.exit_on_cmdline_input_error(e, "constraint", ["order", sub_cmd])
136  	
137  	
138  	def constraint_show(lib, argv, modifiers):
139  	    deprecation_warning(
140  	        "This command is deprecated and will be removed. "
141  	        "Please use 'pcs constraint config' instead."
142  	    )
143  	    config_cmd(lib, argv, modifiers)
144  	
145  	
146  	def config_cmd(
147  	    lib: Any, argv: list[str], modifiers: parse_args.InputModifiers
148  	) -> None:
149  	    modifiers.ensure_only_supported("-f", "--output-format", "--full", "--all")
150  	    if argv:
151  	        raise CmdLineInputError()
152  	
153  	    print_config(
154  	        cast(
155  	            CibConstraintsDto,
156  	            lib.constraint.get_config(evaluate_rules=True),
157  	        ),
158  	        modifiers,
159  	    )
160  	
161  	
162  	def _validate_constraint_resource(cib_dom, resource_id):
163  	    (
164  	        resource_valid,
165  	        resource_error,
166  	        dummy_correct_id,
167  	    ) = utils.validate_constraint_resource(cib_dom, resource_id)
168  	    if not resource_valid:
169  	        utils.err(resource_error)
170  	
171  	
172  	def _validate_resources_not_in_same_group(cib_dom, resource1, resource2):
173  	    if not utils.validate_resources_not_in_same_group(
174  	        cib_dom, resource1, resource2
175  	    ):
176  	        utils.err(
177  	            "Cannot create an order constraint for resources in the same group"
178  	        )
179  	
180  	
181  	# Syntax: colocation add [role] <src> with [role] <tgt> [score] [options]
182  	# possible commands:
183  	#        <src> with        <tgt> [score] [options]
184  	#        <src> with <role> <tgt> [score] [options]
185  	# <role> <src> with        <tgt> [score] [options]
186  	# <role> <src> with <role> <tgt> [score] [options]
187  	def colocation_add(lib, argv, modifiers):  # noqa: PLR0912, PLR0915
188  	    """
189  	    Options:
190  	      * -f - CIB file
191  	      * --force - allow constraint on any resource, allow duplicate constraints
192  	    """
193  	
194  	    def _parse_score_options(argv):
195  	        # When passed an array of arguments if the first argument doesn't have
196  	        # an '=' then it's the score, otherwise they're all arguments. Return a
197  	        # tuple with the score and array of name,value pairs
198  	        """
199  	        Commandline options: no options
200  	        """
201  	        if not argv:
202  	            return SCORE_INFINITY, []
203  	        score = SCORE_INFINITY if "=" in argv[0] else argv.pop(0)
204  	        # create a list of 2-tuples (name, value)
205  	        arg_array = [
206  	            parse_args.split_option(arg, allow_empty_value=False)
207  	            for arg in argv
208  	        ]
209  	        return score, arg_array
210  	
211  	    del lib
212  	    modifiers.ensure_only_supported("-f", "--force")
213  	    if len(argv) < 3:
214  	        raise CmdLineInputError()
215  	
216  	    role1 = ""
217  	    role2 = ""
218  	
219  	    cib_dom = utils.get_cib_dom()
220  	    new_roles_supported = utils.isCibVersionSatisfied(
221  	        cib_dom, const.PCMK_NEW_ROLES_CIB_VERSION
222  	    )
223  	
224  	    def _validate_and_prepare_role(role):
225  	        role_cleaned = role.lower().capitalize()
226  	        if role_cleaned not in const.PCMK_ROLES:
227  	            utils.err(
228  	                "invalid role value '{0}', allowed values are: {1}".format(
229  	                    role, format_list(const.PCMK_ROLES)
230  	                )
231  	            )
232  	        utils.print_deprecation_warning_for_legacy_roles(role)
233  	        return pacemaker.role.get_value_for_cib(
234  	            role_cleaned, new_roles_supported
235  	        )
236  	
237  	    if argv[2] == "with":
238  	        role1 = _validate_and_prepare_role(argv.pop(0))
239  	        resource1 = argv.pop(0)
240  	    elif argv[1] == "with":
241  	        resource1 = argv.pop(0)
242  	    else:
243  	        raise CmdLineInputError()
244  	
245  	    if argv.pop(0) != "with":
246  	        raise CmdLineInputError()
247  	    if "with" in argv:
248  	        raise CmdLineInputError(
249  	            message="Multiple 'with's cannot be specified.",
250  	            hint=(
251  	                "Use the 'pcs constraint colocation set' command if you want "
252  	                "to create a constraint for more than two resources."
253  	            ),
254  	            show_both_usage_and_message=True,
255  	        )
256  	
257  	    if not argv:
258  	        raise CmdLineInputError()
259  	    if len(argv) == 1 or utils.is_score_or_opt(argv[1]):
260  	        resource2 = argv.pop(0)
261  	    else:
262  	        role2 = _validate_and_prepare_role(argv.pop(0))
263  	        resource2 = argv.pop(0)
264  	
265  	    score, nv_pairs = _parse_score_options(argv)
266  	
267  	    _validate_constraint_resource(cib_dom, resource1)
268  	    _validate_constraint_resource(cib_dom, resource2)
269  	
270  	    id_in_nvpairs = None
271  	    for name, value in nv_pairs:
272  	        if name == "id":
273  	            id_valid, id_error = utils.validate_xml_id(value, "constraint id")
274  	            if not id_valid:
275  	                utils.err(id_error)
276  	            if utils.does_id_exist(cib_dom, value):
277  	                utils.err(
278  	                    "id '%s' is already in use, please specify another one"
279  	                    % value
280  	                )
281  	            id_in_nvpairs = True
282  	    if not id_in_nvpairs:
283  	        nv_pairs.append(
284  	            (
285  	                "id",
286  	                utils.find_unique_id(
287  	                    cib_dom,
288  	                    "colocation-%s-%s-%s" % (resource1, resource2, score),
289  	                ),
290  	            )
291  	        )
292  	
293  	    (dom, constraintsElement) = getCurrentConstraints(cib_dom)
294  	
295  	    # If one role is specified, the other should default to "started"
296  	    if role1 != "" and role2 == "":
297  	        role2 = DEFAULT_ROLE
298  	    if role2 != "" and role1 == "":
299  	        role1 = DEFAULT_ROLE
300  	    element = dom.createElement("rsc_colocation")
301  	    element.setAttribute("rsc", resource1)
302  	    element.setAttribute("with-rsc", resource2)
303  	    element.setAttribute("score", score)
304  	    if role1 != "":
305  	        element.setAttribute("rsc-role", role1)
306  	    if role2 != "":
307  	        element.setAttribute("with-rsc-role", role2)
308  	    for nv_pair in nv_pairs:
309  	        element.setAttribute(nv_pair[0], nv_pair[1])
310  	    if not modifiers.get("--force"):
311  	
312  	        def _constraint_export(constraint_info):
313  	            options_dict = constraint_info["options"]
314  	            co_resource1 = options_dict.get("rsc", "")
315  	            co_resource2 = options_dict.get("with-rsc", "")
316  	            co_id = options_dict.get("id", "")
317  	            co_score = options_dict.get("score", "")
318  	            score_text = "(score:" + co_score + ")"
319  	            console_option_list = [
320  	                f"({option[0]}:{option[1]})"
321  	                for option in sorted(options_dict.items())
322  	                if option[0] not in ("rsc", "with-rsc", "id", "score")
323  	            ]
324  	            console_option_list.append(f"(id:{co_id})")
325  	            return " ".join(
326  	                [co_resource1, "with", co_resource2, score_text]
327  	                + console_option_list
328  	            )
329  	
330  	        duplicates = colocation_find_duplicates(constraintsElement, element)
331  	        if duplicates:
332  	            utils.err(
333  	                "duplicate constraint already exists, use --force to override\n"
334  	                + "\n".join(
335  	                    [
336  	                        "  "
337  	                        + _constraint_export(
338  	                            {"options": dict(dup.attributes.items())}
339  	                        )
340  	                        for dup in duplicates
341  	                    ]
342  	                )
343  	            )
344  	    constraintsElement.appendChild(element)
345  	    utils.replace_cib_configuration(dom)
346  	
347  	
348  	def colocation_find_duplicates(dom, constraint_el):
349  	    """
350  	    Commandline options: no options
351  	    """
352  	    new_roles_supported = utils.isCibVersionSatisfied(
353  	        dom, const.PCMK_NEW_ROLES_CIB_VERSION
354  	    )
355  	
356  	    def normalize(const_el):
357  	        return (
358  	            const_el.getAttribute("rsc"),
359  	            const_el.getAttribute("with-rsc"),
360  	            pacemaker.role.get_value_for_cib(
361  	                const_el.getAttribute("rsc-role").capitalize() or DEFAULT_ROLE,
362  	                new_roles_supported,
363  	            ),
364  	            pacemaker.role.get_value_for_cib(
365  	                const_el.getAttribute("with-rsc-role").capitalize()
366  	                or DEFAULT_ROLE,
367  	                new_roles_supported,
368  	            ),
369  	        )
370  	
371  	    normalized_el = normalize(constraint_el)
372  	    return [
373  	        other_el
374  	        for other_el in dom.getElementsByTagName("rsc_colocation")
375  	        if not other_el.getElementsByTagName("resource_set")
376  	        and constraint_el is not other_el
377  	        and normalized_el == normalize(other_el)
378  	    ]
379  	
380  	
381  	def order_rm(lib, argv, modifiers):
382  	    """
383  	    Options:
384  	      * -f - CIB file
385  	    """
386  	    del lib
387  	    modifiers.ensure_only_supported("-f")
388  	    if not argv:
389  	        raise CmdLineInputError()
390  	
391  	    elementFound = False
392  	    (dom, constraintsElement) = getCurrentConstraints()
393  	
394  	    for resource in argv:
395  	        for ord_loc in constraintsElement.getElementsByTagName("rsc_order")[:]:
396  	            if (
397  	                ord_loc.getAttribute("first") == resource
398  	                or ord_loc.getAttribute("then") == resource
399  	            ):
400  	                constraintsElement.removeChild(ord_loc)
401  	                elementFound = True
402  	
403  	        resource_refs_to_remove = []
404  	        for ord_set in constraintsElement.getElementsByTagName("resource_ref"):
405  	            if ord_set.getAttribute("id") == resource:
406  	                resource_refs_to_remove.append(ord_set)
407  	                elementFound = True
408  	
409  	        for res_ref in resource_refs_to_remove:
410  	            res_set = res_ref.parentNode
411  	            res_order = res_set.parentNode
412  	
413  	            res_ref.parentNode.removeChild(res_ref)
414  	            if not res_set.getElementsByTagName("resource_ref"):
415  	                res_set.parentNode.removeChild(res_set)
416  	                if not res_order.getElementsByTagName("resource_set"):
417  	                    res_order.parentNode.removeChild(res_order)
418  	
419  	    if elementFound:
420  	        utils.replace_cib_configuration(dom)
421  	    else:
422  	        utils.err("No matching resources found in ordering list")
423  	
424  	
425  	def order_start(lib, argv, modifiers):
426  	    """
427  	    Options:
428  	      * -f - CIB file
429  	      * --force - allow constraint for any resource, allow duplicate constraints
430  	    """
431  	    del lib
432  	    modifiers.ensure_only_supported("-f", "--force")
433  	    if len(argv) < 3:
434  	        raise CmdLineInputError()
435  	
436  	    first_action = DEFAULT_ACTION
437  	    then_action = DEFAULT_ACTION
438  	    action = argv[0]
439  	    if action in const.PCMK_ACTIONS:
440  	        first_action = action
441  	        argv.pop(0)
442  	
443  	    resource1 = argv.pop(0)
444  	    if argv.pop(0) != "then":
445  	        raise CmdLineInputError()
446  	
447  	    if not argv:
448  	        raise CmdLineInputError()
449  	
450  	    action = argv[0]
451  	    if action in const.PCMK_ACTIONS:
452  	        then_action = action
453  	        argv.pop(0)
454  	
455  	    if not argv:
456  	        raise CmdLineInputError()
457  	    resource2 = argv.pop(0)
458  	
459  	    order_options = []
460  	    if argv:
461  	        order_options = order_options + argv[:]
462  	    if "then" in order_options:
463  	        raise CmdLineInputError(
464  	            message="Multiple 'then's cannot be specified.",
465  	            hint=(
466  	                "Use the 'pcs constraint order set' command if you want to "
467  	                "create a constraint for more than two resources."
468  	            ),
469  	            show_both_usage_and_message=True,
470  	        )
471  	
472  	    order_options.append("first-action=" + first_action)
473  	    order_options.append("then-action=" + then_action)
474  	    _order_add(resource1, resource2, order_options, modifiers)
475  	
476  	
477  	def _order_add(resource1, resource2, options_list, modifiers):  # noqa: PLR0912, PLR0915
478  	    """
479  	    Commandline options:
480  	      * -f - CIB file
481  	      * --force - allow constraint for any resource, allow duplicate constraints
482  	    """
483  	    cib_dom = utils.get_cib_dom()
484  	    _validate_constraint_resource(cib_dom, resource1)
485  	    _validate_constraint_resource(cib_dom, resource2)
486  	
487  	    _validate_resources_not_in_same_group(cib_dom, resource1, resource2)
488  	
489  	    order_options = []
490  	    id_specified = False
491  	    sym = None
492  	    for arg in options_list:
493  	        if arg == "symmetrical":
494  	            sym = "true"
495  	        elif arg == "nonsymmetrical":
496  	            sym = "false"
497  	        else:
498  	            name, value = parse_args.split_option(arg, allow_empty_value=False)
499  	            if name == "id":
500  	                id_valid, id_error = utils.validate_xml_id(
501  	                    value, "constraint id"
502  	                )
503  	                if not id_valid:
504  	                    utils.err(id_error)
505  	                if utils.does_id_exist(cib_dom, value):
506  	                    utils.err(
507  	                        "id '%s' is already in use, please specify another one"
508  	                        % value
509  	                    )
510  	                id_specified = True
511  	                order_options.append((name, value))
512  	            elif name == "symmetrical":
513  	                if value.lower() in OPTIONS_SYMMETRICAL:
514  	                    sym = value.lower()
515  	                else:
516  	                    utils.err(
517  	                        "invalid symmetrical value '%s', allowed values are: %s"
518  	                        % (value, ", ".join(OPTIONS_SYMMETRICAL))
519  	                    )
520  	            else:
521  	                order_options.append((name, value))
522  	    if sym:
523  	        order_options.append(("symmetrical", sym))
524  	
525  	    options = ""
526  	    if order_options:
527  	        options = " (Options: %s)" % " ".join(
528  	            [
529  	                "%s=%s" % (name, value)
530  	                for name, value in order_options
531  	                if name not in ("kind", "score")
532  	            ]
533  	        )
534  	
535  	    scorekind = "kind: Mandatory"
536  	    id_suffix = "mandatory"
537  	    for opt in order_options:
538  	        if opt[0] == "score":
539  	            scorekind = "score: " + opt[1]
540  	            id_suffix = opt[1]
541  	            # TODO deprecated in pacemaker 2, to be removed in pacemaker 3
542  	            # added to pcs after 0.11.7
543  	            deprecation_warning(
544  	                reports.messages.DeprecatedOption(opt[0], []).message
545  	            )
546  	            break
547  	        if opt[0] == "kind":
548  	            scorekind = "kind: " + opt[1]
549  	            id_suffix = opt[1]
550  	            break
551  	
552  	    if not id_specified:
553  	        order_id = "order-" + resource1 + "-" + resource2 + "-" + id_suffix
554  	        order_id = utils.find_unique_id(cib_dom, order_id)
555  	        order_options.append(("id", order_id))
556  	
557  	    (dom, constraintsElement) = getCurrentConstraints()
558  	    element = dom.createElement("rsc_order")
559  	    element.setAttribute("first", resource1)
560  	    element.setAttribute("then", resource2)
561  	    for order_opt in order_options:
562  	        element.setAttribute(order_opt[0], order_opt[1])
563  	    constraintsElement.appendChild(element)
564  	    if not modifiers.get("--force"):
565  	
566  	        def _constraint_export(constraint_info):
567  	            options = constraint_info["options"]
568  	            oc_resource1 = options.get("first", "")
569  	            oc_resource2 = options.get("then", "")
570  	            first_action = options.get("first-action", "")
571  	            then_action = options.get("then-action", "")
572  	            oc_id = options.get("id", "")
573  	            oc_score = options.get("score", "")
574  	            oc_kind = options.get("kind", "")
575  	            oc_sym = ""
576  	            oc_id_out = ""
577  	            oc_options = ""
578  	            if "symmetrical" in options and not is_true(
579  	                options.get("symmetrical", "false")
580  	            ):
581  	                oc_sym = "(non-symmetrical)"
582  	            if oc_kind != "":
583  	                score_text = "(kind:" + oc_kind + ")"
584  	            elif oc_kind == "" and oc_score == "":
585  	                score_text = "(kind:Mandatory)"
586  	            else:
587  	                score_text = "(score:" + oc_score + ")"
588  	            oc_id_out = "(id:" + oc_id + ")"
589  	            already_processed_options = (
590  	                "first",
591  	                "then",
592  	                "first-action",
593  	                "then-action",
594  	                "id",
595  	                "score",
596  	                "kind",
597  	                "symmetrical",
598  	            )
599  	            oc_options = " ".join(
600  	                [
601  	                    f"{name}={value}"
602  	                    for name, value in options.items()
603  	                    if name not in already_processed_options
604  	                ]
605  	            )
606  	            if oc_options:
607  	                oc_options = "(Options: " + oc_options + ")"
608  	            return " ".join(
609  	                [
610  	                    arg
611  	                    for arg in [
612  	                        first_action,
613  	                        oc_resource1,
614  	                        "then",
615  	                        then_action,
616  	                        oc_resource2,
617  	                        score_text,
618  	                        oc_sym,
619  	                        oc_options,
620  	                        oc_id_out,
621  	                    ]
622  	                    if arg
623  	                ]
624  	            )
625  	
626  	        duplicates = order_find_duplicates(constraintsElement, element)
627  	        if duplicates:
628  	            utils.err(
629  	                "duplicate constraint already exists, use --force to override\n"
630  	                + "\n".join(
631  	                    [
632  	                        "  "
633  	                        + _constraint_export(
634  	                            {"options": dict(dup.attributes.items())}
635  	                        )
636  	                        for dup in duplicates
637  	                    ]
638  	                )
639  	            )
640  	    print_to_stderr(f"Adding {resource1} {resource2} ({scorekind}){options}")
641  	    utils.replace_cib_configuration(dom)
642  	
643  	
644  	def order_find_duplicates(dom, constraint_el):
645  	    """
646  	    Commandline options: no options
647  	    """
648  	
649  	    def normalize(constraint_el):
650  	        return (
651  	            constraint_el.getAttribute("first"),
652  	            constraint_el.getAttribute("then"),
653  	            constraint_el.getAttribute("first-action").lower()
654  	            or DEFAULT_ACTION,
655  	            constraint_el.getAttribute("then-action").lower() or DEFAULT_ACTION,
656  	        )
657  	
658  	    normalized_el = normalize(constraint_el)
659  	    return [
660  	        other_el
661  	        for other_el in dom.getElementsByTagName("rsc_order")
662  	        if not other_el.getElementsByTagName("resource_set")
663  	        and constraint_el is not other_el
664  	        and normalized_el == normalize(other_el)
665  	    ]
666  	
667  	
668  	def location_show(lib, argv, modifiers):
669  	    deprecation_warning(
670  	        "This command is deprecated and will be removed. "
671  	        "Please use 'pcs constraint location config' instead."
672  	    )
673  	    return location_config_cmd(lib, argv, modifiers)
674  	
675  	
676  	_SetConstraint = TypeVar(
677  	    "_SetConstraint",
678  	    CibConstraintLocationSetDto,
679  	    CibConstraintColocationSetDto,
680  	    CibConstraintOrderSetDto,
681  	    CibConstraintTicketSetDto,
682  	)
683  	
684  	
685  	def _filter_set_constraints_by_resources(
686  	    constraints_dto: Iterable[_SetConstraint], resources: Set[str]
687  	) -> list[_SetConstraint]:
688  	    return [
689  	        constraint_set_dto
690  	        for constraint_set_dto in constraints_dto
691  	        if any(
692  	            set(resource_set.resources_ids) & resources
693  	            for resource_set in constraint_set_dto.resource_sets
694  	        )
695  	    ]
696  	
697  	
698  	def _filter_constraints_by_resources(
699  	    constraints_dto: CibConstraintsDto,
700  	    resources: StringIterable,
701  	    patterns: StringIterable,
702  	) -> CibConstraintsDto:
703  	    required_resources_set = set(resources)
704  	    required_patterns_set = set(patterns)
705  	    return CibConstraintsDto(
706  	        location=[
707  	            constraint_dto
708  	            for constraint_dto in constraints_dto.location
709  	            if (
710  	                constraint_dto.resource_id is not None
711  	                and constraint_dto.resource_id in required_resources_set
712  	            )
713  	            or (
714  	                constraint_dto.resource_pattern is not None
715  	                and constraint_dto.resource_pattern in required_patterns_set
716  	            )
717  	        ],
718  	        location_set=_filter_set_constraints_by_resources(
719  	            constraints_dto.location_set, required_resources_set
720  	        ),
721  	        colocation=[
722  	            constraint_dto
723  	            for constraint_dto in constraints_dto.colocation
724  	            if {constraint_dto.resource_id, constraint_dto.with_resource_id}
725  	            & required_resources_set
726  	        ],
727  	        colocation_set=_filter_set_constraints_by_resources(
728  	            constraints_dto.colocation_set, required_resources_set
729  	        ),
730  	        order=[
731  	            constraint_dto
732  	            for constraint_dto in constraints_dto.order
733  	            if {
734  	                constraint_dto.first_resource_id,
735  	                constraint_dto.then_resource_id,
736  	            }
737  	            & required_resources_set
738  	        ],
739  	        order_set=_filter_set_constraints_by_resources(
740  	            constraints_dto.order_set, required_resources_set
741  	        ),
742  	        ticket=[
743  	            constraint_dto
744  	            for constraint_dto in constraints_dto.ticket
745  	            if constraint_dto.resource_id in required_resources_set
746  	        ],
747  	        ticket_set=_filter_set_constraints_by_resources(
748  	            constraints_dto.ticket_set, required_resources_set
749  	        ),
750  	    )
751  	
752  	
753  	def _filter_location_by_node_base(
754  	    constraint_dtos: Iterable[CibConstraintLocationAnyDto],
755  	    nodes: StringCollection,
756  	) -> list[CibConstraintLocationAnyDto]:
757  	    return [
758  	        constraint_dto
759  	        for constraint_dto in constraint_dtos
760  	        if constraint_dto.attributes.node is not None
761  	        and constraint_dto.attributes.node in nodes
762  	    ]
763  	
764  	
765  	def location_config_cmd(
766  	    lib: Any, argv: StringSequence, modifiers: parse_args.InputModifiers
767  	) -> None:
768  	    """
769  	    Options:
770  	      * --all - print expired constraints
771  	      * --full - print all details
772  	      * -f - CIB file
773  	    """
774  	    modifiers.ensure_only_supported("-f", "--output-format", "--full", "--all")
775  	    filter_type: Optional[str] = None
776  	    filter_items: parse_args.Argv = []
777  	    if argv:
778  	        filter_type, *filter_items = argv
779  	        allowed_types = ("resources", "nodes")
780  	        if filter_type not in allowed_types:
781  	            raise CmdLineInputError(
782  	                f"Unknown keyword '{filter_type}'. Allowed keywords: "
783  	                f"{format_list(allowed_types)}"
784  	            )
785  	        if modifiers.get_output_format() != parse_args.OUTPUT_FORMAT_VALUE_TEXT:
786  	            raise CmdLineInputError(
787  	                "Output formats other than 'text' are not supported together "
788  	                "with grouping and filtering by nodes or resources"
789  	            )
790  	
791  	    constraints_dto = filter_constraints_by_rule_expired_status(
792  	        lib.constraint.get_config(evaluate_rules=True),
793  	        modifiers.is_specified("--all"),
794  	    )
795  	
796  	    constraints_dto = CibConstraintsDto(
797  	        location=constraints_dto.location,
798  	        location_set=constraints_dto.location_set,
799  	    )
800  	
801  	    def _print_lines(lines: StringSequence) -> None:
802  	        if lines:
803  	            print("Location Constraints:")
804  	            print(lines_to_str(indent(lines, indent_step=INDENT_STEP)))
805  	
806  	    if filter_type == "resources":
807  	        if filter_items:
808  	            resources = []
809  	            patterns = []
810  	            for item in filter_items:
811  	                item_type, item_value = parse_args.parse_typed_arg(
812  	                    item,
813  	                    [RESOURCE_TYPE_RESOURCE, RESOURCE_TYPE_REGEXP],
814  	                    RESOURCE_TYPE_RESOURCE,
815  	                )
816  	                if item_type == RESOURCE_TYPE_RESOURCE:
817  	                    resources.append(item_value)
818  	                elif item_type == RESOURCE_TYPE_REGEXP:
819  	                    patterns.append(item_value)
820  	            constraints_dto = _filter_constraints_by_resources(
821  	                constraints_dto, resources, patterns
822  	            )
823  	        _print_lines(
824  	            location.constraints_to_grouped_by_resource_text(
825  	                constraints_dto.location,
826  	                modifiers.is_specified("--full"),
827  	            )
828  	        )
829  	        return
830  	    if filter_type == "nodes":
831  	        if filter_items:
832  	            constraints_dto = CibConstraintsDto(
833  	                location=_filter_location_by_node_base(
834  	                    constraints_dto.location, filter_items
835  	                ),
836  	                location_set=_filter_location_by_node_base(
837  	                    constraints_dto.location_set, filter_items
838  	                ),
839  	            )
840  	        _print_lines(
841  	            location.constraints_to_grouped_by_node_text(
842  	                constraints_dto.location,
843  	                modifiers.is_specified("--full"),
844  	            )
845  	        )
846  	        return
847  	
848  	    print_config(constraints_dto, modifiers)
849  	
850  	
851  	def _verify_node_name(node, existing_nodes):
852  	    report_list = []
853  	    if node not in existing_nodes:
854  	        report_list.append(
855  	            ReportItem.error(
856  	                reports.messages.NodeNotFound(node),
857  	                force_code=reports.codes.FORCE,
858  	            )
859  	        )
860  	    return report_list
861  	
862  	
863  	def _verify_score(score):
864  	    if not utils.is_score(score):
865  	        utils.err(
866  	            "invalid score '%s', use integer or INFINITY or -INFINITY" % score
867  	        )
868  	
869  	
870  	def location_prefer(  # noqa: PLR0912
871  	    lib: Any, argv: parse_args.Argv, modifiers: parse_args.InputModifiers
872  	) -> None:
873  	    """
874  	    Options:
875  	      * --force - allow unknown options, allow constraint for any resource type
876  	      * -f - CIB file
877  	    """
878  	    modifiers.ensure_only_supported("--force", "-f")
879  	    rsc = argv.pop(0)
880  	    prefer_option = argv.pop(0)
881  	
882  	    dummy_rsc_type, rsc_value = parse_args.parse_typed_arg(
883  	        rsc,
884  	        [RESOURCE_TYPE_RESOURCE, RESOURCE_TYPE_REGEXP],
885  	        RESOURCE_TYPE_RESOURCE,
886  	    )
887  	
888  	    if prefer_option == "prefers":
889  	        prefer = True
890  	    elif prefer_option == "avoids":
891  	        prefer = False
892  	    else:
893  	        raise CmdLineInputError()
894  	
895  	    skip_node_check = False
896  	    existing_nodes: list[str] = []
897  	    if modifiers.is_specified("-f") or modifiers.get("--force"):
898  	        skip_node_check = True
899  	        warn(LOCATION_NODE_VALIDATION_SKIP_MSG)
900  	    else:
901  	        lib_env = utils.get_lib_env()
902  	        existing_nodes, report_list = get_existing_nodes_names(
903  	            corosync_conf=lib_env.get_corosync_conf(),
904  	            cib=lib_env.get_cib(),
905  	        )
906  	        if report_list:
907  	            process_library_reports(report_list)
908  	
909  	    report_list = []
910  	    parameters_list = []
911  	    for nodeconf in argv:
912  	        nodeconf_a = nodeconf.split("=", 1)
913  	        node = nodeconf_a[0]
914  	        if not skip_node_check:
915  	            report_list += _verify_node_name(node, existing_nodes)
916  	        if len(nodeconf_a) == 1:
917  	            score = "INFINITY" if prefer else "-INFINITY"
918  	        else:
919  	            score = nodeconf_a[1]
920  	            _verify_score(score)
921  	            if not prefer:
922  	                score = score[1:] if score[0] == "-" else "-" + score
923  	
924  	        parameters_list.append(
925  	            [
926  	                sanitize_id(f"location-{rsc_value}-{node}-{score}"),
927  	                rsc,
928  	                node,
929  	                score,
930  	            ]
931  	        )
932  	
933  	    if report_list:
934  	        process_library_reports(report_list)
935  	
936  	    modifiers = modifiers.get_subset("--force", "-f")
937  	
938  	    for parameters in parameters_list:
939  	        location_add(lib, parameters, modifiers, skip_score_and_node_check=True)
940  	
941  	
942  	def location_add(  # noqa: PLR0912, PLR0915
943  	    lib: Any,
944  	    argv: parse_args.Argv,
945  	    modifiers: parse_args.InputModifiers,
946  	    skip_score_and_node_check: bool = False,
947  	) -> None:
948  	    """
949  	    Options:
950  	      * --force - allow unknown options, allow constraint for any resource type
951  	      * -f - CIB file
952  	    """
953  	    del lib
954  	    modifiers.ensure_only_supported("--force", "-f")
955  	    if len(argv) < 4:
956  	        raise CmdLineInputError()
957  	
958  	    constraint_id = argv.pop(0)
959  	    rsc_type, rsc_value = parse_args.parse_typed_arg(
960  	        argv.pop(0),
961  	        [RESOURCE_TYPE_RESOURCE, RESOURCE_TYPE_REGEXP],
962  	        RESOURCE_TYPE_RESOURCE,
963  	    )
964  	    node = argv.pop(0)
965  	    score = argv.pop(0)
966  	    options = []
967  	    # For now we only allow setting resource-discovery
968  	    if argv:
969  	        for arg in argv:
970  	            if "=" in arg:
971  	                name, value = arg.split("=", 1)
972  	                if name == "resource-discovery":
973  	                    if not modifiers.get("--force"):
974  	                        allowed_discovery = list(
975  	                            map(
976  	                                str,
977  	                                [
978  	                                    CibResourceDiscovery.ALWAYS,
979  	                                    CibResourceDiscovery.EXCLUSIVE,
980  	                                    CibResourceDiscovery.NEVER,
981  	                                ],
982  	                            )
983  	                        )
984  	                        if value not in allowed_discovery:
985  	                            utils.err(
986  	                                (
987  	                                    "invalid {0} value '{1}', allowed values are: "
988  	                                    "{2}, use --force to override"
989  	                                ).format(
990  	                                    name, value, format_list(allowed_discovery)
991  	                                )
992  	                            )
993  	                options.append([name, value])
994  	            else:
995  	                raise CmdLineInputError(f"bad option '{arg}'")
996  	            if options[-1][0] != "resource-discovery" and not modifiers.get(
997  	                "--force"
998  	            ):
999  	                utils.err(
1000 	                    "bad option '%s', use --force to override" % options[-1][0]
1001 	                )
1002 	
1003 	    # Verify that specified node exists in the cluster and score is valid
1004 	    if not skip_score_and_node_check:
1005 	        if modifiers.is_specified("-f") or modifiers.get("--force"):
1006 	            warn(LOCATION_NODE_VALIDATION_SKIP_MSG)
1007 	        else:
1008 	            lib_env = utils.get_lib_env()
1009 	            existing_nodes, report_list = get_existing_nodes_names(
1010 	                corosync_conf=lib_env.get_corosync_conf(),
1011 	                cib=lib_env.get_cib(),
1012 	            )
1013 	            report_list += _verify_node_name(node, existing_nodes)
1014 	            if report_list:
1015 	                process_library_reports(report_list)
1016 	        _verify_score(score)
1017 	
1018 	    id_valid, id_error = utils.validate_xml_id(constraint_id, "constraint id")
1019 	    if not id_valid:
1020 	        utils.err(id_error)
1021 	
1022 	    dom = utils.get_cib_dom()
1023 	
1024 	    if rsc_type == RESOURCE_TYPE_RESOURCE:
1025 	        (
1026 	            rsc_valid,
1027 	            rsc_error,
1028 	            dummy_correct_id,
1029 	        ) = utils.validate_constraint_resource(dom, rsc_value)
1030 	        if not rsc_valid:
1031 	            utils.err(rsc_error)
1032 	
1033 	    # Verify current constraint doesn't already exist
1034 	    # If it does we replace it with the new constraint
1035 	    dummy_dom, constraintsElement = getCurrentConstraints(dom)
1036 	    # If the id matches, or the rsc & node match, then we replace/remove
1037 	    elementsToRemove = [
1038 	        rsc_loc
1039 	        for rsc_loc in constraintsElement.getElementsByTagName("rsc_location")
1040 	        # pylint: disable=too-many-boolean-expressions
1041 	        if rsc_loc.getAttribute("id") == constraint_id
1042 	        or (
1043 	            rsc_loc.getAttribute("node") == node
1044 	            and (
1045 	                (
1046 	                    rsc_type == RESOURCE_TYPE_RESOURCE
1047 	                    and rsc_loc.getAttribute("rsc") == rsc_value
1048 	                )
1049 	                or (
1050 	                    rsc_type == RESOURCE_TYPE_REGEXP
1051 	                    and rsc_loc.getAttribute("rsc-pattern") == rsc_value
1052 	                )
1053 	            )
1054 	        )
1055 	    ]
1056 	    for etr in elementsToRemove:
1057 	        constraintsElement.removeChild(etr)
1058 	
1059 	    element = dom.createElement("rsc_location")
1060 	    element.setAttribute("id", constraint_id)
1061 	    if rsc_type == RESOURCE_TYPE_RESOURCE:
1062 	        element.setAttribute("rsc", rsc_value)
1063 	    elif rsc_type == RESOURCE_TYPE_REGEXP:
1064 	        element.setAttribute("rsc-pattern", rsc_value)
1065 	    element.setAttribute("node", node)
1066 	    element.setAttribute("score", score)
1067 	    for option in options:
1068 	        element.setAttribute(option[0], option[1])
1069 	    constraintsElement.appendChild(element)
1070 	
1071 	    utils.replace_cib_configuration(dom)
1072 	
1073 	
1074 	def location_rule(lib, argv, modifiers):  # noqa: PLR0912
1075 	    """
1076 	    Options:
1077 	      * -f - CIB file
1078 	      * --force - allow constraint on any resource type, allow duplicate
1079 	        constraints
1080 	    """
1081 	    del lib
1082 	    modifiers.ensure_only_supported("-f", "--force")
1083 	    if len(argv) < 3:
1084 	        raise CmdLineInputError()
1085 	
1086 	    rsc_type, rsc_value = parse_args.parse_typed_arg(
1087 	        argv.pop(0),
1088 	        [RESOURCE_TYPE_RESOURCE, RESOURCE_TYPE_REGEXP],
1089 	        RESOURCE_TYPE_RESOURCE,
1090 	    )
1091 	    argv.pop(0)  # pop "rule"
1092 	    options, rule_argv = rule_utils.parse_argv(
1093 	        argv,
1094 	        {
1095 	            "constraint-id": None,
1096 	            "resource-discovery": None,
1097 	        },
1098 	    )
1099 	    resource_discovery = (
1100 	        "resource-discovery" in options and options["resource-discovery"]
1101 	    )
1102 	
1103 	    try:
1104 	        # Parse the rule to see if we need to upgrade CIB schema. All errors
1105 	        # would be properly reported by a validator called bellow, so we can
1106 	        # safely ignore them here.
1107 	        parsed_rule = rule_utils.RuleParser().parse(
1108 	            rule_utils.TokenPreprocessor().run(rule_argv)
1109 	        )
1110 	        if rule_utils.has_node_attr_expr_with_type_integer(parsed_rule):
1111 	            utils.checkAndUpgradeCIB(
1112 	                const.PCMK_RULES_NODE_ATTR_EXPR_WITH_INT_TYPE_CIB_VERSION
1113 	            )
1114 	    except (rule_utils.ParserException, rule_utils.CibBuilderException):
1115 	        pass
1116 	
1117 	    dom = utils.get_cib_dom()
1118 	
1119 	    if rsc_type == RESOURCE_TYPE_RESOURCE:
1120 	        (
1121 	            rsc_valid,
1122 	            rsc_error,
1123 	            dummy_correct_id,
1124 	        ) = utils.validate_constraint_resource(dom, rsc_value)
1125 	        if not rsc_valid:
1126 	            utils.err(rsc_error)
1127 	
1128 	    cib, constraints = getCurrentConstraints(dom)
1129 	    lc = cib.createElement("rsc_location")
1130 	
1131 	    # If resource-discovery is specified, we use it with the rsc_location
1132 	    # element not the rule
1133 	    if resource_discovery:
1134 	        if not modifiers.get("--force"):
1135 	            allowed_discovery = list(
1136 	                map(
1137 	                    str,
1138 	                    [
1139 	                        CibResourceDiscovery.ALWAYS,
1140 	                        CibResourceDiscovery.EXCLUSIVE,
1141 	                        CibResourceDiscovery.NEVER,
1142 	                    ],
1143 	                )
1144 	            )
1145 	            if resource_discovery not in allowed_discovery:
1146 	                utils.err(
1147 	                    (
1148 	                        "invalid {0} value '{1}', allowed values are: {2}, "
1149 	                        "use --force to override"
1150 	                    ).format(
1151 	                        "resource-discovery",
1152 	                        resource_discovery,
1153 	                        format_list(allowed_discovery),
1154 	                    )
1155 	                )
1156 	        lc.setAttribute("resource-discovery", options.pop("resource-discovery"))
1157 	
1158 	    constraints.appendChild(lc)
1159 	    if options.get("constraint-id"):
1160 	        id_valid, id_error = utils.validate_xml_id(
1161 	            options["constraint-id"], "constraint id"
1162 	        )
1163 	        if not id_valid:
1164 	            utils.err(id_error)
1165 	        if utils.does_id_exist(dom, options["constraint-id"]):
1166 	            utils.err(
1167 	                "id '%s' is already in use, please specify another one"
1168 	                % options["constraint-id"]
1169 	            )
1170 	        lc.setAttribute("id", options["constraint-id"])
1171 	        del options["constraint-id"]
1172 	    else:
1173 	        lc.setAttribute(
1174 	            "id",
1175 	            utils.find_unique_id(dom, sanitize_id("location-" + rsc_value)),
1176 	        )
1177 	    if rsc_type == RESOURCE_TYPE_RESOURCE:
1178 	        lc.setAttribute("rsc", rsc_value)
1179 	    elif rsc_type == RESOURCE_TYPE_REGEXP:
1180 	        lc.setAttribute("rsc-pattern", rsc_value)
1181 	
1182 	    rule_utils.dom_rule_add(
1183 	        lc, options, rule_argv, utils.getValidateWithVersion(cib)
1184 	    )
1185 	    location_rule_check_duplicates(constraints, lc, modifiers.get("--force"))
1186 	    utils.replace_cib_configuration(cib)
1187 	
1188 	
1189 	def location_rule_check_duplicates(dom, constraint_el, force):
1190 	    """
1191 	    Commandline options: no options
1192 	    """
1193 	    if not force:
1194 	        duplicates = location_rule_find_duplicates(dom, constraint_el)
1195 	        if duplicates:
1196 	            lines = []
1197 	            for dup in duplicates:
1198 	                lines.append("  Constraint: %s" % dup.getAttribute("id"))
1199 	                lines.extend(
1200 	                    rule_utils.ExportDetailed().get_string(
1201 	                        dup_rule, False, True, indent="    "
1202 	                    )
1203 	                    for dup_rule in utils.dom_get_children_by_tag_name(
1204 	                        dup, "rule"
1205 	                    )
1206 	                )
1207 	            utils.err(
1208 	                "duplicate constraint already exists, use --force to override\n"
1209 	                + "\n".join(lines)
1210 	            )
1211 	
1212 	
1213 	def location_rule_find_duplicates(dom, constraint_el):
1214 	    """
1215 	    Commandline options: no options
1216 	    """
1217 	
1218 	    def normalize(constraint_el):
1219 	        if constraint_el.hasAttribute("rsc-pattern"):
1220 	            rsc = (
1221 	                RESOURCE_TYPE_REGEXP,
1222 	                constraint_el.getAttribute("rsc-pattern"),
1223 	            )
1224 	        else:
1225 	            rsc = (RESOURCE_TYPE_RESOURCE, constraint_el.getAttribute("rsc"))
1226 	        return (
1227 	            rsc,
1228 	            [
1229 	                rule_utils.ExportAsExpression().get_string(rule_el, True)
1230 	                for rule_el in constraint_el.getElementsByTagName("rule")
1231 	            ],
1232 	        )
1233 	
1234 	    normalized_el = normalize(constraint_el)
1235 	    return [
1236 	        other_el
1237 	        for other_el in dom.getElementsByTagName("rsc_location")
1238 	        if other_el.getElementsByTagName("rule")
1239 	        and constraint_el is not other_el
1240 	        and normalized_el == normalize(other_el)
1241 	    ]
1242 	
1243 	
1244 	# Grabs the current constraints and returns the dom and constraint element
1245 	def getCurrentConstraints(passed_dom=None):
1246 	    """
1247 	    Commandline options:
1248 	      * -f - CIB file, only if passed_dom is None
1249 	    """
1250 	    if passed_dom:
1251 	        dom = passed_dom
1252 	    else:
1253 	        current_constraints_xml = utils.get_cib_xpath("//constraints")
1254 	        if current_constraints_xml == "":
1255 	            utils.err("unable to process cib")
1256 	        # Verify current constraint doesn't already exist
1257 	        # If it does we replace it with the new constraint
(1) Event Sigma main event: The application uses Python's built in `xml` module which does not properly handle erroneous or maliciously constructed data, making the application vulnerable to one or more types of XML attacks.
(2) Event remediation: Avoid using the `xml` module. Consider using the `defusedxml` module or similar which safely prevents all XML entity attacks.
1258 	        dom = parseString(current_constraints_xml)
1259 	
1260 	    constraintsElement = dom.getElementsByTagName("constraints")[0]
1261 	    return (dom, constraintsElement)
1262 	
1263 	
1264 	# If returnStatus is set, then we don't error out, we just print the error
1265 	# and return false
1266 	def constraint_rm(  # noqa: PLR0912
1267 	    lib,
1268 	    argv,
1269 	    modifiers,
1270 	    returnStatus=False,
1271 	    constraintsElement=None,
1272 	    passed_dom=None,
1273 	):
1274 	    """
1275 	    Options:
1276 	      * -f - CIB file, effective only if passed_dom is None
1277 	    """
1278 	    if passed_dom is None:
1279 	        modifiers.ensure_only_supported("-f")
1280 	    if not argv:
1281 	        raise CmdLineInputError()
1282 	
1283 	    bad_constraint = False
1284 	    if len(argv) != 1:
1285 	        for arg in argv:
1286 	            if not constraint_rm(
1287 	                lib, [arg], modifiers, returnStatus=True, passed_dom=passed_dom
1288 	            ):
1289 	                bad_constraint = True
1290 	        if bad_constraint:
1291 	            sys.exit(1)
1292 	        return None
1293 	
1294 	    c_id = argv.pop(0)
1295 	    elementFound = False
1296 	    dom = None
1297 	    use_cibadmin = False
1298 	    if not constraintsElement:
1299 	        (dom, constraintsElement) = getCurrentConstraints(passed_dom)
1300 	        use_cibadmin = True
1301 	
1302 	    for co in constraintsElement.childNodes[:]:
1303 	        if co.nodeType != xml.dom.Node.ELEMENT_NODE:
1304 	            continue
1305 	        if co.getAttribute("id") == c_id:
1306 	            constraintsElement.removeChild(co)
1307 	            elementFound = True
1308 	
1309 	    if not elementFound:
1310 	        for rule in constraintsElement.getElementsByTagName("rule")[:]:
1311 	            if rule.getAttribute("id") == c_id:
1312 	                elementFound = True
1313 	                parent = rule.parentNode
1314 	                parent.removeChild(rule)
1315 	                if not parent.getElementsByTagName("rule"):
1316 	                    parent.parentNode.removeChild(parent)
1317 	
1318 	    if elementFound:
1319 	        if passed_dom:
1320 	            return dom
1321 	        if use_cibadmin:
1322 	            utils.replace_cib_configuration(dom)
1323 	        if returnStatus:
1324 	            return True
1325 	    else:
1326 	        utils.err("Unable to find constraint - '%s'" % c_id, False)
1327 	        if returnStatus:
1328 	            return False
1329 	        sys.exit(1)
1330 	    return None
1331 	
1332 	
1333 	def _split_set_constraints(
1334 	    constraints_dto: CibConstraintsDto,
1335 	) -> tuple[CibConstraintsDto, CibConstraintsDto]:
1336 	    return (
1337 	        CibConstraintsDto(
1338 	            location=constraints_dto.location,
1339 	            colocation=constraints_dto.colocation,
1340 	            order=constraints_dto.order,
1341 	            ticket=constraints_dto.ticket,
1342 	        ),
1343 	        CibConstraintsDto(
1344 	            location_set=constraints_dto.location_set,
1345 	            colocation_set=constraints_dto.colocation_set,
1346 	            order_set=constraints_dto.order_set,
1347 	            ticket_set=constraints_dto.ticket_set,
1348 	        ),
1349 	    )
1350 	
1351 	
1352 	def _find_constraints_containing_resource(
1353 	    resources_dto: CibResourcesDto,
1354 	    constraints_dto: CibConstraintsDto,
1355 	    resource_id: str,
1356 	) -> CibConstraintsDto:
1357 	    resources_filter = [resource_id]
1358 	    # Original implementation only included parent resource only if resource_id
1359 	    # was referring to a primitive resource, ignoring groups. This may change in
1360 	    # the future if necessary.
1361 	    if any(
1362 	        primitive_dto.id == resource_id
1363 	        for primitive_dto in resources_dto.primitives
1364 	    ):
1365 	        for clone_dto in resources_dto.clones:
1366 	            if clone_dto.member_id == resource_id:
1367 	                resources_filter.append(clone_dto.id)
1368 	                break
1369 	    return _filter_constraints_by_resources(
1370 	        constraints_dto, resources_filter, []
1371 	    )
1372 	
1373 	
1374 	def ref(
1375 	    lib: Any, argv: list[str], modifiers: parse_args.InputModifiers
1376 	) -> None:
1377 	    modifiers.ensure_only_supported("-f")
1378 	    if not argv:
1379 	        raise CmdLineInputError()
1380 	
1381 	    resources_dto = cast(
1382 	        CibResourcesDto, lib.resource.get_configured_resources()
1383 	    )
1384 	
1385 	    constraints_dto = cast(
1386 	        CibConstraintsDto,
1387 	        lib.constraint.get_config(evaluate_rules=False),
1388 	    )
1389 	
1390 	    for resource_id in sorted(set(argv)):
1391 	        constraint_ids = get_all_constraints_ids(
1392 	            _find_constraints_containing_resource(
1393 	                resources_dto, constraints_dto, resource_id
1394 	            )
1395 	        )
1396 	        print(f"Resource: {resource_id}")
1397 	        if constraint_ids:
1398 	            print(
1399 	                "\n".join(
1400 	                    indent(
1401 	                        sorted(constraint_ids),
1402 	                        indent_step=INDENT_STEP,
1403 	                    )
1404 	                )
1405 	            )
1406 	        else:
1407 	            print("  No Matches")
1408 	
1409 	
1410 	def remove_constraints_containing(
1411 	    resource_id: str, output=False, constraints_element=None, passed_dom=None
1412 	):
1413 	    """
1414 	    Commandline options:
1415 	      * -f - CIB file, effective only if passed_dom is None
1416 	    """
1417 	    lib = utils.get_library_wrapper()
1418 	    modifiers = utils.get_input_modifiers()
1419 	    resources_dto = cast(
1420 	        CibResourcesDto, lib.resource.get_configured_resources()
1421 	    )
1422 	
1423 	    constraints_dto, set_constraints_dto = _split_set_constraints(
1424 	        cast(
1425 	            CibConstraintsDto,
1426 	            lib.constraint.get_config(evaluate_rules=False),
1427 	        )
1428 	    )
1429 	    constraints = sorted(
1430 	        get_all_constraints_ids(
1431 	            _find_constraints_containing_resource(
1432 	                resources_dto, constraints_dto, resource_id
1433 	            )
1434 	        )
1435 	    )
1436 	    set_constraints = sorted(
1437 	        get_all_constraints_ids(
1438 	            _find_constraints_containing_resource(
1439 	                resources_dto, set_constraints_dto, resource_id
1440 	            )
1441 	        )
1442 	    )
1443 	    for c in constraints:
1444 	        if output:
1445 	            print_to_stderr(f"Removing Constraint - {c}")
1446 	        if constraints_element is not None:
1447 	            constraint_rm(
1448 	                lib,
1449 	                [c],
1450 	                modifiers,
1451 	                True,
1452 	                constraints_element,
1453 	                passed_dom=passed_dom,
1454 	            )
1455 	        else:
1456 	            constraint_rm(lib, [c], modifiers, passed_dom=passed_dom)
1457 	
1458 	    if set_constraints:
1459 	        (dom, constraintsElement) = getCurrentConstraints(passed_dom)
1460 	        for set_c in constraintsElement.getElementsByTagName("resource_ref")[:]:
1461 	            # If resource id is in a set, remove it from the set, if the set
1462 	            # is empty, then we remove the set, if the parent of the set
1463 	            # is empty then we remove it
1464 	            if set_c.getAttribute("id") == resource_id:
1465 	                parent_node = set_c.parentNode
1466 	                parent_node.removeChild(set_c)
1467 	                if output:
1468 	                    print_to_stderr(
1469 	                        "Removing {} from set {}".format(
1470 	                            resource_id, parent_node.getAttribute("id")
1471 	                        )
1472 	                    )
1473 	                if parent_node.getElementsByTagName("resource_ref").length == 0:
1474 	                    print_to_stderr(
1475 	                        "Removing set {}".format(parent_node.getAttribute("id"))
1476 	                    )
1477 	                    parent_node_2 = parent_node.parentNode
1478 	                    parent_node_2.removeChild(parent_node)
1479 	                    if (
1480 	                        parent_node_2.getElementsByTagName(
1481 	                            "resource_set"
1482 	                        ).length
1483 	                        == 0
1484 	                    ):
1485 	                        parent_node_2.parentNode.removeChild(parent_node_2)
1486 	                        print_to_stderr(
1487 	                            "Removing constraint {}".format(
1488 	                                parent_node_2.getAttribute("id")
1489 	                            )
1490 	                        )
1491 	        if passed_dom:
1492 	            return dom
1493 	        utils.replace_cib_configuration(dom)
1494 	    return None
1495 	
1496 	
1497 	# Re-assign any constraints referencing a resource to its parent (a clone
1498 	# or master)
1499 	def constraint_resource_update(old_id, dom):
1500 	    """
1501 	    Commandline options: no options
1502 	    """
1503 	    new_id = None
1504 	    clone_ms_parent = utils.dom_get_resource_clone_ms_parent(dom, old_id)
1505 	    if clone_ms_parent:
1506 	        new_id = clone_ms_parent.getAttribute("id")
1507 	
1508 	    if new_id:
1509 	        constraints = dom.getElementsByTagName("rsc_location")
1510 	        constraints += dom.getElementsByTagName("rsc_order")
1511 	        constraints += dom.getElementsByTagName("rsc_colocation")
1512 	        attrs_to_update = ["rsc", "first", "then", "with-rsc"]
1513 	        for constraint in constraints:
1514 	            for attr in attrs_to_update:
1515 	                if constraint.getAttribute(attr) == old_id:
1516 	                    constraint.setAttribute(attr, new_id)
1517 	    return dom
1518 	
1519 	
1520 	def constraint_rule_add(lib, argv, modifiers):
1521 	    """
1522 	    Options:
1523 	      * -f - CIB file
1524 	      * --force - allow duplicate constraints
1525 	    """
1526 	    # deprecated in pacemaker 2, removed in pacemaker 3
1527 	    # added to pcs after 0.11.8
1528 	    # the whole command removed in pcs-0.12
1529 	    deprecation_warning(
1530 	        "The possibility of defining multiple rules in a single location "
1531 	        "constraint is deprecated and will be removed."
1532 	    )
1533 	    del lib
1534 	    modifiers.ensure_only_supported("-f", "--force")
1535 	    if not argv:
1536 	        raise CmdLineInputError()
1537 	    constraint_id = argv.pop(0)
1538 	    options, rule_argv = rule_utils.parse_argv(argv)
1539 	    try:
1540 	        # Parse the rule to see if we need to upgrade CIB schema. All errors
1541 	        # would be properly reported by a validator called bellow, so we can
1542 	        # safely ignore them here.
1543 	        parsed_rule = rule_utils.RuleParser().parse(
1544 	            rule_utils.TokenPreprocessor().run(rule_argv)
1545 	        )
1546 	        if rule_utils.has_node_attr_expr_with_type_integer(parsed_rule):
1547 	            utils.checkAndUpgradeCIB(
1548 	                const.PCMK_RULES_NODE_ATTR_EXPR_WITH_INT_TYPE_CIB_VERSION
1549 	            )
1550 	    except (rule_utils.ParserException, rule_utils.CibBuilderException):
1551 	        pass
1552 	    cib = utils.get_cib_dom()
1553 	    constraint = utils.dom_get_element_with_id(
1554 	        cib.getElementsByTagName("constraints")[0],
1555 	        "rsc_location",
1556 	        constraint_id,
1557 	    )
1558 	    if not constraint:
1559 	        utils.err("Unable to find constraint: " + constraint_id)
1560 	    rule_utils.dom_rule_add(
1561 	        constraint,
1562 	        options,
1563 	        rule_argv,
1564 	        utils.getValidateWithVersion(cib),
1565 	    )
1566 	    location_rule_check_duplicates(cib, constraint, modifiers.get("--force"))
1567 	    utils.replace_cib_configuration(cib)
1568