1    	from unittest import (
2    	    TestCase,
3    	    mock,
4    	)
5    	
6    	from pcs import host
7    	from pcs.cli.common.errors import CmdLineInputError
8    	
9    	from pcs_test.tools.misc import dict_to_modifiers
10   	
11   	
class HostAuth(TestCase):
    """Tests of ``host.auth_cmd`` command-line argument handling.

    Covers the "no host given" error and validation of the optional port in
    ``addr=<address>[:<port>]`` for hostname, IPv4 and IPv6 addresses.
    ``pcs.utils.auth_hosts`` and ``pcs.utils.get_user_and_pass`` are always
    patched, so the tests only inspect what ``auth_cmd`` passes to them.
    """

    # Placeholder credentials. They are returned by the patched
    # ``pcs.utils.get_user_and_pass`` and compared against the arguments of
    # the patched ``pcs.utils.auth_hosts``; they only travel between mocks.
    # Static analysis reports the literal as a hard-coded password; keeping
    # it in one named fixture makes its test-only purpose explicit.
    _FIXTURE_USERNAME = "hacluster"
    _FIXTURE_PASSWORD = "password"

    # Fixture nodes with one address per address type. IPv6 literals are
    # bracketed because ``_fixture_args`` appends ":<port>" directly after
    # the address.
    host_names = {
        "node-01": {
            "hostname": "node-01.example.com",
            "ipv4": "192.168.122.11",
            "ipv6": "[2620:52:0:25a4:1800:ff:fe00:1]",
        },
        "node-02": {
            "hostname": "node-02.example.com",
            "ipv4": "192.168.122.12",
            "ipv6": "[2620:52:0:25a4:1800:ff:fe00:2]",
        },
    }

    def setUp(self):
        # The patchers are only created here. Each assert helper activates
        # them for the duration of a single sub-test.
        self.lib = mock.Mock()
        self.patch_get_user_and_pass = mock.patch(
            "pcs.utils.get_user_and_pass",
            return_value=(self._FIXTURE_USERNAME, self._FIXTURE_PASSWORD),
        )
        self.patch_auth_hosts = mock.patch("pcs.utils.auth_hosts")

    @staticmethod
    def _fixture_args(name_addr_port_tuple_list):
        """Build the argv list for ``host.auth_cmd``.

        Each ``(name, addr, port)`` tuple becomes two arguments: ``name`` and
        ``addr=<addr>:<port>``. The ``:<port>`` suffix is omitted when
        ``port`` is ``None``.
        """
        arg_list = []
        for name, addr, port in name_addr_port_tuple_list:
            port_str = f":{port}" if port is not None else ""
            arg_list.extend([name, f"addr={addr}{port_str}"])
        return arg_list

    def _assert_invalid_port(self, name_addr_port_tuple_list):
        """Assert that ``auth_cmd`` rejects the port of the last tuple.

        The expected error message is built from the last tuple only, so the
        invalid port has to be the last one in the list. ``auth_hosts`` must
        not be called when the input is rejected.
        """
        arg_list = self._fixture_args(name_addr_port_tuple_list)
        _, addr, port = name_addr_port_tuple_list[-1]
        expected_message = (
            f"Invalid port number in address '{addr}:{port}', use 1..65535"
        )
        # Using the patcher as a context manager guarantees it is undone even
        # when an assertion below fails. A bare start()/stop() pair would
        # leave pcs.utils.auth_hosts patched for the following sub-tests.
        with self.patch_auth_hosts as mock_auth_hosts:
            with self.assertRaises(CmdLineInputError) as cm:
                host.auth_cmd(self.lib, arg_list, dict_to_modifiers({}))
            self.assertEqual(expected_message, cm.exception.message)
            mock_auth_hosts.assert_not_called()

    def _assert_valid_port(self, name_addr_port_tuple_list):
        """Assert that ``auth_cmd`` accepts the ports and forwards them.

        Credentials must be requested exactly once and ``auth_hosts`` must be
        called exactly once with one destination per host.
        """
        arg_list = self._fixture_args(name_addr_port_tuple_list)
        expected_hosts = {
            name: {
                "dest_list": [
                    dict(
                        # More than one colon means the fixture address is
                        # the bracketed IPv6 literal; the destination is
                        # expected without the brackets.
                        addr=(
                            addr
                            if addr.count(":") <= 1
                            else addr.strip("[]")
                        ),
                        port=port,
                    )
                ],
                "username": self._FIXTURE_USERNAME,
                "password": self._FIXTURE_PASSWORD,
            }
            for name, addr, port in name_addr_port_tuple_list
        }
        # Context managers undo both patches even if an assertion fails, so
        # a failing sub-test cannot leak mocks into the next one.
        with self.patch_get_user_and_pass as mock_get_user_and_pass:
            with self.patch_auth_hosts as mock_auth_hosts:
                host.auth_cmd(self.lib, arg_list, dict_to_modifiers({}))
                mock_get_user_and_pass.assert_called_once_with()
                mock_auth_hosts.assert_called_once_with(expected_hosts)

    def run_port_subtests(self, assert_function, port_list):
        """Run ``assert_function`` once per address type as a sub-test.

        The first ``len(port_list)`` fixture nodes (sorted by name) are paired
        with the given ports, and ``assert_function`` receives a list of
        ``(name, addr, port)`` tuples.
        """
        # The node selection does not depend on the address type.
        name_list = sorted(self.host_names)[: len(port_list)]
        for addr_type in ["hostname", "ipv4", "ipv6"]:
            addr_list = [self.host_names[name][addr_type] for name in name_list]

            with self.subTest(addr_type=addr_type):
                assert_function(list(zip(name_list, addr_list, port_list)))

    @mock.patch("pcs.utils.auth_hosts")
    def test_no_args(self, mock_auth_hosts):
        with self.assertRaises(CmdLineInputError) as cm:
            host.auth_cmd(self.lib, [], dict_to_modifiers({}))
        self.assertEqual("No host specified", cm.exception.message)
        mock_auth_hosts.assert_not_called()

    def test_invalid_port_notanumber(self):
        self.run_port_subtests(self._assert_invalid_port, ["notanumber"])

    def test_invalid_port_lower_bound(self):
        self.run_port_subtests(self._assert_invalid_port, [0])

    def test_invalid_port_higher_bound(self):
        self.run_port_subtests(self._assert_invalid_port, [65536])

    def test_invalid_port_notanumber_multinode(self):
        self.run_port_subtests(
            self._assert_invalid_port,
            [1, "notanumber"],
        )

    def test_invalid_port_lower_bound_multinode(self):
        self.run_port_subtests(self._assert_invalid_port, [3000, 0])

    def test_invalid_port_higher_bound_hostname_multinode(self):
        self.run_port_subtests(self._assert_invalid_port, [65535, 65536])

    def test_valid_port(self):
        self.run_port_subtests(self._assert_valid_port, [3000])

    def test_valid_port_multinode(self):
        self.run_port_subtests(self._assert_valid_port, [3000, 5000])
124  	    def test_valid_port_multinode(self):
125  	        self.run_port_subtests(self._assert_valid_port, [3000, 5000])
126