diff options
| author | Michael Klishin <mklishin@pivotal.io> | 2019-11-15 22:10:17 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-11-15 22:10:17 +0300 |
| commit | 880965aa67009dc3fb8dc161b8b1b6df6a71ed9d (patch) | |
| tree | a3327e6b46831878e56dfd1f5327f106fc7ab599 | |
| parent | 9045fbe0ed76d1c74823ff1517a6f5b8c62cfa08 (diff) | |
| parent | 54c80294917b00c07a4cd7f5da2ba18ee57fa894 (diff) | |
| download | rabbitmq-server-git-880965aa67009dc3fb8dc161b8b1b6df6a71ed9d.tar.gz | |
Merge pull request #2146 from rabbitmq/qq-local-delivery
Quorum queue local delivery
| -rw-r--r-- | src/rabbit_fifo.erl | 7 | ||||
| -rw-r--r-- | test/rabbit_fifo_SUITE.erl | 8 |
2 files changed, 8 insertions, 7 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index ccacc136fb..68337adcaa 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -1418,7 +1418,7 @@ take_next_msg(#?MODULE{returns = Returns, end. send_msg_effect({CTag, CPid}, Msgs) -> - {send_msg, CPid, {delivery, CTag, Msgs}, ra_event}. + {send_msg, CPid, {delivery, CTag, Msgs}, [local, ra_event]}. send_log_effect({CTag, CPid}, IdxMsgs) -> {RaftIdxs, Data} = lists:unzip(IdxMsgs), @@ -1427,8 +1427,9 @@ send_log_effect({CTag, CPid}, IdxMsgs) -> Msgs = lists:zipwith(fun({enqueue, _, _, Msg}, {MsgId, Header}) -> {MsgId, {Header, Msg}} end, Log, Data), - [{send_msg, CPid, {delivery, CTag, Msgs}, ra_event}] - end}. + [{send_msg, CPid, {delivery, CTag, Msgs}, [local, ra_event]}] + end, + {local, node(CPid)}}. reply_log_effect(RaftIdx, MsgId, Header, Ready, From) -> {log, [RaftIdx], diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index dba47aa225..1714d57932 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -306,10 +306,10 @@ return_checked_out_test(_) -> Cid = {<<"cid">>, self()}, {State0, [_, _]} = enq(1, 1, first, test_init(test)), {State1, [_Monitor, - {send_msg, _, {delivery, _, [{MsgId, _}]}, ra_event}, + {send_msg, _, {delivery, _, [{MsgId, _}]}, _}, {aux, active} | _ ]} = check_auto(Cid, 2, State0), % returning immediately checks out the same message again - {_, ok, [{send_msg, _, {delivery, _, [{_, _}]}, ra_event}, + {_, ok, [{send_msg, _, {delivery, _, [{_, _}]}, _}, {aux, active}]} = apply(meta(3), rabbit_fifo:make_return(Cid, [MsgId]), State1), ok. @@ -323,10 +323,10 @@ return_checked_out_limit_test(_) -> delivery_limit => 1}), {State0, [_, _]} = enq(1, 1, first, Init), {State1, [_Monitor, - {send_msg, _, {delivery, _, [{MsgId, _}]}, ra_event}, + {send_msg, _, {delivery, _, [{MsgId, _}]}, _}, {aux, active} | _ ]} = check_auto(Cid, 2, State0), % returning immediately checks out the same message again - {State2, ok, [{send_msg, _, {delivery, _, [{MsgId2, _}]}, ra_event}, + {State2, ok, [{send_msg, _, {delivery, _, [{MsgId2, _}]}, _}, {aux, active}]} = apply(meta(3), rabbit_fifo:make_return(Cid, [MsgId]), State1), {#rabbit_fifo{ra_indexes = RaIdxs}, ok, []} = |
