1 from collections import Counter
2 from functools import partial
3 from typing import (
4 Final,
5 Mapping,
6 Optional,
7 Union,
8 )
9
10 from pcs.cli.common.errors import (
11 SEE_MAN_CHANGES,
12 CmdLineInputError,
13 )
14 from pcs.cli.reports.output import deprecation_warning
15 from pcs.common.const import INFINITY
16 from pcs.common.str_tools import (
17 format_list,
18 format_list_custom_last_separator,
19 format_plural,
20 )
21 from pcs.common.tools import timeout_to_seconds
22 from pcs.common.types import (
23 StringCollection,
24 StringIterable,
25 StringSequence,
26 )
27
28 # sys.argv always returns a list, we don't need StringSequence in here
29 Argv = list[str]
30 ModifierValueType = Union[None, bool, str]
31
32 _FUTURE_OPTION_STR: Final = "future"
33 FUTURE_OPTION: Final = f"--{_FUTURE_OPTION_STR}"
34 _OUTPUT_FORMAT_OPTION_STR: Final = "output-format"
35 OUTPUT_FORMAT_OPTION: Final = f"--{_OUTPUT_FORMAT_OPTION_STR}"
36 OUTPUT_FORMAT_VALUE_CMD: Final = "cmd"
37 OUTPUT_FORMAT_VALUE_JSON: Final = "json"
38 OUTPUT_FORMAT_VALUE_TEXT: Final = "text"
39 OUTPUT_FORMAT_VALUES: Final = frozenset(
40 (
41 OUTPUT_FORMAT_VALUE_CMD,
42 OUTPUT_FORMAT_VALUE_JSON,
43 OUTPUT_FORMAT_VALUE_TEXT,
44 )
45 )
46
47 ARG_TYPE_DELIMITER: Final = "%"
48
49 # h = help, f = file,
50 # p = password (cluster auth), u = user (cluster auth),
51 PCS_SHORT_OPTIONS: Final = "hf:p:u:"
52 PCS_LONG_OPTIONS: Final = [
53 "debug",
54 "version",
55 "help",
56 "fullhelp",
57 "force",
58 "skip-offline",
59 "autodelete",
60 "simulate",
61 "all",
62 "full",
63 "local",
64 "wait",
65 "config",
66 "start",
67 "enable",
68 "disabled",
69 "off",
70 "request-timeout=",
71 "brief",
72 _FUTURE_OPTION_STR,
73 # resource (safe-)disable
74 "safe",
75 "no-strict",
76 # resource cleanup | refresh
77 "strict",
78 "pacemaker",
79 "corosync",
80 "no-default-ops",
81 "defaults",
82 "nodesc",
83 "promoted",
84 "name=",
85 "group=",
86 "node=",
87 "from=",
88 "to=",
89 "after=",
90 "before=",
91 "corosync_conf=",
92 "booth-conf=",
93 "booth-key=",
94 # do not stop resources in resource delete command
95 "no-stop",
96 "no-watchdog-validation",
97 # pcs cluster setup
98 "no-cluster-uuid",
99 "no-keys-sync",
100 # in pcs status - do not display resource status on inactive node
101 "hide-inactive",
102 # pcs resource (un)manage - enable or disable monitor operations
103 "monitor",
104 # TODO remove
105 # used only in deprecated 'pcs resource|stonith show'
106 "groups",
107 # "pcs resource clear --expired" - only clear expired moves and bans
108 "expired",
109 # disable evaluating whether rules are expired
110 "no-expire-check",
111 # allow overwriting existing files, currently meant for / used in CLI only
112 "overwrite",
113 # output format of commands, e.g: json, cmd, text, ...
114 f"{_OUTPUT_FORMAT_OPTION_STR}=",
115 # auth token
116 "token=",
117 # enable agent self validation
118 "agent-validation",
119 # disable text output in query commands
120 "quiet",
121 # proceed with dangerous actions, meant for / used in CLI only
122 "yes",
123 ]
124
125
126 def split_list(arg_list: Argv, separator: str) -> list[Argv]:
127 """
128 split a list of arguments to several lists using separator as a delimiter
129
130 arg_list -- list of command line arguments to split
131 separator -- delimiter
132 """
133 separator_indexes = [i for i, x in enumerate(arg_list) if x == separator]
134 bounds = zip(
135 [0] + [i + 1 for i in separator_indexes],
136 separator_indexes + [None],
137 strict=False,
138 )
139 return [arg_list[i:j] for i, j in bounds]
140
141
142 def split_list_by_any_keywords(
143 arg_list: Argv, keyword_label: str
144 ) -> dict[str, Argv]:
145 """
146 split a list of arguments using any argument not containing = as a delimiter
147
148 arg_list -- list of command line arguments to split
149 keyword_label -- description of all keywords
150 """
151 groups: dict[str, Argv] = {}
152 if not arg_list:
153 return groups
154
155 if "=" in arg_list[0]:
156 raise CmdLineInputError(
157 f"Invalid character '=' in {keyword_label} '{arg_list[0]}'"
158 )
159
160 current_keyword = arg_list[0]
161 groups[current_keyword] = []
162 for arg in arg_list[1:]:
163 if "=" in arg:
164 groups[current_keyword].append(arg)
165 else:
166 current_keyword = arg
167 if current_keyword in groups:
168 raise CmdLineInputError(
169 "{} '{}' defined multiple times".format(
170 keyword_label.capitalize(), current_keyword
171 )
172 )
173 groups[current_keyword] = []
174 return groups
175
176
177 def split_option(arg: str, allow_empty_value: bool = True) -> tuple[str, str]:
178 """
179 Get (key, value) from a key=value commandline argument.
180
181 Split the argument by the first = and return resulting parts. Raise
182 CmdLineInputError if the argument cannot be split.
183
184 arg -- commandline argument to split
185 allow_empty_value -- if False, raise CmdLineInputError on empty value
186 """
187 if "=" not in arg:
188 raise CmdLineInputError(f"missing value of '{arg}' option")
189 if arg.startswith("="):
190 raise CmdLineInputError(f"missing key in '{arg}' option")
191 key, value = arg.split("=", 1)
192 if not (value or allow_empty_value):
193 raise CmdLineInputError(f"value of '{key}' option is empty")
194 return key, value
195
196
197 def ensure_unique_args(cmdline_args: Argv) -> None:
198 """
199 Raises in case there are duplicate args
200 """
201 duplicities = [
202 item for item, count in Counter(cmdline_args).items() if count > 1
203 ]
204 if duplicities:
205 argument_pl = format_plural(duplicities, "argument")
206 duplicities_list = format_list(duplicities)
207 raise CmdLineInputError(f"duplicate {argument_pl}: {duplicities_list}")
208
209
210 class KeyValueParser:
211 """
212 Parse and check key=value options
213 """
214
215 def __init__(self, arg_list: Argv, repeatable: StringCollection = ()):
216 """
217 arg_list -- commandline arguments to be parsed
218 repeatable -- keys that are allowed to be specified several times
219 """
220 self._repeatable_keys = repeatable
221 self._key_value_map: dict[str, list[str]] = {}
222 for arg in arg_list:
223 name, value = split_option(arg)
224 if name not in self._key_value_map:
225 self._key_value_map[name] = [value]
226 else:
227 self._key_value_map[name].append(value)
228
229 def check_allowed_keys(self, allowed_keys: StringCollection) -> None:
230 """
231 Check that only allowed keys were specified
232
233 allowed_keys -- list of allowed keys
234 """
235 unknown_options = set(self._key_value_map.keys()) - set(allowed_keys)
236 if unknown_options:
237 raise CmdLineInputError(
238 "Unknown option{s} '{options}'".format(
239 s=("s" if len(unknown_options) > 1 else ""),
240 options="', '".join(sorted(unknown_options)),
241 )
242 )
243
244 def get_unique(self) -> dict[str, str]:
245 """
246 Get all non-repeatable keys and their values; raise if a key has more values
247 """
248 result: dict[str, str] = {}
249 for key, values in self._key_value_map.items():
250 if key in self._repeatable_keys:
251 continue
252 values_uniq = set(values)
253 if len(values_uniq) > 1:
254 raise CmdLineInputError(
255 f"duplicate option '{key}' with different values "
256 f"{format_list_custom_last_separator(values_uniq, ' and ')}"
257 )
258 result[key] = values[0]
259 return result
260
261 def get_repeatable(self) -> dict[str, list[str]]:
262 """
263 Get all repeatable keys and their values
264 """
265 return {
266 key: self._key_value_map[key]
267 for key in self._repeatable_keys
268 if key in self._key_value_map
269 }
270
271
272 class ArgsByKeywords:
273 def __init__(self, groups: Mapping[str, list[Argv]]):
274 self._groups = groups
275 self._flat_cache: dict[str, Argv] = {}
276
277 def allow_repetition_only_for(self, keyword_set: StringCollection) -> None:
278 """
279 Raise CmdLineInputError if a keyword has been repeated when not allowed
280
281 keyword_set -- repetition is allowed for these keywords
282 """
283 for keyword, arg_groups in self._groups.items():
284 if len(arg_groups) > 1 and keyword not in keyword_set:
285 raise CmdLineInputError(
286 f"'{keyword}' cannot be used more than once"
287 )
288
289 def ensure_unique_keywords(self) -> None:
290 """
291 Raise CmdLineInputError if any keyword has been repeated
292 """
293 return self.allow_repetition_only_for(set())
294
295 def is_empty(self) -> bool:
296 """
297 Check if any args have been specified
298 """
299 return not self._groups
300
301 def has_keyword(self, keyword: str) -> bool:
302 """
303 Check if a keyword has been specified
304
305 keyword -- a keyword to check
306 """
307 return keyword in self._groups
308
309 def has_empty_keyword(self, keyword: str) -> bool:
310 """
311 Check if a keyword has been specified without any following args
312
313 keyword -- a keyword to check
314 """
315 return self.has_keyword(keyword) and not self.get_args_flat(keyword)
316
317 def get_args_flat(self, keyword: str) -> Argv:
318 """
319 Get arguments of a keyword in one sequence
320 """
321 if keyword in self._groups:
322 if keyword not in self._flat_cache:
323 self._flat_cache[keyword] = [
324 arg
325 for one_group in self._groups[keyword]
326 for arg in one_group
327 ]
328 return self._flat_cache[keyword]
329 return []
330
331 def get_args_groups(self, keyword: str) -> list[Argv]:
332 """
333 Get arguments of a keyword, one group for each keyword occurrence
334 """
335 if keyword in self._groups:
336 return self._groups[keyword]
337 return []
338
339
340 def group_by_keywords(
341 arg_list: Argv,
342 keyword_set: StringCollection,
343 implicit_first_keyword: Optional[str] = None,
344 ) -> ArgsByKeywords:
345 """
346 Separate argv into groups delimited by specified keywords
347
348 arg_list -- commandline arguments containing keywords
349 keyword_set -- all expected keywords
350 implicit_first_keyword -- key for capturing args before the first keyword
351 """
352 args_by_keywords: dict[str, list[Argv]] = {}
353
354 def new_keyword(keyword: str) -> None:
355 if keyword not in args_by_keywords:
356 args_by_keywords[keyword] = []
357 args_by_keywords[keyword].append([])
358
359 if arg_list:
360 if arg_list[0] not in keyword_set:
361 if not implicit_first_keyword:
362 raise CmdLineInputError()
363 current_keyword = implicit_first_keyword
364 new_keyword(current_keyword)
365
366 for arg in arg_list:
367 if arg in keyword_set:
368 current_keyword = arg
369 new_keyword(current_keyword)
370 else:
371 args_by_keywords[current_keyword][-1].append(arg)
372
373 return ArgsByKeywords(args_by_keywords)
374
375
376 def parse_typed_arg(
377 arg: str, allowed_types: StringSequence, default_type: str
378 ) -> tuple[str, str]:
379 """
380 Get (type, value) from a typed commandline argument.
381
382 Split the argument by the type separator and return the type and the value.
383 Raise CmdLineInputError in the argument format or type is not valid.
384 string arg -- commandline argument
385 Iterable allowed_types -- list of allowed argument types
386 string default_type -- type to return if the argument doesn't specify a type
387 """
388 if ARG_TYPE_DELIMITER not in arg:
389 return default_type, arg
390 arg_type, arg_value = arg.split(ARG_TYPE_DELIMITER, 1)
391 if not arg_type:
392 return default_type, arg_value
393 if arg_type not in allowed_types:
394 raise CmdLineInputError(
395 (
396 "'{arg_type}' is not an allowed type for '{arg_full}', use "
397 "{hint}"
398 ).format(
399 arg_type=arg_type,
400 arg_full=arg,
401 hint=", ".join(sorted(allowed_types)),
402 )
403 )
404 return arg_type, arg_value
405
406
407 def _is_num(arg: str) -> bool:
408 if arg.lower() == INFINITY.lower():
409 return True
410 try:
411 int(arg)
412 return True
413 except ValueError:
414 return False
415
416
417 def _is_float(arg: str) -> bool:
418 try:
419 float(arg)
420 return True
421 except ValueError:
422 return False
423
424
425 def _is_negative_num(arg: str) -> bool:
426 return arg.startswith("-") and (_is_num(arg[1:]) or _is_float(arg))
427
428
429 def is_short_option_expecting_value(arg: str) -> bool:
430 return len(arg) == 2 and arg[0] == "-" and f"{arg[1]}:" in PCS_SHORT_OPTIONS
431
432
433 def is_long_option_expecting_value(arg: str) -> bool:
434 return (
435 len(arg) > 2 and arg[0:2] == "--" and f"{arg[2:]}=" in PCS_LONG_OPTIONS
436 )
437
438
439 def is_option_expecting_value(arg: str) -> bool:
440 return is_short_option_expecting_value(
441 arg
442 ) or is_long_option_expecting_value(arg)
443
444
445 # DEPRECATED
446 # TODO remove
447 # This function is called only by deprecated code for parsing argv containing
448 # negative numbers without -- prepending them.
449 def filter_out_non_option_negative_numbers(arg_list: Argv) -> tuple[Argv, Argv]:
450 """
451 Return arg_list without non-option negative numbers.
452 Negative numbers following the option expecting value are kept.
453
454 There is the problematic legacy:
455 Argument "--" has special meaning: it can be used to signal that no more
456 options will follow. This would solve the problem with negative numbers in
457 a standard way: there would be no special approach to negative numbers,
458 everything would be left in the hands of users.
459
460 We cannot use "--" as it would be a backward incompatible change:
461 * "pcs ... -infinity" would not work any more, users would have to switch
462 to "pcs ... -- ... -infinity"
463 * previously, position of some --options mattered, for example
464 "--clone <clone options>", this syntax would not be possible with the "--"
465 in place
466
467 Currently used --options, which may be problematic when switching to "--":
468 * --group <group name>, --before | --after <resource id>
469 * pcs resource | stonith create, pcs resource group add, pcs tag update
470 * They have a single argument, so they would work even with --. But the
471 command may look weird:
472 pcs resource create --group G --after R2 -- R3 ocf:pacemaker:Dummy
473 vs. current command
474 pcs resource create R3 ocf:pacemaker:Dummy --group G --after R2
475
476 list arg_list contains command line arguments
477 """
478 args_without_negative_nums = []
479 args_filtered_out = []
480 for i, arg in enumerate(arg_list):
481 prev_arg = arg_list[i - 1] if i > 0 else ""
482 if not _is_negative_num(arg) or is_option_expecting_value(prev_arg):
483 args_without_negative_nums.append(arg)
484 else:
485 args_filtered_out.append(arg)
486
487 return args_without_negative_nums, args_filtered_out
488
489
490 # DEPRECATED
491 # TODO remove
492 # This function is called only by deprecated code for parsing argv containing
493 # negative numbers without -- prepending them.
494 def filter_out_options(arg_list: Argv) -> Argv:
495 """
496 Return arg_list without options and negative numbers
497
498 See a comment in filter_out_non_option_negative_numbers.
499
500 arg_list -- command line arguments
501 """
502 args_without_options = []
503 for i, arg in enumerate(arg_list):
504 prev_arg = arg_list[i - 1] if i > 0 else ""
505 if not is_option_expecting_value(prev_arg) and (
506 not arg.startswith("-") or arg == "-" or _is_negative_num(arg)
507 ):
508 args_without_options.append(arg)
509 return args_without_options
510
511
512 def wait_to_timeout(wait: Union[bool, str, None]) -> int:
513 if wait is False:
514 return -1
515 if wait is None:
516 return 0
517 timeout = timeout_to_seconds(wait)
518 if timeout is None:
519 raise CmdLineInputError(f"'{wait}' is not a valid interval value")
520 return timeout
521
522
523 class InputModifiers:
524 def __init__(self, options: Mapping[str, ModifierValueType]):
525 self._defined_options = set(options.keys())
526 self._options = dict(options)
527 self._options.update(
528 {
529 # boolean values
530 "--all": "--all" in options,
531 "--agent-validation": "--agent-validation" in options,
532 # TODO remove
533 # used only in acl commands and it is deprecated there
534 "--autodelete": "--autodelete" in options,
535 "--brief": "--brief" in options,
536 "--config": "--config" in options,
537 "--corosync": "--corosync" in options,
538 "--debug": "--debug" in options,
539 "--defaults": "--defaults" in options,
540 "--disabled": "--disabled" in options,
541 "--enable": "--enable" in options,
542 "--expired": "--expired" in options,
543 "--force": "--force" in options,
544 "--full": "--full" in options,
545 "--quiet": "--quiet" in options,
546 FUTURE_OPTION: FUTURE_OPTION in options,
547 # TODO remove
548 # used only in deprecated 'pcs resource|stonith show'
549 "--groups": "--groups" in options,
550 "--hide-inactive": "--hide-inactive" in options,
551 "--local": "--local" in options,
552 "--monitor": "--monitor" in options,
553 "--no-default-ops": "--no-default-ops" in options,
554 "--nodesc": "--nodesc" in options,
555 "--no-expire-check": "--no-expire-check" in options,
556 "--no-cluster-uuid": "--no-cluster-uuid" in options,
557 "--no-keys-sync": "--no-keys-sync" in options,
558 "--no-stop": "--no-stop" in options,
559 "--no-strict": "--no-strict" in options,
560 "--no-watchdog-validation": (
561 "--no-watchdog-validation" in options
562 ),
563 "--off": "--off" in options,
564 "--overwrite": "--overwrite" in options,
565 "--pacemaker": "--pacemaker" in options,
566 "--promoted": "--promoted" in options,
567 "--safe": "--safe" in options,
568 "--simulate": "--simulate" in options,
569 "--skip-offline": "--skip-offline" in options,
570 "--start": "--start" in options,
571 "--strict": "--strict" in options,
572 "--yes": "--yes" in options,
573 # string values
574 "--after": options.get("--after", None),
575 "--before": options.get("--before", None),
576 "--booth-conf": options.get("--booth-conf", None),
577 "--booth-key": options.get("--booth-key", None),
578 "--corosync_conf": options.get("--corosync_conf", None),
579 "--from": options.get("--from", None),
580 # TODO remove
581 # used in resource create and stonith create, deprecated in both
582 "--group": options.get("--group", None),
583 "--name": options.get("--name", None),
584 "--node": options.get("--node", None),
585 OUTPUT_FORMAT_OPTION: options.get(
586 OUTPUT_FORMAT_OPTION, OUTPUT_FORMAT_VALUE_TEXT
587 ),
588 "--request-timeout": options.get("--request-timeout", None),
589 "--to": options.get("--to", None),
590 "--token": options.get("--token", None),
591 "--wait": options.get("--wait", False),
592 "-f": options.get("-f", None),
593 "-p": options.get("-p", None),
594 "-u": options.get("-u", None),
595 }
596 )
597
598 def get_subset(
599 self, *options: str, **custom_options: ModifierValueType
600 ) -> "InputModifiers":
601 opt_dict = {
602 opt: self.get(opt) for opt in options if self.is_specified(opt)
603 }
604 opt_dict.update(custom_options)
605 return InputModifiers(opt_dict)
606
607 def ensure_only_supported(
608 self,
609 *supported_options: str,
610 hint_syntax_changed: Optional[str] = None,
611 output_format_supported: bool = False,
612 ) -> None:
613 # --debug is supported in all commands
614 supported_options_set = set(supported_options) | {"--debug"}
615 if output_format_supported:
616 supported_options_set.add(OUTPUT_FORMAT_OPTION)
617 unsupported_options = self._defined_options - supported_options_set
618 if unsupported_options:
619 pluralize = partial(format_plural, unsupported_options)
620 raise CmdLineInputError(
621 "Specified {option} {option_list} {_is} not supported in this "
622 "command".format(
623 option=pluralize("option"),
624 option_list=format_list(sorted(unsupported_options)),
625 _is=pluralize("is"),
626 ),
627 hint=(
628 "Syntax has changed from previous version. {}".format(
629 SEE_MAN_CHANGES.format(hint_syntax_changed)
630 )
631 if hint_syntax_changed
632 else None
633 ),
634 )
635
636 def ensure_not_mutually_exclusive(self, *mutually_exclusive: str) -> None:
637 """
638 Raise CmdLineInputError if several exclusive options were specified
639
640 mutually_exclusive -- mutually exclusive options
641 """
642 options_to_report = self._defined_options & set(mutually_exclusive)
643 if len(options_to_report) > 1:
644 raise CmdLineInputError(
645 "Only one of {} can be used".format(
646 format_list(sorted(options_to_report))
647 )
648 )
649
650 def ensure_not_incompatible(
651 self, checked: str, incompatible: StringCollection
652 ) -> None:
653 """
654 Raise CmdLineInputError if both the checked and an incompatible option
655 were specified
656
657 checked -- option incompatible with any of incompatible options
658 incompatible -- set of options incompatible with checked
659 """
660 if checked not in self._defined_options:
661 return
662 disallowed = self._defined_options & set(incompatible)
663 if disallowed:
664 raise CmdLineInputError(
665 "'{}' cannot be used with {}".format(
666 checked, format_list(sorted(disallowed))
667 )
668 )
669
670 def ensure_dependency_satisfied(
671 self, main_option: str, dependent_options: StringCollection
672 ) -> None:
673 """
674 Raise CmdLineInputError if any of dependent_options is present and
675 main_option is not present.
676
677 main_option -- option on which dependent_options depend
678 dependent_options -- none of these options can be specified if
679 main_option is not specified
680 """
681 if main_option in self._defined_options:
682 return
683 disallowed = self._defined_options & set(dependent_options)
684 if disallowed:
685 raise CmdLineInputError(
686 "{} cannot be used without '{}'".format(
687 format_list(sorted(disallowed)), main_option
688 )
689 )
690
691 def is_specified(self, option: str) -> bool:
692 return option in self._defined_options
693
694 def is_specified_any(self, option_list: StringIterable) -> bool:
695 return any(self.is_specified(option) for option in option_list)
696
697 def get(
698 self, option: str, default: ModifierValueType = None
699 ) -> ModifierValueType:
700 if option in self._defined_options:
|
(2) Event copy_paste_error: |
"_options" in "self._options[option]" looks like a copy-paste error. |
|
(3) Event remediation: |
Should it say "_defined_options" instead? |
| Also see events: |
[original] |
701 return self._options[option]
702 if default is not None:
703 return default
704 if option in self._options:
705 return self._options[option]
706 raise AssertionError(f"Non existing default value for '{option}'")
707
708 def get_output_format(
709 self,
710 supported_formats: StringCollection = OUTPUT_FORMAT_VALUES,
711 ) -> str:
712 output_format = self.get(OUTPUT_FORMAT_OPTION)
713 if output_format in supported_formats:
714 return str(output_format)
715 raise CmdLineInputError(
716 (
717 "Unknown value '{value}' for '{option}' option. Supported "
718 "{value_pl} {is_pl}: {supported}"
719 ).format(
720 value=output_format,
721 option=OUTPUT_FORMAT_OPTION,
722 value_pl=format_plural(supported_formats, "value"),
723 is_pl=format_plural(supported_formats, "is"),
724 supported=format_list(list(supported_formats)),
725 )
726 )
727
728
729 def get_rule_str(argv: Argv) -> Optional[str]:
730 if argv:
731 if len(argv) > 1:
732 # deprecated after 0.11.7
733 deprecation_warning(
734 "Specifying a rule as multiple arguments is deprecated and "
735 "might be removed in a future release, specify the rule as "
736 "a single string instead"
737 )
738 return " ".join(argv)
739 return argv[0]
740 return None
741