summaryrefslogtreecommitdiff
path: root/python/qpid/management.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid/management.py')
-rw-r--r--python/qpid/management.py69
1 files changed, 35 insertions, 34 deletions
diff --git a/python/qpid/management.py b/python/qpid/management.py
index e6ff5852f2..485b64b99f 100644
--- a/python/qpid/management.py
+++ b/python/qpid/management.py
@@ -162,8 +162,12 @@ class managementChannel:
ssn.exchange_bind (exchange="amq.direct",
queue=self.replyName, binding_key=self.replyName)
- ssn.message_subscribe (queue=self.topicName, destination="tdest")
- ssn.message_subscribe (queue=self.replyName, destination="rdest")
+ ssn.message_subscribe (queue=self.topicName, destination="tdest",
+ accept_mode=ssn.accept_mode.none,
+ acquire_mode=ssn.acquire_mode.pre_acquired)
+ ssn.message_subscribe (queue=self.replyName, destination="rdest",
+ accept_mode=ssn.accept_mode.none,
+ acquire_mode=ssn.acquire_mode.pre_acquired)
ssn.incoming ("tdest").listen (self.topicCb, self.exceptionCb)
ssn.incoming ("rdest").listen (self.replyCb)
@@ -202,9 +206,6 @@ class managementChannel:
if self.enabled:
self.qpidChannel.message_transfer (destination=exchange, message=msg)
- def accept (self, msg):
- self.qpidChannel.message_accept(RangedSet(msg.id))
-
def message (self, body, routing_key="broker"):
dp = self.qpidChannel.delivery_properties()
dp.routing_key = routing_key
@@ -349,28 +350,27 @@ class managementClient:
def topicCb (self, ch, msg):
""" Receive messages via the topic queue of a particular channel. """
codec = Codec (self.spec, msg.body)
- hdr = self.checkHeader (codec)
- if hdr == None:
- raise ValueError ("outer header invalid");
+ while True:
+ hdr = self.checkHeader (codec)
+ if hdr == None:
+ return
- if hdr[0] == 'p':
- self.handlePackageInd (ch, codec)
- elif hdr[0] == 'q':
- self.handleClassInd (ch, codec)
- elif hdr[0] == 'h':
- self.handleHeartbeat (ch, codec)
- elif hdr[0] == 'e':
- self.handleEvent (ch, codec)
- else:
- self.parse (ch, codec, hdr[0], hdr[1])
- ch.accept(msg)
+ if hdr[0] == 'p':
+ self.handlePackageInd (ch, codec)
+ elif hdr[0] == 'q':
+ self.handleClassInd (ch, codec)
+ elif hdr[0] == 'h':
+ self.handleHeartbeat (ch, codec)
+ elif hdr[0] == 'e':
+ self.handleEvent (ch, codec)
+ else:
+ self.parse (ch, codec, hdr[0], hdr[1])
def replyCb (self, ch, msg):
""" Receive messages via the reply queue of a particular channel. """
codec = Codec (self.spec, msg.body)
hdr = self.checkHeader (codec)
if hdr == None:
- ch.accept(msg)
return
if hdr[0] == 'm':
@@ -385,7 +385,6 @@ class managementClient:
self.handleClassInd (ch, codec)
else:
self.parse (ch, codec, hdr[0], hdr[1])
- ch.accept(msg)
def exceptCb (self, ch, data):
if self.closeCb != None:
@@ -403,20 +402,22 @@ class managementClient:
codec.write_uint32 (seq)
def checkHeader (self, codec):
- """ Check the header of a management message and extract the opcode and
- class. """
- octet = chr (codec.read_uint8 ())
- if octet != 'A':
- return None
- octet = chr (codec.read_uint8 ())
- if octet != 'M':
- return None
- octet = chr (codec.read_uint8 ())
- if octet != '2':
+ """ Check the header of a management message and extract the opcode and class. """
+ try:
+ octet = chr (codec.read_uint8 ())
+ if octet != 'A':
+ return None
+ octet = chr (codec.read_uint8 ())
+ if octet != 'M':
+ return None
+ octet = chr (codec.read_uint8 ())
+ if octet != '2':
+ return None
+ opcode = chr (codec.read_uint8 ())
+ seq = codec.read_uint32 ()
+ return (opcode, seq)
+ except:
return None
- opcode = chr (codec.read_uint8 ())
- seq = codec.read_uint32 ()
- return (opcode, seq)
def encodeValue (self, codec, value, typecode):
""" Encode, into the codec, a value based on its typecode. """