diff options
| author | Ted Ross <tross@apache.org> | 2013-02-21 14:28:48 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2013-02-21 14:28:48 +0000 |
| commit | dc1aecebb61ab371dc5b94dc637612fe9c92ff44 (patch) | |
| tree | b223936aa1570bbd77aa95d62e73a55c54986b4e | |
| parent | 5ac93e55ad1ec43d4fd985f364cb222d80e915ec (diff) | |
| download | qpid-python-dc1aecebb61ab371dc5b94dc637612fe9c92ff44.tar.gz | |
QPID-4538 - Renamed Qpid Nexus to Qpid Dispatch
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1448649 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | qpid/extras/dispatch/CMakeLists.txt (renamed from qpid/extras/nexus/CMakeLists.txt) | 14 | ||||
| -rw-r--r-- | qpid/extras/dispatch/include/qpid/dispatch/agent.h (renamed from qpid/extras/nexus/include/qpid/nexus/agent.h) | 48 | ||||
| -rw-r--r-- | qpid/extras/dispatch/include/qpid/dispatch/alloc.h (renamed from qpid/extras/nexus/include/qpid/nexus/alloc.h) | 34 | ||||
| -rw-r--r-- | qpid/extras/dispatch/include/qpid/dispatch/buffer.h (renamed from qpid/extras/nexus/include/qpid/nexus/buffer.h) | 30 | ||||
| -rw-r--r-- | qpid/extras/dispatch/include/qpid/dispatch/container.h | 129 | ||||
| -rw-r--r-- | qpid/extras/dispatch/include/qpid/dispatch/ctools.h (renamed from qpid/extras/nexus/include/qpid/nexus/ctools.h) | 4 | ||||
| -rw-r--r-- | qpid/extras/dispatch/include/qpid/dispatch/hash.h (renamed from qpid/extras/nexus/include/qpid/nexus/hash.h) | 16 | ||||
| -rw-r--r-- | qpid/extras/dispatch/include/qpid/dispatch/iovec.h (renamed from qpid/extras/nexus/include/qpid/nexus/iovec.h) | 14 | ||||
| -rw-r--r-- | qpid/extras/dispatch/include/qpid/dispatch/iterator.h (renamed from qpid/extras/nexus/include/qpid/nexus/iterator.h) | 32 | ||||
| -rw-r--r-- | qpid/extras/dispatch/include/qpid/dispatch/log.h (renamed from qpid/extras/nexus/include/qpid/nexus/log.h) | 8 | ||||
| -rw-r--r-- | qpid/extras/dispatch/include/qpid/dispatch/message.h | 165 | ||||
| -rw-r--r-- | qpid/extras/dispatch/include/qpid/dispatch/router.h (renamed from qpid/extras/nexus/include/qpid/nexus/router.h) | 14 | ||||
| -rw-r--r-- | qpid/extras/dispatch/include/qpid/dispatch/server.h (renamed from qpid/extras/nexus/include/qpid/nexus/server.h) | 92 | ||||
| -rw-r--r-- | qpid/extras/dispatch/include/qpid/dispatch/threading.h (renamed from qpid/extras/nexus/include/qpid/nexus/threading.h) | 0 | ||||
| -rw-r--r-- | qpid/extras/dispatch/include/qpid/dispatch/timer.h (renamed from qpid/extras/nexus/include/qpid/nexus/timer.h) | 26 | ||||
| -rw-r--r-- | qpid/extras/dispatch/include/qpid/dispatch/user_fd.h (renamed from qpid/extras/nexus/include/qpid/nexus/user_fd.h) | 42 | ||||
| -rw-r--r-- | qpid/extras/dispatch/router/CMakeLists.txt (renamed from qpid/extras/nexus/router/CMakeLists.txt) | 6 | ||||
| -rw-r--r-- | qpid/extras/dispatch/router/src/main.c (renamed from qpid/extras/nexus/router/src/main.c) | 60 | ||||
| -rw-r--r-- | qpid/extras/dispatch/site/css/style.css (renamed from qpid/extras/nexus/site/css/style.css) | 0 | ||||
| -rw-r--r-- | qpid/extras/dispatch/site/images/arch.dia | bin | 0 -> 1352 bytes | |||
| -rw-r--r-- | qpid/extras/dispatch/site/images/arch.png | bin | 0 -> 6170 bytes | |||
| -rw-r--r-- | qpid/extras/dispatch/site/includes/footer.include (renamed from qpid/extras/nexus/site/includes/footer.include) | 0 | ||||
| -rw-r--r-- | qpid/extras/dispatch/site/includes/header.include (renamed from qpid/extras/nexus/site/includes/header.include) | 0 | ||||
| -rw-r--r-- | qpid/extras/dispatch/site/includes/menu.include (renamed from qpid/extras/nexus/site/includes/menu.include) | 7 | ||||
| -rwxr-xr-x | qpid/extras/dispatch/site/index.html (renamed from qpid/extras/nexus/site/index.html) | 33 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/agent.c (renamed from qpid/extras/nexus/src/agent.c) | 80 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/alloc.c (renamed from qpid/extras/nexus/src/alloc.c) | 42 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/alloc_private.h (renamed from qpid/extras/nexus/src/alloc_private.h) | 8 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/auth.c (renamed from qpid/extras/nexus/src/auth.c) | 4 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/auth.h (renamed from qpid/extras/nexus/src/auth.h) | 0 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/buffer.c (renamed from qpid/extras/nexus/src/buffer.c) | 28 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/container.c (renamed from qpid/extras/nexus/src/container.c) | 194 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/hash.c (renamed from qpid/extras/nexus/src/hash.c) | 34 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/iovec.c (renamed from qpid/extras/nexus/src/iovec.c) | 30 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/iterator.c (renamed from qpid/extras/nexus/src/iterator.c) | 80 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/log.c (renamed from qpid/extras/nexus/src/log.c) | 6 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/message.c (renamed from qpid/extras/nexus/src/message.c) | 516 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/message_private.h (renamed from qpid/extras/nexus/src/message_private.h) | 58 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/posix/threading.c (renamed from qpid/extras/nexus/src/posix/threading.c) | 4 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/router_node.c (renamed from qpid/extras/nexus/src/router_node.c) | 180 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/server.c (renamed from qpid/extras/nexus/src/server.c) | 448 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/server_private.h (renamed from qpid/extras/nexus/src/server_private.h) | 43 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/timer.c (renamed from qpid/extras/nexus/src/timer.c) | 58 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/timer_private.h (renamed from qpid/extras/nexus/src/timer_private.h) | 28 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/work_queue.c (renamed from qpid/extras/nexus/src/work_queue.c) | 2 | ||||
| -rw-r--r-- | qpid/extras/dispatch/src/work_queue.h (renamed from qpid/extras/nexus/src/work_queue.h) | 0 | ||||
| -rw-r--r-- | qpid/extras/dispatch/tests/CMakeLists.txt (renamed from qpid/extras/nexus/tests/CMakeLists.txt) | 2 | ||||
| -rw-r--r-- | qpid/extras/dispatch/tests/alloc_test.c (renamed from qpid/extras/nexus/tests/alloc_test.c) | 8 | ||||
| -rw-r--r-- | qpid/extras/dispatch/tests/message_test.c (renamed from qpid/extras/nexus/tests/message_test.c) | 56 | ||||
| -rw-r--r-- | qpid/extras/dispatch/tests/run_tests.c (renamed from qpid/extras/nexus/tests/run_tests.c) | 0 | ||||
| -rw-r--r-- | qpid/extras/dispatch/tests/server_test.c (renamed from qpid/extras/nexus/tests/server_test.c) | 70 | ||||
| -rw-r--r-- | qpid/extras/dispatch/tests/test_case.h (renamed from qpid/extras/nexus/tests/test_case.h) | 0 | ||||
| -rw-r--r-- | qpid/extras/dispatch/tests/timer_test.c (renamed from qpid/extras/nexus/tests/timer_test.c) | 160 | ||||
| -rw-r--r-- | qpid/extras/dispatch/tests/tool_test.c (renamed from qpid/extras/nexus/tests/tool_test.c) | 2 | ||||
| -rw-r--r-- | qpid/extras/nexus/include/qpid/nexus/container.h | 129 | ||||
| -rw-r--r-- | qpid/extras/nexus/include/qpid/nexus/message.h | 165 | ||||
| -rw-r--r-- | qpid/extras/nexus/site/images/gwarch.dia | bin | 1370 -> 0 bytes | |||
| -rw-r--r-- | qpid/extras/nexus/site/images/gwarch.png | bin | 7941 -> 0 bytes |
58 files changed, 1606 insertions, 1603 deletions
diff --git a/qpid/extras/nexus/CMakeLists.txt b/qpid/extras/dispatch/CMakeLists.txt index affe292f7d..bc1812fb6b 100644 --- a/qpid/extras/nexus/CMakeLists.txt +++ b/qpid/extras/dispatch/CMakeLists.txt @@ -21,7 +21,7 @@ cmake_minimum_required(VERSION 2.6) include(CheckLibraryExists) include(CheckSymbolExists) -project(qpid-nexus C) +project(qpid-dispatch C) set (SO_VERSION_MAJOR 0) set (SO_VERSION_MINOR 1) @@ -80,17 +80,17 @@ set(server_SOURCES src/work_queue.c ) -add_library(qpid-nexus SHARED ${server_SOURCES}) -target_link_libraries(qpid-nexus ${proton_lib} ${pthread_lib} ${rt_lib}) -set_target_properties(qpid-nexus PROPERTIES +add_library(qpid-dispatch SHARED ${server_SOURCES}) +target_link_libraries(qpid-dispatch ${proton_lib} ${pthread_lib} ${rt_lib}) +set_target_properties(qpid-dispatch PROPERTIES VERSION "${SO_VERSION}" SOVERSION "${SO_VERSION_MAJOR}" LINK_FLAGS "${CATCH_UNDEFINED}" ) -install(TARGETS qpid-nexus +install(TARGETS qpid-dispatch LIBRARY DESTINATION ${LIB_INSTALL_DIR}) -file(GLOB headers "include/qpid/nexus/*.h") -install(FILES ${headers} DESTINATION ${INCLUDE_INSTALL_DIR}/qpid/nexus) +file(GLOB headers "include/qpid/dispatch/*.h") +install(FILES ${headers} DESTINATION ${INCLUDE_INSTALL_DIR}/qpid/dispatch) ## ## Build Tests diff --git a/qpid/extras/nexus/include/qpid/nexus/agent.h b/qpid/extras/dispatch/include/qpid/dispatch/agent.h index 162399f649..d53d24d4d4 100644 --- a/qpid/extras/nexus/include/qpid/nexus/agent.h +++ b/qpid/extras/dispatch/include/qpid/dispatch/agent.h @@ -1,5 +1,5 @@ -#ifndef __nexus_agent_h__ -#define __nexus_agent_h__ 1 +#ifndef __dispatch_agent_h__ +#define __dispatch_agent_h__ 1 /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -28,77 +28,77 @@ * @{ */ -typedef struct nx_agent_class_t nx_agent_class_t; +typedef struct dx_agent_class_t dx_agent_class_t; /** * \brief Get Schema Data Handler * - * @param context The handler context supplied in nx_agent_register. + * @param context The handler context supplied in dx_agent_register. */ -typedef void (*nx_agent_schema_cb_t)(void* context); +typedef void (*dx_agent_schema_cb_t)(void* context); /** * \brief Query Handler * - * @param context The handler context supplied in nx_agent_register. + * @param context The handler context supplied in dx_agent_register. * @param id The identifier of the instance being queried or NULL for all instances. - * @param correlator The correlation handle to be used in calls to nx_agent_value_* + * @param correlator The correlation handle to be used in calls to dx_agent_value_* */ -typedef void (*nx_agent_query_cb_t)(void* context, const char *id, const void *correlator); +typedef void (*dx_agent_query_cb_t)(void* context, const char *id, const void *correlator); /** * \brief Initialize the agent module and prepare it for operation. * */ -void nx_agent_initialize(); +void dx_agent_initialize(); /** * \brief Finalize the agent after it has stopped running. */ -void nx_agent_finalize(void); +void dx_agent_finalize(void); /** * \brief Register a class/object-type with the agent. */ -nx_agent_class_t *nx_agent_register_class(const char *fqname, +dx_agent_class_t *dx_agent_register_class(const char *fqname, void *context, - nx_agent_schema_cb_t schema_handler, - nx_agent_query_cb_t query_handler); + dx_agent_schema_cb_t schema_handler, + dx_agent_query_cb_t query_handler); /** * \brief Register an event-type with the agent. */ -nx_agent_class_t *nx_agent_register_event(const char *fqname, +dx_agent_class_t *dx_agent_register_event(const char *fqname, void *context, - nx_agent_schema_cb_t schema_handler); + dx_agent_schema_cb_t schema_handler); /** * */ -void nx_agent_value_string(const void *correlator, const char *key, const char *value); -void nx_agent_value_uint(const void *correlator, const char *key, uint64_t value); -void nx_agent_value_null(const void *correlator, const char *key); -void nx_agent_value_boolean(const void *correlator, const char *key, bool value); -void nx_agent_value_binary(const void *correlator, const char *key, const uint8_t *value, size_t len); -void nx_agent_value_uuid(const void *correlator, const char *key, const uint8_t *value); -void nx_agent_value_timestamp(const void *correlator, const char *key, uint64_t value); +void dx_agent_value_string(const void *correlator, const char *key, const char *value); +void dx_agent_value_uint(const void *correlator, const char *key, uint64_t value); +void dx_agent_value_null(const void *correlator, const char *key); +void dx_agent_value_boolean(const void *correlator, const char *key, bool value); +void dx_agent_value_binary(const void *correlator, const char *key, const uint8_t *value, size_t len); +void dx_agent_value_uuid(const void *correlator, const char *key, const uint8_t *value); +void dx_agent_value_timestamp(const void *correlator, const char *key, uint64_t value); /** * */ -void nx_agent_value_complete(const void *correlator, bool more); +void dx_agent_value_complete(const void *correlator, bool more); /** * */ -void *nx_agent_raise_event(nx_agent_class_t *event); +void *dx_agent_raise_event(dx_agent_class_t *event); /** diff --git a/qpid/extras/nexus/include/qpid/nexus/alloc.h b/qpid/extras/dispatch/include/qpid/dispatch/alloc.h index f4ce896f22..ae4190ad89 100644 --- a/qpid/extras/nexus/include/qpid/nexus/alloc.h +++ b/qpid/extras/dispatch/include/qpid/dispatch/alloc.h @@ -1,5 +1,5 @@ -#ifndef __nexus_alloc_h__ -#define __nexus_alloc_h__ 1 +#ifndef __dispatch_alloc_h__ +#define __dispatch_alloc_h__ 1 /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -21,15 +21,15 @@ #include <stdlib.h> #include <stdint.h> -#include <qpid/nexus/threading.h> +#include <qpid/dispatch/threading.h> -typedef struct nx_alloc_pool_t nx_alloc_pool_t; +typedef struct dx_alloc_pool_t dx_alloc_pool_t; typedef struct { int transfer_batch_size; int local_free_list_max; int global_free_list_max; -} nx_alloc_config_t; +} dx_alloc_config_t; typedef struct { uint64_t total_alloc_from_heap; @@ -37,22 +37,22 @@ typedef struct { uint64_t held_by_threads; uint64_t batches_rebalanced_to_threads; uint64_t batches_rebalanced_to_global; -} nx_alloc_stats_t; +} dx_alloc_stats_t; typedef struct { char *type_name; size_t type_size; size_t *additional_size; size_t total_size; - nx_alloc_config_t *config; - nx_alloc_stats_t *stats; - nx_alloc_pool_t *global_pool; + dx_alloc_config_t *config; + dx_alloc_stats_t *stats; + dx_alloc_pool_t *global_pool; sys_mutex_t *lock; -} nx_alloc_type_desc_t; +} dx_alloc_type_desc_t; -void *nx_alloc(nx_alloc_type_desc_t *desc, nx_alloc_pool_t **tpool); -void nx_dealloc(nx_alloc_type_desc_t *desc, nx_alloc_pool_t **tpool, void *p); +void *dx_alloc(dx_alloc_type_desc_t *desc, dx_alloc_pool_t **tpool); +void dx_dealloc(dx_alloc_type_desc_t *desc, dx_alloc_pool_t **tpool, void *p); #define ALLOC_DECLARE(T) \ @@ -60,11 +60,11 @@ void nx_dealloc(nx_alloc_type_desc_t *desc, nx_alloc_pool_t **tpool, void *p); void free_##T(T *p) #define ALLOC_DEFINE_CONFIG(T,S,A,C) \ - nx_alloc_type_desc_t __desc_##T = {#T, S, A, 0, C, 0, 0, 0}; \ - __thread nx_alloc_pool_t *__local_pool_##T = 0; \ - T *new_##T() { return (T*) nx_alloc(&__desc_##T, &__local_pool_##T); } \ - void free_##T(T *p) { nx_dealloc(&__desc_##T, &__local_pool_##T, (void*) p); } \ - nx_alloc_stats_t *alloc_stats_##T() { return __desc_##T.stats; } + dx_alloc_type_desc_t __desc_##T = {#T, S, A, 0, C, 0, 0, 0}; \ + __thread dx_alloc_pool_t *__local_pool_##T = 0; \ + T *new_##T() { return (T*) dx_alloc(&__desc_##T, &__local_pool_##T); } \ + void free_##T(T *p) { dx_dealloc(&__desc_##T, &__local_pool_##T, (void*) p); } \ + dx_alloc_stats_t *alloc_stats_##T() { return __desc_##T.stats; } #define ALLOC_DEFINE(T) ALLOC_DEFINE_CONFIG(T, sizeof(T), 0, 0) diff --git a/qpid/extras/nexus/include/qpid/nexus/buffer.h b/qpid/extras/dispatch/include/qpid/dispatch/buffer.h index 8c2e0d8374..1c372b265d 100644 --- a/qpid/extras/nexus/include/qpid/nexus/buffer.h +++ b/qpid/extras/dispatch/include/qpid/dispatch/buffer.h @@ -1,5 +1,5 @@ -#ifndef __nexus_buffer_h__ -#define __nexus_buffer_h__ 1 +#ifndef __dispatch_buffer_h__ +#define __dispatch_buffer_h__ 1 /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -19,53 +19,53 @@ * under the License. */ -#include <qpid/nexus/ctools.h> +#include <qpid/dispatch/ctools.h> -typedef struct nx_buffer_t nx_buffer_t; +typedef struct dx_buffer_t dx_buffer_t; -DEQ_DECLARE(nx_buffer_t, nx_buffer_list_t); +DEQ_DECLARE(dx_buffer_t, dx_buffer_list_t); -struct nx_buffer_t { - DEQ_LINKS(nx_buffer_t); +struct dx_buffer_t { + DEQ_LINKS(dx_buffer_t); unsigned int size; }; /** */ -void nx_buffer_set_size(size_t size); +void dx_buffer_set_size(size_t size); /** */ -nx_buffer_t *nx_allocate_buffer(void); +dx_buffer_t *dx_allocate_buffer(void); /** * @param buf A pointer to an allocated buffer */ -void nx_free_buffer(nx_buffer_t *buf); +void dx_free_buffer(dx_buffer_t *buf); /** * @param buf A pointer to an allocated buffer * @return A pointer to the first octet in the buffer */ -unsigned char *nx_buffer_base(nx_buffer_t *buf); +unsigned char *dx_buffer_base(dx_buffer_t *buf); /** * @param buf A pointer to an allocated buffer * @return A pointer to the first free octet in the buffer, the insert point for new data. */ -unsigned char *nx_buffer_cursor(nx_buffer_t *buf); +unsigned char *dx_buffer_cursor(dx_buffer_t *buf); /** * @param buf A pointer to an allocated buffer * @return The number of octets in the buffer's free space, how many octets may be inserted. */ -size_t nx_buffer_capacity(nx_buffer_t *buf); +size_t dx_buffer_capacity(dx_buffer_t *buf); /** * @param buf A pointer to an allocated buffer * @return The number of octets of data in the buffer */ -size_t nx_buffer_size(nx_buffer_t *buf); +size_t dx_buffer_size(dx_buffer_t *buf); /** * Notify the buffer that octets have been inserted at the buffer's cursor. This will advance the @@ -74,6 +74,6 @@ size_t nx_buffer_size(nx_buffer_t *buf); * @param buf A pointer to an allocated buffer * @param len The number of octets that have been appended to the buffer */ -void nx_buffer_insert(nx_buffer_t *buf, size_t len); +void dx_buffer_insert(dx_buffer_t *buf, size_t len); #endif diff --git a/qpid/extras/dispatch/include/qpid/dispatch/container.h b/qpid/extras/dispatch/include/qpid/dispatch/container.h new file mode 100644 index 0000000000..01a24fbbef --- /dev/null +++ b/qpid/extras/dispatch/include/qpid/dispatch/container.h @@ -0,0 +1,129 @@ +#ifndef __dispatch_container_h__ +#define __dispatch_container_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <proton/engine.h> +#include <qpid/dispatch/server.h> +#include <qpid/dispatch/alloc.h> +#include <qpid/dispatch/ctools.h> + +typedef uint8_t dx_dist_mode_t; +#define DX_DIST_COPY 0x01 +#define DX_DIST_MOVE 0x02 +#define DX_DIST_BOTH 0x03 + +/** + * Node Lifetime Policy (see AMQP 3.5.9) + */ +typedef enum { + DX_LIFE_PERMANENT, + DX_LIFE_DELETE_CLOSE, + DX_LIFE_DELETE_NO_LINKS, + DX_LIFE_DELETE_NO_MESSAGES, + DX_LIFE_DELETE_NO_LINKS_MESSAGES +} dx_lifetime_policy_t; + + +/** + * Link Direction + */ +typedef enum { + DX_INCOMING, + DX_OUTGOING +} dx_direction_t; + + +typedef struct dx_node_t dx_node_t; +typedef struct dx_link_t dx_link_t; + +typedef void (*dx_container_delivery_handler_t) (void *node_context, dx_link_t *link, pn_delivery_t *delivery); +typedef int (*dx_container_link_handler_t) (void *node_context, dx_link_t *link); +typedef int (*dx_container_link_detach_handler_t) (void *node_context, dx_link_t *link, int closed); +typedef void (*dx_container_node_handler_t) (void *type_context, dx_node_t *node); +typedef void (*dx_container_conn_handler_t) (void *type_context, dx_connection_t *conn); + +typedef struct { + char *type_name; + void *type_context; + int allow_dynamic_creation; + + // + // Node-Instance Handlers + // + dx_container_delivery_handler_t rx_handler; + dx_container_delivery_handler_t tx_handler; + dx_container_delivery_handler_t disp_handler; + dx_container_link_handler_t incoming_handler; + dx_container_link_handler_t outgoing_handler; + dx_container_link_handler_t writable_handler; + dx_container_link_detach_handler_t link_detach_handler; + + // + // Node-Type Handlers + // + dx_container_node_handler_t node_created_handler; + dx_container_node_handler_t node_destroyed_handler; + dx_container_conn_handler_t inbound_conn_open_handler; + dx_container_conn_handler_t outbound_conn_open_handler; +} dx_node_type_t; + +void dx_container_initialize(void); +void dx_container_finalize(void); + +int dx_container_register_node_type(const dx_node_type_t *nt); + +void dx_container_set_default_node_type(const dx_node_type_t *nt, + void *node_context, + dx_dist_mode_t supported_dist); + +dx_node_t *dx_container_create_node(const dx_node_type_t *nt, + const char *name, + void *node_context, + dx_dist_mode_t supported_dist, + dx_lifetime_policy_t life_policy); +void dx_container_destroy_node(dx_node_t *node); + +void dx_container_node_set_context(dx_node_t *node, void *node_context); +dx_dist_mode_t dx_container_node_get_dist_modes(const dx_node_t *node); +dx_lifetime_policy_t dx_container_node_get_life_policy(const dx_node_t *node); + +dx_link_t *dx_link(dx_node_t *node, dx_connection_t *conn, dx_direction_t dir, const char *name); +void dx_link_set_context(dx_link_t *link, void *link_context); +void *dx_link_get_context(dx_link_t *link); +pn_link_t *dx_link_pn(dx_link_t *link); +pn_terminus_t *dx_link_source(dx_link_t *link); +pn_terminus_t *dx_link_target(dx_link_t *link); +pn_terminus_t *dx_link_remote_source(dx_link_t *link); +pn_terminus_t *dx_link_remote_target(dx_link_t *link); +void dx_link_activate(dx_link_t *link); +void dx_link_close(dx_link_t *link); + + +typedef struct dx_link_item_t dx_link_item_t; + +struct dx_link_item_t { + DEQ_LINKS(dx_link_item_t); + dx_link_t *link; +}; + +ALLOC_DECLARE(dx_link_item_t); +DEQ_DECLARE(dx_link_item_t, dx_link_list_t); + +#endif diff --git a/qpid/extras/nexus/include/qpid/nexus/ctools.h b/qpid/extras/dispatch/include/qpid/dispatch/ctools.h index 67298f3782..33178a23ee 100644 --- a/qpid/extras/nexus/include/qpid/nexus/ctools.h +++ b/qpid/extras/dispatch/include/qpid/dispatch/ctools.h @@ -1,5 +1,5 @@ -#ifndef __ctools_h__ -#define __ctools_h__ 1 +#ifndef __dispatch_ctools_h__ +#define __dispatch_ctools_h__ 1 /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file diff --git a/qpid/extras/nexus/include/qpid/nexus/hash.h b/qpid/extras/dispatch/include/qpid/dispatch/hash.h index 0efded35e8..7f4a4bb950 100644 --- a/qpid/extras/nexus/include/qpid/nexus/hash.h +++ b/qpid/extras/dispatch/include/qpid/dispatch/hash.h @@ -1,5 +1,5 @@ -#ifndef __hash_h__ -#define __hash_h__ 1 +#ifndef __dispatch_hash_h__ +#define __dispatch_hash_h__ 1 /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -20,7 +20,7 @@ */ #include <stdlib.h> -#include <qpid/nexus/iterator.h> +#include <qpid/dispatch/iterator.h> typedef struct hash_t hash_t; @@ -28,10 +28,10 @@ hash_t *hash(int bucket_exponent, int batch_size, int value_is_const); void hash_free(hash_t *h); size_t hash_size(hash_t *h); -int hash_insert(hash_t *h, nx_field_iterator_t *key, void *val); -int hash_insert_const(hash_t *h, nx_field_iterator_t *key, const void *val); -int hash_retrieve(hash_t *h, nx_field_iterator_t *key, void **val); -int hash_retrieve_const(hash_t *h, nx_field_iterator_t *key, const void **val); -int hash_remove(hash_t *h, nx_field_iterator_t *key); +int hash_insert(hash_t *h, dx_field_iterator_t *key, void *val); +int hash_insert_const(hash_t *h, dx_field_iterator_t *key, const void *val); +int hash_retrieve(hash_t *h, dx_field_iterator_t *key, void **val); +int hash_retrieve_const(hash_t *h, dx_field_iterator_t *key, const void **val); +int hash_remove(hash_t *h, dx_field_iterator_t *key); #endif diff --git a/qpid/extras/nexus/include/qpid/nexus/iovec.h b/qpid/extras/dispatch/include/qpid/dispatch/iovec.h index 33730b9ed3..5b56c638ff 100644 --- a/qpid/extras/nexus/include/qpid/nexus/iovec.h +++ b/qpid/extras/dispatch/include/qpid/dispatch/iovec.h @@ -1,5 +1,5 @@ -#ifndef __nexus_iovec_h__ -#define __nexus_iovec_h__ 1 +#ifndef __dispatch_iovec_h__ +#define __dispatch_iovec_h__ 1 /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -21,12 +21,12 @@ #include <sys/uio.h> -typedef struct nx_iovec_t nx_iovec_t; +typedef struct dx_iovec_t dx_iovec_t; -nx_iovec_t *nx_iovec(int vector_count); -void nx_iovec_free(nx_iovec_t *iov); -struct iovec *nx_iovec_array(nx_iovec_t *iov); -int nx_iovec_count(nx_iovec_t *iov); +dx_iovec_t *dx_iovec(int vector_count); +void dx_iovec_free(dx_iovec_t *iov); +struct iovec *dx_iovec_array(dx_iovec_t *iov); +int dx_iovec_count(dx_iovec_t *iov); #endif diff --git a/qpid/extras/nexus/include/qpid/nexus/iterator.h b/qpid/extras/dispatch/include/qpid/dispatch/iterator.h index a98ab257fa..9844286483 100644 --- a/qpid/extras/nexus/include/qpid/nexus/iterator.h +++ b/qpid/extras/dispatch/include/qpid/dispatch/iterator.h @@ -1,5 +1,5 @@ -#ifndef __nexus_iterator_h__ -#define __nexus_iterator_h__ 1 +#ifndef __dispatch_iterator_h__ +#define __dispatch_iterator_h__ 1 /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -19,14 +19,14 @@ * under the License. */ -#include <qpid/nexus/buffer.h> +#include <qpid/dispatch/buffer.h> /** * The field iterator is used to access fields within a buffer chain. * It shields the user from the fact that the field may be split across * one or more physical buffers. */ -typedef struct nx_field_iterator_t nx_field_iterator_t; +typedef struct dx_field_iterator_t dx_field_iterator_t; /** * Iterator views allow the code traversing the field to see a transformed @@ -60,7 +60,7 @@ typedef enum { ITER_VIEW_NO_HOST, ITER_VIEW_NODE_ID, ITER_VIEW_NODE_SPECIFIC -} nx_iterator_view_t; +} dx_iterator_view_t; /** * Create an iterator from a null-terminated string. @@ -68,46 +68,46 @@ typedef enum { * The "text" string must stay intact for the whole life of the iterator. The iterator * does not copy the string, it references it. */ -nx_field_iterator_t* nx_field_iterator_string(const char *text, - nx_iterator_view_t view); +dx_field_iterator_t* dx_field_iterator_string(const char *text, + dx_iterator_view_t view); /** * Create an iterator from a field in a buffer chain */ -nx_field_iterator_t *nx_field_iterator_buffer(nx_buffer_t *buffer, +dx_field_iterator_t *dx_field_iterator_buffer(dx_buffer_t *buffer, int offset, int length, - nx_iterator_view_t view); + dx_iterator_view_t view); /** * Free an iterator */ -void nx_field_iterator_free(nx_field_iterator_t *iter); +void dx_field_iterator_free(dx_field_iterator_t *iter); /** * Reset the iterator to the first octet and set a new view */ -void nx_field_iterator_reset(nx_field_iterator_t *iter, - nx_iterator_view_t view); +void dx_field_iterator_reset(dx_field_iterator_t *iter, + dx_iterator_view_t view); /** * Return the current octet in the iterator's view and step to the next. */ -unsigned char nx_field_iterator_octet(nx_field_iterator_t *iter); +unsigned char dx_field_iterator_octet(dx_field_iterator_t *iter); /** * Return true iff the iterator has no more octets in the view. */ -int nx_field_iterator_end(nx_field_iterator_t *iter); +int dx_field_iterator_end(dx_field_iterator_t *iter); /** * Compare an input string to the iterator's view. Return true iff they are equal. */ -int nx_field_iterator_equal(nx_field_iterator_t *iter, unsigned char *string); +int dx_field_iterator_equal(dx_field_iterator_t *iter, unsigned char *string); /** * Return a copy of the iterator's view. */ -unsigned char *nx_field_iterator_copy(nx_field_iterator_t *iter); +unsigned char *dx_field_iterator_copy(dx_field_iterator_t *iter); #endif diff --git a/qpid/extras/nexus/include/qpid/nexus/log.h b/qpid/extras/dispatch/include/qpid/dispatch/log.h index 1376405d13..cbea50f266 100644 --- a/qpid/extras/nexus/include/qpid/nexus/log.h +++ b/qpid/extras/dispatch/include/qpid/dispatch/log.h @@ -1,5 +1,5 @@ -#ifndef __nx_log_h__ -#define __nx_log_h__ 1 +#ifndef __dispatch_log_h__ +#define __dispatch_log_h__ 1 /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -24,8 +24,8 @@ #define LOG_ERROR 0x00000002 #define LOG_INFO 0x00000004 -void nx_log(const char *module, int cls, const char *fmt, ...); +void dx_log(const char *module, int cls, const char *fmt, ...); -void nx_log_set_mask(int mask); +void dx_log_set_mask(int mask); #endif diff --git a/qpid/extras/dispatch/include/qpid/dispatch/message.h b/qpid/extras/dispatch/include/qpid/dispatch/message.h new file mode 100644 index 0000000000..41983c44a1 --- /dev/null +++ b/qpid/extras/dispatch/include/qpid/dispatch/message.h @@ -0,0 +1,165 @@ +#ifndef __dispatch_message_h__ +#define __dispatch_message_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <proton/engine.h> +#include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/alloc.h> +#include <qpid/dispatch/iterator.h> +#include <qpid/dispatch/buffer.h> +#include <qpid/dispatch/iovec.h> + +// Callback for status change (confirmed persistent, loaded-in-memory, etc.) + +typedef struct dx_message_t dx_message_t; + +DEQ_DECLARE(dx_message_t, dx_message_list_t); + +struct dx_message_t { + DEQ_LINKS(dx_message_t); + // Private members not listed here. +}; + +typedef enum { + DX_DEPTH_NONE, + DX_DEPTH_HEADER, + DX_DEPTH_DELIVERY_ANNOTATIONS, + DX_DEPTH_MESSAGE_ANNOTATIONS, + DX_DEPTH_PROPERTIES, + DX_DEPTH_APPLICATION_PROPERTIES, + DX_DEPTH_BODY, + DX_DEPTH_ALL +} dx_message_depth_t; + + +typedef enum { + // + // Message Sections + // + DX_FIELD_HEADER, + DX_FIELD_DELIVERY_ANNOTATION, + DX_FIELD_MESSAGE_ANNOTATION, + DX_FIELD_PROPERTIES, + DX_FIELD_APPLICATION_PROPERTIES, + DX_FIELD_BODY, + DX_FIELD_FOOTER, + + // + // Fields of the Header Section + // + DX_FIELD_DURABLE, + DX_FIELD_PRIORITY, + DX_FIELD_TTL, + DX_FIELD_FIRST_ACQUIRER, + DX_FIELD_DELIVERY_COUNT, + + // + // Fields of the Properties Section + // + DX_FIELD_MESSAGE_ID, + DX_FIELD_USER_ID, + DX_FIELD_TO, + DX_FIELD_SUBJECT, + DX_FIELD_REPLY_TO, + DX_FIELD_CORRELATION_ID, + DX_FIELD_CONTENT_TYPE, + DX_FIELD_CONTENT_ENCODING, + DX_FIELD_ABSOLUTE_EXPIRY_TIME, + DX_FIELD_CREATION_TIME, + DX_FIELD_GROUP_ID, + DX_FIELD_GROUP_SEQUENCE, + DX_FIELD_REPLY_TO_GROUP_ID +} dx_message_field_t; + +// +// Functions for allocation +// +dx_message_t *dx_allocate_message(void); +void dx_free_message(dx_message_t *qm); +dx_message_t *dx_message_copy(dx_message_t *qm); +int dx_message_persistent(dx_message_t *qm); +int dx_message_in_memory(dx_message_t *qm); + +void dx_message_set_out_delivery(dx_message_t *msg, pn_delivery_t *delivery); +pn_delivery_t *dx_message_out_delivery(dx_message_t *msg); +void dx_message_set_in_delivery(dx_message_t *msg, pn_delivery_t *delivery); +pn_delivery_t *dx_message_in_delivery(dx_message_t *msg); + +// +// Functions for received messages +// +dx_message_t *dx_message_receive(pn_delivery_t *delivery); +void dx_message_send(dx_message_t *msg, pn_link_t *link); + +int dx_message_check(dx_message_t *msg, dx_message_depth_t depth); +dx_field_iterator_t *dx_message_field_iterator(dx_message_t *msg, dx_message_field_t field); +dx_iovec_t *dx_message_field_iovec(dx_message_t *msg, dx_message_field_t field); + +pn_delivery_t *dx_message_inbound_delivery(dx_message_t *qm); + +// +// Functions for composed messages +// + +// Convenience Functions +void dx_message_compose_1(dx_message_t *msg, const char *to, dx_buffer_list_t *buffers); +void dx_message_copy_header(dx_message_t *msg); // Copy received header into send-header (prior to adding annotations) +void dx_message_copy_message_annotations(dx_message_t *msg); + +// Raw Functions +void dx_message_begin_header(dx_message_t *msg); +void dx_message_end_header(dx_message_t *msg); + +void dx_message_begin_delivery_annotations(dx_message_t *msg); +void dx_message_end_delivery_annotations(dx_message_t *msg); + +void dx_message_begin_message_annotations(dx_message_t *msg); +void dx_message_end_message_annotations(dx_message_t *msg); + +void dx_message_begin_message_properties(dx_message_t *msg); +void dx_message_end_message_properties(dx_message_t *msg); + +void dx_message_begin_application_properties(dx_message_t *msg); +void dx_message_end_application_properties(dx_message_t *msg); + +void dx_message_append_body_data(dx_message_t *msg, dx_buffer_list_t *buffers); + +void dx_message_begin_body_sequence(dx_message_t *msg); +void dx_message_end_body_sequence(dx_message_t *msg); + +void dx_message_begin_footer(dx_message_t *msg); +void dx_message_end_footer(dx_message_t *msg); + +void dx_message_insert_null(dx_message_t *msg); +void dx_message_insert_boolean(dx_message_t *msg, int value); +void dx_message_insert_ubyte(dx_message_t *msg, uint8_t value); +void dx_message_insert_uint(dx_message_t *msg, uint32_t value); +void dx_message_insert_ulong(dx_message_t *msg, uint64_t value); +void dx_message_insert_binary(dx_message_t *msg, const uint8_t *start, size_t len); +void dx_message_insert_string(dx_message_t *msg, const char *start); +void dx_message_insert_uuid(dx_message_t *msg, const uint8_t *value); +void dx_message_insert_symbol(dx_message_t *msg, const char *start, size_t len); +void dx_message_insert_timestamp(dx_message_t *msg, uint64_t value); +void dx_message_begin_list(dx_message_t* msg); +void dx_message_end_list(dx_message_t* msg); +void dx_message_begin_map(dx_message_t* msg); +void dx_message_end_map(dx_message_t* msg); + +#endif diff --git a/qpid/extras/nexus/include/qpid/nexus/router.h b/qpid/extras/dispatch/include/qpid/dispatch/router.h index 504a1d7858..03f4aa15be 100644 --- a/qpid/extras/nexus/include/qpid/nexus/router.h +++ b/qpid/extras/dispatch/include/qpid/dispatch/router.h @@ -1,5 +1,5 @@ -#ifndef __nexus_router_h__ -#define __nexus_router_h__ 1 +#ifndef __dispatch_router_h__ +#define __dispatch_router_h__ 1 /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -20,16 +20,16 @@ */ #include <proton/engine.h> -#include <qpid/nexus/container.h> +#include <qpid/dispatch/container.h> -typedef struct nx_router_t nx_router_t; +typedef struct dx_router_t dx_router_t; typedef struct { size_t message_limit; size_t memory_limit; -} nx_router_configuration_t; +} dx_router_configuration_t; -nx_router_t *nx_router(nx_router_configuration_t *config); -void nx_router_free(nx_router_t *router); +dx_router_t *dx_router(dx_router_configuration_t *config); +void dx_router_free(dx_router_t *router); #endif diff --git a/qpid/extras/nexus/include/qpid/nexus/server.h b/qpid/extras/dispatch/include/qpid/dispatch/server.h index b04db5cf9a..635e1323dd 100644 --- a/qpid/extras/nexus/include/qpid/nexus/server.h +++ b/qpid/extras/dispatch/include/qpid/dispatch/server.h @@ -1,5 +1,5 @@ -#ifndef __nexus_server_h__ -#define __nexus_server_h__ 1 +#ifndef __dispatch_server_h__ +#define __dispatch_server_h__ 1 /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -35,10 +35,10 @@ * This handler can be used to set processor affinity or other thread-specific * tuning values. * - * @param context The handler context supplied in nx_server_initialize. + * @param context The handler context supplied in dx_server_initialize. * @param thread_id The integer thread identifier that uniquely identifies this thread. */ -typedef void (*nx_thread_start_cb_t)(void* context, int thread_id); +typedef void (*dx_thread_start_cb_t)(void* context, int thread_id); /** @@ -46,13 +46,13 @@ typedef void (*nx_thread_start_cb_t)(void* context, int thread_id); * * @param thread_count The number of worker threads (1 or more) that the server shall create */ -void nx_server_initialize(int thread_count); +void dx_server_initialize(int thread_count); /** * \brief Finalize the server after it has stopped running. */ -void nx_server_finalize(void); +void dx_server_finalize(void); /** @@ -64,7 +64,7 @@ void nx_server_finalize(void); * @param start_handler The thread-start handler invoked per thread on thread startup. * @param context Opaque context to be passed back in the callback function. */ -void nx_server_set_start_handler(nx_thread_start_cb_t start_handler, void *context); +void dx_server_set_start_handler(dx_thread_start_cb_t start_handler, void *context); /** @@ -72,9 +72,9 @@ void nx_server_set_start_handler(nx_thread_start_cb_t start_handler, void *conte * * Start the operation of the server, including launching all of the worker threads. * This function does not return until after the server has been stopped. The thread - * that calls nx_server_run is used as one of the worker threads. + * that calls dx_server_run is used as one of the worker threads. */ -void nx_server_run(void); +void dx_server_run(void); /** @@ -84,7 +84,7 @@ void nx_server_run(void); * thread. When this function returns, all of the other server threads have been closed and * joined. The calling thread will be the only running thread in the process. */ -void nx_server_stop(void); +void dx_server_stop(void); /** @@ -94,7 +94,7 @@ void nx_server_stop(void); * the one calling the this function) are finished processing and have been blocked. When * this call returns, the calling thread is the only thread running in the process. */ -void nx_server_pause(void); +void dx_server_pause(void); /** @@ -103,7 +103,7 @@ void nx_server_pause(void); * This call unblocks all of the worker threads * so they can resume normal connection processing. */ -void nx_server_resume(void); +void dx_server_resume(void); /** @@ -117,14 +117,14 @@ void nx_server_resume(void); * \brief Signal Handler * * Callback for caught signals. This handler will only be invoked for signal numbers - * that were registered via nx_server_signal. The handler is not invoked in the context + * that were registered via dx_server_signal. The handler is not invoked in the context * of the OS signal handler. Rather, it is invoked on one of the worker threads in an * orderly sequence. * - * @param context The handler context supplied in nx_server_initialize. + * @param context The handler context supplied in dx_server_initialize. * @param signum The signal number that was raised. */ -typedef void (*nx_signal_handler_cb_t)(void* context, int signum); +typedef void (*dx_signal_handler_cb_t)(void* context, int signum); /** @@ -135,7 +135,7 @@ typedef void (*nx_signal_handler_cb_t)(void* context, int signum); * @param signal_handler The signal handler called when a registered signal is caught. * @param context Opaque context to be passed back in the callback function. */ -void nx_server_set_signal_handler(nx_signal_handler_cb_t signal_handler, void *context); +void dx_server_set_signal_handler(dx_signal_handler_cb_t signal_handler, void *context); /** @@ -143,7 +143,7 @@ void nx_server_set_signal_handler(nx_signal_handler_cb_t signal_handler, void *c * * @param signum The signal number of a signal to be handled by the application. */ -void nx_server_signal(int signum); +void dx_server_signal(int signum); /** @@ -155,34 +155,34 @@ void nx_server_signal(int signum); /** * \brief Listener objects represent the desire to accept incoming transport connections. */ -typedef struct nx_listener_t nx_listener_t; +typedef struct dx_listener_t dx_listener_t; /** * \brief Connector objects represent the desire to create and maintain an outgoing transport connection. */ -typedef struct nx_connector_t nx_connector_t; +typedef struct dx_connector_t dx_connector_t; /** * \brief Connection objects wrap Proton connection objects. */ -typedef struct nx_connection_t nx_connection_t; +typedef struct dx_connection_t dx_connection_t; /** * Event type for the connection callback. */ typedef enum { /// The connection just opened via a listener (inbound). - NX_CONN_EVENT_LISTENER_OPEN, + DX_CONN_EVENT_LISTENER_OPEN, /// The connection just opened via a connector (outbound). - NX_CONN_EVENT_CONNECTOR_OPEN, + DX_CONN_EVENT_CONNECTOR_OPEN, /// The connection was closed at the transport level (not cleanly). - NX_CONN_EVENT_CLOSE, + DX_CONN_EVENT_CLOSE, /// The connection requires processing. - NX_CONN_EVENT_PROCESS -} nx_conn_event_t; + DX_CONN_EVENT_PROCESS +} dx_conn_event_t; /** @@ -194,42 +194,42 @@ typedef enum { * The implementation of this handler may assume that it has exclusive access to the * connection and its subservient components (sessions, links, deliveries, etc.). * - * @param context The handler context supplied in nx_server_{connect,listen}. + * @param context The handler context supplied in dx_server_{connect,listen}. * @param event The event/reason for the invocation of the handler. * @param conn The connection that requires processing by the handler. * @return A value greater than zero if the handler did any proton processing for * the connection. If no work was done, zero is returned. */ -typedef int (*nx_conn_handler_cb_t)(void* context, nx_conn_event_t event, nx_connection_t *conn); +typedef int (*dx_conn_handler_cb_t)(void* context, dx_conn_event_t event, dx_connection_t *conn); /** * \brief Set the connection event handler callback. * * Set the connection handler callback for the server. This callback is mandatory and must be set - * prior to the invocation of nx_server_run. + * prior to the invocation of dx_server_run. * * @param conn_hander The handler for processing connection-related events. */ -void nx_server_set_conn_handler(nx_conn_handler_cb_t conn_handler); +void dx_server_set_conn_handler(dx_conn_handler_cb_t conn_handler); /** * \brief Set the user context for a connection. * - * @param conn Connection object supplied in NX_CONN_EVENT_{LISTENER,CONNETOR}_OPEN + * @param conn Connection object supplied in DX_CONN_EVENT_{LISTENER,CONNETOR}_OPEN * @param context User context to be stored with the connection. */ -void nx_connection_set_context(nx_connection_t *conn, void *context); +void dx_connection_set_context(dx_connection_t *conn, void *context); /** * \brief Get the user context from a connection. * - * @param conn Connection object supplied in NX_CONN_EVENT_{LISTENER,CONNETOR}_OPEN + * @param conn Connection object supplied in DX_CONN_EVENT_{LISTENER,CONNETOR}_OPEN * @return The user context stored with the connection. */ -void *nx_connection_get_context(nx_connection_t *conn); +void *dx_connection_get_context(dx_connection_t *conn); /** @@ -243,22 +243,22 @@ void *nx_connection_get_context(nx_connection_t *conn); * * @param conn The connection over which the application wishes to send data */ -void nx_server_activate(nx_connection_t *conn); +void dx_server_activate(dx_connection_t *conn); /** * \brief Get the wrapped proton-engine connection object. * - * @param conn Connection object supplied in NX_CONN_EVENT_{LISTENER,CONNETOR}_OPEN + * @param conn Connection object supplied in DX_CONN_EVENT_{LISTENER,CONNETOR}_OPEN * @return The proton connection object. */ -pn_connection_t *nx_connection_pn(nx_connection_t *conn); +pn_connection_t *dx_connection_pn(dx_connection_t *conn); /** * \brief Configuration block for a connector or a listener. */ -typedef struct nx_server_config_t { +typedef struct dx_server_config_t { /** * Host name or network address to bind to a listener or use in the connector. */ @@ -346,7 +346,7 @@ typedef struct nx_server_config_t { * meaningful for outgoing (connector) connections only. */ int allow_redirect; -} nx_server_config_t; +} dx_server_config_t; /** @@ -358,23 +358,23 @@ typedef struct nx_server_config_t { * @param context User context passed back in the connection handler. * @return A pointer to the new listener, or NULL in case of failure. */ -nx_listener_t *nx_server_listen(const nx_server_config_t *config, void *context); +dx_listener_t *dx_server_listen(const dx_server_config_t *config, void *context); /** * \brief Free the resources associated with a listener. * - * @param li A listener pointer returned by nx_listen. + * @param li A listener pointer returned by dx_listen. */ -void nx_listener_free(nx_listener_t* li); +void dx_listener_free(dx_listener_t* li); /** * \brief Close a listener so it will accept no more connections. * - * @param li A listener pointer returned by nx_listen. + * @param li A listener pointer returned by dx_listen. */ -void nx_listener_close(nx_listener_t* li); +void dx_listener_close(dx_listener_t* li); /** @@ -386,15 +386,15 @@ void nx_listener_close(nx_listener_t* li); * @param context User context passed back in the connection handler. * @return A pointer to the new connector, or NULL in case of failure. */ -nx_connector_t *nx_server_connect(const nx_server_config_t *config, void *context); +dx_connector_t *dx_server_connect(const dx_server_config_t *config, void *context); /** * \brief Free the resources associated with a connector. * - * @param ct A connector pointer returned by nx_connect. + * @param ct A connector pointer returned by dx_connect. */ -void nx_connector_free(nx_connector_t* ct); +void dx_connector_free(dx_connector_t* ct); /** * @} diff --git a/qpid/extras/nexus/include/qpid/nexus/threading.h b/qpid/extras/dispatch/include/qpid/dispatch/threading.h index f275fc0086..f275fc0086 100644 --- a/qpid/extras/nexus/include/qpid/nexus/threading.h +++ b/qpid/extras/dispatch/include/qpid/dispatch/threading.h diff --git a/qpid/extras/nexus/include/qpid/nexus/timer.h b/qpid/extras/dispatch/include/qpid/dispatch/timer.h index 5444989296..af3a22e262 100644 --- a/qpid/extras/nexus/include/qpid/nexus/timer.h +++ b/qpid/extras/dispatch/include/qpid/dispatch/timer.h @@ -1,5 +1,5 @@ -#ifndef __nexus_timer_h__ -#define __nexus_timer_h__ 1 +#ifndef __dispatch_timer_h__ +#define __dispatch_timer_h__ 1 /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -24,16 +24,16 @@ * @{ */ -typedef struct nx_timer_t nx_timer_t; +typedef struct dx_timer_t dx_timer_t; /** * Timer Callback * * Callback invoked after a timer's interval expires and the timer fires. * - * @param context The context supplied in nx_timer + * @param context The context supplied in dx_timer */ -typedef void (*nx_timer_cb_t)(void* context); +typedef void (*dx_timer_cb_t)(void* context); /** @@ -43,7 +43,7 @@ typedef void (*nx_timer_cb_t)(void* context); * @param context An opaque, user-supplied context to be passed into the callback. * @return A pointer to the new timer object or NULL if memory is exhausted. */ -nx_timer_t *nx_timer(nx_timer_cb_t cb, void* context); +dx_timer_t *dx_timer(dx_timer_cb_t cb, void* context); /** @@ -51,23 +51,23 @@ nx_timer_t *nx_timer(nx_timer_cb_t cb, void* context); * prior to freeing. After this function returns, the callback will not be invoked for this * timer. * - * @param timer Pointer to the timer object returned by nx_timer. + * @param timer Pointer to the timer object returned by dx_timer. */ -void nx_timer_free(nx_timer_t *timer); +void dx_timer_free(dx_timer_t *timer); /** * Schedule a timer to fire in the future. * * Note that the timer callback will never be invoked synchronously during the execution - * of nx_timer_schedule. Even if the interval is immediate (0), the callback invocation will + * of dx_timer_schedule. Even if the interval is immediate (0), the callback invocation will * be asynchronous and after the return of this function. * - * @param timer Pointer to the timer object returned by nx_timer. + * @param timer Pointer to the timer object returned by dx_timer. * @param msec The minimum number of milliseconds of delay until the timer fires. * If 0 is supplied, the timer will fire immediately. */ -void nx_timer_schedule(nx_timer_t *timer, long msec); +void dx_timer_schedule(dx_timer_t *timer, long msec); /** @@ -75,9 +75,9 @@ void nx_timer_schedule(nx_timer_t *timer, long msec); * server thread, it is always possible that a last-second cancel attempt may arrive too late * to stop the timer from firing (i.e. the cancel is concurrent with the fire callback). * - * @param timer Pointer to the timer object returned by nx_timer. + * @param timer Pointer to the timer object returned by dx_timer. */ -void nx_timer_cancel(nx_timer_t *timer); +void dx_timer_cancel(dx_timer_t *timer); /** * @} diff --git a/qpid/extras/nexus/include/qpid/nexus/user_fd.h b/qpid/extras/dispatch/include/qpid/dispatch/user_fd.h index 2f139c2c4f..3e5584ce2e 100644 --- a/qpid/extras/nexus/include/qpid/nexus/user_fd.h +++ b/qpid/extras/dispatch/include/qpid/dispatch/user_fd.h @@ -1,5 +1,5 @@ -#ifndef __nexus_user_fd_h__ -#define __nexus_user_fd_h__ 1 +#ifndef __dispatch_user_fd_h__ +#define __dispatch_user_fd_h__ 1 /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -25,7 +25,7 @@ * @{ */ -typedef struct nx_user_fd_t nx_user_fd_t; +typedef struct dx_user_fd_t dx_user_fd_t; /** @@ -34,63 +34,63 @@ typedef struct nx_user_fd_t nx_user_fd_t; * Callback invoked when a user-managed file descriptor is available for reading or writing or there * was an error on the file descriptor. * - * @param context The handler context supplied in the nx_user_fd call. + * @param context The handler context supplied in the dx_user_fd call. * @param ufd The user_fd handle for the processable fd. */ -typedef void (*nx_user_fd_handler_cb_t)(void* context, nx_user_fd_t *ufd); +typedef void (*dx_user_fd_handler_cb_t)(void* context, dx_user_fd_t *ufd); /** * Set the user-fd handler callback for the server. This handler is optional, but must be supplied - * if the nx_server is used to manage the activation of user file descriptors. + * if the dx_server is used to manage the activation of user file descriptors. */ -void nx_server_set_user_fd_handler(nx_user_fd_handler_cb_t ufd_handler); +void dx_server_set_user_fd_handler(dx_user_fd_handler_cb_t ufd_handler); /** * Create a tracker for a user-managed file descriptor. * * A user-fd is appropriate for use when the application opens and manages file descriptors - * for purposes other than AMQP communication. Registering a user fd with the nexus server + * for purposes other than AMQP communication. Registering a user fd with the dispatch server * controls processing of the FD alongside the FDs used for messaging. * * @param fd The open file descriptor being managed by the application. * @param context User context passed back in the connection handler. * @return A pointer to the new user_fd. */ -nx_user_fd_t *nx_user_fd(int fd, void *context); +dx_user_fd_t *dx_user_fd(int fd, void *context); /** * Free the resources for a user-managed FD tracker. * - * @param ufd Structure pointer returned by nx_user_fd. + * @param ufd Structure pointer returned by dx_user_fd. */ -void nx_user_fd_free(nx_user_fd_t *ufd); +void dx_user_fd_free(dx_user_fd_t *ufd); /** * Activate a user-fd for read. * * Use this activation when the application has capacity to receive data from the user-fd. This will - * cause the callback set in nx_server_set_user_fd_handler to later be invoked when the + * cause the callback set in dx_server_set_user_fd_handler to later be invoked when the * file descriptor has data to read. * - * @param ufd Structure pointer returned by nx_user_fd. + * @param ufd Structure pointer returned by dx_user_fd. */ -void nx_user_fd_activate_read(nx_user_fd_t *ufd); +void dx_user_fd_activate_read(dx_user_fd_t *ufd); /** * Activate a user-fd for write. * * Use this activation when the application has data to write via the user-fd. This will - * cause the callback set in nx_server_set_user_fd_handler to later be invoked when the + * cause the callback set in dx_server_set_user_fd_handler to later be invoked when the * file descriptor is writable. * - * @param ufd Structure pointer returned by nx_user_fd. + * @param ufd Structure pointer returned by dx_user_fd. */ -void nx_user_fd_activate_write(nx_user_fd_t *ufd); +void dx_user_fd_activate_write(dx_user_fd_t *ufd); /** @@ -100,19 +100,19 @@ void nx_user_fd_activate_write(nx_user_fd_t *ufd); * but the file-descriptor is not readable and will block if not set to O_NONBLOCK). * Code accordingly. * - * @param ufd Structure pointer returned by nx_user_fd. + * @param ufd Structure pointer returned by dx_user_fd. * @return true iff the user file descriptor is readable. */ -bool nx_user_fd_is_readable(nx_user_fd_t *ufd); +bool dx_user_fd_is_readable(dx_user_fd_t *ufd); /** * Check writable status of a user-fd * - * @param ufd Structure pointer returned by nx_user_fd. + * @param ufd Structure pointer returned by dx_user_fd. * @return true iff the user file descriptor is writable. */ -bool nx_user_fd_is_writeable(nx_user_fd_t *ufd); +bool dx_user_fd_is_writeable(dx_user_fd_t *ufd); /** * @} diff --git a/qpid/extras/nexus/router/CMakeLists.txt b/qpid/extras/dispatch/router/CMakeLists.txt index 6023b7fc79..efb424ee13 100644 --- a/qpid/extras/nexus/router/CMakeLists.txt +++ b/qpid/extras/dispatch/router/CMakeLists.txt @@ -24,8 +24,8 @@ set(router_SOURCES src/main.c ) -add_executable(nexus-router ${router_SOURCES}) -target_link_libraries(nexus-router qpid-nexus ${proton_lib}) +add_executable(dispatch-router ${router_SOURCES}) +target_link_libraries(dispatch-router qpid-dispatch ${proton_lib}) -install(TARGETS nexus-router RUNTIME DESTINATION bin) +install(TARGETS dispatch-router RUNTIME DESTINATION bin) diff --git a/qpid/extras/nexus/router/src/main.c b/qpid/extras/dispatch/router/src/main.c index 5db2697682..0cafa6a2ca 100644 --- a/qpid/extras/nexus/router/src/main.c +++ b/qpid/extras/dispatch/router/src/main.c @@ -19,12 +19,14 @@ #include <stdio.h> #include <proton/driver.h> -#include <qpid/nexus/server.h> -#include <qpid/nexus/container.h> -#include <qpid/nexus/timer.h> -#include <qpid/nexus/log.h> -#include <qpid/nexus/router.h> +#include <qpid/dispatch/server.h> +#include <qpid/dispatch/container.h> +#include <qpid/dispatch/timer.h> +#include <qpid/dispatch/log.h> +#include <qpid/dispatch/router.h> #include <signal.h> +#include <sys/types.h> +#include <unistd.h> static int exit_with_sigint = 0; @@ -35,7 +37,7 @@ static void thread_start_handler(void* context, int thread_id) static void signal_handler(void* context, int signum) { - nx_server_pause(); + dx_server_pause(); switch (signum) { case SIGINT: @@ -44,7 +46,7 @@ static void signal_handler(void* context, int signum) case SIGQUIT: case SIGTERM: fflush(stdout); - nx_server_stop(); + dx_server_stop(); break; case SIGHUP: @@ -54,7 +56,7 @@ static void signal_handler(void* context, int signum) break; } - nx_server_resume(); + dx_server_resume(); } @@ -62,53 +64,53 @@ static void startup(void *context) { // TODO - Move this into a configuration framework - nx_server_pause(); + dx_server_pause(); - static nx_server_config_t server_config; + static dx_server_config_t server_config; server_config.host = "0.0.0.0"; server_config.port = "5672"; server_config.sasl_mechanisms = "ANONYMOUS"; server_config.ssl_enabled = 0; - nx_server_listen(&server_config, 0); + dx_server_listen(&server_config, 0); /* - static nx_server_config_t client_config; + static dx_server_config_t client_config; client_config.host = "0.0.0.0"; client_config.port = "10000"; client_config.sasl_mechanisms = "ANONYMOUS"; client_config.ssl_enabled = 0; - nx_server_connect(&client_config, 0); + dx_server_connect(&client_config, 0); */ - nx_server_resume(); + dx_server_resume(); } int main(int argc, char **argv) { - nx_log_set_mask(LOG_INFO | LOG_TRACE | LOG_ERROR); + dx_log_set_mask(LOG_INFO | LOG_TRACE | LOG_ERROR); - nx_server_initialize(4); - nx_container_initialize(); + dx_server_initialize(4); + dx_container_initialize(); - nx_server_set_signal_handler(signal_handler, 0); - nx_server_set_start_handler(thread_start_handler, 0); + dx_server_set_signal_handler(signal_handler, 0); + dx_server_set_start_handler(thread_start_handler, 0); - nx_router_t *router = nx_router(0); + dx_router_t *router = dx_router(0); - nx_timer_t *startup_timer = nx_timer(startup, 0); - nx_timer_schedule(startup_timer, 0); + dx_timer_t *startup_timer = dx_timer(startup, 0); + dx_timer_schedule(startup_timer, 0); - nx_server_signal(SIGHUP); - nx_server_signal(SIGQUIT); - nx_server_signal(SIGTERM); - nx_server_signal(SIGINT); + dx_server_signal(SIGHUP); + dx_server_signal(SIGQUIT); + dx_server_signal(SIGTERM); + dx_server_signal(SIGINT); - nx_server_run(); - nx_router_free(router); - nx_server_finalize(); + dx_server_run(); + dx_router_free(router); + dx_server_finalize(); if (exit_with_sigint) { signal(SIGINT, SIG_DFL); diff --git a/qpid/extras/nexus/site/css/style.css b/qpid/extras/dispatch/site/css/style.css index b73c136d4a..b73c136d4a 100644 --- a/qpid/extras/nexus/site/css/style.css +++ b/qpid/extras/dispatch/site/css/style.css diff --git a/qpid/extras/dispatch/site/images/arch.dia b/qpid/extras/dispatch/site/images/arch.dia Binary files differnew file mode 100644 index 0000000000..99b3185447 --- /dev/null +++ b/qpid/extras/dispatch/site/images/arch.dia diff --git a/qpid/extras/dispatch/site/images/arch.png b/qpid/extras/dispatch/site/images/arch.png Binary files differnew file mode 100644 index 0000000000..a2b7f776b9 --- /dev/null +++ b/qpid/extras/dispatch/site/images/arch.png diff --git a/qpid/extras/nexus/site/includes/footer.include b/qpid/extras/dispatch/site/includes/footer.include index 35ff04b9f2..35ff04b9f2 100644 --- a/qpid/extras/nexus/site/includes/footer.include +++ b/qpid/extras/dispatch/site/includes/footer.include diff --git a/qpid/extras/nexus/site/includes/header.include b/qpid/extras/dispatch/site/includes/header.include index 244dfc4517..244dfc4517 100644 --- a/qpid/extras/nexus/site/includes/header.include +++ b/qpid/extras/dispatch/site/includes/header.include diff --git a/qpid/extras/nexus/site/includes/menu.include b/qpid/extras/dispatch/site/includes/menu.include index aa96000e94..7cbdbd139d 100644 --- a/qpid/extras/nexus/site/includes/menu.include +++ b/qpid/extras/dispatch/site/includes/menu.include @@ -1,8 +1,9 @@ <div class="menu_box"> <div class="menu_box_top"></div> <div class="menu_box_body"> - <h3>Apache Qpid</h3> + <h3>Apache Qpid Dispatch</h3> <ul> + <li><a href="index.html">Back to Qpid</a></li> <li><a href="index.html">Home</a></li> <li><a href="download.html">Download</a></li> <li><a href="getting_started.html">Getting Started</a></li> @@ -29,11 +30,7 @@ <ul> <li><a href="getting_involved.html">Getting Involved</a></li> <li><a href="source_repository.html">Source Repository</a></li> - <li><a href="mailing_lists.html">Mailing Lists</a></li> - <li><a href="https://cwiki.apache.org/qpid/">Wiki</a></li> <li><a href="https://issues.apache.org/jira/browse/qpid">Issue Reporting</a></li> - <li><a href="people.html">People</a></li> - <li><a href="acknowledgements.html">Acknowledgements</a></li> </ul> </div> <div class="menu_box_bottom"></div> diff --git a/qpid/extras/nexus/site/index.html b/qpid/extras/dispatch/site/index.html index 806965a9c1..d8f1759492 100755 --- a/qpid/extras/nexus/site/index.html +++ b/qpid/extras/dispatch/site/index.html @@ -22,7 +22,7 @@ <html xmlns="http://www.w3.org/1999/xhtml"> <head> <meta http-equiv="Content-Type" content="text/html; charset=utf-8" /> - <title>Apache Qpid Nexus™: A Platform for Building AMQP Infrastructure</title> + <title>Apache Qpid Dispatch™: A Platform for Building AMQP Infrastructure</title> <link href="css/style.css" rel="stylesheet" type="text/css"/> </head> @@ -32,7 +32,7 @@ <div class="header"> <div class="logo"> - <h1>Apache Qpid Nexus™</h1> + <h1>Apache Qpid Dispatch™</h1> <h2>A Platform for Building AMQP Infrastructure</h2> </div> </div> @@ -40,7 +40,7 @@ <!-- end header --> <!-- begin menu --> - <!--#include virtual="/includes/menu.include" --> + <!--#include virtual="includes/menu.include" --> <!-- end menu --> <!-- begin content --> @@ -49,36 +49,39 @@ <div class="main_text_area_body"> -<p>Qpid Nexus is a library to help developers build infrastructure -components for AMQP. Nexus is not a general-purpose Messaging API. +<p>Qpid Dispatch is a library to help developers build infrastructure +components for AMQP. Dispatch is not a general-purpose Messaging API. Rather, it is a foundation on which to build applications, services, and appliances that need direct access to the detailed constructs of AMQP.</p> <hr width="80%" /> <h2>Overview</h2> -<p>Nexus is an extension of the Engine and Driver interfaces of -<a href="http://qpid.apache.org/proton">Qpid Proton</a>. The following -features are provided:</p> +<p>Dispatch is an extension of the Engine and Driver interfaces of +<a href="http://qpid.apache.org/proton">Qpid Proton</a>. It neither +uses nor exposes the Messenger interface of Proton. Rather, it +provides a way for developers to use Proton's more detailed Engine +facility. The following features are provided:</p> <ul> + <li>An asynchronous, event-oriented application environment</li> <li>Safe multi-threaded use of Proton</li> <li>Operating System Signal handling</li> <li>Quiesce and Resume for the application's threads</li> <li>Timers</li> <li>Resilient outbound connections (retry/reconnect)</li> <li>Polling support for the application's non-AMQP file descriptors</li> - <li>An AMQP Node Container that allows the developer to create node types</li> - <li>Node instances can be statically or dynamically provisioned</li> + <li>An AMQP Node Container that allows the developer to create + custom node types</li> </ul> <p /> <hr width="80%" /> <h2>Architecture</h2> -<center><img src="images/gwarch.png" /></center> +<center><img src="images/arch.png" /></center> <ul> <li><b>Proton Engine and Driver</b> provide the underlying AMQP capability</li> - <li><a href="doxygen/server/modules.html">Nexus Server</a> + <li><a href="doxygen/server/modules.html">Dispatch Server</a> wraps Proton connections in a multi-threaded server environment</li> - <li><b>Nexus Container</b> provides management of AMQP nodes (links, termini, and deliveries)</li> - <li><b>Nexus Message</b> provides efficient message encode/decode, optimized for messaging intermediaries</li> + <li><b>Dispatch Container</b> provides management of AMQP nodes (links, termini, and deliveries)</li> + <li><b>Dispatch Message</b> provides efficient message encode/decode, optimized for messaging intermediaries</li> <li>The <b>Application</b> uses all of the above services to implement scalable and performant AMQP infrastructure</li> </ul> <hr width="80%" /> @@ -90,7 +93,7 @@ features are provided:</p> <!-- end content --> <!-- begin footer --> - <!--#include virtual="/includes/footer.include" --> + <!--#include virtual="includes/footer.include" --> <!-- end footer --> </div> diff --git a/qpid/extras/nexus/src/agent.c b/qpid/extras/dispatch/src/agent.c index d131cc87dc..a885042b45 100644 --- a/qpid/extras/nexus/src/agent.c +++ b/qpid/extras/dispatch/src/agent.c @@ -17,69 +17,69 @@ * under the License. */ -#include <qpid/nexus/agent.h> -#include <qpid/nexus/ctools.h> -#include <qpid/nexus/hash.h> -#include <qpid/nexus/container.h> -#include <qpid/nexus/message.h> -#include <qpid/nexus/threading.h> -#include <qpid/nexus/timer.h> +#include <qpid/dispatch/agent.h> +#include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/hash.h> +#include <qpid/dispatch/container.h> +#include <qpid/dispatch/message.h> +#include <qpid/dispatch/threading.h> +#include <qpid/dispatch/timer.h> #include <string.h> -typedef struct nx_agent_t { +typedef struct dx_agent_t { hash_t *class_hash; - nx_message_list_t in_fifo; - nx_message_list_t out_fifo; + dx_message_list_t in_fifo; + dx_message_list_t out_fifo; sys_mutex_t *lock; - nx_timer_t *timer; -} nx_agent_t; + dx_timer_t *timer; +} dx_agent_t; -static nx_agent_t *agent = 0; +static dx_agent_t *agent = 0; -struct nx_agent_class_t { +struct dx_agent_class_t { char *fqname; void *context; - nx_agent_schema_cb_t schema_handler; - nx_agent_query_cb_t query_handler; // 0 iff class is an event. + dx_agent_schema_cb_t schema_handler; + dx_agent_query_cb_t query_handler; // 0 iff class is an event. }; -static void nx_agent_timer_handler(void *context) +static void dx_agent_timer_handler(void *context) { // TODO - Process the in_fifo here } -void nx_agent_initialize() +void dx_agent_initialize() { assert(!agent); - agent = NEW(nx_agent_t); + agent = NEW(dx_agent_t); agent->class_hash = hash(6, 10, 1); DEQ_INIT(agent->in_fifo); DEQ_INIT(agent->out_fifo); agent->lock = sys_mutex(); - agent->timer = nx_timer(nx_agent_timer_handler, agent); + agent->timer = dx_timer(dx_agent_timer_handler, agent); } -void nx_agent_finalize(void) +void dx_agent_finalize(void) { sys_mutex_free(agent->lock); - nx_timer_free(agent->timer); + dx_timer_free(agent->timer); hash_free(agent->class_hash); free(agent); agent = 0; } -nx_agent_class_t *nx_agent_register_class(const char *fqname, +dx_agent_class_t *dx_agent_register_class(const char *fqname, void *context, - nx_agent_schema_cb_t schema_handler, - nx_agent_query_cb_t query_handler) + dx_agent_schema_cb_t schema_handler, + dx_agent_query_cb_t query_handler) { - nx_agent_class_t *cls = NEW(nx_agent_class_t); + dx_agent_class_t *cls = NEW(dx_agent_class_t); assert(cls); cls->fqname = (char*) malloc(strlen(fqname) + 1); strcpy(cls->fqname, fqname); @@ -87,64 +87,64 @@ nx_agent_class_t *nx_agent_register_class(const char *fqname, cls->schema_handler = schema_handler; cls->query_handler = query_handler; - nx_field_iterator_t *iter = nx_field_iterator_string(fqname, ITER_VIEW_ALL); + dx_field_iterator_t *iter = dx_field_iterator_string(fqname, ITER_VIEW_ALL); int result = hash_insert_const(agent->class_hash, iter, cls); - nx_field_iterator_free(iter); + dx_field_iterator_free(iter); assert(result >= 0); return cls; } -nx_agent_class_t *nx_agent_register_event(const char *fqname, +dx_agent_class_t *dx_agent_register_event(const char *fqname, void *context, - nx_agent_schema_cb_t schema_handler) + dx_agent_schema_cb_t schema_handler) { - return nx_agent_register_class(fqname, context, schema_handler, 0); + return dx_agent_register_class(fqname, context, schema_handler, 0); } -void nx_agent_value_string(const void *correlator, const char *key, const char *value) +void dx_agent_value_string(const void *correlator, const char *key, const char *value) { } -void nx_agent_value_uint(const void *correlator, const char *key, uint64_t value) +void dx_agent_value_uint(const void *correlator, const char *key, uint64_t value) { } -void nx_agent_value_null(const void *correlator, const char *key) +void dx_agent_value_null(const void *correlator, const char *key) { } -void nx_agent_value_boolean(const void *correlator, const char *key, bool value) +void dx_agent_value_boolean(const void *correlator, const char *key, bool value) { } -void nx_agent_value_binary(const void *correlator, const char *key, const uint8_t *value, size_t len) +void dx_agent_value_binary(const void *correlator, const char *key, const uint8_t *value, size_t len) { } -void nx_agent_value_uuid(const void *correlator, const char *key, const uint8_t *value) +void dx_agent_value_uuid(const void *correlator, const char *key, const uint8_t *value) { } -void nx_agent_value_timestamp(const void *correlator, const char *key, uint64_t value) +void dx_agent_value_timestamp(const void *correlator, const char *key, uint64_t value) { } -void nx_agent_value_complete(const void *correlator, bool more) +void dx_agent_value_complete(const void *correlator, bool more) { } -void *nx_agent_raise_event(nx_agent_class_t *event) +void *dx_agent_raise_event(dx_agent_class_t *event) { return 0; } diff --git a/qpid/extras/nexus/src/alloc.c b/qpid/extras/dispatch/src/alloc.c index ae710e53a5..2b3b953aad 100644 --- a/qpid/extras/nexus/src/alloc.c +++ b/qpid/extras/dispatch/src/alloc.c @@ -17,9 +17,9 @@ * under the License. */ -#include <qpid/nexus/alloc.h> -#include <qpid/nexus/ctools.h> -#include <qpid/nexus/log.h> +#include <qpid/dispatch/alloc.h> +#include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/log.h> #include <memory.h> #include <stdio.h> @@ -27,22 +27,22 @@ typedef struct item_t item_t; struct item_t { DEQ_LINKS(item_t); - nx_alloc_type_desc_t *desc; + dx_alloc_type_desc_t *desc; }; DEQ_DECLARE(item_t, item_list_t); -struct nx_alloc_pool_t { +struct dx_alloc_pool_t { item_list_t free_list; }; -nx_alloc_config_t nx_alloc_default_config_big = {16, 32, 0}; -nx_alloc_config_t nx_alloc_default_config_small = {64, 128, 0}; +dx_alloc_config_t dx_alloc_default_config_big = {16, 32, 0}; +dx_alloc_config_t dx_alloc_default_config_small = {64, 128, 0}; sys_mutex_t *init_lock; item_list_t type_list; -static void nx_alloc_init(nx_alloc_type_desc_t *desc) +static void dx_alloc_init(dx_alloc_type_desc_t *desc) { sys_mutex_lock(init_lock); @@ -50,21 +50,21 @@ static void nx_alloc_init(nx_alloc_type_desc_t *desc) if (desc->additional_size) desc->total_size += *desc->additional_size; - nx_log("ALLOC", LOG_TRACE, "Initialized Allocator - type=%s type-size=%d total-size=%d", + dx_log("ALLOC", LOG_TRACE, "Initialized Allocator - type=%s type-size=%d total-size=%d", desc->type_name, desc->type_size, desc->total_size); if (!desc->global_pool) { if (desc->config == 0) desc->config = desc->total_size > 256 ? - &nx_alloc_default_config_big : &nx_alloc_default_config_small; + &dx_alloc_default_config_big : &dx_alloc_default_config_small; assert (desc->config->local_free_list_max >= desc->config->transfer_batch_size); - desc->global_pool = NEW(nx_alloc_pool_t); + desc->global_pool = NEW(dx_alloc_pool_t); DEQ_INIT(desc->global_pool->free_list); desc->lock = sys_mutex(); - desc->stats = NEW(nx_alloc_stats_t); - memset(desc->stats, 0, sizeof(nx_alloc_stats_t)); + desc->stats = NEW(dx_alloc_stats_t); + memset(desc->stats, 0, sizeof(dx_alloc_stats_t)); } item_t *type_item = NEW(item_t); @@ -76,7 +76,7 @@ static void nx_alloc_init(nx_alloc_type_desc_t *desc) } -void *nx_alloc(nx_alloc_type_desc_t *desc, nx_alloc_pool_t **tpool) +void *dx_alloc(dx_alloc_type_desc_t *desc, dx_alloc_pool_t **tpool) { int idx; @@ -84,18 +84,18 @@ void *nx_alloc(nx_alloc_type_desc_t *desc, nx_alloc_pool_t **tpool) // If the descriptor is not initialized, set it up now. // if (!desc->global_pool) - nx_alloc_init(desc); + dx_alloc_init(desc); // // If this is the thread's first pass through here, allocate the // thread-local pool for this type. // if (*tpool == 0) { - *tpool = NEW(nx_alloc_pool_t); + *tpool = NEW(dx_alloc_pool_t); DEQ_INIT((*tpool)->free_list); } - nx_alloc_pool_t *pool = *tpool; + dx_alloc_pool_t *pool = *tpool; // // Fast case: If there's an item on the local free list, take it off the @@ -151,7 +151,7 @@ void *nx_alloc(nx_alloc_type_desc_t *desc, nx_alloc_pool_t **tpool) } -void nx_dealloc(nx_alloc_type_desc_t *desc, nx_alloc_pool_t **tpool, void *p) +void dx_dealloc(dx_alloc_type_desc_t *desc, dx_alloc_pool_t **tpool, void *p) { item_t *item = ((item_t*) p) - 1; int idx; @@ -161,11 +161,11 @@ void nx_dealloc(nx_alloc_type_desc_t *desc, nx_alloc_pool_t **tpool, void *p) // thread-local pool for this type. // if (*tpool == 0) { - *tpool = NEW(nx_alloc_pool_t); + *tpool = NEW(dx_alloc_pool_t); DEQ_INIT((*tpool)->free_list); } - nx_alloc_pool_t *pool = *tpool; + dx_alloc_pool_t *pool = *tpool; DEQ_INSERT_TAIL(pool->free_list, item); @@ -202,7 +202,7 @@ void nx_dealloc(nx_alloc_type_desc_t *desc, nx_alloc_pool_t **tpool, void *p) } -void nx_alloc_initialize(void) +void dx_alloc_initialize(void) { init_lock = sys_mutex(); DEQ_INIT(type_list); diff --git a/qpid/extras/nexus/src/alloc_private.h b/qpid/extras/dispatch/src/alloc_private.h index 00a4380bff..fbb18ccd48 100644 --- a/qpid/extras/nexus/src/alloc_private.h +++ b/qpid/extras/dispatch/src/alloc_private.h @@ -1,5 +1,5 @@ -#ifndef __nexus_alloc_private_h__ -#define __nexus_alloc_private_h__ 1 +#ifndef __dispatch_alloc_private_h__ +#define __dispatch_alloc_private_h__ 1 /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -19,8 +19,8 @@ * under the License. */ -#include <qpid/nexus/alloc.h> +#include <qpid/dispatch/alloc.h> -void nx_alloc_initialize(void); +void dx_alloc_initialize(void); #endif diff --git a/qpid/extras/nexus/src/auth.c b/qpid/extras/dispatch/src/auth.c index f33e907359..f0df58f6c2 100644 --- a/qpid/extras/nexus/src/auth.c +++ b/qpid/extras/dispatch/src/auth.c @@ -28,7 +28,7 @@ void auth_client_handler(pn_connector_t *cxtr) { pn_sasl_t *sasl = pn_connector_sasl(cxtr); pn_sasl_state_t state = pn_sasl_state(sasl); - nx_connection_t *ctx = (nx_connection_t*) pn_connector_context(cxtr); + dx_connection_t *ctx = (dx_connection_t*) pn_connector_context(cxtr); if (state == PN_SASL_CONF) { pn_sasl_mechanisms(sasl, "ANONYMOUS"); @@ -49,7 +49,7 @@ void auth_server_handler(pn_connector_t *cxtr) { pn_sasl_t *sasl = pn_connector_sasl(cxtr); pn_sasl_state_t state = pn_sasl_state(sasl); - nx_connection_t *ctx = (nx_connection_t*) pn_connector_context(cxtr); + dx_connection_t *ctx = (dx_connection_t*) pn_connector_context(cxtr); while (state == PN_SASL_CONF || state == PN_SASL_STEP) { if (state == PN_SASL_CONF) { diff --git a/qpid/extras/nexus/src/auth.h b/qpid/extras/dispatch/src/auth.h index c551c8ff76..c551c8ff76 100644 --- a/qpid/extras/nexus/src/auth.h +++ b/qpid/extras/dispatch/src/auth.h diff --git a/qpid/extras/nexus/src/buffer.c b/qpid/extras/dispatch/src/buffer.c index 3c091a4983..015711afd9 100644 --- a/qpid/extras/nexus/src/buffer.c +++ b/qpid/extras/dispatch/src/buffer.c @@ -17,27 +17,27 @@ * under the License. */ -#include <qpid/nexus/buffer.h> -#include <qpid/nexus/alloc.h> +#include <qpid/dispatch/buffer.h> +#include <qpid/dispatch/alloc.h> static size_t buffer_size = 512; static int size_locked = 0; -ALLOC_DECLARE(nx_buffer_t); -ALLOC_DEFINE_CONFIG(nx_buffer_t, sizeof(nx_buffer_t), &buffer_size, 0); +ALLOC_DECLARE(dx_buffer_t); +ALLOC_DEFINE_CONFIG(dx_buffer_t, sizeof(dx_buffer_t), &buffer_size, 0); -void nx_buffer_set_size(size_t size) +void dx_buffer_set_size(size_t size) { assert(!size_locked); buffer_size = size; } -nx_buffer_t *nx_allocate_buffer(void) +dx_buffer_t *dx_allocate_buffer(void) { size_locked = 1; - nx_buffer_t *buf = new_nx_buffer_t(); + dx_buffer_t *buf = new_dx_buffer_t(); DEQ_ITEM_INIT(buf); buf->size = 0; @@ -45,37 +45,37 @@ nx_buffer_t *nx_allocate_buffer(void) } -void nx_free_buffer(nx_buffer_t *buf) +void dx_free_buffer(dx_buffer_t *buf) { - free_nx_buffer_t(buf); + free_dx_buffer_t(buf); } -unsigned char *nx_buffer_base(nx_buffer_t *buf) +unsigned char *dx_buffer_base(dx_buffer_t *buf) { return (unsigned char*) &buf[1]; } -unsigned char *nx_buffer_cursor(nx_buffer_t *buf) +unsigned char *dx_buffer_cursor(dx_buffer_t *buf) { return ((unsigned char*) &buf[1]) + buf->size; } -size_t nx_buffer_capacity(nx_buffer_t *buf) +size_t dx_buffer_capacity(dx_buffer_t *buf) { return buffer_size - buf->size; } -size_t nx_buffer_size(nx_buffer_t *buf) +size_t dx_buffer_size(dx_buffer_t *buf) { return buf->size; } -void nx_buffer_insert(nx_buffer_t *buf, size_t len) +void dx_buffer_insert(dx_buffer_t *buf, size_t len) { buf->size += len; assert(buf->size <= buffer_size); diff --git a/qpid/extras/nexus/src/container.c b/qpid/extras/dispatch/src/container.c index fdf610db93..68e2afa3eb 100644 --- a/qpid/extras/nexus/src/container.c +++ b/qpid/extras/dispatch/src/container.c @@ -19,42 +19,42 @@ #include <stdio.h> #include <string.h> -#include <qpid/nexus/container.h> -#include <qpid/nexus/message.h> +#include <qpid/dispatch/container.h> +#include <qpid/dispatch/message.h> #include <proton/engine.h> #include <proton/message.h> -#include <qpid/nexus/ctools.h> -#include <qpid/nexus/hash.h> -#include <qpid/nexus/threading.h> -#include <qpid/nexus/iterator.h> -#include <qpid/nexus/log.h> +#include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/hash.h> +#include <qpid/dispatch/threading.h> +#include <qpid/dispatch/iterator.h> +#include <qpid/dispatch/log.h> static char *module="CONTAINER"; -struct nx_node_t { - const nx_node_type_t *ntype; +struct dx_node_t { + const dx_node_type_t *ntype; char *name; void *context; - nx_dist_mode_t supported_dist; - nx_lifetime_policy_t life_policy; + dx_dist_mode_t supported_dist; + dx_lifetime_policy_t life_policy; }; -ALLOC_DECLARE(nx_node_t); -ALLOC_DEFINE(nx_node_t); -ALLOC_DEFINE(nx_link_item_t); +ALLOC_DECLARE(dx_node_t); +ALLOC_DEFINE(dx_node_t); +ALLOC_DEFINE(dx_link_item_t); -struct nx_link_t { +struct dx_link_t { pn_link_t *pn_link; void *context; - nx_node_t *node; + dx_node_t *node; }; -ALLOC_DECLARE(nx_link_t); -ALLOC_DEFINE(nx_link_t); +ALLOC_DECLARE(dx_link_t); +ALLOC_DEFINE(dx_link_t); typedef struct nxc_node_type_t { DEQ_LINKS(struct nxc_node_type_t); - const nx_node_type_t *ntype; + const dx_node_type_t *ntype; } nxc_node_type_t; DEQ_DECLARE(nxc_node_type_t, nxc_node_type_list_t); @@ -62,22 +62,22 @@ DEQ_DECLARE(nxc_node_type_t, nxc_node_type_list_t); static hash_t *node_type_map; static hash_t *node_map; static sys_mutex_t *lock; -static nx_node_t *default_node; +static dx_node_t *default_node; static nxc_node_type_list_t node_type_list; static void setup_outgoing_link(pn_link_t *pn_link) { sys_mutex_lock(lock); - nx_node_t *node; + dx_node_t *node; int result; const char *source = pn_terminus_get_address(pn_link_remote_source(pn_link)); - nx_field_iterator_t *iter; + dx_field_iterator_t *iter; // TODO - Extract the name from the structured source if (source) { - iter = nx_field_iterator_string(source, ITER_VIEW_NODE_ID); + iter = dx_field_iterator_string(source, ITER_VIEW_NODE_ID); result = hash_retrieve(node_map, iter, (void*) &node); - nx_field_iterator_free(iter); + dx_field_iterator_free(iter); } else result = -1; sys_mutex_unlock(lock); @@ -93,7 +93,7 @@ static void setup_outgoing_link(pn_link_t *pn_link) } } - nx_link_t *link = new_nx_link_t(); + dx_link_t *link = new_dx_link_t(); if (!link) { pn_link_close(pn_link); return; @@ -111,16 +111,16 @@ static void setup_outgoing_link(pn_link_t *pn_link) static void setup_incoming_link(pn_link_t *pn_link) { sys_mutex_lock(lock); - nx_node_t *node; + dx_node_t *node; int result; const char *target = pn_terminus_get_address(pn_link_remote_target(pn_link)); - nx_field_iterator_t *iter; + dx_field_iterator_t *iter; // TODO - Extract the name from the structured target if (target) { - iter = nx_field_iterator_string(target, ITER_VIEW_NODE_ID); + iter = dx_field_iterator_string(target, ITER_VIEW_NODE_ID); result = hash_retrieve(node_map, iter, (void*) &node); - nx_field_iterator_free(iter); + dx_field_iterator_free(iter); } else result = -1; sys_mutex_unlock(lock); @@ -136,7 +136,7 @@ static void setup_incoming_link(pn_link_t *pn_link) } } - nx_link_t *link = new_nx_link_t(); + dx_link_t *link = new_dx_link_t(); if (!link) { pn_link_close(pn_link); return; @@ -153,11 +153,11 @@ static void setup_incoming_link(pn_link_t *pn_link) static int do_writable(pn_link_t *pn_link) { - nx_link_t *link = (nx_link_t*) pn_link_get_context(pn_link); + dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link); if (!link) return 0; - nx_node_t *node = link->node; + dx_node_t *node = link->node; if (!node) return 0; @@ -168,10 +168,10 @@ static int do_writable(pn_link_t *pn_link) static void process_receive(pn_delivery_t *delivery) { pn_link_t *pn_link = pn_delivery_link(delivery); - nx_link_t *link = (nx_link_t*) pn_link_get_context(pn_link); + dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link); if (link) { - nx_node_t *node = link->node; + dx_node_t *node = link->node; if (node) { node->ntype->rx_handler(node->context, link, delivery); return; @@ -191,10 +191,10 @@ static void process_receive(pn_delivery_t *delivery) static void do_send(pn_delivery_t *delivery) { pn_link_t *pn_link = pn_delivery_link(delivery); - nx_link_t *link = (nx_link_t*) pn_link_get_context(pn_link); + dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link); if (link) { - nx_node_t *node = link->node; + dx_node_t *node = link->node; if (node) { node->ntype->tx_handler(node->context, link, delivery); return; @@ -208,10 +208,10 @@ static void do_send(pn_delivery_t *delivery) static void do_updated(pn_delivery_t *delivery) { pn_link_t *pn_link = pn_delivery_link(delivery); - nx_link_t *link = (nx_link_t*) pn_link_get_context(pn_link); + dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link); if (link) { - nx_node_t *node = link->node; + dx_node_t *node = link->node; if (node) node->ntype->disp_handler(node->context, link, delivery); } @@ -226,12 +226,12 @@ static int close_handler(void* unused, pn_connection_t *conn) // pn_link_t *pn_link = pn_link_head(conn, 0); while (pn_link) { - nx_link_t *link = (nx_link_t*) pn_link_get_context(pn_link); - nx_node_t *node = link->node; + dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link); + dx_node_t *node = link->node; if (node) node->ntype->link_detach_handler(node->context, link, 0); pn_link_close(pn_link); - free_nx_link_t(link); + free_dx_link_t(link); pn_link = pn_link_next(pn_link, 0); } @@ -321,8 +321,8 @@ static int process_handler(void* unused, pn_connection_t *conn) // teardown any terminating links pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED); while (pn_link) { - nx_link_t *link = (nx_link_t*) pn_link_get_context(pn_link); - nx_node_t *node = link->node; + dx_link_t *link = (dx_link_t*) pn_link_get_context(pn_link); + dx_node_t *node = link->node; if (node) node->ntype->link_detach_handler(node->context, link, 1); // TODO - get 'closed' from detach message pn_link_close(pn_link); @@ -348,9 +348,9 @@ static int process_handler(void* unused, pn_connection_t *conn) } -static void open_handler(nx_connection_t *conn, nx_direction_t dir) +static void open_handler(dx_connection_t *conn, dx_direction_t dir) { - const nx_node_type_t *nt; + const dx_node_type_t *nt; // // Note the locking structure in this function. Generally this would be unsafe, but since @@ -361,11 +361,11 @@ static void open_handler(nx_connection_t *conn, nx_direction_t dir) nxc_node_type_t *nt_item = DEQ_HEAD(node_type_list); sys_mutex_unlock(lock); - pn_connection_open(nx_connection_pn(conn)); + pn_connection_open(dx_connection_pn(conn)); while (nt_item) { nt = nt_item->ntype; - if (dir == NX_INCOMING) { + if (dir == DX_INCOMING) { if (nt->inbound_conn_open_handler) nt->inbound_conn_open_handler(nt->type_context, conn); } else { @@ -380,24 +380,24 @@ static void open_handler(nx_connection_t *conn, nx_direction_t dir) } -static int handler(void* context, nx_conn_event_t event, nx_connection_t *nx_conn) +static int handler(void* context, dx_conn_event_t event, dx_connection_t *dx_conn) { - pn_connection_t *conn = nx_connection_pn(nx_conn); + pn_connection_t *conn = dx_connection_pn(dx_conn); switch (event) { - case NX_CONN_EVENT_LISTENER_OPEN: open_handler(nx_conn, NX_INCOMING); break; - case NX_CONN_EVENT_CONNECTOR_OPEN: open_handler(nx_conn, NX_OUTGOING); break; - case NX_CONN_EVENT_CLOSE: return close_handler(context, conn); - case NX_CONN_EVENT_PROCESS: return process_handler(context, conn); + case DX_CONN_EVENT_LISTENER_OPEN: open_handler(dx_conn, DX_INCOMING); break; + case DX_CONN_EVENT_CONNECTOR_OPEN: open_handler(dx_conn, DX_OUTGOING); break; + case DX_CONN_EVENT_CLOSE: return close_handler(context, conn); + case DX_CONN_EVENT_PROCESS: return process_handler(context, conn); } return 0; } -void nx_container_initialize(void) +void dx_container_initialize(void) { - nx_log(module, LOG_TRACE, "Container Initializing"); + dx_log(module, LOG_TRACE, "Container Initializing"); node_type_map = hash(6, 4, 1); // 64 buckets, item batches of 4 node_map = hash(10, 32, 0); // 1K buckets, item batches of 32 @@ -405,19 +405,19 @@ void nx_container_initialize(void) default_node = 0; DEQ_INIT(node_type_list); - nx_server_set_conn_handler(handler); + dx_server_set_conn_handler(handler); } -void nx_container_finalize(void) +void dx_container_finalize(void) { } -int nx_container_register_node_type(const nx_node_type_t *nt) +int dx_container_register_node_type(const dx_node_type_t *nt) { int result; - nx_field_iterator_t *iter = nx_field_iterator_string(nt->type_name, ITER_VIEW_ALL); + dx_field_iterator_t *iter = dx_field_iterator_string(nt->type_name, ITER_VIEW_ALL); nxc_node_type_t *nt_item = NEW(nxc_node_type_t); DEQ_ITEM_INIT(nt_item); nt_item->ntype = nt; @@ -427,40 +427,40 @@ int nx_container_register_node_type(const nx_node_type_t *nt) DEQ_INSERT_TAIL(node_type_list, nt_item); sys_mutex_unlock(lock); - nx_field_iterator_free(iter); + dx_field_iterator_free(iter); if (result < 0) return result; - nx_log(module, LOG_TRACE, "Node Type Registered - %s", nt->type_name); + dx_log(module, LOG_TRACE, "Node Type Registered - %s", nt->type_name); return 0; } -void nx_container_set_default_node_type(const nx_node_type_t *nt, +void dx_container_set_default_node_type(const dx_node_type_t *nt, void *context, - nx_dist_mode_t supported_dist) + dx_dist_mode_t supported_dist) { if (default_node) - nx_container_destroy_node(default_node); + dx_container_destroy_node(default_node); if (nt) { - default_node = nx_container_create_node(nt, 0, context, supported_dist, NX_LIFE_PERMANENT); - nx_log(module, LOG_TRACE, "Node of type '%s' installed as default node", nt->type_name); + default_node = dx_container_create_node(nt, 0, context, supported_dist, DX_LIFE_PERMANENT); + dx_log(module, LOG_TRACE, "Node of type '%s' installed as default node", nt->type_name); } else { default_node = 0; - nx_log(module, LOG_TRACE, "Default node removed"); + dx_log(module, LOG_TRACE, "Default node removed"); } } -nx_node_t *nx_container_create_node(const nx_node_type_t *nt, +dx_node_t *dx_container_create_node(const dx_node_type_t *nt, const char *name, void *context, - nx_dist_mode_t supported_dist, - nx_lifetime_policy_t life_policy) + dx_dist_mode_t supported_dist, + dx_lifetime_policy_t life_policy) { int result; - nx_node_t *node = new_nx_node_t(); + dx_node_t *node = new_dx_node_t(); if (!node) return 0; @@ -471,13 +471,13 @@ nx_node_t *nx_container_create_node(const nx_node_type_t *nt, node->life_policy = life_policy; if (name) { - nx_field_iterator_t *iter = nx_field_iterator_string(name, ITER_VIEW_ALL); + dx_field_iterator_t *iter = dx_field_iterator_string(name, ITER_VIEW_ALL); sys_mutex_lock(lock); result = hash_insert(node_map, iter, node); sys_mutex_unlock(lock); - nx_field_iterator_free(iter); + dx_field_iterator_free(iter); if (result < 0) { - free_nx_node_t(node); + free_dx_node_t(node); return 0; } @@ -486,51 +486,51 @@ nx_node_t *nx_container_create_node(const nx_node_type_t *nt, } if (name) - nx_log(module, LOG_TRACE, "Node of type '%s' created with name '%s'", nt->type_name, name); + dx_log(module, LOG_TRACE, "Node of type '%s' created with name '%s'", nt->type_name, name); return node; } -void nx_container_destroy_node(nx_node_t *node) +void dx_container_destroy_node(dx_node_t *node) { if (node->name) { - nx_field_iterator_t *iter = nx_field_iterator_string(node->name, ITER_VIEW_ALL); + dx_field_iterator_t *iter = dx_field_iterator_string(node->name, ITER_VIEW_ALL); sys_mutex_lock(lock); hash_remove(node_map, iter); sys_mutex_unlock(lock); - nx_field_iterator_free(iter); + dx_field_iterator_free(iter); free(node->name); } - free_nx_node_t(node); + free_dx_node_t(node); } -void nx_container_node_set_context(nx_node_t *node, void *node_context) +void dx_container_node_set_context(dx_node_t *node, void *node_context) { node->context = node_context; } -nx_dist_mode_t nx_container_node_get_dist_modes(const nx_node_t *node) +dx_dist_mode_t dx_container_node_get_dist_modes(const dx_node_t *node) { return node->supported_dist; } -nx_lifetime_policy_t nx_container_node_get_life_policy(const nx_node_t *node) +dx_lifetime_policy_t dx_container_node_get_life_policy(const dx_node_t *node) { return node->life_policy; } -nx_link_t *nx_link(nx_node_t *node, nx_connection_t *conn, nx_direction_t dir, const char* name) +dx_link_t *dx_link(dx_node_t *node, dx_connection_t *conn, dx_direction_t dir, const char* name) { - pn_session_t *sess = pn_session(nx_connection_pn(conn)); - nx_link_t *link = new_nx_link_t(); + pn_session_t *sess = pn_session(dx_connection_pn(conn)); + dx_link_t *link = new_dx_link_t(); - if (dir == NX_OUTGOING) + if (dir == DX_OUTGOING) link->pn_link = pn_sender(sess, name); else link->pn_link = pn_receiver(sess, name); @@ -545,49 +545,49 @@ nx_link_t *nx_link(nx_node_t *node, nx_connection_t *conn, nx_direction_t dir, c } -void nx_link_set_context(nx_link_t *link, void *context) +void dx_link_set_context(dx_link_t *link, void *context) { link->context = context; } -void *nx_link_get_context(nx_link_t *link) +void *dx_link_get_context(dx_link_t *link) { return link->context; } -pn_link_t *nx_link_pn(nx_link_t *link) +pn_link_t *dx_link_pn(dx_link_t *link) { return link->pn_link; } -pn_terminus_t *nx_link_source(nx_link_t *link) +pn_terminus_t *dx_link_source(dx_link_t *link) { return pn_link_source(link->pn_link); } -pn_terminus_t *nx_link_target(nx_link_t *link) +pn_terminus_t *dx_link_target(dx_link_t *link) { return pn_link_target(link->pn_link); } -pn_terminus_t *nx_link_remote_source(nx_link_t *link) +pn_terminus_t *dx_link_remote_source(dx_link_t *link) { return pn_link_remote_source(link->pn_link); } -pn_terminus_t *nx_link_remote_target(nx_link_t *link) +pn_terminus_t *dx_link_remote_target(dx_link_t *link) { return pn_link_remote_target(link->pn_link); } -void nx_link_activate(nx_link_t *link) +void dx_link_activate(dx_link_t *link) { if (!link || !link->pn_link) return; @@ -600,15 +600,15 @@ void nx_link_activate(nx_link_t *link) if (!conn) return; - nx_connection_t *ctx = pn_connection_get_context(conn); + dx_connection_t *ctx = pn_connection_get_context(conn); if (!ctx) return; - nx_server_activate(ctx); + dx_server_activate(ctx); } -void nx_link_close(nx_link_t *link) +void dx_link_close(dx_link_t *link) { pn_link_close(link->pn_link); } diff --git a/qpid/extras/nexus/src/hash.c b/qpid/extras/dispatch/src/hash.c index c5d882519d..c54d5d6fcf 100644 --- a/qpid/extras/nexus/src/hash.c +++ b/qpid/extras/dispatch/src/hash.c @@ -17,9 +17,9 @@ * under the License. */ -#include <qpid/nexus/hash.h> -#include <qpid/nexus/ctools.h> -#include <qpid/nexus/alloc.h> +#include <qpid/dispatch/hash.h> +#include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/alloc.h> #include <stdio.h> #include <string.h> @@ -53,13 +53,13 @@ struct hash_t { // djb2 hash algorithm -static unsigned long hash_function(nx_field_iterator_t *iter) +static unsigned long hash_function(dx_field_iterator_t *iter) { unsigned long hash = 5381; int c; - while (!nx_field_iterator_end(iter)) { - c = (int) nx_field_iterator_octet(iter); + while (!dx_field_iterator_end(iter)) { + c = (int) dx_field_iterator_octet(iter); hash = ((hash << 5) + hash) + c; /* hash * 33 + c */ } @@ -101,7 +101,7 @@ size_t hash_size(hash_t *h) } -static hash_item_t *hash_internal_insert(hash_t *h, nx_field_iterator_t *key, int *error) +static hash_item_t *hash_internal_insert(hash_t *h, dx_field_iterator_t *key, int *error) { unsigned long idx = hash_function(key) & h->bucket_mask; hash_item_t *item = DEQ_HEAD(h->buckets[idx].items); @@ -109,7 +109,7 @@ static hash_item_t *hash_internal_insert(hash_t *h, nx_field_iterator_t *key, in *error = 0; while (item) { - if (nx_field_iterator_equal(key, item->key)) + if (dx_field_iterator_equal(key, item->key)) break; item = item->next; } @@ -126,7 +126,7 @@ static hash_item_t *hash_internal_insert(hash_t *h, nx_field_iterator_t *key, in } DEQ_ITEM_INIT(item); - item->key = nx_field_iterator_copy(key); + item->key = dx_field_iterator_copy(key); DEQ_INSERT_TAIL(h->buckets[idx].items, item); h->size++; @@ -134,7 +134,7 @@ static hash_item_t *hash_internal_insert(hash_t *h, nx_field_iterator_t *key, in } -int hash_insert(hash_t *h, nx_field_iterator_t *key, void *val) +int hash_insert(hash_t *h, dx_field_iterator_t *key, void *val) { int error = 0; hash_item_t *item = hash_internal_insert(h, key, &error); @@ -145,7 +145,7 @@ int hash_insert(hash_t *h, nx_field_iterator_t *key, void *val) } -int hash_insert_const(hash_t *h, nx_field_iterator_t *key, const void *val) +int hash_insert_const(hash_t *h, dx_field_iterator_t *key, const void *val) { if (!h->is_const) return -3; @@ -159,13 +159,13 @@ int hash_insert_const(hash_t *h, nx_field_iterator_t *key, const void *val) } -static hash_item_t *hash_internal_retrieve(hash_t *h, nx_field_iterator_t *key) +static hash_item_t *hash_internal_retrieve(hash_t *h, dx_field_iterator_t *key) { unsigned long idx = hash_function(key) & h->bucket_mask; hash_item_t *item = DEQ_HEAD(h->buckets[idx].items); while (item) { - if (nx_field_iterator_equal(key, item->key)) + if (dx_field_iterator_equal(key, item->key)) break; item = item->next; } @@ -174,7 +174,7 @@ static hash_item_t *hash_internal_retrieve(hash_t *h, nx_field_iterator_t *key) } -int hash_retrieve(hash_t *h, nx_field_iterator_t *key, void **val) +int hash_retrieve(hash_t *h, dx_field_iterator_t *key, void **val) { hash_item_t *item = hash_internal_retrieve(h, key); if (item) { @@ -185,7 +185,7 @@ int hash_retrieve(hash_t *h, nx_field_iterator_t *key, void **val) } -int hash_retrieve_const(hash_t *h, nx_field_iterator_t *key, const void **val) +int hash_retrieve_const(hash_t *h, dx_field_iterator_t *key, const void **val) { if (!h->is_const) return -3; @@ -199,13 +199,13 @@ int hash_retrieve_const(hash_t *h, nx_field_iterator_t *key, const void **val) } -int hash_remove(hash_t *h, nx_field_iterator_t *key) +int hash_remove(hash_t *h, dx_field_iterator_t *key) { unsigned long idx = hash_function(key) & h->bucket_mask; hash_item_t *item = DEQ_HEAD(h->buckets[idx].items); while (item) { - if (nx_field_iterator_equal(key, item->key)) + if (dx_field_iterator_equal(key, item->key)) break; item = item->next; } diff --git a/qpid/extras/nexus/src/iovec.c b/qpid/extras/dispatch/src/iovec.c index 4543176bbe..6ff6874440 100644 --- a/qpid/extras/nexus/src/iovec.c +++ b/qpid/extras/dispatch/src/iovec.c @@ -17,33 +17,33 @@ * under the License. */ -#include <qpid/nexus/iovec.h> -#include <qpid/nexus/alloc.h> +#include <qpid/dispatch/iovec.h> +#include <qpid/dispatch/alloc.h> #include <string.h> -#define NX_IOVEC_MAX 64 +#define DX_IOVEC_MAX 64 -struct nx_iovec_t { - struct iovec iov_array[NX_IOVEC_MAX]; +struct dx_iovec_t { + struct iovec iov_array[DX_IOVEC_MAX]; struct iovec *iov; int iov_count; }; -ALLOC_DECLARE(nx_iovec_t); -ALLOC_DEFINE(nx_iovec_t); +ALLOC_DECLARE(dx_iovec_t); +ALLOC_DEFINE(dx_iovec_t); -nx_iovec_t *nx_iovec(int vector_count) +dx_iovec_t *dx_iovec(int vector_count) { - nx_iovec_t *iov = new_nx_iovec_t(); + dx_iovec_t *iov = new_dx_iovec_t(); if (!iov) return 0; - memset(iov, 0, sizeof(nx_iovec_t)); + memset(iov, 0, sizeof(dx_iovec_t)); iov->iov_count = vector_count; - if (vector_count > NX_IOVEC_MAX) + if (vector_count > DX_IOVEC_MAX) iov->iov = (struct iovec*) malloc(sizeof(struct iovec) * vector_count); else iov->iov = &iov->iov_array[0]; @@ -52,7 +52,7 @@ nx_iovec_t *nx_iovec(int vector_count) } -void nx_iovec_free(nx_iovec_t *iov) +void dx_iovec_free(dx_iovec_t *iov) { if (!iov) return; @@ -60,11 +60,11 @@ void nx_iovec_free(nx_iovec_t *iov) if (iov->iov && iov->iov != &iov->iov_array[0]) free(iov->iov); - free_nx_iovec_t(iov); + free_dx_iovec_t(iov); } -struct iovec *nx_iovec_array(nx_iovec_t *iov) +struct iovec *dx_iovec_array(dx_iovec_t *iov) { if (!iov) return 0; @@ -72,7 +72,7 @@ struct iovec *nx_iovec_array(nx_iovec_t *iov) } -int nx_iovec_count(nx_iovec_t *iov) +int dx_iovec_count(dx_iovec_t *iov) { if (!iov) return 0; diff --git a/qpid/extras/nexus/src/iterator.c b/qpid/extras/dispatch/src/iterator.c index 668629d83a..6ab67f948d 100644 --- a/qpid/extras/nexus/src/iterator.c +++ b/qpid/extras/dispatch/src/iterator.c @@ -17,9 +17,9 @@ * under the License. */ -#include <qpid/nexus/iterator.h> -#include <qpid/nexus/ctools.h> -#include <qpid/nexus/alloc.h> +#include <qpid/dispatch/iterator.h> +#include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/alloc.h> #include "message_private.h" #include <stdio.h> #include <string.h> @@ -29,20 +29,20 @@ MODE_TO_END, MODE_TO_SLASH } parse_mode_t; -struct nx_field_iterator_t { - nx_buffer_t *start_buffer; +struct dx_field_iterator_t { + dx_buffer_t *start_buffer; unsigned char *start_cursor; int start_length; - nx_buffer_t *buffer; + dx_buffer_t *buffer; unsigned char *cursor; int length; - nx_iterator_view_t view; + dx_iterator_view_t view; parse_mode_t mode; }; -ALLOC_DECLARE(nx_field_iterator_t); -ALLOC_DEFINE(nx_field_iterator_t); +ALLOC_DECLARE(dx_field_iterator_t); +ALLOC_DEFINE(dx_field_iterator_t); typedef enum { @@ -56,7 +56,7 @@ STATE_AT_NODE_ID } state_t; -static void view_initialize(nx_field_iterator_t *iter) +static void view_initialize(dx_field_iterator_t *iter) { if (iter->view == ITER_VIEW_ALL) { iter->mode = MODE_TO_END; @@ -68,8 +68,8 @@ static void view_initialize(nx_field_iterator_t *iter) // state_t state = STATE_START; unsigned int octet; - while (!nx_field_iterator_end(iter) && state != STATE_AT_NODE_ID) { - octet = nx_field_iterator_octet(iter); + while (!dx_field_iterator_end(iter) && state != STATE_AT_NODE_ID) { + octet = dx_field_iterator_octet(iter); switch (state) { case STATE_START : if (octet == '/') @@ -139,8 +139,8 @@ static void view_initialize(nx_field_iterator_t *iter) if (iter->view == ITER_VIEW_NODE_SPECIFIC) { iter->mode = MODE_TO_END; - while (!nx_field_iterator_end(iter)) { - octet = nx_field_iterator_octet(iter); + while (!dx_field_iterator_end(iter)) { + octet = dx_field_iterator_octet(iter); if (octet == '/') break; } @@ -149,9 +149,9 @@ static void view_initialize(nx_field_iterator_t *iter) } -nx_field_iterator_t* nx_field_iterator_string(const char *text, nx_iterator_view_t view) +dx_field_iterator_t* dx_field_iterator_string(const char *text, dx_iterator_view_t view) { - nx_field_iterator_t *iter = new_nx_field_iterator_t(); + dx_field_iterator_t *iter = new_dx_field_iterator_t(); if (!iter) return 0; @@ -159,35 +159,35 @@ nx_field_iterator_t* nx_field_iterator_string(const char *text, nx_iterator_view iter->start_cursor = (unsigned char*) text; iter->start_length = strlen(text); - nx_field_iterator_reset(iter, view); + dx_field_iterator_reset(iter, view); return iter; } -nx_field_iterator_t *nx_field_iterator_buffer(nx_buffer_t *buffer, int offset, int length, nx_iterator_view_t view) +dx_field_iterator_t *dx_field_iterator_buffer(dx_buffer_t *buffer, int offset, int length, dx_iterator_view_t view) { - nx_field_iterator_t *iter = new_nx_field_iterator_t(); + dx_field_iterator_t *iter = new_dx_field_iterator_t(); if (!iter) return 0; iter->start_buffer = buffer; - iter->start_cursor = nx_buffer_base(buffer) + offset; + iter->start_cursor = dx_buffer_base(buffer) + offset; iter->start_length = length; - nx_field_iterator_reset(iter, view); + dx_field_iterator_reset(iter, view); return iter; } -void nx_field_iterator_free(nx_field_iterator_t *iter) +void dx_field_iterator_free(dx_field_iterator_t *iter) { - free_nx_field_iterator_t(iter); + free_dx_field_iterator_t(iter); } -void nx_field_iterator_reset(nx_field_iterator_t *iter, nx_iterator_view_t view) +void dx_field_iterator_reset(dx_field_iterator_t *iter, dx_iterator_view_t view) { iter->buffer = iter->start_buffer; iter->cursor = iter->start_cursor; @@ -198,7 +198,7 @@ void nx_field_iterator_reset(nx_field_iterator_t *iter, nx_iterator_view_t view } -unsigned char nx_field_iterator_octet(nx_field_iterator_t *iter) +unsigned char dx_field_iterator_octet(dx_field_iterator_t *iter) { if (iter->length == 0) return (unsigned char) 0; @@ -210,11 +210,11 @@ unsigned char nx_field_iterator_octet(nx_field_iterator_t *iter) if (iter->length > 0) { if (iter->buffer) { - if (iter->cursor - nx_buffer_base(iter->buffer) == nx_buffer_size(iter->buffer)) { + if (iter->cursor - dx_buffer_base(iter->buffer) == dx_buffer_size(iter->buffer)) { iter->buffer = iter->buffer->next; if (iter->buffer == 0) iter->length = 0; - iter->cursor = nx_buffer_base(iter->buffer); + iter->cursor = dx_buffer_base(iter->buffer); } } } @@ -226,41 +226,41 @@ unsigned char nx_field_iterator_octet(nx_field_iterator_t *iter) } -int nx_field_iterator_end(nx_field_iterator_t *iter) +int dx_field_iterator_end(dx_field_iterator_t *iter) { return iter->length == 0; } -int nx_field_iterator_equal(nx_field_iterator_t *iter, unsigned char *string) +int dx_field_iterator_equal(dx_field_iterator_t *iter, unsigned char *string) { - nx_field_iterator_reset(iter, iter->view); - while (!nx_field_iterator_end(iter) && *string) { - if (*string != nx_field_iterator_octet(iter)) + dx_field_iterator_reset(iter, iter->view); + while (!dx_field_iterator_end(iter) && *string) { + if (*string != dx_field_iterator_octet(iter)) return 0; string++; } - return (nx_field_iterator_end(iter) && (*string == 0)); + return (dx_field_iterator_end(iter) && (*string == 0)); } -unsigned char *nx_field_iterator_copy(nx_field_iterator_t *iter) +unsigned char *dx_field_iterator_copy(dx_field_iterator_t *iter) { int length = 0; int idx = 0; unsigned char *copy; - nx_field_iterator_reset(iter, iter->view); - while (!nx_field_iterator_end(iter)) { - nx_field_iterator_octet(iter); + dx_field_iterator_reset(iter, iter->view); + while (!dx_field_iterator_end(iter)) { + dx_field_iterator_octet(iter); length++; } - nx_field_iterator_reset(iter, iter->view); + dx_field_iterator_reset(iter, iter->view); copy = (unsigned char*) malloc(length + 1); - while (!nx_field_iterator_end(iter)) - copy[idx++] = nx_field_iterator_octet(iter); + while (!dx_field_iterator_end(iter)) + copy[idx++] = dx_field_iterator_octet(iter); copy[idx] = '\0'; return copy; diff --git a/qpid/extras/nexus/src/log.c b/qpid/extras/dispatch/src/log.c index ca1af86915..d4ec534915 100644 --- a/qpid/extras/nexus/src/log.c +++ b/qpid/extras/dispatch/src/log.c @@ -17,7 +17,7 @@ * under the License. */ -#include <qpid/nexus/log.h> +#include <qpid/dispatch/log.h> #include <stdarg.h> #include <stdio.h> #include <string.h> @@ -35,7 +35,7 @@ static char *cls_prefix(int cls) return ""; } -void nx_log(const char *module, int cls, const char *fmt, ...) +void dx_log(const char *module, int cls, const char *fmt, ...) { if (!(cls & mask)) return; @@ -49,7 +49,7 @@ void nx_log(const char *module, int cls, const char *fmt, ...) fprintf(stderr, "%s (%s): %s\n", module, cls_prefix(cls), line); } -void nx_log_set_mask(int _mask) +void dx_log_set_mask(int _mask) { mask = _mask; } diff --git a/qpid/extras/nexus/src/message.c b/qpid/extras/dispatch/src/message.c index 95e959e745..f66e79010c 100644 --- a/qpid/extras/nexus/src/message.c +++ b/qpid/extras/dispatch/src/message.c @@ -17,22 +17,22 @@ * under the License. */ -#include <qpid/nexus/ctools.h> -#include <qpid/nexus/threading.h> +#include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/threading.h> #include "message_private.h" #include <string.h> #include <stdio.h> -ALLOC_DEFINE_CONFIG(nx_message_t, sizeof(nx_message_pvt_t), 0, 0); -ALLOC_DEFINE(nx_message_content_t); +ALLOC_DEFINE_CONFIG(dx_message_t, sizeof(dx_message_pvt_t), 0, 0); +ALLOC_DEFINE(dx_message_content_t); -static void advance(unsigned char **cursor, nx_buffer_t **buffer, int consume) +static void advance(unsigned char **cursor, dx_buffer_t **buffer, int consume) { unsigned char *local_cursor = *cursor; - nx_buffer_t *local_buffer = *buffer; + dx_buffer_t *local_buffer = *buffer; - int remaining = nx_buffer_size(local_buffer) - (local_cursor - nx_buffer_base(local_buffer)); + int remaining = dx_buffer_size(local_buffer) - (local_cursor - dx_buffer_base(local_buffer)); while (consume > 0) { if (consume < remaining) { local_cursor += consume; @@ -44,8 +44,8 @@ static void advance(unsigned char **cursor, nx_buffer_t **buffer, int consume) local_cursor = 0; break; } - local_cursor = nx_buffer_base(local_buffer); - remaining = nx_buffer_size(local_buffer) - (local_cursor - nx_buffer_base(local_buffer)); + local_cursor = dx_buffer_base(local_buffer); + remaining = dx_buffer_size(local_buffer) - (local_cursor - dx_buffer_base(local_buffer)); } } @@ -54,7 +54,7 @@ static void advance(unsigned char **cursor, nx_buffer_t **buffer, int consume) } -static unsigned char next_octet(unsigned char **cursor, nx_buffer_t **buffer) +static unsigned char next_octet(unsigned char **cursor, dx_buffer_t **buffer) { unsigned char result = **cursor; advance(cursor, buffer, 1); @@ -62,7 +62,7 @@ static unsigned char next_octet(unsigned char **cursor, nx_buffer_t **buffer) } -static int traverse_field(unsigned char **cursor, nx_buffer_t **buffer, nx_field_location_t *field) +static int traverse_field(unsigned char **cursor, dx_buffer_t **buffer, dx_field_location_t *field) { unsigned char tag = next_octet(cursor, buffer); if (!(*cursor)) return 0; @@ -96,7 +96,7 @@ static int traverse_field(unsigned char **cursor, nx_buffer_t **buffer, nx_field if (field) { field->buffer = *buffer; - field->offset = *cursor - nx_buffer_base(*buffer); + field->offset = *cursor - dx_buffer_base(*buffer); field->length = consume; field->parsed = 1; } @@ -106,7 +106,7 @@ static int traverse_field(unsigned char **cursor, nx_buffer_t **buffer, nx_field } -static int start_list(unsigned char **cursor, nx_buffer_t **buffer) +static int start_list(unsigned char **cursor, dx_buffer_t **buffer) { unsigned char tag = next_octet(cursor, buffer); if (!(*cursor)) return 0; @@ -163,20 +163,20 @@ static int start_list(unsigned char **cursor, nx_buffer_t **buffer) // Return 1 if the pattern matches and we've advanced the cursor/buffer // Return 1 if the pattern does not match // -static int nx_check_and_advance(nx_buffer_t **buffer, +static int dx_check_and_advance(dx_buffer_t **buffer, unsigned char **cursor, unsigned char *pattern, int pattern_length, unsigned char *expected_tags, - nx_field_location_t *location) + dx_field_location_t *location) { - nx_buffer_t *test_buffer = *buffer; + dx_buffer_t *test_buffer = *buffer; unsigned char *test_cursor = *cursor; if (!test_cursor) return 1; // no match - unsigned char *end_of_buffer = nx_buffer_base(test_buffer) + nx_buffer_size(test_buffer); + unsigned char *end_of_buffer = dx_buffer_base(test_buffer) + dx_buffer_size(test_buffer); int idx = 0; while (idx < pattern_length && *test_cursor == pattern[idx]) { @@ -186,8 +186,8 @@ static int nx_check_and_advance(nx_buffer_t **buffer, test_buffer = test_buffer->next; if (test_buffer == 0) return 1; // Pattern didn't match - test_cursor = nx_buffer_base(test_buffer); - end_of_buffer = test_cursor + nx_buffer_size(test_buffer); + test_cursor = dx_buffer_base(test_buffer); + end_of_buffer = test_cursor + dx_buffer_size(test_buffer); } } @@ -210,7 +210,7 @@ static int nx_check_and_advance(nx_buffer_t **buffer, // location->parsed = 1; location->buffer = test_buffer; - location->offset = test_cursor - nx_buffer_base(test_buffer); + location->offset = test_cursor - dx_buffer_base(test_buffer); location->length = 0; // @@ -251,23 +251,23 @@ static int nx_check_and_advance(nx_buffer_t **buffer, } -static void nx_insert(nx_message_content_t *msg, const uint8_t *seq, size_t len) +static void dx_insert(dx_message_content_t *msg, const uint8_t *seq, size_t len) { - nx_buffer_t *buf = DEQ_TAIL(msg->buffers); + dx_buffer_t *buf = DEQ_TAIL(msg->buffers); while (len > 0) { - if (buf == 0 || nx_buffer_capacity(buf) == 0) { - buf = nx_allocate_buffer(); + if (buf == 0 || dx_buffer_capacity(buf) == 0) { + buf = dx_allocate_buffer(); if (buf == 0) return; DEQ_INSERT_TAIL(msg->buffers, buf); } - size_t to_copy = nx_buffer_capacity(buf); + size_t to_copy = dx_buffer_capacity(buf); if (to_copy > len) to_copy = len; - memcpy(nx_buffer_cursor(buf), seq, to_copy); - nx_buffer_insert(buf, to_copy); + memcpy(dx_buffer_cursor(buf), seq, to_copy); + dx_buffer_insert(buf, to_copy); len -= to_copy; seq += to_copy; msg->length += to_copy; @@ -275,24 +275,24 @@ static void nx_insert(nx_message_content_t *msg, const uint8_t *seq, size_t len) } -static void nx_insert_8(nx_message_content_t *msg, uint8_t value) +static void dx_insert_8(dx_message_content_t *msg, uint8_t value) { - nx_insert(msg, &value, 1); + dx_insert(msg, &value, 1); } -static void nx_insert_32(nx_message_content_t *msg, uint32_t value) +static void dx_insert_32(dx_message_content_t *msg, uint32_t value) { uint8_t buf[4]; buf[0] = (uint8_t) ((value & 0xFF000000) >> 24); buf[1] = (uint8_t) ((value & 0x00FF0000) >> 16); buf[2] = (uint8_t) ((value & 0x0000FF00) >> 8); buf[3] = (uint8_t) (value & 0x000000FF); - nx_insert(msg, buf, 4); + dx_insert(msg, buf, 4); } -static void nx_insert_64(nx_message_content_t *msg, uint64_t value) +static void dx_insert_64(dx_message_content_t *msg, uint64_t value) { uint8_t buf[8]; buf[0] = (uint8_t) ((value & 0xFF00000000000000L) >> 56); @@ -303,18 +303,18 @@ static void nx_insert_64(nx_message_content_t *msg, uint64_t value) buf[5] = (uint8_t) ((value & 0x0000000000FF0000L) >> 16); buf[6] = (uint8_t) ((value & 0x000000000000FF00L) >> 8); buf[7] = (uint8_t) (value & 0x00000000000000FFL); - nx_insert(msg, buf, 8); + dx_insert(msg, buf, 8); } -static void nx_overwrite(nx_buffer_t **buf, size_t *cursor, uint8_t value) +static void dx_overwrite(dx_buffer_t **buf, size_t *cursor, uint8_t value) { while (*buf) { - if (*cursor >= nx_buffer_size(*buf)) { + if (*cursor >= dx_buffer_size(*buf)) { *buf = (*buf)->next; *cursor = 0; } else { - nx_buffer_base(*buf)[*cursor] = value; + dx_buffer_base(*buf)[*cursor] = value; (*cursor)++; return; } @@ -322,69 +322,69 @@ static void nx_overwrite(nx_buffer_t **buf, size_t *cursor, uint8_t value) } -static void nx_overwrite_32(nx_field_location_t *field, uint32_t value) +static void dx_overwrite_32(dx_field_location_t *field, uint32_t value) { - nx_buffer_t *buf = field->buffer; + dx_buffer_t *buf = field->buffer; size_t cursor = field->offset; - nx_overwrite(&buf, &cursor, (uint8_t) ((value & 0xFF000000) >> 24)); - nx_overwrite(&buf, &cursor, (uint8_t) ((value & 0x00FF0000) >> 24)); - nx_overwrite(&buf, &cursor, (uint8_t) ((value & 0x0000FF00) >> 24)); - nx_overwrite(&buf, &cursor, (uint8_t) (value & 0x000000FF)); + dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0xFF000000) >> 24)); + dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0x00FF0000) >> 24)); + dx_overwrite(&buf, &cursor, (uint8_t) ((value & 0x0000FF00) >> 24)); + dx_overwrite(&buf, &cursor, (uint8_t) (value & 0x000000FF)); } -static void nx_start_list_performative(nx_message_content_t *msg, uint8_t code) +static void dx_start_list_performative(dx_message_content_t *msg, uint8_t code) { // // Insert the short-form performative tag // - nx_insert(msg, (const uint8_t*) "\x00\x53", 2); - nx_insert_8(msg, code); + dx_insert(msg, (const uint8_t*) "\x00\x53", 2); + dx_insert_8(msg, code); // // Open the list with a list32 tag // - nx_insert_8(msg, 0xd0); + dx_insert_8(msg, 0xd0); // // Mark the current location to later overwrite the length // msg->compose_length.buffer = DEQ_TAIL(msg->buffers); - msg->compose_length.offset = nx_buffer_size(msg->compose_length.buffer); + msg->compose_length.offset = dx_buffer_size(msg->compose_length.buffer); msg->compose_length.length = 4; msg->compose_length.parsed = 1; - nx_insert(msg, (const uint8_t*) "\x00\x00\x00\x00", 4); + dx_insert(msg, (const uint8_t*) "\x00\x00\x00\x00", 4); // // Mark the current location to later overwrite the count // msg->compose_count.buffer = DEQ_TAIL(msg->buffers); - msg->compose_count.offset = nx_buffer_size(msg->compose_count.buffer); + msg->compose_count.offset = dx_buffer_size(msg->compose_count.buffer); msg->compose_count.length = 4; msg->compose_count.parsed = 1; - nx_insert(msg, (const uint8_t*) "\x00\x00\x00\x00", 4); + dx_insert(msg, (const uint8_t*) "\x00\x00\x00\x00", 4); msg->length = 4; // Include the length of the count field msg->count = 0; } -static void nx_end_list(nx_message_content_t *msg) +static void dx_end_list(dx_message_content_t *msg) { - nx_overwrite_32(&msg->compose_length, msg->length); - nx_overwrite_32(&msg->compose_count, msg->count); + dx_overwrite_32(&msg->compose_length, msg->length); + dx_overwrite_32(&msg->compose_count, msg->count); } -static nx_field_location_t *nx_message_field_location(nx_message_t *msg, nx_message_field_t field) +static dx_field_location_t *dx_message_field_location(dx_message_t *msg, dx_message_field_t field) { - nx_message_content_t *content = MSG_CONTENT(msg); + dx_message_content_t *content = MSG_CONTENT(msg); switch (field) { - case NX_FIELD_TO: + case DX_FIELD_TO: while (1) { if (content->field_to.parsed) return &content->field_to; @@ -392,8 +392,8 @@ static nx_field_location_t *nx_message_field_location(nx_message_t *msg, nx_mess if (content->section_message_properties.parsed == 0) break; - nx_buffer_t *buffer = content->section_message_properties.buffer; - unsigned char *cursor = nx_buffer_base(buffer) + content->section_message_properties.offset; + dx_buffer_t *buffer = content->section_message_properties.buffer; + unsigned char *cursor = dx_buffer_base(buffer) + content->section_message_properties.offset; int count = start_list(&cursor, &buffer); int result; @@ -410,7 +410,7 @@ static nx_field_location_t *nx_message_field_location(nx_message_t *msg, nx_mess } break; - case NX_FIELD_BODY: + case DX_FIELD_BODY: while (1) { if (content->body.parsed) return &content->body; @@ -418,8 +418,8 @@ static nx_field_location_t *nx_message_field_location(nx_message_t *msg, nx_mess if (content->section_body.parsed == 0) break; - nx_buffer_t *buffer = content->section_body.buffer; - unsigned char *cursor = nx_buffer_base(buffer) + content->section_body.offset; + dx_buffer_t *buffer = content->section_body.buffer; + unsigned char *cursor = dx_buffer_base(buffer) + content->section_body.offset; int result; result = traverse_field(&cursor, &buffer, &content->body); @@ -435,61 +435,61 @@ static nx_field_location_t *nx_message_field_location(nx_message_t *msg, nx_mess } -nx_message_t *nx_allocate_message() +dx_message_t *dx_allocate_message() { - nx_message_pvt_t *msg = (nx_message_pvt_t*) new_nx_message_t(); + dx_message_pvt_t *msg = (dx_message_pvt_t*) new_dx_message_t(); if (!msg) return 0; DEQ_ITEM_INIT(msg); - msg->content = new_nx_message_content_t(); + msg->content = new_dx_message_content_t(); msg->out_delivery = 0; if (msg->content == 0) { - free_nx_message_t((nx_message_t*) msg); + free_dx_message_t((dx_message_t*) msg); return 0; } - memset(msg->content, 0, sizeof(nx_message_content_t)); + memset(msg->content, 0, sizeof(dx_message_content_t)); msg->content->lock = sys_mutex(); msg->content->ref_count = 1; - return (nx_message_t*) msg; + return (dx_message_t*) msg; } -void nx_free_message(nx_message_t *in_msg) +void dx_free_message(dx_message_t *in_msg) { uint32_t rc; - nx_message_pvt_t *msg = (nx_message_pvt_t*) in_msg; - nx_message_content_t *content = msg->content; + dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg; + dx_message_content_t *content = msg->content; sys_mutex_lock(content->lock); rc = --content->ref_count; sys_mutex_unlock(content->lock); if (rc == 0) { - nx_buffer_t *buf = DEQ_HEAD(content->buffers); + dx_buffer_t *buf = DEQ_HEAD(content->buffers); while (buf) { DEQ_REMOVE_HEAD(content->buffers); - nx_free_buffer(buf); + dx_free_buffer(buf); buf = DEQ_HEAD(content->buffers); } sys_mutex_free(content->lock); - free_nx_message_content_t(content); + free_dx_message_content_t(content); } - free_nx_message_t((nx_message_t*) msg); + free_dx_message_t((dx_message_t*) msg); } -nx_message_t *nx_message_copy(nx_message_t *in_msg) +dx_message_t *dx_message_copy(dx_message_t *in_msg) { - nx_message_pvt_t *msg = (nx_message_pvt_t*) in_msg; - nx_message_content_t *content = msg->content; - nx_message_pvt_t *copy = (nx_message_pvt_t*) new_nx_message_t(); + dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg; + dx_message_content_t *content = msg->content; + dx_message_pvt_t *copy = (dx_message_pvt_t*) new_dx_message_t(); if (!copy) return 0; @@ -502,42 +502,42 @@ nx_message_t *nx_message_copy(nx_message_t *in_msg) content->ref_count++; sys_mutex_unlock(content->lock); - return (nx_message_t*) copy; + return (dx_message_t*) copy; } -void nx_message_set_out_delivery(nx_message_t *msg, pn_delivery_t *delivery) +void dx_message_set_out_delivery(dx_message_t *msg, pn_delivery_t *delivery) { - ((nx_message_pvt_t*) msg)->out_delivery = delivery; + ((dx_message_pvt_t*) msg)->out_delivery = delivery; } -pn_delivery_t *nx_message_out_delivery(nx_message_t *msg) +pn_delivery_t *dx_message_out_delivery(dx_message_t *msg) { - return ((nx_message_pvt_t*) msg)->out_delivery; + return ((dx_message_pvt_t*) msg)->out_delivery; } -void nx_message_set_in_delivery(nx_message_t *msg, pn_delivery_t *delivery) +void dx_message_set_in_delivery(dx_message_t *msg, pn_delivery_t *delivery) { - nx_message_content_t *content = MSG_CONTENT(msg); + dx_message_content_t *content = MSG_CONTENT(msg); content->in_delivery = delivery; } -pn_delivery_t *nx_message_in_delivery(nx_message_t *msg) +pn_delivery_t *dx_message_in_delivery(dx_message_t *msg) { - nx_message_content_t *content = MSG_CONTENT(msg); + dx_message_content_t *content = MSG_CONTENT(msg); return content->in_delivery; } -nx_message_t *nx_message_receive(pn_delivery_t *delivery) +dx_message_t *dx_message_receive(pn_delivery_t *delivery) { pn_link_t *link = pn_delivery_link(delivery); - nx_message_pvt_t *msg = (nx_message_pvt_t*) pn_delivery_get_context(delivery); + dx_message_pvt_t *msg = (dx_message_pvt_t*) pn_delivery_get_context(delivery); ssize_t rc; - nx_buffer_t *buf; + dx_buffer_t *buf; // // If there is no message associated with the delivery, this is the first time @@ -545,7 +545,7 @@ nx_message_t *nx_message_receive(pn_delivery_t *delivery) // link it and the delivery together. // if (!msg) { - msg = (nx_message_pvt_t*) nx_allocate_message(); + msg = (dx_message_pvt_t*) dx_allocate_message(); pn_delivery_set_context(delivery, (void*) msg); // @@ -564,7 +564,7 @@ nx_message_t *nx_message_receive(pn_delivery_t *delivery) // buf = DEQ_TAIL(msg->content->buffers); if (!buf) { - buf = nx_allocate_buffer(); + buf = dx_allocate_buffer(); DEQ_INSERT_TAIL(msg->content->buffers, buf); } @@ -572,7 +572,7 @@ nx_message_t *nx_message_receive(pn_delivery_t *delivery) // // Try to receive enough data to fill the remaining space in the tail buffer. // - rc = pn_link_recv(link, (char*) nx_buffer_cursor(buf), nx_buffer_capacity(buf)); + rc = pn_link_recv(link, (char*) dx_buffer_cursor(buf), dx_buffer_capacity(buf)); // // If we receive PN_EOS, we have come to the end of the message. @@ -583,11 +583,11 @@ nx_message_t *nx_message_receive(pn_delivery_t *delivery) // will only happen if the size of the message content is an exact multiple // of the buffer size. // - if (nx_buffer_size(buf) == 0) { + if (dx_buffer_size(buf) == 0) { DEQ_REMOVE_TAIL(msg->content->buffers); - nx_free_buffer(buf); + dx_free_buffer(buf); } - return (nx_message_t*) msg; + return (dx_message_t*) msg; } if (rc > 0) { @@ -595,14 +595,14 @@ nx_message_t *nx_message_receive(pn_delivery_t *delivery) // We have received a positive number of bytes for the message. Advance // the cursor in the buffer. // - nx_buffer_insert(buf, rc); + dx_buffer_insert(buf, rc); // // If the buffer is full, allocate a new empty buffer and append it to the // tail of the message's list. // - if (nx_buffer_capacity(buf) == 0) { - buf = nx_allocate_buffer(); + if (dx_buffer_capacity(buf) == 0) { + buf = dx_allocate_buffer(); DEQ_INSERT_TAIL(msg->content->buffers, buf); } } else @@ -618,20 +618,20 @@ nx_message_t *nx_message_receive(pn_delivery_t *delivery) } -void nx_message_send(nx_message_t *in_msg, pn_link_t *link) +void dx_message_send(dx_message_t *in_msg, pn_link_t *link) { - nx_message_pvt_t *msg = (nx_message_pvt_t*) in_msg; - nx_buffer_t *buf = DEQ_HEAD(msg->content->buffers); + dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg; + dx_buffer_t *buf = DEQ_HEAD(msg->content->buffers); // TODO - Handle cases where annotations have been added or modified while (buf) { - pn_link_send(link, (char*) nx_buffer_base(buf), nx_buffer_size(buf)); + pn_link_send(link, (char*) dx_buffer_base(buf), dx_buffer_size(buf)); buf = DEQ_NEXT(buf); } } -int nx_message_check(nx_message_t *in_msg, nx_message_depth_t depth) +int dx_message_check(dx_message_t *in_msg, dx_message_depth_t depth) { #define LONG 10 @@ -656,114 +656,114 @@ int nx_message_check(nx_message_t *in_msg, nx_message_depth_t depth) #define TAGS_MAP (unsigned char*) "\xc1\xd1" #define TAGS_BINARY (unsigned char*) "\xa0\xb0" - nx_message_pvt_t *msg = (nx_message_pvt_t*) in_msg; - nx_message_content_t *content = msg->content; - nx_buffer_t *buffer = DEQ_HEAD(content->buffers); + dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg; + dx_message_content_t *content = msg->content; + dx_buffer_t *buffer = DEQ_HEAD(content->buffers); unsigned char *cursor; if (!buffer) return 0; // Invalid - No data in the message - if (depth == NX_DEPTH_NONE) + if (depth == DX_DEPTH_NONE) return 1; - cursor = nx_buffer_base(buffer); + cursor = dx_buffer_base(buffer); // // MESSAGE HEADER // - if (0 == nx_check_and_advance(&buffer, &cursor, MSG_HDR_LONG, LONG, TAGS_LIST, &content->section_message_header)) + if (0 == dx_check_and_advance(&buffer, &cursor, MSG_HDR_LONG, LONG, TAGS_LIST, &content->section_message_header)) return 0; - if (0 == nx_check_and_advance(&buffer, &cursor, MSG_HDR_SHORT, SHORT, TAGS_LIST, &content->section_message_header)) + if (0 == dx_check_and_advance(&buffer, &cursor, MSG_HDR_SHORT, SHORT, TAGS_LIST, &content->section_message_header)) return 0; - if (depth == NX_DEPTH_HEADER) + if (depth == DX_DEPTH_HEADER) return 1; // // DELIVERY ANNOTATION // - if (0 == nx_check_and_advance(&buffer, &cursor, DELIVERY_ANNOTATION_LONG, LONG, TAGS_MAP, &content->section_delivery_annotation)) + if (0 == dx_check_and_advance(&buffer, &cursor, DELIVERY_ANNOTATION_LONG, LONG, TAGS_MAP, &content->section_delivery_annotation)) return 0; - if (0 == nx_check_and_advance(&buffer, &cursor, DELIVERY_ANNOTATION_SHORT, SHORT, TAGS_MAP, &content->section_delivery_annotation)) + if (0 == dx_check_and_advance(&buffer, &cursor, DELIVERY_ANNOTATION_SHORT, SHORT, TAGS_MAP, &content->section_delivery_annotation)) return 0; - if (depth == NX_DEPTH_DELIVERY_ANNOTATIONS) + if (depth == DX_DEPTH_DELIVERY_ANNOTATIONS) return 1; // // MESSAGE ANNOTATION // - if (0 == nx_check_and_advance(&buffer, &cursor, MESSAGE_ANNOTATION_LONG, LONG, TAGS_MAP, &content->section_message_annotation)) + if (0 == dx_check_and_advance(&buffer, &cursor, MESSAGE_ANNOTATION_LONG, LONG, TAGS_MAP, &content->section_message_annotation)) return 0; - if (0 == nx_check_and_advance(&buffer, &cursor, MESSAGE_ANNOTATION_SHORT, SHORT, TAGS_MAP, &content->section_message_annotation)) + if (0 == dx_check_and_advance(&buffer, &cursor, MESSAGE_ANNOTATION_SHORT, SHORT, TAGS_MAP, &content->section_message_annotation)) return 0; - if (depth == NX_DEPTH_MESSAGE_ANNOTATIONS) + if (depth == DX_DEPTH_MESSAGE_ANNOTATIONS) return 1; // // PROPERTIES // - if (0 == nx_check_and_advance(&buffer, &cursor, PROPERTIES_LONG, LONG, TAGS_LIST, &content->section_message_properties)) + if (0 == dx_check_and_advance(&buffer, &cursor, PROPERTIES_LONG, LONG, TAGS_LIST, &content->section_message_properties)) return 0; - if (0 == nx_check_and_advance(&buffer, &cursor, PROPERTIES_SHORT, SHORT, TAGS_LIST, &content->section_message_properties)) + if (0 == dx_check_and_advance(&buffer, &cursor, PROPERTIES_SHORT, SHORT, TAGS_LIST, &content->section_message_properties)) return 0; - if (depth == NX_DEPTH_PROPERTIES) + if (depth == DX_DEPTH_PROPERTIES) return 1; // // APPLICATION PROPERTIES // - if (0 == nx_check_and_advance(&buffer, &cursor, APPLICATION_PROPERTIES_LONG, LONG, TAGS_MAP, &content->section_application_properties)) + if (0 == dx_check_and_advance(&buffer, &cursor, APPLICATION_PROPERTIES_LONG, LONG, TAGS_MAP, &content->section_application_properties)) return 0; - if (0 == nx_check_and_advance(&buffer, &cursor, APPLICATION_PROPERTIES_SHORT, SHORT, TAGS_MAP, &content->section_application_properties)) + if (0 == dx_check_and_advance(&buffer, &cursor, APPLICATION_PROPERTIES_SHORT, SHORT, TAGS_MAP, &content->section_application_properties)) return 0; - if (depth == NX_DEPTH_APPLICATION_PROPERTIES) + if (depth == DX_DEPTH_APPLICATION_PROPERTIES) return 1; // // BODY (Note that this function expects a single data section or a single AMQP sequence) // - if (0 == nx_check_and_advance(&buffer, &cursor, BODY_DATA_LONG, LONG, TAGS_BINARY, &content->section_body)) + if (0 == dx_check_and_advance(&buffer, &cursor, BODY_DATA_LONG, LONG, TAGS_BINARY, &content->section_body)) return 0; - if (0 == nx_check_and_advance(&buffer, &cursor, BODY_DATA_SHORT, SHORT, TAGS_BINARY, &content->section_body)) + if (0 == dx_check_and_advance(&buffer, &cursor, BODY_DATA_SHORT, SHORT, TAGS_BINARY, &content->section_body)) return 0; - if (0 == nx_check_and_advance(&buffer, &cursor, BODY_SEQUENCE_LONG, LONG, TAGS_LIST, &content->section_body)) + if (0 == dx_check_and_advance(&buffer, &cursor, BODY_SEQUENCE_LONG, LONG, TAGS_LIST, &content->section_body)) return 0; - if (0 == nx_check_and_advance(&buffer, &cursor, BODY_SEQUENCE_SHORT, SHORT, TAGS_LIST, &content->section_body)) + if (0 == dx_check_and_advance(&buffer, &cursor, BODY_SEQUENCE_SHORT, SHORT, TAGS_LIST, &content->section_body)) return 0; - if (depth == NX_DEPTH_BODY) + if (depth == DX_DEPTH_BODY) return 1; // // FOOTER // - if (0 == nx_check_and_advance(&buffer, &cursor, FOOTER_LONG, LONG, TAGS_MAP, &content->section_footer)) + if (0 == dx_check_and_advance(&buffer, &cursor, FOOTER_LONG, LONG, TAGS_MAP, &content->section_footer)) return 0; - if (0 == nx_check_and_advance(&buffer, &cursor, FOOTER_SHORT, SHORT, TAGS_MAP, &content->section_footer)) + if (0 == dx_check_and_advance(&buffer, &cursor, FOOTER_SHORT, SHORT, TAGS_MAP, &content->section_footer)) return 0; return 1; } -nx_field_iterator_t *nx_message_field_iterator(nx_message_t *msg, nx_message_field_t field) +dx_field_iterator_t *dx_message_field_iterator(dx_message_t *msg, dx_message_field_t field) { - nx_field_location_t *loc = nx_message_field_location(msg, field); + dx_field_location_t *loc = dx_message_field_location(msg, field); if (!loc) return 0; - return nx_field_iterator_buffer(loc->buffer, loc->offset, loc->length, ITER_VIEW_ALL); + return dx_field_iterator_buffer(loc->buffer, loc->offset, loc->length, ITER_VIEW_ALL); } -nx_iovec_t *nx_message_field_iovec(nx_message_t *msg, nx_message_field_t field) +dx_iovec_t *dx_message_field_iovec(dx_message_t *msg, dx_message_field_t field) { - nx_field_location_t *loc = nx_message_field_location(msg, field); + dx_field_location_t *loc = dx_message_field_location(msg, field); if (!loc) return 0; @@ -771,8 +771,8 @@ nx_iovec_t *nx_message_field_iovec(nx_message_t *msg, nx_message_field_t field) // Count the number of buffers this field straddles // int bufcnt = 1; - nx_buffer_t *buf = loc->buffer; - size_t bufsize = nx_buffer_size(buf) - loc->offset; + dx_buffer_t *buf = loc->buffer; + size_t bufsize = dx_buffer_size(buf) - loc->offset; ssize_t remaining = loc->length - bufsize; while (remaining > 0) { @@ -780,13 +780,13 @@ nx_iovec_t *nx_message_field_iovec(nx_message_t *msg, nx_message_field_t field) buf = buf->next; if (!buf) return 0; - remaining -= nx_buffer_size(buf); + remaining -= dx_buffer_size(buf); } // // Allocate an iovec object big enough to hold the number of buffers // - nx_iovec_t *iov = nx_iovec(bufcnt); + dx_iovec_t *iov = dx_iovec(bufcnt); if (!iov) return 0; @@ -795,19 +795,19 @@ nx_iovec_t *nx_message_field_iovec(nx_message_t *msg, nx_message_field_t field) // bufcnt = 0; buf = loc->buffer; - bufsize = nx_buffer_size(buf) - loc->offset; - void *base = nx_buffer_base(buf) + loc->offset; + bufsize = dx_buffer_size(buf) - loc->offset; + void *base = dx_buffer_base(buf) + loc->offset; remaining = loc->length; while (remaining > 0) { - nx_iovec_array(iov)[bufcnt].iov_base = base; - nx_iovec_array(iov)[bufcnt].iov_len = bufsize; + dx_iovec_array(iov)[bufcnt].iov_base = base; + dx_iovec_array(iov)[bufcnt].iov_len = bufsize; bufcnt++; remaining -= bufsize; if (remaining > 0) { buf = buf->next; - base = nx_buffer_base(buf); - bufsize = nx_buffer_size(buf); + base = dx_buffer_base(buf); + bufsize = dx_buffer_size(buf); if (bufsize > remaining) bufsize = remaining; } @@ -817,121 +817,121 @@ nx_iovec_t *nx_message_field_iovec(nx_message_t *msg, nx_message_field_t field) } -void nx_message_compose_1(nx_message_t *msg, const char *to, nx_buffer_list_t *buffers) +void dx_message_compose_1(dx_message_t *msg, const char *to, dx_buffer_list_t *buffers) { - nx_message_begin_header(msg); - nx_message_insert_boolean(msg, 0); // durable - //nx_message_insert_null(msg); // priority - //nx_message_insert_null(msg); // ttl - //nx_message_insert_boolean(msg, 0); // first-acquirer - //nx_message_insert_uint(msg, 0); // delivery-count - nx_message_end_header(msg); - - nx_message_begin_message_properties(msg); - nx_message_insert_null(msg); // message-id - nx_message_insert_null(msg); // user-id - nx_message_insert_string(msg, to); // to - //nx_message_insert_null(msg); // subject - //nx_message_insert_null(msg); // reply-to - //nx_message_insert_null(msg); // correlation-id - //nx_message_insert_null(msg); // content-type - //nx_message_insert_null(msg); // content-encoding - //nx_message_insert_timestamp(msg, 0); // absolute-expiry-time - //nx_message_insert_timestamp(msg, 0); // creation-time - //nx_message_insert_null(msg); // group-id - //nx_message_insert_uint(msg, 0); // group-sequence - //nx_message_insert_null(msg); // reply-to-group-id - nx_message_end_message_properties(msg); + dx_message_begin_header(msg); + dx_message_insert_boolean(msg, 0); // durable + //dx_message_insert_null(msg); // priority + //dx_message_insert_null(msg); // ttl + //dx_message_insert_boolean(msg, 0); // first-acquirer + //dx_message_insert_uint(msg, 0); // delivery-count + dx_message_end_header(msg); + + dx_message_begin_message_properties(msg); + dx_message_insert_null(msg); // message-id + dx_message_insert_null(msg); // user-id + dx_message_insert_string(msg, to); // to + //dx_message_insert_null(msg); // subject + //dx_message_insert_null(msg); // reply-to + //dx_message_insert_null(msg); // correlation-id + //dx_message_insert_null(msg); // content-type + //dx_message_insert_null(msg); // content-encoding + //dx_message_insert_timestamp(msg, 0); // absolute-expiry-time + //dx_message_insert_timestamp(msg, 0); // creation-time + //dx_message_insert_null(msg); // group-id + //dx_message_insert_uint(msg, 0); // group-sequence + //dx_message_insert_null(msg); // reply-to-group-id + dx_message_end_message_properties(msg); if (buffers) - nx_message_append_body_data(msg, buffers); + dx_message_append_body_data(msg, buffers); } -void nx_message_begin_header(nx_message_t *msg) +void dx_message_begin_header(dx_message_t *msg) { - nx_start_list_performative(MSG_CONTENT(msg), 0x70); + dx_start_list_performative(MSG_CONTENT(msg), 0x70); } -void nx_message_end_header(nx_message_t *msg) +void dx_message_end_header(dx_message_t *msg) { - nx_end_list(MSG_CONTENT(msg)); + dx_end_list(MSG_CONTENT(msg)); } -void nx_message_begin_delivery_annotations(nx_message_t *msg) +void dx_message_begin_delivery_annotations(dx_message_t *msg) { assert(0); // Not Implemented } -void nx_message_end_delivery_annotations(nx_message_t *msg) +void dx_message_end_delivery_annotations(dx_message_t *msg) { assert(0); // Not Implemented } -void nx_message_begin_message_annotations(nx_message_t *msg) +void dx_message_begin_message_annotations(dx_message_t *msg) { assert(0); // Not Implemented } -void nx_message_end_message_annotations(nx_message_t *msg) +void dx_message_end_message_annotations(dx_message_t *msg) { assert(0); // Not Implemented } -void nx_message_begin_message_properties(nx_message_t *msg) +void dx_message_begin_message_properties(dx_message_t *msg) { - nx_start_list_performative(MSG_CONTENT(msg), 0x73); + dx_start_list_performative(MSG_CONTENT(msg), 0x73); } -void nx_message_end_message_properties(nx_message_t *msg) +void dx_message_end_message_properties(dx_message_t *msg) { - nx_end_list(MSG_CONTENT(msg)); + dx_end_list(MSG_CONTENT(msg)); } -void nx_message_begin_application_properties(nx_message_t *msg) +void dx_message_begin_application_properties(dx_message_t *msg) { assert(0); // Not Implemented } -void nx_message_end_application_properties(nx_message_t *msg) +void dx_message_end_application_properties(dx_message_t *msg) { assert(0); // Not Implemented } -void nx_message_append_body_data(nx_message_t *msg, nx_buffer_list_t *buffers) +void dx_message_append_body_data(dx_message_t *msg, dx_buffer_list_t *buffers) { - nx_message_content_t *content = MSG_CONTENT(msg); - nx_buffer_t *buf = DEQ_HEAD(*buffers); + dx_message_content_t *content = MSG_CONTENT(msg); + dx_buffer_t *buf = DEQ_HEAD(*buffers); uint32_t len = 0; // // Calculate the size of the body to be appended. // while (buf) { - len += nx_buffer_size(buf); + len += dx_buffer_size(buf); buf = DEQ_NEXT(buf); } // // Insert a DATA section performative header. // - nx_insert(content, (const uint8_t*) "\x00\x53\x75", 3); + dx_insert(content, (const uint8_t*) "\x00\x53\x75", 3); if (len < 256) { - nx_insert_8(content, 0xa0); // vbin8 - nx_insert_8(content, (uint8_t) len); + dx_insert_8(content, 0xa0); // vbin8 + dx_insert_8(content, (uint8_t) len); } else { - nx_insert_8(content, 0xb0); // vbin32 - nx_insert_32(content, len); + dx_insert_8(content, 0xb0); // vbin32 + dx_insert_32(content, len); } // @@ -946,174 +946,174 @@ void nx_message_append_body_data(nx_message_t *msg, nx_buffer_list_t *buffers) } -void nx_message_begin_body_sequence(nx_message_t *msg) +void dx_message_begin_body_sequence(dx_message_t *msg) { } -void nx_message_end_body_sequence(nx_message_t *msg) +void dx_message_end_body_sequence(dx_message_t *msg) { } -void nx_message_begin_footer(nx_message_t *msg) +void dx_message_begin_footer(dx_message_t *msg) { assert(0); // Not Implemented } -void nx_message_end_footer(nx_message_t *msg) +void dx_message_end_footer(dx_message_t *msg) { assert(0); // Not Implemented } -void nx_message_insert_null(nx_message_t *msg) +void dx_message_insert_null(dx_message_t *msg) { - nx_message_content_t *content = MSG_CONTENT(msg); - nx_insert_8(content, 0x40); + dx_message_content_t *content = MSG_CONTENT(msg); + dx_insert_8(content, 0x40); content->count++; } -void nx_message_insert_boolean(nx_message_t *msg, int value) +void dx_message_insert_boolean(dx_message_t *msg, int value) { - nx_message_content_t *content = MSG_CONTENT(msg); + dx_message_content_t *content = MSG_CONTENT(msg); if (value) - nx_insert(content, (const uint8_t*) "\x56\x01", 2); + dx_insert(content, (const uint8_t*) "\x56\x01", 2); else - nx_insert(content, (const uint8_t*) "\x56\x00", 2); + dx_insert(content, (const uint8_t*) "\x56\x00", 2); content->count++; } -void nx_message_insert_ubyte(nx_message_t *msg, uint8_t value) +void dx_message_insert_ubyte(dx_message_t *msg, uint8_t value) { - nx_message_content_t *content = MSG_CONTENT(msg); - nx_insert_8(content, 0x50); - nx_insert_8(content, value); + dx_message_content_t *content = MSG_CONTENT(msg); + dx_insert_8(content, 0x50); + dx_insert_8(content, value); content->count++; } -void nx_message_insert_uint(nx_message_t *msg, uint32_t value) +void dx_message_insert_uint(dx_message_t *msg, uint32_t value) { - nx_message_content_t *content = MSG_CONTENT(msg); + dx_message_content_t *content = MSG_CONTENT(msg); if (value == 0) { - nx_insert_8(content, 0x43); // uint0 + dx_insert_8(content, 0x43); // uint0 } else if (value < 256) { - nx_insert_8(content, 0x52); // smalluint - nx_insert_8(content, (uint8_t) value); + dx_insert_8(content, 0x52); // smalluint + dx_insert_8(content, (uint8_t) value); } else { - nx_insert_8(content, 0x70); // uint - nx_insert_32(content, value); + dx_insert_8(content, 0x70); // uint + dx_insert_32(content, value); } content->count++; } -void nx_message_insert_ulong(nx_message_t *msg, uint64_t value) +void dx_message_insert_ulong(dx_message_t *msg, uint64_t value) { - nx_message_content_t *content = MSG_CONTENT(msg); + dx_message_content_t *content = MSG_CONTENT(msg); if (value == 0) { - nx_insert_8(content, 0x44); // ulong0 + dx_insert_8(content, 0x44); // ulong0 } else if (value < 256) { - nx_insert_8(content, 0x53); // smallulong - nx_insert_8(content, (uint8_t) value); + dx_insert_8(content, 0x53); // smallulong + dx_insert_8(content, (uint8_t) value); } else { - nx_insert_8(content, 0x80); // ulong - nx_insert_64(content, value); + dx_insert_8(content, 0x80); // ulong + dx_insert_64(content, value); } content->count++; } -void nx_message_insert_binary(nx_message_t *msg, const uint8_t *start, size_t len) +void dx_message_insert_binary(dx_message_t *msg, const uint8_t *start, size_t len) { - nx_message_content_t *content = MSG_CONTENT(msg); + dx_message_content_t *content = MSG_CONTENT(msg); if (len < 256) { - nx_insert_8(content, 0xa0); // vbin8 - nx_insert_8(content, (uint8_t) len); + dx_insert_8(content, 0xa0); // vbin8 + dx_insert_8(content, (uint8_t) len); } else { - nx_insert_8(content, 0xb0); // vbin32 - nx_insert_32(content, len); + dx_insert_8(content, 0xb0); // vbin32 + dx_insert_32(content, len); } - nx_insert(content, start, len); + dx_insert(content, start, len); content->count++; } -void nx_message_insert_string(nx_message_t *msg, const char *start) +void dx_message_insert_string(dx_message_t *msg, const char *start) { - nx_message_content_t *content = MSG_CONTENT(msg); + dx_message_content_t *content = MSG_CONTENT(msg); uint32_t len = strlen(start); if (len < 256) { - nx_insert_8(content, 0xa1); // str8-utf8 - nx_insert_8(content, (uint8_t) len); - nx_insert(content, (const uint8_t*) start, len); + dx_insert_8(content, 0xa1); // str8-utf8 + dx_insert_8(content, (uint8_t) len); + dx_insert(content, (const uint8_t*) start, len); } else { - nx_insert_8(content, 0xb1); // str32-utf8 - nx_insert_32(content, len); - nx_insert(content, (const uint8_t*) start, len); + dx_insert_8(content, 0xb1); // str32-utf8 + dx_insert_32(content, len); + dx_insert(content, (const uint8_t*) start, len); } content->count++; } -void nx_message_insert_uuid(nx_message_t *msg, const uint8_t *value) +void dx_message_insert_uuid(dx_message_t *msg, const uint8_t *value) { - nx_message_content_t *content = MSG_CONTENT(msg); - nx_insert_8(content, 0x98); // uuid - nx_insert(content, value, 16); + dx_message_content_t *content = MSG_CONTENT(msg); + dx_insert_8(content, 0x98); // uuid + dx_insert(content, value, 16); content->count++; } -void nx_message_insert_symbol(nx_message_t *msg, const char *start, size_t len) +void dx_message_insert_symbol(dx_message_t *msg, const char *start, size_t len) { - nx_message_content_t *content = MSG_CONTENT(msg); + dx_message_content_t *content = MSG_CONTENT(msg); if (len < 256) { - nx_insert_8(content, 0xa3); // sym8 - nx_insert_8(content, (uint8_t) len); - nx_insert(content, (const uint8_t*) start, len); + dx_insert_8(content, 0xa3); // sym8 + dx_insert_8(content, (uint8_t) len); + dx_insert(content, (const uint8_t*) start, len); } else { - nx_insert_8(content, 0xb3); // sym32 - nx_insert_32(content, len); - nx_insert(content, (const uint8_t*) start, len); + dx_insert_8(content, 0xb3); // sym32 + dx_insert_32(content, len); + dx_insert(content, (const uint8_t*) start, len); } content->count++; } -void nx_message_insert_timestamp(nx_message_t *msg, uint64_t value) +void dx_message_insert_timestamp(dx_message_t *msg, uint64_t value) { - nx_message_content_t *content = MSG_CONTENT(msg); - nx_insert_8(content, 0x83); // timestamp - nx_insert_64(content, value); + dx_message_content_t *content = MSG_CONTENT(msg); + dx_insert_8(content, 0x83); // timestamp + dx_insert_64(content, value); content->count++; } -void nx_message_begin_list(nx_message_t* msg) +void dx_message_begin_list(dx_message_t* msg) { assert(0); // Not Implemented } -void nx_message_end_list(nx_message_t* msg) +void dx_message_end_list(dx_message_t* msg) { assert(0); // Not Implemented } -void nx_message_begin_map(nx_message_t* msg) +void dx_message_begin_map(dx_message_t* msg) { assert(0); // Not Implemented } -void nx_message_end_map(nx_message_t* msg) +void dx_message_end_map(dx_message_t* msg) { assert(0); // Not Implemented } diff --git a/qpid/extras/nexus/src/message_private.h b/qpid/extras/dispatch/src/message_private.h index 0e20332357..5fb18078f5 100644 --- a/qpid/extras/nexus/src/message_private.h +++ b/qpid/extras/dispatch/src/message_private.h @@ -19,22 +19,22 @@ * under the License. */ -#include <qpid/nexus/message.h> -#include <qpid/nexus/alloc.h> -#include <qpid/nexus/threading.h> +#include <qpid/dispatch/message.h> +#include <qpid/dispatch/alloc.h> +#include <qpid/dispatch/threading.h> /** * Architecture of the message module: * * +--------------+ +----------------------+ * | | | | - * | nx_message_t |----------->| nx_message_content_t | + * | dx_message_t |----------->| dx_message_content_t | * | | +----->| | * +--------------+ | +----------------------+ * | | * +--------------+ | | +-------------+ +-------------+ +-------------+ - * | | | +--->| nx_buffer_t |-->| nx_buffer_t |-->| nx_buffer_t |--/ - * | nx_message_t |-----+ +-------------+ +-------------+ +-------------+ + * | | | +--->| dx_buffer_t |-->| dx_buffer_t |-->| dx_buffer_t |--/ + * | dx_message_t |-----+ +-------------+ +-------------+ +-------------+ * | | * +--------------+ * @@ -45,14 +45,14 @@ */ typedef struct { - nx_buffer_t *buffer; // Buffer that contains the first octet of the field, null if the field is not present + dx_buffer_t *buffer; // Buffer that contains the first octet of the field, null if the field is not present size_t offset; // Offset in the buffer to the first octet size_t length; // Length of the field or zero if unneeded int parsed; // non-zero iff the buffer chain has been parsed to find this field -} nx_field_location_t; +} dx_field_location_t; -// TODO - consider using pointers to nx_field_location_t below to save memory +// TODO - consider using pointers to dx_field_location_t below to save memory // TODO - we need a second buffer list for modified annotations and header // There are three message scenarios: // 1) Received message is held and forwarded unmodified - single buffer list @@ -62,33 +62,33 @@ typedef struct { typedef struct { sys_mutex_t *lock; uint32_t ref_count; // The number of qmessages referencing this - nx_buffer_list_t buffers; // The buffer chain containing the message + dx_buffer_list_t buffers; // The buffer chain containing the message pn_delivery_t *in_delivery; // The delivery on which the message arrived - nx_field_location_t section_message_header; // The message header list - nx_field_location_t section_delivery_annotation; // The delivery annotation map - nx_field_location_t section_message_annotation; // The message annotation map - nx_field_location_t section_message_properties; // The message properties list - nx_field_location_t section_application_properties; // The application properties list - nx_field_location_t section_body; // The message body: Data - nx_field_location_t section_footer; // The footer - nx_field_location_t field_user_id; // The string value of the user-id - nx_field_location_t field_to; // The string value of the to field - nx_field_location_t body; // The body of the message - nx_field_location_t compose_length; - nx_field_location_t compose_count; + dx_field_location_t section_message_header; // The message header list + dx_field_location_t section_delivery_annotation; // The delivery annotation map + dx_field_location_t section_message_annotation; // The message annotation map + dx_field_location_t section_message_properties; // The message properties list + dx_field_location_t section_application_properties; // The application properties list + dx_field_location_t section_body; // The message body: Data + dx_field_location_t section_footer; // The footer + dx_field_location_t field_user_id; // The string value of the user-id + dx_field_location_t field_to; // The string value of the to field + dx_field_location_t body; // The body of the message + dx_field_location_t compose_length; + dx_field_location_t compose_count; uint32_t length; uint32_t count; -} nx_message_content_t; +} dx_message_content_t; typedef struct { - DEQ_LINKS(nx_message_t); // Deq linkage that overlays the nx_message_t - nx_message_content_t *content; + DEQ_LINKS(dx_message_t); // Deq linkage that overlays the dx_message_t + dx_message_content_t *content; pn_delivery_t *out_delivery; -} nx_message_pvt_t; +} dx_message_pvt_t; -ALLOC_DECLARE(nx_message_t); -ALLOC_DECLARE(nx_message_content_t); +ALLOC_DECLARE(dx_message_t); +ALLOC_DECLARE(dx_message_content_t); -#define MSG_CONTENT(m) (((nx_message_pvt_t*) m)->content) +#define MSG_CONTENT(m) (((dx_message_pvt_t*) m)->content) #endif diff --git a/qpid/extras/nexus/src/posix/threading.c b/qpid/extras/dispatch/src/posix/threading.c index 6121151378..8edce86cdc 100644 --- a/qpid/extras/nexus/src/posix/threading.c +++ b/qpid/extras/dispatch/src/posix/threading.c @@ -17,8 +17,8 @@ * under the License. */ -#include <qpid/nexus/threading.h> -#include <qpid/nexus/ctools.h> +#include <qpid/dispatch/threading.h> +#include <qpid/dispatch/ctools.h> #include <stdio.h> #include <pthread.h> diff --git a/qpid/extras/nexus/src/router_node.c b/qpid/extras/dispatch/src/router_node.c index 4a1d83b703..6ddc8f45dd 100644 --- a/qpid/extras/nexus/src/router_node.c +++ b/qpid/extras/dispatch/src/router_node.c @@ -18,49 +18,49 @@ */ #include <stdio.h> -#include <qpid/nexus/server.h> -#include <qpid/nexus/message.h> -#include <qpid/nexus/threading.h> -#include <qpid/nexus/timer.h> -#include <qpid/nexus/ctools.h> -#include <qpid/nexus/hash.h> -#include <qpid/nexus/iterator.h> -#include <qpid/nexus/log.h> -#include <qpid/nexus/router.h> +#include <qpid/dispatch/server.h> +#include <qpid/dispatch/message.h> +#include <qpid/dispatch/threading.h> +#include <qpid/dispatch/timer.h> +#include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/hash.h> +#include <qpid/dispatch/iterator.h> +#include <qpid/dispatch/log.h> +#include <qpid/dispatch/router.h> static char *module="ROUTER_NODE"; -struct nx_router_t { - nx_node_t *node; - nx_link_list_t in_links; - nx_link_list_t out_links; - nx_message_list_t in_fifo; +struct dx_router_t { + dx_node_t *node; + dx_link_list_t in_links; + dx_link_list_t out_links; + dx_message_list_t in_fifo; sys_mutex_t *lock; - nx_timer_t *timer; + dx_timer_t *timer; hash_t *out_hash; uint64_t dtag; }; typedef struct { - nx_link_t *link; - nx_message_list_t out_fifo; -} nx_router_link_t; + dx_link_t *link; + dx_message_list_t out_fifo; +} dx_router_link_t; -ALLOC_DECLARE(nx_router_link_t); -ALLOC_DEFINE(nx_router_link_t); +ALLOC_DECLARE(dx_router_link_t); +ALLOC_DEFINE(dx_router_link_t); /** * Outbound Delivery Handler */ -static void router_tx_handler(void* context, nx_link_t *link, pn_delivery_t *delivery) +static void router_tx_handler(void* context, dx_link_t *link, pn_delivery_t *delivery) { - nx_router_t *router = (nx_router_t*) context; + dx_router_t *router = (dx_router_t*) context; pn_link_t *pn_link = pn_delivery_link(delivery); - nx_router_link_t *rlink = (nx_router_link_t*) nx_link_get_context(link); - nx_message_t *msg; + dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link); + dx_message_t *msg; size_t size; sys_mutex_lock(router->lock); @@ -75,18 +75,18 @@ static void router_tx_handler(void* context, nx_link_t *link, pn_delivery_t *del size = (DEQ_SIZE(rlink->out_fifo)); sys_mutex_unlock(router->lock); - nx_message_send(msg, pn_link); + dx_message_send(msg, pn_link); // // If there is no incoming delivery, it was pre-settled. In this case, // we must pre-settle the outgoing delivery as well. // - if (nx_message_in_delivery(msg)) { + if (dx_message_in_delivery(msg)) { pn_delivery_set_context(delivery, (void*) msg); - nx_message_set_out_delivery(msg, delivery); + dx_message_set_out_delivery(msg, delivery); } else { pn_delivery_settle(delivery); - nx_free_message(msg); + dx_free_message(msg); } pn_link_advance(pn_link); @@ -97,11 +97,11 @@ static void router_tx_handler(void* context, nx_link_t *link, pn_delivery_t *del /** * Inbound Delivery Handler */ -static void router_rx_handler(void* context, nx_link_t *link, pn_delivery_t *delivery) +static void router_rx_handler(void* context, dx_link_t *link, pn_delivery_t *delivery) { - nx_router_t *router = (nx_router_t*) context; + dx_router_t *router = (dx_router_t*) context; pn_link_t *pn_link = pn_delivery_link(delivery); - nx_message_t *msg; + dx_message_t *msg; int valid_message = 0; // @@ -109,7 +109,7 @@ static void router_rx_handler(void* context, nx_link_t *link, pn_delivery_t *del // pointer is NULL, we have not yet received a complete message. // sys_mutex_lock(router->lock); - msg = nx_message_receive(delivery); + msg = dx_message_receive(delivery); sys_mutex_unlock(router->lock); if (!msg) @@ -118,29 +118,29 @@ static void router_rx_handler(void* context, nx_link_t *link, pn_delivery_t *del // // Validate the message through the Properties section // - valid_message = nx_message_check(msg, NX_DEPTH_PROPERTIES); + valid_message = dx_message_check(msg, DX_DEPTH_PROPERTIES); pn_link_advance(pn_link); pn_link_flow(pn_link, 1); if (valid_message) { - nx_field_iterator_t *iter = nx_message_field_iterator(msg, NX_FIELD_TO); - nx_router_link_t *rlink; + dx_field_iterator_t *iter = dx_message_field_iterator(msg, DX_FIELD_TO); + dx_router_link_t *rlink; if (iter) { - nx_field_iterator_reset(iter, ITER_VIEW_NO_HOST); + dx_field_iterator_reset(iter, ITER_VIEW_NO_HOST); sys_mutex_lock(router->lock); int result = hash_retrieve(router->out_hash, iter, (void*) &rlink); - nx_field_iterator_free(iter); + dx_field_iterator_free(iter); if (result == 0) { // // To field is valid and contains a known destination. Enqueue on // the output fifo for the next-hop-to-destination. // - pn_link_t* pn_outlink = nx_link_pn(rlink->link); + pn_link_t* pn_outlink = dx_link_pn(rlink->link); DEQ_INSERT_TAIL(rlink->out_fifo, msg); pn_link_offered(pn_outlink, DEQ_SIZE(rlink->out_fifo)); - nx_link_activate(rlink->link); + dx_link_activate(rlink->link); } else { // // To field contains an unknown address. Release the message. @@ -158,7 +158,7 @@ static void router_rx_handler(void* context, nx_link_t *link, pn_delivery_t *del pn_delivery_update(delivery, PN_REJECTED); pn_delivery_settle(delivery); pn_delivery_set_context(delivery, 0); - nx_free_message(msg); + dx_free_message(msg); } } @@ -166,19 +166,19 @@ static void router_rx_handler(void* context, nx_link_t *link, pn_delivery_t *del /** * Delivery Disposition Handler */ -static void router_disp_handler(void* context, nx_link_t *link, pn_delivery_t *delivery) +static void router_disp_handler(void* context, dx_link_t *link, pn_delivery_t *delivery) { pn_link_t *pn_link = pn_delivery_link(delivery); if (pn_link_is_sender(pn_link)) { pn_disposition_t disp = pn_delivery_remote_state(delivery); - nx_message_t *msg = pn_delivery_get_context(delivery); + dx_message_t *msg = pn_delivery_get_context(delivery); pn_delivery_t *activate = 0; if (msg) { - assert(delivery == nx_message_out_delivery(msg)); + assert(delivery == dx_message_out_delivery(msg)); if (disp != 0) { - activate = nx_message_in_delivery(msg); + activate = dx_message_in_delivery(msg); pn_delivery_update(activate, disp); // TODO - handling of the data accompanying RECEIVED/MODIFIED } @@ -188,10 +188,10 @@ static void router_disp_handler(void* context, nx_link_t *link, pn_delivery_t *d // Downstream delivery has been settled. Propagate the settlement // upstream. // - activate = nx_message_in_delivery(msg); + activate = dx_message_in_delivery(msg); pn_delivery_settle(activate); pn_delivery_settle(delivery); - nx_free_message(msg); + dx_free_message(msg); } if (activate) { @@ -199,8 +199,8 @@ static void router_disp_handler(void* context, nx_link_t *link, pn_delivery_t *d // Activate the upstream/incoming link so that the settlement will // get pushed out. // - nx_link_t *act_link = (nx_link_t*) pn_link_get_context(pn_delivery_link(activate)); - nx_link_activate(act_link); + dx_link_t *act_link = (dx_link_t*) pn_link_get_context(pn_delivery_link(activate)); + dx_link_activate(act_link); } return; @@ -214,11 +214,11 @@ static void router_disp_handler(void* context, nx_link_t *link, pn_delivery_t *d /** * New Incoming Link Handler */ -static int router_incoming_link_handler(void* context, nx_link_t *link) +static int router_incoming_link_handler(void* context, dx_link_t *link) { - nx_router_t *router = (nx_router_t*) context; - nx_link_item_t *item = new_nx_link_item_t(); - pn_link_t *pn_link = nx_link_pn(link); + dx_router_t *router = (dx_router_t*) context; + dx_link_item_t *item = new_dx_link_item_t(); + pn_link_t *pn_link = dx_link_pn(link); if (item) { DEQ_ITEM_INIT(item); @@ -242,32 +242,32 @@ static int router_incoming_link_handler(void* context, nx_link_t *link) /** * New Outgoing Link Handler */ -static int router_outgoing_link_handler(void* context, nx_link_t *link) +static int router_outgoing_link_handler(void* context, dx_link_t *link) { - nx_router_t *router = (nx_router_t*) context; - pn_link_t *pn_link = nx_link_pn(link); + dx_router_t *router = (dx_router_t*) context; + pn_link_t *pn_link = dx_link_pn(link); const char *r_tgt = pn_terminus_get_address(pn_link_remote_target(pn_link)); sys_mutex_lock(router->lock); - nx_router_link_t *rlink = new_nx_router_link_t(); + dx_router_link_t *rlink = new_dx_router_link_t(); rlink->link = link; DEQ_INIT(rlink->out_fifo); - nx_link_set_context(link, rlink); + dx_link_set_context(link, rlink); - nx_field_iterator_t *iter = nx_field_iterator_string(r_tgt, ITER_VIEW_NO_HOST); + dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_NO_HOST); int result = hash_insert(router->out_hash, iter, rlink); - nx_field_iterator_free(iter); + dx_field_iterator_free(iter); if (result == 0) { pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link)); pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link)); pn_link_open(pn_link); sys_mutex_unlock(router->lock); - nx_log(module, LOG_TRACE, "Registered new local address: %s", r_tgt); + dx_log(module, LOG_TRACE, "Registered new local address: %s", r_tgt); return 0; } - nx_log(module, LOG_TRACE, "Address '%s' not registered as it already exists", r_tgt); + dx_log(module, LOG_TRACE, "Address '%s' not registered as it already exists", r_tgt); pn_link_close(pn_link); sys_mutex_unlock(router->lock); return 0; @@ -277,13 +277,13 @@ static int router_outgoing_link_handler(void* context, nx_link_t *link) /** * Outgoing Link Writable Handler */ -static int router_writable_link_handler(void* context, nx_link_t *link) +static int router_writable_link_handler(void* context, dx_link_t *link) { - nx_router_t *router = (nx_router_t*) context; + dx_router_t *router = (dx_router_t*) context; int grant_delivery = 0; pn_delivery_t *delivery; - nx_router_link_t *rlink = (nx_router_link_t*) nx_link_get_context(link); - pn_link_t *pn_link = nx_link_pn(link); + dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link); + pn_link_t *pn_link = dx_link_pn(link); uint64_t tag; sys_mutex_lock(router->lock); @@ -309,28 +309,28 @@ static int router_writable_link_handler(void* context, nx_link_t *link) /** * Link Detached Handler */ -static int router_link_detach_handler(void* context, nx_link_t *link, int closed) +static int router_link_detach_handler(void* context, dx_link_t *link, int closed) { - nx_router_t *router = (nx_router_t*) context; - pn_link_t *pn_link = nx_link_pn(link); + dx_router_t *router = (dx_router_t*) context; + pn_link_t *pn_link = dx_link_pn(link); const char *r_tgt = pn_terminus_get_address(pn_link_remote_target(pn_link)); - nx_link_item_t *item; + dx_link_item_t *item; sys_mutex_lock(router->lock); if (pn_link_is_sender(pn_link)) { item = DEQ_HEAD(router->out_links); - nx_field_iterator_t *iter = nx_field_iterator_string(r_tgt, ITER_VIEW_NO_HOST); - nx_router_link_t *rlink; + dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_NO_HOST); + dx_router_link_t *rlink; if (iter) { int result = hash_retrieve(router->out_hash, iter, (void*) &rlink); if (result == 0) { - nx_field_iterator_reset(iter, ITER_VIEW_NO_HOST); + dx_field_iterator_reset(iter, ITER_VIEW_NO_HOST); hash_remove(router->out_hash, iter); - free_nx_router_link_t(rlink); - nx_log(module, LOG_TRACE, "Removed local address: %s", r_tgt); + free_dx_router_link_t(rlink); + dx_log(module, LOG_TRACE, "Removed local address: %s", r_tgt); } - nx_field_iterator_free(iter); + dx_field_iterator_free(iter); } } else @@ -342,7 +342,7 @@ static int router_link_detach_handler(void* context, nx_link_t *link, int closed DEQ_REMOVE(router->out_links, item); else DEQ_REMOVE(router->in_links, item); - free_nx_link_item_t(item); + free_dx_link_item_t(item); break; } item = item->next; @@ -353,28 +353,28 @@ static int router_link_detach_handler(void* context, nx_link_t *link, int closed } -static void router_inbound_open_handler(void *type_context, nx_connection_t *conn) +static void router_inbound_open_handler(void *type_context, dx_connection_t *conn) { } -static void router_outbound_open_handler(void *type_context, nx_connection_t *conn) +static void router_outbound_open_handler(void *type_context, dx_connection_t *conn) { } -static void nx_router_timer_handler(void *context) +static void dx_router_timer_handler(void *context) { - nx_router_t *router = (nx_router_t*) context; + dx_router_t *router = (dx_router_t*) context; // // Periodic processing. // - nx_timer_schedule(router->timer, 1000); + dx_timer_schedule(router->timer, 1000); } -static nx_node_type_t router_node = {"router", 0, 0, +static dx_node_type_t router_node = {"router", 0, 0, router_rx_handler, router_tx_handler, router_disp_handler, @@ -389,15 +389,15 @@ static nx_node_type_t router_node = {"router", 0, 0, static int type_registered = 0; -nx_router_t *nx_router(nx_router_configuration_t *config) +dx_router_t *dx_router(dx_router_configuration_t *config) { if (!type_registered) { type_registered = 1; - nx_container_register_node_type(&router_node); + dx_container_register_node_type(&router_node); } - nx_router_t *router = NEW(nx_router_t); - nx_container_set_default_node_type(&router_node, (void*) router, NX_DIST_BOTH); + dx_router_t *router = NEW(dx_router_t); + dx_container_set_default_node_type(&router_node, (void*) router, DX_DIST_BOTH); DEQ_INIT(router->in_links); DEQ_INIT(router->out_links); @@ -405,8 +405,8 @@ nx_router_t *nx_router(nx_router_configuration_t *config) router->lock = sys_mutex(); - router->timer = nx_timer(nx_router_timer_handler, (void*) router); - nx_timer_schedule(router->timer, 0); // Immediate + router->timer = dx_timer(dx_router_timer_handler, (void*) router); + dx_timer_schedule(router->timer, 0); // Immediate router->out_hash = hash(10, 32, 0); router->dtag = 1; @@ -415,9 +415,9 @@ nx_router_t *nx_router(nx_router_configuration_t *config) } -void nx_router_free(nx_router_t *router) +void dx_router_free(dx_router_t *router) { - nx_container_set_default_node_type(0, 0, NX_DIST_BOTH); + dx_container_set_default_node_type(0, 0, DX_DIST_BOTH); sys_mutex_free(router->lock); free(router); } diff --git a/qpid/extras/nexus/src/server.c b/qpid/extras/dispatch/src/server.c index 16740b812f..0099393f60 100644 --- a/qpid/extras/nexus/src/server.c +++ b/qpid/extras/dispatch/src/server.c @@ -17,9 +17,9 @@ * under the License. */ -#include <qpid/nexus/ctools.h> -#include <qpid/nexus/threading.h> -#include <qpid/nexus/log.h> +#include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/threading.h> +#include <qpid/dispatch/log.h> #include "server_private.h" #include "timer_private.h" #include "alloc_private.h" @@ -31,30 +31,30 @@ static char *module="SERVER"; -typedef struct nx_thread_t { +typedef struct dx_thread_t { int thread_id; volatile int running; volatile int canceled; int using_thread; sys_thread_t *thread; -} nx_thread_t; +} dx_thread_t; -typedef struct nx_server_t { +typedef struct dx_server_t { int thread_count; pn_driver_t *driver; - nx_thread_start_cb_t start_handler; - nx_conn_handler_cb_t conn_handler; - nx_signal_handler_cb_t signal_handler; - nx_user_fd_handler_cb_t ufd_handler; + dx_thread_start_cb_t start_handler; + dx_conn_handler_cb_t conn_handler; + dx_signal_handler_cb_t signal_handler; + dx_user_fd_handler_cb_t ufd_handler; void *start_context; void *conn_context; void *signal_context; sys_cond_t *cond; sys_mutex_t *lock; - nx_thread_t **threads; + dx_thread_t **threads; work_queue_t *work_queue; - nx_timer_list_t pending_timers; + dx_timer_list_t pending_timers; bool a_thread_is_waiting; int threads_active; int pause_requests; @@ -62,31 +62,31 @@ typedef struct nx_server_t { int pause_next_sequence; int pause_now_serving; int pending_signal; -} nx_server_t; +} dx_server_t; -ALLOC_DEFINE(nx_listener_t); -ALLOC_DEFINE(nx_connector_t); -ALLOC_DEFINE(nx_connection_t); -ALLOC_DEFINE(nx_user_fd_t); +ALLOC_DEFINE(dx_listener_t); +ALLOC_DEFINE(dx_connector_t); +ALLOC_DEFINE(dx_connection_t); +ALLOC_DEFINE(dx_user_fd_t); /** * Singleton Concurrent Proton Driver object */ -static nx_server_t *nx_server = 0; +static dx_server_t *dx_server = 0; static void signal_handler(int signum) { - nx_server->pending_signal = signum; - sys_cond_signal_all(nx_server->cond); + dx_server->pending_signal = signum; + sys_cond_signal_all(dx_server->cond); } -static nx_thread_t *thread(int id) +static dx_thread_t *thread(int id) { - nx_thread_t *thread = NEW(nx_thread_t); + dx_thread_t *thread = NEW(dx_thread_t); if (!thread) return 0; @@ -103,18 +103,18 @@ static void thread_process_listeners(pn_driver_t *driver) { pn_listener_t *listener = pn_driver_listener(driver); pn_connector_t *cxtr; - nx_connection_t *ctx; + dx_connection_t *ctx; while (listener) { - nx_log(module, LOG_TRACE, "Accepting Connection"); + dx_log(module, LOG_TRACE, "Accepting Connection"); cxtr = pn_listener_accept(listener); - ctx = new_nx_connection_t(); + ctx = new_dx_connection_t(); ctx->state = CONN_STATE_SASL_SERVER; ctx->owner_thread = CONTEXT_NO_OWNER; ctx->enqueued = 0; ctx->pn_cxtr = cxtr; ctx->pn_conn = 0; - ctx->listener = (nx_listener_t*) pn_listener_context(listener); + ctx->listener = (dx_listener_t*) pn_listener_context(listener); ctx->connector = 0; ctx->context = ctx->listener->context; ctx->ufd = 0; @@ -127,14 +127,14 @@ static void thread_process_listeners(pn_driver_t *driver) static void handle_signals_LH(void) { - int signum = nx_server->pending_signal; + int signum = dx_server->pending_signal; if (signum) { - nx_server->pending_signal = 0; - if (nx_server->signal_handler) { - sys_mutex_unlock(nx_server->lock); - nx_server->signal_handler(nx_server->signal_context, signum); - sys_mutex_lock(nx_server->lock); + dx_server->pending_signal = 0; + if (dx_server->signal_handler) { + sys_mutex_unlock(dx_server->lock); + dx_server->signal_handler(dx_server->signal_context, signum); + sys_mutex_lock(dx_server->lock); } } } @@ -142,24 +142,24 @@ static void handle_signals_LH(void) static void block_if_paused_LH(void) { - if (nx_server->pause_requests > 0) { - nx_server->threads_paused++; - sys_cond_signal_all(nx_server->cond); - while (nx_server->pause_requests > 0) - sys_cond_wait(nx_server->cond, nx_server->lock); - nx_server->threads_paused--; + if (dx_server->pause_requests > 0) { + dx_server->threads_paused++; + sys_cond_signal_all(dx_server->cond); + while (dx_server->pause_requests > 0) + sys_cond_wait(dx_server->cond, dx_server->lock); + dx_server->threads_paused--; } } static void process_connector(pn_connector_t *cxtr) { - nx_connection_t *ctx = pn_connector_context(cxtr); + dx_connection_t *ctx = pn_connector_context(cxtr); int events = 0; int auth_passes = 0; if (ctx->state == CONN_STATE_USER) { - nx_server->ufd_handler(ctx->ufd->context, ctx->ufd); + dx_server->ufd_handler(ctx->ufd->context, ctx->ufd); return; } @@ -209,36 +209,36 @@ static void process_connector(pn_connector_t *cxtr) ctx->state = CONN_STATE_OPERATIONAL; pn_connection_t *conn = pn_connection(); - pn_connection_set_container(conn, "nexus"); // TODO - make unique + pn_connection_set_container(conn, "dispatch"); // TODO - make unique pn_connector_set_connection(cxtr, conn); pn_connection_set_context(conn, ctx); ctx->pn_conn = conn; - nx_conn_event_t ce = NX_CONN_EVENT_PROCESS; // Initialize to keep the compiler happy + dx_conn_event_t ce = DX_CONN_EVENT_PROCESS; // Initialize to keep the compiler happy if (ctx->listener) { - ce = NX_CONN_EVENT_LISTENER_OPEN; + ce = DX_CONN_EVENT_LISTENER_OPEN; } else if (ctx->connector) { - ce = NX_CONN_EVENT_CONNECTOR_OPEN; + ce = DX_CONN_EVENT_CONNECTOR_OPEN; ctx->connector->delay = 0; } else assert(0); - nx_server->conn_handler(ctx->context, ce, (nx_connection_t*) pn_connector_context(cxtr)); + dx_server->conn_handler(ctx->context, ce, (dx_connection_t*) pn_connector_context(cxtr)); events = 1; break; case CONN_STATE_OPERATIONAL: if (pn_connector_closed(cxtr)) { - nx_server->conn_handler(ctx->context, - NX_CONN_EVENT_CLOSE, - (nx_connection_t*) pn_connector_context(cxtr)); + dx_server->conn_handler(ctx->context, + DX_CONN_EVENT_CLOSE, + (dx_connection_t*) pn_connector_context(cxtr)); events = 0; } else - events = nx_server->conn_handler(ctx->context, - NX_CONN_EVENT_PROCESS, - (nx_connection_t*) pn_connector_context(cxtr)); + events = dx_server->conn_handler(ctx->context, + DX_CONN_EVENT_PROCESS, + (dx_connection_t*) pn_connector_context(cxtr)); break; default: @@ -260,10 +260,10 @@ void pn_driver_wait_3(pn_driver_t *d); static void *thread_run(void *arg) { - nx_thread_t *thread = (nx_thread_t*) arg; + dx_thread_t *thread = (dx_thread_t*) arg; pn_connector_t *work; pn_connection_t *conn; - nx_connection_t *ctx; + dx_connection_t *ctx; int error; int poll_result; int timer_holdoff = 0; @@ -280,21 +280,21 @@ static void *thread_run(void *arg) // Invoke the start handler if the application supplied one. // This handler can be used to set NUMA or processor affinnity for the thread. // - if (nx_server->start_handler) - nx_server->start_handler(nx_server->start_context, thread->thread_id); + if (dx_server->start_handler) + dx_server->start_handler(dx_server->start_context, thread->thread_id); // // Main Loop // while (thread->running) { - sys_mutex_lock(nx_server->lock); + sys_mutex_lock(dx_server->lock); // // Check for pending signals to process // handle_signals_LH(); if (!thread->running) { - sys_mutex_unlock(nx_server->lock); + sys_mutex_unlock(dx_server->lock); break; } @@ -303,58 +303,58 @@ static void *thread_run(void *arg) // block_if_paused_LH(); if (!thread->running) { - sys_mutex_unlock(nx_server->lock); + sys_mutex_unlock(dx_server->lock); break; } // // Service pending timers. // - nx_timer_t *timer = DEQ_HEAD(nx_server->pending_timers); + dx_timer_t *timer = DEQ_HEAD(dx_server->pending_timers); if (timer) { - DEQ_REMOVE_HEAD(nx_server->pending_timers); + DEQ_REMOVE_HEAD(dx_server->pending_timers); // // Mark the timer as idle in case it reschedules itself. // - nx_timer_idle_LH(timer); + dx_timer_idle_LH(timer); // // Release the lock and invoke the connection handler. // - sys_mutex_unlock(nx_server->lock); + sys_mutex_unlock(dx_server->lock); timer->handler(timer->context); - pn_driver_wakeup(nx_server->driver); + pn_driver_wakeup(dx_server->driver); continue; } // // Check the work queue for connectors scheduled for processing. // - work = work_queue_get(nx_server->work_queue); + work = work_queue_get(dx_server->work_queue); if (!work) { // // There is no pending work to do // - if (nx_server->a_thread_is_waiting) { + if (dx_server->a_thread_is_waiting) { // // Another thread is waiting on the proton driver, this thread must // wait on the condition variable until signaled. // - sys_cond_wait(nx_server->cond, nx_server->lock); + sys_cond_wait(dx_server->cond, dx_server->lock); } else { // // This thread elects itself to wait on the proton driver. Set the // thread-is-waiting flag so other idle threads will not interfere. // - nx_server->a_thread_is_waiting = true; + dx_server->a_thread_is_waiting = true; // // Ask the timer module when its next timer is scheduled to fire. We'll // use this value in driver_wait as the timeout. If there are no scheduled // timers, the returned value will be -1. // - long duration = nx_timer_next_duration_LH(); + long duration = dx_timer_next_duration_LH(); // // Invoke the proton driver's wait sequence. This is a bit of a hack for now @@ -362,25 +362,25 @@ static void *thread_run(void *arg) // the first and third of which need to be non-reentrant, and the second of which // must be reentrant (and blocks). // - pn_driver_wait_1(nx_server->driver); - sys_mutex_unlock(nx_server->lock); + pn_driver_wait_1(dx_server->driver); + sys_mutex_unlock(dx_server->lock); do { error = 0; - poll_result = pn_driver_wait_2(nx_server->driver, duration); + poll_result = pn_driver_wait_2(dx_server->driver, duration); if (poll_result == -1) - error = pn_driver_errno(nx_server->driver); + error = pn_driver_errno(dx_server->driver); } while (error == PN_INTR); if (error) { - nx_log(module, LOG_ERROR, "Driver Error: %s", pn_error_text(pn_error(nx_server->driver))); + dx_log(module, LOG_ERROR, "Driver Error: %s", pn_error_text(pn_error(dx_server->driver))); exit(-1); } - sys_mutex_lock(nx_server->lock); - pn_driver_wait_3(nx_server->driver); + sys_mutex_lock(dx_server->lock); + pn_driver_wait_3(dx_server->driver); if (!thread->running) { - sys_mutex_unlock(nx_server->lock); + sys_mutex_unlock(dx_server->lock); break; } @@ -391,14 +391,14 @@ static void *thread_run(void *arg) struct timespec tv; clock_gettime(CLOCK_REALTIME, &tv); long milliseconds = tv.tv_sec * 1000 + tv.tv_nsec / 1000000; - nx_timer_visit_LH(milliseconds); + dx_timer_visit_LH(milliseconds); timer_holdoff = 0; } // // Process listeners (incoming connections). // - thread_process_listeners(nx_server->driver); + thread_process_listeners(dx_server->driver); // // Traverse the list of connectors-needing-service from the proton driver. @@ -406,21 +406,21 @@ static void *thread_run(void *arg) // being processed by another thread, put it in the work queue and signal the // condition variable. // - work = pn_driver_connector(nx_server->driver); + work = pn_driver_connector(dx_server->driver); while (work) { ctx = pn_connector_context(work); if (!ctx->enqueued && ctx->owner_thread == CONTEXT_NO_OWNER) { ctx->enqueued = 1; - work_queue_put(nx_server->work_queue, work); - sys_cond_signal(nx_server->cond); + work_queue_put(dx_server->work_queue, work); + sys_cond_signal(dx_server->cond); } - work = pn_driver_connector(nx_server->driver); + work = pn_driver_connector(dx_server->driver); } // // Release our exclusive claim on pn_driver_wait. // - nx_server->a_thread_is_waiting = false; + dx_server->a_thread_is_waiting = false; } } @@ -433,16 +433,16 @@ static void *thread_run(void *arg) if (ctx->owner_thread == CONTEXT_NO_OWNER) { ctx->owner_thread = thread->thread_id; ctx->enqueued = 0; - nx_server->threads_active++; + dx_server->threads_active++; } else { // // This connector is being processed by another thread, re-queue it. // - work_queue_put(nx_server->work_queue, work); + work_queue_put(dx_server->work_queue, work); work = 0; } } - sys_mutex_unlock(nx_server->lock); + sys_mutex_unlock(dx_server->lock); // // Process the connector that we now have exclusive access to. @@ -461,30 +461,30 @@ static void *thread_run(void *arg) if (ctx->connector) { ctx->connector->ctx = 0; ctx->connector->state = CXTR_STATE_CONNECTING; - nx_timer_schedule(ctx->connector->timer, ctx->connector->delay); + dx_timer_schedule(ctx->connector->timer, ctx->connector->delay); } - sys_mutex_lock(nx_server->lock); - free_nx_connection_t(ctx); + sys_mutex_lock(dx_server->lock); + free_dx_connection_t(ctx); pn_connector_free(work); if (conn) pn_connection_free(conn); - nx_server->threads_active--; - sys_mutex_unlock(nx_server->lock); + dx_server->threads_active--; + sys_mutex_unlock(dx_server->lock); } else { // // The connector lives on. Mark it as no longer owned by this thread. // - sys_mutex_lock(nx_server->lock); + sys_mutex_lock(dx_server->lock); ctx->owner_thread = CONTEXT_NO_OWNER; - nx_server->threads_active--; - sys_mutex_unlock(nx_server->lock); + dx_server->threads_active--; + sys_mutex_unlock(dx_server->lock); } // // Wake up the proton driver to force it to reconsider its set of FDs // in light of the processing that just occurred. // - pn_driver_wakeup(nx_server->driver); + pn_driver_wakeup(dx_server->driver); } } @@ -492,7 +492,7 @@ static void *thread_run(void *arg) } -static void thread_start(nx_thread_t *thread) +static void thread_start(dx_thread_t *thread) { if (!thread) return; @@ -502,7 +502,7 @@ static void thread_start(nx_thread_t *thread) } -static void thread_cancel(nx_thread_t *thread) +static void thread_cancel(dx_thread_t *thread) { if (!thread) return; @@ -512,7 +512,7 @@ static void thread_cancel(nx_thread_t *thread) } -static void thread_join(nx_thread_t *thread) +static void thread_join(dx_thread_t *thread) { if (!thread) return; @@ -522,7 +522,7 @@ static void thread_join(nx_thread_t *thread) } -static void thread_free(nx_thread_t *thread) +static void thread_free(dx_thread_t *thread) { if (!thread) return; @@ -533,11 +533,11 @@ static void thread_free(nx_thread_t *thread) static void cxtr_try_open(void *context) { - nx_connector_t *ct = (nx_connector_t*) context; + dx_connector_t *ct = (dx_connector_t*) context; if (ct->state != CXTR_STATE_CONNECTING) return; - nx_connection_t *ctx = new_nx_connection_t(); + dx_connection_t *ctx = new_dx_connection_t(); ctx->state = CONN_STATE_CONNECTING; ctx->owner_thread = CONTEXT_NO_OWNER; ctx->enqueued = 0; @@ -551,184 +551,184 @@ static void cxtr_try_open(void *context) // // pn_connector is not thread safe // - sys_mutex_lock(nx_server->lock); - ctx->pn_cxtr = pn_connector(nx_server->driver, ct->config->host, ct->config->port, (void*) ctx); - sys_mutex_unlock(nx_server->lock); + sys_mutex_lock(dx_server->lock); + ctx->pn_cxtr = pn_connector(dx_server->driver, ct->config->host, ct->config->port, (void*) ctx); + sys_mutex_unlock(dx_server->lock); ct->ctx = ctx; ct->delay = 5000; - nx_log(module, LOG_TRACE, "Connecting to %s:%s", ct->config->host, ct->config->port); + dx_log(module, LOG_TRACE, "Connecting to %s:%s", ct->config->host, ct->config->port); } -void nx_server_initialize(int thread_count) +void dx_server_initialize(int thread_count) { int i; - if (nx_server) + if (dx_server) return; // TODO - Fail in a more dramatic way - nx_alloc_initialize(); - nx_server = NEW(nx_server_t); + dx_alloc_initialize(); + dx_server = NEW(dx_server_t); - if (!nx_server) + if (!dx_server) return; // TODO - Fail in a more dramatic way - nx_server->thread_count = thread_count; - nx_server->driver = pn_driver(); - nx_server->start_handler = 0; - nx_server->conn_handler = 0; - nx_server->signal_handler = 0; - nx_server->ufd_handler = 0; - nx_server->start_context = 0; - nx_server->signal_context = 0; - nx_server->lock = sys_mutex(); - nx_server->cond = sys_cond(); + dx_server->thread_count = thread_count; + dx_server->driver = pn_driver(); + dx_server->start_handler = 0; + dx_server->conn_handler = 0; + dx_server->signal_handler = 0; + dx_server->ufd_handler = 0; + dx_server->start_context = 0; + dx_server->signal_context = 0; + dx_server->lock = sys_mutex(); + dx_server->cond = sys_cond(); - nx_timer_initialize(nx_server->lock); + dx_timer_initialize(dx_server->lock); - nx_server->threads = NEW_PTR_ARRAY(nx_thread_t, thread_count); + dx_server->threads = NEW_PTR_ARRAY(dx_thread_t, thread_count); for (i = 0; i < thread_count; i++) - nx_server->threads[i] = thread(i); + dx_server->threads[i] = thread(i); - nx_server->work_queue = work_queue(); - DEQ_INIT(nx_server->pending_timers); - nx_server->a_thread_is_waiting = false; - nx_server->threads_active = 0; - nx_server->pause_requests = 0; - nx_server->threads_paused = 0; - nx_server->pause_next_sequence = 0; - nx_server->pause_now_serving = 0; - nx_server->pending_signal = 0; + dx_server->work_queue = work_queue(); + DEQ_INIT(dx_server->pending_timers); + dx_server->a_thread_is_waiting = false; + dx_server->threads_active = 0; + dx_server->pause_requests = 0; + dx_server->threads_paused = 0; + dx_server->pause_next_sequence = 0; + dx_server->pause_now_serving = 0; + dx_server->pending_signal = 0; } -void nx_server_finalize(void) +void dx_server_finalize(void) { int i; - if (!nx_server) + if (!dx_server) return; - for (i = 0; i < nx_server->thread_count; i++) - thread_free(nx_server->threads[i]); + for (i = 0; i < dx_server->thread_count; i++) + thread_free(dx_server->threads[i]); - work_queue_free(nx_server->work_queue); + work_queue_free(dx_server->work_queue); - pn_driver_free(nx_server->driver); - sys_mutex_free(nx_server->lock); - sys_cond_free(nx_server->cond); - free(nx_server); - nx_server = 0; + pn_driver_free(dx_server->driver); + sys_mutex_free(dx_server->lock); + sys_cond_free(dx_server->cond); + free(dx_server); + dx_server = 0; } -void nx_server_set_conn_handler(nx_conn_handler_cb_t handler) +void dx_server_set_conn_handler(dx_conn_handler_cb_t handler) { - nx_server->conn_handler = handler; + dx_server->conn_handler = handler; } -void nx_server_set_signal_handler(nx_signal_handler_cb_t handler, void *context) +void dx_server_set_signal_handler(dx_signal_handler_cb_t handler, void *context) { - nx_server->signal_handler = handler; - nx_server->signal_context = context; + dx_server->signal_handler = handler; + dx_server->signal_context = context; } -void nx_server_set_start_handler(nx_thread_start_cb_t handler, void *context) +void dx_server_set_start_handler(dx_thread_start_cb_t handler, void *context) { - nx_server->start_handler = handler; - nx_server->start_context = context; + dx_server->start_handler = handler; + dx_server->start_context = context; } -void nx_server_set_user_fd_handler(nx_user_fd_handler_cb_t ufd_handler) +void dx_server_set_user_fd_handler(dx_user_fd_handler_cb_t ufd_handler) { - nx_server->ufd_handler = ufd_handler; + dx_server->ufd_handler = ufd_handler; } -void nx_server_run(void) +void dx_server_run(void) { int i; - if (!nx_server) + if (!dx_server) return; - assert(nx_server->conn_handler); // Server can't run without a connection handler. + assert(dx_server->conn_handler); // Server can't run without a connection handler. - for (i = 1; i < nx_server->thread_count; i++) - thread_start(nx_server->threads[i]); + for (i = 1; i < dx_server->thread_count; i++) + thread_start(dx_server->threads[i]); - nx_log(module, LOG_INFO, "Operational, %d Threads Running", nx_server->thread_count); + dx_log(module, LOG_INFO, "Operational, %d Threads Running", dx_server->thread_count); - thread_run((void*) nx_server->threads[0]); + thread_run((void*) dx_server->threads[0]); - for (i = 1; i < nx_server->thread_count; i++) - thread_join(nx_server->threads[i]); + for (i = 1; i < dx_server->thread_count; i++) + thread_join(dx_server->threads[i]); - nx_log(module, LOG_INFO, "Shut Down"); + dx_log(module, LOG_INFO, "Shut Down"); } -void nx_server_stop(void) +void dx_server_stop(void) { int idx; - sys_mutex_lock(nx_server->lock); - for (idx = 0; idx < nx_server->thread_count; idx++) - thread_cancel(nx_server->threads[idx]); - sys_cond_signal_all(nx_server->cond); - pn_driver_wakeup(nx_server->driver); - sys_mutex_unlock(nx_server->lock); + sys_mutex_lock(dx_server->lock); + for (idx = 0; idx < dx_server->thread_count; idx++) + thread_cancel(dx_server->threads[idx]); + sys_cond_signal_all(dx_server->cond); + pn_driver_wakeup(dx_server->driver); + sys_mutex_unlock(dx_server->lock); } -void nx_server_signal(int signum) +void dx_server_signal(int signum) { signal(signum, signal_handler); } -void nx_server_pause(void) +void dx_server_pause(void) { - sys_mutex_lock(nx_server->lock); + sys_mutex_lock(dx_server->lock); // // Bump the request count to stop all the threads. // - nx_server->pause_requests++; - int my_sequence = nx_server->pause_next_sequence++; + dx_server->pause_requests++; + int my_sequence = dx_server->pause_next_sequence++; // // Awaken all threads that are currently blocking. // - sys_cond_signal_all(nx_server->cond); - pn_driver_wakeup(nx_server->driver); + sys_cond_signal_all(dx_server->cond); + pn_driver_wakeup(dx_server->driver); // // Wait for the paused thread count plus the number of threads requesting a pause to equal // the total thread count. Also, don't exit the blocking loop until now_serving equals our // sequence number. This ensures that concurrent pausers don't run at the same time. // - while ((nx_server->threads_paused + nx_server->pause_requests < nx_server->thread_count) || - (my_sequence != nx_server->pause_now_serving)) - sys_cond_wait(nx_server->cond, nx_server->lock); + while ((dx_server->threads_paused + dx_server->pause_requests < dx_server->thread_count) || + (my_sequence != dx_server->pause_now_serving)) + sys_cond_wait(dx_server->cond, dx_server->lock); - sys_mutex_unlock(nx_server->lock); + sys_mutex_unlock(dx_server->lock); } -void nx_server_resume(void) +void dx_server_resume(void) { - sys_mutex_lock(nx_server->lock); - nx_server->pause_requests--; - nx_server->pause_now_serving++; - sys_cond_signal_all(nx_server->cond); - sys_mutex_unlock(nx_server->lock); + sys_mutex_lock(dx_server->lock); + dx_server->pause_requests--; + dx_server->pause_now_serving++; + sys_cond_signal_all(dx_server->cond); + sys_mutex_unlock(dx_server->lock); } -void nx_server_activate(nx_connection_t *ctx) +void dx_server_activate(dx_connection_t *ctx) { if (!ctx) return; @@ -742,63 +742,63 @@ void nx_server_activate(nx_connection_t *ctx) } -void nx_connection_set_context(nx_connection_t *conn, void *context) +void dx_connection_set_context(dx_connection_t *conn, void *context) { conn->user_context = context; } -void *nx_connection_get_context(nx_connection_t *conn) +void *dx_connection_get_context(dx_connection_t *conn) { return conn->user_context; } -pn_connection_t *nx_connection_pn(nx_connection_t *conn) +pn_connection_t *dx_connection_pn(dx_connection_t *conn) { return conn->pn_conn; } -nx_listener_t *nx_server_listen(const nx_server_config_t *config, void *context) +dx_listener_t *dx_server_listen(const dx_server_config_t *config, void *context) { - nx_listener_t *li = new_nx_listener_t(); + dx_listener_t *li = new_dx_listener_t(); if (!li) return 0; li->config = config; li->context = context; - li->pn_listener = pn_listener(nx_server->driver, config->host, config->port, (void*) li); + li->pn_listener = pn_listener(dx_server->driver, config->host, config->port, (void*) li); if (!li->pn_listener) { - nx_log(module, LOG_ERROR, "Driver Error %d (%s)", - pn_driver_errno(nx_server->driver), pn_driver_error(nx_server->driver)); - free_nx_listener_t(li); + dx_log(module, LOG_ERROR, "Driver Error %d (%s)", + pn_driver_errno(dx_server->driver), pn_driver_error(dx_server->driver)); + free_dx_listener_t(li); return 0; } - nx_log(module, LOG_TRACE, "Listening on %s:%s", config->host, config->port); + dx_log(module, LOG_TRACE, "Listening on %s:%s", config->host, config->port); return li; } -void nx_server_listener_free(nx_listener_t* li) +void dx_server_listener_free(dx_listener_t* li) { pn_listener_free(li->pn_listener); - free_nx_listener_t(li); + free_dx_listener_t(li); } -void nx_server_listener_close(nx_listener_t* li) +void dx_server_listener_close(dx_listener_t* li) { pn_listener_close(li->pn_listener); } -nx_connector_t *nx_server_connect(const nx_server_config_t *config, void *context) +dx_connector_t *dx_server_connect(const dx_server_config_t *config, void *context) { - nx_connector_t *ct = new_nx_connector_t(); + dx_connector_t *ct = new_dx_connector_t(); if (!ct) return 0; @@ -807,15 +807,15 @@ nx_connector_t *nx_server_connect(const nx_server_config_t *config, void *contex ct->config = config; ct->context = context; ct->ctx = 0; - ct->timer = nx_timer(cxtr_try_open, (void*) ct); + ct->timer = dx_timer(cxtr_try_open, (void*) ct); ct->delay = 0; - nx_timer_schedule(ct->timer, ct->delay); + dx_timer_schedule(ct->timer, ct->delay); return ct; } -void nx_server_connector_free(nx_connector_t* ct) +void dx_server_connector_free(dx_connector_t* ct) { // Don't free the proton connector. This will be done by the connector // processing/cleanup. @@ -825,19 +825,19 @@ void nx_server_connector_free(nx_connector_t* ct) ct->ctx->connector = 0; } - nx_timer_free(ct->timer); - free_nx_connector_t(ct); + dx_timer_free(ct->timer); + free_dx_connector_t(ct); } -nx_user_fd_t *nx_user_fd(int fd, void *context) +dx_user_fd_t *dx_user_fd(int fd, void *context) { - nx_user_fd_t *ufd = new_nx_user_fd_t(); + dx_user_fd_t *ufd = new_dx_user_fd_t(); if (!ufd) return 0; - nx_connection_t *ctx = new_nx_connection_t(); + dx_connection_t *ctx = new_dx_connection_t(); ctx->state = CONN_STATE_USER; ctx->owner_thread = CONTEXT_NO_OWNER; ctx->enqueued = 0; @@ -850,54 +850,54 @@ nx_user_fd_t *nx_user_fd(int fd, void *context) ufd->context = context; ufd->fd = fd; - ufd->pn_conn = pn_connector_fd(nx_server->driver, fd, (void*) ctx); - pn_driver_wakeup(nx_server->driver); + ufd->pn_conn = pn_connector_fd(dx_server->driver, fd, (void*) ctx); + pn_driver_wakeup(dx_server->driver); return ufd; } -void nx_user_fd_free(nx_user_fd_t *ufd) +void dx_user_fd_free(dx_user_fd_t *ufd) { pn_connector_close(ufd->pn_conn); - free_nx_user_fd_t(ufd); + free_dx_user_fd_t(ufd); } -void nx_user_fd_activate_read(nx_user_fd_t *ufd) +void dx_user_fd_activate_read(dx_user_fd_t *ufd) { pn_connector_activate(ufd->pn_conn, PN_CONNECTOR_READABLE); - pn_driver_wakeup(nx_server->driver); + pn_driver_wakeup(dx_server->driver); } -void nx_user_fd_activate_write(nx_user_fd_t *ufd) +void dx_user_fd_activate_write(dx_user_fd_t *ufd) { pn_connector_activate(ufd->pn_conn, PN_CONNECTOR_WRITABLE); - pn_driver_wakeup(nx_server->driver); + pn_driver_wakeup(dx_server->driver); } -bool nx_user_fd_is_readable(nx_user_fd_t *ufd) +bool dx_user_fd_is_readable(dx_user_fd_t *ufd) { return pn_connector_activated(ufd->pn_conn, PN_CONNECTOR_READABLE); } -bool nx_user_fd_is_writeable(nx_user_fd_t *ufd) +bool dx_user_fd_is_writeable(dx_user_fd_t *ufd) { return pn_connector_activated(ufd->pn_conn, PN_CONNECTOR_WRITABLE); } -void nx_server_timer_pending_LH(nx_timer_t *timer) +void dx_server_timer_pending_LH(dx_timer_t *timer) { - DEQ_INSERT_TAIL(nx_server->pending_timers, timer); + DEQ_INSERT_TAIL(dx_server->pending_timers, timer); } -void nx_server_timer_cancel_LH(nx_timer_t *timer) +void dx_server_timer_cancel_LH(dx_timer_t *timer) { - DEQ_REMOVE(nx_server->pending_timers, timer); + DEQ_REMOVE(dx_server->pending_timers, timer); } diff --git a/qpid/extras/nexus/src/server_private.h b/qpid/extras/dispatch/src/server_private.h index a7f0a18ef7..1722175e35 100644 --- a/qpid/extras/nexus/src/server_private.h +++ b/qpid/extras/dispatch/src/server_private.h @@ -19,14 +19,15 @@ * under the License. */ -#include <qpid/nexus/server.h> -#include <qpid/nexus/user_fd.h> -#include <qpid/nexus/timer.h> -#include <qpid/nexus/alloc.h> +#include <qpid/dispatch/server.h> +#include <qpid/dispatch/user_fd.h> +#include <qpid/dispatch/timer.h> +#include <qpid/dispatch/alloc.h> #include <proton/driver.h> +#include <proton/driver_extras.h> -void nx_server_timer_pending_LH(nx_timer_t *timer); -void nx_server_timer_cancel_LH(nx_timer_t *timer); +void dx_server_timer_pending_LH(dx_timer_t *timer); +void dx_server_timer_cancel_LH(dx_timer_t *timer); typedef enum { @@ -48,48 +49,48 @@ typedef enum { } cxtr_state_t; -struct nx_listener_t { - const nx_server_config_t *config; +struct dx_listener_t { + const dx_server_config_t *config; void *context; pn_listener_t *pn_listener; }; -struct nx_connector_t { +struct dx_connector_t { cxtr_state_t state; - const nx_server_config_t *config; + const dx_server_config_t *config; void *context; - nx_connection_t *ctx; - nx_timer_t *timer; + dx_connection_t *ctx; + dx_timer_t *timer; long delay; }; -struct nx_connection_t { +struct dx_connection_t { conn_state_t state; int owner_thread; int enqueued; pn_connector_t *pn_cxtr; pn_connection_t *pn_conn; - nx_listener_t *listener; - nx_connector_t *connector; + dx_listener_t *listener; + dx_connector_t *connector; void *context; // Copy of context from listener or connector void *user_context; - nx_user_fd_t *ufd; + dx_user_fd_t *ufd; }; -struct nx_user_fd_t { +struct dx_user_fd_t { void *context; int fd; pn_connector_t *pn_conn; }; -ALLOC_DECLARE(nx_listener_t); -ALLOC_DECLARE(nx_connector_t); -ALLOC_DECLARE(nx_connection_t); -ALLOC_DECLARE(nx_user_fd_t); +ALLOC_DECLARE(dx_listener_t); +ALLOC_DECLARE(dx_connector_t); +ALLOC_DECLARE(dx_connection_t); +ALLOC_DECLARE(dx_user_fd_t); #endif diff --git a/qpid/extras/nexus/src/timer.c b/qpid/extras/dispatch/src/timer.c index f6d0071a77..b6b4864e26 100644 --- a/qpid/extras/nexus/src/timer.c +++ b/qpid/extras/dispatch/src/timer.c @@ -19,25 +19,25 @@ #include "timer_private.h" #include "server_private.h" -#include <qpid/nexus/ctools.h> -#include <qpid/nexus/threading.h> -#include <qpid/nexus/alloc.h> +#include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/threading.h> +#include <qpid/dispatch/alloc.h> #include <assert.h> #include <stdio.h> static sys_mutex_t *lock; -static nx_timer_list_t idle_timers; -static nx_timer_list_t scheduled_timers; +static dx_timer_list_t idle_timers; +static dx_timer_list_t scheduled_timers; static long time_base; -ALLOC_DECLARE(nx_timer_t); -ALLOC_DEFINE(nx_timer_t); +ALLOC_DECLARE(dx_timer_t); +ALLOC_DEFINE(dx_timer_t); //========================================================================= // Private static functions //========================================================================= -static void nx_timer_cancel_LH(nx_timer_t *timer) +static void dx_timer_cancel_LH(dx_timer_t *timer) { switch (timer->state) { case TIMER_FREE: @@ -55,7 +55,7 @@ static void nx_timer_cancel_LH(nx_timer_t *timer) break; case TIMER_PENDING: - nx_server_timer_cancel_LH(timer); + dx_server_timer_cancel_LH(timer); break; } @@ -67,9 +67,9 @@ static void nx_timer_cancel_LH(nx_timer_t *timer) // Public Functions from timer.h //========================================================================= -nx_timer_t *nx_timer(nx_timer_cb_t cb, void* context) +dx_timer_t *dx_timer(dx_timer_cb_t cb, void* context) { - nx_timer_t *timer = new_nx_timer_t(); + dx_timer_t *timer = new_dx_timer_t(); if (!timer) return 0; @@ -88,26 +88,26 @@ nx_timer_t *nx_timer(nx_timer_cb_t cb, void* context) } -void nx_timer_free(nx_timer_t *timer) +void dx_timer_free(dx_timer_t *timer) { sys_mutex_lock(lock); - nx_timer_cancel_LH(timer); + dx_timer_cancel_LH(timer); DEQ_REMOVE(idle_timers, timer); sys_mutex_unlock(lock); timer->state = TIMER_FREE; - free_nx_timer_t(timer); + free_dx_timer_t(timer); } -void nx_timer_schedule(nx_timer_t *timer, long duration) +void dx_timer_schedule(dx_timer_t *timer, long duration) { - nx_timer_t *ptr; - nx_timer_t *last; + dx_timer_t *ptr; + dx_timer_t *last; long total_time; sys_mutex_lock(lock); - nx_timer_cancel_LH(timer); // Timer is now on the idle list + dx_timer_cancel_LH(timer); // Timer is now on the idle list assert(timer->state == TIMER_IDLE); DEQ_REMOVE(idle_timers, timer); @@ -118,7 +118,7 @@ void nx_timer_schedule(nx_timer_t *timer, long duration) // if (duration == 0) { timer->state = TIMER_PENDING; - nx_server_timer_pending_LH(timer); + dx_server_timer_pending_LH(timer); sys_mutex_unlock(lock); return; } @@ -162,10 +162,10 @@ void nx_timer_schedule(nx_timer_t *timer, long duration) } -void nx_timer_cancel(nx_timer_t *timer) +void dx_timer_cancel(dx_timer_t *timer) { sys_mutex_lock(lock); - nx_timer_cancel_LH(timer); + dx_timer_cancel_LH(timer); sys_mutex_unlock(lock); } @@ -174,7 +174,7 @@ void nx_timer_cancel(nx_timer_t *timer) // Private Functions from timer_private.h //========================================================================= -void nx_timer_initialize(sys_mutex_t *server_lock) +void dx_timer_initialize(sys_mutex_t *server_lock) { lock = server_lock; DEQ_INIT(idle_timers); @@ -183,25 +183,25 @@ void nx_timer_initialize(sys_mutex_t *server_lock) } -void nx_timer_finalize(void) +void dx_timer_finalize(void) { lock = 0; } -long nx_timer_next_duration_LH(void) +long dx_timer_next_duration_LH(void) { - nx_timer_t *timer = DEQ_HEAD(scheduled_timers); + dx_timer_t *timer = DEQ_HEAD(scheduled_timers); if (timer) return timer->delta_time; return -1; } -void nx_timer_visit_LH(long current_time) +void dx_timer_visit_LH(long current_time) { long delta; - nx_timer_t *timer = DEQ_HEAD(scheduled_timers); + dx_timer_t *timer = DEQ_HEAD(scheduled_timers); if (time_base == 0) { time_base = current_time; @@ -220,7 +220,7 @@ void nx_timer_visit_LH(long current_time) DEQ_REMOVE_HEAD(scheduled_timers); delta -= timer->delta_time; timer->state = TIMER_PENDING; - nx_server_timer_pending_LH(timer); + dx_server_timer_pending_LH(timer); } timer = DEQ_HEAD(scheduled_timers); @@ -228,7 +228,7 @@ void nx_timer_visit_LH(long current_time) } -void nx_timer_idle_LH(nx_timer_t *timer) +void dx_timer_idle_LH(dx_timer_t *timer) { timer->state = TIMER_IDLE; DEQ_INSERT_TAIL(idle_timers, timer); diff --git a/qpid/extras/nexus/src/timer_private.h b/qpid/extras/dispatch/src/timer_private.h index fa9891953f..618297b18e 100644 --- a/qpid/extras/nexus/src/timer_private.h +++ b/qpid/extras/dispatch/src/timer_private.h @@ -19,33 +19,33 @@ * under the License. */ -#include <qpid/nexus/ctools.h> -#include <qpid/nexus/timer.h> -#include <qpid/nexus/threading.h> +#include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/timer.h> +#include <qpid/dispatch/threading.h> typedef enum { TIMER_FREE, TIMER_IDLE, TIMER_SCHEDULED, TIMER_PENDING -} nx_timer_state_t; +} dx_timer_state_t; -struct nx_timer_t { - DEQ_LINKS(nx_timer_t); - nx_timer_cb_t handler; +struct dx_timer_t { + DEQ_LINKS(dx_timer_t); + dx_timer_cb_t handler; void *context; long delta_time; - nx_timer_state_t state; + dx_timer_state_t state; }; -DEQ_DECLARE(nx_timer_t, nx_timer_list_t); +DEQ_DECLARE(dx_timer_t, dx_timer_list_t); -void nx_timer_initialize(sys_mutex_t *server_lock); -void nx_timer_finalize(void); -long nx_timer_next_duration_LH(void); -void nx_timer_visit_LH(long current_time); -void nx_timer_idle_LH(nx_timer_t *timer); +void dx_timer_initialize(sys_mutex_t *server_lock); +void dx_timer_finalize(void); +long dx_timer_next_duration_LH(void); +void dx_timer_visit_LH(long current_time); +void dx_timer_idle_LH(dx_timer_t *timer); #endif diff --git a/qpid/extras/nexus/src/work_queue.c b/qpid/extras/dispatch/src/work_queue.c index b9555b3cb2..4b3c5d7fa5 100644 --- a/qpid/extras/nexus/src/work_queue.c +++ b/qpid/extras/dispatch/src/work_queue.c @@ -17,7 +17,7 @@ * under the License. */ -#include <qpid/nexus/ctools.h> +#include <qpid/dispatch/ctools.h> #include "work_queue.h" #include <string.h> #include <stdio.h> diff --git a/qpid/extras/nexus/src/work_queue.h b/qpid/extras/dispatch/src/work_queue.h index 597a484a9c..597a484a9c 100644 --- a/qpid/extras/nexus/src/work_queue.h +++ b/qpid/extras/dispatch/src/work_queue.h diff --git a/qpid/extras/nexus/tests/CMakeLists.txt b/qpid/extras/dispatch/tests/CMakeLists.txt index 383c3c9919..10bf1eb43a 100644 --- a/qpid/extras/nexus/tests/CMakeLists.txt +++ b/qpid/extras/dispatch/tests/CMakeLists.txt @@ -30,5 +30,5 @@ set(test_SOURCES ) add_executable(run_tests ${test_SOURCES}) -target_link_libraries(run_tests qpid-nexus) +target_link_libraries(run_tests qpid-dispatch) diff --git a/qpid/extras/nexus/tests/alloc_test.c b/qpid/extras/dispatch/tests/alloc_test.c index c8ea07e1cc..2406048209 100644 --- a/qpid/extras/nexus/tests/alloc_test.c +++ b/qpid/extras/dispatch/tests/alloc_test.c @@ -27,13 +27,13 @@ typedef struct { int B; } object_t; -nx_alloc_config_t config = {3, 7, 10}; +dx_alloc_config_t config = {3, 7, 10}; ALLOC_DECLARE(object_t); ALLOC_DEFINE_CONFIG(object_t, sizeof(object_t), 0, &config); -static char* check_stats(nx_alloc_stats_t *stats, uint64_t ah, uint64_t fh, uint64_t ht, uint64_t rt, uint64_t rg) +static char* check_stats(dx_alloc_stats_t *stats, uint64_t ah, uint64_t fh, uint64_t ht, uint64_t rt, uint64_t rg) { if (stats->total_alloc_from_heap != ah) return "Incorrect alloc-from-heap"; if (stats->total_free_to_heap != fh) return "Incorrect free-to-heap"; @@ -48,7 +48,7 @@ static char* test_alloc_basic(void *context) { object_t *obj[50]; int idx; - nx_alloc_stats_t *stats; + dx_alloc_stats_t *stats; char *error; for (idx = 0; idx < 20; idx++) @@ -77,7 +77,7 @@ static char* test_alloc_basic(void *context) int alloc_tests(void) { int result = 0; - nx_alloc_initialize(); + dx_alloc_initialize(); TEST_CASE(test_alloc_basic, 0); diff --git a/qpid/extras/nexus/tests/message_test.c b/qpid/extras/dispatch/tests/message_test.c index 1d69d30bc0..590b7f6ed7 100644 --- a/qpid/extras/nexus/tests/message_test.c +++ b/qpid/extras/dispatch/tests/message_test.c @@ -21,28 +21,28 @@ #include <stdio.h> #include <string.h> #include "message_private.h" -#include <qpid/nexus/iterator.h> +#include <qpid/dispatch/iterator.h> #include <proton/message.h> static char* test_send_to_messenger(void *context) { - nx_message_t *msg = nx_allocate_message(); - nx_message_content_t *content = MSG_CONTENT(msg); + dx_message_t *msg = dx_allocate_message(); + dx_message_content_t *content = MSG_CONTENT(msg); - nx_message_compose_1(msg, "test_addr_0", 0); - nx_buffer_t *buf = DEQ_HEAD(content->buffers); + dx_message_compose_1(msg, "test_addr_0", 0); + dx_buffer_t *buf = DEQ_HEAD(content->buffers); if (buf == 0) return "Expected a buffer in the test message"; pn_message_t *pn_msg = pn_message(); - int result = pn_message_decode(pn_msg, (const char*) nx_buffer_base(buf), nx_buffer_size(buf)); + int result = pn_message_decode(pn_msg, (const char*) dx_buffer_base(buf), dx_buffer_size(buf)); if (result != 0) return "Error in pn_message_decode"; if (strcmp(pn_message_get_address(pn_msg), "test_addr_0") != 0) return "Address mismatch in received message"; pn_message_free(pn_msg); - nx_free_message(msg); + dx_free_message(msg); return 0; } @@ -53,27 +53,27 @@ static char* test_receive_from_messenger(void *context) pn_message_t *pn_msg = pn_message(); pn_message_set_address(pn_msg, "test_addr_1"); - nx_buffer_t *buf = nx_allocate_buffer(); - size_t size = nx_buffer_capacity(buf); - int result = pn_message_encode(pn_msg, (char*) nx_buffer_cursor(buf), &size); + dx_buffer_t *buf = dx_allocate_buffer(); + size_t size = dx_buffer_capacity(buf); + int result = pn_message_encode(pn_msg, (char*) dx_buffer_cursor(buf), &size); if (result != 0) return "Error in pn_message_encode"; - nx_buffer_insert(buf, size); + dx_buffer_insert(buf, size); - nx_message_t *msg = nx_allocate_message(); - nx_message_content_t *content = MSG_CONTENT(msg); + dx_message_t *msg = dx_allocate_message(); + dx_message_content_t *content = MSG_CONTENT(msg); DEQ_INSERT_TAIL(content->buffers, buf); - int valid = nx_message_check(msg, NX_DEPTH_ALL); - if (!valid) return "nx_message_check returns 'invalid'"; + int valid = dx_message_check(msg, DX_DEPTH_ALL); + if (!valid) return "dx_message_check returns 'invalid'"; - nx_field_iterator_t *iter = nx_message_field_iterator(msg, NX_FIELD_TO); + dx_field_iterator_t *iter = dx_message_field_iterator(msg, DX_FIELD_TO); if (iter == 0) return "Expected an iterator for the 'to' field"; - if (!nx_field_iterator_equal(iter, (unsigned char*) "test_addr_1")) + if (!dx_field_iterator_equal(iter, (unsigned char*) "test_addr_1")) return "Mismatched 'to' field contents"; pn_message_free(pn_msg); - nx_free_message(msg); + dx_free_message(msg); return 0; } @@ -84,23 +84,23 @@ static char* test_insufficient_check_depth(void *context) pn_message_t *pn_msg = pn_message(); pn_message_set_address(pn_msg, "test_addr_2"); - nx_buffer_t *buf = nx_allocate_buffer(); - size_t size = nx_buffer_capacity(buf); - int result = pn_message_encode(pn_msg, (char*) nx_buffer_cursor(buf), &size); + dx_buffer_t *buf = dx_allocate_buffer(); + size_t size = dx_buffer_capacity(buf); + int result = pn_message_encode(pn_msg, (char*) dx_buffer_cursor(buf), &size); if (result != 0) return "Error in pn_message_encode"; - nx_buffer_insert(buf, size); + dx_buffer_insert(buf, size); - nx_message_t *msg = nx_allocate_message(); - nx_message_content_t *content = MSG_CONTENT(msg); + dx_message_t *msg = dx_allocate_message(); + dx_message_content_t *content = MSG_CONTENT(msg); DEQ_INSERT_TAIL(content->buffers, buf); - int valid = nx_message_check(msg, NX_DEPTH_DELIVERY_ANNOTATIONS); - if (!valid) return "nx_message_check returns 'invalid'"; + int valid = dx_message_check(msg, DX_DEPTH_DELIVERY_ANNOTATIONS); + if (!valid) return "dx_message_check returns 'invalid'"; - nx_field_iterator_t *iter = nx_message_field_iterator(msg, NX_FIELD_TO); + dx_field_iterator_t *iter = dx_message_field_iterator(msg, DX_FIELD_TO); if (iter) return "Expected no iterator for the 'to' field"; - nx_free_message(msg); + dx_free_message(msg); return 0; } diff --git a/qpid/extras/nexus/tests/run_tests.c b/qpid/extras/dispatch/tests/run_tests.c index a677c04577..a677c04577 100644 --- a/qpid/extras/nexus/tests/run_tests.c +++ b/qpid/extras/dispatch/tests/run_tests.c diff --git a/qpid/extras/nexus/tests/server_test.c b/qpid/extras/dispatch/tests/server_test.c index 29cd70eeb3..adeab62af9 100644 --- a/qpid/extras/nexus/tests/server_test.c +++ b/qpid/extras/dispatch/tests/server_test.c @@ -23,12 +23,12 @@ #include <fcntl.h> #include <errno.h> #include <assert.h> -#include <qpid/nexus/timer.h> +#include <qpid/dispatch/timer.h> #include "test_case.h" -#include <qpid/nexus/server.h> -#include <qpid/nexus/user_fd.h> -#include <qpid/nexus/threading.h> -#include <qpid/nexus/log.h> +#include <qpid/dispatch/server.h> +#include <qpid/dispatch/user_fd.h> +#include <qpid/dispatch/threading.h> +#include <qpid/dispatch/log.h> #define THREAD_COUNT 4 #define OCTET_COUNT 100 @@ -43,8 +43,8 @@ static char stored_error[512]; static int write_count; static int read_count; static int fd[2]; -static nx_user_fd_t *ufd_write; -static nx_user_fd_t *ufd_read; +static dx_user_fd_t *ufd_write; +static dx_user_fd_t *ufd_read; static void thread_start(void *context, int thread_id) @@ -62,18 +62,18 @@ static void thread_start(void *context, int thread_id) threads_seen[thread_id]++; if (call_count == THREAD_COUNT) - nx_server_stop(); + dx_server_stop(); sys_mutex_unlock(test_lock); } -static int conn_handler(void *context, nx_conn_event_t event, nx_connection_t *conn) +static int conn_handler(void *context, dx_conn_event_t event, dx_connection_t *conn) { return 0; } -static void ufd_handler(void *context, nx_user_fd_t *ufd) +static void ufd_handler(void *context, dx_user_fd_t *ufd) { long dir = (long) context; char buffer; @@ -84,31 +84,31 @@ static void ufd_handler(void *context, nx_user_fd_t *ufd) if (dir == 0) { // READ in_read++; assert(in_read == 1); - if (!nx_user_fd_is_readable(ufd_read)) { + if (!dx_user_fd_is_readable(ufd_read)) { sprintf(stored_error, "Expected Readable"); - nx_server_stop(); + dx_server_stop(); } else { len = read(fd[0], &buffer, 1); if (len == 1) { read_count++; if (read_count == OCTET_COUNT) - nx_server_stop(); + dx_server_stop(); } - nx_user_fd_activate_read(ufd_read); + dx_user_fd_activate_read(ufd_read); } in_read--; } else { // WRITE in_write++; assert(in_write == 1); - if (!nx_user_fd_is_writeable(ufd_write)) { + if (!dx_user_fd_is_writeable(ufd_write)) { sprintf(stored_error, "Expected Writable"); - nx_server_stop(); + dx_server_stop(); } else { write(fd[1], "X", 1); write_count++; if (write_count < OCTET_COUNT) - nx_user_fd_activate_write(ufd_write); + dx_user_fd_activate_write(ufd_write); } in_write--; } @@ -117,7 +117,7 @@ static void ufd_handler(void *context, nx_user_fd_t *ufd) static void fd_test_start(void *context) { - nx_user_fd_activate_read(ufd_read); + dx_user_fd_activate_read(ufd_read); } @@ -125,7 +125,7 @@ static char* test_start_handler(void *context) { int i; - nx_server_initialize(THREAD_COUNT); + dx_server_initialize(THREAD_COUNT); expected_context = (void*) 0x00112233; stored_error[0] = 0x0; @@ -133,10 +133,10 @@ static char* test_start_handler(void *context) for (i = 0; i < THREAD_COUNT; i++) threads_seen[i] = 0; - nx_server_set_conn_handler(conn_handler); - nx_server_set_start_handler(thread_start, expected_context); - nx_server_run(); - nx_server_finalize(); + dx_server_set_conn_handler(conn_handler); + dx_server_set_start_handler(thread_start, expected_context); + dx_server_run(); + dx_server_finalize(); if (stored_error[0]) return stored_error; if (call_count != THREAD_COUNT) return "Incorrect number of thread-start callbacks"; @@ -150,24 +150,24 @@ static char* test_start_handler(void *context) static char* test_user_fd(void *context) { int res; - nx_timer_t *timer; + dx_timer_t *timer; - nx_server_initialize(THREAD_COUNT); - nx_server_set_conn_handler(conn_handler); - nx_server_set_user_fd_handler(ufd_handler); - timer = nx_timer(fd_test_start, 0); - nx_timer_schedule(timer, 0); + dx_server_initialize(THREAD_COUNT); + dx_server_set_conn_handler(conn_handler); + dx_server_set_user_fd_handler(ufd_handler); + timer = dx_timer(fd_test_start, 0); + dx_timer_schedule(timer, 0); stored_error[0] = 0x0; res = pipe2(fd, O_NONBLOCK); if (res != 0) return "Error creating pipe2"; - ufd_write = nx_user_fd(fd[1], (void*) 1); - ufd_read = nx_user_fd(fd[0], (void*) 0); + ufd_write = dx_user_fd(fd[1], (void*) 1); + ufd_read = dx_user_fd(fd[0], (void*) 0); - nx_server_run(); - nx_timer_free(timer); - nx_server_finalize(); + dx_server_run(); + dx_timer_free(timer); + dx_server_finalize(); close(fd[0]); close(fd[1]); @@ -184,7 +184,7 @@ int server_tests(void) { int result = 0; test_lock = sys_mutex(); - nx_log_set_mask(LOG_NONE); + dx_log_set_mask(LOG_NONE); TEST_CASE(test_start_handler, 0); TEST_CASE(test_user_fd, 0); diff --git a/qpid/extras/nexus/tests/test_case.h b/qpid/extras/dispatch/tests/test_case.h index 6e36b440a5..6e36b440a5 100644 --- a/qpid/extras/nexus/tests/test_case.h +++ b/qpid/extras/dispatch/tests/test_case.h diff --git a/qpid/extras/nexus/tests/timer_test.c b/qpid/extras/dispatch/tests/timer_test.c index 09be21f4b6..3d199f2aa2 100644 --- a/qpid/extras/nexus/tests/timer_test.c +++ b/qpid/extras/dispatch/tests/timer_test.c @@ -18,27 +18,27 @@ */ #include <stdio.h> -#include <qpid/nexus/timer.h> +#include <qpid/dispatch/timer.h> #include "alloc_private.h" #include "timer_private.h" #include "test_case.h" -#include <qpid/nexus/threading.h> +#include <qpid/dispatch/threading.h> static unsigned long fire_mask; -static nx_timer_list_t pending_timers; +static dx_timer_list_t pending_timers; static sys_mutex_t *lock; static long time; -static nx_timer_t *timers[16]; +static dx_timer_t *timers[16]; -void nx_server_timer_pending_LH(nx_timer_t *timer) +void dx_server_timer_pending_LH(dx_timer_t *timer) { DEQ_INSERT_TAIL(pending_timers, timer); } -void nx_server_timer_cancel_LH(nx_timer_t *timer) +void dx_server_timer_cancel_LH(dx_timer_t *timer) { if (timer->state == TIMER_PENDING) DEQ_REMOVE(pending_timers, timer); @@ -49,10 +49,10 @@ static int fire_head() { sys_mutex_lock(lock); int result = DEQ_SIZE(pending_timers); - nx_timer_t *timer = DEQ_HEAD(pending_timers); + dx_timer_t *timer = DEQ_HEAD(pending_timers); if (timer) { DEQ_REMOVE_HEAD(pending_timers); - nx_timer_idle_LH(timer); + dx_timer_idle_LH(timer); fire_mask |= (unsigned long) timer->context; } sys_mutex_unlock(lock); @@ -65,11 +65,11 @@ static char* test_quiet(void *context) fire_mask = 0; sys_mutex_lock(lock); - nx_timer_visit_LH(time++); - nx_timer_visit_LH(time++); - nx_timer_visit_LH(time++); - nx_timer_visit_LH(time++); - nx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); sys_mutex_unlock(lock); while(fire_head()); @@ -84,7 +84,7 @@ static char* test_immediate(void *context) while(fire_head()); fire_mask = 0; - nx_timer_schedule(timers[0], 0); + dx_timer_schedule(timers[0], 0); if (fire_mask != 0) return "Premature firing"; if (fire_head() > 1) return "Too many firings"; @@ -99,17 +99,17 @@ static char* test_immediate_plus_delayed(void *context) while(fire_head()); fire_mask = 0; - nx_timer_schedule(timers[0], 0); - nx_timer_schedule(timers[1], 5); + dx_timer_schedule(timers[0], 0); + dx_timer_schedule(timers[1], 5); if (fire_mask != 0) return "Premature firing"; if (fire_head() > 1) return "Too many firings"; if (fire_mask != 1) return "Incorrect fire mask 1"; sys_mutex_lock(lock); - nx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); time += 8; - nx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); sys_mutex_unlock(lock); if (fire_head() < 1) return "Delayed Failed to fire"; @@ -124,23 +124,23 @@ static char* test_single(void *context) while(fire_head()); fire_mask = 0; - nx_timer_schedule(timers[0], 2); + dx_timer_schedule(timers[0], 2); if (fire_head() > 0) return "Premature firing 1"; sys_mutex_lock(lock); - nx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); sys_mutex_unlock(lock); if (fire_head() > 0) return "Premature firing 2"; sys_mutex_lock(lock); - nx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); sys_mutex_unlock(lock); if (fire_head() < 1) return "Failed to fire"; sys_mutex_lock(lock); - nx_timer_visit_LH(time++); - nx_timer_visit_LH(time++); - nx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); sys_mutex_unlock(lock); if (fire_head() != 0) return "Spurious fires"; @@ -156,12 +156,12 @@ static char* test_two_inorder(void *context) while(fire_head()); fire_mask = 0; - nx_timer_schedule(timers[0], 2); - nx_timer_schedule(timers[1], 4); + dx_timer_schedule(timers[0], 2); + dx_timer_schedule(timers[1], 4); sys_mutex_lock(lock); - nx_timer_visit_LH(time++); - nx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); sys_mutex_unlock(lock); int count = fire_head(); if (count < 1) return "First failed to fire"; @@ -169,8 +169,8 @@ static char* test_two_inorder(void *context) if (fire_mask != 1) return "Incorrect fire mask 1"; sys_mutex_lock(lock); - nx_timer_visit_LH(time++); - nx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); sys_mutex_unlock(lock); if (fire_head() < 1) return "Second failed to fire"; if (fire_mask != 3) return "Incorrect fire mask 3"; @@ -184,12 +184,12 @@ static char* test_two_reverse(void *context) while(fire_head()); fire_mask = 0; - nx_timer_schedule(timers[0], 4); - nx_timer_schedule(timers[1], 2); + dx_timer_schedule(timers[0], 4); + dx_timer_schedule(timers[1], 2); sys_mutex_lock(lock); - nx_timer_visit_LH(time++); - nx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); sys_mutex_unlock(lock); int count = fire_head(); if (count < 1) return "First failed to fire"; @@ -197,8 +197,8 @@ static char* test_two_reverse(void *context) if (fire_mask != 2) return "Incorrect fire mask 2"; sys_mutex_lock(lock); - nx_timer_visit_LH(time++); - nx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); sys_mutex_unlock(lock); if (fire_head() < 1) return "Second failed to fire"; if (fire_mask != 3) return "Incorrect fire mask 3"; @@ -212,12 +212,12 @@ static char* test_two_duplicate(void *context) while(fire_head()); fire_mask = 0; - nx_timer_schedule(timers[0], 2); - nx_timer_schedule(timers[1], 2); + dx_timer_schedule(timers[0], 2); + dx_timer_schedule(timers[1], 2); sys_mutex_lock(lock); - nx_timer_visit_LH(time++); - nx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); sys_mutex_unlock(lock); int count = fire_head(); if (count != 2) return "Expected two firings"; @@ -225,8 +225,8 @@ static char* test_two_duplicate(void *context) if (fire_mask != 3) return "Incorrect fire mask 3"; sys_mutex_lock(lock); - nx_timer_visit_LH(time++); - nx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); sys_mutex_unlock(lock); if (fire_head() > 0) return "Spurious timer fires"; @@ -241,24 +241,24 @@ static char* test_separated(void *context) while(fire_head()); fire_mask = 0; - nx_timer_schedule(timers[0], 2); - nx_timer_schedule(timers[1], 4); + dx_timer_schedule(timers[0], 2); + dx_timer_schedule(timers[1], 4); sys_mutex_lock(lock); - nx_timer_visit_LH(time++); - nx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); sys_mutex_unlock(lock); count = fire_head(); if (count < 1) return "First failed to fire"; if (count > 1) return "Second fired prematurely"; if (fire_mask != 1) return "Incorrect fire mask 1"; - nx_timer_schedule(timers[2], 2); - nx_timer_schedule(timers[3], 4); + dx_timer_schedule(timers[2], 2); + dx_timer_schedule(timers[3], 4); sys_mutex_lock(lock); - nx_timer_visit_LH(time++); - nx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); sys_mutex_unlock(lock); count = fire_head(); fire_head(); @@ -267,20 +267,20 @@ static char* test_separated(void *context) if (fire_mask != 7) return "Incorrect fire mask 7"; sys_mutex_lock(lock); - nx_timer_visit_LH(time++); - nx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); sys_mutex_unlock(lock); count = fire_head(); if (count < 1) return "Fourth failed to fire"; if (fire_mask != 15) return "Incorrect fire mask 15"; sys_mutex_lock(lock); - nx_timer_visit_LH(time++); - nx_timer_visit_LH(time++); - nx_timer_visit_LH(time++); - nx_timer_visit_LH(time++); - nx_timer_visit_LH(time++); - nx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); sys_mutex_unlock(lock); count = fire_head(); if (count > 0) return "Spurious fire"; @@ -322,10 +322,10 @@ static char* test_big(void *context) int i; for (i = 0; i < 16; i++) - nx_timer_schedule(timers[i], durations[i]); + dx_timer_schedule(timers[i], durations[i]); for (i = 0; i < 18; i++) { sys_mutex_lock(lock); - nx_timer_visit_LH(time++); + dx_timer_visit_LH(time++); sys_mutex_unlock(lock); while(fire_head()); if (fire_mask != masks[i]) { @@ -342,30 +342,30 @@ static char* test_big(void *context) int timer_tests(void) { int result = 0; - nx_alloc_initialize(); + dx_alloc_initialize(); fire_mask = 0; DEQ_INIT(pending_timers); lock = sys_mutex(); - nx_timer_initialize(lock); + dx_timer_initialize(lock); time = 1; - timers[0] = nx_timer(0, (void*) 0x00000001); - timers[1] = nx_timer(0, (void*) 0x00000002); - timers[2] = nx_timer(0, (void*) 0x00000004); - timers[3] = nx_timer(0, (void*) 0x00000008); - timers[4] = nx_timer(0, (void*) 0x00000010); - timers[5] = nx_timer(0, (void*) 0x00000020); - timers[6] = nx_timer(0, (void*) 0x00000040); - timers[7] = nx_timer(0, (void*) 0x00000080); - timers[8] = nx_timer(0, (void*) 0x00000100); - timers[9] = nx_timer(0, (void*) 0x00000200); - timers[10] = nx_timer(0, (void*) 0x00000400); - timers[11] = nx_timer(0, (void*) 0x00000800); - timers[12] = nx_timer(0, (void*) 0x00001000); - timers[13] = nx_timer(0, (void*) 0x00002000); - timers[14] = nx_timer(0, (void*) 0x00004000); - timers[15] = nx_timer(0, (void*) 0x00008000); + timers[0] = dx_timer(0, (void*) 0x00000001); + timers[1] = dx_timer(0, (void*) 0x00000002); + timers[2] = dx_timer(0, (void*) 0x00000004); + timers[3] = dx_timer(0, (void*) 0x00000008); + timers[4] = dx_timer(0, (void*) 0x00000010); + timers[5] = dx_timer(0, (void*) 0x00000020); + timers[6] = dx_timer(0, (void*) 0x00000040); + timers[7] = dx_timer(0, (void*) 0x00000080); + timers[8] = dx_timer(0, (void*) 0x00000100); + timers[9] = dx_timer(0, (void*) 0x00000200); + timers[10] = dx_timer(0, (void*) 0x00000400); + timers[11] = dx_timer(0, (void*) 0x00000800); + timers[12] = dx_timer(0, (void*) 0x00001000); + timers[13] = dx_timer(0, (void*) 0x00002000); + timers[14] = dx_timer(0, (void*) 0x00004000); + timers[15] = dx_timer(0, (void*) 0x00008000); TEST_CASE(test_quiet, 0); TEST_CASE(test_immediate, 0); @@ -379,9 +379,9 @@ int timer_tests(void) int i; for (i = 0; i < 16; i++) - nx_timer_free(timers[i]); + dx_timer_free(timers[i]); - nx_timer_finalize(); + dx_timer_finalize(); return result; } diff --git a/qpid/extras/nexus/tests/tool_test.c b/qpid/extras/dispatch/tests/tool_test.c index 0848b51ec7..7923ee3381 100644 --- a/qpid/extras/nexus/tests/tool_test.c +++ b/qpid/extras/dispatch/tests/tool_test.c @@ -20,7 +20,7 @@ #include "test_case.h" #include <stdio.h> #include <string.h> -#include <qpid/nexus/ctools.h> +#include <qpid/dispatch/ctools.h> typedef struct item_t { DEQ_LINKS(struct item_t); diff --git a/qpid/extras/nexus/include/qpid/nexus/container.h b/qpid/extras/nexus/include/qpid/nexus/container.h deleted file mode 100644 index 056c9a5b5e..0000000000 --- a/qpid/extras/nexus/include/qpid/nexus/container.h +++ /dev/null @@ -1,129 +0,0 @@ -#ifndef __container_h__ -#define __container_h__ 1 -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <proton/engine.h> -#include <qpid/nexus/server.h> -#include <qpid/nexus/alloc.h> -#include <qpid/nexus/ctools.h> - -typedef uint8_t nx_dist_mode_t; -#define NX_DIST_COPY 0x01 -#define NX_DIST_MOVE 0x02 -#define NX_DIST_BOTH 0x03 - -/** - * Node Lifetime Policy (see AMQP 3.5.9) - */ -typedef enum { - NX_LIFE_PERMANENT, - NX_LIFE_DELETE_CLOSE, - NX_LIFE_DELETE_NO_LINKS, - NX_LIFE_DELETE_NO_MESSAGES, - NX_LIFE_DELETE_NO_LINKS_MESSAGES -} nx_lifetime_policy_t; - - -/** - * Link Direction - */ -typedef enum { - NX_INCOMING, - NX_OUTGOING -} nx_direction_t; - - -typedef struct nx_node_t nx_node_t; -typedef struct nx_link_t nx_link_t; - -typedef void (*nx_container_delivery_handler_t) (void *node_context, nx_link_t *link, pn_delivery_t *delivery); -typedef int (*nx_container_link_handler_t) (void *node_context, nx_link_t *link); -typedef int (*nx_container_link_detach_handler_t) (void *node_context, nx_link_t *link, int closed); -typedef void (*nx_container_node_handler_t) (void *type_context, nx_node_t *node); -typedef void (*nx_container_conn_handler_t) (void *type_context, nx_connection_t *conn); - -typedef struct { - char *type_name; - void *type_context; - int allow_dynamic_creation; - - // - // Node-Instance Handlers - // - nx_container_delivery_handler_t rx_handler; - nx_container_delivery_handler_t tx_handler; - nx_container_delivery_handler_t disp_handler; - nx_container_link_handler_t incoming_handler; - nx_container_link_handler_t outgoing_handler; - nx_container_link_handler_t writable_handler; - nx_container_link_detach_handler_t link_detach_handler; - - // - // Node-Type Handlers - // - nx_container_node_handler_t node_created_handler; - nx_container_node_handler_t node_destroyed_handler; - nx_container_conn_handler_t inbound_conn_open_handler; - nx_container_conn_handler_t outbound_conn_open_handler; -} nx_node_type_t; - -void nx_container_initialize(void); -void nx_container_finalize(void); - -int nx_container_register_node_type(const nx_node_type_t *nt); - -void nx_container_set_default_node_type(const nx_node_type_t *nt, - void *node_context, - nx_dist_mode_t supported_dist); - -nx_node_t *nx_container_create_node(const nx_node_type_t *nt, - const char *name, - void *node_context, - nx_dist_mode_t supported_dist, - nx_lifetime_policy_t life_policy); -void nx_container_destroy_node(nx_node_t *node); - -void nx_container_node_set_context(nx_node_t *node, void *node_context); -nx_dist_mode_t nx_container_node_get_dist_modes(const nx_node_t *node); -nx_lifetime_policy_t nx_container_node_get_life_policy(const nx_node_t *node); - -nx_link_t *nx_link(nx_node_t *node, nx_connection_t *conn, nx_direction_t dir, const char *name); -void nx_link_set_context(nx_link_t *link, void *link_context); -void *nx_link_get_context(nx_link_t *link); -pn_link_t *nx_link_pn(nx_link_t *link); -pn_terminus_t *nx_link_source(nx_link_t *link); -pn_terminus_t *nx_link_target(nx_link_t *link); -pn_terminus_t *nx_link_remote_source(nx_link_t *link); -pn_terminus_t *nx_link_remote_target(nx_link_t *link); -void nx_link_activate(nx_link_t *link); -void nx_link_close(nx_link_t *link); - - -typedef struct nx_link_item_t nx_link_item_t; - -struct nx_link_item_t { - DEQ_LINKS(nx_link_item_t); - nx_link_t *link; -}; - -ALLOC_DECLARE(nx_link_item_t); -DEQ_DECLARE(nx_link_item_t, nx_link_list_t); - -#endif diff --git a/qpid/extras/nexus/include/qpid/nexus/message.h b/qpid/extras/nexus/include/qpid/nexus/message.h deleted file mode 100644 index 3885b09576..0000000000 --- a/qpid/extras/nexus/include/qpid/nexus/message.h +++ /dev/null @@ -1,165 +0,0 @@ -#ifndef __nexus_message_h__ -#define __nexus_message_h__ 1 -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <proton/engine.h> -#include <qpid/nexus/ctools.h> -#include <qpid/nexus/alloc.h> -#include <qpid/nexus/iterator.h> -#include <qpid/nexus/buffer.h> -#include <qpid/nexus/iovec.h> - -// Callback for status change (confirmed persistent, loaded-in-memory, etc.) - -typedef struct nx_message_t nx_message_t; - -DEQ_DECLARE(nx_message_t, nx_message_list_t); - -struct nx_message_t { - DEQ_LINKS(nx_message_t); - // Private members not listed here. -}; - -typedef enum { - NX_DEPTH_NONE, - NX_DEPTH_HEADER, - NX_DEPTH_DELIVERY_ANNOTATIONS, - NX_DEPTH_MESSAGE_ANNOTATIONS, - NX_DEPTH_PROPERTIES, - NX_DEPTH_APPLICATION_PROPERTIES, - NX_DEPTH_BODY, - NX_DEPTH_ALL -} nx_message_depth_t; - - -typedef enum { - // - // Message Sections - // - NX_FIELD_HEADER, - NX_FIELD_DELIVERY_ANNOTATION, - NX_FIELD_MESSAGE_ANNOTATION, - NX_FIELD_PROPERTIES, - NX_FIELD_APPLICATION_PROPERTIES, - NX_FIELD_BODY, - NX_FIELD_FOOTER, - - // - // Fields of the Header Section - // - NX_FIELD_DURABLE, - NX_FIELD_PRIORITY, - NX_FIELD_TTL, - NX_FIELD_FIRST_ACQUIRER, - NX_FIELD_DELIVERY_COUNT, - - // - // Fields of the Properties Section - // - NX_FIELD_MESSAGE_ID, - NX_FIELD_USER_ID, - NX_FIELD_TO, - NX_FIELD_SUBJECT, - NX_FIELD_REPLY_TO, - NX_FIELD_CORRELATION_ID, - NX_FIELD_CONTENT_TYPE, - NX_FIELD_CONTENT_ENCODING, - NX_FIELD_ABSOLUTE_EXPIRY_TIME, - NX_FIELD_CREATION_TIME, - NX_FIELD_GROUP_ID, - NX_FIELD_GROUP_SEQUENCE, - NX_FIELD_REPLY_TO_GROUP_ID -} nx_message_field_t; - -// -// Functions for allocation -// -nx_message_t *nx_allocate_message(void); -void nx_free_message(nx_message_t *qm); -nx_message_t *nx_message_copy(nx_message_t *qm); -int nx_message_persistent(nx_message_t *qm); -int nx_message_in_memory(nx_message_t *qm); - -void nx_message_set_out_delivery(nx_message_t *msg, pn_delivery_t *delivery); -pn_delivery_t *nx_message_out_delivery(nx_message_t *msg); -void nx_message_set_in_delivery(nx_message_t *msg, pn_delivery_t *delivery); -pn_delivery_t *nx_message_in_delivery(nx_message_t *msg); - -// -// Functions for received messages -// -nx_message_t *nx_message_receive(pn_delivery_t *delivery); -void nx_message_send(nx_message_t *msg, pn_link_t *link); - -int nx_message_check(nx_message_t *msg, nx_message_depth_t depth); -nx_field_iterator_t *nx_message_field_iterator(nx_message_t *msg, nx_message_field_t field); -nx_iovec_t *nx_message_field_iovec(nx_message_t *msg, nx_message_field_t field); - -pn_delivery_t *nx_message_inbound_delivery(nx_message_t *qm); - -// -// Functions for composed messages -// - -// Convenience Functions -void nx_message_compose_1(nx_message_t *msg, const char *to, nx_buffer_list_t *buffers); -void nx_message_copy_header(nx_message_t *msg); // Copy received header into send-header (prior to adding annotations) -void nx_message_copy_message_annotations(nx_message_t *msg); - -// Raw Functions -void nx_message_begin_header(nx_message_t *msg); -void nx_message_end_header(nx_message_t *msg); - -void nx_message_begin_delivery_annotations(nx_message_t *msg); -void nx_message_end_delivery_annotations(nx_message_t *msg); - -void nx_message_begin_message_annotations(nx_message_t *msg); -void nx_message_end_message_annotations(nx_message_t *msg); - -void nx_message_begin_message_properties(nx_message_t *msg); -void nx_message_end_message_properties(nx_message_t *msg); - -void nx_message_begin_application_properties(nx_message_t *msg); -void nx_message_end_application_properties(nx_message_t *msg); - -void nx_message_append_body_data(nx_message_t *msg, nx_buffer_list_t *buffers); - -void nx_message_begin_body_sequence(nx_message_t *msg); -void nx_message_end_body_sequence(nx_message_t *msg); - -void nx_message_begin_footer(nx_message_t *msg); -void nx_message_end_footer(nx_message_t *msg); - -void nx_message_insert_null(nx_message_t *msg); -void nx_message_insert_boolean(nx_message_t *msg, int value); -void nx_message_insert_ubyte(nx_message_t *msg, uint8_t value); -void nx_message_insert_uint(nx_message_t *msg, uint32_t value); -void nx_message_insert_ulong(nx_message_t *msg, uint64_t value); -void nx_message_insert_binary(nx_message_t *msg, const uint8_t *start, size_t len); -void nx_message_insert_string(nx_message_t *msg, const char *start); -void nx_message_insert_uuid(nx_message_t *msg, const uint8_t *value); -void nx_message_insert_symbol(nx_message_t *msg, const char *start, size_t len); -void nx_message_insert_timestamp(nx_message_t *msg, uint64_t value); -void nx_message_begin_list(nx_message_t* msg); -void nx_message_end_list(nx_message_t* msg); -void nx_message_begin_map(nx_message_t* msg); -void nx_message_end_map(nx_message_t* msg); - -#endif diff --git a/qpid/extras/nexus/site/images/gwarch.dia b/qpid/extras/nexus/site/images/gwarch.dia Binary files differdeleted file mode 100644 index fd7eef97a4..0000000000 --- a/qpid/extras/nexus/site/images/gwarch.dia +++ /dev/null diff --git a/qpid/extras/nexus/site/images/gwarch.png b/qpid/extras/nexus/site/images/gwarch.png Binary files differdeleted file mode 100644 index 923baadf9f..0000000000 --- a/qpid/extras/nexus/site/images/gwarch.png +++ /dev/null |
