summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-08-27 11:03:22 +0100
committerMatthew Sackman <matthew@lshift.net>2009-08-27 11:03:22 +0100
commita12101207fd9b8a33e4d25f6edf9ce92df235a39 (patch)
tree86f8c0acc6451b8ab4f53188ee6b3d350bddbb43
parent1fba9f2f33d22bef23e485e8b16085b9115adf1f (diff)
downloadrabbitmq-server-git-a12101207fd9b8a33e4d25f6edf9ce92df235a39.tar.gz
As Matthias spotted, if we crash in the middle of a mixed -> disk transition then we potentially have messages in the wrong order.
To fix this, we push a marker message into the queue before the transition. When the transition completes, we find that the marker message is at the head of the queue. If we are unlucky enough to crash during the transition then, on recovery, we have to foldl through the entire queue anyway, so we keep our eyes open for the marker message and, should we see it, we simply move everything that is before the marker message to the end of the queue, and then remove the marker. By avoiding putting any numbers into the queue, we neatly sidestep the issue of the disk_queue deleting all non-persistent messages on startup.
This has been tested by merging into 21444, then, in the Erlang client:
α> Conn = amqp_connection:start_network(#amqp_params{}), Chan = amqp_connection:open_channel(Conn).
β> [begin Q1 = list_to_binary(integer_to_list(R)), #'queue.declare_ok'{queue = Q1} = amqp_channel:call(Chan, #'queue.declare'{queue=Q1, durable=true}) end || R <- lists:seq(1,100) ].
γ> [begin Q1 = list_to_binary(integer_to_list(R)), ok = amqp_channel:call(Chan, #'basic.publish'{routing_key = Q1}, #amqp_msg{props = (amqp_util:basic_properties())#'P_basic'{delivery_mode=2}, payload = << 0 : 1024 >>}), ok = amqp_channel:call(Chan, #'basic.publish'{routing_key = Q1}, #amqp_msg{props = (amqp_util:basic_properties()), payload = << 1 : 1024>>}) end || _ <- lists:seq(1,1000), R <- lists:seq(1,100) ].
Then, when that lot's done, get hold of the pid of rabbit and prepare a kill -9 $rabbitpid.
δ) In another shell, do: for t in $(seq 1 100); do ./scripts/rabbitmqctl pin_queue_to_disk $t ; done
ε) When you get to about 50, kill rabbit.
ζ) Then start up with just make run-node.
η) In another shell, run make start-cover.
θ) In Rabbit's Erlang shell, run rabbit:start().
ι) When all started up, in the other shell run make stop-cover.
κ) Check the lines hit in rabbit_mixed_queue:init/2.
-rw-r--r--src/rabbit_mixed_queue.erl45
1 files changed, 39 insertions, 6 deletions
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index ae5f771f23..fcd966e939 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -104,13 +104,26 @@
%% init/2 (shown as a diff hunk: "-" lines are the old code, "+" lines the new).
%% Builds the initial #mqstate{} for Queue, always in mode = disk, from what
%% rabbit_disk_queue already holds. Returns {ok, #mqstate{}}.
%%
%% After this patch the fold over the disk queue returns
%% {Size, MarkerFound, MarkerCount} instead of just Size:
%%   Size        - sum of size_of_message/1 over every message folded
%%   MarkerFound - true once is_magic_marker_message/1 has matched a message
%%   MarkerCount - number of messages seen before the marker
%% If the marker was found, MarkerCount is handed to
%% rabbit_disk_queue:requeue_next_n/2 and the marker is then fetched and acked,
%% so the length stored in the state is Len - 1.
init(Queue, IsDurable) ->
Len = rabbit_disk_queue:len(Queue),
%% NOTE(review): MsgBuf is sized from Len here, before any marker is removed,
%% whereas the state's length field below uses Len1. When MarkerFound is true
%% the two differ by one. This looks unintended -- confirm, and consider
%% building MsgBuf from Len1 after the case expression instead.
MsgBuf = inc_queue_length(queue:new(), Len),
- Size = rabbit_disk_queue:foldl(
+ {Size, MarkerFound, MarkerCount} = rabbit_disk_queue:foldl(
%% The fun head only matches messages with is_persistent = true; any other
%% message reaching the fold would raise function_clause.
fun (Msg = #basic_message { is_persistent = true },
- _AckTag, _IsDelivered, Acc) ->
- Acc + size_of_message(Msg)
- end, 0, Queue),
+ _AckTag, _IsDelivered, {SizeAcc, MFound, MCount}) ->
%% SizeAcc1 is used in every branch, so the marker's own size_of_message/1
%% result is included in Size as well.
+ SizeAcc1 = SizeAcc + size_of_message(Msg),
%% MCount only grows while the marker has not been seen. There is no
%% {true, true} clause: a second marker in the same queue would raise
%% case_clause.
+ case {MFound, is_magic_marker_message(Msg)} of
+ {false, false} -> {SizeAcc1, false, MCount + 1};
+ {false, true} -> {SizeAcc1, true, MCount};
+ {true, false} -> {SizeAcc1, true, MCount}
+ end
+ end, {0, false, 0}, Queue),
+ Len1 = case MarkerFound of
+ false -> Len;
+ true ->
%% Per the commit message, this moves the MarkerCount messages that precede
%% the marker to the end of the queue, leaving the marker at the head.
+ ok = rabbit_disk_queue:requeue_next_n(Queue, MarkerCount),
+ Len2 = Len - 1,
%% Len2 is already bound, so this match also asserts that the length
%% reported after removing the marker is exactly Len - 1.
+ {ok, Len2} = fetch_ack_magic_marker_message(Queue),
+ Len2
+ end,
{ok, #mqstate { mode = disk, msg_buf = MsgBuf, queue = Queue,
- is_durable = IsDurable, length = Len,
+ is_durable = IsDurable, length = Len1,
memory_size = Size, memory_gain = undefined,
memory_loss = undefined, prefetcher = undefined }}.
@@ -339,7 +352,7 @@ is_empty(#mqstate { length = Length }) ->
set_storage_mode(Mode, _TxnMessages, State = #mqstate { mode = Mode }) ->
{ok, State};
set_storage_mode(disk, TxnMessages, State =
- #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf,
+ #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, length = Length,
is_durable = IsDurable, prefetcher = Prefetcher }) ->
State1 = State #mqstate { mode = disk },
MsgBuf1 =
@@ -359,8 +372,10 @@ set_storage_mode(disk, TxnMessages, State =
%% message on disk.
%% Note we also batch together messages on disk so that we minimise
%% the calls to requeue.
+ ok = publish_magic_marker_message(Q),
{ok, MsgBuf3} =
send_messages_to_disk(IsDurable, Q, MsgBuf1, 0, 0, [], [], queue:new()),
+ {ok, Length} = fetch_ack_magic_marker_message(Q),
%% tx_publish txn messages. Some of these will have been already
%% published if they really are durable and persistent which is
%% why we can't just use our own tx_publish/2 function (would end
@@ -551,3 +566,21 @@ on_disk(disk, _IsDurable, _IsPersistent) -> true;
on_disk(mixed, true, true) -> true;
on_disk(mixed, _IsDurable, _IsPersistent) -> false.
%% publish_magic_marker_message(Q) -> ok
%% Publishes the marker message to disk queue Q. The message is built by
%% rabbit_basic:message/6 with the atoms none and internal, an empty list, an
%% empty binary, a fresh rabbit_guid:guid() and true, then passed through
%% ensure_binary_properties/1 before publishing.
%% NOTE(review): none / internal / true presumably populate exchange_name,
%% routing_key and is_persistent, which are the fields the marker is later
%% recognised by -- confirm against rabbit_basic:message/6. The final false
%% passed to rabbit_disk_queue:publish/3 is presumably the is-delivered flag.
%% Any result other than ok from publish/3 fails the match (badmatch).
+publish_magic_marker_message(Q) ->
+ Msg = rabbit_basic:message(
+ none, internal, [], <<>>, rabbit_guid:guid(), true),
+ ok = rabbit_disk_queue:publish(Q, ensure_binary_properties(Msg), false).
+
%% fetch_ack_magic_marker_message(Q) -> {ok, Length}
%% Fetches the next message from disk queue Q and asserts, by pattern match,
%% that it is the marker (exchange_name = none, routing_key = internal,
%% is_persistent = true) and that the second element of the fetch result is
%% false. The marker is then acked using the returned AckTag.
%% Length is the last element of the tuple returned by
%% rabbit_disk_queue:fetch/1 -- presumably the number of messages remaining
%% in Q; verify against rabbit_disk_queue.
%% Raises badmatch if the fetched message is not the marker, or if fetch/1
%% returns anything that is not a matching 4-tuple.
+fetch_ack_magic_marker_message(Q) ->
+ {#basic_message { exchange_name = none, routing_key = internal,
+ is_persistent = true },
+ false, AckTag, Length} = rabbit_disk_queue:fetch(Q),
+ ok = rabbit_disk_queue:ack(Q, [AckTag]),
+ {ok, Length}.
+
%% is_magic_marker_message(Term) -> boolean()
%% Returns true when Term is a #basic_message{} with exchange_name = none,
%% routing_key = internal and is_persistent = true; returns false for
%% anything else.
%% NOTE(review): the marker is recognised by these three fields only (the
%% guid is not checked), so any message carrying the same field values would
%% also be treated as a marker.
+is_magic_marker_message(
+ #basic_message { exchange_name = none, routing_key = internal,
+ is_persistent = true }) ->
+ true;
+is_magic_marker_message(_) ->
+ false.