summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDiana Corbacho <diana@rabbitmq.com>2019-03-27 10:12:31 +0000
committerDiana Corbacho <diana@rabbitmq.com>2019-03-27 10:12:31 +0000
commitb56ca1c9de895c770b196ea46f887ee2abafb48c (patch)
treeabb76b2292bea7e17b2b41743b305664106f331a
parentce55899c883839230f6e3a6546bb4db9087905bb (diff)
downloadrabbitmq-server-git-b56ca1c9de895c770b196ea46f887ee2abafb48c.tar.gz
Return msg in basic.get when in-memory limit is reached
-rw-r--r--src/rabbit_fifo.erl39
1 files changed, 25 insertions, 14 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 549da8acbb..960d8d144e 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -256,9 +256,9 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
%% credit for unknown consumer - just ignore
{State0, ok}
end;
-apply(Meta, #checkout{spec = {dequeue, Settlement},
- meta = ConsumerMeta,
- consumer_id = ConsumerId},
+apply(#{from := From} = Meta, #checkout{spec = {dequeue, Settlement},
+ meta = ConsumerMeta,
+ consumer_id = ConsumerId},
#?MODULE{consumers = Consumers} = State0) ->
Exists = maps:is_key(ConsumerId, Consumers),
case messages_ready(State0) of
@@ -272,17 +272,23 @@ apply(Meta, #checkout{spec = {dequeue, Settlement},
{once, 1, simple_prefetch},
State0),
{success, _, MsgId, Msg, State2} = checkout_one(State1),
- %% TODO handle this checkout_one!!!
- case Settlement of
- unsettled ->
- {_, Pid} = ConsumerId,
- {State2, {dequeue, {MsgId, Msg}, Ready-1},
- [{monitor, process, Pid}]};
- settled ->
- %% immediately settle the checkout
- {State, _, Effects} =
- apply(Meta, make_settle(ConsumerId, [MsgId]),
- State2),
+ {State, Effects} = case Settlement of
+ unsettled ->
+ {_, Pid} = ConsumerId,
+ {State2, [{monitor, process, Pid}]};
+ settled ->
+ %% immediately settle the checkout
+ {State3, _, Effects0} =
+ apply(Meta, make_settle(ConsumerId, [MsgId]),
+ State2),
+ {State3, Effects0}
+ end,
+ case Msg of
+ {RaftIdx, {Header, 'empty'}} ->
+ %% TODO add here new log effect with reply
+ {State, '$ra_no_return',
+ reply_log_effect(RaftIdx, MsgId, Header, Ready - 1, From)};
+ _ ->
{State, {dequeue, {MsgId, Msg}, Ready-1}, Effects}
end
end;
@@ -1299,6 +1305,11 @@ send_log_effect({CTag, CPid}, RaftIdx, MsgId, Header) ->
{send_msg, CPid, {delivery, CTag, [{MsgId, {Header, Msg}}]}, ra_event}
end}.
+reply_log_effect(RaftIdx, MsgId, Header, Ready, From) ->
+ {log, RaftIdx, fun({enqueue, _, _, Msg}) ->
+ {wrap_reply, From, {dequeue, {MsgId, {Header, Msg}}, Ready}}
+ end}.
+
checkout_one(#?MODULE{service_queue = SQ0,
messages = Messages0,
consumers = Cons0} = InitState) ->