1    	import datetime
2    	import difflib
3    	import grp
4    	import json
5    	import os
6    	import os.path
7    	import pwd
8    	import re
9    	import shutil
10   	import sys
11   	import tarfile
12   	import tempfile
13   	import time
14   	from io import BytesIO
15   	from typing import cast
16   	from xml.dom.minidom import parse
17   	
18   	from pcs import (
19   	    cluster,
20   	    quorum,
21   	    settings,
22   	    status,
23   	    usage,
24   	    utils,
25   	)
26   	from pcs.cli.alert.output import config_dto_to_lines as alerts_to_lines
27   	from pcs.cli.cluster_property.output import (
28   	    PropertyConfigurationFacade,
29   	    properties_to_text,
30   	)
31   	from pcs.cli.common import middleware
32   	from pcs.cli.common.errors import CmdLineInputError
33   	from pcs.cli.common.output import (
34   	    INDENT_STEP,
35   	    smart_wrap_text,
36   	)
37   	from pcs.cli.constraint.output import constraints_to_text
38   	from pcs.cli.nvset import nvset_dto_list_to_lines
39   	from pcs.cli.reports import process_library_reports
40   	from pcs.cli.reports.output import (
41   	    print_to_stderr,
42   	    warn,
43   	)
44   	from pcs.cli.resource.output import (
45   	    ResourcesConfigurationFacade,
46   	    resources_to_text,
47   	)
48   	from pcs.cli.stonith.levels.output import stonith_level_config_to_text
49   	from pcs.cli.tag.output import tags_to_text
50   	from pcs.common.interface import dto
51   	from pcs.common.pacemaker.constraint import CibConstraintsDto
52   	from pcs.common.str_tools import indent
53   	from pcs.lib.errors import LibraryError
54   	from pcs.lib.node import get_existing_nodes_names
55   	
56   	
57   	def config_show(lib, argv, modifiers):
58   	    """
59   	    Options:
60   	      * -f - CIB file, when getting cluster name on remote node (corosync.conf
61   	        doesn't exist)
62   	      * --corosync_conf - corosync.conf file
63   	    """
64   	    modifiers.ensure_only_supported("-f", "--corosync_conf")
65   	    if argv:
66   	        raise CmdLineInputError()
67   	
68   	    corosync_conf_dto = None
69   	    cluster_name = ""
70   	    properties_facade = PropertyConfigurationFacade.from_properties_config(
71   	        lib.cluster_property.get_properties(),
72   	    )
73   	    try:
74   	        corosync_conf_dto = lib.cluster.get_corosync_conf_struct()
75   	        cluster_name = corosync_conf_dto.cluster_name
76   	    except LibraryError:
77   	        # there is no corosync.conf on remote nodes, we can try to
78   	        # get cluster name from pacemaker
79   	        pass
80   	    if not cluster_name:
81   	        cluster_name = properties_facade.get_property_value("cluster-name", "")
82   	    print("Cluster Name: %s" % cluster_name)
83   	
84   	    status.nodes_status(lib, ["config"], modifiers.get_subset("-f"))
85   	    cib_lines = _config_show_cib_lines(lib, properties_facade=properties_facade)
86   	    if cib_lines:
87   	        print()
88   	        print("\n".join(cib_lines))
89   	    if (
90   	        utils.hasCorosyncConf()
91   	        and not modifiers.is_specified("-f")
92   	        and not modifiers.is_specified("--corosync_conf")
93   	    ):
94   	        cluster.cluster_uidgid(
95   	            lib, [], modifiers.get_subset(), silent_list=True
96   	        )
97   	    if corosync_conf_dto:
98   	        quorum_device_dict = {}
99   	        if corosync_conf_dto.quorum_device:
100  	            quorum_device_dict = dto.to_dict(corosync_conf_dto.quorum_device)
101  	        config = dict(
102  	            options=corosync_conf_dto.quorum_options,
103  	            device=quorum_device_dict,
104  	        )
105  	        quorum_lines = quorum.quorum_config_to_str(config)
106  	        if quorum_lines:
107  	            print()
108  	            print("Quorum:")
109  	            print("\n".join(indent(quorum_lines)))
110  	
111  	
112  	def _config_show_cib_lines(lib, properties_facade=None):  # noqa: PLR0912, PLR0915
113  	    """
114  	    Commandline options:
115  	      * -f - CIB file
116  	    """
117  	    # pylint: disable=too-many-branches
118  	    # pylint: disable=too-many-locals
119  	    # pylint: disable=too-many-statements
120  	
121  	    # update of pcs_options will change output of constraint show and
122  	    # displaying resources and operations defaults
123  	    utils.pcs_options["--full"] = 1
124  	    # get latest modifiers object after updating pcs_options
125  	    modifiers = utils.get_input_modifiers()
126  	
127  	    resources_facade = ResourcesConfigurationFacade.from_resources_dto(
128  	        lib.resource.get_configured_resources()
129  	    )
130  	
131  	    all_lines = []
132  	
133  	    resources_lines = smart_wrap_text(
134  	        indent(
135  	            resources_to_text(resources_facade.filter_stonith(False)),
136  	            indent_step=INDENT_STEP,
137  	        )
138  	    )
139  	    if resources_lines:
140  	        all_lines.append("Resources:")
141  	        all_lines.extend(resources_lines)
142  	
143  	    stonith_lines = smart_wrap_text(
144  	        indent(
145  	            resources_to_text(resources_facade.filter_stonith(True)),
146  	            indent_step=INDENT_STEP,
147  	        )
148  	    )
149  	    if stonith_lines:
150  	        if all_lines:
151  	            all_lines.append("")
152  	        all_lines.append("Stonith Devices:")
153  	        all_lines.extend(stonith_lines)
154  	
155  	    levels_lines = stonith_level_config_to_text(
156  	        lib.fencing_topology.get_config_dto()
157  	    )
158  	    if levels_lines:
159  	        if all_lines:
160  	            all_lines.append("")
161  	        all_lines.append("Fencing Levels:")
162  	        all_lines.extend(indent(levels_lines, indent_step=2))
163  	
164  	    constraints_lines = smart_wrap_text(
165  	        constraints_to_text(
166  	            cast(
167  	                CibConstraintsDto,
168  	                lib.constraint.get_config(evaluate_rules=False),
169  	            ),
170  	            modifiers.is_specified("--full"),
171  	        )
172  	    )
173  	    if constraints_lines:
174  	        if all_lines:
175  	            all_lines.append("")
176  	        all_lines.extend(constraints_lines)
177  	
178  	    alert_lines = indent(alerts_to_lines(lib.alert.get_config_dto()))
179  	    if alert_lines:
180  	        if all_lines:
181  	            all_lines.append("")
182  	        all_lines.append("Alerts:")
183  	        all_lines.extend(alert_lines)
184  	
185  	    resources_defaults_lines = indent(
186  	        nvset_dto_list_to_lines(
187  	            lib.cib_options.resource_defaults_config(
188  	                evaluate_expired=False
189  	            ).meta_attributes,
190  	            nvset_label="Meta Attrs",
191  	            with_ids=modifiers.get("--full"),
192  	        )
193  	    )
194  	    if resources_defaults_lines:
195  	        if all_lines:
196  	            all_lines.append("")
197  	        all_lines.append("Resources Defaults:")
198  	        all_lines.extend(resources_defaults_lines)
199  	
200  	    operations_defaults_lines = indent(
201  	        nvset_dto_list_to_lines(
202  	            lib.cib_options.operation_defaults_config(
203  	                evaluate_expired=False
204  	            ).meta_attributes,
205  	            nvset_label="Meta Attrs",
206  	            with_ids=modifiers.get("--full"),
207  	        )
208  	    )
209  	    if operations_defaults_lines:
210  	        if all_lines:
211  	            all_lines.append("")
212  	        all_lines.append("Operations Defaults:")
213  	        all_lines.extend(operations_defaults_lines)
214  	
215  	    if not properties_facade:
216  	        properties_facade = PropertyConfigurationFacade.from_properties_config(
217  	            lib.cluster_property.get_properties()
218  	        )
219  	    properties_lines = properties_to_text(properties_facade)
220  	    if properties_lines:
221  	        if all_lines:
222  	            all_lines.append("")
223  	        all_lines.extend(properties_lines)
224  	
225  	    tag_lines = smart_wrap_text(tags_to_text(lib.tag.get_config_dto([])))
226  	    if tag_lines:
227  	        if all_lines:
228  	            all_lines.append("")
229  	        all_lines.append("Tags:")
230  	        all_lines.extend(indent(tag_lines, indent_step=1))
231  	
232  	    return all_lines
233  	
234  	
235  	def config_backup(lib, argv, modifiers):
236  	    """
237  	    Options:
238  	      * --force - overwrite file if already exists
239  	    """
240  	    del lib
241  	    modifiers.ensure_only_supported("--force")
242  	    if len(argv) > 1:
243  	        raise CmdLineInputError()
244  	
245  	    outfile_name = None
246  	    if argv:
247  	        outfile_name = argv[0]
248  	        if not outfile_name.endswith(".tar.bz2"):
249  	            outfile_name += ".tar.bz2"
250  	
251  	    tar_data = config_backup_local()
252  	    if outfile_name:
253  	        ok, message = utils.write_file(
254  	            outfile_name, tar_data, permissions=0o600, binary=True
255  	        )
256  	        if not ok:
257  	            utils.err(message)
258  	    else:
259  	        # in python3 stdout accepts str so we need to use buffer
260  	        sys.stdout.buffer.write(tar_data)
261  	
262  	
263  	def config_backup_local():
264  	    """
265  	    Commandline options: no options
266  	    """
267  	    file_list = config_backup_path_list()
268  	    tar_data = BytesIO()
269  	
270  	    try:
271  	        with tarfile.open(fileobj=tar_data, mode="w|bz2") as tarball:
272  	            config_backup_add_version_to_tarball(tarball)
273  	            for tar_path, path_info in file_list.items():
274  	                if (
275  	                    not os.path.exists(path_info["path"])
276  	                    and not path_info["required"]
277  	                ):
278  	                    continue
279  	                tarball.add(path_info["path"], tar_path)
280  	    except (tarfile.TarError, EnvironmentError) as e:
281  	        utils.err("unable to create tarball: %s" % e)
282  	
283  	    tar = tar_data.getvalue()
284  	    tar_data.close()
285  	    return tar
286  	
287  	
288  	def config_restore(lib, argv, modifiers):
289  	    """
290  	    Options:
291  	      * --local - restore config only on local node
292  	      * --request-timeout - timeout for HTTP requests, used only if --local was
293  	        not defined or user is not root
294  	    """
295  	    del lib
296  	    modifiers.ensure_only_supported("--local", "--request-timeout")
297  	    if len(argv) > 1:
298  	        raise CmdLineInputError()
299  	
300  	    infile_name = infile_obj = None
301  	    if argv:
302  	        infile_name = argv[0]
303  	    if not infile_name:
304  	        # in python3 stdin returns str so we need to use buffer
305  	        infile_obj = BytesIO(sys.stdin.buffer.read())
306  	
307  	    if os.getuid() == 0:
308  	        if modifiers.get("--local"):
309  	            config_restore_local(infile_name, infile_obj)
310  	        else:
311  	            config_restore_remote(infile_name, infile_obj)
312  	    else:
313  	        new_argv = ["config", "restore"]
314  	        options = []
315  	        new_stdin = None
316  	        if modifiers.get("--local"):
317  	            options.append("--local")
318  	        if infile_name:
319  	            new_argv.append(os.path.abspath(infile_name))
320  	        else:
321  	            new_stdin = infile_obj.read()
322  	        err_msgs, exitcode, std_out, std_err = utils.call_local_pcsd(
323  	            new_argv, options, new_stdin
324  	        )
325  	        if err_msgs:
326  	            for msg in err_msgs:
327  	                utils.err(msg, False)
328  	            sys.exit(1)
329  	        print(std_out)
330  	        sys.stderr.write(std_err)
331  	        sys.exit(exitcode)
332  	
333  	
334  	def config_restore_remote(infile_name, infile_obj):  # noqa: PLR0912
335  	    """
336  	    Commandline options:
337  	      * --request-timeout - timeout for HTTP requests
338  	    """
339  	    # pylint: disable=too-many-branches
340  	    # pylint: disable=too-many-locals
341  	    extracted = {
342  	        "version.txt": "",
343  	        "corosync.conf": "",
344  	    }
345  	    try:
346  	        with tarfile.open(infile_name, "r|*", infile_obj) as tarball:
347  	            while True:
348  	                # next(tarball) does not work in python2.6
349  	                tar_member_info = tarball.next()
350  	                if tar_member_info is None:
351  	                    break
352  	                if tar_member_info.name in extracted:
353  	                    tar_member = tarball.extractfile(tar_member_info)
354  	                    extracted[tar_member_info.name] = tar_member.read()
355  	                    tar_member.close()
356  	    except (tarfile.TarError, EnvironmentError) as e:
357  	        utils.err("unable to read the tarball: %s" % e)
358  	
359  	    config_backup_check_version(extracted["version.txt"])
360  	
361  	    node_list, report_list = get_existing_nodes_names(
362  	        utils.get_corosync_conf_facade(
363  	            conf_text=extracted["corosync.conf"].decode("utf-8")
364  	        )
365  	    )
366  	    if report_list:
367  	        process_library_reports(report_list)
368  	    if not node_list:
369  	        utils.err("no nodes found in the tarball")
370  	
371  	    err_msgs = []
372  	    for node in node_list:
373  	        try:
374  	            retval, output = utils.checkStatus(node)
375  	            if retval != 0:
376  	                err_msgs.append(output)
377  	                continue
378  	            _status = json.loads(output)
379  	            if any(
380  	                _status["node"]["services"][service_name]["running"]
381  	                for service_name in (
382  	                    "corosync",
383  	                    "pacemaker",
384  	                    "pacemaker_remote",
385  	                )
386  	            ):
387  	                err_msgs.append(
388  	                    "Cluster is currently running on node %s. You need to stop "
389  	                    "the cluster in order to restore the configuration." % node
390  	                )
391  	                continue
392  	        except (ValueError, NameError, LookupError):
393  	            err_msgs.append("unable to determine status of the node %s" % node)
394  	    if err_msgs:
395  	        for msg in err_msgs:
396  	            utils.err(msg, False)
397  	        sys.exit(1)
398  	
399  	    # Temporarily disable config files syncing thread in pcsd so it will not
400  	    # rewrite restored files. 10 minutes should be enough time to restore.
401  	    # If node returns HTTP 404 it does not support config syncing at all.
402  	    for node in node_list:
403  	        retval, output = utils.pauseConfigSyncing(node, 10 * 60)
404  	        if not (retval == 0 or "(HTTP error: 404)" in output):
405  	            utils.err(output)
406  	
407  	    if infile_obj:
408  	        infile_obj.seek(0)
409  	        tarball_data = infile_obj.read()
410  	    else:
411  	        with open(infile_name, "rb") as tarball:
412  	            tarball_data = tarball.read()
413  	
414  	    error_list = []
415  	    for node in node_list:
416  	        retval, error = utils.restoreConfig(node, tarball_data)
417  	        if retval != 0:
418  	            error_list.append(error)
419  	    if error_list:
420  	        utils.err("unable to restore all nodes\n" + "\n".join(error_list))
421  	
422  	
423  	def config_restore_local(infile_name, infile_obj):  # noqa: PLR0912, PLR0915
424  	    """
425  	    Commandline options: no options
426  	    """
427  	    # pylint: disable=too-many-branches
428  	    # pylint: disable=too-many-locals
429  	    # pylint: disable=too-many-statements
430  	    service_manager = utils.get_service_manager()
431  	    if (
432  	        service_manager.is_running("corosync")
433  	        or service_manager.is_running("pacemaker")
434  	        or service_manager.is_running("pacemaker_remote")
435  	    ):
436  	        utils.err(
437  	            "Cluster is currently running on this node. You need to stop "
438  	            "the cluster in order to restore the configuration."
439  	        )
440  	
441  	    file_list = config_backup_path_list(with_uid_gid=True)
442  	    tarball_file_list = []
443  	    version = None
444  	    tmp_dir = None
445  	    try:
446  	        with tarfile.open(infile_name, "r|*", infile_obj) as tarball:
447  	            while True:
448  	                # next(tarball) does not work in python2.6
449  	                tar_member_info = tarball.next()
450  	                if tar_member_info is None:
451  	                    break
452  	                if tar_member_info.name == "version.txt":
453  	                    version_data = tarball.extractfile(tar_member_info)
454  	                    version = version_data.read()
455  	                    version_data.close()
456  	                    continue
457  	                tarball_file_list.append(tar_member_info.name)
458  	
459  	        required_file_list = [
460  	            tar_path
461  	            for tar_path, path_info in file_list.items()
462  	            if path_info["required"]
463  	        ]
464  	        missing = set(required_file_list) - set(tarball_file_list)
465  	        if missing:
466  	            utils.err(
467  	                "unable to restore the cluster, missing files in backup: %s"
468  	                % ", ".join(missing)
469  	            )
470  	
471  	        config_backup_check_version(version)
472  	
473  	        if infile_obj:
474  	            infile_obj.seek(0)
475  	        with tarfile.open(infile_name, "r|*", infile_obj) as tarball:
476  	            while True:
477  	                # next(tarball) does not work in python2.6
478  	                tar_member_info = tarball.next()
479  	                if tar_member_info is None:
480  	                    break
481  	                extract_info = None
482  	                path = tar_member_info.name
483  	                while path:
484  	                    if path in file_list:
485  	                        extract_info = file_list[path]
486  	                        break
487  	                    path = os.path.dirname(path)
488  	                if not extract_info:
489  	                    continue
490  	                path_full = None
491  	                if callable(extract_info.get("pre_store_call")):
492  	                    extract_info["pre_store_call"]()
493  	                if "rename" in extract_info and extract_info["rename"]:
494  	                    if tmp_dir is None:
495  	                        tmp_dir = tempfile.mkdtemp()
496  	                    if hasattr(tarfile, "data_filter"):
497  	                        # Safe way of extraction is available since Python 3.12,
498  	                        # hasattr above checks if it's available.
499  	                        # It's also backported to 3.11.4, 3.10.12, 3.9.17.
500  	                        # It may be backported to older versions in downstream.
501  	                        tarball.extractall(
502  	                            tmp_dir, [tar_member_info], filter="data"
503  	                        )
504  	                    else:
505  	                        # Unsafe way of extraction
506  	                        # Remove once we don't support Python 3.8 and older
507  	                        tarball.extractall(tmp_dir, [tar_member_info])
508  	                    path_full = extract_info["path"]
509  	                    shutil.move(
510  	                        os.path.join(tmp_dir, tar_member_info.name), path_full
511  	                    )
512  	                else:
513  	                    dir_path = os.path.dirname(extract_info["path"])
514  	                    if hasattr(tarfile, "data_filter"):
515  	                        # Safe way of extraction is available since Python 3.12,
516  	                        # hasattr above checks if it's available.
517  	                        # It's also backported to 3.11.4, 3.10.12, 3.9.17.
518  	                        # It may be backported to older versions in downstream.
519  	                        tarball.extractall(
520  	                            dir_path, [tar_member_info], filter="data"
521  	                        )
522  	                    else:
523  	                        # Unsafe way of extracting
524  	                        # Remove once we don't support Python 3.8 and older
525  	                        tarball.extractall(dir_path, [tar_member_info])
526  	                    path_full = os.path.join(dir_path, tar_member_info.name)
527  	                file_attrs = extract_info["attrs"]
528  	                os.chmod(path_full, file_attrs["mode"])
529  	                os.chown(path_full, file_attrs["uid"], file_attrs["gid"])
530  	    except (tarfile.TarError, EnvironmentError, OSError) as e:
531  	        utils.err("unable to restore the cluster: %s" % e)
532  	    finally:
533  	        if tmp_dir:
534  	            shutil.rmtree(tmp_dir, ignore_errors=True)
535  	
536  	    try:
537  	        sig_path = os.path.join(settings.cib_dir, "cib.xml.sig")
538  	        if os.path.exists(sig_path):
539  	            os.remove(sig_path)
540  	    except EnvironmentError as e:
541  	        utils.err("unable to remove %s: %s" % (sig_path, e))
542  	
543  	
544  	def config_backup_path_list(with_uid_gid=False):
545  	    """
546  	    Commandline options: no option
547  	    NOTE: corosync.conf path may be altered using --corosync_conf
548  	    """
549  	    corosync_attrs = {
550  	        "mtime": int(time.time()),
551  	        "mode": 0o644,
552  	        "uname": "root",
553  	        "gname": "root",
554  	        "uid": 0,
555  	        "gid": 0,
556  	    }
557  	    corosync_authkey_attrs = dict(corosync_attrs)
558  	    corosync_authkey_attrs["mode"] = 0o400
559  	    cib_attrs = {
560  	        "mtime": int(time.time()),
561  	        "mode": 0o600,
562  	        "uname": settings.pacemaker_uname,
563  	        "gname": settings.pacemaker_gname,
564  	    }
565  	    if with_uid_gid:
566  	        cib_attrs["uid"] = _get_uid(cib_attrs["uname"])
567  	        cib_attrs["gid"] = _get_gid(cib_attrs["gname"])
568  	
569  	    pcmk_authkey_attrs = dict(cib_attrs)
570  	    pcmk_authkey_attrs["mode"] = 0o440
571  	    return {
572  	        "cib.xml": {
573  	            "path": os.path.join(settings.cib_dir, "cib.xml"),
574  	            "required": True,
575  	            "attrs": dict(cib_attrs),
576  	        },
577  	        "corosync_authkey": {
578  	            "path": settings.corosync_authkey_file,
579  	            "required": False,
580  	            "attrs": corosync_authkey_attrs,
581  	            "restore_procedure": None,
582  	            "rename": True,
583  	        },
584  	        "pacemaker_authkey": {
585  	            "path": settings.pacemaker_authkey_file,
586  	            "required": False,
587  	            "attrs": pcmk_authkey_attrs,
588  	            "restore_procedure": None,
589  	            "rename": True,
590  	            "pre_store_call": _ensure_etc_pacemaker_exists,
591  	        },
592  	        "corosync.conf": {
593  	            "path": settings.corosync_conf_file,
594  	            "required": True,
595  	            "attrs": dict(corosync_attrs),
596  	        },
597  	        "uidgid.d": {
598  	            "path": settings.corosync_uidgid_dir,
599  	            "required": False,
600  	            "attrs": dict(corosync_attrs),
601  	        },
602  	        "pcs_settings.conf": {
603  	            "path": settings.pcsd_settings_conf_location,
604  	            "required": False,
605  	            "attrs": {
606  	                "mtime": int(time.time()),
607  	                "mode": 0o644,
608  	                "uname": "root",
609  	                "gname": "root",
610  	                "uid": 0,
611  	                "gid": 0,
612  	            },
613  	        },
614  	    }
615  	
616  	
617  	def _get_uid(user_name):
618  	    """
619  	    Commandline options: no options
620  	    """
621  	    try:
622  	        return pwd.getpwnam(user_name).pw_uid
623  	    except KeyError:
624  	        return utils.err(
625  	            "Unable to determine uid of user '{0}'".format(user_name)
626  	        )
627  	
628  	
629  	def _get_gid(group_name):
630  	    """
631  	    Commandline options: no options
632  	    """
633  	    try:
634  	        return grp.getgrnam(group_name).gr_gid
635  	    except KeyError:
636  	        return utils.err(
637  	            "Unable to determine gid of group '{0}'".format(group_name)
638  	        )
639  	
640  	
641  	def _ensure_etc_pacemaker_exists():
642  	    """
643  	    Commandline options: no options
644  	    """
645  	    dir_name = os.path.dirname(settings.pacemaker_authkey_file)
646  	    if not os.path.exists(dir_name):
647  	        os.mkdir(dir_name)
648  	        os.chmod(dir_name, 0o750)
649  	        os.chown(
650  	            dir_name,
651  	            _get_uid(settings.pacemaker_uname),
652  	            _get_gid(settings.pacemaker_gname),
653  	        )
654  	
655  	
656  	def config_backup_check_version(version):
657  	    """
658  	    Commandline options: no options
659  	    """
660  	    try:
661  	        version_number = int(version)
662  	        supported_version = config_backup_version()
663  	        if version_number > supported_version:
664  	            utils.err(
665  	                f"Unsupported version of the backup, supported version is "
666  	                f"{supported_version}, backup version is {version_number}"
667  	            )
668  	        if version_number < supported_version:
669  	            warn(
670  	                f"Restoring from the backup version {version_number}, current "
671  	                f"supported version is {supported_version}"
672  	            )
673  	    except TypeError:
674  	        utils.err("Cannot determine version of the backup")
675  	
676  	
677  	def config_backup_add_version_to_tarball(tarball, version=None):
678  	    """
679  	    Commandline options: no options
680  	    """
681  	    ver = version if version is not None else str(config_backup_version())
682  	    return utils.tar_add_file_data(tarball, ver.encode("utf-8"), "version.txt")
683  	
684  	
685  	def config_backup_version():
686  	    """
687  	    Commandline options: no options
688  	    """
689  	    return 1
690  	
691  	
692  	def config_checkpoint_list(lib, argv, modifiers):
693  	    """
694  	    Options: no options
695  	    """
696  	    del lib
697  	    modifiers.ensure_only_supported()
698  	    if argv:
699  	        raise CmdLineInputError()
700  	    try:
701  	        file_list = os.listdir(settings.cib_dir)
702  	    except OSError as e:
703  	        utils.err("unable to list checkpoints: %s" % e)
704  	    cib_list = []
705  	    cib_name_re = re.compile(r"^cib-(\d+)\.raw$")
706  	    for filename in file_list:
707  	        match = cib_name_re.match(filename)
708  	        if not match:
709  	            continue
710  	        file_path = os.path.join(settings.cib_dir, filename)
711  	        try:
712  	            if os.path.isfile(file_path):
713  	                cib_list.append(
714  	                    (float(os.path.getmtime(file_path)), match.group(1))
715  	                )
716  	        except OSError:
717  	            pass
718  	    cib_list.sort()
719  	    if not cib_list:
720  	        print_to_stderr("No checkpoints available")
721  	        return
722  	    for cib_info in cib_list:
723  	        print(
724  	            "checkpoint %s: date %s"
725  	            % (cib_info[1], datetime.datetime.fromtimestamp(round(cib_info[0])))
726  	        )
727  	
728  	
729  	def _checkpoint_to_lines(lib, checkpoint_number):
730  	    # backup current settings
731  	    orig_usefile = utils.usefile
732  	    orig_filename = utils.filename
733  	    orig_middleware = lib.middleware_factory
734  	    orig_env = lib.env
735  	    # configure old code to read the CIB from a file
736  	    utils.usefile = True
737  	    utils.filename = os.path.join(
738  	        settings.cib_dir, "cib-%s.raw" % checkpoint_number
739  	    )
740  	    # configure new code to read the CIB from a file
741  	    lib.middleware_factory = orig_middleware._replace(
742  	        cib=middleware.cib(utils.filename, utils.touch_cib_file)
743  	    )
744  	    lib.env = utils.get_cli_env()
745  	    # export the CIB to text
746  	    result = False, []
747  	    if os.path.isfile(utils.filename):
748  	        result = True, _config_show_cib_lines(lib)
749  	    # restore original settings
750  	    utils.usefile = orig_usefile
751  	    utils.filename = orig_filename
752  	    lib.middleware_factory = orig_middleware
753  	    lib.env = orig_env
754  	    return result
755  	
756  	
757  	def config_checkpoint_view(lib, argv, modifiers):
758  	    """
759  	    Options: no options
760  	    """
761  	    modifiers.ensure_only_supported()
762  	    if len(argv) != 1:
763  	        print_to_stderr(usage.config(["checkpoint view"]))
764  	        sys.exit(1)
765  	
766  	    loaded, lines = _checkpoint_to_lines(lib, argv[0])
767  	    if not loaded:
768  	        utils.err("unable to read the checkpoint")
769  	    print("\n".join(lines))
770  	
771  	
772  	def config_checkpoint_diff(lib, argv, modifiers):
773  	    """
774  	    Commandline options:
775  	      * -f - CIB file
776  	    """
777  	    modifiers.ensure_only_supported("-f")
778  	    if len(argv) != 2:
779  	        print_to_stderr(usage.config(["checkpoint diff"]))
780  	        sys.exit(1)
781  	
782  	    if argv[0] == argv[1]:
783  	        utils.err("cannot diff a checkpoint against itself")
784  	
785  	    errors = []
786  	    checkpoints_lines = []
787  	    for checkpoint in argv:
788  	        if checkpoint == "live":
789  	            lines = _config_show_cib_lines(lib)
790  	            if not lines:
791  	                errors.append("unable to read live configuration")
792  	            else:
793  	                checkpoints_lines.append(lines)
794  	        else:
795  	            loaded, lines = _checkpoint_to_lines(lib, checkpoint)
796  	            if not loaded:
797  	                errors.append(
798  	                    "unable to read checkpoint '{0}'".format(checkpoint)
799  	                )
800  	            else:
801  	                checkpoints_lines.append(lines)
802  	
803  	    if errors:
804  	        utils.err("\n".join(errors))
805  	
806  	    print(
807  	        "Differences between {0} (-) and {1} (+):".format(
808  	            *[
809  	                (
810  	                    "live configuration"
811  	                    if label == "live"
812  	                    else f"checkpoint {label}"
813  	                )
814  	                for label in argv
815  	            ]
816  	        )
817  	    )
818  	    print(
819  	        "\n".join(
820  	            [
821  	                line.rstrip()
822  	                for line in difflib.Differ().compare(
823  	                    checkpoints_lines[0], checkpoints_lines[1]
824  	                )
825  	            ]
826  	        )
827  	    )
828  	
829  	
830  	def config_checkpoint_restore(lib, argv, modifiers):
831  	    """
832  	    Options:
833  	      * -f - CIB file, a checkpoint will be restored into a specified file
834  	    """
835  	    del lib
836  	    modifiers.ensure_only_supported("-f")
837  	    if len(argv) != 1:
838  	        print_to_stderr(usage.config(["checkpoint restore"]))
839  	        sys.exit(1)
840  	
841  	    cib_path = os.path.join(settings.cib_dir, "cib-%s.raw" % argv[0])
842  	    try:
(1) Event Sigma main event: The application uses Python's built in `xml` module which does not properly handle erroneous or maliciously constructed data, making the application vulnerable to one or more types of XML attacks.
(2) Event remediation: Avoid using the `xml` module. Consider using the `defusedxml` module or similar which safely prevents all XML entity attacks.
843  	        snapshot_dom = parse(cib_path)
844  	    # pylint: disable=broad-except
845  	    except Exception as e:
846  	        utils.err("unable to read the checkpoint: %s" % e)
847  	    utils.replace_cib_configuration(snapshot_dom)
848