summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRob Harrop <rharrop@vmware.com>2010-08-20 11:28:57 +0100
committerRob Harrop <rharrop@vmware.com>2010-08-20 11:28:57 +0100
commitec42c2a48cdce9b49920bfc751feda835cdf8bee (patch)
treeae05eed85f4e34ec61add5eeca4ac3cc57356afc /src
parentf2a73308e711246d6ea2c50caad7d846be0e022c (diff)
downloadrabbitmq-server-git-ec42c2a48cdce9b49920bfc751feda835cdf8bee.tar.gz
in flight, want to test against default
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl1
-rw-r--r--src/rabbit_queue_index.erl83
-rw-r--r--src/rabbit_variable_queue.erl15
3 files changed, 71 insertions, 28 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b6f898cece..f1bfba84c0 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -720,6 +720,7 @@ handle_call({notify_down, ChPid}, _From, State) ->
handle_call({basic_get, ChPid, NoAck}, _From,
State = #q{q = #amqqueue{name = QName}}) ->
+ io:format("Basic Get~n"),
AckRequired = not NoAck,
State1 = ensure_expiry_timer(State),
case fetch(AckRequired, State1) of
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index d6b8bb2889..3c3aef3edf 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -31,7 +31,7 @@
-module(rabbit_queue_index).
--export([init/4, terminate/2, delete_and_terminate/1, publish/4,
+-export([init/4, terminate/2, delete_and_terminate/1, publish/3,
deliver/2, ack/2, sync/2, flush/1, read/3,
next_segment_boundary/1, bounds/1, recover/1]).
@@ -98,7 +98,7 @@
%% and seeding the message store on start up.
%%
%% Note that in general, the representation of a message's state as
-%% the tuple: {('no_pub'|{Guid, IsPersistent}), ('del'|'no_del'),
+%% the tuple: {('no_pub'|{IndexEntry, IsPersistent}), ('del'|'no_del'),
%% ('ack'|'no_ack')} is richer than strictly necessary for most
%% operations. However, for startup, and to ensure the safe and
%% correct combination of journal entries with entries read from the
@@ -141,14 +141,19 @@
-define(REL_SEQ_ONLY_ENTRY_LENGTH_BYTES, 2).
%% publish record is binary 1 followed by a bit for is_persistent,
-%% then 14 bits of rel seq id, and 128 bits of md5sum msg id
+%% then 14 bits of rel seq id, 64 bits for message expiry and 128 bits
+%% of md5sum msg id
-define(PUBLISH_PREFIX, 1).
-define(PUBLISH_PREFIX_BITS, 1).
+-define(EXPIRY_BYTES, 8).
+-define(EXPIRY_BITS, (?EXPIRY_BYTES * 8)).
+-define(NO_EXPIRY, 0).
+
-define(GUID_BYTES, 16). %% md5sum is 128 bit or 16 bytes
-define(GUID_BITS, (?GUID_BYTES * 8)).
-%% 16 bytes for md5sum + 2 for seq, bits and prefix
--define(PUBLISH_RECORD_LENGTH_BYTES, ?GUID_BYTES + 2).
+%% 16 bytes for md5sum + 8 for expiry + 2 for seq, bits and prefix
+-define(PUBLISH_RECORD_LENGTH_BYTES, ?GUID_BYTES + ?EXPIRY_BYTES + 2).
%% 1 publish, 1 deliver, 1 ack per msg
-define(SEGMENT_TOTAL_SIZE, ?SEGMENT_ENTRY_COUNT *
@@ -157,7 +162,7 @@
%% ---- misc ----
--define(PUB, {_, _}). %% {Guid, IsPersistent}
+-define(PUB, {_, _, _}). %% {Guid, Expiry, IsPersistent}
-define(READ_MODE, [binary, raw, read, {read_ahead, ?SEGMENT_TOTAL_SIZE}]).
@@ -245,15 +250,18 @@ delete_and_terminate(State) ->
ok = rabbit_misc:recursive_delete([Dir]),
State1.
-publish(Guid, SeqId, IsPersistent, State) when is_binary(Guid) ->
+publish(#qientry{guid = Guid, seq_id = SeqId, expiry = Expiry},
+ IsPersistent, State) when is_binary(Guid) ->
?GUID_BYTES = size(Guid),
{JournalHdl, State1} = get_journal_handle(State),
ok = file_handle_cache:append(
JournalHdl, [<<(case IsPersistent of
true -> ?PUB_PERSIST_JPREFIX;
false -> ?PUB_TRANS_JPREFIX
- end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Guid]),
- maybe_flush_journal(add_to_journal(SeqId, {Guid, IsPersistent}, State1)).
+ end):?JPREFIX_BITS,
+ SeqId:?SEQ_BITS>>,
+ create_pub_record_body(Guid, Expiry)]),
+ maybe_flush_journal(add_to_journal(SeqId, {Guid, Expiry, IsPersistent}, State1)).
deliver(SeqIds, State) ->
deliver_or_ack(del, SeqIds, State).
@@ -282,9 +290,10 @@ flush(State) -> flush_journal(State).
read(StartEnd, StartEnd, State) ->
{[], State};
-read(Start, End, State = #qistate { segments = Segments,
+read(Start, End, State = #qistate { segments = Segments,
dir = Dir }) when Start =< End ->
%% Start is inclusive, End is exclusive.
+ io:format("Reading~n"),
LowerB = {StartSeg, _StartRelSeq} = seq_id_to_seg_and_rel_seq_id(Start),
UpperB = {EndSeg, _EndRelSeq} = seq_id_to_seg_and_rel_seq_id(End - 1),
{Messages, Segments1} =
@@ -320,6 +329,7 @@ bounds(State = #qistate { segments = Segments }) ->
{LowSeqId, NextSeqId, State}.
recover(DurableQueues) ->
+ io:format("Recovering~n"),
DurableDict = dict:from_list([ {queue_name_to_dir_name(Queue), Queue} ||
Queue <- DurableQueues ]),
QueuesDir = queues_dir(),
@@ -386,6 +396,7 @@ store_clean_shutdown(Terms, Dir) ->
rabbit_misc:write_term_file(filename:join(Dir, ?CLEAN_FILENAME), Terms).
init_clean(RecoveredCounts, State) ->
+ io:format("Clean Init"),
%% Load the journal. Since this is a clean recovery this (almost)
%% gets us back to where we were on shutdown.
State1 = #qistate { dir = Dir, segments = Segments } = load_journal(State),
@@ -402,9 +413,11 @@ init_clean(RecoveredCounts, State) ->
end, Segments, RecoveredCounts),
%% the counts above include transient messages, which would be the
%% wrong thing to return
+ io:format("Clean:~p~n", [Segments1]),
{undefined, State1 # qistate { segments = Segments1 }}.
init_dirty(CleanShutdown, ContainsCheckFun, State) ->
+ io:format("Dirty Init"),
%% Recover the journal completely. This will also load segments
%% which have entries in the journal and remove duplicates. The
%% counts will correctly reflect the combination of the segment
@@ -427,6 +440,7 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) ->
%% Unconditionally flush since the dirty_count doesn't get updated
%% by the above foldl.
State2 = flush_journal(State1 #qistate { segments = Segments1 }),
+ io:format("Dirty:~p~n", [State2]),
{Count, State2}.
terminate(State = #qistate { journal_handle = JournalHdl,
@@ -449,7 +463,7 @@ recover_segment(ContainsCheckFun, CleanShutdown,
{SegEntries1, UnackedCountDelta} =
segment_plus_journal(SegEntries, JEntries),
array:sparse_foldl(
- fun (RelSeq, {{Guid, _IsPersistent}, Del, no_ack}, Segment1) ->
+ fun (RelSeq, {{Guid, _Expiry, _IsPersistent}, Del, no_ack}, Segment1) ->
recover_message(ContainsCheckFun(Guid), CleanShutdown,
Del, RelSeq, Segment1)
end,
@@ -512,6 +526,26 @@ queue_index_walker_reader(QueueName, Gatherer) ->
ok = gatherer:finish(Gatherer).
%%----------------------------------------------------------------------------
+%% expiry/binary manipulation
+%%----------------------------------------------------------------------------
+
+create_pub_record_body(Guid, Expiry) ->
+ io:format("Writing GUID: ~p~n", [Guid]),
+ [Guid, expiry_to_binary(Expiry)].
+
+expiry_to_binary(undefined) ->
+ <<?NO_EXPIRY:?EXPIRY_BITS>>;
+expiry_to_binary(Expiry) ->
+ <<Expiry:?EXPIRY_BITS>>.
+
+read_pub_record_body(Hdl) ->
+ {ok, Bin} = file_handle_cache:read(Hdl, ?GUID_BYTES + ?EXPIRY_BYTES),
+ <<GuidNum:?GUID_BITS, Expiry:?EXPIRY_BITS>> = Bin,
+ <<Guid:?GUID_BYTES/binary>> = <<GuidNum:?GUID_BITS>>,
+ io:format("Read GUID: ~p~n", [Guid]),
+ {Guid, Expiry}.
+
+%%----------------------------------------------------------------------------
%% journal manipulation
%%----------------------------------------------------------------------------
@@ -588,6 +622,7 @@ append_journal_to_segment(#segment { journal_entries = JEntries,
get_journal_handle(State = #qistate { journal_handle = undefined,
dir = Dir }) ->
Path = filename:join(Dir, ?JOURNAL_FILENAME),
+ io:format("Journal Path:~p~n", [Path]),
{ok, Hdl} = file_handle_cache:open(Path, [write | ?READ_MODE],
[{write_buffer, infinity}]),
{Hdl, State #qistate { journal_handle = Hdl }};
@@ -631,14 +666,10 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) ->
?ACK_JPREFIX ->
load_journal_entries(add_to_journal(SeqId, ack, State));
_ ->
- case file_handle_cache:read(Hdl, ?GUID_BYTES) of
- {ok, <<GuidNum:?GUID_BITS>>} ->
- %% work around for binary data
- %% fragmentation. See
- %% rabbit_msg_file:read_next/2
- <<Guid:?GUID_BYTES/binary>> =
- <<GuidNum:?GUID_BITS>>,
- Publish = {Guid, case Prefix of
+ io:format("found a pub record"),
+ case read_pub_record_body(Hdl) of
+ {Guid, Expiry} ->
+ Publish = {Guid, Expiry, case Prefix of
?PUB_PERSIST_JPREFIX -> true;
?PUB_TRANS_JPREFIX -> false
end},
@@ -739,11 +770,12 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) ->
ok = case Pub of
no_pub ->
ok;
- {Guid, IsPersistent} ->
+ {Guid, Expiry, IsPersistent} ->
file_handle_cache:append(
Hdl, [<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
(bool_to_int(IsPersistent)):1,
- RelSeq:?REL_SEQ_BITS>>, Guid])
+ RelSeq:?REL_SEQ_BITS>>,
+ create_pub_record_body(Guid, Expiry)])
end,
ok = case {Del, Ack} of
{no_del, no_ack} ->
@@ -763,10 +795,10 @@ read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq},
{Messages, Segments}, Dir) ->
Segment = segment_find_or_new(Seg, Dir, Segments),
{segment_entries_foldr(
- fun (RelSeq, {{Guid, IsPersistent}, IsDelivered, no_ack}, Acc)
+ fun (RelSeq, {{Guid, Expiry, IsPersistent}, IsDelivered, no_ack}, Acc)
when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso
(Seg < EndSeg orelse EndRelSeq >= RelSeq) ->
- [ {Guid, reconstruct_seq_id(StartSeg, RelSeq),
+ [ {Guid, Expiry, reconstruct_seq_id(StartSeg, RelSeq),
IsPersistent, IsDelivered == del} | Acc ];
(_RelSeq, _Value, Acc) ->
Acc
@@ -789,6 +821,7 @@ load_segment(KeepAcked, #segment { path = Path }) ->
{ok, 0} = file_handle_cache:position(Hdl, bof),
Res = load_segment_entries(KeepAcked, Hdl, array_new(), 0),
ok = file_handle_cache:close(Hdl),
+ io:format("Load segment: ~p~n", [Res]),
Res
end.
@@ -798,8 +831,8 @@ load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) ->
IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} ->
%% because we specify /binary, and binaries are complete
%% bytes, the size spec is in bytes, not bits.
- {ok, Guid} = file_handle_cache:read(Hdl, ?GUID_BYTES),
- Obj = {{Guid, 1 == IsPersistentNum}, no_del, no_ack},
+ {Guid, Expiry} = read_pub_record_body(Hdl),
+ Obj = {{Guid, Expiry, 1 == IsPersistentNum}, no_del, no_ack},
SegEntries1 = array:set(RelSeq, Obj, SegEntries),
load_segment_entries(KeepAcked, Hdl, SegEntries1,
UnackedCount + 1);
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 4e978fd5ad..ff19b87f3b 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1057,12 +1057,14 @@ maybe_write_index_to_disk(_Force, MsgStatus = #msg_status {
true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
{MsgStatus, IndexState};
maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
- guid = Guid, seq_id = SeqId,
+ seq_id = SeqId,
is_persistent = IsPersistent,
- is_delivered = IsDelivered }, IndexState)
+ is_delivered = IsDelivered },
+ IndexState)
when Force orelse IsPersistent ->
true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
- IndexState1 = rabbit_queue_index:publish(Guid, SeqId, IsPersistent,
+ IndexEntry = queue_index_entry(MsgStatus),
+ IndexState1 = rabbit_queue_index:publish(IndexEntry, IsPersistent,
IndexState),
{MsgStatus #msg_status { index_on_disk = true },
maybe_write_delivered(IsDelivered, SeqId, IndexState1)};
@@ -1079,6 +1081,12 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
{MsgStatus2, State #vqstate { index_state = IndexState1,
msg_store_clients = MSCState1 }}.
+queue_index_entry(#msg_status {guid = Guid,
+ seq_id = SeqId,
+ msg_properties =
+ #msg_properties{expiry = Expiry}}) ->
+ #qientry{guid = Guid, seq_id = SeqId, expiry = Expiry}.
+
%%----------------------------------------------------------------------------
%% Internal gubbins for acks
%%----------------------------------------------------------------------------
@@ -1300,6 +1308,7 @@ maybe_deltas_to_betas(State = #vqstate {
DeltaSeqId1 =
lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId),
DeltaSeqIdEnd]),
+ io:format("Reading~n"),
{List, IndexState1} =
rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState),
{Q3a, IndexState2} = betas_from_index_entries(