summaryrefslogtreecommitdiff
path: root/RC5/ruby/lib/qpid/queue.rb
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2008-12-19 19:34:45 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2008-12-19 19:34:45 +0000
commit38cde902ffe68eac8ffb0884bcc9c7bfa98c02ac (patch)
tree3599403c0c9690898f1e336c009a5564c587c732 /RC5/ruby/lib/qpid/queue.rb
parenta8960649bcd365ef70a5de7812f5910222388a6d (diff)
downloadqpid-python-38cde902ffe68eac8ffb0884bcc9c7bfa98c02ac.tar.gz
Tagging RC5 for M4 release
git-svn-id: https://svn.apache.org/repos/asf/qpid/tags/M4@728121 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'RC5/ruby/lib/qpid/queue.rb')
-rw-r--r--RC5/ruby/lib/qpid/queue.rb101
1 files changed, 101 insertions, 0 deletions
diff --git a/RC5/ruby/lib/qpid/queue.rb b/RC5/ruby/lib/qpid/queue.rb
new file mode 100644
index 0000000000..4150173b53
--- /dev/null
+++ b/RC5/ruby/lib/qpid/queue.rb
@@ -0,0 +1,101 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Augment the standard python multithreaded Queue implementation to add a
+# close() method so that threads blocking on the content of a queue can be
+# notified if the queue is no longer in use.
+
+require 'thread'
+
+# Python nominally uses a bounded queue, but the code never establishes
+# a maximum size; we therefore use Ruby's unbounded queue
+class Qpid::Queue < ::Queue
+
+ DONE = Object.new
+ STOP = Object.new
+
+ def initialize
+ super
+ @error = nil
+ @listener = nil
+ @exc_listener = nil
+ @exc_listener_lock = Monitor.new
+ @thread = nil
+ end
+
+ def close(error = nil)
+ @error = error
+ put(DONE)
+ unless @thread.nil?
+ @thread.join()
+ @thread = nil
+ end
+ end
+
+ def get(block = true, timeout = nil)
+ unless timeout.nil?
+ raise NotImplementedError
+ end
+ result = pop(! block)
+ if result == DONE
+ # this guarantees that any other waiting threads or any future
+ # calls to get will also result in a Qpid::Closed exception
+ put(DONE)
+ raise Qpid::Closed.new(@error)
+ else
+ return result
+ end
+ end
+
+ alias :put :push
+
+ def exc_listen(&block)
+ @exc_listener_lock.synchronize do
+ @exc_listener = block
+ end
+ end
+
+ def listen(&block)
+ if ! block_given? && @thread
+ put(STOP)
+ @thread.join()
+ @thread = nil
+ end
+
+ # FIXME: There is a potential race since we could be changing one
+ # non-nil listener to another
+ @listener = block
+
+ if block_given? && @thread.nil?
+ @thread = Thread.new do
+ loop do
+ begin
+ o = get()
+ break if o == STOP
+ @listener.call(o)
+ rescue Qpid::Closed => e
+ @exc_listener.call(e) if @exc_listener
+ break
+ end
+ end
+ end
+ end
+ end
+
+end