diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2010-11-30 12:05:37 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-11-30 12:05:37 +0000 |
| commit | 755a485ad914deee022a869f7ed2ae1a268cc11b (patch) | |
| tree | d1b5e110cef9cae9cb869bc42626012c348f1bb5 /src | |
| parent | 1159352cb2bc1c0f8e2640d9e2754d2e88a42e1f (diff) | |
| download | rabbitmq-server-git-755a485ad914deee022a869f7ed2ae1a268cc11b.tar.gz | |
Exposing the publishing chpid right through to the BQ (and tidying up tests/types/specs)
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_invariable_queue.erl | 21 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 15 |
6 files changed, 37 insertions, 34 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index fd15723178..1bbe3f1cfb 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -499,11 +499,12 @@ delete_queue(QueueName) -> rabbit_binding:remove_transient_for_destination(QueueName). pseudo_queue(QueueName, Pid) -> - #amqqueue{name = QueueName, - durable = false, + #amqqueue{name = QueueName, + durable = false, auto_delete = false, - arguments = [], - pid = Pid}. + arguments = [], + pid = Pid, + extra_pids = []}. safe_delegate_call_ok(F, Pids) -> {_, Bad} = delegate:invoke(Pids, diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 08c688c7a2..1e45ef0b30 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -438,7 +438,7 @@ run_message_queue(State) -> {_IsEmpty1, State2} = deliver_msgs_to_consumers(Funs, IsEmpty, State1), State2. -attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) -> +attempt_delivery(none, ChPid, Message, State = #q{backing_queue = BQ}) -> PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) -> @@ -447,7 +447,7 @@ attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) -> %% message_properties. {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Message, - ?BASE_MESSAGE_PROPERTIES, BQS), + ?BASE_MESSAGE_PROPERTIES, ChPid, BQS), {{Message, false, AckTag}, true, State1#q{backing_queue_state = BQS1}} end, @@ -455,9 +455,9 @@ attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) -> attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> record_current_channel_tx(ChPid, Txn), - {true, - State#q{backing_queue_state = - BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS)}}. + {true, State#q{backing_queue_state = + BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, + ChPid, BQS)}}. deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> case attempt_delivery(Txn, ChPid, Message, State) of @@ -466,7 +466,7 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> {false, NewState} -> %% Txn is none and no unblocked channels with consumers BQS = BQ:publish(Message, - message_properties(State), + message_properties(State), ChPid, State #q.backing_queue_state), {false, ensure_ttl_timer(NewState#q{backing_queue_state = BQS})} end. diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 7237f0ea3c..d04944f946 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -62,12 +62,12 @@ behaviour_info(callbacks) -> {purge, 1}, %% Publish a message. - {publish, 3}, + {publish, 4}, %% Called for messages which have already been passed straight %% out to a client. The queue will be empty for these calls %% (i.e. saves the round trip through the backing queue). - {publish_delivered, 4}, + {publish_delivered, 5}, %% Drop messages from the head of the queue while the supplied %% predicate returns true. @@ -81,7 +81,7 @@ behaviour_info(callbacks) -> {ack, 2}, %% A publish, but in the context of a transaction. - {tx_publish, 4}, + {tx_publish, 5}, %% Acks, but in the context of a transaction. {tx_ack, 3}, diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index 41aff18588..5181979996 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -31,8 +31,8 @@ -module(rabbit_invariable_queue). --export([init/2, terminate/1, delete_and_terminate/1, purge/1, publish/3, - publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3, +-export([init/2, terminate/1, delete_and_terminate/1, purge/1, publish/4, + publish_delivered/5, fetch/2, ack/2, tx_publish/5, tx_ack/3, dropwhile/2, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, status/1]). @@ -100,17 +100,17 @@ purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable, ok = persist_acks(QName, IsDurable, none, AckTags, PA), {Len, State #iv_state { len = 0, queue = queue:new() }}. -publish(Msg, MsgProps, State = #iv_state { queue = Q, - qname = QName, - durable = IsDurable, - len = Len }) -> +publish(Msg, MsgProps, _ChPid, State = #iv_state { queue = Q, + qname = QName, + durable = IsDurable, + len = Len }) -> ok = persist_message(QName, IsDurable, none, Msg, MsgProps), State #iv_state { queue = enqueue(Msg, MsgProps, false, Q), len = Len + 1 }. -publish_delivered(false, _Msg, _MsgProps, State) -> +publish_delivered(false, _Msg, _MsgProps, _ChPid, State) -> {blank_ack, State}; publish_delivered(true, Msg = #basic_message { guid = Guid }, - MsgProps, + MsgProps, _ChPid, State = #iv_state { qname = QName, durable = IsDurable, len = 0, pending_ack = PA }) -> ok = persist_message(QName, IsDurable, none, Msg, MsgProps), @@ -159,8 +159,9 @@ ack(AckTags, State = #iv_state { qname = QName, durable = IsDurable, PA1 = remove_acks(AckTags, PA), State #iv_state { pending_ack = PA1 }. -tx_publish(Txn, Msg, MsgProps, State = #iv_state { qname = QName, - durable = IsDurable }) -> +tx_publish(Txn, Msg, MsgProps, _ChPid, + State = #iv_state { qname = QName, + durable = IsDurable }) -> Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }), ok = persist_message(QName, IsDurable, Txn, Msg, MsgProps), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index a63baddb03..572f1457d1 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1830,7 +1830,7 @@ variable_queue_publish(IsPersistent, Count, VQ) -> true -> 2; false -> 1 end}, <<>>), - #message_properties{}, VQN) + #message_properties{}, self(), VQN) end, VQ, lists:seq(1, Count)). variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> @@ -1849,8 +1849,8 @@ assert_props(List, PropVals) -> [assert_prop(List, Prop, Value) || {Prop, Value} <- PropVals]. test_amqqueue(Durable) -> - #amqqueue{name = test_queue(), - durable = Durable}. + (rabbit_amqqueue:pseudo_queue(test_queue(), self())) + #amqqueue { durable = Durable }. with_fresh_variable_queue(Fun) -> ok = empty_test_queue(), @@ -1912,7 +1912,7 @@ test_dropwhile(VQ0) -> rabbit_basic:message( rabbit_misc:r(<<>>, exchange, <<>>), <<>>, #'P_basic'{}, <<>>), - #message_properties{expiry = N}, VQN) + #message_properties{expiry = N}, self(), VQN) end, VQ0, lists:seq(1, Count)), %% drop the first 5 messages diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 73a68ec346..cd4101fb2f 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -32,8 +32,8 @@ -module(rabbit_variable_queue). -export([init/2, terminate/1, delete_and_terminate/1, - purge/1, publish/3, publish_delivered/4, fetch/2, ack/2, - tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4, + purge/1, publish/4, publish_delivered/5, fetch/2, ack/2, + tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1, dropwhile/2, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, @@ -501,14 +501,15 @@ purge(State = #vqstate { q4 = Q4, ram_index_count = 0, persistent_count = PCount1 })}. -publish(Msg, MsgProps, State) -> +publish(Msg, MsgProps, _ChPid, State) -> {_SeqId, State1} = publish(Msg, MsgProps, false, false, State), a(reduce_memory_use(State1)). -publish_delivered(false, _Msg, _MsgProps, State = #vqstate { len = 0 }) -> +publish_delivered(false, _Msg, _MsgProps, _ChPid, + State = #vqstate { len = 0 }) -> {blank_ack, a(State)}; publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent }, - MsgProps, + MsgProps, _ChPid, State = #vqstate { len = 0, next_seq_id = SeqId, out_counter = OutCount, @@ -640,8 +641,8 @@ ack(AckTags, State) -> AckTags, State)). tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps, - State = #vqstate { durable = IsDurable, - msg_store_clients = MSCState }) -> + _ChPid, State = #vqstate { durable = IsDurable, + msg_store_clients = MSCState }) -> Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }), case IsPersistent andalso IsDurable of |
