1 from collections import Counter
2 from collections.abc import Mapping
3 from functools import partial
4 from typing import Final
5
6 from pcs.cli.common.errors import (
7 SEE_MAN_CHANGES,
8 CmdLineInputError,
9 )
10 from pcs.cli.reports.output import deprecation_warning
11 from pcs.common.const import INFINITY
12 from pcs.common.str_tools import (
13 format_list,
14 format_list_custom_last_separator,
15 format_plural,
16 )
17 from pcs.common.tools import timeout_to_seconds
18 from pcs.common.types import (
19 StringCollection,
20 StringIterable,
21 StringSequence,
22 )
23
24 # sys.argv always returns a list, we don't need StringSequence in here
25 Argv = list[str]
26 ModifierValueType = None | bool | str
27
28 _FUTURE_OPTION_STR: Final = "future"
29 FUTURE_OPTION: Final = f"--{_FUTURE_OPTION_STR}"
30 _OUTPUT_FORMAT_OPTION_STR: Final = "output-format"
31 OUTPUT_FORMAT_OPTION: Final = f"--{_OUTPUT_FORMAT_OPTION_STR}"
32 OUTPUT_FORMAT_VALUE_CMD: Final = "cmd"
33 OUTPUT_FORMAT_VALUE_JSON: Final = "json"
34 OUTPUT_FORMAT_VALUE_TEXT: Final = "text"
35 OUTPUT_FORMAT_VALUES: Final = frozenset(
36 (
37 OUTPUT_FORMAT_VALUE_CMD,
38 OUTPUT_FORMAT_VALUE_JSON,
39 OUTPUT_FORMAT_VALUE_TEXT,
40 )
41 )
42
43 MODIFIER_OPTIONS_BOOL: Final[frozenset[str]] = frozenset(
44 (
45 "--all",
46 "--agent-validation",
47 # TODO remove
48 # used only in acl commands and it is deprecated there
49 "--autodelete",
50 "--brief",
51 "--config",
52 "--corosync",
53 "--debug",
54 "--defaults",
55 "--disabled",
56 "--enable",
57 "--expired",
58 "--force",
59 "--full",
60 "--quiet",
61 FUTURE_OPTION,
62 # TODO remove
63 # used only in deprecated 'pcs resource|stonith show'
64 "--groups",
65 "--hide-inactive",
66 "--local",
67 "--monitor",
68 "--no-default-ops",
69 "--nodesc",
70 "--no-expire-check",
71 "--no-cluster-uuid",
72 "--no-keys-sync",
73 "--no-stop",
74 "--no-strict",
75 "--no-watchdog-validation",
76 "--off",
77 "--overwrite",
78 "--pacemaker",
79 "--promoted",
80 "--safe",
81 "--show-secrets",
82 "--simulate",
83 "--skip-offline",
84 "--start",
85 "--strict",
86 "--yes",
87 )
88 )
89
90 MODIFIER_OPTIONS_VAL: Final[frozenset[str]] = frozenset(
91 (
92 "--after",
93 "--before",
94 "--booth-conf",
95 "--booth-key",
96 "--corosync_conf",
97 "--from",
98 # TODO remove
99 # used in resource create and stonith create, deprecated in both
100 "--group",
101 "--name",
102 "--node",
103 "--request-timeout",
104 "--to",
105 "--token",
106 "-f",
107 "-p",
108 "-u",
109 )
110 )
111
112 ARG_TYPE_DELIMITER: Final = "%"
113
114 # h = help, f = file,
115 # p = password (cluster auth), u = user (cluster auth),
116 PCS_SHORT_OPTIONS: Final = "hf:p:u:"
117 PCS_LONG_OPTIONS: Final = [
118 "debug",
119 "version",
120 "help",
121 "fullhelp",
122 "force",
123 "skip-offline",
124 "autodelete",
125 "simulate",
126 "all",
127 "full",
128 "local",
129 "wait",
130 "config",
131 "start",
132 "enable",
133 "disabled",
134 "off",
135 "request-timeout=",
136 "brief",
137 _FUTURE_OPTION_STR,
138 # resource (safe-)disable
139 "safe",
140 "no-strict",
141 # resource cleanup | refresh
142 "strict",
143 "pacemaker",
144 "corosync",
145 "no-default-ops",
146 "defaults",
147 "nodesc",
148 "promoted",
149 "name=",
150 "group=",
151 "node=",
152 "from=",
153 "to=",
154 "after=",
155 "before=",
156 "corosync_conf=",
157 "booth-conf=",
158 "booth-key=",
159 # do not stop resources in resource delete command
160 "no-stop",
161 "no-watchdog-validation",
162 # pcs cluster setup
163 "no-cluster-uuid",
164 "no-keys-sync",
165 # in pcs status - do not display resource status on inactive node
166 "hide-inactive",
167 # pcs resource (un)manage - enable or disable monitor operations
168 "monitor",
169 # TODO remove
170 # used only in deprecated 'pcs resource|stonith show'
171 "groups",
172 # "pcs resource clear --expired" - only clear expired moves and bans
173 "expired",
174 # disable evaluating whether rules are expired
175 "no-expire-check",
176 # allow overwriting existing files, currently meant for / used in CLI only
177 "overwrite",
178 # output format of commands, e.g: json, cmd, text, ...
179 f"{_OUTPUT_FORMAT_OPTION_STR}=",
180 # auth token
181 "token=",
182 # enable agent self validation
183 "agent-validation",
184 # disable text output in query commands
185 "quiet",
186 # proceed with dangerous actions, meant for / used in CLI only
187 "yes",
188 # retrieve and display cibsecret values / used in CLI only
189 "show-secrets",
190 ]
191
192
193 def split_list(arg_list: Argv, separator: str) -> list[Argv]:
194 """
195 split a list of arguments to several lists using separator as a delimiter
196
197 arg_list -- list of command line arguments to split
198 separator -- delimiter
199 """
200 separator_indexes = [i for i, x in enumerate(arg_list) if x == separator]
201 bounds = zip(
202 [0] + [i + 1 for i in separator_indexes],
203 separator_indexes + [None],
204 strict=False,
205 )
206 return [arg_list[i:j] for i, j in bounds]
207
208
209 def split_list_by_any_keywords(
210 arg_list: Argv, keyword_label: str
211 ) -> dict[str, Argv]:
212 """
213 split a list of arguments using any argument not containing = as a delimiter
214
215 arg_list -- list of command line arguments to split
216 keyword_label -- description of all keywords
217 """
218 groups: dict[str, Argv] = {}
219 if not arg_list:
220 return groups
221
222 if "=" in arg_list[0]:
223 raise CmdLineInputError(
224 f"Invalid character '=' in {keyword_label} '{arg_list[0]}'"
225 )
226
227 current_keyword = arg_list[0]
228 groups[current_keyword] = []
229 for arg in arg_list[1:]:
230 if "=" in arg:
231 groups[current_keyword].append(arg)
232 else:
233 current_keyword = arg
234 if current_keyword in groups:
235 raise CmdLineInputError(
236 "{} '{}' defined multiple times".format(
237 keyword_label.capitalize(), current_keyword
238 )
239 )
240 groups[current_keyword] = []
241 return groups
242
243
244 def split_option(arg: str, allow_empty_value: bool = True) -> tuple[str, str]:
245 """
246 Get (key, value) from a key=value commandline argument.
247
248 Split the argument by the first = and return resulting parts. Raise
249 CmdLineInputError if the argument cannot be split.
250
251 arg -- commandline argument to split
252 allow_empty_value -- if False, raise CmdLineInputError on empty value
253 """
254 if "=" not in arg:
255 raise CmdLineInputError(f"missing value of '{arg}' option")
256 if arg.startswith("="):
257 raise CmdLineInputError(f"missing key in '{arg}' option")
258 key, value = arg.split("=", 1)
259 if not (value or allow_empty_value):
260 raise CmdLineInputError(f"value of '{key}' option is empty")
261 return key, value
262
263
264 def ensure_unique_args(cmdline_args: Argv) -> None:
265 """
266 Raises in case there are duplicate args
267 """
268 duplicities = [
269 item for item, count in Counter(cmdline_args).items() if count > 1
270 ]
271 if duplicities:
272 argument_pl = format_plural(duplicities, "argument")
273 duplicities_list = format_list(duplicities)
274 raise CmdLineInputError(f"duplicate {argument_pl}: {duplicities_list}")
275
276
277 class KeyValueParser:
278 """
279 Parse and check key=value options
280 """
281
282 def __init__(self, arg_list: Argv, repeatable: StringCollection = ()):
283 """
284 arg_list -- commandline arguments to be parsed
285 repeatable -- keys that are allowed to be specified several times
286 """
287 self._repeatable_keys = repeatable
288 self._key_value_map: dict[str, list[str]] = {}
289 for arg in arg_list:
290 name, value = split_option(arg)
291 if name not in self._key_value_map:
292 self._key_value_map[name] = [value]
293 else:
294 self._key_value_map[name].append(value)
295
296 def check_allowed_keys(self, allowed_keys: StringCollection) -> None:
297 """
298 Check that only allowed keys were specified
299
300 allowed_keys -- list of allowed keys
301 """
302 unknown_options = set(self._key_value_map.keys()) - set(allowed_keys)
303 if unknown_options:
304 raise CmdLineInputError(
305 "Unknown option{s} '{options}'".format(
306 s=("s" if len(unknown_options) > 1 else ""),
307 options="', '".join(sorted(unknown_options)),
308 )
309 )
310
311 def get_unique(self) -> dict[str, str]:
312 """
313 Get all non-repeatable keys and their values; raise if a key has more values
314 """
315 result: dict[str, str] = {}
316 for key, values in self._key_value_map.items():
317 if key in self._repeatable_keys:
318 continue
319 values_uniq = set(values)
320 if len(values_uniq) > 1:
321 raise CmdLineInputError(
322 f"duplicate option '{key}' with different values "
323 f"{format_list_custom_last_separator(values_uniq, ' and ')}"
324 )
325 result[key] = values[0]
326 return result
327
328 def get_repeatable(self) -> dict[str, list[str]]:
329 """
330 Get all repeatable keys and their values
331 """
332 return {
333 key: self._key_value_map[key]
334 for key in self._repeatable_keys
335 if key in self._key_value_map
336 }
337
338
339 class ArgsByKeywords:
340 def __init__(self, groups: Mapping[str, list[Argv]]):
341 self._groups = groups
342 self._flat_cache: dict[str, Argv] = {}
343
344 def allow_repetition_only_for(self, keyword_set: StringCollection) -> None:
345 """
346 Raise CmdLineInputError if a keyword has been repeated when not allowed
347
348 keyword_set -- repetition is allowed for these keywords
349 """
350 for keyword, arg_groups in self._groups.items():
351 if len(arg_groups) > 1 and keyword not in keyword_set:
352 raise CmdLineInputError(
353 f"'{keyword}' cannot be used more than once"
354 )
355
356 def ensure_unique_keywords(self) -> None:
357 """
358 Raise CmdLineInputError if any keyword has been repeated
359 """
360 return self.allow_repetition_only_for(set())
361
362 def is_empty(self) -> bool:
363 """
364 Check if any args have been specified
365 """
366 return not self._groups
367
368 def has_keyword(self, keyword: str) -> bool:
369 """
370 Check if a keyword has been specified
371
372 keyword -- a keyword to check
373 """
374 return keyword in self._groups
375
376 def has_empty_keyword(self, keyword: str) -> bool:
377 """
378 Check if a keyword has been specified without any following args
379
380 keyword -- a keyword to check
381 """
382 return self.has_keyword(keyword) and not self.get_args_flat(keyword)
383
384 def get_args_flat(self, keyword: str) -> Argv:
385 """
386 Get arguments of a keyword in one sequence
387 """
388 if keyword in self._groups:
389 if keyword not in self._flat_cache:
390 self._flat_cache[keyword] = [
391 arg
392 for one_group in self._groups[keyword]
393 for arg in one_group
394 ]
395 return self._flat_cache[keyword]
396 return []
397
398 def get_args_groups(self, keyword: str) -> list[Argv]:
399 """
400 Get arguments of a keyword, one group for each keyword occurrence
401 """
402 if keyword in self._groups:
403 return self._groups[keyword]
404 return []
405
406
407 def group_by_keywords(
408 arg_list: Argv,
409 keyword_set: StringCollection,
410 implicit_first_keyword: str | None = None,
411 ) -> ArgsByKeywords:
412 """
413 Separate argv into groups delimited by specified keywords
414
415 arg_list -- commandline arguments containing keywords
416 keyword_set -- all expected keywords
417 implicit_first_keyword -- key for capturing args before the first keyword
418 """
419 args_by_keywords: dict[str, list[Argv]] = {}
420
421 def new_keyword(keyword: str) -> None:
422 if keyword not in args_by_keywords:
423 args_by_keywords[keyword] = []
424 args_by_keywords[keyword].append([])
425
426 if arg_list:
427 if arg_list[0] not in keyword_set:
428 if not implicit_first_keyword:
429 raise CmdLineInputError()
430 current_keyword = implicit_first_keyword
431 new_keyword(current_keyword)
432
433 for arg in arg_list:
434 if arg in keyword_set:
435 current_keyword = arg
436 new_keyword(current_keyword)
437 else:
438 args_by_keywords[current_keyword][-1].append(arg)
439
440 return ArgsByKeywords(args_by_keywords)
441
442
443 def parse_typed_arg(
444 arg: str, allowed_types: StringSequence, default_type: str
445 ) -> tuple[str, str]:
446 """
447 Get (type, value) from a typed commandline argument.
448
449 Split the argument by the type separator and return the type and the value.
450 Raise CmdLineInputError in the argument format or type is not valid.
451 string arg -- commandline argument
452 Iterable allowed_types -- list of allowed argument types
453 string default_type -- type to return if the argument doesn't specify a type
454 """
455 if ARG_TYPE_DELIMITER not in arg:
456 return default_type, arg
457 arg_type, arg_value = arg.split(ARG_TYPE_DELIMITER, 1)
458 if not arg_type:
459 return default_type, arg_value
460 if arg_type not in allowed_types:
461 raise CmdLineInputError(
462 (
463 "'{arg_type}' is not an allowed type for '{arg_full}', use "
464 "{hint}"
465 ).format(
466 arg_type=arg_type,
467 arg_full=arg,
468 hint=", ".join(sorted(allowed_types)),
469 )
470 )
471 return arg_type, arg_value
472
473
474 def _is_num(arg: str) -> bool:
475 if arg.lower() == INFINITY.lower():
476 return True
477 try:
478 int(arg)
479 return True
480 except ValueError:
481 return False
482
483
484 def _is_float(arg: str) -> bool:
485 try:
486 float(arg)
487 return True
488 except ValueError:
489 return False
490
491
492 def _is_negative_num(arg: str) -> bool:
493 return arg.startswith("-") and (_is_num(arg[1:]) or _is_float(arg))
494
495
496 def is_short_option_expecting_value(arg: str) -> bool:
497 return len(arg) == 2 and arg[0] == "-" and f"{arg[1]}:" in PCS_SHORT_OPTIONS
498
499
500 def is_long_option_expecting_value(arg: str) -> bool:
501 return (
502 len(arg) > 2 and arg[0:2] == "--" and f"{arg[2:]}=" in PCS_LONG_OPTIONS
503 )
504
505
506 def is_option_expecting_value(arg: str) -> bool:
507 return is_short_option_expecting_value(
508 arg
509 ) or is_long_option_expecting_value(arg)
510
511
512 # DEPRECATED
513 # TODO remove
514 # This function is called only by deprecated code for parsing argv containing
515 # negative numbers without -- prepending them.
516 def filter_out_non_option_negative_numbers(arg_list: Argv) -> tuple[Argv, Argv]:
517 """
518 Return arg_list without non-option negative numbers.
519 Negative numbers following the option expecting value are kept.
520
521 There is the problematic legacy:
522 Argument "--" has special meaning: it can be used to signal that no more
523 options will follow. This would solve the problem with negative numbers in
524 a standard way: there would be no special approach to negative numbers,
525 everything would be left in the hands of users.
526
527 We cannot use "--" as it would be a backward incompatible change:
528 * "pcs ... -infinity" would not work any more, users would have to switch
529 to "pcs ... -- ... -infinity"
530 * previously, position of some --options mattered, for example
531 "--clone <clone options>", this syntax would not be possible with the "--"
532 in place
533
534 Currently used --options, which may be problematic when switching to "--":
535 * --group <group name>, --before | --after <resource id>
536 * pcs resource | stonith create, pcs resource group add, pcs tag update
537 * They have a single argument, so they would work even with --. But the
538 command may look weird:
539 pcs resource create --group G --after R2 -- R3 ocf:pacemaker:Dummy
540 vs. current command
541 pcs resource create R3 ocf:pacemaker:Dummy --group G --after R2
542
543 list arg_list contains command line arguments
544 """
545 args_without_negative_nums = []
546 args_filtered_out = []
547 for i, arg in enumerate(arg_list):
548 prev_arg = arg_list[i - 1] if i > 0 else ""
549 if not _is_negative_num(arg) or is_option_expecting_value(prev_arg):
550 args_without_negative_nums.append(arg)
551 else:
552 args_filtered_out.append(arg)
553
554 return args_without_negative_nums, args_filtered_out
555
556
557 # DEPRECATED
558 # TODO remove
559 # This function is called only by deprecated code for parsing argv containing
560 # negative numbers without -- prepending them.
561 def filter_out_options(arg_list: Argv) -> Argv:
562 """
563 Return arg_list without options and negative numbers
564
565 See a comment in filter_out_non_option_negative_numbers.
566
567 arg_list -- command line arguments
568 """
569 args_without_options = []
570 for i, arg in enumerate(arg_list):
571 prev_arg = arg_list[i - 1] if i > 0 else ""
572 if not is_option_expecting_value(prev_arg) and (
573 not arg.startswith("-") or arg == "-" or _is_negative_num(arg)
574 ):
575 args_without_options.append(arg)
576 return args_without_options
577
578
579 def wait_to_timeout(wait: bool | str | None) -> int:
580 if wait is False:
581 return -1
582 if wait is None:
583 return 0
584 timeout = timeout_to_seconds(wait)
585 if timeout is None:
586 raise CmdLineInputError(f"'{wait}' is not a valid interval value")
587 return timeout
588
589
590 class InputModifiers:
591 def __init__(self, options: Mapping[str, ModifierValueType]):
592 self._defined_options = set(options.keys())
593 self._options = dict(options)
594 self._options.update(
595 {opt: opt in options for opt in MODIFIER_OPTIONS_BOOL}
596 )
597 self._options.update(
598 {opt: options.get(opt, None) for opt in MODIFIER_OPTIONS_VAL}
599 )
600 self._options.update(
601 {
602 OUTPUT_FORMAT_OPTION: options.get(
603 OUTPUT_FORMAT_OPTION, OUTPUT_FORMAT_VALUE_TEXT
604 ),
605 "--wait": options.get("--wait", False),
606 }
607 )
608
609 def get_subset(
610 self, *options: str, **custom_options: ModifierValueType
611 ) -> "InputModifiers":
612 opt_dict = {
613 opt: self.get(opt) for opt in options if self.is_specified(opt)
614 }
615 opt_dict.update(custom_options)
616 return InputModifiers(opt_dict)
617
618 def ensure_only_supported(
619 self,
620 *supported_options: str,
621 hint_syntax_changed: str | None = None,
622 output_format_supported: bool = False,
623 ) -> None:
624 # --debug is supported in all commands
625 supported_options_set = set(supported_options) | {"--debug"}
626 if output_format_supported:
627 supported_options_set.add(OUTPUT_FORMAT_OPTION)
628 unsupported_options = self._defined_options - supported_options_set
629 if unsupported_options:
630 pluralize = partial(format_plural, unsupported_options)
631 raise CmdLineInputError(
632 "Specified {option} {option_list} {_is} not supported in this "
633 "command".format(
634 option=pluralize("option"),
635 option_list=format_list(sorted(unsupported_options)),
636 _is=pluralize("is"),
637 ),
638 hint=(
639 "Syntax has changed from previous version. {}".format(
640 SEE_MAN_CHANGES.format(hint_syntax_changed)
641 )
642 if hint_syntax_changed
643 else None
644 ),
645 )
646
647 def ensure_not_mutually_exclusive(self, *mutually_exclusive: str) -> None:
648 """
649 Raise CmdLineInputError if several exclusive options were specified
650
651 mutually_exclusive -- mutually exclusive options
652 """
653 options_to_report = self._defined_options & set(mutually_exclusive)
654 if len(options_to_report) > 1:
655 raise CmdLineInputError(
656 "Only one of {} can be used".format(
657 format_list(sorted(options_to_report))
658 )
659 )
660
661 def ensure_not_incompatible(
662 self, checked: str, incompatible: StringCollection
663 ) -> None:
664 """
665 Raise CmdLineInputError if both the checked and an incompatible option
666 were specified
667
668 checked -- option incompatible with any of incompatible options
669 incompatible -- set of options incompatible with checked
670 """
671 if checked not in self._defined_options:
672 return
673 disallowed = self._defined_options & set(incompatible)
674 if disallowed:
675 raise CmdLineInputError(
676 "'{}' cannot be used with {}".format(
677 checked, format_list(sorted(disallowed))
678 )
679 )
680
681 def ensure_dependency_satisfied(
682 self, main_option: str, dependent_options: StringCollection
683 ) -> None:
684 """
685 Raise CmdLineInputError if any of dependent_options is present and
686 main_option is not present.
687
688 main_option -- option on which dependent_options depend
689 dependent_options -- none of these options can be specified if
690 main_option is not specified
691 """
692 if main_option in self._defined_options:
693 return
694 disallowed = self._defined_options & set(dependent_options)
695 if disallowed:
696 raise CmdLineInputError(
697 "{} cannot be used without '{}'".format(
698 format_list(sorted(disallowed)), main_option
699 )
700 )
701
702 def is_specified(self, option: str) -> bool:
703 return option in self._defined_options
704
705 def is_specified_any(self, option_list: StringIterable) -> bool:
706 return any(self.is_specified(option) for option in option_list)
707
708 def get(
709 self, option: str, default: ModifierValueType = None
710 ) -> ModifierValueType:
711 if option in self._defined_options:
|
CID (unavailable; MK=489eaaa87afbb746a3d63c1bcfb03702) (#1 of 1): Copy-paste error (COPY_PASTE_ERROR): |
|
(2) Event copy_paste_error: |
"_options" in "self._options[option]" looks like a copy-paste error. |
|
(3) Event remediation: |
Should it say "_defined_options" instead? |
| Also see events: |
[original] |
712 return self._options[option]
713 if default is not None:
714 return default
715 if option in self._options:
716 return self._options[option]
717 raise AssertionError(f"Non existing default value for '{option}'")
718
719 def get_output_format(
720 self,
721 supported_formats: StringCollection = OUTPUT_FORMAT_VALUES,
722 ) -> str:
723 output_format = self.get(OUTPUT_FORMAT_OPTION)
724 if output_format in supported_formats:
725 return str(output_format)
726 raise CmdLineInputError(
727 (
728 "Unknown value '{value}' for '{option}' option. Supported "
729 "{value_pl} {is_pl}: {supported}"
730 ).format(
731 value=output_format,
732 option=OUTPUT_FORMAT_OPTION,
733 value_pl=format_plural(supported_formats, "value"),
734 is_pl=format_plural(supported_formats, "is"),
735 supported=format_list(list(supported_formats)),
736 )
737 )
738
739
740 def get_rule_str(argv: Argv) -> str | None:
741 if argv:
742 if len(argv) > 1:
743 # deprecated after 0.11.7
744 deprecation_warning(
745 "Specifying a rule as multiple arguments is deprecated and "
746 "might be removed in a future release, specify the rule as "
747 "a single string instead"
748 )
749 return " ".join(argv)
750 return argv[0]
751 return None
752