diff options
Diffstat (limited to 'troveclient/client.py')
-rw-r--r-- | troveclient/client.py | 592 |
1 files changed, 313 insertions, 279 deletions
diff --git a/troveclient/client.py b/troveclient/client.py index 87fecea..14ec959 100644 --- a/troveclient/client.py +++ b/troveclient/client.py @@ -13,12 +13,24 @@ # License for the specific language governing permissions and limitations # under the License. -import httplib2 +""" +OpenStack Client interface. Handles the REST calls and responses. +""" + +from __future__ import print_function + import logging import os -import time -import urlparse -import sys + +try: + import urlparse +except ImportError: + import urllib.parse as urlparse + +try: + from eventlet import sleep +except ImportError: + from time import sleep try: import json @@ -30,112 +42,67 @@ if not hasattr(urlparse, 'parse_qsl'): import cgi urlparse.parse_qsl = cgi.parse_qsl -from troveclient import auth -from troveclient import exceptions - +import requests -_logger = logging.getLogger(__name__) -RDC_PP = os.environ.get("RDC_PP", "False") == "True" +from troveclient.openstack.common.apiclient import exceptions +from troveclient import service_catalog +from troveclient import utils +from troveclient.openstack.common.apiclient import client -expected_errors = (400, 401, 403, 404, 408, 409, 413, 422, 500, 501) - - -def log_to_streamhandler(stream=None): - stream = stream or sys.stderr - ch = logging.StreamHandler(stream) - _logger.setLevel(logging.DEBUG) - _logger.addHandler(ch) - - -if 'REDDWARFCLIENT_DEBUG' in os.environ and os.environ['REDDWARFCLIENT_DEBUG']: - log_to_streamhandler() - - -class TroveHTTPClient(httplib2.Http): +class HTTPClient(object): USER_AGENT = 'python-troveclient' - def __init__(self, user, password, tenant, auth_url, service_name, - service_url=None, - auth_strategy=None, insecure=False, - timeout=None, proxy_tenant_id=None, + def __init__(self, user, password, projectid, auth_url, insecure=False, + timeout=None, tenant_id=None, proxy_tenant_id=None, proxy_token=None, region_name=None, endpoint_type='publicURL', service_type=None, - timings=False): - - super(TroveHTTPClient, self).__init__(timeout=timeout) - - self.username = user + service_name=None, database_service_name=None, retries=None, + http_log_debug=False, cacert=None): + self.user = user self.password = password - self.tenant = tenant - if auth_url: - self.auth_url = auth_url.rstrip('/') - else: - self.auth_url = None + self.projectid = projectid + self.tenant_id = tenant_id + self.auth_url = auth_url.rstrip('/') + self.version = 'v1' self.region_name = region_name self.endpoint_type = endpoint_type - self.service_url = service_url self.service_type = service_type self.service_name = service_name - self.timings = timings - - self.times = [] # [("item", starttime, endtime), ...] + self.database_service_name = database_service_name + self.retries = int(retries or 0) + self.http_log_debug = http_log_debug + self.management_url = None self.auth_token = None self.proxy_token = proxy_token self.proxy_tenant_id = proxy_tenant_id + self.timeout = timeout - # httplib2 overrides - self.force_exception_to_status_code = True - self.disable_ssl_certificate_validation = insecure - - auth_cls = auth.get_authenticator_cls(auth_strategy) - - self.authenticator = auth_cls(self, auth_strategy, - self.auth_url, self.username, - self.password, self.tenant, - region=region_name, - service_type=service_type, - service_name=service_name, - service_url=service_url) - - def get_timings(self): - return self.times - - def http_log(self, args, kwargs, resp, body): - if not RDC_PP: - self.simple_log(args, kwargs, resp, body) + if insecure: + self.verify_cert = False else: - self.pretty_log(args, kwargs, resp, body) - - def simple_log(self, args, kwargs, resp, body): - if not _logger.isEnabledFor(logging.DEBUG): - return - - string_parts = ['curl -i'] - for element in args: - if element in ('GET', 'POST'): - string_parts.append(' -X %s' % element) + if cacert: + self.verify_cert = cacert else: - string_parts.append(' %s' % element) - - for element in kwargs['headers']: - header = ' -H "%s: %s"' % (element, kwargs['headers'][element]) - string_parts.append(header) - - _logger.debug("REQ: %s\n" % "".join(string_parts)) - if 'body' in kwargs: - _logger.debug("REQ BODY: %s\n" % (kwargs['body'])) - _logger.debug("RESP:%s %s\n", resp, body) - - def pretty_log(self, args, kwargs, resp, body): - if not _logger.isEnabledFor(logging.DEBUG): + self.verify_cert = True + + self._logger = logging.getLogger(__name__) + if self.http_log_debug and not self._logger.handlers: + ch = logging.StreamHandler() + self._logger.setLevel(logging.DEBUG) + self._logger.addHandler(ch) + if hasattr(requests, 'logging'): + requests.logging.getLogger(requests.__name__).addHandler(ch) + + def http_log_req(self, args, kwargs): + if not self.http_log_debug: return string_parts = ['curl -i'] for element in args: - if element in ('GET', 'POST'): + if element in ('GET', 'POST', 'DELETE', 'PUT'): string_parts.append(' -X %s' % element) else: string_parts.append(' %s' % element) @@ -144,97 +111,96 @@ class TroveHTTPClient(httplib2.Http): header = ' -H "%s: %s"' % (element, kwargs['headers'][element]) string_parts.append(header) - curl_cmd = "".join(string_parts) - _logger.debug("REQUEST:") - if 'body' in kwargs: - _logger.debug("%s -d '%s'" % (curl_cmd, kwargs['body'])) - try: - req_body = json.dumps(json.loads(kwargs['body']), - sort_keys=True, indent=4) - except: - req_body = kwargs['body'] - _logger.debug("BODY: %s\n" % (req_body)) - else: - _logger.debug(curl_cmd) + if 'data' in kwargs: + string_parts.append(" -d '%s'" % (kwargs['data'])) + self._logger.debug("\nREQ: %s\n" % "".join(string_parts)) - try: - resp_body = json.dumps(json.loads(body), sort_keys=True, indent=4) - except: - resp_body = body - _logger.debug("RESPONSE HEADERS: %s" % resp) - _logger.debug("RESPONSE BODY : %s" % resp_body) + def http_log_resp(self, resp): + if not self.http_log_debug: + return + self._logger.debug( + "RESP: [%s] %s\nRESP BODY: %s\n", + resp.status_code, + resp.headers, + resp.text) - def request(self, *args, **kwargs): + def request(self, url, method, **kwargs): kwargs.setdefault('headers', kwargs.get('headers', {})) kwargs['headers']['User-Agent'] = self.USER_AGENT - self.morph_request(kwargs) - - resp, body = super(TroveHTTPClient, self).request(*args, **kwargs) - - # Save this in case anyone wants it. - self.last_response = (resp, body) - self.http_log(args, kwargs, resp, body) - - if body: + kwargs['headers']['Accept'] = 'application/json' + if 'body' in kwargs: + kwargs['headers']['Content-Type'] = 'application/json' + kwargs['data'] = json.dumps(kwargs['body']) + del kwargs['body'] + + if self.timeout: + kwargs.setdefault('timeout', self.timeout) + self.http_log_req((url, method,), kwargs) + resp = requests.request( + method, + url, + verify=self.verify_cert, + **kwargs) + self.http_log_resp(resp) + + if resp.text: try: - body = self.morph_response_body(body) - except exceptions.ResponseFormatError: - # Acceptable only if the response status is an error code. - # Otherwise its the API or client misbehaving. - self.raise_error_from_status(resp, None) - raise # Not accepted! + body = json.loads(resp.text) + except ValueError: + pass + body = None else: body = None - if resp.status in expected_errors: - raise exceptions.from_response(resp, body) + if resp.status_code >= 400: + raise exceptions.from_response(resp, body, url) return resp, body - def raise_error_from_status(self, resp, body): - if resp.status in expected_errors: - raise exceptions.from_response(resp, body) - - def morph_request(self, kwargs): - kwargs['headers']['Accept'] = 'application/json' - kwargs['headers']['Content-Type'] = 'application/json' - if 'body' in kwargs: - kwargs['body'] = json.dumps(kwargs['body']) - - def morph_response_body(self, body_string): - try: - return json.loads(body_string) - except ValueError: - raise exceptions.ResponseFormatError() - - def _time_request(self, url, method, **kwargs): - start_time = time.time() - resp, body = self.request(url, method, **kwargs) - self.times.append(("%s %s" % (method, url), - start_time, time.time())) - return resp, body - def _cs_request(self, url, method, **kwargs): - def request(): + auth_attempts = 0 + attempts = 0 + backoff = 1 + while True: + attempts += 1 + if not self.management_url or not self.auth_token: + self.authenticate() kwargs.setdefault('headers', {})['X-Auth-Token'] = self.auth_token - if self.tenant: - kwargs['headers']['X-Auth-Project-Id'] = self.tenant - - resp, body = self._time_request(self.service_url + url, method, - **kwargs) - return resp, body - - if not self.auth_token or not self.service_url: - self.authenticate() - - # Perform the request once. If we get a 401 back then it - # might be because the auth token expired, so try to - # re-authenticate and try again. If it still fails, bail. - try: - return request() - except exceptions.Unauthorized, ex: - self.authenticate() - return request() + if self.projectid: + kwargs['headers']['X-Auth-Project-Id'] = self.projectid + try: + resp, body = self.request(self.management_url + url, method, + **kwargs) + return resp, body + except exceptions.BadRequest as e: + if attempts > self.retries: + raise + except exceptions.Unauthorized: + if auth_attempts > 0: + raise + self._logger.debug("Unauthorized, reauthenticating.") + self.management_url = self.auth_token = None + # First reauth. Discount this attempt. + attempts -= 1 + auth_attempts += 1 + continue + except exceptions.ClientException as e: + if attempts > self.retries: + raise + if 500 <= e.code <= 599: + pass + else: + raise + except requests.exceptions.ConnectionError as e: + # Catch a connection refused from requests.request + self._logger.debug("Connection refused: %s" % e) + msg = 'Unable to establish connection: %s' % e + raise exceptions.ConnectionError(msg) + self._logger.debug( + "Failed attempt(%s of %s), retrying in %s seconds" % + (attempts, self.retries, backoff)) + sleep(backoff) + backoff *= 2 def get(self, url, **kwargs): return self._cs_request(url, 'GET', **kwargs) @@ -248,124 +214,192 @@ class TroveHTTPClient(httplib2.Http): def delete(self, url, **kwargs): return self._cs_request(url, 'DELETE', **kwargs) - def authenticate(self): - """Auths the client and gets a token. May optionally set a service url. - - The client will get auth errors until the authentication step - occurs. Additionally, if a service_url was not explicitly given in - the clients __init__ method, one will be obtained from the auth - service. - + def _extract_service_catalog(self, url, resp, body, extract_token=True): + """See what the auth service told us and process the response. + We may get redirected to another site, fail or actually get + back a service catalog with a token and our endpoints. """ - catalog = self.authenticator.authenticate() - if self.service_url: - possible_service_url = None + + if resp.status_code == 200: # content must always present + try: + self.auth_url = url + self.service_catalog = \ + service_catalog.ServiceCatalog(body) + + if extract_token: + self.auth_token = self.service_catalog.get_token() + + management_url = self.service_catalog.url_for( + attr='region', + filter_value=self.region_name, + endpoint_type=self.endpoint_type, + service_type=self.service_type, + service_name=self.service_name, + database_service_name=self.database_service_name) + self.management_url = management_url.rstrip('/') + return None + except exceptions.AmbiguousEndpoints: + print("Found more than one valid endpoint. Use a more " + "restrictive filter") + raise + except KeyError: + raise exceptions.AuthorizationFailure() + except exceptions.EndpointNotFound: + print("Could not find any suitable endpoint. Correct region?") + raise + + elif resp.status_code == 305: + return resp['location'] else: - if self.endpoint_type == "publicURL": - possible_service_url = catalog.get_public_url() - elif self.endpoint_type == "adminURL": - possible_service_url = catalog.get_management_url() - self.authenticate_with_token(catalog.get_token(), possible_service_url) - - def authenticate_with_token(self, token, service_url=None): - self.auth_token = token - if not self.service_url: - if not service_url: - raise exceptions.ServiceUrlNotGiven() - else: - self.service_url = service_url - - -class Dbaas(object): - """ - Top-level object to access the Rackspace Database as a Service API. - - Create an instance with your creds:: - - >>> red = Dbaas(USERNAME, API_KEY, TENANT, AUTH_URL, SERVICE_NAME, \ - SERVICE_URL) - - Then call methods on its managers:: - - >>> red.instances.list() - ... - >>> red.flavors.list() - ... - - &c. - """ - - def __init__(self, username, api_key, tenant=None, auth_url=None, - service_type='database', service_name=None, - service_url=None, insecure=False, auth_strategy='keystone', - region_name=None, client_cls=TroveHTTPClient): - from troveclient.versions import Versions - from troveclient.databases import Databases - from troveclient.flavors import Flavors - from troveclient.instances import Instances - from troveclient.limits import Limits - from troveclient.users import Users - from troveclient.root import Root - from troveclient.hosts import Hosts - from troveclient.quota import Quotas - from troveclient.backups import Backups - from troveclient.security_groups import SecurityGroups - from troveclient.security_groups import SecurityGroupRules - from troveclient.storage import StorageInfo - from troveclient.management import Management - from troveclient.management import MgmtFlavors - from troveclient.accounts import Accounts - from troveclient.diagnostics import DiagnosticsInterrogator - from troveclient.diagnostics import HwInfoInterrogator - - self.client = client_cls(username, api_key, tenant, auth_url, - service_type=service_type, - service_name=service_name, - service_url=service_url, - insecure=insecure, - auth_strategy=auth_strategy, - region_name=region_name) - self.versions = Versions(self) - self.databases = Databases(self) - self.flavors = Flavors(self) - self.instances = Instances(self) - self.limits = Limits(self) - self.users = Users(self) - self.root = Root(self) - self.hosts = Hosts(self) - self.quota = Quotas(self) - self.backups = Backups(self) - self.security_groups = SecurityGroups(self) - self.security_group_rules = SecurityGroupRules(self) - self.storage = StorageInfo(self) - self.management = Management(self) - self.mgmt_flavor = MgmtFlavors(self) - self.accounts = Accounts(self) - self.diagnostics = DiagnosticsInterrogator(self) - self.hwinfo = HwInfoInterrogator(self) - - class Mgmt(object): - def __init__(self, dbaas): - self.instances = dbaas.management - self.hosts = dbaas.hosts - self.accounts = dbaas.accounts - self.storage = dbaas.storage - - self.mgmt = Mgmt(self) - - def set_management_url(self, url): - self.client.management_url = url - - def get_timings(self): - return self.client.get_timings() + raise exceptions.from_response(resp, body, url) - def authenticate(self): - """ - Authenticate against the server. + def _fetch_endpoints_from_auth(self, url): + """We have a token, but don't know the final endpoint for + the region. We have to go back to the auth service and + ask again. This request requires an admin-level token + to work. The proxy token supplied could be from a low-level enduser. - This is called to perform an authentication to retrieve a token. + We can't get this from the keystone service endpoint, we have to use + the admin endpoint. - Returns on success; raises :exc:`exceptions.Unauthorized` if the - credentials are wrong. + This will overwrite our admin token with the user token. """ - self.client.authenticate() + + # GET ...:5001/v2.0/tokens/#####/endpoints + url = '/'.join([url, 'tokens', '%s?belongsTo=%s' + % (self.proxy_token, self.proxy_tenant_id)]) + self._logger.debug("Using Endpoint URL: %s" % url) + resp, body = self.request(url, "GET", + headers={'X-Auth-Token': self.auth_token}) + return self._extract_service_catalog(url, resp, body, + extract_token=False) + + def authenticate(self): + magic_tuple = urlparse.urlsplit(self.auth_url) + scheme, netloc, path, query, frag = magic_tuple + port = magic_tuple.port + if port is None: + port = 80 + path_parts = path.split('/') + for part in path_parts: + if len(part) > 0 and part[0] == 'v': + self.version = part + break + + # TODO(sandy): Assume admin endpoint is 35357 for now. + # Ideally this is going to have to be provided by the service catalog. + new_netloc = netloc.replace(':%d' % port, ':%d' % (35357,)) + admin_url = urlparse.urlunsplit((scheme, new_netloc, + path, query, frag)) + + auth_url = self.auth_url + if self.version == "v2.0": + while auth_url: + if "TROVE_RAX_AUTH" in os.environ: + auth_url = self._rax_auth(auth_url) + else: + auth_url = self._v2_auth(auth_url) + + # Are we acting on behalf of another user via an + # existing token? If so, our actual endpoints may + # be different than that of the admin token. + if self.proxy_token: + self._fetch_endpoints_from_auth(admin_url) + # Since keystone no longer returns the user token + # with the endpoints any more, we need to replace + # our service account token with the user token. + self.auth_token = self.proxy_token + else: + try: + while auth_url: + auth_url = self._v1_auth(auth_url) + # In some configurations trove makes redirection to + # v2.0 keystone endpoint. Also, new location does not contain + # real endpoint, only hostname and port. + except exceptions.AuthorizationFailure: + if auth_url.find('v2.0') < 0: + auth_url = auth_url + '/v2.0' + self._v2_auth(auth_url) + + def _v1_auth(self, url): + if self.proxy_token: + raise exceptions.NoTokenLookupException() + + headers = {'X-Auth-User': self.user, + 'X-Auth-Key': self.password} + if self.projectid: + headers['X-Auth-Project-Id'] = self.projectid + + resp, body = self.request(url, 'GET', headers=headers) + if resp.status_code in (200, 204): # in some cases we get No Content + try: + mgmt_header = 'x-server-management-url' + self.management_url = resp.headers[mgmt_header].rstrip('/') + self.auth_token = resp.headers['x-auth-token'] + self.auth_url = url + except (KeyError, TypeError): + raise exceptions.AuthorizationFailure() + elif resp.status_code == 305: + return resp.headers['location'] + else: + raise exceptions.from_response(resp, body, url) + + def _v2_auth(self, url): + """Authenticate against a v2.0 auth service.""" + body = {"auth": { + "passwordCredentials": {"username": self.user, + "password": self.password}}} + + if self.projectid: + body['auth']['tenantName'] = self.projectid + elif self.tenant_id: + body['auth']['tenantId'] = self.tenant_id + + self._authenticate(url, body) + + def _rax_auth(self, url): + """Authenticate against the Rackspace auth service.""" + body = {"auth": { + "RAX-KSKEY:apiKeyCredentials": { + "username": self.user, + "apiKey": self.password, + "tenantName": self.projectid}}} + + self._authenticate(url, body) + + def _authenticate(self, url, body): + """Authenticate and extract the service catalog.""" + token_url = url + "/tokens" + + # Make sure we follow redirects when trying to reach Keystone + resp, body = self.request( + token_url, + "POST", + body=body, + allow_redirects=True) + + return self._extract_service_catalog(url, resp, body) + + def get_database_api_version_from_endpoint(self): + magic_tuple = urlparse.urlsplit(self.management_url) + scheme, netloc, path, query, frag = magic_tuple + v = path.split("/")[1] + valid_versions = ['v1.0'] + if v not in valid_versions: + msg = "Invalid client version '%s'. must be one of: %s" % ( + (v, ', '.join(valid_versions))) + raise exceptions.UnsupportedVersion(msg) + return v[1:] + + +def get_version_map(): + return { + '1.0': 'troveclient.v1.client.Client', + } + + +def Client(version, *args, **kwargs): + version_map = get_version_map() + client_class = client.BaseClient.get_class('database', + version, version_map) + return client_class(*args, **kwargs) |