diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-10-02 14:33:53 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-10-02 14:33:53 +0100 |
| commit | 993959790fe5ccca6670cd34160d0a8aaf998a5d (patch) | |
| tree | b94de883b27b540272eb52dc4fd2e9579b71d768 | |
| parent | b7abb6d9a8afe2786bfe5640dc5754ca108d168f (diff) | |
| download | rabbitmq-server-git-993959790fe5ccca6670cd34160d0a8aaf998a5d.tar.gz | |
...and now some of it actually works
| -rw-r--r-- | src/rabbit_queue_index.erl | 62 |
1 files changed, 32 insertions, 30 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 36637323f4..0ec0cd3dc5 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -31,7 +31,7 @@ -module(rabbit_queue_index). --export([init/1, write_published/5, write_delivered/2, write_acks/2, +-export([init/1, write_published/4, write_delivered/2, write_acks/2, flush_journal/1, read_segment_entries/2]). -define(MAX_ACK_JOURNAL_ENTRY_COUNT, 32768). @@ -40,18 +40,17 @@ -define(SEQ_BITS, (?SEQ_BYTES * 8)). -define(SEGMENT_EXTENSION, ".idx"). --define(REL_SEQ_BITS, 13). --define(SEGMENT_ENTRIES_COUNT, 8192). %% trunc(math:pow(2,?REL_SEQ_BITS))). +-define(REL_SEQ_BITS, 14). +-define(SEGMENT_ENTRIES_COUNT, 16384). %% trunc(math:pow(2,?REL_SEQ_BITS))). -%% seq only is binary 000 followed by 13 bits of rel seq id -%% (range: 0 - 8191) --define(REL_SEQ_ONLY_PREFIX, 000). --define(REL_SEQ_ONLY_PREFIX_BITS, 3). +%% seq only is binary 00 followed by 14 bits of rel seq id +%% (range: 0 - 16383) +-define(REL_SEQ_ONLY_PREFIX, 00). +-define(REL_SEQ_ONLY_PREFIX_BITS, 2). -define(REL_SEQ_ONLY_ENTRY_LENGTH_BYTES, 2). -%% publish record is binary 1 followed by bits for -%% is_delivered and is_persistent, then 13 bits of rel seq id, -%% and 128 bits of md5sum msg id +%% publish record is binary 1 followed by a bit for is_persistent, +%% then 14 bits of rel seq id, and 128 bits of md5sum msg id -define(PUBLISH_PREFIX, 1). -define(PUBLISH_PREFIX_BITS, 1). @@ -97,7 +96,7 @@ }). -spec(init/1 :: (string()) -> qistate()). --spec(write_published/5 :: (msg_id(), seq_id(), boolean(), boolean(), qistate()) +-spec(write_published/4 :: (msg_id(), seq_id(), boolean(), qistate()) -> qistate()). -spec(write_delivered/2 :: (seq_id(), qistate()) -> qistate()). -spec(write_acks/2 :: ([seq_id()], qistate()) -> qistate()). @@ -124,15 +123,15 @@ init(Name) -> seg_ack_counts = AckCounts }. -write_published(MsgId, SeqId, IsDelivered, IsPersistent, State) -> +write_published(MsgId, SeqId, IsPersistent, State) + when is_binary(MsgId) -> + ?MSG_ID_BYTES = size(MsgId), {SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), {Hdl, State1} = get_file_handle_for_seg(SegNum, State), - IsDeliveredNum = bool_to_int(IsDelivered), - IsPersistentNum = bool_to_int(IsPersistent), ok = file:write(Hdl, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, - IsDeliveredNum:1, IsPersistentNum:1, - RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BITS/binary>>), + (bool_to_int(IsPersistent)):1, + RelSeq:?REL_SEQ_BITS, MsgId/binary>>), State1. write_delivered(SeqId, State) -> @@ -204,6 +203,8 @@ read_segment_entries(InitSeqId, State = #qistate { dir = Dir }) -> %% Minor Helpers %%---------------------------------------------------------------------------- +close_file_handle_for_seg(undefined, State) -> + State; close_file_handle_for_seg(SegNum, State = #qistate { cur_seg_num = SegNum, cur_seg_hdl = Hdl }) -> ok = file:sync(Hdl), @@ -215,8 +216,9 @@ close_file_handle_for_seg(_SegNum, State) -> get_file_handle_for_seg(SegNum, State = #qistate { cur_seg_num = SegNum, cur_seg_hdl = Hdl }) -> {Hdl, State}; -get_file_handle_for_seg(SegNum, State) -> - State1 = #qistate { dir = Dir } = close_file_handle_for_seg(SegNum, State), +get_file_handle_for_seg(SegNum, State = #qistate { cur_seg_num = CurSegNum }) -> + State1 = #qistate { dir = Dir } = + close_file_handle_for_seg(CurSegNum, State), {ok, Hdl} = file:open(seg_num_to_path(Dir, SegNum), [binary, raw, append, delayed_write]), {Hdl, State1 #qistate { cur_seg_num = SegNum, cur_seg_hdl = Hdl}}. @@ -241,12 +243,10 @@ seg_num_to_path(Dir, SegNum) -> find_ack_counts(Dir) -> SegNumsPaths = - lists:map( - fun (SegName) -> - {list_to_integer( - lists:takewhile(fun(C) -> $0 =< C andalso C =< $9 end, - SegName)), filename:join(Dir, SegName)} - end, filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)), + [{list_to_integer( + lists:takewhile(fun(C) -> $0 =< C andalso C =< $9 end, + SegName)), filename:join(Dir, SegName)} + || SegName <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)], lists:foldl( fun ({SegNum, SegPath}, Acc) -> case load_segment(SegNum, SegPath) of @@ -312,14 +312,16 @@ load_segment_entries(SegNum, Hdl, {SDict, AckCount}) -> load_segment_entries(SegNum, Hdl, deliver_or_ack_msg(SDict, AckCount, RelSeq)); {ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, - IsDeliveredNum:1, IsPersistentNum:1, MSB/bitstring>>} -> - {ok, <<LSB:8/binary, MsgId:?MSG_ID_BITS/binary>>} = + IsPersistentNum:1, MSB/bitstring>>} -> + %% because we specify /binary, and binaries are complete + %% bytes, the size spec is in bytes, not bits. + {ok, <<LSB:1/binary, MsgId:?MSG_ID_BYTES/binary>>} = file:read(Hdl, ?PUBLISH_RECORD_LENGTH_BYTES - 1), <<RelSeq:?REL_SEQ_BITS>> = <<MSB/bitstring, LSB/binary>>, load_segment_entries( - SegNum, Hdl, {dict:store(RelSeq, {MsgId, 1 == IsDeliveredNum, - 1 == IsPersistentNum}, SDict), - AckCount}); + SegNum, Hdl, {dict:store(RelSeq, {MsgId, false, + 1 == IsPersistentNum}, + SDict), AckCount}); _ErrOrEoF -> {SDict, AckCount} end. @@ -344,7 +346,7 @@ append_acks_to_segment(SegPath, SegNum, AckCounts, Acks) -> case append_acks_to_segment(SegPath, AckCount, Acks) of 0 -> AckCounts; ?SEGMENT_ENTRIES_COUNT -> dict:erase(SegNum, AckCounts); - AckCount -> dict:store(SegNum, AckCount, AckCounts) + AckCount2 -> dict:store(SegNum, AckCount2, AckCounts) end. append_acks_to_segment(SegPath, AckCount, Acks) |
