1    	from pprint import pformat
2    	from urllib.parse import urlencode
3    	
4    	from tornado.httputil import HTTPHeaders
5    	from tornado.testing import AsyncHTTPTestCase
6    	from tornado.web import Application
7    	
8    	from pcs.daemon import ruby_pcsd
9    	
# Identity fixtures shared by the test helpers in this module.
# NOTE(review): presumably imported by the individual test modules; nothing
# in this file reads them directly.
USER = "user"
GROUPS = ["group1", "group2"]
(1) Event Sigma main event: A secret, such as a password, cryptographic key, or token is stored in plaintext directly in the source code, in an application's properties, or configuration file. Users with access to the secret may then use the secret to access resources that they otherwise would not have access to. Secret type: Password (generic).
(2) Event remediation: Avoid setting sensitive configuration values as string literals. Instead, these values should be set using variables with the sensitive data loaded from an encrypted file or a secret store.
# NOTE(review): looks like a placeholder credential for tests rather than a
# real secret; confirm it is never used against a live service. Static
# analysers are expected to flag this literal as a hard-coded password.
PASSWORD = "password"
13   	
14   	
class RubyPcsdWrapper(ruby_pcsd.Wrapper):
    """Stand-in for ``ruby_pcsd.Wrapper`` that answers with a canned response.

    The response is taken from the public ``status_code``, ``headers`` and
    ``body`` attributes. Each ``run_ruby`` call is recorded so that the
    ``was_run_ruby_called`` and ``run_ruby_payload`` properties can report it.
    """

    def __init__(self, request_type):
        # pylint: disable=super-init-not-called
        # Call-tracking state, updated by run_ruby.
        self._run_ruby_payload = None
        self._run_ruby_called = False

        # Canned response handed back by run_ruby.
        self.body = b"Success action"
        self.headers = {"Some": "value"}
        self.status_code = 200

        # The only request type run_ruby accepts.
        self.request_type = request_type

    async def run_ruby(
        self,
        request_type,
        http_request=None,
        payload=None,
    ):
        """Record the call and return the canned response.

        The call and its ``payload`` are recorded before ``request_type`` is
        validated, so a rejected call is still visible through the
        properties.

        Returns a dict with the keys ``headers``, ``status`` and ``body``.

        Raises AssertionError when ``request_type`` differs from the one
        given to the constructor.
        """
        # The request object is accepted for interface parity but never read.
        del http_request
        self._run_ruby_payload = payload
        self._run_ruby_called = True

        if request_type == self.request_type:
            return dict(
                headers=self.headers,
                status=self.status_code,
                body=self.body,
            )

        raise AssertionError(
            "Wrong request type: "
            f"expected '{self.request_type}' but was {request_type}"
        )

    @property
    def was_run_ruby_called(self) -> bool:
        """Whether ``run_ruby`` has been invoked on this instance."""
        return self._run_ruby_called

    @property
    def run_ruby_payload(self):
        """Payload of the most recent ``run_ruby`` call (``None`` initially)."""
        return self._run_ruby_payload
53   	
54   	
class AppTest(AsyncHTTPTestCase):
    """Base HTTP test case with request helpers and shared assertions.

    ``get_app`` builds the tornado ``Application`` from ``get_routes()``,
    which returns no routes here and can be overridden. ``wrapper`` must be
    set to an object exposing ``status_code``, ``headers`` and ``body``
    before ``assert_wrappers_response`` is used.
    """

    wrapper = None

    def get_app(self):
        """Build the application under test from ``get_routes()``."""
        return Application(self.get_routes())

    def get_routes(self):
        # pylint: disable=no-self-use
        """Return the route list for the application; empty by default."""
        return []

    def fetch(self, path, raise_error=False, **kwargs):
        """Fetch ``path`` and check that the response carries an HSTS header.

        Redirects are not followed unless ``follow_redirects`` is passed.
        The helper-only keyword ``is_ajax`` is consumed here: when truthy, an
        ``X-Requested-With: XMLHttpRequest`` header is added to the request.
        All remaining keyword arguments are forwarded to the parent ``fetch``.

        Returns the response object produced by the parent ``fetch``.
        """
        kwargs.setdefault("follow_redirects", False)

        # Always pop the flag so it never reaches the parent fetch, and honour
        # its value: is_ajax=False must not mark the request as AJAX.
        if kwargs.pop("is_ajax", False):
            # Work on a copy so the caller's headers object is not mutated;
            # the "or {}" also covers an explicit headers=None.
            headers = HTTPHeaders(kwargs.get("headers") or {})
            headers["X-Requested-With"] = "XMLHttpRequest"
            kwargs["headers"] = headers

        response = super().fetch(path, raise_error=raise_error, **kwargs)
        # "Strict-Transport-Security" header is expected in every response
        self.assertIn(
            "Strict-Transport-Security",
            response.headers,
            f"No 'Strict-Transport-Security' header in response for '{path}'",
        )
        return response

    def post(self, path, body, **kwargs):
        """Send ``body`` url-encoded as a POST request to ``path``.

        Any ``method`` or ``body`` entry in ``kwargs`` is overridden; other
        keyword arguments are passed through to ``fetch``.
        """
        kwargs.update(method="POST", body=urlencode(body))
        return self.fetch(path, **kwargs)

    def get(self, path, **kwargs):
        """Fetch ``path`` via ``fetch``, forwarding ``kwargs`` unchanged."""
        return self.fetch(path, **kwargs)

    def assert_headers_contains(self, headers: HTTPHeaders, contained: dict):
        """Assert every ``(name, value)`` pair of ``contained`` is in ``headers``.

        Pairs are compared against the items yielded by ``headers.get_all()``.
        """
        # Materialise once: get_all() is consulted for every expected pair
        # and again for the failure message.
        present = list(headers.get_all())
        self.assertTrue(
            all(item in present for item in contained.items()),
            "Headers does not contain expected headers"
            "\n  Expected headers:"
            f"\n    {pformat(contained, indent=6)}"
            "\n  All headers:"
            f"\n    {pformat(dict(present), indent=6)}",
        )

    def assert_wrappers_response(self, response):
        """Assert ``response`` matches the status, headers and body of ``wrapper``."""
        self.assertEqual(response.code, self.wrapper.status_code)
        self.assert_headers_contains(response.headers, self.wrapper.headers)
        self.assertEqual(response.body, self.wrapper.body)

    def assert_unauth_ajax(self, response):
        """Assert ``response`` is a 401 with the JSON "notauthorized" body."""
        self.assertEqual(response.code, 401)
        self.assertEqual(response.body, b'{"notauthorized":"true"}')
113