diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2011-03-28 15:41:19 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2011-03-28 15:41:19 +0100 |
| commit | 241d1072358daa2824ab67de028ab862a2e0d578 (patch) | |
| tree | 87034ddd4fbad73e5f91a69224db9915ca2ab4e6 | |
| parent | 3ce10894acda38fb92494d91196e2166faa7bf9f (diff) | |
| download | rabbitmq-server-git-241d1072358daa2824ab67de028ab862a2e0d578.tar.gz | |
Make the tail fun in execute_mnesia_tx_with_tail *only* get executed after the tx, and roll other uses into the tx fun. This is rather simpler hopefully.
| -rw-r--r-- | src/rabbit_amqqueue.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_binding.erl | 28 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 40 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 17 |
4 files changed, 49 insertions, 52 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 80dcb79a4a..60b7b38409 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -214,13 +214,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> case mnesia:read({rabbit_durable_queue, QueueName}) of [] -> ok = store_queue(Q), B = add_default_binding(Q), - fun (Tx) -> - R = B(Tx), - case Tx of - transaction -> R; - _ -> Q - end - end; + fun () -> B(), Q end; %% Q exists on stopped node [_] -> rabbit_misc:const(not_found) end; @@ -228,7 +222,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> case rabbit_misc:is_process_alive(QPid) of true -> rabbit_misc:const(ExistingQ); false -> TailFun = internal_delete(QueueName), - fun (Tx) -> TailFun(Tx), ExistingQ end + fun () -> TailFun(), ExistingQ end end end end). @@ -439,8 +433,10 @@ internal_delete(QueueName) -> case mnesia:wread({rabbit_queue, QueueName}) of [] -> rabbit_misc:const({error, not_found}); [_] -> Deletions = internal_delete1(QueueName), - fun (Tx) -> rabbit_binding:process_deletions( - Deletions, Tx) + Serials = rabbit_binding:process_deletions( + Deletions, transaction), + fun () -> rabbit_binding:process_deletions( + Deletions, Serials) end end end). diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 1564573ecf..b765d5e135 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -122,20 +122,23 @@ add(Binding, InnerFun) -> case InnerFun(Src, Dst) of ok -> case mnesia:read({rabbit_route, B}) of - [] -> ok = sync_binding(B, all_durable([Src, Dst]), - fun mnesia:write/3), - fun (Tx) -> - ok = rabbit_exchange:callback( - Src, add_binding, [Tx, Src, B]), - process_addition(Src, B, Tx) - end; - [_] -> fun rabbit_misc:const_ok/1 + [] -> add_notify(Src, Dst, B); + [_] -> fun rabbit_misc:const_ok/0 end; {error, _} = Err -> rabbit_misc:const(Err) end end). +add_notify(Src, Dst, B) -> + ok = sync_binding(B, all_durable([Src, Dst]), fun mnesia:write/3), + ok = rabbit_exchange:callback(Src, add_binding, [transaction, Src, B]), + Serial = serial(Src), + fun () -> + ok = rabbit_exchange:callback(Src, add_binding, [Serial, Src, B]), + ok = rabbit_event:notify(binding_created, info(B)) + end. + remove(Binding, InnerFun) -> binding_action( Binding, @@ -160,7 +163,8 @@ remove(Binding, InnerFun) -> {error, _} = Err -> rabbit_misc:const(Err); {ok, Deletions} -> - fun (Tx) -> process_deletions(Deletions, Tx) end + Serials = process_deletions(Deletions, transaction), + fun () -> process_deletions(Deletions, Serials) end end end). @@ -404,12 +408,6 @@ merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) -> anything_but(not_deleted, Deleted1, Deleted2), [Bindings1 | Bindings2]}. -process_addition(Src, _B, transaction) -> - serial(Src); - -process_addition(_Src, B, _Serial) -> - ok = rabbit_event:notify(binding_created, info(B)). - process_deletions(Deletions, transaction) -> process_deletions( fun (Mode, X, Bindings, Acc) -> diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 5694336a81..e704a44c21 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -270,28 +270,36 @@ process_route(#resource{kind = queue} = QName, {WorkList, SeenXs, QNames}) -> {WorkList, SeenXs, [QName | QNames]}. -call_with_exchange(XName, Fun, PrePostCommitFun) -> +call_with_exchange(XName, Fun) -> rabbit_misc:execute_mnesia_tx_with_tail( - fun () -> Result = case mnesia:read({rabbit_exchange, XName}) of - [] -> {error, not_found}; - [X] -> Fun(X) - end, - fun(Tx) -> PrePostCommitFun(Result, Tx) end + fun () -> case mnesia:read({rabbit_exchange, XName}) of + [] -> rabbit_misc:const({error, not_found}); + [X] -> Fun(X) + end end). delete(XName, IfUnused) -> + delete0(XName, case IfUnused of + true -> fun conditional_delete/1; + false -> fun unconditional_delete/1 + end). + +delete0(XName, Fun) -> call_with_exchange( XName, - case IfUnused of - true -> fun conditional_delete/1; - false -> fun unconditional_delete/1 - end, - fun ({deleted, X, Bs, Deletions}, Tx) -> - rabbit_binding:process_deletions( - rabbit_binding:add_deletion( - XName, {X, deleted, Bs}, Deletions), Tx); - (Error = {error, _InUseOrNotFound}, _Tx) -> - Error + fun (X) -> + case Fun(X) of + {deleted, X, Bs, Deletions} -> + Dels1 = rabbit_binding:add_deletion( + XName, {X, deleted, Bs}, Deletions), + Serials = rabbit_binding:process_deletions( + Dels1, transaction), + fun () -> + rabbit_binding:process_deletions(Dels1, Serials) + end; + {error, _InUseOrNotFound} = E -> + rabbit_misc:const(E) + end end). maybe_auto_delete(#exchange{auto_delete = false}) -> diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 3f0bc9bb38..45f599993a 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -54,7 +54,7 @@ -export([all_module_attributes/1, build_acyclic_graph/3]). -export([now_ms/0]). -export([lock_file/1]). --export([const_ok/1, const/1]). +-export([const_ok/0, const/1]). -export([ntoa/1, ntoab/1]). -export([is_process_alive/1]). @@ -191,7 +191,7 @@ digraph:vertex(), digraph:vertex()})). -spec(now_ms/0 :: () -> non_neg_integer()). -spec(lock_file/1 :: (file:filename()) -> rabbit_types:ok_or_error('eexist')). --spec(const_ok/1 :: (any()) -> 'ok'). +-spec(const_ok/0 :: () -> 'ok'). -spec(const/1 :: (A) -> const(A)). -spec(ntoa/1 :: (inet:ip_address()) -> string()). -spec(ntoab/1 :: (inet:ip_address()) -> string()). @@ -409,13 +409,8 @@ execute_mnesia_transaction(TxFun, PrePostCommitFun) -> execute_mnesia_tx_with_tail(TxFun) -> case mnesia:is_transaction() of true -> execute_mnesia_transaction(TxFun); - false -> {TailFun, TailRes} = execute_mnesia_transaction( - fun () -> - TailFun1 = TxFun(), - Res1 = TailFun1(transaction), - {TailFun1, Res1} - end), - TailFun(TailRes) + false -> TailFun = execute_mnesia_transaction(TxFun), + TailFun() end. ensure_ok(ok, _) -> ok; @@ -847,8 +842,8 @@ lock_file(Path) -> ok = file:close(Lock) end. -const_ok(_) -> ok. -const(X) -> fun (_) -> X end. +const_ok() -> ok. +const(X) -> fun () -> X end. %% Format IPv4-mapped IPv6 addresses as IPv4, since they're what we see %% when IPv6 is enabled but not used (i.e. 99% of the time). |
