diff options
| author | Alan Conway <aconway@apache.org> | 2007-03-19 19:39:55 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-03-19 19:39:55 +0000 |
| commit | a96bf8ba7ce40d12ee4b3f85002133e1738225a4 (patch) | |
| tree | 13db6eefd1120c228c11ff7d94a500bbbd4d1289 /python/qpid | |
| parent | 27e6ef93eea10d1aeb7ca6a6a37926aa5f85c380 (diff) | |
| download | qpid-python-a96bf8ba7ce40d12ee4b3f85002133e1738225a4.tar.gz | |
Merged revisions 504590 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9
........
r504590 | gsim | 2007-02-07 10:36:01 -0500 (Wed, 07 Feb 2007) | 6 lines
Added support for receiving and sending of references
Added asynchronous mode to channels (responses can be tracked via a future, rather than blocking on each request)
Added ability to override server suggested connection tune params
Added two tests for reference functionality (more to follow)
........
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@520061 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid')
| -rw-r--r-- | python/qpid/client.py | 31 | ||||
| -rw-r--r-- | python/qpid/codec.py | 27 | ||||
| -rw-r--r-- | python/qpid/peer.py | 27 | ||||
| -rw-r--r-- | python/qpid/reference.py | 117 | ||||
| -rw-r--r-- | python/qpid/testlib.py | 4 |
5 files changed, 192 insertions, 14 deletions
diff --git a/python/qpid/client.py b/python/qpid/client.py index 20d1093878..ea6aa7901a 100644 --- a/python/qpid/client.py +++ b/python/qpid/client.py @@ -28,6 +28,7 @@ from delegate import Delegate from connection import Connection, Frame, connect from spec import load from queue import Queue +from reference import ReferenceId, References class Client: @@ -69,13 +70,14 @@ class Client: self.lock.release() return q - def start(self, response, mechanism="AMQPLAIN", locale="en_US"): + def start(self, response, mechanism="AMQPLAIN", locale="en_US", tune_params=None): self.mechanism = mechanism self.response = response self.locale = locale + self.tune_params = tune_params self.conn = Connection(connect(self.host, self.port), self.spec) - self.peer = Peer(self.conn, ClientDelegate(self)) + self.peer = Peer(self.conn, ClientDelegate(self), self.opened) self.conn.init() self.peer.start() @@ -85,6 +87,9 @@ class Client: def channel(self, id): return self.peer.channel(id) + def opened(self, ch): + ch.references = References() + class ClientDelegate(Delegate): def __init__(self, client): @@ -97,9 +102,29 @@ class ClientDelegate(Delegate): locale=self.client.locale) def connection_tune(self, ch, msg): - msg.tune_ok(*msg.frame.args) + if self.client.tune_params: + #todo: just override the params, i.e. don't require them + # all to be included in tune_params + msg.tune_ok(**self.client.tune_params) + else: + msg.tune_ok(*msg.frame.args) self.client.started.set() + def message_transfer(self, ch, msg): + if isinstance(msg.body, ReferenceId): + self.client.queue(msg.destination).put(ch.references.get(msg.body.id)) + else: + self.client.queue(msg.destination).put(msg) + + def message_open(self, ch, msg): + ch.references.open(msg.reference) + + def message_close(self, ch, msg): + ch.references.close(msg.reference) + + def message_append(self, ch, msg): + ch.references.get(msg.reference).append(msg.bytes) + def basic_deliver(self, ch, msg): self.client.queue(msg.consumer_tag).put(msg) diff --git a/python/qpid/codec.py b/python/qpid/codec.py index 205405894a..3c1e73c2e6 100644 --- a/python/qpid/codec.py +++ b/python/qpid/codec.py @@ -26,6 +26,7 @@ fields. from cStringIO import StringIO from struct import * +from reference import ReferenceId class EOF(Exception): pass @@ -195,14 +196,24 @@ class Codec: return self.decode_longlong() def encode_content(self, s): - # XXX - self.encode_octet(0) - self.encode_longstr(s) - - def decode_content(self): - # XXX - self.decode_octet() - return self.decode_longstr() + # content can be passed as a string in which case it is assumed to + # be inline data, or as an instance of ReferenceId indicating it is + # a reference id + if isinstance(s, ReferenceId): + self.encode_octet(1) + self.encode_longstr(s.id) + else: + self.encode_octet(0) + self.encode_longstr(s) + + def decode_content(self): + # return a string for inline data and a ReferenceId instance for + # references + type = self.decode_octet() + if type == 0: + return self.decode_longstr() + else: + return ReferenceId(self.decode_longstr()) def test(type, value): if isinstance(value, (list, tuple)): diff --git a/python/qpid/peer.py b/python/qpid/peer.py index b5c655dc2a..6c8c6647c9 100644 --- a/python/qpid/peer.py +++ b/python/qpid/peer.py @@ -50,13 +50,14 @@ class Sequence: class Peer: - def __init__(self, conn, delegate): + def __init__(self, conn, delegate, channel_callback=None): self.conn = conn self.delegate = delegate self.outgoing = Queue(0) self.work = Queue(0) self.channels = {} self.lock = thread.allocate_lock() + self.channel_callback = channel_callback #notified when channels are created def channel(self, id): self.lock.acquire() @@ -66,6 +67,8 @@ class Peer: except KeyError: ch = Channel(id, self.outgoing, self.conn.spec) self.channels[id] = ch + if self.channel_callback: + self.channel_callback(ch) finally: self.lock.release() return ch @@ -177,6 +180,7 @@ class Channel: # XXX: better switch self.reliable = False + self.synchronous = True def close(self, reason): if self.closed: @@ -238,6 +242,12 @@ class Channel: content = kwargs.pop("content", None) frame = Method(type, type.arguments(*args, **kwargs)) if self.reliable: + if not self.synchronous: + future = Future() + self.request(frame, future.put_response, content) + if not frame.method.responses: return None + else: return future + self.request(frame, self.queue_response, content) if not frame.method.responses: return None @@ -304,3 +314,18 @@ def read_content(queue): buf.write(content) read += len(content) return Content(buf.getvalue(), children, header.properties.copy()) + +class Future: + def __init__(self): + self.completed = threading.Event() + + def put_response(self, channel, response): + self.response = response + self.completed.set() + + def get_response(self, timeout=None): + self.completed.wait(timeout) + return self.response + + def is_complete(self): + return self.completed.isSet() diff --git a/python/qpid/reference.py b/python/qpid/reference.py new file mode 100644 index 0000000000..d357560390 --- /dev/null +++ b/python/qpid/reference.py @@ -0,0 +1,117 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +Support for amqp 'reference' content (as opposed to inline content) +""" + +import threading +from queue import Queue, Closed + +class NotOpened(Exception): pass + +class AlreadyOpened(Exception): pass + +""" +A representation of a reference id; can be passed wherever amqp +content is required in place of inline data +""" +class ReferenceId: + + def __init__(self, id): + self.id = id + +""" +Holds content received through 'reference api'. Instances of this +class will be placed in the consumers queue on receiving a transfer +(assuming the reference has been opened). Data can be retrieved in +chunks (as append calls are received) or in full (after reference has +been closed signalling data s complete). +""" + +class Reference: + + def __init__(self, id): + self.id = id + self.chunks = Queue(0) + + def close(self): + self.chunks.close() + + def append(self, bytes): + self.chunks.put(bytes) + + def get_chunk(self): + return self.chunks.get() + + def get_complete(self): + data = "" + for chunk in self: + data += chunk + return data + + def next(self): + try: + return self.get_chunk() + except Closed, e: + raise StopIteration + + def __iter__(self): + return self + +""" +Manages a set of opened references. New references can be opened and +existing references can be retrieved or closed. +""" +class References: + + def __init__(self): + self.map = {} + self.lock = threading.Lock() + + def get(self, id): + self.lock.acquire() + try: + try: + ref = self.map[id] + except KeyError: + raise NotOpened() + finally: + self.lock.release() + return ref + + def open(self, id): + self.lock.acquire() + try: + if id in self.map: raise AlreadyOpened() + self.map[id] = Reference(id) + finally: + self.lock.release() + + + def close(self, id): + self.get(id).close() + self.lock.acquire() + try: + del map[id] + finally: + self.lock.release() + diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py index 708ebcdcb9..dcbf0ed91c 100644 --- a/python/qpid/testlib.py +++ b/python/qpid/testlib.py @@ -145,7 +145,7 @@ Options: print "=======================================" return result.wasSuccessful() - def connect(self, host=None, port=None, spec=None, user=None, password=None): + def connect(self, host=None, port=None, spec=None, user=None, password=None, tune_params=None): """Connect to the broker, returns a qpid.client.Client""" host = host or self.host port = port or self.port @@ -153,7 +153,7 @@ Options: user = user or self.user password = password or self.password client = qpid.client.Client(host, port, spec) - client.start({"LOGIN": user, "PASSWORD": password}) + client.start({"LOGIN": user, "PASSWORD": password}, tune_params=tune_params) return client |
