summaryrefslogtreecommitdiff
path: root/qpid/python
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/python')
-rw-r--r--qpid/python/qpid/driver.py446
-rw-r--r--qpid/python/qpid/lockable.py68
-rw-r--r--qpid/python/qpid/messaging.py617
-rw-r--r--qpid/python/qpid/util.py6
4 files changed, 597 insertions, 540 deletions
diff --git a/qpid/python/qpid/driver.py b/qpid/python/qpid/driver.py
new file mode 100644
index 0000000000..13aba6320b
--- /dev/null
+++ b/qpid/python/qpid/driver.py
@@ -0,0 +1,446 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import compat, connection, socket, sys, time
+from datatypes import RangedSet, Message as Message010
+from lockable import synchronized, Lockable
+from logging import getLogger
+from messaging import get_codec, Message, Pattern, UNLIMITED
+from ops import delivery_mode
+from session import Client, INCOMPLETE, SessionDetached
+from threading import Condition, Thread
+from util import connect
+
+log = getLogger("qpid.messaging")
+
+def parse_addr(address):
+ parts = address.split("/", 1)
+ if len(parts) == 1:
+ return parts[0], None
+ else:
+ return parts[0], parts[i1]
+
+def reply_to2addr(reply_to):
+ if reply_to.routing_key is None:
+ return reply_to.exchange
+ elif reply_to.exchange in (None, ""):
+ return reply_to.routing_key
+ else:
+ return "%s/%s" % (reply_to.exchange, reply_to.routing_key)
+
+class Attachment:
+
+ def __init__(self, target):
+ self.target = target
+
+DURABLE_DEFAULT=True
+
+FILTER_DEFAULTS = {
+ "topic": Pattern("*")
+ }
+
+def delegate(handler, session):
+ class Delegate(Client):
+
+ def message_transfer(self, cmd):
+ return handler._message_transfer(session, cmd)
+ return Delegate
+
+class Driver(Lockable):
+
+ def __init__(self, connection):
+ self.connection = connection
+ self._lock = self.connection._lock
+ self._condition = self.connection._condition
+ self._wakeup_cond = Condition()
+ self._socket = None
+ self._conn = None
+ self._connected = False
+ self._attachments = {}
+ self._modcount = self.connection._modcount
+ self.thread = Thread(target=self.run)
+ self.thread.setDaemon(True)
+ # XXX: need to figure out how to join on this thread
+
+ def start(self):
+ self.thread.start()
+
+ def wakeup(self):
+ self._wakeup_cond.acquire()
+ try:
+ self._wakeup_cond.notifyAll()
+ finally:
+ self._wakeup_cond.release()
+
+ def start(self):
+ self.thread.start()
+
+ def run(self):
+ while True:
+ self._wakeup_cond.acquire()
+ try:
+ if self.connection._modcount <= self._modcount:
+ self._wakeup_cond.wait(10)
+ finally:
+ self._wakeup_cond.release()
+ self.dispatch(self.connection._modcount)
+
+ @synchronized
+ def dispatch(self, modcount):
+ try:
+ if self._conn is None and self.connection._connected:
+ self.connect()
+ elif self._conn is not None and not self.connection._connected:
+ self.disconnect()
+
+ if self._conn is not None:
+ for ssn in self.connection.sessions.values():
+ self.attach(ssn)
+ self.process(ssn)
+
+ exi = None
+ except:
+ exi = sys.exc_info()
+
+ if exi:
+ msg = compat.format_exc()
+ recoverable = ["aborted", "Connection refused", "SessionDetached", "Connection reset by peer",
+ "Bad file descriptor", "start timed out", "Broken pipe"]
+ for r in recoverable:
+ if self.connection.reconnect and r in msg:
+ print "waiting to retry"
+ self.reset()
+ time.sleep(3)
+ print "retrying..."
+ return
+ else:
+ self.connection.error = (msg,)
+
+ self._modcount = modcount
+ self.notifyAll()
+
+ def connect(self):
+ if self._conn is not None:
+ return
+ try:
+ self._socket = connect(self.connection.host, self.connection.port)
+ except socket.error, e:
+ raise ConnectError(e)
+ self._conn = connection.Connection(self._socket)
+ try:
+ self._conn.start(timeout=10)
+ self._connected = True
+ except connection.VersionError, e:
+ raise ConnectError(e)
+ except Timeout:
+ print "start timed out"
+ raise ConnectError("start timed out")
+
+ def disconnect(self):
+ self._conn.close()
+ self.reset()
+
+ def reset(self):
+ self._conn = None
+ self._connected = False
+ self._attachments.clear()
+ for ssn in self.connection.sessions.values():
+ for m in ssn.acked + ssn.unacked + ssn.incoming:
+ m._transfer_id = None
+ for rcv in ssn.receivers:
+ rcv.impending = rcv.received
+
+ def connected(self):
+ return self._conn is not None
+
+ def attach(self, ssn):
+ _ssn = self._attachments.get(ssn)
+ if _ssn is None:
+ _ssn = self._conn.session(ssn.name, delegate=delegate(self, ssn))
+ _ssn.auto_sync = False
+ _ssn.invoke_lock = self._lock
+ _ssn.lock = self._lock
+ _ssn.condition = self._condition
+ if ssn.transactional:
+ # XXX: adding an attribute to qpid.session.Session
+ _ssn.acked = []
+ _ssn.tx_select()
+ self._attachments[ssn] = _ssn
+
+ for snd in ssn.senders:
+ self.link_out(snd)
+ for rcv in ssn.receivers:
+ self.link_in(rcv)
+
+ if ssn.closing:
+ _ssn.close()
+ del self._attachments[ssn]
+
+ def _exchange_query(self, ssn, address):
+ # XXX: auto sync hack is to avoid deadlock on future
+ result = ssn.exchange_query(name=address, sync=True)
+ ssn.sync()
+ return result.get()
+
+ def link_out(self, snd):
+ _ssn = self._attachments[snd.session]
+ _snd = self._attachments.get(snd)
+ if _snd is None:
+ _snd = Attachment(snd)
+ node, _snd._subject = parse_addr(snd.target)
+ result = self._exchange_query(_ssn, node)
+ if result.not_found:
+ # XXX: should check 'create' option
+ _ssn.queue_declare(queue=node, durable=DURABLE_DEFAULT, sync=True)
+ _ssn.sync()
+ _snd._exchange = ""
+ _snd._routing_key = node
+ else:
+ _snd._exchange = node
+ _snd._routing_key = _snd._subject
+ self._attachments[snd] = _snd
+
+ if snd.closed:
+ del self._attachments[snd]
+ return None
+ else:
+ return _snd
+
+ def link_in(self, rcv):
+ _ssn = self._attachments[rcv.session]
+ _rcv = self._attachments.get(rcv)
+ if _rcv is None:
+ _rcv = Attachment(rcv)
+ result = self._exchange_query(_ssn, rcv.source)
+ if result.not_found:
+ _rcv._queue = rcv.source
+ # XXX: should check 'create' option
+ _ssn.queue_declare(queue=_rcv._queue, durable=DURABLE_DEFAULT)
+ else:
+ _rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination)
+ _ssn.queue_declare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True)
+ if rcv.filter is None:
+ f = FILTER_DEFAULTS[result.type]
+ else:
+ f = rcv.filter
+ f._bind(_ssn, rcv.source, _rcv._queue)
+ _ssn.message_subscribe(queue=_rcv._queue, destination=rcv.destination)
+ _ssn.message_set_flow_mode(rcv.destination, _ssn.flow_mode.credit, sync=True)
+ self._attachments[rcv] = _rcv
+ # XXX: need to kill syncs
+ _ssn.sync()
+
+ if rcv.closing:
+ _ssn.message_cancel(rcv.destination, sync=True)
+ # XXX: need to kill syncs
+ _ssn.sync()
+ del self._attachments[rcv]
+ rcv.closed = True
+ return None
+ else:
+ return _rcv
+
+ def process(self, ssn):
+ if ssn.closing: return
+
+ _ssn = self._attachments[ssn]
+
+ while ssn.outgoing:
+ msg = ssn.outgoing[0]
+ snd = msg._sender
+ self.send(snd, msg)
+ ssn.outgoing.pop(0)
+
+ for rcv in ssn.receivers:
+ self.process_receiver(rcv)
+
+ if ssn.acked:
+ messages = ssn.acked[:]
+ ids = RangedSet(*[m._transfer_id for m in messages if m._transfer_id is not None])
+ for range in ids:
+ _ssn.receiver._completed.add_range(range)
+ ch = _ssn.channel
+ if ch is None:
+ raise SessionDetached()
+ ch.session_completed(_ssn.receiver._completed)
+ _ssn.message_accept(ids, sync=True)
+ # XXX: really need to make this async so that we don't give up the lock
+ _ssn.sync()
+
+ # XXX: we're ignoring acks that get lost when disconnected
+ for m in messages:
+ ssn.acked.remove(m)
+ if ssn.transactional:
+ _ssn.acked.append(m)
+
+ if ssn.committing:
+ _ssn.tx_commit(sync=True)
+ # XXX: need to kill syncs
+ _ssn.sync()
+ del _ssn.acked[:]
+ ssn.committing = False
+ ssn.committed = True
+ ssn.aborting = False
+ ssn.aborted = False
+
+ if ssn.aborting:
+ for rcv in ssn.receivers:
+ _ssn.message_stop(rcv.destination)
+ _ssn.sync()
+
+ messages = _ssn.acked + ssn.unacked + ssn.incoming
+ ids = RangedSet(*[m._transfer_id for m in messages])
+ for range in ids:
+ _ssn.receiver._completed.add_range(range)
+ _ssn.channel.session_completed(_ssn.receiver._completed)
+ _ssn.message_release(ids)
+ _ssn.tx_rollback(sync=True)
+ _ssn.sync()
+
+ del ssn.incoming[:]
+ del ssn.unacked[:]
+ del _ssn.acked[:]
+
+ for rcv in ssn.receivers:
+ rcv.impending = rcv.received
+ rcv.returned = rcv.received
+ # XXX: do we need to update granted here as well?
+
+ for rcv in ssn.receivers:
+ self.process_receiver(rcv)
+
+ ssn.aborting = False
+ ssn.aborted = True
+ ssn.committing = False
+ ssn.committed = False
+
+ def grant(self, rcv):
+ _ssn = self._attachments[rcv.session]
+ _rcv = self.link_in(rcv)
+
+ if rcv.granted is UNLIMITED:
+ if rcv.impending is UNLIMITED:
+ delta = 0
+ else:
+ delta = UNLIMITED
+ elif rcv.impending is UNLIMITED:
+ delta = -1
+ else:
+ delta = max(rcv.granted, rcv.received) - rcv.impending
+
+ if delta is UNLIMITED:
+ _ssn.message_flow(rcv.destination, _ssn.credit_unit.byte, UNLIMITED.value)
+ _ssn.message_flow(rcv.destination, _ssn.credit_unit.message, UNLIMITED.value)
+ rcv.impending = UNLIMITED
+ elif delta > 0:
+ _ssn.message_flow(rcv.destination, _ssn.credit_unit.byte, UNLIMITED.value)
+ _ssn.message_flow(rcv.destination, _ssn.credit_unit.message, delta)
+ rcv.impending += delta
+ elif delta < 0:
+ if rcv.drain:
+ _ssn.message_flush(rcv.destination, sync=True)
+ else:
+ _ssn.message_stop(rcv.destination, sync=True)
+ # XXX: need to kill syncs
+ _ssn.sync()
+ rcv.impending = rcv.received
+ self.grant(rcv)
+
+ def process_receiver(self, rcv):
+ if rcv.closed: return
+ self.grant(rcv)
+
+ def send(self, snd, msg):
+ _ssn = self._attachments[snd.session]
+ _snd = self.link_out(snd)
+
+ # XXX: what if subject is specified for a normal queue?
+ if _snd._routing_key is None:
+ rk = msg.subject
+ else:
+ rk = _snd._routing_key
+ # XXX: do we need to query to figure out how to create the reply-to interoperably?
+ if msg.reply_to:
+ rt = _ssn.reply_to(*parse_addr(msg.reply_to))
+ else:
+ rt = None
+ dp = _ssn.delivery_properties(routing_key=rk)
+ mp = _ssn.message_properties(message_id=msg.id,
+ user_id=msg.user_id,
+ reply_to=rt,
+ correlation_id=msg.correlation_id,
+ content_type=msg.content_type,
+ application_headers=msg.properties)
+ if msg.subject is not None:
+ if mp.application_headers is None:
+ mp.application_headers = {}
+ mp.application_headers["subject"] = msg.subject
+ if msg.to is not None:
+ if mp.application_headers is None:
+ mp.application_headers = {}
+ mp.application_headers["to"] = msg.to
+ if msg.durable:
+ dp.delivery_mode = delivery_mode.persistent
+ enc, dec = get_codec(msg.content_type)
+ body = enc(msg.content)
+ _ssn.message_transfer(destination=_snd._exchange,
+ message=Message010(dp, mp, body),
+ sync=True)
+ log.debug("SENT [%s] %s", snd.session, msg)
+ # XXX: really need to make this async so that we don't give up the lock
+ _ssn.sync()
+ # XXX: should we log the ack somehow too?
+ snd.acked += 1
+
+ @synchronized
+ def _message_transfer(self, ssn, cmd):
+ m = Message010(cmd.payload)
+ m.headers = cmd.headers
+ m.id = cmd.id
+ msg = self._decode(m)
+ rcv = ssn.receivers[int(cmd.destination)]
+ msg._receiver = rcv
+ if rcv.impending is not UNLIMITED:
+ assert rcv.received < rcv.impending
+ rcv.received += 1
+ log.debug("RECV [%s] %s", ssn, msg)
+ ssn.incoming.append(msg)
+ self.notifyAll()
+ return INCOMPLETE
+
+ def _decode(self, message):
+ dp = message.get("delivery_properties")
+ mp = message.get("message_properties")
+ ap = mp.application_headers
+ enc, dec = get_codec(mp.content_type)
+ content = dec(message.body)
+ msg = Message(content)
+ msg.id = mp.message_id
+ if ap is not None:
+ msg.to = ap.get("to")
+ msg.subject = ap.get("subject")
+ msg.user_id = mp.user_id
+ if mp.reply_to is not None:
+ msg.reply_to = reply_to2addr(mp.reply_to)
+ msg.correlation_id = mp.correlation_id
+ msg.durable = dp.delivery_mode == delivery_mode.persistent
+ msg.properties = mp.application_headers
+ msg.content_type = mp.content_type
+ msg._transfer_id = message.id
+ return msg
diff --git a/qpid/python/qpid/lockable.py b/qpid/python/qpid/lockable.py
new file mode 100644
index 0000000000..0415d53e27
--- /dev/null
+++ b/qpid/python/qpid/lockable.py
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import inspect, time
+
+def synchronized(meth):
+ args, vargs, kwargs, defs = inspect.getargspec(meth)
+ scope = {}
+ scope["meth"] = meth
+ exec """
+def %s%s:
+ %s
+ %s.lock()
+ try:
+ return meth%s
+ finally:
+ %s.unlock()
+""" % (meth.__name__, inspect.formatargspec(args, vargs, kwargs, defs),
+ repr(inspect.getdoc(meth)), args[0],
+ inspect.formatargspec(args, vargs, kwargs, defs,
+ formatvalue=lambda x: ""),
+ args[0]) in scope
+ return scope[meth.__name__]
+
+class Lockable(object):
+
+ def lock(self):
+ self._lock.acquire()
+
+ def unlock(self):
+ self._lock.release()
+
+ def wait(self, predicate, timeout=None):
+ passed = 0
+ start = time.time()
+ while not predicate():
+ if timeout is None:
+ # using the timed wait prevents keyboard interrupts from being
+ # blocked while waiting
+ self._condition.wait(3)
+ elif passed < timeout:
+ self._condition.wait(timeout - passed)
+ else:
+ return False
+ passed = time.time() - start
+ return True
+
+ def notify(self):
+ self._condition.notify()
+
+ def notifyAll(self):
+ self._condition.notifyAll()
diff --git a/qpid/python/qpid/messaging.py b/qpid/python/qpid/messaging.py
index 8e14072c59..d6d8a9ec04 100644
--- a/qpid/python/qpid/messaging.py
+++ b/qpid/python/qpid/messaging.py
@@ -6,9 +6,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,74 +30,19 @@ Areas that still need work:
- protocol negotiation/multiprotocol impl
"""
-import connection, time, socket, sys, compat, inspect
from codec010 import StringCodec
-from datatypes import timestamp, uuid4, RangedSet, Message as Message010, Serial
+from datatypes import timestamp, uuid4, Serial
from exceptions import Timeout
+from lockable import synchronized, Lockable
from logging import getLogger
-from ops import PRIMITIVE, delivery_mode
-from session import Client, INCOMPLETE, SessionDetached
+from ops import PRIMITIVE
from threading import Thread, RLock, Condition
-from util import connect
+from util import default
log = getLogger("qpid.messaging")
static = staticmethod
-def synchronized(meth):
- args, vargs, kwargs, defs = inspect.getargspec(meth)
- scope = {}
- scope["meth"] = meth
- exec """
-def %s%s:
- %s
- %s.lock()
- try:
- return meth%s
- finally:
- %s.unlock()
-""" % (meth.__name__, inspect.formatargspec(args, vargs, kwargs, defs),
- repr(inspect.getdoc(meth)), args[0],
- inspect.formatargspec(args, vargs, kwargs, defs,
- formatvalue=lambda x: ""),
- args[0]) in scope
- return scope[meth.__name__]
-
-class Lockable(object):
-
- def lock(self):
- self._lock.acquire()
-
- def unlock(self):
- self._lock.release()
-
- def wait(self, predicate, timeout=None):
- passed = 0
- start = time.time()
- while not predicate():
- if timeout is None:
- # using the timed wait prevents keyboard interrupts from being
- # blocked while waiting
- self._condition.wait(3)
- elif passed < timeout:
- self._condition.wait(timeout - passed)
- else:
- return False
- passed = time.time() - start
- return True
-
- def notify(self):
- self._condition.notify()
-
- def notifyAll(self):
- self._condition.notifyAll()
-
-def default(value, default):
- if value is None:
- return default
- else:
- return value
-
AMQP_PORT = 5672
AMQPS_PORT = 5671
@@ -113,9 +58,16 @@ class Constant:
UNLIMITED = Constant("UNLIMITED", 0xFFFFFFFFL)
class ConnectionError(Exception):
+ """
+ The base class for all connection related exceptions.
+ """
pass
class ConnectError(ConnectionError):
+ """
+ Exception raised when there is an error connecting to the remote
+ peer.
+ """
pass
class Connection(Lockable):
@@ -165,25 +117,26 @@ class Connection(Lockable):
self._condition = Condition(self._lock)
self._modcount = Serial(0)
self.error = None
+ from driver import Driver
self._driver = Driver(self)
self._driver.start()
- def wakeup(self):
+ def _wakeup(self):
self._modcount += 1
self._driver.wakeup()
- def catchup(self, exc=ConnectionError):
+ def _catchup(self, exc=ConnectionError):
mc = self._modcount
self.wait(lambda: not self._driver._modcount < mc)
- self.check_error(exc)
+ self._check_error(exc)
- def check_error(self, exc=ConnectionError):
+ def _check_error(self, exc=ConnectionError):
if self.error:
raise exc(*self.error)
- def ewait(self, predicate, timeout=None, exc=ConnectionError):
+ def _ewait(self, predicate, timeout=None, exc=ConnectionError):
result = self.wait(lambda: self.error or predicate(), timeout)
- self.check_error(exc)
+ self._check_error(exc)
return result
@synchronized
@@ -210,7 +163,7 @@ class Connection(Lockable):
else:
ssn = Session(self, name, self.started, transactional=transactional)
self.sessions[name] = ssn
- self.wakeup()
+ self._wakeup()
return ssn
@synchronized
@@ -223,8 +176,8 @@ class Connection(Lockable):
Connect to the remote endpoint.
"""
self._connected = True
- self.wakeup()
- self.ewait(lambda: self._driver._connected, exc=ConnectError)
+ self._wakeup()
+ self._ewait(lambda: self._driver._connected, exc=ConnectError)
@synchronized
def disconnect(self):
@@ -232,8 +185,8 @@ class Connection(Lockable):
Disconnect from the remote endpoint.
"""
self._connected = False
- self.wakeup()
- self.ewait(lambda: not self._driver._connected)
+ self._wakeup()
+ self._ewait(lambda: not self._driver._connected)
@synchronized
def connected(self):
@@ -283,17 +236,6 @@ class Pattern:
ssn.exchange_bind(exchange=exchange, queue=queue,
binding_key=self.value.replace("*", "#"))
-FILTER_DEFAULTS = {
- "topic": Pattern("*")
- }
-
-def delegate(handler, session):
- class Delegate(Client):
-
- def message_transfer(self, cmd):
- handler._message_transfer(session, cmd)
- return Delegate
-
class SessionError(Exception):
pass
@@ -354,17 +296,17 @@ class Session(Lockable):
def __repr__(self):
return "<Session %s>" % self.name
- def wakeup(self):
- self.connection.wakeup()
+ def _wakeup(self):
+ self.connection._wakeup()
- def catchup(self, exc=SessionError):
- self.connection.catchup(exc)
+ def _catchup(self, exc=SessionError):
+ self.connection._catchup(exc)
- def check_error(self, exc=SessionError):
- self.connection.check_error(exc)
+ def _check_error(self, exc=SessionError):
+ self.connection._check_error(exc)
- def ewait(self, predicate, timeout=None, exc=SessionError):
- return self.connection.ewait(predicate, timeout, exc)
+ def _ewait(self, predicate, timeout=None, exc=SessionError):
+ return self.connection._ewait(predicate, timeout, exc)
@synchronized
def sender(self, target):
@@ -379,7 +321,7 @@ class Session(Lockable):
"""
sender = Sender(self, len(self.senders), target)
self.senders.append(sender)
- self.wakeup()
+ self._wakeup()
# XXX: because of the lack of waiting here we can end up getting
# into the driver loop with messages sent for senders that haven't
# been linked yet, something similar can probably happen for
@@ -400,7 +342,7 @@ class Session(Lockable):
receiver = Receiver(self, len(self.receivers), source, filter,
self.started)
self.receivers.append(receiver)
- self.wakeup()
+ self._wakeup()
return receiver
@synchronized
@@ -459,14 +401,14 @@ class Session(Lockable):
if self.ack_capacity <= 0:
# XXX: this is currently a SendError, maybe it should be a SessionError?
raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity)
- self.wakeup()
- self.ewait(lambda: len(self.acked) < self.ack_capacity)
+ self._wakeup()
+ self._ewait(lambda: len(self.acked) < self.ack_capacity)
self.unacked.remove(m)
self.acked.append(m)
- self.wakeup()
+ self._wakeup()
if sync:
- self.ewait(lambda: not [m for m in messages if m in self.acked])
+ self._ewait(lambda: not [m for m in messages if m in self.acked])
@synchronized
def commit(self):
@@ -477,8 +419,8 @@ class Session(Lockable):
if not self.transactional:
raise NontransactionalSession()
self.committing = True
- self.wakeup()
- self.ewait(lambda: not self.committing)
+ self._wakeup()
+ self._ewait(lambda: not self.committing)
if self.aborted:
raise TransactionAborted()
assert self.committed
@@ -492,8 +434,8 @@ class Session(Lockable):
if not self.transactional:
raise NontransactionalSession()
self.aborting = True
- self.wakeup()
- self.ewait(lambda: not self.aborting)
+ self._wakeup()
+ self._ewait(lambda: not self.aborting)
assert self.aborted
@synchronized
@@ -543,31 +485,16 @@ class Session(Lockable):
link.close()
self.closing = True
- self.wakeup()
- self.catchup()
+ self._wakeup()
+ self._catchup()
self.wait(lambda: self.closed)
while self.thread.isAlive():
self.thread.join(3)
self.thread = None
# XXX: should be able to express this condition through API calls
- self.ewait(lambda: not self.outgoing and not self.acked)
+ self._ewait(lambda: not self.outgoing and not self.acked)
self.connection._remove_session(self)
-def parse_addr(address):
- parts = address.split("/", 1)
- if len(parts) == 1:
- return parts[0], None
- else:
- return parts[0], parts[i1]
-
-def reply_to2addr(reply_to):
- if reply_to.routing_key is None:
- return reply_to.exchange
- elif reply_to.exchange in (None, ""):
- return reply_to.routing_key
- else:
- return "%s/%s" % (reply_to.exchange, reply_to.routing_key)
-
class SendError(SessionError):
pass
@@ -591,17 +518,17 @@ class Sender(Lockable):
self._lock = self.session._lock
self._condition = self.session._condition
- def wakeup(self):
- self.session.wakeup()
+ def _wakeup(self):
+ self.session._wakeup()
- def catchup(self, exc=SendError):
- self.session.catchup(exc)
+ def _catchup(self, exc=SendError):
+ self.session._catchup(exc)
- def check_error(self, exc=SendError):
- self.session.check_error(exc)
+ def _check_error(self, exc=SendError):
+ self.session._check_error(exc)
- def ewait(self, predicate, timeout=None, exc=SendError):
- return self.session.ewait(predicate, timeout, exc)
+ def _ewait(self, predicate, timeout=None, exc=SendError):
+ return self.session._ewait(predicate, timeout, exc)
@synchronized
def pending(self):
@@ -638,7 +565,7 @@ class Sender(Lockable):
if self.capacity is not UNLIMITED:
if self.capacity <= 0:
raise InsufficientCapacity("capacity = %s" % self.capacity)
- self.ewait(lambda: self.pending() < self.capacity)
+ self._ewait(lambda: self.pending() < self.capacity)
# XXX: what if we send the same message to multiple senders?
message._sender = self
@@ -646,10 +573,10 @@ class Sender(Lockable):
self.queued += 1
mno = self.queued
- self.wakeup()
+ self._wakeup()
if sync:
- self.ewait(lambda: self.acked >= mno)
+ self._ewait(lambda: self.acked >= mno)
assert message not in self.session.outgoing
@synchronized
@@ -701,17 +628,17 @@ class Receiver(Lockable):
self._lock = self.session._lock
self._condition = self.session._condition
- def wakeup(self):
- self.session.wakeup()
+ def _wakeup(self):
+ self.session._wakeup()
- def catchup(self, exc=ReceiveError):
- self.session.catchup()
+ def _catchup(self, exc=ReceiveError):
+ self.session._catchup()
- def check_error(self, exc=ReceiveError):
- self.session.check_error(exc)
+ def _check_error(self, exc=ReceiveError):
+ self.session._check_error(exc)
- def ewait(self, predicate, timeout=None, exc=ReceiveError):
- return self.session.ewait(predicate, timeout, exc)
+ def _ewait(self, predicate, timeout=None, exc=ReceiveError):
+ return self.session._ewait(predicate, timeout, exc)
@synchronized
def pending(self):
@@ -757,23 +684,23 @@ class Receiver(Lockable):
"""
if self._capacity() == 0:
self.granted = self.returned + 1
- self.wakeup()
- self.ewait(lambda: self.impending >= self.granted)
+ self._wakeup()
+ self._ewait(lambda: self.impending >= self.granted)
msg = self.session._get(self._pred, timeout=timeout)
if msg is None:
self.drain = True
self.granted = self.received
- self.wakeup()
- self.ewait(lambda: self.impending == self.received)
+ self._wakeup()
+ self._ewait(lambda: self.impending == self.received)
self.drain = False
self._grant()
- self.wakeup()
+ self._wakeup()
msg = self.session._get(self._pred, timeout=0)
if msg is None:
raise Empty()
elif self._capacity() not in (0, UNLIMITED.value):
self.granted += 1
- self.wakeup()
+ self._wakeup()
return msg
def _grant(self):
@@ -793,7 +720,7 @@ class Receiver(Lockable):
"""
self.started = True
self._grant()
- self.wakeup()
+ self._wakeup()
@synchronized
def stop(self):
@@ -802,8 +729,8 @@ class Receiver(Lockable):
"""
self.started = False
self._grant()
- self.wakeup()
- self.ewait(lambda: self.impending == self.received)
+ self._wakeup()
+ self._ewait(lambda: self.impending == self.received)
@synchronized
def close(self):
@@ -811,9 +738,9 @@ class Receiver(Lockable):
Close the receiver.
"""
self.closing = True
- self.wakeup()
+ self._wakeup()
try:
- self.ewait(lambda: self.closed)
+ self._ewait(lambda: self.closed)
finally:
self.session.receivers.remove(self)
@@ -900,397 +827,7 @@ class Message:
def __repr__(self):
return "Message(%r)" % self.content
-class Attachment:
-
- def __init__(self, target):
- self.target = target
-
-DURABLE_DEFAULT=True
-
-class Driver(Lockable):
-
- def __init__(self, connection):
- self.connection = connection
- self._lock = self.connection._lock
- self._condition = self.connection._condition
- self._wakeup_cond = Condition()
- self._socket = None
- self._conn = None
- self._connected = False
- self._attachments = {}
- self._modcount = self.connection._modcount
- self.thread = Thread(target=self.run)
- self.thread.setDaemon(True)
- # XXX: need to figure out how to join on this thread
-
- def start(self):
- self.thread.start()
-
- def wakeup(self):
- self._wakeup_cond.acquire()
- try:
- self._wakeup_cond.notifyAll()
- finally:
- self._wakeup_cond.release()
-
- def start(self):
- self.thread.start()
-
- def run(self):
- while True:
- self._wakeup_cond.acquire()
- try:
- if self.connection._modcount <= self._modcount:
- self._wakeup_cond.wait(10)
- finally:
- self._wakeup_cond.release()
- self.dispatch(self.connection._modcount)
-
- @synchronized
- def dispatch(self, modcount):
- try:
- if self._conn is None and self.connection._connected:
- self.connect()
- elif self._conn is not None and not self.connection._connected:
- self.disconnect()
-
- if self._conn is not None:
- for ssn in self.connection.sessions.values():
- self.attach(ssn)
- self.process(ssn)
-
- exi = None
- except:
- exi = sys.exc_info()
-
- if exi:
- msg = compat.format_exc()
- recoverable = ["aborted", "Connection refused", "SessionDetached", "Connection reset by peer",
- "Bad file descriptor", "start timed out", "Broken pipe"]
- for r in recoverable:
- if self.connection.reconnect and r in msg:
- print "waiting to retry"
- self.reset()
- time.sleep(3)
- print "retrying..."
- return
- else:
- self.connection.error = (msg,)
-
- self._modcount = modcount
- self.notifyAll()
-
- def connect(self):
- if self._conn is not None:
- return
- try:
- self._socket = connect(self.connection.host, self.connection.port)
- except socket.error, e:
- raise ConnectError(e)
- self._conn = connection.Connection(self._socket)
- try:
- self._conn.start(timeout=10)
- self._connected = True
- except connection.VersionError, e:
- raise ConnectError(e)
- except Timeout:
- print "start timed out"
- raise ConnectError("start timed out")
-
- def disconnect(self):
- self._conn.close()
- self.reset()
-
- def reset(self):
- self._conn = None
- self._connected = False
- self._attachments.clear()
- for ssn in self.connection.sessions.values():
- for m in ssn.acked + ssn.unacked + ssn.incoming:
- m._transfer_id = None
- for rcv in ssn.receivers:
- rcv.impending = rcv.received
-
- def connected(self):
- return self._conn is not None
-
- def attach(self, ssn):
- _ssn = self._attachments.get(ssn)
- if _ssn is None:
- _ssn = self._conn.session(ssn.name, delegate=delegate(self, ssn))
- _ssn.auto_sync = False
- _ssn.invoke_lock = self._lock
- _ssn.lock = self._lock
- _ssn.condition = self._condition
- if ssn.transactional:
- # XXX: adding an attribute to qpid.session.Session
- _ssn.acked = []
- _ssn.tx_select()
- self._attachments[ssn] = _ssn
-
- for snd in ssn.senders:
- self.link_out(snd)
- for rcv in ssn.receivers:
- self.link_in(rcv)
-
- if ssn.closing:
- _ssn.close()
- del self._attachments[ssn]
-
- def _exchange_query(self, ssn, address):
- # XXX: auto sync hack is to avoid deadlock on future
- result = ssn.exchange_query(name=address, sync=True)
- ssn.sync()
- return result.get()
-
- def link_out(self, snd):
- _ssn = self._attachments[snd.session]
- _snd = self._attachments.get(snd)
- if _snd is None:
- _snd = Attachment(snd)
- node, _snd._subject = parse_addr(snd.target)
- result = self._exchange_query(_ssn, node)
- if result.not_found:
- # XXX: should check 'create' option
- _ssn.queue_declare(queue=node, durable=DURABLE_DEFAULT, sync=True)
- _ssn.sync()
- _snd._exchange = ""
- _snd._routing_key = node
- else:
- _snd._exchange = node
- _snd._routing_key = _snd._subject
- self._attachments[snd] = _snd
-
- if snd.closed:
- del self._attachments[snd]
- return None
- else:
- return _snd
-
- def link_in(self, rcv):
- _ssn = self._attachments[rcv.session]
- _rcv = self._attachments.get(rcv)
- if _rcv is None:
- _rcv = Attachment(rcv)
- result = self._exchange_query(_ssn, rcv.source)
- if result.not_found:
- _rcv._queue = rcv.source
- # XXX: should check 'create' option
- _ssn.queue_declare(queue=_rcv._queue, durable=DURABLE_DEFAULT)
- else:
- _rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination)
- _ssn.queue_declare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True)
- if rcv.filter is None:
- f = FILTER_DEFAULTS[result.type]
- else:
- f = rcv.filter
- f._bind(_ssn, rcv.source, _rcv._queue)
- _ssn.message_subscribe(queue=_rcv._queue, destination=rcv.destination)
- _ssn.message_set_flow_mode(rcv.destination, _ssn.flow_mode.credit, sync=True)
- self._attachments[rcv] = _rcv
- # XXX: need to kill syncs
- _ssn.sync()
-
- if rcv.closing:
- _ssn.message_cancel(rcv.destination, sync=True)
- # XXX: need to kill syncs
- _ssn.sync()
- del self._attachments[rcv]
- rcv.closed = True
- return None
- else:
- return _rcv
-
- def process(self, ssn):
- if ssn.closing: return
-
- _ssn = self._attachments[ssn]
-
- while ssn.outgoing:
- msg = ssn.outgoing[0]
- snd = msg._sender
- self.send(snd, msg)
- ssn.outgoing.pop(0)
-
- for rcv in ssn.receivers:
- self.process_receiver(rcv)
-
- if ssn.acked:
- messages = ssn.acked[:]
- ids = RangedSet(*[m._transfer_id for m in messages if m._transfer_id is not None])
- for range in ids:
- _ssn.receiver._completed.add_range(range)
- ch = _ssn.channel
- if ch is None:
- raise SessionDetached()
- ch.session_completed(_ssn.receiver._completed)
- _ssn.message_accept(ids, sync=True)
- # XXX: really need to make this async so that we don't give up the lock
- _ssn.sync()
-
- # XXX: we're ignoring acks that get lost when disconnected
- for m in messages:
- ssn.acked.remove(m)
- if ssn.transactional:
- _ssn.acked.append(m)
-
- if ssn.committing:
- _ssn.tx_commit(sync=True)
- # XXX: need to kill syncs
- _ssn.sync()
- del _ssn.acked[:]
- ssn.committing = False
- ssn.committed = True
- ssn.aborting = False
- ssn.aborted = False
-
- if ssn.aborting:
- for rcv in ssn.receivers:
- _ssn.message_stop(rcv.destination)
- _ssn.sync()
-
- messages = _ssn.acked + ssn.unacked + ssn.incoming
- ids = RangedSet(*[m._transfer_id for m in messages])
- for range in ids:
- _ssn.receiver._completed.add_range(range)
- _ssn.channel.session_completed(_ssn.receiver._completed)
- _ssn.message_release(ids)
- _ssn.tx_rollback(sync=True)
- _ssn.sync()
-
- del ssn.incoming[:]
- del ssn.unacked[:]
- del _ssn.acked[:]
-
- for rcv in ssn.receivers:
- rcv.impending = rcv.received
- rcv.returned = rcv.received
- # XXX: do we need to update granted here as well?
-
- for rcv in ssn.receivers:
- self.process_receiver(rcv)
-
- ssn.aborting = False
- ssn.aborted = True
- ssn.committing = False
- ssn.committed = False
-
- def grant(self, rcv):
- _ssn = self._attachments[rcv.session]
- _rcv = self.link_in(rcv)
-
- if rcv.granted is UNLIMITED:
- if rcv.impending is UNLIMITED:
- delta = 0
- else:
- delta = UNLIMITED
- elif rcv.impending is UNLIMITED:
- delta = -1
- else:
- delta = max(rcv.granted, rcv.received) - rcv.impending
-
- if delta is UNLIMITED:
- _ssn.message_flow(rcv.destination, _ssn.credit_unit.byte, UNLIMITED.value)
- _ssn.message_flow(rcv.destination, _ssn.credit_unit.message, UNLIMITED.value)
- rcv.impending = UNLIMITED
- elif delta > 0:
- _ssn.message_flow(rcv.destination, _ssn.credit_unit.byte, UNLIMITED.value)
- _ssn.message_flow(rcv.destination, _ssn.credit_unit.message, delta)
- rcv.impending += delta
- elif delta < 0:
- if rcv.drain:
- _ssn.message_flush(rcv.destination, sync=True)
- else:
- _ssn.message_stop(rcv.destination, sync=True)
- # XXX: need to kill syncs
- _ssn.sync()
- rcv.impending = rcv.received
- self.grant(rcv)
-
- def process_receiver(self, rcv):
- if rcv.closed: return
- self.grant(rcv)
-
- def send(self, snd, msg):
- _ssn = self._attachments[snd.session]
- _snd = self.link_out(snd)
-
- # XXX: what if subject is specified for a normal queue?
- if _snd._routing_key is None:
- rk = msg.subject
- else:
- rk = _snd._routing_key
- # XXX: do we need to query to figure out how to create the reply-to interoperably?
- if msg.reply_to:
- rt = _ssn.reply_to(*parse_addr(msg.reply_to))
- else:
- rt = None
- dp = _ssn.delivery_properties(routing_key=rk)
- mp = _ssn.message_properties(message_id=msg.id,
- user_id=msg.user_id,
- reply_to=rt,
- correlation_id=msg.correlation_id,
- content_type=msg.content_type,
- application_headers=msg.properties)
- if msg.subject is not None:
- if mp.application_headers is None:
- mp.application_headers = {}
- mp.application_headers["subject"] = msg.subject
- if msg.to is not None:
- if mp.application_headers is None:
- mp.application_headers = {}
- mp.application_headers["to"] = msg.to
- if msg.durable:
- dp.delivery_mode = delivery_mode.persistent
- enc, dec = get_codec(msg.content_type)
- body = enc(msg.content)
- _ssn.message_transfer(destination=_snd._exchange,
- message=Message010(dp, mp, body),
- sync=True)
- log.debug("SENT [%s] %s", snd.session, msg)
- # XXX: really need to make this async so that we don't give up the lock
- _ssn.sync()
- # XXX: should we log the ack somehow too?
- snd.acked += 1
-
- @synchronized
- def _message_transfer(self, ssn, cmd):
- m = Message010(cmd.payload)
- m.headers = cmd.headers
- m.id = cmd.id
- msg = self._decode(m)
- rcv = ssn.receivers[int(cmd.destination)]
- msg._receiver = rcv
- if rcv.impending is not UNLIMITED:
- assert rcv.received < rcv.impending
- rcv.received += 1
- log.debug("RECV [%s] %s", ssn, msg)
- ssn.incoming.append(msg)
- self.notifyAll()
- return INCOMPLETE
-
- def _decode(self, message):
- dp = message.get("delivery_properties")
- mp = message.get("message_properties")
- ap = mp.application_headers
- enc, dec = get_codec(mp.content_type)
- content = dec(message.body)
- msg = Message(content)
- msg.id = mp.message_id
- if ap is not None:
- msg.to = ap.get("to")
- msg.subject = ap.get("subject")
- msg.user_id = mp.user_id
- if mp.reply_to is not None:
- msg.reply_to = reply_to2addr(mp.reply_to)
- msg.correlation_id = mp.correlation_id
- msg.durable = dp.delivery_mode == delivery_mode.persistent
- msg.properties = mp.application_headers
- msg.content_type = mp.content_type
- msg._transfer_id = message.id
- return msg
-
__all__ = ["Connection", "Session", "Sender", "Receiver", "Pattern", "Message",
"ConnectionError", "ConnectError", "SessionError", "Disconnected",
"SendError", "InsufficientCapacity", "ReceiveError", "Empty",
- "timestamp", "uuid4"]
+ "timestamp", "uuid4", "UNLIMITED", "AMQP_PORT", "AMQPS_PORT"]
diff --git a/qpid/python/qpid/util.py b/qpid/python/qpid/util.py
index c46716b88f..3409d777f9 100644
--- a/qpid/python/qpid/util.py
+++ b/qpid/python/qpid/util.py
@@ -134,3 +134,9 @@ class URL:
if self.port:
s += ":%s" % self.port
return s
+
+def default(value, default):
+ if value is None:
+ return default
+ else:
+ return value