summaryrefslogtreecommitdiff
path: root/troveclient/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'troveclient/client.py')
-rw-r--r--troveclient/client.py592
1 files changed, 313 insertions, 279 deletions
diff --git a/troveclient/client.py b/troveclient/client.py
index 87fecea..14ec959 100644
--- a/troveclient/client.py
+++ b/troveclient/client.py
@@ -13,12 +13,24 @@
# License for the specific language governing permissions and limitations
# under the License.
-import httplib2
+"""
+OpenStack Client interface. Handles the REST calls and responses.
+"""
+
+from __future__ import print_function
+
import logging
import os
-import time
-import urlparse
-import sys
+
+try:
+ import urlparse
+except ImportError:
+ import urllib.parse as urlparse
+
+try:
+ from eventlet import sleep
+except ImportError:
+ from time import sleep
try:
import json
@@ -30,112 +42,67 @@ if not hasattr(urlparse, 'parse_qsl'):
import cgi
urlparse.parse_qsl = cgi.parse_qsl
-from troveclient import auth
-from troveclient import exceptions
-
+import requests
-_logger = logging.getLogger(__name__)
-RDC_PP = os.environ.get("RDC_PP", "False") == "True"
+from troveclient.openstack.common.apiclient import exceptions
+from troveclient import service_catalog
+from troveclient import utils
+from troveclient.openstack.common.apiclient import client
-expected_errors = (400, 401, 403, 404, 408, 409, 413, 422, 500, 501)
-
-
-def log_to_streamhandler(stream=None):
- stream = stream or sys.stderr
- ch = logging.StreamHandler(stream)
- _logger.setLevel(logging.DEBUG)
- _logger.addHandler(ch)
-
-
-if 'REDDWARFCLIENT_DEBUG' in os.environ and os.environ['REDDWARFCLIENT_DEBUG']:
- log_to_streamhandler()
-
-
-class TroveHTTPClient(httplib2.Http):
+class HTTPClient(object):
USER_AGENT = 'python-troveclient'
- def __init__(self, user, password, tenant, auth_url, service_name,
- service_url=None,
- auth_strategy=None, insecure=False,
- timeout=None, proxy_tenant_id=None,
+ def __init__(self, user, password, projectid, auth_url, insecure=False,
+ timeout=None, tenant_id=None, proxy_tenant_id=None,
proxy_token=None, region_name=None,
endpoint_type='publicURL', service_type=None,
- timings=False):
-
- super(TroveHTTPClient, self).__init__(timeout=timeout)
-
- self.username = user
+ service_name=None, database_service_name=None, retries=None,
+ http_log_debug=False, cacert=None):
+ self.user = user
self.password = password
- self.tenant = tenant
- if auth_url:
- self.auth_url = auth_url.rstrip('/')
- else:
- self.auth_url = None
+ self.projectid = projectid
+ self.tenant_id = tenant_id
+ self.auth_url = auth_url.rstrip('/')
+ self.version = 'v1'
self.region_name = region_name
self.endpoint_type = endpoint_type
- self.service_url = service_url
self.service_type = service_type
self.service_name = service_name
- self.timings = timings
-
- self.times = [] # [("item", starttime, endtime), ...]
+ self.database_service_name = database_service_name
+ self.retries = int(retries or 0)
+ self.http_log_debug = http_log_debug
+ self.management_url = None
self.auth_token = None
self.proxy_token = proxy_token
self.proxy_tenant_id = proxy_tenant_id
+ self.timeout = timeout
- # httplib2 overrides
- self.force_exception_to_status_code = True
- self.disable_ssl_certificate_validation = insecure
-
- auth_cls = auth.get_authenticator_cls(auth_strategy)
-
- self.authenticator = auth_cls(self, auth_strategy,
- self.auth_url, self.username,
- self.password, self.tenant,
- region=region_name,
- service_type=service_type,
- service_name=service_name,
- service_url=service_url)
-
- def get_timings(self):
- return self.times
-
- def http_log(self, args, kwargs, resp, body):
- if not RDC_PP:
- self.simple_log(args, kwargs, resp, body)
+ if insecure:
+ self.verify_cert = False
else:
- self.pretty_log(args, kwargs, resp, body)
-
- def simple_log(self, args, kwargs, resp, body):
- if not _logger.isEnabledFor(logging.DEBUG):
- return
-
- string_parts = ['curl -i']
- for element in args:
- if element in ('GET', 'POST'):
- string_parts.append(' -X %s' % element)
+ if cacert:
+ self.verify_cert = cacert
else:
- string_parts.append(' %s' % element)
-
- for element in kwargs['headers']:
- header = ' -H "%s: %s"' % (element, kwargs['headers'][element])
- string_parts.append(header)
-
- _logger.debug("REQ: %s\n" % "".join(string_parts))
- if 'body' in kwargs:
- _logger.debug("REQ BODY: %s\n" % (kwargs['body']))
- _logger.debug("RESP:%s %s\n", resp, body)
-
- def pretty_log(self, args, kwargs, resp, body):
- if not _logger.isEnabledFor(logging.DEBUG):
+ self.verify_cert = True
+
+ self._logger = logging.getLogger(__name__)
+ if self.http_log_debug and not self._logger.handlers:
+ ch = logging.StreamHandler()
+ self._logger.setLevel(logging.DEBUG)
+ self._logger.addHandler(ch)
+ if hasattr(requests, 'logging'):
+ requests.logging.getLogger(requests.__name__).addHandler(ch)
+
+ def http_log_req(self, args, kwargs):
+ if not self.http_log_debug:
return
string_parts = ['curl -i']
for element in args:
- if element in ('GET', 'POST'):
+ if element in ('GET', 'POST', 'DELETE', 'PUT'):
string_parts.append(' -X %s' % element)
else:
string_parts.append(' %s' % element)
@@ -144,97 +111,96 @@ class TroveHTTPClient(httplib2.Http):
header = ' -H "%s: %s"' % (element, kwargs['headers'][element])
string_parts.append(header)
- curl_cmd = "".join(string_parts)
- _logger.debug("REQUEST:")
- if 'body' in kwargs:
- _logger.debug("%s -d '%s'" % (curl_cmd, kwargs['body']))
- try:
- req_body = json.dumps(json.loads(kwargs['body']),
- sort_keys=True, indent=4)
- except:
- req_body = kwargs['body']
- _logger.debug("BODY: %s\n" % (req_body))
- else:
- _logger.debug(curl_cmd)
+ if 'data' in kwargs:
+ string_parts.append(" -d '%s'" % (kwargs['data']))
+ self._logger.debug("\nREQ: %s\n" % "".join(string_parts))
- try:
- resp_body = json.dumps(json.loads(body), sort_keys=True, indent=4)
- except:
- resp_body = body
- _logger.debug("RESPONSE HEADERS: %s" % resp)
- _logger.debug("RESPONSE BODY : %s" % resp_body)
+ def http_log_resp(self, resp):
+ if not self.http_log_debug:
+ return
+ self._logger.debug(
+ "RESP: [%s] %s\nRESP BODY: %s\n",
+ resp.status_code,
+ resp.headers,
+ resp.text)
- def request(self, *args, **kwargs):
+ def request(self, url, method, **kwargs):
kwargs.setdefault('headers', kwargs.get('headers', {}))
kwargs['headers']['User-Agent'] = self.USER_AGENT
- self.morph_request(kwargs)
-
- resp, body = super(TroveHTTPClient, self).request(*args, **kwargs)
-
- # Save this in case anyone wants it.
- self.last_response = (resp, body)
- self.http_log(args, kwargs, resp, body)
-
- if body:
+ kwargs['headers']['Accept'] = 'application/json'
+ if 'body' in kwargs:
+ kwargs['headers']['Content-Type'] = 'application/json'
+ kwargs['data'] = json.dumps(kwargs['body'])
+ del kwargs['body']
+
+ if self.timeout:
+ kwargs.setdefault('timeout', self.timeout)
+ self.http_log_req((url, method,), kwargs)
+ resp = requests.request(
+ method,
+ url,
+ verify=self.verify_cert,
+ **kwargs)
+ self.http_log_resp(resp)
+
+ if resp.text:
try:
- body = self.morph_response_body(body)
- except exceptions.ResponseFormatError:
- # Acceptable only if the response status is an error code.
- # Otherwise its the API or client misbehaving.
- self.raise_error_from_status(resp, None)
- raise # Not accepted!
+ body = json.loads(resp.text)
+ except ValueError:
+ pass
+ body = None
else:
body = None
- if resp.status in expected_errors:
- raise exceptions.from_response(resp, body)
+ if resp.status_code >= 400:
+ raise exceptions.from_response(resp, body, url)
return resp, body
- def raise_error_from_status(self, resp, body):
- if resp.status in expected_errors:
- raise exceptions.from_response(resp, body)
-
- def morph_request(self, kwargs):
- kwargs['headers']['Accept'] = 'application/json'
- kwargs['headers']['Content-Type'] = 'application/json'
- if 'body' in kwargs:
- kwargs['body'] = json.dumps(kwargs['body'])
-
- def morph_response_body(self, body_string):
- try:
- return json.loads(body_string)
- except ValueError:
- raise exceptions.ResponseFormatError()
-
- def _time_request(self, url, method, **kwargs):
- start_time = time.time()
- resp, body = self.request(url, method, **kwargs)
- self.times.append(("%s %s" % (method, url),
- start_time, time.time()))
- return resp, body
-
def _cs_request(self, url, method, **kwargs):
- def request():
+ auth_attempts = 0
+ attempts = 0
+ backoff = 1
+ while True:
+ attempts += 1
+ if not self.management_url or not self.auth_token:
+ self.authenticate()
kwargs.setdefault('headers', {})['X-Auth-Token'] = self.auth_token
- if self.tenant:
- kwargs['headers']['X-Auth-Project-Id'] = self.tenant
-
- resp, body = self._time_request(self.service_url + url, method,
- **kwargs)
- return resp, body
-
- if not self.auth_token or not self.service_url:
- self.authenticate()
-
- # Perform the request once. If we get a 401 back then it
- # might be because the auth token expired, so try to
- # re-authenticate and try again. If it still fails, bail.
- try:
- return request()
- except exceptions.Unauthorized, ex:
- self.authenticate()
- return request()
+ if self.projectid:
+ kwargs['headers']['X-Auth-Project-Id'] = self.projectid
+ try:
+ resp, body = self.request(self.management_url + url, method,
+ **kwargs)
+ return resp, body
+ except exceptions.BadRequest as e:
+ if attempts > self.retries:
+ raise
+ except exceptions.Unauthorized:
+ if auth_attempts > 0:
+ raise
+ self._logger.debug("Unauthorized, reauthenticating.")
+ self.management_url = self.auth_token = None
+ # First reauth. Discount this attempt.
+ attempts -= 1
+ auth_attempts += 1
+ continue
+ except exceptions.ClientException as e:
+ if attempts > self.retries:
+ raise
+ if 500 <= e.code <= 599:
+ pass
+ else:
+ raise
+ except requests.exceptions.ConnectionError as e:
+ # Catch a connection refused from requests.request
+ self._logger.debug("Connection refused: %s" % e)
+ msg = 'Unable to establish connection: %s' % e
+ raise exceptions.ConnectionError(msg)
+ self._logger.debug(
+ "Failed attempt(%s of %s), retrying in %s seconds" %
+ (attempts, self.retries, backoff))
+ sleep(backoff)
+ backoff *= 2
def get(self, url, **kwargs):
return self._cs_request(url, 'GET', **kwargs)
@@ -248,124 +214,192 @@ class TroveHTTPClient(httplib2.Http):
def delete(self, url, **kwargs):
return self._cs_request(url, 'DELETE', **kwargs)
- def authenticate(self):
- """Auths the client and gets a token. May optionally set a service url.
-
- The client will get auth errors until the authentication step
- occurs. Additionally, if a service_url was not explicitly given in
- the clients __init__ method, one will be obtained from the auth
- service.
-
+ def _extract_service_catalog(self, url, resp, body, extract_token=True):
+ """See what the auth service told us and process the response.
+ We may get redirected to another site, fail or actually get
+ back a service catalog with a token and our endpoints.
"""
- catalog = self.authenticator.authenticate()
- if self.service_url:
- possible_service_url = None
+
+ if resp.status_code == 200: # content must always present
+ try:
+ self.auth_url = url
+ self.service_catalog = \
+ service_catalog.ServiceCatalog(body)
+
+ if extract_token:
+ self.auth_token = self.service_catalog.get_token()
+
+ management_url = self.service_catalog.url_for(
+ attr='region',
+ filter_value=self.region_name,
+ endpoint_type=self.endpoint_type,
+ service_type=self.service_type,
+ service_name=self.service_name,
+ database_service_name=self.database_service_name)
+ self.management_url = management_url.rstrip('/')
+ return None
+ except exceptions.AmbiguousEndpoints:
+ print("Found more than one valid endpoint. Use a more "
+ "restrictive filter")
+ raise
+ except KeyError:
+ raise exceptions.AuthorizationFailure()
+ except exceptions.EndpointNotFound:
+ print("Could not find any suitable endpoint. Correct region?")
+ raise
+
+ elif resp.status_code == 305:
+ return resp['location']
else:
- if self.endpoint_type == "publicURL":
- possible_service_url = catalog.get_public_url()
- elif self.endpoint_type == "adminURL":
- possible_service_url = catalog.get_management_url()
- self.authenticate_with_token(catalog.get_token(), possible_service_url)
-
- def authenticate_with_token(self, token, service_url=None):
- self.auth_token = token
- if not self.service_url:
- if not service_url:
- raise exceptions.ServiceUrlNotGiven()
- else:
- self.service_url = service_url
-
-
-class Dbaas(object):
- """
- Top-level object to access the Rackspace Database as a Service API.
-
- Create an instance with your creds::
-
- >>> red = Dbaas(USERNAME, API_KEY, TENANT, AUTH_URL, SERVICE_NAME, \
- SERVICE_URL)
-
- Then call methods on its managers::
-
- >>> red.instances.list()
- ...
- >>> red.flavors.list()
- ...
-
- &c.
- """
-
- def __init__(self, username, api_key, tenant=None, auth_url=None,
- service_type='database', service_name=None,
- service_url=None, insecure=False, auth_strategy='keystone',
- region_name=None, client_cls=TroveHTTPClient):
- from troveclient.versions import Versions
- from troveclient.databases import Databases
- from troveclient.flavors import Flavors
- from troveclient.instances import Instances
- from troveclient.limits import Limits
- from troveclient.users import Users
- from troveclient.root import Root
- from troveclient.hosts import Hosts
- from troveclient.quota import Quotas
- from troveclient.backups import Backups
- from troveclient.security_groups import SecurityGroups
- from troveclient.security_groups import SecurityGroupRules
- from troveclient.storage import StorageInfo
- from troveclient.management import Management
- from troveclient.management import MgmtFlavors
- from troveclient.accounts import Accounts
- from troveclient.diagnostics import DiagnosticsInterrogator
- from troveclient.diagnostics import HwInfoInterrogator
-
- self.client = client_cls(username, api_key, tenant, auth_url,
- service_type=service_type,
- service_name=service_name,
- service_url=service_url,
- insecure=insecure,
- auth_strategy=auth_strategy,
- region_name=region_name)
- self.versions = Versions(self)
- self.databases = Databases(self)
- self.flavors = Flavors(self)
- self.instances = Instances(self)
- self.limits = Limits(self)
- self.users = Users(self)
- self.root = Root(self)
- self.hosts = Hosts(self)
- self.quota = Quotas(self)
- self.backups = Backups(self)
- self.security_groups = SecurityGroups(self)
- self.security_group_rules = SecurityGroupRules(self)
- self.storage = StorageInfo(self)
- self.management = Management(self)
- self.mgmt_flavor = MgmtFlavors(self)
- self.accounts = Accounts(self)
- self.diagnostics = DiagnosticsInterrogator(self)
- self.hwinfo = HwInfoInterrogator(self)
-
- class Mgmt(object):
- def __init__(self, dbaas):
- self.instances = dbaas.management
- self.hosts = dbaas.hosts
- self.accounts = dbaas.accounts
- self.storage = dbaas.storage
-
- self.mgmt = Mgmt(self)
-
- def set_management_url(self, url):
- self.client.management_url = url
-
- def get_timings(self):
- return self.client.get_timings()
+ raise exceptions.from_response(resp, body, url)
- def authenticate(self):
- """
- Authenticate against the server.
+ def _fetch_endpoints_from_auth(self, url):
+ """We have a token, but don't know the final endpoint for
+ the region. We have to go back to the auth service and
+ ask again. This request requires an admin-level token
+ to work. The proxy token supplied could be from a low-level enduser.
- This is called to perform an authentication to retrieve a token.
+ We can't get this from the keystone service endpoint, we have to use
+ the admin endpoint.
- Returns on success; raises :exc:`exceptions.Unauthorized` if the
- credentials are wrong.
+ This will overwrite our admin token with the user token.
"""
- self.client.authenticate()
+
+ # GET ...:5001/v2.0/tokens/#####/endpoints
+ url = '/'.join([url, 'tokens', '%s?belongsTo=%s'
+ % (self.proxy_token, self.proxy_tenant_id)])
+ self._logger.debug("Using Endpoint URL: %s" % url)
+ resp, body = self.request(url, "GET",
+ headers={'X-Auth-Token': self.auth_token})
+ return self._extract_service_catalog(url, resp, body,
+ extract_token=False)
+
+ def authenticate(self):
+ magic_tuple = urlparse.urlsplit(self.auth_url)
+ scheme, netloc, path, query, frag = magic_tuple
+ port = magic_tuple.port
+ if port is None:
+ port = 80
+ path_parts = path.split('/')
+ for part in path_parts:
+ if len(part) > 0 and part[0] == 'v':
+ self.version = part
+ break
+
+ # TODO(sandy): Assume admin endpoint is 35357 for now.
+ # Ideally this is going to have to be provided by the service catalog.
+ new_netloc = netloc.replace(':%d' % port, ':%d' % (35357,))
+ admin_url = urlparse.urlunsplit((scheme, new_netloc,
+ path, query, frag))
+
+ auth_url = self.auth_url
+ if self.version == "v2.0":
+ while auth_url:
+ if "TROVE_RAX_AUTH" in os.environ:
+ auth_url = self._rax_auth(auth_url)
+ else:
+ auth_url = self._v2_auth(auth_url)
+
+ # Are we acting on behalf of another user via an
+ # existing token? If so, our actual endpoints may
+ # be different than that of the admin token.
+ if self.proxy_token:
+ self._fetch_endpoints_from_auth(admin_url)
+ # Since keystone no longer returns the user token
+ # with the endpoints any more, we need to replace
+ # our service account token with the user token.
+ self.auth_token = self.proxy_token
+ else:
+ try:
+ while auth_url:
+ auth_url = self._v1_auth(auth_url)
+ # In some configurations trove makes redirection to
+ # v2.0 keystone endpoint. Also, new location does not contain
+ # real endpoint, only hostname and port.
+ except exceptions.AuthorizationFailure:
+ if auth_url.find('v2.0') < 0:
+ auth_url = auth_url + '/v2.0'
+ self._v2_auth(auth_url)
+
+ def _v1_auth(self, url):
+ if self.proxy_token:
+ raise exceptions.NoTokenLookupException()
+
+ headers = {'X-Auth-User': self.user,
+ 'X-Auth-Key': self.password}
+ if self.projectid:
+ headers['X-Auth-Project-Id'] = self.projectid
+
+ resp, body = self.request(url, 'GET', headers=headers)
+ if resp.status_code in (200, 204): # in some cases we get No Content
+ try:
+ mgmt_header = 'x-server-management-url'
+ self.management_url = resp.headers[mgmt_header].rstrip('/')
+ self.auth_token = resp.headers['x-auth-token']
+ self.auth_url = url
+ except (KeyError, TypeError):
+ raise exceptions.AuthorizationFailure()
+ elif resp.status_code == 305:
+ return resp.headers['location']
+ else:
+ raise exceptions.from_response(resp, body, url)
+
+ def _v2_auth(self, url):
+ """Authenticate against a v2.0 auth service."""
+ body = {"auth": {
+ "passwordCredentials": {"username": self.user,
+ "password": self.password}}}
+
+ if self.projectid:
+ body['auth']['tenantName'] = self.projectid
+ elif self.tenant_id:
+ body['auth']['tenantId'] = self.tenant_id
+
+ self._authenticate(url, body)
+
+ def _rax_auth(self, url):
+ """Authenticate against the Rackspace auth service."""
+ body = {"auth": {
+ "RAX-KSKEY:apiKeyCredentials": {
+ "username": self.user,
+ "apiKey": self.password,
+ "tenantName": self.projectid}}}
+
+ self._authenticate(url, body)
+
+ def _authenticate(self, url, body):
+ """Authenticate and extract the service catalog."""
+ token_url = url + "/tokens"
+
+ # Make sure we follow redirects when trying to reach Keystone
+ resp, body = self.request(
+ token_url,
+ "POST",
+ body=body,
+ allow_redirects=True)
+
+ return self._extract_service_catalog(url, resp, body)
+
+ def get_database_api_version_from_endpoint(self):
+ magic_tuple = urlparse.urlsplit(self.management_url)
+ scheme, netloc, path, query, frag = magic_tuple
+ v = path.split("/")[1]
+ valid_versions = ['v1.0']
+ if v not in valid_versions:
+ msg = "Invalid client version '%s'. must be one of: %s" % (
+ (v, ', '.join(valid_versions)))
+ raise exceptions.UnsupportedVersion(msg)
+ return v[1:]
+
+
+def get_version_map():
+ return {
+ '1.0': 'troveclient.v1.client.Client',
+ }
+
+
+def Client(version, *args, **kwargs):
+ version_map = get_version_map()
+ client_class = client.BaseClient.get_class('database',
+ version, version_map)
+ return client_class(*args, **kwargs)