summaryrefslogtreecommitdiff
path: root/test/units
diff options
context:
space:
mode:
Diffstat (limited to 'test/units')
-rw-r--r--test/units/modules/cloud/amazon/test_aws_acm.py122
-rw-r--r--test/units/modules/cloud/amazon/test_aws_api_gateway.py69
-rw-r--r--test/units/modules/cloud/amazon/test_aws_direct_connect_connection.py92
-rw-r--r--test/units/modules/cloud/amazon/test_aws_direct_connect_link_aggregation_group.py168
-rw-r--r--test/units/modules/cloud/amazon/test_data_pipeline.py250
-rw-r--r--test/units/modules/cloud/amazon/test_ec2_vpc_nat_gateway.py207
-rw-r--r--test/units/modules/cloud/amazon/test_ec2_vpc_vpn.py352
-rw-r--r--test/units/modules/cloud/amazon/test_iam_password_policy.py30
-rw-r--r--test/units/modules/cloud/amazon/test_kinesis_stream.py330
-rw-r--r--test/units/modules/cloud/amazon/test_lambda.py273
-rw-r--r--test/units/modules/cloud/amazon/test_lambda_policy.py155
-rw-r--r--test/units/modules/cloud/amazon/test_redshift_cross_region_snapshots.py52
-rw-r--r--test/units/modules/cloud/amazon/test_route53_zone.py610
-rw-r--r--test/units/modules/cloud/amazon/test_s3_bucket_notification.py262
-rw-r--r--test/units/plugins/connection/test_aws_ssm.py194
15 files changed, 0 insertions, 3166 deletions
diff --git a/test/units/modules/cloud/amazon/test_aws_acm.py b/test/units/modules/cloud/amazon/test_aws_acm.py
deleted file mode 100644
index d2fd87b8de..0000000000
--- a/test/units/modules/cloud/amazon/test_aws_acm.py
+++ /dev/null
@@ -1,122 +0,0 @@
-# (c) 2019 Telstra Corporation Limited
-#
-# This file is part of Ansible
-#
-# Ansible is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# Ansible is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
-
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
-from ansible.modules.cloud.amazon.aws_acm import pem_chain_split, chain_compare
-from ansible.module_utils._text import to_bytes, to_text
-from pprint import pprint
-
-
-def test_chain_compare():
-
- # The functions we're testing take module as an argument
- # Just so they can call module.fail_json
- # Let's just use None for the unit tests,
- # Because they shouldn't fail
- # And if they do, fail_json is not applicable
- module = None
-
- fixture_suffix = 'test/units/modules/cloud/amazon/fixtures/certs'
-
- # Test chain split function on super simple (invalid) certs
- expected = ['aaa', 'bbb', 'ccc']
-
- for fname in ['simple-chain-a.cert', 'simple-chain-b.cert']:
- path = fixture_suffix + '/' + fname
- with open(path, 'r') as f:
- pem = to_text(f.read())
- actual = pem_chain_split(module, pem)
- actual = [a.strip() for a in actual]
- if actual != expected:
- print("Expected:")
- pprint(expected)
- print("Actual:")
- pprint(actual)
- raise AssertionError("Failed to properly split %s" % fname)
-
- # Now test real chains
- # chains with same same_as should be considered equal
- test_chains = [
- { # Original Cert chain
- 'path': fixture_suffix + '/chain-1.0.cert',
- 'same_as': 1,
- 'length': 3
- },
- { # Same as 1.0, but longer PEM lines
- 'path': fixture_suffix + '/chain-1.1.cert',
- 'same_as': 1,
- 'length': 3
- },
- { # Same as 1.0, but without the stuff before each --------
- 'path': fixture_suffix + '/chain-1.2.cert',
- 'same_as': 1,
- 'length': 3
- },
- { # Same as 1.0, but in a different order, so should be considered different
- 'path': fixture_suffix + '/chain-1.3.cert',
- 'same_as': 2,
- 'length': 3
- },
- { # Same as 1.0, but with last link missing
- 'path': fixture_suffix + '/chain-1.4.cert',
- 'same_as': 3,
- 'length': 2
- },
- { # Completely different cert chain to all the others
- 'path': fixture_suffix + '/chain-4.cert',
- 'same_as': 4,
- 'length': 3
- },
- { # Single cert
- 'path': fixture_suffix + '/a.pem',
- 'same_as': 5,
- 'length': 1
- },
- { # a different, single cert
- 'path': fixture_suffix + '/b.pem',
- 'same_as': 6,
- 'length': 1
- }
- ]
-
- for chain in test_chains:
- with open(chain['path'], 'r') as f:
- chain['pem_text'] = to_text(f.read())
-
- # Test to make sure our regex isn't too greedy
- chain['split'] = pem_chain_split(module, chain['pem_text'])
- if len(chain['split']) != chain['length']:
- print("Cert before split")
- print(chain['pem_text'])
- print("Cert after split")
- pprint(chain['split'])
- print("path: %s" % chain['path'])
- print("Expected chain length: %d" % chain['length'])
- print("Actual chain length: %d" % len(chain['split']))
- raise AssertionError("Chain %s was not split properly" % chain['path'])
-
- for chain_a in test_chains:
- for chain_b in test_chains:
- expected = (chain_a['same_as'] == chain_b['same_as'])
-
- # Now test the comparison function
- actual = chain_compare(module, chain_a['pem_text'], chain_b['pem_text'])
- if expected != actual:
- print("Error, unexpected comparison result between \n%s\nand\n%s" % (chain_a['path'], chain_b['path']))
- print("Expected %s got %s" % (str(expected), str(actual)))
- assert(expected == actual)
diff --git a/test/units/modules/cloud/amazon/test_aws_api_gateway.py b/test/units/modules/cloud/amazon/test_aws_api_gateway.py
deleted file mode 100644
index 30b0120a11..0000000000
--- a/test/units/modules/cloud/amazon/test_aws_api_gateway.py
+++ /dev/null
@@ -1,69 +0,0 @@
-#
-# (c) 2016 Michael De La Rue
-#
-# This file is part of Ansible
-# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-
-# Make coding more python3-ish
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
-
-import sys
-
-import pytest
-
-from ansible.module_utils.ec2 import HAS_BOTO3
-from units.modules.utils import set_module_args
-
-if not HAS_BOTO3:
- pytestmark = pytest.mark.skip("test_api_gateway.py requires the `boto3` and `botocore` modules")
-
-import ansible.modules.cloud.amazon.aws_api_gateway as agw
-from ansible.module_utils.aws import core
-
-
-exit_return_dict = {}
-
-
-def fake_exit_json(self, **kwargs):
- """ store the kwargs given to exit_json rather than putting them out to stdout"""
- global exit_return_dict
- exit_return_dict = kwargs
- sys.exit(0)
-
-
-def test_upload_api(monkeypatch):
- class FakeConnection:
-
- def put_rest_api(self, *args, **kwargs):
- assert kwargs["body"] == "the-swagger-text-is-fake"
- return {"msg": "success!"}
-
- def return_fake_connection(*args, **kwargs):
- return FakeConnection()
-
- monkeypatch.setattr(core, "boto3_conn", return_fake_connection)
- monkeypatch.setattr(core.AnsibleAWSModule, "exit_json", fake_exit_json)
-
- set_module_args({
- "api_id": "fred",
- "state": "present",
- "swagger_text": "the-swagger-text-is-fake",
- "region": 'mars-north-1',
- "_ansible_tmpdir": "/tmp/ansibl-abcdef",
- })
- with pytest.raises(SystemExit):
- agw.main()
- assert exit_return_dict["changed"]
-
-
-def test_warn_if_region_not_specified():
-
- set_module_args({
- "name": "aws_api_gateway",
- "state": "present",
- "runtime": 'python2.7',
- "role": 'arn:aws:iam::987654321012:role/lambda_basic_execution',
- "handler": 'lambda_python.my_handler'})
- with pytest.raises(SystemExit):
- print(agw.main())
diff --git a/test/units/modules/cloud/amazon/test_aws_direct_connect_connection.py b/test/units/modules/cloud/amazon/test_aws_direct_connect_connection.py
deleted file mode 100644
index d232b51095..0000000000
--- a/test/units/modules/cloud/amazon/test_aws_direct_connect_connection.py
+++ /dev/null
@@ -1,92 +0,0 @@
-# (c) 2017 Red Hat Inc.
-#
-# This file is part of Ansible
-# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-
-# Make coding more python3-ish
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
-
-from units.utils.amazon_placebo_fixtures import placeboify, maybe_sleep
-from ansible.modules.cloud.amazon import aws_direct_connect_connection
-
-
-class FakeModule(object):
- def __init__(self, **kwargs):
- self.params = kwargs
-
- def fail_json(self, *args, **kwargs):
- self.exit_args = args
- self.exit_kwargs = kwargs
- raise Exception('FAIL')
-
- def exit_json(self, *args, **kwargs):
- self.exit_args = args
- self.exit_kwargs = kwargs
-
-
-# When rerecording these tests, create a stand alone connection with default values in us-west-2
-# with the name ansible-test-connection and set connection_id to the appropriate value
-connection_id = "dxcon-fgq9rgot"
-connection_name = 'ansible-test-connection'
-
-
-def test_connection_status(placeboify, maybe_sleep):
- client = placeboify.client('directconnect')
- status = aws_direct_connect_connection.connection_status(client, connection_id)['connection']
- assert status['connectionName'] == connection_name
- assert status['connectionId'] == connection_id
-
-
-def test_connection_exists_by_id(placeboify, maybe_sleep):
- client = placeboify.client('directconnect')
- exists = aws_direct_connect_connection.connection_exists(client, connection_id)
- assert exists == connection_id
-
-
-def test_connection_exists_by_name(placeboify, maybe_sleep):
- client = placeboify.client('directconnect')
- exists = aws_direct_connect_connection.connection_exists(client, None, connection_name)
- assert exists == connection_id
-
-
-def test_connection_does_not_exist(placeboify, maybe_sleep):
- client = placeboify.client('directconnect')
- exists = aws_direct_connect_connection.connection_exists(client, 'dxcon-notthere')
- assert exists is False
-
-
-def test_changed_properties(placeboify, maybe_sleep):
- client = placeboify.client('directconnect')
- status = aws_direct_connect_connection.connection_status(client, connection_id)['connection']
- location = "differentlocation"
- bandwidth = status['bandwidth']
- assert aws_direct_connect_connection.changed_properties(status, location, bandwidth) is True
-
-
-def test_associations_are_not_updated(placeboify, maybe_sleep):
- client = placeboify.client('directconnect')
- status = aws_direct_connect_connection.connection_status(client, connection_id)['connection']
- lag_id = status.get('lagId')
- assert aws_direct_connect_connection.update_associations(client, status, connection_id, lag_id) is False
-
-
-def test_create_and_delete(placeboify, maybe_sleep):
- client = placeboify.client('directconnect')
- created_conn = verify_create_works(placeboify, maybe_sleep, client)
- deleted_conn = verify_delete_works(placeboify, maybe_sleep, client, created_conn)
-
-
-def verify_create_works(placeboify, maybe_sleep, client):
- created = aws_direct_connect_connection.create_connection(client=client,
- location="EqSE2",
- bandwidth="1Gbps",
- name="ansible-test-2",
- lag_id=None)
- assert created.startswith('dxcon')
- return created
-
-
-def verify_delete_works(placeboify, maybe_sleep, client, conn_id):
- changed = aws_direct_connect_connection.ensure_absent(client, conn_id)
- assert changed is True
diff --git a/test/units/modules/cloud/amazon/test_aws_direct_connect_link_aggregation_group.py b/test/units/modules/cloud/amazon/test_aws_direct_connect_link_aggregation_group.py
deleted file mode 100644
index 1f733aeb4c..0000000000
--- a/test/units/modules/cloud/amazon/test_aws_direct_connect_link_aggregation_group.py
+++ /dev/null
@@ -1,168 +0,0 @@
-# (c) 2017 Red Hat Inc.
-#
-# This file is part of Ansible
-# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-
-# Make coding more python3-ish
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
-
-import pytest
-import os
-import collections
-from units.utils.amazon_placebo_fixtures import placeboify, maybe_sleep
-from ansible.modules.cloud.amazon import aws_direct_connect_link_aggregation_group as lag_module
-from ansible.module_utils.ec2 import get_aws_connection_info, boto3_conn
-
-
-@pytest.fixture(scope="module")
-def dependencies():
-
- # each LAG dict will contain the keys: module, connections, virtual_interfaces
- Dependencies = collections.namedtuple("Dependencies", ["lag_1", "lag_2"])
- lag_1 = dict()
- lag_2 = dict()
-
- vanilla_params = {"name": "ansible_lag_1",
- "location": "EqSe2",
- "num_connections": 1,
- "min_links": 0,
- "bandwidth": "1Gbps"}
-
- for lag in ("ansible_lag_1", "ansible_lag_2"):
- params = dict(vanilla_params)
- params["name"] = lag
- if lag == "ansible_lag_1":
- lag_1["module"] = FakeModule(**params)
- else:
- lag_2["module"] = FakeModule(**params)
-
- if os.getenv("PLACEBO_RECORD"):
- region, ec2_url, aws_connect_kwargs = get_aws_connection_info(lag_1["module"], boto3=True)
- client = boto3_conn(lag_1["module"], conn_type="client", resource="directconnect", region=region, endpoint=ec2_url, **aws_connect_kwargs)
- # See if link aggregation groups exist
- for name in ("ansible_lag_1", "ansible_lag_2"):
- lag_id = lag_module.create_lag(client, num_connections=1, location="EqSe2", bandwidth="1Gbps", name=name, connection_id=None)
- if name == "ansible_lag_1":
- lag_1["lag_id"] = lag_id
- lag_1["name"] = name
- else:
- lag_2["lag_id"] = lag_id
- lag_2["name"] = name
- yield Dependencies(lag_1=lag_1, lag_2=lag_2)
- else:
- lag_1.update(lag_id="dxlag-fgkk4dja", name="ansible_lag_1")
- lag_2.update(lag_id="dxlag-fgytkicv", name="ansible_lag_2")
- yield Dependencies(lag_1=lag_1, lag_2=lag_2)
-
- if os.getenv("PLACEBO_RECORD"):
- # clean up
- lag_module.ensure_absent(client, lag_1["lag_id"], lag_1["name"], True, True, True, 120)
- lag_module.ensure_absent(client, lag_2["lag_id"], lag_2["name"], True, True, True, 120)
-
-
-class FakeModule(object):
- def __init__(self, **kwargs):
- self.params = kwargs
-
- def fail_json(self, *args, **kwargs):
- self.exit_args = args
- self.exit_kwargs = kwargs
- raise Exception("FAIL")
-
- def exit_json(self, *args, **kwargs):
- self.exit_args = args
- self.exit_kwargs = kwargs
-
-
-def test_nonexistent_lag_status(placeboify, maybe_sleep):
- client = placeboify.client("directconnect")
- exists = lag_module.lag_exists(client=client,
- lag_id="doesntexist",
- lag_name="doesntexist",
- verify=True)
- assert not exists
-
-
-def test_lag_status(placeboify, maybe_sleep, dependencies):
- client = placeboify.client("directconnect")
- status = lag_module.lag_status(client, lag_id=dependencies.lag_1.get("lag_id"))
- assert status.get("lagId") == dependencies.lag_1.get("lag_id")
- assert status.get("lagName") == "ansible_lag_1"
-
-
-def test_lag_exists(placeboify, maybe_sleep, dependencies):
- client = placeboify.client("directconnect")
- exists = lag_module.lag_exists(client=client,
- lag_id=dependencies.lag_1.get("lag_id"),
- lag_name=None,
- verify=True)
- assert exists
-
-
-def test_lag_exists_using_name(placeboify, maybe_sleep, dependencies):
- client = placeboify.client("directconnect")
- exists = lag_module.lag_exists(client=client,
- lag_id=None,
- lag_name=dependencies.lag_1.get("name"),
- verify=True)
- assert exists
-
-
-def test_nonexistent_lag_does_not_exist(placeboify, maybe_sleep):
- client = placeboify.client("directconnect")
- exists = lag_module.lag_exists(client=client,
- lag_id="dxlag-XXXXXXXX",
- lag_name="doesntexist",
- verify=True)
- assert not exists
-
-
-def test_lag_changed_true(placeboify, maybe_sleep, dependencies):
- client = placeboify.client("directconnect")
- status = lag_module.lag_status(client=client, lag_id=dependencies.lag_1.get("lag_id"))
- assert lag_module.lag_changed(status, "new_name", 1)
-
-
-def test_lag_changed_true_no(placeboify, maybe_sleep, dependencies):
- client = placeboify.client("directconnect")
- status = lag_module.lag_status(client=client, lag_id=dependencies.lag_1.get("lag_id"))
- assert not lag_module.lag_changed(status, "ansible_lag_1", 0)
-
-
-def test_update_lag(placeboify, maybe_sleep, dependencies):
- client = placeboify.client("directconnect")
- status_before = lag_module.lag_status(client=client, lag_id=dependencies.lag_2.get("lag_id"))
- lag_module.update_lag(client,
- lag_id=dependencies.lag_2.get("lag_id"),
- lag_name="ansible_lag_2_update",
- min_links=0,
- wait=False,
- wait_timeout=0,
- num_connections=1)
- status_after = lag_module.lag_status(client=client, lag_id=dependencies.lag_2.get("lag_id"))
- assert status_before != status_after
-
- # remove the lag name from the statuses and verify it was the only thing changed
- del status_before['lagName']
- del status_after['lagName']
- assert status_before == status_after
-
-
-def test_delete_nonexistent_lag(placeboify, maybe_sleep):
- client = placeboify.client("directconnect")
- changed = lag_module.ensure_absent(client, "dxlag-XXXXXXXX", "doesntexist", True, True, True, 120)
- assert not changed
-
-
-def test_delete_lag_with_connections_without_force_delete(placeboify, maybe_sleep, dependencies):
- client = placeboify.client("directconnect")
- with pytest.raises(Exception) as error_message:
- lag_module.ensure_absent(client, dependencies.lag_1.get("lag_id"), "ansible_lag_1", False, True, True, 120)
- assert "To force deletion of the LAG use delete_force: True" in error_message
-
-
-def test_delete_lag_with_connections(placeboify, maybe_sleep, dependencies):
- client = placeboify.client("directconnect")
- changed = lag_module.ensure_absent(client, dependencies.lag_1.get("lag_id"), "ansible_lag_1", True, True, True, 120)
- assert changed
diff --git a/test/units/modules/cloud/amazon/test_data_pipeline.py b/test/units/modules/cloud/amazon/test_data_pipeline.py
deleted file mode 100644
index 7d821b3993..0000000000
--- a/test/units/modules/cloud/amazon/test_data_pipeline.py
+++ /dev/null
@@ -1,250 +0,0 @@
-# (c) 2017 Red Hat Inc.
-#
-# This file is part of Ansible
-# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-
-# Make coding more python3-ish
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
-
-import os
-import json
-import collections
-
-import pytest
-from units.utils.amazon_placebo_fixtures import placeboify, maybe_sleep
-
-from ansible.modules.cloud.amazon import data_pipeline
-from ansible.module_utils._text import to_text
-
-# test_api_gateway.py requires the `boto3` and `botocore` modules
-boto3 = pytest.importorskip('boto3')
-
-
-@pytest.fixture(scope='module')
-def dp_setup():
- """
- Yield a FakeModule object, data pipeline id of a vanilla data pipeline, and data pipeline objects
-
- This fixture is module-scoped, since this can be reused for multiple tests.
- """
- Dependencies = collections.namedtuple("Dependencies", ["module", "data_pipeline_id", "objects"])
-
- # get objects to use to test populating and activating the data pipeline
- if not os.getenv('PLACEBO_RECORD'):
- objects = [{"name": "Every 1 day",
- "id": "DefaultSchedule",
- "fields": []},
- {"name": "Default",
- "id": "Default",
- "fields": []}]
- else:
- s3 = boto3.client('s3')
- data = s3.get_object(Bucket="ansible-test-datapipeline", Key="pipeline-object/new.json")
- objects = json.loads(to_text(data['Body'].read()))
-
- # create a module with vanilla data pipeline parameters
- params = {'name': 'ansible-test-create-pipeline',
- 'description': 'ansible-datapipeline-unit-test',
- 'state': 'present',
- 'timeout': 300,
- 'objects': [],
- 'tags': {},
- 'parameters': [],
- 'values': []}
- module = FakeModule(**params)
-
- # yield a module, the data pipeline id, and the data pipeline objects (that are not yet defining the vanilla data pipeline)
- if not os.getenv('PLACEBO_RECORD'):
- yield Dependencies(module=module, data_pipeline_id='df-0590406117G8DPQZY2HA', objects=objects)
- else:
- connection = boto3.client('datapipeline')
- changed, result = data_pipeline.create_pipeline(connection, module)
- data_pipeline_id = result['data_pipeline']['pipeline_id']
- yield Dependencies(module=module, data_pipeline_id=data_pipeline_id, objects=objects)
-
- # remove data pipeline
- if os.getenv('PLACEBO_RECORD'):
- module.params.update(state='absent')
- data_pipeline.delete_pipeline(connection, module)
-
-
-class FakeModule(object):
- def __init__(self, **kwargs):
- self.params = kwargs
-
- def fail_json(self, *args, **kwargs):
- self.exit_args = args
- self.exit_kwargs = kwargs
- raise Exception('FAIL')
-
- def exit_json(self, *args, **kwargs):
- self.exit_args = args
- self.exit_kwargs = kwargs
-
-
-def test_create_pipeline_already_exists(placeboify, maybe_sleep, dp_setup):
- connection = placeboify.client('datapipeline')
- changed, result = data_pipeline.create_pipeline(connection, dp_setup.module)
- assert changed is False
- assert "Data Pipeline ansible-test-create-pipeline is present" in result['msg']
-
-
-def test_pipeline_field(placeboify, maybe_sleep, dp_setup):
- connection = placeboify.client('datapipeline')
- pipeline_field_info = data_pipeline.pipeline_field(connection, dp_setup.data_pipeline_id, "@pipelineState")
- assert pipeline_field_info == "PENDING"
-
-
-def test_define_pipeline(placeboify, maybe_sleep, dp_setup):
- connection = placeboify.client('datapipeline')
- changed, result = data_pipeline.define_pipeline(connection, dp_setup.module, dp_setup.objects, dp_setup.data_pipeline_id)
- assert 'has been updated' in result
-
-
-def test_deactivate_pipeline(placeboify, maybe_sleep, dp_setup):
- connection = placeboify.client('datapipeline')
- changed, result = data_pipeline.deactivate_pipeline(connection, dp_setup.module)
- assert "Data Pipeline ansible-test-create-pipeline deactivated" in result['msg']
-
-
-def test_activate_without_population(placeboify, maybe_sleep, dp_setup):
- connection = placeboify.client('datapipeline')
- with pytest.raises(Exception) as error_message:
- changed, result = data_pipeline.activate_pipeline(connection, dp_setup.module)
- assert error_message == "You need to populate your pipeline before activation."
-
-
-def test_create_pipeline(placeboify, maybe_sleep):
- connection = placeboify.client('datapipeline')
- params = {'name': 'ansible-unittest-create-pipeline',
- 'description': 'ansible-datapipeline-unit-test',
- 'state': 'present',
- 'timeout': 300,
- 'tags': {}}
- m = FakeModule(**params)
- changed, result = data_pipeline.create_pipeline(connection, m)
- assert changed is True
- assert result['msg'] == "Data Pipeline ansible-unittest-create-pipeline created."
-
- data_pipeline.delete_pipeline(connection, m)
-
-
-def test_create_pipeline_with_tags(placeboify, maybe_sleep):
- connection = placeboify.client('datapipeline')
- params = {'name': 'ansible-unittest-create-pipeline_tags',
- 'description': 'ansible-datapipeline-unit-test',
- 'state': 'present',
- 'tags': {'ansible': 'test'},
- 'timeout': 300}
- m = FakeModule(**params)
- changed, result = data_pipeline.create_pipeline(connection, m)
- assert changed is True
- assert result['msg'] == "Data Pipeline ansible-unittest-create-pipeline_tags created."
-
- data_pipeline.delete_pipeline(connection, m)
-
-
-def test_delete_nonexistent_pipeline(placeboify, maybe_sleep):
- connection = placeboify.client('datapipeline')
- params = {'name': 'ansible-test-nonexistent',
- 'description': 'ansible-test-nonexistent',
- 'state': 'absent',
- 'objects': [],
- 'tags': {'ansible': 'test'},
- 'timeout': 300}
- m = FakeModule(**params)
- changed, result = data_pipeline.delete_pipeline(connection, m)
- assert changed is False
-
-
-def test_delete_pipeline(placeboify, maybe_sleep):
- connection = placeboify.client('datapipeline')
- params = {'name': 'ansible-test-nonexistent',
- 'description': 'ansible-test-nonexistent',
- 'state': 'absent',
- 'objects': [],
- 'tags': {'ansible': 'test'},
- 'timeout': 300}
- m = FakeModule(**params)
- data_pipeline.create_pipeline(connection, m)
- changed, result = data_pipeline.delete_pipeline(connection, m)
- assert changed is True
-
-
-def test_build_unique_id_different():
- m = FakeModule(**{'name': 'ansible-unittest-1', 'description': 'test-unique-id'})
- m2 = FakeModule(**{'name': 'ansible-unittest-1', 'description': 'test-unique-id-different'})
- assert data_pipeline.build_unique_id(m) != data_pipeline.build_unique_id(m2)
-
-
-def test_build_unique_id_same():
- m = FakeModule(**{'name': 'ansible-unittest-1', 'description': 'test-unique-id', 'tags': {'ansible': 'test'}})
- m2 = FakeModule(**{'name': 'ansible-unittest-1', 'description': 'test-unique-id', 'tags': {'ansible': 'test'}})
- assert data_pipeline.build_unique_id(m) == data_pipeline.build_unique_id(m2)
-
-
-def test_build_unique_id_obj():
- # check that the object can be different and the unique id should be the same; should be able to modify objects
- m = FakeModule(**{'name': 'ansible-unittest-1', 'objects': [{'first': 'object'}]})
- m2 = FakeModule(**{'name': 'ansible-unittest-1', 'objects': [{'second': 'object'}]})
- assert data_pipeline.build_unique_id(m) == data_pipeline.build_unique_id(m2)
-
-
-def test_format_tags():
- unformatted_tags = {'key1': 'val1', 'key2': 'val2', 'key3': 'val3'}
- formatted_tags = data_pipeline.format_tags(unformatted_tags)
- for tag_set in formatted_tags:
- assert unformatted_tags[tag_set['key']] == tag_set['value']
-
-
-def test_format_empty_tags():
- unformatted_tags = {}
- formatted_tags = data_pipeline.format_tags(unformatted_tags)
- assert formatted_tags == []
-
-
-def test_pipeline_description(placeboify, maybe_sleep, dp_setup):
- connection = placeboify.client('datapipeline')
- dp_id = dp_setup.data_pipeline_id
- pipelines = data_pipeline.pipeline_description(connection, dp_id)
- assert dp_id == pipelines['pipelineDescriptionList'][0]['pipelineId']
-
-
-def test_pipeline_description_nonexistent(placeboify, maybe_sleep):
- hypothetical_pipeline_id = "df-015440025PF7YGLDK47C"
- connection = placeboify.client('datapipeline')
- with pytest.raises(Exception) as error:
- data_pipeline.pipeline_description(connection, hypothetical_pipeline_id)
- assert error == data_pipeline.DataPipelineNotFound
-
-
-def test_check_dp_exists_true(placeboify, maybe_sleep, dp_setup):
- connection = placeboify.client('datapipeline')
- exists = data_pipeline.check_dp_exists(connection, dp_setup.data_pipeline_id)
- assert exists is True
-
-
-def test_check_dp_exists_false(placeboify, maybe_sleep):
- hypothetical_pipeline_id = "df-015440025PF7YGLDK47C"
- connection = placeboify.client('datapipeline')
- exists = data_pipeline.check_dp_exists(connection, hypothetical_pipeline_id)
- assert exists is False
-
-
-def test_check_dp_status(placeboify, maybe_sleep, dp_setup):
- inactive_states = ['INACTIVE', 'PENDING', 'FINISHED', 'DELETING']
- connection = placeboify.client('datapipeline')
- state = data_pipeline.check_dp_status(connection, dp_setup.data_pipeline_id, inactive_states)
- assert state is True
-
-
-def test_activate_pipeline(placeboify, maybe_sleep, dp_setup):
- # use objects to define pipeline before activating
- connection = placeboify.client('datapipeline')
- data_pipeline.define_pipeline(connection,
- module=dp_setup.module,
- objects=dp_setup.objects,
- dp_id=dp_setup.data_pipeline_id)
- changed, result = data_pipeline.activate_pipeline(connection, dp_setup.module)
- assert changed is True
diff --git a/test/units/modules/cloud/amazon/test_ec2_vpc_nat_gateway.py b/test/units/modules/cloud/amazon/test_ec2_vpc_nat_gateway.py
deleted file mode 100644
index 53cd8c4c2c..0000000000
--- a/test/units/modules/cloud/amazon/test_ec2_vpc_nat_gateway.py
+++ /dev/null
@@ -1,207 +0,0 @@
-# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
-
-import pytest
-import unittest
-
-from mock import patch
-
-import ansible.modules.cloud.amazon.ec2_vpc_nat_gateway as ng
-
-
-boto3 = pytest.importorskip("boto3")
-botocore = pytest.importorskip("botocore")
-
-aws_region = 'us-west-2'
-
-
-class AnsibleEc2VpcNatGatewayFunctions(unittest.TestCase):
-
- def test_get_nat_gateways(self):
- client = boto3.client('ec2', region_name=aws_region)
- success, err_msg, stream = (
- ng.get_nat_gateways(client, 'subnet-123456789', check_mode=True)
- )
- should_return = ng.DRY_RUN_GATEWAYS
- self.assertTrue(success)
- self.assertEqual(stream, should_return)
-
- def test_get_nat_gateways_no_gateways_found(self):
- client = boto3.client('ec2', region_name=aws_region)
- success, err_msg, stream = (
- ng.get_nat_gateways(client, 'subnet-1234567', check_mode=True)
- )
- self.assertTrue(success)
- self.assertEqual(stream, [])
-
- def test_wait_for_status(self):
- client = boto3.client('ec2', region_name=aws_region)
- success, err_msg, gws = (
- ng.wait_for_status(
- client, 5, 'nat-123456789', 'available', check_mode=True
- )
- )
- should_return = ng.DRY_RUN_GATEWAYS[0]
- self.assertTrue(success)
- self.assertEqual(gws, should_return)
-
- @patch('time.sleep')
- def test_wait_for_status_to_timeout(self, mock_sleep):
- client = boto3.client('ec2', region_name=aws_region)
- success, err_msg, gws = (
- ng.wait_for_status(
- client, 2, 'nat-12345678', 'available', check_mode=True
- )
- )
- self.assertFalse(success)
- self.assertEqual(gws, {})
-
- def test_gateway_in_subnet_exists_with_allocation_id(self):
- client = boto3.client('ec2', region_name=aws_region)
- gws, err_msg = (
- ng.gateway_in_subnet_exists(
- client, 'subnet-123456789', 'eipalloc-1234567', check_mode=True
- )
- )
- should_return = ng.DRY_RUN_GATEWAYS
- self.assertEqual(gws, should_return)
-
- def test_gateway_in_subnet_exists_with_allocation_id_does_not_exist(self):
- client = boto3.client('ec2', region_name=aws_region)
- gws, err_msg = (
- ng.gateway_in_subnet_exists(
- client, 'subnet-123456789', 'eipalloc-123', check_mode=True
- )
- )
- should_return = list()
- self.assertEqual(gws, should_return)
-
- def test_gateway_in_subnet_exists_without_allocation_id(self):
- client = boto3.client('ec2', region_name=aws_region)
- gws, err_msg = (
- ng.gateway_in_subnet_exists(
- client, 'subnet-123456789', check_mode=True
- )
- )
- should_return = ng.DRY_RUN_GATEWAYS
- self.assertEqual(gws, should_return)
-
- def test_get_eip_allocation_id_by_address(self):
- client = boto3.client('ec2', region_name=aws_region)
- allocation_id, error_msg = (
- ng.get_eip_allocation_id_by_address(
- client, '55.55.55.55', check_mode=True
- )
- )
- should_return = 'eipalloc-1234567'
- self.assertEqual(allocation_id, should_return)
-
- def test_get_eip_allocation_id_by_address_does_not_exist(self):
- client = boto3.client('ec2', region_name=aws_region)
- allocation_id, err_msg = (
- ng.get_eip_allocation_id_by_address(
- client, '52.52.52.52', check_mode=True
- )
- )
- self.assertEqual(err_msg, 'EIP 52.52.52.52 does not exist')
- self.assertTrue(allocation_id is None)
-
- def test_allocate_eip_address(self):
- client = boto3.client('ec2', region_name=aws_region)
- success, err_msg, eip_id = (
- ng.allocate_eip_address(
- client, check_mode=True
- )
- )
- self.assertTrue(success)
-
- def test_release_address(self):
- client = boto3.client('ec2', region_name=aws_region)
- success, err_msg = (
- ng.release_address(
- client, 'eipalloc-1234567', check_mode=True
- )
- )
- self.assertTrue(success)
-
- def test_create(self):
- client = boto3.client('ec2', region_name=aws_region)
- success, changed, err_msg, results = (
- ng.create(
- client, 'subnet-123456', 'eipalloc-1234567', check_mode=True
- )
- )
- self.assertTrue(success)
- self.assertTrue(changed)
-
- def test_pre_create(self):
- client = boto3.client('ec2', region_name=aws_region)
- success, changed, err_msg, results = (
- ng.pre_create(
- client, 'subnet-123456', check_mode=True
- )
- )
- self.assertTrue(success)
- self.assertTrue(changed)
-
- def test_pre_create_idemptotent_with_allocation_id(self):
- client = boto3.client('ec2', region_name=aws_region)
- success, changed, err_msg, results = (
- ng.pre_create(
- client, 'subnet-123456789', allocation_id='eipalloc-1234567', check_mode=True
- )
- )
- self.assertTrue(success)
- self.assertFalse(changed)
-
- def test_pre_create_idemptotent_with_eip_address(self):
- client = boto3.client('ec2', region_name=aws_region)
- success, changed, err_msg, results = (
- ng.pre_create(
- client, 'subnet-123456789', eip_address='55.55.55.55', check_mode=True
- )
- )
- self.assertTrue(success)
- self.assertFalse(changed)
-
- def test_pre_create_idemptotent_if_exist_do_not_create(self):
- client = boto3.client('ec2', region_name=aws_region)
- success, changed, err_msg, results = (
- ng.pre_create(
- client, 'subnet-123456789', if_exist_do_not_create=True, check_mode=True
- )
- )
- self.assertTrue(success)
- self.assertFalse(changed)
-
- def test_delete(self):
- client = boto3.client('ec2', region_name=aws_region)
- success, changed, err_msg, results = (
- ng.remove(
- client, 'nat-123456789', check_mode=True
- )
- )
- self.assertTrue(success)
- self.assertTrue(changed)
-
- def test_delete_and_release_ip(self):
- client = boto3.client('ec2', region_name=aws_region)
- success, changed, err_msg, results = (
- ng.remove(
- client, 'nat-123456789', release_eip=True, check_mode=True
- )
- )
- self.assertTrue(success)
- self.assertTrue(changed)
-
- def test_delete_if_does_not_exist(self):
- client = boto3.client('ec2', region_name=aws_region)
- success, changed, err_msg, results = (
- ng.remove(
- client, 'nat-12345', check_mode=True
- )
- )
- self.assertFalse(success)
- self.assertFalse(changed)
diff --git a/test/units/modules/cloud/amazon/test_ec2_vpc_vpn.py b/test/units/modules/cloud/amazon/test_ec2_vpc_vpn.py
deleted file mode 100644
index 5bf3b40f91..0000000000
--- a/test/units/modules/cloud/amazon/test_ec2_vpc_vpn.py
+++ /dev/null
@@ -1,352 +0,0 @@
-# (c) 2017 Red Hat Inc.
-# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
-
-import pytest
-import os
-from units.utils.amazon_placebo_fixtures import placeboify, maybe_sleep
-from ansible.modules.cloud.amazon import ec2_vpc_vpn
-from ansible.module_utils.ec2 import get_aws_connection_info, boto3_conn, boto3_tag_list_to_ansible_dict
-
-
-class FakeModule(object):
- def __init__(self, **kwargs):
- self.params = kwargs
-
- def fail_json(self, *args, **kwargs):
- self.exit_args = args
- self.exit_kwargs = kwargs
- raise Exception('FAIL')
-
- def exit_json(self, *args, **kwargs):
- self.exit_args = args
- self.exit_kwargs = kwargs
-
-
-def get_vgw(connection):
- # see if two vgw exist and return them if so
- vgw = connection.describe_vpn_gateways(Filters=[{'Name': 'tag:Ansible_VPN', 'Values': ['Test']}])
- if len(vgw['VpnGateways']) >= 2:
- return [vgw['VpnGateways'][0]['VpnGatewayId'], vgw['VpnGateways'][1]['VpnGatewayId']]
- # otherwise create two and return them
- vgw_1 = connection.create_vpn_gateway(Type='ipsec.1')
- vgw_2 = connection.create_vpn_gateway(Type='ipsec.1')
- for resource in (vgw_1, vgw_2):
- connection.create_tags(Resources=[resource['VpnGateway']['VpnGatewayId']], Tags=[{'Key': 'Ansible_VPN', 'Value': 'Test'}])
- return [vgw_1['VpnGateway']['VpnGatewayId'], vgw_2['VpnGateway']['VpnGatewayId']]
-
-
-def get_cgw(connection):
- # see if two cgw exist and return them if so
- cgw = connection.describe_customer_gateways(DryRun=False, Filters=[{'Name': 'state', 'Values': ['available']},
- {'Name': 'tag:Name', 'Values': ['Ansible-CGW']}])
- if len(cgw['CustomerGateways']) >= 2:
- return [cgw['CustomerGateways'][0]['CustomerGatewayId'], cgw['CustomerGateways'][1]['CustomerGatewayId']]
- # otherwise create and return them
- cgw_1 = connection.create_customer_gateway(DryRun=False, Type='ipsec.1', PublicIp='9.8.7.6', BgpAsn=65000)
- cgw_2 = connection.create_customer_gateway(DryRun=False, Type='ipsec.1', PublicIp='5.4.3.2', BgpAsn=65000)
- for resource in (cgw_1, cgw_2):
- connection.create_tags(Resources=[resource['CustomerGateway']['CustomerGatewayId']], Tags=[{'Key': 'Ansible-CGW', 'Value': 'Test'}])
- return [cgw_1['CustomerGateway']['CustomerGatewayId'], cgw_2['CustomerGateway']['CustomerGatewayId']]
-
-
-def get_dependencies():
- if os.getenv('PLACEBO_RECORD'):
- module = FakeModule(**{})
- region, ec2_url, aws_connect_kwargs = get_aws_connection_info(module, boto3=True)
- connection = boto3_conn(module, conn_type='client', resource='ec2', region=region, endpoint=ec2_url, **aws_connect_kwargs)
- vgw = get_vgw(connection)
- cgw = get_cgw(connection)
- else:
- vgw = ["vgw-35d70c2b", "vgw-32d70c2c"]
- cgw = ["cgw-6113c87f", "cgw-9e13c880"]
-
- return cgw, vgw
-
-
-def setup_mod_conn(placeboify, params):
- conn = placeboify.client('ec2')
- m = FakeModule(**params)
- return m, conn
-
-
-def make_params(cgw, vgw, tags=None, filters=None, routes=None):
- tags = {} if tags is None else tags
- filters = {} if filters is None else filters
- routes = [] if routes is None else routes
-
- return {'customer_gateway_id': cgw,
- 'static_only': True,
- 'vpn_gateway_id': vgw,
- 'connection_type': 'ipsec.1',
- 'purge_tags': True,
- 'tags': tags,
- 'filters': filters,
- 'routes': routes,
- 'delay': 15,
- 'wait_timeout': 600}
-
-
-def make_conn(placeboify, module, connection):
- customer_gateway_id = module.params['customer_gateway_id']
- static_only = module.params['static_only']
- vpn_gateway_id = module.params['vpn_gateway_id']
- connection_type = module.params['connection_type']
- check_mode = module.params['check_mode']
- changed = True
- vpn = ec2_vpc_vpn.create_connection(connection, customer_gateway_id, static_only, vpn_gateway_id, connection_type)
- return changed, vpn
-
-
-def tear_down_conn(placeboify, connection, vpn_connection_id):
- ec2_vpc_vpn.delete_connection(connection, vpn_connection_id, delay=15, max_attempts=40)
-
-
-def test_find_connection_vpc_conn_id(placeboify, maybe_sleep):
- # setup dependencies for 2 vpn connections
- dependencies = setup_req(placeboify, 2)
- dep1, dep2 = dependencies[0], dependencies[1]
- params1, vpn1, m1, conn1 = dep1['params'], dep1['vpn'], dep1['module'], dep1['connection']
- params2, vpn2, m2, conn2 = dep2['params'], dep2['vpn'], dep2['module'], dep2['connection']
-
- # find the connection with a vpn_connection_id and assert it is the expected one
- assert vpn1['VpnConnectionId'] == ec2_vpc_vpn.find_connection(conn1, params1, vpn1['VpnConnectionId'])['VpnConnectionId']
-
- tear_down_conn(placeboify, conn1, vpn1['VpnConnectionId'])
- tear_down_conn(placeboify, conn2, vpn2['VpnConnectionId'])
-
-
-def test_find_connection_filters(placeboify, maybe_sleep):
- # setup dependencies for 2 vpn connections
- dependencies = setup_req(placeboify, 2)
- dep1, dep2 = dependencies[0], dependencies[1]
- params1, vpn1, m1, conn1 = dep1['params'], dep1['vpn'], dep1['module'], dep1['connection']
- params2, vpn2, m2, conn2 = dep2['params'], dep2['vpn'], dep2['module'], dep2['connection']
-
- # update to different tags
- params1.update(tags={'Wrong': 'Tag'})
- params2.update(tags={'Correct': 'Tag'})
- ec2_vpc_vpn.ensure_present(conn1, params1)
- ec2_vpc_vpn.ensure_present(conn2, params2)
-
- # create some new parameters for a filter
- params = {'filters': {'tags': {'Correct': 'Tag'}}}
-
- # find the connection that has the parameters above
- found = ec2_vpc_vpn.find_connection(conn1, params)
-
- # assert the correct connection was found
- assert found['VpnConnectionId'] == vpn2['VpnConnectionId']
-
- # delete the connections
- tear_down_conn(placeboify, conn1, vpn1['VpnConnectionId'])
- tear_down_conn(placeboify, conn2, vpn2['VpnConnectionId'])
-
-
-def test_find_connection_insufficient_filters(placeboify, maybe_sleep):
- # get list of customer gateways and virtual private gateways
- cgw, vgw = get_dependencies()
-
- # create two connections with the same tags
- params = make_params(cgw[0], vgw[0], tags={'Correct': 'Tag'})
- params2 = make_params(cgw[1], vgw[1], tags={'Correct': 'Tag'})
- m, conn = setup_mod_conn(placeboify, params)
- m2, conn2 = setup_mod_conn(placeboify, params2)
- vpn1 = ec2_vpc_vpn.ensure_present(conn, m.params)[1]
- vpn2 = ec2_vpc_vpn.ensure_present(conn2, m2.params)[1]
-
- # reset the parameters so only filtering by tags will occur
- m.params = {'filters': {'tags': {'Correct': 'Tag'}}}
-
- # assert that multiple matching connections have been found
- with pytest.raises(Exception) as error_message:
- ec2_vpc_vpn.find_connection(conn, m.params)
- assert error_message == "More than one matching VPN connection was found.To modify or delete a VPN please specify vpn_connection_id or add filters."
-
- # delete the connections
- tear_down_conn(placeboify, conn, vpn1['VpnConnectionId'])
- tear_down_conn(placeboify, conn, vpn2['VpnConnectionId'])
-
-
-def test_find_connection_nonexistent(placeboify, maybe_sleep):
- # create parameters but don't create a connection with them
- params = {'filters': {'tags': {'Correct': 'Tag'}}}
- m, conn = setup_mod_conn(placeboify, params)
-
- # try to find a connection with matching parameters and assert None are found
- assert ec2_vpc_vpn.find_connection(conn, m.params) is None
-
-
-def test_create_connection(placeboify, maybe_sleep):
- # get list of customer gateways and virtual private gateways
- cgw, vgw = get_dependencies()
-
- # create a connection
- params = make_params(cgw[0], vgw[0])
- m, conn = setup_mod_conn(placeboify, params)
- changed, vpn = ec2_vpc_vpn.ensure_present(conn, m.params)
-
- # assert that changed is true and that there is a connection id
- assert changed is True
- assert 'VpnConnectionId' in vpn
-
- # delete connection
- tear_down_conn(placeboify, conn, vpn['VpnConnectionId'])
-
-
-def test_create_connection_that_exists(placeboify, maybe_sleep):
- # setup dependencies for 1 vpn connection
- dependencies = setup_req(placeboify, 1)
- params, vpn, m, conn = dependencies['params'], dependencies['vpn'], dependencies['module'], dependencies['connection']
-
- # try to recreate the same connection
- changed, vpn2 = ec2_vpc_vpn.ensure_present(conn, params)
-
- # nothing should have changed
- assert changed is False
- assert vpn['VpnConnectionId'] == vpn2['VpnConnectionId']
-
- # delete connection
- tear_down_conn(placeboify, conn, vpn['VpnConnectionId'])
-
-
-def test_modify_deleted_connection(placeboify, maybe_sleep):
- # setup dependencies for 1 vpn connection
- dependencies = setup_req(placeboify, 1)
- params, vpn, m, conn = dependencies['params'], dependencies['vpn'], dependencies['module'], dependencies['connection']
-
- # delete it
- tear_down_conn(placeboify, conn, vpn['VpnConnectionId'])
-
- # try to update the deleted connection
- m.params.update(vpn_connection_id=vpn['VpnConnectionId'])
- with pytest.raises(Exception) as error_message:
- ec2_vpc_vpn.ensure_present(conn, m.params)
- assert error_message == "There is no VPN connection available or pending with that id. Did you delete it?"
-
-
-def test_delete_connection(placeboify, maybe_sleep):
- # setup dependencies for 1 vpn connection
- dependencies = setup_req(placeboify, 1)
- params, vpn, m, conn = dependencies['params'], dependencies['vpn'], dependencies['module'], dependencies['connection']
-
- # delete it
- changed, vpn = ec2_vpc_vpn.ensure_absent(conn, m.params)
-
- assert changed is True
- assert vpn == {}
-
-
-def test_delete_nonexistent_connection(placeboify, maybe_sleep):
- # create parameters and ensure any connection matching (None) is deleted
- params = {'filters': {'tags': {'ThisConnection': 'DoesntExist'}}, 'delay': 15, 'wait_timeout': 600}
- m, conn = setup_mod_conn(placeboify, params)
- changed, vpn = ec2_vpc_vpn.ensure_absent(conn, m.params)
-
- assert changed is False
- assert vpn == {}
-
-
-def test_check_for_update_tags(placeboify, maybe_sleep):
- # setup dependencies for 1 vpn connection
- dependencies = setup_req(placeboify, 1)
- params, vpn, m, conn = dependencies['params'], dependencies['vpn'], dependencies['module'], dependencies['connection']
-
- # add and remove a number of tags
- m.params['tags'] = {'One': 'one', 'Two': 'two'}
- ec2_vpc_vpn.ensure_present(conn, m.params)
- m.params['tags'] = {'Two': 'two', 'Three': 'three', 'Four': 'four'}
- changes = ec2_vpc_vpn.check_for_update(conn, m.params, vpn['VpnConnectionId'])
-
- flat_dict_changes = boto3_tag_list_to_ansible_dict(changes['tags_to_add'])
- correct_changes = boto3_tag_list_to_ansible_dict([{'Key': 'Three', 'Value': 'three'}, {'Key': 'Four', 'Value': 'four'}])
- assert flat_dict_changes == correct_changes
- assert changes['tags_to_remove'] == ['One']
-
- # delete connection
- tear_down_conn(placeboify, conn, vpn['VpnConnectionId'])
-
-
-def test_check_for_update_nonmodifiable_attr(placeboify, maybe_sleep):
- # setup dependencies for 1 vpn connection
- dependencies = setup_req(placeboify, 1)
- params, vpn, m, conn = dependencies['params'], dependencies['vpn'], dependencies['module'], dependencies['connection']
- current_vgw = params['vpn_gateway_id']
-
- # update a parameter that isn't modifiable
- m.params.update(vpn_gateway_id="invalidchange")
-
- err = 'You cannot modify vpn_gateway_id, the current value of which is {0}. Modifiable VPN connection attributes are tags.'.format(current_vgw)
- with pytest.raises(Exception) as error_message:
- ec2_vpc_vpn.check_for_update(m, conn, vpn['VpnConnectionId'])
- assert error_message == err
-
- # delete connection
- tear_down_conn(placeboify, conn, vpn['VpnConnectionId'])
-
-
-def test_add_tags(placeboify, maybe_sleep):
- # setup dependencies for 1 vpn connection
- dependencies = setup_req(placeboify, 1)
- params, vpn, m, conn = dependencies['params'], dependencies['vpn'], dependencies['module'], dependencies['connection']
-
- # add a tag to the connection
- ec2_vpc_vpn.add_tags(conn, vpn['VpnConnectionId'], add=[{'Key': 'Ansible-Test', 'Value': 'VPN'}])
-
- # assert tag is there
- current_vpn = ec2_vpc_vpn.find_connection(conn, params)
- assert current_vpn['Tags'] == [{'Key': 'Ansible-Test', 'Value': 'VPN'}]
-
- # delete connection
- tear_down_conn(placeboify, conn, vpn['VpnConnectionId'])
-
-
-def test_remove_tags(placeboify, maybe_sleep):
- # setup dependencies for 1 vpn connection
- dependencies = setup_req(placeboify, 1)
- params, vpn, m, conn = dependencies['params'], dependencies['vpn'], dependencies['module'], dependencies['connection']
-
- # remove a tag from the connection
- ec2_vpc_vpn.remove_tags(conn, vpn['VpnConnectionId'], remove=['Ansible-Test'])
-
- # assert the tag is gone
- current_vpn = ec2_vpc_vpn.find_connection(conn, params)
- assert 'Tags' not in current_vpn
-
- # delete connection
- tear_down_conn(placeboify, conn, vpn['VpnConnectionId'])
-
-
-def test_add_routes(placeboify, maybe_sleep):
- # setup dependencies for 1 vpn connection
- dependencies = setup_req(placeboify, 1)
- params, vpn, m, conn = dependencies['params'], dependencies['vpn'], dependencies['module'], dependencies['connection']
-
- # create connection with a route
- ec2_vpc_vpn.add_routes(conn, vpn['VpnConnectionId'], ['195.168.2.0/24', '196.168.2.0/24'])
-
- # assert both routes are there
- current_vpn = ec2_vpc_vpn.find_connection(conn, params)
- assert set(each['DestinationCidrBlock'] for each in current_vpn['Routes']) == set(['195.168.2.0/24', '196.168.2.0/24'])
-
- # delete connection
- tear_down_conn(placeboify, conn, vpn['VpnConnectionId'])
-
-
-def setup_req(placeboify, number_of_results=1):
- ''' returns dependencies for VPN connections '''
- assert number_of_results in (1, 2)
- results = []
- cgw, vgw = get_dependencies()
- for each in range(0, number_of_results):
- params = make_params(cgw[each], vgw[each])
- m, conn = setup_mod_conn(placeboify, params)
- vpn = ec2_vpc_vpn.ensure_present(conn, params)[1]
-
- results.append({'module': m, 'connection': conn, 'vpn': vpn, 'params': params})
- if number_of_results == 1:
- return results[0]
- else:
- return results[0], results[1]
diff --git a/test/units/modules/cloud/amazon/test_iam_password_policy.py b/test/units/modules/cloud/amazon/test_iam_password_policy.py
deleted file mode 100644
index 85b828a130..0000000000
--- a/test/units/modules/cloud/amazon/test_iam_password_policy.py
+++ /dev/null
@@ -1,30 +0,0 @@
-# Make coding more python3-ish
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
-
-import pytest
-
-from units.modules.utils import set_module_args
-from ansible.module_utils.ec2 import HAS_BOTO3
-
-if not HAS_BOTO3:
- pytestmark = pytest.mark.skip("iam_password_policy.py requires the `boto3` and `botocore` modules")
-else:
- import boto3
- from ansible.modules.cloud.amazon import iam_password_policy
-
-
-def test_warn_if_state_not_specified():
- set_module_args({
- "min_pw_length": "8",
- "require_symbols": "false",
- "require_numbers": "true",
- "require_uppercase": "true",
- "require_lowercase": "true",
- "allow_pw_change": "true",
- "pw_max_age": "60",
- "pw_reuse_prevent": "5",
- "pw_expire": "false"
- })
- with pytest.raises(SystemExit):
- print(iam_password_policy.main())
diff --git a/test/units/modules/cloud/amazon/test_kinesis_stream.py b/test/units/modules/cloud/amazon/test_kinesis_stream.py
deleted file mode 100644
index e549ae9d11..0000000000
--- a/test/units/modules/cloud/amazon/test_kinesis_stream.py
+++ /dev/null
@@ -1,330 +0,0 @@
-# Make coding more python3-ish
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
-
-import pytest
-import unittest
-
-boto3 = pytest.importorskip("boto3")
-botocore = pytest.importorskip("botocore")
-
-from ansible.modules.cloud.amazon import kinesis_stream
-
-aws_region = 'us-west-2'
-
-
-class AnsibleKinesisStreamFunctions(unittest.TestCase):
-
- def test_convert_to_lower(self):
- example = {
- 'HasMoreShards': True,
- 'RetentionPeriodHours': 24,
- 'StreamName': 'test',
- 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test',
- 'StreamStatus': 'ACTIVE'
- }
- converted_example = kinesis_stream.convert_to_lower(example)
- keys = list(converted_example.keys())
- keys.sort()
- for i in range(len(keys)):
- if i == 0:
- self.assertEqual(keys[i], 'has_more_shards')
- if i == 1:
- self.assertEqual(keys[i], 'retention_period_hours')
- if i == 2:
- self.assertEqual(keys[i], 'stream_arn')
- if i == 3:
- self.assertEqual(keys[i], 'stream_name')
- if i == 4:
- self.assertEqual(keys[i], 'stream_status')
-
- def test_make_tags_in_aws_format(self):
- example = {
- 'env': 'development'
- }
- should_return = [
- {
- 'Key': 'env',
- 'Value': 'development'
- }
- ]
- aws_tags = kinesis_stream.make_tags_in_aws_format(example)
- self.assertEqual(aws_tags, should_return)
-
- def test_make_tags_in_proper_format(self):
- example = [
- {
- 'Key': 'env',
- 'Value': 'development'
- },
- {
- 'Key': 'service',
- 'Value': 'web'
- }
- ]
- should_return = {
- 'env': 'development',
- 'service': 'web'
- }
- proper_tags = kinesis_stream.make_tags_in_proper_format(example)
- self.assertEqual(proper_tags, should_return)
-
- def test_recreate_tags_from_list(self):
- example = [('environment', 'development'), ('service', 'web')]
- should_return = [
- {
- 'Key': 'environment',
- 'Value': 'development'
- },
- {
- 'Key': 'service',
- 'Value': 'web'
- }
- ]
- aws_tags = kinesis_stream.recreate_tags_from_list(example)
- self.assertEqual(aws_tags, should_return)
-
- def test_get_tags(self):
- client = boto3.client('kinesis', region_name=aws_region)
- success, err_msg, tags = kinesis_stream.get_tags(client, 'test', check_mode=True)
- self.assertTrue(success)
- should_return = [
- {
- 'Key': 'DryRunMode',
- 'Value': 'true'
- }
- ]
- self.assertEqual(tags, should_return)
-
- def test_find_stream(self):
- client = boto3.client('kinesis', region_name=aws_region)
- success, err_msg, stream = (
- kinesis_stream.find_stream(client, 'test', check_mode=True)
- )
- should_return = {
- 'OpenShardsCount': 5,
- 'ClosedShardsCount': 0,
- 'ShardsCount': 5,
- 'HasMoreShards': True,
- 'RetentionPeriodHours': 24,
- 'StreamName': 'test',
- 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test',
- 'StreamStatus': 'ACTIVE',
- 'EncryptionType': 'NONE'
- }
- self.assertTrue(success)
- self.assertEqual(stream, should_return)
-
- def test_wait_for_status(self):
- client = boto3.client('kinesis', region_name=aws_region)
- success, err_msg, stream = (
- kinesis_stream.wait_for_status(
- client, 'test', 'ACTIVE', check_mode=True
- )
- )
- should_return = {
- 'OpenShardsCount': 5,
- 'ClosedShardsCount': 0,
- 'ShardsCount': 5,
- 'HasMoreShards': True,
- 'RetentionPeriodHours': 24,
- 'StreamName': 'test',
- 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test',
- 'StreamStatus': 'ACTIVE',
- 'EncryptionType': 'NONE'
- }
- self.assertTrue(success)
- self.assertEqual(stream, should_return)
-
- def test_tags_action_create(self):
- client = boto3.client('kinesis', region_name=aws_region)
- tags = {
- 'env': 'development',
- 'service': 'web'
- }
- success, err_msg = (
- kinesis_stream.tags_action(
- client, 'test', tags, 'create', check_mode=True
- )
- )
- self.assertTrue(success)
-
- def test_tags_action_delete(self):
- client = boto3.client('kinesis', region_name=aws_region)
- tags = {
- 'env': 'development',
- 'service': 'web'
- }
- success, err_msg = (
- kinesis_stream.tags_action(
- client, 'test', tags, 'delete', check_mode=True
- )
- )
- self.assertTrue(success)
-
- def test_tags_action_invalid(self):
- client = boto3.client('kinesis', region_name=aws_region)
- tags = {
- 'env': 'development',
- 'service': 'web'
- }
- success, err_msg = (
- kinesis_stream.tags_action(
- client, 'test', tags, 'append', check_mode=True
- )
- )
- self.assertFalse(success)
-
- def test_update_tags(self):
- client = boto3.client('kinesis', region_name=aws_region)
- tags = {
- 'env': 'development',
- 'service': 'web'
- }
- success, changed, err_msg = (
- kinesis_stream.update_tags(
- client, 'test', tags, check_mode=True
- )
- )
- self.assertTrue(success)
-
- def test_stream_action_create(self):
- client = boto3.client('kinesis', region_name=aws_region)
- success, err_msg = (
- kinesis_stream.stream_action(
- client, 'test', 10, 'create', check_mode=True
- )
- )
- self.assertTrue(success)
-
- def test_stream_action_delete(self):
- client = boto3.client('kinesis', region_name=aws_region)
- success, err_msg = (
- kinesis_stream.stream_action(
- client, 'test', 10, 'delete', check_mode=True
- )
- )
- self.assertTrue(success)
-
- def test_stream_action_invalid(self):
- client = boto3.client('kinesis', region_name=aws_region)
- success, err_msg = (
- kinesis_stream.stream_action(
- client, 'test', 10, 'append', check_mode=True
- )
- )
- self.assertFalse(success)
-
- def test_retention_action_increase(self):
- client = boto3.client('kinesis', region_name=aws_region)
- success, err_msg = (
- kinesis_stream.retention_action(
- client, 'test', 48, 'increase', check_mode=True
- )
- )
- self.assertTrue(success)
-
- def test_retention_action_decrease(self):
- client = boto3.client('kinesis', region_name=aws_region)
- success, err_msg = (
- kinesis_stream.retention_action(
- client, 'test', 24, 'decrease', check_mode=True
- )
- )
- self.assertTrue(success)
-
- def test_retention_action_invalid(self):
- client = boto3.client('kinesis', region_name=aws_region)
- success, err_msg = (
- kinesis_stream.retention_action(
- client, 'test', 24, 'create', check_mode=True
- )
- )
- self.assertFalse(success)
-
- def test_update_shard_count(self):
- client = boto3.client('kinesis', region_name=aws_region)
- success, err_msg = (
- kinesis_stream.update_shard_count(
- client, 'test', 5, check_mode=True
- )
- )
- self.assertTrue(success)
-
- def test_update(self):
- client = boto3.client('kinesis', region_name=aws_region)
- current_stream = {
- 'OpenShardsCount': 5,
- 'ClosedShardsCount': 0,
- 'ShardsCount': 1,
- 'HasMoreShards': True,
- 'RetentionPeriodHours': 24,
- 'StreamName': 'test',
- 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test',
- 'StreamStatus': 'ACTIVE',
- 'EncryptionType': 'NONE'
- }
- tags = {
- 'env': 'development',
- 'service': 'web'
- }
- success, changed, err_msg = (
- kinesis_stream.update(
- client, current_stream, 'test', number_of_shards=2, retention_period=48,
- tags=tags, check_mode=True
- )
- )
- self.assertTrue(success)
- self.assertTrue(changed)
- self.assertEqual(err_msg, 'Kinesis Stream test updated successfully.')
-
- def test_create_stream(self):
- client = boto3.client('kinesis', region_name=aws_region)
- tags = {
- 'env': 'development',
- 'service': 'web'
- }
- success, changed, err_msg, results = (
- kinesis_stream.create_stream(
- client, 'test', number_of_shards=10, retention_period=48,
- tags=tags, check_mode=True
- )
- )
- should_return = {
- 'open_shards_count': 5,
- 'closed_shards_count': 0,
- 'shards_count': 5,
- 'has_more_shards': True,
- 'retention_period_hours': 24,
- 'stream_name': 'test',
- 'stream_arn': 'arn:aws:kinesis:east-side:123456789:stream/test',
- 'stream_status': 'ACTIVE',
- 'encryption_type': 'NONE',
- 'tags': tags,
- }
- self.assertTrue(success)
- self.assertTrue(changed)
- self.assertEqual(results, should_return)
- self.assertEqual(err_msg, 'Kinesis Stream test updated successfully.')
-
- def test_enable_stream_encription(self):
- client = boto3.client('kinesis', region_name=aws_region)
- success, changed, err_msg, results = (
- kinesis_stream.start_stream_encryption(
- client, 'test', encryption_type='KMS', key_id='', wait=True, wait_timeout=60, check_mode=True
- )
- )
- self.assertTrue(success)
- self.assertTrue(changed)
- self.assertEqual(err_msg, 'Kinesis Stream test encryption started successfully.')
-
- def test_dsbale_stream_encryption(self):
- client = boto3.client('kinesis', region_name=aws_region)
- success, changed, err_msg, results = (
- kinesis_stream.stop_stream_encryption(
- client, 'test', encryption_type='KMS', key_id='', wait=True, wait_timeout=60, check_mode=True
- )
- )
- self.assertTrue(success)
- self.assertTrue(changed)
- self.assertEqual(err_msg, 'Kinesis Stream test encryption stopped successfully.')
diff --git a/test/units/modules/cloud/amazon/test_lambda.py b/test/units/modules/cloud/amazon/test_lambda.py
deleted file mode 100644
index 14ea2b454c..0000000000
--- a/test/units/modules/cloud/amazon/test_lambda.py
+++ /dev/null
@@ -1,273 +0,0 @@
-#
-# (c) 2017 Michael De La Rue
-#
-# This file is part of Ansible
-# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-
-# Make coding more python3-ish
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
-
-import copy
-import pytest
-
-from units.compat.mock import MagicMock, Mock, patch
-from ansible.module_utils import basic
-from units.modules.utils import set_module_args
-
-
-boto3 = pytest.importorskip("boto3")
-
-# lambda is a keyword so we have to hack this.
-_temp = __import__("ansible.modules.cloud.amazon.lambda")
-lda = getattr(_temp.modules.cloud.amazon, "lambda")
-
-
-base_lambda_config = {
- 'FunctionName': 'lambda_name',
- 'Role': 'arn:aws:iam::987654321012:role/lambda_basic_execution',
- 'Handler': 'lambda_python.my_handler',
- 'Description': 'this that the other',
- 'Timeout': 3,
- 'MemorySize': 128,
- 'Runtime': 'python2.7',
- 'CodeSha256': 'AqMZ+xptM7aC9VXu+5jyp1sqO+Nj4WFMNzQxtPMP2n8=',
-}
-
-one_change_lambda_config = copy.copy(base_lambda_config)
-one_change_lambda_config['Timeout'] = 4
-two_change_lambda_config = copy.copy(one_change_lambda_config)
-two_change_lambda_config['Role'] = 'arn:aws:iam::987654321012:role/lambda_advanced_execution'
-code_change_lambda_config = copy.copy(base_lambda_config)
-code_change_lambda_config['CodeSha256'] = 'P+Zy8U4T4RiiHWElhL10VBKj9jw4rSJ5bm/TiW+4Rts='
-
-base_module_args = {
- "region": "us-west-1",
- "name": "lambda_name",
- "state": "present",
- "zip_file": "test/units/modules/cloud/amazon/fixtures/thezip.zip",
- "runtime": 'python2.7',
- "role": 'arn:aws:iam::987654321012:role/lambda_basic_execution',
- "memory_size": 128,
- "timeout": 3,
- "handler": 'lambda_python.my_handler'
-}
-module_args_with_environment = dict(base_module_args, environment_variables={
- "variable_name": "variable_value"
-})
-
-
-def make_mock_no_connection_connection(config):
- """return a mock of ansible's boto3_conn ready to return a mock AWS API client"""
- lambda_client_double = MagicMock()
- lambda_client_double.get_function.configure_mock(
- return_value=False
- )
- lambda_client_double.update_function_configuration.configure_mock(
- return_value={
- 'Version': 1
- }
- )
- fake_boto3_conn = Mock(return_value=lambda_client_double)
- return (fake_boto3_conn, lambda_client_double)
-
-
-def make_mock_connection(config):
- """return a mock of ansible's boto3_conn ready to return a mock AWS API client"""
- lambda_client_double = MagicMock()
- lambda_client_double.get_function.configure_mock(
- return_value={
- 'Configuration': config
- }
- )
- lambda_client_double.update_function_configuration.configure_mock(
- return_value={
- 'Version': 1
- }
- )
- fake_boto3_conn = Mock(return_value=lambda_client_double)
- return (fake_boto3_conn, lambda_client_double)
-
-
-class AnsibleFailJson(Exception):
- pass
-
-
-def fail_json_double(*args, **kwargs):
- """works like fail_json but returns module results inside exception instead of stdout"""
- kwargs['failed'] = True
- raise AnsibleFailJson(kwargs)
-
-
-# TODO: def test_handle_different_types_in_config_params():
-
-
-def test_create_lambda_if_not_exist():
-
- set_module_args(base_module_args)
- (boto3_conn_double, lambda_client_double) = make_mock_no_connection_connection(code_change_lambda_config)
-
- with patch.object(lda, 'boto3_conn', boto3_conn_double):
- try:
- lda.main()
- except SystemExit:
- pass
-
- # guard against calling other than for a lambda connection (e.g. IAM)
- assert(len(boto3_conn_double.mock_calls) > 0), "boto connections never used"
- assert(len(boto3_conn_double.mock_calls) < 2), "multiple boto connections used unexpectedly"
- assert(len(lambda_client_double.update_function_configuration.mock_calls) == 0), \
- "unexpectedly updated lambda configuration when should have only created"
- assert(len(lambda_client_double.update_function_code.mock_calls) == 0), \
- "update lambda function code when function should have been created only"
- assert(len(lambda_client_double.create_function.mock_calls) > 0), \
- "failed to call create_function "
- (create_args, create_kwargs) = lambda_client_double.create_function.call_args
- assert (len(create_kwargs) > 0), "expected create called with keyword args, none found"
-
- try:
- # For now I assume that we should NOT send an empty environment. It might
- # be okay / better to explicitly send an empty environment. However `None'
- # is not acceptable - mikedlr
- create_kwargs["Environment"]
- raise(Exception("Environment sent to boto when none expected"))
- except KeyError:
- pass # We are happy, no environment is fine
-
-
-def test_update_lambda_if_code_changed():
-
- set_module_args(base_module_args)
- (boto3_conn_double, lambda_client_double) = make_mock_connection(code_change_lambda_config)
-
- with patch.object(lda, 'boto3_conn', boto3_conn_double):
- try:
- lda.main()
- except SystemExit:
- pass
-
- # guard against calling other than for a lambda connection (e.g. IAM)
- assert(len(boto3_conn_double.mock_calls) > 0), "boto connections never used"
- assert(len(boto3_conn_double.mock_calls) < 2), "multiple boto connections used unexpectedly"
- assert(len(lambda_client_double.update_function_configuration.mock_calls) == 0), \
- "unexpectedly updatede lambda configuration when only code changed"
- assert(len(lambda_client_double.update_function_configuration.mock_calls) < 2), \
- "lambda function update called multiple times when only one time should be needed"
- assert(len(lambda_client_double.update_function_code.mock_calls) > 1), \
- "failed to update lambda function when code changed"
- # 3 because after uploading we call into the return from mock to try to find what function version
- # was returned so the MagicMock actually sees two calls for one update.
- assert(len(lambda_client_double.update_function_code.mock_calls) < 3), \
- "lambda function code update called multiple times when only one time should be needed"
-
-
-def test_update_lambda_if_config_changed():
-
- set_module_args(base_module_args)
- (boto3_conn_double, lambda_client_double) = make_mock_connection(two_change_lambda_config)
-
- with patch.object(lda, 'boto3_conn', boto3_conn_double):
- try:
- lda.main()
- except SystemExit:
- pass
-
- # guard against calling other than for a lambda connection (e.g. IAM)
- assert(len(boto3_conn_double.mock_calls) > 0), "boto connections never used"
- assert(len(boto3_conn_double.mock_calls) < 2), "multiple boto connections used unexpectedly"
- assert(len(lambda_client_double.update_function_configuration.mock_calls) > 0), \
- "failed to update lambda function when configuration changed"
- assert(len(lambda_client_double.update_function_configuration.mock_calls) < 2), \
- "lambda function update called multiple times when only one time should be needed"
- assert(len(lambda_client_double.update_function_code.mock_calls) == 0), \
- "updated lambda code when no change should have happened"
-
-
-def test_update_lambda_if_only_one_config_item_changed():
-
- set_module_args(base_module_args)
- (boto3_conn_double, lambda_client_double) = make_mock_connection(one_change_lambda_config)
-
- with patch.object(lda, 'boto3_conn', boto3_conn_double):
- try:
- lda.main()
- except SystemExit:
- pass
-
- # guard against calling other than for a lambda connection (e.g. IAM)
- assert(len(boto3_conn_double.mock_calls) > 0), "boto connections never used"
- assert(len(boto3_conn_double.mock_calls) < 2), "multiple boto connections used unexpectedly"
- assert(len(lambda_client_double.update_function_configuration.mock_calls) > 0), \
- "failed to update lambda function when configuration changed"
- assert(len(lambda_client_double.update_function_configuration.mock_calls) < 2), \
- "lambda function update called multiple times when only one time should be needed"
- assert(len(lambda_client_double.update_function_code.mock_calls) == 0), \
- "updated lambda code when no change should have happened"
-
-
-def test_update_lambda_if_added_environment_variable():
-
- set_module_args(module_args_with_environment)
- (boto3_conn_double, lambda_client_double) = make_mock_connection(base_lambda_config)
-
- with patch.object(lda, 'boto3_conn', boto3_conn_double):
- try:
- lda.main()
- except SystemExit:
- pass
-
- # guard against calling other than for a lambda connection (e.g. IAM)
- assert(len(boto3_conn_double.mock_calls) > 0), "boto connections never used"
- assert(len(boto3_conn_double.mock_calls) < 2), "multiple boto connections used unexpectedly"
- assert(len(lambda_client_double.update_function_configuration.mock_calls) > 0), \
- "failed to update lambda function when configuration changed"
- assert(len(lambda_client_double.update_function_configuration.mock_calls) < 2), \
- "lambda function update called multiple times when only one time should be needed"
- assert(len(lambda_client_double.update_function_code.mock_calls) == 0), \
- "updated lambda code when no change should have happened"
-
- (update_args, update_kwargs) = lambda_client_double.update_function_configuration.call_args
- assert (len(update_kwargs) > 0), "expected update configuration called with keyword args, none found"
- assert update_kwargs['Environment']['Variables'] == module_args_with_environment['environment_variables']
-
-
-def test_dont_update_lambda_if_nothing_changed():
- set_module_args(base_module_args)
- (boto3_conn_double, lambda_client_double) = make_mock_connection(base_lambda_config)
-
- with patch.object(lda, 'boto3_conn', boto3_conn_double):
- try:
- lda.main()
- except SystemExit:
- pass
-
- # guard against calling other than for a lambda connection (e.g. IAM)
- assert(len(boto3_conn_double.mock_calls) > 0), "boto connections never used"
- assert(len(boto3_conn_double.mock_calls) < 2), "multiple boto connections used unexpectedly"
- assert(len(lambda_client_double.update_function_configuration.mock_calls) == 0), \
- "updated lambda function when no configuration changed"
- assert(len(lambda_client_double.update_function_code.mock_calls) == 0), \
- "updated lambda code when no change should have happened"
-
-
-def test_warn_region_not_specified():
-
- set_module_args({
- "name": "lambda_name",
- "state": "present",
- # Module is called without a region causing error
- # "region": "us-east-1",
- "zip_file": "test/units/modules/cloud/amazon/fixtures/thezip.zip",
- "runtime": 'python2.7',
- "role": 'arn:aws:iam::987654321012:role/lambda_basic_execution',
- "handler": 'lambda_python.my_handler'})
-
- get_aws_connection_info_double = Mock(return_value=(None, None, None))
-
- with patch.object(lda, 'get_aws_connection_info', get_aws_connection_info_double):
- with patch.object(basic.AnsibleModule, 'fail_json', fail_json_double):
- try:
- lda.main()
- except AnsibleFailJson as e:
- result = e.args[0]
- assert("region must be specified" in result['msg'])
diff --git a/test/units/modules/cloud/amazon/test_lambda_policy.py b/test/units/modules/cloud/amazon/test_lambda_policy.py
deleted file mode 100644
index 5c32370469..0000000000
--- a/test/units/modules/cloud/amazon/test_lambda_policy.py
+++ /dev/null
@@ -1,155 +0,0 @@
-#
-# (c) 2017 Michael De La Rue
-#
-# This file is part of Ansible
-# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
-
-# Make coding more python3-ish
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
-
-import copy
-
-import pytest
-
-from ansible.module_utils.aws.core import HAS_BOTO3
-from units.compat.mock import MagicMock
-from units.modules.utils import set_module_args
-
-if not HAS_BOTO3:
- pytestmark = pytest.mark.skip("test_api_gateway.py requires the `boto3` and `botocore` modules")
-
-# these are here cause ... boto!
-from ansible.modules.cloud.amazon import lambda_policy
-from ansible.modules.cloud.amazon.lambda_policy import setup_module_object
-try:
- from botocore.exceptions import ClientError
-except ImportError:
- pass
-
-
-base_module_args = {
- "region": "us-west-1",
- "function_name": "this_is_a_test_function",
- "state": "present",
- "statement_id": "test-allow-lambda",
- "principal": 123456,
- "action": "lambda:*"
-}
-
-
-def test_module_is_created_sensibly():
- set_module_args(base_module_args)
- module = setup_module_object()
- assert module.params['function_name'] == 'this_is_a_test_function'
-
-
-module_double = MagicMock()
-module_double.fail_json_aws.side_effect = Exception("unexpected call to fail_json_aws")
-module_double.check_mode = False
-
-fake_module_params_present = {
- "state": "present",
- "statement_id": "test-allow-lambda",
- "principal": "apigateway.amazonaws.com",
- "action": "lambda:InvokeFunction",
- "source_arn": u'arn:aws:execute-api:us-east-1:123456789:efghijklmn/authorizers/*',
- "version": 0,
- "alias": None
-}
-fake_module_params_different = copy.deepcopy(fake_module_params_present)
-fake_module_params_different["action"] = "lambda:api-gateway"
-fake_module_params_absent = copy.deepcopy(fake_module_params_present)
-fake_module_params_absent["state"] = "absent"
-
-fake_policy_return = {
- u'Policy': (
- u'{"Version":"2012-10-17","Id":"default","Statement":[{"Sid":"1234567890abcdef1234567890abcdef",'
- u'"Effect":"Allow","Principal":{"Service":"apigateway.amazonaws.com"},"Action":"lambda:InvokeFunction",'
- u'"Resource":"arn:aws:lambda:us-east-1:123456789:function:test_authorizer",'
- u'"Condition":{"ArnLike":{"AWS:SourceArn":"arn:aws:execute-api:us-east-1:123456789:abcdefghij/authorizers/1a2b3c"}}},'
- u'{"Sid":"2234567890abcdef1234567890abcdef","Effect":"Allow","Principal":{"Service":"apigateway.amazonaws.com"},'
- u'"Action":"lambda:InvokeFunction","Resource":"arn:aws:lambda:us-east-1:123456789:function:test_authorizer",'
- u'"Condition":{"ArnLike":{"AWS:SourceArn":"arn:aws:execute-api:us-east-1:123456789:klmnopqrst/authorizers/4d5f6g"}}},'
- u'{"Sid":"1234567890abcdef1234567890abcdef","Effect":"Allow","Principal":{"Service":"apigateway.amazonaws.com"},'
- u'"Action":"lambda:InvokeFunction","Resource":"arn:aws:lambda:us-east-1:123456789:function:test_authorizer",'
- u'"Condition":{"ArnLike":{"AWS:SourceArn":"arn:aws:execute-api:eu-west-1:123456789:uvwxyzabcd/authorizers/7h8i9j"}}},'
- u'{"Sid":"test-allow-lambda","Effect":"Allow","Principal":{"Service":"apigateway.amazonaws.com"},'
- u'"Action":"lambda:InvokeFunction","Resource":"arn:aws:lambda:us-east-1:123456789:function:test_authorizer",'
- u'"Condition":{"ArnLike":{"AWS:SourceArn":"arn:aws:execute-api:us-east-1:123456789:efghijklmn/authorizers/*"}}},'
- u'{"Sid":"1234567890abcdef1234567890abcdef","Effect":"Allow","Principal":{"Service":"apigateway.amazonaws.com"},'
- u'"Action":"lambda:InvokeFunction","Resource":"arn:aws:lambda:us-east-1:123456789:function:test_authorizer",'
- u'"Condition":{"ArnLike":{"AWS:SourceArn":"arn:aws:execute-api:us-east-1:123456789:opqrstuvwx/authorizers/0k1l2m"}}}]}'),
- 'ResponseMetadata': {
- 'RetryAttempts': 0,
- 'HTTPStatusCode': 200,
- 'RequestId': 'abcdefgi-1234-a567-b890-123456789abc',
- 'HTTPHeaders': {
- 'date': 'Sun, 13 Aug 2017 10:54:17 GMT',
- 'x-amzn-requestid': 'abcdefgi-1234-a567-b890-123456789abc',
- 'content-length': '1878',
- 'content-type': 'application/json',
- 'connection': 'keep-alive'}}}
-
-error_response = {'Error': {'Code': 'ResourceNotFoundException', 'Message': 'Fake Testing Error'}}
-operation_name = 'FakeOperation'
-
-
-def test_manage_state_adds_missing_permissions():
- lambda_client_double = MagicMock()
- # Policy actually: not present Requested State: present Should: create
- lambda_client_double.get_policy.side_effect = ClientError(error_response, operation_name)
- fake_module_params = copy.deepcopy(fake_module_params_present)
- module_double.params = fake_module_params
- lambda_policy.manage_state(module_double, lambda_client_double)
- assert lambda_client_double.get_policy.call_count > 0
- assert lambda_client_double.add_permission.call_count > 0
- lambda_client_double.remove_permission.assert_not_called()
-
-
-def test_manage_state_leaves_existing_permissions():
- lambda_client_double = MagicMock()
- # Policy actually: present Requested State: present Should: do nothing
- lambda_client_double.get_policy.return_value = fake_policy_return
- fake_module_params = copy.deepcopy(fake_module_params_present)
- module_double.params = fake_module_params
- lambda_policy.manage_state(module_double, lambda_client_double)
- assert lambda_client_double.get_policy.call_count > 0
- lambda_client_double.add_permission.assert_not_called()
- lambda_client_double.remove_permission.assert_not_called()
-
-
-def test_manage_state_updates_nonmatching_permissions():
- lambda_client_double = MagicMock()
- # Policy actually: present Requested State: present Should: do nothing
- lambda_client_double.get_policy.return_value = fake_policy_return
- fake_module_params = copy.deepcopy(fake_module_params_different)
- module_double.params = fake_module_params
- lambda_policy.manage_state(module_double, lambda_client_double)
- assert lambda_client_double.get_policy.call_count > 0
- assert lambda_client_double.add_permission.call_count > 0
- assert lambda_client_double.remove_permission.call_count > 0
-
-
-def test_manage_state_removes_unwanted_permissions():
- lambda_client_double = MagicMock()
- # Policy actually: present Requested State: not present Should: remove
- lambda_client_double.get_policy.return_value = fake_policy_return
- fake_module_params = copy.deepcopy(fake_module_params_absent)
- module_double.params = fake_module_params
- lambda_policy.manage_state(module_double, lambda_client_double)
- assert lambda_client_double.get_policy.call_count > 0
- lambda_client_double.add_permission.assert_not_called()
- assert lambda_client_double.remove_permission.call_count > 0
-
-
-def test_manage_state_leaves_already_removed_permissions():
- lambda_client_double = MagicMock()
- # Policy actually: absent Requested State: absent Should: do nothing
- lambda_client_double.get_policy.side_effect = ClientError(error_response, operation_name)
- fake_module_params = copy.deepcopy(fake_module_params_absent)
- module_double.params = fake_module_params
- lambda_policy.manage_state(module_double, lambda_client_double)
- assert lambda_client_double.get_policy.call_count > 0
- lambda_client_double.add_permission.assert_not_called()
- lambda_client_double.remove_permission.assert_not_called()
diff --git a/test/units/modules/cloud/amazon/test_redshift_cross_region_snapshots.py b/test/units/modules/cloud/amazon/test_redshift_cross_region_snapshots.py
deleted file mode 100644
index 1891b5c890..0000000000
--- a/test/units/modules/cloud/amazon/test_redshift_cross_region_snapshots.py
+++ /dev/null
@@ -1,52 +0,0 @@
-# Make coding more python3-ish
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
-
-from ansible.modules.cloud.amazon import redshift_cross_region_snapshots as rcrs
-
-mock_status_enabled = {
- 'SnapshotCopyGrantName': 'snapshot-us-east-1-to-us-west-2',
- 'DestinationRegion': 'us-west-2',
- 'RetentionPeriod': 1,
-}
-
-mock_status_disabled = {}
-
-mock_request_illegal = {
- 'snapshot_copy_grant': 'changed',
- 'destination_region': 'us-west-2',
- 'snapshot_retention_period': 1
-}
-
-mock_request_update = {
- 'snapshot_copy_grant': 'snapshot-us-east-1-to-us-west-2',
- 'destination_region': 'us-west-2',
- 'snapshot_retention_period': 3
-}
-
-mock_request_no_update = {
- 'snapshot_copy_grant': 'snapshot-us-east-1-to-us-west-2',
- 'destination_region': 'us-west-2',
- 'snapshot_retention_period': 1
-}
-
-
-def test_fail_at_unsupported_operations():
- response = rcrs.requesting_unsupported_modifications(
- mock_status_enabled, mock_request_illegal
- )
- assert response is True
-
-
-def test_needs_update_true():
- response = rcrs.needs_update(mock_status_enabled, mock_request_update)
- assert response is True
-
-
-def test_no_change():
- response = rcrs.requesting_unsupported_modifications(
- mock_status_enabled, mock_request_no_update
- )
- needs_update_response = rcrs.needs_update(mock_status_enabled, mock_request_no_update)
- assert response is False
- assert needs_update_response is False
diff --git a/test/units/modules/cloud/amazon/test_route53_zone.py b/test/units/modules/cloud/amazon/test_route53_zone.py
deleted file mode 100644
index 283584a4dd..0000000000
--- a/test/units/modules/cloud/amazon/test_route53_zone.py
+++ /dev/null
@@ -1,610 +0,0 @@
-# Make coding more python3-ish
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
-
-import functools
-
-from ansible.modules.cloud.amazon import route53_zone
-from units.compat import unittest
-from units.compat.mock import patch, call
-from units.modules.utils import AnsibleExitJson, AnsibleFailJson, ModuleTestCase, set_module_args
-
-
-def parameterized(params_list):
- def decorator(func):
- @functools.wraps(func)
- def wrapper(*args, **kwargs):
- for params_map in params_list:
- params_map.update(kwargs)
- func(*args, **params_map)
- return wrapper
- return decorator
-
-
-# Inline and replace with subdict.items() <= superdict.items(), when Python 2.6 compat can be dropped
-def is_subdict(subdict, superdict):
- return all(superdict[k] == v for k, v in subdict.items())
-
-
-@patch('ansible.module_utils.aws.core.HAS_BOTO3', new=True)
-@patch.object(route53_zone.AnsibleAWSModule, 'client')
-@patch.object(route53_zone.time, 'time', return_value=1)
-class TestRoute53Module(ModuleTestCase):
- def test_mutually_exclusive(self, *args):
- with self.assertRaises(AnsibleFailJson) as exec_info:
- set_module_args({
- 'secret_key': 'SECRET_KEY',
- 'access_key': 'ACCESS_KEY',
- 'region': 'eu-central-1',
- 'zone': 'example.com',
- 'vpc_id': 'vpc-94ccc2ff',
- 'vpc_region': 'eu-central-1',
- 'comment': 'foobar',
- 'delegation_set_id': 'A1BCDEF2GHIJKL',
- 'state': 'present',
- })
- route53_zone.main()
-
- self.assertEqual(
- exec_info.exception.args[0]['msg'],
- 'parameters are mutually exclusive: delegation_set_id|vpc_id, delegation_set_id|vpc_region',
- )
-
- @parameterized([
- {
- 'check_mode': False,
- 'response': {
- 'private_zone': False,
- 'vpc_id': None,
- 'vpc_region': None,
- 'comment': 'foobar',
- 'name': 'example.com.',
- 'delegation_set_id': '',
- 'zone_id': 'ZONE_ID',
- },
- },
- {
- 'check_mode': True,
- 'response': {
- 'private_zone': False,
- 'vpc_id': None,
- 'vpc_region': None,
- 'comment': 'foobar',
- 'name': 'example.com.',
- 'delegation_set_id': None,
- 'zone_id': None,
- },
- }
- ])
- @patch.object(route53_zone, 'find_zones', return_value=[])
- def test_create_public_zone(self, find_zones_mock, time_mock, client_mock, check_mode, response):
- client_mock.return_value.create_hosted_zone.return_value = {
- 'HostedZone': {
- 'Id': '/hostedzone/ZONE_ID',
- 'Name': 'example.com.',
- 'Config': {
- 'Comment': 'foobar',
- 'PrivateZone': False,
- },
- },
- }
-
- with self.assertRaises(AnsibleExitJson) as exec_info:
- set_module_args({
- 'secret_key': 'SECRET_KEY',
- 'access_key': 'ACCESS_KEY',
- 'region': 'eu-central-1',
- 'zone': 'example.com',
- 'comment': 'foobar',
- 'state': 'present',
- '_ansible_check_mode': check_mode,
- })
- route53_zone.main()
-
- if check_mode:
- client_mock.return_value.create_hosted_zone.assert_not_called()
- else:
- client_mock.return_value.create_hosted_zone.assert_called_once_with(**{
- 'HostedZoneConfig': {
- 'Comment': 'foobar',
- 'PrivateZone': False,
- },
- 'Name': 'example.com.',
- 'CallerReference': 'example.com.-1',
- })
-
- self.assertEqual(exec_info.exception.args[0]['changed'], True)
- self.assertTrue(is_subdict(response, exec_info.exception.args[0]))
-
- @parameterized([
- {
- 'check_mode': False,
- 'response': {
- 'private_zone': True,
- 'vpc_id': 'vpc-1',
- 'vpc_region': 'eu-central-1',
- 'comment': 'foobar',
- 'name': 'example.com.',
- 'delegation_set_id': None,
- 'zone_id': 'ZONE_ID',
- },
- },
- {
- 'check_mode': True,
- 'response': {
- 'private_zone': True,
- 'vpc_id': 'vpc-1',
- 'vpc_region': 'eu-central-1',
- 'comment': 'foobar',
- 'name': 'example.com.',
- 'delegation_set_id': None,
- 'zone_id': None,
- },
- }
- ])
- @patch.object(route53_zone, 'find_zones', return_value=[])
- def test_create_private_zone(self, find_zones_mock, time_mock, client_mock, check_mode, response):
- client_mock.return_value.create_hosted_zone.return_value = {
- 'HostedZone': {
- 'Id': '/hostedzone/ZONE_ID',
- 'Name': 'example.com.',
- 'Config': {
- 'Comment': 'foobar',
- 'PrivateZone': True
- },
- },
- }
-
- with self.assertRaises(AnsibleExitJson) as exec_info:
- set_module_args({
- 'secret_key': 'SECRET_KEY',
- 'access_key': 'ACCESS_KEY',
- 'region': 'eu-central-1',
- 'zone': 'example.com',
- 'comment': 'foobar',
- 'vpc_id': 'vpc-1',
- 'vpc_region': 'eu-central-1',
- 'state': 'present',
- '_ansible_check_mode': check_mode,
- })
- route53_zone.main()
-
- if check_mode:
- client_mock.return_value.create_hosted_zone.assert_not_called()
- else:
- client_mock.return_value.create_hosted_zone.assert_called_once_with(**{
- 'HostedZoneConfig': {
- 'Comment': 'foobar',
- 'PrivateZone': True,
- },
- 'Name': 'example.com.',
- 'CallerReference': 'example.com.-1',
- 'VPC': {
- 'VPCRegion': 'eu-central-1',
- 'VPCId': 'vpc-1',
- },
- })
-
- self.assertEqual(exec_info.exception.args[0]['changed'], True)
- self.assertTrue(is_subdict(response, exec_info.exception.args[0]))
-
- @parameterized([
- {
- 'check_mode': False,
- 'response': {
- 'private_zone': False,
- 'vpc_id': None,
- 'vpc_region': None,
- 'comment': 'new',
- 'name': 'example.com.',
- 'delegation_set_id': '',
- 'zone_id': 'ZONE_ID',
- },
- },
- {
- 'check_mode': True,
- 'response': {
- 'private_zone': False,
- 'vpc_id': None,
- 'vpc_region': None,
- 'comment': 'new',
- 'name': 'example.com.',
- 'delegation_set_id': None,
- 'zone_id': 'ZONE_ID',
- },
- }
- ])
- @patch.object(route53_zone, 'find_zones', return_value=[{
- 'Id': '/hostedzone/ZONE_ID',
- 'Name': 'example.com.',
- 'Config': {'Comment': '', 'PrivateZone': False},
- }])
- def test_update_comment_public_zone(self, find_zones_mock, time_mock, client_mock, check_mode, response):
- client_mock.return_value.get_hosted_zone.return_value = {
- 'HostedZone': {
- 'Id': '/hostedzone/ZONE_ID',
- 'Name': 'example.com.',
- 'Config': {'Comment': '', 'PrivateZone': False},
- },
- }
-
- with self.assertRaises(AnsibleExitJson) as exec_info:
- set_module_args({
- 'secret_key': 'SECRET_KEY',
- 'access_key': 'ACCESS_KEY',
- 'region': 'eu-central-1',
- 'zone': 'example.com',
- 'comment': 'new',
- 'state': 'present',
- '_ansible_check_mode': check_mode,
- })
- route53_zone.main()
-
- if check_mode:
- client_mock.return_value.update_hosted_zone_comment.assert_not_called()
- else:
- client_mock.return_value.update_hosted_zone_comment.assert_called_once_with(**{
- 'Id': '/hostedzone/ZONE_ID',
- 'Comment': 'new',
- })
-
- self.assertEqual(exec_info.exception.args[0]['changed'], True)
- self.assertTrue(is_subdict(response, exec_info.exception.args[0]))
-
- @patch.object(route53_zone, 'find_zones', return_value=[{
- 'Id': '/hostedzone/Z22OU4IUOVYM30',
- 'Name': 'example.com.',
- 'Config': {'Comment': '', 'PrivateZone': False},
- }])
- def test_update_public_zone_no_changes(self, find_zones_mock, time_mock, client_mock):
- client_mock.return_value.get_hosted_zone.return_value = {
- 'HostedZone': {
- 'Id': '/hostedzone/ZONE_ID',
- 'Name': 'example.com.',
- 'Config': {'Comment': '', 'PrivateZone': False},
- },
- }
-
- with self.assertRaises(AnsibleExitJson) as exec_info:
- set_module_args({
- 'secret_key': 'SECRET_KEY',
- 'access_key': 'ACCESS_KEY',
- 'region': 'eu-central-1',
- 'zone': 'example.com',
- 'comment': '',
- 'state': 'present',
- })
- route53_zone.main()
-
- client_mock.return_value.update_hosted_zone_comment.assert_not_called()
- self.assertEqual(exec_info.exception.args[0]['changed'], False)
-
- @parameterized([
- {
- 'check_mode': False,
- 'response': {
- 'private_zone': True,
- 'vpc_id': 'vpc-1',
- 'vpc_region': 'eu-central-1',
- 'comment': 'new',
- 'name': 'example.com.',
- 'delegation_set_id': None,
- 'zone_id': 'ZONE_ID',
- },
- },
- {
- 'check_mode': True,
- 'response': {
- 'private_zone': True,
- 'vpc_id': 'vpc-1',
- 'vpc_region': 'eu-central-1',
- 'comment': 'new',
- 'name': 'example.com.',
- 'delegation_set_id': None,
- 'zone_id': 'ZONE_ID',
- },
- }
- ])
- @patch.object(route53_zone, 'find_zones', return_value=[{
- 'Id': '/hostedzone/ZONE_ID',
- 'Name': 'example.com.',
- 'Config': {'Comment': 'foobar', 'PrivateZone': True},
- }])
- def test_update_comment_private_zone(self, find_zones_mock, time_mock, client_mock, check_mode, response):
- client_mock.return_value.get_hosted_zone.return_value = {
- 'HostedZone': {
- 'Id': '/hostedzone/ZONE_ID',
- 'Name': 'example.com.',
- 'Config': {'Comment': 'foobar', 'PrivateZone': True},
- },
- 'VPCs': [{'VPCRegion': 'eu-central-1', 'VPCId': 'vpc-1'}],
- }
-
- with self.assertRaises(AnsibleExitJson) as exec_info:
- set_module_args({
- 'secret_key': 'SECRET_KEY',
- 'access_key': 'ACCESS_KEY',
- 'region': 'eu-central-1',
- 'zone': 'example.com',
- 'comment': 'new',
- 'vpc_id': 'vpc-1',
- 'vpc_region': 'eu-central-1',
- 'state': 'present',
- '_ansible_check_mode': check_mode,
- })
- route53_zone.main()
-
- if check_mode:
- client_mock.return_value.update_hosted_zone_comment.assert_not_called()
- else:
- client_mock.return_value.update_hosted_zone_comment.assert_called_once_with(**{
- 'Id': '/hostedzone/ZONE_ID',
- 'Comment': 'new',
- })
-
- self.assertEqual(exec_info.exception.args[0]['changed'], True)
- self.assertTrue(is_subdict(response, exec_info.exception.args[0]))
-
- @parameterized([
- {
- 'check_mode': False,
- 'response': {
- 'private_zone': True,
- 'vpc_id': 'vpc-2',
- 'vpc_region': 'us-east-2',
- 'comment': 'foobar',
- 'name': 'example.com.',
- 'delegation_set_id': None,
- 'zone_id': 'ZONE_ID_2',
- },
- },
- {
- 'check_mode': True,
- 'response': {
- 'private_zone': True,
- 'vpc_id': 'vpc-2',
- 'vpc_region': 'us-east-2',
- 'comment': 'foobar',
- 'name': 'example.com.',
- 'delegation_set_id': None,
- 'zone_id': None,
- },
- }
- ])
- @patch.object(route53_zone, 'find_zones', return_value=[{
- 'Id': '/hostedzone/ZONE_ID',
- 'Name': 'example.com.',
- 'Config': {'Comment': 'foobar', 'PrivateZone': True},
- }])
- def test_update_vpc_private_zone(self, find_zones_mock, time_mock, client_mock, check_mode, response):
- client_mock.return_value.get_hosted_zone.return_value = {
- 'HostedZone': {
- 'Id': '/hostedzone/ZONE_ID',
- 'Name': 'example.com.',
- 'Config': {'Comment': 'foobar', 'PrivateZone': True},
- },
- 'VPCs': [{'VPCRegion': 'eu-central-1', 'VPCId': 'vpc-1'}],
- }
- client_mock.return_value.create_hosted_zone.return_value = {
- 'HostedZone': {
- 'Id': '/hostedzone/ZONE_ID_2',
- 'Name': 'example.com.',
- 'Config': {
- 'Comment': 'foobar',
- 'PrivateZone': True
- },
- },
- }
-
- with self.assertRaises(AnsibleExitJson) as exec_info:
- set_module_args({
- 'secret_key': 'SECRET_KEY',
- 'access_key': 'ACCESS_KEY',
- 'region': 'us-east-2',
- 'zone': 'example.com',
- 'comment': 'foobar',
- 'vpc_id': 'vpc-2',
- 'vpc_region': 'us-east-2',
- 'state': 'present',
- '_ansible_check_mode': check_mode,
- })
- route53_zone.main()
-
- if check_mode:
- client_mock.return_value.create_hosted_zone.assert_not_called()
- else:
- client_mock.return_value.create_hosted_zone.assert_called_once_with(**{
- 'HostedZoneConfig': {
- 'Comment': 'foobar',
- 'PrivateZone': True,
- },
- 'Name': 'example.com.',
- 'CallerReference': 'example.com.-1',
- 'VPC': {
- 'VPCRegion': 'us-east-2',
- 'VPCId': 'vpc-2',
- },
- })
-
- self.assertEqual(exec_info.exception.args[0]['changed'], True)
- self.assertTrue(is_subdict(response, exec_info.exception.args[0]))
-
- @patch.object(route53_zone, 'find_zones', return_value=[{
- 'Id': '/hostedzone/ZONE_ID',
- 'Name': 'example.com.',
- 'Config': {'Comment': 'foobar', 'PrivateZone': True},
- }])
- def test_update_private_zone_no_changes(self, find_zones_mock, time_mock, client_mock):
- client_mock.return_value.get_hosted_zone.return_value = {
- 'HostedZone': {
- 'Id': '/hostedzone/ZONE_ID',
- 'Name': 'example.com.',
- 'Config': {'Comment': 'foobar', 'PrivateZone': True},
- },
- 'VPCs': [{'VPCRegion': 'eu-central-1', 'VPCId': 'vpc-1'}],
- }
-
- with self.assertRaises(AnsibleExitJson) as exec_info:
- set_module_args({
- 'secret_key': 'SECRET_KEY',
- 'access_key': 'ACCESS_KEY',
- 'region': 'eu-central-1',
- 'zone': 'example.com',
- 'comment': 'foobar',
- 'vpc_id': 'vpc-1',
- 'vpc_region': 'eu-central-1',
- 'state': 'present',
- })
- route53_zone.main()
-
- client_mock.return_value.update_hosted_zone_comment.assert_not_called()
- self.assertEqual(exec_info.exception.args[0]['changed'], False)
-
- response = {
- 'private_zone': True,
- 'vpc_id': 'vpc-1',
- 'vpc_region': 'eu-central-1',
- 'comment': 'foobar',
- 'name': 'example.com.',
- 'delegation_set_id': None,
- 'zone_id': 'ZONE_ID',
- }
- self.assertTrue(is_subdict(response, exec_info.exception.args[0]))
-
- @parameterized([
- {'check_mode': False},
- {'check_mode': True}
- ])
- @patch.object(route53_zone, 'find_zones', return_value=[{
- 'Id': '/hostedzone/ZONE_ID',
- 'Name': 'example.com.',
- 'Config': {'Comment': '', 'PrivateZone': False},
- }])
- def test_delete_public_zone(self, find_zones_mock, time_mock, client_mock, check_mode):
- with self.assertRaises(AnsibleExitJson) as exec_info:
- set_module_args({
- 'secret_key': 'SECRET_KEY',
- 'access_key': 'ACCESS_KEY',
- 'region': 'eu-central-1',
- 'zone': 'example.com',
- 'state': 'absent',
- '_ansible_check_mode': check_mode,
- })
- route53_zone.main()
-
- if check_mode:
- client_mock.return_value.delete_hosted_zone.assert_not_called()
- else:
- client_mock.return_value.delete_hosted_zone.assert_called_once_with(**{
- 'Id': '/hostedzone/ZONE_ID',
- })
-
- self.assertEqual(exec_info.exception.args[0]['changed'], True)
-
- @parameterized([
- {'check_mode': False},
- {'check_mode': True}
- ])
- @patch.object(route53_zone, 'find_zones', return_value=[{
- 'Id': '/hostedzone/ZONE_ID',
- 'Name': 'example.com.',
- 'Config': {'Comment': 'foobar', 'PrivateZone': True},
- }])
- def test_delete_private_zone(self, find_zones_mock, time_mock, client_mock, check_mode):
- client_mock.return_value.get_hosted_zone.return_value = {
- 'HostedZone': {
- 'Id': '/hostedzone/ZONE_ID',
- 'Name': 'example.com.',
- 'Config': {'Comment': 'foobar', 'PrivateZone': True},
- },
- 'VPCs': [{'VPCRegion': 'eu-central-1', 'VPCId': 'vpc-1'}],
- }
-
- with self.assertRaises(AnsibleExitJson) as exec_info:
- set_module_args({
- 'secret_key': 'SECRET_KEY',
- 'access_key': 'ACCESS_KEY',
- 'region': 'eu-central-1',
- 'zone': 'example.com',
- 'vpc_id': 'vpc-1',
- 'vpc_region': 'eu-central-1',
- 'state': 'absent',
- '_ansible_check_mode': check_mode,
- })
- route53_zone.main()
-
- if check_mode:
- client_mock.return_value.delete_hosted_zone.assert_not_called()
- else:
- client_mock.return_value.delete_hosted_zone.assert_called_once_with(**{
- 'Id': '/hostedzone/ZONE_ID',
- })
-
- self.assertEqual(exec_info.exception.args[0]['changed'], True)
-
- @parameterized([
- {'check_mode': False},
- {'check_mode': True}
- ])
- @parameterized([
- {
- 'hosted_zone_id': 'PRIVATE_ZONE_ID',
- 'call_params': [call(**{
- 'Id': 'PRIVATE_ZONE_ID',
- })],
- }, {
- 'hosted_zone_id': 'all',
- 'call_params': [call(**{
- 'Id': '/hostedzone/PUBLIC_ZONE_ID',
- }), call(**{
- 'Id': '/hostedzone/PRIVATE_ZONE_ID',
- })],
- }
- ])
- @patch.object(route53_zone, 'find_zones', return_value=[{
- 'Id': '/hostedzone/PUBLIC_ZONE_ID',
- 'Name': 'example.com.',
- 'Config': {'Comment': '', 'PrivateZone': False},
- }, {
- 'Id': '/hostedzone/PRIVATE_ZONE_ID',
- 'Name': 'example.com.',
- 'Config': {'Comment': 'foobar', 'PrivateZone': True},
- }])
- def test_delete_by_zone_id(self, find_zones_mock, time_mock, client_mock, hosted_zone_id, call_params, check_mode):
- with self.assertRaises(AnsibleExitJson) as exec_info:
- set_module_args({
- 'secret_key': 'SECRET_KEY',
- 'access_key': 'ACCESS_KEY',
- 'region': 'eu-central-1',
- 'zone': 'example.com',
- 'hosted_zone_id': hosted_zone_id,
- 'state': 'absent',
- '_ansible_check_mode': check_mode,
- })
- route53_zone.main()
-
- if check_mode:
- client_mock.return_value.delete_hosted_zone.assert_not_called()
- else:
- client_mock.return_value.delete_hosted_zone.assert_has_calls(call_params)
-
- self.assertEqual(exec_info.exception.args[0]['changed'], True)
-
- @patch.object(route53_zone, 'find_zones', return_value=[])
- def test_delete_absent_zone(self, find_zones_mock, time_mock, client_mock):
- with self.assertRaises(AnsibleExitJson) as exec_info:
- set_module_args({
- 'secret_key': 'SECRET_KEY',
- 'access_key': 'ACCESS_KEY',
- 'region': 'eu-central-1',
- 'zone': 'example.com',
- 'state': 'absent',
- })
- route53_zone.main()
-
- client_mock.return_value.delete_hosted_zone.assert_not_called()
- self.assertEqual(exec_info.exception.args[0]['changed'], False)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/test/units/modules/cloud/amazon/test_s3_bucket_notification.py b/test/units/modules/cloud/amazon/test_s3_bucket_notification.py
deleted file mode 100644
index cf342064c0..0000000000
--- a/test/units/modules/cloud/amazon/test_s3_bucket_notification.py
+++ /dev/null
@@ -1,262 +0,0 @@
-# Make coding more python3-ish
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
-
-import pytest
-
-from units.compat.mock import MagicMock, patch
-from units.modules.utils import AnsibleExitJson, AnsibleFailJson, ModuleTestCase, set_module_args
-
-from ansible.modules.cloud.amazon.s3_bucket_notification import AmazonBucket, Config
-from ansible.modules.cloud.amazon import s3_bucket_notification
-try:
- from botocore.exceptions import ClientError
-except ImportError:
- pass
-
-
-class TestAmazonBucketOperations:
- def test_current_config(self):
- api_config = {
- 'Id': 'test-id',
- 'LambdaFunctionArn': 'test-arn',
- 'Events': [],
- 'Filter': {
- 'Key': {
- 'FilterRules': [{
- 'Name': 'Prefix',
- 'Value': ''
- }, {
- 'Name': 'Suffix',
- 'Value': ''
- }]
- }
- }
- }
- client = MagicMock()
- client.get_bucket_notification_configuration.return_value = {
- 'LambdaFunctionConfigurations': [api_config]
- }
- bucket = AmazonBucket(client, 'test-bucket')
- current = bucket.current_config('test-id')
- assert current.raw == api_config
- assert client.get_bucket_notification_configuration.call_count == 1
-
- def test_current_config_empty(self):
- client = MagicMock()
- client.get_bucket_notification_configuration.return_value = {
- 'LambdaFunctionConfigurations': []
- }
- bucket = AmazonBucket(client, 'test-bucket')
- current = bucket.current_config('test-id')
- assert current is None
- assert client.get_bucket_notification_configuration.call_count == 1
-
- def test_apply_invalid_config(self):
- client = MagicMock()
- client.get_bucket_notification_configuration.return_value = {
- 'LambdaFunctionConfigurations': []
- }
- client.put_bucket_notification_configuration.side_effect = ClientError({}, '')
- bucket = AmazonBucket(client, 'test-bucket')
- config = Config.from_params(**{
- 'event_name': 'test_event',
- 'lambda_function_arn': 'lambda_arn',
- 'lambda_version': 1,
- 'events': ['s3:ObjectRemoved:*', 's3:ObjectCreated:*'],
- 'prefix': '',
- 'suffix': ''
- })
- with pytest.raises(ClientError):
- bucket.apply_config(config)
-
- def test_apply_config(self):
- client = MagicMock()
- client.get_bucket_notification_configuration.return_value = {
- 'LambdaFunctionConfigurations': []
- }
-
- bucket = AmazonBucket(client, 'test-bucket')
- config = Config.from_params(**{
- 'event_name': 'test_event',
- 'lambda_function_arn': 'lambda_arn',
- 'lambda_version': 1,
- 'events': ['s3:ObjectRemoved:*', 's3:ObjectCreated:*'],
- 'prefix': '',
- 'suffix': ''
- })
- bucket.apply_config(config)
- assert client.get_bucket_notification_configuration.call_count == 1
- assert client.put_bucket_notification_configuration.call_count == 1
-
- def test_apply_config_add_event(self):
- api_config = {
- 'Id': 'test-id',
- 'LambdaFunctionArn': 'test-arn',
- 'Events': ['s3:ObjectRemoved:*'],
- 'Filter': {
- 'Key': {
- 'FilterRules': [{
- 'Name': 'Prefix',
- 'Value': ''
- }, {
- 'Name': 'Suffix',
- 'Value': ''
- }]
- }
- }
- }
- client = MagicMock()
- client.get_bucket_notification_configuration.return_value = {
- 'LambdaFunctionConfigurations': [api_config]
- }
-
- bucket = AmazonBucket(client, 'test-bucket')
- config = Config.from_params(**{
- 'event_name': 'test-id',
- 'lambda_function_arn': 'test-arn',
- 'lambda_version': 1,
- 'events': ['s3:ObjectRemoved:*', 's3:ObjectCreated:*'],
- 'prefix': '',
- 'suffix': ''
- })
- bucket.apply_config(config)
- assert client.get_bucket_notification_configuration.call_count == 1
- assert client.put_bucket_notification_configuration.call_count == 1
- client.put_bucket_notification_configuration.assert_called_with(
- Bucket='test-bucket',
- NotificationConfiguration={
- 'LambdaFunctionConfigurations': [{
- 'Id': 'test-id',
- 'LambdaFunctionArn': 'test-arn:1',
- 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
- 'Filter': {
- 'Key': {
- 'FilterRules': [{
- 'Name': 'Prefix',
- 'Value': ''
- }, {
- 'Name': 'Suffix',
- 'Value': ''
- }]
- }
- }
- }]
- }
- )
-
- def test_delete_config(self):
- api_config = {
- 'Id': 'test-id',
- 'LambdaFunctionArn': 'test-arn',
- 'Events': [],
- 'Filter': {
- 'Key': {
- 'FilterRules': [{
- 'Name': 'Prefix',
- 'Value': ''
- }, {
- 'Name': 'Suffix',
- 'Value': ''
- }]
- }
- }
- }
- client = MagicMock()
- client.get_bucket_notification_configuration.return_value = {
- 'LambdaFunctionConfigurations': [api_config]
- }
- bucket = AmazonBucket(client, 'test-bucket')
- config = Config.from_params(**{
- 'event_name': 'test-id',
- 'lambda_function_arn': 'lambda_arn',
- 'lambda_version': 1,
- 'events': [],
- 'prefix': '',
- 'suffix': ''
- })
- bucket.delete_config(config)
- assert client.get_bucket_notification_configuration.call_count == 1
- assert client.put_bucket_notification_configuration.call_count == 1
- client.put_bucket_notification_configuration.assert_called_with(
- Bucket='test-bucket',
- NotificationConfiguration={'LambdaFunctionConfigurations': []}
- )
-
-
-class TestConfig:
- def test_config_from_params(self):
- config = Config({
- 'Id': 'test-id',
- 'LambdaFunctionArn': 'test-arn:10',
- 'Events': [],
- 'Filter': {
- 'Key': {
- 'FilterRules': [{
- 'Name': 'Prefix',
- 'Value': ''
- }, {
- 'Name': 'Suffix',
- 'Value': ''
- }]
- }
- }
- })
- config_from_params = Config.from_params(**{
- 'event_name': 'test-id',
- 'lambda_function_arn': 'test-arn',
- 'lambda_version': 10,
- 'events': [],
- 'prefix': '',
- 'suffix': ''
- })
- assert config.raw == config_from_params.raw
- assert config == config_from_params
-
-
-class TestModule(ModuleTestCase):
- def test_module_fail_when_required_args_missing(self):
- with pytest.raises(AnsibleFailJson):
- set_module_args({})
- s3_bucket_notification.main()
-
- @patch('ansible.modules.cloud.amazon.s3_bucket_notification.AnsibleAWSModule.client')
- def test_add_s3_bucket_notification(self, aws_client):
- aws_client.return_value.get_bucket_notification_configuration.return_value = {
- 'LambdaFunctionConfigurations': []
- }
- set_module_args({
- 'region': 'us-east-2',
- 'lambda_function_arn': 'test-lambda-arn',
- 'bucket_name': 'test-lambda',
- 'event_name': 'test-id',
- 'events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
- 'state': 'present',
- 'prefix': '/images',
- 'suffix': '.jpg'
- })
- with pytest.raises(AnsibleExitJson) as context:
- s3_bucket_notification.main()
- result = context.value.args[0]
- assert result['changed'] is True
- assert aws_client.return_value.get_bucket_notification_configuration.call_count == 1
- aws_client.return_value.put_bucket_notification_configuration.assert_called_with(
- Bucket='test-lambda',
- NotificationConfiguration={
- 'LambdaFunctionConfigurations': [{
- 'Id': 'test-id',
- 'LambdaFunctionArn': 'test-lambda-arn',
- 'Events': ['s3:ObjectCreated:*', 's3:ObjectRemoved:*'],
- 'Filter': {
- 'Key': {
- 'FilterRules': [{
- 'Name': 'Prefix',
- 'Value': '/images'
- }, {
- 'Name': 'Suffix',
- 'Value': '.jpg'
- }]
- }
- }
- }]
- })
diff --git a/test/units/plugins/connection/test_aws_ssm.py b/test/units/plugins/connection/test_aws_ssm.py
deleted file mode 100644
index bcea207e78..0000000000
--- a/test/units/plugins/connection/test_aws_ssm.py
+++ /dev/null
@@ -1,194 +0,0 @@
-# Make coding more python3-ish
-from __future__ import (absolute_import, division, print_function)
-__metaclass__ = type
-
-from io import StringIO
-import pytest
-import sys
-from ansible import constants as C
-from ansible.compat.selectors import SelectorKey, EVENT_READ
-from units.compat import unittest
-from units.compat.mock import patch, MagicMock, PropertyMock
-from ansible.errors import AnsibleError, AnsibleConnectionFailure, AnsibleFileNotFound
-from ansible.module_utils.six.moves import shlex_quote
-from ansible.module_utils._text import to_bytes
-from ansible.playbook.play_context import PlayContext
-from ansible.plugins.connection import aws_ssm
-from ansible.plugins.loader import connection_loader
-
-
-@pytest.mark.skipif(sys.version_info < (2, 7), reason="requires Python 2.7 or higher")
-class TestConnectionBaseClass(unittest.TestCase):
-
- @patch('os.path.exists')
- @patch('subprocess.Popen')
- @patch('select.poll')
- @patch('boto3.client')
- def test_plugins_connection_aws_ssm_start_session(self, boto_client, s_poll, s_popen, mock_ospe):
- pc = PlayContext()
- new_stdin = StringIO()
- conn = connection_loader.get('aws_ssm', pc, new_stdin)
- conn.get_option = MagicMock()
- conn.get_option.side_effect = ['i1234', 'executable', 'abcd', 'i1234']
- conn.host = 'abc'
- mock_ospe.return_value = True
- boto3 = MagicMock()
- boto3.client('ssm').return_value = MagicMock()
- conn.start_session = MagicMock()
- conn._session_id = MagicMock()
- conn._session_id.return_value = 's1'
- s_popen.return_value.stdin.write = MagicMock()
- s_poll.return_value = MagicMock()
- s_poll.return_value.register = MagicMock()
- s_popen.return_value.poll = MagicMock()
- s_popen.return_value.poll.return_value = None
- conn._stdin_readline = MagicMock()
- conn._stdin_readline.return_value = 'abc123'
- conn.SESSION_START = 'abc'
- conn.start_session()
-
- @patch('random.choice')
- def test_plugins_connection_aws_ssm_exec_command(self, r_choice):
- pc = PlayContext()
- new_stdin = StringIO()
- conn = connection_loader.get('aws_ssm', pc, new_stdin)
- r_choice.side_effect = ['a', 'a', 'a', 'a', 'a', 'b', 'b', 'b', 'b', 'b']
- conn.MARK_LENGTH = 5
- conn._session = MagicMock()
- conn._session.stdin.write = MagicMock()
- conn._wrap_command = MagicMock()
- conn._wrap_command.return_value = 'cmd1'
- conn._flush_stderr = MagicMock()
- conn._windows = MagicMock()
- conn._windows.return_value = True
- sudoable = True
- conn._session.poll = MagicMock()
- conn._session.poll.return_value = None
- remaining = 0
- conn._timeout = MagicMock()
- conn._poll_stdout = MagicMock()
- conn._poll_stdout.poll = MagicMock()
- conn._poll_stdout.poll.return_value = True
- conn._session.stdout = MagicMock()
- conn._session.stdout.readline = MagicMock()
- begin = True
- mark_end = 'a'
- line = ['a', 'b']
- conn._post_process = MagicMock()
- conn._post_process.return_value = 'test'
- conn._session.stdout.readline.side_effect = iter(['aaaaa\n', 'Hi\n', '0\n', 'bbbbb\n'])
- conn.get_option = MagicMock()
- conn.get_option.return_value = 1
- cmd = MagicMock()
- returncode = 'a'
- stdout = 'b'
- return (returncode, stdout, conn._flush_stderr)
-
- def test_plugins_connection_aws_ssm_prepare_terminal(self):
- pc = PlayContext()
- new_stdin = StringIO()
- conn = connection_loader.get('aws_ssm', pc, new_stdin)
- conn.is_windows = MagicMock()
- conn.is_windows.return_value = True
-
- def test_plugins_connection_aws_ssm_wrap_command(self):
- pc = PlayContext()
- new_stdin = StringIO()
- conn = connection_loader.get('aws_ssm', pc, new_stdin)
- conn.is_windows = MagicMock()
- conn.is_windows.return_value = True
- return('windows1')
-
- def test_plugins_connection_aws_ssm_post_process(self):
- pc = PlayContext()
- new_stdin = StringIO()
- conn = connection_loader.get('aws_ssm', pc, new_stdin)
- conn.is_windows = MagicMock()
- conn.is_windows.return_value = True
- success = 3
- fail = 2
- conn.stdout = MagicMock()
- returncode = 0
- return(returncode, conn.stdout)
-
- @patch('subprocess.Popen')
- def test_plugins_connection_aws_ssm_flush_stderr(self, s_popen):
- pc = PlayContext()
- new_stdin = StringIO()
- conn = connection_loader.get('aws_ssm', pc, new_stdin)
- conn.poll_stderr = MagicMock()
- conn.poll_stderr.register = MagicMock()
- conn.stderr = None
- s_popen.poll().return_value = 123
- return(conn.stderr)
-
- @patch('boto3.client')
- def test_plugins_connection_aws_ssm_get_url(self, boto):
- pc = PlayContext()
- new_stdin = StringIO()
- conn = connection_loader.get('aws_ssm', pc, new_stdin)
- boto3 = MagicMock()
- boto3.client('s3').return_value = MagicMock()
- boto3.generate_presigned_url.return_value = MagicMock()
- return (boto3.generate_presigned_url.return_value)
-
- @patch('os.path.exists')
- def test_plugins_connection_aws_ssm_put_file(self, mock_ospe):
- pc = PlayContext()
- new_stdin = StringIO()
- conn = connection_loader.get('aws_ssm', pc, new_stdin)
- conn._connect = MagicMock()
- conn._file_transport_command = MagicMock()
- conn._file_transport_command.return_value = (0, 'stdout', 'stderr')
- res, stdout, stderr = conn.put_file('/in/file', '/out/file')
-
- def test_plugins_connection_aws_ssm_fetch_file(self):
- pc = PlayContext()
- new_stdin = StringIO()
- conn = connection_loader.get('aws_ssm', pc, new_stdin)
- conn._connect = MagicMock()
- conn._file_transport_command = MagicMock()
- conn._file_transport_command.return_value = (0, 'stdout', 'stderr')
- res, stdout, stderr = conn.fetch_file('/in/file', '/out/file')
-
- @patch('subprocess.check_output')
- @patch('boto3.client')
- def test_plugins_connection_file_transport_command(self, boto_client, s_check_output):
- pc = PlayContext()
- new_stdin = StringIO()
- conn = connection_loader.get('aws_ssm', pc, new_stdin)
- conn.get_option = MagicMock()
- conn.get_option.side_effect = ['1', '2', '3', '4', '5']
- conn._get_url = MagicMock()
- conn._get_url.side_effect = ['url1', 'url2']
- boto3 = MagicMock()
- boto3.client('s3').return_value = MagicMock()
- conn.get_option.return_value = 1
- ssm_action = 'get'
- get_command = MagicMock()
- put_command = MagicMock()
- conn.exec_command = MagicMock()
- conn.exec_command.return_value = (put_command, None, False)
- conn.download_fileobj = MagicMock()
- (returncode, stdout, stderr) = conn.exec_command(put_command, in_data=None, sudoable=False)
- returncode = 0
- (returncode, stdout, stderr) = conn.exec_command(get_command, in_data=None, sudoable=False)
-
- @patch('subprocess.check_output')
- def test_plugins_connection_aws_ssm_close(self, s_check_output):
- pc = PlayContext()
- new_stdin = StringIO()
- conn = connection_loader.get('aws_ssm', pc, new_stdin)
- conn.instance_id = "i-12345"
- conn._session_id = True
- conn.get_option = MagicMock()
- conn.get_option.side_effect = ["/abc", "pqr"]
- conn._session = MagicMock()
- conn._session.terminate = MagicMock()
- conn._session.communicate = MagicMock()
- conn._terminate_session = MagicMock()
- conn._terminate_session.return_value = ''
- conn._session_id = MagicMock()
- conn._session_id.return_value = 'a'
- conn._client = MagicMock()
- conn.close()