diff options
author | Nicolas Dufresne <nicolas.dufresne@collabora.co.uk> | 2011-03-14 16:57:18 -0400 |
---|---|---|
committer | Nicolas Dufresne <nicolas.dufresne@collabora.co.uk> | 2011-03-14 16:57:18 -0400 |
commit | ab912305daaaa5bdc5029c21347641646ad2068c (patch) | |
tree | 3c7d514b013d76cd6eefb03a673700ea2276fa56 /telepathy-logger/text-channel.c | |
parent | b07dfc15a92c3e6c435508f036a4dbec104411de (diff) | |
download | telepathy-logger-ab912305daaaa5bdc5029c21347641646ad2068c.tar.gz |
Removing pending message caching
Diffstat (limited to 'telepathy-logger/text-channel.c')
-rw-r--r-- | telepathy-logger/text-channel.c | 467 |
1 files changed, 2 insertions, 465 deletions
diff --git a/telepathy-logger/text-channel.c b/telepathy-logger/text-channel.c index 6e7cca6..8a84512 100644 --- a/telepathy-logger/text-channel.c +++ b/telepathy-logger/text-channel.c @@ -33,7 +33,6 @@ #include "entity-internal.h" #include "event-internal.h" #include "log-manager-internal.h" -#include "log-store-sqlite-internal.h" #include "observer-internal.h" #include "text-event.h" #include "text-event-internal.h" @@ -421,9 +420,7 @@ on_received_signal_cb (TpChannel *proxy, TplTextChannel *tpl_text = TPL_TEXT_CHANNEL (proxy); TpConnection *tp_conn; TplEntity *remote; - TplLogStore *index = _tpl_log_store_sqlite_dup (); const gchar *channel_path = tp_proxy_get_object_path (TP_PROXY (tpl_text)); - gchar *log_id; ReceivedData *data; /* TODO use the Message iface to check the delivery @@ -442,29 +439,9 @@ on_received_signal_cb (TpChannel *proxy, return; } - /* Check if log_id has already been logged - * - * FIXME: There is a race condition for which, right after a 'NewChannel' - * signal is raised and a message is received, the 'received' signal handler - * may be cateched before or being slower and arriving after the TplChannel - * preparation (in which pending message list is examined) - * - * Workaround: - * In the first case the analisys of P.M.L will detect that actually the - * handler has already received and logged the message. - * In the latter (here), the handler will detect that the P.M.L analisys - * has found and logged it, returning immediatly */ - log_id = _tpl_create_message_token (channel_path, timestamp, msg_id); - if (_tpl_log_store_sqlite_log_id_is_present (index, log_id)) - { - PATH_DEBUG (tpl_text, "%s found, not logging", log_id); - g_free (log_id); - goto out; - } - data = g_slice_new0 (ReceivedData); data->msg_id = msg_id; - data->log_id = log_id; + data->log_id = _tpl_create_message_token (channel_path, timestamp, msg_id); data->type = type; data->text = g_strdup (text); data->timestamp = timestamp; @@ -485,9 +462,6 @@ on_received_signal_cb (TpChannel *proxy, keepon_on_receiving_signal (tpl_text, remote, data); received_data_free (data); } - -out: - g_object_unref (index); } @@ -609,39 +583,6 @@ on_lost_message_cb (TpChannel *proxy, } -/* Signal's Callbacks */ -static void -on_pending_messages_removed_cb (TpChannel *proxy, - const GArray *message_ids, - gpointer user_data, - GObject *weak_object) -{ - TplLogStore *cache = _tpl_log_store_sqlite_dup (); - guint i; - GError *error = NULL; - - for (i = 0; i < message_ids->len; ++i) - { - guint msg_id = g_array_index (message_ids, guint, i); - _tpl_log_store_sqlite_set_acknowledgment_by_msg_id (cache, proxy, msg_id, - &error); - if (error != NULL) - { - PATH_DEBUG (proxy, "cannot set the ACK flag for msg_id %u: %s", - msg_id, error->message); - g_clear_error (&error); - } - else - { - PATH_DEBUG (proxy, "msg_id %d acknowledged", msg_id); - } - } - - if (cache != NULL) - g_object_unref (cache); -} - - static void pendingproc_connect_message_signals (TplActionChain *ctx, gpointer user_data) @@ -669,13 +610,6 @@ pendingproc_connect_message_signals (TplActionChain *ctx, on_lost_message_cb, tpl_text, NULL, NULL, &error) == NULL) goto disaster; - if (tp_proxy_has_interface_by_id (tpl_text, - TP_IFACE_QUARK_CHANNEL_INTERFACE_MESSAGES) && - tp_cli_channel_interface_messages_connect_to_pending_messages_removed ( - channel, on_pending_messages_removed_cb, NULL, NULL, - G_OBJECT (tpl_text), &error) == NULL) - goto disaster; - _tpl_action_chain_continue (ctx); return; @@ -686,398 +620,6 @@ disaster: } -/* Clean up passed messages (GList of tokens), which are known to be stale, - * setting them acknowledged in SQLite */ -static void -tpl_text_channel_clean_up_stale_tokens (TplTextChannel *self, - GList *stale_tokens) -{ - TplLogStore *cache = _tpl_log_store_sqlite_dup (); - GError *loc_error = NULL; - - for (; stale_tokens != NULL; stale_tokens = g_list_next (stale_tokens)) - { - gchar *log_id = stale_tokens->data; - - _tpl_log_store_sqlite_set_acknowledgment (cache, log_id, &loc_error); - - if (loc_error != NULL) - { - PATH_CRITICAL (self, "Unable to set %s as acknoledged in " - "TPL DB: %s", log_id, loc_error->message); - g_clear_error (&loc_error); - } - } - - if (cache != NULL) - g_object_unref (cache); -} - - -/* PendingMessages CB for Message interface */ -static void -got_message_pending_messages_cb (TpProxy *proxy, - const GValue *out_Value, - const GError *error, - gpointer user_data, - GObject *weak_object) -{ - const gchar *channel_path = tp_proxy_get_object_path (proxy); - TplLogStore *cache = _tpl_log_store_sqlite_dup (); - TplActionChain *ctx = user_data; - GPtrArray *result = NULL; - GList *cached_pending_msgs = NULL; - GError *loc_error = NULL; - guint i; - - if (!TPL_IS_TEXT_CHANNEL (proxy)) - { - CRITICAL ("Passed proxy is not a proper TplTextChannel"); - goto out; - } - - if (!TPL_IS_TEXT_CHANNEL (weak_object)) - { - CRITICAL ("Passed weak_object is not a proper TplTextChannel"); - goto out; - } - - if (error != NULL) - { - PATH_CRITICAL (weak_object, "retrieving messages for Message iface: %s", - error->message); - goto out; - } - - /* It's aaa{vs}, a list of message each containing a list of message's parts - * each containing a dictioanry k:v */ - result = g_value_get_boxed (out_Value); - - /* getting messages ids known to be pending at last TPL exit */ - cached_pending_msgs = _tpl_log_store_sqlite_get_pending_messages (cache, - TP_CHANNEL (proxy), &loc_error); - if (loc_error != NULL) - { - CRITICAL ("Unable to obtain pending messages stored in TPL DB: %s", - loc_error->message); - goto out; - } - - /* cycle the list of messages */ - if (result->len > 0) - PATH_DEBUG (proxy, "Checking if there are any un-logged messages among " - "%d pending messages", result->len); - for (i = 0; i < result->len; ++i) - { - GPtrArray *message_parts; - GHashTable *message_headers; /* string:gvalue */ - GHashTable *message_part; /* string:gvalue */ - GList *l; - const gchar *message_token; - gchar *tpl_message_token; - gint64 message_timestamp; - guint message_type = TP_CHANNEL_TEXT_MESSAGE_TYPE_NORMAL; - guint message_flags = 0; - guint message_id; - TpHandle message_sender_handle; - const gchar *message_body; - gboolean valid; - - /* list of message's parts */ - message_parts = g_ptr_array_index (result, i); - /* message part 0 is the message's headers */ - message_headers = g_ptr_array_index (message_parts, 0); - /* message part 1 is is the first part, the most 'faithful' among - * alternatives. - * TODO fully support alternatives and attachments/images - * related to them */ - message_part = g_ptr_array_index (message_parts, 1); - message_token = tp_asv_get_string (message_headers, "message-token"); - message_id = tp_asv_get_uint32 (message_headers, "pending-message-id", - &valid); - if (!valid) - { - DEBUG ("pending-message-id not in a valid range, setting to " - "UNKNOWN"); - message_id = TPL_TEXT_EVENT_MSG_ID_UNKNOWN; - } - message_timestamp = tp_asv_get_int64 (message_headers, - "message-received", NULL); - - tpl_message_token = _tpl_create_message_token (channel_path, - message_timestamp, message_id); - - message_sender_handle = tp_asv_get_uint32 (message_headers, - "message-sender", NULL); - - message_type = tp_asv_get_uint32 (message_headers, "message-type", - &valid); - if (!valid) - { - PATH_DEBUG (proxy, "message-type not in a valid range, falling " - "back to type=NORMAL"); - message_type = TP_CHANNEL_TEXT_MESSAGE_TYPE_NORMAL; - } - - if (tp_asv_get_boolean (message_headers, "rescued", &valid) && valid) - message_flags |= TP_CHANNEL_TEXT_MESSAGE_FLAG_RESCUED; - - if (tp_asv_get_boolean (message_headers, "scrollback", NULL) && valid) - message_flags |= TP_CHANNEL_TEXT_MESSAGE_FLAG_SCROLLBACK; - - message_body = tp_asv_get_string (message_part, "content"); - - /* log only log-ids not in cached_pending_msgs -> not already logged */ - l = g_list_find_custom (cached_pending_msgs, tpl_message_token, - (GCompareFunc) g_strcmp0); - - if (l == NULL) - { - /* call the received signal callback to trigger the message storing */ - /* FIXME Avoid converting gint64 timestamp into guint timestamp */ - on_received_signal_cb (TP_CHANNEL (proxy), - message_id, (guint) message_timestamp, message_sender_handle, - message_type, message_flags, message_body, - NULL, NULL); - } - else - { - /* the message has been already logged, remove it from the list so - * that, in the end of the loop, the items still in - * cached_pending_msgs can be considered stale */ - g_free (l->data); - cached_pending_msgs = g_list_delete_link (cached_pending_msgs, l); - } - - g_free (tpl_message_token); - } - - /* At this point all remaining elements of cached_pending_msgs are those - * that the TplLogStoreSqlite knew as pending but currently not - * listed as such in the current pending message list -> stale */ - tpl_text_channel_clean_up_stale_tokens (TPL_TEXT_CHANNEL (proxy), - cached_pending_msgs); - while (cached_pending_msgs != NULL) - { - PATH_DEBUG (proxy, "%s is stale, removed from DB", - (gchar *) cached_pending_msgs->data); - - g_free (cached_pending_msgs->data); - cached_pending_msgs = g_list_delete_link (cached_pending_msgs, - cached_pending_msgs); - } - -out: - if (cache != NULL) - g_object_unref (cache); - - if (loc_error != NULL) - g_error_free (loc_error); - -/* If an error occured, do not terminate(), just have it logged. - * terminate() would be fatal for TplChannel preparation, - * but in this case it would just mean that it couldn't retrieve pending - * messages, but it might still log the rest. If the next operation in chain - * fails, it's fatal. Partial data loss is better than total data loss */ - _tpl_action_chain_continue (ctx); -} - - -/* PendingMessages CB for Text interface */ -static void -got_text_pending_messages_cb (TpChannel *proxy, - const GPtrArray *result, - const GError *error, - gpointer user_data, - GObject *weak_object) -{ - TplLogStore *cache = _tpl_log_store_sqlite_dup (); - TplActionChain *ctx = user_data; - GList *cached_pending_msgs, *l; - const gchar *channel_path; - GError *loc_error = NULL; - guint i; - - if (error != NULL) - { - PATH_CRITICAL (proxy, "retrieving pending messages for Text iface: %s", - error->message); - _tpl_action_chain_terminate (ctx); - return; - } - - channel_path = tp_proxy_get_object_path (proxy); - - /* getting messages ids known to be pending at last TPL exit */ - cached_pending_msgs = _tpl_log_store_sqlite_get_pending_messages (cache, - TP_CHANNEL (proxy), &loc_error); - - if (loc_error != NULL) - { - PATH_CRITICAL (proxy, - "Unable to obtain pending messages stored in TPL DB: %s", - loc_error->message); - _tpl_action_chain_terminate (ctx); - - return; - } - - PATH_DEBUG (proxy, "%d pending message(s) for Text iface", result->len); - for (i = 0; i < result->len; ++i) - { - GValueArray *message_struct; - const gchar *message_body; - gchar *tpl_message_token; - guint message_id; - guint message_timestamp; - guint from_handle; - guint message_type; - guint message_flags; - - message_struct = g_ptr_array_index (result, i); - - tp_value_array_unpack (message_struct, 6, - &message_id, - &message_timestamp, - &from_handle, - &message_type, - &message_flags, - &message_body); - - tpl_message_token = _tpl_create_message_token (channel_path, - message_timestamp, message_id); - - /* log only log-ids not in cached_pending_msgs -> not already logged */ - l = g_list_find_custom (cached_pending_msgs, tpl_message_token, - (GCompareFunc) g_strcmp0); - - if (l == NULL) - { - /* call the received signal callback to trigger the message storing */ - on_received_signal_cb (proxy, message_id, message_timestamp, - from_handle, message_type, message_flags, message_body, - NULL, NULL); - } - else - { - /* the message has been already logged, remove it from the list so - * that, in the end of the loop, the items still in - * cached_pending_msgs can be considered stale */ - g_free (l->data); - cached_pending_msgs = g_list_delete_link (cached_pending_msgs, l); - } - - g_free (tpl_message_token); - } - - /* At this point all remaining elements of cached_pending_msgs are those - * that the TplLogStoreSqlite knew as pending but currently not - * listed as such in the current pending message list -> stale */ - tpl_text_channel_clean_up_stale_tokens (TPL_TEXT_CHANNEL (proxy), - cached_pending_msgs); - while (cached_pending_msgs != NULL) - { - PATH_DEBUG (proxy, "%s is stale, removed from DB", - (gchar *) cached_pending_msgs->data); - - g_free (cached_pending_msgs->data); - cached_pending_msgs = g_list_delete_link (cached_pending_msgs, - cached_pending_msgs); - } - - _tpl_action_chain_continue (ctx); -} - - -static void -pendingproc_get_pending_messages (TplActionChain *ctx, - gpointer user_data) -{ - TplTextChannel *chan_text = _tpl_action_chain_get_object (ctx); - - if (tp_proxy_has_interface_by_id (chan_text, - TP_IFACE_QUARK_CHANNEL_INTERFACE_MESSAGES)) - tp_cli_dbus_properties_call_get (chan_text, -1, - TP_IFACE_CHANNEL_INTERFACE_MESSAGES, "PendingMessages", - got_message_pending_messages_cb, ctx, NULL, - G_OBJECT (chan_text)); - else - tp_cli_channel_type_text_call_list_pending_messages (TP_CHANNEL (chan_text), - -1, FALSE, got_text_pending_messages_cb, ctx, NULL, NULL); -} - - -/* Cleans up stale log-ids in the index logstore. - * - * It 'brutally' considers as stale all log-ids which timestamp is older than - * <time_limit> days AND are still not set as acknowledged. - * - * NOTE: While retrieving open channels, a partial clean-up for the channel's - * stale pending messages is done. It's not enough, since it doesn't consider - * all the channels being closed at retrieval time. This function tries to - * catch stale ids in the rest of the DB, heuristically. - * - * It is wrong to consider all the log-ids not having an channel currently - * open as stale, since a channel might be temporarely disconnected and - * reconnected and some protocols might repropose not acknowledged messages on - * reconnection. We need to consider only reasonably old log-ids. - * - * This function is meant only to reduce the size of the DB used for indexing. - * - * No _tpl_action_chain_terminate() is called if some fatal error occurs since - * it's not considered a crucial point for TplChannel preparation. - */ -#if 0 -static void -pendingproc_cleanup_pending_messages_db (TplActionChain *ctx, - gpointer user_data) -{ - /* FIXME: https://bugs.freedesktop.org/show_bug.cgi?id=28791 */ - /* five days ago in seconds */ - TplTextChannel *self = _tpl_action_chain_get_object (ctx); - const time_t time_limit = _tpl_time_get_current () - - TPL_LOG_STORE_SQLITE_CLEANUP_DELTA_LIMIT; - TplLogStore *cache = _tpl_log_store_sqlite_dup (); - GList *l; - GError *error = NULL; - - if (cache == NULL) - { - DEBUG ("Unable to obtain the TplLogStoreIndex singleton"); - goto out; - } - - l = _tpl_log_store_sqlite_get_log_ids (cache, NULL, time_limit, - &error); - if (error != NULL) - { - DEBUG ("unable to obtain log-id in Index DB: %s", error->message); - g_error_free (error); - /* do not call _tpl_action_chain_terminate, if it's temporary next startup - * TPL will re-do the clean-up. If it's fatal, the flow will stop later - * anyway */ - goto out; - } - - if (l != NULL) - PATH_DEBUG (self, "Cleaning up stale messages"); - tpl_text_channel_clean_up_stale_tokens (self, l); - while (l != NULL) - { - PATH_DEBUG (self, "%s is stale, removed from DB", (gchar *) l->data); - g_free (l->data); - l = g_list_delete_link (l, l); - } - -out: - if (cache != NULL) - g_object_unref (cache); - - _tpl_action_chain_continue (ctx); -} -#endif - - static void tpl_text_channel_call_when_ready (TplChannel *chan, GAsyncReadyCallback cb, gpointer user_data) @@ -1087,7 +629,7 @@ tpl_text_channel_call_when_ready (TplChannel *chan, /* first: connect signals, so none are lost * second: prepare all TplChannel * third: cache my contact and the remote one. - * last: check for pending messages + * last: connect message signals * * If for any reason, the order is changed, it's needed to check what objects * are unreferenced by g_object_unref but used by a next action AND what @@ -1097,11 +639,6 @@ tpl_text_channel_call_when_ready (TplChannel *chan, _tpl_action_chain_append (actions, pendingproc_get_my_contact, NULL); _tpl_action_chain_append (actions, pendingproc_get_remote_contacts, NULL); _tpl_action_chain_append (actions, pendingproc_connect_message_signals, NULL); - _tpl_action_chain_append (actions, pendingproc_get_pending_messages, NULL); -#if 0 - _tpl_action_chain_append (actions, pendingproc_cleanup_pending_messages_db, - NULL); -#endif /* start the chain consuming */ _tpl_action_chain_continue (actions); } |