summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@lshift.net>2010-05-25 15:07:48 +0100
committerSimon MacMullen <simon@lshift.net>2010-05-25 15:07:48 +0100
commit6c8e27ad5c55d1e501d90325e22ad52333241c54 (patch)
tree47ae8b21996eb7157eb984809008b336587e4641
parent0f03cdddb9e954ff30cef612dc14c140f264d57d (diff)
downloadrabbitmq-server-git-6c8e27ad5c55d1e501d90325e22ad52333241c54.tar.gz
Replace exclusive_access_or_locked with something cleaner.
-rw-r--r--src/rabbit_channel.erl50
1 files changed, 26 insertions, 24 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 3926d89d85..05740a9087 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -298,14 +298,22 @@ check_write_permitted(Resource, #ch{ username = Username}) ->
check_read_permitted(Resource, #ch{ username = Username}) ->
check_resource_access(Username, Resource, read).
-exclusive_access_or_locked(ReaderPid, Q) ->
- case Q of
- #amqqueue{ exclusive_owner = none} -> Q;
- #amqqueue{ exclusive_owner = ReaderPid } -> Q;
- _ -> rabbit_misc:protocol_error(resource_locked,
- "cannot obtain exclusive access to locked ~s",
- [rabbit_misc:rs(Q#amqqueue.name)])
- end.
+with_exclusive_access_or_die(QName, ReaderPid, F) ->
+ rabbit_amqqueue:with_or_die(
+ QName,
+ fun(Q) ->
+ case Q of
+ #amqqueue{ exclusive_owner = none} ->
+ F(Q);
+ #amqqueue{ exclusive_owner = ReaderPid } ->
+ F(Q);
+ _ ->
+ rabbit_misc:protocol_error(
+ resource_locked,
+ "cannot obtain exclusive access to locked ~s",
+ [rabbit_misc:rs(Q#amqqueue.name)])
+ end
+ end).
expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) ->
rabbit_misc:protocol_error(
@@ -492,12 +500,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
%% In order to ensure that the consume_ok gets sent before
%% any messages are sent to the consumer, we get the queue
%% process to send the consume_ok on our behalf.
- case rabbit_amqqueue:with_or_die(
- QueueName,
+ case with_exclusive_access_or_die(
+ QueueName, ReaderPid,
fun (Q) ->
rabbit_amqqueue:basic_consume(
- exclusive_access_or_locked(ReaderPid, Q),
- NoAck, self(), LimiterPid,
+ Q, NoAck, self(), LimiterPid,
ActualConsumerTag, ExclusiveConsume,
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag}))
@@ -740,8 +747,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
reader_pid = ReaderPid }) ->
QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
check_configure_permitted(QueueName, State),
- CheckExclusive = fun(Q) -> exclusive_access_or_locked(ReaderPid, Q) end,
- Q = rabbit_amqqueue:with_or_die(QueueName, CheckExclusive),
+ Q = with_exclusive_access_or_die(QueueName, ReaderPid, fun(Q) -> Q end),
return_queue_declare_ok(State, NoWait, Q);
handle_method(#'queue.delete'{queue = QueueNameBin,
@@ -752,11 +758,10 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
_, State = #ch{ reader_pid = ReaderPid }) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_configure_permitted(QueueName, State),
- case rabbit_amqqueue:with_or_die(
- QueueName,
+ case with_exclusive_access_or_die(
+ QueueName, ReaderPid,
fun (Q) ->
- rabbit_amqqueue:delete(exclusive_access_or_locked(ReaderPid, Q),
- IfUnused, IfEmpty)
+ rabbit_amqqueue:delete(Q, IfUnused, IfEmpty)
end) of
{error, in_use} ->
rabbit_misc:protocol_error(
@@ -792,12 +797,9 @@ handle_method(#'queue.purge'{queue = QueueNameBin,
_, State = #ch{ reader_pid = ReaderPid }) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
- {ok, PurgedMessageCount} = rabbit_amqqueue:with_or_die(
- QueueName,
- fun (Q) ->
- exclusive_access_or_locked(ReaderPid, Q),
- rabbit_amqqueue:purge(Q)
- end),
+ {ok, PurgedMessageCount} = with_exclusive_access_or_die(
+ QueueName, ReaderPid,
+ fun (Q) -> rabbit_amqqueue:purge(Q) end),
return_ok(State, NoWait,
#'queue.purge_ok'{message_count = PurgedMessageCount});