summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-12-13 13:58:38 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2010-12-13 13:58:38 +0000
commit42f2b1ba6074b7643b30dfeb693c8cb92d4a2a3b (patch)
tree41e657da56ffea66b16c4273230fe5aa7b59ae19
parentbfe4ae94b81f01f6c5561a3862b8fa8ff9a71967 (diff)
downloadrabbitmq-server-git-42f2b1ba6074b7643b30dfeb693c8cb92d4a2a3b.tar.gz
Extend state conversion function
-rw-r--r--src/rabbit_amqqueue_process.erl40
1 files changed, 24 insertions, 16 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 1c4a371675..a7468936bb 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -48,7 +48,7 @@
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
prioritise_cast/2, prioritise_info/2]).
--export([init_with_backing_queue_state/4]).
+-export([init_with_backing_queue_state/6]).
% Queue's state
-record(q, {q,
@@ -131,22 +131,30 @@ init(Q) ->
guid_to_channel = dict:new()}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-init_with_backing_queue_state(Q, BQ, BQS, RateTRef) ->
+init_with_backing_queue_state(Q, BQ, BQS, RateTRef, AckTags, Deliveries) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
- process_args(#q{q = Q#amqqueue{pid = self()},
- exclusive_consumer = none,
- has_had_consumers = false,
- backing_queue = BQ,
- backing_queue_state = BQS,
- active_consumers = queue:new(),
- blocked_consumers = queue:new(),
- expires = undefined,
- sync_timer_ref = undefined,
- rate_timer_ref = RateTRef,
- expiry_timer_ref = undefined,
- ttl = undefined,
- stats_timer = rabbit_event:init_stats_timer(),
- guid_to_channel = dict:new()}).
+ State = requeue_and_run(
+ AckTags,
+ process_args(
+ #q{q = Q#amqqueue{pid = self()},
+ exclusive_consumer = none,
+ has_had_consumers = false,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ active_consumers = queue:new(),
+ blocked_consumers = queue:new(),
+ expires = undefined,
+ sync_timer_ref = undefined,
+ rate_timer_ref = RateTRef,
+ expiry_timer_ref = undefined,
+ ttl = undefined,
+ stats_timer = rabbit_event:init_stats_timer(),
+ guid_to_channel = dict:new()})),
+ lists:foldl(
+ fun (Delivery, StateN) ->
+ {_Delivered, StateN1} = deliver_or_enqueue(Delivery, StateN),
+ StateN1
+ end, State, Deliveries).
terminate(shutdown, State = #q{backing_queue = BQ}) ->
terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State);