summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-04-09 18:37:09 +0100
committerMatthew Sackman <matthew@lshift.net>2010-04-09 18:37:09 +0100
commit5fd2883bb3e196b1682e399d08ba31753c980056 (patch)
tree80c2356eedf85082e709c6b600e7944c94914446 /src
parente8983d647d5fc4d817a2e108679fb6cfee35b03d (diff)
parentfa0aa0302e26fd45294c52e00c5d773828ed8fc1 (diff)
downloadrabbitmq-server-git-5fd2883bb3e196b1682e399d08ba31753c980056.tar.gz
Merging default into bug 21673
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_basic.erl42
-rw-r--r--src/rabbit_channel.erl19
-rw-r--r--src/rabbit_tests.erl8
3 files changed, 40 insertions, 29 deletions
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 8241e93b7b..8c4ba89729 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -33,9 +33,10 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([publish/1, message/4, message/5, message/6, delivery/4]).
+-export([publish/1, message/4, delivery/4]).
-export([properties/1, publish/4, publish/7]).
-export([build_content/2, from_content/1]).
+-export([is_message_persistent/1]).
%%----------------------------------------------------------------------------
@@ -48,11 +49,7 @@
-spec(delivery/4 :: (boolean(), boolean(), maybe(txn()), message()) ->
delivery()).
-spec(message/4 :: (exchange_name(), routing_key(), properties_input(),
- binary()) -> message()).
--spec(message/5 :: (exchange_name(), routing_key(), properties_input(),
- binary(), guid()) -> message()).
--spec(message/6 :: (exchange_name(), routing_key(), properties_input(),
- binary(), guid(), boolean()) -> message()).
+ binary()) -> (message() | {'error', any()})).
-spec(properties/1 :: (properties_input()) -> amqp_properties()).
-spec(publish/4 :: (exchange_name(), routing_key(), properties_input(),
binary()) -> publish_result()).
@@ -61,6 +58,8 @@
publish_result()).
-spec(build_content/2 :: (amqp_properties(), binary()) -> content()).
-spec(from_content/1 :: (content()) -> {amqp_properties(), binary()}).
+-spec(is_message_persistent/1 ::
+ (decoded_content()) -> (boolean() | {'invalid', non_neg_integer()})).
-endif.
@@ -96,18 +95,18 @@ from_content(Content) ->
{Props, list_to_binary(lists:reverse(FragmentsRev))}.
message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) ->
- message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin, rabbit_guid:guid()).
-
-message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin, MsgId) ->
- message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin, MsgId, false).
-
-message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin, MsgId, IsPersistent) ->
Properties = properties(RawProperties),
- #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKeyBin,
- content = build_content(Properties, BodyBin),
- guid = MsgId,
- is_persistent = IsPersistent}.
+ Content = build_content(Properties, BodyBin),
+ case is_message_persistent(Content) of
+ {invalid, Other} ->
+ {error, {invalid_delivery_mode, Other}};
+ IsPersistent when is_boolean(IsPersistent) ->
+ #basic_message{exchange_name = ExchangeName,
+ routing_key = RoutingKeyBin,
+ content = Content,
+ guid = rabbit_guid:guid(),
+ is_persistent = IsPersistent}
+ end.
properties(P = #'P_basic'{}) ->
P;
@@ -141,3 +140,12 @@ publish(ExchangeName, RoutingKeyBin, Mandatory, Immediate, Txn, Properties,
publish(delivery(Mandatory, Immediate, Txn,
message(ExchangeName, RoutingKeyBin,
properties(Properties), BodyBin))).
+
+is_message_persistent(#content{properties = #'P_basic'{
+ delivery_mode = Mode}}) ->
+ case Mode of
+ 1 -> false;
+ 2 -> true;
+ undefined -> false;
+ Other -> {invalid, Other}
+ end.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 51eb93c5cf..9aeb4623f1 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1011,16 +1011,15 @@ notify_limiter(LimiterPid, Acked) ->
Count -> rabbit_limiter:ack(LimiterPid, Count)
end.
-is_message_persistent(#content{properties = #'P_basic'{
- delivery_mode = Mode}}) ->
- case Mode of
- 1 -> false;
- 2 -> true;
- undefined -> false;
- Other -> rabbit_log:warning("Unknown delivery mode ~p - "
- "treating as 1, non-persistent~n",
- [Other]),
- false
+is_message_persistent(Content) ->
+ case rabbit_basic:is_message_persistent(Content) of
+ {invalid, Other} ->
+ rabbit_log:warning("Unknown delivery mode ~p - "
+ "treating as 1, non-persistent~n",
+ [Other]),
+ false;
+ IsPersistent when is_boolean(IsPersistent) ->
+ IsPersistent
end.
lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index d374561f83..97590e663c 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -41,6 +41,7 @@
-import(lists).
-include("rabbit.hrl").
+-include("rabbit_framing.hrl").
-include("rabbit_variable_queue.hrl").
-include_lib("kernel/include/file.hrl").
@@ -1340,8 +1341,11 @@ variable_queue_publish(IsPersistent, Count, VQ) ->
rabbit_variable_queue:publish(
rabbit_basic:message(
rabbit_misc:r(<<>>, exchange, <<>>),
- <<>>, [], <<>>, rabbit_guid:guid(),
- IsPersistent), VQN)
+ <<>>, #'P_basic'{delivery_mode =
+ case IsPersistent of
+ true -> 2;
+ false -> 1
+ end}, <<>>), VQN)
end, VQ, lists:seq(1, Count)).
variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->