diff options
author | Raimo Niskanen <raimo@erlang.org> | 2020-02-07 14:11:22 +0100 |
---|---|---|
committer | Raimo Niskanen <raimo@erlang.org> | 2020-02-07 14:11:22 +0100 |
commit | 129361a04639482957db8082f658a7f8626a536e (patch) | |
tree | 946819655a1548919d018f6e4aa004d532cb5987 /erts | |
parent | 266f2dce5b41127a80c54a8290d6c08b0659e9e8 (diff) | |
parent | cd96f9c6934d48d08be61f63b37221a76946d21c (diff) | |
download | erlang-129361a04639482957db8082f658a7f8626a536e.tar.gz |
Merge branch 'raimo/erts/socket-fixes/OTP-14644' into maint
* raimo/erts/socket-fixes/OTP-14644:
Stop using typeof for struct timeval
Fix lock order
Cleanup locks
Cleanup redundant cleanup code
Cleanup requestor handling
Fix specs
Merge domain+type+proto in one getopt
Cleanup timeout handling
Cleanup parameter check functions
Cleanup try-catch handling
Fix nif_select and closeMtx handling
Fix lock order
Bugfix: do not read freed refs
Try fix double free
Fix close and abort handling
Clean up send code and ref handling
remove-unsuccesful-experiment: Fix timing dependent test case
Fix timing dependent test case
Do not self-close socket for econnreset
Avoid deadlock and redundant close + abort message
Diffstat (limited to 'erts')
-rw-r--r-- | erts/configure.in | 1 | ||||
-rw-r--r-- | erts/emulator/nifs/common/socket_nif.c | 764 | ||||
-rw-r--r-- | erts/emulator/nifs/common/socket_util.c | 51 | ||||
-rw-r--r-- | erts/emulator/test/socket_SUITE.erl | 21 | ||||
-rw-r--r-- | erts/preloaded/ebin/socket.beam | bin | 80460 -> 79040 bytes | |||
-rw-r--r-- | erts/preloaded/src/socket.erl | 962 |
6 files changed, 964 insertions, 835 deletions
diff --git a/erts/configure.in b/erts/configure.in index 71c38129b4..609c457393 100644 --- a/erts/configure.in +++ b/erts/configure.in @@ -1818,6 +1818,7 @@ AC_CHECK_SIZEOF(long long) AC_CHECK_SIZEOF(size_t) AC_CHECK_SIZEOF(off_t) AC_CHECK_SIZEOF(time_t) +AC_CHECK_SIZEOF(suseconds_t) BITS64= diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index 36dfe4e937..ab95748458 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -368,7 +368,7 @@ static void (*esock_sctp_freepaddrs)(struct sockaddr *addrs) = NULL; #define ESOCK_GLOBAL_DEBUG_DEFAULT FALSE #define ESOCK_DEBUG_DEFAULT FALSE -/* Counters and stuff (Don't know where to sent this stuff anyway) */ +/* Counters and stuff (Don't know where to sen2 this stuff anyway) */ #define ESOCK_NIF_IOW_DEFAULT FALSE @@ -487,6 +487,7 @@ static void (*esock_sctp_freepaddrs)(struct sockaddr *addrs) = NULL; #define ESOCK_RECV_FLAG_LOW ESOCK_RECV_FLAG_CMSG_CLOEXEC #define ESOCK_RECV_FLAG_HIGH ESOCK_RECV_FLAG_TRUNC +#define ESOCK_RECV_BUFFER_COUNT_DEFAULT 0 #define ESOCK_RECV_BUFFER_SIZE_DEFAULT 8192 #define ESOCK_RECV_CTRL_BUFFER_SIZE_DEFAULT 1024 #define ESOCK_SEND_CTRL_BUFFER_SIZE_DEFAULT 1024 @@ -581,6 +582,7 @@ typedef union { #define ESOCK_OPT_OTP_DOMAIN 0xFF01 // INTERNAL AND ONLY GET #define ESOCK_OPT_OTP_TYPE 0xFF02 // INTERNAL AND ONLY GET #define ESOCK_OPT_OTP_PROTOCOL 0xFF03 // INTERNAL AND ONLY GET +#define ESOCK_OPT_OTP_DTP 0xFF04 // INTERNAL AND ONLY GET #define ESOCK_OPT_SOCK_ACCEPTCONN 1 #define ESOCK_OPT_SOCK_BINDTODEVICE 3 @@ -984,6 +986,9 @@ typedef struct { ERL_NIF_TERM closeRef; BOOLEAN_T closeLocal; + /* Lock order: closeMtx, readMtx, accMtx, writeMtx, cfgMtx + * unordered: cntMtx + */ } ESockDescriptor; @@ -993,13 +998,19 @@ typedef struct { /* These are for debugging, testing and the like */ // ERL_NIF_TERM version; // ERL_NIF_TERM buildDate; + + /* XXX Should be locked but too awkward and small gain */ BOOLEAN_T dbg; /* Registry stuff */ - ErlNifPid regPid; + ErlNifPid regPid; /* Constant - not locked */ + /* XXX + * Should be locked but too awkward for no gain since it is not used yet + */ BOOLEAN_T iow; // Where do we send this? Subscription? - ErlNifMutex* cntMtx; + + ErlNifMutex* cntMtx; /* Locks the below */ /* Its extreme overkill to have these counters be 64-bit, * but since the other counters are, its much simpler to * let to let these be 64-bit also @@ -1831,6 +1842,7 @@ static ERL_NIF_TERM esock_getopt_otp(ErlNifEnv* env, * *** esock_getopt_otp_domain *** * *** esock_getopt_otp_type *** * *** esock_getopt_otp_protocol *** + * *** esock_getopt_otp_dtp *** */ #define ESOCK_GETOPT_OTP_FUNCS \ ESOCK_GETOPT_OTP_FUNC_DEF(debug); \ @@ -1843,7 +1855,8 @@ static ERL_NIF_TERM esock_getopt_otp(ErlNifEnv* env, ESOCK_GETOPT_OTP_FUNC_DEF(fd); \ ESOCK_GETOPT_OTP_FUNC_DEF(domain); \ ESOCK_GETOPT_OTP_FUNC_DEF(type); \ - ESOCK_GETOPT_OTP_FUNC_DEF(protocol); + ESOCK_GETOPT_OTP_FUNC_DEF(protocol); \ + ESOCK_GETOPT_OTP_FUNC_DEF(dtp); #define ESOCK_GETOPT_OTP_FUNC_DEF(F) \ static ERL_NIF_TERM esock_getopt_otp_##F(ErlNifEnv* env, \ ESockDescriptor* descP) @@ -2317,6 +2330,10 @@ static ERL_NIF_TERM send_check_fail(ErlNifEnv* env, ESockDescriptor* descP, int saveErrno, ERL_NIF_TERM sockRef); +static void send_error_waiting_writers(ErlNifEnv* env, + ESockDescriptor* descP, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM reason); static ERL_NIF_TERM send_check_retry(ErlNifEnv* env, ESockDescriptor* descP, ssize_t written, @@ -2370,10 +2387,10 @@ static ERL_NIF_TERM recv_check_fail(ErlNifEnv* env, ErlNifBinary* buf2P, ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef); -static ERL_NIF_TERM recv_check_fail_closed(ErlNifEnv* env, - ESockDescriptor* descP, - ERL_NIF_TERM sockRef, - ERL_NIF_TERM recvRef); +static ERL_NIF_TERM recv_check_fail_econnreset(ErlNifEnv* env, + ESockDescriptor* descP, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM recvRef); static ERL_NIF_TERM recv_check_partial(ErlNifEnv* env, ESockDescriptor* descP, ssize_t read, @@ -2671,6 +2688,8 @@ ESOCK_OPERATOR_FUNCS_DEFS static BOOLEAN_T requestor_pop(ESockRequestQueue* q, ESockRequestor* reqP); +static void requestor_clear(ESockRequestor* reqP); + static BOOLEAN_T qsearch4pid(ErlNifEnv* env, ESockRequestQueue* q, ErlNifPid* pid); @@ -2695,6 +2714,10 @@ static int esock_demonitor(const char* slogan, static void esock_monitor_init(ESockMonitor* mon); static ERL_NIF_TERM esock_make_monitor_term(ErlNifEnv* env, const ESockMonitor* monP); +static void esock_release_current(const char* slogan, + ErlNifEnv* env, + ESockDescriptor* descP, + ESockRequestor* current); #endif // if defined(__WIN32__) @@ -3190,6 +3213,7 @@ ERL_NIF_TERM esock_atom_socket_tag; // This has a "special" name ('$socket') /* Local error reason atoms */ #define LOCAL_ERROR_REASON_ATOMS \ + LOCAL_ATOM_DECL(econnreset); \ LOCAL_ATOM_DECL(eisconn); \ LOCAL_ATOM_DECL(enotclosing); \ LOCAL_ATOM_DECL(enotconn); \ @@ -3340,37 +3364,51 @@ ERL_NIF_TERM nif_info(ErlNifEnv* env, static ERL_NIF_TERM esock_global_info(ErlNifEnv* env) { - ERL_NIF_TERM numBits = MKCT(env, atom_num_cnt_bits, ESOCK_COUNTER_SIZE); - ERL_NIF_TERM numSockets = MKCT(env, atom_num_sockets, data.numSockets); - ERL_NIF_TERM numTypeDGrams = MKCT(env, atom_num_tdgrams, data.numTypeDGrams); - ERL_NIF_TERM numTypeStreams = MKCT(env, atom_num_tstreams, data.numTypeStreams); - ERL_NIF_TERM numTypeSeqPkgs = MKCT(env, atom_num_tseqpkgs, data.numTypeSeqPkgs); - ERL_NIF_TERM numDomLocal = MKCT(env, atom_num_dlocal, data.numDomainLocal); - ERL_NIF_TERM numDomInet = MKCT(env, atom_num_dinet, data.numDomainInet); - ERL_NIF_TERM numDomInet6 = MKCT(env, atom_num_dinet6, data.numDomainInet6); - ERL_NIF_TERM numProtoIP = MKCT(env, atom_num_pip, data.numProtoIP); - ERL_NIF_TERM numProtoTCP = MKCT(env, atom_num_ptcp, data.numProtoTCP); - ERL_NIF_TERM numProtoUDP = MKCT(env, atom_num_pudp, data.numProtoUDP); - ERL_NIF_TERM numProtoSCTP = MKCT(env, atom_num_psctp, data.numProtoSCTP); - ERL_NIF_TERM gcnt[] = {numBits, - numSockets, - numTypeDGrams, numTypeStreams, numTypeSeqPkgs, - numDomLocal, numDomInet, numDomInet6, - numProtoIP, numProtoTCP, numProtoUDP, numProtoSCTP}; - unsigned int lenGCnt = sizeof(gcnt) / sizeof(ERL_NIF_TERM); - ERL_NIF_TERM lgcnt = MKLA(env, gcnt, lenGCnt); - ERL_NIF_TERM keys[] = {esock_atom_debug, atom_iow, atom_counters}; - ERL_NIF_TERM vals[] = {BOOL2ATOM(data.dbg), BOOL2ATOM(data.iow), lgcnt}; - ERL_NIF_TERM info; - unsigned int numKeys = sizeof(keys) / sizeof(ERL_NIF_TERM); - unsigned int numVals = sizeof(vals) / sizeof(ERL_NIF_TERM); + ERL_NIF_TERM + numBits, numSockets, numTypeDGrams, numTypeStreams, + numTypeSeqPkgs, numDomLocal, numDomInet, numDomInet6, + numProtoIP, numProtoTCP, numProtoUDP, numProtoSCTP; - ESOCK_ASSERT( (numKeys == numVals) ); + MLOCK(data.cntMtx); + numBits = MKCT(env, atom_num_cnt_bits, ESOCK_COUNTER_SIZE); + numSockets = MKCT(env, atom_num_sockets, data.numSockets); + numTypeDGrams = MKCT(env, atom_num_tdgrams, data.numTypeDGrams); + numTypeStreams = MKCT(env, atom_num_tstreams, data.numTypeStreams); + numTypeSeqPkgs = MKCT(env, atom_num_tseqpkgs, data.numTypeSeqPkgs); + numDomLocal = MKCT(env, atom_num_dlocal, data.numDomainLocal); + numDomInet = MKCT(env, atom_num_dinet, data.numDomainInet); + numDomInet6 = MKCT(env, atom_num_dinet6, data.numDomainInet6); + numProtoIP = MKCT(env, atom_num_pip, data.numProtoIP); + numProtoTCP = MKCT(env, atom_num_ptcp, data.numProtoTCP); + numProtoUDP = MKCT(env, atom_num_pudp, data.numProtoUDP); + numProtoSCTP = MKCT(env, atom_num_psctp, data.numProtoSCTP); + MUNLOCK(data.cntMtx); - if (!MKMA(env, keys, vals, numKeys, &info)) - return enif_make_badarg(env); + { + ERL_NIF_TERM gcnt[] = + {numBits, + numSockets, + numTypeDGrams, numTypeStreams, numTypeSeqPkgs, + numDomLocal, numDomInet, numDomInet6, + numProtoIP, numProtoTCP, numProtoUDP, numProtoSCTP}; + unsigned int lenGCnt = + sizeof(gcnt) / sizeof(ERL_NIF_TERM); + ERL_NIF_TERM + lgcnt = MKLA(env, gcnt, lenGCnt), + keys[] = {esock_atom_debug, atom_iow, atom_counters}, + vals[] = {BOOL2ATOM(data.dbg), BOOL2ATOM(data.iow), lgcnt}, + info; + unsigned int + numKeys = sizeof(keys) / sizeof(ERL_NIF_TERM), + numVals = sizeof(vals) / sizeof(ERL_NIF_TERM); - return info; + ESOCK_ASSERT( (numKeys == numVals) ); + + if (!MKMA(env, keys, vals, numKeys, &info)) + return enif_make_badarg(env); + + return info; + } } @@ -3511,8 +3549,8 @@ ERL_NIF_TERM esock_socket_info_counters(ErlNifEnv* env, { ERL_NIF_TERM info; - MLOCK(descP->writeMtx); MLOCK(descP->readMtx); + MLOCK(descP->writeMtx); { ERL_NIF_TERM readByteCnt = MKCT(env, atom_read_byte, descP->readByteCnt); @@ -3548,8 +3586,8 @@ ERL_NIF_TERM esock_socket_info_counters(ErlNifEnv* env, } - MUNLOCK(descP->readMtx); MUNLOCK(descP->writeMtx); + MUNLOCK(descP->readMtx); SSDBG( descP, ("SOCKET", "esock_socket_info_counters -> done with" "\r\n info: %T" @@ -5735,8 +5773,8 @@ ERL_NIF_TERM nif_connect(ErlNifEnv* env, * safe side we do the best we can to avoid complications... */ - MLOCK(descP->writeMtx); MLOCK(descP->readMtx); + MLOCK(descP->writeMtx); MLOCK(descP->cfgMtx); res = esock_connect(env, descP, sockRef); @@ -6465,13 +6503,6 @@ ERL_NIF_TERM esock_accept_accepting_current_accept(ErlNifEnv* env, if (esock_accept_accepted(env, descP, sockRef, accSock, descP->currentAcceptor.pid, remote, &res)) { - /* Clean out the old cobweb's before trying to invite a new spider */ - - esock_free_env("esock_accept_accepting_current_accept - " - "current-accept-env", - descP->currentAcceptor.env); - descP->currentAcceptor.env = NULL; - if (!activate_next_acceptor(env, descP, sockRef)) { SSDBG( descP, @@ -6482,13 +6513,6 @@ ERL_NIF_TERM esock_accept_accepting_current_accept(ErlNifEnv* env, descP->state = ESOCK_STATE_LISTENING; descP->currentAcceptorP = NULL; - /* Do we really need this? - * The activate_next_acceptor (actually the requestor_pop) function - * initiates these values if there are no waiting acceptor... - */ - descP->currentAcceptor.ref = esock_atom_undefined; - enif_set_pid_undefined(&descP->currentAcceptor.pid); - MON_INIT(&descP->currentAcceptor.mon); } } @@ -6510,10 +6534,8 @@ ERL_NIF_TERM esock_accept_accepting_current_error(ErlNifEnv* env, ERL_NIF_TERM opRef, int save_errno) { - ESockRequestor req; ERL_NIF_TERM res, reason; - req.env = NULL; if (save_errno == ERRNO_BLOCK) { /* @@ -6533,12 +6555,17 @@ ERL_NIF_TERM esock_accept_accepting_current_error(ErlNifEnv* env, descP->state); } else { + ESockRequestor req; ESOCK_CNT_INC(env, descP, sockRef, atom_acc_fails, &descP->accFails, 1); + esock_release_current("esock_accept_accepting_current_error", + env, descP, descP->currentAcceptorP); + reason = MKA(env, erl_errno_id(save_errno)); res = esock_make_error(env, reason); + req.env = NULL; while (acceptor_pop(env, descP, &req)) { SSDBG( descP, ("SOCKET", @@ -6550,7 +6577,7 @@ ERL_NIF_TERM esock_accept_accepting_current_error(ErlNifEnv* env, DEMONP("esock_accept_accepting_current_error -> pop'ed writer", env, descP, &req.mon); } - + descP->currentAcceptorP = NULL; } return res; @@ -6599,6 +6626,21 @@ ERL_NIF_TERM esock_accept_busy_retry(ErlNifEnv* env, if ((sres = esock_select_read(env, descP->sock, descP, pid, sockRef, accRef)) < 0) { + + DEMONP("esock_accept_busy_retry - select failed", + env, descP, &descP->currentAcceptor.mon); + /* It is very unlikely that a next acceptor will be able + * to do anything succesful, but we will clean the queue + */ + if (!activate_next_acceptor(env, descP, sockRef)) { + SSDBG( descP, + ("SOCKET", + "esock_accept_busy_retry -> no more acceptors\r\n") ); + + descP->state = ESOCK_STATE_LISTENING; + descP->currentAcceptorP = NULL; + } + reason = MKT2(env, esock_atom_select_failed, MKI(env, sres)); res = esock_make_error(env, reason); } else { @@ -6658,6 +6700,7 @@ BOOLEAN_T esock_accept_accepted(ErlNifEnv* env, accDescP->rCtrlSz = descP->rCtrlSz; // Inherit buffer size accDescP->wCtrlSz = descP->wCtrlSz; // Inherit buffer size accDescP->iow = descP->iow; // Inherit iow + accDescP->dbg = descP->dbg; // Inherit debug flag accRef = enif_make_resource(env, accDescP); enif_release_resource(accDescP); @@ -6727,12 +6770,14 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env, if ((argc != 4) || !GET_BIN(env, argv[2], &sndData) || !GET_UINT(env, argv[3], &eflags)) { + SSDBG( descP, ("SOCKET", "nif_send -> argv decode failed\r\n") ); return enif_make_badarg(env); } sockRef = argv[0]; // We need this in case we send in case we send abort sendRef = argv[1]; if (!ESOCK_GET_RESOURCE(env, sockRef, (void**) &descP)) { + SSDBG( descP, ("SOCKET", "nif_send -> get resource failed\r\n") ); return enif_make_badarg(env); } @@ -6744,8 +6789,10 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env, "\r\n eFlags: 0x%lX" "\r\n", descP->sock, sockRef, sendRef, sndData.size, eflags) ); - if (!esendflags2sendflags(eflags, &flags)) + if (!esendflags2sendflags(eflags, &flags)) { + SSDBG( descP, ("SOCKET", "nif_send -> sendflags decode failed\r\n") ); return esock_make_error(env, esock_atom_einval); + } SSDBG( descP, ("SOCKET", "nif_send -> flags: 0x%lX\r\n", flags) ); @@ -6790,12 +6837,17 @@ ERL_NIF_TERM esock_send(ErlNifEnv* env, ssize_t written; ERL_NIF_TERM writerCheck; - if (!descP->isWritable) - return enif_make_badarg(env); + if (!descP->isWritable) { + SSDBG( descP, ("SOCKET", "esock_send -> return not writable\r\n") ); + return esock_make_error(env, atom_closed); + } - /* Check if there is already a current writer and if its us */ - if (!send_check_writer(env, descP, sendRef, &writerCheck)) + /* Ensure that we either have no current writer or that we are it */ + if (!send_check_writer(env, descP, sendRef, &writerCheck)) { + SSDBG( descP, ("SOCKET", "esock_send -> writer check failed: " + "\r\n %T\r\n", writerCheck) ); return writerCheck; + } /* We ignore the wrap for the moment. * Maybe we should issue a wrap-message to controlling process... @@ -6857,6 +6909,7 @@ ERL_NIF_TERM nif_sendto(ErlNifEnv* env, if ((argc != 5) || !GET_BIN(env, argv[2], &sndData) || !GET_UINT(env, argv[4], &eflags)) { + SSDBG( descP, ("SOCKET", "nif_sendto -> argv decode failed\r\n") ); return enif_make_badarg(env); } sockRef = argv[0]; // We need this in case we send abort (to the caller) @@ -6864,6 +6917,7 @@ ERL_NIF_TERM nif_sendto(ErlNifEnv* env, eSockAddr = argv[3]; if (!ESOCK_GET_RESOURCE(env, sockRef, (void**) &descP)) { + SSDBG( descP, ("SOCKET", "nif_sendto -> get resource failed\r\n") ); return enif_make_badarg(env); } @@ -6922,12 +6976,17 @@ ERL_NIF_TERM esock_sendto(ErlNifEnv* env, ssize_t written; ERL_NIF_TERM writerCheck; - if (!descP->isWritable) - return enif_make_badarg(env); + if (!descP->isWritable) { + SSDBG( descP, ("SOCKET", "esock_sendto -> return not writable\r\n") ); + return esock_make_error(env, atom_closed); + } - /* Check if there is already a current writer and if its us */ - if (!send_check_writer(env, descP, sendRef, &writerCheck)) + /* Ensure that we either have no current writer or we are it */ + if (!send_check_writer(env, descP, sendRef, &writerCheck)) { + SSDBG( descP, ("SOCKET", "esock_sendto -> writer check failed: " + "\r\n %T\r\n", writerCheck) ); return writerCheck; + } /* We ignore the wrap for the moment. * Maybe we should issue a wrap-message to controlling process... @@ -6988,6 +7047,7 @@ ERL_NIF_TERM nif_sendmsg(ErlNifEnv* env, if ((argc != 4) || !IS_MAP(env, argv[2]) || !GET_UINT(env, argv[3], &eflags)) { + SSDBG( descP, ("SOCKET", "nif_sendmsg -> argv decode failed\r\n") ); return enif_make_badarg(env); } sockRef = argv[0]; // We need this in case we send abort (to the caller) @@ -6995,6 +7055,7 @@ ERL_NIF_TERM nif_sendmsg(ErlNifEnv* env, eMsgHdr = argv[2]; if (!ESOCK_GET_RESOURCE(env, sockRef, (void**) &descP)) { + SSDBG( descP, ("SOCKET", "nif_sendmsg -> get resource failed\r\n") ); return enif_make_badarg(env); } @@ -7006,8 +7067,10 @@ ERL_NIF_TERM nif_sendmsg(ErlNifEnv* env, "\r\n", descP->sock, argv[0], sendRef, eflags) ); - if (!esendflags2sendflags(eflags, &flags)) + if (!esendflags2sendflags(eflags, &flags)) { + SSDBG( descP, ("SOCKET", "nif_sendmsg -> sendflags decode failed\r\n") ); return esock_make_error(env, esock_atom_einval); + } MLOCK(descP->writeMtx); @@ -7049,19 +7112,15 @@ ERL_NIF_TERM esock_sendmsg(ErlNifEnv* env, char* xres; if (!descP->isWritable) { - SSDBG( descP, ("SOCKET", "esock_sendmsg -> not writable\r\n") ); - - return enif_make_badarg(env); + return esock_make_error(env, atom_closed); } - /* Check if there is already a current writer and if its us */ + /* Ensure that we either have no current writer or we are it */ if (!send_check_writer(env, descP, sendRef, &writerCheck)) { - - SSDBG( descP, - ("SOCKET", "esock_sendmsg -> writer check failed: " - "\r\n %T\r\n", writerCheck) ); - + SSDBG( descP, + ("SOCKET", "esock_sendmsg -> writer check failed: " + "\r\n %T\r\n", writerCheck) ); return writerCheck; } @@ -7333,6 +7392,8 @@ ERL_NIF_TERM nif_recv(ErlNifEnv* env, MUNLOCK(descP->readMtx); + SSDBG( descP, ("SOCKET", "nif_recv -> done: %T\r\n", res) ); + return res; #endif // if defined(__WIN32__) @@ -7367,9 +7428,9 @@ ERL_NIF_TERM esock_recv(ErlNifEnv* env, flags) ); if (!descP->isReadable) - return enif_make_badarg(env); + return esock_make_error(env, atom_closed); - /* Check if there is already a current reader and if its us */ + /* Ensure that we either have no current reader or that we are it */ if (!recv_check_reader(env, descP, recvRef, &readerCheck)) return readerCheck; @@ -7532,9 +7593,9 @@ ERL_NIF_TERM esock_recvfrom(ErlNifEnv* env, "\r\n", len, bufSz, flags) ); if (!descP->isReadable) - return enif_make_badarg(env); + return esock_make_error(env, atom_closed); - /* Check if there is already a current reader and if its us */ + /* Ensure that we either have no current reader or that we are it */ if (!recv_check_reader(env, descP, recvRef, &readerCheck)) return readerCheck; @@ -7707,9 +7768,9 @@ ERL_NIF_TERM esock_recvmsg(ErlNifEnv* env, "\r\n", bufSz, bufLen, ctrlSz, ctrlLen, flags) ); if (!descP->isReadable) - return enif_make_badarg(env); + return esock_make_error(env, atom_closed); - /* Check if there is already a current reader and if its us */ + /* Ensure that we either have no current reader or that we are it */ if (!recv_check_reader(env, descP, recvRef, &readerCheck)) return readerCheck; @@ -7786,6 +7847,9 @@ ERL_NIF_TERM nif_close(ErlNifEnv* env, return enif_raise_exception(env, MKA(env, "notsup")); #else ESockDescriptor* descP; + ERL_NIF_TERM res; + + SGDBG( ("SOCKET", "nif_close -> entry with argc: %d\r\n", argc) ); if ((argc != 1) || !ESOCK_GET_RESOURCE(env, argv[0], (void**) &descP)) { @@ -7795,7 +7859,15 @@ ERL_NIF_TERM nif_close(ErlNifEnv* env, if (IS_CLOSED(descP) || IS_CLOSING(descP)) return esock_make_error(env, atom_closed); - return esock_close(env, descP); + MLOCK(descP->closeMtx); + + res = esock_close(env, descP); + + MUNLOCK(descP->closeMtx); + + SSDBG( descP, ("SOCKET", "nif_close -> res: %T\r\n", res) ); + + return res; #endif // if defined(__WIN32__) } @@ -7816,8 +7888,6 @@ ERL_NIF_TERM esock_close(ErlNifEnv* env, descP->currentReaderP, descP->currentAcceptorP) ); - MLOCK(descP->closeMtx); - doClose = esock_close_check(env, descP, &reason); if (doClose) { @@ -7826,8 +7896,6 @@ ERL_NIF_TERM esock_close(ErlNifEnv* env, reply = esock_make_error(env, reason); } - MUNLOCK(descP->closeMtx); - SSDBG( descP, ("SOCKET", "esock_close -> [%d] done when: " "\r\n state: 0x%lX" @@ -8096,13 +8164,14 @@ ERL_NIF_TERM nif_shutdown(ErlNifEnv* env, return enif_make_badarg(env); } - if (IS_CLOSED(descP) || IS_CLOSING(descP)) + if (IS_CLOSED(descP)) return esock_make_error(env, atom_closed); if (!ehow2how(ehow, &how)) return enif_make_badarg(env); return esock_shutdown(env, descP, how); + #endif // if defined(__WIN32__) } @@ -11891,7 +11960,7 @@ ERL_NIF_TERM nif_getopt(ErlNifEnv* env, eIsEncoded = argv[1]; eOpt = argv[3]; // Is "normally" an int, but if raw mode: {Int, ValueSz} - if (IS_CLOSED(descP) || IS_CLOSING(descP)) + if (IS_CLOSED(descP)) return esock_make_error(env, atom_closed); SSDBG( descP, @@ -12027,6 +12096,10 @@ ERL_NIF_TERM esock_getopt_otp(ErlNifEnv* env, result = esock_getopt_otp_protocol(env, descP); break; + case ESOCK_OPT_OTP_DTP: + result = esock_getopt_otp_dtp(env, descP); + break; + default: result = esock_make_error(env, esock_atom_einval); break; @@ -12144,128 +12217,175 @@ ERL_NIF_TERM esock_getopt_otp_meta(ErlNifEnv* env, } -/* esock_getopt_otp_domain - Handle the OTP (level) domain option - */ static -ERL_NIF_TERM esock_getopt_otp_domain(ErlNifEnv* env, - ESockDescriptor* descP) +ERL_NIF_TERM getopt_otp_domain(ErlNifEnv* env, int domain) { - ERL_NIF_TERM result, reason; - int val = descP->domain; + ERL_NIF_TERM result; - switch (val) { + switch (domain) { case AF_INET: - result = esock_make_ok2(env, esock_atom_inet); + result = esock_atom_inet; break; #if defined(HAVE_IN6) && defined(AF_INET6) case AF_INET6: - result = esock_make_ok2(env, esock_atom_inet6); + result = esock_atom_inet6; break; #endif #if defined(HAVE_SYS_UN_H) case AF_UNIX: - result = esock_make_ok2(env, esock_atom_local); + result = esock_atom_local; break; #endif default: - reason = MKT2(env, esock_atom_unknown, MKI(env, val)); - result = esock_make_error(env, reason); + result = MKI(env, domain); break; } return result; } +/* + * esock_getopt_otp_domain - Handle the OTP (level) domain option + */ +static +ERL_NIF_TERM esock_getopt_otp_domain(ErlNifEnv* env, + ESockDescriptor* descP) +{ + ERL_NIF_TERM domain, result; + + domain = getopt_otp_domain(env, descP->domain); + result = esock_make_ok2(env, domain); + + return result; +} -/* esock_getopt_otp_type - Handle the OTP (level) type options. - */ static -ERL_NIF_TERM esock_getopt_otp_type(ErlNifEnv* env, - ESockDescriptor* descP) +ERL_NIF_TERM getopt_otp_type(ErlNifEnv* env, int type) { - ERL_NIF_TERM result, reason; - int val = descP->type; + ERL_NIF_TERM result; - switch (val) { + switch (type) { case SOCK_STREAM: - result = esock_make_ok2(env, esock_atom_stream); + result = esock_atom_stream; break; case SOCK_DGRAM: - result = esock_make_ok2(env, esock_atom_dgram); + result = esock_atom_dgram; break; #ifdef HAVE_SCTP case SOCK_SEQPACKET: - result = esock_make_ok2(env, esock_atom_seqpacket); + result = esock_atom_seqpacket; break; #endif case SOCK_RAW: - result = esock_make_ok2(env, esock_atom_raw); + result = esock_atom_raw; break; case SOCK_RDM: - result = esock_make_ok2(env, esock_atom_rdm); + result = esock_atom_rdm; break; default: - reason = MKT2(env, esock_atom_unknown, MKI(env, val)); - result = esock_make_error(env, reason); + result = MKI(env, type); break; } return result; } +/* + * esock_getopt_otp_type - Handle the OTP (level) type options. + */ +static +ERL_NIF_TERM esock_getopt_otp_type(ErlNifEnv* env, + ESockDescriptor* descP) +{ + ERL_NIF_TERM type, result; + + type = getopt_otp_type(env, descP->type); + result = esock_make_ok2(env, type); + + return result; +} -/* esock_getopt_otp_protocol - Handle the OTP (level) protocol options. - */ static -ERL_NIF_TERM esock_getopt_otp_protocol(ErlNifEnv* env, - ESockDescriptor* descP) +ERL_NIF_TERM getopt_otp_protocol(ErlNifEnv* env, + ESockDescriptor* descP) { - ERL_NIF_TERM result, reason; + ERL_NIF_TERM result; int val = descP->protocol; - switch (val) { - case IPPROTO_IP: + switch (val) { + case IPPROTO_IP: #if defined(AF_LOCAL) - if (descP->domain == AF_LOCAL) { - result = esock_make_ok2(env, esock_atom_default); - } else { - result = esock_make_ok2(env, esock_atom_ip); - } + if (descP->domain == AF_LOCAL) { + result = esock_atom_default; + } else { + result = esock_atom_ip; + } #else - result = esock_make_ok2(env, esock_atom_ip); + result = esock_atom_ip; #endif - break; + break; - case IPPROTO_TCP: - result = esock_make_ok2(env, esock_atom_tcp); - break; + case IPPROTO_TCP: + result = esock_atom_tcp; + break; - case IPPROTO_UDP: - result = esock_make_ok2(env, esock_atom_udp); - break; + case IPPROTO_UDP: + result = esock_atom_udp; + break; #if defined(HAVE_SCTP) - case IPPROTO_SCTP: - result = esock_make_ok2(env, esock_atom_sctp); - break; + case IPPROTO_SCTP: + result = esock_atom_sctp; + break; #endif - default: - reason = MKT2(env, esock_atom_unknown, MKI(env, val)); - result = esock_make_error(env, reason); - break; + default: + result = MKI(env, val); + break; } return result; } +/* + * esock_getopt_otp_protocol - Handle the OTP (level) protocol options. + */ +static +ERL_NIF_TERM esock_getopt_otp_protocol(ErlNifEnv* env, + ESockDescriptor* descP) +{ + ERL_NIF_TERM protocol, result; + protocol = getopt_otp_protocol(env, descP); + result = esock_make_ok2(env, protocol); + + return result; +} + + +/* + * esock_getopt_otp_dtp - Handle the OTP (level) type options. + */ +static +ERL_NIF_TERM esock_getopt_otp_dtp(ErlNifEnv* env, + ESockDescriptor* descP) +{ + ERL_NIF_TERM domain, type, protocol, dtp, result; + + domain = getopt_otp_domain(env, descP->domain); + type = getopt_otp_type(env, descP->type); + protocol = getopt_otp_protocol(env, descP); + dtp = MKT3(env, domain, type, protocol); + result = esock_make_ok2(env, dtp); + + return result; +} /* The option has *not* been encoded. Instead it has been provided @@ -15062,12 +15182,6 @@ ERL_NIF_TERM esock_cancel_accept_current(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "esock_cancel_accept_current -> cancel res: %T\r\n", res) ); - /* Clean out the old cobweb's before trying to invite a new spider */ - - esock_free_env("esock_cancel_accept_current - current-accept-env", - descP->currentAcceptor.env); - descP->currentAcceptor.env = NULL; - if (!activate_next_acceptor(env, descP, sockRef)) { SSDBG( descP, @@ -15077,13 +15191,6 @@ ERL_NIF_TERM esock_cancel_accept_current(ErlNifEnv* env, descP->state = ESOCK_STATE_LISTENING; descP->currentAcceptorP = NULL; - /* Do we really need this? - * The activate_next_acceptor (actually the requestor_pop) function - * initiates these values if there are no waiting acceptor... - */ - descP->currentAcceptor.ref = esock_atom_undefined; - enif_set_pid_undefined(&descP->currentAcceptor.pid); - MON_INIT(&descP->currentAcceptor.mon); } SSDBG( descP, ("SOCKET", "esock_cancel_accept_current -> done with result:" @@ -15187,10 +15294,8 @@ ERL_NIF_TERM esock_cancel_send_current(ErlNifEnv* env, if (!activate_next_writer(env, descP, sockRef)) { SSDBG( descP, ("SOCKET", "esock_cancel_send_current -> no more writers\r\n") ); - descP->currentWriterP = NULL; - descP->currentWriter.ref = esock_atom_undefined; - enif_set_pid_undefined(&descP->currentWriter.pid); - esock_monitor_init(&descP->currentWriter.mon); + + descP->currentWriterP = NULL; } SSDBG( descP, ("SOCKET", "esock_cancel_send_current -> done with result:" @@ -15293,10 +15398,8 @@ ERL_NIF_TERM esock_cancel_recv_current(ErlNifEnv* env, if (!activate_next_reader(env, descP, sockRef)) { SSDBG( descP, ("SOCKET", "esock_cancel_recv_current -> no more readers\r\n") ); - descP->currentReaderP = NULL; - descP->currentReader.ref = esock_atom_undefined; - enif_set_pid_undefined(&descP->currentReader.pid); - esock_monitor_init(&descP->currentReader.mon); + + descP->currentReaderP = NULL; } SSDBG( descP, ("SOCKET", "esock_cancel_recv_current -> done with result:" @@ -15361,14 +15464,19 @@ ERL_NIF_TERM esock_cancel_mode_select(ErlNifEnv* env, int smode, int rmode) { + /* Assumes cancelling only one mode */ + int selectRes = esock_select_cancel(env, descP->sock, smode, descP); - if (selectRes & rmode) { - /* Was cancelled */ - return esock_atom_ok; - } else if (selectRes > 0) { - /* Has already sent the message */ - return esock_make_error(env, esock_atom_select_sent); + if (selectRes >= 0) { + /* Success */ + if ((selectRes & rmode) != 0) { + /* Was cancelled */ + return esock_atom_ok; + } else { + /* Has already sent the message */ + return esock_make_error(env, esock_atom_select_sent); + } } else { /* Stopped? */ SSDBG( descP, ("SOCKET", @@ -15406,6 +15514,8 @@ BOOLEAN_T send_check_writer(ErlNifEnv* env, if (enif_self(env, &caller) == NULL) { *checkResult = esock_make_error(env, atom_exself); + SSDBG( descP, ("SOCKET", + "send_check_writer -> exself\r\n") ); return FALSE; } @@ -15419,12 +15529,12 @@ BOOLEAN_T send_check_writer(ErlNifEnv* env, if (!writer_search4pid(env, descP, &caller)) *checkResult = writer_push(env, descP, caller, ref); else - *checkResult = esock_make_error(env, esock_atom_eagain); + *checkResult = esock_make_error(env, atom_exbusy); SSDBG( descP, ("SOCKET", "send_check_writer -> queue (push) result: %T\r\n", - checkResult) ); + *checkResult) ); return FALSE; @@ -15533,28 +15643,23 @@ ERL_NIF_TERM send_check_ok(ErlNifEnv* env, descP->writePkgMax = descP->writePkgMaxCnt; descP->writePkgMaxCnt = 0; - if (descP->currentWriterP != NULL) { - DEMONP("send_check_ok -> current writer", - env, descP, &descP->currentWriter.mon); - esock_free_env("send_check_ok", descP->currentWriter.env); - descP->currentWriter.env = NULL; - } - SSDBG( descP, ("SOCKET", "send_check_ok -> " "everything written (%d,%d) - done\r\n", dataSize, written) ); + if (descP->currentWriterP != NULL) { + DEMONP("send_check_ok -> current writer", + env, descP, &descP->currentWriter.mon); + } /* * Ok, this write is done maybe activate the next (if any) */ - if (!activate_next_writer(env, descP, sockRef)) { - descP->currentWriterP = NULL; - ESOCK_ASSERT(!descP->currentWriter.env); - descP->currentWriter.env = NULL; - descP->currentWriter.ref = esock_atom_undefined; - enif_set_pid_undefined(&descP->currentWriter.pid); - esock_monitor_init(&descP->currentWriter.mon); + + SSDBG( descP, + ("SOCKET", "send_check_ok -> no more writers\r\n") ); + + descP->currentWriterP = NULL; } return esock_atom_ok; @@ -15573,10 +15678,8 @@ ERL_NIF_TERM send_check_fail(ErlNifEnv* env, int saveErrno, ERL_NIF_TERM sockRef) { - ESockRequestor req; - ERL_NIF_TERM reason; + ERL_NIF_TERM reason; - req.env = NULL; ESOCK_CNT_INC(env, descP, sockRef, atom_write_fails, &descP->writeFails, 1); SSDBG( descP, ("SOCKET", "send_check_fail -> error: %d\r\n", saveErrno) ); @@ -15592,24 +15695,44 @@ ERL_NIF_TERM send_check_fail(ErlNifEnv* env, if (descP->currentWriterP != NULL) { - DEMONP("send_check_fail -> current writer", - env, descP, &descP->currentWriter.mon); + esock_release_current("send_check_fail", + env, descP, descP->currentWriterP); - while (writer_pop(env, descP, &req)) { - SSDBG( descP, - ("SOCKET", "send_check_fail -> abort %T\r\n", req.pid) ); - esock_send_abort_msg(env, sockRef, req.ref, req.env, - reason, &req.pid); - req.env = NULL; - DEMONP("send_check_fail -> pop'ed writer", - env, descP, &req.mon); - } + send_error_waiting_writers(env, descP, sockRef, reason); + + descP->currentWriterP = NULL; } } - return esock_make_error(env, reason); } +/* *** send_error_current_writer *** + * + * Process all waiting writers when a fatal error has occured. + * All waiting writers will be "aborted", that is a + * nif_abort message will be sent (with ref and reason). + */ +static +void send_error_waiting_writers(ErlNifEnv* env, + ESockDescriptor* descP, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM reason) +{ + ESockRequestor req; + + req.env = NULL; /* read by writer_pop before free */ + while (writer_pop(env, descP, &req)) { + SSDBG( descP, + ("SOCKET", "send_error_current_writer -> abort %T\r\n", + req.pid) ); + esock_send_abort_msg(env, sockRef, req.ref, req.env, + reason, &req.pid); + req.env = NULL; + DEMONP("send_error_waiting_writers -> pop'ed writer", + env, descP, &req.mon); + } +} + /* *** send_check_retry *** @@ -15644,11 +15767,14 @@ ERL_NIF_TERM send_check_retry(ErlNifEnv* env, enif_set_pid_undefined(&descP->currentWriter.pid); return esock_make_error(env, atom_exmon); } else { - ESOCK_ASSERT(!descP->currentWriter.env); + ESOCK_ASSERT(descP->currentWriter.env == NULL); descP->currentWriter.env = esock_alloc_env("current-writer"); - descP->currentWriter.ref = CP_TERM(descP->currentWriter.env, sendRef); + descP->currentWriter.ref = + CP_TERM(descP->currentWriter.env, sendRef); descP->currentWriterP = &descP->currentWriter; } + } else { + descP->currentWriter.ref = CP_TERM(descP->currentWriter.env, sendRef); } ESOCK_CNT_INC(env, descP, sockRef, atom_write_waits, &descP->writeWaits, 1); @@ -15724,8 +15850,6 @@ BOOLEAN_T recv_check_reader(ErlNifEnv* env, } if (COMPARE_PIDS(&descP->currentReader.pid, &caller) != 0) { - ERL_NIF_TERM tmp; - /* Not the "current reader", so (maybe) push onto queue */ SSDBG( descP, @@ -15733,15 +15857,14 @@ BOOLEAN_T recv_check_reader(ErlNifEnv* env, "recv_check_reader -> not (current) reader\r\n") ); if (!reader_search4pid(env, descP, &caller)) - tmp = reader_push(env, descP, caller, ref); + *checkResult = reader_push(env, descP, caller, ref); else - tmp = esock_make_error(env, esock_atom_eagain); + *checkResult = esock_make_error(env, atom_exbusy); SSDBG( descP, ("SOCKET", - "recv_check_reader -> queue (push) result: %T\r\n", tmp) ); - - *checkResult = tmp; + "recv_check_reader -> queue (push) result: %T\r\n", + *checkResult) ); return FALSE; @@ -15824,20 +15947,13 @@ ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env, DEMONP("recv_update_current_reader", env, descP, &descP->currentReader.mon); - esock_free_env("recv_update_current_reader - current-read-env", - descP->currentReader.env); - descP->currentReader.env = NULL; - if (!activate_next_reader(env, descP, sockRef)) { SSDBG( descP, ("SOCKET", "recv_update_current_reader -> no more readers\r\n") ); - descP->currentReaderP = NULL; - descP->currentReader.ref = esock_atom_undefined; - enif_set_pid_undefined(&descP->currentReader.pid); - esock_monitor_init(&descP->currentReader.mon); + descP->currentReaderP = NULL; } } @@ -15852,7 +15968,7 @@ ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env, * Process the current reader and any waiting readers * when a read (fatal) error has occured. * All waiting readers will be "aborted", that is a - * nif_abort message will be sent (with reaf and reason). + * nif_abort message will be sent (with ref and reason). */ static void recv_error_current_reader(ErlNifEnv* env, @@ -15860,14 +15976,13 @@ void recv_error_current_reader(ErlNifEnv* env, ERL_NIF_TERM sockRef, ERL_NIF_TERM reason) { - ESockRequestor req; - - req.env = NULL; if (descP->currentReaderP != NULL) { + ESockRequestor req; - DEMONP("recv_error_current_reader -> current reader", - env, descP, &descP->currentReader.mon); + esock_release_current("recv_error_current_reader", + env, descP, descP->currentReaderP); + req.env = NULL; /* read by reader_pop before free */ while (reader_pop(env, descP, &req)) { SSDBG( descP, ("SOCKET", "recv_error_current_reader -> abort %T\r\n", @@ -15878,6 +15993,8 @@ void recv_error_current_reader(ErlNifEnv* env, DEMONP("recv_error_current_reader -> pop'ed reader", env, descP, &req.mon); } + + descP->currentReaderP = NULL; } } @@ -15918,8 +16035,8 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, */ if ((read == 0) && (descP->type == SOCK_STREAM)) { - - res = esock_make_error(env, atom_closed); + ERL_NIF_TERM reason = atom_closed; + res = esock_make_error(env, reason); ESOCK_CNT_INC(env, descP, sockRef, atom_read_fails, &descP->readFails, 1); @@ -15932,7 +16049,7 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, * We must also notify any waiting readers! */ - recv_error_current_reader(env, descP, sockRef, res); + recv_error_current_reader(env, descP, sockRef, reason); FREE_BIN(bufP); @@ -16074,27 +16191,25 @@ ERL_NIF_TERM recv_check_full_maybe_done(ErlNifEnv* env, ESOCK_CNT_INC(env, descP, sockRef, atom_read_byte, &descP->readByteCnt, read); descP->readPkgMaxCnt += read; - if (descP->rNum > 0) { - - descP->rNumCnt++; - if (descP->rNumCnt >= descP->rNum) { + descP->rNumCnt++; + if (descP->rNumCnt >= descP->rNum) { - descP->rNumCnt = 0; + descP->rNumCnt = 0; - ESOCK_CNT_INC(env, descP, sockRef, atom_read_pkg, &descP->readPkgCnt, 1); - if (descP->readPkgMaxCnt > descP->readPkgMax) - descP->readPkgMax = descP->readPkgMaxCnt; - descP->readPkgMaxCnt = 0; + ESOCK_CNT_INC(env, descP, sockRef, + atom_read_pkg, &descP->readPkgCnt, 1); + if (descP->readPkgMaxCnt > descP->readPkgMax) + descP->readPkgMax = descP->readPkgMaxCnt; + descP->readPkgMaxCnt = 0; - recv_update_current_reader(env, descP, sockRef); + recv_update_current_reader(env, descP, sockRef); - /* This transfers "ownership" of the *allocated* binary to an - * erlang term (no need for an explicit free). - */ + /* This transfers "ownership" of the *allocated* binary to an + * erlang term (no need for an explicit free). + */ - return esock_make_ok3(env, atom_true, MKBIN(env, bufP)); + return esock_make_ok3(env, atom_true, MKBIN(env, bufP)); - } } /* Yes, we *do* need to continue reading */ @@ -16179,13 +16294,13 @@ ERL_NIF_TERM recv_check_fail(ErlNifEnv* env, /* +++ Oups - closed +++ */ - SSDBG( descP, ("SOCKET", "recv_check_fail -> closed\r\n") ); + SSDBG( descP, ("SOCKET", "recv_check_fail econnreset -> closed\r\n") ); // This is a bit overkill (to count here), but just in case... ESOCK_CNT_INC(env, descP, sockRef, atom_read_fails, &descP->readFails, 1); - res = recv_check_fail_closed(env, descP, sockRef, recvRef); + res = recv_check_fail_econnreset(env, descP, sockRef, recvRef); } else if ((saveErrno == ERRNO_BLOCK) || (saveErrno == EAGAIN)) { @@ -16210,19 +16325,19 @@ ERL_NIF_TERM recv_check_fail(ErlNifEnv* env, -/* *** recv_check_fail_closed *** +/* *** recv_check_fail_econnreset *** * * We detected that the socket was closed wile reading. * Inform current and waiting readers. */ static -ERL_NIF_TERM recv_check_fail_closed(ErlNifEnv* env, - ESockDescriptor* descP, - ERL_NIF_TERM sockRef, - ERL_NIF_TERM recvRef) +ERL_NIF_TERM recv_check_fail_econnreset(ErlNifEnv* env, + ESockDescriptor* descP, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM recvRef) { - ERL_NIF_TERM res = esock_make_error(env, atom_closed); - int sres; + ERL_NIF_TERM reason = atom_econnreset; + ERL_NIF_TERM res = esock_make_error(env, atom_econnreset); /* <KOLLA> * @@ -16239,16 +16354,18 @@ ERL_NIF_TERM recv_check_fail_closed(ErlNifEnv* env, * </KOLLA> */ + recv_error_current_reader(env, descP, sockRef, reason); + + MUNLOCK(descP->readMtx); + + MLOCK(descP->closeMtx); + MLOCK(descP->readMtx); + + descP->isReadable = FALSE; descP->closeLocal = FALSE; descP->state = ESOCK_STATE_CLOSING; - recv_error_current_reader(env, descP, sockRef, res); - - if ((sres = esock_select_stop(env, descP->sock, descP)) < 0) { - esock_warning_msg("Failed stop select (closed) " - "for current reader (%T): %d\r\n", - recvRef, sres); - } + MUNLOCK(descP->closeMtx); return res; } @@ -16301,11 +16418,11 @@ ERL_NIF_TERM recv_check_fail_gen(ErlNifEnv* env, int saveErrno, ERL_NIF_TERM sockRef) { - ERL_NIF_TERM res = esock_make_error_errno(env, saveErrno); + ERL_NIF_TERM reason = MKA(env, erl_errno_id(saveErrno)); - recv_error_current_reader(env, descP, sockRef, res); + recv_error_current_reader(env, descP, sockRef, reason); - return res; + return esock_make_error(env, reason); } @@ -16449,7 +16566,7 @@ ERL_NIF_TERM recv_check_partial_part(ErlNifEnv* env, /* The recvfrom function delivers one (1) message. If our buffer - * is to small, the message will be truncated. So, regardless + * is too small, the message will be truncated. So, regardless * if we filled the buffer or not, we have got what we are going * to get regarding this message. */ @@ -18862,10 +18979,7 @@ ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) sprintf(buf, "esock.w[%d]", sock); descP->writeMtx = MCREATE(buf); - enif_set_pid_undefined(&descP->currentWriter.pid); - MON_INIT(&descP->currentWriter.mon); - descP->currentWriter.env = NULL; - descP->currentWriter.ref = esock_atom_undefined; + requestor_clear(&descP->currentWriter); descP->currentWriterP = NULL; // currentWriter not used descP->writersQ.first = NULL; descP->writersQ.last = NULL; @@ -18880,10 +18994,7 @@ ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) sprintf(buf, "esock.r[%d]", sock); descP->readMtx = MCREATE(buf); - enif_set_pid_undefined(&descP->currentReader.pid); - MON_INIT(&descP->currentReader.mon); - descP->currentReader.env = NULL; - descP->currentReader.ref = esock_atom_undefined; + requestor_clear(&descP->currentReader); descP->currentReaderP = NULL; // currentReader not used descP->readersQ.first = NULL; descP->readersQ.last = NULL; @@ -18898,10 +19009,7 @@ ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) sprintf(buf, "esock.acc[%d]", sock); descP->accMtx = MCREATE(buf); - enif_set_pid_undefined(&descP->currentAcceptor.pid); - MON_INIT(&descP->currentAcceptor.mon); - descP->currentAcceptor.env = NULL; - descP->currentAcceptor.ref = esock_atom_undefined; + requestor_clear(&descP->currentAcceptor); descP->currentAcceptorP = NULL; // currentAcceptor not used descP->acceptorsQ.first = NULL; descP->acceptorsQ.last = NULL; @@ -18920,7 +19028,7 @@ ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) sprintf(buf, "esock.cfg[%d]", sock); descP->cfgMtx = MCREATE(buf); descP->rBufSz = ESOCK_RECV_BUFFER_SIZE_DEFAULT; - descP->rNum = 0; + descP->rNum = ESOCK_RECV_BUFFER_COUNT_DEFAULT; descP->rNumCnt = 0; descP->rCtrlSz = ESOCK_RECV_CTRL_BUFFER_SIZE_DEFAULT; descP->wCtrlSz = ESOCK_SEND_CTRL_BUFFER_SIZE_DEFAULT; @@ -19621,6 +19729,7 @@ char* esock_send_close_msg(ErlNifEnv* env, { ERL_NIF_TERM sockRef, msg; ErlNifEnv* menv; + char* result; if (descP->closeEnv != NULL) { sockRef = enif_make_resource(descP->closeEnv, descP); @@ -19632,7 +19741,9 @@ char* esock_send_close_msg(ErlNifEnv* env, menv = NULL; // This has the effect that the message will be copied } - return esock_send_msg(env, pid, msg, menv); + result = esock_send_msg(env, pid, msg, menv); + descP->closeEnv = NULL; + return result; } @@ -19893,6 +20004,14 @@ int esock_select_write(ErlNifEnv* env, } +/* *** esock_select_stop *** + * + * WARNING: enif_select may call esock_stop directly + * in which case deadlock is avoided by esock_stop that checks + * if it got a direct call and then does not lock closeMtx. + * + * So closeMtx is supposed to be locked when this function is called. + */ static int esock_select_stop(ErlNifEnv* env, ErlNifEvent event, @@ -20164,15 +20283,19 @@ BOOLEAN_T requestor_pop(ESockRequestQueue* q, return TRUE; } else { /* Queue was empty */ - enif_set_pid_undefined(&reqP->pid); - MON_INIT(&reqP->mon); - reqP->env = NULL; - reqP->ref = esock_atom_undefined; // Just in case + requestor_clear(reqP); return FALSE; } } +static void requestor_clear(ESockRequestor* reqP) { + enif_set_pid_undefined(&reqP->pid); + MON_INIT(&reqP->mon); + reqP->env = NULL; + reqP->ref = esock_atom_undefined; +} + static BOOLEAN_T qsearch4pid(ErlNifEnv* env, @@ -20375,7 +20498,7 @@ int esock_demonitor(const char* slogan, res = enif_demonitor_process(env, descP, &monP->mon); if (res == 0) { - esock_monitor_init(monP); + MON_INIT(monP); } else { SSDBG( descP, ("SOCKET", "[%d][%T] %s: demonitor failed: %d\r\n", @@ -20404,6 +20527,17 @@ ERL_NIF_TERM esock_make_monitor_term(ErlNifEnv* env, const ESockMonitor* monP) +static +void esock_release_current(const char* slogan, + ErlNifEnv* env, + ESockDescriptor* descP, + ESockRequestor* current) +{ + DEMONP(slogan, env, descP, ¤t->mon); + esock_free_env(slogan, current->env); + requestor_clear(current); +} + #endif // if !defined(__WIN32__) @@ -20435,12 +20569,12 @@ void esock_dtor(ErlNifEnv* env, void* obj) #if !defined(__WIN32__) ESockDescriptor* descP = (ESockDescriptor*) obj; - SGDBG( ("SOCKET", "dtor -> try destroy write mutex\r\n") ); - MDESTROY(descP->writeMtx); descP->writeMtx = NULL; - SGDBG( ("SOCKET", "dtor -> try destroy read mutex\r\n") ); MDESTROY(descP->readMtx); descP->readMtx = NULL; + SGDBG( ("SOCKET", "dtor -> try destroy write mutex\r\n") ); + MDESTROY(descP->writeMtx); descP->writeMtx = NULL; + SGDBG( ("SOCKET", "dtor -> try destroy accept mutex\r\n") ); MDESTROY(descP->accMtx); descP->accMtx = NULL; @@ -20510,12 +20644,14 @@ void esock_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) ((is_direct_call) ? "called" : "scheduled"), descP->sock, fd) ); /* +++ Lock it down +++ */ - - MLOCK(descP->writeMtx); + + /* If we are called with a direct call; we already have closeMtx + */ + if (!is_direct_call) MLOCK(descP->closeMtx); MLOCK(descP->readMtx); MLOCK(descP->accMtx); + MLOCK(descP->writeMtx); MLOCK(descP->cfgMtx); - if (!is_direct_call) MLOCK(descP->closeMtx); SSDBG( descP, ("SOCKET", "esock_stop -> " "[%d, %T] all mutex(s) locked when counters:" @@ -20668,15 +20804,18 @@ void esock_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) } } - esock_free_env("esoc_stop - meta-env", descP->meta.env); + if (descP->meta.env != NULL) { + esock_free_env("esock_stop - meta-env", descP->meta.env); + descP->meta.env = NULL; + } SSDBG( descP, ("SOCKET", "esock_stop -> unlock all mutex(s)\r\n") ); - if (!is_direct_call) MUNLOCK(descP->closeMtx); MUNLOCK(descP->cfgMtx); + MUNLOCK(descP->writeMtx); MUNLOCK(descP->accMtx); MUNLOCK(descP->readMtx); - MUNLOCK(descP->writeMtx); + if (!is_direct_call) MUNLOCK(descP->closeMtx); /* And finally update the registry */ esock_send_reg_del_msg(env, sockRef); @@ -20706,7 +20845,8 @@ void esock_stop_handle_current(ErlNifEnv* env, DEMONP("esock_stop_handle_current", env, descP, &reqP->mon); - if (COMPARE_PIDS(&descP->closerPid, &reqP->pid) != 0) { + if ((! enif_is_pid_undefined(&descP->closerPid)) && + (COMPARE_PIDS(&descP->closerPid, &reqP->pid) != 0)) { SSDBG( descP, ("SOCKET", "esock_stop_handle_current -> " "send abort message to current %s %T\r\n", @@ -20715,9 +20855,10 @@ void esock_stop_handle_current(ErlNifEnv* env, if (esock_send_abort_msg(env, sockRef, reqP->ref, reqP->env, atom_closed, &reqP->pid) != NULL) { - esock_warning_msg("Failed sending abort (%T) message to " + esock_warning_msg("esock_stop_handle_current: " + "Failed sending abort (closed) message to " "current %s %T\r\n", - reqP->ref, role, reqP->pid); + role, reqP->pid); } reqP->env = NULL; } @@ -20769,9 +20910,10 @@ void inform_waiting_procs(ErlNifEnv* env, reason, ¤tP->data.pid) != NULL) { - esock_warning_msg("Failed sending abort (%T) message to " + esock_warning_msg("inform_waiting_procs: " + "Failed sending abort (%T) message to " "current %s %T\r\n", - currentP->data.ref, + reason, role, currentP->data.pid); @@ -20825,6 +20967,8 @@ void esock_down(ErlNifEnv* env, * we leave it to the stop callback function. */ + MLOCK(descP->closeMtx); + SSDBG( descP, ("SOCKET", "esock_down -> controlling process exit\r\n") ); @@ -20917,6 +21061,8 @@ void esock_down(ErlNifEnv* env, MON2T(env, mon)); } + MUNLOCK(descP->closeMtx); + } else if (COMPARE_PIDS(&descP->connPid, pid) == 0) { /* The connPid is only set during the connection. @@ -20940,19 +21086,19 @@ void esock_down(ErlNifEnv* env, sockRef = enif_make_resource(env, descP); + MLOCK(descP->readMtx); MLOCK(descP->accMtx); + MLOCK(descP->writeMtx); + + if (descP->currentReaderP != NULL) + esock_down_reader(env, descP, sockRef, pid); if (descP->currentAcceptorP != NULL) esock_down_acceptor(env, descP, sockRef, pid); - MUNLOCK(descP->accMtx); - - MLOCK(descP->writeMtx); if (descP->currentWriterP != NULL) esock_down_writer(env, descP, sockRef, pid); - MUNLOCK(descP->writeMtx); - MLOCK(descP->readMtx); - if (descP->currentReaderP != NULL) - esock_down_reader(env, descP, sockRef, pid); + MUNLOCK(descP->writeMtx); + MUNLOCK(descP->accMtx); MUNLOCK(descP->readMtx); } @@ -20988,12 +21134,8 @@ void esock_down_acceptor(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "esock_down_acceptor -> no more writers\r\n") ); - descP->state = ESOCK_STATE_LISTENING; - - descP->currentAcceptorP = NULL; - descP->currentAcceptor.ref = esock_atom_undefined; - enif_set_pid_undefined(&descP->currentAcceptor.pid); - esock_monitor_init(&descP->currentAcceptor.mon); + descP->state = ESOCK_STATE_LISTENING; + descP->currentAcceptorP = NULL; } } else { @@ -21029,12 +21171,11 @@ void esock_down_writer(ErlNifEnv* env, "current writer - try activate next\r\n") ); if (!activate_next_writer(env, descP, sockRef)) { + SSDBG( descP, ("SOCKET", "esock_down_writer -> no active writer\r\n") ); + descP->currentWriterP = NULL; - descP->currentWriter.ref = esock_atom_undefined; - enif_set_pid_undefined(&descP->currentWriter.pid); - esock_monitor_init(&descP->currentWriter.mon); } } else { @@ -21070,13 +21211,12 @@ void esock_down_reader(ErlNifEnv* env, "current reader - try activate next\r\n") ); if (!activate_next_reader(env, descP, sockRef)) { + SSDBG( descP, ("SOCKET", "esock_down_reader -> no more readers\r\n") ); + descP->currentReaderP = NULL; - descP->currentReader.ref = esock_atom_undefined; - enif_set_pid_undefined(&descP->currentReader.pid); - esock_monitor_init(&descP->currentReader.mon); } } else { diff --git a/erts/emulator/nifs/common/socket_util.c b/erts/emulator/nifs/common/socket_util.c index 6506ddc1e8..a30c2e36ae 100644 --- a/erts/emulator/nifs/common/socket_util.c +++ b/erts/emulator/nifs/common/socket_util.c @@ -1092,50 +1092,43 @@ char* esock_decode_timeval(ErlNifEnv* env, if (!GET_MAP_VAL(env, eTime, esock_atom_usec, &eUSec)) return ESOCK_STR_EINVAL; - /* On some platforms (e.g. OpenBSD) this is a 'long long' and on others - * (e.g. Linux) its a long. - * As long as they are both 64 bits, its easy (use our own signed 64-bit int - * and then cast). But if they are either not 64 bit, or they are of different size - * then we make it easy on ourselves and use long and then cast to whatever - * type sec is. + /* Use the appropriate variable type and nif function + * to decode the value from Erlang into the struct timeval fields */ -#if (SIZEOF_LONG_LONG == SIZEOF_LONG) && (SIZEOF_LONG == 8) - { + { /* time_t tv_sec; */ +#if (SIZEOF_TIME_T == 8) ErlNifSInt64 sec; if (!GET_INT64(env, eSec, &sec)) return ESOCK_STR_EINVAL; - timeP->tv_sec = (typeof(timeP->tv_sec)) sec; - } -#else - { +#elif (SIZEOF_TIME_T == SIZEOF_INT) + int sec; + if (!GET_INT(env, eSec, &sec)) + return ESOCK_STR_EINVAL; +#else /* long or other e.g undefined */ long sec; if (!GET_LONG(env, eSec, &sec)) return ESOCK_STR_EINVAL; - timeP->tv_sec = (typeof(timeP->tv_sec)) sec; - } #endif + timeP->tv_sec = sec; + } - #if (SIZEOF_INT == 4) - { + { /* suseconds_t tv_usec; */ +#if (SIZEOF_SUSECONDS_T == 8) + ErlNifSInt64 usec; + if (!GET_INT64(env, eSec, &usec)) + return ESOCK_STR_EINVAL; +#elif (SIZEOF_SUSECONDS_T == SIZEOF_INT) int usec; - if (!GET_INT(env, eUSec, &usec)) + if (!GET_INT(env, eSec, &usec)) return ESOCK_STR_EINVAL; - timeP->tv_usec = (typeof(timeP->tv_usec)) usec; - } -#elif (SIZEOF_LONG == 4) - { +#else /* long or other e.g undefined */ long usec; - if (!GET_LONG(env, eUSec, &usec)) + if (!GET_LONG(env, eSec, &usec)) return ESOCK_STR_EINVAL; - timeP->tv_usec = (typeof(timeP->tv_usec)) usec; +#endif + timeP->tv_usec = usec; } -#else - /* Ok, we give up... */ - if (!GET_LONG(env, eUSec, &timeP->tv_usec)) - return ESOCK_STR_EINVAL; -#endif - return NULL; } diff --git a/erts/emulator/test/socket_SUITE.erl b/erts/emulator/test/socket_SUITE.erl index 6d36caa5e0..bce4ae1a5f 100644 --- a/erts/emulator/test/socket_SUITE.erl +++ b/erts/emulator/test/socket_SUITE.erl @@ -15826,6 +15826,16 @@ api_opt_sock_timestamp_tcp(InitState) -> ERROR end end}, + %% Linux pecularity observed here... + %% Detected on Kernel 4.15.0-72 x96_64. + %% The option set to enable receiving timestamps just above + %% has failed to be effective down in "await recv reply 2 + %% (from server, w timestamp)" below, unless we put the + %% sleep between setting the option and informing + %% the writer that it shall write to the other socket end. + %% A sleep 1 ms improves a lot but does not remove + %% problem completely. Believe it or not. + ?SEV_SLEEP(100), #{desc => "announce ready (timestamp on)", cmd => fun(#{tester := Tester}) -> ?SEV_ANNOUNCE_READY(Tester, timestamp_on), @@ -15846,7 +15856,7 @@ api_opt_sock_timestamp_tcp(InitState) -> ok end}, #{desc => "await recv reply 2 (from server, w timestamp)", - cmd => fun(#{sock := Sock, recv := Recv}) -> + cmd => fun(#{sock := Sock, recv := Recv, get := Get}) -> case Recv(Sock) of {ok, {[#{level := socket, type := timestamp, @@ -15856,6 +15866,8 @@ api_opt_sock_timestamp_tcp(InitState) -> "~n ~p", [TS]), ok; {ok, {BadCMsgHdrs, ?BASIC_REP}} -> + ?SEV_EPRINT("Current timestamp value:" + " ~p", [Get(Sock)]), {error, {unexpected_reply_cmsghdrs, BadCMsgHdrs}}; {ok, {[#{level := socket, @@ -16012,7 +16024,7 @@ api_opt_sock_timestamp_tcp(InitState) -> ?SEV_ANNOUNCE_CONTINUE(Server, accept), ok end}, - ?SEV_SLEEP(?SECS(1)), +%%% ?SEV_SLEEP(?SECS(1)), #{desc => "order client to continue (with connect)", cmd => fun(#{client := Client} = _State) -> ?SEV_ANNOUNCE_CONTINUE(Client, connect), @@ -16187,8 +16199,6 @@ api_opt_sock_timestamp_tcp(InitState) -> ok = ?SEV_AWAIT_FINISH([Server, Client, Tester]). - - %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% Tests that the add_mambership and drop_membership ip options work. @@ -27701,8 +27711,7 @@ traffic_send_and_recv_tcp(InitState) -> local_sa := #{path := Path}}) -> ?SEV_ANNOUNCE_READY(Tester, init, Path), ok; - (#{lsock := LSock, - tester := Tester, + (#{tester := Tester, lport := Port}) -> ?SEV_ANNOUNCE_READY(Tester, init, Port), ok diff --git a/erts/preloaded/ebin/socket.beam b/erts/preloaded/ebin/socket.beam Binary files differindex 9fcb4ff712..1cffd45f05 100644 --- a/erts/preloaded/ebin/socket.beam +++ b/erts/preloaded/ebin/socket.beam diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl index 2e38ede125..eb1bcdce66 100644 --- a/erts/preloaded/src/socket.erl +++ b/erts/preloaded/src/socket.erl @@ -778,6 +778,7 @@ -define(ESOCK_OPT_OTP_DOMAIN, 16#FF01). % INTERNAL -define(ESOCK_OPT_OTP_TYPE, 16#FF02). % INTERNAL -define(ESOCK_OPT_OTP_PROTOCOL, 16#FF03). % INTERNAL +-define(ESOCK_OPT_OTP_DTP, 16#FF04). % INTERNAL %% *** SOCKET (socket) options -define(ESOCK_OPT_SOCK_ACCEPTCONN, 1). @@ -1104,7 +1105,7 @@ supports() -> {recv_flags, supports(recv_flags)}]. --dialyzer({nowarn_function, supports/1}). +-dialyzer({no_contracts, supports/1}). -spec supports(options) -> supports_options(); (sctp) -> boolean(); (ipv6) -> boolean(); @@ -1129,7 +1130,7 @@ supports(recv_flags) -> supports(_Key1) -> false. --dialyzer({nowarn_function, supports/2}). +-dialyzer({no_contracts, supports/2}). -spec supports(options, socket) -> supports_options_socket(); (options, ip) -> supports_options_ip(); (options, ipv6) -> supports_options_ipv6(); @@ -1152,7 +1153,6 @@ supports(_Key1, _Level) -> false. --dialyzer({nowarn_function, supports/3}). -spec supports(options, socket, Opt :: socket_option()) -> boolean(); (options, ip, Opt :: ip_socket_option()) -> boolean(); (options, ipv6, Opt :: ipv6_socket_option()) -> boolean(); @@ -1164,6 +1164,7 @@ supports(_Key1, _Level) -> Key2 :: term(), Key3 :: term(). +-dialyzer({no_contracts, supports/3}). supports(options, Level, Opt) -> case supports(options, Level) of S when is_list(S) -> @@ -1262,23 +1263,17 @@ open(Domain, Type, Protocol, Extra) when is_map(Extra) -> EDomain = enc_domain(Domain), EType = enc_type(Type), EProtocol = enc_protocol(Protocol), - case nif_open(EDomain, EType, EProtocol, Extra) of - {ok, SockRef} -> - Socket = #socket{ref = SockRef}, - {ok, Socket}; - {error, _} = ERROR -> - ERROR - end + nif_open(EDomain, EType, EProtocol, Extra) end + of + {ok, SockRef} -> + Socket = #socket{ref = SockRef}, + {ok, Socket}; + {error, _} = ERROR -> + ERROR catch - throw:T -> - T; - %% <WIN32-TEMPORARY> - error:notsup:S -> - erlang:raise(error, notsup, S); - %% </WIN32-TEMPORARY> - error:Reason -> - {error, Reason} + throw:ERROR -> + ERROR end. @@ -1300,31 +1295,23 @@ bind(#socket{ref = SockRef}, Addr) when ((Addr =:= any) orelse (Addr =:= broadcast) orelse (Addr =:= loopback)) -> - try which_domain(SockRef) of - inet -> - nif_bind(SockRef, ?SOCKADDR_IN4_DEFAULT(Addr)); - inet6 when (Addr =:= any) orelse (Addr =:= loopback) -> - nif_bind(SockRef, ?SOCKADDR_IN6_DEFAULT(Addr)); - _ -> - einval() + try + case which_domain(SockRef) of + inet -> + nif_bind(SockRef, ?SOCKADDR_IN4_DEFAULT(Addr)); + inet6 when (Addr =:= any) orelse (Addr =:= loopback) -> + nif_bind(SockRef, ?SOCKADDR_IN6_DEFAULT(Addr)); + Domain -> + invalid_domain(Domain) + end catch - %% <WIN32-TEMPORARY> - error:notsup:S -> - erlang:raise(error, notsup, S); - %% </WIN32-TEMPORARY> throw:ERROR -> ERROR end; bind(#socket{ref = SockRef} = _Socket, Addr) when is_map(Addr) -> try - begin - nif_bind(SockRef, ensure_sockaddr(Addr)) - end + nif_bind(SockRef, ensure_sockaddr(Addr)) catch - %% <WIN32-TEMPORARY> - error:notsup:S -> - erlang:raise(error, notsup, S); - %% </WIN32-TEMPORARY> throw:ERROR -> ERROR end. @@ -1353,35 +1340,28 @@ bind(#socket{ref = SockRef}, Addrs, Action) when is_list(Addrs) andalso ((Action =:= add) orelse (Action =:= remove)) -> try begin - ensure_type(SockRef, seqpacket), - ensure_proto(SockRef, sctp), - validate_addrs(which_domain(SockRef), Addrs), + {Domain, Type, Proto} = which_dtp(SockRef), + ensure_domain(Domain, [inet, inet6]), + ensure_type(Type, seqpacket), + ensure_protocol(Proto, sctp), + validate_addrs(Domain, Addrs), nif_bind(SockRef, Addrs, Action) end catch - %% <WIN32-TEMPORARY> - error:notsup:S -> - erlang:raise(error, notsup, S); - %% </WIN32-TEMPORARY> throw:ERROR -> ERROR end. -ensure_type(SockRef, Type) -> - case which_type(SockRef) of - Type -> - ok; - _InvalidType -> - einval() - end. +ensure_domain(Domain, [Domain | _]) -> ok; +ensure_domain(Domain, [_ | Domains]) -> ensure_domain(Domain, Domains); +ensure_domain(Domain, []) -> invalid_domain(Domain). + +ensure_type(Type, Type) -> ok; +ensure_type(Type, _) -> invalid_type(Type). + +ensure_protocol(Proto, Proto) -> ok; +ensure_protocol(Proto, _) -> invalid_protocol(Proto). -ensure_proto(SockRef, Proto) -> - case which_protocol(SockRef) of - Proto -> - ok; - _InvalidProto -> - einval() - end. validate_addrs(inet = _Domain, Addrs) -> validate_inet_addrs(Addrs); @@ -1441,38 +1421,45 @@ connect(Socket, SockAddr) -> %% <KOLLA> %% Is it possible to connect with family = local for the (dest) sockaddr? %% </KOLLA> -connect(_Socket, _SockAddr, Timeout) - when (is_integer(Timeout) andalso (Timeout =< 0)) -> - {error, timeout}; -connect(#socket{ref = SockRef}, #{family := Fam} = SockAddr, Timeout) - when ((Fam =:= inet) orelse (Fam =:= inet6) orelse (Fam =:= local)) andalso - ((Timeout =:= nowait) orelse - (Timeout =:= infinity) orelse is_integer(Timeout)) -> - TS = timestamp(Timeout), +connect(#socket{ref = SockRef}, SockAddr, Timeout) -> + try + do_connect(SockRef, ensure_sockaddr(SockAddr), deadline(Timeout)) + catch + throw:ERROR -> + ERROR + end. + + +do_connect(SockRef, SockAddr, Deadline) -> case nif_connect(SockRef, ensure_sockaddr(SockAddr)) of + ok -> %% Connected! ok; - {ok, Ref} when (Timeout =:= nowait) -> + + {ok, Ref} when (Deadline =:= nowait) -> %% Connecting, but the caller does not want to wait... ?SELECT(connect, Ref); {ok, Ref} -> %% Connecting... - NewTimeout = next_timeout(TS, Timeout), - receive + Timeout = timeout(Deadline), + receive {?ESOCK_TAG, #socket{ref = SockRef}, select, Ref} -> - nif_finalize_connection(SockRef) - after NewTimeout -> + nif_finalize_connection(SockRef) + after Timeout -> cancel(SockRef, connect, Ref), - {error, timeout} - end; - {error, _} = ERROR -> - ERROR + {error, timeout} + end; + + + {error, _} = ERROR -> + ERROR end. + %% =========================================================================== %% %% listen - listen for connections on a socket @@ -1524,45 +1511,44 @@ accept(Socket) -> Socket :: socket(), Reason :: term(). -%% Do we really need this optimization? -accept(_, Timeout) when is_integer(Timeout) andalso (Timeout =< 0) -> - {error, timeout}; -accept(#socket{ref = LSockRef}, Timeout) - when is_integer(Timeout) orelse - (Timeout =:= infinity) orelse - (Timeout =:= nowait) -> - do_accept(LSockRef, Timeout). - -do_accept(LSockRef, Timeout) -> - TS = timestamp(Timeout), +accept(#socket{ref = LSockRef}, Timeout) -> + try + do_accept(LSockRef, deadline(Timeout)) + catch + throw:ERROR -> + ERROR + end. + +do_accept(LSockRef, Deadline) -> AccRef = make_ref(), case nif_accept(LSockRef, AccRef) of + {ok, SockRef} -> Socket = #socket{ref = SockRef}, {ok, Socket}; - {error, eagain} when (Timeout =:= nowait) -> + {error, eagain} when (Deadline =:= nowait) -> ?SELECT(accept, AccRef); - {error, eagain} -> %% Each call is non-blocking, but even then it takes %% *some* time, so just to be sure, recalculate before %% the receive. - NewTimeout = next_timeout(TS, Timeout), + Timeout = timeout(Deadline), receive {?ESOCK_TAG, #socket{ref = LSockRef}, select, AccRef} -> - do_accept(LSockRef, next_timeout(TS, Timeout)); + do_accept(LSockRef, Deadline); {?ESOCK_TAG, _Socket, abort, {AccRef, Reason}} -> {error, Reason} - after NewTimeout -> + after Timeout -> cancel(LSockRef, accept, AccRef), {error, timeout} end; + {error, _} = ERROR -> cancel(LSockRef, accept, AccRef), % Just to be on the safe side... ERROR @@ -1587,17 +1573,19 @@ send(Socket, Data) -> Socket :: socket(), Data :: iodata(), Flags :: send_flags(), - Reason :: term() - ; (Socket, Data, Timeout :: nowait) -> ok | - {select, SelectInfo} | - {ok, {RestData, SelectInfo}} | - {error, Reason} when + Reason :: term(); + (Socket, Data, Timeout :: nowait) -> + ok | + {ok, {binary(), SelectInfo}} | + {select, SelectInfo} | + {ok, {RestData, SelectInfo}} | + {error, Reason} when Socket :: socket(), Data :: iodata(), RestData :: binary(), SelectInfo :: select_info(), - Reason :: term() - ; (Socket, Data, Timeout) -> ok | {error, Reason} when + Reason :: term(); + (Socket, Data, Timeout) -> ok | {error, Reason} when Socket :: socket(), Data :: iodata(), Timeout :: timeout(), @@ -1629,75 +1617,95 @@ send(Socket, Data, Flags, Timeout) when is_list(Data) -> Bin = erlang:list_to_binary(Data), send(Socket, Bin, Flags, Timeout); send(#socket{ref = SockRef}, Data, Flags, Timeout) - when is_binary(Data) andalso - is_list(Flags) andalso - ((Timeout =:= nowait) orelse - (Timeout =:= infinity) orelse - (is_integer(Timeout) andalso (Timeout > 0))) -> - EFlags = enc_send_flags(Flags), - do_send(SockRef, Data, EFlags, Timeout). - -do_send(SockRef, Data, EFlags, Timeout) -> - TS = timestamp(Timeout), + when is_binary(Data), is_list(Flags) -> + To = undefined, + try + begin + EFlags = enc_send_flags(Flags), + Deadline = deadline(Timeout), + send_common(SockRef, Data, To, EFlags, Deadline, send) + end + catch + throw:ERROR -> + ERROR + end. + +send_common(SockRef, Data, To, EFlags, Deadline, SendName) -> + SendRef = make_ref(), - case nif_send(SockRef, SendRef, Data, EFlags) of - ok -> - ok; + case + case SendName of + send -> + nif_send(SockRef, SendRef, Data, EFlags); + sendto -> + nif_sendto(SockRef, SendRef, Data, To, EFlags) + end + of - {ok, Written} when (Timeout =:= nowait) -> - <<_:Written/binary, Rest/binary>> = Data, + ok -> ok; + + + {ok, Written} when (Deadline =:= nowait) -> %% We are partially done, but the user don't want to wait (here) %% for completion - {ok, {Rest, ?SELECT_INFO(send, SendRef)}}; - + <<_:Written/binary, Rest/binary>> = Data, + {ok, {Rest, ?SELECT_INFO(SendName, SendRef)}}; {ok, Written} -> - NewTimeout = next_timeout(TS, Timeout), %% We are partially done, wait for continuation + Timeout = timeout(Deadline), receive {?ESOCK_TAG, #socket{ref = SockRef}, select, SendRef} when (Written > 0) -> <<_:Written/binary, Rest/binary>> = Data, - do_send(SockRef, Rest, EFlags, - next_timeout(TS, Timeout)); + send_common( + SockRef, Rest, To, EFlags, Deadline, SendName); {?ESOCK_TAG, #socket{ref = SockRef}, select, SendRef} -> - do_send(SockRef, Data, EFlags, - next_timeout(TS, Timeout)); + send_common( + SockRef, Data, To, EFlags, Deadline, SendName); {?ESOCK_TAG, _Socket, abort, {SendRef, Reason}} -> - {error, Reason} + {error, {Reason, size(Data)}} - after NewTimeout -> - cancel(SockRef, send, SendRef), + after Timeout -> + _ = cancel(SockRef, SendName, SendRef), {error, {timeout, size(Data)}} end; - {error, eagain} when (Timeout =:= nowait) -> - ?SELECT(send, SendRef); + {error, exbusy} = Error when Deadline =:= nowait -> Error; + + {error, exbusy = Reason} -> + %% Internal error: + %% we called send, got eagain, and called send again + %% - without waiting for select message + erlang:error(Reason); + + {error, eagain} when (Deadline =:= nowait) -> + ?SELECT(SendName, SendRef); {error, eagain} -> + Timeout = timeout(Deadline), receive {?ESOCK_TAG, #socket{ref = SockRef}, select, SendRef} -> - do_send(SockRef, Data, EFlags, - next_timeout(TS, Timeout)); + send_common( + SockRef, Data, To, EFlags, Deadline, SendName); {?ESOCK_TAG, _Socket, abort, {SendRef, Reason}} -> - {error, Reason} + {error, {Reason, size(Data)}} after Timeout -> - cancel(SockRef, send, SendRef), + _ = cancel(SockRef, SendName, SendRef), {error, {timeout, size(Data)}} end; - {error, _} = ERROR -> - ERROR - end. - + {error, Reason} -> + {error, {Reason, size(Data)}} + end. %% --------------------------------------------------------------------------- @@ -1740,9 +1748,11 @@ sendto(Socket, Data, Dest, Timeout) -> sendto(Socket, Data, Dest, ?ESOCK_SENDTO_FLAGS_DEFAULT, Timeout). --spec sendto(Socket, Data, Dest, Flags, nowait) -> ok | - {select, SelectInfo} | - {error, Reason} when +-spec sendto(Socket, Data, Dest, Flags, nowait) -> + ok | + {ok, {binary(), SelectInfo}} | + {select, SelectInfo} | + {error, Reason} when Socket :: socket(), Data :: binary(), Dest :: sockaddr(), @@ -1760,75 +1770,21 @@ sendto(Socket, Data, Dest, Timeout) -> sendto(Socket, Data, Dest, Flags, Timeout) when is_list(Data) -> Bin = erlang:list_to_binary(Data), sendto(Socket, Bin, Dest, Flags, Timeout); -sendto(#socket{ref = SockRef}, Data, #{family := Fam} = Dest, Flags, Timeout) - when is_binary(Data) andalso - ((Fam =:= inet) orelse (Fam =:= inet6) orelse (Fam =:= local)) andalso - is_list(Flags) andalso - ((Timeout =:= nowait) orelse - (Timeout =:= infinity) orelse - (is_integer(Timeout) andalso (Timeout > 0))) -> - EFlags = enc_send_flags(Flags), - do_sendto(SockRef, Data, ensure_sockaddr(Dest), EFlags, Timeout). - -do_sendto(SockRef, Data, Dest, EFlags, Timeout) -> - TS = timestamp(Timeout), - SendRef = make_ref(), - case nif_sendto(SockRef, SendRef, Data, Dest, EFlags) of - ok -> - %% We are done - ok; - - {ok, Written} when (Timeout =:= nowait) -> - <<_:Written/binary, Rest/binary>> = Data, - {ok, {Rest, ?SELECT_INFO(sendto, SendRef)}}; - - - {ok, Written} -> - %% We are partially done, wait for continuation - receive - {?ESOCK_TAG, #socket{ref = SockRef}, select, SendRef} - when (Written > 0) -> - <<_:Written/binary, Rest/binary>> = Data, - do_sendto(SockRef, Rest, Dest, EFlags, - next_timeout(TS, Timeout)); - - {?ESOCK_TAG, #socket{ref = SockRef}, select, SendRef} -> - do_sendto(SockRef, Data, Dest, EFlags, - next_timeout(TS, Timeout)); - - {?ESOCK_TAG, _Socket, abort, {SendRef, Reason}} -> - {error, Reason} - - after Timeout -> - cancel(SockRef, sendto, SendRef), - {error, timeout} - end; - - - {error, eagain} when (Timeout =:= nowait) -> - ?SELECT(sendto, SendRef); - - - {error, eagain} -> - receive - {?ESOCK_TAG, #socket{ref = SockRef}, select, SendRef} -> - do_sendto(SockRef, Data, Dest, EFlags, - next_timeout(TS, Timeout)); - - {?ESOCK_TAG, _Socket, abort, {SendRef, Reason}} -> - {error, Reason} - - after Timeout -> - cancel(SockRef, sendto, SendRef), - {error, timeout} - end; - - {error, _} = ERROR -> +sendto(#socket{ref = SockRef}, Data, Dest, Flags, Timeout) + when is_binary(Data), is_list(Flags) -> + try + begin + To = ensure_sockaddr(Dest), + EFlags = enc_send_flags(Flags), + Deadline = deadline(Timeout), + send_common(SockRef, Data, To, EFlags, Deadline, sendto) + end + catch + throw:ERROR -> ERROR end. - %% --------------------------------------------------------------------------- %% %% The only part of the msghdr() that *must* exist (a connected @@ -1837,9 +1793,13 @@ do_sendto(SockRef, Data, Dest, EFlags, Timeout) -> %% used when sending. %% --spec sendmsg(Socket, MsgHdr) -> ok | {error, Reason} when +-spec sendmsg(Socket, MsgHdr) -> + ok | + {ok, Remaining} | + {error, Reason} when Socket :: socket(), MsgHdr :: msghdr(), + Remaining :: erlang:iovec(), Reason :: term(). sendmsg(Socket, MsgHdr) -> @@ -1851,15 +1811,16 @@ sendmsg(Socket, MsgHdr) -> Socket :: socket(), MsgHdr :: msghdr(), Flags :: send_flags(), - Reason :: term() - ; (Socket, MsgHdr, Timeout :: nowait) -> ok | - {select, SelectInfo} | - {error, Reason} when + Reason :: term(); + (Socket, MsgHdr, Timeout :: nowait) -> + ok | + {ok, Remaining} | + {error, Reason} when Socket :: socket(), MsgHdr :: msghdr(), - SelectInfo :: select_info(), - Reason :: term() - ; (Socket, MsgHdr, Timeout) -> ok | {error, Reason} when + Remaining :: erlang:iovec(), + Reason :: term(); + (Socket, MsgHdr, Timeout) -> ok | {error, Reason} when Socket :: socket(), MsgHdr :: msghdr(), Timeout :: timeout(), @@ -1867,21 +1828,18 @@ sendmsg(Socket, MsgHdr) -> sendmsg(Socket, MsgHdr, Flags) when is_list(Flags) -> sendmsg(Socket, MsgHdr, Flags, ?ESOCK_SENDMSG_TIMEOUT_DEFAULT); -sendmsg(Socket, MsgHdr, Timeout) - when is_integer(Timeout) orelse (Timeout =:= infinity) -> +sendmsg(Socket, MsgHdr, Timeout) -> sendmsg(Socket, MsgHdr, ?ESOCK_SENDMSG_FLAGS_DEFAULT, Timeout). -spec sendmsg(Socket, MsgHdr, Flags, nowait) -> ok | {ok, Remaining} | - {select, SelectInfo} | {error, Reason} when Socket :: socket(), MsgHdr :: msghdr(), Flags :: send_flags(), Remaining :: erlang:iovec(), - SelectInfo :: select_info(), Reason :: term() ; (Socket, MsgHdr, Flags, Timeout) -> ok | @@ -1895,25 +1853,24 @@ sendmsg(Socket, MsgHdr, Timeout) Reason :: term(). sendmsg(#socket{ref = SockRef}, #{iov := IOV} = MsgHdr, Flags, Timeout) - when is_list(IOV) andalso - is_list(Flags) andalso - ((Timeout =:= nowait) orelse - (Timeout =:= infinity) orelse - (is_integer(Timeout) andalso (Timeout > 0))) -> - try ensure_msghdr(MsgHdr) of - M -> + when is_list(IOV), is_list(Flags) -> + try + begin + M = ensure_msghdr(MsgHdr), EFlags = enc_send_flags(Flags), - do_sendmsg(SockRef, M, EFlags, Timeout) + Deadline = deadline(Timeout), + do_sendmsg(SockRef, M, EFlags, Deadline) + end catch - throw:T -> - T; - error:Reason -> - {error, Reason} + throw:ERROR -> + ERROR end. -do_sendmsg(SockRef, MsgHdr, EFlags, Timeout) -> - TS = timestamp(Timeout), + +do_sendmsg(SockRef, MsgHdr, EFlags, Deadline) -> + SendRef = make_ref(), + case nif_sendmsg(SockRef, SendRef, MsgHdr, EFlags) of ok -> %% We are done @@ -1925,28 +1882,39 @@ do_sendmsg(SockRef, MsgHdr, EFlags, Timeout) -> %% be able to handle a message being split. Leave it to %% the caller to figure out (call again with the rest). %% - %% We should really not need to cancel, since this is - %% accepted for sendmsg! + %% We need to cancel this partial write. %% - cancel(SockRef, sendmsg, SendRef), + _ = cancel(SockRef, sendmsg, SendRef), {ok, do_sendmsg_rest(maps:get(iov, MsgHdr), Written)}; - {error, eagain} when (Timeout =:= nowait) -> + {error, exbusy} = Error when Deadline =:= nowait -> Error; + + {error, exbusy = Reason} -> + %% Internal error: + %% we called send, got eagain, and called send again + %% - without waiting for select message + erlang:error(Reason); + + + {error, eagain} when (Deadline =:= nowait) -> ?SELECT(sendmsg, SendRef); - {error, eagain} -> + Timeout = timeout(Deadline), receive {?ESOCK_TAG, #socket{ref = SockRef}, select, SendRef} -> - do_sendmsg(SockRef, MsgHdr, EFlags, - next_timeout(TS, Timeout)) + do_sendmsg(SockRef, MsgHdr, EFlags, Deadline); + + {?ESOCK_TAG, _Socket, abort, {SendRef, Reason}} -> + {error, Reason} after Timeout -> - cancel(SockRef, sendmsg, SendRef), + _ = cancel(SockRef, sendmsg, SendRef), {error, timeout} end; + {error, _} = ERROR -> ERROR end. @@ -1970,7 +1938,6 @@ ensure_msghdr(_) -> - %% =========================================================================== %% %% recv, recvfrom, recvmsg - receive a message from a socket @@ -2062,154 +2029,134 @@ recv(Socket, Length, Timeout) -> Reason :: term(). recv(#socket{ref = SockRef}, Length, Flags, Timeout) - when (is_integer(Length) andalso (Length >= 0)) andalso - is_list(Flags) andalso - (is_integer(Timeout) orelse - (Timeout =:= infinity) orelse - (Timeout =:= nowait)) -> - EFlags = enc_recv_flags(Flags), - do_recv(SockRef, undefined, Length, EFlags, <<>>, Timeout). - -%% We need to pass the "old recv ref" around because of the special case -%% with Length = 0. This case makes it neccessary to have a timeout function -%% clause since we may never wait for anything (no receive select), and so the -%% the only timeout check will be the function clause. -%% Note that the Timeout value of 'nowait' has a special meaning. It means + when is_integer(Length), Length >= 0, is_list(Flags) -> + try + EFlags = enc_recv_flags(Flags), + Deadline = deadline(Timeout), + do_recv(SockRef, Length, EFlags, Deadline, <<>>) + catch + throw:ERROR -> + ERROR + end. + +%% We will only recurse with Length == 0 if Length is 0, +%% so Length == 0 means to return all available data also when recursing +%% +%% Note that the Deadline value of 'nowait' has a special meaning. It means %% that we will either return with data or with the with {error, NNNN}. In %% wich case the caller will receive a select message at some later time. -do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) - when (Timeout =:= nowait) orelse - (Timeout =:= infinity) orelse - (is_integer(Timeout) andalso (Timeout > 0)) -> - TS = timestamp(Timeout), +%% +do_recv(SockRef, Length, EFlags, Deadline, Acc) -> + RecvRef = make_ref(), case nif_recv(SockRef, RecvRef, Length, EFlags) of - {ok, true = _Complete, Bin} when (size(Acc) =:= 0) -> - {ok, Bin}; + {ok, true = _Complete, Bin} -> - {ok, <<Acc/binary, Bin/binary>>}; + {ok, bincat(Acc, Bin)}; + %% It depends on the amount of bytes we tried to read: %% 0 - Read everything available %% We got something, but there may be more - keep reading. %% > 0 - We got a part of the message and we will be notified %% when there is more to read (a select message) - {ok, false = _Complete, Bin} when (Length =:= 0) -> - do_recv(SockRef, RecvRef, - Length, EFlags, - <<Acc/binary, Bin/binary>>, - next_timeout(TS, Timeout)); - + {ok, false = _Complete, Bin} when Length =:= 0 -> + Timeout = timeout(Deadline), + if + 0 < Timeout -> + do_recv( + SockRef, Length, EFlags, Deadline, bincat(Acc, Bin)); + true -> + {ok, bincat(Acc, Bin)} + end; %% Did not get all the user asked for, but the user also %% specified 'nowait', so deliver what we got and the %% select info. - {ok, false = _Completed, Bin} when (Timeout =:= nowait) andalso - (size(Acc) =:= 0) -> - {ok, {Bin, ?SELECT_INFO(recv, RecvRef)}}; + {ok, false = _Completed, Bin} when Deadline =:= nowait -> + {ok, {bincat(Acc, Bin), ?SELECT_INFO(recv, RecvRef)}}; - - {ok, false = _Completed, Bin} when (size(Acc) =:= 0) -> - %% We got the first chunk of it. - %% We will be notified (select message) when there - %% is more to read. - NewTimeout = next_timeout(TS, Timeout), + {ok, false = _Completed, Bin} -> + %% We got a chunk of it! + Timeout = timeout(Deadline), receive {?ESOCK_TAG, #socket{ref = SockRef}, select, RecvRef} -> - do_recv(SockRef, RecvRef, - Length-size(Bin), EFlags, - Bin, - next_timeout(TS, Timeout)); + if + 0 < Timeout -> + do_recv( + SockRef, Length - byte_size(Bin), EFlags, + Deadline, bincat(Acc, Bin)); + true -> + {error, {timeout, bincat(Acc, Bin)}} + end; {?ESOCK_TAG, _Socket, abort, {RecvRef, Reason}} -> {error, Reason} - after NewTimeout -> + after Timeout -> cancel(SockRef, recv, RecvRef), - {error, {timeout, Acc}} + {error, {timeout, bincat(Acc, Bin)}} end; - {ok, false = _Completed, Bin} -> - %% We got a chunk of it! - NewTimeout = next_timeout(TS, Timeout), - receive - {?ESOCK_TAG, #socket{ref = SockRef}, select, RecvRef} -> - do_recv(SockRef, RecvRef, - Length-size(Bin), EFlags, - <<Acc/binary, Bin/binary>>, - next_timeout(TS, Timeout)); - {?ESOCK_TAG, _Socket, abort, {RecvRef, Reason}} -> - {error, Reason} + {error, exbusy} = Error when Deadline =:= nowait -> Error; - after NewTimeout -> - cancel(SockRef, recv, RecvRef), - {error, {timeout, Acc}} - end; + {error, exbusy = Reason} -> + %% Internal error: + %% we called recv, got eagain, and called recv again + %% - without waiting for select message + erlang:error(Reason); %% The user does not want to wait! %% The user will be informed that there is something to read %% via the select socket message (see below). - - {error, eagain} when (Timeout =:= nowait) andalso (size(Acc) =:= 0) -> - ?SELECT(recv, RecvRef); - {error, eagain} when (Timeout =:= nowait) -> - {ok, {Acc, ?SELECT_INFO(recv, RecvRef)}}; + {error, eagain} when Deadline =:= nowait -> + if + byte_size(Acc) =:= 0 -> + ?SELECT(recv, RecvRef); + true -> + {ok, {Acc, ?SELECT_INFO(recv, RecvRef)}} + end; %% We return with the accumulated binary (if its non-empty) - {error, eagain} when (Length =:= 0) andalso (size(Acc) > 0) -> - %% CAN WE REALLY DO THIS? THE NIF HAS SELECTED!! OR? + {error, eagain} when Length =:= 0, 0 < byte_size(Acc) -> + cancel(SockRef, recv, RecvRef), {ok, Acc}; {error, eagain} -> %% There is nothing just now, but we will be notified when there %% is something to read (a select message). - NewTimeout = next_timeout(TS, Timeout), + Timeout = timeout(Deadline), receive {?ESOCK_TAG, #socket{ref = SockRef}, select, RecvRef} -> - do_recv(SockRef, RecvRef, - Length, EFlags, - Acc, - next_timeout(TS, Timeout)); + if + 0 < Timeout -> + do_recv( + SockRef, Length, EFlags, Deadline, Acc); + 0 < byte_size(Acc) -> + {error, {timeout, Acc}}; + true -> + {error, timeout} + end; {?ESOCK_TAG, _Socket, abort, {RecvRef, Reason}} -> {error, Reason} - after NewTimeout -> + after Timeout -> cancel(SockRef, recv, RecvRef), {error, timeout} end; - {error, closed = Reason} -> - do_close(SockRef), - if - (size(Acc) =:= 0) -> - {error, Reason}; - true -> - {error, {Reason, Acc}} - end; - {error, _} = ERROR when (size(Acc) =:= 0) -> + {error, _} = ERROR when byte_size(Acc) =:= 0 -> ERROR; {error, Reason} -> {error, {Reason, Acc}} - end; - -do_recv(SockRef, RecvRef, 0 = _Length, _Eflags, Acc, _Timeout) -> - %% The current recv operation is to be cancelled, so no need for a ref... - %% The cancel will end our 'read everything you have' and "activate" - %% any waiting reader. - cancel(SockRef, recv, RecvRef), - {ok, Acc}; -do_recv(_SockRef, _RecvRef, _Length, _EFlags, Acc, _Timeout) - when (size(Acc) > 0) -> - {error, {timeout, Acc}}; -do_recv(_SockRef, _RecvRef, _Length, _EFlags, _Acc, _Timeout) -> - {error, timeout}. + end. @@ -2324,43 +2271,54 @@ recvfrom(Socket, BufSz, Timeout) -> Reason :: term(). recvfrom(#socket{ref = SockRef}, BufSz, Flags, Timeout) - when (is_integer(BufSz) andalso (BufSz >= 0)) andalso - is_list(Flags) andalso - (is_integer(Timeout) orelse - (Timeout =:= infinity) orelse - (Timeout =:= nowait)) -> - EFlags = enc_recv_flags(Flags), - do_recvfrom(SockRef, BufSz, EFlags, Timeout). - -do_recvfrom(SockRef, BufSz, EFlags, Timeout) -> - TS = timestamp(Timeout), + when is_integer(BufSz), 0 =< BufSz, is_list(Flags) -> + try + EFlags = enc_recv_flags(Flags), + Deadline = deadline(Timeout), + do_recvfrom(SockRef, BufSz, EFlags, Deadline) + catch + throw:ERROR -> + ERROR + end. + +do_recvfrom(SockRef, BufSz, EFlags, Deadline) -> + RecvRef = make_ref(), case nif_recvfrom(SockRef, RecvRef, BufSz, EFlags) of + {ok, {_Source, _NewData}} = OK -> OK; - {error, eagain} when (Timeout =:= nowait) -> - ?SELECT(recvfrom, RecvRef); + {error, exbusy} = Error when Deadline =:= nowait -> Error; + {error, exbusy = Reason} -> + %% Internal error: + %% we called recvfrom, got eagain, and called recvfrom again + %% - without waiting for select message + erlang:error(Reason); + + + {error, eagain} when Deadline =:= nowait -> + ?SELECT(recvfrom, RecvRef); {error, eagain} -> %% There is nothing just now, but we will be notified when there %% is something to read (a select message). - NewTimeout = next_timeout(TS, Timeout), + Timeout = timeout(Deadline), receive {?ESOCK_TAG, #socket{ref = SockRef}, select, RecvRef} -> - do_recvfrom(SockRef, BufSz, EFlags, - next_timeout(TS, Timeout)); + do_recvfrom(SockRef, BufSz, EFlags, Deadline); {?ESOCK_TAG, _Socket, abort, {RecvRef, Reason}} -> {error, Reason} - after NewTimeout -> + after Timeout -> cancel(SockRef, recvfrom, RecvRef), {error, timeout} end; + {error, _Reason} = ERROR -> ERROR @@ -2425,7 +2383,7 @@ recvmsg(Socket, Timeout) -> recvmsg(Socket, Flags, Timeout) when is_list(Flags) -> recvmsg(Socket, 0, 0, Flags, Timeout); -recvmsg(Socket, BufSz, CtrlSz) when is_integer(BufSz) andalso is_integer(CtrlSz) -> +recvmsg(Socket, BufSz, CtrlSz) when is_integer(BufSz), is_integer(CtrlSz) -> recvmsg(Socket, BufSz, CtrlSz, ?ESOCK_RECV_FLAGS_DEFAULT, ?ESOCK_RECV_TIMEOUT_DEFAULT). @@ -2454,47 +2412,55 @@ recvmsg(Socket, BufSz, CtrlSz) when is_integer(BufSz) andalso is_integer(CtrlSz) Reason :: term(). recvmsg(#socket{ref = SockRef}, BufSz, CtrlSz, Flags, Timeout) - when (is_integer(BufSz) andalso (BufSz >= 0)) andalso - (is_integer(CtrlSz) andalso (CtrlSz >= 0)) andalso - is_list(Flags) andalso - (is_integer(Timeout) orelse - (Timeout =:= infinity) orelse - (Timeout =:= nowait)) -> - EFlags = enc_recv_flags(Flags), - do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Timeout). - -do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Timeout) -> - TS = timestamp(Timeout), + when is_integer(BufSz), 0 =< BufSz, + is_integer(CtrlSz), 0 =< CtrlSz, + is_list(Flags) -> + try + EFlags = enc_recv_flags(Flags), + Deadline = deadline(Timeout), + do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Deadline) + catch + throw:ERROR -> + ERROR + end. + +do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Deadline) -> + RecvRef = make_ref(), case nif_recvmsg(SockRef, RecvRef, BufSz, CtrlSz, EFlags) of + {ok, _MsgHdr} = OK -> OK; - {error, eagain} when (Timeout =:= nowait) -> - ?SELECT(recvmsg, RecvRef); + {error, exbusy} = Error when Deadline =:= nowait -> Error; + {error, exbusy = Reason} -> + %% Internal error: + %% we called recvmsg, got eagain, and called recvmsg again + %% - without waiting for select message + erlang:error(Reason); + + + {error, eagain} when Deadline =:= nowait -> + ?SELECT(recvmsg, RecvRef); {error, eagain} -> %% There is nothing just now, but we will be notified when there %% is something to read (a select message). - NewTimeout = next_timeout(TS, Timeout), + Timeout = timeout(Deadline), receive {?ESOCK_TAG, #socket{ref = SockRef}, select, RecvRef} -> - do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, - next_timeout(TS, Timeout)); + do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Deadline); {?ESOCK_TAG, _Socket, abort, {RecvRef, Reason}} -> {error, Reason} - after NewTimeout -> + after Timeout -> cancel(SockRef, recvmsg, RecvRef), {error, timeout} end; - {error, closed} = ERROR -> - do_close(SockRef), - ERROR; {error, _Reason} = ERROR -> ERROR @@ -2503,7 +2469,6 @@ do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Timeout) -> - %% =========================================================================== %% %% close - close a file descriptor @@ -2523,9 +2488,6 @@ do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Timeout) -> Reason :: term(). close(#socket{ref = SockRef}) -> - do_close(SockRef). - -do_close(SockRef) -> case nif_close(SockRef) of ok -> nif_finalize_close(SockRef); @@ -2555,10 +2517,7 @@ do_close(SockRef) -> shutdown(#socket{ref = SockRef}, How) -> try - begin - EHow = enc_shutdown_how(How), - nif_shutdown(SockRef, EHow) - end + nif_shutdown(SockRef, enc_shutdown_how(How)) catch throw:T -> T; @@ -2627,23 +2586,15 @@ shutdown(#socket{ref = SockRef}, How) -> setopt(#socket{ref = SockRef}, Level, Key, Value) -> try begin - Domain = which_domain(SockRef), - Type = which_type(SockRef), - Protocol = which_protocol(SockRef), + {Domain, Type, Proto} = which_dtp(SockRef), {EIsEncoded, ELevel} = enc_setopt_level(Level), - EKey = enc_setopt_key(Level, Key, Domain, Type, Protocol), - EVal = enc_setopt_value(Level, Key, Value, Domain, Type, Protocol), + EKey = enc_setopt_key(Level, Key, Domain, Type, Proto), + EVal = enc_setopt_value(Level, Key, Value, Domain, Type, Proto), nif_setopt(SockRef, EIsEncoded, ELevel, EKey, EVal) end catch - throw:T -> - T; - %% <WIN32-TEMPORARY> - error:notsup:S -> - erlang:raise(error, notsup, S); - %% </WIN32-TEMPORARY> - error:Reason -> - {error, Reason} % Process more? + throw:ERROR -> + ERROR end. @@ -2703,33 +2654,26 @@ setopt(#socket{ref = SockRef}, Level, Key, Value) -> getopt(#socket{ref = SockRef}, Level, Key) -> try begin - Domain = which_domain(SockRef), - Type = which_type(SockRef), - Protocol = which_protocol(SockRef), + {Domain, Type, Proto} = which_dtp(SockRef), {EIsEncoded, ELevel} = enc_getopt_level(Level), - EKey = enc_getopt_key(Level, Key, Domain, Type, Protocol), + EKey = enc_getopt_key(Level, Key, Domain, Type, Proto), %% We may need to decode the value (for the same reason %% we (may have) needed to encode the value for setopt). case nif_getopt(SockRef, EIsEncoded, ELevel, EKey) of ok -> ok; {ok, EVal} -> - Val = dec_getopt_value(Level, Key, EVal, - Domain, Type, Protocol), + Val = + dec_getopt_value( + Level, Key, EVal, Domain, Type, Proto), {ok, Val}; - {error, _} = ERROR -> - ERROR + {error, _} = E -> + E end end catch - throw:E:_S -> - E; - %% <WIN32-TEMPORARY> - error:notsup:S -> - erlang:raise(error, notsup, S); - %% </WIN32-TEMPORARY> - error:Reason:_Stack -> - {error, Reason} % Process more? + throw:ERROR -> + ERROR end. @@ -2744,41 +2688,75 @@ which_domain(SockRef) -> case nif_getopt(SockRef, true, ?ESOCK_OPT_LEVEL_OTP, ?ESOCK_OPT_OTP_DOMAIN) of {ok, Domain} -> - Domain; + if + is_atom(Domain) -> + Domain; + is_integer(Domain) -> + invalid_domain(Domain) + end; {error, _} = ERROR -> throw(ERROR) end. --spec which_type(SockRef) -> Type when - SockRef :: reference(), - Type :: type(). - -which_type(SockRef) -> - case nif_getopt(SockRef, true, - ?ESOCK_OPT_LEVEL_OTP, ?ESOCK_OPT_OTP_TYPE) of - {ok, Type} -> - Type; - {error, _} = ERROR -> - throw(ERROR) - end. - --spec which_protocol(SockRef) -> Protocol when - SockRef :: reference(), - Protocol :: protocol(). - -which_protocol(SockRef) -> - case nif_getopt(SockRef, true, - ?ESOCK_OPT_LEVEL_OTP, ?ESOCK_OPT_OTP_PROTOCOL) of - {ok, Proto} -> - Proto; +%%%-spec which_type(SockRef) -> Type when +%%% SockRef :: reference(), +%%% Type :: type(). +%%% +%%%which_type(SockRef) -> +%%% case nif_getopt(SockRef, true, +%%% ?ESOCK_OPT_LEVEL_OTP, ?ESOCK_OPT_OTP_TYPE) of +%%% {ok, Type} -> +%%% if +%%% is_atom(Type) -> +%%% Type; +%%% is_integer(Type) -> +%%% invalid_type(Type) +%%% end; +%%% {error, _} = ERROR -> +%%% throw(ERROR) +%%% end. +%%% +%%%-spec which_protocol(SockRef) -> Protocol when +%%% SockRef :: reference(), +%%% Protocol :: protocol(). +%%% +%%%which_protocol(SockRef) -> +%%% case nif_getopt(SockRef, true, +%%% ?ESOCK_OPT_LEVEL_OTP, ?ESOCK_OPT_OTP_PROTOCOL) of +%%% {ok, Proto} -> +%%% if +%%% is_atom(Proto) -> +%%% Proto; +%%% is_integer(Proto) -> +%%% invalid_protocol(Proto) +%%% end; +%%% {error, _} = ERROR -> +%%% throw(ERROR) +%%% end. + +which_dtp(SockRef) -> + case + nif_getopt( + SockRef, true, ?ESOCK_OPT_LEVEL_OTP, ?ESOCK_OPT_OTP_DTP) + of + {ok, {Domain, Type, Proto} = DTP} -> + if + is_integer(Domain) -> + invalid_domain(Domain); + is_integer(Type) -> + invalid_type(Type); + is_integer(Proto) -> + invalid_protocol(Proto); + is_atom(Domain), is_atom(Type), is_atom(Proto) -> + DTP + end; {error, _} = ERROR -> throw(ERROR) end. - %% =========================================================================== %% %% sockname - return the current address of the socket. @@ -2953,7 +2931,6 @@ enc_setopt_key(Level, Opt, Domain, Type, Protocol) -> %% encode the value into an more "manageable" type. %% It also handles "aliases" (see linger). --dialyzer({nowarn_function, enc_setopt_value/6}). -spec enc_setopt_value(otp, otp_socket_option(), Value, Domain, Type, Protocol) -> term() when Value :: term(), @@ -3421,12 +3398,6 @@ enc_getopt_key(Level, Opt, Domain, Type, Protocol) -> %% For the most part, we simply let the value pass through, but for some %% values we may need to do an actual decode. %% -%% For some reason dialyzer thinks that the only valid value for Opt to -%% this function is otp_socket_option(). Of course without explaining -%% how it came to that conclusion... And since I know that to be false... -%% - --dialyzer({nowarn_function, dec_getopt_value/6}). %% This string is NULL-terminated, but the general function we use %% in the nif code does not know that. So, deal with it here. @@ -3443,78 +3414,77 @@ dec_getopt_value(_L, _Opt, V, _D, _T, _P) -> %% Most options are usable both for set and get, but some are %% are only available for e.g. get. --spec enc_sockopt_key(Level, Opt, - Direction, +-spec enc_sockopt_key(Level, Opt, Direction, Domain, Type, Protocol) -> non_neg_integer() when Level :: otp, Direction :: set | get, Opt :: otp_socket_option(), Domain :: domain(), Type :: type(), - Protocol :: protocol() - ; (Level, Direction, Opt, + Protocol :: protocol(); + (Level, Opt, Direction, Domain, Type, Protocol) -> non_neg_integer() when Level :: socket, Direction :: set | get, Opt :: socket_option(), Domain :: domain(), Type :: type(), - Protocol :: protocol() - ; (Level, Direction, Opt, + Protocol :: protocol(); + (Level, Opt, Direction, Domain, Type, Protocol) -> non_neg_integer() when Level :: ip, Direction :: set | get, Opt :: ip_socket_option(), Domain :: domain(), Type :: type(), - Protocol :: protocol() - ; (Level, Direction, Opt, + Protocol :: protocol(); + (Level, Opt, Direction, Domain, Type, Protocol) -> non_neg_integer() when Level :: ipv6, Direction :: set | get, Opt :: ipv6_socket_option(), Domain :: domain(), Type :: type(), - Protocol :: protocol() - ; (Level, Direction, Opt, - Domain, Type, Protocol) -> non_neg_integer() when + Protocol :: protocol(); + (Level, Opt, Direction, + Domain, Type, Protocol) -> non_neg_integer() when Level :: tcp, Direction :: set | get, Opt :: tcp_socket_option(), Domain :: domain(), Type :: type(), - Protocol :: protocol() - ; (Level, Direction, Opt, + Protocol :: protocol(); + (Level, Opt, Direction, Domain, Type, Protocol) -> non_neg_integer() when Level :: udp, Direction :: set | get, Opt :: udp_socket_option(), Domain :: domain(), Type :: type(), - Protocol :: protocol() - ; (Level, Direction, Opt, + Protocol :: protocol(); + (Level, Opt, Direction, Domain, Type, Protocol) -> non_neg_integer() when Level :: sctp, Direction :: set | get, Opt :: sctp_socket_option(), Domain :: domain(), Type :: type(), - Protocol :: protocol() - ; (Level, Direction, Opt, - Domain, Type, Protocol) -> non_neg_integer() when + Protocol :: protocol(); + (Level, Opt, Direction, + Domain, Type, Protocol) -> non_neg_integer() when Level :: integer(), Direction :: set, Opt :: integer(), Domain :: domain(), Type :: type(), - Protocol :: protocol() - ; (Level, Direction, Opt, - Domain, Type, Protocol) -> non_neg_integer() when + Protocol :: protocol(); + (Level, Opt, Direction, + Domain, Type, Protocol) -> {NativeOpt, ValueSize} when Level :: integer(), Direction :: get, Opt :: {NativeOpt, ValueSize}, NativeOpt :: integer(), - ValueSize :: non_neg_integer(), + ValueSize :: non_neg_integer() | 'int' | 'bool', Domain :: domain(), Type :: type(), Protocol :: protocol(). @@ -3934,8 +3904,8 @@ ensure_sockaddr(#{family := local, path := Path} = SockAddr) (byte_size(Path) > 0) andalso (byte_size(Path) =< 255) -> SockAddr; -ensure_sockaddr(_SockAddr) -> - einval(). +ensure_sockaddr(SockAddr) -> + invalid_address(SockAddr). @@ -3943,19 +3913,29 @@ cancel(SockRef, Op, OpRef) -> case nif_cancel(SockRef, Op, OpRef) of %% The select has already completed {error, select_sent} -> - flush_select_msgs(SockRef, OpRef); + flush_select_msg(SockRef, OpRef), + _ = flush_abort_msg(SockRef, OpRef), + ok; Other -> + _ = flush_abort_msg(SockRef, OpRef), Other end. -flush_select_msgs(SockRef, Ref) -> +flush_select_msg(SockRef, Ref) -> receive {?ESOCK_TAG, #socket{ref = SockRef}, select, Ref} -> - flush_select_msgs(SockRef, Ref) + ok after 0 -> ok end. +flush_abort_msg(SockRef, Ref) -> + receive + {?ESOCK_TAG, #socket{ref = SockRef}, abort, {Ref, Reason}} -> + Reason + after 0 -> + ok + end. %% formated_timestamp() -> %% format_timestamp(os:timestamp()). @@ -3983,34 +3963,37 @@ flush_select_msgs(SockRef, Ref) -> %% lists:flatten(FormatDate). -%% A timestamp in ms +deadline(Timeout) -> + case Timeout of + nowait -> Timeout; + infinity -> Timeout; + _ when is_integer(Timeout), 0 =< Timeout -> + timestamp() + Timeout; + _ -> + invalid_timeout(Timeout) + end. -timestamp(nowait = T) -> - T; -timestamp(infinity) -> - undefined; -timestamp(_) -> - timestamp(). +timeout(Deadline) -> + case Deadline of + nowait -> 0; + infinity -> infinity; + _ -> + Now = timestamp(), + if + Now < Deadline -> Deadline - Now; + true -> 0 + end + end. timestamp() -> erlang:monotonic_time(milli_seconds). -next_timeout(_, nowait = Timeout) -> - Timeout; -next_timeout(_, infinity = Timeout) -> - Timeout; -next_timeout(TS, Timeout) -> - NewTimeout = Timeout - tdiff(TS, timestamp()), - if - (NewTimeout > 0) -> - NewTimeout; - true -> - 0 - end. - -tdiff(T1, T2) -> - T2 - T1. +-compile({inline, [bincat/2]}). +bincat(<<>>, <<_/binary>> = B) -> B; +bincat(<<_/binary>> = A, <<>>) -> A; +bincat(<<_/binary>> = A, <<_/binary>> = B) -> + <<A/binary, B/binary>>. %% p(F) -> @@ -4036,42 +4019,45 @@ tdiff(T1, T2) -> -spec invalid_domain(Domain) -> no_return() when Domain :: term(). - invalid_domain(Domain) -> error({invalid_domain, Domain}). -spec invalid_type(Type) -> no_return() when Type :: term(). - invalid_type(Type) -> error({invalid_type, Type}). -spec invalid_protocol(Proto) -> no_return() when Proto :: term(). - invalid_protocol(Proto) -> error({invalid_protocol, Proto}). +-spec invalid_address(SockAddr) -> no_return() when + SockAddr :: term(). +invalid_address(SockAddr) -> + error({invalid_address, SockAddr}). + +-spec invalid_timeout(Timeout) -> no_return() when + Timeout :: term(). +invalid_timeout(Timeout) -> + error({invalid_timeout, Timeout}). + -spec not_supported(What) -> no_return() when What :: term(). - not_supported(What) -> error({not_supported, What}). -spec unknown(What) -> no_return() when What :: term(). - unknown(What) -> error({unknown, What}). -spec einval() -> no_return(). - einval() -> error(einval). -spec error(Reason) -> no_return() when Reason :: term(). - error(Reason) -> throw({error, Reason}). |