diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-04-08 16:05:08 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-04-08 16:05:08 +0100 |
| commit | 332d920fa5d8c1ba3adb249f638ff07587bdcbf4 (patch) | |
| tree | 18a6b2979669e00e446858a43b6e62937f0bc3e4 | |
| parent | 23e259db14f2cd45c2d76a157a96f3e166febb9d (diff) | |
| download | rabbitmq-server-git-332d920fa5d8c1ba3adb249f638ff07587bdcbf4.tar.gz | |
Pluggable queues land
| -rw-r--r-- | ebin/rabbit_app.in | 1 | ||||
| -rw-r--r-- | include/rabbit_internal_queue_type_spec.hrl | 55 | ||||
| -rw-r--r-- | include/rabbit_queue.hrl | 44 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_internal_queue_type.erl | 97 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 45 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 61 |
7 files changed, 202 insertions, 108 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 035fa0547d..a481af0863 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -19,6 +19,7 @@ {ssl_options, []}, {vm_memory_high_watermark, 0.4}, {msg_store_index_module, rabbit_msg_store_ets_index}, + {queue_internal_queue_module, rabbit_variable_queue}, {default_user, <<"guest">>}, {default_pass, <<"guest">>}, {default_vhost, <<"/">>}, diff --git a/include/rabbit_internal_queue_type_spec.hrl b/include/rabbit_internal_queue_type_spec.hrl new file mode 100644 index 0000000000..6409efb6f8 --- /dev/null +++ b/include/rabbit_internal_queue_type_spec.hrl @@ -0,0 +1,55 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-spec(init/2 :: (queue_name(), pid() | atom()) -> state()). +-spec(terminate/1 :: (state()) -> state()). +-spec(publish/2 :: (basic_message(), state()) -> state()). +-spec(publish_delivered/2 :: (basic_message(), state()) -> {ack(), state()}). +-spec(set_queue_ram_duration_target/2 :: + (('undefined' | 'infinity' | number()), state()) -> state()). +-spec(remeasure_rates/1 :: (state()) -> state()). +-spec(ram_duration/1 :: (state()) -> number()). +-spec(fetch/1 :: (state()) -> + {('empty'|{basic_message(), boolean(), ack(), non_neg_integer()}), + state()}). +-spec(ack/2 :: ([ack()], state()) -> state()). +-spec(len/1 :: (state()) -> non_neg_integer()). +-spec(is_empty/1 :: (state()) -> boolean()). +-spec(purge/1 :: (state()) -> {non_neg_integer(), state()}). +-spec(delete_and_terminate/1 :: (state()) -> state()). +-spec(requeue/2 :: ([{basic_message(), ack()}], state()) -> state()). +-spec(tx_publish/2 :: (basic_message(), state()) -> state()). +-spec(tx_rollback/2 :: ([msg_id()], state()) -> state()). +-spec(tx_commit/4 :: ([msg_id()], [ack()], {pid(), any()}, state()) -> + {boolean(), state()}). +-spec(needs_sync/1 :: (state()) -> ('undefined' | {atom(), [any()]})). +-spec(handle_pre_hibernate/1 :: (state()) -> state()). +-spec(status/1 :: (state()) -> [{atom(), any()}]). diff --git a/include/rabbit_queue.hrl b/include/rabbit_queue.hrl deleted file mode 100644 index 66966ba80b..0000000000 --- a/include/rabbit_queue.hrl +++ /dev/null @@ -1,44 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2010 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2010 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --record(delta, - { start_seq_id, - count, - end_seq_id %% note the end_seq_id is always >, not >= - }). - --ifdef(use_specs). - --type(delta() :: #delta { start_seq_id :: non_neg_integer(), - count :: non_neg_integer (), - end_seq_id :: non_neg_integer() }). - --endif. diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 33ea625c55..4c42b0ef84 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -114,11 +114,14 @@ init(Q) -> rabbit_amqqueue, set_maximum_since_use, [self()]), ok = rabbit_memory_monitor:register (self(), {rabbit_amqqueue, set_queue_duration, [self()]}), + {ok, InternalQueueModule} = + application:get_env(queue_internal_queue_module), + {ok, #q{q = Q, owner = none, exclusive_consumer = none, has_had_consumers = false, - internal_queue = rabbit_variable_queue, + internal_queue = InternalQueueModule, internal_queue_state = undefined, internal_queue_timeout_fun = undefined, next_msg_id = 1, @@ -387,7 +390,7 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{internal_queue = IQ}) -> {true, NewState}; {false, NewState} -> %% Txn is none and no unblocked channels with consumers - {_SeqId, IQS} = IQ:publish(Message, State #q.internal_queue_state), + IQS = IQ:publish(Message, State #q.internal_queue_state), {false, NewState #q { internal_queue_state = IQS }} end. diff --git a/src/rabbit_internal_queue_type.erl b/src/rabbit_internal_queue_type.erl new file mode 100644 index 0000000000..4ee4556a48 --- /dev/null +++ b/src/rabbit_internal_queue_type.erl @@ -0,0 +1,97 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_internal_queue_type). + +-export([behaviour_info/1]). + +behaviour_info(callbacks) -> + [ + %% Called with queue name and the persistent msg_store to + %% use. Transient store is in ?TRANSIENT_MSG_STORE + {init, 2}, + + %% Called on queue shutdown when queue isn't being deleted + {terminate, 1}, + + %% Called when the queue is terminating and needs to delete all + %% its content. + {delete_and_terminate, 1}, + + %% Remove all messages in the queue, but not messages which have + %% been fetched and are pending acks. + {purge, 1}, + + %% Publish a message + {publish, 2}, + + %% Called for messages which have already been passed straight + %% out to a client. The queue will be empty for these calls + %% (i.e. saves the round trip through the internal queue). + {publish_delivered, 2}, + + {fetch, 1}, + + {ack, 2}, + + {tx_publish, 2}, + {tx_rollback, 2}, + {tx_commit, 4}, + + %% Reinsert messages into the queue which have already been + %% delivered and were (likely) pending acks.q + {requeue, 2}, + + {len, 1}, + + {is_empty, 1}, + + {set_queue_ram_duration_target, 2}, + + {remeasure_rates, 1}, + + {ram_duration, 1}, + + %% Can return 'undefined' or a function atom name plus list of + %% arguments to be invoked in the internal queue module as soon + %% as the queue process can manage (either on an empty mailbox, + %% or when a timer fires). + {needs_sync, 1}, + + %% Called immediately before the queue hibernates + {handle_pre_hibernate, 1}, + + %% Exists for debugging purposes, to be able to expose state via + %% rabbitmqctl list_queues internal_queue_status + {status, 1} + ]; +behaviour_info(_Other) -> + undefined. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 22138bf112..838c5f9c58 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -41,7 +41,6 @@ -import(lists). -include("rabbit.hrl"). --include("rabbit_queue.hrl"). -include_lib("kernel/include/file.hrl"). test_content_prop_roundtrip(Datum, Binary) -> @@ -1348,14 +1347,13 @@ test_queue_index() -> variable_queue_publish(IsPersistent, Count, VQ) -> lists:foldl( - fun (_N, {Acc, VQ1}) -> - {SeqId, VQ2} = rabbit_variable_queue:publish( - rabbit_basic:message( - rabbit_misc:r(<<>>, exchange, <<>>), - <<>>, [], <<>>, rabbit_guid:guid(), - IsPersistent), VQ1), - {[SeqId | Acc], VQ2} - end, {[], VQ}, lists:seq(1, Count)). + fun (_N, VQN) -> + rabbit_variable_queue:publish( + rabbit_basic:message( + rabbit_misc:r(<<>>, exchange, <<>>), + <<>>, [], <<>>, rabbit_guid:guid(), + IsPersistent), VQN) + end, VQ, lists:seq(1, Count)). variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> lists:foldl(fun (N, {VQN, AckTagsAcc}) -> @@ -1377,9 +1375,7 @@ fresh_variable_queue() -> assert_prop(S0, len, 0), assert_prop(S0, q1, 0), assert_prop(S0, q2, 0), - assert_prop(S0, delta, #delta { start_seq_id = undefined, - count = 0, - end_seq_id = undefined }), + assert_prop(S0, delta, {delta, undefined, 0, undefined}), assert_prop(S0, q3, 0), assert_prop(S0, q4, 0), VQ. @@ -1394,7 +1390,7 @@ test_variable_queue_dynamic_duration_change() -> VQ0 = fresh_variable_queue(), %% start by sending in a couple of segments worth Len1 = 2*SegmentSize, - {_SeqIds, VQ1} = variable_queue_publish(false, Len1, VQ0), + VQ1 = variable_queue_publish(false, Len1, VQ0), VQ2 = rabbit_variable_queue:remeasure_rates(VQ1), {ok, _TRef} = timer:send_after(1000, {duration, 60, fun (V) -> (V*0.75)-1 end}), @@ -1406,7 +1402,7 @@ test_variable_queue_dynamic_duration_change() -> %% just publish and fetch some persistent msgs, this hits the the %% partial segment path in queue_index due to the period when %% duration was 0 and the entire queue was delta. - {_SeqIds1, VQ7} = variable_queue_publish(true, 20, VQ6), + VQ7 = variable_queue_publish(true, 20, VQ6), {VQ8, AckTags1} = variable_queue_fetch(20, true, false, 20, VQ7), VQ9 = rabbit_variable_queue:ack(AckTags1, VQ8), VQ10 = rabbit_variable_queue:handle_pre_hibernate(VQ9), @@ -1417,7 +1413,7 @@ test_variable_queue_dynamic_duration_change() -> passed. test_variable_queue_dynamic_duration_change_f(Len, VQ0) -> - {_SeqIds, VQ1} = variable_queue_publish(false, 1, VQ0), + VQ1 = variable_queue_publish(false, 1, VQ0), {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(VQ1), VQ3 = rabbit_variable_queue:ack([AckTag], VQ2), receive @@ -1444,27 +1440,24 @@ test_variable_queue_partial_segments_delta_thing() -> SegmentSize = rabbit_queue_index:segment_size(), HalfSegment = SegmentSize div 2, VQ0 = fresh_variable_queue(), - {_SeqIds, VQ1} = - variable_queue_publish(true, SegmentSize + HalfSegment, VQ0), + VQ1 = variable_queue_publish(true, SegmentSize + HalfSegment, VQ0), VQ2 = rabbit_variable_queue:remeasure_rates(VQ1), VQ3 = rabbit_variable_queue:set_queue_ram_duration_target(0, VQ2), %% one segment in q3 as betas, and half a segment in delta S3 = rabbit_variable_queue:status(VQ3), io:format("~p~n", [S3]), - assert_prop(S3, delta, #delta { start_seq_id = SegmentSize, - count = HalfSegment, - end_seq_id = SegmentSize + HalfSegment }), + assert_prop(S3, delta, {delta, SegmentSize, HalfSegment, + SegmentSize + HalfSegment}), assert_prop(S3, q3, SegmentSize), assert_prop(S3, len, SegmentSize + HalfSegment), VQ4 = rabbit_variable_queue:set_queue_ram_duration_target(infinity, VQ3), - {[_SeqId], VQ5} = variable_queue_publish(true, 1, VQ4), + VQ5 = variable_queue_publish(true, 1, VQ4), %% should have 1 alpha, but it's in the same segment as the deltas S5 = rabbit_variable_queue:status(VQ5), io:format("~p~n", [S5]), assert_prop(S5, q1, 1), - assert_prop(S5, delta, #delta { start_seq_id = SegmentSize, - count = HalfSegment, - end_seq_id = SegmentSize + HalfSegment }), + assert_prop(S5, delta, {delta, SegmentSize, HalfSegment, + SegmentSize + HalfSegment}), assert_prop(S5, q3, SegmentSize), assert_prop(S5, len, SegmentSize + HalfSegment + 1), {VQ6, AckTags} = variable_queue_fetch(SegmentSize, true, false, @@ -1472,9 +1465,7 @@ test_variable_queue_partial_segments_delta_thing() -> %% the half segment should now be in q3 as betas S6 = rabbit_variable_queue:status(VQ6), io:format("~p~n", [S6]), - assert_prop(S6, delta, #delta { start_seq_id = undefined, - count = 0, - end_seq_id = undefined }), + assert_prop(S6, delta, {delta, undefined, 0, undefined}), assert_prop(S6, q1, 1), assert_prop(S6, q3, HalfSegment), assert_prop(S6, len, HalfSegment + 1), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 1934fafc3e..297c3ef401 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -133,6 +133,8 @@ %%---------------------------------------------------------------------------- +-behaviour(rabbit_internal_queue_type). + -record(vqstate, { q1, q2, @@ -162,7 +164,6 @@ }). -include("rabbit.hrl"). --include("rabbit_queue.hrl"). -record(msg_status, { msg, @@ -174,6 +175,12 @@ index_on_disk }). +-record(delta, + { start_seq_id, + count, + end_seq_id %% note the end_seq_id is always >, not >= + }). + %% When we discover, on publish, that we should write some indices to %% disk for some betas, the RAM_INDEX_BATCH_SIZE sets the number of %% betas that we must be due to write indices for before we do any @@ -187,12 +194,17 @@ -ifdef(use_specs). +-type(msg_id() :: binary()). -type(bpqueue() :: any()). --type(msg_id() :: binary()). -type(seq_id() :: non_neg_integer()). -type(ack() :: {'ack_index_and_store', msg_id(), seq_id(), atom() | pid()} | 'ack_not_on_disk'). --type(vqstate() :: #vqstate { + +-type(delta() :: #delta { start_seq_id :: non_neg_integer(), + count :: non_neg_integer (), + end_seq_id :: non_neg_integer() }). + +-type(state() :: #vqstate { q1 :: queue(), q2 :: bpqueue(), delta :: delta(), @@ -220,36 +232,12 @@ transient_threshold :: non_neg_integer() }). --spec(init/2 :: (queue_name(), pid() | atom()) -> vqstate()). --spec(terminate/1 :: (vqstate()) -> vqstate()). --spec(publish/2 :: (basic_message(), vqstate()) -> - {seq_id(), vqstate()}). --spec(publish_delivered/2 :: (basic_message(), vqstate()) -> - {ack(), vqstate()}). --spec(set_queue_ram_duration_target/2 :: - (('undefined' | 'infinity' | number()), vqstate()) -> vqstate()). --spec(remeasure_rates/1 :: (vqstate()) -> vqstate()). --spec(ram_duration/1 :: (vqstate()) -> number()). --spec(fetch/1 :: (vqstate()) -> - {('empty'|{basic_message(), boolean(), ack(), non_neg_integer()}), - vqstate()}). --spec(ack/2 :: ([ack()], vqstate()) -> vqstate()). --spec(len/1 :: (vqstate()) -> non_neg_integer()). --spec(is_empty/1 :: (vqstate()) -> boolean()). --spec(purge/1 :: (vqstate()) -> {non_neg_integer(), vqstate()}). --spec(delete_and_terminate/1 :: (vqstate()) -> vqstate()). --spec(requeue/2 :: ([{basic_message(), ack()}], vqstate()) -> vqstate()). --spec(tx_publish/2 :: (basic_message(), vqstate()) -> vqstate()). --spec(tx_rollback/2 :: ([msg_id()], vqstate()) -> vqstate()). --spec(tx_commit/4 :: ([msg_id()], [ack()], {pid(), any()}, vqstate()) -> - {boolean(), vqstate()}). -spec(tx_commit_post_msg_store/5 :: - (boolean(), [msg_id()], [ack()], {pid(), any()}, vqstate()) -> - {boolean(), vqstate()}). --spec(tx_commit_index/1 :: (vqstate()) -> {boolean(), vqstate()}). --spec(needs_sync/1 :: (vqstate()) -> ('undefined' | {atom(), [any()]})). --spec(handle_pre_hibernate/1 :: (vqstate()) -> vqstate()). --spec(status/1 :: (vqstate()) -> [{atom(), any()}]). + (boolean(), [msg_id()], [ack()], {pid(), any()}, state()) -> + {boolean(), state()}). +-spec(tx_commit_index/1 :: (state()) -> {boolean(), state()}). + +-include("rabbit_internal_queue_type_spec.hrl"). -endif. @@ -321,7 +309,8 @@ terminate(State = #vqstate { publish(Msg, State) -> State1 = limit_ram_index(State), - publish(Msg, false, false, State1). + {_SeqId, State2} = publish(Msg, false, false, State1), + State2. publish_delivered(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, @@ -553,7 +542,8 @@ requeue(MsgsWithAckTags, State) -> rabbit_misc:dict_cons(MsgStore, MsgId, Dict), true} end, - {_SeqId, StateN1} = publish(Msg, true, MsgOnDisk, StateN), + {_SeqId, StateN1} = + publish(Msg, true, MsgOnDisk, StateN), {SeqIdsAcc1, Dict1, StateN1} end, {[], dict:new(), State}, MsgsWithAckTags), IndexState1 = @@ -648,7 +638,8 @@ tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFroms}, lists:foldl( fun (Msg = #basic_message { is_persistent = IsPersistent }, {SeqIdsAcc, StateN}) -> - {SeqId, StateN1} = publish(Msg, false, IsPersistent, StateN), + {SeqId, StateN1} = + publish(Msg, false, IsPersistent, StateN), {case IsPersistentStore andalso IsPersistent of true -> [SeqId | SeqIdsAcc]; false -> SeqIdsAcc |
