diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-10-21 17:04:36 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-10-21 17:04:36 +0100 |
| commit | a10db26a6dfc303b5551dafbf4575a77926eca12 (patch) | |
| tree | 894505f7989b271cfd3d75c65589ca0d86292d77 /src | |
| parent | bd006be70b6fc03f2d7226a4aaa337e8daf42697 (diff) | |
| download | rabbitmq-server-git-a10db26a6dfc303b5551dafbf4575a77926eca12.tar.gz | |
Added sync_all for queue_index, trap exits in amqqueue_process, make sure that terminate calls terminate in variable_queue which calls through into terminate in queue_process.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 16 |
3 files changed, 24 insertions, 7 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 546d8fbea5..e2477e988b 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -98,6 +98,7 @@ start_link(Q) -> init(Q = #amqqueue { name = QName }) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), + process_flag(trap_exit, true), ok = rabbit_memory_manager:register (self(), false, rabbit_amqqueue, set_storage_mode, [self()]), VQS = rabbit_variable_queue:init(QName), @@ -113,6 +114,8 @@ init(Q = #amqqueue { name = QName }) -> {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. +terminate(shutdown, #q{variable_queue_state = VQS}) -> + _VQS = rabbit_variable_queue:terminate(VQS); terminate(_Reason, State = #q{variable_queue_state = VQS}) -> %% FIXME: How do we cancel active subscriptions? %% Ensure that any persisted tx messages are removed; diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 6df7cc2a20..48da7e3f33 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -32,7 +32,7 @@ -module(rabbit_queue_index). -export([init/1, terminate/1, terminate_and_erase/1, write_published/4, - write_delivered/2, write_acks/2, flush_journal/1, + write_delivered/2, write_acks/2, flush_journal/1, sync_all/1, read_segment_entries/2, next_segment_boundary/1, segment_size/0, find_lowest_seq_id_seg_and_next_seq_id/1, start_msg_store/1]). @@ -229,6 +229,16 @@ full_flush_journal(State) -> {false, State1} -> State1 end. +sync_all(State = #qistate { hc_state = HCState, seg_num_handles = SegHdls }) -> + HCState1 = + dict:fold( + fun (_Key, Hdl, HCStateN) -> + {ok, HCStateM} = + horrendously_dumb_file_handle_cache:sync(Hdl, HCStateN), + HCStateM + end, HCState, SegHdls), + State #qistate { hc_state = HCState1 }. + flush_journal(State = #qistate { journal_ack_count = 0 }) -> {false, State}; flush_journal(State = #qistate { journal_ack_dict = JAckDict, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 75ff101e5a..7a85b30272 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -31,10 +31,10 @@ -module(rabbit_variable_queue). --export([init/1, publish/2, publish_delivered/2, set_queue_ram_duration_target/2, - remeasure_egress_rate/1, fetch/1, ack/2, len/1, is_empty/1, - maybe_start_prefetcher/1, purge/1, delete/1, requeue/2, - tx_publish/2, tx_rollback/2, tx_commit/4, do_tx_commit/4]). +-export([init/1, terminate/1, publish/2, publish_delivered/2, + set_queue_ram_duration_target/2, remeasure_egress_rate/1, fetch/1, + ack/2, len/1, is_empty/1, maybe_start_prefetcher/1, purge/1, delete/1, + requeue/2, tx_publish/2, tx_rollback/2, tx_commit/4, do_tx_commit/4]). %%---------------------------------------------------------------------------- @@ -140,6 +140,9 @@ init(QueueName) -> }, maybe_load_next_segment(State). +terminate(State = #vqstate { index_state = IndexState }) -> + State #vqstate { index_state = rabbit_queue_index:terminate(IndexState) }. + publish(Msg, State) -> publish(Msg, false, false, State). @@ -383,9 +386,10 @@ do_tx_commit(Pubs, AckTags, From, State) -> {[SeqId | SeqIdsAcc], StateN1} end, {[], State}, Pubs), %% TODO need to do something here about syncing the queue index, PubSeqIds - State2 = ack(AckTags, State1), + State2 = #vqstate { index_state = IndexState } = ack(AckTags, State1), + IndexState1 = rabbit_queue_index:sync_all(IndexState), gen_server2:reply(From, ok), - State2. + State2 #vqstate { index_state = IndexState1 }. %%---------------------------------------------------------------------------- |
