summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnders Svensson <anders@erlang.org>2019-05-10 11:55:10 +0200
committerAnders Svensson <anders@erlang.org>2019-05-29 15:47:37 +0200
commit596dc20ccd0613c664063ea90c602cb0ad46bfac (patch)
tree8035a4fdef6b4c8bffd04acf5b0e6e97d7cd9471
parentbd9faf298ae0d8525080b605a571fd7a9edb516c (diff)
downloaderlang-596dc20ccd0613c664063ea90c602cb0ad46bfac.tar.gz
Add ct_netconfc support for NETCONF 1.1
The 1.1 base capability can now be passed to hello/3, and the version selected for the session is as required by RFC 6241, 8.1 Capabilities Exchange: Both NETCONF peers MUST verify that the other peer has advertised a common protocol version. When comparing protocol version capability URIs, only the base part is used, in the event any parameters are encoded at the end of the URI string. If no protocol version capability in common is found, the NETCONF peer MUST NOT continue the session. If more than one protocol version URI in common is present, then the highest numbered (most recent) protocol version MUST be used by both peers. Negotiation of 1.1 (or any version greater than 1.0) results in message being chunked as required by RFC 6242, instead of the 1.0 framing (]]>]]>) specified by RFC 4742. For backwards compatibility, the 1.0 base capability is added to any capabilities specified by the user only if no base capability is specified. That is, no user-specified base capability results in 1.0 being added, otherwise it's up to the user to specify all base capabilities.
-rw-r--r--lib/common_test/doc/src/ct_netconfc.xml11
-rw-r--r--lib/common_test/src/ct_netconfc.erl899
2 files changed, 625 insertions, 285 deletions
diff --git a/lib/common_test/doc/src/ct_netconfc.xml b/lib/common_test/doc/src/ct_netconfc.xml
index 8fbe5f3df6..ae7488bc73 100644
--- a/lib/common_test/doc/src/ct_netconfc.xml
+++ b/lib/common_test/doc/src/ct_netconfc.xml
@@ -4,7 +4,7 @@
<erlref>
<header>
<copyright>
- <year>2010</year><year>2017</year>
+ <year>2010</year><year>2019</year>
<holder>Ericsson AB. All Rights Reserved.</holder>
</copyright>
<legalnotice>
@@ -37,11 +37,10 @@
<description>
- <p>NETCONF client module.</p>
-
- <p>The NETCONF client is compliant with RFC 4741 NETCONF Configuration
- Protocol and RFC 4742 Using the NETCONF Configuration Protocol over
- Secure SHell (SSH).</p>
+ <p>NETCONF client module compliant with RFC 6241, NETCONF Configuration
+ Protocol, and RFC 6242, Using the NETCONF Configuration Protocol over
+ Secure SHell (SSH), and with support for RFC 5277, NETCONF Event
+ Notifications.</p>
<marker id="Connecting"/>
<p><em>Connecting to a NETCONF server</em></p>
diff --git a/lib/common_test/src/ct_netconfc.erl b/lib/common_test/src/ct_netconfc.erl
index 37e9555343..bd1c993962 100644
--- a/lib/common_test/src/ct_netconfc.erl
+++ b/lib/common_test/src/ct_netconfc.erl
@@ -62,6 +62,8 @@
%%----------------------------------------------------------------------
-module(ct_netconfc).
+-dialyzer(no_improper_lists).
+
-include("ct_netconfc.hrl").
-include("ct_util.hrl").
-include_lib("xmerl/include/xmerl.hrl").
@@ -163,6 +165,9 @@
(is_atom(Xml) orelse (is_tuple(Xml) andalso is_atom(element(1,Xml))))).
-define(is_string(S), (is_list(S) andalso is_integer(hd(S)))).
+%% Keys into the process dictionary.
+-define(KEY(T), {?MODULE, T}).
+
%%----------------------------------------------------------------------
%% Records
%%----------------------------------------------------------------------
@@ -173,9 +178,9 @@
capabilities,
session_id,
msg_id = 1,
- hello_status,
- no_end_tag_buff = <<>>,
- buff = <<>>,
+ hello_status, % undefined | received | #pending{}
+ % string() | {error, Reason}
+ buf = false, % binary() | list() | boolean()
pending = [], % [#pending]
event_receiver}).% pid
@@ -859,6 +864,8 @@ kill_session(Client, SessionId, Timeout) ->
%% Callback functions
%%----------------------------------------------------------------------
+%% init/3
+
init(_KeyOrName,{CM,{Host,Port}},Options) ->
case ssh_channel(#connection{reference=CM,host=Host,port=Port},Options) of
{ok,Connection} ->
@@ -883,29 +890,33 @@ init(_KeyOrName,{_Host,_Port},Options) ->
{error,Reason}
end.
+%% terminate/2
terminate(_, #state{connection=Connection}) ->
ssh_close(Connection),
ok.
-handle_msg({hello, Options, Timeout}, From,
- #state{connection=Connection,hello_status=HelloStatus} = State) ->
+%% handle_msg/3
+
+%% Send hello and return to the caller only after reception of the
+%% server's hello.
+handle_msg({hello, Options, Timeout},
+ From,
+ #state{connection = Connection,
+ hello_status = HelloStatus}
+ = State) ->
case do_send(Connection, client_hello(Options)) of
- ok ->
- case HelloStatus of
- undefined ->
- {Ref,TRef} = set_request_timer(Timeout),
- {noreply, State#state{hello_status=#pending{tref=TRef,
- ref=Ref,
- caller=From}}};
- received ->
- {reply, ok, State#state{hello_status=done}};
- {error,Reason} ->
- {stop, {error,Reason}, State}
- end;
- Error ->
+ ok when HelloStatus == undefined -> %% server hello not yet received
+ {Ref, TRef} = set_request_timer(Timeout),
+ {noreply, State#state{hello_status = #pending{tref = TRef,
+ ref = Ref,
+ caller = From}}};
+ ok -> %% or yes: negotiate version
+ handle_capx(State);
+ Error ->
{stop, Error, State}
end;
+
handle_msg(get_ssh_connection, _From, #state{connection=Connection}=State) ->
Reply =
case Connection#connection.reference of
@@ -914,29 +925,40 @@ handle_msg(get_ssh_connection, _From, #state{connection=Connection}=State) ->
Connection#connection.port}}}
end,
{reply, Reply, State};
+
handle_msg(_, _From, #state{session_id=undefined} = State) ->
%% Hello is not yet excanged - this shall never happen
{reply,{error,waiting_for_hello},State};
+
handle_msg(get_capabilities, _From, #state{capabilities = Caps} = State) ->
{reply, Caps, State};
+
handle_msg(get_session_id, _From, #state{session_id = Id} = State) ->
{reply, Id, State};
-handle_msg({send, Timeout, SimpleXml}, From,
- #state{connection=Connection,pending=Pending} = State) ->
+
+handle_msg({send, Timeout, SimpleXml},
+ From,
+ #state{connection = Connection,
+ pending = Pending}
+ = State) ->
case do_send(Connection, SimpleXml) of
- ok ->
- {Ref,TRef} = set_request_timer(Timeout),
- {noreply, State#state{pending=[#pending{tref=TRef,
- ref=Ref,
- caller=From} | Pending]}};
- Error ->
- {reply, Error, State}
+ ok ->
+ {Ref, TRef} = set_request_timer(Timeout),
+ {noreply, State#state{pending = [#pending{tref = TRef,
+ ref = Ref,
+ caller = From}
+ | Pending]}};
+ Error ->
+ {reply, Error, State}
end;
+
handle_msg({send_rpc, SimpleXml, Timeout}, From, State) ->
do_send_rpc(undefined, SimpleXml, Timeout, From, State);
+
handle_msg({send_rpc_op, Op, Data, Timeout}, From, State) ->
SimpleXml = encode_rpc_operation(Op,Data),
do_send_rpc(Op, SimpleXml, Timeout, From, State);
+
handle_msg({get_event_streams=Op,Streams,Timeout}, From, State) ->
Filter = {netconf,?NETMOD_NOTIF_NAMESPACE_ATTR,
[{streams,[{stream,[{name,[Name]}]} || Name <- Streams]}]},
@@ -945,7 +967,9 @@ handle_msg({get_event_streams=Op,Streams,Timeout}, From, State) ->
handle_msg({ssh_cm, CM, {data, Ch, _Type, Data}}, State) ->
ssh_connection:adjust_window(CM,Ch,size(Data)),
+ log(State#state.connection, recv, Data),
handle_data(Data, State);
+
handle_msg({ssh_cm, _CM, _SshCloseMsg}, State) ->
%% _SshCloseMsg can probably be one of
%% {eof,Ch}
@@ -962,21 +986,28 @@ handle_msg({ssh_cm, _CM, _SshCloseMsg}, State) ->
%%! connection - due to terminate/2
{stop, State};
-handle_msg({Ref,timeout},
- #state{hello_status=#pending{ref=Ref,caller=Caller}} = State) ->
- ct_gen_conn:return(Caller,{error,{hello_session_failed,timeout}}),
- {stop,State#state{hello_status={error,timeout}}};
-handle_msg({Ref,timeout},#state{pending=Pending} = State) ->
- {value,#pending{op=Op,caller=Caller},Pending1} =
- lists:keytake(Ref,#pending.ref,Pending),
- ct_gen_conn:return(Caller,{error,timeout}),
+
+handle_msg({Ref, timeout}, #state{hello_status = #pending{ref = Ref,
+ caller = From}}
+ = State) ->
+ ct_gen_conn:return(From, {error, {hello_session_failed, timeout}}),
+ {stop, State#state{hello_status = {error,timeout}}};
+
+handle_msg({Ref, timeout}, #state{pending = Pending} = State) ->
+ {value, #pending{op = Op, caller = Caller}, Rest}
+ = lists:keytake(Ref, #pending.ref, Pending),
+ ct_gen_conn:return(Caller, {error, timeout}),
R = case Op of
- close_session -> stop;
- _ -> noreply
+ close_session -> stop;
+ _ -> noreply
end,
- %% Halfhearted try to get in correct state, this matches
- %% the implementation before this patch
- {R,State#state{pending=Pending1, no_end_tag_buff= <<>>, buff= <<>>}}.
+ %% Discard received bytes in hope that the server has sent an
+ %% incomplete message. Otherwise this is doomed to leave the
+ %% connection in an unusable state.
+ {R, State#state{pending = Rest,
+ buf = is_binary(State#state.buf)}}.
+
+%% close/1
%% Called by ct_util_server to close registered connections before terminate.
close(Client) ->
@@ -1097,14 +1128,77 @@ cancel_request_timer(Ref,TRef) ->
end.
%%%-----------------------------------------------------------------
-client_hello(Options) when is_list(Options) ->
- UserCaps = [{capability, UserCap} ||
- {capability, UserCap} <- Options,
- is_list(hd(UserCap))],
- {hello, ?NETCONF_NAMESPACE_ATTR,
- [{capabilities,
- [{capability,[?NETCONF_BASE_CAP++?NETCONF_BASE_CAP_VSN]}|
- UserCaps]}]}.
+
+%% client_hello/1
+%%
+%% Prepend the 1.0 base capability only if none is specified by the
+%% user. Store the versions in the process dictionary until they're
+%% examined upon reception of server capabilities in handle_capx/1.
+
+client_hello(Opts)
+ when is_list(Opts) ->
+ UserCaps = [T || {capability, C} = T <- Opts, is_list(hd(C))],
+ Vsns = versions(UserCaps),
+ put(?KEY(protocol_vsn), Vsns),
+ {hello,
+ ?NETCONF_NAMESPACE_ATTR,
+ [{capabilities, [{capability, [?NETCONF_BASE_CAP, ?NETCONF_BASE_CAP_VSN]}
+ || [] == Vsns]
+ ++ UserCaps}]}.
+
+%% versions/1
+%%
+%% Extract base protocol versions from capability options.
+
+versions(Opts) ->
+ [V || {capability, L} <- Opts,
+ S <- L,
+ ?NETCONF_BASE_CAP ++ X <- [lists:flatten(S)],
+ V <- [lists:takewhile(fun(C) -> C /= $? end, X)]].
+
+%% handle_capx/1
+%%
+%% Ignore parameters as RFC 6241 (NETCONF 1.1) requires in 8.1
+%% Capabilities Exchange. Be overly lenient with whitespace since RFC
+%% 6241 gives examples with significant trailing whitespace.
+
+handle_capx(#state{hello_status = received, capabilities = Caps} = S) ->
+ Remote = [V || ?NETCONF_BASE_CAP ++ X <- Caps,
+ [V|_] <- [string:lexemes(X, "? \t\r\n")]],
+ Local = erase(?KEY(protocol_vsn)),
+ case protocol_vsn(Local, Remote) of
+ false when Remote == [] ->
+ Reason = {incorrect_hello, no_base_capability_found},
+ {stop, {error, Reason}, S};
+ false ->
+ Reason = {incompatible_base_capability_vsn, lists:min(Remote)},
+ {stop, {error, Reason}, S};
+ Vsn ->
+ put(?KEY(chunk), Vsn /= "1.0"),
+ {reply, ok, rebuf(Vsn, S#state{hello_status = Vsn})}
+ end;
+
+handle_capx(#state{hello_status = {error, _} = No} = S) ->
+ {stop, No, S}.
+
+%% rebuf/2
+%%
+%% Turn the message buffer into a list for 1.1 chunking if the
+%% negotiated protocol version is > 1.0.
+
+rebuf("1.0", S) ->
+ S;
+
+rebuf(_, #state{buf = Bin} = S) ->
+ S#state{buf = [Bin, 3]}.
+
+%% protocol_vsn/2
+
+protocol_vsn([], Vsns) ->
+ protocol_vsn(["1.0"], Vsns);
+
+protocol_vsn(Local, Remote) ->
+ lists:max([false | [V || V <- Remote, lists:member(V, Local)]]).
%%%-----------------------------------------------------------------
@@ -1150,111 +1244,132 @@ maybe_element(Tag,Value) ->
%%%-----------------------------------------------------------------
%%% Send XML data to server
-do_send_rpc(PendingOp,SimpleXml,Timeout,Caller,
- #state{connection=Connection,msg_id=MsgId,pending=Pending} = State) ->
- case do_send_rpc(Connection, MsgId, SimpleXml) of
- ok ->
- {Ref,TRef} = set_request_timer(Timeout),
- {noreply, State#state{msg_id=MsgId+1,
- pending=[#pending{tref=TRef,
- ref=Ref,
- msg_id=MsgId,
- op=PendingOp,
- caller=Caller} | Pending]}};
- Error ->
- {reply, Error, State#state{msg_id=MsgId+1}}
+do_send_rpc(Op, SimpleXml, Timeout, Caller, #state{connection = Connection,
+ msg_id = MsgId,
+ pending = Pending}
+ = State) ->
+ Msg = {rpc,
+ [{'message-id', MsgId} | ?NETCONF_NAMESPACE_ATTR],
+ [SimpleXml]},
+ Next = MsgId + 1,
+ case do_send(Connection, Msg) of
+ ok ->
+ {Ref, TRef} = set_request_timer(Timeout),
+ Rec = #pending{tref = TRef,
+ ref = Ref,
+ msg_id = MsgId,
+ op = Op,
+ caller = Caller},
+ {noreply, State#state{msg_id = Next,
+ pending = [Rec | Pending]}};
+ Error ->
+ {reply, Error, State#state{msg_id = Next}}
end.
-do_send_rpc(Connection, MsgId, SimpleXml) ->
- do_send(Connection,
- {rpc,
- [{'message-id',MsgId} | ?NETCONF_NAMESPACE_ATTR],
- [SimpleXml]}).
+do_send(Connection, Simple) ->
+ ssh_send(Connection, frame(to_xml(Simple))).
-do_send(Connection, SimpleXml) ->
- Xml=to_xml_doc(SimpleXml),
- ssh_send(Connection, Xml).
-
-to_xml_doc(Simple) ->
+to_xml(Simple) ->
Prolog = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>",
- Xml = unicode:characters_to_binary(
- xmerl:export_simple([Simple],
- xmerl_xml,
- [#xmlAttribute{name=prolog,
- value=Prolog}])),
- <<Xml/binary,?END_TAG/binary>>.
+ Chars = xmerl:export_simple([Simple],
+ xmerl_xml,
+ [#xmlAttribute{name = prolog,
+ value = Prolog}]),
+ unicode:characters_to_binary(Chars).
+
+%% frame/1
+
+frame(Bin) ->
+ case get(?KEY(chunk)) of
+ true -> %% 1.1 chunking
+ [chunk(Bin) | "\n##\n"];
+ _ -> %% 1.0 framing
+ [Bin | ?END_TAG]
+ end.
+
+%% chunk/1
+%%
+%% Chunk randomly to exercise the server.
+
+chunk(<<>>) ->
+ [];
+
+chunk(Bin) ->
+ Sz = min(rand:uniform(1024), size(Bin)),
+ <<B:Sz/binary, Rest/binary>> = Bin,
+ ["\n#", integer_to_list(Sz), $\n, B | chunk(Rest)].
%%%-----------------------------------------------------------------
%%% Parse and handle received XML data
-%%% Two buffers are used:
-%%% * 'no_end_tag_buff' contains data that is checked and does not
-%%% contain any (part of an) end tag.
-%%% * 'buff' contains all other saved data - it may or may not
-%%% include (a part of) an end tag.
-%%% The reason for this is to avoid running binary:split/3 multiple
-%%% times on the same data when it does not contain an end tag. This
-%%% can be a considerable optimation in the case when a lot of data is
-%%% received (e.g. when fetching all data from a node) and the data is
-%%% sent in multiple ssh packages.
-handle_data(NewData,#state{connection=Connection} = State0) ->
- log(Connection,recv,NewData),
- NoEndTag0 = State0#state.no_end_tag_buff,
- Buff0 = State0#state.buff,
- Data = <<Buff0/binary, NewData/binary>>,
- case binary:split(Data,?END_TAG,[]) of
- [_NoEndTagFound] ->
- NoEndTagSize = case byte_size(Data) of
- Sz when Sz<5 -> 0;
- Sz -> Sz-5
- end,
- <<NoEndTag1:NoEndTagSize/binary,Buff/binary>> = Data,
- NoEndTag = <<NoEndTag0/binary,NoEndTag1/binary>>,
- {noreply, State0#state{no_end_tag_buff=NoEndTag, buff=Buff}};
- [FirstMsg0,Buff1] ->
- FirstMsg = remove_initial_nl(<<NoEndTag0/binary,FirstMsg0/binary>>),
- SaxArgs = [{event_fun,fun sax_event/3}, {event_state,[]}],
- case xmerl_sax_parser:stream(FirstMsg, SaxArgs) of
- {ok, Simple, _Thrash} ->
- case decode(Simple, State0#state{no_end_tag_buff= <<>>,
- buff=Buff1}) of
- {noreply, #state{buff=Buff} = State} when Buff =/= <<>> ->
- %% Recurse if we have more data in buffer
- handle_data(<<>>, State);
- Other ->
- Other
- end;
- {fatal_error,_Loc,Reason,_EndTags,_EventState} ->
- ?error(Connection#connection.name,
- [{parse_error,Reason},
- {buffer, Buff0},
- {new_data,NewData}]),
- handle_error(Reason, State0#state{no_end_tag_buff= <<>>,
- buff= <<>>})
- end
+
+handle_data(Bin, #state{buf = Head} = S) ->
+ case recv(Bin, Head) of
+ {error, Reason} ->
+ Conn = S#state.connection,
+ ?error(Conn#connection.name, [{receive_error, Reason},
+ {buffer, Head},
+ {bytes, Bin}]),
+ {stop, S};
+ {Bytes, Rest} ->
+ handle_more(Rest, handle_xml(Bytes, S));
+ Buf ->
+ {noreply, S#state{buf = Buf}}
end.
+%% handle_more/2
-%% xml does not accept a leading nl and some netconf server add a nl after
-%% each ?END_TAG, ignore them
-remove_initial_nl(<<"\n", Data/binary>>) ->
- remove_initial_nl(Data);
-remove_initial_nl(Data) ->
- Data.
-
-handle_error(Reason, State) ->
- Pending1 = case State#state.pending of
- [] -> [];
- Pending ->
- %% Assuming the first request gets the
- %% first answer
- P=#pending{tref=TRef,ref=Ref,caller=Caller} =
- lists:last(Pending),
- cancel_request_timer(Ref,TRef),
- Reason1 = {failed_to_parse_received_data,Reason},
- ct_gen_conn:return(Caller,{error,Reason1}),
- lists:delete(P,Pending)
- end,
- {noreply, State#state{pending=Pending1}}.
+handle_more(_, {stop, _} = No) ->
+ No;
+
+handle_more(Bin, {noreply, State}) ->
+ handle_data(Bin, State#state{buf = true == get(?KEY(chunk))}).
+
+%% handle_xml/2
+
+handle_xml(Bytes, State) ->
+ case parse(Bytes) of
+ {ok, Simple, _Rest} -> %% ignore trailing bytes
+ decode(Simple, State);
+ {fatal_error,_Loc,Reason,_EndTags,_EventState} ->
+ Conn = State#state.connection,
+ ?error(Conn#connection.name, [{parse_error, Reason},
+ {message, Bytes}]),
+ {noreply, handle_error(Reason, State)}
+ end.
+
+%% parse/1
+
+parse(Bytes) ->
+ xmerl_sax_parser:stream(<<>>, [{event_fun, fun sax_event/3},
+ {event_state, []},
+ {continuation_fun, fun cont/1},
+ {continuation_state, Bytes}]).
+
+%% cont/1
+
+cont([] = No) ->
+ {<<>>, No};
+
+cont([Bin | Rest]) ->
+ {Bin, Rest};
+
+cont(Bin) ->
+ {Bin, <<>>}.
+
+%% handle_error/2
+
+handle_error(_Reason, #state{pending = []} = State) ->
+ State;
+
+handle_error(Reason, #state{pending = Pending} = State) ->
+ %% Assuming the first request gets the first answer.
+ Rec = #pending{tref = TRef,
+ ref = Ref,
+ caller = Caller}
+ = lists:last(Pending),
+ cancel_request_timer(Ref, TRef),
+ ct_gen_conn:return(Caller,{error, {failed_to_parse_received_data, Reason}}),
+ State#state{pending = lists:delete(Rec, Pending)}.
%% Event function for the sax parser. It builds a simple XML structure.
%% Care is taken to keep namespace attributes and prefixes as in the original XML.
@@ -1305,136 +1420,182 @@ parse_attrs([]) ->
%%%-----------------------------------------------------------------
-%%% Decoding of parsed XML data
-decode({Tag,Attrs,_}=E, #state{connection=Connection,pending=Pending}=State) ->
- ConnName = Connection#connection.name,
- case get_local_name_atom(Tag) of
- 'rpc-reply' ->
- case get_msg_id(Attrs) of
- undefined ->
- case Pending of
- [#pending{msg_id=MsgId}] ->
- ?error(ConnName,[{warning,rpc_reply_missing_msg_id},
- {assuming,MsgId}]),
- decode_rpc_reply(MsgId,E,State);
- _ ->
- ?error(ConnName,[{error,rpc_reply_missing_msg_id}]),
- {noreply,State}
- end;
- MsgId ->
- decode_rpc_reply(MsgId,E,State)
- end;
- hello ->
- case State#state.hello_status of
- undefined ->
- case decode_hello(E) of
- {ok,SessionId,Capabilities} ->
- {noreply,State#state{session_id = SessionId,
- capabilities = Capabilities,
- hello_status = received}};
- {error,Reason} ->
- {noreply,State#state{hello_status = {error,Reason}}}
- end;
- #pending{tref=TRef,ref=Ref,caller=Caller} ->
- cancel_request_timer(Ref,TRef),
- case decode_hello(E) of
- {ok,SessionId,Capabilities} ->
- ct_gen_conn:return(Caller,ok),
- {noreply,State#state{session_id = SessionId,
- capabilities = Capabilities,
- hello_status = done}};
- {error,Reason} ->
- ct_gen_conn:return(Caller,{error,Reason}),
- {stop,State#state{hello_status={error,Reason}}}
- end;
- Other ->
- ?error(ConnName,[{got_unexpected_hello,E},
- {hello_status,Other}]),
- {noreply,State}
- end;
- notification ->
- EventReceiver = State#state.event_receiver,
- EventReceiver ! E,
- {noreply,State};
- Other ->
- %% Result of send/2, when not sending an rpc request - or
- %% if netconf server sends noise. Can handle this only if
- %% there is just one pending that matches (i.e. has
- %% undefined msg_id and op)
- case [P || P = #pending{msg_id=undefined,op=undefined} <- Pending] of
- [#pending{tref=TRef,ref=Ref,caller=Caller}] ->
- cancel_request_timer(Ref,TRef),
- ct_gen_conn:return(Caller,E),
- {noreply,State#state{pending=[]}};
- _ ->
- ?error(ConnName,[{got_unexpected_msg,Other},
- {expecting,Pending}]),
- {noreply,State}
- end
+%% decode/2
+%%
+%% Decode parsed (incoming) XML.
+
+decode({Tag, _, _} = E, #state{} = State) ->
+ case decode(get_local_name_atom(Tag), E, State) of
+ #state{} = S ->
+ {noreply, S};
+ {stop, #state{}} = T ->
+ T
end.
+%% decode/3
+
+decode('rpc-reply', {_, Attrs, _} = E, State) ->
+ decode_rpc_reply(get_msg_id(Attrs), E, State);
+
+%% Incoming hello, outgoing not yet sent.
+decode(hello, E, #state{hello_status = undefined} = State) ->
+ case decode_hello(E) of
+ {ok, SessionId, Capabilities} ->
+ State#state{session_id = SessionId,
+ capabilities = Capabilities,
+ hello_status = received};
+ {error, _Reason} = No ->
+ State#state{hello_status = No}
+ end;
+
+%% Incoming hello, outgoing already sent: negotiate protocol version.
+decode(hello, E, #state{hello_status = #pending{tref = TRef,
+ ref = Ref,
+ caller = From}}
+ = State) ->
+ cancel_request_timer(Ref, TRef),
+ case decode_hello(E) of
+ {ok, SessionId, Capabilities} ->
+ reply(From, handle_capx(State#state{session_id = SessionId,
+ capabilities = Capabilities,
+ hello_status = received}));
+ {error, _Reason} = No ->
+ ct_gen_conn:return(From, No),
+ {stop, State#state{hello_status = No}}
+ end;
+
+%% Duplicate hello: ignore.
+decode(hello, E, #state{hello_status = Other} = State) ->
+ ConnName = (State#state.connection)#connection.name,
+ ?error(ConnName, [{got_unexpected_hello, E},
+ {hello_status, Other}]),
+ State;
+
+decode(notification, E, State) ->
+ State#state.event_receiver ! E,
+ State;
+
+decode(Other, E, State) ->
+ decode_send({got_unexpected_msg, Other}, E, State).
+
+%% reply/2
+%%
+%% Explicitly send a reply that can't be returned.
+
+reply(From, {T, Res, State}) ->
+ ct_gen_conn:return(From, Res),
+ case T of
+ reply ->
+ State;
+ stop ->
+ {T, State}
+ end.
+
+%% get_msg_id/1
+
get_msg_id(Attrs) ->
- case lists:keyfind('message-id',1,Attrs) of
- {_,Str} ->
- list_to_integer(Str);
- false ->
- undefined
+ case find('message-id', Attrs) of
+ {_,Str} ->
+ list_to_integer(Str);
+ false ->
+ undefined
end.
-decode_rpc_reply(MsgId,{_,Attrs,Content0}=E,#state{pending=Pending} = State) ->
- case lists:keytake(MsgId,#pending.msg_id,Pending) of
- {value, #pending{tref=TRef,ref=Ref,op=Op,caller=Caller}, Pending1} ->
- cancel_request_timer(Ref,TRef),
- Content = forward_xmlns_attr(Attrs,Content0),
- {CallerReply,{ServerReply,State2}} =
- do_decode_rpc_reply(Op,Content,State#state{pending=Pending1}),
- ct_gen_conn:return(Caller,CallerReply),
- {ServerReply,State2};
- false ->
- %% Result of send/2, when receiving a correct
- %% rpc-reply. Can handle this only if there is just one
- %% pending that matches (i.e. has undefined msg_id and op)
- case [P || P = #pending{msg_id=undefined,op=undefined} <- Pending] of
- [#pending{tref=TRef,
- ref=Ref,
- msg_id=undefined,
- op=undefined,
- caller=Caller}] ->
- cancel_request_timer(Ref,TRef),
- ct_gen_conn:return(Caller,E),
- {noreply,State#state{pending=[]}};
- _ ->
- ConnName = (State#state.connection)#connection.name,
- ?error(ConnName,[{got_unexpected_msg_id,MsgId},
- {expecting,Pending}]),
- {noreply,State}
- end
+%% recode_rpc_reply/3
+
+decode_rpc_reply(undefined, E, #state{pending = [#pending{msg_id = MsgId}]}
+ = State)
+ when MsgId /= undefined ->
+ ConnName = (State#state.connection)#connection.name,
+ ?error(ConnName, [{warning, rpc_reply_missing_msg_id},
+ {assuming, MsgId}]),
+ decode_rpc_reply(MsgId, E, State);
+
+decode_rpc_reply(undefined, _, State) ->
+ ConnName = (State#state.connection)#connection.name,
+ ?error(ConnName, [{error, rpc_reply_missing_msg_id}]),
+ State;
+
+decode_rpc_reply(MsgId,
+ {_, Attrs, Content0}
+ = E,
+ #state{pending = Pending}
+ = State) ->
+ case lists:keytake(MsgId, #pending.msg_id, Pending) of
+ {value, Rec, Rest} ->
+ #pending{tref = TRef, ref = Ref, op = Op, caller = From}
+ = Rec,
+ cancel_request_timer(Ref, TRef),
+ Content = forward_xmlns_attr(Attrs, Content0),
+ {Reply, T} = do_decode_rpc_reply(Op,
+ Content,
+ State#state{pending = Rest}),
+ ct_gen_conn:return(From, Reply),
+ T;
+ false -> %% not a send_rcp or server has sent wrong id
+ decode_send({got_unexpected_msg_id, MsgId}, E, State)
end.
-do_decode_rpc_reply(Op,Result,State)
- when Op==lock; Op==unlock; Op==edit_config; Op==delete_config;
- Op==copy_config; Op==kill_session ->
- {decode_ok(Result),{noreply,State}};
-do_decode_rpc_reply(Op,Result,State)
- when Op==get; Op==get_config; Op==action ->
- {decode_data(Result),{noreply,State}};
-do_decode_rpc_reply(close_session,Result,State) ->
+%% decode_send/2
+%%
+%% Result of send/2,3. Only handle one at a time there since all
+%% pendings have msg_id/op = undefined.
+
+decode_send(ErrorT, Elem, #state{pending = Pending} = State) ->
+ case [P || #pending{msg_id = undefined, op = undefined} = P <- Pending] of
+ [Rec] ->
+ #pending{tref = TRef,
+ ref = Ref,
+ caller = From}
+ = Rec,
+ cancel_request_timer(Ref, TRef),
+ ct_gen_conn:return(From, Elem),
+ State#state{pending = []};
+ _ ->
+ Conn = State#state.connection,
+ ?error(Conn#connection.name, [ErrorT, {expecting, Pending}]),
+ State
+ end.
+
+%% do_decode_rpc_reply/3
+
+do_decode_rpc_reply(Op, Result, State)
+ when Op == lock;
+ Op == unlock;
+ Op == edit_config;
+ Op == delete_config;
+ Op == copy_config;
+ Op == kill_session ->
+ {decode_ok(Result), State};
+
+do_decode_rpc_reply(Op, Result, State)
+ when Op == get;
+ Op == get_config;
+ Op == action ->
+ {decode_data(Result), State};
+
+do_decode_rpc_reply(close_session, Result, State) ->
case decode_ok(Result) of
- ok -> {ok,{stop,State}};
- Other -> {Other,{noreply,State}}
+ ok ->
+ {ok, {stop, State}};
+ Other ->
+ {Other, State}
end;
-do_decode_rpc_reply({create_subscription,Caller},Result,State) ->
+
+do_decode_rpc_reply({create_subscription, From}, Result, State) ->
case decode_ok(Result) of
- ok ->
- {ok,{noreply,State#state{event_receiver=Caller}}};
- Other ->
- {Other,{noreply,State}}
+ ok ->
+ {ok, State#state{event_receiver = From}};
+ Other ->
+ {Other, State}
end;
-do_decode_rpc_reply(get_event_streams,Result,State) ->
- {decode_streams(decode_data(Result)),{noreply,State}};
-do_decode_rpc_reply(undefined,Result,State) ->
- {Result,{noreply,State}}.
+
+do_decode_rpc_reply(get_event_streams, Result, State) ->
+ {decode_streams(decode_data(Result)), State};
+
+do_decode_rpc_reply(undefined, Result, State) ->
+ {Result, State}.
@@ -1454,7 +1615,7 @@ decode_data([{Tag,Attrs,Content}]) ->
case get_local_name_atom(Tag) of
ok ->
%% when action has return type void
- ok;
+ ok;
data ->
%% Since content of data has nothing from the netconf
%% namespace, we remove the parent's xmlns attribute here
@@ -1525,7 +1686,6 @@ get_all_xmlns_attrs([{Key,_}=Attr|Attrs],XmlnsAttrs) ->
get_all_xmlns_attrs([],XmlnsAttrs) ->
XmlnsAttrs.
-
%% Decode server hello to pick out session id and capabilities
decode_hello({hello,_Attrs,Hello}) ->
case lists:keyfind('session-id',1,Hello) of
@@ -1545,21 +1705,18 @@ decode_hello({hello,_Attrs,Hello}) ->
{error,{incorrect_hello,no_session_id_found}}
end.
-decode_caps([{capability,[],[?NETCONF_BASE_CAP++Vsn=Cap]} |Caps], Acc, Base) ->
- case Vsn of
- ?NETCONF_BASE_CAP_VSN ->
- decode_caps(Caps, [Cap|Acc], true);
- _ -> %% unsupported version: discard
- decode_caps(Caps, Acc, Base)
- end;
-decode_caps([{capability,[],[Cap]}|Caps],Acc,Base) ->
- decode_caps(Caps,[Cap|Acc],Base);
-decode_caps([H|_T],_,_) ->
- {error,{unexpected_capability_element,H}};
-decode_caps([],_,false) ->
- {error,{incorrect_hello,no_base_capability_found}};
-decode_caps([],Acc,true) ->
- {ok,lists:reverse(Acc)}.
+decode_caps([{capability, [], [?NETCONF_BASE_CAP ++ _ = Cap]} | Caps],
+ Acc,
+ _) ->
+ decode_caps(Caps, [Cap|Acc], true);
+decode_caps([{capability, [], [Cap]} | Caps], Acc, Base) ->
+ decode_caps(Caps, [Cap|Acc], Base);
+decode_caps([H|_], _, _) ->
+ {error, {unexpected_capability_element, H}};
+decode_caps([], _, false) ->
+ {error, {incorrect_hello, no_base_capability_found}};
+decode_caps([], Acc, true) ->
+ {ok, lists:reverse(Acc)}.
%% Return a list of {Name,Data}, where data is a {Tag,Value} list for each stream
@@ -1570,7 +1727,7 @@ decode_streams({ok,[{netconf,_,Streams}]}) ->
decode_streams([{streams,_,Streams}]) ->
decode_streams(Streams);
decode_streams([{stream,_,Stream} | Streams]) ->
- {name,_,[Name]} = lists:keyfind(name,1,Stream),
+ {name,_,[Name]} = find(name, Stream),
[{Name,[{Tag,Value} || {Tag,_,[Value]} <- Stream, Tag /= name]}
| decode_streams(Streams)];
decode_streams([]) ->
@@ -1814,6 +1971,190 @@ ssh_close(Connection=#connection{reference = CM}) ->
log(Connection,disconnect),
ok.
+%% ===========================================================================
+
+%% recv/1
+%%
+%% Extract incoming messages using either NETCONF 1.0 framing or
+%% NETCONF 1.1 chunking.
+
+recv(Bin, true) ->
+ recv(Bin, [<<>>, 3]);
+recv(Bin, false) ->
+ recv(Bin, <<>>);
+
+recv(Bin, [Head, Len | Chunks]) -> %% 1.1 chunking
+ chunk(<<Head/binary, Bin/binary>>, Chunks, Len);
+
+%% Start looking for the terminating end-of-message sequence ]]>]]>
+%% 5 characters from the end of the buffered head, since this binary
+%% has already been scanned.
+recv(Bin, Head) when is_binary(Head) -> %% 1.0 framing
+ frame(<<Head/binary, Bin/binary>>, max(0, size(Head) - 5)).
+
+%% frame/2
+%%
+%% Extract a message terminated by the ]]>]]> end-of-message sequence.
+%% Don't need to extract characters as UTF-8 since matching byte-wise
+%% is unambiguous: the high-order bit of every byte of a multi-byte
+%% UTF character is 1, while the end-of-message sequence is ASCII.
+
+frame(Bin, Start) ->
+ Sz = size(Bin),
+ Scope = {Start, Sz - Start},
+ case binary:match(Bin, pattern(), [{scope, Scope}]) of
+ {Len, 6} ->
+ <<Msg:Len/binary, _:6/binary, Rest/binary>> = Bin,
+ {trim(Msg), Rest};
+ nomatch ->
+ Bin
+ end.
+
+%% pattern/0
+
+pattern() ->
+ Key = ?KEY(pattern),
+ case get(Key) of
+ undefined ->
+ CP = binary:compile_pattern(<<"]]>]]>">>),
+ put(Key, CP),
+ CP;
+ CP ->
+ CP
+ end.
+
+%% trim/1
+%%
+%% Whitespace before an XML declaration is an error, but be somewhat
+%% lenient and strip line breaks since the RFC's are unclear on what's
+%% allowed following a ]]>]]> delimiter. Typical seems to be a single
+%% $\n, but strip any of " \t\r\n", and regardless of NETCONF version.
+
+trim(<<C, Bin/binary>>)
+ when C == $\n;
+ C == $\r;
+ C == $\t;
+ C == $ ->
+ trim(Bin);
+
+trim(Bin) ->
+ Bin.
+
+%% chunk/3
+%%
+%% The final argument is either 0 to indicate that a specified number
+%% of bytes of chunk data should be consumed, or at least 3 to
+%% indicate an offset at which to look for a newline following a chunk
+%% size.
+
+%% Accumulating chunk-data ...
+chunk(Bin, [Sz | Chunks] = L, 0) ->
+ case Bin of
+ <<Chunk:Sz/binary, Rest/binary>> ->
+ chunk(Rest, acc(Chunk, Chunks), 3); %% complete chunk ...
+ _ ->
+ [Bin, 0 | L] %% ... or not
+ end;
+
+%% ... or a header.
+
+chunk(Bin, Chunks, Len)
+ when size(Bin) < 4 ->
+ [Bin, 3 = Len | Chunks];
+
+%% End of chunks.
+chunk(<<"\n##\n", Rest/binary>>, Chunks, _) ->
+ case Chunks of
+ [] ->
+ {error, "end-of-chunks unexpected"}; %% must be at least one
+ Bins ->
+ {lists:reverse(Bins), Rest}
+ end;
+
+%% Matching each of the 10 newline possibilities is faster than
+%% searching.
+chunk(<<"\n#", Head:1/binary, $\n, Rest/binary>>, Chunks, _) ->
+ acc(Head, Rest, Chunks);
+chunk(<<"\n#", Head:2/binary, $\n, Rest/binary>>, Chunks, _) ->
+ acc(Head, Rest, Chunks);
+chunk(<<"\n#", Head:3/binary, $\n, Rest/binary>>, Chunks, _) ->
+ acc(Head, Rest, Chunks);
+chunk(<<"\n#", Head:4/binary, $\n, Rest/binary>>, Chunks, _) ->
+ acc(Head, Rest, Chunks);
+chunk(<<"\n#", Head:5/binary, $\n, Rest/binary>>, Chunks, _) ->
+ acc(Head, Rest, Chunks);
+chunk(<<"\n#", Head:6/binary, $\n, Rest/binary>>, Chunks, _) ->
+ acc(Head, Rest, Chunks);
+chunk(<<"\n#", Head:7/binary, $\n, Rest/binary>>, Chunks, _) ->
+ acc(Head, Rest, Chunks);
+chunk(<<"\n#", Head:8/binary, $\n, Rest/binary>>, Chunks, _) ->
+ acc(Head, Rest, Chunks);
+chunk(<<"\n#", Head:9/binary, $\n, Rest/binary>>, Chunks, _) ->
+ acc(Head, Rest, Chunks);
+chunk(<<"\n#", Head:10/binary, $\n, Rest/binary>>, Chunks, _) ->
+ acc(Head, Rest, Chunks);
+
+chunk(<<"\n#", Bin:11/binary, _/binary>>, _, _) ->
+ {error, {"chunk-size too long", Bin}}; %% 32-bits = max 10 digits
+
+chunk(<<"\n#", _/binary>> = Bin, Chunks, _) ->
+ [Bin, size(Bin) | Chunks];
+
+chunk(Bin, Chunks, 3 = Len) ->
+ case drop(Bin) of
+ <<>> ->
+ [Bin, Len | Chunks];
+ <<"\n#", _/binary>> = B ->
+ chunk(B, Chunks, Len);
+ _ ->
+ {error, {"not a chunk", Bin}}
+ end.
+
+%% drop/1
+
+drop(<<"\n#", _/binary>> = Bin) ->
+ Bin;
+
+drop(<<C, Bin/binary>>)
+ when C == $\n;
+ C == $\r;
+ C == $\t;
+ C == $ ->
+ drop(Bin);
+
+drop(Bin) ->
+ Bin.
+
+%% acc/2
+
+acc(Chunk, []) ->
+ [B || B <- [trim(Chunk)], <<>> /= B];
+
+acc(Chunk, Chunks) ->
+ [Chunk | Chunks].
+
+%% acc/3
+
+acc(Head, Rest, Chunks) ->
+ case chunk_size(Head) of
+ {error, _Reason} = No ->
+ No;
+ Sz ->
+ chunk(Rest, [Sz | Chunks], 0)
+ end.
+
+%% chunk_size/1
+
+chunk_size(<<C, _/binary>> = Bin) ->
+ try true = $0 < C, binary_to_integer(Bin) of
+ Sz when 0 < Sz bsr 32 ->
+ {error, {"chunk-size too large", Sz}};
+ Sz ->
+ Sz
+ catch
+ error: _ ->
+ {error, {"chunk-size invalid", Bin}}
+ end.
%%----------------------------------------------------------------------
%% END OF MODULE