summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl1
-rw-r--r--src/rabbit_queue_index.erl55
-rw-r--r--src/rabbit_variable_queue.erl22
3 files changed, 32 insertions, 46 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index f1bfba84c0..b6f898cece 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -720,7 +720,6 @@ handle_call({notify_down, ChPid}, _From, State) ->
handle_call({basic_get, ChPid, NoAck}, _From,
State = #q{q = #amqqueue{name = QName}}) ->
- io:format("Basic Get~n"),
AckRequired = not NoAck,
State1 = ensure_expiry_timer(State),
case fetch(AckRequired, State1) of
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 3c3aef3edf..4f2168d1ad 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -31,7 +31,7 @@
-module(rabbit_queue_index).
--export([init/4, terminate/2, delete_and_terminate/1, publish/3,
+-export([init/4, terminate/2, delete_and_terminate/1, publish/5,
deliver/2, ack/2, sync/2, flush/1, read/3,
next_segment_boundary/1, bounds/1, recover/1]).
@@ -98,7 +98,7 @@
%% and seeding the message store on start up.
%%
%% Note that in general, the representation of a message's state as
-%% the tuple: {('no_pub'|{IndexEntry, IsPersistent}), ('del'|'no_del'),
+%% the tuple: {('no_pub'|{Guid, MsgProperties, IsPersistent}), ('del'|'no_del'),
%% ('ack'|'no_ack')} is richer than strictly necessary for most
%% operations. However, for startup, and to ensure the safe and
%% correct combination of journal entries with entries read from the
@@ -162,7 +162,7 @@
%% ---- misc ----
--define(PUB, {_, _, _}). %% {Guid, Expiry, IsPersistent}
+-define(PUB, {_, _, _}). %% {Guid, MsgProperties, IsPersistent}
-define(READ_MODE, [binary, raw, read, {read_ahead, ?SEGMENT_TOTAL_SIZE}]).
@@ -203,15 +203,15 @@
{'undefined' | non_neg_integer(), [any()], qistate()}).
-spec(terminate/2 :: ([any()], qistate()) -> qistate()).
-spec(delete_and_terminate/1 :: (qistate()) -> qistate()).
--spec(publish/4 :: (rabbit_guid:guid(), seq_id(), boolean(), qistate()) ->
- qistate()).
+-spec(publish/5 :: (rabbit_guid:guid(), seq_id(), msg_properties(),
+ boolean(), qistate()) -> qistate()).
-spec(deliver/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(ack/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(sync/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(flush/1 :: (qistate()) -> qistate()).
-spec(read/3 :: (seq_id(), seq_id(), qistate()) ->
- {[{rabbit_guid:guid(), seq_id(), boolean(), boolean()}],
- qistate()}).
+ {[{rabbit_guid:guid(), seq_id(), msg_properties(),
+ boolean(), boolean()}], qistate()}).
-spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()).
-spec(bounds/1 :: (qistate()) ->
{non_neg_integer(), non_neg_integer(), qistate()}).
@@ -250,8 +250,8 @@ delete_and_terminate(State) ->
ok = rabbit_misc:recursive_delete([Dir]),
State1.
-publish(#qientry{guid = Guid, seq_id = SeqId, expiry = Expiry},
- IsPersistent, State) when is_binary(Guid) ->
+publish(Guid, SeqId, MsgProperties, IsPersistent, State)
+ when is_binary(Guid) ->
?GUID_BYTES = size(Guid),
{JournalHdl, State1} = get_journal_handle(State),
ok = file_handle_cache:append(
@@ -260,8 +260,8 @@ publish(#qientry{guid = Guid, seq_id = SeqId, expiry = Expiry},
false -> ?PUB_TRANS_JPREFIX
end):?JPREFIX_BITS,
SeqId:?SEQ_BITS>>,
- create_pub_record_body(Guid, Expiry)]),
- maybe_flush_journal(add_to_journal(SeqId, {Guid, Expiry, IsPersistent}, State1)).
+ create_pub_record_body(Guid, MsgProperties)]),
+ maybe_flush_journal(add_to_journal(SeqId, {Guid, MsgProperties, IsPersistent}, State1)).
deliver(SeqIds, State) ->
deliver_or_ack(del, SeqIds, State).
@@ -293,7 +293,6 @@ read(StartEnd, StartEnd, State) ->
read(Start, End, State = #qistate { segments = Segments,
dir = Dir }) when Start =< End ->
%% Start is inclusive, End is exclusive.
- io:format("Reading~n"),
LowerB = {StartSeg, _StartRelSeq} = seq_id_to_seg_and_rel_seq_id(Start),
UpperB = {EndSeg, _EndRelSeq} = seq_id_to_seg_and_rel_seq_id(End - 1),
{Messages, Segments1} =
@@ -329,7 +328,6 @@ bounds(State = #qistate { segments = Segments }) ->
{LowSeqId, NextSeqId, State}.
recover(DurableQueues) ->
- io:format("Recovering~n"),
DurableDict = dict:from_list([ {queue_name_to_dir_name(Queue), Queue} ||
Queue <- DurableQueues ]),
QueuesDir = queues_dir(),
@@ -396,7 +394,6 @@ store_clean_shutdown(Terms, Dir) ->
rabbit_misc:write_term_file(filename:join(Dir, ?CLEAN_FILENAME), Terms).
init_clean(RecoveredCounts, State) ->
- io:format("Clean Init"),
%% Load the journal. Since this is a clean recovery this (almost)
%% gets us back to where we were on shutdown.
State1 = #qistate { dir = Dir, segments = Segments } = load_journal(State),
@@ -413,11 +410,9 @@ init_clean(RecoveredCounts, State) ->
end, Segments, RecoveredCounts),
%% the counts above include transient messages, which would be the
%% wrong thing to return
- io:format("Clean:~p~n", [Segments1]),
{undefined, State1 # qistate { segments = Segments1 }}.
init_dirty(CleanShutdown, ContainsCheckFun, State) ->
- io:format("Dirty Init"),
%% Recover the journal completely. This will also load segments
%% which have entries in the journal and remove duplicates. The
%% counts will correctly reflect the combination of the segment
@@ -440,7 +435,6 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) ->
%% Unconditionally flush since the dirty_count doesn't get updated
%% by the above foldl.
State2 = flush_journal(State1 #qistate { segments = Segments1 }),
- io:format("Dirty:~p~n", [State2]),
{Count, State2}.
terminate(State = #qistate { journal_handle = JournalHdl,
@@ -463,7 +457,7 @@ recover_segment(ContainsCheckFun, CleanShutdown,
{SegEntries1, UnackedCountDelta} =
segment_plus_journal(SegEntries, JEntries),
array:sparse_foldl(
- fun (RelSeq, {{Guid, _Expiry, _IsPersistent}, Del, no_ack}, Segment1) ->
+ fun (RelSeq, {{Guid, _MsgProperties, _IsPersistent}, Del, no_ack}, Segment1) ->
recover_message(ContainsCheckFun(Guid), CleanShutdown,
Del, RelSeq, Segment1)
end,
@@ -529,8 +523,7 @@ queue_index_walker_reader(QueueName, Gatherer) ->
%% expiry/binary manipulation
%%----------------------------------------------------------------------------
-create_pub_record_body(Guid, Expiry) ->
- io:format("Writing GUID: ~p~n", [Guid]),
+create_pub_record_body(Guid, #msg_properties{expiry = Expiry}) ->
[Guid, expiry_to_binary(Expiry)].
expiry_to_binary(undefined) ->
@@ -542,8 +535,7 @@ read_pub_record_body(Hdl) ->
{ok, Bin} = file_handle_cache:read(Hdl, ?GUID_BYTES + ?EXPIRY_BYTES),
<<GuidNum:?GUID_BITS, Expiry:?EXPIRY_BITS>> = Bin,
<<Guid:?GUID_BYTES/binary>> = <<GuidNum:?GUID_BITS>>,
- io:format("Read GUID: ~p~n", [Guid]),
- {Guid, Expiry}.
+ {Guid, #msg_properties{expiry = Expiry}}.
%%----------------------------------------------------------------------------
%% journal manipulation
@@ -622,7 +614,6 @@ append_journal_to_segment(#segment { journal_entries = JEntries,
get_journal_handle(State = #qistate { journal_handle = undefined,
dir = Dir }) ->
Path = filename:join(Dir, ?JOURNAL_FILENAME),
- io:format("Journal Path:~p~n", [Path]),
{ok, Hdl} = file_handle_cache:open(Path, [write | ?READ_MODE],
[{write_buffer, infinity}]),
{Hdl, State #qistate { journal_handle = Hdl }};
@@ -666,10 +657,9 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) ->
?ACK_JPREFIX ->
load_journal_entries(add_to_journal(SeqId, ack, State));
_ ->
- io:format("found a pub record"),
case read_pub_record_body(Hdl) of
- {Guid, Expiry} ->
- Publish = {Guid, Expiry, case Prefix of
+ {Guid, MsgProperties} ->
+ Publish = {Guid, MsgProperties, case Prefix of
?PUB_PERSIST_JPREFIX -> true;
?PUB_TRANS_JPREFIX -> false
end},
@@ -770,12 +760,12 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) ->
ok = case Pub of
no_pub ->
ok;
- {Guid, Expiry, IsPersistent} ->
+ {Guid, MsgProperties, IsPersistent} ->
file_handle_cache:append(
Hdl, [<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
(bool_to_int(IsPersistent)):1,
RelSeq:?REL_SEQ_BITS>>,
- create_pub_record_body(Guid, Expiry)])
+ create_pub_record_body(Guid, MsgProperties)])
end,
ok = case {Del, Ack} of
{no_del, no_ack} ->
@@ -795,10 +785,10 @@ read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq},
{Messages, Segments}, Dir) ->
Segment = segment_find_or_new(Seg, Dir, Segments),
{segment_entries_foldr(
- fun (RelSeq, {{Guid, Expiry, IsPersistent}, IsDelivered, no_ack}, Acc)
+ fun (RelSeq, {{Guid, MsgProperties, IsPersistent}, IsDelivered, no_ack}, Acc)
when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso
(Seg < EndSeg orelse EndRelSeq >= RelSeq) ->
- [ {Guid, Expiry, reconstruct_seq_id(StartSeg, RelSeq),
+ [ {Guid, MsgProperties, reconstruct_seq_id(StartSeg, RelSeq),
IsPersistent, IsDelivered == del} | Acc ];
(_RelSeq, _Value, Acc) ->
Acc
@@ -821,7 +811,6 @@ load_segment(KeepAcked, #segment { path = Path }) ->
{ok, 0} = file_handle_cache:position(Hdl, bof),
Res = load_segment_entries(KeepAcked, Hdl, array_new(), 0),
ok = file_handle_cache:close(Hdl),
- io:format("Load segment: ~p~n", [Res]),
Res
end.
@@ -831,8 +820,8 @@ load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) ->
IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} ->
%% because we specify /binary, and binaries are complete
%% bytes, the size spec is in bytes, not bits.
- {Guid, Expiry} = read_pub_record_body(Hdl),
- Obj = {{Guid, Expiry, 1 == IsPersistentNum}, no_del, no_ack},
+ {Guid, MsgProperties} = read_pub_record_body(Hdl),
+ Obj = {{Guid, MsgProperties, 1 == IsPersistentNum}, no_del, no_ack},
SegEntries1 = array:set(RelSeq, Obj, SegEntries),
load_segment_entries(KeepAcked, Hdl, SegEntries1,
UnackedCount + 1);
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ff19b87f3b..e6ff18c922 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -837,7 +837,7 @@ persistent_guids(Pubs) ->
betas_from_index_entries(List, TransientThreshold, IndexState) ->
{Filtered, Delivers, Acks} =
lists:foldr(
- fun ({Guid, SeqId, IsPersistent, IsDelivered},
+ fun ({Guid, MsgProperties, SeqId, IsPersistent, IsDelivered},
{Filtered1, Delivers1, Acks1}) ->
case SeqId < TransientThreshold andalso not IsPersistent of
true -> {Filtered1,
@@ -849,7 +849,8 @@ betas_from_index_entries(List, TransientThreshold, IndexState) ->
is_persistent = IsPersistent,
is_delivered = IsDelivered,
msg_on_disk = true,
- index_on_disk = true
+ index_on_disk = true,
+ msg_properties = MsgProperties
}) | Filtered1],
Delivers1,
Acks1}
@@ -1057,14 +1058,18 @@ maybe_write_index_to_disk(_Force, MsgStatus = #msg_status {
true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
{MsgStatus, IndexState};
maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
+ guid = Guid,
seq_id = SeqId,
is_persistent = IsPersistent,
- is_delivered = IsDelivered },
+ is_delivered = IsDelivered,
+ msg_properties = MsgProperties},
IndexState)
when Force orelse IsPersistent ->
true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
- IndexEntry = queue_index_entry(MsgStatus),
- IndexState1 = rabbit_queue_index:publish(IndexEntry, IsPersistent,
+ IndexState1 = rabbit_queue_index:publish(Guid,
+ SeqId,
+ MsgProperties,
+ IsPersistent,
IndexState),
{MsgStatus #msg_status { index_on_disk = true },
maybe_write_delivered(IsDelivered, SeqId, IndexState1)};
@@ -1081,12 +1086,6 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
{MsgStatus2, State #vqstate { index_state = IndexState1,
msg_store_clients = MSCState1 }}.
-queue_index_entry(#msg_status {guid = Guid,
- seq_id = SeqId,
- msg_properties =
- #msg_properties{expiry = Expiry}}) ->
- #qientry{guid = Guid, seq_id = SeqId, expiry = Expiry}.
-
%%----------------------------------------------------------------------------
%% Internal gubbins for acks
%%----------------------------------------------------------------------------
@@ -1308,7 +1307,6 @@ maybe_deltas_to_betas(State = #vqstate {
DeltaSeqId1 =
lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId),
DeltaSeqIdEnd]),
- io:format("Reading~n"),
{List, IndexState1} =
rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState),
{Q3a, IndexState2} = betas_from_index_entries(