diff options
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 45 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 14 |
2 files changed, 41 insertions, 18 deletions
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index bb0ac9733a..9e0eb13f5e 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -114,10 +114,21 @@ init(Queue, IsDurable) -> memory_loss = undefined, prefetcher = undefined }}. size_of_message( - #basic_message { content = #content { payload_fragments_rev = Payload }}) -> - lists:foldl(fun (Frag, SumAcc) -> - SumAcc + size(Frag) - end, 0, Payload). + #basic_message { content = #content { payload_fragments_rev = Payload, + properties_bin = PropsBin }}) + when is_binary(PropsBin) -> + size(PropsBin) + lists:foldl(fun (Frag, SumAcc) -> + SumAcc + size(Frag) + end, 0, Payload). + +ensure_binary_properties(Msg = #basic_message { + content = Content = #content { + properties = Props, + properties_bin = none }}) -> + Msg #basic_message { content = Content #content { + properties_bin = rabbit_framing:encode_properties(Props) }}; +ensure_binary_properties(Msg) -> + Msg. set_storage_mode(Mode, _TxnMessages, State = #mqstate { mode = Mode }) -> {ok, State}; @@ -299,15 +310,16 @@ on_disk(mixed, _IsDurable, _IsPersistent) -> false. publish(Msg = #basic_message { is_persistent = IsPersistent }, State = #mqstate { queue = Q, mode = Mode, is_durable = IsDurable, msg_buf = MsgBuf, length = Length }) -> + Msg1 = ensure_binary_properties(Msg), ok = case on_disk(Mode, IsDurable, IsPersistent) of - true -> rabbit_disk_queue:publish(Q, Msg, false); + true -> rabbit_disk_queue:publish(Q, Msg1, false); false -> ok end, MsgBuf1 = case Mode of disk -> inc_queue_length(MsgBuf, 1); - mixed -> queue:in({Msg, false}, MsgBuf) + mixed -> queue:in({Msg1, false}, MsgBuf) end, - {ok, gain_memory(size_of_message(Msg), + {ok, gain_memory(size_of_message(Msg1), State #mqstate { msg_buf = MsgBuf1, length = Length + 1 })}. @@ -318,15 +330,17 @@ publish_delivered(Msg = #basic_message { guid = MsgId, State = #mqstate { is_durable = IsDurable, queue = Q, length = 0 }) when IsDurable andalso IsPersistent -> - ok = rabbit_disk_queue:publish(Q, Msg, true), - State1 = gain_memory(size_of_message(Msg), State), + Msg1 = ensure_binary_properties(Msg), + ok = rabbit_disk_queue:publish(Q, Msg1, true), + State1 = gain_memory(size_of_message(Msg1), State), %% must call phantom_fetch otherwise the msg remains at the head %% of the queue. This is synchronous, but unavoidable as we need %% the AckTag {MsgId, IsPersistent, true, AckTag, 0} = rabbit_disk_queue:phantom_fetch(Q), {ok, AckTag, State1}; publish_delivered(Msg, State = #mqstate { length = 0 }) -> - {ok, not_on_disk, gain_memory(size_of_message(Msg), State)}. + Msg1 = ensure_binary_properties(Msg), + {ok, not_on_disk, gain_memory(size_of_message(Msg1), State)}. fetch(State = #mqstate { length = 0 }) -> {empty, State}; @@ -390,10 +404,11 @@ maybe_ack(Q, _, _, AckTag) -> remove_diskless(MsgsWithAcks) -> lists:foldl( fun ({Msg, AckTag}, {AccAckTags, AccSize}) -> + Msg1 = ensure_binary_properties(Msg), {case AckTag of not_on_disk -> AccAckTags; _ -> [AckTag | AccAckTags] - end, size_of_message(Msg) + AccSize} + end, size_of_message(Msg1) + AccSize} end, {[], 0}, MsgsWithAcks). ack(MsgsWithAcks, State = #mqstate { queue = Q }) -> @@ -406,11 +421,12 @@ ack(MsgsWithAcks, State = #mqstate { queue = Q }) -> tx_publish(Msg = #basic_message { is_persistent = IsPersistent }, State = #mqstate { mode = Mode, is_durable = IsDurable }) -> + Msg1 = ensure_binary_properties(Msg), ok = case on_disk(Mode, IsDurable, IsPersistent) of - true -> rabbit_disk_queue:tx_publish(Msg); + true -> rabbit_disk_queue:tx_publish(Msg1); false -> ok end, - {ok, gain_memory(size_of_message(Msg), State)}. + {ok, gain_memory(size_of_message(Msg1), State)}. tx_commit(Publishes, MsgsWithAcks, State = #mqstate { mode = Mode, queue = Q, msg_buf = MsgBuf, @@ -441,7 +457,8 @@ tx_rollback(Publishes, lists:foldl( fun (Msg = #basic_message { is_persistent = IsPersistent, guid = MsgId }, {Acc, CSizeAcc}) -> - CSizeAcc1 = CSizeAcc + size_of_message(Msg), + Msg1 = ensure_binary_properties(Msg), + CSizeAcc1 = CSizeAcc + size_of_message(Msg1), {case on_disk(Mode, IsDurable, IsPersistent) of true -> [MsgId | Acc]; _ -> Acc diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 884adbf870..44abdda415 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -856,6 +856,10 @@ rdq_match_message( MsgId, MsgBody, Size) when size(MsgBody) =:= Size -> ok. +rdq_match_messages(#basic_message { guid = MsgId, content = #content { payload_fragments_rev = MsgBody }}, + #basic_message { guid = MsgId, content = #content { payload_fragments_rev = MsgBody }}) -> + ok. + rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) -> Startup = rdq_virgin(), rdq_start(), @@ -1226,9 +1230,10 @@ rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, Mode, CommitOrCancel) - lists:foldl( fun (Msg, {Acc, MS7}) -> Rem = Len1 - (Msg #basic_message.guid) - 1, - {{Msg, false, AckTag, Rem}, MS7a} = + {{Msg1, false, AckTag, Rem}, MS7a} = rabbit_mixed_queue:fetch(MS7), - {[{Msg, AckTag} | Acc], MS7a} + ok = rdq_match_messages(Msg, Msg1), + {[{Msg1, AckTag} | Acc], MS7a} end, {[], MS6}, MsgsA ++ MsgsB), 0 = rabbit_mixed_queue:len(MS8), rabbit_mixed_queue:ack(AckTags, MS8); @@ -1239,9 +1244,10 @@ rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, Mode, CommitOrCancel) - lists:foldl( fun (Msg, {Acc, MS7}) -> Rem = Len0 - (Msg #basic_message.guid) - 1, - {{Msg, false, AckTag, Rem}, MS7a} = + {{Msg1, false, AckTag, Rem}, MS7a} = rabbit_mixed_queue:fetch(MS7), - {[{Msg, AckTag} | Acc], MS7a} + ok = rdq_match_messages(Msg, Msg1), + {[{Msg1, AckTag} | Acc], MS7a} end, {[], MS6}, MsgsA), 0 = rabbit_mixed_queue:len(MS8), rabbit_mixed_queue:ack(AckTags, MS8) |
