diff options
| author | Simon MacMullen <simon@lshift.net> | 2010-05-25 15:07:48 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@lshift.net> | 2010-05-25 15:07:48 +0100 |
| commit | 6c8e27ad5c55d1e501d90325e22ad52333241c54 (patch) | |
| tree | 47ae8b21996eb7157eb984809008b336587e4641 | |
| parent | 0f03cdddb9e954ff30cef612dc14c140f264d57d (diff) | |
| download | rabbitmq-server-git-6c8e27ad5c55d1e501d90325e22ad52333241c54.tar.gz | |
Replace exclusive_access_or_locked with something cleaner.
| -rw-r--r-- | src/rabbit_channel.erl | 50 |
1 files changed, 26 insertions, 24 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 3926d89d85..05740a9087 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -298,14 +298,22 @@ check_write_permitted(Resource, #ch{ username = Username}) -> check_read_permitted(Resource, #ch{ username = Username}) -> check_resource_access(Username, Resource, read). -exclusive_access_or_locked(ReaderPid, Q) -> - case Q of - #amqqueue{ exclusive_owner = none} -> Q; - #amqqueue{ exclusive_owner = ReaderPid } -> Q; - _ -> rabbit_misc:protocol_error(resource_locked, - "cannot obtain exclusive access to locked ~s", - [rabbit_misc:rs(Q#amqqueue.name)]) - end. +with_exclusive_access_or_die(QName, ReaderPid, F) -> + rabbit_amqqueue:with_or_die( + QName, + fun(Q) -> + case Q of + #amqqueue{ exclusive_owner = none} -> + F(Q); + #amqqueue{ exclusive_owner = ReaderPid } -> + F(Q); + _ -> + rabbit_misc:protocol_error( + resource_locked, + "cannot obtain exclusive access to locked ~s", + [rabbit_misc:rs(Q#amqqueue.name)]) + end + end). expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) -> rabbit_misc:protocol_error( @@ -492,12 +500,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin, %% In order to ensure that the consume_ok gets sent before %% any messages are sent to the consumer, we get the queue %% process to send the consume_ok on our behalf. - case rabbit_amqqueue:with_or_die( - QueueName, + case with_exclusive_access_or_die( + QueueName, ReaderPid, fun (Q) -> rabbit_amqqueue:basic_consume( - exclusive_access_or_locked(ReaderPid, Q), - NoAck, self(), LimiterPid, + Q, NoAck, self(), LimiterPid, ActualConsumerTag, ExclusiveConsume, ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})) @@ -740,8 +747,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, reader_pid = ReaderPid }) -> QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin), check_configure_permitted(QueueName, State), - CheckExclusive = fun(Q) -> exclusive_access_or_locked(ReaderPid, Q) end, - Q = rabbit_amqqueue:with_or_die(QueueName, CheckExclusive), + Q = with_exclusive_access_or_die(QueueName, ReaderPid, fun(Q) -> Q end), return_queue_declare_ok(State, NoWait, Q); handle_method(#'queue.delete'{queue = QueueNameBin, @@ -752,11 +758,10 @@ handle_method(#'queue.delete'{queue = QueueNameBin, _, State = #ch{ reader_pid = ReaderPid }) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_configure_permitted(QueueName, State), - case rabbit_amqqueue:with_or_die( - QueueName, + case with_exclusive_access_or_die( + QueueName, ReaderPid, fun (Q) -> - rabbit_amqqueue:delete(exclusive_access_or_locked(ReaderPid, Q), - IfUnused, IfEmpty) + rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of {error, in_use} -> rabbit_misc:protocol_error( @@ -792,12 +797,9 @@ handle_method(#'queue.purge'{queue = QueueNameBin, _, State = #ch{ reader_pid = ReaderPid }) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), - {ok, PurgedMessageCount} = rabbit_amqqueue:with_or_die( - QueueName, - fun (Q) -> - exclusive_access_or_locked(ReaderPid, Q), - rabbit_amqqueue:purge(Q) - end), + {ok, PurgedMessageCount} = with_exclusive_access_or_die( + QueueName, ReaderPid, + fun (Q) -> rabbit_amqqueue:purge(Q) end), return_ok(State, NoWait, #'queue.purge_ok'{message_count = PurgedMessageCount}); |
