summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-08-26 15:01:58 +0100
committerMatthew Sackman <matthew@lshift.net>2009-08-26 15:01:58 +0100
commit2bd0541ad102ec76334dbc7cdf1d414e182b363c (patch)
treebe077da6f9b34b78a1196bfcedb4cd22a7b1ddac
parent610e4a3ec1dc35d00ca0f804665fcf2722715cdf (diff)
downloadrabbitmq-server-git-2bd0541ad102ec76334dbc7cdf1d414e182b363c.tar.gz
made sure that messages that go through the MQ have binary properties added. This allows us to measure their size. The effect of this is that all messages that come out of the MQ have binary properties, even if they went in without. This might be controversial. The only reason for doing this is that otherwise, we'd have to convert-to-measure twice, once on the way in, and once on the way out. If people feel strongly about this, please yell.
-rw-r--r--src/rabbit_mixed_queue.erl45
-rw-r--r--src/rabbit_tests.erl14
2 files changed, 41 insertions, 18 deletions
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index bb0ac9733a..9e0eb13f5e 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -114,10 +114,21 @@ init(Queue, IsDurable) ->
memory_loss = undefined, prefetcher = undefined }}.
size_of_message(
- #basic_message { content = #content { payload_fragments_rev = Payload }}) ->
- lists:foldl(fun (Frag, SumAcc) ->
- SumAcc + size(Frag)
- end, 0, Payload).
+ #basic_message { content = #content { payload_fragments_rev = Payload,
+ properties_bin = PropsBin }})
+ when is_binary(PropsBin) ->
+ size(PropsBin) + lists:foldl(fun (Frag, SumAcc) ->
+ SumAcc + size(Frag)
+ end, 0, Payload).
+
+ensure_binary_properties(Msg = #basic_message {
+ content = Content = #content {
+ properties = Props,
+ properties_bin = none }}) ->
+ Msg #basic_message { content = Content #content {
+ properties_bin = rabbit_framing:encode_properties(Props) }};
+ensure_binary_properties(Msg) ->
+ Msg.
set_storage_mode(Mode, _TxnMessages, State = #mqstate { mode = Mode }) ->
{ok, State};
@@ -299,15 +310,16 @@ on_disk(mixed, _IsDurable, _IsPersistent) -> false.
publish(Msg = #basic_message { is_persistent = IsPersistent }, State =
#mqstate { queue = Q, mode = Mode, is_durable = IsDurable,
msg_buf = MsgBuf, length = Length }) ->
+ Msg1 = ensure_binary_properties(Msg),
ok = case on_disk(Mode, IsDurable, IsPersistent) of
- true -> rabbit_disk_queue:publish(Q, Msg, false);
+ true -> rabbit_disk_queue:publish(Q, Msg1, false);
false -> ok
end,
MsgBuf1 = case Mode of
disk -> inc_queue_length(MsgBuf, 1);
- mixed -> queue:in({Msg, false}, MsgBuf)
+ mixed -> queue:in({Msg1, false}, MsgBuf)
end,
- {ok, gain_memory(size_of_message(Msg),
+ {ok, gain_memory(size_of_message(Msg1),
State #mqstate { msg_buf = MsgBuf1,
length = Length + 1 })}.
@@ -318,15 +330,17 @@ publish_delivered(Msg = #basic_message { guid = MsgId,
State = #mqstate { is_durable = IsDurable, queue = Q,
length = 0 })
when IsDurable andalso IsPersistent ->
- ok = rabbit_disk_queue:publish(Q, Msg, true),
- State1 = gain_memory(size_of_message(Msg), State),
+ Msg1 = ensure_binary_properties(Msg),
+ ok = rabbit_disk_queue:publish(Q, Msg1, true),
+ State1 = gain_memory(size_of_message(Msg1), State),
%% must call phantom_fetch otherwise the msg remains at the head
%% of the queue. This is synchronous, but unavoidable as we need
%% the AckTag
{MsgId, IsPersistent, true, AckTag, 0} = rabbit_disk_queue:phantom_fetch(Q),
{ok, AckTag, State1};
publish_delivered(Msg, State = #mqstate { length = 0 }) ->
- {ok, not_on_disk, gain_memory(size_of_message(Msg), State)}.
+ Msg1 = ensure_binary_properties(Msg),
+ {ok, not_on_disk, gain_memory(size_of_message(Msg1), State)}.
fetch(State = #mqstate { length = 0 }) ->
{empty, State};
@@ -390,10 +404,11 @@ maybe_ack(Q, _, _, AckTag) ->
remove_diskless(MsgsWithAcks) ->
lists:foldl(
fun ({Msg, AckTag}, {AccAckTags, AccSize}) ->
+ Msg1 = ensure_binary_properties(Msg),
{case AckTag of
not_on_disk -> AccAckTags;
_ -> [AckTag | AccAckTags]
- end, size_of_message(Msg) + AccSize}
+ end, size_of_message(Msg1) + AccSize}
end, {[], 0}, MsgsWithAcks).
ack(MsgsWithAcks, State = #mqstate { queue = Q }) ->
@@ -406,11 +421,12 @@ ack(MsgsWithAcks, State = #mqstate { queue = Q }) ->
tx_publish(Msg = #basic_message { is_persistent = IsPersistent },
State = #mqstate { mode = Mode, is_durable = IsDurable }) ->
+ Msg1 = ensure_binary_properties(Msg),
ok = case on_disk(Mode, IsDurable, IsPersistent) of
- true -> rabbit_disk_queue:tx_publish(Msg);
+ true -> rabbit_disk_queue:tx_publish(Msg1);
false -> ok
end,
- {ok, gain_memory(size_of_message(Msg), State)}.
+ {ok, gain_memory(size_of_message(Msg1), State)}.
tx_commit(Publishes, MsgsWithAcks,
State = #mqstate { mode = Mode, queue = Q, msg_buf = MsgBuf,
@@ -441,7 +457,8 @@ tx_rollback(Publishes,
lists:foldl(
fun (Msg = #basic_message { is_persistent = IsPersistent,
guid = MsgId }, {Acc, CSizeAcc}) ->
- CSizeAcc1 = CSizeAcc + size_of_message(Msg),
+ Msg1 = ensure_binary_properties(Msg),
+ CSizeAcc1 = CSizeAcc + size_of_message(Msg1),
{case on_disk(Mode, IsDurable, IsPersistent) of
true -> [MsgId | Acc];
_ -> Acc
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 884adbf870..44abdda415 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -856,6 +856,10 @@ rdq_match_message(
MsgId, MsgBody, Size) when size(MsgBody) =:= Size ->
ok.
+rdq_match_messages(#basic_message { guid = MsgId, content = #content { payload_fragments_rev = MsgBody }},
+ #basic_message { guid = MsgId, content = #content { payload_fragments_rev = MsgBody }}) ->
+ ok.
+
rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) ->
Startup = rdq_virgin(),
rdq_start(),
@@ -1226,9 +1230,10 @@ rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, Mode, CommitOrCancel) -
lists:foldl(
fun (Msg, {Acc, MS7}) ->
Rem = Len1 - (Msg #basic_message.guid) - 1,
- {{Msg, false, AckTag, Rem}, MS7a} =
+ {{Msg1, false, AckTag, Rem}, MS7a} =
rabbit_mixed_queue:fetch(MS7),
- {[{Msg, AckTag} | Acc], MS7a}
+ ok = rdq_match_messages(Msg, Msg1),
+ {[{Msg1, AckTag} | Acc], MS7a}
end, {[], MS6}, MsgsA ++ MsgsB),
0 = rabbit_mixed_queue:len(MS8),
rabbit_mixed_queue:ack(AckTags, MS8);
@@ -1239,9 +1244,10 @@ rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, Mode, CommitOrCancel) -
lists:foldl(
fun (Msg, {Acc, MS7}) ->
Rem = Len0 - (Msg #basic_message.guid) - 1,
- {{Msg, false, AckTag, Rem}, MS7a} =
+ {{Msg1, false, AckTag, Rem}, MS7a} =
rabbit_mixed_queue:fetch(MS7),
- {[{Msg, AckTag} | Acc], MS7a}
+ ok = rdq_match_messages(Msg, Msg1),
+ {[{Msg1, AckTag} | Acc], MS7a}
end, {[], MS6}, MsgsA),
0 = rabbit_mixed_queue:len(MS8),
rabbit_mixed_queue:ack(AckTags, MS8)