1    	# ruff: noqa: B019 https://docs.astral.sh/ruff/rules/cached-instance-method/
2    	import os.path
3    	import ssl
4    	from collections import namedtuple
5    	from functools import lru_cache
6    	
7    	from pcs import settings
8    	from pcs.common.validate import (
9    	    is_integer,
10   	    is_port_number,
11   	)
12   	
13   	try:
14   	    from pcs.daemon.app import webui
15   	except ImportError:
16   	    webui = None
17   	
18   	# Relative location instead of system location is used for development purposes.
19   	LOCAL_PUBLIC_DIR = os.path.realpath(
20   	    os.path.dirname(os.path.abspath(__file__)) + "/../../pcsd/public"
21   	)
22   	LOCAL_WEBUI_DIR = os.path.join(LOCAL_PUBLIC_DIR, "ui")
23   	WEBUI_FALLBACK_FILE = "ui_instructions.html"
24   	
25   	PCSD_PORT = "PCSD_PORT"
26   	PCSD_SSL_CIPHERS = "PCSD_SSL_CIPHERS"
27   	PCSD_SSL_OPTIONS = "PCSD_SSL_OPTIONS"
28   	PCSD_BIND_ADDR = "PCSD_BIND_ADDR"
29   	NOTIFY_SOCKET = "NOTIFY_SOCKET"
30   	PCSD_DEBUG = "PCSD_DEBUG"
31   	PCSD_SESSION_LIFETIME = "PCSD_SESSION_LIFETIME"
32   	PCSD_DEV = "PCSD_DEV"
33   	WEBUI_DIR = "WEBUI_DIR"
34   	WEBUI_FALLBACK = "WEBUI_FALLBACK"
35   	PCSD_WORKER_COUNT = "PCSD_WORKER_COUNT"
36   	PCSD_WORKER_RESET_LIMIT = "PCSD_WORKER_RESET_LIMIT"
37   	PCSD_MAX_WORKER_COUNT = "PCSD_MAX_WORKER_COUNT"
38   	PCSD_DEADLOCK_THRESHOLD_TIMEOUT = "PCSD_DEADLOCK_THRESHOLD_TIMEOUT"
39   	PCSD_CHECK_INTERVAL_MS = "PCSD_CHECK_INTERVAL_MS"
40   	PCSD_TASK_ABANDONED_TIMEOUT = "PCSD_TASK_ABANDONED_TIMEOUT"
41   	PCSD_TASK_UNRESPONSIVE_TIMEOUT = "PCSD_TASK_UNRESPONSIVE_TIMEOUT"
42   	PCSD_TASK_DELETION_TIMEOUT = "PCSD_TASK_DELETION_TIMEOUT"
43   	
44   	Env = namedtuple(
45   	    "Env",
46   	    [
47   	        PCSD_PORT,
48   	        PCSD_SSL_CIPHERS,
49   	        PCSD_SSL_OPTIONS,
50   	        PCSD_BIND_ADDR,
51   	        NOTIFY_SOCKET,
52   	        PCSD_DEBUG,
53   	        PCSD_SESSION_LIFETIME,
54   	        WEBUI_DIR,
55   	        WEBUI_FALLBACK,
56   	        PCSD_DEV,
57   	        PCSD_WORKER_COUNT,
58   	        PCSD_WORKER_RESET_LIMIT,
59   	        PCSD_MAX_WORKER_COUNT,
60   	        PCSD_DEADLOCK_THRESHOLD_TIMEOUT,
61   	        PCSD_CHECK_INTERVAL_MS,
62   	        PCSD_TASK_ABANDONED_TIMEOUT,
63   	        PCSD_TASK_UNRESPONSIVE_TIMEOUT,
64   	        PCSD_TASK_DELETION_TIMEOUT,
65   	        "has_errors",
66   	    ],
67   	)
68   	
69   	
70   	def prepare_env(environ, logger=None):
71   	    loader = EnvLoader(environ)
72   	    loader.check_webui()
73   	    env = Env(
74   	        loader.port(),
75   	        loader.ssl_ciphers(),
76   	        loader.ssl_options(),
77   	        loader.bind_addresses(),
78   	        loader.notify_socket(),
79   	        loader.pcsd_debug(),
80   	        loader.session_lifetime(),
81   	        loader.webui_dir(),
82   	        loader.webui_fallback(),
83   	        loader.pcsd_dev(),
84   	        loader.pcsd_worker_count(),
85   	        loader.pcsd_worker_reset_limit(),
86   	        loader.pcsd_max_worker_count(),
87   	        loader.pcsd_deadlock_threshold_timeout(),
88   	        loader.pcsd_check_interval_ms(),
89   	        loader.pcsd_task_abandoned_timeout(),
90   	        loader.pcsd_task_unresponsive_timeout(),
91   	        loader.pcsd_task_deletion_timeout(),
92   	        loader.has_errors(),
93   	    )
94   	    if logger:
95   	        for error in loader.errors:
96   	            logger.error(error)
97   	        for warning in loader.warnings:
98   	            logger.warning(warning)
99   	    return env
100  	
101  	
102  	def str_to_ssl_options(ssl_options_string, reports):
103  	    ssl_options = 0
104  	    # We are tolerant to trailing whitespaces and trailing comas.
105  	    raw_ssl_options = ssl_options_string.strip(" ,")
106  	    if not raw_ssl_options:  # raw_ssl_options.split(",") == [""]
107  	        return ssl_options
108  	    for raw_option in raw_ssl_options.split(","):
109  	        option = raw_option.strip()
110  	        if option == "OP_NO_RENEGOTIATION" and not hasattr(ssl, option):
111  	            # OP_NO_RENEGOTIATION is for a prevention of DoS attacks.
112  	            # See https://bugzilla.redhat.com/show_bug.cgi?id=1566430
113  	            #
114  	            # OP_NO_RENEGOTIATION is new in python 3.7. `pcs` supports python
115  	            # 3.6+ but even with python 3.6 it is possible to use this option if
116  	            # the underlying openssl has version 1.1.0h+.
117  	            ssl_options |= 1073741824
118  	        elif option.startswith("OP_") and hasattr(ssl, option):
119  	            ssl_options |= getattr(ssl, option)
120  	        else:
121  	            reports.append(f"Ignoring unknown SSL option '{option}'")
122  	    return ssl_options
123  	
124  	
125  	class EnvLoader:
126  	    # pylint: disable=too-many-public-methods
127  	    def __init__(self, environ):
128  	        self.environ = environ
129  	        self.errors = []
130  	        self.warnings = []
131  	
132  	    def has_errors(self):
133  	        return len(self.errors) > 0
134  	
135  	    @lru_cache(maxsize=5)
136  	    def port(self):
137  	        port = self.environ.get(PCSD_PORT, settings.pcsd_default_port)
138  	        if not is_port_number(port):
139  	            self.errors.append(f"Invalid port number '{port}', use 1..65535")
140  	        return port
141  	
142  	    def ssl_ciphers(self):
143  	        ssl_ciphers = self.environ.get(
144  	            PCSD_SSL_CIPHERS, settings.default_ssl_ciphers
145  	        )
146  	        try:  # validate ciphers
(1) Event Sigma main event: Certificate validation has been disabled for the `ssl.SSLContext` instance. This allows for the acceptance of a rogue certificate and the potential for a manipulator-in-the-middle (MITM) attack.
(2) Event remediation: Modify the `ssl.SSLContext` instance to enable certificate validation: * Explicitly set `ssl.SSLContext.verify_mode` to `ssl.CERT_REQUIRED` or set `ssl.SSLContext.check_hostname` to `True` as enabling `check_hostname` will automatically change the default from `CERT_NONE` to `CERT_REQUIRED`. * Explicitly set `protocol` to `ssl.PROTOCOL_TLS_SERVER` or `ssl.PROTOCOL_TLS_CLIENT`.
147  	            ssl.SSLContext().set_ciphers(ssl_ciphers)
148  	        except ssl.SSLError as e:
149  	            self.errors.append(f"Invalid ciphers: '{e}'")
150  	        return ssl_ciphers
151  	
152  	    def ssl_options(self):
153  	        if PCSD_SSL_OPTIONS in self.environ:
154  	            # User knows about underlying system. If there is a wrong option it
155  	            # may be a typo - let them correct it. They are able to correct it
156  	            # in pcsd.conf.
157  	            return str_to_ssl_options(
158  	                self.environ[PCSD_SSL_OPTIONS], self.errors
159  	            )
160  	        # Vanilla pcsd should run even on an "exotic" system. If there is
161  	        # a wrong option it is not probably a typo... User should not be
162  	        # forced to modify source code (settings.py).
163  	        return str_to_ssl_options(settings.default_ssl_options, self.warnings)
164  	
165  	    def bind_addresses(self):
166  	        if PCSD_BIND_ADDR not in self.environ:
167  	            return {None}
168  	
169  	        raw_bind_addresses = self.environ[PCSD_BIND_ADDR]
170  	        if not raw_bind_addresses.strip():
171  	            return {""}
172  	
173  	        return {a.strip() for a in raw_bind_addresses.split(",")}
174  	
175  	    def notify_socket(self):
176  	        return self.environ.get(NOTIFY_SOCKET, None)
177  	
178  	    def session_lifetime(self):
179  	        session_lifetime = self.environ.get(
180  	            PCSD_SESSION_LIFETIME, settings.gui_session_lifetime_seconds
181  	        )
182  	        try:
183  	            return int(session_lifetime)
184  	        except ValueError:
185  	            self.errors.append(
186  	                f"Invalid PCSD_SESSION_LIFETIME value '{session_lifetime}'"
187  	                " (it must be an integer)"
188  	            )
189  	            return session_lifetime
190  	
191  	    def pcsd_debug(self):
192  	        return self.__has_true_in_environ(PCSD_DEBUG)
193  	
194  	    def check_webui(self):
195  	        if webui and not (
196  	            os.path.exists(self.webui_dir())
197  	            or os.path.exists(self.webui_fallback())
198  	        ):
199  	            self.errors.append(
200  	                f"Webui assets directory '{self.webui_dir()}'"
201  	                + f" or fallback html '{self.webui_fallback()}' does not exist"
202  	            )
203  	
204  	    @lru_cache(maxsize=5)
205  	    def webui_dir(self):
206  	        return LOCAL_WEBUI_DIR if self.pcsd_dev() else settings.pcsd_webui_dir
207  	
208  	    @lru_cache(maxsize=5)
209  	    def webui_fallback(self):
210  	        return os.path.join(
211  	            LOCAL_PUBLIC_DIR if self.pcsd_dev() else settings.pcsd_public_dir,
212  	            WEBUI_FALLBACK_FILE,
213  	        )
214  	
215  	    @lru_cache(maxsize=5)
216  	    def pcsd_dev(self):
217  	        return self.__has_true_in_environ(PCSD_DEV)
218  	
219  	    def _get_positive_int(self, key: str, default: int) -> int:
220  	        value = self.environ.get(key, default)
221  	        if not is_integer(value, at_least=1):
222  	            self.errors.append(
223  	                f"Value '{value}' for '{key}' is not a positive integer"
224  	            )
225  	            return default
226  	        return int(value)
227  	
228  	    def _get_non_negative_int(self, key: str, default: int) -> int:
229  	        value = self.environ.get(key, default)
230  	        if not is_integer(value, at_least=0):
231  	            self.errors.append(
232  	                f"Value '{value}' for '{key}' is not a non-negative integer"
233  	            )
234  	            return default
235  	        return int(value)
236  	
237  	    @lru_cache(maxsize=1)
238  	    def pcsd_worker_count(self) -> int:
239  	        return self._get_positive_int(
240  	            PCSD_WORKER_COUNT, settings.pcsd_worker_count
241  	        )
242  	
243  	    @lru_cache(maxsize=1)
244  	    def pcsd_worker_reset_limit(self) -> int:
245  	        return self._get_positive_int(
246  	            PCSD_WORKER_RESET_LIMIT, settings.pcsd_worker_reset_limit
247  	        )
248  	
249  	    @lru_cache(maxsize=1)
250  	    def pcsd_max_worker_count(self) -> int:
251  	        return self._get_positive_int(
252  	            PCSD_MAX_WORKER_COUNT,
253  	            self.pcsd_worker_count() + settings.pcsd_temporary_workers,
254  	        )
255  	
256  	    @lru_cache(maxsize=1)
257  	    def pcsd_deadlock_threshold_timeout(self) -> int:
258  	        return self._get_non_negative_int(
259  	            PCSD_DEADLOCK_THRESHOLD_TIMEOUT,
260  	            settings.pcsd_deadlock_threshold_timeout,
261  	        )
262  	
263  	    @lru_cache(maxsize=1)
264  	    def pcsd_check_interval_ms(self) -> int:
265  	        return self._get_positive_int(
266  	            PCSD_CHECK_INTERVAL_MS, settings.async_api_scheduler_interval_ms
267  	        )
268  	
269  	    @lru_cache(maxsize=1)
270  	    def pcsd_task_abandoned_timeout(self) -> int:
271  	        return self._get_positive_int(
272  	            PCSD_TASK_ABANDONED_TIMEOUT, settings.task_abandoned_timeout_seconds
273  	        )
274  	
275  	    @lru_cache(maxsize=1)
276  	    def pcsd_task_unresponsive_timeout(self) -> int:
277  	        return self._get_positive_int(
278  	            PCSD_TASK_UNRESPONSIVE_TIMEOUT,
279  	            settings.task_unresponsive_timeout_seconds,
280  	        )
281  	
282  	    @lru_cache(maxsize=1)
283  	    def pcsd_task_deletion_timeout(self) -> int:
284  	        return self._get_non_negative_int(
285  	            PCSD_TASK_DELETION_TIMEOUT, settings.task_deletion_timeout_seconds
286  	        )
287  	
288  	    def __has_true_in_environ(self, environ_key):
289  	        return self.environ.get(environ_key, "").lower() == "true"
290