summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-11-30 12:05:37 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2010-11-30 12:05:37 +0000
commit755a485ad914deee022a869f7ed2ae1a268cc11b (patch)
treed1b5e110cef9cae9cb869bc42626012c348f1bb5 /src
parent1159352cb2bc1c0f8e2640d9e2754d2e88a42e1f (diff)
downloadrabbitmq-server-git-755a485ad914deee022a869f7ed2ae1a268cc11b.tar.gz
Exposing the publishing chpid right through to the BQ (and tidying up tests/types/specs)
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl9
-rw-r--r--src/rabbit_amqqueue_process.erl12
-rw-r--r--src/rabbit_backing_queue.erl6
-rw-r--r--src/rabbit_invariable_queue.erl21
-rw-r--r--src/rabbit_tests.erl8
-rw-r--r--src/rabbit_variable_queue.erl15
6 files changed, 37 insertions, 34 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index fd15723178..1bbe3f1cfb 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -499,11 +499,12 @@ delete_queue(QueueName) ->
rabbit_binding:remove_transient_for_destination(QueueName).
pseudo_queue(QueueName, Pid) ->
- #amqqueue{name = QueueName,
- durable = false,
+ #amqqueue{name = QueueName,
+ durable = false,
auto_delete = false,
- arguments = [],
- pid = Pid}.
+ arguments = [],
+ pid = Pid,
+ extra_pids = []}.
safe_delegate_call_ok(F, Pids) ->
{_, Bad} = delegate:invoke(Pids,
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 08c688c7a2..1e45ef0b30 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -438,7 +438,7 @@ run_message_queue(State) ->
{_IsEmpty1, State2} = deliver_msgs_to_consumers(Funs, IsEmpty, State1),
State2.
-attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) ->
+attempt_delivery(none, ChPid, Message, State = #q{backing_queue = BQ}) ->
PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
DeliverFun =
fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) ->
@@ -447,7 +447,7 @@ attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) ->
%% message_properties.
{AckTag, BQS1} =
BQ:publish_delivered(AckRequired, Message,
- ?BASE_MESSAGE_PROPERTIES, BQS),
+ ?BASE_MESSAGE_PROPERTIES, ChPid, BQS),
{{Message, false, AckTag}, true,
State1#q{backing_queue_state = BQS1}}
end,
@@ -455,9 +455,9 @@ attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) ->
attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
record_current_channel_tx(ChPid, Txn),
- {true,
- State#q{backing_queue_state =
- BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS)}}.
+ {true, State#q{backing_queue_state =
+ BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES,
+ ChPid, BQS)}}.
deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) ->
case attempt_delivery(Txn, ChPid, Message, State) of
@@ -466,7 +466,7 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) ->
{false, NewState} ->
%% Txn is none and no unblocked channels with consumers
BQS = BQ:publish(Message,
- message_properties(State),
+ message_properties(State), ChPid,
State #q.backing_queue_state),
{false, ensure_ttl_timer(NewState#q{backing_queue_state = BQS})}
end.
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 7237f0ea3c..d04944f946 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -62,12 +62,12 @@ behaviour_info(callbacks) ->
{purge, 1},
%% Publish a message.
- {publish, 3},
+ {publish, 4},
%% Called for messages which have already been passed straight
%% out to a client. The queue will be empty for these calls
%% (i.e. saves the round trip through the backing queue).
- {publish_delivered, 4},
+ {publish_delivered, 5},
%% Drop messages from the head of the queue while the supplied
%% predicate returns true.
@@ -81,7 +81,7 @@ behaviour_info(callbacks) ->
{ack, 2},
%% A publish, but in the context of a transaction.
- {tx_publish, 4},
+ {tx_publish, 5},
%% Acks, but in the context of a transaction.
{tx_ack, 3},
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl
index 41aff18588..5181979996 100644
--- a/src/rabbit_invariable_queue.erl
+++ b/src/rabbit_invariable_queue.erl
@@ -31,8 +31,8 @@
-module(rabbit_invariable_queue).
--export([init/2, terminate/1, delete_and_terminate/1, purge/1, publish/3,
- publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3,
+-export([init/2, terminate/1, delete_and_terminate/1, purge/1, publish/4,
+ publish_delivered/5, fetch/2, ack/2, tx_publish/5, tx_ack/3,
dropwhile/2, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1,
set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1,
idle_timeout/1, handle_pre_hibernate/1, status/1]).
@@ -100,17 +100,17 @@ purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable,
ok = persist_acks(QName, IsDurable, none, AckTags, PA),
{Len, State #iv_state { len = 0, queue = queue:new() }}.
-publish(Msg, MsgProps, State = #iv_state { queue = Q,
- qname = QName,
- durable = IsDurable,
- len = Len }) ->
+publish(Msg, MsgProps, _ChPid, State = #iv_state { queue = Q,
+ qname = QName,
+ durable = IsDurable,
+ len = Len }) ->
ok = persist_message(QName, IsDurable, none, Msg, MsgProps),
State #iv_state { queue = enqueue(Msg, MsgProps, false, Q), len = Len + 1 }.
-publish_delivered(false, _Msg, _MsgProps, State) ->
+publish_delivered(false, _Msg, _MsgProps, _ChPid, State) ->
{blank_ack, State};
publish_delivered(true, Msg = #basic_message { guid = Guid },
- MsgProps,
+ MsgProps, _ChPid,
State = #iv_state { qname = QName, durable = IsDurable,
len = 0, pending_ack = PA }) ->
ok = persist_message(QName, IsDurable, none, Msg, MsgProps),
@@ -159,8 +159,9 @@ ack(AckTags, State = #iv_state { qname = QName, durable = IsDurable,
PA1 = remove_acks(AckTags, PA),
State #iv_state { pending_ack = PA1 }.
-tx_publish(Txn, Msg, MsgProps, State = #iv_state { qname = QName,
- durable = IsDurable }) ->
+tx_publish(Txn, Msg, MsgProps, _ChPid,
+ State = #iv_state { qname = QName,
+ durable = IsDurable }) ->
Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn),
store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }),
ok = persist_message(QName, IsDurable, Txn, Msg, MsgProps),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index a63baddb03..572f1457d1 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1830,7 +1830,7 @@ variable_queue_publish(IsPersistent, Count, VQ) ->
true -> 2;
false -> 1
end}, <<>>),
- #message_properties{}, VQN)
+ #message_properties{}, self(), VQN)
end, VQ, lists:seq(1, Count)).
variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
@@ -1849,8 +1849,8 @@ assert_props(List, PropVals) ->
[assert_prop(List, Prop, Value) || {Prop, Value} <- PropVals].
test_amqqueue(Durable) ->
- #amqqueue{name = test_queue(),
- durable = Durable}.
+ (rabbit_amqqueue:pseudo_queue(test_queue(), self()))
+ #amqqueue { durable = Durable }.
with_fresh_variable_queue(Fun) ->
ok = empty_test_queue(),
@@ -1912,7 +1912,7 @@ test_dropwhile(VQ0) ->
rabbit_basic:message(
rabbit_misc:r(<<>>, exchange, <<>>),
<<>>, #'P_basic'{}, <<>>),
- #message_properties{expiry = N}, VQN)
+ #message_properties{expiry = N}, self(), VQN)
end, VQ0, lists:seq(1, Count)),
%% drop the first 5 messages
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 73a68ec346..cd4101fb2f 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -32,8 +32,8 @@
-module(rabbit_variable_queue).
-export([init/2, terminate/1, delete_and_terminate/1,
- purge/1, publish/3, publish_delivered/4, fetch/2, ack/2,
- tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4,
+ purge/1, publish/4, publish_delivered/5, fetch/2, ack/2,
+ tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4,
requeue/3, len/1, is_empty/1, dropwhile/2,
set_ram_duration_target/2, ram_duration/1,
needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
@@ -501,14 +501,15 @@ purge(State = #vqstate { q4 = Q4,
ram_index_count = 0,
persistent_count = PCount1 })}.
-publish(Msg, MsgProps, State) ->
+publish(Msg, MsgProps, _ChPid, State) ->
{_SeqId, State1} = publish(Msg, MsgProps, false, false, State),
a(reduce_memory_use(State1)).
-publish_delivered(false, _Msg, _MsgProps, State = #vqstate { len = 0 }) ->
+publish_delivered(false, _Msg, _MsgProps, _ChPid,
+ State = #vqstate { len = 0 }) ->
{blank_ack, a(State)};
publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent },
- MsgProps,
+ MsgProps, _ChPid,
State = #vqstate { len = 0,
next_seq_id = SeqId,
out_counter = OutCount,
@@ -640,8 +641,8 @@ ack(AckTags, State) ->
AckTags, State)).
tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps,
- State = #vqstate { durable = IsDurable,
- msg_store_clients = MSCState }) ->
+ _ChPid, State = #vqstate { durable = IsDurable,
+ msg_store_clients = MSCState }) ->
Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn),
store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }),
case IsPersistent andalso IsDurable of