diff options
| -rw-r--r-- | src/rabbit_disk_queue.erl | 27 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 83 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 72 |
3 files changed, 130 insertions, 52 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index cc5099eb3b..8a018d969d 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -44,7 +44,7 @@ dump_queue/1, delete_non_durable_queues/1 ]). --export([length/1, is_empty/1]). +-export([length/1, is_empty/1, next_write_seq/1]). -export([stop/0, stop_and_obliterate/0, to_disk_only_mode/0, to_ram_disk_mode/0]). @@ -249,13 +249,14 @@ -spec(requeue_with_seqs/2 :: (queue_name(), [{{msg_id(), seq_id()}, seq_id_or_next()}]) -> 'ok'). -spec(purge/1 :: (queue_name()) -> non_neg_integer()). -spec(dump_queue/1 :: (queue_name()) -> [{msg_id(), binary(), non_neg_integer(), - bool(), seq_id()}]). + bool(), {msg_id(), seq_id()}, seq_id()}]). -spec(delete_non_durable_queues/1 :: (set()) -> 'ok'). -spec(stop/0 :: () -> 'ok'). -spec(stop_and_obliterate/0 :: () -> 'ok'). -spec(to_ram_disk_mode/0 :: () -> 'ok'). -spec(to_disk_only_mode/0 :: () -> 'ok'). -spec(length/1 :: (queue_name()) -> non_neg_integer()). +-spec(next_write_seq/1 :: (queue_name()) -> non_neg_integer()). -spec(is_empty/1 :: (queue_name()) -> bool()). -endif. @@ -327,6 +328,9 @@ to_ram_disk_mode() -> length(Q) -> gen_server2:call(?SERVER, {length, Q}, infinity). +next_write_seq(Q) -> + gen_server2:call(?SERVER, {next_write_seq, Q}, infinity). + is_empty(Q) -> Length = rabbit_disk_queue:length(Q), Length == 0. @@ -460,6 +464,9 @@ handle_call(to_ram_disk_mode, _From, handle_call({length, Q}, _From, State = #dqstate { sequences = Sequences }) -> {_ReadSeqId, _WriteSeqId, Length} = sequence_lookup(Sequences, Q), {reply, Length, State}; +handle_call({next_write_seq, Q}, _From, State = #dqstate { sequences = Sequences }) -> + {_ReadSeqId, WriteSeqId, _Length} = sequence_lookup(Sequences, Q), + {reply, WriteSeqId, State}; handle_call({dump_queue, Q}, _From, State) -> {Result, State1} = internal_dump_queue(Q, State), {reply, Result, State1}; @@ -483,7 +490,7 @@ handle_cast({tx_cancel, MsgIds}, State) -> {ok, State1} = internal_tx_cancel(MsgIds, State), {noreply, State1}; handle_cast({requeue, Q, MsgSeqIds}, State) -> - MsgSeqSeqIds = zip_with_tail(MsgSeqIds, {duplicate, next}), + MsgSeqSeqIds = zip_with_tail(MsgSeqIds, {duplicate, {next, true}}), {ok, State1} = internal_requeue(Q, MsgSeqSeqIds, State), {noreply, State1}; handle_cast({requeue_with_seqs, Q, MsgSeqSeqIds}, State) -> @@ -887,7 +894,7 @@ internal_tx_cancel(MsgIds, State) -> internal_requeue(_Q, [], State) -> {ok, State}; -internal_requeue(Q, MsgSeqIds = [{_, FirstSeqIdTo}|_], +internal_requeue(Q, MsgSeqIds = [{_, {FirstSeqIdTo, _}}|_], State = #dqstate { sequences = Sequences }) -> %% We know that every seq_id in here is less than the ReadSeqId %% you'll get if you look up this queue in Sequences (i.e. they've @@ -913,7 +920,7 @@ internal_requeue(Q, MsgSeqIds = [{_, FirstSeqIdTo}|_], {ReadSeqId, WriteSeqId, Length} = sequence_lookup(Sequences, Q), ReadSeqId2 = determine_next_read_id(ReadSeqId, WriteSeqId, FirstSeqIdTo), - MsgSeqIdsZipped = zip_with_tail(MsgSeqIds, {last, {next, next}}), + MsgSeqIdsZipped = zip_with_tail(MsgSeqIds, {last, {next, {next, true}}}), {atomic, {WriteSeqId2, Q}} = mnesia:transaction( fun() -> @@ -925,8 +932,8 @@ internal_requeue(Q, MsgSeqIds = [{_, FirstSeqIdTo}|_], Length + erlang:length(MsgSeqIds)}), {ok, State}. -requeue_message({{{MsgId, SeqIdOrig}, SeqIdTo}, - {_NextMsgSeqId, NextSeqIdTo}}, +requeue_message({{{MsgId, SeqIdOrig}, {SeqIdTo, NewIsDelivered}}, + {_NextMsgSeqId, {NextSeqIdTo, _NextNewIsDelivered}}}, {ExpectedSeqIdTo, Q}) -> SeqIdTo2 = adjust_last_msg_seq_id(Q, ExpectedSeqIdTo, SeqIdTo, write), NextSeqIdTo2 = find_next_seq_id(SeqIdTo2, NextSeqIdTo), @@ -937,7 +944,8 @@ requeue_message({{{MsgId, SeqIdOrig}, SeqIdTo}, true -> ok = mnesia:write(rabbit_disk_queue, Obj #dq_msg_loc {queue_and_seq_id = {Q, SeqIdTo2}, - next_seq_id = NextSeqIdTo2 + next_seq_id = NextSeqIdTo2, + is_delivered = NewIsDelivered }, write), ok = mnesia:delete(rabbit_disk_queue, {Q, SeqIdOrig}, write) @@ -1007,7 +1015,8 @@ internal_dump_queue(Q, State = #dqstate { sequences = Sequences }) -> NextReadSeqId, State2} = internal_read_message(Q, SeqId, true, true, State1), - {true, {MsgId, Msg, Size, Delivered, SeqId}, + {true, + {MsgId, Msg, Size, Delivered, {MsgId, SeqId}, SeqId}, {NextReadSeqId, State2}} end, {ReadSeq, State}), {lists:reverse(QList), State3} diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index dc180f00c4..c14aef5c10 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -49,61 +49,62 @@ } ). -start_link(Queue, IsDurable, Mode) when Mode =:= disk orelse Mode =:= mixed -> - QList = rabbit_disk_queue:dump_queue(Queue), - {MsgBuf, NextSeq} = - lists:foldl( - fun ({_MsgId, Msg, _Size, Delivered, SeqId}, {Buf, NSeq}) - when SeqId >= NSeq -> - {queue:in({SeqId, bin_to_msg(Msg), Delivered}, Buf), SeqId + 1} - end, {queue:new(), 0}, QList), - {ok, #mqstate { mode = Mode, msg_buf = MsgBuf, next_write_seq = NextSeq, - queue = Queue, is_durable = IsDurable }}. +start_link(Queue, IsDurable, disk) -> + NextSeq = rabbit_disk_queue:next_write_seq(Queue), + {ok, #mqstate { mode = disk, msg_buf = queue:new(), queue = Queue, + next_write_seq = NextSeq, is_durable = IsDurable }}; +start_link(Queue, IsDurable, mixed) -> + {ok, State} = start_link(Queue, IsDurable, disk), + to_mixed_mode(State #mqstate { next_write_seq = 0 }). to_disk_only_mode(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, - is_durable = IsDurable }) -> + is_durable = IsDurable, + next_write_seq = NextSeq }) -> Msgs = queue:to_list(MsgBuf), - AckTags = + {NextSeq1, Requeue} = lists:foldl( fun ({_Seq, Msg = #basic_message { guid = MsgId, - is_persistent = IsPersistent }, - IsDelivered}, AcksAcc) -> - ok = rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg)), + is_persistent = IsPersistent }, + IsDelivered}, {NSeq, RQueueAcc}) -> if IsDurable andalso IsPersistent -> - {MsgId, IsDelivered, AckTag, _PersistRemaining} - = rabbit_disk_queue:phantom_deliver(Q), - [AckTag | AcksAcc]; - true -> AcksAcc + {MsgId, IsDelivered, AckTag, _PersistRemaining} = + rabbit_disk_queue:phantom_deliver(Q), + {NSeq + 1, + [ {AckTag, {NSeq, IsDelivered}} | RQueueAcc ]}; + true -> + ok = if [] == RQueueAcc -> ok; + true -> + rabbit_disk_queue:requeue_with_seqs( + Q, lists:reverse(RQueueAcc)) + end, + ok = rabbit_disk_queue:publish_with_seq( + Q, MsgId, NSeq, msg_to_bin(Msg)), + {NSeq + 1, []} end - end, [], Msgs), - ok = rabbit_disk_queue:ack(Q, lists:reverse(AckTags)), - State #mqstate { mode = disk, msg_buf = queue:new() }. + end, {NextSeq, []}, Msgs), + ok = if [] == Requeue -> ok; + true -> + rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(Requeue)) + end, + {ok, State #mqstate { mode = disk, msg_buf = queue:new(), + next_write_seq = NextSeq1 }}. to_mixed_mode(State = #mqstate { mode = disk, msg_buf = MsgBuf, queue = Q, - is_durable = IsDurable, next_write_seq = NextSeq }) -> QList = rabbit_disk_queue:dump_queue(Q), - {MsgBuf1, NextSeq1, AckTags} = + {MsgBuf1, NextSeq1} = lists:foldl( - fun ({MsgId, MsgBin, _Size, IsDelivered, SeqId}, {Buf, NSeq, AcksAcc}) + fun ({MsgId, MsgBin, _Size, IsDelivered, _AckTag, SeqId}, {Buf, NSeq}) when SeqId >= NSeq -> - Msg = #basic_message { guid = MsgId, - is_persistent = IsPersistent } + Msg = #basic_message { guid = MsgId } = bin_to_msg(MsgBin), - Buf1 = queue:in({SeqId, Msg, IsDelivered}, Buf), + Buf1 = queue:in({SeqId, + Msg #basic_message { is_persistent = true }, + IsDelivered}, Buf), NSeq1 = SeqId + 1, - AcksAcc1 = - if IsDurable andalso IsPersistent -> - [AcksAcc]; - true -> - {MsgId, IsDelivered, AckTag, _PersistRemaining} = - rabbit_disk_queue:phantom_deliver(Q), - [AckTag | AcksAcc] - end, - {Buf1, NSeq1, AcksAcc1} - end, {MsgBuf, NextSeq, []}, QList), - ok = rabbit_disk_queue:ack(Q, lists:reverse(AckTags)), - State #mqstate { mode = mixed, msg_buf = MsgBuf1, next_write_seq = NextSeq1 }. + {Buf1, NSeq1} + end, {MsgBuf, NextSeq}, QList), + {ok, State #mqstate { mode = mixed, msg_buf = MsgBuf1, next_write_seq = NextSeq1 }}. msg_to_bin(Msg = #basic_message { content = Content }) -> ClearedContent = rabbit_binary_parser:clear_decoded_content(Content), @@ -250,7 +251,7 @@ requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q, Acc2 = if IsDurable andalso IsPersistent -> {MsgId, _OldSeqId} = AckTag, - [{AckTag, NextSeq3} | Acc]; + [{AckTag, {NextSeq3, true}} | Acc]; true -> Acc end, MsgBuf4 = queue:in({NextSeq3, Msg, true}, MsgBuf3), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 4b7487b0c0..3d173e2e45 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -910,7 +910,7 @@ rdq_test_dump_queue() -> [rabbit_disk_queue:tx_publish(N, Msg) || N <- All], rabbit_disk_queue:tx_commit(q, All, []), io:format("Publish done~n", []), - QList = [{N, Msg, 256, false, (N-1)} || N <- All], + QList = [{N, Msg, 256, false, {N, (N-1)}, (N-1)} || N <- All], QList = rabbit_disk_queue:dump_queue(q), rdq_stop(), io:format("dump ok undelivered~n", []), @@ -924,12 +924,80 @@ rdq_test_dump_queue() -> rdq_stop(), io:format("dump ok post delivery~n", []), rdq_start(), - QList2 = [{N, Msg, 256, true, (N-1)} || N <- All], + QList2 = [{N, Msg, 256, true, {N, (N-1)}, (N-1)} || N <- All], QList2 = rabbit_disk_queue:dump_queue(q), io:format("dump ok post delivery + restart~n", []), rdq_stop(), passed. +rdq_test_mixed_queue_modes() -> + rdq_virgin(), + rdq_start(), + Payload = <<0:(8*256)>>, + {ok, MS} = rabbit_mixed_queue:start_link(q, true, mixed), + MS2 = lists:foldl(fun (_N, MS1) -> + Msg = rabbit_basic:message(x, <<>>, <<>>, Payload), + {ok, MS1a} = rabbit_mixed_queue:publish(Msg, MS1), + MS1a + end, MS, lists:seq(1,10)), + MS4 = lists:foldl(fun (_N, MS3) -> + Msg = (rabbit_basic:message(x, <<>>, <<>>, Payload)) + #basic_message { is_persistent = true }, + {ok, MS3a} = rabbit_mixed_queue:publish(Msg, MS3), + MS3a + end, MS2, lists:seq(1,10)), + MS6 = lists:foldl(fun (_N, MS5) -> + Msg = rabbit_basic:message(x, <<>>, <<>>, Payload), + {ok, MS5a} = rabbit_mixed_queue:publish(Msg, MS5), + MS5a + end, MS4, lists:seq(1,10)), + 30 = rabbit_mixed_queue:length(MS6), + io:format("Published a mixture of messages~n"), + {ok, _MS7} = rabbit_mixed_queue:to_disk_only_mode(MS6), + io:format("Converted to disk only mode~n"), + rdq_stop(), + rdq_start(), + {ok, MS8} = rabbit_mixed_queue:start_link(q, true, mixed), + 30 = rabbit_mixed_queue:length(MS8), + io:format("Recovered queue~n"), + MS10 = + lists:foldl( + fun (N, MS9) -> + Rem = 30 - N, + {{#basic_message { is_persistent = true }, + false, _AckTag, Rem}, + MS9a} = rabbit_mixed_queue:deliver(MS9), + MS9a + end, MS8, lists:seq(1,10)), + io:format("Delivered initial non persistent messages~n"), + {ok, _MS11} = rabbit_mixed_queue:to_disk_only_mode(MS10), + io:format("Converted to disk only mode~n"), + rdq_stop(), + rdq_start(), + {ok, MS12} = rabbit_mixed_queue:start_link(q, true, mixed), + 30 = rabbit_mixed_queue:length(MS12), + io:format("Recovered queue~n"), + {MS14, AckTags} = + lists:foldl( + fun (N, {MS13, AcksAcc}) -> + Rem = 30 - N, + IsDelivered = N < 11, + {{#basic_message { is_persistent = true }, + IsDelivered, AckTag, Rem}, + MS13a} = rabbit_mixed_queue:deliver(MS13), + {MS13a, [AckTag | AcksAcc]} + end, {MS2, []}, lists:seq(1,20)), + {ok, MS15} = rabbit_mixed_queue:ack(AckTags, MS14), + io:format("Delivered and acked initial non persistent messages~n"), + {ok, _MS16} = rabbit_mixed_queue:to_disk_only_mode(MS15), + io:format("Converted to disk only mode~n"), + rdq_stop(), + rdq_start(), + {ok, MS17} = rabbit_mixed_queue:start_link(q, true, mixed), + 10 = rabbit_mixed_queue:length(MS17), + io:format("Recovered queue~n"), + passed. + rdq_time_commands(Funcs) -> lists:foreach(fun (F) -> F() end, Funcs). |
