summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2019-11-15 22:10:17 +0300
committerGitHub <noreply@github.com>2019-11-15 22:10:17 +0300
commit880965aa67009dc3fb8dc161b8b1b6df6a71ed9d (patch)
treea3327e6b46831878e56dfd1f5327f106fc7ab599
parent9045fbe0ed76d1c74823ff1517a6f5b8c62cfa08 (diff)
parent54c80294917b00c07a4cd7f5da2ba18ee57fa894 (diff)
downloadrabbitmq-server-git-880965aa67009dc3fb8dc161b8b1b6df6a71ed9d.tar.gz
Merge pull request #2146 from rabbitmq/qq-local-delivery
Quorum queue local delivery
-rw-r--r--src/rabbit_fifo.erl7
-rw-r--r--test/rabbit_fifo_SUITE.erl8
2 files changed, 8 insertions, 7 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index ccacc136fb..68337adcaa 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -1418,7 +1418,7 @@ take_next_msg(#?MODULE{returns = Returns,
end.
send_msg_effect({CTag, CPid}, Msgs) ->
- {send_msg, CPid, {delivery, CTag, Msgs}, ra_event}.
+ {send_msg, CPid, {delivery, CTag, Msgs}, [local, ra_event]}.
send_log_effect({CTag, CPid}, IdxMsgs) ->
{RaftIdxs, Data} = lists:unzip(IdxMsgs),
@@ -1427,8 +1427,9 @@ send_log_effect({CTag, CPid}, IdxMsgs) ->
Msgs = lists:zipwith(fun({enqueue, _, _, Msg}, {MsgId, Header}) ->
{MsgId, {Header, Msg}}
end, Log, Data),
- [{send_msg, CPid, {delivery, CTag, Msgs}, ra_event}]
- end}.
+ [{send_msg, CPid, {delivery, CTag, Msgs}, [local, ra_event]}]
+ end,
+ {local, node(CPid)}}.
reply_log_effect(RaftIdx, MsgId, Header, Ready, From) ->
{log, [RaftIdx],
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index dba47aa225..1714d57932 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -306,10 +306,10 @@ return_checked_out_test(_) ->
Cid = {<<"cid">>, self()},
{State0, [_, _]} = enq(1, 1, first, test_init(test)),
{State1, [_Monitor,
- {send_msg, _, {delivery, _, [{MsgId, _}]}, ra_event},
+ {send_msg, _, {delivery, _, [{MsgId, _}]}, _},
{aux, active} | _ ]} = check_auto(Cid, 2, State0),
% returning immediately checks out the same message again
- {_, ok, [{send_msg, _, {delivery, _, [{_, _}]}, ra_event},
+ {_, ok, [{send_msg, _, {delivery, _, [{_, _}]}, _},
{aux, active}]} =
apply(meta(3), rabbit_fifo:make_return(Cid, [MsgId]), State1),
ok.
@@ -323,10 +323,10 @@ return_checked_out_limit_test(_) ->
delivery_limit => 1}),
{State0, [_, _]} = enq(1, 1, first, Init),
{State1, [_Monitor,
- {send_msg, _, {delivery, _, [{MsgId, _}]}, ra_event},
+ {send_msg, _, {delivery, _, [{MsgId, _}]}, _},
{aux, active} | _ ]} = check_auto(Cid, 2, State0),
% returning immediately checks out the same message again
- {State2, ok, [{send_msg, _, {delivery, _, [{MsgId2, _}]}, ra_event},
+ {State2, ok, [{send_msg, _, {delivery, _, [{MsgId2, _}]}, _},
{aux, active}]} =
apply(meta(3), rabbit_fifo:make_return(Cid, [MsgId]), State1),
{#rabbit_fifo{ra_indexes = RaIdxs}, ok, []} =