diff options
Diffstat (limited to 'qpid/python')
| -rw-r--r-- | qpid/python/qpid/driver.py | 446 | ||||
| -rw-r--r-- | qpid/python/qpid/lockable.py | 68 | ||||
| -rw-r--r-- | qpid/python/qpid/messaging.py | 617 | ||||
| -rw-r--r-- | qpid/python/qpid/util.py | 6 |
4 files changed, 597 insertions, 540 deletions
diff --git a/qpid/python/qpid/driver.py b/qpid/python/qpid/driver.py new file mode 100644 index 0000000000..13aba6320b --- /dev/null +++ b/qpid/python/qpid/driver.py @@ -0,0 +1,446 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import compat, connection, socket, sys, time +from datatypes import RangedSet, Message as Message010 +from lockable import synchronized, Lockable +from logging import getLogger +from messaging import get_codec, Message, Pattern, UNLIMITED +from ops import delivery_mode +from session import Client, INCOMPLETE, SessionDetached +from threading import Condition, Thread +from util import connect + +log = getLogger("qpid.messaging") + +def parse_addr(address): + parts = address.split("/", 1) + if len(parts) == 1: + return parts[0], None + else: + return parts[0], parts[i1] + +def reply_to2addr(reply_to): + if reply_to.routing_key is None: + return reply_to.exchange + elif reply_to.exchange in (None, ""): + return reply_to.routing_key + else: + return "%s/%s" % (reply_to.exchange, reply_to.routing_key) + +class Attachment: + + def __init__(self, target): + self.target = target + +DURABLE_DEFAULT=True + +FILTER_DEFAULTS = { + "topic": Pattern("*") + } + +def delegate(handler, session): + class Delegate(Client): + + def message_transfer(self, cmd): + return handler._message_transfer(session, cmd) + return Delegate + +class Driver(Lockable): + + def __init__(self, connection): + self.connection = connection + self._lock = self.connection._lock + self._condition = self.connection._condition + self._wakeup_cond = Condition() + self._socket = None + self._conn = None + self._connected = False + self._attachments = {} + self._modcount = self.connection._modcount + self.thread = Thread(target=self.run) + self.thread.setDaemon(True) + # XXX: need to figure out how to join on this thread + + def start(self): + self.thread.start() + + def wakeup(self): + self._wakeup_cond.acquire() + try: + self._wakeup_cond.notifyAll() + finally: + self._wakeup_cond.release() + + def start(self): + self.thread.start() + + def run(self): + while True: + self._wakeup_cond.acquire() + try: + if self.connection._modcount <= self._modcount: + self._wakeup_cond.wait(10) + finally: + self._wakeup_cond.release() + self.dispatch(self.connection._modcount) + + @synchronized + def dispatch(self, modcount): + try: + if self._conn is None and self.connection._connected: + self.connect() + elif self._conn is not None and not self.connection._connected: + self.disconnect() + + if self._conn is not None: + for ssn in self.connection.sessions.values(): + self.attach(ssn) + self.process(ssn) + + exi = None + except: + exi = sys.exc_info() + + if exi: + msg = compat.format_exc() + recoverable = ["aborted", "Connection refused", "SessionDetached", "Connection reset by peer", + "Bad file descriptor", "start timed out", "Broken pipe"] + for r in recoverable: + if self.connection.reconnect and r in msg: + print "waiting to retry" + self.reset() + time.sleep(3) + print "retrying..." + return + else: + self.connection.error = (msg,) + + self._modcount = modcount + self.notifyAll() + + def connect(self): + if self._conn is not None: + return + try: + self._socket = connect(self.connection.host, self.connection.port) + except socket.error, e: + raise ConnectError(e) + self._conn = connection.Connection(self._socket) + try: + self._conn.start(timeout=10) + self._connected = True + except connection.VersionError, e: + raise ConnectError(e) + except Timeout: + print "start timed out" + raise ConnectError("start timed out") + + def disconnect(self): + self._conn.close() + self.reset() + + def reset(self): + self._conn = None + self._connected = False + self._attachments.clear() + for ssn in self.connection.sessions.values(): + for m in ssn.acked + ssn.unacked + ssn.incoming: + m._transfer_id = None + for rcv in ssn.receivers: + rcv.impending = rcv.received + + def connected(self): + return self._conn is not None + + def attach(self, ssn): + _ssn = self._attachments.get(ssn) + if _ssn is None: + _ssn = self._conn.session(ssn.name, delegate=delegate(self, ssn)) + _ssn.auto_sync = False + _ssn.invoke_lock = self._lock + _ssn.lock = self._lock + _ssn.condition = self._condition + if ssn.transactional: + # XXX: adding an attribute to qpid.session.Session + _ssn.acked = [] + _ssn.tx_select() + self._attachments[ssn] = _ssn + + for snd in ssn.senders: + self.link_out(snd) + for rcv in ssn.receivers: + self.link_in(rcv) + + if ssn.closing: + _ssn.close() + del self._attachments[ssn] + + def _exchange_query(self, ssn, address): + # XXX: auto sync hack is to avoid deadlock on future + result = ssn.exchange_query(name=address, sync=True) + ssn.sync() + return result.get() + + def link_out(self, snd): + _ssn = self._attachments[snd.session] + _snd = self._attachments.get(snd) + if _snd is None: + _snd = Attachment(snd) + node, _snd._subject = parse_addr(snd.target) + result = self._exchange_query(_ssn, node) + if result.not_found: + # XXX: should check 'create' option + _ssn.queue_declare(queue=node, durable=DURABLE_DEFAULT, sync=True) + _ssn.sync() + _snd._exchange = "" + _snd._routing_key = node + else: + _snd._exchange = node + _snd._routing_key = _snd._subject + self._attachments[snd] = _snd + + if snd.closed: + del self._attachments[snd] + return None + else: + return _snd + + def link_in(self, rcv): + _ssn = self._attachments[rcv.session] + _rcv = self._attachments.get(rcv) + if _rcv is None: + _rcv = Attachment(rcv) + result = self._exchange_query(_ssn, rcv.source) + if result.not_found: + _rcv._queue = rcv.source + # XXX: should check 'create' option + _ssn.queue_declare(queue=_rcv._queue, durable=DURABLE_DEFAULT) + else: + _rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination) + _ssn.queue_declare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True) + if rcv.filter is None: + f = FILTER_DEFAULTS[result.type] + else: + f = rcv.filter + f._bind(_ssn, rcv.source, _rcv._queue) + _ssn.message_subscribe(queue=_rcv._queue, destination=rcv.destination) + _ssn.message_set_flow_mode(rcv.destination, _ssn.flow_mode.credit, sync=True) + self._attachments[rcv] = _rcv + # XXX: need to kill syncs + _ssn.sync() + + if rcv.closing: + _ssn.message_cancel(rcv.destination, sync=True) + # XXX: need to kill syncs + _ssn.sync() + del self._attachments[rcv] + rcv.closed = True + return None + else: + return _rcv + + def process(self, ssn): + if ssn.closing: return + + _ssn = self._attachments[ssn] + + while ssn.outgoing: + msg = ssn.outgoing[0] + snd = msg._sender + self.send(snd, msg) + ssn.outgoing.pop(0) + + for rcv in ssn.receivers: + self.process_receiver(rcv) + + if ssn.acked: + messages = ssn.acked[:] + ids = RangedSet(*[m._transfer_id for m in messages if m._transfer_id is not None]) + for range in ids: + _ssn.receiver._completed.add_range(range) + ch = _ssn.channel + if ch is None: + raise SessionDetached() + ch.session_completed(_ssn.receiver._completed) + _ssn.message_accept(ids, sync=True) + # XXX: really need to make this async so that we don't give up the lock + _ssn.sync() + + # XXX: we're ignoring acks that get lost when disconnected + for m in messages: + ssn.acked.remove(m) + if ssn.transactional: + _ssn.acked.append(m) + + if ssn.committing: + _ssn.tx_commit(sync=True) + # XXX: need to kill syncs + _ssn.sync() + del _ssn.acked[:] + ssn.committing = False + ssn.committed = True + ssn.aborting = False + ssn.aborted = False + + if ssn.aborting: + for rcv in ssn.receivers: + _ssn.message_stop(rcv.destination) + _ssn.sync() + + messages = _ssn.acked + ssn.unacked + ssn.incoming + ids = RangedSet(*[m._transfer_id for m in messages]) + for range in ids: + _ssn.receiver._completed.add_range(range) + _ssn.channel.session_completed(_ssn.receiver._completed) + _ssn.message_release(ids) + _ssn.tx_rollback(sync=True) + _ssn.sync() + + del ssn.incoming[:] + del ssn.unacked[:] + del _ssn.acked[:] + + for rcv in ssn.receivers: + rcv.impending = rcv.received + rcv.returned = rcv.received + # XXX: do we need to update granted here as well? + + for rcv in ssn.receivers: + self.process_receiver(rcv) + + ssn.aborting = False + ssn.aborted = True + ssn.committing = False + ssn.committed = False + + def grant(self, rcv): + _ssn = self._attachments[rcv.session] + _rcv = self.link_in(rcv) + + if rcv.granted is UNLIMITED: + if rcv.impending is UNLIMITED: + delta = 0 + else: + delta = UNLIMITED + elif rcv.impending is UNLIMITED: + delta = -1 + else: + delta = max(rcv.granted, rcv.received) - rcv.impending + + if delta is UNLIMITED: + _ssn.message_flow(rcv.destination, _ssn.credit_unit.byte, UNLIMITED.value) + _ssn.message_flow(rcv.destination, _ssn.credit_unit.message, UNLIMITED.value) + rcv.impending = UNLIMITED + elif delta > 0: + _ssn.message_flow(rcv.destination, _ssn.credit_unit.byte, UNLIMITED.value) + _ssn.message_flow(rcv.destination, _ssn.credit_unit.message, delta) + rcv.impending += delta + elif delta < 0: + if rcv.drain: + _ssn.message_flush(rcv.destination, sync=True) + else: + _ssn.message_stop(rcv.destination, sync=True) + # XXX: need to kill syncs + _ssn.sync() + rcv.impending = rcv.received + self.grant(rcv) + + def process_receiver(self, rcv): + if rcv.closed: return + self.grant(rcv) + + def send(self, snd, msg): + _ssn = self._attachments[snd.session] + _snd = self.link_out(snd) + + # XXX: what if subject is specified for a normal queue? + if _snd._routing_key is None: + rk = msg.subject + else: + rk = _snd._routing_key + # XXX: do we need to query to figure out how to create the reply-to interoperably? + if msg.reply_to: + rt = _ssn.reply_to(*parse_addr(msg.reply_to)) + else: + rt = None + dp = _ssn.delivery_properties(routing_key=rk) + mp = _ssn.message_properties(message_id=msg.id, + user_id=msg.user_id, + reply_to=rt, + correlation_id=msg.correlation_id, + content_type=msg.content_type, + application_headers=msg.properties) + if msg.subject is not None: + if mp.application_headers is None: + mp.application_headers = {} + mp.application_headers["subject"] = msg.subject + if msg.to is not None: + if mp.application_headers is None: + mp.application_headers = {} + mp.application_headers["to"] = msg.to + if msg.durable: + dp.delivery_mode = delivery_mode.persistent + enc, dec = get_codec(msg.content_type) + body = enc(msg.content) + _ssn.message_transfer(destination=_snd._exchange, + message=Message010(dp, mp, body), + sync=True) + log.debug("SENT [%s] %s", snd.session, msg) + # XXX: really need to make this async so that we don't give up the lock + _ssn.sync() + # XXX: should we log the ack somehow too? + snd.acked += 1 + + @synchronized + def _message_transfer(self, ssn, cmd): + m = Message010(cmd.payload) + m.headers = cmd.headers + m.id = cmd.id + msg = self._decode(m) + rcv = ssn.receivers[int(cmd.destination)] + msg._receiver = rcv + if rcv.impending is not UNLIMITED: + assert rcv.received < rcv.impending + rcv.received += 1 + log.debug("RECV [%s] %s", ssn, msg) + ssn.incoming.append(msg) + self.notifyAll() + return INCOMPLETE + + def _decode(self, message): + dp = message.get("delivery_properties") + mp = message.get("message_properties") + ap = mp.application_headers + enc, dec = get_codec(mp.content_type) + content = dec(message.body) + msg = Message(content) + msg.id = mp.message_id + if ap is not None: + msg.to = ap.get("to") + msg.subject = ap.get("subject") + msg.user_id = mp.user_id + if mp.reply_to is not None: + msg.reply_to = reply_to2addr(mp.reply_to) + msg.correlation_id = mp.correlation_id + msg.durable = dp.delivery_mode == delivery_mode.persistent + msg.properties = mp.application_headers + msg.content_type = mp.content_type + msg._transfer_id = message.id + return msg diff --git a/qpid/python/qpid/lockable.py b/qpid/python/qpid/lockable.py new file mode 100644 index 0000000000..0415d53e27 --- /dev/null +++ b/qpid/python/qpid/lockable.py @@ -0,0 +1,68 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import inspect, time + +def synchronized(meth): + args, vargs, kwargs, defs = inspect.getargspec(meth) + scope = {} + scope["meth"] = meth + exec """ +def %s%s: + %s + %s.lock() + try: + return meth%s + finally: + %s.unlock() +""" % (meth.__name__, inspect.formatargspec(args, vargs, kwargs, defs), + repr(inspect.getdoc(meth)), args[0], + inspect.formatargspec(args, vargs, kwargs, defs, + formatvalue=lambda x: ""), + args[0]) in scope + return scope[meth.__name__] + +class Lockable(object): + + def lock(self): + self._lock.acquire() + + def unlock(self): + self._lock.release() + + def wait(self, predicate, timeout=None): + passed = 0 + start = time.time() + while not predicate(): + if timeout is None: + # using the timed wait prevents keyboard interrupts from being + # blocked while waiting + self._condition.wait(3) + elif passed < timeout: + self._condition.wait(timeout - passed) + else: + return False + passed = time.time() - start + return True + + def notify(self): + self._condition.notify() + + def notifyAll(self): + self._condition.notifyAll() diff --git a/qpid/python/qpid/messaging.py b/qpid/python/qpid/messaging.py index 8e14072c59..d6d8a9ec04 100644 --- a/qpid/python/qpid/messaging.py +++ b/qpid/python/qpid/messaging.py @@ -6,9 +6,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -30,74 +30,19 @@ Areas that still need work: - protocol negotiation/multiprotocol impl """ -import connection, time, socket, sys, compat, inspect from codec010 import StringCodec -from datatypes import timestamp, uuid4, RangedSet, Message as Message010, Serial +from datatypes import timestamp, uuid4, Serial from exceptions import Timeout +from lockable import synchronized, Lockable from logging import getLogger -from ops import PRIMITIVE, delivery_mode -from session import Client, INCOMPLETE, SessionDetached +from ops import PRIMITIVE from threading import Thread, RLock, Condition -from util import connect +from util import default log = getLogger("qpid.messaging") static = staticmethod -def synchronized(meth): - args, vargs, kwargs, defs = inspect.getargspec(meth) - scope = {} - scope["meth"] = meth - exec """ -def %s%s: - %s - %s.lock() - try: - return meth%s - finally: - %s.unlock() -""" % (meth.__name__, inspect.formatargspec(args, vargs, kwargs, defs), - repr(inspect.getdoc(meth)), args[0], - inspect.formatargspec(args, vargs, kwargs, defs, - formatvalue=lambda x: ""), - args[0]) in scope - return scope[meth.__name__] - -class Lockable(object): - - def lock(self): - self._lock.acquire() - - def unlock(self): - self._lock.release() - - def wait(self, predicate, timeout=None): - passed = 0 - start = time.time() - while not predicate(): - if timeout is None: - # using the timed wait prevents keyboard interrupts from being - # blocked while waiting - self._condition.wait(3) - elif passed < timeout: - self._condition.wait(timeout - passed) - else: - return False - passed = time.time() - start - return True - - def notify(self): - self._condition.notify() - - def notifyAll(self): - self._condition.notifyAll() - -def default(value, default): - if value is None: - return default - else: - return value - AMQP_PORT = 5672 AMQPS_PORT = 5671 @@ -113,9 +58,16 @@ class Constant: UNLIMITED = Constant("UNLIMITED", 0xFFFFFFFFL) class ConnectionError(Exception): + """ + The base class for all connection related exceptions. + """ pass class ConnectError(ConnectionError): + """ + Exception raised when there is an error connecting to the remote + peer. + """ pass class Connection(Lockable): @@ -165,25 +117,26 @@ class Connection(Lockable): self._condition = Condition(self._lock) self._modcount = Serial(0) self.error = None + from driver import Driver self._driver = Driver(self) self._driver.start() - def wakeup(self): + def _wakeup(self): self._modcount += 1 self._driver.wakeup() - def catchup(self, exc=ConnectionError): + def _catchup(self, exc=ConnectionError): mc = self._modcount self.wait(lambda: not self._driver._modcount < mc) - self.check_error(exc) + self._check_error(exc) - def check_error(self, exc=ConnectionError): + def _check_error(self, exc=ConnectionError): if self.error: raise exc(*self.error) - def ewait(self, predicate, timeout=None, exc=ConnectionError): + def _ewait(self, predicate, timeout=None, exc=ConnectionError): result = self.wait(lambda: self.error or predicate(), timeout) - self.check_error(exc) + self._check_error(exc) return result @synchronized @@ -210,7 +163,7 @@ class Connection(Lockable): else: ssn = Session(self, name, self.started, transactional=transactional) self.sessions[name] = ssn - self.wakeup() + self._wakeup() return ssn @synchronized @@ -223,8 +176,8 @@ class Connection(Lockable): Connect to the remote endpoint. """ self._connected = True - self.wakeup() - self.ewait(lambda: self._driver._connected, exc=ConnectError) + self._wakeup() + self._ewait(lambda: self._driver._connected, exc=ConnectError) @synchronized def disconnect(self): @@ -232,8 +185,8 @@ class Connection(Lockable): Disconnect from the remote endpoint. """ self._connected = False - self.wakeup() - self.ewait(lambda: not self._driver._connected) + self._wakeup() + self._ewait(lambda: not self._driver._connected) @synchronized def connected(self): @@ -283,17 +236,6 @@ class Pattern: ssn.exchange_bind(exchange=exchange, queue=queue, binding_key=self.value.replace("*", "#")) -FILTER_DEFAULTS = { - "topic": Pattern("*") - } - -def delegate(handler, session): - class Delegate(Client): - - def message_transfer(self, cmd): - handler._message_transfer(session, cmd) - return Delegate - class SessionError(Exception): pass @@ -354,17 +296,17 @@ class Session(Lockable): def __repr__(self): return "<Session %s>" % self.name - def wakeup(self): - self.connection.wakeup() + def _wakeup(self): + self.connection._wakeup() - def catchup(self, exc=SessionError): - self.connection.catchup(exc) + def _catchup(self, exc=SessionError): + self.connection._catchup(exc) - def check_error(self, exc=SessionError): - self.connection.check_error(exc) + def _check_error(self, exc=SessionError): + self.connection._check_error(exc) - def ewait(self, predicate, timeout=None, exc=SessionError): - return self.connection.ewait(predicate, timeout, exc) + def _ewait(self, predicate, timeout=None, exc=SessionError): + return self.connection._ewait(predicate, timeout, exc) @synchronized def sender(self, target): @@ -379,7 +321,7 @@ class Session(Lockable): """ sender = Sender(self, len(self.senders), target) self.senders.append(sender) - self.wakeup() + self._wakeup() # XXX: because of the lack of waiting here we can end up getting # into the driver loop with messages sent for senders that haven't # been linked yet, something similar can probably happen for @@ -400,7 +342,7 @@ class Session(Lockable): receiver = Receiver(self, len(self.receivers), source, filter, self.started) self.receivers.append(receiver) - self.wakeup() + self._wakeup() return receiver @synchronized @@ -459,14 +401,14 @@ class Session(Lockable): if self.ack_capacity <= 0: # XXX: this is currently a SendError, maybe it should be a SessionError? raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity) - self.wakeup() - self.ewait(lambda: len(self.acked) < self.ack_capacity) + self._wakeup() + self._ewait(lambda: len(self.acked) < self.ack_capacity) self.unacked.remove(m) self.acked.append(m) - self.wakeup() + self._wakeup() if sync: - self.ewait(lambda: not [m for m in messages if m in self.acked]) + self._ewait(lambda: not [m for m in messages if m in self.acked]) @synchronized def commit(self): @@ -477,8 +419,8 @@ class Session(Lockable): if not self.transactional: raise NontransactionalSession() self.committing = True - self.wakeup() - self.ewait(lambda: not self.committing) + self._wakeup() + self._ewait(lambda: not self.committing) if self.aborted: raise TransactionAborted() assert self.committed @@ -492,8 +434,8 @@ class Session(Lockable): if not self.transactional: raise NontransactionalSession() self.aborting = True - self.wakeup() - self.ewait(lambda: not self.aborting) + self._wakeup() + self._ewait(lambda: not self.aborting) assert self.aborted @synchronized @@ -543,31 +485,16 @@ class Session(Lockable): link.close() self.closing = True - self.wakeup() - self.catchup() + self._wakeup() + self._catchup() self.wait(lambda: self.closed) while self.thread.isAlive(): self.thread.join(3) self.thread = None # XXX: should be able to express this condition through API calls - self.ewait(lambda: not self.outgoing and not self.acked) + self._ewait(lambda: not self.outgoing and not self.acked) self.connection._remove_session(self) -def parse_addr(address): - parts = address.split("/", 1) - if len(parts) == 1: - return parts[0], None - else: - return parts[0], parts[i1] - -def reply_to2addr(reply_to): - if reply_to.routing_key is None: - return reply_to.exchange - elif reply_to.exchange in (None, ""): - return reply_to.routing_key - else: - return "%s/%s" % (reply_to.exchange, reply_to.routing_key) - class SendError(SessionError): pass @@ -591,17 +518,17 @@ class Sender(Lockable): self._lock = self.session._lock self._condition = self.session._condition - def wakeup(self): - self.session.wakeup() + def _wakeup(self): + self.session._wakeup() - def catchup(self, exc=SendError): - self.session.catchup(exc) + def _catchup(self, exc=SendError): + self.session._catchup(exc) - def check_error(self, exc=SendError): - self.session.check_error(exc) + def _check_error(self, exc=SendError): + self.session._check_error(exc) - def ewait(self, predicate, timeout=None, exc=SendError): - return self.session.ewait(predicate, timeout, exc) + def _ewait(self, predicate, timeout=None, exc=SendError): + return self.session._ewait(predicate, timeout, exc) @synchronized def pending(self): @@ -638,7 +565,7 @@ class Sender(Lockable): if self.capacity is not UNLIMITED: if self.capacity <= 0: raise InsufficientCapacity("capacity = %s" % self.capacity) - self.ewait(lambda: self.pending() < self.capacity) + self._ewait(lambda: self.pending() < self.capacity) # XXX: what if we send the same message to multiple senders? message._sender = self @@ -646,10 +573,10 @@ class Sender(Lockable): self.queued += 1 mno = self.queued - self.wakeup() + self._wakeup() if sync: - self.ewait(lambda: self.acked >= mno) + self._ewait(lambda: self.acked >= mno) assert message not in self.session.outgoing @synchronized @@ -701,17 +628,17 @@ class Receiver(Lockable): self._lock = self.session._lock self._condition = self.session._condition - def wakeup(self): - self.session.wakeup() + def _wakeup(self): + self.session._wakeup() - def catchup(self, exc=ReceiveError): - self.session.catchup() + def _catchup(self, exc=ReceiveError): + self.session._catchup() - def check_error(self, exc=ReceiveError): - self.session.check_error(exc) + def _check_error(self, exc=ReceiveError): + self.session._check_error(exc) - def ewait(self, predicate, timeout=None, exc=ReceiveError): - return self.session.ewait(predicate, timeout, exc) + def _ewait(self, predicate, timeout=None, exc=ReceiveError): + return self.session._ewait(predicate, timeout, exc) @synchronized def pending(self): @@ -757,23 +684,23 @@ class Receiver(Lockable): """ if self._capacity() == 0: self.granted = self.returned + 1 - self.wakeup() - self.ewait(lambda: self.impending >= self.granted) + self._wakeup() + self._ewait(lambda: self.impending >= self.granted) msg = self.session._get(self._pred, timeout=timeout) if msg is None: self.drain = True self.granted = self.received - self.wakeup() - self.ewait(lambda: self.impending == self.received) + self._wakeup() + self._ewait(lambda: self.impending == self.received) self.drain = False self._grant() - self.wakeup() + self._wakeup() msg = self.session._get(self._pred, timeout=0) if msg is None: raise Empty() elif self._capacity() not in (0, UNLIMITED.value): self.granted += 1 - self.wakeup() + self._wakeup() return msg def _grant(self): @@ -793,7 +720,7 @@ class Receiver(Lockable): """ self.started = True self._grant() - self.wakeup() + self._wakeup() @synchronized def stop(self): @@ -802,8 +729,8 @@ class Receiver(Lockable): """ self.started = False self._grant() - self.wakeup() - self.ewait(lambda: self.impending == self.received) + self._wakeup() + self._ewait(lambda: self.impending == self.received) @synchronized def close(self): @@ -811,9 +738,9 @@ class Receiver(Lockable): Close the receiver. """ self.closing = True - self.wakeup() + self._wakeup() try: - self.ewait(lambda: self.closed) + self._ewait(lambda: self.closed) finally: self.session.receivers.remove(self) @@ -900,397 +827,7 @@ class Message: def __repr__(self): return "Message(%r)" % self.content -class Attachment: - - def __init__(self, target): - self.target = target - -DURABLE_DEFAULT=True - -class Driver(Lockable): - - def __init__(self, connection): - self.connection = connection - self._lock = self.connection._lock - self._condition = self.connection._condition - self._wakeup_cond = Condition() - self._socket = None - self._conn = None - self._connected = False - self._attachments = {} - self._modcount = self.connection._modcount - self.thread = Thread(target=self.run) - self.thread.setDaemon(True) - # XXX: need to figure out how to join on this thread - - def start(self): - self.thread.start() - - def wakeup(self): - self._wakeup_cond.acquire() - try: - self._wakeup_cond.notifyAll() - finally: - self._wakeup_cond.release() - - def start(self): - self.thread.start() - - def run(self): - while True: - self._wakeup_cond.acquire() - try: - if self.connection._modcount <= self._modcount: - self._wakeup_cond.wait(10) - finally: - self._wakeup_cond.release() - self.dispatch(self.connection._modcount) - - @synchronized - def dispatch(self, modcount): - try: - if self._conn is None and self.connection._connected: - self.connect() - elif self._conn is not None and not self.connection._connected: - self.disconnect() - - if self._conn is not None: - for ssn in self.connection.sessions.values(): - self.attach(ssn) - self.process(ssn) - - exi = None - except: - exi = sys.exc_info() - - if exi: - msg = compat.format_exc() - recoverable = ["aborted", "Connection refused", "SessionDetached", "Connection reset by peer", - "Bad file descriptor", "start timed out", "Broken pipe"] - for r in recoverable: - if self.connection.reconnect and r in msg: - print "waiting to retry" - self.reset() - time.sleep(3) - print "retrying..." - return - else: - self.connection.error = (msg,) - - self._modcount = modcount - self.notifyAll() - - def connect(self): - if self._conn is not None: - return - try: - self._socket = connect(self.connection.host, self.connection.port) - except socket.error, e: - raise ConnectError(e) - self._conn = connection.Connection(self._socket) - try: - self._conn.start(timeout=10) - self._connected = True - except connection.VersionError, e: - raise ConnectError(e) - except Timeout: - print "start timed out" - raise ConnectError("start timed out") - - def disconnect(self): - self._conn.close() - self.reset() - - def reset(self): - self._conn = None - self._connected = False - self._attachments.clear() - for ssn in self.connection.sessions.values(): - for m in ssn.acked + ssn.unacked + ssn.incoming: - m._transfer_id = None - for rcv in ssn.receivers: - rcv.impending = rcv.received - - def connected(self): - return self._conn is not None - - def attach(self, ssn): - _ssn = self._attachments.get(ssn) - if _ssn is None: - _ssn = self._conn.session(ssn.name, delegate=delegate(self, ssn)) - _ssn.auto_sync = False - _ssn.invoke_lock = self._lock - _ssn.lock = self._lock - _ssn.condition = self._condition - if ssn.transactional: - # XXX: adding an attribute to qpid.session.Session - _ssn.acked = [] - _ssn.tx_select() - self._attachments[ssn] = _ssn - - for snd in ssn.senders: - self.link_out(snd) - for rcv in ssn.receivers: - self.link_in(rcv) - - if ssn.closing: - _ssn.close() - del self._attachments[ssn] - - def _exchange_query(self, ssn, address): - # XXX: auto sync hack is to avoid deadlock on future - result = ssn.exchange_query(name=address, sync=True) - ssn.sync() - return result.get() - - def link_out(self, snd): - _ssn = self._attachments[snd.session] - _snd = self._attachments.get(snd) - if _snd is None: - _snd = Attachment(snd) - node, _snd._subject = parse_addr(snd.target) - result = self._exchange_query(_ssn, node) - if result.not_found: - # XXX: should check 'create' option - _ssn.queue_declare(queue=node, durable=DURABLE_DEFAULT, sync=True) - _ssn.sync() - _snd._exchange = "" - _snd._routing_key = node - else: - _snd._exchange = node - _snd._routing_key = _snd._subject - self._attachments[snd] = _snd - - if snd.closed: - del self._attachments[snd] - return None - else: - return _snd - - def link_in(self, rcv): - _ssn = self._attachments[rcv.session] - _rcv = self._attachments.get(rcv) - if _rcv is None: - _rcv = Attachment(rcv) - result = self._exchange_query(_ssn, rcv.source) - if result.not_found: - _rcv._queue = rcv.source - # XXX: should check 'create' option - _ssn.queue_declare(queue=_rcv._queue, durable=DURABLE_DEFAULT) - else: - _rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination) - _ssn.queue_declare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True) - if rcv.filter is None: - f = FILTER_DEFAULTS[result.type] - else: - f = rcv.filter - f._bind(_ssn, rcv.source, _rcv._queue) - _ssn.message_subscribe(queue=_rcv._queue, destination=rcv.destination) - _ssn.message_set_flow_mode(rcv.destination, _ssn.flow_mode.credit, sync=True) - self._attachments[rcv] = _rcv - # XXX: need to kill syncs - _ssn.sync() - - if rcv.closing: - _ssn.message_cancel(rcv.destination, sync=True) - # XXX: need to kill syncs - _ssn.sync() - del self._attachments[rcv] - rcv.closed = True - return None - else: - return _rcv - - def process(self, ssn): - if ssn.closing: return - - _ssn = self._attachments[ssn] - - while ssn.outgoing: - msg = ssn.outgoing[0] - snd = msg._sender - self.send(snd, msg) - ssn.outgoing.pop(0) - - for rcv in ssn.receivers: - self.process_receiver(rcv) - - if ssn.acked: - messages = ssn.acked[:] - ids = RangedSet(*[m._transfer_id for m in messages if m._transfer_id is not None]) - for range in ids: - _ssn.receiver._completed.add_range(range) - ch = _ssn.channel - if ch is None: - raise SessionDetached() - ch.session_completed(_ssn.receiver._completed) - _ssn.message_accept(ids, sync=True) - # XXX: really need to make this async so that we don't give up the lock - _ssn.sync() - - # XXX: we're ignoring acks that get lost when disconnected - for m in messages: - ssn.acked.remove(m) - if ssn.transactional: - _ssn.acked.append(m) - - if ssn.committing: - _ssn.tx_commit(sync=True) - # XXX: need to kill syncs - _ssn.sync() - del _ssn.acked[:] - ssn.committing = False - ssn.committed = True - ssn.aborting = False - ssn.aborted = False - - if ssn.aborting: - for rcv in ssn.receivers: - _ssn.message_stop(rcv.destination) - _ssn.sync() - - messages = _ssn.acked + ssn.unacked + ssn.incoming - ids = RangedSet(*[m._transfer_id for m in messages]) - for range in ids: - _ssn.receiver._completed.add_range(range) - _ssn.channel.session_completed(_ssn.receiver._completed) - _ssn.message_release(ids) - _ssn.tx_rollback(sync=True) - _ssn.sync() - - del ssn.incoming[:] - del ssn.unacked[:] - del _ssn.acked[:] - - for rcv in ssn.receivers: - rcv.impending = rcv.received - rcv.returned = rcv.received - # XXX: do we need to update granted here as well? - - for rcv in ssn.receivers: - self.process_receiver(rcv) - - ssn.aborting = False - ssn.aborted = True - ssn.committing = False - ssn.committed = False - - def grant(self, rcv): - _ssn = self._attachments[rcv.session] - _rcv = self.link_in(rcv) - - if rcv.granted is UNLIMITED: - if rcv.impending is UNLIMITED: - delta = 0 - else: - delta = UNLIMITED - elif rcv.impending is UNLIMITED: - delta = -1 - else: - delta = max(rcv.granted, rcv.received) - rcv.impending - - if delta is UNLIMITED: - _ssn.message_flow(rcv.destination, _ssn.credit_unit.byte, UNLIMITED.value) - _ssn.message_flow(rcv.destination, _ssn.credit_unit.message, UNLIMITED.value) - rcv.impending = UNLIMITED - elif delta > 0: - _ssn.message_flow(rcv.destination, _ssn.credit_unit.byte, UNLIMITED.value) - _ssn.message_flow(rcv.destination, _ssn.credit_unit.message, delta) - rcv.impending += delta - elif delta < 0: - if rcv.drain: - _ssn.message_flush(rcv.destination, sync=True) - else: - _ssn.message_stop(rcv.destination, sync=True) - # XXX: need to kill syncs - _ssn.sync() - rcv.impending = rcv.received - self.grant(rcv) - - def process_receiver(self, rcv): - if rcv.closed: return - self.grant(rcv) - - def send(self, snd, msg): - _ssn = self._attachments[snd.session] - _snd = self.link_out(snd) - - # XXX: what if subject is specified for a normal queue? - if _snd._routing_key is None: - rk = msg.subject - else: - rk = _snd._routing_key - # XXX: do we need to query to figure out how to create the reply-to interoperably? - if msg.reply_to: - rt = _ssn.reply_to(*parse_addr(msg.reply_to)) - else: - rt = None - dp = _ssn.delivery_properties(routing_key=rk) - mp = _ssn.message_properties(message_id=msg.id, - user_id=msg.user_id, - reply_to=rt, - correlation_id=msg.correlation_id, - content_type=msg.content_type, - application_headers=msg.properties) - if msg.subject is not None: - if mp.application_headers is None: - mp.application_headers = {} - mp.application_headers["subject"] = msg.subject - if msg.to is not None: - if mp.application_headers is None: - mp.application_headers = {} - mp.application_headers["to"] = msg.to - if msg.durable: - dp.delivery_mode = delivery_mode.persistent - enc, dec = get_codec(msg.content_type) - body = enc(msg.content) - _ssn.message_transfer(destination=_snd._exchange, - message=Message010(dp, mp, body), - sync=True) - log.debug("SENT [%s] %s", snd.session, msg) - # XXX: really need to make this async so that we don't give up the lock - _ssn.sync() - # XXX: should we log the ack somehow too? - snd.acked += 1 - - @synchronized - def _message_transfer(self, ssn, cmd): - m = Message010(cmd.payload) - m.headers = cmd.headers - m.id = cmd.id - msg = self._decode(m) - rcv = ssn.receivers[int(cmd.destination)] - msg._receiver = rcv - if rcv.impending is not UNLIMITED: - assert rcv.received < rcv.impending - rcv.received += 1 - log.debug("RECV [%s] %s", ssn, msg) - ssn.incoming.append(msg) - self.notifyAll() - return INCOMPLETE - - def _decode(self, message): - dp = message.get("delivery_properties") - mp = message.get("message_properties") - ap = mp.application_headers - enc, dec = get_codec(mp.content_type) - content = dec(message.body) - msg = Message(content) - msg.id = mp.message_id - if ap is not None: - msg.to = ap.get("to") - msg.subject = ap.get("subject") - msg.user_id = mp.user_id - if mp.reply_to is not None: - msg.reply_to = reply_to2addr(mp.reply_to) - msg.correlation_id = mp.correlation_id - msg.durable = dp.delivery_mode == delivery_mode.persistent - msg.properties = mp.application_headers - msg.content_type = mp.content_type - msg._transfer_id = message.id - return msg - __all__ = ["Connection", "Session", "Sender", "Receiver", "Pattern", "Message", "ConnectionError", "ConnectError", "SessionError", "Disconnected", "SendError", "InsufficientCapacity", "ReceiveError", "Empty", - "timestamp", "uuid4"] + "timestamp", "uuid4", "UNLIMITED", "AMQP_PORT", "AMQPS_PORT"] diff --git a/qpid/python/qpid/util.py b/qpid/python/qpid/util.py index c46716b88f..3409d777f9 100644 --- a/qpid/python/qpid/util.py +++ b/qpid/python/qpid/util.py @@ -134,3 +134,9 @@ class URL: if self.port: s += ":%s" % self.port return s + +def default(value, default): + if value is None: + return default + else: + return value |
