1 from collections import Counter
2 from functools import partial
3 from typing import (
4 Final,
5 Mapping,
6 Optional,
7 Union,
8 )
9
10 from pcs.cli.common.errors import (
11 SEE_MAN_CHANGES,
12 CmdLineInputError,
13 )
14 from pcs.common.const import INFINITY
15 from pcs.common.str_tools import (
16 format_list,
17 format_list_custom_last_separator,
18 format_plural,
19 )
20 from pcs.common.tools import timeout_to_seconds
21 from pcs.common.types import (
22 StringCollection,
23 StringIterable,
24 StringSequence,
25 )
26
# sys.argv always returns a list, we don't need StringSequence in here
Argv = list[str]
# Type of a value of a commandline option (modifier) as stored in
# InputModifiers
ModifierValueType = Union[None, bool, str]

# Names of long options are defined without the leading "--" so that they can
# be put into PCS_LONG_OPTIONS; the public constants include the "--" prefix.
_FUTURE_OPTION_STR: Final = "future"
FUTURE_OPTION: Final = f"--{_FUTURE_OPTION_STR}"
_OUTPUT_FORMAT_OPTION_STR: Final = "output-format"
OUTPUT_FORMAT_OPTION: Final = f"--{_OUTPUT_FORMAT_OPTION_STR}"
# All values of the output format option known to this module
OUTPUT_FORMAT_VALUE_CMD: Final = "cmd"
OUTPUT_FORMAT_VALUE_JSON: Final = "json"
OUTPUT_FORMAT_VALUE_TEXT: Final = "text"
OUTPUT_FORMAT_VALUES: Final = frozenset(
    (
        OUTPUT_FORMAT_VALUE_CMD,
        OUTPUT_FORMAT_VALUE_JSON,
        OUTPUT_FORMAT_VALUE_TEXT,
    )
)

# Separates a type from a value in typed arguments, see parse_typed_arg
ARG_TYPE_DELIMITER: Final = "%"

# h = help, f = file,
# p = password (cluster auth), u = user (cluster auth),
# A colon following a letter marks an option which expects a value, see
# is_short_option_expecting_value
PCS_SHORT_OPTIONS: Final = "hf:p:u:"
# Long options without the leading "--". A trailing "=" marks an option which
# expects a value, see is_long_option_expecting_value
PCS_LONG_OPTIONS: Final = [
    "debug",
    "version",
    "help",
    "fullhelp",
    "force",
    "skip-offline",
    "autodelete",
    "simulate",
    "all",
    "full",
    "local",
    "wait",
    "config",
    "start",
    "enable",
    "disabled",
    "off",
    "request-timeout=",
    "brief",
    _FUTURE_OPTION_STR,
    # resource (safe-)disable
    "safe",
    "no-strict",
    # resource cleanup | refresh
    "strict",
    "pacemaker",
    "corosync",
    "no-default-ops",
    "defaults",
    "nodesc",
    "master",  # deprecated, replaced by --promoted
    "promoted",
    "name=",
    "group=",
    "node=",
    "from=",
    "to=",
    "after=",
    "before=",
    "corosync_conf=",
    "booth-conf=",
    "booth-key=",
    "no-watchdog-validation",
    # pcs cluster setup
    "no-cluster-uuid",
    "no-keys-sync",
    # in pcs status - do not display resource status on inactive node
    "hide-inactive",
    # pcs resource (un)manage - enable or disable monitor operations
    "monitor",
    # TODO remove
    # used only in deprecated 'pcs resource|stonith show'
    "groups",
    # "pcs resource clear --expired" - only clear expired moves and bans
    "expired",
    # disable evaluating whether rules are expired
    "no-expire-check",
    # allow overwriting existing files, currently meant for / used in CLI only
    "overwrite",
    # output format of commands, e.g: json, cmd, text, ...
    f"{_OUTPUT_FORMAT_OPTION_STR}=",
    # auth token
    "token=",
    # enable agent self validation
    "agent-validation",
    # disable text output in query commands
    "quiet",
]
120
121
def split_list(arg_list: Argv, separator: str) -> list[Argv]:
    """
    Break a list of arguments into sublists at each occurrence of separator

    The separator itself is not put into any sublist. Leading, trailing and
    adjacent separators produce empty sublists; an empty arg_list produces
    one empty sublist.

    arg_list -- list of command line arguments to split
    separator -- argument acting as a delimiter
    """
    chunks: list[Argv] = [[]]
    for item in arg_list:
        if item == separator:
            # a separator closes the current chunk and opens a new one
            chunks.append([])
        else:
            chunks[-1].append(item)
    return chunks
134
135
def split_list_by_any_keywords(
    arg_list: Argv, keyword_label: str
) -> dict[str, Argv]:
    """
    Group key=value arguments under the keyword which precedes them

    Every argument not containing "=" is taken as a keyword which starts a
    new group. Arguments containing "=" are added to the group started most
    recently.

    arg_list -- list of command line arguments to split
    keyword_label -- description of all keywords, used in error messages

    Raise CmdLineInputError if the first argument contains "=" (there is no
    keyword it could belong to) or if a keyword is specified more than once.
    """
    if not arg_list:
        return {}

    first_keyword, *remaining_args = arg_list
    if "=" in first_keyword:
        raise CmdLineInputError(
            f"Invalid character '=' in {keyword_label} '{arg_list[0]}'"
        )

    result: dict[str, Argv] = {first_keyword: []}
    active_group = result[first_keyword]
    for item in remaining_args:
        if "=" in item:
            active_group.append(item)
            continue
        # item is a keyword, each keyword may start only one group
        if item in result:
            raise CmdLineInputError(
                "{} '{}' defined multiple times".format(
                    keyword_label.capitalize(), item
                )
            )
        active_group = result[item] = []
    return result
169
170
def split_option(arg: str, allow_empty_value: bool = True) -> tuple[str, str]:
    """
    Split a key=value commandline argument into a (key, value) pair

    Only the first "=" is used as a delimiter, so the value may contain "=".

    arg -- commandline argument to split
    allow_empty_value -- if False, raise CmdLineInputError on empty value

    Raise CmdLineInputError if arg contains no "=" or starts with "=".
    """
    key, delimiter, value = arg.partition("=")
    if not delimiter:
        raise CmdLineInputError(f"missing value of '{arg}' option")
    if not key:
        raise CmdLineInputError(f"missing key in '{arg}' option")
    if not allow_empty_value and not value:
        raise CmdLineInputError(f"value of '{key}' option is empty")
    return key, value
189
190
def ensure_unique_args(cmdline_args: Argv) -> None:
    """
    Check that no argument is specified more than once

    cmdline_args -- command line arguments to check

    Raise CmdLineInputError listing all the repeated arguments.
    """
    occurrences = Counter(cmdline_args)
    duplicities = [arg for arg in occurrences if occurrences[arg] > 1]
    if not duplicities:
        return
    argument_pl = format_plural(duplicities, "argument")
    duplicities_list = format_list(duplicities)
    raise CmdLineInputError(f"duplicate {argument_pl}: {duplicities_list}")
202
203
class KeyValueParser:
    """
    Parser and validator of key=value commandline options
    """

    def __init__(self, arg_list: Argv, repeatable: StringCollection = ()):
        """
        Split all arguments to keys and values and remember them

        arg_list -- commandline arguments to be parsed
        repeatable -- keys that are allowed to be specified several times

        Raise CmdLineInputError if an argument is not in the key=value form.
        """
        self._repeatable_keys = repeatable
        # all values of each key in the order they were specified
        self._key_value_map: dict[str, list[str]] = {}
        for arg in arg_list:
            name, value = split_option(arg)
            self._key_value_map.setdefault(name, []).append(value)

    def check_allowed_keys(self, allowed_keys: StringCollection) -> None:
        """
        Raise CmdLineInputError if a key other than allowed was specified

        allowed_keys -- list of allowed keys
        """
        unknown_options = sorted(set(self._key_value_map) - set(allowed_keys))
        if not unknown_options:
            return
        plural_suffix = "s" if len(unknown_options) > 1 else ""
        raise CmdLineInputError(
            "Unknown option{s} '{options}'".format(
                s=plural_suffix,
                options="', '".join(unknown_options),
            )
        )

    def get_unique(self) -> dict[str, str]:
        """
        Get non-repeatable keys, each with its single value

        Specifying a key several times with the same value is accepted.
        Raise CmdLineInputError if a key has been given different values.
        """
        non_repeatable = {
            key: values
            for key, values in self._key_value_map.items()
            if key not in self._repeatable_keys
        }
        for key, values in non_repeatable.items():
            values_uniq = set(values)
            if len(values_uniq) > 1:
                raise CmdLineInputError(
                    f"duplicate option '{key}' with different values "
                    f"{format_list_custom_last_separator(values_uniq, ' and ')}"
                )
        return {key: values[0] for key, values in non_repeatable.items()}

    def get_repeatable(self) -> dict[str, list[str]]:
        """
        Get repeatable keys which were specified, each with all its values
        """
        result: dict[str, list[str]] = {}
        for key in self._repeatable_keys:
            if key in self._key_value_map:
                result[key] = self._key_value_map[key]
        return result
264
265
class ArgsByKeywords:
    """
    Commandline arguments grouped by keywords which precede them
    """

    def __init__(self, groups: Mapping[str, list[Argv]]):
        """
        groups -- for each keyword a list of arg lists, one arg list for each
            occurrence of the keyword
        """
        self._groups = groups
        # flattened args of keywords already requested by get_args_flat
        self._flat_cache: dict[str, Argv] = {}

    def allow_repetition_only_for(self, keyword_set: StringCollection) -> None:
        """
        Raise CmdLineInputError if a keyword has been repeated when not allowed

        keyword_set -- repetition is allowed for these keywords
        """
        for keyword, arg_groups in self._groups.items():
            if keyword in keyword_set or len(arg_groups) <= 1:
                continue
            raise CmdLineInputError(
                f"'{keyword}' cannot be used more than once"
            )

    def ensure_unique_keywords(self) -> None:
        """
        Raise CmdLineInputError if any keyword has been repeated
        """
        return self.allow_repetition_only_for(())

    def is_empty(self) -> bool:
        """
        Return True if no keyword (and therefore no args) has been specified
        """
        return not self._groups

    def has_keyword(self, keyword: str) -> bool:
        """
        Check if a keyword has been specified

        keyword -- a keyword to check
        """
        return keyword in self._groups

    def has_empty_keyword(self, keyword: str) -> bool:
        """
        Check if a keyword has been specified without any following args

        keyword -- a keyword to check
        """
        if not self.has_keyword(keyword):
            return False
        return not self.get_args_flat(keyword)

    def get_args_flat(self, keyword: str) -> Argv:
        """
        Get args of all occurrences of a keyword merged into one list

        An empty list is returned for a keyword which was not specified.
        """
        if keyword not in self._groups:
            return []
        flat_args = self._flat_cache.get(keyword)
        if flat_args is None:
            flat_args = []
            for one_group in self._groups[keyword]:
                flat_args.extend(one_group)
            self._flat_cache[keyword] = flat_args
        return flat_args

    def get_args_groups(self, keyword: str) -> list[Argv]:
        """
        Get arguments of a keyword, one group for each keyword occurrence

        An empty list is returned for a keyword which was not specified.
        """
        return self._groups[keyword] if keyword in self._groups else []
332
333
def group_by_keywords(
    arg_list: Argv,
    keyword_set: StringCollection,
    implicit_first_keyword: Optional[str] = None,
) -> ArgsByKeywords:
    """
    Separate argv into groups delimited by specified keywords

    Each occurrence of a keyword starts a new group of args for that keyword.

    arg_list -- commandline arguments containing keywords
    keyword_set -- all expected keywords
    implicit_first_keyword -- key for capturing args before the first keyword

    Raise CmdLineInputError if arg_list does not start with a keyword and
    implicit_first_keyword is not set.
    """
    args_by_keywords: dict[str, list[Argv]] = {}

    if arg_list and arg_list[0] not in keyword_set:
        if not implicit_first_keyword:
            raise CmdLineInputError()
        # args preceding the first keyword belong to the implicit keyword
        current_keyword = implicit_first_keyword
        args_by_keywords[current_keyword] = [[]]

    for arg in arg_list:
        if arg in keyword_set:
            current_keyword = arg
            args_by_keywords.setdefault(current_keyword, []).append([])
        else:
            args_by_keywords[current_keyword][-1].append(arg)

    return ArgsByKeywords(args_by_keywords)
368
369
def parse_typed_arg(
    arg: str, allowed_types: StringSequence, default_type: str
) -> tuple[str, str]:
    """
    Split a typed commandline argument into a (type, value) pair

    The type is separated from the value by the first ARG_TYPE_DELIMITER. If
    the delimiter is missing, the whole argument is the value. If the
    delimiter is missing or nothing precedes it, default_type is returned as
    the type.

    arg -- commandline argument
    allowed_types -- types which may be specified in the argument
    default_type -- type to return if the argument doesn't specify a type

    Raise CmdLineInputError if the specified type is not in allowed_types.
    """
    arg_type, delimiter, arg_value = arg.partition(ARG_TYPE_DELIMITER)
    if not delimiter:
        return default_type, arg
    if not arg_type:
        return default_type, arg_value
    if arg_type in allowed_types:
        return arg_type, arg_value
    raise CmdLineInputError(
        (
            "'{arg_type}' is not an allowed type for '{arg_full}', use "
            "{hint}"
        ).format(
            arg_type=arg_type,
            arg_full=arg,
            hint=", ".join(sorted(allowed_types)),
        )
    )
399
400
def _is_num(arg: str) -> bool:
    """
    Check if arg is INFINITY (compared case insensitively) or an integer

    arg -- string to check
    """
    if arg.lower() == INFINITY.lower():
        return True
    try:
        int(arg)
    except ValueError:
        return False
    return True
409
410
411 def _is_float(arg: str) -> bool:
412 try:
413 float(arg)
414 return True
415 except ValueError:
416 return False
417
418
def _is_negative_num(arg: str) -> bool:
    """
    Check if arg starts with "-" and represents a number

    The part following the "-" is checked by _is_num, the whole arg is
    checked by _is_float.

    arg -- string to check
    """
    if not arg.startswith("-"):
        return False
    return _is_num(arg[1:]) or _is_float(arg)
421
422
def is_short_option_expecting_value(arg: str) -> bool:
    """
    Check if arg is a short option (-x) defined as expecting a value

    Options expecting a value are followed by ":" in PCS_SHORT_OPTIONS.

    arg -- commandline argument to check
    """
    if len(arg) != 2 or arg[0] != "-":
        return False
    return f"{arg[1]}:" in PCS_SHORT_OPTIONS
425
426
def is_long_option_expecting_value(arg: str) -> bool:
    """
    Check if arg is a long option (--xyz) defined as expecting a value

    Options expecting a value end with "=" in PCS_LONG_OPTIONS.

    arg -- commandline argument to check
    """
    if len(arg) <= 2 or not arg.startswith("--"):
        return False
    return f"{arg[2:]}=" in PCS_LONG_OPTIONS
431
432
def is_option_expecting_value(arg: str) -> bool:
    """
    Check if arg is a short or a long option defined as expecting a value

    arg -- commandline argument to check
    """
    if is_short_option_expecting_value(arg):
        return True
    return is_long_option_expecting_value(arg)
437
438
439 # DEPRECATED
440 # TODO remove
441 # This function is called only by deprecated code for parsing argv containing
442 # negative numbers without -- prepending them.
def filter_out_non_option_negative_numbers(arg_list: Argv) -> tuple[Argv, Argv]:
    """
    Split arg_list to args to be kept and negative numbers to be removed

    Return a tuple of two lists: arg_list without non-option negative
    numbers, and the negative numbers which were filtered out. A negative
    number directly following an option expecting a value is considered to be
    the value of the option and therefore it is kept.

    Why this is needed - the problematic legacy:
    Argument "--" has a special meaning: it can be used to signal that no
    more options will follow. This would solve the problem with negative
    numbers in a standard way: there would be no special approach to negative
    numbers, everything would be left in the hands of users.

    We cannot use "--" as it would be a backward incompatible change:
    * "pcs ... -infinity" would not work any more, users would have to switch
      to "pcs ... -- ... -infinity"
    * previously, position of some --options mattered, for example
      "--clone <clone options>", this syntax would not be possible with the
      "--" in place

    Currently used --options, which may be problematic when switching to "--":
    * --group <group name>, --before | --after <resource id>
      * pcs resource | stonith create, pcs resource group add, pcs tag update
      * They have a single argument, so they would work even with --. But the
        command may look weird:
        pcs resource create --group G --after R2 -- R3 ocf:pacemaker:Dummy
        vs. current command
        pcs resource create R3 ocf:pacemaker:Dummy --group G --after R2

    arg_list -- command line arguments
    """
    kept_args: Argv = []
    removed_args: Argv = []
    previous = ""
    for current in arg_list:
        if _is_negative_num(current) and not is_option_expecting_value(
            previous
        ):
            removed_args.append(current)
        else:
            kept_args.append(current)
        previous = current

    return kept_args, removed_args
482
483
484 # DEPRECATED
485 # TODO remove
486 # This function is called only by deprecated code for parsing argv containing
487 # negative numbers without -- prepending them.
def filter_out_options(arg_list: Argv) -> Argv:
    """
    Return arg_list without options and without values of options

    Arguments starting with "-" are removed, except for a lone "-" and for
    negative numbers, which are kept. An argument directly following an option
    expecting a value is always removed.

    See a comment in filter_out_non_option_negative_numbers.

    arg_list -- command line arguments
    """
    remaining_args: Argv = []
    previous = ""
    for current in arg_list:
        follows_value_option = is_option_expecting_value(previous)
        previous = current
        if follows_value_option:
            # current is a value of the preceding option
            continue
        if (
            not current.startswith("-")
            or current == "-"
            or _is_negative_num(current)
        ):
            remaining_args.append(current)
    return remaining_args
504
505
def wait_to_timeout(wait: Union[bool, str, None]) -> int:
    """
    Translate a wait value to a timeout in seconds

    wait -- False translates to -1, None translates to 0, any other value is
        converted by timeout_to_seconds

    Raise CmdLineInputError if the value cannot be converted.
    """
    if wait is None:
        return 0
    if wait is False:
        return -1
    timeout = timeout_to_seconds(wait)
    if timeout is not None:
        return timeout
    raise CmdLineInputError(f"'{wait}' is not a valid interval value")
515
516
class InputModifiers:
    """
    Commandline options (modifiers) specified by a user, with default values
    for options which were not specified
    """

    def __init__(self, options: Mapping[str, ModifierValueType]):
        """
        options -- options specified by a user and their values
        """
        # names of options actually specified, used by all the checks
        self._defined_options = set(options.keys())
        # values of all options: the specified ones and defaults of the rest
        self._options = dict(options)
        self._options.update(
            {
                # boolean values
                "--all": "--all" in options,
                "--agent-validation": "--agent-validation" in options,
                "--autodelete": "--autodelete" in options,
                "--brief": "--brief" in options,
                "--config": "--config" in options,
                "--corosync": "--corosync" in options,
                "--debug": "--debug" in options,
                "--defaults": "--defaults" in options,
                "--disabled": "--disabled" in options,
                "--enable": "--enable" in options,
                "--expired": "--expired" in options,
                "--force": "--force" in options,
                "--full": "--full" in options,
                "--quiet": "--quiet" in options,
                FUTURE_OPTION: FUTURE_OPTION in options,
                # TODO remove
                # used only in deprecated 'pcs resource|stonith show'
                "--groups": "--groups" in options,
                "--hide-inactive": "--hide-inactive" in options,
                "--local": "--local" in options,
                "--monitor": "--monitor" in options,
                "--no-default-ops": "--no-default-ops" in options,
                "--nodesc": "--nodesc" in options,
                "--no-expire-check": "--no-expire-check" in options,
                "--no-cluster-uuid": "--no-cluster-uuid" in options,
                "--no-keys-sync": "--no-keys-sync" in options,
                "--no-strict": "--no-strict" in options,
                "--no-watchdog-validation": (
                    "--no-watchdog-validation" in options
                ),
                "--off": "--off" in options,
                "--overwrite": "--overwrite" in options,
                "--pacemaker": "--pacemaker" in options,
                "--promoted": "--promoted" in options,
                "--safe": "--safe" in options,
                "--simulate": "--simulate" in options,
                "--skip-offline": "--skip-offline" in options,
                "--start": "--start" in options,
                "--strict": "--strict" in options,
                # string values
                "--after": options.get("--after", None),
                "--before": options.get("--before", None),
                "--booth-conf": options.get("--booth-conf", None),
                "--booth-key": options.get("--booth-key", None),
                "--corosync_conf": options.get("--corosync_conf", None),
                "--from": options.get("--from", None),
                # TODO remove
                # used in resource create and stonith create, deprecated in both
                "--group": options.get("--group", None),
                "--name": options.get("--name", None),
                "--node": options.get("--node", None),
                OUTPUT_FORMAT_OPTION: options.get(
                    OUTPUT_FORMAT_OPTION, OUTPUT_FORMAT_VALUE_TEXT
                ),
                "--request-timeout": options.get("--request-timeout", None),
                "--to": options.get("--to", None),
                "--token": options.get("--token", None),
                "--wait": options.get("--wait", False),
                "-f": options.get("-f", None),
                "-p": options.get("-p", None),
                "-u": options.get("-u", None),
            }
        )

    def get_subset(
        self, *options: str, **custom_options: ModifierValueType
    ) -> "InputModifiers":
        """
        Create new InputModifiers containing only selected options

        options -- names of options to copy, those not specified by the user
            are skipped
        custom_options -- additional options and their values, they override
            values of the copied options
        """
        opt_dict = {
            opt: self.get(opt) for opt in options if self.is_specified(opt)
        }
        opt_dict.update(custom_options)
        return InputModifiers(opt_dict)

    def ensure_only_supported(
        self,
        *supported_options: str,
        hint_syntax_changed: bool = False,
        output_format_supported: bool = False,
    ) -> None:
        """
        Raise CmdLineInputError if an unsupported option was specified

        supported_options -- options supported by a command
        hint_syntax_changed -- add a hint about changed syntax to the error
        output_format_supported -- consider OUTPUT_FORMAT_OPTION supported
        """
        # --debug is supported in all commands
        supported_options_set = set(supported_options) | {"--debug"}
        if output_format_supported:
            supported_options_set.add(OUTPUT_FORMAT_OPTION)
        unsupported_options = self._defined_options - supported_options_set
        if unsupported_options:
            pluralize = partial(format_plural, unsupported_options)
            raise CmdLineInputError(
                "Specified {option} {option_list} {_is} not supported in this "
                "command".format(
                    option=pluralize("option"),
                    option_list=format_list(sorted(unsupported_options)),
                    _is=pluralize("is"),
                ),
                hint=(
                    "Syntax has changed from previous version. {}".format(
                        SEE_MAN_CHANGES.format("0.11")
                    )
                    if hint_syntax_changed
                    else None
                ),
            )

    def ensure_not_mutually_exclusive(self, *mutually_exclusive: str) -> None:
        """
        Raise CmdLineInputError if several exclusive options were specified

        mutually_exclusive -- mutually exclusive options
        """
        options_to_report = self._defined_options & set(mutually_exclusive)
        if len(options_to_report) > 1:
            raise CmdLineInputError(
                "Only one of {} can be used".format(
                    format_list(sorted(options_to_report))
                )
            )

    def ensure_not_incompatible(
        self, checked: str, incompatible: StringCollection
    ) -> None:
        """
        Raise CmdLineInputError if both the checked and an incompatible option
        were specified

        checked -- option incompatible with any of incompatible options
        incompatible -- set of options incompatible with checked
        """
        if checked not in self._defined_options:
            return
        disallowed = self._defined_options & set(incompatible)
        if disallowed:
            raise CmdLineInputError(
                "'{}' cannot be used with {}".format(
                    checked, format_list(sorted(disallowed))
                )
            )

    def ensure_dependency_satisfied(
        self, main_option: str, dependent_options: StringCollection
    ) -> None:
        """
        Raise CmdLineInputError if any of dependent_options is present and
        main_option is not present.

        main_option -- option on which dependent_options depend
        dependent_options -- none of these options can be specified if
            main_option is not specified
        """
        if main_option in self._defined_options:
            return
        disallowed = self._defined_options & set(dependent_options)
        if disallowed:
            raise CmdLineInputError(
                "{} cannot be used without '{}'".format(
                    format_list(sorted(disallowed)), main_option
                )
            )

    def is_specified(self, option: str) -> bool:
        """
        Check if an option was specified by the user

        option -- name of the option
        """
        return option in self._defined_options

    def is_specified_any(self, option_list: StringIterable) -> bool:
        """
        Check if at least one of the options was specified by the user

        option_list -- names of the options
        """
        return any(self.is_specified(option) for option in option_list)

    def get(
        self, option: str, default: ModifierValueType = None
    ) -> ModifierValueType:
        """
        Get a value of an option

        option -- name of the option
        default -- value to return if the option was not specified by the
            user; None means the built-in default of the option is returned

        Raise AssertionError if the option was not specified, no default was
        given and the option has no built-in default.
        """
        if option in self._defined_options:
            # _defined_options is only a set of names of specified options,
            # values are always stored in _options; reading _options here is
            # intentional, not a copy-paste error
            return self._options[option]
        if default is not None:
            return default
        if option in self._options:
            return self._options[option]
        raise AssertionError(f"Non existing default value for '{option}'")

    def get_output_format(
        self,
        supported_formats: StringCollection = OUTPUT_FORMAT_VALUES,
    ) -> str:
        """
        Get a value of the output format option

        supported_formats -- output formats accepted by a command

        Raise CmdLineInputError if the value is not in supported_formats.
        """
        output_format = self.get(OUTPUT_FORMAT_OPTION)
        if output_format in supported_formats:
            return str(output_format)
        raise CmdLineInputError(
            (
                "Unknown value '{value}' for '{option}' option. Supported "
                "{value_pl} {is_pl}: {supported}"
            ).format(
                value=output_format,
                option=OUTPUT_FORMAT_OPTION,
                value_pl=format_plural(supported_formats, "value"),
                is_pl=format_plural(supported_formats, "is"),
                supported=format_list(list(supported_formats)),
            )
        )
717