diff options
Diffstat (limited to 'test/units')
15 files changed, 0 insertions, 3166 deletions
diff --git a/test/units/modules/cloud/amazon/test_aws_acm.py b/test/units/modules/cloud/amazon/test_aws_acm.py deleted file mode 100644 index d2fd87b8de..0000000000 --- a/test/units/modules/cloud/amazon/test_aws_acm.py +++ /dev/null @@ -1,122 +0,0 @@ -# (c) 2019 Telstra Corporation Limited -# -# This file is part of Ansible -# -# Ansible is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# Ansible is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with Ansible. If not, see <http://www.gnu.org/licenses/>. - -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type -from ansible.modules.cloud.amazon.aws_acm import pem_chain_split, chain_compare -from ansible.module_utils._text import to_bytes, to_text -from pprint import pprint - - -def test_chain_compare(): - - # The functions we're testing take module as an argument - # Just so they can call module.fail_json - # Let's just use None for the unit tests, - # Because they shouldn't fail - # And if they do, fail_json is not applicable - module = None - - fixture_suffix = 'test/units/modules/cloud/amazon/fixtures/certs' - - # Test chain split function on super simple (invalid) certs - expected = ['aaa', 'bbb', 'ccc'] - - for fname in ['simple-chain-a.cert', 'simple-chain-b.cert']: - path = fixture_suffix + '/' + fname - with open(path, 'r') as f: - pem = to_text(f.read()) - actual = pem_chain_split(module, pem) - actual = [a.strip() for a in actual] - if actual != expected: - print("Expected:") - pprint(expected) - print("Actual:") - pprint(actual) - raise AssertionError("Failed to properly split %s" % fname) - - # Now test real chains - # chains with same same_as should be considered equal - test_chains = [ - { # Original Cert chain - 'path': fixture_suffix + '/chain-1.0.cert', - 'same_as': 1, - 'length': 3 - }, - { # Same as 1.0, but longer PEM lines - 'path': fixture_suffix + '/chain-1.1.cert', - 'same_as': 1, - 'length': 3 - }, - { # Same as 1.0, but without the stuff before each -------- - 'path': fixture_suffix + '/chain-1.2.cert', - 'same_as': 1, - 'length': 3 - }, - { # Same as 1.0, but in a different order, so should be considered different - 'path': fixture_suffix + '/chain-1.3.cert', - 'same_as': 2, - 'length': 3 - }, - { # Same as 1.0, but with last link missing - 'path': fixture_suffix + '/chain-1.4.cert', - 'same_as': 3, - 'length': 2 - }, - { # Completely different cert chain to all the others - 'path': fixture_suffix + '/chain-4.cert', - 'same_as': 4, - 'length': 3 - }, - { # Single cert - 'path': fixture_suffix + '/a.pem', - 'same_as': 5, - 'length': 1 - }, - { # a different, single cert - 'path': fixture_suffix + '/b.pem', - 'same_as': 6, - 'length': 1 - } - ] - - for chain in test_chains: - with open(chain['path'], 'r') as f: - chain['pem_text'] = to_text(f.read()) - - # Test to make sure our regex isn't too greedy - chain['split'] = pem_chain_split(module, chain['pem_text']) - if len(chain['split']) != chain['length']: - print("Cert before split") - print(chain['pem_text']) - print("Cert after split") - pprint(chain['split']) - print("path: %s" % chain['path']) - print("Expected chain length: %d" % chain['length']) - print("Actual chain length: %d" % len(chain['split'])) - raise AssertionError("Chain %s was not split properly" % chain['path']) - - for chain_a in test_chains: - for chain_b in test_chains: - expected = (chain_a['same_as'] == chain_b['same_as']) - - # Now test the comparison function - actual = chain_compare(module, chain_a['pem_text'], chain_b['pem_text']) - if expected != actual: - print("Error, unexpected comparison result between \n%s\nand\n%s" % (chain_a['path'], chain_b['path'])) - print("Expected %s got %s" % (str(expected), str(actual))) - assert(expected == actual) diff --git a/test/units/modules/cloud/amazon/test_aws_api_gateway.py b/test/units/modules/cloud/amazon/test_aws_api_gateway.py deleted file mode 100644 index 30b0120a11..0000000000 --- a/test/units/modules/cloud/amazon/test_aws_api_gateway.py +++ /dev/null @@ -1,69 +0,0 @@ -# -# (c) 2016 Michael De La Rue -# -# This file is part of Ansible -# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) - -# Make coding more python3-ish -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type - -import sys - -import pytest - -from ansible.module_utils.ec2 import HAS_BOTO3 -from units.modules.utils import set_module_args - -if not HAS_BOTO3: - pytestmark = pytest.mark.skip("test_api_gateway.py requires the `boto3` and `botocore` modules") - -import ansible.modules.cloud.amazon.aws_api_gateway as agw -from ansible.module_utils.aws import core - - -exit_return_dict = {} - - -def fake_exit_json(self, **kwargs): - """ store the kwargs given to exit_json rather than putting them out to stdout""" - global exit_return_dict - exit_return_dict = kwargs - sys.exit(0) - - -def test_upload_api(monkeypatch): - class FakeConnection: - - def put_rest_api(self, *args, **kwargs): - assert kwargs["body"] == "the-swagger-text-is-fake" - return {"msg": "success!"} - - def return_fake_connection(*args, **kwargs): - return FakeConnection() - - monkeypatch.setattr(core, "boto3_conn", return_fake_connection) - monkeypatch.setattr(core.AnsibleAWSModule, "exit_json", fake_exit_json) - - set_module_args({ - "api_id": "fred", - "state": "present", - "swagger_text": "the-swagger-text-is-fake", - "region": 'mars-north-1', - "_ansible_tmpdir": "/tmp/ansibl-abcdef", - }) - with pytest.raises(SystemExit): - agw.main() - assert exit_return_dict["changed"] - - -def test_warn_if_region_not_specified(): - - set_module_args({ - "name": "aws_api_gateway", - "state": "present", - "runtime": 'python2.7', - "role": 'arn:aws:iam::987654321012:role/lambda_basic_execution', - "handler": 'lambda_python.my_handler'}) - with pytest.raises(SystemExit): - print(agw.main()) diff --git a/test/units/modules/cloud/amazon/test_aws_direct_connect_connection.py b/test/units/modules/cloud/amazon/test_aws_direct_connect_connection.py deleted file mode 100644 index d232b51095..0000000000 --- a/test/units/modules/cloud/amazon/test_aws_direct_connect_connection.py +++ /dev/null @@ -1,92 +0,0 @@ -# (c) 2017 Red Hat Inc. -# -# This file is part of Ansible -# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) - -# Make coding more python3-ish -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type - -from units.utils.amazon_placebo_fixtures import placeboify, maybe_sleep -from ansible.modules.cloud.amazon import aws_direct_connect_connection - - -class FakeModule(object): - def __init__(self, **kwargs): - self.params = kwargs - - def fail_json(self, *args, **kwargs): - self.exit_args = args - self.exit_kwargs = kwargs - raise Exception('FAIL') - - def exit_json(self, *args, **kwargs): - self.exit_args = args - self.exit_kwargs = kwargs - - -# When rerecording these tests, create a stand alone connection with default values in us-west-2 -# with the name ansible-test-connection and set connection_id to the appropriate value -connection_id = "dxcon-fgq9rgot" -connection_name = 'ansible-test-connection' - - -def test_connection_status(placeboify, maybe_sleep): - client = placeboify.client('directconnect') - status = aws_direct_connect_connection.connection_status(client, connection_id)['connection'] - assert status['connectionName'] == connection_name - assert status['connectionId'] == connection_id - - -def test_connection_exists_by_id(placeboify, maybe_sleep): - client = placeboify.client('directconnect') - exists = aws_direct_connect_connection.connection_exists(client, connection_id) - assert exists == connection_id - - -def test_connection_exists_by_name(placeboify, maybe_sleep): - client = placeboify.client('directconnect') - exists = aws_direct_connect_connection.connection_exists(client, None, connection_name) - assert exists == connection_id - - -def test_connection_does_not_exist(placeboify, maybe_sleep): - client = placeboify.client('directconnect') - exists = aws_direct_connect_connection.connection_exists(client, 'dxcon-notthere') - assert exists is False - - -def test_changed_properties(placeboify, maybe_sleep): - client = placeboify.client('directconnect') - status = aws_direct_connect_connection.connection_status(client, connection_id)['connection'] - location = "differentlocation" - bandwidth = status['bandwidth'] - assert aws_direct_connect_connection.changed_properties(status, location, bandwidth) is True - - -def test_associations_are_not_updated(placeboify, maybe_sleep): - client = placeboify.client('directconnect') - status = aws_direct_connect_connection.connection_status(client, connection_id)['connection'] - lag_id = status.get('lagId') - assert aws_direct_connect_connection.update_associations(client, status, connection_id, lag_id) is False - - -def test_create_and_delete(placeboify, maybe_sleep): - client = placeboify.client('directconnect') - created_conn = verify_create_works(placeboify, maybe_sleep, client) - deleted_conn = verify_delete_works(placeboify, maybe_sleep, client, created_conn) - - -def verify_create_works(placeboify, maybe_sleep, client): - created = aws_direct_connect_connection.create_connection(client=client, - location="EqSE2", - bandwidth="1Gbps", - name="ansible-test-2", - lag_id=None) - assert created.startswith('dxcon') - return created - - -def verify_delete_works(placeboify, maybe_sleep, client, conn_id): - changed = aws_direct_connect_connection.ensure_absent(client, conn_id) - assert changed is True diff --git a/test/units/modules/cloud/amazon/test_aws_direct_connect_link_aggregation_group.py b/test/units/modules/cloud/amazon/test_aws_direct_connect_link_aggregation_group.py deleted file mode 100644 index 1f733aeb4c..0000000000 --- a/test/units/modules/cloud/amazon/test_aws_direct_connect_link_aggregation_group.py +++ /dev/null @@ -1,168 +0,0 @@ -# (c) 2017 Red Hat Inc. -# -# This file is part of Ansible -# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) - -# Make coding more python3-ish -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type - -import pytest -import os -import collections -from units.utils.amazon_placebo_fixtures import placeboify, maybe_sleep -from ansible.modules.cloud.amazon import aws_direct_connect_link_aggregation_group as lag_module -from ansible.module_utils.ec2 import get_aws_connection_info, boto3_conn - - -@pytest.fixture(scope="module") -def dependencies(): - - # each LAG dict will contain the keys: module, connections, virtual_interfaces - Dependencies = collections.namedtuple("Dependencies", ["lag_1", "lag_2"]) - lag_1 = dict() - lag_2 = dict() - - vanilla_params = {"name": "ansible_lag_1", - "location": "EqSe2", - "num_connections": 1, - "min_links": 0, - "bandwidth": "1Gbps"} - - for lag in ("ansible_lag_1", "ansible_lag_2"): - params = dict(vanilla_params) - params["name"] = lag - if lag == "ansible_lag_1": - lag_1["module"] = FakeModule(**params) - else: - lag_2["module"] = FakeModule(**params) - - if os.getenv("PLACEBO_RECORD"): - region, ec2_url, aws_connect_kwargs = get_aws_connection_info(lag_1["module"], boto3=True) - client = boto3_conn(lag_1["module"], conn_type="client", resource="directconnect", region=region, endpoint=ec2_url, **aws_connect_kwargs) - # See if link aggregation groups exist - for name in ("ansible_lag_1", "ansible_lag_2"): - lag_id = lag_module.create_lag(client, num_connections=1, location="EqSe2", bandwidth="1Gbps", name=name, connection_id=None) - if name == "ansible_lag_1": - lag_1["lag_id"] = lag_id - lag_1["name"] = name - else: - lag_2["lag_id"] = lag_id - lag_2["name"] = name - yield Dependencies(lag_1=lag_1, lag_2=lag_2) - else: - lag_1.update(lag_id="dxlag-fgkk4dja", name="ansible_lag_1") - lag_2.update(lag_id="dxlag-fgytkicv", name="ansible_lag_2") - yield Dependencies(lag_1=lag_1, lag_2=lag_2) - - if os.getenv("PLACEBO_RECORD"): - # clean up - lag_module.ensure_absent(client, lag_1["lag_id"], lag_1["name"], True, True, True, 120) - lag_module.ensure_absent(client, lag_2["lag_id"], lag_2["name"], True, True, True, 120) - - -class FakeModule(object): - def __init__(self, **kwargs): - self.params = kwargs - - def fail_json(self, *args, **kwargs): - self.exit_args = args - self.exit_kwargs = kwargs - raise Exception("FAIL") - - def exit_json(self, *args, **kwargs): - self.exit_args = args - self.exit_kwargs = kwargs - - -def test_nonexistent_lag_status(placeboify, maybe_sleep): - client = placeboify.client("directconnect") - exists = lag_module.lag_exists(client=client, - lag_id="doesntexist", - lag_name="doesntexist", - verify=True) - assert not exists - - -def test_lag_status(placeboify, maybe_sleep, dependencies): - client = placeboify.client("directconnect") - status = lag_module.lag_status(client, lag_id=dependencies.lag_1.get("lag_id")) - assert status.get("lagId") == dependencies.lag_1.get("lag_id") - assert status.get("lagName") == "ansible_lag_1" - - -def test_lag_exists(placeboify, maybe_sleep, dependencies): - client = placeboify.client("directconnect") - exists = lag_module.lag_exists(client=client, - lag_id=dependencies.lag_1.get("lag_id"), - lag_name=None, - verify=True) - assert exists - - -def test_lag_exists_using_name(placeboify, maybe_sleep, dependencies): - client = placeboify.client("directconnect") - exists = lag_module.lag_exists(client=client, - lag_id=None, - lag_name=dependencies.lag_1.get("name"), - verify=True) - assert exists - - -def test_nonexistent_lag_does_not_exist(placeboify, maybe_sleep): - client = placeboify.client("directconnect") - exists = lag_module.lag_exists(client=client, - lag_id="dxlag-XXXXXXXX", - lag_name="doesntexist", - verify=True) - assert not exists - - -def test_lag_changed_true(placeboify, maybe_sleep, dependencies): - client = placeboify.client("directconnect") - status = lag_module.lag_status(client=client, lag_id=dependencies.lag_1.get("lag_id")) - assert lag_module.lag_changed(status, "new_name", 1) - - -def test_lag_changed_true_no(placeboify, maybe_sleep, dependencies): - client = placeboify.client("directconnect") - status = lag_module.lag_status(client=client, lag_id=dependencies.lag_1.get("lag_id")) - assert not lag_module.lag_changed(status, "ansible_lag_1", 0) - - -def test_update_lag(placeboify, maybe_sleep, dependencies): - client = placeboify.client("directconnect") - status_before = lag_module.lag_status(client=client, lag_id=dependencies.lag_2.get("lag_id")) - lag_module.update_lag(client, - lag_id=dependencies.lag_2.get("lag_id"), - lag_name="ansible_lag_2_update", - min_links=0, - wait=False, - wait_timeout=0, - num_connections=1) - status_after = lag_module.lag_status(client=client, lag_id=dependencies.lag_2.get("lag_id")) - assert status_before != status_after - - # remove the lag name from the statuses and verify it was the only thing changed - del status_before['lagName'] - del status_after['lagName'] - assert status_before == status_after - - -def test_delete_nonexistent_lag(placeboify, maybe_sleep): - client = placeboify.client("directconnect") - changed = lag_module.ensure_absent(client, "dxlag-XXXXXXXX", "doesntexist", True, True, True, 120) - assert not changed - - -def test_delete_lag_with_connections_without_force_delete(placeboify, maybe_sleep, dependencies): - client = placeboify.client("directconnect") - with pytest.raises(Exception) as error_message: - lag_module.ensure_absent(client, dependencies.lag_1.get("lag_id"), "ansible_lag_1", False, True, True, 120) - assert "To force deletion of the LAG use delete_force: True" in error_message - - -def test_delete_lag_with_connections(placeboify, maybe_sleep, dependencies): - client = placeboify.client("directconnect") - changed = lag_module.ensure_absent(client, dependencies.lag_1.get("lag_id"), "ansible_lag_1", True, True, True, 120) - assert changed diff --git a/test/units/modules/cloud/amazon/test_data_pipeline.py b/test/units/modules/cloud/amazon/test_data_pipeline.py deleted file mode 100644 index 7d821b3993..0000000000 --- a/test/units/modules/cloud/amazon/test_data_pipeline.py +++ /dev/null @@ -1,250 +0,0 @@ -# (c) 2017 Red Hat Inc. -# -# This file is part of Ansible -# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) - -# Make coding more python3-ish -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type - -import os -import json -import collections - -import pytest -from units.utils.amazon_placebo_fixtures import placeboify, maybe_sleep - -from ansible.modules.cloud.amazon import data_pipeline -from ansible.module_utils._text import to_text - -# test_api_gateway.py requires the `boto3` and `botocore` modules -boto3 = pytest.importorskip('boto3') - - -@pytest.fixture(scope='module') -def dp_setup(): - """ - Yield a FakeModule object, data pipeline id of a vanilla data pipeline, and data pipeline objects - - This fixture is module-scoped, since this can be reused for multiple tests. - """ - Dependencies = collections.namedtuple("Dependencies", ["module", "data_pipeline_id", "objects"]) - - # get objects to use to test populating and activating the data pipeline - if not os.getenv('PLACEBO_RECORD'): - objects = [{"name": "Every 1 day", - "id": "DefaultSchedule", - "fields": []}, - {"name": "Default", - "id": "Default", - "fields": []}] - else: - s3 = boto3.client('s3') - data = s3.get_object(Bucket="ansible-test-datapipeline", Key="pipeline-object/new.json") - objects = json.loads(to_text(data['Body'].read())) - - # create a module with vanilla data pipeline parameters - params = {'name': 'ansible-test-create-pipeline', - 'description': 'ansible-datapipeline-unit-test', - 'state': 'present', - 'timeout': 300, - 'objects': [], - 'tags': {}, - 'parameters': [], - 'values': []} - module = FakeModule(**params) - - # yield a module, the data pipeline id, and the data pipeline objects (that are not yet defining the vanilla data pipeline) - if not os.getenv('PLACEBO_RECORD'): - yield Dependencies(module=module, data_pipeline_id='df-0590406117G8DPQZY2HA', objects=objects) - else: - connection = boto3.client('datapipeline') - changed, result = data_pipeline.create_pipeline(connection, module) - data_pipeline_id = result['data_pipeline']['pipeline_id'] - yield Dependencies(module=module, data_pipeline_id=data_pipeline_id, objects=objects) - - # remove data pipeline - if os.getenv('PLACEBO_RECORD'): - module.params.update(state='absent') - data_pipeline.delete_pipeline(connection, module) - - -class FakeModule(object): - def __init__(self, **kwargs): - self.params = kwargs - - def fail_json(self, *args, **kwargs): - self.exit_args = args - self.exit_kwargs = kwargs - raise Exception('FAIL') - - def exit_json(self, *args, **kwargs): - self.exit_args = args - self.exit_kwargs = kwargs - - -def test_create_pipeline_already_exists(placeboify, maybe_sleep, dp_setup): - connection = placeboify.client('datapipeline') - changed, result = data_pipeline.create_pipeline(connection, dp_setup.module) - assert changed is False - assert "Data Pipeline ansible-test-create-pipeline is present" in result['msg'] - - -def test_pipeline_field(placeboify, maybe_sleep, dp_setup): - connection = placeboify.client('datapipeline') - pipeline_field_info = data_pipeline.pipeline_field(connection, dp_setup.data_pipeline_id, "@pipelineState") - assert pipeline_field_info == "PENDING" - - -def test_define_pipeline(placeboify, maybe_sleep, dp_setup): - connection = placeboify.client('datapipeline') - changed, result = data_pipeline.define_pipeline(connection, dp_setup.module, dp_setup.objects, dp_setup.data_pipeline_id) - assert 'has been updated' in result - - -def test_deactivate_pipeline(placeboify, maybe_sleep, dp_setup): - connection = placeboify.client('datapipeline') - changed, result = data_pipeline.deactivate_pipeline(connection, dp_setup.module) - assert "Data Pipeline ansible-test-create-pipeline deactivated" in result['msg'] - - -def test_activate_without_population(placeboify, maybe_sleep, dp_setup): - connection = placeboify.client('datapipeline') - with pytest.raises(Exception) as error_message: - changed, result = data_pipeline.activate_pipeline(connection, dp_setup.module) - assert error_message == "You need to populate your pipeline before activation." - - -def test_create_pipeline(placeboify, maybe_sleep): - connection = placeboify.client('datapipeline') - params = {'name': 'ansible-unittest-create-pipeline', - 'description': 'ansible-datapipeline-unit-test', - 'state': 'present', - 'timeout': 300, - 'tags': {}} - m = FakeModule(**params) - changed, result = data_pipeline.create_pipeline(connection, m) - assert changed is True - assert result['msg'] == "Data Pipeline ansible-unittest-create-pipeline created." - - data_pipeline.delete_pipeline(connection, m) - - -def test_create_pipeline_with_tags(placeboify, maybe_sleep): - connection = placeboify.client('datapipeline') - params = {'name': 'ansible-unittest-create-pipeline_tags', - 'description': 'ansible-datapipeline-unit-test', - 'state': 'present', - 'tags': {'ansible': 'test'}, - 'timeout': 300} - m = FakeModule(**params) - changed, result = data_pipeline.create_pipeline(connection, m) - assert changed is True - assert result['msg'] == "Data Pipeline ansible-unittest-create-pipeline_tags created." - - data_pipeline.delete_pipeline(connection, m) - - -def test_delete_nonexistent_pipeline(placeboify, maybe_sleep): - connection = placeboify.client('datapipeline') - params = {'name': 'ansible-test-nonexistent', - 'description': 'ansible-test-nonexistent', - 'state': 'absent', - 'objects': [], - 'tags': {'ansible': 'test'}, - 'timeout': 300} - m = FakeModule(**params) - changed, result = data_pipeline.delete_pipeline(connection, m) - assert changed is False - - -def test_delete_pipeline(placeboify, maybe_sleep): - connection = placeboify.client('datapipeline') - params = {'name': 'ansible-test-nonexistent', - 'description': 'ansible-test-nonexistent', - 'state': 'absent', - 'objects': [], - 'tags': {'ansible': 'test'}, - 'timeout': 300} - m = FakeModule(**params) - data_pipeline.create_pipeline(connection, m) - changed, result = data_pipeline.delete_pipeline(connection, m) - assert changed is True - - -def test_build_unique_id_different(): - m = FakeModule(**{'name': 'ansible-unittest-1', 'description': 'test-unique-id'}) - m2 = FakeModule(**{'name': 'ansible-unittest-1', 'description': 'test-unique-id-different'}) - assert data_pipeline.build_unique_id(m) != data_pipeline.build_unique_id(m2) - - -def test_build_unique_id_same(): - m = FakeModule(**{'name': 'ansible-unittest-1', 'description': 'test-unique-id', 'tags': {'ansible': 'test'}}) - m2 = FakeModule(**{'name': 'ansible-unittest-1', 'description': 'test-unique-id', 'tags': {'ansible': 'test'}}) - assert data_pipeline.build_unique_id(m) == data_pipeline.build_unique_id(m2) - - -def test_build_unique_id_obj(): - # check that the object can be different and the unique id should be the same; should be able to modify objects - m = FakeModule(**{'name': 'ansible-unittest-1', 'objects': [{'first': 'object'}]}) - m2 = FakeModule(**{'name': 'ansible-unittest-1', 'objects': [{'second': 'object'}]}) - assert data_pipeline.build_unique_id(m) == data_pipeline.build_unique_id(m2) - - -def test_format_tags(): - unformatted_tags = {'key1': 'val1', 'key2': 'val2', 'key3': 'val3'} - formatted_tags = data_pipeline.format_tags(unformatted_tags) - for tag_set in formatted_tags: - assert unformatted_tags[tag_set['key']] == tag_set['value'] - - -def test_format_empty_tags(): - unformatted_tags = {} - formatted_tags = data_pipeline.format_tags(unformatted_tags) - assert formatted_tags == [] - - -def test_pipeline_description(placeboify, maybe_sleep, dp_setup): - connection = placeboify.client('datapipeline') - dp_id = dp_setup.data_pipeline_id - pipelines = data_pipeline.pipeline_description(connection, dp_id) - assert dp_id == pipelines['pipelineDescriptionList'][0]['pipelineId'] - - -def test_pipeline_description_nonexistent(placeboify, maybe_sleep): - hypothetical_pipeline_id = "df-015440025PF7YGLDK47C" - connection = placeboify.client('datapipeline') - with pytest.raises(Exception) as error: - data_pipeline.pipeline_description(connection, hypothetical_pipeline_id) - assert error == data_pipeline.DataPipelineNotFound - - -def test_check_dp_exists_true(placeboify, maybe_sleep, dp_setup): - connection = placeboify.client('datapipeline') - exists = data_pipeline.check_dp_exists(connection, dp_setup.data_pipeline_id) - assert exists is True - - -def test_check_dp_exists_false(placeboify, maybe_sleep): - hypothetical_pipeline_id = "df-015440025PF7YGLDK47C" - connection = placeboify.client('datapipeline') - exists = data_pipeline.check_dp_exists(connection, hypothetical_pipeline_id) - assert exists is False - - -def test_check_dp_status(placeboify, maybe_sleep, dp_setup): - inactive_states = ['INACTIVE', 'PENDING', 'FINISHED', 'DELETING'] - connection = placeboify.client('datapipeline') - state = data_pipeline.check_dp_status(connection, dp_setup.data_pipeline_id, inactive_states) - assert state is True - - -def test_activate_pipeline(placeboify, maybe_sleep, dp_setup): - # use objects to define pipeline before activating - connection = placeboify.client('datapipeline') - data_pipeline.define_pipeline(connection, - module=dp_setup.module, - objects=dp_setup.objects, - dp_id=dp_setup.data_pipeline_id) - changed, result = data_pipeline.activate_pipeline(connection, dp_setup.module) - assert changed is True diff --git a/test/units/modules/cloud/amazon/test_ec2_vpc_nat_gateway.py b/test/units/modules/cloud/amazon/test_ec2_vpc_nat_gateway.py deleted file mode 100644 index 53cd8c4c2c..0000000000 --- a/test/units/modules/cloud/amazon/test_ec2_vpc_nat_gateway.py +++ /dev/null @@ -1,207 +0,0 @@ -# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) - -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type - -import pytest -import unittest - -from mock import patch - -import ansible.modules.cloud.amazon.ec2_vpc_nat_gateway as ng - - -boto3 = pytest.importorskip("boto3") -botocore = pytest.importorskip("botocore") - -aws_region = 'us-west-2' - - -class AnsibleEc2VpcNatGatewayFunctions(unittest.TestCase): - - def test_get_nat_gateways(self): - client = boto3.client('ec2', region_name=aws_region) - success, err_msg, stream = ( - ng.get_nat_gateways(client, 'subnet-123456789', check_mode=True) - ) - should_return = ng.DRY_RUN_GATEWAYS - self.assertTrue(success) - self.assertEqual(stream, should_return) - - def test_get_nat_gateways_no_gateways_found(self): - client = boto3.client('ec2', region_name=aws_region) - success, err_msg, stream = ( - ng.get_nat_gateways(client, 'subnet-1234567', check_mode=True) - ) - self.assertTrue(success) - self.assertEqual(stream, []) - - def test_wait_for_status(self): - client = boto3.client('ec2', region_name=aws_region) - success, err_msg, gws = ( - ng.wait_for_status( - client, 5, 'nat-123456789', 'available', check_mode=True - ) - ) - should_return = ng.DRY_RUN_GATEWAYS[0] - self.assertTrue(success) - self.assertEqual(gws, should_return) - - @patch('time.sleep') - def test_wait_for_status_to_timeout(self, mock_sleep): - client = boto3.client('ec2', region_name=aws_region) - success, err_msg, gws = ( - ng.wait_for_status( - client, 2, 'nat-12345678', 'available', check_mode=True - ) - ) - self.assertFalse(success) - self.assertEqual(gws, {}) - - def test_gateway_in_subnet_exists_with_allocation_id(self): - client = boto3.client('ec2', region_name=aws_region) - gws, err_msg = ( - ng.gateway_in_subnet_exists( - client, 'subnet-123456789', 'eipalloc-1234567', check_mode=True - ) - ) - should_return = ng.DRY_RUN_GATEWAYS - self.assertEqual(gws, should_return) - - def test_gateway_in_subnet_exists_with_allocation_id_does_not_exist(self): - client = boto3.client('ec2', region_name=aws_region) - gws, err_msg = ( - ng.gateway_in_subnet_exists( - client, 'subnet-123456789', 'eipalloc-123', check_mode=True - ) - ) - should_return = list() - self.assertEqual(gws, should_return) - - def test_gateway_in_subnet_exists_without_allocation_id(self): - client = boto3.client('ec2', region_name=aws_region) - gws, err_msg = ( - ng.gateway_in_subnet_exists( - client, 'subnet-123456789', check_mode=True - ) - ) - should_return = ng.DRY_RUN_GATEWAYS - self.assertEqual(gws, should_return) - - def test_get_eip_allocation_id_by_address(self): - client = boto3.client('ec2', region_name=aws_region) - allocation_id, error_msg = ( - ng.get_eip_allocation_id_by_address( - client, '55.55.55.55', check_mode=True - ) - ) - should_return = 'eipalloc-1234567' - self.assertEqual(allocation_id, should_return) - - def test_get_eip_allocation_id_by_address_does_not_exist(self): - client = boto3.client('ec2', region_name=aws_region) - allocation_id, err_msg = ( - ng.get_eip_allocation_id_by_address( - client, '52.52.52.52', check_mode=True - ) - ) - self.assertEqual(err_msg, 'EIP 52.52.52.52 does not exist') - self.assertTrue(allocation_id is None) - - def test_allocate_eip_address(self): - client = boto3.client('ec2', region_name=aws_region) - success, err_msg, eip_id = ( - ng.allocate_eip_address( - client, check_mode=True - ) - ) - self.assertTrue(success) - - def test_release_address(self): - client = boto3.client('ec2', region_name=aws_region) - success, err_msg = ( - ng.release_address( - client, 'eipalloc-1234567', check_mode=True - ) - ) - self.assertTrue(success) - - def test_create(self): - client = boto3.client('ec2', region_name=aws_region) - success, changed, err_msg, results = ( - ng.create( - client, 'subnet-123456', 'eipalloc-1234567', check_mode=True - ) - ) - self.assertTrue(success) - self.assertTrue(changed) - - def test_pre_create(self): - client = boto3.client('ec2', region_name=aws_region) - success, changed, err_msg, results = ( - ng.pre_create( - client, 'subnet-123456', check_mode=True - ) - ) - self.assertTrue(success) - self.assertTrue(changed) - - def test_pre_create_idemptotent_with_allocation_id(self): - client = boto3.client('ec2', region_name=aws_region) - success, changed, err_msg, results = ( - ng.pre_create( - client, 'subnet-123456789', allocation_id='eipalloc-1234567', check_mode=True - ) - ) - self.assertTrue(success) - self.assertFalse(changed) - - def test_pre_create_idemptotent_with_eip_address(self): - client = boto3.client('ec2', region_name=aws_region) - success, changed, err_msg, results = ( - ng.pre_create( - client, 'subnet-123456789', eip_address='55.55.55.55', check_mode=True - ) - ) - self.assertTrue(success) - self.assertFalse(changed) - - def test_pre_create_idemptotent_if_exist_do_not_create(self): - client = boto3.client('ec2', region_name=aws_region) - success, changed, err_msg, results = ( - ng.pre_create( - client, 'subnet-123456789', if_exist_do_not_create=True, check_mode=True - ) - ) - self.assertTrue(success) - self.assertFalse(changed) - - def test_delete(self): - client = boto3.client('ec2', region_name=aws_region) - success, changed, err_msg, results = ( - ng.remove( - client, 'nat-123456789', check_mode=True - ) - ) - self.assertTrue(success) - self.assertTrue(changed) - - def test_delete_and_release_ip(self): - client = boto3.client('ec2', region_name=aws_region) - success, changed, err_msg, results = ( - ng.remove( - client, 'nat-123456789', release_eip=True, check_mode=True - ) - ) - self.assertTrue(success) - self.assertTrue(changed) - - def test_delete_if_does_not_exist(self): - client = boto3.client('ec2', region_name=aws_region) - success, changed, err_msg, results = ( - ng.remove( - client, 'nat-12345', check_mode=True - ) - ) - self.assertFalse(success) - self.assertFalse(changed) diff --git a/test/units/modules/cloud/amazon/test_ec2_vpc_vpn.py b/test/units/modules/cloud/amazon/test_ec2_vpc_vpn.py deleted file mode 100644 index 5bf3b40f91..0000000000 --- a/test/units/modules/cloud/amazon/test_ec2_vpc_vpn.py +++ /dev/null @@ -1,352 +0,0 @@ -# (c) 2017 Red Hat Inc. -# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) - -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type - -import pytest -import os -from units.utils.amazon_placebo_fixtures import placeboify, maybe_sleep -from ansible.modules.cloud.amazon import ec2_vpc_vpn -from ansible.module_utils.ec2 import get_aws_connection_info, boto3_conn, boto3_tag_list_to_ansible_dict - - -class FakeModule(object): - def __init__(self, **kwargs): - self.params = kwargs - - def fail_json(self, *args, **kwargs): - self.exit_args = args - self.exit_kwargs = kwargs - raise Exception('FAIL') - - def exit_json(self, *args, **kwargs): - self.exit_args = args - self.exit_kwargs = kwargs - - -def get_vgw(connection): - # see if two vgw exist and return them if so - vgw = connection.describe_vpn_gateways(Filters=[{'Name': 'tag:Ansible_VPN', 'Values': ['Test']}]) - if len(vgw['VpnGateways']) >= 2: - return [vgw['VpnGateways'][0]['VpnGatewayId'], vgw['VpnGateways'][1]['VpnGatewayId']] - # otherwise create two and return them - vgw_1 = connection.create_vpn_gateway(Type='ipsec.1') - vgw_2 = connection.create_vpn_gateway(Type='ipsec.1') - for resource in (vgw_1, vgw_2): - connection.create_tags(Resources=[resource['VpnGateway']['VpnGatewayId']], Tags=[{'Key': 'Ansible_VPN', 'Value': 'Test'}]) - return [vgw_1['VpnGateway']['VpnGatewayId'], vgw_2['VpnGateway']['VpnGatewayId']] - - -def get_cgw(connection): - # see if two cgw exist and return them if so - cgw = connection.describe_customer_gateways(DryRun=False, Filters=[{'Name': 'state', 'Values': ['available']}, - {'Name': 'tag:Name', 'Values': ['Ansible-CGW']}]) - if len(cgw['CustomerGateways']) >= 2: - return [cgw['CustomerGateways'][0]['CustomerGatewayId'], cgw['CustomerGateways'][1]['CustomerGatewayId']] - # otherwise create and return them - cgw_1 = connection.create_customer_gateway(DryRun=False, Type='ipsec.1', PublicIp='9.8.7.6', BgpAsn=65000) - cgw_2 = connection.create_customer_gateway(DryRun=False, Type='ipsec.1', PublicIp='5.4.3.2', BgpAsn=65000) - for resource in (cgw_1, cgw_2): - connection.create_tags(Resources=[resource['CustomerGateway']['CustomerGatewayId']], Tags=[{'Key': 'Ansible-CGW', 'Value': 'Test'}]) - return [cgw_1['CustomerGateway']['CustomerGatewayId'], cgw_2['CustomerGateway']['CustomerGatewayId']] - - -def get_dependencies(): - if os.getenv('PLACEBO_RECORD'): - module = FakeModule(**{}) - region, ec2_url, aws_connect_kwargs = get_aws_connection_info(module, boto3=True) - connection = boto3_conn(module, conn_type='client', resource='ec2', region=region, endpoint=ec2_url, **aws_connect_kwargs) - vgw = get_vgw(connection) - cgw = get_cgw(connection) - else: - vgw = ["vgw-35d70c2b", "vgw-32d70c2c"] - cgw = ["cgw-6113c87f", "cgw-9e13c880"] - - return cgw, vgw - - -def setup_mod_conn(placeboify, params): - conn = placeboify.client('ec2') - m = FakeModule(**params) - return m, conn - - -def make_params(cgw, vgw, tags=None, filters=None, routes=None): - tags = {} if tags is None else tags - filters = {} if filters is None else filters - routes = [] if routes is None else routes - - return {'customer_gateway_id': cgw, - 'static_only': True, - 'vpn_gateway_id': vgw, - 'connection_type': 'ipsec.1', - 'purge_tags': True, - 'tags': tags, - 'filters': filters, - 'routes': routes, - 'delay': 15, - 'wait_timeout': 600} - - -def make_conn(placeboify, module, connection): - customer_gateway_id = module.params['customer_gateway_id'] - static_only = module.params['static_only'] - vpn_gateway_id = module.params['vpn_gateway_id'] - connection_type = module.params['connection_type'] - check_mode = module.params['check_mode'] - changed = True - vpn = ec2_vpc_vpn.create_connection(connection, customer_gateway_id, static_only, vpn_gateway_id, connection_type) - return changed, vpn - - -def tear_down_conn(placeboify, connection, vpn_connection_id): - ec2_vpc_vpn.delete_connection(connection, vpn_connection_id, delay=15, max_attempts=40) - - -def test_find_connection_vpc_conn_id(placeboify, maybe_sleep): - # setup dependencies for 2 vpn connections - dependencies = setup_req(placeboify, 2) - dep1, dep2 = dependencies[0], dependencies[1] - params1, vpn1, m1, conn1 = dep1['params'], dep1['vpn'], dep1['module'], dep1['connection'] - params2, vpn2, m2, conn2 = dep2['params'], dep2['vpn'], dep2['module'], dep2['connection'] - - # find the connection with a vpn_connection_id and assert it is the expected one - assert vpn1['VpnConnectionId'] == ec2_vpc_vpn.find_connection(conn1, params1, vpn1['VpnConnectionId'])['VpnConnectionId'] - - tear_down_conn(placeboify, conn1, vpn1['VpnConnectionId']) - tear_down_conn(placeboify, conn2, vpn2['VpnConnectionId']) - - -def test_find_connection_filters(placeboify, maybe_sleep): - # setup dependencies for 2 vpn connections - dependencies = setup_req(placeboify, 2) - dep1, dep2 = dependencies[0], dependencies[1] - params1, vpn1, m1, conn1 = dep1['params'], dep1['vpn'], dep1['module'], dep1['connection'] - params2, vpn2, m2, conn2 = dep2['params'], dep2['vpn'], dep2['module'], dep2['connection'] - - # update to different tags - params1.update(tags={'Wrong': 'Tag'}) - params2.update(tags={'Correct': 'Tag'}) - ec2_vpc_vpn.ensure_present(conn1, params1) - ec2_vpc_vpn.ensure_present(conn2, params2) - - # create some new parameters for a filter - params = {'filters': {'tags': {'Correct': 'Tag'}}} - - # find the connection that has the parameters above - found = ec2_vpc_vpn.find_connection(conn1, params) - - # assert the correct connection was found - assert found['VpnConnectionId'] == vpn2['VpnConnectionId'] - - # delete the connections - tear_down_conn(placeboify, conn1, vpn1['VpnConnectionId']) - tear_down_conn(placeboify, conn2, vpn2['VpnConnectionId']) - - -def test_find_connection_insufficient_filters(placeboify, maybe_sleep): - # get list of customer gateways and virtual private gateways - cgw, vgw = get_dependencies() - - # create two connections with the same tags - params = make_params(cgw[0], vgw[0], tags={'Correct': 'Tag'}) - params2 = make_params(cgw[1], vgw[1], tags={'Correct': 'Tag'}) - m, conn = setup_mod_conn(placeboify, params) - m2, conn2 = setup_mod_conn(placeboify, params2) - vpn1 = ec2_vpc_vpn.ensure_present(conn, m.params)[1] - vpn2 = ec2_vpc_vpn.ensure_present(conn2, m2.params)[1] - - # reset the parameters so only filtering by tags will occur - m.params = {'filters': {'tags': {'Correct': 'Tag'}}} - - # assert that multiple matching connections have been found - with pytest.raises(Exception) as error_message: - ec2_vpc_vpn.find_connection(conn, m.params) - assert error_message == "More than one matching VPN connection was found.To modify or delete a VPN please specify vpn_connection_id or add filters." - - # delete the connections - tear_down_conn(placeboify, conn, vpn1['VpnConnectionId']) - tear_down_conn(placeboify, conn, vpn2['VpnConnectionId']) - - -def test_find_connection_nonexistent(placeboify, maybe_sleep): - # create parameters but don't create a connection with them - params = {'filters': {'tags': {'Correct': 'Tag'}}} - m, conn = setup_mod_conn(placeboify, params) - - # try to find a connection with matching parameters and assert None are found - assert ec2_vpc_vpn.find_connection(conn, m.params) is None - - -def test_create_connection(placeboify, maybe_sleep): - # get list of customer gateways and virtual private gateways - cgw, vgw = get_dependencies() - - # create a connection - params = make_params(cgw[0], vgw[0]) - m, conn = setup_mod_conn(placeboify, params) - changed, vpn = ec2_vpc_vpn.ensure_present(conn, m.params) - - # assert that changed is true and that there is a connection id - assert changed is True - assert 'VpnConnectionId' in vpn - - # delete connection - tear_down_conn(placeboify, conn, vpn['VpnConnectionId']) - - -def test_create_connection_that_exists(placeboify, maybe_sleep): - # setup dependencies for 1 vpn connection - dependencies = setup_req(placeboify, 1) - params, vpn, m, conn = dependencies['params'], dependencies['vpn'], dependencies['module'], dependencies['connection'] - - # try to recreate the same connection - changed, vpn2 = ec2_vpc_vpn.ensure_present(conn, params) - - # nothing should have changed - assert changed is False - assert vpn['VpnConnectionId'] == vpn2['VpnConnectionId'] - - # delete connection - tear_down_conn(placeboify, conn, vpn['VpnConnectionId']) - - -def test_modify_deleted_connection(placeboify, maybe_sleep): - # setup dependencies for 1 vpn connection - dependencies = setup_req(placeboify, 1) - params, vpn, m, conn = dependencies['params'], dependencies['vpn'], dependencies['module'], dependencies['connection'] - - # delete it - tear_down_conn(placeboify, conn, vpn['VpnConnectionId']) - - # try to update the deleted connection - m.params.update(vpn_connection_id=vpn['VpnConnectionId']) - with pytest.raises(Exception) as error_message: - ec2_vpc_vpn.ensure_present(conn, m.params) - assert error_message == "There is no VPN connection available or pending with that id. Did you delete it?" - - -def test_delete_connection(placeboify, maybe_sleep): - # setup dependencies for 1 vpn connection - dependencies = setup_req(placeboify, 1) - params, vpn, m, conn = dependencies['params'], dependencies['vpn'], dependencies['module'], dependencies['connection'] - - # delete it - changed, vpn = ec2_vpc_vpn.ensure_absent(conn, m.params) - - assert changed is True - assert vpn == {} - - -def test_delete_nonexistent_connection(placeboify, maybe_sleep): - # create parameters and ensure any connection matching (None) is deleted - params = {'filters': {'tags': {'ThisConnection': 'DoesntExist'}}, 'delay': 15, 'wait_timeout': 600} - m, conn = setup_mod_conn(placeboify, params) - changed, vpn = ec2_vpc_vpn.ensure_absent(conn, m.params) - - assert changed is False - assert vpn == {} - - -def test_check_for_update_tags(placeboify, maybe_sleep): - # setup dependencies for 1 vpn connection - dependencies = setup_req(placeboify, 1) - params, vpn, m, conn = dependencies['params'], dependencies['vpn'], dependencies['module'], dependencies['connection'] - - # add and remove a number of tags - m.params['tags'] = {'One': 'one', 'Two': 'two'} - ec2_vpc_vpn.ensure_present(conn, m.params) - m.params['tags'] = {'Two': 'two', 'Three': 'three', 'Four': 'four'} - changes = ec2_vpc_vpn.check_for_update(conn, m.params, vpn['VpnConnectionId']) - - flat_dict_changes = boto3_tag_list_to_ansible_dict(changes['tags_to_add']) - correct_changes = boto3_tag_list_to_ansible_dict([{'Key': 'Three', 'Value': 'three'}, {'Key': 'Four', 'Value': 'four'}]) - assert flat_dict_changes == correct_changes - assert changes['tags_to_remove'] == ['One'] - - # delete connection - tear_down_conn(placeboify, conn, vpn['VpnConnectionId']) - - -def test_check_for_update_nonmodifiable_attr(placeboify, maybe_sleep): - # setup dependencies for 1 vpn connection - dependencies = setup_req(placeboify, 1) - params, vpn, m, conn = dependencies['params'], dependencies['vpn'], dependencies['module'], dependencies['connection'] - current_vgw = params['vpn_gateway_id'] - - # update a parameter that isn't modifiable - m.params.update(vpn_gateway_id="invalidchange") - - err = 'You cannot modify vpn_gateway_id, the current value of which is {0}. Modifiable VPN connection attributes are tags.'.format(current_vgw) - with pytest.raises(Exception) as error_message: - ec2_vpc_vpn.check_for_update(m, conn, vpn['VpnConnectionId']) - assert error_message == err - - # delete connection - tear_down_conn(placeboify, conn, vpn['VpnConnectionId']) - - -def test_add_tags(placeboify, maybe_sleep): - # setup dependencies for 1 vpn connection - dependencies = setup_req(placeboify, 1) - params, vpn, m, conn = dependencies['params'], dependencies['vpn'], dependencies['module'], dependencies['connection'] - - # add a tag to the connection - ec2_vpc_vpn.add_tags(conn, vpn['VpnConnectionId'], add=[{'Key': 'Ansible-Test', 'Value': 'VPN'}]) - - # assert tag is there - current_vpn = ec2_vpc_vpn.find_connection(conn, params) - assert current_vpn['Tags'] == [{'Key': 'Ansible-Test', 'Value': 'VPN'}] - - # delete connection - tear_down_conn(placeboify, conn, vpn['VpnConnectionId']) - - -def test_remove_tags(placeboify, maybe_sleep): - # setup dependencies for 1 vpn connection - dependencies = setup_req(placeboify, 1) - params, vpn, m, conn = dependencies['params'], dependencies['vpn'], dependencies['module'], dependencies['connection'] - - # remove a tag from the connection - ec2_vpc_vpn.remove_tags(conn, vpn['VpnConnectionId'], remove=['Ansible-Test']) - - # assert the tag is gone - current_vpn = ec2_vpc_vpn.find_connection(conn, params) - assert 'Tags' not in current_vpn - - # delete connection - tear_down_conn(placeboify, conn, vpn['VpnConnectionId']) - - -def test_add_routes(placeboify, maybe_sleep): - # setup dependencies for 1 vpn connection - dependencies = setup_req(placeboify, 1) - params, vpn, m, conn = dependencies['params'], dependencies['vpn'], dependencies['module'], dependencies['connection'] - - # create connection with a route - ec2_vpc_vpn.add_routes(conn, vpn['VpnConnectionId'], ['195.168.2.0/24', '196.168.2.0/24']) - - # assert both routes are there - current_vpn = ec2_vpc_vpn.find_connection(conn, params) - assert set(each['DestinationCidrBlock'] for each in current_vpn['Routes']) == set(['195.168.2.0/24', '196.168.2.0/24']) - - # delete connection - tear_down_conn(placeboify, conn, vpn['VpnConnectionId']) - - -def setup_req(placeboify, number_of_results=1): - ''' returns dependencies for VPN connections ''' - assert number_of_results in (1, 2) - results = [] - cgw, vgw = get_dependencies() - for each in range(0, number_of_results): - params = make_params(cgw[each], vgw[each]) - m, conn = setup_mod_conn(placeboify, params) - vpn = ec2_vpc_vpn.ensure_present(conn, params)[1] - - results.append({'module': m, 'connection': conn, 'vpn': vpn, 'params': params}) - if number_of_results == 1: - return results[0] - else: - return results[0], results[1] diff --git a/test/units/modules/cloud/amazon/test_iam_password_policy.py b/test/units/modules/cloud/amazon/test_iam_password_policy.py deleted file mode 100644 index 85b828a130..0000000000 --- a/test/units/modules/cloud/amazon/test_iam_password_policy.py +++ /dev/null @@ -1,30 +0,0 @@ -# Make coding more python3-ish -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type - -import pytest - -from units.modules.utils import set_module_args -from ansible.module_utils.ec2 import HAS_BOTO3 - -if not HAS_BOTO3: - pytestmark = pytest.mark.skip("iam_password_policy.py requires the `boto3` and `botocore` modules") -else: - import boto3 - from ansible.modules.cloud.amazon import iam_password_policy - - -def test_warn_if_state_not_specified(): - set_module_args({ - "min_pw_length": "8", - "require_symbols": "false", - "require_numbers": "true", - "require_uppercase": "true", - "require_lowercase": "true", - "allow_pw_change": "true", - "pw_max_age": "60", - "pw_reuse_prevent": "5", - "pw_expire": "false" - }) - with pytest.raises(SystemExit): - print(iam_password_policy.main()) diff --git a/test/units/modules/cloud/amazon/test_kinesis_stream.py b/test/units/modules/cloud/amazon/test_kinesis_stream.py deleted file mode 100644 index e549ae9d11..0000000000 --- a/test/units/modules/cloud/amazon/test_kinesis_stream.py +++ /dev/null @@ -1,330 +0,0 @@ -# Make coding more python3-ish -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type - -import pytest -import unittest - -boto3 = pytest.importorskip("boto3") -botocore = pytest.importorskip("botocore") - -from ansible.modules.cloud.amazon import kinesis_stream - -aws_region = 'us-west-2' - - -class AnsibleKinesisStreamFunctions(unittest.TestCase): - - def test_convert_to_lower(self): - example = { - 'HasMoreShards': True, - 'RetentionPeriodHours': 24, - 'StreamName': 'test', - 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test', - 'StreamStatus': 'ACTIVE' - } - converted_example = kinesis_stream.convert_to_lower(example) - keys = list(converted_example.keys()) - keys.sort() - for i in range(len(keys)): - if i == 0: - self.assertEqual(keys[i], 'has_more_shards') - if i == 1: - self.assertEqual(keys[i], 'retention_period_hours') - if i == 2: - self.assertEqual(keys[i], 'stream_arn') - if i == 3: - self.assertEqual(keys[i], 'stream_name') - if i == 4: - self.assertEqual(keys[i], 'stream_status') - - def test_make_tags_in_aws_format(self): - example = { - 'env': 'development' - } - should_return = [ - { - 'Key': 'env', - 'Value': 'development' - } - ] - aws_tags = kinesis_stream.make_tags_in_aws_format(example) - self.assertEqual(aws_tags, should_return) - - def test_make_tags_in_proper_format(self): - example = [ - { - 'Key': 'env', - 'Value': 'development' - }, - { - 'Key': 'service', - 'Value': 'web' - } - ] - should_return = { - 'env': 'development', - 'service': 'web' - } - proper_tags = kinesis_stream.make_tags_in_proper_format(example) - self.assertEqual(proper_tags, should_return) - - def test_recreate_tags_from_list(self): - example = [('environment', 'development'), ('service', 'web')] - should_return = [ - { - 'Key': 'environment', - 'Value': 'development' - }, - { - 'Key': 'service', - 'Value': 'web' - } - ] - aws_tags = kinesis_stream.recreate_tags_from_list(example) - self.assertEqual(aws_tags, should_return) - - def test_get_tags(self): - client = boto3.client('kinesis', region_name=aws_region) - success, err_msg, tags = kinesis_stream.get_tags(client, 'test', check_mode=True) - self.assertTrue(success) - should_return = [ - { - 'Key': 'DryRunMode', - 'Value': 'true' - } - ] - self.assertEqual(tags, should_return) - - def test_find_stream(self): - client = boto3.client('kinesis', region_name=aws_region) - success, err_msg, stream = ( - kinesis_stream.find_stream(client, 'test', check_mode=True) - ) - should_return = { - 'OpenShardsCount': 5, - 'ClosedShardsCount': 0, - 'ShardsCount': 5, - 'HasMoreShards': True, - 'RetentionPeriodHours': 24, - 'StreamName': 'test', - 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test', - 'StreamStatus': 'ACTIVE', - 'EncryptionType': 'NONE' - } - self.assertTrue(success) - self.assertEqual(stream, should_return) - - def test_wait_for_status(self): - client = boto3.client('kinesis', region_name=aws_region) - success, err_msg, stream = ( - kinesis_stream.wait_for_status( - client, 'test', 'ACTIVE', check_mode=True - ) - ) - should_return = { - 'OpenShardsCount': 5, - 'ClosedShardsCount': 0, - 'ShardsCount': 5, - 'HasMoreShards': True, - 'RetentionPeriodHours': 24, - 'StreamName': 'test', - 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test', - 'StreamStatus': 'ACTIVE', - 'EncryptionType': 'NONE' - } - self.assertTrue(success) - self.assertEqual(stream, should_return) - - def test_tags_action_create(self): - client = boto3.client('kinesis', region_name=aws_region) - tags = { - 'env': 'development', - 'service': 'web' - } - success, err_msg = ( - kinesis_stream.tags_action( - client, 'test', tags, 'create', check_mode=True - ) - ) - self.assertTrue(success) - - def test_tags_action_delete(self): - client = boto3.client('kinesis', region_name=aws_region) - tags = { - 'env': 'development', - 'service': 'web' - } - success, err_msg = ( - kinesis_stream.tags_action( - client, 'test', tags, 'delete', check_mode=True - ) - ) - self.assertTrue(success) - - def test_tags_action_invalid(self): - client = boto3.client('kinesis', region_name=aws_region) - tags = { - 'env': 'development', - 'service': 'web' - } - success, err_msg = ( - kinesis_stream.tags_action( - client, 'test', tags, 'append', check_mode=True - ) - ) - self.assertFalse(success) - - def test_update_tags(self): - client = boto3.client('kinesis', region_name=aws_region) - tags = { - 'env': 'development', - 'service': 'web' - } - success, changed, err_msg = ( - kinesis_stream.update_tags( - client, 'test', tags, check_mode=True - ) - ) - self.assertTrue(success) - - def test_stream_action_create(self): - client = boto3.client('kinesis', region_name=aws_region) - success, err_msg = ( - kinesis_stream.stream_action( - client, 'test', 10, 'create', check_mode=True - ) - ) - self.assertTrue(success) - - def test_stream_action_delete(self): - client = boto3.client('kinesis', region_name=aws_region) - success, err_msg = ( - kinesis_stream.stream_action( - client, 'test', 10, 'delete', check_mode=True - ) - ) - self.assertTrue(success) - - def test_stream_action_invalid(self): - client = boto3.client('kinesis', region_name=aws_region) - success, err_msg = ( - kinesis_stream.stream_action( - client, 'test', 10, 'append', check_mode=True - ) - ) - self.assertFalse(success) - - def test_retention_action_increase(self): - client = boto3.client('kinesis', region_name=aws_region) - success, err_msg = ( - kinesis_stream.retention_action( - client, 'test', 48, 'increase', check_mode=True - ) - ) - self.assertTrue(success) - - def test_retention_action_decrease(self): - client = boto3.client('kinesis', region_name=aws_region) - success, err_msg = ( - kinesis_stream.retention_action( - client, 'test', 24, 'decrease', check_mode=True - ) - ) - self.assertTrue(success) - - def test_retention_action_invalid(self): - client = boto3.client('kinesis', region_name=aws_region) - success, err_msg = ( - kinesis_stream.retention_action( - client, 'test', 24, 'create', check_mode=True - ) - ) - self.assertFalse(success) - - def test_update_shard_count(self): - client = boto3.client('kinesis', region_name=aws_region) - success, err_msg = ( - kinesis_stream.update_shard_count( - client, 'test', 5, check_mode=True - ) - ) - self.assertTrue(success) - - def test_update(self): - client = boto3.client('kinesis', region_name=aws_region) - current_stream = { - 'OpenShardsCount': 5, - 'ClosedShardsCount': 0, - 'ShardsCount': 1, - 'HasMoreShards': True, - 'RetentionPeriodHours': 24, - 'StreamName': 'test', - 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test', - 'StreamStatus': 'ACTIVE', - 'EncryptionType': 'NONE' - } - tags = { - 'env': 'development', - 'service': 'web' - } - success, changed, err_msg = ( - kinesis_stream.update( - client, current_stream, 'test', number_of_shards=2, retention_period=48, - tags=tags, check_mode=True - ) - ) - self.assertTrue(success) - self.assertTrue(changed) - self.assertEqual(err_msg, 'Kinesis Stream test updated successfully.') - - def test_create_stream(self): - client = boto3.client('kinesis', region_name=aws_region) - tags = { - 'env': 'development', - 'service': 'web' - } - success, changed, err_msg, results = ( - kinesis_stream.create_stream( - client, 'test', number_of_shards=10, retention_period=48, - tags=tags, check_mode=True - ) - ) - should_return = { - 'open_shards_count': 5, - 'closed_shards_count': 0, - 'shards_count': 5, - 'has_more_shards': True, - 'retention_period_hours': 24, - 'stream_name': 'test', - 'stream_arn': 'arn:aws:kinesis:east-side:123456789:stream/test', - 'stream_status': 'ACTIVE', - 'encryption_type': 'NONE', - 'tags': tags, - } - self.assertTrue(success) - self.assertTrue(changed) - self.assertEqual(results, should_return) - self.assertEqual(err_msg, 'Kinesis Stream test updated successfully.') - - def test_enable_stream_encription(self): - client = boto3.client('kinesis', region_name=aws_region) - success, changed, err_msg, results = ( - kinesis_stream.start_stream_encryption( - client, 'test', encryption_type='KMS', key_id='', wait=True, wait_timeout=60, check_mode=True - ) - ) - self.assertTrue(success) - self.assertTrue(changed) - self.assertEqual(err_msg, 'Kinesis Stream test encryption started successfully.') - - def test_dsbale_stream_encryption(self): - client = boto3.client('kinesis', region_name=aws_region) - success, changed, err_msg, results = ( - kinesis_stream.stop_stream_encryption( - client, 'test', encryption_type='KMS', key_id='', wait=True, wait_timeout=60, check_mode=True - ) - ) - self.assertTrue(success) - self.assertTrue(changed) - self.assertEqual(err_msg, 'Kinesis Stream test encryption stopped successfully.') diff --git a/test/units/modules/cloud/amazon/test_lambda.py b/test/units/modules/cloud/amazon/test_lambda.py deleted file mode 100644 index 14ea2b454c..0000000000 --- a/test/units/modules/cloud/amazon/test_lambda.py +++ /dev/null @@ -1,273 +0,0 @@ -# -# (c) 2017 Michael De La Rue -# -# This file is part of Ansible -# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) - -# Make coding more python3-ish -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type - -import copy -import pytest - -from units.compat.mock import MagicMock, Mock, patch -from ansible.module_utils import basic -from units.modules.utils import set_module_args - - -boto3 = pytest.importorskip("boto3") - -# lambda is a keyword so we have to hack this. -_temp = __import__("ansible.modules.cloud.amazon.lambda") -lda = getattr(_temp.modules.cloud.amazon, "lambda") - - -base_lambda_config = { - 'FunctionName': 'lambda_name', - 'Role': 'arn:aws:iam::987654321012:role/lambda_basic_execution', - 'Handler': 'lambda_python.my_handler', - 'Description': 'this that the other', - 'Timeout': 3, - 'MemorySize': 128, - 'Runtime': 'python2.7', - 'CodeSha256': 'AqMZ+xptM7aC9VXu+5jyp1sqO+Nj4WFMNzQxtPMP2n8=', -} - -one_change_lambda_config = copy.copy(base_lambda_config) -one_change_lambda_config['Timeout'] = 4 -two_change_lambda_config = copy.copy(one_change_lambda_config) -two_change_lambda_config['Role'] = 'arn:aws:iam::987654321012:role/lambda_advanced_execution' -code_change_lambda_config = copy.copy(base_lambda_config) -code_change_lambda_config['CodeSha256'] = 'P+Zy8U4T4RiiHWElhL10VBKj9jw4rSJ5bm/TiW+4Rts=' - -base_module_args = { - "region": "us-west-1", - "name": "lambda_name", - "state": "present", - "zip_file": "test/units/modules/cloud/amazon/fixtures/thezip.zip", - "runtime": 'python2.7', - "role": 'arn:aws:iam::987654321012:role/lambda_basic_execution', - "memory_size": 128, - "timeout": 3, - "handler": 'lambda_python.my_handler' -} -module_args_with_environment = dict(base_module_args, environment_variables={ - "variable_name": "variable_value" -}) - - -def make_mock_no_connection_connection(config): - """return a mock of ansible's boto3_conn ready to return a mock AWS API client""" - lambda_client_double = MagicMock() - lambda_client_double.get_function.configure_mock( - return_value=False - ) - lambda_client_double.update_function_configuration.configure_mock( - return_value={ - 'Version': 1 - } - ) - fake_boto3_conn = Mock(return_value=lambda_client_double) - return (fake_boto3_conn, lambda_client_double) - - -def make_mock_connection(config): - """return a mock of ansible's boto3_conn ready to return a mock AWS API client""" - lambda_client_double = MagicMock() - lambda_client_double.get_function.configure_mock( - return_value={ - 'Configuration': config - } - ) - lambda_client_double.update_function_configuration.configure_mock( - return_value={ - 'Version': 1 - } - ) - fake_boto3_conn = Mock(return_value=lambda_client_double) - return (fake_boto3_conn, lambda_client_double) - - -class AnsibleFailJson(Exception): - pass - - -def fail_json_double(*args, **kwargs): - """works like fail_json but returns module results inside exception instead of stdout""" - kwargs['failed'] = True - raise AnsibleFailJson(kwargs) - - -# TODO: def test_handle_different_types_in_config_params(): - - -def test_create_lambda_if_not_exist(): - - set_module_args(base_module_args) - (boto3_conn_double, lambda_client_double) = make_mock_no_connection_connection(code_change_lambda_config) - - with patch.object(lda, 'boto3_conn', boto3_conn_double): - try: - lda.main() - except SystemExit: - pass - - # guard against calling other than for a lambda connection (e.g. IAM) - assert(len(boto3_conn_double.mock_calls) > 0), "boto connections never used" - assert(len(boto3_conn_double.mock_calls) < 2), "multiple boto connections used unexpectedly" - assert(len(lambda_client_double.update_function_configuration.mock_calls) == 0), \ - "unexpectedly updated lambda configuration when should have only created" - assert(len(lambda_client_double.update_function_code.mock_calls) == 0), \ - "update lambda function code when function should have been created only" - assert(len(lambda_client_double.create_function.mock_calls) > 0), \ - "failed to call create_function " - (create_args, create_kwargs) = lambda_client_double.create_function.call_args - assert (len(create_kwargs) > 0), "expected create called with keyword args, none found" - - try: - # For now I assume that we should NOT send an empty environment. It might - # be okay / better to explicitly send an empty environment. However `None' - # is not acceptable - mikedlr - create_kwargs["Environment"] - raise(Exception("Environment sent to boto when none expected")) - except KeyError: - pass # We are happy, no environment is fine - - -def test_update_lambda_if_code_changed(): - - set_module_args(base_module_args) - (boto3_conn_double, lambda_client_double) = make_mock_connection(code_change_lambda_config) - - with patch.object(lda, 'boto3_conn', boto3_conn_double): - try: - lda.main() - except SystemExit: - pass - - # guard against calling other than for a lambda connection (e.g. IAM) - assert(len(boto3_conn_double.mock_calls) > 0), "boto connections never used" - assert(len(boto3_conn_double.mock_calls) < 2), "multiple boto connections used unexpectedly" - assert(len(lambda_client_double.update_function_configuration.mock_calls) == 0), \ - "unexpectedly updatede lambda configuration when only code changed" - assert(len(lambda_client_double.update_function_configuration.mock_calls) < 2), \ - "lambda function update called multiple times when only one time should be needed" - assert(len(lambda_client_double.update_function_code.mock_calls) > 1), \ - "failed to update lambda function when code changed" - # 3 because after uploading we call into the return from mock to try to find what function version - # was returned so the MagicMock actually sees two calls for one update. - assert(len(lambda_client_double.update_function_code.mock_calls) < 3), \ - "lambda function code update called multiple times when only one time should be needed" - - -def test_update_lambda_if_config_changed(): - - set_module_args(base_module_args) - (boto3_conn_double, lambda_client_double) = make_mock_connection(two_change_lambda_config) - - with patch.object(lda, 'boto3_conn', boto3_conn_double): - try: - lda.main() - except SystemExit: - pass - - # guard against calling other than for a lambda connection (e.g. IAM) - assert(len(boto3_conn_double.mock_calls) > 0), "boto connections never used" - assert(len(boto3_conn_double.mock_calls) < 2), "multiple boto connections used unexpectedly" - assert(len(lambda_client_double.update_function_configuration.mock_calls) > 0), \ - "failed to update lambda function when configuration changed" - assert(len(lambda_client_double.update_function_configuration.mock_calls) < 2), \ - "lambda function update called multiple times when only one time should be needed" - assert(len(lambda_client_double.update_function_code.mock_calls) == 0), \ - "updated lambda code when no change should have happened" - - -def test_update_lambda_if_only_one_config_item_changed(): - - set_module_args(base_module_args) - (boto3_conn_double, lambda_client_double) = make_mock_connection(one_change_lambda_config) - - with patch.object(lda, 'boto3_conn', boto3_conn_double): - try: - lda.main() - except SystemExit: - pass - - # guard against calling other than for a lambda connection (e.g. IAM) - assert(len(boto3_conn_double.mock_calls) > 0), "boto connections never used" - assert(len(boto3_conn_double.mock_calls) < 2), "multiple boto connections used unexpectedly" - assert(len(lambda_client_double.update_function_configuration.mock_calls) > 0), \ - "failed to update lambda function when configuration changed" - assert(len(lambda_client_double.update_function_configuration.mock_calls) < 2), \ - "lambda function update called multiple times when only one time should be needed" - assert(len(lambda_client_double.update_function_code.mock_calls) == 0), \ - "updated lambda code when no change should have happened" - - -def test_update_lambda_if_added_environment_variable(): - - set_module_args(module_args_with_environment) - (boto3_conn_double, lambda_client_double) = make_mock_connection(base_lambda_config) - - with patch.object(lda, 'boto3_conn', boto3_conn_double): - try: - lda.main() - except SystemExit: - pass - - # guard against calling other than for a lambda connection (e.g. IAM) - assert(len(boto3_conn_double.mock_calls) > 0), "boto connections never used" - assert(len(boto3_conn_double.mock_calls) < 2), "multiple boto connections used unexpectedly" - assert(len(lambda_client_double.update_function_configuration.mock_calls) > 0), \ - "failed to update lambda function when configuration changed" - assert(len(lambda_client_double.update_function_configuration.mock_calls) < 2), \ - "lambda function update called multiple times when only one time should be needed" - assert(len(lambda_client_double.update_function_code.mock_calls) == 0), \ - "updated lambda code when no change should have happened" - - (update_args, update_kwargs) = lambda_client_double.update_function_configuration.call_args - assert (len(update_kwargs) > 0), "expected update configuration called with keyword args, none found" - assert update_kwargs['Environment']['Variables'] == module_args_with_environment['environment_variables'] - - -def test_dont_update_lambda_if_nothing_changed(): - set_module_args(base_module_args) - (boto3_conn_double, lambda_client_double) = make_mock_connection(base_lambda_config) - - with patch.object(lda, 'boto3_conn', boto3_conn_double): - try: - lda.main() - except SystemExit: - pass - - # guard against calling other than for a lambda connection (e.g. IAM) - assert(len(boto3_conn_double.mock_calls) > 0), "boto connections never used" - assert(len(boto3_conn_double.mock_calls) < 2), "multiple boto connections used unexpectedly" - assert(len(lambda_client_double.update_function_configuration.mock_calls) == 0), \ - "updated lambda function when no configuration changed" - assert(len(lambda_client_double.update_function_code.mock_calls) == 0), \ - "updated lambda code when no change should have happened" - - -def test_warn_region_not_specified(): - - set_module_args({ - "name": "lambda_name", - "state": "present", - # Module is called without a region causing error - # "region": "us-east-1", - "zip_file": "test/units/modules/cloud/amazon/fixtures/thezip.zip", - "runtime": 'python2.7', - "role": 'arn:aws:iam::987654321012:role/lambda_basic_execution', - "handler": 'lambda_python.my_handler'}) - - get_aws_connection_info_double = Mock(return_value=(None, None, None)) - - with patch.object(lda, 'get_aws_connection_info', get_aws_connection_info_double): - with patch.object(basic.AnsibleModule, 'fail_json', fail_json_double): - try: - lda.main() - except AnsibleFailJson as e: - result = e.args[0] - assert("region must be specified" in result['msg']) diff --git a/test/units/modules/cloud/amazon/test_lambda_policy.py b/test/units/modules/cloud/amazon/test_lambda_policy.py deleted file mode 100644 index 5c32370469..0000000000 --- a/test/units/modules/cloud/amazon/test_lambda_policy.py +++ /dev/null @@ -1,155 +0,0 @@ -# -# (c) 2017 Michael De La Rue -# -# This file is part of Ansible -# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) - -# Make coding more python3-ish -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type - -import copy - -import pytest - -from ansible.module_utils.aws.core import HAS_BOTO3 -from units.compat.mock import MagicMock -from units.modules.utils import set_module_args - -if not HAS_BOTO3: - pytestmark = pytest.mark.skip("test_api_gateway.py requires the `boto3` and `botocore` modules") - -# these are here cause ... boto! -from ansible.modules.cloud.amazon import lambda_policy -from ansible.modules.cloud.amazon.lambda_policy import setup_module_object -try: - from botocore.exceptions import ClientError -except ImportError: - pass - - -base_module_args = { - "region": "us-west-1", - "function_name": "this_is_a_test_function", - "state": "present", - "statement_id": "test-allow-lambda", - "principal": 123456, - "action": "lambda:*" -} - - -def test_module_is_created_sensibly(): - set_module_args(base_module_args) - module = setup_module_object() - assert module.params['function_name'] == 'this_is_a_test_function' - - -module_double = MagicMock() -module_double.fail_json_aws.side_effect = Exception("unexpected call to fail_json_aws") -module_double.check_mode = False - -fake_module_params_present = { - "state": "present", - "statement_id": "test-allow-lambda", - "principal": "apigateway.amazonaws.com", - "action": "lambda:InvokeFunction", - "source_arn": u'arn:aws:execute-api:us-east-1:123456789:efghijklmn/authorizers/*', - "version": 0, - "alias": None -} -fake_module_params_different = copy.deepcopy(fake_module_params_present) -fake_module_params_different["action"] = "lambda:api-gateway" -fake_module_params_absent = copy.deepcopy(fake_module_params_present) -fake_module_params_absent["state"] = "absent" - -fake_policy_return = { - u'Policy': ( - u'{"Version":"2012-10-17","Id":"default","Statement":[{"Sid":"1234567890abcdef1234567890abcdef",' - u'"Effect":"Allow","Principal":{"Service":"apigateway.amazonaws.com"},"Action":"lambda:InvokeFunction",' - u'"Resource":"arn:aws:lambda:us-east-1:123456789:function:test_authorizer",' - u'"Condition":{"ArnLike":{"AWS:SourceArn":"arn:aws:execute-api:us-east-1:123456789:abcdefghij/authorizers/1a2b3c"}}},' - u'{"Sid":"2234567890abcdef1234567890abcdef","Effect":"Allow","Principal":{"Service":"apigateway.amazonaws.com"},' - u'"Action":"lambda:InvokeFunction","Resource":"arn:aws:lambda:us-east-1:123456789:function:test_authorizer",' - u'"Condition":{"ArnLike":{"AWS:SourceArn":"arn:aws:execute-api:us-east-1:123456789:klmnopqrst/authorizers/4d5f6g"}}},' - u'{"Sid":"1234567890abcdef1234567890abcdef","Effect":"Allow","Principal":{"Service":"apigateway.amazonaws.com"},' - u'"Action":"lambda:InvokeFunction","Resource":"arn:aws:lambda:us-east-1:123456789:function:test_authorizer",' - u'"Condition":{"ArnLike":{"AWS:SourceArn":"arn:aws:execute-api:eu-west-1:123456789:uvwxyzabcd/authorizers/7h8i9j"}}},' - u'{"Sid":"test-allow-lambda","Effect":"Allow","Principal":{"Service":"apigateway.amazonaws.com"},' - u'"Action":"lambda:InvokeFunction","Resource":"arn:aws:lambda:us-east-1:123456789:function:test_authorizer",' - u'"Condition":{"ArnLike":{"AWS:SourceArn":"arn:aws:execute-api:us-east-1:123456789:efghijklmn/authorizers/*"}}},' - u'{"Sid":"1234567890abcdef1234567890abcdef","Effect":"Allow","Principal":{"Service":"apigateway.amazonaws.com"},' - u'"Action":"lambda:InvokeFunction","Resource":"arn:aws:lambda:us-east-1:123456789:function:test_authorizer",' - u'"Condition":{"ArnLike":{"AWS:SourceArn":"arn:aws:execute-api:us-east-1:123456789:opqrstuvwx/authorizers/0k1l2m"}}}]}'), - 'ResponseMetadata': { - 'RetryAttempts': 0, - 'HTTPStatusCode': 200, - 'RequestId': 'abcdefgi-1234-a567-b890-123456789abc', - 'HTTPHeaders': { - 'date': 'Sun, 13 Aug 2017 10:54:17 GMT', - 'x-amzn-requestid': 'abcdefgi-1234-a567-b890-123456789abc', - 'content-length': '1878', - 'content-type': 'application/json', - 'connection': 'keep-alive'}}} - -error_response = {'Error': {'Code': 'ResourceNotFoundException', 'Message': 'Fake Testing Error'}} -operation_name = 'FakeOperation' - - -def test_manage_state_adds_missing_permissions(): - lambda_client_double = MagicMock() - # Policy actually: not present Requested State: present Should: create - lambda_client_double.get_policy.side_effect = ClientError(error_response, operation_name) - fake_module_params = copy.deepcopy(fake_module_params_present) - module_double.params = fake_module_params - lambda_policy.manage_state(module_double, lambda_client_double) - assert lambda_client_double.get_policy.call_count > 0 - assert lambda_client_double.add_permission.call_count > 0 - lambda_client_double.remove_permission.assert_not_called() - - -def test_manage_state_leaves_existing_permissions(): - lambda_client_double = MagicMock() - # Policy actually: present Requested State: present Should: do nothing - lambda_client_double.get_policy.return_value = fake_policy_return - fake_module_params = copy.deepcopy(fake_module_params_present) - module_double.params = fake_module_params - lambda_policy.manage_state(module_double, lambda_client_double) - assert lambda_client_double.get_policy.call_count > 0 - lambda_client_double.add_permission.assert_not_called() - lambda_client_double.remove_permission.assert_not_called() - - -def test_manage_state_updates_nonmatching_permissions(): - lambda_client_double = MagicMock() - # Policy actually: present Requested State: present Should: do nothing - lambda_client_double.get_policy.return_value = fake_policy_return - fake_module_params = copy.deepcopy(fake_module_params_different) - module_double.params = fake_module_params - lambda_policy.manage_state(module_double, lambda_client_double) - assert lambda_client_double.get_policy.call_count > 0 - assert lambda_client_double.add_permission.call_count > 0 - assert lambda_client_double.remove_permission.call_count > 0 - - -def test_manage_state_removes_unwanted_permissions(): - lambda_client_double = MagicMock() - # Policy actually: present Requested State: not present Should: remove - lambda_client_double.get_policy.return_value = fake_policy_return - fake_module_params = copy.deepcopy(fake_module_params_absent) - module_double.params = fake_module_params - lambda_policy.manage_state(module_double, lambda_client_double) - assert lambda_client_double.get_policy.call_count > 0 - lambda_client_double.add_permission.assert_not_called() - assert lambda_client_double.remove_permission.call_count > 0 - - -def test_manage_state_leaves_already_removed_permissions(): - lambda_client_double = MagicMock() - # Policy actually: absent Requested State: absent Should: do nothing - lambda_client_double.get_policy.side_effect = ClientError(error_response, operation_name) - fake_module_params = copy.deepcopy(fake_module_params_absent) - module_double.params = fake_module_params - lambda_policy.manage_state(module_double, lambda_client_double) - assert lambda_client_double.get_policy.call_count > 0 - lambda_client_double.add_permission.assert_not_called() - lambda_client_double.remove_permission.assert_not_called() diff --git a/test/units/modules/cloud/amazon/test_redshift_cross_region_snapshots.py b/test/units/modules/cloud/amazon/test_redshift_cross_region_snapshots.py deleted file mode 100644 index 1891b5c890..0000000000 --- a/test/units/modules/cloud/amazon/test_redshift_cross_region_snapshots.py +++ /dev/null @@ -1,52 +0,0 @@ -# Make coding more python3-ish -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type - -from ansible.modules.cloud.amazon import redshift_cross_region_snapshots as rcrs - -mock_status_enabled = { - 'SnapshotCopyGrantName': 'snapshot-us-east-1-to-us-west-2', - 'DestinationRegion': 'us-west-2', - 'RetentionPeriod': 1, -} - -mock_status_disabled = {} - -mock_request_illegal = { - 'snapshot_copy_grant': 'changed', - 'destination_region': 'us-west-2', - 'snapshot_retention_period': 1 -} - -mock_request_update = { - 'snapshot_copy_grant': 'snapshot-us-east-1-to-us-west-2', - 'destination_region': 'us-west-2', - 'snapshot_retention_period': 3 -} - -mock_request_no_update = { - 'snapshot_copy_grant': 'snapshot-us-east-1-to-us-west-2', - 'destination_region': 'us-west-2', - 'snapshot_retention_period': 1 -} - - -def test_fail_at_unsupported_operations(): - response = rcrs.requesting_unsupported_modifications( - mock_status_enabled, mock_request_illegal - ) - assert response is True - - -def test_needs_update_true(): - response = rcrs.needs_update(mock_status_enabled, mock_request_update) - assert response is True - - -def test_no_change(): - response = rcrs.requesting_unsupported_modifications( - mock_status_enabled, mock_request_no_update - ) - needs_update_response = rcrs.needs_update(mock_status_enabled, mock_request_no_update) - assert response is False - assert needs_update_response is False diff --git a/test/units/modules/cloud/amazon/test_route53_zone.py b/test/units/modules/cloud/amazon/test_route53_zone.py deleted file mode 100644 index 283584a4dd..0000000000 --- a/test/units/modules/cloud/amazon/test_route53_zone.py +++ /dev/null @@ -1,610 +0,0 @@ -# Make coding more python3-ish -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type - -import functools - -from ansible.modules.cloud.amazon import route53_zone -from units.compat import unittest -from units.compat.mock import patch, call -from units.modules.utils import AnsibleExitJson, AnsibleFailJson, ModuleTestCase, set_module_args - - -def parameterized(params_list): - def decorator(func): - @functools.wraps(func) - def wrapper(*args, **kwargs): - for params_map in params_list: - params_map.update(kwargs) - func(*args, **params_map) - return wrapper - return decorator - - -# Inline and replace with subdict.items() <= superdict.items(), when Python 2.6 compat can be dropped -def is_subdict(subdict, superdict): - return all(superdict[k] == v for k, v in subdict.items()) - - -@patch('ansible.module_utils.aws.core.HAS_BOTO3', new=True) -@patch.object(route53_zone.AnsibleAWSModule, 'client') -@patch.object(route53_zone.time, 'time', return_value=1) -class TestRoute53Module(ModuleTestCase): - def test_mutually_exclusive(self, *args): - with self.assertRaises(AnsibleFailJson) as exec_info: - set_module_args({ - 'secret_key': 'SECRET_KEY', - 'access_key': 'ACCESS_KEY', - 'region': 'eu-central-1', - 'zone': 'example.com', - 'vpc_id': 'vpc-94ccc2ff', - 'vpc_region': 'eu-central-1', - 'comment': 'foobar', - 'delegation_set_id': 'A1BCDEF2GHIJKL', - 'state': 'present', - }) - route53_zone.main() - - self.assertEqual( - exec_info.exception.args[0]['msg'], - 'parameters are mutually exclusive: delegation_set_id|vpc_id, delegation_set_id|vpc_region', - ) - - @parameterized([ - { - 'check_mode': False, - 'response': { - 'private_zone': False, - 'vpc_id': None, - 'vpc_region': None, - 'comment': 'foobar', - 'name': 'example.com.', - 'delegation_set_id': '', - 'zone_id': 'ZONE_ID', - }, - }, - { - 'check_mode': True, - 'response': { - 'private_zone': False, - 'vpc_id': None, - 'vpc_region': None, - 'comment': 'foobar', - 'name': 'example.com.', - 'delegation_set_id': None, - 'zone_id': None, - }, - } - ]) - @patch.object(route53_zone, 'find_zones', return_value=[]) - def test_create_public_zone(self, find_zones_mock, time_mock, client_mock, check_mode, response): - client_mock.return_value.create_hosted_zone.return_value = { - 'HostedZone': { - 'Id': '/hostedzone/ZONE_ID', - 'Name': 'example.com.', - 'Config': { - 'Comment': 'foobar', - 'PrivateZone': False, - }, - }, - } - - with self.assertRaises(AnsibleExitJson) as exec_info: - set_module_args({ - 'secret_key': 'SECRET_KEY', - 'access_key': 'ACCESS_KEY', - 'region': 'eu-central-1', - 'zone': 'example.com', - 'comment': 'foobar', - 'state': 'present', - '_ansible_check_mode': check_mode, - }) - route53_zone.main() - - if check_mode: - client_mock.return_value.create_hosted_zone.assert_not_called() - else: - client_mock.return_value.create_hosted_zone.assert_called_once_with(**{ - 'HostedZoneConfig': { - 'Comment': 'foobar', - 'PrivateZone': False, - }, - 'Name': 'example.com.', - 'CallerReference': 'example.com.-1', - }) - - self.assertEqual(exec_info.exception.args[0]['changed'], True) - self.assertTrue(is_subdict(response, exec_info.exception.args[0])) - - @parameterized([ - { - 'check_mode': False, - 'response': { - 'private_zone': True, - 'vpc_id': 'vpc-1', - 'vpc_region': 'eu-central-1', - 'comment': 'foobar', - 'name': 'example.com.', - 'delegation_set_id': None, - 'zone_id': 'ZONE_ID', - }, - }, - { - 'check_mode': True, - 'response': { - 'private_zone': True, - 'vpc_id': 'vpc-1', - 'vpc_region': 'eu-central-1', - 'comment': 'foobar', - 'name': 'example.com.', - 'delegation_set_id': None, - 'zone_id': None, - }, - } - ]) - @patch.object(route53_zone, 'find_zones', return_value=[]) - def test_create_private_zone(self, find_zones_mock, time_mock, client_mock, check_mode, response): - client_mock.return_value.create_hosted_zone.return_value = { - 'HostedZone': { - 'Id': '/hostedzone/ZONE_ID', - 'Name': 'example.com.', - 'Config': { - 'Comment': 'foobar', - 'PrivateZone': True - }, - }, - } - - with self.assertRaises(AnsibleExitJson) as exec_info: - set_module_args({ - 'secret_key': 'SECRET_KEY', - 'access_key': 'ACCESS_KEY', - 'region': 'eu-central-1', - 'zone': 'example.com', - 'comment': 'foobar', - 'vpc_id': 'vpc-1', - 'vpc_region': 'eu-central-1', - 'state': 'present', - '_ansible_check_mode': check_mode, - }) - route53_zone.main() - - if check_mode: - client_mock.return_value.create_hosted_zone.assert_not_called() - else: - client_mock.return_value.create_hosted_zone.assert_called_once_with(**{ - 'HostedZoneConfig': { - 'Comment': 'foobar', - 'PrivateZone': True, - }, - 'Name': 'example.com.', - 'CallerReference': 'example.com.-1', - 'VPC': { - 'VPCRegion': 'eu-central-1', - 'VPCId': 'vpc-1', - }, - }) - - self.assertEqual(exec_info.exception.args[0]['changed'], True) - self.assertTrue(is_subdict(response, exec_info.exception.args[0])) - - @parameterized([ - { - 'check_mode': False, - 'response': { - 'private_zone': False, - 'vpc_id': None, - 'vpc_region': None, - 'comment': 'new', - 'name': 'example.com.', - 'delegation_set_id': '', - 'zone_id': 'ZONE_ID', - }, - }, - { - 'check_mode': True, - 'response': { - 'private_zone': False, - 'vpc_id': None, - 'vpc_region': None, - 'comment': 'new', - 'name': 'example.com.', - 'delegation_set_id': None, - 'zone_id': 'ZONE_ID', - }, - } - ]) - @patch.object(route53_zone, 'find_zones', return_value=[{ - 'Id': '/hostedzone/ZONE_ID', - 'Name': 'example.com.', - 'Config': {'Comment': '', 'PrivateZone': False}, - }]) - def test_update_comment_public_zone(self, find_zones_mock, time_mock, client_mock, check_mode, response): - client_mock.return_value.get_hosted_zone.return_value = { - 'HostedZone': { - 'Id': '/hostedzone/ZONE_ID', - 'Name': 'example.com.', - 'Config': {'Comment': '', 'PrivateZone': False}, - }, - } - - with self.assertRaises(AnsibleExitJson) as exec_info: - set_module_args({ - 'secret_key': 'SECRET_KEY', - 'access_key': 'ACCESS_KEY', - 'region': 'eu-central-1', - 'zone': 'example.com', - 'comment': 'new', - 'state': 'present', - '_ansible_check_mode': check_mode, - }) - route53_zone.main() - - if check_mode: - client_mock.return_value.update_hosted_zone_comment.assert_not_called() - else: - client_mock.return_value.update_hosted_zone_comment.assert_called_once_with(**{ - 'Id': '/hostedzone/ZONE_ID', - 'Comment': 'new', - }) - - self.assertEqual(exec_info.exception.args[0]['changed'], True) - self.assertTrue(is_subdict(response, exec_info.exception.args[0])) - - @patch.object(route53_zone, 'find_zones', return_value=[{ - 'Id': '/hostedzone/Z22OU4IUOVYM30', - 'Name': 'example.com.', - 'Config': {'Comment': '', 'PrivateZone': False}, - }]) - def test_update_public_zone_no_changes(self, find_zones_mock, time_mock, client_mock): - client_mock.return_value.get_hosted_zone.return_value = { - 'HostedZone': { - 'Id': '/hostedzone/ZONE_ID', - 'Name': 'example.com.', - 'Config': {'Comment': '', 'PrivateZone': False}, - }, - } - - with self.assertRaises(AnsibleExitJson) as exec_info: - set_module_args({ - 'secret_key': 'SECRET_KEY', - 'access_key': 'ACCESS_KEY', - 'region': 'eu-central-1', - 'zone': 'example.com', - 'comment': '', - 'state': 'present', - }) - route53_zone.main() - - client_mock.return_value.update_hosted_zone_comment.assert_not_called() - self.assertEqual(exec_info.exception.args[0]['changed'], False) - - @parameterized([ - { - 'check_mode': False, - 'response': { - 'private_zone': True, - 'vpc_id': 'vpc-1', - 'vpc_region': 'eu-central-1', - 'comment': 'new', - 'name': 'example.com.', - 'delegation_set_id': None, - 'zone_id': 'ZONE_ID', - }, - }, - { - 'check_mode': True, - 'response': { - 'private_zone': True, - 'vpc_id': 'vpc-1', - 'vpc_region': 'eu-central-1', - 'comment': 'new', - 'name': 'example.com.', - 'delegation_set_id': None, - 'zone_id': 'ZONE_ID', - }, - } - ]) - @patch.object(route53_zone, 'find_zones', return_value=[{ - 'Id': '/hostedzone/ZONE_ID', - 'Name': 'example.com.', - 'Config': {'Comment': 'foobar', 'PrivateZone': True}, - }]) - def test_update_comment_private_zone(self, find_zones_mock, time_mock, client_mock, check_mode, response): - client_mock.return_value.get_hosted_zone.return_value = { - 'HostedZone': { - 'Id': '/hostedzone/ZONE_ID', - 'Name': 'example.com.', - 'Config': {'Comment': 'foobar', 'PrivateZone': True}, - }, - 'VPCs': [{'VPCRegion': 'eu-central-1', 'VPCId': 'vpc-1'}], - } - - with self.assertRaises(AnsibleExitJson) as exec_info: - set_module_args({ - 'secret_key': 'SECRET_KEY', - 'access_key': 'ACCESS_KEY', - 'region': 'eu-central-1', - 'zone': 'example.com', - 'comment': 'new', - 'vpc_id': 'vpc-1', - 'vpc_region': 'eu-central-1', - 'state': 'present', - '_ansible_check_mode': check_mode, - }) - route53_zone.main() - - if check_mode: - client_mock.return_value.update_hosted_zone_comment.assert_not_called() - else: - client_mock.return_value.update_hosted_zone_comment.assert_called_once_with(**{ - 'Id': '/hostedzone/ZONE_ID', - 'Comment': 'new', - }) - - self.assertEqual(exec_info.exception.args[0]['changed'], True) - self.assertTrue(is_subdict(response, exec_info.exception.args[0])) - - @parameterized([ - { - 'check_mode': False, - 'response': { - 'private_zone': True, - 'vpc_id': 'vpc-2', - 'vpc_region': 'us-east-2', - 'comment': 'foobar', - 'name': 'example.com.', - 'delegation_set_id': None, - 'zone_id': 'ZONE_ID_2', - }, - }, - { - 'check_mode': True, - 'response': { - 'private_zone': True, - 'vpc_id': 'vpc-2', - 'vpc_region': 'us-east-2', - 'comment': 'foobar', - 'name': 'example.com.', - 'delegation_set_id': None, - 'zone_id': None, - }, - } - ]) - @patch.object(route53_zone, 'find_zones', return_value=[{ - 'Id': '/hostedzone/ZONE_ID', - 'Name': 'example.com.', - 'Config': {'Comment': 'foobar', 'PrivateZone': True}, - }]) - def test_update_vpc_private_zone(self, find_zones_mock, time_mock, client_mock, check_mode, response): - client_mock.return_value.get_hosted_zone.return_value = { - 'HostedZone': { - 'Id': '/hostedzone/ZONE_ID', - 'Name': 'example.com.', - 'Config': {'Comment': 'foobar', 'PrivateZone': True}, - }, - 'VPCs': [{'VPCRegion': 'eu-central-1', 'VPCId': 'vpc-1'}], - } - client_mock.return_value.create_hosted_zone.return_value = { - 'HostedZone': { - 'Id': '/hostedzone/ZONE_ID_2', - 'Name': 'example.com.', - 'Config': { - 'Comment': 'foobar', - 'PrivateZone': True - }, - }, - } - - with self.assertRaises(AnsibleExitJson) as exec_info: - set_module_args({ - 'secret_key': 'SECRET_KEY', - 'access_key': 'ACCESS_KEY', - 'region': 'us-east-2', - 'zone': 'example.com', - 'comment': 'foobar', - 'vpc_id': 'vpc-2', - 'vpc_region': 'us-east-2', - 'state': 'present', - '_ansible_check_mode': check_mode, - }) - route53_zone.main() - - if check_mode: - client_mock.return_value.create_hosted_zone.assert_not_called() - else: - client_mock.return_value.create_hosted_zone.assert_called_once_with(**{ - 'HostedZoneConfig': { - 'Comment': 'foobar', - 'PrivateZone': True, - }, - 'Name': 'example.com.', - 'CallerReference': 'example.com.-1', - 'VPC': { - 'VPCRegion': 'us-east-2', - 'VPCId': 'vpc-2', - }, - }) - - self.assertEqual(exec_info.exception.args[0]['changed'], True) - self.assertTrue(is_subdict(response, exec_info.exception.args[0])) - - @patch.object(route53_zone, 'find_zones', return_value=[{ - 'Id': '/hostedzone/ZONE_ID', - 'Name': 'example.com.', - 'Config': {'Comment': 'foobar', 'PrivateZone': True}, - }]) - def test_update_private_zone_no_changes(self, find_zones_mock, time_mock, client_mock): - client_mock.return_value.get_hosted_zone.return_value = { - 'HostedZone': { - 'Id': '/hostedzone/ZONE_ID', - 'Name': 'example.com.', - 'Config': {'Comment': 'foobar', 'PrivateZone': True}, - }, - 'VPCs': [{'VPCRegion': 'eu-central-1', 'VPCId': 'vpc-1'}], - } - - with self.assertRaises(AnsibleExitJson) as exec_info: - set_module_args({ - 'secret_key': 'SECRET_KEY', - 'access_key': 'ACCESS_KEY', - 'region': 'eu-central-1', - 'zone': 'example.com', - 'comment': 'foobar', - 'vpc_id': 'vpc-1', - 'vpc_region': 'eu-central-1', - 'state': 'present', - }) - route53_zone.main() - - client_mock.return_value.update_hosted_zone_comment.assert_not_called() - self.assertEqual(exec_info.exception.args[0]['changed'], False) - - response = { - 'private_zone': True, - 'vpc_id': 'vpc-1', - 'vpc_region': 'eu-central-1', - 'comment': 'foobar', - 'name': 'example.com.', - 'delegation_set_id': None, - 'zone_id': 'ZONE_ID', - } - self.assertTrue(is_subdict(response, exec_info.exception.args[0])) - - @parameterized([ - {'check_mode': False}, - {'check_mode': True} - ]) - @patch.object(route53_zone, 'find_zones', return_value=[{ - 'Id': '/hostedzone/ZONE_ID', - 'Name': 'example.com.', - 'Config': {'Comment': '', 'PrivateZone': False}, - }]) - def test_delete_public_zone(self, find_zones_mock, time_mock, client_mock, check_mode): - with self.assertRaises(AnsibleExitJson) as exec_info: - set_module_args({ - 'secret_key': 'SECRET_KEY', - 'access_key': 'ACCESS_KEY', - 'region': 'eu-central-1', - 'zone': 'example.com', - 'state': 'absent', - '_ansible_check_mode': check_mode, - }) - route53_zone.main() - - if check_mode: - client_mock.return_value.delete_hosted_zone.assert_not_called() - else: - client_mock.return_value.delete_hosted_zone.assert_called_once_with(**{ - 'Id': '/hostedzone/ZONE_ID', - }) - - self.assertEqual(exec_info.exception.args[0]['changed'], True) - - @parameterized([ - {'check_mode': False}, - {'check_mode': True} - ]) - @patch.object(route53_zone, 'find_zones', return_value=[{ - 'Id': '/hostedzone/ZONE_ID', - 'Name': 'example.com.', - 'Config': {'Comment': 'foobar', 'PrivateZone': True}, - }]) - def test_delete_private_zone(self, find_zones_mock, time_mock, client_mock, check_mode): - client_mock.return_value.get_hosted_zone.return_value = { - 'HostedZone': { - 'Id': '/hostedzone/ZONE_ID', - 'Name': 'example.com.', - 'Config': {'Comment': 'foobar', 'PrivateZone': True}, - }, - 'VPCs': [{'VPCRegion': 'eu-central-1', 'VPCId': 'vpc-1'}], - } - - with self.assertRaises(AnsibleExitJson) as exec_info: - set_module_args({ - 'secret_key': 'SECRET_KEY', - 'access_key': 'ACCESS_KEY', - 'region': 'eu-central-1', - 'zone': 'example.com', - 'vpc_id': 'vpc-1', - 'vpc_region': 'eu-central-1', - 'state': 'absent', - '_ansible_check_mode': check_mode, - }) - route53_zone.main() - - if check_mode: - client_mock.return_value.delete_hosted_zone.assert_not_called() - else: - client_mock.return_value.delete_hosted_zone.assert_called_once_with(**{ - 'Id': '/hostedzone/ZONE_ID', - }) - - self.assertEqual(exec_info.exception.args[0]['changed'], True) - - @parameterized([ - {'check_mode': False}, - {'check_mode': True} - ]) - @parameterized([ - { - 'hosted_zone_id': 'PRIVATE_ZONE_ID', - 'call_params': [call(**{ - 'Id': 'PRIVATE_ZONE_ID', - })], - }, { - 'hosted_zone_id': 'all', - 'call_params': [call(**{ - 'Id': '/hostedzone/PUBLIC_ZONE_ID', - }), call(**{ - 'Id': '/hostedzone/PRIVATE_ZONE_ID', - })], - } - ]) - @patch.object(route53_zone, 'find_zones', return_value=[{ - 'Id': '/hostedzone/PUBLIC_ZONE_ID', - 'Name': 'example.com.', - 'Config': {'Comment': '', 'PrivateZone': False}, - }, { - 'Id': '/hostedzone/PRIVATE_ZONE_ID', - 'Name': 'example.com.', - 'Config': {'Comment': 'foobar', 'PrivateZone': True}, - }]) - def test_delete_by_zone_id(self, find_zones_mock, time_mock, client_mock, hosted_zone_id, call_params, check_mode): - with self.assertRaises(AnsibleExitJson) as exec_info: - set_module_args({ - 'secret_key': 'SECRET_KEY', - 'access_key': 'ACCESS_KEY', - 'region': 'eu-central-1', - 'zone': 'example.com', - 'hosted_zone_id': hosted_zone_id, - 'state': 'absent', - '_ansible_check_mode': check_mode, - }) - route53_zone.main() - - if check_mode: - client_mock.return_value.delete_hosted_zone.assert_not_called() - else: - client_mock.return_value.delete_hosted_zone.assert_has_calls(call_params) - - self.assertEqual(exec_info.exception.args[0]['changed'], True) - - @patch.object(route53_zone, 'find_zones', return_value=[]) - def test_delete_absent_zone(self, find_zones_mock, time_mock, client_mock): - with self.assertRaises(AnsibleExitJson) as exec_info: - set_module_args({ - 'secret_key': 'SECRET_KEY', - 'access_key': 'ACCESS_KEY', - 'region': 'eu-central-1', - 'zone': 'example.com', - 'state': 'absent', - }) - route53_zone.main() - - client_mock.return_value.delete_hosted_zone.assert_not_called() - self.assertEqual(exec_info.exception.args[0]['changed'], False) - - -if __name__ == '__main__': - unittest.main() diff --git a/test/units/modules/cloud/amazon/test_s3_bucket_notification.py b/test/units/modules/cloud/amazon/test_s3_bucket_notification.py deleted file mode 100644 index cf342064c0..0000000000 --- a/test/units/modules/cloud/amazon/test_s3_bucket_notification.py +++ /dev/null @@ -1,262 +0,0 @@ -# Make coding more python3-ish -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type - -import pytest - -from units.compat.mock import MagicMock, patch -from units.modules.utils import AnsibleExitJson, AnsibleFailJson, ModuleTestCase, set_module_args - -from ansible.modules.cloud.amazon.s3_bucket_notification import AmazonBucket, Config -from ansible.modules.cloud.amazon import s3_bucket_notification -try: - from botocore.exceptions import ClientError -except ImportError: - pass - - -class TestAmazonBucketOperations: - def test_current_config(self): - api_config = { - 'Id': 'test-id', - 'LambdaFunctionArn': 'test-arn', - 'Events': [], - 'Filter': { - 'Key': { - 'FilterRules': [{ - 'Name': 'Prefix', - 'Value': '' - }, { - 'Name': 'Suffix', - 'Value': '' - }] - } - } - } - client = MagicMock() - client.get_bucket_notification_configuration.return_value = { - 'LambdaFunctionConfigurations': [api_config] - } - bucket = AmazonBucket(client, 'test-bucket') - current = bucket.current_config('test-id') - assert current.raw == api_config - assert client.get_bucket_notification_configuration.call_count == 1 - - def test_current_config_empty(self): - client = MagicMock() - client.get_bucket_notification_configuration.return_value = { - 'LambdaFunctionConfigurations': [] - } - bucket = AmazonBucket(client, 'test-bucket') - current = bucket.current_config('test-id') - assert current is None - assert client.get_bucket_notification_configuration.call_count == 1 - - def test_apply_invalid_config(self): - client = MagicMock() - client.get_bucket_notification_configuration.return_value = { - 'LambdaFunctionConfigurations': [] - } - client.put_bucket_notification_configuration.side_effect = ClientError({}, '') - bucket = AmazonBucket(client, 'test-bucket') - config = Config.from_params(**{ - 'event_name': 'test_event', - 'lambda_function_arn': 'lambda_arn', - 'lambda_version': 1, - 'events': ['s3:ObjectRemoved:*', 's3:ObjectCreated:*'], - 'prefix': '', - 'suffix': '' - }) - with pytest.raises(ClientError): - bucket.apply_config(config) - - def test_apply_config(self): - client = MagicMock() - client.get_bucket_notification_configuration.return_value = { - 'LambdaFunctionConfigurations': [] - } - - bucket = AmazonBucket(client, 'test-bucket') - config = Config.from_params(**{ - 'event_name': 'test_event', - 'lambda_function_arn': 'lambda_arn', - 'lambda_version': 1, - 'events': ['s3:ObjectRemoved:*', 's3:ObjectCreated:*'], - 'prefix': '', - 'suffix': '' - }) - bucket.apply_config(config) - assert client.get_bucket_notification_configuration.call_count == 1 - assert client.put_bucket_notification_configuration.call_count == 1 - - def test_apply_config_add_event(self): - api_config = { - 'Id': 'test-id', - 'LambdaFunctionArn': 'test-arn', - 'Events': ['s3:ObjectRemoved:*'], - 'Filter': { - 'Key': { - 'FilterRules': [{ - 'Name': 'Prefix', - 'Value': '' - }, { - 'Name': 'Suffix', - 'Value': '' - }] - } - } - } - client = MagicMock() - client.get_bucket_notification_configuration.return_value = { - 'LambdaFunctionConfigurations': [api_config] - } - - bucket = AmazonBucket(client, 'test-bucket') - config = Config.from_params(**{ - 'event_name': 'test-id', - 'lambda_function_arn': 'test-arn', - 'lambda_version': 1, - 'events': ['s3:ObjectRemoved:*', 's3:ObjectCreated:*'], - 'prefix': '', - 'suffix': '' - }) - bucket.apply_config(config) - assert client.get_bucket_notification_configuration.call_count == 1 - assert client.put_bucket_notification_configuration.call_count == 1 - client.put_bucket_notification_configuration.assert_called_with( - Bucket='test-bucket', - NotificationConfiguration={ - 'LambdaFunctionConfigurations': [{ - 'Id': 'test-id', - 'LambdaFunctionArn': 'test-arn:1', - 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'], - 'Filter': { - 'Key': { - 'FilterRules': [{ - 'Name': 'Prefix', - 'Value': '' - }, { - 'Name': 'Suffix', - 'Value': '' - }] - } - } - }] - } - ) - - def test_delete_config(self): - api_config = { - 'Id': 'test-id', - 'LambdaFunctionArn': 'test-arn', - 'Events': [], - 'Filter': { - 'Key': { - 'FilterRules': [{ - 'Name': 'Prefix', - 'Value': '' - }, { - 'Name': 'Suffix', - 'Value': '' - }] - } - } - } - client = MagicMock() - client.get_bucket_notification_configuration.return_value = { - 'LambdaFunctionConfigurations': [api_config] - } - bucket = AmazonBucket(client, 'test-bucket') - config = Config.from_params(**{ - 'event_name': 'test-id', - 'lambda_function_arn': 'lambda_arn', - 'lambda_version': 1, - 'events': [], - 'prefix': '', - 'suffix': '' - }) - bucket.delete_config(config) - assert client.get_bucket_notification_configuration.call_count == 1 - assert client.put_bucket_notification_configuration.call_count == 1 - client.put_bucket_notification_configuration.assert_called_with( - Bucket='test-bucket', - NotificationConfiguration={'LambdaFunctionConfigurations': []} - ) - - -class TestConfig: - def test_config_from_params(self): - config = Config({ - 'Id': 'test-id', - 'LambdaFunctionArn': 'test-arn:10', - 'Events': [], - 'Filter': { - 'Key': { - 'FilterRules': [{ - 'Name': 'Prefix', - 'Value': '' - }, { - 'Name': 'Suffix', - 'Value': '' - }] - } - } - }) - config_from_params = Config.from_params(**{ - 'event_name': 'test-id', - 'lambda_function_arn': 'test-arn', - 'lambda_version': 10, - 'events': [], - 'prefix': '', - 'suffix': '' - }) - assert config.raw == config_from_params.raw - assert config == config_from_params - - -class TestModule(ModuleTestCase): - def test_module_fail_when_required_args_missing(self): - with pytest.raises(AnsibleFailJson): - set_module_args({}) - s3_bucket_notification.main() - - @patch('ansible.modules.cloud.amazon.s3_bucket_notification.AnsibleAWSModule.client') - def test_add_s3_bucket_notification(self, aws_client): - aws_client.return_value.get_bucket_notification_configuration.return_value = { - 'LambdaFunctionConfigurations': [] - } - set_module_args({ - 'region': 'us-east-2', - 'lambda_function_arn': 'test-lambda-arn', - 'bucket_name': 'test-lambda', - 'event_name': 'test-id', - 'events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'], - 'state': 'present', - 'prefix': '/images', - 'suffix': '.jpg' - }) - with pytest.raises(AnsibleExitJson) as context: - s3_bucket_notification.main() - result = context.value.args[0] - assert result['changed'] is True - assert aws_client.return_value.get_bucket_notification_configuration.call_count == 1 - aws_client.return_value.put_bucket_notification_configuration.assert_called_with( - Bucket='test-lambda', - NotificationConfiguration={ - 'LambdaFunctionConfigurations': [{ - 'Id': 'test-id', - 'LambdaFunctionArn': 'test-lambda-arn', - 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'], - 'Filter': { - 'Key': { - 'FilterRules': [{ - 'Name': 'Prefix', - 'Value': '/images' - }, { - 'Name': 'Suffix', - 'Value': '.jpg' - }] - } - } - }] - }) diff --git a/test/units/plugins/connection/test_aws_ssm.py b/test/units/plugins/connection/test_aws_ssm.py deleted file mode 100644 index bcea207e78..0000000000 --- a/test/units/plugins/connection/test_aws_ssm.py +++ /dev/null @@ -1,194 +0,0 @@ -# Make coding more python3-ish -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type - -from io import StringIO -import pytest -import sys -from ansible import constants as C -from ansible.compat.selectors import SelectorKey, EVENT_READ -from units.compat import unittest -from units.compat.mock import patch, MagicMock, PropertyMock -from ansible.errors import AnsibleError, AnsibleConnectionFailure, AnsibleFileNotFound -from ansible.module_utils.six.moves import shlex_quote -from ansible.module_utils._text import to_bytes -from ansible.playbook.play_context import PlayContext -from ansible.plugins.connection import aws_ssm -from ansible.plugins.loader import connection_loader - - -@pytest.mark.skipif(sys.version_info < (2, 7), reason="requires Python 2.7 or higher") -class TestConnectionBaseClass(unittest.TestCase): - - @patch('os.path.exists') - @patch('subprocess.Popen') - @patch('select.poll') - @patch('boto3.client') - def test_plugins_connection_aws_ssm_start_session(self, boto_client, s_poll, s_popen, mock_ospe): - pc = PlayContext() - new_stdin = StringIO() - conn = connection_loader.get('aws_ssm', pc, new_stdin) - conn.get_option = MagicMock() - conn.get_option.side_effect = ['i1234', 'executable', 'abcd', 'i1234'] - conn.host = 'abc' - mock_ospe.return_value = True - boto3 = MagicMock() - boto3.client('ssm').return_value = MagicMock() - conn.start_session = MagicMock() - conn._session_id = MagicMock() - conn._session_id.return_value = 's1' - s_popen.return_value.stdin.write = MagicMock() - s_poll.return_value = MagicMock() - s_poll.return_value.register = MagicMock() - s_popen.return_value.poll = MagicMock() - s_popen.return_value.poll.return_value = None - conn._stdin_readline = MagicMock() - conn._stdin_readline.return_value = 'abc123' - conn.SESSION_START = 'abc' - conn.start_session() - - @patch('random.choice') - def test_plugins_connection_aws_ssm_exec_command(self, r_choice): - pc = PlayContext() - new_stdin = StringIO() - conn = connection_loader.get('aws_ssm', pc, new_stdin) - r_choice.side_effect = ['a', 'a', 'a', 'a', 'a', 'b', 'b', 'b', 'b', 'b'] - conn.MARK_LENGTH = 5 - conn._session = MagicMock() - conn._session.stdin.write = MagicMock() - conn._wrap_command = MagicMock() - conn._wrap_command.return_value = 'cmd1' - conn._flush_stderr = MagicMock() - conn._windows = MagicMock() - conn._windows.return_value = True - sudoable = True - conn._session.poll = MagicMock() - conn._session.poll.return_value = None - remaining = 0 - conn._timeout = MagicMock() - conn._poll_stdout = MagicMock() - conn._poll_stdout.poll = MagicMock() - conn._poll_stdout.poll.return_value = True - conn._session.stdout = MagicMock() - conn._session.stdout.readline = MagicMock() - begin = True - mark_end = 'a' - line = ['a', 'b'] - conn._post_process = MagicMock() - conn._post_process.return_value = 'test' - conn._session.stdout.readline.side_effect = iter(['aaaaa\n', 'Hi\n', '0\n', 'bbbbb\n']) - conn.get_option = MagicMock() - conn.get_option.return_value = 1 - cmd = MagicMock() - returncode = 'a' - stdout = 'b' - return (returncode, stdout, conn._flush_stderr) - - def test_plugins_connection_aws_ssm_prepare_terminal(self): - pc = PlayContext() - new_stdin = StringIO() - conn = connection_loader.get('aws_ssm', pc, new_stdin) - conn.is_windows = MagicMock() - conn.is_windows.return_value = True - - def test_plugins_connection_aws_ssm_wrap_command(self): - pc = PlayContext() - new_stdin = StringIO() - conn = connection_loader.get('aws_ssm', pc, new_stdin) - conn.is_windows = MagicMock() - conn.is_windows.return_value = True - return('windows1') - - def test_plugins_connection_aws_ssm_post_process(self): - pc = PlayContext() - new_stdin = StringIO() - conn = connection_loader.get('aws_ssm', pc, new_stdin) - conn.is_windows = MagicMock() - conn.is_windows.return_value = True - success = 3 - fail = 2 - conn.stdout = MagicMock() - returncode = 0 - return(returncode, conn.stdout) - - @patch('subprocess.Popen') - def test_plugins_connection_aws_ssm_flush_stderr(self, s_popen): - pc = PlayContext() - new_stdin = StringIO() - conn = connection_loader.get('aws_ssm', pc, new_stdin) - conn.poll_stderr = MagicMock() - conn.poll_stderr.register = MagicMock() - conn.stderr = None - s_popen.poll().return_value = 123 - return(conn.stderr) - - @patch('boto3.client') - def test_plugins_connection_aws_ssm_get_url(self, boto): - pc = PlayContext() - new_stdin = StringIO() - conn = connection_loader.get('aws_ssm', pc, new_stdin) - boto3 = MagicMock() - boto3.client('s3').return_value = MagicMock() - boto3.generate_presigned_url.return_value = MagicMock() - return (boto3.generate_presigned_url.return_value) - - @patch('os.path.exists') - def test_plugins_connection_aws_ssm_put_file(self, mock_ospe): - pc = PlayContext() - new_stdin = StringIO() - conn = connection_loader.get('aws_ssm', pc, new_stdin) - conn._connect = MagicMock() - conn._file_transport_command = MagicMock() - conn._file_transport_command.return_value = (0, 'stdout', 'stderr') - res, stdout, stderr = conn.put_file('/in/file', '/out/file') - - def test_plugins_connection_aws_ssm_fetch_file(self): - pc = PlayContext() - new_stdin = StringIO() - conn = connection_loader.get('aws_ssm', pc, new_stdin) - conn._connect = MagicMock() - conn._file_transport_command = MagicMock() - conn._file_transport_command.return_value = (0, 'stdout', 'stderr') - res, stdout, stderr = conn.fetch_file('/in/file', '/out/file') - - @patch('subprocess.check_output') - @patch('boto3.client') - def test_plugins_connection_file_transport_command(self, boto_client, s_check_output): - pc = PlayContext() - new_stdin = StringIO() - conn = connection_loader.get('aws_ssm', pc, new_stdin) - conn.get_option = MagicMock() - conn.get_option.side_effect = ['1', '2', '3', '4', '5'] - conn._get_url = MagicMock() - conn._get_url.side_effect = ['url1', 'url2'] - boto3 = MagicMock() - boto3.client('s3').return_value = MagicMock() - conn.get_option.return_value = 1 - ssm_action = 'get' - get_command = MagicMock() - put_command = MagicMock() - conn.exec_command = MagicMock() - conn.exec_command.return_value = (put_command, None, False) - conn.download_fileobj = MagicMock() - (returncode, stdout, stderr) = conn.exec_command(put_command, in_data=None, sudoable=False) - returncode = 0 - (returncode, stdout, stderr) = conn.exec_command(get_command, in_data=None, sudoable=False) - - @patch('subprocess.check_output') - def test_plugins_connection_aws_ssm_close(self, s_check_output): - pc = PlayContext() - new_stdin = StringIO() - conn = connection_loader.get('aws_ssm', pc, new_stdin) - conn.instance_id = "i-12345" - conn._session_id = True - conn.get_option = MagicMock() - conn.get_option.side_effect = ["/abc", "pqr"] - conn._session = MagicMock() - conn._session.terminate = MagicMock() - conn._session.communicate = MagicMock() - conn._terminate_session = MagicMock() - conn._terminate_session.return_value = '' - conn._session_id = MagicMock() - conn._session_id.return_value = 'a' - conn._client = MagicMock() - conn.close() |