diff options
Diffstat (limited to 'python/qpid/messaging/endpoints.py')
-rw-r--r-- | python/qpid/messaging/endpoints.py | 1046 |
1 files changed, 0 insertions, 1046 deletions
diff --git a/python/qpid/messaging/endpoints.py b/python/qpid/messaging/endpoints.py deleted file mode 100644 index 338ac70ecf..0000000000 --- a/python/qpid/messaging/endpoints.py +++ /dev/null @@ -1,1046 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -""" -A candidate high level messaging API for python. - -Areas that still need work: - - - definition of the arguments for L{Session.sender} and L{Session.receiver} - - standard L{Message} properties - - L{Message} content encoding - - protocol negotiation/multiprotocol impl -""" - -from logging import getLogger -from math import ceil -from qpid.codec010 import StringCodec -from qpid.concurrency import synchronized, Waiter, Condition -from qpid.datatypes import Serial, uuid4 -from qpid.messaging.constants import * -from qpid.messaging.exceptions import * -from qpid.messaging.message import * -from qpid.ops import PRIMITIVE -from qpid.util import default, URL -from threading import Thread, RLock - -log = getLogger("qpid.messaging") - -static = staticmethod - -class Endpoint: - - def _ecwait(self, predicate, timeout=None): - result = self._ewait(lambda: self.closed or predicate(), timeout) - self.check_closed() - return result - -class Connection(Endpoint): - - """ - A Connection manages a group of L{Sessions<Session>} and connects - them with a remote endpoint. - """ - - @static - def establish(url=None, **options): - """ - Constructs a L{Connection} with the supplied parameters and opens - it. - """ - conn = Connection(url, **options) - conn.open() - return conn - - def __init__(self, url=None, **options): - """ - Creates a connection. A newly created connection must be connected - with the Connection.connect() method before it can be used. - - @type url: str - @param url: [ <username> [ / <password> ] @ ] <host> [ : <port> ] - @type host: str - @param host: the name or ip address of the remote host (overriden by url) - @type port: int - @param port: the port number of the remote host (overriden by url) - @type transport: str - @param transport: one of tcp, tcp+tls, or ssl (alias for tcp+tls) - @type heartbeat: int - @param heartbeat: heartbeat interval in seconds - - @type username: str - @param username: the username for authentication (overriden by url) - @type password: str - @param password: the password for authentication (overriden by url) - - @type sasl_mechanisms: str - @param sasl_mechanisms: space separated list of permitted sasl mechanisms - @type sasl_service: str - @param sasl_service: ??? - @type sasl_min_ssf: ??? - @param sasl_min_ssf: ??? - @type sasl_max_ssf: ??? - @param sasl_max_ssf: ??? - - @type reconnect: bool - @param reconnect: enable/disable automatic reconnect - @type reconnect_timeout: float - @param reconnect_timeout: total time to attempt reconnect - @type reconnect_internal_min: float - @param reconnect_internal_min: minimum interval between reconnect attempts - @type reconnect_internal_max: float - @param reconnect_internal_max: maximum interval between reconnect attempts - @type reconnect_internal: float - @param reconnect_interval: set both min and max reconnect intervals - @type reconnect_limit: int - @param reconnect_limit: limit the total number of reconnect attempts - @type reconnect_urls: list[str] - @param reconnect_urls: list of backup hosts specified as urls - - @type address_ttl: float - @param address_ttl: time until cached address resolution expires - - @rtype: Connection - @return: a disconnected Connection - """ - if url is None: - url = options.get("host") - if isinstance(url, basestring): - url = URL(url) - self.host = url.host - if options.has_key("transport"): - self.transport = options.get("transport") - elif url.scheme == url.AMQP: - self.transport = "tcp" - elif url.scheme == url.AMQPS: - self.transport = "ssl" - else: - self.transport = "tcp" - if self.transport in ("ssl", "tcp+tls"): - self.port = default(url.port, options.get("port", AMQPS_PORT)) - else: - self.port = default(url.port, options.get("port", AMQP_PORT)) - self.heartbeat = options.get("heartbeat") - self.username = default(url.user, options.get("username", None)) - self.password = default(url.password, options.get("password", None)) - self.auth_username = None - - self.sasl_mechanisms = options.get("sasl_mechanisms") - self.sasl_service = options.get("sasl_service", "qpidd") - self.sasl_min_ssf = options.get("sasl_min_ssf") - self.sasl_max_ssf = options.get("sasl_max_ssf") - - self.reconnect = options.get("reconnect", False) - self.reconnect_timeout = options.get("reconnect_timeout") - reconnect_interval = options.get("reconnect_interval") - self.reconnect_interval_min = options.get("reconnect_interval_min", - default(reconnect_interval, 1)) - self.reconnect_interval_max = options.get("reconnect_interval_max", - default(reconnect_interval, 2*60)) - self.reconnect_limit = options.get("reconnect_limit") - self.reconnect_urls = options.get("reconnect_urls", []) - self.reconnect_log = options.get("reconnect_log", True) - - self.address_ttl = options.get("address_ttl", 60) - self.tcp_nodelay = options.get("tcp_nodelay", False) - - self.options = options - - - self.id = str(uuid4()) - self.session_counter = 0 - self.sessions = {} - self._open = False - self._connected = False - self._transport_connected = False - self._lock = RLock() - self._condition = Condition(self._lock) - self._waiter = Waiter(self._condition) - self._modcount = Serial(0) - self.error = None - from driver import Driver - self._driver = Driver(self) - - def _wait(self, predicate, timeout=None): - return self._waiter.wait(predicate, timeout=timeout) - - def _wakeup(self): - self._modcount += 1 - self._driver.wakeup() - - def check_error(self): - if self.error: - self._condition.gc() - raise self.error - - def get_error(self): - return self.error - - def _ewait(self, predicate, timeout=None): - result = self._wait(lambda: self.error or predicate(), timeout) - self.check_error() - return result - - def check_closed(self): - if not self._connected: - self._condition.gc() - raise ConnectionClosed() - - @synchronized - def session(self, name=None, transactional=False): - """ - Creates or retrieves the named session. If the name is omitted or - None, then a unique name is chosen based on a randomly generated - uuid. - - @type name: str - @param name: the session name - @rtype: Session - @return: the named Session - """ - - if name is None: - name = "%s:%s" % (self.id, self.session_counter) - self.session_counter += 1 - else: - name = "%s:%s" % (self.id, name) - - if self.sessions.has_key(name): - return self.sessions[name] - else: - ssn = Session(self, name, transactional) - self.sessions[name] = ssn - self._wakeup() - return ssn - - @synchronized - def _remove_session(self, ssn): - self.sessions.pop(ssn.name, 0) - - @synchronized - def open(self): - """ - Opens a connection. - """ - if self._open: - raise ConnectionError("already open") - self._open = True - self.attach() - - @synchronized - def opened(self): - """ - Return true if the connection is open, false otherwise. - """ - return self._open - - @synchronized - def attach(self): - """ - Attach to the remote endpoint. - """ - if not self._connected: - self._connected = True - self._driver.start() - self._wakeup() - self._ewait(lambda: self._transport_connected and not self._unlinked()) - - def _unlinked(self): - return [l - for ssn in self.sessions.values() - if not (ssn.error or ssn.closed) - for l in ssn.senders + ssn.receivers - if not (l.linked or l.error or l.closed)] - - @synchronized - def detach(self, timeout=None): - """ - Detach from the remote endpoint. - """ - if self._connected: - self._connected = False - self._wakeup() - cleanup = True - else: - cleanup = False - try: - if not self._wait(lambda: not self._transport_connected, timeout=timeout): - raise Timeout("detach timed out") - finally: - if cleanup: - self._driver.stop() - self._condition.gc() - - @synchronized - def attached(self): - """ - Return true if the connection is attached, false otherwise. - """ - return self._connected - - @synchronized - def close(self, timeout=None): - """ - Close the connection and all sessions. - """ - try: - for ssn in self.sessions.values(): - ssn.close(timeout=timeout) - finally: - self.detach(timeout=timeout) - self._open = False - -class Session(Endpoint): - - """ - Sessions provide a linear context for sending and receiving - L{Messages<Message>}. L{Messages<Message>} are sent and received - using the L{Sender.send} and L{Receiver.fetch} methods of the - L{Sender} and L{Receiver} objects associated with a Session. - - Each L{Sender} and L{Receiver} is created by supplying either a - target or source address to the L{sender} and L{receiver} methods of - the Session. The address is supplied via a string syntax documented - below. - - Addresses - ========= - - An address identifies a source or target for messages. In its - simplest form this is just a name. In general a target address may - also be used as a source address, however not all source addresses - may be used as a target, e.g. a source might additionally have some - filtering criteria that would not be present in a target. - - A subject may optionally be specified along with the name. When an - address is used as a target, any subject specified in the address is - used as the default subject of outgoing messages for that target. - When an address is used as a source, any subject specified in the - address is pattern matched against the subject of available messages - as a filter for incoming messages from that source. - - The options map contains additional information about the address - including: - - - policies for automatically creating, and deleting the node to - which an address refers - - - policies for asserting facts about the node to which an address - refers - - - extension points that can be used for sender/receiver - configuration - - Mapping to AMQP 0-10 - -------------------- - The name is resolved to either an exchange or a queue by querying - the broker. - - The subject is set as a property on the message. Additionally, if - the name refers to an exchange, the routing key is set to the - subject. - - Syntax - ------ - The following regular expressions define the tokens used to parse - addresses:: - LBRACE: \\{ - RBRACE: \\} - LBRACK: \\[ - RBRACK: \\] - COLON: : - SEMI: ; - SLASH: / - COMMA: , - NUMBER: [+-]?[0-9]*\\.?[0-9]+ - ID: [a-zA-Z_](?:[a-zA-Z0-9_-]*[a-zA-Z0-9_])? - STRING: "(?:[^\\\\"]|\\\\.)*"|\'(?:[^\\\\\']|\\\\.)*\' - ESC: \\\\[^ux]|\\\\x[0-9a-fA-F][0-9a-fA-F]|\\\\u[0-9a-fA-F][0-9a-fA-F][0-9a-fA-F][0-9a-fA-F] - SYM: [.#*%@$^!+-] - WSPACE: [ \\n\\r\\t]+ - - The formal grammar for addresses is given below:: - address = name [ "/" subject ] [ ";" options ] - name = ( part | quoted )+ - subject = ( part | quoted | "/" )* - quoted = STRING / ESC - part = LBRACE / RBRACE / COLON / COMMA / NUMBER / ID / SYM - options = map - map = "{" ( keyval ( "," keyval )* )? "}" - keyval = ID ":" value - value = NUMBER / STRING / ID / map / list - list = "[" ( value ( "," value )* )? "]" - - This grammar resuls in the following informal syntax:: - - <name> [ / <subject> ] [ ; <options> ] - - Where options is:: - - { <key> : <value>, ... } - - And values may be: - - numbers - - single, double, or non quoted strings - - maps (dictionaries) - - lists - - Options - ------- - The options map permits the following parameters:: - - <name> [ / <subject> ] ; { - create: always | sender | receiver | never, - delete: always | sender | receiver | never, - assert: always | sender | receiver | never, - mode: browse | consume, - node: { - type: queue | topic, - durable: True | False, - x-declare: { ... <declare-overrides> ... }, - x-bindings: [<binding_1>, ... <binding_n>] - }, - link: { - name: <link-name>, - durable: True | False, - reliability: unreliable | at-most-once | at-least-once | exactly-once, - x-declare: { ... <declare-overrides> ... }, - x-bindings: [<binding_1>, ... <binding_n>], - x-subscribe: { ... <subscribe-overrides> ... } - } - } - - Bindings are specified as a map with the following options:: - - { - exchange: <exchange>, - queue: <queue>, - key: <key>, - arguments: <arguments> - } - - The create, delete, and assert policies specify who should perfom - the associated action: - - - I{always}: the action will always be performed - - I{sender}: the action will only be performed by the sender - - I{receiver}: the action will only be performed by the receiver - - I{never}: the action will never be performed (this is the default) - - The node-type is one of: - - - I{topic}: a topic node will default to the topic exchange, - x-declare may be used to specify other exchange types - - I{queue}: this is the default node-type - - The x-declare map permits protocol specific keys and values to be - specified when exchanges or queues are declared. These keys and - values are passed through when creating a node or asserting facts - about an existing node. - - Examples - -------- - A simple name resolves to any named node, usually a queue or a - topic:: - - my-queue-or-topic - - A simple name with a subject will also resolve to a node, but the - presence of the subject will cause a sender using this address to - set the subject on outgoing messages, and receivers to filter based - on the subject:: - - my-queue-or-topic/my-subject - - A subject pattern can be used and will cause filtering if used by - the receiver. If used for a sender, the literal value gets set as - the subject:: - - my-queue-or-topic/my-* - - In all the above cases, the address is resolved to an existing node. - If you want the node to be auto-created, then you can do the - following. By default nonexistent nodes are assumed to be queues:: - - my-queue; {create: always} - - You can customize the properties of the queue:: - - my-queue; {create: always, node: {durable: True}} - - You can create a topic instead if you want:: - - my-queue; {create: always, node: {type: topic}} - - You can assert that the address resolves to a node with particular - properties:: - - my-transient-topic; { - assert: always, - node: { - type: topic, - durable: False - } - } - """ - - def __init__(self, connection, name, transactional): - self.connection = connection - self.name = name - self.log_id = "%x" % id(self) - - self.transactional = transactional - - self.committing = False - self.committed = True - self.aborting = False - self.aborted = False - - self.next_sender_id = 0 - self.senders = [] - self.next_receiver_id = 0 - self.receivers = [] - self.outgoing = [] - self.incoming = [] - self.unacked = [] - self.acked = [] - # XXX: I hate this name. - self.ack_capacity = UNLIMITED - - self.error = None - self.closing = False - self.closed = False - - self._lock = connection._lock - - def __repr__(self): - return "<Session %s>" % self.name - - def _wait(self, predicate, timeout=None): - return self.connection._wait(predicate, timeout=timeout) - - def _wakeup(self): - self.connection._wakeup() - - def check_error(self): - self.connection.check_error() - if self.error: - raise self.error - - def get_error(self): - err = self.connection.get_error() - if err: - return err - else: - return self.error - - def _ewait(self, predicate, timeout=None): - result = self.connection._ewait(lambda: self.error or predicate(), timeout) - self.check_error() - return result - - def check_closed(self): - if self.closed: - raise SessionClosed() - - @synchronized - def sender(self, target, **options): - """ - Creates a L{Sender} that may be used to send L{Messages<Message>} - to the specified target. - - @type target: str - @param target: the target to which messages will be sent - @rtype: Sender - @return: a new Sender for the specified target - """ - target = _mangle(target) - sender = Sender(self, self.next_sender_id, target, options) - self.next_sender_id += 1 - self.senders.append(sender) - if not self.closed and self.connection._connected: - self._wakeup() - try: - sender._ewait(lambda: sender.linked) - except LinkError, e: - sender.close() - raise e - return sender - - @synchronized - def receiver(self, source, **options): - """ - Creates a receiver that may be used to fetch L{Messages<Message>} - from the specified source. - - @type source: str - @param source: the source of L{Messages<Message>} - @rtype: Receiver - @return: a new Receiver for the specified source - """ - source = _mangle(source) - receiver = Receiver(self, self.next_receiver_id, source, options) - self.next_receiver_id += 1 - self.receivers.append(receiver) - if not self.closed and self.connection._connected: - self._wakeup() - try: - receiver._ewait(lambda: receiver.linked) - except LinkError, e: - receiver.close() - raise e - return receiver - - @synchronized - def _count(self, predicate): - result = 0 - for msg in self.incoming: - if predicate(msg): - result += 1 - return result - - def _peek(self, receiver): - for msg in self.incoming: - if msg._receiver == receiver: - return msg - - def _pop(self, receiver): - i = 0 - while i < len(self.incoming): - msg = self.incoming[i] - if msg._receiver == receiver: - del self.incoming[i] - return msg - else: - i += 1 - - @synchronized - def _get(self, receiver, timeout=None): - if self._ewait(lambda: ((self._peek(receiver) is not None) or - self.closing or receiver.closed), - timeout): - msg = self._pop(receiver) - if msg is not None: - msg._receiver.returned += 1 - self.unacked.append(msg) - log.debug("RETR[%s]: %s", self.log_id, msg) - return msg - return None - - @synchronized - def next_receiver(self, timeout=None): - if self._ecwait(lambda: self.incoming, timeout): - return self.incoming[0]._receiver - else: - raise Empty - - @synchronized - def acknowledge(self, message=None, disposition=None, sync=True): - """ - Acknowledge the given L{Message}. If message is None, then all - unacknowledged messages on the session are acknowledged. - - @type message: Message - @param message: the message to acknowledge or None - @type sync: boolean - @param sync: if true then block until the message(s) are acknowledged - """ - if message is None: - messages = self.unacked[:] - else: - messages = [message] - - for m in messages: - if self.ack_capacity is not UNLIMITED: - if self.ack_capacity <= 0: - # XXX: this is currently a SendError, maybe it should be a SessionError? - raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity) - self._wakeup() - self._ecwait(lambda: len(self.acked) < self.ack_capacity) - m._disposition = disposition - self.unacked.remove(m) - self.acked.append(m) - - self._wakeup() - if sync: - self._ecwait(lambda: not [m for m in messages if m in self.acked]) - - @synchronized - def commit(self): - """ - Commit outstanding transactional work. This consists of all - message sends and receives since the prior commit or rollback. - """ - if not self.transactional: - raise NontransactionalSession() - self.committing = True - self._wakeup() - self._ecwait(lambda: not self.committing) - if self.aborted: - raise TransactionAborted() - assert self.committed - - @synchronized - def rollback(self): - """ - Rollback outstanding transactional work. This consists of all - message sends and receives since the prior commit or rollback. - """ - if not self.transactional: - raise NontransactionalSession() - self.aborting = True - self._wakeup() - self._ecwait(lambda: not self.aborting) - assert self.aborted - - @synchronized - def sync(self, timeout=None): - """ - Sync the session. - """ - for snd in self.senders: - snd.sync(timeout=timeout) - if not self._ewait(lambda: not self.outgoing and not self.acked, timeout=timeout): - raise Timeout("session sync timed out") - - @synchronized - def close(self, timeout=None): - """ - Close the session. - """ - self.sync(timeout=timeout) - - for link in self.receivers + self.senders: - link.close(timeout=timeout) - - if not self.closing: - self.closing = True - self._wakeup() - - try: - if not self._ewait(lambda: self.closed, timeout=timeout): - raise Timeout("session close timed out") - finally: - self.connection._remove_session(self) - -def _mangle(addr): - if addr and addr.startswith("#"): - return str(uuid4()) + addr - else: - return addr - -class Sender(Endpoint): - - """ - Sends outgoing messages. - """ - - def __init__(self, session, id, target, options): - self.session = session - self.id = id - self.target = target - self.options = options - self.capacity = options.get("capacity", UNLIMITED) - self.threshold = 0.5 - self.durable = options.get("durable") - self.queued = Serial(0) - self.synced = Serial(0) - self.acked = Serial(0) - self.error = None - self.linked = False - self.closing = False - self.closed = False - self._lock = self.session._lock - - def _wakeup(self): - self.session._wakeup() - - def check_error(self): - self.session.check_error() - if self.error: - raise self.error - - def get_error(self): - err = self.session.get_error() - if err: - return err - else: - return self.error - - def _ewait(self, predicate, timeout=None): - result = self.session._ewait(lambda: self.error or predicate(), timeout) - self.check_error() - return result - - def check_closed(self): - if self.closed: - raise LinkClosed() - - @synchronized - def unsettled(self): - """ - Returns the number of messages awaiting acknowledgment. - @rtype: int - @return: the number of unacknowledged messages - """ - return self.queued - self.acked - - @synchronized - def available(self): - if self.capacity is UNLIMITED: - return UNLIMITED - else: - return self.capacity - self.unsettled() - - @synchronized - def send(self, object, sync=True, timeout=None): - """ - Send a message. If the object passed in is of type L{unicode}, - L{str}, L{list}, or L{dict}, it will automatically be wrapped in a - L{Message} and sent. If it is of type L{Message}, it will be sent - directly. If the sender capacity is not L{UNLIMITED} then send - will block until there is available capacity to send the message. - If the timeout parameter is specified, then send will throw an - L{InsufficientCapacity} exception if capacity does not become - available within the specified time. - - @type object: unicode, str, list, dict, Message - @param object: the message or content to send - - @type sync: boolean - @param sync: if true then block until the message is sent - - @type timeout: float - @param timeout: the time to wait for available capacity - """ - - if not self.session.connection._connected or self.session.closing: - raise Detached() - - self._ecwait(lambda: self.linked) - - if isinstance(object, Message): - message = object - else: - message = Message(object) - - if message.durable is None: - message.durable = self.durable - - if self.capacity is not UNLIMITED: - if self.capacity <= 0: - raise InsufficientCapacity("capacity = %s" % self.capacity) - if not self._ecwait(self.available, timeout=timeout): - raise InsufficientCapacity("capacity = %s" % self.capacity) - - # XXX: what if we send the same message to multiple senders? - message._sender = self - if self.capacity is not UNLIMITED: - message._sync = sync or self.available() <= int(ceil(self.threshold*self.capacity)) - else: - message._sync = sync - self.session.outgoing.append(message) - self.queued += 1 - - if sync: - self.sync() - assert message not in self.session.outgoing - else: - self._wakeup() - - @synchronized - def sync(self, timeout=None): - mno = self.queued - if self.synced < mno: - self.synced = mno - self._wakeup() - if not self._ewait(lambda: self.acked >= mno, timeout=timeout): - raise Timeout("sender sync timed out") - - @synchronized - def close(self, timeout=None): - """ - Close the Sender. - """ - # avoid erroring out when closing a sender that was never - # established - if self.acked < self.queued: - self.sync(timeout=timeout) - - if not self.closing: - self.closing = True - self._wakeup() - - try: - if not self.session._ewait(lambda: self.closed, timeout=timeout): - raise Timeout("sender close timed out") - finally: - try: - self.session.senders.remove(self) - except ValueError: - pass - -class Receiver(Endpoint, object): - - """ - Receives incoming messages from a remote source. Messages may be - fetched with L{fetch}. - """ - - def __init__(self, session, id, source, options): - self.session = session - self.id = id - self.source = source - self.options = options - - self.granted = Serial(0) - self.draining = False - self.impending = Serial(0) - self.received = Serial(0) - self.returned = Serial(0) - - self.error = None - self.linked = False - self.closing = False - self.closed = False - self._lock = self.session._lock - self._capacity = 0 - self._set_capacity(options.get("capacity", 0), False) - self.threshold = 0.5 - - @synchronized - def _set_capacity(self, c, wakeup=True): - if c is UNLIMITED: - self._capacity = c.value - else: - self._capacity = c - self._grant() - if wakeup: - self._wakeup() - - def _get_capacity(self): - if self._capacity == UNLIMITED.value: - return UNLIMITED - else: - return self._capacity - - capacity = property(_get_capacity, _set_capacity) - - def _wakeup(self): - self.session._wakeup() - - def check_error(self): - self.session.check_error() - if self.error: - raise self.error - - def get_error(self): - err = self.session.get_error() - if err: - return err - else: - return self.error - - def _ewait(self, predicate, timeout=None): - result = self.session._ewait(lambda: self.error or predicate(), timeout) - self.check_error() - return result - - def check_closed(self): - if self.closed: - raise LinkClosed() - - @synchronized - def unsettled(self): - """ - Returns the number of acknowledged messages awaiting confirmation. - """ - return len([m for m in self.acked if m._receiver is self]) - - @synchronized - def available(self): - """ - Returns the number of messages available to be fetched by the - application. - - @rtype: int - @return: the number of available messages - """ - return self.received - self.returned - - @synchronized - def fetch(self, timeout=None): - """ - Fetch and return a single message. A timeout of None will block - forever waiting for a message to arrive, a timeout of zero will - return immediately if no messages are available. - - @type timeout: float - @param timeout: the time to wait for a message to be available - """ - - self._ecwait(lambda: self.linked) - - if self._capacity == 0: - self.granted = self.returned + 1 - self._wakeup() - self._ecwait(lambda: self.impending >= self.granted) - msg = self.session._get(self, timeout=timeout) - if msg is None: - self.check_closed() - self.draining = True - self._wakeup() - self._ecwait(lambda: not self.draining) - msg = self.session._get(self, timeout=0) - self._grant() - self._wakeup() - if msg is None: - raise Empty() - elif self._capacity not in (0, UNLIMITED.value): - t = int(ceil(self.threshold * self._capacity)) - if self.received - self.returned <= t: - self.granted = self.returned + self._capacity - self._wakeup() - return msg - - def _grant(self): - if self._capacity == UNLIMITED.value: - self.granted = UNLIMITED - else: - self.granted = self.returned + self._capacity - - @synchronized - def close(self, timeout=None): - """ - Close the receiver. - """ - if not self.closing: - self.closing = True - self._wakeup() - - try: - if not self.session._ewait(lambda: self.closed, timeout=timeout): - raise Timeout("receiver close timed out") - finally: - try: - self.session.receivers.remove(self) - except ValueError: - pass - -__all__ = ["Connection", "Session", "Sender", "Receiver"] |