summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/channel.c293
-rw-r--r--src/misc2.c3
-rw-r--r--src/os_unix.c12
-rw-r--r--src/proto/channel.pro6
-rw-r--r--src/structs.h7
-rw-r--r--src/testdir/test_channel.vim8
-rw-r--r--src/version.c2
-rw-r--r--src/vim.h2
8 files changed, 288 insertions, 45 deletions
diff --git a/src/channel.c b/src/channel.c
index c9c0b7c95..ba9f9c736 100644
--- a/src/channel.c
+++ b/src/channel.c
@@ -973,6 +973,7 @@ channel_set_job(channel_T *channel, job_T *job, jobopt_T *options)
/* Special mode: send last-but-one line when appending a line
* to the buffer. */
in_part->ch_buffer->b_write_to_channel = TRUE;
+ in_part->ch_buf_append = TRUE;
in_part->ch_buf_top =
in_part->ch_buffer->b_ml.ml_line_count + 1;
}
@@ -1047,6 +1048,8 @@ channel_set_options(channel_T *channel, jobopt_T *opt)
channel->ch_part[PART_OUT].ch_timeout = opt->jo_out_timeout;
if (opt->jo_set & JO_ERR_TIMEOUT)
channel->ch_part[PART_ERR].ch_timeout = opt->jo_err_timeout;
+ if (opt->jo_set & JO_BLOCK_WRITE)
+ channel->ch_part[PART_IN].ch_block_write = 1;
if (opt->jo_set & JO_CALLBACK)
{
@@ -1193,9 +1196,78 @@ write_buf_line(buf_T *buf, linenr_T lnum, channel_T *channel)
}
/*
+ * Return TRUE if "channel" can be written to.
+ * Returns FALSE if the input is closed or the write would block.
+ */
+ static int
+can_write_buf_line(channel_T *channel)
+{
+ chanpart_T *in_part = &channel->ch_part[PART_IN];
+
+ if (in_part->ch_fd == INVALID_FD)
+ return FALSE; /* pipe was closed */
+
+ /* for testing: block every other attempt to write */
+ if (in_part->ch_block_write == 1)
+ in_part->ch_block_write = -1;
+ else if (in_part->ch_block_write == -1)
+ in_part->ch_block_write = 1;
+
+ /* TODO: Win32 implementation, probably using WaitForMultipleObjects() */
+#ifndef WIN32
+ {
+# if defined(HAVE_SELECT)
+ struct timeval tval;
+ fd_set wfds;
+ int ret;
+
+ FD_ZERO(&wfds);
+ FD_SET((int)in_part->ch_fd, &wfds);
+ tval.tv_sec = 0;
+ tval.tv_usec = 0;
+ for (;;)
+ {
+ ret = select((int)in_part->ch_fd + 1, NULL, &wfds, NULL, &tval);
+# ifdef EINTR
+ SOCK_ERRNO;
+ if (ret == -1 && errno == EINTR)
+ continue;
+# endif
+ if (ret <= 0 || in_part->ch_block_write == 1)
+ {
+ if (ret > 0)
+ ch_log(channel, "FAKED Input not ready for writing");
+ else
+ ch_log(channel, "Input not ready for writing");
+ return FALSE;
+ }
+ break;
+ }
+# else
+ struct pollfd fds;
+
+ fds.fd = in_part->ch_fd;
+ fds.events = POLLOUT;
+ if (poll(&fds, 1, 0) <= 0)
+ {
+ ch_log(channel, "Input not ready for writing");
+ return FALSE;
+ }
+ if (in_part->ch_block_write == 1)
+ {
+ ch_log(channel, "FAKED Input not ready for writing");
+ return FALSE;
+ }
+# endif
+ }
+#endif
+ return TRUE;
+}
+
+/*
* Write any lines to the input channel.
*/
- void
+ static void
channel_write_in(channel_T *channel)
{
chanpart_T *in_part = &channel->ch_part[PART_IN];
@@ -1203,8 +1275,8 @@ channel_write_in(channel_T *channel)
buf_T *buf = in_part->ch_buffer;
int written = 0;
- if (buf == NULL)
- return;
+ if (buf == NULL || in_part->ch_buf_append)
+ return; /* no buffer or using appending */
if (!buf_valid(buf) || buf->b_ml.ml_mfp == NULL)
{
/* buffer was wiped out or unloaded */
@@ -1215,10 +1287,8 @@ channel_write_in(channel_T *channel)
for (lnum = in_part->ch_buf_top; lnum <= in_part->ch_buf_bot
&& lnum <= buf->b_ml.ml_line_count; ++lnum)
{
- if (in_part->ch_fd == INVALID_FD)
- /* pipe was closed */
- return;
- /* TODO: check if channel can be written to, do not block on write */
+ if (!can_write_buf_line(channel))
+ break;
write_buf_line(buf, lnum, channel);
++written;
}
@@ -1229,6 +1299,37 @@ channel_write_in(channel_T *channel)
ch_logn(channel, "written %d lines to channel", written);
in_part->ch_buf_top = lnum;
+ if (lnum > buf->b_ml.ml_line_count)
+ {
+ /* Writing is done, no longer need the buffer. */
+ in_part->ch_buffer = NULL;
+ ch_log(channel, "Finished writing all lines to channel");
+ }
+ else
+ ch_logn(channel, "Still %d more lines to write",
+ buf->b_ml.ml_line_count - lnum + 1);
+}
+
+/*
+ * Write any lines waiting to be written to a channel.
+ */
+ void
+channel_write_any_lines()
+{
+ channel_T *channel;
+
+ for (channel = first_channel; channel != NULL; channel = channel->ch_next)
+ {
+ chanpart_T *in_part = &channel->ch_part[PART_IN];
+
+ if (in_part->ch_buffer != NULL)
+ {
+ if (in_part->ch_buf_append)
+ channel_write_new_lines(in_part->ch_buffer);
+ else
+ channel_write_in(channel);
+ }
+ }
}
/*
@@ -1248,15 +1349,16 @@ channel_write_new_lines(buf_T *buf)
linenr_T lnum;
int written = 0;
- if (in_part->ch_buffer == buf)
+ if (in_part->ch_buffer == buf && in_part->ch_buf_append)
{
if (in_part->ch_fd == INVALID_FD)
- /* pipe was closed */
- continue;
+ continue; /* pipe was closed */
found_one = TRUE;
for (lnum = in_part->ch_buf_bot; lnum < buf->b_ml.ml_line_count;
++lnum)
{
+ if (!can_write_buf_line(channel))
+ break;
write_buf_line(buf, lnum, channel);
++written;
}
@@ -1265,6 +1367,9 @@ channel_write_new_lines(buf_T *buf)
ch_logn(channel, "written line %d to channel", (int)lnum - 1);
else if (written > 1)
ch_logn(channel, "written %d lines to channel", written);
+ if (lnum < buf->b_ml.ml_line_count)
+ ch_logn(channel, "Still %d more lines to write",
+ buf->b_ml.ml_line_count - lnum);
in_part->ch_buf_bot = lnum;
}
@@ -2379,6 +2484,57 @@ channel_free_all(void)
/* Buffer size for reading incoming messages. */
#define MAXMSGSIZE 4096
+#if defined(HAVE_SELECT)
+/*
+ * Add write fds where we are waiting for writing to be possible.
+ */
+ static int
+channel_fill_wfds(int maxfd_arg, fd_set *wfds)
+{
+ int maxfd = maxfd_arg;
+ channel_T *ch;
+
+ for (ch = first_channel; ch != NULL; ch = ch->ch_next)
+ {
+ chanpart_T *in_part = &ch->ch_part[PART_IN];
+
+ if (in_part->ch_fd != INVALID_FD && in_part->ch_buffer != NULL)
+ {
+ FD_SET((int)in_part->ch_fd, wfds);
+ if ((int)in_part->ch_fd >= maxfd)
+ maxfd = (int)in_part->ch_fd + 1;
+ }
+ }
+ return maxfd;
+}
+#else
+/*
+ * Add write fds where we are waiting for writing to be possible.
+ */
+ static int
+channel_fill_poll_write(int nfd_in, struct pollfd *fds)
+{
+ int nfd = nfd_in;
+ channel_T *ch;
+
+ for (ch = first_channel; ch != NULL; ch = ch->ch_next)
+ {
+ chanpart_T *in_part = &ch->ch_part[PART_IN];
+
+ if (in_part->ch_fd != INVALID_FD && in_part->ch_buffer != NULL)
+ {
+ in_part->ch_poll_idx = nfd;
+ fds[nfd].fd = in_part->ch_fd;
+ fds[nfd].events = POLLOUT;
+ ++nfd;
+ }
+ else
+ in_part->ch_poll_idx = -1;
+ }
+ return nfd;
+}
+#endif
+
/*
* Check for reading from "fd" with "timeout" msec.
* Return FAIL when there is nothing to read.
@@ -2403,6 +2559,10 @@ channel_wait(channel_T *channel, sock_T fd, int timeout)
if (PeekNamedPipe((HANDLE)fd, NULL, 0, NULL, &nread, NULL)
&& nread > 0)
return OK;
+
+ /* perhaps write some buffer lines */
+ channel_write_any_lines();
+
sleep_time = deadline - GetTickCount();
if (sleep_time <= 0)
break;
@@ -2422,31 +2582,56 @@ channel_wait(channel_T *channel, sock_T fd, int timeout)
#if defined(HAVE_SELECT)
struct timeval tval;
fd_set rfds;
- int ret;
+ fd_set wfds;
+ int ret;
+ int maxfd;
- FD_ZERO(&rfds);
- FD_SET((int)fd, &rfds);
tval.tv_sec = timeout / 1000;
tval.tv_usec = (timeout % 1000) * 1000;
for (;;)
{
- ret = select((int)fd + 1, &rfds, NULL, NULL, &tval);
+ FD_ZERO(&rfds);
+ FD_SET((int)fd, &rfds);
+
+ /* Write lines to a pipe when a pipe can be written to. Need to
+ * set this every time, some buffers may be done. */
+ maxfd = (int)fd + 1;
+ FD_ZERO(&wfds);
+ maxfd = channel_fill_wfds(maxfd, &wfds);
+
+ ret = select(maxfd, &rfds, &wfds, NULL, &tval);
# ifdef EINTR
SOCK_ERRNO;
if (ret == -1 && errno == EINTR)
continue;
# endif
if (ret > 0)
- return OK;
+ {
+ if (FD_ISSET(fd, &rfds))
+ return OK;
+ channel_write_any_lines();
+ continue;
+ }
break;
}
#else
- struct pollfd fds;
+ for (;;)
+ {
+ struct pollfd fds[MAX_OPEN_CHANNELS + 1];
+ int nfd = 1;
- fds.fd = fd;
- fds.events = POLLIN;
- if (poll(&fds, 1, timeout) > 0)
- return OK;
+ fds[0].fd = fd;
+ fds[0].events = POLLIN;
+ nfd = channel_fill_poll_write(nfd, fds);
+ if (poll(fds, nfd, timeout) > 0)
+ {
+ if (fds[0].revents & POLLIN)
+ return OK;
+ channel_write_any_lines();
+ continue;
+ }
+ break;
+ }
#endif
}
return FAIL;
@@ -3010,10 +3195,12 @@ channel_poll_setup(int nfd_in, void *fds_in)
{
for (part = PART_SOCK; part < PART_IN; ++part)
{
- if (channel->ch_part[part].ch_fd != INVALID_FD)
+ chanpart_T *ch_part = &channel->ch_part[part];
+
+ if (ch_part->ch_fd != INVALID_FD)
{
- channel->ch_part[part].ch_poll_idx = nfd;
- fds[nfd].fd = channel->ch_part[part].ch_fd;
+ ch_part->ch_poll_idx = nfd;
+ fds[nfd].fd = ch_part->ch_fd;
fds[nfd].events = POLLIN;
nfd++;
}
@@ -3022,6 +3209,8 @@ channel_poll_setup(int nfd_in, void *fds_in)
}
}
+ nfd = channel_fill_poll_write(nfd, fds);
+
return nfd;
}
@@ -3035,19 +3224,35 @@ channel_poll_check(int ret_in, void *fds_in)
channel_T *channel;
struct pollfd *fds = fds_in;
int part;
+ int idx;
+ chanpart_T *in_part;
for (channel = first_channel; channel != NULL; channel = channel->ch_next)
{
for (part = PART_SOCK; part < PART_IN; ++part)
{
- int idx = channel->ch_part[part].ch_poll_idx;
+ idx = channel->ch_part[part].ch_poll_idx;
- if (ret > 0 && idx != -1 && fds[idx].revents & POLLIN)
+ if (ret > 0 && idx != -1 && (fds[idx].revents & POLLIN))
{
channel_read(channel, part, "channel_poll_check");
--ret;
}
}
+
+ in_part = &channel->ch_part[PART_IN];
+ idx = in_part->ch_poll_idx;
+ if (ret > 0 && idx != -1 && (fds[idx].revents & POLLOUT))
+ {
+ if (in_part->ch_buf_append)
+ {
+ if (in_part->ch_buffer != NULL)
+ channel_write_new_lines(in_part->ch_buffer);
+ }
+ else
+ channel_write_in(channel);
+ --ret;
+ }
}
return ret;
@@ -3056,14 +3261,15 @@ channel_poll_check(int ret_in, void *fds_in)
# if (!defined(WIN32) && defined(HAVE_SELECT)) || defined(PROTO)
/*
- * The type of "rfds" is hidden to avoid problems with the function proto.
+ * The "fd_set" type is hidden to avoid problems with the function proto.
*/
int
-channel_select_setup(int maxfd_in, void *rfds_in)
+channel_select_setup(int maxfd_in, void *rfds_in, void *wfds_in)
{
int maxfd = maxfd_in;
channel_T *channel;
fd_set *rfds = rfds_in;
+ fd_set *wfds = wfds_in;
int part;
for (channel = first_channel; channel != NULL; channel = channel->ch_next)
@@ -3081,19 +3287,23 @@ channel_select_setup(int maxfd_in, void *rfds_in)
}
}
+ maxfd = channel_fill_wfds(maxfd, wfds);
+
return maxfd;
}
/*
- * The type of "rfds" is hidden to avoid problems with the function proto.
+ * The "fd_set" type is hidden to avoid problems with the function proto.
*/
int
-channel_select_check(int ret_in, void *rfds_in)
+channel_select_check(int ret_in, void *rfds_in, void *wfds_in)
{
int ret = ret_in;
channel_T *channel;
fd_set *rfds = rfds_in;
+ fd_set *wfds = wfds_in;
int part;
+ chanpart_T *in_part;
for (channel = first_channel; channel != NULL; channel = channel->ch_next)
{
@@ -3107,6 +3317,20 @@ channel_select_check(int ret_in, void *rfds_in)
--ret;
}
}
+
+ in_part = &channel->ch_part[PART_IN];
+ if (ret > 0 && in_part->ch_fd != INVALID_FD
+ && FD_ISSET(in_part->ch_fd, wfds))
+ {
+ if (in_part->ch_buf_append)
+ {
+ if (in_part->ch_buffer != NULL)
+ channel_write_new_lines(in_part->ch_buffer);
+ }
+ else
+ channel_write_in(channel);
+ --ret;
+ }
}
return ret;
@@ -3608,6 +3832,13 @@ get_job_options(typval_T *tv, jobopt_T *opt, int supported)
return FAIL;
}
}
+ else if (STRCMP(hi->hi_key, "block_write") == 0)
+ {
+ if (!(supported & JO_BLOCK_WRITE))
+ break;
+ opt->jo_set |= JO_BLOCK_WRITE;
+ opt->jo_block_write = get_tv_number(item);
+ }
else
break;
--todo;
@@ -3827,8 +4058,8 @@ job_start(typval_T *argvars)
clear_job_options(&opt);
opt.jo_mode = MODE_NL;
if (get_job_options(&argvars[1], &opt,
- JO_MODE_ALL + JO_CB_ALL + JO_TIMEOUT_ALL
- + JO_STOPONEXIT + JO_EXIT_CB + JO_OUT_IO) == FAIL)
+ JO_MODE_ALL + JO_CB_ALL + JO_TIMEOUT_ALL + JO_STOPONEXIT
+ + JO_EXIT_CB + JO_OUT_IO + JO_BLOCK_WRITE) == FAIL)
return job;
/* Check that when io is "file" that there is a file name. */
diff --git a/src/misc2.c b/src/misc2.c
index ca340b71e..a0cce07f6 100644
--- a/src/misc2.c
+++ b/src/misc2.c
@@ -6230,6 +6230,9 @@ parse_queued_messages(void)
netbeans_parse_messages();
# endif
# ifdef FEAT_JOB_CHANNEL
+ /* Write any buffer lines still to be written. */
+ channel_write_any_lines();
+
/* Process the messages queued on channels. */
channel_parse_messages();
# endif
diff --git a/src/os_unix.c b/src/os_unix.c
index 74ffe9300..dc8e00952 100644
--- a/src/os_unix.c
+++ b/src/os_unix.c
@@ -5539,7 +5539,8 @@ RealWaitForChar(int fd, long msec, int *check_for_gpm UNUSED, int *break_loop)
# endif
#endif
#ifndef HAVE_SELECT
- struct pollfd fds[6 + MAX_OPEN_CHANNELS];
+ /* each channel may use in, out and err */
+ struct pollfd fds[6 + 3 * MAX_OPEN_CHANNELS];
int nfd;
# ifdef FEAT_XCLIPBOARD
int xterm_idx = -1;
@@ -5652,7 +5653,7 @@ RealWaitForChar(int fd, long msec, int *check_for_gpm UNUSED, int *break_loop)
struct timeval tv;
struct timeval *tvp;
- fd_set rfds, efds;
+ fd_set rfds, wfds, efds;
int maxfd;
long towait = msec;
@@ -5685,6 +5686,7 @@ RealWaitForChar(int fd, long msec, int *check_for_gpm UNUSED, int *break_loop)
*/
select_eintr:
FD_ZERO(&rfds);
+ FD_ZERO(&wfds);
FD_ZERO(&efds);
FD_SET(fd, &rfds);
# if !defined(__QNX__) && !defined(__CYGWIN32__)
@@ -5725,10 +5727,10 @@ select_eintr:
}
# endif
# ifdef FEAT_JOB_CHANNEL
- maxfd = channel_select_setup(maxfd, &rfds);
+ maxfd = channel_select_setup(maxfd, &rfds, &wfds);
# endif
- ret = select(maxfd + 1, &rfds, NULL, &efds, tvp);
+ ret = select(maxfd + 1, &rfds, &wfds, &efds, tvp);
result = ret > 0 && FD_ISSET(fd, &rfds);
if (result)
--ret;
@@ -5810,7 +5812,7 @@ select_eintr:
# endif
#ifdef FEAT_JOB_CHANNEL
if (ret > 0)
- ret = channel_select_check(ret, &rfds);
+ ret = channel_select_check(ret, &rfds, &wfds);
#endif
#endif /* HAVE_SELECT */
diff --git a/src/proto/channel.pro b/src/proto/channel.pro
index e33490875..b796d82bf 100644
--- a/src/proto/channel.pro
+++ b/src/proto/channel.pro
@@ -13,7 +13,7 @@ void channel_set_pipes(channel_T *channel, sock_T in, sock_T out, sock_T err);
void channel_set_job(channel_T *channel, job_T *job, jobopt_T *options);
void channel_set_options(channel_T *channel, jobopt_T *opt);
void channel_set_req_callback(channel_T *channel, int part, char_u *callback, partial_T *partial, int id);
-void channel_write_in(channel_T *channel);
+void channel_write_any_lines(void);
void channel_write_new_lines(buf_T *buf);
char_u *channel_get(channel_T *channel, int part);
int channel_collapse(channel_T *channel, int part);
@@ -37,8 +37,8 @@ void ch_expr_common(typval_T *argvars, typval_T *rettv, int eval);
void ch_raw_common(typval_T *argvars, typval_T *rettv, int eval);
int channel_poll_setup(int nfd_in, void *fds_in);
int channel_poll_check(int ret_in, void *fds_in);
-int channel_select_setup(int maxfd_in, void *rfds_in);
-int channel_select_check(int ret_in, void *rfds_in);
+int channel_select_setup(int maxfd_in, void *rfds_in, void *wfds_in);
+int channel_select_check(int ret_in, void *rfds_in, void *wfds_in);
int channel_parse_messages(void);
int set_ref_in_channel(int copyID);
int channel_part_send(channel_T *channel);
diff --git a/src/structs.h b/src/structs.h
index 68b791789..e753860b8 100644
--- a/src/structs.h
+++ b/src/structs.h
@@ -1383,12 +1383,15 @@ typedef struct {
#else
struct timeval ch_deadline;
#endif
+ int ch_block_write; /* for testing: 0 when not used, -1 when write
+ * does not block, 1 simulate blocking */
cbq_T ch_cb_head; /* dummy node for per-request callbacks */
char_u *ch_callback; /* call when a msg is not handled */
partial_T *ch_partial;
buf_T *ch_buffer; /* buffer to read from or write to */
+ int ch_buf_append; /* write appended lines instead top-bot */
linenr_T ch_buf_top; /* next line to send */
linenr_T ch_buf_bot; /* last line to send */
} chanpart_T;
@@ -1457,7 +1460,8 @@ struct channel_S {
#define JO_ERR_BUF 0x2000000 /* "err_buf" (JO_OUT_BUF << 1) */
#define JO_IN_BUF 0x4000000 /* "in_buf" (JO_OUT_BUF << 2) */
#define JO_CHANNEL 0x8000000 /* "channel" */
-#define JO_ALL 0xfffffff
+#define JO_BLOCK_WRITE 0x10000000 /* "block_write" */
+#define JO_ALL 0x7fffffff
#define JO_MODE_ALL (JO_MODE + JO_IN_MODE + JO_OUT_MODE + JO_ERR_MODE)
#define JO_CB_ALL \
@@ -1499,6 +1503,7 @@ typedef struct
int jo_timeout;
int jo_out_timeout;
int jo_err_timeout;
+ int jo_block_write; /* for testing only */
int jo_part;
int jo_id;
char_u jo_soe_buf[NUMBUFLEN];
diff --git a/src/testdir/test_channel.vim b/src/testdir/test_channel.vim
index 8356ba7a5..5e34ad2ca 100644
--- a/src/testdir/test_channel.vim
+++ b/src/testdir/test_channel.vim
@@ -791,7 +791,7 @@ func Run_test_pipe_from_buffer(use_name)
sp pipe-input
call setline(1, ['echo one', 'echo two', 'echo three'])
- let options = {'in_io': 'buffer'}
+ let options = {'in_io': 'buffer', 'block_write': 1}
if a:use_name
let options['in_name'] = 'pipe-input'
else
@@ -885,7 +885,8 @@ func Test_pipe_io_two_buffers()
let job = job_start(s:python . " test_channel_pipe.py",
\ {'in_io': 'buffer', 'in_name': 'pipe-input', 'in_top': 0,
- \ 'out_io': 'buffer', 'out_name': 'pipe-output'})
+ \ 'out_io': 'buffer', 'out_name': 'pipe-output',
+ \ 'block_write': 1})
call assert_equal("run", job_status(job))
try
exe "normal Gaecho hello\<CR>"
@@ -920,7 +921,8 @@ func Test_pipe_io_one_buffer()
let job = job_start(s:python . " test_channel_pipe.py",
\ {'in_io': 'buffer', 'in_name': 'pipe-io', 'in_top': 0,
- \ 'out_io': 'buffer', 'out_name': 'pipe-io'})
+ \ 'out_io': 'buffer', 'out_name': 'pipe-io',
+ \ 'block_write': 1})
call assert_equal("run", job_status(job))
try
exe "normal Goecho hello\<CR>"
diff --git a/src/version.c b/src/version.c
index 9ba137583..826ee1deb 100644
--- a/src/version.c
+++ b/src/version.c
@@ -749,6 +749,8 @@ static char *(features[]) =
static int included_patches[] =
{ /* Add new patch number below this line */
/**/
+ 1669,
+/**/
1668,
/**/
1667,
diff --git a/src/vim.h b/src/vim.h
index a2dc07961..36cc19a27 100644
--- a/src/vim.h
+++ b/src/vim.h
@@ -493,13 +493,11 @@ typedef unsigned long u8char_T; /* long should be 32 bits or more */
#ifndef HAVE_SELECT
# ifdef HAVE_SYS_POLL_H
# include <sys/poll.h>
-# define HAVE_POLL
# elif defined(WIN32)
# define HAVE_SELECT
# else
# ifdef HAVE_POLL_H
# include <poll.h>
-# define HAVE_POLL
# endif
# endif
#endif