1    	# pylint: disable=too-many-lines
2    	import base64
3    	import getpass
4    	import json
5    	import logging
6    	import os
7    	import re
8    	import signal
9    	import subprocess
10   	import sys
11   	import tarfile
12   	import tempfile
13   	import threading
14   	import time
15   	import xml.dom.minidom
16   	from functools import lru_cache
17   	from io import BytesIO
18   	from textwrap import dedent
19   	from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, cast
20   	from urllib.parse import urlencode
21   	from xml.dom.minidom import Document as DomDocument
22   	from xml.dom.minidom import parseString
23   	
24   	import pcs.cli.booth.env
25   	import pcs.lib.corosync.config_parser as corosync_conf_parser
26   	from pcs import settings, usage
27   	from pcs.cli.cluster_property.output import PropertyConfigurationFacade
28   	from pcs.cli.common import middleware
29   	from pcs.cli.common.env_cli import Env
30   	from pcs.cli.common.errors import CmdLineInputError
31   	from pcs.cli.common.lib_wrapper import Library
32   	from pcs.cli.common.parse_args import InputModifiers
33   	from pcs.cli.common.tools import print_to_stderr, timeout_to_seconds_legacy
34   	from pcs.cli.file import metadata as cli_file_metadata
35   	from pcs.cli.reports import ReportProcessorToConsole, process_library_reports
36   	from pcs.cli.reports import output as reports_output
37   	from pcs.common import const, file_type_codes
38   	from pcs.common import file as pcs_file
39   	from pcs.common import pacemaker as common_pacemaker
40   	from pcs.common import pcs_pycurl as pycurl
41   	from pcs.common.host import PcsKnownHost
42   	from pcs.common.pacemaker.resource.operations import (
43   	    OCF_CHECK_LEVEL_INSTANCE_ATTRIBUTE_NAME,
44   	)
45   	from pcs.common.reports import ReportProcessor
46   	from pcs.common.reports.messages import CibUpgradeFailedToMinimalRequiredVersion
47   	from pcs.common.services.errors import ManageServiceError
48   	from pcs.common.services.interfaces import ServiceManagerInterface
49   	from pcs.common.str_tools import format_list
50   	from pcs.common.tools import Version, timeout_to_seconds
51   	from pcs.common.types import StringSequence
52   	from pcs.lib.corosync.config_facade import ConfigFacade as corosync_conf_facade
53   	from pcs.lib.env import LibraryEnvironment
54   	from pcs.lib.errors import LibraryError
55   	from pcs.lib.external import CommandRunner, is_proxy_set
56   	from pcs.lib.file.instance import FileInstance as LibFileInstance
57   	from pcs.lib.host.config.facade import Facade as KnownHostsFacade
58   	from pcs.lib.interface.config import ParserErrorException
59   	from pcs.lib.pacemaker.live import get_cluster_status_dom
60   	from pcs.lib.pacemaker.state import ClusterState
61   	from pcs.lib.pacemaker.values import is_score as is_score_value
62   	from pcs.lib.pacemaker.values import validate_id
63   	from pcs.lib.services import get_service_manager as _get_service_manager
64   	from pcs.lib.services import service_exception_to_report
65   	
66   	if TYPE_CHECKING:
67   	    from pcs.common.reports.item import ReportItemList
68   	
69   	# pylint: disable=invalid-name
70   	# pylint: disable=too-many-branches
71   	
# usefile & filename variables are set in pcs module.
# When usefile is truthy, run() and cmd_runner() export filename to the
# commands they start as the CIB_file environment variable.
usefile = False
filename = ""
# Presumably the parsed command line options (verify in the pcs module); this
# part of the file looks up "--debug" and "--request-timeout" in it.
# Note: not properly typed
pcs_options: Dict[Any, Any] = {}
77   	
78   	
def _getValidateWithVersion(dom) -> Version:
    """
    Return the schema version stored in the "validate-with" attribute of the
    cib element found in dom.

    Reports "Bad cib" through err() unless dom holds exactly one cib element.
    Raises AssertionError when the attribute does not look like
    "pacemaker-<major>.<minor>[.<rev>]"; a missing revision counts as 0.

    Commandline options: no options
    """
    cib_elements = dom.getElementsByTagName("cib")
    if len(cib_elements) != 1:
        err("Bad cib")

    schema_name = cib_elements[0].getAttribute("validate-with")
    parsed = re.match(r"pacemaker-(\d+)\.(\d+)\.?(\d+)?", schema_name)
    if parsed is None:
        raise AssertionError()
    major, minor, revision = parsed.groups()
    return Version(int(major), int(minor), int(revision or 0))
98   	
99   	
def isCibVersionSatisfied(cib_dom, required_version: Version) -> bool:
    """
    Tell whether the CIB schema version is at least required_version.

    cib_dom may be a minidom Document or any node of one; for a node its
    ownerDocument is inspected.
    """
    document = (
        cib_dom if isinstance(cib_dom, DomDocument) else cib_dom.ownerDocument
    )
    current_version = _getValidateWithVersion(document)
    return current_version >= required_version
104  	
105  	
# Compare the schema version of the current CIB with the required one and run
# a CIB upgrade when the CIB is older.
# Returns True if the upgrade was run, False if the CIB was already new enough
def _checkAndUpgradeCIB(required_version: Version) -> bool:
    """
    Commandline options:
      * -f - CIB file
    """
    upgrade_needed = not isCibVersionSatisfied(get_cib_dom(), required_version)
    if upgrade_needed:
        cluster_upgrade()
    return upgrade_needed
117  	
118  	
def cluster_upgrade():
    """
    Upgrade the CIB schema by running "cibadmin --upgrade --force".

    A non-zero exit code is reported through err(). A confirmation is printed
    to stderr unless cibadmin says the schema is already the latest one.

    Commandline options:
      * -f - CIB file
    """
    already_latest_msg = (
        "Upgrade unnecessary: Schema is already the latest available"
    )
    output, retval = run(["cibadmin", "--upgrade", "--force"])
    if retval != 0:
        err("unable to upgrade cluster: %s" % output)
    if output.strip() != already_latest_msg:
        print_to_stderr("Cluster CIB has been upgraded to latest version")
133  	
134  	
def cluster_upgrade_to_version(required_version: Version) -> Any:
    """
    Upgrade the CIB if needed and return the freshly loaded CIB dom.

    Reports an error through err() when the schema version is still lower
    than required_version after the upgrade attempt.

    Commandline options:
      * -f - CIB file
    """
    _checkAndUpgradeCIB(required_version)
    upgraded_dom = get_cib_dom()
    actual_version = _getValidateWithVersion(upgraded_dom)
    if actual_version < required_version:
        failure_report = CibUpgradeFailedToMinimalRequiredVersion(
            str(actual_version),
            str(required_version),
        )
        err(failure_report.message)
    return upgraded_dom
151  	
152  	
# Request remote/status (version 2) from the node, printing nothing
def checkStatus(node):
    """
    Commandline options:
      * --request-timeout - timeout for HTTP requests
    """
    query = urlencode({"version": "2"})
    return sendHTTPRequest(
        node,
        "remote/status",
        data=query,
        printResult=False,
        printSuccess=False,
    )
162  	
163  	
# Request remote/check_auth from the node to see if we're authorized (faster
# than a status check); nothing is printed
def checkAuthorization(node):
    """
    Commandline options:
      * --request-timeout - timeout for HTTP requests
    """
    return sendHTTPRequest(
        node, "remote/check_auth", printResult=False, printSuccess=False
    )
171  	
172  	
def get_uid_gid_file_name(uid, gid):
    """
    Build the base name of the uidgid file for the given uid and gid.

    Commandline options: no options
    """
    name_template = "pcs-uidgid-%s-%s"
    return name_template % (uid, gid)
178  	
179  	
# Parses a uidgid file and returns a dict with the values found in it, e.g.
# {'uid':'theuid', 'gid':'thegid'}; a key is missing when the file lacks it
def read_uid_gid_file(uidgid_filename):
    """
    Commandline options: no options
    """
    file_path = os.path.join(settings.corosync_uidgid_dir, uidgid_filename)
    with open(file_path, "r") as uidgid_file:
        raw_lines = uidgid_file.read().split("\n")

    key_patterns = (
        ("uid", r"uid:\s*(\S+)"),
        ("gid", r"gid:\s*(\S+)"),
    )
    parsed = {}
    section_found = False
    for raw_line in raw_lines:
        # drop comments
        content = re.sub(r"#.*", "", raw_line)
        if not section_found:
            # everything before the line opening the uidgid section is ignored
            if not re.search(r"uidgid.*{", content):
                continue
            section_found = True
        for key, pattern in key_patterns:
            found = re.search(pattern, content)
            if found:
                parsed[key] = found.group(1)

    return parsed
207  	
208  	
def get_uidgid_file_content(
    uid: Optional[str] = None, gid: Optional[str] = None
) -> Optional[str]:
    """
    Render the text of a uidgid section holding the given uid and / or gid.

    Returns None when neither uid nor gid is set.
    """
    entries = [
        f"  {key}: {value}"
        for key, value in (("uid", uid), ("gid", gid))
        if value
    ]
    if not entries:
        return None
    template = dedent(
        """\
        uidgid {{
        {uid_gid_keys}
        }}
        """
    )
    return template.format(uid_gid_keys="\n".join(entries))
227  	
def write_uid_gid_file(uid, gid):
    """
    Create a new uidgid file for the given uid and gid.

    Reports an error through err() when a file with the same uid and gid
    already exists. When the preferred file name is taken, "-<number>" is
    appended until an unused name is found. Nothing is written when both uid
    and gid are empty.

    Commandline options: no options
    """
    if find_uid_gid_files(uid, gid):
        err("uidgid file with uid=%s and gid=%s already exists" % (uid, gid))

    base_name = get_uid_gid_file_name(uid, gid)
    target_path = os.path.join(settings.corosync_uidgid_dir, base_name)
    suffix = 0
    while os.path.exists(target_path):
        suffix += 1
        target_path = os.path.join(
            settings.corosync_uidgid_dir, base_name + "-" + str(suffix)
        )

    content = get_uidgid_file_content(uid, gid)
    if content:
        with open(target_path, "w") as uidgid_file:
            uidgid_file.write(content)
250  	
251  	
def find_uid_gid_files(uid, gid):
    """
    List names of uidgid files which define exactly the given uid and gid.

    An empty string means the file must not define that key at all. With both
    uid and gid empty nothing is searched and an empty list is returned.

    Commandline options: no options
    """
    if uid == "" and gid == "":
        return []

    def _matches(file_values, key, wanted):
        # the key must be defined in the file if and only if it is wanted...
        if (key in file_values) != (wanted != ""):
            return False
        # ...and when it is defined, its value must be the wanted one
        return key not in file_values or wanted == file_values[key]

    matching_files = []
    for candidate in os.listdir(settings.corosync_uidgid_dir):
        file_values = read_uid_gid_file(candidate)
        if all(
            _matches(file_values, key, wanted)
            for key, wanted in (("uid", uid), ("gid", gid))
        ):
            matching_files.append(candidate)

    return matching_files
279  	
280  	
# Deletes every uidgid file matching the specified uid/gid, returns False if
# there was no such file (or if both uid and gid are empty)
def remove_uid_gid_file(uid, gid):
    """
    Commandline options: no options
    """
    if uid == "" and gid == "":
        return False

    doomed_files = find_uid_gid_files(uid, gid)
    for doomed_file in doomed_files:
        os.remove(os.path.join(settings.corosync_uidgid_dir, doomed_file))

    return bool(doomed_files)
296  	
297  	
@lru_cache()
def read_known_hosts_file() -> dict[str, PcsKnownHost]:
    """
    Return known hosts; the file is read on the first call and the result is
    served from the cache afterwards.

    Commandline options: no options
    """
    known_hosts = read_known_hosts_file_not_cached()
    return known_hosts
301  	
302  	
def read_known_hosts_file_not_cached() -> dict[str, PcsKnownHost]:
    """
    Read known hosts from disk and return them keyed by host name.

    A process whose uid is 0 goes through the pcs.lib known-hosts file
    instance, any other process reads and parses the cli known-hosts json
    file. Read and parse problems are reported and an empty dict is returned.

    Commandline options: no options
    """
    try:
        if os.getuid() == 0:
            # TODO remove
            # This is here to provide known-hosts to functions not yet
            # overhauled to pcs.lib. Cli should never read known hosts from
            # /var/lib/pcsd/.
            lib_file = LibFileInstance.for_known_hosts()
            lib_facade = cast(KnownHostsFacade, lib_file.read_to_facade())
            return lib_facade.known_hosts

        raw_file = pcs_file.RawFile(
            cli_file_metadata.for_file_type(file_type_codes.PCS_KNOWN_HOSTS)
        )
        # json.loads handles bytes, it expects utf-8, 16 or 32 encoding
        parsed_json = json.loads(raw_file.read())
        # TODO use known hosts facade for getting info from json struct once
        # the facade exists
        hosts = {}
        for name, host in parsed_json["known_hosts"].items():
            hosts[name] = PcsKnownHost.from_known_host_file_dict(name, host)
        return hosts

    except LibraryError as e:
        # TODO remove together with the uid 0 branch above
        process_library_reports(list(e.args))
    except ParserErrorException as e:
        # TODO remove together with the uid 0 branch above
        process_library_reports(lib_file.parser_exception_to_report_list(e))
    except pcs_file.RawFileError as e:
        reports_output.warn("Unable to read the known-hosts file: " + e.reason)
    except json.JSONDecodeError as e:
        reports_output.warn(f"Unable to parse the known-hosts file: {e}")
    except (TypeError, KeyError):
        reports_output.warn("Warning: Unable to parse the known-hosts file.")
    return {}
351  	
352  	
def repeat_if_timeout(send_http_request_function, repeat_count=15):
    """
    Wrap a request function so that it is called again after a timeout.

    The wrapped function must return a (retval, output) pair. The call is
    repeated, at most repeat_count more times, as long as retval is 2 and
    output contains "Operation timed out". The last pair is returned.

    Commandline options: no options
    NOTE: callback send_http_request_function may use --request-timeout
    """
    timeout_marker = "Operation timed out"

    def repeater(node, *args, **kwargs):
        retries_remaining = repeat_count
        while True:
            retval, output = send_http_request_function(node, *args, **kwargs)
            timed_out = retval == 2 and timeout_marker in output
            if not timed_out or retries_remaining < 1:
                # no timeout happened OR there are no retries left
                return retval, output
            retries_remaining -= 1
            if "--debug" in pcs_options:
                print_to_stderr(f"{node}: {output}, trying again...")

    return repeater
375  	
376  	
# Get the corosync.conf file from the specified node
def getCorosyncConfig(node):
    """
    Commandline options:
      * --request-timeout - timeout for HTTP requests
    """
    return sendHTTPRequest(
        node,
        "remote/get_corosync_conf",
        data=None,
        printResult=False,
        printSuccess=False,
    )
384  	
385  	
def setCorosyncConfig(node, config):
    """
    Send config to the node as its new corosync.conf; a failed request is
    reported through err().

    Commandline options:
      * --request-timeout - timeout for HTTP requests
    """
    payload = urlencode({"corosync_conf": config})
    status, response = sendHTTPRequest(
        node, "remote/set_corosync_conf", payload
    )
    if status != 0:
        err("Unable to set corosync config: {0}".format(response))
395  	
396  	
def getPacemakerNodeStatus(node):
    """
    Request remote/pacemaker_node_status from the node, printing nothing.

    Commandline options:
      * --request-timeout - timeout for HTTP requests
    """
    return sendHTTPRequest(
        node,
        "remote/pacemaker_node_status",
        data=None,
        printResult=False,
        printSuccess=False,
    )
405  	
406  	
def startCluster(node, quiet=False, timeout=None):
    """
    Request remote/cluster_start on the node; the response is printed unless
    quiet is set.

    Commandline options:
      * --request-timeout - timeout for HTTP requests
    """
    request_options = {
        "printResult": False,
        "printSuccess": not quiet,
        "timeout": timeout,
    }
    return sendHTTPRequest(node, "remote/cluster_start", **request_options)
419  	
420  	
def stopPacemaker(node, quiet=False, force=True):
    """
    Stop only the pacemaker component on the node, see stopCluster.

    Commandline options:
      * --request-timeout - timeout for HTTP requests
    """
    return stopCluster(
        node, quiet=quiet, force=force, pacemaker=True, corosync=False
    )
429  	
430  	
def stopCorosync(node, quiet=False, force=True):
    """
    Stop only the corosync component on the node, see stopCluster.

    Commandline options:
      * --request-timeout - timeout for HTTP requests
    """
    return stopCluster(
        node, quiet=quiet, force=force, pacemaker=False, corosync=True
    )
439  	
440  	
def stopCluster(node, quiet=False, pacemaker=True, corosync=True, force=True):
    """
    Request remote/cluster_stop on the node.

    When exactly one of pacemaker / corosync is selected, only that component
    is named in the request; a pacemaker-only stop also gets a 2 minute
    timeout. The response is printed unless quiet is set.

    Commandline options:
      * --request-timeout - timeout for HTTP requests
    """
    only_pacemaker = pacemaker and not corosync
    only_corosync = corosync and not pacemaker

    params = {}
    if only_pacemaker:
        params["component"] = "pacemaker"
    elif only_corosync:
        params["component"] = "corosync"
    if force:
        params["force"] = 1

    return sendHTTPRequest(
        node,
        "remote/cluster_stop",
        urlencode(params),
        printResult=False,
        printSuccess=not quiet,
        timeout=(2 * 60 if only_pacemaker else None),
    )
464  	
465  	
def enableCluster(node):
    """
    Request remote/cluster_enable on the node and print the response.

    Commandline options:
      * --request-timeout - timeout for HTTP requests
    """
    return sendHTTPRequest(
        node, "remote/cluster_enable", printResult=False, printSuccess=True
    )
472  	
473  	
def disableCluster(node):
    """
    Request remote/cluster_disable on the node and print the response.

    Commandline options:
      * --request-timeout - timeout for HTTP requests
    """
    return sendHTTPRequest(
        node, "remote/cluster_disable", printResult=False, printSuccess=True
    )
480  	
481  	
def destroyCluster(node, quiet=False):
    """
    Request remote/cluster_destroy on the node; output is printed unless quiet
    is set.

    Commandline options:
      * --request-timeout - timeout for HTTP requests
    """
    verbose = not quiet
    return sendHTTPRequest(
        node,
        "remote/cluster_destroy",
        printResult=verbose,
        printSuccess=verbose,
    )
490  	
491  	
def restoreConfig(node, tarball_data):
    """
    Send tarball_data to remote/config_restore on the node and print the
    response.

    Commandline options:
      * --request-timeout - timeout for HTTP requests
    """
    payload = urlencode({"tarball": tarball_data})
    return sendHTTPRequest(
        node,
        "remote/config_restore",
        data=payload,
        printResult=False,
        printSuccess=True,
    )
499  	
500  	
def pauseConfigSyncing(node, delay_seconds=300):
    """
    Send the sync_thread_pause option with the value delay_seconds to
    remote/set_sync_options on the node, printing nothing.

    Commandline options:
      * --request-timeout - timeout for HTTP requests
    """
    payload = urlencode({"sync_thread_pause": delay_seconds})
    return sendHTTPRequest(
        node,
        "remote/set_sync_options",
        data=payload,
        printResult=False,
        printSuccess=False,
    )
508  	
509  	
# Send an HTTPS request to a node and return a tuple (status, data).
# If status is 0 then data contains the server response,
# otherwise data contains an error message.
# Status values:
# 0 = Success,
# 1 = HTTP Error
# 2 = No response,
# 3 = Auth Error
# 4 = Permission denied
def sendHTTPRequest(  # noqa: PLR0912, PLR0915
    host, request, data=None, printResult=True, printSuccess=True, timeout=None
):
    """
    Send a request to pcsd on the given host and classify the outcome.

    host -- name of the node; if it is present in the known-hosts file, the
        address, port and token stored there are used, otherwise host itself
        is the address and settings.pcsd_default_port is the port
    request -- path appended to "https://<address>:<port>/"
    data -- already url-encoded request data (str); handed to curl only when
        it is truthy
    printResult -- print error messages to stderr
    printSuccess -- together with printResult decides whether the response
        text is printed to stderr: it is printed when either flag is set,
        whatever the response code is
    timeout -- timeout in seconds; a falsy value means
        settings.default_request_timeout. A --request-timeout present in
        pcs_options overrides both.

    Returns a tuple (status, text), see the comment above the function for
    the meaning of status.

    Commandline options:
      * --request-timeout - timeout for HTTP requests
      * --debug
    """
    # pylint: disable=too-many-locals
    # pylint: disable=too-many-statements
    port = None
    addr = host
    token = None
    known_host = read_known_hosts_file().get(host, None)
    # TODO: do not allow communication with unknown host
    if known_host:
        port = known_host.dest.port
        addr = known_host.dest.addr
        token = known_host.token
    if port is None:
        port = settings.pcsd_default_port
    url = "https://{host}:{port}/{request}".format(
        # an address containing ":" is put into brackets (IPv6 literal)
        host="[{0}]".format(addr) if ":" in addr else addr,
        request=request,
        port=port,
    )
    if "--debug" in pcs_options:
        print_to_stderr(f"Sending HTTP Request to: {url}\nData: {data}")

    # Collects curl debug records into debug_output, each one prefixed by a
    # marker of its type; records of other types are dropped.
    def __debug_callback(data_type, debug_data):
        prefixes = {
            # pylint: disable=no-member
            pycurl.DEBUG_TEXT: b"* ",
            pycurl.DEBUG_HEADER_IN: b"< ",
            pycurl.DEBUG_HEADER_OUT: b"> ",
            pycurl.DEBUG_DATA_IN: b"<< ",
            pycurl.DEBUG_DATA_OUT: b">> ",
        }
        if data_type in prefixes:
            debug_output.write(prefixes[data_type])
            debug_output.write(debug_data)
            if not debug_data.endswith(b"\n"):
                debug_output.write(b"\n")

    # output receives the response body, debug_output the curl debug records
    output = BytesIO()
    debug_output = BytesIO()
    cookies = __get_cookie_list(token)
    if not timeout:
        timeout = settings.default_request_timeout
    # the command line option wins even over an explicitly passed timeout
    timeout = pcs_options.get("--request-timeout", timeout)

    handler = pycurl.Curl()
    handler.setopt(pycurl.PROTOCOLS, pycurl.PROTO_HTTPS)
    handler.setopt(pycurl.URL, url.encode("utf-8"))
    handler.setopt(pycurl.WRITEFUNCTION, output.write)
    # VERBOSE together with DEBUGFUNCTION feeds __debug_callback
    handler.setopt(pycurl.VERBOSE, 1)
    handler.setopt(pycurl.NOSIGNAL, 1)  # required for multi-threading
    handler.setopt(pycurl.DEBUGFUNCTION, __debug_callback)
    handler.setopt(pycurl.TIMEOUT_MS, int(timeout * 1000))
    # NOTE(review): both SSL verification options are set to 0
    handler.setopt(pycurl.SSL_VERIFYHOST, 0)
    handler.setopt(pycurl.SSL_VERIFYPEER, 0)
    handler.setopt(pycurl.HTTPHEADER, ["Expect: "])
    if cookies:
        handler.setopt(pycurl.COOKIE, ";".join(cookies).encode("utf-8"))
    if data:
        handler.setopt(pycurl.COPYPOSTFIELDS, data.encode("utf-8"))
    try:
        handler.perform()
        response_data = output.getvalue().decode("utf-8")
        response_code = handler.getinfo(pycurl.RESPONSE_CODE)
        # printed before the response code is evaluated, i.e. for error
        # responses as well
        if printResult or printSuccess:
            print_to_stderr(host + ": " + response_data.strip())
        if "--debug" in pcs_options:
            print_to_stderr(
                "Response Code: {response_code}\n"
                "--Debug Response Start--\n"
                "{response_data}\n"
                "--Debug Response End--\n"
                "Communication debug info for calling: {url}\n"
                "--Debug Communication Output Start--\n"
                "{debug_comm_output}\n"
                "--Debug Communication Output End--".format(
                    response_code=response_code,
                    response_data=response_data,
                    url=url,
                    debug_comm_output=debug_output.getvalue().decode(
                        "utf-8", "ignore"
                    ),
                )
            )

        # from here on, output no longer is the response buffer but the
        # (status, text) tuple to be returned
        if response_code == 401:
            output = (
                3,
                (
                    "Unable to authenticate to {node} - (HTTP error: {code}), "
                    "try running 'pcs host auth {node}'"
                ).format(node=host, code=response_code),
            )
        elif response_code == 403:
            output = (
                4,
                "{node}: Permission denied - (HTTP error: {code})".format(
                    node=host, code=response_code
                ),
            )
        elif response_code >= 400:
            output = (
                1,
                "Error connecting to {node} - (HTTP error: {code})".format(
                    node=host, code=response_code
                ),
            )
        else:
            output = (0, response_data)

        if printResult and output[0] != 0:
            print_to_stderr(output[1])

        return output
    except pycurl.error as e:
        # the request did not complete, this is reported as status 2
        if is_proxy_set(os.environ):
            reports_output.warn(
                "Proxy is set in environment variables, try disabling it"
            )
        # pylint: disable=unbalanced-tuple-unpacking
        dummy_errno, reason = e.args
        if "--debug" in pcs_options:
            print_to_stderr(f"Response Reason: {reason}")
        msg = (
            "Unable to connect to {host}, check if pcsd is running there or try "
            "setting higher timeout with --request-timeout option ({reason})"
        ).format(host=host, reason=reason)
        if printResult:
            print_to_stderr(msg)
        return (2, msg)
655  	
656  	
def __get_cookie_list(token):
    """
    Build the list of "name=value" cookies to be sent with a request.

    The token cookie is present when token is set. When the effective uid is
    0, non-blank CIB_user and CIB_user_groups environment variables are added
    as well.

    Commandline options: no options
    """
    cookies = ["token=" + token] if token else []
    if os.geteuid() != 0:
        return cookies

    for name in ("CIB_user", "CIB_user_groups"):
        value = os.environ.get(name, "").strip()
        if not value:
            continue
        # Let's be safe about characters in env variables and do base64.
        # We cannot do it for CIB_user however to be backward compatible
        # so we at least remove disallowed characters.
        if name == "CIB_user":
            value = re.sub(r"[^!-~]", "", value).replace(";", "")
        else:
            # b64encode works with bytes and returns bytes
            value = base64.b64encode(value.encode("utf8")).decode("utf-8")
        cookies.append("{0}={1}".format(name, value))
    return cookies
680  	
681  	
def get_corosync_conf_facade(conf_text=None):
    """
    Parse corosync.conf text and return a config facade built from it.

    When conf_text is None, the text is obtained from getCorosyncConf(). A
    parser error is reported through err().

    Commandline options:
      * --corosync_conf - path to a mocked corosync.conf is set directly to
        settings
    """
    try:
        if conf_text is None:
            conf_text = getCorosyncConf()
        parsed_conf = corosync_conf_parser.Parser.parse(
            conf_text.encode("utf-8")
        )
        return corosync_conf_facade(parsed_conf)
    except corosync_conf_parser.CorosyncConfParserException as e:
        return err("Unable to parse corosync.conf: %s" % e)
698  	
699  	
def getNodeAttributesFromPacemaker():
    """
    Return a list with the attrs of every node in the node section of the
    current cluster status. A LibraryError is handed over to
    process_library_reports.

    Commandline options: no options
    """
    try:
        status_dom = get_cluster_status_dom(cmd_runner())
        cluster_nodes = ClusterState(status_dom).node_section.nodes
        return [cluster_node.attrs for cluster_node in cluster_nodes]
    except LibraryError as e:
        return process_library_reports(e.args)
713  	
714  	
def hasCorosyncConf():
    """
    Tell whether settings.corosync_conf_file is an existing regular file.

    Commandline options:
      * --corosync_conf - path to a mocked corosync.conf is set directly to
        settings
    """
    conf_path = settings.corosync_conf_file
    return os.path.isfile(conf_path)
722  	
723  	
def getCorosyncConf():
    """
    Return the text of settings.corosync_conf_file decoded as utf-8.

    A read failure is reported through err(); None is returned should err()
    return.

    Commandline options:
      * --corosync_conf - path to a mocked corosync.conf is set directly to
        settings
    """
    try:
        with open(
            settings.corosync_conf_file, "r", encoding="utf-8"
        ) as conf_file:
            return conf_file.read()
    except IOError as e:
        err("Unable to read %s: %s" % (settings.corosync_conf_file, e.strerror))
    return None
739  	
740  	
def reloadCorosync():
    """
    Run "corosync-cfgtool -R" and return its (output, retval).

    Commandline options: no options
    """
    reload_command = ["corosync-cfgtool", "-R"]
    cmd_output, exit_code = run(reload_command)
    return cmd_output, exit_code
747  	
748  	
def getCorosyncActiveNodes():
    """
    Return names of nodes whose status in corosync-cmapctl output is "joined".

    An empty list is returned when corosync-cmapctl fails. Node names are
    linked to member statuses through the nodeid found at the same nodelist
    index; a name with no nodeid is reported on stderr and skipped.

    Commandline options: no options
    """
    output, retval = run(["corosync-cmapctl"])
    if retval != 0:
        return []

    # (nodelist index, node name) pairs
    indexed_names = re.findall(
        r"^nodelist\.node\.(\d+)\.name .*= (.*)", output, re.M
    )
    # nodelist index -> nodeid
    index_to_id = dict(
        re.findall(r"nodelist\.node\.(\d+)\.nodeid .*= (\d+)", output, re.M)
    )
    # nodeid -> member status
    # NOTE(review): the dot in front of "status" is not escaped in the pattern
    id_to_status = dict(
        re.findall(r"^runtime\.members\.(\d+).status .*= (.*)", output, re.M)
    )

    name_to_status = {}
    for index, node_name in indexed_names:
        if index not in index_to_id:
            print_to_stderr(f"Error mapping {node_name}")
            continue
        nodeid = index_to_id[index]
        if nodeid in id_to_status:
            name_to_status[node_name] = id_to_status[nodeid]

    return [
        node_name
        for node_name, status in name_to_status.items()
        if status == "joined"
    ]
785  	
786  	
# Tells whether the corosync-qdevice service has to be handled when managing
# cluster services, i.e. whether corosync.conf defines a quorum device model
def need_to_handle_qdevice_service():
    """
    Commandline options: no options
      * --corosync_conf - path to a mocked corosync.conf is set directly to
        settings but it doesn't make sense for contexts in which this function
        is used
    """
    try:
        with open(settings.corosync_conf_file, "rb") as conf_file:
            conf_facade = corosync_conf_facade(
                corosync_conf_parser.Parser.parse(conf_file.read())
            )
            qdevice_model = conf_facade.get_quorum_device_model()
            return qdevice_model is not None
    except (EnvironmentError, corosync_conf_parser.CorosyncConfParserException):
        # corosync.conf cannot be read or is not valid => no qdevice specified
        return False
806  	
807  	
def subprocess_setup():
    """
    Set the SIGPIPE handler back to the default one.

    Passed to subprocess.Popen as preexec_fn by run(), so it is meant to be
    executed before a subprocess is started.
    """
    default_handler = signal.SIG_DFL
    signal.signal(signal.SIGPIPE, default_handler)
811  	
812  	
def touch_cib_file(cib_filename):
    """
    Make sure cib_filename exists: if it is not an existing regular file, an
    empty CIB is written to it. A write failure is reported through err().
    """
    if os.path.isfile(cib_filename):
        return
    try:
        write_empty_cib(cib_filename)
    except EnvironmentError as e:
        message = "Unable to write to file: '{0}': '{1}'".format(
            cib_filename, str(e)
        )
        err(message)
823  	
824  	
# Run command, with environment and return (output, retval)
# DEPRECATED, please use lib.external.CommandRunner via utils.cmd_runner()
def run(
    args,
    ignore_stderr=False,
    string_for_stdin=None,
    env_extend=None,
    binary_output=False,
):
    """
    Run an external command and return a tuple (output, retval).

    args -- the command and its arguments; a bare pacemaker tool name (one
        starting with "crm", or cibadmin, iso8601, stonith_admin) is looked up
        in settings.pacemaker_execs, a name starting with "corosync" in
        settings.corosync_execs. The passed sequence is not modified.
    ignore_stderr -- if True, stderr is captured separately and thrown away,
        otherwise it is merged into the returned output
    string_for_stdin -- data for the command's stdin; with None the command
        gets /dev/null as its stdin
    env_extend -- extra environment variables; the passed dict is not
        modified. Variables already present in os.environ win over it.
    binary_output -- if True, output is returned as bytes, not str

    When the command cannot be started, the problem is reported through
    err().

    Commandline options:
      * -f - CIB file (effective only for some pacemaker tools)
      * --debug
    """
    # Work on copies so that neither the argument list nor the env_extend
    # dict of the caller gets changed by this function.
    args = list(args)
    env_var = dict(env_extend) if env_extend else {}
    # NOTE: os.environ is applied second, so it overrides env_extend
    env_var.update(dict(os.environ))
    env_var["LC_ALL"] = "C"
    if usefile:
        env_var["CIB_file"] = filename
        touch_cib_file(filename)

    command = args[0]
    if command[0:3] == "crm" or command in [
        "cibadmin",
        "iso8601",
        "stonith_admin",
    ]:
        args[0] = os.path.join(settings.pacemaker_execs, command)
    elif command[0:8] == "corosync":
        args[0] = os.path.join(settings.corosync_execs, command)

    try:
        if "--debug" in pcs_options:
            print_to_stderr("Running: " + " ".join(args))
            if string_for_stdin:
                print_to_stderr(
                    f"--Debug Input Start--\n"
                    f"{string_for_stdin}\n"
                    f"--Debug Input End--"
                )

        # Some commands react differently if you give them anything via stdin
        if string_for_stdin is not None:
            stdin_pipe = subprocess.PIPE
        else:
            stdin_pipe = subprocess.DEVNULL

        # pylint: disable=subprocess-popen-preexec-fn, consider-using-with
        p = subprocess.Popen(
            args,
            stdin=stdin_pipe,
            stdout=subprocess.PIPE,
            stderr=(subprocess.PIPE if ignore_stderr else subprocess.STDOUT),
            preexec_fn=subprocess_setup,  # noqa: PLW1509
            close_fds=True,
            env=env_var,
            # decodes newlines and in python3 also converts bytes to str
            universal_newlines=(not binary_output),
        )
        output, dummy_stderror = p.communicate(string_for_stdin)
        retval = p.returncode
        if "--debug" in pcs_options:
            print_to_stderr(
                "Return Value: {retval}\n"
                "--Debug Output Start--\n"
                "{debug_output}\n"
                "--Debug Output End--".format(
                    retval=retval,
                    debug_output=output.rstrip(),
                )
            )
    except OSError as e:
        print_to_stderr(e.strerror)
        err("unable to locate command: " + args[0])

    return output, retval
903  	
904  	
def cmd_runner(cib_file_override=None):
    """
    Create a CommandRunner with an environment prepared for running tools.

    cib_file_override -- if set, it is used as CIB_file instead of the file
        specified by -f

    Commandline options:
      * -f - CIB file
    """
    cib_env = {}
    if cib_file_override:
        cib_env["CIB_file"] = cib_file_override
    elif usefile:
        cib_env["CIB_file"] = filename
    # variables from os.environ override CIB_file set above, LC_ALL is
    # always forced to "C"
    runner_env = {**cib_env, **os.environ, "LC_ALL": "C"}
    return CommandRunner(
        logging.getLogger("pcs"), get_report_processor(), runner_env
    )
920  	
921  	
def run_pcsdcli(command, data=None):
    """
    Run a pcsd-cli.rb command and return a tuple (output_json, retval).

    command -- name of the pcsd-cli command to run
    data -- JSON serializable input for the command, sent via stdin

    output_json is the JSON object printed by the command with keys
    "status", "text" and "data" always present (None if missing in the
    output). If the output is not a JSON object, output_json has "status"
    set to "bad_json_output" and "text" set to the raw output.

    Commandline options:
      * --request-timeout - timeout for HTTP request, applicable for commands:
        * remove_known_hosts - only when running on cluster node (sync will
            be initiated)
        * auth
        * send_local_configs
    """
    if not data:
        data = {}
    env_var = {}
    if "--debug" in pcs_options:
        env_var["PCSD_DEBUG"] = "true"
    if "--request-timeout" in pcs_options:
        env_var["PCSD_NETWORK_TIMEOUT"] = str(pcs_options["--request-timeout"])
    else:
        env_var["PCSD_NETWORK_TIMEOUT"] = str(settings.default_request_timeout)
    pcsd_dir_path = settings.pcsd_exec_location
    pcsdcli_path = os.path.join(pcsd_dir_path, "pcsd-cli.rb")
    if settings.pcsd_gem_path is not None:
        env_var["GEM_HOME"] = settings.pcsd_gem_path
    stdout, dummy_stderr, retval = cmd_runner().run(
        [settings.ruby_exec, "-I" + pcsd_dir_path, pcsdcli_path, command],
        json.dumps(data),
        env_var,
    )
    try:
        output_json = json.loads(stdout)
        if not isinstance(output_json, dict):
            # valid JSON but not an object (e.g. null or a list); handle it
            # the same way as unparsable output instead of crashing below
            raise ValueError("pcsd-cli output is not a JSON object")
        for key in ["status", "text", "data"]:
            if key not in output_json:
                output_json[key] = None

        # a missing log must not crash the command, there is just nothing
        # to inspect then
        output = "".join(output_json.get("log") or [])
        # check if some requests timed out, if so print message about it
        if "error: operation_timedout" in output:
            print_to_stderr("Error: Operation timed out")
        # check if there are any connection failures due to proxy in pcsd and
        # print warning if so
        proxy_msg = "Proxy is set in environment variables, try disabling it"
        if proxy_msg in output:
            reports_output.warn(proxy_msg)

    except ValueError:
        output_json = {
            "status": "bad_json_output",
            "text": stdout,
            "data": None,
        }
    return output_json, retval
972  	
973  	
def call_local_pcsd(argv, options, std_in=None):  # noqa: PLR0911
    """
    Run a pcs command by passing it to the locally running pcsd.

    Some commands cannot be run under a non-root account, so those commands
    are passed to the locally running pcsd to execute them.

    argv -- the command to run, sent JSON encoded
    options -- options of the command, sent JSON encoded
    std_in -- data for the command's stdin

    Returns a list [list_of_errors, exit_code, stdout, stderr]

    Commandline options:
      * --request-timeout - timeout of call to local pcsd
    """
    # pylint: disable=too-many-return-statements

    def failure(message):
        return [[message], 1, "", ""]

    request = {
        "command": json.dumps(argv),
        "options": json.dumps(options),
    }
    if std_in:
        request["stdin"] = std_in
    code, output = sendHTTPRequest(
        "localhost", "run_pcs", urlencode(request), False, False
    )

    if code == 3:  # not authenticated
        return failure(
            "Unable to authenticate against the local pcsd. Run the same "
            "command as root or authenticate yourself to the local pcsd "
            "using command 'pcs client local-auth'"
        )
    if code != 0:  # http error connecting to localhost
        return failure(output)

    try:
        reply = json.loads(output)
        for key in ("status", "data"):
            if key not in reply:
                reply[key] = None
    except ValueError:
        return failure("Unable to communicate with pcsd")

    status = reply["status"]
    payload = reply["data"]
    for rejected_status, message in (
        ("bad_command", "Command not allowed"),
        ("access_denied", "Access denied"),
    ):
        if status == rejected_status:
            return failure(message)
    if status != "ok" or not payload:
        return failure("Unable to communicate with pcsd")
    try:
        return [[], payload["code"], payload["stdout"], payload["stderr"]]
    except KeyError:
        return failure("Unable to communicate with pcsd")
1028 	
1029 	
def map_for_error_list(callab, iterab):
    """
    Call callab on each item of iterab and collect errors of failed calls.

    callab -- callable returning a tuple (retval, error) for an item
    iterab -- iterable of items to process

    Returns a list of errors of those calls whose retval is not 0.

    Commandline options: no options
    NOTE: callback 'callab' may use some options
    """
    return [error for retval, error in map(callab, iterab) if retval != 0]
1041 	
1042 	
def run_parallel(worker_list, wait_seconds=1):
    """
    Run each worker in its own daemon thread and wait for all to finish.

    worker_list -- iterable of callables taking no arguments
    wait_seconds -- timeout of one attempt to join a thread

    Commandline options: no options
    """
    pending = []
    for worker in worker_list:
        runner = threading.Thread(target=worker, daemon=True)
        runner.start()
        pending.append(runner)

    # a thread still running after the join timeout goes back to the queue
    while pending:
        runner = pending.pop()
        runner.join(wait_seconds)
        if runner.is_alive():
            pending.insert(0, runner)
1059 	
1060 	
def create_task(report, action, node, *args, **kwargs):
    """
    Create a callable which runs action on a node and reports its result.

    report -- callable accepting node, returncode and output
    action -- callable accepting node, *args and **kwargs and returning
        a tuple (returncode, output)
    node -- the node passed to both action and report

    Commandline options: no options
    """

    def task():
        # nothing is run until the returned callable is called
        result_code, result_output = action(node, *args, **kwargs)
        report(node, result_code, result_output)

    return task
1071 	
1072 	
def create_task_list(report, action, node_list, *args, **kwargs):
    """
    Create one task (see create_task) for each node in node_list.

    Commandline options: no options
    """
    tasks = []
    for node in node_list:
        tasks.append(create_task(report, action, node, *args, **kwargs))
    return tasks
1080 	
1081 	
def parallel_for_nodes(action, node_list, *args, **kwargs):
    """
    Run action for all nodes in parallel and print its output for each node.

    action -- callable accepting node, *args and **kwargs and returning
        a tuple (returncode, output)
    node_list -- nodes to run the action for

    Returns a dict mapping each node whose returncode was not 0 to the
    message printed for it.

    Commandline options: no options
    NOTE: callback 'action' may use some cmd options
    """
    failed_nodes = {}

    def collect(node, returncode, output):
        line = "{0}: {1}".format(node, output.strip())
        print_to_stderr(line)
        if returncode != 0:
            failed_nodes[node] = line

    tasks = create_task_list(collect, action, node_list, *args, **kwargs)
    run_parallel(tasks)
    return failed_nodes
1097 	
1098 	
def get_group_children(group_id):
    """
    Return ids of primitives in the specified group, read from the CIB.

    Commandline options: no options
    """
    cib_dom = get_cib_dom()
    return dom_get_group_children(cib_dom, group_id)
1104 	
1105 	
def dom_get_group_children(dom, group_id):
    """
    Return ids of primitives of the first group with the specified id.

    An empty list is returned if there is no such group in dom.
    """
    for group_el in dom.getElementsByTagName("group"):
        if group_el.getAttribute("id") != group_id:
            continue
        child_ids = []
        for child_el in get_group_children_el_from_el(group_el):
            child_ids.append(child_el.getAttribute("id"))
        return child_ids
    return []
1115 	
1116 	
def get_group_children_el_from_el(group_el):
    """
    Return a list of primitive elements which are direct children of group_el
    """
    element_type = xml.dom.minidom.Node.ELEMENT_NODE
    return [
        node
        for node in group_el.childNodes
        if node.nodeType == element_type and node.tagName == "primitive"
    ]
1125 	
1126 	
def dom_get_clone_ms_resource(dom, clone_ms_id):
    """
    Return the resource held by a clone or a master with the specified id.

    None is returned if neither a clone nor a master with the id exists.

    Commandline options: no options
    """
    parent_el = dom_get_clone(dom, clone_ms_id)
    if not parent_el:
        parent_el = dom_get_master(dom, clone_ms_id)
    if not parent_el:
        return None
    return dom_elem_get_clone_ms_resource(parent_el)
1137 	
1138 	
def dom_elem_get_clone_ms_resource(clone_ms):
    """
    Return the first child element of clone_ms which is a group or
    a primitive, None if there is no such child.

    Commandline options: no options
    """
    element_type = xml.dom.minidom.Node.ELEMENT_NODE
    candidates = (
        node
        for node in clone_ms.childNodes
        if node.nodeType == element_type
        and node.tagName in ["group", "primitive"]
    )
    return next(candidates, None)
1150 	
1151 	
def dom_get_resource_clone_ms_parent(dom, resource_id):
    """
    Return a clone or a master element which is an ancestor of a primitive
    or a group with the specified id, None if there is none.

    Commandline options: no options
    """
    resource_el = dom_get_resource(dom, resource_id)
    if not resource_el:
        resource_el = dom_get_group(dom, resource_id)
    if not resource_el:
        return None
    return dom_get_parent_by_tag_names(resource_el, ["clone", "master"])
1162 	
1163 	
def dom_get_resource_bundle_parent(dom, resource_id):
    """
    Return a bundle element which is an ancestor of a primitive with the
    specified id, None if there is none.

    Commandline options: no options
    """
    primitive_el = dom_get_resource(dom, resource_id)
    if not primitive_el:
        return None
    return dom_get_parent_by_tag_names(primitive_el, ["bundle"])
1172 	
1173 	
def dom_get_master(dom, master_id):
    """
    Return the first master element with the specified id, None if not found

    Commandline options: no options
    """
    matching = (
        element
        for element in dom.getElementsByTagName("master")
        if element.getAttribute("id") == master_id
    )
    return next(matching, None)
1182 	
1183 	
def dom_get_clone(dom, clone_id):
    """
    Return the first clone element with the specified id, None if not found

    Commandline options: no options
    """
    matching = (
        element
        for element in dom.getElementsByTagName("clone")
        if element.getAttribute("id") == clone_id
    )
    return next(matching, None)
1192 	
1193 	
def dom_get_group(dom, group_id):
    """
    Return the first group element with the specified id, None if not found

    Commandline options: no options
    """
    matching = (
        element
        for element in dom.getElementsByTagName("group")
        if element.getAttribute("id") == group_id
    )
    return next(matching, None)
1202 	
1203 	
def dom_get_bundle(dom, bundle_id):
    """
    Return the first bundle element with the specified id, None if not found

    Commandline options: no options
    """
    matching = (
        element
        for element in dom.getElementsByTagName("bundle")
        if element.getAttribute("id") == bundle_id
    )
    return next(matching, None)
1212 	
1213 	
def dom_get_resource_bundle(bundle_el):
    """
    Return the first primitive which is a direct child of bundle_el, None if
    there is no such primitive.

    Commandline options: no options
    """
    element_type = xml.dom.minidom.Node.ELEMENT_NODE
    primitives = (
        node
        for node in bundle_el.childNodes
        if node.nodeType == element_type and node.tagName == "primitive"
    )
    return next(primitives, None)
1225 	
1226 	
def dom_get_group_clone(dom, group_id):
    """
    Return a group with the specified id placed in a clone, None if there is
    no such group.

    Commandline options: no options
    """
    groups_in_clones = (
        dom_get_group(clone_el, group_id)
        for clone_el in dom.getElementsByTagName("clone")
    )
    return next((group for group in groups_in_clones if group), None)
1236 	
1237 	
def dom_get_group_masterslave(dom, group_id):
    """
    Return a group with the specified id placed in a master, None if there
    is no such group.

    Commandline options: no options
    """
    groups_in_masters = (
        dom_get_group(master_el, group_id)
        for master_el in dom.getElementsByTagName("master")
    )
    return next((group for group in groups_in_masters if group), None)
1247 	
1248 	
def dom_get_resource(dom, resource_id):
    """
    Return the first primitive element with the specified id, None if not
    found.

    Commandline options: no options
    """
    matching = (
        element
        for element in dom.getElementsByTagName("primitive")
        if element.getAttribute("id") == resource_id
    )
    return next(matching, None)
1257 	
1258 	
def dom_get_any_resource(dom, resource_id):
    """
    Return a primitive, a group, a clone or a master with the specified id.

    The element types are searched in the order listed above.

    Commandline options: no options
    """
    element = None
    for finder in (
        dom_get_resource,
        dom_get_group,
        dom_get_clone,
        dom_get_master,
    ):
        element = finder(dom, resource_id)
        if element:
            break
    return element
1269 	
1270 	
def dom_get_resource_clone(dom, resource_id):
    """
    Return a primitive with the specified id placed in a clone, None if
    there is no such primitive.

    Commandline options: no options
    """
    primitives_in_clones = (
        dom_get_resource(clone_el, resource_id)
        for clone_el in dom.getElementsByTagName("clone")
    )
    return next((found for found in primitives_in_clones if found), None)
1280 	
1281 	
def dom_get_resource_masterslave(dom, resource_id):
    """
    Return a primitive with the specified id placed in a master, None if
    there is no such primitive.

    Commandline options: no options
    """
    primitives_in_masters = (
        dom_get_resource(master_el, resource_id)
        for master_el in dom.getElementsByTagName("master")
    )
    return next((found for found in primitives_in_masters if found), None)
1291 	
1292 	
1293 	# returns tuple (is_valid, error_message, correct_resource_id_if_exists)
1294 	# there is a duplicate code in pcs/lib/cib/constraint/constraint.py
1295 	# please use function in pcs/lib/cib/constraint/constraint.py
def validate_constraint_resource(dom, resource_id):  # noqa: PLR0911
    """
    Check whether a constraint may be created for the specified resource.

    Returns tuple (is_valid, error_message, correct_resource_id_if_exists)

    Commandline options:
      * --force - allow constraint on any resource
    """
    # pylint: disable=too-many-return-statements
    for find_top_level in (dom_get_clone, dom_get_master, dom_get_bundle):
        if find_top_level(dom, resource_id):
            # clones, masters and bundles are always valid
            return True, "", resource_id

    if not (
        dom_get_resource(dom, resource_id) or dom_get_group(dom, resource_id)
    ):
        return False, "Resource '%s' does not exist" % resource_id, None

    parent_el = dom_get_resource_clone_ms_parent(
        dom, resource_id
    ) or dom_get_resource_bundle_parent(dom, resource_id)
    if not parent_el:
        # a primitive and a group is valid if not in a clone nor a master nor a
        # bundle
        return True, "", resource_id

    parent_id = parent_el.getAttribute("id")
    if "--force" in pcs_options:
        return True, "", parent_id

    if parent_el.tagName in ["clone", "master"]:
        template = (
            "%s is a clone resource, you should use the clone id: %s "
            "when adding constraints. Use --force to override."
        )
    elif parent_el.tagName == "bundle":
        template = (
            "%s is a bundle resource, you should use the bundle id: %s "
            "when adding constraints. Use --force to override."
        )
    else:
        return True, "", resource_id
    return False, template % (resource_id, parent_id), parent_id
1345 	
1346 	
def validate_resources_not_in_same_group(dom, resource_id1, resource_id2):
    """
    Return False if both resources are primitives placed in the same group,
    True otherwise.
    """
    first_el = dom_get_resource(dom, resource_id1)
    second_el = dom_get_resource(dom, resource_id2)
    if not (first_el and second_el):
        # Only primitive resources can be in a group. If at least one of the
        # resources is not a primitive (resource_el is None), then the
        # resources are not in the same group.
        return True
    first_group = dom_get_parent_by_tag_names(first_el, ["group"])
    second_group = dom_get_parent_by_tag_names(second_el, ["group"])
    if not (first_group and second_group):
        # at least one of the primitives is not in a group at all
        return True
    return first_group != second_group
1360 	
1361 	
def dom_get_resource_remote_node_name(dom_resource):
    """
    Return the name of a remote node defined by the resource.

    For an ocf:pacemaker:remote primitive it is the id of the primitive,
    for other primitives it is the value of their "remote-node" meta
    attribute (None if not set). None is returned for non-primitives.

    Commandline options: no options
    """
    if dom_resource.tagName != "primitive":
        return None
    agent = tuple(
        dom_resource.getAttribute(attr_name).lower()
        for attr_name in ("class", "provider", "type")
    )
    if agent == ("ocf", "pacemaker", "remote"):
        return dom_resource.getAttribute("id")
    return dom_get_meta_attr_value(dom_resource, "remote-node")
1375 	
1376 	
def dom_get_meta_attr_value(dom_resource, meta_name):
    """
    Return the value of the first nvpair named meta_name found in
    meta_attributes elements inside dom_resource, None if there is none.

    Commandline options: no options
    """
    matching_values = (
        nvpair.getAttribute("value")
        for meta_el in dom_resource.getElementsByTagName("meta_attributes")
        for nvpair in meta_el.getElementsByTagName("nvpair")
        if nvpair.getAttribute("name") == meta_name
    )
    return next(matching_values, None)
1386 	
1387 	
def dom_get_node(dom, node_name):
    """
    Return the first node element whose uname attribute is node_name, None
    if there is no such element.

    Commandline options: no options
    """
    matching = (
        node_el
        for node_el in dom.getElementsByTagName("node")
        if node_el.hasAttribute("uname")
        and node_el.getAttribute("uname") == node_name
    )
    return next(matching, None)
1396 	
1397 	
def _dom_get_children_by_tag_name(dom_el, tag_name):
    """
    Return a list of direct child elements of dom_el named tag_name

    Commandline options: no options
    """
    children = []
    for node in dom_el.childNodes:
        if node.nodeType != xml.dom.minidom.Node.ELEMENT_NODE:
            # skip text nodes, comments and so on
            continue
        if node.tagName == tag_name:
            children.append(node)
    return children
1408 	
1409 	
def dom_get_parent_by_tag_names(dom_el, tag_names):
    """
    Return the closest ancestor element of dom_el whose tag name is in
    tag_names, None if there is no such ancestor.

    Commandline options: no options
    """
    ancestor = dom_el.parentNode
    # the walk stops at the first ancestor which is not an element
    while ancestor and isinstance(ancestor, xml.dom.minidom.Element):
        if ancestor.tagName in tag_names:
            return ancestor
        ancestor = ancestor.parentNode
    return None
1422 	
1423 	
1424 	# moved to pcs.lib.pacemaker.state
def get_resource_for_running_check(cluster_state, resource_id, stopped=False):
    """
    Translate a resource id to an id of a resource to check the state of.

    For a clone, the id of its first resource or group child is used, with
    a trailing ':N' instance number removed. For a group, the id of its
    first (stopped is True) or last (stopped is False) resource is used.
    Other ids are returned unchanged.

    cluster_state -- dom of a cluster state
    resource_id -- id of a resource, a group or a clone
    stopped -- selects which resource of a group is picked

    Commandline options: no options
    """

    def _isnum(value):
        return all(char in "0123456789" for char in value)

    # pylint: disable=too-many-nested-blocks
    for clone_el in cluster_state.getElementsByTagName("clone"):
        if clone_el.getAttribute("id") != resource_id:
            continue
        for child_el in clone_el.childNodes:
            if child_el.nodeType != child_el.ELEMENT_NODE:
                continue
            if child_el.tagName not in ("resource", "group"):
                continue
            resource_id = child_el.getAttribute("id")
            # in a clone, a resource can have an id of '<name>:N'
            base_id, separator, instance = resource_id.rpartition(":")
            if separator and _isnum(instance):
                resource_id = base_id
            # only the first resource or group of the clone is considered
            break
    for group_el in cluster_state.getElementsByTagName("group"):
        group_id = group_el.getAttribute("id")
        # If resource is a clone it can have an id of '<resource name>:N'
        if group_id == resource_id or group_id.startswith(resource_id + ":"):
            members = group_el.getElementsByTagName("resource")
            picked = members[0] if stopped else members[-1]
            resource_id = picked.getAttribute("id")
    return resource_id
1459 	
1460 	
1461 	# moved to pcs.lib.pacemaker.state
1462 	# see pcs.lib.commands.resource for usage
def resource_running_on(resource, passed_state=None, stopped=False):
    """
    Find out on which nodes a resource is running and in which role.

    resource -- id of the resource
    passed_state -- dom of a cluster state, it is loaded if not specified
    stopped -- passed to get_resource_for_running_check

    Returns a dict with keys "message" (a text describing where the resource
    is running) and "is_running" (bool).

    Commandline options:
      * -f - has effect but doesn't make sense to check state of resource
    """
    # pylint: disable=too-many-locals
    state = passed_state or getClusterState()
    resource_original = resource
    resource = get_resource_for_running_check(state, resource, stopped)

    nodes_started = []
    nodes_promoted = []
    nodes_unpromoted = []
    for res in state.getElementsByTagName("resource"):
        res_id = res.getAttribute("id")
        # If resource is a clone it can have an id of '<resource name>:N'
        # If resource is a clone it will be found more than once - cannot break
        if res_id != resource and not res_id.startswith(resource + ":"):
            continue
        if res.getAttribute("failed") == "true":
            continue
        role = res.getAttribute("role")
        if role == const.PCMK_ROLE_STARTED:
            target = nodes_started
        elif role in (
            const.PCMK_ROLE_PROMOTED,
            const.PCMK_ROLE_PROMOTED_LEGACY,
        ):
            target = nodes_promoted
        elif role in (
            const.PCMK_ROLE_UNPROMOTED,
            const.PCMK_ROLE_UNPROMOTED_LEGACY,
        ):
            target = nodes_unpromoted
        else:
            # nodes of resources in other roles are not reported
            continue
        target.extend(
            node.getAttribute("name")
            for node in res.getElementsByTagName("node")
        )

    is_running = bool(nodes_started or nodes_promoted or nodes_unpromoted)
    if is_running:
        message_parts = []
        for node_names, label in (
            (nodes_started, "running"),
            (nodes_promoted, str(const.PCMK_ROLE_PROMOTED).lower()),
            (nodes_unpromoted, str(const.PCMK_ROLE_UNPROMOTED).lower()),
        ):
            if not node_names:
                continue
            plural = "s" if len(node_names) > 1 else ""
            message_parts.append(
                "%s on node%s %s"
                % (label, plural, ", ".join(sorted(node_names)))
            )
        message = "Resource '%s' is %s." % (
            resource_original,
            "; ".join(message_parts),
        )
    else:
        message = "Resource '%s' is not running on any node" % resource_original
    return {
        "message": message,
        "is_running": is_running,
    }
1521 	
1522 	
def validate_wait_get_timeout(need_cib_support=True):
    """
    Validate usage of --wait and return its value converted to seconds.

    need_cib_support -- if True, using -f together with --wait is an error

    Returns None if --wait has no value.

    Commandline options:
      * --wait
      * -f - to check if -f and --wait are not used simultaneously
    """
    if need_cib_support and usefile:
        err("Cannot use '-f' together with '--wait'")
    requested = pcs_options["--wait"]
    if requested is None:
        return None
    seconds = timeout_to_seconds(requested)
    if seconds is None:
        err("%s is not a valid number of seconds to wait" % requested)
    return seconds
1541 	
1542 	
1543 	# Return matches from the CIB with the xpath_query
def get_cib_xpath(xpath_query):
    """
    Return the output of a cibadmin xpath query, an empty string if
    cibadmin exited with a non-zero code.

    Commandline options:
      * -f - CIB file
    """
    output, retval = run(["cibadmin", "-Q", "--xpath", xpath_query])
    return output if retval == 0 else ""
1554 	
1555 	
def get_cib(scope=None):
    """
    Return the CIB, or only its section when scope is specified.

    scope -- name of a CIB section passed to cibadmin as --scope

    Commandline options:
      * -f - CIB file
    """
    scope_args = ["--scope=%s" % scope] if scope else []
    output, retval = run(["cibadmin", "-l", "-Q"] + scope_args)
    if retval == 0:
        return output
    if retval == 105 and scope:
        err("unable to get cib, scope '%s' not present in cib" % scope)
    else:
        err("unable to get cib")
    return output
1571 	
1572 	
def get_cib_dom(cib_xml=None):
    """
    Return the CIB parsed to a minidom document.

    cib_xml -- CIB as a string, it is loaded using get_cib if not specified

    Commandline options:
      * -f - CIB file
    """
    xml_text = get_cib() if cib_xml is None else cib_xml
    try:
        cib_dom = parseString(xml_text)
    except xml.parsers.expat.ExpatError:
        return err("unable to get cib")
    return cib_dom
1584 	
1585 	
1586 	# Replace only configuration section of cib with dom passed
def replace_cib_configuration(dom):
    """
    Replace the configuration section of the CIB.

    dom -- the new configuration, either an object providing toxml() or
        data passed to cibadmin as they are

    Commandline options:
      * -f - CIB file
    """
    if hasattr(dom, "toxml"):
        new_configuration = dom.toxml()
    else:
        new_configuration = dom
    output, retval = run(
        ["cibadmin", "--replace", "-V", "--xml-pipe", "-o", "configuration"],
        False,
        new_configuration,
    )
    if retval != 0:
        err("Unable to update cib\n" + output)
1597 	
1598 	
def is_valid_cib_scope(scope):
    """
    Return True if scope is one of the CIB scope names listed below

    Commandline options: no options
    """
    known_scopes = (
        "acls",
        "alerts",
        "configuration",
        "constraints",
        "crm_config",
        "fencing-topology",
        "nodes",
        "op_defaults",
        "resources",
        "rsc_defaults",
        "tags",
    )
    return scope in known_scopes
1616 	
1617 	
1618 	# Checks to see if id exists in the xml dom passed
1619 	# DEPRECATED use lxml version available in pcs.lib.cib.tools
def does_id_exist(dom, check_id):  # noqa: PLR0912
    """
    Return True if an element with the specified id exists, False otherwise.

    dom -- a minidom document or a node belonging to a document
    check_id -- the id to look for

    Commandline options: no options
    """
    if isinstance(dom, xml.dom.minidom.Document):
        document = dom
    else:
        document = dom.ownerDocument

    cib_elements = _dom_get_children_by_tag_name(document, "cib")
    if not cib_elements:
        # not a complete cib, search the whole document
        return any(
            elem.getAttribute("id") == check_id
            for elem in document.getElementsByTagName("*")
        )

    # do not search in /cib/status, it may contain references to previously
    # existing and deleted resources and thus preventing creating them again
    for cib in cib_elements:
        for section in cib.childNodes:
            if section.nodeType != xml.dom.minidom.Node.ELEMENT_NODE:
                continue
            if section.tagName == "status":
                continue
            if any(
                elem.getAttribute("id") == check_id
                for elem in section.getElementsByTagName("*")
            ):
                return True
    return False
1645 	
1646 	
1647 	# Returns check_id if it doesn't exist in the dom, otherwise it adds an integer
1648 	# to the end of the id and increments it until a unique id is found
1649 	# DEPRECATED use lxml version available in pcs.lib.cib.tools
def find_unique_id(dom, check_id):
    """
    Return an id based on check_id which does not exist in dom yet.

    check_id is returned if it is not used. Otherwise "-1", "-2" and so on
    is appended to check_id until an unused id is found.

    Commandline options: no options
    """
    candidate = check_id
    suffix = 0
    while does_id_exist(dom, candidate):
        suffix += 1
        candidate = check_id + "-" + str(suffix)
    return candidate
1660 	
1661 	
1662 	# Checks to see if the specified operation already exists in passed set of
1663 	# operations
1664 	# pacemaker differentiates between operations only by name and interval
def operation_exists(operations_el, op_el):
    """
    Return a list of operations in operations_el which have the same name
    and interval as op_el.

    Commandline options: no options
    """
    wanted_name = op_el.getAttribute("name")
    wanted_interval = timeout_to_seconds_legacy(op_el.getAttribute("interval"))
    duplicates = []
    for candidate in operations_el.getElementsByTagName("op"):
        if candidate.getAttribute("name") != wanted_name:
            continue
        candidate_interval = timeout_to_seconds_legacy(
            candidate.getAttribute("interval")
        )
        if candidate_interval == wanted_interval:
            duplicates.append(candidate)
    return duplicates
1680 	
1681 	
def operation_exists_by_name(operations_el, op_el):
    """
    Return a list of operations in operations_el with the same name as op_el.

    Monitor operations are considered the same only if they also have the
    same role and the same OCF check level.

    Commandline options: no options
    """
    new_roles_supported = isCibVersionSatisfied(
        operations_el, const.PCMK_NEW_ROLES_CIB_VERSION
    )

    def role_of(element):
        # an operation with no role specified is compared as a Started one
        return common_pacemaker.role.get_value_for_cib(
            element.getAttribute("role") or const.PCMK_ROLE_STARTED,
            new_roles_supported,
        )

    op_name = op_el.getAttribute("name")
    is_monitor = op_name == "monitor"
    op_role = role_of(op_el)
    ocf_check_level = (
        get_operation_ocf_check_level(op_el) if is_monitor else None
    )

    matching = []
    for candidate in operations_el.getElementsByTagName("op"):
        if candidate.getAttribute("name") != op_name:
            continue
        if is_monitor and not (
            role_of(candidate) == op_role
            and ocf_check_level == get_operation_ocf_check_level(candidate)
        ):
            continue
        matching.append(candidate)
    return matching
1712 	
1713 	
def get_operation_ocf_check_level(operation_el):
    """
    Return the value of the first OCF check level nvpair found in
    instance_attributes of the operation, None if there is none.

    Commandline options: no options
    """
    check_levels = (
        nvpair_el.getAttribute("value")
        for attr_el in operation_el.getElementsByTagName("instance_attributes")
        for nvpair_el in attr_el.getElementsByTagName("nvpair")
        if nvpair_el.getAttribute("name")
        == OCF_CHECK_LEVEL_INSTANCE_ATTRIBUTE_NAME
    )
    return next(check_levels, None)
1726 	
1727 	
def set_node_attribute(prop, value, node):
    """
    Set or delete an attribute of a node using crm_attribute.

    prop -- name of the attribute
    value -- new value of the attribute, an empty string deletes the
        attribute
    node -- name of the node

    Commandline options:
      * -f - CIB file
      * --force - no error if attribute to delete doesn't exist
    """
    crm_attribute_cmd = [
        "crm_attribute",
        "-t",
        "nodes",
        "--node",
        node,
        "--name",
        prop,
    ]
    if value == "":
        # check the attribute exists before deleting it
        output, retval = run(crm_attribute_cmd + ["--query"])
        if retval != 0 and "--force" not in pcs_options:
            err(
                "attribute: '%s' doesn't exist for node: '%s'" % (prop, node),
                False,
            )
            # This return code is used by pcsd
            sys.exit(2)
        output, retval = run(crm_attribute_cmd + ["--delete"])
    else:
        output, retval = run(crm_attribute_cmd + ["--update", value])

    if retval != 0:
        err("unable to set attribute %s\n%s" % (prop, output))
1783 	
1784 	
def get_terminal_input(message=None):
    """
    Read one line from standard input and return it.

    message -- optional prompt, written to stdout and flushed before reading

    An end of input yields an empty string. On KeyboardInterrupt,
    "Interrupted" is printed and the process exits with code 1.

    Commandline options: no options
    """
    if message:
        sys.stdout.write(message)
        sys.stdout.flush()
    try:
        answer = input("")
    except EOFError:
        answer = ""
    except KeyboardInterrupt:
        print("Interrupted")
        sys.exit(1)
    return answer
1799 	
1800 	
def _get_continue_confirmation_interactive(warning_text: str) -> bool:
    """
    Print a warning and ask the user whether to continue.

    Meant mainly for confirming destructive operations, hence the all-caps
    WARNING and the requirement to explicitly type 'yes' or 'y'. Any other
    answer cancels the action and "Canceled" is printed.

    warning_text -- describes action that we want the user to confirm

    Returns True if the user wishes to continue, False otherwise.
    """
    print(f"WARNING: {warning_text}")
    print("Type 'yes' or 'y' to proceed, anything else to cancel: ", end="")
    confirmed = get_terminal_input() in ("yes", "y")
    if not confirmed:
        print("Canceled")
    return confirmed
1819 	
1820 	
def is_run_interactive() -> bool:
    """
    Tell whether pcs is running in an interactive environment.

    Returns True only if both sys.stdin and sys.stdout exist and both report
    being a tty, False otherwise.
    """
    if sys.stdin is None or sys.stdout is None:
        return False
    return sys.stdin.isatty() and sys.stdout.isatty()
1831 	
1832 	
def get_continue_confirmation(
    warning_text: str, yes: bool, force: bool
) -> bool:
    """
    Get a confirmation to continue, either from flags or interactively.

    With --yes (or the deprecated --force) the warning is printed and True is
    returned without asking. Otherwise the user is asked interactively and the
    answer is returned: True to continue, False to cancel. If a
    non-interactive environment is detected, pcs exits with an error formed
    from warning_text.

    warning_text -- describes action that we want the user to confirm
    yes -- was --yes flag provided?
    force -- was --force flag provided? (deprecated)
    """
    if yes or force:
        if not yes:
            # Force may be specified for overriding library errors. We don't
            # want to report an issue in that case.
            # deprecated in the first pcs-0.12 version
            reports_output.deprecation_warning(
                "Using --force to confirm this action is deprecated and might be "
                "removed in a future release, use --yes instead"
            )
        reports_output.warn(warning_text)
        return True
    if is_run_interactive():
        return _get_continue_confirmation_interactive(warning_text)
    err(f"{warning_text}, use --yes to override")
    return False
1861 	
1862 	
def get_terminal_password(message="Password: "):
    """
    Ask for a password and return what was entered.

    If stdin is a tty, getpass is used to read the password. Otherwise the
    password is read by get_terminal_input. On KeyboardInterrupt while in
    getpass, "Interrupted" is printed and the process exits with code 1.

    message -- prompt shown to the user

    Commandline options: no options
    """
    if sys.stdin is None or not sys.stdin.isatty():
        return get_terminal_input(message)
    try:
        return getpass.getpass(message)
    except KeyboardInterrupt:
        print("Interrupted")
        sys.exit(1)
1875 	
1876 	
1877 	# Returns an xml dom containing the current status of the cluster
1878 	# DEPRECATED, please use
1879 	# ClusterState(lib.pacemaker.live.get_cluster_status_dom()) instead
def getClusterState():
    """
    Return an xml dom with the cluster status as reported by crm_mon.

    Runs "crm_mon --one-shot --output-as=xml --inactive" with stderr ignored
    and parses its output. If crm_mon returns a non-zero code, err() is
    called.

    Commandline options:
      * -f - CIB file
    """
    xml_string, returncode = run(
        ["crm_mon", "--one-shot", "--output-as=xml", "--inactive"],
        ignore_stderr=True,
    )
    if returncode != 0:
        err("error running crm_mon, is pacemaker running?")
    # NOTE(review): a static-analysis scan reported this call as "XML external
    # entity processing enabled" and suggested using the defusedxml module.
    # The parsed text is the output of the crm_mon command run above.
    # TODO confirm that this input can be considered trusted.
    return parseString(xml_string)
1892 	
1893 	
def write_empty_cib(cibfile):
    """
    Write a CIB skeleton with empty configuration sections to a file.

    cibfile -- path of the file to write, opened in text mode "w"

    Commandline options: no options
    """
    # The first (empty) and the last (indent only) items make the written
    # text start with a newline and end with four spaces.
    skeleton_lines = (
        "",
        '        <cib admin_epoch="0" epoch="1" num_updates="1" validate-with="pacemaker-3.1">',
        "          <configuration>",
        "            <crm_config/>",
        "            <nodes/>",
        "            <resources/>",
        "            <constraints/>",
        "          </configuration>",
        "          <status/>",
        "        </cib>",
        "    ",
    )
    with open(cibfile, "w") as cib_file:
        cib_file.write("\n".join(skeleton_lines))
1911 	
1912 	
1913 	# Test if 'var' is a score or option (contains an '=')
def is_score_or_opt(var):
    """
    Return True if var is a score or looks like an option (contains "=").

    Commandline options: no options
    """
    return True if is_score(var) else "=" in var
1921 	
1922 	
def is_score(var):
    """
    Tell whether var is a score; the check is delegated to is_score_value.

    var -- value to be checked

    Commandline options: no options
    """
    return is_score_value(var)
1928 	
1929 	
def validate_xml_id(var: str, description: str = "id") -> Tuple[bool, str]:
    """
    Check an id using validate_id and return the result as a tuple.

    var -- the id to be checked
    description -- passed to validate_id together with var

    Returns (True, "") if validate_id produced no reports, otherwise
    (False, message of the first report).

    Commandline options: no options
    """
    reports: ReportItemList = []
    validate_id(var, description, reports)
    if not reports:
        return True, ""
    return False, reports[0].message.message
1939 	
1940 	
def err(errorText: str, exit_after_error: bool = True) -> None:
    """
    Report an error through reports_output.error and optionally raise.

    errorText -- text passed to reports_output.error
    exit_after_error -- if True, the object returned by reports_output.error
        is raised; if False, the function just returns
    """
    exit_exception = reports_output.error(errorText)
    if not exit_after_error:
        return
    raise exit_exception
1945 	
1946 	
# The result is cached by lru_cache: the service manager is created on the
# first call and the same object is returned by all subsequent calls.
@lru_cache(typed=True)
def get_service_manager() -> ServiceManagerInterface:
    """
    Return a service manager created by _get_service_manager from
    cmd_runner() and get_report_processor().
    """
    return _get_service_manager(cmd_runner(), get_report_processor())
1950 	
1951 	
def enableServices():
    """
    Enable corosync, pacemaker and, if it is to be handled, corosync-qdevice.

    Every service is attempted even if enabling a previous one failed. All
    failures are then raised together as one LibraryError.

    Commandline options: no options
    """
    # do NOT handle SBD in here, it is started by pacemaker not systemd or init
    services = ["corosync", "pacemaker"]
    if need_to_handle_qdevice_service():
        services.append("corosync-qdevice")
    manager = get_service_manager()

    failure_reports = []
    for service_name in services:
        try:
            manager.enable(service_name)
        except ManageServiceError as e:
            failure_reports.append(service_exception_to_report(e))
    if not failure_reports:
        return
    raise LibraryError(*failure_reports)
1970 	
1971 	
def disableServices():
    """
    Disable corosync, pacemaker and, if it is to be handled, corosync-qdevice.

    Every service is attempted even if disabling a previous one failed. All
    failures are then raised together as one LibraryError.

    Commandline options: no options
    """
    # do NOT handle SBD in here, it is started by pacemaker not systemd or init
    services = ["corosync", "pacemaker"]
    if need_to_handle_qdevice_service():
        services.append("corosync-qdevice")
    manager = get_service_manager()

    failure_reports = []
    for service_name in services:
        try:
            manager.disable(service_name)
        except ManageServiceError as e:
            failure_reports.append(service_exception_to_report(e))
    if not failure_reports:
        return
    raise LibraryError(*failure_reports)
1990 	
1991 	
def start_service(service):
    """
    Start a service using the service manager.

    service -- name of the service to start

    A ManageServiceError raised by the service manager is converted to
    a LibraryError.

    Commandline options: no options
    """
    manager = get_service_manager()
    try:
        manager.start(service)
    except ManageServiceError as start_error:
        report = service_exception_to_report(start_error)
        raise LibraryError(report) from start_error
2002 	
2003 	
def stop_service(service):
    """
    Stop a service using the service manager.

    service -- name of the service to stop

    A ManageServiceError raised by the service manager is converted to
    a LibraryError.

    Commandline options: no options
    """
    manager = get_service_manager()
    try:
        manager.stop(service)
    except ManageServiceError as stop_error:
        report = service_exception_to_report(stop_error)
        raise LibraryError(report) from stop_error
2014 	
2015 	
def write_file(path, data, permissions=0o644, binary=False):
    """
    Write data to a file and report the result as (success, message).

    path -- where to write the data
    data -- content of the file, bytes if binary is True
    permissions -- mode passed to os.open when creating the file
    binary -- open the file in binary mode

    An existing file is removed first, which is only allowed with --force.
    Returns (True, "") on success, (False, description of the problem)
    otherwise.

    Commandline options:
      * --force - overwrite a file if it already exists
    """
    if os.path.exists(path):
        if "--force" not in pcs_options:
            return False, "'%s' already exists, use --force to overwrite" % path
        try:
            os.remove(path)
        except EnvironmentError as e:
            return False, "unable to remove '%s': %s" % (path, e)
    open_mode = "wb" if binary else "w"
    try:
        file_descriptor = os.open(path, os.O_WRONLY | os.O_CREAT, permissions)
        with os.fdopen(file_descriptor, open_mode) as outfile:
            outfile.write(data)
    except EnvironmentError as e:
        return False, "unable to write to '%s': %s" % (path, e)
    return True, ""
2037 	
2038 	
def tar_add_file_data(  # noqa: PLR0913
    tarball,
    data,
    name,
    *,
    mode=None,
    uid=None,
    gid=None,
    uname=None,
    gname=None,
    mtime=None,
):
    # pylint: disable=too-many-arguments
    """
    Add in-memory data to a tarball as a regular file.

    tarball -- opened tarfile to add the file to
    data -- bytes, content of the file
    name -- name of the file in the tarball
    mode, uid, gid, uname, gname -- if not None, set as the corresponding
        attribute of the file's TarInfo
    mtime -- modification time of the file, current time if None

    Commandline options: no options
    """
    info = tarfile.TarInfo(name)
    info.size = len(data)
    info.type = tarfile.REGTYPE
    info.mtime = int(time.time()) if mtime is None else mtime
    optional_attributes = {
        "mode": mode,
        "uid": uid,
        "gid": gid,
        "uname": uname,
        "gname": gname,
    }
    for attr_name, attr_value in optional_attributes.items():
        if attr_value is not None:
            setattr(info, attr_name, attr_value)
    with BytesIO(data) as data_io:
        tarball.addfile(info, data_io)
2072 	
2073 	
2074 	# DEPRECATED, please use pcs.lib.pacemaker.live.simulate_cib
def simulate_cib(cib_dom):
    """
    Run crm_simulate on a CIB and return what it produced.

    cib_dom -- DOM of the CIB, it is serialized by toxml() and passed to
        crm_simulate on its standard input

    Returns a tuple of:
      * the output of the crm_simulate command
      * DOM parsed from the file crm_simulate saved the graph to
      * DOM parsed from the file crm_simulate saved the output CIB to
    If crm_simulate returns a non-zero code, or a file cannot be handled or
    parsed, err() is called with "Unable to run crm_simulate".

    Commandline options: no options
    """
    # Import the exception classes explicitly. Accessing them as attributes
    # of the "xml" package in the except clause works only if some other
    # module has already imported xml.parsers.expat / xml.etree.ElementTree.
    # If not, an AttributeError would be raised while handling the original
    # exception and it would hide the real problem.
    # pylint: disable=import-outside-toplevel
    from xml.etree.ElementTree import ParseError
    from xml.parsers.expat import ExpatError

    try:
        with (
            tempfile.NamedTemporaryFile(
                mode="w+", suffix=".pcs"
            ) as new_cib_file,
            tempfile.NamedTemporaryFile(
                mode="w+", suffix=".pcs"
            ) as transitions_file,
        ):
            output, retval = run(
                [
                    "crm_simulate",
                    "--simulate",
                    "--save-output",
                    new_cib_file.name,
                    "--save-graph",
                    transitions_file.name,
                    "--xml-pipe",
                ],
                string_for_stdin=cib_dom.toxml(),
            )
            if retval != 0:
                return err("Unable to run crm_simulate:\n%s" % output)
            # crm_simulate wrote to the files by their names, read them from
            # the beginning
            new_cib_file.seek(0)
            transitions_file.seek(0)
            return (
                output,
                parseString(transitions_file.read()),
                parseString(new_cib_file.read()),
            )
    except (EnvironmentError, ExpatError, ParseError) as e:
        return err("Unable to run crm_simulate:\n%s" % e)
2113 	
2114 	
2115 	# DEPRECATED
2116 	# please use pcs.lib.pacemaker.simulate.get_operations_from_transitions
def get_operations_from_transitions(transitions_dom):
    """
    Extract watched resource operations from a transitions DOM.

    Only "rsc_op" elements which contain at least one "primitive" element and
    whose operation (compared in lower case) is one of start, stop, promote,
    demote, migrate_from, migrate_to are taken into account.

    transitions_dom -- DOM to be searched for "rsc_op" elements

    Returns a list of dicts with keys "id", "long_id", "operation" and
    "on_node", one for each primitive of each such rsc_op, ordered by the
    numeric id of the rsc_op.

    Commandline options: no options
    """
    watched_operations = (
        "start",
        "stop",
        "promote",
        "demote",
        "migrate_from",
        "migrate_to",
    )
    numbered_operations = []
    for rsc_op in transitions_dom.getElementsByTagName("rsc_op"):
        primitives = rsc_op.getElementsByTagName("primitive")
        if not primitives:
            continue
        operation = rsc_op.getAttribute("operation").lower()
        if operation not in watched_operations:
            continue
        rsc_op_id = int(rsc_op.getAttribute("id"))
        on_node = rsc_op.getAttribute("on_node")
        for primitive in primitives:
            primitive_id = primitive.getAttribute("id")
            details = {
                "id": primitive_id,
                # a primitive without long-id is identified by its id
                "long_id": primitive.getAttribute("long-id") or primitive_id,
                "operation": operation,
                "on_node": on_node,
            }
            numbered_operations.append((rsc_op_id, details))
    return [
        details
        for _, details in sorted(numbered_operations, key=lambda item: item[0])
    ]
2151 	
2152 	
def get_resources_location_from_operations(cib_dom, resources_operations):
    """
    Find out on which nodes resources get started or promoted by operations.

    cib_dom -- DOM of the CIB, used for looking up the id to be used in
        constraints for each resource
    resources_operations -- iterable of dicts with keys "id", "long_id",
        "operation" and "on_node"

    Only start, promote and migrate_from operations are considered. Returns
    a dict indexed by long_id, each value being a dict with keys "id",
    "long_id", "id_for_constraint" and "start_on_node" and / or
    "promote_on_node". Resources for which no id for a constraint is found
    are left out.

    Commandline options:
      * --force - allow constraints on any resource, may not have any effect as
        an invalid constraint is ignored anyway
    """
    locations = {}
    for res_op in resources_operations:
        operation = res_op["operation"]
        if operation not in ("start", "promote", "migrate_from"):
            continue
        long_id = res_op["long_id"]
        location = locations.get(long_id)
        if location is None:
            # Move clone instances as if they were non-cloned resources, it
            # really works with current pacemaker (1.1.13-6). Otherwise there
            # is probably no way to move them other than setting their
            # stickiness to 0.
            base_id = res_op["id"].split(":")[0]
            id_for_constraint = validate_constraint_resource(
                cib_dom, base_id
            )[2]
            if not id_for_constraint:
                continue
            location = {
                "id": res_op["id"],
                "long_id": long_id,
                "id_for_constraint": id_for_constraint,
            }
            locations[long_id] = location
        node_key = (
            "promote_on_node" if operation == "promote" else "start_on_node"
        )
        location[node_key] = res_op["on_node"]
    return {
        long_id: location
        for long_id, location in locations.items()
        if "start_on_node" in location or "promote_on_node" in location
    }
2190 	
2191 	
def get_remote_quorumtool_output(node):
    """
    Request "remote/get_quorum_info" from a node and return whatever
    sendHTTPRequest returns for that request.

    node -- the node to send the request to

    Commandline options:
      * --request-timeout - timeout for HTTP requests
    """
    # NOTE(review): the meaning of the positional None, False, False arguments
    # is defined by sendHTTPRequest, which is not visible here
    return sendHTTPRequest(node, "remote/get_quorum_info", None, False, False)
2198 	
2199 	
2200 	# return True if quorumtool_output is a string returned when the node is off
def is_node_offline_by_quorumtool_output(quorum_info):
    """
    Tell whether a quorum tool output is the one returned by an offline node.

    quorum_info -- string, the output to be checked; surrounding whitespace
        is ignored

    Commandline options: no options
    """
    offline_output = "Cannot initialize CMAP service"
    stripped_info = quorum_info.strip()
    return stripped_info == offline_output
2206 	
2207 	
def dom_prepare_child_element(dom_element, tag_name, id_candidate):
    """
    Return the first direct child of dom_element named tag_name.

    If there is no such child, a new element is created, given an id obtained
    from find_unique_id, appended to dom_element and returned.

    dom_element -- the parent element
    tag_name -- tag name of the child element
    id_candidate -- desired id of a newly created child

    Commandline options: no options
    """
    for child in dom_element.childNodes:
        if child.nodeType == child.ELEMENT_NODE and child.tagName == tag_name:
            return child

    dom = dom_element.ownerDocument
    new_child = dom.createElement(tag_name)
    new_child.setAttribute("id", find_unique_id(dom, id_candidate))
    dom_element.appendChild(new_child)
    return new_child
2226 	
2227 	
def dom_update_nvset(dom_element, nvpair_tuples, tag_name, id_candidate):
    """
    Update nvpairs in the first nvset named tag_name of dom_element.

    dom_element -- element owning the nvset
    nvpair_tuples -- (name, value) pairs, an empty value means remove
    tag_name -- tag name of the nvset element
    id_candidate -- desired id of the nvset if it needs to be created

    Nothing is done if nvpair_tuples is empty. An nvset is not created if all
    the values are to be removed.

    Commandline options: no options
    """
    # Already ported to pcs.libcib.nvpair

    # Do not ever remove the nvset element, even if it is empty. There may be
    # ACLs set in pacemaker which allow "write" for nvpairs (adding, changing
    # and removing) but not nvsets. In such a case, removing the nvset would
    # cause the whole change to be rejected by pacemaker with a "permission
    # denied" message.
    # https://bugzilla.redhat.com/show_bug.cgi?id=1642514
    if not nvpair_tuples:
        return

    removing_only = all(value == "" for _, value in nvpair_tuples)

    # Do not use dom.getElementsByTagName, that would get elements we do not
    # want to. For example if dom_element is a clone, we would get the clone's
    # as well as clone's primitive's attributes.
    existing_nvsets = _dom_get_children_by_tag_name(dom_element, tag_name)

    if existing_nvsets:
        nvset_element = existing_nvsets[0]
    elif removing_only:
        # Do not create new nvset if we are only removing values from it.
        return
    else:
        dom = dom_element.ownerDocument
        nvset_element = dom.createElement(tag_name)
        nvset_element.setAttribute("id", find_unique_id(dom, id_candidate))
        dom_element.appendChild(nvset_element)

    for name, value in nvpair_tuples:
        dom_update_nv_pair(
            nvset_element, name, value, nvset_element.getAttribute("id") + "-"
        )
2270 	
2271 	
def dom_update_nv_pair(dom_element, name, value, id_prefix=""):
    """
    Set, change or remove one nvpair in dom_element and return dom_element.

    dom_element -- element (nvset) holding nvpair elements
    name -- name of the nvpair
    value -- new value of the nvpair, an empty string means remove the nvpair
    id_prefix -- id of a newly created nvpair is id_prefix + name

    Only the first nvpair with the given name found among descendants of
    dom_element is changed or removed. If there is none and value is not
    empty, a new nvpair is appended to dom_element.

    Commandline options: no options
    """
    # Do not ever remove the nvset element, even if it is empty. There may be
    # ACLs set in pacemaker which allow "write" for nvpairs (adding, changing
    # and removing) but not nvsets. In such a case, removing the nvset would
    # cause the whole change to be rejected by pacemaker with a "permission
    # denied" message.
    # https://bugzilla.redhat.com/show_bug.cgi?id=1642514

    for nvpair_el in dom_element.getElementsByTagName("nvpair"):
        if nvpair_el.getAttribute("name") != name:
            continue
        if value == "":
            # getElementsByTagName returns all descendants, so the nvpair is
            # not necessarily a direct child of dom_element. Removing it from
            # its actual parent works in both cases, whereas
            # dom_element.removeChild raises NotFoundErr for a nested nvpair.
            nvpair_el.parentNode.removeChild(nvpair_el)
        else:
            nvpair_el.setAttribute("value", value)
        return dom_element

    if value != "":
        new_nvpair = dom_element.ownerDocument.createElement("nvpair")
        new_nvpair.setAttribute("id", id_prefix + name)
        new_nvpair.setAttribute("name", name)
        new_nvpair.setAttribute("value", value)
        dom_element.appendChild(new_nvpair)
    return dom_element
2300 	
2301 	
2302 	# Passed an array of strings ["a=b","c=d"], return array of tuples
2303 	# [("a","b"),("c","d")]
def convert_args_to_tuples(ra_values):
    """
    Convert "name=value" strings to (name, value) tuples.

    ra_values -- iterable of strings

    Strings are split at the first "=". Strings not containing "=" are
    skipped.

    Commandline options: no options
    """
    return [
        tuple(ra_val.split("=", 1)) for ra_val in ra_values if "=" in ra_val
    ]
2314 	
2315 	
def is_int(val):
    """
    Return True if int(val) succeeds, False if it raises a ValueError.
    """
    try:
        int(val)
    except ValueError:
        return False
    return True
2322 	
2323 	
def dom_update_utilization(dom_element, attributes, id_prefix=""):
    """
    Update utilization attributes of dom_element.

    dom_element -- element owning the "utilization" nvset
    attributes -- dict name: value, an empty value means remove
    id_prefix -- prepended to the id of the nvset if it needs to be created

    Attributes are processed sorted by their names. err() is called for
    a non-empty value which is not an integer.

    Commandline options: no options
    """
    attr_tuples = sorted(attributes.items())
    for name, value in attr_tuples:
        if value == "" or is_int(value):
            continue
        err(
            "Value of utilization attribute must be integer: '{0}={1}'".format(
                name, value
            )
        )
    nvset_id = id_prefix + dom_element.getAttribute("id") + "-utilization"
    dom_update_nvset(dom_element, attr_tuples, "utilization", nvset_id)
2342 	
2343 	
def dom_update_meta_attr(dom_element, attributes):
    """
    Update nvpairs in the "meta_attributes" nvset of dom_element.

    dom_element -- element owning the nvset
    attributes -- passed to dom_update_nvset as nvpair tuples

    Commandline options: no options
    """
    nvset_id = dom_element.getAttribute("id") + "-meta_attributes"
    dom_update_nvset(dom_element, attributes, "meta_attributes", nvset_id)
2354 	
2355 	
def dom_update_instance_attr(dom_element, attributes):
    """
    Update nvpairs in the "instance_attributes" nvset of dom_element.

    dom_element -- element owning the nvset
    attributes -- passed to dom_update_nvset as nvpair tuples

    Commandline options: no options
    """
    nvset_id = dom_element.getAttribute("id") + "-instance_attributes"
    dom_update_nvset(dom_element, attributes, "instance_attributes", nvset_id)
2366 	
2367 	
def get_utilization(element, filter_name=None):
    """
    Return utilization attributes of an element as a dict name: value.

    element -- element to search for a "utilization" element
    filter_name -- if not None, only the attribute of this name is returned

    Commandline options: no options
    """
    utilization_sets = element.getElementsByTagName("utilization")
    if not utilization_sets:
        return {}
    # Use just first element of utilization attributes. We don't support
    # utilization with rules just yet.
    first_set = utilization_sets[0]
    return {
        nvpair.getAttribute("name"): nvpair.getAttribute("value")
        for nvpair in first_set.getElementsByTagName("nvpair")
        if filter_name is None or nvpair.getAttribute("name") == filter_name
    }
2383 	
2384 	
def get_utilization_str(element, filter_name=None):
    """
    Return utilization attributes of an element as a string.

    The string consists of space separated "name=value" items sorted by name.

    element -- passed to get_utilization
    filter_name -- passed to get_utilization

    Commandline options: no options
    """
    utilization = get_utilization(element, filter_name)
    return " ".join(
        name + "=" + value for name, value in sorted(utilization.items())
    )
2393 	
2394 	
def get_lib_env() -> LibraryEnvironment:
    """
    Create a LibraryEnvironment based on the environment and pcs options.

    If a CIB file is in use, its content is passed to the environment. If
    --corosync_conf is specified, content of that file is passed as well;
    err() is called if the file cannot be read.

    Commandline options:
      * -f - CIB file
      * --corosync_conf - corosync.conf file
      * --request-timeout - timeout of HTTP requests
    """
    user = None
    groups = None
    # CIB_user and CIB_user_groups are taken into account only when the
    # effective uid is 0
    if os.geteuid() == 0:
        cib_user = os.environ.get("CIB_user", "").strip()
        if cib_user:
            user = cib_user
        cib_user_groups = os.environ.get("CIB_user_groups", "").strip()
        if cib_user_groups:
            groups = cib_user_groups.split(" ")

    cib_data = get_cib() if usefile else None

    corosync_conf_data = None
    if "--corosync_conf" in pcs_options:
        conf_path = pcs_options["--corosync_conf"]
        try:
            with open(conf_path) as conf_file:
                corosync_conf_data = conf_file.read()
        except IOError as e:
            err("Unable to read %s: %s" % (conf_path, e.strerror))

    return LibraryEnvironment(
        logging.getLogger("pcs"),
        get_report_processor(),
        user,
        groups,
        cib_data,
        corosync_conf_data,
        known_hosts_getter=read_known_hosts_file,
        request_timeout=pcs_options.get("--request-timeout"),
    )
2436 	
2437 	
def get_cib_user_groups():
    """
    Return a tuple (user, groups) read from the environment.

    The user is taken from the CIB_user variable. Groups are taken from the
    CIB_user_groups variable, which is split at spaces. A variable which is
    not set, or is empty after stripping whitespace, yields None. The
    variables are only read if the effective uid is 0, otherwise
    (None, None) is returned.

    Commandline options: no options
    """
    if os.geteuid() != 0:
        return None, None
    cib_user = os.environ.get("CIB_user", "").strip()
    cib_user_groups = os.environ.get("CIB_user_groups", "").strip()
    user = cib_user if cib_user else None
    groups = cib_user_groups.split(" ") if cib_user_groups else None
    return user, groups
2453 	
2454 	
def get_cli_env():
    """
    Create an Env and fill it with the user, groups, known hosts getter,
    report processor and request timeout.

    Commandline options:
      * --debug
      * --request-timeout
    """
    cli_env = Env()
    user, groups = get_cib_user_groups()
    cli_env.user = user
    cli_env.groups = groups
    cli_env.known_hosts_getter = read_known_hosts_file
    cli_env.report_processor = get_report_processor()
    cli_env.request_timeout = pcs_options.get("--request-timeout")
    return cli_env
2467 	
2468 	
def get_middleware_factory():
    """
    Create a middleware factory with cib, corosync_conf_existing and
    booth_conf middleware set up according to pcs options.

    Commandline options:
      * --corosync_conf
      * --name
      * --booth-conf
      * --booth-key
      * -f
    """
    # the CIB file name is passed on only if a CIB file is in use
    cib_file = filename if usefile else None
    cib_middleware = middleware.cib(cib_file, touch_cib_file)
    corosync_middleware = middleware.corosync_conf_existing(
        pcs_options.get("--corosync_conf")
    )
    booth_middleware = pcs.cli.booth.env.middleware_config(
        pcs_options.get("--booth-conf"),
        pcs_options.get("--booth-key"),
    )
    return middleware.create_middleware_factory(
        cib=cib_middleware,
        corosync_conf_existing=corosync_middleware,
        booth_conf=booth_middleware,
    )
2488 	
2489 	
def get_library_wrapper():
    """
    Return a Library created from get_cli_env() and get_middleware_factory().

    Commandline options:
      * --debug
      * --request-timeout
      * --corosync_conf
      * --name
      * --booth-conf
      * --booth-key
      * -f
    NOTE: usage of options may depend on used middleware for particular command
    """
    return Library(get_cli_env(), get_middleware_factory())
2503 	
2504 	
def exit_on_cmdline_input_error(
    error: CmdLineInputError, main_name: str, usage_name: StringSequence
) -> None:
    """
    Print info about a command line input error and exit with code 1.

    error -- the error; its message and hint are printed if they are set
    main_name -- passed to usage.show
    usage_name -- passed to usage.show as a list

    Usage is shown if there is no error or no error message, or if the error
    asks for both usage and message to be shown.
    """
    if error:
        if error.message:
            reports_output.error(error.message)
        if error.hint:
            print_to_stderr(f"Hint: {error.hint}")
    show_usage = (
        not error or not error.message or error.show_both_usage_and_message
    )
    if show_usage:
        usage.show(main_name, list(usage_name))
    sys.exit(1)
2515 	
2516 	
def get_report_processor() -> ReportProcessor:
    """
    Return a new ReportProcessorToConsole; its debug flag is set if --debug
    is present in pcs_options.

    Commandline options:
      * --debug
    """
    return ReportProcessorToConsole(debug="--debug" in pcs_options)
2519 	
2520 	
def get_user_and_pass() -> tuple[str, str]:
    """
    Return a tuple (username, password).

    Each value is taken from pcs options if specified there, otherwise the
    user is asked to enter it.

    Commandline options:
      * -u - username
      * -p - password
    """
    if "-u" in pcs_options:
        username = pcs_options["-u"]
    else:
        username = get_terminal_input("Username: ")
    if "-p" in pcs_options:
        password = pcs_options["-p"]
    else:
        password = get_terminal_password()
    return username, password
2536 	
2537 	
def get_input_modifiers() -> InputModifiers:
    """
    Return pcs_options wrapped in a new InputModifiers object.
    """
    return InputModifiers(pcs_options)
2540 	
2541 	
def get_token_from_file(file_name: str) -> str:
    """
    Read a token from a file and return it base64 encoded.

    file_name -- path to the file holding the token

    err() is called if the file is empty or holds more than 256 bytes. If the
    file cannot be read, an error is printed and SystemExit(1) is raised.
    """
    # 256 to stay backwards compatible
    max_size = 256
    try:
        with open(file_name, "rb") as token_file:
            # read one byte more than allowed to detect too long tokens
            token = token_file.read(max_size + 1)
            if len(token) > max_size:
                err(f"Maximal token size of {max_size} bytes exceeded")
            if not token:
                err(f"File '{file_name}' is empty")
            encoded_token = base64.b64encode(token)
            return encoded_token.decode("utf-8")
    except OSError as e:
        err(f"Unable to read file '{file_name}': {e}", exit_after_error=False)
        raise SystemExit(1) from e
2556 	
2557 	
def print_warning_if_utilization_attrs_has_no_effect(
    properties_facade: PropertyConfigurationFacade,
) -> None:
    """
    Warn if the placement-strategy property makes utilization useless.

    properties_facade -- source of the placement-strategy value (or its
        default)

    A warning is printed unless the value is one of balanced, minimal,
    utilization.
    """
    strategies_using_utilization = [
        "balanced",
        "minimal",
        "utilization",
    ]
    strategy = properties_facade.get_property_value_or_default(
        "placement-strategy"
    )
    if strategy in strategies_using_utilization:
        return
    reports_output.warn(
        "Utilization attributes configuration has no effect until cluster "
        "property option 'placement-strategy' is set to one of the "
        "values: "
        f"{format_list(strategies_using_utilization)}"
    )
2576