summaryrefslogtreecommitdiff
path: root/cpp/src/qmf
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qmf')
-rw-r--r--cpp/src/qmf/Agent.cpp29
-rw-r--r--cpp/src/qmf/AgentImpl.h1
-rw-r--r--cpp/src/qmf/AgentSession.cpp229
-rw-r--r--cpp/src/qmf/AgentSessionImpl.h175
-rw-r--r--cpp/src/qmf/ConsoleSession.cpp125
-rw-r--r--cpp/src/qmf/ConsoleSessionImpl.h22
-rw-r--r--cpp/src/qmf/DataAddr.cpp6
-rw-r--r--cpp/src/qmf/DataAddrImpl.h4
-rw-r--r--cpp/src/qmf/EventNotifierImpl.cpp56
-rw-r--r--cpp/src/qmf/EventNotifierImpl.h48
-rw-r--r--cpp/src/qmf/PosixEventNotifier.cpp65
-rw-r--r--cpp/src/qmf/PosixEventNotifierImpl.cpp112
-rw-r--r--cpp/src/qmf/PosixEventNotifierImpl.h61
-rw-r--r--cpp/src/qmf/PrivateImplRef.h2
-rw-r--r--cpp/src/qmf/engine/ResilientConnection.cpp6
-rw-r--r--cpp/src/qmf/engine/SchemaImpl.cpp11
-rw-r--r--cpp/src/qmf/engine/SchemaImpl.h7
17 files changed, 760 insertions, 199 deletions
diff --git a/cpp/src/qmf/Agent.cpp b/cpp/src/qmf/Agent.cpp
index 915f2a1c88..684f8e4fba 100644
--- a/cpp/src/qmf/Agent.cpp
+++ b/cpp/src/qmf/Agent.cpp
@@ -72,7 +72,7 @@ Schema Agent::getSchema(const SchemaId& s, Duration t) { return impl->getSchema(
AgentImpl::AgentImpl(const std::string& n, uint32_t e, ConsoleSessionImpl& s) :
name(n), directSubject(n), epoch(e), session(s), touched(true), untouchedCount(0), capability(0),
- sender(session.directSender), nextCorrelator(1), schemaCache(s.schemaCache)
+ sender(session.directSender), schemaCache(s.schemaCache)
{
}
@@ -102,12 +102,11 @@ const Variant& AgentImpl::getAttribute(const string& k) const
ConsoleEvent AgentImpl::query(const Query& query, Duration timeout)
{
boost::shared_ptr<SyncContext> context(new SyncContext());
- uint32_t correlator;
+ uint32_t correlator(session.correlator());
ConsoleEvent result;
{
qpid::sys::Mutex::ScopedLock l(lock);
- correlator = nextCorrelator++;
contextMap[correlator] = context;
}
try {
@@ -151,12 +150,7 @@ ConsoleEvent AgentImpl::query(const string& text, Duration timeout)
uint32_t AgentImpl::queryAsync(const Query& query)
{
- uint32_t correlator;
-
- {
- qpid::sys::Mutex::ScopedLock l(lock);
- correlator = nextCorrelator++;
- }
+ uint32_t correlator(session.correlator());
sendQuery(query, correlator);
return correlator;
@@ -172,12 +166,11 @@ uint32_t AgentImpl::queryAsync(const string& text)
ConsoleEvent AgentImpl::callMethod(const string& method, const Variant::Map& args, const DataAddr& addr, Duration timeout)
{
boost::shared_ptr<SyncContext> context(new SyncContext());
- uint32_t correlator;
+ uint32_t correlator(session.correlator());
ConsoleEvent result;
{
qpid::sys::Mutex::ScopedLock l(lock);
- correlator = nextCorrelator++;
contextMap[correlator] = context;
}
try {
@@ -213,12 +206,7 @@ ConsoleEvent AgentImpl::callMethod(const string& method, const Variant::Map& arg
uint32_t AgentImpl::callMethodAsync(const string& method, const Variant::Map& args, const DataAddr& addr)
{
- uint32_t correlator;
-
- {
- qpid::sys::Mutex::ScopedLock l(lock);
- correlator = nextCorrelator++;
- }
+ uint32_t correlator(session.correlator());
sendMethod(method, args, addr, correlator);
return correlator;
@@ -596,12 +584,7 @@ void AgentImpl::sendMethod(const string& method, const Variant::Map& args, const
void AgentImpl::sendSchemaRequest(const SchemaId& id)
{
- uint32_t correlator;
-
- {
- qpid::sys::Mutex::ScopedLock l(lock);
- correlator = nextCorrelator++;
- }
+ uint32_t correlator(session.correlator());
if (capability >= AGENT_CAPABILITY_V2_SCHEMA) {
Query query(QUERY_SCHEMA, id);
diff --git a/cpp/src/qmf/AgentImpl.h b/cpp/src/qmf/AgentImpl.h
index 7fa4f4373a..09754a3a7e 100644
--- a/cpp/src/qmf/AgentImpl.h
+++ b/cpp/src/qmf/AgentImpl.h
@@ -99,7 +99,6 @@ namespace qmf {
uint32_t capability;
qpid::messaging::Sender sender;
qpid::types::Variant::Map attributes;
- uint32_t nextCorrelator;
std::map<uint32_t, boost::shared_ptr<SyncContext> > contextMap;
boost::shared_ptr<SchemaCache> schemaCache;
mutable std::set<std::string> packageSet;
diff --git a/cpp/src/qmf/AgentSession.cpp b/cpp/src/qmf/AgentSession.cpp
index 4c5a72a467..251c25fd44 100644
--- a/cpp/src/qmf/AgentSession.cpp
+++ b/cpp/src/qmf/AgentSession.cpp
@@ -19,132 +19,7 @@
*
*/
-#include "qpid/RefCounted.h"
-#include "qmf/PrivateImplRef.h"
-#include "qmf/exceptions.h"
-#include "qmf/AgentSession.h"
-#include "qmf/AgentEventImpl.h"
-#include "qmf/SchemaIdImpl.h"
-#include "qmf/SchemaImpl.h"
-#include "qmf/DataAddrImpl.h"
-#include "qmf/DataImpl.h"
-#include "qmf/QueryImpl.h"
-#include "qmf/agentCapability.h"
-#include "qmf/constants.h"
-#include "qpid/sys/Mutex.h"
-#include "qpid/sys/Condition.h"
-#include "qpid/sys/Thread.h"
-#include "qpid/sys/Runnable.h"
-#include "qpid/log/Statement.h"
-#include "qpid/messaging/Connection.h"
-#include "qpid/messaging/Session.h"
-#include "qpid/messaging/Receiver.h"
-#include "qpid/messaging/Sender.h"
-#include "qpid/messaging/Message.h"
-#include "qpid/messaging/AddressParser.h"
-#include "qpid/management/Buffer.h"
-#include <queue>
-#include <map>
-#include <set>
-#include <iostream>
-#include <memory>
-
-using namespace std;
-using namespace qpid::messaging;
-using namespace qmf;
-using qpid::types::Variant;
-
-namespace qmf {
- class AgentSessionImpl : public virtual qpid::RefCounted, public qpid::sys::Runnable {
- public:
- ~AgentSessionImpl();
-
- //
- // Methods from API handle
- //
- AgentSessionImpl(Connection& c, const string& o);
- void setDomain(const string& d) { checkOpen(); domain = d; }
- void setVendor(const string& v) { checkOpen(); attributes["_vendor"] = v; }
- void setProduct(const string& p) { checkOpen(); attributes["_product"] = p; }
- void setInstance(const string& i) { checkOpen(); attributes["_instance"] = i; }
- void setAttribute(const string& k, const qpid::types::Variant& v) { checkOpen(); attributes[k] = v; }
- const string& getName() const { return agentName; }
- void open();
- void close();
- bool nextEvent(AgentEvent& e, Duration t);
-
- void registerSchema(Schema& s);
- DataAddr addData(Data& d, const string& n, bool persist);
- void delData(const DataAddr&);
-
- void authAccept(AgentEvent& e);
- void authReject(AgentEvent& e, const string& m);
- void raiseException(AgentEvent& e, const string& s);
- void raiseException(AgentEvent& e, const Data& d);
- void response(AgentEvent& e, const Data& d);
- void complete(AgentEvent& e);
- void methodSuccess(AgentEvent& e);
- void raiseEvent(const Data& d);
- void raiseEvent(const Data& d, int s);
-
- private:
- typedef map<DataAddr, Data, DataAddrCompare> DataIndex;
- typedef map<SchemaId, Schema, SchemaIdCompare> SchemaMap;
-
- mutable qpid::sys::Mutex lock;
- qpid::sys::Condition cond;
- Connection connection;
- Session session;
- Sender directSender;
- Sender topicSender;
- string domain;
- Variant::Map attributes;
- Variant::Map options;
- string agentName;
- bool opened;
- queue<AgentEvent> eventQueue;
- qpid::sys::Thread* thread;
- bool threadCanceled;
- uint32_t bootSequence;
- uint32_t interval;
- uint64_t lastHeartbeat;
- uint64_t lastVisit;
- bool forceHeartbeat;
- bool externalStorage;
- bool autoAllowQueries;
- bool autoAllowMethods;
- uint32_t maxSubscriptions;
- uint32_t minSubInterval;
- uint32_t subLifetime;
- bool publicEvents;
- bool listenOnDirect;
- bool strictSecurity;
- uint64_t schemaUpdateTime;
- string directBase;
- string topicBase;
-
- SchemaMap schemata;
- DataIndex globalIndex;
- map<SchemaId, DataIndex, SchemaIdCompareNoHash> schemaIndex;
-
- void checkOpen();
- void setAgentName();
- void enqueueEvent(const AgentEvent&);
- void handleLocateRequest(const Variant::List& content, const Message& msg);
- void handleMethodRequest(const Variant::Map& content, const Message& msg);
- void handleQueryRequest(const Variant::Map& content, const Message& msg);
- void handleSchemaRequest(AgentEvent&);
- void handleV1SchemaRequest(qpid::management::Buffer&, uint32_t, const Message&);
- void dispatch(Message);
- void sendHeartbeat();
- void send(Message, const Address&);
- void flushResponses(AgentEvent&, bool);
- void periodicProcessing(uint64_t);
- void run();
- };
-}
-
-typedef qmf::PrivateImplRef<AgentSession> PI;
+#include "qmf/AgentSessionImpl.h"
AgentSession::AgentSession(AgentSessionImpl* impl) { PI::ctor(*this, impl); }
AgentSession::AgentSession(const AgentSession& s) : qmf::Handle<AgentSessionImpl>() { PI::copy(*this, s); }
@@ -161,6 +36,7 @@ const string& AgentSession::getName() const { return impl->getName(); }
void AgentSession::open() { impl->open(); }
void AgentSession::close() { impl->close(); }
bool AgentSession::nextEvent(AgentEvent& e, Duration t) { return impl->nextEvent(e, t); }
+int AgentSession::pendingEvents() const { return impl->pendingEvents(); }
void AgentSession::registerSchema(Schema& s) { impl->registerSchema(s); }
DataAddr AgentSession::addData(Data& d, const string& n, bool p) { return impl->addData(d, n, p); }
void AgentSession::delData(const DataAddr& a) { impl->delData(a); }
@@ -179,11 +55,11 @@ void AgentSession::raiseEvent(const Data& d, int s) { impl->raiseEvent(d, s); }
//========================================================================================
AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) :
- connection(c), domain("default"), opened(false), thread(0), threadCanceled(false),
+ connection(c), domain("default"), opened(false), eventNotifier(0), thread(0), threadCanceled(false),
bootSequence(1), interval(60), lastHeartbeat(0), lastVisit(0), forceHeartbeat(false),
externalStorage(false), autoAllowQueries(true), autoAllowMethods(true),
maxSubscriptions(64), minSubInterval(3000), subLifetime(300), publicEvents(true),
- listenOnDirect(true), strictSecurity(false),
+ listenOnDirect(true), strictSecurity(false), maxThreadWaitTime(5),
schemaUpdateTime(uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())))
{
//
@@ -244,7 +120,14 @@ AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) :
iter = optMap.find("strict-security");
if (iter != optMap.end())
strictSecurity = iter->second.asBool();
+
+ iter = optMap.find("max-thread-wait-time");
+ if (iter != optMap.end())
+ maxThreadWaitTime = iter->second.asUint32();
}
+
+ if (maxThreadWaitTime > interval)
+ maxThreadWaitTime = interval;
}
@@ -252,6 +135,11 @@ AgentSessionImpl::~AgentSessionImpl()
{
if (opened)
close();
+
+ if (thread) {
+ thread->join();
+ delete thread;
+ }
}
@@ -260,6 +148,12 @@ void AgentSessionImpl::open()
if (opened)
throw QmfException("The session is already open");
+ // If the thread exists, join and delete it before creating a new one.
+ if (thread) {
+ thread->join();
+ delete thread;
+ }
+
const string addrArgs(";{create:never,node:{type:topic}}");
const string routableAddr("direct-agent.route." + qpid::types::Uuid(true).str());
attributes["_direct_subject"] = routableAddr;
@@ -297,34 +191,47 @@ void AgentSessionImpl::open()
}
-void AgentSessionImpl::close()
+void AgentSessionImpl::closeAsync()
{
if (!opened)
return;
- // Stop and join the receiver thread
+ // Stop the receiver thread. Don't join it until the destructor is called or open() is called.
threadCanceled = true;
- thread->join();
- delete thread;
-
- // Close the AMQP session
- session.close();
opened = false;
}
+void AgentSessionImpl::close()
+{
+ closeAsync();
+
+ if (thread) {
+ thread->join();
+ delete thread;
+ thread = 0;
+ }
+}
+
+
bool AgentSessionImpl::nextEvent(AgentEvent& event, Duration timeout)
{
uint64_t milliseconds = timeout.getMilliseconds();
qpid::sys::Mutex::ScopedLock l(lock);
- if (eventQueue.empty())
- cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(),
- qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC)));
+ if (eventQueue.empty() && milliseconds > 0) {
+ int64_t nsecs(qpid::sys::TIME_INFINITE);
+ if ((uint64_t)(nsecs / 1000000) > milliseconds)
+ nsecs = (int64_t) milliseconds * 1000000;
+ qpid::sys::Duration then(nsecs);
+ cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), then));
+ }
if (!eventQueue.empty()) {
event = eventQueue.front();
eventQueue.pop();
+ if (eventQueue.empty())
+ alertEventNotifierLH(false);
return true;
}
@@ -332,6 +239,26 @@ bool AgentSessionImpl::nextEvent(AgentEvent& event, Duration timeout)
}
+int AgentSessionImpl::pendingEvents() const
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ return eventQueue.size();
+}
+
+
+void AgentSessionImpl::setEventNotifier(EventNotifierImpl* notifier)
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ eventNotifier = notifier;
+}
+
+EventNotifierImpl* AgentSessionImpl::getEventNotifier() const
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ return eventNotifier;
+}
+
+
void AgentSessionImpl::registerSchema(Schema& schema)
{
if (!schema.isFinalized())
@@ -587,8 +514,10 @@ void AgentSessionImpl::enqueueEvent(const AgentEvent& event)
qpid::sys::Mutex::ScopedLock l(lock);
bool notify = eventQueue.empty();
eventQueue.push(event);
- if (notify)
+ if (notify) {
cond.notify();
+ alertEventNotifierLH(true);
+ }
}
@@ -1032,6 +961,13 @@ void AgentSessionImpl::periodicProcessing(uint64_t seconds)
}
+void AgentSessionImpl::alertEventNotifierLH(bool readable)
+{
+ if (eventNotifier)
+ eventNotifier->setReadable(readable);
+}
+
+
void AgentSessionImpl::run()
{
QPID_LOG(debug, "AgentSession thread started for agent " << agentName);
@@ -1041,7 +977,7 @@ void AgentSessionImpl::run()
periodicProcessing((uint64_t) qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()) / qpid::sys::TIME_SEC);
Receiver rx;
- bool valid = session.nextReceiver(rx, Duration::SECOND);
+ bool valid = session.nextReceiver(rx, Duration::SECOND * maxThreadWaitTime);
if (threadCanceled)
break;
if (valid) {
@@ -1058,6 +994,19 @@ void AgentSessionImpl::run()
enqueueEvent(AgentEvent(new AgentEventImpl(AGENT_THREAD_FAILED)));
}
+ session.close();
QPID_LOG(debug, "AgentSession thread exiting for agent " << agentName);
}
+
+AgentSessionImpl& AgentSessionImplAccess::get(AgentSession& session)
+{
+ return *session.impl;
+}
+
+
+const AgentSessionImpl& AgentSessionImplAccess::get(const AgentSession& session)
+{
+ return *session.impl;
+}
+
diff --git a/cpp/src/qmf/AgentSessionImpl.h b/cpp/src/qmf/AgentSessionImpl.h
new file mode 100644
index 0000000000..ae512a4054
--- /dev/null
+++ b/cpp/src/qmf/AgentSessionImpl.h
@@ -0,0 +1,175 @@
+#ifndef __QMF_AGENT_SESSION_IMPL_H
+#define __QMF_AGENT_SESSION_IMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/RefCounted.h"
+#include "qmf/PrivateImplRef.h"
+#include "qmf/exceptions.h"
+#include "qmf/AgentSession.h"
+#include "qmf/AgentEventImpl.h"
+#include "qmf/EventNotifierImpl.h"
+#include "qpid/messaging/Connection.h"
+#include "qpid/sys/Runnable.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Condition.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/Runnable.h"
+#include "qpid/log/Statement.h"
+#include "qpid/messaging/Connection.h"
+#include "qpid/messaging/Session.h"
+#include "qpid/messaging/Receiver.h"
+#include "qpid/messaging/Sender.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/AddressParser.h"
+#include "qpid/management/Buffer.h"
+#include "qpid/RefCounted.h"
+#include "qmf/PrivateImplRef.h"
+#include "qmf/AgentSession.h"
+#include "qmf/exceptions.h"
+#include "qmf/AgentSession.h"
+#include "qmf/SchemaIdImpl.h"
+#include "qmf/SchemaImpl.h"
+#include "qmf/DataAddrImpl.h"
+#include "qmf/DataImpl.h"
+#include "qmf/QueryImpl.h"
+#include "qmf/agentCapability.h"
+#include "qmf/constants.h"
+
+#include <queue>
+#include <map>
+#include <iostream>
+#include <memory>
+
+using namespace std;
+using namespace qpid::messaging;
+using namespace qmf;
+using qpid::types::Variant;
+
+typedef qmf::PrivateImplRef<AgentSession> PI;
+
+namespace qmf {
+ class AgentSessionImpl : public virtual qpid::RefCounted, public qpid::sys::Runnable {
+ public:
+ ~AgentSessionImpl();
+
+ //
+ // Methods from API handle
+ //
+ AgentSessionImpl(Connection& c, const string& o);
+ void setDomain(const string& d) { checkOpen(); domain = d; }
+ void setVendor(const string& v) { checkOpen(); attributes["_vendor"] = v; }
+ void setProduct(const string& p) { checkOpen(); attributes["_product"] = p; }
+ void setInstance(const string& i) { checkOpen(); attributes["_instance"] = i; }
+ void setAttribute(const string& k, const qpid::types::Variant& v) { checkOpen(); attributes[k] = v; }
+ const string& getName() const { return agentName; }
+ void open();
+ void closeAsync();
+ void close();
+ bool nextEvent(AgentEvent& e, Duration t);
+ int pendingEvents() const;
+
+ void setEventNotifier(EventNotifierImpl* eventNotifier);
+ EventNotifierImpl* getEventNotifier() const;
+
+ void registerSchema(Schema& s);
+ DataAddr addData(Data& d, const string& n, bool persist);
+ void delData(const DataAddr&);
+
+ void authAccept(AgentEvent& e);
+ void authReject(AgentEvent& e, const string& m);
+ void raiseException(AgentEvent& e, const string& s);
+ void raiseException(AgentEvent& e, const Data& d);
+ void response(AgentEvent& e, const Data& d);
+ void complete(AgentEvent& e);
+ void methodSuccess(AgentEvent& e);
+ void raiseEvent(const Data& d);
+ void raiseEvent(const Data& d, int s);
+
+ private:
+ typedef map<DataAddr, Data, DataAddrCompare> DataIndex;
+ typedef map<SchemaId, Schema, SchemaIdCompare> SchemaMap;
+
+ mutable qpid::sys::Mutex lock;
+ qpid::sys::Condition cond;
+ Connection connection;
+ Session session;
+ Sender directSender;
+ Sender topicSender;
+ string domain;
+ Variant::Map attributes;
+ Variant::Map options;
+ string agentName;
+ bool opened;
+ queue<AgentEvent> eventQueue;
+ EventNotifierImpl* eventNotifier;
+ qpid::sys::Thread* thread;
+ bool threadCanceled;
+ uint32_t bootSequence;
+ uint32_t interval;
+ uint64_t lastHeartbeat;
+ uint64_t lastVisit;
+ bool forceHeartbeat;
+ bool externalStorage;
+ bool autoAllowQueries;
+ bool autoAllowMethods;
+ uint32_t maxSubscriptions;
+ uint32_t minSubInterval;
+ uint32_t subLifetime;
+ bool publicEvents;
+ bool listenOnDirect;
+ bool strictSecurity;
+ uint32_t maxThreadWaitTime;
+ uint64_t schemaUpdateTime;
+ string directBase;
+ string topicBase;
+
+ SchemaMap schemata;
+ DataIndex globalIndex;
+ map<SchemaId, DataIndex, SchemaIdCompareNoHash> schemaIndex;
+
+ void checkOpen();
+ void setAgentName();
+ void enqueueEvent(const AgentEvent&);
+ void alertEventNotifierLH(bool readable);
+ void handleLocateRequest(const Variant::List& content, const Message& msg);
+ void handleMethodRequest(const Variant::Map& content, const Message& msg);
+ void handleQueryRequest(const Variant::Map& content, const Message& msg);
+ void handleSchemaRequest(AgentEvent&);
+ void handleV1SchemaRequest(qpid::management::Buffer&, uint32_t, const Message&);
+ void dispatch(Message);
+ void sendHeartbeat();
+ void send(Message, const Address&);
+ void flushResponses(AgentEvent&, bool);
+ void periodicProcessing(uint64_t);
+ void run();
+ };
+
+ struct AgentSessionImplAccess {
+ static AgentSessionImpl& get(AgentSession& session);
+ static const AgentSessionImpl& get(const AgentSession& session);
+ };
+}
+
+
+#endif
+
diff --git a/cpp/src/qmf/ConsoleSession.cpp b/cpp/src/qmf/ConsoleSession.cpp
index e12c1152f6..2dfc894c58 100644
--- a/cpp/src/qmf/ConsoleSession.cpp
+++ b/cpp/src/qmf/ConsoleSession.cpp
@@ -54,6 +54,7 @@ void ConsoleSession::setAgentFilter(const string& f) { impl->setAgentFilter(f);
void ConsoleSession::open() { impl->open(); }
void ConsoleSession::close() { impl->close(); }
bool ConsoleSession::nextEvent(ConsoleEvent& e, Duration t) { return impl->nextEvent(e, t); }
+int ConsoleSession::pendingEvents() const { return impl->pendingEvents(); }
uint32_t ConsoleSession::getAgentCount() const { return impl->getAgentCount(); }
Agent ConsoleSession::getAgent(uint32_t i) const { return impl->getAgent(i); }
Agent ConsoleSession::getConnectedBrokerAgent() const { return impl->getConnectedBrokerAgent(); }
@@ -65,9 +66,9 @@ Subscription ConsoleSession::subscribe(const string& q, const string& f, const s
//========================================================================================
ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) :
- connection(c), domain("default"), maxAgentAgeMinutes(5),
- opened(false), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0),
- connectedBrokerInAgentList(false), schemaCache(new SchemaCache())
+ connection(c), domain("default"), maxAgentAgeMinutes(5), listenOnDirect(true), strictSecurity(false), maxThreadWaitTime(5),
+ opened(false), eventNotifier(0), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0),
+ connectedBrokerInAgentList(false), schemaCache(new SchemaCache()), nextCorrelator(1)
{
if (!options.empty()) {
qpid::messaging::AddressParser parser(options);
@@ -91,7 +92,14 @@ ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) :
iter = optMap.find("strict-security");
if (iter != optMap.end())
strictSecurity = iter->second.asBool();
+
+ iter = optMap.find("max-thread-wait-time");
+ if (iter != optMap.end())
+ maxThreadWaitTime = iter->second.asUint32();
}
+
+ if (maxThreadWaitTime > 60)
+ maxThreadWaitTime = 60;
}
@@ -99,6 +107,11 @@ ConsoleSessionImpl::~ConsoleSessionImpl()
{
if (opened)
close();
+
+ if (thread) {
+ thread->join();
+ delete thread;
+ }
}
@@ -153,6 +166,12 @@ void ConsoleSessionImpl::open()
if (opened)
throw QmfException("The session is already open");
+ // If the thread exists, join and delete it before creating a new one.
+ if (thread) {
+ thread->join();
+ delete thread;
+ }
+
// Establish messaging addresses
directBase = "qmf." + domain + ".direct";
topicBase = "qmf." + domain + ".topic";
@@ -181,45 +200,57 @@ void ConsoleSessionImpl::open()
// Start the receiver thread
threadCanceled = false;
+ opened = true;
thread = new qpid::sys::Thread(*this);
// Send an agent_locate to direct address 'broker' to identify the connected-broker-agent.
sendBrokerLocate();
if (agentQuery)
sendAgentLocate();
-
- opened = true;
}
-void ConsoleSessionImpl::close()
+void ConsoleSessionImpl::closeAsync()
{
if (!opened)
throw QmfException("The session is already closed");
- // Stop and join the receiver thread
+ // Stop the receiver thread. Don't join it until the destructor is called or open() is called.
threadCanceled = true;
- thread->join();
- delete thread;
-
- // Close the AMQP session
- session.close();
opened = false;
}
+void ConsoleSessionImpl::close()
+{
+ closeAsync();
+
+ if (thread) {
+ thread->join();
+ delete thread;
+ thread = 0;
+ }
+}
+
+
bool ConsoleSessionImpl::nextEvent(ConsoleEvent& event, Duration timeout)
{
uint64_t milliseconds = timeout.getMilliseconds();
qpid::sys::Mutex::ScopedLock l(lock);
- if (eventQueue.empty())
- cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(),
- qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC)));
+ if (eventQueue.empty() && milliseconds > 0) {
+ int64_t nsecs(qpid::sys::TIME_INFINITE);
+ if ((uint64_t)(nsecs / 1000000) > milliseconds)
+ nsecs = (int64_t) milliseconds * 1000000;
+ qpid::sys::Duration then(nsecs);
+ cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), then));
+ }
if (!eventQueue.empty()) {
event = eventQueue.front();
eventQueue.pop();
+ if (eventQueue.empty())
+ alertEventNotifierLH(false);
return true;
}
@@ -227,6 +258,27 @@ bool ConsoleSessionImpl::nextEvent(ConsoleEvent& event, Duration timeout)
}
+int ConsoleSessionImpl::pendingEvents() const
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ return eventQueue.size();
+}
+
+
+void ConsoleSessionImpl::setEventNotifier(EventNotifierImpl* notifier)
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ eventNotifier = notifier;
+}
+
+
+EventNotifierImpl* ConsoleSessionImpl::getEventNotifier() const
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ return eventNotifier;
+}
+
+
uint32_t ConsoleSessionImpl::getAgentCount() const
{
qpid::sys::Mutex::ScopedLock l(lock);
@@ -268,8 +320,10 @@ void ConsoleSessionImpl::enqueueEventLH(const ConsoleEvent& event)
{
bool notify = eventQueue.empty();
eventQueue.push(event);
- if (notify)
+ if (notify) {
cond.notify();
+ alertEventNotifierLH(true);
+ }
}
@@ -421,7 +475,23 @@ void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Varian
iter = content.find("_values");
if (iter == content.end())
return;
- Variant::Map attrs(iter->second.asMap());
+ const Variant::Map& in_attrs(iter->second.asMap());
+ Variant::Map attrs;
+
+ //
+ // Copy the map from the message to "attrs". Translate any old-style
+ // keys to their new key values in the process.
+ //
+ for (iter = in_attrs.begin(); iter != in_attrs.end(); iter++) {
+ if (iter->first == "epoch")
+ attrs[protocol::AGENT_ATTR_EPOCH] = iter->second;
+ else if (iter->first == "timestamp")
+ attrs[protocol::AGENT_ATTR_TIMESTAMP] = iter->second;
+ else if (iter->first == "heartbeat_interval")
+ attrs[protocol::AGENT_ATTR_HEARTBEAT_INTERVAL] = iter->second;
+ else
+ attrs[iter->first] = iter->second;
+ }
iter = attrs.find(protocol::AGENT_ATTR_EPOCH);
if (iter != attrs.end())
@@ -562,6 +632,13 @@ void ConsoleSessionImpl::periodicProcessing(uint64_t seconds)
}
+void ConsoleSessionImpl::alertEventNotifierLH(bool readable)
+{
+ if (eventNotifier)
+ eventNotifier->setReadable(readable);
+}
+
+
void ConsoleSessionImpl::run()
{
QPID_LOG(debug, "ConsoleSession thread started");
@@ -572,7 +649,7 @@ void ConsoleSessionImpl::run()
qpid::sys::TIME_SEC);
Receiver rx;
- bool valid = session.nextReceiver(rx, Duration::SECOND);
+ bool valid = session.nextReceiver(rx, Duration::SECOND * maxThreadWaitTime);
if (threadCanceled)
break;
if (valid) {
@@ -589,6 +666,18 @@ void ConsoleSessionImpl::run()
enqueueEvent(ConsoleEvent(new ConsoleEventImpl(CONSOLE_THREAD_FAILED)));
}
+ session.close();
QPID_LOG(debug, "ConsoleSession thread exiting");
}
+
+ConsoleSessionImpl& ConsoleSessionImplAccess::get(ConsoleSession& session)
+{
+ return *session.impl;
+}
+
+
+const ConsoleSessionImpl& ConsoleSessionImplAccess::get(const ConsoleSession& session)
+{
+ return *session.impl;
+}
diff --git a/cpp/src/qmf/ConsoleSessionImpl.h b/cpp/src/qmf/ConsoleSessionImpl.h
index 675c8bcfb5..e2b30602fa 100644
--- a/cpp/src/qmf/ConsoleSessionImpl.h
+++ b/cpp/src/qmf/ConsoleSessionImpl.h
@@ -27,6 +27,7 @@
#include "qmf/SchemaId.h"
#include "qmf/Schema.h"
#include "qmf/ConsoleEventImpl.h"
+#include "qmf/EventNotifierImpl.h"
#include "qmf/SchemaCache.h"
#include "qmf/Query.h"
#include "qpid/sys/Mutex.h"
@@ -41,9 +42,13 @@
#include "qpid/messaging/Address.h"
#include "qpid/management/Buffer.h"
#include "qpid/types/Variant.h"
+
+#include <boost/shared_ptr.hpp>
#include <map>
#include <queue>
+using namespace std;
+
namespace qmf {
class ConsoleSessionImpl : public virtual qpid::RefCounted, public qpid::sys::Runnable {
public:
@@ -56,8 +61,14 @@ namespace qmf {
void setDomain(const std::string& d) { domain = d; }
void setAgentFilter(const std::string& f);
void open();
+ void closeAsync();
void close();
bool nextEvent(ConsoleEvent& e, qpid::messaging::Duration t);
+ int pendingEvents() const;
+
+ void setEventNotifier(EventNotifierImpl* notifier);
+ EventNotifierImpl* getEventNotifier() const;
+
uint32_t getAgentCount() const;
Agent getAgent(uint32_t i) const;
Agent getConnectedBrokerAgent() const { return connectedBrokerAgent; }
@@ -75,9 +86,11 @@ namespace qmf {
uint32_t maxAgentAgeMinutes;
bool listenOnDirect;
bool strictSecurity;
+ uint32_t maxThreadWaitTime;
Query agentQuery;
bool opened;
std::queue<ConsoleEvent> eventQueue;
+ EventNotifierImpl* eventNotifier;
qpid::sys::Thread* thread;
bool threadCanceled;
uint64_t lastVisit;
@@ -89,6 +102,8 @@ namespace qmf {
std::string directBase;
std::string topicBase;
boost::shared_ptr<SchemaCache> schemaCache;
+ qpid::sys::Mutex corrlock;
+ uint32_t nextCorrelator;
void enqueueEvent(const ConsoleEvent&);
void enqueueEventLH(const ConsoleEvent&);
@@ -98,10 +113,17 @@ namespace qmf {
void handleAgentUpdate(const std::string&, const qpid::types::Variant::Map&, const qpid::messaging::Message&);
void handleV1SchemaResponse(qpid::management::Buffer&, uint32_t, const qpid::messaging::Message&);
void periodicProcessing(uint64_t);
+ void alertEventNotifierLH(bool readable);
void run();
+ uint32_t correlator() { qpid::sys::Mutex::ScopedLock l(corrlock); return nextCorrelator++; }
friend class AgentImpl;
};
+
+ struct ConsoleSessionImplAccess {
+ static ConsoleSessionImpl& get(ConsoleSession& session);
+ static const ConsoleSessionImpl& get(const ConsoleSession& session);
+ };
}
#endif
diff --git a/cpp/src/qmf/DataAddr.cpp b/cpp/src/qmf/DataAddr.cpp
index fb51d5787f..d16e12062e 100644
--- a/cpp/src/qmf/DataAddr.cpp
+++ b/cpp/src/qmf/DataAddr.cpp
@@ -36,7 +36,9 @@ DataAddr::~DataAddr() { PI::dtor(*this); }
DataAddr& DataAddr::operator=(const DataAddr& s) { return PI::assign(*this, s); }
bool DataAddr::operator==(const DataAddr& o) { return *impl == *o.impl; }
+bool DataAddr::operator==(const DataAddr& o) const { return *impl == *o.impl; }
bool DataAddr::operator<(const DataAddr& o) { return *impl < *o.impl; }
+bool DataAddr::operator<(const DataAddr& o) const { return *impl < *o.impl; }
DataAddr::DataAddr(const qpid::types::Variant::Map& m) { PI::ctor(*this, new DataAddrImpl(m)); }
DataAddr::DataAddr(const string& n, const string& a, uint32_t e) { PI::ctor(*this, new DataAddrImpl(n, a, e)); }
@@ -45,7 +47,7 @@ const string& DataAddr::getAgentName() const { return impl->getAgentName(); }
uint32_t DataAddr::getAgentEpoch() const { return impl->getAgentEpoch(); }
Variant::Map DataAddr::asMap() const { return impl->asMap(); }
-bool DataAddrImpl::operator==(const DataAddrImpl& other)
+bool DataAddrImpl::operator==(const DataAddrImpl& other) const
{
return
agentName == other.agentName &&
@@ -54,7 +56,7 @@ bool DataAddrImpl::operator==(const DataAddrImpl& other)
}
-bool DataAddrImpl::operator<(const DataAddrImpl& other)
+bool DataAddrImpl::operator<(const DataAddrImpl& other) const
{
if (agentName < other.agentName) return true;
if (agentName > other.agentName) return false;
diff --git a/cpp/src/qmf/DataAddrImpl.h b/cpp/src/qmf/DataAddrImpl.h
index 3f9cae9453..11d512f0c4 100644
--- a/cpp/src/qmf/DataAddrImpl.h
+++ b/cpp/src/qmf/DataAddrImpl.h
@@ -38,8 +38,8 @@ namespace qmf {
//
// Methods from API handle
//
- bool operator==(const DataAddrImpl&);
- bool operator<(const DataAddrImpl&);
+ bool operator==(const DataAddrImpl&) const;
+ bool operator<(const DataAddrImpl&) const;
DataAddrImpl(const qpid::types::Variant::Map&);
DataAddrImpl(const std::string& _name, const std::string& _agentName, uint32_t _agentEpoch=0) :
agentName(_agentName), name(_name), agentEpoch(_agentEpoch) {}
diff --git a/cpp/src/qmf/EventNotifierImpl.cpp b/cpp/src/qmf/EventNotifierImpl.cpp
new file mode 100644
index 0000000000..20114aaa5e
--- /dev/null
+++ b/cpp/src/qmf/EventNotifierImpl.cpp
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/EventNotifierImpl.h"
+#include "qmf/AgentSessionImpl.h"
+#include "qmf/ConsoleSessionImpl.h"
+
+EventNotifierImpl::EventNotifierImpl(AgentSession& agentSession)
+ : readable(false), agent(agentSession)
+{
+ AgentSessionImplAccess::get(agent).setEventNotifier(this);
+}
+
+
+EventNotifierImpl::EventNotifierImpl(ConsoleSession& consoleSession)
+ : readable(false), console(consoleSession)
+{
+ ConsoleSessionImplAccess::get(console).setEventNotifier(this);
+}
+
+
+EventNotifierImpl::~EventNotifierImpl()
+{
+ if (agent.isValid())
+ AgentSessionImplAccess::get(agent).setEventNotifier(NULL);
+ if (console.isValid())
+ ConsoleSessionImplAccess::get(console).setEventNotifier(NULL);
+}
+
+void EventNotifierImpl::setReadable(bool readable)
+{
+ update(readable);
+ this->readable = readable;
+}
+
+
+bool EventNotifierImpl::isReadable() const
+{
+ return this->readable;
+}
diff --git a/cpp/src/qmf/EventNotifierImpl.h b/cpp/src/qmf/EventNotifierImpl.h
new file mode 100644
index 0000000000..d85f9979d2
--- /dev/null
+++ b/cpp/src/qmf/EventNotifierImpl.h
@@ -0,0 +1,48 @@
+#ifndef __QMF_EVENT_NOTIFIER_IMPL_H
+#define __QMF_EVENT_NOTIFIER_IMPL_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/AgentSession.h"
+#include "qmf/ConsoleSession.h"
+
+namespace qmf
+{
+ class EventNotifierImpl {
+ private:
+ bool readable;
+ AgentSession agent;
+ ConsoleSession console;
+
+ public:
+ EventNotifierImpl(AgentSession& agentSession);
+ EventNotifierImpl(ConsoleSession& consoleSession);
+ virtual ~EventNotifierImpl();
+
+ void setReadable(bool readable);
+ bool isReadable() const;
+
+ protected:
+ virtual void update(bool readable) = 0;
+ };
+}
+
+#endif
+
diff --git a/cpp/src/qmf/PosixEventNotifier.cpp b/cpp/src/qmf/PosixEventNotifier.cpp
new file mode 100644
index 0000000000..a364cc155d
--- /dev/null
+++ b/cpp/src/qmf/PosixEventNotifier.cpp
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/posix/EventNotifier.h"
+#include "qmf/PosixEventNotifierImpl.h"
+#include "qmf/PrivateImplRef.h"
+
+using namespace qmf;
+using namespace std;
+
+typedef qmf::PrivateImplRef<posix::EventNotifier> PI;
+
+posix::EventNotifier::EventNotifier(PosixEventNotifierImpl* impl) { PI::ctor(*this, impl); }
+
+posix::EventNotifier::EventNotifier(AgentSession& agentSession)
+{
+ PI::ctor(*this, new PosixEventNotifierImpl(agentSession));
+}
+
+
+posix::EventNotifier::EventNotifier(ConsoleSession& consoleSession)
+{
+ PI::ctor(*this, new PosixEventNotifierImpl(consoleSession));
+}
+
+
+posix::EventNotifier::EventNotifier(const posix::EventNotifier& that)
+ : Handle<PosixEventNotifierImpl>()
+{
+ PI::copy(*this, that);
+}
+
+
+posix::EventNotifier::~EventNotifier()
+{
+ PI::dtor(*this);
+}
+
+posix::EventNotifier& posix::EventNotifier::operator=(const posix::EventNotifier& that)
+{
+ return PI::assign(*this, that);
+}
+
+
+int posix::EventNotifier::getHandle() const
+{
+ return impl->getHandle();
+}
+
diff --git a/cpp/src/qmf/PosixEventNotifierImpl.cpp b/cpp/src/qmf/PosixEventNotifierImpl.cpp
new file mode 100644
index 0000000000..011dbcc214
--- /dev/null
+++ b/cpp/src/qmf/PosixEventNotifierImpl.cpp
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "PosixEventNotifierImpl.h"
+#include "qpid/log/Statement.h"
+
+#include <fcntl.h>
+#include <unistd.h>
+#include <errno.h>
+
+#define BUFFER_SIZE 10
+
+using namespace qmf;
+
+PosixEventNotifierImpl::PosixEventNotifierImpl(AgentSession& agentSession)
+ : EventNotifierImpl(agentSession)
+{
+ openHandle();
+}
+
+
+PosixEventNotifierImpl::PosixEventNotifierImpl(ConsoleSession& consoleSession)
+ : EventNotifierImpl(consoleSession)
+{
+ openHandle();
+}
+
+
+PosixEventNotifierImpl::~PosixEventNotifierImpl()
+{
+ closeHandle();
+}
+
+
+void PosixEventNotifierImpl::update(bool readable)
+{
+ char buffer[BUFFER_SIZE];
+
+ if(readable && !this->isReadable()) {
+ if (::write(myHandle, "1", 1) == -1)
+ QPID_LOG(error, "PosixEventNotifierImpl::update write failed: " << errno);
+ }
+ else if(!readable && this->isReadable()) {
+ if (::read(yourHandle, buffer, BUFFER_SIZE) == -1)
+ QPID_LOG(error, "PosixEventNotifierImpl::update read failed: " << errno);
+ }
+}
+
+
+void PosixEventNotifierImpl::openHandle()
+{
+ int pair[2];
+
+ if(::pipe(pair) == -1)
+ throw QmfException("Unable to open event notifier handle.");
+
+ yourHandle = pair[0];
+ myHandle = pair[1];
+
+ int flags;
+
+ flags = ::fcntl(yourHandle, F_GETFL);
+ if((::fcntl(yourHandle, F_SETFL, flags | O_NONBLOCK)) == -1)
+ throw QmfException("Unable to make remote handle non-blocking.");
+
+ flags = ::fcntl(myHandle, F_GETFL);
+ if((::fcntl(myHandle, F_SETFL, flags | O_NONBLOCK)) == -1)
+ throw QmfException("Unable to make local handle non-blocking.");
+}
+
+
+void PosixEventNotifierImpl::closeHandle()
+{
+ if(myHandle > 0) {
+ ::close(myHandle);
+ myHandle = -1;
+ }
+
+ if(yourHandle > 0) {
+ ::close(yourHandle);
+ yourHandle = -1;
+ }
+}
+
+
+PosixEventNotifierImpl& PosixEventNotifierImplAccess::get(posix::EventNotifier& notifier)
+{
+ return *notifier.impl;
+}
+
+
+const PosixEventNotifierImpl& PosixEventNotifierImplAccess::get(const posix::EventNotifier& notifier)
+{
+ return *notifier.impl;
+}
+
diff --git a/cpp/src/qmf/PosixEventNotifierImpl.h b/cpp/src/qmf/PosixEventNotifierImpl.h
new file mode 100644
index 0000000000..c8a7446bd5
--- /dev/null
+++ b/cpp/src/qmf/PosixEventNotifierImpl.h
@@ -0,0 +1,61 @@
+#ifndef __QMF_POSIX_EVENT_NOTIFIER_IMPL_H
+#define __QMF_POSIX_EVENT_NOTIFIER_IMPL_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/posix/EventNotifier.h"
+#include "qmf/EventNotifierImpl.h"
+#include "qpid/RefCounted.h"
+
+namespace qmf
+{
+ class AgentSession;
+ class ConsoleSession;
+
+ class PosixEventNotifierImpl : public EventNotifierImpl, public virtual qpid::RefCounted
+ {
+ public:
+ PosixEventNotifierImpl(AgentSession& agentSession);
+ PosixEventNotifierImpl(ConsoleSession& consoleSession);
+ virtual ~PosixEventNotifierImpl();
+
+ int getHandle() const { return yourHandle; }
+
+ private:
+ int myHandle;
+ int yourHandle;
+
+ void openHandle();
+ void closeHandle();
+
+ protected:
+ void update(bool readable);
+ };
+
+ struct PosixEventNotifierImplAccess
+ {
+ static PosixEventNotifierImpl& get(posix::EventNotifier& notifier);
+ static const PosixEventNotifierImpl& get(const posix::EventNotifier& notifier);
+ };
+
+}
+
+#endif
+
diff --git a/cpp/src/qmf/PrivateImplRef.h b/cpp/src/qmf/PrivateImplRef.h
index 8b698c4199..960cbb2e09 100644
--- a/cpp/src/qmf/PrivateImplRef.h
+++ b/cpp/src/qmf/PrivateImplRef.h
@@ -23,8 +23,8 @@
*/
#include "qmf/ImportExport.h"
-#include <boost/intrusive_ptr.hpp>
#include "qpid/RefCounted.h"
+#include <boost/intrusive_ptr.hpp>
namespace qmf {
diff --git a/cpp/src/qmf/engine/ResilientConnection.cpp b/cpp/src/qmf/engine/ResilientConnection.cpp
index ab65b8d768..41dd9ff00c 100644
--- a/cpp/src/qmf/engine/ResilientConnection.cpp
+++ b/cpp/src/qmf/engine/ResilientConnection.cpp
@@ -334,8 +334,7 @@ void ResilientConnectionImpl::notify()
{
if (notifyFd != -1)
{
- int unused_ret; //Suppress warnings about ignoring return value.
- unused_ret = ::write(notifyFd, ".", 1);
+ (void) ::write(notifyFd, ".", 1);
}
}
@@ -432,8 +431,7 @@ void ResilientConnectionImpl::EnqueueEvent(ResilientConnectionEvent::EventKind k
if (notifyFd != -1)
{
- int unused_ret; //Suppress warnings about ignoring return value.
- unused_ret = ::write(notifyFd, ".", 1);
+ (void) ::write(notifyFd, ".", 1);
}
}
diff --git a/cpp/src/qmf/engine/SchemaImpl.cpp b/cpp/src/qmf/engine/SchemaImpl.cpp
index e0948a9911..9d363d3012 100644
--- a/cpp/src/qmf/engine/SchemaImpl.cpp
+++ b/cpp/src/qmf/engine/SchemaImpl.cpp
@@ -35,17 +35,17 @@ using qpid::framing::Uuid;
SchemaHash::SchemaHash()
{
for (int idx = 0; idx < 16; idx++)
- hash[idx] = 0x5A;
+ hash.b[idx] = 0x5A;
}
void SchemaHash::encode(Buffer& buffer) const
{
- buffer.putBin128(hash);
+ buffer.putBin128(hash.b);
}
void SchemaHash::decode(Buffer& buffer)
{
- buffer.getBin128(hash);
+ buffer.getBin128(hash.b);
}
void SchemaHash::update(uint8_t data)
@@ -55,9 +55,8 @@ void SchemaHash::update(uint8_t data)
void SchemaHash::update(const char* data, uint32_t len)
{
- uint64_t* first = (uint64_t*) hash;
- uint64_t* second = (uint64_t*) hash + 1;
-
+ uint64_t* first = &hash.q[0];
+ uint64_t* second = &hash.q[1];
for (uint32_t idx = 0; idx < len; idx++) {
*first = *first ^ (uint64_t) data[idx];
*second = *second << 1;
diff --git a/cpp/src/qmf/engine/SchemaImpl.h b/cpp/src/qmf/engine/SchemaImpl.h
index 8b079a5ec6..683fb6f8f0 100644
--- a/cpp/src/qmf/engine/SchemaImpl.h
+++ b/cpp/src/qmf/engine/SchemaImpl.h
@@ -35,7 +35,10 @@ namespace engine {
// they've been registered.
class SchemaHash {
- uint8_t hash[16];
+ union h {
+ uint8_t b[16];
+ uint64_t q[2];
+ } hash;
public:
SchemaHash();
void encode(qpid::framing::Buffer& buffer) const;
@@ -47,7 +50,7 @@ namespace engine {
void update(Direction d) { update((uint8_t) d); }
void update(Access a) { update((uint8_t) a); }
void update(bool b) { update((uint8_t) (b ? 1 : 0)); }
- const uint8_t* get() const { return hash; }
+ const uint8_t* get() const { return hash.b; }
bool operator==(const SchemaHash& other) const;
bool operator<(const SchemaHash& other) const;
bool operator>(const SchemaHash& other) const;