1 from pprint import pformat
2 from urllib.parse import urlencode
3
4 from tornado.httputil import HTTPHeaders
5 from tornado.testing import AsyncHTTPTestCase
6 from tornado.web import Application
7
8 from pcs.daemon import ruby_pcsd
9
# Sample user name and group list. Neither is referenced in the visible part
# of this file -- presumably they are imported by the test modules that use
# these fixtures (TODO confirm).
10 USER = "user"
11 GROUPS = ["group1", "group2"]
|
(1) Event Sigma main event: |
A secret, such as a password, cryptographic key, or token, is stored in plaintext directly in the source code, in an application's properties, or in a configuration file. Users with access to the secret may then use it to access resources that they would otherwise not have access to. Secret type: Password (generic). |
|
(2) Event remediation: |
Avoid setting sensitive configuration values as string literals. Instead, these values should be set using variables with the sensitive data loaded from an encrypted file or a secret store. |
# NOTE(review): literal placeholder value; presumably a dummy credential used
# only by tests rather than a real secret -- confirm before dismissing the
# hard-coded-password finding reported for this line.
12 PASSWORD = "password"
13
14
class RubyPcsdWrapper(ruby_pcsd.Wrapper):
    """
    Stand-in for ``ruby_pcsd.Wrapper`` that answers with a canned response.

    The parent initializer is deliberately not called, so only the attributes
    assigned in ``__init__`` exist on an instance. ``status_code``, ``headers``
    and ``body`` are read when ``run_ruby`` is awaited, so they can be
    reassigned beforehand to shape the response.
    """

    def __init__(self, request_type):
        # pylint: disable=super-init-not-called
        # bookkeeping about the most recent run_ruby call
        self._run_ruby_payload = None
        self._run_ruby_called = False

        # pieces of the canned response returned by run_ruby
        self.body = b"Success action"
        self.headers = {"Some": "value"}
        self.status_code = 200

        # the only request type run_ruby accepts
        self.request_type = request_type

    async def run_ruby(
        self,
        request_type,
        http_request=None,
        payload=None,
    ):
        """
        Record the call and hand back the canned response.

        request_type -- has to equal the type given to the constructor
        http_request -- accepted but discarded
        payload -- stored as is, available through ``run_ruby_payload``

        Raises AssertionError when ``request_type`` differs from the expected
        one. The call and its payload are recorded before that check, so they
        are visible even when the check fails.
        """
        del http_request
        self._run_ruby_payload = payload
        self._run_ruby_called = True

        if request_type != self.request_type:
            message = (
                f"Wrong request type: expected '{self.request_type}'"
                f" but was {request_type}"
            )
            raise AssertionError(message)

        response = {
            "headers": self.headers,
            "status": self.status_code,
            "body": self.body,
        }
        return response

    @property
    def was_run_ruby_called(self) -> bool:
        """True once ``run_ruby`` has been awaited at least once."""
        return self._run_ruby_called

    @property
    def run_ruby_payload(self):
        """Payload of the most recent ``run_ruby`` call, None before any."""
        return self._run_ruby_payload
53
54
class AppTest(AsyncHTTPTestCase):
    """
    Test case with helpers for exercising a tornado ``Application``.

    Subclasses supply routes through ``get_routes``. Those which call
    ``assert_wrappers_response`` must also set ``wrapper`` to an object
    exposing ``status_code``, ``headers`` and ``body``.
    """

    # Left as None here; read only by assert_wrappers_response
    wrapper = None

    def get_app(self):
        """Build the application under test from ``get_routes``."""
        return Application(self.get_routes())

    def get_routes(self):
        """Return routes of the tested application; none by default."""
        # pylint: disable=no-self-use
        return []

    def fetch(self, path, raise_error=False, **kwargs):
        """
        Fetch ``path`` and verify the response carries an HSTS header.

        Differences from the parent's ``fetch``:
        * ``follow_redirects`` defaults to False
        * ``is_ajax`` is consumed here and never passed on; when truthy, the
          ``X-Requested-With: XMLHttpRequest`` header is added to the request

        path -- url path to fetch
        raise_error -- passed to the parent's ``fetch`` unchanged
        kwargs -- remaining arguments for the parent's ``fetch``

        Returns the response. Fails the test when the response has no
        ``Strict-Transport-Security`` header.
        """
        kwargs.setdefault("follow_redirects", False)

        # Always pop the flag so the parent's fetch never sees it, and honor
        # its value: is_ajax=False must not produce an ajax request.
        if kwargs.pop("is_ajax", False):
            # Work on a copy so that a headers mapping owned (and possibly
            # reused) by the caller is not modified; None means no headers.
            headers = kwargs.get("headers")
            headers = {} if headers is None else headers.copy()
            headers["X-Requested-With"] = "XMLHttpRequest"
            kwargs["headers"] = headers

        response = super().fetch(path, raise_error=raise_error, **kwargs)
        # "Strict-Transport-Security" header is expected in every response
        self.assertIn(
            "Strict-Transport-Security",
            response.headers,
            f"No 'Strict-Transport-Security' header in response for '{path}'",
        )
        return response

    def post(self, path, body, **kwargs):
        """
        Send a POST request with ``body`` url-encoded; see ``fetch``.

        Any ``method`` or ``body`` given in ``kwargs`` is overridden.
        """
        kwargs.update(
            {
                "method": "POST",
                "body": urlencode(body),
            }
        )
        return self.fetch(path, **kwargs)

    def get(self, path, **kwargs):
        """Fetch ``path`` with whatever ``kwargs`` specify; see ``fetch``."""
        return self.fetch(path, **kwargs)

    def assert_headers_contains(self, headers: HTTPHeaders, contained: dict):
        """
        Assert every (name, value) pair of ``contained`` is in ``headers``.

        Pairs are compared for equality with the pairs produced by
        ``headers.get_all()``.
        """
        # get_all() is evaluated once and reused for both the check and the
        # failure message.
        all_headers = list(headers.get_all())
        self.assertTrue(
            all(item in all_headers for item in contained.items()),
            "Headers does not contain expected headers"
            "\n Expected headers:"
            f"\n {pformat(contained, indent=6)}"
            "\n All headers:"
            f"\n {pformat(dict(all_headers), indent=6)}",
        )

    def assert_wrappers_response(self, response):
        """Assert ``response`` matches the status, headers and body of ``self.wrapper``."""
        self.assertEqual(response.code, self.wrapper.status_code)
        self.assert_headers_contains(response.headers, self.wrapper.headers)
        self.assertEqual(response.body, self.wrapper.body)

    def assert_unauth_ajax(self, response):
        """Assert ``response`` is a 401 with the ``notauthorized`` JSON body."""
        self.assertEqual(response.code, 401)
        self.assertEqual(response.body, b'{"notauthorized":"true"}')
113