summaryrefslogtreecommitdiff
path: root/src/channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/channel.c')
-rw-r--r--src/channel.c643
1 files changed, 616 insertions, 27 deletions
diff --git a/src/channel.c b/src/channel.c
index a108d7e7c..bb7db7e49 100644
--- a/src/channel.c
+++ b/src/channel.c
@@ -14,17 +14,106 @@
#if defined(FEAT_CHANNEL) || defined(PROTO)
+/*
+ * Change the zero to 1 to enable debugging.
+ * This will write a file "channel_debug.log".
+ */
+#if 0
+# define CHERROR(fmt, arg) cherror(fmt, arg)
+# define CHLOG(idx, send, buf) chlog(idx, send, buf)
+# define CHFILE "channel_debug.log"
+
+static void cherror(char *fmt, char *arg);
+static void chlog(int send, char_u *buf);
+#else
+# define CHERROR(fmt, arg)
+# define CHLOG(idx, send, buf)
+#endif
+
+/* TRUE when netbeans is running with a GUI. */
+#ifdef FEAT_GUI
+# define CH_HAS_GUI (gui.in_use || gui.starting)
+#endif
+
+/* Note: when making changes here also adjust configure.in. */
+#ifdef WIN32
+/* WinSock API is separated from C API, thus we can't use read(), write(),
+ * errno... */
+# define SOCK_ERRNO errno = WSAGetLastError()
+# undef ECONNREFUSED
+# define ECONNREFUSED WSAECONNREFUSED
+# ifdef EINTR
+# undef EINTR
+# endif
+# define EINTR WSAEINTR
+# define sock_write(sd, buf, len) send(sd, buf, len, 0)
+# define sock_read(sd, buf, len) recv(sd, buf, len, 0)
+# define sock_close(sd) closesocket(sd)
+# define sleep(t) Sleep(t*1000) /* WinAPI Sleep() accepts milliseconds */
+#else
+# include <netdb.h>
+# include <netinet/in.h>
+
+# include <sys/socket.h>
+# ifdef HAVE_LIBGEN_H
+# include <libgen.h>
+# endif
+# define SOCK_ERRNO
+# define sock_write(sd, buf, len) write(sd, buf, len)
+# define sock_read(sd, buf, len) read(sd, buf, len)
+# define sock_close(sd) close(sd)
+#endif
+
+#ifdef FEAT_GUI_W32
+extern HWND s_hwnd; /* Gvim's Window handle */
+#endif
+
+struct readqueue
+{
+ char_u *buffer;
+ struct readqueue *next;
+ struct readqueue *prev;
+};
+typedef struct readqueue queue_T;
+
typedef struct {
- sock_T ch_fd;
- int ch_idx;
+ sock_T ch_fd; /* the socket, -1 for a closed channel */
+ int ch_idx; /* used by channel_poll_setup() */
+ queue_T ch_head; /* dummy node, header for circular queue */
+
+ int ch_error; /* When TRUE an error was reported. Avoids giving
+ * pages full of error messages when the other side
+ * has exited, only mention the first error until the
+ * connection works again. */
+#ifdef FEAT_GUI_X11
+ XtInputId ch_inputHandler; /* Cookie for input */
+#endif
+#ifdef FEAT_GUI_GTK
+ gint ch_inputHandler; /* Cookie for input */
+#endif
+#ifdef FEAT_GUI_W32
+ int ch_inputHandler = -1; /* simply ret.value of WSAAsyncSelect() */
+#endif
+
+ void (*ch_close_cb)(void); /* callback invoked when channel is closed */
} channel_T;
+/*
+ * Information about all channels.
+ * There can be gaps for closed channels, they will be reused later.
+ */
static channel_T *channels = NULL;
static int channel_count = 0;
/*
+ * TODO: open debug file when desired.
+ */
+FILE *debugfd = NULL;
+
+/*
* Add a new channel slot, return the index.
- * Returns -1 if out of space.
+ * The channel isn't actually used into ch_fd is set >= 0;
+ * Returns -1 if all channels are in use.
*/
static int
add_channel(void)
@@ -39,59 +128,559 @@ add_channel(void)
return idx;
if (channel_count == MAX_OPEN_CHANNELS)
return -1;
- new_channels = (channel_T *)alloc(sizeof(channel_T) * channel_count + 1);
+ new_channels = (channel_T *)alloc(sizeof(channel_T) * (channel_count + 1));
if (new_channels == NULL)
return -1;
if (channels != NULL)
mch_memmove(new_channels, channels, sizeof(channel_T) * channel_count);
channels = new_channels;
+ (void)vim_memset(&channels[channel_count], 0, sizeof(channel_T));
+
channels[channel_count].ch_fd = (sock_T)-1;
+#ifdef FEAT_GUI_X11
+ channels[channel_count].ch_inputHandler = (XtInputId)NULL;
+#endif
+#ifdef FEAT_GUI_GTK
+ channels[channel_count].ch_inputHandler = 0;
+#endif
+#ifdef FEAT_GUI_W32
+ channels[channel_count].ch_inputHandler = -1;
+#endif
return channel_count++;
}
-#if defined(FEAT_NETBEANS_INTG) || defined(PROTO)
-static int netbeans_channel = -1;
+#if defined(FEAT_GUI) || defined(PROTO)
+/*
+ * Read a command from netbeans.
+ */
+#ifdef FEAT_GUI_X11
+ static void
+messageFromNetbeans(XtPointer clientData,
+ int *unused1 UNUSED,
+ XtInputId *unused2 UNUSED)
+{
+ channel_read((int)(long)clientData);
+}
+#endif
+
+#ifdef FEAT_GUI_GTK
+ static void
+messageFromNetbeans(gpointer clientData,
+ gint unused1 UNUSED,
+ GdkInputCondition unused2 UNUSED)
+{
+ channel_read((int)(long)clientData);
+}
+#endif
+
+ static void
+channel_gui_register(int idx)
+{
+ channel_T *channel = &channels[idx];
+
+ if (!CH_HAS_GUI)
+ return;
+
+# ifdef FEAT_GUI_X11
+ /* tell notifier we are interested in being called
+ * when there is input on the editor connection socket
+ */
+ if (channel->ch_inputHandler == (XtInputId)NULL)
+ channel->ch_inputHandler =
+ XtAppAddInput((XtAppContext)app_context, channel->ch_fd,
+ (XtPointer)(XtInputReadMask + XtInputExceptMask),
+ messageFromNetbeans, (XtPointer)idx);
+# else
+# ifdef FEAT_GUI_GTK
+ /*
+ * Tell gdk we are interested in being called when there
+ * is input on the editor connection socket
+ */
+ if (channel->ch_inputHandler == 0)
+ channel->ch_inputHandler =
+ gdk_input_add((gint)channel->ch_fd, (GdkInputCondition)
+ ((int)GDK_INPUT_READ + (int)GDK_INPUT_EXCEPTION),
+ messageFromNetbeans, (gpointer)(long)idx);
+# else
+# ifdef FEAT_GUI_W32
+ /*
+ * Tell Windows we are interested in receiving message when there
+ * is input on the editor connection socket.
+ * TODO: change WM_NETBEANS to something related to the channel index.
+ */
+ if (channel->ch_inputHandler == -1)
+ channel->ch_inputHandler =
+ WSAAsyncSelect(channel->ch_fd, s_hwnd, WM_NETBEANS, FD_READ);
+# endif
+# endif
+# endif
+}
/*
- * Add the netbeans socket to the channels.
- * Return the channel index.
+ * Register any of our file descriptors with the GUI event handling system.
+ * Called when the GUI has started.
+ */
+ void
+channel_gui_register_all(void)
+{
+ int i;
+
+ for (i = 0; i < channel_count; ++i)
+ if (channels[i].ch_fd >= 0)
+ channel_gui_register(i);
+}
+
+ static void
+channel_gui_unregister(int idx)
+{
+ channel_T *channel = &channels[idx];
+
+# ifdef FEAT_GUI_X11
+ if (channel->ch_inputHandler != (XtInputId)NULL)
+ {
+ XtRemoveInput(channel->ch_inputHandler);
+ channel->ch_inputHandler = (XtInputId)NULL;
+ }
+# else
+# ifdef FEAT_GUI_GTK
+ if (channel->ch_inputHandler != 0)
+ {
+ gdk_input_remove(channel->ch_inputHandler);
+ channel->ch_inputHandler = 0;
+ }
+# else
+# ifdef FEAT_GUI_W32
+ if (channel->ch_inputHandler == 0)
+ {
+ WSAAsyncSelect(nbsock, s_hwnd, 0, 0);
+ channel->ch_inputHandler = -1;
+ }
+# endif
+# endif
+# endif
+}
+
+#endif
+
+/*
+ * Open a channel to "hostname":"port".
+ * Returns the channel number for success.
+ * Returns a negative number for failure.
*/
int
-channel_add_netbeans(sock_T fd)
+channel_open(char *hostname, int port_in, void (*close_cb)(void))
{
- int idx = add_channel();
+ int sd;
+ struct sockaddr_in server;
+ struct hostent * host;
+#ifdef FEAT_GUI_W32
+ u_short port = port_in;
+#else
+ int port = port_in;
+#endif
+ int idx;
+
+#ifdef FEAT_GUI_W32
+ channel_init_winsock();
+#endif
+
+ idx = add_channel();
+ if (idx < 0)
+ {
+ CHERROR("All channels are in use\n", "");
+ EMSG(_("E999: All channels are in use"));
+ return -1;
+ }
- if (idx >= 0)
+ if ((sd = (sock_T)socket(AF_INET, SOCK_STREAM, 0)) == (sock_T)-1)
{
- channels[idx].ch_fd = fd;
- netbeans_channel = idx;
+ CHERROR("error in socket() in channel_open()\n", "");
+ PERROR("E999: socket() in channel_open()");
+ return -1;
}
+
+ /* Get the server internet address and put into addr structure */
+ /* fill in the socket address structure and connect to server */
+ vim_memset((char *)&server, 0, sizeof(server));
+ server.sin_family = AF_INET;
+ server.sin_port = htons(port);
+ if ((host = gethostbyname(hostname)) == NULL)
+ {
+ CHERROR("error in gethostbyname() in channel_open()\n", "");
+ PERROR("E999: gethostbyname() in channel_open()");
+ sock_close(sd);
+ return -1;
+ }
+ memcpy((char *)&server.sin_addr, host->h_addr, host->h_length);
+
+ /* Connect to server */
+ if (connect(sd, (struct sockaddr *)&server, sizeof(server)))
+ {
+ SOCK_ERRNO;
+ CHERROR("channel_open: Connect failed with errno %d\n", errno);
+ if (errno == ECONNREFUSED)
+ {
+ sock_close(sd);
+ if ((sd = (sock_T)socket(AF_INET, SOCK_STREAM, 0)) == (sock_T)-1)
+ {
+ SOCK_ERRNO;
+ CHERROR("socket() retry in channel_open()\n", "");
+ PERROR("E999: socket() retry in channel_open()");
+ return -1;
+ }
+ if (connect(sd, (struct sockaddr *)&server, sizeof(server)))
+ {
+ int retries = 36;
+ int success = FALSE;
+
+ SOCK_ERRNO;
+ while (retries-- && ((errno == ECONNREFUSED)
+ || (errno == EINTR)))
+ {
+ CHERROR("retrying...\n", "");
+ mch_delay(3000L, TRUE);
+ ui_breakcheck();
+ if (got_int)
+ {
+ errno = EINTR;
+ break;
+ }
+ if (connect(sd, (struct sockaddr *)&server,
+ sizeof(server)) == 0)
+ {
+ success = TRUE;
+ break;
+ }
+ SOCK_ERRNO;
+ }
+ if (!success)
+ {
+ /* Get here when the server can't be found. */
+ CHERROR("Cannot connect to port after retry\n", "");
+ PERROR(_("E999: Cannot connect to port after retry2"));
+ sock_close(sd);
+ return -1;
+ }
+ }
+ }
+ else
+ {
+ CHERROR("Cannot connect to port\n", "");
+ PERROR(_("E999: Cannot connect to port"));
+ sock_close(sd);
+ return -1;
+ }
+ }
+
+ channels[idx].ch_fd = sd;
+ channels[idx].ch_close_cb = close_cb;
+
+#ifdef FEAT_GUI
+ channel_gui_register(idx);
+#endif
+
return idx;
}
- void
-channel_remove_netbeans()
+/*
+ * Return TRUE when channel "idx" is open.
+ */
+ int
+channel_is_open(int idx)
{
- channels[netbeans_channel].ch_fd = (sock_T)-1;
- netbeans_channel = -1;
+ return channels[idx].ch_fd >= 0;
}
+
+/*
+ * Close channel "idx".
+ * This does not trigger the close callback.
+ */
+ void
+channel_close(int idx)
+{
+ channel_T *channel = &channels[idx];
+
+ if (channel->ch_fd >= 0)
+ {
+ sock_close(channel->ch_fd);
+ channel->ch_fd = -1;
+#ifdef FEAT_GUI
+ channel_gui_unregister(idx);
#endif
+ }
+}
- static void
+/*
+ * Store "buf[len]" on channel "idx".
+ */
+ void
+channel_save(int idx, char_u *buf, int len)
+{
+ queue_T *node;
+ queue_T *head = &channels[idx].ch_head;
+
+ node = (queue_T *)alloc(sizeof(queue_T));
+ if (node == NULL)
+ return; /* out of memory */
+ node->buffer = alloc(len + 1);
+ if (node->buffer == NULL)
+ {
+ vim_free(node);
+ return; /* out of memory */
+ }
+ mch_memmove(node->buffer, buf, (size_t)len);
+ node->buffer[len] = NUL;
+
+ if (head->next == NULL) /* initialize circular queue */
+ {
+ head->next = head;
+ head->prev = head;
+ }
+
+ /* insert node at tail of queue */
+ node->next = head;
+ node->prev = head->prev;
+ head->prev->next = node;
+ head->prev = node;
+
+ if (debugfd != NULL)
+ {
+ fprintf(debugfd, "RECV on %d: ", idx);
+ fwrite(buf, len, 1, debugfd);
+ fprintf(debugfd, "\n");
+ }
+}
+
+/*
+ * Return the first buffer from the channel without removing it.
+ * Returns NULL if there is nothing.
+ */
+ char_u *
+channel_peek(int idx)
+{
+ queue_T *head = &channels[idx].ch_head;
+
+ if (head->next == head || head->next == NULL)
+ return NULL;
+ return head->next->buffer;
+}
+
+/*
+ * Return the first buffer from the channel and remove it.
+ * The caller must free it.
+ * Returns NULL if there is nothing.
+ */
+ char_u *
+channel_get(int idx)
+{
+ queue_T *head = &channels[idx].ch_head;
+ queue_T *node;
+ char_u *p;
+
+ if (head->next == head || head->next == NULL)
+ return NULL;
+ node = head->next;
+ /* dispose of the node but keep the buffer */
+ p = node->buffer;
+ head->next = node->next;
+ node->next->prev = node->prev;
+ vim_free(node);
+ return p;
+}
+
+/*
+ * Collapses the first and second buffer in the channel "idx".
+ * Returns FAIL if that is not possible.
+ */
+ int
+channel_collapse(int idx)
+{
+ queue_T *head = &channels[idx].ch_head;
+ queue_T *node = head->next;
+ char_u *p;
+
+ if (node == head || node == NULL || node->next == head)
+ return FAIL;
+
+ p = alloc((unsigned)(STRLEN(node->buffer)
+ + STRLEN(node->next->buffer) + 1));
+ if (p == NULL)
+ return FAIL; /* out of memory */
+ STRCPY(p, node->buffer);
+ STRCAT(p, node->next->buffer);
+ vim_free(node->next->buffer);
+ node->next->buffer = p;
+
+ /* dispose of the node and buffer */
+ head->next = node->next;
+ node->next->prev = node->prev;
+ vim_free(node->buffer);
+ vim_free(node);
+ return OK;
+}
+
+/*
+ * Clear the read buffer on channel "idx".
+ */
+ void
+channel_clear(int idx)
+{
+ queue_T *head = &channels[idx].ch_head;
+ queue_T *node = head->next;
+ queue_T *next;
+
+ while (node != NULL && node != head)
+ {
+ next = node->next;
+ vim_free(node->buffer);
+ vim_free(node);
+ if (next == head)
+ {
+ head->next = head;
+ head->prev = head;
+ break;
+ }
+ node = next;
+ }
+}
+
+/* Sent when the channel is found closed when reading. */
+#define DETACH_MSG "\"DETACH\"\n"
+
+/* Buffer size for reading incoming messages. */
+#define MAXMSGSIZE 4096
+
+/*
+ * Read from channel "idx". The data is put in the read queue.
+ */
+ void
channel_read(int idx)
{
-# ifdef FEAT_NETBEANS_INTG
- if (idx == netbeans_channel)
- netbeans_read();
- else
+ static char_u *buf = NULL;
+ int len = 0;
+ int readlen = 0;
+#ifdef HAVE_SELECT
+ struct timeval tval;
+ fd_set rfds;
+#else
+# ifdef HAVE_POLL
+ struct pollfd fds;
+# endif
+#endif
+ channel_T *channel = &channels[idx];
+
+ if (channel->ch_fd < 0)
+ {
+ CHLOG(idx, FALSE, "channel_read() called while socket is closed\n");
+ return;
+ }
+
+ /* Allocate a buffer to read into. */
+ if (buf == NULL)
+ {
+ buf = alloc(MAXMSGSIZE);
+ if (buf == NULL)
+ return; /* out of memory! */
+ }
+
+ /* Keep on reading for as long as there is something to read.
+ * Use select() or poll() to avoid blocking on a message that is exactly
+ * MAXMSGSIZE long. */
+ for (;;)
+ {
+#ifdef HAVE_SELECT
+ FD_ZERO(&rfds);
+ FD_SET(channel->ch_fd, &rfds);
+ tval.tv_sec = 0;
+ tval.tv_usec = 0;
+ if (select(channel->ch_fd + 1, &rfds, NULL, NULL, &tval) <= 0)
+ break;
+#else
+# ifdef HAVE_POLL
+ fds.fd = channel->ch_fd;
+ fds.events = POLLIN;
+ if (poll(&fds, 1, 0) <= 0)
+ break;
# endif
+#endif
+ len = sock_read(channel->ch_fd, buf, MAXMSGSIZE);
+ if (len <= 0)
+ break; /* error or nothing more to read */
+
+ /* Store the read message in the queue. */
+ channel_save(idx, buf, len);
+ readlen += len;
+ if (len < MAXMSGSIZE)
+ break; /* did read everything that's available */
+ }
+
+ /* Reading a socket disconnection (readlen == 0), or a socket error. */
+ if (readlen <= 0)
{
- ; /* TODO: read */
+ /* Queue a "DETACH" netbeans message in the command queue in order to
+ * terminate the netbeans session later. Do not end the session here
+ * directly as we may be running in the context of a call to
+ * netbeans_parse_messages():
+ * netbeans_parse_messages
+ * -> autocmd triggered while processing the netbeans cmd
+ * -> ui_breakcheck
+ * -> gui event loop or select loop
+ * -> channel_read()
+ */
+ channel_save(idx, (char_u *)DETACH_MSG, (int)STRLEN(DETACH_MSG));
+
+ channel_close(idx);
+ if (channel->ch_close_cb != NULL)
+ (*channel->ch_close_cb)();
+
+ if (len < 0)
+ {
+ /* Todo: which channel? */
+ CHERROR("%s(): cannot from channel\n", "channel_read");
+ PERROR(_("E999: read from channel"));
+ }
}
+
+#if defined(CH_HAS_GUI) && defined(FEAT_GUI_GTK)
+ if (CH_HAS_GUI && gtk_main_level() > 0)
+ gtk_main_quit();
+#endif
+}
+
+/*
+ * Write "buf" (NUL terminated string) to channel "idx".
+ * When "fun" is not NULL an error message might be given.
+ */
+ void
+channel_send(int idx, char_u *buf, char *fun)
+{
+ channel_T *channel = &channels[idx];
+ int len = (int)STRLEN(buf);
+
+ if (channel->ch_fd < 0)
+ {
+ if (!channel->ch_error && fun != NULL)
+ {
+ CHERROR(" %s(): write while not connected\n", fun);
+ EMSG2("E630: %s(): write while not connected", fun);
+ }
+ channel->ch_error = TRUE;
+ }
+ else if (sock_write(channel->ch_fd, buf, len) != len)
+ {
+ if (!channel->ch_error && fun != NULL)
+ {
+ CHERROR(" %s(): write failed\n", fun);
+ EMSG2("E631: %s(): write failed", fun);
+ }
+ channel->ch_error = TRUE;
+ }
+ else
+ channel->ch_error = FALSE;
}
-#if (defined(UNIX) && !defined(HAVE_SELECT)) || defined(PROTO)
+# if (defined(UNIX) && !defined(HAVE_SELECT)) || defined(PROTO)
/*
* Add open channels to the poll struct.
* Return the adjusted struct index.
@@ -138,9 +727,9 @@ channel_poll_check(int ret_in, void *fds_in)
return ret;
}
-#endif /* UNIX && !HAVE_SELECT */
+# endif /* UNIX && !HAVE_SELECT */
-#if (defined(UNIX) && defined(HAVE_SELECT)) || defined(PROTO)
+# if (defined(UNIX) && defined(HAVE_SELECT)) || defined(PROTO)
/*
* The type of "rfds" is hidden to avoid problems with the function proto.
*/
@@ -182,6 +771,6 @@ channel_select_check(int ret_in, void *rfds_in)
return ret;
}
-#endif /* UNIX && HAVE_SELECT */
+# endif /* UNIX && HAVE_SELECT */
#endif /* FEAT_CHANNEL */