1    	# pylint: disable=too-many-lines
2    	import base64
3    	import fcntl
4    	import getpass
5    	import json
6    	import logging
7    	import os
8    	import re
9    	import signal
10   	import struct
11   	import subprocess
12   	import sys
13   	import tarfile
14   	import tempfile
15   	import termios
16   	import threading
17   	import time
18   	import xml.dom.minidom
19   	import xml.etree.ElementTree as ET
20   	from functools import lru_cache
21   	from io import BytesIO
22   	from textwrap import dedent
23   	from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple
24   	from urllib.parse import urlencode
25   	from xml.dom.minidom import Document as DomDocument
26   	from xml.dom.minidom import parseString
27   	
28   	import pcs.cli.booth.env
29   	import pcs.lib.corosync.config_parser as corosync_conf_parser
30   	from pcs import settings, usage
31   	from pcs.cli.cluster_property.output import PropertyConfigurationFacade
32   	from pcs.cli.common import middleware
33   	from pcs.cli.common.env_cli import Env
34   	from pcs.cli.common.errors import CmdLineInputError
35   	from pcs.cli.common.lib_wrapper import Library
36   	from pcs.cli.common.parse_args import InputModifiers
37   	from pcs.cli.common.tools import print_to_stderr, timeout_to_seconds_legacy
38   	from pcs.cli.file import metadata as cli_file_metadata
39   	from pcs.cli.reports import ReportProcessorToConsole, process_library_reports
40   	from pcs.cli.reports import output as reports_output
41   	from pcs.common import const, file_type_codes
42   	from pcs.common import file as pcs_file
43   	from pcs.common import pacemaker as common_pacemaker
44   	from pcs.common import pcs_pycurl as pycurl
45   	from pcs.common.host import PcsKnownHost
46   	from pcs.common.pacemaker.resource.operations import (
47   	    OCF_CHECK_LEVEL_INSTANCE_ATTRIBUTE_NAME,
48   	)
49   	from pcs.common.reports import ReportProcessor
50   	from pcs.common.reports.messages import CibUpgradeFailedToMinimalRequiredVersion
51   	from pcs.common.services.errors import ManageServiceError
52   	from pcs.common.services.interfaces import ServiceManagerInterface
53   	from pcs.common.str_tools import format_list
54   	from pcs.common.tools import Version, timeout_to_seconds
55   	from pcs.common.types import StringSequence
56   	from pcs.lib.corosync.config_facade import ConfigFacade as corosync_conf_facade
57   	from pcs.lib.env import LibraryEnvironment
58   	from pcs.lib.errors import LibraryError
59   	from pcs.lib.external import CommandRunner, is_proxy_set
60   	from pcs.lib.file.instance import FileInstance as LibFileInstance
61   	from pcs.lib.interface.config import ParserErrorException
62   	from pcs.lib.pacemaker.live import get_cluster_status_dom
63   	from pcs.lib.pacemaker.state import ClusterState
64   	from pcs.lib.pacemaker.values import is_score as is_score_value
65   	from pcs.lib.pacemaker.values import validate_id
66   	from pcs.lib.services import get_service_manager as _get_service_manager
67   	from pcs.lib.services import service_exception_to_report
68   	
69   	if TYPE_CHECKING:
70   	    from pcs.common.reports.item import ReportItemList
71   	
72   	# pylint: disable=invalid-name
73   	# pylint: disable=too-many-branches
74   	
# usefile & filename variables are set in pcs module
# When usefile is true, run() exports filename to the started command as the
# CIB_file environment variable.
usefile = False
filename = ""
# Note: not properly typed
# Functions in this module look up keys such as "--debug" and
# "--request-timeout" here; presumably it holds the parsed command line
# options and is filled in by the pcs module as well - TODO confirm.
pcs_options: Dict[Any, Any] = {}
80   	
81   	
def getValidateWithVersion(dom) -> Version:
    """
    Return the schema version stored in the "validate-with" attribute of the
    single "cib" element found in dom.

    The attribute is expected to look like "pacemaker-<major>.<minor>" with an
    optional ".<rev>" part; a missing revision is treated as 0.
    An AssertionError is raised when the attribute does not match.

    Commandline options: no options
    """
    cib_elements = dom.getElementsByTagName("cib")
    if len(cib_elements) != 1:
        err("Bad cib")

    schema_name = cib_elements[0].getAttribute("validate-with")
    parsed = re.match(r"pacemaker-(\d+)\.(\d+)\.?(\d+)?", schema_name)
    if parsed is None:
        raise AssertionError()
    major, minor, rev = parsed.groups()
    return Version(int(major), int(minor), int(rev or 0))
101  	
102  	
def isCibVersionSatisfied(cib_dom, required_version: Version) -> bool:
    """
    Tell whether the CIB schema version is at least required_version.

    cib_dom may be a DOM document or any node of one; for a node, its owner
    document is inspected.
    """
    document = (
        cib_dom if isinstance(cib_dom, DomDocument) else cib_dom.ownerDocument
    )
    current_version = getValidateWithVersion(document)
    return current_version >= required_version
107  	
108  	
109  	# Check the current pacemaker version in cib and upgrade it if necessary
110  	# Returns False if not upgraded and True if upgraded
def checkAndUpgradeCIB(required_version: Version) -> bool:
    """
    Run cluster_upgrade() when the current CIB schema version is lower than
    required_version.

    Returns True when cluster_upgrade() was called and False when the current
    version already satisfied the requirement.

    Commandline options:
      * -f - CIB file
    """
    upgrade_needed = not isCibVersionSatisfied(get_cib_dom(), required_version)
    if upgrade_needed:
        cluster_upgrade()
    return upgrade_needed
120  	
121  	
def cluster_upgrade():
    """
    Run "cibadmin --upgrade --force" and report the outcome.

    A non-zero exit code is reported through err(). A notice is printed to
    stderr unless cibadmin says the schema is already the latest available.

    Commandline options:
      * -f - CIB file
    """
    already_latest_msg = (
        "Upgrade unnecessary: Schema is already the latest available"
    )
    stdout, exit_code = run(["cibadmin", "--upgrade", "--force"])
    if exit_code != 0:
        err("unable to upgrade cluster: %s" % stdout)
    if stdout.strip() != already_latest_msg:
        print_to_stderr("Cluster CIB has been upgraded to latest version")
136  	
137  	
def cluster_upgrade_to_version(required_version: Version) -> Any:
    """
    Make sure the CIB schema is at least required_version and return the CIB.

    The CIB is upgraded if needed, then loaded again and its version checked;
    a version which is still too low is reported through err(). The freshly
    loaded CIB DOM is returned.

    Commandline options:
      * -f - CIB file
    """
    checkAndUpgradeCIB(required_version)
    upgraded_dom = get_cib_dom()
    reached_version = getValidateWithVersion(upgraded_dom)
    if reached_version < required_version:
        failure = CibUpgradeFailedToMinimalRequiredVersion(
            str(reached_version),
            str(required_version),
        )
        err(failure.message)
    return upgraded_dom
154  	
155  	
156  	# Check status of node
def checkStatus(node):
    """
    Send the "remote/status" request (version 2) to the node and return the
    result of sendHTTPRequest. Result printing is turned off.

    Commandline options:
      * --request-timeout - timeout for HTTP requests
    """
    query = urlencode({"version": "2"})
    return sendHTTPRequest(
        node, "remote/status", query, printResult=False, printSuccess=False
    )
165  	
166  	
167  	# Check and see if we're authorized (faster than a status check)
def checkAuthorization(node):
    """
    Send the "remote/check_auth" request to the node and return the result of
    sendHTTPRequest. Result printing is turned off.

    Commandline options:
      * --request-timeout - timeout for HTTP requests
    """
    return sendHTTPRequest(
        node, "remote/check_auth", None, printResult=False, printSuccess=False
    )
174  	
175  	
def get_uid_gid_file_name(uid, gid):
    """
    Build the base name of a uidgid file for the given uid and gid.

    Commandline options: no options
    """
    return f"pcs-uidgid-{uid}-{gid}"
181  	
182  	
183  	# Reads in uid file and returns dict of values {'uid':'theuid', 'gid':'thegid'}
def read_uid_gid_file(uidgid_filename):
    """
    Parse a file from the corosync uidgid directory.

    Returns a dict which may contain the keys "uid" and "gid". Text after "#"
    is ignored and nothing is read before the first line opening a "uidgid"
    section.

    Commandline options: no options
    """
    key_patterns = (
        ("uid", r"uid:\s*(\S+)"),
        ("gid", r"gid:\s*(\S+)"),
    )
    path = os.path.join(settings.corosync_uidgid_dir, uidgid_filename)
    with open(path, "r") as uidgid_file:
        raw_lines = uidgid_file.read().split("\n")

    parsed = {}
    section_seen = False
    for raw_line in raw_lines:
        content = re.sub(r"#.*", "", raw_line)
        if not section_seen:
            if not re.search(r"uidgid.*{", content):
                continue
            section_seen = True
        # the line opening the section is scanned for keys as well
        for key, pattern in key_patterns:
            found = re.search(pattern, content)
            if found:
                parsed[key] = found.group(1)

    return parsed
210  	
211  	
def get_uidgid_file_content(
    uid: Optional[str] = None, gid: Optional[str] = None
) -> Optional[str]:
    """
    Build the text of a corosync uidgid file.

    Returns None when neither uid nor gid is given, otherwise a "uidgid"
    section holding one line for each value given.
    """
    entries = [
        f"  {key}: {value}" for key, value in (("uid", uid), ("gid", gid)) if value
    ]
    if not entries:
        return None
    # the trailing empty item makes the text end with a newline
    return "\n".join(["uidgid {", *entries, "}", ""])
229  	
230  	
def write_uid_gid_file(uid, gid):
    """
    Create a uidgid file for the given uid and gid in the corosync uidgid
    directory.

    An already existing file with the same uid and gid is reported through
    err(). If the preferred file name is taken, "-<number>" is appended to
    get a free one. Nothing is written when there is no content to write.

    Commandline options: no options
    """
    base_name = get_uid_gid_file_name(uid, gid)
    if find_uid_gid_files(uid, gid):
        err("uidgid file with uid=%s and gid=%s already exists" % (uid, gid))

    target_name = base_name
    suffix = 0
    while os.path.exists(
        os.path.join(settings.corosync_uidgid_dir, target_name)
    ):
        suffix += 1
        target_name = base_name + "-" + str(suffix)

    content = get_uidgid_file_content(uid, gid)
    if not content:
        return
    target_path = os.path.join(settings.corosync_uidgid_dir, target_name)
    with open(target_path, "w") as target_file:
        target_file.write(content)
253  	
254  	
def find_uid_gid_files(uid, gid):
    """
    List names of files in the corosync uidgid directory which define exactly
    the given uid and gid.

    An empty string means "the file must not define this key". When both uid
    and gid are empty strings, an empty list is returned without reading
    anything.

    Commandline options: no options
    """
    if uid == "" and gid == "":
        return []

    def defines_exactly(parsed):
        # read_uid_gid_file never stores an empty value, so a missing key can
        # stand for "" here
        return uid == parsed.get("uid", "") and gid == parsed.get("gid", "")

    return [
        file_name
        for file_name in os.listdir(settings.corosync_uidgid_dir)
        if defines_exactly(read_uid_gid_file(file_name))
    ]
282  	
283  	
284  	# Removes all uid/gid files with the specified uid/gid, returns false if we
285  	# couldn't find one
def remove_uid_gid_file(uid, gid):
    """
    Delete every uidgid file matching the given uid and gid.

    Returns True if at least one file was removed, False otherwise (including
    the case when both uid and gid are empty strings).

    Commandline options: no options
    """
    if uid == "" and gid == "":
        return False

    matching_files = find_uid_gid_files(uid, gid)
    for file_name in matching_files:
        os.remove(os.path.join(settings.corosync_uidgid_dir, file_name))
    return bool(matching_files)
299  	
300  	
@lru_cache()
def read_known_hosts_file():
    """
    Cached variant of read_known_hosts_file_not_cached: the file is read on
    the first call and the same result is handed out afterwards.

    Commandline options: no options
    """
    known_hosts = read_known_hosts_file_not_cached()
    return known_hosts
304  	
305  	
def read_known_hosts_file_not_cached():
    """
    Load known hosts and return them as a dict {host name: PcsKnownHost}.

    When not running as uid 0, the file described by the CLI metadata for
    PCS_KNOWN_HOSTS is read and parsed as JSON. When running as uid 0, the
    pcs.lib known-hosts file instance is used instead.

    Read and parse problems are reported as warnings or library reports; an
    empty dict is returned in that case, provided the reporting function
    returns.

    Commandline options: no options
    """
    # returned as is when anything below fails
    data = {}
    try:
        if os.getuid() != 0:
            known_hosts_raw_file = pcs_file.RawFile(
                cli_file_metadata.for_file_type(file_type_codes.PCS_KNOWN_HOSTS)
            )
            # json.loads handles bytes, it expects utf-8, 16 or 32 encoding
            known_hosts_struct = json.loads(known_hosts_raw_file.read())
        else:
            # TODO remove
            # This is here to provide known-hosts to functions not yet
            # overhauled to pcs.lib. Cli should never read known hosts from
            # /var/lib/pcsd/.
            known_hosts_instance = LibFileInstance.for_known_hosts()
            known_hosts_struct = known_hosts_instance.read_to_structure()

        # TODO use known hosts facade for getting info from json struct once the
        # facade exists
        data = {
            name: PcsKnownHost.from_known_host_file_dict(name, host)
            for name, host in known_hosts_struct["known_hosts"].items()
        }
    except LibraryError as e:
        # TODO remove
        # This is here to provide known-hosts to functions not yet
        # overhauled to pcs.lib. Cli should never read known hosts from
        # /var/lib/pcsd/.
        process_library_reports(e.args)
    except ParserErrorException as e:
        # TODO remove
        # This is here to provide known-hosts to functions not yet
        # overhauled to pcs.lib. Cli should never read known hosts from
        # /var/lib/pcsd/.
        # NOTE(review): known_hosts_instance is bound only in the uid 0 branch
        # above; presumably ParserErrorException can only come from
        # read_to_structure() - confirm
        process_library_reports(
            known_hosts_instance.parser_exception_to_report_list(e)
        )
    except pcs_file.RawFileError as e:
        reports_output.warn("Unable to read the known-hosts file: " + e.reason)
    except json.JSONDecodeError as e:
        reports_output.warn(f"Unable to parse the known-hosts file: {e}")
    except (TypeError, KeyError):
        # the loaded structure does not have the expected shape
        reports_output.warn("Warning: Unable to parse the known-hosts file.")
    return data
353  	
354  	
def repeat_if_timeout(send_http_request_function, repeat_count=15):
    """
    Wrap a request function so that it is called again after a timeout.

    The returned function passes all its arguments to
    send_http_request_function, which must return a (retval, output) pair.
    The call is repeated, at most repeat_count more times, while retval is 2
    and output contains "Operation timed out". The last pair is returned.

    Commandline options: no options
    NOTE: callback send_http_request_function may use --request-timeout
    """

    def repeater(node, *args, **kwargs):
        attempts_remaining = repeat_count
        while True:
            retval, output = send_http_request_function(node, *args, **kwargs)
            timed_out = retval == 2 and "Operation timed out" in output
            if not timed_out or attempts_remaining < 1:
                # did not time out OR repeat limit exceeded
                return retval, output
            attempts_remaining -= 1
            if "--debug" in pcs_options:
                print_to_stderr(f"{node}: {output}, trying again...")

    return repeater
377  	
378  	
# Get the corosync.conf file from the specified node
def getCorosyncConfig(node):
    """
    Send the "remote/get_corosync_conf" request to the node and return the
    result of sendHTTPRequest. Result printing is turned off.

    Commandline options:
      * --request-timeout - timeout for HTTP requests
    """
    return sendHTTPRequest(
        node,
        "remote/get_corosync_conf",
        None,
        printResult=False,
        printSuccess=False,
    )
386  	
387  	
def setCorosyncConfig(node, config):
    """
    Send config to the node in the "remote/set_corosync_conf" request.

    A non-zero status returned by sendHTTPRequest is reported through err().

    Commandline options:
      * --request-timeout - timeout for HTTP requests
    """
    payload = urlencode({"corosync_conf": config})
    status, response = sendHTTPRequest(
        node, "remote/set_corosync_conf", payload
    )
    if status != 0:
        err("Unable to set corosync config: {0}".format(response))
397  	
398  	
def getPacemakerNodeStatus(node):
    """
    Send the "remote/pacemaker_node_status" request to the node and return the
    result of sendHTTPRequest. Result printing is turned off.

    Commandline options:
      * --request-timeout - timeout for HTTP requests
    """
    return sendHTTPRequest(
        node,
        "remote/pacemaker_node_status",
        None,
        printResult=False,
        printSuccess=False,
    )
407  	
408  	
def startCluster(node, quiet=False, timeout=None):
    """
    Send the "remote/cluster_start" request to the node and return the result
    of sendHTTPRequest.

    quiet -- do not print the response of the node
    timeout -- passed to sendHTTPRequest

    Commandline options:
      * --request-timeout - timeout for HTTP requests
    """
    show_response = not quiet
    return sendHTTPRequest(
        node,
        "remote/cluster_start",
        printResult=False,
        printSuccess=show_response,
        timeout=timeout,
    )
421  	
422  	
def stopPacemaker(node, quiet=False, force=True):
    """
    Call stopCluster for the pacemaker component only and return its result.

    Commandline options:
      * --request-timeout - timeout for HTTP requests
    """
    return stopCluster(
        node, quiet=quiet, pacemaker=True, corosync=False, force=force
    )
431  	
432  	
def stopCorosync(node, quiet=False, force=True):
    """
    Call stopCluster for the corosync component only and return its result.

    Commandline options:
      * --request-timeout - timeout for HTTP requests
    """
    return stopCluster(
        node, quiet=quiet, pacemaker=False, corosync=True, force=force
    )
441  	
442  	
def stopCluster(node, quiet=False, pacemaker=True, corosync=True, force=True):
    """
    Send the "remote/cluster_stop" request to the node and return the result
    of sendHTTPRequest.

    quiet -- do not print the response of the node
    pacemaker, corosync -- when exactly one of them is true, the request is
        limited to that component; a timeout of two minutes is requested for
        the pacemaker-only case
    force -- add "force=1" to the request

    Commandline options:
      * --request-timeout - timeout for HTTP requests
    """
    only_pacemaker = pacemaker and not corosync
    only_corosync = corosync and not pacemaker

    params = {}
    request_timeout = None
    if only_pacemaker:
        params["component"] = "pacemaker"
        request_timeout = 2 * 60
    elif only_corosync:
        params["component"] = "corosync"
    if force:
        params["force"] = 1

    return sendHTTPRequest(
        node,
        "remote/cluster_stop",
        urlencode(params),
        printResult=False,
        printSuccess=not quiet,
        timeout=request_timeout,
    )
466  	
467  	
def enableCluster(node):
    """
    Send the "remote/cluster_enable" request to the node and return the result
    of sendHTTPRequest. The response of the node is printed, error messages
    are not.

    Commandline options:
      * --request-timeout - timeout for HTTP requests
    """
    return sendHTTPRequest(
        node,
        "remote/cluster_enable",
        None,
        printResult=False,
        printSuccess=True,
    )
474  	
475  	
def disableCluster(node):
    """
    Send the "remote/cluster_disable" request to the node and return the
    result of sendHTTPRequest. The response of the node is printed, error
    messages are not.

    Commandline options:
      * --request-timeout - timeout for HTTP requests
    """
    return sendHTTPRequest(
        node,
        "remote/cluster_disable",
        None,
        printResult=False,
        printSuccess=True,
    )
482  	
483  	
def destroyCluster(node, quiet=False):
    """
    Send the "remote/cluster_destroy" request to the node and return the
    result of sendHTTPRequest.

    quiet -- print neither the response of the node nor error messages

    Commandline options:
      * --request-timeout - timeout for HTTP requests
    """
    verbose = not quiet
    return sendHTTPRequest(
        node,
        "remote/cluster_destroy",
        None,
        printResult=verbose,
        printSuccess=verbose,
    )
492  	
493  	
def restoreConfig(node, tarball_data):
    """
    Send tarball_data to the node in the "remote/config_restore" request and
    return the result of sendHTTPRequest. The response of the node is
    printed, error messages are not.

    Commandline options:
      * --request-timeout - timeout for HTTP requests
    """
    payload = urlencode({"tarball": tarball_data})
    return sendHTTPRequest(
        node,
        "remote/config_restore",
        payload,
        printResult=False,
        printSuccess=True,
    )
501  	
502  	
def pauseConfigSyncing(node, delay_seconds=300):
    """
    Send "sync_thread_pause" set to delay_seconds to the node in the
    "remote/set_sync_options" request and return the result of
    sendHTTPRequest. Result printing is turned off.

    Commandline options:
      * --request-timeout - timeout for HTTP requests
    """
    payload = urlencode({"sync_thread_pause": delay_seconds})
    return sendHTTPRequest(
        node,
        "remote/set_sync_options",
        payload,
        printResult=False,
        printSuccess=False,
    )
510  	
511  	
def resumeConfigSyncing(node):
    """
    Send "sync_thread_resume" set to 1 to the node in the
    "remote/set_sync_options" request and return the result of
    sendHTTPRequest. Result printing is turned off.

    Commandline options:
      * --request-timeout - timeout for HTTP requests
    """
    payload = urlencode({"sync_thread_resume": 1})
    return sendHTTPRequest(
        node,
        "remote/set_sync_options",
        payload,
        printResult=False,
        printSuccess=False,
    )
519  	
520  	
521  	# Send an HTTP request to a node return a tuple with status, data
522  	# If status is 0 then data contains server response
523  	# Otherwise if non-zero then data contains error message
524  	# Returns a tuple (error, error message)
525  	# 0 = Success,
526  	# 1 = HTTP Error
527  	# 2 = No response,
528  	# 3 = Auth Error
529  	# 4 = Permission denied
def sendHTTPRequest(  # noqa: PLR0912, PLR0915
    host, request, data=None, printResult=True, printSuccess=True, timeout=None
):
    """
    Send an HTTPS request to a host and return a tuple (status, text).

    host -- name of the host; when it is found in the known-hosts file, the
        address, port and token stored there are used, otherwise the name
        itself is used as the address with the default pcsd port
    request -- path part of the URL, e.g. "remote/status"
    data -- url-encoded string; when not empty, it is sent as POST fields
    printResult -- print the error message to stderr when the request fails
    printSuccess -- together with printResult controls printing of the
        response body: it is printed to stderr, prefixed with the host name,
        when either of the two is true, whatever the HTTP response code is
    timeout -- request timeout in seconds; the default request timeout from
        settings is used when not set; --request-timeout overrides both

    Returned status:
      0 - success, text is the response body
      1 - HTTP response code 400 or higher (other than 401 and 403)
      2 - the request could not be performed (pycurl error)
      3 - HTTP response code 401
      4 - HTTP response code 403
    For a non-zero status, text is an error message.

    Commandline options:
      * --request-timeout - timeout for HTTP requests
      * --debug
    """
    # pylint: disable=too-many-locals
    # pylint: disable=too-many-statements
    port = None
    addr = host
    token = None
    known_host = read_known_hosts_file().get(host, None)
    # TODO: do not allow communication with unknown host
    if known_host:
        port = known_host.dest.port
        addr = known_host.dest.addr
        token = known_host.token
    if port is None:
        port = settings.pcsd_default_port
    url = "https://{host}:{port}/{request}".format(
        # an address containing ":" is put into brackets (IPv6 literal)
        host="[{0}]".format(addr) if ":" in addr else addr,
        request=request,
        port=port,
    )
    if "--debug" in pcs_options:
        print_to_stderr(f"Sending HTTP Request to: {url}\nData: {data}")

    # Collects curl debug messages into debug_output, each one marked with a
    # prefix telling its kind and terminated by a newline.
    def __debug_callback(data_type, debug_data):
        prefixes = {
            # pylint: disable=no-member
            pycurl.DEBUG_TEXT: b"* ",
            pycurl.DEBUG_HEADER_IN: b"< ",
            pycurl.DEBUG_HEADER_OUT: b"> ",
            pycurl.DEBUG_DATA_IN: b"<< ",
            pycurl.DEBUG_DATA_OUT: b">> ",
        }
        if data_type in prefixes:
            debug_output.write(prefixes[data_type])
            debug_output.write(debug_data)
            if not debug_data.endswith(b"\n"):
                debug_output.write(b"\n")

    # buffer for the response body; the name is reused for the returned tuple
    # further below
    output = BytesIO()
    debug_output = BytesIO()
    cookies = __get_cookie_list(token)
    if not timeout:
        timeout = settings.default_request_timeout
    # a timeout given on the command line wins over the argument and the
    # default
    timeout = pcs_options.get("--request-timeout", timeout)

    handler = pycurl.Curl()
    handler.setopt(pycurl.PROTOCOLS, pycurl.PROTO_HTTPS)
    handler.setopt(pycurl.URL, url.encode("utf-8"))
    handler.setopt(pycurl.WRITEFUNCTION, output.write)
    # verbose mode is always on; the messages go to __debug_callback and are
    # printed only with --debug
    handler.setopt(pycurl.VERBOSE, 1)
    handler.setopt(pycurl.NOSIGNAL, 1)  # required for multi-threading
    handler.setopt(pycurl.DEBUGFUNCTION, __debug_callback)
    # timeout is in seconds
    handler.setopt(pycurl.TIMEOUT_MS, int(timeout * 1000))
    # the certificate of the host is not verified
    handler.setopt(pycurl.SSL_VERIFYHOST, 0)
    handler.setopt(pycurl.SSL_VERIFYPEER, 0)
    # NOTE(review): an empty Expect header presumably stops curl from sending
    # "Expect: 100-continue" - confirm
    handler.setopt(pycurl.HTTPHEADER, ["Expect: "])
    if cookies:
        handler.setopt(pycurl.COOKIE, ";".join(cookies).encode("utf-8"))
    if data:
        handler.setopt(pycurl.COPYPOSTFIELDS, data.encode("utf-8"))
    try:
        handler.perform()
        response_data = output.getvalue().decode("utf-8")
        response_code = handler.getinfo(pycurl.RESPONSE_CODE)
        if printResult or printSuccess:
            print_to_stderr(host + ": " + response_data.strip())
        if "--debug" in pcs_options:
            print_to_stderr(
                "Response Code: {response_code}\n"
                "--Debug Response Start--\n"
                "{response_data}\n"
                "--Debug Response End--\n"
                "Communication debug info for calling: {url}\n"
                "--Debug Communication Output Start--\n"
                "{debug_comm_output}\n"
                "--Debug Communication Output End--".format(
                    response_code=response_code,
                    response_data=response_data,
                    url=url,
                    debug_comm_output=debug_output.getvalue().decode(
                        "utf-8", "ignore"
                    ),
                )
            )

        # translate the HTTP response code to the (status, text) result
        if response_code == 401:
            output = (
                3,
                (
                    "Unable to authenticate to {node} - (HTTP error: {code}), "
                    "try running 'pcs host auth {node}'"
                ).format(node=host, code=response_code),
            )
        elif response_code == 403:
            output = (
                4,
                "{node}: Permission denied - (HTTP error: {code})".format(
                    node=host, code=response_code
                ),
            )
        elif response_code >= 400:
            output = (
                1,
                "Error connecting to {node} - (HTTP error: {code})".format(
                    node=host, code=response_code
                ),
            )
        else:
            output = (0, response_data)

        if printResult and output[0] != 0:
            print_to_stderr(output[1])

        return output
    except pycurl.error as e:
        # no HTTP response was received
        if is_proxy_set(os.environ):
            reports_output.warn(
                "Proxy is set in environment variables, try disabling it"
            )
        # pylint: disable=unbalanced-tuple-unpacking
        dummy_errno, reason = e.args
        if "--debug" in pcs_options:
            print_to_stderr(f"Response Reason: {reason}")
        msg = (
            "Unable to connect to {host}, check if pcsd is running there or try "
            "setting higher timeout with --request-timeout option ({reason})"
        ).format(host=host, reason=reason)
        if printResult:
            print_to_stderr(msg)
        return (2, msg)
666  	
667  	
def __get_cookie_list(token):
    """
    Build the list of "name=value" cookies to send with a request.

    The token, if given, goes first. When running with effective uid 0, the
    non-empty CIB_user and CIB_user_groups environment variables are added as
    well.

    Commandline options: no options
    """
    cookies = ["token=" + token] if token else []
    if os.geteuid() != 0:
        return cookies

    for name in ("CIB_user", "CIB_user_groups"):
        value = os.environ.get(name, "").strip()
        if not value:
            continue
        # Let's be safe about characters in env variables and do base64.
        # We cannot do it for CIB_user however to be backward compatible
        # so we at least remove disallowed characters.
        if name == "CIB_user":
            value = re.sub(r"[^!-~]", "", value).replace(";", "")
        else:
            # python3 requires the value to be bytes not str
            encoded = base64.b64encode(value.encode("utf8"))
            value = encoded.decode("utf-8")
        cookies.append("{0}={1}".format(name, value))
    return cookies
691  	
692  	
def get_corosync_conf_facade(conf_text=None):
    """
    Parse corosync.conf and return a facade built on the parsed structure.

    conf_text -- text to parse; the local corosync.conf is read when None

    A parsing error is reported through err().

    Commandline options:
      * --corosync_conf - path to a mocked corosync.conf is set directly to
        settings
    """
    try:
        text = getCorosyncConf() if conf_text is None else conf_text
        parsed_conf = corosync_conf_parser.Parser.parse(text.encode("utf-8"))
        return corosync_conf_facade(parsed_conf)
    except corosync_conf_parser.CorosyncConfParserException as e:
        return err("Unable to parse corosync.conf: %s" % e)
709  	
710  	
def getNodeAttributesFromPacemaker():
    """
    Return a list with the "attrs" value of every node listed in the node
    section of the live cluster status.

    A LibraryError is handed over to process_library_reports.

    Commandline options: no options
    """
    try:
        status_dom = get_cluster_status_dom(cmd_runner())
        cluster_nodes = ClusterState(status_dom).node_section.nodes
        return [cluster_node.attrs for cluster_node in cluster_nodes]
    except LibraryError as e:
        return process_library_reports(e.args)
724  	
725  	
def hasCorosyncConf():
    """
    Tell whether the corosync.conf path from settings is an existing file.

    Commandline options:
      * --corosync_conf - path to a mocked corosync.conf is set directly to
        settings
    """
    conf_path = settings.corosync_conf_file
    return os.path.isfile(conf_path)
733  	
734  	
def getCorosyncConf():
    """
    Read corosync.conf (as utf-8 text) and return its content.

    A read error is reported through err(); None is returned if err()
    returns.

    Commandline options:
      * --corosync_conf - path to a mocked corosync.conf is set directly to
        settings
    """
    content = None
    try:
        with open(
            settings.corosync_conf_file, "r", encoding="utf-8"
        ) as conf_handle:
            content = conf_handle.read()
    except IOError as e:
        err("Unable to read %s: %s" % (settings.corosync_conf_file, e.strerror))
    return content
750  	
751  	
def reloadCorosync():
    """
    Run "corosync-cfgtool -R" and return a tuple (output, return value).

    Commandline options: no options
    """
    stdout, exit_code = run(["corosync-cfgtool", "-R"])
    return (stdout, exit_code)
758  	
759  	
def getCorosyncActiveNodes():
    """
    Return names of nodes whose status in the output of corosync-cmapctl is
    "joined".

    Node names are paired with statuses through node ids: nodelist entries
    give name and id for an index, runtime members give status for an id.
    An empty list is returned when corosync-cmapctl fails.

    Commandline options: no options
    """
    output, retval = run(["corosync-cmapctl"])
    if retval != 0:
        return []

    indexed_names = re.findall(
        r"^nodelist\.node\.(\d+)\.name .*= (.*)", output, re.M
    )
    index_to_id = dict(
        re.findall(r"nodelist\.node\.(\d+)\.nodeid .*= (\d+)", output, re.M)
    )
    id_to_status = dict(
        re.findall(r"^runtime\.members\.(\d+).status .*= (.*)", output, re.M)
    )

    status_by_name = {}
    for index, node_name in indexed_names:
        if index not in index_to_id:
            print_to_stderr(f"Error mapping {node_name}")
            continue
        node_id = index_to_id[index]
        if node_id in id_to_status:
            status_by_name[node_name] = id_to_status[node_id]

    return [
        node_name
        for node_name, status in status_by_name.items()
        if status == "joined"
    ]
796  	
797  	
798  	# is it needed to handle corosync-qdevice service when managing cluster services
def need_to_handle_qdevice_service():
    """
    Tell whether the local corosync.conf defines a quorum device model.

    False is returned also when the file cannot be read or parsed.

    Commandline options: no options
      * --corosync_conf - path to a mocked corosync.conf is set directly to
        settings but it doesn't make sense for contexts in which this function
        is used
    """
    try:
        with open(settings.corosync_conf_file, "rb") as conf_handle:
            parsed_conf = corosync_conf_parser.Parser.parse(conf_handle.read())
            qdevice_model = corosync_conf_facade(
                parsed_conf
            ).get_quorum_device_model()
            return qdevice_model is not None
    except (EnvironmentError, corosync_conf_parser.CorosyncConfParserException):
        # corosync.conf not present or not valid => no qdevice specified
        return False
817  	
818  	
819  	# Restore default behavior before starting subprocesses
def subprocess_setup():
    """
    Set handling of SIGPIPE back to the default action.
    """
    default_action = signal.SIG_DFL
    signal.signal(signal.SIGPIPE, default_action)
822  	
823  	
def touch_cib_file(cib_filename):
    """
    Create cib_filename with an empty CIB unless the file already exists.

    A failure to write the file is reported through err().
    """
    if os.path.isfile(cib_filename):
        return
    try:
        write_empty_cib(cib_filename)
    except EnvironmentError as e:
        message = "Unable to write to file: '{0}': '{1}'".format(
            cib_filename, str(e)
        )
        err(message)
834  	
835  	
# Run command, with environment and return (output, retval)
# DEPRECATED, please use lib.external.CommandRunner via utils.cmd_runner()
def run(
    args,
    ignore_stderr=False,
    string_for_stdin=None,
    env_extend=None,
    binary_output=False,
):
    """
    Run an external command and return a tuple (output, retval).

    args -- sequence with the command name followed by its arguments; a
        command name starting with "crm" or equal to "cibadmin", "iso8601"
        or "stonith_admin" is looked up in settings.pacemaker_execs, a name
        starting with "corosync" in settings.corosync_execs
    ignore_stderr -- if True, stderr is captured separately and dropped,
        otherwise it is merged into the returned output
    string_for_stdin -- data written to the command's stdin; when None the
        command gets /dev/null as its stdin
    env_extend -- additional environment variables; variables present in the
        process environment take precedence over them
    binary_output -- if True, output is returned undecoded

    Neither args nor env_extend is modified by this function.

    Commandline options:
      * -f - CIB file (effective only for some pacemaker tools)
      * --debug
    """
    # Build the environment on a copy so that the caller's dict is not
    # polluted with the whole process environment.
    env_var = dict(env_extend) if env_extend else {}
    env_var.update(os.environ)
    env_var["LC_ALL"] = "C"
    if usefile:
        env_var["CIB_file"] = filename
        touch_cib_file(filename)

    # The command name may get replaced with a full path below; work on a
    # copy so the caller's argument list stays as it was passed in.
    args = list(args)
    command = args[0]
    if command[0:3] == "crm" or command in [
        "cibadmin",
        "iso8601",
        "stonith_admin",
    ]:
        args[0] = os.path.join(settings.pacemaker_execs, command)
    elif command[0:8] == "corosync":
        args[0] = os.path.join(settings.corosync_execs, command)

    try:
        if "--debug" in pcs_options:
            print_to_stderr("Running: " + " ".join(args))
            if string_for_stdin:
                print_to_stderr(
                    f"--Debug Input Start--\n"
                    f"{string_for_stdin}\n"
                    f"--Debug Input End--"
                )

        # Some commands react differently if you give them anything via stdin
        if string_for_stdin is not None:
            stdin_pipe = subprocess.PIPE
        else:
            stdin_pipe = subprocess.DEVNULL

        # pylint: disable=subprocess-popen-preexec-fn, consider-using-with
        p = subprocess.Popen(
            args,
            stdin=stdin_pipe,
            stdout=subprocess.PIPE,
            stderr=(subprocess.PIPE if ignore_stderr else subprocess.STDOUT),
            preexec_fn=subprocess_setup,  # noqa: PLW1509
            close_fds=True,
            env=env_var,
            # decodes newlines and in python3 also converts bytes to str
            universal_newlines=(not binary_output),
        )
        output, dummy_stderror = p.communicate(string_for_stdin)
        retval = p.returncode
        if "--debug" in pcs_options:
            print_to_stderr(
                "Return Value: {retval}\n"
                "--Debug Output Start--\n"
                "{debug_output}\n"
                "--Debug Output End--".format(
                    retval=retval,
                    debug_output=output.rstrip(),
                )
            )
    except OSError as e:
        print_to_stderr(e.strerror)
        err("unable to locate command: " + args[0])

    return output, retval
914  	
915  	
def cmd_runner(cib_file_override=None):
    """
    Create a CommandRunner with the environment for running external tools.

    cib_file_override -- CIB file to use instead of the one specified by -f

    Commandline options:
      * -f - CIB file
    """
    cib_env = {}
    if cib_file_override:
        cib_env["CIB_file"] = cib_file_override
    elif usefile:
        cib_env["CIB_file"] = filename
    # Later entries win: variables of the process environment replace the
    # CIB_file selected above and LC_ALL is always forced to "C".
    environment = {**cib_env, **os.environ, "LC_ALL": "C"}
    logger = logging.getLogger("pcs")
    return CommandRunner(logger, get_report_processor(), environment)
931  	
932  	
def run_pcsdcli(command, data=None):
    """
    Run a pcsd-cli.rb command and return a tuple (output_json, retval).

    command -- name of the pcsd-cli command to run
    data -- JSON-serializable input of the command, sent to its stdin

    The returned output_json is the decoded output of the command with keys
    "status", "text" and "data" always present. When the output is not a
    JSON object, a dict with status "bad_json_output" and the raw output in
    "text" is returned instead.

    Commandline options:
      * --request-timeout - timeout for HTTP request, applicable for commands:
        * remove_known_hosts - only when running on cluster node (sync will
            be initiated)
        * auth
        * send_local_configs
    """
    if not data:
        data = {}
    env_var = {}
    if "--debug" in pcs_options:
        env_var["PCSD_DEBUG"] = "true"
    if "--request-timeout" in pcs_options:
        env_var["PCSD_NETWORK_TIMEOUT"] = str(pcs_options["--request-timeout"])
    else:
        env_var["PCSD_NETWORK_TIMEOUT"] = str(settings.default_request_timeout)
    pcsd_dir_path = settings.pcsd_exec_location
    pcsdcli_path = os.path.join(pcsd_dir_path, "pcsd-cli.rb")
    if settings.pcsd_gem_path is not None:
        env_var["GEM_HOME"] = settings.pcsd_gem_path
    stdout, dummy_stderr, retval = cmd_runner().run(
        [settings.ruby_exec, "-I" + pcsd_dir_path, pcsdcli_path, command],
        json.dumps(data),
        env_var,
    )
    try:
        output_json = json.loads(stdout)
        if not isinstance(output_json, dict):
            # valid JSON, but not the object we expect (e.g. null or a list);
            # treat it the same way as an output which cannot be decoded
            raise ValueError("pcsd-cli output is not a JSON object")
        for key in ["status", "text", "data"]:
            if key not in output_json:
                output_json[key] = None

        # a missing or empty log must not break processing of the response
        output = "".join(output_json.get("log") or [])
        # check if some requests timed out, if so print message about it
        if "error: operation_timedout" in output:
            print_to_stderr("Error: Operation timed out")
        # check if there are any connection failures due to proxy in pcsd and
        # print warning if so
        proxy_msg = "Proxy is set in environment variables, try disabling it"
        if proxy_msg in output:
            reports_output.warn(proxy_msg)

    except ValueError:
        output_json = {
            "status": "bad_json_output",
            "text": stdout,
            "data": None,
        }
    return output_json, retval
983  	
984  	
def auth_hosts_token(host_dict):
    """
    Run the pcsd-cli "auth_with_token" command for the given hosts.

    An error is reported through err() unless pcsd-cli exits with 0 and
    responds with status "ok".

    host_dict -- hosts passed to pcsd-cli as "nodes"
    """
    response, exit_code = run_pcsdcli("auth_with_token", {"nodes": host_dict})
    if exit_code != 0:
        err("Unable to communicate with pcsd")
        return
    status = response["status"]
    if status == "access_denied":
        err("Access denied")
    if status != "ok":
        err("Unable to communicate with pcsd")
994  	
995  	
def auth_hosts(host_dict):
    """
    Run the pcsd-cli "auth" command for the given hosts and print results.

    The result for each host is printed to stderr. The process exits with 1
    when authentication of any host did not succeed.

    host_dict -- hosts passed to pcsd-cli as "nodes"

    Commandline options:
      * --request-timeout - timeout for HTTP request
    """
    response, exit_code = run_pcsdcli("auth", {"nodes": host_dict})
    call_succeeded = exit_code == 0
    if call_succeeded and response["status"] == "access_denied":
        err("Access denied")
    if not (
        call_succeeded and response["status"] == "ok" and response["data"]
    ):
        err("Unable to communicate with pcsd")
        return

    data = response["data"]
    any_node_failed = False
    try:
        if not data["sync_successful"]:
            err(
                "Some nodes had a newer known-hosts than the local node. "
                "Local node's known-hosts were updated. "
                "Please repeat the authentication if needed."
            )
        for node, result in data["auth_responses"].items():
            node_status = result["status"]
            if node_status == "ok":
                print_to_stderr(f"{node}: Authorized")
                continue
            if node_status == "bad_password":
                problem = f"{node}: Username and/or password is incorrect"
            elif node_status in ("noresponse", "error"):
                problem = "Unable to communicate with {0}".format(node)
            else:
                problem = "Unexpected response from {0}".format(node)
            # do not exit yet, results of all the nodes are to be printed
            err(problem, False)
            any_node_failed = True
        unsynced_nodes = data["sync_nodes_err"]
        if unsynced_nodes:
            err(
                (
                    "Unable to synchronize and save known-hosts on nodes: "
                    "{0}. Run 'pcs host auth {1}' to make sure the nodes "
                    "are authorized."
                ).format(", ".join(unsynced_nodes), " ".join(unsynced_nodes))
            )
    except (ValueError, KeyError):
        err("Unable to communicate with pcsd")
    if any_node_failed:
        sys.exit(1)
1042 	
1043 	
def call_local_pcsd(argv, options, std_in=None):  # noqa: PLR0911
    """
    Run a pcs command by means of the locally running pcsd.

    Some commands cannot be run under a non-root account, so those commands
    are passed to the locally running pcsd to execute them.

    argv -- command to run, sent JSON-encoded as "command"
    options -- options of the command, sent JSON-encoded as "options"
    std_in -- data sent as "stdin"; left out of the request when empty

    Returns a list [list_of_errors, exit_code, stdout, stderr].

    Commandline options:
      * --request-timeout - timeout of call to local pcsd
    """
    # pylint: disable=too-many-return-statements

    def failure(message):
        return [[message], 1, "", ""]

    request = {
        "command": json.dumps(argv),
        "options": json.dumps(options),
    }
    if std_in:
        request["stdin"] = std_in
    code, output = sendHTTPRequest(
        "localhost", "run_pcs", urlencode(request), False, False
    )

    if code == 3:  # not authenticated
        return failure(
            "Unable to authenticate against the local pcsd. Run the same "
            "command as root or authenticate yourself to the local pcsd "
            "using command 'pcs client local-auth'"
        )
    if code != 0:  # http error connecting to localhost
        return failure(output)

    try:
        output_json = json.loads(output)
        for key in ["status", "data"]:
            if key not in output_json:
                output_json[key] = None
    except ValueError:
        return failure("Unable to communicate with pcsd")

    status = output_json["status"]
    if status == "bad_command":
        return failure("Command not allowed")
    if status == "access_denied":
        return failure("Access denied")
    if status != "ok" or not output_json["data"]:
        return failure("Unable to communicate with pcsd")

    result = output_json["data"]
    try:
        return [[], result["code"], result["stdout"], result["stderr"]]
    except KeyError:
        return failure("Unable to communicate with pcsd")
1098 	
1099 	
def map_for_error_list(callab, iterab):
    """
    Call 'callab' with each item of 'iterab' and collect reported errors.

    callab -- callable taking one item and returning a pair (retval, error)
    iterab -- iterable of items to process

    Returns the list of errors of those calls whose retval was not 0.

    Commandline options: no options
    NOTE: callback 'callab' may use some options
    """
    results = (callab(item) for item in iterab)
    return [error for retval, error in results if retval != 0]
1111 	
1112 	
def run_parallel(worker_list, wait_seconds=1):
    """
    Run every worker in its own daemon thread and wait for all to finish.

    worker_list -- iterable of callables taking no arguments
    wait_seconds -- timeout of a single join attempt on a thread

    Commandline options: no options
    """
    running = []
    for worker in worker_list:
        thread = threading.Thread(target=worker, daemon=True)
        thread.start()
        running.append(thread)

    # Join with a timeout and keep retrying until no thread is left alive.
    while running:
        thread = running.pop()
        thread.join(wait_seconds)
        if thread.is_alive():
            running.insert(0, thread)
1129 	
1130 	
def create_task(report, action, node, *args, **kwargs):
    """
    Create a callable which runs 'action' for 'node' and reports its result.

    report -- callable taking (node, returncode, output)
    action -- callable taking (node, *args, **kwargs) and returning a pair
        (returncode, output)
    node -- value passed to both 'action' and 'report'

    Commandline options: no options
    """

    def task():
        result = action(node, *args, **kwargs)
        returncode, output = result
        report(node, returncode, output)

    return task
1141 	
1142 	
def create_task_list(report, action, node_list, *args, **kwargs):
    """
    Create one task, as produced by create_task, for each node in node_list.

    Commandline options: no options
    """
    tasks = []
    for node in node_list:
        tasks.append(create_task(report, action, node, *args, **kwargs))
    return tasks
1150 	
1151 	
def parallel_for_nodes(action, node_list, *args, **kwargs):
    """
    Run 'action' for all the nodes in parallel and print their outputs.

    action -- callable taking (node, *args, **kwargs) and returning a pair
        (returncode, output)
    node_list -- nodes to run the action for

    Returns a dict mapping each node whose returncode was not 0 to the
    message printed for that node.

    Commandline options: no options
    NOTE: callback 'action' may use some cmd options
    """
    failures = {}

    def record_result(node, returncode, output):
        line = "{0}: {1}".format(node, output.strip())
        print_to_stderr(line)
        if returncode != 0:
            failures[node] = line

    tasks = create_task_list(record_result, action, node_list, *args, **kwargs)
    run_parallel(tasks)
    return failures
1167 	
1168 	
def get_group_children(group_id):
    """
    Return ids of primitives of the specified group in the current CIB.

    Commandline options: no options
    """
    cib_dom = get_cib_dom()
    return dom_get_group_children(cib_dom, group_id)
1174 	
1175 	
def dom_get_group_children(dom, group_id):
    """
    Return ids of primitives which are children of the specified group.

    Only the first group with a matching id is considered. An empty list is
    returned when there is no such group.
    """
    for group_el in dom.getElementsByTagName("group"):
        if group_el.getAttribute("id") != group_id:
            continue
        children = get_group_children_el_from_el(group_el)
        return [child.getAttribute("id") for child in children]
    return []
1185 	
1186 	
def get_group_children_el_from_el(group_el):
    """
    Return primitive elements which are direct children of the element.
    """
    element_type = xml.dom.minidom.Node.ELEMENT_NODE
    return [
        child
        for child in group_el.childNodes
        if child.nodeType == element_type and child.tagName == "primitive"
    ]
1195 	
1196 	
def dom_get_clone_ms_resource(dom, clone_ms_id):
    """
    Return the resource wrapped in the clone or master with the given id.

    None is returned when no clone nor master with the id exists.

    Commandline options: no options
    """
    container = dom_get_clone(dom, clone_ms_id)
    if not container:
        container = dom_get_master(dom, clone_ms_id)
    if not container:
        return None
    return dom_elem_get_clone_ms_resource(container)
1207 	
1208 	
def dom_elem_get_clone_ms_resource(clone_ms):
    """
    Return the first group or primitive child element, None if there is none.

    Commandline options: no options
    """
    element_type = xml.dom.minidom.Node.ELEMENT_NODE
    candidates = (
        child
        for child in clone_ms.childNodes
        if child.nodeType == element_type
        and child.tagName in ("group", "primitive")
    )
    return next(candidates, None)
1220 	
1221 	
def dom_get_resource_clone_ms_parent(dom, resource_id):
    """
    Return the clone or master element containing the primitive or group.

    None is returned when no primitive nor group with the given id exists or
    when it is not placed in a clone or a master.

    Commandline options: no options
    """
    resource_el = dom_get_resource(dom, resource_id)
    if not resource_el:
        resource_el = dom_get_group(dom, resource_id)
    if not resource_el:
        return None
    return dom_get_parent_by_tag_names(resource_el, ["clone", "master"])
1232 	
1233 	
def dom_get_resource_bundle_parent(dom, resource_id):
    """
    Return the bundle element containing the primitive with the given id.

    None is returned when there is no such primitive or when the primitive
    is not placed in a bundle.

    Commandline options: no options
    """
    primitive_el = dom_get_resource(dom, resource_id)
    if not primitive_el:
        return None
    return dom_get_parent_by_tag_names(primitive_el, ["bundle"])
1242 	
1243 	
def dom_get_master(dom, master_id):
    """
    Return the first master element with the given id, None if not found.

    Commandline options: no options
    """
    matching = (
        element
        for element in dom.getElementsByTagName("master")
        if element.getAttribute("id") == master_id
    )
    return next(matching, None)
1252 	
1253 	
def dom_get_clone(dom, clone_id):
    """
    Return the first clone element with the given id, None if not found.

    Commandline options: no options
    """
    found = None
    for candidate in dom.getElementsByTagName("clone"):
        if candidate.getAttribute("id") != clone_id:
            continue
        found = candidate
        break
    return found
1262 	
1263 	
def dom_get_group(dom, group_id):
    """
    Return the first group element with the given id, None if not found.

    Commandline options: no options
    """
    groups = dom.getElementsByTagName("group")
    return next(
        (group for group in groups if group.getAttribute("id") == group_id),
        None,
    )
1272 	
1273 	
def dom_get_bundle(dom, bundle_id):
    """
    Return the first bundle element with the given id, None if not found.

    Commandline options: no options
    """
    with_matching_id = [
        element
        for element in dom.getElementsByTagName("bundle")
        if element.getAttribute("id") == bundle_id
    ]
    return with_matching_id[0] if with_matching_id else None
1282 	
1283 	
def dom_get_resource_bundle(bundle_el):
    """
    Return the first primitive child element, None if there is none.

    Commandline options: no options
    """
    for node in bundle_el.childNodes:
        if node.nodeType != xml.dom.minidom.Node.ELEMENT_NODE:
            continue
        if node.tagName != "primitive":
            continue
        return node
    return None
1295 	
1296 	
def dom_get_group_clone(dom, group_id):
    """
    Return the group with the given id if it is placed in a clone.

    None is returned when no clone contains such a group.

    Commandline options: no options
    """
    groups_in_clones = (
        dom_get_group(clone_el, group_id)
        for clone_el in dom.getElementsByTagName("clone")
    )
    return next((group_el for group_el in groups_in_clones if group_el), None)
1306 	
1307 	
def dom_get_group_masterslave(dom, group_id):
    """
    Return the group with the given id if it is placed in a master.

    None is returned when no master contains such a group.

    Commandline options: no options
    """
    found = None
    for master_el in dom.getElementsByTagName("master"):
        candidate = dom_get_group(master_el, group_id)
        if candidate:
            found = candidate
            break
    return found
1317 	
1318 	
def dom_get_resource(dom, resource_id):
    """
    Return the first primitive element with the given id, None if not found.

    Commandline options: no options
    """
    primitives = dom.getElementsByTagName("primitive")
    matching = filter(
        lambda primitive: primitive.getAttribute("id") == resource_id,
        primitives,
    )
    return next(matching, None)
1327 	
1328 	
def dom_get_any_resource(dom, resource_id):
    """
    Return a primitive, group, clone or master element with the given id.

    The element types are searched in the listed order and the first match
    is returned. None is returned when nothing matches.

    Commandline options: no options
    """
    found = None
    for finder in (
        dom_get_resource,
        dom_get_group,
        dom_get_clone,
        dom_get_master,
    ):
        found = finder(dom, resource_id)
        if found:
            break
    return found
1339 	
1340 	
def dom_get_resource_clone(dom, resource_id):
    """
    Return the primitive with the given id if it is placed in a clone.

    None is returned when no clone contains such a primitive.

    Commandline options: no options
    """
    primitives_in_clones = (
        dom_get_resource(clone_el, resource_id)
        for clone_el in dom.getElementsByTagName("clone")
    )
    return next(
        (primitive for primitive in primitives_in_clones if primitive), None
    )
1350 	
1351 	
def dom_get_resource_masterslave(dom, resource_id):
    """
    Return the primitive with the given id if it is placed in a master.

    None is returned when no master contains such a primitive.

    Commandline options: no options
    """
    found = None
    for master_el in dom.getElementsByTagName("master"):
        candidate = dom_get_resource(master_el, resource_id)
        if candidate:
            found = candidate
            break
    return found
1361 	
1362 	
# returns tuple (is_valid, error_message, correct_resource_id_if_exists)
# there is a duplicate code in pcs/lib/cib/constraint/constraint.py
# please use function in pcs/lib/cib/constraint/constraint.py
def validate_constraint_resource(dom, resource_id):  # noqa: PLR0911
    """
    Check whether a constraint may be created for the specified resource.

    Returns a tuple (is_valid, error_message, correct_resource_id):
      * clones, masters and bundles are valid
      * primitives and groups are valid unless they are placed in a clone,
        a master or a bundle; in that case the id of the enclosing element
        is returned as the correct id
      * an id which does not belong to any resource is not valid and the
        correct id is None

    Commandline options:
      * --force - allow constraint on any resource
    """
    # pylint: disable=too-many-return-statements
    container_el = (
        dom_get_clone(dom, resource_id)
        or dom_get_master(dom, resource_id)
        or dom_get_bundle(dom, resource_id)
    )
    if container_el:
        # clones, masters and bundles are always valid
        return True, "", resource_id

    if not (
        dom_get_resource(dom, resource_id) or dom_get_group(dom, resource_id)
    ):
        return False, "Resource '%s' does not exist" % resource_id, None

    parent_el = dom_get_resource_clone_ms_parent(
        dom, resource_id
    ) or dom_get_resource_bundle_parent(dom, resource_id)
    if not parent_el:
        # a primitive and a group is valid if not in a clone nor a master nor a
        # bundle
        return True, "", resource_id

    parent_id = parent_el.getAttribute("id")
    if "--force" in pcs_options:
        return True, "", parent_id

    parent_tag = parent_el.tagName
    if parent_tag in ["clone", "master"]:
        message = (
            "%s is a clone resource, you should use the clone id: %s "
            "when adding constraints. Use --force to override."
            % (resource_id, parent_id)
        )
        return False, message, parent_id
    if parent_tag == "bundle":
        message = (
            "%s is a bundle resource, you should use the bundle id: %s "
            "when adding constraints. Use --force to override."
            % (resource_id, parent_id)
        )
        return False, message, parent_id
    return True, "", resource_id
1415 	
1416 	
def validate_resources_not_in_same_group(dom, resource_id1, resource_id2):
    """
    Return True unless both resources are primitives of the same group.
    """
    primitives = [
        dom_get_resource(dom, resource_id)
        for resource_id in (resource_id1, resource_id2)
    ]
    if not all(primitives):
        # Only primitive resources can be in a group. If at least one of the
        # resources is not a primitive (it was not found), then the
        # resources are not in the same group.
        return True
    first_group, second_group = [
        dom_get_parent_by_tag_names(primitive, ["group"])
        for primitive in primitives
    ]
    if not (first_group and second_group):
        return True
    return first_group != second_group
1430 	
1431 	
def dom_get_resource_remote_node_name(dom_resource):
    """
    Return the name of the remote node defined by the resource element.

    For an ocf:pacemaker:remote primitive the name is its id, for other
    primitives it is the value of their "remote-node" meta attribute. None
    is returned for elements other than primitives.

    Commandline options: no options
    """
    if dom_resource.tagName != "primitive":
        return None
    agent = tuple(
        dom_resource.getAttribute(attr_name).lower()
        for attr_name in ("class", "provider", "type")
    )
    if agent == ("ocf", "pacemaker", "remote"):
        return dom_resource.getAttribute("id")
    return dom_get_meta_attr_value(dom_resource, "remote-node")
1445 	
1446 	
def dom_get_meta_attr_value(dom_resource, meta_name):
    """
    Return the value of the first nvpair named 'meta_name' found in
    meta_attributes elements inside the given element, None if not found.

    Commandline options: no options
    """
    values = (
        nvpair.getAttribute("value")
        for meta in dom_resource.getElementsByTagName("meta_attributes")
        for nvpair in meta.getElementsByTagName("nvpair")
        if nvpair.getAttribute("name") == meta_name
    )
    return next(values, None)
1456 	
1457 	
def dom_get_element_with_id(dom, tag_name, element_id):
    """
    Return the first 'tag_name' element with the given id, None if not found.

    Elements with no id attribute never match.

    Commandline options: no options
    """
    matching = (
        element
        for element in dom.getElementsByTagName(tag_name)
        if element.hasAttribute("id")
        and element.getAttribute("id") == element_id
    )
    return next(matching, None)
1466 	
1467 	
def dom_get_node(dom, node_name):
    """
    Return the first node element whose uname is 'node_name', None if not
    found. Elements with no uname attribute never match.

    Commandline options: no options
    """
    found = None
    for node_el in dom.getElementsByTagName("node"):
        if not node_el.hasAttribute("uname"):
            continue
        if node_el.getAttribute("uname") == node_name:
            found = node_el
            break
    return found
1476 	
1477 	
def dom_get_children_by_tag_name(dom_el, tag_name):
    """
    Return direct child elements of 'dom_el' with the specified tag name.

    Commandline options: no options
    """
    children = []
    for child in dom_el.childNodes:
        if child.nodeType != xml.dom.minidom.Node.ELEMENT_NODE:
            continue
        if child.tagName == tag_name:
            children.append(child)
    return children
1488 	
1489 	
def dom_get_parent_by_tag_names(dom_el, tag_names):
    """
    Return the closest ancestor element whose tag name is in 'tag_names'.

    The search stops at the first ancestor which is not an element. None is
    returned when no matching ancestor is found.

    Commandline options: no options
    """
    ancestor = dom_el.parentNode
    while ancestor and isinstance(ancestor, xml.dom.minidom.Element):
        if ancestor.tagName in tag_names:
            return ancestor
        ancestor = ancestor.parentNode
    return None
1502 	
1503 	
def dom_attrs_to_list(dom_el, with_id=False):
    """
    Return attributes of the element as a list of "name=value" strings.

    The attributes are sorted and the id attribute is left out. The value of
    a role attribute is capitalized and passed through
    common_pacemaker.role.get_value_primary.

    dom_el -- element to read the attributes from
    with_id -- if True, "(id:<element id>)" is appended to the list

    Commandline options: no options
    """
    attributes = []
    for name, value in sorted(dom_el.attributes.items()):
        if name == "id":
            continue
        if name == "role":
            value = common_pacemaker.role.get_value_primary(value.capitalize())
        attributes.append("%s=%s" % (name, value))
    if with_id:
        attributes.append("(id:%s)" % (dom_el.getAttribute("id")))
    return attributes
1524 	
1525 	
# moved to pcs.lib.pacemaker.state
def get_resource_for_running_check(cluster_state, resource_id, stopped=False):
    """
    Return the id of the resource element to look at in the cluster state
    when checking whether the specified resource is running.

    cluster_state -- DOM of the cluster state
    resource_id -- id of a resource, group or clone
    stopped -- picks the first resource of a matching group if True, the
        last one otherwise

    Commandline options: no options
    """

    def _isnum(value):
        return all(char in "0123456789" for char in value)

    # pylint: disable=too-many-nested-blocks
    # A clone id is replaced with the id of the resource or group it wraps.
    for clone in cluster_state.getElementsByTagName("clone"):
        if clone.getAttribute("id") != resource_id:
            continue
        for child in clone.childNodes:
            if child.nodeType != child.ELEMENT_NODE:
                continue
            if child.tagName not in ("resource", "group"):
                continue
            resource_id = child.getAttribute("id")
            # in a clone, a resource can have an id of '<name>:N'
            name, separator, suffix = resource_id.rpartition(":")
            if separator and _isnum(suffix):
                resource_id = name
            break

    # A group id is replaced with the id of one of the group's resources.
    for group in cluster_state.getElementsByTagName("group"):
        group_id = group.getAttribute("id")
        # If resource is a clone it can have an id of '<resource name>:N'
        if group_id != resource_id and not group_id.startswith(
            resource_id + ":"
        ):
            continue
        members = group.getElementsByTagName("resource")
        resource_id = members[0 if stopped else -1].getAttribute("id")
    return resource_id
1561 	
1562 	
# moved to pcs.lib.pacemaker.state
# see pcs.lib.commands.resource for usage
def resource_running_on(resource, passed_state=None, stopped=False):
    """
    Find out on which nodes the specified resource is running.

    resource -- id of the resource
    passed_state -- DOM of the cluster state; loaded by getClusterState()
        when not given
    stopped -- passed to get_resource_for_running_check

    Returns a dict with keys "message", a human readable description, and
    "is_running", True if the resource is running on at least one node.

    Commandline options:
      * -f - has effect but doesn't make sense to check state of resource
    """
    # pylint: disable=too-many-locals
    started_on = []
    promoted_on = []
    unpromoted_on = []
    state = passed_state if passed_state else getClusterState()
    checked_id = get_resource_for_running_check(state, resource, stopped)
    for res in state.getElementsByTagName("resource"):
        res_id = res.getAttribute("id")
        # If resource is a clone it can have an id of '<resource name>:N'
        # If resource is a clone it will be found more than once - cannot break
        if res_id != checked_id and not res_id.startswith(checked_id + ":"):
            continue
        if res.getAttribute("failed") == "true":
            continue
        role = res.getAttribute("role")
        node_names = [
            node.getAttribute("name")
            for node in res.getElementsByTagName("node")
        ]
        if not node_names:
            continue
        if role == const.PCMK_ROLE_STARTED:
            started_on.extend(node_names)
        elif role in const.PCMK_ROLES_PROMOTED:
            promoted_on.extend(node_names)
        elif role in const.PCMK_ROLES_UNPROMOTED:
            unpromoted_on.extend(node_names)

    is_running = bool(started_on or promoted_on or unpromoted_on)
    if not is_running:
        message = "Resource '%s' is not running on any node" % resource
    else:
        labeled_nodes = (
            (started_on, "running"),
            (promoted_on, str(const.PCMK_ROLE_PROMOTED_PRIMARY).lower()),
            (unpromoted_on, str(const.PCMK_ROLE_UNPROMOTED_PRIMARY).lower()),
        )
        message_parts = []
        for node_names, label in labeled_nodes:
            if not node_names:
                continue
            node_names.sort()
            plural = "s" if len(node_names) > 1 else ""
            message_parts.append(
                "%s on node%s %s" % (label, plural, ", ".join(node_names))
            )
        message = "Resource '%s' is %s." % (
            resource,
            "; ".join(message_parts),
        )
    return {
        "message": message,
        "is_running": is_running,
    }
1617 	
1618 	
def validate_wait_get_timeout(need_cib_support=True):
    """
    Validate the --wait option and return its value in seconds.

    need_cib_support -- if True, using -f together with --wait is an error

    Returns None when --wait was specified with no value. An error is
    reported through err() when the value cannot be converted by
    timeout_to_seconds.

    Commandline options:
      * --wait
      * -f - to check if -f and --wait are not used simultaneously
    """
    if need_cib_support and usefile:
        err("Cannot use '-f' together with '--wait'")
    requested = pcs_options["--wait"]
    if requested is None:
        return None
    seconds = timeout_to_seconds(requested)
    if seconds is None:
        err(
            "%s is not a valid number of seconds to wait"
            % pcs_options["--wait"]
        )
    return seconds
1637 	
1638 	
# Return matches from the CIB with the xpath_query
def get_cib_xpath(xpath_query):
    """
    Query the CIB with cibadmin and return the output for the XPath query.

    An empty string is returned when cibadmin exits with a non-zero code.

    Commandline options:
      * -f - CIB file
    """
    output, retval = run(["cibadmin", "-Q", "--xpath", xpath_query])
    return output if retval == 0 else ""
1650 	
1651 	
def get_cib(scope=None):
    """
    Return the CIB, or only its specified section, as printed by cibadmin.

    scope -- name of a CIB section to get; the whole CIB if not specified

    An error is reported through err() when cibadmin fails.

    Commandline options:
      * -f - CIB file
    """
    scope_args = ["--scope=%s" % scope] if scope else []
    output, retval = run(["cibadmin", "-l", "-Q"] + scope_args)
    if retval == 0:
        return output
    # exit code 105 together with a scope means the scope is missing
    if retval == 105 and scope:
        err("unable to get cib, scope '%s' not present in cib" % scope)
    else:
        err("unable to get cib")
    return output
1667 	
1668 	
def get_cib_dom(cib_xml=None):
    """
    Parse the CIB into a minidom document.

    cib_xml -- XML of the CIB; loaded by get_cib() when None

    Returns the parsed document. An error is reported through err() when
    the XML cannot be parsed.

    Commandline options:
      * -f - CIB file
    """
    if cib_xml is None:
        cib_xml = get_cib()
    try:
        # NOTE(review): a static analysis scan flagged this call because the
        # standard library XML parsers do not guard against maliciously
        # constructed documents. Presumably the CIB comes from a trusted
        # source (local cibadmin or a file given by the user) - confirm
        # before feeding anything else to this function.
        return parseString(cib_xml)
    except xml.parsers.expat.ExpatError:
        return err("unable to get cib")
1680 	
1681 	
def get_cib_etree(cib_xml=None):
    """
    Parse the CIB into an ElementTree element.

    cib_xml -- XML of the CIB; loaded by get_cib() when None

    Returns the root element. An error is reported through err() when the
    XML cannot be parsed.

    Commandline options:
      * -f - CIB file
    """
    xml_text = get_cib() if cib_xml is None else cib_xml
    try:
        root = ET.fromstring(xml_text)
    except ET.ParseError:
        return err("unable to get cib")
    return root
1693 	
1694 	
def is_etree(var):
    """
    Tell whether 'var' is exactly an xml.etree.ElementTree.Element instance.

    Commandline options: no options
    """
    element_class = xml.etree.ElementTree.Element
    return var.__class__ == element_class
1700 	
1701 	
# Replace only configuration section of cib with dom passed
def replace_cib_configuration(dom):
    """
    Push the configuration section of the given CIB by means of cibadmin.

    dom -- the CIB as an ElementTree element, an object providing toxml()
        or anything which can be sent to cibadmin's stdin as is

    An error is reported through err() when cibadmin fails.

    Commandline options:
      * -f - CIB file
    """
    if is_etree(dom):
        # ET.tostring returns bytes, while run(...) is given text to be
        # passed to the command's stdin, hence the decoding
        cib_xml = ET.tostring(dom).decode()
    elif hasattr(dom, "toxml"):
        cib_xml = dom.toxml()
    else:
        cib_xml = dom
    output, retval = run(
        ["cibadmin", "--replace", "-V", "--xml-pipe", "-o", "configuration"],
        False,
        cib_xml,
    )
    if retval == 0:
        return
    err("Unable to update cib\n" + output)
1722 	
1723 	
def is_valid_cib_scope(scope):
    """
    Tell whether scope is one of the CIB section names accepted here.

    Commandline options: no options
    """
    known_scopes = (
        "acls",
        "alerts",
        "configuration",
        "constraints",
        "crm_config",
        "fencing-topology",
        "nodes",
        "op_defaults",
        "resources",
        "rsc_defaults",
        "tags",
    )
    return scope in known_scopes
1741 	
1742 	
1743 	# Checks to see if id exists in the xml dom passed
1744 	# DEPRECATED use lxml version available in pcs.lib.cib.tools
def does_id_exist(dom, check_id):  # noqa: PLR0912
    """
    Tell whether an element with the given id is present in the passed xml.

    dom -- where to search: an etree element (as recognized by is_etree), a
        minidom Document, or a minidom node, whose ownerDocument is searched
    check_id -- value of the "id" attribute to look for

    Returns True when one of the searched elements has its "id" equal to
    check_id, False otherwise.

    Commandline options: no options
    """
    # do not search in /cib/status, it may contain references to previously
    # existing and deleted resources and thus preventing creating them again
    if is_etree(dom):
        # NOTE(review): the path below relies on a union ("|") and on name();
        # the standard library ElementTree documents only a limited XPath
        # subset - confirm this branch works for the objects is_etree accepts
        for elem in dom.findall(
            str('(/cib/*[name()!="status"]|/*[name()!="cib"])/*')
        ):
            if elem.get("id") == check_id:
                return True
    else:
        # always search from the document, even if a single node was passed
        document = (
            dom
            if isinstance(dom, xml.dom.minidom.Document)
            else dom.ownerDocument
        )
        cib_found = False
        for cib in dom_get_children_by_tag_name(document, "cib"):
            cib_found = True
            for section in cib.childNodes:
                # skip text, comments and other non-element nodes
                if section.nodeType != xml.dom.minidom.Node.ELEMENT_NODE:
                    continue
                if section.tagName == "status":
                    continue
                # only elements below the section are checked here
                for elem in section.getElementsByTagName("*"):
                    if elem.getAttribute("id") == check_id:
                        return True
        if not cib_found:
            # no "cib" element was found, check every element of the document
            for elem in document.getElementsByTagName("*"):
                if elem.getAttribute("id") == check_id:
                    return True
    return False
1779 	
1780 	
1781 	# Returns check_id if it doesn't exist in the dom, otherwise it adds an integer
1782 	# to the end of the id and increments it until a unique id is found
1783 	# DEPRECATED use lxml version available in pcs.lib.cib.tools
def find_unique_id(dom, check_id):
    """
    Return an id based on check_id for which does_id_exist(dom, ...) is False.

    check_id itself is tried first, then check_id-1, check_id-2 and so on.

    Commandline options: no options
    """
    candidate = check_id
    suffix = 0
    while does_id_exist(dom, candidate):
        suffix += 1
        candidate = check_id + "-" + str(suffix)
    return candidate
1794 	
1795 	
1796 	# Checks to see if the specified operation already exists in passed set of
1797 	# operations
1798 	# pacemaker differentiates between operations only by name and interval
def operation_exists(operations_el, op_el):
    """
    Return "op" elements found in operations_el which have the same name as
    op_el and whose interval converts (timeout_to_seconds_legacy) to the same
    value as the interval of op_el. The returned list is empty if there are
    none.

    Commandline options: no options
    """
    wanted_name = op_el.getAttribute("name")
    wanted_interval = timeout_to_seconds_legacy(op_el.getAttribute("interval"))
    matching_ops = []
    for candidate in operations_el.getElementsByTagName("op"):
        if candidate.getAttribute("name") != wanted_name:
            continue
        candidate_interval = timeout_to_seconds_legacy(
            candidate.getAttribute("interval")
        )
        if candidate_interval == wanted_interval:
            matching_ops.append(candidate)
    return matching_ops
1814 	
1815 	
def operation_exists_by_name(operations_el, op_el):
    """
    Return "op" elements found in operations_el which have the same name as
    op_el. For the "monitor" operation, the role and the OCF check level must
    match as well; for any other operation the name alone decides.

    Commandline options: no options
    """
    new_roles_supported = isCibVersionSatisfied(
        operations_el, const.PCMK_NEW_ROLES_CIB_VERSION
    )

    def role_of(element):
        # an operation without a role is treated as const.PCMK_ROLE_STARTED
        return common_pacemaker.role.get_value_for_cib(
            element.getAttribute("role") or const.PCMK_ROLE_STARTED,
            new_roles_supported,
        )

    op_name = op_el.getAttribute("name")
    is_monitor = op_name == "monitor"
    op_role = role_of(op_el)
    ocf_check_level = (
        get_operation_ocf_check_level(op_el) if is_monitor else None
    )

    matching_ops = []
    for candidate in operations_el.getElementsByTagName("op"):
        if candidate.getAttribute("name") != op_name:
            continue
        if not is_monitor or (
            role_of(candidate) == op_role
            and ocf_check_level == get_operation_ocf_check_level(candidate)
        ):
            matching_ops.append(candidate)
    return matching_ops
1846 	
1847 	
def get_operation_ocf_check_level(operation_el):
    """
    Return the value of the first nvpair named
    OCF_CHECK_LEVEL_INSTANCE_ATTRIBUTE_NAME found in instance_attributes of
    operation_el, or None if there is no such nvpair.

    Commandline options: no options
    """
    matching_values = (
        nvpair_el.getAttribute("value")
        for attrs_el in operation_el.getElementsByTagName("instance_attributes")
        for nvpair_el in attrs_el.getElementsByTagName("nvpair")
        if nvpair_el.getAttribute("name")
        == OCF_CHECK_LEVEL_INSTANCE_ATTRIBUTE_NAME
    )
    return next(matching_values, None)
1860 	
1861 	
def set_node_attribute(prop, value, node):
    """
    Set a node attribute by running crm_attribute, or delete the attribute
    when value is an empty string.

    prop -- name of the attribute
    value -- new value of the attribute, "" means delete the attribute
    node -- name of the node the attribute belongs to

    Commandline options:
      * -f - CIB file
      * --force - no error if attribute to delete doesn't exist
    """
    crm_attribute_cmd = [
        "crm_attribute",
        "-t",
        "nodes",
        "--node",
        node,
        "--name",
        prop,
    ]
    if value == "":
        # check the attribute is there before trying to delete it
        _, query_retval = run(crm_attribute_cmd + ["--query"])
        if query_retval != 0 and "--force" not in pcs_options:
            err(
                "attribute: '%s' doesn't exist for node: '%s'" % (prop, node),
                False,
            )
            # This return code is used by pcsd
            sys.exit(2)
        output, retval = run(crm_attribute_cmd + ["--delete"])
    else:
        output, retval = run(crm_attribute_cmd + ["--update", value])

    if retval != 0:
        err("unable to set attribute %s\n%s" % (prop, output))
1917 	
1918 	
def getTerminalSize(fd=1):
    """
    Returns height and width of current terminal. First tries to get
    size via termios.TIOCGWINSZ, then from environment. Defaults to 25
    lines x 80 columns if both methods fail.

    The result is always a pair of integers (lines, columns). Values taken
    from the LINES and COLUMNS environment variables are converted to int; if
    either variable is missing or is not a number, the default is returned.

    :param fd: file descriptor (default: 1=stdout)

    Commandline options: no options
    """
    try:
        return struct.unpack(
            "hh", fcntl.ioctl(fd, termios.TIOCGWINSZ, "1234")
        )
    except OSError:
        # fd is not a terminal (or the ioctl is not available for it)
        pass
    try:
        # environment variables are strings, callers get numbers from the
        # other two branches, so convert here to keep the result uniform
        return (int(os.environ["LINES"]), int(os.environ["COLUMNS"]))
    except (KeyError, ValueError):
        return (25, 80)
1939 	
1940 	
def get_terminal_input(message=None):
    """
    Read one line from standard input, optionally printing a prompt first.

    message -- prompt written to stdout before reading, skipped when empty

    Returns the line read, or "" on end of input. When interrupted from
    keyboard, prints "Interrupted" and exits with code 1.

    Commandline options: no options
    """
    if message:
        stdout = sys.stdout
        stdout.write(message)
        stdout.flush()
    try:
        answer = input("")
    except KeyboardInterrupt:
        print("Interrupted")
        sys.exit(1)
    except EOFError:
        answer = ""
    return answer
1955 	
1956 	
def _get_continue_confirmation(warning_text: str) -> bool:
    """
    Print a warning and ask the user whether to go on. Returns True only if
    the user answers 'yes' or 'y', otherwise prints "Canceled" and returns
    False.

    Meant mostly for confirming destructive operations, hence the upper case
    WARNING and the need to type the answer explicitly.

    warning_text -- describes action that we want the user to confirm
    """
    print(f"WARNING: {warning_text}")
    print("Type 'yes' or 'y' to proceed, anything else to cancel: ", end="")
    confirmed = get_terminal_input() in ("yes", "y")
    if not confirmed:
        print("Canceled")
    return confirmed
1975 	
1976 	
def is_run_interactive() -> bool:
    """
    Return True if both stdin and stdout exist and are terminals, which is
    taken as pcs running in an interactive environment; False otherwise
    """
    if sys.stdin is None or sys.stdout is None:
        return False
    return sys.stdin.isatty() and sys.stdout.isatty()
1987 	
1988 	
def get_continue_confirmation_or_force(warning_text: str, force: bool) -> bool:
    """
    Decide whether an action which needs a confirmation may go on.

    With force set, only a warning is printed and True is returned. Without
    it, the user is asked interactively and their decision is returned. If pcs
    does not run interactively, an error built from warning_text and advising
    to use --force is reported by calling err().

    warning_text -- describes action that we want the user to confirm
    force -- was force flag provided?
    """
    if not force:
        if is_run_interactive():
            return _get_continue_confirmation(warning_text)
        err(f"{warning_text}, use --force to override")
        return False
    reports_output.warn(warning_text)
    return True
2006 	
2007 	
def get_terminal_password(message="Password: "):
    """
    Ask for a password. If stdin is a terminal, getpass is used to read it;
    otherwise the password is read by get_terminal_input. When interrupted
    from keyboard while in getpass, prints "Interrupted" and exits with code 1.

    message -- prompt to display

    Commandline options: no options
    """
    if sys.stdin is None or not sys.stdin.isatty():
        return get_terminal_input(message)
    try:
        return getpass.getpass(message)
    except KeyboardInterrupt:
        print("Interrupted")
        sys.exit(1)
2020 	
2021 	
2022 	# Returns an xml dom containing the current status of the cluster
2023 	# DEPRECATED, please use
2024 	# ClusterState(lib.pacemaker.live.get_cluster_status_dom()) instead
def getClusterState():
    """
    Run crm_mon with xml output and return its output parsed into a minidom
    document. A non-zero return value of crm_mon is reported by calling err().

    Commandline options:
      * -f - CIB file
    """
    crm_mon_cmd = ["crm_mon", "--one-shot", "--output-as=xml", "--inactive"]
    status_xml, crm_mon_retval = run(crm_mon_cmd, ignore_stderr=True)
    if crm_mon_retval != 0:
        err("error running crm_mon, is pacemaker running?")
    return parseString(status_xml)
2037 	
2038 	
def write_empty_cib(cibfile):
    """
    Write a CIB with empty configuration sections and an empty status into a
    file, overwriting its previous content.

    cibfile -- path to the file to be written

    Commandline options: no options
    """
    cib_template = """
        <cib admin_epoch="0" epoch="1" num_updates="1" validate-with="pacemaker-3.1">
          <configuration>
            <crm_config/>
            <nodes/>
            <resources/>
            <constraints/>
          </configuration>
          <status/>
        </cib>
    """
    with open(cibfile, "w") as cib_file:
        cib_file.write(cib_template)
2056 	
2057 	
2058 	# Test if 'var' is a score or option (contains an '=')
def is_score_or_opt(var):
    """
    Return True if var is a score (see is_score) or if it contains "=", which
    makes it an option.

    Commandline options: no options
    """
    return True if is_score(var) else var.find("=") != -1
2066 	
2067 	
def is_score(var):
    """
    Tell whether var is a score; the decision is delegated to is_score_value.

    Commandline options: no options
    """
    verdict = is_score_value(var)
    return verdict
2073 	
2074 	
def validate_xml_id(var: str, description: str = "id") -> Tuple[bool, str]:
    """
    Check an id by means of validate_id.

    var -- the id to be checked
    description -- passed to validate_id together with the id

    Returns (True, "") when validate_id produces no report, otherwise False
    and the message of the first report.

    Commandline options: no options
    """
    reports: ReportItemList = []
    validate_id(var, description, reports)
    if not reports:
        return True, ""
    first_report = reports[0]
    return False, first_report.message.message
2084 	
2085 	
2086 	# deprecated, moved to pcs.lib.pacemaker.live
def is_iso8601_date(var):
    """
    Return True if running "iso8601 -d" on var ends with return value 0.

    Commandline options: no options
    """
    # let the pacemaker tool decide whether the value is a valid pacemaker
    # iso8601 date
    iso8601_cmd = ["iso8601", "-d", var]
    _unused_output, iso8601_retval = run(iso8601_cmd)
    return iso8601_retval == 0
2094 	
2095 	
def err(errorText: str, exit_after_error: bool = True) -> None:
    """
    Report an error by means of reports_output.error().

    errorText -- text of the error
    exit_after_error -- if True, raise the object which reports_output.error()
        returned; if False, just return
    """
    error_to_raise = reports_output.error(errorText)
    if not exit_after_error:
        return
    raise error_to_raise
2100 	
2101 	
@lru_cache(typed=True)
def get_service_manager() -> ServiceManagerInterface:
    """
    Create a service manager from a command runner and a report processor.
    The result is kept by lru_cache and returned again on following calls.
    """
    runner = cmd_runner()
    report_processor = get_report_processor()
    return _get_service_manager(runner, report_processor)
2105 	
2106 	
def enableServices():
    """
    Enable corosync and pacemaker services, and corosync-qdevice if
    need_to_handle_qdevice_service() says so. All services are tried; failures
    are collected and raised together in one LibraryError afterwards.

    Commandline options: no options
    """
    # do NOT handle SBD in here, it is started by pacemaker not systemd or init
    services = ["corosync", "pacemaker"]
    if need_to_handle_qdevice_service():
        services += ["corosync-qdevice"]
    manager = get_service_manager()

    failure_reports = []
    for service_name in services:
        try:
            manager.enable(service_name)
        except ManageServiceError as error:
            failure_reports.append(service_exception_to_report(error))
    if failure_reports:
        raise LibraryError(*failure_reports)
2125 	
2126 	
def disableServices():
    """
    Disable corosync and pacemaker services, and corosync-qdevice if
    need_to_handle_qdevice_service() says so. All services are tried; failures
    are collected and raised together in one LibraryError afterwards.

    Commandline options: no options
    """
    # do NOT handle SBD in here, it is started by pacemaker not systemd or init
    services = ["corosync", "pacemaker"]
    if need_to_handle_qdevice_service():
        services += ["corosync-qdevice"]
    manager = get_service_manager()

    failure_reports = []
    for service_name in services:
        try:
            manager.disable(service_name)
        except ManageServiceError as error:
            failure_reports.append(service_exception_to_report(error))
    if failure_reports:
        raise LibraryError(*failure_reports)
2145 	
2146 	
def start_service(service):
    """
    Start a service using the service manager. A ManageServiceError raised by
    the manager is converted to a report and raised as LibraryError.

    service -- name of the service to start

    Commandline options: no options
    """
    manager = get_service_manager()

    try:
        manager.start(service)
    except ManageServiceError as error:
        raise LibraryError(service_exception_to_report(error)) from error
2157 	
2158 	
def stop_service(service):
    """
    Stop a service using the service manager. A ManageServiceError raised by
    the manager is converted to a report and raised as LibraryError.

    service -- name of the service to stop

    Commandline options: no options
    """
    manager = get_service_manager()

    try:
        manager.stop(service)
    except ManageServiceError as error:
        raise LibraryError(service_exception_to_report(error)) from error
2169 	
2170 	
def write_file(path, data, permissions=0o644, binary=False):
    """
    Write data to a file and report the outcome instead of raising on I/O
    errors.

    path -- where to write
    data -- content of the file; bytes if binary is True, str otherwise
    permissions -- mode passed to os.open when the file is created
    binary -- open the file in binary mode

    Returns (True, "") on success, (False, error message) on failure. An
    existing file makes the call fail unless --force is in effect, in which
    case the file is removed first.

    Commandline options:
      * --force - overwrite a file if it already exists
    """
    if os.path.exists(path):
        if "--force" not in pcs_options:
            return False, "'%s' already exists, use --force to overwrite" % path
        try:
            os.remove(path)
        except OSError as e:
            return False, "unable to remove '%s': %s" % (path, e)
    open_mode = "wb" if binary else "w"
    try:
        file_descriptor = os.open(path, os.O_WRONLY | os.O_CREAT, permissions)
        with os.fdopen(file_descriptor, open_mode) as outfile:
            outfile.write(data)
    except OSError as e:
        return False, "unable to write to '%s': %s" % (path, e)
    return True, ""
2192 	
2193 	
def tar_add_file_data(  # noqa: PLR0913
    tarball,
    data,
    name,
    *,
    mode=None,
    uid=None,
    gid=None,
    uname=None,
    gname=None,
    mtime=None,
):
    # pylint: disable=too-many-arguments
    """
    Add a regular file with the given content to an open tar archive.

    tarball -- tarfile object to add the file to
    data -- content of the file as bytes
    name -- name of the file in the archive
    mode, uid, gid, uname, gname -- set on the archive member when not None,
        otherwise tarfile.TarInfo defaults stay in place
    mtime -- modification time of the member, current time when None

    Commandline options: no options
    """
    info = tarfile.TarInfo(name)
    info.size = len(data)
    info.type = tarfile.REGTYPE
    info.mtime = mtime if mtime is not None else int(time.time())
    optional_fields = {
        "mode": mode,
        "uid": uid,
        "gid": gid,
        "uname": uname,
        "gname": gname,
    }
    for field_name, field_value in optional_fields.items():
        if field_value is not None:
            setattr(info, field_name, field_value)
    with BytesIO(data) as data_io:
        tarball.addfile(info, data_io)
2227 	
2228 	
2229 	# DEPRECATED, please use pcs.lib.pacemaker.live.simulate_cib
def simulate_cib(cib_dom):
    """
    Run crm_simulate on the passed CIB.

    cib_dom -- CIB to simulate, its toxml() output is piped to crm_simulate

    Returns a tuple: crm_simulate output, transitions parsed to a minidom
    document, resulting CIB parsed to a minidom document. A non-zero return
    value of crm_simulate as well as file and xml parsing errors are reported
    by calling err().

    Commandline options: no options
    """
    failure_template = "Unable to run crm_simulate:\n%s"
    try:
        with (
            tempfile.NamedTemporaryFile(
                mode="w+", suffix=".pcs"
            ) as new_cib_file,
            tempfile.NamedTemporaryFile(
                mode="w+", suffix=".pcs"
            ) as transitions_file,
        ):
            crm_simulate_cmd = [
                "crm_simulate",
                "--simulate",
                "--save-output",
                new_cib_file.name,
                "--save-graph",
                transitions_file.name,
                "--xml-pipe",
            ]
            output, retval = run(
                crm_simulate_cmd, string_for_stdin=cib_dom.toxml()
            )
            if retval != 0:
                return err(failure_template % output)
            # crm_simulate wrote into the files, read them from the start
            new_cib_file.seek(0)
            transitions_file.seek(0)
            transitions_dom = parseString(transitions_file.read())
            new_cib_dom = parseString(new_cib_file.read())
            return output, transitions_dom, new_cib_dom
    except (
        EnvironmentError,
        xml.parsers.expat.ExpatError,
        xml.etree.ElementTree.ParseError,
    ) as e:
        return err(failure_template % e)
2268 	
2269 	
2270 	# DEPRECATED
2271 	# please use pcs.lib.pacemaker.simulate.get_operations_from_transitions
def get_operations_from_transitions(transitions_dom):
    """
    Extract resource operations from a transitions document.

    Only rsc_op elements which contain a primitive and whose operation is
    start, stop, promote, demote, migrate_from or migrate_to (compared in
    lower case) are considered. One record is produced for each primitive of
    such a rsc_op.

    Returns a list of dicts with keys "id", "long_id", "operation" and
    "on_node", ordered by the numeric id of their rsc_op.

    Commandline options: no options
    """
    watched_operations = (
        "start",
        "stop",
        "promote",
        "demote",
        "migrate_from",
        "migrate_to",
    )
    numbered_operations = []
    for rsc_op in transitions_dom.getElementsByTagName("rsc_op"):
        primitives = rsc_op.getElementsByTagName("primitive")
        operation = rsc_op.getAttribute("operation").lower()
        if not primitives or operation not in watched_operations:
            continue
        rsc_op_id = int(rsc_op.getAttribute("id"))
        on_node = rsc_op.getAttribute("on_node")
        for primitive in primitives:
            primitive_id = primitive.getAttribute("id")
            record = {
                "id": primitive_id,
                # fall back to the plain id when there is no long-id
                "long_id": primitive.getAttribute("long-id") or primitive_id,
                "operation": operation,
                "on_node": on_node,
            }
            numbered_operations.append((rsc_op_id, record))
    numbered_operations.sort(key=lambda numbered: numbered[0])
    return [record for _, record in numbered_operations]
2306 	
2307 	
def get_resources_location_from_operations(cib_dom, resources_operations):
    """
    Find out on which nodes resources get started or promoted by the passed
    operations.

    cib_dom -- CIB, handed over to validate_constraint_resource
    resources_operations -- dicts with keys "operation", "id", "long_id" and
        "on_node"; only start, promote and migrate_from operations matter

    Returns a dict indexed by long_id. Each item holds "id", "long_id",
    "id_for_constraint" and at least one of "start_on_node" and
    "promote_on_node". Resources for which validate_constraint_resource gives
    no id for a constraint are left out.

    Commandline options:
      * --force - allow constraints on any resource, may not have any effect as
        an invalid constraint is ignored anyway
    """
    starting_operations = ("start", "migrate_from")
    locations = {}
    for res_op in resources_operations:
        operation = res_op["operation"]
        if operation != "promote" and operation not in starting_operations:
            continue
        long_id = res_op["long_id"]
        if long_id not in locations:
            # Clone instances are moved as if they were non-cloned resources,
            # which really works with current pacemaker (1.1.13-6). Otherwise
            # there is probably no way to move them other than setting their
            # stickiness to 0.
            res_id = res_op["id"].split(":", 1)[0]
            id_for_constraint = validate_constraint_resource(cib_dom, res_id)[2]
            if not id_for_constraint:
                continue
            locations[long_id] = {
                "id": res_op["id"],
                "long_id": long_id,
                "id_for_constraint": id_for_constraint,
            }
        node_key = (
            "promote_on_node" if operation == "promote" else "start_on_node"
        )
        locations[long_id][node_key] = res_op["on_node"]
    return {
        key: val
        for key, val in locations.items()
        if "start_on_node" in val or "promote_on_node" in val
    }
2345 	
2346 	
def get_remote_quorumtool_output(node):
    """
    Ask a node for its quorum info by sending the "remote/get_quorum_info"
    request to it and return whatever sendHTTPRequest returns.

    node -- the node to send the request to

    Commandline options:
      * --request-timeout - timeout for HTTP requests
    """
    response = sendHTTPRequest(
        node, "remote/get_quorum_info", None, False, False
    )
    return response
2353 	
2354 	
2355 	# return True if quorumtool_output is a string returned when the node is off
def is_node_offline_by_quorumtool_output(quorum_info):
    """
    Return True if quorum_info, with surrounding whitespace stripped, is the
    "Cannot initialize CMAP service" message.

    Commandline options: no options
    """
    offline_message = "Cannot initialize CMAP service"
    return offline_message == quorum_info.strip()
2361 	
2362 	
def dom_prepare_child_element(dom_element, tag_name, id_candidate):
    """
    Return the first child element of dom_element with the given tag name. If
    there is none, create it with an id derived from id_candidate by
    find_unique_id, append it to dom_element and return it.

    Commandline options: no options
    """
    for child in dom_element.childNodes:
        if child.nodeType == child.ELEMENT_NODE and child.tagName == tag_name:
            return child

    dom = dom_element.ownerDocument
    new_child = dom.createElement(tag_name)
    new_child.setAttribute("id", find_unique_id(dom, id_candidate))
    dom_element.appendChild(new_child)
    return new_child
2381 	
2382 	
def dom_update_nvset(dom_element, nvpair_tuples, tag_name, id_candidate):
    """
    Apply name-value pairs to the first nvset with the given tag name found
    among children of dom_element; an empty value removes the pair.

    dom_element -- element holding the nvset
    nvpair_tuples -- (name, value) pairs, nothing is done when empty
    tag_name -- tag name of the nvset element
    id_candidate -- base for the id of the nvset if it has to be created

    A missing nvset is created, unless all the passed values are empty.

    Commandline options: no options
    """
    # Already ported to pcs.libcib.nvpair

    # The nvset element is never removed, not even when it ends up empty.
    # Pacemaker ACLs may allow "write" for nvpairs (adding, changing and
    # removing) but not for nvsets. Removing the nvset would then make
    # pacemaker reject the whole change with a "permission denied" message.
    # https://bugzilla.redhat.com/show_bug.cgi?id=1642514
    if not nvpair_tuples:
        return

    only_removing = all(value == "" for _, value in nvpair_tuples)

    # dom.getElementsByTagName is not used on purpose, it would return
    # unwanted elements. For example, if dom_element is a clone, it would
    # return attributes of the clone as well as of the clone's primitive.
    existing_nvsets = dom_get_children_by_tag_name(dom_element, tag_name)

    if existing_nvsets:
        nvset_element = existing_nvsets[0]
    elif only_removing:
        # there is no point in creating an nvset just to remove values from it
        return
    else:
        dom = dom_element.ownerDocument
        nvset_element = dom.createElement(tag_name)
        nvset_element.setAttribute("id", find_unique_id(dom, id_candidate))
        dom_element.appendChild(nvset_element)

    for name, value in nvpair_tuples:
        dom_update_nv_pair(
            nvset_element, name, value, nvset_element.getAttribute("id") + "-"
        )
2425 	
2426 	
def dom_update_nv_pair(dom_element, name, value, id_prefix=""):
    """
    Set, add or remove one nvpair in dom_element.

    dom_element -- element to be searched for the nvpair; all nvpair elements
        below it are considered, the first one with a matching name is used
    name -- name of the nvpair
    value -- new value; an empty string removes an existing nvpair and does
        not create a new one
    id_prefix -- id of a newly created nvpair is id_prefix + name

    Returns dom_element.

    Commandline options: no options
    """
    # Do not ever remove the nvset element, even if it is empty. There may be
    # ACLs set in pacemaker which allow "write" for nvpairs (adding, changing
    # and removing) but not nvsets. In such a case, removing the nvset would
    # cause the whole change to be rejected by pacemaker with a "permission
    # denied" message.
    # https://bugzilla.redhat.com/show_bug.cgi?id=1642514

    dom = dom_element.ownerDocument
    for el in dom_element.getElementsByTagName("nvpair"):
        if el.getAttribute("name") != name:
            continue
        if value == "":
            # getElementsByTagName also returns nvpairs nested deeper than
            # direct children, so remove the nvpair from its actual parent;
            # removing it from dom_element would raise for a nested nvpair
            el.parentNode.removeChild(el)
        else:
            el.setAttribute("value", value)
        return dom_element
    if value != "":
        el = dom.createElement("nvpair")
        el.setAttribute("id", id_prefix + name)
        el.setAttribute("name", name)
        el.setAttribute("value", value)
        dom_element.appendChild(el)
    return dom_element
2455 	
2456 	
2457 	# Passed an array of strings ["a=b","c=d"], return array of tuples
2458 	# [("a","b"),("c","d")]
def convert_args_to_tuples(ra_values):
    """
    Turn "name=value" strings into (name, value) tuples. Each string is split
    at its first "=", strings without "=" are left out.

    Commandline options: no options
    """
    return [
        tuple(ra_val.split("=", 1))
        for ra_val in ra_values
        if ra_val.count("=") != 0
    ]
2469 	
2470 	
def is_int(val):
    """
    Return True if int(val) succeeds, False if it raises ValueError.
    """
    try:
        int(val)
    except ValueError:
        return False
    else:
        return True
2477 	
2478 	
def dom_update_utilization(dom_element, attributes, id_prefix=""):
    """
    Update utilization attributes of dom_element.

    dom_element -- element whose "utilization" nvset is to be updated
    attributes -- dict of attribute names and values; values must be integers
        or empty strings, anything else is reported by calling err()
    id_prefix -- put in front of the id candidate of the nvset

    Commandline options: no options
    """
    sorted_attributes = sorted(attributes.items())
    for name, value in sorted_attributes:
        if value == "" or is_int(value):
            continue
        err(
            "Value of utilization attribute must be integer: "
            "'{0}={1}'".format(name, value)
        )
    nvset_id_candidate = (
        id_prefix + dom_element.getAttribute("id") + "-utilization"
    )
    dom_update_nvset(
        dom_element, sorted_attributes, "utilization", nvset_id_candidate
    )
2497 	
2498 	
def dom_update_meta_attr(dom_element, attributes):
    """
    Update the "meta_attributes" nvset of dom_element by dom_update_nvset.

    dom_element -- element holding the nvset
    attributes -- (name, value) pairs passed on to dom_update_nvset

    Commandline options: no options
    """
    nvset_id_candidate = dom_element.getAttribute("id") + "-meta_attributes"
    dom_update_nvset(
        dom_element, attributes, "meta_attributes", nvset_id_candidate
    )
2509 	
2510 	
def dom_update_instance_attr(dom_element, attributes):
    """
    Update the "instance_attributes" nvset of dom_element by dom_update_nvset.

    dom_element -- element holding the nvset
    attributes -- (name, value) pairs passed on to dom_update_nvset

    Commandline options: no options
    """
    nvset_id_candidate = (
        dom_element.getAttribute("id") + "-instance_attributes"
    )
    dom_update_nvset(
        dom_element, attributes, "instance_attributes", nvset_id_candidate
    )
2521 	
2522 	
def get_utilization(element, filter_name=None):
    """
    Return utilization attributes of element as a dict of names and values.

    element -- element to be searched for "utilization" elements
    filter_name -- when not None, only the attribute of this name is returned

    Commandline options: no options
    """
    utilization_elements = element.getElementsByTagName("utilization")
    if not utilization_elements:
        return {}
    # Only the first utilization element is used. Utilization with rules is
    # not supported just yet.
    first_utilization = utilization_elements[0]
    return {
        nvpair.getAttribute("name"): nvpair.getAttribute("value")
        for nvpair in first_utilization.getElementsByTagName("nvpair")
        if filter_name is None or nvpair.getAttribute("name") == filter_name
    }
2538 	
2539 	
def get_utilization_str(element, filter_name=None):
    """
    Return utilization attributes of element (see get_utilization) formatted
    as "name=value" items sorted by name and separated by spaces.

    Commandline options: no options
    """
    sorted_pairs = sorted(get_utilization(element, filter_name).items())
    return " ".join(name + "=" + value for name, value in sorted_pairs)
2548 	
2549 	
def get_lib_env() -> LibraryEnvironment:
    """
    Build a LibraryEnvironment from the current pcs settings: CIB user and
    groups, the CIB if a CIB file is in use, content of the corosync.conf file
    if one was specified, and the request timeout. Failure to read the
    corosync.conf file is reported by calling err().

    Commandline options:
      * -f - CIB file
      * --corosync_conf - corosync.conf file
      * --request-timeout - timeout of HTTP requests
    """
    user, groups = get_cib_user_groups()

    cib_data = get_cib() if usefile else None

    corosync_conf_data = None
    if "--corosync_conf" in pcs_options:
        conf_path = pcs_options["--corosync_conf"]
        try:
            with open(conf_path) as corosync_conf_file:
                corosync_conf_data = corosync_conf_file.read()
        except IOError as e:
            err("Unable to read %s: %s" % (conf_path, e.strerror))

    return LibraryEnvironment(
        logging.getLogger("pcs"),
        get_report_processor(),
        user,
        groups,
        cib_data,
        corosync_conf_data,
        known_hosts_getter=read_known_hosts_file,
        request_timeout=pcs_options.get("--request-timeout"),
    )
2591 	
2592 	
def get_cib_user_groups():
    """
    Read the CIB user and its groups from CIB_user and CIB_user_groups
    environment variables. The variables are only honored when the effective
    user id is 0.

    Returns a tuple (user, groups). user is the stripped value of CIB_user,
    groups is the stripped value of CIB_user_groups split at spaces. Either is
    None if its variable is unset or blank, or if the effective user id is
    not 0.

    Commandline options: no options
    """
    if os.geteuid() != 0:
        return None, None
    user = os.environ.get("CIB_user", "").strip() or None
    raw_groups = os.environ.get("CIB_user_groups", "").strip()
    groups = raw_groups.split(" ") if raw_groups else None
    return user, groups
2608 	
2609 	
def get_cli_env():
    """
    Create an Env object and fill in the CIB user and groups, the known hosts
    getter, the report processor and the request timeout.

    Commandline options:
      * --debug
      * --request-timeout
    """
    cli_env = Env()
    user, groups = get_cib_user_groups()
    cli_env.user = user
    cli_env.groups = groups
    cli_env.known_hosts_getter = read_known_hosts_file
    cli_env.report_processor = get_report_processor()
    cli_env.request_timeout = pcs_options.get("--request-timeout")
    return cli_env
2622 	
2623 	
def get_middleware_factory():
    """
    Build the middleware factory out of the CIB, corosync.conf and booth
    middlewares.

    Commandline options:
      * --corosync_conf
      * --name
      * --booth-conf
      * --booth-key
      * -f
    """
    # the CIB file name is passed on only if the module-level usefile flag
    # is set
    cib_file_name = filename if usefile else None
    cib_middleware = middleware.cib(cib_file_name, touch_cib_file)
    corosync_middleware = middleware.corosync_conf_existing(
        pcs_options.get("--corosync_conf")
    )
    booth_middleware = pcs.cli.booth.env.middleware_config(
        pcs_options.get("--booth-conf"),
        pcs_options.get("--booth-key"),
    )
    return middleware.create_middleware_factory(
        cib=cib_middleware,
        corosync_conf_existing=corosync_middleware,
        booth_conf=booth_middleware,
    )
2643 	
2644 	
def get_library_wrapper():
    """
    Return a Library built from the CLI environment and the middleware
    factory.

    Commandline options:
      * --debug
      * --request-timeout
      * --corosync_conf
      * --name
      * --booth-conf
      * --booth-key
      * -f
    NOTE: usage of options may depend on used middleware for particular command
    """
    cli_env = get_cli_env()
    middleware_factory = get_middleware_factory()
    return Library(cli_env, middleware_factory)
2658 	
2659 	
def exit_on_cmdline_input_error(
    error: CmdLineInputError, main_name: str, usage_name: StringSequence
) -> None:
    """
    Report a command line input error and terminate with exit code 1.

    error -- the error to report; a falsy value results in printing usage only
    main_name -- first argument handed over to usage.show
    usage_name -- converted to a list and handed over to usage.show

    The message and the hint of the error are printed when they are set.
    Usage is shown when there is no error, when the error carries no message
    or when the error requests both the message and usage to be shown.
    """
    show_usage = True
    if error:
        if error.message:
            reports_output.error(error.message)
        if error.hint:
            print_to_stderr(f"Hint: {error.hint}")
        show_usage = not error.message or error.show_both_usage_and_message
    if show_usage:
        usage.show(main_name, list(usage_name))
    sys.exit(1)
2670 	
2671 	
def get_report_processor() -> ReportProcessor:
    """
    Return a console report processor with debug enabled when --debug is
    present in the command line options.
    """
    debug_requested = "--debug" in pcs_options
    return ReportProcessorToConsole(debug=debug_requested)
2674 	
2675 	
def get_user_and_pass():
    """
    Return a (username, password) tuple. Values not supplied as options are
    requested from the terminal, username first.

    Commandline options:
      * -u - username
      * -p - password
    """
    if "-u" in pcs_options:
        login = pcs_options["-u"]
    else:
        login = get_terminal_input("Username: ")

    if "-p" in pcs_options:
        secret = pcs_options["-p"]
    else:
        secret = get_terminal_password()

    return login, secret
2691 	
2692 	
def get_input_modifiers() -> InputModifiers:
    """
    Wrap the command line options in InputModifiers.

    When the deprecated '--master' option is present, a deprecation warning
    is printed and its value is moved to '--promoted' in pcs_options before
    the wrapper is created.
    """
    deprecated_option = "--master"
    if deprecated_option in pcs_options:
        reports_output.deprecation_warning(
            "Option '--master' is deprecated and has been replaced by "
            "option '--promoted' "
        )
        master_value = pcs_options.pop(deprecated_option)
        pcs_options["--promoted"] = master_value
    return InputModifiers(pcs_options)
2701 	
2702 	
def get_token_from_file(file_name: str) -> str:
    """
    Read a token from a file and return it base64 encoded as a string.

    file_name -- path to the file holding the token

    Errors are reported through err() when the file holds more than
    settings.pcsd_token_max_bytes bytes or when it is empty. When the file
    cannot be read, the error is reported and SystemExit(1) is raised.
    """
    try:
        with open(file_name, "rb") as token_file:
            size_limit = settings.pcsd_token_max_bytes  # type: ignore
            # asking for one byte over the limit is enough to detect an
            # oversized token
            token_bytes = token_file.read(size_limit + 1)
        if len(token_bytes) > size_limit:
            err(f"Maximal token size of {size_limit} bytes exceeded")
        if not token_bytes:
            err(f"File '{file_name}' is empty")
        return base64.b64encode(token_bytes).decode("utf-8")
    except OSError as read_error:
        err(
            f"Unable to read file '{file_name}': {read_error}",
            exit_after_error=False,
        )
        raise SystemExit(1) from read_error
2716 	
2717 	
def print_deprecation_warning_for_legacy_roles(role: str) -> None:
    """
    Print a deprecation warning if the given role is a legacy role name.

    role -- role name as entered; it is capitalized before being compared
        with the legacy role names
    """
    legacy_to_current: Dict[str, str] = {
        const.PCMK_ROLE_PROMOTED_LEGACY: const.PCMK_ROLE_PROMOTED,
        const.PCMK_ROLE_UNPROMOTED_LEGACY: const.PCMK_ROLE_UNPROMOTED,
    }
    capitalized_role = role.capitalize()
    if capitalized_role not in legacy_to_current:
        return
    current_role = legacy_to_current[capitalized_role]
    # the warning quotes the role exactly as it was entered
    reports_output.deprecation_warning(
        f"Role value '{role}' is deprecated and should not be used, use "
        f"'{current_role}' instead"
    )
2730 	
2731 	
def print_warning_if_utilization_attrs_has_no_effect(
    properties_facade: PropertyConfigurationFacade,
) -> None:
    """
    Warn when the 'placement-strategy' cluster property is not set to one of
    "balanced", "minimal" or "utilization".

    properties_facade -- queried for the value (or default) of the
        'placement-strategy' property
    """
    strategies_using_utilization = [
        "balanced",
        "minimal",
        "utilization",
    ]
    current_strategy = properties_facade.get_property_value_or_default(
        "placement-strategy"
    )
    if current_strategy in strategies_using_utilization:
        return
    allowed_values = format_list(strategies_using_utilization)
    reports_output.warn(
        "Utilization attributes configuration has no effect until cluster "
        "property option 'placement-strategy' is set to one of the "
        f"values: {allowed_values}"
    )
2750