summaryrefslogtreecommitdiff
path: root/gdata/gdata-upload-stream.c
diff options
context:
space:
mode:
Diffstat (limited to 'gdata/gdata-upload-stream.c')
-rw-r--r--gdata/gdata-upload-stream.c200
1 files changed, 197 insertions, 3 deletions
diff --git a/gdata/gdata-upload-stream.c b/gdata/gdata-upload-stream.c
index e636bdf5..079cab65 100644
--- a/gdata/gdata-upload-stream.c
+++ b/gdata/gdata-upload-stream.c
@@ -202,17 +202,33 @@ struct _GDataUploadStreamPrivate {
GCancellable *cancellable;
GThread *network_thread;
+#if GLIB_CHECK_VERSION (2, 31, 0)
+ GMutex write_mutex; /* mutex for write operations (specifically, write_finished) */
+#else
GStaticMutex write_mutex; /* mutex for write operations (specifically, write_finished) */
+#endif
gsize message_bytes_outstanding; /* the number of bytes which have been written to the buffer but not libsoup (signalled by write_cond) */
gsize network_bytes_outstanding; /* the number of bytes which have been written to libsoup but not the network (signalled by write_cond) */
gsize network_bytes_written; /* the number of bytes which have been written to the network (signalled by write_cond) */
+#if GLIB_CHECK_VERSION (2, 31, 0)
+ GCond write_cond; /* signalled when a chunk has been written (protected by write_mutex) */
+#else
GCond *write_cond; /* signalled when a chunk has been written (protected by write_mutex) */
+#endif
gboolean finished; /* set once the upload thread has finished (protected by response_mutex) */
guint response_status; /* set once we finish receiving the response (SOUP_STATUS_NONE otherwise) (protected by response_mutex) */
+#if GLIB_CHECK_VERSION (2, 31, 0)
+ GCond finished_cond; /* signalled when sending the message (and receiving the response) is finished (protected by response_mutex) */
+#else
GCond *finished_cond; /* signalled when sending the message (and receiving the response) is finished (protected by response_mutex) */
+#endif
GError *response_error; /* error asynchronously set by the network thread, and picked up by the main thread when appropriate */
+#if GLIB_CHECK_VERSION (2, 31, 0)
+ GMutex response_mutex; /* mutex for ->response_error, ->response_status and ->finished_cond */
+#else
GStaticMutex response_mutex; /* mutex for ->response_error, ->response_status and ->finished_cond */
+#endif
};
enum {
@@ -367,10 +383,17 @@ gdata_upload_stream_init (GDataUploadStream *self)
{
self->priv = G_TYPE_INSTANCE_GET_PRIVATE (self, GDATA_TYPE_UPLOAD_STREAM, GDataUploadStreamPrivate);
self->priv->buffer = gdata_buffer_new ();
+#if GLIB_CHECK_VERSION (2, 31, 0)
+ g_mutex_init (&(self->priv->write_mutex));
+ g_cond_init (&(self->priv->write_cond));
+ g_cond_init (&(self->priv->finished_cond));
+ g_mutex_init (&(self->priv->response_mutex));
+#else
g_static_mutex_init (&(self->priv->write_mutex));
self->priv->write_cond = g_cond_new ();
self->priv->finished_cond = g_cond_new ();
g_static_mutex_init (&(self->priv->response_mutex));
+#endif
}
static GObject *
@@ -459,10 +482,17 @@ gdata_upload_stream_finalize (GObject *object)
{
GDataUploadStreamPrivate *priv = GDATA_UPLOAD_STREAM (object)->priv;
+#if GLIB_CHECK_VERSION (2, 31, 0)
+ g_mutex_clear (&(priv->response_mutex));
+ g_cond_clear (&(priv->finished_cond));
+ g_cond_clear (&(priv->write_cond));
+ g_mutex_clear (&(priv->write_mutex));
+#else
g_static_mutex_free (&(priv->response_mutex));
g_cond_free (priv->finished_cond);
g_cond_free (priv->write_cond);
g_static_mutex_free (&(priv->write_mutex));
+#endif
gdata_buffer_free (priv->buffer);
g_clear_error (&(priv->response_error));
g_free (priv->upload_uri);
@@ -561,10 +591,17 @@ write_cancelled_cb (GCancellable *cancellable, CancelledData *data)
GDataUploadStreamPrivate *priv = data->upload_stream->priv;
/* Signal the gdata_upload_stream_write() function that it should stop blocking and cancel */
+#if GLIB_CHECK_VERSION (2, 31, 0)
+ g_mutex_lock (&(priv->write_mutex));
+ *(data->cancelled) = TRUE;
+ g_cond_signal (&(priv->write_cond));
+ g_mutex_unlock (&(priv->write_mutex));
+#else
g_static_mutex_lock (&(priv->write_mutex));
*(data->cancelled) = TRUE;
g_cond_signal (priv->write_cond);
g_static_mutex_unlock (&(priv->write_mutex));
+#endif
}
static gssize
@@ -587,12 +624,20 @@ gdata_upload_stream_write (GOutputStream *stream, const void *buffer, gsize coun
cancelled_signal = g_cancellable_connect (cancellable, (GCallback) write_cancelled_cb, &data, NULL);
/* Check for an error and return if necessary */
+#if GLIB_CHECK_VERSION (2, 31, 0)
+ g_mutex_lock (&(priv->write_mutex));
+#else
g_static_mutex_lock (&(priv->write_mutex));
+#endif
if (cancelled == TRUE) {
g_assert (g_cancellable_set_error_if_cancelled (cancellable, error) == TRUE ||
g_cancellable_set_error_if_cancelled (priv->cancellable, error) == TRUE);
+#if GLIB_CHECK_VERSION (2, 31, 0)
+ g_mutex_unlock (&(priv->write_mutex));
+#else
g_static_mutex_unlock (&(priv->write_mutex));
+#endif
length_written = -1;
goto done;
@@ -603,7 +648,11 @@ gdata_upload_stream_write (GOutputStream *stream, const void *buffer, gsize coun
old_network_bytes_written = priv->network_bytes_written;
priv->message_bytes_outstanding += count;
+#if GLIB_CHECK_VERSION (2, 31, 0)
+ g_mutex_unlock (&(priv->write_mutex));
+#else
g_static_mutex_unlock (&(priv->write_mutex));
+#endif
/* Handle the more common case of the network thread already having been created first */
if (priv->network_thread != NULL) {
@@ -629,9 +678,15 @@ gdata_upload_stream_write (GOutputStream *stream, const void *buffer, gsize coun
soup_message_body_append (priv->message->request_body, SOUP_MEMORY_TAKE, entry_xml, strlen (entry_xml));
soup_message_body_append (priv->message->request_body, SOUP_MEMORY_TAKE, second_part_header, strlen (second_part_header));
+#if GLIB_CHECK_VERSION (2, 31, 0)
+ g_mutex_lock (&(priv->write_mutex));
+ priv->network_bytes_outstanding += priv->message->request_body->length;
+ g_mutex_unlock (&(priv->write_mutex));
+#else
g_static_mutex_lock (&(priv->write_mutex));
priv->network_bytes_outstanding += priv->message->request_body->length;
g_static_mutex_unlock (&(priv->write_mutex));
+#endif
}
/* Also write out the first chunk of data, so there's guaranteed to be something in the buffer */
@@ -645,11 +700,20 @@ gdata_upload_stream_write (GOutputStream *stream, const void *buffer, gsize coun
}
write:
+#if GLIB_CHECK_VERSION (2, 31, 0)
+ g_mutex_lock (&(priv->write_mutex));
+#else
g_static_mutex_lock (&(priv->write_mutex));
+#endif
/* Wait for it to be written */
- while (priv->network_bytes_written - old_network_bytes_written < count && cancelled == FALSE)
+ while (priv->network_bytes_written - old_network_bytes_written < count && cancelled == FALSE) {
+#if GLIB_CHECK_VERSION (2, 31, 0)
+ g_cond_wait (&(priv->write_cond), &(priv->write_mutex));
+#else
g_cond_wait (priv->write_cond, g_static_mutex_get_mutex (&(priv->write_mutex)));
+#endif
+ }
length_written = MIN (count, priv->network_bytes_written - old_network_bytes_written);
/* Check for an error and return if necessary */
@@ -659,7 +723,11 @@ write:
length_written = -1;
}
+#if GLIB_CHECK_VERSION (2, 31, 0)
+ g_mutex_unlock (&(priv->write_mutex));
+#else
g_static_mutex_unlock (&(priv->write_mutex));
+#endif
done:
/* Disconnect from the cancelled signals. Note that we have to do this with @write_mutex not held, as g_cancellable_disconnect() blocks
@@ -680,10 +748,17 @@ flush_cancelled_cb (GCancellable *cancellable, CancelledData *data)
GDataUploadStreamPrivate *priv = data->upload_stream->priv;
/* Signal the gdata_upload_stream_flush() function that it should stop blocking and cancel */
+#if GLIB_CHECK_VERSION (2, 31, 0)
+ g_mutex_lock (&(priv->write_mutex));
+ *(data->cancelled) = TRUE;
+ g_cond_signal (&(priv->write_cond));
+ g_mutex_unlock (&(priv->write_mutex));
+#else
g_static_mutex_lock (&(priv->write_mutex));
*(data->cancelled) = TRUE;
g_cond_signal (priv->write_cond);
g_static_mutex_unlock (&(priv->write_mutex));
+#endif
}
/* Block until ->network_bytes_outstanding reaches zero. Cancelling the cancellable passed to gdata_upload_stream_flush() breaks out of the wait(),
@@ -710,11 +785,20 @@ gdata_upload_stream_flush (GOutputStream *stream, GCancellable *cancellable, GEr
cancelled_signal = g_cancellable_connect (cancellable, (GCallback) flush_cancelled_cb, &data, NULL);
/* Start the flush operation proper */
+#if GLIB_CHECK_VERSION (2, 31, 0)
+ g_mutex_lock (&(priv->write_mutex));
+#else
g_static_mutex_lock (&(priv->write_mutex));
+#endif
/* Wait for all outstanding bytes to be written to the network */
- while (priv->network_bytes_outstanding > 0 && cancelled == FALSE)
+ while (priv->network_bytes_outstanding > 0 && cancelled == FALSE) {
+#if GLIB_CHECK_VERSION (2, 31, 0)
+ g_cond_wait (&(priv->write_cond), &(priv->write_mutex));
+#else
g_cond_wait (priv->write_cond, g_static_mutex_get_mutex (&(priv->write_mutex)));
+#endif
+ }
/* Check for an error and return if necessary */
if (cancelled == TRUE) {
@@ -723,7 +807,11 @@ gdata_upload_stream_flush (GOutputStream *stream, GCancellable *cancellable, GEr
success = FALSE;
}
+#if GLIB_CHECK_VERSION (2, 31, 0)
+ g_mutex_unlock (&(priv->write_mutex));
+#else
g_static_mutex_unlock (&(priv->write_mutex));
+#endif
/* Disconnect from the cancelled signals. Note that we have to do this without @write_mutex held, as g_cancellable_disconnect() blocks
* until any outstanding cancellation callbacks return, and they will block on @write_mutex. */
@@ -741,10 +829,17 @@ close_cancelled_cb (GCancellable *cancellable, CancelledData *data)
GDataUploadStreamPrivate *priv = data->upload_stream->priv;
/* Signal the gdata_upload_stream_close() function that it should stop blocking and cancel */
+#if GLIB_CHECK_VERSION (2, 31, 0)
+ g_mutex_lock (&(priv->response_mutex));
+ *(data->cancelled) = TRUE;
+ g_cond_signal (&(priv->finished_cond));
+ g_mutex_unlock (&(priv->response_mutex));
+#else
g_static_mutex_lock (&(priv->response_mutex));
*(data->cancelled) = TRUE;
g_cond_signal (priv->finished_cond);
g_static_mutex_unlock (&(priv->response_mutex));
+#endif
}
/* It's guaranteed that we have set ->response_status and ->response_error and are done with *all* network activity before this returns, unless it's
@@ -779,13 +874,27 @@ gdata_upload_stream_close (GOutputStream *stream, GCancellable *cancellable, GEr
return TRUE;
/* If we've already closed the stream, return G_IO_ERROR_CLOSED */
+#if GLIB_CHECK_VERSION (2, 31, 0)
+ g_mutex_lock (&(priv->response_mutex));
+#else
g_static_mutex_lock (&(priv->response_mutex));
+#endif
+
if (priv->response_status != SOUP_STATUS_NONE) {
+#if GLIB_CHECK_VERSION (2, 31, 0)
+ g_mutex_unlock (&(priv->response_mutex));
+#else
g_static_mutex_unlock (&(priv->response_mutex));
+#endif
g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED, _("Stream is already closed"));
return FALSE;
}
+
+#if GLIB_CHECK_VERSION (2, 31, 0)
+ g_mutex_unlock (&(priv->response_mutex));
+#else
g_static_mutex_unlock (&(priv->response_mutex));
+#endif
/* Allow cancellation */
data.upload_stream = GDATA_UPLOAD_STREAM (stream);
@@ -796,7 +905,11 @@ gdata_upload_stream_close (GOutputStream *stream, GCancellable *cancellable, GEr
if (cancellable != NULL)
cancelled_signal = g_cancellable_connect (cancellable, (GCallback) close_cancelled_cb, &data, NULL);
+#if GLIB_CHECK_VERSION (2, 31, 0)
+ g_mutex_lock (&(priv->response_mutex));
+#else
g_static_mutex_lock (&(priv->response_mutex));
+#endif
/* If an operation is still in progress, the upload thread hasn't finished yet… */
if (priv->finished == FALSE) {
@@ -807,9 +920,15 @@ gdata_upload_stream_close (GOutputStream *stream, GCancellable *cancellable, GEr
gdata_buffer_push_data (priv->buffer, (const guint8*) footer, footer_length);
+#if GLIB_CHECK_VERSION (2, 31, 0)
+ g_mutex_lock (&(priv->write_mutex));
+ priv->message_bytes_outstanding += footer_length;
+ g_mutex_unlock (&(priv->write_mutex));
+#else
g_static_mutex_lock (&(priv->write_mutex));
priv->message_bytes_outstanding += footer_length;
g_static_mutex_unlock (&(priv->write_mutex));
+#endif
}
/* Mark the buffer as having reached EOF, and the write operation will close in its own time */
@@ -818,8 +937,13 @@ gdata_upload_stream_close (GOutputStream *stream, GCancellable *cancellable, GEr
/* Wait for the signal that we've finished. Cancelling the call to gdata_upload_stream_close() will cause this wait to be aborted,
* but won't actually prevent the stream being closed (i.e. all it means is that the stream isn't guaranteed to have been closed by
* the time gdata_upload_stream_close() returns — whereas normally it would be). */
- if (cancelled == FALSE)
+ if (cancelled == FALSE) {
+#if GLIB_CHECK_VERSION (2, 31, 0)
+ g_cond_wait (&(priv->finished_cond), &(priv->response_mutex));
+#else
g_cond_wait (priv->finished_cond, g_static_mutex_get_mutex (&(priv->response_mutex)));
+#endif
+ }
}
g_assert (priv->response_status == SOUP_STATUS_NONE);
@@ -849,7 +973,11 @@ gdata_upload_stream_close (GOutputStream *stream, GCancellable *cancellable, GEr
g_assert (priv->response_status != SOUP_STATUS_NONE && (SOUP_STATUS_IS_SUCCESSFUL (priv->response_status) || child_error != NULL));
+#if GLIB_CHECK_VERSION (2, 31, 0)
+ g_mutex_unlock (&(priv->response_mutex));
+#else
g_static_mutex_unlock (&(priv->response_mutex));
+#endif
/* Disconnect from the signal handler. Note that we have to do this with @response_mutex not held, as g_cancellable_disconnect() blocks
* until any outstanding cancellation callbacks return, and they will block on @response_mutex. */
@@ -884,15 +1012,27 @@ write_next_chunk (GDataUploadStream *self, SoupMessage *message)
gboolean reached_eof = FALSE;
guint8 next_buffer[CHUNK_SIZE];
+#if GLIB_CHECK_VERSION (2, 31, 0)
+ g_mutex_lock (&(priv->write_mutex));
+#else
g_static_mutex_lock (&(priv->write_mutex));
+#endif
/* If there are still bytes in libsoup's buffer, don't block on getting new bytes into the stream */
if (priv->network_bytes_outstanding > 0) {
+#if GLIB_CHECK_VERSION (2, 31, 0)
+ g_mutex_unlock (&(priv->write_mutex));
+#else
g_static_mutex_unlock (&(priv->write_mutex));
+#endif
return;
}
+#if GLIB_CHECK_VERSION (2, 31, 0)
+ g_mutex_unlock (&(priv->write_mutex));
+#else
g_static_mutex_unlock (&(priv->write_mutex));
+#endif
/* Append the next chunk to the message body so it can join in the fun.
* Note that this call isn't blocking, and can return less than the CHUNK_SIZE. This is because
@@ -901,10 +1041,17 @@ write_next_chunk (GDataUploadStream *self, SoupMessage *message)
* stream, so we'd happily block on receiving more bytes which weren't forthcoming. */
length = gdata_buffer_pop_data_limited (priv->buffer, next_buffer, CHUNK_SIZE, &reached_eof);
+#if GLIB_CHECK_VERSION (2, 31, 0)
+ g_mutex_lock (&(priv->write_mutex));
+ priv->message_bytes_outstanding -= length;
+ priv->network_bytes_outstanding += length;
+ g_mutex_unlock (&(priv->write_mutex));
+#else
g_static_mutex_lock (&(priv->write_mutex));
priv->message_bytes_outstanding -= length;
priv->network_bytes_outstanding += length;
g_static_mutex_unlock (&(priv->write_mutex));
+#endif
/* Append whatever data was returned */
if (length > 0)
@@ -912,9 +1059,15 @@ write_next_chunk (GDataUploadStream *self, SoupMessage *message)
/* Finish off the request body if we've reached EOF (i.e. the stream has been closed) */
if (reached_eof == TRUE) {
+#if GLIB_CHECK_VERSION (2, 31, 0)
+ g_mutex_lock (&(priv->write_mutex));
+ g_assert (priv->message_bytes_outstanding == 0);
+ g_mutex_unlock (&(priv->write_mutex));
+#else
g_static_mutex_lock (&(priv->write_mutex));
g_assert (priv->message_bytes_outstanding == 0);
g_static_mutex_unlock (&(priv->write_mutex));
+#endif
soup_message_body_complete (priv->message->request_body);
}
@@ -928,9 +1081,15 @@ wrote_headers_cb (SoupMessage *message, GDataUploadStream *self)
GDataUploadStreamPrivate *priv = self->priv;
/* Signal the main thread that the headers have been written */
+#if GLIB_CHECK_VERSION (2, 31, 0)
+ g_mutex_lock (&(priv->write_mutex));
+ g_cond_signal (&(priv->write_cond));
+ g_mutex_unlock (&(priv->write_mutex));
+#else
g_static_mutex_lock (&(priv->write_mutex));
g_cond_signal (priv->write_cond);
g_static_mutex_unlock (&(priv->write_mutex));
+#endif
/* Send the first chunk to libsoup */
write_next_chunk (self, message);
@@ -942,12 +1101,21 @@ wrote_body_data_cb (SoupMessage *message, SoupBuffer *buffer, GDataUploadStream
GDataUploadStreamPrivate *priv = self->priv;
/* Signal the main thread that the chunk has been written */
+#if GLIB_CHECK_VERSION (2, 31, 0)
+ g_mutex_lock (&(priv->write_mutex));
+ g_assert (priv->network_bytes_outstanding > 0);
+ priv->network_bytes_outstanding -= buffer->length;
+ priv->network_bytes_written += buffer->length;
+ g_cond_signal (&(priv->write_cond));
+ g_mutex_unlock (&(priv->write_mutex));
+#else
g_static_mutex_lock (&(priv->write_mutex));
g_assert (priv->network_bytes_outstanding > 0);
priv->network_bytes_outstanding -= buffer->length;
priv->network_bytes_written += buffer->length;
g_cond_signal (priv->write_cond);
g_static_mutex_unlock (&(priv->write_mutex));
+#endif
/* Send the next chunk to libsoup */
write_next_chunk (self, message);
@@ -969,16 +1137,30 @@ upload_thread (GDataUploadStream *self)
_gdata_service_actually_send_message (priv->session, priv->message, priv->cancellable, NULL);
/* Signal write_cond, just in case we errored out and finished sending in the middle of a write */
+#if GLIB_CHECK_VERSION (2, 31, 0)
+ g_mutex_lock (&(priv->response_mutex));
+ g_mutex_lock (&(priv->write_mutex));
+ if (priv->message_bytes_outstanding > 0 || priv->network_bytes_outstanding > 0) {
+ g_cond_signal (&(priv->write_cond));
+ }
+ g_mutex_unlock (&(priv->write_mutex));
+#else
g_static_mutex_lock (&(priv->response_mutex));
g_static_mutex_lock (&(priv->write_mutex));
if (priv->message_bytes_outstanding > 0 || priv->network_bytes_outstanding > 0)
g_cond_signal (priv->write_cond);
g_static_mutex_unlock (&(priv->write_mutex));
+#endif
/* Signal that the operation has finished */
priv->finished = TRUE;
+#if GLIB_CHECK_VERSION (2, 31, 0)
+ g_cond_signal (&(priv->finished_cond));
+ g_mutex_unlock (&(priv->response_mutex));
+#else
g_cond_signal (priv->finished_cond);
g_static_mutex_unlock (&(priv->response_mutex));
+#endif
g_object_unref (self);
@@ -991,7 +1173,11 @@ create_network_thread (GDataUploadStream *self, GError **error)
GDataUploadStreamPrivate *priv = self->priv;
g_assert (priv->network_thread == NULL);
+#if GLIB_CHECK_VERSION (2, 31, 0)
+ priv->network_thread = g_thread_try_new ("upload-thread", (GThreadFunc) upload_thread, self, error);
+#else
priv->network_thread = g_thread_create ((GThreadFunc) upload_thread, self, TRUE, error);
+#endif
}
/**
@@ -1088,7 +1274,11 @@ gdata_upload_stream_get_response (GDataUploadStream *self, gssize *length)
g_return_val_if_fail (GDATA_IS_UPLOAD_STREAM (self), NULL);
+#if GLIB_CHECK_VERSION (2, 31, 0)
+ g_mutex_lock (&(self->priv->response_mutex));
+#else
g_static_mutex_lock (&(self->priv->response_mutex));
+#endif
if (self->priv->response_status == SOUP_STATUS_NONE) {
/* We can't touch the message until the network thread has finished using it, since it isn't threadsafe */
@@ -1104,7 +1294,11 @@ gdata_upload_stream_get_response (GDataUploadStream *self, gssize *length)
_response = self->priv->message->response_body->data;
}
+#if GLIB_CHECK_VERSION (2, 31, 0)
+ g_mutex_unlock (&(self->priv->response_mutex));
+#else
g_static_mutex_unlock (&(self->priv->response_mutex));
+#endif
if (length != NULL)
*length = _length;