diff options
Diffstat (limited to 'gio/gdbusprivate.c')
-rw-r--r-- | gio/gdbusprivate.c | 425 |
1 files changed, 303 insertions, 122 deletions
diff --git a/gio/gdbusprivate.c b/gio/gdbusprivate.c index 0e5bef2c8..c01de77e7 100644 --- a/gio/gdbusprivate.c +++ b/gio/gdbusprivate.c @@ -45,6 +45,9 @@ #include "gsocketoutputstream.h" #ifdef G_OS_UNIX +#ifdef KDBUS_TRANSPORT +#include "gkdbusconnection.h" +#endif #include "gunixfdmessage.h" #include "gunixconnection.h" #include "gunixcredentialsmessage.h" @@ -117,6 +120,63 @@ typedef struct gboolean from_mainloop; } ReadWithControlData; +#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT) +typedef struct +{ + GKdbus *kdbus; + GCancellable *cancellable; + + void *buffer; + gsize count; + + GSimpleAsyncResult *simple; + + gboolean from_mainloop; +} ReadKdbusData; + +static void +read_kdbus_data_free (ReadKdbusData *data) +{ + g_object_unref (data->kdbus); + if (data->cancellable != NULL) + g_object_unref (data->cancellable); + g_object_unref (data->simple); + g_free (data); +} + +static gboolean +_g_kdbus_read_ready (GKdbus *kdbus, + GIOCondition condition, + gpointer user_data) +{ + ReadKdbusData *data = user_data; + GError *error; + gssize result; + + error = NULL; + + result = g_kdbus_receive (data->kdbus, + data->buffer, + &error); + if (result >= 0) + { + g_simple_async_result_set_op_res_gssize (data->simple, result); + } + else + { + g_assert (error != NULL); + g_simple_async_result_take_error (data->simple, error); + } + + if (data->from_mainloop) + g_simple_async_result_complete (data->simple); + else + g_simple_async_result_complete_in_idle (data->simple); + + return FALSE; +} +#endif + static void read_with_control_data_free (ReadWithControlData *data) { @@ -167,6 +227,43 @@ _g_socket_read_with_control_messages_ready (GSocket *socket, return FALSE; } +#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT) +static void +_g_kdbus_read (GKdbus *kdbus, + void *buffer, + gsize count, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + ReadKdbusData *data; + GSource *source; + + data = g_new0 (ReadKdbusData, 1); + data->kdbus = g_object_ref (kdbus); + data->cancellable = cancellable != NULL ? g_object_ref (cancellable) : NULL; + data->buffer = buffer; + data->count = count; + + data->simple = g_simple_async_result_new (G_OBJECT (kdbus), + callback, + user_data, + _g_kdbus_read); + g_simple_async_result_set_check_cancellable (data->simple, cancellable); + + data->from_mainloop = TRUE; + source = g_kdbus_create_source (data->kdbus, + G_IO_IN | G_IO_HUP | G_IO_ERR, + cancellable); + g_source_set_callback (source, + (GSourceFunc) _g_kdbus_read_ready, + data, + (GDestroyNotify) read_kdbus_data_free); + g_source_attach (source, g_main_context_get_thread_default ()); + g_source_unref (source); +} +#endif + static void _g_socket_read_with_control_messages (GSocket *socket, void *buffer, @@ -215,6 +312,24 @@ _g_socket_read_with_control_messages (GSocket *socket, } } +#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT) +static gssize +_g_kdbus_read_finish (GKdbus *kdbus, + GAsyncResult *result, + GError **error) +{ + GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result); + + g_return_val_if_fail (G_IS_KDBUS (kdbus), -1); + g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _g_kdbus_read); + + if (g_simple_async_result_propagate_error (simple, error)) + return -1; + else + return g_simple_async_result_get_op_res_gssize (simple); +} +#endif + static gssize _g_socket_read_with_control_messages_finish (GSocket *socket, GAsyncResult *result, @@ -364,8 +479,11 @@ struct GDBusWorker GDBusWorkerDisconnectedCallback disconnected_callback; gpointer user_data; - /* if not NULL, stream is GSocketConnection */ + /* if GSocket and GKdbus are NULL, stream is GSocketConnection */ GSocket *socket; +#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT) + GKdbus *kdbus; +#endif /* used for reading */ GMutex read_lock; @@ -506,7 +624,7 @@ _g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker *worker, } /* can only be called from private thread with read-lock held - takes ownership of @message */ -static void +void _g_dbus_worker_queue_or_deliver_received_message (GDBusWorker *worker, GDBusMessage *message) { @@ -584,7 +702,16 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream, goto out; error = NULL; +#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT) + if (G_IS_KDBUS_CONNECTION (worker->stream)) + { + bytes_read = _g_kdbus_read_finish (worker->kdbus, + res, + &error); + } else if (worker->socket == NULL) +#else if (worker->socket == NULL) +#endif bytes_read = g_input_stream_read_finish (g_io_stream_get_input_stream (worker->stream), res, &error); @@ -722,6 +849,13 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream, read_message_print_transport_debug (bytes_read, worker); worker->read_buffer_cur_size += bytes_read; + +#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT) + /* For KDBUS transport we don't have to read message header */ + if (G_IS_KDBUS_CONNECTION (worker->stream)) + worker->read_buffer_bytes_wanted = worker->read_buffer_cur_size; +#endif + if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size) { /* OK, got what we asked for! */ @@ -841,12 +975,38 @@ _g_dbus_worker_do_read_unlocked (GDBusWorker *worker) /* ensure we have a (big enough) buffer */ if (worker->read_buffer == NULL || worker->read_buffer_bytes_wanted > worker->read_buffer_allocated_size) { - /* TODO: 4096 is randomly chosen; might want a better chosen default minimum */ - worker->read_buffer_allocated_size = MAX (worker->read_buffer_bytes_wanted, 4096); - worker->read_buffer = g_realloc (worker->read_buffer, worker->read_buffer_allocated_size); +#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT) + if (G_IS_KDBUS_CONNECTION (worker->stream)) + { + /* For KDBUS transport we have to alloc buffer only once - DBUS_MAXIMUM_MESSAGE_LENGTH=2^27 */ + worker->read_buffer_allocated_size = MAX (worker->read_buffer_bytes_wanted, 134217728); + worker->read_buffer = g_realloc (worker->read_buffer, worker->read_buffer_allocated_size); + } + else + { +#endif + /* TODO: 4096 is randomly chosen; might want a better chosen default minimum */ + worker->read_buffer_allocated_size = MAX (worker->read_buffer_bytes_wanted, 4096); + worker->read_buffer = g_realloc (worker->read_buffer, worker->read_buffer_allocated_size); +#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT) + } +#endif } +#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT) + if (G_IS_KDBUS_CONNECTION (worker->stream)) + { + _g_kdbus_read(worker->kdbus, + worker->read_buffer, + worker->read_buffer_bytes_wanted, + worker->cancellable, + (GAsyncReadyCallback) _g_dbus_worker_do_read_cb, + _g_dbus_worker_ref (worker)); + + } else if (worker->socket == NULL) +#else if (worker->socket == NULL) +#endif g_input_stream_read_async (g_io_stream_get_input_stream (worker->stream), worker->read_buffer + worker->read_buffer_cur_size, worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size, @@ -986,131 +1146,144 @@ write_message_continue_writing (MessageToWriteData *data) { GOutputStream *ostream; #ifdef G_OS_UNIX - GSimpleAsyncResult *simple; GUnixFDList *fd_list; -#endif - -#ifdef G_OS_UNIX - /* Note: we can't access data->simple after calling g_async_result_complete () because the - * callback can free @data and we're not completing in idle. So use a copy of the pointer. - */ + GSimpleAsyncResult *simple; simple = data->simple; #endif - ostream = g_io_stream_get_output_stream (data->worker->stream); -#ifdef G_OS_UNIX - fd_list = g_dbus_message_get_unix_fd_list (data->message); -#endif - - g_assert (!g_output_stream_has_pending (ostream)); - g_assert_cmpint (data->total_written, <, data->blob_size); - - if (FALSE) +#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT) + if (G_IS_KDBUS_CONNECTION (data->worker->stream)) { + GError *error; + error = NULL; + data->total_written = g_kdbus_send_message(data->worker, data->worker->kdbus, data->message, data->blob, data->blob_size, &error); + + write_message_print_transport_debug (data->total_written, data); + g_simple_async_result_complete (simple); + g_object_unref (simple); + goto out; } -#ifdef G_OS_UNIX - else if (G_IS_SOCKET_OUTPUT_STREAM (ostream) && data->total_written == 0) + else { - GOutputVector vector; - GSocketControlMessage *control_message; - gssize bytes_written; - GError *error; - - vector.buffer = data->blob; - vector.size = data->blob_size; +#endif + ostream = g_io_stream_get_output_stream (data->worker->stream); +#ifdef G_OS_UNIX + fd_list = g_dbus_message_get_unix_fd_list (data->message); +#endif - control_message = NULL; - if (fd_list != NULL && g_unix_fd_list_get_length (fd_list) > 0) - { - if (!(data->worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING)) - { - g_simple_async_result_set_error (simple, - G_IO_ERROR, - G_IO_ERROR_FAILED, - "Tried sending a file descriptor but remote peer does not support this capability"); - g_simple_async_result_complete (simple); - g_object_unref (simple); - goto out; - } - control_message = g_unix_fd_message_new_with_fd_list (fd_list); - } + g_assert (!g_output_stream_has_pending (ostream)); + g_assert_cmpint (data->total_written, <, data->blob_size); - error = NULL; - bytes_written = g_socket_send_message (data->worker->socket, - NULL, /* address */ - &vector, - 1, - control_message != NULL ? &control_message : NULL, - control_message != NULL ? 1 : 0, - G_SOCKET_MSG_NONE, - data->worker->cancellable, - &error); - if (control_message != NULL) - g_object_unref (control_message); - - if (bytes_written == -1) + if (FALSE) { - /* Handle WOULD_BLOCK by waiting until there's room in the buffer */ - if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) - { - GSource *source; - source = g_socket_create_source (data->worker->socket, - G_IO_OUT | G_IO_HUP | G_IO_ERR, - data->worker->cancellable); - g_source_set_callback (source, - (GSourceFunc) on_socket_ready, - data, - NULL); /* GDestroyNotify */ - g_source_attach (source, g_main_context_get_thread_default ()); - g_source_unref (source); - g_error_free (error); - goto out; - } - g_simple_async_result_take_error (simple, error); - g_simple_async_result_complete (simple); - g_object_unref (simple); - goto out; } - g_assert (bytes_written > 0); /* zero is never returned */ - - write_message_print_transport_debug (bytes_written, data); - - data->total_written += bytes_written; - g_assert (data->total_written <= data->blob_size); - if (data->total_written == data->blob_size) +#ifdef G_OS_UNIX + else if (G_IS_SOCKET_OUTPUT_STREAM (ostream) && data->total_written == 0) { - g_simple_async_result_complete (simple); - g_object_unref (simple); - goto out; - } - - write_message_continue_writing (data); - } + GOutputVector vector; + GSocketControlMessage *control_message; + gssize bytes_written; + GError *error; + + vector.buffer = data->blob; + vector.size = data->blob_size; + + control_message = NULL; + if (fd_list != NULL && g_unix_fd_list_get_length (fd_list) > 0) + { + if (!(data->worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING)) + { + g_simple_async_result_set_error (simple, + G_IO_ERROR, + G_IO_ERROR_FAILED, + "Tried sending a file descriptor but remote peer does not support this capability"); + g_simple_async_result_complete (simple); + g_object_unref (simple); + goto out; + } + control_message = g_unix_fd_message_new_with_fd_list (fd_list); + } + + error = NULL; + bytes_written = g_socket_send_message (data->worker->socket, + NULL, /* address */ + &vector, + 1, + control_message != NULL ? &control_message : NULL, + control_message != NULL ? 1 : 0, + G_SOCKET_MSG_NONE, + data->worker->cancellable, + &error); + if (control_message != NULL) + g_object_unref (control_message); + + if (bytes_written == -1) + { + /* Handle WOULD_BLOCK by waiting until there's room in the buffer */ + if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) + { + GSource *source; + source = g_socket_create_source (data->worker->socket, + G_IO_OUT | G_IO_HUP | G_IO_ERR, + data->worker->cancellable); + g_source_set_callback (source, + (GSourceFunc) on_socket_ready, + data, + NULL); /* GDestroyNotify */ + g_source_attach (source, g_main_context_get_thread_default ()); + g_source_unref (source); + g_error_free (error); + goto out; + } + g_simple_async_result_take_error (simple, error); + g_simple_async_result_complete (simple); + g_object_unref (simple); + goto out; + } + g_assert (bytes_written > 0); /* zero is never returned */ + + write_message_print_transport_debug (bytes_written, data); + + data->total_written += bytes_written; + g_assert (data->total_written <= data->blob_size); + if (data->total_written == data->blob_size) + { + g_simple_async_result_complete (simple); + g_object_unref (simple); + goto out; + } + + write_message_continue_writing (data); + } #endif - else - { + else + { #ifdef G_OS_UNIX - if (fd_list != NULL) - { - g_simple_async_result_set_error (simple, - G_IO_ERROR, - G_IO_ERROR_FAILED, - "Tried sending a file descriptor on unsupported stream of type %s", - g_type_name (G_TYPE_FROM_INSTANCE (ostream))); - g_simple_async_result_complete (simple); - g_object_unref (simple); - goto out; - } + if (fd_list != NULL) + { + g_simple_async_result_set_error (simple, + G_IO_ERROR, + G_IO_ERROR_FAILED, + "Tried sending a file descriptor on unsupported stream of type %s", + g_type_name (G_TYPE_FROM_INSTANCE (ostream))); + g_simple_async_result_complete (simple); + g_object_unref (simple); + goto out; + } #endif - g_output_stream_write_async (ostream, - (const gchar *) data->blob + data->total_written, - data->blob_size - data->total_written, - G_PRIORITY_DEFAULT, - data->worker->cancellable, - write_message_async_cb, - data); + g_output_stream_write_async (ostream, + (const gchar *) data->blob + data->total_written, + data->blob_size - data->total_written, + G_PRIORITY_DEFAULT, + data->worker->cancellable, + write_message_async_cb, + data); + } +#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT) } +#endif + #ifdef G_OS_UNIX out: #endif @@ -1452,9 +1625,14 @@ continue_writing (GDBusWorker *worker) worker->close_expected = TRUE; worker->output_pending = PENDING_CLOSE; - g_io_stream_close_async (worker->stream, G_PRIORITY_DEFAULT, - NULL, iostream_close_cb, - _g_dbus_worker_ref (worker)); +#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT) + if (G_IS_KDBUS_CONNECTION (worker->stream)) + g_kdbus_connection_close (worker->stream, NULL, NULL); + else +#endif + g_io_stream_close_async (worker->stream, G_PRIORITY_DEFAULT, + NULL, iostream_close_cb, + _g_dbus_worker_ref (worker)); } else { @@ -1677,6 +1855,11 @@ _g_dbus_worker_new (GIOStream *stream, if (G_IS_SOCKET_CONNECTION (worker->stream)) worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream)); +#if defined (G_OS_UNIX) && (KDBUS_TRANSPORT) + if (G_IS_KDBUS_CONNECTION (worker->stream)) + worker->kdbus = g_kdbus_connection_get_kdbus (G_KDBUS_CONNECTION (worker->stream)); +#endif + worker->shared_thread_data = _g_dbus_shared_thread_ref (); /* begin reading */ @@ -2156,12 +2339,11 @@ write_message_print_transport_debug (gssize bytes_written, g_print ("========================================================================\n" "GDBus-debug:Transport:\n" " >>>> WROTE %" G_GSIZE_FORMAT " bytes of message with serial %d and\n" - " size %" G_GSIZE_FORMAT " from offset %" G_GSIZE_FORMAT " on a %s\n", + " size %" G_GSIZE_FORMAT " from offset %" G_GSIZE_FORMAT "\n", bytes_written, g_dbus_message_get_serial (data->message), data->blob_size, - data->total_written, - g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data->worker->stream)))); + data->total_written); _g_dbus_debug_print_unlock (); out: ; @@ -2207,12 +2389,11 @@ read_message_print_transport_debug (gssize bytes_read, g_print ("========================================================================\n" "GDBus-debug:Transport:\n" " <<<< READ %" G_GSIZE_FORMAT " bytes of message with serial %d and\n" - " size %d to offset %" G_GSIZE_FORMAT " from a %s\n", + " size %d to offset %" G_GSIZE_FORMAT "\n", bytes_read, serial, message_length, - worker->read_buffer_cur_size, - g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker->stream)))); + worker->read_buffer_cur_size); _g_dbus_debug_print_unlock (); out: ; |