summaryrefslogtreecommitdiff
path: root/docker/api/image.py
diff options
context:
space:
mode:
authorJoffrey F <joffrey@docker.com>2015-09-10 16:01:14 -0700
committerJoffrey F <joffrey@docker.com>2015-09-11 16:19:29 -0700
commitc735cc69a02f6a34417a24d37a7934bc235bc99e (patch)
treea74f3459647457dd06f7aa52d74662199da9ea65 /docker/api/image.py
parentde5aab87b419ce24c7e3e84225e9a06b9e880300 (diff)
downloaddocker-py-c735cc69a02f6a34417a24d37a7934bc235bc99e.tar.gz
Split different APIs into several mixins for clarity / easier maintenance
Signed-off-by: Joffrey F <joffrey@docker.com>
Diffstat (limited to 'docker/api/image.py')
-rw-r--r--docker/api/image.py271
1 files changed, 271 insertions, 0 deletions
diff --git a/docker/api/image.py b/docker/api/image.py
new file mode 100644
index 0000000..c6939ef
--- /dev/null
+++ b/docker/api/image.py
@@ -0,0 +1,271 @@
+import logging
+import six
+import warnings
+
+from ..auth import auth
+from ..constants import INSECURE_REGISTRY_DEPRECATION_WARNING
+from ..utils import utils, check_resource
+from .. import errors
+
+log = logging.getLogger(__name__)
+
+
+class ImageApiMixin(object):
+
+ @check_resource
+ def get_image(self, image):
+ res = self._get(self._url("/images/{0}/get", image), stream=True)
+ self._raise_for_status(res)
+ return res.raw
+
+ @check_resource
+ def history(self, image):
+ res = self._get(self._url("/images/{0}/history", image))
+ return self._result(res, True)
+
+ def images(self, name=None, quiet=False, all=False, viz=False,
+ filters=None):
+ if viz:
+ if utils.compare_version('1.7', self._version) >= 0:
+ raise Exception('Viz output is not supported in API >= 1.7!')
+ return self._result(self._get(self._url("images/viz")))
+ params = {
+ 'filter': name,
+ 'only_ids': 1 if quiet else 0,
+ 'all': 1 if all else 0,
+ }
+ if filters:
+ params['filters'] = utils.convert_filters(filters)
+ res = self._result(self._get(self._url("/images/json"), params=params),
+ True)
+ if quiet:
+ return [x['Id'] for x in res]
+ return res
+
+ def import_image(self, src=None, repository=None, tag=None, image=None):
+ if src:
+ if isinstance(src, six.string_types):
+ try:
+ result = self.import_image_from_file(
+ src, repository=repository, tag=tag)
+ except IOError:
+ result = self.import_image_from_url(
+ src, repository=repository, tag=tag)
+ else:
+ result = self.import_image_from_data(
+ src, repository=repository, tag=tag)
+ elif image:
+ result = self.import_image_from_image(
+ image, repository=repository, tag=tag)
+ else:
+ raise Exception("Must specify a src or image")
+
+ return result
+
+ def import_image_from_data(self, data, repository=None, tag=None):
+ u = self._url("/images/create")
+ params = {
+ 'fromSrc': '-',
+ 'repo': repository,
+ 'tag': tag
+ }
+ headers = {
+ 'Content-Type': 'application/tar',
+ }
+ return self._result(
+ self._post(u, data=data, params=params, headers=headers))
+
+ def import_image_from_file(self, filename, repository=None, tag=None):
+ u = self._url("/images/create")
+ params = {
+ 'fromSrc': '-',
+ 'repo': repository,
+ 'tag': tag
+ }
+ headers = {
+ 'Content-Type': 'application/tar',
+ }
+ with open(filename, 'rb') as f:
+ return self._result(
+ self._post(u, data=f, params=params, headers=headers,
+ timeout=None))
+
+ def import_image_from_stream(self, stream, repository=None, tag=None):
+ u = self._url("/images/create")
+ params = {
+ 'fromSrc': '-',
+ 'repo': repository,
+ 'tag': tag
+ }
+ headers = {
+ 'Content-Type': 'application/tar',
+ 'Transfer-Encoding': 'chunked',
+ }
+ return self._result(
+ self._post(u, data=stream, params=params, headers=headers))
+
+ def import_image_from_url(self, url, repository=None, tag=None):
+ u = self._url("/images/create")
+ params = {
+ 'fromSrc': url,
+ 'repo': repository,
+ 'tag': tag
+ }
+ return self._result(
+ self._post(u, data=None, params=params))
+
+ def import_image_from_image(self, image, repository=None, tag=None):
+ u = self._url("/images/create")
+ params = {
+ 'fromImage': image,
+ 'repo': repository,
+ 'tag': tag
+ }
+ return self._result(
+ self._post(u, data=None, params=params))
+
+ @check_resource
+ def insert(self, image, url, path):
+ if utils.compare_version('1.12', self._version) >= 0:
+ raise errors.DeprecatedMethod(
+ 'insert is not available for API version >=1.12'
+ )
+ api_url = self._url("/images/{0}/insert", image)
+ params = {
+ 'url': url,
+ 'path': path
+ }
+ return self._result(self._post(api_url, params=params))
+
+ @check_resource
+ def inspect_image(self, image):
+ return self._result(
+ self._get(self._url("/images/{0}/json", image)), True
+ )
+
+ def load_image(self, data):
+ res = self._post(self._url("/images/load"), data=data)
+ self._raise_for_status(res)
+
+ def pull(self, repository, tag=None, stream=False,
+ insecure_registry=False, auth_config=None):
+ if insecure_registry:
+ warnings.warn(
+ INSECURE_REGISTRY_DEPRECATION_WARNING.format('pull()'),
+ DeprecationWarning
+ )
+
+ if not tag:
+ repository, tag = utils.parse_repository_tag(repository)
+ registry, repo_name = auth.resolve_repository_name(repository)
+ if repo_name.count(":") == 1:
+ repository, tag = repository.rsplit(":", 1)
+
+ params = {
+ 'tag': tag,
+ 'fromImage': repository
+ }
+ headers = {}
+
+ if utils.compare_version('1.5', self._version) >= 0:
+ # If we don't have any auth data so far, try reloading the config
+ # file one more time in case anything showed up in there.
+ if auth_config is None:
+ log.debug('Looking for auth config')
+ if not self._auth_configs:
+ log.debug(
+ "No auth config in memory - loading from filesystem")
+ self._auth_configs = auth.load_config()
+ authcfg = auth.resolve_authconfig(self._auth_configs, registry)
+ # Do not fail here if no authentication exists for this
+ # specific registry as we can have a readonly pull. Just
+ # put the header if we can.
+ if authcfg:
+ log.debug('Found auth config')
+ # auth_config needs to be a dict in the format used by
+ # auth.py username , password, serveraddress, email
+ headers['X-Registry-Auth'] = auth.encode_header(
+ authcfg
+ )
+ else:
+ log.debug('No auth config found')
+ else:
+ log.debug('Sending supplied auth config')
+ headers['X-Registry-Auth'] = auth.encode_header(auth_config)
+
+ response = self._post(
+ self._url('/images/create'), params=params, headers=headers,
+ stream=stream, timeout=None
+ )
+
+ self._raise_for_status(response)
+
+ if stream:
+ return self._stream_helper(response)
+
+ return self._result(response)
+
+ def push(self, repository, tag=None, stream=False,
+ insecure_registry=False):
+ if insecure_registry:
+ warnings.warn(
+ INSECURE_REGISTRY_DEPRECATION_WARNING.format('push()'),
+ DeprecationWarning
+ )
+
+ if not tag:
+ repository, tag = utils.parse_repository_tag(repository)
+ registry, repo_name = auth.resolve_repository_name(repository)
+ u = self._url("/images/{0}/push", repository)
+ params = {
+ 'tag': tag
+ }
+ headers = {}
+
+ if utils.compare_version('1.5', self._version) >= 0:
+ # If we don't have any auth data so far, try reloading the config
+ # file one more time in case anything showed up in there.
+ if not self._auth_configs:
+ self._auth_configs = auth.load_config()
+ authcfg = auth.resolve_authconfig(self._auth_configs, registry)
+
+ # Do not fail here if no authentication exists for this specific
+ # registry as we can have a readonly pull. Just put the header if
+ # we can.
+ if authcfg:
+ headers['X-Registry-Auth'] = auth.encode_header(authcfg)
+
+ response = self._post_json(
+ u, None, headers=headers, stream=stream, params=params
+ )
+
+ self._raise_for_status(response)
+
+ if stream:
+ return self._stream_helper(response)
+
+ return self._result(response)
+
+ @check_resource
+ def remove_image(self, image, force=False, noprune=False):
+ params = {'force': force, 'noprune': noprune}
+ res = self._delete(self._url("/images/{0}", image), params=params)
+ self._raise_for_status(res)
+
+ def search(self, term):
+ return self._result(
+ self._get(self._url("/images/search"), params={'term': term}),
+ True
+ )
+
+ @check_resource
+ def tag(self, image, repository, tag=None, force=False):
+ params = {
+ 'tag': tag,
+ 'repo': repository,
+ 'force': 1 if force else 0
+ }
+ url = self._url("/images/{0}/tag", image)
+ res = self._post(url, params=params)
+ self._raise_for_status(res)
+ return res.status_code == 201