/* GIO - GLib Input, Output and Streaming Library * * Copyright (C) Carl-Anton Ingmarsson 2011 * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General * Public License along with this library; if not, write to the * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, * Boston, MA 02110-1301, USA. * * Author: Carl-Anton Ingmarsson */ #include #include #include "gvfsafpconnection.h" /* * GVfsAfpName */ struct _GVfsAfpName { guint32 text_encoding; gchar *str; gsize len; gint ref_count; }; static void _g_vfs_afp_name_free (GVfsAfpName *afp_name) { g_free (afp_name->str); g_slice_free (GVfsAfpName, afp_name); } void g_vfs_afp_name_unref (GVfsAfpName *afp_name) { if (g_atomic_int_dec_and_test (&afp_name->ref_count)) _g_vfs_afp_name_free (afp_name); } void g_vfs_afp_name_ref (GVfsAfpName *afp_name) { g_atomic_int_inc (&afp_name->ref_count); } char * g_vfs_afp_name_get_string (GVfsAfpName *afp_name) { return g_utf8_normalize (afp_name->str, afp_name->len, G_NORMALIZE_DEFAULT_COMPOSE); } GVfsAfpName * g_vfs_afp_name_new (guint32 text_encoding, gchar *str, gsize len) { GVfsAfpName *afp_name; afp_name = g_slice_new (GVfsAfpName); afp_name->ref_count = 1; afp_name->text_encoding = text_encoding; afp_name->str = str; afp_name->len = len; return afp_name; } /* * GVfsAfpReply */ struct _GVfsAfpReplyClass { GObjectClass parent_class; }; struct _GVfsAfpReply { GObject parent_instance; AfpResultCode result_code; char *data; gsize len; gboolean free_data; goffset pos; }; G_DEFINE_TYPE (GVfsAfpReply, g_vfs_afp_reply, G_TYPE_OBJECT); static void g_vfs_afp_reply_init (GVfsAfpReply *reply) { reply->data = NULL; reply->len = 0; reply->pos = 0; } static void g_vfs_afp_reply_finalize (GObject *object) { GVfsAfpReply *reply = (GVfsAfpReply *)object; if (reply->free_data) g_free (reply->data); } static void g_vfs_afp_reply_class_init (GVfsAfpReplyClass *klass) { GObjectClass *object_class = G_OBJECT_CLASS (klass); object_class->finalize = g_vfs_afp_reply_finalize; } static GVfsAfpReply * g_vfs_afp_reply_new (AfpResultCode result_code, char *data, gsize len, gboolean take_data) { GVfsAfpReply *reply; reply = g_object_new (G_VFS_TYPE_AFP_REPLY, NULL); reply->result_code = result_code; reply->len = len; reply->data = data; reply->free_data = take_data; return reply; } gboolean g_vfs_afp_reply_read_byte (GVfsAfpReply *reply, guint8 *byte) { if ((reply->len - reply->pos) < 1) return FALSE; if (byte) *byte = reply->data[reply->pos]; reply->pos++; return TRUE; } gboolean g_vfs_afp_reply_read_int64 (GVfsAfpReply *reply, gint64 *val) { if ((reply->len - reply->pos) < 8) return FALSE; if (val) *val = GINT64_FROM_BE (*((gint64 *)(reply->data + reply->pos))); reply->pos += 8; return TRUE; } gboolean g_vfs_afp_reply_read_int32 (GVfsAfpReply *reply, gint32 *val) { if ((reply->len - reply->pos) < 4) return FALSE; if (val) *val = GINT32_FROM_BE (*((gint32 *)(reply->data + reply->pos))); reply->pos += 4; return TRUE; } gboolean g_vfs_afp_reply_read_int16 (GVfsAfpReply *reply, gint16 *val) { if ((reply->len - reply->pos) < 2) return FALSE; if (val) *val = GINT16_FROM_BE (*((gint16 *)(reply->data + reply->pos))); reply->pos += 2; return TRUE; } gboolean g_vfs_afp_reply_read_uint64 (GVfsAfpReply *reply, guint64 *val) { if ((reply->len - reply->pos) < 8) return FALSE; if (val) *val = GUINT64_FROM_BE (*((guint64 *)(reply->data + reply->pos))); reply->pos += 8; return TRUE; } gboolean g_vfs_afp_reply_read_uint32 (GVfsAfpReply *reply, guint32 *val) { if ((reply->len - reply->pos) < 4) return FALSE; if (val) *val = GUINT32_FROM_BE (*((guint32 *)(reply->data + reply->pos))); reply->pos += 4; return TRUE; } gboolean g_vfs_afp_reply_read_uint16 (GVfsAfpReply *reply, guint16 *val) { if ((reply->len - reply->pos) < 2) return FALSE; if (val) *val = GUINT16_FROM_BE (*((guint16 *)(reply->data + reply->pos))); reply->pos += 2; return TRUE; } gboolean g_vfs_afp_reply_get_data (GVfsAfpReply *reply, gsize size, guint8 **data) { if ((reply->len - reply->pos) < size) return FALSE; if (data) *data = (guint8 *)(reply->data + reply->pos); reply->pos += size; return TRUE; } gboolean g_vfs_afp_reply_dup_data (GVfsAfpReply *reply, gsize size, guint8 **data) { if ((reply->len - reply->pos) < size) return FALSE; if (data) { *data = g_malloc (size); memcpy (*data, reply->data + reply->pos, size); } reply->pos += size; return TRUE; } gboolean g_vfs_afp_reply_read_pascal (GVfsAfpReply *reply, char **str) { guint8 strsize; if (!g_vfs_afp_reply_read_byte (reply, &strsize)) return FALSE; if (strsize > (reply->len - reply->pos)) { reply->pos--; return FALSE; } if (str) { *str = g_convert (reply->data + reply->pos, strsize, "UTF-8", "MACINTOSH", NULL, NULL, NULL); } reply->pos += strsize; return TRUE; } gboolean g_vfs_afp_reply_read_afp_name (GVfsAfpReply *reply, gboolean read_text_encoding, GVfsAfpName **afp_name) { gint old_pos; guint32 text_encoding; guint16 len; gchar *str; old_pos = reply->pos; if (read_text_encoding) { if (!g_vfs_afp_reply_read_uint32 (reply, &text_encoding)) return FALSE; } else text_encoding = 0; if (!g_vfs_afp_reply_read_uint16 (reply, &len)) { reply->pos = old_pos; return FALSE; } if (!g_vfs_afp_reply_get_data (reply, len, (guint8 **)&str)) { reply->pos = old_pos; return FALSE; } if (afp_name) *afp_name = g_vfs_afp_name_new (text_encoding, g_strndup (str, len), len); return TRUE; } gboolean g_vfs_afp_reply_seek (GVfsAfpReply *reply, goffset offset, GSeekType type) { goffset absolute; switch (type) { case G_SEEK_CUR: absolute = reply->pos + offset; break; case G_SEEK_SET: absolute = offset; break; case G_SEEK_END: absolute = reply->len + offset; break; default: return FALSE; } if (absolute < 0 || absolute >= reply->len) return FALSE; reply->pos = absolute; return TRUE; } gboolean g_vfs_afp_reply_skip_to_even (GVfsAfpReply *reply) { if ((reply->pos % 2) == 0) return TRUE; if ((reply->len - reply->pos) < 1) return FALSE; reply->pos++; return TRUE; } goffset g_vfs_afp_reply_get_pos (GVfsAfpReply *reply) { return reply->pos; } gsize g_vfs_afp_reply_get_size (GVfsAfpReply *reply) { return reply->len; } AfpResultCode g_vfs_afp_reply_get_result_code (GVfsAfpReply *reply) { return reply->result_code; } /* * GVfsAfpCommand */ struct _GVfsAfpCommandClass { GDataOutputStreamClass parent_class; }; struct _GVfsAfpCommand { GDataOutputStream parent_instance; AfpCommandType type; char *buf; gsize buf_size; }; G_DEFINE_TYPE (GVfsAfpCommand, g_vfs_afp_command, G_TYPE_DATA_OUTPUT_STREAM); static void g_vfs_afp_command_init (GVfsAfpCommand *comm) { } static void g_vfs_afp_command_class_init (GVfsAfpCommandClass *klass) { } GVfsAfpCommand * g_vfs_afp_command_new (AfpCommandType type) { GOutputStream *mem_stream; GVfsAfpCommand *comm; mem_stream = g_memory_output_stream_new (NULL, 0, g_realloc, g_free); comm = g_object_new (G_VFS_TYPE_AFP_COMMAND, "base-stream", mem_stream, NULL); g_object_unref (mem_stream); comm->type = type; g_vfs_afp_command_put_byte (comm, type); return comm; } void g_vfs_afp_command_put_byte (GVfsAfpCommand *comm, guint8 byte) { g_data_output_stream_put_byte (G_DATA_OUTPUT_STREAM (comm), byte, NULL, NULL); } void g_vfs_afp_command_put_int16 (GVfsAfpCommand *comm, gint16 val) { g_data_output_stream_put_int16 (G_DATA_OUTPUT_STREAM (comm), val, NULL, NULL); } void g_vfs_afp_command_put_int32 (GVfsAfpCommand *comm, gint32 val) { g_data_output_stream_put_int32 (G_DATA_OUTPUT_STREAM (comm), val, NULL, NULL); } void g_vfs_afp_command_put_int64 (GVfsAfpCommand *comm, gint64 val) { g_data_output_stream_put_int64 (G_DATA_OUTPUT_STREAM (comm), val, NULL, NULL); } void g_vfs_afp_command_put_uint16 (GVfsAfpCommand *comm, guint16 val) { g_data_output_stream_put_uint16 (G_DATA_OUTPUT_STREAM (comm), val, NULL, NULL); } void g_vfs_afp_command_put_uint32 (GVfsAfpCommand *comm, guint32 val) { g_data_output_stream_put_uint32 (G_DATA_OUTPUT_STREAM (comm), val, NULL, NULL); } void g_vfs_afp_command_put_uint64 (GVfsAfpCommand *comm, guint64 val) { g_data_output_stream_put_uint64 (G_DATA_OUTPUT_STREAM (comm), val, NULL, NULL); } void g_vfs_afp_command_put_pascal (GVfsAfpCommand *comm, const char *str) { size_t len; len = MIN (strlen (str), 256); g_vfs_afp_command_put_byte (comm, len); g_output_stream_write (G_OUTPUT_STREAM (comm), str, len, NULL, NULL); } void g_vfs_afp_command_put_afp_name (GVfsAfpCommand *comm, GVfsAfpName *afp_name) { g_vfs_afp_command_put_uint32 (comm, afp_name->text_encoding); g_vfs_afp_command_put_uint16 (comm, afp_name->len); if (afp_name->len > 0) { g_output_stream_write_all (G_OUTPUT_STREAM (comm), afp_name->str, afp_name->len, NULL, NULL, NULL); } } static GVfsAfpName * filename_to_afp_pathname (const char *filename) { gsize len; char *str; gint i; while (*filename == '/') filename++; len = strlen (filename); str = g_malloc (len); for (i = 0; i < len; i++) { if (filename[i] == '/') str[i] = '\0'; else str[i] = filename[i]; } return g_vfs_afp_name_new (0x08000103, str, len); } void g_vfs_afp_command_put_pathname (GVfsAfpCommand *comm, const char *filename) { GVfsAfpName *pathname; /* PathType */ g_vfs_afp_command_put_byte (comm, AFP_PATH_TYPE_UTF8_NAME); /* Pathname */ pathname = filename_to_afp_pathname (filename); g_vfs_afp_command_put_afp_name (comm, pathname); g_vfs_afp_name_unref (pathname); } void g_vfs_afp_command_pad_to_even (GVfsAfpCommand *comm) { if (g_vfs_afp_command_get_size (comm) % 2 == 1) g_vfs_afp_command_put_byte (comm, 0); } gsize g_vfs_afp_command_get_size (GVfsAfpCommand *comm) { GMemoryOutputStream *mem_stream; mem_stream = G_MEMORY_OUTPUT_STREAM (g_filter_output_stream_get_base_stream (G_FILTER_OUTPUT_STREAM (comm))); return g_memory_output_stream_get_data_size (mem_stream); } char * g_vfs_afp_command_get_data (GVfsAfpCommand *comm) { GMemoryOutputStream *mem_stream; mem_stream = G_MEMORY_OUTPUT_STREAM (g_filter_output_stream_get_base_stream (G_FILTER_OUTPUT_STREAM (comm))); return g_memory_output_stream_get_data (mem_stream); } void g_vfs_afp_command_set_buffer (GVfsAfpCommand *comm, char *buf, gsize size) { g_return_if_fail (buf != NULL); g_return_if_fail (size > 0); comm->buf = buf; comm->buf_size = size; } /* * GVfsAfpConnection */ enum { ATTENTION, LAST_SIGNAL }; static guint signals[LAST_SIGNAL] = {0,}; G_DEFINE_TYPE (GVfsAfpConnection, g_vfs_afp_connection, G_TYPE_OBJECT); typedef struct { guint8 flags; guint8 command; guint16 requestID; union { guint32 errorCode; guint32 writeOffset; }; guint32 totalDataLength; guint32 reserved; } DSIHeader; struct _GVfsAfpConnectionPrivate { GSocketConnectable *addr; GIOStream *conn; guint16 request_id; guint32 kRequestQuanta; guint32 kServerReplayCacheSize; GQueue *request_queue; GHashTable *request_hash; /* send loop */ gboolean send_loop_running; DSIHeader write_dsi_header; /* read loop */ gboolean read_loop_running; DSIHeader read_dsi_header; char *reply_buf; gboolean free_reply_buf; }; typedef enum { DSI_CLOSE_SESSION = 1, DSI_COMMAND = 2, DSI_GET_STATUS = 3, DSI_OPEN_SESSION = 4, DSI_TICKLE = 5, DSI_WRITE = 6, DSI_ATTENTION = 8 } DsiCommand; typedef enum { REQUEST_TYPE_COMMAND, REQUEST_TYPE_TICKLE } RequestType; typedef struct { RequestType type; GVfsAfpCommand *command; char *reply_buf; GSimpleAsyncResult *simple; GCancellable *cancellable; } RequestData; static void free_request_data (RequestData *req_data) { if (req_data->command) g_object_unref (req_data->command); if (req_data->simple) g_object_unref (req_data->simple); if (req_data->cancellable) g_object_unref (req_data->cancellable); g_slice_free (RequestData, req_data); } static void g_vfs_afp_connection_init (GVfsAfpConnection *afp_connection) { GVfsAfpConnectionPrivate *priv; afp_connection->priv = priv = G_TYPE_INSTANCE_GET_PRIVATE (afp_connection, G_VFS_TYPE_AFP_CONNECTION, GVfsAfpConnectionPrivate); priv->addr = NULL; priv->conn = NULL; priv->request_id = 0; priv->kRequestQuanta = -1; priv->kServerReplayCacheSize = -1; priv->request_queue = g_queue_new (); priv->request_hash = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, (GDestroyNotify)free_request_data); priv->send_loop_running = FALSE; priv->read_loop_running = FALSE; } static void g_vfs_afp_connection_finalize (GObject *object) { GVfsAfpConnection *afp_connection = (GVfsAfpConnection *)object; GVfsAfpConnectionPrivate *priv = afp_connection->priv; if (priv->addr) g_object_unref (priv->addr); if (priv->conn) g_object_unref (priv->conn); G_OBJECT_CLASS (g_vfs_afp_connection_parent_class)->finalize (object); } static void g_vfs_afp_connection_class_init (GVfsAfpConnectionClass *klass) { GObjectClass* object_class = G_OBJECT_CLASS (klass); g_type_class_add_private (klass, sizeof (GVfsAfpConnectionPrivate)); object_class->finalize = g_vfs_afp_connection_finalize; signals[ATTENTION] = g_signal_new ("attention", G_TYPE_FROM_CLASS (object_class), G_SIGNAL_RUN_LAST | G_SIGNAL_NO_RECURSE | G_SIGNAL_NO_HOOKS, 0, NULL, NULL, g_cclosure_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT); } static void read_reply (GVfsAfpConnection *afp_connection); static void send_request (GVfsAfpConnection *afp_connection); static guint16 get_request_id (GVfsAfpConnection *afp_connection) { GVfsAfpConnectionPrivate *priv = afp_connection->priv; return priv->request_id++; } static void run_loop (GVfsAfpConnection *afp_connection) { GVfsAfpConnectionPrivate *priv = afp_connection->priv; if (!priv->send_loop_running) { priv->send_loop_running = TRUE; send_request (afp_connection); } if (!priv->read_loop_running) { priv->read_loop_running = TRUE; read_reply (afp_connection); } } typedef struct { void *buffer; gsize count; int io_priority; GCancellable *cancellable; gsize bytes_read; } ReadAllData; static void free_read_all_data (ReadAllData *read_data) { if (read_data->cancellable) g_object_unref (read_data->cancellable); g_slice_free (ReadAllData, read_data); } static void read_all_cb (GObject *source_object, GAsyncResult *res, gpointer user_data) { GInputStream *stream = G_INPUT_STREAM (source_object); GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data); gsize bytes_read; GError *err = NULL; ReadAllData *read_data; bytes_read = g_input_stream_read_finish (stream, res, &err); if (bytes_read == -1) { g_simple_async_result_take_error (simple, err); goto done; } read_data = g_simple_async_result_get_op_res_gpointer (simple); read_data->bytes_read += bytes_read; if (read_data->bytes_read < read_data->count) { g_input_stream_read_async (stream, (guint8 *)read_data->buffer + read_data->bytes_read, read_data->count - read_data->bytes_read, 0, read_data->cancellable, read_all_cb, simple); return; } done: g_simple_async_result_complete (simple); g_object_unref (simple); } static void read_all_async (GInputStream *stream, void *buffer, gsize count, int io_priority, GCancellable *cancellable, GAsyncReadyCallback callback, gpointer user_data) { ReadAllData *read_data; GSimpleAsyncResult *simple; read_data = g_slice_new0 (ReadAllData); read_data->buffer = buffer; read_data->count = count; read_data->io_priority = io_priority; if (cancellable) read_data->cancellable = g_object_ref (cancellable); simple = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, read_all_async); g_simple_async_result_set_op_res_gpointer (simple, read_data, (GDestroyNotify)free_read_all_data); g_input_stream_read_async (stream, buffer, count, io_priority, cancellable, read_all_cb, simple); } static gboolean read_all_finish (GInputStream *stream, GAsyncResult *res, gsize *bytes_read, GError **error) { GSimpleAsyncResult *simple; g_return_val_if_fail (g_simple_async_result_is_valid (res, G_OBJECT (stream), read_all_async), FALSE); simple = (GSimpleAsyncResult *)res; if (g_simple_async_result_propagate_error (simple, error)) return FALSE; if (bytes_read) { ReadAllData *read_data; read_data = g_simple_async_result_get_op_res_gpointer (simple); *bytes_read = read_data->bytes_read; } return TRUE; } static void dispatch_reply (GVfsAfpConnection *afp_connection) { GVfsAfpConnectionPrivate *priv = afp_connection->priv; DSIHeader *dsi_header = &priv->read_dsi_header; switch (dsi_header->command) { case DSI_CLOSE_SESSION: { g_warning ("Server closed session\n"); break; } case DSI_TICKLE: { RequestData *req_data; /* Send back a tickle message */ req_data = g_slice_new0 (RequestData); req_data->type = REQUEST_TYPE_TICKLE; g_queue_push_head (priv->request_queue, req_data); run_loop (afp_connection); break; } case DSI_ATTENTION: { guint8 attention_code; attention_code = priv->reply_buf[0] >> 4; g_signal_emit (afp_connection, signals[ATTENTION], 0, attention_code); break; } case DSI_COMMAND: case DSI_WRITE: { RequestData *req_data; req_data = g_hash_table_lookup (priv->request_hash, GUINT_TO_POINTER ((guint)dsi_header->requestID)); if (req_data) { GVfsAfpReply *reply; reply = g_vfs_afp_reply_new (dsi_header->errorCode, priv->reply_buf, dsi_header->totalDataLength, priv->free_reply_buf); priv->free_reply_buf = FALSE; g_simple_async_result_set_op_res_gpointer (req_data->simple, reply, g_object_unref); g_simple_async_result_complete (req_data->simple); g_hash_table_remove (priv->request_hash, GUINT_TO_POINTER ((guint)dsi_header->requestID)); } break; } default: g_assert_not_reached (); } } static void read_data_cb (GObject *object, GAsyncResult *res, gpointer user_data) { GInputStream *input = G_INPUT_STREAM (object); GVfsAfpConnection *afp_connection = G_VFS_AFP_CONNECTION (user_data); GVfsAfpConnectionPrivate *priv = afp_connection->priv; gboolean result; GError *err = NULL; result = read_all_finish (input, res, NULL, &err); if (!result) { g_warning ("FAIL!!! \"%s\"\n", err->message); g_error_free (err); } dispatch_reply (afp_connection); if (priv->free_reply_buf) g_free (priv->reply_buf); read_reply (afp_connection); } static void read_dsi_header_cb (GObject *object, GAsyncResult *res, gpointer user_data) { GInputStream *input = G_INPUT_STREAM (object); GVfsAfpConnection *afp_conn = G_VFS_AFP_CONNECTION (user_data); GVfsAfpConnectionPrivate *priv = afp_conn->priv; gboolean result; GError *err = NULL; DSIHeader *dsi_header; result = read_all_finish (input, res, NULL, &err); if (!result) { g_warning ("FAIL!!! \"%s\"\n", err->message); g_error_free (err); } dsi_header = &priv->read_dsi_header; dsi_header->requestID = GUINT16_FROM_BE (dsi_header->requestID); dsi_header->errorCode = GUINT32_FROM_BE (dsi_header->errorCode); dsi_header->totalDataLength = GUINT32_FROM_BE (dsi_header->totalDataLength); if (dsi_header->totalDataLength > 0) { RequestData *req_data; req_data = g_hash_table_lookup (priv->request_hash, GUINT_TO_POINTER ((guint)dsi_header->requestID)); if (req_data && req_data->reply_buf) { priv->reply_buf = req_data->reply_buf; priv->free_reply_buf = FALSE; } else { priv->reply_buf = g_malloc (dsi_header->totalDataLength); priv->free_reply_buf = TRUE; } read_all_async (input, priv->reply_buf, dsi_header->totalDataLength, 0, NULL, read_data_cb, afp_conn); return; } dispatch_reply (afp_conn); read_reply (afp_conn); } static void read_reply (GVfsAfpConnection *afp_connection) { GVfsAfpConnectionPrivate *priv = afp_connection->priv; GInputStream *input; input = g_io_stream_get_input_stream (priv->conn); read_all_async (input, &priv->read_dsi_header, sizeof (DSIHeader), 0, NULL, read_dsi_header_cb, afp_connection); } typedef struct { const void *buffer; gsize count; int io_priority; GCancellable *cancellable; gssize bytes_written; } WriteAllData; inline static void free_write_all_data (WriteAllData *write_data) { if (write_data->cancellable) g_object_unref (write_data->cancellable); g_slice_free (WriteAllData, write_data); } static void write_all_cb (GObject *source_object, GAsyncResult *res, gpointer user_data) { GOutputStream *stream = G_OUTPUT_STREAM (source_object); GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data); gssize bytes_written; GError *err = NULL; WriteAllData *write_data; bytes_written = g_output_stream_write_finish (stream, res, &err); if (bytes_written == -1) { g_simple_async_result_take_error (simple, err); goto done; } write_data = g_simple_async_result_get_op_res_gpointer (simple); write_data->bytes_written += bytes_written; if (write_data->bytes_written < write_data->count) { g_output_stream_write_async (stream, (const guint8 *)write_data->buffer + write_data->bytes_written, write_data->count - write_data->bytes_written, write_data->io_priority, write_data->cancellable, write_all_cb, simple); return; } done: g_simple_async_result_complete (simple); g_object_unref (simple); } static void write_all_async (GOutputStream *stream, const void *buffer, gsize count, int io_priority, GCancellable *cancellable, GAsyncReadyCallback callback, gpointer user_data) { GSimpleAsyncResult *simple; WriteAllData *write_data; write_data = g_slice_new0 (WriteAllData); write_data->buffer = buffer; write_data->count = count; write_data->io_priority = io_priority; if (cancellable) write_data->cancellable = g_object_ref (cancellable); simple = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, write_all_async); g_simple_async_result_set_op_res_gpointer (simple, write_data, (GDestroyNotify)free_write_all_data); g_output_stream_write_async (stream, buffer, count, io_priority, cancellable, write_all_cb, simple); } static gboolean write_all_finish (GOutputStream *stream, GAsyncResult *res, gsize *bytes_written, GError **error) { GSimpleAsyncResult *simple; g_return_val_if_fail (g_simple_async_result_is_valid (res, G_OBJECT (stream), write_all_async), FALSE); simple = (GSimpleAsyncResult *)res; if (g_simple_async_result_propagate_error (simple, error)) return FALSE; if (bytes_written) { WriteAllData *write_data; write_data = g_simple_async_result_get_op_res_gpointer (simple); *bytes_written = write_data->bytes_written; } return TRUE; } static void remove_first (GQueue *request_queue) { RequestData *req_data; req_data = g_queue_pop_head (request_queue); free_request_data (req_data); } #define HANDLE_RES() { \ gboolean result; \ GError *err = NULL; \ \ result = write_all_finish (output, res, NULL, &err); \ if (!result) \ { \ if (req_data->simple) \ { \ g_simple_async_result_set_from_error (req_data->simple, err); \ g_simple_async_result_complete (req_data->simple); \ } \ \ remove_first (priv->request_queue); \ g_error_free (err); \ \ send_request (afp_conn); \ return; \ } \ } static void write_buf_cb (GObject *object, GAsyncResult *res, gpointer user_data) { GOutputStream *output = G_OUTPUT_STREAM (object); GVfsAfpConnection *afp_conn = G_VFS_AFP_CONNECTION (user_data); GVfsAfpConnectionPrivate *priv = afp_conn->priv; RequestData *req_data; req_data = g_queue_peek_head (priv->request_queue); HANDLE_RES (); g_hash_table_insert (priv->request_hash, GUINT_TO_POINTER ((guint)GUINT16_FROM_BE (priv->write_dsi_header.requestID)), req_data); g_queue_pop_head (priv->request_queue); send_request (afp_conn); } static void write_command_cb (GObject *object, GAsyncResult *res, gpointer user_data) { GOutputStream *output = G_OUTPUT_STREAM (object); GVfsAfpConnection *afp_conn = G_VFS_AFP_CONNECTION (user_data); GVfsAfpConnectionPrivate *priv = afp_conn->priv; RequestData *req_data; req_data = g_queue_peek_head (priv->request_queue); HANDLE_RES (); if (priv->write_dsi_header.command == DSI_WRITE && req_data->command->buf) { write_all_async (output, req_data->command->buf, req_data->command->buf_size, 0, NULL, write_buf_cb, afp_conn); return; } g_hash_table_insert (priv->request_hash, GUINT_TO_POINTER ((guint)GUINT16_FROM_BE (priv->write_dsi_header.requestID)), req_data); g_queue_pop_head (priv->request_queue); send_request (afp_conn); } static void write_dsi_header_cb (GObject *object, GAsyncResult *res, gpointer user_data) { GOutputStream *output = G_OUTPUT_STREAM (object); GVfsAfpConnection *afp_conn = G_VFS_AFP_CONNECTION (user_data); GVfsAfpConnectionPrivate *priv = afp_conn->priv; RequestData *req_data; char *data; gsize size; req_data = g_queue_peek_head (priv->request_queue); HANDLE_RES (); if (req_data->type == REQUEST_TYPE_TICKLE) { remove_first (priv->request_queue); send_request (afp_conn); return; } data = g_vfs_afp_command_get_data (req_data->command); size = g_vfs_afp_command_get_size (req_data->command); write_all_async (output, data, size, 0, NULL, write_command_cb, afp_conn); } static void send_request (GVfsAfpConnection *afp_connection) { GVfsAfpConnectionPrivate *priv = afp_connection->priv; RequestData *req_data; guint32 writeOffset; guint8 dsi_command; while ((req_data = g_queue_peek_head (priv->request_queue))) { if (req_data->cancellable && g_cancellable_is_cancelled (req_data->cancellable)) { if (req_data->simple) { GError *err = NULL; g_cancellable_set_error_if_cancelled (req_data->cancellable, &err); g_simple_async_result_take_error (req_data->simple, err); g_simple_async_result_complete (req_data->simple); } remove_first (priv->request_queue); } else break; } if (!req_data) { priv->send_loop_running = FALSE; return; } switch (req_data->type) { case REQUEST_TYPE_TICKLE: priv->write_dsi_header.flags = 0x00; priv->write_dsi_header.command = DSI_TICKLE; priv->write_dsi_header.requestID = GUINT16_TO_BE (get_request_id (afp_connection)); priv->write_dsi_header.writeOffset = 0; priv->write_dsi_header.totalDataLength = 0; priv->write_dsi_header.reserved = 0; break; case REQUEST_TYPE_COMMAND: { gsize size; switch (req_data->command->type) { case AFP_COMMAND_WRITE: writeOffset = 8; dsi_command = DSI_WRITE; break; case AFP_COMMAND_WRITE_EXT: writeOffset = 20; dsi_command = DSI_WRITE; break; default: writeOffset = 0; dsi_command = DSI_COMMAND; break; } priv->write_dsi_header.flags = 0x00; priv->write_dsi_header.command = dsi_command; priv->write_dsi_header.requestID = GUINT16_TO_BE (get_request_id (afp_connection)); priv->write_dsi_header.writeOffset = GUINT32_TO_BE (writeOffset); /* totalDataLength */ size = g_vfs_afp_command_get_size (req_data->command); if (dsi_command == DSI_WRITE && req_data->command->buf) size += req_data->command->buf_size; priv->write_dsi_header.totalDataLength = GUINT32_TO_BE (size); priv->write_dsi_header.reserved = 0; break; } default: g_assert_not_reached (); } write_all_async (g_io_stream_get_output_stream (priv->conn), &priv->write_dsi_header, sizeof (DSIHeader), 0, NULL, write_dsi_header_cb, afp_connection); } void g_vfs_afp_connection_send_command (GVfsAfpConnection *afp_connection, GVfsAfpCommand *command, char *reply_buf, GAsyncReadyCallback callback, GCancellable *cancellable, gpointer user_data) { GVfsAfpConnectionPrivate *priv = afp_connection->priv; RequestData *req_data; req_data = g_slice_new0 (RequestData); req_data->type = REQUEST_TYPE_COMMAND; req_data->command = g_object_ref (command); req_data->reply_buf = reply_buf; req_data->simple = g_simple_async_result_new (G_OBJECT (afp_connection), callback, user_data, g_vfs_afp_connection_send_command); if (cancellable) req_data->cancellable = g_object_ref (cancellable); g_queue_push_tail (priv->request_queue, req_data); run_loop (afp_connection); } GVfsAfpReply * g_vfs_afp_connection_send_command_finish (GVfsAfpConnection *afp_connection, GAsyncResult *res, GError **error) { GSimpleAsyncResult *simple; g_return_val_if_fail (g_simple_async_result_is_valid (res, G_OBJECT (afp_connection), g_vfs_afp_connection_send_command), NULL); simple = (GSimpleAsyncResult *)res; if (g_simple_async_result_propagate_error (simple, error)) return NULL; return g_object_ref (g_simple_async_result_get_op_res_gpointer (simple)); } static gboolean read_reply_sync (GInputStream *input, DSIHeader *dsi_header, char **data, GCancellable *cancellable, GError **error) { gboolean res; gsize read_count, bytes_read; g_assert (dsi_header != NULL); read_count = sizeof (DSIHeader); res = g_input_stream_read_all (input, dsi_header, read_count, &bytes_read, cancellable, error); if (!res) return FALSE; if (bytes_read < read_count) { g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_FAILED, _("Connection unexpectedly went down")); return FALSE; } dsi_header->requestID = GUINT16_FROM_BE (dsi_header->requestID); dsi_header->errorCode = GUINT32_FROM_BE (dsi_header->errorCode); dsi_header->totalDataLength = GUINT32_FROM_BE (dsi_header->totalDataLength); if (dsi_header->totalDataLength == 0) { *data = NULL; return TRUE; } *data = g_malloc (dsi_header->totalDataLength); read_count = dsi_header->totalDataLength; res = g_input_stream_read_all (input, *data, read_count, &bytes_read, cancellable, error); if (!res) { g_free (*data); return FALSE; } if (bytes_read < read_count) { g_free (*data); g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_FAILED, _("Got EOS")); return FALSE; } return TRUE; } GVfsAfpReply * g_vfs_afp_connection_read_reply_sync (GVfsAfpConnection *afp_connection, GCancellable *cancellable, GError **error) { GVfsAfpConnectionPrivate *priv = afp_connection->priv; gboolean res; char *data; DSIHeader dsi_header; res = read_reply_sync (g_io_stream_get_input_stream (priv->conn), &dsi_header, &data, cancellable, error); if (!res) return NULL; return g_vfs_afp_reply_new (dsi_header.errorCode, data, dsi_header.totalDataLength, TRUE); } static gboolean send_request_sync (GOutputStream *output, DsiCommand command, guint16 request_id, guint32 writeOffset, gsize len, const char *data, GCancellable *cancellable, GError **error) { DSIHeader dsi_header; gboolean res; gsize write_count, bytes_written; dsi_header.flags = 0x00; dsi_header.command = command; dsi_header.requestID = GUINT16_TO_BE (request_id); dsi_header.writeOffset = GUINT32_TO_BE (writeOffset); dsi_header.totalDataLength = GUINT32_TO_BE (len); dsi_header.reserved = 0; write_count = sizeof (DSIHeader); res = g_output_stream_write_all (output, &dsi_header, write_count, &bytes_written, cancellable, error); if (!res) return FALSE; if (data == NULL) return TRUE; write_count = len; res = g_output_stream_write_all (output, data, write_count, &bytes_written, cancellable, error); if (!res) return FALSE; return TRUE; } gboolean g_vfs_afp_connection_send_command_sync (GVfsAfpConnection *afp_connection, GVfsAfpCommand *afp_command, GCancellable *cancellable, GError **error) { GVfsAfpConnectionPrivate *priv = afp_connection->priv; DsiCommand dsi_command; guint16 req_id; guint32 writeOffset; /* set dsi_command */ switch (afp_command->type) { case AFP_COMMAND_WRITE: writeOffset = 8; dsi_command = DSI_WRITE; break; case AFP_COMMAND_WRITE_EXT: writeOffset = 20; dsi_command = DSI_WRITE; break; default: writeOffset = 0; dsi_command = DSI_COMMAND; break; } req_id = get_request_id (afp_connection); return send_request_sync (g_io_stream_get_output_stream (priv->conn), dsi_command, req_id, writeOffset, g_vfs_afp_command_get_size (afp_command), g_vfs_afp_command_get_data (afp_command), cancellable, error); } gboolean g_vfs_afp_connection_close (GVfsAfpConnection *afp_connection, GCancellable *cancellable, GError **error) { GVfsAfpConnectionPrivate *priv = afp_connection->priv; guint16 req_id; gboolean res; /* close DSI session */ req_id = get_request_id (afp_connection); res = send_request_sync (g_io_stream_get_output_stream (priv->conn), DSI_CLOSE_SESSION, req_id, 0, 0, NULL, cancellable, error); if (!res) { g_io_stream_close (priv->conn, cancellable, NULL); g_object_unref (priv->conn); return FALSE; } res = g_io_stream_close (priv->conn, cancellable, error); g_object_unref (priv->conn); return res; } gboolean g_vfs_afp_connection_open (GVfsAfpConnection *afp_connection, GCancellable *cancellable, GError **error) { GVfsAfpConnectionPrivate *priv = afp_connection->priv; GSocketClient *client; guint16 req_id; gboolean res; char *reply; DSIHeader dsi_header; guint pos; client = g_socket_client_new (); priv->conn = G_IO_STREAM (g_socket_client_connect (client, priv->addr, cancellable, error)); g_object_unref (client); if (!priv->conn) return FALSE; req_id = get_request_id (afp_connection); res = send_request_sync (g_io_stream_get_output_stream (priv->conn), DSI_OPEN_SESSION, req_id, 0, 0, NULL, cancellable, error); if (!res) return FALSE; res = read_reply_sync (g_io_stream_get_input_stream (priv->conn), &dsi_header, &reply, cancellable, error); if (!res) return FALSE; pos = 0; while ((dsi_header.totalDataLength - pos) > 2) { guint8 optionType; guint8 optionLength; optionType = reply[pos++]; optionLength = reply[pos++]; switch (optionType) { case 0x00: if (optionLength == 4 && (dsi_header.totalDataLength - pos) >= 4) priv->kRequestQuanta = GUINT32_FROM_BE (*(guint32 *)(reply + pos)); break; case 0x02: if (optionLength == 4 && (dsi_header.totalDataLength - pos) >= 4) priv->kServerReplayCacheSize = GUINT32_FROM_BE (*(guint32 *)(reply + pos)); break; default: g_debug ("Unknown DSI option\n"); } pos += optionLength; } g_free (reply); return TRUE; } GVfsAfpConnection * g_vfs_afp_connection_new (GSocketConnectable *addr) { GVfsAfpConnection *afp_connection; GVfsAfpConnectionPrivate *priv; afp_connection = g_object_new (G_VFS_TYPE_AFP_CONNECTION, NULL); priv = afp_connection->priv; priv->addr = g_object_ref (addr); return afp_connection; } GVfsAfpReply * g_vfs_afp_query_server_info (GSocketConnectable *addr, GCancellable *cancellable, GError **error) { GSocketClient *client; GIOStream *conn; gboolean res; DSIHeader dsi_header; char *data; client = g_socket_client_new (); conn = G_IO_STREAM (g_socket_client_connect (client, addr, cancellable, error)); g_object_unref (client); if (!conn) return NULL; res = send_request_sync (g_io_stream_get_output_stream (conn), DSI_GET_STATUS, 0, 0, 0, NULL, cancellable, error); if (!res) { g_object_unref (conn); return NULL; } res = read_reply_sync (g_io_stream_get_input_stream (conn), &dsi_header, &data, cancellable, error); if (!res) { g_object_unref (conn); return NULL; } g_object_unref (conn); return g_vfs_afp_reply_new (dsi_header.errorCode, data, dsi_header.totalDataLength, TRUE); }