diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
commit | 66765100f4257159622cefe57bed50125a5ad017 (patch) | |
tree | a88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /python/qpid/messaging/driver.py | |
parent | 1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff) | |
parent | 88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff) | |
download | qpid-python-rajith_jms_client.tar.gz |
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/messaging/driver.py')
-rw-r--r-- | python/qpid/messaging/driver.py | 1329 |
1 files changed, 0 insertions, 1329 deletions
diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py deleted file mode 100644 index 78af2827df..0000000000 --- a/python/qpid/messaging/driver.py +++ /dev/null @@ -1,1329 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import socket, struct, sys, time -from logging import getLogger, DEBUG -from qpid import compat -from qpid import sasl -from qpid.concurrency import synchronized -from qpid.datatypes import RangedSet, Serial -from qpid.framing import OpEncoder, SegmentEncoder, FrameEncoder, \ - FrameDecoder, SegmentDecoder, OpDecoder -from qpid.messaging import address, transports -from qpid.messaging.constants import UNLIMITED, REJECTED, RELEASED -from qpid.messaging.exceptions import * -from qpid.messaging.message import get_codec, Disposition, Message -from qpid.ops import * -from qpid.selector import Selector -from qpid.util import URL, default -from qpid.validator import And, Context, List, Map, Types, Values -from threading import Condition, Thread - -log = getLogger("qpid.messaging") -rawlog = getLogger("qpid.messaging.io.raw") -opslog = getLogger("qpid.messaging.io.ops") - -def addr2reply_to(addr): - name, subject, options = address.parse(addr) - if options: - type = options.get("node", {}).get("type") - else: - type = None - - if type == "topic": - return ReplyTo(name, subject) - else: - return ReplyTo(None, name) - -def reply_to2addr(reply_to): - if reply_to.exchange in (None, ""): - return reply_to.routing_key - elif reply_to.routing_key is None: - return "%s; {node: {type: topic}}" % reply_to.exchange - else: - return "%s/%s; {node: {type: topic}}" % (reply_to.exchange, reply_to.routing_key) - -class Attachment: - - def __init__(self, target): - self.target = target - -# XXX - -DURABLE_DEFAULT=False - -# XXX - -class Pattern: - """ - The pattern filter matches the supplied wildcard pattern against a - message subject. - """ - - def __init__(self, value): - self.value = value - - # XXX: this should become part of the driver - def _bind(self, sst, exchange, queue): - from qpid.ops import ExchangeBind - - sst.write_cmd(ExchangeBind(exchange=exchange, queue=queue, - binding_key=self.value.replace("*", "#"))) - -SUBJECT_DEFAULTS = { - "topic": "#" - } - -# XXX -ppid = 0 -try: - ppid = os.getppid() -except: - pass - -CLIENT_PROPERTIES = {"product": "qpid python client", - "version": "development", - "platform": os.name, - "qpid.client_process": os.path.basename(sys.argv[0]), - "qpid.client_pid": os.getpid(), - "qpid.client_ppid": ppid} - -def noop(): pass -def sync_noop(): pass - -class SessionState: - - def __init__(self, driver, session, name, channel): - self.driver = driver - self.session = session - self.name = name - self.channel = channel - self.detached = False - self.committing = False - self.aborting = False - - # sender state - self.sent = Serial(0) - self.acknowledged = RangedSet() - self.actions = {} - self.min_completion = self.sent - self.max_completion = self.sent - self.results = {} - self.need_sync = False - - # receiver state - self.received = None - self.executed = RangedSet() - - # XXX: need to periodically exchange completion/known_completion - - self.destinations = {} - - def write_query(self, query, handler): - id = self.sent - self.write_cmd(query, lambda: handler(self.results.pop(id))) - - def apply_overrides(self, cmd, overrides): - for k, v in overrides.items(): - cmd[k.replace('-', '_')] = v - - def write_cmd(self, cmd, action=noop, overrides=None, sync=True): - if overrides: - self.apply_overrides(cmd, overrides) - - if action != noop: - cmd.sync = sync - if self.detached: - raise Exception("detached") - cmd.id = self.sent - self.sent += 1 - self.actions[cmd.id] = action - self.max_completion = cmd.id - self.write_op(cmd) - self.need_sync = not cmd.sync - - def write_cmds(self, cmds, action=noop): - if cmds: - for cmd in cmds[:-1]: - self.write_cmd(cmd) - self.write_cmd(cmds[-1], action) - else: - action() - - def write_op(self, op): - op.channel = self.channel - self.driver.write_op(op) - -POLICIES = Values("always", "sender", "receiver", "never") -RELIABILITY = Values("unreliable", "at-most-once", "at-least-once", - "exactly-once") - -DECLARE = Map({}, restricted=False) -BINDINGS = List(Map({ - "exchange": Types(basestring), - "queue": Types(basestring), - "key": Types(basestring), - "arguments": Map({}, restricted=False) - })) - -COMMON_OPTS = { - "create": POLICIES, - "delete": POLICIES, - "assert": POLICIES, - "node": Map({ - "type": Values("queue", "topic"), - "durable": Types(bool), - "x-declare": DECLARE, - "x-bindings": BINDINGS - }), - "link": Map({ - "name": Types(basestring), - "durable": Types(bool), - "reliability": RELIABILITY, - "x-declare": DECLARE, - "x-bindings": BINDINGS, - "x-subscribe": Map({}, restricted=False) - }) - } - -RECEIVE_MODES = Values("browse", "consume") - -SOURCE_OPTS = COMMON_OPTS.copy() -SOURCE_OPTS.update({ - "mode": RECEIVE_MODES - }) - -TARGET_OPTS = COMMON_OPTS.copy() - -class LinkIn: - - ADDR_NAME = "source" - DIR_NAME = "receiver" - VALIDATOR = Map(SOURCE_OPTS) - - def init_link(self, sst, rcv, _rcv): - _rcv.destination = str(rcv.id) - sst.destinations[_rcv.destination] = _rcv - _rcv.draining = False - _rcv.bytes_open = False - _rcv.on_unlink = [] - - def do_link(self, sst, rcv, _rcv, type, subtype, action): - link_opts = _rcv.options.get("link", {}) - reliability = link_opts.get("reliability", "at-least-once") - declare = link_opts.get("x-declare", {}) - subscribe = link_opts.get("x-subscribe", {}) - acq_mode = acquire_mode.pre_acquired - if reliability in ("unreliable", "at-most-once"): - rcv._accept_mode = accept_mode.none - else: - rcv._accept_mode = accept_mode.explicit - - if type == "topic": - default_name = "%s.%s" % (rcv.session.name, _rcv.destination) - _rcv._queue = link_opts.get("name", default_name) - sst.write_cmd(QueueDeclare(queue=_rcv._queue, - durable=link_opts.get("durable", False), - exclusive=True, - auto_delete=(reliability == "unreliable")), - overrides=declare) - _rcv.on_unlink = [QueueDelete(_rcv._queue)] - subject = _rcv.subject or SUBJECT_DEFAULTS.get(subtype) - bindings = get_bindings(link_opts, _rcv._queue, _rcv.name, subject) - if not bindings: - sst.write_cmd(ExchangeBind(_rcv._queue, _rcv.name, subject)) - - elif type == "queue": - _rcv._queue = _rcv.name - if _rcv.options.get("mode", "consume") == "browse": - acq_mode = acquire_mode.not_acquired - bindings = get_bindings(link_opts, queue=_rcv._queue) - - - sst.write_cmds(bindings) - sst.write_cmd(MessageSubscribe(queue=_rcv._queue, - destination=_rcv.destination, - acquire_mode = acq_mode, - accept_mode = rcv._accept_mode), - overrides=subscribe) - sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit), action) - - def do_unlink(self, sst, rcv, _rcv, action=noop): - link_opts = _rcv.options.get("link", {}) - reliability = link_opts.get("reliability") - cmds = [MessageCancel(_rcv.destination)] - cmds.extend(_rcv.on_unlink) - sst.write_cmds(cmds, action) - - def del_link(self, sst, rcv, _rcv): - del sst.destinations[_rcv.destination] - -class LinkOut: - - ADDR_NAME = "target" - DIR_NAME = "sender" - VALIDATOR = Map(TARGET_OPTS) - - def init_link(self, sst, snd, _snd): - _snd.closing = False - _snd.pre_ack = False - - def do_link(self, sst, snd, _snd, type, subtype, action): - link_opts = _snd.options.get("link", {}) - reliability = link_opts.get("reliability", "at-least-once") - _snd.pre_ack = reliability in ("unreliable", "at-most-once") - if type == "topic": - _snd._exchange = _snd.name - _snd._routing_key = _snd.subject - bindings = get_bindings(link_opts, exchange=_snd.name, key=_snd.subject) - elif type == "queue": - _snd._exchange = "" - _snd._routing_key = _snd.name - bindings = get_bindings(link_opts, queue=_snd.name) - sst.write_cmds(bindings, action) - - def do_unlink(self, sst, snd, _snd, action=noop): - action() - - def del_link(self, sst, snd, _snd): - pass - -class Cache: - - def __init__(self, ttl): - self.ttl = ttl - self.entries = {} - - def __setitem__(self, key, value): - self.entries[key] = time.time(), value - - def __getitem__(self, key): - tstamp, value = self.entries[key] - if time.time() - tstamp >= self.ttl: - del self.entries[key] - raise KeyError(key) - else: - return value - - def __delitem__(self, key): - del self.entries[key] - -# XXX -HEADER="!4s4B" - -EMPTY_DP = DeliveryProperties() -EMPTY_MP = MessageProperties() - -SUBJECT = "qpid.subject" - -CLOSED = "CLOSED" -READ_ONLY = "READ_ONLY" -WRITE_ONLY = "WRITE_ONLY" -OPEN = "OPEN" - -class Driver: - - def __init__(self, connection): - self.connection = connection - self.log_id = "%x" % id(self.connection) - self._lock = self.connection._lock - - self._selector = Selector.default() - self._attempts = 0 - self._delay = self.connection.reconnect_interval_min - self._reconnect_log = self.connection.reconnect_log - self._host = 0 - self._retrying = False - self._next_retry = None - self._transport = None - - self._timeout = None - - self.engine = None - - def _next_host(self): - urls = [URL(u) for u in self.connection.reconnect_urls] - hosts = [(self.connection.host, default(self.connection.port, 5672))] + \ - [(u.host, default(u.port, 5672)) for u in urls] - if self._host >= len(hosts): - self._host = 0 - result = hosts[self._host] - if self._host == 0: - self._attempts += 1 - self._host = self._host + 1 - return result - - def _num_hosts(self): - return len(self.connection.reconnect_urls) + 1 - - @synchronized - def wakeup(self): - self.dispatch() - self._selector.wakeup() - - def start(self): - self._selector.register(self) - - def stop(self): - self._selector.unregister(self) - if self._transport: - self.st_closed() - - def fileno(self): - return self._transport.fileno() - - @synchronized - def reading(self): - return self._transport is not None and \ - self._transport.reading(True) - - @synchronized - def writing(self): - return self._transport is not None and \ - self._transport.writing(self.engine.pending()) - - @synchronized - def timing(self): - return self._timeout - - @synchronized - def readable(self): - try: - data = self._transport.recv(64*1024) - if data is None: - return - elif data: - rawlog.debug("READ[%s]: %r", self.log_id, data) - self.engine.write(data) - else: - self.close_engine() - except socket.error, e: - self.close_engine(ConnectionError(text=str(e))) - - self.update_status() - - self._notify() - - def _notify(self): - if self.connection.error: - self.connection._condition.gc() - self.connection._waiter.notifyAll() - - def close_engine(self, e=None): - if e is None: - e = ConnectionError(text="connection aborted") - - if (self.connection.reconnect and - (self.connection.reconnect_limit is None or - self.connection.reconnect_limit <= 0 or - self._attempts <= self.connection.reconnect_limit)): - if self._host < self._num_hosts(): - delay = 0 - else: - delay = self._delay - self._delay = min(2*self._delay, - self.connection.reconnect_interval_max) - self._next_retry = time.time() + delay - if self._reconnect_log: - log.warn("recoverable error[attempt %s]: %s" % (self._attempts, e)) - if delay > 0: - log.warn("sleeping %s seconds" % delay) - self._retrying = True - self.engine.close() - else: - self.engine.close(e) - - self.schedule() - - def update_status(self): - status = self.engine.status() - return getattr(self, "st_%s" % status.lower())() - - def st_closed(self): - # XXX: this log statement seems to sometimes hit when the socket is not connected - # XXX: rawlog.debug("CLOSE[%s]: %s", self.log_id, self._socket.getpeername()) - self._transport.close() - self._transport = None - self.engine = None - return True - - def st_open(self): - return False - - @synchronized - def writeable(self): - notify = False - try: - n = self._transport.send(self.engine.peek()) - if n == 0: return - sent = self.engine.read(n) - rawlog.debug("SENT[%s]: %r", self.log_id, sent) - except socket.error, e: - self.close_engine(e) - notify = True - - if self.update_status() or notify: - self._notify() - - @synchronized - def timeout(self): - self.dispatch() - self._notify() - self.schedule() - - def schedule(self): - times = [] - if self.connection.heartbeat: - times.append(time.time() + self.connection.heartbeat) - if self._next_retry: - times.append(self._next_retry) - if times: - self._timeout = min(times) - else: - self._timeout = None - - def dispatch(self): - try: - if self._transport is None: - if self.connection._connected and not self.connection.error: - self.connect() - else: - self.engine.dispatch() - except HeartbeatTimeout, e: - self.close_engine(e) - except: - # XXX: Does socket get leaked if this occurs? - msg = compat.format_exc() - self.connection.error = InternalError(text=msg) - - def connect(self): - if self._retrying and time.time() < self._next_retry: - return - - try: - # XXX: should make this non blocking - host, port = self._next_host() - if self._retrying and self._reconnect_log: - log.warn("trying: %s:%s", host, port) - self.engine = Engine(self.connection) - self.engine.open() - rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port) - trans = transports.TRANSPORTS.get(self.connection.transport) - if trans: - self._transport = trans(self.connection, host, port) - else: - raise ConnectError("no such transport: %s" % self.connection.transport) - if self._retrying and self._reconnect_log: - log.warn("reconnect succeeded: %s:%s", host, port) - self._next_retry = None - self._attempts = 0 - self._host = 0 - self._delay = self.connection.reconnect_interval_min - self._retrying = False - self.schedule() - except socket.error, e: - self.close_engine(ConnectError(text=str(e))) - -DEFAULT_DISPOSITION = Disposition(None) - -def get_bindings(opts, queue=None, exchange=None, key=None): - bindings = opts.get("x-bindings", []) - cmds = [] - for b in bindings: - exchange = b.get("exchange", exchange) - queue = b.get("queue", queue) - key = b.get("key", key) - args = b.get("arguments", {}) - cmds.append(ExchangeBind(queue, exchange, key, args)) - return cmds - -CONNECTION_ERRS = { - # anythong not here (i.e. everything right now) will default to - # connection error - } - -SESSION_ERRS = { - # anything not here will default to session error - error_code.unauthorized_access: UnauthorizedAccess, - error_code.not_found: NotFound, - error_code.resource_locked: ReceiverError, - error_code.resource_limit_exceeded: TargetCapacityExceeded, - error_code.internal_error: ServerError - } - -class Engine: - - def __init__(self, connection): - self.connection = connection - self.log_id = "%x" % id(self.connection) - self._closing = False - self._connected = False - self._attachments = {} - - self._in = LinkIn() - self._out = LinkOut() - - self._channel_max = 65536 - self._channels = 0 - self._sessions = {} - - self.address_cache = Cache(self.connection.address_ttl) - - self._status = CLOSED - self._buf = "" - self._hdr = "" - self._last_in = None - self._last_out = None - self._op_enc = OpEncoder() - self._seg_enc = SegmentEncoder() - self._frame_enc = FrameEncoder() - self._frame_dec = FrameDecoder() - self._seg_dec = SegmentDecoder() - self._op_dec = OpDecoder() - - self._sasl = sasl.Client() - if self.connection.username: - self._sasl.setAttr("username", self.connection.username) - if self.connection.password: - self._sasl.setAttr("password", self.connection.password) - if self.connection.host: - self._sasl.setAttr("host", self.connection.host) - self._sasl.setAttr("service", self.connection.sasl_service) - if self.connection.sasl_min_ssf is not None: - self._sasl.setAttr("minssf", self.connection.sasl_min_ssf) - if self.connection.sasl_max_ssf is not None: - self._sasl.setAttr("maxssf", self.connection.sasl_max_ssf) - self._sasl.init() - self._sasl_encode = False - self._sasl_decode = False - - def _reset(self): - self.connection._transport_connected = False - - for ssn in self.connection.sessions.values(): - for m in ssn.acked + ssn.unacked + ssn.incoming: - m._transfer_id = None - for snd in ssn.senders: - snd.linked = False - for rcv in ssn.receivers: - rcv.impending = rcv.received - rcv.linked = False - - def status(self): - return self._status - - def write(self, data): - self._last_in = time.time() - try: - if self._sasl_decode: - data = self._sasl.decode(data) - - if len(self._hdr) < 8: - r = 8 - len(self._hdr) - self._hdr += data[:r] - data = data[r:] - - if len(self._hdr) == 8: - self.do_header(self._hdr) - - self._frame_dec.write(data) - self._seg_dec.write(*self._frame_dec.read()) - self._op_dec.write(*self._seg_dec.read()) - for op in self._op_dec.read(): - self.assign_id(op) - opslog.debug("RCVD[%s]: %r", self.log_id, op) - op.dispatch(self) - self.dispatch() - except MessagingError, e: - self.close(e) - except: - self.close(InternalError(text=compat.format_exc())) - - def close(self, e=None): - self._reset() - if e: - self.connection.error = e - self._status = CLOSED - - def assign_id(self, op): - if isinstance(op, Command): - sst = self.get_sst(op) - op.id = sst.received - sst.received += 1 - - def pending(self): - return len(self._buf) - - def read(self, n): - result = self._buf[:n] - self._buf = self._buf[n:] - return result - - def peek(self): - return self._buf - - def write_op(self, op): - opslog.debug("SENT[%s]: %r", self.log_id, op) - self._op_enc.write(op) - self._seg_enc.write(*self._op_enc.read()) - self._frame_enc.write(*self._seg_enc.read()) - bytes = self._frame_enc.read() - if self._sasl_encode: - bytes = self._sasl.encode(bytes) - self._buf += bytes - self._last_out = time.time() - - def do_header(self, hdr): - cli_major = 0; cli_minor = 10 - magic, _, _, major, minor = struct.unpack(HEADER, hdr) - if major != cli_major or minor != cli_minor: - raise VersionError(text="client: %s-%s, server: %s-%s" % - (cli_major, cli_minor, major, minor)) - - def do_connection_start(self, start): - if self.connection.sasl_mechanisms: - permitted = self.connection.sasl_mechanisms.split() - mechs = [m for m in start.mechanisms if m in permitted] - else: - mechs = start.mechanisms - try: - mech, initial = self._sasl.start(" ".join(mechs)) - except sasl.SASLError, e: - raise AuthenticationFailure(text=str(e)) - self.write_op(ConnectionStartOk(client_properties=CLIENT_PROPERTIES, - mechanism=mech, response=initial)) - - def do_connection_secure(self, secure): - resp = self._sasl.step(secure.challenge) - self.write_op(ConnectionSecureOk(response=resp)) - - def do_connection_tune(self, tune): - # XXX: is heartbeat protocol specific? - if tune.channel_max is not None: - self.channel_max = tune.channel_max - self.write_op(ConnectionTuneOk(heartbeat=self.connection.heartbeat, - channel_max=self.channel_max)) - self.write_op(ConnectionOpen()) - self._sasl_encode = True - - def do_connection_open_ok(self, open_ok): - self.connection.auth_username = self._sasl.auth_username() - self._connected = True - self._sasl_decode = True - self.connection._transport_connected = True - - def do_connection_heartbeat(self, hrt): - pass - - def do_connection_close(self, close): - self.write_op(ConnectionCloseOk()) - if close.reply_code != close_code.normal: - exc = CONNECTION_ERRS.get(close.reply_code, ConnectionError) - self.connection.error = exc(close.reply_code, close.reply_text) - # XXX: should we do a half shutdown on the socket here? - # XXX: we really need to test this, we may end up reporting a - # connection abort after this, if we were to do a shutdown on read - # and stop reading, then we wouldn't report the abort, that's - # probably the right thing to do - - def do_connection_close_ok(self, close_ok): - self.close() - - def do_session_attached(self, atc): - pass - - def do_session_command_point(self, cp): - sst = self.get_sst(cp) - sst.received = cp.command_id - - def do_session_completed(self, sc): - sst = self.get_sst(sc) - for r in sc.commands: - sst.acknowledged.add(r.lower, r.upper) - - if not sc.commands.empty(): - while sst.min_completion in sc.commands: - if sst.actions.has_key(sst.min_completion): - sst.actions.pop(sst.min_completion)() - sst.min_completion += 1 - - def session_known_completed(self, kcmp): - sst = self.get_sst(kcmp) - executed = RangedSet() - for e in sst.executed.ranges: - for ke in kcmp.ranges: - if e.lower in ke and e.upper in ke: - break - else: - executed.add_range(e) - sst.executed = completed - - def do_session_flush(self, sf): - sst = self.get_sst(sf) - if sf.expected: - if sst.received is None: - exp = None - else: - exp = RangedSet(sst.received) - sst.write_op(SessionExpected(exp)) - if sf.confirmed: - sst.write_op(SessionConfirmed(sst.executed)) - if sf.completed: - sst.write_op(SessionCompleted(sst.executed)) - - def do_session_request_timeout(self, rt): - sst = self.get_sst(rt) - sst.write_op(SessionTimeout(timeout=0)) - - def do_execution_result(self, er): - sst = self.get_sst(er) - sst.results[er.command_id] = er.value - sst.executed.add(er.id) - - def do_execution_exception(self, ex): - sst = self.get_sst(ex) - exc = SESSION_ERRS.get(ex.error_code, SessionError) - sst.session.error = exc(ex.error_code, ex.description) - - def dispatch(self): - if not self.connection._connected and not self._closing and self._status != CLOSED: - self.disconnect() - - if self._connected and not self._closing: - for ssn in self.connection.sessions.values(): - self.attach(ssn) - self.process(ssn) - - if self.connection.heartbeat and self._status != CLOSED: - now = time.time() - if self._last_in is not None and \ - now - self._last_in > 2*self.connection.heartbeat: - raise HeartbeatTimeout(text="heartbeat timeout") - if self._last_out is None or now - self._last_out >= self.connection.heartbeat/2.0: - self.write_op(ConnectionHeartbeat()) - - def open(self): - self._reset() - self._status = OPEN - self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10) - - def disconnect(self): - self.write_op(ConnectionClose(close_code.normal)) - self._closing = True - - def attach(self, ssn): - sst = self._attachments.get(ssn) - if sst is None and not ssn.closed: - for i in xrange(0, self.channel_max): - if not self._sessions.has_key(i): - ch = i - break - else: - raise RuntimeError("all channels used") - sst = SessionState(self, ssn, ssn.name, ch) - sst.write_op(SessionAttach(name=ssn.name)) - sst.write_op(SessionCommandPoint(sst.sent, 0)) - sst.outgoing_idx = 0 - sst.acked = [] - sst.acked_idx = 0 - if ssn.transactional: - sst.write_cmd(TxSelect()) - self._attachments[ssn] = sst - self._sessions[sst.channel] = sst - - for snd in ssn.senders: - self.link(snd, self._out, snd.target) - for rcv in ssn.receivers: - self.link(rcv, self._in, rcv.source) - - if sst is not None and ssn.closing and not sst.detached: - sst.detached = True - sst.write_op(SessionDetach(name=ssn.name)) - - def get_sst(self, op): - return self._sessions[op.channel] - - def do_session_detached(self, dtc): - sst = self._sessions.pop(dtc.channel) - ssn = sst.session - del self._attachments[ssn] - ssn.closed = True - - def do_session_detach(self, dtc): - sst = self.get_sst(dtc) - sst.write_op(SessionDetached(name=dtc.name)) - self.do_session_detached(dtc) - - def link(self, lnk, dir, addr): - sst = self._attachments.get(lnk.session) - _lnk = self._attachments.get(lnk) - - if _lnk is None and not lnk.closed: - _lnk = Attachment(lnk) - _lnk.closing = False - dir.init_link(sst, lnk, _lnk) - - err = self.parse_address(_lnk, dir, addr) or self.validate_options(_lnk, dir) - if err: - lnk.error = err - lnk.closed = True - return - - def linked(): - lnk.linked = True - - def resolved(type, subtype): - dir.do_link(sst, lnk, _lnk, type, subtype, linked) - - self.resolve_declare(sst, _lnk, dir.DIR_NAME, resolved) - self._attachments[lnk] = _lnk - - if lnk.linked and lnk.closing and not lnk.closed: - if not _lnk.closing: - def unlinked(): - dir.del_link(sst, lnk, _lnk) - del self._attachments[lnk] - lnk.closed = True - if _lnk.options.get("delete") in ("always", dir.DIR_NAME): - dir.do_unlink(sst, lnk, _lnk) - self.delete(sst, _lnk.name, unlinked) - else: - dir.do_unlink(sst, lnk, _lnk, unlinked) - _lnk.closing = True - elif not lnk.linked and lnk.closing and not lnk.closed: - if lnk.error: lnk.closed = True - - def parse_address(self, lnk, dir, addr): - if addr is None: - return MalformedAddress(text="%s is None" % dir.ADDR_NAME) - else: - try: - lnk.name, lnk.subject, lnk.options = address.parse(addr) - # XXX: subject - if lnk.options is None: - lnk.options = {} - except address.LexError, e: - return MalformedAddress(text=str(e)) - except address.ParseError, e: - return MalformedAddress(text=str(e)) - - def validate_options(self, lnk, dir): - ctx = Context() - err = dir.VALIDATOR.validate(lnk.options, ctx) - if err: return InvalidOption(text="error in options: %s" % err) - - def resolve_declare(self, sst, lnk, dir, action): - declare = lnk.options.get("create") in ("always", dir) - assrt = lnk.options.get("assert") in ("always", dir) - def do_resolved(type, subtype): - err = None - if type is None: - if declare: - err = self.declare(sst, lnk, action) - else: - err = NotFound(text="no such queue: %s" % lnk.name) - else: - if assrt: - expected = lnk.options.get("node", {}).get("type") - if expected and type != expected: - err = AssertionFailed(text="expected %s, got %s" % (expected, type)) - if err is None: - action(type, subtype) - - if err: - tgt = lnk.target - tgt.error = err - del self._attachments[tgt] - tgt.closed = True - return - self.resolve(sst, lnk.name, do_resolved, force=declare) - - def resolve(self, sst, name, action, force=False): - if not force: - try: - type, subtype = self.address_cache[name] - action(type, subtype) - return - except KeyError: - pass - - args = [] - def do_result(r): - args.append(r) - def do_action(r): - do_result(r) - er, qr = args - if er.not_found and not qr.queue: - type, subtype = None, None - elif qr.queue: - type, subtype = "queue", None - else: - type, subtype = "topic", er.type - if type is not None: - self.address_cache[name] = (type, subtype) - action(type, subtype) - sst.write_query(ExchangeQuery(name), do_result) - sst.write_query(QueueQuery(name), do_action) - - def declare(self, sst, lnk, action): - name = lnk.name - props = lnk.options.get("node", {}) - durable = props.get("durable", DURABLE_DEFAULT) - type = props.get("type", "queue") - declare = props.get("x-declare", {}) - - if type == "topic": - cmd = ExchangeDeclare(exchange=name, durable=durable) - bindings = get_bindings(props, exchange=name) - elif type == "queue": - cmd = QueueDeclare(queue=name, durable=durable) - bindings = get_bindings(props, queue=name) - else: - raise ValueError(type) - - sst.apply_overrides(cmd, declare) - - if type == "topic": - if cmd.type is None: - cmd.type = "topic" - subtype = cmd.type - else: - subtype = None - - cmds = [cmd] - cmds.extend(bindings) - - def declared(): - self.address_cache[name] = (type, subtype) - action(type, subtype) - - sst.write_cmds(cmds, declared) - - def delete(self, sst, name, action): - def deleted(): - del self.address_cache[name] - action() - - def do_delete(type, subtype): - if type == "topic": - sst.write_cmd(ExchangeDelete(name), deleted) - elif type == "queue": - sst.write_cmd(QueueDelete(name), deleted) - elif type is None: - action() - else: - raise ValueError(type) - self.resolve(sst, name, do_delete, force=True) - - def process(self, ssn): - if ssn.closed or ssn.closing: return - - sst = self._attachments[ssn] - - while sst.outgoing_idx < len(ssn.outgoing): - msg = ssn.outgoing[sst.outgoing_idx] - snd = msg._sender - # XXX: should check for sender error here - _snd = self._attachments.get(snd) - if _snd and snd.linked: - self.send(snd, msg) - sst.outgoing_idx += 1 - else: - break - - for snd in ssn.senders: - # XXX: should included snd.acked in this - if snd.synced >= snd.queued and sst.need_sync: - sst.write_cmd(ExecutionSync(), sync_noop) - - for rcv in ssn.receivers: - self.process_receiver(rcv) - - if ssn.acked: - messages = ssn.acked[sst.acked_idx:] - if messages: - ids = RangedSet() - - disposed = [(DEFAULT_DISPOSITION, [])] - acked = [] - for m in messages: - # XXX: we're ignoring acks that get lost when disconnected, - # could we deal this via some message-id based purge? - if m._transfer_id is None: - acked.append(m) - continue - ids.add(m._transfer_id) - if m._receiver._accept_mode is accept_mode.explicit: - disp = m._disposition or DEFAULT_DISPOSITION - last, msgs = disposed[-1] - if disp.type is last.type and disp.options == last.options: - msgs.append(m) - else: - disposed.append((disp, [m])) - else: - acked.append(m) - - for range in ids: - sst.executed.add_range(range) - sst.write_op(SessionCompleted(sst.executed)) - - def ack_acker(msgs): - def ack_ack(): - for m in msgs: - ssn.acked.remove(m) - sst.acked_idx -= 1 - # XXX: should this check accept_mode too? - if not ssn.transactional: - sst.acked.remove(m) - return ack_ack - - for disp, msgs in disposed: - if not msgs: continue - if disp.type is None: - op = MessageAccept - elif disp.type is RELEASED: - op = MessageRelease - elif disp.type is REJECTED: - op = MessageReject - sst.write_cmd(op(RangedSet(*[m._transfer_id for m in msgs]), - **disp.options), - ack_acker(msgs)) - if log.isEnabledFor(DEBUG): - for m in msgs: - log.debug("SACK[%s]: %s, %s", ssn.log_id, m, m._disposition) - - sst.acked.extend(messages) - sst.acked_idx += len(messages) - ack_acker(acked)() - - if ssn.committing and not sst.committing: - def commit_ok(): - del sst.acked[:] - ssn.committing = False - ssn.committed = True - ssn.aborting = False - ssn.aborted = False - sst.committing = False - sst.write_cmd(TxCommit(), commit_ok) - sst.committing = True - - if ssn.aborting and not sst.aborting: - sst.aborting = True - def do_rb(): - messages = sst.acked + ssn.unacked + ssn.incoming - ids = RangedSet(*[m._transfer_id for m in messages]) - for range in ids: - sst.executed.add_range(range) - sst.write_op(SessionCompleted(sst.executed)) - sst.write_cmd(MessageRelease(ids, True)) - sst.write_cmd(TxRollback(), do_rb_ok) - - def do_rb_ok(): - del ssn.incoming[:] - del ssn.unacked[:] - del sst.acked[:] - - for rcv in ssn.receivers: - rcv.impending = rcv.received - rcv.returned = rcv.received - # XXX: do we need to update granted here as well? - - for rcv in ssn.receivers: - self.process_receiver(rcv) - - ssn.aborting = False - ssn.aborted = True - ssn.committing = False - ssn.committed = False - sst.aborting = False - - for rcv in ssn.receivers: - _rcv = self._attachments[rcv] - sst.write_cmd(MessageStop(_rcv.destination)) - sst.write_cmd(ExecutionSync(), do_rb) - - def grant(self, rcv): - sst = self._attachments[rcv.session] - _rcv = self._attachments.get(rcv) - if _rcv is None or not rcv.linked or _rcv.closing or _rcv.draining: - return - - if rcv.granted is UNLIMITED: - if rcv.impending is UNLIMITED: - delta = 0 - else: - delta = UNLIMITED - elif rcv.impending is UNLIMITED: - delta = -1 - else: - delta = max(rcv.granted, rcv.received) - rcv.impending - - if delta is UNLIMITED: - if not _rcv.bytes_open: - sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value)) - _rcv.bytes_open = True - sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, UNLIMITED.value)) - rcv.impending = UNLIMITED - elif delta > 0: - if not _rcv.bytes_open: - sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value)) - _rcv.bytes_open = True - sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, delta)) - rcv.impending += delta - elif delta < 0 and not rcv.draining: - _rcv.draining = True - def do_stop(): - rcv.impending = rcv.received - _rcv.draining = False - _rcv.bytes_open = False - self.grant(rcv) - sst.write_cmd(MessageStop(_rcv.destination), do_stop) - - if rcv.draining: - _rcv.draining = True - def do_flush(): - rcv.impending = rcv.received - rcv.granted = rcv.impending - _rcv.draining = False - _rcv.bytes_open = False - rcv.draining = False - sst.write_cmd(MessageFlush(_rcv.destination), do_flush) - - - def process_receiver(self, rcv): - if rcv.closed: return - self.grant(rcv) - - def send(self, snd, msg): - sst = self._attachments[snd.session] - _snd = self._attachments[snd] - - if msg.subject is None or _snd._exchange == "": - rk = _snd._routing_key - else: - rk = msg.subject - - if msg.subject is None: - subject = _snd.subject - else: - subject = msg.subject - - # XXX: do we need to query to figure out how to create the reply-to interoperably? - if msg.reply_to: - rt = addr2reply_to(msg.reply_to) - else: - rt = None - content_encoding = msg.properties.get("x-amqp-0-10.content-encoding") - dp = DeliveryProperties(routing_key=rk) - mp = MessageProperties(message_id=msg.id, - user_id=msg.user_id, - reply_to=rt, - correlation_id=msg.correlation_id, - app_id = msg.properties.get("x-amqp-0-10.app-id"), - content_type=msg.content_type, - content_encoding=content_encoding, - application_headers=msg.properties) - if subject is not None: - if mp.application_headers is None: - mp.application_headers = {} - mp.application_headers[SUBJECT] = subject - if msg.durable is not None: - if msg.durable: - dp.delivery_mode = delivery_mode.persistent - else: - dp.delivery_mode = delivery_mode.non_persistent - if msg.priority is not None: - dp.priority = msg.priority - if msg.ttl is not None: - dp.ttl = long(msg.ttl*1000) - enc, dec = get_codec(msg.content_type) - body = enc(msg.content) - - # XXX: this is not safe for out of order, can this be triggered by pre_ack? - def msg_acked(): - # XXX: should we log the ack somehow too? - snd.acked += 1 - m = snd.session.outgoing.pop(0) - sst.outgoing_idx -= 1 - log.debug("RACK[%s]: %s", sst.session.log_id, msg) - assert msg == m - - xfr = MessageTransfer(destination=_snd._exchange, headers=(dp, mp), - payload=body) - - if _snd.pre_ack: - sst.write_cmd(xfr) - else: - sst.write_cmd(xfr, msg_acked, sync=msg._sync) - - log.debug("SENT[%s]: %s", sst.session.log_id, msg) - - if _snd.pre_ack: - msg_acked() - - def do_message_transfer(self, xfr): - sst = self.get_sst(xfr) - ssn = sst.session - - msg = self._decode(xfr) - rcv = sst.destinations[xfr.destination].target - msg._receiver = rcv - if rcv.impending is not UNLIMITED: - assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending) - rcv.received += 1 - log.debug("RCVD[%s]: %s", ssn.log_id, msg) - ssn.incoming.append(msg) - - def _decode(self, xfr): - dp = EMPTY_DP - mp = EMPTY_MP - - for h in xfr.headers: - if isinstance(h, DeliveryProperties): - dp = h - elif isinstance(h, MessageProperties): - mp = h - - ap = mp.application_headers - enc, dec = get_codec(mp.content_type) - content = dec(xfr.payload) - msg = Message(content) - msg.id = mp.message_id - if ap is not None: - msg.subject = ap.get(SUBJECT) - msg.user_id = mp.user_id - if mp.reply_to is not None: - msg.reply_to = reply_to2addr(mp.reply_to) - msg.correlation_id = mp.correlation_id - if dp.delivery_mode is not None: - msg.durable = dp.delivery_mode == delivery_mode.persistent - msg.priority = dp.priority - if dp.ttl is not None: - msg.ttl = dp.ttl/1000.0 - msg.redelivered = dp.redelivered - msg.properties = mp.application_headers or {} - if mp.app_id is not None: - msg.properties["x-amqp-0-10.app-id"] = mp.app_id - if mp.content_encoding is not None: - msg.properties["x-amqp-0-10.content-encoding"] = mp.content_encoding - if dp.routing_key is not None: - msg.properties["x-amqp-0-10.routing-key"] = dp.routing_key - msg.content_type = mp.content_type - msg._transfer_id = xfr.id - return msg |