summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2014-10-02 10:01:50 +0000
committerGerrit Code Review <review@openstack.org>2014-10-02 10:01:50 +0000
commitd6ab085cf51f852874e1ad63a9e28733208a6714 (patch)
treec3b088161cd08b1e6f3918a7071c74d70bd34003
parent957f72bf311d5c76c5e2f5fffdee4aebd12c56a5 (diff)
parente5c757f3ee9e2642f26139f3a344f9dcc1ab6fb9 (diff)
downloadtrove-d6ab085cf51f852874e1ad63a9e28733208a6714.tar.gz
Merge "Use unique passwords for replication user"2014.2.rc1
-rw-r--r--trove/common/cfg.py4
-rw-r--r--trove/guestagent/api.py7
-rw-r--r--trove/guestagent/datastore/mysql/manager.py9
-rw-r--r--trove/guestagent/datastore/mysql/service.py46
-rw-r--r--trove/guestagent/strategies/replication/base.py4
-rw-r--r--trove/guestagent/strategies/replication/mysql_binlog.py48
-rw-r--r--trove/taskmanager/manager.py6
-rwxr-xr-xtrove/taskmanager/models.py9
-rw-r--r--trove/tests/api/replication.py23
-rw-r--r--trove/tests/unittests/guestagent/test_dbaas.py4
10 files changed, 127 insertions, 33 deletions
diff --git a/trove/common/cfg.py b/trove/common/cfg.py
index 69e34d5d..bbd4e5f0 100644
--- a/trove/common/cfg.py
+++ b/trove/common/cfg.py
@@ -396,10 +396,6 @@ mysql_opts = [
cfg.StrOpt('replication_namespace',
default='trove.guestagent.strategies.replication.mysql_binlog',
help='Namespace to load replication strategies from.'),
- cfg.StrOpt('replication_user', default='slave_user',
- help='Userid for replication slave.', secret=True),
- cfg.StrOpt('replication_password', default='NETOU7897NNLOU',
- help='Password for replication slave user.', secret=True),
cfg.StrOpt('mount_point', default='/var/lib/mysql',
help="Filesystem path for mounting "
"volumes if volume support is enabled."),
diff --git a/trove/guestagent/api.py b/trove/guestagent/api.py
index bf498c59..12f59497 100644
--- a/trove/guestagent/api.py
+++ b/trove/guestagent/api.py
@@ -340,7 +340,12 @@ class API(proxy.RpcProxy):
def detach_replica(self):
LOG.debug("Detaching replica %s from its replication source.", self.id)
- self._call("detach_replica", AGENT_HIGH_TIMEOUT)
+ return self._call("detach_replica", AGENT_HIGH_TIMEOUT)
+
+ def cleanup_source_on_replica_detach(self, replica_info):
+ LOG.debug("Cleaning up master %s on detach of replica.", self.id)
+ self._call("cleanup_source_on_replica_detach", AGENT_HIGH_TIMEOUT,
+ replica_info=replica_info)
def demote_replication_master(self):
LOG.debug("Demoting instance %s to non-master.", self.id)
diff --git a/trove/guestagent/datastore/mysql/manager.py b/trove/guestagent/datastore/mysql/manager.py
index 2e848d71..5947cab4 100644
--- a/trove/guestagent/datastore/mysql/manager.py
+++ b/trove/guestagent/datastore/mysql/manager.py
@@ -276,7 +276,14 @@ class Manager(periodic_task.PeriodicTasks):
LOG.debug("Detaching replica.")
app = MySqlApp(MySqlAppStatus.get())
replication = REPLICATION_STRATEGY_CLASS(context)
- replication.detach_slave(app)
+ replica_info = replication.detach_slave(app)
+ return replica_info
+
+ def cleanup_source_on_replica_detach(self, context, replica_info):
+ LOG.debug("Cleaning up the source on the detach of a replica.")
+ replication = REPLICATION_STRATEGY_CLASS(context)
+ replication.cleanup_source_on_replica_detach(MySqlAdmin(),
+ replica_info)
def demote_replication_master(self, context):
LOG.debug("Demoting replication master.")
diff --git a/trove/guestagent/datastore/mysql/service.py b/trove/guestagent/datastore/mysql/service.py
index 24618933..80f044b6 100644
--- a/trove/guestagent/datastore/mysql/service.py
+++ b/trove/guestagent/datastore/mysql/service.py
@@ -51,8 +51,6 @@ MYSQL_BASE_DIR = "/var/lib/mysql"
CONF = cfg.CONF
MANAGER = CONF.datastore_manager if CONF.datastore_manager else 'mysql'
-REPLICATION_USER = CONF.get(MANAGER).replication_user
-REPLICATION_PASSWORD = CONF.get(MANAGER).replication_password
INCLUDE_MARKER_OPERATORS = {
True: ">=",
@@ -341,11 +339,15 @@ class MySqlAdmin(object):
def delete_user(self, user):
"""Delete the specified user."""
+ mysql_user = models.MySQLUser()
+ mysql_user.deserialize(user)
+ self.delete_user_by_name(mysql_user.name, mysql_user.host)
+
+ def delete_user_by_name(self, name, host='%'):
with LocalSqlClient(get_engine()) as client:
- mysql_user = models.MySQLUser()
- mysql_user.deserialize(user)
- du = sql_query.DropUser(mysql_user.name, host=mysql_user.host)
+ du = sql_query.DropUser(name, host=host)
t = text(str(du))
+ LOG.debug("delete_user_by_name: %s", t)
client.execute(t)
def get_user(self, username, hostname):
@@ -839,13 +841,13 @@ class MySqlApp(object):
if os.path.exists(MYCNF_REPLMASTER):
utils.execute_with_timeout("sudo", "rm", MYCNF_REPLMASTER)
- def grant_replication_privilege(self):
+ def grant_replication_privilege(self, replication_user):
LOG.info(_("Granting Replication Slave privilege."))
with LocalSqlClient(get_engine()) as client:
g = sql_query.Grant(permissions=['REPLICATION SLAVE'],
- user=REPLICATION_USER,
- clear=REPLICATION_PASSWORD)
+ user=replication_user['name'],
+ clear=replication_user['password'])
t = text(str(g))
client.execute(t)
@@ -854,11 +856,13 @@ class MySqlApp(object):
LOG.info(_("Revoking Replication Slave privilege."))
with LocalSqlClient(get_engine()) as client:
- g = sql_query.Revoke(permissions=['REPLICATION SLAVE'],
- user=REPLICATION_USER,
- clear=REPLICATION_PASSWORD)
+ results = client.execute('SHOW SLAVE STATUS').fetchall()
+ slave_status_info = results[0]
- t = text(str(g))
+ r = sql_query.Revoke(permissions=['REPLICATION SLAVE'],
+ user=slave_status_info['master_user'])
+
+ t = text(str(r))
client.execute(t)
def get_port(self):
@@ -875,9 +879,10 @@ class MySqlApp(object):
}
return binlog_position
- def change_master_for_binlog(self, host, port, log_position):
+ def change_master_for_binlog(self, host, port, logging_config):
LOG.info(_("Configuring replication from %s.") % host)
+ replication_user = logging_config['replication_user']
change_master_cmd = ("CHANGE MASTER TO MASTER_HOST='%(host)s', "
"MASTER_PORT=%(port)s, "
"MASTER_USER='%(user)s', "
@@ -887,10 +892,10 @@ class MySqlApp(object):
{
'host': host,
'port': port,
- 'user': REPLICATION_USER,
- 'password': REPLICATION_PASSWORD,
- 'log_file': log_position['log_file'],
- 'log_pos': log_position['log_position']
+ 'user': replication_user['name'],
+ 'password': replication_user['password'],
+ 'log_file': logging_config['log_file'],
+ 'log_pos': logging_config['log_position']
})
with LocalSqlClient(get_engine()) as client:
@@ -903,11 +908,18 @@ class MySqlApp(object):
self._wait_for_slave_status("ON", client, 60)
def stop_slave(self):
+ replication_user = None
LOG.info(_("Stopping slave replication."))
with LocalSqlClient(get_engine()) as client:
+ result = client.execute('SHOW SLAVE STATUS')
+ replication_user = result.first()['Master_User']
client.execute('STOP SLAVE')
client.execute('RESET SLAVE ALL')
self._wait_for_slave_status("OFF", client, 30)
+ client.execute('DROP USER ' + replication_user)
+ return {
+ 'replication_user': replication_user
+ }
def _wait_for_slave_status(self, status, client, max_time):
diff --git a/trove/guestagent/strategies/replication/base.py b/trove/guestagent/strategies/replication/base.py
index 2d4dc1c3..0736e2e8 100644
--- a/trove/guestagent/strategies/replication/base.py
+++ b/trove/guestagent/strategies/replication/base.py
@@ -53,5 +53,9 @@ class Replication(Strategy):
"""Turn off replication on a slave site."""
@abc.abstractmethod
+ def cleanup_source_on_replica_detach(self, service, replica_info):
+ """Clean up the source on the detach of a replica."""
+
+ @abc.abstractmethod
def demote_master(self, service):
"""Turn off replication on a master site."""
diff --git a/trove/guestagent/strategies/replication/mysql_binlog.py b/trove/guestagent/strategies/replication/mysql_binlog.py
index 15508de6..bbe81aa6 100644
--- a/trove/guestagent/strategies/replication/mysql_binlog.py
+++ b/trove/guestagent/strategies/replication/mysql_binlog.py
@@ -15,12 +15,15 @@
#
import csv
+import uuid
from trove.common import cfg
from trove.common import exception
from trove.common import utils
from trove.guestagent.backup.backupagent import BackupAgent
from trove.guestagent.common import operating_system
+from trove.guestagent.datastore.mysql.service import MySqlAdmin
+from trove.guestagent.db import models
from trove.guestagent.strategies import backup
from trove.guestagent.strategies.replication import base
from trove.guestagent.strategies.storage import get_storage_strategy
@@ -65,6 +68,33 @@ class MysqlBinlogReplication(base.Replication):
}
return master_ref
+ def _create_replication_user(self):
+ replication_user = None
+ replication_password = utils.generate_random_password(16)
+
+ mysql_user = models.MySQLUser()
+ mysql_user.password = replication_password
+
+ retry_count = 0
+
+ while replication_user is None:
+ try:
+ mysql_user.name = 'slave_' + str(uuid.uuid4())[:8]
+ MySqlAdmin().create_user([mysql_user.serialize()])
+ LOG.debug("Trying to create replication user " +
+ mysql_user.name)
+ replication_user = {
+ 'name': mysql_user.name,
+ 'password': replication_password
+ }
+ except Exception:
+ retry_count += 1
+ if retry_count > 5:
+ LOG.error(_("Replication user retry count exceeded"))
+ raise
+
+ return replication_user
+
def snapshot_for_replication(self, context, service,
location, snapshot_info):
snapshot_id = snapshot_info['id']
@@ -76,9 +106,14 @@ class MysqlBinlogReplication(base.Replication):
AGENT.stream_backup_to_storage(snapshot_info, REPL_BACKUP_RUNNER,
storage, {}, REPL_EXTRA_OPTS)
+ replication_user = self._create_replication_user()
+ service.grant_replication_privilege(replication_user)
+
# With streamed InnobackupEx, the log position is in
# the stream and will be decoded by the slave
- log_position = {}
+ log_position = {
+ 'replication_user': replication_user
+ }
return snapshot_id, log_position
def enable_as_master(self, service, snapshot_info, master_config):
@@ -86,23 +121,28 @@ class MysqlBinlogReplication(base.Replication):
master_config = MASTER_CONFIG
service.write_replication_overrides(master_config)
service.restart()
- service.grant_replication_privilege()
def enable_as_slave(self, service, snapshot, slave_config):
if not slave_config:
slave_config = SLAVE_CONFIG
service.write_replication_overrides(slave_config)
service.restart()
+ logging_config = snapshot['log_position']
+ logging_config.update(self._read_log_position())
service.change_master_for_binlog(
snapshot['master']['host'],
snapshot['master']['port'],
- self._read_log_position())
+ logging_config)
service.start_slave()
def detach_slave(self, service):
- service.stop_slave()
+ replica_info = service.stop_slave()
service.remove_replication_overrides()
service.restart()
+ return replica_info
+
+ def cleanup_source_on_replica_detach(self, admin_service, replica_info):
+ admin_service.delete_user_by_name(replica_info['replication_user'])
def demote_master(self, service):
service.revoke_replication_privilege()
diff --git a/trove/taskmanager/manager.py b/trove/taskmanager/manager.py
index 1ea7b1e9..5c119c9b 100644
--- a/trove/taskmanager/manager.py
+++ b/trove/taskmanager/manager.py
@@ -60,8 +60,10 @@ class Manager(periodic_task.PeriodicTasks):
instance_tasks.restart()
def detach_replica(self, context, instance_id):
- instance_tasks = models.BuiltInstanceTasks.load(context, instance_id)
- instance_tasks.detach_replica()
+ slave = models.BuiltInstanceTasks.load(context, instance_id)
+ master_id = slave.slave_of_id
+ master = models.BuiltInstanceTasks.load(context, master_id)
+ slave.detach_replica(master)
def migrate(self, context, instance_id, host):
instance_tasks = models.BuiltInstanceTasks.load(context, instance_id)
diff --git a/trove/taskmanager/models.py b/trove/taskmanager/models.py
index f1bce8a8..2415eb0d 100755
--- a/trove/taskmanager/models.py
+++ b/trove/taskmanager/models.py
@@ -945,14 +945,19 @@ class BuiltInstanceTasks(BuiltInstance, NotifyMixin, ConfigurationMixin):
return run_with_quotas(self.context.tenant, {'backups': 1},
_get_replication_snapshot)
- def detach_replica(self):
+ def detach_replica(self, master):
LOG.debug("Calling detach_replica on %s" % self.id)
try:
- self.guest.detach_replica()
+ replica_info = self.guest.detach_replica()
+ master.cleanup_source_on_replica_detach(replica_info)
self.update_db(slave_of_id=None)
except (GuestError, GuestTimeout):
LOG.exception(_("Failed to detach replica %s.") % self.id)
+ def cleanup_source_on_replica_detach(self, replica_info):
+ LOG.debug("Calling cleanup_source_on_replica_detach on %s" % self.id)
+ self.guest.cleanup_source_on_replica_detach(replica_info)
+
def reboot(self):
try:
LOG.debug("Stopping datastore on instance %s." % self.id)
diff --git a/trove/tests/api/replication.py b/trove/tests/api/replication.py
index da5165db..bdb056b1 100644
--- a/trove/tests/api/replication.py
+++ b/trove/tests/api/replication.py
@@ -42,6 +42,14 @@ slave_instance = SlaveInstanceTestInfo()
existing_db_on_master = generate_uuid()
+def _get_user_count(server_info):
+ cmd = ('mysql -BNq -e \\\'select count\\(*\\) from mysql.user'
+ ' where user like \\\"slave_%\\\"\\\'')
+ server = create_server_connection(server_info.id)
+ stdout, stderr = server.execute(cmd)
+ return int(stdout.rstrip())
+
+
def slave_is_running(running=True):
def check_slave_is_running():
@@ -141,6 +149,11 @@ class VerifySlave(object):
def test_existing_db_exists_on_slave(self):
poll_until(self.db_is_found(existing_db_on_master))
+ @test(depends_on=[test_existing_db_exists_on_slave])
+ def test_slave_user_exists(self):
+ assert_equal(_get_user_count(slave_instance), 1)
+ assert_equal(_get_user_count(instance_info), 1)
+
@test(groups=[GROUP],
depends_on=[WaitForCreateSlaveToFinish],
@@ -200,6 +213,16 @@ class DetachReplica(object):
stdout, stderr = server.execute(cmd)
assert_equal(stdout, "0\n")
+ @test(depends_on=[test_detach_replica])
+ def test_slave_user_removed(self):
+ if CONFIG.fake_mode:
+ raise SkipTest("Test not_read_only not supported in fake mode")
+
+ def _slave_user_deleted():
+ return _get_user_count(instance_info) == 0
+
+ poll_until(_slave_user_deleted)
+
@test(groups=[GROUP],
depends_on=[WaitForCreateSlaveToFinish],
diff --git a/trove/tests/unittests/guestagent/test_dbaas.py b/trove/tests/unittests/guestagent/test_dbaas.py
index d6177c97..a9660606 100644
--- a/trove/tests/unittests/guestagent/test_dbaas.py
+++ b/trove/tests/unittests/guestagent/test_dbaas.py
@@ -293,7 +293,7 @@ class MySqlAdminTest(testtools.TestCase):
def test_delete_user(self):
- user = {"_name": "testUser"}
+ user = {"_name": "testUser", "_host": None}
self.mySqlAdmin.delete_user(user)
@@ -301,7 +301,7 @@ class MySqlAdminTest(testtools.TestCase):
call_args = dbaas.LocalSqlClient.execute.call_args
if call_args is not None:
args, _ = call_args
- expected = "DROP USER `testUser`;"
+ expected = "DROP USER `testUser`@`%`;"
self.assertEqual(args[0].text, expected,
"Delete user queries are not the same")