summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-03-28 15:41:19 +0100
committerSimon MacMullen <simon@rabbitmq.com>2011-03-28 15:41:19 +0100
commit241d1072358daa2824ab67de028ab862a2e0d578 (patch)
tree87034ddd4fbad73e5f91a69224db9915ca2ab4e6
parent3ce10894acda38fb92494d91196e2166faa7bf9f (diff)
downloadrabbitmq-server-git-241d1072358daa2824ab67de028ab862a2e0d578.tar.gz
Make the tail fun in execute_mnesia_tx_with_tail *only* get executed after the tx, and roll other uses into the tx fun. This is rather simpler hopefully.
-rw-r--r--src/rabbit_amqqueue.erl16
-rw-r--r--src/rabbit_binding.erl28
-rw-r--r--src/rabbit_exchange.erl40
-rw-r--r--src/rabbit_misc.erl17
4 files changed, 49 insertions, 52 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 80dcb79a4a..60b7b38409 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -214,13 +214,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
case mnesia:read({rabbit_durable_queue, QueueName}) of
[] -> ok = store_queue(Q),
B = add_default_binding(Q),
- fun (Tx) ->
- R = B(Tx),
- case Tx of
- transaction -> R;
- _ -> Q
- end
- end;
+ fun () -> B(), Q end;
%% Q exists on stopped node
[_] -> rabbit_misc:const(not_found)
end;
@@ -228,7 +222,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
case rabbit_misc:is_process_alive(QPid) of
true -> rabbit_misc:const(ExistingQ);
false -> TailFun = internal_delete(QueueName),
- fun (Tx) -> TailFun(Tx), ExistingQ end
+ fun () -> TailFun(), ExistingQ end
end
end
end).
@@ -439,8 +433,10 @@ internal_delete(QueueName) ->
case mnesia:wread({rabbit_queue, QueueName}) of
[] -> rabbit_misc:const({error, not_found});
[_] -> Deletions = internal_delete1(QueueName),
- fun (Tx) -> rabbit_binding:process_deletions(
- Deletions, Tx)
+ Serials = rabbit_binding:process_deletions(
+ Deletions, transaction),
+ fun () -> rabbit_binding:process_deletions(
+ Deletions, Serials)
end
end
end).
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 1564573ecf..b765d5e135 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -122,20 +122,23 @@ add(Binding, InnerFun) ->
case InnerFun(Src, Dst) of
ok ->
case mnesia:read({rabbit_route, B}) of
- [] -> ok = sync_binding(B, all_durable([Src, Dst]),
- fun mnesia:write/3),
- fun (Tx) ->
- ok = rabbit_exchange:callback(
- Src, add_binding, [Tx, Src, B]),
- process_addition(Src, B, Tx)
- end;
- [_] -> fun rabbit_misc:const_ok/1
+ [] -> add_notify(Src, Dst, B);
+ [_] -> fun rabbit_misc:const_ok/0
end;
{error, _} = Err ->
rabbit_misc:const(Err)
end
end).
+add_notify(Src, Dst, B) ->
+ ok = sync_binding(B, all_durable([Src, Dst]), fun mnesia:write/3),
+ ok = rabbit_exchange:callback(Src, add_binding, [transaction, Src, B]),
+ Serial = serial(Src),
+ fun () ->
+ ok = rabbit_exchange:callback(Src, add_binding, [Serial, Src, B]),
+ ok = rabbit_event:notify(binding_created, info(B))
+ end.
+
remove(Binding, InnerFun) ->
binding_action(
Binding,
@@ -160,7 +163,8 @@ remove(Binding, InnerFun) ->
{error, _} = Err ->
rabbit_misc:const(Err);
{ok, Deletions} ->
- fun (Tx) -> process_deletions(Deletions, Tx) end
+ Serials = process_deletions(Deletions, transaction),
+ fun () -> process_deletions(Deletions, Serials) end
end
end).
@@ -404,12 +408,6 @@ merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) ->
anything_but(not_deleted, Deleted1, Deleted2),
[Bindings1 | Bindings2]}.
-process_addition(Src, _B, transaction) ->
- serial(Src);
-
-process_addition(_Src, B, _Serial) ->
- ok = rabbit_event:notify(binding_created, info(B)).
-
process_deletions(Deletions, transaction) ->
process_deletions(
fun (Mode, X, Bindings, Acc) ->
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 5694336a81..e704a44c21 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -270,28 +270,36 @@ process_route(#resource{kind = queue} = QName,
{WorkList, SeenXs, QNames}) ->
{WorkList, SeenXs, [QName | QNames]}.
-call_with_exchange(XName, Fun, PrePostCommitFun) ->
+call_with_exchange(XName, Fun) ->
rabbit_misc:execute_mnesia_tx_with_tail(
- fun () -> Result = case mnesia:read({rabbit_exchange, XName}) of
- [] -> {error, not_found};
- [X] -> Fun(X)
- end,
- fun(Tx) -> PrePostCommitFun(Result, Tx) end
+ fun () -> case mnesia:read({rabbit_exchange, XName}) of
+ [] -> rabbit_misc:const({error, not_found});
+ [X] -> Fun(X)
+ end
end).
delete(XName, IfUnused) ->
+ delete0(XName, case IfUnused of
+ true -> fun conditional_delete/1;
+ false -> fun unconditional_delete/1
+ end).
+
+delete0(XName, Fun) ->
call_with_exchange(
XName,
- case IfUnused of
- true -> fun conditional_delete/1;
- false -> fun unconditional_delete/1
- end,
- fun ({deleted, X, Bs, Deletions}, Tx) ->
- rabbit_binding:process_deletions(
- rabbit_binding:add_deletion(
- XName, {X, deleted, Bs}, Deletions), Tx);
- (Error = {error, _InUseOrNotFound}, _Tx) ->
- Error
+ fun (X) ->
+ case Fun(X) of
+ {deleted, X, Bs, Deletions} ->
+ Dels1 = rabbit_binding:add_deletion(
+ XName, {X, deleted, Bs}, Deletions),
+ Serials = rabbit_binding:process_deletions(
+ Dels1, transaction),
+ fun () ->
+ rabbit_binding:process_deletions(Dels1, Serials)
+ end;
+ {error, _InUseOrNotFound} = E ->
+ rabbit_misc:const(E)
+ end
end).
maybe_auto_delete(#exchange{auto_delete = false}) ->
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 3f0bc9bb38..45f599993a 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -54,7 +54,7 @@
-export([all_module_attributes/1, build_acyclic_graph/3]).
-export([now_ms/0]).
-export([lock_file/1]).
--export([const_ok/1, const/1]).
+-export([const_ok/0, const/1]).
-export([ntoa/1, ntoab/1]).
-export([is_process_alive/1]).
@@ -191,7 +191,7 @@
digraph:vertex(), digraph:vertex()})).
-spec(now_ms/0 :: () -> non_neg_integer()).
-spec(lock_file/1 :: (file:filename()) -> rabbit_types:ok_or_error('eexist')).
--spec(const_ok/1 :: (any()) -> 'ok').
+-spec(const_ok/0 :: () -> 'ok').
-spec(const/1 :: (A) -> const(A)).
-spec(ntoa/1 :: (inet:ip_address()) -> string()).
-spec(ntoab/1 :: (inet:ip_address()) -> string()).
@@ -409,13 +409,8 @@ execute_mnesia_transaction(TxFun, PrePostCommitFun) ->
execute_mnesia_tx_with_tail(TxFun) ->
case mnesia:is_transaction() of
true -> execute_mnesia_transaction(TxFun);
- false -> {TailFun, TailRes} = execute_mnesia_transaction(
- fun () ->
- TailFun1 = TxFun(),
- Res1 = TailFun1(transaction),
- {TailFun1, Res1}
- end),
- TailFun(TailRes)
+ false -> TailFun = execute_mnesia_transaction(TxFun),
+ TailFun()
end.
ensure_ok(ok, _) -> ok;
@@ -847,8 +842,8 @@ lock_file(Path) ->
ok = file:close(Lock)
end.
-const_ok(_) -> ok.
-const(X) -> fun (_) -> X end.
+const_ok() -> ok.
+const(X) -> fun () -> X end.
%% Format IPv4-mapped IPv6 addresses as IPv4, since they're what we see
%% when IPv6 is enabled but not used (i.e. 99% of the time).