diff options
-rw-r--r-- | src/channel.c | 163 | ||||
-rw-r--r-- | src/json.c | 7 | ||||
-rw-r--r-- | src/structs.h | 8 | ||||
-rw-r--r-- | src/testdir/test_channel.py | 27 | ||||
-rw-r--r-- | src/testdir/test_channel.vim | 9 | ||||
-rw-r--r-- | src/version.c | 2 |
6 files changed, 181 insertions, 35 deletions
diff --git a/src/channel.c b/src/channel.c index 0d3f452d5..b3115adb7 100644 --- a/src/channel.c +++ b/src/channel.c @@ -1362,7 +1362,7 @@ channel_collapse(channel_T *channel, int part) * Returns OK or FAIL. */ static int -channel_save(channel_T *channel, int part, char_u *buf, int len) +channel_save(channel_T *channel, int part, char_u *buf, int len, char *lead) { readq_T *node; readq_T *head = &channel->ch_part[part].ch_head; @@ -1403,9 +1403,9 @@ channel_save(channel_T *channel, int part, char_u *buf, int len) head->rq_prev->rq_next = node; head->rq_prev = node; - if (log_fd != NULL) + if (log_fd != NULL && lead != NULL) { - ch_log_lead("RECV ", channel); + ch_log_lead(lead, channel); fprintf(log_fd, "'"); if (fwrite(buf, len, 1, log_fd) != 1) return FAIL; @@ -1415,7 +1415,7 @@ channel_save(channel_T *channel, int part, char_u *buf, int len) } /* - * Use the read buffer of "channel"/"part" and parse a JSON messages that is + * Use the read buffer of "channel"/"part" and parse a JSON message that is * complete. The messages are added to the queue. * Return TRUE if there is more to read. */ @@ -1425,7 +1425,9 @@ channel_parse_json(channel_T *channel, int part) js_read_T reader; typval_T listtv; jsonq_T *item; - jsonq_T *head = &channel->ch_part[part].ch_json_head; + chanpart_T *chanpart = &channel->ch_part[part]; + jsonq_T *head = &chanpart->ch_json_head; + int status; int ret; if (channel_peek(channel, part) == NULL) @@ -1438,15 +1440,23 @@ channel_parse_json(channel_T *channel, int part) reader.js_fill = NULL; /* reader.js_fill = channel_fill; */ reader.js_cookie = channel; - ret = json_decode(&reader, &listtv, - channel->ch_part[part].ch_mode == MODE_JS ? JSON_JS : 0); - if (ret == OK) + + /* When a message is incomplete we wait for a short while for more to + * arrive. After the delay drop the input, otherwise a truncated string + * or list will make us hang. */ + status = json_decode(&reader, &listtv, + chanpart->ch_mode == MODE_JS ? JSON_JS : 0); + if (status == OK) { /* Only accept the response when it is a list with at least two * items. */ if (listtv.v_type != VAR_LIST || listtv.vval.v_list->lv_len < 2) { - /* TODO: give error */ + if (listtv.v_type != VAR_LIST) + ch_error(channel, "Did not receive a list, discarding"); + else + ch_errorn(channel, "Expected list with two items, got %d", + listtv.vval.v_list->lv_len); clear_tv(&listtv); } else @@ -1477,22 +1487,71 @@ channel_parse_json(channel_T *channel, int part) } } - /* Put the unread part back into the channel. - * TODO: insert in front */ - if (reader.js_buf[reader.js_used] != NUL) + if (status == OK) + chanpart->ch_waiting = FALSE; + else if (status == MAYBE) { - if (ret == FAIL) + if (!chanpart->ch_waiting) { - ch_error(channel, "Decoding failed - discarding input"); - ret = FALSE; + /* First time encountering incomplete message, set a deadline of + * 100 msec. */ + ch_log(channel, "Incomplete message - wait for more"); + reader.js_used = 0; + chanpart->ch_waiting = TRUE; +#ifdef WIN32 + chanpart->ch_deadline = GetTickCount() + 100L; +#else + gettimeofday(&chanpart->ch_deadline, NULL); + chanpart->ch_deadline.tv_usec += 100 * 1000; + if (chanpart->ch_deadline.tv_usec > 1000 * 1000) + { + chanpart->ch_deadline.tv_usec -= 1000 * 1000; + ++chanpart->ch_deadline.tv_sec; + } +#endif } else { - channel_save(channel, part, reader.js_buf + reader.js_used, - (int)(reader.js_end - reader.js_buf) - reader.js_used); - ret = TRUE; + int timeout; +#ifdef WIN32 + timeout = GetTickCount() > chanpart->ch_deadline; +#else + { + struct timeval now_tv; + + gettimeofday(&now_tv, NULL); + timeout = now_tv.tv_sec > chanpart->ch_deadline.tv_sec + || (now_tv.tv_sec == chanpart->ch_deadline.tv_sec + && now_tv.tv_usec > chanpart->ch_deadline.tv_usec); + } +#endif + if (timeout) + { + status = FAIL; + chanpart->ch_waiting = FALSE; + } + else + { + reader.js_used = 0; + ch_log(channel, "still waiting on incomplete message"); + } } } + + if (status == FAIL) + { + ch_error(channel, "Decoding failed - discarding input"); + ret = FALSE; + chanpart->ch_waiting = FALSE; + } + else if (reader.js_buf[reader.js_used] != NUL) + { + /* Put the unread part back into the channel. + * TODO: insert in front */ + channel_save(channel, part, reader.js_buf + reader.js_used, + (int)(reader.js_end - reader.js_buf) - reader.js_used, NULL); + ret = status == MAYBE ? FALSE: TRUE; + } else ret = FALSE; @@ -1559,6 +1618,8 @@ channel_get_json(channel_T *channel, int part, int id, typval_T **rettv) || tv->vval.v_number != channel->ch_part[part].ch_block_id))) { *rettv = item->jq_value; + if (tv->v_type == VAR_NUMBER) + ch_logn(channel, "Getting JSON message %d", tv->vval.v_number); remove_json_node(head, item); return OK; } @@ -2289,7 +2350,7 @@ channel_read(channel_T *channel, int part, char *func) break; /* error or nothing more to read */ /* Store the read message in the queue. */ - channel_save(channel, part, buf, len); + channel_save(channel, part, buf, len, "RECV "); readlen += len; if (len < MAXMSGSIZE) break; /* did read everything that's available */ @@ -2316,7 +2377,7 @@ channel_read(channel_T *channel, int part, char *func) if (channel->ch_part[part].ch_mode == MODE_RAW || channel->ch_part[part].ch_mode == MODE_NL) channel_save(channel, part, (char_u *)DETACH_MSG_RAW, - (int)STRLEN(DETACH_MSG_RAW)); + (int)STRLEN(DETACH_MSG_RAW), "PUT "); /* TODO: When reading from stdout is not possible, should we try to * keep stdin and stderr open? Probably not, assume the other side @@ -2361,9 +2422,13 @@ channel_read_block(channel_T *channel, int part, int timeout) continue; /* Wait for up to the channel timeout. */ - if (fd == INVALID_FD - || channel_wait(channel, fd, timeout) == FAIL) + if (fd == INVALID_FD) + return NULL; + if (channel_wait(channel, fd, timeout) == FAIL) + { + ch_log(channel, "Timed out"); return NULL; + } channel_read(channel, part, "channel_read_block"); } @@ -2403,16 +2468,18 @@ channel_read_block(channel_T *channel, int part, int timeout) channel_read_json_block( channel_T *channel, int part, - int timeout, + int timeout_arg, int id, typval_T **rettv) { int more; sock_T fd; + int timeout; + chanpart_T *chanpart = &channel->ch_part[part]; ch_log(channel, "Reading JSON"); if (id != -1) - channel->ch_part[part].ch_block_id = id; + chanpart->ch_block_id = id; for (;;) { more = channel_parse_json(channel, part); @@ -2420,7 +2487,7 @@ channel_read_json_block( /* search for messsage "id" */ if (channel_get_json(channel, part, id, rettv) == OK) { - channel->ch_part[part].ch_block_id = 0; + chanpart->ch_block_id = 0; return OK; } @@ -2431,14 +2498,50 @@ channel_read_json_block( if (channel_parse_messages()) continue; - /* Wait for up to the timeout. */ - fd = channel->ch_part[part].ch_fd; + /* Wait for up to the timeout. If there was an incomplete message + * use the deadline for that. */ + timeout = timeout_arg; + if (chanpart->ch_waiting) + { +#ifdef WIN32 + timeout = chanpart->ch_deadline - GetTickCount() + 1; +#else + { + struct timeval now_tv; + + gettimeofday(&now_tv, NULL); + timeout = (chanpart->ch_deadline.tv_sec + - now_tv.tv_sec) * 1000 + + (chanpart->ch_deadline.tv_usec + - now_tv.tv_usec) / 1000 + + 1; + } +#endif + if (timeout < 0) + { + /* Something went wrong, channel_parse_json() didn't + * discard message. Cancel waiting. */ + chanpart->ch_waiting = FALSE; + timeout = timeout_arg; + } + else if (timeout > timeout_arg) + timeout = timeout_arg; + } + fd = chanpart->ch_fd; if (fd == INVALID_FD || channel_wait(channel, fd, timeout) == FAIL) - break; - channel_read(channel, part, "channel_read_json_block"); + { + if (timeout == timeout_arg) + { + if (fd != INVALID_FD) + ch_log(channel, "Timed out"); + break; + } + } + else + channel_read(channel, part, "channel_read_json_block"); } } - channel->ch_part[part].ch_block_id = 0; + chanpart->ch_block_id = 0; return FAIL; } diff --git a/src/json.c b/src/json.c index cd80a761e..9738fc5fe 100644 --- a/src/json.c +++ b/src/json.c @@ -877,8 +877,9 @@ json_decode_all(js_read_T *reader, typval_T *res, int options) /* * Decode the JSON from "reader" and store the result in "res". * "options" can be JSON_JS or zero; - * Return FAIL if the message has a decoding error or the message is - * truncated. Consumes the message anyway. + * Return FAIL for a decoding error. + * Return MAYBE for an incomplete message. + * Consumes the message anyway. */ int json_decode(js_read_T *reader, typval_T *res, int options) @@ -891,7 +892,7 @@ json_decode(js_read_T *reader, typval_T *res, int options) ret = json_decode_item(reader, res, options); json_skip_white(reader); - return ret == OK ? OK : FAIL; + return ret; } /* diff --git a/src/structs.h b/src/structs.h index 50e2a68e2..470beff3c 100644 --- a/src/structs.h +++ b/src/structs.h @@ -1357,6 +1357,14 @@ typedef struct { jsonq_T ch_json_head; /* header for circular json read queue */ int ch_block_id; /* ID that channel_read_json_block() is waiting for */ + /* When ch_waiting is TRUE use ch_deadline to wait for incomplete message + * to be complete. */ + int ch_waiting; +#ifdef WIN32 + DWORD ch_deadline; +#else + struct timeval ch_deadline; +#endif cbq_T ch_cb_head; /* dummy node for per-request callbacks */ char_u *ch_callback; /* call when a msg is not handled */ diff --git a/src/testdir/test_channel.py b/src/testdir/test_channel.py index 47a12ea7e..ca12007a2 100644 --- a/src/testdir/test_channel.py +++ b/src/testdir/test_channel.py @@ -104,11 +104,36 @@ class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler): print("sending: {}".format(cmd)) self.request.sendall(cmd.encode('utf-8')) response = "ok" - elif decoded[1] == 'malformed': + elif decoded[1] == 'malformed1': cmd = '["ex",":"]wrong!["ex","smi"]' print("sending: {}".format(cmd)) self.request.sendall(cmd.encode('utf-8')) response = "ok" + elif decoded[1] == 'malformed2': + cmd = '"unterminated string' + print("sending: {}".format(cmd)) + self.request.sendall(cmd.encode('utf-8')) + response = "ok" + # Need to wait for Vim to give up, otherwise the double + # quote in the "ok" response terminates the string. + time.sleep(0.2) + elif decoded[1] == 'malformed3': + cmd = '["ex","missing ]"' + print("sending: {}".format(cmd)) + self.request.sendall(cmd.encode('utf-8')) + response = "ok" + # Need to wait for Vim to give up, otherwise the ] + # in the "ok" response terminates the list. + time.sleep(0.2) + elif decoded[1] == 'split': + cmd = '["ex","let ' + print("sending: {}".format(cmd)) + self.request.sendall(cmd.encode('utf-8')) + time.sleep(0.01) + cmd = 'g:split = 123"]' + print("sending: {}".format(cmd)) + self.request.sendall(cmd.encode('utf-8')) + response = "ok" elif decoded[1] == 'an expr': # Send an expr request. cmd = '["expr","setline(\\"$\\", [\\"one\\",\\"two\\",\\"three\\"])"]' diff --git a/src/testdir/test_channel.vim b/src/testdir/test_channel.vim index c628bbe88..c4e23c97f 100644 --- a/src/testdir/test_channel.vim +++ b/src/testdir/test_channel.vim @@ -127,7 +127,14 @@ func s:communicate(port) call assert_equal('got it', ch_evalexpr(handle, 'hello!')) " Malformed command should be ignored. - call assert_equal('ok', ch_evalexpr(handle, 'malformed')) + call assert_equal('ok', ch_evalexpr(handle, 'malformed1')) + call assert_equal('ok', ch_evalexpr(handle, 'malformed2')) + call assert_equal('ok', ch_evalexpr(handle, 'malformed3')) + + " split command should work + call assert_equal('ok', ch_evalexpr(handle, 'split')) + call s:waitFor('exists("g:split")') + call assert_equal(123, g:split) " Request that triggers sending two ex commands. These will usually be " handled before getting the response, but it's not guaranteed, thus wait a diff --git a/src/version.c b/src/version.c index c914ec479..491ded321 100644 --- a/src/version.c +++ b/src/version.c @@ -749,6 +749,8 @@ static char *(features[]) = static int included_patches[] = { /* Add new patch number below this line */ /**/ + 1617, +/**/ 1616, /**/ 1615, |