summaryrefslogtreecommitdiff
path: root/qpid/extras/dispatch/src/python_embedded.c
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2013-09-20 18:59:30 +0000
committerKim van der Riet <kpvdr@apache.org>2013-09-20 18:59:30 +0000
commitc70bf3ea28cdf6bafd8571690d3e5c466a0658a2 (patch)
tree68b24940e433f3f9c278b054d9ea1622389bd332 /qpid/extras/dispatch/src/python_embedded.c
parentfcdf1723c7b5cdf0772054a93edb6e7d97c4bb1e (diff)
downloadqpid-python-c70bf3ea28cdf6bafd8571690d3e5c466a0658a2.tar.gz
QPID-4984: WIP - Merge from trunk r.1525056
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/linearstore@1525101 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/extras/dispatch/src/python_embedded.c')
-rw-r--r--qpid/extras/dispatch/src/python_embedded.c95
1 files changed, 84 insertions, 11 deletions
diff --git a/qpid/extras/dispatch/src/python_embedded.c b/qpid/extras/dispatch/src/python_embedded.c
index 6b5250a34a..0b0cc11025 100644
--- a/qpid/extras/dispatch/src/python_embedded.c
+++ b/qpid/extras/dispatch/src/python_embedded.c
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -396,6 +396,7 @@ static PyTypeObject LogAdapterType = {
typedef struct {
PyObject_HEAD
PyObject *handler;
+ PyObject *handler_rx_call;
dx_dispatch_t *dx;
dx_address_t *address;
} IoAdapter;
@@ -403,9 +404,65 @@ typedef struct {
static void dx_io_rx_handler(void *context, dx_message_t *msg)
{
- //IoAdapter *self = (IoAdapter*) context;
+ IoAdapter *self = (IoAdapter*) context;
+
+ //
+ // Parse the message through the body and exit if the message is not well formed.
+ //
+ if (!dx_message_check(msg, DX_DEPTH_BODY))
+ return;
+
+ //
+ // Get an iterator for the application-properties. Exit if the message has none.
+ //
+ dx_field_iterator_t *ap = dx_message_field_iterator(msg, DX_FIELD_APPLICATION_PROPERTIES);
+ if (ap == 0)
+ return;
+
+ //
+ // Try to get a map-view of the application-properties.
+ //
+ dx_parsed_field_t *ap_map = dx_parse(ap);
+ if (ap_map == 0 || !dx_parse_ok(ap_map) || !dx_parse_is_map(ap_map)) {
+ dx_field_iterator_free(ap);
+ dx_parse_free(ap_map);
+ return;
+ }
+
+ //
+ // Get an iterator for the body. Exit if the message has none.
+ //
+ dx_field_iterator_t *body = dx_message_field_iterator(msg, DX_FIELD_BODY);
+ if (body == 0) {
+ dx_field_iterator_free(ap);
+ dx_parse_free(ap_map);
+ return;
+ }
+
+ //
+ // Try to get a map-view of the body.
+ //
+ dx_parsed_field_t *body_map = dx_parse(body);
+ if (body_map == 0 || !dx_parse_ok(body_map) || !dx_parse_is_map(body_map)) {
+ dx_field_iterator_free(ap);
+ dx_field_iterator_free(body);
+ dx_parse_free(ap_map);
+ dx_parse_free(body_map);
+ return;
+ }
+
+ PyObject *pAP = dx_field_to_py(ap_map);
+ PyObject *pBody = dx_field_to_py(body_map);
- // TODO - Parse the incoming message and send it to the python handler.
+ PyObject *pArgs = PyTuple_New(2);
+ PyTuple_SetItem(pArgs, 0, pAP);
+ PyTuple_SetItem(pArgs, 1, pBody);
+
+ PyObject *pValue = PyObject_CallObject(self->handler_rx_call, pArgs);
+ Py_DECREF(pArgs);
+ if (pValue) {
+ Py_DECREF(pValue);
+ }
}
@@ -415,9 +472,14 @@ static int IoAdapter_init(IoAdapter *self, PyObject *args, PyObject *kwds)
if (!PyArg_ParseTuple(args, "Os", &self->handler, &address))
return -1;
+ self->handler_rx_call = PyObject_GetAttrString(self->handler, "receive");
+ if (!self->handler_rx_call || !PyCallable_Check(self->handler_rx_call))
+ return -1;
+
Py_INCREF(self->handler);
+ Py_INCREF(self->handler_rx_call);
self->dx = dispatch;
- self->address = dx_router_register_address(self->dx, true, address, dx_io_rx_handler, self);
+ self->address = dx_router_register_address(self->dx, address, dx_io_rx_handler, self);
return 0;
}
@@ -426,24 +488,35 @@ static void IoAdapter_dealloc(IoAdapter* self)
{
dx_router_unregister_address(self->address);
Py_DECREF(self->handler);
+ Py_DECREF(self->handler_rx_call);
self->ob_type->tp_free((PyObject*)self);
}
static PyObject* dx_python_send(PyObject *self, PyObject *args)
{
- IoAdapter *ioa = (IoAdapter*) self;
- const char *address;
- PyObject *app_properties;
- PyObject *body;
+ IoAdapter *ioa = (IoAdapter*) self;
+ dx_composed_field_t *field = 0;
+ const char *address;
+ PyObject *app_properties;
+ PyObject *body;
+
if (!PyArg_ParseTuple(args, "sOO", &address, &app_properties, &body))
return 0;
- dx_composed_field_t *field = dx_compose(DX_PERFORMATIVE_HEADER, 0);
+ field = dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, field);
+ dx_compose_start_map(field);
+
+ dx_compose_insert_string(field, "qdx.ingress");
+ dx_compose_insert_string(field, dx_router_id(ioa->dx));
+
+ dx_compose_insert_string(field, "qdx.trace");
dx_compose_start_list(field);
- dx_compose_insert_bool(field, 0); // durable
+ dx_compose_insert_string(field, dx_router_id(ioa->dx));
dx_compose_end_list(field);
+ dx_compose_end_map(field);
+
field = dx_compose(DX_PERFORMATIVE_PROPERTIES, field);
dx_compose_start_list(field);
dx_compose_insert_null(field); // message-id