summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Harrop <rharrop@vmware.com>2010-09-15 15:25:30 +0100
committerRob Harrop <rharrop@vmware.com>2010-09-15 15:25:30 +0100
commit2dfcd8bab4e0f9a37a05b373e6c673d68578598e (patch)
tree28f266feef8aa4990aefd329ba8eb7b2d3df46a7
parentccef72413241141ec7a20e67219cf6f1ee7c5391 (diff)
downloadrabbitmq-server-git-2dfcd8bab4e0f9a37a05b373e6c673d68578598e.tar.gz
midway through fixing test_queue_recover
-rw-r--r--src/rabbit_queue_index.erl10
-rw-r--r--src/rabbit_tests.erl29
-rw-r--r--src/rabbit_variable_queue.erl3
3 files changed, 35 insertions, 7 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index c631f7a26b..1b9cd38218 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -427,6 +427,7 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) ->
%% find the number of unacked messages.
lists:foldl(
fun (Seg, {Segments2, CountAcc}) ->
+ io:format("Recovering: ~p~n", [Seg]),
Segment = #segment { unacked = UnackedCount } =
recover_segment(ContainsCheckFun, CleanShutdown,
segment_find_or_new(Seg, Dir, Segments2)),
@@ -584,6 +585,7 @@ maybe_flush_journal(State) ->
State.
flush_journal(State = #qistate { segments = Segments }) ->
+ io:format("Flushing journal~n"),
Segments1 =
segment_fold(
fun (#segment { unacked = 0, path = Path }, SegmentsN) ->
@@ -709,7 +711,8 @@ all_segment_nums(#qistate { dir = Dir, segments = Segments }) ->
segment_find_or_new(Seg, Dir, Segments) ->
case segment_find(Seg, Segments) of
{ok, Segment} -> Segment;
- error -> SegName = integer_to_list(Seg) ++ ?SEGMENT_EXTENSION,
+ error -> io:format("New Seg~n"),
+ SegName = integer_to_list(Seg) ++ ?SEGMENT_EXTENSION,
Path = filename:join(Dir, SegName),
#segment { num = Seg,
path = Path,
@@ -805,8 +808,10 @@ segment_entries_foldr(Fun, Init,
%%
%% Does not do any combining with the journal at all.
load_segment(KeepAcked, #segment { path = Path }) ->
+ io:format("path: ~p~n", [Path]),
case filelib:is_file(Path) of
- false -> {array_new(), 0};
+ false -> io:format("Creating new~n"),
+ {array_new(), 0};
true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_MODE, []),
{ok, 0} = file_handle_cache:position(Hdl, bof),
Res = load_segment_entries(KeepAcked, Hdl, array_new(), 0),
@@ -815,6 +820,7 @@ load_segment(KeepAcked, #segment { path = Path }) ->
end.
load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) ->
+ io:format("Loading seg entries"),
case file_handle_cache:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES) of
{ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 6e9fd8c8cf..79792786b2 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1414,6 +1414,7 @@ test_backing_queue() ->
application:set_env(rabbit, msg_store_file_size_limit,
FileSizeLimit, infinity),
passed = test_queue_index(),
+ passed = test_queue_index_props(),
passed = test_variable_queue(),
passed = test_queue_recover(),
application:set_env(rabbit, queue_index_max_journal_entries,
@@ -1657,6 +1658,21 @@ verify_read_with_published(Delivered, Persistent,
verify_read_with_published(_Delivered, _Persistent, _Read, _Published) ->
ko.
+test_queue_index_props() ->
+ with_empty_test_queue(
+ fun(Qi0) ->
+ Guid = rabbit_guid:guid(),
+ Props = #msg_properties{expiry=12345},
+ Qi1 = rabbit_queue_index:publish(Guid, 1, Props, true, Qi0),
+ {[{Guid, 1, Props, _, _}], Qi2} = rabbit_queue_index:read(1, 2, Qi1),
+ Qi2
+ end),
+
+ ok = rabbit_variable_queue:stop(),
+ ok = rabbit_variable_queue:start([]),
+
+ passed.
+
test_queue_index() ->
SegmentSize = rabbit_queue_index:next_segment_boundary(0),
TwoSegs = SegmentSize + SegmentSize,
@@ -1798,7 +1814,7 @@ variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
lists:foldl(fun (N, {VQN, AckTagsAcc}) ->
Rem = Len - N,
{{#basic_message { is_persistent = IsPersistent },
- IsDelivered, AckTagN, Rem}, VQM} =
+ _Props, IsDelivered, AckTagN, Rem}, VQM} =
rabbit_variable_queue:fetch(true, VQN),
{VQM, [AckTagN | AckTagsAcc]}
end, {VQ, []}, lists:seq(1, Count)).
@@ -1838,6 +1854,7 @@ test_variable_queue_dynamic_duration_change(VQ0) ->
%% squeeze and relax queue
Churn = Len div 32,
VQ2 = publish_fetch_and_ack(Churn, Len, VQ1),
+
{Duration, VQ3} = rabbit_variable_queue:ram_duration(VQ2),
VQ7 = lists:foldl(
fun (Duration1, VQ4) ->
@@ -1860,7 +1877,7 @@ publish_fetch_and_ack(0, _Len, VQ0) ->
VQ0;
publish_fetch_and_ack(N, Len, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
- {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
+ {{_Msg, _MsgProps, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
publish_fetch_and_ack(N-1, Len, rabbit_variable_queue:ack([AckTag], VQ2)).
test_variable_queue_partial_segments_delta_thing(VQ0) ->
@@ -1924,7 +1941,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
Count, VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
VQ7 = rabbit_variable_queue:init(test_queue(), true, true),
- {{_Msg1, true, _AckTag1, Count1}, VQ8} =
+ {{_Msg1, _Props, true, _AckTag1, Count1}, VQ8} =
rabbit_variable_queue:fetch(true, VQ7),
VQ9 = variable_queue_publish(false, 1, VQ8),
VQ10 = rabbit_variable_queue:set_ram_duration_target(0, VQ9),
@@ -1936,7 +1953,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0),
VQ2 = variable_queue_publish(false, 4, VQ1),
{VQ3, AckTags} = variable_queue_fetch(2, false, false, 4, VQ2),
- VQ4 = rabbit_variable_queue:requeue(AckTags, VQ3),
+ VQ4 = rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3),
VQ5 = rabbit_variable_queue:idle_timeout(VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
VQ7 = rabbit_variable_queue:init(test_queue(), true, true),
@@ -1954,6 +1971,7 @@ test_queue_recover() ->
sender = self(), message = Msg},
[true = rabbit_amqqueue:deliver(QPid, Delivery) ||
_ <- lists:seq(1, Count)],
+ io:format("Calling commit~n"),
rabbit_amqqueue:commit_all([QPid], TxID, self()),
exit(QPid, kill),
MRef = erlang:monitor(process, QPid),
@@ -1961,6 +1979,7 @@ test_queue_recover() ->
after 10000 -> exit(timeout_waiting_for_queue_death)
end,
rabbit_amqqueue:stop(),
+ io:format("Restarting queue~n"),
ok = rabbit_amqqueue:start(),
rabbit_amqqueue:with_or_die(
QName,
@@ -1970,7 +1989,7 @@ test_queue_recover() ->
rabbit_amqqueue:basic_get(Q1, self(), false),
exit(QPid1, shutdown),
VQ1 = rabbit_variable_queue:init(QName, true, true),
- {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
+ {{_Msg1, _Props, true, _AckTag1, CountMinusOne}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
_VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2),
rabbit_amqqueue:internal_delete(QName)
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index aee8d47b36..c8d3c6e4e8 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -370,6 +370,7 @@ stop_msg_store() ->
ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE).
init(QueueName, IsDurable, Recover) ->
+ io:format("Initing ~p ~p~n", [QueueName, Recover]),
{DeltaCount, Terms, IndexState} =
rabbit_queue_index:init(
QueueName, Recover,
@@ -377,6 +378,7 @@ init(QueueName, IsDurable, Recover) ->
fun (Guid) ->
rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid)
end),
+ io:format("Inited: ~p~n", [DeltaCount]),
{LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
{PRef, TRef, Terms1} =
@@ -956,6 +958,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
PAcks = lists:append(SPAcks),
Acks = lists:append(SAcks),
Pubs = lists:append(lists:reverse(SPubs)),
+ io:format("Committing: ~p~n", [length(Pubs)]),
{SeqIds, State1 = #vqstate { index_state = IndexState }} =
lists:foldl(
fun ({Msg = #basic_message { is_persistent = IsPersistent }, MsgProperties},