diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-06-11 11:52:38 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-06-11 11:52:38 +0100 |
| commit | f66d8d4e1b3be9f43762677abf0d4a39128ad658 (patch) | |
| tree | 2c7c36fc36c8c218a9c3297a459e9bd1e812271c | |
| parent | a99b541be0332defa32c8ce7548acd9830c56892 (diff) | |
| download | rabbitmq-server-git-f66d8d4e1b3be9f43762677abf0d4a39128ad658.tar.gz | |
And suddenly it works. Testing showed that removing the crude limit UNSENT_MESSAGE_LIMIT made performance better. This then made me wonder if the unblock and notify_sent messages weren't getting through fast enough, and sure enough, using pcast is much better there. Also, turning on dbg:tpl showed that the common path in mixed_queue was to call publish_delivered (i.e. the message has been delivered to a consumer, we just need to record this fact). Making sure everything in there for the non-persistent, non-durable but disk-only mode is asynchronous also helped performance massively.
| -rw-r--r-- | src/rabbit_amqqueue.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 21 |
3 files changed, 17 insertions, 14 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index a5c58f2358..01d40aa1a8 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -309,10 +309,10 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> infinity). notify_sent(QPid, ChPid) -> - gen_server2:cast(QPid, {notify_sent, ChPid}). + gen_server2:pcast(QPid, 10, {notify_sent, ChPid}). unblock(QPid, ChPid) -> - gen_server2:cast(QPid, {unblock, ChPid}). + gen_server2:pcast(QPid, 10, {unblock, ChPid}). constrain_memory(QPid, Constrain) -> gen_server2:pcast(QPid, 10, {constrain, Constrain}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 6ad4e4e6b7..d325346c9e 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -208,7 +208,7 @@ deliver_queue(Fun, FunAcc0, true -> deliver_queue(Fun, FunAcc1, State3) end end; - %% if IsMsgReady then (AckRequired and we've hit the limiter) + %% if IsMsgReady then we've hit the limiter false when IsMsgReady -> store_ch_record(C#cr{is_limit_active = true}), NewConsumers = block_consumers(ChPid, RoundRobinTail), @@ -245,8 +245,8 @@ attempt_immediate_delivery(none, _ChPid, Msg, State) -> (AckRequired, false, State2) -> {AckTag, State3} = if AckRequired -> - {ok, AckTag2, MS} = rabbit_mixed_queue:publish_delivered(Msg, - State2 #q.mixed_state), + {ok, AckTag2, MS} = rabbit_mixed_queue:publish_delivered( + Msg, State2 #q.mixed_state), {AckTag2, State2 #q { mixed_state = MS }}; true -> {noack, State2} diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index 4dce52e70f..a950584a10 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -187,16 +187,19 @@ publish_delivered(Msg = State = #mqstate { mode = Mode, is_durable = IsDurable, next_write_seq = NextSeq, queue = Q }) when Mode =:= disk orelse (IsDurable andalso IsPersistent) -> - true = rabbit_disk_queue:is_empty(Q), rabbit_disk_queue:publish(Q, MsgId, msg_to_bin(Msg), false), - %% must call phantom_deliver otherwise the msg remains at the head - %% of the queue - {MsgId, false, AckTag, 0} = rabbit_disk_queue:phantom_deliver(Q), - State2 = - if Mode =:= mixed -> State #mqstate { next_write_seq = NextSeq + 1 }; - true -> State - end, - {ok, AckTag, State2}; + if IsDurable andalso IsPersistent -> + %% must call phantom_deliver otherwise the msg remains at + %% the head of the queue. This is synchronous, but + %% unavoidable as we need the AckTag + {MsgId, false, AckTag, 0} = rabbit_disk_queue:phantom_deliver(Q), + {ok, AckTag, State}; + true -> + %% in this case, we don't actually care about the ack, so + %% auto ack it (asynchronously). + ok = rabbit_disk_queue:auto_ack_next_message(Q), + {ok, noack, State #mqstate { next_write_seq = NextSeq + 1 }} + end; publish_delivered(_Msg, State = #mqstate { mode = mixed, msg_buf = MsgBuf }) -> true = queue:is_empty(MsgBuf), {ok, noack, State}. |
