diff options
-rw-r--r-- | src/channel.c | 109 | ||||
-rw-r--r-- | src/proto/channel.pro | 7 | ||||
-rw-r--r-- | src/testdir/Make_all.mak | 1 | ||||
-rw-r--r-- | src/testdir/test_channel.py | 110 | ||||
-rw-r--r-- | src/testdir/test_channel.vim | 53 | ||||
-rw-r--r-- | src/version.c | 2 |
6 files changed, 242 insertions, 40 deletions
diff --git a/src/channel.c b/src/channel.c index 7183ff61f..2f7403a85 100644 --- a/src/channel.c +++ b/src/channel.c @@ -523,19 +523,21 @@ channel_collapse(int idx) } /* - * Use the read buffer of channel "ch_idx" and parse JSON messages that are + * Use the read buffer of channel "ch_idx" and parse a JSON messages that is * complete. The messages are added to the queue. + * Return TRUE if there is more to read. */ - void -channel_read_json(int ch_idx) + static int +channel_parse_json(int ch_idx) { js_read_T reader; typval_T listtv; jsonq_T *item; jsonq_T *head = &channels[ch_idx].ch_json_head; + int ret; if (channel_peek(ch_idx) == NULL) - return; + return FALSE; /* TODO: make reader work properly */ /* reader.js_buf = channel_peek(ch_idx); */ @@ -544,26 +546,35 @@ channel_read_json(int ch_idx) reader.js_fill = NULL; /* reader.js_fill = channel_fill; */ reader.js_cookie = &ch_idx; - if (json_decode(&reader, &listtv) == OK) + ret = json_decode(&reader, &listtv); + if (ret == OK) { - item = (jsonq_T *)alloc((unsigned)sizeof(jsonq_T)); - if (item == NULL) + if (listtv.v_type != VAR_LIST) + { + /* TODO: give error */ clear_tv(&listtv); + } else { - item->value = alloc_tv(); - if (item->value == NULL) - { - vim_free(item); + item = (jsonq_T *)alloc((unsigned)sizeof(jsonq_T)); + if (item == NULL) clear_tv(&listtv); - } else { - *item->value = listtv; - item->prev = head->prev; - head->prev = item; - item->next = head; - item->prev->next = item; + item->value = alloc_tv(); + if (item->value == NULL) + { + vim_free(item); + clear_tv(&listtv); + } + else + { + *item->value = listtv; + item->prev = head->prev; + head->prev = item; + item->next = head; + item->prev->next = item; + } } } } @@ -571,9 +582,16 @@ channel_read_json(int ch_idx) /* Put the unread part back into the channel. * TODO: insert in front */ if (reader.js_buf[reader.js_used] != NUL) + { channel_save(ch_idx, reader.js_buf + reader.js_used, (int)(reader.js_end - reader.js_buf) - reader.js_used); + ret = TRUE; + } + else + ret = FALSE; + vim_free(reader.js_buf); + return ret; } /* @@ -607,7 +625,8 @@ channel_get_json(int ch_idx, int id, typval_T **rettv) typval_T *tv = &l->lv_first->li_tv; if ((id > 0 && tv->v_type == VAR_NUMBER && tv->vval.v_number == id) - || id <= 0) + || (id <= 0 + && (tv->v_type != VAR_NUMBER || tv->vval.v_number < 0))) { *rettv = item->value; remove_json_node(item); @@ -717,23 +736,19 @@ may_invoke_callback(int idx) int seq_nr = -1; int json_mode = channels[idx].ch_json_mode; - if (channel_peek(idx) == NULL) - return FALSE; if (channels[idx].ch_close_cb != NULL) /* this channel is handled elsewhere (netbeans) */ return FALSE; if (json_mode) { - /* Get any json message. Return if there isn't one. */ - channel_read_json(idx); + /* Get any json message in the queue. */ if (channel_get_json(idx, -1, &listtv) == FAIL) - return FALSE; - if (listtv->v_type != VAR_LIST) { - /* TODO: give error */ - clear_tv(listtv); - return FALSE; + /* Parse readahead, return when there is still no message. */ + channel_parse_json(idx); + if (channel_get_json(idx, -1, &listtv) == FAIL) + return FALSE; } list = listtv->vval.v_list; @@ -767,6 +782,11 @@ may_invoke_callback(int idx) } seq_nr = typetv->vval.v_number; } + else if (channel_peek(idx) == NULL) + { + /* nothing to read on raw channel */ + return FALSE; + } else { /* For a raw channel we don't know where the message ends, just get @@ -1080,19 +1100,29 @@ channel_read_block(int idx) int channel_read_json_block(int ch_idx, int id, typval_T **rettv) { + int more; + for (;;) { - channel_read_json(ch_idx); + more = channel_parse_json(ch_idx); /* search for messsage "id" */ if (channel_get_json(ch_idx, id, rettv) == OK) return OK; - /* Wait for up to 2 seconds. - * TODO: use timeout set on the channel. */ - if (channel_wait(channels[ch_idx].ch_fd, 2000) == FAIL) - break; - channel_read(ch_idx); + if (!more) + { + /* Handle any other messages in the queue. If done some more + * messages may have arrived. */ + if (channel_parse_messages()) + continue; + + /* Wait for up to 2 seconds. + * TODO: use timeout set on the channel. */ + if (channel_wait(channels[ch_idx].ch_fd, 2000) == FAIL) + break; + channel_read(ch_idx); + } } return FAIL; } @@ -1246,16 +1276,23 @@ channel_select_check(int ret_in, void *rfds_in) # endif /* !FEAT_GUI_W32 && HAVE_SELECT */ /* - * Invoked from the main loop when it's save to execute received commands. + * Execute queued up commands. + * Invoked from the main loop when it's safe to execute received commands. + * Return TRUE when something was done. */ - void + int channel_parse_messages(void) { int i; + int ret = FALSE; for (i = 0; i < channel_count; ++i) while (may_invoke_callback(i) == OK) - ; + { + i = 0; /* start over */ + ret = TRUE; + } + return ret; } #endif /* FEAT_CHANNEL */ diff --git a/src/proto/channel.pro b/src/proto/channel.pro index 16946eb79..f53ac6680 100644 --- a/src/proto/channel.pro +++ b/src/proto/channel.pro @@ -4,23 +4,22 @@ int channel_open(char *hostname, int port_in, void (*close_cb)(void)); void channel_set_json_mode(int idx, int json_mode); void channel_set_callback(int idx, char_u *callback); void channel_set_req_callback(int idx, char_u *callback); +char_u *channel_get(int idx); +int channel_collapse(int idx); int channel_is_open(int idx); void channel_close(int idx); int channel_save(int idx, char_u *buf, int len); char_u *channel_peek(int idx); -char_u *channel_get(int idx); -int channel_collapse(int idx); void channel_clear(int idx); int channel_get_id(void); void channel_read(int idx); char_u *channel_read_block(int idx); int channel_read_json_block(int ch_idx, int id, typval_T **rettv); -void channel_read_json(int ch_idx); int channel_socket2idx(sock_T fd); int channel_send(int idx, char_u *buf, char *fun); int channel_poll_setup(int nfd_in, void *fds_in); int channel_poll_check(int ret_in, void *fds_in); int channel_select_setup(int maxfd_in, void *rfds_in); int channel_select_check(int ret_in, void *rfds_in); -void channel_parse_messages(void); +int channel_parse_messages(void); /* vim: set ft=c : */ diff --git a/src/testdir/Make_all.mak b/src/testdir/Make_all.mak index d7a06eeb4..bb92904bf 100644 --- a/src/testdir/Make_all.mak +++ b/src/testdir/Make_all.mak @@ -171,6 +171,7 @@ SCRIPTS_GUI = test16.out NEW_TESTS = test_arglist.res \ test_assert.res \ test_cdo.res \ + test_channel.res \ test_hardcopy.res \ test_increment.res \ test_langmap.res \ diff --git a/src/testdir/test_channel.py b/src/testdir/test_channel.py new file mode 100644 index 000000000..a706243e4 --- /dev/null +++ b/src/testdir/test_channel.py @@ -0,0 +1,110 @@ +#!/usr/bin/python +# +# Server that will accept connections from a Vim channel. +# Run this server and then in Vim you can open the channel: +# :let handle = ch_open('localhost:8765', 'json') +# +# Then Vim can send requests to the server: +# :let response = ch_sendexpr(handle, 'hello!') +# +# And you can control Vim by typing a JSON message here, e.g.: +# ["ex","echo 'hi there'"] +# +# There is no prompt, just type a line and press Enter. +# To exit cleanly type "quit<Enter>". +# +# See ":help channel-demo" in Vim. +# +# This requires Python 2.6 or later. + +from __future__ import print_function +import json +import socket +import sys +import threading + +try: + # Python 3 + import socketserver +except ImportError: + # Python 2 + import SocketServer as socketserver + +thesocket = None + +class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler): + + def handle(self): + print("=== socket opened ===") + global thesocket + thesocket = self.request + while True: + try: + data = self.request.recv(4096).decode('utf-8') + except socket.error: + print("=== socket error ===") + break + except IOError: + print("=== socket closed ===") + break + if data == '': + print("=== socket closed ===") + break + print("received: {}".format(data)) + try: + decoded = json.loads(data) + except ValueError: + print("json decoding failed") + decoded = [-1, ''] + + # Send a response if the sequence number is positive. + # Negative numbers are used for "eval" responses. + if decoded[0] >= 0: + if decoded[1] == 'hello!': + # simply send back a string + response = "got it" + elif decoded[1] == 'make change': + # Send two ex commands at the same time, before replying to + # the request. + cmd = '["ex","call append(\\"$\\",\\"added1\\")"]' + cmd += '["ex","call append(\\"$\\",\\"added2\\")"]' + print("sending: {}".format(cmd)) + thesocket.sendall(cmd.encode('utf-8')) + response = "ok" + elif decoded[1] == '!quit!': + # we're done + sys.exit(0) + else: + response = "what?" + + encoded = json.dumps([decoded[0], response]) + print("sending: {}".format(encoded)) + thesocket.sendall(encoded.encode('utf-8')) + + thesocket = None + +class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): + pass + +if __name__ == "__main__": + HOST, PORT = "localhost", 0 + + server = ThreadedTCPServer((HOST, PORT), ThreadedTCPRequestHandler) + ip, port = server.server_address + + # Start a thread with the server -- that thread will then start one + # more thread for each request + server_thread = threading.Thread(target=server.serve_forever) + + # Exit the server thread when the main thread terminates + server_thread.daemon = True + server_thread.start() + + # Write the port number in Xportnr, so that the test knows it. + f = open("Xportnr", "w") + f.write("{}".format(port)) + f.close() + + # Block here + print("Listening on port {}".format(port)) + server.serve_forever() diff --git a/src/testdir/test_channel.vim b/src/testdir/test_channel.vim new file mode 100644 index 000000000..140691c39 --- /dev/null +++ b/src/testdir/test_channel.vim @@ -0,0 +1,53 @@ +" Test for channel functions. +scriptencoding utf-8 + +" This requires the Python command to run the test server. +" This most likely only works on Unix. +if !has('unix') || !executable('python') + finish +endif + +func Test_communicate() + " The Python program writes the port number in Xportnr. + silent !./test_channel.py& + + " Wait for up to 2 seconds for the port number to be there. + let cnt = 20 + let l = [] + while cnt > 0 + try + let l = readfile("Xportnr") + catch + endtry + if len(l) >= 1 + break + endif + sleep 100m + let cnt -= 1 + endwhile + call delete("Xportnr") + + if len(l) == 0 + " Can't make the connection, give up. + call system("killall test_channel.py") + return + endif + let port = l[0] + let handle = ch_open('localhost:' . port, 'json') + + " Simple string request and reply. + call assert_equal('got it', ch_sendexpr(handle, 'hello!')) + + " Request that triggers sending two ex commands. These will usually be + " handled before getting the response, but it's not guaranteed, thus wait a + " tiny bit for the commands to get executed. + call assert_equal('ok', ch_sendexpr(handle, 'make change')) + sleep 10m + call assert_equal('added1', getline(line('$') - 1)) + call assert_equal('added2', getline('$')) + + " make the server quit, can't check if this works, should not hang. + call ch_sendexpr(handle, '!quit!', 0) + + call system("killall test_channel.py") +endfunc diff --git a/src/version.c b/src/version.c index b93a59dcf..6aa4baa93 100644 --- a/src/version.c +++ b/src/version.c @@ -743,6 +743,8 @@ static char *(features[]) = static int included_patches[] = { /* Add new patch number below this line */ /**/ + 1246, +/**/ 1245, /**/ 1244, |