1 from collections import Counter
2 from functools import partial
3 from typing import (
4 Final,
5 Mapping,
6 Optional,
7 Union,
8 )
9
10 from pcs.cli.common.errors import (
11 SEE_MAN_CHANGES,
12 CmdLineInputError,
13 )
14 from pcs.cli.reports.output import deprecation_warning
15 from pcs.common.const import INFINITY
16 from pcs.common.str_tools import (
17 format_list,
18 format_list_custom_last_separator,
19 format_plural,
20 )
21 from pcs.common.tools import timeout_to_seconds
22 from pcs.common.types import (
23 StringCollection,
24 StringIterable,
25 StringSequence,
26 )
27
28 # sys.argv always returns a list, we don't need StringSequence in here
29 Argv = list[str]
30 ModifierValueType = Union[None, bool, str]
31
32 _FUTURE_OPTION_STR: Final = "future"
33 FUTURE_OPTION: Final = f"--{_FUTURE_OPTION_STR}"
34 _OUTPUT_FORMAT_OPTION_STR: Final = "output-format"
35 OUTPUT_FORMAT_OPTION: Final = f"--{_OUTPUT_FORMAT_OPTION_STR}"
36 OUTPUT_FORMAT_VALUE_CMD: Final = "cmd"
37 OUTPUT_FORMAT_VALUE_JSON: Final = "json"
38 OUTPUT_FORMAT_VALUE_TEXT: Final = "text"
39 OUTPUT_FORMAT_VALUES: Final = frozenset(
40 (
41 OUTPUT_FORMAT_VALUE_CMD,
42 OUTPUT_FORMAT_VALUE_JSON,
43 OUTPUT_FORMAT_VALUE_TEXT,
44 )
45 )
46
47 MODIFIER_OPTIONS_BOOL: Final[frozenset[str]] = frozenset(
48 (
49 "--all",
50 "--agent-validation",
51 # TODO remove
52 # used only in acl commands and it is deprecated there
53 "--autodelete",
54 "--brief",
55 "--config",
56 "--corosync",
57 "--debug",
58 "--defaults",
59 "--disabled",
60 "--enable",
61 "--expired",
62 "--force",
63 "--full",
64 "--quiet",
65 FUTURE_OPTION,
66 # TODO remove
67 # used only in deprecated 'pcs resource|stonith show'
68 "--groups",
69 "--hide-inactive",
70 "--local",
71 "--monitor",
72 "--no-default-ops",
73 "--nodesc",
74 "--no-expire-check",
75 "--no-cluster-uuid",
76 "--no-keys-sync",
77 "--no-stop",
78 "--no-strict",
79 "--no-watchdog-validation",
80 "--off",
81 "--overwrite",
82 "--pacemaker",
83 "--promoted",
84 "--safe",
85 "--show-secrets",
86 "--simulate",
87 "--skip-offline",
88 "--start",
89 "--strict",
90 "--yes",
91 )
92 )
93
94 MODIFIER_OPTIONS_VAL: Final[frozenset[str]] = frozenset(
95 (
96 "--after",
97 "--before",
98 "--booth-conf",
99 "--booth-key",
100 "--corosync_conf",
101 "--from",
102 # TODO remove
103 # used in resource create and stonith create, deprecated in both
104 "--group",
105 "--name",
106 "--node",
107 "--request-timeout",
108 "--to",
109 "--token",
110 "-f",
111 "-p",
112 "-u",
113 )
114 )
115
116 ARG_TYPE_DELIMITER: Final = "%"
117
118 # h = help, f = file,
119 # p = password (cluster auth), u = user (cluster auth),
120 PCS_SHORT_OPTIONS: Final = "hf:p:u:"
121 PCS_LONG_OPTIONS: Final = [
122 "debug",
123 "version",
124 "help",
125 "fullhelp",
126 "force",
127 "skip-offline",
128 "autodelete",
129 "simulate",
130 "all",
131 "full",
132 "local",
133 "wait",
134 "config",
135 "start",
136 "enable",
137 "disabled",
138 "off",
139 "request-timeout=",
140 "brief",
141 _FUTURE_OPTION_STR,
142 # resource (safe-)disable
143 "safe",
144 "no-strict",
145 # resource cleanup | refresh
146 "strict",
147 "pacemaker",
148 "corosync",
149 "no-default-ops",
150 "defaults",
151 "nodesc",
152 "promoted",
153 "name=",
154 "group=",
155 "node=",
156 "from=",
157 "to=",
158 "after=",
159 "before=",
160 "corosync_conf=",
161 "booth-conf=",
162 "booth-key=",
163 # do not stop resources in resource delete command
164 "no-stop",
165 "no-watchdog-validation",
166 # pcs cluster setup
167 "no-cluster-uuid",
168 "no-keys-sync",
169 # in pcs status - do not display resource status on inactive node
170 "hide-inactive",
171 # pcs resource (un)manage - enable or disable monitor operations
172 "monitor",
173 # TODO remove
174 # used only in deprecated 'pcs resource|stonith show'
175 "groups",
176 # "pcs resource clear --expired" - only clear expired moves and bans
177 "expired",
178 # disable evaluating whether rules are expired
179 "no-expire-check",
180 # allow overwriting existing files, currently meant for / used in CLI only
181 "overwrite",
182 # output format of commands, e.g: json, cmd, text, ...
183 f"{_OUTPUT_FORMAT_OPTION_STR}=",
184 # auth token
185 "token=",
186 # enable agent self validation
187 "agent-validation",
188 # disable text output in query commands
189 "quiet",
190 # proceed with dangerous actions, meant for / used in CLI only
191 "yes",
192 # retrieve and display cibsecret values / used in CLI only
193 "show-secrets",
194 ]
195
196
197 def split_list(arg_list: Argv, separator: str) -> list[Argv]:
198 """
199 split a list of arguments to several lists using separator as a delimiter
200
201 arg_list -- list of command line arguments to split
202 separator -- delimiter
203 """
204 separator_indexes = [i for i, x in enumerate(arg_list) if x == separator]
205 bounds = zip(
206 [0] + [i + 1 for i in separator_indexes],
207 separator_indexes + [None],
208 strict=False,
209 )
210 return [arg_list[i:j] for i, j in bounds]
211
212
213 def split_list_by_any_keywords(
214 arg_list: Argv, keyword_label: str
215 ) -> dict[str, Argv]:
216 """
217 split a list of arguments using any argument not containing = as a delimiter
218
219 arg_list -- list of command line arguments to split
220 keyword_label -- description of all keywords
221 """
222 groups: dict[str, Argv] = {}
223 if not arg_list:
224 return groups
225
226 if "=" in arg_list[0]:
227 raise CmdLineInputError(
228 f"Invalid character '=' in {keyword_label} '{arg_list[0]}'"
229 )
230
231 current_keyword = arg_list[0]
232 groups[current_keyword] = []
233 for arg in arg_list[1:]:
234 if "=" in arg:
235 groups[current_keyword].append(arg)
236 else:
237 current_keyword = arg
238 if current_keyword in groups:
239 raise CmdLineInputError(
240 "{} '{}' defined multiple times".format(
241 keyword_label.capitalize(), current_keyword
242 )
243 )
244 groups[current_keyword] = []
245 return groups
246
247
248 def split_option(arg: str, allow_empty_value: bool = True) -> tuple[str, str]:
249 """
250 Get (key, value) from a key=value commandline argument.
251
252 Split the argument by the first = and return resulting parts. Raise
253 CmdLineInputError if the argument cannot be split.
254
255 arg -- commandline argument to split
256 allow_empty_value -- if False, raise CmdLineInputError on empty value
257 """
258 if "=" not in arg:
259 raise CmdLineInputError(f"missing value of '{arg}' option")
260 if arg.startswith("="):
261 raise CmdLineInputError(f"missing key in '{arg}' option")
262 key, value = arg.split("=", 1)
263 if not (value or allow_empty_value):
264 raise CmdLineInputError(f"value of '{key}' option is empty")
265 return key, value
266
267
268 def ensure_unique_args(cmdline_args: Argv) -> None:
269 """
270 Raises in case there are duplicate args
271 """
272 duplicities = [
273 item for item, count in Counter(cmdline_args).items() if count > 1
274 ]
275 if duplicities:
276 argument_pl = format_plural(duplicities, "argument")
277 duplicities_list = format_list(duplicities)
278 raise CmdLineInputError(f"duplicate {argument_pl}: {duplicities_list}")
279
280
281 class KeyValueParser:
282 """
283 Parse and check key=value options
284 """
285
286 def __init__(self, arg_list: Argv, repeatable: StringCollection = ()):
287 """
288 arg_list -- commandline arguments to be parsed
289 repeatable -- keys that are allowed to be specified several times
290 """
291 self._repeatable_keys = repeatable
292 self._key_value_map: dict[str, list[str]] = {}
293 for arg in arg_list:
294 name, value = split_option(arg)
295 if name not in self._key_value_map:
296 self._key_value_map[name] = [value]
297 else:
298 self._key_value_map[name].append(value)
299
300 def check_allowed_keys(self, allowed_keys: StringCollection) -> None:
301 """
302 Check that only allowed keys were specified
303
304 allowed_keys -- list of allowed keys
305 """
306 unknown_options = set(self._key_value_map.keys()) - set(allowed_keys)
307 if unknown_options:
308 raise CmdLineInputError(
309 "Unknown option{s} '{options}'".format(
310 s=("s" if len(unknown_options) > 1 else ""),
311 options="', '".join(sorted(unknown_options)),
312 )
313 )
314
315 def get_unique(self) -> dict[str, str]:
316 """
317 Get all non-repeatable keys and their values; raise if a key has more values
318 """
319 result: dict[str, str] = {}
320 for key, values in self._key_value_map.items():
321 if key in self._repeatable_keys:
322 continue
323 values_uniq = set(values)
324 if len(values_uniq) > 1:
325 raise CmdLineInputError(
326 f"duplicate option '{key}' with different values "
327 f"{format_list_custom_last_separator(values_uniq, ' and ')}"
328 )
329 result[key] = values[0]
330 return result
331
332 def get_repeatable(self) -> dict[str, list[str]]:
333 """
334 Get all repeatable keys and their values
335 """
336 return {
337 key: self._key_value_map[key]
338 for key in self._repeatable_keys
339 if key in self._key_value_map
340 }
341
342
343 class ArgsByKeywords:
344 def __init__(self, groups: Mapping[str, list[Argv]]):
345 self._groups = groups
346 self._flat_cache: dict[str, Argv] = {}
347
348 def allow_repetition_only_for(self, keyword_set: StringCollection) -> None:
349 """
350 Raise CmdLineInputError if a keyword has been repeated when not allowed
351
352 keyword_set -- repetition is allowed for these keywords
353 """
354 for keyword, arg_groups in self._groups.items():
355 if len(arg_groups) > 1 and keyword not in keyword_set:
356 raise CmdLineInputError(
357 f"'{keyword}' cannot be used more than once"
358 )
359
360 def ensure_unique_keywords(self) -> None:
361 """
362 Raise CmdLineInputError if any keyword has been repeated
363 """
364 return self.allow_repetition_only_for(set())
365
366 def is_empty(self) -> bool:
367 """
368 Check if any args have been specified
369 """
370 return not self._groups
371
372 def has_keyword(self, keyword: str) -> bool:
373 """
374 Check if a keyword has been specified
375
376 keyword -- a keyword to check
377 """
378 return keyword in self._groups
379
380 def has_empty_keyword(self, keyword: str) -> bool:
381 """
382 Check if a keyword has been specified without any following args
383
384 keyword -- a keyword to check
385 """
386 return self.has_keyword(keyword) and not self.get_args_flat(keyword)
387
388 def get_args_flat(self, keyword: str) -> Argv:
389 """
390 Get arguments of a keyword in one sequence
391 """
392 if keyword in self._groups:
393 if keyword not in self._flat_cache:
394 self._flat_cache[keyword] = [
395 arg
396 for one_group in self._groups[keyword]
397 for arg in one_group
398 ]
399 return self._flat_cache[keyword]
400 return []
401
402 def get_args_groups(self, keyword: str) -> list[Argv]:
403 """
404 Get arguments of a keyword, one group for each keyword occurrence
405 """
406 if keyword in self._groups:
407 return self._groups[keyword]
408 return []
409
410
411 def group_by_keywords(
412 arg_list: Argv,
413 keyword_set: StringCollection,
414 implicit_first_keyword: Optional[str] = None,
415 ) -> ArgsByKeywords:
416 """
417 Separate argv into groups delimited by specified keywords
418
419 arg_list -- commandline arguments containing keywords
420 keyword_set -- all expected keywords
421 implicit_first_keyword -- key for capturing args before the first keyword
422 """
423 args_by_keywords: dict[str, list[Argv]] = {}
424
425 def new_keyword(keyword: str) -> None:
426 if keyword not in args_by_keywords:
427 args_by_keywords[keyword] = []
428 args_by_keywords[keyword].append([])
429
430 if arg_list:
431 if arg_list[0] not in keyword_set:
432 if not implicit_first_keyword:
433 raise CmdLineInputError()
434 current_keyword = implicit_first_keyword
435 new_keyword(current_keyword)
436
437 for arg in arg_list:
438 if arg in keyword_set:
439 current_keyword = arg
440 new_keyword(current_keyword)
441 else:
442 args_by_keywords[current_keyword][-1].append(arg)
443
444 return ArgsByKeywords(args_by_keywords)
445
446
447 def parse_typed_arg(
448 arg: str, allowed_types: StringSequence, default_type: str
449 ) -> tuple[str, str]:
450 """
451 Get (type, value) from a typed commandline argument.
452
453 Split the argument by the type separator and return the type and the value.
454 Raise CmdLineInputError in the argument format or type is not valid.
455 string arg -- commandline argument
456 Iterable allowed_types -- list of allowed argument types
457 string default_type -- type to return if the argument doesn't specify a type
458 """
459 if ARG_TYPE_DELIMITER not in arg:
460 return default_type, arg
461 arg_type, arg_value = arg.split(ARG_TYPE_DELIMITER, 1)
462 if not arg_type:
463 return default_type, arg_value
464 if arg_type not in allowed_types:
465 raise CmdLineInputError(
466 (
467 "'{arg_type}' is not an allowed type for '{arg_full}', use "
468 "{hint}"
469 ).format(
470 arg_type=arg_type,
471 arg_full=arg,
472 hint=", ".join(sorted(allowed_types)),
473 )
474 )
475 return arg_type, arg_value
476
477
478 def _is_num(arg: str) -> bool:
479 if arg.lower() == INFINITY.lower():
480 return True
481 try:
482 int(arg)
483 return True
484 except ValueError:
485 return False
486
487
488 def _is_float(arg: str) -> bool:
489 try:
490 float(arg)
491 return True
492 except ValueError:
493 return False
494
495
496 def _is_negative_num(arg: str) -> bool:
497 return arg.startswith("-") and (_is_num(arg[1:]) or _is_float(arg))
498
499
500 def is_short_option_expecting_value(arg: str) -> bool:
501 return len(arg) == 2 and arg[0] == "-" and f"{arg[1]}:" in PCS_SHORT_OPTIONS
502
503
504 def is_long_option_expecting_value(arg: str) -> bool:
505 return (
506 len(arg) > 2 and arg[0:2] == "--" and f"{arg[2:]}=" in PCS_LONG_OPTIONS
507 )
508
509
510 def is_option_expecting_value(arg: str) -> bool:
511 return is_short_option_expecting_value(
512 arg
513 ) or is_long_option_expecting_value(arg)
514
515
516 # DEPRECATED
517 # TODO remove
518 # This function is called only by deprecated code for parsing argv containing
519 # negative numbers without -- prepending them.
520 def filter_out_non_option_negative_numbers(arg_list: Argv) -> tuple[Argv, Argv]:
521 """
522 Return arg_list without non-option negative numbers.
523 Negative numbers following the option expecting value are kept.
524
525 There is the problematic legacy:
526 Argument "--" has special meaning: it can be used to signal that no more
527 options will follow. This would solve the problem with negative numbers in
528 a standard way: there would be no special approach to negative numbers,
529 everything would be left in the hands of users.
530
531 We cannot use "--" as it would be a backward incompatible change:
532 * "pcs ... -infinity" would not work any more, users would have to switch
533 to "pcs ... -- ... -infinity"
534 * previously, position of some --options mattered, for example
535 "--clone <clone options>", this syntax would not be possible with the "--"
536 in place
537
538 Currently used --options, which may be problematic when switching to "--":
539 * --group <group name>, --before | --after <resource id>
540 * pcs resource | stonith create, pcs resource group add, pcs tag update
541 * They have a single argument, so they would work even with --. But the
542 command may look weird:
543 pcs resource create --group G --after R2 -- R3 ocf:pacemaker:Dummy
544 vs. current command
545 pcs resource create R3 ocf:pacemaker:Dummy --group G --after R2
546
547 list arg_list contains command line arguments
548 """
549 args_without_negative_nums = []
550 args_filtered_out = []
551 for i, arg in enumerate(arg_list):
552 prev_arg = arg_list[i - 1] if i > 0 else ""
553 if not _is_negative_num(arg) or is_option_expecting_value(prev_arg):
554 args_without_negative_nums.append(arg)
555 else:
556 args_filtered_out.append(arg)
557
558 return args_without_negative_nums, args_filtered_out
559
560
561 # DEPRECATED
562 # TODO remove
563 # This function is called only by deprecated code for parsing argv containing
564 # negative numbers without -- prepending them.
565 def filter_out_options(arg_list: Argv) -> Argv:
566 """
567 Return arg_list without options and negative numbers
568
569 See a comment in filter_out_non_option_negative_numbers.
570
571 arg_list -- command line arguments
572 """
573 args_without_options = []
574 for i, arg in enumerate(arg_list):
575 prev_arg = arg_list[i - 1] if i > 0 else ""
576 if not is_option_expecting_value(prev_arg) and (
577 not arg.startswith("-") or arg == "-" or _is_negative_num(arg)
578 ):
579 args_without_options.append(arg)
580 return args_without_options
581
582
583 def wait_to_timeout(wait: Union[bool, str, None]) -> int:
584 if wait is False:
585 return -1
586 if wait is None:
587 return 0
588 timeout = timeout_to_seconds(wait)
589 if timeout is None:
590 raise CmdLineInputError(f"'{wait}' is not a valid interval value")
591 return timeout
592
593
594 class InputModifiers:
595 def __init__(self, options: Mapping[str, ModifierValueType]):
596 self._defined_options = set(options.keys())
597 self._options = dict(options)
598 self._options.update(
599 {opt: opt in options for opt in MODIFIER_OPTIONS_BOOL}
600 )
601 self._options.update(
602 {opt: options.get(opt, None) for opt in MODIFIER_OPTIONS_VAL}
603 )
604 self._options.update(
605 {
606 OUTPUT_FORMAT_OPTION: options.get(
607 OUTPUT_FORMAT_OPTION, OUTPUT_FORMAT_VALUE_TEXT
608 ),
609 "--wait": options.get("--wait", False),
610 }
611 )
612
613 def get_subset(
614 self, *options: str, **custom_options: ModifierValueType
615 ) -> "InputModifiers":
616 opt_dict = {
617 opt: self.get(opt) for opt in options if self.is_specified(opt)
618 }
619 opt_dict.update(custom_options)
620 return InputModifiers(opt_dict)
621
622 def ensure_only_supported(
623 self,
624 *supported_options: str,
625 hint_syntax_changed: Optional[str] = None,
626 output_format_supported: bool = False,
627 ) -> None:
628 # --debug is supported in all commands
629 supported_options_set = set(supported_options) | {"--debug"}
630 if output_format_supported:
631 supported_options_set.add(OUTPUT_FORMAT_OPTION)
632 unsupported_options = self._defined_options - supported_options_set
633 if unsupported_options:
634 pluralize = partial(format_plural, unsupported_options)
635 raise CmdLineInputError(
636 "Specified {option} {option_list} {_is} not supported in this "
637 "command".format(
638 option=pluralize("option"),
639 option_list=format_list(sorted(unsupported_options)),
640 _is=pluralize("is"),
641 ),
642 hint=(
643 "Syntax has changed from previous version. {}".format(
644 SEE_MAN_CHANGES.format(hint_syntax_changed)
645 )
646 if hint_syntax_changed
647 else None
648 ),
649 )
650
651 def ensure_not_mutually_exclusive(self, *mutually_exclusive: str) -> None:
652 """
653 Raise CmdLineInputError if several exclusive options were specified
654
655 mutually_exclusive -- mutually exclusive options
656 """
657 options_to_report = self._defined_options & set(mutually_exclusive)
658 if len(options_to_report) > 1:
659 raise CmdLineInputError(
660 "Only one of {} can be used".format(
661 format_list(sorted(options_to_report))
662 )
663 )
664
665 def ensure_not_incompatible(
666 self, checked: str, incompatible: StringCollection
667 ) -> None:
668 """
669 Raise CmdLineInputError if both the checked and an incompatible option
670 were specified
671
672 checked -- option incompatible with any of incompatible options
673 incompatible -- set of options incompatible with checked
674 """
675 if checked not in self._defined_options:
676 return
677 disallowed = self._defined_options & set(incompatible)
678 if disallowed:
679 raise CmdLineInputError(
680 "'{}' cannot be used with {}".format(
681 checked, format_list(sorted(disallowed))
682 )
683 )
684
685 def ensure_dependency_satisfied(
686 self, main_option: str, dependent_options: StringCollection
687 ) -> None:
688 """
689 Raise CmdLineInputError if any of dependent_options is present and
690 main_option is not present.
691
692 main_option -- option on which dependent_options depend
693 dependent_options -- none of these options can be specified if
694 main_option is not specified
695 """
696 if main_option in self._defined_options:
697 return
698 disallowed = self._defined_options & set(dependent_options)
699 if disallowed:
700 raise CmdLineInputError(
701 "{} cannot be used without '{}'".format(
702 format_list(sorted(disallowed)), main_option
703 )
704 )
705
706 def is_specified(self, option: str) -> bool:
707 return option in self._defined_options
708
709 def is_specified_any(self, option_list: StringIterable) -> bool:
710 return any(self.is_specified(option) for option in option_list)
711
712 def get(
713 self, option: str, default: ModifierValueType = None
714 ) -> ModifierValueType:
715 if option in self._defined_options:
|
CID (unavailable; MK=489eaaa87afbb746a3d63c1bcfb03702) (#1 of 1): Copy-paste error (COPY_PASTE_ERROR): |
|
(2) Event copy_paste_error: |
"_options" in "self._options[option]" looks like a copy-paste error. |
|
(3) Event remediation: |
Should it say "_defined_options" instead? |
| Also see events: |
[original] |
716 return self._options[option]
717 if default is not None:
718 return default
719 if option in self._options:
720 return self._options[option]
721 raise AssertionError(f"Non existing default value for '{option}'")
722
723 def get_output_format(
724 self,
725 supported_formats: StringCollection = OUTPUT_FORMAT_VALUES,
726 ) -> str:
727 output_format = self.get(OUTPUT_FORMAT_OPTION)
728 if output_format in supported_formats:
729 return str(output_format)
730 raise CmdLineInputError(
731 (
732 "Unknown value '{value}' for '{option}' option. Supported "
733 "{value_pl} {is_pl}: {supported}"
734 ).format(
735 value=output_format,
736 option=OUTPUT_FORMAT_OPTION,
737 value_pl=format_plural(supported_formats, "value"),
738 is_pl=format_plural(supported_formats, "is"),
739 supported=format_list(list(supported_formats)),
740 )
741 )
742
743
744 def get_rule_str(argv: Argv) -> Optional[str]:
745 if argv:
746 if len(argv) > 1:
747 # deprecated after 0.11.7
748 deprecation_warning(
749 "Specifying a rule as multiple arguments is deprecated and "
750 "might be removed in a future release, specify the rule as "
751 "a single string instead"
752 )
753 return " ".join(argv)
754 return argv[0]
755 return None
756