summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-30 18:00:26 +0000
committerMatthew Sackman <matthew@lshift.net>2009-10-30 18:00:26 +0000
commit4a1b5e5279127cbcf75880b4ce145dd4aefae050 (patch)
tree4adabf8121eae20403a82f371743de206aee027e /src
parent6c2709a85d3a1a16ebc1da263ceef42b2e5a54cb (diff)
downloadrabbitmq-server-git-4a1b5e5279127cbcf75880b4ce145dd4aefae050.tar.gz
Well, the transformation is done. Hilariously it works, first time. However, whilst all tests pass, I suspect there may still be faults, eg in the counting of entries in the journal etc. It still needs further checking...
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_queue_index.erl364
1 files changed, 206 insertions, 158 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 62c6af5384..acda163654 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -77,9 +77,12 @@
-define(CLEAN_FILENAME, "clean.dot").
-define(MAX_ACK_JOURNAL_ENTRY_COUNT, 32768).
--define(ACK_JOURNAL_FILENAME, "ack_journal.jif").
+-define(ACK_JOURNAL_FILENAME, "journal.jif").
+
+-define(DEL_BIT, 0).
+-define(ACK_BIT, 1).
-define(SEQ_BYTES, 8).
--define(SEQ_BITS, (?SEQ_BYTES * 8)).
+-define(SEQ_BITS, ((?SEQ_BYTES * 8) - 1)).
-define(SEGMENT_EXTENSION, ".idx").
-define(REL_SEQ_BITS, 14).
@@ -112,11 +115,11 @@
-record(qistate,
{ dir,
seg_num_handles,
- journal_ack_count,
+ journal_count,
journal_ack_dict,
+ journal_del_dict,
seg_ack_counts,
- publish_handle,
- deliver_handle
+ publish_handle
}).
-include("rabbit.hrl").
@@ -132,11 +135,11 @@
{non_neg_integer(), hdl(), non_neg_integer()})).
-type(qistate() :: #qistate { dir :: file_path(),
seg_num_handles :: dict(),
- journal_ack_count :: integer(),
+ journal_count :: integer(),
journal_ack_dict :: dict(),
+ journal_del_dict :: dict(),
seg_ack_counts :: dict(),
- publish_handle :: hdl_and_count(),
- deliver_handle :: hdl_and_count()
+ publish_handle :: hdl_and_count()
}).
-spec(init/1 :: (queue_name()) -> {non_neg_integer(), qistate()}).
@@ -193,29 +196,17 @@ write_published(MsgId, SeqId, IsPersistent, State)
RelSeq:?REL_SEQ_BITS, MsgId/binary>>),
State1.
-write_delivered(SeqId, State) ->
- {SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
- {Hdl, State1} = get_del_handle(SegNum, State),
- ok = file_handle_cache:append(
- Hdl, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS>>),
- State1.
+write_delivered(SeqId, State = #qistate { journal_del_dict = JDelDict }) ->
+ {JDelDict1, State1} =
+ write_to_journal([<<?DEL_BIT:1, SeqId:?SEQ_BITS>>],
+ [SeqId], JDelDict, State),
+ maybe_full_flush(State1 #qistate { journal_del_dict = JDelDict1 }).
-write_acks(SeqIds, State = #qistate { journal_ack_dict = JAckDict,
- journal_ack_count = JAckCount }) ->
- {Hdl, State1} = get_journal_handle(State),
- {JAckDict1, JAckCount1} =
- lists:foldl(
- fun (SeqId, {JAckDict2, JAckCount2}) ->
- ok = file_handle_cache:append(Hdl, <<SeqId:?SEQ_BITS>>),
- {add_ack_to_ack_dict(SeqId, JAckDict2), JAckCount2 + 1}
- end, {JAckDict, JAckCount}, SeqIds),
- State2 = State1 #qistate { journal_ack_dict = JAckDict1,
- journal_ack_count = JAckCount1 },
- case JAckCount1 > ?MAX_ACK_JOURNAL_ENTRY_COUNT of
- true -> full_flush_journal(State2);
- false -> State2
- end.
+write_acks(SeqIds, State = #qistate { journal_ack_dict = JAckDict }) ->
+ {JAckDict1, State1} = write_to_journal([<<?ACK_BIT:1, SeqId:?SEQ_BITS>> ||
+ SeqId <- SeqIds],
+ SeqIds, JAckDict, State),
+ maybe_full_flush(State1 #qistate { journal_ack_dict = JAckDict1 }).
sync_seq_ids(SeqIds, SyncAckJournal, State) ->
State1 = case SyncAckJournal of
@@ -237,37 +228,44 @@ sync_seq_ids(SeqIds, SyncAckJournal, State) ->
StateM
end, State1, SegNumsSet).
-can_flush_journal(#qistate { journal_ack_count = 0 }) ->
+can_flush_journal(#qistate { journal_count = 0 }) ->
false;
can_flush_journal(_) ->
true.
-flush_journal(State = #qistate { journal_ack_count = 0 }) ->
+flush_journal(State = #qistate { journal_count = 0 }) ->
State;
flush_journal(State = #qistate { journal_ack_dict = JAckDict,
- journal_ack_count = JAckCount }) ->
- [SegNum|_] = dict:fetch_keys(JAckDict),
- Acks = dict:fetch(SegNum, JAckDict),
- State1 = append_acks_to_segment(SegNum, Acks, State),
- JAckCount1 = JAckCount - length(Acks),
- State2 = State1 #qistate { journal_ack_dict = dict:erase(SegNum, JAckDict),
- journal_ack_count = JAckCount1 },
+ journal_del_dict = JDelDict,
+ journal_count = JCount }) ->
+ SegNum = case dict:fetch_keys(JAckDict) of
+ [] -> hd(dict:fetch_keys(JDelDict));
+ [N|_] -> N
+ end,
+ Dels = seg_entries_from_dict(SegNum, JDelDict),
+ Acks = seg_entries_from_dict(SegNum, JAckDict),
+ State1 = append_dels_to_segment(SegNum, Dels, State),
+ State2 = append_acks_to_segment(SegNum, Acks, State1),
+ JCount1 = JCount - length(Dels) - length(Acks),
+ State3 = State2 #qistate { journal_del_dict = dict:erase(SegNum, JDelDict),
+ journal_ack_dict = dict:erase(SegNum, JAckDict),
+ journal_count = JCount1 },
if
- JAckCount1 == 0 ->
- {Hdl, State3} = get_journal_handle(State2),
+ JCount1 == 0 ->
+ {Hdl, State4} = get_journal_handle(State3),
ok = file_handle_cache:position(Hdl, bof),
ok = file_handle_cache:truncate(Hdl),
ok = file_handle_cache:sync(Hdl),
- State3;
- JAckCount1 > ?MAX_ACK_JOURNAL_ENTRY_COUNT ->
- flush_journal(State2);
+ State4;
+ JCount1 > ?MAX_ACK_JOURNAL_ENTRY_COUNT ->
+ flush_journal(State3);
true ->
- State2
+ State3
end.
read_segment_entries(InitSeqId, State) ->
{SegNum, 0} = seq_id_to_seg_and_rel_seq_id(InitSeqId),
- {SDict, _PubCount, _DelCount, _AckCount, _HighRelSeq, State1} =
+ {SDict, _PubCount, _AckCount, _HighRelSeq, State1} =
load_segment(SegNum, State),
%% deliberately sort the list desc, because foldl will reverse it
RelSeqs = rev_sort(dict:fetch_keys(SDict)),
@@ -303,7 +301,7 @@ find_lowest_seq_id_seg_and_next_seq_id(State = #qistate { dir = Dir }) ->
case SegNums of
[] -> {0, State};
_ -> SegNum2 = lists:last(SegNums),
- {_SDict, PubCount, _DelCount, _AckCount, HighRelSeq, State2} =
+ {_SDict, PubCount, _AckCount, HighRelSeq, State2} =
load_segment(SegNum2, State),
NextSeqId1 = reconstruct_seq_id(SegNum2, HighRelSeq),
NextSeqId2 =
@@ -355,6 +353,23 @@ start_msg_store(DurableQueues) ->
%% Minor Helpers
%%----------------------------------------------------------------------------
+write_to_journal(BinList, SeqIds, Dict,
+ State = #qistate { journal_count = JCount }) ->
+ {Hdl, State1} = get_journal_handle(State),
+ ok = file_handle_cache:append(Hdl, BinList),
+ {Dict1, JCount1} =
+ lists:foldl(
+ fun (SeqId, {Dict2, JCount2}) ->
+ {add_seqid_to_dict(SeqId, Dict2), JCount2 + 1}
+ end, {Dict, JCount}, SeqIds),
+ {Dict1, State1 #qistate { journal_count = JCount1 }}.
+
+maybe_full_flush(State = #qistate { journal_count = JCount }) ->
+ case JCount > ?MAX_ACK_JOURNAL_ENTRY_COUNT of
+ true -> full_flush_journal(State);
+ false -> State
+ end.
+
full_flush_journal(State) ->
case can_flush_journal(State) of
true -> State1 = flush_journal(State),
@@ -388,11 +403,6 @@ get_pub_handle(SegNum, State = #qistate { publish_handle = PubHandle }) ->
get_counted_handle(SegNum, State, PubHandle),
{Hdl, State1 #qistate { publish_handle = PubHandle1 }}.
-get_del_handle(SegNum, State = #qistate { deliver_handle = DelHandle }) ->
- {State1, DelHandle1 = {_SegNum, Hdl, _Count}} =
- get_counted_handle(SegNum, State, DelHandle),
- {Hdl, State1 #qistate { deliver_handle = DelHandle1 }}.
-
get_counted_handle(SegNum, State, undefined) ->
{Hdl, State1} = get_seg_handle(SegNum, State),
{State1, {SegNum, Hdl, 1}};
@@ -455,9 +465,12 @@ delete_queue_directory(Dir) ->
[ filename:join(Dir, Entry) || Entry <- Entries ]),
ok = file:del_dir(Dir).
-add_ack_to_ack_dict(SeqId, ADict) ->
+add_seqid_to_dict(SeqId, Dict) ->
{SegNum, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
- dict:update(SegNum, fun(Lst) -> [RelSeq|Lst] end, [RelSeq], ADict).
+ add_seqid_to_dict(SegNum, RelSeq, Dict).
+
+add_seqid_to_dict(SegNum, RelSeq, Dict) ->
+ dict:update(SegNum, fun(Lst) -> [RelSeq|Lst] end, [RelSeq], Dict).
all_segment_nums(Dir) ->
lists:sort(
@@ -471,11 +484,11 @@ blank_state(QueueName) ->
ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
#qistate { dir = Dir,
seg_num_handles = dict:new(),
- journal_ack_count = 0,
+ journal_count = 0,
journal_ack_dict = dict:new(),
+ journal_del_dict = dict:new(),
seg_ack_counts = dict:new(),
- publish_handle = undefined,
- deliver_handle = undefined
+ publish_handle = undefined
}.
detect_clean_shutdown(Dir) ->
@@ -489,6 +502,12 @@ store_clean_shutdown(Dir) ->
[write, raw, binary],
[{write_buffer, unbuffered}]),
ok = file_handle_cache:close(Hdl).
+
+seg_entries_from_dict(SegNum, Dict) ->
+ case dict:find(SegNum, Dict) of
+ {ok, Entries} -> Entries;
+ error -> []
+ end.
%%----------------------------------------------------------------------------
@@ -500,7 +519,7 @@ queue_index_walker([]) ->
queue_index_walker([QueueName|QueueNames]) ->
State = blank_state(QueueName),
{Hdl, State1} = get_journal_handle(State),
- JAckDict = load_journal(Hdl, dict:new()),
+ {_JDelDict, JAckDict} = load_journal(Hdl, dict:new(), dict:new()),
State2 = #qistate { dir = Dir } =
close_handle(journal, State1 #qistate { journal_ack_dict = JAckDict }),
SegNums = all_segment_nums(Dir),
@@ -510,7 +529,7 @@ queue_index_walker({[], State, QueueNames}) ->
_State = terminate(State),
queue_index_walker(QueueNames);
queue_index_walker({[SegNum | SegNums], State, QueueNames}) ->
- {SDict, _PubCount, _DelCount, _AckCount, _HighRelSeq, State1} =
+ {SDict, _PubCount, _AckCount, _HighRelSeq, State1} =
load_segment(SegNum, State),
queue_index_walker({dict:to_list(SDict), State1, SegNums, QueueNames});
@@ -534,31 +553,18 @@ read_and_prune_segments(State = #qistate { dir = Dir }) ->
{TotalMsgCount, State1} =
lists:foldl(
fun (SegNum, {TotalMsgCount1, StateN =
- #qistate { publish_handle = PublishHandle,
- deliver_handle = DeliverHandle }}) ->
- {SDict, PubCount, DelCount, AckCount, _HighRelSeq, StateM} =
+ #qistate { publish_handle = PublishHandle }}) ->
+ {SDict, PubCount, AckCount, _HighRelSeq, StateM} =
load_segment(SegNum, StateN),
- {TransientMsgsAcks, StateL =
- #qistate { seg_ack_counts = AckCounts,
- journal_ack_dict = JAckDict }} =
+ StateL = #qistate { seg_ack_counts = AckCounts } =
drop_and_deliver(SegNum, SDict, CleanShutdown, StateM),
- %% ignore TransientMsgsAcks in AckCounts and
- %% JAckDict1 because the TransientMsgsAcks fall
- %% through into scatter_journal at which point the
- %% AckCounts and TotalMsgCount will be correctly
- %% adjusted.
TotalMsgCount2 = TotalMsgCount1 + dict:size(SDict),
AckCounts1 = case AckCount of
0 -> AckCounts;
N -> dict:store(SegNum, N, AckCounts)
end,
- JAckDict1 =
- case TransientMsgsAcks of
- [] -> JAckDict;
- _ -> dict:store(SegNum, TransientMsgsAcks, JAckDict)
- end,
- %% In each of the following, there should only be
- %% one segment that matches the 3rd case. All other
+ %% In the following, there should only be max one
+ %% segment that matches the 3rd case. All other
%% segments should either be full or empty. There
%% could be no partial segments.
PublishHandle1 = case PubCount of
@@ -567,90 +573,117 @@ read_and_prune_segments(State = #qistate { dir = Dir }) ->
_ when PublishHandle == undefined ->
{SegNum, undefined, PubCount}
end,
- DeliverHandle1 = case DelCount of
- ?SEGMENT_ENTRIES_COUNT -> DeliverHandle;
- 0 -> DeliverHandle;
- _ when DeliverHandle == undefined ->
- {SegNum, undefined, DelCount}
- end,
{TotalMsgCount2,
StateL #qistate { seg_ack_counts = AckCounts1,
- journal_ack_dict = JAckDict1,
- publish_handle = PublishHandle1,
- deliver_handle = DeliverHandle1 }}
+ publish_handle = PublishHandle1 }}
end, {0, State}, SegNums),
{TotalMsgCount, State1}.
scatter_journal(TotalMsgCount, State = #qistate { dir = Dir }) ->
- {Hdl, State1 = #qistate { journal_ack_dict = JAckDict }} =
+ {Hdl, State1 = #qistate { journal_del_dict = JDelDict,
+ journal_ack_dict = JAckDict }} =
get_journal_handle(State),
- %% ADict may well contain duplicates. However, this is ok, due to
- %% the use of sets in replay_journal_acks_to_segment
- ADict = load_journal(Hdl, JAckDict),
+ %% ADict and DDict may well contain duplicates. However, this is
+ %% ok, because we use sets to eliminate dups before writing to
+ %% segments
+ {ADict, DDict} = load_journal(Hdl, JAckDict, JDelDict),
State2 = close_handle(journal, State1),
- {TotalMsgCount1, State3} =
- dict:fold(fun replay_journal_acks_to_segment/3,
- {TotalMsgCount,
+ {TotalMsgCount1, ADict1, State3} =
+ dict:fold(fun replay_journal_to_segment/3,
+ {TotalMsgCount, ADict,
%% supply empty dict so that when
%% replay_journal_acks_to_segment loads segments,
%% it gets all msgs, and ignores anything we've
%% found in the journal.
- State2 #qistate { journal_ack_dict = dict:new() }}, ADict),
+ State2 #qistate { journal_del_dict = dict:new(),
+ journal_ack_dict = dict:new() }}, DDict),
+ {TotalMsgCount2, State4} =
+ dict:fold(fun replay_journal_acks_to_segment/3,
+ {TotalMsgCount1, State3}, ADict1),
JournalPath = filename:join(Dir, ?ACK_JOURNAL_FILENAME),
ok = file:delete(JournalPath),
- {TotalMsgCount1, State3}.
+ {TotalMsgCount2, State4}.
-load_journal(Hdl, ADict) ->
+load_journal(Hdl, ADict, DDict) ->
case file_handle_cache:read(Hdl, ?SEQ_BYTES) of
- {ok, <<SeqId:?SEQ_BITS>>} ->
- load_journal(Hdl, add_ack_to_ack_dict(SeqId, ADict));
- _ErrOrEoF -> ADict
+ {ok, <<?DEL_BIT:1, SeqId:?SEQ_BITS>>} ->
+ load_journal(Hdl, ADict, add_seqid_to_dict(SeqId, DDict));
+ {ok, <<?ACK_BIT:1, SeqId:?SEQ_BITS>>} ->
+ load_journal(Hdl, add_seqid_to_dict(SeqId, ADict), DDict);
+ _ErrOrEoF -> {ADict, DDict}
end.
-replay_journal_acks_to_segment(_, [], Acc) ->
- Acc;
+replay_journal_to_segment(_SegNum, [], {TotalMsgCount, ADict, State}) ->
+ {TotalMsgCount, ADict, State};
+replay_journal_to_segment(SegNum, Dels, {TotalMsgCount, ADict, State}) ->
+ {SDict, _PubCount, _AckCount, _HighRelSeq, State1} =
+ load_segment(SegNum, State),
+ ValidDels = sets:to_list(
+ sets:filter(
+ fun (RelSeq) ->
+ case dict:find(RelSeq, SDict) of
+ {ok, {_MsgId, false, _IsPersistent}} -> true;
+ _ -> false
+ end
+ end, sets:from_list(Dels))),
+ State2 = append_dels_to_segment(SegNum, ValidDels, State1),
+ Acks = seg_entries_from_dict(SegNum, ADict),
+ case Acks of
+ [] -> {TotalMsgCount, ADict, State2};
+ _ ->
+ ADict1 = dict:erase(SegNum, ADict),
+ {Count, State3} = filter_acks_and_append_to_segment(SegNum, SDict,
+ Acks, State2),
+ {TotalMsgCount - Count, ADict1, State3}
+ end.
+
+replay_journal_acks_to_segment(_SegNum, [], {TotalMsgCount, State}) ->
+ {TotalMsgCount, State};
replay_journal_acks_to_segment(SegNum, Acks, {TotalMsgCount, State}) ->
- {SDict, _PubCount, _DelCount, _AckCount, _HighRelSeq, State1} =
+ {SDict, _PubCount, _AckCount, _HighRelSeq, State1} =
load_segment(SegNum, State),
+ {Count, State2} =
+ filter_acks_and_append_to_segment(SegNum, SDict, Acks, State1),
+ {TotalMsgCount - Count, State2}.
+
+filter_acks_and_append_to_segment(SegNum, SDict, Acks, State) ->
ValidRelSeqIds = dict:fetch_keys(SDict),
ValidAcks = sets:to_list(sets:intersection(sets:from_list(ValidRelSeqIds),
sets:from_list(Acks))),
- %% ValidAcks will not contain any duplicates at this point.
- {TotalMsgCount - length(ValidAcks),
- append_acks_to_segment(SegNum, ValidAcks, State1)}.
+ {length(ValidAcks), append_acks_to_segment(SegNum, ValidAcks, State)}.
-drop_and_deliver(SegNum, SDict, CleanShutdown, State) ->
- {AckMe, DeliverMe} =
+drop_and_deliver(SegNum, SDict, CleanShutdown,
+ State = #qistate { journal_del_dict = JDelDict,
+ journal_ack_dict = JAckDict }) ->
+ {JDelDict1, JAckDict1} =
dict:fold(
- fun (RelSeq, {MsgId, IsDelivered, true}, {AckMeAcc, DeliverMeAcc}) ->
+ fun (RelSeq, {MsgId, IsDelivered, true}, {JDelDict2, JAckDict2}) ->
%% msg is persistent, keep only if the msg_store has it
case {IsDelivered, rabbit_msg_store:contains(MsgId)} of
{false, true} when not CleanShutdown ->
%% not delivered, but dirty shutdown => mark delivered
- {AckMeAcc, [RelSeq | DeliverMeAcc]};
+ {add_seqid_to_dict(SegNum, RelSeq, JDelDict2),
+ JAckDict2};
{_, true} ->
- {AckMeAcc, DeliverMeAcc};
+ {JDelDict2, JAckDict2};
{true, false} ->
- {[RelSeq | AckMeAcc], DeliverMeAcc};
+ {JDelDict2,
+ add_seqid_to_dict(SegNum, RelSeq, JAckDict2)};
{false, false} ->
- {[RelSeq | AckMeAcc], [RelSeq | DeliverMeAcc]}
+ {add_seqid_to_dict(SegNum, RelSeq, JDelDict2),
+ add_seqid_to_dict(SegNum, RelSeq, JAckDict2)}
end;
- (RelSeq, {_MsgId, false, false}, {AckMeAcc, DeliverMeAcc}) ->
+ (RelSeq, {_MsgId, false, false}, {JDelDict2, JAckDict2}) ->
%% not persistent and not delivered => deliver and ack it
- {[RelSeq | AckMeAcc], [RelSeq | DeliverMeAcc]};
- (RelSeq, {_MsgId, true, false}, {AckMeAcc, DeliverMeAcc}) ->
+ {add_seqid_to_dict(SegNum, RelSeq, JDelDict2),
+ add_seqid_to_dict(SegNum, RelSeq, JAckDict2)};
+ (RelSeq, {_MsgId, true, false}, {JDelDict2, JAckDict2}) ->
%% not persistent but delivered => ack it
- {[RelSeq | AckMeAcc], DeliverMeAcc}
- end, {[], []}, SDict),
- {Hdl, State1} = get_seg_handle(SegNum, State),
- ok = case DeliverMe of
- [] -> ok;
- _ -> file_handle_cache:append(
- Hdl,
- [ <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS>> || RelSeq <- DeliverMe ])
- end,
- {AckMe, State1}.
+ {JDelDict2,
+ add_seqid_to_dict(SegNum, RelSeq, JAckDict2)}
+ end, {JDelDict, JAckDict}, SDict),
+ State #qistate { journal_del_dict = JDelDict1,
+ journal_ack_dict = JAckDict1 }.
%%----------------------------------------------------------------------------
@@ -664,35 +697,44 @@ load_segment(SegNum, State = #qistate { seg_num_handles = SegHdls,
error -> filelib:is_file(seg_num_to_path(Dir, SegNum))
end,
case SegmentExists of
- false -> {dict:new(), 0, 0, 0, 0, State};
+ false -> {dict:new(), 0, 0, 0, State};
true ->
- {Hdl, State1 = #qistate { journal_ack_dict = JAckDict }} =
+ {Hdl, State1 = #qistate { journal_del_dict = JDelDict,
+ journal_ack_dict = JAckDict }} =
get_seg_handle(SegNum, State),
ok = file_handle_cache:position(Hdl, bof),
- {SDict, PubCount, DelCount, AckCount, HighRelSeq} =
- load_segment_entries(Hdl, dict:new(), 0, 0, 0, 0),
- RelSeqs = case dict:find(SegNum, JAckDict) of
- {ok, RelSeqs1} -> RelSeqs1;
- error -> []
- end,
+ {SDict, PubCount, AckCount, HighRelSeq} =
+ load_segment_entries(Hdl, dict:new(), 0, 0, 0),
+ %% delete ack'd msgs first
{SDict1, AckCount1} =
lists:foldl(fun (RelSeq, {SDict2, AckCount2}) ->
{dict:erase(RelSeq, SDict2), AckCount2 + 1}
- end, {SDict, AckCount}, RelSeqs),
- {SDict1, PubCount, DelCount, AckCount1, HighRelSeq, State1}
+ end, {SDict, AckCount},
+ seg_entries_from_dict(SegNum, JAckDict)),
+ %% ensure remaining msgs are delivered as necessary
+ SDict3 =
+ lists:foldl(
+ fun (RelSeq, SDict4) ->
+ case dict:find(RelSeq, SDict4) of
+ {ok, {MsgId, false, IsPersistent}} ->
+ dict:store(RelSeq, {MsgId, true, IsPersistent},
+ SDict4);
+ _ -> SDict4
+ end
+ end, SDict1, seg_entries_from_dict(SegNum, JDelDict)),
+
+ {SDict3, PubCount, AckCount1, HighRelSeq, State1}
end.
-load_segment_entries(Hdl, SDict, PubCount, DelCount, AckCount, HighRelSeq) ->
+load_segment_entries(Hdl, SDict, PubCount, AckCount, HighRelSeq) ->
case file_handle_cache:read(Hdl, 1) of
{ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
MSB:(8-?REL_SEQ_ONLY_PREFIX_BITS)>>} ->
{ok, LSB} = file_handle_cache:read(
Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES - 1),
<<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>,
- {SDict1, DelCount1, AckCount1} =
- deliver_or_ack_msg(SDict, DelCount, AckCount, RelSeq),
- load_segment_entries(
- Hdl, SDict1, PubCount, DelCount1, AckCount1, HighRelSeq);
+ {SDict1, AckCount1} = deliver_or_ack_msg(SDict, AckCount, RelSeq),
+ load_segment_entries(Hdl, SDict1, PubCount, AckCount1, HighRelSeq);
{ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
IsPersistentNum:1, MSB:(7-?PUBLISH_PREFIX_BITS)>>} ->
%% because we specify /binary, and binaries are complete
@@ -704,23 +746,21 @@ load_segment_entries(Hdl, SDict, PubCount, DelCount, AckCount, HighRelSeq) ->
HighRelSeq1 = lists:max([RelSeq, HighRelSeq]),
load_segment_entries(
Hdl, dict:store(RelSeq, {MsgId, false, 1 == IsPersistentNum},
- SDict),
- PubCount + 1, DelCount, AckCount, HighRelSeq1);
- _ErrOrEoF -> {SDict, PubCount, DelCount, AckCount, HighRelSeq}
+ SDict), PubCount + 1, AckCount, HighRelSeq1);
+ _ErrOrEoF -> {SDict, PubCount, AckCount, HighRelSeq}
end.
-deliver_or_ack_msg(SDict, DelCount, AckCount, RelSeq) ->
+deliver_or_ack_msg(SDict, AckCount, RelSeq) ->
case dict:find(RelSeq, SDict) of
{ok, {MsgId, false, IsPersistent}} ->
- {dict:store(RelSeq, {MsgId, true, IsPersistent}, SDict),
- DelCount + 1, AckCount};
+ {dict:store(RelSeq, {MsgId, true, IsPersistent}, SDict), AckCount};
{ok, {_MsgId, true, _IsPersistent}} ->
- {dict:erase(RelSeq, SDict), DelCount, AckCount + 1}
+ {dict:erase(RelSeq, SDict), AckCount + 1}
end.
%%----------------------------------------------------------------------------
-%% Appending Acks to Segments
+%% Appending Acks or Dels to Segments
%%----------------------------------------------------------------------------
append_acks_to_segment(SegNum, Acks,
@@ -749,13 +789,21 @@ append_acks_to_segment(SegNum, AckCount, Acks, State = #qistate { dir = Dir })
{?SEGMENT_ENTRIES_COUNT, State1};
append_acks_to_segment(SegNum, AckCount, Acks, State)
when length(Acks) + AckCount < ?SEGMENT_ENTRIES_COUNT ->
- {Hdl, State1} = get_seg_handle(SegNum, State),
- {ok, AckCount1} =
- lists:foldl(
- fun (RelSeq, {ok, AckCount2}) ->
- {file_handle_cache:append(
- Hdl, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS>>), AckCount2 + 1}
- end, {ok, AckCount}, Acks),
+ {Count, Hdl, State1} = append_to_segment(SegNum, Acks, State),
ok = file_handle_cache:sync(Hdl),
- {AckCount1, State1}.
+ {AckCount + Count, State1}.
+
+append_dels_to_segment(SegNum, Dels, State) ->
+ {_Count, _Hdl, State1} = append_to_segment(SegNum, Dels, State),
+ State1.
+
+append_to_segment(SegNum, AcksOrDels, State) ->
+ {Hdl, State1} = get_seg_handle(SegNum, State),
+ {Count, List} =
+ lists:foldl(fun (RelSeq, {Count1, Acc}) ->
+ {Count1 + 1,
+ [<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS>> | Acc]}
+ end, {0, []}, AcksOrDels),
+ ok = file_handle_cache:append(Hdl, List),
+ {Count, Hdl, State1}.