summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_mixed_queue.erl45
-rw-r--r--src/rabbit_tests.erl14
2 files changed, 41 insertions, 18 deletions
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index bb0ac9733a..9e0eb13f5e 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -114,10 +114,21 @@ init(Queue, IsDurable) ->
memory_loss = undefined, prefetcher = undefined }}.
size_of_message(
- #basic_message { content = #content { payload_fragments_rev = Payload }}) ->
- lists:foldl(fun (Frag, SumAcc) ->
- SumAcc + size(Frag)
- end, 0, Payload).
+ #basic_message { content = #content { payload_fragments_rev = Payload,
+ properties_bin = PropsBin }})
+ when is_binary(PropsBin) ->
+ size(PropsBin) + lists:foldl(fun (Frag, SumAcc) ->
+ SumAcc + size(Frag)
+ end, 0, Payload).
+
+ensure_binary_properties(Msg = #basic_message {
+ content = Content = #content {
+ properties = Props,
+ properties_bin = none }}) ->
+ Msg #basic_message { content = Content #content {
+ properties_bin = rabbit_framing:encode_properties(Props) }};
+ensure_binary_properties(Msg) ->
+ Msg.
set_storage_mode(Mode, _TxnMessages, State = #mqstate { mode = Mode }) ->
{ok, State};
@@ -299,15 +310,16 @@ on_disk(mixed, _IsDurable, _IsPersistent) -> false.
publish(Msg = #basic_message { is_persistent = IsPersistent }, State =
#mqstate { queue = Q, mode = Mode, is_durable = IsDurable,
msg_buf = MsgBuf, length = Length }) ->
+ Msg1 = ensure_binary_properties(Msg),
ok = case on_disk(Mode, IsDurable, IsPersistent) of
- true -> rabbit_disk_queue:publish(Q, Msg, false);
+ true -> rabbit_disk_queue:publish(Q, Msg1, false);
false -> ok
end,
MsgBuf1 = case Mode of
disk -> inc_queue_length(MsgBuf, 1);
- mixed -> queue:in({Msg, false}, MsgBuf)
+ mixed -> queue:in({Msg1, false}, MsgBuf)
end,
- {ok, gain_memory(size_of_message(Msg),
+ {ok, gain_memory(size_of_message(Msg1),
State #mqstate { msg_buf = MsgBuf1,
length = Length + 1 })}.
@@ -318,15 +330,17 @@ publish_delivered(Msg = #basic_message { guid = MsgId,
State = #mqstate { is_durable = IsDurable, queue = Q,
length = 0 })
when IsDurable andalso IsPersistent ->
- ok = rabbit_disk_queue:publish(Q, Msg, true),
- State1 = gain_memory(size_of_message(Msg), State),
+ Msg1 = ensure_binary_properties(Msg),
+ ok = rabbit_disk_queue:publish(Q, Msg1, true),
+ State1 = gain_memory(size_of_message(Msg1), State),
%% must call phantom_fetch otherwise the msg remains at the head
%% of the queue. This is synchronous, but unavoidable as we need
%% the AckTag
{MsgId, IsPersistent, true, AckTag, 0} = rabbit_disk_queue:phantom_fetch(Q),
{ok, AckTag, State1};
publish_delivered(Msg, State = #mqstate { length = 0 }) ->
- {ok, not_on_disk, gain_memory(size_of_message(Msg), State)}.
+ Msg1 = ensure_binary_properties(Msg),
+ {ok, not_on_disk, gain_memory(size_of_message(Msg1), State)}.
fetch(State = #mqstate { length = 0 }) ->
{empty, State};
@@ -390,10 +404,11 @@ maybe_ack(Q, _, _, AckTag) ->
remove_diskless(MsgsWithAcks) ->
lists:foldl(
fun ({Msg, AckTag}, {AccAckTags, AccSize}) ->
+ Msg1 = ensure_binary_properties(Msg),
{case AckTag of
not_on_disk -> AccAckTags;
_ -> [AckTag | AccAckTags]
- end, size_of_message(Msg) + AccSize}
+ end, size_of_message(Msg1) + AccSize}
end, {[], 0}, MsgsWithAcks).
ack(MsgsWithAcks, State = #mqstate { queue = Q }) ->
@@ -406,11 +421,12 @@ ack(MsgsWithAcks, State = #mqstate { queue = Q }) ->
tx_publish(Msg = #basic_message { is_persistent = IsPersistent },
State = #mqstate { mode = Mode, is_durable = IsDurable }) ->
+ Msg1 = ensure_binary_properties(Msg),
ok = case on_disk(Mode, IsDurable, IsPersistent) of
- true -> rabbit_disk_queue:tx_publish(Msg);
+ true -> rabbit_disk_queue:tx_publish(Msg1);
false -> ok
end,
- {ok, gain_memory(size_of_message(Msg), State)}.
+ {ok, gain_memory(size_of_message(Msg1), State)}.
tx_commit(Publishes, MsgsWithAcks,
State = #mqstate { mode = Mode, queue = Q, msg_buf = MsgBuf,
@@ -441,7 +457,8 @@ tx_rollback(Publishes,
lists:foldl(
fun (Msg = #basic_message { is_persistent = IsPersistent,
guid = MsgId }, {Acc, CSizeAcc}) ->
- CSizeAcc1 = CSizeAcc + size_of_message(Msg),
+ Msg1 = ensure_binary_properties(Msg),
+ CSizeAcc1 = CSizeAcc + size_of_message(Msg1),
{case on_disk(Mode, IsDurable, IsPersistent) of
true -> [MsgId | Acc];
_ -> Acc
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 884adbf870..44abdda415 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -856,6 +856,10 @@ rdq_match_message(
MsgId, MsgBody, Size) when size(MsgBody) =:= Size ->
ok.
+rdq_match_messages(#basic_message { guid = MsgId, content = #content { payload_fragments_rev = MsgBody }},
+ #basic_message { guid = MsgId, content = #content { payload_fragments_rev = MsgBody }}) ->
+ ok.
+
rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) ->
Startup = rdq_virgin(),
rdq_start(),
@@ -1226,9 +1230,10 @@ rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, Mode, CommitOrCancel) -
lists:foldl(
fun (Msg, {Acc, MS7}) ->
Rem = Len1 - (Msg #basic_message.guid) - 1,
- {{Msg, false, AckTag, Rem}, MS7a} =
+ {{Msg1, false, AckTag, Rem}, MS7a} =
rabbit_mixed_queue:fetch(MS7),
- {[{Msg, AckTag} | Acc], MS7a}
+ ok = rdq_match_messages(Msg, Msg1),
+ {[{Msg1, AckTag} | Acc], MS7a}
end, {[], MS6}, MsgsA ++ MsgsB),
0 = rabbit_mixed_queue:len(MS8),
rabbit_mixed_queue:ack(AckTags, MS8);
@@ -1239,9 +1244,10 @@ rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, Mode, CommitOrCancel) -
lists:foldl(
fun (Msg, {Acc, MS7}) ->
Rem = Len0 - (Msg #basic_message.guid) - 1,
- {{Msg, false, AckTag, Rem}, MS7a} =
+ {{Msg1, false, AckTag, Rem}, MS7a} =
rabbit_mixed_queue:fetch(MS7),
- {[{Msg, AckTag} | Acc], MS7a}
+ ok = rdq_match_messages(Msg, Msg1),
+ {[{Msg1, AckTag} | Acc], MS7a}
end, {[], MS6}, MsgsA),
0 = rabbit_mixed_queue:len(MS8),
rabbit_mixed_queue:ack(AckTags, MS8)