1 from unittest import (
2 TestCase,
3 mock,
4 )
5
6 from pcs import host
7 from pcs.cli.common.errors import CmdLineInputError
8
9 from pcs_test.tools.misc import dict_to_modifiers
10
11
12 class HostAuth(TestCase):
13 host_names = {
14 "node-01": {
15 "hostname": "node-01.example.com",
16 "ipv4": "192.168.122.11",
17 "ipv6": "[2620:52:0:25a4:1800:ff:fe00:1]",
18 },
19 "node-02": {
20 "hostname": "node-02.example.com",
21 "ipv4": "192.168.122.12",
22 "ipv6": "[2620:52:0:25a4:1800:ff:fe00:2]",
23 },
24 }
25
26 def setUp(self):
27 self.lib = mock.Mock()
28 self.patch_get_user_and_pass = mock.patch(
29 "pcs.utils.get_user_and_pass",
30 return_value=("hacluster", "password"),
31 )
32 self.patch_auth_hosts = mock.patch("pcs.utils.auth_hosts")
33
34 @staticmethod
35 def _fixture_args(name_addr_port_tuple_list):
36 arg_list = []
37 for name, addr, port in name_addr_port_tuple_list:
38 port_str = ":{}".format(port) if port is not None else ""
39 arg_list.extend([name, f"addr={addr}{port_str}"])
40 return arg_list
41
42 def _assert_invalid_port(self, name_addr_port_tuple_list):
43 arg_list = self._fixture_args(name_addr_port_tuple_list)
44 mock_auth_hosts = self.patch_auth_hosts.start()
45 with self.assertRaises(CmdLineInputError) as cm:
46 host.auth_cmd(self.lib, arg_list, dict_to_modifiers({}))
47 _, addr, port = name_addr_port_tuple_list[-1]
48 self.assertEqual(
49 (
50 "Invalid port number in address '{addr}:{port}', use 1..65535"
51 ).format(addr=addr, port=port),
52 cm.exception.message,
53 )
54 mock_auth_hosts.assert_not_called()
55 self.patch_auth_hosts.stop()
56
57 def _assert_valid_port(self, name_addr_port_tuple_list):
58 arg_list = self._fixture_args(name_addr_port_tuple_list)
59 mock_get_user_and_pass = self.patch_get_user_and_pass.start()
60 mock_auth_hosts = self.patch_auth_hosts.start()
61 host.auth_cmd(self.lib, arg_list, dict_to_modifiers({}))
62 mock_get_user_and_pass.assert_called_once_with()
63 mock_auth_hosts.assert_called_once_with(
64 {
65 name: {
66 "dest_list": [
67 dict(
68 addr=(
69 addr
70 if addr.count(":") <= 1
71 else addr.strip("[]")
72 ),
73 port=port,
74 )
75 ],
76 "username": "hacluster",
|
(1) Event Sigma main event: |
A secret, such as a password, cryptographic key, or token is stored in plaintext directly in the source code, in an application's properties, or configuration file. Users with access to the secret may then use the secret to access resources that they otherwise would not have access to. Secret type: Password (generic). |
|
(2) Event remediation: |
Avoid setting sensitive configuration values as string literals. Instead, these values should be set using variables with the sensitive data loaded from an encrypted file or a secret store. |
77 "password": "password",
78 }
79 for name, addr, port in name_addr_port_tuple_list
80 }
81 )
82 self.patch_get_user_and_pass.stop()
83 self.patch_auth_hosts.stop()
84
85 def run_port_subtests(self, assert_function, port_list):
86 for addr_type in ["hostname", "ipv4", "ipv6"]:
87 name_list = sorted(self.host_names.keys())[0 : len(port_list)]
88 addr_list = [self.host_names[name][addr_type] for name in name_list]
89
90 with self.subTest(addr_type=addr_type):
91 assert_function(list(zip(name_list, addr_list, port_list)))
92
93 @mock.patch("pcs.utils.auth_hosts")
94 def test_no_args(self, mock_auth_hosts):
95 with self.assertRaises(CmdLineInputError) as cm:
96 host.auth_cmd(self.lib, [], dict_to_modifiers({}))
97 self.assertEqual("No host specified", cm.exception.message)
98 mock_auth_hosts.assert_not_called()
99
100 def test_invalid_port_notanumber(self):
101 self.run_port_subtests(self._assert_invalid_port, ["notanumber"])
102
103 def test_invalid_port_lower_bound(self):
104 self.run_port_subtests(self._assert_invalid_port, [0])
105
106 def test_invalid_port_higher_bound(self):
107 self.run_port_subtests(self._assert_invalid_port, [65536])
108
109 def test_invalid_port_notanumber_multinode(self):
110 self.run_port_subtests(
111 self._assert_invalid_port,
112 [1, "notanumber"],
113 )
114
115 def test_invalid_port_lower_bound_multinode(self):
116 self.run_port_subtests(self._assert_invalid_port, [3000, 0])
117
118 def test_invalid_port_higher_bound_hostname_multinode(self):
119 self.run_port_subtests(self._assert_invalid_port, [65535, 65536])
120
121 def test_valid_port(self):
122 self.run_port_subtests(self._assert_valid_port, [3000])
123
124 def test_valid_port_multinode(self):
125 self.run_port_subtests(self._assert_valid_port, [3000, 5000])
126