1    	# pylint: disable=too-many-lines
2    	import base64
3    	import fcntl
4    	import getpass
5    	import json
6    	import logging
7    	import os
8    	import re
9    	import signal
10   	import struct
11   	import subprocess
12   	import sys
13   	import tarfile
14   	import tempfile
15   	import termios
16   	import threading
17   	import time
18   	import xml.dom.minidom
19   	from functools import lru_cache
20   	from io import BytesIO
21   	from textwrap import dedent
22   	from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, cast
23   	from urllib.parse import urlencode
24   	from xml.dom.minidom import Document as DomDocument
25   	from xml.dom.minidom import parseString
26   	
27   	import pcs.cli.booth.env
28   	import pcs.lib.corosync.config_parser as corosync_conf_parser
29   	from pcs import settings, usage
30   	from pcs.cli.cluster_property.output import PropertyConfigurationFacade
31   	from pcs.cli.common import middleware
32   	from pcs.cli.common.env_cli import Env
33   	from pcs.cli.common.errors import CmdLineInputError
34   	from pcs.cli.common.lib_wrapper import Library
35   	from pcs.cli.common.parse_args import InputModifiers
36   	from pcs.cli.common.tools import print_to_stderr, timeout_to_seconds_legacy
37   	from pcs.cli.file import metadata as cli_file_metadata
38   	from pcs.cli.reports import ReportProcessorToConsole, process_library_reports
39   	from pcs.cli.reports import output as reports_output
40   	from pcs.common import const, file_type_codes
41   	from pcs.common import file as pcs_file
42   	from pcs.common import pacemaker as common_pacemaker
43   	from pcs.common import pcs_pycurl as pycurl
44   	from pcs.common.host import PcsKnownHost
45   	from pcs.common.pacemaker.resource.operations import (
46   	    OCF_CHECK_LEVEL_INSTANCE_ATTRIBUTE_NAME,
47   	)
48   	from pcs.common.reports import ReportProcessor
49   	from pcs.common.reports.messages import CibUpgradeFailedToMinimalRequiredVersion
50   	from pcs.common.services.errors import ManageServiceError
51   	from pcs.common.services.interfaces import ServiceManagerInterface
52   	from pcs.common.str_tools import format_list
53   	from pcs.common.tools import Version, timeout_to_seconds
54   	from pcs.common.types import StringSequence
55   	from pcs.lib.corosync.config_facade import ConfigFacade as corosync_conf_facade
56   	from pcs.lib.env import LibraryEnvironment
57   	from pcs.lib.errors import LibraryError
58   	from pcs.lib.external import CommandRunner, is_proxy_set
59   	from pcs.lib.file.instance import FileInstance as LibFileInstance
60   	from pcs.lib.host.config.facade import Facade as KnownHostsFacade
61   	from pcs.lib.interface.config import ParserErrorException
62   	from pcs.lib.pacemaker.live import get_cluster_status_dom
63   	from pcs.lib.pacemaker.state import ClusterState
64   	from pcs.lib.pacemaker.values import is_score as is_score_value
65   	from pcs.lib.pacemaker.values import validate_id
66   	from pcs.lib.services import get_service_manager as _get_service_manager
67   	from pcs.lib.services import service_exception_to_report
68   	
69   	if TYPE_CHECKING:
70   	    from pcs.common.reports.item import ReportItemList
71   	
72   	# pylint: disable=invalid-name
73   	# pylint: disable=too-many-branches
74   	
75   	# usefile & filename variables are set in pcs module
76   	usefile = False
77   	filename = ""
78   	# Note: not properly typed
79   	pcs_options: Dict[Any, Any] = {}
80   	
81   	
82   	def _getValidateWithVersion(dom) -> Version:
83   	    """
84   	    Commandline options: no options
85   	    """
86   	    cib = dom.getElementsByTagName("cib")
87   	    if len(cib) != 1:
88   	        err("Bad cib")
89   	
90   	    cib = cib[0]
91   	
92   	    version = cib.getAttribute("validate-with")
93   	    r = re.compile(r"pacemaker-(\d+)\.(\d+)\.?(\d+)?")
94   	    m = r.match(version)
95   	    if m is None:
96   	        raise AssertionError()
97   	    major = int(m.group(1))
98   	    minor = int(m.group(2))
99   	    rev = int(m.group(3) or 0)
100  	    return Version(major, minor, rev)
101  	
102  	
103  	def isCibVersionSatisfied(cib_dom, required_version: Version) -> bool:
104  	    if not isinstance(cib_dom, DomDocument):
105  	        cib_dom = cib_dom.ownerDocument
106  	    return _getValidateWithVersion(cib_dom) >= required_version
107  	
108  	
109  	# Check the current pacemaker version in cib and upgrade it if necessary
110  	# Returns False if not upgraded and True if upgraded
111  	def _checkAndUpgradeCIB(required_version: Version) -> bool:
112  	    """
113  	    Commandline options:
114  	      * -f - CIB file
115  	    """
116  	    if isCibVersionSatisfied(get_cib_dom(), required_version):
117  	        return False
118  	    cluster_upgrade()
119  	    return True
120  	
121  	
122  	def cluster_upgrade():
123  	    """
124  	    Commandline options:
125  	      * -f - CIB file
126  	    """
127  	    output, retval = run(["cibadmin", "--upgrade", "--force"])
128  	    if retval != 0:
129  	        err("unable to upgrade cluster: %s" % output)
130  	    if (
131  	        output.strip()
132  	        == "Upgrade unnecessary: Schema is already the latest available"
133  	    ):
134  	        return
135  	    print_to_stderr("Cluster CIB has been upgraded to latest version")
136  	
137  	
138  	def cluster_upgrade_to_version(required_version: Version) -> Any:
139  	    """
140  	    Commandline options:
141  	      * -f - CIB file
142  	    """
143  	    _checkAndUpgradeCIB(required_version)
144  	    dom = get_cib_dom()
145  	    current_version = _getValidateWithVersion(dom)
146  	    if current_version < required_version:
147  	        err(
148  	            CibUpgradeFailedToMinimalRequiredVersion(
149  	                str(current_version),
150  	                str(required_version),
151  	            ).message
152  	        )
153  	    return dom
154  	
155  	
156  	# Check status of node
157  	def checkStatus(node):
158  	    """
159  	    Commandline options:
160  	      * --request-timeout - timeout for HTTP requests
161  	    """
162  	    return sendHTTPRequest(
163  	        node, "remote/status", urlencode({"version": "2"}), False, False
164  	    )
165  	
166  	
167  	# Check and see if we're authorized (faster than a status check)
168  	def checkAuthorization(node):
169  	    """
170  	    Commandline options:
171  	      * --request-timeout - timeout for HTTP requests
172  	    """
173  	    return sendHTTPRequest(node, "remote/check_auth", None, False, False)
174  	
175  	
176  	def get_uid_gid_file_name(uid, gid):
177  	    """
178  	    Commandline options: no options
179  	    """
180  	    return "pcs-uidgid-%s-%s" % (uid, gid)
181  	
182  	
183  	# Reads in uid file and returns dict of values {'uid':'theuid', 'gid':'thegid'}
184  	def read_uid_gid_file(uidgid_filename):
185  	    """
186  	    Commandline options: no options
187  	    """
188  	    uidgid = {}
189  	    with open(
190  	        os.path.join(settings.corosync_uidgid_dir, uidgid_filename), "r"
191  	    ) as myfile:
192  	        data = myfile.read().split("\n")
193  	    in_uidgid = False
194  	    for data_line in data:
195  	        line = re.sub(r"#.*", "", data_line)
196  	        if not in_uidgid:
197  	            if re.search(r"uidgid.*{", line):
198  	                in_uidgid = True
199  	            else:
200  	                continue
201  	        matches = re.search(r"uid:\s*(\S+)", line)
202  	        if matches:
203  	            uidgid["uid"] = matches.group(1)
204  	
205  	        matches = re.search(r"gid:\s*(\S+)", line)
206  	        if matches:
207  	            uidgid["gid"] = matches.group(1)
208  	
209  	    return uidgid
210  	
211  	
212  	def get_uidgid_file_content(
213  	    uid: Optional[str] = None, gid: Optional[str] = None
214  	) -> Optional[str]:
215  	    if not uid and not gid:
216  	        return None
217  	    uid_gid_lines = []
218  	    if uid:
219  	        uid_gid_lines.append(f"  uid: {uid}")
220  	    if gid:
221  	        uid_gid_lines.append(f"  gid: {gid}")
222  	    return dedent(
223  	        """\
224  	        uidgid {{
225  	        {uid_gid_keys}
226  	        }}
227  	        """
228  	    ).format(uid_gid_keys="\n".join(uid_gid_lines))
229  	
230  	
231  	def write_uid_gid_file(uid, gid):
232  	    """
233  	    Commandline options: no options
234  	    """
235  	    orig_filename = get_uid_gid_file_name(uid, gid)
236  	    uidgid_filename = orig_filename
237  	    counter = 0
238  	    if find_uid_gid_files(uid, gid):
239  	        err("uidgid file with uid=%s and gid=%s already exists" % (uid, gid))
240  	
241  	    while os.path.exists(
242  	        os.path.join(settings.corosync_uidgid_dir, uidgid_filename)
243  	    ):
244  	        counter = counter + 1
245  	        uidgid_filename = orig_filename + "-" + str(counter)
246  	
247  	    data = get_uidgid_file_content(uid, gid)
248  	    if data:
249  	        with open(
250  	            os.path.join(settings.corosync_uidgid_dir, uidgid_filename), "w"
251  	        ) as uidgid_file:
252  	            uidgid_file.write(data)
253  	
254  	
255  	def find_uid_gid_files(uid, gid):
256  	    """
257  	    Commandline options: no options
258  	    """
259  	    if uid == "" and gid == "":
260  	        return []
261  	
262  	    found_files = []
263  	    uid_gid_files = os.listdir(settings.corosync_uidgid_dir)
264  	    for uidgid_file in uid_gid_files:
265  	        uid_gid_dict = read_uid_gid_file(uidgid_file)
266  	        if ("uid" in uid_gid_dict and uid == "") or (
267  	            "uid" not in uid_gid_dict and uid != ""
268  	        ):
269  	            continue
270  	        if ("gid" in uid_gid_dict and gid == "") or (
271  	            "gid" not in uid_gid_dict and gid != ""
272  	        ):
273  	            continue
274  	        if "uid" in uid_gid_dict and uid != uid_gid_dict["uid"]:
275  	            continue
276  	        if "gid" in uid_gid_dict and gid != uid_gid_dict["gid"]:
277  	            continue
278  	
279  	        found_files.append(uidgid_file)
280  	
281  	    return found_files
282  	
283  	
284  	# Removes all uid/gid files with the specified uid/gid, returns false if we
285  	# couldn't find one
286  	def remove_uid_gid_file(uid, gid):
287  	    """
288  	    Commandline options: no options
289  	    """
290  	    if uid == "" and gid == "":
291  	        return False
292  	
293  	    file_removed = False
294  	    for uidgid_file in find_uid_gid_files(uid, gid):
295  	        os.remove(os.path.join(settings.corosync_uidgid_dir, uidgid_file))
296  	        file_removed = True
297  	
298  	    return file_removed
299  	
300  	
301  	@lru_cache()
302  	def read_known_hosts_file() -> dict[str, PcsKnownHost]:
303  	    return read_known_hosts_file_not_cached()
304  	
305  	
306  	def read_known_hosts_file_not_cached() -> dict[str, PcsKnownHost]:
307  	    """
308  	    Commandline options: no options
309  	    """
310  	    try:
311  	        if os.getuid() != 0:
312  	            known_hosts_raw_file = pcs_file.RawFile(
313  	                cli_file_metadata.for_file_type(file_type_codes.PCS_KNOWN_HOSTS)
314  	            )
315  	            # json.loads handles bytes, it expects utf-8, 16 or 32 encoding
316  	            known_hosts_struct = json.loads(known_hosts_raw_file.read())
317  	            # TODO use known hosts facade for getting info from json struct once the
318  	            # facade exists
319  	            return {
320  	                name: PcsKnownHost.from_known_host_file_dict(name, host)
321  	                for name, host in known_hosts_struct["known_hosts"].items()
322  	            }
323  	        # TODO remove
324  	        # This is here to provide known-hosts to functions not yet
325  	        # overhauled to pcs.lib. Cli should never read known hosts from
326  	        # /var/lib/pcsd/.
327  	        known_hosts_instance = LibFileInstance.for_known_hosts()
328  	        known_hosts_facade = cast(
329  	            KnownHostsFacade, known_hosts_instance.read_to_facade()
330  	        )
331  	        return known_hosts_facade.known_hosts
332  	
333  	    except LibraryError as e:
334  	        # TODO remove
335  	        # This is here to provide known-hosts to functions not yet
336  	        # overhauled to pcs.lib. Cli should never read known hosts from
337  	        # /var/lib/pcsd/.
338  	        process_library_reports(list(e.args))
339  	    except ParserErrorException as e:
340  	        # TODO remove
341  	        # This is here to provide known-hosts to functions not yet
342  	        # overhauled to pcs.lib. Cli should never read known hosts from
343  	        # /var/lib/pcsd/.
344  	        process_library_reports(
345  	            known_hosts_instance.parser_exception_to_report_list(e)
346  	        )
347  	    except pcs_file.RawFileError as e:
348  	        reports_output.warn("Unable to read the known-hosts file: " + e.reason)
349  	    except json.JSONDecodeError as e:
350  	        reports_output.warn(f"Unable to parse the known-hosts file: {e}")
351  	    except (TypeError, KeyError):
352  	        reports_output.warn("Warning: Unable to parse the known-hosts file.")
353  	    return {}
354  	
355  	
356  	def repeat_if_timeout(send_http_request_function, repeat_count=15):
357  	    """
358  	    Commandline options: no options
359  	    NOTE: callback send_http_request_function may use --request-timeout
360  	    """
361  	
362  	    def repeater(node, *args, **kwargs):
363  	        repeats_left = repeat_count
364  	        while True:
365  	            retval, output = send_http_request_function(node, *args, **kwargs)
366  	            if (
367  	                retval != 2
368  	                or "Operation timed out" not in output
369  	                or repeats_left < 1
370  	            ):
371  	                # did not timed out OR repeat limit exceeded
372  	                return retval, output
373  	            repeats_left = repeats_left - 1
374  	            if "--debug" in pcs_options:
375  	                print_to_stderr(f"{node}: {output}, trying again...")
376  	
377  	    return repeater
378  	
379  	
380  	# Set the corosync.conf file on the specified node
381  	def getCorosyncConfig(node):
382  	    """
383  	    Commandline options:
384  	      * --request-timeout - timeout for HTTP requests
385  	    """
386  	    return sendHTTPRequest(node, "remote/get_corosync_conf", None, False, False)
387  	
388  	
389  	def setCorosyncConfig(node, config):
390  	    """
391  	    Commandline options:
392  	      * --request-timeout - timeout for HTTP requests
393  	    """
394  	    data = urlencode({"corosync_conf": config})
395  	    (status, data) = sendHTTPRequest(node, "remote/set_corosync_conf", data)
396  	    if status != 0:
397  	        err("Unable to set corosync config: {0}".format(data))
398  	
399  	
400  	def getPacemakerNodeStatus(node):
401  	    """
402  	    Commandline options:
403  	      * --request-timeout - timeout for HTTP requests
404  	    """
405  	    return sendHTTPRequest(
406  	        node, "remote/pacemaker_node_status", None, False, False
407  	    )
408  	
409  	
410  	def startCluster(node, quiet=False, timeout=None):
411  	    """
412  	    Commandline options:
413  	      * --request-timeout - timeout for HTTP requests
414  	    """
415  	    return sendHTTPRequest(
416  	        node,
417  	        "remote/cluster_start",
418  	        printResult=False,
419  	        printSuccess=not quiet,
420  	        timeout=timeout,
421  	    )
422  	
423  	
424  	def stopPacemaker(node, quiet=False, force=True):
425  	    """
426  	    Commandline options:
427  	      * --request-timeout - timeout for HTTP requests
428  	    """
429  	    return stopCluster(
430  	        node, pacemaker=True, corosync=False, quiet=quiet, force=force
431  	    )
432  	
433  	
434  	def stopCorosync(node, quiet=False, force=True):
435  	    """
436  	    Commandline options:
437  	      * --request-timeout - timeout for HTTP requests
438  	    """
439  	    return stopCluster(
440  	        node, pacemaker=False, corosync=True, quiet=quiet, force=force
441  	    )
442  	
443  	
444  	def stopCluster(node, quiet=False, pacemaker=True, corosync=True, force=True):
445  	    """
446  	    Commandline options:
447  	      * --request-timeout - timeout for HTTP requests
448  	    """
449  	    data = {}
450  	    timeout = None
451  	    if pacemaker and not corosync:
452  	        data["component"] = "pacemaker"
453  	        timeout = 2 * 60
454  	    elif corosync and not pacemaker:
455  	        data["component"] = "corosync"
456  	    if force:
457  	        data["force"] = 1
458  	    data = urlencode(data)
459  	    return sendHTTPRequest(
460  	        node,
461  	        "remote/cluster_stop",
462  	        data,
463  	        printResult=False,
464  	        printSuccess=not quiet,
465  	        timeout=timeout,
466  	    )
467  	
468  	
469  	def enableCluster(node):
470  	    """
471  	    Commandline options:
472  	      * --request-timeout - timeout for HTTP requests
473  	    """
474  	    return sendHTTPRequest(node, "remote/cluster_enable", None, False, True)
475  	
476  	
477  	def disableCluster(node):
478  	    """
479  	    Commandline options:
480  	      * --request-timeout - timeout for HTTP requests
481  	    """
482  	    return sendHTTPRequest(node, "remote/cluster_disable", None, False, True)
483  	
484  	
485  	def destroyCluster(node, quiet=False):
486  	    """
487  	    Commandline options:
488  	      * --request-timeout - timeout for HTTP requests
489  	    """
490  	    return sendHTTPRequest(
491  	        node, "remote/cluster_destroy", None, not quiet, not quiet
492  	    )
493  	
494  	
495  	def restoreConfig(node, tarball_data):
496  	    """
497  	    Commandline options:
498  	      * --request-timeout - timeout for HTTP requests
499  	    """
500  	    data = urlencode({"tarball": tarball_data})
501  	    return sendHTTPRequest(node, "remote/config_restore", data, False, True)
502  	
503  	
504  	def pauseConfigSyncing(node, delay_seconds=300):
505  	    """
506  	    Commandline options:
507  	      * --request-timeout - timeout for HTTP requests
508  	    """
509  	    data = urlencode({"sync_thread_pause": delay_seconds})
510  	    return sendHTTPRequest(node, "remote/set_sync_options", data, False, False)
511  	
512  	
513  	# Send an HTTP request to a node return a tuple with status, data
514  	# If status is 0 then data contains server response
515  	# Otherwise if non-zero then data contains error message
516  	# Returns a tuple (error, error message)
517  	# 0 = Success,
518  	# 1 = HTTP Error
519  	# 2 = No response,
520  	# 3 = Auth Error
521  	# 4 = Permission denied
522  	def sendHTTPRequest(  # noqa: PLR0912, PLR0915
523  	    host, request, data=None, printResult=True, printSuccess=True, timeout=None
524  	):
525  	    """
526  	    Commandline options:
527  	      * --request-timeout - timeout for HTTP requests
528  	      * --debug
529  	    """
530  	    # pylint: disable=too-many-locals
531  	    # pylint: disable=too-many-statements
532  	    port = None
533  	    addr = host
534  	    token = None
535  	    known_host = read_known_hosts_file().get(host, None)
536  	    # TODO: do not allow communication with unknown host
537  	    if known_host:
538  	        port = known_host.dest.port
539  	        addr = known_host.dest.addr
540  	        token = known_host.token
541  	    if port is None:
542  	        port = settings.pcsd_default_port
543  	    url = "https://{host}:{port}/{request}".format(
544  	        host="[{0}]".format(addr) if ":" in addr else addr,
545  	        request=request,
546  	        port=port,
547  	    )
548  	    if "--debug" in pcs_options:
549  	        print_to_stderr(f"Sending HTTP Request to: {url}\nData: {data}")
550  	
551  	    def __debug_callback(data_type, debug_data):
552  	        prefixes = {
553  	            # pylint: disable=no-member
554  	            pycurl.DEBUG_TEXT: b"* ",
555  	            pycurl.DEBUG_HEADER_IN: b"< ",
556  	            pycurl.DEBUG_HEADER_OUT: b"> ",
557  	            pycurl.DEBUG_DATA_IN: b"<< ",
558  	            pycurl.DEBUG_DATA_OUT: b">> ",
559  	        }
560  	        if data_type in prefixes:
561  	            debug_output.write(prefixes[data_type])
562  	            debug_output.write(debug_data)
563  	            if not debug_data.endswith(b"\n"):
564  	                debug_output.write(b"\n")
565  	
566  	    output = BytesIO()
567  	    debug_output = BytesIO()
568  	    cookies = __get_cookie_list(token)
569  	    if not timeout:
570  	        timeout = settings.default_request_timeout
571  	    timeout = pcs_options.get("--request-timeout", timeout)
572  	
573  	    handler = pycurl.Curl()
574  	    handler.setopt(pycurl.PROTOCOLS, pycurl.PROTO_HTTPS)
575  	    handler.setopt(pycurl.URL, url.encode("utf-8"))
576  	    handler.setopt(pycurl.WRITEFUNCTION, output.write)
577  	    handler.setopt(pycurl.VERBOSE, 1)
578  	    handler.setopt(pycurl.NOSIGNAL, 1)  # required for multi-threading
579  	    handler.setopt(pycurl.DEBUGFUNCTION, __debug_callback)
580  	    handler.setopt(pycurl.TIMEOUT_MS, int(timeout * 1000))
581  	    handler.setopt(pycurl.SSL_VERIFYHOST, 0)
582  	    handler.setopt(pycurl.SSL_VERIFYPEER, 0)
583  	    handler.setopt(pycurl.HTTPHEADER, ["Expect: "])
584  	    if cookies:
585  	        handler.setopt(pycurl.COOKIE, ";".join(cookies).encode("utf-8"))
586  	    if data:
587  	        handler.setopt(pycurl.COPYPOSTFIELDS, data.encode("utf-8"))
588  	    try:
589  	        handler.perform()
590  	        response_data = output.getvalue().decode("utf-8")
591  	        response_code = handler.getinfo(pycurl.RESPONSE_CODE)
592  	        if printResult or printSuccess:
593  	            print_to_stderr(host + ": " + response_data.strip())
594  	        if "--debug" in pcs_options:
595  	            print_to_stderr(
596  	                "Response Code: {response_code}\n"
597  	                "--Debug Response Start--\n"
598  	                "{response_data}\n"
599  	                "--Debug Response End--\n"
600  	                "Communication debug info for calling: {url}\n"
601  	                "--Debug Communication Output Start--\n"
602  	                "{debug_comm_output}\n"
603  	                "--Debug Communication Output End--".format(
604  	                    response_code=response_code,
605  	                    response_data=response_data,
606  	                    url=url,
607  	                    debug_comm_output=debug_output.getvalue().decode(
608  	                        "utf-8", "ignore"
609  	                    ),
610  	                )
611  	            )
612  	
613  	        if response_code == 401:
614  	            output = (
615  	                3,
616  	                (
617  	                    "Unable to authenticate to {node} - (HTTP error: {code}), "
618  	                    "try running 'pcs host auth {node}'"
619  	                ).format(node=host, code=response_code),
620  	            )
621  	        elif response_code == 403:
622  	            output = (
623  	                4,
624  	                "{node}: Permission denied - (HTTP error: {code})".format(
625  	                    node=host, code=response_code
626  	                ),
627  	            )
628  	        elif response_code >= 400:
629  	            output = (
630  	                1,
631  	                "Error connecting to {node} - (HTTP error: {code})".format(
632  	                    node=host, code=response_code
633  	                ),
634  	            )
635  	        else:
636  	            output = (0, response_data)
637  	
638  	        if printResult and output[0] != 0:
639  	            print_to_stderr(output[1])
640  	
641  	        return output
642  	    except pycurl.error as e:
643  	        if is_proxy_set(os.environ):
644  	            reports_output.warn(
645  	                "Proxy is set in environment variables, try disabling it"
646  	            )
647  	        # pylint: disable=unbalanced-tuple-unpacking
648  	        dummy_errno, reason = e.args
649  	        if "--debug" in pcs_options:
650  	            print_to_stderr(f"Response Reason: {reason}")
651  	        msg = (
652  	            "Unable to connect to {host}, check if pcsd is running there or try "
653  	            "setting higher timeout with --request-timeout option ({reason})"
654  	        ).format(host=host, reason=reason)
655  	        if printResult:
656  	            print_to_stderr(msg)
657  	        return (2, msg)
658  	
659  	
660  	def __get_cookie_list(token):
661  	    """
662  	    Commandline options: no options
663  	    """
664  	    cookies = []
665  	    if token:
666  	        cookies.append("token=" + token)
667  	    if os.geteuid() == 0:
668  	        for name in ("CIB_user", "CIB_user_groups"):
669  	            if name in os.environ and os.environ[name].strip():
670  	                value = os.environ[name].strip()
671  	                # Let's be safe about characters in env variables and do base64.
672  	                # We cannot do it for CIB_user however to be backward compatible
673  	                # so we at least remove disallowed characters.
674  	                if name == "CIB_user":
675  	                    value = re.sub(r"[^!-~]", "", value).replace(";", "")
676  	                else:
677  	                    # python3 requires the value to be bytes not str
678  	                    value = base64.b64encode(value.encode("utf8")).decode(
679  	                        "utf-8"
680  	                    )
681  	                cookies.append("{0}={1}".format(name, value))
682  	    return cookies
683  	
684  	
685  	def get_corosync_conf_facade(conf_text=None):
686  	    """
687  	    Commandline options:
688  	      * --corosync_conf - path to a mocked corosync.conf is set directly to
689  	        settings
690  	    """
691  	    try:
692  	        return corosync_conf_facade(
693  	            corosync_conf_parser.Parser.parse(
694  	                (getCorosyncConf() if conf_text is None else conf_text).encode(
695  	                    "utf-8"
696  	                )
697  	            )
698  	        )
699  	    except corosync_conf_parser.CorosyncConfParserException as e:
700  	        return err("Unable to parse corosync.conf: %s" % e)
701  	
702  	
703  	def getNodeAttributesFromPacemaker():
704  	    """
705  	    Commandline options: no options
706  	    """
707  	    try:
708  	        return [
709  	            node.attrs
710  	            for node in ClusterState(
711  	                get_cluster_status_dom(cmd_runner())
712  	            ).node_section.nodes
713  	        ]
714  	    except LibraryError as e:
715  	        return process_library_reports(e.args)
716  	
717  	
718  	def hasCorosyncConf():
719  	    """
720  	    Commandline options:
721  	      * --corosync_conf - path to a mocked corosync.conf is set directly to
722  	        settings
723  	    """
724  	    return os.path.isfile(settings.corosync_conf_file)
725  	
726  	
727  	def getCorosyncConf():
728  	    """
729  	    Commandline options:
730  	      * --corosync_conf - path to a mocked corosync.conf is set directly to
731  	        settings
732  	    """
733  	    corosync_conf_content = None
734  	    try:
735  	        with open(
736  	            settings.corosync_conf_file, "r", encoding="utf-8"
737  	        ) as corosync_conf_file:
738  	            corosync_conf_content = corosync_conf_file.read()
739  	    except IOError as e:
740  	        err("Unable to read %s: %s" % (settings.corosync_conf_file, e.strerror))
741  	    return corosync_conf_content
742  	
743  	
744  	def reloadCorosync():
745  	    """
746  	    Commandline options: no options
747  	    """
748  	    output, retval = run(["corosync-cfgtool", "-R"])
749  	    return output, retval
750  	
751  	
752  	def getCorosyncActiveNodes():
753  	    """
754  	    Commandline options: no options
755  	    """
756  	    output, retval = run(["corosync-cmapctl"])
757  	    if retval != 0:
758  	        return []
759  	
760  	    nodename_re = re.compile(r"^nodelist\.node\.(\d+)\.name .*= (.*)", re.M)
761  	    nodestatus_re = re.compile(
762  	        r"^runtime\.members\.(\d+).status .*= (.*)", re.M
763  	    )
764  	    nodenameid_mapping_re = re.compile(
765  	        r"nodelist\.node\.(\d+)\.nodeid .*= (\d+)", re.M
766  	    )
767  	
768  	    node_names = nodename_re.findall(output)
769  	
770  	    index_to_id = dict(nodenameid_mapping_re.findall(output))
771  	    id_to_status = dict(nodestatus_re.findall(output))
772  	
773  	    node_status = {}
774  	    for index, node_name in node_names:
775  	        if index in index_to_id:
776  	            nodeid = index_to_id[index]
777  	            if nodeid in id_to_status:
778  	                node_status[node_name] = id_to_status[nodeid]
779  	        else:
780  	            print_to_stderr(f"Error mapping {node_name}")
781  	
782  	    nodes_active = []
783  	    for node, status in node_status.items():
784  	        if status == "joined":
785  	            nodes_active.append(node)
786  	
787  	    return nodes_active
788  	
789  	
790  	# is it needed to handle corosync-qdevice service when managing cluster services
791  	def need_to_handle_qdevice_service():
792  	    """
793  	    Commandline options: no options
794  	      * --corosync_conf - path to a mocked corosync.conf is set directly to
795  	        settings but it doesn't make sense for contexts in which this function
796  	        is used
797  	    """
798  	    try:
799  	        with open(settings.corosync_conf_file, "rb") as corosync_conf_file:
800  	            return (
801  	                corosync_conf_facade(
802  	                    corosync_conf_parser.Parser.parse(corosync_conf_file.read())
803  	                ).get_quorum_device_model()
804  	                is not None
805  	            )
806  	    except (EnvironmentError, corosync_conf_parser.CorosyncConfParserException):
807  	        # corosync.conf not present or not valid => no qdevice specified
808  	        return False
809  	
810  	
811  	# Restore default behavior before starting subprocesses
812  	def subprocess_setup():
813  	    signal.signal(signal.SIGPIPE, signal.SIG_DFL)
814  	
815  	
816  	def touch_cib_file(cib_filename):
817  	    if not os.path.isfile(cib_filename):
818  	        try:
819  	            write_empty_cib(cib_filename)
820  	        except EnvironmentError as e:
821  	            err(
822  	                "Unable to write to file: '{0}': '{1}'".format(
823  	                    cib_filename, str(e)
824  	                )
825  	            )
826  	
827  	
828  	# Run command, with environment and return (output, retval)
829  	# DEPRECATED, please use lib.external.CommandRunner via utils.cmd_runner()
830  	def run(
831  	    args,
832  	    ignore_stderr=False,
833  	    string_for_stdin=None,
834  	    env_extend=None,
835  	    binary_output=False,
836  	):
837  	    """
838  	    Commandline options:
839  	      * -f - CIB file (effective only for some pacemaker tools)
840  	      * --debug
841  	    """
842  	    if not env_extend:
843  	        env_extend = {}
844  	    env_var = env_extend
845  	    env_var.update(dict(os.environ))
846  	    env_var["LC_ALL"] = "C"
847  	    if usefile:
848  	        env_var["CIB_file"] = filename
849  	        touch_cib_file(filename)
850  	
851  	    command = args[0]
852  	    if command[0:3] == "crm" or command in [
853  	        "cibadmin",
854  	        "iso8601",
855  	        "stonith_admin",
856  	    ]:
857  	        args[0] = os.path.join(settings.pacemaker_execs, command)
858  	    elif command[0:8] == "corosync":
859  	        args[0] = os.path.join(settings.corosync_execs, command)
860  	
861  	    try:
862  	        if "--debug" in pcs_options:
863  	            print_to_stderr("Running: " + " ".join(args))
864  	            if string_for_stdin:
865  	                print_to_stderr(
866  	                    f"--Debug Input Start--\n"
867  	                    f"{string_for_stdin}\n"
868  	                    f"--Debug Input End--"
869  	                )
870  	
871  	        # Some commands react differently if you give them anything via stdin
872  	        if string_for_stdin is not None:
873  	            stdin_pipe = subprocess.PIPE
874  	        else:
875  	            stdin_pipe = subprocess.DEVNULL
876  	
877  	        # pylint: disable=subprocess-popen-preexec-fn, consider-using-with
878  	        p = subprocess.Popen(
879  	            args,
880  	            stdin=stdin_pipe,
881  	            stdout=subprocess.PIPE,
882  	            stderr=(subprocess.PIPE if ignore_stderr else subprocess.STDOUT),
883  	            preexec_fn=subprocess_setup,  # noqa: PLW1509
884  	            close_fds=True,
885  	            env=env_var,
886  	            # decodes newlines and in python3 also converts bytes to str
887  	            universal_newlines=(not binary_output),
888  	        )
889  	        output, dummy_stderror = p.communicate(string_for_stdin)
890  	        retval = p.returncode
891  	        if "--debug" in pcs_options:
892  	            print_to_stderr(
893  	                "Return Value: {retval}\n"
894  	                "--Debug Output Start--\n"
895  	                "{debug_output}\n"
896  	                "--Debug Output End--".format(
897  	                    retval=retval,
898  	                    debug_output=output.rstrip(),
899  	                )
900  	            )
901  	    except OSError as e:
902  	        print_to_stderr(e.strerror)
903  	        err("unable to locate command: " + args[0])
904  	
905  	    return output, retval
906  	
907  	
908  	def cmd_runner(cib_file_override=None):
909  	    """
910  	    Commandline options:
911  	      * -f - CIB file
912  	    """
913  	    env_vars = {}
914  	    if usefile:
915  	        env_vars["CIB_file"] = filename
916  	    if cib_file_override:
917  	        env_vars["CIB_file"] = cib_file_override
918  	    env_vars.update(os.environ)
919  	    env_vars["LC_ALL"] = "C"
920  	    return CommandRunner(
921  	        logging.getLogger("pcs"), get_report_processor(), env_vars
922  	    )
923  	
924  	
925  	def run_pcsdcli(command, data=None):
926  	    """
927  	    Commandline options:
928  	      * --request-timeout - timeout for HTTP request, applicable for commands:
929  	        * remove_known_hosts - only when running on cluster node (sync will
930  	            be initiated)
931  	        * auth
932  	        * send_local_configs
933  	    """
934  	    if not data:
935  	        data = {}
936  	    env_var = {}
937  	    if "--debug" in pcs_options:
938  	        env_var["PCSD_DEBUG"] = "true"
939  	    if "--request-timeout" in pcs_options:
940  	        env_var["PCSD_NETWORK_TIMEOUT"] = str(pcs_options["--request-timeout"])
941  	    else:
942  	        env_var["PCSD_NETWORK_TIMEOUT"] = str(settings.default_request_timeout)
943  	    pcsd_dir_path = settings.pcsd_exec_location
944  	    pcsdcli_path = os.path.join(pcsd_dir_path, "pcsd-cli.rb")
945  	    if settings.pcsd_gem_path is not None:
946  	        env_var["GEM_HOME"] = settings.pcsd_gem_path
947  	    stdout, dummy_stderr, retval = cmd_runner().run(
948  	        [settings.ruby_exec, "-I" + pcsd_dir_path, pcsdcli_path, command],
949  	        json.dumps(data),
950  	        env_var,
951  	    )
952  	    try:
953  	        output_json = json.loads(stdout)
954  	        for key in ["status", "text", "data"]:
955  	            if key not in output_json:
956  	                output_json[key] = None
957  	
958  	        output = "".join(output_json["log"])
959  	        # check if some requests timed out, if so print message about it
960  	        if "error: operation_timedout" in output:
961  	            print_to_stderr("Error: Operation timed out")
962  	        # check if there are any connection failures due to proxy in pcsd and
963  	        # print warning if so
964  	        proxy_msg = "Proxy is set in environment variables, try disabling it"
965  	        if proxy_msg in output:
966  	            reports_output.warn(proxy_msg)
967  	
968  	    except ValueError:
969  	        output_json = {
970  	            "status": "bad_json_output",
971  	            "text": stdout,
972  	            "data": None,
973  	        }
974  	    return output_json, retval
975  	
976  	
977  	def call_local_pcsd(argv, options, std_in=None):  # noqa: PLR0911
978  	    """
979  	    Commandline options:
980  	      * --request-timeout - timeout of call to local pcsd
981  	    """
982  	    # pylint: disable=too-many-return-statements
983  	    # some commands cannot be run under a non-root account
984  	    # so we pass those commands to locally running pcsd to execute them
985  	    # returns [list_of_errors, exit_code, stdout, stderr]
986  	    data = {
987  	        "command": json.dumps(argv),
988  	        "options": json.dumps(options),
989  	    }
990  	    if std_in:
991  	        data["stdin"] = std_in
992  	    data_send = urlencode(data)
993  	    code, output = sendHTTPRequest(
994  	        "localhost", "run_pcs", data_send, False, False
995  	    )
996  	
997  	    if code == 3:  # not authenticated
998  	        return [
999  	            [
1000 	                "Unable to authenticate against the local pcsd. Run the same "
1001 	                "command as root or authenticate yourself to the local pcsd "
1002 	                "using command 'pcs client local-auth'"
1003 	            ],
1004 	            1,
1005 	            "",
1006 	            "",
1007 	        ]
1008 	    if code != 0:  # http error connecting to localhost
1009 	        return [[output], 1, "", ""]
1010 	
1011 	    try:
1012 	        output_json = json.loads(output)
1013 	        for key in ["status", "data"]:
1014 	            if key not in output_json:
1015 	                output_json[key] = None
1016 	    except ValueError:
1017 	        return [["Unable to communicate with pcsd"], 1, "", ""]
1018 	    if output_json["status"] == "bad_command":
1019 	        return [["Command not allowed"], 1, "", ""]
1020 	    if output_json["status"] == "access_denied":
1021 	        return [["Access denied"], 1, "", ""]
1022 	    if output_json["status"] != "ok" or not output_json["data"]:
1023 	        return [["Unable to communicate with pcsd"], 1, "", ""]
1024 	    try:
1025 	        exitcode = output_json["data"]["code"]
1026 	        std_out = output_json["data"]["stdout"]
1027 	        std_err = output_json["data"]["stderr"]
1028 	        return [[], exitcode, std_out, std_err]
1029 	    except KeyError:
1030 	        return [["Unable to communicate with pcsd"], 1, "", ""]
1031 	
1032 	
1033 	def map_for_error_list(callab, iterab):
1034 	    """
1035 	    Commandline options: no options
1036 	    NOTE: callback 'callab' may use some options
1037 	    """
1038 	    error_list = []
1039 	    for item in iterab:
1040 	        retval, error = callab(item)
1041 	        if retval != 0:
1042 	            error_list.append(error)
1043 	    return error_list
1044 	
1045 	
1046 	def run_parallel(worker_list, wait_seconds=1):
1047 	    """
1048 	    Commandline options: no options
1049 	    """
1050 	    thread_list = set()
1051 	    for worker in worker_list:
1052 	        thread = threading.Thread(target=worker)
1053 	        thread.daemon = True
1054 	        thread.start()
1055 	        thread_list.add(thread)
1056 	
1057 	    while thread_list:
1058 	        thread = thread_list.pop()
1059 	        thread.join(wait_seconds)
1060 	        if thread.is_alive():
1061 	            thread_list.add(thread)
1062 	
1063 	
1064 	def create_task(report, action, node, *args, **kwargs):
1065 	    """
1066 	    Commandline options: no options
1067 	    """
1068 	
1069 	    def worker():
1070 	        returncode, output = action(node, *args, **kwargs)
1071 	        report(node, returncode, output)
1072 	
1073 	    return worker
1074 	
1075 	
1076 	def create_task_list(report, action, node_list, *args, **kwargs):
1077 	    """
1078 	    Commandline options: no options
1079 	    """
1080 	    return [
1081 	        create_task(report, action, node, *args, **kwargs) for node in node_list
1082 	    ]
1083 	
1084 	
1085 	def parallel_for_nodes(action, node_list, *args, **kwargs):
1086 	    """
1087 	    Commandline options: no options
1088 	    NOTE: callback 'action' may use some cmd options
1089 	    """
1090 	    node_errors = {}
1091 	
1092 	    def report(node, returncode, output):
1093 	        message = "{0}: {1}".format(node, output.strip())
1094 	        print_to_stderr(message)
1095 	        if returncode != 0:
1096 	            node_errors[node] = message
1097 	
1098 	    run_parallel(create_task_list(report, action, node_list, *args, **kwargs))
1099 	    return node_errors
1100 	
1101 	
1102 	def get_group_children(group_id):
1103 	    """
1104 	    Commandline options: no options
1105 	    """
1106 	    return dom_get_group_children(get_cib_dom(), group_id)
1107 	
1108 	
1109 	def dom_get_group_children(dom, group_id):
1110 	    groups = dom.getElementsByTagName("group")
1111 	    for g in groups:
1112 	        if g.getAttribute("id") == group_id:
1113 	            return [
1114 	                child_el.getAttribute("id")
1115 	                for child_el in get_group_children_el_from_el(g)
1116 	            ]
1117 	    return []
1118 	
1119 	
1120 	def get_group_children_el_from_el(group_el):
1121 	    child_resources = []
1122 	    for child in group_el.childNodes:
1123 	        if child.nodeType != xml.dom.minidom.Node.ELEMENT_NODE:
1124 	            continue
1125 	        if child.tagName == "primitive":
1126 	            child_resources.append(child)
1127 	    return child_resources
1128 	
1129 	
1130 	def dom_get_clone_ms_resource(dom, clone_ms_id):
1131 	    """
1132 	    Commandline options: no options
1133 	    """
1134 	    clone_ms = dom_get_clone(dom, clone_ms_id) or dom_get_master(
1135 	        dom, clone_ms_id
1136 	    )
1137 	    if clone_ms:
1138 	        return dom_elem_get_clone_ms_resource(clone_ms)
1139 	    return None
1140 	
1141 	
1142 	def dom_elem_get_clone_ms_resource(clone_ms):
1143 	    """
1144 	    Commandline options: no options
1145 	    """
1146 	    for child in clone_ms.childNodes:
1147 	        if (
1148 	            child.nodeType == xml.dom.minidom.Node.ELEMENT_NODE
1149 	            and child.tagName in ["group", "primitive"]
1150 	        ):
1151 	            return child
1152 	    return None
1153 	
1154 	
1155 	def dom_get_resource_clone_ms_parent(dom, resource_id):
1156 	    """
1157 	    Commandline options: no options
1158 	    """
1159 	    resource = dom_get_resource(dom, resource_id) or dom_get_group(
1160 	        dom, resource_id
1161 	    )
1162 	    if resource:
1163 	        return dom_get_parent_by_tag_names(resource, ["clone", "master"])
1164 	    return None
1165 	
1166 	
1167 	def dom_get_resource_bundle_parent(dom, resource_id):
1168 	    """
1169 	    Commandline options: no options
1170 	    """
1171 	    resource = dom_get_resource(dom, resource_id)
1172 	    if resource:
1173 	        return dom_get_parent_by_tag_names(resource, ["bundle"])
1174 	    return None
1175 	
1176 	
1177 	def dom_get_master(dom, master_id):
1178 	    """
1179 	    Commandline options: no options
1180 	    """
1181 	    for master in dom.getElementsByTagName("master"):
1182 	        if master.getAttribute("id") == master_id:
1183 	            return master
1184 	    return None
1185 	
1186 	
1187 	def dom_get_clone(dom, clone_id):
1188 	    """
1189 	    Commandline options: no options
1190 	    """
1191 	    for clone in dom.getElementsByTagName("clone"):
1192 	        if clone.getAttribute("id") == clone_id:
1193 	            return clone
1194 	    return None
1195 	
1196 	
1197 	def dom_get_group(dom, group_id):
1198 	    """
1199 	    Commandline options: no options
1200 	    """
1201 	    for group in dom.getElementsByTagName("group"):
1202 	        if group.getAttribute("id") == group_id:
1203 	            return group
1204 	    return None
1205 	
1206 	
1207 	def dom_get_bundle(dom, bundle_id):
1208 	    """
1209 	    Commandline options: no options
1210 	    """
1211 	    for bundle in dom.getElementsByTagName("bundle"):
1212 	        if bundle.getAttribute("id") == bundle_id:
1213 	            return bundle
1214 	    return None
1215 	
1216 	
1217 	def dom_get_resource_bundle(bundle_el):
1218 	    """
1219 	    Commandline options: no options
1220 	    """
1221 	    for child in bundle_el.childNodes:
1222 	        if (
1223 	            child.nodeType == xml.dom.minidom.Node.ELEMENT_NODE
1224 	            and child.tagName == "primitive"
1225 	        ):
1226 	            return child
1227 	    return None
1228 	
1229 	
1230 	def dom_get_group_clone(dom, group_id):
1231 	    """
1232 	    Commandline options: no options
1233 	    """
1234 	    for clone in dom.getElementsByTagName("clone"):
1235 	        group = dom_get_group(clone, group_id)
1236 	        if group:
1237 	            return group
1238 	    return None
1239 	
1240 	
1241 	def dom_get_group_masterslave(dom, group_id):
1242 	    """
1243 	    Commandline options: no options
1244 	    """
1245 	    for master in dom.getElementsByTagName("master"):
1246 	        group = dom_get_group(master, group_id)
1247 	        if group:
1248 	            return group
1249 	    return None
1250 	
1251 	
1252 	def dom_get_resource(dom, resource_id):
1253 	    """
1254 	    Commandline options: no options
1255 	    """
1256 	    for primitive in dom.getElementsByTagName("primitive"):
1257 	        if primitive.getAttribute("id") == resource_id:
1258 	            return primitive
1259 	    return None
1260 	
1261 	
1262 	def dom_get_any_resource(dom, resource_id):
1263 	    """
1264 	    Commandline options: no options
1265 	    """
1266 	    return (
1267 	        dom_get_resource(dom, resource_id)
1268 	        or dom_get_group(dom, resource_id)
1269 	        or dom_get_clone(dom, resource_id)
1270 	        or dom_get_master(dom, resource_id)
1271 	    )
1272 	
1273 	
1274 	def dom_get_resource_clone(dom, resource_id):
1275 	    """
1276 	    Commandline options: no options
1277 	    """
1278 	    for clone in dom.getElementsByTagName("clone"):
1279 	        resource = dom_get_resource(clone, resource_id)
1280 	        if resource:
1281 	            return resource
1282 	    return None
1283 	
1284 	
1285 	def dom_get_resource_masterslave(dom, resource_id):
1286 	    """
1287 	    Commandline options: no options
1288 	    """
1289 	    for master in dom.getElementsByTagName("master"):
1290 	        resource = dom_get_resource(master, resource_id)
1291 	        if resource:
1292 	            return resource
1293 	    return None
1294 	
1295 	
1296 	# returns tuple (is_valid, error_message, correct_resource_id_if_exists)
1297 	# there is a duplicate code in pcs/lib/cib/constraint/constraint.py
1298 	# please use function in pcs/lib/cib/constraint/constraint.py
1299 	def validate_constraint_resource(dom, resource_id):  # noqa: PLR0911
1300 	    """
1301 	    Commandline options:
1302 	      * --force - allow constraint on any resource
1303 	    """
1304 	    # pylint: disable=too-many-return-statements
1305 	    resource_el = (
1306 	        dom_get_clone(dom, resource_id)
1307 	        or dom_get_master(dom, resource_id)
1308 	        or dom_get_bundle(dom, resource_id)
1309 	    )
1310 	    if resource_el:
1311 	        # clones, masters and bundles are always valid
1312 	        return True, "", resource_id
1313 	
1314 	    resource_el = dom_get_resource(dom, resource_id) or dom_get_group(
1315 	        dom, resource_id
1316 	    )
1317 	    if not resource_el:
1318 	        return False, "Resource '%s' does not exist" % resource_id, None
1319 	
1320 	    clone_el = dom_get_resource_clone_ms_parent(
1321 	        dom, resource_id
1322 	    ) or dom_get_resource_bundle_parent(dom, resource_id)
1323 	    if not clone_el:
1324 	        # a primitive and a group is valid if not in a clone nor a master nor a
1325 	        # bundle
1326 	        return True, "", resource_id
1327 	
1328 	    if "--force" in pcs_options:
1329 	        return True, "", clone_el.getAttribute("id")
1330 	
1331 	    if clone_el.tagName in ["clone", "master"]:
1332 	        return (
1333 	            False,
1334 	            "%s is a clone resource, you should use the clone id: %s "
1335 	            "when adding constraints. Use --force to override."
1336 	            % (resource_id, clone_el.getAttribute("id")),
1337 	            clone_el.getAttribute("id"),
1338 	        )
1339 	    if clone_el.tagName == "bundle":
1340 	        return (
1341 	            False,
1342 	            "%s is a bundle resource, you should use the bundle id: %s "
1343 	            "when adding constraints. Use --force to override."
1344 	            % (resource_id, clone_el.getAttribute("id")),
1345 	            clone_el.getAttribute("id"),
1346 	        )
1347 	    return True, "", resource_id
1348 	
1349 	
1350 	def validate_resources_not_in_same_group(dom, resource_id1, resource_id2):
1351 	    resource_el1 = dom_get_resource(dom, resource_id1)
1352 	    resource_el2 = dom_get_resource(dom, resource_id2)
1353 	    if not resource_el1 or not resource_el2:
1354 	        # Only primitive resources can be in a group. If at least one of the
1355 	        # resources is not a primitive (resource_el is None), then the
1356 	        # resources are not in the same group.
1357 	        return True
1358 	    group1 = dom_get_parent_by_tag_names(resource_el1, ["group"])
1359 	    group2 = dom_get_parent_by_tag_names(resource_el2, ["group"])
1360 	    if not group1 or not group2:
1361 	        return True
1362 	    return group1 != group2
1363 	
1364 	
1365 	def dom_get_resource_remote_node_name(dom_resource):
1366 	    """
1367 	    Commandline options: no options
1368 	    """
1369 	    if dom_resource.tagName != "primitive":
1370 	        return None
1371 	    if (
1372 	        dom_resource.getAttribute("class").lower() == "ocf"
1373 	        and dom_resource.getAttribute("provider").lower() == "pacemaker"
1374 	        and dom_resource.getAttribute("type").lower() == "remote"
1375 	    ):
1376 	        return dom_resource.getAttribute("id")
1377 	    return dom_get_meta_attr_value(dom_resource, "remote-node")
1378 	
1379 	
1380 	def dom_get_meta_attr_value(dom_resource, meta_name):
1381 	    """
1382 	    Commandline options: no options
1383 	    """
1384 	    for meta in dom_resource.getElementsByTagName("meta_attributes"):
1385 	        for nvpair in meta.getElementsByTagName("nvpair"):
1386 	            if nvpair.getAttribute("name") == meta_name:
1387 	                return nvpair.getAttribute("value")
1388 	    return None
1389 	
1390 	
1391 	def dom_get_node(dom, node_name):
1392 	    """
1393 	    Commandline options: no options
1394 	    """
1395 	    for e in dom.getElementsByTagName("node"):
1396 	        if e.hasAttribute("uname") and e.getAttribute("uname") == node_name:
1397 	            return e
1398 	    return None
1399 	
1400 	
1401 	def _dom_get_children_by_tag_name(dom_el, tag_name):
1402 	    """
1403 	    Commandline options: no options
1404 	    """
1405 	    return [
1406 	        node
1407 	        for node in dom_el.childNodes
1408 	        if node.nodeType == xml.dom.minidom.Node.ELEMENT_NODE
1409 	        and node.tagName == tag_name
1410 	    ]
1411 	
1412 	
1413 	def dom_get_parent_by_tag_names(dom_el, tag_names):
1414 	    """
1415 	    Commandline options: no options
1416 	    """
1417 	    parent = dom_el.parentNode
1418 	    while parent:
1419 	        if not isinstance(parent, xml.dom.minidom.Element):
1420 	            return None
1421 	        if parent.tagName in tag_names:
1422 	            return parent
1423 	        parent = parent.parentNode
1424 	    return None
1425 	
1426 	
1427 	# moved to pcs.lib.pacemaker.state
1428 	def get_resource_for_running_check(cluster_state, resource_id, stopped=False):
1429 	    """
1430 	    Commandline options: no options
1431 	    """
1432 	
1433 	    def _isnum(value):
1434 	        return all(char in list("0123456789") for char in value)
1435 	
1436 	    # pylint: disable=too-many-nested-blocks
1437 	    for clone in cluster_state.getElementsByTagName("clone"):
1438 	        if clone.getAttribute("id") == resource_id:
1439 	            for child in clone.childNodes:
1440 	                if child.nodeType == child.ELEMENT_NODE and child.tagName in [
1441 	                    "resource",
1442 	                    "group",
1443 	                ]:
1444 	                    resource_id = child.getAttribute("id")
1445 	                    # in a clone, a resource can have an id of '<name>:N'
1446 	                    if ":" in resource_id:
1447 	                        parts = resource_id.rsplit(":", 1)
1448 	                        if _isnum(parts[1]):
1449 	                            resource_id = parts[0]
1450 	                    break
1451 	    for group in cluster_state.getElementsByTagName("group"):
1452 	        # If resource is a clone it can have an id of '<resource name>:N'
1453 	        if group.getAttribute("id") == resource_id or group.getAttribute(
1454 	            "id"
1455 	        ).startswith(resource_id + ":"):
1456 	            if stopped:
1457 	                elem = group.getElementsByTagName("resource")[0]
1458 	            else:
1459 	                elem = group.getElementsByTagName("resource")[-1]
1460 	            resource_id = elem.getAttribute("id")
1461 	    return resource_id
1462 	
1463 	
1464 	# moved to pcs.lib.pacemaker.state
1465 	# see pcs.lib.commands.resource for usage
1466 	def resource_running_on(resource, passed_state=None, stopped=False):
1467 	    """
1468 	    Commandline options:
1469 	      * -f - has effect but doesn't make sense to check state of resource
1470 	    """
1471 	    # pylint: disable=too-many-locals
1472 	    nodes_started = []
1473 	    nodes_promoted = []
1474 	    nodes_unpromoted = []
1475 	    state = passed_state if passed_state else getClusterState()
1476 	    resource_original = resource
1477 	    resource = get_resource_for_running_check(state, resource, stopped)
1478 	    resources = state.getElementsByTagName("resource")
1479 	    for res in resources:
1480 	        # If resource is a clone it can have an id of '<resource name>:N'
1481 	        # If resource is a clone it will be found more than once - cannot break
1482 	        if (
1483 	            res.getAttribute("id") == resource
1484 	            or res.getAttribute("id").startswith(resource + ":")
1485 	        ) and res.getAttribute("failed") != "true":
1486 	            for node in res.getElementsByTagName("node"):
1487 	                node_name = node.getAttribute("name")
1488 	                role = res.getAttribute("role")
1489 	                if role == const.PCMK_ROLE_STARTED:
1490 	                    nodes_started.append(node_name)
1491 	                elif role in (
1492 	                    const.PCMK_ROLE_PROMOTED,
1493 	                    const.PCMK_ROLE_PROMOTED_LEGACY,
1494 	                ):
1495 	                    nodes_promoted.append(node_name)
1496 	                elif role in (
1497 	                    const.PCMK_ROLE_UNPROMOTED,
1498 	                    const.PCMK_ROLE_UNPROMOTED_LEGACY,
1499 	                ):
1500 	                    nodes_unpromoted.append(node_name)
1501 	    if not nodes_started and not nodes_promoted and not nodes_unpromoted:
1502 	        message = "Resource '%s' is not running on any node" % resource_original
1503 	    else:
1504 	        message_parts = []
1505 	        for alist, label in (
1506 	            (nodes_started, "running"),
1507 	            (nodes_promoted, str(const.PCMK_ROLE_PROMOTED).lower()),
1508 	            (nodes_unpromoted, str(const.PCMK_ROLE_UNPROMOTED).lower()),
1509 	        ):
1510 	            if alist:
1511 	                alist.sort()
1512 	                message_parts.append(
1513 	                    "%s on node%s %s"
1514 	                    % (label, "s" if len(alist) > 1 else "", ", ".join(alist))
1515 	                )
1516 	        message = "Resource '%s' is %s." % (
1517 	            resource_original,
1518 	            "; ".join(message_parts),
1519 	        )
1520 	    return {
1521 	        "message": message,
1522 	        "is_running": bool(nodes_started or nodes_promoted or nodes_unpromoted),
1523 	    }
1524 	
1525 	
1526 	def validate_wait_get_timeout(need_cib_support=True):
1527 	    """
1528 	    Commandline options:
1529 	      * --wait
1530 	      * -f - to check if -f and --wait are not used simultaneously
1531 	    """
1532 	    if need_cib_support and usefile:
1533 	        err("Cannot use '-f' together with '--wait'")
1534 	    wait_timeout = pcs_options["--wait"]
1535 	    if wait_timeout is None:
1536 	        return wait_timeout
1537 	    wait_timeout = timeout_to_seconds(wait_timeout)
1538 	    if wait_timeout is None:
1539 	        err(
1540 	            "%s is not a valid number of seconds to wait"
1541 	            % pcs_options["--wait"]
1542 	        )
1543 	    return wait_timeout
1544 	
1545 	
1546 	# Return matches from the CIB with the xpath_query
1547 	def get_cib_xpath(xpath_query):
1548 	    """
1549 	    Commandline options:
1550 	      * -f - CIB file
1551 	    """
1552 	    args = ["cibadmin", "-Q", "--xpath", xpath_query]
1553 	    output, retval = run(args)
1554 	    if retval != 0:
1555 	        return ""
1556 	    return output
1557 	
1558 	
1559 	def get_cib(scope=None):
1560 	    """
1561 	    Commandline options:
1562 	      * -f - CIB file
1563 	    """
1564 	    command = ["cibadmin", "-l", "-Q"]
1565 	    if scope:
1566 	        command.append("--scope=%s" % scope)
1567 	    output, retval = run(command)
1568 	    if retval != 0:
1569 	        if retval == 105 and scope:
1570 	            err("unable to get cib, scope '%s' not present in cib" % scope)
1571 	        else:
1572 	            err("unable to get cib")
1573 	    return output
1574 	
1575 	
1576 	def get_cib_dom(cib_xml=None):
1577 	    """
1578 	    Commandline options:
1579 	      * -f - CIB file
1580 	    """
1581 	    if cib_xml is None:
1582 	        cib_xml = get_cib()
1583 	    try:
1584 	        return parseString(cib_xml)
1585 	    except xml.parsers.expat.ExpatError:
1586 	        return err("unable to get cib")
1587 	
1588 	
1589 	# Replace only configuration section of cib with dom passed
1590 	def replace_cib_configuration(dom):
1591 	    """
1592 	    Commandline options:
1593 	      * -f - CIB file
1594 	    """
1595 	    new_dom = dom.toxml() if hasattr(dom, "toxml") else dom
1596 	    cmd = ["cibadmin", "--replace", "-V", "--xml-pipe", "-o", "configuration"]
1597 	    output, retval = run(cmd, False, new_dom)
1598 	    if retval != 0:
1599 	        err("Unable to update cib\n" + output)
1600 	
1601 	
1602 	def is_valid_cib_scope(scope):
1603 	    """
1604 	    Commandline options: no options
1605 	    """
1606 	    return scope in [
1607 	        "acls",
1608 	        "alerts",
1609 	        "configuration",
1610 	        "constraints",
1611 	        "crm_config",
1612 	        "fencing-topology",
1613 	        "nodes",
1614 	        "op_defaults",
1615 	        "resources",
1616 	        "rsc_defaults",
1617 	        "tags",
1618 	    ]
1619 	
1620 	
1621 	# Checks to see if id exists in the xml dom passed
1622 	# DEPRECATED use lxml version available in pcs.lib.cib.tools
1623 	def does_id_exist(dom, check_id):  # noqa: PLR0912
1624 	    """
1625 	    Commandline options: no options
1626 	    """
1627 	    # do not search in /cib/status, it may contain references to previously
1628 	    # existing and deleted resources and thus preventing creating them again
1629 	    document = (
1630 	        dom if isinstance(dom, xml.dom.minidom.Document) else dom.ownerDocument
1631 	    )
1632 	    cib_found = False
1633 	    for cib in _dom_get_children_by_tag_name(document, "cib"):
1634 	        cib_found = True
1635 	        for section in cib.childNodes:
1636 	            if section.nodeType != xml.dom.minidom.Node.ELEMENT_NODE:
1637 	                continue
1638 	            if section.tagName == "status":
1639 	                continue
1640 	            for elem in section.getElementsByTagName("*"):
1641 	                if elem.getAttribute("id") == check_id:
1642 	                    return True
1643 	    if not cib_found:
1644 	        for elem in document.getElementsByTagName("*"):
1645 	            if elem.getAttribute("id") == check_id:
1646 	                return True
1647 	    return False
1648 	
1649 	
1650 	# Returns check_id if it doesn't exist in the dom, otherwise it adds an integer
1651 	# to the end of the id and increments it until a unique id is found
1652 	# DEPRECATED use lxml version available in pcs.lib.cib.tools
1653 	def find_unique_id(dom, check_id):
1654 	    """
1655 	    Commandline options: no options
1656 	    """
1657 	    counter = 1
1658 	    temp_id = check_id
1659 	    while does_id_exist(dom, temp_id):
1660 	        temp_id = check_id + "-" + str(counter)
1661 	        counter += 1
1662 	    return temp_id
1663 	
1664 	
1665 	# Checks to see if the specified operation already exists in passed set of
1666 	# operations
1667 	# pacemaker differentiates between operations only by name and interval
1668 	def operation_exists(operations_el, op_el):
1669 	    """
1670 	    Commandline options: no options
1671 	    """
1672 	    op_name = op_el.getAttribute("name")
1673 	    op_interval = timeout_to_seconds_legacy(op_el.getAttribute("interval"))
1674 	    return [
1675 	        op
1676 	        for op in operations_el.getElementsByTagName("op")
1677 	        if (
1678 	            op.getAttribute("name") == op_name
1679 	            and timeout_to_seconds_legacy(op.getAttribute("interval"))
1680 	            == op_interval
1681 	        )
1682 	    ]
1683 	
1684 	
1685 	def operation_exists_by_name(operations_el, op_el):
1686 	    """
1687 	    Commandline options: no options
1688 	    """
1689 	
1690 	    def get_role(_el, new_roles_supported):
1691 	        return common_pacemaker.role.get_value_for_cib(
1692 	            _el.getAttribute("role") or const.PCMK_ROLE_STARTED,
1693 	            new_roles_supported,
1694 	        )
1695 	
1696 	    new_roles_supported = isCibVersionSatisfied(
1697 	        operations_el, const.PCMK_NEW_ROLES_CIB_VERSION
1698 	    )
1699 	    existing = []
1700 	    op_name = op_el.getAttribute("name")
1701 	    op_role = get_role(op_el, new_roles_supported)
1702 	    ocf_check_level = None
1703 	    if op_name == "monitor":
1704 	        ocf_check_level = get_operation_ocf_check_level(op_el)
1705 	
1706 	    for op in operations_el.getElementsByTagName("op"):
1707 	        if op.getAttribute("name") == op_name:
1708 	            if (
1709 	                op_name != "monitor"
1710 	                or get_role(op, new_roles_supported) == op_role
1711 	                and ocf_check_level == get_operation_ocf_check_level(op)
1712 	            ):
1713 	                existing.append(op)
1714 	    return existing
1715 	
1716 	
1717 	def get_operation_ocf_check_level(operation_el):
1718 	    """
1719 	    Commandline options: no options
1720 	    """
1721 	    for attr_el in operation_el.getElementsByTagName("instance_attributes"):
1722 	        for nvpair_el in attr_el.getElementsByTagName("nvpair"):
1723 	            if (
1724 	                nvpair_el.getAttribute("name")
1725 	                == OCF_CHECK_LEVEL_INSTANCE_ATTRIBUTE_NAME
1726 	            ):
1727 	                return nvpair_el.getAttribute("value")
1728 	    return None
1729 	
1730 	
1731 	def set_node_attribute(prop, value, node):
1732 	    """
1733 	    Commandline options:
1734 	      * -f - CIB file
1735 	      * --force - no error if attribute to delete doesn't exist
1736 	    """
1737 	    if value == "":
1738 	        o, r = run(
1739 	            [
1740 	                "crm_attribute",
1741 	                "-t",
1742 	                "nodes",
1743 	                "--node",
1744 	                node,
1745 	                "--name",
1746 	                prop,
1747 	                "--query",
1748 	            ]
1749 	        )
1750 	        if r != 0 and "--force" not in pcs_options:
1751 	            err(
1752 	                "attribute: '%s' doesn't exist for node: '%s'" % (prop, node),
1753 	                False,
1754 	            )
1755 	            # This return code is used by pcsd
1756 	            sys.exit(2)
1757 	        o, r = run(
1758 	            [
1759 	                "crm_attribute",
1760 	                "-t",
1761 	                "nodes",
1762 	                "--node",
1763 	                node,
1764 	                "--name",
1765 	                prop,
1766 	                "--delete",
1767 	            ]
1768 	        )
1769 	    else:
1770 	        o, r = run(
1771 	            [
1772 	                "crm_attribute",
1773 	                "-t",
1774 	                "nodes",
1775 	                "--node",
1776 	                node,
1777 	                "--name",
1778 	                prop,
1779 	                "--update",
1780 	                value,
1781 	            ]
1782 	        )
1783 	
1784 	    if r != 0:
1785 	        err("unable to set attribute %s\n%s" % (prop, o))
1786 	
1787 	
1788 	def getTerminalSize(fd=1):
1789 	    """
1790 	    Returns height and width of current terminal. First tries to get
1791 	    size via termios.TIOCGWINSZ, then from environment. Defaults to 25
1792 	    lines x 80 columns if both methods fail.
1793 	
1794 	    :param fd: file descriptor (default: 1=stdout)
1795 	
1796 	    Commandline options: no options
1797 	    """
1798 	    try:
1799 	        hw = struct.unpack(
1800 	            str("hh"), fcntl.ioctl(fd, termios.TIOCGWINSZ, "1234")
1801 	        )
1802 	    except OSError:
1803 	        try:
1804 	            hw = (os.environ["LINES"], os.environ["COLUMNS"])
1805 	        except KeyError:
1806 	            hw = (25, 80)
1807 	    return hw
1808 	
1809 	
1810 	def get_terminal_input(message=None):
1811 	    """
1812 	    Commandline options: no options
1813 	    """
1814 	    if message:
1815 	        sys.stdout.write(message)
1816 	        sys.stdout.flush()
1817 	    try:
1818 	        return input("")
1819 	    except EOFError:
1820 	        return ""
1821 	    except KeyboardInterrupt:
1822 	        print("Interrupted")
1823 	        sys.exit(1)
1824 	
1825 	
1826 	def _get_continue_confirmation_interactive(warning_text: str) -> bool:
1827 	    """
1828 	    Warns user and asks for permission to continue. Returns True if user wishes
1829 	    to continue, False otherwise.
1830 	
1831 	    This function is mostly intended for prompting user to confirm destructive
1832 	    operations - that's why WARNING is in all caps here and user is asked to
1833 	    explicitly type 'yes' or 'y' to continue.
1834 	
1835 	    warning_text -- describes action that we want the user to confirm
1836 	    """
1837 	    print(f"WARNING: {warning_text}")
1838 	    print("Type 'yes' or 'y' to proceed, anything else to cancel: ", end="")
1839 	    response = get_terminal_input()
1840 	    if response in ["yes", "y"]:
1841 	        return True
1842 	    print("Canceled")
1843 	    return False
1844 	
1845 	
1846 	def is_run_interactive() -> bool:
1847 	    """
1848 	    Return True if pcs is running in an interactive environment, False otherwise
1849 	    """
1850 	    return (
1851 	        sys.stdin is not None
1852 	        and sys.stdout is not None
1853 	        and sys.stdin.isatty()
1854 	        and sys.stdout.isatty()
1855 	    )
1856 	
1857 	
1858 	def get_continue_confirmation(
1859 	    warning_text: str, yes: bool, force: bool
1860 	) -> bool:
1861 	    """
1862 	    Either asks user to confirm continuation interactively or use --yes to
1863 	    override when running from a script. Returns True if user wants to continue.
1864 	    Returns False if user cancels the action. If a non-interactive environment
1865 	    is detected, pcs exits with an error formed from warning_text.
1866 	
1867 	    warning_text -- describes action that we want the user to confirm
1868 	    yes -- was --yes flag provided?
1869 	    force -- was --force flag provided? (deprecated)
1870 	    """
1871 	    if force and not yes:
1872 	        # Force may be specified for overriding library errors. We don't want
1873 	        # to report an issue in that case.
1874 	        # deprecated in the first pcs-0.12 version
1875 	        reports_output.deprecation_warning(
1876 	            "Using --force to confirm this action is deprecated and might be "
1877 	            "removed in a future release, use --yes instead"
1878 	        )
1879 	    if yes or force:
1880 	        reports_output.warn(warning_text)
1881 	        return True
1882 	    if not is_run_interactive():
1883 	        err(f"{warning_text}, use --yes to override")
1884 	        return False
1885 	    return _get_continue_confirmation_interactive(warning_text)
1886 	
1887 	
1888 	def get_terminal_password(message="Password: "):
1889 	    """
1890 	    Commandline options: no options
1891 	    """
1892 	    if sys.stdin is not None and sys.stdin.isatty():
1893 	        try:
1894 	            return getpass.getpass(message)
1895 	        except KeyboardInterrupt:
1896 	            print("Interrupted")
1897 	            sys.exit(1)
1898 	    else:
1899 	        return get_terminal_input(message)
1900 	
1901 	
1902 	# Returns an xml dom containing the current status of the cluster
1903 	# DEPRECATED, please use
1904 	# ClusterState(lib.pacemaker.live.get_cluster_status_dom()) instead
1905 	def getClusterState():
1906 	    """
1907 	    Commandline options:
1908 	      * -f - CIB file
1909 	    """
1910 	    xml_string, returncode = run(
1911 	        ["crm_mon", "--one-shot", "--output-as=xml", "--inactive"],
1912 	        ignore_stderr=True,
1913 	    )
1914 	    if returncode != 0:
1915 	        err("error running crm_mon, is pacemaker running?")
(1) Event Sigma main event: The application uses Python's built in `xml` module which does not properly handle erroneous or maliciously constructed data, making the application vulnerable to one or more types of XML attacks.
(2) Event remediation: Avoid using the `xml` module. Consider using the `defusedxml` module or similar which safely prevents all XML entity attacks.
1916 	    return parseString(xml_string)
1917 	
1918 	
1919 	def write_empty_cib(cibfile):
1920 	    """
1921 	    Commandline options: no options
1922 	    """
1923 	    empty_xml = """
1924 	        <cib admin_epoch="0" epoch="1" num_updates="1" validate-with="pacemaker-3.1">
1925 	          <configuration>
1926 	            <crm_config/>
1927 	            <nodes/>
1928 	            <resources/>
1929 	            <constraints/>
1930 	          </configuration>
1931 	          <status/>
1932 	        </cib>
1933 	    """
1934 	    with open(cibfile, "w") as f:
1935 	        f.write(empty_xml)
1936 	
1937 	
1938 	# Test if 'var' is a score or option (contains an '=')
1939 	def is_score_or_opt(var):
1940 	    """
1941 	    Commandline options: no options
1942 	    """
1943 	    if is_score(var):
1944 	        return True
1945 	    return var.find("=") != -1
1946 	
1947 	
1948 	def is_score(var):
1949 	    """
1950 	    Commandline options: no options
1951 	    """
1952 	    return is_score_value(var)
1953 	
1954 	
1955 	def validate_xml_id(var: str, description: str = "id") -> Tuple[bool, str]:
1956 	    """
1957 	    Commandline options: no options
1958 	    """
1959 	    report_list: ReportItemList = []
1960 	    validate_id(var, description, report_list)
1961 	    if report_list:
1962 	        return False, report_list[0].message.message
1963 	    return True, ""
1964 	
1965 	
1966 	def err(errorText: str, exit_after_error: bool = True) -> None:
1967 	    retval = reports_output.error(errorText)
1968 	    if exit_after_error:
1969 	        raise retval
1970 	
1971 	
1972 	@lru_cache(typed=True)
1973 	def get_service_manager() -> ServiceManagerInterface:
1974 	    return _get_service_manager(cmd_runner(), get_report_processor())
1975 	
1976 	
1977 	def enableServices():
1978 	    """
1979 	    Commandline options: no options
1980 	    """
1981 	    # do NOT handle SBD in here, it is started by pacemaker not systemd or init
1982 	    service_list = ["corosync", "pacemaker"]
1983 	    if need_to_handle_qdevice_service():
1984 	        service_list.append("corosync-qdevice")
1985 	    service_manager = get_service_manager()
1986 	
1987 	    report_item_list = []
1988 	    for service in service_list:
1989 	        try:
1990 	            service_manager.enable(service)
1991 	        except ManageServiceError as e:
1992 	            report_item_list.append(service_exception_to_report(e))
1993 	    if report_item_list:
1994 	        raise LibraryError(*report_item_list)
1995 	
1996 	
1997 	def disableServices():
1998 	    """
1999 	    Commandline options: no options
2000 	    """
2001 	    # do NOT handle SBD in here, it is started by pacemaker not systemd or init
2002 	    service_list = ["corosync", "pacemaker"]
2003 	    if need_to_handle_qdevice_service():
2004 	        service_list.append("corosync-qdevice")
2005 	    service_manager = get_service_manager()
2006 	
2007 	    report_item_list = []
2008 	    for service in service_list:
2009 	        try:
2010 	            service_manager.disable(service)
2011 	        except ManageServiceError as e:
2012 	            report_item_list.append(service_exception_to_report(e))
2013 	    if report_item_list:
2014 	        raise LibraryError(*report_item_list)
2015 	
2016 	
2017 	def start_service(service):
2018 	    """
2019 	    Commandline options: no options
2020 	    """
2021 	    service_manager = get_service_manager()
2022 	
2023 	    try:
2024 	        service_manager.start(service)
2025 	    except ManageServiceError as e:
2026 	        raise LibraryError(service_exception_to_report(e)) from e
2027 	
2028 	
2029 	def stop_service(service):
2030 	    """
2031 	    Commandline options: no options
2032 	    """
2033 	    service_manager = get_service_manager()
2034 	
2035 	    try:
2036 	        service_manager.stop(service)
2037 	    except ManageServiceError as e:
2038 	        raise LibraryError(service_exception_to_report(e)) from e
2039 	
2040 	
2041 	def write_file(path, data, permissions=0o644, binary=False):
2042 	    """
2043 	    Commandline options:
2044 	      * --force - overwrite a file if it already exists
2045 	    """
2046 	    if os.path.exists(path):
2047 	        if "--force" not in pcs_options:
2048 	            return False, "'%s' already exists, use --force to overwrite" % path
2049 	        try:
2050 	            os.remove(path)
2051 	        except EnvironmentError as e:
2052 	            return False, "unable to remove '%s': %s" % (path, e)
2053 	    mode = "wb" if binary else "w"
2054 	    try:
2055 	        with os.fdopen(
2056 	            os.open(path, os.O_WRONLY | os.O_CREAT, permissions), mode
2057 	        ) as outfile:
2058 	            outfile.write(data)
2059 	    except EnvironmentError as e:
2060 	        return False, "unable to write to '%s': %s" % (path, e)
2061 	    return True, ""
2062 	
2063 	
2064 	def tar_add_file_data(  # noqa: PLR0913
2065 	    tarball,
2066 	    data,
2067 	    name,
2068 	    *,
2069 	    mode=None,
2070 	    uid=None,
2071 	    gid=None,
2072 	    uname=None,
2073 	    gname=None,
2074 	    mtime=None,
2075 	):
2076 	    # pylint: disable=too-many-arguments
2077 	    """
2078 	    Commandline options: no options
2079 	    """
2080 	    info = tarfile.TarInfo(name)
2081 	    info.size = len(data)
2082 	    info.type = tarfile.REGTYPE
2083 	    info.mtime = int(time.time()) if mtime is None else mtime
2084 	    if mode is not None:
2085 	        info.mode = mode
2086 	    if uid is not None:
2087 	        info.uid = uid
2088 	    if gid is not None:
2089 	        info.gid = gid
2090 	    if uname is not None:
2091 	        info.uname = uname
2092 	    if gname is not None:
2093 	        info.gname = gname
2094 	    data_io = BytesIO(data)
2095 	    tarball.addfile(info, data_io)
2096 	    data_io.close()
2097 	
2098 	
2099 	# DEPRECATED, please use pcs.lib.pacemaker.live.simulate_cib
2100 	def simulate_cib(cib_dom):
2101 	    """
2102 	    Commandline options: no options
2103 	    """
2104 	    try:
2105 	        with (
2106 	            tempfile.NamedTemporaryFile(
2107 	                mode="w+", suffix=".pcs"
2108 	            ) as new_cib_file,
2109 	            tempfile.NamedTemporaryFile(
2110 	                mode="w+", suffix=".pcs"
2111 	            ) as transitions_file,
2112 	        ):
2113 	            output, retval = run(
2114 	                [
2115 	                    "crm_simulate",
2116 	                    "--simulate",
2117 	                    "--save-output",
2118 	                    new_cib_file.name,
2119 	                    "--save-graph",
2120 	                    transitions_file.name,
2121 	                    "--xml-pipe",
2122 	                ],
2123 	                string_for_stdin=cib_dom.toxml(),
2124 	            )
2125 	            if retval != 0:
2126 	                return err("Unable to run crm_simulate:\n%s" % output)
2127 	            new_cib_file.seek(0)
2128 	            transitions_file.seek(0)
2129 	            return (
2130 	                output,
2131 	                parseString(transitions_file.read()),
2132 	                parseString(new_cib_file.read()),
2133 	            )
2134 	    except (EnvironmentError, xml.parsers.expat.ExpatError) as e:
2135 	        return err("Unable to run crm_simulate:\n%s" % e)
2136 	    except xml.etree.ElementTree.ParseError as e:
2137 	        return err("Unable to run crm_simulate:\n%s" % e)
2138 	
2139 	
2140 	# DEPRECATED
2141 	# please use pcs.lib.pacemaker.simulate.get_operations_from_transitions
2142 	def get_operations_from_transitions(transitions_dom):
2143 	    """
2144 	    Commandline options: no options
2145 	    """
2146 	    operation_list = []
2147 	    watched_operations = (
2148 	        "start",
2149 	        "stop",
2150 	        "promote",
2151 	        "demote",
2152 	        "migrate_from",
2153 	        "migrate_to",
2154 	    )
2155 	    for rsc_op in transitions_dom.getElementsByTagName("rsc_op"):
2156 	        primitives = rsc_op.getElementsByTagName("primitive")
2157 	        if not primitives:
2158 	            continue
2159 	        if rsc_op.getAttribute("operation").lower() not in watched_operations:
2160 	            continue
2161 	        for prim in primitives:
2162 	            prim_id = prim.getAttribute("id")
2163 	            operation_list.append(
2164 	                (
2165 	                    int(rsc_op.getAttribute("id")),
2166 	                    {
2167 	                        "id": prim_id,
2168 	                        "long_id": prim.getAttribute("long-id") or prim_id,
2169 	                        "operation": rsc_op.getAttribute("operation").lower(),
2170 	                        "on_node": rsc_op.getAttribute("on_node"),
2171 	                    },
2172 	                )
2173 	            )
2174 	    operation_list.sort(key=lambda x: x[0])
2175 	    return [op[1] for op in operation_list]
2176 	
2177 	
2178 	def get_resources_location_from_operations(cib_dom, resources_operations):
2179 	    """
2180 	    Commandline options:
2181 	      * --force - allow constraints on any resource, may not have any effect as
2182 	        an invalid constraint is ignored anyway
2183 	    """
2184 	    locations = {}
2185 	    for res_op in resources_operations:
2186 	        operation = res_op["operation"]
2187 	        if operation not in ("start", "promote", "migrate_from"):
2188 	            continue
2189 	        long_id = res_op["long_id"]
2190 	        if long_id not in locations:
2191 	            # Move clone instances as if they were non-cloned resources, it
2192 	            # really works with current pacemaker (1.1.13-6). Otherwise there
2193 	            # is probably no way to move them other then setting their
2194 	            # stickiness to 0.
2195 	            res_id = res_op["id"]
2196 	            if ":" in res_id:
2197 	                res_id = res_id.split(":")[0]
2198 	            id_for_constraint = validate_constraint_resource(cib_dom, res_id)[2]
2199 	            if not id_for_constraint:
2200 	                continue
2201 	            locations[long_id] = {
2202 	                "id": res_op["id"],
2203 	                "long_id": long_id,
2204 	                "id_for_constraint": id_for_constraint,
2205 	            }
2206 	        if operation in ("start", "migrate_from"):
2207 	            locations[long_id]["start_on_node"] = res_op["on_node"]
2208 	        if operation == "promote":
2209 	            locations[long_id]["promote_on_node"] = res_op["on_node"]
2210 	    return {
2211 	        key: val
2212 	        for key, val in locations.items()
2213 	        if "start_on_node" in val or "promote_on_node" in val
2214 	    }
2215 	
2216 	
2217 	def get_remote_quorumtool_output(node):
2218 	    """
2219 	    Commandline options:
2220 	      * --request-timeout - timeout for HTTP requests
2221 	    """
2222 	    return sendHTTPRequest(node, "remote/get_quorum_info", None, False, False)
2223 	
2224 	
2225 	# return True if quorumtool_output is a string returned when the node is off
2226 	def is_node_offline_by_quorumtool_output(quorum_info):
2227 	    """
2228 	    Commandline options: no options
2229 	    """
2230 	    return quorum_info.strip() == "Cannot initialize CMAP service"
2231 	
2232 	
2233 	def dom_prepare_child_element(dom_element, tag_name, id_candidate):
2234 	    """
2235 	    Commandline options: no options
2236 	    """
2237 	    child_elements = [
2238 	        child
2239 	        for child in dom_element.childNodes
2240 	        if child.nodeType == child.ELEMENT_NODE and child.tagName == tag_name
2241 	    ]
2242 	
2243 	    if not child_elements:
2244 	        dom = dom_element.ownerDocument
2245 	        child_element = dom.createElement(tag_name)
2246 	        child_element.setAttribute("id", find_unique_id(dom, id_candidate))
2247 	        dom_element.appendChild(child_element)
2248 	    else:
2249 	        child_element = child_elements[0]
2250 	    return child_element
2251 	
2252 	
2253 	def dom_update_nvset(dom_element, nvpair_tuples, tag_name, id_candidate):
2254 	    """
2255 	    Commandline options: no options
2256 	    """
2257 	    # Already ported to pcs.libcib.nvpair
2258 	
2259 	    # Do not ever remove the nvset element, even if it is empty. There may be
2260 	    # ACLs set in pacemaker which allow "write" for nvpairs (adding, changing
2261 	    # and removing) but not nvsets. In such a case, removing the nvset would
2262 	    # cause the whole change to be rejected by pacemaker with a "permission
2263 	    # denied" message.
2264 	    # https://bugzilla.redhat.com/show_bug.cgi?id=1642514
2265 	    if not nvpair_tuples:
2266 	        return
2267 	
2268 	    only_removing = True
2269 	    for _, value in nvpair_tuples:
2270 	        if value != "":
2271 	            only_removing = False
2272 	            break
2273 	
2274 	    # Do not use dom.getElementsByTagName, that would get elements we do not
2275 	    # want to. For example if dom_element is a clone, we would get the clones's
2276 	    # as well as clone's primitive's attributes.
2277 	    nvset_element_list = _dom_get_children_by_tag_name(dom_element, tag_name)
2278 	
2279 	    # Do not create new nvset if we are only removing values from it.
2280 	    if not nvset_element_list and only_removing:
2281 	        return
2282 	
2283 	    if not nvset_element_list:
2284 	        dom = dom_element.ownerDocument
2285 	        nvset_element = dom.createElement(tag_name)
2286 	        nvset_element.setAttribute("id", find_unique_id(dom, id_candidate))
2287 	        dom_element.appendChild(nvset_element)
2288 	    else:
2289 	        nvset_element = nvset_element_list[0]
2290 	
2291 	    for name, value in nvpair_tuples:
2292 	        dom_update_nv_pair(
2293 	            nvset_element, name, value, nvset_element.getAttribute("id") + "-"
2294 	        )
2295 	
2296 	
2297 	def dom_update_nv_pair(dom_element, name, value, id_prefix=""):
2298 	    """
2299 	    Commandline options: no options
2300 	    """
2301 	    # Do not ever remove the nvset element, even if it is empty. There may be
2302 	    # ACLs set in pacemaker which allow "write" for nvpairs (adding, changing
2303 	    # and removing) but not nvsets. In such a case, removing the nvset would
2304 	    # cause the whole change to be rejected by pacemaker with a "permission
2305 	    # denied" message.
2306 	    # https://bugzilla.redhat.com/show_bug.cgi?id=1642514
2307 	
2308 	    dom = dom_element.ownerDocument
2309 	    element_found = False
2310 	    for el in dom_element.getElementsByTagName("nvpair"):
2311 	        if el.getAttribute("name") == name:
2312 	            element_found = True
2313 	            if value == "":
2314 	                dom_element.removeChild(el)
2315 	            else:
2316 	                el.setAttribute("value", value)
2317 	            break
2318 	    if not element_found and value != "":
2319 	        el = dom.createElement("nvpair")
2320 	        el.setAttribute("id", id_prefix + name)
2321 	        el.setAttribute("name", name)
2322 	        el.setAttribute("value", value)
2323 	        dom_element.appendChild(el)
2324 	    return dom_element
2325 	
2326 	
2327 	# Passed an array of strings ["a=b","c=d"], return array of tuples
2328 	# [("a","b"),("c","d")]
2329 	def convert_args_to_tuples(ra_values):
2330 	    """
2331 	    Commandline options: no options
2332 	    """
2333 	    ret = []
2334 	    for ra_val in ra_values:
2335 	        if ra_val.count("=") != 0:
2336 	            split_val = ra_val.split("=", 1)
2337 	            ret.append((split_val[0], split_val[1]))
2338 	    return ret
2339 	
2340 	
2341 	def is_int(val):
2342 	    try:
2343 	        int(val)
2344 	        return True
2345 	    except ValueError:
2346 	        return False
2347 	
2348 	
2349 	def dom_update_utilization(dom_element, attributes, id_prefix=""):
2350 	    """
2351 	    Commandline options: no options
2352 	    """
2353 	    attr_tuples = []
2354 	    for name, value in sorted(attributes.items()):
2355 	        if value != "" and not is_int(value):
2356 	            err(
2357 	                "Value of utilization attribute must be integer: "
2358 	                "'{0}={1}'".format(name, value)
2359 	            )
2360 	        attr_tuples.append((name, value))
2361 	    dom_update_nvset(
2362 	        dom_element,
2363 	        attr_tuples,
2364 	        "utilization",
2365 	        id_prefix + dom_element.getAttribute("id") + "-utilization",
2366 	    )
2367 	
2368 	
2369 	def dom_update_meta_attr(dom_element, attributes):
2370 	    """
2371 	    Commandline options: no options
2372 	    """
2373 	    dom_update_nvset(
2374 	        dom_element,
2375 	        attributes,
2376 	        "meta_attributes",
2377 	        dom_element.getAttribute("id") + "-meta_attributes",
2378 	    )
2379 	
2380 	
2381 	def dom_update_instance_attr(dom_element, attributes):
2382 	    """
2383 	    Commandline options: no options
2384 	    """
2385 	    dom_update_nvset(
2386 	        dom_element,
2387 	        attributes,
2388 	        "instance_attributes",
2389 	        dom_element.getAttribute("id") + "-instance_attributes",
2390 	    )
2391 	
2392 	
2393 	def get_utilization(element, filter_name=None):
2394 	    """
2395 	    Commandline options: no options
2396 	    """
2397 	    utilization = {}
2398 	    for e in element.getElementsByTagName("utilization"):
2399 	        for u in e.getElementsByTagName("nvpair"):
2400 	            name = u.getAttribute("name")
2401 	            if filter_name is not None and name != filter_name:
2402 	                continue
2403 	            utilization[name] = u.getAttribute("value")
2404 	        # Use just first element of utilization attributes. We don't support
2405 	        # utilization with rules just yet.
2406 	        break
2407 	    return utilization
2408 	
2409 	
2410 	def get_utilization_str(element, filter_name=None):
2411 	    """
2412 	    Commandline options: no options
2413 	    """
2414 	    output = []
2415 	    for name, value in sorted(get_utilization(element, filter_name).items()):
2416 	        output.append(name + "=" + value)
2417 	    return " ".join(output)
2418 	
2419 	
2420 	def get_lib_env() -> LibraryEnvironment:
2421 	    """
2422 	    Commandline options:
2423 	      * -f - CIB file
2424 	      * --corosync_conf - corosync.conf file
2425 	      * --request-timeout - timeout of HTTP requests
2426 	    """
2427 	    user = None
2428 	    groups = None
2429 	    if os.geteuid() == 0:
2430 	        for name in ("CIB_user", "CIB_user_groups"):
2431 	            if name in os.environ and os.environ[name].strip():
2432 	                value = os.environ[name].strip()
2433 	                if name == "CIB_user":
2434 	                    user = value
2435 	                else:
2436 	                    groups = value.split(" ")
2437 	
2438 	    cib_data = None
2439 	    if usefile:
2440 	        cib_data = get_cib()
2441 	
2442 	    corosync_conf_data = None
2443 	    if "--corosync_conf" in pcs_options:
2444 	        conf = pcs_options["--corosync_conf"]
2445 	        try:
2446 	            with open(conf) as corosync_conf_file:
2447 	                corosync_conf_data = corosync_conf_file.read()
2448 	        except IOError as e:
2449 	            err("Unable to read %s: %s" % (conf, e.strerror))
2450 	
2451 	    return LibraryEnvironment(
2452 	        logging.getLogger("pcs"),
2453 	        get_report_processor(),
2454 	        user,
2455 	        groups,
2456 	        cib_data,
2457 	        corosync_conf_data,
2458 	        known_hosts_getter=read_known_hosts_file,
2459 	        request_timeout=pcs_options.get("--request-timeout"),
2460 	    )
2461 	
2462 	
2463 	def get_cib_user_groups():
2464 	    """
2465 	    Commandline options: no options
2466 	    """
2467 	    user = None
2468 	    groups = None
2469 	    if os.geteuid() == 0:
2470 	        for name in ("CIB_user", "CIB_user_groups"):
2471 	            if name in os.environ and os.environ[name].strip():
2472 	                value = os.environ[name].strip()
2473 	                if name == "CIB_user":
2474 	                    user = value
2475 	                else:
2476 	                    groups = value.split(" ")
2477 	    return user, groups
2478 	
2479 	
2480 	def get_cli_env():
2481 	    """
2482 	    Commandline options:
2483 	      * --debug
2484 	      * --request-timeout
2485 	    """
2486 	    env = Env()
2487 	    env.user, env.groups = get_cib_user_groups()
2488 	    env.known_hosts_getter = read_known_hosts_file
2489 	    env.report_processor = get_report_processor()
2490 	    env.request_timeout = pcs_options.get("--request-timeout")
2491 	    return env
2492 	
2493 	
2494 	def get_middleware_factory():
2495 	    """
2496 	    Commandline options:
2497 	      * --corosync_conf
2498 	      * --name
2499 	      * --booth-conf
2500 	      * --booth-key
2501 	      * -f
2502 	    """
2503 	    return middleware.create_middleware_factory(
2504 	        cib=middleware.cib(filename if usefile else None, touch_cib_file),
2505 	        corosync_conf_existing=middleware.corosync_conf_existing(
2506 	            pcs_options.get("--corosync_conf")
2507 	        ),
2508 	        booth_conf=pcs.cli.booth.env.middleware_config(
2509 	            pcs_options.get("--booth-conf"),
2510 	            pcs_options.get("--booth-key"),
2511 	        ),
2512 	    )
2513 	
2514 	
2515 	def get_library_wrapper():
2516 	    """
2517 	    Commandline options:
2518 	      * --debug
2519 	      * --request-timeout
2520 	      * --corosync_conf
2521 	      * --name
2522 	      * --booth-conf
2523 	      * --booth-key
2524 	      * -f
2525 	    NOTE: usage of options may depend on used middleware for particular command
2526 	    """
2527 	    return Library(get_cli_env(), get_middleware_factory())
2528 	
2529 	
2530 	def exit_on_cmdline_input_error(
2531 	    error: CmdLineInputError, main_name: str, usage_name: StringSequence
2532 	) -> None:
2533 	    if error and error.message:
2534 	        reports_output.error(error.message)
2535 	    if error and error.hint:
2536 	        print_to_stderr(f"Hint: {error.hint}")
2537 	    if not error or (not error.message or error.show_both_usage_and_message):
2538 	        usage.show(main_name, list(usage_name))
2539 	    sys.exit(1)
2540 	
2541 	
2542 	def get_report_processor() -> ReportProcessor:
2543 	    return ReportProcessorToConsole(debug="--debug" in pcs_options)
2544 	
2545 	
2546 	def get_user_and_pass() -> tuple[str, str]:
2547 	    """
2548 	    Commandline options:
2549 	      * -u - username
2550 	      * -p - password
2551 	    """
2552 	    username = (
2553 	        pcs_options["-u"]
2554 	        if "-u" in pcs_options
2555 	        else get_terminal_input("Username: ")
2556 	    )
2557 	    password = (
2558 	        pcs_options["-p"] if "-p" in pcs_options else get_terminal_password()
2559 	    )
2560 	    return username, password
2561 	
2562 	
2563 	def get_input_modifiers() -> InputModifiers:
2564 	    return InputModifiers(pcs_options)
2565 	
2566 	
2567 	def get_token_from_file(file_name: str) -> str:
2568 	    try:
2569 	        with open(file_name, "rb") as file:
2570 	            # 256 to stay backwards compatible
2571 	            max_size = 256
2572 	            value_bytes = file.read(max_size + 1)
2573 	            if len(value_bytes) > max_size:
2574 	                err(f"Maximal token size of {max_size} bytes exceeded")
2575 	            if not value_bytes:
2576 	                err(f"File '{file_name}' is empty")
2577 	            return base64.b64encode(value_bytes).decode("utf-8")
2578 	    except OSError as e:
2579 	        err(f"Unable to read file '{file_name}': {e}", exit_after_error=False)
2580 	        raise SystemExit(1) from e
2581 	
2582 	
2583 	def print_warning_if_utilization_attrs_has_no_effect(
2584 	    properties_facade: PropertyConfigurationFacade,
2585 	) -> None:
2586 	    PLACEMENT_STRATEGIES_USING_UTILIZATION_ATTRS = [
2587 	        "balanced",
2588 	        "minimal",
2589 	        "utilization",
2590 	    ]
2591 	    value = properties_facade.get_property_value_or_default(
2592 	        "placement-strategy"
2593 	    )
2594 	    if value not in PLACEMENT_STRATEGIES_USING_UTILIZATION_ATTRS:
2595 	        reports_output.warn(
2596 	            "Utilization attributes configuration has no effect until cluster "
2597 	            "property option 'placement-strategy' is set to one of the "
2598 	            "values: "
2599 	            f"{format_list(PLACEMENT_STRATEGIES_USING_UTILIZATION_ATTRS)}"
2600 	        )
2601