diff options
Diffstat (limited to 'cpp/src/qmf')
-rw-r--r-- | cpp/src/qmf/Agent.cpp | 29 | ||||
-rw-r--r-- | cpp/src/qmf/AgentImpl.h | 1 | ||||
-rw-r--r-- | cpp/src/qmf/AgentSession.cpp | 229 | ||||
-rw-r--r-- | cpp/src/qmf/AgentSessionImpl.h | 175 | ||||
-rw-r--r-- | cpp/src/qmf/ConsoleSession.cpp | 125 | ||||
-rw-r--r-- | cpp/src/qmf/ConsoleSessionImpl.h | 22 | ||||
-rw-r--r-- | cpp/src/qmf/DataAddr.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qmf/DataAddrImpl.h | 4 | ||||
-rw-r--r-- | cpp/src/qmf/EventNotifierImpl.cpp | 56 | ||||
-rw-r--r-- | cpp/src/qmf/EventNotifierImpl.h | 48 | ||||
-rw-r--r-- | cpp/src/qmf/PosixEventNotifier.cpp | 65 | ||||
-rw-r--r-- | cpp/src/qmf/PosixEventNotifierImpl.cpp | 112 | ||||
-rw-r--r-- | cpp/src/qmf/PosixEventNotifierImpl.h | 61 | ||||
-rw-r--r-- | cpp/src/qmf/PrivateImplRef.h | 2 | ||||
-rw-r--r-- | cpp/src/qmf/engine/ResilientConnection.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qmf/engine/SchemaImpl.cpp | 11 | ||||
-rw-r--r-- | cpp/src/qmf/engine/SchemaImpl.h | 7 |
17 files changed, 760 insertions, 199 deletions
diff --git a/cpp/src/qmf/Agent.cpp b/cpp/src/qmf/Agent.cpp index 915f2a1c88..684f8e4fba 100644 --- a/cpp/src/qmf/Agent.cpp +++ b/cpp/src/qmf/Agent.cpp @@ -72,7 +72,7 @@ Schema Agent::getSchema(const SchemaId& s, Duration t) { return impl->getSchema( AgentImpl::AgentImpl(const std::string& n, uint32_t e, ConsoleSessionImpl& s) : name(n), directSubject(n), epoch(e), session(s), touched(true), untouchedCount(0), capability(0), - sender(session.directSender), nextCorrelator(1), schemaCache(s.schemaCache) + sender(session.directSender), schemaCache(s.schemaCache) { } @@ -102,12 +102,11 @@ const Variant& AgentImpl::getAttribute(const string& k) const ConsoleEvent AgentImpl::query(const Query& query, Duration timeout) { boost::shared_ptr<SyncContext> context(new SyncContext()); - uint32_t correlator; + uint32_t correlator(session.correlator()); ConsoleEvent result; { qpid::sys::Mutex::ScopedLock l(lock); - correlator = nextCorrelator++; contextMap[correlator] = context; } try { @@ -151,12 +150,7 @@ ConsoleEvent AgentImpl::query(const string& text, Duration timeout) uint32_t AgentImpl::queryAsync(const Query& query) { - uint32_t correlator; - - { - qpid::sys::Mutex::ScopedLock l(lock); - correlator = nextCorrelator++; - } + uint32_t correlator(session.correlator()); sendQuery(query, correlator); return correlator; @@ -172,12 +166,11 @@ uint32_t AgentImpl::queryAsync(const string& text) ConsoleEvent AgentImpl::callMethod(const string& method, const Variant::Map& args, const DataAddr& addr, Duration timeout) { boost::shared_ptr<SyncContext> context(new SyncContext()); - uint32_t correlator; + uint32_t correlator(session.correlator()); ConsoleEvent result; { qpid::sys::Mutex::ScopedLock l(lock); - correlator = nextCorrelator++; contextMap[correlator] = context; } try { @@ -213,12 +206,7 @@ ConsoleEvent AgentImpl::callMethod(const string& method, const Variant::Map& arg uint32_t AgentImpl::callMethodAsync(const string& method, const Variant::Map& args, const DataAddr& addr) { - uint32_t correlator; - - { - qpid::sys::Mutex::ScopedLock l(lock); - correlator = nextCorrelator++; - } + uint32_t correlator(session.correlator()); sendMethod(method, args, addr, correlator); return correlator; @@ -596,12 +584,7 @@ void AgentImpl::sendMethod(const string& method, const Variant::Map& args, const void AgentImpl::sendSchemaRequest(const SchemaId& id) { - uint32_t correlator; - - { - qpid::sys::Mutex::ScopedLock l(lock); - correlator = nextCorrelator++; - } + uint32_t correlator(session.correlator()); if (capability >= AGENT_CAPABILITY_V2_SCHEMA) { Query query(QUERY_SCHEMA, id); diff --git a/cpp/src/qmf/AgentImpl.h b/cpp/src/qmf/AgentImpl.h index 7fa4f4373a..09754a3a7e 100644 --- a/cpp/src/qmf/AgentImpl.h +++ b/cpp/src/qmf/AgentImpl.h @@ -99,7 +99,6 @@ namespace qmf { uint32_t capability; qpid::messaging::Sender sender; qpid::types::Variant::Map attributes; - uint32_t nextCorrelator; std::map<uint32_t, boost::shared_ptr<SyncContext> > contextMap; boost::shared_ptr<SchemaCache> schemaCache; mutable std::set<std::string> packageSet; diff --git a/cpp/src/qmf/AgentSession.cpp b/cpp/src/qmf/AgentSession.cpp index 4c5a72a467..251c25fd44 100644 --- a/cpp/src/qmf/AgentSession.cpp +++ b/cpp/src/qmf/AgentSession.cpp @@ -19,132 +19,7 @@ * */ -#include "qpid/RefCounted.h" -#include "qmf/PrivateImplRef.h" -#include "qmf/exceptions.h" -#include "qmf/AgentSession.h" -#include "qmf/AgentEventImpl.h" -#include "qmf/SchemaIdImpl.h" -#include "qmf/SchemaImpl.h" -#include "qmf/DataAddrImpl.h" -#include "qmf/DataImpl.h" -#include "qmf/QueryImpl.h" -#include "qmf/agentCapability.h" -#include "qmf/constants.h" -#include "qpid/sys/Mutex.h" -#include "qpid/sys/Condition.h" -#include "qpid/sys/Thread.h" -#include "qpid/sys/Runnable.h" -#include "qpid/log/Statement.h" -#include "qpid/messaging/Connection.h" -#include "qpid/messaging/Session.h" -#include "qpid/messaging/Receiver.h" -#include "qpid/messaging/Sender.h" -#include "qpid/messaging/Message.h" -#include "qpid/messaging/AddressParser.h" -#include "qpid/management/Buffer.h" -#include <queue> -#include <map> -#include <set> -#include <iostream> -#include <memory> - -using namespace std; -using namespace qpid::messaging; -using namespace qmf; -using qpid::types::Variant; - -namespace qmf { - class AgentSessionImpl : public virtual qpid::RefCounted, public qpid::sys::Runnable { - public: - ~AgentSessionImpl(); - - // - // Methods from API handle - // - AgentSessionImpl(Connection& c, const string& o); - void setDomain(const string& d) { checkOpen(); domain = d; } - void setVendor(const string& v) { checkOpen(); attributes["_vendor"] = v; } - void setProduct(const string& p) { checkOpen(); attributes["_product"] = p; } - void setInstance(const string& i) { checkOpen(); attributes["_instance"] = i; } - void setAttribute(const string& k, const qpid::types::Variant& v) { checkOpen(); attributes[k] = v; } - const string& getName() const { return agentName; } - void open(); - void close(); - bool nextEvent(AgentEvent& e, Duration t); - - void registerSchema(Schema& s); - DataAddr addData(Data& d, const string& n, bool persist); - void delData(const DataAddr&); - - void authAccept(AgentEvent& e); - void authReject(AgentEvent& e, const string& m); - void raiseException(AgentEvent& e, const string& s); - void raiseException(AgentEvent& e, const Data& d); - void response(AgentEvent& e, const Data& d); - void complete(AgentEvent& e); - void methodSuccess(AgentEvent& e); - void raiseEvent(const Data& d); - void raiseEvent(const Data& d, int s); - - private: - typedef map<DataAddr, Data, DataAddrCompare> DataIndex; - typedef map<SchemaId, Schema, SchemaIdCompare> SchemaMap; - - mutable qpid::sys::Mutex lock; - qpid::sys::Condition cond; - Connection connection; - Session session; - Sender directSender; - Sender topicSender; - string domain; - Variant::Map attributes; - Variant::Map options; - string agentName; - bool opened; - queue<AgentEvent> eventQueue; - qpid::sys::Thread* thread; - bool threadCanceled; - uint32_t bootSequence; - uint32_t interval; - uint64_t lastHeartbeat; - uint64_t lastVisit; - bool forceHeartbeat; - bool externalStorage; - bool autoAllowQueries; - bool autoAllowMethods; - uint32_t maxSubscriptions; - uint32_t minSubInterval; - uint32_t subLifetime; - bool publicEvents; - bool listenOnDirect; - bool strictSecurity; - uint64_t schemaUpdateTime; - string directBase; - string topicBase; - - SchemaMap schemata; - DataIndex globalIndex; - map<SchemaId, DataIndex, SchemaIdCompareNoHash> schemaIndex; - - void checkOpen(); - void setAgentName(); - void enqueueEvent(const AgentEvent&); - void handleLocateRequest(const Variant::List& content, const Message& msg); - void handleMethodRequest(const Variant::Map& content, const Message& msg); - void handleQueryRequest(const Variant::Map& content, const Message& msg); - void handleSchemaRequest(AgentEvent&); - void handleV1SchemaRequest(qpid::management::Buffer&, uint32_t, const Message&); - void dispatch(Message); - void sendHeartbeat(); - void send(Message, const Address&); - void flushResponses(AgentEvent&, bool); - void periodicProcessing(uint64_t); - void run(); - }; -} - -typedef qmf::PrivateImplRef<AgentSession> PI; +#include "qmf/AgentSessionImpl.h" AgentSession::AgentSession(AgentSessionImpl* impl) { PI::ctor(*this, impl); } AgentSession::AgentSession(const AgentSession& s) : qmf::Handle<AgentSessionImpl>() { PI::copy(*this, s); } @@ -161,6 +36,7 @@ const string& AgentSession::getName() const { return impl->getName(); } void AgentSession::open() { impl->open(); } void AgentSession::close() { impl->close(); } bool AgentSession::nextEvent(AgentEvent& e, Duration t) { return impl->nextEvent(e, t); } +int AgentSession::pendingEvents() const { return impl->pendingEvents(); } void AgentSession::registerSchema(Schema& s) { impl->registerSchema(s); } DataAddr AgentSession::addData(Data& d, const string& n, bool p) { return impl->addData(d, n, p); } void AgentSession::delData(const DataAddr& a) { impl->delData(a); } @@ -179,11 +55,11 @@ void AgentSession::raiseEvent(const Data& d, int s) { impl->raiseEvent(d, s); } //======================================================================================== AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) : - connection(c), domain("default"), opened(false), thread(0), threadCanceled(false), + connection(c), domain("default"), opened(false), eventNotifier(0), thread(0), threadCanceled(false), bootSequence(1), interval(60), lastHeartbeat(0), lastVisit(0), forceHeartbeat(false), externalStorage(false), autoAllowQueries(true), autoAllowMethods(true), maxSubscriptions(64), minSubInterval(3000), subLifetime(300), publicEvents(true), - listenOnDirect(true), strictSecurity(false), + listenOnDirect(true), strictSecurity(false), maxThreadWaitTime(5), schemaUpdateTime(uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()))) { // @@ -244,7 +120,14 @@ AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) : iter = optMap.find("strict-security"); if (iter != optMap.end()) strictSecurity = iter->second.asBool(); + + iter = optMap.find("max-thread-wait-time"); + if (iter != optMap.end()) + maxThreadWaitTime = iter->second.asUint32(); } + + if (maxThreadWaitTime > interval) + maxThreadWaitTime = interval; } @@ -252,6 +135,11 @@ AgentSessionImpl::~AgentSessionImpl() { if (opened) close(); + + if (thread) { + thread->join(); + delete thread; + } } @@ -260,6 +148,12 @@ void AgentSessionImpl::open() if (opened) throw QmfException("The session is already open"); + // If the thread exists, join and delete it before creating a new one. + if (thread) { + thread->join(); + delete thread; + } + const string addrArgs(";{create:never,node:{type:topic}}"); const string routableAddr("direct-agent.route." + qpid::types::Uuid(true).str()); attributes["_direct_subject"] = routableAddr; @@ -297,34 +191,47 @@ void AgentSessionImpl::open() } -void AgentSessionImpl::close() +void AgentSessionImpl::closeAsync() { if (!opened) return; - // Stop and join the receiver thread + // Stop the receiver thread. Don't join it until the destructor is called or open() is called. threadCanceled = true; - thread->join(); - delete thread; - - // Close the AMQP session - session.close(); opened = false; } +void AgentSessionImpl::close() +{ + closeAsync(); + + if (thread) { + thread->join(); + delete thread; + thread = 0; + } +} + + bool AgentSessionImpl::nextEvent(AgentEvent& event, Duration timeout) { uint64_t milliseconds = timeout.getMilliseconds(); qpid::sys::Mutex::ScopedLock l(lock); - if (eventQueue.empty()) - cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), - qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC))); + if (eventQueue.empty() && milliseconds > 0) { + int64_t nsecs(qpid::sys::TIME_INFINITE); + if ((uint64_t)(nsecs / 1000000) > milliseconds) + nsecs = (int64_t) milliseconds * 1000000; + qpid::sys::Duration then(nsecs); + cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), then)); + } if (!eventQueue.empty()) { event = eventQueue.front(); eventQueue.pop(); + if (eventQueue.empty()) + alertEventNotifierLH(false); return true; } @@ -332,6 +239,26 @@ bool AgentSessionImpl::nextEvent(AgentEvent& event, Duration timeout) } +int AgentSessionImpl::pendingEvents() const +{ + qpid::sys::Mutex::ScopedLock l(lock); + return eventQueue.size(); +} + + +void AgentSessionImpl::setEventNotifier(EventNotifierImpl* notifier) +{ + qpid::sys::Mutex::ScopedLock l(lock); + eventNotifier = notifier; +} + +EventNotifierImpl* AgentSessionImpl::getEventNotifier() const +{ + qpid::sys::Mutex::ScopedLock l(lock); + return eventNotifier; +} + + void AgentSessionImpl::registerSchema(Schema& schema) { if (!schema.isFinalized()) @@ -587,8 +514,10 @@ void AgentSessionImpl::enqueueEvent(const AgentEvent& event) qpid::sys::Mutex::ScopedLock l(lock); bool notify = eventQueue.empty(); eventQueue.push(event); - if (notify) + if (notify) { cond.notify(); + alertEventNotifierLH(true); + } } @@ -1032,6 +961,13 @@ void AgentSessionImpl::periodicProcessing(uint64_t seconds) } +void AgentSessionImpl::alertEventNotifierLH(bool readable) +{ + if (eventNotifier) + eventNotifier->setReadable(readable); +} + + void AgentSessionImpl::run() { QPID_LOG(debug, "AgentSession thread started for agent " << agentName); @@ -1041,7 +977,7 @@ void AgentSessionImpl::run() periodicProcessing((uint64_t) qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()) / qpid::sys::TIME_SEC); Receiver rx; - bool valid = session.nextReceiver(rx, Duration::SECOND); + bool valid = session.nextReceiver(rx, Duration::SECOND * maxThreadWaitTime); if (threadCanceled) break; if (valid) { @@ -1058,6 +994,19 @@ void AgentSessionImpl::run() enqueueEvent(AgentEvent(new AgentEventImpl(AGENT_THREAD_FAILED))); } + session.close(); QPID_LOG(debug, "AgentSession thread exiting for agent " << agentName); } + +AgentSessionImpl& AgentSessionImplAccess::get(AgentSession& session) +{ + return *session.impl; +} + + +const AgentSessionImpl& AgentSessionImplAccess::get(const AgentSession& session) +{ + return *session.impl; +} + diff --git a/cpp/src/qmf/AgentSessionImpl.h b/cpp/src/qmf/AgentSessionImpl.h new file mode 100644 index 0000000000..ae512a4054 --- /dev/null +++ b/cpp/src/qmf/AgentSessionImpl.h @@ -0,0 +1,175 @@ +#ifndef __QMF_AGENT_SESSION_IMPL_H +#define __QMF_AGENT_SESSION_IMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/RefCounted.h" +#include "qmf/PrivateImplRef.h" +#include "qmf/exceptions.h" +#include "qmf/AgentSession.h" +#include "qmf/AgentEventImpl.h" +#include "qmf/EventNotifierImpl.h" +#include "qpid/messaging/Connection.h" +#include "qpid/sys/Runnable.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Condition.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/Runnable.h" +#include "qpid/log/Statement.h" +#include "qpid/messaging/Connection.h" +#include "qpid/messaging/Session.h" +#include "qpid/messaging/Receiver.h" +#include "qpid/messaging/Sender.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/AddressParser.h" +#include "qpid/management/Buffer.h" +#include "qpid/RefCounted.h" +#include "qmf/PrivateImplRef.h" +#include "qmf/AgentSession.h" +#include "qmf/exceptions.h" +#include "qmf/AgentSession.h" +#include "qmf/SchemaIdImpl.h" +#include "qmf/SchemaImpl.h" +#include "qmf/DataAddrImpl.h" +#include "qmf/DataImpl.h" +#include "qmf/QueryImpl.h" +#include "qmf/agentCapability.h" +#include "qmf/constants.h" + +#include <queue> +#include <map> +#include <iostream> +#include <memory> + +using namespace std; +using namespace qpid::messaging; +using namespace qmf; +using qpid::types::Variant; + +typedef qmf::PrivateImplRef<AgentSession> PI; + +namespace qmf { + class AgentSessionImpl : public virtual qpid::RefCounted, public qpid::sys::Runnable { + public: + ~AgentSessionImpl(); + + // + // Methods from API handle + // + AgentSessionImpl(Connection& c, const string& o); + void setDomain(const string& d) { checkOpen(); domain = d; } + void setVendor(const string& v) { checkOpen(); attributes["_vendor"] = v; } + void setProduct(const string& p) { checkOpen(); attributes["_product"] = p; } + void setInstance(const string& i) { checkOpen(); attributes["_instance"] = i; } + void setAttribute(const string& k, const qpid::types::Variant& v) { checkOpen(); attributes[k] = v; } + const string& getName() const { return agentName; } + void open(); + void closeAsync(); + void close(); + bool nextEvent(AgentEvent& e, Duration t); + int pendingEvents() const; + + void setEventNotifier(EventNotifierImpl* eventNotifier); + EventNotifierImpl* getEventNotifier() const; + + void registerSchema(Schema& s); + DataAddr addData(Data& d, const string& n, bool persist); + void delData(const DataAddr&); + + void authAccept(AgentEvent& e); + void authReject(AgentEvent& e, const string& m); + void raiseException(AgentEvent& e, const string& s); + void raiseException(AgentEvent& e, const Data& d); + void response(AgentEvent& e, const Data& d); + void complete(AgentEvent& e); + void methodSuccess(AgentEvent& e); + void raiseEvent(const Data& d); + void raiseEvent(const Data& d, int s); + + private: + typedef map<DataAddr, Data, DataAddrCompare> DataIndex; + typedef map<SchemaId, Schema, SchemaIdCompare> SchemaMap; + + mutable qpid::sys::Mutex lock; + qpid::sys::Condition cond; + Connection connection; + Session session; + Sender directSender; + Sender topicSender; + string domain; + Variant::Map attributes; + Variant::Map options; + string agentName; + bool opened; + queue<AgentEvent> eventQueue; + EventNotifierImpl* eventNotifier; + qpid::sys::Thread* thread; + bool threadCanceled; + uint32_t bootSequence; + uint32_t interval; + uint64_t lastHeartbeat; + uint64_t lastVisit; + bool forceHeartbeat; + bool externalStorage; + bool autoAllowQueries; + bool autoAllowMethods; + uint32_t maxSubscriptions; + uint32_t minSubInterval; + uint32_t subLifetime; + bool publicEvents; + bool listenOnDirect; + bool strictSecurity; + uint32_t maxThreadWaitTime; + uint64_t schemaUpdateTime; + string directBase; + string topicBase; + + SchemaMap schemata; + DataIndex globalIndex; + map<SchemaId, DataIndex, SchemaIdCompareNoHash> schemaIndex; + + void checkOpen(); + void setAgentName(); + void enqueueEvent(const AgentEvent&); + void alertEventNotifierLH(bool readable); + void handleLocateRequest(const Variant::List& content, const Message& msg); + void handleMethodRequest(const Variant::Map& content, const Message& msg); + void handleQueryRequest(const Variant::Map& content, const Message& msg); + void handleSchemaRequest(AgentEvent&); + void handleV1SchemaRequest(qpid::management::Buffer&, uint32_t, const Message&); + void dispatch(Message); + void sendHeartbeat(); + void send(Message, const Address&); + void flushResponses(AgentEvent&, bool); + void periodicProcessing(uint64_t); + void run(); + }; + + struct AgentSessionImplAccess { + static AgentSessionImpl& get(AgentSession& session); + static const AgentSessionImpl& get(const AgentSession& session); + }; +} + + +#endif + diff --git a/cpp/src/qmf/ConsoleSession.cpp b/cpp/src/qmf/ConsoleSession.cpp index e12c1152f6..2dfc894c58 100644 --- a/cpp/src/qmf/ConsoleSession.cpp +++ b/cpp/src/qmf/ConsoleSession.cpp @@ -54,6 +54,7 @@ void ConsoleSession::setAgentFilter(const string& f) { impl->setAgentFilter(f); void ConsoleSession::open() { impl->open(); } void ConsoleSession::close() { impl->close(); } bool ConsoleSession::nextEvent(ConsoleEvent& e, Duration t) { return impl->nextEvent(e, t); } +int ConsoleSession::pendingEvents() const { return impl->pendingEvents(); } uint32_t ConsoleSession::getAgentCount() const { return impl->getAgentCount(); } Agent ConsoleSession::getAgent(uint32_t i) const { return impl->getAgent(i); } Agent ConsoleSession::getConnectedBrokerAgent() const { return impl->getConnectedBrokerAgent(); } @@ -65,9 +66,9 @@ Subscription ConsoleSession::subscribe(const string& q, const string& f, const s //======================================================================================== ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) : - connection(c), domain("default"), maxAgentAgeMinutes(5), - opened(false), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0), - connectedBrokerInAgentList(false), schemaCache(new SchemaCache()) + connection(c), domain("default"), maxAgentAgeMinutes(5), listenOnDirect(true), strictSecurity(false), maxThreadWaitTime(5), + opened(false), eventNotifier(0), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0), + connectedBrokerInAgentList(false), schemaCache(new SchemaCache()), nextCorrelator(1) { if (!options.empty()) { qpid::messaging::AddressParser parser(options); @@ -91,7 +92,14 @@ ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) : iter = optMap.find("strict-security"); if (iter != optMap.end()) strictSecurity = iter->second.asBool(); + + iter = optMap.find("max-thread-wait-time"); + if (iter != optMap.end()) + maxThreadWaitTime = iter->second.asUint32(); } + + if (maxThreadWaitTime > 60) + maxThreadWaitTime = 60; } @@ -99,6 +107,11 @@ ConsoleSessionImpl::~ConsoleSessionImpl() { if (opened) close(); + + if (thread) { + thread->join(); + delete thread; + } } @@ -153,6 +166,12 @@ void ConsoleSessionImpl::open() if (opened) throw QmfException("The session is already open"); + // If the thread exists, join and delete it before creating a new one. + if (thread) { + thread->join(); + delete thread; + } + // Establish messaging addresses directBase = "qmf." + domain + ".direct"; topicBase = "qmf." + domain + ".topic"; @@ -181,45 +200,57 @@ void ConsoleSessionImpl::open() // Start the receiver thread threadCanceled = false; + opened = true; thread = new qpid::sys::Thread(*this); // Send an agent_locate to direct address 'broker' to identify the connected-broker-agent. sendBrokerLocate(); if (agentQuery) sendAgentLocate(); - - opened = true; } -void ConsoleSessionImpl::close() +void ConsoleSessionImpl::closeAsync() { if (!opened) throw QmfException("The session is already closed"); - // Stop and join the receiver thread + // Stop the receiver thread. Don't join it until the destructor is called or open() is called. threadCanceled = true; - thread->join(); - delete thread; - - // Close the AMQP session - session.close(); opened = false; } +void ConsoleSessionImpl::close() +{ + closeAsync(); + + if (thread) { + thread->join(); + delete thread; + thread = 0; + } +} + + bool ConsoleSessionImpl::nextEvent(ConsoleEvent& event, Duration timeout) { uint64_t milliseconds = timeout.getMilliseconds(); qpid::sys::Mutex::ScopedLock l(lock); - if (eventQueue.empty()) - cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), - qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC))); + if (eventQueue.empty() && milliseconds > 0) { + int64_t nsecs(qpid::sys::TIME_INFINITE); + if ((uint64_t)(nsecs / 1000000) > milliseconds) + nsecs = (int64_t) milliseconds * 1000000; + qpid::sys::Duration then(nsecs); + cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), then)); + } if (!eventQueue.empty()) { event = eventQueue.front(); eventQueue.pop(); + if (eventQueue.empty()) + alertEventNotifierLH(false); return true; } @@ -227,6 +258,27 @@ bool ConsoleSessionImpl::nextEvent(ConsoleEvent& event, Duration timeout) } +int ConsoleSessionImpl::pendingEvents() const +{ + qpid::sys::Mutex::ScopedLock l(lock); + return eventQueue.size(); +} + + +void ConsoleSessionImpl::setEventNotifier(EventNotifierImpl* notifier) +{ + qpid::sys::Mutex::ScopedLock l(lock); + eventNotifier = notifier; +} + + +EventNotifierImpl* ConsoleSessionImpl::getEventNotifier() const +{ + qpid::sys::Mutex::ScopedLock l(lock); + return eventNotifier; +} + + uint32_t ConsoleSessionImpl::getAgentCount() const { qpid::sys::Mutex::ScopedLock l(lock); @@ -268,8 +320,10 @@ void ConsoleSessionImpl::enqueueEventLH(const ConsoleEvent& event) { bool notify = eventQueue.empty(); eventQueue.push(event); - if (notify) + if (notify) { cond.notify(); + alertEventNotifierLH(true); + } } @@ -421,7 +475,23 @@ void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Varian iter = content.find("_values"); if (iter == content.end()) return; - Variant::Map attrs(iter->second.asMap()); + const Variant::Map& in_attrs(iter->second.asMap()); + Variant::Map attrs; + + // + // Copy the map from the message to "attrs". Translate any old-style + // keys to their new key values in the process. + // + for (iter = in_attrs.begin(); iter != in_attrs.end(); iter++) { + if (iter->first == "epoch") + attrs[protocol::AGENT_ATTR_EPOCH] = iter->second; + else if (iter->first == "timestamp") + attrs[protocol::AGENT_ATTR_TIMESTAMP] = iter->second; + else if (iter->first == "heartbeat_interval") + attrs[protocol::AGENT_ATTR_HEARTBEAT_INTERVAL] = iter->second; + else + attrs[iter->first] = iter->second; + } iter = attrs.find(protocol::AGENT_ATTR_EPOCH); if (iter != attrs.end()) @@ -562,6 +632,13 @@ void ConsoleSessionImpl::periodicProcessing(uint64_t seconds) } +void ConsoleSessionImpl::alertEventNotifierLH(bool readable) +{ + if (eventNotifier) + eventNotifier->setReadable(readable); +} + + void ConsoleSessionImpl::run() { QPID_LOG(debug, "ConsoleSession thread started"); @@ -572,7 +649,7 @@ void ConsoleSessionImpl::run() qpid::sys::TIME_SEC); Receiver rx; - bool valid = session.nextReceiver(rx, Duration::SECOND); + bool valid = session.nextReceiver(rx, Duration::SECOND * maxThreadWaitTime); if (threadCanceled) break; if (valid) { @@ -589,6 +666,18 @@ void ConsoleSessionImpl::run() enqueueEvent(ConsoleEvent(new ConsoleEventImpl(CONSOLE_THREAD_FAILED))); } + session.close(); QPID_LOG(debug, "ConsoleSession thread exiting"); } + +ConsoleSessionImpl& ConsoleSessionImplAccess::get(ConsoleSession& session) +{ + return *session.impl; +} + + +const ConsoleSessionImpl& ConsoleSessionImplAccess::get(const ConsoleSession& session) +{ + return *session.impl; +} diff --git a/cpp/src/qmf/ConsoleSessionImpl.h b/cpp/src/qmf/ConsoleSessionImpl.h index 675c8bcfb5..e2b30602fa 100644 --- a/cpp/src/qmf/ConsoleSessionImpl.h +++ b/cpp/src/qmf/ConsoleSessionImpl.h @@ -27,6 +27,7 @@ #include "qmf/SchemaId.h" #include "qmf/Schema.h" #include "qmf/ConsoleEventImpl.h" +#include "qmf/EventNotifierImpl.h" #include "qmf/SchemaCache.h" #include "qmf/Query.h" #include "qpid/sys/Mutex.h" @@ -41,9 +42,13 @@ #include "qpid/messaging/Address.h" #include "qpid/management/Buffer.h" #include "qpid/types/Variant.h" + +#include <boost/shared_ptr.hpp> #include <map> #include <queue> +using namespace std; + namespace qmf { class ConsoleSessionImpl : public virtual qpid::RefCounted, public qpid::sys::Runnable { public: @@ -56,8 +61,14 @@ namespace qmf { void setDomain(const std::string& d) { domain = d; } void setAgentFilter(const std::string& f); void open(); + void closeAsync(); void close(); bool nextEvent(ConsoleEvent& e, qpid::messaging::Duration t); + int pendingEvents() const; + + void setEventNotifier(EventNotifierImpl* notifier); + EventNotifierImpl* getEventNotifier() const; + uint32_t getAgentCount() const; Agent getAgent(uint32_t i) const; Agent getConnectedBrokerAgent() const { return connectedBrokerAgent; } @@ -75,9 +86,11 @@ namespace qmf { uint32_t maxAgentAgeMinutes; bool listenOnDirect; bool strictSecurity; + uint32_t maxThreadWaitTime; Query agentQuery; bool opened; std::queue<ConsoleEvent> eventQueue; + EventNotifierImpl* eventNotifier; qpid::sys::Thread* thread; bool threadCanceled; uint64_t lastVisit; @@ -89,6 +102,8 @@ namespace qmf { std::string directBase; std::string topicBase; boost::shared_ptr<SchemaCache> schemaCache; + qpid::sys::Mutex corrlock; + uint32_t nextCorrelator; void enqueueEvent(const ConsoleEvent&); void enqueueEventLH(const ConsoleEvent&); @@ -98,10 +113,17 @@ namespace qmf { void handleAgentUpdate(const std::string&, const qpid::types::Variant::Map&, const qpid::messaging::Message&); void handleV1SchemaResponse(qpid::management::Buffer&, uint32_t, const qpid::messaging::Message&); void periodicProcessing(uint64_t); + void alertEventNotifierLH(bool readable); void run(); + uint32_t correlator() { qpid::sys::Mutex::ScopedLock l(corrlock); return nextCorrelator++; } friend class AgentImpl; }; + + struct ConsoleSessionImplAccess { + static ConsoleSessionImpl& get(ConsoleSession& session); + static const ConsoleSessionImpl& get(const ConsoleSession& session); + }; } #endif diff --git a/cpp/src/qmf/DataAddr.cpp b/cpp/src/qmf/DataAddr.cpp index fb51d5787f..d16e12062e 100644 --- a/cpp/src/qmf/DataAddr.cpp +++ b/cpp/src/qmf/DataAddr.cpp @@ -36,7 +36,9 @@ DataAddr::~DataAddr() { PI::dtor(*this); } DataAddr& DataAddr::operator=(const DataAddr& s) { return PI::assign(*this, s); } bool DataAddr::operator==(const DataAddr& o) { return *impl == *o.impl; } +bool DataAddr::operator==(const DataAddr& o) const { return *impl == *o.impl; } bool DataAddr::operator<(const DataAddr& o) { return *impl < *o.impl; } +bool DataAddr::operator<(const DataAddr& o) const { return *impl < *o.impl; } DataAddr::DataAddr(const qpid::types::Variant::Map& m) { PI::ctor(*this, new DataAddrImpl(m)); } DataAddr::DataAddr(const string& n, const string& a, uint32_t e) { PI::ctor(*this, new DataAddrImpl(n, a, e)); } @@ -45,7 +47,7 @@ const string& DataAddr::getAgentName() const { return impl->getAgentName(); } uint32_t DataAddr::getAgentEpoch() const { return impl->getAgentEpoch(); } Variant::Map DataAddr::asMap() const { return impl->asMap(); } -bool DataAddrImpl::operator==(const DataAddrImpl& other) +bool DataAddrImpl::operator==(const DataAddrImpl& other) const { return agentName == other.agentName && @@ -54,7 +56,7 @@ bool DataAddrImpl::operator==(const DataAddrImpl& other) } -bool DataAddrImpl::operator<(const DataAddrImpl& other) +bool DataAddrImpl::operator<(const DataAddrImpl& other) const { if (agentName < other.agentName) return true; if (agentName > other.agentName) return false; diff --git a/cpp/src/qmf/DataAddrImpl.h b/cpp/src/qmf/DataAddrImpl.h index 3f9cae9453..11d512f0c4 100644 --- a/cpp/src/qmf/DataAddrImpl.h +++ b/cpp/src/qmf/DataAddrImpl.h @@ -38,8 +38,8 @@ namespace qmf { // // Methods from API handle // - bool operator==(const DataAddrImpl&); - bool operator<(const DataAddrImpl&); + bool operator==(const DataAddrImpl&) const; + bool operator<(const DataAddrImpl&) const; DataAddrImpl(const qpid::types::Variant::Map&); DataAddrImpl(const std::string& _name, const std::string& _agentName, uint32_t _agentEpoch=0) : agentName(_agentName), name(_name), agentEpoch(_agentEpoch) {} diff --git a/cpp/src/qmf/EventNotifierImpl.cpp b/cpp/src/qmf/EventNotifierImpl.cpp new file mode 100644 index 0000000000..20114aaa5e --- /dev/null +++ b/cpp/src/qmf/EventNotifierImpl.cpp @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "qmf/EventNotifierImpl.h" +#include "qmf/AgentSessionImpl.h" +#include "qmf/ConsoleSessionImpl.h" + +EventNotifierImpl::EventNotifierImpl(AgentSession& agentSession) + : readable(false), agent(agentSession) +{ + AgentSessionImplAccess::get(agent).setEventNotifier(this); +} + + +EventNotifierImpl::EventNotifierImpl(ConsoleSession& consoleSession) + : readable(false), console(consoleSession) +{ + ConsoleSessionImplAccess::get(console).setEventNotifier(this); +} + + +EventNotifierImpl::~EventNotifierImpl() +{ + if (agent.isValid()) + AgentSessionImplAccess::get(agent).setEventNotifier(NULL); + if (console.isValid()) + ConsoleSessionImplAccess::get(console).setEventNotifier(NULL); +} + +void EventNotifierImpl::setReadable(bool readable) +{ + update(readable); + this->readable = readable; +} + + +bool EventNotifierImpl::isReadable() const +{ + return this->readable; +} diff --git a/cpp/src/qmf/EventNotifierImpl.h b/cpp/src/qmf/EventNotifierImpl.h new file mode 100644 index 0000000000..d85f9979d2 --- /dev/null +++ b/cpp/src/qmf/EventNotifierImpl.h @@ -0,0 +1,48 @@ +#ifndef __QMF_EVENT_NOTIFIER_IMPL_H +#define __QMF_EVENT_NOTIFIER_IMPL_H + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "qmf/AgentSession.h" +#include "qmf/ConsoleSession.h" + +namespace qmf +{ + class EventNotifierImpl { + private: + bool readable; + AgentSession agent; + ConsoleSession console; + + public: + EventNotifierImpl(AgentSession& agentSession); + EventNotifierImpl(ConsoleSession& consoleSession); + virtual ~EventNotifierImpl(); + + void setReadable(bool readable); + bool isReadable() const; + + protected: + virtual void update(bool readable) = 0; + }; +} + +#endif + diff --git a/cpp/src/qmf/PosixEventNotifier.cpp b/cpp/src/qmf/PosixEventNotifier.cpp new file mode 100644 index 0000000000..a364cc155d --- /dev/null +++ b/cpp/src/qmf/PosixEventNotifier.cpp @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "qmf/posix/EventNotifier.h" +#include "qmf/PosixEventNotifierImpl.h" +#include "qmf/PrivateImplRef.h" + +using namespace qmf; +using namespace std; + +typedef qmf::PrivateImplRef<posix::EventNotifier> PI; + +posix::EventNotifier::EventNotifier(PosixEventNotifierImpl* impl) { PI::ctor(*this, impl); } + +posix::EventNotifier::EventNotifier(AgentSession& agentSession) +{ + PI::ctor(*this, new PosixEventNotifierImpl(agentSession)); +} + + +posix::EventNotifier::EventNotifier(ConsoleSession& consoleSession) +{ + PI::ctor(*this, new PosixEventNotifierImpl(consoleSession)); +} + + +posix::EventNotifier::EventNotifier(const posix::EventNotifier& that) + : Handle<PosixEventNotifierImpl>() +{ + PI::copy(*this, that); +} + + +posix::EventNotifier::~EventNotifier() +{ + PI::dtor(*this); +} + +posix::EventNotifier& posix::EventNotifier::operator=(const posix::EventNotifier& that) +{ + return PI::assign(*this, that); +} + + +int posix::EventNotifier::getHandle() const +{ + return impl->getHandle(); +} + diff --git a/cpp/src/qmf/PosixEventNotifierImpl.cpp b/cpp/src/qmf/PosixEventNotifierImpl.cpp new file mode 100644 index 0000000000..011dbcc214 --- /dev/null +++ b/cpp/src/qmf/PosixEventNotifierImpl.cpp @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "PosixEventNotifierImpl.h" +#include "qpid/log/Statement.h" + +#include <fcntl.h> +#include <unistd.h> +#include <errno.h> + +#define BUFFER_SIZE 10 + +using namespace qmf; + +PosixEventNotifierImpl::PosixEventNotifierImpl(AgentSession& agentSession) + : EventNotifierImpl(agentSession) +{ + openHandle(); +} + + +PosixEventNotifierImpl::PosixEventNotifierImpl(ConsoleSession& consoleSession) + : EventNotifierImpl(consoleSession) +{ + openHandle(); +} + + +PosixEventNotifierImpl::~PosixEventNotifierImpl() +{ + closeHandle(); +} + + +void PosixEventNotifierImpl::update(bool readable) +{ + char buffer[BUFFER_SIZE]; + + if(readable && !this->isReadable()) { + if (::write(myHandle, "1", 1) == -1) + QPID_LOG(error, "PosixEventNotifierImpl::update write failed: " << errno); + } + else if(!readable && this->isReadable()) { + if (::read(yourHandle, buffer, BUFFER_SIZE) == -1) + QPID_LOG(error, "PosixEventNotifierImpl::update read failed: " << errno); + } +} + + +void PosixEventNotifierImpl::openHandle() +{ + int pair[2]; + + if(::pipe(pair) == -1) + throw QmfException("Unable to open event notifier handle."); + + yourHandle = pair[0]; + myHandle = pair[1]; + + int flags; + + flags = ::fcntl(yourHandle, F_GETFL); + if((::fcntl(yourHandle, F_SETFL, flags | O_NONBLOCK)) == -1) + throw QmfException("Unable to make remote handle non-blocking."); + + flags = ::fcntl(myHandle, F_GETFL); + if((::fcntl(myHandle, F_SETFL, flags | O_NONBLOCK)) == -1) + throw QmfException("Unable to make local handle non-blocking."); +} + + +void PosixEventNotifierImpl::closeHandle() +{ + if(myHandle > 0) { + ::close(myHandle); + myHandle = -1; + } + + if(yourHandle > 0) { + ::close(yourHandle); + yourHandle = -1; + } +} + + +PosixEventNotifierImpl& PosixEventNotifierImplAccess::get(posix::EventNotifier& notifier) +{ + return *notifier.impl; +} + + +const PosixEventNotifierImpl& PosixEventNotifierImplAccess::get(const posix::EventNotifier& notifier) +{ + return *notifier.impl; +} + diff --git a/cpp/src/qmf/PosixEventNotifierImpl.h b/cpp/src/qmf/PosixEventNotifierImpl.h new file mode 100644 index 0000000000..c8a7446bd5 --- /dev/null +++ b/cpp/src/qmf/PosixEventNotifierImpl.h @@ -0,0 +1,61 @@ +#ifndef __QMF_POSIX_EVENT_NOTIFIER_IMPL_H +#define __QMF_POSIX_EVENT_NOTIFIER_IMPL_H + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "qmf/posix/EventNotifier.h" +#include "qmf/EventNotifierImpl.h" +#include "qpid/RefCounted.h" + +namespace qmf +{ + class AgentSession; + class ConsoleSession; + + class PosixEventNotifierImpl : public EventNotifierImpl, public virtual qpid::RefCounted + { + public: + PosixEventNotifierImpl(AgentSession& agentSession); + PosixEventNotifierImpl(ConsoleSession& consoleSession); + virtual ~PosixEventNotifierImpl(); + + int getHandle() const { return yourHandle; } + + private: + int myHandle; + int yourHandle; + + void openHandle(); + void closeHandle(); + + protected: + void update(bool readable); + }; + + struct PosixEventNotifierImplAccess + { + static PosixEventNotifierImpl& get(posix::EventNotifier& notifier); + static const PosixEventNotifierImpl& get(const posix::EventNotifier& notifier); + }; + +} + +#endif + diff --git a/cpp/src/qmf/PrivateImplRef.h b/cpp/src/qmf/PrivateImplRef.h index 8b698c4199..960cbb2e09 100644 --- a/cpp/src/qmf/PrivateImplRef.h +++ b/cpp/src/qmf/PrivateImplRef.h @@ -23,8 +23,8 @@ */ #include "qmf/ImportExport.h" -#include <boost/intrusive_ptr.hpp> #include "qpid/RefCounted.h" +#include <boost/intrusive_ptr.hpp> namespace qmf { diff --git a/cpp/src/qmf/engine/ResilientConnection.cpp b/cpp/src/qmf/engine/ResilientConnection.cpp index ab65b8d768..41dd9ff00c 100644 --- a/cpp/src/qmf/engine/ResilientConnection.cpp +++ b/cpp/src/qmf/engine/ResilientConnection.cpp @@ -334,8 +334,7 @@ void ResilientConnectionImpl::notify() { if (notifyFd != -1) { - int unused_ret; //Suppress warnings about ignoring return value. - unused_ret = ::write(notifyFd, ".", 1); + (void) ::write(notifyFd, ".", 1); } } @@ -432,8 +431,7 @@ void ResilientConnectionImpl::EnqueueEvent(ResilientConnectionEvent::EventKind k if (notifyFd != -1) { - int unused_ret; //Suppress warnings about ignoring return value. - unused_ret = ::write(notifyFd, ".", 1); + (void) ::write(notifyFd, ".", 1); } } diff --git a/cpp/src/qmf/engine/SchemaImpl.cpp b/cpp/src/qmf/engine/SchemaImpl.cpp index e0948a9911..9d363d3012 100644 --- a/cpp/src/qmf/engine/SchemaImpl.cpp +++ b/cpp/src/qmf/engine/SchemaImpl.cpp @@ -35,17 +35,17 @@ using qpid::framing::Uuid; SchemaHash::SchemaHash() { for (int idx = 0; idx < 16; idx++) - hash[idx] = 0x5A; + hash.b[idx] = 0x5A; } void SchemaHash::encode(Buffer& buffer) const { - buffer.putBin128(hash); + buffer.putBin128(hash.b); } void SchemaHash::decode(Buffer& buffer) { - buffer.getBin128(hash); + buffer.getBin128(hash.b); } void SchemaHash::update(uint8_t data) @@ -55,9 +55,8 @@ void SchemaHash::update(uint8_t data) void SchemaHash::update(const char* data, uint32_t len) { - uint64_t* first = (uint64_t*) hash; - uint64_t* second = (uint64_t*) hash + 1; - + uint64_t* first = &hash.q[0]; + uint64_t* second = &hash.q[1]; for (uint32_t idx = 0; idx < len; idx++) { *first = *first ^ (uint64_t) data[idx]; *second = *second << 1; diff --git a/cpp/src/qmf/engine/SchemaImpl.h b/cpp/src/qmf/engine/SchemaImpl.h index 8b079a5ec6..683fb6f8f0 100644 --- a/cpp/src/qmf/engine/SchemaImpl.h +++ b/cpp/src/qmf/engine/SchemaImpl.h @@ -35,7 +35,10 @@ namespace engine { // they've been registered. class SchemaHash { - uint8_t hash[16]; + union h { + uint8_t b[16]; + uint64_t q[2]; + } hash; public: SchemaHash(); void encode(qpid::framing::Buffer& buffer) const; @@ -47,7 +50,7 @@ namespace engine { void update(Direction d) { update((uint8_t) d); } void update(Access a) { update((uint8_t) a); } void update(bool b) { update((uint8_t) (b ? 1 : 0)); } - const uint8_t* get() const { return hash; } + const uint8_t* get() const { return hash.b; } bool operator==(const SchemaHash& other) const; bool operator<(const SchemaHash& other) const; bool operator>(const SchemaHash& other) const; |