diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-08-26 15:01:58 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-08-26 15:01:58 +0100 |
| commit | 2bd0541ad102ec76334dbc7cdf1d414e182b363c (patch) | |
| tree | be077da6f9b34b78a1196bfcedb4cd22a7b1ddac | |
| parent | 610e4a3ec1dc35d00ca0f804665fcf2722715cdf (diff) | |
| download | rabbitmq-server-git-2bd0541ad102ec76334dbc7cdf1d414e182b363c.tar.gz | |
made sure that messages that go through the MQ have binary properties added. This allows us to measure their size. The effect of this is that all messages that come out of the MQ have binary properties, even if they went in without. This might be controversial. The only reason for doing this is that otherwise, we'd have to convert-to-measure twice, once on the way in, and once on the way out. If people feel strongly about this, please yell.
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 45 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 14 |
2 files changed, 41 insertions, 18 deletions
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index bb0ac9733a..9e0eb13f5e 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -114,10 +114,21 @@ init(Queue, IsDurable) -> memory_loss = undefined, prefetcher = undefined }}. size_of_message( - #basic_message { content = #content { payload_fragments_rev = Payload }}) -> - lists:foldl(fun (Frag, SumAcc) -> - SumAcc + size(Frag) - end, 0, Payload). + #basic_message { content = #content { payload_fragments_rev = Payload, + properties_bin = PropsBin }}) + when is_binary(PropsBin) -> + size(PropsBin) + lists:foldl(fun (Frag, SumAcc) -> + SumAcc + size(Frag) + end, 0, Payload). + +ensure_binary_properties(Msg = #basic_message { + content = Content = #content { + properties = Props, + properties_bin = none }}) -> + Msg #basic_message { content = Content #content { + properties_bin = rabbit_framing:encode_properties(Props) }}; +ensure_binary_properties(Msg) -> + Msg. set_storage_mode(Mode, _TxnMessages, State = #mqstate { mode = Mode }) -> {ok, State}; @@ -299,15 +310,16 @@ on_disk(mixed, _IsDurable, _IsPersistent) -> false. publish(Msg = #basic_message { is_persistent = IsPersistent }, State = #mqstate { queue = Q, mode = Mode, is_durable = IsDurable, msg_buf = MsgBuf, length = Length }) -> + Msg1 = ensure_binary_properties(Msg), ok = case on_disk(Mode, IsDurable, IsPersistent) of - true -> rabbit_disk_queue:publish(Q, Msg, false); + true -> rabbit_disk_queue:publish(Q, Msg1, false); false -> ok end, MsgBuf1 = case Mode of disk -> inc_queue_length(MsgBuf, 1); - mixed -> queue:in({Msg, false}, MsgBuf) + mixed -> queue:in({Msg1, false}, MsgBuf) end, - {ok, gain_memory(size_of_message(Msg), + {ok, gain_memory(size_of_message(Msg1), State #mqstate { msg_buf = MsgBuf1, length = Length + 1 })}. @@ -318,15 +330,17 @@ publish_delivered(Msg = #basic_message { guid = MsgId, State = #mqstate { is_durable = IsDurable, queue = Q, length = 0 }) when IsDurable andalso IsPersistent -> - ok = rabbit_disk_queue:publish(Q, Msg, true), - State1 = gain_memory(size_of_message(Msg), State), + Msg1 = ensure_binary_properties(Msg), + ok = rabbit_disk_queue:publish(Q, Msg1, true), + State1 = gain_memory(size_of_message(Msg1), State), %% must call phantom_fetch otherwise the msg remains at the head %% of the queue. This is synchronous, but unavoidable as we need %% the AckTag {MsgId, IsPersistent, true, AckTag, 0} = rabbit_disk_queue:phantom_fetch(Q), {ok, AckTag, State1}; publish_delivered(Msg, State = #mqstate { length = 0 }) -> - {ok, not_on_disk, gain_memory(size_of_message(Msg), State)}. + Msg1 = ensure_binary_properties(Msg), + {ok, not_on_disk, gain_memory(size_of_message(Msg1), State)}. fetch(State = #mqstate { length = 0 }) -> {empty, State}; @@ -390,10 +404,11 @@ maybe_ack(Q, _, _, AckTag) -> remove_diskless(MsgsWithAcks) -> lists:foldl( fun ({Msg, AckTag}, {AccAckTags, AccSize}) -> + Msg1 = ensure_binary_properties(Msg), {case AckTag of not_on_disk -> AccAckTags; _ -> [AckTag | AccAckTags] - end, size_of_message(Msg) + AccSize} + end, size_of_message(Msg1) + AccSize} end, {[], 0}, MsgsWithAcks). ack(MsgsWithAcks, State = #mqstate { queue = Q }) -> @@ -406,11 +421,12 @@ ack(MsgsWithAcks, State = #mqstate { queue = Q }) -> tx_publish(Msg = #basic_message { is_persistent = IsPersistent }, State = #mqstate { mode = Mode, is_durable = IsDurable }) -> + Msg1 = ensure_binary_properties(Msg), ok = case on_disk(Mode, IsDurable, IsPersistent) of - true -> rabbit_disk_queue:tx_publish(Msg); + true -> rabbit_disk_queue:tx_publish(Msg1); false -> ok end, - {ok, gain_memory(size_of_message(Msg), State)}. + {ok, gain_memory(size_of_message(Msg1), State)}. tx_commit(Publishes, MsgsWithAcks, State = #mqstate { mode = Mode, queue = Q, msg_buf = MsgBuf, @@ -441,7 +457,8 @@ tx_rollback(Publishes, lists:foldl( fun (Msg = #basic_message { is_persistent = IsPersistent, guid = MsgId }, {Acc, CSizeAcc}) -> - CSizeAcc1 = CSizeAcc + size_of_message(Msg), + Msg1 = ensure_binary_properties(Msg), + CSizeAcc1 = CSizeAcc + size_of_message(Msg1), {case on_disk(Mode, IsDurable, IsPersistent) of true -> [MsgId | Acc]; _ -> Acc diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 884adbf870..44abdda415 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -856,6 +856,10 @@ rdq_match_message( MsgId, MsgBody, Size) when size(MsgBody) =:= Size -> ok. +rdq_match_messages(#basic_message { guid = MsgId, content = #content { payload_fragments_rev = MsgBody }}, + #basic_message { guid = MsgId, content = #content { payload_fragments_rev = MsgBody }}) -> + ok. + rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) -> Startup = rdq_virgin(), rdq_start(), @@ -1226,9 +1230,10 @@ rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, Mode, CommitOrCancel) - lists:foldl( fun (Msg, {Acc, MS7}) -> Rem = Len1 - (Msg #basic_message.guid) - 1, - {{Msg, false, AckTag, Rem}, MS7a} = + {{Msg1, false, AckTag, Rem}, MS7a} = rabbit_mixed_queue:fetch(MS7), - {[{Msg, AckTag} | Acc], MS7a} + ok = rdq_match_messages(Msg, Msg1), + {[{Msg1, AckTag} | Acc], MS7a} end, {[], MS6}, MsgsA ++ MsgsB), 0 = rabbit_mixed_queue:len(MS8), rabbit_mixed_queue:ack(AckTags, MS8); @@ -1239,9 +1244,10 @@ rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, Mode, CommitOrCancel) - lists:foldl( fun (Msg, {Acc, MS7}) -> Rem = Len0 - (Msg #basic_message.guid) - 1, - {{Msg, false, AckTag, Rem}, MS7a} = + {{Msg1, false, AckTag, Rem}, MS7a} = rabbit_mixed_queue:fetch(MS7), - {[{Msg, AckTag} | Acc], MS7a} + ok = rdq_match_messages(Msg, Msg1), + {[{Msg1, AckTag} | Acc], MS7a} end, {[], MS6}, MsgsA), 0 = rabbit_mixed_queue:len(MS8), rabbit_mixed_queue:ack(AckTags, MS8) |
