1 import re
2 import uuid
3 from collections.abc import Generator, MutableSet
4 from dataclasses import astuple, dataclass
5 from typing import TypeVar
6
7 from lxml import etree
8 from lxml.etree import _Element
9
10 from pcs.common.types import StringCollection
11 from pcs.common.validate import is_integer
12
13 T = TypeVar("T", bound=type)
14
15 _VERSION_PATTERN = re.compile(
16 r"(?P<major>\d+)\.(?P<minor>\d+)(\.(?P<rev>\d+))?"
17 )
18
19
20 def bin_to_str(binary: bytes) -> str:
21 return "".join(map(chr, binary))
22
23
24 def get_all_subclasses(cls: T) -> MutableSet[T]:
25 subclasses = set(cls.__subclasses__())
26 return subclasses.union(
27 {s for c in subclasses for s in get_all_subclasses(c)}
28 )
29
30
31 def get_unique_uuid(already_used: StringCollection) -> str:
32 is_duplicate = True
33 while is_duplicate:
34 candidate = str(uuid.uuid4())
35 is_duplicate = candidate in already_used
36 return candidate
37
38
39 def format_os_error(e: OSError) -> str:
40 if e.filename:
41 return f"{e.strerror}: '{e.filename}'"
42 if e.strerror:
43 return e.strerror
44 return (f"{e.__class__.__name__} {e}").strip()
45
46
47 def xml_fromstring(xml: str) -> _Element:
48 # If the xml contains encoding declaration such as:
49 # <?xml version="1.0" encoding="UTF-8"?>
50 # we get an exception in python3:
51 # ValueError: Unicode strings with encoding declaration are not supported.
52 # Please use bytes input or XML fragments without declaration.
53 # So we encode the string to bytes.
54 return etree.fromstring(
55 xml.encode("utf-8"),
56 # it raises on a huge xml without the flag huge_tree=True
57 # see https://bugzilla.redhat.com/show_bug.cgi?id=1506864
|
CID (unavailable; MK=4b80d73ed452516eeaf78fba1c4f4be1) (#1 of 1): XML entity processing enabled (SIGMA.xml_entity_enabled): |
|
(1) Event Sigma main event: |
The Python application enables entity expansion by setting the `lxml.etree.XMLParser` value `resolve_entities` to `true` or `internal` (the default value). If untrusted XML is parsed with entity expansion enabled, a malicious attacker could submit a document that contains very deeply nested entity definitions (known as a Billion Laughs Attack), causing the parser to use large amounts of memory and processing power resulting in a denial of service (DoS) condition. |
|
(2) Event remediation: |
Explicitly set `resolve_entities` argument to `False`. |
58 etree.XMLParser(huge_tree=True),
59 )
60
61
62 def timeout_to_seconds(timeout: int | str) -> int | None:
63 """
64 Transform pacemaker style timeout to number of seconds. If `timeout` is not
65 a valid timeout, `None` is returned.
66
67 timeout -- timeout string
68 """
69 try:
70 candidate = int(timeout)
71 if candidate >= 0:
72 return candidate
73 return None
74 except ValueError:
75 pass
76 # Now we know the timeout is not an integer nor an integer string.
77 # Let's make sure mypy knows the timeout is a string as well.
78 timeout = str(timeout)
79 suffix_multiplier = {
80 "s": 1,
81 "sec": 1,
82 "m": 60,
83 "min": 60,
84 "h": 3600,
85 "hr": 3600,
86 }
87 for suffix, multiplier in suffix_multiplier.items():
88 if timeout.endswith(suffix):
89 candidate2 = timeout[: -len(suffix)]
90 if is_integer(candidate2, at_least=0):
91 return int(candidate2) * multiplier
92 return None
93
94
95 @dataclass(frozen=True)
96 class Version:
97 major: int
98 minor: int | None = None
99 revision: int | None = None
100
101 @property
102 def as_full_tuple(self) -> tuple[int, int, int]:
103 return (
104 self.major,
105 self.minor if self.minor is not None else 0,
106 self.revision if self.revision is not None else 0,
107 )
108
109 def normalize(self) -> "Version":
110 return self.__class__(*self.as_full_tuple)
111
112 def __iter__(self) -> Generator[int | None, None, None]:
113 yield from astuple(self)
114
115 def __getitem__(self, index: int) -> int | None:
116 return astuple(self)[index]
117
118 def __str__(self) -> str:
119 return ".".join([str(x) for x in self if x is not None])
120
121 def __lt__(self, other: "Version") -> bool:
122 return self.as_full_tuple < other.as_full_tuple
123
124 def __le__(self, other: "Version") -> bool:
125 return self.as_full_tuple <= other.as_full_tuple
126
127 def __hash__(self) -> int:
128 # https://docs.astral.sh/ruff/rules/eq-without-hash/
129 # used self.as_full_tuple because __eq__ and __hash__ should be in sync,
130 # objects which compare equal must have the same hash value
131 return hash(self.as_full_tuple)
132
133 # See, https://stackoverflow.com/questions/37557411/why-does-defining-the-argument-types-for-eq-throw-a-mypy-type-error
134 def __eq__(self, other: object) -> bool:
135 if not isinstance(other, Version):
136 return NotImplemented
137 return self.as_full_tuple == other.as_full_tuple
138
139 def __ne__(self, other: object) -> bool:
140 if not isinstance(other, Version):
141 return NotImplemented
142 return self.as_full_tuple != other.as_full_tuple
143
144 def __gt__(self, other: "Version") -> bool:
145 return self.as_full_tuple > other.as_full_tuple
146
147 def __ge__(self, other: "Version") -> bool:
148 return self.as_full_tuple >= other.as_full_tuple
149
150
151 def get_version_from_string(value: str) -> Version | None:
152 """
153 Get Version instance from a string or None if version cannot be determined.
154
155 value -- string to be searched for version pattern
156 """
157 match = _VERSION_PATTERN.search(value)
158 if not match:
159 return None
160 return Version(
161 int(match.group("major")),
162 int(match.group("minor")),
163 int(match.group("rev")) if match.group("rev") else None,
164 )
165