diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 23 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 5 |
2 files changed, 14 insertions, 14 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 8fe0d62386..80051149fe 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -90,13 +90,14 @@ start_link(Q) -> %%---------------------------------------------------------------------------- -init(Q) -> +init(Q = #amqqueue { name = QName }) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), + {ok, MS} = rabbit_mixed_queue:start_link(QName, mixed), %% TODO, CHANGE ME {ok, #q{q = Q, owner = none, exclusive_consumer = none, has_had_consumers = false, - mixed_state = rabbit_mixed_queue:start_link(qname(Q), mixed), %% TODO, CHANGE ME + mixed_state = MS, next_msg_id = 1, round_robin = queue:new()}, ?HIBERNATE_AFTER}. @@ -437,17 +438,17 @@ commit_transaction(Txn, State) -> {MsgWithAcks, Remaining} = collect_messages(PendingAcksOrdered, UAM), store_ch_record(C#cr{unacked_messages = Remaining}), - MS = rabbit_mixed_queue:tx_commit( - PendingMessagesOrdered, - lists:map(fun ({_Msg, AckTag}) -> AckTag end, MsgWithAcks), - State #q.mixed_state), + {ok, MS} = rabbit_mixed_queue:tx_commit( + PendingMessagesOrdered, + lists:map(fun ({_Msg, AckTag}) -> AckTag end, MsgWithAcks), + State #q.mixed_state), State #q { mixed_state = MS } end. rollback_transaction(Txn, State) -> #tx { pending_messages = PendingMessages } = lookup_tx(Txn), - MS = rabbit_mixed_queue:tx_cancel(lists:reverse(PendingMessages), State #q.mixed_state), + {ok, MS} = rabbit_mixed_queue:tx_cancel(lists:reverse(PendingMessages), State #q.mixed_state), State #q { mixed_state = MS }. %% {A, B} = collect_messages(C, D) %% A = C `intersect` D; B = D \\ C @@ -631,12 +632,12 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, handle_call(stat, _From, State = #q{q = #amqqueue{name = Name}, mixed_state = MS, round_robin = RoundRobin}) -> - {Length, MS2} = rabbit_mixed_queue:length(MS), - reply({ok, Name, Length, queue:len(RoundRobin)}, State #q { mixed_state = MS2 }); + Length = rabbit_mixed_queue:length(MS), + reply({ok, Name, Length, queue:len(RoundRobin)}, State); handle_call({delete, IfUnused, IfEmpty}, _From, State = #q { mixed_state = MS }) -> - {Length, MS2} = rabbit_mixed_queue:length(MS), + Length = rabbit_mixed_queue:length(MS), IsEmpty = Length == 0, IsUnused = is_unused(), if @@ -645,7 +646,7 @@ handle_call({delete, IfUnused, IfEmpty}, _From, IfUnused and not(IsUnused) -> reply({error, in_use}, State); true -> - {stop, normal, {ok, Length}, State #q { mixed_state = MS2 }} + {stop, normal, {ok, Length}, State} end; handle_call(purge, _From, State) -> diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index 24d0de8d59..037aeebf18 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -204,10 +204,9 @@ purge(State = #mqstate { queue = Q, msg_buf = MsgBuf, mode = mixed }) -> {Count, State #mqstate { msg_buf = queue:new() }}. length(State = #mqstate { queue = Q, mode = disk }) -> - Length = rabbit_disk_queue:length(Q), - {Length, State}; + rabbit_disk_queue:length(Q); length(State = #mqstate { mode = mixed, msg_buf = MsgBuf }) -> - {queue:length(MsgBuf), State}. + queue:len(MsgBuf). is_empty(State) -> 0 == rabbit_mixed_queue:length(State). |
