summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-06-22 12:04:47 +0100
committerMatthew Sackman <matthew@lshift.net>2009-06-22 12:04:47 +0100
commit80572e52c3423b58ad4509564a64f9cf29a8035e (patch)
tree4dd9cb226e9978b2a45d5edd1f7522e4cc6056e4
parentbe7cce80e3b9f47c7b326bfb97864de53e6cf4c2 (diff)
parentce5ae4dc0d7857eb8748b847c90724bfb7ac7abb (diff)
downloadrabbitmq-server-git-80572e52c3423b58ad4509564a64f9cf29a8035e.tar.gz
merging in from bug19662
-rw-r--r--src/rabbit_amqqueue_process.erl9
-rw-r--r--src/rabbit_basic.erl13
-rw-r--r--src/rabbit_disk_queue.erl9
-rw-r--r--src/rabbit_mixed_queue.erl45
-rw-r--r--src/rabbit_tests.erl100
5 files changed, 154 insertions, 22 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 19f6f308cd..b19ff7a014 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -820,10 +820,13 @@ handle_cast({limit, ChPid, LimiterPid}, State) ->
end));
handle_cast({constrain, Constrain}, State = #q { mixed_state = MS }) ->
+ PendingMessages =
+ lists:flatten([Pending || #tx { pending_messages = Pending}
+ <- all_tx_record()]),
{ok, MS1} = (case Constrain of
- true -> fun rabbit_mixed_queue:to_disk_only_mode/1;
- false -> fun rabbit_mixed_queue:to_mixed_mode/1
- end)(MS),
+ true -> fun rabbit_mixed_queue:to_disk_only_mode/2;
+ false -> fun rabbit_mixed_queue:to_mixed_mode/2
+ end)(PendingMessages, MS),
noreply(State #q { mixed_state = MS1 }).
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index f9a8f488af..63d6a4815d 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -33,7 +33,7 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([publish/1, message/4, message/5, delivery/4]).
+-export([publish/1, message/4, message/5, message/6, delivery/4]).
%%----------------------------------------------------------------------------
@@ -44,8 +44,10 @@
-spec(delivery/4 :: (bool(), bool(), maybe(txn()), message()) -> delivery()).
-spec(message/4 :: (exchange_name(), routing_key(), binary(), binary()) ->
message()).
--spec(message/5 :: (exchange_name(), routing_key(), binary(), binary(), guid()) ->
- message()).
+-spec(message/5 :: (exchange_name(), routing_key(), binary(), binary(),
+ guid()) -> message()).
+-spec(message/6 :: (exchange_name(), routing_key(), binary(), binary(),
+ guid(), bool()) -> message()).
-endif.
@@ -69,6 +71,9 @@ message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin) ->
message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin, rabbit_guid:guid()).
message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin, MsgId) ->
+ message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin, MsgId, false).
+
+message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin, MsgId, IsPersistent) ->
{ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'),
Content = #content{class_id = ClassId,
properties = #'P_basic'{content_type = ContentTypeBin},
@@ -78,4 +83,4 @@ message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin, MsgId) ->
routing_key = RoutingKeyBin,
content = Content,
guid = MsgId,
- is_persistent = false}.
+ is_persistent = IsPersistent}.
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index adf5895168..bf2de56529 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -1055,11 +1055,11 @@ internal_requeue(Q, MsgSeqIds = [{_, {FirstSeqIdTo, _}}|_],
{ReadSeqId, WriteSeqId, Length} = sequence_lookup(Sequences, Q),
ReadSeqId1 = determine_next_read_id(ReadSeqId, WriteSeqId, FirstSeqIdTo),
MsgSeqIdsZipped = zip_with_tail(MsgSeqIds, {last, {next, {next, true}}}),
- {atomic, {WriteSeqId1, Q}} =
+ {atomic, {WriteSeqId1, Q, State}} =
mnesia:transaction(
fun() ->
ok = mnesia:write_lock_table(rabbit_disk_queue),
- lists:foldl(fun requeue_message/2, {WriteSeqId, Q},
+ lists:foldl(fun requeue_message/2, {WriteSeqId, Q, State},
MsgSeqIdsZipped)
end),
true = ets:insert(Sequences, {Q, ReadSeqId1, WriteSeqId1,
@@ -1068,7 +1068,7 @@ internal_requeue(Q, MsgSeqIds = [{_, {FirstSeqIdTo, _}}|_],
requeue_message({{{MsgId, SeqIdOrig}, {SeqIdTo, NewIsDelivered}},
{_NextMsgSeqId, {NextSeqIdTo, _NextNewIsDelivered}}},
- {ExpectedSeqIdTo, Q}) ->
+ {ExpectedSeqIdTo, Q, State}) ->
SeqIdTo1 = adjust_last_msg_seq_id(Q, ExpectedSeqIdTo, SeqIdTo, write),
NextSeqIdTo1 = find_next_seq_id(SeqIdTo1, NextSeqIdTo),
[Obj = #dq_msg_loc { is_delivered = true, msg_id = MsgId,
@@ -1084,7 +1084,8 @@ requeue_message({{{MsgId, SeqIdOrig}, {SeqIdTo, NewIsDelivered}},
write),
ok = mnesia:delete(rabbit_disk_queue, {Q, SeqIdOrig}, write)
end,
- {NextSeqIdTo1, Q}.
+ decrement_cache(MsgId, State),
+ {NextSeqIdTo1, Q, State}.
internal_purge(Q, State = #dqstate { sequences = Sequences }) ->
case ets:lookup(Sequences, Q) of
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index db27a3e3a7..e0f9d2f226 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -39,7 +39,7 @@
tx_publish/2, tx_commit/3, tx_cancel/2, requeue/2, purge/1,
length/1, is_empty/1, delete_queue/1]).
--export([to_disk_only_mode/1, to_mixed_mode/1, estimate_extra_memory/1]).
+-export([to_disk_only_mode/2, to_mixed_mode/2, estimate_extra_memory/1]).
-record(mqstate, { mode,
msg_buf,
@@ -81,6 +81,10 @@
-spec(length/1 :: (mqstate()) -> non_neg_integer()).
-spec(is_empty/1 :: (mqstate()) -> bool()).
+
+-spec(to_disk_only_mode/2 :: ([message()], mqstate()) -> okmqs()).
+-spec(to_mixed_mode/2 :: ([message()], mqstate()) -> okmqs()).
+
-spec(estimate_extra_memory/1 :: (mqstate()) -> non_neg_integer).
-endif.
@@ -91,7 +95,7 @@ init(Queue, IsDurable, disk) ->
is_durable = IsDurable, length = 0, memory_size = 0 });
init(Queue, IsDurable, mixed) ->
{ok, State} = init(Queue, IsDurable, disk),
- to_mixed_mode(State).
+ to_mixed_mode([], State).
size_of_message(
#basic_message { content = #content { payload_fragments_rev = Payload }}) ->
@@ -99,10 +103,11 @@ size_of_message(
SumAcc + size(Frag)
end, 0, Payload).
-to_disk_only_mode(State = #mqstate { mode = disk }) ->
+to_disk_only_mode(_TxnMessages, State = #mqstate { mode = disk }) ->
{ok, State};
-to_disk_only_mode(State =
- #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf }) ->
+to_disk_only_mode(TxnMessages, State =
+ #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf,
+ is_durable = IsDurable }) ->
rabbit_log:info("Converting queue to disk only mode: ~p~n", [Q]),
%% We enqueue _everything_ here. This means that should a message
%% already be in the disk queue we must remove it and add it back
@@ -137,12 +142,25 @@ to_disk_only_mode(State =
true ->
rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(Requeue))
end,
+ %% tx_publish txn messages. Some of these will have been already
+ %% published if they really are durable and persistent which is
+ %% why we can't just use our own tx_publish/2 function (would end
+ %% up publishing twice, so refcount would go wrong in disk_queue).
+ lists:foreach(
+ fun (Msg = #basic_message { is_persistent = IsPersistent }) ->
+ ok = case IsDurable andalso IsPersistent of
+ true -> ok;
+ _ -> rabbit_disk_queue:tx_publish(Msg)
+ end
+ end, TxnMessages),
{ok,
State #mqstate { mode = disk, msg_buf = queue:new(), memory_size = Size }}.
-to_mixed_mode(State = #mqstate { mode = mixed }) ->
+to_mixed_mode(_TxnMessages, State = #mqstate { mode = mixed }) ->
{ok, State};
-to_mixed_mode(State = #mqstate { mode = disk, queue = Q, length = Length }) ->
+to_mixed_mode(TxnMessages, State =
+ #mqstate { mode = disk, queue = Q, length = Length,
+ is_durable = IsDurable }) ->
rabbit_log:info("Converting queue to mixed mode: ~p~n", [Q]),
%% load up a new queue with everything that's on disk.
%% don't remove non-persistent messages that happen to be on disk
@@ -153,6 +171,19 @@ to_mixed_mode(State = #mqstate { mode = disk, queue = Q, length = Length }) ->
{Buf, L}) ->
{queue:in({Msg, IsDelivered, true}, Buf), L+1}
end, {queue:new(), 0}, QList),
+ %% remove txn messages from disk which are neither persistent and
+ %% durable. This is necessary to avoid leaks. This is also pretty
+ %% much the inverse behaviour of our own tx_cancel/2 which is why
+ %% we're not using it.
+ Cancel =
+ lists:foldl(
+ fun (Msg = #basic_message { is_persistent = IsPersistent }, Acc) ->
+ case IsDurable andalso IsPersistent of
+ true -> Acc;
+ _ -> [Msg #basic_message.guid | Acc]
+ end
+ end, [], TxnMessages),
+ ok = rabbit_disk_queue:tx_cancel(lists:reverse(Cancel)),
{ok, State #mqstate { mode = mixed, msg_buf = MsgBuf1, memory_size = 0 }}.
purge_non_persistent_messages(State = #mqstate { mode = disk, queue = Q,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 2defca6446..7d74968b9c 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -696,6 +696,7 @@ test_disk_queue() ->
passed = rdq_test_purge(),
passed = rdq_test_dump_queue(),
passed = rdq_test_mixed_queue_modes(),
+ passed = rdq_test_mode_conversion_mid_txn(),
rdq_virgin(),
ok = control_action(stop_app, []),
ok = control_action(start_app, []),
@@ -1003,11 +1004,11 @@ rdq_test_mixed_queue_modes() ->
30 = rabbit_mixed_queue:length(MS6),
io:format("Published a mixture of messages; ~w~n",
[rabbit_mixed_queue:estimate_extra_memory(MS6)]),
- {ok, MS7} = rabbit_mixed_queue:to_disk_only_mode(MS6),
+ {ok, MS7} = rabbit_mixed_queue:to_disk_only_mode([], MS6),
30 = rabbit_mixed_queue:length(MS7),
io:format("Converted to disk only mode; ~w~n",
[rabbit_mixed_queue:estimate_extra_memory(MS7)]),
- {ok, MS8} = rabbit_mixed_queue:to_mixed_mode(MS7),
+ {ok, MS8} = rabbit_mixed_queue:to_mixed_mode([], MS7),
30 = rabbit_mixed_queue:length(MS8),
io:format("Converted to mixed mode; ~w~n",
[rabbit_mixed_queue:estimate_extra_memory(MS8)]),
@@ -1022,7 +1023,7 @@ rdq_test_mixed_queue_modes() ->
end, MS8, lists:seq(1,10)),
20 = rabbit_mixed_queue:length(MS10),
io:format("Delivered initial non persistent messages~n"),
- {ok, MS11} = rabbit_mixed_queue:to_disk_only_mode(MS10),
+ {ok, MS11} = rabbit_mixed_queue:to_disk_only_mode([], MS10),
20 = rabbit_mixed_queue:length(MS11),
io:format("Converted to disk only mode~n"),
rdq_stop(),
@@ -1042,7 +1043,7 @@ rdq_test_mixed_queue_modes() ->
0 = rabbit_mixed_queue:length(MS14),
{ok, MS15} = rabbit_mixed_queue:ack(AckTags, MS14),
io:format("Delivered and acked all messages~n"),
- {ok, MS16} = rabbit_mixed_queue:to_disk_only_mode(MS15),
+ {ok, MS16} = rabbit_mixed_queue:to_disk_only_mode([], MS15),
0 = rabbit_mixed_queue:length(MS16),
io:format("Converted to disk only mode~n"),
rdq_stop(),
@@ -1054,6 +1055,92 @@ rdq_test_mixed_queue_modes() ->
rdq_stop(),
passed.
+rdq_test_mode_conversion_mid_txn() ->
+ Payload = <<0:(8*256)>>,
+ MsgIdsA = lists:seq(0,9),
+ MsgsA = [ rabbit_basic:message(x, <<>>, <<>>, Payload, MsgId,
+ (0 == MsgId rem 2))
+ || MsgId <- MsgIdsA ],
+ MsgIdsB = lists:seq(10,20),
+ MsgsB = [ rabbit_basic:message(x, <<>>, <<>>, Payload, MsgId,
+ (0 == MsgId rem 2))
+ || MsgId <- MsgIdsB ],
+
+ rdq_virgin(),
+ rdq_start(),
+ {ok, MS0} = rabbit_mixed_queue:init(q, true, mixed),
+ passed = rdq_tx_publish_mixed_alter_commit_get(
+ MS0, MsgsA, MsgsB, fun rabbit_mixed_queue:to_disk_only_mode/2, commit),
+
+ rdq_stop_virgin_start(),
+ {ok, MS1} = rabbit_mixed_queue:init(q, true, mixed),
+ passed = rdq_tx_publish_mixed_alter_commit_get(
+ MS1, MsgsA, MsgsB, fun rabbit_mixed_queue:to_disk_only_mode/2, cancel),
+
+
+ rdq_stop_virgin_start(),
+ {ok, MS2} = rabbit_mixed_queue:init(q, true, disk),
+ passed = rdq_tx_publish_mixed_alter_commit_get(
+ MS2, MsgsA, MsgsB, fun rabbit_mixed_queue:to_mixed_mode/2, commit),
+
+ rdq_stop_virgin_start(),
+ {ok, MS3} = rabbit_mixed_queue:init(q, true, disk),
+ passed = rdq_tx_publish_mixed_alter_commit_get(
+ MS3, MsgsA, MsgsB, fun rabbit_mixed_queue:to_mixed_mode/2, cancel),
+
+ rdq_stop(),
+ passed.
+
+rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, ChangeFun, CommitOrCancel) ->
+ 0 = rabbit_mixed_queue:length(MS0),
+ MS2 = lists:foldl(
+ fun (Msg, MS1) ->
+ {ok, MS1a} = rabbit_mixed_queue:publish(Msg, MS1),
+ MS1a
+ end, MS0, MsgsA),
+ Len0 = length(MsgsA),
+ Len0 = rabbit_mixed_queue:length(MS2),
+ MS4 = lists:foldl(
+ fun (Msg, MS3) ->
+ {ok, MS3a} = rabbit_mixed_queue:tx_publish(Msg, MS3),
+ MS3a
+ end, MS2, MsgsB),
+ Len0 = rabbit_mixed_queue:length(MS4),
+ {ok, MS5} = ChangeFun(MsgsB, MS4),
+ Len0 = rabbit_mixed_queue:length(MS5),
+ {ok, MS9} =
+ case CommitOrCancel of
+ commit ->
+ {ok, MS6} = rabbit_mixed_queue:tx_commit(MsgsB, [], MS5),
+ Len1 = Len0 + length(MsgsB),
+ Len1 = rabbit_mixed_queue:length(MS6),
+ {AckTags, MS8} =
+ lists:foldl(
+ fun (Msg, {Acc, MS7}) ->
+ Rem = Len1 - (Msg #basic_message.guid) - 1,
+ {{Msg, false, AckTag, Rem}, MS7a} =
+ rabbit_mixed_queue:deliver(MS7),
+ {[AckTag | Acc], MS7a}
+ end, {[], MS6}, MsgsA ++ MsgsB),
+ 0 = rabbit_mixed_queue:length(MS8),
+ rabbit_mixed_queue:ack(lists:reverse(AckTags), MS8);
+ cancel ->
+ {ok, MS6} = rabbit_mixed_queue:tx_cancel(MsgsB, MS5),
+ Len0 = rabbit_mixed_queue:length(MS6),
+ {AckTags, MS8} =
+ lists:foldl(
+ fun (Msg, {Acc, MS7}) ->
+ Rem = Len0 - (Msg #basic_message.guid) - 1,
+ {{Msg, false, AckTag, Rem}, MS7a} =
+ rabbit_mixed_queue:deliver(MS7),
+ {[AckTag | Acc], MS7a}
+ end, {[], MS6}, MsgsA),
+ 0 = rabbit_mixed_queue:length(MS8),
+ rabbit_mixed_queue:ack(lists:reverse(AckTags), MS8)
+ end,
+ 0 = rabbit_mixed_queue:length(MS9),
+ passed.
+
rdq_time_commands(Funcs) ->
lists:foreach(fun (F) -> F() end, Funcs).
@@ -1072,3 +1159,8 @@ rdq_start() ->
rdq_stop() ->
rabbit_disk_queue:stop(),
timer:sleep(1000).
+
+rdq_stop_virgin_start() ->
+ rdq_stop(),
+ rdq_virgin(),
+ rdq_start().