diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2011-01-17 17:09:13 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-01-17 17:09:13 +0000 |
| commit | 73cdc5325488dab8ddb8a19c1db4d936bc0110bd (patch) | |
| tree | a467b511f5a595b171dfaf85b32e6388d5f8087c | |
| parent | 4773a339f444b510a9083e15e4e1e8f2774db6af (diff) | |
| download | rabbitmq-server-git-73cdc5325488dab8ddb8a19c1db4d936bc0110bd.tar.gz | |
prioritise confirms and coalesce them until we see another command
As well as sending confirms when the channel goes idle we also send
them on the first non-confirm (Erlang) message. I.e. coalescing is
restriced to adjacent confirms in the channel's mailbox. By
prioritising the 'confirm' erlang messages we shuffle all confirms to
(near) the top of the mailbox, adjacent to each other.
With this change we get our 'confirms will be sent eventually'
guarantee back. And performance appears to remain unchanged.
| -rw-r--r-- | src/rabbit_channel.erl | 24 |
1 files changed, 12 insertions, 12 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 47a721bde3..c3cd7e0422 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -203,8 +203,9 @@ prioritise_call(Msg, _From, _State) -> prioritise_cast(Msg, _State) -> case Msg of - emit_stats -> 7; - _ -> 0 + emit_stats -> 7; + {confirm, _MsgSeqNos, _QPid} -> 5; + _ -> 0 end. handle_call(flush, _From, State) -> @@ -284,11 +285,11 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> hibernate}; handle_cast({confirm, MsgSeqNos, From}, State) -> - noreply(confirm(MsgSeqNos, From, State)). + State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State), + {noreply, State1, case C of [] -> hibernate; _ -> 0 end}. -handle_info(timeout, State = #ch{confirmed = C}) -> - {noreply, send_confirms(lists:append(C), State #ch{confirmed = []}), - hibernate}; +handle_info(timeout, State) -> + noreply(State); handle_info({'DOWN', _MRef, process, QPid, _Reason}, State = #ch{unconfirmed = UC}) -> @@ -330,15 +331,11 @@ code_change(_OldVsn, State, _Extra) -> %%--------------------------------------------------------------------------- -reply(Reply, NewState = #ch{confirmed = []}) -> - {reply, Reply, ensure_stats_timer(NewState), hibernate}; reply(Reply, NewState) -> - {reply, Reply, ensure_stats_timer(NewState), 0}. + {reply, Reply, ensure_stats_timer(send_confirms(NewState)), hibernate}. -noreply(NewState = #ch{confirmed = []}) -> - {noreply, ensure_stats_timer(NewState), hibernate}; noreply(NewState) -> - {noreply, ensure_stats_timer(NewState), 0}. + {noreply, ensure_stats_timer(send_confirms(NewState)), hibernate}. ensure_stats_timer(State = #ch{stats_timer = StatsTimer}) -> ChPid = self(), @@ -1246,6 +1243,9 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> lock_message(false, _MsgStruct, State) -> State. +send_confirms(State = #ch{confirmed = C}) -> + send_confirms(lists:append(C), State #ch{confirmed = []}). + send_confirms([], State) -> State; send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) -> |
