summaryrefslogtreecommitdiff
path: root/deps/rabbitmq_aws/src/rabbitmq_aws.erl
diff options
context:
space:
mode:
Diffstat (limited to 'deps/rabbitmq_aws/src/rabbitmq_aws.erl')
-rw-r--r--deps/rabbitmq_aws/src/rabbitmq_aws.erl472
1 files changed, 472 insertions, 0 deletions
diff --git a/deps/rabbitmq_aws/src/rabbitmq_aws.erl b/deps/rabbitmq_aws/src/rabbitmq_aws.erl
new file mode 100644
index 0000000000..4a152f3b21
--- /dev/null
+++ b/deps/rabbitmq_aws/src/rabbitmq_aws.erl
@@ -0,0 +1,472 @@
+%% ====================================================================
+%% @author Gavin M. Roy <gavinmroy@gmail.com>
+%% @copyright 2016, Gavin M. Roy
+%% @doc rabbitmq_aws client library
+%% @end
+%% ====================================================================
+-module(rabbitmq_aws).
+
+-behavior(gen_server).
+
+%% API exports
+-export([get/2, get/3,
+ post/4,
+ refresh_credentials/0,
+ request/5, request/6, request/7,
+ set_credentials/2,
+ has_credentials/0,
+ set_region/1]).
+
+%% gen-server exports
+-export([start_link/0,
+ init/1,
+ terminate/2,
+ code_change/3,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2]).
+
+%% Export all for unit tests
+-ifdef(TEST).
+-compile(export_all).
+-endif.
+
+-include("rabbitmq_aws.hrl").
+
+%%====================================================================
+%% exported wrapper functions
+%%====================================================================
+
+-spec get(Service :: string(),
+ Path :: path()) -> result().
+%% @doc Perform a HTTP GET request to the AWS API for the specified service. The
+%% response will automatically be decoded if it is either in JSON or XML
+%% format.
+%% @end
+get(Service, Path) ->
+ get(Service, Path, []).
+
+
+-spec get(Service :: string(),
+ Path :: path(),
+ Headers :: headers()) -> result().
+%% @doc Perform a HTTP GET request to the AWS API for the specified service. The
+%% response will automatically be decoded if it is either in JSON or XML
+%% format.
+%% @end
+get(Service, Path, Headers) ->
+ request(Service, get, Path, "", Headers).
+
+
+-spec post(Service :: string(),
+ Path :: path(),
+ Body :: body(),
+ Headers :: headers()) -> result().
+%% @doc Perform a HTTP Post request to the AWS API for the specified service. The
+%% response will automatically be decoded if it is either in JSON or XML
+%% format.
+%% @end
+post(Service, Path, Body, Headers) ->
+ request(Service, post, Path, Body, Headers).
+
+
+-spec refresh_credentials() -> ok | error.
+%% @doc Manually refresh the credentials from the environment, filesystem or EC2
+%% Instance metadata service.
+%% @end
+refresh_credentials() ->
+ gen_server:call(rabbitmq_aws, refresh_credentials).
+
+
+-spec request(Service :: string(),
+ Method :: method(),
+ Path :: path(),
+ Body :: body(),
+ Headers :: headers()) -> result().
+%% @doc Perform a HTTP request to the AWS API for the specified service. The
+%% response will automatically be decoded if it is either in JSON or XML
+%% format.
+%% @end
+request(Service, Method, Path, Body, Headers) ->
+ gen_server:call(rabbitmq_aws, {request, Service, Method, Headers, Path, Body, [], undefined}).
+
+
+-spec request(Service :: string(),
+ Method :: method(),
+ Path :: path(),
+ Body :: body(),
+ Headers :: headers(),
+ HTTPOptions :: http_options()) -> result().
+%% @doc Perform a HTTP request to the AWS API for the specified service. The
+%% response will automatically be decoded if it is either in JSON or XML
+%% format.
+%% @end
+request(Service, Method, Path, Body, Headers, HTTPOptions) ->
+ gen_server:call(rabbitmq_aws, {request, Service, Method, Headers, Path, Body, HTTPOptions, undefined}).
+
+
+-spec request(Service :: string(),
+ Method :: method(),
+ Path :: path(),
+ Body :: body(),
+ Headers :: headers(),
+ HTTPOptions :: http_options(),
+ Endpoint :: host()) -> result().
+%% @doc Perform a HTTP request to the AWS API for the specified service, overriding
+%% the endpoint URL to use when invoking the API. This is useful for local testing
+%% of services such as DynamoDB. The response will automatically be decoded
+%% if it is either in JSON or XML format.
+%% @end
+request(Service, Method, Path, Body, Headers, HTTPOptions, Endpoint) ->
+ gen_server:call(rabbitmq_aws, {request, Service, Method, Headers, Path, Body, HTTPOptions, Endpoint}).
+
+
+-spec set_credentials(access_key(), secret_access_key()) -> ok.
+%% @doc Manually set the access credentials for requests. This should
+%% be used in cases where the client application wants to control
+%% the credentials instead of automatically discovering them from
+%% configuration or the AWS Instance Metadata service.
+%% @end
+set_credentials(AccessKey, SecretAccessKey) ->
+ gen_server:call(rabbitmq_aws, {set_credentials, AccessKey, SecretAccessKey}).
+
+
+-spec set_region(Region :: string()) -> ok.
+%% @doc Manually set the AWS region to perform API requests to.
+%% @end
+set_region(Region) ->
+ gen_server:call(rabbitmq_aws, {set_region, Region}).
+
+
+%%====================================================================
+%% gen_server functions
+%%====================================================================
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+-spec init(list()) -> {ok, state()}.
+init([]) ->
+ {ok, #state{}}.
+
+
+terminate(_, _) ->
+ ok.
+
+
+code_change(_, _, State) ->
+ {ok, State}.
+
+
+handle_call(Msg, _From, #state{region = undefined}) ->
+ %% Delay initialisation until a RabbitMQ plugin require the AWS backend
+ {ok, Region} = rabbitmq_aws_config:region(),
+ {_, State} = load_credentials(#state{region = Region}),
+ handle_msg(Msg, State);
+handle_call(Msg, _From, State) ->
+ handle_msg(Msg, State).
+
+
+handle_cast(_Request, State) ->
+ {noreply, State}.
+
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+%%====================================================================
+%% Internal functions
+%%====================================================================
+handle_msg({request, Service, Method, Headers, Path, Body, Options, Host}, State) ->
+ {Response, NewState} = perform_request(State, Service, Method, Headers, Path, Body, Options, Host),
+ {reply, Response, NewState};
+
+handle_msg(get_state, State) ->
+ {reply, {ok, State}, State};
+
+handle_msg(refresh_credentials, State) ->
+ {Reply, NewState} = load_credentials(State),
+ {reply, Reply, NewState};
+
+handle_msg({set_credentials, AccessKey, SecretAccessKey}, State) ->
+ {reply, ok, State#state{access_key = AccessKey,
+ secret_access_key = SecretAccessKey,
+ security_token = undefined,
+ expiration = undefined,
+ error = undefined}};
+
+handle_msg({set_region, Region}, State) ->
+ {reply, ok, State#state{region = Region}};
+
+handle_msg(has_credentials, State) ->
+ {reply, has_credentials(State), State};
+
+handle_msg(_Request, State) ->
+ {noreply, State}.
+
+-spec endpoint(State :: state(), Host :: string(),
+ Service :: string(), Path :: string()) -> string().
+%% @doc Return the endpoint URL, either by constructing it with the service
+%% information passed in or by using the passed in Host value.
+%% @ednd
+endpoint(#state{region = Region}, undefined, Service, Path) ->
+ lists:flatten(["https://", endpoint_host(Region, Service), Path]);
+endpoint(_, Host, _, Path) ->
+ lists:flatten(["https://", Host, Path]).
+
+
+-spec endpoint_host(Region :: region(), Service :: string()) -> host().
+%% @doc Construct the endpoint hostname for the request based upon the service
+%% and region.
+%% @end
+endpoint_host(Region, Service) ->
+ lists:flatten(string:join([Service, Region, endpoint_tld(Region)], ".")).
+
+
+-spec endpoint_tld(Region :: region()) -> host().
+%% @doc Construct the endpoint hostname TLD for the request based upon the region.
+%% See https://docs.aws.amazon.com/general/latest/gr/rande.html#ec2_region for details.
+%% @end
+endpoint_tld("cn-north-1") ->
+ "amazonaws.com.cn";
+endpoint_tld("cn-northwest-1") ->
+ "amazonaws.com.cn";
+endpoint_tld(_Other) ->
+ "amazonaws.com".
+
+-spec format_response(Response :: httpc_result()) -> result().
+%% @doc Format the httpc response result, returning the request result data
+%% structure. The response body will attempt to be decoded by invoking the
+%% maybe_decode_body/2 method.
+%% @end
+format_response({ok, {{_Version, 200, _Message}, Headers, Body}}) ->
+ {ok, {Headers, maybe_decode_body(get_content_type(Headers), Body)}};
+format_response({ok, {{_Version, StatusCode, Message}, Headers, Body}}) when StatusCode >= 400 ->
+ {error, Message, {Headers, maybe_decode_body(get_content_type(Headers), Body)}};
+format_response({error, Reason}) ->
+ {error, Reason, undefined}.
+
+-spec get_content_type(Headers :: headers()) -> {Type :: string(), Subtype :: string()}.
+%% @doc Fetch the content type from the headers and return it as a tuple of
+%% {Type, Subtype}.
+%% @end
+get_content_type(Headers) ->
+ Value = case proplists:get_value("content-type", Headers, undefined) of
+ undefined ->
+ proplists:get_value("Content-Type", Headers, "text/xml");
+ Other -> Other
+ end,
+ parse_content_type(Value).
+
+-spec has_credentials() -> true | false.
+has_credentials() ->
+ gen_server:call(rabbitmq_aws, has_credentials).
+
+-spec has_credentials(state()) -> true | false.
+%% @doc check to see if there are credentials made available in the current state
+%% returning false if not or if they have expired.
+%% @end
+has_credentials(#state{error = Error}) when Error /= undefined -> false;
+has_credentials(#state{access_key = Key}) when Key /= undefined -> true;
+has_credentials(_) -> false.
+
+
+-spec expired_credentials(Expiration :: calendar:datetime()) -> true | false.
+%% @doc Indicates if the date that is passed in has expired.
+%% end
+expired_credentials(undefined) -> false;
+expired_credentials(Expiration) ->
+ Now = calendar:datetime_to_gregorian_seconds(local_time()),
+ Expires = calendar:datetime_to_gregorian_seconds(Expiration),
+ Now >= Expires.
+
+
+-spec load_credentials(State :: state()) -> {ok, state()} | {error, state()}.
+%% @doc Load the credentials using the following order of configuration precedence:
+%% - Environment variables
+%% - Credentials file
+%% - EC2 Instance Metadata Service
+%% @end
+load_credentials(#state{region = Region}) ->
+ case rabbitmq_aws_config:credentials() of
+ {ok, AccessKey, SecretAccessKey, Expiration, SecurityToken} ->
+ {ok, #state{region = Region,
+ error = undefined,
+ access_key = AccessKey,
+ secret_access_key = SecretAccessKey,
+ expiration = Expiration,
+ security_token = SecurityToken}};
+ {error, Reason} ->
+ error_logger:error_msg("Could not load AWS credentials from environment variables, AWS_CONFIG_FILE, AWS_SHARED_CREDENTIALS_FILE or EC2 metadata endpoint: ~p. Will depend on config settings to be set.~n.", [Reason]),
+ {error, #state{region = Region,
+ error = Reason,
+ access_key = undefined,
+ secret_access_key = undefined,
+ expiration = undefined,
+ security_token = undefined}}
+ end.
+
+
+-spec local_time() -> calendar:datetime().
+%% @doc Return the current local time.
+%% @end
+local_time() ->
+ [Value] = calendar:local_time_to_universal_time_dst(calendar:local_time()),
+ Value.
+
+
+-spec maybe_decode_body(ContentType :: {nonempty_string(), nonempty_string()}, Body :: body()) -> list() | body().
+%% @doc Attempt to decode the response body based upon the mime type that is
+%% presented.
+%% @end.
+maybe_decode_body({"application", "x-amz-json-1.0"}, Body) ->
+ rabbitmq_aws_json:decode(Body);
+maybe_decode_body({"application", "json"}, Body) ->
+ rabbitmq_aws_json:decode(Body);
+maybe_decode_body({_, "xml"}, Body) ->
+ rabbitmq_aws_xml:parse(Body);
+maybe_decode_body(_ContentType, Body) ->
+ Body.
+
+
+-spec parse_content_type(ContentType :: string()) -> {Type :: string(), Subtype :: string()}.
+%% @doc parse a content type string returning a tuple of type/subtype
+%% @end
+parse_content_type(ContentType) ->
+ Parts = string:tokens(ContentType, ";"),
+ [Type, Subtype] = string:tokens(lists:nth(1, Parts), "/"),
+ {Type, Subtype}.
+
+
+-spec perform_request(State :: state(), Service :: string(), Method :: method(),
+ Headers :: headers(), Path :: path(), Body :: body(),
+ Options :: http_options(), Host :: string() | undefined)
+ -> {Result :: result(), NewState :: state()}.
+%% @doc Make the API request and return the formatted response.
+%% @end
+perform_request(State, Service, Method, Headers, Path, Body, Options, Host) ->
+ perform_request_has_creds(has_credentials(State), State, Service, Method,
+ Headers, Path, Body, Options, Host).
+
+
+-spec perform_request_has_creds(true | false, State :: state(),
+ Service :: string(), Method :: method(),
+ Headers :: headers(), Path :: path(), Body :: body(),
+ Options :: http_options(), Host :: string() | undefined)
+ -> {Result :: result(), NewState :: state()}.
+%% @doc Invoked after checking to see if there are credentials. If there are,
+%% validate they have not or will not expire, performing the request if not,
+%% otherwise return an error result.
+%% @end
+perform_request_has_creds(true, State, Service, Method, Headers, Path, Body, Options, Host) ->
+ perform_request_creds_expired(expired_credentials(State#state.expiration), State,
+ Service, Method, Headers, Path, Body, Options, Host);
+perform_request_has_creds(false, State, _, _, _, _, _, _, _) ->
+ perform_request_creds_error(State).
+
+
+-spec perform_request_creds_expired(true | false, State :: state(),
+ Service :: string(), Method :: method(),
+ Headers :: headers(), Path :: path(), Body :: body(),
+ Options :: http_options(), Host :: string() | undefined)
+ -> {Result :: result(), NewState :: state()}.
+%% @doc Invoked after checking to see if the current credentials have expired.
+%% If they haven't, perform the request, otherwise try and refresh the
+%% credentials before performing the request.
+%% @end
+perform_request_creds_expired(false, State, Service, Method, Headers, Path, Body, Options, Host) ->
+ perform_request_with_creds(State, Service, Method, Headers, Path, Body, Options, Host);
+perform_request_creds_expired(true, State, Service, Method, Headers, Path, Body, Options, Host) ->
+ perform_request_creds_refreshed(load_credentials(State), Service, Method, Headers, Path, Body, Options, Host).
+
+
+-spec perform_request_creds_refreshed({ok, State :: state()} | {error, State :: state()},
+ Service :: string(), Method :: method(),
+ Headers :: headers(), Path :: path(), Body :: body(),
+ Options :: http_options(), Host :: string() | undefined)
+ -> {Result :: result(), NewState :: state()}.
+%% @doc If it's been determined that there are credentials but they have expired,
+%% check to see if the credentials could be loaded and either make the request
+%% or return an error.
+%% @end
+perform_request_creds_refreshed({ok, State}, Service, Method, Headers, Path, Body, Options, Host) ->
+ perform_request_with_creds(State, Service, Method, Headers, Path, Body, Options, Host);
+perform_request_creds_refreshed({error, State}, _, _, _, _, _, _, _) ->
+ perform_request_creds_error(State).
+
+
+-spec perform_request_with_creds(State :: state(), Service :: string(), Method :: method(),
+ Headers :: headers(), Path :: path(), Body :: body(),
+ Options :: http_options(), Host :: string() | undefined)
+ -> {Result :: result(), NewState :: state()}.
+%% @doc Once it is validated that there are credentials to try and that they have not
+%% expired, perform the request and return the response.
+%% @end
+perform_request_with_creds(State, Service, Method, Headers, Path, Body, Options, Host) ->
+ URI = endpoint(State, Host, Service, Path),
+ SignedHeaders = sign_headers(State, Service, Method, URI, Headers, Body),
+ ContentType = proplists:get_value("content-type", SignedHeaders, undefined),
+ perform_request_with_creds(State, Method, URI, SignedHeaders, ContentType, Body, Options).
+
+
+-spec perform_request_with_creds(State :: state(), Method :: method(), URI :: string(),
+ Headers :: headers(), ContentType :: string() | undefined,
+ Body :: body(), Options :: http_options())
+ -> {Result :: result(), NewState :: state()}.
+%% @doc Once it is validated that there are credentials to try and that they have not
+%% expired, perform the request and return the response.
+%% @end
+perform_request_with_creds(State, Method, URI, Headers, undefined, "", Options0) ->
+ Options1 = ensure_timeout(Options0),
+ Response = httpc:request(Method, {URI, Headers}, Options1, []),
+ {format_response(Response), State};
+perform_request_with_creds(State, Method, URI, Headers, ContentType, Body, Options0) ->
+ Options1 = ensure_timeout(Options0),
+ Response = httpc:request(Method, {URI, Headers, ContentType, Body}, Options1, []),
+ {format_response(Response), State}.
+
+
+-spec perform_request_creds_error(State :: state()) ->
+ {result_error(), NewState :: state()}.
+%% @doc Return the error response when there are not any credentials to use with
+%% the request.
+%% @end
+perform_request_creds_error(State) ->
+ {{error, {credentials, State#state.error}}, State}.
+
+
+%% @doc Ensure that the timeout option is set and greater than 0 and less
+%% than about 1/2 of the default gen_server:call timeout. This gives
+%% enough time for a long connect and request phase to succeed.
+%% @end
+-spec ensure_timeout(Options :: http_options()) -> http_options().
+ensure_timeout(Options) ->
+ case proplists:get_value(timeout, Options) of
+ undefined ->
+ Options ++ [{timeout, ?DEFAULT_HTTP_TIMEOUT}];
+ Value when is_integer(Value) andalso Value >= 0 andalso Value =< ?DEFAULT_HTTP_TIMEOUT ->
+ Options;
+ _ ->
+ Options1 = proplists:delete(timeout, Options),
+ Options1 ++ [{timeout, ?DEFAULT_HTTP_TIMEOUT}]
+ end.
+
+
+-spec sign_headers(State :: state(), Service :: string(), Method :: method(),
+ URI :: string(), Headers :: headers(), Body :: body()) -> headers().
+%% @doc Build the signed headers for the API request.
+%% @end
+sign_headers(#state{access_key = AccessKey,
+ secret_access_key = SecretKey,
+ security_token = SecurityToken,
+ region = Region}, Service, Method, URI, Headers, Body) ->
+ rabbitmq_aws_sign:headers(#request{access_key = AccessKey,
+ secret_access_key = SecretKey,
+ security_token = SecurityToken,
+ region = Region,
+ service = Service,
+ method = Method,
+ uri = URI,
+ headers = Headers,
+ body = Body}).