From 8969027196ef3b20b24a9f4fccec091f6db4de76 Mon Sep 17 00:00:00 2001 From: Gabriel Ivascu Date: Fri, 1 Dec 2017 21:53:59 +0200 Subject: sync-service: Implement batch upload --- lib/ephy-sync-utils.h | 3 + lib/sync/ephy-history-manager.c | 2 +- lib/sync/ephy-open-tabs-manager.c | 2 +- lib/sync/ephy-password-manager.c | 2 +- lib/sync/ephy-sync-service.c | 243 +++++++++++++++++++++++++++++++-- lib/sync/ephy-synchronizable-manager.h | 2 +- src/bookmarks/ephy-bookmarks-manager.c | 2 +- src/ephy-shell.c | 10 +- 8 files changed, 242 insertions(+), 24 deletions(-) diff --git a/lib/ephy-sync-utils.h b/lib/ephy-sync-utils.h index cad54f42f..a124dd0ed 100644 --- a/lib/ephy-sync-utils.h +++ b/lib/ephy-sync-utils.h @@ -37,6 +37,9 @@ const SecretSchema *ephy_sync_utils_get_secret_schema (void) G_GNUC_CONST; #define EPHY_SYNC_DEVICE_ID_LEN 32 #define EPHY_SYNC_BSO_ID_LEN 12 +#define EPHY_SYNC_BATCH_SIZE 80 +#define EPHY_SYNC_NUM_BATCHES 80 + char *ephy_sync_utils_encode_hex (const guint8 *data, gsize data_len); guint8 *ephy_sync_utils_decode_hex (const char *hex); diff --git a/lib/sync/ephy-history-manager.c b/lib/sync/ephy-history-manager.c index 6b99b6c76..3c12c6f0b 100644 --- a/lib/sync/ephy-history-manager.c +++ b/lib/sync/ephy-history-manager.c @@ -505,7 +505,7 @@ merge_history_cb (EphyHistoryService *service, data->remotes_updated); out: - data->callback (to_upload, TRUE, data->user_data); + data->callback (to_upload, data->user_data); g_list_free_full (urls, (GDestroyNotify)ephy_history_url_free); if (records_ht_id) diff --git a/lib/sync/ephy-open-tabs-manager.c b/lib/sync/ephy-open-tabs-manager.c index 4b58a23a2..e16b1ffb8 100644 --- a/lib/sync/ephy-open-tabs-manager.c +++ b/lib/sync/ephy-open-tabs-manager.c @@ -280,7 +280,7 @@ synchronizable_manager_merge (EphySynchronizableManager *manager, g_free (device_bso_id); - callback (to_upload, TRUE, user_data); + callback (to_upload, user_data); } static void diff --git a/lib/sync/ephy-password-manager.c b/lib/sync/ephy-password-manager.c index 9d4ca54c1..fb10005e9 100644 --- a/lib/sync/ephy-password-manager.c +++ b/lib/sync/ephy-password-manager.c @@ -1065,7 +1065,7 @@ merge_cb (GList *records, data->remotes_deleted, data->remotes_updated); - data->callback (to_upload, FALSE, data->user_data); + data->callback (to_upload, data->user_data); g_list_free_full (records, g_object_unref); merge_passwords_async_data_free (data); diff --git a/lib/sync/ephy-sync-service.c b/lib/sync/ephy-sync-service.c index 99c9cb31d..242f8fce5 100644 --- a/lib/sync/ephy-sync-service.c +++ b/lib/sync/ephy-sync-service.c @@ -130,6 +130,16 @@ typedef struct { EphySynchronizable *synchronizable; } SyncAsyncData; +typedef struct { + EphySyncService *service; + EphySynchronizableManager *manager; + GPtrArray *synchronizables; + guint start; + guint end; + char *batch_id; + gboolean batch_is_last; +} BatchUploadAsyncData; + static StorageRequestAsyncData * storage_request_async_data_new (const char *endpoint, const char *method, @@ -268,6 +278,51 @@ sync_async_data_free (SyncAsyncData *data) g_slice_free (SyncAsyncData, data); } +static inline BatchUploadAsyncData * +batch_upload_async_data_new (EphySyncService *service, + EphySynchronizableManager *manager, + GPtrArray *synchronizables, + guint start, + guint end, + const char *batch_id, + gboolean batch_is_last) +{ + BatchUploadAsyncData *data; + + data = g_slice_new (BatchUploadAsyncData); + data->service = g_object_ref (service); + data->manager = g_object_ref (manager); + data->synchronizables = g_ptr_array_ref (synchronizables); + data->start = start; + data->end = end; + data->batch_id = g_strdup (batch_id); + data->batch_is_last = batch_is_last; + + return data; +} + +static inline BatchUploadAsyncData * +batch_upload_async_data_dup (BatchUploadAsyncData *data) +{ + g_assert (data); + + return batch_upload_async_data_new (data->service, data->manager, + data->synchronizables, data->start, + data->end, data->batch_id, data->batch_is_last); +} + +static inline void +batch_upload_async_data_free (BatchUploadAsyncData *data) +{ + g_assert (data); + + g_object_unref (data->service); + g_object_unref (data->manager); + g_ptr_array_unref (data->synchronizables); + g_free (data->batch_id); + g_slice_free (BatchUploadAsyncData, data); +} + static void ephy_sync_service_set_property (GObject *object, guint prop_id, @@ -476,7 +531,7 @@ ephy_sync_service_send_storage_request (EphySyncService *self, data->request_body, strlen (data->request_body)); } - if (!g_strcmp0 (data->method, SOUP_METHOD_PUT)) + if (!g_strcmp0 (data->method, SOUP_METHOD_PUT) || !g_strcmp0 (data->method, SOUP_METHOD_POST)) soup_message_headers_append (msg->request_headers, "content-type", content_type); if (data->modified_since >= 0) { @@ -1244,27 +1299,192 @@ ephy_sync_service_upload_synchronizable (EphySyncService *self, ephy_sync_crypto_key_bundle_free (bundle); } +static GPtrArray * +ephy_sync_service_split_into_batches (EphySyncService *self, + EphySynchronizableManager *manager, + GPtrArray *synchronizables, + guint start, + guint end) +{ + SyncCryptoKeyBundle *bundle; + GPtrArray *batches; + const char *collection; + + g_assert (EPHY_IS_SYNC_SERVICE (self)); + g_assert (EPHY_IS_SYNCHRONIZABLE_MANAGER (manager)); + g_assert (synchronizables); + + batches = g_ptr_array_new_with_free_func (g_free); + collection = ephy_synchronizable_manager_get_collection_name (manager); + bundle = ephy_sync_service_get_key_bundle (self, collection); + + for (guint i = start; i < end; i += EPHY_SYNC_BATCH_SIZE) { + JsonNode *node = json_node_new (JSON_NODE_ARRAY); + JsonArray *array = json_array_new (); + + for (guint k = i; k < MIN (i + EPHY_SYNC_BATCH_SIZE, end); k++) { + EphySynchronizable *synchronizable = g_ptr_array_index (synchronizables, k); + JsonNode *bso = ephy_synchronizable_to_bso (synchronizable, bundle); + JsonObject *object = json_object_ref (json_node_get_object (bso)); + + json_array_add_object_element (array, object); + json_node_unref (bso); + } + + json_node_take_array (node, array); + g_ptr_array_add (batches, json_to_string (node, FALSE)); + json_node_unref (node); + } + + ephy_sync_crypto_key_bundle_free (bundle); + + return batches; +} + +static void +commit_batch_cb (SoupSession *session, + SoupMessage *msg, + gpointer user_data) +{ + BatchUploadAsyncData *data = user_data; + const char *last_modified; + + if (msg->status_code != 200) { + g_warning ("Failed to commit batch. Status code: %u, response: %s", + msg->status_code, msg->response_body->data); + goto out; + } + + LOG ("Successfully committed batches"); + /* Update sync time. */ + last_modified = soup_message_headers_get_one (msg->response_headers, "X-Last-Modified"); + ephy_synchronizable_manager_set_sync_time (data->manager, g_ascii_strtod (last_modified, NULL)); + +out: + batch_upload_async_data_free (data); +} + +static void +upload_batch_cb (SoupSession *session, + SoupMessage *msg, + gpointer user_data) +{ + BatchUploadAsyncData *data = user_data; + const char *collection; + char *endpoint = NULL; + + /* Note: "202 Accepted" status code. */ + if (msg->status_code != 202) { + g_warning ("Failed to upload batch. Status code: %u, response: %s", + msg->status_code, msg->response_body->data); + } else { + LOG ("Successfully uploaded batch"); + } + + if (!data->batch_is_last) + goto out; + + collection = ephy_synchronizable_manager_get_collection_name (data->manager); + endpoint = g_strdup_printf ("storage/%s?commit=true&batch=%s", collection, data->batch_id); + ephy_sync_service_queue_storage_request (data->service, endpoint, + SOUP_METHOD_POST, "[]", -1, -1, + commit_batch_cb, + batch_upload_async_data_dup (data)); + +out: + g_free (endpoint); + /* Remove last reference to the array with the items to upload. */ + if (data->batch_is_last) + g_ptr_array_unref (data->synchronizables); + batch_upload_async_data_free (data); +} + +static void +start_batch_upload_cb (SoupSession *session, + SoupMessage *msg, + gpointer user_data) +{ + BatchUploadAsyncData *data = user_data; + GPtrArray *batches = NULL; + JsonNode *node = NULL; + JsonObject *object; + GError *error = NULL; + const char *collection; + char *endpoint = NULL; + + /* Note: "202 Accepted" status code. */ + if (msg->status_code != 202) { + g_warning ("Failed to start batch upload. Status code: %u, response: %s", + msg->status_code, msg->response_body->data); + goto out; + } + + node = json_from_string (msg->response_body->data, &error); + if (error) { + g_warning ("Response is not a valid JSON: %s", error->message); + g_error_free (error); + goto out; + } + + object = json_node_get_object (node); + data->batch_id = soup_uri_encode (json_object_get_string_member (object, "batch"), NULL); + collection = ephy_synchronizable_manager_get_collection_name (data->manager); + endpoint = g_strdup_printf ("storage/%s?batch=%s", collection, data->batch_id); + + batches = ephy_sync_service_split_into_batches (data->service, data->manager, + data->synchronizables, + data->start, data->end); + for (guint i = 0; i < batches->len; i++) { + BatchUploadAsyncData *data_dup = batch_upload_async_data_dup (data); + + if (i == batches->len - 1) + data_dup->batch_is_last = TRUE; + + ephy_sync_service_queue_storage_request (data->service, endpoint, SOUP_METHOD_POST, + g_ptr_array_index (batches, i), -1, -1, + upload_batch_cb, data_dup); + } + +out: + g_free (endpoint); + if (node) + json_node_unref (node); + if (batches) + g_ptr_array_unref (batches); + batch_upload_async_data_free (data); +} + static void merge_collection_finished_cb (GPtrArray *to_upload, - gboolean should_force, gpointer user_data) { - SyncCollectionAsyncData *data = (SyncCollectionAsyncData *)user_data; + SyncCollectionAsyncData *data = user_data; + BatchUploadAsyncData *bdata; + guint step = EPHY_SYNC_NUM_BATCHES * EPHY_SYNC_BATCH_SIZE; + const char *collection; + char *endpoint = NULL; if (!to_upload || to_upload->len == 0) goto out; - for (guint i = 0; i < to_upload->len; i++) { - ephy_sync_service_upload_synchronizable (data->service, data->manager, - g_ptr_array_index (to_upload, i), - should_force); + collection = ephy_synchronizable_manager_get_collection_name (data->manager); + endpoint = g_strdup_printf ("storage/%s?batch=true", collection); + + /* http://moz-services-docs.readthedocs.io/en/latest/storage/apis-1.5.html#example-uploading-a-large-batch-of-items */ + for (guint i = 0; i < to_upload->len; i += step) { + bdata = batch_upload_async_data_new (data->service, data->manager, + to_upload, i, + MIN (i + step, to_upload->len), + NULL, FALSE); + ephy_sync_service_queue_storage_request (data->service, endpoint, + SOUP_METHOD_POST, "[]", -1, -1, + start_batch_upload_cb, bdata); } out: + g_free (endpoint); if (data->is_last) g_signal_emit (data->service, signals[SYNC_FINISHED], 0); - if (to_upload) - g_ptr_array_unref (to_upload); sync_collection_async_data_free (data); } @@ -1281,7 +1501,6 @@ sync_collection_cb (SoupSession *session, GError *error = NULL; GType type; const char *collection; - const char *last_modified; gboolean is_deleted; collection = ephy_synchronizable_manager_get_collection_name (data->manager); @@ -1322,11 +1541,7 @@ sync_collection_cb (SoupSession *session, g_list_length (data->remotes_updated), collection); - /* Update sync time. */ - last_modified = soup_message_headers_get_one (msg->response_headers, "X-Last-Modified"); - ephy_synchronizable_manager_set_sync_time (data->manager, g_ascii_strtod (last_modified, NULL)); ephy_synchronizable_manager_set_is_initial_sync (data->manager, FALSE); - ephy_synchronizable_manager_merge (data->manager, data->is_initial, data->remotes_deleted, data->remotes_updated, merge_collection_finished_cb, data); diff --git a/lib/sync/ephy-synchronizable-manager.h b/lib/sync/ephy-synchronizable-manager.h index 8c83e5268..04243d44c 100644 --- a/lib/sync/ephy-synchronizable-manager.h +++ b/lib/sync/ephy-synchronizable-manager.h @@ -30,7 +30,7 @@ G_BEGIN_DECLS G_DECLARE_INTERFACE (EphySynchronizableManager, ephy_synchronizable_manager, EPHY, SYNCHRONIZABLE_MANAGER, GObject) -typedef void (*EphySynchronizableManagerMergeCallback) (GPtrArray *to_upload, gboolean should_force, gpointer user_data); +typedef void (*EphySynchronizableManagerMergeCallback) (GPtrArray *to_upload, gpointer user_data); struct _EphySynchronizableManagerInterface { GTypeInterface parent_iface; diff --git a/src/bookmarks/ephy-bookmarks-manager.c b/src/bookmarks/ephy-bookmarks-manager.c index 0abec5bd1..b39310b05 100644 --- a/src/bookmarks/ephy-bookmarks-manager.c +++ b/src/bookmarks/ephy-bookmarks-manager.c @@ -916,7 +916,7 @@ synchronizable_manager_merge (EphySynchronizableManager *manager, else to_upload = ephy_bookmarks_manager_handle_regular_merge (self, remotes_updated, remotes_deleted); - callback (to_upload, FALSE, user_data); + callback (to_upload, user_data); } static void diff --git a/src/ephy-shell.c b/src/ephy-shell.c index a715725aa..d91fbcbf7 100644 --- a/src/ephy-shell.c +++ b/src/ephy-shell.c @@ -320,6 +320,11 @@ register_synchronizable_managers (EphyShell *shell, g_assert (EPHY_IS_SYNC_SERVICE (service)); g_assert (EPHY_IS_SHELL (shell)); + if (ephy_sync_utils_history_sync_is_enabled ()) { + manager = EPHY_SYNCHRONIZABLE_MANAGER (ephy_shell_get_history_manager (shell)); + ephy_sync_service_register_manager (service, manager); + } + if (ephy_sync_utils_bookmarks_sync_is_enabled ()) { manager = EPHY_SYNCHRONIZABLE_MANAGER (ephy_shell_get_bookmarks_manager (shell)); ephy_sync_service_register_manager (service, manager); @@ -330,11 +335,6 @@ register_synchronizable_managers (EphyShell *shell, ephy_sync_service_register_manager (service, manager); } - if (ephy_sync_utils_history_sync_is_enabled ()) { - manager = EPHY_SYNCHRONIZABLE_MANAGER (ephy_shell_get_history_manager (shell)); - ephy_sync_service_register_manager (service, manager); - } - if (ephy_sync_utils_open_tabs_sync_is_enabled ()) { manager = EPHY_SYNCHRONIZABLE_MANAGER (ephy_shell_get_open_tabs_manager (shell)); ephy_sync_service_register_manager (service, manager); -- cgit v1.2.1