1    	from pprint import pformat
2    	from urllib.parse import urlencode
3    	
4    	from tornado.httputil import (
5    	    HTTPHeaders,
6    	    parse_cookie,
7    	)
8    	from tornado.testing import AsyncHTTPTestCase
9    	from tornado.web import Application
10   	
11   	from pcs.daemon import (
12   	    ruby_pcsd,
13   	    session,
14   	)
15   	from pcs.daemon.app.auth import PCSD_SESSION
16   	
# Fixed identity used by the helpers in this module.
# USER is the name passed to session.Storage.login() in
# AppUiTestMixin.create_login_session().
# GROUPS is not referenced in the visible part of this file; presumably it is
# the group list that importing tests associate with USER -- TODO confirm.
17   	USER = "user"
18   	GROUPS = ["group1", "group2"]
(1) Event Sigma main event: A secret, such as a password, cryptographic key, or token is stored in plaintext directly in the source code, in an application's properties, or configuration file. Users with access to the secret may then use the secret to access resources that they otherwise would not have access to. Secret type: Password (generic).
(2) Event remediation: Avoid setting sensitive configuration values as string literals. Instead, these values should be set using variables with the sensitive data loaded from an encrypted file or a secret store.
# NOTE(review): flagged by the static-analysis scanner as a hard-coded
# password. This module only defines tornado test-case helpers and the value
# is the literal word "password", so it looks like a dummy credential for
# tests rather than a real secret -- confirm that it is never used against a
# real account. It is not referenced in the visible part of this file.
19   	PASSWORD = "password"
20   	
21   	
class RubyPcsdWrapper(ruby_pcsd.Wrapper):
    """Stand-in for ``ruby_pcsd.Wrapper`` that answers with a canned response.

    The parent initializer is intentionally not called; the instance only
    carries the expected request type and the fixed response parts
    (``status_code``, ``headers``, ``body``) that ``run_ruby`` hands back.
    """

    def __init__(self, request_type):
        # pylint: disable=super-init-not-called
        self.request_type = request_type
        # Canned response parts; tests may read or overwrite these attributes.
        self.body = b"Success action"
        self.headers = {"Some": "value"}
        self.status_code = 200

    async def run_ruby(self, request_type, http_request=None, payload=None):
        """Return the canned response if ``request_type`` is the expected one.

        ``http_request`` and ``payload`` are accepted only to keep the
        signature and are discarded.

        Raises:
            AssertionError: ``request_type`` differs from the type given to
                the constructor.
        """
        del http_request, payload
        expected_type = self.request_type
        if request_type == expected_type:
            return dict(
                headers=self.headers,
                status=self.status_code,
                body=self.body,
            )
        message = (
            f"Wrong request type: expected '{expected_type}'"
            f" but was {request_type}"
        )
        raise AssertionError(message)
47   	
48   	
class AppTest(AsyncHTTPTestCase):
    """Base test case for tornado handlers of the pcs daemon.

    Subclasses supply the routes via ``get_routes`` and, when they use
    ``assert_wrappers_response``, set ``wrapper`` to an object exposing
    ``status_code``, ``headers`` and ``body``.
    """

    # Read by assert_wrappers_response(); stays None unless a subclass or a
    # test assigns an object with status_code / headers / body attributes.
    wrapper = None

    def get_app(self):
        """Build the tornado application from the routes of this test case."""
        return Application(self.get_routes())

    def get_routes(self):
        """Return the route list for the application; empty by default."""
        # pylint: disable=no-self-use
        return []

    def fetch(self, path, raise_error=False, **kwargs):
        """Fetch ``path`` and check the header required in every response.

        Redirects are not followed unless the caller passes
        ``follow_redirects`` explicitly. All other keyword arguments are
        forwarded to ``AsyncHTTPTestCase.fetch``.

        Returns:
            The response returned by ``AsyncHTTPTestCase.fetch``.
        """
        kwargs.setdefault("follow_redirects", False)
        response = super().fetch(path, raise_error=raise_error, **kwargs)
        # "Strict-Transport-Security" header is expected in every response
        self.assertIn(
            "Strict-Transport-Security",
            response.headers,
            f"No 'Strict-Transport-Security' header in response for '{path}'",
        )
        return response

    def post(self, path, body, **kwargs):
        """Send a POST request to ``path`` with ``body`` url-encoded.

        Any ``method`` or ``body`` present in ``kwargs`` is overridden.
        """
        kwargs.update(
            {
                "method": "POST",
                "body": urlencode(body),
            }
        )
        return self.fetch(path, **kwargs)

    def get(self, path, **kwargs):
        """Send a request to ``path`` through ``fetch`` with no changes."""
        return self.fetch(path, **kwargs)

    def assert_headers_contains(self, headers: HTTPHeaders, contained: dict):
        """Assert that every ``(name, value)`` of ``contained`` is in headers.

        Args:
            headers: response headers to inspect.
            contained: expected header names mapped to expected values; the
                pairs are compared as-is with the pairs yielded by
                ``headers.get_all()``.
        """
        # Materialize the pairs once: get_all() would otherwise be re-run for
        # every expected item, and a list (unlike dict()) keeps repeated
        # headers visible in the failure message.
        all_headers = list(headers.get_all())
        self.assertTrue(
            all(item in all_headers for item in contained.items()),
            "Headers does not contain expected headers"
            "\n  Expected headers:"
            f"\n    {pformat(contained, indent=6)}"
            "\n  All headers:"
            f"\n    {pformat(all_headers, indent=6)}",
        )

    def assert_wrappers_response(self, response):
        """Assert that ``response`` matches the canned parts of ``wrapper``.

        Status code and body must be equal; the wrapper's headers only have
        to be contained in the response headers.
        """
        self.assertEqual(response.code, self.wrapper.status_code)
        self.assert_headers_contains(response.headers, self.wrapper.headers)
        self.assertEqual(response.body, self.wrapper.body)
96   	
97   	
class AppUiTestMixin(AppTest):
    """Test-case base for handlers that work with pcsd sessions.

    Adds a ``session.Storage`` instance, the ``sid`` / ``is_ajax`` keyword
    arguments of ``fetch`` and assertion helpers for session cookies and
    common responses.
    """

    def setUp(self):
        # Created before super().setUp() so the storage already exists when
        # tornado's AsyncHTTPTestCase.setUp() calls get_app(); presumably
        # routes defined by subclasses use it -- confirm against subclasses.
        self.session_storage = session.Storage(lifetime_seconds=10)
        super().setUp()

    def assert_session_in_response(self, response, sid=None):
        """Assert that ``response`` sets the session cookie and return its id.

        Args:
            response: response whose "Set-Cookie" header is inspected.
            sid: when truthy, the cookie value must equal it.

        Returns:
            The value of the ``PCSD_SESSION`` cookie.
        """
        self.assertIn("Set-Cookie", response.headers)
        cookie = parse_cookie(response.headers["Set-Cookie"])
        # Must be assertIn: assertTrue(PCSD_SESSION, cookie) only checked that
        # the constant is truthy and used the cookie as the failure message,
        # so a missing session cookie was never reported by this assertion.
        self.assertIn(PCSD_SESSION, cookie)
        session_id = cookie[PCSD_SESSION]
        if sid:
            self.assertEqual(session_id, sid)
        return session_id

    def fetch(self, path, raise_error=False, **kwargs):
        """Fetch ``path``; supports the extra keywords ``sid`` and ``is_ajax``.

        ``sid``: when the keyword is present, its value is sent as the
        ``PCSD_SESSION`` cookie.
        ``is_ajax``: when the keyword is present, the request carries
        "X-Requested-With: XMLHttpRequest". Only the presence of the keyword
        matters, so passing ``is_ajax=False`` still adds the header.

        Both keywords are removed before the remaining arguments are passed
        to ``AppTest.fetch``. A ``headers`` mapping supplied by the caller is
        updated in place.
        """
        if "sid" in kwargs:
            sid = kwargs.pop("sid")
            headers = kwargs.setdefault("headers", {})
            headers["Cookie"] = f"{PCSD_SESSION}={sid}"

        if "is_ajax" in kwargs:
            del kwargs["is_ajax"]
            headers = kwargs.setdefault("headers", {})
            headers["X-Requested-With"] = "XMLHttpRequest"

        kwargs.setdefault("follow_redirects", False)

        return super().fetch(path, raise_error=raise_error, **kwargs)

    def create_login_session(self):
        """Log ``USER`` in to the session storage and return the result."""
        return self.session_storage.login(USER)

    def assert_success_response(self, response, expected_body):
        """Assert status 200 and that the decoded body equals the string."""
        self.assertEqual(response.code, 200)
        self.assertEqual(response.body.decode(), expected_body)

    def assert_unauth_ajax(self, response):
        """Assert status 401 with the JSON "notauthorized" body."""
        self.assertEqual(response.code, 401)
        self.assertEqual(response.body, b'{"notauthorized":"true"}')
139