summaryrefslogtreecommitdiff
path: root/telepathy-logger/text-channel.c
diff options
context:
space:
mode:
authorNicolas Dufresne <nicolas.dufresne@collabora.co.uk>2011-03-16 21:44:17 -0400
committerNicolas Dufresne <nicolas.dufresne@collabora.co.uk>2011-03-16 21:44:17 -0400
commit56dbfc424254bb9970237b407bcfa1080151a94e (patch)
treeea46564b27c463756bcbaa68a4dfe4476a5f21b9 /telepathy-logger/text-channel.c
parent18be97017eb658b14c5069bb17c03145f8dc3eb4 (diff)
downloadtelepathy-logger-56dbfc424254bb9970237b407bcfa1080151a94e.tar.gz
Use SQLite cache to avoid duplicates on logger crash
Diffstat (limited to 'telepathy-logger/text-channel.c')
-rw-r--r--telepathy-logger/text-channel.c220
1 files changed, 204 insertions, 16 deletions
diff --git a/telepathy-logger/text-channel.c b/telepathy-logger/text-channel.c
index 2b869f5..279385b 100644
--- a/telepathy-logger/text-channel.c
+++ b/telepathy-logger/text-channel.c
@@ -33,6 +33,7 @@
#include "entity-internal.h"
#include "event-internal.h"
#include "log-manager-internal.h"
+#include "log-store-sqlite-internal.h"
#include "observer-internal.h"
#include "text-event.h"
#include "text-event-internal.h"
@@ -228,7 +229,7 @@ get_remote_contact_cb (TpConnection *connection,
else
{
self->priv->remote =
- tpl_entity_new_from_tp_contact (contacts[1], TPL_ENTITY_CONTACT);
+ tpl_entity_new_from_tp_contact (contacts[0], TPL_ENTITY_CONTACT);
_tpl_action_chain_continue (ctx);
}
}
@@ -297,6 +298,35 @@ on_channel_invalidated_cb (TpProxy *proxy,
}
+static guint
+get_message_pending_id (TpMessage *m)
+{
+ return tp_asv_get_uint32 (tp_message_peek (TP_MESSAGE (m), 0),
+ "pending-message-id", NULL);
+}
+
+
+static guint
+get_message_timestamp (TpMessage *message)
+{
+ gint64 timestamp;
+
+ timestamp = tp_message_get_sent_timestamp (message);
+
+ if (timestamp == 0)
+ timestamp = tp_message_get_received_timestamp (message);
+
+ if (timestamp == 0)
+ {
+ GDateTime *datetime = g_date_time_new_now_utc ();
+ timestamp = g_date_time_to_unix (datetime);
+ g_date_time_unref (datetime);
+ }
+
+ return timestamp;
+}
+
+
static void
tpl_text_channel_store_message (TplTextChannel *self,
TpMessage *message,
@@ -338,17 +368,7 @@ tpl_text_channel_store_message (TplTextChannel *self,
}
/* Ensure timestamp */
- timestamp = tp_message_get_sent_timestamp (message);
-
- if (timestamp == 0)
- timestamp = tp_message_get_received_timestamp (message);
-
- if (timestamp == 0)
- {
- GDateTime *datetime = g_date_time_new_now_utc ();
- timestamp = g_date_time_to_unix (datetime);
- g_date_time_unref (datetime);
- }
+ timestamp = get_message_timestamp (message);
text = tp_message_to_text (message, NULL);
@@ -390,6 +410,22 @@ tpl_text_channel_store_message (TplTextChannel *self,
PATH_DEBUG (self, "LogStore: %s", error->message);
g_error_free (error);
}
+ else if (tpl_entity_get_entity_type (sender) != TPL_ENTITY_SELF)
+ {
+ TplLogStore *cache = _tpl_log_store_sqlite_dup ();
+ _tpl_log_store_sqlite_add_pending_message (cache,
+ TP_CHANNEL (self),
+ get_message_pending_id (message),
+ timestamp,
+ &error);
+
+ if (error != NULL)
+ {
+ PATH_DEBUG (self, "Failed to cache pending message: %s",
+ error->message);
+ g_error_free (error);
+ }
+ }
g_object_unref (logmanager);
g_object_unref (event);
@@ -451,20 +487,169 @@ on_message_sent_cb (TpChannel *proxy,
static void
+on_pending_message_removed_cb (TpTextChannel *self,
+ TpSignalledMessage *message,
+ gpointer user_data)
+{
+ TplLogStore *cache;
+ GList *ids = NULL;
+ GError *error = NULL;
+
+ ids = g_list_append (ids,
+ GUINT_TO_POINTER (get_message_pending_id (TP_MESSAGE (message))));
+
+ cache = _tpl_log_store_sqlite_dup ();
+ _tpl_log_store_sqlite_remove_pending_messages (cache, TP_CHANNEL (self),
+ ids, &error);
+
+ if (error != NULL)
+ {
+ PATH_DEBUG (self, "Failed to remove pending message from cache: %s",
+ error->message);
+ g_error_free (error);
+ }
+
+ g_object_unref (cache);
+}
+
+
+static gint
+pending_message_compare (TpSignalledMessage *m1, TpSignalledMessage *m2)
+{
+ guint id1, id2;
+
+ id1 = get_message_pending_id (TP_MESSAGE (m1));
+ id2 = get_message_pending_id (TP_MESSAGE (m2));
+
+ if (id1 > id2)
+ return 1;
+ else if (id1 < id2)
+ return -1;
+ else
+ return 0;
+}
+
+
+static void
pendingproc_store_pending_messages (TplActionChain *ctx,
gpointer user_data)
{
TplTextChannel *self = _tpl_action_chain_get_object (ctx);
+ TplLogStore *cache;
+ GError *error = NULL;
+ GList *cached_messages;
GList *pending_messages;
- GList *it;
+ GList *cached_it, *pending_it;
+ GList *to_remove = NULL;
+ GList *to_log = NULL;
+
+ cache = _tpl_log_store_sqlite_dup ();
+ cached_messages = _tpl_log_store_sqlite_get_pending_messages (cache,
+ TP_CHANNEL (self), &error);
+
+ if (error != NULL)
+ {
+ DEBUG ("Failed to read pending_message cache: %s.", error->message);
+ g_error_free (error);
+ /* We simply ignore this error, as if the list was empty */
+ }
pending_messages =
tp_text_channel_get_pending_messages (TP_TEXT_CHANNEL (self));
- for (it = pending_messages; it != NULL; it = g_list_next (it))
- on_message_received_cb (TP_TEXT_CHANNEL (self),
- TP_SIGNALLED_MESSAGE (it->data), self);
+ pending_messages = g_list_sort (pending_messages,
+ (GCompareFunc) pending_message_compare);
+
+ cached_it = cached_messages;
+ pending_it = pending_messages;
+
+ while (cached_it != NULL || pending_it != NULL)
+ {
+ TplPendingMessage *cached;
+ TpSignalledMessage *pending;
+ guint pending_id;
+ gint64 pending_ts;
+
+ if (cached_it == NULL)
+ {
+ /* No more cached pending, just log the pending messages */
+ to_log = g_list_append (to_log, pending_it->data);
+ pending_it = g_list_next (pending_it);
+ continue;
+ }
+
+ if (pending_it == NULL)
+ {
+ /* No more pending, just remove the cached messages */
+ to_remove = g_list_append (to_remove, GUINT_TO_POINTER (cached->id));
+ cached_it = g_list_next (cached_it);
+ continue;
+ }
+
+ cached = cached_it->data;
+ pending = pending_it->data;
+ pending_id = get_message_pending_id (TP_MESSAGE (pending));
+ pending_ts = get_message_timestamp (TP_MESSAGE (pending));
+
+ if (cached->id == pending_id)
+ {
+ if (cached->timestamp != pending_ts)
+ {
+ /* The cache messaged is invalid, remove it */
+ to_remove = g_list_append (to_remove,
+ GUINT_TO_POINTER (cached->id));
+ cached_it = g_list_next (cached_it);
+ }
+ else
+ {
+ /* The message is already logged */
+ cached_it = g_list_next (cached_it);
+ pending_it = g_list_next (pending_it);
+ }
+ }
+ else if (cached->id < pending_id)
+ {
+ /* The cached ID is not valid anymore, remove it */
+ to_remove = g_list_append (to_remove, GUINT_TO_POINTER (cached->id));
+ cached_it = g_list_next (cached_it);
+ }
+ else if (cached->id < pending_id)
+ {
+ /* The pending message has not been logged */
+ to_log = g_list_append (to_log, pending);
+ pending_it = g_list_next (pending_it);
+ }
+ }
+
+ g_list_foreach (cached_messages, (GFunc) g_free, NULL);
+ g_list_free (cached_messages);
+ g_list_free (pending_messages);
+
+ /* We need to remove before we log to avoid collisions */
+ if (to_remove != NULL)
+ {
+ if (!_tpl_log_store_sqlite_remove_pending_messages (cache,
+ TP_CHANNEL (self), to_remove, &error))
+ {
+ DEBUG ("Failed remove old pending messages from cache: %s", error->message);
+ g_error_free (error);
+ }
+ g_list_free (to_remove);
+ }
+
+ if (to_log != NULL)
+ {
+ GList *it;
+
+ for (it = to_log; it != NULL; it = g_list_next (it))
+ on_message_received_cb (TP_TEXT_CHANNEL (self),
+ TP_SIGNALLED_MESSAGE (it->data), self);
+
+ g_list_free (to_log);
+ }
+
+ g_object_unref (cache);
_tpl_action_chain_continue (ctx);
}
@@ -484,6 +669,9 @@ pendingproc_connect_message_signals (TplActionChain *ctx,
tp_g_signal_connect_object (self, "message-sent",
G_CALLBACK (on_message_sent_cb), self, 0);
+ tp_g_signal_connect_object (self, "pending-message-removed",
+ G_CALLBACK (on_pending_message_removed_cb), self, 0);
+
_tpl_action_chain_continue (ctx);
}