/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */ /* * Copyright (C) 1999-2008 Novell, Inc. (www.novell.com) * * This library is free software: you can redistribute it and/or modify it * under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation. * * This library is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License * for more details. * * You should have received a copy of the GNU Lesser General Public License * along with this library. If not, see . * */ #ifdef HAVE_CONFIG_H #include #endif #include #include #include #ifdef G_OS_WIN32 #define WIN32_LEAN_AND_MEAN #include #endif #include "camel-msgport.h" #ifdef G_OS_WIN32 #define MP_CLOSE(socket) closesocket (socket) #define MP_READ(socket, buf, nbytes) recv((socket), (buf), (nbytes), 0) #define MP_WRITE(socket, buf, nbytes) send((socket), (buf), (nbytes), 0) #define MP_IS_STATUS_INTR() 0 /* No WSAEINTR errors in WinSock2 */ #else #define MP_CLOSE(socket) close (socket) #define MP_READ(socket, buf, nbytes) read((socket), (buf), (nbytes)) #define MP_WRITE(socket, buf, nbytes) write((socket), (buf), (nbytes)) #define MP_IS_STATUS_INTR() (errno == EINTR) #endif /* message flags */ enum { MSG_FLAG_SYNC_WITH_PIPE = 1 << 0, MSG_FLAG_SYNC_WITH_PR_PIPE = 1 << 1 }; struct _CamelMsgPort { GAsyncQueue *queue; gint pipe[2]; /* on Win32, actually a pair of SOCKETs */ PRFileDesc *prpipe[2]; }; static gint msgport_pipe (gint *fds) { #ifndef G_OS_WIN32 if (pipe (fds) != -1) return 0; fds[0] = -1; fds[1] = -1; return -1; #else SOCKET temp, socket1 = -1, socket2 = -1; struct sockaddr_in saddr; gint len; u_long arg; fd_set read_set, write_set; struct timeval tv; temp = socket (AF_INET, SOCK_STREAM, 0); if (temp == INVALID_SOCKET) { goto out0; } arg = 1; if (ioctlsocket (temp, FIONBIO, &arg) == SOCKET_ERROR) { goto out0; } memset (&saddr, 0, sizeof (saddr)); saddr.sin_family = AF_INET; saddr.sin_port = 0; saddr.sin_addr.s_addr = htonl (INADDR_LOOPBACK); if (bind (temp, (struct sockaddr *) &saddr, sizeof (saddr))) { goto out0; } if (listen (temp, 1) == SOCKET_ERROR) { goto out0; } len = sizeof (saddr); if (getsockname (temp, (struct sockaddr *) &saddr, &len)) { goto out0; } socket1 = socket (AF_INET, SOCK_STREAM, 0); if (socket1 == INVALID_SOCKET) { goto out0; } arg = 1; if (ioctlsocket (socket1, FIONBIO, &arg) == SOCKET_ERROR) { goto out1; } if (connect (socket1, (struct sockaddr *) &saddr, len) != SOCKET_ERROR || WSAGetLastError () != WSAEWOULDBLOCK) { goto out1; } FD_ZERO (&read_set); FD_SET (temp, &read_set); tv.tv_sec = 0; tv.tv_usec = 0; if (select (0, &read_set, NULL, NULL, NULL) == SOCKET_ERROR) { goto out1; } if (!FD_ISSET (temp, &read_set)) { goto out1; } socket2 = accept (temp, (struct sockaddr *) &saddr, &len); if (socket2 == INVALID_SOCKET) { goto out1; } FD_ZERO (&write_set); FD_SET (socket1, &write_set); tv.tv_sec = 0; tv.tv_usec = 0; if (select (0, NULL, &write_set, NULL, NULL) == SOCKET_ERROR) { goto out2; } if (!FD_ISSET (socket1, &write_set)) { goto out2; } arg = 0; if (ioctlsocket (socket1, FIONBIO, &arg) == SOCKET_ERROR) { goto out2; } arg = 0; if (ioctlsocket (socket2, FIONBIO, &arg) == SOCKET_ERROR) { goto out2; } fds[0] = socket1; fds[1] = socket2; closesocket (temp); return 0; out2: closesocket (socket2); out1: closesocket (socket1); out0: closesocket (temp); errno = EMFILE; /* FIXME: use the real syscall errno? */ fds[0] = -1; fds[1] = -1; return -1; #endif } static gint msgport_prpipe (PRFileDesc **fds) { #ifdef G_OS_WIN32 if (PR_NewTCPSocketPair (fds) != PR_FAILURE) return 0; #else if (PR_CreatePipe (&fds[0], &fds[1]) != PR_FAILURE) return 0; #endif fds[0] = NULL; fds[1] = NULL; return -1; } static void msgport_sync_with_pipe (gint fd) { gchar buffer[1]; while (fd >= 0) { if (MP_READ (fd, buffer, 1) > 0) break; else if (!MP_IS_STATUS_INTR ()) { g_warning ( "%s: Failed to read from pipe: %s", G_STRFUNC, g_strerror (errno)); break; } } } static void msgport_sync_with_prpipe (PRFileDesc *prfd) { gchar buffer[1]; while (prfd != NULL) { if (PR_Read (prfd, buffer, 1) > 0) break; else if (PR_GetError () != PR_PENDING_INTERRUPT_ERROR) { gchar *text = g_alloca (PR_GetErrorTextLength ()); PR_GetErrorText (text); g_warning ( "%s: Failed to read from NSPR pipe: %s", G_STRFUNC, text); break; } } } /** * camel_msgport_new: * * Since: 2.24 **/ CamelMsgPort * camel_msgport_new (void) { CamelMsgPort *msgport; msgport = g_slice_new (CamelMsgPort); msgport->queue = g_async_queue_new (); msgport->pipe[0] = -1; msgport->pipe[1] = -1; msgport->prpipe[0] = NULL; msgport->prpipe[1] = NULL; return msgport; } /** * camel_msgport_destroy: * * Since: 2.24 **/ void camel_msgport_destroy (CamelMsgPort *msgport) { g_return_if_fail (msgport != NULL); if (msgport->pipe[0] >= 0) { MP_CLOSE (msgport->pipe[0]); MP_CLOSE (msgport->pipe[1]); } if (msgport->prpipe[0] != NULL) { PR_Close (msgport->prpipe[0]); PR_Close (msgport->prpipe[1]); } g_async_queue_unref (msgport->queue); g_slice_free (CamelMsgPort, msgport); } /** * camel_msgport_fd: * * Since: 2.24 **/ gint camel_msgport_fd (CamelMsgPort *msgport) { gint fd; g_return_val_if_fail (msgport != NULL, -1); g_async_queue_lock (msgport->queue); fd = msgport->pipe[0]; if (fd < 0 && msgport_pipe (msgport->pipe) == 0) fd = msgport->pipe[0]; g_async_queue_unlock (msgport->queue); return fd; } /** * camel_msgport_prfd: * * Since: 2.24 **/ PRFileDesc * camel_msgport_prfd (CamelMsgPort *msgport) { PRFileDesc *prfd; g_return_val_if_fail (msgport != NULL, NULL); g_async_queue_lock (msgport->queue); prfd = msgport->prpipe[0]; if (prfd == NULL && msgport_prpipe (msgport->prpipe) == 0) prfd = msgport->prpipe[0]; g_async_queue_unlock (msgport->queue); return prfd; } /** * camel_msgport_push: * * Since: 2.24 **/ void camel_msgport_push (CamelMsgPort *msgport, CamelMsg *msg) { gint fd; PRFileDesc *prfd; g_return_if_fail (msgport != NULL); g_return_if_fail (msg != NULL); g_async_queue_lock (msgport->queue); msg->flags = 0; fd = msgport->pipe[1]; while (fd >= 0) { if (MP_WRITE (fd, "E", 1) > 0) { msg->flags |= MSG_FLAG_SYNC_WITH_PIPE; break; } else if (!MP_IS_STATUS_INTR ()) { g_warning ( "%s: Failed to write to pipe: %s", G_STRFUNC, g_strerror (errno)); break; } } prfd = msgport->prpipe[1]; while (prfd != NULL) { if (PR_Write (prfd, "E", 1) > 0) { msg->flags |= MSG_FLAG_SYNC_WITH_PR_PIPE; break; } else if (PR_GetError () != PR_PENDING_INTERRUPT_ERROR) { gchar *text = g_alloca (PR_GetErrorTextLength ()); PR_GetErrorText (text); g_warning ( "%s: Failed to write to NSPR pipe: %s", G_STRFUNC, text); break; } } g_async_queue_push_unlocked (msgport->queue, msg); g_async_queue_unlock (msgport->queue); } /** * camel_msgport_pop: * * Since: 2.24 **/ CamelMsg * camel_msgport_pop (CamelMsgPort *msgport) { CamelMsg *msg; g_return_val_if_fail (msgport != NULL, NULL); g_async_queue_lock (msgport->queue); msg = g_async_queue_pop_unlocked (msgport->queue); g_return_val_if_fail (msg != NULL, NULL); if (msg->flags & MSG_FLAG_SYNC_WITH_PIPE) msgport_sync_with_pipe (msgport->pipe[0]); if (msg->flags & MSG_FLAG_SYNC_WITH_PR_PIPE) msgport_sync_with_prpipe (msgport->prpipe[0]); g_async_queue_unlock (msgport->queue); return msg; } /** * camel_msgport_try_pop: * * Since: 2.24 **/ CamelMsg * camel_msgport_try_pop (CamelMsgPort *msgport) { CamelMsg *msg; g_return_val_if_fail (msgport != NULL, NULL); g_async_queue_lock (msgport->queue); msg = g_async_queue_try_pop_unlocked (msgport->queue); if (msg != NULL && msg->flags & MSG_FLAG_SYNC_WITH_PIPE) msgport_sync_with_pipe (msgport->pipe[0]); if (msg != NULL && msg->flags & MSG_FLAG_SYNC_WITH_PR_PIPE) msgport_sync_with_prpipe (msgport->prpipe[0]); g_async_queue_unlock (msgport->queue); return msg; } /** * camel_msgport_timeout_pop: * @msgport: a #CamelMsgPort * @timeout: number of microseconds to wait * * Since: 3.8 **/ CamelMsg * camel_msgport_timeout_pop (CamelMsgPort *msgport, guint64 timeout) { CamelMsg *msg; g_return_val_if_fail (msgport != NULL, NULL); g_async_queue_lock (msgport->queue); msg = g_async_queue_timeout_pop_unlocked (msgport->queue, timeout); if (msg != NULL && msg->flags & MSG_FLAG_SYNC_WITH_PIPE) msgport_sync_with_pipe (msgport->pipe[0]); if (msg != NULL && msg->flags & MSG_FLAG_SYNC_WITH_PR_PIPE) msgport_sync_with_prpipe (msgport->prpipe[0]); g_async_queue_unlock (msgport->queue); return msg; } /** * camel_msgport_reply: * * Since: 2.24 **/ void camel_msgport_reply (CamelMsg *msg) { g_return_if_fail (msg != NULL); if (msg->reply_port) camel_msgport_push (msg->reply_port, msg); /* else lost? */ }