diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2012-02-15 00:15:10 +0000 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2012-02-15 00:15:10 +0000 |
| commit | 1ca232d53728e19fb31c7032602ad0e87e3d45f8 (patch) | |
| tree | 58c22d2b19ce5f33b0eba6ea4fbfc56d3fa261a8 /src | |
| parent | 202ea16b2fc1f681b37e5523f71ca4c36422ef17 (diff) | |
| download | rabbitmq-server-git-1ca232d53728e19fb31c7032602ad0e87e3d45f8.tar.gz | |
dead-letter when auto-deleting
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 36 |
1 files changed, 24 insertions, 12 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index a631990bff..30822aa6ae 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -733,12 +733,19 @@ mk_dead_letter_fun(Reason, _State) -> BQS1 end. +dead_letter_deleted_queue_reply(From, State) -> + case dead_letter_deleted_queue(From, State) of + {stop, State1} -> {stop, normal, State1}; + {stop, Count, State1} -> {stop, normal, {ok, Count}, State1}; + {ok, State1} -> noreply(State1) + end. + dead_letter_deleted_queue(undefined, State = #q{dlx = undefined}) -> - {stop, normal, State}; + {stop, State}; dead_letter_deleted_queue(_From, State = #q{dlx = undefined, backing_queue_state = BQS, backing_queue = BQ}) -> - {stop, normal, {ok, BQ:len(BQS)}, State}; + {stop, BQ:len(BQS), State}; dead_letter_deleted_queue(From, State = #q{backing_queue_state = BQS, backing_queue = BQ, blocked_ops = Ops}) -> @@ -752,8 +759,8 @@ dead_letter_deleted_queue(From, State = #q{backing_queue_state = BQS, true -> Ops; %% don't queue more than one delete false -> [{delete, {From, BQ:len(BQS)}} | Ops] end, - noreply(State#q{blocked_ops = Ops1, - backing_queue_state = BQS1}) + {ok, State#q{blocked_ops = Ops1, + backing_queue_state = BQS1}} end. dead_letter_msg(Msg, AckTag, Reason, @@ -1143,8 +1150,8 @@ handle_call({notify_down, ChPid}, _From, State) -> %% return stop with a reply, terminate/2 will be called by %% gen_server2 *before* the reply is sent. case handle_ch_down(ChPid, State) of - {ok, NewState} -> reply(ok, NewState); - {stop, NewState} -> {stop, normal, ok, NewState} + {ok, State1} -> reply(ok, State1); + {stop, State1} -> dead_letter_deleted_queue_reply(undefined, State1) end; handle_call({basic_get, ChPid, NoAck}, _From, @@ -1219,7 +1226,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, State#q.active_consumers)}, case should_auto_delete(State1) of false -> reply(ok, ensure_expiry_timer(State1)); - true -> {stop, normal, ok, State1} + true -> dead_letter_deleted_queue_reply(undefined, State1) end end; @@ -1235,7 +1242,8 @@ handle_call({delete, IfUnused, IfEmpty}, From, if IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State); IfUnused and not(IsUnused) -> reply({error, in_use}, State); - true -> dead_letter_deleted_queue(From, State) + true -> dead_letter_deleted_queue_reply(From, + State) end; handle_call(purge, _From, State = #q{backing_queue = BQ, @@ -1311,7 +1319,7 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) -> end)); handle_cast(delete_immediately, State) -> - dead_letter_deleted_queue(undefined, State); + dead_letter_deleted_queue_reply(undefined, State); handle_cast({unblock, ChPid}, State) -> noreply( @@ -1372,7 +1380,7 @@ handle_cast({dead_letter, {Msg, AckTag}, Reason}, State) -> handle_info(maybe_expire, State) -> case is_unused(State) of - true -> dead_letter_deleted_queue(undefined, State); + true -> dead_letter_deleted_queue_reply(undefined, State); false -> noreply(ensure_expiry_timer(State)) end; @@ -1394,11 +1402,15 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, %% match what people expect (see bug 21824). However we need this %% monitor-and-async- delete in case the connection goes away %% unexpectedly. - {stop, normal, State}; + case dead_letter_deleted_queue(undefined, State) of + {stop, State1} -> {stop, normal, State1}; + {stop, _, State1} -> {stop, normal, State1}; + {ok, State1} -> noreply(State1) + end; handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> case handle_ch_down(DownPid, State) of {ok, State1} -> handle_queue_down(DownPid, State1); - {stop, State1} -> {stop, normal, State1} + {stop, State1} -> dead_letter_deleted_queue(undefined, State1) end; handle_info(update_ram_duration, State = #q{backing_queue = BQ, |
