1 import datetime
2 import difflib
3 import grp
4 import json
5 import os
6 import os.path
7 import pwd
8 import re
9 import shutil
10 import sys
11 import tarfile
12 import tempfile
13 import time
14 from io import BytesIO
15 from typing import cast
16 from xml.dom.minidom import parse
17
18 from pcs import (
19 cluster,
20 quorum,
21 settings,
22 status,
23 usage,
24 utils,
25 )
26 from pcs.cli.alert.output import config_dto_to_lines as alerts_to_lines
27 from pcs.cli.cluster_property.output import (
28 PropertyConfigurationFacade,
29 properties_to_text,
30 )
31 from pcs.cli.common import middleware
32 from pcs.cli.common.errors import CmdLineInputError
33 from pcs.cli.common.output import (
34 INDENT_STEP,
35 smart_wrap_text,
36 )
37 from pcs.cli.constraint.output import constraints_to_text
38 from pcs.cli.nvset import nvset_dto_list_to_lines
39 from pcs.cli.reports import process_library_reports
40 from pcs.cli.reports.output import (
41 print_to_stderr,
42 warn,
43 )
44 from pcs.cli.resource.output import (
45 ResourcesConfigurationFacade,
46 resources_to_text,
47 )
48 from pcs.cli.stonith.levels.output import stonith_level_config_to_text
49 from pcs.cli.tag.output import tags_to_text
50 from pcs.common.interface import dto
51 from pcs.common.pacemaker.constraint import CibConstraintsDto
52 from pcs.common.str_tools import indent
53 from pcs.lib.errors import LibraryError
54 from pcs.lib.node import get_existing_nodes_names
55
56
57 def config_show(lib, argv, modifiers):
58 """
59 Options:
60 * -f - CIB file, when getting cluster name on remote node (corosync.conf
61 doesn't exist)
62 * --corosync_conf - corosync.conf file
63 """
64 modifiers.ensure_only_supported("-f", "--corosync_conf", "--show-secrets")
65 if argv:
66 raise CmdLineInputError()
67
68 corosync_conf_dto = None
69 cluster_name = ""
70 properties_facade = PropertyConfigurationFacade.from_properties_config(
71 lib.cluster_property.get_properties(),
72 )
73 try:
74 corosync_conf_dto = lib.cluster.get_corosync_conf_struct()
75 cluster_name = corosync_conf_dto.cluster_name
76 except LibraryError:
77 # there is no corosync.conf on remote nodes, we can try to
78 # get cluster name from pacemaker
79 pass
80 if not cluster_name:
81 cluster_name = properties_facade.get_property_value("cluster-name", "")
82 print("Cluster Name: %s" % cluster_name)
83
84 status.nodes_status(lib, ["config"], modifiers.get_subset("-f"))
85 cib_lines = _config_show_cib_lines(lib, properties_facade=properties_facade)
86 if cib_lines:
87 print()
88 print("\n".join(cib_lines))
89 if (
90 utils.hasCorosyncConf()
91 and not modifiers.is_specified("-f")
92 and not modifiers.is_specified("--corosync_conf")
93 ):
94 cluster.cluster_uidgid(
95 lib, [], modifiers.get_subset(), silent_list=True
96 )
97 if corosync_conf_dto:
98 quorum_device_dict = {}
99 if corosync_conf_dto.quorum_device:
100 quorum_device_dict = dto.to_dict(corosync_conf_dto.quorum_device)
101 config = dict(
102 options=corosync_conf_dto.quorum_options,
103 device=quorum_device_dict,
104 )
105 quorum_lines = quorum.quorum_config_to_str(config)
106 if quorum_lines:
107 print()
108 print("Quorum:")
109 print("\n".join(indent(quorum_lines)))
110
111
112 def _config_show_cib_lines(lib, properties_facade=None): # noqa: PLR0912, PLR0915
113 """
114 Commandline options:
115 * -f - CIB file
116 """
117 # pylint: disable=too-many-branches
118 # pylint: disable=too-many-locals
119 # pylint: disable=too-many-statements
120
121 # update of pcs_options will change output of constraint show and
122 # displaying resources and operations defaults
123 utils.pcs_options["--full"] = 1
124 # get latest modifiers object after updating pcs_options
125 modifiers = utils.get_input_modifiers()
126
127 resources_facade = ResourcesConfigurationFacade.from_resources_dto(
128 lib.resource.get_configured_resources()
129 )
130 resources_only_facade = resources_facade.filter_stonith(False)
131 stonith_only_facade = resources_facade.filter_stonith(True)
132 if modifiers.is_specified("--show-secrets"):
133 for facade in [resources_only_facade, stonith_only_facade]:
134 queries = facade.get_secrets_queries()
135 if queries:
136 facade.update_secrets_values(
137 lib.resource.get_cibsecrets(queries)
138 )
139
140 all_lines = []
141
142 resources_lines = smart_wrap_text(
143 indent(
144 resources_to_text(resources_only_facade), indent_step=INDENT_STEP
145 )
146 )
147 if resources_lines:
148 all_lines.append("Resources:")
149 all_lines.extend(resources_lines)
150
151 stonith_lines = smart_wrap_text(
152 indent(resources_to_text(stonith_only_facade), indent_step=INDENT_STEP)
153 )
154 if stonith_lines:
155 if all_lines:
156 all_lines.append("")
157 all_lines.append("Stonith Devices:")
158 all_lines.extend(stonith_lines)
159
160 levels_lines = stonith_level_config_to_text(
161 lib.fencing_topology.get_config_dto()
162 )
163 if levels_lines:
164 if all_lines:
165 all_lines.append("")
166 all_lines.append("Fencing Levels:")
167 all_lines.extend(indent(levels_lines, indent_step=2))
168
169 constraints_lines = smart_wrap_text(
170 constraints_to_text(
171 cast(
172 CibConstraintsDto,
173 lib.constraint.get_config(evaluate_rules=False),
174 ),
175 modifiers.is_specified("--full"),
176 )
177 )
178 if constraints_lines:
179 if all_lines:
180 all_lines.append("")
181 all_lines.extend(constraints_lines)
182
183 alert_lines = indent(alerts_to_lines(lib.alert.get_config_dto()))
184 if alert_lines:
185 if all_lines:
186 all_lines.append("")
187 all_lines.append("Alerts:")
188 all_lines.extend(alert_lines)
189
190 resources_defaults_lines = indent(
191 nvset_dto_list_to_lines(
192 lib.cib_options.resource_defaults_config(
193 evaluate_expired=False
194 ).meta_attributes,
195 nvset_label="Meta Attrs",
196 with_ids=modifiers.get("--full"),
197 )
198 )
199 if resources_defaults_lines:
200 if all_lines:
201 all_lines.append("")
202 all_lines.append("Resources Defaults:")
203 all_lines.extend(resources_defaults_lines)
204
205 operations_defaults_lines = indent(
206 nvset_dto_list_to_lines(
207 lib.cib_options.operation_defaults_config(
208 evaluate_expired=False
209 ).meta_attributes,
210 nvset_label="Meta Attrs",
211 with_ids=modifiers.get("--full"),
212 )
213 )
214 if operations_defaults_lines:
215 if all_lines:
216 all_lines.append("")
217 all_lines.append("Operations Defaults:")
218 all_lines.extend(operations_defaults_lines)
219
220 if not properties_facade:
221 properties_facade = PropertyConfigurationFacade.from_properties_config(
222 lib.cluster_property.get_properties()
223 )
224 properties_lines = properties_to_text(properties_facade)
225 if properties_lines:
226 if all_lines:
227 all_lines.append("")
228 all_lines.extend(properties_lines)
229
230 tag_lines = smart_wrap_text(tags_to_text(lib.tag.get_config_dto([])))
231 if tag_lines:
232 if all_lines:
233 all_lines.append("")
234 all_lines.append("Tags:")
235 all_lines.extend(indent(tag_lines, indent_step=1))
236
237 return all_lines
238
239
240 def config_backup(lib, argv, modifiers):
241 """
242 Options:
243 * --force - overwrite file if already exists
244 """
245 del lib
246 modifiers.ensure_only_supported("--force")
247 if len(argv) > 1:
248 raise CmdLineInputError()
249
250 outfile_name = None
251 if argv:
252 outfile_name = argv[0]
253 if not outfile_name.endswith(".tar.bz2"):
254 outfile_name += ".tar.bz2"
255
256 tar_data = config_backup_local()
257 if outfile_name:
258 ok, message = utils.write_file(
259 outfile_name, tar_data, permissions=0o600, binary=True
260 )
261 if not ok:
262 utils.err(message)
263 else:
264 # in python3 stdout accepts str so we need to use buffer
265 sys.stdout.buffer.write(tar_data)
266
267
268 def config_backup_local():
269 """
270 Commandline options: no options
271 """
272 file_list = config_backup_path_list()
273 tar_data = BytesIO()
274
275 try:
276 with tarfile.open(fileobj=tar_data, mode="w|bz2") as tarball:
277 config_backup_add_version_to_tarball(tarball)
278 for tar_path, path_info in file_list.items():
279 if (
280 not os.path.exists(path_info["path"])
281 and not path_info["required"]
282 ):
283 continue
284 tarball.add(path_info["path"], tar_path)
285 except (tarfile.TarError, EnvironmentError) as e:
286 utils.err("unable to create tarball: %s" % e)
287
288 tar = tar_data.getvalue()
289 tar_data.close()
290 return tar
291
292
293 def config_restore(lib, argv, modifiers):
294 """
295 Options:
296 * --local - restore config only on local node
297 * --request-timeout - timeout for HTTP requests, used only if --local was
298 not defined or user is not root
299 """
300 del lib
301 modifiers.ensure_only_supported("--local", "--request-timeout")
302 if len(argv) > 1:
303 raise CmdLineInputError()
304
305 infile_name = infile_obj = None
306 if argv:
307 infile_name = argv[0]
308 if not infile_name:
309 # in python3 stdin returns str so we need to use buffer
310 infile_obj = BytesIO(sys.stdin.buffer.read())
311
312 if os.getuid() == 0:
313 if modifiers.get("--local"):
314 config_restore_local(infile_name, infile_obj)
315 else:
316 config_restore_remote(infile_name, infile_obj)
317 else:
318 new_argv = ["config", "restore"]
319 options = []
320 new_stdin = None
321 if modifiers.get("--local"):
322 options.append("--local")
323 if infile_name:
324 new_argv.append(os.path.abspath(infile_name))
325 else:
326 new_stdin = infile_obj.read()
327 err_msgs, exitcode, std_out, std_err = utils.call_local_pcsd(
328 new_argv, options, new_stdin
329 )
330 if err_msgs:
331 for msg in err_msgs:
332 utils.err(msg, False)
333 sys.exit(1)
334 print(std_out)
335 sys.stderr.write(std_err)
336 sys.exit(exitcode)
337
338
339 def config_restore_remote(infile_name, infile_obj): # noqa: PLR0912
340 """
341 Commandline options:
342 * --request-timeout - timeout for HTTP requests
343 """
344 # pylint: disable=too-many-branches
345 # pylint: disable=too-many-locals
346 extracted = {
347 "version.txt": "",
348 "corosync.conf": "",
349 }
350 try:
351 with tarfile.open(infile_name, "r|*", infile_obj) as tarball:
352 while True:
353 # next(tarball) does not work in python2.6
354 tar_member_info = tarball.next()
355 if tar_member_info is None:
356 break
357 if tar_member_info.name in extracted:
358 tar_member = tarball.extractfile(tar_member_info)
359 extracted[tar_member_info.name] = tar_member.read()
360 tar_member.close()
361 except (tarfile.TarError, EnvironmentError) as e:
362 utils.err("unable to read the tarball: %s" % e)
363
364 config_backup_check_version(extracted["version.txt"])
365
366 node_list, report_list = get_existing_nodes_names(
367 utils.get_corosync_conf_facade(
368 conf_text=extracted["corosync.conf"].decode("utf-8")
369 )
370 )
371 if report_list:
372 process_library_reports(report_list)
373 if not node_list:
374 utils.err("no nodes found in the tarball")
375
376 err_msgs = []
377 for node in node_list:
378 try:
379 retval, output = utils.checkStatus(node)
380 if retval != 0:
381 err_msgs.append(output)
382 continue
383 _status = json.loads(output)
384 if any(
385 _status["node"]["services"][service_name]["running"]
386 for service_name in (
387 "corosync",
388 "pacemaker",
389 "pacemaker_remote",
390 )
391 ):
392 err_msgs.append(
393 "Cluster is currently running on node %s. You need to stop "
394 "the cluster in order to restore the configuration." % node
395 )
396 continue
397 except (ValueError, NameError, LookupError):
398 err_msgs.append("unable to determine status of the node %s" % node)
399 if err_msgs:
400 for msg in err_msgs:
401 utils.err(msg, False)
402 sys.exit(1)
403
404 # Temporarily disable config files syncing thread in pcsd so it will not
405 # rewrite restored files. 10 minutes should be enough time to restore.
406 # If node returns HTTP 404 it does not support config syncing at all.
407 for node in node_list:
408 retval, output = utils.pauseConfigSyncing(node, 10 * 60)
409 if not (retval == 0 or "(HTTP error: 404)" in output):
410 utils.err(output)
411
412 if infile_obj:
413 infile_obj.seek(0)
414 tarball_data = infile_obj.read()
415 else:
416 with open(infile_name, "rb") as tarball:
417 tarball_data = tarball.read()
418
419 error_list = []
420 for node in node_list:
421 retval, error = utils.restoreConfig(node, tarball_data)
422 if retval != 0:
423 error_list.append(error)
424 if error_list:
425 utils.err("unable to restore all nodes\n" + "\n".join(error_list))
426
427
428 def config_restore_local(infile_name, infile_obj): # noqa: PLR0912, PLR0915
429 """
430 Commandline options: no options
431 """
432 # pylint: disable=too-many-branches
433 # pylint: disable=too-many-locals
434 # pylint: disable=too-many-statements
435 service_manager = utils.get_service_manager()
436 if (
437 service_manager.is_running("corosync")
438 or service_manager.is_running("pacemaker")
439 or service_manager.is_running("pacemaker_remote")
440 ):
441 utils.err(
442 "Cluster is currently running on this node. You need to stop "
443 "the cluster in order to restore the configuration."
444 )
445
446 file_list = config_backup_path_list(with_uid_gid=True)
447 tarball_file_list = []
448 version = None
449 tmp_dir = None
450 try:
451 with tarfile.open(infile_name, "r|*", infile_obj) as tarball:
452 while True:
453 # next(tarball) does not work in python2.6
454 tar_member_info = tarball.next()
455 if tar_member_info is None:
456 break
457 if tar_member_info.name == "version.txt":
458 version_data = tarball.extractfile(tar_member_info)
459 version = version_data.read()
460 version_data.close()
461 continue
462 tarball_file_list.append(tar_member_info.name)
463
464 required_file_list = [
465 tar_path
466 for tar_path, path_info in file_list.items()
467 if path_info["required"]
468 ]
469 missing = set(required_file_list) - set(tarball_file_list)
470 if missing:
471 utils.err(
472 "unable to restore the cluster, missing files in backup: %s"
473 % ", ".join(missing)
474 )
475
476 config_backup_check_version(version)
477
478 if infile_obj:
479 infile_obj.seek(0)
480 with tarfile.open(infile_name, "r|*", infile_obj) as tarball:
481 while True:
482 # next(tarball) does not work in python2.6
483 tar_member_info = tarball.next()
484 if tar_member_info is None:
485 break
486 extract_info = None
487 path = tar_member_info.name
488 while path:
489 if path in file_list:
490 extract_info = file_list[path]
491 break
492 path = os.path.dirname(path)
493 if not extract_info:
494 continue
495 path_full = None
496 if callable(extract_info.get("pre_store_call")):
497 extract_info["pre_store_call"]()
498 if "rename" in extract_info and extract_info["rename"]:
499 if tmp_dir is None:
500 tmp_dir = tempfile.mkdtemp()
501 if hasattr(tarfile, "data_filter"):
502 # Safe way of extraction is available since Python 3.12,
503 # hasattr above checks if it's available.
504 # It's also backported to 3.11.4, 3.10.12, 3.9.17.
505 # It may be backported to older versions in downstream.
506 tarball.extractall(
507 tmp_dir, [tar_member_info], filter="data"
508 )
509 else:
510 # Unsafe way of extraction
511 # Remove once we don't support Python 3.8 and older
512 tarball.extractall(tmp_dir, [tar_member_info])
513 path_full = extract_info["path"]
514 shutil.move(
515 os.path.join(tmp_dir, tar_member_info.name), path_full
516 )
517 else:
518 dir_path = os.path.dirname(extract_info["path"])
519 if hasattr(tarfile, "data_filter"):
520 # Safe way of extraction is available since Python 3.12,
521 # hasattr above checks if it's available.
522 # It's also backported to 3.11.4, 3.10.12, 3.9.17.
523 # It may be backported to older versions in downstream.
524 tarball.extractall(
525 dir_path, [tar_member_info], filter="data"
526 )
527 else:
528 # Unsafe way of extracting
529 # Remove once we don't support Python 3.8 and older
530 tarball.extractall(dir_path, [tar_member_info])
531 path_full = os.path.join(dir_path, tar_member_info.name)
532 file_attrs = extract_info["attrs"]
533 os.chmod(path_full, file_attrs["mode"])
534 os.chown(path_full, file_attrs["uid"], file_attrs["gid"])
535 except (tarfile.TarError, EnvironmentError, OSError) as e:
536 utils.err("unable to restore the cluster: %s" % e)
537 finally:
538 if tmp_dir:
539 shutil.rmtree(tmp_dir, ignore_errors=True)
540
541 try:
542 sig_path = os.path.join(settings.cib_dir, "cib.xml.sig")
543 if os.path.exists(sig_path):
544 os.remove(sig_path)
545 except EnvironmentError as e:
546 utils.err("unable to remove %s: %s" % (sig_path, e))
547
548
549 def config_backup_path_list(with_uid_gid=False):
550 """
551 Commandline options: no option
552 NOTE: corosync.conf path may be altered using --corosync_conf
553 """
554 corosync_attrs = {
555 "mtime": int(time.time()),
556 "mode": 0o644,
557 "uname": "root",
558 "gname": "root",
559 "uid": 0,
560 "gid": 0,
561 }
562 corosync_authkey_attrs = dict(corosync_attrs)
563 corosync_authkey_attrs["mode"] = 0o400
564 cib_attrs = {
565 "mtime": int(time.time()),
566 "mode": 0o600,
567 "uname": settings.pacemaker_uname,
568 "gname": settings.pacemaker_gname,
569 }
570 if with_uid_gid:
571 cib_attrs["uid"] = _get_uid(cib_attrs["uname"])
572 cib_attrs["gid"] = _get_gid(cib_attrs["gname"])
573
574 pcmk_authkey_attrs = dict(cib_attrs)
575 pcmk_authkey_attrs["mode"] = 0o440
576 return {
577 "cib.xml": {
578 "path": os.path.join(settings.cib_dir, "cib.xml"),
579 "required": True,
580 "attrs": dict(cib_attrs),
581 },
582 "corosync_authkey": {
583 "path": settings.corosync_authkey_file,
584 "required": False,
585 "attrs": corosync_authkey_attrs,
586 "restore_procedure": None,
587 "rename": True,
588 },
589 "pacemaker_authkey": {
590 "path": settings.pacemaker_authkey_file,
591 "required": False,
592 "attrs": pcmk_authkey_attrs,
593 "restore_procedure": None,
594 "rename": True,
595 "pre_store_call": _ensure_etc_pacemaker_exists,
596 },
597 "corosync.conf": {
598 "path": settings.corosync_conf_file,
599 "required": True,
600 "attrs": dict(corosync_attrs),
601 },
602 "uidgid.d": {
603 "path": settings.corosync_uidgid_dir,
604 "required": False,
605 "attrs": dict(corosync_attrs),
606 },
607 "pcs_settings.conf": {
608 "path": settings.pcsd_settings_conf_location,
609 "required": False,
610 "attrs": {
611 "mtime": int(time.time()),
612 "mode": 0o644,
613 "uname": "root",
614 "gname": "root",
615 "uid": 0,
616 "gid": 0,
617 },
618 },
619 }
620
621
622 def _get_uid(user_name):
623 """
624 Commandline options: no options
625 """
626 try:
627 return pwd.getpwnam(user_name).pw_uid
628 except KeyError:
629 return utils.err(
630 "Unable to determine uid of user '{0}'".format(user_name)
631 )
632
633
634 def _get_gid(group_name):
635 """
636 Commandline options: no options
637 """
638 try:
639 return grp.getgrnam(group_name).gr_gid
640 except KeyError:
641 return utils.err(
642 "Unable to determine gid of group '{0}'".format(group_name)
643 )
644
645
646 def _ensure_etc_pacemaker_exists():
647 """
648 Commandline options: no options
649 """
650 dir_name = os.path.dirname(settings.pacemaker_authkey_file)
651 if not os.path.exists(dir_name):
652 os.mkdir(dir_name)
653 os.chmod(dir_name, 0o750)
654 os.chown(
655 dir_name,
656 _get_uid(settings.pacemaker_uname),
657 _get_gid(settings.pacemaker_gname),
658 )
659
660
661 def config_backup_check_version(version):
662 """
663 Commandline options: no options
664 """
665 try:
666 version_number = int(version)
667 supported_version = config_backup_version()
668 if version_number > supported_version:
669 utils.err(
670 f"Unsupported version of the backup, supported version is "
671 f"{supported_version}, backup version is {version_number}"
672 )
673 if version_number < supported_version:
674 warn(
675 f"Restoring from the backup version {version_number}, current "
676 f"supported version is {supported_version}"
677 )
678 except TypeError:
679 utils.err("Cannot determine version of the backup")
680
681
682 def config_backup_add_version_to_tarball(tarball, version=None):
683 """
684 Commandline options: no options
685 """
686 ver = version if version is not None else str(config_backup_version())
687 return utils.tar_add_file_data(tarball, ver.encode("utf-8"), "version.txt")
688
689
690 def config_backup_version():
691 """
692 Commandline options: no options
693 """
694 return 1
695
696
697 def config_checkpoint_list(lib, argv, modifiers):
698 """
699 Options: no options
700 """
701 del lib
702 modifiers.ensure_only_supported()
703 if argv:
704 raise CmdLineInputError()
705 try:
706 file_list = os.listdir(settings.cib_dir)
707 except OSError as e:
708 utils.err("unable to list checkpoints: %s" % e)
709 cib_list = []
710 cib_name_re = re.compile(r"^cib-(\d+)\.raw$")
711 for filename in file_list:
712 match = cib_name_re.match(filename)
713 if not match:
714 continue
715 file_path = os.path.join(settings.cib_dir, filename)
716 try:
717 if os.path.isfile(file_path):
718 cib_list.append(
719 (float(os.path.getmtime(file_path)), match.group(1))
720 )
721 except OSError:
722 pass
723 cib_list.sort()
724 if not cib_list:
725 print_to_stderr("No checkpoints available")
726 return
727 for cib_info in cib_list:
728 print(
729 "checkpoint %s: date %s"
730 % (cib_info[1], datetime.datetime.fromtimestamp(round(cib_info[0])))
731 )
732
733
734 def _checkpoint_to_lines(lib, checkpoint_number):
735 # backup current settings
736 orig_usefile = utils.usefile
737 orig_filename = utils.filename
738 orig_middleware = lib.middleware_factory
739 orig_env = lib.env
740 # configure old code to read the CIB from a file
741 utils.usefile = True
742 utils.filename = os.path.join(
743 settings.cib_dir, "cib-%s.raw" % checkpoint_number
744 )
745 # configure new code to read the CIB from a file
746 lib.middleware_factory = orig_middleware._replace(
747 cib=middleware.cib(utils.filename, utils.touch_cib_file)
748 )
749 lib.env = utils.get_cli_env()
750 # export the CIB to text
751 result = False, []
752 if os.path.isfile(utils.filename):
753 result = True, _config_show_cib_lines(lib)
754 # restore original settings
755 utils.usefile = orig_usefile
756 utils.filename = orig_filename
757 lib.middleware_factory = orig_middleware
758 lib.env = orig_env
759 return result
760
761
762 def config_checkpoint_view(lib, argv, modifiers):
763 """
764 Options: no options
765 """
766 modifiers.ensure_only_supported()
767 if len(argv) != 1:
768 print_to_stderr(usage.config(["checkpoint view"]))
769 sys.exit(1)
770
771 loaded, lines = _checkpoint_to_lines(lib, argv[0])
772 if not loaded:
773 utils.err("unable to read the checkpoint")
774 print("\n".join(lines))
775
776
777 def config_checkpoint_diff(lib, argv, modifiers):
778 """
779 Commandline options:
780 * -f - CIB file
781 """
782 modifiers.ensure_only_supported("-f")
783 if len(argv) != 2:
784 print_to_stderr(usage.config(["checkpoint diff"]))
785 sys.exit(1)
786
787 if argv[0] == argv[1]:
788 utils.err("cannot diff a checkpoint against itself")
789
790 errors = []
791 checkpoints_lines = []
792 for checkpoint in argv:
793 if checkpoint == "live":
794 lines = _config_show_cib_lines(lib)
795 if not lines:
796 errors.append("unable to read live configuration")
797 else:
798 checkpoints_lines.append(lines)
799 else:
800 loaded, lines = _checkpoint_to_lines(lib, checkpoint)
801 if not loaded:
802 errors.append(
803 "unable to read checkpoint '{0}'".format(checkpoint)
804 )
805 else:
806 checkpoints_lines.append(lines)
807
808 if errors:
809 utils.err("\n".join(errors))
810
811 print(
812 "Differences between {0} (-) and {1} (+):".format(
813 *[
814 (
815 "live configuration"
816 if label == "live"
817 else f"checkpoint {label}"
818 )
819 for label in argv
820 ]
821 )
822 )
823 print(
824 "\n".join(
825 [
826 line.rstrip()
827 for line in difflib.Differ().compare(
828 checkpoints_lines[0], checkpoints_lines[1]
829 )
830 ]
831 )
832 )
833
834
835 def config_checkpoint_restore(lib, argv, modifiers):
836 """
837 Options:
838 * -f - CIB file, a checkpoint will be restored into a specified file
839 """
840 del lib
841 modifiers.ensure_only_supported("-f")
842 if len(argv) != 1:
843 print_to_stderr(usage.config(["checkpoint restore"]))
844 sys.exit(1)
845
846 cib_path = os.path.join(settings.cib_dir, "cib-%s.raw" % argv[0])
847 try:
|
(1) Event Sigma main event: |
The application uses Python's built in `xml` module which does not properly handle erroneous or maliciously constructed data, making the application vulnerable to one or more types of XML attacks. |
|
(2) Event remediation: |
Avoid using the `xml` module. Consider using the `defusedxml` module or similar which safely prevents all XML entity attacks. |
848 snapshot_dom = parse(cib_path)
849 # pylint: disable=broad-except
850 except Exception as e:
851 utils.err("unable to read the checkpoint: %s" % e)
852 utils.replace_cib_configuration(snapshot_dom)
853