summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2016-06-08 00:34:15 +0300
committerMichael Klishin <michael@clojurewerkz.org>2016-06-08 00:34:15 +0300
commit734be9304f1855ee2042d3924c62991ea33eb3d3 (patch)
tree2d0f2e7bb4967bde32e5b95de980c71dbec0b80a /src
parent7e5c76a20856bb685f1e5839f06ab6059ebcd219 (diff)
parent7e1f0b0213560837c46f829b2b1cbdeb89f7d46a (diff)
downloadrabbitmq-server-git-734be9304f1855ee2042d3924c62991ea33eb3d3.tar.gz
Merge branch 'rabbitmq-server-828' into stable
Diffstat (limited to 'src')
-rw-r--r--src/file_handle_cache.erl99
-rw-r--r--src/rabbit_msg_store.erl16
-rw-r--r--src/rabbit_queue_index.erl19
3 files changed, 73 insertions, 61 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index d5f0cbee6f..78b0095036 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -145,7 +145,8 @@
-export([register_callback/3]).
-export([open/3, close/1, read/2, append/2, needs_sync/1, sync/1, position/2,
truncate/1, current_virtual_offset/1, current_raw_offset/1, flush/1,
- copy/3, set_maximum_since_use/1, delete/1, clear/1]).
+ copy/3, set_maximum_since_use/1, delete/1, clear/1,
+ open_with_absolute_path/3]).
-export([obtain/0, obtain/1, release/0, release/1, transfer/1, transfer/2,
set_limit/1, get_limit/0, info_keys/0, with_handle/1, with_handle/2,
info/0, info/1, clear_read_cache/0]).
@@ -249,6 +250,11 @@
[{'write_buffer', (non_neg_integer() | 'infinity' | 'unbuffered')} |
{'read_buffer', (non_neg_integer() | 'unbuffered')}])
-> val_or_error(ref())).
+-spec(open_with_absolute_path/3 ::
+ (file:filename(), [any()],
+ [{'write_buffer', (non_neg_integer() | 'infinity' | 'unbuffered')} |
+ {'read_buffer', (non_neg_integer() | 'unbuffered')}])
+ -> val_or_error(ref())).
-spec(close/1 :: (ref()) -> ok_or_error()).
-spec(read/2 :: (ref(), non_neg_integer()) ->
val_or_error([char()] | binary()) | 'eof').
@@ -300,9 +306,11 @@ register_callback(M, F, A)
gen_server2:cast(?SERVER, {register_callback, self(), {M, F, A}}).
open(Path, Mode, Options) ->
- Path1 = filename:absname(Path),
+ open_with_absolute_path(filename:absname(Path), Mode, Options).
+
+open_with_absolute_path(Path, Mode, Options) ->
File1 = #file { reader_count = RCount, has_writer = HasWriter } =
- case get({Path1, fhc_file}) of
+ case get({Path, fhc_file}) of
File = #file {} -> File;
undefined -> #file { reader_count = 0,
has_writer = false }
@@ -311,7 +319,7 @@ open(Path, Mode, Options) ->
IsWriter = is_writer(Mode1),
case IsWriter andalso HasWriter of
true -> {error, writer_exists};
- false -> {ok, Ref} = new_closed_handle(Path1, Mode1, Options),
+ false -> {ok, Ref} = new_closed_handle(Path, Mode1, Options),
case get_or_reopen([{Ref, new}]) of
{ok, [_Handle1]} ->
RCount1 = case is_reader(Mode1) of
@@ -319,7 +327,7 @@ open(Path, Mode, Options) ->
false -> RCount
end,
HasWriter1 = HasWriter orelse IsWriter,
- put({Path1, fhc_file},
+ put({Path, fhc_file},
File1 #file { reader_count = RCount1,
has_writer = HasWriter1 }),
{ok, Ref};
@@ -375,7 +383,7 @@ read(Ref, Count) ->
offset = Offset}
= tune_read_buffer_limit(Handle0, Count),
WantedCount = Count - BufRem,
- case prim_file_read(Hdl, lists:max([BufSz, WantedCount])) of
+ case prim_file_read(Hdl, max(BufSz, WantedCount)) of
{ok, Data} ->
<<_:BufPos/binary, BufTl/binary>> = Buf,
ReadCount = size(Data),
@@ -1297,11 +1305,6 @@ pending_out({N, Queue}) ->
pending_count({Count, _Queue}) ->
Count.
-pending_is_empty({0, _Queue}) ->
- true;
-pending_is_empty({_N, _Queue}) ->
- false.
-
%%----------------------------------------------------------------------------
%% server helpers
%%----------------------------------------------------------------------------
@@ -1348,17 +1351,24 @@ process_open(State = #fhc_state { limit = Limit,
{Pending1, State1} = process_pending(Pending, Limit - used(State), State),
State1 #fhc_state { open_pending = Pending1 }.
-process_obtain(Type, State = #fhc_state { limit = Limit,
- obtain_limit = ObtainLimit }) ->
- ObtainCount = obtain_state(Type, count, State),
- Pending = obtain_state(Type, pending, State),
- Quota = case Type of
- file -> Limit - (used(State));
- socket -> lists:min([ObtainLimit - ObtainCount,
- Limit - (used(State))])
- end,
+process_obtain(socket, State = #fhc_state { limit = Limit,
+ obtain_limit = ObtainLimit,
+ open_count = OpenCount,
+ obtain_count_socket = ObtainCount,
+ obtain_pending_socket = Pending,
+ obtain_count_file = ObtainCountF}) ->
+ Quota = min(ObtainLimit - ObtainCount,
+ Limit - (OpenCount + ObtainCount + ObtainCountF)),
{Pending1, State1} = process_pending(Pending, Quota, State),
- set_obtain_state(Type, pending, Pending1, State1).
+ State1#fhc_state{obtain_pending_socket = Pending1};
+process_obtain(file, State = #fhc_state { limit = Limit,
+ open_count = OpenCount,
+ obtain_count_socket = ObtainCountS,
+ obtain_count_file = ObtainCountF,
+ obtain_pending_file = Pending}) ->
+ Quota = Limit - (OpenCount + ObtainCountS + ObtainCountF),
+ {Pending1, State1} = process_pending(Pending, Quota, State),
+ State1#fhc_state{obtain_pending_file = Pending1}.
process_pending(Pending, Quota, State) when Quota =< 0 ->
{Pending, State};
@@ -1383,26 +1393,21 @@ run_pending_item(#pending { kind = Kind,
true = ets:update_element(Clients, Pid, {#cstate.blocked, false}),
update_counts(Kind, Pid, Requested, State).
-update_counts(Kind, Pid, Delta,
+update_counts(open, Pid, Delta,
State = #fhc_state { open_count = OpenCount,
- obtain_count_file = ObtainCountF,
- obtain_count_socket = ObtainCountS,
clients = Clients }) ->
- {OpenDelta, ObtainDeltaF, ObtainDeltaS} =
- update_counts1(Kind, Pid, Delta, Clients),
- State #fhc_state { open_count = OpenCount + OpenDelta,
- obtain_count_file = ObtainCountF + ObtainDeltaF,
- obtain_count_socket = ObtainCountS + ObtainDeltaS }.
-
-update_counts1(open, Pid, Delta, Clients) ->
ets:update_counter(Clients, Pid, {#cstate.opened, Delta}),
- {Delta, 0, 0};
-update_counts1({obtain, file}, Pid, Delta, Clients) ->
+ State #fhc_state { open_count = OpenCount + Delta};
+update_counts({obtain, file}, Pid, Delta,
+ State = #fhc_state {obtain_count_file = ObtainCountF,
+ clients = Clients }) ->
ets:update_counter(Clients, Pid, {#cstate.obtained_file, Delta}),
- {0, Delta, 0};
-update_counts1({obtain, socket}, Pid, Delta, Clients) ->
+ State #fhc_state { obtain_count_file = ObtainCountF + Delta};
+update_counts({obtain, socket}, Pid, Delta,
+ State = #fhc_state {obtain_count_socket = ObtainCountS,
+ clients = Clients }) ->
ets:update_counter(Clients, Pid, {#cstate.obtained_socket, Delta}),
- {0, 0, Delta}.
+ State #fhc_state { obtain_count_socket = ObtainCountS + Delta}.
maybe_reduce(State) ->
case needs_reduce(State) of
@@ -1410,18 +1415,20 @@ maybe_reduce(State) ->
false -> State
end.
-needs_reduce(State = #fhc_state { limit = Limit,
- open_pending = OpenPending,
- obtain_limit = ObtainLimit,
- obtain_count_socket = ObtainCountS,
- obtain_pending_file = ObtainPendingF,
- obtain_pending_socket = ObtainPendingS }) ->
+needs_reduce(#fhc_state { limit = Limit,
+ open_count = OpenCount,
+ open_pending = {OpenPending, _},
+ obtain_limit = ObtainLimit,
+ obtain_count_socket = ObtainCountS,
+ obtain_count_file = ObtainCountF,
+ obtain_pending_file = {ObtainPendingF, _},
+ obtain_pending_socket = {ObtainPendingS, _} }) ->
Limit =/= infinity
- andalso ((used(State) > Limit)
- orelse (not pending_is_empty(OpenPending))
- orelse (not pending_is_empty(ObtainPendingF))
+ andalso (((OpenCount + ObtainCountS + ObtainCountF) > Limit)
+ orelse (OpenPending =/= 0)
+ orelse (ObtainPendingF =/= 0)
orelse (ObtainCountS < ObtainLimit
- andalso not pending_is_empty(ObtainPendingS))).
+ andalso (ObtainPendingS =/= 0))).
reduce(State = #fhc_state { open_pending = OpenPending,
obtain_pending_file = ObtainPendingFile,
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 6754c606bb..2300664687 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -1360,9 +1360,10 @@ should_mask_action(CRef, MsgId,
%%----------------------------------------------------------------------------
open_file(Dir, FileName, Mode) ->
- file_handle_cache:open(form_filename(Dir, FileName), ?BINARY_MODE ++ Mode,
- [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE},
- {read_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]).
+ file_handle_cache:open_with_absolute_path(
+ form_filename(Dir, FileName), ?BINARY_MODE ++ Mode,
+ [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE},
+ {read_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]).
close_handle(Key, CState = #client_msstate { file_handle_cache = FHC }) ->
CState #client_msstate { file_handle_cache = close_handle(Key, FHC) };
@@ -2112,10 +2113,11 @@ transform_dir(BaseDir, Store, TransformFun) ->
transform_msg_file(FileOld, FileNew, TransformFun) ->
ok = rabbit_file:ensure_parent_dirs_exist(FileNew),
- {ok, RefOld} = file_handle_cache:open(FileOld, [raw, binary, read], []),
- {ok, RefNew} = file_handle_cache:open(FileNew, [raw, binary, write],
- [{write_buffer,
- ?HANDLE_CACHE_BUFFER_SIZE}]),
+ {ok, RefOld} = file_handle_cache:open_with_absolute_path(
+ FileOld, [raw, binary, read], []),
+ {ok, RefNew} = file_handle_cache:open_with_absolute_path(
+ FileNew, [raw, binary, write],
+ [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]),
{ok, _Acc, _IgnoreSize} =
rabbit_msg_file:scan(
RefOld, filelib:file_size(FileOld),
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 981d8e74ff..06b6961edb 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -816,8 +816,9 @@ append_journal_to_segment(#segment { journal_entries = JEntries,
_ ->
file_handle_cache_stats:update(queue_index_write),
- {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE,
- [{write_buffer, infinity}]),
+ {ok, Hdl} = file_handle_cache:open_with_absolute_path(
+ Path, ?WRITE_MODE,
+ [{write_buffer, infinity}]),
%% the file_handle_cache also does a list reverse, so this
%% might not be required here, but before we were doing a
%% sparse_foldr, a lists:reverse/1 seems to be the correct
@@ -832,8 +833,8 @@ get_journal_handle(State = #qistate { journal_handle = undefined,
dir = Dir }) ->
Path = filename:join(Dir, ?JOURNAL_FILENAME),
ok = rabbit_file:ensure_dir(Path),
- {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE,
- [{write_buffer, infinity}]),
+ {ok, Hdl} = file_handle_cache:open_with_absolute_path(
+ Path, ?WRITE_MODE, [{write_buffer, infinity}]),
{Hdl, State #qistate { journal_handle = Hdl }};
get_journal_handle(State = #qistate { journal_handle = Hdl }) ->
{Hdl, State}.
@@ -1058,7 +1059,8 @@ load_segment(KeepAcked, #segment { path = Path }) ->
false -> Empty;
true -> Size = rabbit_file:file_size(Path),
file_handle_cache_stats:update(queue_index_read),
- {ok, Hdl} = file_handle_cache:open(Path, ?READ_MODE, []),
+ {ok, Hdl} = file_handle_cache:open_with_absolute_path(
+ Path, ?READ_MODE, []),
{ok, 0} = file_handle_cache:position(Hdl, bof),
{ok, SegBin} = file_handle_cache:read(Hdl, Size),
ok = file_handle_cache:close(Hdl),
@@ -1383,10 +1385,11 @@ transform_file(Path, Fun) when is_function(Fun)->
case rabbit_file:file_size(Path) of
0 -> ok;
Size -> {ok, PathTmpHdl} =
- file_handle_cache:open(PathTmp, ?WRITE_MODE,
- [{write_buffer, infinity}]),
+ file_handle_cache:open_with_absolute_path(
+ PathTmp, ?WRITE_MODE,
+ [{write_buffer, infinity}]),
- {ok, PathHdl} = file_handle_cache:open(
+ {ok, PathHdl} = file_handle_cache:open_with_absolute_path(
Path, ?READ_MODE, [{read_buffer, Size}]),
{ok, Content} = file_handle_cache:read(PathHdl, Size),
ok = file_handle_cache:close(PathHdl),