summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-10-16 17:39:53 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2012-10-16 17:39:53 +0100
commita00e05261cc178cf967b7a4917e3c094a966e489 (patch)
tree6b7d050da7cc168780b75b18c851464a330fe507
parent78690e72efb16c5d777af3fab4f16048d2d1a34d (diff)
downloadrabbitmq-server-git-a00e05261cc178cf967b7a4917e3c094a966e489.tar.gz
publish_delivered(false, ...) -> discard
The two really are the same. Since 'discard' doesn't need the message or props, there is less stuff to pass around over GM.
-rw-r--r--src/rabbit_amqqueue_process.erl29
-rw-r--r--src/rabbit_backing_queue.erl26
-rw-r--r--src/rabbit_mirror_queue_master.erl59
-rw-r--r--src/rabbit_mirror_queue_slave.erl14
-rw-r--r--src/rabbit_variable_queue.erl21
5 files changed, 69 insertions, 80 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 7ce6958223..9def64ff25 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -513,6 +513,14 @@ send_or_record_confirm(#delivery{sender = SenderPid,
rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]),
{immediately, State}.
+discard(#delivery{sender = SenderPid, message = #basic_message{id = MsgId}},
+ State) ->
+ %% fake an 'eventual' confirm from BQ; noop if not needed
+ State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
+ confirm_messages([MsgId], State),
+ BQS1 = BQ:discard(MsgId, SenderPid, BQS),
+ State1#q{backing_queue_state = BQS1}.
+
run_message_queue(State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
drop_expired_messages(State),
@@ -521,17 +529,20 @@ run_message_queue(State) ->
BQ:is_empty(BQS), State1),
State2.
-attempt_delivery(#delivery{sender = SenderPid, message = Message}, Props,
+attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
+ Props = #message_properties{delivered = Delivered},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
case BQ:is_duplicate(Message, BQS) of
{false, BQS1} ->
deliver_msgs_to_consumers(
- fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) ->
+ fun (true, State1 = #q{backing_queue_state = BQS2}) ->
{AckTag, BQS3} = BQ:publish_delivered(
- AckRequired, Message, Props,
- SenderPid, BQS2),
- {{Message, Props#message_properties.delivered, AckTag},
- true, State1#q{backing_queue_state = BQS3}}
+ Message, Props, SenderPid, BQS2),
+ {{Message, Delivered, AckTag},
+ true, State1#q{backing_queue_state = BQS3}};
+ (false, State1) ->
+ {{Message, Delivered, undefined},
+ true, discard(Delivery, State1)}
end, false, State#q{backing_queue_state = BQS1});
{published, BQS1} ->
{true, State#q{backing_queue_state = BQS1}};
@@ -548,11 +559,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
State2;
%% The next one is an optimisation
{false, State2 = #q{ttl = 0, dlx = undefined}} ->
- %% fake an 'eventual' confirm from BQ; noop if not needed
- State3 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- confirm_messages([Message#basic_message.id], State2),
- BQS1 = BQ:discard(Message, SenderPid, BQS),
- State3#q{backing_queue_state = BQS1};
+ discard(Delivery, State2);
{false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
ensure_ttl_timer(Props#message_properties.expiry,
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index c6d1778532..f257c9776f 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -84,12 +84,16 @@
%% Called for messages which have already been passed straight
%% out to a client. The queue will be empty for these calls
%% (i.e. saves the round trip through the backing queue).
--callback publish_delivered(true, rabbit_types:basic_message(),
+-callback publish_delivered(rabbit_types:basic_message(),
rabbit_types:message_properties(), pid(), state())
- -> {ack(), state()};
- (false, rabbit_types:basic_message(),
- rabbit_types:message_properties(), pid(), state())
- -> {undefined, state()}.
+ -> {ack(), state()}.
+
+%% Called to inform the BQ about messages which have reached the
+%% queue, but are not going to be further passed to BQ for some
+%% reason. Note that this is may be invoked for messages for which
+%% BQ:is_duplicate/2 has already returned {'published' | 'discarded',
+%% BQS}.
+-callback discard(rabbit_types:msg_id(), pid(), state()) -> state().
%% Return ids of messages which have been confirmed since the last
%% invocation of this function (or initialisation).
@@ -200,13 +204,6 @@
-callback is_duplicate(rabbit_types:basic_message(), state())
-> {'false'|'published'|'discarded', state()}.
-%% Called to inform the BQ about messages which have reached the
-%% queue, but are not going to be further passed to BQ for some
-%% reason. Note that this is may be invoked for messages for which
-%% BQ:is_duplicate/2 has already returned {'published' | 'discarded',
-%% BQS}.
--callback discard(rabbit_types:basic_message(), pid(), state()) -> state().
-
-else.
-export([behaviour_info/1]).
@@ -214,12 +211,11 @@
behaviour_info(callbacks) ->
[{start, 1}, {stop, 0}, {init, 3}, {terminate, 2},
{delete_and_terminate, 2}, {purge, 1}, {publish, 4},
- {publish_delivered, 5}, {drain_confirmed, 1}, {dropwhile, 3},
+ {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, {dropwhile, 3},
{fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
{ram_duration, 1}, {needs_timeout, 1}, {timeout, 1},
- {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2},
- {discard, 3}];
+ {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ;
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 377d51868d..cce19c907a 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -17,11 +17,11 @@
-module(rabbit_mirror_queue_master).
-export([init/3, terminate/2, delete_and_terminate/2,
- purge/1, publish/4, publish_delivered/5, fetch/2, ack/2,
+ purge/1, publish/4, publish_delivered/4, discard/3, fetch/2, ack/2,
requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/3, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
- status/1, invoke/3, is_duplicate/2, discard/3, fold/3]).
+ status/1, invoke/3, is_duplicate/2, fold/3]).
-export([start/1, stop/0]).
@@ -183,25 +183,42 @@ publish(Msg = #basic_message { id = MsgId }, MsgProps, ChPid,
backing_queue = BQ,
backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
- ok = gm:broadcast(GM, {publish, false, ChPid, MsgProps, Msg}),
+ ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}),
BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }).
-publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps,
+publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps,
ChPid, State = #state { gm = GM,
seen_status = SS,
backing_queue = BQ,
backing_queue_state = BQS,
ack_msg_id = AM }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
- ok = gm:broadcast(
- GM, {publish, {true, AckRequired}, ChPid, MsgProps, Msg}),
- {AckTag, BQS1} =
- BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS),
+ ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg}),
+ {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS),
AM1 = maybe_store_acktag(AckTag, MsgId, AM),
- {AckTag,
- ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1,
- ack_msg_id = AM1 })}.
+ State1 = State #state { backing_queue_state = BQS1, ack_msg_id = AM1 },
+ {AckTag, ensure_monitoring(ChPid, State1)}.
+
+discard(MsgId, ChPid, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ seen_status = SS }) ->
+ %% It's a massive error if we get told to discard something that's
+ %% already been published or published-and-confirmed. To do that
+ %% would require non FIFO access. Hence we should not find
+ %% 'published' or 'confirmed' in this dict:find.
+ case dict:find(MsgId, SS) of
+ error ->
+ ok = gm:broadcast(GM, {discard, ChPid, MsgId}),
+ BQS1 = BQ:discard(MsgId, ChPid, BQS),
+ ensure_monitoring(
+ ChPid, State #state {
+ backing_queue_state = BQS1,
+ seen_status = dict:erase(MsgId, SS) });
+ {ok, discarded} ->
+ State
+ end.
dropwhile(Pred, AckRequired,
State = #state{gm = GM,
@@ -375,26 +392,6 @@ is_duplicate(Message = #basic_message { id = MsgId },
{discarded, State}
end.
-discard(Msg = #basic_message { id = MsgId }, ChPid,
- State = #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS,
- seen_status = SS }) ->
- %% It's a massive error if we get told to discard something that's
- %% already been published or published-and-confirmed. To do that
- %% would require non FIFO access. Hence we should not find
- %% 'published' or 'confirmed' in this dict:find.
- case dict:find(MsgId, SS) of
- error ->
- ok = gm:broadcast(GM, {discard, ChPid, Msg}),
- ensure_monitoring(
- ChPid, State #state {
- backing_queue_state = BQ:discard(Msg, ChPid, BQS),
- seen_status = dict:erase(MsgId, SS) });
- {ok, discarded} ->
- State
- end.
-
%% ---------------------------------------------------------------------------
%% Other exported functions
%% ---------------------------------------------------------------------------
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 3d8bd8b408..e763bd9b0a 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -696,25 +696,23 @@ publish_or_discard(Status, ChPid, MsgId,
State1 #state { sender_queues = SQ1, msg_id_status = MS1 }.
-process_instruction({publish, false, ChPid, MsgProps,
+process_instruction({publish, ChPid, MsgProps,
Msg = #basic_message { id = MsgId }}, State) ->
State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
publish_or_discard(published, ChPid, MsgId, State),
BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
{ok, State1 #state { backing_queue_state = BQS1 }};
-process_instruction({publish, {true, AckRequired}, ChPid, MsgProps,
+process_instruction({publish_delivered, ChPid, MsgProps,
Msg = #basic_message { id = MsgId }}, State) ->
State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
publish_or_discard(published, ChPid, MsgId, State),
- {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps,
- ChPid, BQS),
- {ok, maybe_store_ack(AckRequired, MsgId, AckTag,
+ {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS),
+ {ok, maybe_store_ack(true, MsgId, AckTag,
State1 #state { backing_queue_state = BQS1 })};
-process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }},
- State) ->
+process_instruction({discard, ChPid, MsgId}, State) ->
State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
publish_or_discard(discarded, ChPid, MsgId, State),
- BQS1 = BQ:discard(Msg, ChPid, BQS),
+ BQS1 = BQ:discard(MsgId, ChPid, BQS),
{ok, State1 #state { backing_queue_state = BQS1 }};
process_instruction({drop, Length, Dropped, AckRequired},
State = #state { backing_queue = BQ,
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ddb136a73d..8ff44a296d 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -17,11 +17,11 @@
-module(rabbit_variable_queue).
-export([init/3, terminate/2, delete_and_terminate/2, purge/1,
- publish/4, publish_delivered/5, drain_confirmed/1,
+ publish/4, publish_delivered/4, discard/3, drain_confirmed/1,
dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1,
depth/1, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
- is_duplicate/2, discard/3, multiple_routing_keys/0, fold/3]).
+ is_duplicate/2, multiple_routing_keys/0, fold/3]).
-export([start/1, stop/0]).
@@ -545,17 +545,8 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
ram_msg_count = RamMsgCount + 1,
unconfirmed = UC1 })).
-publish_delivered(false, #basic_message { id = MsgId },
- #message_properties { needs_confirming = NeedsConfirming },
- _ChPid, State = #vqstate { async_callback = Callback,
- len = 0 }) ->
- case NeedsConfirming of
- true -> blind_confirm(Callback, gb_sets:singleton(MsgId));
- false -> ok
- end,
- {undefined, a(State)};
-publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
- id = MsgId },
+publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
+ id = MsgId },
MsgProps = #message_properties {
needs_confirming = NeedsConfirming },
_ChPid, State = #vqstate { len = 0,
@@ -579,6 +570,8 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
persistent_count = PCount1,
unconfirmed = UC1 }))}.
+discard(_MsgId, _ChPid, State) -> State.
+
drain_confirmed(State = #vqstate { confirmed = C }) ->
case gb_sets:is_empty(C) of
true -> {[], State}; %% common case
@@ -821,8 +814,6 @@ invoke(?MODULE, Fun, State) -> Fun(?MODULE, State).
is_duplicate(_Msg, State) -> {false, State}.
-discard(_Msg, _ChPid, State) -> State.
-
%%----------------------------------------------------------------------------
%% Minor helpers
%%----------------------------------------------------------------------------