summaryrefslogtreecommitdiff
path: root/openstackclient
diff options
context:
space:
mode:
Diffstat (limited to 'openstackclient')
-rw-r--r--openstackclient/api/auth.py104
-rw-r--r--openstackclient/common/clientmanager.py20
-rw-r--r--openstackclient/shell.py58
-rw-r--r--openstackclient/tests/common/test_clientmanager.py83
-rw-r--r--openstackclient/tests/test_shell.py427
5 files changed, 375 insertions, 317 deletions
diff --git a/openstackclient/api/auth.py b/openstackclient/api/auth.py
index ffbcb6fc..9fb26e71 100644
--- a/openstackclient/api/auth.py
+++ b/openstackclient/api/auth.py
@@ -77,25 +77,27 @@ def select_auth_plugin(options):
auth_plugin_name = None
- if options.os_auth_type in [plugin.name for plugin in get_plugin_list()]:
- # A direct plugin name was given, use it
- return options.os_auth_type
-
- if options.os_url and options.os_token:
+ # Do the token/url check first as this must override the default
+ # 'password' set by os-client-config
+ # Also, url and token are not copied into o-c-c's auth dict (yet?)
+ if options.auth.get('url', None) and options.auth.get('token', None):
# service token authentication
auth_plugin_name = 'token_endpoint'
- elif options.os_username:
- if options.os_identity_api_version == '3':
+ elif options.auth_type in [plugin.name for plugin in PLUGIN_LIST]:
+ # A direct plugin name was given, use it
+ auth_plugin_name = options.auth_type
+ elif options.auth.get('username', None):
+ if options.identity_api_version == '3':
auth_plugin_name = 'v3password'
- elif options.os_identity_api_version == '2.0':
+ elif options.identity_api_version.startswith('2'):
auth_plugin_name = 'v2password'
else:
# let keystoneclient figure it out itself
auth_plugin_name = 'osc_password'
- elif options.os_token:
- if options.os_identity_api_version == '3':
+ elif options.auth.get('token', None):
+ if options.identity_api_version == '3':
auth_plugin_name = 'v3token'
- elif options.os_identity_api_version == '2.0':
+ elif options.identity_api_version.startswith('2'):
auth_plugin_name = 'v2token'
else:
# let keystoneclient figure it out itself
@@ -109,35 +111,27 @@ def select_auth_plugin(options):
def build_auth_params(auth_plugin_name, cmd_options):
- auth_params = {}
+
+ auth_params = dict(cmd_options.auth)
if auth_plugin_name:
LOG.debug('auth_type: %s', auth_plugin_name)
auth_plugin_class = base.get_plugin_class(auth_plugin_name)
- plugin_options = auth_plugin_class.get_options()
- for option in plugin_options:
- option_name = 'os_' + option.dest
- LOG.debug('fetching option %s' % option_name)
- auth_params[option.dest] = getattr(cmd_options, option_name, None)
# grab tenant from project for v2.0 API compatibility
if auth_plugin_name.startswith("v2"):
- auth_params['tenant_id'] = getattr(
- cmd_options,
- 'os_project_id',
- None,
- )
- auth_params['tenant_name'] = getattr(
- cmd_options,
- 'os_project_name',
- None,
- )
+ if 'project_id' in auth_params:
+ auth_params['tenant_id'] = auth_params['project_id']
+ del auth_params['project_id']
+ if 'project_name' in auth_params:
+ auth_params['tenant_name'] = auth_params['project_name']
+ del auth_params['project_name']
else:
LOG.debug('no auth_type')
# delay the plugin choice, grab every option
+ auth_plugin_class = None
plugin_options = set([o.replace('-', '_') for o in get_options_list()])
for option in plugin_options:
- option_name = 'os_' + option
- LOG.debug('fetching option %s' % option_name)
- auth_params[option] = getattr(cmd_options, option_name, None)
+ LOG.debug('fetching option %s' % option)
+ auth_params[option] = getattr(cmd_options.auth, option, None)
return (auth_plugin_class, auth_params)
@@ -146,15 +140,29 @@ def check_valid_auth_options(options, auth_plugin_name):
msg = ''
if auth_plugin_name.endswith('password'):
- if not options.os_username:
- msg += _('Set a username with --os-username or OS_USERNAME\n')
- if not options.os_auth_url:
- msg += _('Set an authentication URL, with --os-auth-url or'
- ' OS_AUTH_URL\n')
- if (not options.os_project_id and not options.os_domain_id and not
- options.os_domain_name and not options.os_project_name):
+ if not options.auth.get('username', None):
+ msg += _('Set a username with --os-username, OS_USERNAME,'
+ ' or auth.username\n')
+ if not options.auth.get('auth_url', None):
+ msg += _('Set an authentication URL, with --os-auth-url,'
+ ' OS_AUTH_URL or auth.auth_url\n')
+ if (not options.auth.get('project_id', None) and not
+ options.auth.get('domain_id', None) and not
+ options.auth.get('domain_name', None) and not
+ options.auth.get('project_name', None)):
msg += _('Set a scope, such as a project or domain, with '
- '--os-project-name or OS_PROJECT_NAME')
+ '--os-project-name, OS_PROJECT_NAME or auth.project_name')
+ elif auth_plugin_name.endswith('token'):
+ if not options.auth.get('token', None):
+ msg += _('Set a token with --os-token, OS_TOKEN or auth.token\n')
+ if not options.auth.get('auth_url', None):
+ msg += _('Set a service AUTH_URL, with --os-auth-url, '
+ 'OS_AUTH_URL or auth.auth_url\n')
+ elif auth_plugin_name == 'token_endpoint':
+ if not options.auth.get('token', None):
+ msg += _('Set a token with --os-token, OS_TOKEN or auth.token\n')
+ if not options.auth.get('url', None):
+ msg += _('Set a service URL, with --os-url, OS_URL or auth.url\n')
if msg:
raise exc.CommandError('Missing parameter(s): \n%s' % msg)
@@ -171,6 +179,7 @@ def build_auth_plugins_option_parser(parser):
parser.add_argument(
'--os-auth-type',
metavar='<auth-type>',
+ dest='auth_type',
default=utils.env('OS_AUTH_TYPE'),
help='Select an auhentication type. Available types: ' +
', '.join(available_plugins) +
@@ -178,7 +187,7 @@ def build_auth_plugins_option_parser(parser):
' (Env: OS_AUTH_TYPE)',
choices=available_plugins
)
- # make sure we catch old v2.0 env values
+ # Maintain compatibility with old tenant env vars
envs = {
'OS_PROJECT_NAME': utils.env(
'OS_PROJECT_NAME',
@@ -190,15 +199,20 @@ def build_auth_plugins_option_parser(parser):
),
}
for o in get_options_list():
- # remove allusion to tenants from v2.0 API
+ # Remove tenant options from KSC plugins and replace them below
if 'tenant' not in o:
parser.add_argument(
'--os-' + o,
metavar='<auth-%s>' % o,
- default=envs.get(OPTIONS_LIST[o]['env'],
- utils.env(OPTIONS_LIST[o]['env'])),
- help='%s\n(Env: %s)' % (OPTIONS_LIST[o]['help'],
- OPTIONS_LIST[o]['env']),
+ dest=o.replace('-', '_'),
+ default=envs.get(
+ OPTIONS_LIST[o]['env'],
+ utils.env(OPTIONS_LIST[o]['env']),
+ ),
+ help='%s\n(Env: %s)' % (
+ OPTIONS_LIST[o]['help'],
+ OPTIONS_LIST[o]['env'],
+ ),
)
# add tenant-related options for compatibility
# this is deprecated but still used in some tempest tests...
@@ -206,14 +220,12 @@ def build_auth_plugins_option_parser(parser):
'--os-tenant-name',
metavar='<auth-tenant-name>',
dest='os_project_name',
- default=utils.env('OS_TENANT_NAME'),
help=argparse.SUPPRESS,
)
parser.add_argument(
'--os-tenant-id',
metavar='<auth-tenant-id>',
dest='os_project_id',
- default=utils.env('OS_TENANT_ID'),
help=argparse.SUPPRESS,
)
return parser
diff --git a/openstackclient/common/clientmanager.py b/openstackclient/common/clientmanager.py
index 6d2a9d76..ca5ece0d 100644
--- a/openstackclient/common/clientmanager.py
+++ b/openstackclient/common/clientmanager.py
@@ -58,7 +58,7 @@ class ClientManager(object):
def __init__(
self,
- cli_options,
+ cli_options=None,
api_version=None,
verify=True,
pw_func=None,
@@ -82,8 +82,8 @@ class ClientManager(object):
self._cli_options = cli_options
self._api_version = api_version
self._pw_callback = pw_func
- self._url = self._cli_options.os_url
- self._region_name = self._cli_options.os_region_name
+ self._url = self._cli_options.auth.get('url', None)
+ self._region_name = self._cli_options.region_name
self.timing = self._cli_options.timing
@@ -121,7 +121,7 @@ class ClientManager(object):
# Horrible hack alert...must handle prompt for null password if
# password auth is requested.
if (self.auth_plugin_name.endswith('password') and
- not self._cli_options.os_password):
+ not self._cli_options.auth.get('password', None)):
self._cli_options.os_password = self._pw_callback()
(auth_plugin, self._auth_params) = auth.build_auth_params(
@@ -129,13 +129,15 @@ class ClientManager(object):
self._cli_options,
)
- default_domain = self._cli_options.os_default_domain
+ # TODO(mordred): This is a usability improvement that's broadly useful
+ # We should port it back up into os-client-config.
+ default_domain = self._cli_options.default_domain
# NOTE(stevemar): If PROJECT_DOMAIN_ID or PROJECT_DOMAIN_NAME is
# present, then do not change the behaviour. Otherwise, set the
# PROJECT_DOMAIN_ID to 'OS_DEFAULT_DOMAIN' for better usability.
if (self._api_version.get('identity') == '3' and
- not self._auth_params.get('project_domain_id') and
- not self._auth_params.get('project_domain_name')):
+ not self._auth_params.get('project_domain_id', None) and
+ not self._auth_params.get('project_domain_name', None)):
self._auth_params['project_domain_id'] = default_domain
# NOTE(stevemar): If USER_DOMAIN_ID or USER_DOMAIN_NAME is present,
@@ -143,8 +145,8 @@ class ClientManager(object):
# to 'OS_DEFAULT_DOMAIN' for better usability.
if (self._api_version.get('identity') == '3' and
self.auth_plugin_name.endswith('password') and
- not self._auth_params.get('user_domain_id') and
- not self._auth_params.get('user_domain_name')):
+ not self._auth_params.get('user_domain_id', None) and
+ not self._auth_params.get('user_domain_name', None)):
self._auth_params['user_domain_id'] = default_domain
# For compatibility until all clients can be updated
diff --git a/openstackclient/shell.py b/openstackclient/shell.py
index f4e3b7e5..00f4a3c9 100644
--- a/openstackclient/shell.py
+++ b/openstackclient/shell.py
@@ -33,6 +33,8 @@ from openstackclient.common import exceptions as exc
from openstackclient.common import timing
from openstackclient.common import utils
+from os_client_config import config as cloud_config
+
DEFAULT_DOMAIN = 'default'
@@ -86,10 +88,6 @@ class OpenStackShell(app.App):
# Until we have command line arguments parsed, dump any stack traces
self.dump_stack_trace = True
- # This is instantiated in initialize_app() only when using
- # password flow auth
- self.auth_client = None
-
# Assume TLS host certificate verification is enabled
self.verify = True
@@ -165,10 +163,19 @@ class OpenStackShell(app.App):
description,
version)
+ # service token auth argument
+ parser.add_argument(
+ '--os-cloud',
+ metavar='<cloud-config-name>',
+ dest='cloud',
+ default=utils.env('OS_CLOUD'),
+ help='Cloud name in clouds.yaml (Env: OS_CLOUD)',
+ )
# Global arguments
parser.add_argument(
'--os-region-name',
metavar='<auth-region-name>',
+ dest='region_name',
default=utils.env('OS_REGION_NAME'),
help='Authentication region name (Env: OS_REGION_NAME)')
parser.add_argument(
@@ -213,8 +220,43 @@ class OpenStackShell(app.App):
* authenticate against Identity if requested
"""
+ # Parent __init__ parses argv into self.options
super(OpenStackShell, self).initialize_app(argv)
+ # Resolve the verify/insecure exclusive pair here as cloud_config
+ # doesn't know about verify
+ self.options.insecure = (
+ self.options.insecure and not self.options.verify
+ )
+
+ # Set the default plugin to token_endpoint if rl and token are given
+ if (self.options.url and self.options.token):
+ # Use service token authentication
+ cloud_config.set_default('auth_type', 'token_endpoint')
+ else:
+ cloud_config.set_default('auth_type', 'osc_password')
+ self.log.debug("options: %s", self.options)
+
+ # Do configuration file handling
+ cc = cloud_config.OpenStackConfig()
+ self.log.debug("defaults: %s", cc.defaults)
+
+ self.cloud = cc.get_one_cloud(
+ cloud=self.options.cloud,
+ argparse=self.options,
+ )
+ self.log.debug("cloud cfg: %s", self.cloud.config)
+
+ # Set up client TLS
+ cacert = self.cloud.cacert
+ if cacert:
+ self.verify = cacert
+ else:
+ self.verify = not getattr(self.cloud.config, 'insecure', False)
+
+ # Neutralize verify option
+ self.options.verify = None
+
# Save default domain
self.default_domain = self.options.os_default_domain
@@ -260,14 +302,8 @@ class OpenStackShell(app.App):
# Handle deferred help and exit
self.print_help_if_requested()
- # Set up common client session
- if self.options.os_cacert:
- self.verify = self.options.os_cacert
- else:
- self.verify = not self.options.insecure
-
self.client_manager = clientmanager.ClientManager(
- cli_options=self.options,
+ cli_options=self.cloud,
verify=self.verify,
api_version=self.api_version,
pw_func=prompt_for_password,
diff --git a/openstackclient/tests/common/test_clientmanager.py b/openstackclient/tests/common/test_clientmanager.py
index 7a2f57be..26cf4967 100644
--- a/openstackclient/tests/common/test_clientmanager.py
+++ b/openstackclient/tests/common/test_clientmanager.py
@@ -48,13 +48,14 @@ class Container(object):
class FakeOptions(object):
def __init__(self, **kwargs):
for option in auth.OPTIONS_LIST:
- setattr(self, 'os_' + option.replace('-', '_'), None)
- self.os_auth_type = None
- self.os_identity_api_version = '2.0'
+ setattr(self, option.replace('-', '_'), None)
+ self.auth_type = None
+ self.identity_api_version = '2.0'
self.timing = None
- self.os_region_name = None
- self.os_url = None
- self.os_default_domain = 'default'
+ self.region_name = None
+ self.url = None
+ self.auth = {}
+ self.default_domain = 'default'
self.__dict__.update(kwargs)
@@ -86,9 +87,11 @@ class TestClientManager(utils.TestCase):
client_manager = clientmanager.ClientManager(
cli_options=FakeOptions(
- os_token=fakes.AUTH_TOKEN,
- os_url=fakes.AUTH_URL,
- os_auth_type='token_endpoint',
+ auth_type='token_endpoint',
+ auth=dict(
+ token=fakes.AUTH_TOKEN,
+ url=fakes.AUTH_URL,
+ ),
),
api_version=API_VERSION,
verify=True
@@ -114,9 +117,11 @@ class TestClientManager(utils.TestCase):
client_manager = clientmanager.ClientManager(
cli_options=FakeOptions(
- os_token=fakes.AUTH_TOKEN,
- os_auth_url=fakes.AUTH_URL,
- os_auth_type='v2token',
+ auth=dict(
+ token=fakes.AUTH_TOKEN,
+ auth_url=fakes.AUTH_URL,
+ ),
+ auth_type='v2token',
),
api_version=API_VERSION,
verify=True
@@ -138,10 +143,12 @@ class TestClientManager(utils.TestCase):
client_manager = clientmanager.ClientManager(
cli_options=FakeOptions(
- os_auth_url=fakes.AUTH_URL,
- os_username=fakes.USERNAME,
- os_password=fakes.PASSWORD,
- os_project_name=fakes.PROJECT_NAME,
+ auth=dict(
+ auth_url=fakes.AUTH_URL,
+ username=fakes.USERNAME,
+ password=fakes.PASSWORD,
+ project_name=fakes.PROJECT_NAME,
+ ),
),
api_version=API_VERSION,
verify=False,
@@ -198,11 +205,13 @@ class TestClientManager(utils.TestCase):
client_manager = clientmanager.ClientManager(
cli_options=FakeOptions(
- os_auth_url=fakes.AUTH_URL,
- os_username=fakes.USERNAME,
- os_password=fakes.PASSWORD,
- os_project_name=fakes.PROJECT_NAME,
- os_auth_type='v2password',
+ auth=dict(
+ auth_url=fakes.AUTH_URL,
+ username=fakes.USERNAME,
+ password=fakes.PASSWORD,
+ project_name=fakes.PROJECT_NAME,
+ ),
+ auth_type='v2password',
),
api_version=API_VERSION,
verify='cafile',
@@ -214,8 +223,8 @@ class TestClientManager(utils.TestCase):
self.assertEqual('cafile', client_manager._cacert)
def _select_auth_plugin(self, auth_params, api_version, auth_plugin_name):
- auth_params['os_auth_type'] = auth_plugin_name
- auth_params['os_identity_api_version'] = api_version
+ auth_params['auth_type'] = auth_plugin_name
+ auth_params['identity_api_version'] = api_version
client_manager = clientmanager.ClientManager(
cli_options=FakeOptions(**auth_params),
api_version=API_VERSION,
@@ -230,19 +239,33 @@ class TestClientManager(utils.TestCase):
def test_client_manager_select_auth_plugin(self):
# test token auth
- params = dict(os_token=fakes.AUTH_TOKEN,
- os_auth_url=fakes.AUTH_URL)
+ params = dict(
+ auth=dict(
+ auth_url=fakes.AUTH_URL,
+ token=fakes.AUTH_TOKEN,
+ ),
+ )
self._select_auth_plugin(params, '2.0', 'v2token')
self._select_auth_plugin(params, '3', 'v3token')
self._select_auth_plugin(params, 'XXX', 'token')
# test token/endpoint auth
- params = dict(os_token=fakes.AUTH_TOKEN, os_url='test')
+ params = dict(
+ auth_plugin='token_endpoint',
+ auth=dict(
+ url='test',
+ token=fakes.AUTH_TOKEN,
+ ),
+ )
self._select_auth_plugin(params, 'XXX', 'token_endpoint')
# test password auth
- params = dict(os_auth_url=fakes.AUTH_URL,
- os_username=fakes.USERNAME,
- os_password=fakes.PASSWORD,
- os_project_name=fakes.PROJECT_NAME)
+ params = dict(
+ auth=dict(
+ auth_url=fakes.AUTH_URL,
+ username=fakes.USERNAME,
+ password=fakes.PASSWORD,
+ project_name=fakes.PROJECT_NAME,
+ ),
+ )
self._select_auth_plugin(params, '2.0', 'v2password')
self._select_auth_plugin(params, '3', 'v3password')
self._select_auth_plugin(params, 'XXX', 'password')
diff --git a/openstackclient/tests/test_shell.py b/openstackclient/tests/test_shell.py
index f1043072..a43be954 100644
--- a/openstackclient/tests/test_shell.py
+++ b/openstackclient/tests/test_shell.py
@@ -82,34 +82,62 @@ class TestShell(utils.TestCase):
fake_execute(_shell, _cmd)
self.app.assert_called_with(["list", "project"])
- self.assertEqual(default_args["auth_url"],
- _shell.options.os_auth_url)
- self.assertEqual(default_args["project_id"],
- _shell.options.os_project_id)
- self.assertEqual(default_args["project_name"],
- _shell.options.os_project_name)
- self.assertEqual(default_args["domain_id"],
- _shell.options.os_domain_id)
- self.assertEqual(default_args["domain_name"],
- _shell.options.os_domain_name)
- self.assertEqual(default_args["user_domain_id"],
- _shell.options.os_user_domain_id)
- self.assertEqual(default_args["user_domain_name"],
- _shell.options.os_user_domain_name)
- self.assertEqual(default_args["project_domain_id"],
- _shell.options.os_project_domain_id)
- self.assertEqual(default_args["project_domain_name"],
- _shell.options.os_project_domain_name)
- self.assertEqual(default_args["username"],
- _shell.options.os_username)
- self.assertEqual(default_args["password"],
- _shell.options.os_password)
- self.assertEqual(default_args["region_name"],
- _shell.options.os_region_name)
- self.assertEqual(default_args["trust_id"],
- _shell.options.os_trust_id)
- self.assertEqual(default_args['auth_type'],
- _shell.options.os_auth_type)
+ self.assertEqual(
+ default_args.get("auth_url", ''),
+ _shell.options.auth_url,
+ )
+ self.assertEqual(
+ default_args.get("project_id", ''),
+ _shell.options.project_id,
+ )
+ self.assertEqual(
+ default_args.get("project_name", ''),
+ _shell.options.project_name,
+ )
+ self.assertEqual(
+ default_args.get("domain_id", ''),
+ _shell.options.domain_id,
+ )
+ self.assertEqual(
+ default_args.get("domain_name", ''),
+ _shell.options.domain_name,
+ )
+ self.assertEqual(
+ default_args.get("user_domain_id", ''),
+ _shell.options.user_domain_id,
+ )
+ self.assertEqual(
+ default_args.get("user_domain_name", ''),
+ _shell.options.user_domain_name,
+ )
+ self.assertEqual(
+ default_args.get("project_domain_id", ''),
+ _shell.options.project_domain_id,
+ )
+ self.assertEqual(
+ default_args.get("project_domain_name", ''),
+ _shell.options.project_domain_name,
+ )
+ self.assertEqual(
+ default_args.get("username", ''),
+ _shell.options.username,
+ )
+ self.assertEqual(
+ default_args.get("password", ''),
+ _shell.options.password,
+ )
+ self.assertEqual(
+ default_args.get("region_name", ''),
+ _shell.options.region_name,
+ )
+ self.assertEqual(
+ default_args.get("trust_id", ''),
+ _shell.options.trust_id,
+ )
+ self.assertEqual(
+ default_args.get('auth_type', ''),
+ _shell.options.auth_type,
+ )
def _assert_token_auth(self, cmd_options, default_args):
with mock.patch("openstackclient.shell.OpenStackShell.initialize_app",
@@ -118,9 +146,34 @@ class TestShell(utils.TestCase):
fake_execute(_shell, _cmd)
self.app.assert_called_with(["list", "role"])
- self.assertEqual(default_args["os_token"], _shell.options.os_token)
- self.assertEqual(default_args["os_auth_url"],
- _shell.options.os_auth_url)
+ self.assertEqual(
+ default_args.get("token", ''),
+ _shell.options.token,
+ "token"
+ )
+ self.assertEqual(
+ default_args.get("auth_url", ''),
+ _shell.options.auth_url,
+ "auth_url"
+ )
+
+ def _assert_token_endpoint_auth(self, cmd_options, default_args):
+ with mock.patch("openstackclient.shell.OpenStackShell.initialize_app",
+ self.app):
+ _shell, _cmd = make_shell(), cmd_options + " list role"
+ fake_execute(_shell, _cmd)
+
+ self.app.assert_called_with(["list", "role"])
+ self.assertEqual(
+ default_args.get("token", ''),
+ _shell.options.token,
+ "token",
+ )
+ self.assertEqual(
+ default_args.get("url", ''),
+ _shell.options.url,
+ "url",
+ )
def _assert_cli(self, cmd_options, default_args):
with mock.patch("openstackclient.shell.OpenStackShell.initialize_app",
@@ -178,301 +231,233 @@ class TestShellPasswordAuth(TestShell):
flag = "--os-auth-url " + DEFAULT_AUTH_URL
kwargs = {
"auth_url": DEFAULT_AUTH_URL,
- "project_id": "",
- "project_name": "",
- "user_domain_id": "",
- "domain_id": "",
- "domain_name": "",
- "user_domain_name": "",
- "project_domain_id": "",
- "project_domain_name": "",
- "username": "",
- "password": "",
- "region_name": "",
- "trust_id": "",
- "auth_type": "",
}
self._assert_password_auth(flag, kwargs)
def test_only_project_id_flow(self):
flag = "--os-project-id " + DEFAULT_PROJECT_ID
kwargs = {
- "auth_url": "",
"project_id": DEFAULT_PROJECT_ID,
- "project_name": "",
- "domain_id": "",
- "domain_name": "",
- "user_domain_id": "",
- "user_domain_name": "",
- "project_domain_id": "",
- "project_domain_name": "",
- "username": "",
- "password": "",
- "region_name": "",
- "trust_id": "",
- "auth_type": "",
}
self._assert_password_auth(flag, kwargs)
def test_only_project_name_flow(self):
flag = "--os-project-name " + DEFAULT_PROJECT_NAME
kwargs = {
- "auth_url": "",
- "project_id": "",
"project_name": DEFAULT_PROJECT_NAME,
- "domain_id": "",
- "domain_name": "",
- "user_domain_id": "",
- "user_domain_name": "",
- "project_domain_id": "",
- "project_domain_name": "",
- "username": "",
- "password": "",
- "region_name": "",
- "trust_id": "",
- "auth_type": "",
}
self._assert_password_auth(flag, kwargs)
def test_only_domain_id_flow(self):
flag = "--os-domain-id " + DEFAULT_DOMAIN_ID
kwargs = {
- "auth_url": "",
- "project_id": "",
- "project_name": "",
"domain_id": DEFAULT_DOMAIN_ID,
- "domain_name": "",
- "user_domain_id": "",
- "user_domain_name": "",
- "project_domain_id": "",
- "project_domain_name": "",
- "username": "",
- "password": "",
- "region_name": "",
- "trust_id": "",
- "auth_type": "",
}
self._assert_password_auth(flag, kwargs)
def test_only_domain_name_flow(self):
flag = "--os-domain-name " + DEFAULT_DOMAIN_NAME
kwargs = {
- "auth_url": "",
- "project_id": "",
- "project_name": "",
- "domain_id": "",
"domain_name": DEFAULT_DOMAIN_NAME,
- "user_domain_id": "",
- "user_domain_name": "",
- "project_domain_id": "",
- "project_domain_name": "",
- "username": "",
- "password": "",
- "region_name": "",
- "trust_id": "",
- "auth_type": "",
}
self._assert_password_auth(flag, kwargs)
def test_only_user_domain_id_flow(self):
flag = "--os-user-domain-id " + DEFAULT_USER_DOMAIN_ID
kwargs = {
- "auth_url": "",
- "project_id": "",
- "project_name": "",
- "domain_id": "",
- "domain_name": "",
"user_domain_id": DEFAULT_USER_DOMAIN_ID,
- "user_domain_name": "",
- "project_domain_id": "",
- "project_domain_name": "",
- "username": "",
- "password": "",
- "region_name": "",
- "trust_id": "",
- "auth_type": "",
}
self._assert_password_auth(flag, kwargs)
def test_only_user_domain_name_flow(self):
flag = "--os-user-domain-name " + DEFAULT_USER_DOMAIN_NAME
kwargs = {
- "auth_url": "",
- "project_id": "",
- "project_name": "",
- "domain_id": "",
- "domain_name": "",
- "user_domain_id": "",
"user_domain_name": DEFAULT_USER_DOMAIN_NAME,
- "project_domain_id": "",
- "project_domain_name": "",
- "username": "",
- "password": "",
- "region_name": "",
- "trust_id": "",
- "auth_type": "",
}
self._assert_password_auth(flag, kwargs)
def test_only_project_domain_id_flow(self):
flag = "--os-project-domain-id " + DEFAULT_PROJECT_DOMAIN_ID
kwargs = {
- "auth_url": "",
- "project_id": "",
- "project_name": "",
- "domain_id": "",
- "domain_name": "",
- "user_domain_id": "",
- "user_domain_name": "",
"project_domain_id": DEFAULT_PROJECT_DOMAIN_ID,
- "project_domain_name": "",
- "username": "",
- "password": "",
- "region_name": "",
- "trust_id": "",
- "auth_type": "",
}
self._assert_password_auth(flag, kwargs)
def test_only_project_domain_name_flow(self):
flag = "--os-project-domain-name " + DEFAULT_PROJECT_DOMAIN_NAME
kwargs = {
- "auth_url": "",
- "project_id": "",
- "project_name": "",
- "domain_id": "",
- "domain_name": "",
- "user_domain_id": "",
- "user_domain_name": "",
- "project_domain_id": "",
"project_domain_name": DEFAULT_PROJECT_DOMAIN_NAME,
- "username": "",
- "password": "",
- "region_name": "",
- "trust_id": "",
- "auth_type": "",
}
self._assert_password_auth(flag, kwargs)
def test_only_username_flow(self):
flag = "--os-username " + DEFAULT_USERNAME
kwargs = {
- "auth_url": "",
- "project_id": "",
- "project_name": "",
- "domain_id": "",
- "domain_name": "",
- "user_domain_id": "",
- "user_domain_name": "",
- "project_domain_id": "",
- "project_domain_name": "",
"username": DEFAULT_USERNAME,
- "password": "",
- "region_name": "",
- "trust_id": "",
- "auth_type": "",
}
self._assert_password_auth(flag, kwargs)
def test_only_password_flow(self):
flag = "--os-password " + DEFAULT_PASSWORD
kwargs = {
- "auth_url": "",
- "project_id": "",
- "project_name": "",
- "domain_id": "",
- "domain_name": "",
- "user_domain_id": "",
- "user_domain_name": "",
- "project_domain_id": "",
- "project_domain_name": "",
- "username": "",
"password": DEFAULT_PASSWORD,
- "region_name": "",
- "trust_id": "",
- "auth_type": "",
}
self._assert_password_auth(flag, kwargs)
def test_only_region_name_flow(self):
flag = "--os-region-name " + DEFAULT_REGION_NAME
kwargs = {
- "auth_url": "",
- "project_id": "",
- "project_name": "",
- "domain_id": "",
- "domain_name": "",
- "user_domain_id": "",
- "user_domain_name": "",
- "project_domain_id": "",
- "project_domain_name": "",
- "username": "",
- "password": "",
"region_name": DEFAULT_REGION_NAME,
- "trust_id": "",
- "auth_type": "",
}
self._assert_password_auth(flag, kwargs)
def test_only_trust_id_flow(self):
flag = "--os-trust-id " + "1234"
kwargs = {
- "auth_url": "",
- "project_id": "",
- "project_name": "",
- "domain_id": "",
- "domain_name": "",
- "user_domain_id": "",
- "user_domain_name": "",
- "project_domain_id": "",
- "project_domain_name": "",
- "username": "",
- "password": "",
- "region_name": "",
"trust_id": "1234",
- "auth_type": "",
}
self._assert_password_auth(flag, kwargs)
def test_only_auth_type_flow(self):
flag = "--os-auth-type " + "v2password"
kwargs = {
- "auth_url": "",
- "project_id": "",
- "project_name": "",
- "domain_id": "",
- "domain_name": "",
- "user_domain_id": "",
- "user_domain_name": "",
- "project_domain_id": "",
- "project_domain_name": "",
- "username": "",
- "password": "",
- "region_name": "",
- "trust_id": "",
"auth_type": DEFAULT_AUTH_PLUGIN
}
self._assert_password_auth(flag, kwargs)
class TestShellTokenAuth(TestShell):
+ def test_only_token(self):
+ flag = "--os-token " + DEFAULT_TOKEN
+ kwargs = {
+ "token": DEFAULT_TOKEN,
+ "auth_url": '',
+ }
+ self._assert_token_auth(flag, kwargs)
+
+ def test_only_auth_url(self):
+ flag = "--os-auth-url " + DEFAULT_AUTH_URL
+ kwargs = {
+ "token": '',
+ "auth_url": DEFAULT_AUTH_URL,
+ }
+ self._assert_token_auth(flag, kwargs)
+
+ def test_empty_auth(self):
+ os.environ = {}
+ flag = ""
+ kwargs = {}
+ self._assert_token_auth(flag, kwargs)
+
+
+class TestShellTokenAuthEnv(TestShell):
def setUp(self):
- super(TestShellTokenAuth, self).setUp()
+ super(TestShellTokenAuthEnv, self).setUp()
env = {
"OS_TOKEN": DEFAULT_TOKEN,
- "OS_AUTH_URL": DEFAULT_SERVICE_URL,
+ "OS_AUTH_URL": DEFAULT_AUTH_URL,
}
self.orig_env, os.environ = os.environ, env.copy()
def tearDown(self):
- super(TestShellTokenAuth, self).tearDown()
+ super(TestShellTokenAuthEnv, self).tearDown()
os.environ = self.orig_env
- def test_default_auth(self):
+ def test_env(self):
flag = ""
kwargs = {
- "os_token": DEFAULT_TOKEN,
- "os_auth_url": DEFAULT_SERVICE_URL
+ "token": DEFAULT_TOKEN,
+ "auth_url": DEFAULT_AUTH_URL,
+ }
+ self._assert_token_auth(flag, kwargs)
+
+ def test_only_token(self):
+ flag = "--os-token xyzpdq"
+ kwargs = {
+ "token": "xyzpdq",
+ "auth_url": DEFAULT_AUTH_URL,
+ }
+ self._assert_token_auth(flag, kwargs)
+
+ def test_only_auth_url(self):
+ flag = "--os-auth-url http://cloud.local:555"
+ kwargs = {
+ "token": DEFAULT_TOKEN,
+ "auth_url": "http://cloud.local:555",
+ }
+ self._assert_token_auth(flag, kwargs)
+
+ def test_empty_auth(self):
+ os.environ = {}
+ flag = ""
+ kwargs = {
+ "token": '',
+ "auth_url": '',
+ }
+ self._assert_token_auth(flag, kwargs)
+
+
+class TestShellTokenEndpointAuth(TestShell):
+ def test_only_token(self):
+ flag = "--os-token " + DEFAULT_TOKEN
+ kwargs = {
+ "token": DEFAULT_TOKEN,
+ "url": '',
+ }
+ self._assert_token_endpoint_auth(flag, kwargs)
+
+ def test_only_url(self):
+ flag = "--os-url " + DEFAULT_SERVICE_URL
+ kwargs = {
+ "token": '',
+ "url": DEFAULT_SERVICE_URL,
+ }
+ self._assert_token_endpoint_auth(flag, kwargs)
+
+ def test_empty_auth(self):
+ os.environ = {}
+ flag = ""
+ kwargs = {
+ "token": '',
+ "auth_url": '',
+ }
+ self._assert_token_endpoint_auth(flag, kwargs)
+
+
+class TestShellTokenEndpointAuthEnv(TestShell):
+ def setUp(self):
+ super(TestShellTokenEndpointAuthEnv, self).setUp()
+ env = {
+ "OS_TOKEN": DEFAULT_TOKEN,
+ "OS_URL": DEFAULT_SERVICE_URL,
+ }
+ self.orig_env, os.environ = os.environ, env.copy()
+
+ def tearDown(self):
+ super(TestShellTokenEndpointAuthEnv, self).tearDown()
+ os.environ = self.orig_env
+
+ def test_env(self):
+ flag = ""
+ kwargs = {
+ "token": DEFAULT_TOKEN,
+ "url": DEFAULT_SERVICE_URL,
+ }
+ self._assert_token_auth(flag, kwargs)
+
+ def test_only_token(self):
+ flag = "--os-token xyzpdq"
+ kwargs = {
+ "token": "xyzpdq",
+ "url": DEFAULT_SERVICE_URL,
+ }
+ self._assert_token_auth(flag, kwargs)
+
+ def test_only_url(self):
+ flag = "--os-url http://cloud.local:555"
+ kwargs = {
+ "token": DEFAULT_TOKEN,
+ "url": "http://cloud.local:555",
}
self._assert_token_auth(flag, kwargs)
@@ -480,8 +465,8 @@ class TestShellTokenAuth(TestShell):
os.environ = {}
flag = ""
kwargs = {
- "os_token": "",
- "os_auth_url": ""
+ "token": '',
+ "url": '',
}
self._assert_token_auth(flag, kwargs)