1 from io import BytesIO
2 from logging import Logger
3 from unittest import (
4 TestCase,
5 mock,
6 )
7
8 from pcs.common.file import (
9 FileMetadata,
10 RawFile,
11 RawFileError,
12 )
13 from pcs.common.file_type_codes import PCS_USERS_CONF
14 from pcs.lib.auth import const
15 from pcs.lib.auth.config.facade import Facade
16 from pcs.lib.auth.config.parser import ParserError
17 from pcs.lib.auth.config.types import TokenEntry
18 from pcs.lib.auth.provider import (
19 AuthProvider,
20 _UpdateFacadeError,
21 )
22 from pcs.lib.auth.types import AuthUser
23 from pcs.lib.file.instance import FileInstance
24 from pcs.lib.file.json import JsonParserException
25 from pcs.lib.interface.config import ParserErrorException
26
# Placeholder path: within this file it is only attached to the mocked
# FileInstance metadata and expected in the logged error messages.
_FILE_PATH = "file path"

# Metadata assigned to the mocked raw file of the pcs users config.
_FILE_METADATA = FileMetadata(
    path=_FILE_PATH,
    file_type_code=PCS_USERS_CONF,
    owner_user_name="root",
    owner_group_name="root",
    permissions=None,
    is_binary=False,
)

# Facade with two known tokens; the token login tests return it in place of
# AuthProvider._get_facade.
_FACADE = Facade(
    [
        TokenEntry(username="user1", token="token-user1", creation_date="now"),
        TokenEntry(username="user2", token="token-user2", creation_date="now"),
    ]
)
43
44
class AuthProviderGetFacadeTest(TestCase):
    """
    Check AuthProvider._get_facade against a mocked users config file

    Each failure of the mocked FileInstance.read_to_facade is expected to be
    logged as an error and to yield a facade with an empty config.
    """

    # pylint: disable=protected-access
    def setUp(self):
        self.logger = mock.Mock(spec_set=Logger)
        self.file_instance_mock = mock.Mock(spec_set=FileInstance)
        self.file_instance_mock.raw_file.metadata = _FILE_METADATA
        # The factory is replaced only while the provider is being created,
        # so that the provider ends up working with the mock instance.
        factory_patch = mock.patch.object(
            FileInstance,
            "for_pcs_users_config",
            lambda *_args, **_kwargs: self.file_instance_mock,
        )
        with factory_patch:
            self.provider = AuthProvider(self.logger)

    def _assert_read_failure(self, error, *expected_log_args):
        """
        Make read_to_facade raise and check the empty-facade fallback

        error -- exception raised by the mocked read_to_facade
        expected_log_args -- arguments logger.error must be called with once
        """
        self.file_instance_mock.read_to_facade.side_effect = error
        self.assertEqual((), self.provider._get_facade().config)
        self.logger.error.assert_called_once_with(*expected_log_args)

    def test_io_error(self):
        reason = "reason"
        self._assert_read_failure(
            RawFileError(_FILE_METADATA, RawFileError.ACTION_READ, reason),
            "Unable to read file '%s': %s",
            _FILE_PATH,
            reason,
        )

    def test_json_parser_error(self):
        self._assert_read_failure(
            JsonParserException(None),
            "Unable to parse file '%s': not valid json",
            _FILE_PATH,
        )

    def test_invalid_format(self):
        reason = "reason"
        self._assert_read_failure(
            ParserError(reason),
            "Unable to parse file '%s': %s",
            _FILE_PATH,
            reason,
        )

    def test_other_parsing_error(self):
        self._assert_read_failure(
            ParserErrorException(),
            "Unable to parse file '%s'",
            _FILE_PATH,
        )

    def test_success(self):
        # On success the facade read from the file is passed through as is
        # and nothing is logged.
        expected_facade = Facade([f"token{i}" for i in range(3)])
        self.file_instance_mock.read_to_facade.return_value = expected_facade
        self.assertEqual(expected_facade, self.provider._get_facade())
        self.logger.error.assert_not_called()
99
100
class AuthProviderUpdateFacadeTest(TestCase):
    """
    Check the AuthProvider._update_facade context manager

    The users config file is replaced by mocks. Its content is an in-memory
    buffer handed out by the mocked raw_file.update() context manager, which
    lets the tests inspect what has been written back.
    """

    # pylint: disable=protected-access
    def setUp(self):
        self.logger = mock.Mock(spec_set=Logger)
        self.facade = Facade([f"token{i}" for i in range(3)])
        self.raw_file_mock = mock.MagicMock(spec_set=RawFile)
        self.file_instance_mock = mock.Mock(spec_set=FileInstance)
        self.file_instance_mock.raw_file = self.raw_file_mock
        self.file_instance_mock.raw_file.metadata = _FILE_METADATA
        # The factory is replaced only while the provider is being created,
        # so that the provider ends up working with the mock instance.
        factory_patch = mock.patch.object(
            FileInstance,
            "for_pcs_users_config",
            lambda *_args, **_kwargs: self.file_instance_mock,
        )
        with factory_patch:
            self.provider = AuthProvider(self.logger)

    def _update_context(self):
        """
        Return the mocked context manager produced by raw_file.update()
        """
        return self.file_instance_mock.raw_file.update.return_value

    def _stub_file_content(self, content):
        """
        Make entering raw_file.update() provide a buffer with the content

        content -- bytes representing the current content of the file
        """
        file_buffer = BytesIO(content)
        self._update_context().__enter__.return_value = file_buffer
        return file_buffer

    def _assert_update_error_logged(self, reason):
        """
        Check that exactly one update error with the reason has been logged
        """
        self.logger.error.assert_called_once_with(
            "Unable to update file '%s': %s", _FILE_PATH, reason
        )

    def test_empty_file(self):
        written = b"new data"
        file_buffer = self._stub_file_content(b"")
        self.file_instance_mock.facade_to_raw.return_value = written
        with self.provider._update_facade() as empty_facade:
            self.assertEqual((), empty_facade.config)
        self.logger.error.assert_not_called()
        # There is nothing to parse in an empty file.
        self.file_instance_mock.raw_to_facade.assert_not_called()
        self.file_instance_mock.facade_to_raw.assert_called_once_with(
            empty_facade
        )
        self.assertEqual(file_buffer.getvalue(), written)

    def test_read_error(self):
        reason = "reason"
        self._update_context().__enter__.side_effect = RawFileError(
            _FILE_METADATA, RawFileError.ACTION_UPDATE, reason
        )
        with self.assertRaises(_UpdateFacadeError):
            with self.provider._update_facade():
                self.fail("should not get here")
        self._assert_update_error_logged(reason)

    def test_write_error(self):
        original = b"original data"
        written = b"new data"
        parsed_facade = "facade"
        reason = "reason"
        # Leaving raw_file.update() fails, i.e. the write itself fails.
        self._update_context().__exit__.side_effect = RawFileError(
            _FILE_METADATA, RawFileError.ACTION_UPDATE, reason
        )
        self._stub_file_content(original)
        self.file_instance_mock.raw_to_facade.return_value = parsed_facade
        self.file_instance_mock.facade_to_raw.return_value = written
        with self.assertRaises(_UpdateFacadeError):
            with self.provider._update_facade() as facade:
                self.assertIs(parsed_facade, facade)
        self._assert_update_error_logged(reason)
        self.file_instance_mock.raw_to_facade.assert_called_once_with(original)
        self.file_instance_mock.facade_to_raw.assert_called_once_with(
            parsed_facade
        )

    def test_parsing_error(self):
        original = b"original data"
        written = b"new data"
        file_buffer = self._stub_file_content(original)
        self.file_instance_mock.raw_to_facade.side_effect = (
            ParserErrorException()
        )
        self.file_instance_mock.facade_to_raw.return_value = written
        # An unparsable file is expected to be logged and replaced by an
        # empty facade, which is then written back.
        with self.provider._update_facade() as empty_facade:
            self.assertEqual((), empty_facade.config)
        self.assertEqual(file_buffer.getvalue(), written)
        self.logger.error.assert_called_once_with(
            "Unable to parse file '%s'", _FILE_PATH
        )
        self.file_instance_mock.raw_to_facade.assert_called_once_with(original)
        self.file_instance_mock.facade_to_raw.assert_called_once_with(
            empty_facade
        )

    def test_success(self):
        original = b"original data"
        written = b"new data"
        parsed_facade = "facade"
        file_buffer = self._stub_file_content(original)
        self.file_instance_mock.raw_to_facade.return_value = parsed_facade
        self.file_instance_mock.facade_to_raw.return_value = written
        with self.provider._update_facade() as facade:
            self.assertIs(facade, parsed_facade)
        self.assertEqual(file_buffer.getvalue(), written)
        self.logger.error.assert_not_called()
        self.file_instance_mock.raw_to_facade.assert_called_once_with(original)
        self.file_instance_mock.facade_to_raw.assert_called_once_with(
            parsed_facade
        )
207
208
@mock.patch.object(AuthProvider, "_get_facade", lambda _self: _FACADE)
@mock.patch("pcs.lib.auth.provider.get_user_groups")
class AuthProviderLoginByTokenTest(TestCase):
    """
    Check AuthProvider.auth_by_token

    _get_facade is replaced so that it always returns the module level
    _FACADE and get_user_groups is mocked. Only the get_user_groups mock is
    passed to the tests, the _get_facade patch has an explicit replacement.
    """

    def setUp(self):
        self.logger = mock.Mock(spec_set=Logger)
        self.provider = AuthProvider(self.logger)

    def test_non_existing_token(self, groups_mock):
        auth_user = self.provider.auth_by_token("non existing token")
        self.assertIsNone(auth_user)
        # Groups are not expected to be looked up for an unknown token.
        groups_mock.assert_not_called()

    def test_not_in_admin_group(self, groups_mock):
        groups_mock.return_value = ["group1", "group0"]
        auth_user = self.provider.auth_by_token("token-user2")
        self.assertIsNone(auth_user)
        groups_mock.assert_called_once_with("user2")

    def test_success(self, groups_mock):
        groups = ["group1", const.ADMIN_GROUP, "group0"]
        groups_mock.return_value = groups
        expected_user = AuthUser(username="user1", groups=tuple(groups))
        auth_user = self.provider.auth_by_token("token-user1")
        self.assertEqual(expected_user, auth_user)
        groups_mock.assert_called_once_with("user1")
233
234
@mock.patch("pcs.lib.auth.provider.authenticate_user")
@mock.patch("pcs.lib.auth.provider.get_user_groups")
class AuthProviderLoginByUsernamePasswordTest(TestCase):
    """
    Check AuthProvider.auth_by_username_password

    Both authenticate_user and get_user_groups are mocked. Class decorators
    are applied bottom-up, so each test gets the get_user_groups mock first
    and the authenticate_user mock second.
    """

    def setUp(self):
        self.logger = mock.Mock(spec_set=Logger)
        self.provider = AuthProvider(self.logger)
        self.username = "user name"
        # NOTE: a static analysis scan reports the literal below as a
        # password stored in plaintext in the source code (secret type:
        # generic password) and recommends loading sensitive values from an
        # encrypted file or a secret store instead of a string literal.
        # Here the value is a placeholder: authenticate_user is patched in
        # every test of this class and the tests only check that the value
        # is handed over to that mock.
        self.password = "psswd"

    def _login(self):
        """
        Run the tested method with the credentials prepared in setUp
        """
        return self.provider.auth_by_username_password(
            self.username, self.password
        )

    def test_invalid_credentials(self, groups_mock, pam_mock):
        pam_mock.return_value = False
        self.assertIsNone(self._login())
        pam_mock.assert_called_once_with(self.username, self.password)
        # Groups are not expected to be looked up after a failed login.
        groups_mock.assert_not_called()

    def test_not_in_admin_group(self, groups_mock, pam_mock):
        pam_mock.return_value = True
        groups_mock.return_value = ["group1", "group0"]
        self.assertIsNone(self._login())
        pam_mock.assert_called_once_with(self.username, self.password)
        groups_mock.assert_called_once_with(self.username)

    def test_success(self, groups_mock, pam_mock):
        pam_mock.return_value = True
        groups = ["group1", const.ADMIN_GROUP, "group0"]
        groups_mock.return_value = groups
        expected_user = AuthUser(username=self.username, groups=tuple(groups))
        self.assertEqual(expected_user, self._login())
        pam_mock.assert_called_once_with(self.username, self.password)
        groups_mock.assert_called_once_with(self.username)
277
278
@mock.patch.object(AuthProvider, "_update_facade")
class AuthProviderCreateTokenTest(TestCase):
    """
    Check AuthProvider.create_token with _update_facade mocked
    """

    def setUp(self):
        self.logger = mock.Mock(spec_set=Logger)
        self.provider = AuthProvider(self.logger)

    @staticmethod
    def _prepare_facade(update_facade_mock, token):
        """
        Make entering the mocked _update_facade provide a facade mock

        update_facade_mock -- mock replacing AuthProvider._update_facade
        token -- value returned by add_user of the provided facade mock
        """
        facade_mock = mock.Mock(spec_set=Facade)
        facade_mock.add_user.return_value = token
        update_facade_mock.return_value.__enter__.return_value = facade_mock
        return facade_mock

    def test_failure(self, update_facade_mock):
        username = "new_user"
        facade_mock = self._prepare_facade(update_facade_mock, "new_token")
        # Leaving the context manager fails, i.e. the update is not saved.
        update_facade_mock.return_value.__exit__.side_effect = (
            _UpdateFacadeError()
        )
        self.assertIsNone(self.provider.create_token(username))
        facade_mock.add_user.assert_called_once_with(username)

    def test_success(self, update_facade_mock):
        token = "new_token"
        username = "new_user"
        facade_mock = self._prepare_facade(update_facade_mock, token)
        self.assertEqual(token, self.provider.create_token(username))
        facade_mock.add_user.assert_called_once_with(username)
305