diff options
Diffstat (limited to 'Source/WebKit2/NetworkProcess/cache/NetworkCacheIOChannelSoup.cpp')
-rw-r--r-- | Source/WebKit2/NetworkProcess/cache/NetworkCacheIOChannelSoup.cpp | 285 |
1 files changed, 285 insertions, 0 deletions
diff --git a/Source/WebKit2/NetworkProcess/cache/NetworkCacheIOChannelSoup.cpp b/Source/WebKit2/NetworkProcess/cache/NetworkCacheIOChannelSoup.cpp new file mode 100644 index 000000000..ef7ba8998 --- /dev/null +++ b/Source/WebKit2/NetworkProcess/cache/NetworkCacheIOChannelSoup.cpp @@ -0,0 +1,285 @@ +/* + * Copyright (C) 2015 Igalia S.L. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS'' + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, + * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS + * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + * THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include "config.h" +#include "NetworkCacheIOChannel.h" + +#if ENABLE(NETWORK_CACHE) + +#include "NetworkCacheFileSystem.h" +#include <wtf/MainThread.h> +#include <wtf/RunLoop.h> +#include <wtf/glib/GUniquePtr.h> + +namespace WebKit { +namespace NetworkCache { + +static const size_t gDefaultReadBufferSize = 4096; + +IOChannel::IOChannel(const String& filePath, Type type) + : m_path(filePath) + , m_type(type) +{ + auto path = WebCore::fileSystemRepresentation(filePath); + GRefPtr<GFile> file = adoptGRef(g_file_new_for_path(path.data())); + switch (m_type) { + case Type::Create: { + g_file_delete(file.get(), nullptr, nullptr); + m_outputStream = adoptGRef(G_OUTPUT_STREAM(g_file_create(file.get(), static_cast<GFileCreateFlags>(G_FILE_CREATE_PRIVATE), nullptr, nullptr))); +#if !HAVE(STAT_BIRTHTIME) + GUniquePtr<char> birthtimeString(g_strdup_printf("%" G_GUINT64_FORMAT, std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()))); + g_file_set_attribute_string(file.get(), "xattr::birthtime", birthtimeString.get(), G_FILE_QUERY_INFO_NONE, nullptr, nullptr); +#endif + break; + } + case Type::Write: { + m_ioStream = adoptGRef(g_file_open_readwrite(file.get(), nullptr, nullptr)); + break; + } + case Type::Read: + m_inputStream = adoptGRef(G_INPUT_STREAM(g_file_read(file.get(), nullptr, nullptr))); + break; + } +} + +IOChannel::~IOChannel() +{ + RELEASE_ASSERT(!m_wasDeleted.exchange(true)); +} + +Ref<IOChannel> IOChannel::open(const String& filePath, IOChannel::Type type) +{ + return adoptRef(*new IOChannel(filePath, type)); +} + +static inline void runTaskInQueue(Function<void ()>&& task, WorkQueue* queue) +{ + if (queue) { + queue->dispatch(WTFMove(task)); + return; + } + + // Using nullptr as queue submits the result to the main context. + RunLoop::main().dispatch(WTFMove(task)); +} + +static void fillDataFromReadBuffer(SoupBuffer* readBuffer, size_t size, Data& data) +{ + GRefPtr<SoupBuffer> buffer; + if (size != readBuffer->length) { + // The subbuffer does not copy the data. + buffer = adoptGRef(soup_buffer_new_subbuffer(readBuffer, 0, size)); + } else + buffer = readBuffer; + + if (data.isNull()) { + // First chunk, we need to force the data to be copied. + data = { reinterpret_cast<const uint8_t*>(buffer->data), size }; + } else { + Data dataRead(WTFMove(buffer)); + // Concatenate will copy the data. + data = concatenate(data, dataRead); + } +} + +struct ReadAsyncData { + RefPtr<IOChannel> channel; + GRefPtr<SoupBuffer> buffer; + RefPtr<WorkQueue> queue; + size_t bytesToRead; + std::function<void (Data&, int error)> completionHandler; + Data data; +}; + +static void inputStreamReadReadyCallback(GInputStream* stream, GAsyncResult* result, gpointer userData) +{ + std::unique_ptr<ReadAsyncData> asyncData(static_cast<ReadAsyncData*>(userData)); + gssize bytesRead = g_input_stream_read_finish(stream, result, nullptr); + if (bytesRead == -1) { + WorkQueue* queue = asyncData->queue.get(); + runTaskInQueue([asyncData = WTFMove(asyncData)] { + asyncData->completionHandler(asyncData->data, -1); + }, queue); + return; + } + + if (!bytesRead) { + WorkQueue* queue = asyncData->queue.get(); + runTaskInQueue([asyncData = WTFMove(asyncData)] { + asyncData->completionHandler(asyncData->data, 0); + }, queue); + return; + } + + ASSERT(bytesRead > 0); + fillDataFromReadBuffer(asyncData->buffer.get(), static_cast<size_t>(bytesRead), asyncData->data); + + size_t pendingBytesToRead = asyncData->bytesToRead - asyncData->data.size(); + if (!pendingBytesToRead) { + WorkQueue* queue = asyncData->queue.get(); + runTaskInQueue([asyncData = WTFMove(asyncData)] { + asyncData->completionHandler(asyncData->data, 0); + }, queue); + return; + } + + size_t bytesToRead = std::min(pendingBytesToRead, asyncData->buffer->length); + // Use a local variable for the data buffer to pass it to g_input_stream_read_async(), because ReadAsyncData is released. + auto data = const_cast<char*>(asyncData->buffer->data); + g_input_stream_read_async(stream, data, bytesToRead, G_PRIORITY_DEFAULT, nullptr, + reinterpret_cast<GAsyncReadyCallback>(inputStreamReadReadyCallback), asyncData.release()); +} + +void IOChannel::read(size_t offset, size_t size, WorkQueue* queue, std::function<void (Data&, int error)> completionHandler) +{ + RefPtr<IOChannel> channel(this); + if (!m_inputStream) { + runTaskInQueue([channel, completionHandler] { + Data data; + completionHandler(data, -1); + }, queue); + return; + } + + if (!isMainThread()) { + readSyncInThread(offset, size, queue, completionHandler); + return; + } + + size_t bufferSize = std::min(size, gDefaultReadBufferSize); + uint8_t* bufferData = static_cast<uint8_t*>(fastMalloc(bufferSize)); + GRefPtr<SoupBuffer> buffer = adoptGRef(soup_buffer_new_with_owner(bufferData, bufferSize, bufferData, fastFree)); + ReadAsyncData* asyncData = new ReadAsyncData { this, buffer.get(), queue, size, completionHandler, { } }; + + // FIXME: implement offset. + g_input_stream_read_async(m_inputStream.get(), const_cast<char*>(buffer->data), bufferSize, G_PRIORITY_DEFAULT, nullptr, + reinterpret_cast<GAsyncReadyCallback>(inputStreamReadReadyCallback), asyncData); +} + +void IOChannel::readSyncInThread(size_t offset, size_t size, WorkQueue* queue, std::function<void (Data&, int error)> completionHandler) +{ + ASSERT(!isMainThread()); + + RefPtr<IOChannel> channel(this); + detachThread(createThread("IOChannel::readSync", [channel, size, queue, completionHandler] { + size_t bufferSize = std::min(size, gDefaultReadBufferSize); + uint8_t* bufferData = static_cast<uint8_t*>(fastMalloc(bufferSize)); + GRefPtr<SoupBuffer> readBuffer = adoptGRef(soup_buffer_new_with_owner(bufferData, bufferSize, bufferData, fastFree)); + Data data; + size_t pendingBytesToRead = size; + size_t bytesToRead = bufferSize; + do { + // FIXME: implement offset. + gssize bytesRead = g_input_stream_read(channel->m_inputStream.get(), const_cast<char*>(readBuffer->data), bytesToRead, nullptr, nullptr); + if (bytesRead == -1) { + runTaskInQueue([channel, completionHandler] { + Data data; + completionHandler(data, -1); + }, queue); + return; + } + + if (!bytesRead) + break; + + ASSERT(bytesRead > 0); + fillDataFromReadBuffer(readBuffer.get(), static_cast<size_t>(bytesRead), data); + + pendingBytesToRead = size - data.size(); + bytesToRead = std::min(pendingBytesToRead, readBuffer->length); + } while (pendingBytesToRead); + + GRefPtr<SoupBuffer> bufferCapture = data.soupBuffer(); + runTaskInQueue([channel, bufferCapture, completionHandler] { + GRefPtr<SoupBuffer> buffer = bufferCapture; + Data data = { WTFMove(buffer) }; + completionHandler(data, 0); + }, queue); + })); +} + +struct WriteAsyncData { + RefPtr<IOChannel> channel; + GRefPtr<SoupBuffer> buffer; + RefPtr<WorkQueue> queue; + std::function<void (int error)> completionHandler; +}; + +static void outputStreamWriteReadyCallback(GOutputStream* stream, GAsyncResult* result, gpointer userData) +{ + std::unique_ptr<WriteAsyncData> asyncData(static_cast<WriteAsyncData*>(userData)); + gssize bytesWritten = g_output_stream_write_finish(stream, result, nullptr); + if (bytesWritten == -1) { + WorkQueue* queue = asyncData->queue.get(); + runTaskInQueue([asyncData = WTFMove(asyncData)] { + asyncData->completionHandler(-1); + }, queue); + return; + } + + gssize pendingBytesToWrite = asyncData->buffer->length - bytesWritten; + if (!pendingBytesToWrite) { + WorkQueue* queue = asyncData->queue.get(); + runTaskInQueue([asyncData = WTFMove(asyncData)] { + asyncData->completionHandler(0); + }, queue); + return; + } + + asyncData->buffer = adoptGRef(soup_buffer_new_subbuffer(asyncData->buffer.get(), bytesWritten, pendingBytesToWrite)); + // Use a local variable for the data buffer to pass it to g_output_stream_write_async(), because WriteAsyncData is released. + auto data = asyncData->buffer->data; + g_output_stream_write_async(stream, data, pendingBytesToWrite, G_PRIORITY_DEFAULT_IDLE, nullptr, + reinterpret_cast<GAsyncReadyCallback>(outputStreamWriteReadyCallback), asyncData.release()); +} + +void IOChannel::write(size_t offset, const Data& data, WorkQueue* queue, std::function<void (int error)> completionHandler) +{ + RefPtr<IOChannel> channel(this); + if (!m_outputStream && !m_ioStream) { + runTaskInQueue([channel, completionHandler] { + completionHandler(-1); + }, queue); + return; + } + + GOutputStream* stream = m_outputStream ? m_outputStream.get() : g_io_stream_get_output_stream(G_IO_STREAM(m_ioStream.get())); + if (!stream) { + runTaskInQueue([channel, completionHandler] { + completionHandler(-1); + }, queue); + return; + } + + WriteAsyncData* asyncData = new WriteAsyncData { this, data.soupBuffer(), queue, completionHandler }; + // FIXME: implement offset. + g_output_stream_write_async(stream, asyncData->buffer->data, data.size(), G_PRIORITY_DEFAULT_IDLE, nullptr, + reinterpret_cast<GAsyncReadyCallback>(outputStreamWriteReadyCallback), asyncData); +} + +} // namespace NetworkCache +} // namespace WebKit + +#endif |