diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-04-09 18:37:09 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-04-09 18:37:09 +0100 |
| commit | 5fd2883bb3e196b1682e399d08ba31753c980056 (patch) | |
| tree | 80c2356eedf85082e709c6b600e7944c94914446 /src | |
| parent | e8983d647d5fc4d817a2e108679fb6cfee35b03d (diff) | |
| parent | fa0aa0302e26fd45294c52e00c5d773828ed8fc1 (diff) | |
| download | rabbitmq-server-git-5fd2883bb3e196b1682e399d08ba31753c980056.tar.gz | |
Merging default into bug 21673
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_basic.erl | 42 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 19 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 8 |
3 files changed, 40 insertions, 29 deletions
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 8241e93b7b..8c4ba89729 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -33,9 +33,10 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([publish/1, message/4, message/5, message/6, delivery/4]). +-export([publish/1, message/4, delivery/4]). -export([properties/1, publish/4, publish/7]). -export([build_content/2, from_content/1]). +-export([is_message_persistent/1]). %%---------------------------------------------------------------------------- @@ -48,11 +49,7 @@ -spec(delivery/4 :: (boolean(), boolean(), maybe(txn()), message()) -> delivery()). -spec(message/4 :: (exchange_name(), routing_key(), properties_input(), - binary()) -> message()). --spec(message/5 :: (exchange_name(), routing_key(), properties_input(), - binary(), guid()) -> message()). --spec(message/6 :: (exchange_name(), routing_key(), properties_input(), - binary(), guid(), boolean()) -> message()). + binary()) -> (message() | {'error', any()})). -spec(properties/1 :: (properties_input()) -> amqp_properties()). -spec(publish/4 :: (exchange_name(), routing_key(), properties_input(), binary()) -> publish_result()). @@ -61,6 +58,8 @@ publish_result()). -spec(build_content/2 :: (amqp_properties(), binary()) -> content()). -spec(from_content/1 :: (content()) -> {amqp_properties(), binary()}). +-spec(is_message_persistent/1 :: + (decoded_content()) -> (boolean() | {'invalid', non_neg_integer()})). -endif. @@ -96,18 +95,18 @@ from_content(Content) -> {Props, list_to_binary(lists:reverse(FragmentsRev))}. message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) -> - message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin, rabbit_guid:guid()). - -message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin, MsgId) -> - message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin, MsgId, false). - -message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin, MsgId, IsPersistent) -> Properties = properties(RawProperties), - #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKeyBin, - content = build_content(Properties, BodyBin), - guid = MsgId, - is_persistent = IsPersistent}. + Content = build_content(Properties, BodyBin), + case is_message_persistent(Content) of + {invalid, Other} -> + {error, {invalid_delivery_mode, Other}}; + IsPersistent when is_boolean(IsPersistent) -> + #basic_message{exchange_name = ExchangeName, + routing_key = RoutingKeyBin, + content = Content, + guid = rabbit_guid:guid(), + is_persistent = IsPersistent} + end. properties(P = #'P_basic'{}) -> P; @@ -141,3 +140,12 @@ publish(ExchangeName, RoutingKeyBin, Mandatory, Immediate, Txn, Properties, publish(delivery(Mandatory, Immediate, Txn, message(ExchangeName, RoutingKeyBin, properties(Properties), BodyBin))). + +is_message_persistent(#content{properties = #'P_basic'{ + delivery_mode = Mode}}) -> + case Mode of + 1 -> false; + 2 -> true; + undefined -> false; + Other -> {invalid, Other} + end. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 51eb93c5cf..9aeb4623f1 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1011,16 +1011,15 @@ notify_limiter(LimiterPid, Acked) -> Count -> rabbit_limiter:ack(LimiterPid, Count) end. -is_message_persistent(#content{properties = #'P_basic'{ - delivery_mode = Mode}}) -> - case Mode of - 1 -> false; - 2 -> true; - undefined -> false; - Other -> rabbit_log:warning("Unknown delivery mode ~p - " - "treating as 1, non-persistent~n", - [Other]), - false +is_message_persistent(Content) -> + case rabbit_basic:is_message_persistent(Content) of + {invalid, Other} -> + rabbit_log:warning("Unknown delivery mode ~p - " + "treating as 1, non-persistent~n", + [Other]), + false; + IsPersistent when is_boolean(IsPersistent) -> + IsPersistent end. lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index d374561f83..97590e663c 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -41,6 +41,7 @@ -import(lists). -include("rabbit.hrl"). +-include("rabbit_framing.hrl"). -include("rabbit_variable_queue.hrl"). -include_lib("kernel/include/file.hrl"). @@ -1340,8 +1341,11 @@ variable_queue_publish(IsPersistent, Count, VQ) -> rabbit_variable_queue:publish( rabbit_basic:message( rabbit_misc:r(<<>>, exchange, <<>>), - <<>>, [], <<>>, rabbit_guid:guid(), - IsPersistent), VQN) + <<>>, #'P_basic'{delivery_mode = + case IsPersistent of + true -> 2; + false -> 1 + end}, <<>>), VQN) end, VQ, lists:seq(1, Count)). variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> |
