summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--swiftclient/client.py29
-rw-r--r--swiftclient/command_helpers.py8
-rw-r--r--swiftclient/service.py20
-rw-r--r--tests/unit/test_command_helpers.py26
-rw-r--r--tests/unit/test_shell.py372
-rw-r--r--tests/unit/test_swiftclient.py47
-rw-r--r--tests/unit/utils.py54
7 files changed, 447 insertions, 109 deletions
diff --git a/swiftclient/client.py b/swiftclient/client.py
index 7ce7218..3bc9c75 100644
--- a/swiftclient/client.py
+++ b/swiftclient/client.py
@@ -285,29 +285,36 @@ def get_keystoneclient_2_0(auth_url, user, key, os_options, **kwargs):
return get_auth_keystone(auth_url, user, key, os_options, **kwargs)
-def get_auth_keystone(auth_url, user, key, os_options, **kwargs):
- """
- Authenticate against a keystone server.
-
- We are using the keystoneclient library for authentication.
- """
-
- insecure = kwargs.get('insecure', False)
- auth_version = kwargs.get('auth_version', '2.0')
- debug = logger.isEnabledFor(logging.DEBUG) and True or False
-
+def _import_keystone_client(auth_version):
+ # the attempted imports are encapsulated in this function to allow
+ # mocking for tests
try:
if auth_version in AUTH_VERSIONS_V3:
from keystoneclient.v3 import client as ksclient
else:
from keystoneclient.v2_0 import client as ksclient
from keystoneclient import exceptions
+ return ksclient, exceptions
except ImportError:
sys.exit('''
Auth versions 2.0 and 3 require python-keystoneclient, install it or use Auth
version 1.0 which requires ST_AUTH, ST_USER, and ST_KEY environment
variables to be set or overridden with -A, -U, or -K.''')
+
+def get_auth_keystone(auth_url, user, key, os_options, **kwargs):
+ """
+ Authenticate against a keystone server.
+
+ We are using the keystoneclient library for authentication.
+ """
+
+ insecure = kwargs.get('insecure', False)
+ auth_version = kwargs.get('auth_version', '2.0')
+ debug = logger.isEnabledFor(logging.DEBUG) and True or False
+
+ ksclient, exceptions = _import_keystone_client(auth_version)
+
try:
_ksclient = ksclient.Client(
username=user,
diff --git a/swiftclient/command_helpers.py b/swiftclient/command_helpers.py
index 9a78b9b..0822699 100644
--- a/swiftclient/command_helpers.py
+++ b/swiftclient/command_helpers.py
@@ -45,6 +45,14 @@ def stat_account(conn, options):
policies.add(policy_name)
for policy in policies:
+ container_count_header = (POLICY_HEADER_PREFIX + policy +
+ '-container-count')
+ if container_count_header in headers:
+ items.append(
+ ('Containers in policy "' + policy + '"',
+ prt_bytes(headers[container_count_header],
+ options['human']).lstrip())
+ )
items.extend((
('Objects in policy "' + policy + '"',
prt_bytes(
diff --git a/swiftclient/service.py b/swiftclient/service.py
index 467bda9..471e663 100644
--- a/swiftclient/service.py
+++ b/swiftclient/service.py
@@ -77,8 +77,10 @@ class SwiftError(Exception):
def process_options(options):
- if not (options['auth'] and options['user'] and options['key']):
- # Use 2.0 auth if none of the old args are present
+ if (not (options.get('auth') and options.get('user')
+ and options.get('key'))
+ and options.get('auth_version') != '3'):
+ # Use keystone 2.0 auth if any of the old-style args are missing
options['auth_version'] = '2.0'
# Use new-style args if old ones not present
@@ -91,8 +93,15 @@ def process_options(options):
# Specific OpenStack options
options['os_options'] = {
+ 'user_id': options['os_user_id'],
+ 'user_domain_id': options['os_user_domain_id'],
+ 'user_domain_name': options['os_user_domain_name'],
'tenant_id': options['os_tenant_id'],
'tenant_name': options['os_tenant_name'],
+ 'project_id': options['os_project_id'],
+ 'project_name': options['os_project_name'],
+ 'project_domain_id': options['os_project_domain_id'],
+ 'project_domain_name': options['os_project_domain_name'],
'service_type': options['os_service_type'],
'endpoint_type': options['os_endpoint_type'],
'auth_token': options['os_auth_token'],
@@ -111,9 +120,16 @@ _default_global_options = {
"key": environ.get('ST_KEY'),
"retries": 5,
"os_username": environ.get('OS_USERNAME'),
+ "os_user_id": environ.get('OS_USER_ID'),
+ "os_user_domain_name": environ.get('OS_USER_DOMAIN_NAME'),
+ "os_user_domain_id": environ.get('OS_USER_DOMAIN_ID'),
"os_password": environ.get('OS_PASSWORD'),
"os_tenant_id": environ.get('OS_TENANT_ID'),
"os_tenant_name": environ.get('OS_TENANT_NAME'),
+ "os_project_name": environ.get('OS_PROJECT_NAME'),
+ "os_project_id": environ.get('OS_PROJECT_ID'),
+ "os_project_domain_name": environ.get('OS_PROJECT_DOMAIN_NAME'),
+ "os_project_domain_id": environ.get('OS_PROJECT_DOMAIN_ID'),
"os_auth_url": environ.get('OS_AUTH_URL'),
"os_auth_token": environ.get('OS_AUTH_TOKEN'),
"os_storage_url": environ.get('OS_STORAGE_URL'),
diff --git a/tests/unit/test_command_helpers.py b/tests/unit/test_command_helpers.py
index ad8012e..67e9ac2 100644
--- a/tests/unit/test_command_helpers.py
+++ b/tests/unit/test_command_helpers.py
@@ -124,6 +124,32 @@ Objects in policy "nada": 1000000
"""
self.assertOut(expected)
+ def test_stat_account_policy_stat_with_container_counts(self):
+ # stub head_account
+ stub_headers = {
+ 'x-account-container-count': 42,
+ 'x-account-object-count': 1000000,
+ 'x-account-bytes-used': 2 ** 30,
+ 'x-account-storage-policy-nada-container-count': 10,
+ 'x-account-storage-policy-nada-object-count': 1000000,
+ 'x-account-storage-policy-nada-bytes-used': 2 ** 30,
+ }
+ self.conn.head_account.return_value = stub_headers
+
+ with self.output_manager as output_manager:
+ items, headers = h.stat_account(self.conn, self.options)
+ h.print_account_stats(items, headers, output_manager)
+ expected = """
+ Account: a
+ Containers: 42
+ Objects: 1000000
+ Bytes: 1073741824
+Containers in policy "nada": 10
+ Objects in policy "nada": 1000000
+ Bytes in policy "nada": 1073741824
+"""
+ self.assertOut(expected)
+
def test_stat_container_human(self):
self.options['human'] = True
# stub head container request
diff --git a/tests/unit/test_shell.py b/tests/unit/test_shell.py
index 80d63d9..78da0dd 100644
--- a/tests/unit/test_shell.py
+++ b/tests/unit/test_shell.py
@@ -21,10 +21,12 @@ import unittest
import six
import swiftclient
+from swiftclient.service import SwiftError
import swiftclient.shell
import swiftclient.utils
from os.path import basename, dirname
+from tests.unit.test_swiftclient import MockHttpTest
if six.PY2:
BUILTIN_OPEN = '__builtin__.open'
@@ -38,6 +40,40 @@ mocked_os_environ = {
}
+def _make_args(cmd, opts, os_opts, separator='-', flags=None, cmd_args=None):
+ """
+ Construct command line arguments for given options.
+ """
+ args = [""]
+ flags = flags or []
+ for k, v in opts.items():
+ arg = "--" + k.replace("_", "-")
+ args = args + [arg, v]
+ for k, v in os_opts.items():
+ arg = "--os" + separator + k.replace("_", separator)
+ args = args + [arg, v]
+ for flag in flags:
+ args.append('--%s' % flag)
+ args = args + [cmd]
+ if cmd_args:
+ args = args + cmd_args
+ return args
+
+
+def _make_env(opts, os_opts):
+ """
+ Construct a dict of environment variables for given options.
+ """
+ env = {}
+ for k, v in opts.items():
+ key = 'ST_' + k.upper().replace('-', '_')
+ env[key] = v
+ for k, v in os_opts.items():
+ key = 'OS_' + k.upper().replace('-', '_')
+ env[key] = v
+ return env
+
+
@mock.patch.dict(os.environ, mocked_os_environ)
class TestShell(unittest.TestCase):
def __init__(self, *args, **kwargs):
@@ -399,47 +435,21 @@ class TestParsing(unittest.TestCase):
def setUp(self):
super(TestParsing, self).setUp()
- self._orig_environ = os.environ.copy()
+ self._environ_vars = {}
keys = os.environ.keys()
for k in keys:
- if k in ('ST_KEY', 'ST_USER', 'ST_AUTH'):
- del os.environ[k]
+ if (k in ('ST_KEY', 'ST_USER', 'ST_AUTH')
+ or k.startswith('OS_')):
+ self._environ_vars[k] = os.environ.pop(k)
def tearDown(self):
- os.environ = self._orig_environ
+ os.environ.update(self._environ_vars)
def _make_fake_command(self, result):
def fake_command(parser, args, thread_manager):
result[0], result[1] = swiftclient.shell.parse_args(parser, args)
return fake_command
- def _make_args(self, cmd, opts, os_opts, separator='-'):
- """
- Construct command line arguments for given options.
- """
- args = [""]
- for k, v in opts.items():
- arg = "--" + k.replace("_", "-")
- args = args + [arg, v]
- for k, v in os_opts.items():
- arg = "--os" + separator + k.replace("_", separator)
- args = args + [arg, v]
- args = args + [cmd]
- return args
-
- def _make_env(self, opts, os_opts):
- """
- Construct a dict of environment variables for given options.
- """
- env = {}
- for k, v in opts.items():
- key = 'ST_' + k.upper()
- env[key] = v
- for k, v in os_opts.items():
- key = 'OS_' + k.upper()
- env[key] = v
- return env
-
def _verify_opts(self, actual_opts, opts, os_opts={}, os_opts_dict={}):
"""
Check parsed options are correct.
@@ -502,7 +512,7 @@ class TestParsing(unittest.TestCase):
# username with domain is sufficient in args because keystone will
# assume user is in default domain
- args = self._make_args("stat", opts, os_opts, '-')
+ args = _make_args("stat", opts, os_opts, '-')
result = [None, None]
fake_command = self._make_fake_command(result)
with mock.patch('swiftclient.shell.st_stat', fake_command):
@@ -516,7 +526,7 @@ class TestParsing(unittest.TestCase):
all_os_opts = os_opts.copy()
all_os_opts.update(os_opts_dict)
- args = self._make_args("stat", opts, all_os_opts, '-')
+ args = _make_args("stat", opts, all_os_opts, '-')
result = [None, None]
fake_command = self._make_fake_command(result)
with mock.patch('swiftclient.shell.st_stat', fake_command):
@@ -528,7 +538,7 @@ class TestParsing(unittest.TestCase):
os_opts_dict = {"storage_url": "http://example.com:8080/v1",
"auth_token": "0123abcd"}
- args = self._make_args("stat", opts, os_opts_dict, '-')
+ args = _make_args("stat", opts, os_opts_dict, '-')
result = [None, None]
fake_command = self._make_fake_command(result)
with mock.patch('swiftclient.shell.st_stat', fake_command):
@@ -558,7 +568,7 @@ class TestParsing(unittest.TestCase):
all_os_opts.update(os_opts_dict)
# check using hyphen separator
- args = self._make_args("stat", opts, all_os_opts, '-')
+ args = _make_args("stat", opts, all_os_opts, '-')
result = [None, None]
fake_command = self._make_fake_command(result)
with mock.patch('swiftclient.shell.st_stat', fake_command):
@@ -566,7 +576,7 @@ class TestParsing(unittest.TestCase):
self._verify_opts(result[0], opts, os_opts, os_opts_dict)
# check using underscore separator
- args = self._make_args("stat", opts, all_os_opts, '_')
+ args = _make_args("stat", opts, all_os_opts, '_')
result = [None, None]
fake_command = self._make_fake_command(result)
with mock.patch('swiftclient.shell.st_stat', fake_command):
@@ -574,8 +584,8 @@ class TestParsing(unittest.TestCase):
self._verify_opts(result[0], opts, os_opts, os_opts_dict)
# check using environment variables
- args = self._make_args("stat", {}, {})
- env = self._make_env(opts, all_os_opts)
+ args = _make_args("stat", {}, {})
+ env = _make_env(opts, all_os_opts)
result = [None, None]
fake_command = self._make_fake_command(result)
with mock.patch.dict(os.environ, env):
@@ -584,7 +594,7 @@ class TestParsing(unittest.TestCase):
self._verify_opts(result[0], opts, os_opts, os_opts_dict)
# check again using OS_AUTH_VERSION instead of ST_AUTH_VERSION
- env = self._make_env({}, all_os_opts)
+ env = _make_env({}, all_os_opts)
env.update({'OS_AUTH_VERSION': '3'})
result = [None, None]
fake_command = self._make_fake_command(result)
@@ -600,7 +610,7 @@ class TestParsing(unittest.TestCase):
os_opts = {"password": "secret",
"username": "user",
"auth_url": "http://example.com:5000/v3"}
- args = self._make_args("stat", opts, os_opts)
+ args = _make_args("stat", opts, os_opts)
with mock.patch('swiftclient.shell.st_stat', fake_command):
swiftclient.shell.main(args)
self.assertEqual(['stat'], result[1])
@@ -613,37 +623,37 @@ class TestParsing(unittest.TestCase):
opts = {"auth_version": "3"}
os_opts = {"password": "secret",
"auth_url": "http://example.com:5000/v3"}
- args = self._make_args("stat", opts, os_opts)
+ args = _make_args("stat", opts, os_opts)
self.assertRaises(SystemExit, swiftclient.shell.main, args)
os_opts = {"username": "user",
"auth_url": "http://example.com:5000/v3"}
- args = self._make_args("stat", opts, os_opts)
+ args = _make_args("stat", opts, os_opts)
self.assertRaises(SystemExit, swiftclient.shell.main, args)
os_opts = {"username": "user",
"password": "secret"}
- args = self._make_args("stat", opts, os_opts)
+ args = _make_args("stat", opts, os_opts)
self.assertRaises(SystemExit, swiftclient.shell.main, args)
def test_insufficient_env_vars_v3(self):
- args = self._make_args("stat", {}, {})
+ args = _make_args("stat", {}, {})
opts = {"auth_version": "3"}
os_opts = {"password": "secret",
"auth_url": "http://example.com:5000/v3"}
- env = self._make_env(opts, os_opts)
+ env = _make_env(opts, os_opts)
with mock.patch.dict(os.environ, env):
self.assertRaises(SystemExit, swiftclient.shell.main, args)
os_opts = {"username": "user",
"auth_url": "http://example.com:5000/v3"}
- env = self._make_env(opts, os_opts)
+ env = _make_env(opts, os_opts)
with mock.patch.dict(os.environ, env):
self.assertRaises(SystemExit, swiftclient.shell.main, args)
os_opts = {"username": "user",
"password": "secret"}
- env = self._make_env(opts, os_opts)
+ env = _make_env(opts, os_opts)
with mock.patch.dict(os.environ, env):
self.assertRaises(SystemExit, swiftclient.shell.main, args)
@@ -651,7 +661,7 @@ class TestParsing(unittest.TestCase):
# --help returns condensed help message
opts = {"help": ""}
os_opts = {}
- args = self._make_args("stat", opts, os_opts)
+ args = _make_args("stat", opts, os_opts)
mock_stdout = six.StringIO()
with mock.patch('sys.stdout', mock_stdout):
self.assertRaises(SystemExit, swiftclient.shell.main, args)
@@ -665,7 +675,7 @@ class TestParsing(unittest.TestCase):
# "password": "secret",
# "username": "user",
# "auth_url": "http://example.com:5000/v3"}
- args = self._make_args("", opts, os_opts)
+ args = _make_args("", opts, os_opts)
mock_stdout = six.StringIO()
with mock.patch('sys.stdout', mock_stdout):
self.assertRaises(SystemExit, swiftclient.shell.main, args)
@@ -675,10 +685,272 @@ class TestParsing(unittest.TestCase):
## --os-help return os options help
opts = {}
- args = self._make_args("", opts, os_opts)
+ args = _make_args("", opts, os_opts)
mock_stdout = six.StringIO()
with mock.patch('sys.stdout', mock_stdout):
self.assertRaises(SystemExit, swiftclient.shell.main, args)
out = mock_stdout.getvalue()
self.assertTrue(out.find('[--key <api_key>]') > 0)
self.assertTrue(out.find('--os-username=<auth-user-name>') > 0)
+
+
+class FakeKeystone(object):
+ '''
+ Fake keystone client module. Returns given endpoint url and auth token.
+ '''
+ def __init__(self, endpoint, token):
+ self.calls = []
+ self.auth_version = None
+ self.endpoint = endpoint
+ self.token = token
+
+ class _Client():
+ def __init__(self, endpoint, token, **kwargs):
+ self.auth_token = token
+ self.endpoint = endpoint
+ self.service_catalog = self.ServiceCatalog(endpoint)
+
+ class ServiceCatalog(object):
+ def __init__(self, endpoint):
+ self.calls = []
+ self.endpoint_url = endpoint
+
+ def url_for(self, **kwargs):
+ self.calls.append(kwargs)
+ return self.endpoint_url
+
+ def Client(self, **kwargs):
+ self.calls.append(kwargs)
+ self.client = self._Client(endpoint=self.endpoint, token=self.token,
+ **kwargs)
+ return self.client
+
+ class Unauthorized(Exception):
+ pass
+
+ class AuthorizationFailure(Exception):
+ pass
+
+ class EndpointNotFound(Exception):
+ pass
+
+
+def _make_fake_import_keystone_client(fake_import):
+ def _fake_import_keystone_client(auth_version):
+ fake_import.auth_version = auth_version
+ return fake_import, fake_import
+
+ return _fake_import_keystone_client
+
+
+class TestKeystoneOptions(MockHttpTest):
+ """
+ Tests to check that options are passed from the command line or
+ environment variables through to the keystone client interface.
+ """
+ all_os_opts = {'password': 'secret',
+ 'username': 'user',
+ 'auth-url': 'http://example.com:5000/v3',
+ 'user-domain-name': 'userdomain',
+ 'user-id': 'userid',
+ 'user-domain-id': 'userdomainid',
+ 'tenant-name': 'tenantname',
+ 'tenant-id': 'tenantid',
+ 'project-name': 'projectname',
+ 'project-id': 'projectid',
+ 'project-domain-id': 'projectdomainid',
+ 'project-domain-name': 'projectdomain',
+ 'cacert': 'foo'}
+ catalog_opts = {'service-type': 'my-object-store',
+ 'endpoint-type': 'public',
+ 'region-name': 'my-region'}
+ flags = ['insecure', 'debug']
+
+ # options that are given default values in code if missing from CLI
+ defaults = {'auth-version': '2.0',
+ 'service-type': 'object-store',
+ 'endpoint-type': 'publicURL'}
+
+ def _build_os_opts(self, keys):
+ os_opts = {}
+ for k in keys:
+ os_opts[k] = self.all_os_opts.get(k, self.catalog_opts.get(k))
+ return os_opts
+
+ def _test_options_passed_to_keystone(self, cmd, opts, os_opts,
+ flags=None, use_env=False,
+ cmd_args=None, no_auth=False):
+ flags = flags or []
+ if use_env:
+ # set up fake environment variables and make a minimal command line
+ env = _make_env(opts, os_opts)
+ args = _make_args(cmd, {}, {}, separator='-', flags=flags,
+ cmd_args=cmd_args)
+ else:
+ # set up empty environment and make full command line
+ env = {}
+ args = _make_args(cmd, opts, os_opts, separator='-', flags=flags,
+ cmd_args=cmd_args)
+ ks_endpoint = 'http://example.com:8080/v1/AUTH_acc'
+ ks_token = 'fake_auth_token'
+ fake_ks = FakeKeystone(endpoint=ks_endpoint, token=ks_token)
+ # fake_conn will check that storage_url and auth_token are as expected
+ endpoint = os_opts.get('storage-url', ks_endpoint)
+ token = os_opts.get('auth-token', ks_token)
+ fake_conn = self.fake_http_connection(204, headers={},
+ storage_url=endpoint,
+ auth_token=token)
+
+ with mock.patch('swiftclient.client._import_keystone_client',
+ _make_fake_import_keystone_client(fake_ks)):
+ with mock.patch('swiftclient.client.http_connection', fake_conn):
+ with mock.patch.dict(os.environ, env, clear=True):
+ try:
+ swiftclient.shell.main(args)
+ except SystemExit as e:
+ self.fail('Unexpected SystemExit: %s' % e)
+ except SwiftError as err:
+ self.fail('Unexpected SwiftError: %s' % err)
+
+ if no_auth:
+ # check that keystone client was not used and terminate tests
+ self.assertIsNone(getattr(fake_ks, 'auth_version'))
+ self.assertEqual(len(fake_ks.calls), 0)
+ return
+
+ # check correct auth version was passed to _import_keystone_client
+ key = 'auth-version'
+ expected = opts.get(key, self.defaults.get(key))
+ self.assertEqual(expected, fake_ks.auth_version)
+
+ # check args passed to keystone Client __init__
+ self.assertEqual(len(fake_ks.calls), 1)
+ actual_args = fake_ks.calls[0]
+ for key in self.all_os_opts.keys():
+ expected = os_opts.get(key, self.defaults.get(key))
+ key = key.replace('-', '_')
+ self.assertTrue(key in actual_args,
+ 'Expected key %s not found in args %s'
+ % (key, actual_args))
+ self.assertEqual(expected, actual_args[key],
+ 'Expected %s for key %s, found %s'
+ % (expected, key, actual_args[key]))
+ for flag in flags:
+ self.assertTrue(flag in actual_args)
+ self.assertTrue(actual_args[flag])
+
+ # check args passed to ServiceCatalog.url_for() method
+ self.assertEqual(len(fake_ks.client.service_catalog.calls), 1)
+ actual_args = fake_ks.client.service_catalog.calls[0]
+ for key in self.catalog_opts.keys():
+ expected = os_opts.get(key, self.defaults.get(key))
+ key = key.replace('-', '_')
+ if key == 'region_name':
+ key = 'filter_value'
+ self.assertTrue(key in actual_args,
+ 'Expected key %s not found in args %s'
+ % (key, actual_args))
+ self.assertEqual(expected, actual_args[key],
+ 'Expected %s for key %s, found %s'
+ % (expected, key, actual_args[key]))
+ key, v = 'attr', 'region'
+ self.assertTrue(key in actual_args,
+ 'Expected key %s not found in args %s'
+ % (key, actual_args))
+ self.assertEqual(v, actual_args[key],
+ 'Expected %s for key %s, found %s'
+ % (v, key, actual_args[key]))
+
+ def _test_options(self, opts, os_opts, flags=None, no_auth=False):
+ # repeat test for different commands using env and command line options
+ for cmd in ('stat', 'post'):
+ self._test_options_passed_to_keystone(cmd, opts, os_opts,
+ flags=flags, no_auth=no_auth)
+ self._test_options_passed_to_keystone(cmd, opts, os_opts,
+ flags=flags, use_env=True,
+ no_auth=no_auth)
+
+ def test_all_args_passed_to_keystone(self):
+ # check that all possible command line args are passed to keystone
+ opts = {'auth-version': '3'}
+ os_opts = dict(self.all_os_opts)
+ os_opts.update(self.catalog_opts)
+ self._test_options(opts, os_opts, flags=self.flags)
+
+ opts = {'auth-version': '2.0'}
+ self._test_options(opts, os_opts, flags=self.flags)
+
+ opts = {}
+ self._test_options(opts, os_opts, flags=self.flags)
+
+ def test_catalog_options_and_flags_not_required_v3(self):
+ # check that all possible command line args are passed to keystone
+ opts = {'auth-version': '3'}
+ os_opts = dict(self.all_os_opts)
+ self._test_options(opts, os_opts, flags=None)
+
+ def test_ok_option_combinations_v3(self):
+ opts = {'auth-version': '3'}
+ keys = ('username', 'password', 'tenant-name', 'auth-url')
+ os_opts = self._build_os_opts(keys)
+ self._test_options(opts, os_opts)
+
+ keys = ('user-id', 'password', 'tenant-name', 'auth-url')
+ os_opts = self._build_os_opts(keys)
+ self._test_options(opts, os_opts)
+
+ keys = ('user-id', 'password', 'tenant-id', 'auth-url')
+ os_opts = self._build_os_opts(keys)
+ self._test_options(opts, os_opts)
+
+ keys = ('user-id', 'password', 'project-name', 'auth-url')
+ os_opts = self._build_os_opts(keys)
+ self._test_options(opts, os_opts)
+
+ keys = ('user-id', 'password', 'project-id', 'auth-url')
+ os_opts = self._build_os_opts(keys)
+ self._test_options(opts, os_opts)
+
+ def test_ok_option_combinations_v2(self):
+ opts = {'auth-version': '2.0'}
+ keys = ('username', 'password', 'tenant-name', 'auth-url')
+ os_opts = self._build_os_opts(keys)
+ self._test_options(opts, os_opts)
+
+ keys = ('username', 'password', 'tenant-id', 'auth-url')
+ os_opts = self._build_os_opts(keys)
+ self._test_options(opts, os_opts)
+
+ # allow auth_version to default to 2.0
+ opts = {}
+ keys = ('username', 'password', 'tenant-name', 'auth-url')
+ os_opts = self._build_os_opts(keys)
+ self._test_options(opts, os_opts)
+
+ keys = ('username', 'password', 'tenant-id', 'auth-url')
+ os_opts = self._build_os_opts(keys)
+ self._test_options(opts, os_opts)
+
+ def test_url_and_token_provided_on_command_line(self):
+ endpoint = 'http://alternate.com:8080/v1/AUTH_another'
+ token = 'alternate_auth_token'
+ os_opts = {'auth-token': token,
+ 'storage-url': endpoint}
+ opts = {'auth-version': '3'}
+ self._test_options(opts, os_opts, no_auth=True)
+
+ opts = {'auth-version': '2.0'}
+ self._test_options(opts, os_opts, no_auth=True)
+
+ def test_url_provided_on_command_line(self):
+ endpoint = 'http://alternate.com:8080/v1/AUTH_another'
+ os_opts = {'username': 'username',
+ 'password': 'password',
+ 'project-name': 'projectname',
+ 'auth-url': 'http://example.com:5000/v3',
+ 'storage-url': endpoint}
+ opts = {'auth-version': '3'}
+ self._test_options(opts, os_opts)
+
+ opts = {'auth-version': '2.0'}
+ self._test_options(opts, os_opts)
diff --git a/tests/unit/test_swiftclient.py b/tests/unit/test_swiftclient.py
index facd6d9..1a5c772 100644
--- a/tests/unit/test_swiftclient.py
+++ b/tests/unit/test_swiftclient.py
@@ -30,7 +30,7 @@ from six.moves.urllib.parse import urlparse
from six.moves import reload_module
# TODO: mock http connection class with more control over headers
-from .utils import fake_http_connect, fake_get_auth_keystone
+from .utils import MockHttpTest, fake_get_auth_keystone
from swiftclient import client as c
import swiftclient.utils
@@ -103,51 +103,6 @@ class TestJsonImport(testtools.TestCase):
self.assertEqual(loads, c.json_loads)
-class MockHttpTest(testtools.TestCase):
-
- def setUp(self):
- super(MockHttpTest, self).setUp()
-
- def fake_http_connection(*args, **kwargs):
- _orig_http_connection = c.http_connection
- return_read = kwargs.get('return_read')
- query_string = kwargs.get('query_string')
- storage_url = kwargs.get('storage_url')
-
- def wrapper(url, proxy=None, cacert=None, insecure=False,
- ssl_compression=True):
- if storage_url:
- self.assertEqual(storage_url, url)
-
- parsed, _conn = _orig_http_connection(url, proxy=proxy)
- conn = fake_http_connect(*args, **kwargs)()
-
- def request(method, url, *args, **kwargs):
- if query_string:
- self.assertTrue(url.endswith('?' + query_string))
- if url.endswith('invalid_cert') and not insecure:
- from swiftclient import client as c
- raise c.ClientException("invalid_certificate")
- return
- conn.request = request
-
- conn.has_been_read = False
- _orig_read = conn.read
-
- def read(*args, **kwargs):
- conn.has_been_read = True
- return _orig_read(*args, **kwargs)
- conn.read = return_read or read
-
- return parsed, conn
- return wrapper
- self.fake_http_connection = fake_http_connection
-
- def tearDown(self):
- super(MockHttpTest, self).tearDown()
- reload_module(c)
-
-
class MockHttpResponse():
def __init__(self, status=0):
self.status = status
diff --git a/tests/unit/utils.py b/tests/unit/utils.py
index cb671cf..09b31c1 100644
--- a/tests/unit/utils.py
+++ b/tests/unit/utils.py
@@ -14,6 +14,9 @@
# limitations under the License.
from requests import RequestException
from time import sleep
+import testtools
+from six.moves import reload_module
+from swiftclient import client as c
def fake_get_auth_keystone(os_options, exc=None, **kwargs):
@@ -156,3 +159,54 @@ def fake_http_connect(*code_iter, **kwargs):
return fake_conn
return connect
+
+
+class MockHttpTest(testtools.TestCase):
+
+ def setUp(self):
+ super(MockHttpTest, self).setUp()
+
+ def fake_http_connection(*args, **kwargs):
+ _orig_http_connection = c.http_connection
+ return_read = kwargs.get('return_read')
+ query_string = kwargs.get('query_string')
+ storage_url = kwargs.get('storage_url')
+ auth_token = kwargs.get('auth_token')
+
+ def wrapper(url, proxy=None, cacert=None, insecure=False,
+ ssl_compression=True):
+ if storage_url:
+ self.assertEqual(storage_url, url)
+
+ parsed, _conn = _orig_http_connection(url, proxy=proxy)
+ conn = fake_http_connect(*args, **kwargs)()
+
+ def request(method, url, *args, **kwargs):
+ if auth_token:
+ headers = args[1]
+ self.assertTrue('X-Auth-Token' in headers)
+ actual_token = headers.get('X-Auth-Token')
+ self.assertEqual(auth_token, actual_token)
+ if query_string:
+ self.assertTrue(url.endswith('?' + query_string))
+ if url.endswith('invalid_cert') and not insecure:
+ from swiftclient import client as c
+ raise c.ClientException("invalid_certificate")
+ return
+ conn.request = request
+
+ conn.has_been_read = False
+ _orig_read = conn.read
+
+ def read(*args, **kwargs):
+ conn.has_been_read = True
+ return _orig_read(*args, **kwargs)
+ conn.read = return_read or read
+
+ return parsed, conn
+ return wrapper
+ self.fake_http_connection = fake_http_connection
+
+ def tearDown(self):
+ super(MockHttpTest, self).tearDown()
+ reload_module(c)