diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /extras | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-asyncstore.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'extras')
56 files changed, 8419 insertions, 5 deletions
diff --git a/extras/dispatch/CMakeLists.txt b/extras/dispatch/CMakeLists.txt new file mode 100644 index 0000000000..bc1812fb6b --- /dev/null +++ b/extras/dispatch/CMakeLists.txt @@ -0,0 +1,99 @@ +## +## Licensed to the Apache Software Foundation (ASF) under one +## or more contributor license agreements. See the NOTICE file +## distributed with this work for additional information +## regarding copyright ownership. The ASF licenses this file +## to you under the Apache License, Version 2.0 (the +## "License"); you may not use this file except in compliance +## with the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, +## software distributed under the License is distributed on an +## "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +## KIND, either express or implied. See the License for the +## specific language governing permissions and limitations +## under the License. +## + +cmake_minimum_required(VERSION 2.6) +include(CheckLibraryExists) +include(CheckSymbolExists) + +project(qpid-dispatch C) + +set (SO_VERSION_MAJOR 0) +set (SO_VERSION_MINOR 1) +set (SO_VERSION "${SO_VERSION_MAJOR}.${SO_VERSION_MINOR}") + +if (NOT DEFINED LIB_SUFFIX) + get_property(LIB64 GLOBAL PROPERTY FIND_LIBRARY_USE_LIB64_PATHS) + if ("${LIB64}" STREQUAL "TRUE" AND ${CMAKE_SIZEOF_VOID_P} STREQUAL "8") + set(LIB_SUFFIX 64) + else() + set(LIB_SUFFIX "") + endif() +endif() + +set(INCLUDE_INSTALL_DIR include CACHE PATH "Include file directory") +set(LIB_INSTALL_DIR "lib${LIB_SUFFIX}" CACHE PATH "Library object file directory") +set(SYSCONF_INSTALL_DIR etc CACHE PATH "System read only configuration directory") +set(SHARE_INSTALL_DIR share CACHE PATH "Shared read only data directory") +set(MAN_INSTALL_DIR share/man CACHE PATH "Manpage directory") + +include_directories( + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${CMAKE_CURRENT_SOURCE_DIR}/src + ${proton_include} + ) + +## +## Find dependencies +## +find_library(proton_lib qpid-proton) +find_library(pthread_lib pthread) +find_library(rt_lib rt) +find_path(proton_include proton/driver.h) + +set(CMAKE_C_FLAGS "-pthread -Wall -Werror") +set(CATCH_UNDEFINED "-Wl,--no-undefined") + +## +## Build the Multi-Threaded Server Library +## +set(server_SOURCES + src/agent.c + src/alloc.c + src/auth.c + src/buffer.c + src/container.c + src/hash.c + src/iovec.c + src/iterator.c + src/log.c + src/message.c + src/posix/threading.c + src/router_node.c + src/server.c + src/timer.c + src/work_queue.c + ) + +add_library(qpid-dispatch SHARED ${server_SOURCES}) +target_link_libraries(qpid-dispatch ${proton_lib} ${pthread_lib} ${rt_lib}) +set_target_properties(qpid-dispatch PROPERTIES + VERSION "${SO_VERSION}" + SOVERSION "${SO_VERSION_MAJOR}" + LINK_FLAGS "${CATCH_UNDEFINED}" + ) +install(TARGETS qpid-dispatch + LIBRARY DESTINATION ${LIB_INSTALL_DIR}) +file(GLOB headers "include/qpid/dispatch/*.h") +install(FILES ${headers} DESTINATION ${INCLUDE_INSTALL_DIR}/qpid/dispatch) + +## +## Build Tests +## +add_subdirectory(router) +add_subdirectory(tests) diff --git a/extras/dispatch/include/qpid/dispatch/agent.h b/extras/dispatch/include/qpid/dispatch/agent.h new file mode 100644 index 0000000000..d53d24d4d4 --- /dev/null +++ b/extras/dispatch/include/qpid/dispatch/agent.h @@ -0,0 +1,108 @@ +#ifndef __dispatch_agent_h__ +#define __dispatch_agent_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <stddef.h> +#include <stdbool.h> +#include <stdint.h> + +/** + * \defgroup Container Management Agent + * @{ + */ + +typedef struct dx_agent_class_t dx_agent_class_t; + + +/** + * \brief Get Schema Data Handler + * + * @param context The handler context supplied in dx_agent_register. + */ +typedef void (*dx_agent_schema_cb_t)(void* context); + + +/** + * \brief Query Handler + * + * @param context The handler context supplied in dx_agent_register. + * @param id The identifier of the instance being queried or NULL for all instances. + * @param correlator The correlation handle to be used in calls to dx_agent_value_* + */ +typedef void (*dx_agent_query_cb_t)(void* context, const char *id, const void *correlator); + + +/** + * \brief Initialize the agent module and prepare it for operation. + * + */ +void dx_agent_initialize(); + + +/** + * \brief Finalize the agent after it has stopped running. + */ +void dx_agent_finalize(void); + + +/** + * \brief Register a class/object-type with the agent. + */ +dx_agent_class_t *dx_agent_register_class(const char *fqname, + void *context, + dx_agent_schema_cb_t schema_handler, + dx_agent_query_cb_t query_handler); + +/** + * \brief Register an event-type with the agent. + */ +dx_agent_class_t *dx_agent_register_event(const char *fqname, + void *context, + dx_agent_schema_cb_t schema_handler); + +/** + * + */ +void dx_agent_value_string(const void *correlator, const char *key, const char *value); +void dx_agent_value_uint(const void *correlator, const char *key, uint64_t value); +void dx_agent_value_null(const void *correlator, const char *key); +void dx_agent_value_boolean(const void *correlator, const char *key, bool value); +void dx_agent_value_binary(const void *correlator, const char *key, const uint8_t *value, size_t len); +void dx_agent_value_uuid(const void *correlator, const char *key, const uint8_t *value); +void dx_agent_value_timestamp(const void *correlator, const char *key, uint64_t value); + + +/** + * + */ +void dx_agent_value_complete(const void *correlator, bool more); + + +/** + * + */ +void *dx_agent_raise_event(dx_agent_class_t *event); + + +/** + * @} + */ + +#endif diff --git a/extras/dispatch/include/qpid/dispatch/alloc.h b/extras/dispatch/include/qpid/dispatch/alloc.h new file mode 100644 index 0000000000..ae4190ad89 --- /dev/null +++ b/extras/dispatch/include/qpid/dispatch/alloc.h @@ -0,0 +1,72 @@ +#ifndef __dispatch_alloc_h__ +#define __dispatch_alloc_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <stdlib.h> +#include <stdint.h> +#include <qpid/dispatch/threading.h> + +typedef struct dx_alloc_pool_t dx_alloc_pool_t; + +typedef struct { + int transfer_batch_size; + int local_free_list_max; + int global_free_list_max; +} dx_alloc_config_t; + +typedef struct { + uint64_t total_alloc_from_heap; + uint64_t total_free_to_heap; + uint64_t held_by_threads; + uint64_t batches_rebalanced_to_threads; + uint64_t batches_rebalanced_to_global; +} dx_alloc_stats_t; + +typedef struct { + char *type_name; + size_t type_size; + size_t *additional_size; + size_t total_size; + dx_alloc_config_t *config; + dx_alloc_stats_t *stats; + dx_alloc_pool_t *global_pool; + sys_mutex_t *lock; +} dx_alloc_type_desc_t; + + +void *dx_alloc(dx_alloc_type_desc_t *desc, dx_alloc_pool_t **tpool); +void dx_dealloc(dx_alloc_type_desc_t *desc, dx_alloc_pool_t **tpool, void *p); + + +#define ALLOC_DECLARE(T) \ + T *new_##T(); \ + void free_##T(T *p) + +#define ALLOC_DEFINE_CONFIG(T,S,A,C) \ + dx_alloc_type_desc_t __desc_##T = {#T, S, A, 0, C, 0, 0, 0}; \ + __thread dx_alloc_pool_t *__local_pool_##T = 0; \ + T *new_##T() { return (T*) dx_alloc(&__desc_##T, &__local_pool_##T); } \ + void free_##T(T *p) { dx_dealloc(&__desc_##T, &__local_pool_##T, (void*) p); } \ + dx_alloc_stats_t *alloc_stats_##T() { return __desc_##T.stats; } + +#define ALLOC_DEFINE(T) ALLOC_DEFINE_CONFIG(T, sizeof(T), 0, 0) + + +#endif diff --git a/extras/dispatch/include/qpid/dispatch/buffer.h b/extras/dispatch/include/qpid/dispatch/buffer.h new file mode 100644 index 0000000000..1c372b265d --- /dev/null +++ b/extras/dispatch/include/qpid/dispatch/buffer.h @@ -0,0 +1,79 @@ +#ifndef __dispatch_buffer_h__ +#define __dispatch_buffer_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/ctools.h> + +typedef struct dx_buffer_t dx_buffer_t; + +DEQ_DECLARE(dx_buffer_t, dx_buffer_list_t); + +struct dx_buffer_t { + DEQ_LINKS(dx_buffer_t); + unsigned int size; +}; + +/** + */ +void dx_buffer_set_size(size_t size); + +/** + */ +dx_buffer_t *dx_allocate_buffer(void); + +/** + * @param buf A pointer to an allocated buffer + */ +void dx_free_buffer(dx_buffer_t *buf); + +/** + * @param buf A pointer to an allocated buffer + * @return A pointer to the first octet in the buffer + */ +unsigned char *dx_buffer_base(dx_buffer_t *buf); + +/** + * @param buf A pointer to an allocated buffer + * @return A pointer to the first free octet in the buffer, the insert point for new data. + */ +unsigned char *dx_buffer_cursor(dx_buffer_t *buf); + +/** + * @param buf A pointer to an allocated buffer + * @return The number of octets in the buffer's free space, how many octets may be inserted. + */ +size_t dx_buffer_capacity(dx_buffer_t *buf); + +/** + * @param buf A pointer to an allocated buffer + * @return The number of octets of data in the buffer + */ +size_t dx_buffer_size(dx_buffer_t *buf); + +/** + * Notify the buffer that octets have been inserted at the buffer's cursor. This will advance the + * cursor by len octets. + * + * @param buf A pointer to an allocated buffer + * @param len The number of octets that have been appended to the buffer + */ +void dx_buffer_insert(dx_buffer_t *buf, size_t len); + +#endif diff --git a/extras/dispatch/include/qpid/dispatch/container.h b/extras/dispatch/include/qpid/dispatch/container.h new file mode 100644 index 0000000000..01a24fbbef --- /dev/null +++ b/extras/dispatch/include/qpid/dispatch/container.h @@ -0,0 +1,129 @@ +#ifndef __dispatch_container_h__ +#define __dispatch_container_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <proton/engine.h> +#include <qpid/dispatch/server.h> +#include <qpid/dispatch/alloc.h> +#include <qpid/dispatch/ctools.h> + +typedef uint8_t dx_dist_mode_t; +#define DX_DIST_COPY 0x01 +#define DX_DIST_MOVE 0x02 +#define DX_DIST_BOTH 0x03 + +/** + * Node Lifetime Policy (see AMQP 3.5.9) + */ +typedef enum { + DX_LIFE_PERMANENT, + DX_LIFE_DELETE_CLOSE, + DX_LIFE_DELETE_NO_LINKS, + DX_LIFE_DELETE_NO_MESSAGES, + DX_LIFE_DELETE_NO_LINKS_MESSAGES +} dx_lifetime_policy_t; + + +/** + * Link Direction + */ +typedef enum { + DX_INCOMING, + DX_OUTGOING +} dx_direction_t; + + +typedef struct dx_node_t dx_node_t; +typedef struct dx_link_t dx_link_t; + +typedef void (*dx_container_delivery_handler_t) (void *node_context, dx_link_t *link, pn_delivery_t *delivery); +typedef int (*dx_container_link_handler_t) (void *node_context, dx_link_t *link); +typedef int (*dx_container_link_detach_handler_t) (void *node_context, dx_link_t *link, int closed); +typedef void (*dx_container_node_handler_t) (void *type_context, dx_node_t *node); +typedef void (*dx_container_conn_handler_t) (void *type_context, dx_connection_t *conn); + +typedef struct { + char *type_name; + void *type_context; + int allow_dynamic_creation; + + // + // Node-Instance Handlers + // + dx_container_delivery_handler_t rx_handler; + dx_container_delivery_handler_t tx_handler; + dx_container_delivery_handler_t disp_handler; + dx_container_link_handler_t incoming_handler; + dx_container_link_handler_t outgoing_handler; + dx_container_link_handler_t writable_handler; + dx_container_link_detach_handler_t link_detach_handler; + + // + // Node-Type Handlers + // + dx_container_node_handler_t node_created_handler; + dx_container_node_handler_t node_destroyed_handler; + dx_container_conn_handler_t inbound_conn_open_handler; + dx_container_conn_handler_t outbound_conn_open_handler; +} dx_node_type_t; + +void dx_container_initialize(void); +void dx_container_finalize(void); + +int dx_container_register_node_type(const dx_node_type_t *nt); + +void dx_container_set_default_node_type(const dx_node_type_t *nt, + void *node_context, + dx_dist_mode_t supported_dist); + +dx_node_t *dx_container_create_node(const dx_node_type_t *nt, + const char *name, + void *node_context, + dx_dist_mode_t supported_dist, + dx_lifetime_policy_t life_policy); +void dx_container_destroy_node(dx_node_t *node); + +void dx_container_node_set_context(dx_node_t *node, void *node_context); +dx_dist_mode_t dx_container_node_get_dist_modes(const dx_node_t *node); +dx_lifetime_policy_t dx_container_node_get_life_policy(const dx_node_t *node); + +dx_link_t *dx_link(dx_node_t *node, dx_connection_t *conn, dx_direction_t dir, const char *name); +void dx_link_set_context(dx_link_t *link, void *link_context); +void *dx_link_get_context(dx_link_t *link); +pn_link_t *dx_link_pn(dx_link_t *link); +pn_terminus_t *dx_link_source(dx_link_t *link); +pn_terminus_t *dx_link_target(dx_link_t *link); +pn_terminus_t *dx_link_remote_source(dx_link_t *link); +pn_terminus_t *dx_link_remote_target(dx_link_t *link); +void dx_link_activate(dx_link_t *link); +void dx_link_close(dx_link_t *link); + + +typedef struct dx_link_item_t dx_link_item_t; + +struct dx_link_item_t { + DEQ_LINKS(dx_link_item_t); + dx_link_t *link; +}; + +ALLOC_DECLARE(dx_link_item_t); +DEQ_DECLARE(dx_link_item_t, dx_link_list_t); + +#endif diff --git a/extras/dispatch/include/qpid/dispatch/ctools.h b/extras/dispatch/include/qpid/dispatch/ctools.h new file mode 100644 index 0000000000..33178a23ee --- /dev/null +++ b/extras/dispatch/include/qpid/dispatch/ctools.h @@ -0,0 +1,146 @@ +#ifndef __dispatch_ctools_h__ +#define __dispatch_ctools_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <stdlib.h> +#include <assert.h> + +#define CT_ASSERT(exp) { assert(exp); } + +#define NEW(t) (t*) malloc(sizeof(t)) +#define NEW_ARRAY(t,n) (t*) malloc(sizeof(t)*(n)) +#define NEW_PTR_ARRAY(t,n) (t**) malloc(sizeof(t*)*(n)) + +#define DEQ_DECLARE(i,d) typedef struct { \ + i *head; \ + i *tail; \ + i *scratch; \ + size_t size; \ + } d + +#define DEQ_LINKS(t) t *prev; t *next + +#define DEQ_INIT(d) do { (d).head = 0; (d).tail = 0; (d).scratch = 0; (d).size = 0; } while (0) +#define DEQ_ITEM_INIT(i) do { (i)->next = 0; (i)->prev = 0; } while(0) +#define DEQ_HEAD(d) ((d).head) +#define DEQ_TAIL(d) ((d).tail) +#define DEQ_SIZE(d) ((d).size) +#define DEQ_NEXT(i) (i)->next +#define DEQ_PREV(i) (i)->prev + +#define DEQ_INSERT_HEAD(d,i) \ +do { \ + CT_ASSERT((i)->next == 0); \ + CT_ASSERT((i)->prev == 0); \ + if ((d).head) { \ + (i)->next = (d).head; \ + (d).head->prev = i; \ + } else { \ + (d).tail = i; \ + (i)->next = 0; \ + CT_ASSERT((d).size == 0); \ + } \ + (i)->prev = 0; \ + (d).head = i; \ + (d).size++; \ +} while (0) + +#define DEQ_INSERT_TAIL(d,i) \ +do { \ + CT_ASSERT((i)->next == 0); \ + CT_ASSERT((i)->prev == 0); \ + if ((d).tail) { \ + (i)->prev = (d).tail; \ + (d).tail->next = i; \ + } else { \ + (d).head = i; \ + (i)->prev = 0; \ + CT_ASSERT((d).size == 0); \ + } \ + (i)->next = 0; \ + (d).tail = i; \ + (d).size++; \ +} while (0) + +#define DEQ_REMOVE_HEAD(d) \ +do { \ + CT_ASSERT((d).head); \ + if ((d).head) { \ + (d).scratch = (d).head; \ + (d).head = (d).head->next; \ + if ((d).head == 0) { \ + (d).tail = 0; \ + CT_ASSERT((d).size == 1); \ + } else \ + (d).head->prev = 0; \ + (d).size--; \ + (d).scratch->next = 0; \ + (d).scratch->prev = 0; \ + } \ +} while (0) + +#define DEQ_REMOVE_TAIL(d) \ +do { \ + CT_ASSERT((d).tail); \ + if ((d).tail) { \ + (d).scratch = (d).tail; \ + (d).tail = (d).tail->prev; \ + if ((d).tail == 0) { \ + (d).head = 0; \ + CT_ASSERT((d).size == 1); \ + } else \ + (d).tail->next = 0; \ + (d).size--; \ + (d).scratch->next = 0; \ + (d).scratch->prev = 0; \ + } \ +} while (0) + +#define DEQ_INSERT_AFTER(d,i,a) \ +do { \ + CT_ASSERT((i)->next == 0); \ + CT_ASSERT((i)->prev == 0); \ + if ((a)->next) \ + (a)->next->prev = (i); \ + else \ + (d).tail = (i); \ + (i)->next = (a)->next; \ + (i)->prev = (a); \ + (a)->next = (i); \ + (d).size++; \ +} while (0) + +#define DEQ_REMOVE(d,i) \ +do { \ + if ((i)->next) \ + (i)->next->prev = (i)->prev; \ + else \ + (d).tail = (i)->prev; \ + if ((i)->prev) \ + (i)->prev->next = (i)->next; \ + else \ + (d).head = (i)->next; \ + (d).size--; \ + (i)->next = 0; \ + (i)->prev = 0; \ + CT_ASSERT((d).size || (!(d).head && !(d).tail)); \ +} while (0) + +#endif diff --git a/extras/dispatch/include/qpid/dispatch/hash.h b/extras/dispatch/include/qpid/dispatch/hash.h new file mode 100644 index 0000000000..7f4a4bb950 --- /dev/null +++ b/extras/dispatch/include/qpid/dispatch/hash.h @@ -0,0 +1,37 @@ +#ifndef __dispatch_hash_h__ +#define __dispatch_hash_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <stdlib.h> +#include <qpid/dispatch/iterator.h> + +typedef struct hash_t hash_t; + +hash_t *hash(int bucket_exponent, int batch_size, int value_is_const); +void hash_free(hash_t *h); + +size_t hash_size(hash_t *h); +int hash_insert(hash_t *h, dx_field_iterator_t *key, void *val); +int hash_insert_const(hash_t *h, dx_field_iterator_t *key, const void *val); +int hash_retrieve(hash_t *h, dx_field_iterator_t *key, void **val); +int hash_retrieve_const(hash_t *h, dx_field_iterator_t *key, const void **val); +int hash_remove(hash_t *h, dx_field_iterator_t *key); + +#endif diff --git a/extras/dispatch/include/qpid/dispatch/iovec.h b/extras/dispatch/include/qpid/dispatch/iovec.h new file mode 100644 index 0000000000..5b56c638ff --- /dev/null +++ b/extras/dispatch/include/qpid/dispatch/iovec.h @@ -0,0 +1,32 @@ +#ifndef __dispatch_iovec_h__ +#define __dispatch_iovec_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <sys/uio.h> + +typedef struct dx_iovec_t dx_iovec_t; + +dx_iovec_t *dx_iovec(int vector_count); +void dx_iovec_free(dx_iovec_t *iov); +struct iovec *dx_iovec_array(dx_iovec_t *iov); +int dx_iovec_count(dx_iovec_t *iov); + + +#endif diff --git a/extras/dispatch/include/qpid/dispatch/iterator.h b/extras/dispatch/include/qpid/dispatch/iterator.h new file mode 100644 index 0000000000..9844286483 --- /dev/null +++ b/extras/dispatch/include/qpid/dispatch/iterator.h @@ -0,0 +1,113 @@ +#ifndef __dispatch_iterator_h__ +#define __dispatch_iterator_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/buffer.h> + +/** + * The field iterator is used to access fields within a buffer chain. + * It shields the user from the fact that the field may be split across + * one or more physical buffers. + */ +typedef struct dx_field_iterator_t dx_field_iterator_t; + +/** + * Iterator views allow the code traversing the field to see a transformed + * view of the raw field. + * + * ITER_VIEW_ALL - No transformation of the raw field data + * + * ITER_VIEW_NO_HOST - Remove the scheme and host fields from the view + * + * amqp://host.domain.com:port/node-id/node/specific + * ^^^^^^^^^^^^^^^^^^^^^ + * node-id/node/specific + * ^^^^^^^^^^^^^^^^^^^^^ + * + * ITER_VIEW_NODE_ID - Isolate the node identifier from an address + * + * amqp://host.domain.com:port/node-id/node/specific + * ^^^^^^^ + * node-id/node/specific + * ^^^^^^^ + * + * ITER_VIEW_NODE_SPECIFIC - Isolate node-specific text from an address + * + * amqp://host.domain.com:port/node-id/node/specific + * ^^^^^^^^^^^^^ + * node-id/node/specific + * ^^^^^^^^^^^^^ + */ +typedef enum { + ITER_VIEW_ALL, + ITER_VIEW_NO_HOST, + ITER_VIEW_NODE_ID, + ITER_VIEW_NODE_SPECIFIC +} dx_iterator_view_t; + +/** + * Create an iterator from a null-terminated string. + * + * The "text" string must stay intact for the whole life of the iterator. The iterator + * does not copy the string, it references it. + */ +dx_field_iterator_t* dx_field_iterator_string(const char *text, + dx_iterator_view_t view); + +/** + * Create an iterator from a field in a buffer chain + */ +dx_field_iterator_t *dx_field_iterator_buffer(dx_buffer_t *buffer, + int offset, + int length, + dx_iterator_view_t view); + +/** + * Free an iterator + */ +void dx_field_iterator_free(dx_field_iterator_t *iter); + +/** + * Reset the iterator to the first octet and set a new view + */ +void dx_field_iterator_reset(dx_field_iterator_t *iter, + dx_iterator_view_t view); + +/** + * Return the current octet in the iterator's view and step to the next. + */ +unsigned char dx_field_iterator_octet(dx_field_iterator_t *iter); + +/** + * Return true iff the iterator has no more octets in the view. + */ +int dx_field_iterator_end(dx_field_iterator_t *iter); + +/** + * Compare an input string to the iterator's view. Return true iff they are equal. + */ +int dx_field_iterator_equal(dx_field_iterator_t *iter, unsigned char *string); + +/** + * Return a copy of the iterator's view. + */ +unsigned char *dx_field_iterator_copy(dx_field_iterator_t *iter); + +#endif diff --git a/extras/dispatch/include/qpid/dispatch/log.h b/extras/dispatch/include/qpid/dispatch/log.h new file mode 100644 index 0000000000..cbea50f266 --- /dev/null +++ b/extras/dispatch/include/qpid/dispatch/log.h @@ -0,0 +1,31 @@ +#ifndef __dispatch_log_h__ +#define __dispatch_log_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#define LOG_NONE 0x00000000 +#define LOG_TRACE 0x00000001 +#define LOG_ERROR 0x00000002 +#define LOG_INFO 0x00000004 + +void dx_log(const char *module, int cls, const char *fmt, ...); + +void dx_log_set_mask(int mask); + +#endif diff --git a/extras/dispatch/include/qpid/dispatch/message.h b/extras/dispatch/include/qpid/dispatch/message.h new file mode 100644 index 0000000000..41983c44a1 --- /dev/null +++ b/extras/dispatch/include/qpid/dispatch/message.h @@ -0,0 +1,165 @@ +#ifndef __dispatch_message_h__ +#define __dispatch_message_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <proton/engine.h> +#include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/alloc.h> +#include <qpid/dispatch/iterator.h> +#include <qpid/dispatch/buffer.h> +#include <qpid/dispatch/iovec.h> + +// Callback for status change (confirmed persistent, loaded-in-memory, etc.) + +typedef struct dx_message_t dx_message_t; + +DEQ_DECLARE(dx_message_t, dx_message_list_t); + +struct dx_message_t { + DEQ_LINKS(dx_message_t); + // Private members not listed here. +}; + +typedef enum { + DX_DEPTH_NONE, + DX_DEPTH_HEADER, + DX_DEPTH_DELIVERY_ANNOTATIONS, + DX_DEPTH_MESSAGE_ANNOTATIONS, + DX_DEPTH_PROPERTIES, + DX_DEPTH_APPLICATION_PROPERTIES, + DX_DEPTH_BODY, + DX_DEPTH_ALL +} dx_message_depth_t; + + +typedef enum { + // + // Message Sections + // + DX_FIELD_HEADER, + DX_FIELD_DELIVERY_ANNOTATION, + DX_FIELD_MESSAGE_ANNOTATION, + DX_FIELD_PROPERTIES, + DX_FIELD_APPLICATION_PROPERTIES, + DX_FIELD_BODY, + DX_FIELD_FOOTER, + + // + // Fields of the Header Section + // + DX_FIELD_DURABLE, + DX_FIELD_PRIORITY, + DX_FIELD_TTL, + DX_FIELD_FIRST_ACQUIRER, + DX_FIELD_DELIVERY_COUNT, + + // + // Fields of the Properties Section + // + DX_FIELD_MESSAGE_ID, + DX_FIELD_USER_ID, + DX_FIELD_TO, + DX_FIELD_SUBJECT, + DX_FIELD_REPLY_TO, + DX_FIELD_CORRELATION_ID, + DX_FIELD_CONTENT_TYPE, + DX_FIELD_CONTENT_ENCODING, + DX_FIELD_ABSOLUTE_EXPIRY_TIME, + DX_FIELD_CREATION_TIME, + DX_FIELD_GROUP_ID, + DX_FIELD_GROUP_SEQUENCE, + DX_FIELD_REPLY_TO_GROUP_ID +} dx_message_field_t; + +// +// Functions for allocation +// +dx_message_t *dx_allocate_message(void); +void dx_free_message(dx_message_t *qm); +dx_message_t *dx_message_copy(dx_message_t *qm); +int dx_message_persistent(dx_message_t *qm); +int dx_message_in_memory(dx_message_t *qm); + +void dx_message_set_out_delivery(dx_message_t *msg, pn_delivery_t *delivery); +pn_delivery_t *dx_message_out_delivery(dx_message_t *msg); +void dx_message_set_in_delivery(dx_message_t *msg, pn_delivery_t *delivery); +pn_delivery_t *dx_message_in_delivery(dx_message_t *msg); + +// +// Functions for received messages +// +dx_message_t *dx_message_receive(pn_delivery_t *delivery); +void dx_message_send(dx_message_t *msg, pn_link_t *link); + +int dx_message_check(dx_message_t *msg, dx_message_depth_t depth); +dx_field_iterator_t *dx_message_field_iterator(dx_message_t *msg, dx_message_field_t field); +dx_iovec_t *dx_message_field_iovec(dx_message_t *msg, dx_message_field_t field); + +pn_delivery_t *dx_message_inbound_delivery(dx_message_t *qm); + +// +// Functions for composed messages +// + +// Convenience Functions +void dx_message_compose_1(dx_message_t *msg, const char *to, dx_buffer_list_t *buffers); +void dx_message_copy_header(dx_message_t *msg); // Copy received header into send-header (prior to adding annotations) +void dx_message_copy_message_annotations(dx_message_t *msg); + +// Raw Functions +void dx_message_begin_header(dx_message_t *msg); +void dx_message_end_header(dx_message_t *msg); + +void dx_message_begin_delivery_annotations(dx_message_t *msg); +void dx_message_end_delivery_annotations(dx_message_t *msg); + +void dx_message_begin_message_annotations(dx_message_t *msg); +void dx_message_end_message_annotations(dx_message_t *msg); + +void dx_message_begin_message_properties(dx_message_t *msg); +void dx_message_end_message_properties(dx_message_t *msg); + +void dx_message_begin_application_properties(dx_message_t *msg); +void dx_message_end_application_properties(dx_message_t *msg); + +void dx_message_append_body_data(dx_message_t *msg, dx_buffer_list_t *buffers); + +void dx_message_begin_body_sequence(dx_message_t *msg); +void dx_message_end_body_sequence(dx_message_t *msg); + +void dx_message_begin_footer(dx_message_t *msg); +void dx_message_end_footer(dx_message_t *msg); + +void dx_message_insert_null(dx_message_t *msg); +void dx_message_insert_boolean(dx_message_t *msg, int value); +void dx_message_insert_ubyte(dx_message_t *msg, uint8_t value); +void dx_message_insert_uint(dx_message_t *msg, uint32_t value); +void dx_message_insert_ulong(dx_message_t *msg, uint64_t value); +void dx_message_insert_binary(dx_message_t *msg, const uint8_t *start, size_t len); +void dx_message_insert_string(dx_message_t *msg, const char *start); +void dx_message_insert_uuid(dx_message_t *msg, const uint8_t *value); +void dx_message_insert_symbol(dx_message_t *msg, const char *start, size_t len); +void dx_message_insert_timestamp(dx_message_t *msg, uint64_t value); +void dx_message_begin_list(dx_message_t* msg); +void dx_message_end_list(dx_message_t* msg); +void dx_message_begin_map(dx_message_t* msg); +void dx_message_end_map(dx_message_t* msg); + +#endif diff --git a/extras/dispatch/include/qpid/dispatch/router.h b/extras/dispatch/include/qpid/dispatch/router.h new file mode 100644 index 0000000000..03f4aa15be --- /dev/null +++ b/extras/dispatch/include/qpid/dispatch/router.h @@ -0,0 +1,35 @@ +#ifndef __dispatch_router_h__ +#define __dispatch_router_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <proton/engine.h> +#include <qpid/dispatch/container.h> + +typedef struct dx_router_t dx_router_t; + +typedef struct { + size_t message_limit; + size_t memory_limit; +} dx_router_configuration_t; + +dx_router_t *dx_router(dx_router_configuration_t *config); +void dx_router_free(dx_router_t *router); + +#endif diff --git a/extras/dispatch/include/qpid/dispatch/server.h b/extras/dispatch/include/qpid/dispatch/server.h new file mode 100644 index 0000000000..635e1323dd --- /dev/null +++ b/extras/dispatch/include/qpid/dispatch/server.h @@ -0,0 +1,403 @@ +#ifndef __dispatch_server_h__ +#define __dispatch_server_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <proton/engine.h> + +/** + * \defgroup Control Server Control Functions + * @{ + */ + +/** + * \brief Thread Start Handler + * + * Callback invoked when a new server thread is started. The callback is + * invoked on the newly created thread. + * + * This handler can be used to set processor affinity or other thread-specific + * tuning values. + * + * @param context The handler context supplied in dx_server_initialize. + * @param thread_id The integer thread identifier that uniquely identifies this thread. + */ +typedef void (*dx_thread_start_cb_t)(void* context, int thread_id); + + +/** + * \brief Initialize the server module and prepare it for operation. + * + * @param thread_count The number of worker threads (1 or more) that the server shall create + */ +void dx_server_initialize(int thread_count); + + +/** + * \brief Finalize the server after it has stopped running. + */ +void dx_server_finalize(void); + + +/** + * \brief Set the optional thread-start handler. + * + * This handler is called once on each worker thread at the time + * the thread is started. This may be used to set tuning settings like processor affinity, etc. + * + * @param start_handler The thread-start handler invoked per thread on thread startup. + * @param context Opaque context to be passed back in the callback function. + */ +void dx_server_set_start_handler(dx_thread_start_cb_t start_handler, void *context); + + +/** + * \brief Run the server threads until completion. + * + * Start the operation of the server, including launching all of the worker threads. + * This function does not return until after the server has been stopped. The thread + * that calls dx_server_run is used as one of the worker threads. + */ +void dx_server_run(void); + + +/** + * \brief Stop the server + * + * Stop the server and join all of its worker threads. This function may be called from any + * thread. When this function returns, all of the other server threads have been closed and + * joined. The calling thread will be the only running thread in the process. + */ +void dx_server_stop(void); + + +/** + * \brief Pause (quiesce) the server. + * + * This call blocks until all of the worker threads (except + * the one calling the this function) are finished processing and have been blocked. When + * this call returns, the calling thread is the only thread running in the process. + */ +void dx_server_pause(void); + + +/** + * \brief Resume normal operation of a paused server. + * + * This call unblocks all of the worker threads + * so they can resume normal connection processing. + */ +void dx_server_resume(void); + + +/** + * @} + * \defgroup Signal Server Signal Handling Functions + * @{ + */ + + +/** + * \brief Signal Handler + * + * Callback for caught signals. This handler will only be invoked for signal numbers + * that were registered via dx_server_signal. The handler is not invoked in the context + * of the OS signal handler. Rather, it is invoked on one of the worker threads in an + * orderly sequence. + * + * @param context The handler context supplied in dx_server_initialize. + * @param signum The signal number that was raised. + */ +typedef void (*dx_signal_handler_cb_t)(void* context, int signum); + + +/** + * Set the signal handler for the server. The signal handler is invoked cleanly on a worker thread + * after the server process catches an operating-system signal. The signal handler is optional and + * need not be set. + * + * @param signal_handler The signal handler called when a registered signal is caught. + * @param context Opaque context to be passed back in the callback function. + */ +void dx_server_set_signal_handler(dx_signal_handler_cb_t signal_handler, void *context); + + +/** + * \brief Register a signal to be caught and handled by the signal handler. + * + * @param signum The signal number of a signal to be handled by the application. + */ +void dx_server_signal(int signum); + + +/** + * @} + * \defgroup Connection Server AMQP Connection Handling Functions + * @{ + */ + +/** + * \brief Listener objects represent the desire to accept incoming transport connections. + */ +typedef struct dx_listener_t dx_listener_t; + +/** + * \brief Connector objects represent the desire to create and maintain an outgoing transport connection. + */ +typedef struct dx_connector_t dx_connector_t; + +/** + * \brief Connection objects wrap Proton connection objects. + */ +typedef struct dx_connection_t dx_connection_t; + +/** + * Event type for the connection callback. + */ +typedef enum { + /// The connection just opened via a listener (inbound). + DX_CONN_EVENT_LISTENER_OPEN, + + /// The connection just opened via a connector (outbound). + DX_CONN_EVENT_CONNECTOR_OPEN, + + /// The connection was closed at the transport level (not cleanly). + DX_CONN_EVENT_CLOSE, + + /// The connection requires processing. + DX_CONN_EVENT_PROCESS +} dx_conn_event_t; + + +/** + * \brief Connection Event Handler + * + * Callback invoked when processing is needed on a proton connection. This callback + * shall be invoked on one of the server's worker threads. The server guarantees that + * no two threads shall be allowed to process a single connection concurrently. + * The implementation of this handler may assume that it has exclusive access to the + * connection and its subservient components (sessions, links, deliveries, etc.). + * + * @param context The handler context supplied in dx_server_{connect,listen}. + * @param event The event/reason for the invocation of the handler. + * @param conn The connection that requires processing by the handler. + * @return A value greater than zero if the handler did any proton processing for + * the connection. If no work was done, zero is returned. + */ +typedef int (*dx_conn_handler_cb_t)(void* context, dx_conn_event_t event, dx_connection_t *conn); + + +/** + * \brief Set the connection event handler callback. + * + * Set the connection handler callback for the server. This callback is mandatory and must be set + * prior to the invocation of dx_server_run. + * + * @param conn_hander The handler for processing connection-related events. + */ +void dx_server_set_conn_handler(dx_conn_handler_cb_t conn_handler); + + +/** + * \brief Set the user context for a connection. + * + * @param conn Connection object supplied in DX_CONN_EVENT_{LISTENER,CONNETOR}_OPEN + * @param context User context to be stored with the connection. + */ +void dx_connection_set_context(dx_connection_t *conn, void *context); + + +/** + * \brief Get the user context from a connection. + * + * @param conn Connection object supplied in DX_CONN_EVENT_{LISTENER,CONNETOR}_OPEN + * @return The user context stored with the connection. + */ +void *dx_connection_get_context(dx_connection_t *conn); + + +/** + * \brief Activate a connection for output. + * + * This function is used to request that the server activate the indicated connection. + * It is assumed that the connection is one that the caller does not have permission to + * access (i.e. it may be owned by another thread currently). An activated connection + * will, when writable, appear in the internal work list and be invoked for processing + * by a worker thread. + * + * @param conn The connection over which the application wishes to send data + */ +void dx_server_activate(dx_connection_t *conn); + + +/** + * \brief Get the wrapped proton-engine connection object. + * + * @param conn Connection object supplied in DX_CONN_EVENT_{LISTENER,CONNETOR}_OPEN + * @return The proton connection object. + */ +pn_connection_t *dx_connection_pn(dx_connection_t *conn); + + +/** + * \brief Configuration block for a connector or a listener. + */ +typedef struct dx_server_config_t { + /** + * Host name or network address to bind to a listener or use in the connector. + */ + char *host; + + /** + * Port name or number to bind to a listener or use in the connector. + */ + char *port; + + /** + * Space-separated list of SASL mechanisms to be accepted for the connection. + */ + char *sasl_mechanisms; + + /** + * If appropriate for the mechanism, the username for authentication + * (connector only) + */ + char *sasl_username; + + /** + * If appropriate for the mechanism, the password for authentication + * (connector only) + */ + char *sasl_password; + + /** + * If appropriate for the mechanism, the minimum acceptable security strength factor + */ + int sasl_minssf; + + /** + * If appropriate for the mechanism, the maximum acceptable security strength factor + */ + int sasl_maxssf; + + /** + * SSL is enabled for this connection iff non-zero. + */ + int ssl_enabled; + + /** + * Connection will take on the role of SSL server iff non-zero. + */ + int ssl_server; + + /** + * Iff non-zero AND ssl_enabled is non-zero, this listener will detect the client's use + * of SSL or non-SSL and conform to the client's protocol. + * (listener only) + */ + int ssl_allow_unsecured_client; + + /** + * Path to the file containing the PEM-formatted public certificate for the local end + * of the connection. + */ + char *ssl_certificate_file; + + /** + * Path to the file containing the PEM-formatted private key for the local end of the + * connection. + */ + char *ssl_private_key_file; + + /** + * The password used to sign the private key, or NULL if the key is not protected. + */ + char *ssl_password; + + /** + * Path to the file containing the PEM-formatted set of certificates of trusted CAs. + */ + char *ssl_trusted_certificate_db; + + /** + * Iff non-zero, require that the peer's certificate be supplied and that it be authentic + * according to the set of trusted CAs. + */ + int ssl_require_peer_authentication; + + /** + * Allow the connection to be redirected by the peer (via CLOSE->Redirect). This is + * meaningful for outgoing (connector) connections only. + */ + int allow_redirect; +} dx_server_config_t; + + +/** + * \brief Create a listener for incoming connections. + * + * @param config Pointer to a configuration block for this listener. This block will be + * referenced by the server, not copied. The referenced record must remain + * in-scope for the life of the listener. + * @param context User context passed back in the connection handler. + * @return A pointer to the new listener, or NULL in case of failure. + */ +dx_listener_t *dx_server_listen(const dx_server_config_t *config, void *context); + + +/** + * \brief Free the resources associated with a listener. + * + * @param li A listener pointer returned by dx_listen. + */ +void dx_listener_free(dx_listener_t* li); + + +/** + * \brief Close a listener so it will accept no more connections. + * + * @param li A listener pointer returned by dx_listen. + */ +void dx_listener_close(dx_listener_t* li); + + +/** + * \brief Create a connector for an outgoing connection. + * + * @param config Pointer to a configuration block for this connector. This block will be + * referenced by the server, not copied. The referenced record must remain + * in-scope for the life of the connector.. + * @param context User context passed back in the connection handler. + * @return A pointer to the new connector, or NULL in case of failure. + */ +dx_connector_t *dx_server_connect(const dx_server_config_t *config, void *context); + + +/** + * \brief Free the resources associated with a connector. + * + * @param ct A connector pointer returned by dx_connect. + */ +void dx_connector_free(dx_connector_t* ct); + +/** + * @} + */ + +#endif diff --git a/extras/dispatch/include/qpid/dispatch/threading.h b/extras/dispatch/include/qpid/dispatch/threading.h new file mode 100644 index 0000000000..f275fc0086 --- /dev/null +++ b/extras/dispatch/include/qpid/dispatch/threading.h @@ -0,0 +1,45 @@ +#ifndef __sys_threading_h__ +#define __sys_threading_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +typedef struct sys_mutex_t sys_mutex_t; + +sys_mutex_t *sys_mutex(void); +void sys_mutex_free(sys_mutex_t *mutex); +void sys_mutex_lock(sys_mutex_t *mutex); +void sys_mutex_unlock(sys_mutex_t *mutex); + + +typedef struct sys_cond_t sys_cond_t; + +sys_cond_t *sys_cond(void); +void sys_cond_free(sys_cond_t *cond); +void sys_cond_wait(sys_cond_t *cond, sys_mutex_t *held_mutex); +void sys_cond_signal(sys_cond_t *cond); +void sys_cond_signal_all(sys_cond_t *cond); + + +typedef struct sys_thread_t sys_thread_t; + +sys_thread_t *sys_thread(void *(*run_function) (void *), void *arg); +void sys_thread_free(sys_thread_t *thread); +void sys_thread_join(sys_thread_t *thread); + +#endif diff --git a/extras/dispatch/include/qpid/dispatch/timer.h b/extras/dispatch/include/qpid/dispatch/timer.h new file mode 100644 index 0000000000..af3a22e262 --- /dev/null +++ b/extras/dispatch/include/qpid/dispatch/timer.h @@ -0,0 +1,86 @@ +#ifndef __dispatch_timer_h__ +#define __dispatch_timer_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \defgroup Timer Server Timer Functions + * @{ + */ + +typedef struct dx_timer_t dx_timer_t; + +/** + * Timer Callback + * + * Callback invoked after a timer's interval expires and the timer fires. + * + * @param context The context supplied in dx_timer + */ +typedef void (*dx_timer_cb_t)(void* context); + + +/** + * Create a new timer object. + * + * @param cb The callback function to be invoked when the timer expires. + * @param context An opaque, user-supplied context to be passed into the callback. + * @return A pointer to the new timer object or NULL if memory is exhausted. + */ +dx_timer_t *dx_timer(dx_timer_cb_t cb, void* context); + + +/** + * Free the resources for a timer object. If the timer was scheduled, it will be canceled + * prior to freeing. After this function returns, the callback will not be invoked for this + * timer. + * + * @param timer Pointer to the timer object returned by dx_timer. + */ +void dx_timer_free(dx_timer_t *timer); + + +/** + * Schedule a timer to fire in the future. + * + * Note that the timer callback will never be invoked synchronously during the execution + * of dx_timer_schedule. Even if the interval is immediate (0), the callback invocation will + * be asynchronous and after the return of this function. + * + * @param timer Pointer to the timer object returned by dx_timer. + * @param msec The minimum number of milliseconds of delay until the timer fires. + * If 0 is supplied, the timer will fire immediately. + */ +void dx_timer_schedule(dx_timer_t *timer, long msec); + + +/** + * Attempt to cancel a scheduled timer. Since the timer callback can be invoked on any + * server thread, it is always possible that a last-second cancel attempt may arrive too late + * to stop the timer from firing (i.e. the cancel is concurrent with the fire callback). + * + * @param timer Pointer to the timer object returned by dx_timer. + */ +void dx_timer_cancel(dx_timer_t *timer); + +/** + * @} + */ + +#endif diff --git a/extras/dispatch/include/qpid/dispatch/user_fd.h b/extras/dispatch/include/qpid/dispatch/user_fd.h new file mode 100644 index 0000000000..3e5584ce2e --- /dev/null +++ b/extras/dispatch/include/qpid/dispatch/user_fd.h @@ -0,0 +1,121 @@ +#ifndef __dispatch_user_fd_h__ +#define __dispatch_user_fd_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +/** + * \defgroup UserFd Server User-File-Descriptor Functions + * @{ + */ + +typedef struct dx_user_fd_t dx_user_fd_t; + + +/** + * User_fd Handler + * + * Callback invoked when a user-managed file descriptor is available for reading or writing or there + * was an error on the file descriptor. + * + * @param context The handler context supplied in the dx_user_fd call. + * @param ufd The user_fd handle for the processable fd. + */ +typedef void (*dx_user_fd_handler_cb_t)(void* context, dx_user_fd_t *ufd); + + +/** + * Set the user-fd handler callback for the server. This handler is optional, but must be supplied + * if the dx_server is used to manage the activation of user file descriptors. + */ +void dx_server_set_user_fd_handler(dx_user_fd_handler_cb_t ufd_handler); + + +/** + * Create a tracker for a user-managed file descriptor. + * + * A user-fd is appropriate for use when the application opens and manages file descriptors + * for purposes other than AMQP communication. Registering a user fd with the dispatch server + * controls processing of the FD alongside the FDs used for messaging. + * + * @param fd The open file descriptor being managed by the application. + * @param context User context passed back in the connection handler. + * @return A pointer to the new user_fd. + */ +dx_user_fd_t *dx_user_fd(int fd, void *context); + + +/** + * Free the resources for a user-managed FD tracker. + * + * @param ufd Structure pointer returned by dx_user_fd. + */ +void dx_user_fd_free(dx_user_fd_t *ufd); + + +/** + * Activate a user-fd for read. + * + * Use this activation when the application has capacity to receive data from the user-fd. This will + * cause the callback set in dx_server_set_user_fd_handler to later be invoked when the + * file descriptor has data to read. + * + * @param ufd Structure pointer returned by dx_user_fd. + */ +void dx_user_fd_activate_read(dx_user_fd_t *ufd); + + +/** + * Activate a user-fd for write. + * + * Use this activation when the application has data to write via the user-fd. This will + * cause the callback set in dx_server_set_user_fd_handler to later be invoked when the + * file descriptor is writable. + * + * @param ufd Structure pointer returned by dx_user_fd. + */ +void dx_user_fd_activate_write(dx_user_fd_t *ufd); + + +/** + * Check readable status of a user-fd + * + * Note: It is possible that readable status is spurious (i.e. this function returns true + * but the file-descriptor is not readable and will block if not set to O_NONBLOCK). + * Code accordingly. + * + * @param ufd Structure pointer returned by dx_user_fd. + * @return true iff the user file descriptor is readable. + */ +bool dx_user_fd_is_readable(dx_user_fd_t *ufd); + + +/** + * Check writable status of a user-fd + * + * @param ufd Structure pointer returned by dx_user_fd. + * @return true iff the user file descriptor is writable. + */ +bool dx_user_fd_is_writeable(dx_user_fd_t *ufd); + +/** + * @} + */ + +#endif diff --git a/extras/dispatch/router/CMakeLists.txt b/extras/dispatch/router/CMakeLists.txt new file mode 100644 index 0000000000..efb424ee13 --- /dev/null +++ b/extras/dispatch/router/CMakeLists.txt @@ -0,0 +1,31 @@ +## +## Licensed to the Apache Software Foundation (ASF) under one +## or more contributor license agreements. See the NOTICE file +## distributed with this work for additional information +## regarding copyright ownership. The ASF licenses this file +## to you under the Apache License, Version 2.0 (the +## "License"); you may not use this file except in compliance +## with the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, +## software distributed under the License is distributed on an +## "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +## KIND, either express or implied. See the License for the +## specific language governing permissions and limitations +## under the License. +## + +## +## Build the router application +## +set(router_SOURCES + src/main.c + ) + +add_executable(dispatch-router ${router_SOURCES}) +target_link_libraries(dispatch-router qpid-dispatch ${proton_lib}) + +install(TARGETS dispatch-router RUNTIME DESTINATION bin) + diff --git a/extras/dispatch/router/src/main.c b/extras/dispatch/router/src/main.c new file mode 100644 index 0000000000..0cafa6a2ca --- /dev/null +++ b/extras/dispatch/router/src/main.c @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <stdio.h> +#include <proton/driver.h> +#include <qpid/dispatch/server.h> +#include <qpid/dispatch/container.h> +#include <qpid/dispatch/timer.h> +#include <qpid/dispatch/log.h> +#include <qpid/dispatch/router.h> +#include <signal.h> +#include <sys/types.h> +#include <unistd.h> + +static int exit_with_sigint = 0; + +static void thread_start_handler(void* context, int thread_id) +{ +} + + +static void signal_handler(void* context, int signum) +{ + dx_server_pause(); + + switch (signum) { + case SIGINT: + exit_with_sigint = 1; + + case SIGQUIT: + case SIGTERM: + fflush(stdout); + dx_server_stop(); + break; + + case SIGHUP: + break; + + default: + break; + } + + dx_server_resume(); +} + + +static void startup(void *context) +{ + // TODO - Move this into a configuration framework + + dx_server_pause(); + + static dx_server_config_t server_config; + server_config.host = "0.0.0.0"; + server_config.port = "5672"; + server_config.sasl_mechanisms = "ANONYMOUS"; + server_config.ssl_enabled = 0; + + dx_server_listen(&server_config, 0); + + /* + static dx_server_config_t client_config; + client_config.host = "0.0.0.0"; + client_config.port = "10000"; + client_config.sasl_mechanisms = "ANONYMOUS"; + client_config.ssl_enabled = 0; + + dx_server_connect(&client_config, 0); + */ + + dx_server_resume(); +} + + +int main(int argc, char **argv) +{ + dx_log_set_mask(LOG_INFO | LOG_TRACE | LOG_ERROR); + + dx_server_initialize(4); + dx_container_initialize(); + + dx_server_set_signal_handler(signal_handler, 0); + dx_server_set_start_handler(thread_start_handler, 0); + + dx_router_t *router = dx_router(0); + + dx_timer_t *startup_timer = dx_timer(startup, 0); + dx_timer_schedule(startup_timer, 0); + + dx_server_signal(SIGHUP); + dx_server_signal(SIGQUIT); + dx_server_signal(SIGTERM); + dx_server_signal(SIGINT); + + dx_server_run(); + dx_router_free(router); + dx_server_finalize(); + + if (exit_with_sigint) { + signal(SIGINT, SIG_DFL); + kill(getpid(), SIGINT); + } + + return 0; +} + diff --git a/extras/dispatch/site/css/style.css b/extras/dispatch/site/css/style.css new file mode 100644 index 0000000000..b73c136d4a --- /dev/null +++ b/extras/dispatch/site/css/style.css @@ -0,0 +1,280 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +ul { + list-style-type:square; +} + +th { + text-align: left; + font-weight: bold; +} + +body { + margin:0; + background:#FFFFFF; + font-family:"Verdana", sans-serif; +} + +.container { + width:950px; + margin:0 auto; +} + +.header { + height:100px; + width:950px; + background:url(images/header.png) +} + +.logo { + text-align:center; + font-weight:600; + padding:0 0 0 0; + font-size:14px; + font-family:"Verdana", cursive; +} + +.logo a { + color:#000000; + text-decoration:none; +} + +.main_text_area { + margin-left:200px; +} + +.main_text_area_top { + height:14px; + font-size:1px; +} + +.main_text_area_bottom { + display:none; +/* height:14px; + margin-bottom:4px;*/ +} + +.main_text_area_body { + padding:5px 24px; +} + +.main_text_area_body p { + text-align:justify; +} + +.main_text_area br { + line-height:10px; +} + +.main_text_area h1 { + font-size:28px; + font-weight:600; + margin:0 0 24px 0; + color:#0c3b82; + font-family:"Verdana", Times, serif; +} + +.main_text_area h2 { + font-size:24px; + font-weight:600; + margin:24px 0 8px 0; + color:#0c3b82; + font-family:"Verdana",Times, serif; +} + +.main_text_area ol, .main_text_area ul { + padding:0; + margin:10px 0; + margin-left:20px; +} + +.main_text_area li { +/* margin-left:40px; */ +} + +.main_text_area, .menu_box { + font-size:13px; + line-height:17px; + color:#000000; +} + +.main_text_area { + font-size:15px; +} + +.main_text_area a { + color:#000000; +} + +.main_text_area a:hover { + color:#000000; +} + +.menu_box { + width:196px; + float:left; + margin-left:4px; +} + +.menu_box_top { + background:url(images/menu_top.png) no-repeat; + height:14px; + font-size:1px; +} + +.menu_box_body { + background:url(images/menu_body.png) repeat-y; + padding:5px 24px 5px 24px; +} + +.menu_box_bottom { + background:url(images/menu_bottom.png) no-repeat; + height:14px; + font-size:1px; + margin-bottom:1px; +} + +.menu_box h3 { + font-size:20px; + font-weight:500; + margin:0 0 8px 0; + color:#0c3b82; + font-family:"Verdana",Times, serif; +} + +.menu_box ul { + margin:12px; + padding:0px; +} + +.menu_box li { + list-style:square; +} + +.menu_box a { + color:#000000; + text-decoration:none; +} + +.menu_box a:hover { + color:#000000; + text-decoration:underline; +} + +.feature_box { + width:698px; + overflow:hidden; +} + +.feature_box h3 { + font-size:18px; + font-weight:600; + margin:0 0 8px 0; + color:#0c3b82; + font-family:"Verdana", Times, serif; +} + +.feature_box_column1 { + width:196px; + float:left; + padding:10px 15px 10px 15px; + margin-left:0px; +} + +.feature_box_column2 { + width:196px; + float:left; + padding:10px 15px 10px 15px; + margin-left:0px; +} + +.feature_box_column3 { + width:196px; + float:left; + padding:10px 15px 10px 15px; + margin-left:0px; +} + + +.feature_box ul { + margin:.8em .4em; + padding-left:1.2em; + padding:0; + list-style-type: square; +} + +.feature_box ul li { + font-family:"Verdana",sans-serif; + font-size:14px; + color:#000; + margin:.4em 0; +} + +.feature_box ul li ul { + padding-left:1.2em; + margin-left:2em; +} + +.feature_box a { + color:#000000; + text-decoration:none; +} + +.feature_box a:hover { + color:#000000; + text-decoration:underline; +} + +.footer { + color:#000000; + clear:both; + text-align:center; + font-size:11px; + line-height:17px; + height:45px; + padding-top:18px; +} + +.footer a { + color:#000000; +} + +.footer a:hover { + color:#000000; +} + +.download_table { + width:100%; +} + +.download_table_col_1 { + width:240px; +} + +.proton_download_table_col_1 { + width:420px; +} + +.download_table_amqp_col { + text-align:center; + width:80px; +} + diff --git a/extras/dispatch/site/images/arch.dia b/extras/dispatch/site/images/arch.dia Binary files differnew file mode 100644 index 0000000000..99b3185447 --- /dev/null +++ b/extras/dispatch/site/images/arch.dia diff --git a/extras/dispatch/site/images/arch.png b/extras/dispatch/site/images/arch.png Binary files differnew file mode 100644 index 0000000000..a2b7f776b9 --- /dev/null +++ b/extras/dispatch/site/images/arch.png diff --git a/extras/dispatch/site/includes/footer.include b/extras/dispatch/site/includes/footer.include new file mode 100644 index 0000000000..35ff04b9f2 --- /dev/null +++ b/extras/dispatch/site/includes/footer.include @@ -0,0 +1,7 @@ + <div class="footer"> + <p> + © 2004-2012 The Apache Software Foundation.<br /> + Apache Qpid, Qpid, Apache, the Apache feather logo, and the Apache Qpid project logo are trademarks of The Apache Software Foundation.<br /> + All other marks mentioned may be trademarks or registered trademarks of their respective owners. + </p> + </div> diff --git a/extras/dispatch/site/includes/header.include b/extras/dispatch/site/includes/header.include new file mode 100644 index 0000000000..244dfc4517 --- /dev/null +++ b/extras/dispatch/site/includes/header.include @@ -0,0 +1,6 @@ + <div class="header"> + <div class="logo"> + <h1>Apache Qpid™</h1> + <h2>Open Source AMQP Messaging</h2> + </div> + </div> diff --git a/extras/dispatch/site/includes/menu.include b/extras/dispatch/site/includes/menu.include new file mode 100644 index 0000000000..7cbdbd139d --- /dev/null +++ b/extras/dispatch/site/includes/menu.include @@ -0,0 +1,68 @@ + <div class="menu_box"> + <div class="menu_box_top"></div> + <div class="menu_box_body"> + <h3>Apache Qpid Dispatch</h3> + <ul> + <li><a href="index.html">Back to Qpid</a></li> + <li><a href="index.html">Home</a></li> + <li><a href="download.html">Download</a></li> + <li><a href="getting_started.html">Getting Started</a></li> + <li><a href="http://www.apache.org/licenses/">License</a></li> + <li><a href="https://cwiki.apache.org/qpid/faq.html">FAQ</a></li> + </ul> + </div> + <div class="menu_box_bottom"></div> + + <div class="menu_box_top"></div> + <div class="menu_box_body"> + <h3>Documentation</h3> + <ul> + <li><a href="documentation.html#doc-release">Latest Release</a></li> + <li><a href="documentation.html#doc-trunk">Trunk</a></li> + <li><a href="documentation.html#doc-archives">Archive</a></li> + </ul> + </div> + <div class="menu_box_bottom"></div> + + <div class="menu_box_top"></div> + <div class="menu_box_body"> + <h3>Community</h3> + <ul> + <li><a href="getting_involved.html">Getting Involved</a></li> + <li><a href="source_repository.html">Source Repository</a></li> + <li><a href="https://issues.apache.org/jira/browse/qpid">Issue Reporting</a></li> + </ul> + </div> + <div class="menu_box_bottom"></div> + + <div class="menu_box_top"></div> + <div class="menu_box_body"> + <h3>Developers</h3> + <ul> + <li><a href="https://cwiki.apache.org/qpid/building.html">Building Qpid</a></li> + <li><a href="https://cwiki.apache.org/qpid/developer-pages.html">Developer Pages</a></li> + </ul> + </div> + <div class="menu_box_bottom"></div> + + <div class="menu_box_top"></div> + <div class="menu_box_body"> + <h3>About AMQP</h3> + <ul> + <li><a href="amqp.html">What is AMQP?</a></li> + </ul> + </div> + <div class="menu_box_bottom"></div> + + <div class="menu_box_top"></div> + <div class="menu_box_body"> + <h3>About Apache</h3> + <ul> + <li><a href="http://www.apache.org">Home</a></li> + <li><a href="http://www.apache.org/foundation/sponsorship.html">Sponsorship</a></li> + <li><a href="http://www.apache.org/foundation/thanks.html">Thanks</a></li> + <li><a href="http://www.apache.org/security/">Security</a></li> + </ul> + </div> + <div class="menu_box_bottom"></div> + </div> diff --git a/extras/dispatch/site/index.html b/extras/dispatch/site/index.html new file mode 100755 index 0000000000..d8f1759492 --- /dev/null +++ b/extras/dispatch/site/index.html @@ -0,0 +1,101 @@ +<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> +<!-- + - + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + - +--> +<html xmlns="http://www.w3.org/1999/xhtml"> + <head> + <meta http-equiv="Content-Type" content="text/html; charset=utf-8" /> + <title>Apache Qpid Dispatch™: A Platform for Building AMQP Infrastructure</title> + <link href="css/style.css" rel="stylesheet" type="text/css"/> + </head> + + <body> + <div class="container"> + <!-- begin header --> + + <div class="header"> + <div class="logo"> + <h1>Apache Qpid Dispatch™</h1> + <h2>A Platform for Building AMQP Infrastructure</h2> + </div> + </div> + + <!-- end header --> + + <!-- begin menu --> + <!--#include virtual="includes/menu.include" --> + <!-- end menu --> + + <!-- begin content --> + <div class="main_text_area"> + <div class="main_text_area_top"></div> + + <div class="main_text_area_body"> + +<p>Qpid Dispatch is a library to help developers build infrastructure +components for AMQP. Dispatch is not a general-purpose Messaging API. +Rather, it is a foundation on which to build applications, services, and +appliances that need direct access to the detailed constructs of AMQP.</p> +<hr width="80%" /> +<h2>Overview</h2> +<p>Dispatch is an extension of the Engine and Driver interfaces of +<a href="http://qpid.apache.org/proton">Qpid Proton</a>. It neither +uses nor exposes the Messenger interface of Proton. Rather, it +provides a way for developers to use Proton's more detailed Engine +facility. The following features are provided:</p> + +<ul> + <li>An asynchronous, event-oriented application environment</li> + <li>Safe multi-threaded use of Proton</li> + <li>Operating System Signal handling</li> + <li>Quiesce and Resume for the application's threads</li> + <li>Timers</li> + <li>Resilient outbound connections (retry/reconnect)</li> + <li>Polling support for the application's non-AMQP file descriptors</li> + <li>An AMQP Node Container that allows the developer to create + custom node types</li> +</ul> +<p /> +<hr width="80%" /> +<h2>Architecture</h2> +<center><img src="images/arch.png" /></center> +<ul> + <li><b>Proton Engine and Driver</b> provide the underlying AMQP capability</li> + <li><a href="doxygen/server/modules.html">Dispatch Server</a> + wraps Proton connections in a multi-threaded server environment</li> + <li><b>Dispatch Container</b> provides management of AMQP nodes (links, termini, and deliveries)</li> + <li><b>Dispatch Message</b> provides efficient message encode/decode, optimized for messaging intermediaries</li> + <li>The <b>Application</b> uses all of the above services to implement scalable and performant AMQP infrastructure</li> +</ul> +<hr width="80%" /> + + </div> + + <div class="main_text_area_bottom"></div> + </div> + <!-- end content --> + + <!-- begin footer --> + <!--#include virtual="includes/footer.include" --> + <!-- end footer --> + + </div> + </body> +</html> diff --git a/extras/dispatch/src/agent.c b/extras/dispatch/src/agent.c new file mode 100644 index 0000000000..a885042b45 --- /dev/null +++ b/extras/dispatch/src/agent.c @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/agent.h> +#include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/hash.h> +#include <qpid/dispatch/container.h> +#include <qpid/dispatch/message.h> +#include <qpid/dispatch/threading.h> +#include <qpid/dispatch/timer.h> +#include <string.h> + + +typedef struct dx_agent_t { + hash_t *class_hash; + dx_message_list_t in_fifo; + dx_message_list_t out_fifo; + sys_mutex_t *lock; + dx_timer_t *timer; +} dx_agent_t; + +static dx_agent_t *agent = 0; + + +struct dx_agent_class_t { + char *fqname; + void *context; + dx_agent_schema_cb_t schema_handler; + dx_agent_query_cb_t query_handler; // 0 iff class is an event. +}; + + +static void dx_agent_timer_handler(void *context) +{ + // TODO - Process the in_fifo here +} + + +void dx_agent_initialize() +{ + assert(!agent); + agent = NEW(dx_agent_t); + agent->class_hash = hash(6, 10, 1); + DEQ_INIT(agent->in_fifo); + DEQ_INIT(agent->out_fifo); + agent->lock = sys_mutex(); + agent->timer = dx_timer(dx_agent_timer_handler, agent); +} + + +void dx_agent_finalize(void) +{ + sys_mutex_free(agent->lock); + dx_timer_free(agent->timer); + hash_free(agent->class_hash); + free(agent); + agent = 0; +} + + +dx_agent_class_t *dx_agent_register_class(const char *fqname, + void *context, + dx_agent_schema_cb_t schema_handler, + dx_agent_query_cb_t query_handler) +{ + dx_agent_class_t *cls = NEW(dx_agent_class_t); + assert(cls); + cls->fqname = (char*) malloc(strlen(fqname) + 1); + strcpy(cls->fqname, fqname); + cls->context = context; + cls->schema_handler = schema_handler; + cls->query_handler = query_handler; + + dx_field_iterator_t *iter = dx_field_iterator_string(fqname, ITER_VIEW_ALL); + int result = hash_insert_const(agent->class_hash, iter, cls); + dx_field_iterator_free(iter); + assert(result >= 0); + + return cls; +} + + +dx_agent_class_t *dx_agent_register_event(const char *fqname, + void *context, + dx_agent_schema_cb_t schema_handler) +{ + return dx_agent_register_class(fqname, context, schema_handler, 0); +} + + +void dx_agent_value_string(const void *correlator, const char *key, const char *value) +{ +} + + +void dx_agent_value_uint(const void *correlator, const char *key, uint64_t value) +{ +} + + +void dx_agent_value_null(const void *correlator, const char *key) +{ +} + + +void dx_agent_value_boolean(const void *correlator, const char *key, bool value) +{ +} + + +void dx_agent_value_binary(const void *correlator, const char *key, const uint8_t *value, size_t len) +{ +} + + +void dx_agent_value_uuid(const void *correlator, const char *key, const uint8_t *value) +{ +} + + +void dx_agent_value_timestamp(const void *correlator, const char *key, uint64_t value) +{ +} + + +void dx_agent_value_complete(const void *correlator, bool more) +{ +} + + +void *dx_agent_raise_event(dx_agent_class_t *event) +{ + return 0; +} + diff --git a/extras/dispatch/src/alloc.c b/extras/dispatch/src/alloc.c new file mode 100644 index 0000000000..2b3b953aad --- /dev/null +++ b/extras/dispatch/src/alloc.c @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/alloc.h> +#include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/log.h> +#include <memory.h> +#include <stdio.h> + +typedef struct item_t item_t; + +struct item_t { + DEQ_LINKS(item_t); + dx_alloc_type_desc_t *desc; +}; + +DEQ_DECLARE(item_t, item_list_t); + +struct dx_alloc_pool_t { + item_list_t free_list; +}; + +dx_alloc_config_t dx_alloc_default_config_big = {16, 32, 0}; +dx_alloc_config_t dx_alloc_default_config_small = {64, 128, 0}; + +sys_mutex_t *init_lock; +item_list_t type_list; + +static void dx_alloc_init(dx_alloc_type_desc_t *desc) +{ + sys_mutex_lock(init_lock); + + desc->total_size = desc->type_size; + if (desc->additional_size) + desc->total_size += *desc->additional_size; + + dx_log("ALLOC", LOG_TRACE, "Initialized Allocator - type=%s type-size=%d total-size=%d", + desc->type_name, desc->type_size, desc->total_size); + + if (!desc->global_pool) { + if (desc->config == 0) + desc->config = desc->total_size > 256 ? + &dx_alloc_default_config_big : &dx_alloc_default_config_small; + + assert (desc->config->local_free_list_max >= desc->config->transfer_batch_size); + + desc->global_pool = NEW(dx_alloc_pool_t); + DEQ_INIT(desc->global_pool->free_list); + desc->lock = sys_mutex(); + desc->stats = NEW(dx_alloc_stats_t); + memset(desc->stats, 0, sizeof(dx_alloc_stats_t)); + } + + item_t *type_item = NEW(item_t); + DEQ_ITEM_INIT(type_item); + type_item->desc = desc; + DEQ_INSERT_TAIL(type_list, type_item); + + sys_mutex_unlock(init_lock); +} + + +void *dx_alloc(dx_alloc_type_desc_t *desc, dx_alloc_pool_t **tpool) +{ + int idx; + + // + // If the descriptor is not initialized, set it up now. + // + if (!desc->global_pool) + dx_alloc_init(desc); + + // + // If this is the thread's first pass through here, allocate the + // thread-local pool for this type. + // + if (*tpool == 0) { + *tpool = NEW(dx_alloc_pool_t); + DEQ_INIT((*tpool)->free_list); + } + + dx_alloc_pool_t *pool = *tpool; + + // + // Fast case: If there's an item on the local free list, take it off the + // list and return it. Since everything we've touched is thread-local, + // there is no need to acquire a lock. + // + item_t *item = DEQ_HEAD(pool->free_list); + if (item) { + DEQ_REMOVE_HEAD(pool->free_list); + return &item[1]; + } + + // + // The local free list is empty, we need to either rebalance a batch + // of items from the global list or go to the heap to get new memory. + // + sys_mutex_lock(desc->lock); + if (DEQ_SIZE(desc->global_pool->free_list) >= desc->config->transfer_batch_size) { + // + // Rebalance a full batch from the global free list to the thread list. + // + desc->stats->batches_rebalanced_to_threads++; + desc->stats->held_by_threads += desc->config->transfer_batch_size; + for (idx = 0; idx < desc->config->transfer_batch_size; idx++) { + item = DEQ_HEAD(desc->global_pool->free_list); + DEQ_REMOVE_HEAD(desc->global_pool->free_list); + DEQ_INSERT_TAIL(pool->free_list, item); + } + } else { + // + // Allocate a full batch from the heap and put it on the thread list. + // + for (idx = 0; idx < desc->config->transfer_batch_size; idx++) { + item = (item_t*) malloc(sizeof(item_t) + desc->total_size); + if (item == 0) + break; + DEQ_ITEM_INIT(item); + item->desc = desc; + DEQ_INSERT_TAIL(pool->free_list, item); + desc->stats->held_by_threads++; + desc->stats->total_alloc_from_heap++; + } + } + sys_mutex_unlock(desc->lock); + + item = DEQ_HEAD(pool->free_list); + if (item) { + DEQ_REMOVE_HEAD(pool->free_list); + return &item[1]; + } + + return 0; +} + + +void dx_dealloc(dx_alloc_type_desc_t *desc, dx_alloc_pool_t **tpool, void *p) +{ + item_t *item = ((item_t*) p) - 1; + int idx; + + // + // If this is the thread's first pass through here, allocate the + // thread-local pool for this type. + // + if (*tpool == 0) { + *tpool = NEW(dx_alloc_pool_t); + DEQ_INIT((*tpool)->free_list); + } + + dx_alloc_pool_t *pool = *tpool; + + DEQ_INSERT_TAIL(pool->free_list, item); + + if (DEQ_SIZE(pool->free_list) <= desc->config->local_free_list_max) + return; + + // + // We've exceeded the maximum size of the local free list. A batch must be + // rebalanced back to the global list. + // + sys_mutex_lock(desc->lock); + desc->stats->batches_rebalanced_to_global++; + desc->stats->held_by_threads -= desc->config->transfer_batch_size; + for (idx = 0; idx < desc->config->transfer_batch_size; idx++) { + item = DEQ_HEAD(pool->free_list); + DEQ_REMOVE_HEAD(pool->free_list); + DEQ_INSERT_TAIL(desc->global_pool->free_list, item); + } + + // + // If there's a global_free_list size limit, remove items until the limit is + // not exceeded. + // + if (desc->config->global_free_list_max != 0) { + while (DEQ_SIZE(desc->global_pool->free_list) > desc->config->global_free_list_max) { + item = DEQ_HEAD(desc->global_pool->free_list); + DEQ_REMOVE_HEAD(desc->global_pool->free_list); + free(item); + desc->stats->total_free_to_heap++; + } + } + + sys_mutex_unlock(desc->lock); +} + + +void dx_alloc_initialize(void) +{ + init_lock = sys_mutex(); + DEQ_INIT(type_list); +} + diff --git a/extras/dispatch/src/alloc_private.h b/extras/dispatch/src/alloc_private.h new file mode 100644 index 0000000000..fbb18ccd48 --- /dev/null +++ b/extras/dispatch/src/alloc_private.h @@ -0,0 +1,26 @@ +#ifndef __dispatch_alloc_private_h__ +#define __dispatch_alloc_private_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/alloc.h> + +void dx_alloc_initialize(void); + +#endif diff --git a/extras/dispatch/src/auth.c b/extras/dispatch/src/auth.c new file mode 100644 index 0000000000..f0df58f6c2 --- /dev/null +++ b/extras/dispatch/src/auth.c @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <stdio.h> +#include <string.h> +#include "auth.h" +#include "server_private.h" +#include <proton/sasl.h> + + +void auth_client_handler(pn_connector_t *cxtr) +{ + pn_sasl_t *sasl = pn_connector_sasl(cxtr); + pn_sasl_state_t state = pn_sasl_state(sasl); + dx_connection_t *ctx = (dx_connection_t*) pn_connector_context(cxtr); + + if (state == PN_SASL_CONF) { + pn_sasl_mechanisms(sasl, "ANONYMOUS"); + pn_sasl_client(sasl); + } + + state = pn_sasl_state(sasl); + + if (state == PN_SASL_PASS) { + ctx->state = CONN_STATE_OPENING; + } else if (state == PN_SASL_FAIL) { + ctx->state = CONN_STATE_FAILED; + } +} + + +void auth_server_handler(pn_connector_t *cxtr) +{ + pn_sasl_t *sasl = pn_connector_sasl(cxtr); + pn_sasl_state_t state = pn_sasl_state(sasl); + dx_connection_t *ctx = (dx_connection_t*) pn_connector_context(cxtr); + + while (state == PN_SASL_CONF || state == PN_SASL_STEP) { + if (state == PN_SASL_CONF) { + pn_sasl_mechanisms(sasl, "ANONYMOUS"); + pn_sasl_server(sasl); + } else if (state == PN_SASL_STEP) { + const char* mechanisms = pn_sasl_remote_mechanisms(sasl); + if (strcmp(mechanisms, "ANONYMOUS") == 0) + pn_sasl_done(sasl, PN_SASL_OK); + else + pn_sasl_done(sasl, PN_SASL_AUTH); + } + state = pn_sasl_state(sasl); + } + + if (state == PN_SASL_PASS) { + ctx->state = CONN_STATE_OPENING; + } else if (state == PN_SASL_FAIL) { + ctx->state = CONN_STATE_FAILED; + } +} + + diff --git a/extras/dispatch/src/auth.h b/extras/dispatch/src/auth.h new file mode 100644 index 0000000000..c551c8ff76 --- /dev/null +++ b/extras/dispatch/src/auth.h @@ -0,0 +1,27 @@ +#ifndef __auth_h__ +#define __auth_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <proton/driver.h> + +void auth_client_handler(pn_connector_t *conn); +void auth_server_handler(pn_connector_t *conn); + +#endif diff --git a/extras/dispatch/src/buffer.c b/extras/dispatch/src/buffer.c new file mode 100644 index 0000000000..015711afd9 --- /dev/null +++ b/extras/dispatch/src/buffer.c @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/buffer.h> +#include <qpid/dispatch/alloc.h> + +static size_t buffer_size = 512; +static int size_locked = 0; + +ALLOC_DECLARE(dx_buffer_t); +ALLOC_DEFINE_CONFIG(dx_buffer_t, sizeof(dx_buffer_t), &buffer_size, 0); + + +void dx_buffer_set_size(size_t size) +{ + assert(!size_locked); + buffer_size = size; +} + + +dx_buffer_t *dx_allocate_buffer(void) +{ + size_locked = 1; + dx_buffer_t *buf = new_dx_buffer_t(); + + DEQ_ITEM_INIT(buf); + buf->size = 0; + return buf; +} + + +void dx_free_buffer(dx_buffer_t *buf) +{ + free_dx_buffer_t(buf); +} + + +unsigned char *dx_buffer_base(dx_buffer_t *buf) +{ + return (unsigned char*) &buf[1]; +} + + +unsigned char *dx_buffer_cursor(dx_buffer_t *buf) +{ + return ((unsigned char*) &buf[1]) + buf->size; +} + + +size_t dx_buffer_capacity(dx_buffer_t *buf) +{ + return buffer_size - buf->size; +} + + +size_t dx_buffer_size(dx_buffer_t *buf) +{ + return buf->size; +} + + +void dx_buffer_insert(dx_buffer_t *buf, size_t len) +{ + buf->size += len; + assert(buf->size <= buffer_size); +} + diff --git a/extras/dispatch/src/container.c b/extras/dispatch/src/container.c new file mode 100644 index 0000000000..68e2afa3eb --- /dev/null +++ b/extras/dispatch/src/container.c @@ -0,0 +1,616 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <stdio.h> +#include <string.h> +#include <qpid/dispatch/container.h> +#include <qpid/dispatch/message.h> +#include <proton/engine.h> +#include <proton/message.h> +#include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/hash.h> +#include <qpid/dispatch/threading.h> +#include <qpid/dispatch/iterator.h> +#include <qpid/dispatch/log.h> + +static char *module="CONTAINER"; + +struct dx_node_t { + const dx_node_type_t *ntype; + char *name; + void *context; + dx_dist_mode_t supported_dist; + dx_lifetime_policy_t life_policy; +}; + +ALLOC_DECLARE(dx_node_t); +ALLOC_DEFINE(dx_node_t); +ALLOC_DEFINE(dx_link_item_t); + +struct dx_link_t { + pn_link_t *pn_link; + void *context; + dx_node_t *node; +}; + +ALLOC_DECLARE(dx_link_t); +ALLOC_DEFINE(dx_link_t); + +typedef struct nxc_node_type_t { + DEQ_LINKS(struct nxc_node_type_t); + const dx_node_type_t *ntype; +} nxc_node_type_t; +DEQ_DECLARE(nxc_node_type_t, nxc_node_type_list_t); + + +static hash_t *node_type_map; +static hash_t *node_map; +static sys_mutex_t *lock; +static dx_node_t *default_node; +static nxc_node_type_list_t node_type_list; + +static void setup_outgoing_link(pn_link_t *pn_link) +{ + sys_mutex_lock(lock); + dx_node_t *node; + int result; + const char *source = pn_terminus_get_address(pn_link_remote_source(pn_link)); + dx_field_iterator_t *iter; + // TODO - Extract the name from the structured source + + if (source) { + iter = dx_field_iterator_string(source, ITER_VIEW_NODE_ID); + result = hash_retrieve(node_map, iter, (void*) &node); + dx_field_iterator_free(iter); + } else + result = -1; + sys_mutex_unlock(lock); + + if (result < 0) { + if (default_node) + node = default_node; + else { + // Reject the link + // TODO - When the API allows, add an error message for "no available node" + pn_link_close(pn_link); + return; + } + } + + dx_link_t *link = new_dx_link_t(); + if (!link) { + pn_link_close(pn_link); + return; + } + + link->pn_link = pn_link; + link->context = 0; + link->node = node; + + pn_link_set_context(pn_link, link); + node->ntype->outgoing_handler(node->context, link); +} + + +static void setup_incoming_link(pn_link_t *pn_link) +{ + sys_mutex_lock(lock); + dx_node_t *node; + int result; + const char *target = pn_terminus_get_address(pn_link_remote_target(pn_link)); + dx_field_iterator_t *iter; + // TODO - Extract the name from the structured target + + if (target) { + iter = dx_field_iterator_string(target, ITER_VIEW_NODE_ID); + result = hash_retrieve(node_map, iter, (void*) &node); + dx_field_iterator_free(iter); + } else + result = -1; + sys_mutex_unlock(lock); + + if (result < 0) { + if (default_node) + node = default_node; + else { + // Reject the link + // TODO - When the API allows, add an error message for "no available node" + pn_link_close(pn_link); + return; + } + } + + dx_link_t *link = new_dx_link_t(); + if (!link) { + pn_link_close(pn_link); + return; + } + + link->pn_link = pn_link; + link->context = 0; + link->node = node; + + pn_link_set_context(pn_link, link); + node->ntype->incoming_handler(node->context, link); +} + + +static int do_writable(pn_link_t *pn_link) +{ + dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link); + if (!link) + return 0; + + dx_node_t *node = link->node; + if (!node) + return 0; + + return node->ntype->writable_handler(node->context, link); +} + + +static void process_receive(pn_delivery_t *delivery) +{ + pn_link_t *pn_link = pn_delivery_link(delivery); + dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link); + + if (link) { + dx_node_t *node = link->node; + if (node) { + node->ntype->rx_handler(node->context, link, delivery); + return; + } + } + + // + // Reject the delivery if we couldn't find a node to handle it + // + pn_link_advance(pn_link); + pn_link_flow(pn_link, 1); + pn_delivery_update(delivery, PN_REJECTED); + pn_delivery_settle(delivery); +} + + +static void do_send(pn_delivery_t *delivery) +{ + pn_link_t *pn_link = pn_delivery_link(delivery); + dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link); + + if (link) { + dx_node_t *node = link->node; + if (node) { + node->ntype->tx_handler(node->context, link, delivery); + return; + } + } + + // TODO - Cancel the delivery +} + + +static void do_updated(pn_delivery_t *delivery) +{ + pn_link_t *pn_link = pn_delivery_link(delivery); + dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link); + + if (link) { + dx_node_t *node = link->node; + if (node) + node->ntype->disp_handler(node->context, link, delivery); + } +} + + +static int close_handler(void* unused, pn_connection_t *conn) +{ + // + // Close all links, passing False as the 'closed' argument. These links are not + // being properly 'detached'. They are being orphaned. + // + pn_link_t *pn_link = pn_link_head(conn, 0); + while (pn_link) { + dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link); + dx_node_t *node = link->node; + if (node) + node->ntype->link_detach_handler(node->context, link, 0); + pn_link_close(pn_link); + free_dx_link_t(link); + pn_link = pn_link_next(pn_link, 0); + } + + // teardown all sessions + pn_session_t *ssn = pn_session_head(conn, 0); + while (ssn) { + pn_session_close(ssn); + ssn = pn_session_next(ssn, 0); + } + + // teardown the connection + pn_connection_close(conn); + return 0; +} + + +static int process_handler(void* unused, pn_connection_t *conn) +{ + pn_session_t *ssn; + pn_link_t *pn_link; + pn_delivery_t *delivery; + int event_count = 0; + + // Step 1: setup the engine's connection, and any sessions and links + // that may be pending. + + // initialize the connection if it's new + if (pn_connection_state(conn) & PN_LOCAL_UNINIT) { + pn_connection_open(conn); + event_count++; + } + + // open all pending sessions + ssn = pn_session_head(conn, PN_LOCAL_UNINIT); + while (ssn) { + pn_session_open(ssn); + ssn = pn_session_next(ssn, PN_LOCAL_UNINIT); + event_count++; + } + + // configure and open any pending links + pn_link = pn_link_head(conn, PN_LOCAL_UNINIT); + while (pn_link) { + if (pn_link_is_sender(pn_link)) + setup_outgoing_link(pn_link); + else + setup_incoming_link(pn_link); + pn_link = pn_link_next(pn_link, PN_LOCAL_UNINIT); + event_count++; + } + + + // Step 2: Now drain all the pending deliveries from the connection's + // work queue and process them + + delivery = pn_work_head(conn); + while (delivery) { + if (pn_delivery_readable(delivery)) + process_receive(delivery); + else if (pn_delivery_writable(delivery)) + do_send(delivery); + + if (pn_delivery_updated(delivery)) + do_updated(delivery); + + delivery = pn_work_next(delivery); + event_count++; + } + + // + // Step 2.5: Traverse all of the links on the connection looking for + // outgoing links with non-zero credit. Call the attached node's + // writable handler for such links. + // + pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE); + while (pn_link) { + assert(pn_session_connection(pn_link_session(pn_link)) == conn); + if (pn_link_is_sender(pn_link) && pn_link_credit(pn_link) > 0) + event_count += do_writable(pn_link); + pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE); + } + + // Step 3: Clean up any links or sessions that have been closed by the + // remote. If the connection has been closed remotely, clean that up + // also. + + // teardown any terminating links + pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED); + while (pn_link) { + dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link); + dx_node_t *node = link->node; + if (node) + node->ntype->link_detach_handler(node->context, link, 1); // TODO - get 'closed' from detach message + pn_link_close(pn_link); + pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED); + event_count++; + } + + // teardown any terminating sessions + ssn = pn_session_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED); + while (ssn) { + pn_session_close(ssn); + ssn = pn_session_next(ssn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED); + event_count++; + } + + // teardown the connection if it's terminating + if (pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) { + pn_connection_close(conn); + event_count++; + } + + return event_count; +} + + +static void open_handler(dx_connection_t *conn, dx_direction_t dir) +{ + const dx_node_type_t *nt; + + // + // Note the locking structure in this function. Generally this would be unsafe, but since + // this particular list is only ever appended to and never has items inserted or deleted, + // this usage is safe in this case. + // + sys_mutex_lock(lock); + nxc_node_type_t *nt_item = DEQ_HEAD(node_type_list); + sys_mutex_unlock(lock); + + pn_connection_open(dx_connection_pn(conn)); + + while (nt_item) { + nt = nt_item->ntype; + if (dir == DX_INCOMING) { + if (nt->inbound_conn_open_handler) + nt->inbound_conn_open_handler(nt->type_context, conn); + } else { + if (nt->outbound_conn_open_handler) + nt->outbound_conn_open_handler(nt->type_context, conn); + } + + sys_mutex_lock(lock); + nt_item = DEQ_NEXT(nt_item); + sys_mutex_unlock(lock); + } +} + + +static int handler(void* context, dx_conn_event_t event, dx_connection_t *dx_conn) +{ + pn_connection_t *conn = dx_connection_pn(dx_conn); + + switch (event) { + case DX_CONN_EVENT_LISTENER_OPEN: open_handler(dx_conn, DX_INCOMING); break; + case DX_CONN_EVENT_CONNECTOR_OPEN: open_handler(dx_conn, DX_OUTGOING); break; + case DX_CONN_EVENT_CLOSE: return close_handler(context, conn); + case DX_CONN_EVENT_PROCESS: return process_handler(context, conn); + } + + return 0; +} + + +void dx_container_initialize(void) +{ + dx_log(module, LOG_TRACE, "Container Initializing"); + + node_type_map = hash(6, 4, 1); // 64 buckets, item batches of 4 + node_map = hash(10, 32, 0); // 1K buckets, item batches of 32 + lock = sys_mutex(); + default_node = 0; + DEQ_INIT(node_type_list); + + dx_server_set_conn_handler(handler); +} + + +void dx_container_finalize(void) +{ +} + + +int dx_container_register_node_type(const dx_node_type_t *nt) +{ + int result; + dx_field_iterator_t *iter = dx_field_iterator_string(nt->type_name, ITER_VIEW_ALL); + nxc_node_type_t *nt_item = NEW(nxc_node_type_t); + DEQ_ITEM_INIT(nt_item); + nt_item->ntype = nt; + + sys_mutex_lock(lock); + result = hash_insert_const(node_type_map, iter, nt); + DEQ_INSERT_TAIL(node_type_list, nt_item); + sys_mutex_unlock(lock); + + dx_field_iterator_free(iter); + if (result < 0) + return result; + dx_log(module, LOG_TRACE, "Node Type Registered - %s", nt->type_name); + + return 0; +} + + +void dx_container_set_default_node_type(const dx_node_type_t *nt, + void *context, + dx_dist_mode_t supported_dist) +{ + if (default_node) + dx_container_destroy_node(default_node); + + if (nt) { + default_node = dx_container_create_node(nt, 0, context, supported_dist, DX_LIFE_PERMANENT); + dx_log(module, LOG_TRACE, "Node of type '%s' installed as default node", nt->type_name); + } else { + default_node = 0; + dx_log(module, LOG_TRACE, "Default node removed"); + } +} + + +dx_node_t *dx_container_create_node(const dx_node_type_t *nt, + const char *name, + void *context, + dx_dist_mode_t supported_dist, + dx_lifetime_policy_t life_policy) +{ + int result; + dx_node_t *node = new_dx_node_t(); + if (!node) + return 0; + + node->ntype = nt; + node->name = 0; + node->context = context; + node->supported_dist = supported_dist; + node->life_policy = life_policy; + + if (name) { + dx_field_iterator_t *iter = dx_field_iterator_string(name, ITER_VIEW_ALL); + sys_mutex_lock(lock); + result = hash_insert(node_map, iter, node); + sys_mutex_unlock(lock); + dx_field_iterator_free(iter); + if (result < 0) { + free_dx_node_t(node); + return 0; + } + + node->name = (char*) malloc(strlen(name) + 1); + strcpy(node->name, name); + } + + if (name) + dx_log(module, LOG_TRACE, "Node of type '%s' created with name '%s'", nt->type_name, name); + + return node; +} + + +void dx_container_destroy_node(dx_node_t *node) +{ + if (node->name) { + dx_field_iterator_t *iter = dx_field_iterator_string(node->name, ITER_VIEW_ALL); + sys_mutex_lock(lock); + hash_remove(node_map, iter); + sys_mutex_unlock(lock); + dx_field_iterator_free(iter); + free(node->name); + } + + free_dx_node_t(node); +} + + +void dx_container_node_set_context(dx_node_t *node, void *node_context) +{ + node->context = node_context; +} + + +dx_dist_mode_t dx_container_node_get_dist_modes(const dx_node_t *node) +{ + return node->supported_dist; +} + + +dx_lifetime_policy_t dx_container_node_get_life_policy(const dx_node_t *node) +{ + return node->life_policy; +} + + +dx_link_t *dx_link(dx_node_t *node, dx_connection_t *conn, dx_direction_t dir, const char* name) +{ + pn_session_t *sess = pn_session(dx_connection_pn(conn)); + dx_link_t *link = new_dx_link_t(); + + if (dir == DX_OUTGOING) + link->pn_link = pn_sender(sess, name); + else + link->pn_link = pn_receiver(sess, name); + link->context = node->context; + link->node = node; + + pn_link_set_context(link->pn_link, link); + + pn_session_open(sess); + + return link; +} + + +void dx_link_set_context(dx_link_t *link, void *context) +{ + link->context = context; +} + + +void *dx_link_get_context(dx_link_t *link) +{ + return link->context; +} + + +pn_link_t *dx_link_pn(dx_link_t *link) +{ + return link->pn_link; +} + + +pn_terminus_t *dx_link_source(dx_link_t *link) +{ + return pn_link_source(link->pn_link); +} + + +pn_terminus_t *dx_link_target(dx_link_t *link) +{ + return pn_link_target(link->pn_link); +} + + +pn_terminus_t *dx_link_remote_source(dx_link_t *link) +{ + return pn_link_remote_source(link->pn_link); +} + + +pn_terminus_t *dx_link_remote_target(dx_link_t *link) +{ + return pn_link_remote_target(link->pn_link); +} + + +void dx_link_activate(dx_link_t *link) +{ + if (!link || !link->pn_link) + return; + + pn_session_t *sess = pn_link_session(link->pn_link); + if (!sess) + return; + + pn_connection_t *conn = pn_session_connection(sess); + if (!conn) + return; + + dx_connection_t *ctx = pn_connection_get_context(conn); + if (!ctx) + return; + + dx_server_activate(ctx); +} + + +void dx_link_close(dx_link_t *link) +{ + pn_link_close(link->pn_link); +} + + diff --git a/extras/dispatch/src/hash.c b/extras/dispatch/src/hash.c new file mode 100644 index 0000000000..c54d5d6fcf --- /dev/null +++ b/extras/dispatch/src/hash.c @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/hash.h> +#include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/alloc.h> +#include <stdio.h> +#include <string.h> + +typedef struct hash_item_t { + DEQ_LINKS(struct hash_item_t); + unsigned char *key; + union { + void *val; + const void *val_const; + } v; +} hash_item_t; + +ALLOC_DECLARE(hash_item_t); +ALLOC_DEFINE(hash_item_t); +DEQ_DECLARE(hash_item_t, items_t); + + +typedef struct bucket_t { + items_t items; +} bucket_t; + + +struct hash_t { + bucket_t *buckets; + unsigned int bucket_count; + unsigned int bucket_mask; + int batch_size; + size_t size; + int is_const; +}; + + +// djb2 hash algorithm +static unsigned long hash_function(dx_field_iterator_t *iter) +{ + unsigned long hash = 5381; + int c; + + while (!dx_field_iterator_end(iter)) { + c = (int) dx_field_iterator_octet(iter); + hash = ((hash << 5) + hash) + c; /* hash * 33 + c */ + } + + return hash; +} + + +hash_t *hash(int bucket_exponent, int batch_size, int value_is_const) +{ + int i; + hash_t *h = NEW(hash_t); + + if (!h) + return 0; + + h->bucket_count = 1 << bucket_exponent; + h->bucket_mask = h->bucket_count - 1; + h->batch_size = batch_size; + h->size = 0; + h->is_const = value_is_const; + h->buckets = NEW_ARRAY(bucket_t, h->bucket_count); + for (i = 0; i < h->bucket_count; i++) { + DEQ_INIT(h->buckets[i].items); + } + + return h; +} + + +void hash_free(hash_t *h) +{ + // TODO - Implement this +} + + +size_t hash_size(hash_t *h) +{ + return h ? h->size : 0; +} + + +static hash_item_t *hash_internal_insert(hash_t *h, dx_field_iterator_t *key, int *error) +{ + unsigned long idx = hash_function(key) & h->bucket_mask; + hash_item_t *item = DEQ_HEAD(h->buckets[idx].items); + + *error = 0; + + while (item) { + if (dx_field_iterator_equal(key, item->key)) + break; + item = item->next; + } + + if (item) { + *error = -1; + return 0; + } + + item = new_hash_item_t(); + if (!item) { + *error = -2; + return 0; + } + + DEQ_ITEM_INIT(item); + item->key = dx_field_iterator_copy(key); + + DEQ_INSERT_TAIL(h->buckets[idx].items, item); + h->size++; + return item; +} + + +int hash_insert(hash_t *h, dx_field_iterator_t *key, void *val) +{ + int error = 0; + hash_item_t *item = hash_internal_insert(h, key, &error); + + if (item) + item->v.val = val; + return error; +} + + +int hash_insert_const(hash_t *h, dx_field_iterator_t *key, const void *val) +{ + if (!h->is_const) + return -3; + + int error = 0; + hash_item_t *item = hash_internal_insert(h, key, &error); + + if (item) + item->v.val_const = val; + return error; +} + + +static hash_item_t *hash_internal_retrieve(hash_t *h, dx_field_iterator_t *key) +{ + unsigned long idx = hash_function(key) & h->bucket_mask; + hash_item_t *item = DEQ_HEAD(h->buckets[idx].items); + + while (item) { + if (dx_field_iterator_equal(key, item->key)) + break; + item = item->next; + } + + return item; +} + + +int hash_retrieve(hash_t *h, dx_field_iterator_t *key, void **val) +{ + hash_item_t *item = hash_internal_retrieve(h, key); + if (item) { + *val = item->v.val; + return 0; + } + return -1; +} + + +int hash_retrieve_const(hash_t *h, dx_field_iterator_t *key, const void **val) +{ + if (!h->is_const) + return -3; + + hash_item_t *item = hash_internal_retrieve(h, key); + if (item) { + *val = item->v.val_const; + return 0; + } + return -1; +} + + +int hash_remove(hash_t *h, dx_field_iterator_t *key) +{ + unsigned long idx = hash_function(key) & h->bucket_mask; + hash_item_t *item = DEQ_HEAD(h->buckets[idx].items); + + while (item) { + if (dx_field_iterator_equal(key, item->key)) + break; + item = item->next; + } + + if (item) { + free(item->key); + DEQ_REMOVE(h->buckets[idx].items, item); + free_hash_item_t(item); + h->size--; + return 0; + } + + return -1; +} + diff --git a/extras/dispatch/src/iovec.c b/extras/dispatch/src/iovec.c new file mode 100644 index 0000000000..6ff6874440 --- /dev/null +++ b/extras/dispatch/src/iovec.c @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/iovec.h> +#include <qpid/dispatch/alloc.h> +#include <string.h> + +#define DX_IOVEC_MAX 64 + +struct dx_iovec_t { + struct iovec iov_array[DX_IOVEC_MAX]; + struct iovec *iov; + int iov_count; +}; + + +ALLOC_DECLARE(dx_iovec_t); +ALLOC_DEFINE(dx_iovec_t); + + +dx_iovec_t *dx_iovec(int vector_count) +{ + dx_iovec_t *iov = new_dx_iovec_t(); + if (!iov) + return 0; + + memset(iov, 0, sizeof(dx_iovec_t)); + + iov->iov_count = vector_count; + if (vector_count > DX_IOVEC_MAX) + iov->iov = (struct iovec*) malloc(sizeof(struct iovec) * vector_count); + else + iov->iov = &iov->iov_array[0]; + + return iov; +} + + +void dx_iovec_free(dx_iovec_t *iov) +{ + if (!iov) + return; + + if (iov->iov && iov->iov != &iov->iov_array[0]) + free(iov->iov); + + free_dx_iovec_t(iov); +} + + +struct iovec *dx_iovec_array(dx_iovec_t *iov) +{ + if (!iov) + return 0; + return iov->iov; +} + + +int dx_iovec_count(dx_iovec_t *iov) +{ + if (!iov) + return 0; + return iov->iov_count; +} + diff --git a/extras/dispatch/src/iterator.c b/extras/dispatch/src/iterator.c new file mode 100644 index 0000000000..6ab67f948d --- /dev/null +++ b/extras/dispatch/src/iterator.c @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/iterator.h> +#include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/alloc.h> +#include "message_private.h" +#include <stdio.h> +#include <string.h> + +typedef enum { +MODE_TO_END, +MODE_TO_SLASH +} parse_mode_t; + +struct dx_field_iterator_t { + dx_buffer_t *start_buffer; + unsigned char *start_cursor; + int start_length; + dx_buffer_t *buffer; + unsigned char *cursor; + int length; + dx_iterator_view_t view; + parse_mode_t mode; +}; + + +ALLOC_DECLARE(dx_field_iterator_t); +ALLOC_DEFINE(dx_field_iterator_t); + + +typedef enum { +STATE_START, +STATE_SLASH_LEFT, +STATE_SKIPPING_TO_NEXT_SLASH, +STATE_SCANNING, +STATE_COLON, +STATE_COLON_SLASH, +STATE_AT_NODE_ID +} state_t; + + +static void view_initialize(dx_field_iterator_t *iter) +{ + if (iter->view == ITER_VIEW_ALL) { + iter->mode = MODE_TO_END; + return; + } + + // + // Advance to the node-id. + // + state_t state = STATE_START; + unsigned int octet; + while (!dx_field_iterator_end(iter) && state != STATE_AT_NODE_ID) { + octet = dx_field_iterator_octet(iter); + switch (state) { + case STATE_START : + if (octet == '/') + state = STATE_SLASH_LEFT; + else + state = STATE_SCANNING; + break; + + case STATE_SLASH_LEFT : + if (octet == '/') + state = STATE_SKIPPING_TO_NEXT_SLASH; + else + state = STATE_AT_NODE_ID; + break; + + case STATE_SKIPPING_TO_NEXT_SLASH : + if (octet == '/') + state = STATE_AT_NODE_ID; + break; + + case STATE_SCANNING : + if (octet == ':') + state = STATE_COLON; + break; + + case STATE_COLON : + if (octet == '/') + state = STATE_COLON_SLASH; + else + state = STATE_SCANNING; + break; + + case STATE_COLON_SLASH : + if (octet == '/') + state = STATE_SKIPPING_TO_NEXT_SLASH; + else + state = STATE_SCANNING; + break; + + case STATE_AT_NODE_ID : + break; + } + } + + if (state != STATE_AT_NODE_ID) { + // + // The address string was relative, not absolute. The node-id + // is at the beginning of the string. + // + iter->buffer = iter->start_buffer; + iter->cursor = iter->start_cursor; + iter->length = iter->start_length; + } + + // + // Cursor is now on the first octet of the node-id + // + if (iter->view == ITER_VIEW_NODE_ID) { + iter->mode = MODE_TO_SLASH; + return; + } + + if (iter->view == ITER_VIEW_NO_HOST) { + iter->mode = MODE_TO_END; + return; + } + + if (iter->view == ITER_VIEW_NODE_SPECIFIC) { + iter->mode = MODE_TO_END; + while (!dx_field_iterator_end(iter)) { + octet = dx_field_iterator_octet(iter); + if (octet == '/') + break; + } + return; + } +} + + +dx_field_iterator_t* dx_field_iterator_string(const char *text, dx_iterator_view_t view) +{ + dx_field_iterator_t *iter = new_dx_field_iterator_t(); + if (!iter) + return 0; + + iter->start_buffer = 0; + iter->start_cursor = (unsigned char*) text; + iter->start_length = strlen(text); + + dx_field_iterator_reset(iter, view); + + return iter; +} + + +dx_field_iterator_t *dx_field_iterator_buffer(dx_buffer_t *buffer, int offset, int length, dx_iterator_view_t view) +{ + dx_field_iterator_t *iter = new_dx_field_iterator_t(); + if (!iter) + return 0; + + iter->start_buffer = buffer; + iter->start_cursor = dx_buffer_base(buffer) + offset; + iter->start_length = length; + + dx_field_iterator_reset(iter, view); + + return iter; +} + + +void dx_field_iterator_free(dx_field_iterator_t *iter) +{ + free_dx_field_iterator_t(iter); +} + + +void dx_field_iterator_reset(dx_field_iterator_t *iter, dx_iterator_view_t view) +{ + iter->buffer = iter->start_buffer; + iter->cursor = iter->start_cursor; + iter->length = iter->start_length; + iter->view = view; + + view_initialize(iter); +} + + +unsigned char dx_field_iterator_octet(dx_field_iterator_t *iter) +{ + if (iter->length == 0) + return (unsigned char) 0; + + unsigned char result = *(iter->cursor); + + iter->cursor++; + iter->length--; + + if (iter->length > 0) { + if (iter->buffer) { + if (iter->cursor - dx_buffer_base(iter->buffer) == dx_buffer_size(iter->buffer)) { + iter->buffer = iter->buffer->next; + if (iter->buffer == 0) + iter->length = 0; + iter->cursor = dx_buffer_base(iter->buffer); + } + } + } + + if (iter->length && iter->mode == MODE_TO_SLASH && *(iter->cursor) == '/') + iter->length = 0; + + return result; +} + + +int dx_field_iterator_end(dx_field_iterator_t *iter) +{ + return iter->length == 0; +} + + +int dx_field_iterator_equal(dx_field_iterator_t *iter, unsigned char *string) +{ + dx_field_iterator_reset(iter, iter->view); + while (!dx_field_iterator_end(iter) && *string) { + if (*string != dx_field_iterator_octet(iter)) + return 0; + string++; + } + + return (dx_field_iterator_end(iter) && (*string == 0)); +} + + +unsigned char *dx_field_iterator_copy(dx_field_iterator_t *iter) +{ + int length = 0; + int idx = 0; + unsigned char *copy; + + dx_field_iterator_reset(iter, iter->view); + while (!dx_field_iterator_end(iter)) { + dx_field_iterator_octet(iter); + length++; + } + + dx_field_iterator_reset(iter, iter->view); + copy = (unsigned char*) malloc(length + 1); + while (!dx_field_iterator_end(iter)) + copy[idx++] = dx_field_iterator_octet(iter); + copy[idx] = '\0'; + + return copy; +} + diff --git a/extras/dispatch/src/log.c b/extras/dispatch/src/log.c new file mode 100644 index 0000000000..d4ec534915 --- /dev/null +++ b/extras/dispatch/src/log.c @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/log.h> +#include <stdarg.h> +#include <stdio.h> +#include <string.h> + +static int mask=LOG_INFO; + +static char *cls_prefix(int cls) +{ + switch (cls) { + case LOG_TRACE : return "TRACE"; + case LOG_ERROR : return "ERROR"; + case LOG_INFO : return "INFO"; + } + + return ""; +} + +void dx_log(const char *module, int cls, const char *fmt, ...) +{ + if (!(cls & mask)) + return; + + va_list ap; + char line[128]; + + va_start(ap, fmt); + vsnprintf(line, 127, fmt, ap); + va_end(ap); + fprintf(stderr, "%s (%s): %s\n", module, cls_prefix(cls), line); +} + +void dx_log_set_mask(int _mask) +{ + mask = _mask; +} + diff --git a/extras/dispatch/src/message.c b/extras/dispatch/src/message.c new file mode 100644 index 0000000000..f66e79010c --- /dev/null +++ b/extras/dispatch/src/message.c @@ -0,0 +1,1120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/threading.h> +#include "message_private.h" +#include <string.h> +#include <stdio.h> + +ALLOC_DEFINE_CONFIG(dx_message_t, sizeof(dx_message_pvt_t), 0, 0); +ALLOC_DEFINE(dx_message_content_t); + + +static void advance(unsigned char **cursor, dx_buffer_t **buffer, int consume) +{ + unsigned char *local_cursor = *cursor; + dx_buffer_t *local_buffer = *buffer; + + int remaining = dx_buffer_size(local_buffer) - (local_cursor - dx_buffer_base(local_buffer)); + while (consume > 0) { + if (consume < remaining) { + local_cursor += consume; + consume = 0; + } else { + consume -= remaining; + local_buffer = local_buffer->next; + if (local_buffer == 0){ + local_cursor = 0; + break; + } + local_cursor = dx_buffer_base(local_buffer); + remaining = dx_buffer_size(local_buffer) - (local_cursor - dx_buffer_base(local_buffer)); + } + } + + *cursor = local_cursor; + *buffer = local_buffer; +} + + +static unsigned char next_octet(unsigned char **cursor, dx_buffer_t **buffer) +{ + unsigned char result = **cursor; + advance(cursor, buffer, 1); + return result; +} + + +static int traverse_field(unsigned char **cursor, dx_buffer_t **buffer, dx_field_location_t *field) +{ + unsigned char tag = next_octet(cursor, buffer); + if (!(*cursor)) return 0; + int consume = 0; + switch (tag & 0xF0) { + case 0x40 : consume = 0; break; + case 0x50 : consume = 1; break; + case 0x60 : consume = 2; break; + case 0x70 : consume = 4; break; + case 0x80 : consume = 8; break; + case 0x90 : consume = 16; break; + + case 0xB0 : + case 0xD0 : + case 0xF0 : + consume |= ((int) next_octet(cursor, buffer)) << 24; + if (!(*cursor)) return 0; + consume |= ((int) next_octet(cursor, buffer)) << 16; + if (!(*cursor)) return 0; + consume |= ((int) next_octet(cursor, buffer)) << 8; + if (!(*cursor)) return 0; + // Fall through to the next case... + + case 0xA0 : + case 0xC0 : + case 0xE0 : + consume |= (int) next_octet(cursor, buffer); + if (!(*cursor)) return 0; + break; + } + + if (field) { + field->buffer = *buffer; + field->offset = *cursor - dx_buffer_base(*buffer); + field->length = consume; + field->parsed = 1; + } + + advance(cursor, buffer, consume); + return 1; +} + + +static int start_list(unsigned char **cursor, dx_buffer_t **buffer) +{ + unsigned char tag = next_octet(cursor, buffer); + if (!(*cursor)) return 0; + int length = 0; + int count = 0; + + switch (tag) { + case 0x45 : // list0 + break; + case 0xd0 : // list32 + length |= ((int) next_octet(cursor, buffer)) << 24; + if (!(*cursor)) return 0; + length |= ((int) next_octet(cursor, buffer)) << 16; + if (!(*cursor)) return 0; + length |= ((int) next_octet(cursor, buffer)) << 8; + if (!(*cursor)) return 0; + length |= (int) next_octet(cursor, buffer); + if (!(*cursor)) return 0; + + count |= ((int) next_octet(cursor, buffer)) << 24; + if (!(*cursor)) return 0; + count |= ((int) next_octet(cursor, buffer)) << 16; + if (!(*cursor)) return 0; + count |= ((int) next_octet(cursor, buffer)) << 8; + if (!(*cursor)) return 0; + count |= (int) next_octet(cursor, buffer); + if (!(*cursor)) return 0; + + break; + + case 0xc0 : // list8 + length |= (int) next_octet(cursor, buffer); + if (!(*cursor)) return 0; + + count |= (int) next_octet(cursor, buffer); + if (!(*cursor)) return 0; + break; + } + + return count; +} + + +// +// Check the buffer chain, starting at cursor to see if it matches the pattern. +// If the pattern matches, check the next tag to see if it's in the set of expected +// tags. If not, return zero. If so, set the location descriptor to the good +// tag and advance the cursor (and buffer, if needed) to the end of the matched section. +// +// If there is no match, don't advance the cursor. +// +// Return 0 if the pattern matches but the following tag is unexpected +// Return 0 if the pattern matches and the location already has a pointer (duplicate section) +// Return 1 if the pattern matches and we've advanced the cursor/buffer +// Return 1 if the pattern does not match +// +static int dx_check_and_advance(dx_buffer_t **buffer, + unsigned char **cursor, + unsigned char *pattern, + int pattern_length, + unsigned char *expected_tags, + dx_field_location_t *location) +{ + dx_buffer_t *test_buffer = *buffer; + unsigned char *test_cursor = *cursor; + + if (!test_cursor) + return 1; // no match + + unsigned char *end_of_buffer = dx_buffer_base(test_buffer) + dx_buffer_size(test_buffer); + int idx = 0; + + while (idx < pattern_length && *test_cursor == pattern[idx]) { + idx++; + test_cursor++; + if (test_cursor == end_of_buffer) { + test_buffer = test_buffer->next; + if (test_buffer == 0) + return 1; // Pattern didn't match + test_cursor = dx_buffer_base(test_buffer); + end_of_buffer = test_cursor + dx_buffer_size(test_buffer); + } + } + + if (idx < pattern_length) + return 1; // Pattern didn't match + + // + // Pattern matched, check the tag + // + while (*expected_tags && *test_cursor != *expected_tags) + expected_tags++; + if (*expected_tags == 0) + return 0; // Unexpected tag + + if (location->parsed) + return 0; // Duplicate section + + // + // Pattern matched and tag is expected. Mark the beginning of the section. + // + location->parsed = 1; + location->buffer = test_buffer; + location->offset = test_cursor - dx_buffer_base(test_buffer); + location->length = 0; + + // + // Advance the pointers to consume the whole section. + // + int consume = 0; + unsigned char tag = next_octet(&test_cursor, &test_buffer); + if (!test_cursor) return 0; + switch (tag) { + case 0x45 : // list0 + break; + + case 0xd0 : // list32 + case 0xd1 : // map32 + case 0xb0 : // vbin32 + consume |= ((int) next_octet(&test_cursor, &test_buffer)) << 24; + if (!test_cursor) return 0; + consume |= ((int) next_octet(&test_cursor, &test_buffer)) << 16; + if (!test_cursor) return 0; + consume |= ((int) next_octet(&test_cursor, &test_buffer)) << 8; + if (!test_cursor) return 0; + // Fall through to the next case... + + case 0xc0 : // list8 + case 0xc1 : // map8 + case 0xa0 : // vbin8 + consume |= (int) next_octet(&test_cursor, &test_buffer); + if (!test_cursor) return 0; + break; + } + + if (consume) + advance(&test_cursor, &test_buffer, consume); + + *cursor = test_cursor; + *buffer = test_buffer; + return 1; +} + + +static void dx_insert(dx_message_content_t *msg, const uint8_t *seq, size_t len) +{ + dx_buffer_t *buf = DEQ_TAIL(msg->buffers); + + while (len > 0) { + if (buf == 0 || dx_buffer_capacity(buf) == 0) { + buf = dx_allocate_buffer(); + if (buf == 0) + return; + DEQ_INSERT_TAIL(msg->buffers, buf); + } + + size_t to_copy = dx_buffer_capacity(buf); + if (to_copy > len) + to_copy = len; + memcpy(dx_buffer_cursor(buf), seq, to_copy); + dx_buffer_insert(buf, to_copy); + len -= to_copy; + seq += to_copy; + msg->length += to_copy; + } +} + + +static void dx_insert_8(dx_message_content_t *msg, uint8_t value) +{ + dx_insert(msg, &value, 1); +} + + +static void dx_insert_32(dx_message_content_t *msg, uint32_t value) +{ + uint8_t buf[4]; + buf[0] = (uint8_t) ((value & 0xFF000000) >> 24); + buf[1] = (uint8_t) ((value & 0x00FF0000) >> 16); + buf[2] = (uint8_t) ((value & 0x0000FF00) >> 8); + buf[3] = (uint8_t) (value & 0x000000FF); + dx_insert(msg, buf, 4); +} + + +static void dx_insert_64(dx_message_content_t *msg, uint64_t value) +{ + uint8_t buf[8]; + buf[0] = (uint8_t) ((value & 0xFF00000000000000L) >> 56); + buf[1] = (uint8_t) ((value & 0x00FF000000000000L) >> 48); + buf[2] = (uint8_t) ((value & 0x0000FF0000000000L) >> 40); + buf[3] = (uint8_t) ((value & 0x000000FF00000000L) >> 32); + buf[4] = (uint8_t) ((value & 0x00000000FF000000L) >> 24); + buf[5] = (uint8_t) ((value & 0x0000000000FF0000L) >> 16); + buf[6] = (uint8_t) ((value & 0x000000000000FF00L) >> 8); + buf[7] = (uint8_t) (value & 0x00000000000000FFL); + dx_insert(msg, buf, 8); +} + + +static void dx_overwrite(dx_buffer_t **buf, size_t *cursor, uint8_t value) +{ + while (*buf) { + if (*cursor >= dx_buffer_size(*buf)) { + *buf = (*buf)->next; + *cursor = 0; + } else { + dx_buffer_base(*buf)[*cursor] = value; + (*cursor)++; + return; + } + } +} + + +static void dx_overwrite_32(dx_field_location_t *field, uint32_t value) +{ + dx_buffer_t *buf = field->buffer; + size_t cursor = field->offset; + + dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0xFF000000) >> 24)); + dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0x00FF0000) >> 24)); + dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0x0000FF00) >> 24)); + dx_overwrite(&buf, &cursor, (uint8_t) (value & 0x000000FF)); +} + + +static void dx_start_list_performative(dx_message_content_t *msg, uint8_t code) +{ + // + // Insert the short-form performative tag + // + dx_insert(msg, (const uint8_t*) "\x00\x53", 2); + dx_insert_8(msg, code); + + // + // Open the list with a list32 tag + // + dx_insert_8(msg, 0xd0); + + // + // Mark the current location to later overwrite the length + // + msg->compose_length.buffer = DEQ_TAIL(msg->buffers); + msg->compose_length.offset = dx_buffer_size(msg->compose_length.buffer); + msg->compose_length.length = 4; + msg->compose_length.parsed = 1; + + dx_insert(msg, (const uint8_t*) "\x00\x00\x00\x00", 4); + + // + // Mark the current location to later overwrite the count + // + msg->compose_count.buffer = DEQ_TAIL(msg->buffers); + msg->compose_count.offset = dx_buffer_size(msg->compose_count.buffer); + msg->compose_count.length = 4; + msg->compose_count.parsed = 1; + + dx_insert(msg, (const uint8_t*) "\x00\x00\x00\x00", 4); + + msg->length = 4; // Include the length of the count field + msg->count = 0; +} + + +static void dx_end_list(dx_message_content_t *msg) +{ + dx_overwrite_32(&msg->compose_length, msg->length); + dx_overwrite_32(&msg->compose_count, msg->count); +} + + +static dx_field_location_t *dx_message_field_location(dx_message_t *msg, dx_message_field_t field) +{ + dx_message_content_t *content = MSG_CONTENT(msg); + + switch (field) { + case DX_FIELD_TO: + while (1) { + if (content->field_to.parsed) + return &content->field_to; + + if (content->section_message_properties.parsed == 0) + break; + + dx_buffer_t *buffer = content->section_message_properties.buffer; + unsigned char *cursor = dx_buffer_base(buffer) + content->section_message_properties.offset; + + int count = start_list(&cursor, &buffer); + int result; + + if (count < 3) + break; + + result = traverse_field(&cursor, &buffer, 0); // message_id + if (!result) return 0; + result = traverse_field(&cursor, &buffer, 0); // user_id + if (!result) return 0; + result = traverse_field(&cursor, &buffer, &content->field_to); // to + if (!result) return 0; + } + break; + + case DX_FIELD_BODY: + while (1) { + if (content->body.parsed) + return &content->body; + + if (content->section_body.parsed == 0) + break; + + dx_buffer_t *buffer = content->section_body.buffer; + unsigned char *cursor = dx_buffer_base(buffer) + content->section_body.offset; + int result; + + result = traverse_field(&cursor, &buffer, &content->body); + if (!result) return 0; + } + break; + + default: + break; + } + + return 0; +} + + +dx_message_t *dx_allocate_message() +{ + dx_message_pvt_t *msg = (dx_message_pvt_t*) new_dx_message_t(); + if (!msg) + return 0; + + DEQ_ITEM_INIT(msg); + msg->content = new_dx_message_content_t(); + msg->out_delivery = 0; + + if (msg->content == 0) { + free_dx_message_t((dx_message_t*) msg); + return 0; + } + + memset(msg->content, 0, sizeof(dx_message_content_t)); + msg->content->lock = sys_mutex(); + msg->content->ref_count = 1; + + return (dx_message_t*) msg; +} + + +void dx_free_message(dx_message_t *in_msg) +{ + uint32_t rc; + dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg; + dx_message_content_t *content = msg->content; + + sys_mutex_lock(content->lock); + rc = --content->ref_count; + sys_mutex_unlock(content->lock); + + if (rc == 0) { + dx_buffer_t *buf = DEQ_HEAD(content->buffers); + + while (buf) { + DEQ_REMOVE_HEAD(content->buffers); + dx_free_buffer(buf); + buf = DEQ_HEAD(content->buffers); + } + + sys_mutex_free(content->lock); + free_dx_message_content_t(content); + } + + free_dx_message_t((dx_message_t*) msg); +} + + +dx_message_t *dx_message_copy(dx_message_t *in_msg) +{ + dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg; + dx_message_content_t *content = msg->content; + dx_message_pvt_t *copy = (dx_message_pvt_t*) new_dx_message_t(); + + if (!copy) + return 0; + + DEQ_ITEM_INIT(copy); + copy->content = content; + copy->out_delivery = 0; + + sys_mutex_lock(content->lock); + content->ref_count++; + sys_mutex_unlock(content->lock); + + return (dx_message_t*) copy; +} + + +void dx_message_set_out_delivery(dx_message_t *msg, pn_delivery_t *delivery) +{ + ((dx_message_pvt_t*) msg)->out_delivery = delivery; +} + + +pn_delivery_t *dx_message_out_delivery(dx_message_t *msg) +{ + return ((dx_message_pvt_t*) msg)->out_delivery; +} + + +void dx_message_set_in_delivery(dx_message_t *msg, pn_delivery_t *delivery) +{ + dx_message_content_t *content = MSG_CONTENT(msg); + content->in_delivery = delivery; +} + + +pn_delivery_t *dx_message_in_delivery(dx_message_t *msg) +{ + dx_message_content_t *content = MSG_CONTENT(msg); + return content->in_delivery; +} + + +dx_message_t *dx_message_receive(pn_delivery_t *delivery) +{ + pn_link_t *link = pn_delivery_link(delivery); + dx_message_pvt_t *msg = (dx_message_pvt_t*) pn_delivery_get_context(delivery); + ssize_t rc; + dx_buffer_t *buf; + + // + // If there is no message associated with the delivery, this is the first time + // we've received anything on this delivery. Allocate a message descriptor and + // link it and the delivery together. + // + if (!msg) { + msg = (dx_message_pvt_t*) dx_allocate_message(); + pn_delivery_set_context(delivery, (void*) msg); + + // + // Record the incoming delivery only if it is not settled. If it is + // settled, it should not be recorded as no future operations on it are + // permitted. + // + if (!pn_delivery_settled(delivery)) + msg->content->in_delivery = delivery; + } + + // + // Get a reference to the tail buffer on the message. This is the buffer into which + // we will store incoming message data. If there is no buffer in the message, allocate + // an empty one and add it to the message. + // + buf = DEQ_TAIL(msg->content->buffers); + if (!buf) { + buf = dx_allocate_buffer(); + DEQ_INSERT_TAIL(msg->content->buffers, buf); + } + + while (1) { + // + // Try to receive enough data to fill the remaining space in the tail buffer. + // + rc = pn_link_recv(link, (char*) dx_buffer_cursor(buf), dx_buffer_capacity(buf)); + + // + // If we receive PN_EOS, we have come to the end of the message. + // + if (rc == PN_EOS) { + // + // If the last buffer in the list is empty, remove it and free it. This + // will only happen if the size of the message content is an exact multiple + // of the buffer size. + // + if (dx_buffer_size(buf) == 0) { + DEQ_REMOVE_TAIL(msg->content->buffers); + dx_free_buffer(buf); + } + return (dx_message_t*) msg; + } + + if (rc > 0) { + // + // We have received a positive number of bytes for the message. Advance + // the cursor in the buffer. + // + dx_buffer_insert(buf, rc); + + // + // If the buffer is full, allocate a new empty buffer and append it to the + // tail of the message's list. + // + if (dx_buffer_capacity(buf) == 0) { + buf = dx_allocate_buffer(); + DEQ_INSERT_TAIL(msg->content->buffers, buf); + } + } else + // + // We received zero bytes, and no PN_EOS. This means that we've received + // all of the data available up to this point, but it does not constitute + // the entire message. We'll be back later to finish it up. + // + break; + } + + return 0; +} + + +void dx_message_send(dx_message_t *in_msg, pn_link_t *link) +{ + dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg; + dx_buffer_t *buf = DEQ_HEAD(msg->content->buffers); + + // TODO - Handle cases where annotations have been added or modified + while (buf) { + pn_link_send(link, (char*) dx_buffer_base(buf), dx_buffer_size(buf)); + buf = DEQ_NEXT(buf); + } +} + + +int dx_message_check(dx_message_t *in_msg, dx_message_depth_t depth) +{ + +#define LONG 10 +#define SHORT 3 +#define MSG_HDR_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x70" +#define MSG_HDR_SHORT (unsigned char*) "\x00\x53\x70" +#define DELIVERY_ANNOTATION_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x71" +#define DELIVERY_ANNOTATION_SHORT (unsigned char*) "\x00\x53\x71" +#define MESSAGE_ANNOTATION_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x72" +#define MESSAGE_ANNOTATION_SHORT (unsigned char*) "\x00\x53\x72" +#define PROPERTIES_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x73" +#define PROPERTIES_SHORT (unsigned char*) "\x00\x53\x73" +#define APPLICATION_PROPERTIES_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x74" +#define APPLICATION_PROPERTIES_SHORT (unsigned char*) "\x00\x53\x74" +#define BODY_DATA_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x75" +#define BODY_DATA_SHORT (unsigned char*) "\x00\x53\x75" +#define BODY_SEQUENCE_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x76" +#define BODY_SEQUENCE_SHORT (unsigned char*) "\x00\x53\x76" +#define FOOTER_LONG (unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x78" +#define FOOTER_SHORT (unsigned char*) "\x00\x53\x78" +#define TAGS_LIST (unsigned char*) "\x45\xc0\xd0" +#define TAGS_MAP (unsigned char*) "\xc1\xd1" +#define TAGS_BINARY (unsigned char*) "\xa0\xb0" + + dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg; + dx_message_content_t *content = msg->content; + dx_buffer_t *buffer = DEQ_HEAD(content->buffers); + unsigned char *cursor; + + if (!buffer) + return 0; // Invalid - No data in the message + + if (depth == DX_DEPTH_NONE) + return 1; + + cursor = dx_buffer_base(buffer); + + // + // MESSAGE HEADER + // + if (0 == dx_check_and_advance(&buffer, &cursor, MSG_HDR_LONG, LONG, TAGS_LIST, &content->section_message_header)) + return 0; + if (0 == dx_check_and_advance(&buffer, &cursor, MSG_HDR_SHORT, SHORT, TAGS_LIST, &content->section_message_header)) + return 0; + + if (depth == DX_DEPTH_HEADER) + return 1; + + // + // DELIVERY ANNOTATION + // + if (0 == dx_check_and_advance(&buffer, &cursor, DELIVERY_ANNOTATION_LONG, LONG, TAGS_MAP, &content->section_delivery_annotation)) + return 0; + if (0 == dx_check_and_advance(&buffer, &cursor, DELIVERY_ANNOTATION_SHORT, SHORT, TAGS_MAP, &content->section_delivery_annotation)) + return 0; + + if (depth == DX_DEPTH_DELIVERY_ANNOTATIONS) + return 1; + + // + // MESSAGE ANNOTATION + // + if (0 == dx_check_and_advance(&buffer, &cursor, MESSAGE_ANNOTATION_LONG, LONG, TAGS_MAP, &content->section_message_annotation)) + return 0; + if (0 == dx_check_and_advance(&buffer, &cursor, MESSAGE_ANNOTATION_SHORT, SHORT, TAGS_MAP, &content->section_message_annotation)) + return 0; + + if (depth == DX_DEPTH_MESSAGE_ANNOTATIONS) + return 1; + + // + // PROPERTIES + // + if (0 == dx_check_and_advance(&buffer, &cursor, PROPERTIES_LONG, LONG, TAGS_LIST, &content->section_message_properties)) + return 0; + if (0 == dx_check_and_advance(&buffer, &cursor, PROPERTIES_SHORT, SHORT, TAGS_LIST, &content->section_message_properties)) + return 0; + + if (depth == DX_DEPTH_PROPERTIES) + return 1; + + // + // APPLICATION PROPERTIES + // + if (0 == dx_check_and_advance(&buffer, &cursor, APPLICATION_PROPERTIES_LONG, LONG, TAGS_MAP, &content->section_application_properties)) + return 0; + if (0 == dx_check_and_advance(&buffer, &cursor, APPLICATION_PROPERTIES_SHORT, SHORT, TAGS_MAP, &content->section_application_properties)) + return 0; + + if (depth == DX_DEPTH_APPLICATION_PROPERTIES) + return 1; + + // + // BODY (Note that this function expects a single data section or a single AMQP sequence) + // + if (0 == dx_check_and_advance(&buffer, &cursor, BODY_DATA_LONG, LONG, TAGS_BINARY, &content->section_body)) + return 0; + if (0 == dx_check_and_advance(&buffer, &cursor, BODY_DATA_SHORT, SHORT, TAGS_BINARY, &content->section_body)) + return 0; + if (0 == dx_check_and_advance(&buffer, &cursor, BODY_SEQUENCE_LONG, LONG, TAGS_LIST, &content->section_body)) + return 0; + if (0 == dx_check_and_advance(&buffer, &cursor, BODY_SEQUENCE_SHORT, SHORT, TAGS_LIST, &content->section_body)) + return 0; + + if (depth == DX_DEPTH_BODY) + return 1; + + // + // FOOTER + // + if (0 == dx_check_and_advance(&buffer, &cursor, FOOTER_LONG, LONG, TAGS_MAP, &content->section_footer)) + return 0; + if (0 == dx_check_and_advance(&buffer, &cursor, FOOTER_SHORT, SHORT, TAGS_MAP, &content->section_footer)) + return 0; + + return 1; +} + + +dx_field_iterator_t *dx_message_field_iterator(dx_message_t *msg, dx_message_field_t field) +{ + dx_field_location_t *loc = dx_message_field_location(msg, field); + if (!loc) + return 0; + + return dx_field_iterator_buffer(loc->buffer, loc->offset, loc->length, ITER_VIEW_ALL); +} + + +dx_iovec_t *dx_message_field_iovec(dx_message_t *msg, dx_message_field_t field) +{ + dx_field_location_t *loc = dx_message_field_location(msg, field); + if (!loc) + return 0; + + // + // Count the number of buffers this field straddles + // + int bufcnt = 1; + dx_buffer_t *buf = loc->buffer; + size_t bufsize = dx_buffer_size(buf) - loc->offset; + ssize_t remaining = loc->length - bufsize; + + while (remaining > 0) { + bufcnt++; + buf = buf->next; + if (!buf) + return 0; + remaining -= dx_buffer_size(buf); + } + + // + // Allocate an iovec object big enough to hold the number of buffers + // + dx_iovec_t *iov = dx_iovec(bufcnt); + if (!iov) + return 0; + + // + // Build out the io vectors with pointers to the segments of the field in buffers + // + bufcnt = 0; + buf = loc->buffer; + bufsize = dx_buffer_size(buf) - loc->offset; + void *base = dx_buffer_base(buf) + loc->offset; + remaining = loc->length; + + while (remaining > 0) { + dx_iovec_array(iov)[bufcnt].iov_base = base; + dx_iovec_array(iov)[bufcnt].iov_len = bufsize; + bufcnt++; + remaining -= bufsize; + if (remaining > 0) { + buf = buf->next; + base = dx_buffer_base(buf); + bufsize = dx_buffer_size(buf); + if (bufsize > remaining) + bufsize = remaining; + } + } + + return iov; +} + + +void dx_message_compose_1(dx_message_t *msg, const char *to, dx_buffer_list_t *buffers) +{ + dx_message_begin_header(msg); + dx_message_insert_boolean(msg, 0); // durable + //dx_message_insert_null(msg); // priority + //dx_message_insert_null(msg); // ttl + //dx_message_insert_boolean(msg, 0); // first-acquirer + //dx_message_insert_uint(msg, 0); // delivery-count + dx_message_end_header(msg); + + dx_message_begin_message_properties(msg); + dx_message_insert_null(msg); // message-id + dx_message_insert_null(msg); // user-id + dx_message_insert_string(msg, to); // to + //dx_message_insert_null(msg); // subject + //dx_message_insert_null(msg); // reply-to + //dx_message_insert_null(msg); // correlation-id + //dx_message_insert_null(msg); // content-type + //dx_message_insert_null(msg); // content-encoding + //dx_message_insert_timestamp(msg, 0); // absolute-expiry-time + //dx_message_insert_timestamp(msg, 0); // creation-time + //dx_message_insert_null(msg); // group-id + //dx_message_insert_uint(msg, 0); // group-sequence + //dx_message_insert_null(msg); // reply-to-group-id + dx_message_end_message_properties(msg); + + if (buffers) + dx_message_append_body_data(msg, buffers); +} + + +void dx_message_begin_header(dx_message_t *msg) +{ + dx_start_list_performative(MSG_CONTENT(msg), 0x70); +} + + +void dx_message_end_header(dx_message_t *msg) +{ + dx_end_list(MSG_CONTENT(msg)); +} + + +void dx_message_begin_delivery_annotations(dx_message_t *msg) +{ + assert(0); // Not Implemented +} + + +void dx_message_end_delivery_annotations(dx_message_t *msg) +{ + assert(0); // Not Implemented +} + + +void dx_message_begin_message_annotations(dx_message_t *msg) +{ + assert(0); // Not Implemented +} + + +void dx_message_end_message_annotations(dx_message_t *msg) +{ + assert(0); // Not Implemented +} + + +void dx_message_begin_message_properties(dx_message_t *msg) +{ + dx_start_list_performative(MSG_CONTENT(msg), 0x73); +} + + +void dx_message_end_message_properties(dx_message_t *msg) +{ + dx_end_list(MSG_CONTENT(msg)); +} + + +void dx_message_begin_application_properties(dx_message_t *msg) +{ + assert(0); // Not Implemented +} + + +void dx_message_end_application_properties(dx_message_t *msg) +{ + assert(0); // Not Implemented +} + + +void dx_message_append_body_data(dx_message_t *msg, dx_buffer_list_t *buffers) +{ + dx_message_content_t *content = MSG_CONTENT(msg); + dx_buffer_t *buf = DEQ_HEAD(*buffers); + uint32_t len = 0; + + // + // Calculate the size of the body to be appended. + // + while (buf) { + len += dx_buffer_size(buf); + buf = DEQ_NEXT(buf); + } + + // + // Insert a DATA section performative header. + // + dx_insert(content, (const uint8_t*) "\x00\x53\x75", 3); + if (len < 256) { + dx_insert_8(content, 0xa0); // vbin8 + dx_insert_8(content, (uint8_t) len); + } else { + dx_insert_8(content, 0xb0); // vbin32 + dx_insert_32(content, len); + } + + // + // Move the supplied buffers to the tail of the message's buffer list. + // + buf = DEQ_HEAD(*buffers); + while (buf) { + DEQ_REMOVE_HEAD(*buffers); + DEQ_INSERT_TAIL(content->buffers, buf); + buf = DEQ_HEAD(*buffers); + } +} + + +void dx_message_begin_body_sequence(dx_message_t *msg) +{ +} + + +void dx_message_end_body_sequence(dx_message_t *msg) +{ +} + + +void dx_message_begin_footer(dx_message_t *msg) +{ + assert(0); // Not Implemented +} + + +void dx_message_end_footer(dx_message_t *msg) +{ + assert(0); // Not Implemented +} + + +void dx_message_insert_null(dx_message_t *msg) +{ + dx_message_content_t *content = MSG_CONTENT(msg); + dx_insert_8(content, 0x40); + content->count++; +} + + +void dx_message_insert_boolean(dx_message_t *msg, int value) +{ + dx_message_content_t *content = MSG_CONTENT(msg); + if (value) + dx_insert(content, (const uint8_t*) "\x56\x01", 2); + else + dx_insert(content, (const uint8_t*) "\x56\x00", 2); + content->count++; +} + + +void dx_message_insert_ubyte(dx_message_t *msg, uint8_t value) +{ + dx_message_content_t *content = MSG_CONTENT(msg); + dx_insert_8(content, 0x50); + dx_insert_8(content, value); + content->count++; +} + + +void dx_message_insert_uint(dx_message_t *msg, uint32_t value) +{ + dx_message_content_t *content = MSG_CONTENT(msg); + if (value == 0) { + dx_insert_8(content, 0x43); // uint0 + } else if (value < 256) { + dx_insert_8(content, 0x52); // smalluint + dx_insert_8(content, (uint8_t) value); + } else { + dx_insert_8(content, 0x70); // uint + dx_insert_32(content, value); + } + content->count++; +} + + +void dx_message_insert_ulong(dx_message_t *msg, uint64_t value) +{ + dx_message_content_t *content = MSG_CONTENT(msg); + if (value == 0) { + dx_insert_8(content, 0x44); // ulong0 + } else if (value < 256) { + dx_insert_8(content, 0x53); // smallulong + dx_insert_8(content, (uint8_t) value); + } else { + dx_insert_8(content, 0x80); // ulong + dx_insert_64(content, value); + } + content->count++; +} + + +void dx_message_insert_binary(dx_message_t *msg, const uint8_t *start, size_t len) +{ + dx_message_content_t *content = MSG_CONTENT(msg); + if (len < 256) { + dx_insert_8(content, 0xa0); // vbin8 + dx_insert_8(content, (uint8_t) len); + } else { + dx_insert_8(content, 0xb0); // vbin32 + dx_insert_32(content, len); + } + dx_insert(content, start, len); + content->count++; +} + + +void dx_message_insert_string(dx_message_t *msg, const char *start) +{ + dx_message_content_t *content = MSG_CONTENT(msg); + uint32_t len = strlen(start); + + if (len < 256) { + dx_insert_8(content, 0xa1); // str8-utf8 + dx_insert_8(content, (uint8_t) len); + dx_insert(content, (const uint8_t*) start, len); + } else { + dx_insert_8(content, 0xb1); // str32-utf8 + dx_insert_32(content, len); + dx_insert(content, (const uint8_t*) start, len); + } + content->count++; +} + + +void dx_message_insert_uuid(dx_message_t *msg, const uint8_t *value) +{ + dx_message_content_t *content = MSG_CONTENT(msg); + dx_insert_8(content, 0x98); // uuid + dx_insert(content, value, 16); + content->count++; +} + + +void dx_message_insert_symbol(dx_message_t *msg, const char *start, size_t len) +{ + dx_message_content_t *content = MSG_CONTENT(msg); + if (len < 256) { + dx_insert_8(content, 0xa3); // sym8 + dx_insert_8(content, (uint8_t) len); + dx_insert(content, (const uint8_t*) start, len); + } else { + dx_insert_8(content, 0xb3); // sym32 + dx_insert_32(content, len); + dx_insert(content, (const uint8_t*) start, len); + } + content->count++; +} + + +void dx_message_insert_timestamp(dx_message_t *msg, uint64_t value) +{ + dx_message_content_t *content = MSG_CONTENT(msg); + dx_insert_8(content, 0x83); // timestamp + dx_insert_64(content, value); + content->count++; +} + + +void dx_message_begin_list(dx_message_t* msg) +{ + assert(0); // Not Implemented +} + + +void dx_message_end_list(dx_message_t* msg) +{ + assert(0); // Not Implemented +} + + +void dx_message_begin_map(dx_message_t* msg) +{ + assert(0); // Not Implemented +} + + +void dx_message_end_map(dx_message_t* msg) +{ + assert(0); // Not Implemented +} + diff --git a/extras/dispatch/src/message_private.h b/extras/dispatch/src/message_private.h new file mode 100644 index 0000000000..5fb18078f5 --- /dev/null +++ b/extras/dispatch/src/message_private.h @@ -0,0 +1,94 @@ +#ifndef __message_private_h__ +#define __message_private_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/message.h> +#include <qpid/dispatch/alloc.h> +#include <qpid/dispatch/threading.h> + +/** + * Architecture of the message module: + * + * +--------------+ +----------------------+ + * | | | | + * | dx_message_t |----------->| dx_message_content_t | + * | | +----->| | + * +--------------+ | +----------------------+ + * | | + * +--------------+ | | +-------------+ +-------------+ +-------------+ + * | | | +--->| dx_buffer_t |-->| dx_buffer_t |-->| dx_buffer_t |--/ + * | dx_message_t |-----+ +-------------+ +-------------+ +-------------+ + * | | + * +--------------+ + * + * The message module provides chained-fixed-sized-buffer storage of message content with multiple + * references. If a message is received and is to be queued for multiple destinations, there is only + * one copy of the message content in memory but multiple lightweight references to the content. + * + */ + +typedef struct { + dx_buffer_t *buffer; // Buffer that contains the first octet of the field, null if the field is not present + size_t offset; // Offset in the buffer to the first octet + size_t length; // Length of the field or zero if unneeded + int parsed; // non-zero iff the buffer chain has been parsed to find this field +} dx_field_location_t; + + +// TODO - consider using pointers to dx_field_location_t below to save memory +// TODO - we need a second buffer list for modified annotations and header +// There are three message scenarios: +// 1) Received message is held and forwarded unmodified - single buffer list +// 2) Received message is held and modified before forwarding - two buffer lists +// 3) Message is composed internally - single buffer list + +typedef struct { + sys_mutex_t *lock; + uint32_t ref_count; // The number of qmessages referencing this + dx_buffer_list_t buffers; // The buffer chain containing the message + pn_delivery_t *in_delivery; // The delivery on which the message arrived + dx_field_location_t section_message_header; // The message header list + dx_field_location_t section_delivery_annotation; // The delivery annotation map + dx_field_location_t section_message_annotation; // The message annotation map + dx_field_location_t section_message_properties; // The message properties list + dx_field_location_t section_application_properties; // The application properties list + dx_field_location_t section_body; // The message body: Data + dx_field_location_t section_footer; // The footer + dx_field_location_t field_user_id; // The string value of the user-id + dx_field_location_t field_to; // The string value of the to field + dx_field_location_t body; // The body of the message + dx_field_location_t compose_length; + dx_field_location_t compose_count; + uint32_t length; + uint32_t count; +} dx_message_content_t; + +typedef struct { + DEQ_LINKS(dx_message_t); // Deq linkage that overlays the dx_message_t + dx_message_content_t *content; + pn_delivery_t *out_delivery; +} dx_message_pvt_t; + +ALLOC_DECLARE(dx_message_t); +ALLOC_DECLARE(dx_message_content_t); + +#define MSG_CONTENT(m) (((dx_message_pvt_t*) m)->content) + +#endif diff --git a/extras/dispatch/src/posix/threading.c b/extras/dispatch/src/posix/threading.c new file mode 100644 index 0000000000..8edce86cdc --- /dev/null +++ b/extras/dispatch/src/posix/threading.c @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/threading.h> +#include <qpid/dispatch/ctools.h> +#include <stdio.h> +#include <pthread.h> + +struct sys_mutex_t { + pthread_mutex_t mutex; + int acquired; +}; + +sys_mutex_t *sys_mutex(void) +{ + sys_mutex_t *mutex = NEW(sys_mutex_t); + pthread_mutex_init(&(mutex->mutex), 0); + mutex->acquired = 0; + return mutex; +} + + +void sys_mutex_free(sys_mutex_t *mutex) +{ + assert(!mutex->acquired); + pthread_mutex_destroy(&(mutex->mutex)); + free(mutex); +} + + +void sys_mutex_lock(sys_mutex_t *mutex) +{ + pthread_mutex_lock(&(mutex->mutex)); + assert(!mutex->acquired); + mutex->acquired++; +} + + +void sys_mutex_unlock(sys_mutex_t *mutex) +{ + mutex->acquired--; + assert(!mutex->acquired); + pthread_mutex_unlock(&(mutex->mutex)); +} + + +struct sys_cond_t { + pthread_cond_t cond; +}; + + +sys_cond_t *sys_cond(void) +{ + sys_cond_t *cond = NEW(sys_cond_t); + pthread_cond_init(&(cond->cond), 0); + return cond; +} + + +void sys_cond_free(sys_cond_t *cond) +{ + pthread_cond_destroy(&(cond->cond)); + free(cond); +} + + +void sys_cond_wait(sys_cond_t *cond, sys_mutex_t *held_mutex) +{ + assert(held_mutex->acquired); + held_mutex->acquired--; + pthread_cond_wait(&(cond->cond), &(held_mutex->mutex)); + held_mutex->acquired++; +} + + +void sys_cond_signal(sys_cond_t *cond) +{ + pthread_cond_signal(&(cond->cond)); +} + + +void sys_cond_signal_all(sys_cond_t *cond) +{ + pthread_cond_broadcast(&(cond->cond)); +} + + +struct sys_thread_t { + pthread_t thread; +}; + +sys_thread_t *sys_thread(void *(*run_function) (void *), void *arg) +{ + sys_thread_t *thread = NEW(sys_thread_t); + pthread_create(&(thread->thread), 0, run_function, arg); + return thread; +} + + +void sys_thread_free(sys_thread_t *thread) +{ + free(thread); +} + + +void sys_thread_join(sys_thread_t *thread) +{ + pthread_join(thread->thread, 0); +} + diff --git a/extras/dispatch/src/router_node.c b/extras/dispatch/src/router_node.c new file mode 100644 index 0000000000..6ddc8f45dd --- /dev/null +++ b/extras/dispatch/src/router_node.c @@ -0,0 +1,424 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <stdio.h> +#include <qpid/dispatch/server.h> +#include <qpid/dispatch/message.h> +#include <qpid/dispatch/threading.h> +#include <qpid/dispatch/timer.h> +#include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/hash.h> +#include <qpid/dispatch/iterator.h> +#include <qpid/dispatch/log.h> +#include <qpid/dispatch/router.h> + +static char *module="ROUTER_NODE"; + +struct dx_router_t { + dx_node_t *node; + dx_link_list_t in_links; + dx_link_list_t out_links; + dx_message_list_t in_fifo; + sys_mutex_t *lock; + dx_timer_t *timer; + hash_t *out_hash; + uint64_t dtag; +}; + + +typedef struct { + dx_link_t *link; + dx_message_list_t out_fifo; +} dx_router_link_t; + + +ALLOC_DECLARE(dx_router_link_t); +ALLOC_DEFINE(dx_router_link_t); + + +/** + * Outbound Delivery Handler + */ +static void router_tx_handler(void* context, dx_link_t *link, pn_delivery_t *delivery) +{ + dx_router_t *router = (dx_router_t*) context; + pn_link_t *pn_link = pn_delivery_link(delivery); + dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link); + dx_message_t *msg; + size_t size; + + sys_mutex_lock(router->lock); + msg = DEQ_HEAD(rlink->out_fifo); + if (!msg) { + // TODO - Recind the delivery + sys_mutex_unlock(router->lock); + return; + } + + DEQ_REMOVE_HEAD(rlink->out_fifo); + size = (DEQ_SIZE(rlink->out_fifo)); + sys_mutex_unlock(router->lock); + + dx_message_send(msg, pn_link); + + // + // If there is no incoming delivery, it was pre-settled. In this case, + // we must pre-settle the outgoing delivery as well. + // + if (dx_message_in_delivery(msg)) { + pn_delivery_set_context(delivery, (void*) msg); + dx_message_set_out_delivery(msg, delivery); + } else { + pn_delivery_settle(delivery); + dx_free_message(msg); + } + + pn_link_advance(pn_link); + pn_link_offered(pn_link, size); +} + + +/** + * Inbound Delivery Handler + */ +static void router_rx_handler(void* context, dx_link_t *link, pn_delivery_t *delivery) +{ + dx_router_t *router = (dx_router_t*) context; + pn_link_t *pn_link = pn_delivery_link(delivery); + dx_message_t *msg; + int valid_message = 0; + + // + // Receive the message into a local representation. If the returned message + // pointer is NULL, we have not yet received a complete message. + // + sys_mutex_lock(router->lock); + msg = dx_message_receive(delivery); + sys_mutex_unlock(router->lock); + + if (!msg) + return; + + // + // Validate the message through the Properties section + // + valid_message = dx_message_check(msg, DX_DEPTH_PROPERTIES); + + pn_link_advance(pn_link); + pn_link_flow(pn_link, 1); + + if (valid_message) { + dx_field_iterator_t *iter = dx_message_field_iterator(msg, DX_FIELD_TO); + dx_router_link_t *rlink; + if (iter) { + dx_field_iterator_reset(iter, ITER_VIEW_NO_HOST); + sys_mutex_lock(router->lock); + int result = hash_retrieve(router->out_hash, iter, (void*) &rlink); + dx_field_iterator_free(iter); + + if (result == 0) { + // + // To field is valid and contains a known destination. Enqueue on + // the output fifo for the next-hop-to-destination. + // + pn_link_t* pn_outlink = dx_link_pn(rlink->link); + DEQ_INSERT_TAIL(rlink->out_fifo, msg); + pn_link_offered(pn_outlink, DEQ_SIZE(rlink->out_fifo)); + dx_link_activate(rlink->link); + } else { + // + // To field contains an unknown address. Release the message. + // + pn_delivery_update(delivery, PN_RELEASED); + pn_delivery_settle(delivery); + } + + sys_mutex_unlock(router->lock); + } + } else { + // + // Message is invalid. Reject the message. + // + pn_delivery_update(delivery, PN_REJECTED); + pn_delivery_settle(delivery); + pn_delivery_set_context(delivery, 0); + dx_free_message(msg); + } +} + + +/** + * Delivery Disposition Handler + */ +static void router_disp_handler(void* context, dx_link_t *link, pn_delivery_t *delivery) +{ + pn_link_t *pn_link = pn_delivery_link(delivery); + + if (pn_link_is_sender(pn_link)) { + pn_disposition_t disp = pn_delivery_remote_state(delivery); + dx_message_t *msg = pn_delivery_get_context(delivery); + pn_delivery_t *activate = 0; + + if (msg) { + assert(delivery == dx_message_out_delivery(msg)); + if (disp != 0) { + activate = dx_message_in_delivery(msg); + pn_delivery_update(activate, disp); + // TODO - handling of the data accompanying RECEIVED/MODIFIED + } + + if (pn_delivery_settled(delivery)) { + // + // Downstream delivery has been settled. Propagate the settlement + // upstream. + // + activate = dx_message_in_delivery(msg); + pn_delivery_settle(activate); + pn_delivery_settle(delivery); + dx_free_message(msg); + } + + if (activate) { + // + // Activate the upstream/incoming link so that the settlement will + // get pushed out. + // + dx_link_t *act_link = (dx_link_t*) pn_link_get_context(pn_delivery_link(activate)); + dx_link_activate(act_link); + } + + return; + } + } + + pn_delivery_settle(delivery); +} + + +/** + * New Incoming Link Handler + */ +static int router_incoming_link_handler(void* context, dx_link_t *link) +{ + dx_router_t *router = (dx_router_t*) context; + dx_link_item_t *item = new_dx_link_item_t(); + pn_link_t *pn_link = dx_link_pn(link); + + if (item) { + DEQ_ITEM_INIT(item); + item->link = link; + + sys_mutex_lock(router->lock); + DEQ_INSERT_TAIL(router->in_links, item); + sys_mutex_unlock(router->lock); + + pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link)); + pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link)); + pn_link_flow(pn_link, 8); + pn_link_open(pn_link); + } else { + pn_link_close(pn_link); + } + return 0; +} + + +/** + * New Outgoing Link Handler + */ +static int router_outgoing_link_handler(void* context, dx_link_t *link) +{ + dx_router_t *router = (dx_router_t*) context; + pn_link_t *pn_link = dx_link_pn(link); + const char *r_tgt = pn_terminus_get_address(pn_link_remote_target(pn_link)); + + sys_mutex_lock(router->lock); + dx_router_link_t *rlink = new_dx_router_link_t(); + rlink->link = link; + DEQ_INIT(rlink->out_fifo); + dx_link_set_context(link, rlink); + + dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_NO_HOST); + int result = hash_insert(router->out_hash, iter, rlink); + dx_field_iterator_free(iter); + + if (result == 0) { + pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link)); + pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link)); + pn_link_open(pn_link); + sys_mutex_unlock(router->lock); + dx_log(module, LOG_TRACE, "Registered new local address: %s", r_tgt); + return 0; + } + + dx_log(module, LOG_TRACE, "Address '%s' not registered as it already exists", r_tgt); + pn_link_close(pn_link); + sys_mutex_unlock(router->lock); + return 0; +} + + +/** + * Outgoing Link Writable Handler + */ +static int router_writable_link_handler(void* context, dx_link_t *link) +{ + dx_router_t *router = (dx_router_t*) context; + int grant_delivery = 0; + pn_delivery_t *delivery; + dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link); + pn_link_t *pn_link = dx_link_pn(link); + uint64_t tag; + + sys_mutex_lock(router->lock); + if (DEQ_SIZE(rlink->out_fifo) > 0) { + grant_delivery = 1; + tag = router->dtag++; + } + sys_mutex_unlock(router->lock); + + if (grant_delivery) { + pn_delivery(pn_link, pn_dtag((char*) &tag, 8)); + delivery = pn_link_current(pn_link); + if (delivery) { + router_tx_handler(context, link, delivery); + return 1; + } + } + + return 0; +} + + +/** + * Link Detached Handler + */ +static int router_link_detach_handler(void* context, dx_link_t *link, int closed) +{ + dx_router_t *router = (dx_router_t*) context; + pn_link_t *pn_link = dx_link_pn(link); + const char *r_tgt = pn_terminus_get_address(pn_link_remote_target(pn_link)); + dx_link_item_t *item; + + sys_mutex_lock(router->lock); + if (pn_link_is_sender(pn_link)) { + item = DEQ_HEAD(router->out_links); + + dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_NO_HOST); + dx_router_link_t *rlink; + if (iter) { + int result = hash_retrieve(router->out_hash, iter, (void*) &rlink); + if (result == 0) { + dx_field_iterator_reset(iter, ITER_VIEW_NO_HOST); + hash_remove(router->out_hash, iter); + free_dx_router_link_t(rlink); + dx_log(module, LOG_TRACE, "Removed local address: %s", r_tgt); + } + dx_field_iterator_free(iter); + } + } + else + item = DEQ_HEAD(router->in_links); + + while (item) { + if (item->link == link) { + if (pn_link_is_sender(pn_link)) + DEQ_REMOVE(router->out_links, item); + else + DEQ_REMOVE(router->in_links, item); + free_dx_link_item_t(item); + break; + } + item = item->next; + } + + sys_mutex_unlock(router->lock); + return 0; +} + + +static void router_inbound_open_handler(void *type_context, dx_connection_t *conn) +{ +} + + +static void router_outbound_open_handler(void *type_context, dx_connection_t *conn) +{ +} + + +static void dx_router_timer_handler(void *context) +{ + dx_router_t *router = (dx_router_t*) context; + + // + // Periodic processing. + // + dx_timer_schedule(router->timer, 1000); +} + + +static dx_node_type_t router_node = {"router", 0, 0, + router_rx_handler, + router_tx_handler, + router_disp_handler, + router_incoming_link_handler, + router_outgoing_link_handler, + router_writable_link_handler, + router_link_detach_handler, + 0, // node_created_handler + 0, // node_destroyed_handler + router_inbound_open_handler, + router_outbound_open_handler }; +static int type_registered = 0; + + +dx_router_t *dx_router(dx_router_configuration_t *config) +{ + if (!type_registered) { + type_registered = 1; + dx_container_register_node_type(&router_node); + } + + dx_router_t *router = NEW(dx_router_t); + dx_container_set_default_node_type(&router_node, (void*) router, DX_DIST_BOTH); + + DEQ_INIT(router->in_links); + DEQ_INIT(router->out_links); + DEQ_INIT(router->in_fifo); + + router->lock = sys_mutex(); + + router->timer = dx_timer(dx_router_timer_handler, (void*) router); + dx_timer_schedule(router->timer, 0); // Immediate + + router->out_hash = hash(10, 32, 0); + router->dtag = 1; + + return router; +} + + +void dx_router_free(dx_router_t *router) +{ + dx_container_set_default_node_type(0, 0, DX_DIST_BOTH); + sys_mutex_free(router->lock); + free(router); +} + diff --git a/extras/dispatch/src/server.c b/extras/dispatch/src/server.c new file mode 100644 index 0000000000..0099393f60 --- /dev/null +++ b/extras/dispatch/src/server.c @@ -0,0 +1,903 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/threading.h> +#include <qpid/dispatch/log.h> +#include "server_private.h" +#include "timer_private.h" +#include "alloc_private.h" +#include "auth.h" +#include "work_queue.h" +#include <stdio.h> +#include <time.h> +#include <signal.h> + +static char *module="SERVER"; + +typedef struct dx_thread_t { + int thread_id; + volatile int running; + volatile int canceled; + int using_thread; + sys_thread_t *thread; +} dx_thread_t; + + +typedef struct dx_server_t { + int thread_count; + pn_driver_t *driver; + dx_thread_start_cb_t start_handler; + dx_conn_handler_cb_t conn_handler; + dx_signal_handler_cb_t signal_handler; + dx_user_fd_handler_cb_t ufd_handler; + void *start_context; + void *conn_context; + void *signal_context; + sys_cond_t *cond; + sys_mutex_t *lock; + dx_thread_t **threads; + work_queue_t *work_queue; + dx_timer_list_t pending_timers; + bool a_thread_is_waiting; + int threads_active; + int pause_requests; + int threads_paused; + int pause_next_sequence; + int pause_now_serving; + int pending_signal; +} dx_server_t; + + +ALLOC_DEFINE(dx_listener_t); +ALLOC_DEFINE(dx_connector_t); +ALLOC_DEFINE(dx_connection_t); +ALLOC_DEFINE(dx_user_fd_t); + + +/** + * Singleton Concurrent Proton Driver object + */ +static dx_server_t *dx_server = 0; + + +static void signal_handler(int signum) +{ + dx_server->pending_signal = signum; + sys_cond_signal_all(dx_server->cond); +} + + +static dx_thread_t *thread(int id) +{ + dx_thread_t *thread = NEW(dx_thread_t); + if (!thread) + return 0; + + thread->thread_id = id; + thread->running = 0; + thread->canceled = 0; + thread->using_thread = 0; + + return thread; +} + + +static void thread_process_listeners(pn_driver_t *driver) +{ + pn_listener_t *listener = pn_driver_listener(driver); + pn_connector_t *cxtr; + dx_connection_t *ctx; + + while (listener) { + dx_log(module, LOG_TRACE, "Accepting Connection"); + cxtr = pn_listener_accept(listener); + ctx = new_dx_connection_t(); + ctx->state = CONN_STATE_SASL_SERVER; + ctx->owner_thread = CONTEXT_NO_OWNER; + ctx->enqueued = 0; + ctx->pn_cxtr = cxtr; + ctx->pn_conn = 0; + ctx->listener = (dx_listener_t*) pn_listener_context(listener); + ctx->connector = 0; + ctx->context = ctx->listener->context; + ctx->ufd = 0; + + pn_connector_set_context(cxtr, ctx); + listener = pn_driver_listener(driver); + } +} + + +static void handle_signals_LH(void) +{ + int signum = dx_server->pending_signal; + + if (signum) { + dx_server->pending_signal = 0; + if (dx_server->signal_handler) { + sys_mutex_unlock(dx_server->lock); + dx_server->signal_handler(dx_server->signal_context, signum); + sys_mutex_lock(dx_server->lock); + } + } +} + + +static void block_if_paused_LH(void) +{ + if (dx_server->pause_requests > 0) { + dx_server->threads_paused++; + sys_cond_signal_all(dx_server->cond); + while (dx_server->pause_requests > 0) + sys_cond_wait(dx_server->cond, dx_server->lock); + dx_server->threads_paused--; + } +} + + +static void process_connector(pn_connector_t *cxtr) +{ + dx_connection_t *ctx = pn_connector_context(cxtr); + int events = 0; + int auth_passes = 0; + + if (ctx->state == CONN_STATE_USER) { + dx_server->ufd_handler(ctx->ufd->context, ctx->ufd); + return; + } + + do { + // + // Step the engine for pre-handler processing + // + pn_connector_process(cxtr); + + // + // Call the handler that is appropriate for the connector's state. + // + switch (ctx->state) { + case CONN_STATE_CONNECTING: + if (!pn_connector_closed(cxtr)) { + ctx->state = CONN_STATE_SASL_CLIENT; + assert(ctx->connector); + ctx->connector->state = CXTR_STATE_OPEN; + events = 1; + } else { + ctx->state = CONN_STATE_FAILED; + events = 0; + } + break; + + case CONN_STATE_SASL_CLIENT: + if (auth_passes == 0) { + auth_client_handler(cxtr); + events = 1; + } else { + auth_passes++; + events = 0; + } + break; + + case CONN_STATE_SASL_SERVER: + if (auth_passes == 0) { + auth_server_handler(cxtr); + events = 1; + } else { + auth_passes++; + events = 0; + } + break; + + case CONN_STATE_OPENING: + ctx->state = CONN_STATE_OPERATIONAL; + + pn_connection_t *conn = pn_connection(); + pn_connection_set_container(conn, "dispatch"); // TODO - make unique + pn_connector_set_connection(cxtr, conn); + pn_connection_set_context(conn, ctx); + ctx->pn_conn = conn; + + dx_conn_event_t ce = DX_CONN_EVENT_PROCESS; // Initialize to keep the compiler happy + + if (ctx->listener) { + ce = DX_CONN_EVENT_LISTENER_OPEN; + } else if (ctx->connector) { + ce = DX_CONN_EVENT_CONNECTOR_OPEN; + ctx->connector->delay = 0; + } else + assert(0); + + dx_server->conn_handler(ctx->context, ce, (dx_connection_t*) pn_connector_context(cxtr)); + events = 1; + break; + + case CONN_STATE_OPERATIONAL: + if (pn_connector_closed(cxtr)) { + dx_server->conn_handler(ctx->context, + DX_CONN_EVENT_CLOSE, + (dx_connection_t*) pn_connector_context(cxtr)); + events = 0; + } + else + events = dx_server->conn_handler(ctx->context, + DX_CONN_EVENT_PROCESS, + (dx_connection_t*) pn_connector_context(cxtr)); + break; + + default: + break; + } + } while (events > 0); +} + + +// +// TEMPORARY FUNCTION PROTOTYPES +// +void pn_driver_wait_1(pn_driver_t *d); +int pn_driver_wait_2(pn_driver_t *d, int timeout); +void pn_driver_wait_3(pn_driver_t *d); +// +// END TEMPORARY +// + +static void *thread_run(void *arg) +{ + dx_thread_t *thread = (dx_thread_t*) arg; + pn_connector_t *work; + pn_connection_t *conn; + dx_connection_t *ctx; + int error; + int poll_result; + int timer_holdoff = 0; + + if (!thread) + return 0; + + thread->running = 1; + + if (thread->canceled) + return 0; + + // + // Invoke the start handler if the application supplied one. + // This handler can be used to set NUMA or processor affinnity for the thread. + // + if (dx_server->start_handler) + dx_server->start_handler(dx_server->start_context, thread->thread_id); + + // + // Main Loop + // + while (thread->running) { + sys_mutex_lock(dx_server->lock); + + // + // Check for pending signals to process + // + handle_signals_LH(); + if (!thread->running) { + sys_mutex_unlock(dx_server->lock); + break; + } + + // + // Check to see if the server is pausing. If so, block here. + // + block_if_paused_LH(); + if (!thread->running) { + sys_mutex_unlock(dx_server->lock); + break; + } + + // + // Service pending timers. + // + dx_timer_t *timer = DEQ_HEAD(dx_server->pending_timers); + if (timer) { + DEQ_REMOVE_HEAD(dx_server->pending_timers); + + // + // Mark the timer as idle in case it reschedules itself. + // + dx_timer_idle_LH(timer); + + // + // Release the lock and invoke the connection handler. + // + sys_mutex_unlock(dx_server->lock); + timer->handler(timer->context); + pn_driver_wakeup(dx_server->driver); + continue; + } + + // + // Check the work queue for connectors scheduled for processing. + // + work = work_queue_get(dx_server->work_queue); + if (!work) { + // + // There is no pending work to do + // + if (dx_server->a_thread_is_waiting) { + // + // Another thread is waiting on the proton driver, this thread must + // wait on the condition variable until signaled. + // + sys_cond_wait(dx_server->cond, dx_server->lock); + } else { + // + // This thread elects itself to wait on the proton driver. Set the + // thread-is-waiting flag so other idle threads will not interfere. + // + dx_server->a_thread_is_waiting = true; + + // + // Ask the timer module when its next timer is scheduled to fire. We'll + // use this value in driver_wait as the timeout. If there are no scheduled + // timers, the returned value will be -1. + // + long duration = dx_timer_next_duration_LH(); + + // + // Invoke the proton driver's wait sequence. This is a bit of a hack for now + // and will be improved in the future. The wait process is divided into three parts, + // the first and third of which need to be non-reentrant, and the second of which + // must be reentrant (and blocks). + // + pn_driver_wait_1(dx_server->driver); + sys_mutex_unlock(dx_server->lock); + + do { + error = 0; + poll_result = pn_driver_wait_2(dx_server->driver, duration); + if (poll_result == -1) + error = pn_driver_errno(dx_server->driver); + } while (error == PN_INTR); + if (error) { + dx_log(module, LOG_ERROR, "Driver Error: %s", pn_error_text(pn_error(dx_server->driver))); + exit(-1); + } + + sys_mutex_lock(dx_server->lock); + pn_driver_wait_3(dx_server->driver); + + if (!thread->running) { + sys_mutex_unlock(dx_server->lock); + break; + } + + // + // Visit the timer module. + // + if (poll_result == 0 || ++timer_holdoff == 100) { + struct timespec tv; + clock_gettime(CLOCK_REALTIME, &tv); + long milliseconds = tv.tv_sec * 1000 + tv.tv_nsec / 1000000; + dx_timer_visit_LH(milliseconds); + timer_holdoff = 0; + } + + // + // Process listeners (incoming connections). + // + thread_process_listeners(dx_server->driver); + + // + // Traverse the list of connectors-needing-service from the proton driver. + // If the connector is not already in the work queue and it is not currently + // being processed by another thread, put it in the work queue and signal the + // condition variable. + // + work = pn_driver_connector(dx_server->driver); + while (work) { + ctx = pn_connector_context(work); + if (!ctx->enqueued && ctx->owner_thread == CONTEXT_NO_OWNER) { + ctx->enqueued = 1; + work_queue_put(dx_server->work_queue, work); + sys_cond_signal(dx_server->cond); + } + work = pn_driver_connector(dx_server->driver); + } + + // + // Release our exclusive claim on pn_driver_wait. + // + dx_server->a_thread_is_waiting = false; + } + } + + // + // If we were given a connector to work on from the work queue, mark it as + // owned by this thread and as no longer enqueued. + // + if (work) { + ctx = pn_connector_context(work); + if (ctx->owner_thread == CONTEXT_NO_OWNER) { + ctx->owner_thread = thread->thread_id; + ctx->enqueued = 0; + dx_server->threads_active++; + } else { + // + // This connector is being processed by another thread, re-queue it. + // + work_queue_put(dx_server->work_queue, work); + work = 0; + } + } + sys_mutex_unlock(dx_server->lock); + + // + // Process the connector that we now have exclusive access to. + // + if (work) { + process_connector(work); + + // + // Check to see if the connector was closed during processing + // + if (pn_connector_closed(work)) { + // + // Connector is closed. Free the context and the connector. + // + conn = pn_connector_connection(work); + if (ctx->connector) { + ctx->connector->ctx = 0; + ctx->connector->state = CXTR_STATE_CONNECTING; + dx_timer_schedule(ctx->connector->timer, ctx->connector->delay); + } + sys_mutex_lock(dx_server->lock); + free_dx_connection_t(ctx); + pn_connector_free(work); + if (conn) + pn_connection_free(conn); + dx_server->threads_active--; + sys_mutex_unlock(dx_server->lock); + } else { + // + // The connector lives on. Mark it as no longer owned by this thread. + // + sys_mutex_lock(dx_server->lock); + ctx->owner_thread = CONTEXT_NO_OWNER; + dx_server->threads_active--; + sys_mutex_unlock(dx_server->lock); + } + + // + // Wake up the proton driver to force it to reconsider its set of FDs + // in light of the processing that just occurred. + // + pn_driver_wakeup(dx_server->driver); + } + } + + return 0; +} + + +static void thread_start(dx_thread_t *thread) +{ + if (!thread) + return; + + thread->using_thread = 1; + thread->thread = sys_thread(thread_run, (void*) thread); +} + + +static void thread_cancel(dx_thread_t *thread) +{ + if (!thread) + return; + + thread->running = 0; + thread->canceled = 1; +} + + +static void thread_join(dx_thread_t *thread) +{ + if (!thread) + return; + + if (thread->using_thread) + sys_thread_join(thread->thread); +} + + +static void thread_free(dx_thread_t *thread) +{ + if (!thread) + return; + + free(thread); +} + + +static void cxtr_try_open(void *context) +{ + dx_connector_t *ct = (dx_connector_t*) context; + if (ct->state != CXTR_STATE_CONNECTING) + return; + + dx_connection_t *ctx = new_dx_connection_t(); + ctx->state = CONN_STATE_CONNECTING; + ctx->owner_thread = CONTEXT_NO_OWNER; + ctx->enqueued = 0; + ctx->pn_conn = 0; + ctx->listener = 0; + ctx->connector = ct; + ctx->context = ct->context; + ctx->user_context = 0; + ctx->ufd = 0; + + // + // pn_connector is not thread safe + // + sys_mutex_lock(dx_server->lock); + ctx->pn_cxtr = pn_connector(dx_server->driver, ct->config->host, ct->config->port, (void*) ctx); + sys_mutex_unlock(dx_server->lock); + + ct->ctx = ctx; + ct->delay = 5000; + dx_log(module, LOG_TRACE, "Connecting to %s:%s", ct->config->host, ct->config->port); +} + + +void dx_server_initialize(int thread_count) +{ + int i; + + if (dx_server) + return; // TODO - Fail in a more dramatic way + + dx_alloc_initialize(); + dx_server = NEW(dx_server_t); + + if (!dx_server) + return; // TODO - Fail in a more dramatic way + + dx_server->thread_count = thread_count; + dx_server->driver = pn_driver(); + dx_server->start_handler = 0; + dx_server->conn_handler = 0; + dx_server->signal_handler = 0; + dx_server->ufd_handler = 0; + dx_server->start_context = 0; + dx_server->signal_context = 0; + dx_server->lock = sys_mutex(); + dx_server->cond = sys_cond(); + + dx_timer_initialize(dx_server->lock); + + dx_server->threads = NEW_PTR_ARRAY(dx_thread_t, thread_count); + for (i = 0; i < thread_count; i++) + dx_server->threads[i] = thread(i); + + dx_server->work_queue = work_queue(); + DEQ_INIT(dx_server->pending_timers); + dx_server->a_thread_is_waiting = false; + dx_server->threads_active = 0; + dx_server->pause_requests = 0; + dx_server->threads_paused = 0; + dx_server->pause_next_sequence = 0; + dx_server->pause_now_serving = 0; + dx_server->pending_signal = 0; +} + + +void dx_server_finalize(void) +{ + int i; + if (!dx_server) + return; + + for (i = 0; i < dx_server->thread_count; i++) + thread_free(dx_server->threads[i]); + + work_queue_free(dx_server->work_queue); + + pn_driver_free(dx_server->driver); + sys_mutex_free(dx_server->lock); + sys_cond_free(dx_server->cond); + free(dx_server); + dx_server = 0; +} + + +void dx_server_set_conn_handler(dx_conn_handler_cb_t handler) +{ + dx_server->conn_handler = handler; +} + + +void dx_server_set_signal_handler(dx_signal_handler_cb_t handler, void *context) +{ + dx_server->signal_handler = handler; + dx_server->signal_context = context; +} + + +void dx_server_set_start_handler(dx_thread_start_cb_t handler, void *context) +{ + dx_server->start_handler = handler; + dx_server->start_context = context; +} + + +void dx_server_set_user_fd_handler(dx_user_fd_handler_cb_t ufd_handler) +{ + dx_server->ufd_handler = ufd_handler; +} + + +void dx_server_run(void) +{ + int i; + if (!dx_server) + return; + + assert(dx_server->conn_handler); // Server can't run without a connection handler. + + for (i = 1; i < dx_server->thread_count; i++) + thread_start(dx_server->threads[i]); + + dx_log(module, LOG_INFO, "Operational, %d Threads Running", dx_server->thread_count); + + thread_run((void*) dx_server->threads[0]); + + for (i = 1; i < dx_server->thread_count; i++) + thread_join(dx_server->threads[i]); + + dx_log(module, LOG_INFO, "Shut Down"); +} + + +void dx_server_stop(void) +{ + int idx; + + sys_mutex_lock(dx_server->lock); + for (idx = 0; idx < dx_server->thread_count; idx++) + thread_cancel(dx_server->threads[idx]); + sys_cond_signal_all(dx_server->cond); + pn_driver_wakeup(dx_server->driver); + sys_mutex_unlock(dx_server->lock); +} + + +void dx_server_signal(int signum) +{ + signal(signum, signal_handler); +} + + +void dx_server_pause(void) +{ + sys_mutex_lock(dx_server->lock); + + // + // Bump the request count to stop all the threads. + // + dx_server->pause_requests++; + int my_sequence = dx_server->pause_next_sequence++; + + // + // Awaken all threads that are currently blocking. + // + sys_cond_signal_all(dx_server->cond); + pn_driver_wakeup(dx_server->driver); + + // + // Wait for the paused thread count plus the number of threads requesting a pause to equal + // the total thread count. Also, don't exit the blocking loop until now_serving equals our + // sequence number. This ensures that concurrent pausers don't run at the same time. + // + while ((dx_server->threads_paused + dx_server->pause_requests < dx_server->thread_count) || + (my_sequence != dx_server->pause_now_serving)) + sys_cond_wait(dx_server->cond, dx_server->lock); + + sys_mutex_unlock(dx_server->lock); +} + + +void dx_server_resume(void) +{ + sys_mutex_lock(dx_server->lock); + dx_server->pause_requests--; + dx_server->pause_now_serving++; + sys_cond_signal_all(dx_server->cond); + sys_mutex_unlock(dx_server->lock); +} + + +void dx_server_activate(dx_connection_t *ctx) +{ + if (!ctx) + return; + + pn_connector_t *ctor = ctx->pn_cxtr; + if (!ctor) + return; + + if (!pn_connector_closed(ctor)) + pn_connector_activate(ctor, PN_CONNECTOR_WRITABLE); +} + + +void dx_connection_set_context(dx_connection_t *conn, void *context) +{ + conn->user_context = context; +} + + +void *dx_connection_get_context(dx_connection_t *conn) +{ + return conn->user_context; +} + + +pn_connection_t *dx_connection_pn(dx_connection_t *conn) +{ + return conn->pn_conn; +} + + +dx_listener_t *dx_server_listen(const dx_server_config_t *config, void *context) +{ + dx_listener_t *li = new_dx_listener_t(); + + if (!li) + return 0; + + li->config = config; + li->context = context; + li->pn_listener = pn_listener(dx_server->driver, config->host, config->port, (void*) li); + + if (!li->pn_listener) { + dx_log(module, LOG_ERROR, "Driver Error %d (%s)", + pn_driver_errno(dx_server->driver), pn_driver_error(dx_server->driver)); + free_dx_listener_t(li); + return 0; + } + dx_log(module, LOG_TRACE, "Listening on %s:%s", config->host, config->port); + + return li; +} + + +void dx_server_listener_free(dx_listener_t* li) +{ + pn_listener_free(li->pn_listener); + free_dx_listener_t(li); +} + + +void dx_server_listener_close(dx_listener_t* li) +{ + pn_listener_close(li->pn_listener); +} + + +dx_connector_t *dx_server_connect(const dx_server_config_t *config, void *context) +{ + dx_connector_t *ct = new_dx_connector_t(); + + if (!ct) + return 0; + + ct->state = CXTR_STATE_CONNECTING; + ct->config = config; + ct->context = context; + ct->ctx = 0; + ct->timer = dx_timer(cxtr_try_open, (void*) ct); + ct->delay = 0; + + dx_timer_schedule(ct->timer, ct->delay); + return ct; +} + + +void dx_server_connector_free(dx_connector_t* ct) +{ + // Don't free the proton connector. This will be done by the connector + // processing/cleanup. + + if (ct->ctx) { + pn_connector_close(ct->ctx->pn_cxtr); + ct->ctx->connector = 0; + } + + dx_timer_free(ct->timer); + free_dx_connector_t(ct); +} + + +dx_user_fd_t *dx_user_fd(int fd, void *context) +{ + dx_user_fd_t *ufd = new_dx_user_fd_t(); + + if (!ufd) + return 0; + + dx_connection_t *ctx = new_dx_connection_t(); + ctx->state = CONN_STATE_USER; + ctx->owner_thread = CONTEXT_NO_OWNER; + ctx->enqueued = 0; + ctx->pn_conn = 0; + ctx->listener = 0; + ctx->connector = 0; + ctx->context = 0; + ctx->user_context = 0; + ctx->ufd = ufd; + + ufd->context = context; + ufd->fd = fd; + ufd->pn_conn = pn_connector_fd(dx_server->driver, fd, (void*) ctx); + pn_driver_wakeup(dx_server->driver); + + return ufd; +} + + +void dx_user_fd_free(dx_user_fd_t *ufd) +{ + pn_connector_close(ufd->pn_conn); + free_dx_user_fd_t(ufd); +} + + +void dx_user_fd_activate_read(dx_user_fd_t *ufd) +{ + pn_connector_activate(ufd->pn_conn, PN_CONNECTOR_READABLE); + pn_driver_wakeup(dx_server->driver); +} + + +void dx_user_fd_activate_write(dx_user_fd_t *ufd) +{ + pn_connector_activate(ufd->pn_conn, PN_CONNECTOR_WRITABLE); + pn_driver_wakeup(dx_server->driver); +} + + +bool dx_user_fd_is_readable(dx_user_fd_t *ufd) +{ + return pn_connector_activated(ufd->pn_conn, PN_CONNECTOR_READABLE); +} + + +bool dx_user_fd_is_writeable(dx_user_fd_t *ufd) +{ + return pn_connector_activated(ufd->pn_conn, PN_CONNECTOR_WRITABLE); +} + + +void dx_server_timer_pending_LH(dx_timer_t *timer) +{ + DEQ_INSERT_TAIL(dx_server->pending_timers, timer); +} + + +void dx_server_timer_cancel_LH(dx_timer_t *timer) +{ + DEQ_REMOVE(dx_server->pending_timers, timer); +} + diff --git a/extras/dispatch/src/server_private.h b/extras/dispatch/src/server_private.h new file mode 100644 index 0000000000..1722175e35 --- /dev/null +++ b/extras/dispatch/src/server_private.h @@ -0,0 +1,96 @@ +#ifndef __server_private_h__ +#define __server_private_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/server.h> +#include <qpid/dispatch/user_fd.h> +#include <qpid/dispatch/timer.h> +#include <qpid/dispatch/alloc.h> +#include <proton/driver.h> +#include <proton/driver_extras.h> + +void dx_server_timer_pending_LH(dx_timer_t *timer); +void dx_server_timer_cancel_LH(dx_timer_t *timer); + + +typedef enum { + CONN_STATE_CONNECTING = 0, + CONN_STATE_SASL_CLIENT, + CONN_STATE_SASL_SERVER, + CONN_STATE_OPENING, + CONN_STATE_OPERATIONAL, + CONN_STATE_FAILED, + CONN_STATE_USER +} conn_state_t; + +#define CONTEXT_NO_OWNER -1 + +typedef enum { + CXTR_STATE_CONNECTING = 0, + CXTR_STATE_OPEN, + CXTR_STATE_FAILED +} cxtr_state_t; + + +struct dx_listener_t { + const dx_server_config_t *config; + void *context; + pn_listener_t *pn_listener; +}; + + +struct dx_connector_t { + cxtr_state_t state; + const dx_server_config_t *config; + void *context; + dx_connection_t *ctx; + dx_timer_t *timer; + long delay; +}; + + +struct dx_connection_t { + conn_state_t state; + int owner_thread; + int enqueued; + pn_connector_t *pn_cxtr; + pn_connection_t *pn_conn; + dx_listener_t *listener; + dx_connector_t *connector; + void *context; // Copy of context from listener or connector + void *user_context; + dx_user_fd_t *ufd; +}; + + +struct dx_user_fd_t { + void *context; + int fd; + pn_connector_t *pn_conn; +}; + + +ALLOC_DECLARE(dx_listener_t); +ALLOC_DECLARE(dx_connector_t); +ALLOC_DECLARE(dx_connection_t); +ALLOC_DECLARE(dx_user_fd_t); + + +#endif diff --git a/extras/dispatch/src/timer.c b/extras/dispatch/src/timer.c new file mode 100644 index 0000000000..b6b4864e26 --- /dev/null +++ b/extras/dispatch/src/timer.c @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "timer_private.h" +#include "server_private.h" +#include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/threading.h> +#include <qpid/dispatch/alloc.h> +#include <assert.h> +#include <stdio.h> + +static sys_mutex_t *lock; +static dx_timer_list_t idle_timers; +static dx_timer_list_t scheduled_timers; +static long time_base; + +ALLOC_DECLARE(dx_timer_t); +ALLOC_DEFINE(dx_timer_t); + +//========================================================================= +// Private static functions +//========================================================================= + +static void dx_timer_cancel_LH(dx_timer_t *timer) +{ + switch (timer->state) { + case TIMER_FREE: + assert(0); + break; + + case TIMER_IDLE: + break; + + case TIMER_SCHEDULED: + if (timer->next) + timer->next->delta_time += timer->delta_time; + DEQ_REMOVE(scheduled_timers, timer); + DEQ_INSERT_TAIL(idle_timers, timer); + break; + + case TIMER_PENDING: + dx_server_timer_cancel_LH(timer); + break; + } + + timer->state = TIMER_IDLE; +} + + +//========================================================================= +// Public Functions from timer.h +//========================================================================= + +dx_timer_t *dx_timer(dx_timer_cb_t cb, void* context) +{ + dx_timer_t *timer = new_dx_timer_t(); + if (!timer) + return 0; + + DEQ_ITEM_INIT(timer); + + timer->handler = cb; + timer->context = context; + timer->delta_time = 0; + timer->state = TIMER_IDLE; + + sys_mutex_lock(lock); + DEQ_INSERT_TAIL(idle_timers, timer); + sys_mutex_unlock(lock); + + return timer; +} + + +void dx_timer_free(dx_timer_t *timer) +{ + sys_mutex_lock(lock); + dx_timer_cancel_LH(timer); + DEQ_REMOVE(idle_timers, timer); + sys_mutex_unlock(lock); + + timer->state = TIMER_FREE; + free_dx_timer_t(timer); +} + + +void dx_timer_schedule(dx_timer_t *timer, long duration) +{ + dx_timer_t *ptr; + dx_timer_t *last; + long total_time; + + sys_mutex_lock(lock); + dx_timer_cancel_LH(timer); // Timer is now on the idle list + assert(timer->state == TIMER_IDLE); + DEQ_REMOVE(idle_timers, timer); + + // + // Handle the special case of a zero-time scheduling. In this case, + // the timer doesn't go on the scheduled list. It goes straight to the + // pending list in the server. + // + if (duration == 0) { + timer->state = TIMER_PENDING; + dx_server_timer_pending_LH(timer); + sys_mutex_unlock(lock); + return; + } + + // + // Find the insert point in the schedule. + // + total_time = 0; + ptr = DEQ_HEAD(scheduled_timers); + assert(!ptr || ptr->prev == 0); + while (ptr) { + total_time += ptr->delta_time; + if (total_time > duration) + break; + ptr = ptr->next; + } + + // + // Insert the timer into the schedule and adjust the delta time + // of the following timer if present. + // + if (total_time <= duration) { + assert(ptr == 0); + timer->delta_time = duration - total_time; + DEQ_INSERT_TAIL(scheduled_timers, timer); + } else { + total_time -= ptr->delta_time; + timer->delta_time = duration - total_time; + assert(ptr->delta_time > timer->delta_time); + ptr->delta_time -= timer->delta_time; + last = ptr->prev; + if (last) + DEQ_INSERT_AFTER(scheduled_timers, timer, last); + else + DEQ_INSERT_HEAD(scheduled_timers, timer); + } + + timer->state = TIMER_SCHEDULED; + + sys_mutex_unlock(lock); +} + + +void dx_timer_cancel(dx_timer_t *timer) +{ + sys_mutex_lock(lock); + dx_timer_cancel_LH(timer); + sys_mutex_unlock(lock); +} + + +//========================================================================= +// Private Functions from timer_private.h +//========================================================================= + +void dx_timer_initialize(sys_mutex_t *server_lock) +{ + lock = server_lock; + DEQ_INIT(idle_timers); + DEQ_INIT(scheduled_timers); + time_base = 0; +} + + +void dx_timer_finalize(void) +{ + lock = 0; +} + + +long dx_timer_next_duration_LH(void) +{ + dx_timer_t *timer = DEQ_HEAD(scheduled_timers); + if (timer) + return timer->delta_time; + return -1; +} + + +void dx_timer_visit_LH(long current_time) +{ + long delta; + dx_timer_t *timer = DEQ_HEAD(scheduled_timers); + + if (time_base == 0) { + time_base = current_time; + return; + } + + delta = current_time - time_base; + time_base = current_time; + + while (timer) { + assert(delta >= 0); + if (timer->delta_time > delta) { + timer->delta_time -= delta; + break; + } else { + DEQ_REMOVE_HEAD(scheduled_timers); + delta -= timer->delta_time; + timer->state = TIMER_PENDING; + dx_server_timer_pending_LH(timer); + + } + timer = DEQ_HEAD(scheduled_timers); + } +} + + +void dx_timer_idle_LH(dx_timer_t *timer) +{ + timer->state = TIMER_IDLE; + DEQ_INSERT_TAIL(idle_timers, timer); +} + diff --git a/extras/dispatch/src/timer_private.h b/extras/dispatch/src/timer_private.h new file mode 100644 index 0000000000..618297b18e --- /dev/null +++ b/extras/dispatch/src/timer_private.h @@ -0,0 +1,51 @@ +#ifndef __timer_private_h__ +#define __timer_private_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/timer.h> +#include <qpid/dispatch/threading.h> + +typedef enum { + TIMER_FREE, + TIMER_IDLE, + TIMER_SCHEDULED, + TIMER_PENDING +} dx_timer_state_t; + + +struct dx_timer_t { + DEQ_LINKS(dx_timer_t); + dx_timer_cb_t handler; + void *context; + long delta_time; + dx_timer_state_t state; +}; + +DEQ_DECLARE(dx_timer_t, dx_timer_list_t); + +void dx_timer_initialize(sys_mutex_t *server_lock); +void dx_timer_finalize(void); +long dx_timer_next_duration_LH(void); +void dx_timer_visit_LH(long current_time); +void dx_timer_idle_LH(dx_timer_t *timer); + + +#endif diff --git a/extras/dispatch/src/work_queue.c b/extras/dispatch/src/work_queue.c new file mode 100644 index 0000000000..4b3c5d7fa5 --- /dev/null +++ b/extras/dispatch/src/work_queue.c @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/ctools.h> +#include "work_queue.h" +#include <string.h> +#include <stdio.h> + +#define BATCH_SIZE 100 +typedef struct work_item_t work_item_t; + +struct work_item_t { + DEQ_LINKS(work_item_t); + pn_connector_t *conn; +}; + +DEQ_DECLARE(work_item_t, work_list_t); + +struct work_queue_t { + work_list_t items; + work_list_t free_list; +}; + +static void allocate_batch(work_queue_t *w) +{ + int i; + work_item_t *batch = NEW_ARRAY(work_item_t, BATCH_SIZE); + if (!batch) + return; + + memset(batch, 0, sizeof(work_item_t) * BATCH_SIZE); + + for (i = 0; i < BATCH_SIZE; i++) + DEQ_INSERT_TAIL(w->free_list, &batch[i]); +} + + +work_queue_t *work_queue(void) +{ + work_queue_t *w = NEW(work_queue_t); + if (!w) + return 0; + + DEQ_INIT(w->items); + DEQ_INIT(w->free_list); + + allocate_batch(w); + + return w; +} + + +void work_queue_free(work_queue_t *w) +{ + if (!w) + return; + + // KEEP TRACK OF BATCHES AND FREE + free(w); +} + + +void work_queue_put(work_queue_t *w, pn_connector_t *conn) +{ + work_item_t *item; + + if (!w) + return; + if (DEQ_SIZE(w->free_list) == 0) + allocate_batch(w); + if (DEQ_SIZE(w->free_list) == 0) + return; + + item = DEQ_HEAD(w->free_list); + DEQ_REMOVE_HEAD(w->free_list); + + item->conn = conn; + + DEQ_INSERT_TAIL(w->items, item); +} + + +pn_connector_t *work_queue_get(work_queue_t *w) +{ + work_item_t *item; + pn_connector_t *conn; + + if (!w) + return 0; + item = DEQ_HEAD(w->items); + if (!item) + return 0; + + DEQ_REMOVE_HEAD(w->items); + conn = item->conn; + item->conn = 0; + + DEQ_INSERT_TAIL(w->free_list, item); + + return conn; +} + + +int work_queue_empty(work_queue_t *w) +{ + return !w || DEQ_SIZE(w->items) == 0; +} + + +int work_queue_depth(work_queue_t *w) +{ + if (!w) + return 0; + return DEQ_SIZE(w->items); +} + diff --git a/extras/dispatch/src/work_queue.h b/extras/dispatch/src/work_queue.h new file mode 100644 index 0000000000..597a484a9c --- /dev/null +++ b/extras/dispatch/src/work_queue.h @@ -0,0 +1,33 @@ +#ifndef __work_queue_h__ +#define __work_queue_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <proton/driver.h> + +typedef struct work_queue_t work_queue_t; + +work_queue_t *work_queue(void); +void work_queue_free(work_queue_t *w); +void work_queue_put(work_queue_t *w, pn_connector_t *conn); +pn_connector_t *work_queue_get(work_queue_t *w); +int work_queue_empty(work_queue_t *w); +int work_queue_depth(work_queue_t *w); + +#endif diff --git a/extras/dispatch/tests/CMakeLists.txt b/extras/dispatch/tests/CMakeLists.txt new file mode 100644 index 0000000000..10bf1eb43a --- /dev/null +++ b/extras/dispatch/tests/CMakeLists.txt @@ -0,0 +1,34 @@ +## +## Licensed to the Apache Software Foundation (ASF) under one +## or more contributor license agreements. See the NOTICE file +## distributed with this work for additional information +## regarding copyright ownership. The ASF licenses this file +## to you under the Apache License, Version 2.0 (the +## "License"); you may not use this file except in compliance +## with the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, +## software distributed under the License is distributed on an +## "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +## KIND, either express or implied. See the License for the +## specific language governing permissions and limitations +## under the License. +## + +## +## Build test applications +## +set(test_SOURCES + alloc_test.c + message_test.c + run_tests.c + server_test.c + timer_test.c + tool_test.c + ) + +add_executable(run_tests ${test_SOURCES}) +target_link_libraries(run_tests qpid-dispatch) + diff --git a/extras/dispatch/tests/alloc_test.c b/extras/dispatch/tests/alloc_test.c new file mode 100644 index 0000000000..2406048209 --- /dev/null +++ b/extras/dispatch/tests/alloc_test.c @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "test_case.h" +#include <stdio.h> +#include <string.h> +#include "alloc_private.h" + +typedef struct { + int A; + int B; +} object_t; + +dx_alloc_config_t config = {3, 7, 10}; + +ALLOC_DECLARE(object_t); +ALLOC_DEFINE_CONFIG(object_t, sizeof(object_t), 0, &config); + + +static char* check_stats(dx_alloc_stats_t *stats, uint64_t ah, uint64_t fh, uint64_t ht, uint64_t rt, uint64_t rg) +{ + if (stats->total_alloc_from_heap != ah) return "Incorrect alloc-from-heap"; + if (stats->total_free_to_heap != fh) return "Incorrect free-to-heap"; + if (stats->held_by_threads != ht) return "Incorrect held-by-threads"; + if (stats->batches_rebalanced_to_threads != rt) return "Incorrect rebalance-to-threads"; + if (stats->batches_rebalanced_to_global != rg) return "Incorrect rebalance-to-global"; + return 0; +} + + +static char* test_alloc_basic(void *context) +{ + object_t *obj[50]; + int idx; + dx_alloc_stats_t *stats; + char *error; + + for (idx = 0; idx < 20; idx++) + obj[idx] = new_object_t(); + + stats = alloc_stats_object_t(); + error = check_stats(stats, 21, 0, 21, 0, 0); + if (error) return error; + + for (idx = 0; idx < 20; idx++) + free_object_t(obj[idx]); + + error = check_stats(stats, 21, 5, 6, 0, 5); + if (error) return error; + + for (idx = 0; idx < 20; idx++) + obj[idx] = new_object_t(); + + error = check_stats(stats, 27, 5, 21, 3, 5); + if (error) return error; + + return 0; +} + + +int alloc_tests(void) +{ + int result = 0; + dx_alloc_initialize(); + + TEST_CASE(test_alloc_basic, 0); + + return result; +} + diff --git a/extras/dispatch/tests/message_test.c b/extras/dispatch/tests/message_test.c new file mode 100644 index 0000000000..590b7f6ed7 --- /dev/null +++ b/extras/dispatch/tests/message_test.c @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "test_case.h" +#include <stdio.h> +#include <string.h> +#include "message_private.h" +#include <qpid/dispatch/iterator.h> +#include <proton/message.h> + + +static char* test_send_to_messenger(void *context) +{ + dx_message_t *msg = dx_allocate_message(); + dx_message_content_t *content = MSG_CONTENT(msg); + + dx_message_compose_1(msg, "test_addr_0", 0); + dx_buffer_t *buf = DEQ_HEAD(content->buffers); + if (buf == 0) return "Expected a buffer in the test message"; + + pn_message_t *pn_msg = pn_message(); + int result = pn_message_decode(pn_msg, (const char*) dx_buffer_base(buf), dx_buffer_size(buf)); + if (result != 0) return "Error in pn_message_decode"; + + if (strcmp(pn_message_get_address(pn_msg), "test_addr_0") != 0) + return "Address mismatch in received message"; + + pn_message_free(pn_msg); + dx_free_message(msg); + + return 0; +} + + +static char* test_receive_from_messenger(void *context) +{ + pn_message_t *pn_msg = pn_message(); + pn_message_set_address(pn_msg, "test_addr_1"); + + dx_buffer_t *buf = dx_allocate_buffer(); + size_t size = dx_buffer_capacity(buf); + int result = pn_message_encode(pn_msg, (char*) dx_buffer_cursor(buf), &size); + if (result != 0) return "Error in pn_message_encode"; + dx_buffer_insert(buf, size); + + dx_message_t *msg = dx_allocate_message(); + dx_message_content_t *content = MSG_CONTENT(msg); + + DEQ_INSERT_TAIL(content->buffers, buf); + int valid = dx_message_check(msg, DX_DEPTH_ALL); + if (!valid) return "dx_message_check returns 'invalid'"; + + dx_field_iterator_t *iter = dx_message_field_iterator(msg, DX_FIELD_TO); + if (iter == 0) return "Expected an iterator for the 'to' field"; + + if (!dx_field_iterator_equal(iter, (unsigned char*) "test_addr_1")) + return "Mismatched 'to' field contents"; + + pn_message_free(pn_msg); + dx_free_message(msg); + + return 0; +} + + +static char* test_insufficient_check_depth(void *context) +{ + pn_message_t *pn_msg = pn_message(); + pn_message_set_address(pn_msg, "test_addr_2"); + + dx_buffer_t *buf = dx_allocate_buffer(); + size_t size = dx_buffer_capacity(buf); + int result = pn_message_encode(pn_msg, (char*) dx_buffer_cursor(buf), &size); + if (result != 0) return "Error in pn_message_encode"; + dx_buffer_insert(buf, size); + + dx_message_t *msg = dx_allocate_message(); + dx_message_content_t *content = MSG_CONTENT(msg); + + DEQ_INSERT_TAIL(content->buffers, buf); + int valid = dx_message_check(msg, DX_DEPTH_DELIVERY_ANNOTATIONS); + if (!valid) return "dx_message_check returns 'invalid'"; + + dx_field_iterator_t *iter = dx_message_field_iterator(msg, DX_FIELD_TO); + if (iter) return "Expected no iterator for the 'to' field"; + + dx_free_message(msg); + + return 0; +} + + +int message_tests(void) +{ + int result = 0; + + TEST_CASE(test_send_to_messenger, 0); + TEST_CASE(test_receive_from_messenger, 0); + TEST_CASE(test_insufficient_check_depth, 0); + + return result; +} + diff --git a/extras/dispatch/tests/run_tests.c b/extras/dispatch/tests/run_tests.c new file mode 100644 index 0000000000..a677c04577 --- /dev/null +++ b/extras/dispatch/tests/run_tests.c @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +int tool_tests(); +int timer_tests(); +int alloc_tests(); +int server_tests(); +int message_tests(); + +int main(int argc, char** argv) +{ + int result = 0; + result += tool_tests(); + result += timer_tests(); + result += alloc_tests(); + result += server_tests(); + result += message_tests(); + return result; +} + diff --git a/extras/dispatch/tests/server_test.c b/extras/dispatch/tests/server_test.c new file mode 100644 index 0000000000..adeab62af9 --- /dev/null +++ b/extras/dispatch/tests/server_test.c @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#define _GNU_SOURCE +#include <stdio.h> +#include <unistd.h> +#include <fcntl.h> +#include <errno.h> +#include <assert.h> +#include <qpid/dispatch/timer.h> +#include "test_case.h" +#include <qpid/dispatch/server.h> +#include <qpid/dispatch/user_fd.h> +#include <qpid/dispatch/threading.h> +#include <qpid/dispatch/log.h> + +#define THREAD_COUNT 4 +#define OCTET_COUNT 100 + +static sys_mutex_t *test_lock; + +static void *expected_context; +static int call_count; +static int threads_seen[THREAD_COUNT]; +static char stored_error[512]; + +static int write_count; +static int read_count; +static int fd[2]; +static dx_user_fd_t *ufd_write; +static dx_user_fd_t *ufd_read; + + +static void thread_start(void *context, int thread_id) +{ + sys_mutex_lock(test_lock); + if (context != expected_context && !stored_error[0]) + sprintf(stored_error, "Unexpected Context Value: %lx", (long) context); + if (thread_id >= THREAD_COUNT && !stored_error[0]) + sprintf(stored_error, "Thread_ID too large: %d", thread_id); + if (thread_id < 0 && !stored_error[0]) + sprintf(stored_error, "Thread_ID negative: %d", thread_id); + + call_count++; + if (thread_id >= 0 && thread_id < THREAD_COUNT) + threads_seen[thread_id]++; + + if (call_count == THREAD_COUNT) + dx_server_stop(); + sys_mutex_unlock(test_lock); +} + + +static int conn_handler(void *context, dx_conn_event_t event, dx_connection_t *conn) +{ + return 0; +} + + +static void ufd_handler(void *context, dx_user_fd_t *ufd) +{ + long dir = (long) context; + char buffer; + ssize_t len; + static int in_read = 0; + static int in_write = 0; + + if (dir == 0) { // READ + in_read++; + assert(in_read == 1); + if (!dx_user_fd_is_readable(ufd_read)) { + sprintf(stored_error, "Expected Readable"); + dx_server_stop(); + } else { + len = read(fd[0], &buffer, 1); + if (len == 1) { + read_count++; + if (read_count == OCTET_COUNT) + dx_server_stop(); + } + dx_user_fd_activate_read(ufd_read); + } + in_read--; + } else { // WRITE + in_write++; + assert(in_write == 1); + if (!dx_user_fd_is_writeable(ufd_write)) { + sprintf(stored_error, "Expected Writable"); + dx_server_stop(); + } else { + write(fd[1], "X", 1); + + write_count++; + if (write_count < OCTET_COUNT) + dx_user_fd_activate_write(ufd_write); + } + in_write--; + } +} + + +static void fd_test_start(void *context) +{ + dx_user_fd_activate_read(ufd_read); +} + + +static char* test_start_handler(void *context) +{ + int i; + + dx_server_initialize(THREAD_COUNT); + + expected_context = (void*) 0x00112233; + stored_error[0] = 0x0; + call_count = 0; + for (i = 0; i < THREAD_COUNT; i++) + threads_seen[i] = 0; + + dx_server_set_conn_handler(conn_handler); + dx_server_set_start_handler(thread_start, expected_context); + dx_server_run(); + dx_server_finalize(); + + if (stored_error[0]) return stored_error; + if (call_count != THREAD_COUNT) return "Incorrect number of thread-start callbacks"; + for (i = 0; i < THREAD_COUNT; i++) + if (threads_seen[i] != 1) return "Incorrect count on one thread ID"; + + return 0; +} + + +static char* test_user_fd(void *context) +{ + int res; + dx_timer_t *timer; + + dx_server_initialize(THREAD_COUNT); + dx_server_set_conn_handler(conn_handler); + dx_server_set_user_fd_handler(ufd_handler); + timer = dx_timer(fd_test_start, 0); + dx_timer_schedule(timer, 0); + + stored_error[0] = 0x0; + res = pipe2(fd, O_NONBLOCK); + if (res != 0) return "Error creating pipe2"; + + ufd_write = dx_user_fd(fd[1], (void*) 1); + ufd_read = dx_user_fd(fd[0], (void*) 0); + + dx_server_run(); + dx_timer_free(timer); + dx_server_finalize(); + close(fd[0]); + close(fd[1]); + + if (stored_error[0]) return stored_error; + if (write_count - OCTET_COUNT > 2) sprintf(stored_error, "Excessively high Write Count: %d", write_count); + if (read_count != OCTET_COUNT) sprintf(stored_error, "Incorrect Read Count: %d", read_count);; + + if (stored_error[0]) return stored_error; + return 0; +} + + +int server_tests(void) +{ + int result = 0; + test_lock = sys_mutex(); + dx_log_set_mask(LOG_NONE); + + TEST_CASE(test_start_handler, 0); + TEST_CASE(test_user_fd, 0); + + sys_mutex_free(test_lock); + return result; +} + diff --git a/extras/dispatch/tests/test_case.h b/extras/dispatch/tests/test_case.h new file mode 100644 index 0000000000..6e36b440a5 --- /dev/null +++ b/extras/dispatch/tests/test_case.h @@ -0,0 +1,36 @@ +#ifndef _nexus_test_case_h_ +#define _nexus_test_case_h_ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +typedef char* (*testcase_t)(void *context); + +#define TEST_CASE(T,C) do { \ + char *r = T(C); \ + printf("Test Case %s.%s: ", __FUNCTION__, #T); \ + if (r) { \ + printf("FAIL: %s\n", r); \ + result++; \ + } else \ + printf("PASS\n"); \ +} while(0); + + +#endif + diff --git a/extras/dispatch/tests/timer_test.c b/extras/dispatch/tests/timer_test.c new file mode 100644 index 0000000000..3d199f2aa2 --- /dev/null +++ b/extras/dispatch/tests/timer_test.c @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <stdio.h> +#include <qpid/dispatch/timer.h> +#include "alloc_private.h" +#include "timer_private.h" +#include "test_case.h" +#include <qpid/dispatch/threading.h> + + +static unsigned long fire_mask; +static dx_timer_list_t pending_timers; +static sys_mutex_t *lock; +static long time; +static dx_timer_t *timers[16]; + + +void dx_server_timer_pending_LH(dx_timer_t *timer) +{ + DEQ_INSERT_TAIL(pending_timers, timer); +} + + +void dx_server_timer_cancel_LH(dx_timer_t *timer) +{ + if (timer->state == TIMER_PENDING) + DEQ_REMOVE(pending_timers, timer); +} + + +static int fire_head() +{ + sys_mutex_lock(lock); + int result = DEQ_SIZE(pending_timers); + dx_timer_t *timer = DEQ_HEAD(pending_timers); + if (timer) { + DEQ_REMOVE_HEAD(pending_timers); + dx_timer_idle_LH(timer); + fire_mask |= (unsigned long) timer->context; + } + sys_mutex_unlock(lock); + return result; +} + + +static char* test_quiet(void *context) +{ + fire_mask = 0; + + sys_mutex_lock(lock); + dx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); + sys_mutex_unlock(lock); + + while(fire_head()); + + if (fire_mask != 0) + return "Expected zero timers fired"; + return 0; +} + +static char* test_immediate(void *context) +{ + while(fire_head()); + fire_mask = 0; + + dx_timer_schedule(timers[0], 0); + + if (fire_mask != 0) return "Premature firing"; + if (fire_head() > 1) return "Too many firings"; + if (fire_mask != 1) return "Incorrect fire mask"; + + return 0; +} + + +static char* test_immediate_plus_delayed(void *context) +{ + while(fire_head()); + fire_mask = 0; + + dx_timer_schedule(timers[0], 0); + dx_timer_schedule(timers[1], 5); + + if (fire_mask != 0) return "Premature firing"; + if (fire_head() > 1) return "Too many firings"; + if (fire_mask != 1) return "Incorrect fire mask 1"; + + sys_mutex_lock(lock); + dx_timer_visit_LH(time++); + time += 8; + dx_timer_visit_LH(time++); + sys_mutex_unlock(lock); + + if (fire_head() < 1) return "Delayed Failed to fire"; + if (fire_mask != 3) return "Incorrect fire mask 3"; + + return 0; +} + + +static char* test_single(void *context) +{ + while(fire_head()); + fire_mask = 0; + + dx_timer_schedule(timers[0], 2); + if (fire_head() > 0) return "Premature firing 1"; + + sys_mutex_lock(lock); + dx_timer_visit_LH(time++); + sys_mutex_unlock(lock); + if (fire_head() > 0) return "Premature firing 2"; + + sys_mutex_lock(lock); + dx_timer_visit_LH(time++); + sys_mutex_unlock(lock); + if (fire_head() < 1) return "Failed to fire"; + + sys_mutex_lock(lock); + dx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); + sys_mutex_unlock(lock); + if (fire_head() != 0) return "Spurious fires"; + + if (fire_mask != 1) return "Incorrect fire mask"; + if (timers[0]->state != TIMER_IDLE) return "Expected idle timer state"; + + return 0; +} + + +static char* test_two_inorder(void *context) +{ + while(fire_head()); + fire_mask = 0; + + dx_timer_schedule(timers[0], 2); + dx_timer_schedule(timers[1], 4); + + sys_mutex_lock(lock); + dx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); + sys_mutex_unlock(lock); + int count = fire_head(); + if (count < 1) return "First failed to fire"; + if (count > 1) return "Second fired prematurely"; + if (fire_mask != 1) return "Incorrect fire mask 1"; + + sys_mutex_lock(lock); + dx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); + sys_mutex_unlock(lock); + if (fire_head() < 1) return "Second failed to fire"; + if (fire_mask != 3) return "Incorrect fire mask 3"; + + return 0; +} + + +static char* test_two_reverse(void *context) +{ + while(fire_head()); + fire_mask = 0; + + dx_timer_schedule(timers[0], 4); + dx_timer_schedule(timers[1], 2); + + sys_mutex_lock(lock); + dx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); + sys_mutex_unlock(lock); + int count = fire_head(); + if (count < 1) return "First failed to fire"; + if (count > 1) return "Second fired prematurely"; + if (fire_mask != 2) return "Incorrect fire mask 2"; + + sys_mutex_lock(lock); + dx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); + sys_mutex_unlock(lock); + if (fire_head() < 1) return "Second failed to fire"; + if (fire_mask != 3) return "Incorrect fire mask 3"; + + return 0; +} + + +static char* test_two_duplicate(void *context) +{ + while(fire_head()); + fire_mask = 0; + + dx_timer_schedule(timers[0], 2); + dx_timer_schedule(timers[1], 2); + + sys_mutex_lock(lock); + dx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); + sys_mutex_unlock(lock); + int count = fire_head(); + if (count != 2) return "Expected two firings"; + fire_head(); + if (fire_mask != 3) return "Incorrect fire mask 3"; + + sys_mutex_lock(lock); + dx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); + sys_mutex_unlock(lock); + if (fire_head() > 0) return "Spurious timer fires"; + + return 0; +} + + +static char* test_separated(void *context) +{ + int count; + + while(fire_head()); + fire_mask = 0; + + dx_timer_schedule(timers[0], 2); + dx_timer_schedule(timers[1], 4); + + sys_mutex_lock(lock); + dx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); + sys_mutex_unlock(lock); + count = fire_head(); + if (count < 1) return "First failed to fire"; + if (count > 1) return "Second fired prematurely"; + if (fire_mask != 1) return "Incorrect fire mask 1"; + + dx_timer_schedule(timers[2], 2); + dx_timer_schedule(timers[3], 4); + + sys_mutex_lock(lock); + dx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); + sys_mutex_unlock(lock); + count = fire_head(); + fire_head(); + if (count < 1) return "Second failed to fire"; + if (count < 2) return "Third failed to fire"; + if (fire_mask != 7) return "Incorrect fire mask 7"; + + sys_mutex_lock(lock); + dx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); + sys_mutex_unlock(lock); + count = fire_head(); + if (count < 1) return "Fourth failed to fire"; + if (fire_mask != 15) return "Incorrect fire mask 15"; + + sys_mutex_lock(lock); + dx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); + sys_mutex_unlock(lock); + count = fire_head(); + if (count > 0) return "Spurious fire"; + + return 0; +} + + +static char* test_big(void *context) +{ + while(fire_head()); + fire_mask = 0; + + long durations[16] = + { 5, 8, 7, 6, + 14, 10, 16, 15, + 11, 12, 9, 12, + 1, 2, 3, 4}; + unsigned long masks[18] = { + 0x1000, + 0x3000, + 0x7000, + 0xf000, + 0xf001, + 0xf009, + 0xf00d, + 0xf00f, + 0xf40f, + 0xf42f, + 0xf52f, + 0xff2f, + 0xff2f, + 0xff3f, + 0xffbf, + 0xffff, + 0xffff, + 0xffff + }; + + int i; + for (i = 0; i < 16; i++) + dx_timer_schedule(timers[i], durations[i]); + for (i = 0; i < 18; i++) { + sys_mutex_lock(lock); + dx_timer_visit_LH(time++); + sys_mutex_unlock(lock); + while(fire_head()); + if (fire_mask != masks[i]) { + static char error[100]; + sprintf(error, "Iteration %d: expected mask %04lx, got %04lx", i, masks[i], fire_mask); + return error; + } + } + + return 0; +} + + +int timer_tests(void) +{ + int result = 0; + dx_alloc_initialize(); + + fire_mask = 0; + DEQ_INIT(pending_timers); + lock = sys_mutex(); + dx_timer_initialize(lock); + time = 1; + + timers[0] = dx_timer(0, (void*) 0x00000001); + timers[1] = dx_timer(0, (void*) 0x00000002); + timers[2] = dx_timer(0, (void*) 0x00000004); + timers[3] = dx_timer(0, (void*) 0x00000008); + timers[4] = dx_timer(0, (void*) 0x00000010); + timers[5] = dx_timer(0, (void*) 0x00000020); + timers[6] = dx_timer(0, (void*) 0x00000040); + timers[7] = dx_timer(0, (void*) 0x00000080); + timers[8] = dx_timer(0, (void*) 0x00000100); + timers[9] = dx_timer(0, (void*) 0x00000200); + timers[10] = dx_timer(0, (void*) 0x00000400); + timers[11] = dx_timer(0, (void*) 0x00000800); + timers[12] = dx_timer(0, (void*) 0x00001000); + timers[13] = dx_timer(0, (void*) 0x00002000); + timers[14] = dx_timer(0, (void*) 0x00004000); + timers[15] = dx_timer(0, (void*) 0x00008000); + + TEST_CASE(test_quiet, 0); + TEST_CASE(test_immediate, 0); + TEST_CASE(test_immediate_plus_delayed, 0); + TEST_CASE(test_single, 0); + TEST_CASE(test_two_inorder, 0); + TEST_CASE(test_two_reverse, 0); + TEST_CASE(test_two_duplicate, 0); + TEST_CASE(test_separated, 0); + TEST_CASE(test_big, 0); + + int i; + for (i = 0; i < 16; i++) + dx_timer_free(timers[i]); + + dx_timer_finalize(); + + return result; +} + diff --git a/extras/dispatch/tests/tool_test.c b/extras/dispatch/tests/tool_test.c new file mode 100644 index 0000000000..7923ee3381 --- /dev/null +++ b/extras/dispatch/tests/tool_test.c @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "test_case.h" +#include <stdio.h> +#include <string.h> +#include <qpid/dispatch/ctools.h> + +typedef struct item_t { + DEQ_LINKS(struct item_t); + char letter; +} item_t; + +DEQ_DECLARE(item_t, item_list_t); + + +static char* list_well_formed(item_list_t list, char *key) +{ + item_t *ptr; + item_t *last = 0; + int size = DEQ_SIZE(list); + int count = 0; + char str[32]; + + ptr = DEQ_HEAD(list); + while (ptr) { + str[count] = ptr->letter; + count++; + if (DEQ_PREV(ptr) != last) return "Corrupt previous link"; + last = ptr; + ptr = DEQ_NEXT(ptr); + } + str[count] = '\0'; + if (strcmp(str, key) != 0) return "Invalid key"; + + if (count != size) return "Size different from number of items (forward)"; + + count = 0; + last = 0; + ptr = DEQ_TAIL(list); + while (ptr) { + count++; + if (DEQ_NEXT(ptr) != last) return "Corrupt next link"; + last = ptr; + ptr = DEQ_PREV(ptr); + } + + if (count != size) return "Size different from number of items (backward)"; + + return 0; +} + + +static char* test_deq_basic(void *context) +{ + item_list_t list; + item_t item[10]; + item_t *ptr; + int idx; + char *subtest; + + DEQ_INIT(list); + if (DEQ_SIZE(list) != 0) return "Expected zero initial size"; + + for (idx = 0; idx < 10; idx++) { + DEQ_ITEM_INIT(&item[idx]); + item[idx].letter = 'A' + idx; + DEQ_INSERT_TAIL(list, &item[idx]); + } + if (DEQ_SIZE(list) != 10) return "Expected 10 items in list"; + + ptr = DEQ_HEAD(list); + if (!ptr) return "Expected valid head item"; + if (DEQ_PREV(ptr)) return "Head item has non-null previous link"; + if (ptr->letter != 'A') return "Expected item A at the head"; + if (DEQ_NEXT(ptr) == 0) return "Head item has null next link"; + subtest = list_well_formed(list, "ABCDEFGHIJ"); + if (subtest) return subtest; + + DEQ_REMOVE_HEAD(list); + if (DEQ_SIZE(list) != 9) return "Expected 9 items in list"; + ptr = DEQ_HEAD(list); + if (ptr->letter != 'B') return "Expected item B at the head"; + subtest = list_well_formed(list, "BCDEFGHIJ"); + if (subtest) return subtest; + + DEQ_REMOVE_TAIL(list); + if (DEQ_SIZE(list) != 8) return "Expected 8 items in list"; + ptr = DEQ_TAIL(list); + if (ptr->letter != 'I') return "Expected item I at the tail"; + subtest = list_well_formed(list, "BCDEFGHI"); + if (subtest) return subtest; + + DEQ_REMOVE(list, &item[4]); + if (DEQ_SIZE(list) != 7) return "Expected 7 items in list"; + subtest = list_well_formed(list, "BCDFGHI"); + if (subtest) return subtest; + + DEQ_REMOVE(list, &item[1]); + if (DEQ_SIZE(list) != 6) return "Expected 6 items in list"; + subtest = list_well_formed(list, "CDFGHI"); + if (subtest) return subtest; + + DEQ_REMOVE(list, &item[8]); + if (DEQ_SIZE(list) != 5) return "Expected 5 items in list"; + subtest = list_well_formed(list, "CDFGH"); + if (subtest) return subtest; + + DEQ_INSERT_HEAD(list, &item[8]); + if (DEQ_SIZE(list) != 6) return "Expected 6 items in list"; + ptr = DEQ_HEAD(list); + if (ptr->letter != 'I') return "Expected item I at the head"; + subtest = list_well_formed(list, "ICDFGH"); + if (subtest) return subtest; + + DEQ_INSERT_AFTER(list, &item[4], &item[7]); + if (DEQ_SIZE(list) != 7) return "Expected 7 items in list"; + ptr = DEQ_TAIL(list); + if (ptr->letter != 'E') return "Expected item E at the head"; + subtest = list_well_formed(list, "ICDFGHE"); + if (subtest) return subtest; + + DEQ_INSERT_AFTER(list, &item[1], &item[5]); + if (DEQ_SIZE(list) != 8) return "Expected 8 items in list"; + subtest = list_well_formed(list, "ICDFBGHE"); + if (subtest) return subtest; + + if (item[0].prev || item[0].next) return "Unlisted item A has non-null pointers"; + if (item[9].prev || item[9].next) return "Unlisted item J has non-null pointers"; + + return 0; +} + + +int tool_tests(void) +{ + int result = 0; + + TEST_CASE(test_deq_basic, 0); + + return result; +} + diff --git a/extras/qmf/setup.py b/extras/qmf/setup.py index fcfd2f8a30..db62ddba99 100755 --- a/extras/qmf/setup.py +++ b/extras/qmf/setup.py @@ -20,7 +20,7 @@ from distutils.core import setup setup(name="qpid-qmf", - version="0.19", + version="0.21", author="Apache Qpid", author_email="dev@qpid.apache.org", packages=["qmf"], diff --git a/extras/qmf/src/py/qmf/console.py b/extras/qmf/src/py/qmf/console.py index af5d1da5ca..0a30176ed5 100644 --- a/extras/qmf/src/py/qmf/console.py +++ b/extras/qmf/src/py/qmf/console.py @@ -25,6 +25,7 @@ import qpid import struct import socket import re +import sys from qpid.datatypes import UUID from qpid.datatypes import timestamp from qpid.datatypes import datetime @@ -2423,11 +2424,21 @@ class Broker(Thread): oldTimeout = sock.gettimeout() sock.settimeout(self.connTimeout) connSock = None + force_blocking = False if self.ssl: + # Bug (QPID-4337): the "old" implementation of python SSL + # fails if the socket is set to non-blocking (which settimeout() + # may change). + if sys.version_info[:2] < (2, 6): # 2.6+ uses openssl - it's ok + force_blocking = True + sock.setblocking(1) + certfile = None if 'ssl_certfile' in self.connectArgs: - connSock = ssl(sock, certfile=self.connectArgs['ssl_certfile']) - else: - connSock = ssl(sock) + certfile = self.connectArgs['ssl_certfile'] + keyfile = None + if 'ssl_keyfile' in self.connectArgs: + keyfile = self.connectArgs['ssl_keyfile'] + connSock = ssl(sock, certfile=certfile, keyfile=keyfile) else: connSock = sock self.conn = Connection(connSock, username=self.authUser, password=self.authPass, @@ -2438,7 +2449,10 @@ class Broker(Thread): oldAborted = self.conn.aborted self.conn.aborted = aborted self.conn.start() - sock.settimeout(oldTimeout) + + # Bug (QPID-4337): don't enable non-blocking (timeouts) for old SSL + if not force_blocking: + sock.settimeout(oldTimeout) self.conn.aborted = oldAborted uid = self.conn.user_id if uid.__class__ == tuple and len(uid) == 2: |