summary | refs | log | tree | commit | diff
path: root/src
diff options
context:
space:
mode:
author Matthias Radestock <matthias@lshift.net> 2009-09-08 11:18:26 +0100
committer Matthias Radestock <matthias@lshift.net> 2009-09-08 11:18:26 +0100
commit f7aaa4f2423f05cadaa1c04a9fe684bedfd29b16 (patch)
tree 04f8238ebd83142d73b6203e0ca8ab5e1e5f48b2 /src
parent 6d89e86148174f2abc8a83ddb71555c498a25a16 (diff)
download rabbitmq-server-git-f7aaa4f2423f05cadaa1c04a9fe684bedfd29b16.tar.gz
change guid to a binary, using the md5 of term_to_binary
The main motivation is to reduce the memory and on-disk footprint of the guid from ~34 bytes to 16. But it turns out that this actually results in a speed improvement of a few percent as well, even for non-persistent messaging, presumably due to the memory management effects and the fact that 16 byte binaries are easier to copy between processes than the deep(ish) original guid structure.
Diffstat (limited to 'src')
-rw-r--r-- src/rabbit_guid.erl 4
-rw-r--r-- src/rabbit_msg_file.erl 37
-rw-r--r-- src/rabbit_msg_store.erl 2
-rw-r--r-- src/rabbit_tests.erl 38
4 files changed, 43 insertions, 38 deletions
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index 45816b85c5..5053d18831 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -99,7 +99,7 @@ guid() ->
{S, I} -> {S, I+1}
end,
put(guid, G),
- G.
+ erlang:md5(term_to_binary(G)).
%% generate a readable string representation of a guid. Note that any
%% monotonicity of the guid is not preserved in the encoding.
@@ -110,7 +110,7 @@ string_guid(Prefix) ->
%%
%% TODO: once debian stable and EPEL have moved from R11B-2 to
%% R11B-4 or later we should change this to use base64.
- Prefix ++ "-" ++ ssl_base64:encode(erlang:md5(term_to_binary(guid()))).
+ Prefix ++ "-" ++ ssl_base64:encode(guid()).
binstring_guid(Prefix) ->
list_to_binary(string_guid(Prefix)).
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl
index f14656cfbc..46128612e7 100644
--- a/src/rabbit_msg_file.erl
+++ b/src/rabbit_msg_file.erl
@@ -46,7 +46,7 @@
-ifdef(use_specs).
-type(io_device() :: any()).
--type(msg_id() :: any()).
+-type(msg_id() :: binary()).
-type(msg() :: any()).
-type(msg_attrs() :: any()).
-type(position() :: non_neg_integer()).
@@ -63,16 +63,16 @@
%%----------------------------------------------------------------------------
-append(FileHdl, MsgId, MsgBody, MsgAttrs) ->
- [MsgIdBin, MsgBodyBin, MsgAttrsBin] = Bins =
- [term_to_binary(X) || X <- [MsgId, MsgBody, MsgAttrs]],
- [MsgIdBinSize, MsgBodyBinSize, MsgAttrsBinSize] = Sizes =
- [size(B) || B <- Bins],
+append(FileHdl, MsgId, MsgBody, MsgAttrs) when is_binary(MsgId) ->
+ MsgBodyBin = term_to_binary(MsgBody),
+ MsgAttrsBin = term_to_binary(MsgAttrs),
+ [MsgIdSize, MsgBodyBinSize, MsgAttrsBinSize] = Sizes =
+ [size(B) || B <- [MsgId, MsgBodyBin, MsgAttrsBin]],
Size = lists:sum(Sizes),
case file:write(FileHdl, <<Size:?INTEGER_SIZE_BITS,
- MsgIdBinSize:?INTEGER_SIZE_BITS,
+ MsgIdSize:?INTEGER_SIZE_BITS,
MsgAttrsBinSize:?INTEGER_SIZE_BITS,
- MsgIdBin:MsgIdBinSize/binary,
+ MsgId:MsgIdSize/binary,
MsgAttrsBin:MsgAttrsBinSize/binary,
MsgBodyBin:MsgBodyBinSize/binary,
?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>) of
@@ -85,17 +85,16 @@ read(FileHdl, TotalSize) ->
SizeWriteOkBytes = Size + 1,
case file:read(FileHdl, TotalSize) of
{ok, <<Size:?INTEGER_SIZE_BITS,
- MsgIdBinSize:?INTEGER_SIZE_BITS,
+ MsgIdSize:?INTEGER_SIZE_BITS,
MsgAttrsBinSize:?INTEGER_SIZE_BITS,
Rest:SizeWriteOkBytes/binary>>} ->
- BodyBinSize = Size - MsgIdBinSize - MsgAttrsBinSize,
- <<MsgIdBin:MsgIdBinSize/binary,
+ BodyBinSize = Size - MsgIdSize - MsgAttrsBinSize,
+ <<MsgId:MsgIdSize/binary,
MsgAttrsBin:MsgAttrsBinSize/binary,
MsgBodyBin:BodyBinSize/binary,
?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>> = Rest,
- [MsgId, MsgBody, MsgAttrs] =
- [binary_to_term(B) || B <- [MsgIdBin, MsgBodyBin, MsgAttrsBin]],
- {ok, {MsgId, MsgBody, MsgAttrs}};
+ {ok, {MsgId,
+ binary_to_term(MsgBodyBin), binary_to_term(MsgAttrsBin)}};
KO -> KO
end.
@@ -119,10 +118,10 @@ read_next(FileHdl, Offset) ->
case file:read(FileHdl, ThreeIntegers) of
{ok,
<<Size:?INTEGER_SIZE_BITS,
- MsgIdBinSize:?INTEGER_SIZE_BITS,
+ MsgIdSize:?INTEGER_SIZE_BITS,
MsgAttrsBinSize:?INTEGER_SIZE_BITS>>} ->
if Size == 0 -> eof; %% Nothing we can do other than stop
- MsgIdBinSize == 0 orelse MsgAttrsBinSize == 0 ->
+ MsgIdSize == 0 orelse MsgAttrsBinSize == 0 ->
%% current message corrupted, try skipping past it
ExpectedAbsPos = Offset + Size + ?FILE_PACKING_ADJUSTMENT,
case file:position(FileHdl, {cur, Size + 1}) of
@@ -131,9 +130,9 @@ read_next(FileHdl, Offset) ->
KO -> KO
end;
true -> %% all good, let's continue
- HeaderSize = MsgIdBinSize + MsgAttrsBinSize,
+ HeaderSize = MsgIdSize + MsgAttrsBinSize,
case file:read(FileHdl, HeaderSize) of
- {ok, <<MsgIdBin:MsgIdBinSize/binary,
+ {ok, <<MsgId:MsgIdSize/binary,
MsgAttrsBin:MsgAttrsBinSize/binary>>} ->
TotalSize = Size + ?FILE_PACKING_ADJUSTMENT,
ExpectedAbsPos = Offset + TotalSize - 1,
@@ -144,7 +143,7 @@ read_next(FileHdl, Offset) ->
case file:read(FileHdl, 1) of
{ok, <<?WRITE_OK_MARKER:
?WRITE_OK_SIZE_BITS>>} ->
- {ok, {binary_to_term(MsgIdBin),
+ {ok, {MsgId,
binary_to_term(MsgAttrsBin),
TotalSize, NextOffset}};
{ok, _SomeOtherData} ->
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 357c4867e7..da904193a0 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -82,7 +82,7 @@
-type(mode() :: 'ram_disk' | 'disk_only').
-type(dets_table() :: any()).
-type(ets_table() :: any()).
--type(msg_id() :: any()).
+-type(msg_id() :: binary()).
-type(msg() :: any()).
-type(msg_attrs() :: any()).
-type(file_path() :: any()).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 039e9aa487..1f2187bca7 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -848,8 +848,11 @@ benchmark_disk_queue() ->
passed.
rdq_message(MsgId, MsgBody, IsPersistent) ->
- rabbit_basic:message(x, <<>>, [], MsgBody, MsgId, IsPersistent).
+ rabbit_basic:message(x, <<>>, [], MsgBody, term_to_binary(MsgId),
+ IsPersistent).
+rdq_match_message(Msg, MsgId, MsgBody, Size) when is_number(MsgId) ->
+ rdq_match_message(Msg, term_to_binary(MsgId), MsgBody, Size);
rdq_match_message(
#basic_message { guid = MsgId, content =
#content { payload_fragments_rev = [MsgBody] }},
@@ -860,13 +863,17 @@ rdq_match_messages(#basic_message { guid = MsgId, content = #content { payload_f
#basic_message { guid = MsgId, content = #content { payload_fragments_rev = MsgBody }}) ->
ok.
+commit_list(List, MsgCount) ->
+ lists:zip([term_to_binary(MsgId) || MsgId <- List],
+ lists:duplicate(MsgCount, false)).
+
rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) ->
Startup = rdq_virgin(),
rdq_start(),
QCount = length(Qs),
Msg = <<0:(8*MsgSizeBytes)>>,
List = lists:seq(1, MsgCount),
- CommitList = lists:zip(List, lists:duplicate(MsgCount, false)),
+ CommitList = commit_list(List, MsgCount),
{Publish, ok} =
timer:tc(?MODULE, rdq_time_commands,
[[fun() -> [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false))
@@ -905,7 +912,7 @@ rdq_stress_gc(MsgCount) ->
MsgSizeBytes = 256*1024,
Msg = <<0:(8*MsgSizeBytes)>>, % 256KB
List = lists:seq(1, MsgCount),
- CommitList = lists:zip(List, lists:duplicate(MsgCount, false)),
+ CommitList = commit_list(List, MsgCount),
[rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- List],
rabbit_disk_queue:tx_commit(q, CommitList, []),
StartChunk = round(MsgCount / 20), % 5%
@@ -948,7 +955,7 @@ rdq_test_startup_with_queue_gaps() ->
Total = 1000,
Half = round(Total/2),
All = lists:seq(1,Total),
- CommitAll = lists:zip(All, lists:duplicate(Total, false)),
+ CommitAll = commit_list(All, Total),
[rabbit_disk_queue:tx_publish(rdq_message(N, Msg, true)) || N <- All],
rabbit_disk_queue:tx_commit(q, CommitAll, []),
io:format("Publish done~n", []),
@@ -1005,7 +1012,7 @@ rdq_test_redeliver() ->
Total = 1000,
Half = round(Total/2),
All = lists:seq(1,Total),
- CommitAll = lists:zip(All, lists:duplicate(Total, false)),
+ CommitAll = commit_list(All, Total),
[rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- All],
rabbit_disk_queue:tx_commit(q, CommitAll, []),
io:format("Publish done~n", []),
@@ -1058,7 +1065,7 @@ rdq_test_purge() ->
Total = 1000,
Half = round(Total/2),
All = lists:seq(1,Total),
- CommitAll = lists:zip(All, lists:duplicate(Total, false)),
+ CommitAll = commit_list(All, Total),
[rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- All],
rabbit_disk_queue:tx_commit(q, CommitAll, []),
io:format("Publish done~n", []),
@@ -1170,12 +1177,10 @@ rdq_test_mixed_queue_modes() ->
rdq_test_mode_conversion_mid_txn() ->
Payload = <<0:(8*256)>>,
MsgIdsA = lists:seq(0,9),
- MsgsA = [ rabbit_basic:message(x, <<>>, [], Payload, MsgId,
- (0 == MsgId rem 2))
- || MsgId <- MsgIdsA ],
+ MsgsA = [ rdq_message(MsgId, Payload, (0 == MsgId rem 2))
+ || MsgId <- MsgIdsA ],
MsgIdsB = lists:seq(10,20),
- MsgsB = [ rabbit_basic:message(x, <<>>, [], Payload, MsgId,
- (0 == MsgId rem 2))
+ MsgsB = [ rdq_message(MsgId, Payload, (0 == MsgId rem 2))
|| MsgId <- MsgIdsB ],
rdq_virgin(),
@@ -1229,7 +1234,8 @@ rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, Mode, CommitOrCancel) -
{AckTags, MS8} =
lists:foldl(
fun (Msg, {Acc, MS7}) ->
- Rem = Len1 - (Msg #basic_message.guid) - 1,
+ MsgId = binary_to_term(Msg #basic_message.guid),
+ Rem = Len1 - MsgId - 1,
{{Msg1, false, AckTag, Rem}, MS7a} =
rabbit_mixed_queue:fetch(MS7),
ok = rdq_match_messages(Msg, Msg1),
@@ -1243,7 +1249,8 @@ rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, Mode, CommitOrCancel) -
{AckTags, MS8} =
lists:foldl(
fun (Msg, {Acc, MS7}) ->
- Rem = Len0 - (Msg #basic_message.guid) - 1,
+ MsgId = binary_to_term(Msg #basic_message.guid),
+ Rem = Len0 - MsgId - 1,
{{Msg1, false, AckTag, Rem}, MS7a} =
rabbit_mixed_queue:fetch(MS7),
ok = rdq_match_messages(Msg, Msg1),
@@ -1266,9 +1273,8 @@ rdq_test_disk_queue_modes() ->
Total = 1000,
Half1 = lists:seq(1,round(Total/2)),
Half2 = lists:seq(1 + round(Total/2), Total),
- CommitHalf1 = lists:zip(Half1, lists:duplicate(round(Total/2), false)),
- CommitHalf2 = lists:zip(Half2, lists:duplicate
- (Total - round(Total/2), false)),
+ CommitHalf1 = commit_list(Half1, round(Total/2)),
+ CommitHalf2 = commit_list(Half2, Total - round(Total/2)),
[rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- Half1],
ok = rabbit_disk_queue:tx_commit(q, CommitHalf1, []),
io:format("Publish done~n", []),