diff options
author | Thomas Haller <thaller@redhat.com> | 2020-11-05 10:56:37 +0100 |
---|---|---|
committer | Thomas Haller <thaller@redhat.com> | 2020-11-14 13:17:07 +0100 |
commit | a32045b27812c98e3794177957bc2d9533f8e4a5 (patch) | |
tree | d29433961c444de1c6262f6ac2632046093a00a6 | |
parent | 7291f4667797c141ff7ac3d4cda93cf9ab193c55 (diff) | |
download | NetworkManager-th/ovs-external-ids-3.tar.gz |
core/ovs: rework IO in "nm-ovsdb.c"th/ovs-external-ids-3
-rw-r--r-- | src/devices/ovs/nm-ovsdb.c | 742 |
1 files changed, 497 insertions, 245 deletions
diff --git a/src/devices/ovs/nm-ovsdb.c b/src/devices/ovs/nm-ovsdb.c index 61875bdcd0..d4708e9143 100644 --- a/src/devices/ovs/nm-ovsdb.c +++ b/src/devices/ovs/nm-ovsdb.c @@ -7,25 +7,41 @@ #include "nm-ovsdb.h" -#include <gmodule.h> -#include <gio/gunixsocketaddress.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> +#include <sys/socket.h> +#include <sys/un.h> +#include "devices/nm-device.h" +#include "nm-core-internal.h" +#include "nm-core-utils.h" +#include "nm-glib-aux/nm-io-utils.h" #include "nm-glib-aux/nm-jansson.h" #include "nm-glib-aux/nm-str-buf.h" -#include "nm-core-utils.h" -#include "nm-core-internal.h" -#include "devices/nm-device.h" #include "nm-setting-ovs-external-ids.h" +#if JANSSON_VERSION_HEX < 0x020400 + #warning "requires at least libjansson 2.4" +#endif + /*****************************************************************************/ +#define SOCK_RD_TIMEOUT_MSEC 5000 +#define SOCK_WR_TIMEOUT_MSEC 5000 + #define OVSDB_MAX_FAILURES 3 +#define OPENVSWITCH_DB_SOCK_PATH RUNSTATEDIR "/openvswitch/db.sock" + /*****************************************************************************/ -#if JANSSON_VERSION_HEX < 0x020400 - #warning "requires at least libjansson 2.4" -#endif +typedef enum _nm_packed { + FD_STATE_NONE, + FD_STATE_CONNECT, + FD_STATE_IN, + FD_STATE_INOUT, +} FdState; typedef struct { char * port_uuid; @@ -111,14 +127,14 @@ enum { DEVICE_ADDED, DEVICE_REMOVED, INTERFACE_FAILED, LAST_SIGNAL }; static guint signals[LAST_SIGNAL] = {0}; typedef struct { - GSocketClient * client; - GSocketConnection *conn; - GCancellable * cancellable; - char buf[4096]; /* Input buffer */ - size_t bufp; /* Last decoded byte in the input buffer. */ - GString * input; /* JSON stream waiting for decoding. */ - GString * output; /* JSON stream to be sent. */ - guint64 call_id_counter; + GSource *sock_source; + GSource *sock_wr_timeout_source; + GSource *sock_rd_timeout_source; + NMStrBuf rd_buf; + NMStrBuf wr_buf; + gsize rd_buf_offset; + gsize wr_buf_offset; + guint64 call_id_counter; CList calls_lst_head; @@ -126,7 +142,9 @@ typedef struct { GHashTable *ports; /* port uuid => OpenvswitchPort */ GHashTable *bridges; /* bridge uuid => OpenvswitchBridge */ char * db_uuid; + int sock_fd; guint num_failures; + FdState sock_fd_state; } NMOvsdbPrivate; struct _NMOvsdb { @@ -146,11 +164,13 @@ NM_DEFINE_SINGLETON_GETTER(NMOvsdb, nm_ovsdb_get, NM_TYPE_OVSDB); /*****************************************************************************/ -static void ovsdb_try_connect(NMOvsdb *self); -static void ovsdb_disconnect(NMOvsdb *self, gboolean retry, gboolean is_disposing); -static void ovsdb_read(NMOvsdb *self); -static void ovsdb_write(NMOvsdb *self); -static void ovsdb_next_command(NMOvsdb *self); +static void ovsdb_try_connect(NMOvsdb *self); +static void ovsdb_disconnect(NMOvsdb *self, gboolean retry, gboolean is_disposing); +static void ovsdb_disconnect_io_error(NMOvsdb *self); +static gboolean _sock_fd_handle_write(NMOvsdb *self, gboolean write_is_ready); +static void ovsdb_send_msg(NMOvsdb *self, const char *str, gssize len); +static void ovsdb_next_command(NMOvsdb *self); +static gboolean _sock_fd_set_state(NMOvsdb *self, FdState fd_state); /*****************************************************************************/ @@ -349,6 +369,37 @@ _signal_emit_interface_failed(NMOvsdb * self, /*****************************************************************************/ +static gboolean +_output_buffer_append_json(NMOvsdb *self, const char *str, gssize len) +{ + NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self); + NMStrBuf * wr_buf = &priv->wr_buf; + gboolean was_empty; + gsize len_u; + + if (len < 0) + len_u = strlen(str); + else + len_u = len; + + if ((wr_buf->allocated - wr_buf->len) < len_u) { + /* It seems we need to reallocate the buffer. But first + * try to avoid that by erasing the part we already + * sent. */ + if (priv->wr_buf_offset > 0) { + nm_str_buf_erase(wr_buf, 0, priv->wr_buf_offset, FALSE); + priv->wr_buf_offset = 0; + } + } + + was_empty = ((wr_buf->len - priv->wr_buf_offset) == 0); + + nm_str_buf_append_len(wr_buf, str, len_u); + return was_empty; +} + +/*****************************************************************************/ + /** * ovsdb_call_method: * @@ -1248,10 +1299,10 @@ ovsdb_next_command(NMOvsdb *self) { NMOvsdbPrivate * priv = NM_OVSDB_GET_PRIVATE(self); OvsdbMethodCall * call; - char * cmd; - nm_auto_decref_json json_t *msg = NULL; + nm_auto_decref_json json_t *msg = NULL; + gs_free char * msg_as_str = NULL; - if (!priv->conn) + if (priv->sock_fd < 0) return; if (c_list_is_empty(&priv->calls_lst_head)) @@ -1369,12 +1420,9 @@ ovsdb_next_command(NMOvsdb *self) g_return_if_fail(msg); - cmd = json_dumps(msg, 0); - _LOGT_call(call, "send: call-id=%" G_GUINT64_FORMAT ", %s", call->call_id, cmd); - g_string_append(priv->output, cmd); - free(cmd); - - ovsdb_write(self); + msg_as_str = json_dumps(msg, 0); + _LOGT_call(call, "send: call-id=%" G_GUINT64_FORMAT ", %s", call->call_id, msg_as_str); + ovsdb_send_msg(self, msg_as_str, -1); } /** @@ -1523,14 +1571,14 @@ _external_ids_to_string(const GArray *arr) /*****************************************************************************/ /** - * ovsdb_got_update: + * ovsdb_got_msg_update: * * Called when we've got an "update" method call (we asked for it with the monitor * command). We use it to maintain a consistent view of bridge list regardless of * whether the changes are done by us or externally. */ static void -ovsdb_got_update(NMOvsdb *self, json_t *msg) +ovsdb_got_msg_update(NMOvsdb *self, json_t *msg) { NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self); json_t * ovs = NULL; @@ -1907,28 +1955,20 @@ ovsdb_got_update(NMOvsdb *self, json_t *msg) } /** - * ovsdb_got_echo: + * ovsdb_got_msg_echo: * * Only implemented because the specification mandates it. Actual ovsdb hasn't been * seen doing this. */ static void -ovsdb_got_echo(NMOvsdb *self, json_int_t id, json_t *data) +ovsdb_got_msg_echo(NMOvsdb *self, json_int_t id, json_t *data) { - NMOvsdbPrivate * priv = NM_OVSDB_GET_PRIVATE(self); - nm_auto_decref_json json_t *msg = NULL; - char * reply; - gboolean output_was_empty; - - output_was_empty = priv->output->len == 0; + nm_auto_decref_json json_t *msg = NULL; + gs_free char * msg_as_str = NULL; - msg = json_pack("{s:I, s:O}", "id", id, "result", data); - reply = json_dumps(msg, 0); - g_string_append(priv->output, reply); - free(reply); - - if (output_was_empty) - ovsdb_write(self); + msg = json_pack("{s:I, s:O}", "id", id, "result", data); + msg_as_str = json_dumps(msg, 0); + ovsdb_send_msg(self, msg_as_str, -1); } /** @@ -1982,15 +2022,16 @@ ovsdb_got_msg(NMOvsdb *self, json_t *msg) return; } - if (nm_streq0(method, "update")) { - /* This is a update method call. */ - ovsdb_got_update(self, json_array_get(params, 1)); - } else if (nm_streq0(method, "echo")) { - /* This is an echo request. */ - ovsdb_got_echo(self, id, params); - } else { - _LOGW("got an unknown method call: '%s'", method); + if (nm_streq(method, "update")) { + ovsdb_got_msg_update(self, json_array_get(params, 1)); + return; + } + if (nm_streq(method, "echo")) { + ovsdb_got_msg_echo(self, id, params); + return; } + + _LOGD("got an unknown method call: '%s'", method); return; } @@ -2030,155 +2071,22 @@ ovsdb_got_msg(NMOvsdb *self, json_t *msg) priv->num_failures = 0; - /* Don't progress further commands in case the callback hit an error - * and disconnected us. */ - if (!priv->conn) - return; - /* Now we're free to serialize and send the next command, if any. */ ovsdb_next_command(self); - return; } /* This is a message we are not interested in. */ - _LOGW("got an unknown message, ignoring"); + _LOGD("got an unknown message, ignoring"); } /*****************************************************************************/ -/* Lower level marshalling and demarshalling of the JSON-RPC traffic on the - * ovsdb socket. */ - -static size_t -_json_callback(void *buffer, size_t buflen, void *user_data) -{ - NMOvsdb * self = NM_OVSDB(user_data); - NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self); - - if (priv->bufp == priv->input->len) { - /* No more bytes buffered for decoding. */ - return 0; - } - - /* Pass one more byte to the JSON decoder. */ - *(char *) buffer = priv->input->str[priv->bufp]; - priv->bufp++; - - return (size_t) 1; -} - -/** - * ovsdb_read_cb: - * - * Read out the data available from the ovsdb socket and try to deserialize - * the JSON. If we see a complete object, pass it upwards to ovsdb_got_msg(). - */ static void -ovsdb_read_cb(GObject *source_object, GAsyncResult *res, gpointer user_data) +ovsdb_send_msg(NMOvsdb *self, const char *str, gssize len) { - NMOvsdb * self = NM_OVSDB(user_data); - NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self); - GInputStream * stream = G_INPUT_STREAM(source_object); - GError * error = NULL; - gssize size; - json_t * msg; - json_error_t json_error = { - 0, - }; - - size = g_input_stream_read_finish(stream, res, &error); - if (size == -1) { - /* ovsdb-server was possibly restarted */ - _LOGW("short read from ovsdb: %s", error->message); - priv->num_failures++; - g_clear_error(&error); - ovsdb_disconnect(self, priv->num_failures <= OVSDB_MAX_FAILURES, FALSE); - return; - } - - g_string_append_len(priv->input, priv->buf, size); - do { - priv->bufp = 0; - /* The callback always eats up only up to a single byte. This makes - * it possible for us to identify complete JSON objects in spite of - * us not knowing the length in advance. */ - msg = json_load_callback(_json_callback, self, JSON_DISABLE_EOF_CHECK, &json_error); - if (msg) { - ovsdb_got_msg(self, msg); - g_string_erase(priv->input, 0, priv->bufp); - } - json_decref(msg); - } while (msg); - - if (!priv->conn) - return; - - if (size) - ovsdb_read(self); -} - -static void -ovsdb_read(NMOvsdb *self) -{ - NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self); - - g_input_stream_read_async(g_io_stream_get_input_stream(G_IO_STREAM(priv->conn)), - priv->buf, - sizeof(priv->buf), - G_PRIORITY_DEFAULT, - NULL, - ovsdb_read_cb, - self); -} - -static void -ovsdb_write_cb(GObject *source_object, GAsyncResult *res, gpointer user_data) -{ - GOutputStream * stream = G_OUTPUT_STREAM(source_object); - NMOvsdb * self = NM_OVSDB(user_data); - NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self); - GError * error = NULL; - gssize size; - - size = g_output_stream_write_finish(stream, res, &error); - if (size == -1) { - /* ovsdb-server was possibly restarted */ - _LOGW("short write to ovsdb: %s", error->message); - priv->num_failures++; - g_clear_error(&error); - ovsdb_disconnect(self, priv->num_failures <= OVSDB_MAX_FAILURES, FALSE); - return; - } - - if (!priv->conn) - return; - - g_string_erase(priv->output, 0, size); - - ovsdb_write(self); -} - -static void -ovsdb_write(NMOvsdb *self) -{ - NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self); - GOutputStream * stream; - - if (!priv->output->len) - return; - - stream = g_io_stream_get_output_stream(G_IO_STREAM(priv->conn)); - if (g_output_stream_has_pending(stream)) - return; - - g_output_stream_write_async(stream, - priv->output->str, - priv->output->len, - G_PRIORITY_DEFAULT, - NULL, - ovsdb_write_cb, - self); + if (_output_buffer_append_json(self, str, len)) + _sock_fd_handle_write(self, FALSE); } /*****************************************************************************/ @@ -2201,7 +2109,7 @@ ovsdb_disconnect(NMOvsdb *self, gboolean retry, gboolean is_disposing) nm_assert(!retry || !is_disposing); - if (!priv->client) + if (priv->sock_fd < 0) return; _LOGD("disconnecting from ovsdb, retry %d", retry); @@ -2222,19 +2130,32 @@ ovsdb_disconnect(NMOvsdb *self, gboolean retry, gboolean is_disposing) _call_complete(call, NULL, error); } - priv->bufp = 0; - g_string_truncate(priv->input, 0); - g_string_truncate(priv->output, 0); - g_clear_object(&priv->client); - g_clear_object(&priv->conn); + nm_close(nm_steal_fd(&priv->sock_fd)); + nm_clear_g_source_inst(&priv->sock_source); + nm_clear_g_source_inst(&priv->sock_rd_timeout_source); + nm_clear_g_source_inst(&priv->sock_wr_timeout_source); + + priv->rd_buf_offset = 0; + priv->wr_buf_offset = 0; + nm_str_buf_reset(&priv->rd_buf, NULL); + nm_str_buf_reset(&priv->wr_buf, NULL); + nm_clear_g_free(&priv->db_uuid); - nm_clear_g_cancellable(&priv->cancellable); if (retry) ovsdb_try_connect(self); } static void +ovsdb_disconnect_io_error(NMOvsdb *self) +{ + NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self); + + priv->num_failures++; + ovsdb_disconnect(self, priv->num_failures <= OVSDB_MAX_FAILURES, FALSE); +} + +static void _monitor_bridges_cb(NMOvsdb *self, json_t *result, GError *error, gpointer user_data) { if (error) { @@ -2247,34 +2168,330 @@ _monitor_bridges_cb(NMOvsdb *self, json_t *result, GError *error, gpointer user_ /* Treat the first response the same as the subsequent "update" * messages we eventually get. */ - ovsdb_got_update(self, result); + ovsdb_got_msg_update(self, result); } -static void -_client_connect_cb(GObject *source_object, GAsyncResult *res, gpointer user_data) +/*****************************************************************************/ + +static gboolean +_sock_fd_timeout_rd(gpointer user_data) { - GSocketClient * client = G_SOCKET_CLIENT(source_object); - NMOvsdb * self = NM_OVSDB(user_data); - NMOvsdbPrivate * priv; - GError * error = NULL; - GSocketConnection *conn; - - conn = g_socket_client_connect_finish(client, res, &error); - if (conn == NULL) { - if (!g_error_matches(error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) - _LOGI("%s", error->message); + NMOvsdb *self = user_data; - ovsdb_disconnect(self, FALSE, FALSE); - g_clear_error(&error); - return; + _LOGW("socket: timeout receiving message from ovsdb"); + ovsdb_disconnect_io_error(self); + return G_SOURCE_CONTINUE; +} + +static gboolean +_sock_fd_timeout_wr(gpointer user_data) +{ + NMOvsdb *self = user_data; + + _LOGW("socket: timeout sending message to ovsdb"); + ovsdb_disconnect_io_error(self); + return G_SOURCE_CONTINUE; +} + +static gboolean +_sock_fd_handle_write(NMOvsdb *self, gboolean force_write) +{ + NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self); + NMStrBuf * wr_buf = &priv->wr_buf; + + nm_assert(priv->sock_fd >= 0); + nm_assert(NM_IN_SET(priv->sock_fd_state, FD_STATE_CONNECT, FD_STATE_IN, FD_STATE_INOUT)); + nm_assert(wr_buf->len > 0); + nm_assert(priv->wr_buf_offset < wr_buf->len); + + if (priv->sock_fd_state == FD_STATE_CONNECT) { + /* we are still connecting. Write is not yet ready. */ + return TRUE; } - priv = NM_OVSDB_GET_PRIVATE(self); - priv->conn = conn; - g_clear_object(&priv->cancellable); + if (priv->sock_fd_state == FD_STATE_INOUT && !force_write) { + /* we know we are polling for a write. There is no need to try, + * before the file descriptor tells us that we are ready. */ + return TRUE; + } - ovsdb_read(self); - ovsdb_next_command(self); + for (;;) { + gssize n_write; + + if (priv->wr_buf_offset == wr_buf->len) { + /* everything was sent .*/ + nm_str_buf_reset(wr_buf, NULL); + priv->wr_buf_offset = 0; + _sock_fd_set_state(self, FD_STATE_IN); + return TRUE; + } + + nm_assert(wr_buf->len > 0); + nm_assert(priv->wr_buf_offset < wr_buf->len); + + n_write = write(priv->sock_fd, + nm_str_buf_get_str_at_unsafe(wr_buf, priv->wr_buf_offset), + wr_buf->len - priv->wr_buf_offset); + + if (n_write < 0) { + int errsv = errno; + + if (errsv == EAGAIN) + goto out_eagain; + + _LOGD("socket: failure to write to socket: %s (%d)", + nm_strerror_native(errsv), + (int) errsv); + priv->num_failures++; + ovsdb_disconnect(self, priv->num_failures <= OVSDB_MAX_FAILURES, FALSE); + return FALSE; + } + + if (n_write == 0) { + /* odd. Let's treat it like EAGAIN. */ + goto out_eagain; + } + + nm_assert((gsize) n_write <= wr_buf->len - priv->wr_buf_offset); + + _LOGT("socket: %zu bytes written", (gsize) n_write); + + priv->wr_buf_offset += (gsize) n_write; + } + +out_eagain: + if (_sock_fd_set_state(self, FD_STATE_INOUT)) + _LOGT("socket: poll for fd to write"); + return TRUE; +} + +/*****************************************************************************/ + +/* Lower level marshalling and demarshalling of the JSON-RPC traffic on the + * ovsdb socket. */ + +typedef struct { + NMStrBuf *input_buffer; + gsize read_offset; +} SockReadJsonCallbackData; + +static size_t +ovsdb_sock_read_json_callback(void *buffer, size_t buflen, void *user_data) +{ + SockReadJsonCallbackData *read_data = user_data; + + if (read_data->read_offset >= read_data->input_buffer->len) { + /* No more bytes buffered for decoding. */ + return 0; + } + + /* Pass one more byte to the JSON decoder. */ + ((char *) buffer)[0] = nm_str_buf_get_char(read_data->input_buffer, read_data->read_offset++); + return 1; +} + +static gboolean +_sock_fd_handle_read(NMOvsdb *self) +{ + NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self); + NMStrBuf * rd_buf = &priv->rd_buf; + const gsize MIN_BUF_SIZE = NM_UTILS_GET_NEXT_REALLOC_SIZE_1000; + + nm_assert(priv->sock_fd >= 0); + + for (;;) { + gssize n_read; + + nm_assert(rd_buf->len <= rd_buf->allocated); + nm_assert(priv->rd_buf_offset <= rd_buf->len); + + if (rd_buf->allocated - rd_buf->len < MIN_BUF_SIZE) { + if (priv->rd_buf_offset > 0) { + /* before we reallocate the buffer, erase the parts + * we already parsed. */ + nm_str_buf_erase(rd_buf, 0, priv->rd_buf_offset, FALSE); + priv->rd_buf_offset = 0; + } + nm_str_buf_maybe_expand(rd_buf, MIN_BUF_SIZE, FALSE); + } + + n_read = read(priv->sock_fd, + nm_str_buf_get_str_at_unsafe(rd_buf, rd_buf->len), + rd_buf->allocated - rd_buf->len); + + if (n_read < 0) { + const int errsv = errno; + + if (errsv == EAGAIN) { + /* no more data to read. If we have an incomplete message in the + * receive buffer, we expect a completion within a certain timeout. */ + if (!priv->sock_rd_timeout_source) { + priv->sock_rd_timeout_source = nm_g_timeout_source_new(SOCK_RD_TIMEOUT_MSEC, + G_PRIORITY_DEFAULT, + _sock_fd_timeout_rd, + self, + NULL); + g_source_attach(priv->sock_rd_timeout_source, NULL); + } + return TRUE; + } + + _LOGD("socket: failure to read from socket: %s (%d)", nm_strerror_native(errsv), errsv); + goto out_disconnect; + } + + if (n_read == 0) { + _LOGD("socket: end of file reading from socket"); + goto out_disconnect; + } + + nm_clear_g_source_inst(&priv->sock_rd_timeout_source); + + nm_assert((gsize) n_read <= rd_buf->allocated - rd_buf->len); + _LOGT("socket: %zu bytes read", (gsize) n_read); + nm_str_buf_set_size(rd_buf, rd_buf->len + (gsize) n_read, TRUE, TRUE); + + for (;;) { + nm_auto_decref_json json_t *msg = NULL; + json_error_t json_error = { + 0, + }; + SockReadJsonCallbackData read_data = { + .input_buffer = rd_buf, + .read_offset = priv->rd_buf_offset, + }; + + /* The callback always eats up only up to a single byte. This makes + * it possible for us to identify complete JSON objects in spite of + * us not knowing the length in advance. */ + msg = json_load_callback(ovsdb_sock_read_json_callback, + &read_data, + JSON_DISABLE_EOF_CHECK, + &json_error); + if (!msg) + break; + + nm_assert(read_data.read_offset > priv->rd_buf_offset); + nm_assert(read_data.read_offset <= rd_buf->len); + priv->rd_buf_offset = read_data.read_offset; + + ovsdb_got_msg(self, msg); + } + + if (rd_buf->len - priv->rd_buf_offset > 4u * 1024u * 1024u) { + /* We are about to receive a huge message that we are unable to parse. + * Something is wrong. */ + _LOGE("socket: received a huge message (%zu bytes) from ovsdb without a valid JSON. " + "Close " + "connection", + rd_buf->len); + goto out_disconnect; + } + + if (priv->sock_fd < 0) + return TRUE; + } + + return TRUE; + +out_disconnect: + priv->num_failures++; + ovsdb_disconnect(self, priv->num_failures <= OVSDB_MAX_FAILURES, FALSE); + return FALSE; +} + +/*****************************************************************************/ + +static gboolean +_sock_fd_io_ready(int fd, GIOCondition condition, gpointer user_data) +{ + NMOvsdb * self = user_data; + NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self); + + if (priv->sock_fd_state == FD_STATE_CONNECT) { + int errsv; + socklen_t result_len = sizeof(errsv); + + if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &errsv, &result_len) < 0) + errsv = EIO; + + if (errsv != 0) { + _LOGD("socket: failure connecting unix socket to %s: %s (%d)", + OPENVSWITCH_DB_SOCK_PATH, + nm_strerror_native(errsv), + errsv); + ovsdb_disconnect_io_error(self); + return G_SOURCE_CONTINUE; + } + + _LOGT("socket: connected"); + _sock_fd_set_state(self, FD_STATE_IN); + + /* we have a message queued. Try to write it. */ + _sock_fd_handle_write(self, TRUE); + return G_SOURCE_CONTINUE; + } + + if (NM_FLAGS_HAS(condition, G_IO_OUT)) { + _LOGT("socket: socket is ready to write"); + _sock_fd_handle_write(self, TRUE); + return G_SOURCE_CONTINUE; + } + + if (NM_FLAGS_HAS(condition, G_IO_IN)) { + _LOGT("socket: socket is ready to read"); + _sock_fd_handle_read(self); + return G_SOURCE_CONTINUE; + } + + _LOGD("socket: unexpected IO error on socket"); + ovsdb_disconnect_io_error(self); + return G_SOURCE_CONTINUE; +} + +static gboolean +_sock_fd_set_state(NMOvsdb *self, FdState fd_state) +{ + NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self); + GIOCondition io_condition; + + nm_assert(NM_IN_SET(fd_state, FD_STATE_NONE, FD_STATE_CONNECT, FD_STATE_IN, FD_STATE_INOUT)); + + nm_assert(fd_state != FD_STATE_INOUT + || NM_IN_SET(priv->sock_fd_state, FD_STATE_IN, FD_STATE_INOUT)); + + if (fd_state == FD_STATE_NONE) { + if (priv->sock_fd_state == FD_STATE_NONE) { + nm_assert(!priv->sock_source); + return FALSE; + } + priv->sock_fd_state = FD_STATE_NONE; + nm_clear_g_source_inst(&priv->sock_source); + return TRUE; + } + + if (priv->sock_source && priv->sock_fd_state == fd_state) + return FALSE; + + if (fd_state == FD_STATE_CONNECT) + io_condition = G_IO_OUT | G_IO_ERR | G_IO_HUP; + else if (fd_state == FD_STATE_IN) + io_condition = G_IO_IN | G_IO_ERR | G_IO_HUP; + else { + nm_assert(fd_state == FD_STATE_IN); + io_condition = G_IO_IN | G_IO_OUT | G_IO_ERR | G_IO_HUP; + } + + nm_clear_g_source_inst(&priv->sock_source); + priv->sock_fd_state = fd_state; + priv->sock_source = nm_g_unix_fd_source_new(priv->sock_fd, + io_condition, + G_PRIORITY_DEFAULT, + _sock_fd_io_ready, + self, + NULL); + g_source_attach(priv->sock_source, NULL); + return TRUE; } /** @@ -2287,23 +2504,51 @@ _client_connect_cb(GObject *source_object, GAsyncResult *res, gpointer user_data static void ovsdb_try_connect(NMOvsdb *self) { - NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self); - GSocketAddress *addr; + NMOvsdbPrivate * priv = NM_OVSDB_GET_PRIVATE(self); + struct sockaddr_un sockname; + int errsv; + int r; - if (priv->client) + if (priv->sock_fd_state > FD_STATE_NONE) { + /* already connecting or connected. Good. */ return; + } - /* TODO: This should probably be made configurable via NetworkManager.conf */ - addr = g_unix_socket_address_new(RUNSTATEDIR "/openvswitch/db.sock"); + nm_assert(priv->sock_fd_state == FD_STATE_NONE); + nm_assert(priv->sock_fd < 0); + nm_assert(!priv->sock_source); - priv->client = g_socket_client_new(); - priv->cancellable = g_cancellable_new(); - g_socket_client_connect_async(priv->client, - G_SOCKET_CONNECTABLE(addr), - priv->cancellable, - _client_connect_cb, - self); - g_object_unref(addr); + priv->sock_fd = socket(PF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0); + if (priv->sock_fd < 0) { + errsv = errno; + _LOGE("socket: failure to create socket: %s (%d)", nm_strerror_native(errsv), errsv); + return; + } + + G_STATIC_ASSERT_EXPR(NM_STRLEN(OPENVSWITCH_DB_SOCK_PATH) < sizeof(sockname.sun_path)); + memset(&sockname, 0, sizeof(sockname)); + sockname.sun_family = AF_UNIX; + memcpy(sockname.sun_path, OPENVSWITCH_DB_SOCK_PATH, NM_STRLEN(OPENVSWITCH_DB_SOCK_PATH)); + + r = connect(priv->sock_fd, (struct sockaddr *) &sockname, sizeof(sockname)); + if (r < 0) { + errsv = errno; + if (errsv != EINPROGRESS) { + nm_close(nm_steal_fd(&priv->sock_fd)); + _LOGD("socket: failure to connect unix socket to %s: %s (%d)", + OPENVSWITCH_DB_SOCK_PATH, + nm_strerror_native(errsv), + errsv); + return; + } + /* we treat async connect as if the file descriptor would be good, but + * is simply not ready to read/write. */ + _LOGT("socket: connecting to %s (in progress)", OPENVSWITCH_DB_SOCK_PATH); + _sock_fd_set_state(self, FD_STATE_CONNECT); + } else { + _LOGT("socket: connected to %s", OPENVSWITCH_DB_SOCK_PATH); + _sock_fd_set_state(self, FD_STATE_IN); + } /* Queue a monitor call before any other command, ensuring that we have an up * to date view of existing bridged that we need for add and remove ops. */ @@ -2457,8 +2702,11 @@ nm_ovsdb_init(NMOvsdb *self) c_list_init(&priv->calls_lst_head); - priv->input = g_string_new(NULL); - priv->output = g_string_new(NULL); + priv->sock_fd = -1; + priv->sock_fd_state = FD_STATE_NONE; + nm_str_buf_init(&priv->rd_buf, 0, FALSE); + nm_str_buf_init(&priv->wr_buf, 0, FALSE); + priv->bridges = g_hash_table_new_full(nm_pstr_hash, nm_pstr_equal, (GDestroyNotify) _free_bridge, NULL); priv->ports = @@ -2479,15 +2727,6 @@ dispose(GObject *object) nm_assert(c_list_is_empty(&priv->calls_lst_head)); - if (priv->input) { - g_string_free(priv->input, TRUE); - priv->input = NULL; - } - if (priv->output) { - g_string_free(priv->output, TRUE); - priv->output = NULL; - } - nm_clear_pointer(&priv->bridges, g_hash_table_destroy); nm_clear_pointer(&priv->ports, g_hash_table_destroy); nm_clear_pointer(&priv->interfaces, g_hash_table_destroy); @@ -2496,11 +2735,24 @@ dispose(GObject *object) } static void +finalize(GObject *object) +{ + NMOvsdb * self = NM_OVSDB(object); + NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self); + + nm_str_buf_destroy(&priv->rd_buf); + nm_str_buf_destroy(&priv->wr_buf); + + G_OBJECT_CLASS(nm_ovsdb_parent_class)->finalize(object); +} + +static void nm_ovsdb_class_init(NMOvsdbClass *klass) { GObjectClass *object_class = G_OBJECT_CLASS(klass); - object_class->dispose = dispose; + object_class->dispose = dispose; + object_class->finalize = finalize; signals[DEVICE_ADDED] = g_signal_new(NM_OVSDB_DEVICE_ADDED, G_OBJECT_CLASS_TYPE(object_class), |