diff options
Diffstat (limited to 'components/service_edge/src/service_edge_rpc.erl')
-rw-r--r-- | components/service_edge/src/service_edge_rpc.erl | 161 |
1 files changed, 111 insertions, 50 deletions
diff --git a/components/service_edge/src/service_edge_rpc.erl b/components/service_edge/src/service_edge_rpc.erl index cff7fad..ccac474 100644 --- a/components/service_edge/src/service_edge_rpc.erl +++ b/components/service_edge/src/service_edge_rpc.erl @@ -2,7 +2,7 @@ %% Copyright (C) 2014, Jaguar Land Rover %% %% This program is licensed under the terms and conditions of the -%% Mozilla Public License, version 2.0. The full text of the +% Mozilla Public License, version 2.0. The full text of the %% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ %% @@ -12,11 +12,8 @@ -export([handle_rpc/2]). -export([handle_notification/2]). --export([wse_register_service/2]). --export([wse_unregister_service/2]). --export([wse_get_available_services/1]). --export([wse_message/5]). --export([wse_message/4]). +-export([handle_websocket/3]). + -export([start_link/0]). @@ -39,6 +36,7 @@ service_unavailable/3]). + %%-include_lib("lhttpc/include/lhttpc.hrl"). -include_lib("lager/include/log.hrl"). @@ -120,16 +118,18 @@ start_websocket() -> Port -> %% FIXME: MONITOR AND RESTART - wse_server:start(Port, proplists:delete(port, WSOpts)), + wse_server:start(Port, + ?MODULE, handle_websocket, undefined, + [{type, text} | proplists:delete(port, WSOpts)]), ok end end. + %% Invoked by service_discovery to announce service availability %% Must be handled either as a JSON-RPC call or a gen_server call. - service_available(CompSpec, SvcName, DataLinkModule) -> rvi_common:notification(service_edge, ?MODULE, service_available, @@ -164,44 +164,63 @@ handle_local_timeout(CompSpec, SvcName, TransID) -> +handle_websocket(WSock, Mesg, Arg) -> + { ok, Method } = rvi_common:get_json_element(["method"], Mesg), + { ok, Params } = rvi_common:get_json_element(["params"], Mesg), + { ok, ID } = rvi_common:get_json_element(["id"], Mesg), -%% Websocket interface -wse_register_service(Ws, SvcName ) -> - ?debug("service_edge_rpc:wse_register_service(~p) service: ~p", [ Ws, SvcName ]), - gen_server:call(?SERVER, { rvi, register_local_service, [ SvcName, "ws:" ++ pid_to_list(Ws)]}), - { ok, [ { status, rvi_common:json_rpc_status(ok)} ]}. + ?debug("service_edge_rpc:handle_websocket(~p/~p) method: ~p", [ WSock, ID,Method ]), -wse_unregister_service(Ws, SvcName ) -> - ?debug("service_edge_rpc:wse_unregister_service(~p) service: ~p", [ Ws, SvcName ]), - gen_server:call(?SERVER, { rvi, unregister_local_service, [ SvcName ]}), - { ok, [ { status, rvi_common:json_rpc_status(ok)} ]}. + case handle_ws_json_rpc(WSock, Method, {array,Params}, Arg) of + ok -> ok; + {ok, Reply} -> + EncReply = binary_to_list(iolist_to_binary(exo_json:encode({struct, [ { id, ID} |Reply]}))), + ?debug("service_edge_rpc:handle_websocket(~p/~p) reply: ~s", [ WSock, ID, EncReply]), + wse_server:send(WSock, list_to_binary(EncReply)) + end, + ok. -wse_get_available_services(_Ws ) -> - ?debug("service_edge_rpc:wse_get_available_services()"), - [ Services ] = gen_server:call(?SERVER, { rvi, get_available_services, []}), - { ok, [ { status, rvi_common:json_rpc_status(ok)}, - { services, Services}] }. - +%% Websocket interface +handle_ws_json_rpc(WSock, "message", Params, _Arg ) -> + { ok, SvcName } = rvi_common:get_json_element(["service_name"], Params), + { ok, Timeout } = rvi_common:get_json_element(["timeout"], Params), + { ok, Parameters } = rvi_common:get_json_element(["parameters"], Params), -wse_message(Ws, SvcName, Timeout, JSONParameters) -> - %% Parameters are delivered as JSON. Decode into tuple - { ok, Parameters } = exo_json:decode_string(JSONParameters), - ?debug("service_edge_rpc:wse_message(~p) SvcName: ~p", [ Ws, SvcName ]), - ?debug("service_edge_rpc:wse_message(~p) Timeout: ~p", [ Ws, Timeout]), - ?debug("service_edge_rpc:wse_message(~p) Parameters: ~p", [ Ws, Parameters ]), + ?debug("service_edge_rpc:handle_websocket(~p) params!: ~p", [ WSock, Params ]), + ?debug("service_edge_rpc:handle_websocket(~p) service: ~p", [ WSock, SvcName ]), + ?debug("service_edge_rpc:handle_websocket(~p) parameters: ~p", [ WSock, Parameters ]), [ Res, TID ] = gen_server:call(?SERVER, { rvi, handle_local_message, - [ SvcName, Timeout, Parameters]}), + [ SvcName, Timeout, [{struct, Parameters}]]}), - ?debug("service_edge_rpc:wse_message(~p) Res: ~p", [ Ws, Res ]), + ?debug("service_edge_rpc:wse_message(~p) Res: ~p", [ WSock, Res ]), { ok, [ { status, rvi_common:json_rpc_status(Res) }, - { transaction_id, TID} ] }. + { transaction_id, TID} ] }; -%% Deprecated -wse_message(Ws, SvcName, Timeout, JSONParameters, _CallingService) -> - wse_message(Ws, SvcName, Timeout, JSONParameters). +handle_ws_json_rpc(WSock, "register_service", Params,_Arg ) -> + { ok, SvcName } = rvi_common:get_json_element(["service_name"], Params), + ?debug("service_edge_rpc:websocket_register(~p) service: ~p", [ WSock, SvcName ]), + [ok, FullSvcName ] = gen_server:call(?SERVER, + { rvi, + register_local_service, + [ SvcName, + "ws:" ++ pid_to_list(WSock)]}), + + { ok, [ { status, rvi_common:json_rpc_status(ok)}, + { service, FullSvcName }]}; +handle_ws_json_rpc(WSock, "unregister_service", Params, _Arg ) -> + { ok, SvcName } = rvi_common:get_json_element(["service_name"], Params), + ?debug("service_edge_rpc:websocket_unregister(~p) service: ~p", [ WSock, SvcName ]), + gen_server:call(?SERVER, { rvi, unregister_local_service, [ SvcName ]}), + { ok, [ { status, rvi_common:json_rpc_status(ok)} ]}; + +handle_ws_json_rpc(_Ws , "get_available_services", _Params, _Arg ) -> + ?debug("service_edge_rpc:websocket_get_available()"), + [ Services ] = gen_server:call(?SERVER, { rvi, get_available_services, []}), + { ok, [ { status, rvi_common:json_rpc_status(ok)}, + { services, Services}] }. %% Invoked by locally connected services. @@ -312,10 +331,11 @@ handle_notification(Other, _Args) -> %% connected services that uses the same HTTP port to transmit their %% register_service, and message calls. handle_call({ rvi, register_local_service, [SvcName, URL] }, _From, St) -> - ?debug("service_edge_rpc:register_local_service(): service: ~p ", [SvcName]), - ?debug("service_edge_rpc:register_local_service(): address: ~p ", [URL]), + ?debug("service_edge_rpc:register_local_service(): service: ~p ", [SvcName]), + ?debug("service_edge_rpc:register_local_service(): address: ~p ", [URL]), FullSvcName = rvi_common:local_service_to_string(SvcName), + ?debug("service_edge_rpc:register_local_service(): full name: ~p ", [FullSvcName]), ets:insert(?SERVICE_TABLE, #service_entry { service = FullSvcName, @@ -348,6 +368,13 @@ handle_call({rvi, get_available_services, []}, _From, St) -> ?debug("service_edge_rpc:get_available_services()"), {reply, service_discovery_rpc:get_all_services(St#st.cs), St}; + +%%CRASH13:43:57.370 [debug] service_edge_rpc:local_msg: parameters: [{struct,"{"value":"3"}"}] + +%%13:43:57.370 [debug] service_edge_rpc:local_msg: parameters: [{struct,"{"value":"3"}"}] +%% [{struct,[{"a","b"}]}] +%% 13:48:12.943 [debug] service_edge_rpc:local_msg: parameters: [{struct,[{"a","b"}]}] + handle_call({ rvi, handle_local_message, [SvcName, TimeoutArg, Parameters] }, _From, St) -> ?debug("service_edge_rpc:local_msg: service_name: ~p", [SvcName]), @@ -418,11 +445,13 @@ handle_call(Other, _From, St) -> handle_cast({rvi, service_available, [SvcName, _DataLinkModule] }, St) -> + ?debug("service_edge_rpc: Service available: ~p:", [ SvcName]), announce_service_availability(available, SvcName), { noreply, St }; handle_cast({rvi, service_unavailable, [SvcName, _DataLinkModule] }, St) -> + ?debug("service_edge_rpc: Service unavailable: ~p:", [ SvcName]), announce_service_availability(unavailable, SvcName), { noreply, St }; @@ -527,13 +556,23 @@ flatten_ws_args(Args) -> flatten_ws_args(Args, []). +json_rpc_notification(Method, Parameters) -> + iolist_to_binary( + exo_json:encode( + {struct, + [ { "json-rpc", "2.0"}, + { "method", Method }, + { "params", {struct, Parameters}} + ]})). + dispatch_to_local_service([ $w, $s, $: | WSPidStr], services_available, {struct, [{ services, { array, Services}}]} ) -> ?info("service_edge:dispatch_to_local_service(service_available, websock, ~p): ~p", [ WSPidStr, Services]), - wse:call(list_to_pid(WSPidStr), wse:window(), - "services_available", - [ "services", Services ]), + wse_server:send(list_to_pid(WSPidStr), + json_rpc_notification("services_available", + [{"services", {array, Services}}])), + %% No reply ok; dispatch_to_local_service([ $w, $s, $: | WSPidStr], services_unavailable, @@ -541,24 +580,39 @@ dispatch_to_local_service([ $w, $s, $: | WSPidStr], services_unavailable, ?info("service_edge:dispatch_to_local_service(service_unavailable, websock, ~p): ~p", [ WSPidStr, Services]), - wse:call(list_to_pid(WSPidStr), wse:window(), - "services_unavailable", - [ "services", Services ]), + wse_server:send(list_to_pid(WSPidStr), + json_rpc_notification("services_unavailable", + [{"services", {array, Services}}])), ok; dispatch_to_local_service([ $w, $s, $: | WSPidStr], message, - {struct, [{ service_name, SvcName}, { parameters, Args}]} ) -> + {struct, [{ service_name, SvcName}, { parameters, [ { struct, Args} ]}]} ) -> ?info("service_edge:dispatch_to_local_service(message, websock): ~p", [Args]), - wse:call(list_to_pid(WSPidStr), wse:window(), - "message", - [ "service_name", SvcName ] ++ flatten_ws_args(Args)), - ?debug("service_edge:dispatch_to_local_service(message, websock): Done", []), + wse_server:send(list_to_pid(WSPidStr), + json_rpc_notification("message", + [{ "service_name", SvcName}, {parameters, { struct, Args}}])), + %% No response expected. + ?debug("service_edge:dispatch_to_local_service(message, websock): Done"), + ok; + +dispatch_to_local_service([ $w, $s, $: | WSPidStr], message, + {struct, [{ service_name, SvcName}, { parameters,{array,[{struct, Args}]}}]}) -> + ?info("service_edge:dispatch_to_local_service(message/alt, websock): ~p", [Args]), + wse_server:send(list_to_pid(WSPidStr), + json_rpc_notification("message", + [{ "service_name", SvcName}, {parameters, { struct, Args}}])), + %% No response expected. + ?debug("service_edge:dispatch_to_local_service(message, websock): Done"), + ok; + +dispatch_to_local_service([ $w, $s, $: | _WSPidStr], message, Other) -> + ?warning("service_edge:dispatch_to_local_service(message/alt, websock): UNKNOWN: ~p", [Other]), ok; %% Dispatch to regular JSON-RPC over HTTP. dispatch_to_local_service(URL, Command, Args) -> CmdStr = atom_to_list(Command), - ?debug("dispatch_to_local_service(): Command: ~p",[ CmdStr]), + ?debug("dispatch_to_local_service(): Command: ~p",[ Command]), ?debug("dispatch_to_local_service(): Args: ~p",[ Args]), ?debug("dispatch_to_local_service(): URL: ~p",[ URL]), Res = rvi_common:send_json_request(URL, CmdStr, Args), @@ -626,15 +680,22 @@ announce_service_availability(Available, SvcName) -> [ #service_entry { url = URL } ] -> [URL]; [] -> [] end, - + ets:foldl(fun(Term, _Acc) -> + ?debug("~p: ~p~n", [ ?SERVICE_TABLE, Term]), + ok + end, ok, ?SERVICE_TABLE), + ?debug("announce: service: ~p", [ SvcName]), + ?debug("announce: Block: ~p", [ BlockURLs]), ets:foldl( %% Notify if this is not the originating service. fun(#service_entry { url = URL }, Acc) -> %% If the URL is not on the blackout %% list, send a notification + ?debug(" URL: ~p - Acc : ~p ", [ URL, Acc]), case lists:member(URL, Acc) of false -> + ?debug("DISPATCH: ~p: ~p", [ URL, Cmd]), dispatch_to_local_service(URL, Cmd, {struct, [ { services, { array, [SvcName]} |