summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_basic.erl30
-rw-r--r--src/rabbit_channel.erl19
2 files changed, 33 insertions, 16 deletions
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 7595d53b60..4ab7a2a0b1 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -36,6 +36,7 @@
-export([publish/1, message/4, properties/1, delivery/4]).
-export([publish/4, publish/7]).
-export([build_content/2, from_content/1]).
+-export([is_message_persistent/1]).
%%----------------------------------------------------------------------------
@@ -48,7 +49,7 @@
-spec(delivery/4 :: (boolean(), boolean(), maybe(txn()), message()) ->
delivery()).
-spec(message/4 :: (exchange_name(), routing_key(), properties_input(),
- binary()) -> message()).
+ binary()) -> (message() | {'error', any()})).
-spec(properties/1 :: (properties_input()) -> amqp_properties()).
-spec(publish/4 :: (exchange_name(), routing_key(), properties_input(),
binary()) -> publish_result()).
@@ -57,6 +58,8 @@
publish_result()).
-spec(build_content/2 :: (amqp_properties(), binary()) -> content()).
-spec(from_content/1 :: (content()) -> {amqp_properties(), binary()}).
+-spec(is_message_persistent/1 ::
+ (decoded_content()) -> (boolean() | {'invalid', non_neg_integer()})).
-endif.
@@ -93,11 +96,17 @@ from_content(Content) ->
message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) ->
Properties = properties(RawProperties),
- #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKeyBin,
- content = build_content(Properties, BodyBin),
- guid = rabbit_guid:guid(),
- is_persistent = false}.
+ Content = build_content(Properties, BodyBin),
+ case is_message_persistent(Content) of
+ {invalid, Other} ->
+ {error, {invalid_delivery_mode, Other}};
+ IsPersistent when is_boolean(IsPersistent) ->
+ #basic_message{exchange_name = ExchangeName,
+ routing_key = RoutingKeyBin,
+ content = Content,
+ guid = rabbit_guid:guid(),
+ is_persistent = IsPersistent}
+ end.
properties(P = #'P_basic'{}) ->
P;
@@ -131,3 +140,12 @@ publish(ExchangeName, RoutingKeyBin, Mandatory, Immediate, Txn, Properties,
publish(delivery(Mandatory, Immediate, Txn,
message(ExchangeName, RoutingKeyBin,
properties(Properties), BodyBin))).
+
+is_message_persistent(#content{properties = #'P_basic'{
+ delivery_mode = Mode}}) ->
+ case Mode of
+ 1 -> false;
+ 2 -> true;
+ undefined -> false;
+ Other -> {invalid, Other}
+ end.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 51eb93c5cf..9aeb4623f1 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1011,16 +1011,15 @@ notify_limiter(LimiterPid, Acked) ->
Count -> rabbit_limiter:ack(LimiterPid, Count)
end.
-is_message_persistent(#content{properties = #'P_basic'{
- delivery_mode = Mode}}) ->
- case Mode of
- 1 -> false;
- 2 -> true;
- undefined -> false;
- Other -> rabbit_log:warning("Unknown delivery mode ~p - "
- "treating as 1, non-persistent~n",
- [Other]),
- false
+is_message_persistent(Content) ->
+ case rabbit_basic:is_message_persistent(Content) of
+ {invalid, Other} ->
+ rabbit_log:warning("Unknown delivery mode ~p - "
+ "treating as 1, non-persistent~n",
+ [Other]),
+ false;
+ IsPersistent when is_boolean(IsPersistent) ->
+ IsPersistent
end.
lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->