summaryrefslogtreecommitdiff
path: root/sql/log_event.cc
diff options
context:
space:
mode:
authorunknown <knielsen@knielsen-hq.org>2012-11-15 13:11:35 +0100
committerunknown <knielsen@knielsen-hq.org>2012-11-15 13:11:35 +0100
commitbdbce30dee9bf550cf09e5d9098bc2a7e0ad940e (patch)
treeab2d41f16d3bcec7ac78d6b12545dce8615ddbd0 /sql/log_event.cc
parent3e798b033ab37291bc4e4a72e887aaf9a042662b (diff)
downloadmariadb-git-bdbce30dee9bf550cf09e5d9098bc2a7e0ad940e.tar.gz
MDEV-26: Global transaction id: Intermediate commit.
Now slave can connect to master, sending start position as slave state rather than old-style binlog name/position. This enables to switch to a new master by changing just connection information, replication slave GTID state ensures that slave starts at the correct point in the new master.
Diffstat (limited to 'sql/log_event.cc')
-rw-r--r--sql/log_event.cc196
1 files changed, 196 insertions, 0 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc
index f124ee1eae7..7a32a812214 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -6279,6 +6279,67 @@ rpl_slave_state::next_subid(uint32 domain_id)
#endif
+/*
+ Prepare the current slave state as a string, suitable for sending to the
+ master to request to receive binlog events starting from that GTID state.
+
+ The state consists of the most recently applied GTID for each domain_id,
+ ie. the one with the highest sub_id within each domain_id.
+*/
+
+int
+rpl_slave_state::tostring(String *dest)
+{
+ bool first= true;
+ uint32 i;
+ int err= 1;
+
+ lock();
+
+ for (i= 0; i < hash.records; ++i)
+ {
+ uint64 best_sub_id;
+ rpl_gtid best_gtid;
+ element *e= (element *)my_hash_element(&hash, i);
+ list_element *l= e->list;
+
+ DBUG_ASSERT(l /* We should never have empty list in element. */);
+ if (!l)
+ goto err;
+
+ best_gtid.domain_id= e->domain_id;
+ best_gtid.server_id= l->server_id;
+ best_gtid.seq_no= l->seq_no;
+ best_sub_id= l->sub_id;
+ while ((l= l->next))
+ {
+ if (l->sub_id > best_sub_id)
+ {
+ best_sub_id= l->sub_id;
+ best_gtid.server_id= l->server_id;
+ best_gtid.seq_no= l->seq_no;
+ }
+ }
+
+ if (first)
+ first= false;
+ else
+ dest->append("-",1);
+ dest->append_ulonglong(best_gtid.domain_id);
+ dest->append("-",1);
+ dest->append_ulonglong(best_gtid.server_id);
+ dest->append("-",1);
+ dest->append_ulonglong(best_gtid.seq_no);
+ }
+
+ err= 0;
+
+err:
+ unlock();
+ return err;
+}
+
+
rpl_binlog_state::rpl_binlog_state()
{
my_hash_init(&hash, &my_charset_bin, 32,
@@ -6410,6 +6471,119 @@ rpl_binlog_state::read_from_iocache(IO_CACHE *src)
}
return 0;
}
+
+
+slave_connection_state::slave_connection_state()
+{
+ my_hash_init(&hash, &my_charset_bin, 32,
+ offsetof(rpl_gtid, domain_id), sizeof(uint32), NULL, my_free,
+ HASH_UNIQUE);
+}
+
+
+slave_connection_state::~slave_connection_state()
+{
+ my_hash_free(&hash);
+}
+
+
+/*
+ Create a hash from the slave GTID state that is sent to master when slave
+ connects to start replication.
+
+ The state is sent as <GTID>,<GTID>,...,<GTID>, for example:
+
+ 0-2-112,1-4-1022
+
+ The state gives for each domain_id the GTID to start replication from for
+ the corresponding replication stream. So domain_id must be unique.
+
+ Returns 0 if ok, non-zero if error due to malformed input.
+
+ Note that input string is built by slave server, so it will not be incorrect
+ unless bug/corruption/malicious server. So we just need basic sanity check,
+ not fancy user-friendly error message.
+*/
+
+int
+slave_connection_state::load(char *slave_request, size_t len)
+{
+ char *p, *q, *end;
+ uint64 v;
+ uint32 domain_id, server_id;
+ uint64 seq_no;
+ uchar *rec;
+ rpl_gtid *gtid;
+ int err= 0;
+
+ my_hash_reset(&hash);
+ p= slave_request;
+ end= slave_request + len;
+ for (;;)
+ {
+ q= end;
+ v= (uint64)my_strtoll10(p, &q, &err);
+ if (err != 0 || v > (uint32)0xffffffff || *q != '-')
+ return 1;
+ domain_id= (uint32)v;
+ p= q+1;
+ q= end;
+ v= (uint64)my_strtoll10(p, &q, &err);
+ if (err != 0 || v > (uint32)0xffffffff || *q != '-')
+ return 1;
+ server_id= (uint32)v;
+ p= q+1;
+ q= end;
+ seq_no= (uint64)my_strtoll10(p, &q, &err);
+ if (err != 0)
+ return 1;
+
+ if (!(rec= (uchar *)my_malloc(sizeof(*gtid), MYF(MY_WME))))
+ return 1;
+ gtid= (rpl_gtid *)rec;
+ gtid->domain_id= domain_id;
+ gtid->server_id= server_id;
+ gtid->seq_no= seq_no;
+ if (my_hash_insert(&hash, rec))
+ {
+ my_free(rec);
+ return 1;
+ }
+ if (q == end)
+ break; /* Finished. */
+ if (*q != ',')
+ return 1;
+ p= q+1;
+ }
+
+ return 0;
+}
+
+
+rpl_gtid *
+slave_connection_state::find(uint32 domain_id)
+{
+ return (rpl_gtid *) my_hash_search(&hash, (const uchar *)(&domain_id), 0);
+}
+
+
+void
+slave_connection_state::remove(const rpl_gtid *in_gtid)
+{
+ bool err;
+ uchar *rec= my_hash_search(&hash, (const uchar *)(&in_gtid->domain_id), 0);
+#ifndef DBUG_OFF
+ rpl_gtid *slave_gtid= (rpl_gtid *)rec;
+ DBUG_ASSERT(rec /* We should never try to remove not present domain_id. */);
+ DBUG_ASSERT(slave_gtid->server_id == in_gtid->server_id);
+ DBUG_ASSERT(slave_gtid->seq_no == in_gtid->seq_no);
+#endif
+
+ err= my_hash_delete(&hash, rec);
+ DBUG_ASSERT(!err);
+}
+
+
#endif /* MYSQL_SERVER */
@@ -6443,6 +6617,28 @@ Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg,
{
}
+
+/*
+ Used to record GTID while sending binlog to slave, without having to
+ fully contruct every Gtid_log_event() needlessly.
+*/
+bool
+Gtid_log_event::peek(const char *event_start, size_t event_len,
+ uint32 *domain_id, uint32 *server_id, uint64 *seq_no,
+ uchar *flags2)
+{
+ const char *p;
+ if (event_len < LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN)
+ return true;
+ *server_id= uint4korr(event_start + SERVER_ID_OFFSET);
+ p= event_start + LOG_EVENT_HEADER_LEN;
+ *seq_no= uint8korr(p);
+ p+= 8;
+ *domain_id= uint4korr(p);
+ return false;
+}
+
+
bool
Gtid_log_event::write(IO_CACHE *file)
{