1 from collections import Counter
2 from functools import partial
3 from typing import (
4 Final,
5 Mapping,
6 Optional,
7 Union,
8 )
9
10 from pcs.cli.common.errors import (
11 SEE_MAN_CHANGES,
12 CmdLineInputError,
13 )
14 from pcs.cli.reports.output import deprecation_warning
15 from pcs.common.const import INFINITY
16 from pcs.common.str_tools import (
17 format_list,
18 format_list_custom_last_separator,
19 format_plural,
20 )
21 from pcs.common.tools import timeout_to_seconds
22 from pcs.common.types import (
23 StringCollection,
24 StringIterable,
25 StringSequence,
26 )
27
28 # sys.argv always returns a list, we don't need StringSequence in here
29 Argv = list[str]
30 ModifierValueType = Union[None, bool, str]
31
32 _FUTURE_OPTION_STR: Final = "future"
33 FUTURE_OPTION: Final = f"--{_FUTURE_OPTION_STR}"
34 _OUTPUT_FORMAT_OPTION_STR: Final = "output-format"
35 OUTPUT_FORMAT_OPTION: Final = f"--{_OUTPUT_FORMAT_OPTION_STR}"
36 OUTPUT_FORMAT_VALUE_CMD: Final = "cmd"
37 OUTPUT_FORMAT_VALUE_JSON: Final = "json"
38 OUTPUT_FORMAT_VALUE_TEXT: Final = "text"
39 OUTPUT_FORMAT_VALUES: Final = frozenset(
40 (
41 OUTPUT_FORMAT_VALUE_CMD,
42 OUTPUT_FORMAT_VALUE_JSON,
43 OUTPUT_FORMAT_VALUE_TEXT,
44 )
45 )
46
47 ARG_TYPE_DELIMITER: Final = "%"
48
49 # h = help, f = file,
50 # p = password (cluster auth), u = user (cluster auth),
51 PCS_SHORT_OPTIONS: Final = "hf:p:u:"
52 PCS_LONG_OPTIONS: Final = [
53 "debug",
54 "version",
55 "help",
56 "fullhelp",
57 "force",
58 "skip-offline",
59 "autodelete",
60 "simulate",
61 "all",
62 "full",
63 "local",
64 "wait",
65 "config",
66 "start",
67 "enable",
68 "disabled",
69 "off",
70 "request-timeout=",
71 "brief",
72 _FUTURE_OPTION_STR,
73 # resource (safe-)disable
74 "safe",
75 "no-strict",
76 # resource cleanup | refresh
77 "strict",
78 "pacemaker",
79 "corosync",
80 "no-default-ops",
81 "defaults",
82 "nodesc",
83 "promoted",
84 "name=",
85 "group=",
86 "node=",
87 "from=",
88 "to=",
89 "after=",
90 "before=",
91 "corosync_conf=",
92 "booth-conf=",
93 "booth-key=",
94 # do not stop resources in resource delete command
95 "no-stop",
96 "no-watchdog-validation",
97 # pcs cluster setup
98 "no-cluster-uuid",
99 "no-keys-sync",
100 # in pcs status - do not display resource status on inactive node
101 "hide-inactive",
102 # pcs resource (un)manage - enable or disable monitor operations
103 "monitor",
104 # TODO remove
105 # used only in deprecated 'pcs resource|stonith show'
106 "groups",
107 # "pcs resource clear --expired" - only clear expired moves and bans
108 "expired",
109 # disable evaluating whether rules are expired
110 "no-expire-check",
111 # allow overwriting existing files, currently meant for / used in CLI only
112 "overwrite",
113 # output format of commands, e.g: json, cmd, text, ...
114 f"{_OUTPUT_FORMAT_OPTION_STR}=",
115 # auth token
116 "token=",
117 # enable agent self validation
118 "agent-validation",
119 # disable text output in query commands
120 "quiet",
121 # proceed with dangerous actions, meant for / used in CLI only
122 "yes",
123 # retrieve and display cibsecret values / used in CLI only
124 "show-secrets",
125 ]
126
127
128 def split_list(arg_list: Argv, separator: str) -> list[Argv]:
129 """
130 split a list of arguments to several lists using separator as a delimiter
131
132 arg_list -- list of command line arguments to split
133 separator -- delimiter
134 """
135 separator_indexes = [i for i, x in enumerate(arg_list) if x == separator]
136 bounds = zip(
137 [0] + [i + 1 for i in separator_indexes],
138 separator_indexes + [None],
139 strict=False,
140 )
141 return [arg_list[i:j] for i, j in bounds]
142
143
144 def split_list_by_any_keywords(
145 arg_list: Argv, keyword_label: str
146 ) -> dict[str, Argv]:
147 """
148 split a list of arguments using any argument not containing = as a delimiter
149
150 arg_list -- list of command line arguments to split
151 keyword_label -- description of all keywords
152 """
153 groups: dict[str, Argv] = {}
154 if not arg_list:
155 return groups
156
157 if "=" in arg_list[0]:
158 raise CmdLineInputError(
159 f"Invalid character '=' in {keyword_label} '{arg_list[0]}'"
160 )
161
162 current_keyword = arg_list[0]
163 groups[current_keyword] = []
164 for arg in arg_list[1:]:
165 if "=" in arg:
166 groups[current_keyword].append(arg)
167 else:
168 current_keyword = arg
169 if current_keyword in groups:
170 raise CmdLineInputError(
171 "{} '{}' defined multiple times".format(
172 keyword_label.capitalize(), current_keyword
173 )
174 )
175 groups[current_keyword] = []
176 return groups
177
178
179 def split_option(arg: str, allow_empty_value: bool = True) -> tuple[str, str]:
180 """
181 Get (key, value) from a key=value commandline argument.
182
183 Split the argument by the first = and return resulting parts. Raise
184 CmdLineInputError if the argument cannot be split.
185
186 arg -- commandline argument to split
187 allow_empty_value -- if False, raise CmdLineInputError on empty value
188 """
189 if "=" not in arg:
190 raise CmdLineInputError(f"missing value of '{arg}' option")
191 if arg.startswith("="):
192 raise CmdLineInputError(f"missing key in '{arg}' option")
193 key, value = arg.split("=", 1)
194 if not (value or allow_empty_value):
195 raise CmdLineInputError(f"value of '{key}' option is empty")
196 return key, value
197
198
199 def ensure_unique_args(cmdline_args: Argv) -> None:
200 """
201 Raises in case there are duplicate args
202 """
203 duplicities = [
204 item for item, count in Counter(cmdline_args).items() if count > 1
205 ]
206 if duplicities:
207 argument_pl = format_plural(duplicities, "argument")
208 duplicities_list = format_list(duplicities)
209 raise CmdLineInputError(f"duplicate {argument_pl}: {duplicities_list}")
210
211
212 class KeyValueParser:
213 """
214 Parse and check key=value options
215 """
216
217 def __init__(self, arg_list: Argv, repeatable: StringCollection = ()):
218 """
219 arg_list -- commandline arguments to be parsed
220 repeatable -- keys that are allowed to be specified several times
221 """
222 self._repeatable_keys = repeatable
223 self._key_value_map: dict[str, list[str]] = {}
224 for arg in arg_list:
225 name, value = split_option(arg)
226 if name not in self._key_value_map:
227 self._key_value_map[name] = [value]
228 else:
229 self._key_value_map[name].append(value)
230
231 def check_allowed_keys(self, allowed_keys: StringCollection) -> None:
232 """
233 Check that only allowed keys were specified
234
235 allowed_keys -- list of allowed keys
236 """
237 unknown_options = set(self._key_value_map.keys()) - set(allowed_keys)
238 if unknown_options:
239 raise CmdLineInputError(
240 "Unknown option{s} '{options}'".format(
241 s=("s" if len(unknown_options) > 1 else ""),
242 options="', '".join(sorted(unknown_options)),
243 )
244 )
245
246 def get_unique(self) -> dict[str, str]:
247 """
248 Get all non-repeatable keys and their values; raise if a key has more values
249 """
250 result: dict[str, str] = {}
251 for key, values in self._key_value_map.items():
252 if key in self._repeatable_keys:
253 continue
254 values_uniq = set(values)
255 if len(values_uniq) > 1:
256 raise CmdLineInputError(
257 f"duplicate option '{key}' with different values "
258 f"{format_list_custom_last_separator(values_uniq, ' and ')}"
259 )
260 result[key] = values[0]
261 return result
262
263 def get_repeatable(self) -> dict[str, list[str]]:
264 """
265 Get all repeatable keys and their values
266 """
267 return {
268 key: self._key_value_map[key]
269 for key in self._repeatable_keys
270 if key in self._key_value_map
271 }
272
273
274 class ArgsByKeywords:
275 def __init__(self, groups: Mapping[str, list[Argv]]):
276 self._groups = groups
277 self._flat_cache: dict[str, Argv] = {}
278
279 def allow_repetition_only_for(self, keyword_set: StringCollection) -> None:
280 """
281 Raise CmdLineInputError if a keyword has been repeated when not allowed
282
283 keyword_set -- repetition is allowed for these keywords
284 """
285 for keyword, arg_groups in self._groups.items():
286 if len(arg_groups) > 1 and keyword not in keyword_set:
287 raise CmdLineInputError(
288 f"'{keyword}' cannot be used more than once"
289 )
290
291 def ensure_unique_keywords(self) -> None:
292 """
293 Raise CmdLineInputError if any keyword has been repeated
294 """
295 return self.allow_repetition_only_for(set())
296
297 def is_empty(self) -> bool:
298 """
299 Check if any args have been specified
300 """
301 return not self._groups
302
303 def has_keyword(self, keyword: str) -> bool:
304 """
305 Check if a keyword has been specified
306
307 keyword -- a keyword to check
308 """
309 return keyword in self._groups
310
311 def has_empty_keyword(self, keyword: str) -> bool:
312 """
313 Check if a keyword has been specified without any following args
314
315 keyword -- a keyword to check
316 """
317 return self.has_keyword(keyword) and not self.get_args_flat(keyword)
318
319 def get_args_flat(self, keyword: str) -> Argv:
320 """
321 Get arguments of a keyword in one sequence
322 """
323 if keyword in self._groups:
324 if keyword not in self._flat_cache:
325 self._flat_cache[keyword] = [
326 arg
327 for one_group in self._groups[keyword]
328 for arg in one_group
329 ]
330 return self._flat_cache[keyword]
331 return []
332
333 def get_args_groups(self, keyword: str) -> list[Argv]:
334 """
335 Get arguments of a keyword, one group for each keyword occurrence
336 """
337 if keyword in self._groups:
338 return self._groups[keyword]
339 return []
340
341
342 def group_by_keywords(
343 arg_list: Argv,
344 keyword_set: StringCollection,
345 implicit_first_keyword: Optional[str] = None,
346 ) -> ArgsByKeywords:
347 """
348 Separate argv into groups delimited by specified keywords
349
350 arg_list -- commandline arguments containing keywords
351 keyword_set -- all expected keywords
352 implicit_first_keyword -- key for capturing args before the first keyword
353 """
354 args_by_keywords: dict[str, list[Argv]] = {}
355
356 def new_keyword(keyword: str) -> None:
357 if keyword not in args_by_keywords:
358 args_by_keywords[keyword] = []
359 args_by_keywords[keyword].append([])
360
361 if arg_list:
362 if arg_list[0] not in keyword_set:
363 if not implicit_first_keyword:
364 raise CmdLineInputError()
365 current_keyword = implicit_first_keyword
366 new_keyword(current_keyword)
367
368 for arg in arg_list:
369 if arg in keyword_set:
370 current_keyword = arg
371 new_keyword(current_keyword)
372 else:
373 args_by_keywords[current_keyword][-1].append(arg)
374
375 return ArgsByKeywords(args_by_keywords)
376
377
378 def parse_typed_arg(
379 arg: str, allowed_types: StringSequence, default_type: str
380 ) -> tuple[str, str]:
381 """
382 Get (type, value) from a typed commandline argument.
383
384 Split the argument by the type separator and return the type and the value.
385 Raise CmdLineInputError in the argument format or type is not valid.
386 string arg -- commandline argument
387 Iterable allowed_types -- list of allowed argument types
388 string default_type -- type to return if the argument doesn't specify a type
389 """
390 if ARG_TYPE_DELIMITER not in arg:
391 return default_type, arg
392 arg_type, arg_value = arg.split(ARG_TYPE_DELIMITER, 1)
393 if not arg_type:
394 return default_type, arg_value
395 if arg_type not in allowed_types:
396 raise CmdLineInputError(
397 (
398 "'{arg_type}' is not an allowed type for '{arg_full}', use "
399 "{hint}"
400 ).format(
401 arg_type=arg_type,
402 arg_full=arg,
403 hint=", ".join(sorted(allowed_types)),
404 )
405 )
406 return arg_type, arg_value
407
408
409 def _is_num(arg: str) -> bool:
410 if arg.lower() == INFINITY.lower():
411 return True
412 try:
413 int(arg)
414 return True
415 except ValueError:
416 return False
417
418
419 def _is_float(arg: str) -> bool:
420 try:
421 float(arg)
422 return True
423 except ValueError:
424 return False
425
426
427 def _is_negative_num(arg: str) -> bool:
428 return arg.startswith("-") and (_is_num(arg[1:]) or _is_float(arg))
429
430
431 def is_short_option_expecting_value(arg: str) -> bool:
432 return len(arg) == 2 and arg[0] == "-" and f"{arg[1]}:" in PCS_SHORT_OPTIONS
433
434
435 def is_long_option_expecting_value(arg: str) -> bool:
436 return (
437 len(arg) > 2 and arg[0:2] == "--" and f"{arg[2:]}=" in PCS_LONG_OPTIONS
438 )
439
440
441 def is_option_expecting_value(arg: str) -> bool:
442 return is_short_option_expecting_value(
443 arg
444 ) or is_long_option_expecting_value(arg)
445
446
447 # DEPRECATED
448 # TODO remove
449 # This function is called only by deprecated code for parsing argv containing
450 # negative numbers without -- prepending them.
451 def filter_out_non_option_negative_numbers(arg_list: Argv) -> tuple[Argv, Argv]:
452 """
453 Return arg_list without non-option negative numbers.
454 Negative numbers following the option expecting value are kept.
455
456 There is the problematic legacy:
457 Argument "--" has special meaning: it can be used to signal that no more
458 options will follow. This would solve the problem with negative numbers in
459 a standard way: there would be no special approach to negative numbers,
460 everything would be left in the hands of users.
461
462 We cannot use "--" as it would be a backward incompatible change:
463 * "pcs ... -infinity" would not work any more, users would have to switch
464 to "pcs ... -- ... -infinity"
465 * previously, position of some --options mattered, for example
466 "--clone <clone options>", this syntax would not be possible with the "--"
467 in place
468
469 Currently used --options, which may be problematic when switching to "--":
470 * --group <group name>, --before | --after <resource id>
471 * pcs resource | stonith create, pcs resource group add, pcs tag update
472 * They have a single argument, so they would work even with --. But the
473 command may look weird:
474 pcs resource create --group G --after R2 -- R3 ocf:pacemaker:Dummy
475 vs. current command
476 pcs resource create R3 ocf:pacemaker:Dummy --group G --after R2
477
478 list arg_list contains command line arguments
479 """
480 args_without_negative_nums = []
481 args_filtered_out = []
482 for i, arg in enumerate(arg_list):
483 prev_arg = arg_list[i - 1] if i > 0 else ""
484 if not _is_negative_num(arg) or is_option_expecting_value(prev_arg):
485 args_without_negative_nums.append(arg)
486 else:
487 args_filtered_out.append(arg)
488
489 return args_without_negative_nums, args_filtered_out
490
491
492 # DEPRECATED
493 # TODO remove
494 # This function is called only by deprecated code for parsing argv containing
495 # negative numbers without -- prepending them.
496 def filter_out_options(arg_list: Argv) -> Argv:
497 """
498 Return arg_list without options and negative numbers
499
500 See a comment in filter_out_non_option_negative_numbers.
501
502 arg_list -- command line arguments
503 """
504 args_without_options = []
505 for i, arg in enumerate(arg_list):
506 prev_arg = arg_list[i - 1] if i > 0 else ""
507 if not is_option_expecting_value(prev_arg) and (
508 not arg.startswith("-") or arg == "-" or _is_negative_num(arg)
509 ):
510 args_without_options.append(arg)
511 return args_without_options
512
513
514 def wait_to_timeout(wait: Union[bool, str, None]) -> int:
515 if wait is False:
516 return -1
517 if wait is None:
518 return 0
519 timeout = timeout_to_seconds(wait)
520 if timeout is None:
521 raise CmdLineInputError(f"'{wait}' is not a valid interval value")
522 return timeout
523
524
525 class InputModifiers:
526 def __init__(self, options: Mapping[str, ModifierValueType]):
527 self._defined_options = set(options.keys())
528 self._options = dict(options)
529 self._options.update(
530 {
531 # boolean values
532 "--all": "--all" in options,
533 "--agent-validation": "--agent-validation" in options,
534 # TODO remove
535 # used only in acl commands and it is deprecated there
536 "--autodelete": "--autodelete" in options,
537 "--brief": "--brief" in options,
538 "--config": "--config" in options,
539 "--corosync": "--corosync" in options,
540 "--debug": "--debug" in options,
541 "--defaults": "--defaults" in options,
542 "--disabled": "--disabled" in options,
543 "--enable": "--enable" in options,
544 "--expired": "--expired" in options,
545 "--force": "--force" in options,
546 "--full": "--full" in options,
547 "--quiet": "--quiet" in options,
548 FUTURE_OPTION: FUTURE_OPTION in options,
549 # TODO remove
550 # used only in deprecated 'pcs resource|stonith show'
551 "--groups": "--groups" in options,
552 "--hide-inactive": "--hide-inactive" in options,
553 "--local": "--local" in options,
554 "--monitor": "--monitor" in options,
555 "--no-default-ops": "--no-default-ops" in options,
556 "--nodesc": "--nodesc" in options,
557 "--no-expire-check": "--no-expire-check" in options,
558 "--no-cluster-uuid": "--no-cluster-uuid" in options,
559 "--no-keys-sync": "--no-keys-sync" in options,
560 "--no-stop": "--no-stop" in options,
561 "--no-strict": "--no-strict" in options,
562 "--no-watchdog-validation": (
563 "--no-watchdog-validation" in options
564 ),
565 "--off": "--off" in options,
566 "--overwrite": "--overwrite" in options,
567 "--pacemaker": "--pacemaker" in options,
568 "--promoted": "--promoted" in options,
569 "--safe": "--safe" in options,
570 "--show-secrets": "--show-secretes" in options,
571 "--simulate": "--simulate" in options,
572 "--skip-offline": "--skip-offline" in options,
573 "--start": "--start" in options,
574 "--strict": "--strict" in options,
575 "--yes": "--yes" in options,
576 # string values
577 "--after": options.get("--after", None),
578 "--before": options.get("--before", None),
579 "--booth-conf": options.get("--booth-conf", None),
580 "--booth-key": options.get("--booth-key", None),
581 "--corosync_conf": options.get("--corosync_conf", None),
582 "--from": options.get("--from", None),
583 # TODO remove
584 # used in resource create and stonith create, deprecated in both
585 "--group": options.get("--group", None),
586 "--name": options.get("--name", None),
587 "--node": options.get("--node", None),
588 OUTPUT_FORMAT_OPTION: options.get(
589 OUTPUT_FORMAT_OPTION, OUTPUT_FORMAT_VALUE_TEXT
590 ),
591 "--request-timeout": options.get("--request-timeout", None),
592 "--to": options.get("--to", None),
593 "--token": options.get("--token", None),
594 "--wait": options.get("--wait", False),
595 "-f": options.get("-f", None),
596 "-p": options.get("-p", None),
597 "-u": options.get("-u", None),
598 }
599 )
600
601 def get_subset(
602 self, *options: str, **custom_options: ModifierValueType
603 ) -> "InputModifiers":
604 opt_dict = {
605 opt: self.get(opt) for opt in options if self.is_specified(opt)
606 }
607 opt_dict.update(custom_options)
608 return InputModifiers(opt_dict)
609
610 def ensure_only_supported(
611 self,
612 *supported_options: str,
613 hint_syntax_changed: Optional[str] = None,
614 output_format_supported: bool = False,
615 ) -> None:
616 # --debug is supported in all commands
617 supported_options_set = set(supported_options) | {"--debug"}
618 if output_format_supported:
619 supported_options_set.add(OUTPUT_FORMAT_OPTION)
620 unsupported_options = self._defined_options - supported_options_set
621 if unsupported_options:
622 pluralize = partial(format_plural, unsupported_options)
623 raise CmdLineInputError(
624 "Specified {option} {option_list} {_is} not supported in this "
625 "command".format(
626 option=pluralize("option"),
627 option_list=format_list(sorted(unsupported_options)),
628 _is=pluralize("is"),
629 ),
630 hint=(
631 "Syntax has changed from previous version. {}".format(
632 SEE_MAN_CHANGES.format(hint_syntax_changed)
633 )
634 if hint_syntax_changed
635 else None
636 ),
637 )
638
639 def ensure_not_mutually_exclusive(self, *mutually_exclusive: str) -> None:
640 """
641 Raise CmdLineInputError if several exclusive options were specified
642
643 mutually_exclusive -- mutually exclusive options
644 """
645 options_to_report = self._defined_options & set(mutually_exclusive)
646 if len(options_to_report) > 1:
647 raise CmdLineInputError(
648 "Only one of {} can be used".format(
649 format_list(sorted(options_to_report))
650 )
651 )
652
653 def ensure_not_incompatible(
654 self, checked: str, incompatible: StringCollection
655 ) -> None:
656 """
657 Raise CmdLineInputError if both the checked and an incompatible option
658 were specified
659
660 checked -- option incompatible with any of incompatible options
661 incompatible -- set of options incompatible with checked
662 """
663 if checked not in self._defined_options:
664 return
665 disallowed = self._defined_options & set(incompatible)
666 if disallowed:
667 raise CmdLineInputError(
668 "'{}' cannot be used with {}".format(
669 checked, format_list(sorted(disallowed))
670 )
671 )
672
673 def ensure_dependency_satisfied(
674 self, main_option: str, dependent_options: StringCollection
675 ) -> None:
676 """
677 Raise CmdLineInputError if any of dependent_options is present and
678 main_option is not present.
679
680 main_option -- option on which dependent_options depend
681 dependent_options -- none of these options can be specified if
682 main_option is not specified
683 """
684 if main_option in self._defined_options:
685 return
686 disallowed = self._defined_options & set(dependent_options)
687 if disallowed:
688 raise CmdLineInputError(
689 "{} cannot be used without '{}'".format(
690 format_list(sorted(disallowed)), main_option
691 )
692 )
693
694 def is_specified(self, option: str) -> bool:
695 return option in self._defined_options
696
697 def is_specified_any(self, option_list: StringIterable) -> bool:
698 return any(self.is_specified(option) for option in option_list)
699
700 def get(
701 self, option: str, default: ModifierValueType = None
702 ) -> ModifierValueType:
703 if option in self._defined_options:
|
(2) Event copy_paste_error: |
"_options" in "self._options[option]" looks like a copy-paste error. |
|
(3) Event remediation: |
Should it say "_defined_options" instead? |
| Also see events: |
[original] |
704 return self._options[option]
705 if default is not None:
706 return default
707 if option in self._options:
708 return self._options[option]
709 raise AssertionError(f"Non existing default value for '{option}'")
710
711 def get_output_format(
712 self,
713 supported_formats: StringCollection = OUTPUT_FORMAT_VALUES,
714 ) -> str:
715 output_format = self.get(OUTPUT_FORMAT_OPTION)
716 if output_format in supported_formats:
717 return str(output_format)
718 raise CmdLineInputError(
719 (
720 "Unknown value '{value}' for '{option}' option. Supported "
721 "{value_pl} {is_pl}: {supported}"
722 ).format(
723 value=output_format,
724 option=OUTPUT_FORMAT_OPTION,
725 value_pl=format_plural(supported_formats, "value"),
726 is_pl=format_plural(supported_formats, "is"),
727 supported=format_list(list(supported_formats)),
728 )
729 )
730
731
732 def get_rule_str(argv: Argv) -> Optional[str]:
733 if argv:
734 if len(argv) > 1:
735 # deprecated after 0.11.7
736 deprecation_warning(
737 "Specifying a rule as multiple arguments is deprecated and "
738 "might be removed in a future release, specify the rule as "
739 "a single string instead"
740 )
741 return " ".join(argv)
742 return argv[0]
743 return None
744