1 import datetime
2 import difflib
3 import grp
4 import json
5 import os
6 import os.path
7 import pwd
8 import re
9 import shutil
10 import sys
11 import tarfile
12 import tempfile
13 import time
14 from io import BytesIO
15 from typing import cast
16 from xml.dom.minidom import parse
17
18 from pcs import (
19 cluster,
20 quorum,
21 settings,
22 status,
23 usage,
24 utils,
25 )
26 from pcs.cli.alert.output import config_dto_to_lines as alerts_to_lines
27 from pcs.cli.cluster_property.output import (
28 PropertyConfigurationFacade,
29 properties_to_text,
30 )
31 from pcs.cli.common import middleware
32 from pcs.cli.common.errors import CmdLineInputError
33 from pcs.cli.common.output import (
34 INDENT_STEP,
35 smart_wrap_text,
36 )
37 from pcs.cli.constraint.output import constraints_to_text
38 from pcs.cli.nvset import nvset_dto_list_to_lines
39 from pcs.cli.reports import process_library_reports
40 from pcs.cli.reports.output import (
41 print_to_stderr,
42 warn,
43 )
44 from pcs.cli.resource.output import (
45 ResourcesConfigurationFacade,
46 resources_to_text,
47 )
48 from pcs.cli.stonith.levels.output import stonith_level_config_to_text
49 from pcs.cli.tag.output import tags_to_text
50 from pcs.common.interface import dto
51 from pcs.common.pacemaker.constraint import CibConstraintsDto
52 from pcs.common.str_tools import indent
53 from pcs.lib.errors import LibraryError
54 from pcs.lib.node import get_existing_nodes_names
55
56
57 def config_show(lib, argv, modifiers):
58 """
59 Options:
60 * -f - CIB file, when getting cluster name on remote node (corosync.conf
61 doesn't exist)
62 * --corosync_conf - corosync.conf file
63 """
64 modifiers.ensure_only_supported("-f", "--corosync_conf")
65 if argv:
66 raise CmdLineInputError()
67
68 corosync_conf_dto = None
69 cluster_name = ""
70 properties_facade = PropertyConfigurationFacade.from_properties_config(
71 lib.cluster_property.get_properties(),
72 )
73 try:
74 corosync_conf_dto = lib.cluster.get_corosync_conf_struct()
75 cluster_name = corosync_conf_dto.cluster_name
76 except LibraryError:
77 # there is no corosync.conf on remote nodes, we can try to
78 # get cluster name from pacemaker
79 pass
80 if not cluster_name:
81 cluster_name = properties_facade.get_property_value("cluster-name", "")
82 print("Cluster Name: %s" % cluster_name)
83
84 status.nodes_status(lib, ["config"], modifiers.get_subset("-f"))
85 cib_lines = _config_show_cib_lines(lib, properties_facade=properties_facade)
86 if cib_lines:
87 print()
88 print("\n".join(cib_lines))
89 if (
90 utils.hasCorosyncConf()
91 and not modifiers.is_specified("-f")
92 and not modifiers.is_specified("--corosync_conf")
93 ):
94 cluster.cluster_uidgid(
95 lib, [], modifiers.get_subset(), silent_list=True
96 )
97 if corosync_conf_dto:
98 quorum_device_dict = {}
99 if corosync_conf_dto.quorum_device:
100 quorum_device_dict = dto.to_dict(corosync_conf_dto.quorum_device)
101 config = dict(
102 options=corosync_conf_dto.quorum_options,
103 device=quorum_device_dict,
104 )
105 quorum_lines = quorum.quorum_config_to_str(config)
106 if quorum_lines:
107 print()
108 print("Quorum:")
109 print("\n".join(indent(quorum_lines)))
110
111
112 def _config_show_cib_lines(lib, properties_facade=None): # noqa: PLR0912, PLR0915
113 """
114 Commandline options:
115 * -f - CIB file
116 """
117 # pylint: disable=too-many-branches
118 # pylint: disable=too-many-locals
119 # pylint: disable=too-many-statements
120
121 # update of pcs_options will change output of constraint show and
122 # displaying resources and operations defaults
123 utils.pcs_options["--full"] = 1
124 # get latest modifiers object after updating pcs_options
125 modifiers = utils.get_input_modifiers()
126
127 resources_facade = ResourcesConfigurationFacade.from_resources_dto(
128 lib.resource.get_configured_resources()
129 )
130
131 all_lines = []
132
133 resources_lines = smart_wrap_text(
134 indent(
135 resources_to_text(resources_facade.filter_stonith(False)),
136 indent_step=INDENT_STEP,
137 )
138 )
139 if resources_lines:
140 all_lines.append("Resources:")
141 all_lines.extend(resources_lines)
142
143 stonith_lines = smart_wrap_text(
144 indent(
145 resources_to_text(resources_facade.filter_stonith(True)),
146 indent_step=INDENT_STEP,
147 )
148 )
149 if stonith_lines:
150 if all_lines:
151 all_lines.append("")
152 all_lines.append("Stonith Devices:")
153 all_lines.extend(stonith_lines)
154
155 levels_lines = stonith_level_config_to_text(
156 lib.fencing_topology.get_config_dto()
157 )
158 if levels_lines:
159 if all_lines:
160 all_lines.append("")
161 all_lines.append("Fencing Levels:")
162 all_lines.extend(indent(levels_lines, indent_step=2))
163
164 constraints_lines = smart_wrap_text(
165 constraints_to_text(
166 cast(
167 CibConstraintsDto,
168 lib.constraint.get_config(evaluate_rules=False),
169 ),
170 modifiers.is_specified("--full"),
171 )
172 )
173 if constraints_lines:
174 if all_lines:
175 all_lines.append("")
176 all_lines.extend(constraints_lines)
177
178 alert_lines = indent(alerts_to_lines(lib.alert.get_config_dto()))
179 if alert_lines:
180 if all_lines:
181 all_lines.append("")
182 all_lines.append("Alerts:")
183 all_lines.extend(alert_lines)
184
185 resources_defaults_lines = indent(
186 nvset_dto_list_to_lines(
187 lib.cib_options.resource_defaults_config(
188 evaluate_expired=False
189 ).meta_attributes,
190 nvset_label="Meta Attrs",
191 with_ids=modifiers.get("--full"),
192 )
193 )
194 if resources_defaults_lines:
195 if all_lines:
196 all_lines.append("")
197 all_lines.append("Resources Defaults:")
198 all_lines.extend(resources_defaults_lines)
199
200 operations_defaults_lines = indent(
201 nvset_dto_list_to_lines(
202 lib.cib_options.operation_defaults_config(
203 evaluate_expired=False
204 ).meta_attributes,
205 nvset_label="Meta Attrs",
206 with_ids=modifiers.get("--full"),
207 )
208 )
209 if operations_defaults_lines:
210 if all_lines:
211 all_lines.append("")
212 all_lines.append("Operations Defaults:")
213 all_lines.extend(operations_defaults_lines)
214
215 if not properties_facade:
216 properties_facade = PropertyConfigurationFacade.from_properties_config(
217 lib.cluster_property.get_properties()
218 )
219 properties_lines = properties_to_text(properties_facade)
220 if properties_lines:
221 if all_lines:
222 all_lines.append("")
223 all_lines.extend(properties_lines)
224
225 tag_lines = smart_wrap_text(tags_to_text(lib.tag.get_config_dto([])))
226 if tag_lines:
227 if all_lines:
228 all_lines.append("")
229 all_lines.append("Tags:")
230 all_lines.extend(indent(tag_lines, indent_step=1))
231
232 return all_lines
233
234
235 def config_backup(lib, argv, modifiers):
236 """
237 Options:
238 * --force - overwrite file if already exists
239 """
240 del lib
241 modifiers.ensure_only_supported("--force")
242 if len(argv) > 1:
243 raise CmdLineInputError()
244
245 outfile_name = None
246 if argv:
247 outfile_name = argv[0]
248 if not outfile_name.endswith(".tar.bz2"):
249 outfile_name += ".tar.bz2"
250
251 tar_data = config_backup_local()
252 if outfile_name:
253 ok, message = utils.write_file(
254 outfile_name, tar_data, permissions=0o600, binary=True
255 )
256 if not ok:
257 utils.err(message)
258 else:
259 # in python3 stdout accepts str so we need to use buffer
260 sys.stdout.buffer.write(tar_data)
261
262
263 def config_backup_local():
264 """
265 Commandline options: no options
266 """
267 file_list = config_backup_path_list()
268 tar_data = BytesIO()
269
270 try:
271 with tarfile.open(fileobj=tar_data, mode="w|bz2") as tarball:
272 config_backup_add_version_to_tarball(tarball)
273 for tar_path, path_info in file_list.items():
274 if (
275 not os.path.exists(path_info["path"])
276 and not path_info["required"]
277 ):
278 continue
279 tarball.add(path_info["path"], tar_path)
280 except (tarfile.TarError, EnvironmentError) as e:
281 utils.err("unable to create tarball: %s" % e)
282
283 tar = tar_data.getvalue()
284 tar_data.close()
285 return tar
286
287
288 def config_restore(lib, argv, modifiers):
289 """
290 Options:
291 * --local - restore config only on local node
292 * --request-timeout - timeout for HTTP requests, used only if --local was
293 not defined or user is not root
294 """
295 del lib
296 modifiers.ensure_only_supported("--local", "--request-timeout")
297 if len(argv) > 1:
298 raise CmdLineInputError()
299
300 infile_name = infile_obj = None
301 if argv:
302 infile_name = argv[0]
303 if not infile_name:
304 # in python3 stdin returns str so we need to use buffer
305 infile_obj = BytesIO(sys.stdin.buffer.read())
306
307 if os.getuid() == 0:
308 if modifiers.get("--local"):
309 config_restore_local(infile_name, infile_obj)
310 else:
311 config_restore_remote(infile_name, infile_obj)
312 else:
313 new_argv = ["config", "restore"]
314 options = []
315 new_stdin = None
316 if modifiers.get("--local"):
317 options.append("--local")
318 if infile_name:
319 new_argv.append(os.path.abspath(infile_name))
320 else:
321 new_stdin = infile_obj.read()
322 err_msgs, exitcode, std_out, std_err = utils.call_local_pcsd(
323 new_argv, options, new_stdin
324 )
325 if err_msgs:
326 for msg in err_msgs:
327 utils.err(msg, False)
328 sys.exit(1)
329 print(std_out)
330 sys.stderr.write(std_err)
331 sys.exit(exitcode)
332
333
334 def config_restore_remote(infile_name, infile_obj): # noqa: PLR0912
335 """
336 Commandline options:
337 * --request-timeout - timeout for HTTP requests
338 """
339 # pylint: disable=too-many-branches
340 # pylint: disable=too-many-locals
341 extracted = {
342 "version.txt": "",
343 "corosync.conf": "",
344 }
345 try:
346 with tarfile.open(infile_name, "r|*", infile_obj) as tarball:
347 while True:
348 # next(tarball) does not work in python2.6
349 tar_member_info = tarball.next()
350 if tar_member_info is None:
351 break
352 if tar_member_info.name in extracted:
353 tar_member = tarball.extractfile(tar_member_info)
354 extracted[tar_member_info.name] = tar_member.read()
355 tar_member.close()
356 except (tarfile.TarError, EnvironmentError) as e:
357 utils.err("unable to read the tarball: %s" % e)
358
359 config_backup_check_version(extracted["version.txt"])
360
361 node_list, report_list = get_existing_nodes_names(
362 utils.get_corosync_conf_facade(
363 conf_text=extracted["corosync.conf"].decode("utf-8")
364 )
365 )
366 if report_list:
367 process_library_reports(report_list)
368 if not node_list:
369 utils.err("no nodes found in the tarball")
370
371 err_msgs = []
372 for node in node_list:
373 try:
374 retval, output = utils.checkStatus(node)
375 if retval != 0:
376 err_msgs.append(output)
377 continue
378 _status = json.loads(output)
379 if any(
380 _status["node"]["services"][service_name]["running"]
381 for service_name in (
382 "corosync",
383 "pacemaker",
384 "pacemaker_remote",
385 )
386 ):
387 err_msgs.append(
388 "Cluster is currently running on node %s. You need to stop "
389 "the cluster in order to restore the configuration." % node
390 )
391 continue
392 except (ValueError, NameError, LookupError):
393 err_msgs.append("unable to determine status of the node %s" % node)
394 if err_msgs:
395 for msg in err_msgs:
396 utils.err(msg, False)
397 sys.exit(1)
398
399 # Temporarily disable config files syncing thread in pcsd so it will not
400 # rewrite restored files. 10 minutes should be enough time to restore.
401 # If node returns HTTP 404 it does not support config syncing at all.
402 for node in node_list:
403 retval, output = utils.pauseConfigSyncing(node, 10 * 60)
404 if not (retval == 0 or "(HTTP error: 404)" in output):
405 utils.err(output)
406
407 if infile_obj:
408 infile_obj.seek(0)
409 tarball_data = infile_obj.read()
410 else:
411 with open(infile_name, "rb") as tarball:
412 tarball_data = tarball.read()
413
414 error_list = []
415 for node in node_list:
416 retval, error = utils.restoreConfig(node, tarball_data)
417 if retval != 0:
418 error_list.append(error)
419 if error_list:
420 utils.err("unable to restore all nodes\n" + "\n".join(error_list))
421
422
423 def config_restore_local(infile_name, infile_obj): # noqa: PLR0912, PLR0915
424 """
425 Commandline options: no options
426 """
427 # pylint: disable=too-many-branches
428 # pylint: disable=too-many-locals
429 # pylint: disable=too-many-statements
430 service_manager = utils.get_service_manager()
431 if (
432 service_manager.is_running("corosync")
433 or service_manager.is_running("pacemaker")
434 or service_manager.is_running("pacemaker_remote")
435 ):
436 utils.err(
437 "Cluster is currently running on this node. You need to stop "
438 "the cluster in order to restore the configuration."
439 )
440
441 file_list = config_backup_path_list(with_uid_gid=True)
442 tarball_file_list = []
443 version = None
444 tmp_dir = None
445 try:
446 with tarfile.open(infile_name, "r|*", infile_obj) as tarball:
447 while True:
448 # next(tarball) does not work in python2.6
449 tar_member_info = tarball.next()
450 if tar_member_info is None:
451 break
452 if tar_member_info.name == "version.txt":
453 version_data = tarball.extractfile(tar_member_info)
454 version = version_data.read()
455 version_data.close()
456 continue
457 tarball_file_list.append(tar_member_info.name)
458
459 required_file_list = [
460 tar_path
461 for tar_path, path_info in file_list.items()
462 if path_info["required"]
463 ]
464 missing = set(required_file_list) - set(tarball_file_list)
465 if missing:
466 utils.err(
467 "unable to restore the cluster, missing files in backup: %s"
468 % ", ".join(missing)
469 )
470
471 config_backup_check_version(version)
472
473 if infile_obj:
474 infile_obj.seek(0)
475 with tarfile.open(infile_name, "r|*", infile_obj) as tarball:
476 while True:
477 # next(tarball) does not work in python2.6
478 tar_member_info = tarball.next()
479 if tar_member_info is None:
480 break
481 extract_info = None
482 path = tar_member_info.name
483 while path:
484 if path in file_list:
485 extract_info = file_list[path]
486 break
487 path = os.path.dirname(path)
488 if not extract_info:
489 continue
490 path_full = None
491 if callable(extract_info.get("pre_store_call")):
492 extract_info["pre_store_call"]()
493 if "rename" in extract_info and extract_info["rename"]:
494 if tmp_dir is None:
495 tmp_dir = tempfile.mkdtemp()
496 if hasattr(tarfile, "data_filter"):
497 # Safe way of extraction is available since Python 3.12,
498 # hasattr above checks if it's available.
499 # It's also backported to 3.11.4, 3.10.12, 3.9.17.
500 # It may be backported to older versions in downstream.
501 tarball.extractall(
502 tmp_dir, [tar_member_info], filter="data"
503 )
504 else:
505 # Unsafe way of extraction
506 # Remove once we don't support Python 3.8 and older
507 tarball.extractall(tmp_dir, [tar_member_info])
508 path_full = extract_info["path"]
509 shutil.move(
510 os.path.join(tmp_dir, tar_member_info.name), path_full
511 )
512 else:
513 dir_path = os.path.dirname(extract_info["path"])
514 if hasattr(tarfile, "data_filter"):
515 # Safe way of extraction is available since Python 3.12,
516 # hasattr above checks if it's available.
517 # It's also backported to 3.11.4, 3.10.12, 3.9.17.
518 # It may be backported to older versions in downstream.
519 tarball.extractall(
520 dir_path, [tar_member_info], filter="data"
521 )
522 else:
523 # Unsafe way of extracting
524 # Remove once we don't support Python 3.8 and older
525 tarball.extractall(dir_path, [tar_member_info])
526 path_full = os.path.join(dir_path, tar_member_info.name)
527 file_attrs = extract_info["attrs"]
528 os.chmod(path_full, file_attrs["mode"])
529 os.chown(path_full, file_attrs["uid"], file_attrs["gid"])
530 except (tarfile.TarError, EnvironmentError, OSError) as e:
531 utils.err("unable to restore the cluster: %s" % e)
532 finally:
533 if tmp_dir:
534 shutil.rmtree(tmp_dir, ignore_errors=True)
535
536 try:
537 sig_path = os.path.join(settings.cib_dir, "cib.xml.sig")
538 if os.path.exists(sig_path):
539 os.remove(sig_path)
540 except EnvironmentError as e:
541 utils.err("unable to remove %s: %s" % (sig_path, e))
542
543
544 def config_backup_path_list(with_uid_gid=False):
545 """
546 Commandline options: no option
547 NOTE: corosync.conf path may be altered using --corosync_conf
548 """
549 corosync_attrs = {
550 "mtime": int(time.time()),
551 "mode": 0o644,
552 "uname": "root",
553 "gname": "root",
554 "uid": 0,
555 "gid": 0,
556 }
557 corosync_authkey_attrs = dict(corosync_attrs)
558 corosync_authkey_attrs["mode"] = 0o400
559 cib_attrs = {
560 "mtime": int(time.time()),
561 "mode": 0o600,
562 "uname": settings.pacemaker_uname,
563 "gname": settings.pacemaker_gname,
564 }
565 if with_uid_gid:
566 cib_attrs["uid"] = _get_uid(cib_attrs["uname"])
567 cib_attrs["gid"] = _get_gid(cib_attrs["gname"])
568
569 pcmk_authkey_attrs = dict(cib_attrs)
570 pcmk_authkey_attrs["mode"] = 0o440
571 return {
572 "cib.xml": {
573 "path": os.path.join(settings.cib_dir, "cib.xml"),
574 "required": True,
575 "attrs": dict(cib_attrs),
576 },
577 "corosync_authkey": {
578 "path": settings.corosync_authkey_file,
579 "required": False,
580 "attrs": corosync_authkey_attrs,
581 "restore_procedure": None,
582 "rename": True,
583 },
584 "pacemaker_authkey": {
585 "path": settings.pacemaker_authkey_file,
586 "required": False,
587 "attrs": pcmk_authkey_attrs,
588 "restore_procedure": None,
589 "rename": True,
590 "pre_store_call": _ensure_etc_pacemaker_exists,
591 },
592 "corosync.conf": {
593 "path": settings.corosync_conf_file,
594 "required": True,
595 "attrs": dict(corosync_attrs),
596 },
597 "uidgid.d": {
598 "path": settings.corosync_uidgid_dir,
599 "required": False,
600 "attrs": dict(corosync_attrs),
601 },
602 "pcs_settings.conf": {
603 "path": settings.pcsd_settings_conf_location,
604 "required": False,
605 "attrs": {
606 "mtime": int(time.time()),
607 "mode": 0o644,
608 "uname": "root",
609 "gname": "root",
610 "uid": 0,
611 "gid": 0,
612 },
613 },
614 }
615
616
617 def _get_uid(user_name):
618 """
619 Commandline options: no options
620 """
621 try:
622 return pwd.getpwnam(user_name).pw_uid
623 except KeyError:
624 return utils.err(
625 "Unable to determine uid of user '{0}'".format(user_name)
626 )
627
628
629 def _get_gid(group_name):
630 """
631 Commandline options: no options
632 """
633 try:
634 return grp.getgrnam(group_name).gr_gid
635 except KeyError:
636 return utils.err(
637 "Unable to determine gid of group '{0}'".format(group_name)
638 )
639
640
641 def _ensure_etc_pacemaker_exists():
642 """
643 Commandline options: no options
644 """
645 dir_name = os.path.dirname(settings.pacemaker_authkey_file)
646 if not os.path.exists(dir_name):
647 os.mkdir(dir_name)
648 os.chmod(dir_name, 0o750)
649 os.chown(
650 dir_name,
651 _get_uid(settings.pacemaker_uname),
652 _get_gid(settings.pacemaker_gname),
653 )
654
655
656 def config_backup_check_version(version):
657 """
658 Commandline options: no options
659 """
660 try:
661 version_number = int(version)
662 supported_version = config_backup_version()
663 if version_number > supported_version:
664 utils.err(
665 f"Unsupported version of the backup, supported version is "
666 f"{supported_version}, backup version is {version_number}"
667 )
668 if version_number < supported_version:
669 warn(
670 f"Restoring from the backup version {version_number}, current "
671 f"supported version is {supported_version}"
672 )
673 except TypeError:
674 utils.err("Cannot determine version of the backup")
675
676
677 def config_backup_add_version_to_tarball(tarball, version=None):
678 """
679 Commandline options: no options
680 """
681 ver = version if version is not None else str(config_backup_version())
682 return utils.tar_add_file_data(tarball, ver.encode("utf-8"), "version.txt")
683
684
685 def config_backup_version():
686 """
687 Commandline options: no options
688 """
689 return 1
690
691
692 def config_checkpoint_list(lib, argv, modifiers):
693 """
694 Options: no options
695 """
696 del lib
697 modifiers.ensure_only_supported()
698 if argv:
699 raise CmdLineInputError()
700 try:
701 file_list = os.listdir(settings.cib_dir)
702 except OSError as e:
703 utils.err("unable to list checkpoints: %s" % e)
704 cib_list = []
705 cib_name_re = re.compile(r"^cib-(\d+)\.raw$")
706 for filename in file_list:
707 match = cib_name_re.match(filename)
708 if not match:
709 continue
710 file_path = os.path.join(settings.cib_dir, filename)
711 try:
712 if os.path.isfile(file_path):
713 cib_list.append(
714 (float(os.path.getmtime(file_path)), match.group(1))
715 )
716 except OSError:
717 pass
718 cib_list.sort()
719 if not cib_list:
720 print_to_stderr("No checkpoints available")
721 return
722 for cib_info in cib_list:
723 print(
724 "checkpoint %s: date %s"
725 % (cib_info[1], datetime.datetime.fromtimestamp(round(cib_info[0])))
726 )
727
728
729 def _checkpoint_to_lines(lib, checkpoint_number):
730 # backup current settings
731 orig_usefile = utils.usefile
732 orig_filename = utils.filename
733 orig_middleware = lib.middleware_factory
734 orig_env = lib.env
735 # configure old code to read the CIB from a file
736 utils.usefile = True
737 utils.filename = os.path.join(
738 settings.cib_dir, "cib-%s.raw" % checkpoint_number
739 )
740 # configure new code to read the CIB from a file
741 lib.middleware_factory = orig_middleware._replace(
742 cib=middleware.cib(utils.filename, utils.touch_cib_file)
743 )
744 lib.env = utils.get_cli_env()
745 # export the CIB to text
746 result = False, []
747 if os.path.isfile(utils.filename):
748 result = True, _config_show_cib_lines(lib)
749 # restore original settings
750 utils.usefile = orig_usefile
751 utils.filename = orig_filename
752 lib.middleware_factory = orig_middleware
753 lib.env = orig_env
754 return result
755
756
757 def config_checkpoint_view(lib, argv, modifiers):
758 """
759 Options: no options
760 """
761 modifiers.ensure_only_supported()
762 if len(argv) != 1:
763 print_to_stderr(usage.config(["checkpoint view"]))
764 sys.exit(1)
765
766 loaded, lines = _checkpoint_to_lines(lib, argv[0])
767 if not loaded:
768 utils.err("unable to read the checkpoint")
769 print("\n".join(lines))
770
771
772 def config_checkpoint_diff(lib, argv, modifiers):
773 """
774 Commandline options:
775 * -f - CIB file
776 """
777 modifiers.ensure_only_supported("-f")
778 if len(argv) != 2:
779 print_to_stderr(usage.config(["checkpoint diff"]))
780 sys.exit(1)
781
782 if argv[0] == argv[1]:
783 utils.err("cannot diff a checkpoint against itself")
784
785 errors = []
786 checkpoints_lines = []
787 for checkpoint in argv:
788 if checkpoint == "live":
789 lines = _config_show_cib_lines(lib)
790 if not lines:
791 errors.append("unable to read live configuration")
792 else:
793 checkpoints_lines.append(lines)
794 else:
795 loaded, lines = _checkpoint_to_lines(lib, checkpoint)
796 if not loaded:
797 errors.append(
798 "unable to read checkpoint '{0}'".format(checkpoint)
799 )
800 else:
801 checkpoints_lines.append(lines)
802
803 if errors:
804 utils.err("\n".join(errors))
805
806 print(
807 "Differences between {0} (-) and {1} (+):".format(
808 *[
809 (
810 "live configuration"
811 if label == "live"
812 else f"checkpoint {label}"
813 )
814 for label in argv
815 ]
816 )
817 )
818 print(
819 "\n".join(
820 [
821 line.rstrip()
822 for line in difflib.Differ().compare(
823 checkpoints_lines[0], checkpoints_lines[1]
824 )
825 ]
826 )
827 )
828
829
830 def config_checkpoint_restore(lib, argv, modifiers):
831 """
832 Options:
833 * -f - CIB file, a checkpoint will be restored into a specified file
834 """
835 del lib
836 modifiers.ensure_only_supported("-f")
837 if len(argv) != 1:
838 print_to_stderr(usage.config(["checkpoint restore"]))
839 sys.exit(1)
840
841 cib_path = os.path.join(settings.cib_dir, "cib-%s.raw" % argv[0])
842 try:
|
(1) Event Sigma main event: |
The application uses Python's built in `xml` module which does not properly handle erroneous or maliciously constructed data, making the application vulnerable to one or more types of XML attacks. |
|
(2) Event remediation: |
Avoid using the `xml` module. Consider using the `defusedxml` module or similar which safely prevents all XML entity attacks. |
843 snapshot_dom = parse(cib_path)
844 # pylint: disable=broad-except
845 except Exception as e:
846 utils.err("unable to read the checkpoint: %s" % e)
847 utils.replace_cib_configuration(snapshot_dom)
848