diff options
| author | Diana Corbacho <diana@rabbitmq.com> | 2019-03-27 10:12:31 +0000 |
|---|---|---|
| committer | Diana Corbacho <diana@rabbitmq.com> | 2019-03-27 10:12:31 +0000 |
| commit | b56ca1c9de895c770b196ea46f887ee2abafb48c (patch) | |
| tree | abb76b2292bea7e17b2b41743b305664106f331a | |
| parent | ce55899c883839230f6e3a6546bb4db9087905bb (diff) | |
| download | rabbitmq-server-git-b56ca1c9de895c770b196ea46f887ee2abafb48c.tar.gz | |
Return msg in basic.get when in-memory limit is reached
| -rw-r--r-- | src/rabbit_fifo.erl | 39 |
1 files changed, 25 insertions, 14 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 549da8acbb..960d8d144e 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -256,9 +256,9 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, %% credit for unknown consumer - just ignore {State0, ok} end; -apply(Meta, #checkout{spec = {dequeue, Settlement}, - meta = ConsumerMeta, - consumer_id = ConsumerId}, +apply(#{from := From} = Meta, #checkout{spec = {dequeue, Settlement}, + meta = ConsumerMeta, + consumer_id = ConsumerId}, #?MODULE{consumers = Consumers} = State0) -> Exists = maps:is_key(ConsumerId, Consumers), case messages_ready(State0) of @@ -272,17 +272,23 @@ apply(Meta, #checkout{spec = {dequeue, Settlement}, {once, 1, simple_prefetch}, State0), {success, _, MsgId, Msg, State2} = checkout_one(State1), - %% TODO handle this checkout_one!!! - case Settlement of - unsettled -> - {_, Pid} = ConsumerId, - {State2, {dequeue, {MsgId, Msg}, Ready-1}, - [{monitor, process, Pid}]}; - settled -> - %% immediately settle the checkout - {State, _, Effects} = - apply(Meta, make_settle(ConsumerId, [MsgId]), - State2), + {State, Effects} = case Settlement of + unsettled -> + {_, Pid} = ConsumerId, + {State2, [{monitor, process, Pid}]}; + settled -> + %% immediately settle the checkout + {State3, _, Effects0} = + apply(Meta, make_settle(ConsumerId, [MsgId]), + State2), + {State3, Effects0} + end, + case Msg of + {RaftIdx, {Header, 'empty'}} -> + %% TODO add here new log effect with reply + {State, '$ra_no_return', + reply_log_effect(RaftIdx, MsgId, Header, Ready - 1, From)}; + _ -> {State, {dequeue, {MsgId, Msg}, Ready-1}, Effects} end end; @@ -1299,6 +1305,11 @@ send_log_effect({CTag, CPid}, RaftIdx, MsgId, Header) -> {send_msg, CPid, {delivery, CTag, [{MsgId, {Header, Msg}}]}, ra_event} end}. +reply_log_effect(RaftIdx, MsgId, Header, Ready, From) -> + {log, RaftIdx, fun({enqueue, _, _, Msg}) -> + {wrap_reply, From, {dequeue, {MsgId, {Header, Msg}}, Ready}} + end}. + checkout_one(#?MODULE{service_queue = SQ0, messages = Messages0, consumers = Cons0} = InitState) -> |
