summaryrefslogtreecommitdiff
path: root/components/service_edge/src/service_edge_rpc.erl
diff options
context:
space:
mode:
Diffstat (limited to 'components/service_edge/src/service_edge_rpc.erl')
-rw-r--r--components/service_edge/src/service_edge_rpc.erl161
1 files changed, 111 insertions, 50 deletions
diff --git a/components/service_edge/src/service_edge_rpc.erl b/components/service_edge/src/service_edge_rpc.erl
index cff7fad..ccac474 100644
--- a/components/service_edge/src/service_edge_rpc.erl
+++ b/components/service_edge/src/service_edge_rpc.erl
@@ -2,7 +2,7 @@
%% Copyright (C) 2014, Jaguar Land Rover
%%
%% This program is licensed under the terms and conditions of the
-%% Mozilla Public License, version 2.0. The full text of the
+% Mozilla Public License, version 2.0. The full text of the
%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/
%%
@@ -12,11 +12,8 @@
-export([handle_rpc/2]).
-export([handle_notification/2]).
--export([wse_register_service/2]).
--export([wse_unregister_service/2]).
--export([wse_get_available_services/1]).
--export([wse_message/5]).
--export([wse_message/4]).
+-export([handle_websocket/3]).
+
-export([start_link/0]).
@@ -39,6 +36,7 @@
service_unavailable/3]).
+
%%-include_lib("lhttpc/include/lhttpc.hrl").
-include_lib("lager/include/log.hrl").
@@ -120,16 +118,18 @@ start_websocket() ->
Port ->
%% FIXME: MONITOR AND RESTART
- wse_server:start(Port, proplists:delete(port, WSOpts)),
+ wse_server:start(Port,
+ ?MODULE, handle_websocket, undefined,
+ [{type, text} | proplists:delete(port, WSOpts)]),
ok
end
end.
+
%% Invoked by service_discovery to announce service availability
%% Must be handled either as a JSON-RPC call or a gen_server call.
-
service_available(CompSpec, SvcName, DataLinkModule) ->
rvi_common:notification(service_edge, ?MODULE,
service_available,
@@ -164,44 +164,63 @@ handle_local_timeout(CompSpec, SvcName, TransID) ->
+handle_websocket(WSock, Mesg, Arg) ->
+ { ok, Method } = rvi_common:get_json_element(["method"], Mesg),
+ { ok, Params } = rvi_common:get_json_element(["params"], Mesg),
+ { ok, ID } = rvi_common:get_json_element(["id"], Mesg),
-%% Websocket interface
-wse_register_service(Ws, SvcName ) ->
- ?debug("service_edge_rpc:wse_register_service(~p) service: ~p", [ Ws, SvcName ]),
- gen_server:call(?SERVER, { rvi, register_local_service, [ SvcName, "ws:" ++ pid_to_list(Ws)]}),
- { ok, [ { status, rvi_common:json_rpc_status(ok)} ]}.
+ ?debug("service_edge_rpc:handle_websocket(~p/~p) method: ~p", [ WSock, ID,Method ]),
-wse_unregister_service(Ws, SvcName ) ->
- ?debug("service_edge_rpc:wse_unregister_service(~p) service: ~p", [ Ws, SvcName ]),
- gen_server:call(?SERVER, { rvi, unregister_local_service, [ SvcName ]}),
- { ok, [ { status, rvi_common:json_rpc_status(ok)} ]}.
+ case handle_ws_json_rpc(WSock, Method, {array,Params}, Arg) of
+ ok -> ok;
+ {ok, Reply} ->
+ EncReply = binary_to_list(iolist_to_binary(exo_json:encode({struct, [ { id, ID} |Reply]}))),
+ ?debug("service_edge_rpc:handle_websocket(~p/~p) reply: ~s", [ WSock, ID, EncReply]),
+ wse_server:send(WSock, list_to_binary(EncReply))
+ end,
+ ok.
-wse_get_available_services(_Ws ) ->
- ?debug("service_edge_rpc:wse_get_available_services()"),
- [ Services ] = gen_server:call(?SERVER, { rvi, get_available_services, []}),
- { ok, [ { status, rvi_common:json_rpc_status(ok)},
- { services, Services}] }.
-
+%% Websocket interface
+handle_ws_json_rpc(WSock, "message", Params, _Arg ) ->
+ { ok, SvcName } = rvi_common:get_json_element(["service_name"], Params),
+ { ok, Timeout } = rvi_common:get_json_element(["timeout"], Params),
+ { ok, Parameters } = rvi_common:get_json_element(["parameters"], Params),
-wse_message(Ws, SvcName, Timeout, JSONParameters) ->
- %% Parameters are delivered as JSON. Decode into tuple
- { ok, Parameters } = exo_json:decode_string(JSONParameters),
- ?debug("service_edge_rpc:wse_message(~p) SvcName: ~p", [ Ws, SvcName ]),
- ?debug("service_edge_rpc:wse_message(~p) Timeout: ~p", [ Ws, Timeout]),
- ?debug("service_edge_rpc:wse_message(~p) Parameters: ~p", [ Ws, Parameters ]),
+ ?debug("service_edge_rpc:handle_websocket(~p) params!: ~p", [ WSock, Params ]),
+ ?debug("service_edge_rpc:handle_websocket(~p) service: ~p", [ WSock, SvcName ]),
+ ?debug("service_edge_rpc:handle_websocket(~p) parameters: ~p", [ WSock, Parameters ]),
[ Res, TID ] = gen_server:call(?SERVER, { rvi, handle_local_message,
- [ SvcName, Timeout, Parameters]}),
+ [ SvcName, Timeout, [{struct, Parameters}]]}),
- ?debug("service_edge_rpc:wse_message(~p) Res: ~p", [ Ws, Res ]),
+ ?debug("service_edge_rpc:wse_message(~p) Res: ~p", [ WSock, Res ]),
{ ok, [ { status, rvi_common:json_rpc_status(Res) },
- { transaction_id, TID} ] }.
+ { transaction_id, TID} ] };
-%% Deprecated
-wse_message(Ws, SvcName, Timeout, JSONParameters, _CallingService) ->
- wse_message(Ws, SvcName, Timeout, JSONParameters).
+handle_ws_json_rpc(WSock, "register_service", Params,_Arg ) ->
+ { ok, SvcName } = rvi_common:get_json_element(["service_name"], Params),
+ ?debug("service_edge_rpc:websocket_register(~p) service: ~p", [ WSock, SvcName ]),
+ [ok, FullSvcName ] = gen_server:call(?SERVER,
+ { rvi,
+ register_local_service,
+ [ SvcName,
+ "ws:" ++ pid_to_list(WSock)]}),
+
+ { ok, [ { status, rvi_common:json_rpc_status(ok)},
+ { service, FullSvcName }]};
+handle_ws_json_rpc(WSock, "unregister_service", Params, _Arg ) ->
+ { ok, SvcName } = rvi_common:get_json_element(["service_name"], Params),
+ ?debug("service_edge_rpc:websocket_unregister(~p) service: ~p", [ WSock, SvcName ]),
+ gen_server:call(?SERVER, { rvi, unregister_local_service, [ SvcName ]}),
+ { ok, [ { status, rvi_common:json_rpc_status(ok)} ]};
+
+handle_ws_json_rpc(_Ws , "get_available_services", _Params, _Arg ) ->
+ ?debug("service_edge_rpc:websocket_get_available()"),
+ [ Services ] = gen_server:call(?SERVER, { rvi, get_available_services, []}),
+ { ok, [ { status, rvi_common:json_rpc_status(ok)},
+ { services, Services}] }.
%% Invoked by locally connected services.
@@ -312,10 +331,11 @@ handle_notification(Other, _Args) ->
%% connected services that uses the same HTTP port to transmit their
%% register_service, and message calls.
handle_call({ rvi, register_local_service, [SvcName, URL] }, _From, St) ->
- ?debug("service_edge_rpc:register_local_service(): service: ~p ", [SvcName]),
- ?debug("service_edge_rpc:register_local_service(): address: ~p ", [URL]),
+ ?debug("service_edge_rpc:register_local_service(): service: ~p ", [SvcName]),
+ ?debug("service_edge_rpc:register_local_service(): address: ~p ", [URL]),
FullSvcName = rvi_common:local_service_to_string(SvcName),
+ ?debug("service_edge_rpc:register_local_service(): full name: ~p ", [FullSvcName]),
ets:insert(?SERVICE_TABLE, #service_entry {
service = FullSvcName,
@@ -348,6 +368,13 @@ handle_call({rvi, get_available_services, []}, _From, St) ->
?debug("service_edge_rpc:get_available_services()"),
{reply, service_discovery_rpc:get_all_services(St#st.cs), St};
+
+%%CRASH13:43:57.370 [debug] service_edge_rpc:local_msg: parameters: [{struct,"{"value":"3"}"}]
+
+%%13:43:57.370 [debug] service_edge_rpc:local_msg: parameters: [{struct,"{"value":"3"}"}]
+%% [{struct,[{"a","b"}]}]
+%% 13:48:12.943 [debug] service_edge_rpc:local_msg: parameters: [{struct,[{"a","b"}]}]
+
handle_call({ rvi, handle_local_message,
[SvcName, TimeoutArg, Parameters] }, _From, St) ->
?debug("service_edge_rpc:local_msg: service_name: ~p", [SvcName]),
@@ -418,11 +445,13 @@ handle_call(Other, _From, St) ->
handle_cast({rvi, service_available, [SvcName, _DataLinkModule] }, St) ->
+ ?debug("service_edge_rpc: Service available: ~p:", [ SvcName]),
announce_service_availability(available, SvcName),
{ noreply, St };
handle_cast({rvi, service_unavailable, [SvcName, _DataLinkModule] }, St) ->
+ ?debug("service_edge_rpc: Service unavailable: ~p:", [ SvcName]),
announce_service_availability(unavailable, SvcName),
{ noreply, St };
@@ -527,13 +556,23 @@ flatten_ws_args(Args) ->
flatten_ws_args(Args, []).
+json_rpc_notification(Method, Parameters) ->
+ iolist_to_binary(
+ exo_json:encode(
+ {struct,
+ [ { "json-rpc", "2.0"},
+ { "method", Method },
+ { "params", {struct, Parameters}}
+ ]})).
+
dispatch_to_local_service([ $w, $s, $: | WSPidStr], services_available,
{struct, [{ services, { array, Services}}]} ) ->
?info("service_edge:dispatch_to_local_service(service_available, websock, ~p): ~p",
[ WSPidStr, Services]),
- wse:call(list_to_pid(WSPidStr), wse:window(),
- "services_available",
- [ "services", Services ]),
+ wse_server:send(list_to_pid(WSPidStr),
+ json_rpc_notification("services_available",
+ [{"services", {array, Services}}])),
+ %% No reply
ok;
dispatch_to_local_service([ $w, $s, $: | WSPidStr], services_unavailable,
@@ -541,24 +580,39 @@ dispatch_to_local_service([ $w, $s, $: | WSPidStr], services_unavailable,
?info("service_edge:dispatch_to_local_service(service_unavailable, websock, ~p): ~p",
[ WSPidStr, Services]),
- wse:call(list_to_pid(WSPidStr), wse:window(),
- "services_unavailable",
- [ "services", Services ]),
+ wse_server:send(list_to_pid(WSPidStr),
+ json_rpc_notification("services_unavailable",
+ [{"services", {array, Services}}])),
ok;
dispatch_to_local_service([ $w, $s, $: | WSPidStr], message,
- {struct, [{ service_name, SvcName}, { parameters, Args}]} ) ->
+ {struct, [{ service_name, SvcName}, { parameters, [ { struct, Args} ]}]} ) ->
?info("service_edge:dispatch_to_local_service(message, websock): ~p", [Args]),
- wse:call(list_to_pid(WSPidStr), wse:window(),
- "message",
- [ "service_name", SvcName ] ++ flatten_ws_args(Args)),
- ?debug("service_edge:dispatch_to_local_service(message, websock): Done", []),
+ wse_server:send(list_to_pid(WSPidStr),
+ json_rpc_notification("message",
+ [{ "service_name", SvcName}, {parameters, { struct, Args}}])),
+ %% No response expected.
+ ?debug("service_edge:dispatch_to_local_service(message, websock): Done"),
+ ok;
+
+dispatch_to_local_service([ $w, $s, $: | WSPidStr], message,
+ {struct, [{ service_name, SvcName}, { parameters,{array,[{struct, Args}]}}]}) ->
+ ?info("service_edge:dispatch_to_local_service(message/alt, websock): ~p", [Args]),
+ wse_server:send(list_to_pid(WSPidStr),
+ json_rpc_notification("message",
+ [{ "service_name", SvcName}, {parameters, { struct, Args}}])),
+ %% No response expected.
+ ?debug("service_edge:dispatch_to_local_service(message, websock): Done"),
+ ok;
+
+dispatch_to_local_service([ $w, $s, $: | _WSPidStr], message, Other) ->
+ ?warning("service_edge:dispatch_to_local_service(message/alt, websock): UNKNOWN: ~p", [Other]),
ok;
%% Dispatch to regular JSON-RPC over HTTP.
dispatch_to_local_service(URL, Command, Args) ->
CmdStr = atom_to_list(Command),
- ?debug("dispatch_to_local_service(): Command: ~p",[ CmdStr]),
+ ?debug("dispatch_to_local_service(): Command: ~p",[ Command]),
?debug("dispatch_to_local_service(): Args: ~p",[ Args]),
?debug("dispatch_to_local_service(): URL: ~p",[ URL]),
Res = rvi_common:send_json_request(URL, CmdStr, Args),
@@ -626,15 +680,22 @@ announce_service_availability(Available, SvcName) ->
[ #service_entry { url = URL } ] -> [URL];
[] -> []
end,
-
+ ets:foldl(fun(Term, _Acc) ->
+ ?debug("~p: ~p~n", [ ?SERVICE_TABLE, Term]),
+ ok
+ end, ok, ?SERVICE_TABLE),
+ ?debug("announce: service: ~p", [ SvcName]),
+ ?debug("announce: Block: ~p", [ BlockURLs]),
ets:foldl(
%% Notify if this is not the originating service.
fun(#service_entry { url = URL }, Acc) ->
%% If the URL is not on the blackout
%% list, send a notification
+ ?debug(" URL: ~p - Acc : ~p ", [ URL, Acc]),
case lists:member(URL, Acc) of
false ->
+ ?debug("DISPATCH: ~p: ~p", [ URL, Cmd]),
dispatch_to_local_service(URL, Cmd,
{struct, [ { services,
{ array, [SvcName]}