diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2011-03-08 13:01:14 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-03-08 13:01:14 +0000 |
| commit | b3ed57b031b51b1c01819064a38e337fc2860abc (patch) | |
| tree | 52a0645464e01aa4729b0063f9360368654d6971 /src | |
| parent | 048c0fe1a76edb282a69b52c7f42305bf9f0470e (diff) | |
| download | rabbitmq-server-git-b3ed57b031b51b1c01819064a38e337fc2860abc.tar.gz | |
add BQ:validate_message
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 72 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 4 |
5 files changed, 69 insertions, 30 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 8c19aa168b..0c9eba9dfb 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -487,46 +487,64 @@ attempt_delivery(#delivery{txn = none, sender = ChPid, message = Message, msg_seq_no = MsgSeqNo}, - {NeedsConfirming, State = #q{backing_queue = BQ}}) -> + {NeedsConfirming, State = #q{backing_queue = BQ, + backing_queue_state = BQS}}) -> %% must confirm immediately if it has a MsgSeqNo and not NeedsConfirming case {NeedsConfirming, MsgSeqNo} of {_, undefined} -> ok; {no_confirm, _} -> rabbit_channel:confirm(ChPid, [MsgSeqNo]); {confirm, _} -> ok end, - PredFun = fun (IsEmpty, _State) -> not IsEmpty end, - DeliverFun = - fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) -> - %% we don't need an expiry here because messages are - %% not being enqueued, so we use an empty - %% message_properties. - {AckTag, BQS1} = - BQ:publish_delivered( - AckRequired, Message, - (?BASE_MESSAGE_PROPERTIES)#message_properties{ - needs_confirming = (NeedsConfirming =:= confirm)}, - ChPid, BQS), - {{Message, false, AckTag}, true, - State1#q{backing_queue_state = BQS1}} - end, - {Delivered, State1} = - deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State), - {Delivered, NeedsConfirming, State1}; + case BQ:validate_message(Message, BQS) of + {invalid, _Bool} = Invalid -> + {Invalid, NeedsConfirming, State}; + valid -> + PredFun = fun (IsEmpty, _State) -> not IsEmpty end, + DeliverFun = + fun (AckRequired, false, + State1 = #q{backing_queue_state = BQS1}) -> + %% we don't need an expiry here because + %% messages are not being enqueued, so we use + %% an empty message_properties. + {AckTag, BQS2} = + BQ:publish_delivered( + AckRequired, Message, + (?BASE_MESSAGE_PROPERTIES)#message_properties{ + needs_confirming = + (NeedsConfirming =:= confirm)}, + ChPid, BQS1), + {{Message, false, AckTag}, true, + State1#q{backing_queue_state = BQS2}} + end, + {Delivered, State1} = + deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, + State), + {{valid, Delivered}, NeedsConfirming, State1} + end; attempt_delivery(#delivery{txn = Txn, sender = ChPid, message = Message}, {NeedsConfirming, State = #q{backing_queue = BQ, backing_queue_state = BQS}}) -> - store_ch_record((ch_record(ChPid))#cr{txn = Txn}), - BQS1 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, ChPid, BQS), - {true, NeedsConfirming, State#q{backing_queue_state = BQS1}}. + case BQ:validate_message(Message, BQS) of + {invalid, _Reason} = Invalid -> + {Invalid, NeedsConfirming, State}; + valid -> + store_ch_record((ch_record(ChPid))#cr{txn = Txn}), + BQS1 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, ChPid, + BQS), + {{valid, true}, NeedsConfirming, + State#q{backing_queue_state = BQS1}} + end. deliver_or_enqueue(Delivery, State) -> case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of - {true, _, State1} -> + {{invalid, seen}, _, State1} -> + {true, State1}; + {{valid, true}, _, State1} -> {true, State1}; - {false, NeedsConfirming, State1 = #q{backing_queue = BQ, - backing_queue_state = BQS}} -> + {{valid, false}, NeedsConfirming, + State1 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> #delivery{message = Message} = Delivery, BQS1 = BQ:publish(Message, (message_properties(State)) #message_properties{ @@ -863,9 +881,9 @@ handle_call({deliver_immediately, Delivery}, _From, State) -> %% just all ready-to-consume queues get the message, with unready %% queues discarding the message? %% - {Delivered, _NeedsConfirming, State1} = + {{_Valid, Bool}, _NeedsConfirming, State1} = attempt_delivery(Delivery, record_confirm_message(Delivery, State)), - reply(Delivered, State1); + reply(Bool, State1); handle_call({deliver, Delivery}, From, State) -> %% Synchronous, "mandatory" delivery mode. Reply asap. diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index d42fe14015..726b9befb8 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -127,7 +127,10 @@ behaviour_info(callbacks) -> %% Passed a function to be invoked with the relevant backing %% queue's state. Useful for when the backing queue or other %% components need to pass functions into the backing queue. - {invoke, 3} + {invoke, 3}, + + %% TODO: document me + {validate_message, 2} ]; behaviour_info(_Other) -> undefined. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 800d9453d1..513a8bb5b3 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -280,3 +280,8 @@ invoke(Mod, Fun, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> {MsgIds, BQS1} = BQ:invoke(Mod, Fun, BQS), {MsgIds, State #state { backing_queue_state = BQS1 }}. + +validate_message(Message, #state { backing_queue = BQ, + backing_queue_state = BSQ }) -> + %% this will definitely change. + BQ:validate_message(Message, BQS). diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index d9ad71208c..0aedff59e4 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -119,6 +119,16 @@ init([#amqqueue { name = QueueName } = Q]) -> handle_call({deliver_immediately, Delivery = #delivery {}}, From, State) -> %% Synchronous, "immediate" delivery mode + %% + %% TODO: we cannot reply here because we may not have received + %% this from gm, and indeed the master might die before it + %% receives it. Thus if we are promoted to master at that point + %% then we must reply appropriately. So we're going to have to + %% enqueue it, record that it needs a reply, and then reply either + %% when we get the nod via gm, or, if we're promoted, in the mean + %% time we'll have to figure out something else... Of course, if + %% we've already seen it from gm then we're going to have to reply + %% now. gen_server2:reply(From, false), %% master may deliver it, not us noreply(maybe_enqueue_message(Delivery, State)); @@ -419,7 +429,7 @@ maybe_enqueue_message( msg_seq_no = MsgSeqNo, sender = ChPid }, State = #state { sender_queues = SQ, - msg_id_status = MS }) -> + msg_id_status = MS }) -> %% We will never see {published, ChPid, MsgSeqNo} here. case dict:find(MsgId, MS) of error -> @@ -506,6 +516,7 @@ process_instruction( State1 = State #state { sender_queues = SQ1, msg_id_status = MS2 }, + %% we probably want to work in BQ:validate_message here {ok, case Deliver of false -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 7019efbbf7..4ad46f1ab3 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -22,7 +22,7 @@ requeue/3, len/1, is_empty/1, dropwhile/2, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, - status/1, invoke/3, multiple_routing_keys/0]). + status/1, invoke/3, validate_message/2, multiple_routing_keys/0]). -export([start/1, stop/0]). @@ -868,6 +868,8 @@ status(#vqstate { invoke(?MODULE, Fun, State) -> Fun(State). +validate_message(_Msg, _State) -> true. + %%---------------------------------------------------------------------------- %% Minor helpers %%---------------------------------------------------------------------------- |
