diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-08-27 11:03:22 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-08-27 11:03:22 +0100 |
| commit | a12101207fd9b8a33e4d25f6edf9ce92df235a39 (patch) | |
| tree | 86f8c0acc6451b8ab4f53188ee6b3d350bddbb43 | |
| parent | 1fba9f2f33d22bef23e485e8b16085b9115adf1f (diff) | |
| download | rabbitmq-server-git-a12101207fd9b8a33e4d25f6edf9ce92df235a39.tar.gz | |
As Matthias spotted, if we crash in the middle of a mixed -> disk transition then we potentially have messages in the wrong order.
To fix this, we push a marker message into the queue before the transition. When the transition completes, we find that that marker message is at the head of the queue. If we are unlucky enough to crash during the transition then on recovery, we have to foldl through the entire queue anyway, so we keep our eyes open for the marker message, and should we see it, we simply move all that is before the marker message to the end of the queue, and then remove the marker. By avoiding putting any numbers into the queue, we neatly sidestep the issue of the disk_queue deleting all non-persistent messages on startup.
This has been tested by merging into 21444, then, erlang client:
α> Conn = amqp_connection:start_network(#amqp_params{}), Chan = amqp_connection:open_channel(Conn).
β> [begin Q1 = list_to_binary(integer_to_list(R)), #'queue.declare_ok'{queue = Q1} = amqp_channel:call(Chan, #'queue.declare'{queue=Q1, durable=true}) end || R <- lists:seq(1,100) ].
γ> [begin Q1 = list_to_binary(integer_to_list(R)), ok = amqp_channel:call(Chan, #'basic.publish'{routing_key = Q1}, #amqp_msg{props = (amqp_util:basic_properties())#'P_basic'{delivery_mode=2}, payload = << 0 : 1024 >>}), ok = amqp_channel:call(Chan, #'basic.publish'{routing_key = Q1}, #amqp_msg{props = (amqp_util:basic_properties()), payload = << 1 : 1024>>}) end || _ <- lists:seq(1,1000), R <- lists:seq(1,100) ].
Then, when that lot's done, get hold of the pid of rabbit and prepare a kill -9 $rabbitpid
δ) In another shell, do:
for t in $(seq 1 100); do ./scripts/rabbitmqctl pin_queue_to_disk $t ; done
ε) When you get to about 50, kill rabbit.
ζ) Then start up with just make run-node
η) In another shell make start-cover
θ) In Rabbit's erlang shell rabbit:start().
ι) When all started up, in the other shell make stop-cover.
j) Check the lines hit in mixed_queue:init/2.
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 45 |
1 files changed, 39 insertions, 6 deletions
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index ae5f771f23..fcd966e939 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -104,13 +104,26 @@ init(Queue, IsDurable) -> Len = rabbit_disk_queue:len(Queue), MsgBuf = inc_queue_length(queue:new(), Len), - Size = rabbit_disk_queue:foldl( + {Size, MarkerFound, MarkerCount} = rabbit_disk_queue:foldl( fun (Msg = #basic_message { is_persistent = true }, - _AckTag, _IsDelivered, Acc) -> - Acc + size_of_message(Msg) - end, 0, Queue), + _AckTag, _IsDelivered, {SizeAcc, MFound, MCount}) -> + SizeAcc1 = SizeAcc + size_of_message(Msg), + case {MFound, is_magic_marker_message(Msg)} of + {false, false} -> {SizeAcc1, false, MCount + 1}; + {false, true} -> {SizeAcc1, true, MCount}; + {true, false} -> {SizeAcc1, true, MCount} + end + end, {0, false, 0}, Queue), + Len1 = case MarkerFound of + false -> Len; + true -> + ok = rabbit_disk_queue:requeue_next_n(Queue, MarkerCount), + Len2 = Len - 1, + {ok, Len2} = fetch_ack_magic_marker_message(Queue), + Len2 + end, {ok, #mqstate { mode = disk, msg_buf = MsgBuf, queue = Queue, - is_durable = IsDurable, length = Len, + is_durable = IsDurable, length = Len1, memory_size = Size, memory_gain = undefined, memory_loss = undefined, prefetcher = undefined }}. @@ -339,7 +352,7 @@ is_empty(#mqstate { length = Length }) -> set_storage_mode(Mode, _TxnMessages, State = #mqstate { mode = Mode }) -> {ok, State}; set_storage_mode(disk, TxnMessages, State = - #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, + #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, length = Length, is_durable = IsDurable, prefetcher = Prefetcher }) -> State1 = State #mqstate { mode = disk }, MsgBuf1 = @@ -359,8 +372,10 @@ set_storage_mode(disk, TxnMessages, State = %% message on disk. %% Note we also batch together messages on disk so that we minimise %% the calls to requeue. + ok = publish_magic_marker_message(Q), {ok, MsgBuf3} = send_messages_to_disk(IsDurable, Q, MsgBuf1, 0, 0, [], [], queue:new()), + {ok, Length} = fetch_ack_magic_marker_message(Q), %% tx_publish txn messages. Some of these will have been already %% published if they really are durable and persistent which is %% why we can't just use our own tx_publish/2 function (would end @@ -551,3 +566,21 @@ on_disk(disk, _IsDurable, _IsPersistent) -> true; on_disk(mixed, true, true) -> true; on_disk(mixed, _IsDurable, _IsPersistent) -> false. +publish_magic_marker_message(Q) -> + Msg = rabbit_basic:message( + none, internal, [], <<>>, rabbit_guid:guid(), true), + ok = rabbit_disk_queue:publish(Q, ensure_binary_properties(Msg), false). + +fetch_ack_magic_marker_message(Q) -> + {#basic_message { exchange_name = none, routing_key = internal, + is_persistent = true }, + false, AckTag, Length} = rabbit_disk_queue:fetch(Q), + ok = rabbit_disk_queue:ack(Q, [AckTag]), + {ok, Length}. + +is_magic_marker_message( + #basic_message { exchange_name = none, routing_key = internal, + is_persistent = true }) -> + true; +is_magic_marker_message(_) -> + false. |
