diff options
Diffstat (limited to 'python/qpid/management.py')
| -rw-r--r-- | python/qpid/management.py | 69 |
1 files changed, 35 insertions, 34 deletions
diff --git a/python/qpid/management.py b/python/qpid/management.py index e6ff5852f2..485b64b99f 100644 --- a/python/qpid/management.py +++ b/python/qpid/management.py @@ -162,8 +162,12 @@ class managementChannel: ssn.exchange_bind (exchange="amq.direct", queue=self.replyName, binding_key=self.replyName) - ssn.message_subscribe (queue=self.topicName, destination="tdest") - ssn.message_subscribe (queue=self.replyName, destination="rdest") + ssn.message_subscribe (queue=self.topicName, destination="tdest", + accept_mode=ssn.accept_mode.none, + acquire_mode=ssn.acquire_mode.pre_acquired) + ssn.message_subscribe (queue=self.replyName, destination="rdest", + accept_mode=ssn.accept_mode.none, + acquire_mode=ssn.acquire_mode.pre_acquired) ssn.incoming ("tdest").listen (self.topicCb, self.exceptionCb) ssn.incoming ("rdest").listen (self.replyCb) @@ -202,9 +206,6 @@ class managementChannel: if self.enabled: self.qpidChannel.message_transfer (destination=exchange, message=msg) - def accept (self, msg): - self.qpidChannel.message_accept(RangedSet(msg.id)) - def message (self, body, routing_key="broker"): dp = self.qpidChannel.delivery_properties() dp.routing_key = routing_key @@ -349,28 +350,27 @@ class managementClient: def topicCb (self, ch, msg): """ Receive messages via the topic queue of a particular channel. """ codec = Codec (self.spec, msg.body) - hdr = self.checkHeader (codec) - if hdr == None: - raise ValueError ("outer header invalid"); + while True: + hdr = self.checkHeader (codec) + if hdr == None: + return - if hdr[0] == 'p': - self.handlePackageInd (ch, codec) - elif hdr[0] == 'q': - self.handleClassInd (ch, codec) - elif hdr[0] == 'h': - self.handleHeartbeat (ch, codec) - elif hdr[0] == 'e': - self.handleEvent (ch, codec) - else: - self.parse (ch, codec, hdr[0], hdr[1]) - ch.accept(msg) + if hdr[0] == 'p': + self.handlePackageInd (ch, codec) + elif hdr[0] == 'q': + self.handleClassInd (ch, codec) + elif hdr[0] == 'h': + self.handleHeartbeat (ch, codec) + elif hdr[0] == 'e': + self.handleEvent (ch, codec) + else: + self.parse (ch, codec, hdr[0], hdr[1]) def replyCb (self, ch, msg): """ Receive messages via the reply queue of a particular channel. """ codec = Codec (self.spec, msg.body) hdr = self.checkHeader (codec) if hdr == None: - ch.accept(msg) return if hdr[0] == 'm': @@ -385,7 +385,6 @@ class managementClient: self.handleClassInd (ch, codec) else: self.parse (ch, codec, hdr[0], hdr[1]) - ch.accept(msg) def exceptCb (self, ch, data): if self.closeCb != None: @@ -403,20 +402,22 @@ class managementClient: codec.write_uint32 (seq) def checkHeader (self, codec): - """ Check the header of a management message and extract the opcode and - class. """ - octet = chr (codec.read_uint8 ()) - if octet != 'A': - return None - octet = chr (codec.read_uint8 ()) - if octet != 'M': - return None - octet = chr (codec.read_uint8 ()) - if octet != '2': + """ Check the header of a management message and extract the opcode and class. """ + try: + octet = chr (codec.read_uint8 ()) + if octet != 'A': + return None + octet = chr (codec.read_uint8 ()) + if octet != 'M': + return None + octet = chr (codec.read_uint8 ()) + if octet != '2': + return None + opcode = chr (codec.read_uint8 ()) + seq = codec.read_uint32 () + return (opcode, seq) + except: return None - opcode = chr (codec.read_uint8 ()) - seq = codec.read_uint32 () - return (opcode, seq) def encodeValue (self, codec, value, typecode): """ Encode, into the codec, a value based on its typecode. """ |
