diff options
| author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-03-01 20:05:22 +0000 |
|---|---|---|
| committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-03-01 20:05:22 +0000 |
| commit | 028e23133ff79007353a05c6ec9dc71098b79da5 (patch) | |
| tree | 3ce17ece84c3df7a5c8017e8c3ab99ed7a92d9b5 | |
| parent | 960d88355bba3a11c28c6be48bb6b14e4ba00d50 (diff) | |
| download | qpid-python-028e23133ff79007353a05c6ec9dc71098b79da5.tar.gz | |
QPID-2261: split error logging and debug tracing into separate loggers
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@917688 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | extras/qmf/src/py/qmf2/agent.py | 106 | ||||
| -rw-r--r-- | extras/qmf/src/py/qmf2/common.py | 225 | ||||
| -rw-r--r-- | extras/qmf/src/py/qmf2/console.py | 239 |
3 files changed, 188 insertions, 382 deletions
diff --git a/extras/qmf/src/py/qmf2/agent.py b/extras/qmf/src/py/qmf2/agent.py index 6d4b3ea46c..b39c8d44aa 100644 --- a/extras/qmf/src/py/qmf2/agent.py +++ b/extras/qmf/src/py/qmf2/agent.py @@ -17,10 +17,10 @@ # import sys -import logging import datetime import time import Queue +from logging import getLogger from threading import Thread, RLock, currentThread, Event from qpid.messaging import Connection, Message, Empty, SendError from uuid import uuid4 @@ -32,6 +32,8 @@ from common import (OpCode, QmfQuery, ContentType, SchemaObjectClass, # running the agent notifier callback _callback_thread=None +log = getLogger("qmf") +trace = getLogger("qmf.agent") ##============================================================================== @@ -175,10 +177,10 @@ class Agent(Thread): @type timeout: float @param timeout: maximum time in seconds to wait for all background threads to terminate. Default: forever. """ - logging.debug("Destroying Agent %s" % self.name) + trace.debug("Destroying Agent %s" % self.name) if self._conn: self.remove_connection(timeout) - logging.debug("Agent Destroyed") + trace.debug("Agent Destroyed") def get_name(self): @@ -196,7 +198,7 @@ class Agent(Thread): " x-properties:" " {type:direct}}}", capacity=self._capacity) - logging.debug("my direct addr=%s" % self._direct_receiver.source) + trace.debug("my direct addr=%s" % self._direct_receiver.source) # for sending directly addressed messages. self._direct_sender = self._session.sender(str(self._address.get_node()) + @@ -205,7 +207,7 @@ class Agent(Thread): " {type:topic," " x-properties:" " {type:direct}}}") - logging.debug("my default direct send addr=%s" % self._direct_sender.target) + trace.debug("my default direct send addr=%s" % self._direct_sender.target) # for receiving "broadcast" messages from consoles default_addr = QmfAddress.topic(QmfAddress.SUBJECT_CONSOLE_IND + ".#", @@ -215,7 +217,7 @@ class Agent(Thread): " node-properties:" " {type:topic}}", capacity=self._capacity) - logging.debug("console.ind addr=%s" % self._topic_receiver.source) + trace.debug("console.ind addr=%s" % self._topic_receiver.source) # for sending to topic subscribers ind_addr = QmfAddress.topic(QmfAddress.SUBJECT_AGENT_IND, @@ -224,7 +226,7 @@ class Agent(Thread): ";{create:always," " node-properties:" " {type:topic}}") - logging.debug("agent.ind addr=%s" % self._topic_sender.target) + trace.debug("agent.ind addr=%s" % self._topic_sender.target) self._running = True self.start() @@ -238,10 +240,10 @@ class Agent(Thread): if self.isAlive(): # kick my thread to wake it up self._wake_thread() - logging.debug("waiting for agent receiver thread to exit") + trace.debug("waiting for agent receiver thread to exit") self.join(timeout) if self.isAlive(): - logging.error( "Agent thread '%s' is hung..." % self.name) + log.error( "Agent thread '%s' is hung..." % self.name) self._direct_receiver.close() self._direct_receiver = None self._direct_sender.close() @@ -253,7 +255,7 @@ class Agent(Thread): self._session.close() self._session = None self._conn = None - logging.debug("agent connection removal complete") + trace.debug("agent connection removal complete") def register_object_class(self, schema): """ @@ -303,7 +305,7 @@ class Agent(Thread): "qmf.agent":self.name}, content=[qmfEvent.map_encode()]) # TRACE - # logging.error("!!! Agent %s sending Event (%s)" % + # log.error("!!! Agent %s sending Event (%s)" % # (self.name, str(msg))) self._topic_sender.send(msg) @@ -422,14 +424,14 @@ class Agent(Thread): # # Process inbound messages # - logging.debug("%s processing inbound messages..." % self.name) + trace.debug("%s processing inbound messages..." % self.name) for i in range(batch_limit): try: msg = self._topic_receiver.fetch(timeout=0) except Empty: break # TRACE - # logging.error("!!! Agent %s: msg on %s [%s]" % + # log.error("!!! Agent %s: msg on %s [%s]" % # (self.name, self._topic_receiver.source, msg)) self._dispatch(msg, _direct=False) @@ -439,7 +441,7 @@ class Agent(Thread): except Empty: break # TRACE - # logging.error("!!! Agent %s: msg on %s [%s]" % + # log.error("!!! Agent %s: msg on %s [%s]" % # (self.name, self._direct_receiver.source, msg)) self._dispatch(msg, _direct=True) @@ -448,7 +450,7 @@ class Agent(Thread): # now = datetime.datetime.utcnow() if now >= next_heartbeat: - logging.debug("%s sending heartbeat..." % self.name) + trace.debug("%s sending heartbeat..." % self.name) ind = Message(id=QMF_APP_ID, subject=QmfAddress.SUBJECT_AGENT_HEARTBEAT, properties={"method":"indication", @@ -456,10 +458,10 @@ class Agent(Thread): "qmf.agent":self.name}, content=self._makeAgentInfoBody()) # TRACE - #logging.error("!!! Agent %s sending Heartbeat (%s)" % + #log.error("!!! Agent %s sending Heartbeat (%s)" % # (self.name, str(ind))) self._topic_sender.send(ind) - logging.debug("Agent Indication Sent") + trace.debug("Agent Indication Sent") next_heartbeat = now + datetime.timedelta(seconds = self._heartbeat_interval) # @@ -470,7 +472,7 @@ class Agent(Thread): now = datetime.datetime.utcnow() if (self._next_subscribe_event is None or now >= self._next_subscribe_event): - logging.debug("%s polling subscriptions..." % self.name) + trace.debug("%s polling subscriptions..." % self.name) self._next_subscribe_event = now + datetime.timedelta(seconds= self._max_duration) dead_ss = {} @@ -494,11 +496,11 @@ class Agent(Thread): # notify application of pending WorkItems # if self._work_q_put and self._notifier: - logging.debug("%s notifying application..." % self.name) + trace.debug("%s notifying application..." % self.name) # new stuff on work queue, kick the the application... self._work_q_put = False _callback_thread = currentThread() - logging.info("Calling agent notifier.indication") + trace.debug("Calling agent notifier.indication") self._notifier.indication() _callback_thread = None @@ -521,7 +523,7 @@ class Agent(Thread): timeout = timedelta_to_secs(next_timeout - now) if self._running and timeout > 0.0: - logging.debug("%s sleeping %s seconds..." % (self.name, + trace.debug("%s sleeping %s seconds..." % (self.name, timeout)) try: self._session.next_receiver(timeout=timeout) @@ -529,7 +531,7 @@ class Agent(Thread): pass - logging.debug("Shutting down Agent %s thread" % self.name) + trace.debug("Shutting down Agent %s thread" % self.name) # # Private: @@ -550,24 +552,24 @@ class Agent(Thread): try: reply_to = QmfAddress.from_string(str(reply_to)) except ValueError: - logging.error("Invalid reply-to address '%s'" % reply_to) + log.error("Invalid reply-to address '%s'" % reply_to) msg.subject = reply_to.get_subject() try: if reply_to.is_direct(): # TRACE - #logging.error("!!! Agent %s direct REPLY-To:%s (%s)" % + #log.error("!!! Agent %s direct REPLY-To:%s (%s)" % # (self.name, str(reply_to), str(msg))) self._direct_sender.send(msg) else: # TRACE - # logging.error("!!! Agent %s topic REPLY-To:%s (%s)" % + # log.error("!!! Agent %s topic REPLY-To:%s (%s)" % # (self.name, str(reply_to), str(msg))) self._topic_sender.send(msg) - logging.debug("reply msg sent to [%s]" % str(reply_to)) + trace.debug("reply msg sent to [%s]" % str(reply_to)) except SendError, e: - logging.error("Failed to send reply msg '%s' (%s)" % (msg, str(e))) + log.error("Failed to send reply msg '%s' (%s)" % (msg, str(e))) def _send_query_response(self, content_type, cid, reply_to, objects): """ @@ -612,12 +614,11 @@ class Agent(Thread): @param _direct: True if msg directly addressed to this agent. """ - # logging.debug( "Message received from Console! [%s]" % msg ) - # logging.error( "%s Message received from Console! [%s]" % (self.name, msg) ) + trace.debug( "Message received from Console! [%s]" % msg ) opcode = msg.properties.get("qmf.opcode") if not opcode: - logging.warning("Ignoring unrecognized message '%s'" % msg) + log.warning("Ignoring unrecognized message '%s'" % msg) return version = 2 # @todo: fix me cmap = {}; props={} @@ -640,16 +641,16 @@ class Agent(Thread): self._handleUnsubscribeReqMsg(msg, cmap, props, version, _direct) elif opcode == OpCode.noop: self._noop_pending = False - logging.debug("No-op msg received.") + trace.debug("No-op msg received.") else: - logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'" + log.warning("Ignoring message with unrecognized 'opcode' value: '%s'" % opcode) def _handleAgentLocateMsg( self, msg, cmap, props, version, direct ): """ Process a received agent-locate message """ - logging.debug("_handleAgentLocateMsg") + trace.debug("_handleAgentLocateMsg") reply = False if props.get("method") == "request": @@ -676,21 +677,21 @@ class Agent(Thread): m.correlation_id = msg.correlation_id self._send_reply(m, msg.reply_to) else: - logging.debug("agent-locate msg not mine - no reply sent") + trace.debug("agent-locate msg not mine - no reply sent") def _handleQueryMsg(self, msg, cmap, props, version, _direct ): """ Handle received query message """ - logging.debug("_handleQueryMsg") + trace.debug("_handleQueryMsg") if "method" in props and props["method"] == "request": if cmap: try: query = QmfQuery.from_map(cmap) except TypeError: - logging.error("Invalid Query format: '%s'" % str(cmap)) + log.error("Invalid Query format: '%s'" % str(cmap)) return target = query.get_target() if target == QmfQuery.TARGET_PACKAGES: @@ -700,13 +701,13 @@ class Agent(Thread): elif target == QmfQuery.TARGET_SCHEMA: self._querySchemaReply( msg, query) elif target == QmfQuery.TARGET_AGENT: - logging.warning("!!! @todo: Query TARGET=AGENT TBD !!!") + log.warning("!!! @todo: Query TARGET=AGENT TBD !!!") elif target == QmfQuery.TARGET_OBJECT_ID: self._queryDataReply(msg, query, _idOnly=True) elif target == QmfQuery.TARGET_OBJECT: self._queryDataReply(msg, query) else: - logging.warning("Unrecognized query target: '%s'" % str(target)) + log.warning("Unrecognized query target: '%s'" % str(target)) @@ -717,7 +718,7 @@ class Agent(Thread): if "method" in props and props["method"] == "request": mname = cmap.get(SchemaMethod.KEY_NAME) if not mname: - logging.warning("Invalid method call from '%s': no name" + log.warning("Invalid method call from '%s': no name" % msg.reply_to) return @@ -774,7 +775,7 @@ class Agent(Thread): try: query = QmfQuery.from_map(query_map) except TypeError: - logging.warning("Invalid query for subscription: %s" % + log.warning("Invalid query for subscription: %s" % str(query_map)) return @@ -788,7 +789,7 @@ class Agent(Thread): # self._work_q.put(WorkItem(WorkItem.SUBSCRIBE_REQUEST, # msg.correlation_id, param)) # self._work_q_put = True - logging.error("External Subscription TBD") + log.error("External Subscription TBD") return # validate the query - only specific objects, or @@ -796,7 +797,7 @@ class Agent(Thread): if (query.get_target() != QmfQuery.TARGET_OBJECT or (query.get_selector() == QmfQuery.PREDICATE and query.get_predicate())): - logging.error("Subscriptions only support (wildcard) Object" + log.error("Subscriptions only support (wildcard) Object" " Queries.") err = QmfData.create( {"reason": "Unsupported Query type for subscription.", @@ -819,7 +820,7 @@ class Agent(Thread): elif duration < self._min_duration: duration = self._min_duration except: - logging.warning("Bad duration value: %s" % str(msg)) + log.warning("Bad duration value: %s" % str(msg)) duration = self._default_duration if interval is None: @@ -830,7 +831,7 @@ class Agent(Thread): if interval < self._min_interval: interval = self._min_interval except: - logging.warning("Bad interval value: %s" % str(msg)) + log.warning("Bad interval value: %s" % str(msg)) interval = self._default_interval ss = _SubscriptionState(msg.reply_to, @@ -867,7 +868,7 @@ class Agent(Thread): if props.get("method") == "request": sid = cmap.get("_subscription_id") if not sid: - logging.error("Invalid subscription refresh msg: %s" % + log.error("Invalid subscription refresh msg: %s" % str(msg)) return @@ -875,7 +876,7 @@ class Agent(Thread): try: ss = self._subscriptions.get(sid) if not ss: - logging.error("Ignoring unknown subscription: %s" % + log.error("Ignoring unknown subscription: %s" % str(sid)) return duration = cmap.get("_duration") @@ -887,7 +888,7 @@ class Agent(Thread): elif duration < self._min_duration: duration = self._min_duration except: - logging.error("Bad duration value: %s" % str(msg)) + log.error("Bad duration value: %s" % str(msg)) duration = None # use existing duration ss.resubscribe(datetime.datetime.utcnow(), duration) @@ -917,7 +918,7 @@ class Agent(Thread): if props.get("method") == "request": sid = cmap.get("_subscription_id") if not sid: - logging.warning("No subscription id supplied: %s" % msg) + log.warning("No subscription id supplied: %s" % msg) return self._lock.acquire() @@ -1100,7 +1101,7 @@ class Agent(Thread): response.append(obj.map_encode()) if response: - logging.debug("!!! %s publishing %s!!!" % (self.name, sub.correlation_id)) + trace.debug("!!! %s publishing %s!!!" % (self.name, sub.correlation_id)) self._send_query_response( ContentType.data, sub.correlation_id, sub.reply_to, @@ -1127,7 +1128,7 @@ class Agent(Thread): self._lock.acquire() try: if not self._noop_pending: - logging.debug("Sending noop to wake up [%s]" % self._address) + trace.debug("Sending noop to wake up [%s]" % self._address) msg = Message(id=QMF_APP_ID, subject=self.name, properties={"method":"indication", @@ -1137,7 +1138,7 @@ class Agent(Thread): self._direct_sender.send( msg, sync=True ) self._noop_pending = True except SendError, e: - logging.error(str(e)) + log.error(str(e)) finally: self._lock.release() @@ -1155,7 +1156,7 @@ class AgentExternal(Agent): super(AgentExternal, self).__init__(name, _domain, _notifier, _heartbeat_interval, _max_msg_size, _capacity) - logging.error("AgentExternal TBD") + log.error("AgentExternal TBD") @@ -1294,6 +1295,7 @@ class QmfAgentData(QmfData): if __name__ == '__main__': # static test cases - no message passing, just exercise API + import logging from common import (AgentName, SchemaProperty, qmfTypes, SchemaEventClass) logging.getLogger().setLevel(logging.INFO) diff --git a/extras/qmf/src/py/qmf2/common.py b/extras/qmf/src/py/qmf2/common.py index 548cebbf31..b3352b750c 100644 --- a/extras/qmf/src/py/qmf2/common.py +++ b/extras/qmf/src/py/qmf2/common.py @@ -620,208 +620,9 @@ class QmfEvent(QmfData): - - -#============================================================================== -#============================================================================== -#============================================================================== - - - - -class Arguments(object): - def __init__(self, map): - pass -# self.map = map -# self._by_hash = {} -# key_count = self.map.keyCount() -# a = 0 -# while a < key_count: -# self._by_hash[self.map.key(a)] = self.by_key(self.map.key(a)) -# a += 1 - - -# def __getitem__(self, key): -# return self._by_hash[key] - - -# def __setitem__(self, key, value): -# self._by_hash[key] = value -# self.set(key, value) - - -# def __iter__(self): -# return self._by_hash.__iter__ - - -# def __getattr__(self, name): -# if name in self._by_hash: -# return self._by_hash[name] -# return super.__getattr__(self, name) - - -# def __setattr__(self, name, value): -# # -# # ignore local data members -# # -# if (name[0] == '_' or -# name == 'map'): -# return super.__setattr__(self, name, value) - -# if name in self._by_hash: -# self._by_hash[name] = value -# return self.set(name, value) - -# return super.__setattr__(self, name, value) - - -# def by_key(self, key): -# val = self.map.byKey(key) -# vType = val.getType() -# if vType == TYPE_UINT8: return val.asUint() -# elif vType == TYPE_UINT16: return val.asUint() -# elif vType == TYPE_UINT32: return val.asUint() -# elif vType == TYPE_UINT64: return val.asUint64() -# elif vType == TYPE_SSTR: return val.asString() -# elif vType == TYPE_LSTR: return val.asString() -# elif vType == TYPE_ABSTIME: return val.asInt64() -# elif vType == TYPE_DELTATIME: return val.asUint64() -# elif vType == TYPE_REF: return ObjectId(val.asObjectId()) -# elif vType == TYPE_BOOL: return val.asBool() -# elif vType == TYPE_FLOAT: return val.asFloat() -# elif vType == TYPE_DOUBLE: return val.asDouble() -# elif vType == TYPE_UUID: return val.asUuid() -# elif vType == TYPE_INT8: return val.asInt() -# elif vType == TYPE_INT16: return val.asInt() -# elif vType == TYPE_INT32: return val.asInt() -# elif vType == TYPE_INT64: return val.asInt64() -# else: -# # when TYPE_MAP -# # when TYPE_OBJECT -# # when TYPE_LIST -# # when TYPE_ARRAY -# logging.error( "Unsupported Type for Get? '%s'" % str(val.getType())) -# return None - - -# def set(self, key, value): -# val = self.map.byKey(key) -# vType = val.getType() -# if vType == TYPE_UINT8: return val.setUint(value) -# elif vType == TYPE_UINT16: return val.setUint(value) -# elif vType == TYPE_UINT32: return val.setUint(value) -# elif vType == TYPE_UINT64: return val.setUint64(value) -# elif vType == TYPE_SSTR: -# if value: -# return val.setString(value) -# else: -# return val.setString('') -# elif vType == TYPE_LSTR: -# if value: -# return val.setString(value) -# else: -# return val.setString('') -# elif vType == TYPE_ABSTIME: return val.setInt64(value) -# elif vType == TYPE_DELTATIME: return val.setUint64(value) -# elif vType == TYPE_REF: return val.setObjectId(value.impl) -# elif vType == TYPE_BOOL: return val.setBool(value) -# elif vType == TYPE_FLOAT: return val.setFloat(value) -# elif vType == TYPE_DOUBLE: return val.setDouble(value) -# elif vType == TYPE_UUID: return val.setUuid(value) -# elif vType == TYPE_INT8: return val.setInt(value) -# elif vType == TYPE_INT16: return val.setInt(value) -# elif vType == TYPE_INT32: return val.setInt(value) -# elif vType == TYPE_INT64: return val.setInt64(value) -# else: -# # when TYPE_MAP -# # when TYPE_OBJECT -# # when TYPE_LIST -# # when TYPE_ARRAY -# logging.error("Unsupported Type for Set? '%s'" % str(val.getType())) -# return None - - - -#class MethodResponse(object): -# def __init__(self, impl): -# pass -# self.impl = qmfengine.MethodResponse(impl) - - -# def status(self): -# return self.impl.getStatus() - - -# def exception(self): -# return self.impl.getException() - - -# def text(self): -# return exception().asString() - - -# def args(self): -# return Arguments(self.impl.getArgs()) - - -# def __getattr__(self, name): -# myArgs = self.args() -# return myArgs.__getattr__(name) - - -# def __setattr__(self, name, value): -# if name == 'impl': -# return super.__setattr__(self, name, value) - -# myArgs = self.args() -# return myArgs.__setattr__(name, value) - - - -# ##============================================================================== -# ## QUERY -# ##============================================================================== - - - -# def _doQuery(predicate, params ): -# """ -# Given the predicate from a query, and a map of named parameters, apply the predicate -# to the parameters, and return True or False. -# """ -# if type(predicate) != list or len(predicate) < 1: -# return False - -# elif opr == Query._LOGIC_AND: -# logging.debug("_doQuery() AND: [%s]" % predicate ) -# rc = False -# for exp in predicate[1:]: -# rc = _doQuery( exp, params ) -# if not rc: -# break -# return rc - -# elif opr == Query._LOGIC_OR: -# logging.debug("_doQuery() OR: [%s]" % predicate ) -# rc = False -# for exp in predicate[1:]: -# rc = _doQuery( exp, params ) -# if rc: -# break -# return rc - -# elif opr == Query._LOGIC_NOT: -# logging.debug("_doQuery() NOT: [%s]" % predicate ) -# if len(predicate) != 2: -# logging.warning("Malformed query not-expression received: '%s'" % predicate) -# return False -# return not _doQuery( predicate[1:], params ) - - - -# else: -# logging.warning("Unknown query operator received: '%s'" % opr) -# return False +##============================================================================== +## QUERY +##============================================================================== @@ -1095,7 +896,7 @@ class QmfQuery(_mapEncoder): raise TypeError("Query expects to evaluate QmfData types.") if not isinstance(pred, type([])): - log_query.warning("Invalid type for predicate expression: '%s'" % str(pred)) + log.warning("Invalid type for predicate expression: '%s'" % str(pred)) return False # empty predicate - match all??? @@ -1140,8 +941,8 @@ class QmfQuery(_mapEncoder): if oper == QmfQuery.EXISTS: if len(pred) != 2: - log_query.warning("Malformed query: 'exists' operator" - " - bad arguments '%s'" % str(pred)) + log.warning("Malformed query: 'exists' operator" + " - bad arguments '%s'" % str(pred)) return False ### Q: Should we assume "quote", or should it be explicit? ### "foo" or ["quote" "foo"] @@ -1150,7 +951,7 @@ class QmfQuery(_mapEncoder): try: arg = self._fetch_pred_arg(pred[1], qmfData) except AttributeError: - log_query.debug("query parameter not found: '%s'" % str(pred)) + log.warning("query parameter not found: '%s'" % str(pred)) return False v = qmfData.has_value(arg) log_query.debug("---> %s" % str(v)) @@ -1161,9 +962,9 @@ class QmfQuery(_mapEncoder): QmfQuery.LE, QmfQuery.GT, QmfQuery.GE, QmfQuery.RE_MATCH]: if len(pred) != 3: - log_query.warning("Malformed query: '%s' operator" - " - requires 2 arguments '%s'" % - (oper, str(pred))) + log.warning("Malformed query: '%s' operator" + " - requires 2 arguments '%s'" % + (oper, str(pred))) return False # @todo: support regular expression match log_query.debug("query evaluate binary op: [%s]" % str(pred)) @@ -1171,7 +972,7 @@ class QmfQuery(_mapEncoder): arg1 = self._fetch_pred_arg(pred[1], qmfData) arg2 = self._fetch_pred_arg(pred[2], qmfData) except AttributeError: - log_query.debug("query parameter not found: '%s'" % str(pred)) + log.warning("query parameter not found: '%s'" % str(pred)) return False log_query.debug("query evaluate %s: %s, %s" % (oper, str(arg1), str(arg2))) v = False @@ -1183,11 +984,11 @@ class QmfQuery(_mapEncoder): elif oper == QmfQuery.GT: v = arg1 > arg2 elif oper == QmfQuery.GE: v = arg1 >= arg2 except TypeError: - log_query.warning("query comparison failed: '%s'" % str(pred)) + log.warning("query comparison failed: '%s'" % str(pred)) log_query.debug("---> %s" % str(v)) return v - log_query.warning("Unrecognized query operator: [%s]" % str(pred[0])) + log.warning("Unrecognized query operator: [%s]" % str(pred[0])) return False def _fetch_pred_arg(self, arg, qmfData): diff --git a/extras/qmf/src/py/qmf2/console.py b/extras/qmf/src/py/qmf2/console.py index 5c4a2eac85..b62aa7342b 100644 --- a/extras/qmf/src/py/qmf2/console.py +++ b/extras/qmf/src/py/qmf2/console.py @@ -18,11 +18,11 @@ # import sys import os -import logging import platform import time import datetime import Queue +from logging import getLogger from threading import Thread, Event from threading import RLock from threading import currentThread @@ -41,6 +41,8 @@ from common import (QMF_APP_ID, OpCode, QmfQuery, Notifier, ContentType, _callback_thread=None +log = getLogger("qmf") +trace = getLogger("qmf.console") ##============================================================================== @@ -213,6 +215,7 @@ class _QueryMailbox(_AsyncMailbox): Process query response messages delivered to this mailbox. Invoked by Console Management thread only. """ + trace.debug("Delivering to query mailbox (agent=%s)." % self.agent_name) objects = reply.content if isinstance(objects, type([])): # convert from map to native types if needed @@ -253,7 +256,7 @@ class _QueryMailbox(_AsyncMailbox): self.result += objects if not "partial" in reply.properties: - # logging.error("QUERY COMPLETE for %s" % str(self.context)) + # log.error("QUERY COMPLETE for %s" % str(self.context)) wi = WorkItem(WorkItem.QUERY_COMPLETE, self.context, self.result) self.console._work_q.put(wi) self.console._work_q_put = True @@ -262,8 +265,7 @@ class _QueryMailbox(_AsyncMailbox): def expire(self): - logging.debug("ASYNC MAILBOX EXPIRED @ %s!!!" % - datetime.datetime.utcnow()) + trace.debug("Expiring query mailbox (agent=%s)." % self.agent_name) # send along whatever (possibly none) has been received so far wi = WorkItem(WorkItem.QUERY_COMPLETE, self.context, self.result) self.console._work_q.put(wi) @@ -291,6 +293,7 @@ class _SchemaPrefetchMailbox(_AsyncMailbox): """ Process schema response messages. """ + trace.debug("Delivering schema mailbox (id=%s)." % self.schema_id) done = False schemas = reply.content if schemas and isinstance(schemas, type([])): @@ -309,6 +312,7 @@ class _SchemaPrefetchMailbox(_AsyncMailbox): def expire(self): + trace.debug("Expiring schema mailbox (id=%s)." % self.schema_id) self.destroy() @@ -332,10 +336,10 @@ class _MethodMailbox(_AsyncMailbox): Process method response messages delivered to this mailbox. Invoked by Console Management thread only. """ - + trace.debug("Delivering to method mailbox.") _map = reply.content if not _map or not isinstance(_map, type({})): - logging.error("Invalid method call reply message") + log.error("Invalid method call reply message") result = None else: error=_map.get(SchemaMethod.KEY_ERROR) @@ -358,8 +362,7 @@ class _MethodMailbox(_AsyncMailbox): The mailbox expired without receiving a reply. Invoked by the Console Management thread only. """ - logging.debug("ASYNC MAILBOX EXPIRED @ %s!!!" % - datetime.datetime.utcnow()) + trace.debug("Expiring method mailbox.") # send along an empty response wi = WorkItem(WorkItem.METHOD_RESPONSE, self.context, None) self.console._work_q.put(wi) @@ -391,30 +394,30 @@ class _SubscriptionMailbox(_AsyncMailbox): def subscribe(self, query): agent = self.console.get_agent(self.agent_name) if not agent: - logging.warning("subscribed failed - unknown agent '%s'" % + log.warning("subscribed failed - unknown agent '%s'" % self.agent_name) return False try: - logging.debug("Sending Subscribe to Agent (%s)" % self.agent_name) + trace.debug("Sending Subscribe to Agent (%s)" % self.agent_name) agent._send_subscribe_req(query, self.get_address(), self.interval, self.duration) except SendError, e: - logging.error(str(e)) + log.error(str(e)) return False return True def resubscribe(self, duration): agent = self.console.get_agent(self.agent_name) if not agent: - logging.warning("resubscribed failed - unknown agent '%s'" % + log.warning("resubscribed failed - unknown agent '%s'" % self.agent_name) return False try: - logging.debug("Sending resubscribe to Agent (%s)" % self.agent_name) + trace.debug("Sending resubscribe to Agent (%s)" % self.agent_name) agent._send_resubscribe_req(self.get_address(), self.agent_subscription_id, duration) except SendError, e: - logging.error(str(e)) + log.error(str(e)) return False return True @@ -430,7 +433,7 @@ class _SubscriptionMailbox(_AsyncMailbox): try: e_map = QmfData.from_map(error) except TypeError: - logging.warning("Invalid QmfData map received: '%s'" + log.warning("Invalid QmfData map received: '%s'" % str(error)) e_map = QmfData.create({"error":"Unknown error"}) sp = SubscribeParams(None, None, None, e_map) @@ -456,12 +459,12 @@ class _SubscriptionMailbox(_AsyncMailbox): # else: data indication agent_name = msg.properties.get("qmf.agent") if not agent_name: - logging.warning("Ignoring data_ind - no agent name given: %s" % + log.warning("Ignoring data_ind - no agent name given: %s" % msg) return agent = self.console.get_agent(agent_name) if not agent: - logging.warning("Ignoring data_ind - unknown agent '%s'" % + log.warning("Ignoring data_ind - unknown agent '%s'" % agent_name) return @@ -625,7 +628,7 @@ class QmfConsoleData(QmfData): contents. """ if _reply_handle is not None: - logging.error(" ASYNC REFRESH TBD!!!") + log.error(" ASYNC REFRESH TBD!!!") return None assert self._agent @@ -677,28 +680,28 @@ class QmfConsoleData(QmfData): if _in_args: _map[SchemaMethod.KEY_ARGUMENTS] = _in_args - logging.debug("Sending method req to Agent (%s)" % time.time()) + trace.debug("Sending method req to Agent (%s)" % time.time()) try: self._agent._send_method_req(_map, cid) except SendError, e: - logging.error(str(e)) + log.error(str(e)) mbox.destroy() return None if _reply_handle is not None: return True - logging.debug("Waiting for response to method req (%s)" % _timeout) + trace.debug("Waiting for response to method req (%s)" % _timeout) replyMsg = mbox.fetch(_timeout) mbox.destroy() if not replyMsg: - logging.debug("Agent method req wait timed-out.") + trace.debug("Agent method req wait timed-out.") return None _map = replyMsg.content if not _map or not isinstance(_map, type({})): - logging.error("Invalid method call reply message") + log.error("Invalid method call reply message") return None error=_map.get(SchemaMethod.KEY_ERROR) @@ -751,7 +754,7 @@ class Agent(object): self._packages = {} # map of {package-name:[list of class-names], } for this agent self._subscriptions = [] # list of active standing subscriptions for this agent self._announce_timestamp = None # datetime when last announce received - logging.debug( "Created Agent with address: [%s]" % self._address ) + trace.debug( "Created Agent with address: [%s]" % self._address ) def get_name(self): @@ -768,7 +771,7 @@ class Agent(object): if correlation_id: msg.correlation_id = str(correlation_id) # TRACE - #logging.error("!!! Console %s sending to agent %s (%s)" % + #log.error("!!! Console %s sending to agent %s (%s)" % # (self._console._name, self._name, str(msg))) self._sender.send(msg) # return handle @@ -846,28 +849,28 @@ class Agent(object): if _in_args: _map[SchemaMethod.KEY_ARGUMENTS] = _in_args.copy() - logging.debug("Sending method req to Agent (%s)" % time.time()) + trace.debug("Sending method req to Agent (%s)" % time.time()) try: self._send_method_req(_map, cid) except SendError, e: - logging.error(str(e)) + log.error(str(e)) mbox.destroy() return None if _reply_handle is not None: return True - logging.debug("Waiting for response to method req (%s)" % _timeout) + trace.debug("Waiting for response to method req (%s)" % _timeout) replyMsg = mbox.fetch(_timeout) mbox.destroy() if not replyMsg: - logging.debug("Agent method req wait timed-out.") + trace.debug("Agent method req wait timed-out.") return None _map = replyMsg.content if not _map or not isinstance(_map, type({})): - logging.error("Invalid method call reply message") + log.error("Invalid method call reply message") return None return MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS), @@ -1076,10 +1079,10 @@ class Console(Thread): @type timeout: float @param timeout: maximum time in seconds to wait for all background threads to terminate. Default: forever. """ - logging.debug("Destroying Console...") + trace.debug("Destroying Console...") if self._conn: self.remove_connection(self._conn, timeout) - logging.debug("Console Destroyed") + trace.debug("Console Destroyed") def add_connection(self, conn): """ @@ -1103,7 +1106,7 @@ class Console(Thread): " x-properties:" " {type:direct}}}", capacity=1) - logging.debug("my direct addr=%s" % self._direct_recvr.source) + trace.debug("my direct addr=%s" % self._direct_recvr.source) self._direct_sender = self._session.sender(str(self._address.get_node()) + ";{create:always," @@ -1111,7 +1114,7 @@ class Console(Thread): " {type:topic," " x-properties:" " {type:direct}}}") - logging.debug("my direct sender=%s" % self._direct_sender.target) + trace.debug("my direct sender=%s" % self._direct_sender.target) # for receiving "broadcast" messages from agents default_addr = QmfAddress.topic(QmfAddress.SUBJECT_AGENT_IND + ".#", @@ -1120,7 +1123,7 @@ class Console(Thread): ";{create:always," " node-properties:{type:topic}}", capacity=1) - logging.debug("default topic recv addr=%s" % self._topic_recvr.source) + trace.debug("default topic recv addr=%s" % self._topic_recvr.source) # for sending to topic subscribers @@ -1128,7 +1131,7 @@ class Console(Thread): self._topic_sender = self._session.sender(str(topic_addr) + ";{create:always," " node-properties:{type:topic}}") - logging.debug("default topic send addr=%s" % self._topic_sender.target) + trace.debug("default topic send addr=%s" % self._topic_sender.target) # # Now that receivers are created, fire off the receive thread... @@ -1150,17 +1153,17 @@ class Console(Thread): @param conn: connection previously added by add_connection() """ if self._conn and conn and conn != self._conn: - logging.error( "Attempt to delete unknown connection: %s" % str(conn)) + log.error( "Attempt to delete unknown connection: %s" % str(conn)) # tell connection thread to shutdown self._operational = False if self.isAlive(): # kick my thread to wake it up self._wake_thread() - logging.debug("waiting for console receiver thread to exit") + trace.debug("waiting for console receiver thread to exit") self.join(timeout) if self.isAlive(): - logging.error( "Console thread '%s' is hung..." % self.getName() ) + log.error( "Console thread '%s' is hung..." % self.getName() ) self._direct_recvr.close() self._direct_sender.close() self._topic_recvr.close() @@ -1168,7 +1171,7 @@ class Console(Thread): self._session.close() self._session = None self._conn = None - logging.debug("console connection removal complete") + trace.debug("console connection removal complete") def get_address(self): @@ -1219,14 +1222,14 @@ class Console(Thread): content=query._predicate) msg.reply_to = str(self._address) msg.correlation_id = str(cid) - logging.debug("Sending Agent Locate (%s)" % time.time()) + trace.debug("Sending Agent Locate (%s)" % time.time()) # TRACE - #logging.error("!!! Console %s sending agent locate (%s)" % + #log.error("!!! Console %s sending agent locate (%s)" % # (self._name, str(msg))) try: self._topic_sender.send(msg) except SendError, e: - logging.error(str(e)) + log.error(str(e)) mbox.destroy() return None @@ -1234,10 +1237,10 @@ class Console(Thread): timeout = self._reply_timeout new_agent = None - logging.debug("Waiting for response to Agent Locate (%s)" % timeout) + trace.debug("Waiting for response to Agent Locate (%s)" % timeout) mbox.fetch(timeout) mbox.destroy() - logging.debug("Agent Locate wait ended (%s)" % time.time()) + trace.debug("Agent Locate wait ended (%s)" % time.time()) self._lock.acquire() try: new_agent = self._agent_map.get(name) @@ -1288,10 +1291,10 @@ class Console(Thread): cid = mbox.get_address() try: - logging.debug("Sending Query to Agent (%s)" % time.time()) + trace.debug("Sending Query to Agent (%s)" % time.time()) agent._send_query(query, cid) except SendError, e: - logging.error(str(e)) + log.error(str(e)) mbox.destroy() return None @@ -1302,7 +1305,7 @@ class Console(Thread): if not _timeout: _timeout = self._reply_timeout - logging.debug("Waiting for response to Query (%s)" % _timeout) + trace.debug("Waiting for response to Query (%s)" % _timeout) now = datetime.datetime.utcnow() expire = now + datetime.timedelta(seconds=_timeout) @@ -1311,7 +1314,7 @@ class Console(Thread): _timeout = timedelta_to_secs(expire - now) reply = mbox.fetch(_timeout) if not reply: - logging.debug("Query wait timed-out.") + trace.debug("Query wait timed-out.") break objects = reply.content @@ -1383,12 +1386,12 @@ class Console(Thread): mbox.destroy() return None - logging.debug("Waiting for response to subscription (%s)" % _timeout) + trace.debug("Waiting for response to subscription (%s)" % _timeout) # @todo: what if mbox expires here? sp = mbox.fetch(_timeout) if not sp: - logging.debug("Subscription request wait timed-out.") + trace.debug("Subscription request wait timed-out.") mbox.destroy() return None @@ -1405,7 +1408,7 @@ class Console(Thread): mbox = self._get_mailbox(subscription_id) if not mbox: - logging.warning("Subscription %s not found." % subscription_id) + log.warning("Subscription %s not found." % subscription_id) return None if isinstance(mbox, _AsyncSubscriptionMailbox): @@ -1418,11 +1421,11 @@ class Console(Thread): # wait for reply - logging.debug("Waiting for response to subscription (%s)" % _timeout) + trace.debug("Waiting for response to subscription (%s)" % _timeout) sp = mbox.fetch(_timeout) if not sp: - logging.debug("re-subscribe request wait timed-out.") + trace.debug("re-subscribe request wait timed-out.") # @todo???? mbox.destroy() return None @@ -1439,11 +1442,11 @@ class Console(Thread): agent = self.get_agent(mbox.agent_name) if agent: try: - logging.debug("Sending UnSubscribe to Agent (%s)" % time.time()) + trace.debug("Sending UnSubscribe to Agent (%s)" % time.time()) agent._send_unsubscribe_ind(subscription_id, mbox.agent_subscription_id) except SendError, e: - logging.error(str(e)) + log.error(str(e)) mbox.destroy() @@ -1453,7 +1456,7 @@ class Console(Thread): Make the console management thread loop wakeup from its next_receiver sleep. """ - logging.debug("Sending noop to wake up [%s]" % self._address) + trace.debug("Sending noop to wake up [%s]" % self._address) msg = Message(id=QMF_APP_ID, subject=self._name, properties={"method":"indication", @@ -1462,7 +1465,7 @@ class Console(Thread): try: self._direct_sender.send( msg, sync=True ) except SendError, e: - logging.error(str(e)) + log.error(str(e)) def run(self): @@ -1484,7 +1487,7 @@ class Console(Thread): except Empty: break # TRACE: - # logging.error("!!! Console %s: msg on %s [%s]" % + # log.error("!!! Console %s: msg on %s [%s]" % # (self._name, self._topic_recvr.source, msg)) self._dispatch(msg, _direct=False) @@ -1494,7 +1497,7 @@ class Console(Thread): except Empty: break # TRACE - #logging.error("!!! Console %s: msg on %s [%s]" % + #log.error("!!! Console %s: msg on %s [%s]" % # (self._name, self._direct_recvr.source, msg)) self._dispatch(msg, _direct=True) @@ -1506,7 +1509,7 @@ class Console(Thread): # new stuff on work queue, kick the the application... self._work_q_put = False _callback_thread = currentThread() - logging.info("Calling console notifier.indication") + trace.debug("Calling console notifier.indication") self._notifier.indication() _callback_thread = None @@ -1531,12 +1534,12 @@ class Console(Thread): if self._operational and timeout > 0.0: try: - logging.debug("waiting for next rcvr (timeout=%s)..." % timeout) + trace.debug("waiting for next rcvr (timeout=%s)..." % timeout) self._session.next_receiver(timeout = timeout) except Empty: pass - logging.debug("Shutting down Console thread") + trace.debug("Shutting down Console thread") def get_objects(self, _object_id=None, @@ -1640,12 +1643,11 @@ class Console(Thread): """ PRIVATE: Process a message received from an Agent """ - #logging.debug( "Message received from Agent! [%s]" % msg ) - #logging.error( "Message received from Agent! [%s]" % msg ) + trace.debug( "Message received from Agent! [%s]" % msg ) opcode = msg.properties.get("qmf.opcode") if not opcode: - logging.error("Ignoring unrecognized message '%s'" % msg) + log.error("Ignoring unrecognized message '%s'" % msg) return version = 2 # @todo: fix me @@ -1673,9 +1675,9 @@ class Console(Thread): else: self._handle_indication_msg(msg, cmap, version, _direct) elif opcode == OpCode.noop: - logging.debug("No-op msg received.") + trace.debug("No-op msg received.") else: - logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'" % opcode) + log.warning("Ignoring message with unrecognized 'opcode' value: '%s'" % opcode) def _handle_agent_ind_msg(self, msg, cmap, version, direct): @@ -1683,15 +1685,15 @@ class Console(Thread): Process a received agent-ind message. This message may be a response to a agent-locate, or it can be an unsolicited agent announce. """ - logging.debug("_handle_agent_ind_msg '%s' (%s)" % (msg, time.time())) + trace.debug("_handle_agent_ind_msg '%s' (%s)" % (msg, time.time())) ai_map = msg.content if not ai_map or not isinstance(ai_map, type({})): - logging.warning("Bad agent-ind message received: '%s'" % msg) + log.warning("Bad agent-ind message received: '%s'" % msg) return name = ai_map.get("_name") if not name: - logging.warning("Bad agent-ind message received: agent name missing" + log.warning("Bad agent-ind message received: agent name missing" " '%s'" % msg) return @@ -1725,48 +1727,48 @@ class Console(Thread): if matched: # unsolicited, but newly discovered - logging.debug("AGENT_ADDED for %s (%s)" % (agent, time.time())) + trace.debug("AGENT_ADDED for %s (%s)" % (agent, time.time())) wi = WorkItem(WorkItem.AGENT_ADDED, None, {"agent": agent}) self._work_q.put(wi) self._work_q_put = True if correlated: # wake up all waiters - logging.debug("waking waiters for correlation id %s" % msg.correlation_id) + trace.debug("waking waiters for correlation id %s" % msg.correlation_id) mbox.deliver(msg) def _handle_response_msg(self, msg, cmap, version, direct): """ Process a received data-ind message. """ - logging.debug("_handle_response_msg '%s' (%s)" % (msg, time.time())) + trace.debug("_handle_response_msg '%s' (%s)" % (msg, time.time())) mbox = self._get_mailbox(msg.correlation_id) if not mbox: - logging.warning("Response msg received with unknown correlation_id" + log.warning("Response msg received with unknown correlation_id" " msg='%s'" % str(msg)) return # wake up all waiters - logging.debug("waking waiters for correlation id %s" % msg.correlation_id) + trace.debug("waking waiters for correlation id %s" % msg.correlation_id) mbox.deliver(msg) def _handle_indication_msg(self, msg, cmap, version, _direct): aname = msg.properties.get("qmf.agent") if not aname: - logging.debug("No agent name field in indication message.") + trace.debug("No agent name field in indication message.") return content_type = msg.properties.get("qmf.content") if (content_type != ContentType.event or not isinstance(msg.content, type([]))): - logging.warning("Bad event indication message received: '%s'" % msg) + log.warning("Bad event indication message received: '%s'" % msg) return emap = msg.content[0] if not isinstance(emap, type({})): - logging.debug("Invalid event body in indication message: '%s'" % msg) + trace.debug("Invalid event body in indication message: '%s'" % msg) return agent = None @@ -1776,18 +1778,18 @@ class Console(Thread): finally: self._lock.release() if not agent: - logging.debug("Agent '%s' not known." % aname) + trace.debug("Agent '%s' not known." % aname) return try: # @todo: schema??? event = QmfEvent.from_map(emap) except TypeError: - logging.debug("Invalid QmfEvent map received: %s" % str(emap)) + trace.debug("Invalid QmfEvent map received: %s" % str(emap)) return # @todo: schema? Need to fetch it, but not from this thread! # This thread can not pend on a request. - logging.debug("Publishing event received from agent %s" % aname) + trace.debug("Publishing event received from agent %s" % aname) wi = WorkItem(WorkItem.EVENT_RECEIVED, None, {"agent":agent, "event":event}) @@ -1836,12 +1838,12 @@ class Console(Thread): next_expire_delta = lifetime_delta self._lock.acquire() try: - logging.debug("!!! expiring agents '%s'" % now) + trace.debug("!!! expiring agents '%s'" % now) for agent in self._agent_map.itervalues(): if agent._announce_timestamp: agent_deathtime = agent._announce_timestamp + lifetime_delta if agent_deathtime <= now: - logging.debug("AGENT_DELETED for %s" % agent) + trace.debug("AGENT_DELETED for %s" % agent) agent._announce_timestamp = None wi = WorkItem(WorkItem.AGENT_DELETED, None, {"agent":agent}) @@ -1853,7 +1855,7 @@ class Console(Thread): next_expire_delta = agent_deathtime - now self._next_agent_expire = now + next_expire_delta - logging.debug("!!! next expire cycle = '%s'" % self._next_agent_expire) + trace.debug("!!! next expire cycle = '%s'" % self._next_agent_expire) finally: self._lock.release() @@ -1863,7 +1865,7 @@ class Console(Thread): """ Factory to create/retrieve an agent for this console """ - logging.debug("creating agent %s" % name) + trace.debug("creating agent %s" % name) self._lock.acquire() try: agent = self._agent_map.get(name) @@ -1879,9 +1881,9 @@ class Console(Thread): " x-properties:" " {type:direct}}}") except: - logging.warning("Unable to create sender for %s" % name) + log.warning("Unable to create sender for %s" % name) return None - logging.debug("created agent sender %s" % agent._sender.target) + trace.debug("created agent sender %s" % agent._sender.target) self._agent_map[name] = agent finally: @@ -1985,11 +1987,11 @@ class Console(Thread): if need_fetch: mbox = _SchemaPrefetchMailbox(self, schema_id) query = QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id) - logging.debug("Sending Schema Query to Agent (%s)" % time.time()) + trace.debug("Sending Schema Query to Agent (%s)" % time.time()) try: agent._send_query(query, mbox.get_address()) except SendError, e: - logging.error(str(e)) + log.error(str(e)) mbox.destroy() self._lock.acquire() try: @@ -2042,7 +2044,7 @@ class Console(Thread): try: mid = long(mid) except TypeError: - logging.error("Invalid mailbox id: %s" % str(mid)) + log.error("Invalid mailbox id: %s" % str(mid)) return None self._lock.acquire() @@ -2057,7 +2059,7 @@ class Console(Thread): try: mid = long(mid) except TypeError: - logging.error("Invalid mailbox id: %s" % str(mid)) + log.error("Invalid mailbox id: %s" % str(mid)) return None self._lock.acquire() @@ -2243,36 +2245,36 @@ class Console(Thread): # count += 1 # try: # if self._event.kind == qmfengine.ConsoleEvent.AGENT_ADDED: -# logging.debug("Console Event AGENT_ADDED received") +# trace.debug("Console Event AGENT_ADDED received") # if self._handler: # self._handler.agent_added(AgentProxy(self._event.agent, None)) # elif self._event.kind == qmfengine.ConsoleEvent.AGENT_DELETED: -# logging.debug("Console Event AGENT_DELETED received") +# trace.debug("Console Event AGENT_DELETED received") # if self._handler: # self._handler.agent_deleted(AgentProxy(self._event.agent, None)) # elif self._event.kind == qmfengine.ConsoleEvent.NEW_PACKAGE: -# logging.debug("Console Event NEW_PACKAGE received") +# trace.debug("Console Event NEW_PACKAGE received") # if self._handler: # self._handler.new_package(self._event.name) # elif self._event.kind == qmfengine.ConsoleEvent.NEW_CLASS: -# logging.debug("Console Event NEW_CLASS received") +# trace.debug("Console Event NEW_CLASS received") # if self._handler: # self._handler.new_class(SchemaClassKey(self._event.classKey)) # elif self._event.kind == qmfengine.ConsoleEvent.OBJECT_UPDATE: -# logging.debug("Console Event OBJECT_UPDATE received") +# trace.debug("Console Event OBJECT_UPDATE received") # if self._handler: # self._handler.object_update(ConsoleObject(None, {"impl":self._event.object}), # self._event.hasProps, self._event.hasStats) # elif self._event.kind == qmfengine.ConsoleEvent.EVENT_RECEIVED: -# logging.debug("Console Event EVENT_RECEIVED received") +# trace.debug("Console Event EVENT_RECEIVED received") # elif self._event.kind == qmfengine.ConsoleEvent.AGENT_HEARTBEAT: -# logging.debug("Console Event AGENT_HEARTBEAT received") +# trace.debug("Console Event AGENT_HEARTBEAT received") # if self._handler: # self._handler.agent_heartbeat(AgentProxy(self._event.agent, None), self._event.timestamp) # elif self._event.kind == qmfengine.ConsoleEvent.METHOD_RESPONSE: -# logging.debug("Console Event METHOD_RESPONSE received") +# trace.debug("Console Event METHOD_RESPONSE received") # else: -# logging.debug("Console thread received unknown event: '%s'" % str(self._event.kind)) +# trace.debug("Console thread received unknown event: '%s'" % str(self._event.kind)) # except e: # print "Exception caught in callback thread:", e # self.impl.popEvent() @@ -2301,17 +2303,17 @@ class Console(Thread): # def shutdown(self): -# logging.debug("broker.shutdown() called.") +# trace.debug("broker.shutdown() called.") # self.console.impl.delConnection(self.impl) # self.conn.del_conn_handler(self) # if self._session: # self.impl.sessionClosed() -# logging.debug("broker.shutdown() sessionClosed done.") +# trace.debug("broker.shutdown() sessionClosed done.") # self._session.destroy() -# logging.debug("broker.shutdown() session destroy done.") +# trace.debug("broker.shutdown() session destroy done.") # self._session = None # self._operational = False -# logging.debug("broker.shutdown() done.") +# trace.debug("broker.shutdown() done.") # def wait_for_stable(self, timeout = None): @@ -2344,24 +2346,24 @@ class Console(Thread): # while valid: # count += 1 # if self._event.kind == qmfengine.BrokerEvent.BROKER_INFO: -# logging.debug("Broker Event BROKER_INFO received"); +# trace.debug("Broker Event BROKER_INFO received"); # elif self._event.kind == qmfengine.BrokerEvent.DECLARE_QUEUE: -# logging.debug("Broker Event DECLARE_QUEUE received"); +# trace.debug("Broker Event DECLARE_QUEUE received"); # self.conn.impl.declareQueue(self._session.handle, self._event.name) # elif self._event.kind == qmfengine.BrokerEvent.DELETE_QUEUE: -# logging.debug("Broker Event DELETE_QUEUE received"); +# trace.debug("Broker Event DELETE_QUEUE received"); # self.conn.impl.deleteQueue(self._session.handle, self._event.name) # elif self._event.kind == qmfengine.BrokerEvent.BIND: -# logging.debug("Broker Event BIND received"); +# trace.debug("Broker Event BIND received"); # self.conn.impl.bind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey) # elif self._event.kind == qmfengine.BrokerEvent.UNBIND: -# logging.debug("Broker Event UNBIND received"); +# trace.debug("Broker Event UNBIND received"); # self.conn.impl.unbind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey) # elif self._event.kind == qmfengine.BrokerEvent.SETUP_COMPLETE: -# logging.debug("Broker Event SETUP_COMPLETE received"); +# trace.debug("Broker Event SETUP_COMPLETE received"); # self.impl.startProtocol() # elif self._event.kind == qmfengine.BrokerEvent.STABLE: -# logging.debug("Broker Event STABLE received"); +# trace.debug("Broker Event STABLE received"); # self._cv.acquire() # try: # self._stable = True @@ -2388,7 +2390,7 @@ class Console(Thread): # valid = self.impl.getXmtMessage(self._xmtMessage) # while valid: # count += 1 -# logging.debug("Broker: sending msg on connection") +# trace.debug("Broker: sending msg on connection") # self.conn.impl.sendMessage(self._session.handle, self._xmtMessage) # self.impl.popXmt() # valid = self.impl.getXmtMessage(self._xmtMessage) @@ -2406,14 +2408,14 @@ class Console(Thread): # def conn_event_connected(self): -# logging.debug("Broker: Connection event CONNECTED") +# trace.debug("Broker: Connection event CONNECTED") # self._session = Session(self.conn, "qmfc-%s.%d" % (socket.gethostname(), os.getpid()), self) # self.impl.sessionOpened(self._session.handle) # self._do_events() # def conn_event_disconnected(self, error): -# logging.debug("Broker: Connection event DISCONNECTED") +# trace.debug("Broker: Connection event DISCONNECTED") # pass @@ -2422,14 +2424,14 @@ class Console(Thread): # def sess_event_session_closed(self, context, error): -# logging.debug("Broker: Session event CLOSED") +# trace.debug("Broker: Session event CLOSED") # self.impl.sessionClosed() # def sess_event_recv(self, context, message): -# logging.debug("Broker: Session event MSG_RECV") +# trace.debug("Broker: Session event MSG_RECV") # if not self._operational: -# logging.warning("Unexpected session event message received by Broker proxy: context='%s'" % str(context)) +# log.warning("Unexpected session event message received by Broker proxy: context='%s'" % str(context)) # self.impl.handleRcvMessage(message) # self._do_events() @@ -2447,6 +2449,7 @@ class Console(Thread): if __name__ == '__main__': # temp test code + import logging from common import (qmfTypes, SchemaProperty) logging.getLogger().setLevel(logging.INFO) |
