summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThomas Haller <thaller@redhat.com>2020-11-05 10:56:37 +0100
committerThomas Haller <thaller@redhat.com>2020-11-14 13:17:07 +0100
commita32045b27812c98e3794177957bc2d9533f8e4a5 (patch)
treed29433961c444de1c6262f6ac2632046093a00a6
parent7291f4667797c141ff7ac3d4cda93cf9ab193c55 (diff)
downloadNetworkManager-th/ovs-external-ids-3.tar.gz
core/ovs: rework IO in "nm-ovsdb.c"th/ovs-external-ids-3
-rw-r--r--src/devices/ovs/nm-ovsdb.c742
1 files changed, 497 insertions, 245 deletions
diff --git a/src/devices/ovs/nm-ovsdb.c b/src/devices/ovs/nm-ovsdb.c
index 61875bdcd0..d4708e9143 100644
--- a/src/devices/ovs/nm-ovsdb.c
+++ b/src/devices/ovs/nm-ovsdb.c
@@ -7,25 +7,41 @@
#include "nm-ovsdb.h"
-#include <gmodule.h>
-#include <gio/gunixsocketaddress.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include "devices/nm-device.h"
+#include "nm-core-internal.h"
+#include "nm-core-utils.h"
+#include "nm-glib-aux/nm-io-utils.h"
#include "nm-glib-aux/nm-jansson.h"
#include "nm-glib-aux/nm-str-buf.h"
-#include "nm-core-utils.h"
-#include "nm-core-internal.h"
-#include "devices/nm-device.h"
#include "nm-setting-ovs-external-ids.h"
+#if JANSSON_VERSION_HEX < 0x020400
+ #warning "requires at least libjansson 2.4"
+#endif
+
/*****************************************************************************/
+#define SOCK_RD_TIMEOUT_MSEC 5000
+#define SOCK_WR_TIMEOUT_MSEC 5000
+
#define OVSDB_MAX_FAILURES 3
+#define OPENVSWITCH_DB_SOCK_PATH RUNSTATEDIR "/openvswitch/db.sock"
+
/*****************************************************************************/
-#if JANSSON_VERSION_HEX < 0x020400
- #warning "requires at least libjansson 2.4"
-#endif
+typedef enum _nm_packed {
+ FD_STATE_NONE,
+ FD_STATE_CONNECT,
+ FD_STATE_IN,
+ FD_STATE_INOUT,
+} FdState;
typedef struct {
char * port_uuid;
@@ -111,14 +127,14 @@ enum { DEVICE_ADDED, DEVICE_REMOVED, INTERFACE_FAILED, LAST_SIGNAL };
static guint signals[LAST_SIGNAL] = {0};
typedef struct {
- GSocketClient * client;
- GSocketConnection *conn;
- GCancellable * cancellable;
- char buf[4096]; /* Input buffer */
- size_t bufp; /* Last decoded byte in the input buffer. */
- GString * input; /* JSON stream waiting for decoding. */
- GString * output; /* JSON stream to be sent. */
- guint64 call_id_counter;
+ GSource *sock_source;
+ GSource *sock_wr_timeout_source;
+ GSource *sock_rd_timeout_source;
+ NMStrBuf rd_buf;
+ NMStrBuf wr_buf;
+ gsize rd_buf_offset;
+ gsize wr_buf_offset;
+ guint64 call_id_counter;
CList calls_lst_head;
@@ -126,7 +142,9 @@ typedef struct {
GHashTable *ports; /* port uuid => OpenvswitchPort */
GHashTable *bridges; /* bridge uuid => OpenvswitchBridge */
char * db_uuid;
+ int sock_fd;
guint num_failures;
+ FdState sock_fd_state;
} NMOvsdbPrivate;
struct _NMOvsdb {
@@ -146,11 +164,13 @@ NM_DEFINE_SINGLETON_GETTER(NMOvsdb, nm_ovsdb_get, NM_TYPE_OVSDB);
/*****************************************************************************/
-static void ovsdb_try_connect(NMOvsdb *self);
-static void ovsdb_disconnect(NMOvsdb *self, gboolean retry, gboolean is_disposing);
-static void ovsdb_read(NMOvsdb *self);
-static void ovsdb_write(NMOvsdb *self);
-static void ovsdb_next_command(NMOvsdb *self);
+static void ovsdb_try_connect(NMOvsdb *self);
+static void ovsdb_disconnect(NMOvsdb *self, gboolean retry, gboolean is_disposing);
+static void ovsdb_disconnect_io_error(NMOvsdb *self);
+static gboolean _sock_fd_handle_write(NMOvsdb *self, gboolean write_is_ready);
+static void ovsdb_send_msg(NMOvsdb *self, const char *str, gssize len);
+static void ovsdb_next_command(NMOvsdb *self);
+static gboolean _sock_fd_set_state(NMOvsdb *self, FdState fd_state);
/*****************************************************************************/
@@ -349,6 +369,37 @@ _signal_emit_interface_failed(NMOvsdb * self,
/*****************************************************************************/
+static gboolean
+_output_buffer_append_json(NMOvsdb *self, const char *str, gssize len)
+{
+ NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self);
+ NMStrBuf * wr_buf = &priv->wr_buf;
+ gboolean was_empty;
+ gsize len_u;
+
+ if (len < 0)
+ len_u = strlen(str);
+ else
+ len_u = len;
+
+ if ((wr_buf->allocated - wr_buf->len) < len_u) {
+ /* It seems we need to reallocate the buffer. But first
+ * try to avoid that by erasing the part we already
+ * sent. */
+ if (priv->wr_buf_offset > 0) {
+ nm_str_buf_erase(wr_buf, 0, priv->wr_buf_offset, FALSE);
+ priv->wr_buf_offset = 0;
+ }
+ }
+
+ was_empty = ((wr_buf->len - priv->wr_buf_offset) == 0);
+
+ nm_str_buf_append_len(wr_buf, str, len_u);
+ return was_empty;
+}
+
+/*****************************************************************************/
+
/**
* ovsdb_call_method:
*
@@ -1248,10 +1299,10 @@ ovsdb_next_command(NMOvsdb *self)
{
NMOvsdbPrivate * priv = NM_OVSDB_GET_PRIVATE(self);
OvsdbMethodCall * call;
- char * cmd;
- nm_auto_decref_json json_t *msg = NULL;
+ nm_auto_decref_json json_t *msg = NULL;
+ gs_free char * msg_as_str = NULL;
- if (!priv->conn)
+ if (priv->sock_fd < 0)
return;
if (c_list_is_empty(&priv->calls_lst_head))
@@ -1369,12 +1420,9 @@ ovsdb_next_command(NMOvsdb *self)
g_return_if_fail(msg);
- cmd = json_dumps(msg, 0);
- _LOGT_call(call, "send: call-id=%" G_GUINT64_FORMAT ", %s", call->call_id, cmd);
- g_string_append(priv->output, cmd);
- free(cmd);
-
- ovsdb_write(self);
+ msg_as_str = json_dumps(msg, 0);
+ _LOGT_call(call, "send: call-id=%" G_GUINT64_FORMAT ", %s", call->call_id, msg_as_str);
+ ovsdb_send_msg(self, msg_as_str, -1);
}
/**
@@ -1523,14 +1571,14 @@ _external_ids_to_string(const GArray *arr)
/*****************************************************************************/
/**
- * ovsdb_got_update:
+ * ovsdb_got_msg_update:
*
* Called when we've got an "update" method call (we asked for it with the monitor
* command). We use it to maintain a consistent view of bridge list regardless of
* whether the changes are done by us or externally.
*/
static void
-ovsdb_got_update(NMOvsdb *self, json_t *msg)
+ovsdb_got_msg_update(NMOvsdb *self, json_t *msg)
{
NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self);
json_t * ovs = NULL;
@@ -1907,28 +1955,20 @@ ovsdb_got_update(NMOvsdb *self, json_t *msg)
}
/**
- * ovsdb_got_echo:
+ * ovsdb_got_msg_echo:
*
* Only implemented because the specification mandates it. Actual ovsdb hasn't been
* seen doing this.
*/
static void
-ovsdb_got_echo(NMOvsdb *self, json_int_t id, json_t *data)
+ovsdb_got_msg_echo(NMOvsdb *self, json_int_t id, json_t *data)
{
- NMOvsdbPrivate * priv = NM_OVSDB_GET_PRIVATE(self);
- nm_auto_decref_json json_t *msg = NULL;
- char * reply;
- gboolean output_was_empty;
-
- output_was_empty = priv->output->len == 0;
+ nm_auto_decref_json json_t *msg = NULL;
+ gs_free char * msg_as_str = NULL;
- msg = json_pack("{s:I, s:O}", "id", id, "result", data);
- reply = json_dumps(msg, 0);
- g_string_append(priv->output, reply);
- free(reply);
-
- if (output_was_empty)
- ovsdb_write(self);
+ msg = json_pack("{s:I, s:O}", "id", id, "result", data);
+ msg_as_str = json_dumps(msg, 0);
+ ovsdb_send_msg(self, msg_as_str, -1);
}
/**
@@ -1982,15 +2022,16 @@ ovsdb_got_msg(NMOvsdb *self, json_t *msg)
return;
}
- if (nm_streq0(method, "update")) {
- /* This is a update method call. */
- ovsdb_got_update(self, json_array_get(params, 1));
- } else if (nm_streq0(method, "echo")) {
- /* This is an echo request. */
- ovsdb_got_echo(self, id, params);
- } else {
- _LOGW("got an unknown method call: '%s'", method);
+ if (nm_streq(method, "update")) {
+ ovsdb_got_msg_update(self, json_array_get(params, 1));
+ return;
+ }
+ if (nm_streq(method, "echo")) {
+ ovsdb_got_msg_echo(self, id, params);
+ return;
}
+
+ _LOGD("got an unknown method call: '%s'", method);
return;
}
@@ -2030,155 +2071,22 @@ ovsdb_got_msg(NMOvsdb *self, json_t *msg)
priv->num_failures = 0;
- /* Don't progress further commands in case the callback hit an error
- * and disconnected us. */
- if (!priv->conn)
- return;
-
/* Now we're free to serialize and send the next command, if any. */
ovsdb_next_command(self);
-
return;
}
/* This is a message we are not interested in. */
- _LOGW("got an unknown message, ignoring");
+ _LOGD("got an unknown message, ignoring");
}
/*****************************************************************************/
-/* Lower level marshalling and demarshalling of the JSON-RPC traffic on the
- * ovsdb socket. */
-
-static size_t
-_json_callback(void *buffer, size_t buflen, void *user_data)
-{
- NMOvsdb * self = NM_OVSDB(user_data);
- NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self);
-
- if (priv->bufp == priv->input->len) {
- /* No more bytes buffered for decoding. */
- return 0;
- }
-
- /* Pass one more byte to the JSON decoder. */
- *(char *) buffer = priv->input->str[priv->bufp];
- priv->bufp++;
-
- return (size_t) 1;
-}
-
-/**
- * ovsdb_read_cb:
- *
- * Read out the data available from the ovsdb socket and try to deserialize
- * the JSON. If we see a complete object, pass it upwards to ovsdb_got_msg().
- */
static void
-ovsdb_read_cb(GObject *source_object, GAsyncResult *res, gpointer user_data)
+ovsdb_send_msg(NMOvsdb *self, const char *str, gssize len)
{
- NMOvsdb * self = NM_OVSDB(user_data);
- NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self);
- GInputStream * stream = G_INPUT_STREAM(source_object);
- GError * error = NULL;
- gssize size;
- json_t * msg;
- json_error_t json_error = {
- 0,
- };
-
- size = g_input_stream_read_finish(stream, res, &error);
- if (size == -1) {
- /* ovsdb-server was possibly restarted */
- _LOGW("short read from ovsdb: %s", error->message);
- priv->num_failures++;
- g_clear_error(&error);
- ovsdb_disconnect(self, priv->num_failures <= OVSDB_MAX_FAILURES, FALSE);
- return;
- }
-
- g_string_append_len(priv->input, priv->buf, size);
- do {
- priv->bufp = 0;
- /* The callback always eats up only up to a single byte. This makes
- * it possible for us to identify complete JSON objects in spite of
- * us not knowing the length in advance. */
- msg = json_load_callback(_json_callback, self, JSON_DISABLE_EOF_CHECK, &json_error);
- if (msg) {
- ovsdb_got_msg(self, msg);
- g_string_erase(priv->input, 0, priv->bufp);
- }
- json_decref(msg);
- } while (msg);
-
- if (!priv->conn)
- return;
-
- if (size)
- ovsdb_read(self);
-}
-
-static void
-ovsdb_read(NMOvsdb *self)
-{
- NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self);
-
- g_input_stream_read_async(g_io_stream_get_input_stream(G_IO_STREAM(priv->conn)),
- priv->buf,
- sizeof(priv->buf),
- G_PRIORITY_DEFAULT,
- NULL,
- ovsdb_read_cb,
- self);
-}
-
-static void
-ovsdb_write_cb(GObject *source_object, GAsyncResult *res, gpointer user_data)
-{
- GOutputStream * stream = G_OUTPUT_STREAM(source_object);
- NMOvsdb * self = NM_OVSDB(user_data);
- NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self);
- GError * error = NULL;
- gssize size;
-
- size = g_output_stream_write_finish(stream, res, &error);
- if (size == -1) {
- /* ovsdb-server was possibly restarted */
- _LOGW("short write to ovsdb: %s", error->message);
- priv->num_failures++;
- g_clear_error(&error);
- ovsdb_disconnect(self, priv->num_failures <= OVSDB_MAX_FAILURES, FALSE);
- return;
- }
-
- if (!priv->conn)
- return;
-
- g_string_erase(priv->output, 0, size);
-
- ovsdb_write(self);
-}
-
-static void
-ovsdb_write(NMOvsdb *self)
-{
- NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self);
- GOutputStream * stream;
-
- if (!priv->output->len)
- return;
-
- stream = g_io_stream_get_output_stream(G_IO_STREAM(priv->conn));
- if (g_output_stream_has_pending(stream))
- return;
-
- g_output_stream_write_async(stream,
- priv->output->str,
- priv->output->len,
- G_PRIORITY_DEFAULT,
- NULL,
- ovsdb_write_cb,
- self);
+ if (_output_buffer_append_json(self, str, len))
+ _sock_fd_handle_write(self, FALSE);
}
/*****************************************************************************/
@@ -2201,7 +2109,7 @@ ovsdb_disconnect(NMOvsdb *self, gboolean retry, gboolean is_disposing)
nm_assert(!retry || !is_disposing);
- if (!priv->client)
+ if (priv->sock_fd < 0)
return;
_LOGD("disconnecting from ovsdb, retry %d", retry);
@@ -2222,19 +2130,32 @@ ovsdb_disconnect(NMOvsdb *self, gboolean retry, gboolean is_disposing)
_call_complete(call, NULL, error);
}
- priv->bufp = 0;
- g_string_truncate(priv->input, 0);
- g_string_truncate(priv->output, 0);
- g_clear_object(&priv->client);
- g_clear_object(&priv->conn);
+ nm_close(nm_steal_fd(&priv->sock_fd));
+ nm_clear_g_source_inst(&priv->sock_source);
+ nm_clear_g_source_inst(&priv->sock_rd_timeout_source);
+ nm_clear_g_source_inst(&priv->sock_wr_timeout_source);
+
+ priv->rd_buf_offset = 0;
+ priv->wr_buf_offset = 0;
+ nm_str_buf_reset(&priv->rd_buf, NULL);
+ nm_str_buf_reset(&priv->wr_buf, NULL);
+
nm_clear_g_free(&priv->db_uuid);
- nm_clear_g_cancellable(&priv->cancellable);
if (retry)
ovsdb_try_connect(self);
}
static void
+ovsdb_disconnect_io_error(NMOvsdb *self)
+{
+ NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self);
+
+ priv->num_failures++;
+ ovsdb_disconnect(self, priv->num_failures <= OVSDB_MAX_FAILURES, FALSE);
+}
+
+static void
_monitor_bridges_cb(NMOvsdb *self, json_t *result, GError *error, gpointer user_data)
{
if (error) {
@@ -2247,34 +2168,330 @@ _monitor_bridges_cb(NMOvsdb *self, json_t *result, GError *error, gpointer user_
/* Treat the first response the same as the subsequent "update"
* messages we eventually get. */
- ovsdb_got_update(self, result);
+ ovsdb_got_msg_update(self, result);
}
-static void
-_client_connect_cb(GObject *source_object, GAsyncResult *res, gpointer user_data)
+/*****************************************************************************/
+
+static gboolean
+_sock_fd_timeout_rd(gpointer user_data)
{
- GSocketClient * client = G_SOCKET_CLIENT(source_object);
- NMOvsdb * self = NM_OVSDB(user_data);
- NMOvsdbPrivate * priv;
- GError * error = NULL;
- GSocketConnection *conn;
-
- conn = g_socket_client_connect_finish(client, res, &error);
- if (conn == NULL) {
- if (!g_error_matches(error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
- _LOGI("%s", error->message);
+ NMOvsdb *self = user_data;
- ovsdb_disconnect(self, FALSE, FALSE);
- g_clear_error(&error);
- return;
+ _LOGW("socket: timeout receiving message from ovsdb");
+ ovsdb_disconnect_io_error(self);
+ return G_SOURCE_CONTINUE;
+}
+
+static gboolean
+_sock_fd_timeout_wr(gpointer user_data)
+{
+ NMOvsdb *self = user_data;
+
+ _LOGW("socket: timeout sending message to ovsdb");
+ ovsdb_disconnect_io_error(self);
+ return G_SOURCE_CONTINUE;
+}
+
+static gboolean
+_sock_fd_handle_write(NMOvsdb *self, gboolean force_write)
+{
+ NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self);
+ NMStrBuf * wr_buf = &priv->wr_buf;
+
+ nm_assert(priv->sock_fd >= 0);
+ nm_assert(NM_IN_SET(priv->sock_fd_state, FD_STATE_CONNECT, FD_STATE_IN, FD_STATE_INOUT));
+ nm_assert(wr_buf->len > 0);
+ nm_assert(priv->wr_buf_offset < wr_buf->len);
+
+ if (priv->sock_fd_state == FD_STATE_CONNECT) {
+ /* we are still connecting. Write is not yet ready. */
+ return TRUE;
}
- priv = NM_OVSDB_GET_PRIVATE(self);
- priv->conn = conn;
- g_clear_object(&priv->cancellable);
+ if (priv->sock_fd_state == FD_STATE_INOUT && !force_write) {
+ /* we know we are polling for a write. There is no need to try,
+ * before the file descriptor tells us that we are ready. */
+ return TRUE;
+ }
- ovsdb_read(self);
- ovsdb_next_command(self);
+ for (;;) {
+ gssize n_write;
+
+ if (priv->wr_buf_offset == wr_buf->len) {
+ /* everything was sent .*/
+ nm_str_buf_reset(wr_buf, NULL);
+ priv->wr_buf_offset = 0;
+ _sock_fd_set_state(self, FD_STATE_IN);
+ return TRUE;
+ }
+
+ nm_assert(wr_buf->len > 0);
+ nm_assert(priv->wr_buf_offset < wr_buf->len);
+
+ n_write = write(priv->sock_fd,
+ nm_str_buf_get_str_at_unsafe(wr_buf, priv->wr_buf_offset),
+ wr_buf->len - priv->wr_buf_offset);
+
+ if (n_write < 0) {
+ int errsv = errno;
+
+ if (errsv == EAGAIN)
+ goto out_eagain;
+
+ _LOGD("socket: failure to write to socket: %s (%d)",
+ nm_strerror_native(errsv),
+ (int) errsv);
+ priv->num_failures++;
+ ovsdb_disconnect(self, priv->num_failures <= OVSDB_MAX_FAILURES, FALSE);
+ return FALSE;
+ }
+
+ if (n_write == 0) {
+ /* odd. Let's treat it like EAGAIN. */
+ goto out_eagain;
+ }
+
+ nm_assert((gsize) n_write <= wr_buf->len - priv->wr_buf_offset);
+
+ _LOGT("socket: %zu bytes written", (gsize) n_write);
+
+ priv->wr_buf_offset += (gsize) n_write;
+ }
+
+out_eagain:
+ if (_sock_fd_set_state(self, FD_STATE_INOUT))
+ _LOGT("socket: poll for fd to write");
+ return TRUE;
+}
+
+/*****************************************************************************/
+
+/* Lower level marshalling and demarshalling of the JSON-RPC traffic on the
+ * ovsdb socket. */
+
+typedef struct {
+ NMStrBuf *input_buffer;
+ gsize read_offset;
+} SockReadJsonCallbackData;
+
+static size_t
+ovsdb_sock_read_json_callback(void *buffer, size_t buflen, void *user_data)
+{
+ SockReadJsonCallbackData *read_data = user_data;
+
+ if (read_data->read_offset >= read_data->input_buffer->len) {
+ /* No more bytes buffered for decoding. */
+ return 0;
+ }
+
+ /* Pass one more byte to the JSON decoder. */
+ ((char *) buffer)[0] = nm_str_buf_get_char(read_data->input_buffer, read_data->read_offset++);
+ return 1;
+}
+
+static gboolean
+_sock_fd_handle_read(NMOvsdb *self)
+{
+ NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self);
+ NMStrBuf * rd_buf = &priv->rd_buf;
+ const gsize MIN_BUF_SIZE = NM_UTILS_GET_NEXT_REALLOC_SIZE_1000;
+
+ nm_assert(priv->sock_fd >= 0);
+
+ for (;;) {
+ gssize n_read;
+
+ nm_assert(rd_buf->len <= rd_buf->allocated);
+ nm_assert(priv->rd_buf_offset <= rd_buf->len);
+
+ if (rd_buf->allocated - rd_buf->len < MIN_BUF_SIZE) {
+ if (priv->rd_buf_offset > 0) {
+ /* before we reallocate the buffer, erase the parts
+ * we already parsed. */
+ nm_str_buf_erase(rd_buf, 0, priv->rd_buf_offset, FALSE);
+ priv->rd_buf_offset = 0;
+ }
+ nm_str_buf_maybe_expand(rd_buf, MIN_BUF_SIZE, FALSE);
+ }
+
+ n_read = read(priv->sock_fd,
+ nm_str_buf_get_str_at_unsafe(rd_buf, rd_buf->len),
+ rd_buf->allocated - rd_buf->len);
+
+ if (n_read < 0) {
+ const int errsv = errno;
+
+ if (errsv == EAGAIN) {
+ /* no more data to read. If we have an incomplete message in the
+ * receive buffer, we expect a completion within a certain timeout. */
+ if (!priv->sock_rd_timeout_source) {
+ priv->sock_rd_timeout_source = nm_g_timeout_source_new(SOCK_RD_TIMEOUT_MSEC,
+ G_PRIORITY_DEFAULT,
+ _sock_fd_timeout_rd,
+ self,
+ NULL);
+ g_source_attach(priv->sock_rd_timeout_source, NULL);
+ }
+ return TRUE;
+ }
+
+ _LOGD("socket: failure to read from socket: %s (%d)", nm_strerror_native(errsv), errsv);
+ goto out_disconnect;
+ }
+
+ if (n_read == 0) {
+ _LOGD("socket: end of file reading from socket");
+ goto out_disconnect;
+ }
+
+ nm_clear_g_source_inst(&priv->sock_rd_timeout_source);
+
+ nm_assert((gsize) n_read <= rd_buf->allocated - rd_buf->len);
+ _LOGT("socket: %zu bytes read", (gsize) n_read);
+ nm_str_buf_set_size(rd_buf, rd_buf->len + (gsize) n_read, TRUE, TRUE);
+
+ for (;;) {
+ nm_auto_decref_json json_t *msg = NULL;
+ json_error_t json_error = {
+ 0,
+ };
+ SockReadJsonCallbackData read_data = {
+ .input_buffer = rd_buf,
+ .read_offset = priv->rd_buf_offset,
+ };
+
+ /* The callback always eats up only up to a single byte. This makes
+ * it possible for us to identify complete JSON objects in spite of
+ * us not knowing the length in advance. */
+ msg = json_load_callback(ovsdb_sock_read_json_callback,
+ &read_data,
+ JSON_DISABLE_EOF_CHECK,
+ &json_error);
+ if (!msg)
+ break;
+
+ nm_assert(read_data.read_offset > priv->rd_buf_offset);
+ nm_assert(read_data.read_offset <= rd_buf->len);
+ priv->rd_buf_offset = read_data.read_offset;
+
+ ovsdb_got_msg(self, msg);
+ }
+
+ if (rd_buf->len - priv->rd_buf_offset > 4u * 1024u * 1024u) {
+ /* We are about to receive a huge message that we are unable to parse.
+ * Something is wrong. */
+ _LOGE("socket: received a huge message (%zu bytes) from ovsdb without a valid JSON. "
+ "Close "
+ "connection",
+ rd_buf->len);
+ goto out_disconnect;
+ }
+
+ if (priv->sock_fd < 0)
+ return TRUE;
+ }
+
+ return TRUE;
+
+out_disconnect:
+ priv->num_failures++;
+ ovsdb_disconnect(self, priv->num_failures <= OVSDB_MAX_FAILURES, FALSE);
+ return FALSE;
+}
+
+/*****************************************************************************/
+
+static gboolean
+_sock_fd_io_ready(int fd, GIOCondition condition, gpointer user_data)
+{
+ NMOvsdb * self = user_data;
+ NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self);
+
+ if (priv->sock_fd_state == FD_STATE_CONNECT) {
+ int errsv;
+ socklen_t result_len = sizeof(errsv);
+
+ if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &errsv, &result_len) < 0)
+ errsv = EIO;
+
+ if (errsv != 0) {
+ _LOGD("socket: failure connecting unix socket to %s: %s (%d)",
+ OPENVSWITCH_DB_SOCK_PATH,
+ nm_strerror_native(errsv),
+ errsv);
+ ovsdb_disconnect_io_error(self);
+ return G_SOURCE_CONTINUE;
+ }
+
+ _LOGT("socket: connected");
+ _sock_fd_set_state(self, FD_STATE_IN);
+
+ /* we have a message queued. Try to write it. */
+ _sock_fd_handle_write(self, TRUE);
+ return G_SOURCE_CONTINUE;
+ }
+
+ if (NM_FLAGS_HAS(condition, G_IO_OUT)) {
+ _LOGT("socket: socket is ready to write");
+ _sock_fd_handle_write(self, TRUE);
+ return G_SOURCE_CONTINUE;
+ }
+
+ if (NM_FLAGS_HAS(condition, G_IO_IN)) {
+ _LOGT("socket: socket is ready to read");
+ _sock_fd_handle_read(self);
+ return G_SOURCE_CONTINUE;
+ }
+
+ _LOGD("socket: unexpected IO error on socket");
+ ovsdb_disconnect_io_error(self);
+ return G_SOURCE_CONTINUE;
+}
+
+static gboolean
+_sock_fd_set_state(NMOvsdb *self, FdState fd_state)
+{
+ NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self);
+ GIOCondition io_condition;
+
+ nm_assert(NM_IN_SET(fd_state, FD_STATE_NONE, FD_STATE_CONNECT, FD_STATE_IN, FD_STATE_INOUT));
+
+ nm_assert(fd_state != FD_STATE_INOUT
+ || NM_IN_SET(priv->sock_fd_state, FD_STATE_IN, FD_STATE_INOUT));
+
+ if (fd_state == FD_STATE_NONE) {
+ if (priv->sock_fd_state == FD_STATE_NONE) {
+ nm_assert(!priv->sock_source);
+ return FALSE;
+ }
+ priv->sock_fd_state = FD_STATE_NONE;
+ nm_clear_g_source_inst(&priv->sock_source);
+ return TRUE;
+ }
+
+ if (priv->sock_source && priv->sock_fd_state == fd_state)
+ return FALSE;
+
+ if (fd_state == FD_STATE_CONNECT)
+ io_condition = G_IO_OUT | G_IO_ERR | G_IO_HUP;
+ else if (fd_state == FD_STATE_IN)
+ io_condition = G_IO_IN | G_IO_ERR | G_IO_HUP;
+ else {
+ nm_assert(fd_state == FD_STATE_IN);
+ io_condition = G_IO_IN | G_IO_OUT | G_IO_ERR | G_IO_HUP;
+ }
+
+ nm_clear_g_source_inst(&priv->sock_source);
+ priv->sock_fd_state = fd_state;
+ priv->sock_source = nm_g_unix_fd_source_new(priv->sock_fd,
+ io_condition,
+ G_PRIORITY_DEFAULT,
+ _sock_fd_io_ready,
+ self,
+ NULL);
+ g_source_attach(priv->sock_source, NULL);
+ return TRUE;
}
/**
@@ -2287,23 +2504,51 @@ _client_connect_cb(GObject *source_object, GAsyncResult *res, gpointer user_data
static void
ovsdb_try_connect(NMOvsdb *self)
{
- NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self);
- GSocketAddress *addr;
+ NMOvsdbPrivate * priv = NM_OVSDB_GET_PRIVATE(self);
+ struct sockaddr_un sockname;
+ int errsv;
+ int r;
- if (priv->client)
+ if (priv->sock_fd_state > FD_STATE_NONE) {
+ /* already connecting or connected. Good. */
return;
+ }
- /* TODO: This should probably be made configurable via NetworkManager.conf */
- addr = g_unix_socket_address_new(RUNSTATEDIR "/openvswitch/db.sock");
+ nm_assert(priv->sock_fd_state == FD_STATE_NONE);
+ nm_assert(priv->sock_fd < 0);
+ nm_assert(!priv->sock_source);
- priv->client = g_socket_client_new();
- priv->cancellable = g_cancellable_new();
- g_socket_client_connect_async(priv->client,
- G_SOCKET_CONNECTABLE(addr),
- priv->cancellable,
- _client_connect_cb,
- self);
- g_object_unref(addr);
+ priv->sock_fd = socket(PF_UNIX, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0);
+ if (priv->sock_fd < 0) {
+ errsv = errno;
+ _LOGE("socket: failure to create socket: %s (%d)", nm_strerror_native(errsv), errsv);
+ return;
+ }
+
+ G_STATIC_ASSERT_EXPR(NM_STRLEN(OPENVSWITCH_DB_SOCK_PATH) < sizeof(sockname.sun_path));
+ memset(&sockname, 0, sizeof(sockname));
+ sockname.sun_family = AF_UNIX;
+ memcpy(sockname.sun_path, OPENVSWITCH_DB_SOCK_PATH, NM_STRLEN(OPENVSWITCH_DB_SOCK_PATH));
+
+ r = connect(priv->sock_fd, (struct sockaddr *) &sockname, sizeof(sockname));
+ if (r < 0) {
+ errsv = errno;
+ if (errsv != EINPROGRESS) {
+ nm_close(nm_steal_fd(&priv->sock_fd));
+ _LOGD("socket: failure to connect unix socket to %s: %s (%d)",
+ OPENVSWITCH_DB_SOCK_PATH,
+ nm_strerror_native(errsv),
+ errsv);
+ return;
+ }
+ /* we treat async connect as if the file descriptor would be good, but
+ * is simply not ready to read/write. */
+ _LOGT("socket: connecting to %s (in progress)", OPENVSWITCH_DB_SOCK_PATH);
+ _sock_fd_set_state(self, FD_STATE_CONNECT);
+ } else {
+ _LOGT("socket: connected to %s", OPENVSWITCH_DB_SOCK_PATH);
+ _sock_fd_set_state(self, FD_STATE_IN);
+ }
/* Queue a monitor call before any other command, ensuring that we have an up
* to date view of existing bridged that we need for add and remove ops. */
@@ -2457,8 +2702,11 @@ nm_ovsdb_init(NMOvsdb *self)
c_list_init(&priv->calls_lst_head);
- priv->input = g_string_new(NULL);
- priv->output = g_string_new(NULL);
+ priv->sock_fd = -1;
+ priv->sock_fd_state = FD_STATE_NONE;
+ nm_str_buf_init(&priv->rd_buf, 0, FALSE);
+ nm_str_buf_init(&priv->wr_buf, 0, FALSE);
+
priv->bridges =
g_hash_table_new_full(nm_pstr_hash, nm_pstr_equal, (GDestroyNotify) _free_bridge, NULL);
priv->ports =
@@ -2479,15 +2727,6 @@ dispose(GObject *object)
nm_assert(c_list_is_empty(&priv->calls_lst_head));
- if (priv->input) {
- g_string_free(priv->input, TRUE);
- priv->input = NULL;
- }
- if (priv->output) {
- g_string_free(priv->output, TRUE);
- priv->output = NULL;
- }
-
nm_clear_pointer(&priv->bridges, g_hash_table_destroy);
nm_clear_pointer(&priv->ports, g_hash_table_destroy);
nm_clear_pointer(&priv->interfaces, g_hash_table_destroy);
@@ -2496,11 +2735,24 @@ dispose(GObject *object)
}
static void
+finalize(GObject *object)
+{
+ NMOvsdb * self = NM_OVSDB(object);
+ NMOvsdbPrivate *priv = NM_OVSDB_GET_PRIVATE(self);
+
+ nm_str_buf_destroy(&priv->rd_buf);
+ nm_str_buf_destroy(&priv->wr_buf);
+
+ G_OBJECT_CLASS(nm_ovsdb_parent_class)->finalize(object);
+}
+
+static void
nm_ovsdb_class_init(NMOvsdbClass *klass)
{
GObjectClass *object_class = G_OBJECT_CLASS(klass);
- object_class->dispose = dispose;
+ object_class->dispose = dispose;
+ object_class->finalize = finalize;
signals[DEVICE_ADDED] = g_signal_new(NM_OVSDB_DEVICE_ADDED,
G_OBJECT_CLASS_TYPE(object_class),