diff options
Diffstat (limited to 'extras/dispatch/include')
-rw-r--r-- | extras/dispatch/include/qpid/dispatch/agent.h | 108 | ||||
-rw-r--r-- | extras/dispatch/include/qpid/dispatch/alloc.h | 72 | ||||
-rw-r--r-- | extras/dispatch/include/qpid/dispatch/buffer.h | 79 | ||||
-rw-r--r-- | extras/dispatch/include/qpid/dispatch/container.h | 129 | ||||
-rw-r--r-- | extras/dispatch/include/qpid/dispatch/ctools.h | 146 | ||||
-rw-r--r-- | extras/dispatch/include/qpid/dispatch/hash.h | 37 | ||||
-rw-r--r-- | extras/dispatch/include/qpid/dispatch/iovec.h | 32 | ||||
-rw-r--r-- | extras/dispatch/include/qpid/dispatch/iterator.h | 113 | ||||
-rw-r--r-- | extras/dispatch/include/qpid/dispatch/log.h | 31 | ||||
-rw-r--r-- | extras/dispatch/include/qpid/dispatch/message.h | 165 | ||||
-rw-r--r-- | extras/dispatch/include/qpid/dispatch/router.h | 35 | ||||
-rw-r--r-- | extras/dispatch/include/qpid/dispatch/server.h | 403 | ||||
-rw-r--r-- | extras/dispatch/include/qpid/dispatch/threading.h | 45 | ||||
-rw-r--r-- | extras/dispatch/include/qpid/dispatch/timer.h | 86 | ||||
-rw-r--r-- | extras/dispatch/include/qpid/dispatch/user_fd.h | 121 |
15 files changed, 1602 insertions, 0 deletions
diff --git a/extras/dispatch/include/qpid/dispatch/agent.h b/extras/dispatch/include/qpid/dispatch/agent.h new file mode 100644 index 0000000000..d53d24d4d4 --- /dev/null +++ b/extras/dispatch/include/qpid/dispatch/agent.h @@ -0,0 +1,108 @@ +#ifndef __dispatch_agent_h__ +#define __dispatch_agent_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <stddef.h> +#include <stdbool.h> +#include <stdint.h> + +/** + * \defgroup Container Management Agent + * @{ + */ + +typedef struct dx_agent_class_t dx_agent_class_t; + + +/** + * \brief Get Schema Data Handler + * + * @param context The handler context supplied in dx_agent_register. + */ +typedef void (*dx_agent_schema_cb_t)(void* context); + + +/** + * \brief Query Handler + * + * @param context The handler context supplied in dx_agent_register. + * @param id The identifier of the instance being queried or NULL for all instances. + * @param correlator The correlation handle to be used in calls to dx_agent_value_* + */ +typedef void (*dx_agent_query_cb_t)(void* context, const char *id, const void *correlator); + + +/** + * \brief Initialize the agent module and prepare it for operation. + * + */ +void dx_agent_initialize(); + + +/** + * \brief Finalize the agent after it has stopped running. + */ +void dx_agent_finalize(void); + + +/** + * \brief Register a class/object-type with the agent. + */ +dx_agent_class_t *dx_agent_register_class(const char *fqname, + void *context, + dx_agent_schema_cb_t schema_handler, + dx_agent_query_cb_t query_handler); + +/** + * \brief Register an event-type with the agent. + */ +dx_agent_class_t *dx_agent_register_event(const char *fqname, + void *context, + dx_agent_schema_cb_t schema_handler); + +/** + * + */ +void dx_agent_value_string(const void *correlator, const char *key, const char *value); +void dx_agent_value_uint(const void *correlator, const char *key, uint64_t value); +void dx_agent_value_null(const void *correlator, const char *key); +void dx_agent_value_boolean(const void *correlator, const char *key, bool value); +void dx_agent_value_binary(const void *correlator, const char *key, const uint8_t *value, size_t len); +void dx_agent_value_uuid(const void *correlator, const char *key, const uint8_t *value); +void dx_agent_value_timestamp(const void *correlator, const char *key, uint64_t value); + + +/** + * + */ +void dx_agent_value_complete(const void *correlator, bool more); + + +/** + * + */ +void *dx_agent_raise_event(dx_agent_class_t *event); + + +/** + * @} + */ + +#endif diff --git a/extras/dispatch/include/qpid/dispatch/alloc.h b/extras/dispatch/include/qpid/dispatch/alloc.h new file mode 100644 index 0000000000..ae4190ad89 --- /dev/null +++ b/extras/dispatch/include/qpid/dispatch/alloc.h @@ -0,0 +1,72 @@ +#ifndef __dispatch_alloc_h__ +#define __dispatch_alloc_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <stdlib.h> +#include <stdint.h> +#include <qpid/dispatch/threading.h> + +typedef struct dx_alloc_pool_t dx_alloc_pool_t; + +typedef struct { + int transfer_batch_size; + int local_free_list_max; + int global_free_list_max; +} dx_alloc_config_t; + +typedef struct { + uint64_t total_alloc_from_heap; + uint64_t total_free_to_heap; + uint64_t held_by_threads; + uint64_t batches_rebalanced_to_threads; + uint64_t batches_rebalanced_to_global; +} dx_alloc_stats_t; + +typedef struct { + char *type_name; + size_t type_size; + size_t *additional_size; + size_t total_size; + dx_alloc_config_t *config; + dx_alloc_stats_t *stats; + dx_alloc_pool_t *global_pool; + sys_mutex_t *lock; +} dx_alloc_type_desc_t; + + +void *dx_alloc(dx_alloc_type_desc_t *desc, dx_alloc_pool_t **tpool); +void dx_dealloc(dx_alloc_type_desc_t *desc, dx_alloc_pool_t **tpool, void *p); + + +#define ALLOC_DECLARE(T) \ + T *new_##T(); \ + void free_##T(T *p) + +#define ALLOC_DEFINE_CONFIG(T,S,A,C) \ + dx_alloc_type_desc_t __desc_##T = {#T, S, A, 0, C, 0, 0, 0}; \ + __thread dx_alloc_pool_t *__local_pool_##T = 0; \ + T *new_##T() { return (T*) dx_alloc(&__desc_##T, &__local_pool_##T); } \ + void free_##T(T *p) { dx_dealloc(&__desc_##T, &__local_pool_##T, (void*) p); } \ + dx_alloc_stats_t *alloc_stats_##T() { return __desc_##T.stats; } + +#define ALLOC_DEFINE(T) ALLOC_DEFINE_CONFIG(T, sizeof(T), 0, 0) + + +#endif diff --git a/extras/dispatch/include/qpid/dispatch/buffer.h b/extras/dispatch/include/qpid/dispatch/buffer.h new file mode 100644 index 0000000000..1c372b265d --- /dev/null +++ b/extras/dispatch/include/qpid/dispatch/buffer.h @@ -0,0 +1,79 @@ +#ifndef __dispatch_buffer_h__ +#define __dispatch_buffer_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/ctools.h> + +typedef struct dx_buffer_t dx_buffer_t; + +DEQ_DECLARE(dx_buffer_t, dx_buffer_list_t); + +struct dx_buffer_t { + DEQ_LINKS(dx_buffer_t); + unsigned int size; +}; + +/** + */ +void dx_buffer_set_size(size_t size); + +/** + */ +dx_buffer_t *dx_allocate_buffer(void); + +/** + * @param buf A pointer to an allocated buffer + */ +void dx_free_buffer(dx_buffer_t *buf); + +/** + * @param buf A pointer to an allocated buffer + * @return A pointer to the first octet in the buffer + */ +unsigned char *dx_buffer_base(dx_buffer_t *buf); + +/** + * @param buf A pointer to an allocated buffer + * @return A pointer to the first free octet in the buffer, the insert point for new data. + */ +unsigned char *dx_buffer_cursor(dx_buffer_t *buf); + +/** + * @param buf A pointer to an allocated buffer + * @return The number of octets in the buffer's free space, how many octets may be inserted. + */ +size_t dx_buffer_capacity(dx_buffer_t *buf); + +/** + * @param buf A pointer to an allocated buffer + * @return The number of octets of data in the buffer + */ +size_t dx_buffer_size(dx_buffer_t *buf); + +/** + * Notify the buffer that octets have been inserted at the buffer's cursor. This will advance the + * cursor by len octets. + * + * @param buf A pointer to an allocated buffer + * @param len The number of octets that have been appended to the buffer + */ +void dx_buffer_insert(dx_buffer_t *buf, size_t len); + +#endif diff --git a/extras/dispatch/include/qpid/dispatch/container.h b/extras/dispatch/include/qpid/dispatch/container.h new file mode 100644 index 0000000000..01a24fbbef --- /dev/null +++ b/extras/dispatch/include/qpid/dispatch/container.h @@ -0,0 +1,129 @@ +#ifndef __dispatch_container_h__ +#define __dispatch_container_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <proton/engine.h> +#include <qpid/dispatch/server.h> +#include <qpid/dispatch/alloc.h> +#include <qpid/dispatch/ctools.h> + +typedef uint8_t dx_dist_mode_t; +#define DX_DIST_COPY 0x01 +#define DX_DIST_MOVE 0x02 +#define DX_DIST_BOTH 0x03 + +/** + * Node Lifetime Policy (see AMQP 3.5.9) + */ +typedef enum { + DX_LIFE_PERMANENT, + DX_LIFE_DELETE_CLOSE, + DX_LIFE_DELETE_NO_LINKS, + DX_LIFE_DELETE_NO_MESSAGES, + DX_LIFE_DELETE_NO_LINKS_MESSAGES +} dx_lifetime_policy_t; + + +/** + * Link Direction + */ +typedef enum { + DX_INCOMING, + DX_OUTGOING +} dx_direction_t; + + +typedef struct dx_node_t dx_node_t; +typedef struct dx_link_t dx_link_t; + +typedef void (*dx_container_delivery_handler_t) (void *node_context, dx_link_t *link, pn_delivery_t *delivery); +typedef int (*dx_container_link_handler_t) (void *node_context, dx_link_t *link); +typedef int (*dx_container_link_detach_handler_t) (void *node_context, dx_link_t *link, int closed); +typedef void (*dx_container_node_handler_t) (void *type_context, dx_node_t *node); +typedef void (*dx_container_conn_handler_t) (void *type_context, dx_connection_t *conn); + +typedef struct { + char *type_name; + void *type_context; + int allow_dynamic_creation; + + // + // Node-Instance Handlers + // + dx_container_delivery_handler_t rx_handler; + dx_container_delivery_handler_t tx_handler; + dx_container_delivery_handler_t disp_handler; + dx_container_link_handler_t incoming_handler; + dx_container_link_handler_t outgoing_handler; + dx_container_link_handler_t writable_handler; + dx_container_link_detach_handler_t link_detach_handler; + + // + // Node-Type Handlers + // + dx_container_node_handler_t node_created_handler; + dx_container_node_handler_t node_destroyed_handler; + dx_container_conn_handler_t inbound_conn_open_handler; + dx_container_conn_handler_t outbound_conn_open_handler; +} dx_node_type_t; + +void dx_container_initialize(void); +void dx_container_finalize(void); + +int dx_container_register_node_type(const dx_node_type_t *nt); + +void dx_container_set_default_node_type(const dx_node_type_t *nt, + void *node_context, + dx_dist_mode_t supported_dist); + +dx_node_t *dx_container_create_node(const dx_node_type_t *nt, + const char *name, + void *node_context, + dx_dist_mode_t supported_dist, + dx_lifetime_policy_t life_policy); +void dx_container_destroy_node(dx_node_t *node); + +void dx_container_node_set_context(dx_node_t *node, void *node_context); +dx_dist_mode_t dx_container_node_get_dist_modes(const dx_node_t *node); +dx_lifetime_policy_t dx_container_node_get_life_policy(const dx_node_t *node); + +dx_link_t *dx_link(dx_node_t *node, dx_connection_t *conn, dx_direction_t dir, const char *name); +void dx_link_set_context(dx_link_t *link, void *link_context); +void *dx_link_get_context(dx_link_t *link); +pn_link_t *dx_link_pn(dx_link_t *link); +pn_terminus_t *dx_link_source(dx_link_t *link); +pn_terminus_t *dx_link_target(dx_link_t *link); +pn_terminus_t *dx_link_remote_source(dx_link_t *link); +pn_terminus_t *dx_link_remote_target(dx_link_t *link); +void dx_link_activate(dx_link_t *link); +void dx_link_close(dx_link_t *link); + + +typedef struct dx_link_item_t dx_link_item_t; + +struct dx_link_item_t { + DEQ_LINKS(dx_link_item_t); + dx_link_t *link; +}; + +ALLOC_DECLARE(dx_link_item_t); +DEQ_DECLARE(dx_link_item_t, dx_link_list_t); + +#endif diff --git a/extras/dispatch/include/qpid/dispatch/ctools.h b/extras/dispatch/include/qpid/dispatch/ctools.h new file mode 100644 index 0000000000..33178a23ee --- /dev/null +++ b/extras/dispatch/include/qpid/dispatch/ctools.h @@ -0,0 +1,146 @@ +#ifndef __dispatch_ctools_h__ +#define __dispatch_ctools_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <stdlib.h> +#include <assert.h> + +#define CT_ASSERT(exp) { assert(exp); } + +#define NEW(t) (t*) malloc(sizeof(t)) +#define NEW_ARRAY(t,n) (t*) malloc(sizeof(t)*(n)) +#define NEW_PTR_ARRAY(t,n) (t**) malloc(sizeof(t*)*(n)) + +#define DEQ_DECLARE(i,d) typedef struct { \ + i *head; \ + i *tail; \ + i *scratch; \ + size_t size; \ + } d + +#define DEQ_LINKS(t) t *prev; t *next + +#define DEQ_INIT(d) do { (d).head = 0; (d).tail = 0; (d).scratch = 0; (d).size = 0; } while (0) +#define DEQ_ITEM_INIT(i) do { (i)->next = 0; (i)->prev = 0; } while(0) +#define DEQ_HEAD(d) ((d).head) +#define DEQ_TAIL(d) ((d).tail) +#define DEQ_SIZE(d) ((d).size) +#define DEQ_NEXT(i) (i)->next +#define DEQ_PREV(i) (i)->prev + +#define DEQ_INSERT_HEAD(d,i) \ +do { \ + CT_ASSERT((i)->next == 0); \ + CT_ASSERT((i)->prev == 0); \ + if ((d).head) { \ + (i)->next = (d).head; \ + (d).head->prev = i; \ + } else { \ + (d).tail = i; \ + (i)->next = 0; \ + CT_ASSERT((d).size == 0); \ + } \ + (i)->prev = 0; \ + (d).head = i; \ + (d).size++; \ +} while (0) + +#define DEQ_INSERT_TAIL(d,i) \ +do { \ + CT_ASSERT((i)->next == 0); \ + CT_ASSERT((i)->prev == 0); \ + if ((d).tail) { \ + (i)->prev = (d).tail; \ + (d).tail->next = i; \ + } else { \ + (d).head = i; \ + (i)->prev = 0; \ + CT_ASSERT((d).size == 0); \ + } \ + (i)->next = 0; \ + (d).tail = i; \ + (d).size++; \ +} while (0) + +#define DEQ_REMOVE_HEAD(d) \ +do { \ + CT_ASSERT((d).head); \ + if ((d).head) { \ + (d).scratch = (d).head; \ + (d).head = (d).head->next; \ + if ((d).head == 0) { \ + (d).tail = 0; \ + CT_ASSERT((d).size == 1); \ + } else \ + (d).head->prev = 0; \ + (d).size--; \ + (d).scratch->next = 0; \ + (d).scratch->prev = 0; \ + } \ +} while (0) + +#define DEQ_REMOVE_TAIL(d) \ +do { \ + CT_ASSERT((d).tail); \ + if ((d).tail) { \ + (d).scratch = (d).tail; \ + (d).tail = (d).tail->prev; \ + if ((d).tail == 0) { \ + (d).head = 0; \ + CT_ASSERT((d).size == 1); \ + } else \ + (d).tail->next = 0; \ + (d).size--; \ + (d).scratch->next = 0; \ + (d).scratch->prev = 0; \ + } \ +} while (0) + +#define DEQ_INSERT_AFTER(d,i,a) \ +do { \ + CT_ASSERT((i)->next == 0); \ + CT_ASSERT((i)->prev == 0); \ + if ((a)->next) \ + (a)->next->prev = (i); \ + else \ + (d).tail = (i); \ + (i)->next = (a)->next; \ + (i)->prev = (a); \ + (a)->next = (i); \ + (d).size++; \ +} while (0) + +#define DEQ_REMOVE(d,i) \ +do { \ + if ((i)->next) \ + (i)->next->prev = (i)->prev; \ + else \ + (d).tail = (i)->prev; \ + if ((i)->prev) \ + (i)->prev->next = (i)->next; \ + else \ + (d).head = (i)->next; \ + (d).size--; \ + (i)->next = 0; \ + (i)->prev = 0; \ + CT_ASSERT((d).size || (!(d).head && !(d).tail)); \ +} while (0) + +#endif diff --git a/extras/dispatch/include/qpid/dispatch/hash.h b/extras/dispatch/include/qpid/dispatch/hash.h new file mode 100644 index 0000000000..7f4a4bb950 --- /dev/null +++ b/extras/dispatch/include/qpid/dispatch/hash.h @@ -0,0 +1,37 @@ +#ifndef __dispatch_hash_h__ +#define __dispatch_hash_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <stdlib.h> +#include <qpid/dispatch/iterator.h> + +typedef struct hash_t hash_t; + +hash_t *hash(int bucket_exponent, int batch_size, int value_is_const); +void hash_free(hash_t *h); + +size_t hash_size(hash_t *h); +int hash_insert(hash_t *h, dx_field_iterator_t *key, void *val); +int hash_insert_const(hash_t *h, dx_field_iterator_t *key, const void *val); +int hash_retrieve(hash_t *h, dx_field_iterator_t *key, void **val); +int hash_retrieve_const(hash_t *h, dx_field_iterator_t *key, const void **val); +int hash_remove(hash_t *h, dx_field_iterator_t *key); + +#endif diff --git a/extras/dispatch/include/qpid/dispatch/iovec.h b/extras/dispatch/include/qpid/dispatch/iovec.h new file mode 100644 index 0000000000..5b56c638ff --- /dev/null +++ b/extras/dispatch/include/qpid/dispatch/iovec.h @@ -0,0 +1,32 @@ +#ifndef __dispatch_iovec_h__ +#define __dispatch_iovec_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <sys/uio.h> + +typedef struct dx_iovec_t dx_iovec_t; + +dx_iovec_t *dx_iovec(int vector_count); +void dx_iovec_free(dx_iovec_t *iov); +struct iovec *dx_iovec_array(dx_iovec_t *iov); +int dx_iovec_count(dx_iovec_t *iov); + + +#endif diff --git a/extras/dispatch/include/qpid/dispatch/iterator.h b/extras/dispatch/include/qpid/dispatch/iterator.h new file mode 100644 index 0000000000..9844286483 --- /dev/null +++ b/extras/dispatch/include/qpid/dispatch/iterator.h @@ -0,0 +1,113 @@ +#ifndef __dispatch_iterator_h__ +#define __dispatch_iterator_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/buffer.h> + +/** + * The field iterator is used to access fields within a buffer chain. + * It shields the user from the fact that the field may be split across + * one or more physical buffers. + */ +typedef struct dx_field_iterator_t dx_field_iterator_t; + +/** + * Iterator views allow the code traversing the field to see a transformed + * view of the raw field. + * + * ITER_VIEW_ALL - No transformation of the raw field data + * + * ITER_VIEW_NO_HOST - Remove the scheme and host fields from the view + * + * amqp://host.domain.com:port/node-id/node/specific + * ^^^^^^^^^^^^^^^^^^^^^ + * node-id/node/specific + * ^^^^^^^^^^^^^^^^^^^^^ + * + * ITER_VIEW_NODE_ID - Isolate the node identifier from an address + * + * amqp://host.domain.com:port/node-id/node/specific + * ^^^^^^^ + * node-id/node/specific + * ^^^^^^^ + * + * ITER_VIEW_NODE_SPECIFIC - Isolate node-specific text from an address + * + * amqp://host.domain.com:port/node-id/node/specific + * ^^^^^^^^^^^^^ + * node-id/node/specific + * ^^^^^^^^^^^^^ + */ +typedef enum { + ITER_VIEW_ALL, + ITER_VIEW_NO_HOST, + ITER_VIEW_NODE_ID, + ITER_VIEW_NODE_SPECIFIC +} dx_iterator_view_t; + +/** + * Create an iterator from a null-terminated string. + * + * The "text" string must stay intact for the whole life of the iterator. The iterator + * does not copy the string, it references it. + */ +dx_field_iterator_t* dx_field_iterator_string(const char *text, + dx_iterator_view_t view); + +/** + * Create an iterator from a field in a buffer chain + */ +dx_field_iterator_t *dx_field_iterator_buffer(dx_buffer_t *buffer, + int offset, + int length, + dx_iterator_view_t view); + +/** + * Free an iterator + */ +void dx_field_iterator_free(dx_field_iterator_t *iter); + +/** + * Reset the iterator to the first octet and set a new view + */ +void dx_field_iterator_reset(dx_field_iterator_t *iter, + dx_iterator_view_t view); + +/** + * Return the current octet in the iterator's view and step to the next. + */ +unsigned char dx_field_iterator_octet(dx_field_iterator_t *iter); + +/** + * Return true iff the iterator has no more octets in the view. + */ +int dx_field_iterator_end(dx_field_iterator_t *iter); + +/** + * Compare an input string to the iterator's view. Return true iff they are equal. + */ +int dx_field_iterator_equal(dx_field_iterator_t *iter, unsigned char *string); + +/** + * Return a copy of the iterator's view. + */ +unsigned char *dx_field_iterator_copy(dx_field_iterator_t *iter); + +#endif diff --git a/extras/dispatch/include/qpid/dispatch/log.h b/extras/dispatch/include/qpid/dispatch/log.h new file mode 100644 index 0000000000..cbea50f266 --- /dev/null +++ b/extras/dispatch/include/qpid/dispatch/log.h @@ -0,0 +1,31 @@ +#ifndef __dispatch_log_h__ +#define __dispatch_log_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#define LOG_NONE 0x00000000 +#define LOG_TRACE 0x00000001 +#define LOG_ERROR 0x00000002 +#define LOG_INFO 0x00000004 + +void dx_log(const char *module, int cls, const char *fmt, ...); + +void dx_log_set_mask(int mask); + +#endif diff --git a/extras/dispatch/include/qpid/dispatch/message.h b/extras/dispatch/include/qpid/dispatch/message.h new file mode 100644 index 0000000000..41983c44a1 --- /dev/null +++ b/extras/dispatch/include/qpid/dispatch/message.h @@ -0,0 +1,165 @@ +#ifndef __dispatch_message_h__ +#define __dispatch_message_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <proton/engine.h> +#include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/alloc.h> +#include <qpid/dispatch/iterator.h> +#include <qpid/dispatch/buffer.h> +#include <qpid/dispatch/iovec.h> + +// Callback for status change (confirmed persistent, loaded-in-memory, etc.) + +typedef struct dx_message_t dx_message_t; + +DEQ_DECLARE(dx_message_t, dx_message_list_t); + +struct dx_message_t { + DEQ_LINKS(dx_message_t); + // Private members not listed here. +}; + +typedef enum { + DX_DEPTH_NONE, + DX_DEPTH_HEADER, + DX_DEPTH_DELIVERY_ANNOTATIONS, + DX_DEPTH_MESSAGE_ANNOTATIONS, + DX_DEPTH_PROPERTIES, + DX_DEPTH_APPLICATION_PROPERTIES, + DX_DEPTH_BODY, + DX_DEPTH_ALL +} dx_message_depth_t; + + +typedef enum { + // + // Message Sections + // + DX_FIELD_HEADER, + DX_FIELD_DELIVERY_ANNOTATION, + DX_FIELD_MESSAGE_ANNOTATION, + DX_FIELD_PROPERTIES, + DX_FIELD_APPLICATION_PROPERTIES, + DX_FIELD_BODY, + DX_FIELD_FOOTER, + + // + // Fields of the Header Section + // + DX_FIELD_DURABLE, + DX_FIELD_PRIORITY, + DX_FIELD_TTL, + DX_FIELD_FIRST_ACQUIRER, + DX_FIELD_DELIVERY_COUNT, + + // + // Fields of the Properties Section + // + DX_FIELD_MESSAGE_ID, + DX_FIELD_USER_ID, + DX_FIELD_TO, + DX_FIELD_SUBJECT, + DX_FIELD_REPLY_TO, + DX_FIELD_CORRELATION_ID, + DX_FIELD_CONTENT_TYPE, + DX_FIELD_CONTENT_ENCODING, + DX_FIELD_ABSOLUTE_EXPIRY_TIME, + DX_FIELD_CREATION_TIME, + DX_FIELD_GROUP_ID, + DX_FIELD_GROUP_SEQUENCE, + DX_FIELD_REPLY_TO_GROUP_ID +} dx_message_field_t; + +// +// Functions for allocation +// +dx_message_t *dx_allocate_message(void); +void dx_free_message(dx_message_t *qm); +dx_message_t *dx_message_copy(dx_message_t *qm); +int dx_message_persistent(dx_message_t *qm); +int dx_message_in_memory(dx_message_t *qm); + +void dx_message_set_out_delivery(dx_message_t *msg, pn_delivery_t *delivery); +pn_delivery_t *dx_message_out_delivery(dx_message_t *msg); +void dx_message_set_in_delivery(dx_message_t *msg, pn_delivery_t *delivery); +pn_delivery_t *dx_message_in_delivery(dx_message_t *msg); + +// +// Functions for received messages +// +dx_message_t *dx_message_receive(pn_delivery_t *delivery); +void dx_message_send(dx_message_t *msg, pn_link_t *link); + +int dx_message_check(dx_message_t *msg, dx_message_depth_t depth); +dx_field_iterator_t *dx_message_field_iterator(dx_message_t *msg, dx_message_field_t field); +dx_iovec_t *dx_message_field_iovec(dx_message_t *msg, dx_message_field_t field); + +pn_delivery_t *dx_message_inbound_delivery(dx_message_t *qm); + +// +// Functions for composed messages +// + +// Convenience Functions +void dx_message_compose_1(dx_message_t *msg, const char *to, dx_buffer_list_t *buffers); +void dx_message_copy_header(dx_message_t *msg); // Copy received header into send-header (prior to adding annotations) +void dx_message_copy_message_annotations(dx_message_t *msg); + +// Raw Functions +void dx_message_begin_header(dx_message_t *msg); +void dx_message_end_header(dx_message_t *msg); + +void dx_message_begin_delivery_annotations(dx_message_t *msg); +void dx_message_end_delivery_annotations(dx_message_t *msg); + +void dx_message_begin_message_annotations(dx_message_t *msg); +void dx_message_end_message_annotations(dx_message_t *msg); + +void dx_message_begin_message_properties(dx_message_t *msg); +void dx_message_end_message_properties(dx_message_t *msg); + +void dx_message_begin_application_properties(dx_message_t *msg); +void dx_message_end_application_properties(dx_message_t *msg); + +void dx_message_append_body_data(dx_message_t *msg, dx_buffer_list_t *buffers); + +void dx_message_begin_body_sequence(dx_message_t *msg); +void dx_message_end_body_sequence(dx_message_t *msg); + +void dx_message_begin_footer(dx_message_t *msg); +void dx_message_end_footer(dx_message_t *msg); + +void dx_message_insert_null(dx_message_t *msg); +void dx_message_insert_boolean(dx_message_t *msg, int value); +void dx_message_insert_ubyte(dx_message_t *msg, uint8_t value); +void dx_message_insert_uint(dx_message_t *msg, uint32_t value); +void dx_message_insert_ulong(dx_message_t *msg, uint64_t value); +void dx_message_insert_binary(dx_message_t *msg, const uint8_t *start, size_t len); +void dx_message_insert_string(dx_message_t *msg, const char *start); +void dx_message_insert_uuid(dx_message_t *msg, const uint8_t *value); +void dx_message_insert_symbol(dx_message_t *msg, const char *start, size_t len); +void dx_message_insert_timestamp(dx_message_t *msg, uint64_t value); +void dx_message_begin_list(dx_message_t* msg); +void dx_message_end_list(dx_message_t* msg); +void dx_message_begin_map(dx_message_t* msg); +void dx_message_end_map(dx_message_t* msg); + +#endif diff --git a/extras/dispatch/include/qpid/dispatch/router.h b/extras/dispatch/include/qpid/dispatch/router.h new file mode 100644 index 0000000000..03f4aa15be --- /dev/null +++ b/extras/dispatch/include/qpid/dispatch/router.h @@ -0,0 +1,35 @@ +#ifndef __dispatch_router_h__ +#define __dispatch_router_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <proton/engine.h> +#include <qpid/dispatch/container.h> + +typedef struct dx_router_t dx_router_t; + +typedef struct { + size_t message_limit; + size_t memory_limit; +} dx_router_configuration_t; + +dx_router_t *dx_router(dx_router_configuration_t *config); +void dx_router_free(dx_router_t *router); + +#endif diff --git a/extras/dispatch/include/qpid/dispatch/server.h b/extras/dispatch/include/qpid/dispatch/server.h new file mode 100644 index 0000000000..635e1323dd --- /dev/null +++ b/extras/dispatch/include/qpid/dispatch/server.h @@ -0,0 +1,403 @@ +#ifndef __dispatch_server_h__ +#define __dispatch_server_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <proton/engine.h> + +/** + * \defgroup Control Server Control Functions + * @{ + */ + +/** + * \brief Thread Start Handler + * + * Callback invoked when a new server thread is started. The callback is + * invoked on the newly created thread. + * + * This handler can be used to set processor affinity or other thread-specific + * tuning values. + * + * @param context The handler context supplied in dx_server_initialize. + * @param thread_id The integer thread identifier that uniquely identifies this thread. + */ +typedef void (*dx_thread_start_cb_t)(void* context, int thread_id); + + +/** + * \brief Initialize the server module and prepare it for operation. + * + * @param thread_count The number of worker threads (1 or more) that the server shall create + */ +void dx_server_initialize(int thread_count); + + +/** + * \brief Finalize the server after it has stopped running. + */ +void dx_server_finalize(void); + + +/** + * \brief Set the optional thread-start handler. + * + * This handler is called once on each worker thread at the time + * the thread is started. This may be used to set tuning settings like processor affinity, etc. + * + * @param start_handler The thread-start handler invoked per thread on thread startup. + * @param context Opaque context to be passed back in the callback function. + */ +void dx_server_set_start_handler(dx_thread_start_cb_t start_handler, void *context); + + +/** + * \brief Run the server threads until completion. + * + * Start the operation of the server, including launching all of the worker threads. + * This function does not return until after the server has been stopped. The thread + * that calls dx_server_run is used as one of the worker threads. + */ +void dx_server_run(void); + + +/** + * \brief Stop the server + * + * Stop the server and join all of its worker threads. This function may be called from any + * thread. When this function returns, all of the other server threads have been closed and + * joined. The calling thread will be the only running thread in the process. + */ +void dx_server_stop(void); + + +/** + * \brief Pause (quiesce) the server. + * + * This call blocks until all of the worker threads (except + * the one calling the this function) are finished processing and have been blocked. When + * this call returns, the calling thread is the only thread running in the process. + */ +void dx_server_pause(void); + + +/** + * \brief Resume normal operation of a paused server. + * + * This call unblocks all of the worker threads + * so they can resume normal connection processing. + */ +void dx_server_resume(void); + + +/** + * @} + * \defgroup Signal Server Signal Handling Functions + * @{ + */ + + +/** + * \brief Signal Handler + * + * Callback for caught signals. This handler will only be invoked for signal numbers + * that were registered via dx_server_signal. The handler is not invoked in the context + * of the OS signal handler. Rather, it is invoked on one of the worker threads in an + * orderly sequence. + * + * @param context The handler context supplied in dx_server_initialize. + * @param signum The signal number that was raised. + */ +typedef void (*dx_signal_handler_cb_t)(void* context, int signum); + + +/** + * Set the signal handler for the server. The signal handler is invoked cleanly on a worker thread + * after the server process catches an operating-system signal. The signal handler is optional and + * need not be set. + * + * @param signal_handler The signal handler called when a registered signal is caught. + * @param context Opaque context to be passed back in the callback function. + */ +void dx_server_set_signal_handler(dx_signal_handler_cb_t signal_handler, void *context); + + +/** + * \brief Register a signal to be caught and handled by the signal handler. + * + * @param signum The signal number of a signal to be handled by the application. + */ +void dx_server_signal(int signum); + + +/** + * @} + * \defgroup Connection Server AMQP Connection Handling Functions + * @{ + */ + +/** + * \brief Listener objects represent the desire to accept incoming transport connections. + */ +typedef struct dx_listener_t dx_listener_t; + +/** + * \brief Connector objects represent the desire to create and maintain an outgoing transport connection. + */ +typedef struct dx_connector_t dx_connector_t; + +/** + * \brief Connection objects wrap Proton connection objects. + */ +typedef struct dx_connection_t dx_connection_t; + +/** + * Event type for the connection callback. + */ +typedef enum { + /// The connection just opened via a listener (inbound). + DX_CONN_EVENT_LISTENER_OPEN, + + /// The connection just opened via a connector (outbound). + DX_CONN_EVENT_CONNECTOR_OPEN, + + /// The connection was closed at the transport level (not cleanly). + DX_CONN_EVENT_CLOSE, + + /// The connection requires processing. + DX_CONN_EVENT_PROCESS +} dx_conn_event_t; + + +/** + * \brief Connection Event Handler + * + * Callback invoked when processing is needed on a proton connection. This callback + * shall be invoked on one of the server's worker threads. The server guarantees that + * no two threads shall be allowed to process a single connection concurrently. + * The implementation of this handler may assume that it has exclusive access to the + * connection and its subservient components (sessions, links, deliveries, etc.). + * + * @param context The handler context supplied in dx_server_{connect,listen}. + * @param event The event/reason for the invocation of the handler. + * @param conn The connection that requires processing by the handler. + * @return A value greater than zero if the handler did any proton processing for + * the connection. If no work was done, zero is returned. + */ +typedef int (*dx_conn_handler_cb_t)(void* context, dx_conn_event_t event, dx_connection_t *conn); + + +/** + * \brief Set the connection event handler callback. + * + * Set the connection handler callback for the server. This callback is mandatory and must be set + * prior to the invocation of dx_server_run. + * + * @param conn_hander The handler for processing connection-related events. + */ +void dx_server_set_conn_handler(dx_conn_handler_cb_t conn_handler); + + +/** + * \brief Set the user context for a connection. + * + * @param conn Connection object supplied in DX_CONN_EVENT_{LISTENER,CONNETOR}_OPEN + * @param context User context to be stored with the connection. + */ +void dx_connection_set_context(dx_connection_t *conn, void *context); + + +/** + * \brief Get the user context from a connection. + * + * @param conn Connection object supplied in DX_CONN_EVENT_{LISTENER,CONNETOR}_OPEN + * @return The user context stored with the connection. + */ +void *dx_connection_get_context(dx_connection_t *conn); + + +/** + * \brief Activate a connection for output. + * + * This function is used to request that the server activate the indicated connection. + * It is assumed that the connection is one that the caller does not have permission to + * access (i.e. it may be owned by another thread currently). An activated connection + * will, when writable, appear in the internal work list and be invoked for processing + * by a worker thread. + * + * @param conn The connection over which the application wishes to send data + */ +void dx_server_activate(dx_connection_t *conn); + + +/** + * \brief Get the wrapped proton-engine connection object. + * + * @param conn Connection object supplied in DX_CONN_EVENT_{LISTENER,CONNETOR}_OPEN + * @return The proton connection object. + */ +pn_connection_t *dx_connection_pn(dx_connection_t *conn); + + +/** + * \brief Configuration block for a connector or a listener. + */ +typedef struct dx_server_config_t { + /** + * Host name or network address to bind to a listener or use in the connector. + */ + char *host; + + /** + * Port name or number to bind to a listener or use in the connector. + */ + char *port; + + /** + * Space-separated list of SASL mechanisms to be accepted for the connection. + */ + char *sasl_mechanisms; + + /** + * If appropriate for the mechanism, the username for authentication + * (connector only) + */ + char *sasl_username; + + /** + * If appropriate for the mechanism, the password for authentication + * (connector only) + */ + char *sasl_password; + + /** + * If appropriate for the mechanism, the minimum acceptable security strength factor + */ + int sasl_minssf; + + /** + * If appropriate for the mechanism, the maximum acceptable security strength factor + */ + int sasl_maxssf; + + /** + * SSL is enabled for this connection iff non-zero. + */ + int ssl_enabled; + + /** + * Connection will take on the role of SSL server iff non-zero. + */ + int ssl_server; + + /** + * Iff non-zero AND ssl_enabled is non-zero, this listener will detect the client's use + * of SSL or non-SSL and conform to the client's protocol. + * (listener only) + */ + int ssl_allow_unsecured_client; + + /** + * Path to the file containing the PEM-formatted public certificate for the local end + * of the connection. + */ + char *ssl_certificate_file; + + /** + * Path to the file containing the PEM-formatted private key for the local end of the + * connection. + */ + char *ssl_private_key_file; + + /** + * The password used to sign the private key, or NULL if the key is not protected. + */ + char *ssl_password; + + /** + * Path to the file containing the PEM-formatted set of certificates of trusted CAs. + */ + char *ssl_trusted_certificate_db; + + /** + * Iff non-zero, require that the peer's certificate be supplied and that it be authentic + * according to the set of trusted CAs. + */ + int ssl_require_peer_authentication; + + /** + * Allow the connection to be redirected by the peer (via CLOSE->Redirect). This is + * meaningful for outgoing (connector) connections only. + */ + int allow_redirect; +} dx_server_config_t; + + +/** + * \brief Create a listener for incoming connections. + * + * @param config Pointer to a configuration block for this listener. This block will be + * referenced by the server, not copied. The referenced record must remain + * in-scope for the life of the listener. + * @param context User context passed back in the connection handler. + * @return A pointer to the new listener, or NULL in case of failure. + */ +dx_listener_t *dx_server_listen(const dx_server_config_t *config, void *context); + + +/** + * \brief Free the resources associated with a listener. + * + * @param li A listener pointer returned by dx_listen. + */ +void dx_listener_free(dx_listener_t* li); + + +/** + * \brief Close a listener so it will accept no more connections. + * + * @param li A listener pointer returned by dx_listen. + */ +void dx_listener_close(dx_listener_t* li); + + +/** + * \brief Create a connector for an outgoing connection. + * + * @param config Pointer to a configuration block for this connector. This block will be + * referenced by the server, not copied. The referenced record must remain + * in-scope for the life of the connector.. + * @param context User context passed back in the connection handler. + * @return A pointer to the new connector, or NULL in case of failure. + */ +dx_connector_t *dx_server_connect(const dx_server_config_t *config, void *context); + + +/** + * \brief Free the resources associated with a connector. + * + * @param ct A connector pointer returned by dx_connect. + */ +void dx_connector_free(dx_connector_t* ct); + +/** + * @} + */ + +#endif diff --git a/extras/dispatch/include/qpid/dispatch/threading.h b/extras/dispatch/include/qpid/dispatch/threading.h new file mode 100644 index 0000000000..f275fc0086 --- /dev/null +++ b/extras/dispatch/include/qpid/dispatch/threading.h @@ -0,0 +1,45 @@ +#ifndef __sys_threading_h__ +#define __sys_threading_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +typedef struct sys_mutex_t sys_mutex_t; + +sys_mutex_t *sys_mutex(void); +void sys_mutex_free(sys_mutex_t *mutex); +void sys_mutex_lock(sys_mutex_t *mutex); +void sys_mutex_unlock(sys_mutex_t *mutex); + + +typedef struct sys_cond_t sys_cond_t; + +sys_cond_t *sys_cond(void); +void sys_cond_free(sys_cond_t *cond); +void sys_cond_wait(sys_cond_t *cond, sys_mutex_t *held_mutex); +void sys_cond_signal(sys_cond_t *cond); +void sys_cond_signal_all(sys_cond_t *cond); + + +typedef struct sys_thread_t sys_thread_t; + +sys_thread_t *sys_thread(void *(*run_function) (void *), void *arg); +void sys_thread_free(sys_thread_t *thread); +void sys_thread_join(sys_thread_t *thread); + +#endif diff --git a/extras/dispatch/include/qpid/dispatch/timer.h b/extras/dispatch/include/qpid/dispatch/timer.h new file mode 100644 index 0000000000..af3a22e262 --- /dev/null +++ b/extras/dispatch/include/qpid/dispatch/timer.h @@ -0,0 +1,86 @@ +#ifndef __dispatch_timer_h__ +#define __dispatch_timer_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * \defgroup Timer Server Timer Functions + * @{ + */ + +typedef struct dx_timer_t dx_timer_t; + +/** + * Timer Callback + * + * Callback invoked after a timer's interval expires and the timer fires. + * + * @param context The context supplied in dx_timer + */ +typedef void (*dx_timer_cb_t)(void* context); + + +/** + * Create a new timer object. + * + * @param cb The callback function to be invoked when the timer expires. + * @param context An opaque, user-supplied context to be passed into the callback. + * @return A pointer to the new timer object or NULL if memory is exhausted. + */ +dx_timer_t *dx_timer(dx_timer_cb_t cb, void* context); + + +/** + * Free the resources for a timer object. If the timer was scheduled, it will be canceled + * prior to freeing. After this function returns, the callback will not be invoked for this + * timer. + * + * @param timer Pointer to the timer object returned by dx_timer. + */ +void dx_timer_free(dx_timer_t *timer); + + +/** + * Schedule a timer to fire in the future. + * + * Note that the timer callback will never be invoked synchronously during the execution + * of dx_timer_schedule. Even if the interval is immediate (0), the callback invocation will + * be asynchronous and after the return of this function. + * + * @param timer Pointer to the timer object returned by dx_timer. + * @param msec The minimum number of milliseconds of delay until the timer fires. + * If 0 is supplied, the timer will fire immediately. + */ +void dx_timer_schedule(dx_timer_t *timer, long msec); + + +/** + * Attempt to cancel a scheduled timer. Since the timer callback can be invoked on any + * server thread, it is always possible that a last-second cancel attempt may arrive too late + * to stop the timer from firing (i.e. the cancel is concurrent with the fire callback). + * + * @param timer Pointer to the timer object returned by dx_timer. + */ +void dx_timer_cancel(dx_timer_t *timer); + +/** + * @} + */ + +#endif diff --git a/extras/dispatch/include/qpid/dispatch/user_fd.h b/extras/dispatch/include/qpid/dispatch/user_fd.h new file mode 100644 index 0000000000..3e5584ce2e --- /dev/null +++ b/extras/dispatch/include/qpid/dispatch/user_fd.h @@ -0,0 +1,121 @@ +#ifndef __dispatch_user_fd_h__ +#define __dispatch_user_fd_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +/** + * \defgroup UserFd Server User-File-Descriptor Functions + * @{ + */ + +typedef struct dx_user_fd_t dx_user_fd_t; + + +/** + * User_fd Handler + * + * Callback invoked when a user-managed file descriptor is available for reading or writing or there + * was an error on the file descriptor. + * + * @param context The handler context supplied in the dx_user_fd call. + * @param ufd The user_fd handle for the processable fd. + */ +typedef void (*dx_user_fd_handler_cb_t)(void* context, dx_user_fd_t *ufd); + + +/** + * Set the user-fd handler callback for the server. This handler is optional, but must be supplied + * if the dx_server is used to manage the activation of user file descriptors. + */ +void dx_server_set_user_fd_handler(dx_user_fd_handler_cb_t ufd_handler); + + +/** + * Create a tracker for a user-managed file descriptor. + * + * A user-fd is appropriate for use when the application opens and manages file descriptors + * for purposes other than AMQP communication. Registering a user fd with the dispatch server + * controls processing of the FD alongside the FDs used for messaging. + * + * @param fd The open file descriptor being managed by the application. + * @param context User context passed back in the connection handler. + * @return A pointer to the new user_fd. + */ +dx_user_fd_t *dx_user_fd(int fd, void *context); + + +/** + * Free the resources for a user-managed FD tracker. + * + * @param ufd Structure pointer returned by dx_user_fd. + */ +void dx_user_fd_free(dx_user_fd_t *ufd); + + +/** + * Activate a user-fd for read. + * + * Use this activation when the application has capacity to receive data from the user-fd. This will + * cause the callback set in dx_server_set_user_fd_handler to later be invoked when the + * file descriptor has data to read. + * + * @param ufd Structure pointer returned by dx_user_fd. + */ +void dx_user_fd_activate_read(dx_user_fd_t *ufd); + + +/** + * Activate a user-fd for write. + * + * Use this activation when the application has data to write via the user-fd. This will + * cause the callback set in dx_server_set_user_fd_handler to later be invoked when the + * file descriptor is writable. + * + * @param ufd Structure pointer returned by dx_user_fd. + */ +void dx_user_fd_activate_write(dx_user_fd_t *ufd); + + +/** + * Check readable status of a user-fd + * + * Note: It is possible that readable status is spurious (i.e. this function returns true + * but the file-descriptor is not readable and will block if not set to O_NONBLOCK). + * Code accordingly. + * + * @param ufd Structure pointer returned by dx_user_fd. + * @return true iff the user file descriptor is readable. + */ +bool dx_user_fd_is_readable(dx_user_fd_t *ufd); + + +/** + * Check writable status of a user-fd + * + * @param ufd Structure pointer returned by dx_user_fd. + * @return true iff the user file descriptor is writable. + */ +bool dx_user_fd_is_writeable(dx_user_fd_t *ufd); + +/** + * @} + */ + +#endif |