diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_access_control.erl | 75 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 51 | ||||
| -rw-r--r-- | src/rabbit_control.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_invariable_queue.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 359 | ||||
| -rw-r--r-- | src/rabbit_msg_store_gc.erl | 96 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 36 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_types.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 177 |
11 files changed, 411 insertions, 420 deletions
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 73fd6f0e51..85452abfaa 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -38,7 +38,7 @@ -export([add_user/2, delete_user/1, change_password/2, set_admin/1, clear_admin/1, list_users/0, lookup_user/1]). -export([add_vhost/1, delete_vhost/1, vhost_exists/1, list_vhosts/0]). --export([set_permissions/5, set_permissions/6, clear_permissions/2, +-export([set_permissions/5, clear_permissions/2, list_permissions/0, list_vhost_permissions/1, list_user_permissions/1, list_user_vhost_permissions/2]). @@ -52,9 +52,6 @@ -type(username() :: binary()). -type(password() :: binary()). -type(regexp() :: binary()). --type(scope() :: binary()). --type(scope_atom() :: 'client' | 'all'). - -spec(check_login/2 :: (binary(), binary()) -> rabbit_types:user() | rabbit_types:channel_exit()). @@ -82,21 +79,15 @@ -spec(list_vhosts/0 :: () -> [rabbit_types:vhost()]). -spec(set_permissions/5 ::(username(), rabbit_types:vhost(), regexp(), regexp(), regexp()) -> 'ok'). --spec(set_permissions/6 ::(scope(), username(), rabbit_types:vhost(), - regexp(), regexp(), regexp()) -> 'ok'). -spec(clear_permissions/2 :: (username(), rabbit_types:vhost()) -> 'ok'). -spec(list_permissions/0 :: - () -> [{username(), rabbit_types:vhost(), regexp(), regexp(), regexp(), - scope_atom()}]). + () -> [{username(), rabbit_types:vhost(), regexp(), regexp(), regexp()}]). -spec(list_vhost_permissions/1 :: - (rabbit_types:vhost()) -> [{username(), regexp(), regexp(), regexp(), - scope_atom()}]). + (rabbit_types:vhost()) -> [{username(), regexp(), regexp(), regexp()}]). -spec(list_user_permissions/1 :: - (username()) -> [{rabbit_types:vhost(), regexp(), regexp(), regexp(), - scope_atom()}]). + (username()) -> [{rabbit_types:vhost(), regexp(), regexp(), regexp()}]). -spec(list_user_vhost_permissions/2 :: - (username(), rabbit_types:vhost()) -> [{regexp(), regexp(), regexp(), - scope_atom()}]). + (username(), rabbit_types:vhost()) -> [{regexp(), regexp(), regexp()}]). -endif. @@ -188,20 +179,15 @@ check_resource_access(Username, [] -> false; [#user_permission{permission = P}] -> - case {Name, P} of - {<<"amq.gen",_/binary>>, #permission{scope = client}} -> - true; - _ -> - PermRegexp = - case element(permission_index(Permission), P) of - %% <<"^$">> breaks Emacs' erlang mode - <<"">> -> <<$^, $$>>; - RE -> RE - end, - case re:run(Name, PermRegexp, [{capture, none}]) of - match -> true; - nomatch -> false - end + PermRegexp = + case element(permission_index(Permission), P) of + %% <<"^$">> breaks Emacs' erlang mode + <<"">> -> <<$^, $$>>; + RE -> RE + end, + case re:run(Name, PermRegexp, [{capture, none}]) of + match -> true; + nomatch -> false end end, if Res -> ok; @@ -334,7 +320,7 @@ internal_delete_vhost(VHostPath) -> ok = rabbit_exchange:delete(Name, false) end, rabbit_exchange:list(VHostPath)), - lists:foreach(fun ({Username, _, _, _, _}) -> + lists:foreach(fun ({Username, _, _, _}) -> ok = clear_permissions(Username, VHostPath) end, list_vhost_permissions(VHostPath)), @@ -355,16 +341,7 @@ validate_regexp(RegexpBin) -> end. set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) -> - set_permissions(<<"client">>, Username, VHostPath, ConfigurePerm, - WritePerm, ReadPerm). - -set_permissions(ScopeBin, Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) -> lists:map(fun validate_regexp/1, [ConfigurePerm, WritePerm, ReadPerm]), - Scope = case ScopeBin of - <<"client">> -> client; - <<"all">> -> all; - _ -> throw({error, {invalid_scope, ScopeBin}}) - end, rabbit_misc:execute_mnesia_transaction( rabbit_misc:with_user_and_vhost( Username, VHostPath, @@ -374,7 +351,6 @@ set_permissions(ScopeBin, Username, VHostPath, ConfigurePerm, WritePerm, ReadPer username = Username, virtual_host = VHostPath}, permission = #permission{ - scope = Scope, configure = ConfigurePerm, write = WritePerm, read = ReadPerm}}, @@ -393,35 +369,34 @@ clear_permissions(Username, VHostPath) -> end)). list_permissions() -> - [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} || - {Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} <- + [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} || + {Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <- list_permissions(match_user_vhost('_', '_'))]. list_vhost_permissions(VHostPath) -> - [{Username, ConfigurePerm, WritePerm, ReadPerm, Scope} || - {Username, _, ConfigurePerm, WritePerm, ReadPerm, Scope} <- + [{Username, ConfigurePerm, WritePerm, ReadPerm} || + {Username, _, ConfigurePerm, WritePerm, ReadPerm} <- list_permissions(rabbit_misc:with_vhost( VHostPath, match_user_vhost('_', VHostPath)))]. list_user_permissions(Username) -> - [{VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} || - {_, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} <- + [{VHostPath, ConfigurePerm, WritePerm, ReadPerm} || + {_, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <- list_permissions(rabbit_misc:with_user( Username, match_user_vhost(Username, '_')))]. list_user_vhost_permissions(Username, VHostPath) -> - [{ConfigurePerm, WritePerm, ReadPerm, Scope} || - {_, _, ConfigurePerm, WritePerm, ReadPerm, Scope} <- + [{ConfigurePerm, WritePerm, ReadPerm} || + {_, _, ConfigurePerm, WritePerm, ReadPerm} <- list_permissions(rabbit_misc:with_user_and_vhost( Username, VHostPath, match_user_vhost(Username, VHostPath)))]. list_permissions(QueryThunk) -> - [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} || + [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} || #user_permission{user_vhost = #user_vhost{username = Username, virtual_host = VHostPath}, - permission = #permission{ scope = Scope, - configure = ConfigurePerm, + permission = #permission{ configure = ConfigurePerm, write = WritePerm, read = ReadPerm}} <- %% TODO: use dirty ops instead diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index bc2b5ac9e7..9d78bafa95 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -311,7 +311,7 @@ check_declare_arguments(QueueName, Args) -> "invalid arg '~s' for ~s: ~w", [Key, rabbit_misc:rs(QueueName), Error]) end || {Key, Fun} <- - [{<<"x-expires">>, fun check_expires_argument/1}, + [{<<"x-expires">>, fun check_expires_argument/1}, {<<"x-message-ttl">>, fun check_message_ttl_argument/1}]], ok. diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e9b70a8f5c..6048920e2b 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -150,22 +150,6 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- -init_queue_state(State) -> - lists:foldl(fun(F, S) -> F(S) end, State, - [fun init_expires/1, fun init_ttl/1]). - -init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) -> - case rabbit_misc:table_lookup(Arguments, <<"x-expires">>) of - {_Type, Expires} -> ensure_expiry_timer(State#q{expires = Expires}); - undefined -> State - end. - -init_ttl(State = #q{q = #amqqueue{arguments = Arguments}}) -> - case rabbit_misc:table_lookup(Arguments, <<"x-message-ttl">>) of - {_Type, TTL} -> drop_expired_messages(State#q{ttl = TTL}); - undefined -> State - end. - declare(Recover, From, State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable}, backing_queue = BQ, backing_queue_state = undefined, @@ -180,8 +164,7 @@ declare(Recover, From, self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), BQS = BQ:init(QName, IsDurable, Recover), - State1 = init_queue_state( - State#q{backing_queue_state = BQS}), + State1 = process_args(State#q{backing_queue_state = BQS}), rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)), rabbit_event:if_enabled(StatsTimer, @@ -190,6 +173,19 @@ declare(Recover, From, Q1 -> {stop, normal, {existing, Q1}, State} end. +process_args(State = #q{q = #amqqueue{arguments = Arguments}}) -> + lists:foldl(fun({Arg, Fun}, State1) -> + case rabbit_misc:table_lookup(Arguments, Arg) of + {_Type, Val} -> Fun(Val, State1); + undefined -> State1 + end + end, State, [{<<"x-expires">>, fun init_expires/2}, + {<<"x-message-ttl">>, fun init_ttl/2}]). + +init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}). + +init_ttl(TTL, State) -> drop_expired_messages(State#q{ttl = TTL}). + terminate_shutdown(Fun, State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = stop_sync_timer(stop_rate_timer(State)), @@ -588,10 +584,8 @@ reset_msg_expiry_fun(TTL) -> message_properties(#q{ttl=TTL}) -> #message_properties{expiry = calculate_msg_expiry(TTL)}. -calculate_msg_expiry(undefined) -> - undefined; -calculate_msg_expiry(TTL) -> - now_millis() + (TTL * 1000). +calculate_msg_expiry(undefined) -> undefined; +calculate_msg_expiry(TTL) -> now_millis() + (TTL * 1000). drop_expired_messages(State = #q{ttl = undefined}) -> State; @@ -610,18 +604,15 @@ ensure_ttl_timer(State = #q{backing_queue = BQ, ttl_timer_ref = undefined}) when TTL =/= undefined -> case BQ:is_empty(BQS) of - true -> - State; - false -> - State#q{ttl_timer_ref = - timer:apply_after(TTL, rabbit_amqqueue, - drop_expired, [self()])} + true -> State; + false -> TRef = timer:apply_after(TTL, rabbit_amqqueue, drop_expired, + [self()]), + State#q{ttl_timer_ref = TRef} end; ensure_ttl_timer(State) -> State. -now_millis() -> - timer:now_diff(now(), {0,0,0}). +now_millis() -> timer:now_diff(now(), {0,0,0}). infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 8facaf1606..6b21274529 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -39,7 +39,6 @@ -define(QUIET_OPT, "-q"). -define(NODE_OPT, "-n"). -define(VHOST_OPT, "-p"). --define(SCOPE_OPT, "-s"). %%---------------------------------------------------------------------------- @@ -67,7 +66,7 @@ start() -> {[Command0 | Args], Opts} = rabbit_misc:get_options( [{flag, ?QUIET_OPT}, {option, ?NODE_OPT, NodeStr}, - {option, ?VHOST_OPT, "/"}, {option, ?SCOPE_OPT, "client"}], + {option, ?VHOST_OPT, "/"}], FullCommand), Opts1 = lists:map(fun({K, V}) -> case K of @@ -289,10 +288,9 @@ action(list_consumers, Node, _Args, Opts, Inform) -> action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), - Scope = proplists:get_value(?SCOPE_OPT, Opts), Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]), call(Node, {rabbit_access_control, set_permissions, - [Scope, Username, VHost, CPerm, WPerm, RPerm]}); + [Username, VHost, CPerm, WPerm, RPerm]}); action(clear_permissions, Node, [Username], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index ad01d8f727..9855f18cd0 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -122,12 +122,10 @@ dropwhile(_Pred, State = #iv_state { len = 0 }) -> dropwhile(Pred, State = #iv_state { queue = Q }) -> {{value, {Msg, MsgProps, IsDelivered}}, Q1} = queue:out(Q), case Pred(MsgProps) of - true -> - {_, State1} = - fetch_internal(false, Q1, Msg, MsgProps, IsDelivered, State), - dropwhile(Pred, State1); - false -> - State + true -> {_, State1} = fetch_internal(false, Q1, Msg, MsgProps, + IsDelivered, State), + dropwhile(Pred, State1); + false -> State end. fetch(_AckRequired, State = #iv_state { len = 0 }) -> diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 66cc06cf94..b9f3e1a3a4 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -37,7 +37,8 @@ client_init/2, client_terminate/2, client_delete_and_terminate/3, write/4, read/3, contains/2, remove/2, release/2, sync/3]). --export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal +-export([sync/1, set_maximum_since_use/2, + has_readers/2, combine_files/3, delete_file/2]). %% internal -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, prioritise_call/3, prioritise_cast/2]). @@ -74,7 +75,6 @@ sum_valid_data, %% sum of valid data in all files sum_file_size, %% sum of file sizes pending_gc_completion, %% things to do once GC completes - gc_active, %% is the GC currently working? gc_pid, %% pid of our GC file_handles_ets, %% tid of the shared file handles table file_summary_ets, %% tid of the file summary table @@ -100,10 +100,27 @@ -record(file_summary, {file, valid_total_size, left, right, file_size, locked, readers}). +-record(gc_state, + { dir, + index_module, + index_state, + file_summary_ets, + msg_store + }). + %%---------------------------------------------------------------------------- -ifdef(use_specs). +-export_type([gc_state/0, file_num/0]). + +-opaque(gc_state() :: #gc_state { dir :: file:filename(), + index_module :: atom(), + index_state :: any(), + file_summary_ets :: ets:tid(), + msg_store :: server() + }). + -type(server() :: pid() | atom()). -type(file_num() :: non_neg_integer()). -type(client_msstate() :: #client_msstate { @@ -138,12 +155,11 @@ -spec(sync/3 :: (server(), [rabbit_guid:guid()], fun (() -> any())) -> 'ok'). -spec(sync/1 :: (server()) -> 'ok'). --spec(gc_done/4 :: (server(), non_neg_integer(), file_num(), file_num()) -> - 'ok'). -spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok'). --spec(gc/3 :: (non_neg_integer(), non_neg_integer(), - {ets:tid(), file:filename(), atom(), any()}) -> - 'concurrent_readers' | non_neg_integer()). +-spec(has_readers/2 :: (non_neg_integer(), gc_state()) -> boolean()). +-spec(combine_files/3 :: (non_neg_integer(), non_neg_integer(), gc_state()) -> + non_neg_integer()). +-spec(delete_file/2 :: (non_neg_integer(), gc_state()) -> non_neg_integer()). -endif. @@ -375,9 +391,6 @@ sync(Server, Guids, K) -> gen_server2:cast(Server, {sync, Guids, K}). sync(Server) -> gen_server2:cast(Server, sync). -gc_done(Server, Reclaimed, Source, Destination) -> - gen_server2:cast(Server, {gc_done, Reclaimed, Source, Destination}). - set_maximum_since_use(Server, Age) -> gen_server2:cast(Server, {set_maximum_since_use, Age}). @@ -482,9 +495,14 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer, read_from_disk(MsgLocation, CState1, DedupCacheEts), Release(), %% this MUST NOT fail with badarg {{ok, Msg}, CState2}; - MsgLocation -> %% different file! + #msg_location {} = MsgLocation -> %% different file! Release(), %% this MUST NOT fail with badarg - client_read1(Server, MsgLocation, Defer, CState) + client_read1(Server, MsgLocation, Defer, CState); + not_found -> %% it seems not to exist. Defer, just to be sure. + try Release() %% this can badarg, same as locked case, above + catch error:badarg -> ok + end, + Defer() end end. @@ -547,8 +565,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> sync_timer_ref = undefined, sum_valid_data = 0, sum_file_size = 0, - pending_gc_completion = [], - gc_active = false, + pending_gc_completion = orddict:new(), gc_pid = undefined, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, @@ -570,8 +587,13 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> {ok, Offset} = file_handle_cache:position(CurHdl, Offset), ok = file_handle_cache:truncate(CurHdl), - {ok, GCPid} = rabbit_msg_store_gc:start_link(Dir, IndexState, IndexModule, - FileSummaryEts), + {ok, GCPid} = rabbit_msg_store_gc:start_link( + #gc_state { dir = Dir, + index_module = IndexModule, + index_state = IndexState, + file_summary_ets = FileSummaryEts, + msg_store = self() + }), {ok, maybe_compact( State1 #msstate { current_file_handle = CurHdl, gc_pid = GCPid }), @@ -588,10 +610,11 @@ prioritise_call(Msg, _From, _State) -> prioritise_cast(Msg, _State) -> case Msg of - sync -> 8; - {gc_done, _Reclaimed, _Source, _Destination} -> 8; - {set_maximum_since_use, _Age} -> 8; - _ -> 0 + sync -> 8; + {combine_files, _Source, _Destination, _Reclaimed} -> 8; + {delete_file, _File, _Reclaimed} -> 8; + {set_maximum_since_use, _Age} -> 8; + _ -> 0 end. handle_call(successfully_recovered_state, _From, State) -> @@ -686,37 +709,23 @@ handle_cast({sync, Guids, K}, handle_cast(sync, State) -> noreply(internal_sync(State)); -handle_cast({gc_done, Reclaimed, Src, Dst}, +handle_cast({combine_files, Source, Destination, Reclaimed}, State = #msstate { sum_file_size = SumFileSize, - gc_active = {Src, Dst}, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts }) -> - %% GC done, so now ensure that any clients that have open fhs to - %% those files close them before using them again. This has to be - %% done here (given it's done in the msg_store, and not the gc), - %% and not when starting up the GC, because if done when starting - %% up the GC, the client could find the close, and close and - %% reopen the fh, whilst the GC is waiting for readers to - %% disappear, before it's actually done the GC. - true = mark_handle_to_close(FileHandlesEts, Src), - true = mark_handle_to_close(FileHandlesEts, Dst), - %% we always move data left, so Src has gone and was on the - %% right, so need to make dest = source.right.left, and also - %% dest.right = source.right - [#file_summary { left = Dst, - right = SrcRight, - locked = true, - readers = 0 }] = ets:lookup(FileSummaryEts, Src), - %% this could fail if SrcRight =:= undefined - ets:update_element(FileSummaryEts, SrcRight, {#file_summary.left, Dst}), - true = ets:update_element(FileSummaryEts, Dst, - [{#file_summary.locked, false}, - {#file_summary.right, SrcRight}]), - true = ets:delete(FileSummaryEts, Src), - noreply( - maybe_compact(run_pending( - State #msstate { sum_file_size = SumFileSize - Reclaimed, - gc_active = false }))); + ok = cleanup_after_file_deletion(Source, State), + %% see comment in cleanup_after_file_deletion + true = mark_handle_to_close(FileHandlesEts, Destination), + true = ets:update_element(FileSummaryEts, Destination, + {#file_summary.locked, false}), + State1 = State #msstate { sum_file_size = SumFileSize - Reclaimed }, + noreply(maybe_compact(run_pending([Source, Destination], State1))); + +handle_cast({delete_file, File, Reclaimed}, + State = #msstate { sum_file_size = SumFileSize }) -> + ok = cleanup_after_file_deletion(File, State), + State1 = State #msstate { sum_file_size = SumFileSize - Reclaimed }, + noreply(maybe_compact(run_pending([File], State1))); handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), @@ -867,7 +876,7 @@ read_message1(From, #msg_location { guid = Guid, ref_count = RefCount, ets:lookup(FileSummaryEts, File), case Locked of true -> add_to_pending_gc_completion({read, Guid, From}, - State); + File, State); false -> {Msg, State1} = read_from_disk(MsgLoc, State, DedupCacheEts), gen_server2:reply(From, {ok, Msg}), @@ -897,19 +906,18 @@ read_from_disk(#msg_location { guid = Guid, ref_count = RefCount, ok = maybe_insert_into_cache(DedupCacheEts, RefCount, Guid, Msg), {Msg, State1}. -contains_message(Guid, From, State = #msstate { gc_active = GCActive }) -> +contains_message(Guid, From, + State = #msstate { pending_gc_completion = Pending }) -> case index_lookup_positive_ref_count(Guid, State) of not_found -> gen_server2:reply(From, false), State; #msg_location { file = File } -> - case GCActive of - {A, B} when File =:= A orelse File =:= B -> - add_to_pending_gc_completion( - {contains, Guid, From}, State); - _ -> - gen_server2:reply(From, true), - State + case orddict:is_key(File, Pending) of + true -> add_to_pending_gc_completion( + {contains, Guid, From}, File, State); + false -> gen_server2:reply(From, true), + State end end. @@ -928,7 +936,7 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid, 1 -> ok = remove_cache_entry(DedupCacheEts, Guid), case ets:lookup(FileSummaryEts, File) of [#file_summary { locked = true } ] -> - add_to_pending_gc_completion({remove, Guid}, State); + add_to_pending_gc_completion({remove, Guid}, File, State); [#file_summary {}] -> ok = Dec(), [_] = ets:update_counter( @@ -944,20 +952,25 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid, end. add_to_pending_gc_completion( - Op, State = #msstate { pending_gc_completion = Pending }) -> - State #msstate { pending_gc_completion = [Op | Pending] }. - -run_pending(State = #msstate { pending_gc_completion = [] }) -> - State; -run_pending(State = #msstate { pending_gc_completion = Pending }) -> - State1 = State #msstate { pending_gc_completion = [] }, - lists:foldl(fun run_pending/2, State1, lists:reverse(Pending)). + Op, File, State = #msstate { pending_gc_completion = Pending }) -> + State #msstate { pending_gc_completion = + rabbit_misc:orddict_cons(File, Op, Pending) }. -run_pending({read, Guid, From}, State) -> +run_pending(Files, State) -> + lists:foldl( + fun (File, State1 = #msstate { pending_gc_completion = Pending }) -> + Pending1 = orddict:erase(File, Pending), + lists:foldl( + fun run_pending_action/2, + State1 #msstate { pending_gc_completion = Pending1 }, + lists:reverse(orddict:fetch(File, Pending))) + end, State, Files). + +run_pending_action({read, Guid, From}, State) -> read_message(Guid, From, State); -run_pending({contains, Guid, From}, State) -> +run_pending_action({contains, Guid, From}, State) -> contains_message(Guid, From, State); -run_pending({remove, Guid}, State) -> +run_pending_action({remove, Guid}, State) -> remove_message(Guid, State). safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) -> @@ -969,6 +982,10 @@ safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) -> safe_ets_update_counter_ok(Tab, Key, UpdateOp, FailThunk) -> safe_ets_update_counter(Tab, Key, UpdateOp, fun (_) -> ok end, FailThunk). +orddict_store(Key, Val, Dict) -> + false = orddict:is_key(Key, Dict), + orddict:store(Key, Val, Dict). + %%---------------------------------------------------------------------------- %% file helper functions %%---------------------------------------------------------------------------- @@ -1429,12 +1446,12 @@ maybe_roll_to_new_file( maybe_roll_to_new_file(_, State) -> State. -maybe_compact(State = #msstate { sum_valid_data = SumValid, - sum_file_size = SumFileSize, - gc_active = false, - gc_pid = GCPid, - file_summary_ets = FileSummaryEts, - file_size_limit = FileSizeLimit }) +maybe_compact(State = #msstate { sum_valid_data = SumValid, + sum_file_size = SumFileSize, + gc_pid = GCPid, + pending_gc_completion = Pending, + file_summary_ets = FileSummaryEts, + file_size_limit = FileSizeLimit }) when (SumFileSize > 2 * FileSizeLimit andalso (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION) -> %% TODO: the algorithm here is sub-optimal - it may result in a @@ -1443,27 +1460,30 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid, '$end_of_table' -> State; First -> - case find_files_to_gc(FileSummaryEts, FileSizeLimit, - ets:lookup(FileSummaryEts, First)) of + case find_files_to_combine(FileSummaryEts, FileSizeLimit, + ets:lookup(FileSummaryEts, First)) of not_found -> State; {Src, Dst} -> + Pending1 = orddict_store(Dst, [], + orddict_store(Src, [], Pending)), State1 = close_handle(Src, close_handle(Dst, State)), true = ets:update_element(FileSummaryEts, Src, {#file_summary.locked, true}), true = ets:update_element(FileSummaryEts, Dst, {#file_summary.locked, true}), - ok = rabbit_msg_store_gc:gc(GCPid, Src, Dst), - State1 #msstate { gc_active = {Src, Dst} } + ok = rabbit_msg_store_gc:combine(GCPid, Src, Dst), + State1 #msstate { pending_gc_completion = Pending1 } end end; maybe_compact(State) -> State. -find_files_to_gc(FileSummaryEts, FileSizeLimit, - [#file_summary { file = Dst, - valid_total_size = DstValid, - right = Src }]) -> +find_files_to_combine(FileSummaryEts, FileSizeLimit, + [#file_summary { file = Dst, + valid_total_size = DstValid, + right = Src, + locked = DstLocked }]) -> case Src of undefined -> not_found; @@ -1471,13 +1491,16 @@ find_files_to_gc(FileSummaryEts, FileSizeLimit, [#file_summary { file = Src, valid_total_size = SrcValid, left = Dst, - right = SrcRight }] = Next = + right = SrcRight, + locked = SrcLocked }] = Next = ets:lookup(FileSummaryEts, Src), case SrcRight of undefined -> not_found; - _ -> case DstValid + SrcValid =< FileSizeLimit of + _ -> case (DstValid + SrcValid =< FileSizeLimit) andalso + (DstValid > 0) andalso (SrcValid > 0) andalso + not (DstLocked orelse SrcLocked) of true -> {Src, Dst}; - false -> find_files_to_gc( + false -> find_files_to_combine( FileSummaryEts, FileSizeLimit, Next) end end @@ -1486,85 +1509,86 @@ find_files_to_gc(FileSummaryEts, FileSizeLimit, delete_file_if_empty(File, State = #msstate { current_file = File }) -> State; delete_file_if_empty(File, State = #msstate { - dir = Dir, - sum_file_size = SumFileSize, - file_handles_ets = FileHandlesEts, - file_summary_ets = FileSummaryEts }) -> + gc_pid = GCPid, + file_summary_ets = FileSummaryEts, + pending_gc_completion = Pending }) -> [#file_summary { valid_total_size = ValidData, - left = Left, - right = Right, - file_size = FileSize, locked = false }] = ets:lookup(FileSummaryEts, File), case ValidData of - %% we should NEVER find the current file in here hence right - %% should always be a file, not undefined - 0 -> case {Left, Right} of - {undefined, _} when Right =/= undefined -> - %% the eldest file is empty. - true = ets:update_element( - FileSummaryEts, Right, - {#file_summary.left, undefined}); - {_, _} when Right =/= undefined -> - true = ets:update_element(FileSummaryEts, Right, - {#file_summary.left, Left}), - true = ets:update_element(FileSummaryEts, Left, - {#file_summary.right, Right}) - end, - true = mark_handle_to_close(FileHandlesEts, File), - true = ets:delete(FileSummaryEts, File), - {ok, Messages, FileSize} = - scan_file_for_valid_messages(Dir, filenum_to_name(File)), - [index_delete(Guid, State) || - {Guid, _TotalSize, _Offset} <- Messages], - State1 = close_handle(File, State), - ok = file:delete(form_filename(Dir, filenum_to_name(File))), - State1 #msstate { sum_file_size = SumFileSize - FileSize }; + 0 -> %% don't delete the file_summary_ets entry for File here + %% because we could have readers which need to be able to + %% decrement the readers count. + true = ets:update_element(FileSummaryEts, File, + {#file_summary.locked, true}), + ok = rabbit_msg_store_gc:delete(GCPid, File), + Pending1 = orddict_store(File, [], Pending), + close_handle(File, + State #msstate { pending_gc_completion = Pending1 }); _ -> State end. +cleanup_after_file_deletion(File, + #msstate { file_handles_ets = FileHandlesEts, + file_summary_ets = FileSummaryEts }) -> + %% Ensure that any clients that have open fhs to the file close + %% them before using them again. This has to be done here (given + %% it's done in the msg_store, and not the gc), and not when + %% starting up the GC, because if done when starting up the GC, + %% the client could find the close, and close and reopen the fh, + %% whilst the GC is waiting for readers to disappear, before it's + %% actually done the GC. + true = mark_handle_to_close(FileHandlesEts, File), + [#file_summary { left = Left, + right = Right, + locked = true, + readers = 0 }] = ets:lookup(FileSummaryEts, File), + %% We'll never delete the current file, so right is never undefined + true = Right =/= undefined, %% ASSERTION + true = ets:update_element(FileSummaryEts, Right, + {#file_summary.left, Left}), + %% ensure the double linked list is maintained + true = case Left of + undefined -> true; %% File is the eldest file (left-most) + _ -> ets:update_element(FileSummaryEts, Left, + {#file_summary.right, Right}) + end, + true = ets:delete(FileSummaryEts, File), + ok. + %%---------------------------------------------------------------------------- %% garbage collection / compaction / aggregation -- external %%---------------------------------------------------------------------------- -gc(SrcFile, DstFile, State = {FileSummaryEts, _Dir, _Index, _IndexState}) -> - [SrcObj = #file_summary { - readers = SrcReaders, - left = DstFile, - file_size = SrcFileSize, - locked = true }] = ets:lookup(FileSummaryEts, SrcFile), - [DstObj = #file_summary { - readers = DstReaders, - right = SrcFile, - file_size = DstFileSize, - locked = true }] = ets:lookup(FileSummaryEts, DstFile), - - case SrcReaders =:= 0 andalso DstReaders =:= 0 of - true -> TotalValidData = combine_files(SrcObj, DstObj, State), - %% don't update dest.right, because it could be - %% changing at the same time - true = ets:update_element( - FileSummaryEts, DstFile, - [{#file_summary.valid_total_size, TotalValidData}, - {#file_summary.file_size, TotalValidData}]), - SrcFileSize + DstFileSize - TotalValidData; - false -> concurrent_readers - end. - -combine_files(#file_summary { file = Source, - valid_total_size = SourceValid, - left = Destination }, - #file_summary { file = Destination, - valid_total_size = DestinationValid, - right = Source }, - State = {_FileSummaryEts, Dir, _Index, _IndexState}) -> - SourceName = filenum_to_name(Source), - DestinationName = filenum_to_name(Destination), +has_readers(File, #gc_state { file_summary_ets = FileSummaryEts }) -> + [#file_summary { locked = true, readers = Count }] = + ets:lookup(FileSummaryEts, File), + Count /= 0. + +combine_files(Source, Destination, + State = #gc_state { file_summary_ets = FileSummaryEts, + dir = Dir, + msg_store = Server }) -> + [#file_summary { + readers = 0, + left = Destination, + valid_total_size = SourceValid, + file_size = SourceFileSize, + locked = true }] = ets:lookup(FileSummaryEts, Source), + [#file_summary { + readers = 0, + right = Source, + valid_total_size = DestinationValid, + file_size = DestinationFileSize, + locked = true }] = ets:lookup(FileSummaryEts, Destination), + + SourceName = filenum_to_name(Source), + DestinationName = filenum_to_name(Destination), {ok, SourceHdl} = open_file(Dir, SourceName, ?READ_AHEAD_MODE), {ok, DestinationHdl} = open_file(Dir, DestinationName, ?READ_AHEAD_MODE ++ ?WRITE_MODE), - ExpectedSize = SourceValid + DestinationValid, + TotalValidData = SourceValid + DestinationValid, %% if DestinationValid =:= DestinationContiguousTop then we don't %% need a tmp file %% if they're not equal, then we need to write out everything past @@ -1577,7 +1601,7 @@ combine_files(#file_summary { file = Source, drop_contiguous_block_prefix(DestinationWorkList), case DestinationWorkListTail of [] -> ok = truncate_and_extend_file( - DestinationHdl, DestinationContiguousTop, ExpectedSize); + DestinationHdl, DestinationContiguousTop, TotalValidData); _ -> Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP, {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE++?WRITE_MODE), ok = copy_messages( @@ -1591,7 +1615,7 @@ combine_files(#file_summary { file = Source, %% Destination and copy from Tmp back to the end {ok, 0} = file_handle_cache:position(TmpHdl, 0), ok = truncate_and_extend_file( - DestinationHdl, DestinationContiguousTop, ExpectedSize), + DestinationHdl, DestinationContiguousTop, TotalValidData), {ok, TmpSize} = file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize), %% position in DestinationHdl should now be DestinationValid @@ -1599,14 +1623,36 @@ combine_files(#file_summary { file = Source, ok = file_handle_cache:delete(TmpHdl) end, {SourceWorkList, SourceValid} = load_and_vacuum_message_file(Source, State), - ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize, + ok = copy_messages(SourceWorkList, DestinationValid, TotalValidData, SourceHdl, DestinationHdl, Destination, State), %% tidy up ok = file_handle_cache:close(DestinationHdl), ok = file_handle_cache:delete(SourceHdl), - ExpectedSize. -load_and_vacuum_message_file(File, {_FileSummaryEts, Dir, Index, IndexState}) -> + %% don't update dest.right, because it could be changing at the + %% same time + true = ets:update_element( + FileSummaryEts, Destination, + [{#file_summary.valid_total_size, TotalValidData}, + {#file_summary.file_size, TotalValidData}]), + + Reclaimed = SourceFileSize + DestinationFileSize - TotalValidData, + gen_server2:cast(Server, {combine_files, Source, Destination, Reclaimed}). + +delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts, + dir = Dir, + msg_store = Server }) -> + [#file_summary { valid_total_size = 0, + locked = true, + file_size = FileSize, + readers = 0 }] = ets:lookup(FileSummaryEts, File), + {[], 0} = load_and_vacuum_message_file(File, State), + ok = file:delete(form_filename(Dir, filenum_to_name(File))), + gen_server2:cast(Server, {delete_file, File, FileSize}). + +load_and_vacuum_message_file(File, #gc_state { dir = Dir, + index_module = Index, + index_state = IndexState }) -> %% Messages here will be end-of-file at start-of-list {ok, Messages, _FileSize} = scan_file_for_valid_messages(Dir, filenum_to_name(File)), @@ -1627,7 +1673,8 @@ load_and_vacuum_message_file(File, {_FileSummaryEts, Dir, Index, IndexState}) -> end, {[], 0}, Messages). copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, - Destination, {_FileSummaryEts, _Dir, Index, IndexState}) -> + Destination, #gc_state { index_module = Index, + index_state = IndexState }) -> Copy = fun ({BlockStart, BlockEnd}) -> BSize = BlockEnd - BlockStart, {ok, BlockStart} = diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index a7855bbf79..cd9fd4973f 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -33,20 +33,16 @@ -behaviour(gen_server2). --export([start_link/4, gc/3, no_readers/2, stop/1]). +-export([start_link/1, combine/3, delete/2, no_readers/2, stop/1]). -export([set_maximum_since_use/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, prioritise_cast/2]). --record(gcstate, - {dir, - index_state, - index_module, - parent, - file_summary_ets, - scheduled +-record(state, + { pending_no_readers, + msg_store_state }). -include("rabbit.hrl"). @@ -55,10 +51,12 @@ -ifdef(use_specs). --spec(start_link/4 :: (file:filename(), any(), atom(), ets:tid()) -> +-spec(start_link/1 :: (rabbit_msg_store:gc_state()) -> rabbit_types:ok_pid_or_error()). --spec(gc/3 :: (pid(), non_neg_integer(), non_neg_integer()) -> 'ok'). --spec(no_readers/2 :: (pid(), non_neg_integer()) -> 'ok'). +-spec(combine/3 :: (pid(), rabbit_msg_store:file_num(), + rabbit_msg_store:file_num()) -> 'ok'). +-spec(delete/2 :: (pid(), rabbit_msg_store:file_num()) -> 'ok'). +-spec(no_readers/2 :: (pid(), rabbit_msg_store:file_num()) -> 'ok'). -spec(stop/1 :: (pid()) -> 'ok'). -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). @@ -66,13 +64,15 @@ %%---------------------------------------------------------------------------- -start_link(Dir, IndexState, IndexModule, FileSummaryEts) -> - gen_server2:start_link( - ?MODULE, [self(), Dir, IndexState, IndexModule, FileSummaryEts], - [{timeout, infinity}]). +start_link(MsgStoreState) -> + gen_server2:start_link(?MODULE, [MsgStoreState], + [{timeout, infinity}]). -gc(Server, Source, Destination) -> - gen_server2:cast(Server, {gc, Source, Destination}). +combine(Server, Source, Destination) -> + gen_server2:cast(Server, {combine, Source, Destination}). + +delete(Server, File) -> + gen_server2:cast(Server, {delete, File}). no_readers(Server, File) -> gen_server2:cast(Server, {no_readers, File}). @@ -85,16 +85,11 @@ set_maximum_since_use(Pid, Age) -> %%---------------------------------------------------------------------------- -init([Parent, Dir, IndexState, IndexModule, FileSummaryEts]) -> +init([MsgStoreState]) -> ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use, [self()]), - {ok, #gcstate { dir = Dir, - index_state = IndexState, - index_module = IndexModule, - parent = Parent, - file_summary_ets = FileSummaryEts, - scheduled = undefined }, - hibernate, + {ok, #state { pending_no_readers = dict:new(), + msg_store_state = MsgStoreState }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. prioritise_cast({set_maximum_since_use, _Age}, _State) -> 8; @@ -103,18 +98,23 @@ prioritise_cast(_Msg, _State) -> 0. handle_call(stop, _From, State) -> {stop, normal, ok, State}. -handle_cast({gc, Source, Destination}, - State = #gcstate { scheduled = undefined }) -> - {noreply, attempt_gc(State #gcstate { scheduled = {Source, Destination} }), - hibernate}; +handle_cast({combine, Source, Destination}, State) -> + {noreply, attempt_action(combine, [Source, Destination], State), hibernate}; -handle_cast({no_readers, File}, - State = #gcstate { scheduled = {Source, Destination} }) - when File =:= Source orelse File =:= Destination -> - {noreply, attempt_gc(State), hibernate}; +handle_cast({delete, File}, State) -> + {noreply, attempt_action(delete, [File], State), hibernate}; -handle_cast({no_readers, _File}, State) -> - {noreply, State, hibernate}; +handle_cast({no_readers, File}, + State = #state { pending_no_readers = Pending }) -> + {noreply, case dict:find(File, Pending) of + error -> + State; + {ok, {Action, Files}} -> + Pending1 = dict:erase(File, Pending), + attempt_action( + Action, Files, + State #state { pending_no_readers = Pending1 }) + end, hibernate}; handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), @@ -129,16 +129,18 @@ terminate(_Reason, State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -attempt_gc(State = #gcstate { dir = Dir, - index_state = IndexState, - index_module = Index, - parent = Parent, - file_summary_ets = FileSummaryEts, - scheduled = {Source, Destination} }) -> - case rabbit_msg_store:gc(Source, Destination, - {FileSummaryEts, Dir, Index, IndexState}) of - concurrent_readers -> State; - Reclaimed -> ok = rabbit_msg_store:gc_done( - Parent, Reclaimed, Source, Destination), - State #gcstate { scheduled = undefined } +attempt_action(Action, Files, + State = #state { pending_no_readers = Pending, + msg_store_state = MsgStoreState }) -> + case [File || File <- Files, + rabbit_msg_store:has_readers(File, MsgStoreState)] of + [] -> do_action(Action, Files, MsgStoreState), + State; + [File | _] -> Pending1 = dict:store(File, {Action, Files}, Pending), + State #state { pending_no_readers = Pending1 } end. + +do_action(combine, [Source, Destination], MsgStoreState) -> + rabbit_msg_store:combine_files(Source, Destination, MsgStoreState); +do_action(delete, [File], MsgStoreState) -> + rabbit_msg_store:delete_file(File, MsgStoreState). diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index f84dff833a..1b8371286c 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -98,7 +98,7 @@ %% and seeding the message store on start up. %% %% Note that in general, the representation of a message's state as -%% the tuple: {('no_pub'|{Guid, MsgProperties, IsPersistent}), +%% the tuple: {('no_pub'|{Guid, MsgProps, IsPersistent}), %% ('del'|'no_del'), ('ack'|'no_ack')} is richer than strictly %% necessary for most operations. However, for startup, and to ensure %% the safe and correct combination of journal entries with entries @@ -162,7 +162,7 @@ %% ---- misc ---- --define(PUB, {_, _, _}). %% {Guid, MsgProperties, IsPersistent} +-define(PUB, {_, _, _}). %% {Guid, MsgProps, IsPersistent} -define(READ_MODE, [binary, raw, read]). -define(READ_AHEAD_MODE, [{read_ahead, ?SEGMENT_TOTAL_SIZE} | ?READ_MODE]). @@ -259,8 +259,7 @@ delete_and_terminate(State) -> ok = rabbit_misc:recursive_delete([Dir]), State1. -publish(Guid, SeqId, MsgProperties, IsPersistent, State) - when is_binary(Guid) -> +publish(Guid, SeqId, MsgProps, IsPersistent, State) when is_binary(Guid) -> ?GUID_BYTES = size(Guid), {JournalHdl, State1} = get_journal_handle(State), ok = file_handle_cache:append( @@ -269,9 +268,9 @@ publish(Guid, SeqId, MsgProperties, IsPersistent, State) false -> ?PUB_TRANS_JPREFIX end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, - create_pub_record_body(Guid, MsgProperties)]), + create_pub_record_body(Guid, MsgProps)]), maybe_flush_journal( - add_to_journal(SeqId, {Guid, MsgProperties, IsPersistent}, State1)). + add_to_journal(SeqId, {Guid, MsgProps, IsPersistent}, State1)). deliver(SeqIds, State) -> deliver_or_ack(del, SeqIds, State). @@ -464,9 +463,7 @@ recover_segment(ContainsCheckFun, CleanShutdown, {SegEntries1, UnackedCountDelta} = segment_plus_journal(SegEntries, JEntries), array:sparse_foldl( - fun (RelSeq, - {{Guid, _MsgProperties, _IsPersistent}, Del, no_ack}, - Segment1) -> + fun (RelSeq, {{Guid, _MsgProps, _IsPersistent}, Del, no_ack}, Segment1) -> recover_message(ContainsCheckFun(Guid), CleanShutdown, Del, RelSeq, Segment1) end, @@ -519,8 +516,7 @@ queue_index_walker_reader(QueueName, Gatherer) -> State = #qistate { segments = Segments, dir = Dir } = recover_journal(blank_state(QueueName)), [ok = segment_entries_foldr( - fun (_RelSeq, - {{Guid, _MsgProps, true}, _IsDelivered, no_ack}, + fun (_RelSeq, {{Guid, _MsgProps, true}, _IsDelivered, no_ack}, ok) -> gatherer:in(Gatherer, {Guid, 1}); (_RelSeq, _Value, Acc) -> @@ -678,8 +674,8 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) -> load_journal_entries(add_to_journal(SeqId, ack, State)); _ -> case read_pub_record_body(Hdl) of - {Guid, MsgProperties} -> - Publish = {Guid, MsgProperties, + {Guid, MsgProps} -> + Publish = {Guid, MsgProps, case Prefix of ?PUB_PERSIST_JPREFIX -> true; ?PUB_TRANS_JPREFIX -> false @@ -781,12 +777,12 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) -> ok = case Pub of no_pub -> ok; - {Guid, MsgProperties, IsPersistent} -> + {Guid, MsgProps, IsPersistent} -> file_handle_cache:append( Hdl, [<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, (bool_to_int(IsPersistent)):1, RelSeq:?REL_SEQ_BITS>>, - create_pub_record_body(Guid, MsgProperties)]) + create_pub_record_body(Guid, MsgProps)]) end, ok = case {Del, Ack} of {no_del, no_ack} -> @@ -806,12 +802,10 @@ read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq}, {Messages, Segments}, Dir) -> Segment = segment_find_or_new(Seg, Dir, Segments), {segment_entries_foldr( - fun (RelSeq, - {{Guid, MsgProperties, IsPersistent}, IsDelivered, no_ack}, - Acc) + fun (RelSeq, {{Guid, MsgProps, IsPersistent}, IsDelivered, no_ack}, Acc) when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso (Seg < EndSeg orelse EndRelSeq >= RelSeq) -> - [ {Guid, reconstruct_seq_id(StartSeg, RelSeq), MsgProperties, + [ {Guid, reconstruct_seq_id(StartSeg, RelSeq), MsgProps, IsPersistent, IsDelivered == del} | Acc ]; (_RelSeq, _Value, Acc) -> Acc @@ -841,8 +835,8 @@ load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) -> case file_handle_cache:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES) of {ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} -> - {Guid, MsgProperties} = read_pub_record_body(Hdl), - Obj = {{Guid, MsgProperties, 1 == IsPersistentNum}, no_del, no_ack}, + {Guid, MsgProps} = read_pub_record_body(Hdl), + Obj = {{Guid, MsgProps, 1 == IsPersistentNum}, no_del, no_ack}, SegEntries1 = array:set(RelSeq, Obj, SegEntries), load_segment_entries(KeepAcked, Hdl, SegEntries1, UnackedCount + 1); diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 00547a2674..f2a65eeb29 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -41,8 +41,8 @@ -include("rabbit_framing.hrl"). -include_lib("kernel/include/file.hrl"). --define(PERSISTENT_MSG_STORE, msg_store_persistent). --define(TRANSIENT_MSG_STORE, msg_store_transient). +-define(PERSISTENT_MSG_STORE, msg_store_persistent). +-define(TRANSIENT_MSG_STORE, msg_store_transient). test_content_prop_roundtrip(Datum, Binary) -> Types = [element(1, E) || E <- Datum], @@ -962,9 +962,6 @@ test_user_management() -> control_action(list_permissions, [], [{"-p", "/testhost"}]), {error, {invalid_regexp, _, _}} = control_action(set_permissions, ["guest", "+foo", ".*", ".*"]), - {error, {invalid_scope, _}} = - control_action(set_permissions, ["guest", "foo", ".*", ".*"], - [{"-s", "cilent"}]), %% user creation ok = control_action(add_user, ["foo", "bar"]), @@ -987,9 +984,7 @@ test_user_management() -> ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"], [{"-p", "/testhost"}]), ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"], - [{"-p", "/testhost"}, {"-s", "client"}]), - ok = control_action(set_permissions, ["foo", ".*", ".*", ".*"], - [{"-p", "/testhost"}, {"-s", "all"}]), + [{"-p", "/testhost"}]), ok = control_action(list_permissions, [], [{"-p", "/testhost"}]), ok = control_action(list_permissions, [], [{"-p", "/testhost"}]), ok = control_action(list_user_permissions, ["foo"]), @@ -1297,7 +1292,7 @@ info_action(Command, Args, CheckVHost) -> {bad_argument, dummy} = control_action(Command, ["dummy"]), ok. -default_options() -> [{"-s", "client"}, {"-p", "/"}, {"-q", "false"}]. +default_options() -> [{"-p", "/"}, {"-q", "false"}]. expand_options(As, Bs) -> lists:foldl(fun({K, _}=A, R) -> diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index d625bda1ad..91f2c4cae0 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -37,13 +37,13 @@ -export_type([txn/0, maybe/1, info/0, info_key/0, message/0, basic_message/0, delivery/0, content/0, decoded_content/0, undecoded_content/0, - unencoded_content/0, encoded_content/0, vhost/0, ctag/0, - amqp_error/0, r/1, r2/2, r3/3, listener/0, + unencoded_content/0, encoded_content/0, message_properties/0, + vhost/0, ctag/0, amqp_error/0, r/1, r2/2, r3/3, listener/0, binding/0, binding_source/0, binding_destination/0, amqqueue/0, exchange/0, connection/0, protocol/0, user/0, ok/1, error/1, ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0, channel_exit/0, - connection_exit/0, message_properties/0]). + connection_exit/0]). -type(channel_exit() :: no_return()). -type(connection_exit() :: no_return()). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 0b948c1b8e..a1c442d3df 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -249,7 +249,7 @@ is_delivered, msg_on_disk, index_on_disk, - msg_properties + msg_props }). -record(delta, @@ -499,32 +499,32 @@ purge(State = #vqstate { q4 = Q4, ram_index_count = 0, persistent_count = PCount1 })}. -publish(Msg, MsgProperties, State) -> - {_SeqId, State1} = publish(Msg, MsgProperties, false, false, State), +publish(Msg, MsgProps, State) -> + {_SeqId, State1} = publish(Msg, MsgProps, false, false, State), a(reduce_memory_use(State1)). publish_delivered(false, _Msg, _MsgProps, State = #vqstate { len = 0 }) -> {blank_ack, a(State)}; publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps, - State = #vqstate { len = 0, - next_seq_id = SeqId, - out_counter = OutCount, - in_counter = InCount, - persistent_count = PCount, - pending_ack = PA, - durable = IsDurable }) -> + State = #vqstate { len = 0, + next_seq_id = SeqId, + out_counter = OutCount, + in_counter = InCount, + persistent_count = PCount, + pending_ack = PA, + durable = IsDurable }) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps)) #msg_status { is_delivered = true }, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), PA1 = record_pending_ack(m(MsgStatus1), PA), PCount1 = PCount + one_if(IsPersistent1), - {SeqId, a(State1 #vqstate { next_seq_id = SeqId + 1, - out_counter = OutCount + 1, - in_counter = InCount + 1, - persistent_count = PCount1, - pending_ack = PA1 })}. + {SeqId, a(State1 #vqstate { next_seq_id = SeqId + 1, + out_counter = OutCount + 1, + in_counter = InCount + 1, + persistent_count = PCount1, + pending_ack = PA1 })}. dropwhile(Pred, State) -> {_OkOrEmpty, State1} = dropwhile1(Pred, State), @@ -532,7 +532,7 @@ dropwhile(Pred, State) -> dropwhile1(Pred, State) -> internal_queue_out( - fun(MsgStatus = #msg_status { msg_properties = MsgProps }, State1) -> + fun(MsgStatus = #msg_status { msg_props = MsgProps }, State1) -> case Pred(MsgProps) of true -> {_, State2} = internal_fetch(false, MsgStatus, State1), @@ -575,22 +575,26 @@ read_msg(MsgStatus = #msg_status { msg = undefined, msg_store_clients = MSCState}) -> {{ok, Msg = #basic_message {}}, MSCState1} = read_from_msg_store(MSCState, IsPersistent, Guid), - {MsgStatus #msg_status { msg = Msg }, State #vqstate { ram_msg_count = RamMsgCount + 1, msg_store_clients = MSCState1 }}; read_msg(MsgStatus, State) -> {MsgStatus, State}. -internal_fetch(AckRequired, - MsgStatus = #msg_status { - msg = Msg, guid = Guid, seq_id = SeqId, - is_persistent = IsPersistent, is_delivered = IsDelivered, - msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }, - State = #vqstate { - ram_msg_count = RamMsgCount, out_counter = OutCount, - index_state = IndexState, len = Len, - persistent_count = PCount, pending_ack = PA }) -> +internal_fetch(AckRequired, MsgStatus = #msg_status { + seq_id = SeqId, + guid = Guid, + msg = Msg, + is_persistent = IsPersistent, + is_delivered = IsDelivered, + msg_on_disk = MsgOnDisk, + index_on_disk = IndexOnDisk }, + State = #vqstate {ram_msg_count = RamMsgCount, + out_counter = OutCount, + index_state = IndexState, + len = Len, + persistent_count = PCount, + pending_ack = PA }) -> %% 1. Mark it delivered if necessary IndexState1 = maybe_write_delivered( IndexOnDisk andalso not IsDelivered, @@ -634,14 +638,13 @@ ack(AckTags, State) -> fun (_AckEntry, State1) -> State1 end, AckTags, State)). -tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, - MsgProperties, +tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps, State = #vqstate { durable = IsDurable, msg_store_clients = MSCState }) -> Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), - store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProperties} | Pubs] }), + store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }), a(case IsPersistent andalso IsDurable of - true -> MsgStatus = msg_status(true, undefined, Msg, MsgProperties), + true -> MsgStatus = msg_status(true, undefined, Msg, MsgProps), {#msg_status { msg_on_disk = true }, MSCState1} = maybe_write_msg_to_disk(false, MsgStatus, MSCState), State #vqstate { msg_store_clients = MSCState1 }; @@ -683,18 +686,16 @@ tx_commit(Txn, Fun, MsgPropsFun, State = #vqstate { durable = IsDurable }) -> requeue(AckTags, MsgPropsFun, State) -> a(reduce_memory_use( ack(fun rabbit_msg_store:release/2, - fun (#msg_status { msg = Msg, - msg_properties = MsgProperties }, State1) -> - {_SeqId, State2} = - publish(Msg, MsgPropsFun(MsgProperties), true, - false, State1), + fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) -> + {_SeqId, State2} = publish(Msg, MsgPropsFun(MsgProps), + true, false, State1), State2; - ({IsPersistent, Guid, MsgProperties}, State1) -> + ({IsPersistent, Guid, MsgProps}, State1) -> #vqstate { msg_store_clients = MSCState } = State1, {{ok, Msg = #basic_message{}}, MSCState1} = read_from_msg_store(MSCState, IsPersistent, Guid), State2 = State1 #vqstate { msg_store_clients = MSCState1 }, - {_SeqId, State3} = publish(Msg, MsgPropsFun(MsgProperties), + {_SeqId, State3} = publish(Msg, MsgPropsFun(MsgProps), true, true, State2), State3 end, @@ -844,11 +845,11 @@ cons_if(true, E, L) -> [E | L]; cons_if(false, _E, L) -> L. msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }, - MsgProperties) -> + MsgProps) -> #msg_status { seq_id = SeqId, guid = Guid, msg = Msg, is_persistent = IsPersistent, is_delivered = false, msg_on_disk = false, index_on_disk = false, - msg_properties = MsgProperties }. + msg_props = MsgProps }. find_msg_store(true) -> ?PERSISTENT_MSG_STORE; find_msg_store(false) -> ?TRANSIENT_MSG_STORE. @@ -883,26 +884,26 @@ store_tx(Txn, Tx) -> put({txn, Txn}, Tx). erase_tx(Txn) -> erase({txn, Txn}). persistent_guids(Pubs) -> - [Guid || {#basic_message { guid = Guid, is_persistent = true }, _MsgProps} - <- Pubs]. + [Guid || {#basic_message { guid = Guid, + is_persistent = true }, _MsgProps} <- Pubs]. betas_from_index_entries(List, TransientThreshold, IndexState) -> {Filtered, Delivers, Acks} = lists:foldr( - fun ({Guid, SeqId, MsgProperties, IsPersistent, IsDelivered}, + fun ({Guid, SeqId, MsgProps, IsPersistent, IsDelivered}, {Filtered1, Delivers1, Acks1}) -> case SeqId < TransientThreshold andalso not IsPersistent of true -> {Filtered1, cons_if(not IsDelivered, SeqId, Delivers1), [SeqId | Acks1]}; - false -> {[m(#msg_status { msg = undefined, - guid = Guid, - seq_id = SeqId, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - msg_on_disk = true, - index_on_disk = true, - msg_properties = MsgProperties + false -> {[m(#msg_status { msg = undefined, + guid = Guid, + seq_id = SeqId, + is_persistent = IsPersistent, + is_delivered = IsDelivered, + msg_on_disk = true, + index_on_disk = true, + msg_props = MsgProps }) | Filtered1], Delivers1, Acks1} @@ -978,9 +979,10 @@ tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, MsgPropsFun, case IsDurable of true -> [AckTag || AckTag <- AckTags, case dict:fetch(AckTag, PA) of - #msg_status {} -> false; - {IsPersistent, - _Guid, _MsgProps} -> IsPersistent + #msg_status {} -> + false; + {IsPersistent, _Guid, _MsgProps} -> + IsPersistent end]; false -> [] end, @@ -1011,21 +1013,16 @@ tx_commit_index(State = #vqstate { on_sync = #sync { durable = IsDurable }) -> PAcks = lists:append(SPAcks), Acks = lists:append(SAcks), - Pubs = lists:foldl( - fun({Fun, PubsN}, OuterAcc) -> - lists:foldl( - fun({Msg, MsgProps}, Acc) -> - [{Msg, Fun(MsgProps)} | Acc] - end, OuterAcc, PubsN) - end, [], SPubs), + Pubs = [{Msg, Fun(MsgProps)} || {Fun, PubsN} <- lists:reverse(SPubs), + {Msg, MsgProps} <- lists:reverse(PubsN)], {SeqIds, State1 = #vqstate { index_state = IndexState }} = lists:foldl( fun ({Msg = #basic_message { is_persistent = IsPersistent }, - MsgProperties}, + MsgProps}, {SeqIdsAcc, State2}) -> IsPersistent1 = IsDurable andalso IsPersistent, {SeqId, State3} = - publish(Msg, MsgProperties, false, IsPersistent1, State2), + publish(Msg, MsgProps, false, IsPersistent1, State2), {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3} end, {PAcks, ack(Acks, State)}, Pubs), IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState), @@ -1082,7 +1079,7 @@ sum_guids_by_store_to_len(LensByStore, GuidsByStore) -> %%---------------------------------------------------------------------------- publish(Msg = #basic_message { is_persistent = IsPersistent }, - MsgProperties, IsDelivered, MsgOnDisk, + MsgProps, IsDelivered, MsgOnDisk, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, next_seq_id = SeqId, len = Len, @@ -1091,7 +1088,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent }, durable = IsDurable, ram_msg_count = RamMsgCount }) -> IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProperties)) + MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps)) #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk}, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = case bpqueue:is_empty(Q3) of @@ -1112,16 +1109,16 @@ maybe_write_msg_to_disk(Force, MsgStatus = #msg_status { msg = Msg, guid = Guid, is_persistent = IsPersistent }, MSCState) when Force orelse IsPersistent -> - {ok, MSCState1} = - with_msg_store_state( - MSCState, IsPersistent, - fun (MsgStore, MSCState2) -> - Msg1 = Msg #basic_message { - %% don't persist any recoverable decoded properties - content = rabbit_binary_parser:clear_decoded_content( - Msg #basic_message.content)}, - rabbit_msg_store:write(MsgStore, Guid, Msg1, MSCState2) - end), + Msg1 = Msg #basic_message { + %% don't persist any recoverable decoded properties + content = rabbit_binary_parser:clear_decoded_content( + Msg #basic_message.content)}, + {ok, MSCState1} = with_msg_store_state( + MSCState, IsPersistent, + fun (MsgStore, MSCState2) -> + rabbit_msg_store:write(MsgStore, Guid, Msg1, + MSCState2) + end), {MsgStatus #msg_status { msg_on_disk = true }, MSCState1}; maybe_write_msg_to_disk(_Force, MsgStatus, MSCState) -> {MsgStatus, MSCState}. @@ -1131,19 +1128,15 @@ maybe_write_index_to_disk(_Force, MsgStatus = #msg_status { true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION {MsgStatus, IndexState}; maybe_write_index_to_disk(Force, MsgStatus = #msg_status { - guid = Guid, - seq_id = SeqId, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - msg_properties = MsgProperties}, - IndexState) + guid = Guid, + seq_id = SeqId, + is_persistent = IsPersistent, + is_delivered = IsDelivered, + msg_props = MsgProps}, IndexState) when Force orelse IsPersistent -> true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION - IndexState1 = rabbit_queue_index:publish(Guid, - SeqId, - MsgProperties, - IsPersistent, - IndexState), + IndexState1 = rabbit_queue_index:publish( + Guid, SeqId, MsgProps, IsPersistent, IndexState), {MsgStatus #msg_status { index_on_disk = true }, maybe_write_delivered(IsDelivered, SeqId, IndexState1)}; maybe_write_index_to_disk(_Force, MsgStatus, IndexState) -> @@ -1163,13 +1156,13 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, %% Internal gubbins for acks %%---------------------------------------------------------------------------- -record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId, +record_pending_ack(#msg_status { seq_id = SeqId, + guid = Guid, is_persistent = IsPersistent, - msg_on_disk = MsgOnDisk, - msg_properties = MsgProperties } = MsgStatus, - PA) -> + msg_on_disk = MsgOnDisk, + msg_props = MsgProps } = MsgStatus, PA) -> AckEntry = case MsgOnDisk of - true -> {IsPersistent, Guid, MsgProperties}; + true -> {IsPersistent, Guid, MsgProps}; false -> MsgStatus end, dict:store(SeqId, AckEntry, PA). @@ -1220,9 +1213,7 @@ accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, index_on_disk = false }, Acc) -> Acc; -accumulate_ack(SeqId, - {IsPersistent, Guid, _MsgProperties}, - {SeqIdsAcc, Dict}) -> +accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps}, {SeqIdsAcc, Dict}) -> {cons_if(IsPersistent, SeqId, SeqIdsAcc), rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid, Dict)}. |
