diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2010-12-06 13:05:18 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-12-06 13:05:18 +0000 |
| commit | b02c6c8792fde10065127fbec9e5727fda222fc8 (patch) | |
| tree | f306a3e709937f461ce7fe1c9a302150ddf16ce5 | |
| parent | 1ffd85e580b5d5e8e7bb6b7123126933aa1cf168 (diff) | |
| parent | a821bd1fccef580bca7fceaaaeb697e924840c89 (diff) | |
| download | rabbitmq-server-git-b02c6c8792fde10065127fbec9e5727fda222fc8.tar.gz | |
Merge default into bug 23554
| -rw-r--r-- | include/rabbit.hrl | 2 | ||||
| -rw-r--r-- | include/rabbit_backing_queue_spec.hrl | 17 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 20 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 25 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_router.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 18 | ||||
| -rw-r--r-- | src/rabbit_types.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 28 |
9 files changed, 73 insertions, 54 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index fccfad9708..9a74503c1c 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -54,7 +54,7 @@ -record(exchange, {name, type, durable, auto_delete, arguments}). -record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, - arguments, pid}). + arguments, pid, extra_pids}). %% mnesia doesn't like unary records, so we add a dummy 'value' field -record(route, {binding, value = const}). diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index f67c6f46d1..0f831a7d3d 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -44,23 +44,24 @@ -spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok'). -spec(stop/0 :: () -> 'ok'). --spec(init/3 :: (rabbit_amqqueue:name(), is_durable(), attempt_recovery()) -> - state()). +-spec(init/2 :: (rabbit_types:amqqueue(), attempt_recovery()) -> state()). -spec(terminate/1 :: (state()) -> state()). -spec(delete_and_terminate/1 :: (state()) -> state()). -spec(purge/1 :: (state()) -> {purged_msg_count(), state()}). --spec(publish/3 :: (rabbit_types:basic_message(), - rabbit_types:message_properties(), state()) -> state()). --spec(publish_delivered/4 :: (ack_required(), rabbit_types:basic_message(), - rabbit_types:message_properties(), state()) +-spec(publish/4 :: (rabbit_types:basic_message(), + rabbit_types:message_properties(), pid(), state()) + -> state()). +-spec(publish_delivered/5 :: (ack_required(), rabbit_types:basic_message(), + rabbit_types:message_properties(), pid(), state()) -> {ack(), state()}). -spec(dropwhile/2 :: (fun ((rabbit_types:message_properties()) -> boolean()), state()) -> state()). -spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}). -spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}). --spec(tx_publish/4 :: (rabbit_types:txn(), rabbit_types:basic_message(), - rabbit_types:message_properties(), state()) -> state()). +-spec(tx_publish/5 :: (rabbit_types:txn(), rabbit_types:basic_message(), + rabbit_types:message_properties(), pid(), state()) -> + state()). -spec(tx_ack/3 :: (rabbit_types:txn(), [ack()], state()) -> state()). -spec(tx_rollback/2 :: (rabbit_types:txn(), state()) -> {[ack()], state()}). -spec(tx_commit/4 :: diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 70d8f2dda7..e322e844e1 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -206,12 +206,13 @@ recover_durable_queues(DurableQueues) -> declare(QueueName, Durable, AutoDelete, Args, Owner) -> ok = check_declare_arguments(QueueName, Args), - Q = start_queue_process(#amqqueue{name = QueueName, - durable = Durable, - auto_delete = AutoDelete, - arguments = Args, + Q = start_queue_process(#amqqueue{name = QueueName, + durable = Durable, + auto_delete = AutoDelete, + arguments = Args, exclusive_owner = Owner, - pid = none}), + pid = none, + extra_pids = []}), case gen_server2:call(Q#amqqueue.pid, {init, false}) of not_found -> rabbit_misc:not_found(QueueName); Q1 -> Q1 @@ -501,11 +502,12 @@ delete_queue(QueueName) -> rabbit_binding:remove_transient_for_destination(QueueName). pseudo_queue(QueueName, Pid) -> - #amqqueue{name = QueueName, - durable = false, + #amqqueue{name = QueueName, + durable = false, auto_delete = false, - arguments = [], - pid = Pid}. + arguments = [], + pid = Pid, + extra_pids = []}. safe_delegate_call_ok(F, Pids) -> {_, Bad} = delegate:invoke(Pids, diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 25859c22f9..cecc85d0a5 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -116,12 +116,11 @@ info_keys() -> ?INFO_KEYS. init(Q) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), process_flag(trap_exit, true), - {ok, BQ} = application:get_env(backing_queue_module), {ok, #q{q = Q#amqqueue{pid = self()}, exclusive_consumer = none, has_had_consumers = false, - backing_queue = BQ, + backing_queue = backing_queue_module(Q), backing_queue_state = undefined, active_consumers = queue:new(), blocked_consumers = queue:new(), @@ -154,8 +153,7 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- declare(Recover, From, - State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable}, - backing_queue = BQ, backing_queue_state = undefined, + State = #q{q = Q, backing_queue = BQ, backing_queue_state = undefined, stats_timer = StatsTimer}) -> case rabbit_amqqueue:internal_declare(Q, Recover) of not_found -> {stop, normal, not_found, State}; @@ -166,7 +164,7 @@ declare(Recover, From, ok = rabbit_memory_monitor:register( self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), - BQS = BQ:init(QName, IsDurable, Recover), + BQS = BQ:init(Q, Recover), State1 = process_args(State#q{backing_queue_state = BQS}), rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)), @@ -226,6 +224,13 @@ next_state(State) -> false -> {stop_sync_timer(State2), hibernate} end. +backing_queue_module(#amqqueue{arguments = Args}) -> + case rabbit_misc:table_lookup(Args, <<"x-mirror">>) of + undefined -> {ok, BQM} = application:get_env(backing_queue_module), + BQM; + _Nodes -> rabbit_mirror_queue_master + end. + ensure_sync_timer(State = #q{sync_timer_ref = undefined, backing_queue = BQ}) -> {ok, TRef} = timer:apply_after( ?SYNC_INTERVAL, @@ -493,7 +498,7 @@ attempt_delivery(#delivery{txn = none, AckRequired, Message, (?BASE_MESSAGE_PROPERTIES)#message_properties{ needs_confirming = NeedsConfirming}, - BQS), + ChPid, BQS), {{Message, false, AckTag}, true, State1#q{backing_queue_state = BQS1}} end, @@ -504,9 +509,9 @@ attempt_delivery(#delivery{txn = Txn, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> record_current_channel_tx(ChPid, Txn), - {true, - State#q{backing_queue_state = - BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS)}}. + {true, State#q{backing_queue_state = + BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, + ChPid, BQS)}}. deliver_or_enqueue(Delivery, State) -> case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of @@ -517,7 +522,7 @@ deliver_or_enqueue(Delivery, State) -> BQS1 = BQ:publish(Message, (message_properties(State)) #message_properties{ needs_confirming = (MsgSeqNo =/= undefined)}, - BQS), + Delivery #delivery.sender, BQS), {false, ensure_ttl_timer(State1#q{backing_queue_state = BQS1})} end. diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 352e76fd0c..d04944f946 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -48,7 +48,7 @@ behaviour_info(callbacks) -> {stop, 0}, %% Initialise the backing queue and its state. - {init, 3}, + {init, 2}, %% Called on queue shutdown when queue isn't being deleted. {terminate, 1}, @@ -62,12 +62,12 @@ behaviour_info(callbacks) -> {purge, 1}, %% Publish a message. - {publish, 3}, + {publish, 4}, %% Called for messages which have already been passed straight %% out to a client. The queue will be empty for these calls %% (i.e. saves the round trip through the backing queue). - {publish_delivered, 4}, + {publish_delivered, 5}, %% Drop messages from the head of the queue while the supplied %% predicate returns true. @@ -81,7 +81,7 @@ behaviour_info(callbacks) -> {ack, 2}, %% A publish, but in the context of a transaction. - {tx_publish, 4}, + {tx_publish, 5}, %% Acks, but in the context of a transaction. {tx_ack, 3}, diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index d49c072ca2..a4ad7fbce8 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -117,7 +117,9 @@ check_delivery(_ , _ , {_ , Qs}) -> {routed, Qs}. lookup_qpids(QNames) -> lists:foldl(fun (QName, QPids) -> case mnesia:dirty_read({rabbit_queue, QName}) of - [#amqqueue{pid = QPid}] -> [QPid | QPids]; - [] -> QPids + [#amqqueue{pid = QPid, extra_pids = EPids}] -> + EPids ++ [QPid | QPids]; + [] -> + QPids end end, [], QNames). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index b9edad9a3a..1142eb8b6d 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1835,7 +1835,7 @@ variable_queue_publish(IsPersistent, Count, VQ) -> true -> 2; false -> 1 end}, <<>>), - #message_properties{}, VQN) + #message_properties{}, self(), VQN) end, VQ, lists:seq(1, Count)). variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> @@ -1853,9 +1853,13 @@ assert_prop(List, Prop, Value) -> assert_props(List, PropVals) -> [assert_prop(List, Prop, Value) || {Prop, Value} <- PropVals]. +test_amqqueue(Durable) -> + (rabbit_amqqueue:pseudo_queue(test_queue(), self())) + #amqqueue { durable = Durable }. + with_fresh_variable_queue(Fun) -> ok = empty_test_queue(), - VQ = rabbit_variable_queue:init(test_queue(), true, false, + VQ = rabbit_variable_queue:init(test_amqqueue(true), false, fun nop/1, fun nop/1), S0 = rabbit_variable_queue:status(VQ), assert_props(S0, [{q1, 0}, {q2, 0}, @@ -1914,7 +1918,7 @@ test_dropwhile(VQ0) -> rabbit_basic:message( rabbit_misc:r(<<>>, exchange, <<>>), <<>>, #'P_basic'{}, <<>>), - #message_properties{expiry = N}, VQN) + #message_properties{expiry = N}, self(), VQN) end, VQ0, lists:seq(1, Count)), %% drop the first 5 messages @@ -2031,7 +2035,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> {VQ5, _AckTags1} = variable_queue_fetch(Count, false, false, Count, VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), - VQ7 = rabbit_variable_queue:init(test_queue(), true, true, + VQ7 = rabbit_variable_queue:init(test_amqqueue(true), true, fun nop/1, fun nop/1), {{_Msg1, true, _AckTag1, Count1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), @@ -2048,7 +2052,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> VQ4 = rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3), VQ5 = rabbit_variable_queue:idle_timeout(VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), - VQ7 = rabbit_variable_queue:init(test_queue(), true, true, + VQ7 = rabbit_variable_queue:init(test_amqqueue(true), true, fun nop/1, fun nop/1), {empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7), VQ8. @@ -2056,7 +2060,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> test_queue_recover() -> Count = 2 * rabbit_queue_index:next_segment_boundary(0), TxID = rabbit_guid:guid(), - {new, #amqqueue { pid = QPid, name = QName }} = + {new, #amqqueue { pid = QPid, name = QName } = Q} = rabbit_amqqueue:declare(test_queue(), true, false, [], none), Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>), <<>>, #'P_basic'{delivery_mode = 2}, <<>>), @@ -2079,7 +2083,7 @@ test_queue_recover() -> {ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} = rabbit_amqqueue:basic_get(Q1, self(), false), exit(QPid1, shutdown), - VQ1 = rabbit_variable_queue:init(QName, true, true, + VQ1 = rabbit_variable_queue:init(Q, true, fun nop/1, fun nop/1), {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 548014be3e..bc1f9d7e4f 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -137,7 +137,8 @@ auto_delete :: boolean(), exclusive_owner :: rabbit_types:maybe(pid()), arguments :: rabbit_framing:amqp_table(), - pid :: rabbit_types:maybe(pid())}). + pid :: rabbit_types:maybe(pid()), + extra_pids :: [pid()]}). -type(exchange() :: #exchange{name :: rabbit_exchange:name(), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 0db5116559..d1da2c8917 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -31,9 +31,9 @@ -module(rabbit_variable_queue). --export([init/3, terminate/1, delete_and_terminate/1, - purge/1, publish/3, publish_delivered/4, fetch/2, ack/2, - tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4, +-export([init/2, terminate/1, delete_and_terminate/1, + purge/1, publish/4, publish_delivered/5, fetch/2, ack/2, + tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1, dropwhile/2, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, @@ -42,7 +42,7 @@ -export([start/1, stop/0]). %% exported for testing only --export([start_msg_store/2, stop_msg_store/0, init/5]). +-export([start_msg_store/2, stop_msg_store/0, init/4]). %%---------------------------------------------------------------------------- %% Definitions: @@ -409,13 +409,14 @@ stop_msg_store() -> ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE), ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE). -init(QueueName, IsDurable, Recover) -> +init(Queue, Recover) -> Self = self(), - init(QueueName, IsDurable, Recover, + init(Queue, Recover, fun (Guids) -> msgs_written_to_disk(Self, Guids) end, fun (Guids) -> msg_indices_written_to_disk(Self, Guids) end). -init(QueueName, IsDurable, false, MsgOnDiskFun, MsgIdxOnDiskFun) -> +init(#amqqueue { name = QueueName, durable = IsDurable }, false, + MsgOnDiskFun, MsgIdxOnDiskFun) -> IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun), init(IsDurable, IndexState, 0, [], case IsDurable of @@ -425,7 +426,8 @@ init(QueueName, IsDurable, false, MsgOnDiskFun, MsgIdxOnDiskFun) -> end, msg_store_client_init(?TRANSIENT_MSG_STORE, undefined)); -init(QueueName, true, true, MsgOnDiskFun, MsgIdxOnDiskFun) -> +init(#amqqueue { name = QueueName }, true, + MsgOnDiskFun, MsgIdxOnDiskFun) -> Terms = rabbit_queue_index:shutdown_terms(QueueName), {PRef, TRef, Terms1} = case [persistent_ref, transient_ref] -- proplists:get_keys(Terms) of @@ -515,16 +517,18 @@ purge(State = #vqstate { q4 = Q4, ram_index_count = 0, persistent_count = PCount1 })}. -publish(Msg, MsgProps, State) -> +publish(Msg, MsgProps, _ChPid, State) -> {_SeqId, State1} = publish(Msg, MsgProps, false, false, State), a(reduce_memory_use(State1)). -publish_delivered(false, _Msg, _MsgProps, State = #vqstate { len = 0 }) -> +publish_delivered(false, _Msg, _MsgProps, _ChPid, + State = #vqstate { len = 0 }) -> {blank_ack, a(State)}; publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, + _ChPid, State = #vqstate { len = 0, next_seq_id = SeqId, out_counter = OutCount, @@ -665,8 +669,8 @@ ack(AckTags, State) -> {Guids, a(State1)}. tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps, - State = #vqstate { durable = IsDurable, - msg_store_clients = MSCState }) -> + _ChPid, State = #vqstate { durable = IsDurable, + msg_store_clients = MSCState }) -> Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }), case IsPersistent andalso IsDurable of |
