summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-21 17:04:36 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-21 17:04:36 +0100
commita10db26a6dfc303b5551dafbf4575a77926eca12 (patch)
tree894505f7989b271cfd3d75c65589ca0d86292d77 /src
parentbd006be70b6fc03f2d7226a4aaa337e8daf42697 (diff)
downloadrabbitmq-server-git-a10db26a6dfc303b5551dafbf4575a77926eca12.tar.gz
Added sync_all for queue_index, trap exits in amqqueue_process, make sure that terminate calls terminate in variable_queue which calls through into terminate in queue_process.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl3
-rw-r--r--src/rabbit_queue_index.erl12
-rw-r--r--src/rabbit_variable_queue.erl16
3 files changed, 24 insertions, 7 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 546d8fbea5..e2477e988b 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -98,6 +98,7 @@ start_link(Q) ->
init(Q = #amqqueue { name = QName }) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
+ process_flag(trap_exit, true),
ok = rabbit_memory_manager:register
(self(), false, rabbit_amqqueue, set_storage_mode, [self()]),
VQS = rabbit_variable_queue:init(QName),
@@ -113,6 +114,8 @@ init(Q = #amqqueue { name = QName }) ->
{ok, State, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+terminate(shutdown, #q{variable_queue_state = VQS}) ->
+ _VQS = rabbit_variable_queue:terminate(VQS);
terminate(_Reason, State = #q{variable_queue_state = VQS}) ->
%% FIXME: How do we cancel active subscriptions?
%% Ensure that any persisted tx messages are removed;
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 6df7cc2a20..48da7e3f33 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -32,7 +32,7 @@
-module(rabbit_queue_index).
-export([init/1, terminate/1, terminate_and_erase/1, write_published/4,
- write_delivered/2, write_acks/2, flush_journal/1,
+ write_delivered/2, write_acks/2, flush_journal/1, sync_all/1,
read_segment_entries/2, next_segment_boundary/1, segment_size/0,
find_lowest_seq_id_seg_and_next_seq_id/1, start_msg_store/1]).
@@ -229,6 +229,16 @@ full_flush_journal(State) ->
{false, State1} -> State1
end.
+sync_all(State = #qistate { hc_state = HCState, seg_num_handles = SegHdls }) ->
+ HCState1 =
+ dict:fold(
+ fun (_Key, Hdl, HCStateN) ->
+ {ok, HCStateM} =
+ horrendously_dumb_file_handle_cache:sync(Hdl, HCStateN),
+ HCStateM
+ end, HCState, SegHdls),
+ State #qistate { hc_state = HCState1 }.
+
flush_journal(State = #qistate { journal_ack_count = 0 }) ->
{false, State};
flush_journal(State = #qistate { journal_ack_dict = JAckDict,
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 75ff101e5a..7a85b30272 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -31,10 +31,10 @@
-module(rabbit_variable_queue).
--export([init/1, publish/2, publish_delivered/2, set_queue_ram_duration_target/2,
- remeasure_egress_rate/1, fetch/1, ack/2, len/1, is_empty/1,
- maybe_start_prefetcher/1, purge/1, delete/1, requeue/2,
- tx_publish/2, tx_rollback/2, tx_commit/4, do_tx_commit/4]).
+-export([init/1, terminate/1, publish/2, publish_delivered/2,
+ set_queue_ram_duration_target/2, remeasure_egress_rate/1, fetch/1,
+ ack/2, len/1, is_empty/1, maybe_start_prefetcher/1, purge/1, delete/1,
+ requeue/2, tx_publish/2, tx_rollback/2, tx_commit/4, do_tx_commit/4]).
%%----------------------------------------------------------------------------
@@ -140,6 +140,9 @@ init(QueueName) ->
},
maybe_load_next_segment(State).
+terminate(State = #vqstate { index_state = IndexState }) ->
+ State #vqstate { index_state = rabbit_queue_index:terminate(IndexState) }.
+
publish(Msg, State) ->
publish(Msg, false, false, State).
@@ -383,9 +386,10 @@ do_tx_commit(Pubs, AckTags, From, State) ->
{[SeqId | SeqIdsAcc], StateN1}
end, {[], State}, Pubs),
%% TODO need to do something here about syncing the queue index, PubSeqIds
- State2 = ack(AckTags, State1),
+ State2 = #vqstate { index_state = IndexState } = ack(AckTags, State1),
+ IndexState1 = rabbit_queue_index:sync_all(IndexState),
gen_server2:reply(From, ok),
- State2.
+ State2 #vqstate { index_state = IndexState1 }.
%%----------------------------------------------------------------------------