summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-10-01 18:17:48 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-10-01 18:17:48 +0100
commit9eff2df570edf1fbd74df17bb8b9eacb0f372e54 (patch)
tree15df3362ac53e3f29625bd5a2bc1866bad80aa9e /src
parentd11dfb4a5b5358cffff7f2758a07cf503ddb8a3a (diff)
downloadrabbitmq-server-git-9eff2df570edf1fbd74df17bb8b9eacb0f372e54.tar.gz
cosmetics and some refactoring
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl78
-rw-r--r--src/rabbit_channel.erl247
2 files changed, 142 insertions, 183 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 62ac6992a9..7723d7229e 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -397,7 +397,7 @@ deliver_from_queue_deliver(AckRequired, false,
{{Message, IsDelivered, AckTag, Remaining}, BQS1} =
BQ:fetch(AckRequired, BQS),
{{Message, IsDelivered, AckTag}, 0 == Remaining,
- State #q { backing_queue_state = BQS1 }}.
+ State#q{backing_queue_state = BQS1}}.
confirm_messages(Guids, State) when is_list(Guids) ->
lists:foldl(fun(Guid, State0) ->
@@ -410,23 +410,21 @@ confirm_message(Guid, State = #q{guid_to_channel = GTC}) ->
{ok, {ChPid, MsgSeqNo}} -> rabbit_channel:confirm(ChPid, MsgSeqNo);
_ -> ok
end,
- State #q { guid_to_channel = dict:erase(Guid, GTC) }.
+ State#q{guid_to_channel = dict:erase(Guid, GTC)}.
-record_confirm_message(#delivery{msg_seq_no = undefined }, State) ->
+record_confirm_message(#delivery{msg_seq_no = undefined}, State) ->
State;
-record_confirm_message(#delivery{sender = ChPid,
- message = #basic_message{guid = Guid},
- msg_seq_no = MsgSeqNo},
+record_confirm_message(#delivery{msg_seq_no = MsgSeqNo,
+ sender = ChPid,
+ message = #basic_message{guid = Guid}},
State = #q{guid_to_channel = GTC}) ->
- State #q { guid_to_channel = dict:store(Guid, {ChPid, MsgSeqNo}, GTC) }.
+ State#q{guid_to_channel = dict:store(Guid, {ChPid, MsgSeqNo}, GTC)}.
ack_by_acktags(AckTags, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
AckdGuids = BQ:seqids_to_guids(AckTags, BQS),
- BQS1 = BQ:ack(AckTags, BQS),
- confirm_messages(
- AckdGuids,
- State #q { backing_queue_state = BQS1 }).
+ confirm_messages(AckdGuids,
+ State#q{backing_queue_state = BQ:ack(AckTags, BQS)}).
run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
Funs = {fun deliver_from_queue_pred/2,
@@ -460,15 +458,11 @@ attempt_delivery(#delivery{txn = Txn,
deliver_or_enqueue(Delivery = #delivery{message = Message,
msg_seq_no = MsgSeqNo},
State = #q{backing_queue = BQ}) ->
- State1 = record_confirm_message(Delivery, State),
- case attempt_delivery(Delivery, State1) of
- {true, NewState} ->
- {true, NewState};
- {false, NewState} ->
- %% Txn is none and no unblocked channels with consumers
- BQS = BQ:publish(Message, MsgSeqNo =/= undefined,
- State1 #q.backing_queue_state),
- {false, NewState#q{backing_queue_state = BQS}}
+ case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of
+ {true, NewState} -> {true, NewState};
+ {false, NewState} -> BQS = BQ:publish(Message, MsgSeqNo =/= undefined,
+ NewState#q.backing_queue_state),
+ {false, NewState#q{backing_queue_state = BQS}}
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->
@@ -566,13 +560,12 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
qname(#q{q = #amqqueue{name = QName}}) -> QName.
maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
- case Fun(BQS) of
- {BQS1, {confirm, Guids}} ->
- run_message_queue(
- confirm_messages(Guids, State #q { backing_queue_state = BQS1 }));
- BQS1 ->
- run_message_queue(State#q{backing_queue_state = BQS1})
- end.
+ {BQS2, State1} =
+ case Fun(BQS) of
+ {BQS1, {confirm, Guids}} -> {BQS1, confirm_messages(Guids, State)};
+ BQS1 -> {BQS1, State}
+ end,
+ run_message_queue(State1#q{backing_queue_state = BQS2}).
commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
@@ -720,15 +713,15 @@ handle_call({deliver_immediately, Delivery}, _From, State) ->
%% just all ready-to-consume queues get the message, with unready
%% queues discarding the message?
%%
- {Delivered, State1} = attempt_delivery(Delivery, State),
- State2 = confirm_message(Delivery#delivery.message#basic_message.guid,
- record_confirm_message(Delivery, State1)),
- reply(Delivered, State2);
+ {Delivered, NewState} = attempt_delivery(Delivery, State),
+ reply(Delivered,
+ confirm_message(Delivery#delivery.message#basic_message.guid,
+ record_confirm_message(Delivery, NewState)));
handle_call({deliver, Delivery}, _From, State) ->
%% Synchronous, "mandatory" delivery mode
- {Delivered, State1} = deliver_or_enqueue(Delivery, State),
- reply(Delivered, State1);
+ {Delivered, NewState} = deliver_or_enqueue(Delivery, State),
+ reply(Delivered, NewState);
handle_call({commit, Txn, ChPid}, From, State) ->
NewState = commit_transaction(Txn, From, ChPid, State),
@@ -885,8 +878,8 @@ handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
handle_cast({deliver, Delivery}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
- {_Delivered, State1} = deliver_or_enqueue(Delivery, State),
- noreply(State1);
+ {_Delivered, NewState} = deliver_or_enqueue(Delivery, State),
+ noreply(NewState);
handle_cast({ack, Txn, AckTags, ChPid},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
@@ -896,14 +889,13 @@ handle_cast({ack, Txn, AckTags, ChPid},
C = #cr{acktags = ChAckTags} ->
{C1, State1} =
case Txn of
- none ->
- ChAckTags1 = subtract_acks(ChAckTags, AckTags),
- NewC = C#cr{acktags = ChAckTags1},
- NewState = ack_by_acktags(AckTags, State),
- {NewC, NewState};
- _ ->
- {C#cr{txn = Txn},
- State #q { backing_queue_state = BQ:tx_ack(Txn, AckTags, BQS) }}
+ none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags),
+ NewC = C#cr{acktags = ChAckTags1},
+ NewState = ack_by_acktags(AckTags, State),
+ {NewC, NewState};
+ _ -> BQS1 = BQ:tx_ack(Txn, AckTags, BQS),
+ {C#cr{txn = Txn},
+ State#q{backing_queue_state = BQS1}}
end,
store_ch_record(C1),
noreply(State1)
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 20a554101c..722eff370e 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -165,7 +165,6 @@ flush_multiple_acks(Pid) ->
confirm(Pid, MsgSeqNo) ->
gen_server2:cast(Pid, {confirm, MsgSeqNo}).
-
%%---------------------------------------------------------------------------
init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
@@ -173,30 +172,30 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
process_flag(trap_exit, true),
ok = pg_local:join(rabbit_channels, self()),
StatsTimer = rabbit_event:init_stats_timer(),
- State = #ch{ state = starting,
- channel = Channel,
- reader_pid = ReaderPid,
- writer_pid = WriterPid,
- limiter_pid = undefined,
- start_limiter_fun = StartLimiterFun,
- transaction_id = none,
- tx_participants = sets:new(),
- next_tag = 1,
- uncommitted_ack_q = queue:new(),
- unacked_message_q = queue:new(),
- username = Username,
- virtual_host = VHost,
- most_recently_declared_queue = <<>>,
- consumer_mapping = dict:new(),
- blocking = dict:new(),
- queue_collector_pid = CollectorPid,
- stats_timer = StatsTimer,
- confirm_enabled = false,
- published_count = 0,
- confirm_multiple = false,
- held_confirms = gb_sets:new(),
- unconfirmed = gb_sets:new(),
- qpid_to_msgs = dict:new() },
+ State = #ch{state = starting,
+ channel = Channel,
+ reader_pid = ReaderPid,
+ writer_pid = WriterPid,
+ limiter_pid = undefined,
+ start_limiter_fun = StartLimiterFun,
+ transaction_id = none,
+ tx_participants = sets:new(),
+ next_tag = 1,
+ uncommitted_ack_q = queue:new(),
+ unacked_message_q = queue:new(),
+ username = Username,
+ virtual_host = VHost,
+ most_recently_declared_queue = <<>>,
+ consumer_mapping = dict:new(),
+ blocking = dict:new(),
+ queue_collector_pid = CollectorPid,
+ stats_timer = StatsTimer,
+ confirm_enabled = false,
+ published_count = 0,
+ confirm_multiple = false,
+ held_confirms = gb_sets:new(),
+ unconfirmed = gb_sets:new(),
+ qpid_to_msgs = dict:new()},
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
rabbit_event:if_enabled(StatsTimer,
fun() -> internal_emit_stats(State) end),
@@ -304,18 +303,19 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason},
erase_queue_stats(QPid),
{noreply, queue_blocked(QPid, State1)}.
-handle_pre_hibernate(State = #ch{writer_pid = WriterPid,
- held_confirms = As,
- stats_timer = StatsTimer,
- unconfirmed = UC}) ->
+handle_pre_hibernate(State = #ch{writer_pid = WriterPid,
+ held_confirms = As,
+ stats_timer = StatsTimer,
+ unconfirmed = UC}) ->
ok = clear_permission_cache(),
flush_multiple(WriterPid, As, UC),
rabbit_event:if_enabled(StatsTimer, fun() ->
- internal_emit_stats(State)
+ internal_emit_stats(State)
end),
+ StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer),
{hibernate, State#ch{held_confirms = gb_sets:new(),
- stats_timer = rabbit_event:stop_stats_timer(StatsTimer),
- confirm_tref = undefined}}.
+ stats_timer = StatsTimer1,
+ confirm_tref = undefined}}.
terminate(_Reason, State = #ch{state = terminating}) ->
terminate(State);
@@ -456,52 +456,39 @@ send_or_enqueue_ack(undefined, State) ->
send_or_enqueue_ack(_, State = #ch{confirm_enabled = false}) ->
State;
send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm_multiple = false}) ->
- do_if_not_dup(
- MsgSeqNo, State,
- fun(MSN, State1 = #ch{writer_pid = WriterPid, qpid_to_msgs = QTM}) ->
- ok = rabbit_writer:send_command(
- WriterPid, #'basic.ack'{delivery_tag = MSN}),
- QTM1 = dict:map(fun (_, Msgs) ->
- gb_sets:delete_any(MsgSeqNo, Msgs)
- end, QTM),
- State1#ch{qpid_to_msgs = QTM1}
- end);
+ do_if_not_dup(MsgSeqNo, State,
+ fun(MSN, State1 = #ch{writer_pid = WriterPid}) ->
+ ok = rabbit_writer:send_command(
+ WriterPid, #'basic.ack'{delivery_tag = MSN}),
+ State1
+ end);
send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm_multiple = true}) ->
- do_if_not_dup(
- MsgSeqNo, State,
- fun(MSN, State1 = #ch{qpid_to_msgs = QTM}) ->
- QTM1 = dict:map(fun (_, Msgs) ->
- gb_sets:delete_any(MsgSeqNo, Msgs)
- end, QTM),
- start_ack_timer(
- State1#ch{held_confirms =
- gb_sets:add(MSN, State1#ch.held_confirms),
- qpid_to_msgs = QTM1})
- end).
+ do_if_not_dup(MsgSeqNo, State,
+ fun(MSN, State1 = #ch{held_confirms = As}) ->
+ start_ack_timer(State1#ch{held_confirms =
+ gb_sets:add(MSN, As)})
+ end).
msg_sent_to_queue(undefined, _QPid, State) ->
State;
msg_sent_to_queue(MsgSeqNo, QPid, State = #ch{qpid_to_msgs = QTM}) ->
- case dict:find(QPid, QTM) of
- {ok, Msgs} ->
- State#ch{
- qpid_to_msgs = dict:store(QPid, gb_sets:add(MsgSeqNo, Msgs), QTM)};
- error ->
- erlang:monitor(process, QPid),
- State#ch{
- qpid_to_msgs = dict:store(QPid, gb_sets:add(MsgSeqNo, gb_sets:new()), QTM)}
- end.
+ Msgs1 = case dict:find(QPid, QTM) of
+ {ok, Msgs} -> Msgs;
+ error -> erlang:monitor(process, QPid),
+ gb_sets:new()
+ end,
+ State#ch{qpid_to_msgs = dict:store(QPid, gb_sets:add(MsgSeqNo, Msgs1))}.
do_if_not_dup(MsgSeqNo, State = #ch{unconfirmed = UC}, Fun) ->
case gb_sets:is_element(MsgSeqNo, UC) of
- true ->
- State1 = Fun(MsgSeqNo, State),
- State1#ch{unconfirmed = gb_sets:delete(MsgSeqNo, UC)};
- false ->
- State
+ true -> QTM = dict:map(fun (_, Msgs) ->
+ gb_sets:delete_any(MsgSeqNo, Msgs)
+ end, State#ch.qpid_to_msgs),
+ State1 = Fun(MsgSeqNo, State#ch{qpid_to_msgs = QTM}),
+ State1#ch{unconfirmed = gb_sets:delete(MsgSeqNo, UC)};
+ false -> State
end.
-
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
{reply, #'channel.open_ok'{}, State#ch{state = running}};
@@ -526,7 +513,6 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
immediate = Immediate},
Content, State = #ch{virtual_host = VHostPath,
transaction_id = TxnKey,
- writer_pid = WriterPid,
confirm_enabled = ConfirmEnabled}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, State),
@@ -537,14 +523,12 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
IsPersistent = is_message_persistent(DecodedContent),
{MsgSeqNo, State1}
= case ConfirmEnabled of
- false ->
- {undefined, State};
- true ->
- Count = State#ch.published_count,
- {Count,
- State#ch{published_count = Count + 1,
- unconfirmed =
- gb_sets:add(Count, State#ch.unconfirmed) }}
+ false -> {undefined, State};
+ true -> Count = State#ch.published_count,
+ {Count,
+ State#ch{published_count = Count + 1,
+ unconfirmed =
+ gb_sets:add(Count, State#ch.unconfirmed)}}
end,
Message = #basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
@@ -559,25 +543,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
true -> MsgSeqNo;
false -> undefined
end)),
- State2 = case RoutingRes of
- %% Confirm transient messages now
- routed ->
- case {IsPersistent, DeliveredQPids} of
- {_, []} -> send_or_enqueue_ack(MsgSeqNo, State1);
- {true, _} ->
- lists:foldl(fun (QPid, State0) ->
- msg_sent_to_queue(MsgSeqNo, QPid, State0)
- end, State1, DeliveredQPids);
- {false, _} -> send_or_enqueue_ack(MsgSeqNo, State1)
- end;
- %% Confirm after basic.returns
- unroutable ->
- ok = basic_return(Message, WriterPid, no_route),
- send_or_enqueue_ack(MsgSeqNo, State1);
- not_delivered ->
- ok = basic_return(Message, WriterPid, no_consumers),
- send_or_enqueue_ack(MsgSeqNo, State1)
- end,
+ State2 = process_routing_result(RoutingRes, DeliveredQPids, IsPersistent,
+ MsgSeqNo, Message, State1),
maybe_incr_stats([{ExchangeName, 1} |
[{{QPid, ExchangeName}, 1} ||
QPid <- DeliveredQPids]], publish, State2),
@@ -1002,30 +969,18 @@ handle_method(#'confirm.select'{}, _, #ch{transaction_id = TxId})
precondition_failed, "cannot switch from tx to confirm mode", []);
handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait},
- _,
- State = #ch{confirm_enabled = false}) ->
- State1 = State#ch{confirm_enabled = true,
- confirm_multiple = Multiple},
- case NoWait of
- true -> {noreply, State1};
- false -> {reply, #'confirm.select_ok'{}, State1}
- end;
+ _, State = #ch{confirm_enabled = false}) ->
+ return_ok(State#ch{confirm_enabled = true, confirm_multiple = Multiple},
+ NoWait, #'confirm.select_ok'{});
handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait},
- _,
- State = #ch{confirm_enabled = true,
- confirm_multiple = Multiple}) ->
- rabbit_log:info("got a confirm.select with same options~n"),
- case NoWait of
- true -> {noreply, State};
- false -> {reply, #'confirm.select_ok'{}, State}
- end;
+ _, State = #ch{confirm_enabled = true,
+ confirm_multiple = Multiple}) ->
+ return_ok(State, NoWait, #'confirm.select_ok'{});
-handle_method(#'confirm.select'{},
- _,
- #ch{confirm_enabled = true}) ->
+handle_method(#'confirm.select'{}, _, #ch{confirm_enabled = true}) ->
rabbit_misc:protocol_error(
- precondition_failed, "cannot change confirm channel multiple setting", []);
+ precondition_failed, "cannot change confirm_multiple setting", []);
handle_method(#'channel.flow'{active = true}, _,
State = #ch{limiter_pid = LimiterPid}) ->
@@ -1249,6 +1204,21 @@ is_message_persistent(Content) ->
IsPersistent
end.
+process_routing_result(unroutable, _, _, MsgSeqNo, Message, State) ->
+ ok = basic_return(Message, State#ch.writer_pid, no_route),
+ send_or_enqueue_ack(MsgSeqNo, State);
+process_routing_result(not_delivered, _, _, MsgSeqNo, Message, State) ->
+ ok = basic_return(Message, State#ch.writer_pid, no_consumers),
+ send_or_enqueue_ack(MsgSeqNo, State);
+process_routing_result(routed, [], _, MsgSeqNo, _, State) ->
+ send_or_enqueue_ack(MsgSeqNo, State);
+process_routing_result(routed, _, false, MsgSeqNo, _, State) ->
+ send_or_enqueue_ack(MsgSeqNo, State);
+process_routing_result(routed, QPids, true, MsgSeqNo, _, State) ->
+ lists:foldl(fun (QPid, State0) ->
+ msg_sent_to_queue(MsgSeqNo, QPid, State0)
+ end, State, QPids).
+
lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)};
lock_message(false, _MsgStruct, State) ->
@@ -1370,33 +1340,30 @@ stop_ack_timer(State = #ch{confirm_tref = TRef}) ->
flush_multiple(WriterPid, As, NA) ->
case gb_sets:is_empty(As) of
true -> ok;
- false -> SmallestNotAcked = case gb_sets:is_empty(NA) of
- false -> gb_sets:smallest(NA);
- true -> gb_sets:largest(As)+1
- end,
- [First | Rest] = gb_sets:to_list(As),
- Remaining =
- case Rest of
- [] -> [First];
- _ -> flush_multiple(First, Rest, WriterPid, SmallestNotAcked)
- end,
- [rabbit_writer:send_command(WriterPid, #'basic.ack'{delivery_tag = A})
- || A <- Remaining]
-end.
-
+ false -> [First | Rest] = gb_sets:to_list(As),
+ [rabbit_writer:send_command(WriterPid,
+ #'basic.ack'{delivery_tag = A}) ||
+ A <- case Rest of
+ [] -> [First];
+ _ -> flush_multiple(
+ First, Rest, WriterPid,
+ case gb_sets:is_empty(NA) of
+ false -> gb_sets:smallest(NA);
+ true -> gb_sets:largest(As) + 1
+ end)
+ end],
+ ok
+ end.
flush_multiple(Prev, [Cur | Rest], WriterPid, SNA) ->
- ExpNext = Prev+1,
+ ExpNext = Prev + 1,
case {SNA >= Cur, Cur} of
- {true, ExpNext} ->
- flush_multiple(Cur, Rest, WriterPid, SNA);
- _ ->
- flush_multiple(Prev, [], WriterPid, SNA),
- [Cur | Rest]
+ {true, ExpNext} -> flush_multiple(Cur, Rest, WriterPid, SNA);
+ _ -> flush_multiple(Prev, [], WriterPid, SNA),
+ [Cur | Rest]
end;
flush_multiple(Prev, [], WriterPid, _) ->
- ok = rabbit_writer:send_command(
- WriterPid,
- #'basic.ack'{delivery_tag = Prev,
- multiple = true}),
+ ok = rabbit_writer:send_command(WriterPid,
+ #'basic.ack'{delivery_tag = Prev,
+ multiple = true}),
[].