diff options
| author | Matthias Radestock <matthias@lshift.net> | 2009-09-08 11:18:26 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2009-09-08 11:18:26 +0100 |
| commit | f7aaa4f2423f05cadaa1c04a9fe684bedfd29b16 (patch) | |
| tree | 04f8238ebd83142d73b6203e0ca8ab5e1e5f48b2 /src | |
| parent | 6d89e86148174f2abc8a83ddb71555c498a25a16 (diff) | |
| download | rabbitmq-server-git-f7aaa4f2423f05cadaa1c04a9fe684bedfd29b16.tar.gz | |
change guid to a binary, using the md5 of term_to_binary
The main motivation is to reduce the memory and on-disk footprint of
the guid from ~34 bytes to 16. But it turns out that this actually
results in a speed improvement of a few percent as well, even for
non-persistent messaging, presumably due to the memory management
effects and the fact that 16 byte binaries are easier to copy between
processes than the deep(ish) original guid structure.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_guid.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_msg_file.erl | 37 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 38 |
4 files changed, 43 insertions, 38 deletions
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl index 45816b85c5..5053d18831 100644 --- a/src/rabbit_guid.erl +++ b/src/rabbit_guid.erl @@ -99,7 +99,7 @@ guid() -> {S, I} -> {S, I+1} end, put(guid, G), - G. + erlang:md5(term_to_binary(G)). %% generate a readable string representation of a guid. Note that any %% monotonicity of the guid is not preserved in the encoding. @@ -110,7 +110,7 @@ string_guid(Prefix) -> %% %% TODO: once debian stable and EPEL have moved from R11B-2 to %% R11B-4 or later we should change this to use base64. - Prefix ++ "-" ++ ssl_base64:encode(erlang:md5(term_to_binary(guid()))). + Prefix ++ "-" ++ ssl_base64:encode(guid()). binstring_guid(Prefix) -> list_to_binary(string_guid(Prefix)). diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl index f14656cfbc..46128612e7 100644 --- a/src/rabbit_msg_file.erl +++ b/src/rabbit_msg_file.erl @@ -46,7 +46,7 @@ -ifdef(use_specs). -type(io_device() :: any()). --type(msg_id() :: any()). +-type(msg_id() :: binary()). -type(msg() :: any()). -type(msg_attrs() :: any()). -type(position() :: non_neg_integer()). @@ -63,16 +63,16 @@ %%---------------------------------------------------------------------------- -append(FileHdl, MsgId, MsgBody, MsgAttrs) -> - [MsgIdBin, MsgBodyBin, MsgAttrsBin] = Bins = - [term_to_binary(X) || X <- [MsgId, MsgBody, MsgAttrs]], - [MsgIdBinSize, MsgBodyBinSize, MsgAttrsBinSize] = Sizes = - [size(B) || B <- Bins], +append(FileHdl, MsgId, MsgBody, MsgAttrs) when is_binary(MsgId) -> + MsgBodyBin = term_to_binary(MsgBody), + MsgAttrsBin = term_to_binary(MsgAttrs), + [MsgIdSize, MsgBodyBinSize, MsgAttrsBinSize] = Sizes = + [size(B) || B <- [MsgId, MsgBodyBin, MsgAttrsBin]], Size = lists:sum(Sizes), case file:write(FileHdl, <<Size:?INTEGER_SIZE_BITS, - MsgIdBinSize:?INTEGER_SIZE_BITS, + MsgIdSize:?INTEGER_SIZE_BITS, MsgAttrsBinSize:?INTEGER_SIZE_BITS, - MsgIdBin:MsgIdBinSize/binary, + MsgId:MsgIdSize/binary, MsgAttrsBin:MsgAttrsBinSize/binary, MsgBodyBin:MsgBodyBinSize/binary, ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>) of @@ -85,17 +85,16 @@ read(FileHdl, TotalSize) -> SizeWriteOkBytes = Size + 1, case file:read(FileHdl, TotalSize) of {ok, <<Size:?INTEGER_SIZE_BITS, - MsgIdBinSize:?INTEGER_SIZE_BITS, + MsgIdSize:?INTEGER_SIZE_BITS, MsgAttrsBinSize:?INTEGER_SIZE_BITS, Rest:SizeWriteOkBytes/binary>>} -> - BodyBinSize = Size - MsgIdBinSize - MsgAttrsBinSize, - <<MsgIdBin:MsgIdBinSize/binary, + BodyBinSize = Size - MsgIdSize - MsgAttrsBinSize, + <<MsgId:MsgIdSize/binary, MsgAttrsBin:MsgAttrsBinSize/binary, MsgBodyBin:BodyBinSize/binary, ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>> = Rest, - [MsgId, MsgBody, MsgAttrs] = - [binary_to_term(B) || B <- [MsgIdBin, MsgBodyBin, MsgAttrsBin]], - {ok, {MsgId, MsgBody, MsgAttrs}}; + {ok, {MsgId, + binary_to_term(MsgBodyBin), binary_to_term(MsgAttrsBin)}}; KO -> KO end. @@ -119,10 +118,10 @@ read_next(FileHdl, Offset) -> case file:read(FileHdl, ThreeIntegers) of {ok, <<Size:?INTEGER_SIZE_BITS, - MsgIdBinSize:?INTEGER_SIZE_BITS, + MsgIdSize:?INTEGER_SIZE_BITS, MsgAttrsBinSize:?INTEGER_SIZE_BITS>>} -> if Size == 0 -> eof; %% Nothing we can do other than stop - MsgIdBinSize == 0 orelse MsgAttrsBinSize == 0 -> + MsgIdSize == 0 orelse MsgAttrsBinSize == 0 -> %% current message corrupted, try skipping past it ExpectedAbsPos = Offset + Size + ?FILE_PACKING_ADJUSTMENT, case file:position(FileHdl, {cur, Size + 1}) of @@ -131,9 +130,9 @@ read_next(FileHdl, Offset) -> KO -> KO end; true -> %% all good, let's continue - HeaderSize = MsgIdBinSize + MsgAttrsBinSize, + HeaderSize = MsgIdSize + MsgAttrsBinSize, case file:read(FileHdl, HeaderSize) of - {ok, <<MsgIdBin:MsgIdBinSize/binary, + {ok, <<MsgId:MsgIdSize/binary, MsgAttrsBin:MsgAttrsBinSize/binary>>} -> TotalSize = Size + ?FILE_PACKING_ADJUSTMENT, ExpectedAbsPos = Offset + TotalSize - 1, @@ -144,7 +143,7 @@ read_next(FileHdl, Offset) -> case file:read(FileHdl, 1) of {ok, <<?WRITE_OK_MARKER: ?WRITE_OK_SIZE_BITS>>} -> - {ok, {binary_to_term(MsgIdBin), + {ok, {MsgId, binary_to_term(MsgAttrsBin), TotalSize, NextOffset}}; {ok, _SomeOtherData} -> diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 357c4867e7..da904193a0 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -82,7 +82,7 @@ -type(mode() :: 'ram_disk' | 'disk_only'). -type(dets_table() :: any()). -type(ets_table() :: any()). --type(msg_id() :: any()). +-type(msg_id() :: binary()). -type(msg() :: any()). -type(msg_attrs() :: any()). -type(file_path() :: any()). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 039e9aa487..1f2187bca7 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -848,8 +848,11 @@ benchmark_disk_queue() -> passed. rdq_message(MsgId, MsgBody, IsPersistent) -> - rabbit_basic:message(x, <<>>, [], MsgBody, MsgId, IsPersistent). + rabbit_basic:message(x, <<>>, [], MsgBody, term_to_binary(MsgId), + IsPersistent). +rdq_match_message(Msg, MsgId, MsgBody, Size) when is_number(MsgId) -> + rdq_match_message(Msg, term_to_binary(MsgId), MsgBody, Size); rdq_match_message( #basic_message { guid = MsgId, content = #content { payload_fragments_rev = [MsgBody] }}, @@ -860,13 +863,17 @@ rdq_match_messages(#basic_message { guid = MsgId, content = #content { payload_f #basic_message { guid = MsgId, content = #content { payload_fragments_rev = MsgBody }}) -> ok. +commit_list(List, MsgCount) -> + lists:zip([term_to_binary(MsgId) || MsgId <- List], + lists:duplicate(MsgCount, false)). + rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) -> Startup = rdq_virgin(), rdq_start(), QCount = length(Qs), Msg = <<0:(8*MsgSizeBytes)>>, List = lists:seq(1, MsgCount), - CommitList = lists:zip(List, lists:duplicate(MsgCount, false)), + CommitList = commit_list(List, MsgCount), {Publish, ok} = timer:tc(?MODULE, rdq_time_commands, [[fun() -> [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) @@ -905,7 +912,7 @@ rdq_stress_gc(MsgCount) -> MsgSizeBytes = 256*1024, Msg = <<0:(8*MsgSizeBytes)>>, % 256KB List = lists:seq(1, MsgCount), - CommitList = lists:zip(List, lists:duplicate(MsgCount, false)), + CommitList = commit_list(List, MsgCount), [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- List], rabbit_disk_queue:tx_commit(q, CommitList, []), StartChunk = round(MsgCount / 20), % 5% @@ -948,7 +955,7 @@ rdq_test_startup_with_queue_gaps() -> Total = 1000, Half = round(Total/2), All = lists:seq(1,Total), - CommitAll = lists:zip(All, lists:duplicate(Total, false)), + CommitAll = commit_list(All, Total), [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, true)) || N <- All], rabbit_disk_queue:tx_commit(q, CommitAll, []), io:format("Publish done~n", []), @@ -1005,7 +1012,7 @@ rdq_test_redeliver() -> Total = 1000, Half = round(Total/2), All = lists:seq(1,Total), - CommitAll = lists:zip(All, lists:duplicate(Total, false)), + CommitAll = commit_list(All, Total), [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- All], rabbit_disk_queue:tx_commit(q, CommitAll, []), io:format("Publish done~n", []), @@ -1058,7 +1065,7 @@ rdq_test_purge() -> Total = 1000, Half = round(Total/2), All = lists:seq(1,Total), - CommitAll = lists:zip(All, lists:duplicate(Total, false)), + CommitAll = commit_list(All, Total), [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- All], rabbit_disk_queue:tx_commit(q, CommitAll, []), io:format("Publish done~n", []), @@ -1170,12 +1177,10 @@ rdq_test_mixed_queue_modes() -> rdq_test_mode_conversion_mid_txn() -> Payload = <<0:(8*256)>>, MsgIdsA = lists:seq(0,9), - MsgsA = [ rabbit_basic:message(x, <<>>, [], Payload, MsgId, - (0 == MsgId rem 2)) - || MsgId <- MsgIdsA ], + MsgsA = [ rdq_message(MsgId, Payload, (0 == MsgId rem 2)) + || MsgId <- MsgIdsA ], MsgIdsB = lists:seq(10,20), - MsgsB = [ rabbit_basic:message(x, <<>>, [], Payload, MsgId, - (0 == MsgId rem 2)) + MsgsB = [ rdq_message(MsgId, Payload, (0 == MsgId rem 2)) || MsgId <- MsgIdsB ], rdq_virgin(), @@ -1229,7 +1234,8 @@ rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, Mode, CommitOrCancel) - {AckTags, MS8} = lists:foldl( fun (Msg, {Acc, MS7}) -> - Rem = Len1 - (Msg #basic_message.guid) - 1, + MsgId = binary_to_term(Msg #basic_message.guid), + Rem = Len1 - MsgId - 1, {{Msg1, false, AckTag, Rem}, MS7a} = rabbit_mixed_queue:fetch(MS7), ok = rdq_match_messages(Msg, Msg1), @@ -1243,7 +1249,8 @@ rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, Mode, CommitOrCancel) - {AckTags, MS8} = lists:foldl( fun (Msg, {Acc, MS7}) -> - Rem = Len0 - (Msg #basic_message.guid) - 1, + MsgId = binary_to_term(Msg #basic_message.guid), + Rem = Len0 - MsgId - 1, {{Msg1, false, AckTag, Rem}, MS7a} = rabbit_mixed_queue:fetch(MS7), ok = rdq_match_messages(Msg, Msg1), @@ -1266,9 +1273,8 @@ rdq_test_disk_queue_modes() -> Total = 1000, Half1 = lists:seq(1,round(Total/2)), Half2 = lists:seq(1 + round(Total/2), Total), - CommitHalf1 = lists:zip(Half1, lists:duplicate(round(Total/2), false)), - CommitHalf2 = lists:zip(Half2, lists:duplicate - (Total - round(Total/2), false)), + CommitHalf1 = commit_list(Half1, round(Total/2)), + CommitHalf2 = commit_list(Half2, Total - round(Total/2)), [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- Half1], ok = rabbit_disk_queue:tx_commit(q, CommitHalf1, []), io:format("Publish done~n", []), |
