summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl8
-rw-r--r--src/rabbit_queue_index.erl3
-rw-r--r--src/rabbit_tests.erl2
-rw-r--r--src/rabbit_variable_queue.erl14
4 files changed, 15 insertions, 12 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index bb4ac0b9c2..e9711b54b6 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -481,10 +481,10 @@ commit_transaction(Txn, State) ->
case lookup_ch(ChPid) of
not_found -> [];
C = #cr { unacked_messages = UAM } ->
- {MsgWithAcks, Remaining} =
+ {MsgsWithAcks, Remaining} =
collect_messages(PendingAcksOrdered, UAM),
store_ch_record(C#cr{unacked_messages = Remaining}),
- MsgWithAcks
+ [AckTag || {_Msg, AckTag} <- MsgsWithAcks]
end,
VQS = rabbit_variable_queue:tx_commit(
PendingMessagesOrdered, Acks, State #q.variable_queue_state),
@@ -593,13 +593,13 @@ handle_call({basic_get, ChPid, NoAck}, _From,
{empty, VQS1} -> reply(empty, State #q { variable_queue_state = VQS1 });
{{Msg, IsDelivered, AckTag, Remaining}, VQS1} ->
AckRequired = not(NoAck),
- {ok, VQS2} =
+ VQS2 =
case AckRequired of
true ->
C = #cr{unacked_messages = UAM} = ch_record(ChPid),
NewUAM = dict:store(NextId, {Msg, AckTag}, UAM),
store_ch_record(C#cr{unacked_messages = NewUAM}),
- {ok, VQS1};
+ VQS1;
false ->
rabbit_variable_queue:ack([AckTag], VQS1)
end,
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index cf0258b9f9..9933eb4cd8 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -516,6 +516,7 @@ load_segment(SegNum, SegPath, JAckDict) ->
case file:open(SegPath, [raw, binary, read_ahead, read]) of
{error, enoent} -> {dict:new(), 0, 0};
{ok, Hdl} ->
+ rabbit_log:info("SegNum: ~p~n", [SegNum]),
{SDict, AckCount, HighRelSeq} =
load_segment_entries(Hdl, dict:new(), 0, 0),
ok = file:close(Hdl),
@@ -536,6 +537,7 @@ load_segment_entries(Hdl, SDict, AckCount, HighRelSeq) ->
MSB:(8-?REL_SEQ_ONLY_PREFIX_BITS)>>} ->
{ok, LSB} = file:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES - 1),
<<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>,
+ rabbit_log:info("D/A: ~p: ~p~n", [self(), RelSeq]),
{SDict1, AckCount1} = deliver_or_ack_msg(SDict, AckCount, RelSeq),
load_segment_entries(Hdl, SDict1, AckCount1, HighRelSeq);
{ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
@@ -545,6 +547,7 @@ load_segment_entries(Hdl, SDict, AckCount, HighRelSeq) ->
{ok, <<LSB:1/binary, MsgId:?MSG_ID_BYTES/binary>>} =
file:read(Hdl, ?PUBLISH_RECORD_LENGTH_BYTES - 1),
<<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>,
+ rabbit_log:info("Pub: ~p: ~p~n", [self(), RelSeq]),
HighRelSeq1 = lists:max([RelSeq, HighRelSeq]),
load_segment_entries(
Hdl, dict:store(RelSeq, {MsgId, false,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 259f120a17..3a435e79ae 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -50,7 +50,7 @@ test_content_prop_roundtrip(Datum, Binary) ->
Binary = rabbit_binary_generator:encode_properties(Types, Values). %% assertion
all_tests() ->
- passed = test_disk_queue(),
+ %% passed = test_disk_queue(),
passed = test_priority_queue(),
passed = test_unfold(),
passed = test_parsing(),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index a7a07556e8..831aa0446b 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -274,7 +274,7 @@ ack(AckTags, State = #vqstate { index_state = IndexState }) ->
lists:foldl(
fun (ack_not_on_disk, Acc) -> Acc;
({ack_index_and_store, MsgId, SeqId}, {MsgIds, SeqIds}) ->
- {[MsgId | MsgIds], [SeqId, SeqIds]}
+ {[MsgId | MsgIds], [SeqId | SeqIds]}
end, {[], []}, AckTags),
IndexState1 = case SeqIds of
[] -> IndexState;
@@ -294,16 +294,15 @@ purge(State) ->
%% the only difference between purge and delete is that delete also
%% needs to delete everything that's been delivered and not ack'd.
delete(State) ->
- {PurgeCount, State1 = #vqstate { index_state = IndexState }} = purge(State),
+ {_PurgeCount, State1 = #vqstate { index_state = IndexState }} = purge(State),
case rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(IndexState)
of
{N, N} ->
- {PurgeCount, State1};
+ State1;
{GammaSeqId, NextSeqId} ->
- {DeleteCount, IndexState1} =
+ {_DeleteCount, IndexState1} =
delete1(NextSeqId, 0, GammaSeqId, IndexState),
- {PurgeCount + DeleteCount,
- State1 #vqstate { index_state = IndexState1 }}
+ State1 #vqstate { index_state = IndexState1 }
end.
%% [{Msg, AckTag}]
@@ -349,10 +348,11 @@ tx_commit(Pubs, AckTags, State) ->
[] ->
do_tx_commit(Pubs, AckTags, State);
PersistentMsgIds ->
+ Self = self(),
ok = rabbit_msg_store:sync(
PersistentMsgIds,
fun () -> ok = rabbit_amqqueue:tx_commit_callback(
- self(), Pubs, AckTags)
+ Self, Pubs, AckTags)
end),
State
end.