summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniil Fedotov <dfedotov@pivotal.io>2017-10-02 14:34:39 +0100
committerDaniil Fedotov <dfedotov@pivotal.io>2017-10-02 14:34:47 +0100
commite65e740b2a4d800bafd6d2f7e35ec5a9622a86b4 (patch)
treeefe74b25ef5d3269553a8bea0cb19af50db6485d
parent4e87deff41daee0f79f7b3f92f27539292f45a65 (diff)
downloadrabbitmq-server-git-e65e740b2a4d800bafd6d2f7e35ec5a9622a86b4.tar.gz
Drop head after changing a policy to drop-head. Clarify TODOs
-rw-r--r--src/rabbit_amqqueue_process.erl10
-rw-r--r--src/rabbit_channel.erl3
-rw-r--r--src/rabbit_mirror_queue_slave.erl3
3 files changed, 12 insertions, 4 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 7721fc70fc..ed88e69378 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -428,8 +428,14 @@ init_max_bytes(MaxBytes, State) ->
init_overflow(undefined, State) ->
State;
init_overflow(Overflow, State) ->
- %% TODO maybe drop head
- State#q{overflow = binary_to_existing_atom(Overflow, utf8)}.
+ OverflowVal = binary_to_existing_atom(Overflow, utf8),
+ case OverflowVal of
+ 'drop-head' ->
+ {_Dropped, State1} = maybe_drop_head(State#q{overflow = OverflowVal}),
+ State1;
+ _ ->
+ State#q{overflow = OverflowVal}
+ end.
init_queue_mode(undefined, State) ->
State;
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 345f90d5e7..1167ac66f9 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1879,7 +1879,8 @@ send_confirms_and_nacks(State = #ch{tx = none, confirmed = C, rejected = R}) ->
[MsgSeqNo | MSNs]
end, [], lists:append(C)),
State1 = send_confirms(ConfirmMsgSeqNos, State#ch{confirmed = []}),
- %% TODO: msg seq nos, same as for confirms.
+ %% TODO: msg seq nos, same as for confirms. Need to implement
+ %% nack rates first.
send_nacks(lists:append(R), State1#ch{rejected = []});
pausing -> State
end;
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 4a6e077f24..6139099ed1 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -302,7 +302,8 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender, flow = Flow}, true},
%% We are acking messages to the channel process that sent us
%% the message delivery. See
%% rabbit_amqqueue_process:handle_ch_down for more info.
- %% TODO: reject publishes
+ %% If message is rejected by the master, the publish will be nacked
+ %% even if slaves confirm it. No need to check for length here.
maybe_flow_ack(Sender, Flow),
noreply(maybe_enqueue_message(Delivery, State));