summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-02 14:33:53 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-02 14:33:53 +0100
commit993959790fe5ccca6670cd34160d0a8aaf998a5d (patch)
treeb94de883b27b540272eb52dc4fd2e9579b71d768
parentb7abb6d9a8afe2786bfe5640dc5754ca108d168f (diff)
downloadrabbitmq-server-git-993959790fe5ccca6670cd34160d0a8aaf998a5d.tar.gz
...and now some of it actually works
-rw-r--r--src/rabbit_queue_index.erl62
1 files changed, 32 insertions, 30 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 36637323f4..0ec0cd3dc5 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -31,7 +31,7 @@
-module(rabbit_queue_index).
--export([init/1, write_published/5, write_delivered/2, write_acks/2,
+-export([init/1, write_published/4, write_delivered/2, write_acks/2,
flush_journal/1, read_segment_entries/2]).
-define(MAX_ACK_JOURNAL_ENTRY_COUNT, 32768).
@@ -40,18 +40,17 @@
-define(SEQ_BITS, (?SEQ_BYTES * 8)).
-define(SEGMENT_EXTENSION, ".idx").
--define(REL_SEQ_BITS, 13).
--define(SEGMENT_ENTRIES_COUNT, 8192). %% trunc(math:pow(2,?REL_SEQ_BITS))).
+-define(REL_SEQ_BITS, 14).
+-define(SEGMENT_ENTRIES_COUNT, 16384). %% trunc(math:pow(2,?REL_SEQ_BITS))).
-%% seq only is binary 000 followed by 13 bits of rel seq id
-%% (range: 0 - 8191)
--define(REL_SEQ_ONLY_PREFIX, 000).
--define(REL_SEQ_ONLY_PREFIX_BITS, 3).
+%% seq only is binary 00 followed by 14 bits of rel seq id
+%% (range: 0 - 16383)
+-define(REL_SEQ_ONLY_PREFIX, 00).
+-define(REL_SEQ_ONLY_PREFIX_BITS, 2).
-define(REL_SEQ_ONLY_ENTRY_LENGTH_BYTES, 2).
-%% publish record is binary 1 followed by bits for
-%% is_delivered and is_persistent, then 13 bits of rel seq id,
-%% and 128 bits of md5sum msg id
+%% publish record is binary 1 followed by a bit for is_persistent,
+%% then 14 bits of rel seq id, and 128 bits of md5sum msg id
-define(PUBLISH_PREFIX, 1).
-define(PUBLISH_PREFIX_BITS, 1).
@@ -97,7 +96,7 @@
}).
-spec(init/1 :: (string()) -> qistate()).
--spec(write_published/5 :: (msg_id(), seq_id(), boolean(), boolean(), qistate())
+-spec(write_published/4 :: (msg_id(), seq_id(), boolean(), qistate())
-> qistate()).
-spec(write_delivered/2 :: (seq_id(), qistate()) -> qistate()).
-spec(write_acks/2 :: ([seq_id()], qistate()) -> qistate()).
@@ -124,15 +123,15 @@ init(Name) ->
seg_ack_counts = AckCounts
}.
-write_published(MsgId, SeqId, IsDelivered, IsPersistent, State) ->
+write_published(MsgId, SeqId, IsPersistent, State)
+ when is_binary(MsgId) ->
+ ?MSG_ID_BYTES = size(MsgId),
{SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
{Hdl, State1} = get_file_handle_for_seg(SegNum, State),
- IsDeliveredNum = bool_to_int(IsDelivered),
- IsPersistentNum = bool_to_int(IsPersistent),
ok = file:write(Hdl,
<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
- IsDeliveredNum:1, IsPersistentNum:1,
- RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BITS/binary>>),
+ (bool_to_int(IsPersistent)):1,
+ RelSeq:?REL_SEQ_BITS, MsgId/binary>>),
State1.
write_delivered(SeqId, State) ->
@@ -204,6 +203,8 @@ read_segment_entries(InitSeqId, State = #qistate { dir = Dir }) ->
%% Minor Helpers
%%----------------------------------------------------------------------------
+close_file_handle_for_seg(undefined, State) ->
+ State;
close_file_handle_for_seg(SegNum, State = #qistate { cur_seg_num = SegNum,
cur_seg_hdl = Hdl }) ->
ok = file:sync(Hdl),
@@ -215,8 +216,9 @@ close_file_handle_for_seg(_SegNum, State) ->
get_file_handle_for_seg(SegNum, State = #qistate { cur_seg_num = SegNum,
cur_seg_hdl = Hdl }) ->
{Hdl, State};
-get_file_handle_for_seg(SegNum, State) ->
- State1 = #qistate { dir = Dir } = close_file_handle_for_seg(SegNum, State),
+get_file_handle_for_seg(SegNum, State = #qistate { cur_seg_num = CurSegNum }) ->
+ State1 = #qistate { dir = Dir } =
+ close_file_handle_for_seg(CurSegNum, State),
{ok, Hdl} = file:open(seg_num_to_path(Dir, SegNum),
[binary, raw, append, delayed_write]),
{Hdl, State1 #qistate { cur_seg_num = SegNum, cur_seg_hdl = Hdl}}.
@@ -241,12 +243,10 @@ seg_num_to_path(Dir, SegNum) ->
find_ack_counts(Dir) ->
SegNumsPaths =
- lists:map(
- fun (SegName) ->
- {list_to_integer(
- lists:takewhile(fun(C) -> $0 =< C andalso C =< $9 end,
- SegName)), filename:join(Dir, SegName)}
- end, filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)),
+ [{list_to_integer(
+ lists:takewhile(fun(C) -> $0 =< C andalso C =< $9 end,
+ SegName)), filename:join(Dir, SegName)}
+ || SegName <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)],
lists:foldl(
fun ({SegNum, SegPath}, Acc) ->
case load_segment(SegNum, SegPath) of
@@ -312,14 +312,16 @@ load_segment_entries(SegNum, Hdl, {SDict, AckCount}) ->
load_segment_entries(SegNum, Hdl,
deliver_or_ack_msg(SDict, AckCount, RelSeq));
{ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
- IsDeliveredNum:1, IsPersistentNum:1, MSB/bitstring>>} ->
- {ok, <<LSB:8/binary, MsgId:?MSG_ID_BITS/binary>>} =
+ IsPersistentNum:1, MSB/bitstring>>} ->
+ %% because we specify /binary, and binaries are complete
+ %% bytes, the size spec is in bytes, not bits.
+ {ok, <<LSB:1/binary, MsgId:?MSG_ID_BYTES/binary>>} =
file:read(Hdl, ?PUBLISH_RECORD_LENGTH_BYTES - 1),
<<RelSeq:?REL_SEQ_BITS>> = <<MSB/bitstring, LSB/binary>>,
load_segment_entries(
- SegNum, Hdl, {dict:store(RelSeq, {MsgId, 1 == IsDeliveredNum,
- 1 == IsPersistentNum}, SDict),
- AckCount});
+ SegNum, Hdl, {dict:store(RelSeq, {MsgId, false,
+ 1 == IsPersistentNum},
+ SDict), AckCount});
_ErrOrEoF -> {SDict, AckCount}
end.
@@ -344,7 +346,7 @@ append_acks_to_segment(SegPath, SegNum, AckCounts, Acks) ->
case append_acks_to_segment(SegPath, AckCount, Acks) of
0 -> AckCounts;
?SEGMENT_ENTRIES_COUNT -> dict:erase(SegNum, AckCounts);
- AckCount -> dict:store(SegNum, AckCount, AckCounts)
+ AckCount2 -> dict:store(SegNum, AckCount2, AckCounts)
end.
append_acks_to_segment(SegPath, AckCount, Acks)