summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-04-08 16:05:08 +0100
committerMatthew Sackman <matthew@lshift.net>2010-04-08 16:05:08 +0100
commit332d920fa5d8c1ba3adb249f638ff07587bdcbf4 (patch)
tree18a6b2979669e00e446858a43b6e62937f0bc3e4
parent23e259db14f2cd45c2d76a157a96f3e166febb9d (diff)
downloadrabbitmq-server-git-332d920fa5d8c1ba3adb249f638ff07587bdcbf4.tar.gz
Pluggable queues land
-rw-r--r--ebin/rabbit_app.in1
-rw-r--r--include/rabbit_internal_queue_type_spec.hrl55
-rw-r--r--include/rabbit_queue.hrl44
-rw-r--r--src/rabbit_amqqueue_process.erl7
-rw-r--r--src/rabbit_internal_queue_type.erl97
-rw-r--r--src/rabbit_tests.erl45
-rw-r--r--src/rabbit_variable_queue.erl61
7 files changed, 202 insertions, 108 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 035fa0547d..a481af0863 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -19,6 +19,7 @@
{ssl_options, []},
{vm_memory_high_watermark, 0.4},
{msg_store_index_module, rabbit_msg_store_ets_index},
+ {queue_internal_queue_module, rabbit_variable_queue},
{default_user, <<"guest">>},
{default_pass, <<"guest">>},
{default_vhost, <<"/">>},
diff --git a/include/rabbit_internal_queue_type_spec.hrl b/include/rabbit_internal_queue_type_spec.hrl
new file mode 100644
index 0000000000..6409efb6f8
--- /dev/null
+++ b/include/rabbit_internal_queue_type_spec.hrl
@@ -0,0 +1,55 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-spec(init/2 :: (queue_name(), pid() | atom()) -> state()).
+-spec(terminate/1 :: (state()) -> state()).
+-spec(publish/2 :: (basic_message(), state()) -> state()).
+-spec(publish_delivered/2 :: (basic_message(), state()) -> {ack(), state()}).
+-spec(set_queue_ram_duration_target/2 ::
+ (('undefined' | 'infinity' | number()), state()) -> state()).
+-spec(remeasure_rates/1 :: (state()) -> state()).
+-spec(ram_duration/1 :: (state()) -> number()).
+-spec(fetch/1 :: (state()) ->
+ {('empty'|{basic_message(), boolean(), ack(), non_neg_integer()}),
+ state()}).
+-spec(ack/2 :: ([ack()], state()) -> state()).
+-spec(len/1 :: (state()) -> non_neg_integer()).
+-spec(is_empty/1 :: (state()) -> boolean()).
+-spec(purge/1 :: (state()) -> {non_neg_integer(), state()}).
+-spec(delete_and_terminate/1 :: (state()) -> state()).
+-spec(requeue/2 :: ([{basic_message(), ack()}], state()) -> state()).
+-spec(tx_publish/2 :: (basic_message(), state()) -> state()).
+-spec(tx_rollback/2 :: ([msg_id()], state()) -> state()).
+-spec(tx_commit/4 :: ([msg_id()], [ack()], {pid(), any()}, state()) ->
+ {boolean(), state()}).
+-spec(needs_sync/1 :: (state()) -> ('undefined' | {atom(), [any()]})).
+-spec(handle_pre_hibernate/1 :: (state()) -> state()).
+-spec(status/1 :: (state()) -> [{atom(), any()}]).
diff --git a/include/rabbit_queue.hrl b/include/rabbit_queue.hrl
deleted file mode 100644
index 66966ba80b..0000000000
--- a/include/rabbit_queue.hrl
+++ /dev/null
@@ -1,44 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2010 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2010 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--record(delta,
- { start_seq_id,
- count,
- end_seq_id %% note the end_seq_id is always >, not >=
- }).
-
--ifdef(use_specs).
-
--type(delta() :: #delta { start_seq_id :: non_neg_integer(),
- count :: non_neg_integer (),
- end_seq_id :: non_neg_integer() }).
-
--endif.
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 33ea625c55..4c42b0ef84 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -114,11 +114,14 @@ init(Q) ->
rabbit_amqqueue, set_maximum_since_use, [self()]),
ok = rabbit_memory_monitor:register
(self(), {rabbit_amqqueue, set_queue_duration, [self()]}),
+ {ok, InternalQueueModule} =
+ application:get_env(queue_internal_queue_module),
+
{ok, #q{q = Q,
owner = none,
exclusive_consumer = none,
has_had_consumers = false,
- internal_queue = rabbit_variable_queue,
+ internal_queue = InternalQueueModule,
internal_queue_state = undefined,
internal_queue_timeout_fun = undefined,
next_msg_id = 1,
@@ -387,7 +390,7 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{internal_queue = IQ}) ->
{true, NewState};
{false, NewState} ->
%% Txn is none and no unblocked channels with consumers
- {_SeqId, IQS} = IQ:publish(Message, State #q.internal_queue_state),
+ IQS = IQ:publish(Message, State #q.internal_queue_state),
{false, NewState #q { internal_queue_state = IQS }}
end.
diff --git a/src/rabbit_internal_queue_type.erl b/src/rabbit_internal_queue_type.erl
new file mode 100644
index 0000000000..4ee4556a48
--- /dev/null
+++ b/src/rabbit_internal_queue_type.erl
@@ -0,0 +1,97 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_internal_queue_type).
+
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) ->
+ [
+ %% Called with queue name and the persistent msg_store to
+ %% use. Transient store is in ?TRANSIENT_MSG_STORE
+ {init, 2},
+
+ %% Called on queue shutdown when queue isn't being deleted
+ {terminate, 1},
+
+ %% Called when the queue is terminating and needs to delete all
+ %% its content.
+ {delete_and_terminate, 1},
+
+ %% Remove all messages in the queue, but not messages which have
+ %% been fetched and are pending acks.
+ {purge, 1},
+
+ %% Publish a message
+ {publish, 2},
+
+ %% Called for messages which have already been passed straight
+ %% out to a client. The queue will be empty for these calls
+ %% (i.e. saves the round trip through the internal queue).
+ {publish_delivered, 2},
+
+ {fetch, 1},
+
+ {ack, 2},
+
+ {tx_publish, 2},
+ {tx_rollback, 2},
+ {tx_commit, 4},
+
+ %% Reinsert messages into the queue which have already been
+ %% delivered and were (likely) pending acks.q
+ {requeue, 2},
+
+ {len, 1},
+
+ {is_empty, 1},
+
+ {set_queue_ram_duration_target, 2},
+
+ {remeasure_rates, 1},
+
+ {ram_duration, 1},
+
+ %% Can return 'undefined' or a function atom name plus list of
+ %% arguments to be invoked in the internal queue module as soon
+ %% as the queue process can manage (either on an empty mailbox,
+ %% or when a timer fires).
+ {needs_sync, 1},
+
+ %% Called immediately before the queue hibernates
+ {handle_pre_hibernate, 1},
+
+ %% Exists for debugging purposes, to be able to expose state via
+ %% rabbitmqctl list_queues internal_queue_status
+ {status, 1}
+ ];
+behaviour_info(_Other) ->
+ undefined.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 22138bf112..838c5f9c58 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -41,7 +41,6 @@
-import(lists).
-include("rabbit.hrl").
--include("rabbit_queue.hrl").
-include_lib("kernel/include/file.hrl").
test_content_prop_roundtrip(Datum, Binary) ->
@@ -1348,14 +1347,13 @@ test_queue_index() ->
variable_queue_publish(IsPersistent, Count, VQ) ->
lists:foldl(
- fun (_N, {Acc, VQ1}) ->
- {SeqId, VQ2} = rabbit_variable_queue:publish(
- rabbit_basic:message(
- rabbit_misc:r(<<>>, exchange, <<>>),
- <<>>, [], <<>>, rabbit_guid:guid(),
- IsPersistent), VQ1),
- {[SeqId | Acc], VQ2}
- end, {[], VQ}, lists:seq(1, Count)).
+ fun (_N, VQN) ->
+ rabbit_variable_queue:publish(
+ rabbit_basic:message(
+ rabbit_misc:r(<<>>, exchange, <<>>),
+ <<>>, [], <<>>, rabbit_guid:guid(),
+ IsPersistent), VQN)
+ end, VQ, lists:seq(1, Count)).
variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
lists:foldl(fun (N, {VQN, AckTagsAcc}) ->
@@ -1377,9 +1375,7 @@ fresh_variable_queue() ->
assert_prop(S0, len, 0),
assert_prop(S0, q1, 0),
assert_prop(S0, q2, 0),
- assert_prop(S0, delta, #delta { start_seq_id = undefined,
- count = 0,
- end_seq_id = undefined }),
+ assert_prop(S0, delta, {delta, undefined, 0, undefined}),
assert_prop(S0, q3, 0),
assert_prop(S0, q4, 0),
VQ.
@@ -1394,7 +1390,7 @@ test_variable_queue_dynamic_duration_change() ->
VQ0 = fresh_variable_queue(),
%% start by sending in a couple of segments worth
Len1 = 2*SegmentSize,
- {_SeqIds, VQ1} = variable_queue_publish(false, Len1, VQ0),
+ VQ1 = variable_queue_publish(false, Len1, VQ0),
VQ2 = rabbit_variable_queue:remeasure_rates(VQ1),
{ok, _TRef} = timer:send_after(1000, {duration, 60,
fun (V) -> (V*0.75)-1 end}),
@@ -1406,7 +1402,7 @@ test_variable_queue_dynamic_duration_change() ->
%% just publish and fetch some persistent msgs, this hits the the
%% partial segment path in queue_index due to the period when
%% duration was 0 and the entire queue was delta.
- {_SeqIds1, VQ7} = variable_queue_publish(true, 20, VQ6),
+ VQ7 = variable_queue_publish(true, 20, VQ6),
{VQ8, AckTags1} = variable_queue_fetch(20, true, false, 20, VQ7),
VQ9 = rabbit_variable_queue:ack(AckTags1, VQ8),
VQ10 = rabbit_variable_queue:handle_pre_hibernate(VQ9),
@@ -1417,7 +1413,7 @@ test_variable_queue_dynamic_duration_change() ->
passed.
test_variable_queue_dynamic_duration_change_f(Len, VQ0) ->
- {_SeqIds, VQ1} = variable_queue_publish(false, 1, VQ0),
+ VQ1 = variable_queue_publish(false, 1, VQ0),
{{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(VQ1),
VQ3 = rabbit_variable_queue:ack([AckTag], VQ2),
receive
@@ -1444,27 +1440,24 @@ test_variable_queue_partial_segments_delta_thing() ->
SegmentSize = rabbit_queue_index:segment_size(),
HalfSegment = SegmentSize div 2,
VQ0 = fresh_variable_queue(),
- {_SeqIds, VQ1} =
- variable_queue_publish(true, SegmentSize + HalfSegment, VQ0),
+ VQ1 = variable_queue_publish(true, SegmentSize + HalfSegment, VQ0),
VQ2 = rabbit_variable_queue:remeasure_rates(VQ1),
VQ3 = rabbit_variable_queue:set_queue_ram_duration_target(0, VQ2),
%% one segment in q3 as betas, and half a segment in delta
S3 = rabbit_variable_queue:status(VQ3),
io:format("~p~n", [S3]),
- assert_prop(S3, delta, #delta { start_seq_id = SegmentSize,
- count = HalfSegment,
- end_seq_id = SegmentSize + HalfSegment }),
+ assert_prop(S3, delta, {delta, SegmentSize, HalfSegment,
+ SegmentSize + HalfSegment}),
assert_prop(S3, q3, SegmentSize),
assert_prop(S3, len, SegmentSize + HalfSegment),
VQ4 = rabbit_variable_queue:set_queue_ram_duration_target(infinity, VQ3),
- {[_SeqId], VQ5} = variable_queue_publish(true, 1, VQ4),
+ VQ5 = variable_queue_publish(true, 1, VQ4),
%% should have 1 alpha, but it's in the same segment as the deltas
S5 = rabbit_variable_queue:status(VQ5),
io:format("~p~n", [S5]),
assert_prop(S5, q1, 1),
- assert_prop(S5, delta, #delta { start_seq_id = SegmentSize,
- count = HalfSegment,
- end_seq_id = SegmentSize + HalfSegment }),
+ assert_prop(S5, delta, {delta, SegmentSize, HalfSegment,
+ SegmentSize + HalfSegment}),
assert_prop(S5, q3, SegmentSize),
assert_prop(S5, len, SegmentSize + HalfSegment + 1),
{VQ6, AckTags} = variable_queue_fetch(SegmentSize, true, false,
@@ -1472,9 +1465,7 @@ test_variable_queue_partial_segments_delta_thing() ->
%% the half segment should now be in q3 as betas
S6 = rabbit_variable_queue:status(VQ6),
io:format("~p~n", [S6]),
- assert_prop(S6, delta, #delta { start_seq_id = undefined,
- count = 0,
- end_seq_id = undefined }),
+ assert_prop(S6, delta, {delta, undefined, 0, undefined}),
assert_prop(S6, q1, 1),
assert_prop(S6, q3, HalfSegment),
assert_prop(S6, len, HalfSegment + 1),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 1934fafc3e..297c3ef401 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -133,6 +133,8 @@
%%----------------------------------------------------------------------------
+-behaviour(rabbit_internal_queue_type).
+
-record(vqstate,
{ q1,
q2,
@@ -162,7 +164,6 @@
}).
-include("rabbit.hrl").
--include("rabbit_queue.hrl").
-record(msg_status,
{ msg,
@@ -174,6 +175,12 @@
index_on_disk
}).
+-record(delta,
+ { start_seq_id,
+ count,
+ end_seq_id %% note the end_seq_id is always >, not >=
+ }).
+
%% When we discover, on publish, that we should write some indices to
%% disk for some betas, the RAM_INDEX_BATCH_SIZE sets the number of
%% betas that we must be due to write indices for before we do any
@@ -187,12 +194,17 @@
-ifdef(use_specs).
+-type(msg_id() :: binary()).
-type(bpqueue() :: any()).
--type(msg_id() :: binary()).
-type(seq_id() :: non_neg_integer()).
-type(ack() :: {'ack_index_and_store', msg_id(), seq_id(), atom() | pid()}
| 'ack_not_on_disk').
--type(vqstate() :: #vqstate {
+
+-type(delta() :: #delta { start_seq_id :: non_neg_integer(),
+ count :: non_neg_integer (),
+ end_seq_id :: non_neg_integer() }).
+
+-type(state() :: #vqstate {
q1 :: queue(),
q2 :: bpqueue(),
delta :: delta(),
@@ -220,36 +232,12 @@
transient_threshold :: non_neg_integer()
}).
--spec(init/2 :: (queue_name(), pid() | atom()) -> vqstate()).
--spec(terminate/1 :: (vqstate()) -> vqstate()).
--spec(publish/2 :: (basic_message(), vqstate()) ->
- {seq_id(), vqstate()}).
--spec(publish_delivered/2 :: (basic_message(), vqstate()) ->
- {ack(), vqstate()}).
--spec(set_queue_ram_duration_target/2 ::
- (('undefined' | 'infinity' | number()), vqstate()) -> vqstate()).
--spec(remeasure_rates/1 :: (vqstate()) -> vqstate()).
--spec(ram_duration/1 :: (vqstate()) -> number()).
--spec(fetch/1 :: (vqstate()) ->
- {('empty'|{basic_message(), boolean(), ack(), non_neg_integer()}),
- vqstate()}).
--spec(ack/2 :: ([ack()], vqstate()) -> vqstate()).
--spec(len/1 :: (vqstate()) -> non_neg_integer()).
--spec(is_empty/1 :: (vqstate()) -> boolean()).
--spec(purge/1 :: (vqstate()) -> {non_neg_integer(), vqstate()}).
--spec(delete_and_terminate/1 :: (vqstate()) -> vqstate()).
--spec(requeue/2 :: ([{basic_message(), ack()}], vqstate()) -> vqstate()).
--spec(tx_publish/2 :: (basic_message(), vqstate()) -> vqstate()).
--spec(tx_rollback/2 :: ([msg_id()], vqstate()) -> vqstate()).
--spec(tx_commit/4 :: ([msg_id()], [ack()], {pid(), any()}, vqstate()) ->
- {boolean(), vqstate()}).
-spec(tx_commit_post_msg_store/5 ::
- (boolean(), [msg_id()], [ack()], {pid(), any()}, vqstate()) ->
- {boolean(), vqstate()}).
--spec(tx_commit_index/1 :: (vqstate()) -> {boolean(), vqstate()}).
--spec(needs_sync/1 :: (vqstate()) -> ('undefined' | {atom(), [any()]})).
--spec(handle_pre_hibernate/1 :: (vqstate()) -> vqstate()).
--spec(status/1 :: (vqstate()) -> [{atom(), any()}]).
+ (boolean(), [msg_id()], [ack()], {pid(), any()}, state()) ->
+ {boolean(), state()}).
+-spec(tx_commit_index/1 :: (state()) -> {boolean(), state()}).
+
+-include("rabbit_internal_queue_type_spec.hrl").
-endif.
@@ -321,7 +309,8 @@ terminate(State = #vqstate {
publish(Msg, State) ->
State1 = limit_ram_index(State),
- publish(Msg, false, false, State1).
+ {_SeqId, State2} = publish(Msg, false, false, State1),
+ State2.
publish_delivered(Msg = #basic_message { guid = MsgId,
is_persistent = IsPersistent },
@@ -553,7 +542,8 @@ requeue(MsgsWithAckTags, State) ->
rabbit_misc:dict_cons(MsgStore, MsgId, Dict),
true}
end,
- {_SeqId, StateN1} = publish(Msg, true, MsgOnDisk, StateN),
+ {_SeqId, StateN1} =
+ publish(Msg, true, MsgOnDisk, StateN),
{SeqIdsAcc1, Dict1, StateN1}
end, {[], dict:new(), State}, MsgsWithAckTags),
IndexState1 =
@@ -648,7 +638,8 @@ tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFroms},
lists:foldl(
fun (Msg = #basic_message { is_persistent = IsPersistent },
{SeqIdsAcc, StateN}) ->
- {SeqId, StateN1} = publish(Msg, false, IsPersistent, StateN),
+ {SeqId, StateN1} =
+ publish(Msg, false, IsPersistent, StateN),
{case IsPersistentStore andalso IsPersistent of
true -> [SeqId | SeqIdsAcc];
false -> SeqIdsAcc