1    	import datetime
2    	import difflib
3    	import grp
4    	import json
5    	import os
6    	import os.path
7    	import pwd
8    	import re
9    	import shutil
10   	import sys
11   	import tarfile
12   	import tempfile
13   	import time
14   	from io import BytesIO
15   	from typing import cast
16   	from xml.dom.minidom import parse
17   	
18   	from pcs import (
19   	    cluster,
20   	    quorum,
21   	    settings,
22   	    status,
23   	    usage,
24   	    utils,
25   	)
26   	from pcs.cli.alert.output import config_dto_to_lines as alerts_to_lines
27   	from pcs.cli.cluster_property.output import (
28   	    PropertyConfigurationFacade,
29   	    properties_to_text,
30   	)
31   	from pcs.cli.common import middleware
32   	from pcs.cli.common.errors import CmdLineInputError
33   	from pcs.cli.common.output import (
34   	    INDENT_STEP,
35   	    smart_wrap_text,
36   	)
37   	from pcs.cli.constraint.output import constraints_to_text
38   	from pcs.cli.nvset import nvset_dto_list_to_lines
39   	from pcs.cli.reports import process_library_reports
40   	from pcs.cli.reports.output import (
41   	    print_to_stderr,
42   	    warn,
43   	)
44   	from pcs.cli.resource.output import (
45   	    ResourcesConfigurationFacade,
46   	    resources_to_text,
47   	)
48   	from pcs.cli.stonith.levels.output import stonith_level_config_to_text
49   	from pcs.cli.tag.output import tags_to_text
50   	from pcs.common.interface import dto
51   	from pcs.common.pacemaker.constraint import CibConstraintsDto
52   	from pcs.common.str_tools import indent
53   	from pcs.lib.errors import LibraryError
54   	from pcs.lib.node import get_existing_nodes_names
55   	
56   	
57   	def config_show(lib, argv, modifiers):
58   	    """
59   	    Options:
60   	      * -f - CIB file, when getting cluster name on remote node (corosync.conf
61   	        doesn't exist)
62   	      * --corosync_conf - corosync.conf file
63   	    """
64   	    modifiers.ensure_only_supported("-f", "--corosync_conf", "--show-secrets")
65   	    if argv:
66   	        raise CmdLineInputError()
67   	
68   	    corosync_conf_dto = None
69   	    cluster_name = ""
70   	    properties_facade = PropertyConfigurationFacade.from_properties_config(
71   	        lib.cluster_property.get_properties(),
72   	    )
73   	    try:
74   	        corosync_conf_dto = lib.cluster.get_corosync_conf_struct()
75   	        cluster_name = corosync_conf_dto.cluster_name
76   	    except LibraryError:
77   	        # there is no corosync.conf on remote nodes, we can try to
78   	        # get cluster name from pacemaker
79   	        pass
80   	    if not cluster_name:
81   	        cluster_name = properties_facade.get_property_value("cluster-name", "")
82   	    print("Cluster Name: %s" % cluster_name)
83   	
84   	    status.nodes_status(lib, ["config"], modifiers.get_subset("-f"))
85   	    cib_lines = _config_show_cib_lines(lib, properties_facade=properties_facade)
86   	    if cib_lines:
87   	        print()
88   	        print("\n".join(cib_lines))
89   	    if (
90   	        utils.hasCorosyncConf()
91   	        and not modifiers.is_specified("-f")
92   	        and not modifiers.is_specified("--corosync_conf")
93   	    ):
94   	        cluster.cluster_uidgid(
95   	            lib, [], modifiers.get_subset(), silent_list=True
96   	        )
97   	    if corosync_conf_dto:
98   	        quorum_device_dict = {}
99   	        if corosync_conf_dto.quorum_device:
100  	            quorum_device_dict = dto.to_dict(corosync_conf_dto.quorum_device)
101  	        config = dict(
102  	            options=corosync_conf_dto.quorum_options,
103  	            device=quorum_device_dict,
104  	        )
105  	        quorum_lines = quorum.quorum_config_to_str(config)
106  	        if quorum_lines:
107  	            print()
108  	            print("Quorum:")
109  	            print("\n".join(indent(quorum_lines)))
110  	
111  	
112  	def _config_show_cib_lines(lib, properties_facade=None):  # noqa: PLR0912, PLR0915
113  	    """
114  	    Commandline options:
115  	      * -f - CIB file
116  	    """
117  	    # pylint: disable=too-many-branches
118  	    # pylint: disable=too-many-locals
119  	    # pylint: disable=too-many-statements
120  	
121  	    # update of pcs_options will change output of constraint show and
122  	    # displaying resources and operations defaults
123  	    utils.pcs_options["--full"] = 1
124  	    # get latest modifiers object after updating pcs_options
125  	    modifiers = utils.get_input_modifiers()
126  	
127  	    resources_facade = ResourcesConfigurationFacade.from_resources_dto(
128  	        lib.resource.get_configured_resources()
129  	    )
130  	    resources_only_facade = resources_facade.filter_stonith(False)
131  	    stonith_only_facade = resources_facade.filter_stonith(True)
132  	    if modifiers.is_specified("--show-secrets"):
133  	        for facade in [resources_only_facade, stonith_only_facade]:
134  	            queries = facade.get_secrets_queries()
135  	            if queries:
136  	                facade.update_secrets_values(
137  	                    lib.resource.get_cibsecrets(queries)
138  	                )
139  	
140  	    all_lines = []
141  	
142  	    resources_lines = smart_wrap_text(
143  	        indent(
144  	            resources_to_text(resources_only_facade), indent_step=INDENT_STEP
145  	        )
146  	    )
147  	    if resources_lines:
148  	        all_lines.append("Resources:")
149  	        all_lines.extend(resources_lines)
150  	
151  	    stonith_lines = smart_wrap_text(
152  	        indent(resources_to_text(stonith_only_facade), indent_step=INDENT_STEP)
153  	    )
154  	    if stonith_lines:
155  	        if all_lines:
156  	            all_lines.append("")
157  	        all_lines.append("Stonith Devices:")
158  	        all_lines.extend(stonith_lines)
159  	
160  	    levels_lines = stonith_level_config_to_text(
161  	        lib.fencing_topology.get_config_dto()
162  	    )
163  	    if levels_lines:
164  	        if all_lines:
165  	            all_lines.append("")
166  	        all_lines.append("Fencing Levels:")
167  	        all_lines.extend(indent(levels_lines, indent_step=2))
168  	
169  	    constraints_lines = smart_wrap_text(
170  	        constraints_to_text(
171  	            cast(
172  	                CibConstraintsDto,
173  	                lib.constraint.get_config(evaluate_rules=False),
174  	            ),
175  	            modifiers.is_specified("--full"),
176  	        )
177  	    )
178  	    if constraints_lines:
179  	        if all_lines:
180  	            all_lines.append("")
181  	        all_lines.extend(constraints_lines)
182  	
183  	    alert_lines = indent(alerts_to_lines(lib.alert.get_config_dto()))
184  	    if alert_lines:
185  	        if all_lines:
186  	            all_lines.append("")
187  	        all_lines.append("Alerts:")
188  	        all_lines.extend(alert_lines)
189  	
190  	    resources_defaults_lines = indent(
191  	        nvset_dto_list_to_lines(
192  	            lib.cib_options.resource_defaults_config(
193  	                evaluate_expired=False
194  	            ).meta_attributes,
195  	            nvset_label="Meta Attrs",
196  	            with_ids=modifiers.get("--full"),
197  	        )
198  	    )
199  	    if resources_defaults_lines:
200  	        if all_lines:
201  	            all_lines.append("")
202  	        all_lines.append("Resources Defaults:")
203  	        all_lines.extend(resources_defaults_lines)
204  	
205  	    operations_defaults_lines = indent(
206  	        nvset_dto_list_to_lines(
207  	            lib.cib_options.operation_defaults_config(
208  	                evaluate_expired=False
209  	            ).meta_attributes,
210  	            nvset_label="Meta Attrs",
211  	            with_ids=modifiers.get("--full"),
212  	        )
213  	    )
214  	    if operations_defaults_lines:
215  	        if all_lines:
216  	            all_lines.append("")
217  	        all_lines.append("Operations Defaults:")
218  	        all_lines.extend(operations_defaults_lines)
219  	
220  	    if not properties_facade:
221  	        properties_facade = PropertyConfigurationFacade.from_properties_config(
222  	            lib.cluster_property.get_properties()
223  	        )
224  	    properties_lines = properties_to_text(properties_facade)
225  	    if properties_lines:
226  	        if all_lines:
227  	            all_lines.append("")
228  	        all_lines.extend(properties_lines)
229  	
230  	    tag_lines = smart_wrap_text(tags_to_text(lib.tag.get_config_dto([])))
231  	    if tag_lines:
232  	        if all_lines:
233  	            all_lines.append("")
234  	        all_lines.append("Tags:")
235  	        all_lines.extend(indent(tag_lines, indent_step=1))
236  	
237  	    return all_lines
238  	
239  	
240  	def config_backup(lib, argv, modifiers):
241  	    """
242  	    Options:
243  	      * --force - overwrite file if already exists
244  	    """
245  	    del lib
246  	    modifiers.ensure_only_supported("--force")
247  	    if len(argv) > 1:
248  	        raise CmdLineInputError()
249  	
250  	    outfile_name = None
251  	    if argv:
252  	        outfile_name = argv[0]
253  	        if not outfile_name.endswith(".tar.bz2"):
254  	            outfile_name += ".tar.bz2"
255  	
256  	    tar_data = config_backup_local()
257  	    if outfile_name:
258  	        ok, message = utils.write_file(
259  	            outfile_name, tar_data, permissions=0o600, binary=True
260  	        )
261  	        if not ok:
262  	            utils.err(message)
263  	    else:
264  	        # in python3 stdout accepts str so we need to use buffer
265  	        sys.stdout.buffer.write(tar_data)
266  	
267  	
268  	def config_backup_local():
269  	    """
270  	    Commandline options: no options
271  	    """
272  	    file_list = config_backup_path_list()
273  	    tar_data = BytesIO()
274  	
275  	    try:
276  	        with tarfile.open(fileobj=tar_data, mode="w|bz2") as tarball:
277  	            config_backup_add_version_to_tarball(tarball)
278  	            for tar_path, path_info in file_list.items():
279  	                if (
280  	                    not os.path.exists(path_info["path"])
281  	                    and not path_info["required"]
282  	                ):
283  	                    continue
284  	                tarball.add(path_info["path"], tar_path)
285  	    except (tarfile.TarError, EnvironmentError) as e:
286  	        utils.err("unable to create tarball: %s" % e)
287  	
288  	    tar = tar_data.getvalue()
289  	    tar_data.close()
290  	    return tar
291  	
292  	
293  	def config_restore(lib, argv, modifiers):
294  	    """
295  	    Options:
296  	      * --local - restore config only on local node
297  	      * --request-timeout - timeout for HTTP requests, used only if --local was
298  	        not defined or user is not root
299  	    """
300  	    del lib
301  	    modifiers.ensure_only_supported("--local", "--request-timeout")
302  	    if len(argv) > 1:
303  	        raise CmdLineInputError()
304  	
305  	    infile_name = infile_obj = None
306  	    if argv:
307  	        infile_name = argv[0]
308  	    if not infile_name:
309  	        # in python3 stdin returns str so we need to use buffer
310  	        infile_obj = BytesIO(sys.stdin.buffer.read())
311  	
312  	    if os.getuid() == 0:
313  	        if modifiers.get("--local"):
314  	            config_restore_local(infile_name, infile_obj)
315  	        else:
316  	            config_restore_remote(infile_name, infile_obj)
317  	    else:
318  	        new_argv = ["config", "restore"]
319  	        options = []
320  	        new_stdin = None
321  	        if modifiers.get("--local"):
322  	            options.append("--local")
323  	        if infile_name:
324  	            new_argv.append(os.path.abspath(infile_name))
325  	        else:
326  	            new_stdin = infile_obj.read()
327  	        err_msgs, exitcode, std_out, std_err = utils.call_local_pcsd(
328  	            new_argv, options, new_stdin
329  	        )
330  	        if err_msgs:
331  	            for msg in err_msgs:
332  	                utils.err(msg, False)
333  	            sys.exit(1)
334  	        print(std_out)
335  	        sys.stderr.write(std_err)
336  	        sys.exit(exitcode)
337  	
338  	
339  	def config_restore_remote(infile_name, infile_obj):  # noqa: PLR0912
340  	    """
341  	    Commandline options:
342  	      * --request-timeout - timeout for HTTP requests
343  	    """
344  	    # pylint: disable=too-many-branches
345  	    # pylint: disable=too-many-locals
346  	    extracted = {
347  	        "version.txt": "",
348  	        "corosync.conf": "",
349  	    }
350  	    try:
351  	        with tarfile.open(infile_name, "r|*", infile_obj) as tarball:
352  	            while True:
353  	                # next(tarball) does not work in python2.6
354  	                tar_member_info = tarball.next()
355  	                if tar_member_info is None:
356  	                    break
357  	                if tar_member_info.name in extracted:
358  	                    tar_member = tarball.extractfile(tar_member_info)
359  	                    extracted[tar_member_info.name] = tar_member.read()
360  	                    tar_member.close()
361  	    except (tarfile.TarError, EnvironmentError) as e:
362  	        utils.err("unable to read the tarball: %s" % e)
363  	
364  	    config_backup_check_version(extracted["version.txt"])
365  	
366  	    node_list, report_list = get_existing_nodes_names(
367  	        utils.get_corosync_conf_facade(
368  	            conf_text=extracted["corosync.conf"].decode("utf-8")
369  	        )
370  	    )
371  	    if report_list:
372  	        process_library_reports(report_list)
373  	    if not node_list:
374  	        utils.err("no nodes found in the tarball")
375  	
376  	    err_msgs = []
377  	    for node in node_list:
378  	        try:
379  	            retval, output = utils.checkStatus(node)
380  	            if retval != 0:
381  	                err_msgs.append(output)
382  	                continue
383  	            _status = json.loads(output)
384  	            if any(
385  	                _status["node"]["services"][service_name]["running"]
386  	                for service_name in (
387  	                    "corosync",
388  	                    "pacemaker",
389  	                    "pacemaker_remote",
390  	                )
391  	            ):
392  	                err_msgs.append(
393  	                    "Cluster is currently running on node %s. You need to stop "
394  	                    "the cluster in order to restore the configuration." % node
395  	                )
396  	                continue
397  	        except (ValueError, NameError, LookupError):
398  	            err_msgs.append("unable to determine status of the node %s" % node)
399  	    if err_msgs:
400  	        for msg in err_msgs:
401  	            utils.err(msg, False)
402  	        sys.exit(1)
403  	
404  	    # Temporarily disable config files syncing thread in pcsd so it will not
405  	    # rewrite restored files. 10 minutes should be enough time to restore.
406  	    # If node returns HTTP 404 it does not support config syncing at all.
407  	    for node in node_list:
408  	        retval, output = utils.pauseConfigSyncing(node, 10 * 60)
409  	        if not (retval == 0 or "(HTTP error: 404)" in output):
410  	            utils.err(output)
411  	
412  	    if infile_obj:
413  	        infile_obj.seek(0)
414  	        tarball_data = infile_obj.read()
415  	    else:
416  	        with open(infile_name, "rb") as tarball:
417  	            tarball_data = tarball.read()
418  	
419  	    error_list = []
420  	    for node in node_list:
421  	        retval, error = utils.restoreConfig(node, tarball_data)
422  	        if retval != 0:
423  	            error_list.append(error)
424  	    if error_list:
425  	        utils.err("unable to restore all nodes\n" + "\n".join(error_list))
426  	
427  	
428  	def config_restore_local(infile_name, infile_obj):  # noqa: PLR0912, PLR0915
429  	    """
430  	    Commandline options: no options
431  	    """
432  	    # pylint: disable=too-many-branches
433  	    # pylint: disable=too-many-locals
434  	    # pylint: disable=too-many-statements
435  	    service_manager = utils.get_service_manager()
436  	    if (
437  	        service_manager.is_running("corosync")
438  	        or service_manager.is_running("pacemaker")
439  	        or service_manager.is_running("pacemaker_remote")
440  	    ):
441  	        utils.err(
442  	            "Cluster is currently running on this node. You need to stop "
443  	            "the cluster in order to restore the configuration."
444  	        )
445  	
446  	    file_list = config_backup_path_list(with_uid_gid=True)
447  	    tarball_file_list = []
448  	    version = None
449  	    tmp_dir = None
450  	    try:
451  	        with tarfile.open(infile_name, "r|*", infile_obj) as tarball:
452  	            while True:
453  	                # next(tarball) does not work in python2.6
454  	                tar_member_info = tarball.next()
455  	                if tar_member_info is None:
456  	                    break
457  	                if tar_member_info.name == "version.txt":
458  	                    version_data = tarball.extractfile(tar_member_info)
459  	                    version = version_data.read()
460  	                    version_data.close()
461  	                    continue
462  	                tarball_file_list.append(tar_member_info.name)
463  	
464  	        required_file_list = [
465  	            tar_path
466  	            for tar_path, path_info in file_list.items()
467  	            if path_info["required"]
468  	        ]
469  	        missing = set(required_file_list) - set(tarball_file_list)
470  	        if missing:
471  	            utils.err(
472  	                "unable to restore the cluster, missing files in backup: %s"
473  	                % ", ".join(missing)
474  	            )
475  	
476  	        config_backup_check_version(version)
477  	
478  	        if infile_obj:
479  	            infile_obj.seek(0)
480  	        with tarfile.open(infile_name, "r|*", infile_obj) as tarball:
481  	            while True:
482  	                # next(tarball) does not work in python2.6
483  	                tar_member_info = tarball.next()
484  	                if tar_member_info is None:
485  	                    break
486  	                extract_info = None
487  	                path = tar_member_info.name
488  	                while path:
489  	                    if path in file_list:
490  	                        extract_info = file_list[path]
491  	                        break
492  	                    path = os.path.dirname(path)
493  	                if not extract_info:
494  	                    continue
495  	                path_full = None
496  	                if callable(extract_info.get("pre_store_call")):
497  	                    extract_info["pre_store_call"]()
498  	                if "rename" in extract_info and extract_info["rename"]:
499  	                    if tmp_dir is None:
500  	                        tmp_dir = tempfile.mkdtemp()
501  	                    if hasattr(tarfile, "data_filter"):
502  	                        # Safe way of extraction is available since Python 3.12,
503  	                        # hasattr above checks if it's available.
504  	                        # It's also backported to 3.11.4, 3.10.12, 3.9.17.
505  	                        # It may be backported to older versions in downstream.
506  	                        tarball.extractall(
507  	                            tmp_dir, [tar_member_info], filter="data"
508  	                        )
509  	                    else:
510  	                        # Unsafe way of extraction
511  	                        # Remove once we don't support Python 3.8 and older
512  	                        tarball.extractall(tmp_dir, [tar_member_info])
513  	                    path_full = extract_info["path"]
514  	                    shutil.move(
515  	                        os.path.join(tmp_dir, tar_member_info.name), path_full
516  	                    )
517  	                else:
518  	                    dir_path = os.path.dirname(extract_info["path"])
519  	                    if hasattr(tarfile, "data_filter"):
520  	                        # Safe way of extraction is available since Python 3.12,
521  	                        # hasattr above checks if it's available.
522  	                        # It's also backported to 3.11.4, 3.10.12, 3.9.17.
523  	                        # It may be backported to older versions in downstream.
524  	                        tarball.extractall(
525  	                            dir_path, [tar_member_info], filter="data"
526  	                        )
527  	                    else:
528  	                        # Unsafe way of extracting
529  	                        # Remove once we don't support Python 3.8 and older
530  	                        tarball.extractall(dir_path, [tar_member_info])
531  	                    path_full = os.path.join(dir_path, tar_member_info.name)
532  	                file_attrs = extract_info["attrs"]
533  	                os.chmod(path_full, file_attrs["mode"])
534  	                os.chown(path_full, file_attrs["uid"], file_attrs["gid"])
535  	    except (tarfile.TarError, EnvironmentError, OSError) as e:
536  	        utils.err("unable to restore the cluster: %s" % e)
537  	    finally:
538  	        if tmp_dir:
539  	            shutil.rmtree(tmp_dir, ignore_errors=True)
540  	
541  	    try:
542  	        sig_path = os.path.join(settings.cib_dir, "cib.xml.sig")
543  	        if os.path.exists(sig_path):
544  	            os.remove(sig_path)
545  	    except EnvironmentError as e:
546  	        utils.err("unable to remove %s: %s" % (sig_path, e))
547  	
548  	
549  	def config_backup_path_list(with_uid_gid=False):
550  	    """
551  	    Commandline options: no option
552  	    NOTE: corosync.conf path may be altered using --corosync_conf
553  	    """
554  	    corosync_attrs = {
555  	        "mtime": int(time.time()),
556  	        "mode": 0o644,
557  	        "uname": "root",
558  	        "gname": "root",
559  	        "uid": 0,
560  	        "gid": 0,
561  	    }
562  	    corosync_authkey_attrs = dict(corosync_attrs)
563  	    corosync_authkey_attrs["mode"] = 0o400
564  	    cib_attrs = {
565  	        "mtime": int(time.time()),
566  	        "mode": 0o600,
567  	        "uname": settings.pacemaker_uname,
568  	        "gname": settings.pacemaker_gname,
569  	    }
570  	    if with_uid_gid:
571  	        cib_attrs["uid"] = _get_uid(cib_attrs["uname"])
572  	        cib_attrs["gid"] = _get_gid(cib_attrs["gname"])
573  	
574  	    pcmk_authkey_attrs = dict(cib_attrs)
575  	    pcmk_authkey_attrs["mode"] = 0o440
576  	    return {
577  	        "cib.xml": {
578  	            "path": os.path.join(settings.cib_dir, "cib.xml"),
579  	            "required": True,
580  	            "attrs": dict(cib_attrs),
581  	        },
582  	        "corosync_authkey": {
583  	            "path": settings.corosync_authkey_file,
584  	            "required": False,
585  	            "attrs": corosync_authkey_attrs,
586  	            "restore_procedure": None,
587  	            "rename": True,
588  	        },
589  	        "pacemaker_authkey": {
590  	            "path": settings.pacemaker_authkey_file,
591  	            "required": False,
592  	            "attrs": pcmk_authkey_attrs,
593  	            "restore_procedure": None,
594  	            "rename": True,
595  	            "pre_store_call": _ensure_etc_pacemaker_exists,
596  	        },
597  	        "corosync.conf": {
598  	            "path": settings.corosync_conf_file,
599  	            "required": True,
600  	            "attrs": dict(corosync_attrs),
601  	        },
602  	        "uidgid.d": {
603  	            "path": settings.corosync_uidgid_dir,
604  	            "required": False,
605  	            "attrs": dict(corosync_attrs),
606  	        },
607  	        "pcs_settings.conf": {
608  	            "path": settings.pcsd_settings_conf_location,
609  	            "required": False,
610  	            "attrs": {
611  	                "mtime": int(time.time()),
612  	                "mode": 0o644,
613  	                "uname": "root",
614  	                "gname": "root",
615  	                "uid": 0,
616  	                "gid": 0,
617  	            },
618  	        },
619  	    }
620  	
621  	
622  	def _get_uid(user_name):
623  	    """
624  	    Commandline options: no options
625  	    """
626  	    try:
627  	        return pwd.getpwnam(user_name).pw_uid
628  	    except KeyError:
629  	        return utils.err(
630  	            "Unable to determine uid of user '{0}'".format(user_name)
631  	        )
632  	
633  	
634  	def _get_gid(group_name):
635  	    """
636  	    Commandline options: no options
637  	    """
638  	    try:
639  	        return grp.getgrnam(group_name).gr_gid
640  	    except KeyError:
641  	        return utils.err(
642  	            "Unable to determine gid of group '{0}'".format(group_name)
643  	        )
644  	
645  	
646  	def _ensure_etc_pacemaker_exists():
647  	    """
648  	    Commandline options: no options
649  	    """
650  	    dir_name = os.path.dirname(settings.pacemaker_authkey_file)
651  	    if not os.path.exists(dir_name):
652  	        os.mkdir(dir_name)
653  	        os.chmod(dir_name, 0o750)
654  	        os.chown(
655  	            dir_name,
656  	            _get_uid(settings.pacemaker_uname),
657  	            _get_gid(settings.pacemaker_gname),
658  	        )
659  	
660  	
661  	def config_backup_check_version(version):
662  	    """
663  	    Commandline options: no options
664  	    """
665  	    try:
666  	        version_number = int(version)
667  	        supported_version = config_backup_version()
668  	        if version_number > supported_version:
669  	            utils.err(
670  	                f"Unsupported version of the backup, supported version is "
671  	                f"{supported_version}, backup version is {version_number}"
672  	            )
673  	        if version_number < supported_version:
674  	            warn(
675  	                f"Restoring from the backup version {version_number}, current "
676  	                f"supported version is {supported_version}"
677  	            )
678  	    except TypeError:
679  	        utils.err("Cannot determine version of the backup")
680  	
681  	
682  	def config_backup_add_version_to_tarball(tarball, version=None):
683  	    """
684  	    Commandline options: no options
685  	    """
686  	    ver = version if version is not None else str(config_backup_version())
687  	    return utils.tar_add_file_data(tarball, ver.encode("utf-8"), "version.txt")
688  	
689  	
690  	def config_backup_version():
691  	    """
692  	    Commandline options: no options
693  	    """
694  	    return 1
695  	
696  	
697  	def config_checkpoint_list(lib, argv, modifiers):
698  	    """
699  	    Options: no options
700  	    """
701  	    del lib
702  	    modifiers.ensure_only_supported()
703  	    if argv:
704  	        raise CmdLineInputError()
705  	    try:
706  	        file_list = os.listdir(settings.cib_dir)
707  	    except OSError as e:
708  	        utils.err("unable to list checkpoints: %s" % e)
709  	    cib_list = []
710  	    cib_name_re = re.compile(r"^cib-(\d+)\.raw$")
711  	    for filename in file_list:
712  	        match = cib_name_re.match(filename)
713  	        if not match:
714  	            continue
715  	        file_path = os.path.join(settings.cib_dir, filename)
716  	        try:
717  	            if os.path.isfile(file_path):
718  	                cib_list.append(
719  	                    (float(os.path.getmtime(file_path)), match.group(1))
720  	                )
721  	        except OSError:
722  	            pass
723  	    cib_list.sort()
724  	    if not cib_list:
725  	        print_to_stderr("No checkpoints available")
726  	        return
727  	    for cib_info in cib_list:
728  	        print(
729  	            "checkpoint %s: date %s"
730  	            % (cib_info[1], datetime.datetime.fromtimestamp(round(cib_info[0])))
731  	        )
732  	
733  	
734  	def _checkpoint_to_lines(lib, checkpoint_number):
735  	    # backup current settings
736  	    orig_usefile = utils.usefile
737  	    orig_filename = utils.filename
738  	    orig_middleware = lib.middleware_factory
739  	    orig_env = lib.env
740  	    # configure old code to read the CIB from a file
741  	    utils.usefile = True
742  	    utils.filename = os.path.join(
743  	        settings.cib_dir, "cib-%s.raw" % checkpoint_number
744  	    )
745  	    # configure new code to read the CIB from a file
746  	    lib.middleware_factory = orig_middleware._replace(
747  	        cib=middleware.cib(utils.filename, utils.touch_cib_file)
748  	    )
749  	    lib.env = utils.get_cli_env()
750  	    # export the CIB to text
751  	    result = False, []
752  	    if os.path.isfile(utils.filename):
753  	        result = True, _config_show_cib_lines(lib)
754  	    # restore original settings
755  	    utils.usefile = orig_usefile
756  	    utils.filename = orig_filename
757  	    lib.middleware_factory = orig_middleware
758  	    lib.env = orig_env
759  	    return result
760  	
761  	
762  	def config_checkpoint_view(lib, argv, modifiers):
763  	    """
764  	    Options: no options
765  	    """
766  	    modifiers.ensure_only_supported()
767  	    if len(argv) != 1:
768  	        print_to_stderr(usage.config(["checkpoint view"]))
769  	        sys.exit(1)
770  	
771  	    loaded, lines = _checkpoint_to_lines(lib, argv[0])
772  	    if not loaded:
773  	        utils.err("unable to read the checkpoint")
774  	    print("\n".join(lines))
775  	
776  	
777  	def config_checkpoint_diff(lib, argv, modifiers):
778  	    """
779  	    Commandline options:
780  	      * -f - CIB file
781  	    """
782  	    modifiers.ensure_only_supported("-f")
783  	    if len(argv) != 2:
784  	        print_to_stderr(usage.config(["checkpoint diff"]))
785  	        sys.exit(1)
786  	
787  	    if argv[0] == argv[1]:
788  	        utils.err("cannot diff a checkpoint against itself")
789  	
790  	    errors = []
791  	    checkpoints_lines = []
792  	    for checkpoint in argv:
793  	        if checkpoint == "live":
794  	            lines = _config_show_cib_lines(lib)
795  	            if not lines:
796  	                errors.append("unable to read live configuration")
797  	            else:
798  	                checkpoints_lines.append(lines)
799  	        else:
800  	            loaded, lines = _checkpoint_to_lines(lib, checkpoint)
801  	            if not loaded:
802  	                errors.append(
803  	                    "unable to read checkpoint '{0}'".format(checkpoint)
804  	                )
805  	            else:
806  	                checkpoints_lines.append(lines)
807  	
808  	    if errors:
809  	        utils.err("\n".join(errors))
810  	
811  	    print(
812  	        "Differences between {0} (-) and {1} (+):".format(
813  	            *[
814  	                (
815  	                    "live configuration"
816  	                    if label == "live"
817  	                    else f"checkpoint {label}"
818  	                )
819  	                for label in argv
820  	            ]
821  	        )
822  	    )
823  	    print(
824  	        "\n".join(
825  	            [
826  	                line.rstrip()
827  	                for line in difflib.Differ().compare(
828  	                    checkpoints_lines[0], checkpoints_lines[1]
829  	                )
830  	            ]
831  	        )
832  	    )
833  	
834  	
835  	def config_checkpoint_restore(lib, argv, modifiers):
836  	    """
837  	    Options:
838  	      * -f - CIB file, a checkpoint will be restored into a specified file
839  	    """
840  	    del lib
841  	    modifiers.ensure_only_supported("-f")
842  	    if len(argv) != 1:
843  	        print_to_stderr(usage.config(["checkpoint restore"]))
844  	        sys.exit(1)
845  	
846  	    cib_path = os.path.join(settings.cib_dir, "cib-%s.raw" % argv[0])
847  	    try:
(1) Event Sigma main event: The application uses Python's built in `xml` module which does not properly handle erroneous or maliciously constructed data, making the application vulnerable to one or more types of XML attacks.
(2) Event remediation: Avoid using the `xml` module. Consider using the `defusedxml` module or similar which safely prevents all XML entity attacks.
848  	        snapshot_dom = parse(cib_path)
849  	    # pylint: disable=broad-except
850  	    except Exception as e:
851  	        utils.err("unable to read the checkpoint: %s" % e)
852  	    utils.replace_cib_configuration(snapshot_dom)
853