1    	from io import BytesIO
2    	from logging import Logger
3    	from unittest import (
4    	    TestCase,
5    	    mock,
6    	)
7    	
8    	from pcs.common.file import (
9    	    FileMetadata,
10   	    RawFile,
11   	    RawFileError,
12   	)
13   	from pcs.common.file_type_codes import PCS_USERS_CONF
14   	from pcs.lib.auth import const
15   	from pcs.lib.auth.config.facade import Facade
16   	from pcs.lib.auth.config.parser import ParserError
17   	from pcs.lib.auth.config.types import TokenEntry
18   	from pcs.lib.auth.provider import (
19   	    AuthProvider,
20   	    _UpdateFacadeError,
21   	)
22   	from pcs.lib.auth.types import AuthUser
23   	from pcs.lib.file.instance import FileInstance
24   	from pcs.lib.file.json import JsonParserException
25   	from pcs.lib.interface.config import ParserErrorException
26   	
27   	_FILE_PATH = "file path"
28   	_FILE_METADATA = FileMetadata(
29   	    file_type_code=PCS_USERS_CONF,
30   	    path=_FILE_PATH,
31   	    owner_user_name="root",
32   	    owner_group_name="root",
33   	    permissions=None,
34   	    is_binary=False,
35   	)
36   	
37   	_FACADE = Facade(
38   	    [
39   	        TokenEntry(token="token-user1", username="user1", creation_date="now"),
40   	        TokenEntry(token="token-user2", username="user2", creation_date="now"),
41   	    ]
42   	)
43   	
44   	
45   	class AuthProviderGetFacadeTest(TestCase):
46   	    # pylint: disable=protected-access
47   	    def setUp(self):
48   	        self.file_instance_mock = mock.Mock(spec_set=FileInstance)
49   	        self.file_instance_mock.raw_file.metadata = _FILE_METADATA
50   	        self.logger = mock.Mock(spec_set=Logger)
51   	        with mock.patch.object(
52   	            FileInstance,
53   	            "for_pcs_users_config",
54   	            lambda *_args, **_kwargs: self.file_instance_mock,
55   	        ):
56   	            self.provider = AuthProvider(self.logger)
57   	
58   	    def test_io_error(self):
59   	        reason = "reason"
60   	        self.file_instance_mock.read_to_facade.side_effect = RawFileError(
61   	            _FILE_METADATA, RawFileError.ACTION_READ, reason
62   	        )
63   	        self.assertEqual(tuple(), self.provider._get_facade().config)
64   	        self.logger.error.assert_called_once_with(
65   	            "Unable to read file '%s': %s", _FILE_PATH, reason
66   	        )
67   	
68   	    def test_json_parser_error(self):
69   	        self.file_instance_mock.read_to_facade.side_effect = (
70   	            JsonParserException(None)
71   	        )
72   	        self.assertEqual(tuple(), self.provider._get_facade().config)
73   	        self.logger.error.assert_called_once_with(
74   	            "Unable to parse file '%s': not valid json", _FILE_PATH
75   	        )
76   	
77   	    def test_invalid_format(self):
78   	        reason = "reason"
79   	        self.file_instance_mock.read_to_facade.side_effect = ParserError(reason)
80   	        self.assertEqual(tuple(), self.provider._get_facade().config)
81   	        self.logger.error.assert_called_once_with(
82   	            "Unable to parse file '%s': %s", _FILE_PATH, reason
83   	        )
84   	
85   	    def test_other_parsing_error(self):
86   	        self.file_instance_mock.read_to_facade.side_effect = (
87   	            ParserErrorException()
88   	        )
89   	        self.assertEqual(tuple(), self.provider._get_facade().config)
90   	        self.logger.error.assert_called_once_with(
91   	            "Unable to parse file '%s'", _FILE_PATH
92   	        )
93   	
94   	    def test_success(self):
95   	        facade = Facade([f"token{i}" for i in range(3)])
96   	        self.file_instance_mock.read_to_facade.return_value = facade
97   	        self.assertEqual(facade, self.provider._get_facade())
98   	        self.logger.error.assert_not_called()
99   	
100  	
101  	class AuthProviderUpdateFacadeTest(TestCase):
102  	    # pylint: disable=protected-access
103  	    def setUp(self):
104  	        self.file_instance_mock = mock.Mock(spec_set=FileInstance)
105  	        self.raw_file_mock = mock.MagicMock(spec_set=RawFile)
106  	        self.file_instance_mock.raw_file = self.raw_file_mock
107  	        self.file_instance_mock.raw_file.metadata = _FILE_METADATA
108  	        self.logger = mock.Mock(spec_set=Logger)
109  	        self.facade = Facade([f"token{i}" for i in range(3)])
110  	        with mock.patch.object(
111  	            FileInstance,
112  	            "for_pcs_users_config",
113  	            lambda *_args, **_kwargs: self.file_instance_mock,
114  	        ):
115  	            self.provider = AuthProvider(self.logger)
116  	
117  	    def test_empty_file(self):
118  	        data = b""
119  	        new_data = b"new data"
120  	        io_buffer = BytesIO(data)
121  	        self.file_instance_mock.raw_file.update.return_value.__enter__.return_value = io_buffer
122  	        self.file_instance_mock.facade_to_raw.return_value = new_data
123  	        with self.provider._update_facade() as empty_facade:
124  	            self.assertEqual(tuple(), empty_facade.config)
125  	        self.logger.error.assert_not_called()
126  	        self.file_instance_mock.raw_to_facade.assert_not_called()
127  	        self.file_instance_mock.facade_to_raw.assert_called_once_with(
128  	            empty_facade
129  	        )
130  	        self.assertEqual(io_buffer.getvalue(), new_data)
131  	
132  	    def test_read_error(self):
133  	        reason = "reason"
134  	        self.file_instance_mock.raw_file.update.return_value.__enter__.side_effect = RawFileError(
135  	            _FILE_METADATA, RawFileError.ACTION_UPDATE, reason
136  	        )
137  	        with (
138  	            self.assertRaises(_UpdateFacadeError),
139  	            self.provider._update_facade(),
140  	        ):
141  	            self.fail("should not get here")
142  	        self.logger.error.assert_called_once_with(
143  	            "Unable to update file '%s': %s", _FILE_PATH, reason
144  	        )
145  	
146  	    def test_write_error(self):
147  	        data = b"original data"
148  	        new_data = b"new data"
149  	        mock_facade = "facade"
150  	        reason = "reason"
151  	        self.file_instance_mock.raw_file.update.return_value.__exit__.side_effect = RawFileError(
152  	            _FILE_METADATA, RawFileError.ACTION_UPDATE, reason
153  	        )
154  	        io_buffer = BytesIO(data)
155  	        self.file_instance_mock.raw_file.update.return_value.__enter__.return_value = io_buffer
156  	        self.file_instance_mock.raw_to_facade.return_value = mock_facade
157  	        self.file_instance_mock.facade_to_raw.return_value = new_data
158  	        with (
159  	            self.assertRaises(_UpdateFacadeError),
160  	            self.provider._update_facade() as facade,
161  	        ):
162  	            self.assertIs(mock_facade, facade)
163  	        self.logger.error.assert_called_once_with(
164  	            "Unable to update file '%s': %s", _FILE_PATH, reason
165  	        )
166  	        self.file_instance_mock.raw_to_facade.assert_called_once_with(data)
167  	        self.file_instance_mock.facade_to_raw.assert_called_once_with(
168  	            mock_facade
169  	        )
170  	
171  	    def test_parsing_error(self):
172  	        data = b"original data"
173  	        new_data = b"new data"
174  	        io_buffer = BytesIO(data)
175  	        self.file_instance_mock.raw_file.update.return_value.__enter__.return_value = io_buffer
176  	        self.file_instance_mock.raw_to_facade.side_effect = (
177  	            ParserErrorException()
178  	        )
179  	        self.file_instance_mock.facade_to_raw.return_value = new_data
180  	        with self.provider._update_facade() as empty_facade:
181  	            self.assertEqual(tuple(), empty_facade.config)
182  	        self.assertEqual(io_buffer.getvalue(), new_data)
183  	        self.logger.error.assert_called_once_with(
184  	            "Unable to parse file '%s'", _FILE_PATH
185  	        )
186  	        self.file_instance_mock.raw_to_facade.assert_called_once_with(data)
187  	        self.file_instance_mock.facade_to_raw.assert_called_once_with(
188  	            empty_facade
189  	        )
190  	
191  	    def test_success(self):
192  	        data = b"original data"
193  	        new_data = b"new data"
194  	        mock_facade = "facade"
195  	        io_buffer = BytesIO(data)
196  	        self.file_instance_mock.raw_file.update.return_value.__enter__.return_value = io_buffer
197  	        self.file_instance_mock.raw_to_facade.return_value = mock_facade
198  	        self.file_instance_mock.facade_to_raw.return_value = new_data
199  	        with self.provider._update_facade() as facade:
200  	            self.assertIs(facade, mock_facade)
201  	        self.assertEqual(io_buffer.getvalue(), new_data)
202  	        self.logger.error.assert_not_called()
203  	        self.file_instance_mock.raw_to_facade.assert_called_once_with(data)
204  	        self.file_instance_mock.facade_to_raw.assert_called_once_with(
205  	            mock_facade
206  	        )
207  	
208  	
209  	@mock.patch.object(AuthProvider, "_get_facade", lambda _self: _FACADE)
210  	@mock.patch("pcs.lib.auth.provider.get_user_groups")
211  	class AuthProviderLoginByTokenTest(TestCase):
212  	    def setUp(self):
213  	        self.logger = mock.Mock(spec_set=Logger)
214  	        self.provider = AuthProvider(self.logger)
215  	
216  	    def test_non_existing_token(self, groups_mock):
217  	        self.assertIsNone(self.provider.auth_by_token("non existing token"))
218  	        groups_mock.assert_not_called()
219  	
220  	    def test_not_in_admin_group(self, groups_mock):
221  	        groups_mock.return_value = ["group1", "group0"]
222  	        self.assertIsNone(self.provider.auth_by_token("token-user2"))
223  	        groups_mock.assert_called_once_with("user2")
224  	
225  	    def test_success(self, groups_mock):
226  	        groups = ["group1", const.ADMIN_GROUP, "group0"]
227  	        groups_mock.return_value = groups
228  	        self.assertEqual(
229  	            AuthUser(username="user1", groups=tuple(groups)),
230  	            self.provider.auth_by_token("token-user1"),
231  	        )
232  	        groups_mock.assert_called_once_with("user1")
233  	
234  	
235  	@mock.patch("pcs.lib.auth.provider.authenticate_user")
236  	@mock.patch("pcs.lib.auth.provider.get_user_groups")
237  	class AuthProviderLoginByUsernamePasswordTest(TestCase):
238  	    def setUp(self):
239  	        self.logger = mock.Mock(spec_set=Logger)
240  	        self.provider = AuthProvider(self.logger)
241  	        self.username = "user name"
(1) Event Sigma main event: A secret, such as a password, cryptographic key, or token is stored in plaintext directly in the source code, in an application's properties, or configuration file. Users with access to the secret may then use the secret to access resources that they otherwise would not have access to. Secret type: Password (generic).
(2) Event remediation: Avoid setting sensitive configuration values as string literals. Instead, these values should be set using variables with the sensitive data loaded from an encrypted file or a secret store.
242  	        self.password = "psswd"
243  	
244  	    def test_invalid_credentials(self, groups_mock, pam_mock):
245  	        pam_mock.return_value = False
246  	        self.assertIsNone(
247  	            self.provider.auth_by_username_password(
248  	                self.username, self.password
249  	            )
250  	        )
251  	        pam_mock.assert_called_once_with(self.username, self.password)
252  	        groups_mock.assert_not_called()
253  	
254  	    def test_not_in_admin_group(self, groups_mock, pam_mock):
255  	        pam_mock.return_value = True
256  	        groups_mock.return_value = ["group1", "group0"]
257  	        self.assertIsNone(
258  	            self.provider.auth_by_username_password(
259  	                self.username, self.password
260  	            )
261  	        )
262  	        pam_mock.assert_called_once_with(self.username, self.password)
263  	        groups_mock.assert_called_once_with(self.username)
264  	
265  	    def test_success(self, groups_mock, pam_mock):
266  	        pam_mock.return_value = True
267  	        groups = ["group1", const.ADMIN_GROUP, "group0"]
268  	        groups_mock.return_value = groups
269  	        self.assertEqual(
270  	            AuthUser(username=self.username, groups=tuple(groups)),
271  	            self.provider.auth_by_username_password(
272  	                self.username, self.password
273  	            ),
274  	        )
275  	        pam_mock.assert_called_once_with(self.username, self.password)
276  	        groups_mock.assert_called_once_with(self.username)
277  	
278  	
279  	@mock.patch.object(AuthProvider, "_update_facade")
280  	class AuthProviderCreateTokenTest(TestCase):
281  	    def setUp(self):
282  	        self.logger = mock.Mock(spec_set=Logger)
283  	        self.provider = AuthProvider(self.logger)
284  	
285  	    def test_failure(self, update_facade_mock):
286  	        token = "new_token"
287  	        username = "new_user"
288  	        facade_mock = mock.Mock(spec_set=Facade)
289  	        facade_mock.add_user.return_value = token
290  	        update_facade_mock.return_value.__enter__.return_value = facade_mock
291  	        update_facade_mock.return_value.__exit__.side_effect = (
292  	            _UpdateFacadeError()
293  	        )
294  	        self.assertIsNone(self.provider.create_token(username))
295  	        facade_mock.add_user.assert_called_once_with(username)
296  	
297  	    def test_success(self, update_facade_mock):
298  	        token = "new_token"
299  	        username = "new_user"
300  	        facade_mock = mock.Mock(spec_set=Facade)
301  	        facade_mock.add_user.return_value = token
302  	        update_facade_mock.return_value.__enter__.return_value = facade_mock
303  	        self.assertEqual(token, self.provider.create_token(username))
304  	        facade_mock.add_user.assert_called_once_with(username)
305