diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 40 |
1 files changed, 24 insertions, 16 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 1c4a371675..a7468936bb 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -48,7 +48,7 @@ handle_info/2, handle_pre_hibernate/1, prioritise_call/3, prioritise_cast/2, prioritise_info/2]). --export([init_with_backing_queue_state/4]). +-export([init_with_backing_queue_state/6]). % Queue's state -record(q, {q, @@ -131,22 +131,30 @@ init(Q) -> guid_to_channel = dict:new()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -init_with_backing_queue_state(Q, BQ, BQS, RateTRef) -> +init_with_backing_queue_state(Q, BQ, BQS, RateTRef, AckTags, Deliveries) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), - process_args(#q{q = Q#amqqueue{pid = self()}, - exclusive_consumer = none, - has_had_consumers = false, - backing_queue = BQ, - backing_queue_state = BQS, - active_consumers = queue:new(), - blocked_consumers = queue:new(), - expires = undefined, - sync_timer_ref = undefined, - rate_timer_ref = RateTRef, - expiry_timer_ref = undefined, - ttl = undefined, - stats_timer = rabbit_event:init_stats_timer(), - guid_to_channel = dict:new()}). + State = requeue_and_run( + AckTags, + process_args( + #q{q = Q#amqqueue{pid = self()}, + exclusive_consumer = none, + has_had_consumers = false, + backing_queue = BQ, + backing_queue_state = BQS, + active_consumers = queue:new(), + blocked_consumers = queue:new(), + expires = undefined, + sync_timer_ref = undefined, + rate_timer_ref = RateTRef, + expiry_timer_ref = undefined, + ttl = undefined, + stats_timer = rabbit_event:init_stats_timer(), + guid_to_channel = dict:new()})), + lists:foldl( + fun (Delivery, StateN) -> + {_Delivered, StateN1} = deliver_or_enqueue(Delivery, StateN), + StateN1 + end, State, Deliveries). terminate(shutdown, State = #q{backing_queue = BQ}) -> terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State); |
