summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_disk_queue.erl27
-rw-r--r--src/rabbit_mixed_queue.erl83
-rw-r--r--src/rabbit_tests.erl72
3 files changed, 130 insertions, 52 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index cc5099eb3b..8a018d969d 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -44,7 +44,7 @@
dump_queue/1, delete_non_durable_queues/1
]).
--export([length/1, is_empty/1]).
+-export([length/1, is_empty/1, next_write_seq/1]).
-export([stop/0, stop_and_obliterate/0, to_disk_only_mode/0, to_ram_disk_mode/0]).
@@ -249,13 +249,14 @@
-spec(requeue_with_seqs/2 :: (queue_name(), [{{msg_id(), seq_id()}, seq_id_or_next()}]) -> 'ok').
-spec(purge/1 :: (queue_name()) -> non_neg_integer()).
-spec(dump_queue/1 :: (queue_name()) -> [{msg_id(), binary(), non_neg_integer(),
- bool(), seq_id()}]).
+ bool(), {msg_id(), seq_id()}, seq_id()}]).
-spec(delete_non_durable_queues/1 :: (set()) -> 'ok').
-spec(stop/0 :: () -> 'ok').
-spec(stop_and_obliterate/0 :: () -> 'ok').
-spec(to_ram_disk_mode/0 :: () -> 'ok').
-spec(to_disk_only_mode/0 :: () -> 'ok').
-spec(length/1 :: (queue_name()) -> non_neg_integer()).
+-spec(next_write_seq/1 :: (queue_name()) -> non_neg_integer()).
-spec(is_empty/1 :: (queue_name()) -> bool()).
-endif.
@@ -327,6 +328,9 @@ to_ram_disk_mode() ->
length(Q) ->
gen_server2:call(?SERVER, {length, Q}, infinity).
+next_write_seq(Q) ->
+ gen_server2:call(?SERVER, {next_write_seq, Q}, infinity).
+
is_empty(Q) ->
Length = rabbit_disk_queue:length(Q),
Length == 0.
@@ -460,6 +464,9 @@ handle_call(to_ram_disk_mode, _From,
handle_call({length, Q}, _From, State = #dqstate { sequences = Sequences }) ->
{_ReadSeqId, _WriteSeqId, Length} = sequence_lookup(Sequences, Q),
{reply, Length, State};
+handle_call({next_write_seq, Q}, _From, State = #dqstate { sequences = Sequences }) ->
+ {_ReadSeqId, WriteSeqId, _Length} = sequence_lookup(Sequences, Q),
+ {reply, WriteSeqId, State};
handle_call({dump_queue, Q}, _From, State) ->
{Result, State1} = internal_dump_queue(Q, State),
{reply, Result, State1};
@@ -483,7 +490,7 @@ handle_cast({tx_cancel, MsgIds}, State) ->
{ok, State1} = internal_tx_cancel(MsgIds, State),
{noreply, State1};
handle_cast({requeue, Q, MsgSeqIds}, State) ->
- MsgSeqSeqIds = zip_with_tail(MsgSeqIds, {duplicate, next}),
+ MsgSeqSeqIds = zip_with_tail(MsgSeqIds, {duplicate, {next, true}}),
{ok, State1} = internal_requeue(Q, MsgSeqSeqIds, State),
{noreply, State1};
handle_cast({requeue_with_seqs, Q, MsgSeqSeqIds}, State) ->
@@ -887,7 +894,7 @@ internal_tx_cancel(MsgIds, State) ->
internal_requeue(_Q, [], State) ->
{ok, State};
-internal_requeue(Q, MsgSeqIds = [{_, FirstSeqIdTo}|_],
+internal_requeue(Q, MsgSeqIds = [{_, {FirstSeqIdTo, _}}|_],
State = #dqstate { sequences = Sequences }) ->
%% We know that every seq_id in here is less than the ReadSeqId
%% you'll get if you look up this queue in Sequences (i.e. they've
@@ -913,7 +920,7 @@ internal_requeue(Q, MsgSeqIds = [{_, FirstSeqIdTo}|_],
{ReadSeqId, WriteSeqId, Length} = sequence_lookup(Sequences, Q),
ReadSeqId2 = determine_next_read_id(ReadSeqId, WriteSeqId, FirstSeqIdTo),
- MsgSeqIdsZipped = zip_with_tail(MsgSeqIds, {last, {next, next}}),
+ MsgSeqIdsZipped = zip_with_tail(MsgSeqIds, {last, {next, {next, true}}}),
{atomic, {WriteSeqId2, Q}} =
mnesia:transaction(
fun() ->
@@ -925,8 +932,8 @@ internal_requeue(Q, MsgSeqIds = [{_, FirstSeqIdTo}|_],
Length + erlang:length(MsgSeqIds)}),
{ok, State}.
-requeue_message({{{MsgId, SeqIdOrig}, SeqIdTo},
- {_NextMsgSeqId, NextSeqIdTo}},
+requeue_message({{{MsgId, SeqIdOrig}, {SeqIdTo, NewIsDelivered}},
+ {_NextMsgSeqId, {NextSeqIdTo, _NextNewIsDelivered}}},
{ExpectedSeqIdTo, Q}) ->
SeqIdTo2 = adjust_last_msg_seq_id(Q, ExpectedSeqIdTo, SeqIdTo, write),
NextSeqIdTo2 = find_next_seq_id(SeqIdTo2, NextSeqIdTo),
@@ -937,7 +944,8 @@ requeue_message({{{MsgId, SeqIdOrig}, SeqIdTo},
true ->
ok = mnesia:write(rabbit_disk_queue,
Obj #dq_msg_loc {queue_and_seq_id = {Q, SeqIdTo2},
- next_seq_id = NextSeqIdTo2
+ next_seq_id = NextSeqIdTo2,
+ is_delivered = NewIsDelivered
},
write),
ok = mnesia:delete(rabbit_disk_queue, {Q, SeqIdOrig}, write)
@@ -1007,7 +1015,8 @@ internal_dump_queue(Q, State = #dqstate { sequences = Sequences }) ->
NextReadSeqId, State2} =
internal_read_message(Q, SeqId, true, true,
State1),
- {true, {MsgId, Msg, Size, Delivered, SeqId},
+ {true,
+ {MsgId, Msg, Size, Delivered, {MsgId, SeqId}, SeqId},
{NextReadSeqId, State2}}
end, {ReadSeq, State}),
{lists:reverse(QList), State3}
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index dc180f00c4..c14aef5c10 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -49,61 +49,62 @@
}
).
-start_link(Queue, IsDurable, Mode) when Mode =:= disk orelse Mode =:= mixed ->
- QList = rabbit_disk_queue:dump_queue(Queue),
- {MsgBuf, NextSeq} =
- lists:foldl(
- fun ({_MsgId, Msg, _Size, Delivered, SeqId}, {Buf, NSeq})
- when SeqId >= NSeq ->
- {queue:in({SeqId, bin_to_msg(Msg), Delivered}, Buf), SeqId + 1}
- end, {queue:new(), 0}, QList),
- {ok, #mqstate { mode = Mode, msg_buf = MsgBuf, next_write_seq = NextSeq,
- queue = Queue, is_durable = IsDurable }}.
+start_link(Queue, IsDurable, disk) ->
+ NextSeq = rabbit_disk_queue:next_write_seq(Queue),
+ {ok, #mqstate { mode = disk, msg_buf = queue:new(), queue = Queue,
+ next_write_seq = NextSeq, is_durable = IsDurable }};
+start_link(Queue, IsDurable, mixed) ->
+ {ok, State} = start_link(Queue, IsDurable, disk),
+ to_mixed_mode(State #mqstate { next_write_seq = 0 }).
to_disk_only_mode(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf,
- is_durable = IsDurable }) ->
+ is_durable = IsDurable,
+ next_write_seq = NextSeq }) ->
Msgs = queue:to_list(MsgBuf),
- AckTags =
+ {NextSeq1, Requeue} =
lists:foldl(
fun ({_Seq, Msg = #basic_message { guid = MsgId,
- is_persistent = IsPersistent },
- IsDelivered}, AcksAcc) ->
- ok = rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg)),
+ is_persistent = IsPersistent },
+ IsDelivered}, {NSeq, RQueueAcc}) ->
if IsDurable andalso IsPersistent ->
- {MsgId, IsDelivered, AckTag, _PersistRemaining}
- = rabbit_disk_queue:phantom_deliver(Q),
- [AckTag | AcksAcc];
- true -> AcksAcc
+ {MsgId, IsDelivered, AckTag, _PersistRemaining} =
+ rabbit_disk_queue:phantom_deliver(Q),
+ {NSeq + 1,
+ [ {AckTag, {NSeq, IsDelivered}} | RQueueAcc ]};
+ true ->
+ ok = if [] == RQueueAcc -> ok;
+ true ->
+ rabbit_disk_queue:requeue_with_seqs(
+ Q, lists:reverse(RQueueAcc))
+ end,
+ ok = rabbit_disk_queue:publish_with_seq(
+ Q, MsgId, NSeq, msg_to_bin(Msg)),
+ {NSeq + 1, []}
end
- end, [], Msgs),
- ok = rabbit_disk_queue:ack(Q, lists:reverse(AckTags)),
- State #mqstate { mode = disk, msg_buf = queue:new() }.
+ end, {NextSeq, []}, Msgs),
+ ok = if [] == Requeue -> ok;
+ true ->
+ rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(Requeue))
+ end,
+ {ok, State #mqstate { mode = disk, msg_buf = queue:new(),
+ next_write_seq = NextSeq1 }}.
to_mixed_mode(State = #mqstate { mode = disk, msg_buf = MsgBuf, queue = Q,
- is_durable = IsDurable,
next_write_seq = NextSeq }) ->
QList = rabbit_disk_queue:dump_queue(Q),
- {MsgBuf1, NextSeq1, AckTags} =
+ {MsgBuf1, NextSeq1} =
lists:foldl(
- fun ({MsgId, MsgBin, _Size, IsDelivered, SeqId}, {Buf, NSeq, AcksAcc})
+ fun ({MsgId, MsgBin, _Size, IsDelivered, _AckTag, SeqId}, {Buf, NSeq})
when SeqId >= NSeq ->
- Msg = #basic_message { guid = MsgId,
- is_persistent = IsPersistent }
+ Msg = #basic_message { guid = MsgId }
= bin_to_msg(MsgBin),
- Buf1 = queue:in({SeqId, Msg, IsDelivered}, Buf),
+ Buf1 = queue:in({SeqId,
+ Msg #basic_message { is_persistent = true },
+ IsDelivered}, Buf),
NSeq1 = SeqId + 1,
- AcksAcc1 =
- if IsDurable andalso IsPersistent ->
- [AcksAcc];
- true ->
- {MsgId, IsDelivered, AckTag, _PersistRemaining} =
- rabbit_disk_queue:phantom_deliver(Q),
- [AckTag | AcksAcc]
- end,
- {Buf1, NSeq1, AcksAcc1}
- end, {MsgBuf, NextSeq, []}, QList),
- ok = rabbit_disk_queue:ack(Q, lists:reverse(AckTags)),
- State #mqstate { mode = mixed, msg_buf = MsgBuf1, next_write_seq = NextSeq1 }.
+ {Buf1, NSeq1}
+ end, {MsgBuf, NextSeq}, QList),
+ {ok, State #mqstate { mode = mixed, msg_buf = MsgBuf1, next_write_seq = NextSeq1 }}.
msg_to_bin(Msg = #basic_message { content = Content }) ->
ClearedContent = rabbit_binary_parser:clear_decoded_content(Content),
@@ -250,7 +251,7 @@ requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q,
Acc2 =
if IsDurable andalso IsPersistent ->
{MsgId, _OldSeqId} = AckTag,
- [{AckTag, NextSeq3} | Acc];
+ [{AckTag, {NextSeq3, true}} | Acc];
true -> Acc
end,
MsgBuf4 = queue:in({NextSeq3, Msg, true}, MsgBuf3),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 4b7487b0c0..3d173e2e45 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -910,7 +910,7 @@ rdq_test_dump_queue() ->
[rabbit_disk_queue:tx_publish(N, Msg) || N <- All],
rabbit_disk_queue:tx_commit(q, All, []),
io:format("Publish done~n", []),
- QList = [{N, Msg, 256, false, (N-1)} || N <- All],
+ QList = [{N, Msg, 256, false, {N, (N-1)}, (N-1)} || N <- All],
QList = rabbit_disk_queue:dump_queue(q),
rdq_stop(),
io:format("dump ok undelivered~n", []),
@@ -924,12 +924,80 @@ rdq_test_dump_queue() ->
rdq_stop(),
io:format("dump ok post delivery~n", []),
rdq_start(),
- QList2 = [{N, Msg, 256, true, (N-1)} || N <- All],
+ QList2 = [{N, Msg, 256, true, {N, (N-1)}, (N-1)} || N <- All],
QList2 = rabbit_disk_queue:dump_queue(q),
io:format("dump ok post delivery + restart~n", []),
rdq_stop(),
passed.
+rdq_test_mixed_queue_modes() ->
+ rdq_virgin(),
+ rdq_start(),
+ Payload = <<0:(8*256)>>,
+ {ok, MS} = rabbit_mixed_queue:start_link(q, true, mixed),
+ MS2 = lists:foldl(fun (_N, MS1) ->
+ Msg = rabbit_basic:message(x, <<>>, <<>>, Payload),
+ {ok, MS1a} = rabbit_mixed_queue:publish(Msg, MS1),
+ MS1a
+ end, MS, lists:seq(1,10)),
+ MS4 = lists:foldl(fun (_N, MS3) ->
+ Msg = (rabbit_basic:message(x, <<>>, <<>>, Payload))
+ #basic_message { is_persistent = true },
+ {ok, MS3a} = rabbit_mixed_queue:publish(Msg, MS3),
+ MS3a
+ end, MS2, lists:seq(1,10)),
+ MS6 = lists:foldl(fun (_N, MS5) ->
+ Msg = rabbit_basic:message(x, <<>>, <<>>, Payload),
+ {ok, MS5a} = rabbit_mixed_queue:publish(Msg, MS5),
+ MS5a
+ end, MS4, lists:seq(1,10)),
+ 30 = rabbit_mixed_queue:length(MS6),
+ io:format("Published a mixture of messages~n"),
+ {ok, _MS7} = rabbit_mixed_queue:to_disk_only_mode(MS6),
+ io:format("Converted to disk only mode~n"),
+ rdq_stop(),
+ rdq_start(),
+ {ok, MS8} = rabbit_mixed_queue:start_link(q, true, mixed),
+ 30 = rabbit_mixed_queue:length(MS8),
+ io:format("Recovered queue~n"),
+ MS10 =
+ lists:foldl(
+ fun (N, MS9) ->
+ Rem = 30 - N,
+ {{#basic_message { is_persistent = true },
+ false, _AckTag, Rem},
+ MS9a} = rabbit_mixed_queue:deliver(MS9),
+ MS9a
+ end, MS8, lists:seq(1,10)),
+ io:format("Delivered initial non persistent messages~n"),
+ {ok, _MS11} = rabbit_mixed_queue:to_disk_only_mode(MS10),
+ io:format("Converted to disk only mode~n"),
+ rdq_stop(),
+ rdq_start(),
+ {ok, MS12} = rabbit_mixed_queue:start_link(q, true, mixed),
+ 30 = rabbit_mixed_queue:length(MS12),
+ io:format("Recovered queue~n"),
+ {MS14, AckTags} =
+ lists:foldl(
+ fun (N, {MS13, AcksAcc}) ->
+ Rem = 30 - N,
+ IsDelivered = N < 11,
+ {{#basic_message { is_persistent = true },
+ IsDelivered, AckTag, Rem},
+ MS13a} = rabbit_mixed_queue:deliver(MS13),
+ {MS13a, [AckTag | AcksAcc]}
+ end, {MS2, []}, lists:seq(1,20)),
+ {ok, MS15} = rabbit_mixed_queue:ack(AckTags, MS14),
+ io:format("Delivered and acked initial non persistent messages~n"),
+ {ok, _MS16} = rabbit_mixed_queue:to_disk_only_mode(MS15),
+ io:format("Converted to disk only mode~n"),
+ rdq_stop(),
+ rdq_start(),
+ {ok, MS17} = rabbit_mixed_queue:start_link(q, true, mixed),
+ 10 = rabbit_mixed_queue:length(MS17),
+ io:format("Recovered queue~n"),
+ passed.
+
rdq_time_commands(Funcs) ->
lists:foreach(fun (F) -> F() end, Funcs).