1    	# ruff: noqa: B019 https://docs.astral.sh/ruff/rules/cached-instance-method/
2    	import os.path
3    	import ssl
4    	from collections import namedtuple
5    	from functools import lru_cache
6    	
7    	from pcs import settings
8    	from pcs.common.validate import (
9    	    is_integer,
10   	    is_port_number,
11   	)
12   	
13   	# Relative location instead of system location is used for development purposes.
14   	LOCAL_PUBLIC_DIR = os.path.realpath(
15   	    os.path.dirname(os.path.abspath(__file__)) + "/../../pcsd/public"
16   	)
17   	LOCAL_WEBUI_DIR = os.path.join(LOCAL_PUBLIC_DIR, "ui")
18   	WEBUI_FALLBACK_FILE = "ui_instructions.html"
19   	
20   	PCSD_PORT = "PCSD_PORT"
21   	PCSD_SSL_CIPHERS = "PCSD_SSL_CIPHERS"
22   	PCSD_SSL_OPTIONS = "PCSD_SSL_OPTIONS"
23   	PCSD_BIND_ADDR = "PCSD_BIND_ADDR"
24   	NOTIFY_SOCKET = "NOTIFY_SOCKET"
25   	PCSD_DEBUG = "PCSD_DEBUG"
26   	PCSD_DISABLE_GUI = "PCSD_DISABLE_GUI"
27   	PCSD_SESSION_LIFETIME = "PCSD_SESSION_LIFETIME"
28   	PCSD_DEV = "PCSD_DEV"
29   	WEBUI_DIR = "WEBUI_DIR"
30   	WEBUI_FALLBACK = "WEBUI_FALLBACK"
31   	PCSD_WORKER_COUNT = "PCSD_WORKER_COUNT"
32   	PCSD_WORKER_RESET_LIMIT = "PCSD_WORKER_RESET_LIMIT"
33   	PCSD_MAX_WORKER_COUNT = "PCSD_MAX_WORKER_COUNT"
34   	PCSD_DEADLOCK_THRESHOLD_TIMEOUT = "PCSD_DEADLOCK_THRESHOLD_TIMEOUT"
35   	PCSD_CHECK_INTERVAL_MS = "PCSD_CHECK_INTERVAL_MS"
36   	PCSD_TASK_ABANDONED_TIMEOUT = "PCSD_TASK_ABANDONED_TIMEOUT"
37   	PCSD_TASK_UNRESPONSIVE_TIMEOUT = "PCSD_TASK_UNRESPONSIVE_TIMEOUT"
38   	PCSD_TASK_DELETION_TIMEOUT = "PCSD_TASK_DELETION_TIMEOUT"
39   	
40   	Env = namedtuple(
41   	    "Env",
42   	    [
43   	        PCSD_PORT,
44   	        PCSD_SSL_CIPHERS,
45   	        PCSD_SSL_OPTIONS,
46   	        PCSD_BIND_ADDR,
47   	        NOTIFY_SOCKET,
48   	        PCSD_DEBUG,
49   	        PCSD_DISABLE_GUI,
50   	        PCSD_SESSION_LIFETIME,
51   	        WEBUI_DIR,
52   	        WEBUI_FALLBACK,
53   	        PCSD_DEV,
54   	        PCSD_WORKER_COUNT,
55   	        PCSD_WORKER_RESET_LIMIT,
56   	        PCSD_MAX_WORKER_COUNT,
57   	        PCSD_DEADLOCK_THRESHOLD_TIMEOUT,
58   	        PCSD_CHECK_INTERVAL_MS,
59   	        PCSD_TASK_ABANDONED_TIMEOUT,
60   	        PCSD_TASK_UNRESPONSIVE_TIMEOUT,
61   	        PCSD_TASK_DELETION_TIMEOUT,
62   	        "has_errors",
63   	    ],
64   	)
65   	
66   	
67   	def prepare_env(environ, logger=None):
68   	    loader = EnvLoader(environ)
69   	    loader.check_webui()
70   	    env = Env(
71   	        loader.port(),
72   	        loader.ssl_ciphers(),
73   	        loader.ssl_options(),
74   	        loader.bind_addresses(),
75   	        loader.notify_socket(),
76   	        loader.pcsd_debug(),
77   	        loader.pcsd_disable_gui(),
78   	        loader.session_lifetime(),
79   	        loader.webui_dir(),
80   	        loader.webui_fallback(),
81   	        loader.pcsd_dev(),
82   	        loader.pcsd_worker_count(),
83   	        loader.pcsd_worker_reset_limit(),
84   	        loader.pcsd_max_worker_count(),
85   	        loader.pcsd_deadlock_threshold_timeout(),
86   	        loader.pcsd_check_interval_ms(),
87   	        loader.pcsd_task_abandoned_timeout(),
88   	        loader.pcsd_task_unresponsive_timeout(),
89   	        loader.pcsd_task_deletion_timeout(),
90   	        loader.has_errors(),
91   	    )
92   	    if logger:
93   	        for error in loader.errors:
94   	            logger.error(error)
95   	        for warning in loader.warnings:
96   	            logger.warning(warning)
97   	    return env
98   	
99   	
100  	def str_to_ssl_options(ssl_options_string, reports):
101  	    ssl_options = 0
102  	    # We are tolerant to trailing whitespaces and trailing comas.
103  	    raw_ssl_options = ssl_options_string.strip(" ,")
104  	    if not raw_ssl_options:  # raw_ssl_options.split(",") == [""]
105  	        return ssl_options
106  	    for raw_option in raw_ssl_options.split(","):
107  	        option = raw_option.strip()
108  	        if option == "OP_NO_RENEGOTIATION" and not hasattr(ssl, option):
109  	            # OP_NO_RENEGOTIATION is for a prevention of DoS attacks.
110  	            # See https://bugzilla.redhat.com/show_bug.cgi?id=1566430
111  	            #
112  	            # OP_NO_RENEGOTIATION is new in python 3.7. `pcs` supports python
113  	            # 3.6+ but even with python 3.6 it is possible to use this option if
114  	            # the underlying openssl has version 1.1.0h+.
115  	            ssl_options |= 1073741824
116  	        elif option.startswith("OP_") and hasattr(ssl, option):
117  	            ssl_options |= getattr(ssl, option)
118  	        else:
119  	            reports.append(f"Ignoring unknown SSL option '{option}'")
120  	    return ssl_options
121  	
122  	
123  	class EnvLoader:
124  	    # pylint: disable=too-many-public-methods
125  	    def __init__(self, environ):
126  	        self.environ = environ
127  	        self.errors = []
128  	        self.warnings = []
129  	
130  	    def has_errors(self):
131  	        return len(self.errors) > 0
132  	
133  	    @lru_cache(maxsize=5)
134  	    def port(self):
135  	        port = self.environ.get(PCSD_PORT, settings.pcsd_default_port)
136  	        if not is_port_number(port):
137  	            self.errors.append(f"Invalid port number '{port}', use 1..65535")
138  	        return port
139  	
140  	    def ssl_ciphers(self):
141  	        ssl_ciphers = self.environ.get(
142  	            PCSD_SSL_CIPHERS, settings.default_ssl_ciphers
143  	        )
144  	        try:  # validate ciphers
(1) Event Sigma main event: Certificate validation has been disabled for the `ssl.SSLContext` instance. This allows for the acceptance of a rogue certificate and the potential for a manipulator-in-the-middle (MITM) attack.
(2) Event remediation: Modify the `ssl.SSLContext` instance to enable certificate validation: * Explicitly set `ssl.SSLContext.verify_mode` to `ssl.CERT_REQUIRED` or set `ssl.SSLContext.check_hostname` to `True` as enabling `check_hostname` will automatically change the default from `CERT_NONE` to `CERT_REQUIRED`. * Explicitly set `protocol` to `ssl.PROTOCOL_TLS_SERVER` or `ssl.PROTOCOL_TLS_CLIENT`.
145  	            ssl.SSLContext().set_ciphers(ssl_ciphers)
146  	        except ssl.SSLError as e:
147  	            self.errors.append(f"Invalid ciphers: '{e}'")
148  	        return ssl_ciphers
149  	
150  	    def ssl_options(self):
151  	        if PCSD_SSL_OPTIONS in self.environ:
152  	            # User knows about underlying system. If there is a wrong option it
153  	            # may be a typo - let them correct it. They are able to correct it
154  	            # in pcsd.conf.
155  	            return str_to_ssl_options(
156  	                self.environ[PCSD_SSL_OPTIONS], self.errors
157  	            )
158  	        # Vanilla pcsd should run even on an "exotic" system. If there is
159  	        # a wrong option it is not probably a typo... User should not be
160  	        # forced to modify source code (settings.py).
161  	        return str_to_ssl_options(settings.default_ssl_options, self.warnings)
162  	
163  	    def bind_addresses(self):
164  	        if PCSD_BIND_ADDR not in self.environ:
165  	            return {None}
166  	
167  	        raw_bind_addresses = self.environ[PCSD_BIND_ADDR]
168  	        if not raw_bind_addresses.strip():
169  	            return {""}
170  	
171  	        return {a.strip() for a in raw_bind_addresses.split(",")}
172  	
173  	    def notify_socket(self):
174  	        return self.environ.get(NOTIFY_SOCKET, None)
175  	
176  	    def pcsd_disable_gui(self):
177  	        return self.__has_true_in_environ(PCSD_DISABLE_GUI)
178  	
179  	    def session_lifetime(self):
180  	        session_lifetime = self.environ.get(
181  	            PCSD_SESSION_LIFETIME, settings.gui_session_lifetime_seconds
182  	        )
183  	        try:
184  	            return int(session_lifetime)
185  	        except ValueError:
186  	            self.errors.append(
187  	                f"Invalid PCSD_SESSION_LIFETIME value '{session_lifetime}'"
188  	                " (it must be an integer)"
189  	            )
190  	            return session_lifetime
191  	
192  	    def pcsd_debug(self):
193  	        return self.__has_true_in_environ(PCSD_DEBUG)
194  	
195  	    def check_webui(self):
196  	        if not self.pcsd_disable_gui() and not (
197  	            os.path.exists(self.webui_dir())
198  	            or os.path.exists(self.webui_fallback())
199  	        ):
200  	            self.errors.append(
201  	                f"Webui assets directory '{self.webui_dir()}'"
202  	                + f" or fallback html '{self.webui_fallback()}' does not exist"
203  	            )
204  	
205  	    @lru_cache(maxsize=5)
206  	    def webui_dir(self):
207  	        return LOCAL_WEBUI_DIR if self.pcsd_dev() else settings.pcsd_webui_dir
208  	
209  	    @lru_cache(maxsize=5)
210  	    def webui_fallback(self):
211  	        return os.path.join(
212  	            LOCAL_PUBLIC_DIR if self.pcsd_dev() else settings.pcsd_public_dir,
213  	            WEBUI_FALLBACK_FILE,
214  	        )
215  	
216  	    @lru_cache(maxsize=5)
217  	    def pcsd_dev(self):
218  	        return self.__has_true_in_environ(PCSD_DEV)
219  	
220  	    def _get_positive_int(self, key: str, default: int) -> int:
221  	        value = self.environ.get(key, default)
222  	        if not is_integer(value, at_least=1):
223  	            self.errors.append(
224  	                f"Value '{value}' for '{key}' is not a positive integer"
225  	            )
226  	            return default
227  	        return int(value)
228  	
229  	    def _get_non_negative_int(self, key: str, default: int) -> int:
230  	        value = self.environ.get(key, default)
231  	        if not is_integer(value, at_least=0):
232  	            self.errors.append(
233  	                f"Value '{value}' for '{key}' is not a non-negative integer"
234  	            )
235  	            return default
236  	        return int(value)
237  	
238  	    @lru_cache(maxsize=1)
239  	    def pcsd_worker_count(self) -> int:
240  	        return self._get_positive_int(
241  	            PCSD_WORKER_COUNT, settings.pcsd_worker_count
242  	        )
243  	
244  	    @lru_cache(maxsize=1)
245  	    def pcsd_worker_reset_limit(self) -> int:
246  	        return self._get_positive_int(
247  	            PCSD_WORKER_RESET_LIMIT, settings.pcsd_worker_reset_limit
248  	        )
249  	
250  	    @lru_cache(maxsize=1)
251  	    def pcsd_max_worker_count(self) -> int:
252  	        return self._get_positive_int(
253  	            PCSD_MAX_WORKER_COUNT,
254  	            self.pcsd_worker_count() + settings.pcsd_temporary_workers,
255  	        )
256  	
257  	    @lru_cache(maxsize=1)
258  	    def pcsd_deadlock_threshold_timeout(self) -> int:
259  	        return self._get_non_negative_int(
260  	            PCSD_DEADLOCK_THRESHOLD_TIMEOUT,
261  	            settings.pcsd_deadlock_threshold_timeout,
262  	        )
263  	
264  	    @lru_cache(maxsize=1)
265  	    def pcsd_check_interval_ms(self) -> int:
266  	        return self._get_positive_int(
267  	            PCSD_CHECK_INTERVAL_MS, settings.async_api_scheduler_interval_ms
268  	        )
269  	
270  	    @lru_cache(maxsize=1)
271  	    def pcsd_task_abandoned_timeout(self) -> int:
272  	        return self._get_positive_int(
273  	            PCSD_TASK_ABANDONED_TIMEOUT, settings.task_abandoned_timeout_seconds
274  	        )
275  	
276  	    @lru_cache(maxsize=1)
277  	    def pcsd_task_unresponsive_timeout(self) -> int:
278  	        return self._get_positive_int(
279  	            PCSD_TASK_UNRESPONSIVE_TIMEOUT,
280  	            settings.task_unresponsive_timeout_seconds,
281  	        )
282  	
283  	    @lru_cache(maxsize=1)
284  	    def pcsd_task_deletion_timeout(self) -> int:
285  	        return self._get_non_negative_int(
286  	            PCSD_TASK_DELETION_TIMEOUT, settings.task_deletion_timeout_seconds
287  	        )
288  	
289  	    def __has_true_in_environ(self, environ_key):
290  	        return self.environ.get(environ_key, "").lower() == "true"
291