summaryrefslogtreecommitdiff
path: root/Source/WebKit2/NetworkProcess/cache/NetworkCacheIOChannelSoup.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'Source/WebKit2/NetworkProcess/cache/NetworkCacheIOChannelSoup.cpp')
-rw-r--r--Source/WebKit2/NetworkProcess/cache/NetworkCacheIOChannelSoup.cpp285
1 files changed, 285 insertions, 0 deletions
diff --git a/Source/WebKit2/NetworkProcess/cache/NetworkCacheIOChannelSoup.cpp b/Source/WebKit2/NetworkProcess/cache/NetworkCacheIOChannelSoup.cpp
new file mode 100644
index 000000000..ef7ba8998
--- /dev/null
+++ b/Source/WebKit2/NetworkProcess/cache/NetworkCacheIOChannelSoup.cpp
@@ -0,0 +1,285 @@
+/*
+ * Copyright (C) 2015 Igalia S.L.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+ * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
+ * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "config.h"
+#include "NetworkCacheIOChannel.h"
+
+#if ENABLE(NETWORK_CACHE)
+
+#include "NetworkCacheFileSystem.h"
+#include <wtf/MainThread.h>
+#include <wtf/RunLoop.h>
+#include <wtf/glib/GUniquePtr.h>
+
+namespace WebKit {
+namespace NetworkCache {
+
+static const size_t gDefaultReadBufferSize = 4096;
+
+IOChannel::IOChannel(const String& filePath, Type type)
+ : m_path(filePath)
+ , m_type(type)
+{
+ auto path = WebCore::fileSystemRepresentation(filePath);
+ GRefPtr<GFile> file = adoptGRef(g_file_new_for_path(path.data()));
+ switch (m_type) {
+ case Type::Create: {
+ g_file_delete(file.get(), nullptr, nullptr);
+ m_outputStream = adoptGRef(G_OUTPUT_STREAM(g_file_create(file.get(), static_cast<GFileCreateFlags>(G_FILE_CREATE_PRIVATE), nullptr, nullptr)));
+#if !HAVE(STAT_BIRTHTIME)
+ GUniquePtr<char> birthtimeString(g_strdup_printf("%" G_GUINT64_FORMAT, std::chrono::system_clock::to_time_t(std::chrono::system_clock::now())));
+ g_file_set_attribute_string(file.get(), "xattr::birthtime", birthtimeString.get(), G_FILE_QUERY_INFO_NONE, nullptr, nullptr);
+#endif
+ break;
+ }
+ case Type::Write: {
+ m_ioStream = adoptGRef(g_file_open_readwrite(file.get(), nullptr, nullptr));
+ break;
+ }
+ case Type::Read:
+ m_inputStream = adoptGRef(G_INPUT_STREAM(g_file_read(file.get(), nullptr, nullptr)));
+ break;
+ }
+}
+
+IOChannel::~IOChannel()
+{
+ RELEASE_ASSERT(!m_wasDeleted.exchange(true));
+}
+
+Ref<IOChannel> IOChannel::open(const String& filePath, IOChannel::Type type)
+{
+ return adoptRef(*new IOChannel(filePath, type));
+}
+
+static inline void runTaskInQueue(Function<void ()>&& task, WorkQueue* queue)
+{
+ if (queue) {
+ queue->dispatch(WTFMove(task));
+ return;
+ }
+
+ // Using nullptr as queue submits the result to the main context.
+ RunLoop::main().dispatch(WTFMove(task));
+}
+
+static void fillDataFromReadBuffer(SoupBuffer* readBuffer, size_t size, Data& data)
+{
+ GRefPtr<SoupBuffer> buffer;
+ if (size != readBuffer->length) {
+ // The subbuffer does not copy the data.
+ buffer = adoptGRef(soup_buffer_new_subbuffer(readBuffer, 0, size));
+ } else
+ buffer = readBuffer;
+
+ if (data.isNull()) {
+ // First chunk, we need to force the data to be copied.
+ data = { reinterpret_cast<const uint8_t*>(buffer->data), size };
+ } else {
+ Data dataRead(WTFMove(buffer));
+ // Concatenate will copy the data.
+ data = concatenate(data, dataRead);
+ }
+}
+
+struct ReadAsyncData {
+ RefPtr<IOChannel> channel;
+ GRefPtr<SoupBuffer> buffer;
+ RefPtr<WorkQueue> queue;
+ size_t bytesToRead;
+ std::function<void (Data&, int error)> completionHandler;
+ Data data;
+};
+
+static void inputStreamReadReadyCallback(GInputStream* stream, GAsyncResult* result, gpointer userData)
+{
+ std::unique_ptr<ReadAsyncData> asyncData(static_cast<ReadAsyncData*>(userData));
+ gssize bytesRead = g_input_stream_read_finish(stream, result, nullptr);
+ if (bytesRead == -1) {
+ WorkQueue* queue = asyncData->queue.get();
+ runTaskInQueue([asyncData = WTFMove(asyncData)] {
+ asyncData->completionHandler(asyncData->data, -1);
+ }, queue);
+ return;
+ }
+
+ if (!bytesRead) {
+ WorkQueue* queue = asyncData->queue.get();
+ runTaskInQueue([asyncData = WTFMove(asyncData)] {
+ asyncData->completionHandler(asyncData->data, 0);
+ }, queue);
+ return;
+ }
+
+ ASSERT(bytesRead > 0);
+ fillDataFromReadBuffer(asyncData->buffer.get(), static_cast<size_t>(bytesRead), asyncData->data);
+
+ size_t pendingBytesToRead = asyncData->bytesToRead - asyncData->data.size();
+ if (!pendingBytesToRead) {
+ WorkQueue* queue = asyncData->queue.get();
+ runTaskInQueue([asyncData = WTFMove(asyncData)] {
+ asyncData->completionHandler(asyncData->data, 0);
+ }, queue);
+ return;
+ }
+
+ size_t bytesToRead = std::min(pendingBytesToRead, asyncData->buffer->length);
+ // Use a local variable for the data buffer to pass it to g_input_stream_read_async(), because ReadAsyncData is released.
+ auto data = const_cast<char*>(asyncData->buffer->data);
+ g_input_stream_read_async(stream, data, bytesToRead, G_PRIORITY_DEFAULT, nullptr,
+ reinterpret_cast<GAsyncReadyCallback>(inputStreamReadReadyCallback), asyncData.release());
+}
+
+void IOChannel::read(size_t offset, size_t size, WorkQueue* queue, std::function<void (Data&, int error)> completionHandler)
+{
+ RefPtr<IOChannel> channel(this);
+ if (!m_inputStream) {
+ runTaskInQueue([channel, completionHandler] {
+ Data data;
+ completionHandler(data, -1);
+ }, queue);
+ return;
+ }
+
+ if (!isMainThread()) {
+ readSyncInThread(offset, size, queue, completionHandler);
+ return;
+ }
+
+ size_t bufferSize = std::min(size, gDefaultReadBufferSize);
+ uint8_t* bufferData = static_cast<uint8_t*>(fastMalloc(bufferSize));
+ GRefPtr<SoupBuffer> buffer = adoptGRef(soup_buffer_new_with_owner(bufferData, bufferSize, bufferData, fastFree));
+ ReadAsyncData* asyncData = new ReadAsyncData { this, buffer.get(), queue, size, completionHandler, { } };
+
+ // FIXME: implement offset.
+ g_input_stream_read_async(m_inputStream.get(), const_cast<char*>(buffer->data), bufferSize, G_PRIORITY_DEFAULT, nullptr,
+ reinterpret_cast<GAsyncReadyCallback>(inputStreamReadReadyCallback), asyncData);
+}
+
+void IOChannel::readSyncInThread(size_t offset, size_t size, WorkQueue* queue, std::function<void (Data&, int error)> completionHandler)
+{
+ ASSERT(!isMainThread());
+
+ RefPtr<IOChannel> channel(this);
+ detachThread(createThread("IOChannel::readSync", [channel, size, queue, completionHandler] {
+ size_t bufferSize = std::min(size, gDefaultReadBufferSize);
+ uint8_t* bufferData = static_cast<uint8_t*>(fastMalloc(bufferSize));
+ GRefPtr<SoupBuffer> readBuffer = adoptGRef(soup_buffer_new_with_owner(bufferData, bufferSize, bufferData, fastFree));
+ Data data;
+ size_t pendingBytesToRead = size;
+ size_t bytesToRead = bufferSize;
+ do {
+ // FIXME: implement offset.
+ gssize bytesRead = g_input_stream_read(channel->m_inputStream.get(), const_cast<char*>(readBuffer->data), bytesToRead, nullptr, nullptr);
+ if (bytesRead == -1) {
+ runTaskInQueue([channel, completionHandler] {
+ Data data;
+ completionHandler(data, -1);
+ }, queue);
+ return;
+ }
+
+ if (!bytesRead)
+ break;
+
+ ASSERT(bytesRead > 0);
+ fillDataFromReadBuffer(readBuffer.get(), static_cast<size_t>(bytesRead), data);
+
+ pendingBytesToRead = size - data.size();
+ bytesToRead = std::min(pendingBytesToRead, readBuffer->length);
+ } while (pendingBytesToRead);
+
+ GRefPtr<SoupBuffer> bufferCapture = data.soupBuffer();
+ runTaskInQueue([channel, bufferCapture, completionHandler] {
+ GRefPtr<SoupBuffer> buffer = bufferCapture;
+ Data data = { WTFMove(buffer) };
+ completionHandler(data, 0);
+ }, queue);
+ }));
+}
+
+struct WriteAsyncData {
+ RefPtr<IOChannel> channel;
+ GRefPtr<SoupBuffer> buffer;
+ RefPtr<WorkQueue> queue;
+ std::function<void (int error)> completionHandler;
+};
+
+static void outputStreamWriteReadyCallback(GOutputStream* stream, GAsyncResult* result, gpointer userData)
+{
+ std::unique_ptr<WriteAsyncData> asyncData(static_cast<WriteAsyncData*>(userData));
+ gssize bytesWritten = g_output_stream_write_finish(stream, result, nullptr);
+ if (bytesWritten == -1) {
+ WorkQueue* queue = asyncData->queue.get();
+ runTaskInQueue([asyncData = WTFMove(asyncData)] {
+ asyncData->completionHandler(-1);
+ }, queue);
+ return;
+ }
+
+ gssize pendingBytesToWrite = asyncData->buffer->length - bytesWritten;
+ if (!pendingBytesToWrite) {
+ WorkQueue* queue = asyncData->queue.get();
+ runTaskInQueue([asyncData = WTFMove(asyncData)] {
+ asyncData->completionHandler(0);
+ }, queue);
+ return;
+ }
+
+ asyncData->buffer = adoptGRef(soup_buffer_new_subbuffer(asyncData->buffer.get(), bytesWritten, pendingBytesToWrite));
+ // Use a local variable for the data buffer to pass it to g_output_stream_write_async(), because WriteAsyncData is released.
+ auto data = asyncData->buffer->data;
+ g_output_stream_write_async(stream, data, pendingBytesToWrite, G_PRIORITY_DEFAULT_IDLE, nullptr,
+ reinterpret_cast<GAsyncReadyCallback>(outputStreamWriteReadyCallback), asyncData.release());
+}
+
+void IOChannel::write(size_t offset, const Data& data, WorkQueue* queue, std::function<void (int error)> completionHandler)
+{
+ RefPtr<IOChannel> channel(this);
+ if (!m_outputStream && !m_ioStream) {
+ runTaskInQueue([channel, completionHandler] {
+ completionHandler(-1);
+ }, queue);
+ return;
+ }
+
+ GOutputStream* stream = m_outputStream ? m_outputStream.get() : g_io_stream_get_output_stream(G_IO_STREAM(m_ioStream.get()));
+ if (!stream) {
+ runTaskInQueue([channel, completionHandler] {
+ completionHandler(-1);
+ }, queue);
+ return;
+ }
+
+ WriteAsyncData* asyncData = new WriteAsyncData { this, data.soupBuffer(), queue, completionHandler };
+ // FIXME: implement offset.
+ g_output_stream_write_async(stream, asyncData->buffer->data, data.size(), G_PRIORITY_DEFAULT_IDLE, nullptr,
+ reinterpret_cast<GAsyncReadyCallback>(outputStreamWriteReadyCallback), asyncData);
+}
+
+} // namespace NetworkCache
+} // namespace WebKit
+
+#endif