summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-03-08 13:01:14 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-03-08 13:01:14 +0000
commitb3ed57b031b51b1c01819064a38e337fc2860abc (patch)
tree52a0645464e01aa4729b0063f9360368654d6971 /src
parent048c0fe1a76edb282a69b52c7f42305bf9f0470e (diff)
downloadrabbitmq-server-git-b3ed57b031b51b1c01819064a38e337fc2860abc.tar.gz
add BQ:validate_message
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl72
-rw-r--r--src/rabbit_backing_queue.erl5
-rw-r--r--src/rabbit_mirror_queue_master.erl5
-rw-r--r--src/rabbit_mirror_queue_slave.erl13
-rw-r--r--src/rabbit_variable_queue.erl4
5 files changed, 69 insertions, 30 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 8c19aa168b..0c9eba9dfb 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -487,46 +487,64 @@ attempt_delivery(#delivery{txn = none,
sender = ChPid,
message = Message,
msg_seq_no = MsgSeqNo},
- {NeedsConfirming, State = #q{backing_queue = BQ}}) ->
+ {NeedsConfirming, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}}) ->
%% must confirm immediately if it has a MsgSeqNo and not NeedsConfirming
case {NeedsConfirming, MsgSeqNo} of
{_, undefined} -> ok;
{no_confirm, _} -> rabbit_channel:confirm(ChPid, [MsgSeqNo]);
{confirm, _} -> ok
end,
- PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
- DeliverFun =
- fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) ->
- %% we don't need an expiry here because messages are
- %% not being enqueued, so we use an empty
- %% message_properties.
- {AckTag, BQS1} =
- BQ:publish_delivered(
- AckRequired, Message,
- (?BASE_MESSAGE_PROPERTIES)#message_properties{
- needs_confirming = (NeedsConfirming =:= confirm)},
- ChPid, BQS),
- {{Message, false, AckTag}, true,
- State1#q{backing_queue_state = BQS1}}
- end,
- {Delivered, State1} =
- deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State),
- {Delivered, NeedsConfirming, State1};
+ case BQ:validate_message(Message, BQS) of
+ {invalid, _Bool} = Invalid ->
+ {Invalid, NeedsConfirming, State};
+ valid ->
+ PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
+ DeliverFun =
+ fun (AckRequired, false,
+ State1 = #q{backing_queue_state = BQS1}) ->
+ %% we don't need an expiry here because
+ %% messages are not being enqueued, so we use
+ %% an empty message_properties.
+ {AckTag, BQS2} =
+ BQ:publish_delivered(
+ AckRequired, Message,
+ (?BASE_MESSAGE_PROPERTIES)#message_properties{
+ needs_confirming =
+ (NeedsConfirming =:= confirm)},
+ ChPid, BQS1),
+ {{Message, false, AckTag}, true,
+ State1#q{backing_queue_state = BQS2}}
+ end,
+ {Delivered, State1} =
+ deliver_msgs_to_consumers({ PredFun, DeliverFun }, false,
+ State),
+ {{valid, Delivered}, NeedsConfirming, State1}
+ end;
attempt_delivery(#delivery{txn = Txn,
sender = ChPid,
message = Message},
{NeedsConfirming, State = #q{backing_queue = BQ,
backing_queue_state = BQS}}) ->
- store_ch_record((ch_record(ChPid))#cr{txn = Txn}),
- BQS1 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, ChPid, BQS),
- {true, NeedsConfirming, State#q{backing_queue_state = BQS1}}.
+ case BQ:validate_message(Message, BQS) of
+ {invalid, _Reason} = Invalid ->
+ {Invalid, NeedsConfirming, State};
+ valid ->
+ store_ch_record((ch_record(ChPid))#cr{txn = Txn}),
+ BQS1 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, ChPid,
+ BQS),
+ {{valid, true}, NeedsConfirming,
+ State#q{backing_queue_state = BQS1}}
+ end.
deliver_or_enqueue(Delivery, State) ->
case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of
- {true, _, State1} ->
+ {{invalid, seen}, _, State1} ->
+ {true, State1};
+ {{valid, true}, _, State1} ->
{true, State1};
- {false, NeedsConfirming, State1 = #q{backing_queue = BQ,
- backing_queue_state = BQS}} ->
+ {{valid, false}, NeedsConfirming,
+ State1 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
#delivery{message = Message} = Delivery,
BQS1 = BQ:publish(Message,
(message_properties(State)) #message_properties{
@@ -863,9 +881,9 @@ handle_call({deliver_immediately, Delivery}, _From, State) ->
%% just all ready-to-consume queues get the message, with unready
%% queues discarding the message?
%%
- {Delivered, _NeedsConfirming, State1} =
+ {{_Valid, Bool}, _NeedsConfirming, State1} =
attempt_delivery(Delivery, record_confirm_message(Delivery, State)),
- reply(Delivered, State1);
+ reply(Bool, State1);
handle_call({deliver, Delivery}, From, State) ->
%% Synchronous, "mandatory" delivery mode. Reply asap.
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index d42fe14015..726b9befb8 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -127,7 +127,10 @@ behaviour_info(callbacks) ->
%% Passed a function to be invoked with the relevant backing
%% queue's state. Useful for when the backing queue or other
%% components need to pass functions into the backing queue.
- {invoke, 3}
+ {invoke, 3},
+
+ %% TODO: document me
+ {validate_message, 2}
];
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 800d9453d1..513a8bb5b3 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -280,3 +280,8 @@ invoke(Mod, Fun, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
{MsgIds, BQS1} = BQ:invoke(Mod, Fun, BQS),
{MsgIds, State #state { backing_queue_state = BQS1 }}.
+
+validate_message(Message, #state { backing_queue = BQ,
+ backing_queue_state = BSQ }) ->
+ %% this will definitely change.
+ BQ:validate_message(Message, BQS).
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index d9ad71208c..0aedff59e4 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -119,6 +119,16 @@ init([#amqqueue { name = QueueName } = Q]) ->
handle_call({deliver_immediately, Delivery = #delivery {}}, From, State) ->
%% Synchronous, "immediate" delivery mode
+ %%
+ %% TODO: we cannot reply here because we may not have received
+ %% this from gm, and indeed the master might die before it
+ %% receives it. Thus if we are promoted to master at that point
+ %% then we must reply appropriately. So we're going to have to
+ %% enqueue it, record that it needs a reply, and then reply either
+ %% when we get the nod via gm, or, if we're promoted, in the mean
+ %% time we'll have to figure out something else... Of course, if
+ %% we've already seen it from gm then we're going to have to reply
+ %% now.
gen_server2:reply(From, false), %% master may deliver it, not us
noreply(maybe_enqueue_message(Delivery, State));
@@ -419,7 +429,7 @@ maybe_enqueue_message(
msg_seq_no = MsgSeqNo,
sender = ChPid },
State = #state { sender_queues = SQ,
- msg_id_status = MS }) ->
+ msg_id_status = MS }) ->
%% We will never see {published, ChPid, MsgSeqNo} here.
case dict:find(MsgId, MS) of
error ->
@@ -506,6 +516,7 @@ process_instruction(
State1 = State #state { sender_queues = SQ1,
msg_id_status = MS2 },
+ %% we probably want to work in BQ:validate_message here
{ok,
case Deliver of
false ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 7019efbbf7..4ad46f1ab3 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -22,7 +22,7 @@
requeue/3, len/1, is_empty/1, dropwhile/2,
set_ram_duration_target/2, ram_duration/1,
needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
- status/1, invoke/3, multiple_routing_keys/0]).
+ status/1, invoke/3, validate_message/2, multiple_routing_keys/0]).
-export([start/1, stop/0]).
@@ -868,6 +868,8 @@ status(#vqstate {
invoke(?MODULE, Fun, State) ->
Fun(State).
+validate_message(_Msg, _State) -> true.
+
%%----------------------------------------------------------------------------
%% Minor helpers
%%----------------------------------------------------------------------------