summaryrefslogtreecommitdiff
path: root/components/service_edge/src/service_edge_rpc.erl
diff options
context:
space:
mode:
Diffstat (limited to 'components/service_edge/src/service_edge_rpc.erl')
-rw-r--r--components/service_edge/src/service_edge_rpc.erl32
1 files changed, 27 insertions, 5 deletions
diff --git a/components/service_edge/src/service_edge_rpc.erl b/components/service_edge/src/service_edge_rpc.erl
index bee503e..4a61088 100644
--- a/components/service_edge/src/service_edge_rpc.erl
+++ b/components/service_edge/src/service_edge_rpc.erl
@@ -39,13 +39,15 @@
-export([service_available/3,
service_unavailable/3]).
-
+-export([record_fields/1]).
%%-include_lib("lhttpc/include/lhttpc.hrl").
-include_lib("lager/include/log.hrl").
-include_lib("rvi_common/include/rvi_common.hrl").
+-include_lib("trace_runner/include/trace_runner.hrl").
+
-define(SERVER, ?MODULE).
@@ -62,7 +64,10 @@
url = undefined %% URL where the service can be reached.
}).
-
+record_fields(service_entry) -> record_info(fields, service_entry);
+record_fields(st ) -> record_info(fields, st);
+record_fields(component_spec) -> record_info(fields, component_spec);
+record_fields(_) -> no.
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
@@ -180,6 +185,7 @@ start_websocket() ->
%% Invoked by service_discovery to announce service availability
%% Must be handled either as a JSON-RPC call or a gen_server call.
service_available(CompSpec, SvcName, DataLinkModule) ->
+ ?event({service_available, SvcName}),
rvi_common:notification(service_edge, ?MODULE,
service_available,
[{ service, SvcName },
@@ -187,12 +193,14 @@ service_available(CompSpec, SvcName, DataLinkModule) ->
service_unavailable(CompSpec, SvcName, DataLinkModule) ->
+ ?event({service_unavailable, SvcName}),
rvi_common:notification(service_edge, ?MODULE,
service_unavailable,
[{ service, SvcName },
{ data_link_module, DataLinkModule }], CompSpec).
handle_remote_message(CompSpec, Conn, SvcName, Timeout, Params) ->
+ ?event({handle_remote_message, [Conn, SvcName, Timeout, Params]}),
{IP, Port} = Conn,
rvi_common:notification(service_edge, ?MODULE,
handle_remote_message,
@@ -207,6 +215,7 @@ handle_remote_message(CompSpec, Conn, SvcName, Timeout, Params) ->
%% A message originated from a locally connected service
%% has timed out
handle_local_timeout(CompSpec, SvcName, TransID) ->
+ ?event({handle_local_timeout, [SvcName, TransID]}),
rvi_common:notification(service_edge, ?SERVER, handle_local_timeout,
[ { service, SvcName},
{ transaction_id, TransID} ],
@@ -220,6 +229,7 @@ handle_websocket(WSock, Mesg, Arg) ->
?debug("Failed decode of ~p: ~p", [Mesg, E0]),
Mesg
end,
+ ?event({handle_websocket, Decoded}),
?debug("Decoded Mesg = ~p", [Decoded]),
{ ok, Method } = rvi_common:get_json_element(["method"], Mesg),
{ ok, Params0 } = rvi_common:get_json_element(["params"], Mesg),
@@ -248,6 +258,7 @@ handle_ws_json_rpc(WSock, <<"message">>, Params, _Arg ) ->
{ ok, Timeout } = rvi_common:get_json_element(["timeout"], Params),
{ ok, Parameters } = rvi_common:get_json_element(["parameters"], Params),
SvcName = iolist_to_binary(SvcName0),
+ ?event({message, ws, [SvcName, Timeout, Parameters]}),
?debug("WS Parameters: ~p", [Parameters]),
%% Parameters = parse_ws_params(Parameters0),
LogId = log_id_json_tail(Params ++ Parameters),
@@ -270,6 +281,7 @@ handle_ws_json_rpc(WSock, <<"message">>, Params, _Arg ) ->
handle_ws_json_rpc(WSock, <<"register_service">>, Params,_Arg ) ->
{ ok, SvcName } = rvi_common:get_json_element(["service_name"], Params),
+ ?event({register_service, ws, SvcName}),
?debug("service_edge_rpc:websocket_register(~p) service: ~p", [ WSock, SvcName ]),
[ok, FullSvcName ] = gen_server:call(?SERVER,
{ rvi,
@@ -283,6 +295,7 @@ handle_ws_json_rpc(WSock, <<"register_service">>, Params,_Arg ) ->
handle_ws_json_rpc(WSock, <<"unregister_service">>, Params, _Arg ) ->
{ ok, SvcName } = rvi_common:get_json_element(["service_name"], Params),
+ ?event({unregister_service, ws, SvcName}),
?debug("service_edge_rpc:websocket_unregister(~p) service: ~p", [ WSock, SvcName ]),
gen_server:call(?SERVER, { rvi, unregister_local_service, [ SvcName ]}),
{ ok, [ { status, rvi_common:json_rpc_status(ok)} ]};
@@ -310,6 +323,7 @@ handle_ws_json_rpc(_Ws , <<"get_available_services">>, _Params, _Arg ) ->
%%
handle_rpc(<<"register_service">>, Args) ->
{ok, SvcName} = rvi_common:get_json_element([<<"service">>], Args),
+ ?event({register_service, json_rpc, SvcName}),
{ok, URL} = rvi_common:get_json_element([<<"network_address">>], Args),
[ok, FullSvcName ] = gen_server:call(?SERVER,
{ rvi, register_local_service,
@@ -323,6 +337,7 @@ handle_rpc(<<"register_service">>, Args) ->
handle_rpc(<<"unregister_service">>, Args) ->
{ok, SvcName} = rvi_common:get_json_element(["service"], Args),
+ ?event({unregister_service, json_rpc, SvcName}),
gen_server:call(?SERVER, { rvi, unregister_local_service, [ SvcName]}),
{ok, [ { status, rvi_common:json_rpc_status(ok) },
{ method, <<"unregister_service">>}
@@ -341,6 +356,7 @@ handle_rpc(<<"message">>, Args) ->
{ok, SvcName} = rvi_common:get_json_element(["service_name"], Args),
{ok, Timeout} = rvi_common:get_json_element(["timeout"], Args),
{ok, Parameters} = rvi_common:get_json_element(["parameters"], Args),
+ ?event({message, json_rpc, [SvcName, Timeout, Parameters]}),
LogId = log_id_json_tail(Args ++ Parameters),
[ Res, TID ] = gen_server:call(?SERVER, { rvi, handle_local_message,
[ SvcName, Timeout, Parameters | LogId]}),
@@ -364,7 +380,7 @@ handle_notification(<<"service_available">>, Args) ->
[ SvcName, DataLinkModule ]}),
ok;
-handle_notification("service_unavailable", Args) ->
+handle_notification(<<"service_unavailable">>, Args) ->
{ok, SvcName} = rvi_common:get_json_element(["service"], Args),
{ok, DataLinkModule} = rvi_common:get_json_element(["data_link_module"], Args),
?debug("service_edge:service_unavailable(): service: ~p", [ SvcName]),
@@ -514,6 +530,7 @@ handle_cast({rvi, handle_remote_message,
Timeout,
Parameters
] }, #st{cs = CS} = St) ->
+ ?event({handle_remote_message, [IP, Port, SvcName, Timeout]}, St),
spawn(fun() ->
handle_remote_message_(
IP, Port, SvcName, Timeout, Parameters, CS)
@@ -629,9 +646,10 @@ do_handle_local_message_([SvcName, TimeoutArg, Parameters | _Tail], CS) ->
LookupRes = ets:lookup(?SERVICE_TABLE, SvcName),
?debug("Service LookupRes = ~p", [LookupRes]),
case LookupRes of
- [ #service_entry { url = URL } ] -> %% SvcName is local. Forward message
+ [ #service_entry { url = URL } = E ] -> %% SvcName is local. Forward message
?debug("service_edge_rpc:local_msg(): Service is local. Forwarding."),
log("dispatch to ~s", [URL], CS),
+ ?event({matching_service_entry, E}),
Res = forward_message_to_local_service(URL,
SvcName,
Parameters,
@@ -743,7 +761,8 @@ forward_message_to_local_service(URL,SvcName, Parameters, CompSpec) ->
SvcName, CompSpec)
catch
Tag:Err ->
- ?debug("Caught ~p:~p", [Tag,Err])
+ ?error("Caught ~p:~p~n~p",
+ [Tag,Err,erlang:get_stacktrace()])
end
end),
timer:sleep(500),
@@ -831,3 +850,6 @@ log_id_json_tail(Args) ->
{error, _} ->
[]
end.
+
+event(_, _, _) ->
+ ok.