1    	import sys
2    	import xml.dom.minidom
3    	from enum import Enum
4    	from typing import (
5    	    Any,
6    	    Iterable,
7    	    Optional,
8    	    Set,
9    	    TypeVar,
10   	    cast,
11   	)
12   	from xml.dom.minidom import parseString
13   	
14   	import pcs.cli.constraint_order.command as order_command
15   	from pcs import utils
16   	from pcs.cli.common import parse_args
17   	from pcs.cli.common.errors import (
18   	    CmdLineInputError,
19   	    raise_command_replaced,
20   	)
21   	from pcs.cli.common.output import (
22   	    INDENT_STEP,
23   	    lines_to_str,
24   	)
25   	from pcs.cli.constraint.location.command import (
26   	    RESOURCE_TYPE_REGEXP,
27   	    RESOURCE_TYPE_RESOURCE,
28   	)
29   	from pcs.cli.constraint.output import (
30   	    CibConstraintLocationAnyDto,
31   	    filter_constraints_by_rule_expired_status,
32   	    location,
33   	    print_config,
34   	)
35   	from pcs.cli.reports import process_library_reports
36   	from pcs.cli.reports.output import (
37   	    deprecation_warning,
38   	    print_to_stderr,
39   	    warn,
40   	)
41   	from pcs.common import (
42   	    const,
43   	    pacemaker,
44   	    reports,
45   	)
46   	from pcs.common.pacemaker.constraint import (
47   	    CibConstraintColocationSetDto,
48   	    CibConstraintLocationSetDto,
49   	    CibConstraintOrderSetDto,
50   	    CibConstraintsDto,
51   	    CibConstraintTicketSetDto,
52   	    get_all_constraints_ids,
53   	)
54   	from pcs.common.pacemaker.resource.list import CibResourcesDto
55   	from pcs.common.pacemaker.types import CibResourceDiscovery
56   	from pcs.common.reports import ReportItem
57   	from pcs.common.str_tools import (
58   	    format_list,
59   	    indent,
60   	)
61   	from pcs.common.types import (
62   	    StringCollection,
63   	    StringIterable,
64   	    StringSequence,
65   	)
66   	from pcs.lib.cib.constraint.order import ATTRIB as order_attrib
67   	from pcs.lib.node import get_existing_nodes_names
68   	from pcs.lib.pacemaker.values import (
69   	    SCORE_INFINITY,
70   	    is_true,
71   	    sanitize_id,
72   	)
73   	
74   	# pylint: disable=invalid-name
75   	# pylint: disable=too-many-branches
76   	# pylint: disable=too-many-lines
77   	# pylint: disable=too-many-locals
78   	# pylint: disable=too-many-statements
79   	
80   	DEFAULT_ACTION = const.PCMK_ACTION_START
81   	DEFAULT_ROLE = const.PCMK_ROLE_STARTED
82   	
83   	OPTIONS_SYMMETRICAL = order_attrib["symmetrical"]
84   	
85   	LOCATION_NODE_VALIDATION_SKIP_MSG = (
86   	    "Validation for node existence in the cluster will be skipped"
87   	)
88   	STANDALONE_SCORE_MSG = (
89   	    "Specifying score as a standalone value is deprecated and "
90   	    "might be removed in a future release, use score=value instead"
91   	)
92   	
93   	
94   	class CrmRuleReturnCode(Enum):
95   	    IN_EFFECT = 0
96   	    EXPIRED = 110
97   	    TO_BE_IN_EFFECT = 111
98   	
99   	
100  	def constraint_order_cmd(lib, argv, modifiers):
101  	    sub_cmd = "config" if not argv else argv.pop(0)
102  	
103  	    try:
104  	        if sub_cmd == "set":
105  	            order_command.create_with_set(lib, argv, modifiers)
106  	        elif sub_cmd in ["remove", "delete"]:
107  	            order_rm(lib, argv, modifiers)
108  	        elif sub_cmd == "show":
109  	            raise_command_replaced(
110  	                ["pcs constraint order config"], pcs_version="0.12"
111  	            )
112  	        elif sub_cmd == "config":
113  	            order_command.config_cmd(lib, argv, modifiers)
114  	        else:
115  	            order_start(lib, [sub_cmd] + argv, modifiers)
116  	    except CmdLineInputError as e:
117  	        utils.exit_on_cmdline_input_error(e, "constraint", ["order", sub_cmd])
118  	
119  	
120  	def config_cmd(
121  	    lib: Any, argv: list[str], modifiers: parse_args.InputModifiers
122  	) -> None:
123  	    modifiers.ensure_only_supported("-f", "--output-format", "--full", "--all")
124  	    if argv:
125  	        raise CmdLineInputError()
126  	
127  	    print_config(
128  	        cast(
129  	            CibConstraintsDto,
130  	            lib.constraint.get_config(evaluate_rules=True),
131  	        ),
132  	        modifiers,
133  	    )
134  	
135  	
136  	def _validate_constraint_resource(cib_dom, resource_id):
137  	    (
138  	        resource_valid,
139  	        resource_error,
140  	        dummy_correct_id,
141  	    ) = utils.validate_constraint_resource(cib_dom, resource_id)
142  	    if not resource_valid:
143  	        utils.err(resource_error)
144  	
145  	
146  	def _validate_resources_not_in_same_group(cib_dom, resource1, resource2):
147  	    if not utils.validate_resources_not_in_same_group(
148  	        cib_dom, resource1, resource2
149  	    ):
150  	        utils.err(
151  	            "Cannot create an order constraint for resources in the same group"
152  	        )
153  	
154  	
155  	# Syntax: colocation add [role] <src> with [role] <tgt> [score] [options]
156  	# possible commands:
157  	#        <src> with        <tgt> [score] [options]
158  	#        <src> with <role> <tgt> [score] [options]
159  	# <role> <src> with        <tgt> [score] [options]
160  	# <role> <src> with <role> <tgt> [score] [options]
161  	# Specifying score as a single argument is deprecated, though. The correct way
162  	# is score=value in options.
163  	def colocation_add(lib, argv, modifiers):  # noqa: PLR0912, PLR0915
164  	    """
165  	    Options:
166  	      * -f - CIB file
167  	      * --force - allow constraint on any resource, allow duplicate constraints
168  	    """
169  	
170  	    def _parse_score_options(argv):
171  	        # When passed an array of arguments if the first argument doesn't have
172  	        # an '=' then it's the score, otherwise they're all arguments. Return a
173  	        # tuple with the score and array of name,value pairs
174  	        """
175  	        Commandline options: no options
176  	        """
177  	        if not argv:
178  	            return None, []
179  	        score = None
180  	        if "=" not in argv[0]:
181  	            score = argv.pop(0)
182  	            # TODO added to pcs in the first 0.12.x version
183  	            deprecation_warning(STANDALONE_SCORE_MSG)
184  	
185  	        # create a list of 2-tuples (name, value)
186  	        arg_array = [
187  	            parse_args.split_option(arg, allow_empty_value=False)
188  	            for arg in argv
189  	        ]
190  	        return score, arg_array
191  	
192  	    def _validate_and_prepare_role(new_roles_supported, role):
193  	        if role is None:
194  	            return ""
195  	        role_cleaned = role.lower().capitalize()
196  	        if role_cleaned not in const.PCMK_ROLES:
197  	            utils.err(
198  	                "invalid role value '{0}', allowed values are: {1}".format(
199  	                    role, format_list(const.PCMK_ROLES)
200  	                )
201  	            )
202  	        return pacemaker.role.get_value_for_cib(
203  	            role_cleaned, new_roles_supported
204  	        )
205  	
206  	    del lib
207  	    modifiers.ensure_only_supported("-f", "--force")
208  	    if len(argv) < 3:
209  	        raise CmdLineInputError()
210  	
211  	    role1_candidate = None
212  	    role2_candidate = None
213  	
214  	    if argv[2] == "with":
215  	        role1_candidate = argv.pop(0)
216  	        resource1 = argv.pop(0)
217  	    elif argv[1] == "with":
218  	        resource1 = argv.pop(0)
219  	    else:
220  	        raise CmdLineInputError()
221  	
222  	    if argv.pop(0) != "with":
223  	        raise CmdLineInputError()
224  	    if "with" in argv:
225  	        raise CmdLineInputError(
226  	            message="Multiple 'with's cannot be specified.",
227  	            hint=(
228  	                "Use the 'pcs constraint colocation set' command if you want "
229  	                "to create a constraint for more than two resources."
230  	            ),
231  	            show_both_usage_and_message=True,
232  	        )
233  	
234  	    if not argv:
235  	        raise CmdLineInputError()
236  	    if len(argv) == 1 or utils.is_score_or_opt(argv[1]):
237  	        resource2 = argv.pop(0)
238  	    else:
239  	        role2_candidate = argv.pop(0)
240  	        resource2 = argv.pop(0)
241  	
242  	    score, nv_pairs = _parse_score_options(argv)
243  	    influence_attr_set = any(name == "influence" for name, _ in nv_pairs)
244  	
245  	    cib_dom = (
246  	        utils.cluster_upgrade_to_version(
247  	            const.PCMK_COLOCATION_INFLUENCE_CIB_VERSION
248  	        )
249  	        if influence_attr_set
250  	        else utils.get_cib_dom()
251  	    )
252  	    new_roles_supported = utils.isCibVersionSatisfied(
253  	        cib_dom, const.PCMK_NEW_ROLES_CIB_VERSION
254  	    )
255  	
256  	    role1 = _validate_and_prepare_role(new_roles_supported, role1_candidate)
257  	    role2 = _validate_and_prepare_role(new_roles_supported, role2_candidate)
258  	    _validate_constraint_resource(cib_dom, resource1)
259  	    _validate_constraint_resource(cib_dom, resource2)
260  	
261  	    id_in_nvpairs = None
262  	    for name, value in nv_pairs:
263  	        if name == "id":
264  	            id_valid, id_error = utils.validate_xml_id(value, "constraint id")
265  	            if not id_valid:
266  	                utils.err(id_error)
267  	            if utils.does_id_exist(cib_dom, value):
268  	                utils.err(
269  	                    "id '%s' is already in use, please specify another one"
270  	                    % value
271  	                )
272  	            id_in_nvpairs = True
273  	        elif name == "score":
274  	            score = value
275  	    if score is None:
276  	        score = SCORE_INFINITY
277  	    if not id_in_nvpairs:
278  	        nv_pairs.append(
279  	            (
280  	                "id",
281  	                utils.find_unique_id(
282  	                    cib_dom,
283  	                    "colocation-%s-%s-%s" % (resource1, resource2, score),
284  	                ),
285  	            )
286  	        )
287  	
288  	    (dom, constraintsElement) = getCurrentConstraints(cib_dom)
289  	
290  	    # If one role is specified, the other should default to "started"
291  	    if role1 != "" and role2 == "":
292  	        role2 = DEFAULT_ROLE
293  	    if role2 != "" and role1 == "":
294  	        role1 = DEFAULT_ROLE
295  	    element = dom.createElement("rsc_colocation")
296  	    element.setAttribute("rsc", resource1)
297  	    element.setAttribute("with-rsc", resource2)
298  	    element.setAttribute("score", score)
299  	    if role1 != "":
300  	        element.setAttribute("rsc-role", role1)
301  	    if role2 != "":
302  	        element.setAttribute("with-rsc-role", role2)
303  	    for nv_pair in nv_pairs:
304  	        element.setAttribute(nv_pair[0], nv_pair[1])
305  	    if not modifiers.get("--force"):
306  	
307  	        def _constraint_export(constraint_info):
308  	            options_dict = constraint_info["options"]
309  	            co_resource1 = options_dict.get("rsc", "")
310  	            co_resource2 = options_dict.get("with-rsc", "")
311  	            co_id = options_dict.get("id", "")
312  	            co_score = options_dict.get("score", "")
313  	            score_text = "(score:" + co_score + ")"
314  	            console_option_list = [
315  	                f"({option[0]}:{option[1]})"
316  	                for option in sorted(options_dict.items())
317  	                if option[0] not in ("rsc", "with-rsc", "id", "score")
318  	            ]
319  	            console_option_list.append(f"(id:{co_id})")
320  	            return " ".join(
321  	                [co_resource1, "with", co_resource2, score_text]
322  	                + console_option_list
323  	            )
324  	
325  	        duplicates = colocation_find_duplicates(constraintsElement, element)
326  	        if duplicates:
327  	            utils.err(
328  	                "duplicate constraint already exists, use --force to override\n"
329  	                + "\n".join(
330  	                    [
331  	                        "  "
332  	                        + _constraint_export(
333  	                            {"options": dict(dup.attributes.items())}
334  	                        )
335  	                        for dup in duplicates
336  	                    ]
337  	                )
338  	            )
339  	    constraintsElement.appendChild(element)
340  	    utils.replace_cib_configuration(dom)
341  	
342  	
343  	def colocation_find_duplicates(dom, constraint_el):
344  	    """
345  	    Commandline options: no options
346  	    """
347  	    new_roles_supported = utils.isCibVersionSatisfied(
348  	        dom, const.PCMK_NEW_ROLES_CIB_VERSION
349  	    )
350  	
351  	    def normalize(const_el):
352  	        return (
353  	            const_el.getAttribute("rsc"),
354  	            const_el.getAttribute("with-rsc"),
355  	            pacemaker.role.get_value_for_cib(
356  	                const_el.getAttribute("rsc-role").capitalize() or DEFAULT_ROLE,
357  	                new_roles_supported,
358  	            ),
359  	            pacemaker.role.get_value_for_cib(
360  	                const_el.getAttribute("with-rsc-role").capitalize()
361  	                or DEFAULT_ROLE,
362  	                new_roles_supported,
363  	            ),
364  	        )
365  	
366  	    normalized_el = normalize(constraint_el)
367  	    return [
368  	        other_el
369  	        for other_el in dom.getElementsByTagName("rsc_colocation")
370  	        if not other_el.getElementsByTagName("resource_set")
371  	        and constraint_el is not other_el
372  	        and normalized_el == normalize(other_el)
373  	    ]
374  	
375  	
376  	def order_rm(lib, argv, modifiers):
377  	    """
378  	    Options:
379  	      * -f - CIB file
380  	    """
381  	    del lib
382  	    modifiers.ensure_only_supported("-f")
383  	    if not argv:
384  	        raise CmdLineInputError()
385  	
386  	    elementFound = False
387  	    (dom, constraintsElement) = getCurrentConstraints()
388  	
389  	    for resource in argv:
390  	        for ord_loc in constraintsElement.getElementsByTagName("rsc_order")[:]:
391  	            if (
392  	                ord_loc.getAttribute("first") == resource
393  	                or ord_loc.getAttribute("then") == resource
394  	            ):
395  	                constraintsElement.removeChild(ord_loc)
396  	                elementFound = True
397  	
398  	        resource_refs_to_remove = []
399  	        for ord_set in constraintsElement.getElementsByTagName("resource_ref"):
400  	            if ord_set.getAttribute("id") == resource:
401  	                resource_refs_to_remove.append(ord_set)
402  	                elementFound = True
403  	
404  	        for res_ref in resource_refs_to_remove:
405  	            res_set = res_ref.parentNode
406  	            res_order = res_set.parentNode
407  	
408  	            res_ref.parentNode.removeChild(res_ref)
409  	            if not res_set.getElementsByTagName("resource_ref"):
410  	                res_set.parentNode.removeChild(res_set)
411  	                if not res_order.getElementsByTagName("resource_set"):
412  	                    res_order.parentNode.removeChild(res_order)
413  	
414  	    if elementFound:
415  	        utils.replace_cib_configuration(dom)
416  	    else:
417  	        utils.err("No matching resources found in ordering list")
418  	
419  	
420  	def order_start(lib, argv, modifiers):
421  	    """
422  	    Options:
423  	      * -f - CIB file
424  	      * --force - allow constraint for any resource, allow duplicate constraints
425  	    """
426  	    del lib
427  	    modifiers.ensure_only_supported("-f", "--force")
428  	    if len(argv) < 3:
429  	        raise CmdLineInputError()
430  	
431  	    first_action = DEFAULT_ACTION
432  	    then_action = DEFAULT_ACTION
433  	    action = argv[0]
434  	    if action in const.PCMK_ACTIONS:
435  	        first_action = action
436  	        argv.pop(0)
437  	
438  	    resource1 = argv.pop(0)
439  	    if argv.pop(0) != "then":
440  	        raise CmdLineInputError()
441  	
442  	    if not argv:
443  	        raise CmdLineInputError()
444  	
445  	    action = argv[0]
446  	    if action in const.PCMK_ACTIONS:
447  	        then_action = action
448  	        argv.pop(0)
449  	
450  	    if not argv:
451  	        raise CmdLineInputError()
452  	    resource2 = argv.pop(0)
453  	
454  	    order_options = []
455  	    if argv:
456  	        order_options = order_options + argv[:]
457  	    if "then" in order_options:
458  	        raise CmdLineInputError(
459  	            message="Multiple 'then's cannot be specified.",
460  	            hint=(
461  	                "Use the 'pcs constraint order set' command if you want to "
462  	                "create a constraint for more than two resources."
463  	            ),
464  	            show_both_usage_and_message=True,
465  	        )
466  	
467  	    order_options.append("first-action=" + first_action)
468  	    order_options.append("then-action=" + then_action)
469  	    _order_add(resource1, resource2, order_options, modifiers)
470  	
471  	
472  	def _order_add(resource1, resource2, options_list, modifiers):  # noqa: PLR0912, PLR0915
473  	    """
474  	    Commandline options:
475  	      * -f - CIB file
476  	      * --force - allow constraint for any resource, allow duplicate constraints
477  	    """
478  	    cib_dom = utils.get_cib_dom()
479  	    _validate_constraint_resource(cib_dom, resource1)
480  	    _validate_constraint_resource(cib_dom, resource2)
481  	
482  	    _validate_resources_not_in_same_group(cib_dom, resource1, resource2)
483  	
484  	    order_options = []
485  	    id_specified = False
486  	    sym = None
487  	    for arg in options_list:
488  	        if arg == "symmetrical":
489  	            sym = "true"
490  	        elif arg == "nonsymmetrical":
491  	            sym = "false"
492  	        else:
493  	            name, value = parse_args.split_option(arg, allow_empty_value=False)
494  	            if name == "id":
495  	                id_valid, id_error = utils.validate_xml_id(
496  	                    value, "constraint id"
497  	                )
498  	                if not id_valid:
499  	                    utils.err(id_error)
500  	                if utils.does_id_exist(cib_dom, value):
501  	                    utils.err(
502  	                        "id '%s' is already in use, please specify another one"
503  	                        % value
504  	                    )
505  	                id_specified = True
506  	                order_options.append((name, value))
507  	            elif name == "symmetrical":
508  	                if value.lower() in OPTIONS_SYMMETRICAL:
509  	                    sym = value.lower()
510  	                else:
511  	                    utils.err(
512  	                        "invalid symmetrical value '%s', allowed values are: %s"
513  	                        % (value, ", ".join(OPTIONS_SYMMETRICAL))
514  	                    )
515  	            else:
516  	                order_options.append((name, value))
517  	    if sym:
518  	        order_options.append(("symmetrical", sym))
519  	
520  	    options = ""
521  	    if order_options:
522  	        options = " (Options: %s)" % " ".join(
523  	            [
524  	                "%s=%s" % (name, value)
525  	                for name, value in order_options
526  	                if name not in ("kind", "score")
527  	            ]
528  	        )
529  	
530  	    scorekind = "kind: Mandatory"
531  	    id_suffix = "mandatory"
532  	    for opt in order_options:
533  	        if opt[0] == "score":
534  	            scorekind = "score: " + opt[1]
535  	            id_suffix = opt[1]
536  	            # TODO deprecated in pacemaker 2, to be removed in pacemaker 3
537  	            # added to pcs after 0.11.7
538  	            deprecation_warning(
539  	                reports.messages.DeprecatedOption(opt[0], []).message
540  	            )
541  	            break
542  	        if opt[0] == "kind":
543  	            scorekind = "kind: " + opt[1]
544  	            id_suffix = opt[1]
545  	            break
546  	
547  	    if not id_specified:
548  	        order_id = "order-" + resource1 + "-" + resource2 + "-" + id_suffix
549  	        order_id = utils.find_unique_id(cib_dom, order_id)
550  	        order_options.append(("id", order_id))
551  	
552  	    (dom, constraintsElement) = getCurrentConstraints()
553  	    element = dom.createElement("rsc_order")
554  	    element.setAttribute("first", resource1)
555  	    element.setAttribute("then", resource2)
556  	    for order_opt in order_options:
557  	        element.setAttribute(order_opt[0], order_opt[1])
558  	    constraintsElement.appendChild(element)
559  	    if not modifiers.get("--force"):
560  	
561  	        def _constraint_export(constraint_info):
562  	            options = constraint_info["options"]
563  	            oc_resource1 = options.get("first", "")
564  	            oc_resource2 = options.get("then", "")
565  	            first_action = options.get("first-action", "")
566  	            then_action = options.get("then-action", "")
567  	            oc_id = options.get("id", "")
568  	            oc_score = options.get("score", "")
569  	            oc_kind = options.get("kind", "")
570  	            oc_sym = ""
571  	            oc_id_out = ""
572  	            oc_options = ""
573  	            if "symmetrical" in options and not is_true(
574  	                options.get("symmetrical", "false")
575  	            ):
576  	                oc_sym = "(non-symmetrical)"
577  	            if oc_kind != "":
578  	                score_text = "(kind:" + oc_kind + ")"
579  	            elif oc_kind == "" and oc_score == "":
580  	                score_text = "(kind:Mandatory)"
581  	            else:
582  	                score_text = "(score:" + oc_score + ")"
583  	            oc_id_out = "(id:" + oc_id + ")"
584  	            already_processed_options = (
585  	                "first",
586  	                "then",
587  	                "first-action",
588  	                "then-action",
589  	                "id",
590  	                "score",
591  	                "kind",
592  	                "symmetrical",
593  	            )
594  	            oc_options = " ".join(
595  	                [
596  	                    f"{name}={value}"
597  	                    for name, value in options.items()
598  	                    if name not in already_processed_options
599  	                ]
600  	            )
601  	            if oc_options:
602  	                oc_options = "(Options: " + oc_options + ")"
603  	            return " ".join(
604  	                [
605  	                    arg
606  	                    for arg in [
607  	                        first_action,
608  	                        oc_resource1,
609  	                        "then",
610  	                        then_action,
611  	                        oc_resource2,
612  	                        score_text,
613  	                        oc_sym,
614  	                        oc_options,
615  	                        oc_id_out,
616  	                    ]
617  	                    if arg
618  	                ]
619  	            )
620  	
621  	        duplicates = order_find_duplicates(constraintsElement, element)
622  	        if duplicates:
623  	            utils.err(
624  	                "duplicate constraint already exists, use --force to override\n"
625  	                + "\n".join(
626  	                    [
627  	                        "  "
628  	                        + _constraint_export(
629  	                            {"options": dict(dup.attributes.items())}
630  	                        )
631  	                        for dup in duplicates
632  	                    ]
633  	                )
634  	            )
635  	    print_to_stderr(f"Adding {resource1} {resource2} ({scorekind}){options}")
636  	    utils.replace_cib_configuration(dom)
637  	
638  	
639  	def order_find_duplicates(dom, constraint_el):
640  	    """
641  	    Commandline options: no options
642  	    """
643  	
644  	    def normalize(constraint_el):
645  	        return (
646  	            constraint_el.getAttribute("first"),
647  	            constraint_el.getAttribute("then"),
648  	            constraint_el.getAttribute("first-action").lower()
649  	            or DEFAULT_ACTION,
650  	            constraint_el.getAttribute("then-action").lower() or DEFAULT_ACTION,
651  	        )
652  	
653  	    normalized_el = normalize(constraint_el)
654  	    return [
655  	        other_el
656  	        for other_el in dom.getElementsByTagName("rsc_order")
657  	        if not other_el.getElementsByTagName("resource_set")
658  	        and constraint_el is not other_el
659  	        and normalized_el == normalize(other_el)
660  	    ]
661  	
662  	
663  	_SetConstraint = TypeVar(
664  	    "_SetConstraint",
665  	    CibConstraintLocationSetDto,
666  	    CibConstraintColocationSetDto,
667  	    CibConstraintOrderSetDto,
668  	    CibConstraintTicketSetDto,
669  	)
670  	
671  	
672  	def _filter_set_constraints_by_resources(
673  	    constraints_dto: Iterable[_SetConstraint], resources: Set[str]
674  	) -> list[_SetConstraint]:
675  	    return [
676  	        constraint_set_dto
677  	        for constraint_set_dto in constraints_dto
678  	        if any(
679  	            set(resource_set.resources_ids) & resources
680  	            for resource_set in constraint_set_dto.resource_sets
681  	        )
682  	    ]
683  	
684  	
685  	def _filter_constraints_by_resources(
686  	    constraints_dto: CibConstraintsDto,
687  	    resources: StringIterable,
688  	    patterns: StringIterable,
689  	) -> CibConstraintsDto:
690  	    required_resources_set = set(resources)
691  	    required_patterns_set = set(patterns)
692  	    return CibConstraintsDto(
693  	        location=[
694  	            constraint_dto
695  	            for constraint_dto in constraints_dto.location
696  	            if (
697  	                constraint_dto.resource_id is not None
698  	                and constraint_dto.resource_id in required_resources_set
699  	            )
700  	            or (
701  	                constraint_dto.resource_pattern is not None
702  	                and constraint_dto.resource_pattern in required_patterns_set
703  	            )
704  	        ],
705  	        location_set=_filter_set_constraints_by_resources(
706  	            constraints_dto.location_set, required_resources_set
707  	        ),
708  	        colocation=[
709  	            constraint_dto
710  	            for constraint_dto in constraints_dto.colocation
711  	            if {constraint_dto.resource_id, constraint_dto.with_resource_id}
712  	            & required_resources_set
713  	        ],
714  	        colocation_set=_filter_set_constraints_by_resources(
715  	            constraints_dto.colocation_set, required_resources_set
716  	        ),
717  	        order=[
718  	            constraint_dto
719  	            for constraint_dto in constraints_dto.order
720  	            if {
721  	                constraint_dto.first_resource_id,
722  	                constraint_dto.then_resource_id,
723  	            }
724  	            & required_resources_set
725  	        ],
726  	        order_set=_filter_set_constraints_by_resources(
727  	            constraints_dto.order_set, required_resources_set
728  	        ),
729  	        ticket=[
730  	            constraint_dto
731  	            for constraint_dto in constraints_dto.ticket
732  	            if constraint_dto.resource_id in required_resources_set
733  	        ],
734  	        ticket_set=_filter_set_constraints_by_resources(
735  	            constraints_dto.ticket_set, required_resources_set
736  	        ),
737  	    )
738  	
739  	
740  	def _filter_location_by_node_base(
741  	    constraint_dtos: Iterable[CibConstraintLocationAnyDto],
742  	    nodes: StringCollection,
743  	) -> list[CibConstraintLocationAnyDto]:
744  	    return [
745  	        constraint_dto
746  	        for constraint_dto in constraint_dtos
747  	        if constraint_dto.attributes.node is not None
748  	        and constraint_dto.attributes.node in nodes
749  	    ]
750  	
751  	
752  	def location_config_cmd(
753  	    lib: Any, argv: parse_args.Argv, modifiers: parse_args.InputModifiers
754  	) -> None:
755  	    """
756  	    Options:
757  	      * --all - print expired constraints
758  	      * --full - print all details
759  	      * -f - CIB file
760  	    """
761  	    modifiers.ensure_only_supported("-f", "--output-format", "--full", "--all")
762  	    filter_type: Optional[str] = None
763  	    filter_items: parse_args.Argv = []
764  	    if argv:
765  	        filter_type, *filter_items = argv
766  	        allowed_types = ("resources", "nodes")
767  	        if filter_type not in allowed_types:
768  	            raise CmdLineInputError(
769  	                f"Unknown keyword '{filter_type}'. Allowed keywords: "
770  	                f"{format_list(allowed_types)}"
771  	            )
772  	        if modifiers.get_output_format() != parse_args.OUTPUT_FORMAT_VALUE_TEXT:
773  	            raise CmdLineInputError(
774  	                "Output formats other than 'text' are not supported together "
775  	                "with grouping and filtering by nodes or resources"
776  	            )
777  	
778  	    constraints_dto = filter_constraints_by_rule_expired_status(
779  	        lib.constraint.get_config(evaluate_rules=True),
780  	        modifiers.is_specified("--all"),
781  	    )
782  	
783  	    constraints_dto = CibConstraintsDto(
784  	        location=constraints_dto.location,
785  	        location_set=constraints_dto.location_set,
786  	    )
787  	
788  	    def _print_lines(lines: StringSequence) -> None:
789  	        if lines:
790  	            print("Location Constraints:")
791  	            print(lines_to_str(indent(lines, indent_step=INDENT_STEP)))
792  	
793  	    if filter_type == "resources":
794  	        if filter_items:
795  	            resources = []
796  	            patterns = []
797  	            for item in filter_items:
798  	                item_type, item_value = parse_args.parse_typed_arg(
799  	                    item,
800  	                    [RESOURCE_TYPE_RESOURCE, RESOURCE_TYPE_REGEXP],
801  	                    RESOURCE_TYPE_RESOURCE,
802  	                )
803  	                if item_type == RESOURCE_TYPE_RESOURCE:
804  	                    resources.append(item_value)
805  	                elif item_type == RESOURCE_TYPE_REGEXP:
806  	                    patterns.append(item_value)
807  	            constraints_dto = _filter_constraints_by_resources(
808  	                constraints_dto, resources, patterns
809  	            )
810  	        _print_lines(
811  	            location.constraints_to_grouped_by_resource_text(
812  	                constraints_dto.location,
813  	                modifiers.is_specified("--full"),
814  	            )
815  	        )
816  	        return
817  	    if filter_type == "nodes":
818  	        if filter_items:
819  	            constraints_dto = CibConstraintsDto(
820  	                location=_filter_location_by_node_base(
821  	                    constraints_dto.location, filter_items
822  	                ),
823  	                location_set=_filter_location_by_node_base(
824  	                    constraints_dto.location_set, filter_items
825  	                ),
826  	            )
827  	        _print_lines(
828  	            location.constraints_to_grouped_by_node_text(
829  	                constraints_dto.location,
830  	                modifiers.is_specified("--full"),
831  	            )
832  	        )
833  	        return
834  	
835  	    print_config(constraints_dto, modifiers)
836  	
837  	
838  	def _verify_node_name(node, existing_nodes):
839  	    report_list = []
840  	    if node not in existing_nodes:
841  	        report_list.append(
842  	            ReportItem.error(
843  	                reports.messages.NodeNotFound(node),
844  	                force_code=reports.codes.FORCE,
845  	            )
846  	        )
847  	    return report_list
848  	
849  	
850  	def _verify_score(score):
851  	    if not utils.is_score(score):
852  	        utils.err(
853  	            "invalid score '%s', use integer or INFINITY or -INFINITY" % score
854  	        )
855  	
856  	
857  	def location_prefer(  # noqa: PLR0912
858  	    lib: Any, argv: parse_args.Argv, modifiers: parse_args.InputModifiers
859  	) -> None:
860  	    """
861  	    Options:
862  	      * --force - allow unknown options, allow constraint for any resource type
863  	      * -f - CIB file
864  	    """
865  	    modifiers.ensure_only_supported("--force", "-f")
866  	    rsc = argv.pop(0)
867  	    prefer_option = argv.pop(0)
868  	
869  	    dummy_rsc_type, rsc_value = parse_args.parse_typed_arg(
870  	        rsc,
871  	        [RESOURCE_TYPE_RESOURCE, RESOURCE_TYPE_REGEXP],
872  	        RESOURCE_TYPE_RESOURCE,
873  	    )
874  	
875  	    if prefer_option == "prefers":
876  	        prefer = True
877  	    elif prefer_option == "avoids":
878  	        prefer = False
879  	    else:
880  	        raise CmdLineInputError()
881  	
882  	    skip_node_check = False
883  	    existing_nodes: list[str] = []
884  	    if modifiers.is_specified("-f") or modifiers.get("--force"):
885  	        skip_node_check = True
886  	        warn(LOCATION_NODE_VALIDATION_SKIP_MSG)
887  	    else:
888  	        lib_env = utils.get_lib_env()
889  	        existing_nodes, report_list = get_existing_nodes_names(
890  	            corosync_conf=lib_env.get_corosync_conf(),
891  	            cib=lib_env.get_cib(),
892  	        )
893  	        if report_list:
894  	            process_library_reports(report_list)
895  	
896  	    report_list = []
897  	    parameters_list = []
898  	    for nodeconf in argv:
899  	        nodeconf_a = nodeconf.split("=", 1)
900  	        node = nodeconf_a[0]
901  	        if not skip_node_check:
902  	            report_list += _verify_node_name(node, existing_nodes)
903  	        if len(nodeconf_a) == 1:
904  	            score = "INFINITY" if prefer else "-INFINITY"
905  	        else:
906  	            score = nodeconf_a[1]
907  	            _verify_score(score)
908  	            if not prefer:
909  	                score = score[1:] if score[0] == "-" else "-" + score
910  	
911  	        parameters_list.append(
912  	            [
913  	                sanitize_id(f"location-{rsc_value}-{node}-{score}"),
914  	                rsc,
915  	                node,
916  	                f"score={score}",
917  	            ]
918  	        )
919  	
920  	    if report_list:
921  	        process_library_reports(report_list)
922  	
923  	    modifiers = modifiers.get_subset("--force", "-f")
924  	
925  	    for parameters in parameters_list:
926  	        location_add(lib, parameters, modifiers, skip_score_and_node_check=True)
927  	
928  	
929  	def location_add(  # noqa: PLR0912, PLR0915
930  	    lib: Any,
931  	    argv: parse_args.Argv,
932  	    modifiers: parse_args.InputModifiers,
933  	    skip_score_and_node_check: bool = False,
934  	) -> None:
935  	    """
936  	    Options:
937  	      * --force - allow unknown options, allow constraint for any resource type
938  	      * -f - CIB file
939  	    """
940  	    del lib
941  	    modifiers.ensure_only_supported("--force", "-f")
942  	    if len(argv) < 4:
943  	        raise CmdLineInputError()
944  	
945  	    constraint_id = argv.pop(0)
946  	    rsc_type, rsc_value = parse_args.parse_typed_arg(
947  	        argv.pop(0),
948  	        [RESOURCE_TYPE_RESOURCE, RESOURCE_TYPE_REGEXP],
949  	        RESOURCE_TYPE_RESOURCE,
950  	    )
951  	    node = argv.pop(0)
952  	    score = None
953  	    if "=" not in argv[0]:
954  	        score = argv.pop(0)
955  	        # TODO added to pcs in the first 0.12.x version
956  	        deprecation_warning(STANDALONE_SCORE_MSG)
957  	    options = []
958  	    # For now we only allow setting resource-discovery and score
959  	    for arg in argv:
960  	        name, value = parse_args.split_option(arg, allow_empty_value=False)
961  	        if name == "score":
962  	            score = value
963  	        elif name == "resource-discovery":
964  	            if not modifiers.get("--force"):
965  	                allowed_discovery = list(
966  	                    map(
967  	                        str,
968  	                        [
969  	                            CibResourceDiscovery.ALWAYS,
970  	                            CibResourceDiscovery.EXCLUSIVE,
971  	                            CibResourceDiscovery.NEVER,
972  	                        ],
973  	                    )
974  	                )
975  	                if value not in allowed_discovery:
976  	                    utils.err(
977  	                        (
978  	                            "invalid {0} value '{1}', allowed values are: {2}"
979  	                            ", use --force to override"
980  	                        ).format(name, value, format_list(allowed_discovery))
981  	                    )
982  	            options.append([name, value])
983  	        elif modifiers.get("--force"):
984  	            options.append([name, value])
985  	        else:
986  	            utils.err("bad option '%s', use --force to override" % name)
987  	    if score is None:
988  	        score = "INFINITY"
989  	
990  	    # Verify that specified node exists in the cluster and score is valid
991  	    if not skip_score_and_node_check:
992  	        if modifiers.is_specified("-f") or modifiers.get("--force"):
993  	            warn(LOCATION_NODE_VALIDATION_SKIP_MSG)
994  	        else:
995  	            lib_env = utils.get_lib_env()
996  	            existing_nodes, report_list = get_existing_nodes_names(
997  	                corosync_conf=lib_env.get_corosync_conf(),
998  	                cib=lib_env.get_cib(),
999  	            )
1000 	            report_list += _verify_node_name(node, existing_nodes)
1001 	            if report_list:
1002 	                process_library_reports(report_list)
1003 	        _verify_score(score)
1004 	
1005 	    id_valid, id_error = utils.validate_xml_id(constraint_id, "constraint id")
1006 	    if not id_valid:
1007 	        utils.err(id_error)
1008 	
1009 	    dom = utils.get_cib_dom()
1010 	
1011 	    if rsc_type == RESOURCE_TYPE_RESOURCE:
1012 	        (
1013 	            rsc_valid,
1014 	            rsc_error,
1015 	            dummy_correct_id,
1016 	        ) = utils.validate_constraint_resource(dom, rsc_value)
1017 	        if not rsc_valid:
1018 	            utils.err(rsc_error)
1019 	
1020 	    # Verify current constraint doesn't already exist
1021 	    # If it does we replace it with the new constraint
1022 	    dummy_dom, constraintsElement = getCurrentConstraints(dom)
1023 	    # If the id matches, or the rsc & node match, then we replace/remove
1024 	    elementsToRemove = [
1025 	        rsc_loc
1026 	        for rsc_loc in constraintsElement.getElementsByTagName("rsc_location")
1027 	        # pylint: disable=too-many-boolean-expressions
1028 	        if rsc_loc.getAttribute("id") == constraint_id
1029 	        or (
1030 	            rsc_loc.getAttribute("node") == node
1031 	            and (
1032 	                (
1033 	                    rsc_type == RESOURCE_TYPE_RESOURCE
1034 	                    and rsc_loc.getAttribute("rsc") == rsc_value
1035 	                )
1036 	                or (
1037 	                    rsc_type == RESOURCE_TYPE_REGEXP
1038 	                    and rsc_loc.getAttribute("rsc-pattern") == rsc_value
1039 	                )
1040 	            )
1041 	        )
1042 	    ]
1043 	    for etr in elementsToRemove:
1044 	        constraintsElement.removeChild(etr)
1045 	
1046 	    element = dom.createElement("rsc_location")
1047 	    element.setAttribute("id", constraint_id)
1048 	    if rsc_type == RESOURCE_TYPE_RESOURCE:
1049 	        element.setAttribute("rsc", rsc_value)
1050 	    elif rsc_type == RESOURCE_TYPE_REGEXP:
1051 	        element.setAttribute("rsc-pattern", rsc_value)
1052 	    element.setAttribute("node", node)
1053 	    element.setAttribute("score", score)
1054 	    for option in options:
1055 	        element.setAttribute(option[0], option[1])
1056 	    constraintsElement.appendChild(element)
1057 	
1058 	    utils.replace_cib_configuration(dom)
1059 	
1060 	
1061 	# Grabs the current constraints and returns the dom and constraint element
1062 	def getCurrentConstraints(passed_dom=None):
1063 	    """
1064 	    Commandline options:
1065 	      * -f - CIB file, only if passed_dom is None
1066 	    """
1067 	    if passed_dom:
1068 	        dom = passed_dom
1069 	    else:
1070 	        current_constraints_xml = utils.get_cib_xpath("//constraints")
1071 	        if current_constraints_xml == "":
1072 	            utils.err("unable to process cib")
1073 	        # Verify current constraint doesn't already exist
1074 	        # If it does we replace it with the new constraint
(1) Event Sigma main event: The application uses Python's built in `xml` module which does not properly handle erroneous or maliciously constructed data, making the application vulnerable to one or more types of XML attacks.
(2) Event remediation: Avoid using the `xml` module. Consider using the `defusedxml` module or similar which safely prevents all XML entity attacks.
1075 	        dom = parseString(current_constraints_xml)
1076 	
1077 	    constraintsElement = dom.getElementsByTagName("constraints")[0]
1078 	    return (dom, constraintsElement)
1079 	
1080 	
1081 	# If returnStatus is set, then we don't error out, we just print the error
1082 	# and return false
1083 	def constraint_rm(  # noqa: PLR0912
1084 	    lib,
1085 	    argv,
1086 	    modifiers,
1087 	    returnStatus=False,
1088 	    constraintsElement=None,
1089 	    passed_dom=None,
1090 	):
1091 	    """
1092 	    Options:
1093 	      * -f - CIB file, effective only if passed_dom is None
1094 	    """
1095 	    if passed_dom is None:
1096 	        modifiers.ensure_only_supported("-f")
1097 	    if not argv:
1098 	        raise CmdLineInputError()
1099 	
1100 	    bad_constraint = False
1101 	    if len(argv) != 1:
1102 	        for arg in argv:
1103 	            if not constraint_rm(
1104 	                lib, [arg], modifiers, returnStatus=True, passed_dom=passed_dom
1105 	            ):
1106 	                bad_constraint = True
1107 	        if bad_constraint:
1108 	            sys.exit(1)
1109 	        return None
1110 	
1111 	    c_id = argv.pop(0)
1112 	    elementFound = False
1113 	    dom = None
1114 	    use_cibadmin = False
1115 	    if not constraintsElement:
1116 	        (dom, constraintsElement) = getCurrentConstraints(passed_dom)
1117 	        use_cibadmin = True
1118 	
1119 	    for co in constraintsElement.childNodes[:]:
1120 	        if co.nodeType != xml.dom.Node.ELEMENT_NODE:
1121 	            continue
1122 	        if co.getAttribute("id") == c_id:
1123 	            constraintsElement.removeChild(co)
1124 	            elementFound = True
1125 	
1126 	    if not elementFound:
1127 	        for rule in constraintsElement.getElementsByTagName("rule")[:]:
1128 	            if rule.getAttribute("id") == c_id:
1129 	                elementFound = True
1130 	                parent = rule.parentNode
1131 	                parent.removeChild(rule)
1132 	                if not parent.getElementsByTagName("rule"):
1133 	                    parent.parentNode.removeChild(parent)
1134 	
1135 	    if elementFound:
1136 	        if passed_dom:
1137 	            return dom
1138 	        if use_cibadmin:
1139 	            utils.replace_cib_configuration(dom)
1140 	        if returnStatus:
1141 	            return True
1142 	    else:
1143 	        utils.err("Unable to find constraint - '%s'" % c_id, False)
1144 	        if returnStatus:
1145 	            return False
1146 	        sys.exit(1)
1147 	    return None
1148 	
1149 	
1150 	def _split_set_constraints(
1151 	    constraints_dto: CibConstraintsDto,
1152 	) -> tuple[CibConstraintsDto, CibConstraintsDto]:
1153 	    return (
1154 	        CibConstraintsDto(
1155 	            location=constraints_dto.location,
1156 	            colocation=constraints_dto.colocation,
1157 	            order=constraints_dto.order,
1158 	            ticket=constraints_dto.ticket,
1159 	        ),
1160 	        CibConstraintsDto(
1161 	            location_set=constraints_dto.location_set,
1162 	            colocation_set=constraints_dto.colocation_set,
1163 	            order_set=constraints_dto.order_set,
1164 	            ticket_set=constraints_dto.ticket_set,
1165 	        ),
1166 	    )
1167 	
1168 	
1169 	def _find_constraints_containing_resource(
1170 	    resources_dto: CibResourcesDto,
1171 	    constraints_dto: CibConstraintsDto,
1172 	    resource_id: str,
1173 	) -> CibConstraintsDto:
1174 	    resources_filter = [resource_id]
1175 	    # Original implementation only included parent resource only if resource_id
1176 	    # was referring to a primitive resource, ignoring groups. This may change in
1177 	    # the future if necessary.
1178 	    if any(
1179 	        primitive_dto.id == resource_id
1180 	        for primitive_dto in resources_dto.primitives
1181 	    ):
1182 	        for clone_dto in resources_dto.clones:
1183 	            if clone_dto.member_id == resource_id:
1184 	                resources_filter.append(clone_dto.id)
1185 	                break
1186 	    return _filter_constraints_by_resources(
1187 	        constraints_dto, resources_filter, []
1188 	    )
1189 	
1190 	
1191 	def ref(
1192 	    lib: Any, argv: list[str], modifiers: parse_args.InputModifiers
1193 	) -> None:
1194 	    modifiers.ensure_only_supported("-f")
1195 	    if not argv:
1196 	        raise CmdLineInputError()
1197 	
1198 	    resources_dto = cast(
1199 	        CibResourcesDto, lib.resource.get_configured_resources()
1200 	    )
1201 	
1202 	    constraints_dto = cast(
1203 	        CibConstraintsDto,
1204 	        lib.constraint.get_config(evaluate_rules=False),
1205 	    )
1206 	
1207 	    for resource_id in sorted(set(argv)):
1208 	        constraint_ids = get_all_constraints_ids(
1209 	            _find_constraints_containing_resource(
1210 	                resources_dto, constraints_dto, resource_id
1211 	            )
1212 	        )
1213 	        print(f"Resource: {resource_id}")
1214 	        if constraint_ids:
1215 	            print(
1216 	                "\n".join(
1217 	                    indent(
1218 	                        sorted(constraint_ids),
1219 	                        indent_step=INDENT_STEP,
1220 	                    )
1221 	                )
1222 	            )
1223 	        else:
1224 	            print("  No Matches")
1225 	
1226 	
1227 	def remove_constraints_containing(
1228 	    resource_id: str, output=False, constraints_element=None, passed_dom=None
1229 	):
1230 	    """
1231 	    Commandline options:
1232 	      * -f - CIB file, effective only if passed_dom is None
1233 	    """
1234 	    lib = utils.get_library_wrapper()
1235 	    modifiers = utils.get_input_modifiers()
1236 	    resources_dto = cast(
1237 	        CibResourcesDto, lib.resource.get_configured_resources()
1238 	    )
1239 	
1240 	    constraints_dto, set_constraints_dto = _split_set_constraints(
1241 	        cast(
1242 	            CibConstraintsDto,
1243 	            lib.constraint.get_config(evaluate_rules=False),
1244 	        )
1245 	    )
1246 	    constraints = sorted(
1247 	        get_all_constraints_ids(
1248 	            _find_constraints_containing_resource(
1249 	                resources_dto, constraints_dto, resource_id
1250 	            )
1251 	        )
1252 	    )
1253 	    set_constraints = sorted(
1254 	        get_all_constraints_ids(
1255 	            _find_constraints_containing_resource(
1256 	                resources_dto, set_constraints_dto, resource_id
1257 	            )
1258 	        )
1259 	    )
1260 	    for c in constraints:
1261 	        if output:
1262 	            print_to_stderr(f"Removing Constraint - {c}")
1263 	        if constraints_element is not None:
1264 	            constraint_rm(
1265 	                lib,
1266 	                [c],
1267 	                modifiers,
1268 	                True,
1269 	                constraints_element,
1270 	                passed_dom=passed_dom,
1271 	            )
1272 	        else:
1273 	            constraint_rm(lib, [c], modifiers, passed_dom=passed_dom)
1274 	
1275 	    if set_constraints:
1276 	        (dom, constraintsElement) = getCurrentConstraints(passed_dom)
1277 	        for set_c in constraintsElement.getElementsByTagName("resource_ref")[:]:
1278 	            # If resource id is in a set, remove it from the set, if the set
1279 	            # is empty, then we remove the set, if the parent of the set
1280 	            # is empty then we remove it
1281 	            if set_c.getAttribute("id") == resource_id:
1282 	                parent_node = set_c.parentNode
1283 	                parent_node.removeChild(set_c)
1284 	                if output:
1285 	                    print_to_stderr(
1286 	                        "Removing {} from set {}".format(
1287 	                            resource_id, parent_node.getAttribute("id")
1288 	                        )
1289 	                    )
1290 	                if parent_node.getElementsByTagName("resource_ref").length == 0:
1291 	                    print_to_stderr(
1292 	                        "Removing set {}".format(parent_node.getAttribute("id"))
1293 	                    )
1294 	                    parent_node_2 = parent_node.parentNode
1295 	                    parent_node_2.removeChild(parent_node)
1296 	                    if (
1297 	                        parent_node_2.getElementsByTagName(
1298 	                            "resource_set"
1299 	                        ).length
1300 	                        == 0
1301 	                    ):
1302 	                        parent_node_2.parentNode.removeChild(parent_node_2)
1303 	                        print_to_stderr(
1304 	                            "Removing constraint {}".format(
1305 	                                parent_node_2.getAttribute("id")
1306 	                            )
1307 	                        )
1308 	        if passed_dom:
1309 	            return dom
1310 	        utils.replace_cib_configuration(dom)
1311 	    return None
1312 	
1313 	
1314 	# Re-assign any constraints referencing a resource to its parent (a clone
1315 	# or master)
1316 	def constraint_resource_update(old_id, dom):
1317 	    """
1318 	    Commandline options: no options
1319 	    """
1320 	    new_id = None
1321 	    clone_ms_parent = utils.dom_get_resource_clone_ms_parent(dom, old_id)
1322 	    if clone_ms_parent:
1323 	        new_id = clone_ms_parent.getAttribute("id")
1324 	
1325 	    if new_id:
1326 	        constraints = dom.getElementsByTagName("rsc_location")
1327 	        constraints += dom.getElementsByTagName("rsc_order")
1328 	        constraints += dom.getElementsByTagName("rsc_colocation")
1329 	        attrs_to_update = ["rsc", "first", "then", "with-rsc"]
1330 	        for constraint in constraints:
1331 	            for attr in attrs_to_update:
1332 	                if constraint.getAttribute(attr) == old_id:
1333 	                    constraint.setAttribute(attr, new_id)
1334 	    return dom
1335