summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CMakeLists.txt2
-rw-r--r--lib/abstractroutingengine.h21
-rw-r--r--lib/vehicleproperty.h1
-rw-r--r--plugins/obd2plugin/obd2source.cpp200
-rw-r--r--plugins/obd2plugin/obd2source.h1
-rw-r--r--plugins/websocketsink/test/api.js30
-rw-r--r--plugins/websocketsink/websocketsinkmanager.cpp134
-rw-r--r--plugins/websocketsink/websocketsinkmanager.h1
-rw-r--r--plugins/websocketsourceplugin/websocketsource.cpp119
-rw-r--r--plugins/websocketsourceplugin/websocketsource.h1
10 files changed, 456 insertions, 54 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index e0d30983..0d4faae8 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -6,7 +6,7 @@ set(CMAKE_BUILD_TYPE, Debug)
include(FindPkgConfig)
set(PROJECT_NAME "automotive-message-broker")
-set(PROJECT_VERSION "0.4.0")
+set(PROJECT_VERSION "0.5.0")
add_definitions(-DPROJECT_VERSION="${PROJECT_VERSION}")
add_definitions(-DPROJECT_NAME="${PROJECT_NAME}")
diff --git a/lib/abstractroutingengine.h b/lib/abstractroutingengine.h
index f39503e1..cca57b15 100644
--- a/lib/abstractroutingengine.h
+++ b/lib/abstractroutingengine.h
@@ -40,8 +40,13 @@ typedef std::function<void (AsyncRangePropertyReply*)> GetRangedPropertyComplete
class PropertyValueTime {
public:
+ ~PropertyValueTime()
+ {
+ delete value;
+ }
+
AbstractPropertyType* value;
- time_t timestamp;
+ double timestamp;
};
class AsyncPropertyRequest
@@ -122,8 +127,8 @@ public:
VehicleProperty::Property property;
GetRangedPropertyCompletedSignal completed;
- time_t begin;
- time_t end;
+ double begin;
+ double end;
};
class AsyncRangePropertyReply: public AsyncRangePropertyRequest
@@ -135,6 +140,16 @@ public:
}
+ ~AsyncRangePropertyReply()
+ {
+ for(auto itr = values.begin(); itr != values.end(); itr++)
+ {
+ delete (*itr);
+ }
+
+ values.clear();
+ }
+
std::list<PropertyValueTime*> values;
bool success;
};
diff --git a/lib/vehicleproperty.h b/lib/vehicleproperty.h
index 5b4348b6..faeebf59 100644
--- a/lib/vehicleproperty.h
+++ b/lib/vehicleproperty.h
@@ -26,6 +26,7 @@
#include <set>
#include <sstream>
#include <map>
+#include <functional>
#include <abstractpropertytype.h>
diff --git a/plugins/obd2plugin/obd2source.cpp b/plugins/obd2plugin/obd2source.cpp
index 3fe54b03..f2b4446f 100644
--- a/plugins/obd2plugin/obd2source.cpp
+++ b/plugins/obd2plugin/obd2source.cpp
@@ -121,8 +121,13 @@ void threadLoop(gpointer data)
ObdPid::ByteArray replyVector;
std::string reply;
std::string port;
+<<<<<<< HEAD
+ int baud;
+ bool connected = false;
+=======
std::string baud;
bool connected=false;
+>>>>>>> 250035b8916580d198760b9e068ee39dce8bcece
while (true)
{
//gpointer query = g_async_queue_pop(privCommandQueue);
@@ -154,6 +159,50 @@ void threadLoop(gpointer data)
DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Command:" << req->req << endl;
if (req->req == "connect")
{
+<<<<<<< HEAD
+ //printf("First: %s\nSecond: %s\n",req->arg.substr(0,req->arg.find(':')).c_str(),req->arg.substr(req->arg.find(':')+1).c_str());
+ port = req->arg.substr(0,req->arg.find(':'));
+ baud = boost::lexical_cast<int>(req->arg.substr(req->arg.find(':')+1));
+ obd->openPort(port.c_str(),baud);
+
+ obd->sendObdRequestString("ATZ\r",4,&replyVector,500,3);
+ for (unsigned int i=0;i<replyVector.size();i++)
+ {
+ reply += replyVector[i];
+ }
+ if (reply.find("ELM") == -1)
+ {
+ //No reply found
+ //printf("Error!\n");
+ DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error resetting ELM\n";
+ }
+ else
+ {
+ //printf("Reply to reset: %s\n",reply.c_str());
+ }
+ if (!sendElmCommand(obd,"ATSP0"))
+ {
+ //printf("Error sending echo\n");
+ DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error setting auto protocol"<<endl;
+ }
+ if (!sendElmCommand(obd,"ATE0"))
+ {
+ //printf("Error sending echo\n");
+ DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error turning off echo"<<endl;
+ }
+ if (!sendElmCommand(obd,"ATH0"))
+ {
+ //printf("Error sending headers off\n");
+ DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error turning off headers"<<endl;
+ }
+ if (!sendElmCommand(obd,"ATL0"))
+ {
+ //printf("Error turning linefeeds off\n");
+ DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error turning off linefeeds"<<endl;
+ }
+ connected = true;
+ }
+=======
connect(obd,req->arglist[0],req->arglist[1]);
connected = true;
}
@@ -162,6 +211,7 @@ void threadLoop(gpointer data)
port = req->arglist[0];
baud = req->arglist[1];
}
+>>>>>>> 250035b8916580d198760b9e068ee39dce8bcece
else if (req->req == "disconnect")
{
obd->closePort();
@@ -206,6 +256,63 @@ void threadLoop(gpointer data)
{
repeatReqList.push_back(*i);
}
+<<<<<<< HEAD
+ if (repeatReqList.size() == 0)
+ {
+ //Nothing in the queue, we should disconnect and sit idle.
+ if (connected)
+ {
+ ObdRequest *requ = new ObdRequest();
+ requ->req = "disconnect";
+ g_async_queue_push(privCommandQueue,requ);
+ }
+ }
+ else
+ {
+ if (!connected)
+ {
+ //Things in the request queue, but we aren't connected. Queue up a connect.
+ ObdRequest *requ = new ObdRequest();
+ requ->req = "connect";
+ requ->arg = port + ":" + baud;
+ g_async_queue_push(privCommandQueue,requ);
+ }
+ }
+ if (connected)
+ {
+ for (std::list<std::string>::iterator i=repeatReqList.begin();i!= repeatReqList.end();i++)
+ {
+ //printf("Req: %s\n",(*i).c_str());
+ if ((*i) == "ATRV\r")
+ {
+ //printf("Requesting voltage...\n");
+ if (!obd->sendObdRequestString((*i).c_str(),(*i).length(),&replyVector))
+ {
+ //printf("Unable to request voltage!!!\n");
+ DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Unable to request voltage!\n";
+ continue;
+ }
+ std::string replystring = "";
+ for (int j=0;j<replyVector.size();j++)
+ {
+ replystring += replyVector[j];
+ }
+ //printf("Voltage reply: %s\n",replystring.c_str());
+ replystring.substr(0,replystring.find("V"));
+ ObdReply *rep = new ObdReply();
+ rep->req = "ATRV\r";
+ rep->reply = replystring;
+ g_async_queue_push(privResponseQueue,rep);
+ }
+ if (!obd->sendObdRequest((*i).c_str(),(*i).length(),&replyVector))
+ {
+ //printf("Error sending obd2 request\n");
+ DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Error sending OBD2 request\n";
+ continue;
+ }
+ //printf("Reply: %i %i\n",replyVector[0],replyVector[1]);
+ if (replyVector[0] == 0x41)
+=======
for (std::list<ObdPid*>::iterator i=repeatReqList.begin();i!= repeatReqList.end();i++)
{
if (!obd->sendObdRequestString((*i)->pid.c_str(),(*i)->pid.length(),&replyVector))
@@ -264,18 +371,89 @@ void threadLoop(gpointer data)
//VIN number reply
string vinstring;
for (int j=0;j<replyVector.size();j++)
+>>>>>>> 250035b8916580d198760b9e068ee39dce8bcece
{
- if(replyVector[j] == 0x49 && replyVector[j+1] == 0x02)
+ if (replyVector[1] == 0x0C)
{
- //We're at a reply header
- j+=3;
+ double rpm = ((replyVector[2] << 8) + replyVector[3]) / 4.0;
+ ObdReply *rep = new ObdReply();
+ rep->req = "0C";
+ rep->property = VehicleProperty::EngineSpeed;
+ rep->reply = boost::lexical_cast<string>(rpm);
+ g_async_queue_push(privResponseQueue,rep);
+ //printf("RPM: %f\n",rpm);
}
- if (replyVector[j] != 0x00)
+ else if (replyVector[1] == 0x0D)
{
- vinstring += (char)replyVector[j];
- //printf("VIN: %i %c\n",replyVector[j],replyVector[j]);
+ int mph = replyVector[2];
+ ObdReply *rep = new ObdReply();
+ rep->req = "0D";
+ rep->property = VehicleProperty::VehicleSpeed;
+ rep->reply = boost::lexical_cast<string>(mph);
+ g_async_queue_push(privResponseQueue,rep);
}
+ else if (replyVector[1] == 0x05)
+ {
+ int temp = replyVector[2] - 40;
+ ObdReply *rep = new ObdReply();
+ rep->req = "05";
+ rep->property = VehicleProperty::EngineCoolantTemperature;
+ rep->reply = boost::lexical_cast<string>(temp);
+ g_async_queue_push(privResponseQueue,rep);
+ }
+ else if (replyVector[1] == 0x10)
+ {
+ double maf = ((replyVector[2] << 8) + replyVector[3]) / 100.0;
+ ObdReply *rep = new ObdReply();
+ rep->req = "10";
+ rep->property = VehicleProperty::MassAirFlow;
+ rep->reply = boost::lexical_cast<string>(maf);
+ g_async_queue_push(privResponseQueue,rep);
+ }
+ else
+ {
+ //printf("Unknown response type: %i\n",replyVector[1]);
+ DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Unknown response type" << replyVector[1] << endl;
+ }
+ }
+<<<<<<< HEAD
+ else if (replyVector[0] == 0x49)
+ {
+ /*
+ 49 02 01 00 00 00 31
+ 49 02 02 47 31 4A 43
+ 49 02 03 35 34 34 34
+ 49 02 04 52 37 32 35
+ 49 02 05 32 33 36 37
+ */
+ //VIN number reply
+ string vinstring;
+ for (int j=0;j<replyVector.size();j++)
+ {
+ if(replyVector[j] == 0x49 && replyVector[j+1] == 0x02)
+ {
+ //We're at a reply header
+ j+=3;
+ }
+ if (replyVector[j] != 0x00)
+ {
+ vinstring += (char)replyVector[j];
+ //printf("VIN: %i %c\n",replyVector[j],replyVector[j]);
+ }
+ }
+ ObdReply *rep = new ObdReply();
+ rep->req = "0902";
+ rep->reply = vinstring;
+ g_async_queue_push(privResponseQueue,rep);
+ //printf("VIN Number: %i %s\n",replyVector.size(),vinstring.c_str());
+
}
+
+ //DebugOut()<<"Reply: "<<replyVector[2]<<" "<<replyVector[3]<<endl;
+ }
+ }
+ else
+=======
/*ObdReply *rep = new ObdReply();
rep->req = "0902";
rep->reply = vinstring;
@@ -284,6 +462,7 @@ void threadLoop(gpointer data)
//DebugOut()<<"Reply: "<<replyVector[2]<<" "<<replyVector[3]<<endl;
}
if (!connected)
+>>>>>>> 250035b8916580d198760b9e068ee39dce8bcece
{
usleep(10000);
}
@@ -651,8 +830,17 @@ void OBD2Source::unsubscribeToPropertyChanges(VehicleProperty::Property property
return;
}
+<<<<<<< HEAD
+ Obd2Amb obd2amb;
+ ObdRequest *requ = new ObdRequest();
+ requ->property = property;
+ requ->req = obd2amb.propertyPidMap[property];
+ g_async_queue_push(subscriptionRemoveQueue,requ);
+
+=======
ObdPid *pid = obd2AmbInstance->createPidforProperty(property);
g_async_queue_push(subscriptionRemoveQueue,pid);
+>>>>>>> 250035b8916580d198760b9e068ee39dce8bcece
}
diff --git a/plugins/obd2plugin/obd2source.h b/plugins/obd2plugin/obd2source.h
index a37b694a..6c628706 100644
--- a/plugins/obd2plugin/obd2source.h
+++ b/plugins/obd2plugin/obd2source.h
@@ -142,6 +142,7 @@ public:
std::string m_port;
std::string m_baud;
map<VehicleProperty::Property,AsyncPropertyReply*> propertyReplyMap;
+ list<VehicleProperty::Property> propertySubscriptionList;
void updateProperty(VehicleProperty::Property property,AbstractPropertyType *value);
obdLib * obd;
diff --git a/plugins/websocketsink/test/api.js b/plugins/websocketsink/test/api.js
index 3a6f3359..5c0d8362 100644
--- a/plugins/websocketsink/test/api.js
+++ b/plugins/websocketsink/test/api.js
@@ -44,6 +44,19 @@
* form of data[n].name/data[n].value
* errorCB: error callback, called with error message string
*
+* Function name: getHistory(event, startTime, endTime, successCB, errorCB)
+* Description:
+* Retrieves a list of event/value pairs for a target list of event names
+* Required arguments:
+* event: event to read
+* startTime: start date/time
+* endTime: end date/time
+* successCB: success callback, gets called with the event/value pair list
+* for all requested events. The list is the in the
+* form of data[n].name/data[n].value
+* errorCB: error callback, called with error message string
+*
+*
* Function name: set(eventlist, valuelist, successCB, errorCB)
* Description:
* Sets a gourp of event's values (triggers error on read-only events)
@@ -245,6 +258,19 @@ Vehicle.prototype.get = function(namelist, successCB, errorCB)
this.send(obj, successCB, errorCB);
}
+Vehicle.prototype.getHistory = function(event, startTime, endTime, successCB, errorCB)
+{
+ var obj = {
+ "type" : "method",
+ "name": "getHistory",
+ "transactionid" : this.generateTransactionId(),
+ "data" : [event, (startTime.getTime()/1000).toString(), (endTime.getTime()/1000).toString()]
+ };
+
+ this.send(obj, successCB, errorCB);
+
+}
+
Vehicle.prototype.set = function(namelist, valuelist, successCB, errorCB)
{
if((namelist.length != valuelist.length)||(namelist.length <= 0))
@@ -312,8 +338,8 @@ Vehicle.prototype.receive = function(msg)
return;
}
- if((event == undefined)||(event.type == undefined)||
- (event.name == undefined))
+ if((event === undefined)||(event.type === undefined)||
+ (event.name === undefined))
{
self.iErrorCB("BADLY FORMED MESSAGE: "+msg);
return;
diff --git a/plugins/websocketsink/websocketsinkmanager.cpp b/plugins/websocketsink/websocketsinkmanager.cpp
index 38755014..a9a2d608 100644
--- a/plugins/websocketsink/websocketsinkmanager.cpp
+++ b/plugins/websocketsink/websocketsinkmanager.cpp
@@ -80,6 +80,7 @@ void WebSocketSinkManager::setConfiguration(map<string, string> config)
}
context = libwebsocket_create_context(port, interface.c_str(), protocollist,libwebsocket_internal_extensions,ssl_cert_path, ssl_key_path, -1, -1, options);
}
+
void WebSocketSinkManager::addSingleShotSink(libwebsocket* socket, VehicleProperty::Property property,string id)
{
AsyncPropertyRequest velocityRequest;
@@ -122,7 +123,7 @@ void WebSocketSinkManager::addSingleShotSink(libwebsocket* socket, VehicleProper
//TODO: Dirty hack hardcoded stuff, jsut to make it work.
string tmpstr = "";
tmpstr = property;
- s << "{\"type\":\"methodReply\",\"name\":\"get\",\"data\":[{\"name\":\"" << tmpstr << "\",\"value\":\"" << reply->value->toString() << "\"}],\"transactionid\":\"" << id << "\"}";
+ s << "{\"type\":\"methodReply\",\"name\":\"get\",\"data\":[{\"property\":\"" << tmpstr << "\",\"value\":\"" << reply->value->toString() << "\"}],\"transactionid\":\"" << id << "\"}";
string replystr = s.str();
//printf("Reply: %s\n",replystr.c_str());
@@ -136,11 +137,88 @@ void WebSocketSinkManager::addSingleShotSink(libwebsocket* socket, VehicleProper
//TODO: run valgrind on this. libwebsocket's documentation says NOTHING about this, yet malloc insists it's true.
//delete new_response; <- Unneeded. Apparently libwebsocket free's it.
delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING); //Needs to subtract pre-padding, to get back to the start of the pointer.
-
+ delete reply;
};
AsyncPropertyReply* reply = routingEngine->getPropertyAsync(velocityRequest);
}
+
+void WebSocketSinkManager::addSingleShotRangedSink(libwebsocket* socket, VehicleProperty::Property property, double start, double end, string id)
+{
+ AsyncRangePropertyRequest rangedRequest;
+
+ rangedRequest.begin = start;
+ rangedRequest.end = end;
+
+ if (property == "running_status_speedometer")
+ {
+ rangedRequest.property = VehicleProperty::VehicleSpeed;
+ }
+ else if (property == "running_status_engine_speed")
+ {
+ rangedRequest.property = VehicleProperty::EngineSpeed;
+ }
+ else if (property == "running_status_steering_wheel_angle")
+ {
+ rangedRequest.property = VehicleProperty::SteeringWheelAngle;
+ }
+ else if (property == "running_status_transmission_gear_status")
+ {
+ rangedRequest.property = VehicleProperty::TransmissionShiftPosition;
+ }
+ else
+ {
+ PropertyList foo = VehicleProperty::capabilities();
+ if (ListPlusPlus<VehicleProperty::Property>(&foo).contains(property))
+ {
+ rangedRequest.property = property;
+ }
+ else
+ {
+ DebugOut(0)<<"websocketsink: Invalid property requested: "<<property;
+ return;
+ }
+
+ }
+ rangedRequest.completed = [socket,id](AsyncRangePropertyReply* reply)
+ {
+ stringstream s;
+
+ //TODO: Dirty hack hardcoded stuff, jsut to make it work.
+ stringstream data ("[");
+ std::list<PropertyValueTime*> values = reply->values;
+ for(auto itr = values.begin(); itr != values.end(); itr++)
+ {
+ if(itr != values.begin())
+ {
+ data<<",";
+ }
+
+ data << "{ \"value\" : " << "\"" << (*itr)->value->toString() << "\", \"time\" : \"" << (*itr)->timestamp << "\" }";
+ }
+
+ data<<"]";
+
+ s << "{\"type\":\"methodReply\",\"name\":\"getHistory\",\"data\":"<<data<<",\"transactionid\":\"" << id << "\"}";
+
+ string replystr = s.str();
+ //printf("Reply: %s\n",replystr.c_str());
+ DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
+
+ char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
+ new_response+=LWS_SEND_BUFFER_PRE_PADDING;
+ strcpy(new_response,replystr.c_str());
+ libwebsocket_write(socket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
+
+ //TODO: run valgrind on this. libwebsocket's documentation says NOTHING about this, yet malloc insists it's true.
+ //delete new_response; <- Unneeded. Apparently libwebsocket free's it.
+ delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING); //Needs to subtract pre-padding, to get back to the start of the pointer.
+ delete reply;
+ };
+
+ AsyncRangePropertyReply* reply = routingEngine->getRangePropertyAsync(rangedRequest);
+}
+
void WebSocketSinkManager::removeSink(libwebsocket* socket,VehicleProperty::Property property, string uuid)
{
if (m_sinkMap.find(property) != m_sinkMap.end())
@@ -397,7 +475,7 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb
name = json_reader_get_string_value(reader);
json_reader_end_member(reader);
- list<string> data;
+ vector<string> data;
list<string> key;
list<string> value;
json_reader_read_member(reader,"data");
@@ -471,30 +549,6 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb
//GetProperty is going to be a singleshot sink.
//string arg = arguments.front();
sinkManager->addSingleShotSink(wsi,data.front(),id);
- /*if (data.front()== "running_status_speedometer")
- {
- sinkManager->addSingleShotSink(wsi,VehicleProperty::VehicleSpeed,id);
- }
- else if (data.front() == "running_status_engine_speed")
- {
- sinkManager->addSingleShotSink(wsi,VehicleProperty::EngineSpeed,id);
- }
- else if (data.front() == "running_status_steering_wheel_angle")
- {
- sinkManager->addSingleShotSink(wsi,VehicleProperty::SteeringWheelAngle,id);
- }
- else if (data.front() == "running_status_transmission_gear_status")
- {
- sinkManager->addSingleShotSink(wsi,VehicleProperty::TransmissionShiftPosition,id);
- }
- else
- {
- PropertyList foo = VehicleProperty::capabilities();
- if (ListPlusPlus<VehicleProperty::Property>(&foo).contains(data.front()))
- {
- sinkManager->addSingleShotSink(wsi,data.front(),id);
- }
- }*/
}
else
{
@@ -531,7 +585,7 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb
else if (name == "subscribe")
{
//Websocket wants to subscribe to an event, data.front();
- for (list<string>::iterator i=data.begin();i!=data.end();i++)
+ for (auto i=data.begin();i!=data.end();i++)
{
sinkManager->addSink(wsi,(*i),id);
}
@@ -539,11 +593,29 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb
else if (name == "unsubscribe")
{
//Websocket wants to unsubscribe to an event, data.front();
- for (list<string>::iterator i=data.begin();i!=data.end();i++)
+ for (auto i=data.begin();i!=data.end();i++)
{
sinkManager->removeSink(wsi,(*i),id);
}
}
+ else if (name == "getHistory")
+ {
+ if(data.size() == 3)
+ {
+ std::string property = data[0];
+ std::string startStr = data[1];
+ std::string endStr = data[2];
+
+ sinkManager->addSingleShotRangedSink(wsi,property,
+ boost::lexical_cast<double,std::string>(startStr),
+ boost::lexical_cast<double,std::string>(endStr), id);
+ }
+
+ else
+ {
+ //TODO: error, "invalid arguments" should be sent in reply to this.
+ }
+ }
else if (name == "getSupportedEventTypes")
{
//If data.front() dosen't contain a property name, return a list of properties supported.
@@ -602,6 +674,10 @@ static int websocket_callback(struct libwebsocket_context *context,struct libweb
libwebsocket_write(wsi, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
}
+ else
+ {
+ DebugOut(0)<<"Unknown method called."<<endl;
+ }
}
break;
}
diff --git a/plugins/websocketsink/websocketsinkmanager.h b/plugins/websocketsink/websocketsinkmanager.h
index 56a9163b..4a13caa5 100644
--- a/plugins/websocketsink/websocketsinkmanager.h
+++ b/plugins/websocketsink/websocketsinkmanager.h
@@ -35,6 +35,7 @@ class WebSocketSinkManager: public AbstractSinkManager
public:
WebSocketSinkManager(AbstractRoutingEngine* engine, map<string, string> config);
void addSingleShotSink(libwebsocket* socket, VehicleProperty::Property property,string id);
+ void addSingleShotRangedSink(libwebsocket* socket, VehicleProperty::Property property,double start, double end, string id);
void addSink(libwebsocket* socket, VehicleProperty::Property property,string uuid);
void disconnectAll(libwebsocket* socket);
void removeSink(libwebsocket* socket,VehicleProperty::Property property,string uuid);
diff --git a/plugins/websocketsourceplugin/websocketsource.cpp b/plugins/websocketsourceplugin/websocketsource.cpp
index faec6456..2a564ab3 100644
--- a/plugins/websocketsourceplugin/websocketsource.cpp
+++ b/plugins/websocketsourceplugin/websocketsource.cpp
@@ -194,27 +194,71 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket
json_reader_end_member(reader);
list<string> data;
- json_reader_read_member(reader,"data");
- if (json_reader_is_array(reader))
+ list<pair<string,string> > pairdata;
+ if (name == "get")
{
- for(int i=0; i < json_reader_count_elements(reader); i++)
+ json_reader_read_member(reader,"data");
+ if (json_reader_is_array(reader))
{
- json_reader_read_element(reader,i);
- string path = json_reader_get_string_value(reader);
- data.push_back(path);
- json_reader_end_element(reader);
+ for(int i=0; i < json_reader_count_elements(reader); i++)
+ {
+
+ pair<string,string> pair;
+ json_reader_read_element(reader,i);
+
+ json_reader_read_member(reader,"property");
+ pair.first = json_reader_get_string_value(reader);
+ json_reader_end_member(reader);
+
+ json_reader_read_member(reader,"value");
+ pair.second = json_reader_get_string_value(reader);
+ json_reader_end_member(reader);
+
+ json_reader_end_element(reader);
+
+ pairdata.push_back(pair);
+ }
+ }
+ else
+ {
+ pair<string,string> pair;
+
+ json_reader_read_member(reader,"property");
+ pair.first = json_reader_get_string_value(reader);
+ json_reader_end_member(reader);
+
+ json_reader_read_member(reader,"value");
+ pair.second = json_reader_get_string_value(reader);
+ json_reader_end_member(reader);
+
+ pairdata.push_back(pair);
}
+ json_reader_end_member(reader);
}
else
{
- string path = json_reader_get_string_value(reader);
- if (path != "")
+ json_reader_read_member(reader,"data");
+ if (json_reader_is_array(reader))
+ {
+ for(int i=0; i < json_reader_count_elements(reader); i++)
+ {
+ json_reader_read_element(reader,i);
+ string path = json_reader_get_string_value(reader);
+ data.push_back(path);
+ json_reader_end_element(reader);
+ }
+ }
+ else
{
- data.push_back(path);
+ string path = json_reader_get_string_value(reader);
+ if (path != "")
+ {
+ data.push_back(path);
+ }
}
+ json_reader_end_member(reader);
}
- json_reader_end_member(reader);
-
+
string id;
json_reader_read_member(reader,"transactionid");
if (strcmp("gchararray",g_type_name(json_node_get_value_type(json_reader_get_value(reader)))) == 0)
@@ -282,6 +326,23 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket
source->setSupported(props);
//m_re->updateSupported(m_supportedProperties,PropertyList());
}
+ else if (name == "get")
+ {
+
+ DebugOut() << __SMALLFILE__ << ":" << __LINE__ << "Got \"GET\" event:" << pairdata.size();
+ while (pairdata.size() > 0)
+ {
+ pair<string,string> pair = pairdata.front();
+ pairdata.pop_front();
+ if (source->propertyReplyMap.find(pair.first) != source->propertyReplyMap.end())
+ {
+ source->propertyReplyMap[pair.first]->value = VehicleProperty::getPropertyTypeForPropertyNameValue(source->propertyReplyMap[pair.first]->property,pair.second);
+ source->propertyReplyMap[pair.first]->completed(source->propertyReplyMap[pair.first]);
+ source->propertyReplyMap.erase(pair.first);
+ }
+ }
+ //data will contain a property/value map.
+ }
}
break;
}
@@ -355,6 +416,25 @@ void WebSocketSource::unsubscribeToPropertyChanges(VehicleProperty::Property pro
void WebSocketSource::getPropertyAsync(AsyncPropertyReply *reply)
{
///TODO: fill in
+ //s << "{\"type\":\"method\",\"name\":\"getSupportedEventTypes\",\"data\":[],\"transactionid\":\"" << "d293f670-f0b3-11e1-aff1-0800200c9a66" << "\"}";
+ //m_re->getPropertyAsync();
+ /*reply.value = 1;
+ reply->completed(reply);
+ reply->completed = [](AsyncPropertyReply* reply) {
+ DebugOut()<<"Velocity Async request completed: "<<reply->value->toString()<<endl;
+ delete reply;
+ };*/
+ propertyReplyMap[reply->property] = reply;
+ stringstream s;
+ s << "{\"type\":\"method\",\"name\":\"get\",\"data\":[\"" << reply->property << "\"],\"transactionid\":\"" << "d293f670-f0b3-11e1-aff1-0800200c9a66" << "\"}";
+ string replystr = s.str();
+ DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
+ //printf("Reply: %s\n",replystr.c_str());
+ char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
+ new_response+=LWS_SEND_BUFFER_PRE_PADDING;
+ strcpy(new_response,replystr.c_str());
+ libwebsocket_write(clientsocket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
+ delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
}
void WebSocketSource::getRangePropertyAsync(AsyncRangePropertyReply *reply)
@@ -365,7 +445,20 @@ void WebSocketSource::getRangePropertyAsync(AsyncRangePropertyReply *reply)
AsyncPropertyReply * WebSocketSource::setProperty( AsyncSetPropertyRequest request )
{
///TODO: fill in
- return NULL;
+ AsyncPropertyReply* reply = new AsyncPropertyReply(request);
+ reply->success = true;
+ stringstream s;
+ s << "{\"type\":\"method\",\"name\":\"set\",\"data\":[\"property\" : \"" << request.property << "\",\"value\" : \"" << request.value << "\"],\"transactionid\":\"" << "d293f670-f0b3-11e1-aff1-0800200c9a66" << "\"}";
+ string replystr = s.str();
+ DebugOut() << __SMALLFILE__ <<":"<< __LINE__ << "Reply:" << replystr << "\n";
+ //printf("Reply: %s\n",replystr.c_str());
+ char *new_response = new char[LWS_SEND_BUFFER_PRE_PADDING + strlen(replystr.c_str()) + LWS_SEND_BUFFER_POST_PADDING];
+ new_response+=LWS_SEND_BUFFER_PRE_PADDING;
+ strcpy(new_response,replystr.c_str());
+ libwebsocket_write(clientsocket, (unsigned char*)new_response, strlen(new_response), LWS_WRITE_TEXT);
+ delete (char*)(new_response-LWS_SEND_BUFFER_PRE_PADDING);
+ reply->completed(reply);
+ return reply;
}
extern "C" AbstractSource * create(AbstractRoutingEngine* routingengine, map<string, string> config)
diff --git a/plugins/websocketsourceplugin/websocketsource.h b/plugins/websocketsourceplugin/websocketsource.h
index 7854c806..3eaaaa58 100644
--- a/plugins/websocketsourceplugin/websocketsource.h
+++ b/plugins/websocketsourceplugin/websocketsource.h
@@ -50,6 +50,7 @@ public:
void propertyChanged(VehicleProperty::Property property, AbstractPropertyType* value, string uuid) {}
void supportedChanged(PropertyList) {}
void setConfiguration(map<string, string> config);
+ map<VehicleProperty::Property,AsyncPropertyReply*> propertyReplyMap;
private:
PropertyList m_supportedProperties;