1 from pprint import pformat
2 from urllib.parse import urlencode
3
4 from tornado.httputil import (
5 HTTPHeaders,
6 parse_cookie,
7 )
8 from tornado.testing import AsyncHTTPTestCase
9 from tornado.web import Application
10
11 from pcs.daemon import (
12 ruby_pcsd,
13 session,
14 )
15 from pcs.daemon.app.auth import PCSD_SESSION
16
# Login name handed to the session storage when a test needs a logged-in
# session (see AppUiTestMixin.create_login_session).
USER = "user"
# Group names accompanying USER. Not referenced in the visible part of this
# module -- presumably consumed by test modules importing it (TODO confirm).
GROUPS = ["group1", "group2"]
|
(1) Event Sigma main event: |
A secret, such as a password, cryptographic key, or token is stored in plaintext directly in the source code, in an application's properties, or configuration file. Users with access to the secret may then use the secret to access resources that they otherwise would not have access to. Secret type: Password (generic). |
|
(2) Event remediation: |
Avoid setting sensitive configuration values as string literals. Instead, these values should be set using variables with the sensitive data loaded from an encrypted file or a secret store. |
# Password constant flagged by static analysis as a hardcoded secret.
# NOTE(review): presumably a dummy value used only by tests -- confirm it
# never matches a real credential.
PASSWORD = "password"
20
21
class RubyPcsdWrapper(ruby_pcsd.Wrapper):
    """Replacement for ``ruby_pcsd.Wrapper`` that answers with a fixed result.

    The instance remembers which request type it expects. ``run_ruby``
    returns the canned ``headers`` / ``status`` / ``body`` triple for that
    type and raises ``AssertionError`` for any other type.
    """

    def __init__(self, request_type):
        # pylint: disable=super-init-not-called
        # The parent initializer is deliberately not invoked; only the
        # attributes read by run_ruby are prepared here.
        self.body = b"Success action"
        self.headers = {"Some": "value"}
        self.status_code = 200
        self.request_type = request_type

    async def run_ruby(
        self,
        request_type,
        http_request=None,
        payload=None,
    ):
        """Return the canned response if ``request_type`` is the expected one.

        ``http_request`` and ``payload`` are accepted for signature
        compatibility and discarded.

        Raises:
            AssertionError: ``request_type`` differs from the type given to
                the constructor.
        """
        del http_request, payload
        if request_type == self.request_type:
            return {
                "headers": self.headers,
                "status": self.status_code,
                "body": self.body,
            }
        raise AssertionError(
            f"Wrong request type: expected '{self.request_type}'"
            f" but was {request_type}"
        )
47
48
class AppTest(AsyncHTTPTestCase):
    """Base test case serving the routes from ``get_routes`` in an Application.

    Adds request helpers (``get`` / ``post``) that do not follow redirects by
    default, and assertions for comparing a response with ``self.wrapper``.
    """

    # Must be set to an object exposing ``status_code``, ``headers`` and
    # ``body`` before ``assert_wrappers_response`` is used.
    wrapper = None

    def get_app(self):
        """Build the application under test from ``get_routes``."""
        routes = self.get_routes()
        return Application(routes)

    def get_routes(self):
        """Return the route list for the application; empty by default."""
        # pylint: disable=no-self-use
        return []

    def fetch(self, path, raise_error=False, **kwargs):
        """Fetch ``path`` and require an HSTS header in the response.

        ``follow_redirects`` defaults to ``False`` unless the caller passes
        it explicitly.
        """
        kwargs.setdefault("follow_redirects", False)
        response = super().fetch(path, raise_error=raise_error, **kwargs)
        # "Strict-Transport-Security" header is expected in every response
        has_hsts = "Strict-Transport-Security" in response.headers
        self.assertTrue(
            has_hsts,
            f"No 'Strict-Transport-Security' header in response for '{path}'",
        )
        return response

    def post(self, path, body, **kwargs):
        """Send ``body`` url-encoded in a POST request to ``path``."""
        kwargs["method"] = "POST"
        kwargs["body"] = urlencode(body)
        return self.fetch(path, **kwargs)

    def get(self, path, **kwargs):
        """Send a request to ``path`` using the defaults of ``fetch``."""
        return self.fetch(path, **kwargs)

    def assert_headers_contains(self, headers: HTTPHeaders, contained: dict):
        """Assert every ``(name, value)`` item of ``contained`` is in ``headers``."""
        actual_pairs = list(headers.get_all())
        failure_report = (
            "Headers does not contain expected headers"
            "\n Expected headers:"
            f"\n {pformat(contained, indent=6)}"
            "\n All headers:"
            f"\n {pformat(dict(actual_pairs), indent=6)}"
        )
        nothing_missing = all(
            expected in actual_pairs for expected in contained.items()
        )
        self.assertTrue(nothing_missing, failure_report)

    def assert_wrappers_response(self, response):
        """Assert ``response`` carries the wrapper's status, headers and body."""
        expected = self.wrapper
        self.assertEqual(response.code, expected.status_code)
        self.assert_headers_contains(response.headers, expected.headers)
        self.assertEqual(response.body, expected.body)
96
97
class AppUiTestMixin(AppTest):
    """``AppTest`` extension with session-cookie and AJAX request helpers.

    Each test gets its own ``session.Storage`` in ``self.session_storage``.
    ``fetch`` understands two extra pseudo-arguments, ``sid`` and
    ``is_ajax``, which are turned into request headers.
    """

    def setUp(self):
        # The storage is created before super().setUp(); tornado's
        # AsyncHTTPTestCase.setUp builds the application via get_app(), so
        # presumably subclasses' routes need the storage to exist by then.
        self.session_storage = session.Storage(lifetime_seconds=10)
        super().setUp()

    def assert_session_in_response(self, response, sid=None):
        """Assert the response sets the session cookie and return its value.

        Args:
            response: response whose ``Set-Cookie`` header is inspected.
            sid: when truthy, the cookie value must equal it.

        Returns:
            The session id found in the cookie.
        """
        self.assertIn("Set-Cookie", response.headers)
        cookie = parse_cookie(response.headers["Set-Cookie"])
        # This used to be ``assertTrue(PCSD_SESSION, cookie)``, which only
        # checked the constant's truthiness and so could never fail.
        self.assertIn(PCSD_SESSION, cookie)
        if sid:
            self.assertEqual(cookie[PCSD_SESSION], sid)
        return cookie[PCSD_SESSION]

    def fetch(self, path, raise_error=False, **kwargs):
        """Fetch ``path``, translating ``sid`` / ``is_ajax`` into headers.

        Pseudo-arguments (removed from ``kwargs`` before delegating):
            sid: sent as the session cookie.
            is_ajax: adds ``X-Requested-With: XMLHttpRequest``. The header
                is added whenever the key is present, whatever its value.

        A ``headers`` mapping supplied by the caller is updated in place.
        ``follow_redirects`` defaults to ``False``.
        """
        if "sid" in kwargs:
            headers = kwargs.setdefault("headers", {})
            headers["Cookie"] = f"{PCSD_SESSION}={kwargs.pop('sid')}"

        if "is_ajax" in kwargs:
            headers = kwargs.setdefault("headers", {})
            headers["X-Requested-With"] = "XMLHttpRequest"
            del kwargs["is_ajax"]

        kwargs.setdefault("follow_redirects", False)

        return super().fetch(path, raise_error=raise_error, **kwargs)

    def create_login_session(self):
        """Log ``USER`` in to the session storage and return the result."""
        return self.session_storage.login(USER)

    def assert_success_response(self, response, expected_body):
        """Assert a 200 response whose decoded body equals ``expected_body``."""
        self.assertEqual(response.code, 200)
        self.assertEqual(response.body.decode(), expected_body)

    def assert_unauth_ajax(self, response):
        """Assert a 401 response carrying the JSON "notauthorized" body."""
        self.assertEqual(response.code, 401)
        self.assertEqual(response.body, b'{"notauthorized":"true"}')
139