summaryrefslogtreecommitdiff
path: root/python/qpid/messaging/endpoints.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid/messaging/endpoints.py')
-rw-r--r--python/qpid/messaging/endpoints.py1046
1 files changed, 0 insertions, 1046 deletions
diff --git a/python/qpid/messaging/endpoints.py b/python/qpid/messaging/endpoints.py
deleted file mode 100644
index 338ac70ecf..0000000000
--- a/python/qpid/messaging/endpoints.py
+++ /dev/null
@@ -1,1046 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-"""
-A candidate high level messaging API for python.
-
-Areas that still need work:
-
- - definition of the arguments for L{Session.sender} and L{Session.receiver}
- - standard L{Message} properties
- - L{Message} content encoding
- - protocol negotiation/multiprotocol impl
-"""
-
-from logging import getLogger
-from math import ceil
-from qpid.codec010 import StringCodec
-from qpid.concurrency import synchronized, Waiter, Condition
-from qpid.datatypes import Serial, uuid4
-from qpid.messaging.constants import *
-from qpid.messaging.exceptions import *
-from qpid.messaging.message import *
-from qpid.ops import PRIMITIVE
-from qpid.util import default, URL
-from threading import Thread, RLock
-
-log = getLogger("qpid.messaging")
-
-static = staticmethod
-
-class Endpoint:
-
- def _ecwait(self, predicate, timeout=None):
- result = self._ewait(lambda: self.closed or predicate(), timeout)
- self.check_closed()
- return result
-
-class Connection(Endpoint):
-
- """
- A Connection manages a group of L{Sessions<Session>} and connects
- them with a remote endpoint.
- """
-
- @static
- def establish(url=None, **options):
- """
- Constructs a L{Connection} with the supplied parameters and opens
- it.
- """
- conn = Connection(url, **options)
- conn.open()
- return conn
-
- def __init__(self, url=None, **options):
- """
- Creates a connection. A newly created connection must be connected
- with the Connection.connect() method before it can be used.
-
- @type url: str
- @param url: [ <username> [ / <password> ] @ ] <host> [ : <port> ]
- @type host: str
- @param host: the name or ip address of the remote host (overriden by url)
- @type port: int
- @param port: the port number of the remote host (overriden by url)
- @type transport: str
- @param transport: one of tcp, tcp+tls, or ssl (alias for tcp+tls)
- @type heartbeat: int
- @param heartbeat: heartbeat interval in seconds
-
- @type username: str
- @param username: the username for authentication (overriden by url)
- @type password: str
- @param password: the password for authentication (overriden by url)
-
- @type sasl_mechanisms: str
- @param sasl_mechanisms: space separated list of permitted sasl mechanisms
- @type sasl_service: str
- @param sasl_service: ???
- @type sasl_min_ssf: ???
- @param sasl_min_ssf: ???
- @type sasl_max_ssf: ???
- @param sasl_max_ssf: ???
-
- @type reconnect: bool
- @param reconnect: enable/disable automatic reconnect
- @type reconnect_timeout: float
- @param reconnect_timeout: total time to attempt reconnect
- @type reconnect_internal_min: float
- @param reconnect_internal_min: minimum interval between reconnect attempts
- @type reconnect_internal_max: float
- @param reconnect_internal_max: maximum interval between reconnect attempts
- @type reconnect_internal: float
- @param reconnect_interval: set both min and max reconnect intervals
- @type reconnect_limit: int
- @param reconnect_limit: limit the total number of reconnect attempts
- @type reconnect_urls: list[str]
- @param reconnect_urls: list of backup hosts specified as urls
-
- @type address_ttl: float
- @param address_ttl: time until cached address resolution expires
-
- @rtype: Connection
- @return: a disconnected Connection
- """
- if url is None:
- url = options.get("host")
- if isinstance(url, basestring):
- url = URL(url)
- self.host = url.host
- if options.has_key("transport"):
- self.transport = options.get("transport")
- elif url.scheme == url.AMQP:
- self.transport = "tcp"
- elif url.scheme == url.AMQPS:
- self.transport = "ssl"
- else:
- self.transport = "tcp"
- if self.transport in ("ssl", "tcp+tls"):
- self.port = default(url.port, options.get("port", AMQPS_PORT))
- else:
- self.port = default(url.port, options.get("port", AMQP_PORT))
- self.heartbeat = options.get("heartbeat")
- self.username = default(url.user, options.get("username", None))
- self.password = default(url.password, options.get("password", None))
- self.auth_username = None
-
- self.sasl_mechanisms = options.get("sasl_mechanisms")
- self.sasl_service = options.get("sasl_service", "qpidd")
- self.sasl_min_ssf = options.get("sasl_min_ssf")
- self.sasl_max_ssf = options.get("sasl_max_ssf")
-
- self.reconnect = options.get("reconnect", False)
- self.reconnect_timeout = options.get("reconnect_timeout")
- reconnect_interval = options.get("reconnect_interval")
- self.reconnect_interval_min = options.get("reconnect_interval_min",
- default(reconnect_interval, 1))
- self.reconnect_interval_max = options.get("reconnect_interval_max",
- default(reconnect_interval, 2*60))
- self.reconnect_limit = options.get("reconnect_limit")
- self.reconnect_urls = options.get("reconnect_urls", [])
- self.reconnect_log = options.get("reconnect_log", True)
-
- self.address_ttl = options.get("address_ttl", 60)
- self.tcp_nodelay = options.get("tcp_nodelay", False)
-
- self.options = options
-
-
- self.id = str(uuid4())
- self.session_counter = 0
- self.sessions = {}
- self._open = False
- self._connected = False
- self._transport_connected = False
- self._lock = RLock()
- self._condition = Condition(self._lock)
- self._waiter = Waiter(self._condition)
- self._modcount = Serial(0)
- self.error = None
- from driver import Driver
- self._driver = Driver(self)
-
- def _wait(self, predicate, timeout=None):
- return self._waiter.wait(predicate, timeout=timeout)
-
- def _wakeup(self):
- self._modcount += 1
- self._driver.wakeup()
-
- def check_error(self):
- if self.error:
- self._condition.gc()
- raise self.error
-
- def get_error(self):
- return self.error
-
- def _ewait(self, predicate, timeout=None):
- result = self._wait(lambda: self.error or predicate(), timeout)
- self.check_error()
- return result
-
- def check_closed(self):
- if not self._connected:
- self._condition.gc()
- raise ConnectionClosed()
-
- @synchronized
- def session(self, name=None, transactional=False):
- """
- Creates or retrieves the named session. If the name is omitted or
- None, then a unique name is chosen based on a randomly generated
- uuid.
-
- @type name: str
- @param name: the session name
- @rtype: Session
- @return: the named Session
- """
-
- if name is None:
- name = "%s:%s" % (self.id, self.session_counter)
- self.session_counter += 1
- else:
- name = "%s:%s" % (self.id, name)
-
- if self.sessions.has_key(name):
- return self.sessions[name]
- else:
- ssn = Session(self, name, transactional)
- self.sessions[name] = ssn
- self._wakeup()
- return ssn
-
- @synchronized
- def _remove_session(self, ssn):
- self.sessions.pop(ssn.name, 0)
-
- @synchronized
- def open(self):
- """
- Opens a connection.
- """
- if self._open:
- raise ConnectionError("already open")
- self._open = True
- self.attach()
-
- @synchronized
- def opened(self):
- """
- Return true if the connection is open, false otherwise.
- """
- return self._open
-
- @synchronized
- def attach(self):
- """
- Attach to the remote endpoint.
- """
- if not self._connected:
- self._connected = True
- self._driver.start()
- self._wakeup()
- self._ewait(lambda: self._transport_connected and not self._unlinked())
-
- def _unlinked(self):
- return [l
- for ssn in self.sessions.values()
- if not (ssn.error or ssn.closed)
- for l in ssn.senders + ssn.receivers
- if not (l.linked or l.error or l.closed)]
-
- @synchronized
- def detach(self, timeout=None):
- """
- Detach from the remote endpoint.
- """
- if self._connected:
- self._connected = False
- self._wakeup()
- cleanup = True
- else:
- cleanup = False
- try:
- if not self._wait(lambda: not self._transport_connected, timeout=timeout):
- raise Timeout("detach timed out")
- finally:
- if cleanup:
- self._driver.stop()
- self._condition.gc()
-
- @synchronized
- def attached(self):
- """
- Return true if the connection is attached, false otherwise.
- """
- return self._connected
-
- @synchronized
- def close(self, timeout=None):
- """
- Close the connection and all sessions.
- """
- try:
- for ssn in self.sessions.values():
- ssn.close(timeout=timeout)
- finally:
- self.detach(timeout=timeout)
- self._open = False
-
-class Session(Endpoint):
-
- """
- Sessions provide a linear context for sending and receiving
- L{Messages<Message>}. L{Messages<Message>} are sent and received
- using the L{Sender.send} and L{Receiver.fetch} methods of the
- L{Sender} and L{Receiver} objects associated with a Session.
-
- Each L{Sender} and L{Receiver} is created by supplying either a
- target or source address to the L{sender} and L{receiver} methods of
- the Session. The address is supplied via a string syntax documented
- below.
-
- Addresses
- =========
-
- An address identifies a source or target for messages. In its
- simplest form this is just a name. In general a target address may
- also be used as a source address, however not all source addresses
- may be used as a target, e.g. a source might additionally have some
- filtering criteria that would not be present in a target.
-
- A subject may optionally be specified along with the name. When an
- address is used as a target, any subject specified in the address is
- used as the default subject of outgoing messages for that target.
- When an address is used as a source, any subject specified in the
- address is pattern matched against the subject of available messages
- as a filter for incoming messages from that source.
-
- The options map contains additional information about the address
- including:
-
- - policies for automatically creating, and deleting the node to
- which an address refers
-
- - policies for asserting facts about the node to which an address
- refers
-
- - extension points that can be used for sender/receiver
- configuration
-
- Mapping to AMQP 0-10
- --------------------
- The name is resolved to either an exchange or a queue by querying
- the broker.
-
- The subject is set as a property on the message. Additionally, if
- the name refers to an exchange, the routing key is set to the
- subject.
-
- Syntax
- ------
- The following regular expressions define the tokens used to parse
- addresses::
- LBRACE: \\{
- RBRACE: \\}
- LBRACK: \\[
- RBRACK: \\]
- COLON: :
- SEMI: ;
- SLASH: /
- COMMA: ,
- NUMBER: [+-]?[0-9]*\\.?[0-9]+
- ID: [a-zA-Z_](?:[a-zA-Z0-9_-]*[a-zA-Z0-9_])?
- STRING: "(?:[^\\\\"]|\\\\.)*"|\'(?:[^\\\\\']|\\\\.)*\'
- ESC: \\\\[^ux]|\\\\x[0-9a-fA-F][0-9a-fA-F]|\\\\u[0-9a-fA-F][0-9a-fA-F][0-9a-fA-F][0-9a-fA-F]
- SYM: [.#*%@$^!+-]
- WSPACE: [ \\n\\r\\t]+
-
- The formal grammar for addresses is given below::
- address = name [ "/" subject ] [ ";" options ]
- name = ( part | quoted )+
- subject = ( part | quoted | "/" )*
- quoted = STRING / ESC
- part = LBRACE / RBRACE / COLON / COMMA / NUMBER / ID / SYM
- options = map
- map = "{" ( keyval ( "," keyval )* )? "}"
- keyval = ID ":" value
- value = NUMBER / STRING / ID / map / list
- list = "[" ( value ( "," value )* )? "]"
-
- This grammar resuls in the following informal syntax::
-
- <name> [ / <subject> ] [ ; <options> ]
-
- Where options is::
-
- { <key> : <value>, ... }
-
- And values may be:
- - numbers
- - single, double, or non quoted strings
- - maps (dictionaries)
- - lists
-
- Options
- -------
- The options map permits the following parameters::
-
- <name> [ / <subject> ] ; {
- create: always | sender | receiver | never,
- delete: always | sender | receiver | never,
- assert: always | sender | receiver | never,
- mode: browse | consume,
- node: {
- type: queue | topic,
- durable: True | False,
- x-declare: { ... <declare-overrides> ... },
- x-bindings: [<binding_1>, ... <binding_n>]
- },
- link: {
- name: <link-name>,
- durable: True | False,
- reliability: unreliable | at-most-once | at-least-once | exactly-once,
- x-declare: { ... <declare-overrides> ... },
- x-bindings: [<binding_1>, ... <binding_n>],
- x-subscribe: { ... <subscribe-overrides> ... }
- }
- }
-
- Bindings are specified as a map with the following options::
-
- {
- exchange: <exchange>,
- queue: <queue>,
- key: <key>,
- arguments: <arguments>
- }
-
- The create, delete, and assert policies specify who should perfom
- the associated action:
-
- - I{always}: the action will always be performed
- - I{sender}: the action will only be performed by the sender
- - I{receiver}: the action will only be performed by the receiver
- - I{never}: the action will never be performed (this is the default)
-
- The node-type is one of:
-
- - I{topic}: a topic node will default to the topic exchange,
- x-declare may be used to specify other exchange types
- - I{queue}: this is the default node-type
-
- The x-declare map permits protocol specific keys and values to be
- specified when exchanges or queues are declared. These keys and
- values are passed through when creating a node or asserting facts
- about an existing node.
-
- Examples
- --------
- A simple name resolves to any named node, usually a queue or a
- topic::
-
- my-queue-or-topic
-
- A simple name with a subject will also resolve to a node, but the
- presence of the subject will cause a sender using this address to
- set the subject on outgoing messages, and receivers to filter based
- on the subject::
-
- my-queue-or-topic/my-subject
-
- A subject pattern can be used and will cause filtering if used by
- the receiver. If used for a sender, the literal value gets set as
- the subject::
-
- my-queue-or-topic/my-*
-
- In all the above cases, the address is resolved to an existing node.
- If you want the node to be auto-created, then you can do the
- following. By default nonexistent nodes are assumed to be queues::
-
- my-queue; {create: always}
-
- You can customize the properties of the queue::
-
- my-queue; {create: always, node: {durable: True}}
-
- You can create a topic instead if you want::
-
- my-queue; {create: always, node: {type: topic}}
-
- You can assert that the address resolves to a node with particular
- properties::
-
- my-transient-topic; {
- assert: always,
- node: {
- type: topic,
- durable: False
- }
- }
- """
-
- def __init__(self, connection, name, transactional):
- self.connection = connection
- self.name = name
- self.log_id = "%x" % id(self)
-
- self.transactional = transactional
-
- self.committing = False
- self.committed = True
- self.aborting = False
- self.aborted = False
-
- self.next_sender_id = 0
- self.senders = []
- self.next_receiver_id = 0
- self.receivers = []
- self.outgoing = []
- self.incoming = []
- self.unacked = []
- self.acked = []
- # XXX: I hate this name.
- self.ack_capacity = UNLIMITED
-
- self.error = None
- self.closing = False
- self.closed = False
-
- self._lock = connection._lock
-
- def __repr__(self):
- return "<Session %s>" % self.name
-
- def _wait(self, predicate, timeout=None):
- return self.connection._wait(predicate, timeout=timeout)
-
- def _wakeup(self):
- self.connection._wakeup()
-
- def check_error(self):
- self.connection.check_error()
- if self.error:
- raise self.error
-
- def get_error(self):
- err = self.connection.get_error()
- if err:
- return err
- else:
- return self.error
-
- def _ewait(self, predicate, timeout=None):
- result = self.connection._ewait(lambda: self.error or predicate(), timeout)
- self.check_error()
- return result
-
- def check_closed(self):
- if self.closed:
- raise SessionClosed()
-
- @synchronized
- def sender(self, target, **options):
- """
- Creates a L{Sender} that may be used to send L{Messages<Message>}
- to the specified target.
-
- @type target: str
- @param target: the target to which messages will be sent
- @rtype: Sender
- @return: a new Sender for the specified target
- """
- target = _mangle(target)
- sender = Sender(self, self.next_sender_id, target, options)
- self.next_sender_id += 1
- self.senders.append(sender)
- if not self.closed and self.connection._connected:
- self._wakeup()
- try:
- sender._ewait(lambda: sender.linked)
- except LinkError, e:
- sender.close()
- raise e
- return sender
-
- @synchronized
- def receiver(self, source, **options):
- """
- Creates a receiver that may be used to fetch L{Messages<Message>}
- from the specified source.
-
- @type source: str
- @param source: the source of L{Messages<Message>}
- @rtype: Receiver
- @return: a new Receiver for the specified source
- """
- source = _mangle(source)
- receiver = Receiver(self, self.next_receiver_id, source, options)
- self.next_receiver_id += 1
- self.receivers.append(receiver)
- if not self.closed and self.connection._connected:
- self._wakeup()
- try:
- receiver._ewait(lambda: receiver.linked)
- except LinkError, e:
- receiver.close()
- raise e
- return receiver
-
- @synchronized
- def _count(self, predicate):
- result = 0
- for msg in self.incoming:
- if predicate(msg):
- result += 1
- return result
-
- def _peek(self, receiver):
- for msg in self.incoming:
- if msg._receiver == receiver:
- return msg
-
- def _pop(self, receiver):
- i = 0
- while i < len(self.incoming):
- msg = self.incoming[i]
- if msg._receiver == receiver:
- del self.incoming[i]
- return msg
- else:
- i += 1
-
- @synchronized
- def _get(self, receiver, timeout=None):
- if self._ewait(lambda: ((self._peek(receiver) is not None) or
- self.closing or receiver.closed),
- timeout):
- msg = self._pop(receiver)
- if msg is not None:
- msg._receiver.returned += 1
- self.unacked.append(msg)
- log.debug("RETR[%s]: %s", self.log_id, msg)
- return msg
- return None
-
- @synchronized
- def next_receiver(self, timeout=None):
- if self._ecwait(lambda: self.incoming, timeout):
- return self.incoming[0]._receiver
- else:
- raise Empty
-
- @synchronized
- def acknowledge(self, message=None, disposition=None, sync=True):
- """
- Acknowledge the given L{Message}. If message is None, then all
- unacknowledged messages on the session are acknowledged.
-
- @type message: Message
- @param message: the message to acknowledge or None
- @type sync: boolean
- @param sync: if true then block until the message(s) are acknowledged
- """
- if message is None:
- messages = self.unacked[:]
- else:
- messages = [message]
-
- for m in messages:
- if self.ack_capacity is not UNLIMITED:
- if self.ack_capacity <= 0:
- # XXX: this is currently a SendError, maybe it should be a SessionError?
- raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity)
- self._wakeup()
- self._ecwait(lambda: len(self.acked) < self.ack_capacity)
- m._disposition = disposition
- self.unacked.remove(m)
- self.acked.append(m)
-
- self._wakeup()
- if sync:
- self._ecwait(lambda: not [m for m in messages if m in self.acked])
-
- @synchronized
- def commit(self):
- """
- Commit outstanding transactional work. This consists of all
- message sends and receives since the prior commit or rollback.
- """
- if not self.transactional:
- raise NontransactionalSession()
- self.committing = True
- self._wakeup()
- self._ecwait(lambda: not self.committing)
- if self.aborted:
- raise TransactionAborted()
- assert self.committed
-
- @synchronized
- def rollback(self):
- """
- Rollback outstanding transactional work. This consists of all
- message sends and receives since the prior commit or rollback.
- """
- if not self.transactional:
- raise NontransactionalSession()
- self.aborting = True
- self._wakeup()
- self._ecwait(lambda: not self.aborting)
- assert self.aborted
-
- @synchronized
- def sync(self, timeout=None):
- """
- Sync the session.
- """
- for snd in self.senders:
- snd.sync(timeout=timeout)
- if not self._ewait(lambda: not self.outgoing and not self.acked, timeout=timeout):
- raise Timeout("session sync timed out")
-
- @synchronized
- def close(self, timeout=None):
- """
- Close the session.
- """
- self.sync(timeout=timeout)
-
- for link in self.receivers + self.senders:
- link.close(timeout=timeout)
-
- if not self.closing:
- self.closing = True
- self._wakeup()
-
- try:
- if not self._ewait(lambda: self.closed, timeout=timeout):
- raise Timeout("session close timed out")
- finally:
- self.connection._remove_session(self)
-
-def _mangle(addr):
- if addr and addr.startswith("#"):
- return str(uuid4()) + addr
- else:
- return addr
-
-class Sender(Endpoint):
-
- """
- Sends outgoing messages.
- """
-
- def __init__(self, session, id, target, options):
- self.session = session
- self.id = id
- self.target = target
- self.options = options
- self.capacity = options.get("capacity", UNLIMITED)
- self.threshold = 0.5
- self.durable = options.get("durable")
- self.queued = Serial(0)
- self.synced = Serial(0)
- self.acked = Serial(0)
- self.error = None
- self.linked = False
- self.closing = False
- self.closed = False
- self._lock = self.session._lock
-
- def _wakeup(self):
- self.session._wakeup()
-
- def check_error(self):
- self.session.check_error()
- if self.error:
- raise self.error
-
- def get_error(self):
- err = self.session.get_error()
- if err:
- return err
- else:
- return self.error
-
- def _ewait(self, predicate, timeout=None):
- result = self.session._ewait(lambda: self.error or predicate(), timeout)
- self.check_error()
- return result
-
- def check_closed(self):
- if self.closed:
- raise LinkClosed()
-
- @synchronized
- def unsettled(self):
- """
- Returns the number of messages awaiting acknowledgment.
- @rtype: int
- @return: the number of unacknowledged messages
- """
- return self.queued - self.acked
-
- @synchronized
- def available(self):
- if self.capacity is UNLIMITED:
- return UNLIMITED
- else:
- return self.capacity - self.unsettled()
-
- @synchronized
- def send(self, object, sync=True, timeout=None):
- """
- Send a message. If the object passed in is of type L{unicode},
- L{str}, L{list}, or L{dict}, it will automatically be wrapped in a
- L{Message} and sent. If it is of type L{Message}, it will be sent
- directly. If the sender capacity is not L{UNLIMITED} then send
- will block until there is available capacity to send the message.
- If the timeout parameter is specified, then send will throw an
- L{InsufficientCapacity} exception if capacity does not become
- available within the specified time.
-
- @type object: unicode, str, list, dict, Message
- @param object: the message or content to send
-
- @type sync: boolean
- @param sync: if true then block until the message is sent
-
- @type timeout: float
- @param timeout: the time to wait for available capacity
- """
-
- if not self.session.connection._connected or self.session.closing:
- raise Detached()
-
- self._ecwait(lambda: self.linked)
-
- if isinstance(object, Message):
- message = object
- else:
- message = Message(object)
-
- if message.durable is None:
- message.durable = self.durable
-
- if self.capacity is not UNLIMITED:
- if self.capacity <= 0:
- raise InsufficientCapacity("capacity = %s" % self.capacity)
- if not self._ecwait(self.available, timeout=timeout):
- raise InsufficientCapacity("capacity = %s" % self.capacity)
-
- # XXX: what if we send the same message to multiple senders?
- message._sender = self
- if self.capacity is not UNLIMITED:
- message._sync = sync or self.available() <= int(ceil(self.threshold*self.capacity))
- else:
- message._sync = sync
- self.session.outgoing.append(message)
- self.queued += 1
-
- if sync:
- self.sync()
- assert message not in self.session.outgoing
- else:
- self._wakeup()
-
- @synchronized
- def sync(self, timeout=None):
- mno = self.queued
- if self.synced < mno:
- self.synced = mno
- self._wakeup()
- if not self._ewait(lambda: self.acked >= mno, timeout=timeout):
- raise Timeout("sender sync timed out")
-
- @synchronized
- def close(self, timeout=None):
- """
- Close the Sender.
- """
- # avoid erroring out when closing a sender that was never
- # established
- if self.acked < self.queued:
- self.sync(timeout=timeout)
-
- if not self.closing:
- self.closing = True
- self._wakeup()
-
- try:
- if not self.session._ewait(lambda: self.closed, timeout=timeout):
- raise Timeout("sender close timed out")
- finally:
- try:
- self.session.senders.remove(self)
- except ValueError:
- pass
-
-class Receiver(Endpoint, object):
-
- """
- Receives incoming messages from a remote source. Messages may be
- fetched with L{fetch}.
- """
-
- def __init__(self, session, id, source, options):
- self.session = session
- self.id = id
- self.source = source
- self.options = options
-
- self.granted = Serial(0)
- self.draining = False
- self.impending = Serial(0)
- self.received = Serial(0)
- self.returned = Serial(0)
-
- self.error = None
- self.linked = False
- self.closing = False
- self.closed = False
- self._lock = self.session._lock
- self._capacity = 0
- self._set_capacity(options.get("capacity", 0), False)
- self.threshold = 0.5
-
- @synchronized
- def _set_capacity(self, c, wakeup=True):
- if c is UNLIMITED:
- self._capacity = c.value
- else:
- self._capacity = c
- self._grant()
- if wakeup:
- self._wakeup()
-
- def _get_capacity(self):
- if self._capacity == UNLIMITED.value:
- return UNLIMITED
- else:
- return self._capacity
-
- capacity = property(_get_capacity, _set_capacity)
-
- def _wakeup(self):
- self.session._wakeup()
-
- def check_error(self):
- self.session.check_error()
- if self.error:
- raise self.error
-
- def get_error(self):
- err = self.session.get_error()
- if err:
- return err
- else:
- return self.error
-
- def _ewait(self, predicate, timeout=None):
- result = self.session._ewait(lambda: self.error or predicate(), timeout)
- self.check_error()
- return result
-
- def check_closed(self):
- if self.closed:
- raise LinkClosed()
-
- @synchronized
- def unsettled(self):
- """
- Returns the number of acknowledged messages awaiting confirmation.
- """
- return len([m for m in self.acked if m._receiver is self])
-
- @synchronized
- def available(self):
- """
- Returns the number of messages available to be fetched by the
- application.
-
- @rtype: int
- @return: the number of available messages
- """
- return self.received - self.returned
-
- @synchronized
- def fetch(self, timeout=None):
- """
- Fetch and return a single message. A timeout of None will block
- forever waiting for a message to arrive, a timeout of zero will
- return immediately if no messages are available.
-
- @type timeout: float
- @param timeout: the time to wait for a message to be available
- """
-
- self._ecwait(lambda: self.linked)
-
- if self._capacity == 0:
- self.granted = self.returned + 1
- self._wakeup()
- self._ecwait(lambda: self.impending >= self.granted)
- msg = self.session._get(self, timeout=timeout)
- if msg is None:
- self.check_closed()
- self.draining = True
- self._wakeup()
- self._ecwait(lambda: not self.draining)
- msg = self.session._get(self, timeout=0)
- self._grant()
- self._wakeup()
- if msg is None:
- raise Empty()
- elif self._capacity not in (0, UNLIMITED.value):
- t = int(ceil(self.threshold * self._capacity))
- if self.received - self.returned <= t:
- self.granted = self.returned + self._capacity
- self._wakeup()
- return msg
-
- def _grant(self):
- if self._capacity == UNLIMITED.value:
- self.granted = UNLIMITED
- else:
- self.granted = self.returned + self._capacity
-
- @synchronized
- def close(self, timeout=None):
- """
- Close the receiver.
- """
- if not self.closing:
- self.closing = True
- self._wakeup()
-
- try:
- if not self.session._ewait(lambda: self.closed, timeout=timeout):
- raise Timeout("receiver close timed out")
- finally:
- try:
- self.session.receivers.remove(self)
- except ValueError:
- pass
-
-__all__ = ["Connection", "Session", "Sender", "Receiver"]