diff options
Diffstat (limited to 'components/service_edge/src/service_edge_rpc.erl')
-rw-r--r-- | components/service_edge/src/service_edge_rpc.erl | 32 |
1 files changed, 27 insertions, 5 deletions
diff --git a/components/service_edge/src/service_edge_rpc.erl b/components/service_edge/src/service_edge_rpc.erl index bee503e..4a61088 100644 --- a/components/service_edge/src/service_edge_rpc.erl +++ b/components/service_edge/src/service_edge_rpc.erl @@ -39,13 +39,15 @@ -export([service_available/3, service_unavailable/3]). - +-export([record_fields/1]). %%-include_lib("lhttpc/include/lhttpc.hrl"). -include_lib("lager/include/log.hrl"). -include_lib("rvi_common/include/rvi_common.hrl"). +-include_lib("trace_runner/include/trace_runner.hrl"). + -define(SERVER, ?MODULE). @@ -62,7 +64,10 @@ url = undefined %% URL where the service can be reached. }). - +record_fields(service_entry) -> record_info(fields, service_entry); +record_fields(st ) -> record_info(fields, st); +record_fields(component_spec) -> record_info(fields, component_spec); +record_fields(_) -> no. start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). @@ -180,6 +185,7 @@ start_websocket() -> %% Invoked by service_discovery to announce service availability %% Must be handled either as a JSON-RPC call or a gen_server call. service_available(CompSpec, SvcName, DataLinkModule) -> + ?event({service_available, SvcName}), rvi_common:notification(service_edge, ?MODULE, service_available, [{ service, SvcName }, @@ -187,12 +193,14 @@ service_available(CompSpec, SvcName, DataLinkModule) -> service_unavailable(CompSpec, SvcName, DataLinkModule) -> + ?event({service_unavailable, SvcName}), rvi_common:notification(service_edge, ?MODULE, service_unavailable, [{ service, SvcName }, { data_link_module, DataLinkModule }], CompSpec). handle_remote_message(CompSpec, Conn, SvcName, Timeout, Params) -> + ?event({handle_remote_message, [Conn, SvcName, Timeout, Params]}), {IP, Port} = Conn, rvi_common:notification(service_edge, ?MODULE, handle_remote_message, @@ -207,6 +215,7 @@ handle_remote_message(CompSpec, Conn, SvcName, Timeout, Params) -> %% A message originated from a locally connected service %% has timed out handle_local_timeout(CompSpec, SvcName, TransID) -> + ?event({handle_local_timeout, [SvcName, TransID]}), rvi_common:notification(service_edge, ?SERVER, handle_local_timeout, [ { service, SvcName}, { transaction_id, TransID} ], @@ -220,6 +229,7 @@ handle_websocket(WSock, Mesg, Arg) -> ?debug("Failed decode of ~p: ~p", [Mesg, E0]), Mesg end, + ?event({handle_websocket, Decoded}), ?debug("Decoded Mesg = ~p", [Decoded]), { ok, Method } = rvi_common:get_json_element(["method"], Mesg), { ok, Params0 } = rvi_common:get_json_element(["params"], Mesg), @@ -248,6 +258,7 @@ handle_ws_json_rpc(WSock, <<"message">>, Params, _Arg ) -> { ok, Timeout } = rvi_common:get_json_element(["timeout"], Params), { ok, Parameters } = rvi_common:get_json_element(["parameters"], Params), SvcName = iolist_to_binary(SvcName0), + ?event({message, ws, [SvcName, Timeout, Parameters]}), ?debug("WS Parameters: ~p", [Parameters]), %% Parameters = parse_ws_params(Parameters0), LogId = log_id_json_tail(Params ++ Parameters), @@ -270,6 +281,7 @@ handle_ws_json_rpc(WSock, <<"message">>, Params, _Arg ) -> handle_ws_json_rpc(WSock, <<"register_service">>, Params,_Arg ) -> { ok, SvcName } = rvi_common:get_json_element(["service_name"], Params), + ?event({register_service, ws, SvcName}), ?debug("service_edge_rpc:websocket_register(~p) service: ~p", [ WSock, SvcName ]), [ok, FullSvcName ] = gen_server:call(?SERVER, { rvi, @@ -283,6 +295,7 @@ handle_ws_json_rpc(WSock, <<"register_service">>, Params,_Arg ) -> handle_ws_json_rpc(WSock, <<"unregister_service">>, Params, _Arg ) -> { ok, SvcName } = rvi_common:get_json_element(["service_name"], Params), + ?event({unregister_service, ws, SvcName}), ?debug("service_edge_rpc:websocket_unregister(~p) service: ~p", [ WSock, SvcName ]), gen_server:call(?SERVER, { rvi, unregister_local_service, [ SvcName ]}), { ok, [ { status, rvi_common:json_rpc_status(ok)} ]}; @@ -310,6 +323,7 @@ handle_ws_json_rpc(_Ws , <<"get_available_services">>, _Params, _Arg ) -> %% handle_rpc(<<"register_service">>, Args) -> {ok, SvcName} = rvi_common:get_json_element([<<"service">>], Args), + ?event({register_service, json_rpc, SvcName}), {ok, URL} = rvi_common:get_json_element([<<"network_address">>], Args), [ok, FullSvcName ] = gen_server:call(?SERVER, { rvi, register_local_service, @@ -323,6 +337,7 @@ handle_rpc(<<"register_service">>, Args) -> handle_rpc(<<"unregister_service">>, Args) -> {ok, SvcName} = rvi_common:get_json_element(["service"], Args), + ?event({unregister_service, json_rpc, SvcName}), gen_server:call(?SERVER, { rvi, unregister_local_service, [ SvcName]}), {ok, [ { status, rvi_common:json_rpc_status(ok) }, { method, <<"unregister_service">>} @@ -341,6 +356,7 @@ handle_rpc(<<"message">>, Args) -> {ok, SvcName} = rvi_common:get_json_element(["service_name"], Args), {ok, Timeout} = rvi_common:get_json_element(["timeout"], Args), {ok, Parameters} = rvi_common:get_json_element(["parameters"], Args), + ?event({message, json_rpc, [SvcName, Timeout, Parameters]}), LogId = log_id_json_tail(Args ++ Parameters), [ Res, TID ] = gen_server:call(?SERVER, { rvi, handle_local_message, [ SvcName, Timeout, Parameters | LogId]}), @@ -364,7 +380,7 @@ handle_notification(<<"service_available">>, Args) -> [ SvcName, DataLinkModule ]}), ok; -handle_notification("service_unavailable", Args) -> +handle_notification(<<"service_unavailable">>, Args) -> {ok, SvcName} = rvi_common:get_json_element(["service"], Args), {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_module"], Args), ?debug("service_edge:service_unavailable(): service: ~p", [ SvcName]), @@ -514,6 +530,7 @@ handle_cast({rvi, handle_remote_message, Timeout, Parameters ] }, #st{cs = CS} = St) -> + ?event({handle_remote_message, [IP, Port, SvcName, Timeout]}, St), spawn(fun() -> handle_remote_message_( IP, Port, SvcName, Timeout, Parameters, CS) @@ -629,9 +646,10 @@ do_handle_local_message_([SvcName, TimeoutArg, Parameters | _Tail], CS) -> LookupRes = ets:lookup(?SERVICE_TABLE, SvcName), ?debug("Service LookupRes = ~p", [LookupRes]), case LookupRes of - [ #service_entry { url = URL } ] -> %% SvcName is local. Forward message + [ #service_entry { url = URL } = E ] -> %% SvcName is local. Forward message ?debug("service_edge_rpc:local_msg(): Service is local. Forwarding."), log("dispatch to ~s", [URL], CS), + ?event({matching_service_entry, E}), Res = forward_message_to_local_service(URL, SvcName, Parameters, @@ -743,7 +761,8 @@ forward_message_to_local_service(URL,SvcName, Parameters, CompSpec) -> SvcName, CompSpec) catch Tag:Err -> - ?debug("Caught ~p:~p", [Tag,Err]) + ?error("Caught ~p:~p~n~p", + [Tag,Err,erlang:get_stacktrace()]) end end), timer:sleep(500), @@ -831,3 +850,6 @@ log_id_json_tail(Args) -> {error, _} -> [] end. + +event(_, _, _) -> + ok. |