# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # import socket, struct, sys, time from logging import getLogger, DEBUG from qpid import compat from qpid import sasl from qpid.concurrency import synchronized from qpid.datatypes import RangedSet, Serial from qpid.framing import OpEncoder, SegmentEncoder, FrameEncoder, \ FrameDecoder, SegmentDecoder, OpDecoder from qpid.messaging import address, transports from qpid.messaging.constants import UNLIMITED, REJECTED, RELEASED from qpid.messaging.exceptions import * from qpid.messaging.message import get_codec, Disposition, Message from qpid.ops import * from qpid.selector import Selector from qpid.util import URL, default from qpid.validator import And, Context, List, Map, Types, Values from threading import Condition, Thread log = getLogger("qpid.messaging") rawlog = getLogger("qpid.messaging.io.raw") opslog = getLogger("qpid.messaging.io.ops") def addr2reply_to(addr): name, subject, options = address.parse(addr) if options: type = options.get("node", {}).get("type") else: type = None if type == "topic": return ReplyTo(name, subject) else: return ReplyTo(None, name) def reply_to2addr(reply_to): if reply_to.exchange in (None, ""): return reply_to.routing_key elif reply_to.routing_key is None: return "%s; {node: {type: topic}}" % reply_to.exchange else: return "%s/%s; {node: {type: topic}}" % (reply_to.exchange, reply_to.routing_key) class Attachment: def __init__(self, target): self.target = target # XXX DURABLE_DEFAULT=False # XXX class Pattern: """ The pattern filter matches the supplied wildcard pattern against a message subject. """ def __init__(self, value): self.value = value # XXX: this should become part of the driver def _bind(self, sst, exchange, queue): from qpid.ops import ExchangeBind sst.write_cmd(ExchangeBind(exchange=exchange, queue=queue, binding_key=self.value.replace("*", "#"))) SUBJECT_DEFAULTS = { "topic": "#" } # XXX ppid = 0 try: ppid = os.getppid() except: pass CLIENT_PROPERTIES = {"product": "qpid python client", "version": "development", "platform": os.name, "qpid.client_process": os.path.basename(sys.argv[0]), "qpid.client_pid": os.getpid(), "qpid.client_ppid": ppid} def noop(): pass def sync_noop(): pass class SessionState: def __init__(self, driver, session, name, channel): self.driver = driver self.session = session self.name = name self.channel = channel self.detached = False self.committing = False self.aborting = False # sender state self.sent = Serial(0) self.acknowledged = RangedSet() self.actions = {} self.min_completion = self.sent self.max_completion = self.sent self.results = {} self.need_sync = False # receiver state self.received = None self.executed = RangedSet() # XXX: need to periodically exchange completion/known_completion self.destinations = {} def write_query(self, query, handler): id = self.sent self.write_cmd(query, lambda: handler(self.results.pop(id))) def apply_overrides(self, cmd, overrides): for k, v in overrides.items(): cmd[k.replace('-', '_')] = v def write_cmd(self, cmd, action=noop, overrides=None, sync=True): if overrides: self.apply_overrides(cmd, overrides) if action != noop: cmd.sync = sync if self.detached: raise Exception("detached") cmd.id = self.sent self.sent += 1 self.actions[cmd.id] = action self.max_completion = cmd.id self.write_op(cmd) self.need_sync = not cmd.sync def write_cmds(self, cmds, action=noop): if cmds: for cmd in cmds[:-1]: self.write_cmd(cmd) self.write_cmd(cmds[-1], action) else: action() def write_op(self, op): op.channel = self.channel self.driver.write_op(op) POLICIES = Values("always", "sender", "receiver", "never") RELIABILITY = Values("unreliable", "at-most-once", "at-least-once", "exactly-once") DECLARE = Map({}, restricted=False) BINDINGS = List(Map({ "exchange": Types(basestring), "queue": Types(basestring), "key": Types(basestring), "arguments": Map({}, restricted=False) })) COMMON_OPTS = { "create": POLICIES, "delete": POLICIES, "assert": POLICIES, "node": Map({ "type": Values("queue", "topic"), "durable": Types(bool), "x-declare": DECLARE, "x-bindings": BINDINGS }), "link": Map({ "name": Types(basestring), "durable": Types(bool), "reliability": RELIABILITY, "x-declare": DECLARE, "x-bindings": BINDINGS, "x-subscribe": Map({}, restricted=False) }) } RECEIVE_MODES = Values("browse", "consume") SOURCE_OPTS = COMMON_OPTS.copy() SOURCE_OPTS.update({ "mode": RECEIVE_MODES }) TARGET_OPTS = COMMON_OPTS.copy() class LinkIn: ADDR_NAME = "source" DIR_NAME = "receiver" VALIDATOR = Map(SOURCE_OPTS) def init_link(self, sst, rcv, _rcv): _rcv.destination = str(rcv.id) sst.destinations[_rcv.destination] = _rcv _rcv.draining = False _rcv.bytes_open = False _rcv.on_unlink = [] def do_link(self, sst, rcv, _rcv, type, subtype, action): link_opts = _rcv.options.get("link", {}) reliability = link_opts.get("reliability", "at-least-once") declare = link_opts.get("x-declare", {}) subscribe = link_opts.get("x-subscribe", {}) acq_mode = acquire_mode.pre_acquired if reliability in ("unreliable", "at-most-once"): rcv._accept_mode = accept_mode.none else: rcv._accept_mode = accept_mode.explicit if type == "topic": default_name = "%s.%s" % (rcv.session.name, _rcv.destination) _rcv._queue = link_opts.get("name", default_name) sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=link_opts.get("durable", False), exclusive=True, auto_delete=(reliability == "unreliable")), overrides=declare) _rcv.on_unlink = [QueueDelete(_rcv._queue)] subject = _rcv.subject or SUBJECT_DEFAULTS.get(subtype) bindings = get_bindings(link_opts, _rcv._queue, _rcv.name, subject) if not bindings: sst.write_cmd(ExchangeBind(_rcv._queue, _rcv.name, subject)) elif type == "queue": _rcv._queue = _rcv.name if _rcv.options.get("mode", "consume") == "browse": acq_mode = acquire_mode.not_acquired bindings = get_bindings(link_opts, queue=_rcv._queue) sst.write_cmds(bindings) sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination, acquire_mode = acq_mode, accept_mode = rcv._accept_mode), overrides=subscribe) sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit), action) def do_unlink(self, sst, rcv, _rcv, action=noop): link_opts = _rcv.options.get("link", {}) reliability = link_opts.get("reliability") cmds = [MessageCancel(_rcv.destination)] cmds.extend(_rcv.on_unlink) sst.write_cmds(cmds, action) def del_link(self, sst, rcv, _rcv): del sst.destinations[_rcv.destination] class LinkOut: ADDR_NAME = "target" DIR_NAME = "sender" VALIDATOR = Map(TARGET_OPTS) def init_link(self, sst, snd, _snd): _snd.closing = False _snd.pre_ack = False def do_link(self, sst, snd, _snd, type, subtype, action): link_opts = _snd.options.get("link", {}) reliability = link_opts.get("reliability", "at-least-once") _snd.pre_ack = reliability in ("unreliable", "at-most-once") if type == "topic": _snd._exchange = _snd.name _snd._routing_key = _snd.subject bindings = get_bindings(link_opts, exchange=_snd.name, key=_snd.subject) elif type == "queue": _snd._exchange = "" _snd._routing_key = _snd.name bindings = get_bindings(link_opts, queue=_snd.name) sst.write_cmds(bindings, action) def do_unlink(self, sst, snd, _snd, action=noop): action() def del_link(self, sst, snd, _snd): pass class Cache: def __init__(self, ttl): self.ttl = ttl self.entries = {} def __setitem__(self, key, value): self.entries[key] = time.time(), value def __getitem__(self, key): tstamp, value = self.entries[key] if time.time() - tstamp >= self.ttl: del self.entries[key] raise KeyError(key) else: return value def __delitem__(self, key): del self.entries[key] # XXX HEADER="!4s4B" EMPTY_DP = DeliveryProperties() EMPTY_MP = MessageProperties() SUBJECT = "qpid.subject" CLOSED = "CLOSED" READ_ONLY = "READ_ONLY" WRITE_ONLY = "WRITE_ONLY" OPEN = "OPEN" class Driver: def __init__(self, connection): self.connection = connection self.log_id = "%x" % id(self.connection) self._lock = self.connection._lock self._selector = Selector.default() self._attempts = 0 self._delay = self.connection.reconnect_interval_min self._reconnect_log = self.connection.reconnect_log self._host = 0 self._retrying = False self._next_retry = None self._transport = None self._timeout = None self.engine = None def _next_host(self): urls = [URL(u) for u in self.connection.reconnect_urls] hosts = [(self.connection.host, default(self.connection.port, 5672))] + \ [(u.host, default(u.port, 5672)) for u in urls] if self._host >= len(hosts): self._host = 0 result = hosts[self._host] if self._host == 0: self._attempts += 1 self._host = self._host + 1 return result def _num_hosts(self): return len(self.connection.reconnect_urls) + 1 @synchronized def wakeup(self): self.dispatch() self._selector.wakeup() def start(self): self._selector.register(self) def stop(self): self._selector.unregister(self) if self._transport: self.st_closed() def fileno(self): return self._transport.fileno() @synchronized def reading(self): return self._transport is not None and \ self._transport.reading(True) @synchronized def writing(self): return self._transport is not None and \ self._transport.writing(self.engine.pending()) @synchronized def timing(self): return self._timeout @synchronized def readable(self): try: data = self._transport.recv(64*1024) if data is None: return elif data: rawlog.debug("READ[%s]: %r", self.log_id, data) self.engine.write(data) else: self.close_engine() except socket.error, e: self.close_engine(ConnectionError(text=str(e))) self.update_status() self._notify() def _notify(self): if self.connection.error: self.connection._condition.gc() self.connection._waiter.notifyAll() def close_engine(self, e=None): if e is None: e = ConnectionError(text="connection aborted") if (self.connection.reconnect and (self.connection.reconnect_limit is None or self.connection.reconnect_limit <= 0 or self._attempts <= self.connection.reconnect_limit)): if self._host < self._num_hosts(): delay = 0 else: delay = self._delay self._delay = min(2*self._delay, self.connection.reconnect_interval_max) self._next_retry = time.time() + delay if self._reconnect_log: log.warn("recoverable error[attempt %s]: %s" % (self._attempts, e)) if delay > 0: log.warn("sleeping %s seconds" % delay) self._retrying = True self.engine.close() else: self.engine.close(e) self.schedule() def update_status(self): status = self.engine.status() return getattr(self, "st_%s" % status.lower())() def st_closed(self): # XXX: this log statement seems to sometimes hit when the socket is not connected # XXX: rawlog.debug("CLOSE[%s]: %s", self.log_id, self._socket.getpeername()) self._transport.close() self._transport = None self.engine = None return True def st_open(self): return False @synchronized def writeable(self): notify = False try: n = self._transport.send(self.engine.peek()) if n == 0: return sent = self.engine.read(n) rawlog.debug("SENT[%s]: %r", self.log_id, sent) except socket.error, e: self.close_engine(e) notify = True if self.update_status() or notify: self._notify() @synchronized def timeout(self): self.dispatch() self._notify() self.schedule() def schedule(self): times = [] if self.connection.heartbeat: times.append(time.time() + self.connection.heartbeat) if self._next_retry: times.append(self._next_retry) if times: self._timeout = min(times) else: self._timeout = None def dispatch(self): try: if self._transport is None: if self.connection._connected and not self.connection.error: self.connect() else: self.engine.dispatch() except HeartbeatTimeout, e: self.close_engine(e) except: # XXX: Does socket get leaked if this occurs? msg = compat.format_exc() self.connection.error = InternalError(text=msg) def connect(self): if self._retrying and time.time() < self._next_retry: return try: # XXX: should make this non blocking host, port = self._next_host() if self._retrying and self._reconnect_log: log.warn("trying: %s:%s", host, port) self.engine = Engine(self.connection) self.engine.open() rawlog.debug("OPEN[%s]: %s:%s", self.log_id, host, port) trans = transports.TRANSPORTS.get(self.connection.transport) if trans: self._transport = trans(self.connection, host, port) else: raise ConnectError("no such transport: %s" % self.connection.transport) if self._retrying and self._reconnect_log: log.warn("reconnect succeeded: %s:%s", host, port) self._next_retry = None self._attempts = 0 self._host = 0 self._delay = self.connection.reconnect_interval_min self._retrying = False self.schedule() except socket.error, e: self.close_engine(ConnectError(text=str(e))) DEFAULT_DISPOSITION = Disposition(None) def get_bindings(opts, queue=None, exchange=None, key=None): bindings = opts.get("x-bindings", []) cmds = [] for b in bindings: exchange = b.get("exchange", exchange) queue = b.get("queue", queue) key = b.get("key", key) args = b.get("arguments", {}) cmds.append(ExchangeBind(queue, exchange, key, args)) return cmds CONNECTION_ERRS = { # anythong not here (i.e. everything right now) will default to # connection error } SESSION_ERRS = { # anything not here will default to session error error_code.unauthorized_access: UnauthorizedAccess, error_code.not_found: NotFound, error_code.resource_locked: ReceiverError, error_code.resource_limit_exceeded: TargetCapacityExceeded, error_code.internal_error: ServerError } class Engine: def __init__(self, connection): self.connection = connection self.log_id = "%x" % id(self.connection) self._closing = False self._connected = False self._attachments = {} self._in = LinkIn() self._out = LinkOut() self._channel_max = 65536 self._channels = 0 self._sessions = {} self.address_cache = Cache(self.connection.address_ttl) self._status = CLOSED self._buf = "" self._hdr = "" self._last_in = None self._last_out = None self._op_enc = OpEncoder() self._seg_enc = SegmentEncoder() self._frame_enc = FrameEncoder() self._frame_dec = FrameDecoder() self._seg_dec = SegmentDecoder() self._op_dec = OpDecoder() self._sasl = sasl.Client() if self.connection.username: self._sasl.setAttr("username", self.connection.username) if self.connection.password: self._sasl.setAttr("password", self.connection.password) if self.connection.host: self._sasl.setAttr("host", self.connection.host) self._sasl.setAttr("service", self.connection.sasl_service) if self.connection.sasl_min_ssf is not None: self._sasl.setAttr("minssf", self.connection.sasl_min_ssf) if self.connection.sasl_max_ssf is not None: self._sasl.setAttr("maxssf", self.connection.sasl_max_ssf) self._sasl.init() self._sasl_encode = False self._sasl_decode = False def _reset(self): self.connection._transport_connected = False for ssn in self.connection.sessions.values(): for m in ssn.acked + ssn.unacked + ssn.incoming: m._transfer_id = None for snd in ssn.senders: snd.linked = False for rcv in ssn.receivers: rcv.impending = rcv.received rcv.linked = False def status(self): return self._status def write(self, data): self._last_in = time.time() try: if self._sasl_decode: data = self._sasl.decode(data) if len(self._hdr) < 8: r = 8 - len(self._hdr) self._hdr += data[:r] data = data[r:] if len(self._hdr) == 8: self.do_header(self._hdr) self._frame_dec.write(data) self._seg_dec.write(*self._frame_dec.read()) self._op_dec.write(*self._seg_dec.read()) for op in self._op_dec.read(): self.assign_id(op) opslog.debug("RCVD[%s]: %r", self.log_id, op) op.dispatch(self) self.dispatch() except MessagingError, e: self.close(e) except: self.close(InternalError(text=compat.format_exc())) def close(self, e=None): self._reset() if e: self.connection.error = e self._status = CLOSED def assign_id(self, op): if isinstance(op, Command): sst = self.get_sst(op) op.id = sst.received sst.received += 1 def pending(self): return len(self._buf) def read(self, n): result = self._buf[:n] self._buf = self._buf[n:] return result def peek(self): return self._buf def write_op(self, op): opslog.debug("SENT[%s]: %r", self.log_id, op) self._op_enc.write(op) self._seg_enc.write(*self._op_enc.read()) self._frame_enc.write(*self._seg_enc.read()) bytes = self._frame_enc.read() if self._sasl_encode: bytes = self._sasl.encode(bytes) self._buf += bytes self._last_out = time.time() def do_header(self, hdr): cli_major = 0; cli_minor = 10 magic, _, _, major, minor = struct.unpack(HEADER, hdr) if major != cli_major or minor != cli_minor: raise VersionError(text="client: %s-%s, server: %s-%s" % (cli_major, cli_minor, major, minor)) def do_connection_start(self, start): if self.connection.sasl_mechanisms: permitted = self.connection.sasl_mechanisms.split() mechs = [m for m in start.mechanisms if m in permitted] else: mechs = start.mechanisms try: mech, initial = self._sasl.start(" ".join(mechs)) except sasl.SASLError, e: raise AuthenticationFailure(text=str(e)) self.write_op(ConnectionStartOk(client_properties=CLIENT_PROPERTIES, mechanism=mech, response=initial)) def do_connection_secure(self, secure): resp = self._sasl.step(secure.challenge) self.write_op(ConnectionSecureOk(response=resp)) def do_connection_tune(self, tune): # XXX: is heartbeat protocol specific? if tune.channel_max is not None: self.channel_max = tune.channel_max self.write_op(ConnectionTuneOk(heartbeat=self.connection.heartbeat, channel_max=self.channel_max)) self.write_op(ConnectionOpen()) self._sasl_encode = True def do_connection_open_ok(self, open_ok): self.connection.auth_username = self._sasl.auth_username() self._connected = True self._sasl_decode = True self.connection._transport_connected = True def do_connection_heartbeat(self, hrt): pass def do_connection_close(self, close): self.write_op(ConnectionCloseOk()) if close.reply_code != close_code.normal: exc = CONNECTION_ERRS.get(close.reply_code, ConnectionError) self.connection.error = exc(close.reply_code, close.reply_text) # XXX: should we do a half shutdown on the socket here? # XXX: we really need to test this, we may end up reporting a # connection abort after this, if we were to do a shutdown on read # and stop reading, then we wouldn't report the abort, that's # probably the right thing to do def do_connection_close_ok(self, close_ok): self.close() def do_session_attached(self, atc): pass def do_session_command_point(self, cp): sst = self.get_sst(cp) sst.received = cp.command_id def do_session_completed(self, sc): sst = self.get_sst(sc) for r in sc.commands: sst.acknowledged.add(r.lower, r.upper) if not sc.commands.empty(): while sst.min_completion in sc.commands: if sst.actions.has_key(sst.min_completion): sst.actions.pop(sst.min_completion)() sst.min_completion += 1 def session_known_completed(self, kcmp): sst = self.get_sst(kcmp) executed = RangedSet() for e in sst.executed.ranges: for ke in kcmp.ranges: if e.lower in ke and e.upper in ke: break else: executed.add_range(e) sst.executed = completed def do_session_flush(self, sf): sst = self.get_sst(sf) if sf.expected: if sst.received is None: exp = None else: exp = RangedSet(sst.received) sst.write_op(SessionExpected(exp)) if sf.confirmed: sst.write_op(SessionConfirmed(sst.executed)) if sf.completed: sst.write_op(SessionCompleted(sst.executed)) def do_session_request_timeout(self, rt): sst = self.get_sst(rt) sst.write_op(SessionTimeout(timeout=0)) def do_execution_result(self, er): sst = self.get_sst(er) sst.results[er.command_id] = er.value sst.executed.add(er.id) def do_execution_exception(self, ex): sst = self.get_sst(ex) exc = SESSION_ERRS.get(ex.error_code, SessionError) sst.session.error = exc(ex.error_code, ex.description) def dispatch(self): if not self.connection._connected and not self._closing and self._status != CLOSED: self.disconnect() if self._connected and not self._closing: for ssn in self.connection.sessions.values(): self.attach(ssn) self.process(ssn) if self.connection.heartbeat and self._status != CLOSED: now = time.time() if self._last_in is not None and \ now - self._last_in > 2*self.connection.heartbeat: raise HeartbeatTimeout(text="heartbeat timeout") if self._last_out is None or now - self._last_out >= self.connection.heartbeat/2.0: self.write_op(ConnectionHeartbeat()) def open(self): self._reset() self._status = OPEN self._buf += struct.pack(HEADER, "AMQP", 1, 1, 0, 10) def disconnect(self): self.write_op(ConnectionClose(close_code.normal)) self._closing = True def attach(self, ssn): if ssn.closed: return sst = self._attachments.get(ssn) if sst is None: for i in xrange(0, self.channel_max): if not self._sessions.has_key(i): ch = i break else: raise RuntimeError("all channels used") sst = SessionState(self, ssn, ssn.name, ch) sst.write_op(SessionAttach(name=ssn.name)) sst.write_op(SessionCommandPoint(sst.sent, 0)) sst.outgoing_idx = 0 sst.acked = [] sst.acked_idx = 0 if ssn.transactional: sst.write_cmd(TxSelect()) self._attachments[ssn] = sst self._sessions[sst.channel] = sst for snd in ssn.senders: self.link(snd, self._out, snd.target) for rcv in ssn.receivers: self.link(rcv, self._in, rcv.source) if sst is not None and ssn.closing and not sst.detached: sst.detached = True sst.write_op(SessionDetach(name=ssn.name)) def get_sst(self, op): return self._sessions[op.channel] def do_session_detached(self, dtc): sst = self._sessions.pop(dtc.channel) ssn = sst.session del self._attachments[ssn] ssn.closed = True def do_session_detach(self, dtc): sst = self.get_sst(dtc) sst.write_op(SessionDetached(name=dtc.name)) self.do_session_detached(dtc) def link(self, lnk, dir, addr): sst = self._attachments.get(lnk.session) _lnk = self._attachments.get(lnk) if _lnk is None and not lnk.closed: _lnk = Attachment(lnk) _lnk.closing = False dir.init_link(sst, lnk, _lnk) err = self.parse_address(_lnk, dir, addr) or self.validate_options(_lnk, dir) if err: lnk.error = err lnk.closed = True return def linked(): lnk.linked = True def resolved(type, subtype): dir.do_link(sst, lnk, _lnk, type, subtype, linked) self.resolve_declare(sst, _lnk, dir.DIR_NAME, resolved) self._attachments[lnk] = _lnk if lnk.linked and lnk.closing and not lnk.closed: if not _lnk.closing: def unlinked(): dir.del_link(sst, lnk, _lnk) del self._attachments[lnk] lnk.closed = True if _lnk.options.get("delete") in ("always", dir.DIR_NAME): dir.do_unlink(sst, lnk, _lnk) self.delete(sst, _lnk.name, unlinked) else: dir.do_unlink(sst, lnk, _lnk, unlinked) _lnk.closing = True elif not lnk.linked and lnk.closing and not lnk.closed: if lnk.error: lnk.closed = True def parse_address(self, lnk, dir, addr): if addr is None: return MalformedAddress(text="%s is None" % dir.ADDR_NAME) else: try: lnk.name, lnk.subject, lnk.options = address.parse(addr) # XXX: subject if lnk.options is None: lnk.options = {} except address.LexError, e: return MalformedAddress(text=str(e)) except address.ParseError, e: return MalformedAddress(text=str(e)) def validate_options(self, lnk, dir): ctx = Context() err = dir.VALIDATOR.validate(lnk.options, ctx) if err: return InvalidOption(text="error in options: %s" % err) def resolve_declare(self, sst, lnk, dir, action): declare = lnk.options.get("create") in ("always", dir) assrt = lnk.options.get("assert") in ("always", dir) def do_resolved(type, subtype): err = None if type is None: if declare: err = self.declare(sst, lnk, action) else: err = NotFound(text="no such queue: %s" % lnk.name) else: if assrt: expected = lnk.options.get("node", {}).get("type") if expected and type != expected: err = AssertionFailed(text="expected %s, got %s" % (expected, type)) if err is None: action(type, subtype) if err: tgt = lnk.target tgt.error = err del self._attachments[tgt] tgt.closed = True return self.resolve(sst, lnk.name, do_resolved, force=declare) def resolve(self, sst, name, action, force=False): if not force: try: type, subtype = self.address_cache[name] action(type, subtype) return except KeyError: pass args = [] def do_result(r): args.append(r) def do_action(r): do_result(r) er, qr = args if er.not_found and not qr.queue: type, subtype = None, None elif qr.queue: type, subtype = "queue", None else: type, subtype = "topic", er.type if type is not None: self.address_cache[name] = (type, subtype) action(type, subtype) sst.write_query(ExchangeQuery(name), do_result) sst.write_query(QueueQuery(name), do_action) def declare(self, sst, lnk, action): name = lnk.name props = lnk.options.get("node", {}) durable = props.get("durable", DURABLE_DEFAULT) type = props.get("type", "queue") declare = props.get("x-declare", {}) if type == "topic": cmd = ExchangeDeclare(exchange=name, durable=durable) bindings = get_bindings(props, exchange=name) elif type == "queue": cmd = QueueDeclare(queue=name, durable=durable) bindings = get_bindings(props, queue=name) else: raise ValueError(type) sst.apply_overrides(cmd, declare) if type == "topic": if cmd.type is None: cmd.type = "topic" subtype = cmd.type else: subtype = None cmds = [cmd] cmds.extend(bindings) def declared(): self.address_cache[name] = (type, subtype) action(type, subtype) sst.write_cmds(cmds, declared) def delete(self, sst, name, action): def deleted(): del self.address_cache[name] action() def do_delete(type, subtype): if type == "topic": sst.write_cmd(ExchangeDelete(name), deleted) elif type == "queue": sst.write_cmd(QueueDelete(name), deleted) elif type is None: action() else: raise ValueError(type) self.resolve(sst, name, do_delete, force=True) def process(self, ssn): if ssn.closed or ssn.closing: return sst = self._attachments[ssn] while sst.outgoing_idx < len(ssn.outgoing): msg = ssn.outgoing[sst.outgoing_idx] snd = msg._sender # XXX: should check for sender error here _snd = self._attachments.get(snd) if _snd and snd.linked: self.send(snd, msg) sst.outgoing_idx += 1 else: break for snd in ssn.senders: # XXX: should included snd.acked in this if snd.synced >= snd.queued and sst.need_sync: sst.write_cmd(ExecutionSync(), sync_noop) for rcv in ssn.receivers: self.process_receiver(rcv) if ssn.acked: messages = ssn.acked[sst.acked_idx:] if messages: ids = RangedSet() disposed = [(DEFAULT_DISPOSITION, [])] acked = [] for m in messages: # XXX: we're ignoring acks that get lost when disconnected, # could we deal this via some message-id based purge? if m._transfer_id is None: acked.append(m) continue ids.add(m._transfer_id) if m._receiver._accept_mode is accept_mode.explicit: disp = m._disposition or DEFAULT_DISPOSITION last, msgs = disposed[-1] if disp.type is last.type and disp.options == last.options: msgs.append(m) else: disposed.append((disp, [m])) else: acked.append(m) for range in ids: sst.executed.add_range(range) sst.write_op(SessionCompleted(sst.executed)) def ack_acker(msgs): def ack_ack(): for m in msgs: ssn.acked.remove(m) sst.acked_idx -= 1 # XXX: should this check accept_mode too? if not ssn.transactional: sst.acked.remove(m) return ack_ack for disp, msgs in disposed: if not msgs: continue if disp.type is None: op = MessageAccept elif disp.type is RELEASED: op = MessageRelease elif disp.type is REJECTED: op = MessageReject sst.write_cmd(op(RangedSet(*[m._transfer_id for m in msgs]), **disp.options), ack_acker(msgs)) if log.isEnabledFor(DEBUG): for m in msgs: log.debug("SACK[%s]: %s, %s", ssn.log_id, m, m._disposition) sst.acked.extend(messages) sst.acked_idx += len(messages) ack_acker(acked)() if ssn.committing and not sst.committing: def commit_ok(): del sst.acked[:] ssn.committing = False ssn.committed = True ssn.aborting = False ssn.aborted = False sst.committing = False sst.write_cmd(TxCommit(), commit_ok) sst.committing = True if ssn.aborting and not sst.aborting: sst.aborting = True def do_rb(): messages = sst.acked + ssn.unacked + ssn.incoming ids = RangedSet(*[m._transfer_id for m in messages]) for range in ids: sst.executed.add_range(range) sst.write_op(SessionCompleted(sst.executed)) sst.write_cmd(MessageRelease(ids, True)) sst.write_cmd(TxRollback(), do_rb_ok) def do_rb_ok(): del ssn.incoming[:] del ssn.unacked[:] del sst.acked[:] for rcv in ssn.receivers: rcv.impending = rcv.received rcv.returned = rcv.received # XXX: do we need to update granted here as well? for rcv in ssn.receivers: self.process_receiver(rcv) ssn.aborting = False ssn.aborted = True ssn.committing = False ssn.committed = False sst.aborting = False for rcv in ssn.receivers: _rcv = self._attachments[rcv] sst.write_cmd(MessageStop(_rcv.destination)) sst.write_cmd(ExecutionSync(), do_rb) def grant(self, rcv): sst = self._attachments[rcv.session] _rcv = self._attachments.get(rcv) if _rcv is None or not rcv.linked or _rcv.closing or _rcv.draining: return if rcv.granted is UNLIMITED: if rcv.impending is UNLIMITED: delta = 0 else: delta = UNLIMITED elif rcv.impending is UNLIMITED: delta = -1 else: delta = max(rcv.granted, rcv.received) - rcv.impending if delta is UNLIMITED: if not _rcv.bytes_open: sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value)) _rcv.bytes_open = True sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, UNLIMITED.value)) rcv.impending = UNLIMITED elif delta > 0: if not _rcv.bytes_open: sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value)) _rcv.bytes_open = True sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, delta)) rcv.impending += delta elif delta < 0 and not rcv.draining: _rcv.draining = True def do_stop(): rcv.impending = rcv.received _rcv.draining = False _rcv.bytes_open = False self.grant(rcv) sst.write_cmd(MessageStop(_rcv.destination), do_stop) if rcv.draining: _rcv.draining = True def do_flush(): rcv.impending = rcv.received rcv.granted = rcv.impending _rcv.draining = False _rcv.bytes_open = False rcv.draining = False sst.write_cmd(MessageFlush(_rcv.destination), do_flush) def process_receiver(self, rcv): if rcv.closed: return self.grant(rcv) def send(self, snd, msg): sst = self._attachments[snd.session] _snd = self._attachments[snd] if msg.subject is None or _snd._exchange == "": rk = _snd._routing_key else: rk = msg.subject if msg.subject is None: subject = _snd.subject else: subject = msg.subject # XXX: do we need to query to figure out how to create the reply-to interoperably? if msg.reply_to: rt = addr2reply_to(msg.reply_to) else: rt = None content_encoding = msg.properties.get("x-amqp-0-10.content-encoding") dp = DeliveryProperties(routing_key=rk) mp = MessageProperties(message_id=msg.id, user_id=msg.user_id, reply_to=rt, correlation_id=msg.correlation_id, app_id = msg.properties.get("x-amqp-0-10.app-id"), content_type=msg.content_type, content_encoding=content_encoding, application_headers=msg.properties) if subject is not None: if mp.application_headers is None: mp.application_headers = {} mp.application_headers[SUBJECT] = subject if msg.durable is not None: if msg.durable: dp.delivery_mode = delivery_mode.persistent else: dp.delivery_mode = delivery_mode.non_persistent if msg.priority is not None: dp.priority = msg.priority if msg.ttl is not None: dp.ttl = long(msg.ttl*1000) enc, dec = get_codec(msg.content_type) body = enc(msg.content) # XXX: this is not safe for out of order, can this be triggered by pre_ack? def msg_acked(): # XXX: should we log the ack somehow too? snd.acked += 1 m = snd.session.outgoing.pop(0) sst.outgoing_idx -= 1 log.debug("RACK[%s]: %s", sst.session.log_id, msg) assert msg == m xfr = MessageTransfer(destination=_snd._exchange, headers=(dp, mp), payload=body) if _snd.pre_ack: sst.write_cmd(xfr) else: sst.write_cmd(xfr, msg_acked, sync=msg._sync) log.debug("SENT[%s]: %s", sst.session.log_id, msg) if _snd.pre_ack: msg_acked() def do_message_transfer(self, xfr): sst = self.get_sst(xfr) ssn = sst.session msg = self._decode(xfr) rcv = sst.destinations[xfr.destination].target msg._receiver = rcv if rcv.impending is not UNLIMITED: assert rcv.received < rcv.impending, "%s, %s" % (rcv.received, rcv.impending) rcv.received += 1 log.debug("RCVD[%s]: %s", ssn.log_id, msg) ssn.incoming.append(msg) def _decode(self, xfr): dp = EMPTY_DP mp = EMPTY_MP for h in xfr.headers: if isinstance(h, DeliveryProperties): dp = h elif isinstance(h, MessageProperties): mp = h ap = mp.application_headers enc, dec = get_codec(mp.content_type) content = dec(xfr.payload) msg = Message(content) msg.id = mp.message_id if ap is not None: msg.subject = ap.get(SUBJECT) msg.user_id = mp.user_id if mp.reply_to is not None: msg.reply_to = reply_to2addr(mp.reply_to) msg.correlation_id = mp.correlation_id if dp.delivery_mode is not None: msg.durable = dp.delivery_mode == delivery_mode.persistent msg.priority = dp.priority if dp.ttl is not None: msg.ttl = dp.ttl/1000.0 msg.redelivered = dp.redelivered msg.properties = mp.application_headers or {} if mp.app_id is not None: msg.properties["x-amqp-0-10.app-id"] = mp.app_id if mp.content_encoding is not None: msg.properties["x-amqp-0-10.content-encoding"] = mp.content_encoding if dp.routing_key is not None: msg.properties["x-amqp-0-10.routing-key"] = dp.routing_key msg.content_type = mp.content_type msg._transfer_id = xfr.id return msg