diff options
| author | Ted Ross <tross@apache.org> | 2013-07-08 21:43:48 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2013-07-08 21:43:48 +0000 |
| commit | 344fd5a1565f4088b9099bd74c95b836b0bbffab (patch) | |
| tree | 3f3fcfb66844d720688a86bb5f365267b444eb2f /qpid/extras/dispatch/src | |
| parent | a0bb0a54b3d375ca6456e865350ecc8b27f07a42 (diff) | |
| download | qpid-python-344fd5a1565f4088b9099bd74c95b836b0bbffab.tar.gz | |
QPID-4968 - Added an IO adapter for python modules to send and receive messages
QPID-4967 - Integrated the python router into the main program
- Updated the log module: added the full complement of severity levels
- Added stub versions of the dispatch python adapters so the python components can be
tested in a standalone environment.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1500977 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/extras/dispatch/src')
| -rw-r--r-- | qpid/extras/dispatch/src/dispatch.c | 2 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/log.c | 12 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/py/router/adapter.py | 19 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/py/router/binding.py | 12 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/py/router/data.py | 6 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/py/router/link.py | 25 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/py/router/mobile.py | 5 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/py/router/neighbor.py | 18 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/py/router/path.py | 11 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/py/router/router_engine.py | 87 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/py/router/routing.py | 11 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/py/stubs/__init__.py | 22 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/py/stubs/ioadapter.py | 27 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/py/stubs/logadapter.py | 33 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/python_embedded.c | 177 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/router_node.c | 219 |
16 files changed, 557 insertions, 129 deletions
diff --git a/qpid/extras/dispatch/src/dispatch.c b/qpid/extras/dispatch/src/dispatch.c index 27c988818b..86398bb946 100644 --- a/qpid/extras/dispatch/src/dispatch.c +++ b/qpid/extras/dispatch/src/dispatch.c @@ -61,7 +61,7 @@ dx_dispatch_t *dx_dispatch(const char *config_path) DEQ_INIT(dx->config_listeners); DEQ_INIT(dx->config_connectors); - dx_python_initialize(); + dx_python_initialize(dx); dx_log_initialize(); dx_alloc_initialize(); diff --git a/qpid/extras/dispatch/src/log.c b/qpid/extras/dispatch/src/log.c index 9b1cb87e29..555d9ed7f3 100644 --- a/qpid/extras/dispatch/src/log.c +++ b/qpid/extras/dispatch/src/log.c @@ -51,12 +51,16 @@ static dx_log_list_t entries; static sys_mutex_t *log_lock = 0; -static char *cls_prefix(int cls) +static const char *cls_prefix(int cls) { switch (cls) { - case LOG_TRACE : return "TRACE"; - case LOG_ERROR : return "ERROR"; - case LOG_INFO : return "INFO"; + case LOG_TRACE : return "TRACE"; + case LOG_DEBUG : return "DEBUG"; + case LOG_INFO : return "INFO"; + case LOG_NOTICE : return "NOTICE"; + case LOG_WARNING : return "WARNING"; + case LOG_ERROR : return "ERROR"; + case LOG_CRITICAL : return "CRITICAL"; } return ""; diff --git a/qpid/extras/dispatch/src/py/router/adapter.py b/qpid/extras/dispatch/src/py/router/adapter.py index b4a77d487e..76c5a3ea48 100644 --- a/qpid/extras/dispatch/src/py/router/adapter.py +++ b/qpid/extras/dispatch/src/py/router/adapter.py @@ -17,13 +17,10 @@ # under the License. # -TRACE = 0 -DEBUG = 1 -INFO = 2 -NOTICE = 3 -WARNING = 4 -ERROR = 5 -CRITICAL = 6 +try: + from dispatch import * +except ImportError: + from stubs import * ENTRY_OLD = 1 ENTRY_CURRENT = 2 @@ -91,12 +88,12 @@ class AdapterEngine(object): # messages to be duplicated. It's better to have gaps in the routing # tables momentarily because unroutable messages are stored for retry. for a,b in to_delete: - self.container.adapter.remote_unbind(a, b) + self.container.router_adapter.remote_unbind(a, b) for a,b in to_add: - self.container.adapter.remote_bind(a, b) + self.container.router_adapter.remote_bind(a, b) - self.container.log(INFO, "New Routing Table (class=%s):" % key_class) + self.container.log(LOG_INFO, "New Routing Table (class=%s):" % key_class) for a,b in new_table: - self.container.log(INFO, " %s => %s" % (a, b)) + self.container.log(LOG_INFO, " %s => %s" % (a, b)) diff --git a/qpid/extras/dispatch/src/py/router/binding.py b/qpid/extras/dispatch/src/py/router/binding.py index 5bec0208c9..cd0641d12f 100644 --- a/qpid/extras/dispatch/src/py/router/binding.py +++ b/qpid/extras/dispatch/src/py/router/binding.py @@ -17,13 +17,11 @@ # under the License. # -TRACE = 0 -DEBUG = 1 -INFO = 2 -NOTICE = 3 -WARNING = 4 -ERROR = 5 -CRITICAL = 6 +try: + from dispatch import * +except ImportError: + from stubs import * + class BindingEngine(object): """ diff --git a/qpid/extras/dispatch/src/py/router/data.py b/qpid/extras/dispatch/src/py/router/data.py index 3fd938d30d..79d6a0d0fe 100644 --- a/qpid/extras/dispatch/src/py/router/data.py +++ b/qpid/extras/dispatch/src/py/router/data.py @@ -18,6 +18,12 @@ # +try: + from dispatch import * +except ImportError: + from stubs import * + + def getMandatory(data, key, cls=None): """ Get the value mapped to the requested key. If it's not present, raise an exception. diff --git a/qpid/extras/dispatch/src/py/router/link.py b/qpid/extras/dispatch/src/py/router/link.py index 38494e71b7..1e06d161f6 100644 --- a/qpid/extras/dispatch/src/py/router/link.py +++ b/qpid/extras/dispatch/src/py/router/link.py @@ -20,13 +20,10 @@ from data import MessageRA, MessageLSU, MessageLSR from time import time -TRACE = 0 -DEBUG = 1 -INFO = 2 -NOTICE = 3 -WARNING = 4 -ERROR = 5 -CRITICAL = 6 +try: + from dispatch import * +except ImportError: + from stubs import * class LinkStateEngine(object): """ @@ -57,9 +54,9 @@ class LinkStateEngine(object): if self.collection_changed: self.collection_changed = False - self.container.log(INFO, "New Link-State Collection:") + self.container.log(LOG_INFO, "New Link-State Collection:") for a,b in self.collection.items(): - self.container.log(INFO, " %s => %r" % (a, b.peers)) + self.container.log(LOG_INFO, " %s => %r" % (a, b.peers)) self.container.ls_collection_changed(self.collection) @@ -90,7 +87,7 @@ class LinkStateEngine(object): self.collection[msg.id] = ls self.collection_changed = True ls.last_seen = now - self.container.log(INFO, "Learned link-state from new router: %s" % msg.id) + self.container.log(LOG_INFO, "Learned link-state from new router: %s" % msg.id) # Schedule LSRs for any routers referenced in this LS that we don't know about for _id in msg.ls.peers: if _id not in self.collection: @@ -103,7 +100,7 @@ class LinkStateEngine(object): if self.id not in self.collection: return my_ls = self.collection[self.id] - self.container.send('_topo.%s.%s' % (msg.area, msg.id), MessageLSU(None, self.id, self.area, my_ls.ls_seq, my_ls)) + self.container.send('_topo/%s/%s' % (msg.area, msg.id), MessageLSU(None, self.id, self.area, my_ls.ls_seq, my_ls)) def new_local_link_state(self, link_state): @@ -127,12 +124,12 @@ class LinkStateEngine(object): for key in to_delete: ls = self.collection.pop(key) self.collection_changed = True - self.container.log(INFO, "Expired link-state from router: %s" % key) + self.container.log(LOG_INFO, "Expired link-state from router: %s" % key) def _send_lsrs(self): for (_area, _id) in self.needed_lsrs.keys(): - self.container.send('_topo.%s.%s' % (_area, _id), MessageLSR(None, self.id, self.area)) + self.container.send('_topo/%s/%s' % (_area, _id), MessageLSR(None, self.id, self.area)) self.needed_lsrs = {} @@ -140,4 +137,4 @@ class LinkStateEngine(object): ls_seq = 0 if self.id in self.collection: ls_seq = self.collection[self.id].ls_seq - self.container.send('_topo.%s.all' % self.area, MessageRA(None, self.id, self.area, ls_seq, self.mobile_seq)) + self.container.send('_topo/%s/all' % self.area, MessageRA(None, self.id, self.area, ls_seq, self.mobile_seq)) diff --git a/qpid/extras/dispatch/src/py/router/mobile.py b/qpid/extras/dispatch/src/py/router/mobile.py index 0dd53af649..acea759e37 100644 --- a/qpid/extras/dispatch/src/py/router/mobile.py +++ b/qpid/extras/dispatch/src/py/router/mobile.py @@ -19,6 +19,11 @@ from data import MessageRA, MessageMAR, MessageMAU +try: + from dispatch import * +except ImportError: + from stubs import * + class MobileAddressEngine(object): """ This module is responsible for maintaining an up-to-date list of mobile addresses in the domain. diff --git a/qpid/extras/dispatch/src/py/router/neighbor.py b/qpid/extras/dispatch/src/py/router/neighbor.py index ece9c07ef1..55c6bab62f 100644 --- a/qpid/extras/dispatch/src/py/router/neighbor.py +++ b/qpid/extras/dispatch/src/py/router/neighbor.py @@ -20,13 +20,11 @@ from data import LinkState, MessageHELLO from time import time -TRACE = 0 -DEBUG = 1 -INFO = 2 -NOTICE = 3 -WARNING = 4 -ERROR = 5 -CRITICAL = 6 +try: + from dispatch import * +except ImportError: + from stubs import * + class NeighborEngine(object): """ @@ -51,7 +49,7 @@ class NeighborEngine(object): if now - self.last_hello_time >= self.hello_interval: self.last_hello_time = now - self.container.send('_peer', MessageHELLO(None, self.id, self.area, self.hellos.keys())) + self.container.send('_local/qdxrouter', MessageHELLO(None, self.id, self.area, self.hellos.keys())) if self.link_state_changed: self.link_state_changed = False @@ -66,7 +64,7 @@ class NeighborEngine(object): if msg.is_seen(self.id): if self.link_state.add_peer(msg.id): self.link_state_changed = True - self.container.log(INFO, "New neighbor established: %s" % msg.id) + self.container.log(LOG_INFO, "New neighbor established: %s" % msg.id) ## ## TODO - Use this function to detect area boundaries ## @@ -80,6 +78,6 @@ class NeighborEngine(object): self.hellos.pop(key) if self.link_state.del_peer(key): self.link_state_changed = True - self.container.log(INFO, "Neighbor lost: %s" % key) + self.container.log(LOG_INFO, "Neighbor lost: %s" % key) diff --git a/qpid/extras/dispatch/src/py/router/path.py b/qpid/extras/dispatch/src/py/router/path.py index 018c967f7a..3be6c40608 100644 --- a/qpid/extras/dispatch/src/py/router/path.py +++ b/qpid/extras/dispatch/src/py/router/path.py @@ -17,13 +17,10 @@ # under the License. # -TRACE = 0 -DEBUG = 1 -INFO = 2 -NOTICE = 3 -WARNING = 4 -ERROR = 5 -CRITICAL = 6 +try: + from dispatch import * +except ImportError: + from stubs import * class PathEngine(object): """ diff --git a/qpid/extras/dispatch/src/py/router/router_engine.py b/qpid/extras/dispatch/src/py/router/router_engine.py index e5bf5517b5..065204ad62 100644 --- a/qpid/extras/dispatch/src/py/router/router_engine.py +++ b/qpid/extras/dispatch/src/py/router/router_engine.py @@ -30,39 +30,44 @@ from routing import RoutingTableEngine from binding import BindingEngine from adapter import AdapterEngine -TRACE = 0 -DEBUG = 1 -INFO = 2 -NOTICE = 3 -WARNING = 4 -ERROR = 5 -CRITICAL = 6 +## +## Import the Dispatch adapters from the environment. If they are not found +## (i.e. we are in a test bench, etc.), load the stub versions. +## +try: + from dispatch import * +except ImportError: + from stubs import * + class RouterEngine: """ """ - def __init__(self, adapter, domain, router_id=None, area='area', config_override={}): + def __init__(self, router_adapter, router_id=None, area='area', config_override={}): """ Initialize an instance of a router for a domain. """ ## ## Record important information about this router instance ## - self.adapter = adapter - self.domain = domain + self.domain = "domain" + self.router_adapter = router_adapter + self.log_adapter = LogAdapter("dispatch.router") + self.io_adapter = IoAdapter(self, "qdxrouter") + if router_id: self.id = router_id else: self.id = str(uuid4()) self.area = area - self.log(NOTICE, "Router Engine Instantiated: area=%s id=%s" % (self.area, self.id)) + self.log(LOG_INFO, "Router Engine Instantiated: area=%s id=%s" % (self.area, self.id)) ## ## Setup configuration ## self.config = Configuration(config_override) - self.log(INFO, "Config: %r" % self.config) + self.log(LOG_INFO, "Config: %r" % self.config) ## ## Launch the sub-module engines @@ -75,13 +80,6 @@ class RouterEngine: self.binding_engine = BindingEngine(self) self.adapter_engine = AdapterEngine(self) - ## - ## Establish the local bindings so that this router instance can receive - ## traffic addressed to it - ## - self.adapter.local_bind('router') - self.adapter.local_bind('_topo/%s/%s' % (self.area, self.id)) - self.adapter.local_bind('_topo/%s/all' % self.area) ##======================================================================================== @@ -102,7 +100,7 @@ class RouterEngine: return self.mobile_address_engine.add_local_address(key) except Exception, e: - self.log(ERROR, "Exception in new-address processing: exception=%r" % e) + self.log(LOG_ERROR, "Exception in new-address processing: exception=%r" % e) def delLocalAddress(self, key): """ @@ -112,7 +110,7 @@ class RouterEngine: return self.mobile_address_engine.del_local_address(key) except Exception, e: - self.log(ERROR, "Exception in del-address processing: exception=%r" % e) + self.log(LOG_ERROR, "Exception in del-address processing: exception=%r" % e) def handleTimerTick(self): @@ -128,7 +126,7 @@ class RouterEngine: self.binding_engine.tick(now) self.adapter_engine.tick(now) except Exception, e: - self.log(ERROR, "Exception in timer processing: exception=%r" % e) + self.log(LOG_ERROR, "Exception in timer processing: exception=%r" % e) def handleControlMessage(self, opcode, body): @@ -138,38 +136,48 @@ class RouterEngine: now = time() if opcode == 'HELLO': msg = MessageHELLO(body) - self.log(TRACE, "RCVD: %r" % msg) + self.log(LOG_TRACE, "RCVD: %r" % msg) self.neighbor_engine.handle_hello(msg, now) elif opcode == 'RA': msg = MessageRA(body) - self.log(TRACE, "RCVD: %r" % msg) + self.log(LOG_TRACE, "RCVD: %r" % msg) self.link_state_engine.handle_ra(msg, now) self.mobile_address_engine.handle_ra(msg, now) elif opcode == 'LSU': msg = MessageLSU(body) - self.log(TRACE, "RCVD: %r" % msg) + self.log(LOG_TRACE, "RCVD: %r" % msg) self.link_state_engine.handle_lsu(msg, now) elif opcode == 'LSR': msg = MessageLSR(body) - self.log(TRACE, "RCVD: %r" % msg) + self.log(LOG_TRACE, "RCVD: %r" % msg) self.link_state_engine.handle_lsr(msg, now) elif opcode == 'MAU': msg = MessageMAU(body) - self.log(TRACE, "RCVD: %r" % msg) + self.log(LOG_TRACE, "RCVD: %r" % msg) self.mobile_address_engine.handle_mau(msg, now) elif opcode == 'MAR': msg = MessageMAR(body) - self.log(TRACE, "RCVD: %r" % msg) + self.log(LOG_TRACE, "RCVD: %r" % msg) self.mobile_address_engine.handle_mar(msg, now) except Exception, e: - self.log(ERROR, "Exception in message processing: opcode=%s body=%r exception=%r" % (opcode, body, e)) + self.log(LOG_ERROR, "Exception in message processing: opcode=%s body=%r exception=%r" % (opcode, body, e)) + + def receive(self, message_properties, body): + """ + This is the IoAdapter message-receive handler + """ + try: + self.handleControlMessage(message_properties['opcode'], body) + except Exception, e: + self.log(LOG_ERROR, "Exception in raw message processing: properties=%r body=%r exception=%r" % + (message_properties, body, e)) def getRouterData(self, kind): """ @@ -196,51 +204,52 @@ class RouterEngine: ##======================================================================================== - ## Adapter Calls - outbound calls to the adapter + ## Adapter Calls - outbound calls to Dispatch ##======================================================================================== def log(self, level, text): """ Emit a log message to the host's event log """ - self.adapter.log(level, text) + self.log_adapter.log(level, text) def send(self, dest, msg): """ Send a control message to another router. """ - self.adapter.send(dest, msg.get_opcode(), msg.to_dict()) - self.log(TRACE, "SENT: %r dest=%s" % (msg, dest)) + app_props = {'opcode' : msg.get_opcode() } + self.io_adapter.send(dest, app_props, msg.to_dict()) + self.log(LOG_TRACE, "SENT: %r dest=%s" % (msg, dest)) ##======================================================================================== ## Interconnect between the Sub-Modules ##======================================================================================== def local_link_state_changed(self, link_state): - self.log(DEBUG, "Event: local_link_state_changed: %r" % link_state) + self.log(LOG_DEBUG, "Event: local_link_state_changed: %r" % link_state) self.link_state_engine.new_local_link_state(link_state) def ls_collection_changed(self, collection): - self.log(DEBUG, "Event: ls_collection_changed: %r" % collection) + self.log(LOG_DEBUG, "Event: ls_collection_changed: %r" % collection) self.path_engine.ls_collection_changed(collection) def next_hops_changed(self, next_hop_table): - self.log(DEBUG, "Event: next_hops_changed: %r" % next_hop_table) + self.log(LOG_DEBUG, "Event: next_hops_changed: %r" % next_hop_table) self.routing_table_engine.next_hops_changed(next_hop_table) self.binding_engine.next_hops_changed() def mobile_sequence_changed(self, mobile_seq): - self.log(DEBUG, "Event: mobile_sequence_changed: %d" % mobile_seq) + self.log(LOG_DEBUG, "Event: mobile_sequence_changed: %d" % mobile_seq) self.link_state_engine.set_mobile_sequence(mobile_seq) def mobile_keys_changed(self, keys): - self.log(DEBUG, "Event: mobile_keys_changed: %r" % keys) + self.log(LOG_DEBUG, "Event: mobile_keys_changed: %r" % keys) self.binding_engine.mobile_keys_changed(keys) def get_next_hops(self): return self.routing_table_engine.get_next_hops() def remote_routes_changed(self, key_class, routes): - self.log(DEBUG, "Event: remote_routes_changed: class=%s routes=%r" % (key_class, routes)) + self.log(LOG_DEBUG, "Event: remote_routes_changed: class=%s routes=%r" % (key_class, routes)) self.adapter_engine.remote_routes_changed(key_class, routes) diff --git a/qpid/extras/dispatch/src/py/router/routing.py b/qpid/extras/dispatch/src/py/router/routing.py index 439da66653..0d4ceed955 100644 --- a/qpid/extras/dispatch/src/py/router/routing.py +++ b/qpid/extras/dispatch/src/py/router/routing.py @@ -17,13 +17,10 @@ # under the License. # -TRACE = 0 -DEBUG = 1 -INFO = 2 -NOTICE = 3 -WARNING = 4 -ERROR = 5 -CRITICAL = 6 +try: + from dispatch import * +except ImportError: + from stubs import * class RoutingTableEngine(object): """ diff --git a/qpid/extras/dispatch/src/py/stubs/__init__.py b/qpid/extras/dispatch/src/py/stubs/__init__.py new file mode 100644 index 0000000000..98180eadee --- /dev/null +++ b/qpid/extras/dispatch/src/py/stubs/__init__.py @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from stubs.logadapter import * +from stubs.ioadapter import * + diff --git a/qpid/extras/dispatch/src/py/stubs/ioadapter.py b/qpid/extras/dispatch/src/py/stubs/ioadapter.py new file mode 100644 index 0000000000..1e465f98c3 --- /dev/null +++ b/qpid/extras/dispatch/src/py/stubs/ioadapter.py @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +class IoAdapter: + def __init__(self, handler, address): + self.handler = handler + self.address = address + + def send(self, address, app_properties, body): + print "IO: send(addr=%s props=%r body=%r" % (address, app_properties, body) + diff --git a/qpid/extras/dispatch/src/py/stubs/logadapter.py b/qpid/extras/dispatch/src/py/stubs/logadapter.py new file mode 100644 index 0000000000..3d717d3ed2 --- /dev/null +++ b/qpid/extras/dispatch/src/py/stubs/logadapter.py @@ -0,0 +1,33 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +LOG_TRACE = 1 +LOG_DEBUG = 2 +LOG_INFO = 4 +LOG_NOTICE = 8 +LOG_WARNING = 16 +LOG_ERROR = 32 +LOG_CRITICAL = 64 + +class LogAdapter: + def __init__(self, mod_name): + self.mod_name = mod_name + + def log(self, level, text): + print "LOG: mod=%s level=%d text=%s" % (self.mod_name, level, text) diff --git a/qpid/extras/dispatch/src/python_embedded.c b/qpid/extras/dispatch/src/python_embedded.c index 42a8221cea..d223c047e6 100644 --- a/qpid/extras/dispatch/src/python_embedded.c +++ b/qpid/extras/dispatch/src/python_embedded.c @@ -22,21 +22,25 @@ #include <qpid/dispatch/log.h> #include <qpid/dispatch/amqp.h> #include <qpid/dispatch/alloc.h> +#include <qpid/dispatch/router.h> //=============================================================================== // Control Functions //=============================================================================== -static uint32_t ref_count = 0; -static sys_mutex_t *lock = 0; -static char *log_module = "PYTHON"; +static dx_dispatch_t *dispatch = 0; +static uint32_t ref_count = 0; +static sys_mutex_t *lock = 0; +static char *log_module = "PYTHON"; +static PyObject *dispatch_module = 0; static void dx_python_setup(); -void dx_python_initialize() +void dx_python_initialize(dx_dispatch_t *dx) { + dispatch = dx; lock = sys_mutex(); } @@ -66,6 +70,8 @@ void dx_python_stop() sys_mutex_lock(lock); ref_count--; if (ref_count == 0) { + Py_DECREF(dispatch_module); + dispatch_module = 0; Py_Finalize(); dx_log(log_module, LOG_TRACE, "Embedded Python Interpreter Shut Down"); } @@ -73,6 +79,13 @@ void dx_python_stop() } +PyObject *dx_python_module() +{ + assert(dispatch_module); + return dispatch_module; +} + + //=============================================================================== // Data Conversion Functions //=============================================================================== @@ -380,49 +393,159 @@ static PyTypeObject LogAdapterType = { // Message IO Object //=============================================================================== -typedef struct dx_python_io_adapter { - int x; -} dx_python_io_adapter; +typedef struct { + PyObject_HEAD + PyObject *handler; + dx_dispatch_t *dx; + dx_address_t *address; +} IoAdapter; + + +static void dx_io_rx_handler(void *context, dx_message_t *msg) +{ + //IoAdapter *self = (IoAdapter*) context; + + // TODO - Parse the incoming message and send it to the python handler. +} + + +static int IoAdapter_init(IoAdapter *self, PyObject *args, PyObject *kwds) +{ + const char *address; + if (!PyArg_ParseTuple(args, "Os", &self->handler, &address)) + return -1; + + Py_INCREF(self->handler); + self->dx = dispatch; + self->address = dx_router_register_address(self->dx, true, address, dx_io_rx_handler, self); + return 0; +} + + +static void IoAdapter_dealloc(IoAdapter* self) +{ + dx_router_unregister_address(self->address); + Py_DECREF(self->handler); + self->ob_type->tp_free((PyObject*)self); +} + + +static PyObject* dx_python_send(PyObject *self, PyObject *args) +{ + const char *address; + PyObject *app_properties; + PyObject *body; + if (!PyArg_ParseTuple(args, "sOO", &address, &app_properties, &body)) + return 0; -ALLOC_DECLARE(dx_python_io_adapter); -ALLOC_DEFINE(dx_python_io_adapter); + // TODO - Compose and send a message + + + Py_INCREF(Py_None); + return Py_None; +} -//static PyObject* dx_python_send(PyObject *self, PyObject *args) -//{ -// return 0; -//} + +static PyMethodDef IoAdapter_methods[] = { + {"send", dx_python_send, METH_VARARGS, "Send a Message"}, + {0, 0, 0, 0} +}; + + +static PyTypeObject IoAdapterType = { + PyObject_HEAD_INIT(0) + 0, /* ob_size*/ + "dispatch.IoAdapter", /* tp_name*/ + sizeof(IoAdapter), /* tp_basicsize*/ + 0, /* tp_itemsize*/ + (destructor)IoAdapter_dealloc, /* tp_dealloc*/ + 0, /* tp_print*/ + 0, /* tp_getattr*/ + 0, /* tp_setattr*/ + 0, /* tp_compare*/ + 0, /* tp_repr*/ + 0, /* tp_as_number*/ + 0, /* tp_as_sequence*/ + 0, /* tp_as_mapping*/ + 0, /* tp_hash */ + 0, /* tp_call*/ + 0, /* tp_str*/ + 0, /* tp_getattro*/ + 0, /* tp_setattro*/ + 0, /* tp_as_buffer*/ + Py_TPFLAGS_DEFAULT, /* tp_flags*/ + "Dispatch IO Adapter", /* tp_doc */ + 0, /* tp_traverse */ + 0, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + IoAdapter_methods, /* tp_methods */ + 0, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + (initproc)IoAdapter_init, /* tp_init */ + 0, /* tp_alloc */ + 0, /* tp_new */ + 0, /* tp_free */ + 0, /* tp_is_gc */ + 0, /* tp_bases */ + 0, /* tp_mro */ + 0, /* tp_cache */ + 0, /* tp_subclasses */ + 0, /* tp_weaklist */ + 0, /* tp_del */ + 0 /* tp_version_tag */ +}; //=============================================================================== // Initialization of Modules and Types //=============================================================================== +static void dx_register_log_constant(PyObject *module, const char *name, uint32_t value) +{ + PyObject *const_object = PyInt_FromLong((long) value); + Py_INCREF(const_object); + PyModule_AddObject(module, name, const_object); +} + + static void dx_python_setup() { - // - // Add LogAdapter - // LogAdapterType.tp_new = PyType_GenericNew; - if (PyType_Ready(&LogAdapterType) < 0) { + IoAdapterType.tp_new = PyType_GenericNew; + if ((PyType_Ready(&LogAdapterType) < 0) || (PyType_Ready(&IoAdapterType) < 0)) { PyErr_Print(); - dx_log(log_module, LOG_ERROR, "Unable to initialize LogAdapter"); + dx_log(log_module, LOG_ERROR, "Unable to initialize Adapters"); assert(0); } else { PyObject *m = Py_InitModule3("dispatch", empty_methods, "Dispatch Adapter Module"); + // + // Add LogAdapter + // Py_INCREF(&LogAdapterType); PyModule_AddObject(m, "LogAdapter", (PyObject*) &LogAdapterType); - PyObject *LogTrace = PyInt_FromLong((long) LOG_TRACE); - Py_INCREF(LogTrace); - PyModule_AddObject(m, "LOG_TRACE", LogTrace); + dx_register_log_constant(m, "LOG_TRACE", LOG_TRACE); + dx_register_log_constant(m, "LOG_DEBUG", LOG_DEBUG); + dx_register_log_constant(m, "LOG_INFO", LOG_INFO); + dx_register_log_constant(m, "LOG_NOTICE", LOG_NOTICE); + dx_register_log_constant(m, "LOG_WARNING", LOG_WARNING); + dx_register_log_constant(m, "LOG_ERROR", LOG_ERROR); + dx_register_log_constant(m, "LOG_CRITICAL", LOG_CRITICAL); - PyObject *LogError = PyInt_FromLong((long) LOG_ERROR); - Py_INCREF(LogError); - PyModule_AddObject(m, "LOG_ERROR", LogError); + // + Py_INCREF(&IoAdapterType); + PyModule_AddObject(m, "IoAdapter", (PyObject*) &IoAdapterType); - PyObject *LogInfo = PyInt_FromLong((long) LOG_INFO); - Py_INCREF(LogInfo); - PyModule_AddObject(m, "LOG_INFO", LogInfo); + Py_INCREF(m); + dispatch_module = m; } } diff --git a/qpid/extras/dispatch/src/router_node.c b/qpid/extras/dispatch/src/router_node.c index 010b008f93..d1e307e754 100644 --- a/qpid/extras/dispatch/src/router_node.c +++ b/qpid/extras/dispatch/src/router_node.c @@ -17,6 +17,7 @@ * under the License. */ +#include <qpid/dispatch/python_embedded.h> #include <stdio.h> #include <string.h> #include <qpid/dispatch.h> @@ -24,6 +25,9 @@ static char *module = "ROUTER"; +static void dx_router_python_setup(dx_router_t *router); +static void dx_pyrouter_tick(dx_router_t *router); + //static char *local_prefix = "_local/"; //static char *topo_prefix = "_topo/"; @@ -54,6 +58,8 @@ struct dx_router_t { dx_timer_t *timer; hash_t *out_hash; uint64_t dtag; + PyObject *pyRouter; + PyObject *pyTick; }; @@ -459,6 +465,8 @@ static void dx_router_timer_handler(void *context) // // Periodic processing. // + dx_pyrouter_tick(router); + dx_timer_schedule(router->timer, 1000); } @@ -498,10 +506,10 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id) router->router_id = id; router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router); - dx_timer_schedule(router->timer, 0); // Immediate router->out_hash = hash(10, 32, 0); - router->dtag = 1; + router->dtag = 1; + router->pyRouter = 0; // // Inform the field iterator module of this router's id and area. The field iterator @@ -509,6 +517,11 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id) // dx_field_iterator_set_address(area, id); + // + // Set up the usage of the embedded python router module. + // + dx_python_start(); + dx_log(module, LOG_INFO, "Router started, area=%s id=%s", area, id); return router; @@ -517,6 +530,9 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id) void dx_router_setup_agent(dx_dispatch_t *dx) { + dx_router_python_setup(dx->router); + dx_timer_schedule(dx->router->timer, 1000); + // TODO } @@ -526,6 +542,7 @@ void dx_router_free(dx_router_t *router) dx_container_set_default_node_type(router->dx, 0, 0, DX_DIST_BOTH); sys_mutex_free(router->lock); free(router); + dx_python_stop(); } @@ -595,3 +612,201 @@ void dx_router_send(dx_dispatch_t *dx, sys_mutex_unlock(router->lock); // TOINVESTIGATE Move this higher? } + +//=============================================================================== +// Python Router Adapter +//=============================================================================== + +typedef struct { + PyObject_HEAD + dx_router_t *router; +} RouterAdapter; + + +static PyObject* dx_router_add_route(PyObject *self, PyObject *args) +{ + //RouterAdapter *adapter = (RouterAdapter*) self; + const char *addr; + const char *peer; + + if (!PyArg_ParseTuple(args, "ss", &addr, &peer)) + return 0; + + // TODO + + Py_INCREF(Py_None); + return Py_None; +} + + +static PyObject* dx_router_del_route(PyObject *self, PyObject *args) +{ + //RouterAdapter *adapter = (RouterAdapter*) self; + const char *addr; + const char *peer; + + if (!PyArg_ParseTuple(args, "ss", &addr, &peer)) + return 0; + + // TODO + + Py_INCREF(Py_None); + return Py_None; +} + + +static PyMethodDef RouterAdapter_methods[] = { + {"add_route", dx_router_add_route, METH_VARARGS, "Add a newly discovered route"}, + {"del_route", dx_router_del_route, METH_VARARGS, "Delete a route"}, + {0, 0, 0, 0} +}; + +static PyTypeObject RouterAdapterType = { + PyObject_HEAD_INIT(0) + 0, /* ob_size*/ + "dispatch.RouterAdapter", /* tp_name*/ + sizeof(RouterAdapter), /* tp_basicsize*/ + 0, /* tp_itemsize*/ + 0, /* tp_dealloc*/ + 0, /* tp_print*/ + 0, /* tp_getattr*/ + 0, /* tp_setattr*/ + 0, /* tp_compare*/ + 0, /* tp_repr*/ + 0, /* tp_as_number*/ + 0, /* tp_as_sequence*/ + 0, /* tp_as_mapping*/ + 0, /* tp_hash */ + 0, /* tp_call*/ + 0, /* tp_str*/ + 0, /* tp_getattro*/ + 0, /* tp_setattro*/ + 0, /* tp_as_buffer*/ + Py_TPFLAGS_DEFAULT, /* tp_flags*/ + "Dispatch Router Adapter", /* tp_doc */ + 0, /* tp_traverse */ + 0, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + RouterAdapter_methods, /* tp_methods */ + 0, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + 0, /* tp_init */ + 0, /* tp_alloc */ + 0, /* tp_new */ + 0, /* tp_free */ + 0, /* tp_is_gc */ + 0, /* tp_bases */ + 0, /* tp_mro */ + 0, /* tp_cache */ + 0, /* tp_subclasses */ + 0, /* tp_weaklist */ + 0, /* tp_del */ + 0 /* tp_version_tag */ +}; + + +static void dx_router_python_setup(dx_router_t *router) +{ + PyObject *pDispatchModule = dx_python_module(); + + RouterAdapterType.tp_new = PyType_GenericNew; + if (PyType_Ready(&RouterAdapterType) < 0) { + PyErr_Print(); + dx_log(module, LOG_CRITICAL, "Unable to initialize the Python Router Adapter"); + return; + } + + Py_INCREF(&RouterAdapterType); + PyModule_AddObject(pDispatchModule, "RouterAdapter", (PyObject*) &RouterAdapterType); + + // + // Attempt to import the Python Router module + // + PyObject* pName; + PyObject* pId; + PyObject* pArea; + PyObject* pModule; + PyObject* pClass; + PyObject* pArgs; + + pName = PyString_FromString("router"); + pModule = PyImport_Import(pName); + Py_DECREF(pName); + if (!pModule) { + dx_log(module, LOG_CRITICAL, "Can't Locate 'router' Python module"); + return; + } + + pClass = PyObject_GetAttrString(pModule, "RouterEngine"); + if (!pClass || !PyClass_Check(pClass)) { + dx_log(module, LOG_CRITICAL, "Can't Locate 'RouterEngine' class in the 'router' module"); + return; + } + + PyObject *adapterType = PyObject_GetAttrString(pDispatchModule, "RouterAdapter"); + PyObject *adapterInstance = PyObject_CallObject(adapterType, 0); + assert(adapterInstance); + + ((RouterAdapter*) adapterInstance)->router = router; + + // + // Constructor Arguments for RouterEngine + // + pArgs = PyTuple_New(3); + + // arg 0: adapter instance + PyTuple_SetItem(pArgs, 0, adapterInstance); + + // arg 1: router_id + pId = PyString_FromString(router->router_id); + PyTuple_SetItem(pArgs, 1, pId); + + // arg 2: area id + pArea = PyString_FromString(router->router_area); + PyTuple_SetItem(pArgs, 2, pArea); + + // + // Instantiate the router + // + router->pyRouter = PyInstance_New(pClass, pArgs, 0); + Py_DECREF(pArgs); + Py_DECREF(adapterType); + + if (!router->pyRouter) { + PyErr_Print(); + dx_log(module, LOG_CRITICAL, "'RouterEngine' class cannot be instantiated"); + return; + } + + router->pyTick = PyObject_GetAttrString(router->pyRouter, "handleTimerTick"); + if (!router->pyTick || !PyCallable_Check(router->pyTick)) { + dx_log(module, LOG_CRITICAL, "'RouterEngine' class has no handleTimerTick method"); + return; + } +} + + +static void dx_pyrouter_tick(dx_router_t *router) +{ + PyObject *pArgs; + PyObject *pValue; + + pArgs = PyTuple_New(0); + pValue = PyObject_CallObject(router->pyTick, pArgs); + if (PyErr_Occurred()) { + PyErr_Print(); + } + Py_DECREF(pArgs); + if (pValue) { + Py_DECREF(pValue); + } +} + |
