diff options
| author | Sverker Eriksson <sverker@erlang.org> | 2019-11-12 15:28:07 +0100 |
|---|---|---|
| committer | Sverker Eriksson <sverker@erlang.org> | 2020-01-14 15:20:17 +0100 |
| commit | fc309a039a4d817725d7b9887a36d2c501a83679 (patch) | |
| tree | 1039453d07d003d1881166357aa2e519c1a247e1 /lib/jinterface/java_src | |
| parent | 69d79d8cee4bb0f492209b1cf0d830b402319b03 (diff) | |
| download | erlang-fc309a039a4d817725d7b9887a36d2c501a83679.tar.gz | |
Improve distribution handshake
Diffstat (limited to 'lib/jinterface/java_src')
4 files changed, 166 insertions, 68 deletions
diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractConnection.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractConnection.java index 0bf3ca2a67..26f6ffcd97 100644 --- a/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractConnection.java +++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractConnection.java @@ -147,21 +147,6 @@ public abstract class AbstractConnection extends Thread { if (traceLevel >= handshakeThreshold) { System.out.println("<- ACCEPT FROM " + s); } - - // get his info - recvName(peer); - - // now find highest common dist value - if (peer.proto != self.proto || self.distHigh < peer.distLow - || self.distLow > peer.distHigh) { - close(); - throw new IOException( - "No common protocol found - cannot accept connection"); - } - // highest common version: min(peer.distHigh, self.distHigh) - peer.distChoose = peer.distHigh > self.distHigh ? self.distHigh - : peer.distHigh; - doAccept(); name = peer.node(); } @@ -953,10 +938,12 @@ public abstract class AbstractConnection extends Thread { } protected void doAccept() throws IOException, OtpAuthException { + final int send_name_tag = recvName(peer); try { sendStatus("ok"); final int our_challenge = genChallenge(); - sendChallenge(peer.distChoose, localNode.flags, our_challenge); + sendChallenge(peer.flags, localNode.flags, our_challenge); + recvComplement(send_name_tag); final int her_challenge = recvChallengeReply(our_challenge); final byte[] our_digest = genDigest(her_challenge, localNode.cookie()); @@ -992,12 +979,14 @@ public abstract class AbstractConnection extends Thread { System.out.println("-> MD5 CONNECT TO " + peer.host() + ":" + port); } - sendName(peer.distChoose, localNode.flags); + final int send_name_tag = sendName(peer.distChoose, localNode.flags, + localNode.creation); recvStatus(); final int her_challenge = recvChallenge(); final byte[] our_digest = genDigest(her_challenge, localNode.cookie()); final int our_challenge = genChallenge(); + sendComplement(send_name_tag); sendChallengeReply(our_challenge, our_digest); recvChallengeAck(our_challenge); cookieOk = true; @@ -1070,17 +1059,31 @@ public abstract class AbstractConnection extends Thread { return res; } - protected void sendName(final int dist, final int aflags) + protected int sendName(final int dist, final long aflags, + final int creation) throws IOException { @SuppressWarnings("resource") final OtpOutputStream obuf = new OtpOutputStream(); final String str = localNode.node(); - obuf.write2BE(str.length() + 7); // 7 bytes + nodename - obuf.write1(AbstractNode.NTYPE_R6); - obuf.write2BE(dist); - obuf.write4BE(aflags); - obuf.write(str.getBytes()); + int send_name_tag; + if (dist == 5) { + obuf.write2BE(1+2+4 + str.length()); + send_name_tag = 'n'; + obuf.write1(send_name_tag); + obuf.write2BE(dist); + obuf.write4BE(aflags); + obuf.write(str.getBytes()); + } + else { + obuf.write2BE(1+8+4+2 + str.length()); + send_name_tag = 'N'; + obuf.write1(send_name_tag); + obuf.write8BE(aflags); + obuf.write4BE(creation); + obuf.write2BE(str.length()); + obuf.write(str.getBytes()); + } obuf.writeToAndFlush(socket.getOutputStream()); @@ -1088,26 +1091,61 @@ public abstract class AbstractConnection extends Thread { System.out.println("-> " + "HANDSHAKE sendName" + " flags=" + aflags + " dist=" + dist + " local=" + localNode); } + return send_name_tag; } - protected void sendChallenge(final int dist, final int aflags, - final int challenge) throws IOException { + protected void sendComplement(final int send_name_tag) + throws IOException { + + if (send_name_tag == 'n' && + (peer.flags & AbstractNode.dFlagHandshake23) != 0) { + @SuppressWarnings("resource") + final OtpOutputStream obuf = new OtpOutputStream(); + obuf.write2BE(1+4+4); + obuf.write1('c'); + final int flagsHigh = (int)(localNode.flags >> 32); + obuf.write4BE(flagsHigh); + obuf.write4BE(localNode.creation); + + obuf.writeToAndFlush(socket.getOutputStream()); + + if (traceLevel >= handshakeThreshold) { + System.out.println("-> " + "HANDSHAKE sendComplement" + + " flagsHigh=" + flagsHigh + + " creation=" + localNode.creation); + } + } + } + + protected void sendChallenge(final long her_flags, final long our_flags, + final int challenge) throws IOException { @SuppressWarnings("resource") final OtpOutputStream obuf = new OtpOutputStream(); final String str = localNode.node(); - obuf.write2BE(str.length() + 11); // 11 bytes + nodename - obuf.write1(AbstractNode.NTYPE_R6); - obuf.write2BE(dist); - obuf.write4BE(aflags); - obuf.write4BE(challenge); - obuf.write(str.getBytes()); + if ((her_flags & AbstractNode.dFlagHandshake23) == 0) { + obuf.write2BE(1+2+4+4 + str.length()); + obuf.write1('n'); + obuf.write2BE(5); + obuf.write4BE(our_flags & 0xffffffff); + obuf.write4BE(challenge); + obuf.write(str.getBytes()); + } + else { + obuf.write2BE(1+8+4+4+2 + str.length()); + obuf.write1('N'); + obuf.write8BE(our_flags); + obuf.write4BE(challenge); + obuf.write4BE(localNode.creation); + obuf.write2BE(str.length()); + obuf.write(str.getBytes()); + } obuf.writeToAndFlush(socket.getOutputStream()); if (traceLevel >= handshakeThreshold) { System.out.println("-> " + "HANDSHAKE sendChallenge" + " flags=" - + aflags + " dist=" + dist + " challenge=" + challenge + + our_flags + " challenge=" + challenge + " local=" + localNode); } } @@ -1127,8 +1165,8 @@ public abstract class AbstractConnection extends Thread { return tmpbuf; } - protected void recvName(final OtpPeer apeer) throws IOException { - + protected int recvName(final OtpPeer apeer) throws IOException { + int send_name_tag; String hisname = ""; try { @@ -1137,25 +1175,31 @@ public abstract class AbstractConnection extends Thread { final OtpInputStream ibuf = new OtpInputStream(tmpbuf, 0); byte[] tmpname; final int len = tmpbuf.length; - apeer.ntype = ibuf.read1(); - if (apeer.ntype != AbstractNode.NTYPE_R6) { + send_name_tag = ibuf.read1(); + switch (send_name_tag) { + case 'n': + apeer.distLow = apeer.distHigh = ibuf.read2BE(); + if (apeer.distLow != 5) + throw new IOException("Invalid handshake version"); + apeer.flags = ibuf.read4BE(); + tmpname = new byte[len - 7]; + ibuf.readN(tmpname); + hisname = OtpErlangString.newString(tmpname); + break; + case 'N': + apeer.distLow = apeer.distHigh = 6; + apeer.flags = ibuf.read8BE(); + if ((apeer.flags & AbstractNode.dFlagHandshake23) == 0) + throw new IOException("Missing DFLAG_HANDSHAKE_23"); + apeer.creation = ibuf.read4BE(); + int namelen = ibuf.read2BE(); + tmpname = new byte[namelen]; + ibuf.readN(tmpname); + hisname = OtpErlangString.newString(tmpname); + break; + default: throw new IOException("Unknown remote node type"); } - apeer.distLow = apeer.distHigh = ibuf.read2BE(); - if (apeer.distLow < 5) { - throw new IOException("Unknown remote node type"); - } - apeer.flags = ibuf.read4BE(); - tmpname = new byte[len - 7]; - ibuf.readN(tmpname); - hisname = OtpErlangString.newString(tmpname); - // Set the old nodetype parameter to indicate hidden/normal status - // When the old handshake is removed, the ntype should also be. - if ((apeer.flags & AbstractNode.dFlagPublished) != 0) { - apeer.ntype = AbstractNode.NTYPE_R4_ERLANG; - } else { - apeer.ntype = AbstractNode.NTYPE_R4_HIDDEN; - } if ((apeer.flags & AbstractNode.dFlagExtendedReferences) == 0) { throw new IOException( @@ -1180,6 +1224,7 @@ public abstract class AbstractConnection extends Thread { System.out.println("<- " + "HANDSHAKE" + " ntype=" + apeer.ntype + " dist=" + apeer.distHigh + " remote=" + apeer); } + return send_name_tag; } protected int recvChallenge() throws IOException { @@ -1190,14 +1235,31 @@ public abstract class AbstractConnection extends Thread { final byte[] buf = read2BytePackage(); @SuppressWarnings("resource") final OtpInputStream ibuf = new OtpInputStream(buf, 0); - peer.ntype = ibuf.read1(); - if (peer.ntype != AbstractNode.NTYPE_R6) { + int namelen; + switch (ibuf.read1()) { + case 'n': + if (peer.distChoose != 5) + throw new IOException("Old challenge wrong version"); + peer.distLow = peer.distHigh = ibuf.read2BE(); + peer.flags = ibuf.read4BE(); + if ((peer.flags & AbstractNode.dFlagHandshake23) != 0) + throw new IOException("Old challenge unexpected DFLAG_HANDHAKE_23"); + challenge = ibuf.read4BE(); + namelen = buf.length - (1+2+4+4); + break; + case 'N': + peer.distLow = peer.distHigh = peer.distChoose = 6; + peer.flags = ibuf.read8BE(); + if ((peer.flags & AbstractNode.dFlagHandshake23) == 0) + throw new IOException("New challenge missing DFLAG_HANDHAKE_23"); + challenge = ibuf.read4BE(); + peer.creation = ibuf.read4BE(); + namelen = ibuf.read2BE(); + break; + default: throw new IOException("Unexpected peer type"); } - peer.distLow = peer.distHigh = ibuf.read2BE(); - peer.flags = ibuf.read4BE(); - challenge = ibuf.read4BE(); - final byte[] tmpname = new byte[buf.length - 11]; + final byte[] tmpname = new byte[namelen]; ibuf.readN(tmpname); final String hisname = OtpErlangString.newString(tmpname); if (!hisname.equals(peer.node)) { @@ -1228,6 +1290,27 @@ public abstract class AbstractConnection extends Thread { return challenge; } + protected void recvComplement(int send_name_tag) throws IOException { + + if (send_name_tag == 'n' && + (peer.flags & AbstractNode.dFlagHandshake23) != 0) { + try { + final byte[] tmpbuf = read2BytePackage(); + @SuppressWarnings("resource") + final OtpInputStream ibuf = new OtpInputStream(tmpbuf, 0); + if (ibuf.read1() != 'c') + throw new IOException("Not a complement tag"); + + final long flagsHigh = ibuf.read4BE(); + peer.flags |= flagsHigh << 32; + peer.creation = ibuf.read4BE(); + + } catch (final OtpErlangDecodeException e) { + throw new IOException("Handshake failed - not enough data"); + } + } + } + protected void sendChallengeReply(final int challenge, final byte[] digest) throws IOException { diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractNode.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractNode.java index c3f71a84f0..fa6db9a046 100644 --- a/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractNode.java +++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/AbstractNode.java @@ -74,10 +74,7 @@ public class AbstractNode implements OtpTransportFactory { static String defaultCookie = null; final OtpTransportFactory transportFactory; - // Node types static final int NTYPE_R6 = 110; // 'n' post-r5, all nodes - static final int NTYPE_R4_ERLANG = 109; // 'm' Only for source compatibility - static final int NTYPE_R4_HIDDEN = 104; // 'h' Only for source compatibility // Node capability flags static final int dFlagPublished = 1; @@ -96,17 +93,19 @@ public class AbstractNode implements OtpTransportFactory { static final int dFlagUtf8Atoms = 0x10000; static final int dFlagMapTag = 0x20000; static final int dFlagBigCreation = 0x40000; + static final int dFlagHandshake23 = 0x1000000; int ntype = NTYPE_R6; int proto = 0; // tcp/ip - int distHigh = 5; // Cannot talk to nodes before R6 + int distHigh = 6; int distLow = 5; // Cannot talk to nodes before R6 int creation = 0; - int flags = dFlagExtendedReferences | dFlagExtendedPidsPorts + long flags = dFlagExtendedReferences | dFlagExtendedPidsPorts | dFlagBitBinaries | dFlagNewFloats | dFlagFunTags | dflagNewFunTags | dFlagUtf8Atoms | dFlagMapTag | dFlagExportPtrTag - | dFlagBigCreation; + | dFlagBigCreation + | dFlagHandshake23; /* initialize hostname and default cookie */ static { diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpEpmd.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpEpmd.java index fffb8475d3..008ee9727e 100644 --- a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpEpmd.java +++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpEpmd.java @@ -74,8 +74,9 @@ public class OtpEpmd { private static final byte port4req = (byte) 122; private static final byte port4resp = (byte) 119; - private static final byte publish4req = (byte) 120; - private static final byte publish4resp = (byte) 121; + private static final byte ALIVE2_REQ = (byte) 120; + private static final byte ALIVE2_RESP = (byte) 121; + private static final byte ALIVE2_X_RESP = (byte) 118; private static final byte names4req = (byte) 110; private static int traceLevel = 0; @@ -287,7 +288,7 @@ public class OtpEpmd { obuf.write2BE(node.alive().length() + 13); - obuf.write1(publish4req); + obuf.write1(ALIVE2_REQ); obuf.write2BE(node.port()); obuf.write1(node.type()); @@ -322,10 +323,11 @@ public class OtpEpmd { final OtpInputStream ibuf = new OtpInputStream(tmpbuf, 0); final int response = ibuf.read1(); - if (response == publish4resp) { + if (response == ALIVE2_RESP || response == ALIVE2_X_RESP) { final int result = ibuf.read1(); if (result == 0) { - node.creation = ibuf.read2BE(); + node.creation = (response == ALIVE2_RESP + ? ibuf.read2BE() : ibuf.read4BE()); if (traceLevel >= traceThreshold) { System.out.println("<- OK"); } diff --git a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpInputStream.java b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpInputStream.java index 6d81ce630b..8cc5b3c21d 100644 --- a/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpInputStream.java +++ b/lib/jinterface/java_src/com/ericsson/otp/erlang/OtpInputStream.java @@ -239,6 +239,20 @@ public class OtpInputStream extends ByteArrayInputStream { } /** + * Read a eight byte big endian integer from the stream. + * + * @return the bytes read, converted from big endian to a long integer. + * + * @exception OtpErlangDecodeException + * if the next byte cannot be read. + */ + public long read8BE() throws OtpErlangDecodeException { + long high = read4BE(); + long low = read4BE(); + return (high << 32) | (low & 0xffffffff); + } + + /** * Read a two byte little endian integer from the stream. * * @return the bytes read, converted from little endian to an integer. |
