diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-08-17 17:33:13 +0100 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-08-17 17:33:13 +0100 |
| commit | c93e36365f10a6550db91a25f71bb2c0eb2d10f4 (patch) | |
| tree | 0a857dca5d2e877a689b83f40d0902313ec2908e | |
| parent | 54c4957e9d44abe29982bb2d8313b4bc7ce56f6b (diff) | |
| download | rabbitmq-server-git-c93e36365f10a6550db91a25f71bb2c0eb2d10f4.tar.gz | |
refactoring
| -rw-r--r-- | src/rabbit_channel.erl | 117 |
1 files changed, 56 insertions, 61 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 954fb88891..e32ed1c458 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -48,8 +48,8 @@ uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, stats_timer, - confirm}). --record(confirm, {enabled, count, multiple, tref, held_acks, need_acking}). + confirm_enabled, published_count, confirm_multiple, confirm_tref, + held_confirms, need_confirming}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -167,28 +167,28 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> process_flag(trap_exit, true), link(WriterPid), ok = pg_local:join(rabbit_channels, self()), - State = #ch{state = starting, - channel = Channel, - reader_pid = ReaderPid, - writer_pid = WriterPid, - limiter_pid = undefined, - transaction_id = none, - tx_participants = sets:new(), - next_tag = 1, - uncommitted_ack_q = queue:new(), - unacked_message_q = queue:new(), - username = Username, - virtual_host = VHost, - most_recently_declared_queue = <<>>, - consumer_mapping = dict:new(), - blocking = dict:new(), - queue_collector_pid = CollectorPid, - stats_timer = rabbit_event:init_stats_timer(), - confirm = #confirm{enabled = false, - count = 0, - multiple = false, - held_acks = gb_sets:new(), - need_acking = gb_sets:new()}}, + State = #ch{ state = starting, + channel = Channel, + reader_pid = ReaderPid, + writer_pid = WriterPid, + limiter_pid = undefined, + transaction_id = none, + tx_participants = sets:new(), + next_tag = 1, + uncommitted_ack_q = queue:new(), + unacked_message_q = queue:new(), + username = Username, + virtual_host = VHost, + most_recently_declared_queue = <<>>, + consumer_mapping = dict:new(), + blocking = dict:new(), + queue_collector_pid = CollectorPid, + stats_timer = rabbit_event:init_stats_timer(), + confirm_enabled = false, + published_count = 0, + confirm_multiple = false, + held_confirms = gb_sets:new(), + need_confirming = gb_sets:new() }, rabbit_event:notify( channel_created, [{Item, i(Item, State)} || Item <- ?CREATION_EVENT_KEYS]), @@ -258,17 +258,17 @@ handle_cast(emit_stats, State) -> {noreply, State}; handle_cast(multiple_ack_flush, - State = #ch{writer_pid = WriterPid, - confirm = C = #confirm{held_acks = As, - need_acking = NA}}) -> + State = #ch{writer_pid = WriterPid, + held_confirms = As, + need_confirming = NA}) -> rabbit_log:info("channel got a multiple_ack_flush message~n" "held acks: ~p~n", [gb_sets:to_list(As)]), case gb_sets:is_empty(As) of true -> ok; % this should never be the case false -> flush_multiple(As, WriterPid, gb_sets:smallest(NA)) end, - {noreply, State#ch{confirm = C#confirm{held_acks = gb_sets:new(), - tref = undefined}}}; + {noreply, State #ch { held_confirms = gb_sets:new(), + confirm_tref = undefined }}; handle_cast({confirm, MsgSeqNo}, State) -> {noreply, send_or_enqueue_ack(MsgSeqNo, State)}. @@ -427,10 +427,10 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> send_or_enqueue_ack(undefined, State) -> State; -send_or_enqueue_ack(_, State = #ch{confirm = #confirm{enabled = false}}) -> +send_or_enqueue_ack(_, State = #ch{confirm_enabled = false}) -> State; send_or_enqueue_ack(MsgSeqNo, - State = #ch{confirm = #confirm{multiple = false}}) -> + State = #ch{confirm_multiple = false}) -> rabbit_log:info("handling confirm in single mode (#~p)~n", [MsgSeqNo]), do_if_not_dup(MsgSeqNo, State, fun(MSN, S = #ch{writer_pid = WriterPid}) -> @@ -438,23 +438,20 @@ send_or_enqueue_ack(MsgSeqNo, WriterPid, #'basic.ack'{delivery_tag = MSN}), S end); -send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm = #confirm{multiple = true}}) -> +send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm_multiple = true}) -> rabbit_log:info("handling confirm in multiple mode (#~p)~n", [MsgSeqNo]), do_if_not_dup(MsgSeqNo, State, fun(MSN, S) -> - State1 - = #ch{confirm = C = #confirm{held_acks = As}} - = start_ack_timer(S), - State1#ch{confirm = - C#confirm{held_acks = - gb_sets:add(MSN, As)}} + State1 = start_ack_timer(S), + State1 #ch { held_confirms = + gb_sets:add(MSN, State1#ch.held_confirms) } end). -do_if_not_dup(MsgSeqNo, State = #ch{confirm = #confirm{need_acking = NA}}, Fun) -> +do_if_not_dup(MsgSeqNo, State = #ch{need_confirming = NA}, Fun) -> case gb_sets:is_element(MsgSeqNo, NA) of true -> - State1 = #ch{confirm = C} = Fun(MsgSeqNo, State), - State1#ch{confirm = C#confirm{need_acking = gb_sets:delete(MsgSeqNo, NA)}}; + State1 = Fun(MsgSeqNo, State), + State1 #ch { need_confirming = gb_sets:delete(MsgSeqNo, NA) }; false -> State end. @@ -494,16 +491,15 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, IsPersistent = is_message_persistent(DecodedContent), % PubAck transient messages immediately {MsgSeqNo, State1} - = case State#ch.confirm#confirm.enabled of + = case State#ch.confirm_enabled of false -> {undefined, State}; true -> - Confirm = State#ch.confirm, - Count = Confirm#confirm.count, + Count = State#ch.published_count, % Add the current message to the need_acking list - State01 = State#ch{confirm = Confirm#confirm{ - need_acking = gb_sets:add(Count, - Confirm#confirm.need_acking)}}, + State01 = State #ch { + need_confirming = gb_sets:add(Count, + State#ch.need_confirming) }, % Ack transient messages now {CountOrUndefined, State02} = case IsPersistent of @@ -513,9 +509,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, Count, State01)} end, % Increase the PubAck counter - Confirm02 = State02#ch.confirm, {CountOrUndefined, - State02#ch{confirm = Confirm02#confirm{count = Count+1}}} + State02 #ch { published_count = Count + 1 }} end, Message = #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, @@ -930,7 +925,7 @@ handle_method(#'queue.purge'{queue = QueueNameBin, #'queue.purge_ok'{message_count = PurgedMessageCount}); -handle_method(#'tx.select'{}, _, #ch{confirm = #confirm{enabled = true}}) -> +handle_method(#'tx.select'{}, _, #ch{confirm_enabled = true}) -> rabbit_misc:protocol_error( precondition_failed, "a confirm channel cannot be made transactional", []); @@ -961,11 +956,11 @@ handle_method(#'confirm.select'{}, _, #ch{transaction_id = TxId}) handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait}, _, - State = #ch{confirm = C = #confirm{enabled = false}}) -> + State = #ch{confirm_enabled = false}) -> rabbit_log:info("got confirm.select{multiple = ~p, nowait = ~p}~n", [Multiple, NoWait]), - State1 = State#ch{confirm = C#confirm{enabled = true, - multiple = Multiple}}, + State1 = State #ch { confirm_enabled = true, + confirm_multiple = Multiple }, case NoWait of true -> {noreply, State1}; false -> {reply, #'confirm.select_ok'{}, State1} @@ -973,8 +968,8 @@ handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait}, handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait}, _, - State = #ch{confirm = #confirm{enabled = true, - multiple = Multiple}}) -> + State = #ch{confirm_enabled = true, + confirm_multiple = Multiple}) -> rabbit_log:info("got a confirm.select with same options~n"), case NoWait of true -> {noreply, State}; @@ -983,7 +978,7 @@ handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait}, handle_method(#'confirm.select'{}, _, - #ch{confirm = #confirm{enabled = true}}) -> + #ch{confirm_enabled = true}) -> rabbit_misc:protocol_error( precondition_failed, "cannot change confirm channel multiple setting", []); @@ -1311,18 +1306,18 @@ erase_queue_stats(QPid) -> [erase({queue_exchange_stats, QX}) || {{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0]. -start_ack_timer(State = #ch{confirm = C = #confirm{tref = undefined}}) -> +start_ack_timer(State = #ch{confirm_tref = undefined}) -> {ok, TRef} = timer:apply_after(?MULTIPLE_ACK_FLUSH_INTERVAL, ?MODULE, flush_multiple_acks, [self()]), - State#ch{confirm = C#confirm{tref = TRef}}; + State #ch { confirm_tref = TRef }; start_ack_timer(State) -> State. -stop_ack_timer(State = #ch{confirm = #confirm{tref = undefined}}) -> +stop_ack_timer(State = #ch{confirm_tref = undefined}) -> State; -stop_ack_timer(State = #ch{confirm = C = #confirm{tref = TRef}}) -> +stop_ack_timer(State = #ch{confirm_tref = TRef}) -> {ok, cancel} = timer:cancel(TRef), - State#ch{confirm = C#confirm{tref = undefined}}. + State #ch { confirm_tref = undefined }. flush_multiple(Acks, WriterPid, SmallestNotAcked) -> [First | Rest] = gb_sets:to_list(Acks), |
