summaryrefslogtreecommitdiff
path: root/src/network_gthread_aio.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/network_gthread_aio.c')
-rw-r--r--src/network_gthread_aio.c481
1 files changed, 0 insertions, 481 deletions
diff --git a/src/network_gthread_aio.c b/src/network_gthread_aio.c
deleted file mode 100644
index 42a2f3bd..00000000
--- a/src/network_gthread_aio.c
+++ /dev/null
@@ -1,481 +0,0 @@
-/**
- * - MAP_ANON needs _BSD_SOURCE | _SVID_SOURCE
- * - pread() needs _XOPEN_SOURCE 500 on Linux
- * - _XOPEN_SOURCE breaks the compile on FreeBSD (see #1240, #1251)
- *
- */
-#ifndef _GNU_SOURCE
-# ifndef _BSD_SOURCE
-# define _BSD_SOURCE 1
-# endif
-#endif
-
-#ifdef linux
-/* For pread()/pwrite() */
-#define _XOPEN_SOURCE 500
-#endif
-
-#include "settings.h"
-#include "network_backends.h"
-#ifdef USE_GTHREAD_AIO
-#include <sys/types.h>
-#include <sys/stat.h>
-#ifdef HAVE_SYS_TIME_H
-#include <sys/time.h>
-#endif
-
-#include <errno.h>
-#include <fcntl.h>
-#include <string.h>
-#include <stdlib.h>
-#include <fcntl.h>
-#include <assert.h>
-
-#include "network.h"
-#include "fdevent.h"
-#include "log.h"
-#include "stat_cache.h"
-#include "joblist.h"
-#include "timing.h"
-
-#include "sys-files.h"
-#include "sys-socket.h"
-
-typedef struct {
- chunk *c;
-
- void *con;
-
- int sock_fd;
-} write_job;
-
-static write_job *write_job_init() {
- write_job *wj = calloc(1, sizeof(*wj));
-
- return wj;
-}
-
-static void write_job_free(write_job *wj) {
- if (!wj) return;
-
- free(wj);
-}
-
-#define kByte * (1024)
-#define MByte * (1024 kByte)
-
-/**
- * log the time-stamps of the different stages
- */
-static void timing_print(server *srv, connection *con) {
- if (!srv->srvconf.log_timing) return;
-
- TRACE("write-start: %ld.%06ld "
- "read-queue-wait: %ld ms "
- "read-time: %ld ms "
- "write-time: %ld ms ",
- con->timestamps[TIME_SEND_WRITE_START].tv_sec,
- con->timestamps[TIME_SEND_WRITE_START].tv_usec,
-
- TIME_DIFF(TIME_SEND_ASYNC_READ_START, TIME_SEND_ASYNC_READ_QUEUED),
- TIME_DIFF(TIME_SEND_ASYNC_READ_END, TIME_SEND_ASYNC_READ_START),
- TIME_DIFF(TIME_SEND_WRITE_END, TIME_SEND_ASYNC_READ_END_QUEUED)
- );
-}
-
-gpointer network_gthread_aio_read_thread(gpointer _srv) {
- server *srv = (server *)_srv;
-
- GAsyncQueue * inq;
-
- int fadvise_is_enosys = 0;
-
- g_async_queue_ref(srv->aio_write_queue);
-
- inq = srv->aio_write_queue;
-
- /* */
- while (!srv->is_shutdown) {
- write_job *wj = NULL;
-
- if ((wj = g_async_queue_pop(inq))) {
- /* let's see what we have to stat */
- ssize_t r;
- off_t offset;
- size_t toSend;
- chunk *c;
- connection *con;
- off_t max_toSend = 64 kByte; /** should be larger than the send buffer */
-
- int fadvise_fd = 0;
- off_t fadvise_offset = 0;
- off_t fadvise_len = 0;
-
- if(wj == (write_job *) 1)
- continue; /* just notifying us that srv->is_shutdown changed */
-
- c = wj->c;
- con = wj->con;
-
-#if 0
- /* try to be adaptive */
- int snd_buf_size = 0;
- socklen_t snd_buf_size_size = sizeof(snd_buf_size);
-
- if (0 == getsockopt(con->sock->fd, SOL_SOCKET, SO_SNDBUF, &snd_buf_size, &snd_buf_size_size)) {
- /* adjust the read-data to the send-buffer */
- if (snd_buf_size * 4 > max_toSend) {
- max_toSend = snd_buf_size * 4;
- }
- }
-#endif
-
- offset = c->file.start + c->offset;
-
- toSend = c->file.length - c->offset > max_toSend ?
- max_toSend : c->file.length - c->offset;
-
- c->file.copy.offset = 0;
- c->file.copy.length = 0;
-
- /* open a file in /dev/shm to write to */
- if (c->file.mmap.start == MAP_FAILED) {
-#if defined(HAVE_MEM_MMAP_ZERO)
- int mmap_fd;
- if (-1 == (mmap_fd = open("/dev/zero", O_RDWR))) {
- if (errno != EMFILE) {
- TRACE("open(/dev/zero) returned: %d (%s)",
- errno, strerror(errno));
- c->async.ret_val = NETWORK_STATUS_FATAL_ERROR;
- } else {
- c->async.ret_val = NETWORK_STATUS_WAIT_FOR_FD;
- }
- } else {
- c->file.mmap.offset = 0;
- c->file.mmap.length = toSend;
-
- c->file.mmap.start = mmap(0, c->file.mmap.length,
- PROT_READ | PROT_WRITE, MAP_SHARED, mmap_fd, 0);
- if (c->file.mmap.start == MAP_FAILED) {
- TRACE("mmap(/dev/zero) returned: %d (%s)",
- errno, strerror(errno));
- c->async.ret_val = NETWORK_STATUS_FATAL_ERROR;
- }
-
- close(mmap_fd);
- mmap_fd = -1;
- }
-#elif defined(HAVE_MEM_MMAP_ANON)
- c->file.mmap.offset = 0;
- c->file.mmap.length = toSend; /* align to page-size */
- c->file.mmap.start = mmap(0, c->file.mmap.length,
- PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANON, -1, 0);
-
- if (c->file.mmap.start == MAP_FAILED) {
- TRACE("mmap(MAP_ANON) returned: %d (%s)",
- errno, strerror(errno));
- c->async.ret_val = NETWORK_STATUS_FATAL_ERROR;
- }
-#else
-#error hmm, does your system support mmap(/dev/zero) or mmap(MAP_ANON)
-#endif
- }
- if (c->file.mmap.start != MAP_FAILED) {
- timing_log(srv, con, TIME_SEND_ASYNC_READ_START);
-
- if (-1 == (r = pread(c->file.fd, c->file.mmap.start, toSend, c->file.start + c->offset))) {
- switch(errno) {
- default:
- ERROR("reading file failed: %d (%s)", errno, strerror(errno));
-
- c->async.ret_val = NETWORK_STATUS_FATAL_ERROR;
- }
- } else if (r == 0) {
- ERROR("pread(%s) returned 0 ... not good", SAFE_BUF_STR(c->file.name));
-
- c->async.ret_val = NETWORK_STATUS_FATAL_ERROR;
- } else {
- timing_log(srv, con, TIME_SEND_ASYNC_READ_END);
- c->file.copy.length = r;
- }
- }
-
- if (c->file.copy.length && !fadvise_is_enosys) {
- fadvise_fd = c->file.fd;
- fadvise_offset = c->file.start + c->offset + c->file.copy.length;
- fadvise_len = c->file.length - c->offset - c->file.copy.length;
-
- if (fadvise_len > max_toSend) {
- fadvise_len = max_toSend;
- }
- }
- timing_log(srv, con, TIME_SEND_ASYNC_READ_END_QUEUED);
- /* read async, write as usual */
- joblist_async_append(srv, wj->con);
-
-#if defined(HAVE_POSIX_FADVISE) && defined(POSIX_FADV_WILLNEED)
- /* read ahead */
- if (c->file.copy.length && !fadvise_is_enosys && fadvise_len) {
- /* let's hope that the fd is still valid when we try to read ahead */
- if (-1 == posix_fadvise(fadvise_fd, fadvise_offset, fadvise_len, POSIX_FADV_WILLNEED)) {
- if (ENOSYS != errno) {
- ERROR("posix_fadvise(%d) failed: %s (%d)", fadvise_fd, strerror(errno), errno);
- } else {
- /* don't try again as we don't support it */
- fadvise_is_enosys = 1;
- }
- }
- }
-#endif
- write_job_free(wj);
- }
- }
-
- g_async_queue_unref(srv->aio_write_queue);
-
- return NULL;
-
-}
-
-
-NETWORK_BACKEND_WRITE(gthreadaio) {
- chunk *c, *tc;
- size_t chunks_written = 0;
-
- for(c = cq->first; c; c = c->next, chunks_written++) {
- int chunk_finished = 0;
- network_status_t ret;
-
- switch(c->type) {
- case MEM_CHUNK:
- ret = network_write_chunkqueue_writev_mem(srv, con, sock, cq, c);
-
- /* check which chunks are finished now */
- for (tc = c; tc && chunk_is_done(tc); tc = tc->next) {
- /* skip the first c->next as that will be done by the c = c->next in the other for()-loop */
- if (chunk_finished) {
- c = c->next;
- } else {
- chunk_finished = 1;
- }
- }
-
- if (ret != NETWORK_STATUS_SUCCESS) {
- return ret;
- }
-
- break;
- case FILE_CHUNK: {
- ssize_t r;
-
- /* we might be on our way back from the async request and have a status-code */
- if (c->async.ret_val != NETWORK_STATUS_UNSET) {
- ret = c->async.ret_val;
-
- c->async.ret_val = NETWORK_STATUS_UNSET;
-
- ERROR("thread returned: %d", ret);
-
- return ret;
- }
-
- /* open file if not already opened */
- if (-1 == c->file.fd) {
- if (-1 == (c->file.fd = open(c->file.name->ptr, O_RDONLY /* | O_DIRECT */ | (srv->srvconf.use_noatime ? O_NOATIME : 0)))) {
- ERROR("opening '%s' failed: %s", SAFE_BUF_STR(c->file.name), strerror(errno));
-
- return NETWORK_STATUS_FATAL_ERROR;
- }
-#ifdef FD_CLOEXEC
- fcntl(c->file.fd, F_SETFD, FD_CLOEXEC);
-#endif
-#if defined(HAVE_POSIX_FADVISE) && defined(POSIX_FADV_SEQUENTIAL)
- /* tell the kernel that we want to stream the file
- *
- * - POSIX_FADV_SEQUENTIAL doubles the read-ahead on Linux
- *
- * */
- if (-1 == posix_fadvise(c->file.fd, c->file.start, c->file.length, POSIX_FADV_SEQUENTIAL)) {
- if (ENOSYS != errno) {
- ERROR("posix_fadvise(%s) failed: %s (%d)", c->file.name->ptr, strerror(errno), errno);
- }
- }
-#endif
- }
- /* check if we have content */
- if (c->file.copy.length == 0) {
- const off_t max_toSend = 64 kByte; /** should be larger than the send buffer */
- size_t toSend;
- off_t offset;
-
- /* start to write a block out the to net work */
- timing_log(srv, con, TIME_SEND_WRITE_START);
-
- offset = c->file.start + c->offset;
-
- toSend = c->file.length - c->offset > max_toSend ?
- max_toSend : c->file.length - c->offset;
-
- /* we small files don't take the overhead of a full async-loop */
- if (toSend < 4 * 1024) {
-
- c->file.copy.offset = 0;
- c->file.copy.length = toSend;
-
- /* open a file in /dev/shm to write to */
- if (c->file.mmap.start == MAP_FAILED) {
-#if defined(HAVE_MEM_MMAP_ZERO)
- int mmap_fd;
-
- if (-1 == (mmap_fd = open("/dev/zero", O_RDWR))) {
- if (errno != EMFILE) {
- TRACE("open(/dev/zero) returned: %d (%s), open fds: %d",
- errno, strerror(errno));
- return NETWORK_STATUS_FATAL_ERROR;
- } else {
- return NETWORK_STATUS_WAIT_FOR_FD;
- }
- } else {
- c->file.mmap.offset = 0;
- c->file.mmap.length = c->file.copy.length; /* align to page-size */
-
- c->file.mmap.start = mmap(0, c->file.mmap.length,
- PROT_READ | PROT_WRITE, MAP_SHARED, mmap_fd, 0);
- if (c->file.mmap.start == MAP_FAILED) {
- ERROR("mmap(%s) failed: %s (%d)", c->file.name->ptr, strerror(errno), errno);
-
- return NETWORK_STATUS_FATAL_ERROR;
- }
-
- close(mmap_fd);
- mmap_fd = -1;
- }
-#elif defined(HAVE_MEM_MMAP_ANON)
- c->file.mmap.offset = 0;
- c->file.mmap.length = c->file.copy.length; /* align to page-size */
- c->file.mmap.start = mmap(0, c->file.mmap.length,
- PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANON, -1, 0);
-
- if (c->file.mmap.start == MAP_FAILED) {
- TRACE("mmap(MAP_ANON) returned: %d (%s)",
- errno, strerror(errno));
- return NETWORK_STATUS_FATAL_ERROR;
- }
-#else
-#error hmm, does your system support mmap(/dev/zero) or mmap(MAP_ANON)
-#endif
-
- }
-
- if (c->file.mmap.start != MAP_FAILED) {
- lseek(c->file.fd, c->file.start + c->offset, SEEK_SET);
-
- if (-1 == (r = read(c->file.fd, c->file.mmap.start, c->file.copy.length))) {
- switch(errno) {
- default:
- ERROR("reading file failed: %d (%s)", errno, strerror(errno));
-
- return NETWORK_STATUS_FATAL_ERROR;
- }
- } else if (r == 0) {
- ERROR("read() returned 0 ... not good: %s", "");
-
- return NETWORK_STATUS_FATAL_ERROR;
- } else if (r != c->file.copy.length) {
- ERROR("read() returned %zd instead of %jd", r, (intmax_t) c->file.copy.length);
-
- return NETWORK_STATUS_FATAL_ERROR;
- }
- } else {
- return NETWORK_STATUS_FATAL_ERROR;
- }
- } else {
- write_job *wj;
-
- wj = write_job_init();
- wj->c = c;
- wj->con = con;
- wj->sock_fd = sock->fd;
-
- c->async.written = -1;
- c->async.ret_val = NETWORK_STATUS_UNSET;
-
- g_async_queue_push(srv->aio_write_queue, wj);
-
- timing_log(srv, con, TIME_SEND_ASYNC_READ_QUEUED);
-
- return NETWORK_STATUS_WAIT_FOR_AIO_EVENT;
- }
- }
-
- if (-1 == (r = write(sock->fd, c->file.mmap.start + c->file.copy.offset, c->file.copy.length - c->file.copy.offset))) {
- switch (errno) {
- case EINTR:
- case EAGAIN:
- return NETWORK_STATUS_WAIT_FOR_EVENT;
- case EPIPE:
- case ECONNRESET:
- return NETWORK_STATUS_CONNECTION_CLOSE;
- default:
- ERROR("write failed: %d (%s) [%jd, %p, %jd]",
- errno, strerror(errno), (intmax_t) c->file.copy.length,
- c->file.mmap.start, (intmax_t) c->file.copy.offset);
- return NETWORK_STATUS_FATAL_ERROR;
- }
- }
-
- if (r == 0) {
- return NETWORK_STATUS_CONNECTION_CLOSE;
- }
-
- c->file.copy.offset += r; /* offset in the copy-chunk */
-
- c->offset += r; /* global offset in the file */
- cq->bytes_out += r;
-
- if (c->file.copy.offset == (off_t) c->file.mmap.length) {
- /* this block is sent, get a new one */
- timing_log(srv, con, TIME_SEND_WRITE_END);
-
- timing_print(srv, con);
-
- c->file.copy.length = 0;
- }
-
- if (c->offset == c->file.length) {
- chunk_finished = 1;
-
- munmap(c->file.mmap.start, c->file.mmap.length);
- c->file.mmap.start = MAP_FAILED;
-
- if (c->file.copy.fd != -1) {
- close(c->file.copy.fd);
- c->file.copy.fd = -1;
- }
-
- if (c->file.fd != -1) {
- close(c->file.fd);
- c->file.fd = -1;
- }
- }
-
- break;
- }
- default:
-
- log_error_write(srv, __FILE__, __LINE__, "ds", c, "type not known");
-
- return NETWORK_STATUS_FATAL_ERROR;
- }
-
- if (!chunk_finished) {
- /* not finished yet */
-
- return NETWORK_STATUS_WAIT_FOR_EVENT;
- }
- }
-
- return NETWORK_STATUS_SUCCESS;
-}
-
-#endif