# This file is part of cloud-init. See LICENSE file for license information. import os import shutil import tempfile from copy import copy from unittest import mock import yaml from cloudinit import url_helper from cloudinit.sources import DataSourceMAAS from tests.unittests.helpers import CiTestCase, populate_dir class TestMAASDataSource(CiTestCase): def setUp(self): super(TestMAASDataSource, self).setUp() # Make a temp directoy for tests to use. self.tmp = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, self.tmp) def test_seed_dir_valid(self): """Verify a valid seeddir is read as such.""" userdata = b"valid01-userdata" data = { "meta-data/instance-id": "i-valid01", "meta-data/local-hostname": "valid01-hostname", "user-data": userdata, "public-keys": "ssh-rsa AAAAB3Nz...aC1yc2E= keyname", } my_d = os.path.join(self.tmp, "valid") populate_dir(my_d, data) ud, md, vd = DataSourceMAAS.read_maas_seed_dir(my_d) self.assertEqual(userdata, ud) for key in ("instance-id", "local-hostname"): self.assertEqual(data["meta-data/" + key], md[key]) # verify that 'userdata' is not returned as part of the metadata self.assertFalse(("user-data" in md)) self.assertIsNone(vd) def test_seed_dir_valid_extra(self): """Verify extra files do not affect seed_dir validity.""" userdata = b"valid-extra-userdata" data = { "meta-data/instance-id": "i-valid-extra", "meta-data/local-hostname": "valid-extra-hostname", "user-data": userdata, "foo": "bar", } my_d = os.path.join(self.tmp, "valid_extra") populate_dir(my_d, data) ud, md, _vd = DataSourceMAAS.read_maas_seed_dir(my_d) self.assertEqual(userdata, ud) for key in ("instance-id", "local-hostname"): self.assertEqual(data["meta-data/" + key], md[key]) # additional files should not just appear as keys in metadata atm self.assertFalse(("foo" in md)) def test_seed_dir_invalid(self): """Verify that invalid seed_dir raises MAASSeedDirMalformed.""" valid = { "instance-id": "i-instanceid", "local-hostname": "test-hostname", "user-data": "", } my_based = os.path.join(self.tmp, "valid_extra") # missing 'userdata' file my_d = "%s-01" % my_based invalid_data = copy(valid) del invalid_data["local-hostname"] populate_dir(my_d, invalid_data) self.assertRaises( DataSourceMAAS.MAASSeedDirMalformed, DataSourceMAAS.read_maas_seed_dir, my_d, ) # missing 'instance-id' my_d = "%s-02" % my_based invalid_data = copy(valid) del invalid_data["instance-id"] populate_dir(my_d, invalid_data) self.assertRaises( DataSourceMAAS.MAASSeedDirMalformed, DataSourceMAAS.read_maas_seed_dir, my_d, ) def test_seed_dir_none(self): """Verify that empty seed_dir raises MAASSeedDirNone.""" my_d = os.path.join(self.tmp, "valid_empty") self.assertRaises( DataSourceMAAS.MAASSeedDirNone, DataSourceMAAS.read_maas_seed_dir, my_d, ) def test_seed_dir_missing(self): """Verify that missing seed_dir raises MAASSeedDirNone.""" self.assertRaises( DataSourceMAAS.MAASSeedDirNone, DataSourceMAAS.read_maas_seed_dir, os.path.join(self.tmp, "nonexistantdirectory"), ) def mock_read_maas_seed_url(self, data, seed, version="19991231"): """mock up readurl to appear as a web server at seed has provided data. return what read_maas_seed_url returns.""" def my_readurl(*args, **kwargs): if len(args): url = args[0] else: url = kwargs["url"] prefix = "%s/%s/" % (seed, version) if not url.startswith(prefix): raise ValueError("unexpected call %s" % url) short = url[len(prefix) :] if short not in data: raise url_helper.UrlError("not found", code=404, url=url) return url_helper.StringResponse(data[short]) # Now do the actual call of the code under test. with mock.patch("cloudinit.url_helper.readurl") as mock_readurl: mock_readurl.side_effect = my_readurl return DataSourceMAAS.read_maas_seed_url(seed, version=version) def test_seed_url_valid(self): """Verify that valid seed_url is read as such.""" valid = { "meta-data/instance-id": "i-instanceid", "meta-data/local-hostname": "test-hostname", "meta-data/public-keys": "test-hostname", "meta-data/vendor-data": b"my-vendordata", "user-data": b"foodata", } my_seed = "http://example.com/xmeta" my_ver = "1999-99-99" ud, md, vd = self.mock_read_maas_seed_url(valid, my_seed, my_ver) self.assertEqual(valid["meta-data/instance-id"], md["instance-id"]) self.assertEqual( valid["meta-data/local-hostname"], md["local-hostname"] ) self.assertEqual(valid["meta-data/public-keys"], md["public-keys"]) self.assertEqual(valid["user-data"], ud) # vendor-data is yaml, which decodes a string self.assertEqual(valid["meta-data/vendor-data"].decode(), vd) def test_seed_url_vendor_data_dict(self): expected_vd = {"key1": "value1"} valid = { "meta-data/instance-id": "i-instanceid", "meta-data/local-hostname": "test-hostname", "meta-data/vendor-data": yaml.safe_dump(expected_vd).encode(), } _ud, md, vd = self.mock_read_maas_seed_url( valid, "http://example.com/foo" ) self.assertEqual(valid["meta-data/instance-id"], md["instance-id"]) self.assertEqual(expected_vd, vd) @mock.patch("cloudinit.sources.DataSourceMAAS.url_helper.OauthUrlHelper") class TestGetOauthHelper(CiTestCase): base_cfg = { "consumer_key": "FAKE_CONSUMER_KEY", "token_key": "FAKE_TOKEN_KEY", "token_secret": "FAKE_TOKEN_SECRET", "consumer_secret": None, } def test_all_required(self, m_helper): """Valid config as expected.""" DataSourceMAAS.get_oauth_helper(self.base_cfg.copy()) m_helper.assert_has_calls([mock.call(**self.base_cfg)]) def test_other_fields_not_passed_through(self, m_helper): """Only relevant fields are passed through.""" mycfg = self.base_cfg.copy() mycfg["unrelated_field"] = "unrelated" DataSourceMAAS.get_oauth_helper(mycfg) m_helper.assert_has_calls([mock.call(**self.base_cfg)]) class TestGetIdHash(CiTestCase): v1_cfg = { "consumer_key": "CKEY", "token_key": "TKEY", "token_secret": "TSEC", } v1_id = ( "v1:403ee5f19c956507f1d0e50814119c405902137ea4f8838bde167c5da8110392" ) def test_v1_expected(self): """Test v1 id generated as expected working behavior from config.""" result = DataSourceMAAS.get_id_from_ds_cfg(self.v1_cfg.copy()) self.assertEqual(self.v1_id, result) def test_v1_extra_fields_are_ignored(self): """Test v1 id ignores unused entries in config.""" cfg = self.v1_cfg.copy() cfg["consumer_secret"] = "BOO" cfg["unrelated"] = "HI MOM" result = DataSourceMAAS.get_id_from_ds_cfg(cfg) self.assertEqual(self.v1_id, result) # vi: ts=4 expandtab