summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_queue_index.erl1186
-rw-r--r--src/rabbit_queue_index3.erl850
-rw-r--r--src/rabbit_tests.erl16
-rw-r--r--src/rabbit_variable_queue.erl2
4 files changed, 594 insertions, 1460 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 91ecd66925..2b4ec1a473 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -32,54 +32,28 @@
-module(rabbit_queue_index).
-export([init/1, terminate/1, terminate_and_erase/1, write_published/4,
- write_delivered/2, write_acks/2, sync_seq_ids/3, flush_journal/1,
+ write_delivered/2, write_acks/2, sync_seq_ids/2, flush_journal/1,
read_segment_entries/2, next_segment_boundary/1, segment_size/0,
find_lowest_seq_id_seg_and_next_seq_id/1, start_msg_store/1]).
-%%----------------------------------------------------------------------------
-%% The queue disk index
-%%
-%% The queue disk index operates over a journal, and a number of
-%% segment files. Each segment is the same size, both in max number of
-%% entries, and max file size, owing to fixed sized records.
-%%
-%% Publishes are written directly to the segment files. The segment is
-%% found by dividing the sequence id by the the max number of entries
-%% per segment. Only the relative sequence within the segment is
-%% recorded as the sequence id within a segment file (i.e. sequence id
-%% modulo max number of entries per segment). This is keeps entries
-%% as small as possible. Publishes are only ever going to be received
-%% in contiguous ascending order.
-%%
-%% Acks and deliveries are written to a bounded journal and are also
-%% held in memory, each in a dict with the segment as the key. Again,
-%% the records are fixed size: the entire sequence id is written and
-%% is limited to a 63-bit unsigned integer. The remaining bit
-%% indicates whether the journal entry is for a delivery or an
-%% ack. When the journal gets too big, or flush_journal is called, the
-%% journal is (possibly incrementally) flushed out to the segment
-%% files. As acks and delivery notes can be received in any order
-%% (this is not obvious for deliveries, but consider what happens when
-%% eg msgs are *re*queued - you'll publish and then mark the msgs
-%% delivered immediately, which may be out of order), this journal
-%% reduces seeking, and batches writes to the segment files, keeping
-%% performance high.
-%%
-%% On startup, the journal is read along with all the segment files,
-%% and the journal is fully flushed out to the segment files. Care is
-%% taken to ensure that no message can be delivered or ack'd twice.
-%%
-%%----------------------------------------------------------------------------
-
-define(CLEAN_FILENAME, "clean.dot").
+%%----------------------------------------------------------------------------
+%% ---- Journal details ----
+
-define(MAX_JOURNAL_ENTRY_COUNT, 32768).
-define(JOURNAL_FILENAME, "journal.jif").
--define(DEL_BIT, 0).
--define(ACK_BIT, 1).
+-define(PUB_PERSIST_JPREFIX, 2#00).
+-define(PUB_TRANS_JPREFIX, 2#01).
+-define(DEL_JPREFIX, 2#10).
+-define(ACK_JPREFIX, 2#11).
+-define(JPREFIX_BITS, 2).
-define(SEQ_BYTES, 8).
--define(SEQ_BITS, ((?SEQ_BYTES * 8) - 1)).
+-define(SEQ_BITS, ((?SEQ_BYTES * 8) - ?JPREFIX_BITS)).
+
+%% ---- Segment details ----
+
-define(SEGMENT_EXTENSION, ".idx").
-define(REL_SEQ_BITS, 14).
@@ -111,13 +85,18 @@
-record(qistate,
{ dir,
- seg_num_handles,
- journal_count,
- journal_ack_dict,
- journal_del_dict,
- seg_ack_counts,
- publish_handle,
- partial_segments
+ segments,
+ journal_handle,
+ dirty_count
+ }).
+
+-record(segment,
+ { pubs,
+ acks,
+ handle,
+ journal_entries,
+ path,
+ num
}).
-include("rabbit.hrl").
@@ -129,16 +108,10 @@
-type(hdl() :: ('undefined' | any())).
-type(msg_id() :: binary()).
-type(seq_id() :: integer()).
--type(hdl_and_count() :: ('undefined' |
- {non_neg_integer(), hdl(), non_neg_integer()})).
--type(qistate() :: #qistate { dir :: file_path(),
- seg_num_handles :: dict(),
- journal_count :: integer(),
- journal_ack_dict :: dict(),
- journal_del_dict :: dict(),
- seg_ack_counts :: dict(),
- publish_handle :: hdl_and_count(),
- partial_segments :: dict()
+-type(qistate() :: #qistate { dir :: file_path(),
+ segments :: dict(),
+ journal_handle :: hdl(),
+ dirty_count :: integer()
}).
-spec(init/1 :: (queue_name()) -> {non_neg_integer(), qistate()}).
@@ -148,7 +121,7 @@
-> qistate()).
-spec(write_delivered/2 :: (seq_id(), qistate()) -> qistate()).
-spec(write_acks/2 :: ([seq_id()], qistate()) -> qistate()).
--spec(sync_seq_ids/3 :: ([seq_id()], boolean(), qistate()) -> qistate()).
+-spec(sync_seq_ids/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(flush_journal/1 :: (qistate()) -> qistate()).
-spec(read_segment_entries/2 :: (seq_id(), qistate()) ->
{[{msg_id(), seq_id(), boolean(), boolean()}], qistate()}).
@@ -167,16 +140,61 @@
init(Name) ->
State = blank_state(Name),
- {TotalMsgCount, State1} = read_and_prune_segments(State),
- scatter_journal(TotalMsgCount, State1).
-
-terminate(State = #qistate { seg_num_handles = SegHdls }) ->
- case 0 == dict:size(SegHdls) of
- true -> State;
- false -> State1 = #qistate { dir = Dir } = close_all_handles(State),
- store_clean_shutdown(Dir),
- State1 #qistate { publish_handle = undefined }
- end.
+ %% 1. Load the journal completely. This will also load segments
+ %% which have entries in the journal and remove duplicates.
+ %% The counts will correctly reflect the combination of the
+ %% segment and the journal.
+ State1 = load_journal(State),
+ %% 2. Flush the journal. This makes life easier for everyone, as
+ %% it means there won't be any publishes in the journal alone.
+ State2 = #qistate { dir = Dir } = flush_journal(State1),
+ %% 3. Load each segment in turn and filter out messages that are
+ %% not in the msg_store, by adding acks to the journal. These
+ %% acks only go to the RAM journal as it doesn't matter if we
+ %% lose them. Also mark delivered if not clean shutdown.
+ AllSegs = all_segment_nums(State2),
+ CleanShutdown = detect_clean_shutdown(Dir),
+ %% We know the journal is empty here, so we don't need to combine
+ %% with the journal, and we don't need to worry about messages
+ %% that have been acked.
+ State3 =
+ lists:foldl(
+ fun (Seg, StateN) ->
+ {SegDict, _PubCount, _AckCount, StateN1} =
+ load_segment(Seg, false, StateN),
+ dict:fold(
+ fun (RelSeq, {{MsgId, _IsPersistent}, Del, no_ack},
+ StateM) ->
+ SeqId = reconstruct_seq_id(Seg, RelSeq),
+ InMsgStore = rabbit_msg_store:contains(MsgId),
+ case {InMsgStore, CleanShutdown} of
+ {true, true} ->
+ StateM;
+ {true, false} when Del == del ->
+ StateM;
+ {true, false} ->
+ add_to_journal(SeqId, del, StateM);
+ {false, _} when Del == del ->
+ add_to_journal(SeqId, ack, StateM);
+ {false, _} ->
+ add_to_journal(
+ SeqId, ack,
+ add_to_journal(SeqId, del, StateM))
+ end
+ end, StateN1, SegDict)
+ end, State2, AllSegs),
+ %% 4. Go through all segments and calculate the number of unacked
+ %% messages we have.
+ Count = lists:foldl(
+ fun (Seg, CountAcc) ->
+ #segment { pubs = PubCount, acks = AckCount } =
+ find_segment(Seg, State3),
+ CountAcc + PubCount - AckCount
+ end, 0, AllSegs),
+ {Count, State3}.
+
+terminate(State) ->
+ terminate(true, State).
terminate_and_erase(State) ->
State1 = terminate(State),
@@ -186,123 +204,114 @@ terminate_and_erase(State) ->
write_published(MsgId, SeqId, IsPersistent, State)
when is_binary(MsgId) ->
?MSG_ID_BYTES = size(MsgId),
- {SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
- {Hdl, State1} = get_pub_handle(SegNum, State),
- ok = file_handle_cache:append(Hdl,
- <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
- (bool_to_int(IsPersistent)):1,
- RelSeq:?REL_SEQ_BITS, MsgId/binary>>),
- State1.
+ {JournalHdl, State1} = get_journal_handle(State),
+ ok = file_handle_cache:append(JournalHdl,
+ [<<(case IsPersistent of
+ true -> ?PUB_PERSIST_JPREFIX;
+ false -> ?PUB_TRANS_JPREFIX
+ end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>,
+ MsgId]),
+ maybe_flush_journal(add_to_journal(SeqId, {MsgId, IsPersistent}, State1)).
+
+write_delivered(SeqId, State) ->
+ {JournalHdl, State1} = get_journal_handle(State),
+ ok = file_handle_cache:append(JournalHdl,
+ <<?DEL_JPREFIX:?JPREFIX_BITS,
+ SeqId:?SEQ_BITS>>),
+ maybe_flush_journal(add_to_journal(SeqId, del, State1)).
+
+write_acks(SeqIds, State) ->
+ {JournalHdl, State1} = get_journal_handle(State),
+ ok = file_handle_cache:append(JournalHdl,
+ [<<?ACK_JPREFIX:?JPREFIX_BITS,
+ SeqId:?SEQ_BITS>> || SeqId <- SeqIds]),
+ State2 = lists:foldl(fun (SeqId, StateN) ->
+ add_to_journal(SeqId, ack, StateN)
+ end, State1, SeqIds),
+ maybe_flush_journal(State2).
+
+sync_seq_ids(_SeqIds, State = #qistate { journal_handle = undefined }) ->
+ State;
+sync_seq_ids(_SeqIds, State = #qistate { journal_handle = JournalHdl }) ->
+ ok = file_handle_cache:sync(JournalHdl),
+ State.
-write_delivered(SeqId, State = #qistate { journal_del_dict = JDelDict }) ->
- {JDelDict1, State1} =
- write_to_journal([<<?DEL_BIT:1, SeqId:?SEQ_BITS>>],
- [SeqId], JDelDict, State),
- maybe_flush(State1 #qistate { journal_del_dict = JDelDict1 }).
-
-write_acks(SeqIds, State = #qistate { journal_ack_dict = JAckDict }) ->
- {JAckDict1, State1} =
- write_to_journal([<<?ACK_BIT:1, SeqId:?SEQ_BITS>> || SeqId <- SeqIds],
- SeqIds, JAckDict, State),
- maybe_flush(State1 #qistate { journal_ack_dict = JAckDict1 }).
-
-sync_seq_ids(SeqIds, SyncAckJournal, State) ->
- State1 = case SyncAckJournal of
- true -> {Hdl, State2} = get_journal_handle(State),
- ok = file_handle_cache:sync(Hdl),
- State2;
- false -> State
- end,
- SegNumsSet =
- lists:foldl(
- fun (SeqId, Set) ->
- {SegNum, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
- sets:add_element(SegNum, Set)
- end, sets:new(), SeqIds),
- sets:fold(
- fun (SegNum, StateN) ->
- {Hdl1, StateM} = get_seg_handle(SegNum, StateN),
- ok = file_handle_cache:sync(Hdl1),
- StateM
- end, State1, SegNumsSet).
-
-flush_journal(State = #qistate { journal_count = 0 }) ->
+flush_journal(State = #qistate { dirty_count = 0 }) ->
State;
-flush_journal(State = #qistate { journal_ack_dict = JAckDict,
- journal_del_dict = JDelDict,
- journal_count = JCount }) ->
- SegNum = case dict:fetch_keys(JAckDict) of
- [] -> hd(dict:fetch_keys(JDelDict));
- [N|_] -> N
- end,
- Dels = seg_entries_from_dict(SegNum, JDelDict),
- Acks = seg_entries_from_dict(SegNum, JAckDict),
- State1 = append_dels_to_segment(SegNum, Dels, State),
- State2 = append_acks_to_segment(SegNum, Acks, State1),
- JCount1 = JCount - length(Dels) - length(Acks),
- State3 = State2 #qistate { journal_del_dict = dict:erase(SegNum, JDelDict),
- journal_ack_dict = dict:erase(SegNum, JAckDict),
- journal_count = JCount1 },
- case JCount1 of
- 0 -> {Hdl, State4} = get_journal_handle(State3),
- {ok, 0} = file_handle_cache:position(Hdl, bof),
- ok = file_handle_cache:truncate(Hdl),
- ok = file_handle_cache:sync(Hdl),
- State4;
- _ -> flush_journal(State3)
- end.
+flush_journal(State = #qistate { segments = Segments }) ->
+ State1 =
+ dict:fold(
+ fun (_Seg, #segment { journal_entries = JEntries, pubs = PubCount,
+ acks = AckCount } = Segment, StateN) ->
+ case PubCount > 0 andalso PubCount == AckCount of
+ true ->
+ ok = delete_segment(Segment),
+ StateN;
+ false ->
+ case 0 == dict:size(JEntries) of
+ true ->
+ store_segment(Segment, StateN);
+ false ->
+ {Hdl, Segment1} = get_segment_handle(Segment),
+ dict:fold(fun write_entry_to_segment/3,
+ Hdl, JEntries),
+ ok = file_handle_cache:sync(Hdl),
+ store_segment(
+ Segment1 #segment { journal_entries =
+ dict:new() }, StateN)
+ end
+ end
+ end, State #qistate { segments = dict:new() }, Segments),
+ {JournalHdl, State2} = get_journal_handle(State1),
+ {ok, 0} = file_handle_cache:position(JournalHdl, bof),
+ ok = file_handle_cache:truncate(JournalHdl),
+ ok = file_handle_cache:sync(JournalHdl),
+ State2 #qistate { dirty_count = 0 }.
read_segment_entries(InitSeqId, State) ->
- {SegNum, 0} = seq_id_to_seg_and_rel_seq_id(InitSeqId),
- {SDict, _PubCount, _AckCount, _HighRelSeq, State1} =
- load_segment(SegNum, State),
+ {Seg, 0} = seq_id_to_seg_and_rel_seq_id(InitSeqId),
+ {SegDict, _PubCount, _AckCount, State1} =
+ load_segment(Seg, false, State),
+ #segment { journal_entries = JEntries } = find_segment(Seg, State1),
+ SegDict1 = journal_plus_segment(JEntries, SegDict),
%% deliberately sort the list desc, because foldl will reverse it
- RelSeqs = rev_sort(dict:fetch_keys(SDict)),
+ RelSeqs = rev_sort(dict:fetch_keys(SegDict1)),
{lists:foldl(fun (RelSeq, Acc) ->
- {MsgId, IsDelivered, IsPersistent} =
- dict:fetch(RelSeq, SDict),
- [ {MsgId, reconstruct_seq_id(SegNum, RelSeq),
- IsPersistent, IsDelivered} | Acc]
+ {{MsgId, IsPersistent}, IsDelivered, no_ack} =
+ dict:fetch(RelSeq, SegDict1),
+ [ {MsgId, reconstruct_seq_id(Seg, RelSeq),
+ IsPersistent, IsDelivered == del} | Acc ]
end, [], RelSeqs),
State1}.
next_segment_boundary(SeqId) ->
- {SegNum, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
- reconstruct_seq_id(SegNum + 1, 0).
+ {Seg, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
+ reconstruct_seq_id(Seg + 1, 0).
segment_size() ->
?SEGMENT_ENTRY_COUNT.
-find_lowest_seq_id_seg_and_next_seq_id(State = #qistate { dir = Dir }) ->
- SegNums = all_segment_nums(Dir),
+find_lowest_seq_id_seg_and_next_seq_id(State) ->
+ SegNums = all_segment_nums(State),
%% We don't want the lowest seq_id, merely the seq_id of the start
%% of the lowest segment. That seq_id may not actually exist, but
%% that's fine. The important thing is that the segment exists and
%% the seq_id reported is on a segment boundary.
+ %% We also don't really care about the max seq_id. Just start the
+ %% next segment: it makes life much easier.
+
%% SegNums is sorted, ascending.
- LowSeqIdSeg =
+ {LowSeqIdSeg, NextSeqId} =
case SegNums of
- [] -> 0;
- [MinSegNum|_] -> reconstruct_seq_id(MinSegNum, 0)
+ [] -> {0, 0};
+ [MinSeg|_] -> {reconstruct_seq_id(MinSeg, 0),
+ reconstruct_seq_id(1 + lists:last(SegNums), 0)}
end,
- {NextSeqId, State1} =
- case SegNums of
- [] -> {0, State};
- _ -> MaxSegNum = lists:last(SegNums),
- {_SDict, PubCount, _AckCount, HighRelSeq, State2} =
- load_segment(MaxSegNum, State),
- NextSeqId1 = reconstruct_seq_id(MaxSegNum, HighRelSeq),
- NextSeqId2 = case PubCount of
- 0 -> NextSeqId1;
- _ -> NextSeqId1 + 1
- end,
- {NextSeqId2, State2}
- end,
- {LowSeqIdSeg, NextSeqId, State1}.
+ {LowSeqIdSeg, NextSeqId, State}.
start_msg_store(DurableQueues) ->
- DurableDict =
+ DurableDict =
dict:from_list([ {queue_name_to_dir_name(Queue #amqqueue.name),
Queue #amqqueue.name} || Queue <- DurableQueues ]),
QueuesDir = queues_dir(),
@@ -337,175 +346,84 @@ start_msg_store(DurableQueues) ->
ok.
%%----------------------------------------------------------------------------
-%% Minor Helpers
+%% Msg Store Startup Delta Function
%%----------------------------------------------------------------------------
-maybe_flush(State = #qistate { journal_count = JCount })
- when JCount > ?MAX_JOURNAL_ENTRY_COUNT ->
- flush_journal(State);
-maybe_flush(State) ->
- State.
-
-write_to_journal(BinList, SeqIds, Dict,
- State = #qistate { journal_count = JCount }) ->
- {Hdl, State1} = get_journal_handle(State),
- ok = file_handle_cache:append(Hdl, BinList),
- {Dict1, JCount1} =
- lists:foldl(
- fun (SeqId, {Dict2, JCount2}) ->
- {add_seqid_to_dict(SeqId, Dict2), JCount2 + 1}
- end, {Dict, JCount}, SeqIds),
- {Dict1, State1 #qistate { journal_count = JCount1 }}.
-
-queue_name_to_dir_name(Name = #resource { kind = queue }) ->
- Bin = term_to_binary(Name),
- Size = 8*size(Bin),
- <<Num:Size>> = Bin,
- lists:flatten(io_lib:format("~.36B", [Num])).
-
-queues_dir() ->
- filename:join(rabbit_mnesia:dir(), "queues").
+queue_index_walker([]) ->
+ finished;
+queue_index_walker([QueueName|QueueNames]) ->
+ State = blank_state(QueueName),
+ State1 = load_journal(State),
+ SegNums = all_segment_nums(State1),
+ queue_index_walker({SegNums, State1, QueueNames});
-rev_sort(List) ->
- lists:sort(fun (A, B) -> B < A end, List).
+queue_index_walker({[], State, QueueNames}) ->
+ _State = terminate(false, State),
+ queue_index_walker(QueueNames);
+queue_index_walker({[Seg | SegNums], State, QueueNames}) ->
+ SeqId = reconstruct_seq_id(Seg, 0),
+ {Messages, State1} = read_segment_entries(SeqId, State),
+ queue_index_walker({Messages, State1, SegNums, QueueNames});
-get_journal_handle(State = #qistate { dir = Dir, seg_num_handles = SegHdls }) ->
- case dict:find(journal, SegHdls) of
- {ok, Hdl} ->
- {Hdl, State};
- error ->
- Path = filename:join(Dir, ?JOURNAL_FILENAME),
- Mode = [raw, binary, write, read, read_ahead],
- new_handle(journal, Path, Mode, State)
+queue_index_walker({[], State, SegNums, QueueNames}) ->
+ queue_index_walker({SegNums, State, QueueNames});
+queue_index_walker({[{MsgId, _SeqId, IsPersistent, _IsDelivered} | Msgs],
+ State, SegNums, QueueNames}) ->
+ case IsPersistent of
+ true -> {MsgId, 1, {Msgs, State, SegNums, QueueNames}};
+ false -> queue_index_walker({Msgs, State, SegNums, QueueNames})
end.
-get_pub_handle(SegNum, State = #qistate { publish_handle = PubHandle }) ->
- {State1, PubHandle1 = {_SegNum, Hdl, _Count}} =
- get_counted_handle(SegNum, State, PubHandle),
- {Hdl, State1 #qistate { publish_handle = PubHandle1 }}.
-
-get_counted_handle(SegNum, State, undefined) ->
- get_counted_handle(SegNum, State, {SegNum, undefined, 0});
-get_counted_handle(SegNum, State = #qistate { partial_segments = Partials },
- {SegNum, undefined, Count}) ->
- {Hdl, State1} = get_seg_handle(SegNum, State),
- {CountExtra, Partials1} =
- case dict:find(SegNum, Partials) of
- {ok, CountExtra1} -> {CountExtra1, dict:erase(SegNum, Partials)};
- error -> {0, Partials}
- end,
- Count1 = Count + 1 + CountExtra,
- {State1 #qistate { partial_segments = Partials1 }, {SegNum, Hdl, Count1}};
-get_counted_handle(SegNum, State, {SegNum, Hdl, Count})
- when Count < ?SEGMENT_ENTRY_COUNT ->
- {State, {SegNum, Hdl, Count + 1}};
-get_counted_handle(SegNumA, State, {SegNumB, Hdl, ?SEGMENT_ENTRY_COUNT})
- when SegNumA == SegNumB + 1 ->
- ok = file_handle_cache:append_write_buffer(Hdl),
- get_counted_handle(SegNumA, State, undefined);
-get_counted_handle(SegNumA, State = #qistate { partial_segments = Partials,
- seg_ack_counts = AckCounts },
- {SegNumB, _Hdl, Count}) ->
- %% don't flush here because it's possible SegNumB has been deleted
- State1 =
- case dict:find(SegNumB, AckCounts) of
- {ok, Count} ->
- %% #acks == #pubs, and we're moving to different
- %% segment, so delete.
- delete_segment(SegNumB, State);
- _ ->
- State #qistate {
- partial_segments = dict:store(SegNumB, Count, Partials) }
- end,
- get_counted_handle(SegNumA, State1, undefined).
+%%----------------------------------------------------------------------------
+%% Minors
+%%----------------------------------------------------------------------------
-get_seg_handle(SegNum, State = #qistate { dir = Dir, seg_num_handles = SegHdls }) ->
- case dict:find(SegNum, SegHdls) of
- {ok, Hdl} ->
- {Hdl, State};
- error ->
- new_handle(SegNum, seg_num_to_path(Dir, SegNum),
- [binary, raw, read, write,
- {read_ahead, ?SEGMENT_TOTAL_SIZE}],
- State)
- end.
+maybe_flush_journal(State = #qistate { dirty_count = DCount })
+ when DCount > ?MAX_JOURNAL_ENTRY_COUNT ->
+ flush_journal(State);
+maybe_flush_journal(State) ->
+ State.
-delete_segment(SegNum, State = #qistate { dir = Dir,
- seg_ack_counts = AckCounts,
- partial_segments = Partials }) ->
- State1 = close_handle(SegNum, State),
- ok = case file:delete(seg_num_to_path(Dir, SegNum)) of
- ok -> ok;
- {error, enoent} -> ok
- end,
- State1 #qistate {seg_ack_counts = dict:erase(SegNum, AckCounts),
- partial_segments = dict:erase(SegNum, Partials) }.
-
-new_handle(Key, Path, Mode, State = #qistate { seg_num_handles = SegHdls }) ->
- {ok, Hdl} = file_handle_cache:open(Path, Mode, [{write_buffer, infinity}]),
- {Hdl, State #qistate { seg_num_handles = dict:store(Key, Hdl, SegHdls) }}.
-
-close_handle(Key, State = #qistate { seg_num_handles = SegHdls }) ->
- case dict:find(Key, SegHdls) of
- {ok, Hdl} ->
- ok = file_handle_cache:close(Hdl),
- State #qistate { seg_num_handles = dict:erase(Key, SegHdls) };
- error ->
- State
- end.
+all_segment_nums(#qistate { segments = Segments, dir = Dir }) ->
+ sets:to_list(
+ lists:foldl(
+ fun (SegName, Set) ->
+ sets:add_element(
+ list_to_integer(
+ lists:takewhile(fun(C) -> $0 =< C andalso C =< $9 end,
+ SegName)), Set)
+ end, sets:from_list(dict:fetch_keys(Segments)),
+ filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir))).
-close_all_handles(State = #qistate { seg_num_handles = SegHdls }) ->
- ok = dict:fold(fun (_Key, Hdl, ok) ->
- file_handle_cache:close(Hdl)
- end, ok, SegHdls),
- State #qistate { seg_num_handles = dict:new() }.
+blank_state(QueueName) ->
+ StrName = queue_name_to_dir_name(QueueName),
+ Dir = filename:join(queues_dir(), StrName),
+ ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
+ #qistate { dir = Dir,
+ segments = dict:new(),
+ journal_handle = undefined,
+ dirty_count = 0
+ }.
-bool_to_int(true ) -> 1;
-bool_to_int(false) -> 0.
+rev_sort(List) ->
+ lists:sort(fun (A, B) -> B < A end, List).
seq_id_to_seg_and_rel_seq_id(SeqId) ->
{ SeqId div ?SEGMENT_ENTRY_COUNT, SeqId rem ?SEGMENT_ENTRY_COUNT }.
-reconstruct_seq_id(SegNum, RelSeq) ->
- (SegNum * ?SEGMENT_ENTRY_COUNT) + RelSeq.
+reconstruct_seq_id(Seg, RelSeq) ->
+ (Seg * ?SEGMENT_ENTRY_COUNT) + RelSeq.
-seg_num_to_path(Dir, SegNum) ->
- SegName = integer_to_list(SegNum),
+seg_num_to_path(Dir, Seg) ->
+ SegName = integer_to_list(Seg),
filename:join(Dir, SegName ++ ?SEGMENT_EXTENSION).
-delete_queue_directory(Dir) ->
- {ok, Entries} = file:list_dir(Dir),
- ok = lists:foldl(fun (Entry, ok) ->
- file:delete(filename:join(Dir, Entry))
- end, ok, Entries),
- ok = file:del_dir(Dir).
-
-add_seqid_to_dict(SeqId, Dict) ->
- {SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
- add_seqid_to_dict(SegNum, RelSeq, Dict).
-
-add_seqid_to_dict(SegNum, RelSeq, Dict) ->
- dict:update(SegNum, fun(Lst) -> [RelSeq|Lst] end, [RelSeq], Dict).
-
-all_segment_nums(Dir) ->
- lists:sort(
- [list_to_integer(
- lists:takewhile(fun(C) -> $0 =< C andalso C =< $9 end, SegName))
- || SegName <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)]).
-
-blank_state(QueueName) ->
- StrName = queue_name_to_dir_name(QueueName),
- Dir = filename:join(queues_dir(), StrName),
- ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
- #qistate { dir = Dir,
- seg_num_handles = dict:new(),
- journal_count = 0,
- journal_ack_dict = dict:new(),
- journal_del_dict = dict:new(),
- seg_ack_counts = dict:new(),
- publish_handle = undefined,
- partial_segments = dict:new()
- }.
+delete_segment(#segment { handle = undefined }) ->
+ ok;
+delete_segment(#segment { handle = Hdl, path = Path }) ->
+ ok = file_handle_cache:close(Hdl),
+ ok = file:delete(Path),
+ ok.
detect_clean_shutdown(Dir) ->
case file:delete(filename:join(Dir, ?CLEAN_FILENAME)) of
@@ -519,258 +437,143 @@ store_clean_shutdown(Dir) ->
[{write_buffer, unbuffered}]),
ok = file_handle_cache:close(Hdl).
-seg_entries_from_dict(SegNum, Dict) ->
- case dict:find(SegNum, Dict) of
- {ok, Entries} -> Entries;
- error -> []
- end.
-
-
-%%----------------------------------------------------------------------------
-%% Msg Store Startup Delta Function
-%%----------------------------------------------------------------------------
-
-queue_index_walker([]) ->
- finished;
-queue_index_walker([QueueName|QueueNames]) ->
- State = blank_state(QueueName),
- {Hdl, State1} = get_journal_handle(State),
- {JAckDict, _JDelDict} = load_journal(Hdl, dict:new(), dict:new()),
- State2 = #qistate { dir = Dir } =
- close_handle(journal, State1 #qistate { journal_ack_dict = JAckDict }),
- SegNums = all_segment_nums(Dir),
- queue_index_walker({SegNums, State2, QueueNames});
-
-queue_index_walker({[], State, QueueNames}) ->
- _State = terminate(State),
- queue_index_walker(QueueNames);
-queue_index_walker({[SegNum | SegNums], State, QueueNames}) ->
- {SDict, _PubCount, _AckCount, _HighRelSeq, State1} =
- load_segment(SegNum, State),
- queue_index_walker({dict:to_list(SDict), State1, SegNums, QueueNames});
-
-queue_index_walker({[], State, SegNums, QueueNames}) ->
- queue_index_walker({SegNums, State, QueueNames});
-queue_index_walker({[{_RelSeq, {MsgId, _IsDelivered, IsPersistent}} | Msgs],
- State, SegNums, QueueNames}) ->
- case IsPersistent of
- true -> {MsgId, 1, {Msgs, State, SegNums, QueueNames}};
- false -> queue_index_walker({Msgs, State, SegNums, QueueNames})
- end.
+queue_name_to_dir_name(Name = #resource { kind = queue }) ->
+ Bin = term_to_binary(Name),
+ Size = 8*size(Bin),
+ <<Num:Size>> = Bin,
+ lists:flatten(io_lib:format("~.36B", [Num])).
+queues_dir() ->
+ filename:join(rabbit_mnesia:dir(), "queues").
-%%----------------------------------------------------------------------------
-%% Startup Functions
-%%----------------------------------------------------------------------------
+delete_queue_directory(Dir) ->
+ {ok, Entries} = file:list_dir(Dir),
+ ok = lists:foldl(fun (Entry, ok) ->
+ file:delete(filename:join(Dir, Entry))
+ end, ok, Entries),
+ ok = file:del_dir(Dir).
-read_and_prune_segments(State = #qistate { dir = Dir }) ->
- SegNums = all_segment_nums(Dir),
- CleanShutdown = detect_clean_shutdown(Dir),
- {TotalMsgCount, State1} =
- lists:foldl(
- fun (SegNum, {TotalMsgCount1, StateN =
- #qistate { publish_handle = PublishHandle,
- partial_segments = Partials }}) ->
- {SDict, PubCount, AckCount, _HighRelSeq, StateM} =
- load_segment(SegNum, StateN),
- StateL = #qistate { seg_ack_counts = AckCounts } =
- drop_and_deliver(SegNum, SDict, CleanShutdown, StateM),
- %% ignore the effect of drop_and_deliver on
- %% TotalMsgCount and AckCounts, as drop_and_deliver
- %% will add to the journal dicts, which will then
- %% effect TotalMsgCount when we scatter the journal
- TotalMsgCount2 = TotalMsgCount1 + dict:size(SDict),
- AckCounts1 = case AckCount of
- 0 -> AckCounts;
- N -> dict:store(SegNum, N, AckCounts)
- end,
- %% In the following, whilst there may be several
- %% partial segments, we only remember the last
- %% one. All other partial segments get added into
- %% the partial_segments dict
- {PublishHandle1, Partials1} =
- case PubCount of
- ?SEGMENT_ENTRY_COUNT ->
- {PublishHandle, Partials};
- 0 ->
- {PublishHandle, Partials};
- _ ->
- {{SegNum, undefined, PubCount},
- case PublishHandle of
- undefined ->
- Partials;
- {SegNumOld, undefined, PubCountOld} ->
- dict:store(SegNumOld, PubCountOld,
- Partials)
- end}
- end,
- {TotalMsgCount2,
- StateL #qistate { seg_ack_counts = AckCounts1,
- publish_handle = PublishHandle1,
- partial_segments = Partials1 }}
- end, {0, State}, SegNums),
- {TotalMsgCount, State1}.
-
-scatter_journal(TotalMsgCount, State = #qistate { dir = Dir }) ->
- {Hdl, State1 = #qistate { journal_del_dict = JDelDict,
- journal_ack_dict = JAckDict }} =
- get_journal_handle(State),
- %% ADict and DDict may well contain duplicates. However, this is
- %% ok, because we use sets to eliminate dups before writing to
- %% segments
- {ADict, DDict} = load_journal(Hdl, JAckDict, JDelDict),
- State2 = close_handle(journal, State1),
- {TotalMsgCount1, ADict1, State3} =
- dict:fold(fun replay_journal_to_segment/3,
- {TotalMsgCount, ADict,
- %% supply empty dicts so that when
- %% replay_journal_to_segment loads segments, it
- %% gets all msgs, and ignores anything we've found
- %% in the journal.
- State2 #qistate { journal_del_dict = dict:new(),
- journal_ack_dict = dict:new() }}, DDict),
- %% replay for segments which only had acks, and no deliveries
- {TotalMsgCount2, State4} =
- dict:fold(fun replay_journal_acks_to_segment/3,
- {TotalMsgCount1, State3}, ADict1),
- JournalPath = filename:join(Dir, ?JOURNAL_FILENAME),
- ok = file:delete(JournalPath),
- {TotalMsgCount2, State4}.
-
-load_journal(Hdl, ADict, DDict) ->
- case file_handle_cache:read(Hdl, ?SEQ_BYTES) of
- {ok, <<?DEL_BIT:1, SeqId:?SEQ_BITS>>} ->
- load_journal(Hdl, ADict, add_seqid_to_dict(SeqId, DDict));
- {ok, <<?ACK_BIT:1, SeqId:?SEQ_BITS>>} ->
- load_journal(Hdl, add_seqid_to_dict(SeqId, ADict), DDict);
- _ErrOrEoF ->
- {ADict, DDict}
+get_segment_handle(Segment = #segment { handle = undefined, path = Path }) ->
+ {ok, Hdl} = file_handle_cache:open(Path,
+ [binary, raw, read, write,
+ {read_ahead, ?SEGMENT_TOTAL_SIZE}],
+ [{write_buffer, infinity}]),
+ {Hdl, Segment #segment { handle = Hdl }};
+get_segment_handle(Segment = #segment { handle = Hdl }) ->
+ {Hdl, Segment}.
+
+find_segment(Seg, #qistate { segments = Segments, dir = Dir }) ->
+ case dict:find(Seg, Segments) of
+ {ok, Segment = #segment{}} -> Segment;
+ error -> #segment { pubs = 0,
+ acks = 0,
+ handle = undefined,
+ journal_entries = dict:new(),
+ path = seg_num_to_path(Dir, Seg),
+ num = Seg
+ }
end.
-replay_journal_to_segment(_SegNum, [], {TotalMsgCount, ADict, State}) ->
- {TotalMsgCount, ADict, State};
-replay_journal_to_segment(SegNum, Dels, {TotalMsgCount, ADict, State}) ->
- {SDict, _PubCount, _AckCount, _HighRelSeq, State1} =
- load_segment(SegNum, State),
- ValidDels = sets:to_list(
- sets:filter(
- fun (RelSeq) ->
- case dict:find(RelSeq, SDict) of
- {ok, {_MsgId, false, _IsPersistent}} -> true;
- _ -> false
- end
- end, sets:from_list(Dels))),
- State2 = append_dels_to_segment(SegNum, ValidDels, State1),
- Acks = seg_entries_from_dict(SegNum, ADict),
- case Acks of
- [] -> {TotalMsgCount, ADict, State2};
- _ -> ADict1 = dict:erase(SegNum, ADict),
- {Count, State3} =
- filter_acks_and_append_to_segment(SegNum, SDict,
- Acks, State2),
- {TotalMsgCount - Count, ADict1, State3}
- end.
+store_segment(Segment = #segment { num = Seg },
+ State = #qistate { segments = Segments }) ->
+ State #qistate { segments = dict:store(Seg, Segment, Segments) }.
+
+get_journal_handle(State =
+ #qistate { journal_handle = undefined, dir = Dir }) ->
+ Path = filename:join(Dir, ?JOURNAL_FILENAME),
+ {ok, Hdl} = file_handle_cache:open(Path,
+ [binary, raw, read, write,
+ {read_ahead, ?SEGMENT_TOTAL_SIZE}],
+ [{write_buffer, infinity}]),
+ {Hdl, State #qistate { journal_handle = Hdl }};
+get_journal_handle(State = #qistate { journal_handle = Hdl }) ->
+ {Hdl, State}.
-replay_journal_acks_to_segment(_SegNum, [], {TotalMsgCount, State}) ->
- {TotalMsgCount, State};
-replay_journal_acks_to_segment(SegNum, Acks, {TotalMsgCount, State}) ->
- {SDict, _PubCount, _AckCount, _HighRelSeq, State1} =
- load_segment(SegNum, State),
- {Count, State2} =
- filter_acks_and_append_to_segment(SegNum, SDict, Acks, State1),
- {TotalMsgCount - Count, State2}.
-
-filter_acks_and_append_to_segment(SegNum, SDict, Acks, State) ->
- ValidRelSeqIds = dict:fetch_keys(SDict),
- ValidAcks = sets:to_list(sets:intersection(sets:from_list(ValidRelSeqIds),
- sets:from_list(Acks))),
- {length(ValidAcks), append_acks_to_segment(SegNum, ValidAcks, State)}.
-
-drop_and_deliver(SegNum, SDict, CleanShutdown,
- State = #qistate { journal_del_dict = JDelDict,
- journal_ack_dict = JAckDict }) ->
- {JDelDict1, JAckDict1} =
- dict:fold(
- fun (RelSeq, {MsgId, IsDelivered, true}, {JDelDict2, JAckDict2}) ->
- %% msg is persistent, keep only if the msg_store has it
- case {IsDelivered, rabbit_msg_store:contains(MsgId)} of
- {false, true} when not CleanShutdown ->
- %% not delivered, but dirty shutdown => mark delivered
- {add_seqid_to_dict(SegNum, RelSeq, JDelDict2),
- JAckDict2};
- {_, true} ->
- {JDelDict2, JAckDict2};
- {true, false} ->
- {JDelDict2,
- add_seqid_to_dict(SegNum, RelSeq, JAckDict2)};
- {false, false} ->
- {add_seqid_to_dict(SegNum, RelSeq, JDelDict2),
- add_seqid_to_dict(SegNum, RelSeq, JAckDict2)}
- end;
- (RelSeq, {_MsgId, false, false}, {JDelDict2, JAckDict2}) ->
- %% not persistent and not delivered => deliver and ack it
- {add_seqid_to_dict(SegNum, RelSeq, JDelDict2),
- add_seqid_to_dict(SegNum, RelSeq, JAckDict2)};
- (RelSeq, {_MsgId, true, false}, {JDelDict2, JAckDict2}) ->
- %% not persistent but delivered => ack it
- {JDelDict2,
- add_seqid_to_dict(SegNum, RelSeq, JAckDict2)}
- end, {JDelDict, JAckDict}, SDict),
- State #qistate { journal_del_dict = JDelDict1,
- journal_ack_dict = JAckDict1 }.
+bool_to_int(true ) -> 1;
+bool_to_int(false) -> 0.
+write_entry_to_segment(_RelSeq, {{_MsgId, _IsPersistent}, del, ack}, Hdl) ->
+ Hdl;
+write_entry_to_segment(RelSeq, {Publish, Del, Ack}, Hdl) ->
+ ok = case Publish of
+ no_pub ->
+ ok;
+ {MsgId, IsPersistent} ->
+ file_handle_cache:append(
+ Hdl, [<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
+ (bool_to_int(IsPersistent)):1,
+ RelSeq:?REL_SEQ_BITS>>, MsgId])
+ end,
+ ok = case {Del, Ack} of
+ {no_del, no_ack} -> ok;
+ _ -> Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS>>,
+ Data = case {Del, Ack} of
+ {del, ack} -> [Binary, Binary];
+ _ -> Binary
+ end,
+ file_handle_cache:append(Hdl, Data)
+ end,
+ Hdl.
+
+terminate(StoreShutdown, State =
+ #qistate { segments = Segments, journal_handle = JournalHdl,
+ dir = Dir }) ->
+ ok = case JournalHdl of
+ undefined -> ok;
+ _ -> file_handle_cache:close(JournalHdl)
+ end,
+ ok = dict:fold(
+ fun (_Seg, #segment { handle = undefined }, ok) ->
+ ok;
+ (_Seg, #segment { handle = Hdl }, ok) ->
+ file_handle_cache:close(Hdl)
+ end, ok, Segments),
+ case StoreShutdown of
+ true -> store_clean_shutdown(Dir);
+ false -> ok
+ end,
+ State #qistate { journal_handle = undefined, segments = dict:new() }.
%%----------------------------------------------------------------------------
-%% Loading Segments
+%% Majors
%%----------------------------------------------------------------------------
-load_segment(SegNum, State = #qistate { seg_num_handles = SegHdls,
- dir = Dir }) ->
- SegmentExists = case dict:find(SegNum, SegHdls) of
- {ok, _} -> true;
- error -> filelib:is_file(seg_num_to_path(Dir, SegNum))
+%% Loading segments
+
+%% Does not do any combining with the journal at all. The PubCount
+%% that comes back is the number of publishes in the segment. The
+%% number of unacked msgs is PubCount - AckCount. If KeepAcks is
+%% false, then dict:size(SegDict) == PubCount - AckCount. If KeepAcks
+%% is true, then dict:size(SegDict) == PubCount.
+load_segment(Seg, KeepAcks, State) ->
+ Segment = #segment { path = Path, handle = SegHdl } =
+ find_segment(Seg, State),
+ SegmentExists = case SegHdl of
+ undefined -> filelib:is_file(Path);
+ _ -> true
end,
case SegmentExists of
false ->
- {dict:new(), 0, 0, 0, State};
+ {dict:new(), 0, 0, State};
true ->
- {Hdl, State1 = #qistate { journal_del_dict = JDelDict,
- journal_ack_dict = JAckDict }} =
- get_seg_handle(SegNum, State),
+ {Hdl, Segment1} = get_segment_handle(Segment),
{ok, 0} = file_handle_cache:position(Hdl, bof),
- {SDict, PubCount, AckCount, HighRelSeq} =
- load_segment_entries(Hdl, dict:new(), 0, 0, 0),
- %% delete ack'd msgs first
- {SDict1, AckCount1} =
- lists:foldl(fun (RelSeq, {SDict2, AckCount2}) ->
- {dict:erase(RelSeq, SDict2), AckCount2 + 1}
- end, {SDict, AckCount},
- seg_entries_from_dict(SegNum, JAckDict)),
- %% ensure remaining msgs are delivered as necessary
- SDict3 =
- lists:foldl(
- fun (RelSeq, SDict4) ->
- case dict:find(RelSeq, SDict4) of
- {ok, {MsgId, false, IsPersistent}} ->
- dict:store(RelSeq,
- {MsgId, true, IsPersistent},
- SDict4);
- _ ->
- SDict4
- end
- end, SDict1, seg_entries_from_dict(SegNum, JDelDict)),
- {SDict3, PubCount, AckCount1, HighRelSeq, State1}
+ {SegDict, PubCount, AckCount} =
+ load_segment_entries(KeepAcks, Hdl, dict:new(), 0, 0),
+ {SegDict, PubCount, AckCount, store_segment(Segment1, State)}
end.
-load_segment_entries(Hdl, SDict, PubCount, AckCount, HighRelSeq) ->
+load_segment_entries(KeepAcks, Hdl, SegDict, PubCount, AckCount) ->
case file_handle_cache:read(Hdl, 1) of
{ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
MSB:(8-?REL_SEQ_ONLY_PREFIX_BITS)>>} ->
{ok, LSB} = file_handle_cache:read(
Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES - 1),
<<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>,
- {SDict1, AckCount1} = deliver_or_ack_msg(SDict, AckCount, RelSeq),
- load_segment_entries(Hdl, SDict1, PubCount, AckCount1, HighRelSeq);
+ {AckCount1, SegDict1} =
+ deliver_or_ack_msg(KeepAcks, RelSeq, AckCount, SegDict),
+ load_segment_entries(KeepAcks, Hdl, SegDict1, PubCount, AckCount1);
{ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
IsPersistentNum:1, MSB:(7-?PUBLISH_PREFIX_BITS)>>} ->
%% because we specify /binary, and binaries are complete
@@ -779,71 +582,252 @@ load_segment_entries(Hdl, SDict, PubCount, AckCount, HighRelSeq) ->
file_handle_cache:read(
Hdl, ?PUBLISH_RECORD_LENGTH_BYTES - 1),
<<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>,
- HighRelSeq1 = lists:max([RelSeq, HighRelSeq]),
- load_segment_entries(
- Hdl, dict:store(RelSeq, {MsgId, false, 1 == IsPersistentNum},
- SDict), PubCount + 1, AckCount, HighRelSeq1);
+ SegDict1 =
+ dict:store(RelSeq,
+ {{MsgId, 1 == IsPersistentNum}, no_del, no_ack},
+ SegDict),
+ load_segment_entries(KeepAcks, Hdl, SegDict1, PubCount+1, AckCount);
_ErrOrEoF ->
- {SDict, PubCount, AckCount, HighRelSeq}
+ {SegDict, PubCount, AckCount}
end.
-deliver_or_ack_msg(SDict, AckCount, RelSeq) ->
- case dict:find(RelSeq, SDict) of
- {ok, {MsgId, false, IsPersistent}} ->
- {dict:store(RelSeq, {MsgId, true, IsPersistent}, SDict), AckCount};
- {ok, {_MsgId, true, _IsPersistent}} ->
- {dict:erase(RelSeq, SDict), AckCount + 1}
+deliver_or_ack_msg(KeepAcks, RelSeq, AckCount, SegDict) ->
+ case dict:find(RelSeq, SegDict) of
+ {ok, {PubRecord, no_del, no_ack}} ->
+ {AckCount, dict:store(RelSeq, {PubRecord, del, no_ack}, SegDict)};
+ {ok, {PubRecord, del, no_ack}} when KeepAcks ->
+ {AckCount + 1, dict:store(RelSeq, {PubRecord, del, ack}, SegDict)};
+ {ok, {_PubRecord, del, no_ack}} ->
+ {AckCount + 1, dict:erase(RelSeq, SegDict)}
end.
+%% Loading Journal. This isn't idempotent and will mess up the counts
+%% if you call it more than once on the same state. Assumes the counts
+%% are 0 to start with.
+
+load_journal(State) ->
+ {JournalHdl, State1} = get_journal_handle(State),
+ {ok, 0} = file_handle_cache:position(JournalHdl, 0),
+ State2 = #qistate { segments = Segments } = load_journal_entries(State1),
+ dict:fold(
+ fun (Seg, #segment { journal_entries = JEntries,
+ pubs = PubCountInJournal,
+ acks = AckCountInJournal }, StateN) ->
+ %% We want to keep acks in so that we can remove them if
+ %% duplicates are in the journal. The counts here are
+ %% purely from the segment itself.
+ {SegDict, PubCountInSeg, AckCountInSeg, StateN1} =
+ load_segment(Seg, true, StateN),
+ %% Removed counts here are the number of pubs and acks
+ %% that are duplicates - i.e. found in both the segment
+ %% and journal.
+ {JEntries1, PubsRemoved, AcksRemoved} =
+ journal_minus_segment(JEntries, SegDict),
+ Segment1 = find_segment(Seg, StateN1),
+ PubCount1 = PubCountInSeg + PubCountInJournal - PubsRemoved,
+ AckCount1 = AckCountInSeg + AckCountInJournal - AcksRemoved,
+ store_segment(Segment1 #segment { journal_entries = JEntries1,
+ pubs = PubCount1,
+ acks = AckCount1 }, StateN1)
+ end, State2, Segments).
+
+load_journal_entries(State = #qistate { journal_handle = Hdl }) ->
+ case file_handle_cache:read(Hdl, ?SEQ_BYTES) of
+ {ok, <<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>} ->
+ case Prefix of
+ ?DEL_JPREFIX ->
+ load_journal_entries(add_to_journal(SeqId, del, State));
+ ?ACK_JPREFIX ->
+ load_journal_entries(add_to_journal(SeqId, ack, State));
+ _ ->
+ case file_handle_cache:read(Hdl, ?MSG_ID_BYTES) of
+ {ok, <<MsgIdNum:?MSG_ID_BITS>>} ->
+ %% work around for binary data
+ %% fragmentation. See
+ %% rabbit_msg_file:read_next/2
+ <<MsgId:?MSG_ID_BYTES/binary>> =
+ <<MsgIdNum:?MSG_ID_BITS>>,
+ Publish = {MsgId,
+ case Prefix of
+ ?PUB_PERSIST_JPREFIX -> true;
+ ?PUB_TRANS_JPREFIX -> false
+ end},
+ load_journal_entries(
+ add_to_journal(SeqId, Publish, State));
+ _ErrOrEoF -> %% err, we've lost at least a publish
+ State
+ end
+ end;
+ _ErrOrEoF -> State
+ end.
-%%----------------------------------------------------------------------------
-%% Appending Acks or Dels to Segments
-%%----------------------------------------------------------------------------
-
-append_acks_to_segment(SegNum, Acks,
- State = #qistate { seg_ack_counts = AckCounts,
- partial_segments = Partials }) ->
- AckCount = case dict:find(SegNum, AckCounts) of
- {ok, AckCount1} -> AckCount1;
- error -> 0
- end,
- AckTarget = case dict:find(SegNum, Partials) of
- {ok, PubCount} -> PubCount;
- error -> ?SEGMENT_ENTRY_COUNT
- end,
- AckCount2 = AckCount + length(Acks),
- append_acks_to_segment(SegNum, AckCount2, Acks, AckTarget, State).
-
-append_acks_to_segment(SegNum, AckCount, _Acks, AckCount, State =
- #qistate { publish_handle = PubHdl }) ->
- PubHdl1 = case PubHdl of
- %% If we're adjusting the pubhdl here then there
- %% will be no entry in partials, thus the target ack
- %% count must be SEGMENT_ENTRY_COUNT
- {SegNum, Hdl, AckCount = ?SEGMENT_ENTRY_COUNT}
- when Hdl /= undefined ->
- {SegNum + 1, undefined, 0};
- _ ->
- PubHdl
- end,
- delete_segment(SegNum, State #qistate { publish_handle = PubHdl1 });
-append_acks_to_segment(_SegNum, _AckCount, [], _AckTarget, State) ->
- State;
-append_acks_to_segment(SegNum, AckCount, Acks, AckTarget, State =
- #qistate { seg_ack_counts = AckCounts })
- when AckCount < AckTarget ->
- {Hdl, State1} = append_to_segment(SegNum, Acks, State),
- ok = file_handle_cache:sync(Hdl),
- State1 #qistate { seg_ack_counts =
- dict:store(SegNum, AckCount, AckCounts) }.
-
-append_dels_to_segment(SegNum, Dels, State) ->
- {_Hdl, State1} = append_to_segment(SegNum, Dels, State),
- State1.
+add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount }) ->
+ {Seg, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
+ Segment = #segment { journal_entries = SegJDict,
+ pubs = PubCount, acks = AckCount } =
+ find_segment(Seg, State),
+ SegJDict1 = add_to_journal(RelSeq, Action, SegJDict),
+ Segment1 = Segment #segment { journal_entries = SegJDict1 },
+ Segment2 =
+ case Action of
+ del -> Segment1;
+ ack -> Segment1 #segment { acks = AckCount + 1 };
+ {_MsgId, _IsPersistent} -> Segment1 #segment { pubs = PubCount + 1 }
+ end,
+ store_segment(Segment2, State #qistate { dirty_count = DCount + 1 });
+
+%% This is a more relaxed version of deliver_or_ack_msg because we can
+%% have dels or acks in the journal without the corresponding
+%% pub. Also, always want to keep acks. Things must occur in the right
+%% order though.
+add_to_journal(RelSeq, Action, SegJDict) ->
+ case dict:find(RelSeq, SegJDict) of
+ {ok, {PubRecord, no_del, no_ack}} when Action == del ->
+ dict:store(RelSeq, {PubRecord, del, no_ack}, SegJDict);
+ {ok, {PubRecord, DelRecord, no_ack}} when Action == ack ->
+ dict:store(RelSeq, {PubRecord, DelRecord, ack}, SegJDict);
+ error when Action == del ->
+ dict:store(RelSeq, {no_pub, del, no_ack}, SegJDict);
+ error when Action == ack ->
+ dict:store(RelSeq, {no_pub, no_del, ack}, SegJDict);
+ error ->
+ {_MsgId, _IsPersistent} = Action, %% ASSERTION
+ dict:store(RelSeq, {Action, no_del, no_ack}, SegJDict)
+ end.
-append_to_segment(SegNum, AcksOrDels, State) ->
- {Hdl, State1} = get_seg_handle(SegNum, State),
- ok = file_handle_cache:append(
- Hdl, [<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS>> || RelSeq <- AcksOrDels ]),
- {Hdl, State1}.
+%% Combine what we have just read from a segment file with what we're
+%% holding for that segment in memory. There must be no
+%% duplicates. Used when providing segment entries to the variable
+%% queue.
+journal_plus_segment(JEntries, SegDict) ->
+ dict:fold(fun (RelSeq, JObj, SegDictOut) ->
+ SegEntry = case dict:find(RelSeq, SegDictOut) of
+ error -> not_found;
+ {ok, SObj = {_, _, _}} -> SObj
+ end,
+ journal_plus_segment(JObj, SegEntry, RelSeq, SegDictOut)
+ end, SegDict, JEntries).
+
+%% Here, the OutDict is the SegDict which we may be adding to (for
+%% items only in the journal), modifying (bits in both), or erasing
+%% from (ack in journal, not segment).
+journal_plus_segment(Obj = {{_MsgId, _IsPersistent}, no_del, no_ack},
+ not_found,
+ RelSeq, OutDict) ->
+ dict:store(RelSeq, Obj, OutDict);
+journal_plus_segment(Obj = {{_MsgId, _IsPersistent}, del, no_ack},
+ not_found,
+ RelSeq, OutDict) ->
+ dict:store(RelSeq, Obj, OutDict);
+journal_plus_segment({{_MsgId, _IsPersistent}, del, ack},
+ not_found,
+ RelSeq, OutDict) ->
+ dict:erase(RelSeq, OutDict);
+
+journal_plus_segment({no_pub, del, no_ack},
+ {PubRecord = {_MsgId, _IsPersistent}, no_del, no_ack},
+ RelSeq, OutDict) ->
+ dict:store(RelSeq, {PubRecord, del, no_ack}, OutDict);
+
+journal_plus_segment({no_pub, del, ack},
+ {{_MsgId, _IsPersistent}, no_del, no_ack},
+ RelSeq, OutDict) ->
+ dict:erase(RelSeq, OutDict);
+journal_plus_segment({no_pub, no_del, ack},
+ {{_MsgId, _IsPersistent}, del, no_ack},
+ RelSeq, OutDict) ->
+ dict:erase(RelSeq, OutDict).
+
+
+%% Remove from the journal entries for a segment, items that are
+%% duplicates of entries found in the segment itself. Used on start up
+%% to clean up the journal.
+journal_minus_segment(JEntries, SegDict) ->
+ dict:fold(fun (RelSeq, JObj, {JEntriesOut, PubsRemoved, AcksRemoved}) ->
+ SegEntry = case dict:find(RelSeq, SegDict) of
+ error -> not_found;
+ {ok, SObj = {_, _, _}} -> SObj
+ end,
+ journal_minus_segment(JObj, SegEntry, RelSeq, JEntriesOut,
+ PubsRemoved, AcksRemoved)
+ end, {dict:new(), 0, 0}, JEntries).
+
+%% Here, the OutDict is a fresh journal that we're filling with valid
+%% entries. PubsRemoved and AcksRemoved only get increased when the a
+%% publish or ack is in both the journal and the segment.
+
+%% Both the same. Must be at least the publish
+journal_minus_segment(Obj, Obj = {{_MsgId, _IsPersistent}, _Del, no_ack},
+ _RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
+ {OutDict, PubsRemoved + 1, AcksRemoved};
+journal_minus_segment(Obj, Obj = {{_MsgId, _IsPersistent}, _Del, ack},
+ _RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
+ {OutDict, PubsRemoved + 1, AcksRemoved + 1};
+
+%% Just publish in journal
+journal_minus_segment(Obj = {{_MsgId, _IsPersistent}, no_del, no_ack},
+ not_found,
+ RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
+ {dict:store(RelSeq, Obj, OutDict), PubsRemoved, AcksRemoved};
+
+%% Just deliver in journal
+journal_minus_segment(Obj = {no_pub, del, no_ack},
+ {{_MsgId, _IsPersistent}, no_del, no_ack},
+ RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
+ {dict:store(RelSeq, Obj, OutDict), PubsRemoved, AcksRemoved};
+journal_minus_segment({no_pub, del, no_ack},
+ {{_MsgId, _IsPersistent}, del, no_ack},
+ _RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
+ {OutDict, PubsRemoved, AcksRemoved};
+
+%% Just ack in journal
+journal_minus_segment(Obj = {no_pub, no_del, ack},
+ {{_MsgId, _IsPersistent}, del, no_ack},
+ RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
+ {dict:store(RelSeq, Obj, OutDict), PubsRemoved, AcksRemoved};
+journal_minus_segment({no_pub, no_del, ack},
+ {{_MsgId, _IsPersistent}, del, ack},
+ _RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
+ {OutDict, PubsRemoved, AcksRemoved};
+
+%% Publish and deliver in journal
+journal_minus_segment(Obj = {{_MsgId, _IsPersistent}, del, no_ack},
+ not_found,
+ RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
+ {dict:store(RelSeq, Obj, OutDict), PubsRemoved, AcksRemoved};
+journal_minus_segment({PubRecord, del, no_ack},
+ {PubRecord = {_MsgId, _IsPersistent}, no_del, no_ack},
+ RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
+ {dict:store(RelSeq, {no_pub, del, no_ack}, OutDict),
+ PubsRemoved + 1, AcksRemoved};
+
+%% Deliver and ack in journal
+journal_minus_segment(Obj = {no_pub, del, ack},
+ {{_MsgId, _IsPersistent}, no_del, no_ack},
+ RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
+ {dict:store(RelSeq, Obj, OutDict), PubsRemoved, AcksRemoved};
+journal_minus_segment({no_pub, del, ack},
+ {{_MsgId, _IsPersistent}, del, no_ack},
+ RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
+ {dict:store(RelSeq, {no_pub, no_del, ack}, OutDict),
+ PubsRemoved, AcksRemoved};
+journal_minus_segment({no_pub, del, ack},
+ {{_MsgId, _IsPersistent}, del, ack},
+ _RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
+ {OutDict, PubsRemoved, AcksRemoved + 1};
+
+%% Publish, deliver and ack in journal
+journal_minus_segment({{_MsgId, _IsPersistent}, del, ack},
+ not_found,
+ _RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
+ {OutDict, PubsRemoved, AcksRemoved};
+journal_minus_segment({PubRecord, del, ack},
+ {PubRecord = {_MsgId, _IsPersistent}, no_del, no_ack},
+ RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
+ {dict:store(RelSeq, {no_pub, del, ack}, OutDict),
+ PubsRemoved + 1, AcksRemoved};
+journal_minus_segment({PubRecord, del, ack},
+ {PubRecord = {_MsgId, _IsPersistent}, del, no_ack},
+ RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
+ {dict:store(RelSeq, {no_pub, no_del, ack}, OutDict),
+ PubsRemoved + 1, AcksRemoved}.
diff --git a/src/rabbit_queue_index3.erl b/src/rabbit_queue_index3.erl
deleted file mode 100644
index 43a210d950..0000000000
--- a/src/rabbit_queue_index3.erl
+++ /dev/null
@@ -1,850 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_queue_index3).
-
--export([init/1, terminate/1, terminate_and_erase/1, write_published/4,
- write_delivered/2, write_acks/2, sync_seq_ids/2, flush_journal/1,
- read_segment_entries/2, next_segment_boundary/1, segment_size/0,
- find_lowest_seq_id_seg_and_next_seq_id/1, start_msg_store/1]).
-
--define(CLEAN_FILENAME, "clean.dot").
-
-%%----------------------------------------------------------------------------
-%% ---- Journal details ----
-
--define(MAX_JOURNAL_ENTRY_COUNT, 32768).
--define(JOURNAL_FILENAME, "journal.jif").
-
--define(PUB_PERSIST_JPREFIX, 00).
--define(PUB_TRANS_JPREFIX, 01).
--define(DEL_JPREFIX, 10).
--define(ACK_JPREFIX, 11).
--define(JPREFIX_BITS, 2).
--define(SEQ_BYTES, 8).
--define(SEQ_BITS, ((?SEQ_BYTES * 8) - ?JPREFIX_BITS)).
-
-%% ---- Segment details ----
-
--define(SEGMENT_EXTENSION, ".idx").
-
--define(REL_SEQ_BITS, 14).
--define(REL_SEQ_BITS_BYTE_ALIGNED, (?REL_SEQ_BITS + 8 - (?REL_SEQ_BITS rem 8))).
--define(SEGMENT_ENTRY_COUNT, 16384). %% trunc(math:pow(2,?REL_SEQ_BITS))).
-
-%% seq only is binary 00 followed by 14 bits of rel seq id
-%% (range: 0 - 16383)
--define(REL_SEQ_ONLY_PREFIX, 00).
--define(REL_SEQ_ONLY_PREFIX_BITS, 2).
--define(REL_SEQ_ONLY_ENTRY_LENGTH_BYTES, 2).
-
-%% publish record is binary 1 followed by a bit for is_persistent,
-%% then 14 bits of rel seq id, and 128 bits of md5sum msg id
--define(PUBLISH_PREFIX, 1).
--define(PUBLISH_PREFIX_BITS, 1).
-
--define(MSG_ID_BYTES, 16). %% md5sum is 128 bit or 16 bytes
--define(MSG_ID_BITS, (?MSG_ID_BYTES * 8)).
-%% 16 bytes for md5sum + 2 for seq, bits and prefix
--define(PUBLISH_RECORD_LENGTH_BYTES, ?MSG_ID_BYTES + 2).
-
-%% 1 publish, 1 deliver, 1 ack per msg
--define(SEGMENT_TOTAL_SIZE, ?SEGMENT_ENTRY_COUNT *
- (?PUBLISH_RECORD_LENGTH_BYTES +
- (2 * ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES))).
-
-%%----------------------------------------------------------------------------
-
--record(qistate,
- { dir,
- segments,
- journal_handle,
- dirty_count
- }).
-
--record(segment,
- { pubs,
- acks,
- handle,
- journal_entries,
- path,
- num
- }).
-
--include("rabbit.hrl").
-
-%%----------------------------------------------------------------------------
-
--ifdef(use_specs).
-
--type(hdl() :: ('undefined' | any())).
--type(msg_id() :: binary()).
--type(seq_id() :: integer()).
--type(qistate() :: #qistate { dir :: file_path(),
- segments :: dict(),
- journal_handle :: hdl(),
- dirty_count :: integer()
- }).
-
--spec(init/1 :: (queue_name()) -> {non_neg_integer(), qistate()}).
--spec(terminate/1 :: (qistate()) -> qistate()).
--spec(terminate_and_erase/1 :: (qistate()) -> qistate()).
--spec(write_published/4 :: (msg_id(), seq_id(), boolean(), qistate())
- -> qistate()).
--spec(write_delivered/2 :: (seq_id(), qistate()) -> qistate()).
--spec(write_acks/2 :: ([seq_id()], qistate()) -> qistate()).
--spec(sync_seq_ids/2 :: ([seq_id()], qistate()) -> qistate()).
--spec(flush_journal/1 :: (qistate()) -> qistate()).
--spec(read_segment_entries/2 :: (seq_id(), qistate()) ->
- {[{msg_id(), seq_id(), boolean(), boolean()}], qistate()}).
--spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()).
--spec(segment_size/0 :: () -> non_neg_integer()).
--spec(find_lowest_seq_id_seg_and_next_seq_id/1 :: (qistate()) ->
- {non_neg_integer(), non_neg_integer(), qistate()}).
--spec(start_msg_store/1 :: ([amqqueue()]) -> 'ok').
-
--endif.
-
-
-%%----------------------------------------------------------------------------
-%% Public API
-%%----------------------------------------------------------------------------
-
-init(Name) ->
- State = blank_state(Name),
- %% 1. Load the journal completely. This will also load segments
- %% which have entries in the journal and remove duplicates.
- %% The counts will correctly reflect the combination of the
- %% segment and the journal.
- State1 = load_journal(State),
- %% 2. Flush the journal. This makes life easier for everyone, as
- %% it means there won't be any publishes in the journal alone.
- State2 = #qistate { dir = Dir } = flush_journal(State1),
- %% 3. Load each segment in turn and filter out messages that are
- %% not in the msg_store, by adding acks to the journal. These
- %% acks only go to the RAM journal as it doesn't matter if we
- %% lose them. Also mark delivered if not clean shutdown.
- AllSegs = all_segment_nums(Dir),
- CleanShutdown = detect_clean_shutdown(Dir),
- %% We know the journal is empty here, so we don't need to combine
- %% with the journal, and we don't need to worry about messages
- %% that have been acked.
- State3 =
- lists:foldl(
- fun (Seg, StateN) ->
- {SegDict, _PubCount, _AckCount, StateN1} =
- load_segment(Seg, false, StateN),
- dict:fold(
- fun (RelSeq, {{MsgId, _IsPersistent}, Del, no_ack},
- StateM) ->
- SeqId = reconstruct_seq_id(Seg, RelSeq),
- InMsgStore = rabbit_msg_store:contains(MsgId),
- case {InMsgStore, CleanShutdown} of
- {true, true} ->
- StateM;
- {true, false} when Del == del ->
- StateM;
- {true, false} ->
- add_to_journal(SeqId, del, StateM);
- {false, _} when Del == del ->
- add_to_journal(SeqId, ack, StateM);
- {false, _} ->
- add_to_journal(
- SeqId, ack,
- add_to_journal(SeqId, del, StateM))
- end
- end, StateN1, SegDict)
- end, State2, AllSegs),
- %% 4. Go through all segments and calculate the number of unacked
- %% messages we have.
- Count = lists:foldl(
- fun (Seg, CountAcc) ->
- #segment { pubs = PubCount, acks = AckCount } =
- find_segment(Seg, State3),
- CountAcc + PubCount - AckCount
- end, 0, AllSegs),
- {Count, State3}.
-
-terminate(State) ->
- terminate(true, State).
-
-terminate_and_erase(State) ->
- State1 = terminate(State),
- ok = delete_queue_directory(State1 #qistate.dir),
- State1.
-
-write_published(MsgId, SeqId, IsPersistent, State)
- when is_binary(MsgId) ->
- ?MSG_ID_BYTES = size(MsgId),
- {JournalHdl, State1} = get_journal_handle(State),
- ok = file_handle_cache:append(JournalHdl,
- [<<(case IsPersistent of
- true -> ?PUB_PERSIST_JPREFIX;
- false -> ?PUB_TRANS_JPREFIX
- end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>,
- MsgId]),
- maybe_flush_journal(add_to_journal(SeqId, {MsgId, IsPersistent}, State1)).
-
-write_delivered(SeqId, State) ->
- {JournalHdl, State1} = get_journal_handle(State),
- ok = file_handle_cache:append(JournalHdl,
- <<?DEL_JPREFIX:?JPREFIX_BITS,
- SeqId:?SEQ_BITS>>),
- maybe_flush_journal(add_to_journal(SeqId, del, State1)).
-
-write_acks(SeqIds, State) ->
- {SeqIds1, State1} = remove_pubs_dels_from_journal(SeqIds, State),
- case SeqIds1 of
- [] ->
- State;
- _ ->
- {JournalHdl, State2} = get_journal_handle(State1),
- ok = file_handle_cache:append(JournalHdl,
- [<<?ACK_JPREFIX:?JPREFIX_BITS,
- SeqId:?SEQ_BITS>>
- || SeqId <- SeqIds1]),
- State3 = lists:foldl(fun (SeqId, StateN) ->
- add_to_journal(SeqId, ack, StateN)
- end, State2, SeqIds1),
- maybe_flush_journal(State3)
- end.
-
-sync_seq_ids(_SeqIds, State = #qistate { journal_handle = undefined }) ->
- State;
-sync_seq_ids(_SeqIds, State = #qistate { journal_handle = JournalHdl }) ->
- ok = file_handle_cache:sync(JournalHdl),
- State.
-
-flush_journal(State = #qistate { dirty_count = 0 }) ->
- State;
-flush_journal(State = #qistate { segments = Segments }) ->
- State1 =
- dict:fold(
- fun (_Seg, #segment { journal_entries = JEntries, pubs = PubCount,
- acks = AckCount } = Segment, StateN) ->
- case dict:is_empty(JEntries) of
- true -> store_segment(Segment, StateN);
- false when AckCount == PubCount ->
- ok = delete_segment(Segment);
- false ->
- {Hdl, Segment1} = get_segment_handle(Segment),
- dict:fold(fun write_entry_to_segment/3,
- Hdl, JEntries),
- ok = file_handle_cache:sync(Hdl),
- store_segment(
- Segment1 #segment { journal_entries = dict:new() },
- StateN)
- end
- end, State, Segments),
- {JournalHdl, State2} = get_journal_handle(State1),
- {ok, 0} = file_handle_cache:position(JournalHdl, bof),
- ok = file_handle_cache:truncate(JournalHdl),
- ok = file_handle_cache:sync(JournalHdl),
- State2 #qistate { dirty_count = 0 }.
-
-read_segment_entries(InitSeqId, State) ->
- {Seg, 0} = seq_id_to_seg_and_rel_seq_id(InitSeqId),
- {SegDict, _PubCount, _AckCount, State1} =
- load_segment(Seg, false, State),
- #segment { journal_entries = JEntries } = find_segment(Seg, State1),
- SegDict1 = journal_plus_segment(JEntries, SegDict),
- %% deliberately sort the list desc, because foldl will reverse it
- RelSeqs = rev_sort(dict:fetch_keys(SegDict1)),
- {lists:foldl(fun (RelSeq, Acc) ->
- {{MsgId, IsPersistent}, IsDelivered, no_ack} =
- dict:fetch(RelSeq, SegDict1),
- [ {MsgId, reconstruct_seq_id(Seg, RelSeq),
- IsPersistent, IsDelivered} | Acc ]
- end, [], RelSeqs),
- State1}.
-
-next_segment_boundary(SeqId) ->
- {Seg, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
- reconstruct_seq_id(Seg + 1, 0).
-
-segment_size() ->
- ?SEGMENT_ENTRY_COUNT.
-
-find_lowest_seq_id_seg_and_next_seq_id(State = #qistate { dir = Dir }) ->
- SegNums = all_segment_nums(Dir),
- %% We don't want the lowest seq_id, merely the seq_id of the start
- %% of the lowest segment. That seq_id may not actually exist, but
- %% that's fine. The important thing is that the segment exists and
- %% the seq_id reported is on a segment boundary.
-
- %% We also don't really care about the max seq_id. Just start the
- %% next segment: it makes life much easier.
-
- %% SegNums is sorted, ascending.
- {LowSeqIdSeg, NextSeqId} =
- case SegNums of
- [] -> {0, 0};
- [MinSeg|_] -> {reconstruct_seq_id(MinSeg, 0),
- reconstruct_seq_id(lists:last(SegNums), 0)}
- end,
- {LowSeqIdSeg, NextSeqId, State}.
-
-start_msg_store(DurableQueues) ->
- DurableDict =
- dict:from_list([ {queue_name_to_dir_name(Queue #amqqueue.name),
- Queue #amqqueue.name} || Queue <- DurableQueues ]),
- QueuesDir = queues_dir(),
- Directories = case file:list_dir(QueuesDir) of
- {ok, Entries} ->
- [ Entry || Entry <- Entries,
- filelib:is_dir(
- filename:join(QueuesDir, Entry)) ];
- {error, enoent} ->
- []
- end,
- DurableDirectories = sets:from_list(dict:fetch_keys(DurableDict)),
- {DurableQueueNames, TransientDirs} =
- lists:foldl(
- fun (QueueDir, {DurableAcc, TransientAcc}) ->
- case sets:is_element(QueueDir, DurableDirectories) of
- true ->
- {[dict:fetch(QueueDir, DurableDict) | DurableAcc],
- TransientAcc};
- false ->
- {DurableAcc, [QueueDir | TransientAcc]}
- end
- end, {[], []}, Directories),
- MsgStoreDir = filename:join(rabbit_mnesia:dir(), "msg_store"),
- ok = rabbit:start_child(rabbit_msg_store, [MsgStoreDir,
- fun queue_index_walker/1,
- DurableQueueNames]),
- lists:foreach(fun (DirName) ->
- Dir = filename:join(queues_dir(), DirName),
- ok = delete_queue_directory(Dir)
- end, TransientDirs),
- ok.
-
-%%----------------------------------------------------------------------------
-%% Msg Store Startup Delta Function
-%%----------------------------------------------------------------------------
-
-queue_index_walker([]) ->
- finished;
-queue_index_walker([QueueName|QueueNames]) ->
- State = #qistate { dir = Dir } = blank_state(QueueName),
- State1 = #qistate { journal_handle = JHdl } = load_journal(State),
- ok = file_handle_cache:close(JHdl),
- SegNums = all_segment_nums(Dir),
- queue_index_walker({SegNums, State1, QueueNames});
-
-queue_index_walker({[], State, QueueNames}) ->
- _State = terminate(false, State),
- queue_index_walker(QueueNames);
-queue_index_walker({[Seg | SegNums], State, QueueNames}) ->
- SeqId = reconstruct_seq_id(Seg, 0),
- {Messages, State1} = read_segment_entries(SeqId, State),
- queue_index_walker({Messages, State1, SegNums, QueueNames});
-
-queue_index_walker({[], State, SegNums, QueueNames}) ->
- queue_index_walker({SegNums, State, QueueNames});
-queue_index_walker({[{MsgId, _SeqId, IsPersistent, _IsDelivered} | Msgs],
- State, SegNums, QueueNames}) ->
- case IsPersistent of
- true -> {MsgId, 1, {Msgs, State, SegNums, QueueNames}};
- false -> queue_index_walker({Msgs, State, SegNums, QueueNames})
- end.
-
-%%----------------------------------------------------------------------------
-%% Minors
-%%----------------------------------------------------------------------------
-
-maybe_flush_journal(State = #qistate { dirty_count = DCount })
- when DCount > ?MAX_JOURNAL_ENTRY_COUNT ->
- flush_journal(State);
-maybe_flush_journal(State) ->
- State.
-
-all_segment_nums(Dir) ->
- lists:sort(
- [list_to_integer(
- lists:takewhile(fun(C) -> $0 =< C andalso C =< $9 end, SegName))
- || SegName <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)]).
-
-blank_state(QueueName) ->
- StrName = queue_name_to_dir_name(QueueName),
- Dir = filename:join(queues_dir(), StrName),
- ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
- #qistate { dir = Dir,
- segments = dict:new(),
- journal_handle = undefined,
- dirty_count = 0
- }.
-
-rev_sort(List) ->
- lists:sort(fun (A, B) -> B < A end, List).
-
-seq_id_to_seg_and_rel_seq_id(SeqId) ->
- { SeqId div ?SEGMENT_ENTRY_COUNT, SeqId rem ?SEGMENT_ENTRY_COUNT }.
-
-reconstruct_seq_id(Seg, RelSeq) ->
- (Seg * ?SEGMENT_ENTRY_COUNT) + RelSeq.
-
-seg_num_to_path(Dir, Seg) ->
- SegName = integer_to_list(Seg),
- filename:join(Dir, SegName ++ ?SEGMENT_EXTENSION).
-
-delete_segment(#segment { handle = undefined }) ->
- ok;
-delete_segment(#segment { handle = Hdl, path = Path }) ->
- ok = file_handle_cache:close(Hdl),
- ok = file:delete(Path),
- ok.
-
-detect_clean_shutdown(Dir) ->
- case file:delete(filename:join(Dir, ?CLEAN_FILENAME)) of
- ok -> true;
- {error, enoent} -> false
- end.
-
-store_clean_shutdown(Dir) ->
- {ok, Hdl} = file_handle_cache:open(filename:join(Dir, ?CLEAN_FILENAME),
- [write, raw, binary],
- [{write_buffer, unbuffered}]),
- ok = file_handle_cache:close(Hdl).
-
-queue_name_to_dir_name(Name = #resource { kind = queue }) ->
- Bin = term_to_binary(Name),
- Size = 8*size(Bin),
- <<Num:Size>> = Bin,
- lists:flatten(io_lib:format("~.36B", [Num])).
-
-queues_dir() ->
- filename:join(rabbit_mnesia:dir(), "queues").
-
-delete_queue_directory(Dir) ->
- {ok, Entries} = file:list_dir(Dir),
- ok = lists:foldl(fun (Entry, ok) ->
- file:delete(filename:join(Dir, Entry))
- end, ok, Entries),
- ok = file:del_dir(Dir).
-
-get_segment_handle(Segment = #segment { handle = undefined, path = Path }) ->
- {ok, Hdl} = file_handle_cache:open(Path,
- [binary, raw, read, write,
- {read_ahead, ?SEGMENT_TOTAL_SIZE}],
- [{write_buffer, infinity}]),
- {Hdl, Segment #segment { handle = Hdl }};
-get_segment_handle(Segment = #segment { handle = Hdl }) ->
- {Hdl, Segment}.
-
-find_segment(Seg, #qistate { segments = Segments, dir = Dir }) ->
- case dict:find(Seg, Segments) of
- {ok, Segment = #segment{}} -> Segment;
- error -> #segment { pubs = 0,
- acks = 0,
- handle = undefined,
- journal_entries = dict:new(),
- path = seg_num_to_path(Dir, Seg),
- num = Seg
- }
- end.
-
-store_segment(Segment = #segment { num = Seg },
- State = #qistate { segments = Segments }) ->
- State #qistate { segments = dict:store(Seg, Segment, Segments) }.
-
-get_journal_handle(State =
- #qistate { journal_handle = undefined, dir = Dir }) ->
- Path = filename:join(Dir, ?JOURNAL_FILENAME),
- {ok, Hdl} = file_handle_cache:open(Path,
- [binary, raw, read, write,
- {read_ahead, ?SEGMENT_TOTAL_SIZE}],
- [{write_buffer, infinity}]),
- {Hdl, State #qistate { journal_handle = Hdl }};
-get_journal_handle(State = #qistate { journal_handle = Hdl }) ->
- {Hdl, State}.
-
-bool_to_int(true ) -> 1;
-bool_to_int(false) -> 0.
-
-write_entry_to_segment(RelSeq, {Publish, Del, Ack}, Hdl) ->
- ok = case Publish of
- no_pub ->
- ok;
- {MsgId, IsPersistent} ->
- file_handle_cache:append(
- Hdl, [<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
- (bool_to_int(IsPersistent)):1,
- RelSeq:?REL_SEQ_BITS>>, MsgId])
- end,
- ok = case {Del, Ack} of
- {no_del, no_ack} -> ok;
- _ -> Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS>>,
- Data = case {Del, Ack} of
- {del, ack} -> [Binary, Binary];
- _ -> Binary
- end,
- file_handle_cache:append(Hdl, Data)
- end,
- Hdl.
-
-terminate(StoreShutdown, State =
- #qistate { segments = Segments, journal_handle = JournalHdl,
- dir = Dir }) ->
- ok = case JournalHdl of
- undefined -> ok;
- _ -> file_handle_cache:close(JournalHdl)
- end,
- ok = dict:fold(
- fun (_Seg, #segment { handle = undefined }, ok) ->
- ok;
- (_Seg, #segment { handle = Hdl }, ok) ->
- file_handle_cache:close(Hdl)
- end, ok, Segments),
- case StoreShutdown of
- true -> store_clean_shutdown(Dir);
- false -> ok
- end,
- State #qistate { journal_handle = undefined, segments = dict:new() }.
-
-remove_pubs_dels_from_journal(SeqIds, State) ->
- lists:foldl(
- fun (SeqId, {SeqIdsAcc, StateN}) ->
- {Seg, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
- Segment = #segment { journal_entries = JEntries,
- acks = AckCount } =
- find_segment(Seg, StateN),
- case dict:find(RelSeq, JEntries) of
- {ok, {{_MsgId, _IsPersistent}, del, no_ack}} ->
- StateN1 =
- store_segment(
- Segment #segment { journal_entries =
- dict:erase(RelSeq, JEntries),
- acks = AckCount + 1 },
- StateN),
- {SeqIdsAcc, StateN1};
- _ ->
- {[SeqId | SeqIdsAcc], StateN}
- end
- end, {[], State}, SeqIds).
-
-%%----------------------------------------------------------------------------
-%% Majors
-%%----------------------------------------------------------------------------
-
-%% Loading segments
-
-%% Does not do any combining with the journal at all. The PubCount
-%% that comes back is the number of publishes in the segment. The
-%% number of unacked msgs is PubCount - AckCount. If KeepAcks is
-%% false, then dict:size(SegDict) == PubCount - AckCount. If KeepAcks
-%% is true, then dict:size(SegDict) == PubCount.
-load_segment(Seg, KeepAcks, State) ->
- Segment = #segment { path = Path, handle = SegHdl } =
- find_segment(Seg, State),
- SegmentExists = case SegHdl of
- undefined -> filelib:is_file(Path);
- _ -> true
- end,
- case SegmentExists of
- false ->
- {dict:new(), 0, 0, State};
- true ->
- {Hdl, Segment1} = get_segment_handle(Segment),
- {ok, 0} = file_handle_cache:position(Hdl, bof),
- {SegDict, PubCount, AckCount} =
- load_segment_entries(KeepAcks, Hdl, dict:new(), 0, 0),
- {SegDict, PubCount, AckCount, store_segment(Segment1, State)}
- end.
-
-load_segment_entries(KeepAcks, Hdl, SegDict, PubCount, AckCount) ->
- case file_handle_cache:read(Hdl, 1) of
- {ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- MSB:(8-?REL_SEQ_ONLY_PREFIX_BITS)>>} ->
- {ok, LSB} = file_handle_cache:read(
- Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES - 1),
- <<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>,
- {AckCount1, SegDict1} =
- deliver_or_ack_msg(KeepAcks, RelSeq, AckCount, SegDict),
- load_segment_entries(KeepAcks, Hdl, SegDict1, PubCount, AckCount1);
- {ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
- IsPersistentNum:1, MSB:(7-?PUBLISH_PREFIX_BITS)>>} ->
- %% because we specify /binary, and binaries are complete
- %% bytes, the size spec is in bytes, not bits.
- {ok, <<LSB:1/binary, MsgId:?MSG_ID_BYTES/binary>>} =
- file_handle_cache:read(
- Hdl, ?PUBLISH_RECORD_LENGTH_BYTES - 1),
- <<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>,
- SegDict1 =
- dict:store(RelSeq,
- {{MsgId, 1 == IsPersistentNum}, no_del, no_ack},
- SegDict),
- load_segment_entries(KeepAcks, Hdl, SegDict1, PubCount+1, AckCount);
- _ErrOrEoF ->
- {SegDict, PubCount, AckCount}
- end.
-
-deliver_or_ack_msg(KeepAcks, RelSeq, AckCount, SegDict) ->
- case dict:find(RelSeq, SegDict) of
- {ok, {PubRecord, no_del, no_ack}} ->
- {AckCount, dict:store(RelSeq, {PubRecord, del, no_ack}, SegDict)};
- {ok, {PubRecord, del, no_ack}} when KeepAcks ->
- {AckCount + 1, dict:store(RelSeq, {PubRecord, del, ack}, SegDict)};
- {ok, {_PubRecord, del, no_ack}} when KeepAcks ->
- {AckCount + 1, dict:erase(RelSeq, SegDict)}
- end.
-
-%% Loading Journal. This isn't idempotent and will mess up the counts
-%% if you call it more than once on the same state. Assumes the counts
-%% are 0 to start with.
-
-load_journal(State) ->
- {JournalHdl, State1} = get_journal_handle(State),
- {ok, 0} = file_handle_cache:position(JournalHdl, 0),
- State1 = #qistate { segments = Segments } = load_journal_entries(State),
- dict:fold(
- fun (Seg, #segment { journal_entries = JEntries,
- pubs = PubCountInJournal,
- acks = AckCountInJournal }, StateN) ->
- %% We want to keep acks in so that we can remove them if
- %% duplicates are in the journal. The counts here are
- %% purely from the segment itself.
- {SegDict, PubCountInSeg, AckCountInSeg, StateN1} =
- load_segment(Seg, true, StateN),
- %% Removed counts here are the number of pubs and acks
- %% that are duplicates - i.e. found in both the segment
- %% and journal.
- {JEntries1, PubsRemoved, AcksRemoved} =
- journal_minus_segment(JEntries, SegDict),
- {Segment1, StateN2} = find_segment(Seg, StateN1),
- PubCount1 = PubCountInSeg + PubCountInJournal - PubsRemoved,
- AckCount1 = AckCountInSeg + AckCountInJournal - AcksRemoved,
- store_segment(Segment1 #segment { journal_entries = JEntries1,
- pubs = PubCount1,
- acks = AckCount1 }, StateN2)
- end, State1, Segments).
-
-load_journal_entries(State = #qistate { journal_handle = Hdl }) ->
- case file_handle_cache:read(Hdl, ?SEQ_BYTES) of
- {ok, <<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>} ->
- case Prefix of
- ?DEL_JPREFIX ->
- load_journal_entries(add_to_journal(SeqId, del, State));
- ?ACK_JPREFIX ->
- load_journal_entries(add_to_journal(SeqId, ack, State));
- _ ->
- case file_handle_cache:read(Hdl, ?MSG_ID_BYTES) of
- {ok, <<MsgIdNum:?MSG_ID_BITS>>} ->
- %% work around for binary data
- %% fragmentation. See
- %% rabbit_msg_file:read_next/2
- <<MsgId:?MSG_ID_BYTES/binary>> =
- <<MsgIdNum:?MSG_ID_BITS>>,
- Publish = {MsgId,
- case Prefix of
- ?PUB_PERSIST_JPREFIX -> true;
- ?PUB_TRANS_JPREFIX -> false
- end},
- load_journal_entries(
- add_to_journal(SeqId, Publish, State));
- _ErrOrEoF -> %% err, we've lost at least a publish
- State
- end
- end;
- _ErrOrEoF -> State
- end.
-
-add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount }) ->
- {Seg, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
- Segment = #segment { journal_entries = SegJDict,
- pubs = PubCount, acks = AckCount } =
- find_segment(Seg, State),
- SegJDict1 = add_to_journal(RelSeq, Action, SegJDict),
- Segment1 = Segment #segment { journal_entries = SegJDict1 },
- Segment2 =
- case Action of
- del -> Segment1;
- ack -> Segment1 #segment { acks = AckCount + 1 };
- {_MsgId, _IsPersistent} -> Segment1 #segment { pubs = PubCount + 1 }
- end,
- store_segment(Segment2, State #qistate { dirty_count = DCount + 1 });
-
-%% This is a more relaxed version of deliver_or_ack_msg because we can
-%% have dels or acks in the journal without the corresponding
-%% pub. Also, always want to keep acks. Things must occur in the right
-%% order though.
-add_to_journal(RelSeq, Action, SegJDict) ->
- case dict:find(RelSeq, SegJDict) of
- {ok, {PubRecord, no_del, no_ack}} when Action == del ->
- dict:store(RelSeq, {PubRecord, del, no_ack}, SegJDict);
- {ok, {PubRecord, DelRecord, no_ack}} when Action == ack ->
- dict:store(RelSeq, {PubRecord, DelRecord, ack}, SegJDict);
- error when Action == del ->
- dict:store(RelSeq, {no_pub, del, no_ack}, SegJDict);
- error when Action == ack ->
- dict:store(RelSeq, {no_pub, no_del, ack}, SegJDict);
- error ->
- {_MsgId, _IsPersistent} = Action, %% ASSERTION
- dict:store(RelSeq, {Action, no_del, no_ack}, SegJDict)
- end.
-
-%% Combine what we have just read from a segment file with what we're
-%% holding for that segment in memory. There must be no
-%% duplicates. Used when providing segment entries to the variable
-%% queue.
-journal_plus_segment(JEntries, SegDict) ->
- dict:fold(fun (RelSeq, JObj, SegDictOut) ->
- SegEntry = case dict:find(RelSeq, SegDictOut) of
- error -> not_found;
- {ok, SObj = {_, _, _}} -> SObj
- end,
- journal_plus_segment(JObj, SegEntry, RelSeq, SegDictOut)
- end, SegDict, JEntries).
-
-%% Here, the OutDict is the SegDict which we may be adding to (for
-%% items only in the journal), modifying (bits in both), or erasing
-%% from (ack in journal, not segment).
-journal_plus_segment(Obj = {{_MsgId, _IsPersistent}, no_del, no_ack},
- not_found,
- RelSeq, OutDict) ->
- dict:store(RelSeq, Obj, OutDict);
-journal_plus_segment(Obj = {{_MsgId, _IsPersistent}, del, no_ack},
- not_found,
- RelSeq, OutDict) ->
- dict:store(RelSeq, Obj, OutDict);
-journal_plus_segment({{_MsgId, _IsPersistent}, del, ack},
- not_found,
- RelSeq, OutDict) ->
- dict:erase(RelSeq, OutDict);
-
-journal_plus_segment({no_pub, del, no_ack},
- {PubRecord = {_MsgId, _IsPersistent}, no_del, no_ack},
- RelSeq, OutDict) ->
- dict:store(RelSeq, {PubRecord, del, no_ack}, OutDict);
-
-journal_plus_segment({no_pub, del, ack},
- {{_MsgId, _IsPersistent}, no_del, no_ack},
- RelSeq, OutDict) ->
- dict:erase(RelSeq, OutDict);
-journal_plus_segment({no_pub, no_del, ack},
- {{_MsgId, _IsPersistent}, del, no_ack},
- RelSeq, OutDict) ->
- dict:erase(RelSeq, OutDict).
-
-
-%% Remove from the journal entries for a segment, items that are
-%% duplicates of entries found in the segment itself. Used on start up
-%% to clean up the journal.
-journal_minus_segment(JEntries, SegDict) ->
- dict:fold(fun (RelSeq, JObj, {JEntriesOut, PubsRemoved, AcksRemoved}) ->
- SegEntry = case dict:find(RelSeq, SegDict) of
- error -> not_found;
- {ok, SObj = {_, _, _}} -> SObj
- end,
- journal_minus_segment(JObj, SegEntry, RelSeq, JEntriesOut,
- PubsRemoved, AcksRemoved)
- end, {dict:new(), 0, 0}, JEntries).
-
-%% Here, the OutDict is a fresh journal that we're filling with valid
-%% entries. PubsRemoved and AcksRemoved only get increased when the a
-%% publish or ack is in both the journal and the segment.
-
-%% Both the same. Must be at least the publish
-journal_minus_segment(Obj, Obj = {{_MsgId, _IsPersistent}, _Del, no_ack},
- _RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
- {OutDict, PubsRemoved + 1, AcksRemoved};
-journal_minus_segment(Obj, Obj = {{_MsgId, _IsPersistent}, _Del, ack},
- _RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
- {OutDict, PubsRemoved + 1, AcksRemoved + 1};
-
-%% Just publish in journal
-journal_minus_segment(Obj = {{_MsgId, _IsPersistent}, no_del, no_ack},
- not_found,
- RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
- {dict:store(RelSeq, Obj, OutDict), PubsRemoved, AcksRemoved};
-
-%% Just deliver in journal
-journal_minus_segment(Obj = {no_pub, del, no_ack},
- {{_MsgId, _IsPersistent}, no_del, no_ack},
- RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
- {dict:store(RelSeq, Obj, OutDict), PubsRemoved, AcksRemoved};
-journal_minus_segment({no_pub, del, no_ack},
- {{_MsgId, _IsPersistent}, del, no_ack},
- _RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
- {OutDict, PubsRemoved, AcksRemoved};
-
-%% Just ack in journal
-journal_minus_segment(Obj = {no_pub, no_del, ack},
- {{_MsgId, _IsPersistent}, del, no_ack},
- RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
- {dict:store(RelSeq, Obj, OutDict), PubsRemoved, AcksRemoved};
-journal_minus_segment({no_pub, no_del, ack},
- {{_MsgId, _IsPersistent}, del, ack},
- _RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
- {OutDict, PubsRemoved, AcksRemoved};
-
-%% Publish and deliver in journal
-journal_minus_segment(Obj = {{_MsgId, _IsPersistent}, del, no_ack},
- not_found,
- RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
- {dict:store(RelSeq, Obj, OutDict), PubsRemoved, AcksRemoved};
-journal_minus_segment({PubRecord, del, no_ack},
- {PubRecord = {_MsgId, _IsPersistent}, no_del, no_ack},
- RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
- {dict:store(RelSeq, {no_pub, del, no_ack}, OutDict),
- PubsRemoved + 1, AcksRemoved};
-
-%% Deliver and ack in journal
-journal_minus_segment(Obj = {no_pub, del, ack},
- {{_MsgId, _IsPersistent}, no_del, no_ack},
- RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
- {dict:store(RelSeq, Obj, OutDict), PubsRemoved, AcksRemoved};
-journal_minus_segment({no_pub, del, ack},
- {{_MsgId, _IsPersistent}, del, no_ack},
- RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
- {dict:store(RelSeq, {no_pub, no_del, ack}, OutDict),
- PubsRemoved, AcksRemoved};
-journal_minus_segment({no_pub, del, ack},
- {{_MsgId, _IsPersistent}, del, ack},
- _RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
- {OutDict, PubsRemoved, AcksRemoved + 1};
-
-%% Publish, deliver and ack in journal
-journal_minus_segment({{_MsgId, _IsPersistent}, del, ack},
- not_found,
- _RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
- {OutDict, PubsRemoved, AcksRemoved};
-journal_minus_segment({PubRecord, del, ack},
- {PubRecord = {_MsgId, _IsPersistent}, no_del, no_ack},
- RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
- {dict:store(RelSeq, {no_pub, del, ack}, OutDict),
- PubsRemoved + 1, AcksRemoved};
-journal_minus_segment({PubRecord, del, ack},
- {PubRecord = {_MsgId, _IsPersistent}, del, no_ack},
- RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
- {dict:store(RelSeq, {no_pub, no_del, ack}, OutDict),
- PubsRemoved + 1, AcksRemoved}.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index f84ba70adc..dc81ea18b9 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1078,6 +1078,8 @@ verify_read_with_published(_Delivered, _Persistent, _Read, _Published) ->
ko.
test_queue_index() ->
+ SegmentSize = rabbit_queue_index:segment_size(),
+ TwoSegs = SegmentSize + SegmentSize,
stop_msg_store(),
ok = empty_test_queue(),
SeqIdsA = lists:seq(0,9999),
@@ -1086,7 +1088,7 @@ test_queue_index() ->
{0, 0, Qi1} =
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi0),
{Qi2, SeqIdsMsgIdsA} = queue_index_publish(SeqIdsA, false, Qi1),
- {0, 10000, Qi3} =
+ {0, SegSize, Qi3} =
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi2),
{ReadA, Qi4} = rabbit_queue_index:read_segment_entries(0, Qi3),
ok = verify_read_with_published(false, false, ReadA,
@@ -1097,10 +1099,10 @@ test_queue_index() ->
ok = rabbit_queue_index:start_msg_store([test_amqqueue(true)]),
%% should get length back as 0, as all the msgs were transient
{0, Qi6} = rabbit_queue_index:init(test_queue()),
- {0, 10000, Qi7} =
+ {0, SegSize, Qi7} =
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi6),
{Qi8, SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi7),
- {0, 20000, Qi9} =
+ {0, TwoSegs, Qi9} =
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi8),
{ReadB, Qi10} = rabbit_queue_index:read_segment_entries(0, Qi9),
ok = verify_read_with_published(false, true, ReadB,
@@ -1111,7 +1113,7 @@ test_queue_index() ->
%% should get length back as 10000
LenB = length(SeqIdsB),
{LenB, Qi12} = rabbit_queue_index:init(test_queue()),
- {0, 20000, Qi13} =
+ {0, TwoSegs, Qi13} =
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi12),
Qi14 = queue_index_deliver(SeqIdsB, Qi13),
{ReadC, Qi15} = rabbit_queue_index:read_segment_entries(0, Qi14),
@@ -1119,10 +1121,8 @@ test_queue_index() ->
lists:reverse(SeqIdsMsgIdsB)),
Qi16 = rabbit_queue_index:write_acks(SeqIdsB, Qi15),
Qi17 = queue_index_flush_journal(Qi16),
- %% the entire first segment will have gone as they were firstly
- %% transient, and secondly ack'd
- SegmentSize = rabbit_queue_index:segment_size(),
- {SegmentSize, 20000, Qi18} =
+ %% Everything will have gone now because #pubs == #acks
+ {0, 0, Qi18} =
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi17),
_Qi19 = rabbit_queue_index:terminate(Qi18),
ok = stop_msg_store(),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 0a5909a055..f2d45700f0 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -460,7 +460,7 @@ tx_commit_from_vq(State = #vqstate { on_sync = {SAcks, SPubs, SFroms} }) ->
{SeqIdsAcc1, StateN1}
end, {[], State1}, lists:flatten(lists:reverse(SPubs))),
IndexState1 =
- rabbit_queue_index:sync_seq_ids(PubSeqIds, [] /= SAcks, IndexState),
+ rabbit_queue_index:sync_seq_ids(PubSeqIds, IndexState),
[ gen_server2:reply(From, ok) || From <- lists:reverse(SFroms) ],
State2 #vqstate { index_state = IndexState1, on_sync = {[], [], []} }.