From 38cde902ffe68eac8ffb0884bcc9c7bfa98c02ac Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Fri, 19 Dec 2008 19:34:45 +0000 Subject: Tagging RC5 for M4 release git-svn-id: https://svn.apache.org/repos/asf/qpid/tags/M4@728121 13f79535-47bb-0310-9956-ffa450edef68 --- RC5/ruby/lib/qpid/queue.rb | 101 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 101 insertions(+) create mode 100644 RC5/ruby/lib/qpid/queue.rb (limited to 'RC5/ruby/lib/qpid/queue.rb') diff --git a/RC5/ruby/lib/qpid/queue.rb b/RC5/ruby/lib/qpid/queue.rb new file mode 100644 index 0000000000..4150173b53 --- /dev/null +++ b/RC5/ruby/lib/qpid/queue.rb @@ -0,0 +1,101 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Augment the standard python multithreaded Queue implementation to add a +# close() method so that threads blocking on the content of a queue can be +# notified if the queue is no longer in use. + +require 'thread' + +# Python nominally uses a bounded queue, but the code never establishes +# a maximum size; we therefore use Ruby's unbounded queue +class Qpid::Queue < ::Queue + + DONE = Object.new + STOP = Object.new + + def initialize + super + @error = nil + @listener = nil + @exc_listener = nil + @exc_listener_lock = Monitor.new + @thread = nil + end + + def close(error = nil) + @error = error + put(DONE) + unless @thread.nil? + @thread.join() + @thread = nil + end + end + + def get(block = true, timeout = nil) + unless timeout.nil? + raise NotImplementedError + end + result = pop(! block) + if result == DONE + # this guarantees that any other waiting threads or any future + # calls to get will also result in a Qpid::Closed exception + put(DONE) + raise Qpid::Closed.new(@error) + else + return result + end + end + + alias :put :push + + def exc_listen(&block) + @exc_listener_lock.synchronize do + @exc_listener = block + end + end + + def listen(&block) + if ! block_given? && @thread + put(STOP) + @thread.join() + @thread = nil + end + + # FIXME: There is a potential race since we could be changing one + # non-nil listener to another + @listener = block + + if block_given? && @thread.nil? + @thread = Thread.new do + loop do + begin + o = get() + break if o == STOP + @listener.call(o) + rescue Qpid::Closed => e + @exc_listener.call(e) if @exc_listener + break + end + end + end + end + end + +end -- cgit v1.2.1