diff options
Diffstat (limited to 'gio/gdbusprivate.c')
| -rw-r--r-- | gio/gdbusprivate.c | 1040 |
1 files changed, 1040 insertions, 0 deletions
diff --git a/gio/gdbusprivate.c b/gio/gdbusprivate.c new file mode 100644 index 000000000..adc6e12f3 --- /dev/null +++ b/gio/gdbusprivate.c @@ -0,0 +1,1040 @@ +/* GDBus - GLib D-Bus Library + * + * Copyright (C) 2008-2009 Red Hat, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General + * Public License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place, Suite 330, + * Boston, MA 02111-1307, USA. + * + * Author: David Zeuthen <davidz@redhat.com> + */ + +#include "config.h" + +#include <stdlib.h> +#include <string.h> + +#include <glib/gi18n.h> + +#ifdef G_OS_UNIX +#include <gio/gunixconnection.h> +#include <gio/gunixfdmessage.h> +#include "gunixcredentialsmessage.h" +#include <unistd.h> +#endif + +#include "giotypes.h" +#include "gdbusprivate.h" +#include "gdbusmessage.h" +#include "gdbuserror.h" +#include "gdbusintrospection.h" + +/* ---------------------------------------------------------------------------------------------------- */ + +static gchar * +hexdump (const gchar *data, gsize len, guint indent) +{ + guint n, m; + GString *ret; + + ret = g_string_new (NULL); + + for (n = 0; n < len; n += 16) + { + g_string_append_printf (ret, "%*s%04x: ", indent, "", n); + + for (m = n; m < n + 16; m++) + { + if (m > n && (m%4) == 0) + g_string_append_c (ret, ' '); + if (m < len) + g_string_append_printf (ret, "%02x ", (guchar) data[m]); + else + g_string_append (ret, " "); + } + + g_string_append (ret, " "); + + for (m = n; m < len && m < n + 16; m++) + g_string_append_c (ret, g_ascii_isprint (data[m]) ? data[m] : '.'); + + g_string_append_c (ret, '\n'); + } + + return g_string_free (ret, FALSE); +} + +/* ---------------------------------------------------------------------------------------------------- */ + +/* Unfortunately ancillary messages are discarded when reading from a + * socket using the GSocketInputStream abstraction. So we provide a + * very GInputStream-ish API that uses GSocket in this case (very + * similar to GSocketInputStream). + */ + +typedef struct +{ + GSocket *socket; + GCancellable *cancellable; + + void *buffer; + gsize count; + + GSocketControlMessage ***messages; + gint *num_messages; + + GSimpleAsyncResult *simple; + + gboolean from_mainloop; +} ReadWithControlData; + +static void +read_with_control_data_free (ReadWithControlData *data) +{ + g_object_unref (data->socket); + if (data->cancellable != NULL) + g_object_unref (data->cancellable); + g_free (data); +} + +static gboolean +_g_socket_read_with_control_messages_ready (GSocket *socket, + GIOCondition condition, + gpointer user_data) +{ + ReadWithControlData *data = user_data; + GError *error; + gssize result; + GInputVector vector; + + error = NULL; + vector.buffer = data->buffer; + vector.size = data->count; + result = g_socket_receive_message (data->socket, + NULL, /* address */ + &vector, + 1, + data->messages, + data->num_messages, + NULL, + data->cancellable, + &error); + if (result >= 0) + { + g_simple_async_result_set_op_res_gssize (data->simple, result); + } + else + { + g_assert (error != NULL); + g_simple_async_result_set_from_error (data->simple, error); + g_error_free (error); + } + + if (data->from_mainloop) + g_simple_async_result_complete (data->simple); + else + g_simple_async_result_complete_in_idle (data->simple); + + return FALSE; +} + +static void +_g_socket_read_with_control_messages (GSocket *socket, + void *buffer, + gsize count, + GSocketControlMessage ***messages, + gint *num_messages, + gint io_priority, + GCancellable *cancellable, + GAsyncReadyCallback callback, + gpointer user_data) +{ + ReadWithControlData *data; + + data = g_new0 (ReadWithControlData, 1); + data->socket = g_object_ref (socket); + data->cancellable = cancellable != NULL ? g_object_ref (cancellable) : NULL; + data->buffer = buffer; + data->count = count; + data->messages = messages; + data->num_messages = num_messages; + + data->simple = g_simple_async_result_new (G_OBJECT (socket), + callback, + user_data, + _g_socket_read_with_control_messages); + + if (!g_socket_condition_check (socket, G_IO_IN)) + { + GSource *source; + data->from_mainloop = TRUE; + source = g_socket_create_source (data->socket, + G_IO_IN | G_IO_HUP | G_IO_ERR, + cancellable); + g_source_set_callback (source, + (GSourceFunc) _g_socket_read_with_control_messages_ready, + data, + (GDestroyNotify) read_with_control_data_free); + g_source_attach (source, g_main_context_get_thread_default ()); + g_source_unref (source); + } + else + { + _g_socket_read_with_control_messages_ready (data->socket, G_IO_IN, data); + read_with_control_data_free (data); + } +} + +static gssize +_g_socket_read_with_control_messages_finish (GSocket *socket, + GAsyncResult *result, + GError **error) +{ + GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result); + + g_return_val_if_fail (G_IS_SOCKET (socket), -1); + g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _g_socket_read_with_control_messages); + + if (g_simple_async_result_propagate_error (simple, error)) + return -1; + else + return g_simple_async_result_get_op_res_gssize (simple); +} + +/* ---------------------------------------------------------------------------------------------------- */ + +G_LOCK_DEFINE_STATIC (shared_thread_lock); + +typedef struct +{ + gint num_users; + GThread *thread; + GMainContext *context; + GMainLoop *loop; +} SharedThreadData; + +static SharedThreadData *shared_thread_data = NULL; + +static gpointer +shared_thread_func (gpointer data) +{ + g_main_context_push_thread_default (shared_thread_data->context); + g_main_loop_run (shared_thread_data->loop); + g_main_context_pop_thread_default (shared_thread_data->context); + return NULL; +} + +typedef void (*GDBusSharedThreadFunc) (gpointer user_data); + +typedef struct +{ + GDBusSharedThreadFunc func; + gpointer user_data; + gboolean done; +} CallerData; + +static gboolean +invoke_caller (gpointer user_data) +{ + CallerData *data = user_data; + data->func (data->user_data); + data->done = TRUE; + return FALSE; +} + +static void +_g_dbus_shared_thread_ref (GDBusSharedThreadFunc func, + gpointer user_data) +{ + GError *error; + GSource *idle_source; + CallerData *data; + + G_LOCK (shared_thread_lock); + + if (shared_thread_data != NULL) + { + shared_thread_data->num_users += 1; + goto have_thread; + } + + shared_thread_data = g_new0 (SharedThreadData, 1); + shared_thread_data->num_users = 1; + + error = NULL; + shared_thread_data->context = g_main_context_new (); + shared_thread_data->loop = g_main_loop_new (shared_thread_data->context, FALSE); + shared_thread_data->thread = g_thread_create (shared_thread_func, + NULL, + TRUE, + &error); + g_assert_no_error (error); + + have_thread: + + data = g_new0 (CallerData, 1); + data->func = func; + data->user_data = user_data; + data->done = FALSE; + + idle_source = g_idle_source_new (); + g_source_set_priority (idle_source, G_PRIORITY_DEFAULT); + g_source_set_callback (idle_source, + invoke_caller, + data, + NULL); + g_source_attach (idle_source, shared_thread_data->context); + g_source_unref (idle_source); + + /* wait for the user code to run.. hmm.. probably use a condition variable instead */ + while (!data->done) + g_thread_yield (); + + g_free (data); + + G_UNLOCK (shared_thread_lock); +} + +static void +_g_dbus_shared_thread_unref (void) +{ + /* TODO: actually destroy the shared thread here */ +#if 0 + G_LOCK (shared_thread_lock); + g_assert (shared_thread_data != NULL); + shared_thread_data->num_users -= 1; + if (shared_thread_data->num_users == 0) + { + g_main_loop_quit (shared_thread_data->loop); + //g_thread_join (shared_thread_data->thread); + g_main_loop_unref (shared_thread_data->loop); + g_main_context_unref (shared_thread_data->context); + g_free (shared_thread_data); + shared_thread_data = NULL; + G_UNLOCK (shared_thread_lock); + } + else + { + G_UNLOCK (shared_thread_lock); + } +#endif +} + +/* ---------------------------------------------------------------------------------------------------- */ + +struct GDBusWorker +{ + volatile gint ref_count; + gboolean stopped; + GIOStream *stream; + GDBusCapabilityFlags capabilities; + GCancellable *cancellable; + GDBusWorkerMessageReceivedCallback message_received_callback; + GDBusWorkerDisconnectedCallback disconnected_callback; + gpointer user_data; + + GThread *thread; + + /* if not NULL, stream is GSocketConnection */ + GSocket *socket; + + /* used for reading */ + GMutex *read_lock; + gchar *read_buffer; + gsize read_buffer_allocated_size; + gsize read_buffer_cur_size; + gsize read_buffer_bytes_wanted; + GUnixFDList *read_fd_list; + GSocketControlMessage **read_ancillary_messages; + gint read_num_ancillary_messages; + + /* used for writing */ + GMutex *write_lock; + GQueue *write_queue; + gboolean write_is_pending; +}; + +struct _MessageToWriteData ; +typedef struct _MessageToWriteData MessageToWriteData; + +static void message_to_write_data_free (MessageToWriteData *data); + +static GDBusWorker * +_g_dbus_worker_ref (GDBusWorker *worker) +{ + g_atomic_int_inc (&worker->ref_count); + return worker; +} + +static void +_g_dbus_worker_unref (GDBusWorker *worker) +{ + if (g_atomic_int_dec_and_test (&worker->ref_count)) + { + _g_dbus_shared_thread_unref (); + + g_object_unref (worker->stream); + + g_mutex_free (worker->read_lock); + g_object_unref (worker->cancellable); + if (worker->read_fd_list != NULL) + g_object_unref (worker->read_fd_list); + + g_mutex_free (worker->write_lock); + g_queue_foreach (worker->write_queue, + (GFunc) message_to_write_data_free, + NULL); + g_queue_free (worker->write_queue); + g_free (worker); + } +} + +static void +_g_dbus_worker_emit_disconnected (GDBusWorker *worker, + gboolean remote_peer_vanished, + GError *error) +{ + if (!worker->stopped) + worker->disconnected_callback (worker, remote_peer_vanished, error, worker->user_data); +} + +static void +_g_dbus_worker_emit_message (GDBusWorker *worker, + GDBusMessage *message) +{ + if (!worker->stopped) + worker->message_received_callback (worker, message, worker->user_data); +} + +static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker); + +/* called in private thread shared by all GDBusConnection instances (without read-lock held) */ +static void +_g_dbus_worker_do_read_cb (GInputStream *input_stream, + GAsyncResult *res, + gpointer user_data) +{ + GDBusWorker *worker = user_data; + GError *error; + gssize bytes_read; + + g_mutex_lock (worker->read_lock); + + /* If already stopped, don't even process the reply */ + if (worker->stopped) + goto out; + + error = NULL; + if (worker->socket == NULL) + bytes_read = g_input_stream_read_finish (g_io_stream_get_input_stream (worker->stream), + res, + &error); + else + bytes_read = _g_socket_read_with_control_messages_finish (worker->socket, + res, + &error); + if (worker->read_num_ancillary_messages > 0) + { + gint n; + for (n = 0; n < worker->read_num_ancillary_messages; n++) + { + GSocketControlMessage *control_message = G_SOCKET_CONTROL_MESSAGE (worker->read_ancillary_messages[n]); + + if (FALSE) + { + } +#ifdef G_OS_UNIX + else if (G_IS_UNIX_FD_MESSAGE (control_message)) + { + GUnixFDMessage *fd_message; + gint *fds; + gint num_fds; + + fd_message = G_UNIX_FD_MESSAGE (control_message); + fds = g_unix_fd_message_steal_fds (fd_message, &num_fds); + if (worker->read_fd_list == NULL) + { + worker->read_fd_list = g_unix_fd_list_new_from_array (fds, num_fds); + } + else + { + gint n; + for (n = 0; n < num_fds; n++) + { + /* TODO: really want a append_steal() */ + g_unix_fd_list_append (worker->read_fd_list, fds[n], NULL); + close (fds[n]); + } + } + g_free (fds); + } + else if (G_IS_UNIX_CREDENTIALS_MESSAGE (control_message)) + { + /* do nothing */ + } +#endif + else + { + if (error == NULL) + { + g_set_error (&error, + G_IO_ERROR, + G_IO_ERROR_FAILED, + "Unexpected ancillary message of type %s received from peer", + g_type_name (G_TYPE_FROM_INSTANCE (control_message))); + _g_dbus_worker_emit_disconnected (worker, TRUE, error); + g_error_free (error); + g_object_unref (control_message); + n++; + while (n < worker->read_num_ancillary_messages) + g_object_unref (worker->read_ancillary_messages[n++]); + g_free (worker->read_ancillary_messages); + goto out; + } + } + g_object_unref (control_message); + } + g_free (worker->read_ancillary_messages); + } + + if (bytes_read == -1) + { + _g_dbus_worker_emit_disconnected (worker, TRUE, error); + g_error_free (error); + goto out; + } + +#if 0 + g_debug ("read %d bytes (is_closed=%d blocking=%d condition=0x%02x) stream %p, %p", + (gint) bytes_read, + g_socket_is_closed (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))), + g_socket_get_blocking (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream))), + g_socket_condition_check (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream)), + G_IO_IN | G_IO_OUT | G_IO_HUP), + worker->stream, + worker); +#endif + + /* TODO: hmm, hmm... */ + if (bytes_read == 0) + { + g_set_error (&error, + G_IO_ERROR, + G_IO_ERROR_FAILED, + "Underlying GIOStream returned 0 bytes on an async read"); + _g_dbus_worker_emit_disconnected (worker, TRUE, error); + g_error_free (error); + goto out; + } + + worker->read_buffer_cur_size += bytes_read; + if (worker->read_buffer_bytes_wanted == worker->read_buffer_cur_size) + { + /* OK, got what we asked for! */ + if (worker->read_buffer_bytes_wanted == 16) + { + gssize message_len; + /* OK, got the header - determine how many more bytes are needed */ + error = NULL; + message_len = g_dbus_message_bytes_needed ((guchar *) worker->read_buffer, + 16, + &error); + if (message_len == -1) + { + g_warning ("_g_dbus_worker_do_read_cb: error determing bytes needed: %s", error->message); + _g_dbus_worker_emit_disconnected (worker, FALSE, error); + g_error_free (error); + goto out; + } + + worker->read_buffer_bytes_wanted = message_len; + _g_dbus_worker_do_read_unlocked (worker); + } + else + { + GDBusMessage *message; + error = NULL; + + /* TODO: use connection->priv->auth to decode the message */ + + message = g_dbus_message_new_from_blob ((guchar *) worker->read_buffer, + worker->read_buffer_cur_size, + &error); + if (message == NULL) + { + _g_dbus_worker_emit_disconnected (worker, FALSE, error); + g_error_free (error); + goto out; + } + + if (worker->read_fd_list != NULL) + { + g_dbus_message_set_unix_fd_list (message, worker->read_fd_list); + worker->read_fd_list = NULL; + } + + if (G_UNLIKELY (_g_dbus_debug_message ())) + { + gchar *s; + g_print ("========================================================================\n" + "GDBus-debug:Message:\n" + " <<<< RECEIVED D-Bus message (%" G_GSIZE_FORMAT " bytes)\n", + worker->read_buffer_cur_size); + s = g_dbus_message_print (message, 2); + g_print ("%s", s); + g_free (s); + s = hexdump (worker->read_buffer, worker->read_buffer_cur_size, 2); + g_print ("%s\n", s); + g_free (s); + } + + /* yay, got a message, go deliver it */ + _g_dbus_worker_emit_message (worker, message); + g_object_unref (message); + + /* start reading another message! */ + worker->read_buffer_bytes_wanted = 0; + worker->read_buffer_cur_size = 0; + _g_dbus_worker_do_read_unlocked (worker); + } + } + else + { + /* didn't get all the bytes we requested - so repeat the request... */ + _g_dbus_worker_do_read_unlocked (worker); + } + + out: + g_mutex_unlock (worker->read_lock); + + /* gives up the reference acquired when calling g_input_stream_read_async() */ + _g_dbus_worker_unref (worker); +} + +/* called in private thread shared by all GDBusConnection instances (with read-lock held) */ +static void +_g_dbus_worker_do_read_unlocked (GDBusWorker *worker) +{ + /* if bytes_wanted is zero, it means start reading a message */ + if (worker->read_buffer_bytes_wanted == 0) + { + worker->read_buffer_cur_size = 0; + worker->read_buffer_bytes_wanted = 16; + } + + /* ensure we have a (big enough) buffer */ + if (worker->read_buffer == NULL || worker->read_buffer_bytes_wanted > worker->read_buffer_allocated_size) + { + /* TODO: 4096 is randomly chosen; might want a better chosen default minimum */ + worker->read_buffer_allocated_size = MAX (worker->read_buffer_bytes_wanted, 4096); + worker->read_buffer = g_realloc (worker->read_buffer, worker->read_buffer_allocated_size); + } + + if (worker->socket == NULL) + g_input_stream_read_async (g_io_stream_get_input_stream (worker->stream), + worker->read_buffer + worker->read_buffer_cur_size, + worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size, + G_PRIORITY_DEFAULT, + worker->cancellable, + (GAsyncReadyCallback) _g_dbus_worker_do_read_cb, + _g_dbus_worker_ref (worker)); + else + { + worker->read_ancillary_messages = NULL; + worker->read_num_ancillary_messages = 0; + _g_socket_read_with_control_messages (worker->socket, + worker->read_buffer + worker->read_buffer_cur_size, + worker->read_buffer_bytes_wanted - worker->read_buffer_cur_size, + &worker->read_ancillary_messages, + &worker->read_num_ancillary_messages, + G_PRIORITY_DEFAULT, + worker->cancellable, + (GAsyncReadyCallback) _g_dbus_worker_do_read_cb, + _g_dbus_worker_ref (worker)); + } +} + +/* called in private thread shared by all GDBusConnection instances (without read-lock held) */ +static void +_g_dbus_worker_do_read (GDBusWorker *worker) +{ + g_mutex_lock (worker->read_lock); + _g_dbus_worker_do_read_unlocked (worker); + g_mutex_unlock (worker->read_lock); +} + +/* ---------------------------------------------------------------------------------------------------- */ + +struct _MessageToWriteData +{ + GDBusMessage *message; + gchar *blob; + gsize blob_size; +}; + +static void +message_to_write_data_free (MessageToWriteData *data) +{ + g_object_unref (data->message); + g_free (data->blob); + g_free (data); +} + +/* ---------------------------------------------------------------------------------------------------- */ + +/* called in private thread shared by all GDBusConnection instances (with write-lock held) */ +static gboolean +write_message (GDBusWorker *worker, + MessageToWriteData *data, + GError **error) +{ + gboolean ret; + + g_return_val_if_fail (data->blob_size > 16, FALSE); + + ret = FALSE; + + /* First, the initial 16 bytes - special case UNIX sockets here + * since it may involve writing an ancillary message with file + * descriptors + */ +#ifdef G_OS_UNIX + { + GOutputVector vector; + GSocketControlMessage *message; + GUnixFDList *fd_list; + gssize bytes_written; + + fd_list = g_dbus_message_get_unix_fd_list (data->message); + + message = NULL; + if (fd_list != NULL) + { + if (!G_IS_UNIX_CONNECTION (worker->stream)) + { + g_set_error (error, + G_IO_ERROR, + G_IO_ERROR_INVALID_ARGUMENT, + "Tried sending a file descriptor on unsupported stream of type %s", + g_type_name (G_TYPE_FROM_INSTANCE (worker->stream))); + goto out; + } + else if (!(worker->capabilities & G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING)) + { + g_set_error_literal (error, + G_IO_ERROR, + G_IO_ERROR_INVALID_ARGUMENT, + "Tried sending a file descriptor but remote peer does not support this capability"); + goto out; + } + message = g_unix_fd_message_new_with_fd_list (fd_list); + } + + vector.buffer = data->blob; + vector.size = 16; + + bytes_written = g_socket_send_message (worker->socket, + NULL, /* address */ + &vector, + 1, + message != NULL ? &message : NULL, + message != NULL ? 1 : 0, + G_SOCKET_MSG_NONE, + worker->cancellable, + error); + if (bytes_written == -1) + { + g_prefix_error (error, _("Error writing first 16 bytes of message to socket: ")); + if (message != NULL) + g_object_unref (message); + goto out; + } + if (message != NULL) + g_object_unref (message); + + if (bytes_written < 16) + { + /* TODO: I think this needs to be handled ... are we guaranteed that the ancillary + * messages are sent? + */ + g_assert_not_reached (); + } + } +#else + /* write the first 16 bytes (guaranteed to return an error if everything can't be written) */ + if (!g_output_stream_write_all (g_io_stream_get_output_stream (worker->stream), + (const gchar *) data->blob, + 16, + NULL, /* bytes_written */ + worker->cancellable, /* cancellable */ + error)) + goto out; +#endif + + /* Then write the rest of the message (guaranteed to return an error if everything can't be written) */ + if (!g_output_stream_write_all (g_io_stream_get_output_stream (worker->stream), + (const gchar *) data->blob + 16, + data->blob_size - 16, + NULL, /* bytes_written */ + worker->cancellable, /* cancellable */ + error)) + goto out; + + ret = TRUE; + + if (G_UNLIKELY (_g_dbus_debug_message ())) + { + gchar *s; + g_print ("========================================================================\n" + "GDBus-debug:Message:\n" + " >>>> SENT D-Bus message (%" G_GSIZE_FORMAT " bytes)\n", + data->blob_size); + s = g_dbus_message_print (data->message, 2); + g_print ("%s", s); + g_free (s); + s = hexdump (data->blob, data->blob_size, 2); + g_print ("%s\n", s); + g_free (s); + } + + out: + return ret; +} + +/* ---------------------------------------------------------------------------------------------------- */ + +/* called in private thread shared by all GDBusConnection instances (without write-lock held) */ +static gboolean +write_message_in_idle_cb (gpointer user_data) +{ + GDBusWorker *worker = user_data; + gboolean more_writes_are_pending; + MessageToWriteData *data; + GError *error; + + g_mutex_lock (worker->write_lock); + + data = g_queue_pop_head (worker->write_queue); + g_assert (data != NULL); + + error = NULL; + if (!write_message (worker, + data, + &error)) + { + /* TODO: handle */ + _g_dbus_worker_emit_disconnected (worker, TRUE, error); + g_error_free (error); + } + message_to_write_data_free (data); + + more_writes_are_pending = (g_queue_get_length (worker->write_queue) > 0); + + worker->write_is_pending = more_writes_are_pending; + g_mutex_unlock (worker->write_lock); + + return more_writes_are_pending; +} + +/* ---------------------------------------------------------------------------------------------------- */ + +/* can be called from any thread - steals blob */ +void +_g_dbus_worker_send_message (GDBusWorker *worker, + GDBusMessage *message, + gchar *blob, + gsize blob_len) +{ + MessageToWriteData *data; + + g_return_if_fail (G_IS_DBUS_MESSAGE (message)); + g_return_if_fail (blob != NULL); + g_return_if_fail (blob_len > 16); + + data = g_new0 (MessageToWriteData, 1); + data->message = g_object_ref (message); + data->blob = blob; /* steal! */ + data->blob_size = blob_len; + + g_mutex_lock (worker->write_lock); + g_queue_push_tail (worker->write_queue, data); + if (!worker->write_is_pending) + { + GSource *idle_source; + + worker->write_is_pending = TRUE; + + idle_source = g_idle_source_new (); + g_source_set_priority (idle_source, G_PRIORITY_DEFAULT); + g_source_set_callback (idle_source, + write_message_in_idle_cb, + _g_dbus_worker_ref (worker), + (GDestroyNotify) _g_dbus_worker_unref); + g_source_attach (idle_source, shared_thread_data->context); + g_source_unref (idle_source); + } + g_mutex_unlock (worker->write_lock); +} + +/* ---------------------------------------------------------------------------------------------------- */ + +static void +_g_dbus_worker_thread_begin_func (gpointer user_data) +{ + GDBusWorker *worker = user_data; + + worker->thread = g_thread_self (); + + /* begin reading */ + _g_dbus_worker_do_read (worker); +} + +GDBusWorker * +_g_dbus_worker_new (GIOStream *stream, + GDBusCapabilityFlags capabilities, + GDBusWorkerMessageReceivedCallback message_received_callback, + GDBusWorkerDisconnectedCallback disconnected_callback, + gpointer user_data) +{ + GDBusWorker *worker; + + g_return_val_if_fail (G_IS_IO_STREAM (stream), NULL); + g_return_val_if_fail (message_received_callback != NULL, NULL); + g_return_val_if_fail (disconnected_callback != NULL, NULL); + + worker = g_new0 (GDBusWorker, 1); + worker->ref_count = 1; + + worker->read_lock = g_mutex_new (); + worker->message_received_callback = message_received_callback; + worker->disconnected_callback = disconnected_callback; + worker->user_data = user_data; + worker->stream = g_object_ref (stream); + worker->capabilities = capabilities; + worker->cancellable = g_cancellable_new (); + + worker->write_lock = g_mutex_new (); + worker->write_queue = g_queue_new (); + + if (G_IS_SOCKET_CONNECTION (worker->stream)) + worker->socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker->stream)); + + _g_dbus_shared_thread_ref (_g_dbus_worker_thread_begin_func, worker); + + return worker; +} + +/* This can be called from any thread - frees worker - guarantees no callbacks + * will ever be issued again + */ +void +_g_dbus_worker_stop (GDBusWorker *worker) +{ + /* If we're called in the worker thread it means we are called from + * a worker callback. This is fine, we just can't lock in that case since + * we're already holding the lock... + */ + if (g_thread_self () != worker->thread) + g_mutex_lock (worker->read_lock); + worker->stopped = TRUE; + if (g_thread_self () != worker->thread) + g_mutex_unlock (worker->read_lock); + + g_cancellable_cancel (worker->cancellable); + _g_dbus_worker_unref (worker); +} + +#define G_DBUS_DEBUG_AUTHENTICATION (1<<0) +#define G_DBUS_DEBUG_MESSAGE (1<<1) +#define G_DBUS_DEBUG_ALL 0xffffffff +static gint _gdbus_debug_flags = 0; + +gboolean +_g_dbus_debug_authentication (void) +{ + _g_dbus_initialize (); + return (_gdbus_debug_flags & G_DBUS_DEBUG_AUTHENTICATION) != 0; +} + +gboolean +_g_dbus_debug_message (void) +{ + _g_dbus_initialize (); + return (_gdbus_debug_flags & G_DBUS_DEBUG_MESSAGE) != 0; +} + +/** + * _g_dbus_initialize: + * + * Does various one-time init things such as + * + * - registering the G_DBUS_ERROR error domain + * - parses the G_DBUS_DEBUG environment variable + */ +void +_g_dbus_initialize (void) +{ + static volatile gsize initialized = 0; + + if (g_once_init_enter (&initialized)) + { + volatile GQuark g_dbus_error_domain; + const gchar *debug; + + g_dbus_error_domain = G_DBUS_ERROR; + + debug = g_getenv ("G_DBUS_DEBUG"); + if (debug != NULL) + { + gchar **tokens; + guint n; + tokens = g_strsplit (debug, ",", 0); + for (n = 0; tokens[n] != NULL; n++) + { + if (g_strcmp0 (tokens[n], "authentication") == 0) + _gdbus_debug_flags |= G_DBUS_DEBUG_AUTHENTICATION; + else if (g_strcmp0 (tokens[n], "message") == 0) + _gdbus_debug_flags |= G_DBUS_DEBUG_MESSAGE; + else if (g_strcmp0 (tokens[n], "all") == 0) + _gdbus_debug_flags |= G_DBUS_DEBUG_ALL; + } + g_strfreev (tokens); + } + + g_once_init_leave (&initialized, 1); + } +} + +/* ---------------------------------------------------------------------------------------------------- */ + +gchar * +_g_dbus_compute_complete_signature (GDBusArgInfo **args, + gboolean include_parentheses) +{ + GString *s; + guint n; + + if (include_parentheses) + s = g_string_new ("("); + else + s = g_string_new (""); + if (args != NULL) + for (n = 0; args[n] != NULL; n++) + g_string_append (s, args[n]->signature); + + if (include_parentheses) + g_string_append_c (s, ')'); + + return g_string_free (s, FALSE); +} |
