diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-06-22 12:04:47 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-06-22 12:04:47 +0100 |
| commit | 80572e52c3423b58ad4509564a64f9cf29a8035e (patch) | |
| tree | 4dd9cb226e9978b2a45d5edd1f7522e4cc6056e4 | |
| parent | be7cce80e3b9f47c7b326bfb97864de53e6cf4c2 (diff) | |
| parent | ce5ae4dc0d7857eb8748b847c90724bfb7ac7abb (diff) | |
| download | rabbitmq-server-git-80572e52c3423b58ad4509564a64f9cf29a8035e.tar.gz | |
merging in from bug19662
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_basic.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_disk_queue.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 45 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 100 |
5 files changed, 154 insertions, 22 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 19f6f308cd..b19ff7a014 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -820,10 +820,13 @@ handle_cast({limit, ChPid, LimiterPid}, State) -> end)); handle_cast({constrain, Constrain}, State = #q { mixed_state = MS }) -> + PendingMessages = + lists:flatten([Pending || #tx { pending_messages = Pending} + <- all_tx_record()]), {ok, MS1} = (case Constrain of - true -> fun rabbit_mixed_queue:to_disk_only_mode/1; - false -> fun rabbit_mixed_queue:to_mixed_mode/1 - end)(MS), + true -> fun rabbit_mixed_queue:to_disk_only_mode/2; + false -> fun rabbit_mixed_queue:to_mixed_mode/2 + end)(PendingMessages, MS), noreply(State #q { mixed_state = MS1 }). handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index f9a8f488af..63d6a4815d 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -33,7 +33,7 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([publish/1, message/4, message/5, delivery/4]). +-export([publish/1, message/4, message/5, message/6, delivery/4]). %%---------------------------------------------------------------------------- @@ -44,8 +44,10 @@ -spec(delivery/4 :: (bool(), bool(), maybe(txn()), message()) -> delivery()). -spec(message/4 :: (exchange_name(), routing_key(), binary(), binary()) -> message()). --spec(message/5 :: (exchange_name(), routing_key(), binary(), binary(), guid()) -> - message()). +-spec(message/5 :: (exchange_name(), routing_key(), binary(), binary(), + guid()) -> message()). +-spec(message/6 :: (exchange_name(), routing_key(), binary(), binary(), + guid(), bool()) -> message()). -endif. @@ -69,6 +71,9 @@ message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin) -> message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin, rabbit_guid:guid()). message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin, MsgId) -> + message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin, MsgId, false). + +message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin, MsgId, IsPersistent) -> {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'), Content = #content{class_id = ClassId, properties = #'P_basic'{content_type = ContentTypeBin}, @@ -78,4 +83,4 @@ message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin, MsgId) -> routing_key = RoutingKeyBin, content = Content, guid = MsgId, - is_persistent = false}. + is_persistent = IsPersistent}. diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index adf5895168..bf2de56529 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -1055,11 +1055,11 @@ internal_requeue(Q, MsgSeqIds = [{_, {FirstSeqIdTo, _}}|_], {ReadSeqId, WriteSeqId, Length} = sequence_lookup(Sequences, Q), ReadSeqId1 = determine_next_read_id(ReadSeqId, WriteSeqId, FirstSeqIdTo), MsgSeqIdsZipped = zip_with_tail(MsgSeqIds, {last, {next, {next, true}}}), - {atomic, {WriteSeqId1, Q}} = + {atomic, {WriteSeqId1, Q, State}} = mnesia:transaction( fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue), - lists:foldl(fun requeue_message/2, {WriteSeqId, Q}, + lists:foldl(fun requeue_message/2, {WriteSeqId, Q, State}, MsgSeqIdsZipped) end), true = ets:insert(Sequences, {Q, ReadSeqId1, WriteSeqId1, @@ -1068,7 +1068,7 @@ internal_requeue(Q, MsgSeqIds = [{_, {FirstSeqIdTo, _}}|_], requeue_message({{{MsgId, SeqIdOrig}, {SeqIdTo, NewIsDelivered}}, {_NextMsgSeqId, {NextSeqIdTo, _NextNewIsDelivered}}}, - {ExpectedSeqIdTo, Q}) -> + {ExpectedSeqIdTo, Q, State}) -> SeqIdTo1 = adjust_last_msg_seq_id(Q, ExpectedSeqIdTo, SeqIdTo, write), NextSeqIdTo1 = find_next_seq_id(SeqIdTo1, NextSeqIdTo), [Obj = #dq_msg_loc { is_delivered = true, msg_id = MsgId, @@ -1084,7 +1084,8 @@ requeue_message({{{MsgId, SeqIdOrig}, {SeqIdTo, NewIsDelivered}}, write), ok = mnesia:delete(rabbit_disk_queue, {Q, SeqIdOrig}, write) end, - {NextSeqIdTo1, Q}. + decrement_cache(MsgId, State), + {NextSeqIdTo1, Q, State}. internal_purge(Q, State = #dqstate { sequences = Sequences }) -> case ets:lookup(Sequences, Q) of diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index db27a3e3a7..e0f9d2f226 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -39,7 +39,7 @@ tx_publish/2, tx_commit/3, tx_cancel/2, requeue/2, purge/1, length/1, is_empty/1, delete_queue/1]). --export([to_disk_only_mode/1, to_mixed_mode/1, estimate_extra_memory/1]). +-export([to_disk_only_mode/2, to_mixed_mode/2, estimate_extra_memory/1]). -record(mqstate, { mode, msg_buf, @@ -81,6 +81,10 @@ -spec(length/1 :: (mqstate()) -> non_neg_integer()). -spec(is_empty/1 :: (mqstate()) -> bool()). + +-spec(to_disk_only_mode/2 :: ([message()], mqstate()) -> okmqs()). +-spec(to_mixed_mode/2 :: ([message()], mqstate()) -> okmqs()). + -spec(estimate_extra_memory/1 :: (mqstate()) -> non_neg_integer). -endif. @@ -91,7 +95,7 @@ init(Queue, IsDurable, disk) -> is_durable = IsDurable, length = 0, memory_size = 0 }); init(Queue, IsDurable, mixed) -> {ok, State} = init(Queue, IsDurable, disk), - to_mixed_mode(State). + to_mixed_mode([], State). size_of_message( #basic_message { content = #content { payload_fragments_rev = Payload }}) -> @@ -99,10 +103,11 @@ size_of_message( SumAcc + size(Frag) end, 0, Payload). -to_disk_only_mode(State = #mqstate { mode = disk }) -> +to_disk_only_mode(_TxnMessages, State = #mqstate { mode = disk }) -> {ok, State}; -to_disk_only_mode(State = - #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf }) -> +to_disk_only_mode(TxnMessages, State = + #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, + is_durable = IsDurable }) -> rabbit_log:info("Converting queue to disk only mode: ~p~n", [Q]), %% We enqueue _everything_ here. This means that should a message %% already be in the disk queue we must remove it and add it back @@ -137,12 +142,25 @@ to_disk_only_mode(State = true -> rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(Requeue)) end, + %% tx_publish txn messages. Some of these will have been already + %% published if they really are durable and persistent which is + %% why we can't just use our own tx_publish/2 function (would end + %% up publishing twice, so refcount would go wrong in disk_queue). + lists:foreach( + fun (Msg = #basic_message { is_persistent = IsPersistent }) -> + ok = case IsDurable andalso IsPersistent of + true -> ok; + _ -> rabbit_disk_queue:tx_publish(Msg) + end + end, TxnMessages), {ok, State #mqstate { mode = disk, msg_buf = queue:new(), memory_size = Size }}. -to_mixed_mode(State = #mqstate { mode = mixed }) -> +to_mixed_mode(_TxnMessages, State = #mqstate { mode = mixed }) -> {ok, State}; -to_mixed_mode(State = #mqstate { mode = disk, queue = Q, length = Length }) -> +to_mixed_mode(TxnMessages, State = + #mqstate { mode = disk, queue = Q, length = Length, + is_durable = IsDurable }) -> rabbit_log:info("Converting queue to mixed mode: ~p~n", [Q]), %% load up a new queue with everything that's on disk. %% don't remove non-persistent messages that happen to be on disk @@ -153,6 +171,19 @@ to_mixed_mode(State = #mqstate { mode = disk, queue = Q, length = Length }) -> {Buf, L}) -> {queue:in({Msg, IsDelivered, true}, Buf), L+1} end, {queue:new(), 0}, QList), + %% remove txn messages from disk which are neither persistent and + %% durable. This is necessary to avoid leaks. This is also pretty + %% much the inverse behaviour of our own tx_cancel/2 which is why + %% we're not using it. + Cancel = + lists:foldl( + fun (Msg = #basic_message { is_persistent = IsPersistent }, Acc) -> + case IsDurable andalso IsPersistent of + true -> Acc; + _ -> [Msg #basic_message.guid | Acc] + end + end, [], TxnMessages), + ok = rabbit_disk_queue:tx_cancel(lists:reverse(Cancel)), {ok, State #mqstate { mode = mixed, msg_buf = MsgBuf1, memory_size = 0 }}. purge_non_persistent_messages(State = #mqstate { mode = disk, queue = Q, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 2defca6446..7d74968b9c 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -696,6 +696,7 @@ test_disk_queue() -> passed = rdq_test_purge(), passed = rdq_test_dump_queue(), passed = rdq_test_mixed_queue_modes(), + passed = rdq_test_mode_conversion_mid_txn(), rdq_virgin(), ok = control_action(stop_app, []), ok = control_action(start_app, []), @@ -1003,11 +1004,11 @@ rdq_test_mixed_queue_modes() -> 30 = rabbit_mixed_queue:length(MS6), io:format("Published a mixture of messages; ~w~n", [rabbit_mixed_queue:estimate_extra_memory(MS6)]), - {ok, MS7} = rabbit_mixed_queue:to_disk_only_mode(MS6), + {ok, MS7} = rabbit_mixed_queue:to_disk_only_mode([], MS6), 30 = rabbit_mixed_queue:length(MS7), io:format("Converted to disk only mode; ~w~n", [rabbit_mixed_queue:estimate_extra_memory(MS7)]), - {ok, MS8} = rabbit_mixed_queue:to_mixed_mode(MS7), + {ok, MS8} = rabbit_mixed_queue:to_mixed_mode([], MS7), 30 = rabbit_mixed_queue:length(MS8), io:format("Converted to mixed mode; ~w~n", [rabbit_mixed_queue:estimate_extra_memory(MS8)]), @@ -1022,7 +1023,7 @@ rdq_test_mixed_queue_modes() -> end, MS8, lists:seq(1,10)), 20 = rabbit_mixed_queue:length(MS10), io:format("Delivered initial non persistent messages~n"), - {ok, MS11} = rabbit_mixed_queue:to_disk_only_mode(MS10), + {ok, MS11} = rabbit_mixed_queue:to_disk_only_mode([], MS10), 20 = rabbit_mixed_queue:length(MS11), io:format("Converted to disk only mode~n"), rdq_stop(), @@ -1042,7 +1043,7 @@ rdq_test_mixed_queue_modes() -> 0 = rabbit_mixed_queue:length(MS14), {ok, MS15} = rabbit_mixed_queue:ack(AckTags, MS14), io:format("Delivered and acked all messages~n"), - {ok, MS16} = rabbit_mixed_queue:to_disk_only_mode(MS15), + {ok, MS16} = rabbit_mixed_queue:to_disk_only_mode([], MS15), 0 = rabbit_mixed_queue:length(MS16), io:format("Converted to disk only mode~n"), rdq_stop(), @@ -1054,6 +1055,92 @@ rdq_test_mixed_queue_modes() -> rdq_stop(), passed. +rdq_test_mode_conversion_mid_txn() -> + Payload = <<0:(8*256)>>, + MsgIdsA = lists:seq(0,9), + MsgsA = [ rabbit_basic:message(x, <<>>, <<>>, Payload, MsgId, + (0 == MsgId rem 2)) + || MsgId <- MsgIdsA ], + MsgIdsB = lists:seq(10,20), + MsgsB = [ rabbit_basic:message(x, <<>>, <<>>, Payload, MsgId, + (0 == MsgId rem 2)) + || MsgId <- MsgIdsB ], + + rdq_virgin(), + rdq_start(), + {ok, MS0} = rabbit_mixed_queue:init(q, true, mixed), + passed = rdq_tx_publish_mixed_alter_commit_get( + MS0, MsgsA, MsgsB, fun rabbit_mixed_queue:to_disk_only_mode/2, commit), + + rdq_stop_virgin_start(), + {ok, MS1} = rabbit_mixed_queue:init(q, true, mixed), + passed = rdq_tx_publish_mixed_alter_commit_get( + MS1, MsgsA, MsgsB, fun rabbit_mixed_queue:to_disk_only_mode/2, cancel), + + + rdq_stop_virgin_start(), + {ok, MS2} = rabbit_mixed_queue:init(q, true, disk), + passed = rdq_tx_publish_mixed_alter_commit_get( + MS2, MsgsA, MsgsB, fun rabbit_mixed_queue:to_mixed_mode/2, commit), + + rdq_stop_virgin_start(), + {ok, MS3} = rabbit_mixed_queue:init(q, true, disk), + passed = rdq_tx_publish_mixed_alter_commit_get( + MS3, MsgsA, MsgsB, fun rabbit_mixed_queue:to_mixed_mode/2, cancel), + + rdq_stop(), + passed. + +rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, ChangeFun, CommitOrCancel) -> + 0 = rabbit_mixed_queue:length(MS0), + MS2 = lists:foldl( + fun (Msg, MS1) -> + {ok, MS1a} = rabbit_mixed_queue:publish(Msg, MS1), + MS1a + end, MS0, MsgsA), + Len0 = length(MsgsA), + Len0 = rabbit_mixed_queue:length(MS2), + MS4 = lists:foldl( + fun (Msg, MS3) -> + {ok, MS3a} = rabbit_mixed_queue:tx_publish(Msg, MS3), + MS3a + end, MS2, MsgsB), + Len0 = rabbit_mixed_queue:length(MS4), + {ok, MS5} = ChangeFun(MsgsB, MS4), + Len0 = rabbit_mixed_queue:length(MS5), + {ok, MS9} = + case CommitOrCancel of + commit -> + {ok, MS6} = rabbit_mixed_queue:tx_commit(MsgsB, [], MS5), + Len1 = Len0 + length(MsgsB), + Len1 = rabbit_mixed_queue:length(MS6), + {AckTags, MS8} = + lists:foldl( + fun (Msg, {Acc, MS7}) -> + Rem = Len1 - (Msg #basic_message.guid) - 1, + {{Msg, false, AckTag, Rem}, MS7a} = + rabbit_mixed_queue:deliver(MS7), + {[AckTag | Acc], MS7a} + end, {[], MS6}, MsgsA ++ MsgsB), + 0 = rabbit_mixed_queue:length(MS8), + rabbit_mixed_queue:ack(lists:reverse(AckTags), MS8); + cancel -> + {ok, MS6} = rabbit_mixed_queue:tx_cancel(MsgsB, MS5), + Len0 = rabbit_mixed_queue:length(MS6), + {AckTags, MS8} = + lists:foldl( + fun (Msg, {Acc, MS7}) -> + Rem = Len0 - (Msg #basic_message.guid) - 1, + {{Msg, false, AckTag, Rem}, MS7a} = + rabbit_mixed_queue:deliver(MS7), + {[AckTag | Acc], MS7a} + end, {[], MS6}, MsgsA), + 0 = rabbit_mixed_queue:length(MS8), + rabbit_mixed_queue:ack(lists:reverse(AckTags), MS8) + end, + 0 = rabbit_mixed_queue:length(MS9), + passed. + rdq_time_commands(Funcs) -> lists:foreach(fun (F) -> F() end, Funcs). @@ -1072,3 +1159,8 @@ rdq_start() -> rdq_stop() -> rabbit_disk_queue:stop(), timer:sleep(1000). + +rdq_stop_virgin_start() -> + rdq_stop(), + rdq_virgin(), + rdq_start(). |
