summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMorgan Jones <morgan@parelastic.com>2014-09-03 12:20:53 -0700
committerDenis Makogon <dmakogon@mirantis.com>2014-10-01 14:57:35 +0300
commite5c757f3ee9e2642f26139f3a344f9dcc1ab6fb9 (patch)
tree9b3313cf7049d2f9437b4b2035a87a257bc84ed9
parentba3a43f2c5d3233118f0cb152a299c29e6018d36 (diff)
downloadtrove-e5c757f3ee9e2642f26139f3a344f9dcc1ab6fb9.tar.gz
Use unique passwords for replication user
Generates a unique user with a random password for each slave. The replication user and password is passed to each slave during the snapshot attach process. Replication user is deleted from both the master and the slave during the detach process. Co-Authored-By: Nikhil Manchanda <SlickNik@gmail.com> Co-Authored-By: Denis Makogon <dmakogon@mirantis.com> Change-Id: I9cb158a161714bfff90225227f5c652120393ba7 Closes-bug: 1357065
-rw-r--r--trove/common/cfg.py4
-rw-r--r--trove/guestagent/api.py7
-rw-r--r--trove/guestagent/datastore/mysql/manager.py9
-rw-r--r--trove/guestagent/datastore/mysql/service.py46
-rw-r--r--trove/guestagent/strategies/replication/base.py4
-rw-r--r--trove/guestagent/strategies/replication/mysql_binlog.py48
-rw-r--r--trove/taskmanager/manager.py6
-rwxr-xr-xtrove/taskmanager/models.py9
-rw-r--r--trove/tests/api/replication.py23
-rw-r--r--trove/tests/unittests/guestagent/test_dbaas.py4
10 files changed, 127 insertions, 33 deletions
diff --git a/trove/common/cfg.py b/trove/common/cfg.py
index 69e34d5d..bbd4e5f0 100644
--- a/trove/common/cfg.py
+++ b/trove/common/cfg.py
@@ -396,10 +396,6 @@ mysql_opts = [
cfg.StrOpt('replication_namespace',
default='trove.guestagent.strategies.replication.mysql_binlog',
help='Namespace to load replication strategies from.'),
- cfg.StrOpt('replication_user', default='slave_user',
- help='Userid for replication slave.', secret=True),
- cfg.StrOpt('replication_password', default='NETOU7897NNLOU',
- help='Password for replication slave user.', secret=True),
cfg.StrOpt('mount_point', default='/var/lib/mysql',
help="Filesystem path for mounting "
"volumes if volume support is enabled."),
diff --git a/trove/guestagent/api.py b/trove/guestagent/api.py
index bdefa8fe..2e4dd5de 100644
--- a/trove/guestagent/api.py
+++ b/trove/guestagent/api.py
@@ -338,7 +338,12 @@ class API(proxy.RpcProxy):
def detach_replica(self):
LOG.debug("Detaching replica %s from its replication source.", self.id)
- self._call("detach_replica", AGENT_HIGH_TIMEOUT)
+ return self._call("detach_replica", AGENT_HIGH_TIMEOUT)
+
+ def cleanup_source_on_replica_detach(self, replica_info):
+ LOG.debug("Cleaning up master %s on detach of replica.", self.id)
+ self._call("cleanup_source_on_replica_detach", AGENT_HIGH_TIMEOUT,
+ replica_info=replica_info)
def demote_replication_master(self):
LOG.debug("Demoting instance %s to non-master.", self.id)
diff --git a/trove/guestagent/datastore/mysql/manager.py b/trove/guestagent/datastore/mysql/manager.py
index 6625a30b..2d7d4545 100644
--- a/trove/guestagent/datastore/mysql/manager.py
+++ b/trove/guestagent/datastore/mysql/manager.py
@@ -274,7 +274,14 @@ class Manager(periodic_task.PeriodicTasks):
LOG.debug("Detaching replica.")
app = MySqlApp(MySqlAppStatus.get())
replication = REPLICATION_STRATEGY_CLASS(context)
- replication.detach_slave(app)
+ replica_info = replication.detach_slave(app)
+ return replica_info
+
+ def cleanup_source_on_replica_detach(self, context, replica_info):
+ LOG.debug("Cleaning up the source on the detach of a replica.")
+ replication = REPLICATION_STRATEGY_CLASS(context)
+ replication.cleanup_source_on_replica_detach(MySqlAdmin(),
+ replica_info)
def demote_replication_master(self, context):
LOG.debug("Demoting replication master.")
diff --git a/trove/guestagent/datastore/mysql/service.py b/trove/guestagent/datastore/mysql/service.py
index 810bdba0..62a1c646 100644
--- a/trove/guestagent/datastore/mysql/service.py
+++ b/trove/guestagent/datastore/mysql/service.py
@@ -51,8 +51,6 @@ MYSQL_BASE_DIR = "/var/lib/mysql"
CONF = cfg.CONF
MANAGER = CONF.datastore_manager if CONF.datastore_manager else 'mysql'
-REPLICATION_USER = CONF.get(MANAGER).replication_user
-REPLICATION_PASSWORD = CONF.get(MANAGER).replication_password
INCLUDE_MARKER_OPERATORS = {
True: ">=",
@@ -341,11 +339,15 @@ class MySqlAdmin(object):
def delete_user(self, user):
"""Delete the specified user."""
+ mysql_user = models.MySQLUser()
+ mysql_user.deserialize(user)
+ self.delete_user_by_name(mysql_user.name, mysql_user.host)
+
+ def delete_user_by_name(self, name, host='%'):
with LocalSqlClient(get_engine()) as client:
- mysql_user = models.MySQLUser()
- mysql_user.deserialize(user)
- du = sql_query.DropUser(mysql_user.name, host=mysql_user.host)
+ du = sql_query.DropUser(name, host=host)
t = text(str(du))
+ LOG.debug("delete_user_by_name: %s", t)
client.execute(t)
def get_user(self, username, hostname):
@@ -839,13 +841,13 @@ class MySqlApp(object):
if os.path.exists(MYCNF_REPLMASTER):
utils.execute_with_timeout("sudo", "rm", MYCNF_REPLMASTER)
- def grant_replication_privilege(self):
+ def grant_replication_privilege(self, replication_user):
LOG.info(_("Granting Replication Slave privilege."))
with LocalSqlClient(get_engine()) as client:
g = sql_query.Grant(permissions=['REPLICATION SLAVE'],
- user=REPLICATION_USER,
- clear=REPLICATION_PASSWORD)
+ user=replication_user['name'],
+ clear=replication_user['password'])
t = text(str(g))
client.execute(t)
@@ -854,11 +856,13 @@ class MySqlApp(object):
LOG.info(_("Revoking Replication Slave privilege."))
with LocalSqlClient(get_engine()) as client:
- g = sql_query.Revoke(permissions=['REPLICATION SLAVE'],
- user=REPLICATION_USER,
- clear=REPLICATION_PASSWORD)
+ results = client.execute('SHOW SLAVE STATUS').fetchall()
+ slave_status_info = results[0]
- t = text(str(g))
+ r = sql_query.Revoke(permissions=['REPLICATION SLAVE'],
+ user=slave_status_info['master_user'])
+
+ t = text(str(r))
client.execute(t)
def get_port(self):
@@ -875,9 +879,10 @@ class MySqlApp(object):
}
return binlog_position
- def change_master_for_binlog(self, host, port, log_position):
+ def change_master_for_binlog(self, host, port, logging_config):
LOG.info(_("Configuring replication from %s.") % host)
+ replication_user = logging_config['replication_user']
change_master_cmd = ("CHANGE MASTER TO MASTER_HOST='%(host)s', "
"MASTER_PORT=%(port)s, "
"MASTER_USER='%(user)s', "
@@ -887,10 +892,10 @@ class MySqlApp(object):
{
'host': host,
'port': port,
- 'user': REPLICATION_USER,
- 'password': REPLICATION_PASSWORD,
- 'log_file': log_position['log_file'],
- 'log_pos': log_position['log_position']
+ 'user': replication_user['name'],
+ 'password': replication_user['password'],
+ 'log_file': logging_config['log_file'],
+ 'log_pos': logging_config['log_position']
})
with LocalSqlClient(get_engine()) as client:
@@ -903,11 +908,18 @@ class MySqlApp(object):
self._wait_for_slave_status("ON", client, 60)
def stop_slave(self):
+ replication_user = None
LOG.info(_("Stopping slave replication."))
with LocalSqlClient(get_engine()) as client:
+ result = client.execute('SHOW SLAVE STATUS')
+ replication_user = result.first()['Master_User']
client.execute('STOP SLAVE')
client.execute('RESET SLAVE ALL')
self._wait_for_slave_status("OFF", client, 30)
+ client.execute('DROP USER ' + replication_user)
+ return {
+ 'replication_user': replication_user
+ }
def _wait_for_slave_status(self, status, client, max_time):
diff --git a/trove/guestagent/strategies/replication/base.py b/trove/guestagent/strategies/replication/base.py
index 2d4dc1c3..0736e2e8 100644
--- a/trove/guestagent/strategies/replication/base.py
+++ b/trove/guestagent/strategies/replication/base.py
@@ -53,5 +53,9 @@ class Replication(Strategy):
"""Turn off replication on a slave site."""
@abc.abstractmethod
+ def cleanup_source_on_replica_detach(self, service, replica_info):
+ """Clean up the source on the detach of a replica."""
+
+ @abc.abstractmethod
def demote_master(self, service):
"""Turn off replication on a master site."""
diff --git a/trove/guestagent/strategies/replication/mysql_binlog.py b/trove/guestagent/strategies/replication/mysql_binlog.py
index 604b516e..5a8e359d 100644
--- a/trove/guestagent/strategies/replication/mysql_binlog.py
+++ b/trove/guestagent/strategies/replication/mysql_binlog.py
@@ -15,12 +15,15 @@
#
import csv
+import uuid
from trove.common import cfg
from trove.common import exception
from trove.common import utils
from trove.guestagent.backup.backupagent import BackupAgent
from trove.guestagent.common import operating_system
+from trove.guestagent.datastore.mysql.service import MySqlAdmin
+from trove.guestagent.db import models
from trove.guestagent.strategies import backup
from trove.guestagent.strategies.replication import base
from trove.guestagent.strategies.storage import get_storage_strategy
@@ -65,6 +68,33 @@ class MysqlBinlogReplication(base.Replication):
}
return master_ref
+ def _create_replication_user(self):
+ replication_user = None
+ replication_password = utils.generate_random_password(16)
+
+ mysql_user = models.MySQLUser()
+ mysql_user.password = replication_password
+
+ retry_count = 0
+
+ while replication_user is None:
+ try:
+ mysql_user.name = 'slave_' + str(uuid.uuid4())[:8]
+ MySqlAdmin().create_user([mysql_user.serialize()])
+ LOG.debug("Trying to create replication user " +
+ mysql_user.name)
+ replication_user = {
+ 'name': mysql_user.name,
+ 'password': replication_password
+ }
+ except Exception:
+ retry_count += 1
+ if retry_count > 5:
+ LOG.error(_("Replication user retry count exceeded"))
+ raise
+
+ return replication_user
+
def snapshot_for_replication(self, context, service,
location, snapshot_info):
snapshot_id = snapshot_info['id']
@@ -76,29 +106,39 @@ class MysqlBinlogReplication(base.Replication):
AGENT.stream_backup_to_storage(snapshot_info, REPL_BACKUP_RUNNER,
storage, {}, REPL_EXTRA_OPTS)
+ replication_user = self._create_replication_user()
+ service.grant_replication_privilege(replication_user)
+
# With streamed InnobackupEx, the log position is in
# the stream and will be decoded by the slave
- log_position = {}
+ log_position = {
+ 'replication_user': replication_user
+ }
return snapshot_id, log_position
def enable_as_master(self, service, snapshot_info):
service.write_replication_overrides(MASTER_CONFIG)
service.restart()
- service.grant_replication_privilege()
def enable_as_slave(self, service, snapshot):
service.write_replication_overrides(SLAVE_CONFIG)
service.restart()
+ logging_config = snapshot['log_position']
+ logging_config.update(self._read_log_position())
service.change_master_for_binlog(
snapshot['master']['host'],
snapshot['master']['port'],
- self._read_log_position())
+ logging_config)
service.start_slave()
def detach_slave(self, service):
- service.stop_slave()
+ replica_info = service.stop_slave()
service.remove_replication_overrides()
service.restart()
+ return replica_info
+
+ def cleanup_source_on_replica_detach(self, admin_service, replica_info):
+ admin_service.delete_user_by_name(replica_info['replication_user'])
def demote_master(self, service):
service.revoke_replication_privilege()
diff --git a/trove/taskmanager/manager.py b/trove/taskmanager/manager.py
index 4e299ee7..7a381b31 100644
--- a/trove/taskmanager/manager.py
+++ b/trove/taskmanager/manager.py
@@ -60,8 +60,10 @@ class Manager(periodic_task.PeriodicTasks):
instance_tasks.restart()
def detach_replica(self, context, instance_id):
- instance_tasks = models.BuiltInstanceTasks.load(context, instance_id)
- instance_tasks.detach_replica()
+ slave = models.BuiltInstanceTasks.load(context, instance_id)
+ master_id = slave.slave_of_id
+ master = models.BuiltInstanceTasks.load(context, master_id)
+ slave.detach_replica(master)
def migrate(self, context, instance_id, host):
instance_tasks = models.BuiltInstanceTasks.load(context, instance_id)
diff --git a/trove/taskmanager/models.py b/trove/taskmanager/models.py
index 6360608a..7e92fbd2 100755
--- a/trove/taskmanager/models.py
+++ b/trove/taskmanager/models.py
@@ -928,14 +928,19 @@ class BuiltInstanceTasks(BuiltInstance, NotifyMixin, ConfigurationMixin):
return run_with_quotas(self.context.tenant, {'backups': 1},
_get_replication_snapshot)
- def detach_replica(self):
+ def detach_replica(self, master):
LOG.debug("Calling detach_replica on %s" % self.id)
try:
- self.guest.detach_replica()
+ replica_info = self.guest.detach_replica()
+ master.cleanup_source_on_replica_detach(replica_info)
self.update_db(slave_of_id=None)
except (GuestError, GuestTimeout):
LOG.exception(_("Failed to detach replica %s.") % self.id)
+ def cleanup_source_on_replica_detach(self, replica_info):
+ LOG.debug("Calling cleanup_source_on_replica_detach on %s" % self.id)
+ self.guest.cleanup_source_on_replica_detach(replica_info)
+
def reboot(self):
try:
LOG.debug("Stopping datastore on instance %s." % self.id)
diff --git a/trove/tests/api/replication.py b/trove/tests/api/replication.py
index da5165db..bdb056b1 100644
--- a/trove/tests/api/replication.py
+++ b/trove/tests/api/replication.py
@@ -42,6 +42,14 @@ slave_instance = SlaveInstanceTestInfo()
existing_db_on_master = generate_uuid()
+def _get_user_count(server_info):
+ cmd = ('mysql -BNq -e \\\'select count\\(*\\) from mysql.user'
+ ' where user like \\\"slave_%\\\"\\\'')
+ server = create_server_connection(server_info.id)
+ stdout, stderr = server.execute(cmd)
+ return int(stdout.rstrip())
+
+
def slave_is_running(running=True):
def check_slave_is_running():
@@ -141,6 +149,11 @@ class VerifySlave(object):
def test_existing_db_exists_on_slave(self):
poll_until(self.db_is_found(existing_db_on_master))
+ @test(depends_on=[test_existing_db_exists_on_slave])
+ def test_slave_user_exists(self):
+ assert_equal(_get_user_count(slave_instance), 1)
+ assert_equal(_get_user_count(instance_info), 1)
+
@test(groups=[GROUP],
depends_on=[WaitForCreateSlaveToFinish],
@@ -200,6 +213,16 @@ class DetachReplica(object):
stdout, stderr = server.execute(cmd)
assert_equal(stdout, "0\n")
+ @test(depends_on=[test_detach_replica])
+ def test_slave_user_removed(self):
+ if CONFIG.fake_mode:
+ raise SkipTest("Test not_read_only not supported in fake mode")
+
+ def _slave_user_deleted():
+ return _get_user_count(instance_info) == 0
+
+ poll_until(_slave_user_deleted)
+
@test(groups=[GROUP],
depends_on=[WaitForCreateSlaveToFinish],
diff --git a/trove/tests/unittests/guestagent/test_dbaas.py b/trove/tests/unittests/guestagent/test_dbaas.py
index d6177c97..a9660606 100644
--- a/trove/tests/unittests/guestagent/test_dbaas.py
+++ b/trove/tests/unittests/guestagent/test_dbaas.py
@@ -293,7 +293,7 @@ class MySqlAdminTest(testtools.TestCase):
def test_delete_user(self):
- user = {"_name": "testUser"}
+ user = {"_name": "testUser", "_host": None}
self.mySqlAdmin.delete_user(user)
@@ -301,7 +301,7 @@ class MySqlAdminTest(testtools.TestCase):
call_args = dbaas.LocalSqlClient.execute.call_args
if call_args is not None:
args, _ = call_args
- expected = "DROP USER `testUser`;"
+ expected = "DROP USER `testUser`@`%`;"
self.assertEqual(args[0].text, expected,
"Delete user queries are not the same")