summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGabriel Ivascu <gabrielivascu@gnome.org>2017-12-01 21:53:59 +0200
committerGabriel Ivascu <gabrielivascu@gnome.org>2017-12-02 00:35:51 +0200
commit8969027196ef3b20b24a9f4fccec091f6db4de76 (patch)
treecfa0cb1b3f0cc773dec7843071003a4877b7b2d4
parent1007f727f6591ba273020333a72340a8d2f7d7ff (diff)
downloadepiphany-8969027196ef3b20b24a9f4fccec091f6db4de76.tar.gz
sync-service: Implement batch upload
-rw-r--r--lib/ephy-sync-utils.h3
-rw-r--r--lib/sync/ephy-history-manager.c2
-rw-r--r--lib/sync/ephy-open-tabs-manager.c2
-rw-r--r--lib/sync/ephy-password-manager.c2
-rw-r--r--lib/sync/ephy-sync-service.c243
-rw-r--r--lib/sync/ephy-synchronizable-manager.h2
-rw-r--r--src/bookmarks/ephy-bookmarks-manager.c2
-rw-r--r--src/ephy-shell.c10
8 files changed, 242 insertions, 24 deletions
diff --git a/lib/ephy-sync-utils.h b/lib/ephy-sync-utils.h
index cad54f42f..a124dd0ed 100644
--- a/lib/ephy-sync-utils.h
+++ b/lib/ephy-sync-utils.h
@@ -37,6 +37,9 @@ const SecretSchema *ephy_sync_utils_get_secret_schema (void) G_GNUC_CONST;
#define EPHY_SYNC_DEVICE_ID_LEN 32
#define EPHY_SYNC_BSO_ID_LEN 12
+#define EPHY_SYNC_BATCH_SIZE 80
+#define EPHY_SYNC_NUM_BATCHES 80
+
char *ephy_sync_utils_encode_hex (const guint8 *data,
gsize data_len);
guint8 *ephy_sync_utils_decode_hex (const char *hex);
diff --git a/lib/sync/ephy-history-manager.c b/lib/sync/ephy-history-manager.c
index 6b99b6c76..3c12c6f0b 100644
--- a/lib/sync/ephy-history-manager.c
+++ b/lib/sync/ephy-history-manager.c
@@ -505,7 +505,7 @@ merge_history_cb (EphyHistoryService *service,
data->remotes_updated);
out:
- data->callback (to_upload, TRUE, data->user_data);
+ data->callback (to_upload, data->user_data);
g_list_free_full (urls, (GDestroyNotify)ephy_history_url_free);
if (records_ht_id)
diff --git a/lib/sync/ephy-open-tabs-manager.c b/lib/sync/ephy-open-tabs-manager.c
index 4b58a23a2..e16b1ffb8 100644
--- a/lib/sync/ephy-open-tabs-manager.c
+++ b/lib/sync/ephy-open-tabs-manager.c
@@ -280,7 +280,7 @@ synchronizable_manager_merge (EphySynchronizableManager *manager,
g_free (device_bso_id);
- callback (to_upload, TRUE, user_data);
+ callback (to_upload, user_data);
}
static void
diff --git a/lib/sync/ephy-password-manager.c b/lib/sync/ephy-password-manager.c
index 9d4ca54c1..fb10005e9 100644
--- a/lib/sync/ephy-password-manager.c
+++ b/lib/sync/ephy-password-manager.c
@@ -1065,7 +1065,7 @@ merge_cb (GList *records,
data->remotes_deleted,
data->remotes_updated);
- data->callback (to_upload, FALSE, data->user_data);
+ data->callback (to_upload, data->user_data);
g_list_free_full (records, g_object_unref);
merge_passwords_async_data_free (data);
diff --git a/lib/sync/ephy-sync-service.c b/lib/sync/ephy-sync-service.c
index 99c9cb31d..242f8fce5 100644
--- a/lib/sync/ephy-sync-service.c
+++ b/lib/sync/ephy-sync-service.c
@@ -130,6 +130,16 @@ typedef struct {
EphySynchronizable *synchronizable;
} SyncAsyncData;
+typedef struct {
+ EphySyncService *service;
+ EphySynchronizableManager *manager;
+ GPtrArray *synchronizables;
+ guint start;
+ guint end;
+ char *batch_id;
+ gboolean batch_is_last;
+} BatchUploadAsyncData;
+
static StorageRequestAsyncData *
storage_request_async_data_new (const char *endpoint,
const char *method,
@@ -268,6 +278,51 @@ sync_async_data_free (SyncAsyncData *data)
g_slice_free (SyncAsyncData, data);
}
+static inline BatchUploadAsyncData *
+batch_upload_async_data_new (EphySyncService *service,
+ EphySynchronizableManager *manager,
+ GPtrArray *synchronizables,
+ guint start,
+ guint end,
+ const char *batch_id,
+ gboolean batch_is_last)
+{
+ BatchUploadAsyncData *data;
+
+ data = g_slice_new (BatchUploadAsyncData);
+ data->service = g_object_ref (service);
+ data->manager = g_object_ref (manager);
+ data->synchronizables = g_ptr_array_ref (synchronizables);
+ data->start = start;
+ data->end = end;
+ data->batch_id = g_strdup (batch_id);
+ data->batch_is_last = batch_is_last;
+
+ return data;
+}
+
+static inline BatchUploadAsyncData *
+batch_upload_async_data_dup (BatchUploadAsyncData *data)
+{
+ g_assert (data);
+
+ return batch_upload_async_data_new (data->service, data->manager,
+ data->synchronizables, data->start,
+ data->end, data->batch_id, data->batch_is_last);
+}
+
+static inline void
+batch_upload_async_data_free (BatchUploadAsyncData *data)
+{
+ g_assert (data);
+
+ g_object_unref (data->service);
+ g_object_unref (data->manager);
+ g_ptr_array_unref (data->synchronizables);
+ g_free (data->batch_id);
+ g_slice_free (BatchUploadAsyncData, data);
+}
+
static void
ephy_sync_service_set_property (GObject *object,
guint prop_id,
@@ -476,7 +531,7 @@ ephy_sync_service_send_storage_request (EphySyncService *self,
data->request_body, strlen (data->request_body));
}
- if (!g_strcmp0 (data->method, SOUP_METHOD_PUT))
+ if (!g_strcmp0 (data->method, SOUP_METHOD_PUT) || !g_strcmp0 (data->method, SOUP_METHOD_POST))
soup_message_headers_append (msg->request_headers, "content-type", content_type);
if (data->modified_since >= 0) {
@@ -1244,27 +1299,192 @@ ephy_sync_service_upload_synchronizable (EphySyncService *self,
ephy_sync_crypto_key_bundle_free (bundle);
}
+static GPtrArray *
+ephy_sync_service_split_into_batches (EphySyncService *self,
+ EphySynchronizableManager *manager,
+ GPtrArray *synchronizables,
+ guint start,
+ guint end)
+{
+ SyncCryptoKeyBundle *bundle;
+ GPtrArray *batches;
+ const char *collection;
+
+ g_assert (EPHY_IS_SYNC_SERVICE (self));
+ g_assert (EPHY_IS_SYNCHRONIZABLE_MANAGER (manager));
+ g_assert (synchronizables);
+
+ batches = g_ptr_array_new_with_free_func (g_free);
+ collection = ephy_synchronizable_manager_get_collection_name (manager);
+ bundle = ephy_sync_service_get_key_bundle (self, collection);
+
+ for (guint i = start; i < end; i += EPHY_SYNC_BATCH_SIZE) {
+ JsonNode *node = json_node_new (JSON_NODE_ARRAY);
+ JsonArray *array = json_array_new ();
+
+ for (guint k = i; k < MIN (i + EPHY_SYNC_BATCH_SIZE, end); k++) {
+ EphySynchronizable *synchronizable = g_ptr_array_index (synchronizables, k);
+ JsonNode *bso = ephy_synchronizable_to_bso (synchronizable, bundle);
+ JsonObject *object = json_object_ref (json_node_get_object (bso));
+
+ json_array_add_object_element (array, object);
+ json_node_unref (bso);
+ }
+
+ json_node_take_array (node, array);
+ g_ptr_array_add (batches, json_to_string (node, FALSE));
+ json_node_unref (node);
+ }
+
+ ephy_sync_crypto_key_bundle_free (bundle);
+
+ return batches;
+}
+
+static void
+commit_batch_cb (SoupSession *session,
+ SoupMessage *msg,
+ gpointer user_data)
+{
+ BatchUploadAsyncData *data = user_data;
+ const char *last_modified;
+
+ if (msg->status_code != 200) {
+ g_warning ("Failed to commit batch. Status code: %u, response: %s",
+ msg->status_code, msg->response_body->data);
+ goto out;
+ }
+
+ LOG ("Successfully committed batches");
+ /* Update sync time. */
+ last_modified = soup_message_headers_get_one (msg->response_headers, "X-Last-Modified");
+ ephy_synchronizable_manager_set_sync_time (data->manager, g_ascii_strtod (last_modified, NULL));
+
+out:
+ batch_upload_async_data_free (data);
+}
+
+static void
+upload_batch_cb (SoupSession *session,
+ SoupMessage *msg,
+ gpointer user_data)
+{
+ BatchUploadAsyncData *data = user_data;
+ const char *collection;
+ char *endpoint = NULL;
+
+ /* Note: "202 Accepted" status code. */
+ if (msg->status_code != 202) {
+ g_warning ("Failed to upload batch. Status code: %u, response: %s",
+ msg->status_code, msg->response_body->data);
+ } else {
+ LOG ("Successfully uploaded batch");
+ }
+
+ if (!data->batch_is_last)
+ goto out;
+
+ collection = ephy_synchronizable_manager_get_collection_name (data->manager);
+ endpoint = g_strdup_printf ("storage/%s?commit=true&batch=%s", collection, data->batch_id);
+ ephy_sync_service_queue_storage_request (data->service, endpoint,
+ SOUP_METHOD_POST, "[]", -1, -1,
+ commit_batch_cb,
+ batch_upload_async_data_dup (data));
+
+out:
+ g_free (endpoint);
+ /* Remove last reference to the array with the items to upload. */
+ if (data->batch_is_last)
+ g_ptr_array_unref (data->synchronizables);
+ batch_upload_async_data_free (data);
+}
+
+static void
+start_batch_upload_cb (SoupSession *session,
+ SoupMessage *msg,
+ gpointer user_data)
+{
+ BatchUploadAsyncData *data = user_data;
+ GPtrArray *batches = NULL;
+ JsonNode *node = NULL;
+ JsonObject *object;
+ GError *error = NULL;
+ const char *collection;
+ char *endpoint = NULL;
+
+ /* Note: "202 Accepted" status code. */
+ if (msg->status_code != 202) {
+ g_warning ("Failed to start batch upload. Status code: %u, response: %s",
+ msg->status_code, msg->response_body->data);
+ goto out;
+ }
+
+ node = json_from_string (msg->response_body->data, &error);
+ if (error) {
+ g_warning ("Response is not a valid JSON: %s", error->message);
+ g_error_free (error);
+ goto out;
+ }
+
+ object = json_node_get_object (node);
+ data->batch_id = soup_uri_encode (json_object_get_string_member (object, "batch"), NULL);
+ collection = ephy_synchronizable_manager_get_collection_name (data->manager);
+ endpoint = g_strdup_printf ("storage/%s?batch=%s", collection, data->batch_id);
+
+ batches = ephy_sync_service_split_into_batches (data->service, data->manager,
+ data->synchronizables,
+ data->start, data->end);
+ for (guint i = 0; i < batches->len; i++) {
+ BatchUploadAsyncData *data_dup = batch_upload_async_data_dup (data);
+
+ if (i == batches->len - 1)
+ data_dup->batch_is_last = TRUE;
+
+ ephy_sync_service_queue_storage_request (data->service, endpoint, SOUP_METHOD_POST,
+ g_ptr_array_index (batches, i), -1, -1,
+ upload_batch_cb, data_dup);
+ }
+
+out:
+ g_free (endpoint);
+ if (node)
+ json_node_unref (node);
+ if (batches)
+ g_ptr_array_unref (batches);
+ batch_upload_async_data_free (data);
+}
+
static void
merge_collection_finished_cb (GPtrArray *to_upload,
- gboolean should_force,
gpointer user_data)
{
- SyncCollectionAsyncData *data = (SyncCollectionAsyncData *)user_data;
+ SyncCollectionAsyncData *data = user_data;
+ BatchUploadAsyncData *bdata;
+ guint step = EPHY_SYNC_NUM_BATCHES * EPHY_SYNC_BATCH_SIZE;
+ const char *collection;
+ char *endpoint = NULL;
if (!to_upload || to_upload->len == 0)
goto out;
- for (guint i = 0; i < to_upload->len; i++) {
- ephy_sync_service_upload_synchronizable (data->service, data->manager,
- g_ptr_array_index (to_upload, i),
- should_force);
+ collection = ephy_synchronizable_manager_get_collection_name (data->manager);
+ endpoint = g_strdup_printf ("storage/%s?batch=true", collection);
+
+ /* http://moz-services-docs.readthedocs.io/en/latest/storage/apis-1.5.html#example-uploading-a-large-batch-of-items */
+ for (guint i = 0; i < to_upload->len; i += step) {
+ bdata = batch_upload_async_data_new (data->service, data->manager,
+ to_upload, i,
+ MIN (i + step, to_upload->len),
+ NULL, FALSE);
+ ephy_sync_service_queue_storage_request (data->service, endpoint,
+ SOUP_METHOD_POST, "[]", -1, -1,
+ start_batch_upload_cb, bdata);
}
out:
+ g_free (endpoint);
if (data->is_last)
g_signal_emit (data->service, signals[SYNC_FINISHED], 0);
- if (to_upload)
- g_ptr_array_unref (to_upload);
sync_collection_async_data_free (data);
}
@@ -1281,7 +1501,6 @@ sync_collection_cb (SoupSession *session,
GError *error = NULL;
GType type;
const char *collection;
- const char *last_modified;
gboolean is_deleted;
collection = ephy_synchronizable_manager_get_collection_name (data->manager);
@@ -1322,11 +1541,7 @@ sync_collection_cb (SoupSession *session,
g_list_length (data->remotes_updated),
collection);
- /* Update sync time. */
- last_modified = soup_message_headers_get_one (msg->response_headers, "X-Last-Modified");
- ephy_synchronizable_manager_set_sync_time (data->manager, g_ascii_strtod (last_modified, NULL));
ephy_synchronizable_manager_set_is_initial_sync (data->manager, FALSE);
-
ephy_synchronizable_manager_merge (data->manager, data->is_initial,
data->remotes_deleted, data->remotes_updated,
merge_collection_finished_cb, data);
diff --git a/lib/sync/ephy-synchronizable-manager.h b/lib/sync/ephy-synchronizable-manager.h
index 8c83e5268..04243d44c 100644
--- a/lib/sync/ephy-synchronizable-manager.h
+++ b/lib/sync/ephy-synchronizable-manager.h
@@ -30,7 +30,7 @@ G_BEGIN_DECLS
G_DECLARE_INTERFACE (EphySynchronizableManager, ephy_synchronizable_manager, EPHY, SYNCHRONIZABLE_MANAGER, GObject)
-typedef void (*EphySynchronizableManagerMergeCallback) (GPtrArray *to_upload, gboolean should_force, gpointer user_data);
+typedef void (*EphySynchronizableManagerMergeCallback) (GPtrArray *to_upload, gpointer user_data);
struct _EphySynchronizableManagerInterface {
GTypeInterface parent_iface;
diff --git a/src/bookmarks/ephy-bookmarks-manager.c b/src/bookmarks/ephy-bookmarks-manager.c
index 0abec5bd1..b39310b05 100644
--- a/src/bookmarks/ephy-bookmarks-manager.c
+++ b/src/bookmarks/ephy-bookmarks-manager.c
@@ -916,7 +916,7 @@ synchronizable_manager_merge (EphySynchronizableManager *manager,
else
to_upload = ephy_bookmarks_manager_handle_regular_merge (self, remotes_updated, remotes_deleted);
- callback (to_upload, FALSE, user_data);
+ callback (to_upload, user_data);
}
static void
diff --git a/src/ephy-shell.c b/src/ephy-shell.c
index a715725aa..d91fbcbf7 100644
--- a/src/ephy-shell.c
+++ b/src/ephy-shell.c
@@ -320,6 +320,11 @@ register_synchronizable_managers (EphyShell *shell,
g_assert (EPHY_IS_SYNC_SERVICE (service));
g_assert (EPHY_IS_SHELL (shell));
+ if (ephy_sync_utils_history_sync_is_enabled ()) {
+ manager = EPHY_SYNCHRONIZABLE_MANAGER (ephy_shell_get_history_manager (shell));
+ ephy_sync_service_register_manager (service, manager);
+ }
+
if (ephy_sync_utils_bookmarks_sync_is_enabled ()) {
manager = EPHY_SYNCHRONIZABLE_MANAGER (ephy_shell_get_bookmarks_manager (shell));
ephy_sync_service_register_manager (service, manager);
@@ -330,11 +335,6 @@ register_synchronizable_managers (EphyShell *shell,
ephy_sync_service_register_manager (service, manager);
}
- if (ephy_sync_utils_history_sync_is_enabled ()) {
- manager = EPHY_SYNCHRONIZABLE_MANAGER (ephy_shell_get_history_manager (shell));
- ephy_sync_service_register_manager (service, manager);
- }
-
if (ephy_sync_utils_open_tabs_sync_is_enabled ()) {
manager = EPHY_SYNCHRONIZABLE_MANAGER (ephy_shell_get_open_tabs_manager (shell));
ephy_sync_service_register_manager (service, manager);