summaryrefslogtreecommitdiff
path: root/python/qpid/messaging
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid/messaging')
-rw-r--r--python/qpid/messaging/__init__.py35
-rw-r--r--python/qpid/messaging/address.py172
-rw-r--r--python/qpid/messaging/constants.py40
-rw-r--r--python/qpid/messaging/driver.py1329
-rw-r--r--python/qpid/messaging/endpoints.py1046
-rw-r--r--python/qpid/messaging/exceptions.py156
-rw-r--r--python/qpid/messaging/message.py173
-rw-r--r--python/qpid/messaging/transports.py116
-rw-r--r--python/qpid/messaging/util.py61
9 files changed, 0 insertions, 3128 deletions
diff --git a/python/qpid/messaging/__init__.py b/python/qpid/messaging/__init__.py
deleted file mode 100644
index f9ddda2e80..0000000000
--- a/python/qpid/messaging/__init__.py
+++ /dev/null
@@ -1,35 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-"""
-A candidate high level messaging API for python.
-
-Areas that still need work:
-
- - definition of the arguments for L{Session.sender} and L{Session.receiver}
- - standard L{Message} properties
- - L{Message} content encoding
- - protocol negotiation/multiprotocol impl
-"""
-
-from qpid.datatypes import timestamp, uuid4, Serial
-from qpid.messaging.constants import *
-from qpid.messaging.endpoints import *
-from qpid.messaging.exceptions import *
-from qpid.messaging.message import *
diff --git a/python/qpid/messaging/address.py b/python/qpid/messaging/address.py
deleted file mode 100644
index e423f09193..0000000000
--- a/python/qpid/messaging/address.py
+++ /dev/null
@@ -1,172 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-import re
-from qpid.lexer import Lexicon, LexError
-from qpid.parser import Parser, ParseError
-
-l = Lexicon()
-
-LBRACE = l.define("LBRACE", r"\{")
-RBRACE = l.define("RBRACE", r"\}")
-LBRACK = l.define("LBRACK", r"\[")
-RBRACK = l.define("RBRACK", r"\]")
-COLON = l.define("COLON", r":")
-SEMI = l.define("SEMI", r";")
-SLASH = l.define("SLASH", r"/")
-COMMA = l.define("COMMA", r",")
-NUMBER = l.define("NUMBER", r'[+-]?[0-9]*\.?[0-9]+')
-ID = l.define("ID", r'[a-zA-Z_](?:[a-zA-Z0-9_-]*[a-zA-Z0-9_])?')
-STRING = l.define("STRING", r""""(?:[^\\"]|\\.)*"|'(?:[^\\']|\\.)*'""")
-ESC = l.define("ESC", r"\\[^ux]|\\x[0-9a-fA-F][0-9a-fA-F]|\\u[0-9a-fA-F][0-9a-fA-F][0-9a-fA-F][0-9a-fA-F]")
-SYM = l.define("SYM", r"[.#*%@$^!+-]")
-WSPACE = l.define("WSPACE", r"[ \n\r\t]+")
-EOF = l.eof("EOF")
-
-LEXER = l.compile()
-
-def lex(st):
- return LEXER.lex(st)
-
-def tok2str(tok):
- if tok.type is STRING:
- return eval(tok.value)
- elif tok.type is ESC:
- if tok.value[1] == "x":
- return eval('"%s"' % tok.value)
- elif tok.value[1] == "u":
- return eval('u"%s"' % tok.value)
- else:
- return tok.value[1]
- else:
- return tok.value
-
-CONSTANTS = {
- "True": True,
- "true": True,
- "False": False,
- "false": False,
- "None": None
- }
-
-def tok2obj(tok):
- if tok.type == ID:
- return CONSTANTS.get(tok.value, tok.value)
- elif tok.type in (STRING, NUMBER):
- return eval(tok.value)
- else:
- return tok.value
-
-def toks2str(toks):
- if toks:
- return "".join(map(tok2str, toks))
- else:
- return None
-
-class AddressParser(Parser):
-
- def __init__(self, tokens):
- Parser.__init__(self, [t for t in tokens if t.type is not WSPACE])
-
- def parse(self):
- result = self.address()
- self.eat(EOF)
- return result
-
- def address(self):
- name = toks2str(self.eat_until(SLASH, SEMI, EOF))
-
- if name is None:
- raise ParseError(self.next())
-
- if self.matches(SLASH):
- self.eat(SLASH)
- subject = toks2str(self.eat_until(SEMI, EOF))
- else:
- subject = None
-
- if self.matches(SEMI):
- self.eat(SEMI)
- options = self.map()
- else:
- options = None
- return name, subject, options
-
- def map(self):
- self.eat(LBRACE)
-
- result = {}
- while True:
- if self.matches(NUMBER, STRING, ID, LBRACE, LBRACK):
- n, v = self.keyval()
- result[n] = v
- if self.matches(COMMA):
- self.eat(COMMA)
- elif self.matches(RBRACE):
- break
- else:
- raise ParseError(self.next(), COMMA, RBRACE)
- elif self.matches(RBRACE):
- break
- else:
- raise ParseError(self.next(), NUMBER, STRING, ID, LBRACE, LBRACK,
- RBRACE)
-
- self.eat(RBRACE)
- return result
-
- def keyval(self):
- key = self.value()
- self.eat(COLON)
- val = self.value()
- return (key, val)
-
- def value(self):
- if self.matches(NUMBER, STRING, ID):
- return tok2obj(self.eat())
- elif self.matches(LBRACE):
- return self.map()
- elif self.matches(LBRACK):
- return self.list()
- else:
- raise ParseError(self.next(), NUMBER, STRING, ID, LBRACE, LBRACK)
-
- def list(self):
- self.eat(LBRACK)
-
- result = []
-
- while True:
- if self.matches(RBRACK):
- break
- else:
- result.append(self.value())
- if self.matches(COMMA):
- self.eat(COMMA)
- elif self.matches(RBRACK):
- break
- else:
- raise ParseError(self.next(), COMMA, RBRACK)
-
- self.eat(RBRACK)
- return result
-
-def parse(addr):
- return AddressParser(lex(addr)).parse()
-
-__all__ = ["parse", "ParseError"]
diff --git a/python/qpid/messaging/constants.py b/python/qpid/messaging/constants.py
deleted file mode 100644
index f230c4def8..0000000000
--- a/python/qpid/messaging/constants.py
+++ /dev/null
@@ -1,40 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-__SELF__ = object()
-
-class Constant:
-
- def __init__(self, name, value=__SELF__):
- self.name = name
- if value is __SELF__:
- self.value = self
- else:
- self.value = value
-
- def __repr__(self):
- return self.name
-
-AMQP_PORT = 5672
-AMQPS_PORT = 5671
-
-UNLIMITED = Constant("UNLIMITED", 0xFFFFFFFFL)
-
-REJECTED = Constant("REJECTED")
-RELEASED = Constant("RELEASED")
diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py
deleted file mode 100644
index 78af2827df..0000000000
--- a/python/qpid/messaging/driver.py
+++ /dev/null
@@ -1,1329 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import socket, struct, sys, time
-from logging import getLogger, DEBUG
-from qpid import compat
-from qpid import sasl
-from qpid.concurrency import synchronized
-from qpid.datatypes import RangedSet, Serial
-from qpid.framing import OpEncoder, SegmentEncoder, FrameEncoder, \
- FrameDecoder, SegmentDecoder, OpDecoder
-from qpid.messaging import address, transports
-from qpid.messaging.constants import UNLIMITED, REJECTED, RELEASED
-from qpid.messaging.exceptions import *
-from qpid.messaging.message import get_codec, Disposition, Message
-from qpid.ops import *
-from qpid.selector import Selector
-from qpid.util import URL, default
-from qpid.validator import And, Context, List, Map, Types, Values
-from threading import Condition, Thread
-
-log = getLogger("qpid.messaging")
-rawlog = getLogger("qpid.messaging.io.raw")
-opslog = getLogger("qpid.messaging.io.ops")
-
-def addr2reply_to(addr):
- name, subject, options = address.parse(addr)
- if options:
- type = options.get("node", {}).get("type")
- else:
- type = None
-
- if type == "topic":
- return ReplyTo(name, subject)
- else:
- return ReplyTo(None, name)
-
-def reply_to2addr(reply_to):
- if reply_to.exchange in (None, ""):
- return reply_to.routing_key
- elif reply_to.routing_key is None:
- return "%s; {node: {type: topic}}" % reply_to.exchange
- else:
- return "%s/%s; {node: {type: topic}}" % (reply_to.exchange, reply_to.routing_key)
-
-class Attachment:
-
- def __init__(self, target):
- self.target = target
-
-# XXX
-
-DURABLE_DEFAULT=False
-
-# XXX
-
-class Pattern:
- """
- The pattern filter matches the supplied wildcard pattern against a
- message subject.
- """
-
- def __init__(self, value):
- self.value = value
-
- # XXX: this should become part of the driver
- def _bind(self, sst, exchange, queue):
- from qpid.ops import ExchangeBind
-
- sst.write_cmd(ExchangeBind(exchange=exchange, queue=queue,
- binding_key=self.value.replace("*", "#")))
-
-SUBJECT_DEFAULTS = {
- "topic": "#"
- }
-
-# XXX
-ppid = 0
-try:
- ppid = os.getppid()
-except:
- pass
-
-CLIENT_PROPERTIES = {"product": "qpid python client",
- "version": "development",
- "platform": os.name,
- "qpid.client_process": os.path.basename(sys.argv[0]),
- "qpid.client_pid": os.getpid(),
- "qpid.client_ppid": ppid}
-
-def noop(): pass
-def sync_noop(): pass
-
-class SessionState:
-
- def __init__(self, driver, session, name, channel):
- self.driver = driver
- self.session = session
- self.name = name
- self.channel = channel
- self.detached = False
- self.committing = False
- self.aborting = False
-
- # sender state
- self.sent = Serial(0)
- self.acknowledged = RangedSet()
- self.actions = {}
- self.min_completion = self.sent
- self.max_completion = self.sent
- self.results = {}
- self.need_sync = False
-
- # receiver state
- self.received = None
- self.executed = RangedSet()
-
- # XXX: need to periodically exchange completion/known_completion
-
- self.destinations = {}
-
- def write_query(self, query, handler):
- id = self.sent
- self.write_cmd(query, lambda: handler(self.results.pop(id)))
-
- def apply_overrides(self, cmd, overrides):
- for k, v in overrides.items():
- cmd[k.replace('-', '_')] = v
-
- def write_cmd(self, cmd, action=noop, overrides=None, sync=True):
- if overrides:
- self.apply_overrides(cmd, overrides)
-
- if action != noop:
- cmd.sync = sync
- if self.detached:
- raise Exception("detached")
- cmd.id = self.sent
- self.sent += 1
- self.actions[cmd.id] = action
- self.max_completion = cmd.id
- self.write_op(cmd)
- self.need_sync = not cmd.sync
-
- def write_cmds(self, cmds, action=noop):
- if cmds:
- for cmd in cmds[:-1]:
- self.write_cmd(cmd)
- self.write_cmd(cmds[-1], action)
- else:
- action()
-
- def write_op(self, op):
- op.channel = self.channel
- self.driver.write_op(op)
-
-POLICIES = Values("always", "sender", "receiver", "never")
-RELIABILITY = Values("unreliable", "at-most-once", "at-least-once",
- "exactly-once")
-
-DECLARE = Map({}, restricted=False)
-BINDINGS = List(Map({
- "exchange": Types(basestring),
- "queue": Types(basestring),
- "key": Types(basestring),
- "arguments": Map({}, restricted=False)
- }))
-
-COMMON_OPTS = {
- "create": POLICIES,
- "delete": POLICIES,
- "assert": POLICIES,
- "node": Map({
- "type": Values("queue", "topic"),
- "durable": Types(bool),
- "x-declare": DECLARE,
- "x-bindings": BINDINGS
- }),
- "link": Map({
- "name": Types(basestring),
- "durable": Types(bool),
- "reliability": RELIABILITY,
- "x-declare": DECLARE,
- "x-bindings": BINDINGS,
- "x-subscribe": Map({}, restricted=False)
- })
- }
-
-RECEIVE_MODES = Values("browse", "consume")
-
-SOURCE_OPTS = COMMON_OPTS.copy()
-SOURCE_OPTS.update({
- "mode": RECEIVE_MODES
- })
-
-TARGET_OPTS = COMMON_OPTS.copy()
-
-class LinkIn:
-
- ADDR_NAME = "source"
- DIR_NAME = "receiver"
- VALIDATOR = Map(SOURCE_OPTS)
-
- def init_link(self, sst, rcv, _rcv):
- _rcv.destination = str(rcv.id)
- sst.destinations[_rcv.destination] = _rcv
- _rcv.draining = False
- _rcv.bytes_open = False
- _rcv.on_unlink = []
-
- def do_link(self, sst, rcv, _rcv, type, subtype, action):
- link_opts = _rcv.options.get("link", {})
- reliability = link_opts.get("reliability", "at-least-once")
- declare = link_opts.get("x-declare", {})
- subscribe = link_opts.get("x-subscribe", {})
- acq_mode = acquire_mode.pre_acquired
- if reliability in ("unreliable", "at-most-once"):
- rcv._accept_mode = accept_mode.none
- else:
- rcv._accept_mode = accept_mode.explicit
-
- if type == "topic":
- default_name = "%s.%s" % (rcv.session.name, _rcv.destination)
- _rcv._queue = link_opts.get("name", default_name)
- sst.write_cmd(QueueDeclare(queue=_rcv._queue,
- durable=link_opts.get("durable", False),
- exclusive=True,
- auto_delete=(reliability == "unreliable")),
- overrides=declare)
- _rcv.on_unlink = [QueueDelete(_rcv._queue)]
- subject = _rcv.subject or SUBJECT_DEFAULTS.get(subtype)
- bindings = get_bindings(link_opts, _rcv._queue, _rcv.name, subject)
- if not bindings:
- sst.write_cmd(ExchangeBind(_rcv._queue, _rcv.name, subject))
-
- elif type == "queue":
- _rcv._queue = _rcv.name
- if _rcv.options.get("mode", "consume") == "browse":
- acq_mode = acquire_mode.not_acquired
- bindings = get_bindings(link_opts, queue=_rcv._queue)
-
-
- sst.write_cmds(bindings)
- sst.write_cmd(MessageSubscribe(queue=_rcv._queue,
- destination=_rcv.destination,
- acquire_mode = acq_mode,
- accept_mode = rcv._accept_mode),
- overrides=subscribe)
- sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit), action)
-
- def do_unlink(self, sst, rcv, _rcv, action=noop):
- link_opts = _rcv.options.get("link", {})
- reliability = link_opts.get("reliability")
- cmds = [MessageCancel(_rcv.destination)]
- cmds.extend(_rcv.on_unlink)
- sst.write_cmds(cmds, action)
-
- def del_link(self, sst, rcv, _rcv):
- del sst.destinations[_rcv.destination]
-
-class LinkOut:
-
- ADDR_NAME = "target"
- DIR_NAME = "sender"
- VALIDATOR = Map(TARGET_OPTS)
-
- def init_link(self, sst, snd, _snd):
- _snd.closing = False
- _snd.pre_ack = False
-
- def do_link(self, sst, snd, _snd, type, subtype, action):
- link_opts = _snd.options.get("link", {})
- reliability = link_opts.get("reliability", "at-least-once")
- _snd.pre_ack = reliability in ("unreliable", "at-most-once")
- if type == "topic":
- _snd._exchange = _snd.name
- _snd._routing_key = _snd.subject
- bindings = get_bindings(link_opts, exchange=_snd.name, key=_snd.subject)
- elif type == "queue":
- _snd._exchange = ""
- _snd._routing_key = _snd.name
- bindings = get_bindings(link_opts, queue=_snd.name)
- sst.write_cmds(bindings, action)
-
- def do_unlink(self, sst, snd, _snd, action=noop):
- action()
-
- def del_link(self, sst, snd, _snd):
- pass
-
-class Cache:
-
- def __init__(self, ttl):
- self.ttl = ttl
- self.entries = {}
-
- def __setitem__(self, key, value):
- self.entries[key] = time.time(), value
-
- def __getitem__(self, key):
- tstamp, value = self.entries[key]
- if time.time() - tstamp >= self.ttl:
- del self.entries[key]
- raise KeyError(key)
- else:
- return value
-
- def __delitem__(self, key):
- del self.entries[key]
-
-# XXX
-HEADER="!4s4B"
-
-EMPTY_DP = DeliveryProperties()
-EMPTY_MP = MessageProperties()
-
-SUBJECT = "qpid.subject"
-
-CLOSED = "CLOSED"
-READ_ONLY = "READ_ONLY"
-WRITE_ONLY = "WRITE_ONLY"
-OPEN = "OPEN"
-
-class Driver:
-
- def __init__(self, connection):
- self.connection = connection
- self.log_id = "%x" % id(self.connection)
- self._lock = self.connection._lock
-
- self._selector = Selector.default()
- self._attempts = 0
- self._delay = self.connection.reconnect_interval_min
- self._reconnect_log = self.connection.reconnect_log
- self._host = 0
- self._retrying = False
- self._next_retry = None
- self._transport = None
-
- self._timeout = None
-
- self.engine = None
-
- def _next_host(self):
- urls = [URL(u) for u in self.connection.reconnect_urls]
- hosts = [(self.connection.host, default(self.connection.port, 5672))] + \
- [(u.host, default(u.port, 5672)) for u in urls]
- if self._host >= len(hosts):
- self._host = 0
- result = hosts[self._host]
- if self._host == 0:
- self._attempts += 1
- self._host = self._host + 1
- return result
-
- def _num_hosts(self):
- return len(self.connection.reconnect_urls) + 1
-
- @synchronized
- def wakeup(self):
- self.dispatch()
- self._selector.wakeup()
-
- def start(self):
- self._selector.register(self)
-
- def stop(self):
- self._selector.unregister(self)
- if self._transport:
- self.st_closed()
-
- def fileno(self):
- return self._transport.fileno()
-
- @synchronized
- def reading(self):
- return self._transport is not None and \
- self._transport.reading(True)
-
- @synchronized
- def writing(self):
- return self._transport is not None and \
- self._transport.writing(self.engine.pending())
-
- @synchronized
- def timing(self):
- return self._timeout
-
- @synchronized
- def readable(self):
- try:
- data = self._transport.recv(64*1024)
- if data is None:
- return
- elif data:
- rawlog.debug("READ[%s]: %r", self.log_id, data)
- self.engine.write(data)
- else:
- self.close_engine()
- except socket.error, e:
- self.close_engine(ConnectionError(text=str(e)))
-
- self.update_status()
-
- self._notify()
-
- def _notify(self):
- if self.connection.error:
- self.connection._condition.gc()
- self.connection._waiter.notifyAll()
-
- def close_engine(self, e=None):
- if e is None:
- e = ConnectionError(text="connection aborted")
-
- if (self.connection.reconnect and
- (self.connection.reconnect_limit is None or
- self.connection.reconnect_limit <= 0 or
- self._attempts <= self.connection.reconnect_limit)):
- if self._host < self._num_hosts():
- delay = 0
- else:
- delay = self._delay
- self._delay = min(2*self._delay,
- self.connection.reconnect_interval_max)
- self._next_retry = time.time() + delay
- if self._reconnect_log:
- log.warn("recoverable error[attempt %s]: %s" % (self._attempts, e))
- if delay > 0:
- log.warn("sleeping %s seconds" % delay)
- self._retrying = True
- self.engine.close()
- else:
- self.engine.close(e)
-
- self.schedule()
-
- def update_status(self):
- status = self.engine.status()
- return getattr(self, "st_%s" % status.lower())()
-
- def st_closed(self):
- # XXX: this log statement seems to sometimes hit when the socket is not connected
- # XXX: rawlog.debug("CLOSE[%s]: %s", self.log_id, self._socket.getpeername())
- self._transport.close()
- self._transport = None
- self.engine = None
- return True
-
- def st_open(self):
- return False
-
- @synchronized
- def writeable(self):
- notify = False
- try:
- n = self._transport.send(self.engine.peek())
- if n == 0: return
- sent = self.engine.read(n)
- rawlog.debug("SENT[%s]: %r", self.log_id, sent)
- except socket.error, e:
- self.close_engine(e)
- notify = True
-
- if self.update_status() or notify:
- self._notify()
-
- @synchronized
- def timeout(self):
- self.dispatch()
- self._notify()
- self.schedule()
-
- def schedule(self):
- times = []
- if self.connection.heartbeat:
- times.append(time.time() + self.connection.heartbeat)
- if self._next_retry:
- times.append(self._next_retry)
- if times:
- self._timeout = min(times)
- else:
- self._timeout = None
-
- def dispatch(self):
- try:
- if self._transport is None:
- if self.connection._connected and not self.connection.error:
- self.connect()
- else:
- self.engine.dispatch()
- except HeartbeatTimeout, e:
- self.close_engine(e)
- except:
- # XXX: Does socket get leaked if this occurs?
- msg = compat.format_exc()
- self.connection.error = InternalError(text=msg)
-
- def connect(self):
- if self._retrying and time.time() < self._next_retry:
- return
-
- try:
- # XXX: should make this non blocking
- host, port = self._next_host()
- if self._retrying and self._reconnect_log:
- log.warn("trying: %s:%s", host, port)
- self.engine = Engine(self.connection)
- self.engine.open()
- rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port)
- trans = transports.TRANSPORTS.get(self.connection.transport)
- if trans:
- self._transport = trans(self.connection, host, port)
- else:
- raise ConnectError("no such transport: %s" % self.connection.transport)
- if self._retrying and self._reconnect_log:
- log.warn("reconnect succeeded: %s:%s", host, port)
- self._next_retry = None
- self._attempts = 0
- self._host = 0
- self._delay = self.connection.reconnect_interval_min
- self._retrying = False
- self.schedule()
- except socket.error, e:
- self.close_engine(ConnectError(text=str(e)))
-
-DEFAULT_DISPOSITION = Disposition(None)
-
-def get_bindings(opts, queue=None, exchange=None, key=None):
- bindings = opts.get("x-bindings", [])
- cmds = []
- for b in bindings:
- exchange = b.get("exchange", exchange)
- queue = b.get("queue", queue)
- key = b.get("key", key)
- args = b.get("arguments", {})
- cmds.append(ExchangeBind(queue, exchange, key, args))
- return cmds
-
-CONNECTION_ERRS = {
- # anythong not here (i.e. everything right now) will default to
- # connection error
- }
-
-SESSION_ERRS = {
- # anything not here will default to session error
- error_code.unauthorized_access: UnauthorizedAccess,
- error_code.not_found: NotFound,
- error_code.resource_locked: ReceiverError,
- error_code.resource_limit_exceeded: TargetCapacityExceeded,
- error_code.internal_error: ServerError
- }
-
-class Engine:
-
- def __init__(self, connection):
- self.connection = connection
- self.log_id = "%x" % id(self.connection)
- self._closing = False
- self._connected = False
- self._attachments = {}
-
- self._in = LinkIn()
- self._out = LinkOut()
-
- self._channel_max = 65536
- self._channels = 0
- self._sessions = {}
-
- self.address_cache = Cache(self.connection.address_ttl)
-
- self._status = CLOSED
- self._buf = ""
- self._hdr = ""
- self._last_in = None
- self._last_out = None
- self._op_enc = OpEncoder()
- self._seg_enc = SegmentEncoder()
- self._frame_enc = FrameEncoder()
- self._frame_dec = FrameDecoder()
- self._seg_dec = SegmentDecoder()
- self._op_dec = OpDecoder()
-
- self._sasl = sasl.Client()
- if self.connection.username:
- self._sasl.setAttr("username", self.connection.username)
- if self.connection.password:
- self._sasl.setAttr("password", self.connection.password)
- if self.connection.host:
- self._sasl.setAttr("host", self.connection.host)
- self._sasl.setAttr("service", self.connection.sasl_service)
- if self.connection.sasl_min_ssf is not None:
- self._sasl.setAttr("minssf", self.connection.sasl_min_ssf)
- if self.connection.sasl_max_ssf is not None:
- self._sasl.setAttr("maxssf", self.connection.sasl_max_ssf)
- self._sasl.init()
- self._sasl_encode = False
- self._sasl_decode = False
-
- def _reset(self):
- self.connection._transport_connected = False
-
- for ssn in self.connection.sessions.values():
- for m in ssn.acked + ssn.unacked + ssn.incoming:
- m._transfer_id = None
- for snd in ssn.senders:
- snd.linked = False
- for rcv in ssn.receivers:
- rcv.impending = rcv.received
- rcv.linked = False
-
- def status(self):
- return self._status
-
- def write(self, data):
- self._last_in = time.time()
- try:
- if self._sasl_decode:
- data = self._sasl.decode(data)
-
- if len(self._hdr) < 8:
- r = 8 - len(self._hdr)
- self._hdr += data[:r]
- data = data[r:]
-
- if len(self._hdr) == 8:
- self.do_header(self._hdr)
-
- self._frame_dec.write(data)
- self._seg_dec.write(*self._frame_dec.read())
- self._op_dec.write(*self._seg_dec.read())
- for op in self._op_dec.read():
- self.assign_id(op)
- opslog.debug("RCVD[%s]: %r", self.log_id, op)
- op.dispatch(self)
- self.dispatch()
- except MessagingError, e:
- self.close(e)
- except:
- self.close(InternalError(text=compat.format_exc()))
-
- def close(self, e=None):
- self._reset()
- if e:
- self.connection.error = e
- self._status = CLOSED
-
- def assign_id(self, op):
- if isinstance(op, Command):
- sst = self.get_sst(op)
- op.id = sst.received
- sst.received += 1
-
- def pending(self):
- return len(self._buf)
-
- def read(self, n):
- result = self._buf[:n]
- self._buf = self._buf[n:]
- return result
-
- def peek(self):
- return self._buf
-
- def write_op(self, op):
- opslog.debug("SENT[%s]: %r", self.log_id, op)
- self._op_enc.write(op)
- self._seg_enc.write(*self._op_enc.read())
- self._frame_enc.write(*self._seg_enc.read())
- bytes = self._frame_enc.read()
- if self._sasl_encode:
- bytes = self._sasl.encode(bytes)
- self._buf += bytes
- self._last_out = time.time()
-
- def do_header(self, hdr):
- cli_major = 0; cli_minor = 10
- magic, _, _, major, minor = struct.unpack(HEADER, hdr)
- if major != cli_major or minor != cli_minor:
- raise VersionError(text="client: %s-%s, server: %s-%s" %
- (cli_major, cli_minor, major, minor))
-
- def do_connection_start(self, start):
- if self.connection.sasl_mechanisms:
- permitted = self.connection.sasl_mechanisms.split()
- mechs = [m for m in start.mechanisms if m in permitted]
- else:
- mechs = start.mechanisms
- try:
- mech, initial = self._sasl.start(" ".join(mechs))
- except sasl.SASLError, e:
- raise AuthenticationFailure(text=str(e))
- self.write_op(ConnectionStartOk(client_properties=CLIENT_PROPERTIES,
- mechanism=mech, response=initial))
-
- def do_connection_secure(self, secure):
- resp = self._sasl.step(secure.challenge)
- self.write_op(ConnectionSecureOk(response=resp))
-
- def do_connection_tune(self, tune):
- # XXX: is heartbeat protocol specific?
- if tune.channel_max is not None:
- self.channel_max = tune.channel_max
- self.write_op(ConnectionTuneOk(heartbeat=self.connection.heartbeat,
- channel_max=self.channel_max))
- self.write_op(ConnectionOpen())
- self._sasl_encode = True
-
- def do_connection_open_ok(self, open_ok):
- self.connection.auth_username = self._sasl.auth_username()
- self._connected = True
- self._sasl_decode = True
- self.connection._transport_connected = True
-
- def do_connection_heartbeat(self, hrt):
- pass
-
- def do_connection_close(self, close):
- self.write_op(ConnectionCloseOk())
- if close.reply_code != close_code.normal:
- exc = CONNECTION_ERRS.get(close.reply_code, ConnectionError)
- self.connection.error = exc(close.reply_code, close.reply_text)
- # XXX: should we do a half shutdown on the socket here?
- # XXX: we really need to test this, we may end up reporting a
- # connection abort after this, if we were to do a shutdown on read
- # and stop reading, then we wouldn't report the abort, that's
- # probably the right thing to do
-
- def do_connection_close_ok(self, close_ok):
- self.close()
-
- def do_session_attached(self, atc):
- pass
-
- def do_session_command_point(self, cp):
- sst = self.get_sst(cp)
- sst.received = cp.command_id
-
- def do_session_completed(self, sc):
- sst = self.get_sst(sc)
- for r in sc.commands:
- sst.acknowledged.add(r.lower, r.upper)
-
- if not sc.commands.empty():
- while sst.min_completion in sc.commands:
- if sst.actions.has_key(sst.min_completion):
- sst.actions.pop(sst.min_completion)()
- sst.min_completion += 1
-
- def session_known_completed(self, kcmp):
- sst = self.get_sst(kcmp)
- executed = RangedSet()
- for e in sst.executed.ranges:
- for ke in kcmp.ranges:
- if e.lower in ke and e.upper in ke:
- break
- else:
- executed.add_range(e)
- sst.executed = completed
-
- def do_session_flush(self, sf):
- sst = self.get_sst(sf)
- if sf.expected:
- if sst.received is None:
- exp = None
- else:
- exp = RangedSet(sst.received)
- sst.write_op(SessionExpected(exp))
- if sf.confirmed:
- sst.write_op(SessionConfirmed(sst.executed))
- if sf.completed:
- sst.write_op(SessionCompleted(sst.executed))
-
- def do_session_request_timeout(self, rt):
- sst = self.get_sst(rt)
- sst.write_op(SessionTimeout(timeout=0))
-
- def do_execution_result(self, er):
- sst = self.get_sst(er)
- sst.results[er.command_id] = er.value
- sst.executed.add(er.id)
-
- def do_execution_exception(self, ex):
- sst = self.get_sst(ex)
- exc = SESSION_ERRS.get(ex.error_code, SessionError)
- sst.session.error = exc(ex.error_code, ex.description)
-
- def dispatch(self):
- if not self.connection._connected and not self._closing and self._status != CLOSED:
- self.disconnect()
-
- if self._connected and not self._closing:
- for ssn in self.connection.sessions.values():
- self.attach(ssn)
- self.process(ssn)
-
- if self.connection.heartbeat and self._status != CLOSED:
- now = time.time()
- if self._last_in is not None and \
- now - self._last_in > 2*self.connection.heartbeat:
- raise HeartbeatTimeout(text="heartbeat timeout")
- if self._last_out is None or now - self._last_out >= self.connection.heartbeat/2.0:
- self.write_op(ConnectionHeartbeat())
-
- def open(self):
- self._reset()
- self._status = OPEN
- self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10)
-
- def disconnect(self):
- self.write_op(ConnectionClose(close_code.normal))
- self._closing = True
-
- def attach(self, ssn):
- sst = self._attachments.get(ssn)
- if sst is None and not ssn.closed:
- for i in xrange(0, self.channel_max):
- if not self._sessions.has_key(i):
- ch = i
- break
- else:
- raise RuntimeError("all channels used")
- sst = SessionState(self, ssn, ssn.name, ch)
- sst.write_op(SessionAttach(name=ssn.name))
- sst.write_op(SessionCommandPoint(sst.sent, 0))
- sst.outgoing_idx = 0
- sst.acked = []
- sst.acked_idx = 0
- if ssn.transactional:
- sst.write_cmd(TxSelect())
- self._attachments[ssn] = sst
- self._sessions[sst.channel] = sst
-
- for snd in ssn.senders:
- self.link(snd, self._out, snd.target)
- for rcv in ssn.receivers:
- self.link(rcv, self._in, rcv.source)
-
- if sst is not None and ssn.closing and not sst.detached:
- sst.detached = True
- sst.write_op(SessionDetach(name=ssn.name))
-
- def get_sst(self, op):
- return self._sessions[op.channel]
-
- def do_session_detached(self, dtc):
- sst = self._sessions.pop(dtc.channel)
- ssn = sst.session
- del self._attachments[ssn]
- ssn.closed = True
-
- def do_session_detach(self, dtc):
- sst = self.get_sst(dtc)
- sst.write_op(SessionDetached(name=dtc.name))
- self.do_session_detached(dtc)
-
- def link(self, lnk, dir, addr):
- sst = self._attachments.get(lnk.session)
- _lnk = self._attachments.get(lnk)
-
- if _lnk is None and not lnk.closed:
- _lnk = Attachment(lnk)
- _lnk.closing = False
- dir.init_link(sst, lnk, _lnk)
-
- err = self.parse_address(_lnk, dir, addr) or self.validate_options(_lnk, dir)
- if err:
- lnk.error = err
- lnk.closed = True
- return
-
- def linked():
- lnk.linked = True
-
- def resolved(type, subtype):
- dir.do_link(sst, lnk, _lnk, type, subtype, linked)
-
- self.resolve_declare(sst, _lnk, dir.DIR_NAME, resolved)
- self._attachments[lnk] = _lnk
-
- if lnk.linked and lnk.closing and not lnk.closed:
- if not _lnk.closing:
- def unlinked():
- dir.del_link(sst, lnk, _lnk)
- del self._attachments[lnk]
- lnk.closed = True
- if _lnk.options.get("delete") in ("always", dir.DIR_NAME):
- dir.do_unlink(sst, lnk, _lnk)
- self.delete(sst, _lnk.name, unlinked)
- else:
- dir.do_unlink(sst, lnk, _lnk, unlinked)
- _lnk.closing = True
- elif not lnk.linked and lnk.closing and not lnk.closed:
- if lnk.error: lnk.closed = True
-
- def parse_address(self, lnk, dir, addr):
- if addr is None:
- return MalformedAddress(text="%s is None" % dir.ADDR_NAME)
- else:
- try:
- lnk.name, lnk.subject, lnk.options = address.parse(addr)
- # XXX: subject
- if lnk.options is None:
- lnk.options = {}
- except address.LexError, e:
- return MalformedAddress(text=str(e))
- except address.ParseError, e:
- return MalformedAddress(text=str(e))
-
- def validate_options(self, lnk, dir):
- ctx = Context()
- err = dir.VALIDATOR.validate(lnk.options, ctx)
- if err: return InvalidOption(text="error in options: %s" % err)
-
- def resolve_declare(self, sst, lnk, dir, action):
- declare = lnk.options.get("create") in ("always", dir)
- assrt = lnk.options.get("assert") in ("always", dir)
- def do_resolved(type, subtype):
- err = None
- if type is None:
- if declare:
- err = self.declare(sst, lnk, action)
- else:
- err = NotFound(text="no such queue: %s" % lnk.name)
- else:
- if assrt:
- expected = lnk.options.get("node", {}).get("type")
- if expected and type != expected:
- err = AssertionFailed(text="expected %s, got %s" % (expected, type))
- if err is None:
- action(type, subtype)
-
- if err:
- tgt = lnk.target
- tgt.error = err
- del self._attachments[tgt]
- tgt.closed = True
- return
- self.resolve(sst, lnk.name, do_resolved, force=declare)
-
- def resolve(self, sst, name, action, force=False):
- if not force:
- try:
- type, subtype = self.address_cache[name]
- action(type, subtype)
- return
- except KeyError:
- pass
-
- args = []
- def do_result(r):
- args.append(r)
- def do_action(r):
- do_result(r)
- er, qr = args
- if er.not_found and not qr.queue:
- type, subtype = None, None
- elif qr.queue:
- type, subtype = "queue", None
- else:
- type, subtype = "topic", er.type
- if type is not None:
- self.address_cache[name] = (type, subtype)
- action(type, subtype)
- sst.write_query(ExchangeQuery(name), do_result)
- sst.write_query(QueueQuery(name), do_action)
-
- def declare(self, sst, lnk, action):
- name = lnk.name
- props = lnk.options.get("node", {})
- durable = props.get("durable", DURABLE_DEFAULT)
- type = props.get("type", "queue")
- declare = props.get("x-declare", {})
-
- if type == "topic":
- cmd = ExchangeDeclare(exchange=name, durable=durable)
- bindings = get_bindings(props, exchange=name)
- elif type == "queue":
- cmd = QueueDeclare(queue=name, durable=durable)
- bindings = get_bindings(props, queue=name)
- else:
- raise ValueError(type)
-
- sst.apply_overrides(cmd, declare)
-
- if type == "topic":
- if cmd.type is None:
- cmd.type = "topic"
- subtype = cmd.type
- else:
- subtype = None
-
- cmds = [cmd]
- cmds.extend(bindings)
-
- def declared():
- self.address_cache[name] = (type, subtype)
- action(type, subtype)
-
- sst.write_cmds(cmds, declared)
-
- def delete(self, sst, name, action):
- def deleted():
- del self.address_cache[name]
- action()
-
- def do_delete(type, subtype):
- if type == "topic":
- sst.write_cmd(ExchangeDelete(name), deleted)
- elif type == "queue":
- sst.write_cmd(QueueDelete(name), deleted)
- elif type is None:
- action()
- else:
- raise ValueError(type)
- self.resolve(sst, name, do_delete, force=True)
-
- def process(self, ssn):
- if ssn.closed or ssn.closing: return
-
- sst = self._attachments[ssn]
-
- while sst.outgoing_idx < len(ssn.outgoing):
- msg = ssn.outgoing[sst.outgoing_idx]
- snd = msg._sender
- # XXX: should check for sender error here
- _snd = self._attachments.get(snd)
- if _snd and snd.linked:
- self.send(snd, msg)
- sst.outgoing_idx += 1
- else:
- break
-
- for snd in ssn.senders:
- # XXX: should included snd.acked in this
- if snd.synced >= snd.queued and sst.need_sync:
- sst.write_cmd(ExecutionSync(), sync_noop)
-
- for rcv in ssn.receivers:
- self.process_receiver(rcv)
-
- if ssn.acked:
- messages = ssn.acked[sst.acked_idx:]
- if messages:
- ids = RangedSet()
-
- disposed = [(DEFAULT_DISPOSITION, [])]
- acked = []
- for m in messages:
- # XXX: we're ignoring acks that get lost when disconnected,
- # could we deal this via some message-id based purge?
- if m._transfer_id is None:
- acked.append(m)
- continue
- ids.add(m._transfer_id)
- if m._receiver._accept_mode is accept_mode.explicit:
- disp = m._disposition or DEFAULT_DISPOSITION
- last, msgs = disposed[-1]
- if disp.type is last.type and disp.options == last.options:
- msgs.append(m)
- else:
- disposed.append((disp, [m]))
- else:
- acked.append(m)
-
- for range in ids:
- sst.executed.add_range(range)
- sst.write_op(SessionCompleted(sst.executed))
-
- def ack_acker(msgs):
- def ack_ack():
- for m in msgs:
- ssn.acked.remove(m)
- sst.acked_idx -= 1
- # XXX: should this check accept_mode too?
- if not ssn.transactional:
- sst.acked.remove(m)
- return ack_ack
-
- for disp, msgs in disposed:
- if not msgs: continue
- if disp.type is None:
- op = MessageAccept
- elif disp.type is RELEASED:
- op = MessageRelease
- elif disp.type is REJECTED:
- op = MessageReject
- sst.write_cmd(op(RangedSet(*[m._transfer_id for m in msgs]),
- **disp.options),
- ack_acker(msgs))
- if log.isEnabledFor(DEBUG):
- for m in msgs:
- log.debug("SACK[%s]: %s, %s", ssn.log_id, m, m._disposition)
-
- sst.acked.extend(messages)
- sst.acked_idx += len(messages)
- ack_acker(acked)()
-
- if ssn.committing and not sst.committing:
- def commit_ok():
- del sst.acked[:]
- ssn.committing = False
- ssn.committed = True
- ssn.aborting = False
- ssn.aborted = False
- sst.committing = False
- sst.write_cmd(TxCommit(), commit_ok)
- sst.committing = True
-
- if ssn.aborting and not sst.aborting:
- sst.aborting = True
- def do_rb():
- messages = sst.acked + ssn.unacked + ssn.incoming
- ids = RangedSet(*[m._transfer_id for m in messages])
- for range in ids:
- sst.executed.add_range(range)
- sst.write_op(SessionCompleted(sst.executed))
- sst.write_cmd(MessageRelease(ids, True))
- sst.write_cmd(TxRollback(), do_rb_ok)
-
- def do_rb_ok():
- del ssn.incoming[:]
- del ssn.unacked[:]
- del sst.acked[:]
-
- for rcv in ssn.receivers:
- rcv.impending = rcv.received
- rcv.returned = rcv.received
- # XXX: do we need to update granted here as well?
-
- for rcv in ssn.receivers:
- self.process_receiver(rcv)
-
- ssn.aborting = False
- ssn.aborted = True
- ssn.committing = False
- ssn.committed = False
- sst.aborting = False
-
- for rcv in ssn.receivers:
- _rcv = self._attachments[rcv]
- sst.write_cmd(MessageStop(_rcv.destination))
- sst.write_cmd(ExecutionSync(), do_rb)
-
- def grant(self, rcv):
- sst = self._attachments[rcv.session]
- _rcv = self._attachments.get(rcv)
- if _rcv is None or not rcv.linked or _rcv.closing or _rcv.draining:
- return
-
- if rcv.granted is UNLIMITED:
- if rcv.impending is UNLIMITED:
- delta = 0
- else:
- delta = UNLIMITED
- elif rcv.impending is UNLIMITED:
- delta = -1
- else:
- delta = max(rcv.granted, rcv.received) - rcv.impending
-
- if delta is UNLIMITED:
- if not _rcv.bytes_open:
- sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value))
- _rcv.bytes_open = True
- sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, UNLIMITED.value))
- rcv.impending = UNLIMITED
- elif delta > 0:
- if not _rcv.bytes_open:
- sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value))
- _rcv.bytes_open = True
- sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, delta))
- rcv.impending += delta
- elif delta < 0 and not rcv.draining:
- _rcv.draining = True
- def do_stop():
- rcv.impending = rcv.received
- _rcv.draining = False
- _rcv.bytes_open = False
- self.grant(rcv)
- sst.write_cmd(MessageStop(_rcv.destination), do_stop)
-
- if rcv.draining:
- _rcv.draining = True
- def do_flush():
- rcv.impending = rcv.received
- rcv.granted = rcv.impending
- _rcv.draining = False
- _rcv.bytes_open = False
- rcv.draining = False
- sst.write_cmd(MessageFlush(_rcv.destination), do_flush)
-
-
- def process_receiver(self, rcv):
- if rcv.closed: return
- self.grant(rcv)
-
- def send(self, snd, msg):
- sst = self._attachments[snd.session]
- _snd = self._attachments[snd]
-
- if msg.subject is None or _snd._exchange == "":
- rk = _snd._routing_key
- else:
- rk = msg.subject
-
- if msg.subject is None:
- subject = _snd.subject
- else:
- subject = msg.subject
-
- # XXX: do we need to query to figure out how to create the reply-to interoperably?
- if msg.reply_to:
- rt = addr2reply_to(msg.reply_to)
- else:
- rt = None
- content_encoding = msg.properties.get("x-amqp-0-10.content-encoding")
- dp = DeliveryProperties(routing_key=rk)
- mp = MessageProperties(message_id=msg.id,
- user_id=msg.user_id,
- reply_to=rt,
- correlation_id=msg.correlation_id,
- app_id = msg.properties.get("x-amqp-0-10.app-id"),
- content_type=msg.content_type,
- content_encoding=content_encoding,
- application_headers=msg.properties)
- if subject is not None:
- if mp.application_headers is None:
- mp.application_headers = {}
- mp.application_headers[SUBJECT] = subject
- if msg.durable is not None:
- if msg.durable:
- dp.delivery_mode = delivery_mode.persistent
- else:
- dp.delivery_mode = delivery_mode.non_persistent
- if msg.priority is not None:
- dp.priority = msg.priority
- if msg.ttl is not None:
- dp.ttl = long(msg.ttl*1000)
- enc, dec = get_codec(msg.content_type)
- body = enc(msg.content)
-
- # XXX: this is not safe for out of order, can this be triggered by pre_ack?
- def msg_acked():
- # XXX: should we log the ack somehow too?
- snd.acked += 1
- m = snd.session.outgoing.pop(0)
- sst.outgoing_idx -= 1
- log.debug("RACK[%s]: %s", sst.session.log_id, msg)
- assert msg == m
-
- xfr = MessageTransfer(destination=_snd._exchange, headers=(dp, mp),
- payload=body)
-
- if _snd.pre_ack:
- sst.write_cmd(xfr)
- else:
- sst.write_cmd(xfr, msg_acked, sync=msg._sync)
-
- log.debug("SENT[%s]: %s", sst.session.log_id, msg)
-
- if _snd.pre_ack:
- msg_acked()
-
- def do_message_transfer(self, xfr):
- sst = self.get_sst(xfr)
- ssn = sst.session
-
- msg = self._decode(xfr)
- rcv = sst.destinations[xfr.destination].target
- msg._receiver = rcv
- if rcv.impending is not UNLIMITED:
- assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending)
- rcv.received += 1
- log.debug("RCVD[%s]: %s", ssn.log_id, msg)
- ssn.incoming.append(msg)
-
- def _decode(self, xfr):
- dp = EMPTY_DP
- mp = EMPTY_MP
-
- for h in xfr.headers:
- if isinstance(h, DeliveryProperties):
- dp = h
- elif isinstance(h, MessageProperties):
- mp = h
-
- ap = mp.application_headers
- enc, dec = get_codec(mp.content_type)
- content = dec(xfr.payload)
- msg = Message(content)
- msg.id = mp.message_id
- if ap is not None:
- msg.subject = ap.get(SUBJECT)
- msg.user_id = mp.user_id
- if mp.reply_to is not None:
- msg.reply_to = reply_to2addr(mp.reply_to)
- msg.correlation_id = mp.correlation_id
- if dp.delivery_mode is not None:
- msg.durable = dp.delivery_mode == delivery_mode.persistent
- msg.priority = dp.priority
- if dp.ttl is not None:
- msg.ttl = dp.ttl/1000.0
- msg.redelivered = dp.redelivered
- msg.properties = mp.application_headers or {}
- if mp.app_id is not None:
- msg.properties["x-amqp-0-10.app-id"] = mp.app_id
- if mp.content_encoding is not None:
- msg.properties["x-amqp-0-10.content-encoding"] = mp.content_encoding
- if dp.routing_key is not None:
- msg.properties["x-amqp-0-10.routing-key"] = dp.routing_key
- msg.content_type = mp.content_type
- msg._transfer_id = xfr.id
- return msg
diff --git a/python/qpid/messaging/endpoints.py b/python/qpid/messaging/endpoints.py
deleted file mode 100644
index 338ac70ecf..0000000000
--- a/python/qpid/messaging/endpoints.py
+++ /dev/null
@@ -1,1046 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-"""
-A candidate high level messaging API for python.
-
-Areas that still need work:
-
- - definition of the arguments for L{Session.sender} and L{Session.receiver}
- - standard L{Message} properties
- - L{Message} content encoding
- - protocol negotiation/multiprotocol impl
-"""
-
-from logging import getLogger
-from math import ceil
-from qpid.codec010 import StringCodec
-from qpid.concurrency import synchronized, Waiter, Condition
-from qpid.datatypes import Serial, uuid4
-from qpid.messaging.constants import *
-from qpid.messaging.exceptions import *
-from qpid.messaging.message import *
-from qpid.ops import PRIMITIVE
-from qpid.util import default, URL
-from threading import Thread, RLock
-
-log = getLogger("qpid.messaging")
-
-static = staticmethod
-
-class Endpoint:
-
- def _ecwait(self, predicate, timeout=None):
- result = self._ewait(lambda: self.closed or predicate(), timeout)
- self.check_closed()
- return result
-
-class Connection(Endpoint):
-
- """
- A Connection manages a group of L{Sessions<Session>} and connects
- them with a remote endpoint.
- """
-
- @static
- def establish(url=None, **options):
- """
- Constructs a L{Connection} with the supplied parameters and opens
- it.
- """
- conn = Connection(url, **options)
- conn.open()
- return conn
-
- def __init__(self, url=None, **options):
- """
- Creates a connection. A newly created connection must be connected
- with the Connection.connect() method before it can be used.
-
- @type url: str
- @param url: [ <username> [ / <password> ] @ ] <host> [ : <port> ]
- @type host: str
- @param host: the name or ip address of the remote host (overriden by url)
- @type port: int
- @param port: the port number of the remote host (overriden by url)
- @type transport: str
- @param transport: one of tcp, tcp+tls, or ssl (alias for tcp+tls)
- @type heartbeat: int
- @param heartbeat: heartbeat interval in seconds
-
- @type username: str
- @param username: the username for authentication (overriden by url)
- @type password: str
- @param password: the password for authentication (overriden by url)
-
- @type sasl_mechanisms: str
- @param sasl_mechanisms: space separated list of permitted sasl mechanisms
- @type sasl_service: str
- @param sasl_service: ???
- @type sasl_min_ssf: ???
- @param sasl_min_ssf: ???
- @type sasl_max_ssf: ???
- @param sasl_max_ssf: ???
-
- @type reconnect: bool
- @param reconnect: enable/disable automatic reconnect
- @type reconnect_timeout: float
- @param reconnect_timeout: total time to attempt reconnect
- @type reconnect_internal_min: float
- @param reconnect_internal_min: minimum interval between reconnect attempts
- @type reconnect_internal_max: float
- @param reconnect_internal_max: maximum interval between reconnect attempts
- @type reconnect_internal: float
- @param reconnect_interval: set both min and max reconnect intervals
- @type reconnect_limit: int
- @param reconnect_limit: limit the total number of reconnect attempts
- @type reconnect_urls: list[str]
- @param reconnect_urls: list of backup hosts specified as urls
-
- @type address_ttl: float
- @param address_ttl: time until cached address resolution expires
-
- @rtype: Connection
- @return: a disconnected Connection
- """
- if url is None:
- url = options.get("host")
- if isinstance(url, basestring):
- url = URL(url)
- self.host = url.host
- if options.has_key("transport"):
- self.transport = options.get("transport")
- elif url.scheme == url.AMQP:
- self.transport = "tcp"
- elif url.scheme == url.AMQPS:
- self.transport = "ssl"
- else:
- self.transport = "tcp"
- if self.transport in ("ssl", "tcp+tls"):
- self.port = default(url.port, options.get("port", AMQPS_PORT))
- else:
- self.port = default(url.port, options.get("port", AMQP_PORT))
- self.heartbeat = options.get("heartbeat")
- self.username = default(url.user, options.get("username", None))
- self.password = default(url.password, options.get("password", None))
- self.auth_username = None
-
- self.sasl_mechanisms = options.get("sasl_mechanisms")
- self.sasl_service = options.get("sasl_service", "qpidd")
- self.sasl_min_ssf = options.get("sasl_min_ssf")
- self.sasl_max_ssf = options.get("sasl_max_ssf")
-
- self.reconnect = options.get("reconnect", False)
- self.reconnect_timeout = options.get("reconnect_timeout")
- reconnect_interval = options.get("reconnect_interval")
- self.reconnect_interval_min = options.get("reconnect_interval_min",
- default(reconnect_interval, 1))
- self.reconnect_interval_max = options.get("reconnect_interval_max",
- default(reconnect_interval, 2*60))
- self.reconnect_limit = options.get("reconnect_limit")
- self.reconnect_urls = options.get("reconnect_urls", [])
- self.reconnect_log = options.get("reconnect_log", True)
-
- self.address_ttl = options.get("address_ttl", 60)
- self.tcp_nodelay = options.get("tcp_nodelay", False)
-
- self.options = options
-
-
- self.id = str(uuid4())
- self.session_counter = 0
- self.sessions = {}
- self._open = False
- self._connected = False
- self._transport_connected = False
- self._lock = RLock()
- self._condition = Condition(self._lock)
- self._waiter = Waiter(self._condition)
- self._modcount = Serial(0)
- self.error = None
- from driver import Driver
- self._driver = Driver(self)
-
- def _wait(self, predicate, timeout=None):
- return self._waiter.wait(predicate, timeout=timeout)
-
- def _wakeup(self):
- self._modcount += 1
- self._driver.wakeup()
-
- def check_error(self):
- if self.error:
- self._condition.gc()
- raise self.error
-
- def get_error(self):
- return self.error
-
- def _ewait(self, predicate, timeout=None):
- result = self._wait(lambda: self.error or predicate(), timeout)
- self.check_error()
- return result
-
- def check_closed(self):
- if not self._connected:
- self._condition.gc()
- raise ConnectionClosed()
-
- @synchronized
- def session(self, name=None, transactional=False):
- """
- Creates or retrieves the named session. If the name is omitted or
- None, then a unique name is chosen based on a randomly generated
- uuid.
-
- @type name: str
- @param name: the session name
- @rtype: Session
- @return: the named Session
- """
-
- if name is None:
- name = "%s:%s" % (self.id, self.session_counter)
- self.session_counter += 1
- else:
- name = "%s:%s" % (self.id, name)
-
- if self.sessions.has_key(name):
- return self.sessions[name]
- else:
- ssn = Session(self, name, transactional)
- self.sessions[name] = ssn
- self._wakeup()
- return ssn
-
- @synchronized
- def _remove_session(self, ssn):
- self.sessions.pop(ssn.name, 0)
-
- @synchronized
- def open(self):
- """
- Opens a connection.
- """
- if self._open:
- raise ConnectionError("already open")
- self._open = True
- self.attach()
-
- @synchronized
- def opened(self):
- """
- Return true if the connection is open, false otherwise.
- """
- return self._open
-
- @synchronized
- def attach(self):
- """
- Attach to the remote endpoint.
- """
- if not self._connected:
- self._connected = True
- self._driver.start()
- self._wakeup()
- self._ewait(lambda: self._transport_connected and not self._unlinked())
-
- def _unlinked(self):
- return [l
- for ssn in self.sessions.values()
- if not (ssn.error or ssn.closed)
- for l in ssn.senders + ssn.receivers
- if not (l.linked or l.error or l.closed)]
-
- @synchronized
- def detach(self, timeout=None):
- """
- Detach from the remote endpoint.
- """
- if self._connected:
- self._connected = False
- self._wakeup()
- cleanup = True
- else:
- cleanup = False
- try:
- if not self._wait(lambda: not self._transport_connected, timeout=timeout):
- raise Timeout("detach timed out")
- finally:
- if cleanup:
- self._driver.stop()
- self._condition.gc()
-
- @synchronized
- def attached(self):
- """
- Return true if the connection is attached, false otherwise.
- """
- return self._connected
-
- @synchronized
- def close(self, timeout=None):
- """
- Close the connection and all sessions.
- """
- try:
- for ssn in self.sessions.values():
- ssn.close(timeout=timeout)
- finally:
- self.detach(timeout=timeout)
- self._open = False
-
-class Session(Endpoint):
-
- """
- Sessions provide a linear context for sending and receiving
- L{Messages<Message>}. L{Messages<Message>} are sent and received
- using the L{Sender.send} and L{Receiver.fetch} methods of the
- L{Sender} and L{Receiver} objects associated with a Session.
-
- Each L{Sender} and L{Receiver} is created by supplying either a
- target or source address to the L{sender} and L{receiver} methods of
- the Session. The address is supplied via a string syntax documented
- below.
-
- Addresses
- =========
-
- An address identifies a source or target for messages. In its
- simplest form this is just a name. In general a target address may
- also be used as a source address, however not all source addresses
- may be used as a target, e.g. a source might additionally have some
- filtering criteria that would not be present in a target.
-
- A subject may optionally be specified along with the name. When an
- address is used as a target, any subject specified in the address is
- used as the default subject of outgoing messages for that target.
- When an address is used as a source, any subject specified in the
- address is pattern matched against the subject of available messages
- as a filter for incoming messages from that source.
-
- The options map contains additional information about the address
- including:
-
- - policies for automatically creating, and deleting the node to
- which an address refers
-
- - policies for asserting facts about the node to which an address
- refers
-
- - extension points that can be used for sender/receiver
- configuration
-
- Mapping to AMQP 0-10
- --------------------
- The name is resolved to either an exchange or a queue by querying
- the broker.
-
- The subject is set as a property on the message. Additionally, if
- the name refers to an exchange, the routing key is set to the
- subject.
-
- Syntax
- ------
- The following regular expressions define the tokens used to parse
- addresses::
- LBRACE: \\{
- RBRACE: \\}
- LBRACK: \\[
- RBRACK: \\]
- COLON: :
- SEMI: ;
- SLASH: /
- COMMA: ,
- NUMBER: [+-]?[0-9]*\\.?[0-9]+
- ID: [a-zA-Z_](?:[a-zA-Z0-9_-]*[a-zA-Z0-9_])?
- STRING: "(?:[^\\\\"]|\\\\.)*"|\'(?:[^\\\\\']|\\\\.)*\'
- ESC: \\\\[^ux]|\\\\x[0-9a-fA-F][0-9a-fA-F]|\\\\u[0-9a-fA-F][0-9a-fA-F][0-9a-fA-F][0-9a-fA-F]
- SYM: [.#*%@$^!+-]
- WSPACE: [ \\n\\r\\t]+
-
- The formal grammar for addresses is given below::
- address = name [ "/" subject ] [ ";" options ]
- name = ( part | quoted )+
- subject = ( part | quoted | "/" )*
- quoted = STRING / ESC
- part = LBRACE / RBRACE / COLON / COMMA / NUMBER / ID / SYM
- options = map
- map = "{" ( keyval ( "," keyval )* )? "}"
- keyval = ID ":" value
- value = NUMBER / STRING / ID / map / list
- list = "[" ( value ( "," value )* )? "]"
-
- This grammar resuls in the following informal syntax::
-
- <name> [ / <subject> ] [ ; <options> ]
-
- Where options is::
-
- { <key> : <value>, ... }
-
- And values may be:
- - numbers
- - single, double, or non quoted strings
- - maps (dictionaries)
- - lists
-
- Options
- -------
- The options map permits the following parameters::
-
- <name> [ / <subject> ] ; {
- create: always | sender | receiver | never,
- delete: always | sender | receiver | never,
- assert: always | sender | receiver | never,
- mode: browse | consume,
- node: {
- type: queue | topic,
- durable: True | False,
- x-declare: { ... <declare-overrides> ... },
- x-bindings: [<binding_1>, ... <binding_n>]
- },
- link: {
- name: <link-name>,
- durable: True | False,
- reliability: unreliable | at-most-once | at-least-once | exactly-once,
- x-declare: { ... <declare-overrides> ... },
- x-bindings: [<binding_1>, ... <binding_n>],
- x-subscribe: { ... <subscribe-overrides> ... }
- }
- }
-
- Bindings are specified as a map with the following options::
-
- {
- exchange: <exchange>,
- queue: <queue>,
- key: <key>,
- arguments: <arguments>
- }
-
- The create, delete, and assert policies specify who should perfom
- the associated action:
-
- - I{always}: the action will always be performed
- - I{sender}: the action will only be performed by the sender
- - I{receiver}: the action will only be performed by the receiver
- - I{never}: the action will never be performed (this is the default)
-
- The node-type is one of:
-
- - I{topic}: a topic node will default to the topic exchange,
- x-declare may be used to specify other exchange types
- - I{queue}: this is the default node-type
-
- The x-declare map permits protocol specific keys and values to be
- specified when exchanges or queues are declared. These keys and
- values are passed through when creating a node or asserting facts
- about an existing node.
-
- Examples
- --------
- A simple name resolves to any named node, usually a queue or a
- topic::
-
- my-queue-or-topic
-
- A simple name with a subject will also resolve to a node, but the
- presence of the subject will cause a sender using this address to
- set the subject on outgoing messages, and receivers to filter based
- on the subject::
-
- my-queue-or-topic/my-subject
-
- A subject pattern can be used and will cause filtering if used by
- the receiver. If used for a sender, the literal value gets set as
- the subject::
-
- my-queue-or-topic/my-*
-
- In all the above cases, the address is resolved to an existing node.
- If you want the node to be auto-created, then you can do the
- following. By default nonexistent nodes are assumed to be queues::
-
- my-queue; {create: always}
-
- You can customize the properties of the queue::
-
- my-queue; {create: always, node: {durable: True}}
-
- You can create a topic instead if you want::
-
- my-queue; {create: always, node: {type: topic}}
-
- You can assert that the address resolves to a node with particular
- properties::
-
- my-transient-topic; {
- assert: always,
- node: {
- type: topic,
- durable: False
- }
- }
- """
-
- def __init__(self, connection, name, transactional):
- self.connection = connection
- self.name = name
- self.log_id = "%x" % id(self)
-
- self.transactional = transactional
-
- self.committing = False
- self.committed = True
- self.aborting = False
- self.aborted = False
-
- self.next_sender_id = 0
- self.senders = []
- self.next_receiver_id = 0
- self.receivers = []
- self.outgoing = []
- self.incoming = []
- self.unacked = []
- self.acked = []
- # XXX: I hate this name.
- self.ack_capacity = UNLIMITED
-
- self.error = None
- self.closing = False
- self.closed = False
-
- self._lock = connection._lock
-
- def __repr__(self):
- return "<Session %s>" % self.name
-
- def _wait(self, predicate, timeout=None):
- return self.connection._wait(predicate, timeout=timeout)
-
- def _wakeup(self):
- self.connection._wakeup()
-
- def check_error(self):
- self.connection.check_error()
- if self.error:
- raise self.error
-
- def get_error(self):
- err = self.connection.get_error()
- if err:
- return err
- else:
- return self.error
-
- def _ewait(self, predicate, timeout=None):
- result = self.connection._ewait(lambda: self.error or predicate(), timeout)
- self.check_error()
- return result
-
- def check_closed(self):
- if self.closed:
- raise SessionClosed()
-
- @synchronized
- def sender(self, target, **options):
- """
- Creates a L{Sender} that may be used to send L{Messages<Message>}
- to the specified target.
-
- @type target: str
- @param target: the target to which messages will be sent
- @rtype: Sender
- @return: a new Sender for the specified target
- """
- target = _mangle(target)
- sender = Sender(self, self.next_sender_id, target, options)
- self.next_sender_id += 1
- self.senders.append(sender)
- if not self.closed and self.connection._connected:
- self._wakeup()
- try:
- sender._ewait(lambda: sender.linked)
- except LinkError, e:
- sender.close()
- raise e
- return sender
-
- @synchronized
- def receiver(self, source, **options):
- """
- Creates a receiver that may be used to fetch L{Messages<Message>}
- from the specified source.
-
- @type source: str
- @param source: the source of L{Messages<Message>}
- @rtype: Receiver
- @return: a new Receiver for the specified source
- """
- source = _mangle(source)
- receiver = Receiver(self, self.next_receiver_id, source, options)
- self.next_receiver_id += 1
- self.receivers.append(receiver)
- if not self.closed and self.connection._connected:
- self._wakeup()
- try:
- receiver._ewait(lambda: receiver.linked)
- except LinkError, e:
- receiver.close()
- raise e
- return receiver
-
- @synchronized
- def _count(self, predicate):
- result = 0
- for msg in self.incoming:
- if predicate(msg):
- result += 1
- return result
-
- def _peek(self, receiver):
- for msg in self.incoming:
- if msg._receiver == receiver:
- return msg
-
- def _pop(self, receiver):
- i = 0
- while i < len(self.incoming):
- msg = self.incoming[i]
- if msg._receiver == receiver:
- del self.incoming[i]
- return msg
- else:
- i += 1
-
- @synchronized
- def _get(self, receiver, timeout=None):
- if self._ewait(lambda: ((self._peek(receiver) is not None) or
- self.closing or receiver.closed),
- timeout):
- msg = self._pop(receiver)
- if msg is not None:
- msg._receiver.returned += 1
- self.unacked.append(msg)
- log.debug("RETR[%s]: %s", self.log_id, msg)
- return msg
- return None
-
- @synchronized
- def next_receiver(self, timeout=None):
- if self._ecwait(lambda: self.incoming, timeout):
- return self.incoming[0]._receiver
- else:
- raise Empty
-
- @synchronized
- def acknowledge(self, message=None, disposition=None, sync=True):
- """
- Acknowledge the given L{Message}. If message is None, then all
- unacknowledged messages on the session are acknowledged.
-
- @type message: Message
- @param message: the message to acknowledge or None
- @type sync: boolean
- @param sync: if true then block until the message(s) are acknowledged
- """
- if message is None:
- messages = self.unacked[:]
- else:
- messages = [message]
-
- for m in messages:
- if self.ack_capacity is not UNLIMITED:
- if self.ack_capacity <= 0:
- # XXX: this is currently a SendError, maybe it should be a SessionError?
- raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity)
- self._wakeup()
- self._ecwait(lambda: len(self.acked) < self.ack_capacity)
- m._disposition = disposition
- self.unacked.remove(m)
- self.acked.append(m)
-
- self._wakeup()
- if sync:
- self._ecwait(lambda: not [m for m in messages if m in self.acked])
-
- @synchronized
- def commit(self):
- """
- Commit outstanding transactional work. This consists of all
- message sends and receives since the prior commit or rollback.
- """
- if not self.transactional:
- raise NontransactionalSession()
- self.committing = True
- self._wakeup()
- self._ecwait(lambda: not self.committing)
- if self.aborted:
- raise TransactionAborted()
- assert self.committed
-
- @synchronized
- def rollback(self):
- """
- Rollback outstanding transactional work. This consists of all
- message sends and receives since the prior commit or rollback.
- """
- if not self.transactional:
- raise NontransactionalSession()
- self.aborting = True
- self._wakeup()
- self._ecwait(lambda: not self.aborting)
- assert self.aborted
-
- @synchronized
- def sync(self, timeout=None):
- """
- Sync the session.
- """
- for snd in self.senders:
- snd.sync(timeout=timeout)
- if not self._ewait(lambda: not self.outgoing and not self.acked, timeout=timeout):
- raise Timeout("session sync timed out")
-
- @synchronized
- def close(self, timeout=None):
- """
- Close the session.
- """
- self.sync(timeout=timeout)
-
- for link in self.receivers + self.senders:
- link.close(timeout=timeout)
-
- if not self.closing:
- self.closing = True
- self._wakeup()
-
- try:
- if not self._ewait(lambda: self.closed, timeout=timeout):
- raise Timeout("session close timed out")
- finally:
- self.connection._remove_session(self)
-
-def _mangle(addr):
- if addr and addr.startswith("#"):
- return str(uuid4()) + addr
- else:
- return addr
-
-class Sender(Endpoint):
-
- """
- Sends outgoing messages.
- """
-
- def __init__(self, session, id, target, options):
- self.session = session
- self.id = id
- self.target = target
- self.options = options
- self.capacity = options.get("capacity", UNLIMITED)
- self.threshold = 0.5
- self.durable = options.get("durable")
- self.queued = Serial(0)
- self.synced = Serial(0)
- self.acked = Serial(0)
- self.error = None
- self.linked = False
- self.closing = False
- self.closed = False
- self._lock = self.session._lock
-
- def _wakeup(self):
- self.session._wakeup()
-
- def check_error(self):
- self.session.check_error()
- if self.error:
- raise self.error
-
- def get_error(self):
- err = self.session.get_error()
- if err:
- return err
- else:
- return self.error
-
- def _ewait(self, predicate, timeout=None):
- result = self.session._ewait(lambda: self.error or predicate(), timeout)
- self.check_error()
- return result
-
- def check_closed(self):
- if self.closed:
- raise LinkClosed()
-
- @synchronized
- def unsettled(self):
- """
- Returns the number of messages awaiting acknowledgment.
- @rtype: int
- @return: the number of unacknowledged messages
- """
- return self.queued - self.acked
-
- @synchronized
- def available(self):
- if self.capacity is UNLIMITED:
- return UNLIMITED
- else:
- return self.capacity - self.unsettled()
-
- @synchronized
- def send(self, object, sync=True, timeout=None):
- """
- Send a message. If the object passed in is of type L{unicode},
- L{str}, L{list}, or L{dict}, it will automatically be wrapped in a
- L{Message} and sent. If it is of type L{Message}, it will be sent
- directly. If the sender capacity is not L{UNLIMITED} then send
- will block until there is available capacity to send the message.
- If the timeout parameter is specified, then send will throw an
- L{InsufficientCapacity} exception if capacity does not become
- available within the specified time.
-
- @type object: unicode, str, list, dict, Message
- @param object: the message or content to send
-
- @type sync: boolean
- @param sync: if true then block until the message is sent
-
- @type timeout: float
- @param timeout: the time to wait for available capacity
- """
-
- if not self.session.connection._connected or self.session.closing:
- raise Detached()
-
- self._ecwait(lambda: self.linked)
-
- if isinstance(object, Message):
- message = object
- else:
- message = Message(object)
-
- if message.durable is None:
- message.durable = self.durable
-
- if self.capacity is not UNLIMITED:
- if self.capacity <= 0:
- raise InsufficientCapacity("capacity = %s" % self.capacity)
- if not self._ecwait(self.available, timeout=timeout):
- raise InsufficientCapacity("capacity = %s" % self.capacity)
-
- # XXX: what if we send the same message to multiple senders?
- message._sender = self
- if self.capacity is not UNLIMITED:
- message._sync = sync or self.available() <= int(ceil(self.threshold*self.capacity))
- else:
- message._sync = sync
- self.session.outgoing.append(message)
- self.queued += 1
-
- if sync:
- self.sync()
- assert message not in self.session.outgoing
- else:
- self._wakeup()
-
- @synchronized
- def sync(self, timeout=None):
- mno = self.queued
- if self.synced < mno:
- self.synced = mno
- self._wakeup()
- if not self._ewait(lambda: self.acked >= mno, timeout=timeout):
- raise Timeout("sender sync timed out")
-
- @synchronized
- def close(self, timeout=None):
- """
- Close the Sender.
- """
- # avoid erroring out when closing a sender that was never
- # established
- if self.acked < self.queued:
- self.sync(timeout=timeout)
-
- if not self.closing:
- self.closing = True
- self._wakeup()
-
- try:
- if not self.session._ewait(lambda: self.closed, timeout=timeout):
- raise Timeout("sender close timed out")
- finally:
- try:
- self.session.senders.remove(self)
- except ValueError:
- pass
-
-class Receiver(Endpoint, object):
-
- """
- Receives incoming messages from a remote source. Messages may be
- fetched with L{fetch}.
- """
-
- def __init__(self, session, id, source, options):
- self.session = session
- self.id = id
- self.source = source
- self.options = options
-
- self.granted = Serial(0)
- self.draining = False
- self.impending = Serial(0)
- self.received = Serial(0)
- self.returned = Serial(0)
-
- self.error = None
- self.linked = False
- self.closing = False
- self.closed = False
- self._lock = self.session._lock
- self._capacity = 0
- self._set_capacity(options.get("capacity", 0), False)
- self.threshold = 0.5
-
- @synchronized
- def _set_capacity(self, c, wakeup=True):
- if c is UNLIMITED:
- self._capacity = c.value
- else:
- self._capacity = c
- self._grant()
- if wakeup:
- self._wakeup()
-
- def _get_capacity(self):
- if self._capacity == UNLIMITED.value:
- return UNLIMITED
- else:
- return self._capacity
-
- capacity = property(_get_capacity, _set_capacity)
-
- def _wakeup(self):
- self.session._wakeup()
-
- def check_error(self):
- self.session.check_error()
- if self.error:
- raise self.error
-
- def get_error(self):
- err = self.session.get_error()
- if err:
- return err
- else:
- return self.error
-
- def _ewait(self, predicate, timeout=None):
- result = self.session._ewait(lambda: self.error or predicate(), timeout)
- self.check_error()
- return result
-
- def check_closed(self):
- if self.closed:
- raise LinkClosed()
-
- @synchronized
- def unsettled(self):
- """
- Returns the number of acknowledged messages awaiting confirmation.
- """
- return len([m for m in self.acked if m._receiver is self])
-
- @synchronized
- def available(self):
- """
- Returns the number of messages available to be fetched by the
- application.
-
- @rtype: int
- @return: the number of available messages
- """
- return self.received - self.returned
-
- @synchronized
- def fetch(self, timeout=None):
- """
- Fetch and return a single message. A timeout of None will block
- forever waiting for a message to arrive, a timeout of zero will
- return immediately if no messages are available.
-
- @type timeout: float
- @param timeout: the time to wait for a message to be available
- """
-
- self._ecwait(lambda: self.linked)
-
- if self._capacity == 0:
- self.granted = self.returned + 1
- self._wakeup()
- self._ecwait(lambda: self.impending >= self.granted)
- msg = self.session._get(self, timeout=timeout)
- if msg is None:
- self.check_closed()
- self.draining = True
- self._wakeup()
- self._ecwait(lambda: not self.draining)
- msg = self.session._get(self, timeout=0)
- self._grant()
- self._wakeup()
- if msg is None:
- raise Empty()
- elif self._capacity not in (0, UNLIMITED.value):
- t = int(ceil(self.threshold * self._capacity))
- if self.received - self.returned <= t:
- self.granted = self.returned + self._capacity
- self._wakeup()
- return msg
-
- def _grant(self):
- if self._capacity == UNLIMITED.value:
- self.granted = UNLIMITED
- else:
- self.granted = self.returned + self._capacity
-
- @synchronized
- def close(self, timeout=None):
- """
- Close the receiver.
- """
- if not self.closing:
- self.closing = True
- self._wakeup()
-
- try:
- if not self.session._ewait(lambda: self.closed, timeout=timeout):
- raise Timeout("receiver close timed out")
- finally:
- try:
- self.session.receivers.remove(self)
- except ValueError:
- pass
-
-__all__ = ["Connection", "Session", "Sender", "Receiver"]
diff --git a/python/qpid/messaging/exceptions.py b/python/qpid/messaging/exceptions.py
deleted file mode 100644
index 0296d615d9..0000000000
--- a/python/qpid/messaging/exceptions.py
+++ /dev/null
@@ -1,156 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-class Timeout(Exception):
- pass
-
-## Messaging Errors
-
-class MessagingError(Exception):
-
- def __init__(self, code=None, text=None, **info):
- self.code = code
- self.text = text
- self.info = info
- if self.code is None:
- msg = self.text
- else:
- msg = "%s(%s)" % (self.text, self.code)
- if info:
- msg += " " + ", ".join(["%s=%r" % (k, v) for k, v in self.info.items()])
- Exception.__init__(self, msg)
-
-class InternalError(MessagingError):
- pass
-
-## Connection Errors
-
-class ConnectionError(MessagingError):
- """
- The base class for all connection related exceptions.
- """
- pass
-
-class ConnectError(ConnectionError):
- """
- Exception raised when there is an error connecting to the remote
- peer.
- """
- pass
-
-class VersionError(ConnectError):
- pass
-
-class AuthenticationFailure(ConnectError):
- pass
-
-class ConnectionClosed(ConnectionError):
- pass
-
-class HeartbeatTimeout(ConnectionError):
- pass
-
-## Session Errors
-
-class SessionError(MessagingError):
- pass
-
-class Detached(SessionError):
- """
- Exception raised when an operation is attempted that is illegal when
- detached.
- """
- pass
-
-class NontransactionalSession(SessionError):
- """
- Exception raised when commit or rollback is attempted on a non
- transactional session.
- """
- pass
-
-class TransactionError(SessionError):
- pass
-
-class TransactionAborted(TransactionError):
- pass
-
-class UnauthorizedAccess(SessionError):
- pass
-
-class ServerError(SessionError):
- pass
-
-class SessionClosed(SessionError):
- pass
-
-## Link Errors
-
-class LinkError(MessagingError):
- pass
-
-class InsufficientCapacity(LinkError):
- pass
-
-class AddressError(LinkError):
- pass
-
-class MalformedAddress(AddressError):
- pass
-
-class InvalidOption(AddressError):
- pass
-
-class ResolutionError(AddressError):
- pass
-
-class AssertionFailed(ResolutionError):
- pass
-
-class NotFound(ResolutionError):
- pass
-
-class LinkClosed(LinkError):
- pass
-
-## Sender Errors
-
-class SenderError(LinkError):
- pass
-
-class SendError(SenderError):
- pass
-
-class TargetCapacityExceeded(SendError):
- pass
-
-## Receiver Errors
-
-class ReceiverError(LinkError):
- pass
-
-class FetchError(ReceiverError):
- pass
-
-class Empty(FetchError):
- """
- Exception raised by L{Receiver.fetch} when there is no message
- available within the alloted time.
- """
- pass
diff --git a/python/qpid/messaging/message.py b/python/qpid/messaging/message.py
deleted file mode 100644
index b70b365c16..0000000000
--- a/python/qpid/messaging/message.py
+++ /dev/null
@@ -1,173 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from qpid.codec010 import StringCodec
-from qpid.ops import PRIMITIVE
-
-def codec(name):
- type = PRIMITIVE[name]
-
- def encode(x):
- sc = StringCodec()
- sc.write_primitive(type, x)
- return sc.encoded
-
- def decode(x):
- sc = StringCodec(x)
- return sc.read_primitive(type)
-
- return encode, decode
-
-# XXX: need to correctly parse the mime type and deal with
-# content-encoding header
-
-TYPE_MAPPINGS={
- dict: "amqp/map",
- list: "amqp/list",
- unicode: "text/plain; charset=utf8",
- unicode: "text/plain",
- buffer: None,
- str: None,
- None.__class__: None
- }
-
-DEFAULT_CODEC = (lambda x: x, lambda x: x)
-
-def encode_text_plain(x):
- if x is None:
- return None
- else:
- return x.encode("utf8")
-
-def decode_text_plain(x):
- if x is None:
- return None
- else:
- return x.decode("utf8")
-
-TYPE_CODEC={
- "amqp/map": codec("map"),
- "amqp/list": codec("list"),
- "text/plain; charset=utf8": (encode_text_plain, decode_text_plain),
- "text/plain": (encode_text_plain, decode_text_plain),
- "": DEFAULT_CODEC,
- None: DEFAULT_CODEC
- }
-
-def get_type(content):
- return TYPE_MAPPINGS[content.__class__]
-
-def get_codec(content_type):
- return TYPE_CODEC.get(content_type, DEFAULT_CODEC)
-
-UNSPECIFIED = object()
-
-class Message:
-
- """
- A message consists of a standard set of fields, an application
- defined set of properties, and some content.
-
- @type id: str
- @ivar id: the message id
- @type subject: str
- @ivar subject: message subject
- @type user_id: str
- @ivar user_id: the user-id of the message producer
- @type reply_to: str
- @ivar reply_to: the address to send replies
- @type correlation_id: str
- @ivar correlation_id: a correlation-id for the message
- @type durable: bool
- @ivar durable: message durability
- @type priority: int
- @ivar priority: message priority
- @type ttl: float
- @ivar ttl: time-to-live measured in seconds
- @type properties: dict
- @ivar properties: application specific message properties
- @type content_type: str
- @ivar content_type: the content-type of the message
- @type content: str, unicode, buffer, dict, list
- @ivar content: the message content
- """
-
- def __init__(self, content=None, content_type=UNSPECIFIED, id=None,
- subject=None, user_id=None, reply_to=None, correlation_id=None,
- durable=None, priority=None, ttl=None, properties=None):
- """
- Construct a new message with the supplied content. The
- content-type of the message will be automatically inferred from
- type of the content parameter.
-
- @type content: str, unicode, buffer, dict, list
- @param content: the message content
-
- @type content_type: str
- @param content_type: the content-type of the message
- """
- self.id = id
- self.subject = subject
- self.user_id = user_id
- self.reply_to = reply_to
- self.correlation_id = correlation_id
- self.durable = durable
- self.priority = priority
- self.ttl = ttl
- self.redelivered = False
- if properties is None:
- self.properties = {}
- else:
- self.properties = properties
- if content_type is UNSPECIFIED:
- self.content_type = get_type(content)
- else:
- self.content_type = content_type
- self.content = content
-
- def __repr__(self):
- args = []
- for name in ["id", "subject", "user_id", "reply_to", "correlation_id",
- "priority", "ttl"]:
- value = self.__dict__[name]
- if value is not None: args.append("%s=%r" % (name, value))
- for name in ["durable", "redelivered", "properties"]:
- value = self.__dict__[name]
- if value: args.append("%s=%r" % (name, value))
- if self.content_type != get_type(self.content):
- args.append("content_type=%r" % self.content_type)
- if self.content is not None:
- if args:
- args.append("content=%r" % self.content)
- else:
- args.append(repr(self.content))
- return "Message(%s)" % ", ".join(args)
-
-class Disposition:
-
- def __init__(self, type, **options):
- self.type = type
- self.options = options
-
- def __repr__(self):
- args = [str(self.type)] + \
- ["%s=%r" % (k, v) for k, v in self.options.items()]
- return "Disposition(%s)" % ", ".join(args)
-
-__all__ = ["Message", "Disposition"]
diff --git a/python/qpid/messaging/transports.py b/python/qpid/messaging/transports.py
deleted file mode 100644
index 7abaae12e8..0000000000
--- a/python/qpid/messaging/transports.py
+++ /dev/null
@@ -1,116 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import socket
-from qpid.util import connect
-
-TRANSPORTS = {}
-
-class SocketTransport:
-
- def __init__(self, conn, host, port):
- self.socket = connect(host, port)
- if conn.tcp_nodelay:
- self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
-
- def fileno(self):
- return self.socket.fileno()
-
-class tcp(SocketTransport):
-
- def reading(self, reading):
- return reading
-
- def writing(self, writing):
- return writing
-
- def send(self, bytes):
- return self.socket.send(bytes)
-
- def recv(self, n):
- return self.socket.recv(n)
-
- def close(self):
- self.socket.close()
-
-TRANSPORTS["tcp"] = tcp
-
-try:
- from ssl import wrap_socket, SSLError, SSL_ERROR_WANT_READ, \
- SSL_ERROR_WANT_WRITE
-except ImportError:
- pass
-else:
- class tls(SocketTransport):
-
- def __init__(self, conn, host, port):
- SocketTransport.__init__(self, conn, host, port)
- self.tls = wrap_socket(self.socket)
- self.socket.setblocking(0)
- self.state = None
-
- def reading(self, reading):
- if self.state is None:
- return reading
- else:
- return self.state == SSL_ERROR_WANT_READ
-
- def writing(self, writing):
- if self.state is None:
- return writing
- else:
- return self.state == SSL_ERROR_WANT_WRITE
-
- def send(self, bytes):
- self._clear_state()
- try:
- return self.tls.write(bytes)
- except SSLError, e:
- if self._update_state(e.args[0]):
- return 0
- else:
- raise
-
- def recv(self, n):
- self._clear_state()
- try:
- return self.tls.read(n)
- except SSLError, e:
- if self._update_state(e.args[0]):
- return None
- else:
- raise
-
- def _clear_state(self):
- self.state = None
-
- def _update_state(self, code):
- if code in (SSL_ERROR_WANT_READ, SSL_ERROR_WANT_WRITE):
- self.state = code
- return True
- else:
- return False
-
- def close(self):
- self.socket.setblocking(1)
- # this closes the underlying socket
- self.tls.close()
-
- TRANSPORTS["ssl"] = tls
- TRANSPORTS["tcp+tls"] = tls
diff --git a/python/qpid/messaging/util.py b/python/qpid/messaging/util.py
deleted file mode 100644
index 265cf7d51f..0000000000
--- a/python/qpid/messaging/util.py
+++ /dev/null
@@ -1,61 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-"""
-Add-on utilities for the L{qpid.messaging} API.
-"""
-
-from qpid.messaging import *
-from logging import getLogger
-from threading import Thread
-
-log = getLogger("qpid.messaging.util")
-
-def auto_fetch_reconnect_urls(conn):
- ssn = conn.session("auto-fetch-reconnect-urls")
- rcv = ssn.receiver("amq.failover")
- rcv.capacity = 10
-
- def main():
- while True:
- try:
- msg = rcv.fetch()
- except LinkClosed:
- return
- set_reconnect_urls(conn, msg)
- ssn.acknowledge(msg, sync=False)
-
- thread = Thread(name="auto-fetch-reconnect-urls", target=main)
- thread.setDaemon(True)
- thread.start()
-
-
-def set_reconnect_urls(conn, msg):
- reconnect_urls = []
- urls = msg.properties["amq.failover"]
- for u in urls:
- if u.startswith("amqp:"):
- for p in u[5:].split(","):
- parts = p.split(":")
- host, port = parts[1:3]
- reconnect_urls.append("%s:%s" % (host, port))
- conn.reconnect_urls = reconnect_urls
- log.warn("set reconnect_urls for conn %s: %s", conn, reconnect_urls)
-
-__all__ = ["auto_fetch_reconnect_urls", "set_reconnect_urls"]