# path: root/test/units/modules/storage/netapp/test_na_ontap_firewall_policy.py
# blob: e8d48fe0db5f526db49145038aab9a0564f579ae
# (c) 2018, NetApp, Inc
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)

''' unit test template for ONTAP Ansible module '''

from __future__ import print_function
import json
import pytest

from units.compat import unittest
from units.compat.mock import patch, Mock
from ansible.module_utils import basic
from ansible.module_utils._text import to_bytes
import ansible.module_utils.netapp as netapp_utils

from ansible.modules.storage.netapp.na_ontap_firewall_policy \
    import NetAppONTAPFirewallPolicy as fp_module  # module under test

if not netapp_utils.has_netapp_lib():
    pytestmark = pytest.mark.skip('skipping as missing required netapp_lib')


def set_module_args(args):
    """store the arguments, JSON-encoded, where module creation will pick them up"""
    payload = {'ANSIBLE_MODULE_ARGS': args}
    encoded = to_bytes(json.dumps(payload))
    basic._ANSIBLE_ARGS = encoded  # pylint: disable=protected-access


class AnsibleExitJson(Exception):
    """Raised in place of module.exit_json so the test case can catch the result"""


class AnsibleFailJson(Exception):
    """Raised in place of module.fail_json so the test case can catch the failure"""


def exit_json(*args, **kwargs):  # pylint: disable=unused-argument
    """stand-in for exit_json: raise the return data wrapped in AnsibleExitJson"""
    # 'changed' defaults to False when the caller did not report it
    kwargs.setdefault('changed', False)
    raise AnsibleExitJson(kwargs)


def fail_json(*args, **kwargs):  # pylint: disable=unused-argument
    """stand-in for fail_json: raise the return data wrapped in AnsibleFailJson"""
    # always flag the payload as failed, overriding any caller-supplied value
    failure = dict(kwargs, failed=True)
    raise AnsibleFailJson(failure)


class MockONTAPConnection(object):
    ''' stand-in for the server connection to an ONTAP host '''

    def __init__(self, kind=None, data=None):
        ''' remember which canned reply to serve and the data it is built from '''
        self.kind, self.data = kind, data
        self.xml_in = self.xml_out = None

    def invoke_successfully(self, xml, enable_tunneling):  # pylint: disable=unused-argument
        ''' record the request and return the canned reply selected by kind '''
        self.xml_in = xml
        if self.kind == 'policy':
            reply = self.build_policy_info(self.data)
        elif self.kind == 'config':
            reply = self.build_firewall_config_info(self.data)
        else:
            # no canned reply for this kind: the request itself is handed back
            reply = xml
        self.xml_out = reply
        return reply

    @staticmethod
    def build_policy_info(data):
        ''' build the net-firewall-policy-info reply from data's policy and service '''
        reply = netapp_utils.zapi.NaElement('xml')
        policy_info = {
            'policy': data['policy'],
            'service': data['service'],
            # the allow-list is fixed; it is not taken from data
            'allow-list': [{'ip-and-mask': '1.2.3.0/24'}]
        }
        reply.translate_struct({
            'num-records': 1,
            'attributes-list': {'net-firewall-policy-info': policy_info}
        })
        return reply

    @staticmethod
    def build_firewall_config_info(data):  # pylint: disable=unused-argument
        ''' build the net-firewall-config-info reply (fixed values, data is not read) '''
        reply = netapp_utils.zapi.NaElement('xml')
        config_info = {
            'is-enabled': 'true',
            'is-logging': 'false'
        }
        reply.translate_struct({
            'attributes': {'net-firewall-config-info': config_info}
        })
        return reply


class TestMyModule(unittest.TestCase):
    ''' unit tests for the na_ontap_firewall_policy module '''

    def setUp(self):
        ''' patch exit_json/fail_json and create the fixture data used by the tests '''
        self.mock_module_helper = patch.multiple(basic.AnsibleModule,
                                                 exit_json=exit_json,
                                                 fail_json=fail_json)
        self.mock_module_helper.start()
        self.addCleanup(self.mock_module_helper.stop)
        self.mock_policy = {
            'policy': 'test',
            'service': 'http',
            'vserver': 'my_vserver',
            'allow_list': '1.2.3.0/24'
        }
        self.mock_config = {
            'node': 'test',
            'enable': 'enable',
            'logging': 'enable'
        }

    def mock_policy_args(self):
        ''' return module arguments for a firewall policy, built from self.mock_policy '''
        return {
            'policy': self.mock_policy['policy'],
            'service': self.mock_policy['service'],
            'vserver': self.mock_policy['vserver'],
            # the module argument is a list, the fixture holds a single string
            'allow_list': [self.mock_policy['allow_list']],
            'hostname': 'test',
            'username': 'test_user',
            'password': 'test_pass!'
        }

    def mock_config_args(self):
        ''' return module arguments for a firewall config, built from self.mock_config '''
        return {
            'node': self.mock_config['node'],
            'enable': self.mock_config['enable'],
            'logging': self.mock_config['logging'],
            'hostname': 'test',
            'username': 'test_user',
            'password': 'test_pass!'
        }

    def get_mock_object(self, kind=None):
        """
        Helper method to return an na_ontap_firewall_policy object
        :param kind: passes this param to MockONTAPConnection()
        :return: na_ontap_firewall_policy object
        """
        obj = fp_module()
        obj.autosupport_log = Mock(return_value=None)
        if kind is None:
            obj.server = MockONTAPConnection()
        else:
            mock_data = self.mock_config if kind == 'config' else self.mock_policy
            obj.server = MockONTAPConnection(kind=kind, data=mock_data)
        return obj

    def test_module_fail_when_required_args_missing(self):
        ''' required arguments are reported as errors '''
        # keep the setup outside the raises block so only module creation is checked
        set_module_args({})
        with pytest.raises(AnsibleFailJson) as exc:
            fp_module()
        print('Info: %s' % exc.value.args[0]['msg'])

    def test_helper_firewall_policy_attributes(self):
        ''' helper returns dictionary with vserver, service and policy details '''
        set_module_args(self.mock_policy_args())
        result = self.get_mock_object('policy').firewall_policy_attributes()
        # build the expectation from a copy so the shared fixture is not mutated
        expected = dict(self.mock_policy)
        del expected['allow_list']
        assert expected == result

    def test_helper_validate_ip_addresses_positive(self):
        ''' test if helper accepts network addresses without host bits set '''
        data = self.mock_policy_args()
        data['allow_list'] = ['1.2.0.0/16', '1.2.3.0/24']
        set_module_args(data)
        result = self.get_mock_object().validate_ip_addresses()
        assert result is None

    def test_helper_validate_ip_addresses_negative(self):
        ''' test if helper fails for an address that has host bits set '''
        data = self.mock_policy_args()
        data['allow_list'] = ['1.2.0.10/16', '1.2.3.0/24']
        set_module_args(data)
        with pytest.raises(AnsibleFailJson) as exc:
            self.get_mock_object().validate_ip_addresses()
        msg = 'Error: Invalid IP address value for allow_list parameter.' \
              'Please specify a network address without host bits set: ' \
              '1.2.0.10/16 has host bits set'
        assert exc.value.args[0]['msg'] == msg

    def test_get_nonexistent_policy(self):
        ''' Test if get_firewall_policy returns None for non-existent policy '''
        set_module_args(self.mock_policy_args())
        result = self.get_mock_object().get_firewall_policy()
        assert result is None

    def test_get_existing_policy(self):
        ''' Test if get_firewall_policy returns policy details for existing policy '''
        data = self.mock_policy_args()
        set_module_args(data)
        result = self.get_mock_object('policy').get_firewall_policy()
        assert result['service'] == data['service']
        assert result['allow_list'] == ['1.2.3.0/24']  # from build_policy_info()

    def test_successful_create(self):
        ''' Test successful create '''
        set_module_args(self.mock_policy_args())
        with pytest.raises(AnsibleExitJson) as exc:
            self.get_mock_object().apply()
        assert exc.value.args[0]['changed']

    def test_create_idempotency(self):
        ''' Test create idempotency '''
        set_module_args(self.mock_policy_args())
        with pytest.raises(AnsibleExitJson) as exc:
            self.get_mock_object('policy').apply()
        assert not exc.value.args[0]['changed']

    def test_successful_delete(self):
        ''' Test delete existing policy '''
        data = self.mock_policy_args()
        data['state'] = 'absent'
        set_module_args(data)
        with pytest.raises(AnsibleExitJson) as exc:
            self.get_mock_object('policy').apply()
        assert exc.value.args[0]['changed']

    def test_delete_idempotency(self):
        ''' Test delete idempotency '''
        data = self.mock_policy_args()
        data['state'] = 'absent'
        set_module_args(data)
        with pytest.raises(AnsibleExitJson) as exc:
            self.get_mock_object().apply()
        assert not exc.value.args[0]['changed']

    def test_successful_modify(self):
        ''' Test successful modify allow_list '''
        data = self.mock_policy_args()
        data['allow_list'] = ['1.2.0.0/16']
        set_module_args(data)
        with pytest.raises(AnsibleExitJson) as exc:
            self.get_mock_object('policy').apply()
        assert exc.value.args[0]['changed']

    def test_successful_modify_multiple_ips(self):
        ''' Test successful modify allow_list with multiple networks '''
        data = self.mock_policy_args()
        data['allow_list'] = ['1.2.0.0/16', '1.0.0.0/8']
        set_module_args(data)
        with pytest.raises(AnsibleExitJson) as exc:
            self.get_mock_object('policy').apply()
        assert exc.value.args[0]['changed']

    def test_get_nonexistent_config(self):
        ''' Test if get_firewall_config_for_node returns None for non-existent node '''
        set_module_args(self.mock_config_args())
        result = self.get_mock_object().get_firewall_config_for_node()
        assert result is None

    def test_get_existing_config(self):
        ''' Test if get_firewall_config_for_node returns config details for existing node '''
        data = self.mock_config_args()
        set_module_args(data)
        result = self.get_mock_object('config').get_firewall_config_for_node()
        assert result['enable'] == 'enable'  # from build_firewall_config_info()
        assert result['logging'] == 'disable'  # from build_firewall_config_info()

    def test_successful_modify_config(self):
        ''' Test successful modify of the firewall config (enable and logging) '''
        data = self.mock_config_args()
        data['enable'] = 'disable'
        data['logging'] = 'enable'
        set_module_args(data)
        with pytest.raises(AnsibleExitJson) as exc:
            self.get_mock_object('config').apply()
        assert exc.value.args[0]['changed']