1 import re
2 import uuid
3 from dataclasses import astuple, dataclass
4 from typing import Generator, MutableSet, Optional, TypeVar, Union
5
6 from lxml import etree
7 from lxml.etree import _Element
8
9 from pcs.common.types import StringCollection
10 from pcs.common.validate import is_integer
11
12 T = TypeVar("T", bound=type)
13
14 _VERSION_PATTERN = re.compile(
15 r"(?P<major>\d+)\.(?P<minor>\d+)(\.(?P<rev>\d+))?"
16 )
17
18
19 def bin_to_str(binary: bytes) -> str:
20 return "".join(map(chr, binary))
21
22
23 def get_all_subclasses(cls: T) -> MutableSet[T]:
24 subclasses = set(cls.__subclasses__())
25 return subclasses.union(
26 {s for c in subclasses for s in get_all_subclasses(c)}
27 )
28
29
30 def get_unique_uuid(already_used: StringCollection) -> str:
31 is_duplicate = True
32 while is_duplicate:
33 candidate = str(uuid.uuid4())
34 is_duplicate = candidate in already_used
35 return candidate
36
37
38 def format_os_error(e: OSError) -> str:
39 if e.filename:
40 return f"{e.strerror}: '{e.filename}'"
41 if e.strerror:
42 return e.strerror
43 return (f"{e.__class__.__name__} {e}").strip()
44
45
46 def xml_fromstring(xml: str) -> _Element:
47 # If the xml contains encoding declaration such as:
48 # <?xml version="1.0" encoding="UTF-8"?>
49 # we get an exception in python3:
50 # ValueError: Unicode strings with encoding declaration are not supported.
51 # Please use bytes input or XML fragments without declaration.
52 # So we encode the string to bytes.
53 return etree.fromstring(
54 xml.encode("utf-8"),
55 # it raises on a huge xml without the flag huge_tree=True
56 # see https://bugzilla.redhat.com/show_bug.cgi?id=1506864
|
CID (unavailable; MK=4b80d73ed452516eeaf78fba1c4f4be1) (#1 of 1): XML entity processing enabled (SIGMA.xml_entity_enabled): |
|
(1) Event Sigma main event: |
The Python application enables entity expansion by setting the `lxml.etree.XMLParser` value `resolve_entities` to `true` or `internal` (the default value). If untrusted XML is parsed with entity expansion enabled, a malicious attacker could submit a document that contains very deeply nested entity definitions (known as a Billion Laughs Attack), causing the parser to use large amounts of memory and processing power resulting in a denial of service (DoS) condition. |
|
(2) Event remediation: |
Explicitly set `resolve_entities` argument to `False`. |
57 etree.XMLParser(huge_tree=True),
58 )
59
60
61 def timeout_to_seconds(timeout: Union[int, str]) -> Optional[int]:
62 """
63 Transform pacemaker style timeout to number of seconds. If `timeout` is not
64 a valid timeout, `None` is returned.
65
66 timeout -- timeout string
67 """
68 try:
69 candidate = int(timeout)
70 if candidate >= 0:
71 return candidate
72 return None
73 except ValueError:
74 pass
75 # Now we know the timeout is not an integer nor an integer string.
76 # Let's make sure mypy knows the timeout is a string as well.
77 timeout = str(timeout)
78 suffix_multiplier = {
79 "s": 1,
80 "sec": 1,
81 "m": 60,
82 "min": 60,
83 "h": 3600,
84 "hr": 3600,
85 }
86 for suffix, multiplier in suffix_multiplier.items():
87 if timeout.endswith(suffix):
88 candidate2 = timeout[: -len(suffix)]
89 if is_integer(candidate2, at_least=0):
90 return int(candidate2) * multiplier
91 return None
92
93
94 @dataclass(frozen=True)
95 class Version:
96 major: int
97 minor: Optional[int] = None
98 revision: Optional[int] = None
99
100 @property
101 def as_full_tuple(self) -> tuple[int, int, int]:
102 return (
103 self.major,
104 self.minor if self.minor is not None else 0,
105 self.revision if self.revision is not None else 0,
106 )
107
108 def normalize(self) -> "Version":
109 return self.__class__(*self.as_full_tuple)
110
111 def __iter__(self) -> Generator[Optional[int], None, None]:
112 yield from astuple(self)
113
114 def __getitem__(self, index: int) -> Optional[int]:
115 return astuple(self)[index]
116
117 def __str__(self) -> str:
118 return ".".join([str(x) for x in self if x is not None])
119
120 def __lt__(self, other: "Version") -> bool:
121 return self.as_full_tuple < other.as_full_tuple
122
123 def __le__(self, other: "Version") -> bool:
124 return self.as_full_tuple <= other.as_full_tuple
125
126 def __hash__(self) -> int:
127 # https://docs.astral.sh/ruff/rules/eq-without-hash/
128 # used self.as_full_tuple because __eq__ and __hash__ should be in sync,
129 # objects which compare equal must have the same hash value
130 return hash(self.as_full_tuple)
131
132 # See, https://stackoverflow.com/questions/37557411/why-does-defining-the-argument-types-for-eq-throw-a-mypy-type-error
133 def __eq__(self, other: object) -> bool:
134 if not isinstance(other, Version):
135 return NotImplemented
136 return self.as_full_tuple == other.as_full_tuple
137
138 def __ne__(self, other: object) -> bool:
139 if not isinstance(other, Version):
140 return NotImplemented
141 return self.as_full_tuple != other.as_full_tuple
142
143 def __gt__(self, other: "Version") -> bool:
144 return self.as_full_tuple > other.as_full_tuple
145
146 def __ge__(self, other: "Version") -> bool:
147 return self.as_full_tuple >= other.as_full_tuple
148
149
150 def get_version_from_string(value: str) -> Optional[Version]:
151 """
152 Get Version instance from a string or None if version cannot be determined.
153
154 value -- string to be searched for version pattern
155 """
156 match = _VERSION_PATTERN.search(value)
157 if not match:
158 return None
159 return Version(
160 int(match.group("major")),
161 int(match.group("minor")),
162 int(match.group("rev")) if match.group("rev") else None,
163 )
164