diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
commit | 66765100f4257159622cefe57bed50125a5ad017 (patch) | |
tree | a88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /python/qpid/messaging | |
parent | 1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff) | |
parent | 88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff) | |
download | qpid-python-rajith_jms_client.tar.gz |
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/messaging')
-rw-r--r-- | python/qpid/messaging/__init__.py | 35 | ||||
-rw-r--r-- | python/qpid/messaging/address.py | 172 | ||||
-rw-r--r-- | python/qpid/messaging/constants.py | 40 | ||||
-rw-r--r-- | python/qpid/messaging/driver.py | 1329 | ||||
-rw-r--r-- | python/qpid/messaging/endpoints.py | 1046 | ||||
-rw-r--r-- | python/qpid/messaging/exceptions.py | 156 | ||||
-rw-r--r-- | python/qpid/messaging/message.py | 173 | ||||
-rw-r--r-- | python/qpid/messaging/transports.py | 116 | ||||
-rw-r--r-- | python/qpid/messaging/util.py | 61 |
9 files changed, 0 insertions, 3128 deletions
diff --git a/python/qpid/messaging/__init__.py b/python/qpid/messaging/__init__.py deleted file mode 100644 index f9ddda2e80..0000000000 --- a/python/qpid/messaging/__init__.py +++ /dev/null @@ -1,35 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -""" -A candidate high level messaging API for python. - -Areas that still need work: - - - definition of the arguments for L{Session.sender} and L{Session.receiver} - - standard L{Message} properties - - L{Message} content encoding - - protocol negotiation/multiprotocol impl -""" - -from qpid.datatypes import timestamp, uuid4, Serial -from qpid.messaging.constants import * -from qpid.messaging.endpoints import * -from qpid.messaging.exceptions import * -from qpid.messaging.message import * diff --git a/python/qpid/messaging/address.py b/python/qpid/messaging/address.py deleted file mode 100644 index e423f09193..0000000000 --- a/python/qpid/messaging/address.py +++ /dev/null @@ -1,172 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -import re -from qpid.lexer import Lexicon, LexError -from qpid.parser import Parser, ParseError - -l = Lexicon() - -LBRACE = l.define("LBRACE", r"\{") -RBRACE = l.define("RBRACE", r"\}") -LBRACK = l.define("LBRACK", r"\[") -RBRACK = l.define("RBRACK", r"\]") -COLON = l.define("COLON", r":") -SEMI = l.define("SEMI", r";") -SLASH = l.define("SLASH", r"/") -COMMA = l.define("COMMA", r",") -NUMBER = l.define("NUMBER", r'[+-]?[0-9]*\.?[0-9]+') -ID = l.define("ID", r'[a-zA-Z_](?:[a-zA-Z0-9_-]*[a-zA-Z0-9_])?') -STRING = l.define("STRING", r""""(?:[^\\"]|\\.)*"|'(?:[^\\']|\\.)*'""") -ESC = l.define("ESC", r"\\[^ux]|\\x[0-9a-fA-F][0-9a-fA-F]|\\u[0-9a-fA-F][0-9a-fA-F][0-9a-fA-F][0-9a-fA-F]") -SYM = l.define("SYM", r"[.#*%@$^!+-]") -WSPACE = l.define("WSPACE", r"[ \n\r\t]+") -EOF = l.eof("EOF") - -LEXER = l.compile() - -def lex(st): - return LEXER.lex(st) - -def tok2str(tok): - if tok.type is STRING: - return eval(tok.value) - elif tok.type is ESC: - if tok.value[1] == "x": - return eval('"%s"' % tok.value) - elif tok.value[1] == "u": - return eval('u"%s"' % tok.value) - else: - return tok.value[1] - else: - return tok.value - -CONSTANTS = { - "True": True, - "true": True, - "False": False, - "false": False, - "None": None - } - -def tok2obj(tok): - if tok.type == ID: - return CONSTANTS.get(tok.value, tok.value) - elif tok.type in (STRING, NUMBER): - return eval(tok.value) - else: - return tok.value - -def toks2str(toks): - if toks: - return "".join(map(tok2str, toks)) - else: - return None - -class AddressParser(Parser): - - def __init__(self, tokens): - Parser.__init__(self, [t for t in tokens if t.type is not WSPACE]) - - def parse(self): - result = self.address() - self.eat(EOF) - return result - - def address(self): - name = toks2str(self.eat_until(SLASH, SEMI, EOF)) - - if name is None: - raise ParseError(self.next()) - - if self.matches(SLASH): - self.eat(SLASH) - subject = toks2str(self.eat_until(SEMI, EOF)) - else: - subject = None - - if self.matches(SEMI): - self.eat(SEMI) - options = self.map() - else: - options = None - return name, subject, options - - def map(self): - self.eat(LBRACE) - - result = {} - while True: - if self.matches(NUMBER, STRING, ID, LBRACE, LBRACK): - n, v = self.keyval() - result[n] = v - if self.matches(COMMA): - self.eat(COMMA) - elif self.matches(RBRACE): - break - else: - raise ParseError(self.next(), COMMA, RBRACE) - elif self.matches(RBRACE): - break - else: - raise ParseError(self.next(), NUMBER, STRING, ID, LBRACE, LBRACK, - RBRACE) - - self.eat(RBRACE) - return result - - def keyval(self): - key = self.value() - self.eat(COLON) - val = self.value() - return (key, val) - - def value(self): - if self.matches(NUMBER, STRING, ID): - return tok2obj(self.eat()) - elif self.matches(LBRACE): - return self.map() - elif self.matches(LBRACK): - return self.list() - else: - raise ParseError(self.next(), NUMBER, STRING, ID, LBRACE, LBRACK) - - def list(self): - self.eat(LBRACK) - - result = [] - - while True: - if self.matches(RBRACK): - break - else: - result.append(self.value()) - if self.matches(COMMA): - self.eat(COMMA) - elif self.matches(RBRACK): - break - else: - raise ParseError(self.next(), COMMA, RBRACK) - - self.eat(RBRACK) - return result - -def parse(addr): - return AddressParser(lex(addr)).parse() - -__all__ = ["parse", "ParseError"] diff --git a/python/qpid/messaging/constants.py b/python/qpid/messaging/constants.py deleted file mode 100644 index f230c4def8..0000000000 --- a/python/qpid/messaging/constants.py +++ /dev/null @@ -1,40 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -__SELF__ = object() - -class Constant: - - def __init__(self, name, value=__SELF__): - self.name = name - if value is __SELF__: - self.value = self - else: - self.value = value - - def __repr__(self): - return self.name - -AMQP_PORT = 5672 -AMQPS_PORT = 5671 - -UNLIMITED = Constant("UNLIMITED", 0xFFFFFFFFL) - -REJECTED = Constant("REJECTED") -RELEASED = Constant("RELEASED") diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py deleted file mode 100644 index 78af2827df..0000000000 --- a/python/qpid/messaging/driver.py +++ /dev/null @@ -1,1329 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import socket, struct, sys, time -from logging import getLogger, DEBUG -from qpid import compat -from qpid import sasl -from qpid.concurrency import synchronized -from qpid.datatypes import RangedSet, Serial -from qpid.framing import OpEncoder, SegmentEncoder, FrameEncoder, \ - FrameDecoder, SegmentDecoder, OpDecoder -from qpid.messaging import address, transports -from qpid.messaging.constants import UNLIMITED, REJECTED, RELEASED -from qpid.messaging.exceptions import * -from qpid.messaging.message import get_codec, Disposition, Message -from qpid.ops import * -from qpid.selector import Selector -from qpid.util import URL, default -from qpid.validator import And, Context, List, Map, Types, Values -from threading import Condition, Thread - -log = getLogger("qpid.messaging") -rawlog = getLogger("qpid.messaging.io.raw") -opslog = getLogger("qpid.messaging.io.ops") - -def addr2reply_to(addr): - name, subject, options = address.parse(addr) - if options: - type = options.get("node", {}).get("type") - else: - type = None - - if type == "topic": - return ReplyTo(name, subject) - else: - return ReplyTo(None, name) - -def reply_to2addr(reply_to): - if reply_to.exchange in (None, ""): - return reply_to.routing_key - elif reply_to.routing_key is None: - return "%s; {node: {type: topic}}" % reply_to.exchange - else: - return "%s/%s; {node: {type: topic}}" % (reply_to.exchange, reply_to.routing_key) - -class Attachment: - - def __init__(self, target): - self.target = target - -# XXX - -DURABLE_DEFAULT=False - -# XXX - -class Pattern: - """ - The pattern filter matches the supplied wildcard pattern against a - message subject. - """ - - def __init__(self, value): - self.value = value - - # XXX: this should become part of the driver - def _bind(self, sst, exchange, queue): - from qpid.ops import ExchangeBind - - sst.write_cmd(ExchangeBind(exchange=exchange, queue=queue, - binding_key=self.value.replace("*", "#"))) - -SUBJECT_DEFAULTS = { - "topic": "#" - } - -# XXX -ppid = 0 -try: - ppid = os.getppid() -except: - pass - -CLIENT_PROPERTIES = {"product": "qpid python client", - "version": "development", - "platform": os.name, - "qpid.client_process": os.path.basename(sys.argv[0]), - "qpid.client_pid": os.getpid(), - "qpid.client_ppid": ppid} - -def noop(): pass -def sync_noop(): pass - -class SessionState: - - def __init__(self, driver, session, name, channel): - self.driver = driver - self.session = session - self.name = name - self.channel = channel - self.detached = False - self.committing = False - self.aborting = False - - # sender state - self.sent = Serial(0) - self.acknowledged = RangedSet() - self.actions = {} - self.min_completion = self.sent - self.max_completion = self.sent - self.results = {} - self.need_sync = False - - # receiver state - self.received = None - self.executed = RangedSet() - - # XXX: need to periodically exchange completion/known_completion - - self.destinations = {} - - def write_query(self, query, handler): - id = self.sent - self.write_cmd(query, lambda: handler(self.results.pop(id))) - - def apply_overrides(self, cmd, overrides): - for k, v in overrides.items(): - cmd[k.replace('-', '_')] = v - - def write_cmd(self, cmd, action=noop, overrides=None, sync=True): - if overrides: - self.apply_overrides(cmd, overrides) - - if action != noop: - cmd.sync = sync - if self.detached: - raise Exception("detached") - cmd.id = self.sent - self.sent += 1 - self.actions[cmd.id] = action - self.max_completion = cmd.id - self.write_op(cmd) - self.need_sync = not cmd.sync - - def write_cmds(self, cmds, action=noop): - if cmds: - for cmd in cmds[:-1]: - self.write_cmd(cmd) - self.write_cmd(cmds[-1], action) - else: - action() - - def write_op(self, op): - op.channel = self.channel - self.driver.write_op(op) - -POLICIES = Values("always", "sender", "receiver", "never") -RELIABILITY = Values("unreliable", "at-most-once", "at-least-once", - "exactly-once") - -DECLARE = Map({}, restricted=False) -BINDINGS = List(Map({ - "exchange": Types(basestring), - "queue": Types(basestring), - "key": Types(basestring), - "arguments": Map({}, restricted=False) - })) - -COMMON_OPTS = { - "create": POLICIES, - "delete": POLICIES, - "assert": POLICIES, - "node": Map({ - "type": Values("queue", "topic"), - "durable": Types(bool), - "x-declare": DECLARE, - "x-bindings": BINDINGS - }), - "link": Map({ - "name": Types(basestring), - "durable": Types(bool), - "reliability": RELIABILITY, - "x-declare": DECLARE, - "x-bindings": BINDINGS, - "x-subscribe": Map({}, restricted=False) - }) - } - -RECEIVE_MODES = Values("browse", "consume") - -SOURCE_OPTS = COMMON_OPTS.copy() -SOURCE_OPTS.update({ - "mode": RECEIVE_MODES - }) - -TARGET_OPTS = COMMON_OPTS.copy() - -class LinkIn: - - ADDR_NAME = "source" - DIR_NAME = "receiver" - VALIDATOR = Map(SOURCE_OPTS) - - def init_link(self, sst, rcv, _rcv): - _rcv.destination = str(rcv.id) - sst.destinations[_rcv.destination] = _rcv - _rcv.draining = False - _rcv.bytes_open = False - _rcv.on_unlink = [] - - def do_link(self, sst, rcv, _rcv, type, subtype, action): - link_opts = _rcv.options.get("link", {}) - reliability = link_opts.get("reliability", "at-least-once") - declare = link_opts.get("x-declare", {}) - subscribe = link_opts.get("x-subscribe", {}) - acq_mode = acquire_mode.pre_acquired - if reliability in ("unreliable", "at-most-once"): - rcv._accept_mode = accept_mode.none - else: - rcv._accept_mode = accept_mode.explicit - - if type == "topic": - default_name = "%s.%s" % (rcv.session.name, _rcv.destination) - _rcv._queue = link_opts.get("name", default_name) - sst.write_cmd(QueueDeclare(queue=_rcv._queue, - durable=link_opts.get("durable", False), - exclusive=True, - auto_delete=(reliability == "unreliable")), - overrides=declare) - _rcv.on_unlink = [QueueDelete(_rcv._queue)] - subject = _rcv.subject or SUBJECT_DEFAULTS.get(subtype) - bindings = get_bindings(link_opts, _rcv._queue, _rcv.name, subject) - if not bindings: - sst.write_cmd(ExchangeBind(_rcv._queue, _rcv.name, subject)) - - elif type == "queue": - _rcv._queue = _rcv.name - if _rcv.options.get("mode", "consume") == "browse": - acq_mode = acquire_mode.not_acquired - bindings = get_bindings(link_opts, queue=_rcv._queue) - - - sst.write_cmds(bindings) - sst.write_cmd(MessageSubscribe(queue=_rcv._queue, - destination=_rcv.destination, - acquire_mode = acq_mode, - accept_mode = rcv._accept_mode), - overrides=subscribe) - sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit), action) - - def do_unlink(self, sst, rcv, _rcv, action=noop): - link_opts = _rcv.options.get("link", {}) - reliability = link_opts.get("reliability") - cmds = [MessageCancel(_rcv.destination)] - cmds.extend(_rcv.on_unlink) - sst.write_cmds(cmds, action) - - def del_link(self, sst, rcv, _rcv): - del sst.destinations[_rcv.destination] - -class LinkOut: - - ADDR_NAME = "target" - DIR_NAME = "sender" - VALIDATOR = Map(TARGET_OPTS) - - def init_link(self, sst, snd, _snd): - _snd.closing = False - _snd.pre_ack = False - - def do_link(self, sst, snd, _snd, type, subtype, action): - link_opts = _snd.options.get("link", {}) - reliability = link_opts.get("reliability", "at-least-once") - _snd.pre_ack = reliability in ("unreliable", "at-most-once") - if type == "topic": - _snd._exchange = _snd.name - _snd._routing_key = _snd.subject - bindings = get_bindings(link_opts, exchange=_snd.name, key=_snd.subject) - elif type == "queue": - _snd._exchange = "" - _snd._routing_key = _snd.name - bindings = get_bindings(link_opts, queue=_snd.name) - sst.write_cmds(bindings, action) - - def do_unlink(self, sst, snd, _snd, action=noop): - action() - - def del_link(self, sst, snd, _snd): - pass - -class Cache: - - def __init__(self, ttl): - self.ttl = ttl - self.entries = {} - - def __setitem__(self, key, value): - self.entries[key] = time.time(), value - - def __getitem__(self, key): - tstamp, value = self.entries[key] - if time.time() - tstamp >= self.ttl: - del self.entries[key] - raise KeyError(key) - else: - return value - - def __delitem__(self, key): - del self.entries[key] - -# XXX -HEADER="!4s4B" - -EMPTY_DP = DeliveryProperties() -EMPTY_MP = MessageProperties() - -SUBJECT = "qpid.subject" - -CLOSED = "CLOSED" -READ_ONLY = "READ_ONLY" -WRITE_ONLY = "WRITE_ONLY" -OPEN = "OPEN" - -class Driver: - - def __init__(self, connection): - self.connection = connection - self.log_id = "%x" % id(self.connection) - self._lock = self.connection._lock - - self._selector = Selector.default() - self._attempts = 0 - self._delay = self.connection.reconnect_interval_min - self._reconnect_log = self.connection.reconnect_log - self._host = 0 - self._retrying = False - self._next_retry = None - self._transport = None - - self._timeout = None - - self.engine = None - - def _next_host(self): - urls = [URL(u) for u in self.connection.reconnect_urls] - hosts = [(self.connection.host, default(self.connection.port, 5672))] + \ - [(u.host, default(u.port, 5672)) for u in urls] - if self._host >= len(hosts): - self._host = 0 - result = hosts[self._host] - if self._host == 0: - self._attempts += 1 - self._host = self._host + 1 - return result - - def _num_hosts(self): - return len(self.connection.reconnect_urls) + 1 - - @synchronized - def wakeup(self): - self.dispatch() - self._selector.wakeup() - - def start(self): - self._selector.register(self) - - def stop(self): - self._selector.unregister(self) - if self._transport: - self.st_closed() - - def fileno(self): - return self._transport.fileno() - - @synchronized - def reading(self): - return self._transport is not None and \ - self._transport.reading(True) - - @synchronized - def writing(self): - return self._transport is not None and \ - self._transport.writing(self.engine.pending()) - - @synchronized - def timing(self): - return self._timeout - - @synchronized - def readable(self): - try: - data = self._transport.recv(64*1024) - if data is None: - return - elif data: - rawlog.debug("READ[%s]: %r", self.log_id, data) - self.engine.write(data) - else: - self.close_engine() - except socket.error, e: - self.close_engine(ConnectionError(text=str(e))) - - self.update_status() - - self._notify() - - def _notify(self): - if self.connection.error: - self.connection._condition.gc() - self.connection._waiter.notifyAll() - - def close_engine(self, e=None): - if e is None: - e = ConnectionError(text="connection aborted") - - if (self.connection.reconnect and - (self.connection.reconnect_limit is None or - self.connection.reconnect_limit <= 0 or - self._attempts <= self.connection.reconnect_limit)): - if self._host < self._num_hosts(): - delay = 0 - else: - delay = self._delay - self._delay = min(2*self._delay, - self.connection.reconnect_interval_max) - self._next_retry = time.time() + delay - if self._reconnect_log: - log.warn("recoverable error[attempt %s]: %s" % (self._attempts, e)) - if delay > 0: - log.warn("sleeping %s seconds" % delay) - self._retrying = True - self.engine.close() - else: - self.engine.close(e) - - self.schedule() - - def update_status(self): - status = self.engine.status() - return getattr(self, "st_%s" % status.lower())() - - def st_closed(self): - # XXX: this log statement seems to sometimes hit when the socket is not connected - # XXX: rawlog.debug("CLOSE[%s]: %s", self.log_id, self._socket.getpeername()) - self._transport.close() - self._transport = None - self.engine = None - return True - - def st_open(self): - return False - - @synchronized - def writeable(self): - notify = False - try: - n = self._transport.send(self.engine.peek()) - if n == 0: return - sent = self.engine.read(n) - rawlog.debug("SENT[%s]: %r", self.log_id, sent) - except socket.error, e: - self.close_engine(e) - notify = True - - if self.update_status() or notify: - self._notify() - - @synchronized - def timeout(self): - self.dispatch() - self._notify() - self.schedule() - - def schedule(self): - times = [] - if self.connection.heartbeat: - times.append(time.time() + self.connection.heartbeat) - if self._next_retry: - times.append(self._next_retry) - if times: - self._timeout = min(times) - else: - self._timeout = None - - def dispatch(self): - try: - if self._transport is None: - if self.connection._connected and not self.connection.error: - self.connect() - else: - self.engine.dispatch() - except HeartbeatTimeout, e: - self.close_engine(e) - except: - # XXX: Does socket get leaked if this occurs? - msg = compat.format_exc() - self.connection.error = InternalError(text=msg) - - def connect(self): - if self._retrying and time.time() < self._next_retry: - return - - try: - # XXX: should make this non blocking - host, port = self._next_host() - if self._retrying and self._reconnect_log: - log.warn("trying: %s:%s", host, port) - self.engine = Engine(self.connection) - self.engine.open() - rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port) - trans = transports.TRANSPORTS.get(self.connection.transport) - if trans: - self._transport = trans(self.connection, host, port) - else: - raise ConnectError("no such transport: %s" % self.connection.transport) - if self._retrying and self._reconnect_log: - log.warn("reconnect succeeded: %s:%s", host, port) - self._next_retry = None - self._attempts = 0 - self._host = 0 - self._delay = self.connection.reconnect_interval_min - self._retrying = False - self.schedule() - except socket.error, e: - self.close_engine(ConnectError(text=str(e))) - -DEFAULT_DISPOSITION = Disposition(None) - -def get_bindings(opts, queue=None, exchange=None, key=None): - bindings = opts.get("x-bindings", []) - cmds = [] - for b in bindings: - exchange = b.get("exchange", exchange) - queue = b.get("queue", queue) - key = b.get("key", key) - args = b.get("arguments", {}) - cmds.append(ExchangeBind(queue, exchange, key, args)) - return cmds - -CONNECTION_ERRS = { - # anythong not here (i.e. everything right now) will default to - # connection error - } - -SESSION_ERRS = { - # anything not here will default to session error - error_code.unauthorized_access: UnauthorizedAccess, - error_code.not_found: NotFound, - error_code.resource_locked: ReceiverError, - error_code.resource_limit_exceeded: TargetCapacityExceeded, - error_code.internal_error: ServerError - } - -class Engine: - - def __init__(self, connection): - self.connection = connection - self.log_id = "%x" % id(self.connection) - self._closing = False - self._connected = False - self._attachments = {} - - self._in = LinkIn() - self._out = LinkOut() - - self._channel_max = 65536 - self._channels = 0 - self._sessions = {} - - self.address_cache = Cache(self.connection.address_ttl) - - self._status = CLOSED - self._buf = "" - self._hdr = "" - self._last_in = None - self._last_out = None - self._op_enc = OpEncoder() - self._seg_enc = SegmentEncoder() - self._frame_enc = FrameEncoder() - self._frame_dec = FrameDecoder() - self._seg_dec = SegmentDecoder() - self._op_dec = OpDecoder() - - self._sasl = sasl.Client() - if self.connection.username: - self._sasl.setAttr("username", self.connection.username) - if self.connection.password: - self._sasl.setAttr("password", self.connection.password) - if self.connection.host: - self._sasl.setAttr("host", self.connection.host) - self._sasl.setAttr("service", self.connection.sasl_service) - if self.connection.sasl_min_ssf is not None: - self._sasl.setAttr("minssf", self.connection.sasl_min_ssf) - if self.connection.sasl_max_ssf is not None: - self._sasl.setAttr("maxssf", self.connection.sasl_max_ssf) - self._sasl.init() - self._sasl_encode = False - self._sasl_decode = False - - def _reset(self): - self.connection._transport_connected = False - - for ssn in self.connection.sessions.values(): - for m in ssn.acked + ssn.unacked + ssn.incoming: - m._transfer_id = None - for snd in ssn.senders: - snd.linked = False - for rcv in ssn.receivers: - rcv.impending = rcv.received - rcv.linked = False - - def status(self): - return self._status - - def write(self, data): - self._last_in = time.time() - try: - if self._sasl_decode: - data = self._sasl.decode(data) - - if len(self._hdr) < 8: - r = 8 - len(self._hdr) - self._hdr += data[:r] - data = data[r:] - - if len(self._hdr) == 8: - self.do_header(self._hdr) - - self._frame_dec.write(data) - self._seg_dec.write(*self._frame_dec.read()) - self._op_dec.write(*self._seg_dec.read()) - for op in self._op_dec.read(): - self.assign_id(op) - opslog.debug("RCVD[%s]: %r", self.log_id, op) - op.dispatch(self) - self.dispatch() - except MessagingError, e: - self.close(e) - except: - self.close(InternalError(text=compat.format_exc())) - - def close(self, e=None): - self._reset() - if e: - self.connection.error = e - self._status = CLOSED - - def assign_id(self, op): - if isinstance(op, Command): - sst = self.get_sst(op) - op.id = sst.received - sst.received += 1 - - def pending(self): - return len(self._buf) - - def read(self, n): - result = self._buf[:n] - self._buf = self._buf[n:] - return result - - def peek(self): - return self._buf - - def write_op(self, op): - opslog.debug("SENT[%s]: %r", self.log_id, op) - self._op_enc.write(op) - self._seg_enc.write(*self._op_enc.read()) - self._frame_enc.write(*self._seg_enc.read()) - bytes = self._frame_enc.read() - if self._sasl_encode: - bytes = self._sasl.encode(bytes) - self._buf += bytes - self._last_out = time.time() - - def do_header(self, hdr): - cli_major = 0; cli_minor = 10 - magic, _, _, major, minor = struct.unpack(HEADER, hdr) - if major != cli_major or minor != cli_minor: - raise VersionError(text="client: %s-%s, server: %s-%s" % - (cli_major, cli_minor, major, minor)) - - def do_connection_start(self, start): - if self.connection.sasl_mechanisms: - permitted = self.connection.sasl_mechanisms.split() - mechs = [m for m in start.mechanisms if m in permitted] - else: - mechs = start.mechanisms - try: - mech, initial = self._sasl.start(" ".join(mechs)) - except sasl.SASLError, e: - raise AuthenticationFailure(text=str(e)) - self.write_op(ConnectionStartOk(client_properties=CLIENT_PROPERTIES, - mechanism=mech, response=initial)) - - def do_connection_secure(self, secure): - resp = self._sasl.step(secure.challenge) - self.write_op(ConnectionSecureOk(response=resp)) - - def do_connection_tune(self, tune): - # XXX: is heartbeat protocol specific? - if tune.channel_max is not None: - self.channel_max = tune.channel_max - self.write_op(ConnectionTuneOk(heartbeat=self.connection.heartbeat, - channel_max=self.channel_max)) - self.write_op(ConnectionOpen()) - self._sasl_encode = True - - def do_connection_open_ok(self, open_ok): - self.connection.auth_username = self._sasl.auth_username() - self._connected = True - self._sasl_decode = True - self.connection._transport_connected = True - - def do_connection_heartbeat(self, hrt): - pass - - def do_connection_close(self, close): - self.write_op(ConnectionCloseOk()) - if close.reply_code != close_code.normal: - exc = CONNECTION_ERRS.get(close.reply_code, ConnectionError) - self.connection.error = exc(close.reply_code, close.reply_text) - # XXX: should we do a half shutdown on the socket here? - # XXX: we really need to test this, we may end up reporting a - # connection abort after this, if we were to do a shutdown on read - # and stop reading, then we wouldn't report the abort, that's - # probably the right thing to do - - def do_connection_close_ok(self, close_ok): - self.close() - - def do_session_attached(self, atc): - pass - - def do_session_command_point(self, cp): - sst = self.get_sst(cp) - sst.received = cp.command_id - - def do_session_completed(self, sc): - sst = self.get_sst(sc) - for r in sc.commands: - sst.acknowledged.add(r.lower, r.upper) - - if not sc.commands.empty(): - while sst.min_completion in sc.commands: - if sst.actions.has_key(sst.min_completion): - sst.actions.pop(sst.min_completion)() - sst.min_completion += 1 - - def session_known_completed(self, kcmp): - sst = self.get_sst(kcmp) - executed = RangedSet() - for e in sst.executed.ranges: - for ke in kcmp.ranges: - if e.lower in ke and e.upper in ke: - break - else: - executed.add_range(e) - sst.executed = completed - - def do_session_flush(self, sf): - sst = self.get_sst(sf) - if sf.expected: - if sst.received is None: - exp = None - else: - exp = RangedSet(sst.received) - sst.write_op(SessionExpected(exp)) - if sf.confirmed: - sst.write_op(SessionConfirmed(sst.executed)) - if sf.completed: - sst.write_op(SessionCompleted(sst.executed)) - - def do_session_request_timeout(self, rt): - sst = self.get_sst(rt) - sst.write_op(SessionTimeout(timeout=0)) - - def do_execution_result(self, er): - sst = self.get_sst(er) - sst.results[er.command_id] = er.value - sst.executed.add(er.id) - - def do_execution_exception(self, ex): - sst = self.get_sst(ex) - exc = SESSION_ERRS.get(ex.error_code, SessionError) - sst.session.error = exc(ex.error_code, ex.description) - - def dispatch(self): - if not self.connection._connected and not self._closing and self._status != CLOSED: - self.disconnect() - - if self._connected and not self._closing: - for ssn in self.connection.sessions.values(): - self.attach(ssn) - self.process(ssn) - - if self.connection.heartbeat and self._status != CLOSED: - now = time.time() - if self._last_in is not None and \ - now - self._last_in > 2*self.connection.heartbeat: - raise HeartbeatTimeout(text="heartbeat timeout") - if self._last_out is None or now - self._last_out >= self.connection.heartbeat/2.0: - self.write_op(ConnectionHeartbeat()) - - def open(self): - self._reset() - self._status = OPEN - self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10) - - def disconnect(self): - self.write_op(ConnectionClose(close_code.normal)) - self._closing = True - - def attach(self, ssn): - sst = self._attachments.get(ssn) - if sst is None and not ssn.closed: - for i in xrange(0, self.channel_max): - if not self._sessions.has_key(i): - ch = i - break - else: - raise RuntimeError("all channels used") - sst = SessionState(self, ssn, ssn.name, ch) - sst.write_op(SessionAttach(name=ssn.name)) - sst.write_op(SessionCommandPoint(sst.sent, 0)) - sst.outgoing_idx = 0 - sst.acked = [] - sst.acked_idx = 0 - if ssn.transactional: - sst.write_cmd(TxSelect()) - self._attachments[ssn] = sst - self._sessions[sst.channel] = sst - - for snd in ssn.senders: - self.link(snd, self._out, snd.target) - for rcv in ssn.receivers: - self.link(rcv, self._in, rcv.source) - - if sst is not None and ssn.closing and not sst.detached: - sst.detached = True - sst.write_op(SessionDetach(name=ssn.name)) - - def get_sst(self, op): - return self._sessions[op.channel] - - def do_session_detached(self, dtc): - sst = self._sessions.pop(dtc.channel) - ssn = sst.session - del self._attachments[ssn] - ssn.closed = True - - def do_session_detach(self, dtc): - sst = self.get_sst(dtc) - sst.write_op(SessionDetached(name=dtc.name)) - self.do_session_detached(dtc) - - def link(self, lnk, dir, addr): - sst = self._attachments.get(lnk.session) - _lnk = self._attachments.get(lnk) - - if _lnk is None and not lnk.closed: - _lnk = Attachment(lnk) - _lnk.closing = False - dir.init_link(sst, lnk, _lnk) - - err = self.parse_address(_lnk, dir, addr) or self.validate_options(_lnk, dir) - if err: - lnk.error = err - lnk.closed = True - return - - def linked(): - lnk.linked = True - - def resolved(type, subtype): - dir.do_link(sst, lnk, _lnk, type, subtype, linked) - - self.resolve_declare(sst, _lnk, dir.DIR_NAME, resolved) - self._attachments[lnk] = _lnk - - if lnk.linked and lnk.closing and not lnk.closed: - if not _lnk.closing: - def unlinked(): - dir.del_link(sst, lnk, _lnk) - del self._attachments[lnk] - lnk.closed = True - if _lnk.options.get("delete") in ("always", dir.DIR_NAME): - dir.do_unlink(sst, lnk, _lnk) - self.delete(sst, _lnk.name, unlinked) - else: - dir.do_unlink(sst, lnk, _lnk, unlinked) - _lnk.closing = True - elif not lnk.linked and lnk.closing and not lnk.closed: - if lnk.error: lnk.closed = True - - def parse_address(self, lnk, dir, addr): - if addr is None: - return MalformedAddress(text="%s is None" % dir.ADDR_NAME) - else: - try: - lnk.name, lnk.subject, lnk.options = address.parse(addr) - # XXX: subject - if lnk.options is None: - lnk.options = {} - except address.LexError, e: - return MalformedAddress(text=str(e)) - except address.ParseError, e: - return MalformedAddress(text=str(e)) - - def validate_options(self, lnk, dir): - ctx = Context() - err = dir.VALIDATOR.validate(lnk.options, ctx) - if err: return InvalidOption(text="error in options: %s" % err) - - def resolve_declare(self, sst, lnk, dir, action): - declare = lnk.options.get("create") in ("always", dir) - assrt = lnk.options.get("assert") in ("always", dir) - def do_resolved(type, subtype): - err = None - if type is None: - if declare: - err = self.declare(sst, lnk, action) - else: - err = NotFound(text="no such queue: %s" % lnk.name) - else: - if assrt: - expected = lnk.options.get("node", {}).get("type") - if expected and type != expected: - err = AssertionFailed(text="expected %s, got %s" % (expected, type)) - if err is None: - action(type, subtype) - - if err: - tgt = lnk.target - tgt.error = err - del self._attachments[tgt] - tgt.closed = True - return - self.resolve(sst, lnk.name, do_resolved, force=declare) - - def resolve(self, sst, name, action, force=False): - if not force: - try: - type, subtype = self.address_cache[name] - action(type, subtype) - return - except KeyError: - pass - - args = [] - def do_result(r): - args.append(r) - def do_action(r): - do_result(r) - er, qr = args - if er.not_found and not qr.queue: - type, subtype = None, None - elif qr.queue: - type, subtype = "queue", None - else: - type, subtype = "topic", er.type - if type is not None: - self.address_cache[name] = (type, subtype) - action(type, subtype) - sst.write_query(ExchangeQuery(name), do_result) - sst.write_query(QueueQuery(name), do_action) - - def declare(self, sst, lnk, action): - name = lnk.name - props = lnk.options.get("node", {}) - durable = props.get("durable", DURABLE_DEFAULT) - type = props.get("type", "queue") - declare = props.get("x-declare", {}) - - if type == "topic": - cmd = ExchangeDeclare(exchange=name, durable=durable) - bindings = get_bindings(props, exchange=name) - elif type == "queue": - cmd = QueueDeclare(queue=name, durable=durable) - bindings = get_bindings(props, queue=name) - else: - raise ValueError(type) - - sst.apply_overrides(cmd, declare) - - if type == "topic": - if cmd.type is None: - cmd.type = "topic" - subtype = cmd.type - else: - subtype = None - - cmds = [cmd] - cmds.extend(bindings) - - def declared(): - self.address_cache[name] = (type, subtype) - action(type, subtype) - - sst.write_cmds(cmds, declared) - - def delete(self, sst, name, action): - def deleted(): - del self.address_cache[name] - action() - - def do_delete(type, subtype): - if type == "topic": - sst.write_cmd(ExchangeDelete(name), deleted) - elif type == "queue": - sst.write_cmd(QueueDelete(name), deleted) - elif type is None: - action() - else: - raise ValueError(type) - self.resolve(sst, name, do_delete, force=True) - - def process(self, ssn): - if ssn.closed or ssn.closing: return - - sst = self._attachments[ssn] - - while sst.outgoing_idx < len(ssn.outgoing): - msg = ssn.outgoing[sst.outgoing_idx] - snd = msg._sender - # XXX: should check for sender error here - _snd = self._attachments.get(snd) - if _snd and snd.linked: - self.send(snd, msg) - sst.outgoing_idx += 1 - else: - break - - for snd in ssn.senders: - # XXX: should included snd.acked in this - if snd.synced >= snd.queued and sst.need_sync: - sst.write_cmd(ExecutionSync(), sync_noop) - - for rcv in ssn.receivers: - self.process_receiver(rcv) - - if ssn.acked: - messages = ssn.acked[sst.acked_idx:] - if messages: - ids = RangedSet() - - disposed = [(DEFAULT_DISPOSITION, [])] - acked = [] - for m in messages: - # XXX: we're ignoring acks that get lost when disconnected, - # could we deal this via some message-id based purge? - if m._transfer_id is None: - acked.append(m) - continue - ids.add(m._transfer_id) - if m._receiver._accept_mode is accept_mode.explicit: - disp = m._disposition or DEFAULT_DISPOSITION - last, msgs = disposed[-1] - if disp.type is last.type and disp.options == last.options: - msgs.append(m) - else: - disposed.append((disp, [m])) - else: - acked.append(m) - - for range in ids: - sst.executed.add_range(range) - sst.write_op(SessionCompleted(sst.executed)) - - def ack_acker(msgs): - def ack_ack(): - for m in msgs: - ssn.acked.remove(m) - sst.acked_idx -= 1 - # XXX: should this check accept_mode too? - if not ssn.transactional: - sst.acked.remove(m) - return ack_ack - - for disp, msgs in disposed: - if not msgs: continue - if disp.type is None: - op = MessageAccept - elif disp.type is RELEASED: - op = MessageRelease - elif disp.type is REJECTED: - op = MessageReject - sst.write_cmd(op(RangedSet(*[m._transfer_id for m in msgs]), - **disp.options), - ack_acker(msgs)) - if log.isEnabledFor(DEBUG): - for m in msgs: - log.debug("SACK[%s]: %s, %s", ssn.log_id, m, m._disposition) - - sst.acked.extend(messages) - sst.acked_idx += len(messages) - ack_acker(acked)() - - if ssn.committing and not sst.committing: - def commit_ok(): - del sst.acked[:] - ssn.committing = False - ssn.committed = True - ssn.aborting = False - ssn.aborted = False - sst.committing = False - sst.write_cmd(TxCommit(), commit_ok) - sst.committing = True - - if ssn.aborting and not sst.aborting: - sst.aborting = True - def do_rb(): - messages = sst.acked + ssn.unacked + ssn.incoming - ids = RangedSet(*[m._transfer_id for m in messages]) - for range in ids: - sst.executed.add_range(range) - sst.write_op(SessionCompleted(sst.executed)) - sst.write_cmd(MessageRelease(ids, True)) - sst.write_cmd(TxRollback(), do_rb_ok) - - def do_rb_ok(): - del ssn.incoming[:] - del ssn.unacked[:] - del sst.acked[:] - - for rcv in ssn.receivers: - rcv.impending = rcv.received - rcv.returned = rcv.received - # XXX: do we need to update granted here as well? - - for rcv in ssn.receivers: - self.process_receiver(rcv) - - ssn.aborting = False - ssn.aborted = True - ssn.committing = False - ssn.committed = False - sst.aborting = False - - for rcv in ssn.receivers: - _rcv = self._attachments[rcv] - sst.write_cmd(MessageStop(_rcv.destination)) - sst.write_cmd(ExecutionSync(), do_rb) - - def grant(self, rcv): - sst = self._attachments[rcv.session] - _rcv = self._attachments.get(rcv) - if _rcv is None or not rcv.linked or _rcv.closing or _rcv.draining: - return - - if rcv.granted is UNLIMITED: - if rcv.impending is UNLIMITED: - delta = 0 - else: - delta = UNLIMITED - elif rcv.impending is UNLIMITED: - delta = -1 - else: - delta = max(rcv.granted, rcv.received) - rcv.impending - - if delta is UNLIMITED: - if not _rcv.bytes_open: - sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value)) - _rcv.bytes_open = True - sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, UNLIMITED.value)) - rcv.impending = UNLIMITED - elif delta > 0: - if not _rcv.bytes_open: - sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value)) - _rcv.bytes_open = True - sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, delta)) - rcv.impending += delta - elif delta < 0 and not rcv.draining: - _rcv.draining = True - def do_stop(): - rcv.impending = rcv.received - _rcv.draining = False - _rcv.bytes_open = False - self.grant(rcv) - sst.write_cmd(MessageStop(_rcv.destination), do_stop) - - if rcv.draining: - _rcv.draining = True - def do_flush(): - rcv.impending = rcv.received - rcv.granted = rcv.impending - _rcv.draining = False - _rcv.bytes_open = False - rcv.draining = False - sst.write_cmd(MessageFlush(_rcv.destination), do_flush) - - - def process_receiver(self, rcv): - if rcv.closed: return - self.grant(rcv) - - def send(self, snd, msg): - sst = self._attachments[snd.session] - _snd = self._attachments[snd] - - if msg.subject is None or _snd._exchange == "": - rk = _snd._routing_key - else: - rk = msg.subject - - if msg.subject is None: - subject = _snd.subject - else: - subject = msg.subject - - # XXX: do we need to query to figure out how to create the reply-to interoperably? - if msg.reply_to: - rt = addr2reply_to(msg.reply_to) - else: - rt = None - content_encoding = msg.properties.get("x-amqp-0-10.content-encoding") - dp = DeliveryProperties(routing_key=rk) - mp = MessageProperties(message_id=msg.id, - user_id=msg.user_id, - reply_to=rt, - correlation_id=msg.correlation_id, - app_id = msg.properties.get("x-amqp-0-10.app-id"), - content_type=msg.content_type, - content_encoding=content_encoding, - application_headers=msg.properties) - if subject is not None: - if mp.application_headers is None: - mp.application_headers = {} - mp.application_headers[SUBJECT] = subject - if msg.durable is not None: - if msg.durable: - dp.delivery_mode = delivery_mode.persistent - else: - dp.delivery_mode = delivery_mode.non_persistent - if msg.priority is not None: - dp.priority = msg.priority - if msg.ttl is not None: - dp.ttl = long(msg.ttl*1000) - enc, dec = get_codec(msg.content_type) - body = enc(msg.content) - - # XXX: this is not safe for out of order, can this be triggered by pre_ack? - def msg_acked(): - # XXX: should we log the ack somehow too? - snd.acked += 1 - m = snd.session.outgoing.pop(0) - sst.outgoing_idx -= 1 - log.debug("RACK[%s]: %s", sst.session.log_id, msg) - assert msg == m - - xfr = MessageTransfer(destination=_snd._exchange, headers=(dp, mp), - payload=body) - - if _snd.pre_ack: - sst.write_cmd(xfr) - else: - sst.write_cmd(xfr, msg_acked, sync=msg._sync) - - log.debug("SENT[%s]: %s", sst.session.log_id, msg) - - if _snd.pre_ack: - msg_acked() - - def do_message_transfer(self, xfr): - sst = self.get_sst(xfr) - ssn = sst.session - - msg = self._decode(xfr) - rcv = sst.destinations[xfr.destination].target - msg._receiver = rcv - if rcv.impending is not UNLIMITED: - assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending) - rcv.received += 1 - log.debug("RCVD[%s]: %s", ssn.log_id, msg) - ssn.incoming.append(msg) - - def _decode(self, xfr): - dp = EMPTY_DP - mp = EMPTY_MP - - for h in xfr.headers: - if isinstance(h, DeliveryProperties): - dp = h - elif isinstance(h, MessageProperties): - mp = h - - ap = mp.application_headers - enc, dec = get_codec(mp.content_type) - content = dec(xfr.payload) - msg = Message(content) - msg.id = mp.message_id - if ap is not None: - msg.subject = ap.get(SUBJECT) - msg.user_id = mp.user_id - if mp.reply_to is not None: - msg.reply_to = reply_to2addr(mp.reply_to) - msg.correlation_id = mp.correlation_id - if dp.delivery_mode is not None: - msg.durable = dp.delivery_mode == delivery_mode.persistent - msg.priority = dp.priority - if dp.ttl is not None: - msg.ttl = dp.ttl/1000.0 - msg.redelivered = dp.redelivered - msg.properties = mp.application_headers or {} - if mp.app_id is not None: - msg.properties["x-amqp-0-10.app-id"] = mp.app_id - if mp.content_encoding is not None: - msg.properties["x-amqp-0-10.content-encoding"] = mp.content_encoding - if dp.routing_key is not None: - msg.properties["x-amqp-0-10.routing-key"] = dp.routing_key - msg.content_type = mp.content_type - msg._transfer_id = xfr.id - return msg diff --git a/python/qpid/messaging/endpoints.py b/python/qpid/messaging/endpoints.py deleted file mode 100644 index 338ac70ecf..0000000000 --- a/python/qpid/messaging/endpoints.py +++ /dev/null @@ -1,1046 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -""" -A candidate high level messaging API for python. - -Areas that still need work: - - - definition of the arguments for L{Session.sender} and L{Session.receiver} - - standard L{Message} properties - - L{Message} content encoding - - protocol negotiation/multiprotocol impl -""" - -from logging import getLogger -from math import ceil -from qpid.codec010 import StringCodec -from qpid.concurrency import synchronized, Waiter, Condition -from qpid.datatypes import Serial, uuid4 -from qpid.messaging.constants import * -from qpid.messaging.exceptions import * -from qpid.messaging.message import * -from qpid.ops import PRIMITIVE -from qpid.util import default, URL -from threading import Thread, RLock - -log = getLogger("qpid.messaging") - -static = staticmethod - -class Endpoint: - - def _ecwait(self, predicate, timeout=None): - result = self._ewait(lambda: self.closed or predicate(), timeout) - self.check_closed() - return result - -class Connection(Endpoint): - - """ - A Connection manages a group of L{Sessions<Session>} and connects - them with a remote endpoint. - """ - - @static - def establish(url=None, **options): - """ - Constructs a L{Connection} with the supplied parameters and opens - it. - """ - conn = Connection(url, **options) - conn.open() - return conn - - def __init__(self, url=None, **options): - """ - Creates a connection. A newly created connection must be connected - with the Connection.connect() method before it can be used. - - @type url: str - @param url: [ <username> [ / <password> ] @ ] <host> [ : <port> ] - @type host: str - @param host: the name or ip address of the remote host (overriden by url) - @type port: int - @param port: the port number of the remote host (overriden by url) - @type transport: str - @param transport: one of tcp, tcp+tls, or ssl (alias for tcp+tls) - @type heartbeat: int - @param heartbeat: heartbeat interval in seconds - - @type username: str - @param username: the username for authentication (overriden by url) - @type password: str - @param password: the password for authentication (overriden by url) - - @type sasl_mechanisms: str - @param sasl_mechanisms: space separated list of permitted sasl mechanisms - @type sasl_service: str - @param sasl_service: ??? - @type sasl_min_ssf: ??? - @param sasl_min_ssf: ??? - @type sasl_max_ssf: ??? - @param sasl_max_ssf: ??? - - @type reconnect: bool - @param reconnect: enable/disable automatic reconnect - @type reconnect_timeout: float - @param reconnect_timeout: total time to attempt reconnect - @type reconnect_internal_min: float - @param reconnect_internal_min: minimum interval between reconnect attempts - @type reconnect_internal_max: float - @param reconnect_internal_max: maximum interval between reconnect attempts - @type reconnect_internal: float - @param reconnect_interval: set both min and max reconnect intervals - @type reconnect_limit: int - @param reconnect_limit: limit the total number of reconnect attempts - @type reconnect_urls: list[str] - @param reconnect_urls: list of backup hosts specified as urls - - @type address_ttl: float - @param address_ttl: time until cached address resolution expires - - @rtype: Connection - @return: a disconnected Connection - """ - if url is None: - url = options.get("host") - if isinstance(url, basestring): - url = URL(url) - self.host = url.host - if options.has_key("transport"): - self.transport = options.get("transport") - elif url.scheme == url.AMQP: - self.transport = "tcp" - elif url.scheme == url.AMQPS: - self.transport = "ssl" - else: - self.transport = "tcp" - if self.transport in ("ssl", "tcp+tls"): - self.port = default(url.port, options.get("port", AMQPS_PORT)) - else: - self.port = default(url.port, options.get("port", AMQP_PORT)) - self.heartbeat = options.get("heartbeat") - self.username = default(url.user, options.get("username", None)) - self.password = default(url.password, options.get("password", None)) - self.auth_username = None - - self.sasl_mechanisms = options.get("sasl_mechanisms") - self.sasl_service = options.get("sasl_service", "qpidd") - self.sasl_min_ssf = options.get("sasl_min_ssf") - self.sasl_max_ssf = options.get("sasl_max_ssf") - - self.reconnect = options.get("reconnect", False) - self.reconnect_timeout = options.get("reconnect_timeout") - reconnect_interval = options.get("reconnect_interval") - self.reconnect_interval_min = options.get("reconnect_interval_min", - default(reconnect_interval, 1)) - self.reconnect_interval_max = options.get("reconnect_interval_max", - default(reconnect_interval, 2*60)) - self.reconnect_limit = options.get("reconnect_limit") - self.reconnect_urls = options.get("reconnect_urls", []) - self.reconnect_log = options.get("reconnect_log", True) - - self.address_ttl = options.get("address_ttl", 60) - self.tcp_nodelay = options.get("tcp_nodelay", False) - - self.options = options - - - self.id = str(uuid4()) - self.session_counter = 0 - self.sessions = {} - self._open = False - self._connected = False - self._transport_connected = False - self._lock = RLock() - self._condition = Condition(self._lock) - self._waiter = Waiter(self._condition) - self._modcount = Serial(0) - self.error = None - from driver import Driver - self._driver = Driver(self) - - def _wait(self, predicate, timeout=None): - return self._waiter.wait(predicate, timeout=timeout) - - def _wakeup(self): - self._modcount += 1 - self._driver.wakeup() - - def check_error(self): - if self.error: - self._condition.gc() - raise self.error - - def get_error(self): - return self.error - - def _ewait(self, predicate, timeout=None): - result = self._wait(lambda: self.error or predicate(), timeout) - self.check_error() - return result - - def check_closed(self): - if not self._connected: - self._condition.gc() - raise ConnectionClosed() - - @synchronized - def session(self, name=None, transactional=False): - """ - Creates or retrieves the named session. If the name is omitted or - None, then a unique name is chosen based on a randomly generated - uuid. - - @type name: str - @param name: the session name - @rtype: Session - @return: the named Session - """ - - if name is None: - name = "%s:%s" % (self.id, self.session_counter) - self.session_counter += 1 - else: - name = "%s:%s" % (self.id, name) - - if self.sessions.has_key(name): - return self.sessions[name] - else: - ssn = Session(self, name, transactional) - self.sessions[name] = ssn - self._wakeup() - return ssn - - @synchronized - def _remove_session(self, ssn): - self.sessions.pop(ssn.name, 0) - - @synchronized - def open(self): - """ - Opens a connection. - """ - if self._open: - raise ConnectionError("already open") - self._open = True - self.attach() - - @synchronized - def opened(self): - """ - Return true if the connection is open, false otherwise. - """ - return self._open - - @synchronized - def attach(self): - """ - Attach to the remote endpoint. - """ - if not self._connected: - self._connected = True - self._driver.start() - self._wakeup() - self._ewait(lambda: self._transport_connected and not self._unlinked()) - - def _unlinked(self): - return [l - for ssn in self.sessions.values() - if not (ssn.error or ssn.closed) - for l in ssn.senders + ssn.receivers - if not (l.linked or l.error or l.closed)] - - @synchronized - def detach(self, timeout=None): - """ - Detach from the remote endpoint. - """ - if self._connected: - self._connected = False - self._wakeup() - cleanup = True - else: - cleanup = False - try: - if not self._wait(lambda: not self._transport_connected, timeout=timeout): - raise Timeout("detach timed out") - finally: - if cleanup: - self._driver.stop() - self._condition.gc() - - @synchronized - def attached(self): - """ - Return true if the connection is attached, false otherwise. - """ - return self._connected - - @synchronized - def close(self, timeout=None): - """ - Close the connection and all sessions. - """ - try: - for ssn in self.sessions.values(): - ssn.close(timeout=timeout) - finally: - self.detach(timeout=timeout) - self._open = False - -class Session(Endpoint): - - """ - Sessions provide a linear context for sending and receiving - L{Messages<Message>}. L{Messages<Message>} are sent and received - using the L{Sender.send} and L{Receiver.fetch} methods of the - L{Sender} and L{Receiver} objects associated with a Session. - - Each L{Sender} and L{Receiver} is created by supplying either a - target or source address to the L{sender} and L{receiver} methods of - the Session. The address is supplied via a string syntax documented - below. - - Addresses - ========= - - An address identifies a source or target for messages. In its - simplest form this is just a name. In general a target address may - also be used as a source address, however not all source addresses - may be used as a target, e.g. a source might additionally have some - filtering criteria that would not be present in a target. - - A subject may optionally be specified along with the name. When an - address is used as a target, any subject specified in the address is - used as the default subject of outgoing messages for that target. - When an address is used as a source, any subject specified in the - address is pattern matched against the subject of available messages - as a filter for incoming messages from that source. - - The options map contains additional information about the address - including: - - - policies for automatically creating, and deleting the node to - which an address refers - - - policies for asserting facts about the node to which an address - refers - - - extension points that can be used for sender/receiver - configuration - - Mapping to AMQP 0-10 - -------------------- - The name is resolved to either an exchange or a queue by querying - the broker. - - The subject is set as a property on the message. Additionally, if - the name refers to an exchange, the routing key is set to the - subject. - - Syntax - ------ - The following regular expressions define the tokens used to parse - addresses:: - LBRACE: \\{ - RBRACE: \\} - LBRACK: \\[ - RBRACK: \\] - COLON: : - SEMI: ; - SLASH: / - COMMA: , - NUMBER: [+-]?[0-9]*\\.?[0-9]+ - ID: [a-zA-Z_](?:[a-zA-Z0-9_-]*[a-zA-Z0-9_])? - STRING: "(?:[^\\\\"]|\\\\.)*"|\'(?:[^\\\\\']|\\\\.)*\' - ESC: \\\\[^ux]|\\\\x[0-9a-fA-F][0-9a-fA-F]|\\\\u[0-9a-fA-F][0-9a-fA-F][0-9a-fA-F][0-9a-fA-F] - SYM: [.#*%@$^!+-] - WSPACE: [ \\n\\r\\t]+ - - The formal grammar for addresses is given below:: - address = name [ "/" subject ] [ ";" options ] - name = ( part | quoted )+ - subject = ( part | quoted | "/" )* - quoted = STRING / ESC - part = LBRACE / RBRACE / COLON / COMMA / NUMBER / ID / SYM - options = map - map = "{" ( keyval ( "," keyval )* )? "}" - keyval = ID ":" value - value = NUMBER / STRING / ID / map / list - list = "[" ( value ( "," value )* )? "]" - - This grammar resuls in the following informal syntax:: - - <name> [ / <subject> ] [ ; <options> ] - - Where options is:: - - { <key> : <value>, ... } - - And values may be: - - numbers - - single, double, or non quoted strings - - maps (dictionaries) - - lists - - Options - ------- - The options map permits the following parameters:: - - <name> [ / <subject> ] ; { - create: always | sender | receiver | never, - delete: always | sender | receiver | never, - assert: always | sender | receiver | never, - mode: browse | consume, - node: { - type: queue | topic, - durable: True | False, - x-declare: { ... <declare-overrides> ... }, - x-bindings: [<binding_1>, ... <binding_n>] - }, - link: { - name: <link-name>, - durable: True | False, - reliability: unreliable | at-most-once | at-least-once | exactly-once, - x-declare: { ... <declare-overrides> ... }, - x-bindings: [<binding_1>, ... <binding_n>], - x-subscribe: { ... <subscribe-overrides> ... } - } - } - - Bindings are specified as a map with the following options:: - - { - exchange: <exchange>, - queue: <queue>, - key: <key>, - arguments: <arguments> - } - - The create, delete, and assert policies specify who should perfom - the associated action: - - - I{always}: the action will always be performed - - I{sender}: the action will only be performed by the sender - - I{receiver}: the action will only be performed by the receiver - - I{never}: the action will never be performed (this is the default) - - The node-type is one of: - - - I{topic}: a topic node will default to the topic exchange, - x-declare may be used to specify other exchange types - - I{queue}: this is the default node-type - - The x-declare map permits protocol specific keys and values to be - specified when exchanges or queues are declared. These keys and - values are passed through when creating a node or asserting facts - about an existing node. - - Examples - -------- - A simple name resolves to any named node, usually a queue or a - topic:: - - my-queue-or-topic - - A simple name with a subject will also resolve to a node, but the - presence of the subject will cause a sender using this address to - set the subject on outgoing messages, and receivers to filter based - on the subject:: - - my-queue-or-topic/my-subject - - A subject pattern can be used and will cause filtering if used by - the receiver. If used for a sender, the literal value gets set as - the subject:: - - my-queue-or-topic/my-* - - In all the above cases, the address is resolved to an existing node. - If you want the node to be auto-created, then you can do the - following. By default nonexistent nodes are assumed to be queues:: - - my-queue; {create: always} - - You can customize the properties of the queue:: - - my-queue; {create: always, node: {durable: True}} - - You can create a topic instead if you want:: - - my-queue; {create: always, node: {type: topic}} - - You can assert that the address resolves to a node with particular - properties:: - - my-transient-topic; { - assert: always, - node: { - type: topic, - durable: False - } - } - """ - - def __init__(self, connection, name, transactional): - self.connection = connection - self.name = name - self.log_id = "%x" % id(self) - - self.transactional = transactional - - self.committing = False - self.committed = True - self.aborting = False - self.aborted = False - - self.next_sender_id = 0 - self.senders = [] - self.next_receiver_id = 0 - self.receivers = [] - self.outgoing = [] - self.incoming = [] - self.unacked = [] - self.acked = [] - # XXX: I hate this name. - self.ack_capacity = UNLIMITED - - self.error = None - self.closing = False - self.closed = False - - self._lock = connection._lock - - def __repr__(self): - return "<Session %s>" % self.name - - def _wait(self, predicate, timeout=None): - return self.connection._wait(predicate, timeout=timeout) - - def _wakeup(self): - self.connection._wakeup() - - def check_error(self): - self.connection.check_error() - if self.error: - raise self.error - - def get_error(self): - err = self.connection.get_error() - if err: - return err - else: - return self.error - - def _ewait(self, predicate, timeout=None): - result = self.connection._ewait(lambda: self.error or predicate(), timeout) - self.check_error() - return result - - def check_closed(self): - if self.closed: - raise SessionClosed() - - @synchronized - def sender(self, target, **options): - """ - Creates a L{Sender} that may be used to send L{Messages<Message>} - to the specified target. - - @type target: str - @param target: the target to which messages will be sent - @rtype: Sender - @return: a new Sender for the specified target - """ - target = _mangle(target) - sender = Sender(self, self.next_sender_id, target, options) - self.next_sender_id += 1 - self.senders.append(sender) - if not self.closed and self.connection._connected: - self._wakeup() - try: - sender._ewait(lambda: sender.linked) - except LinkError, e: - sender.close() - raise e - return sender - - @synchronized - def receiver(self, source, **options): - """ - Creates a receiver that may be used to fetch L{Messages<Message>} - from the specified source. - - @type source: str - @param source: the source of L{Messages<Message>} - @rtype: Receiver - @return: a new Receiver for the specified source - """ - source = _mangle(source) - receiver = Receiver(self, self.next_receiver_id, source, options) - self.next_receiver_id += 1 - self.receivers.append(receiver) - if not self.closed and self.connection._connected: - self._wakeup() - try: - receiver._ewait(lambda: receiver.linked) - except LinkError, e: - receiver.close() - raise e - return receiver - - @synchronized - def _count(self, predicate): - result = 0 - for msg in self.incoming: - if predicate(msg): - result += 1 - return result - - def _peek(self, receiver): - for msg in self.incoming: - if msg._receiver == receiver: - return msg - - def _pop(self, receiver): - i = 0 - while i < len(self.incoming): - msg = self.incoming[i] - if msg._receiver == receiver: - del self.incoming[i] - return msg - else: - i += 1 - - @synchronized - def _get(self, receiver, timeout=None): - if self._ewait(lambda: ((self._peek(receiver) is not None) or - self.closing or receiver.closed), - timeout): - msg = self._pop(receiver) - if msg is not None: - msg._receiver.returned += 1 - self.unacked.append(msg) - log.debug("RETR[%s]: %s", self.log_id, msg) - return msg - return None - - @synchronized - def next_receiver(self, timeout=None): - if self._ecwait(lambda: self.incoming, timeout): - return self.incoming[0]._receiver - else: - raise Empty - - @synchronized - def acknowledge(self, message=None, disposition=None, sync=True): - """ - Acknowledge the given L{Message}. If message is None, then all - unacknowledged messages on the session are acknowledged. - - @type message: Message - @param message: the message to acknowledge or None - @type sync: boolean - @param sync: if true then block until the message(s) are acknowledged - """ - if message is None: - messages = self.unacked[:] - else: - messages = [message] - - for m in messages: - if self.ack_capacity is not UNLIMITED: - if self.ack_capacity <= 0: - # XXX: this is currently a SendError, maybe it should be a SessionError? - raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity) - self._wakeup() - self._ecwait(lambda: len(self.acked) < self.ack_capacity) - m._disposition = disposition - self.unacked.remove(m) - self.acked.append(m) - - self._wakeup() - if sync: - self._ecwait(lambda: not [m for m in messages if m in self.acked]) - - @synchronized - def commit(self): - """ - Commit outstanding transactional work. This consists of all - message sends and receives since the prior commit or rollback. - """ - if not self.transactional: - raise NontransactionalSession() - self.committing = True - self._wakeup() - self._ecwait(lambda: not self.committing) - if self.aborted: - raise TransactionAborted() - assert self.committed - - @synchronized - def rollback(self): - """ - Rollback outstanding transactional work. This consists of all - message sends and receives since the prior commit or rollback. - """ - if not self.transactional: - raise NontransactionalSession() - self.aborting = True - self._wakeup() - self._ecwait(lambda: not self.aborting) - assert self.aborted - - @synchronized - def sync(self, timeout=None): - """ - Sync the session. - """ - for snd in self.senders: - snd.sync(timeout=timeout) - if not self._ewait(lambda: not self.outgoing and not self.acked, timeout=timeout): - raise Timeout("session sync timed out") - - @synchronized - def close(self, timeout=None): - """ - Close the session. - """ - self.sync(timeout=timeout) - - for link in self.receivers + self.senders: - link.close(timeout=timeout) - - if not self.closing: - self.closing = True - self._wakeup() - - try: - if not self._ewait(lambda: self.closed, timeout=timeout): - raise Timeout("session close timed out") - finally: - self.connection._remove_session(self) - -def _mangle(addr): - if addr and addr.startswith("#"): - return str(uuid4()) + addr - else: - return addr - -class Sender(Endpoint): - - """ - Sends outgoing messages. - """ - - def __init__(self, session, id, target, options): - self.session = session - self.id = id - self.target = target - self.options = options - self.capacity = options.get("capacity", UNLIMITED) - self.threshold = 0.5 - self.durable = options.get("durable") - self.queued = Serial(0) - self.synced = Serial(0) - self.acked = Serial(0) - self.error = None - self.linked = False - self.closing = False - self.closed = False - self._lock = self.session._lock - - def _wakeup(self): - self.session._wakeup() - - def check_error(self): - self.session.check_error() - if self.error: - raise self.error - - def get_error(self): - err = self.session.get_error() - if err: - return err - else: - return self.error - - def _ewait(self, predicate, timeout=None): - result = self.session._ewait(lambda: self.error or predicate(), timeout) - self.check_error() - return result - - def check_closed(self): - if self.closed: - raise LinkClosed() - - @synchronized - def unsettled(self): - """ - Returns the number of messages awaiting acknowledgment. - @rtype: int - @return: the number of unacknowledged messages - """ - return self.queued - self.acked - - @synchronized - def available(self): - if self.capacity is UNLIMITED: - return UNLIMITED - else: - return self.capacity - self.unsettled() - - @synchronized - def send(self, object, sync=True, timeout=None): - """ - Send a message. If the object passed in is of type L{unicode}, - L{str}, L{list}, or L{dict}, it will automatically be wrapped in a - L{Message} and sent. If it is of type L{Message}, it will be sent - directly. If the sender capacity is not L{UNLIMITED} then send - will block until there is available capacity to send the message. - If the timeout parameter is specified, then send will throw an - L{InsufficientCapacity} exception if capacity does not become - available within the specified time. - - @type object: unicode, str, list, dict, Message - @param object: the message or content to send - - @type sync: boolean - @param sync: if true then block until the message is sent - - @type timeout: float - @param timeout: the time to wait for available capacity - """ - - if not self.session.connection._connected or self.session.closing: - raise Detached() - - self._ecwait(lambda: self.linked) - - if isinstance(object, Message): - message = object - else: - message = Message(object) - - if message.durable is None: - message.durable = self.durable - - if self.capacity is not UNLIMITED: - if self.capacity <= 0: - raise InsufficientCapacity("capacity = %s" % self.capacity) - if not self._ecwait(self.available, timeout=timeout): - raise InsufficientCapacity("capacity = %s" % self.capacity) - - # XXX: what if we send the same message to multiple senders? - message._sender = self - if self.capacity is not UNLIMITED: - message._sync = sync or self.available() <= int(ceil(self.threshold*self.capacity)) - else: - message._sync = sync - self.session.outgoing.append(message) - self.queued += 1 - - if sync: - self.sync() - assert message not in self.session.outgoing - else: - self._wakeup() - - @synchronized - def sync(self, timeout=None): - mno = self.queued - if self.synced < mno: - self.synced = mno - self._wakeup() - if not self._ewait(lambda: self.acked >= mno, timeout=timeout): - raise Timeout("sender sync timed out") - - @synchronized - def close(self, timeout=None): - """ - Close the Sender. - """ - # avoid erroring out when closing a sender that was never - # established - if self.acked < self.queued: - self.sync(timeout=timeout) - - if not self.closing: - self.closing = True - self._wakeup() - - try: - if not self.session._ewait(lambda: self.closed, timeout=timeout): - raise Timeout("sender close timed out") - finally: - try: - self.session.senders.remove(self) - except ValueError: - pass - -class Receiver(Endpoint, object): - - """ - Receives incoming messages from a remote source. Messages may be - fetched with L{fetch}. - """ - - def __init__(self, session, id, source, options): - self.session = session - self.id = id - self.source = source - self.options = options - - self.granted = Serial(0) - self.draining = False - self.impending = Serial(0) - self.received = Serial(0) - self.returned = Serial(0) - - self.error = None - self.linked = False - self.closing = False - self.closed = False - self._lock = self.session._lock - self._capacity = 0 - self._set_capacity(options.get("capacity", 0), False) - self.threshold = 0.5 - - @synchronized - def _set_capacity(self, c, wakeup=True): - if c is UNLIMITED: - self._capacity = c.value - else: - self._capacity = c - self._grant() - if wakeup: - self._wakeup() - - def _get_capacity(self): - if self._capacity == UNLIMITED.value: - return UNLIMITED - else: - return self._capacity - - capacity = property(_get_capacity, _set_capacity) - - def _wakeup(self): - self.session._wakeup() - - def check_error(self): - self.session.check_error() - if self.error: - raise self.error - - def get_error(self): - err = self.session.get_error() - if err: - return err - else: - return self.error - - def _ewait(self, predicate, timeout=None): - result = self.session._ewait(lambda: self.error or predicate(), timeout) - self.check_error() - return result - - def check_closed(self): - if self.closed: - raise LinkClosed() - - @synchronized - def unsettled(self): - """ - Returns the number of acknowledged messages awaiting confirmation. - """ - return len([m for m in self.acked if m._receiver is self]) - - @synchronized - def available(self): - """ - Returns the number of messages available to be fetched by the - application. - - @rtype: int - @return: the number of available messages - """ - return self.received - self.returned - - @synchronized - def fetch(self, timeout=None): - """ - Fetch and return a single message. A timeout of None will block - forever waiting for a message to arrive, a timeout of zero will - return immediately if no messages are available. - - @type timeout: float - @param timeout: the time to wait for a message to be available - """ - - self._ecwait(lambda: self.linked) - - if self._capacity == 0: - self.granted = self.returned + 1 - self._wakeup() - self._ecwait(lambda: self.impending >= self.granted) - msg = self.session._get(self, timeout=timeout) - if msg is None: - self.check_closed() - self.draining = True - self._wakeup() - self._ecwait(lambda: not self.draining) - msg = self.session._get(self, timeout=0) - self._grant() - self._wakeup() - if msg is None: - raise Empty() - elif self._capacity not in (0, UNLIMITED.value): - t = int(ceil(self.threshold * self._capacity)) - if self.received - self.returned <= t: - self.granted = self.returned + self._capacity - self._wakeup() - return msg - - def _grant(self): - if self._capacity == UNLIMITED.value: - self.granted = UNLIMITED - else: - self.granted = self.returned + self._capacity - - @synchronized - def close(self, timeout=None): - """ - Close the receiver. - """ - if not self.closing: - self.closing = True - self._wakeup() - - try: - if not self.session._ewait(lambda: self.closed, timeout=timeout): - raise Timeout("receiver close timed out") - finally: - try: - self.session.receivers.remove(self) - except ValueError: - pass - -__all__ = ["Connection", "Session", "Sender", "Receiver"] diff --git a/python/qpid/messaging/exceptions.py b/python/qpid/messaging/exceptions.py deleted file mode 100644 index 0296d615d9..0000000000 --- a/python/qpid/messaging/exceptions.py +++ /dev/null @@ -1,156 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -class Timeout(Exception): - pass - -## Messaging Errors - -class MessagingError(Exception): - - def __init__(self, code=None, text=None, **info): - self.code = code - self.text = text - self.info = info - if self.code is None: - msg = self.text - else: - msg = "%s(%s)" % (self.text, self.code) - if info: - msg += " " + ", ".join(["%s=%r" % (k, v) for k, v in self.info.items()]) - Exception.__init__(self, msg) - -class InternalError(MessagingError): - pass - -## Connection Errors - -class ConnectionError(MessagingError): - """ - The base class for all connection related exceptions. - """ - pass - -class ConnectError(ConnectionError): - """ - Exception raised when there is an error connecting to the remote - peer. - """ - pass - -class VersionError(ConnectError): - pass - -class AuthenticationFailure(ConnectError): - pass - -class ConnectionClosed(ConnectionError): - pass - -class HeartbeatTimeout(ConnectionError): - pass - -## Session Errors - -class SessionError(MessagingError): - pass - -class Detached(SessionError): - """ - Exception raised when an operation is attempted that is illegal when - detached. - """ - pass - -class NontransactionalSession(SessionError): - """ - Exception raised when commit or rollback is attempted on a non - transactional session. - """ - pass - -class TransactionError(SessionError): - pass - -class TransactionAborted(TransactionError): - pass - -class UnauthorizedAccess(SessionError): - pass - -class ServerError(SessionError): - pass - -class SessionClosed(SessionError): - pass - -## Link Errors - -class LinkError(MessagingError): - pass - -class InsufficientCapacity(LinkError): - pass - -class AddressError(LinkError): - pass - -class MalformedAddress(AddressError): - pass - -class InvalidOption(AddressError): - pass - -class ResolutionError(AddressError): - pass - -class AssertionFailed(ResolutionError): - pass - -class NotFound(ResolutionError): - pass - -class LinkClosed(LinkError): - pass - -## Sender Errors - -class SenderError(LinkError): - pass - -class SendError(SenderError): - pass - -class TargetCapacityExceeded(SendError): - pass - -## Receiver Errors - -class ReceiverError(LinkError): - pass - -class FetchError(ReceiverError): - pass - -class Empty(FetchError): - """ - Exception raised by L{Receiver.fetch} when there is no message - available within the alloted time. - """ - pass diff --git a/python/qpid/messaging/message.py b/python/qpid/messaging/message.py deleted file mode 100644 index b70b365c16..0000000000 --- a/python/qpid/messaging/message.py +++ /dev/null @@ -1,173 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from qpid.codec010 import StringCodec -from qpid.ops import PRIMITIVE - -def codec(name): - type = PRIMITIVE[name] - - def encode(x): - sc = StringCodec() - sc.write_primitive(type, x) - return sc.encoded - - def decode(x): - sc = StringCodec(x) - return sc.read_primitive(type) - - return encode, decode - -# XXX: need to correctly parse the mime type and deal with -# content-encoding header - -TYPE_MAPPINGS={ - dict: "amqp/map", - list: "amqp/list", - unicode: "text/plain; charset=utf8", - unicode: "text/plain", - buffer: None, - str: None, - None.__class__: None - } - -DEFAULT_CODEC = (lambda x: x, lambda x: x) - -def encode_text_plain(x): - if x is None: - return None - else: - return x.encode("utf8") - -def decode_text_plain(x): - if x is None: - return None - else: - return x.decode("utf8") - -TYPE_CODEC={ - "amqp/map": codec("map"), - "amqp/list": codec("list"), - "text/plain; charset=utf8": (encode_text_plain, decode_text_plain), - "text/plain": (encode_text_plain, decode_text_plain), - "": DEFAULT_CODEC, - None: DEFAULT_CODEC - } - -def get_type(content): - return TYPE_MAPPINGS[content.__class__] - -def get_codec(content_type): - return TYPE_CODEC.get(content_type, DEFAULT_CODEC) - -UNSPECIFIED = object() - -class Message: - - """ - A message consists of a standard set of fields, an application - defined set of properties, and some content. - - @type id: str - @ivar id: the message id - @type subject: str - @ivar subject: message subject - @type user_id: str - @ivar user_id: the user-id of the message producer - @type reply_to: str - @ivar reply_to: the address to send replies - @type correlation_id: str - @ivar correlation_id: a correlation-id for the message - @type durable: bool - @ivar durable: message durability - @type priority: int - @ivar priority: message priority - @type ttl: float - @ivar ttl: time-to-live measured in seconds - @type properties: dict - @ivar properties: application specific message properties - @type content_type: str - @ivar content_type: the content-type of the message - @type content: str, unicode, buffer, dict, list - @ivar content: the message content - """ - - def __init__(self, content=None, content_type=UNSPECIFIED, id=None, - subject=None, user_id=None, reply_to=None, correlation_id=None, - durable=None, priority=None, ttl=None, properties=None): - """ - Construct a new message with the supplied content. The - content-type of the message will be automatically inferred from - type of the content parameter. - - @type content: str, unicode, buffer, dict, list - @param content: the message content - - @type content_type: str - @param content_type: the content-type of the message - """ - self.id = id - self.subject = subject - self.user_id = user_id - self.reply_to = reply_to - self.correlation_id = correlation_id - self.durable = durable - self.priority = priority - self.ttl = ttl - self.redelivered = False - if properties is None: - self.properties = {} - else: - self.properties = properties - if content_type is UNSPECIFIED: - self.content_type = get_type(content) - else: - self.content_type = content_type - self.content = content - - def __repr__(self): - args = [] - for name in ["id", "subject", "user_id", "reply_to", "correlation_id", - "priority", "ttl"]: - value = self.__dict__[name] - if value is not None: args.append("%s=%r" % (name, value)) - for name in ["durable", "redelivered", "properties"]: - value = self.__dict__[name] - if value: args.append("%s=%r" % (name, value)) - if self.content_type != get_type(self.content): - args.append("content_type=%r" % self.content_type) - if self.content is not None: - if args: - args.append("content=%r" % self.content) - else: - args.append(repr(self.content)) - return "Message(%s)" % ", ".join(args) - -class Disposition: - - def __init__(self, type, **options): - self.type = type - self.options = options - - def __repr__(self): - args = [str(self.type)] + \ - ["%s=%r" % (k, v) for k, v in self.options.items()] - return "Disposition(%s)" % ", ".join(args) - -__all__ = ["Message", "Disposition"] diff --git a/python/qpid/messaging/transports.py b/python/qpid/messaging/transports.py deleted file mode 100644 index 7abaae12e8..0000000000 --- a/python/qpid/messaging/transports.py +++ /dev/null @@ -1,116 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import socket -from qpid.util import connect - -TRANSPORTS = {} - -class SocketTransport: - - def __init__(self, conn, host, port): - self.socket = connect(host, port) - if conn.tcp_nodelay: - self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - - def fileno(self): - return self.socket.fileno() - -class tcp(SocketTransport): - - def reading(self, reading): - return reading - - def writing(self, writing): - return writing - - def send(self, bytes): - return self.socket.send(bytes) - - def recv(self, n): - return self.socket.recv(n) - - def close(self): - self.socket.close() - -TRANSPORTS["tcp"] = tcp - -try: - from ssl import wrap_socket, SSLError, SSL_ERROR_WANT_READ, \ - SSL_ERROR_WANT_WRITE -except ImportError: - pass -else: - class tls(SocketTransport): - - def __init__(self, conn, host, port): - SocketTransport.__init__(self, conn, host, port) - self.tls = wrap_socket(self.socket) - self.socket.setblocking(0) - self.state = None - - def reading(self, reading): - if self.state is None: - return reading - else: - return self.state == SSL_ERROR_WANT_READ - - def writing(self, writing): - if self.state is None: - return writing - else: - return self.state == SSL_ERROR_WANT_WRITE - - def send(self, bytes): - self._clear_state() - try: - return self.tls.write(bytes) - except SSLError, e: - if self._update_state(e.args[0]): - return 0 - else: - raise - - def recv(self, n): - self._clear_state() - try: - return self.tls.read(n) - except SSLError, e: - if self._update_state(e.args[0]): - return None - else: - raise - - def _clear_state(self): - self.state = None - - def _update_state(self, code): - if code in (SSL_ERROR_WANT_READ, SSL_ERROR_WANT_WRITE): - self.state = code - return True - else: - return False - - def close(self): - self.socket.setblocking(1) - # this closes the underlying socket - self.tls.close() - - TRANSPORTS["ssl"] = tls - TRANSPORTS["tcp+tls"] = tls diff --git a/python/qpid/messaging/util.py b/python/qpid/messaging/util.py deleted file mode 100644 index 265cf7d51f..0000000000 --- a/python/qpid/messaging/util.py +++ /dev/null @@ -1,61 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -""" -Add-on utilities for the L{qpid.messaging} API. -""" - -from qpid.messaging import * -from logging import getLogger -from threading import Thread - -log = getLogger("qpid.messaging.util") - -def auto_fetch_reconnect_urls(conn): - ssn = conn.session("auto-fetch-reconnect-urls") - rcv = ssn.receiver("amq.failover") - rcv.capacity = 10 - - def main(): - while True: - try: - msg = rcv.fetch() - except LinkClosed: - return - set_reconnect_urls(conn, msg) - ssn.acknowledge(msg, sync=False) - - thread = Thread(name="auto-fetch-reconnect-urls", target=main) - thread.setDaemon(True) - thread.start() - - -def set_reconnect_urls(conn, msg): - reconnect_urls = [] - urls = msg.properties["amq.failover"] - for u in urls: - if u.startswith("amqp:"): - for p in u[5:].split(","): - parts = p.split(":") - host, port = parts[1:3] - reconnect_urls.append("%s:%s" % (host, port)) - conn.reconnect_urls = reconnect_urls - log.warn("set reconnect_urls for conn %s: %s", conn, reconnect_urls) - -__all__ = ["auto_fetch_reconnect_urls", "set_reconnect_urls"] |