summaryrefslogtreecommitdiff
path: root/python/samba/tests/domain_backup_offline.py
blob: 4ba1249e81b300f669b4106258254fba9195f5bb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# Unix SMB/CIFS implementation.
# Copyright (C) Andrew Bartlett <abartlet@samba.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import tarfile
import os
import shutil
import tempfile
from samba.tests import BlackboxTestCase
from samba.netcmd import CommandError
from samba.param import LoadParm

# The backup tests require that a completely clean LoadParm object gets used
# for the restore. Otherwise the same global LP gets re-used, and the LP
# settings can bleed from one test case to another.
# To do this, these tests should use check_output(), which executes the command
# in a separate process (as opposed to runcmd(), runsubcmd()).
# So although this is a samba-tool test, we don't inherit from SambaToolCmdTest
# so that we never inadvertently use .runcmd() by accident.
class DomainBackupOfflineCmp(BlackboxTestCase):

    def test_domain_backup_offline_nested_tdb(self):
        self.nested_testcase('tdb')

    def test_domain_backup_offline_nested_mdb(self):
        self.nested_testcase('mdb')

    def nested_testcase(self, backend):
        self.prov_dir = self.provision(backend)
        self.extract_dir = None

        src = os.path.join(self.prov_dir, "private")
        dst = os.path.join(self.prov_dir, "state", "private")

        # Move private directory inside state directory
        shutil.move(src, dst)

        smbconf = os.path.join(self.prov_dir, "etc", "smb.conf")

        # Update the conf file
        lp = LoadParm(filename_for_non_global_lp=smbconf)
        lp.set("private dir", dst)
        lp.dump(False, smbconf)

        backup_file = self.backup(self.prov_dir)

        # Ensure each file is only present once in the tar file
        tf = tarfile.open(backup_file)
        names = tf.getnames()
        self.assertEqual(len(names), len(set(names)))

    def test_domain_backup_offline_hard_link_tdb(self):
        self.hard_link_testcase('tdb')

    def test_domain_backup_offline_hard_link_mdb(self):
        self.hard_link_testcase('mdb')

    def hard_link_testcase(self, backend):
        self.prov_dir = self.provision(backend)
        self.extract_dir = None

        # Create hard links in the private and state directories
        os.link(os.path.join(self.prov_dir, "private", "krb5.conf"),
                os.path.join(self.prov_dir, "state", "krb5.conf"))

        backup_file = self.backup(self.prov_dir)

        # Extract the backup
        self.extract_dir = tempfile.mkdtemp(dir=self.tempdir)
        tf = tarfile.open(backup_file)
        tf.extractall(self.extract_dir)

        # Ensure that the hard link in the private directory was backed up,
        # while the one in the state directory was not.
        self.assertTrue(os.path.exists(os.path.join(self.extract_dir,
                                                    "private", "krb5.conf")))
        self.assertFalse(os.path.exists(os.path.join(self.extract_dir,
                                                    "statedir", "krb5.conf")))

    def test_domain_backup_offline_untar_tdb(self):
        self.untar_testcase('tdb')

    def test_domain_backup_offline_untar_mdb(self):
        self.untar_testcase('mdb')

    def test_domain_backup_offline_restore_tdb(self):
        self.restore_testcase('tdb')

    def test_domain_backup_offline_restore_mdb(self):
        self.restore_testcase('mdb')

    def restore_testcase(self, backend):
        self.prov_dir = self.provision(backend)
        self.extract_dir = None
        backup_file = self.backup(self.prov_dir)

        self.extract_dir = tempfile.mkdtemp(dir=self.tempdir)
        cmd = ("samba-tool domain backup restore --backup-file={f}"
               " --targetdir={d} "
               "--newservername=NEWSERVER").format(f=backup_file,
                                                   d=self.extract_dir)
        self.check_output(cmd)

        # attrs that are altered by the restore process
        ignore_attrs = ["servicePrincipalName", "lastLogonTimestamp",
                        "rIDAllocationPool", "rIDAvailablePool",
                        "localPolicyFlags", "operatingSystem", "displayName",
                        "dnsRecord", "dNSTombstoned",
                        "msDS-NC-Replica-Locations", "msDS-HasInstantiatedNCs",
                        "interSiteTopologyGenerator"]
        filter_arg = "--filter=" + ",".join(ignore_attrs)
        args = ["--two", filter_arg]
        self.ldapcmp(self.prov_dir, self.extract_dir, args)

    def untar_testcase(self, backend):
        self.prov_dir = self.provision(backend)
        self.extract_dir = None
        backup_file = self.backup(self.prov_dir)

        self.extract_dir = tempfile.mkdtemp(dir=self.tempdir)
        tf = tarfile.open(backup_file)
        tf.extractall(self.extract_dir)

        self.ldapcmp(self.prov_dir, self.extract_dir)

    def ldapcmp(self, prov_dir, ex_dir, args=[]):
        sam_fn = os.path.join("private", "sam.ldb")
        url1 = "tdb://" + os.path.join(os.path.realpath(prov_dir), sam_fn)
        url2 = "tdb://" + os.path.join(os.path.realpath(ex_dir), sam_fn)

        # Compare the restored sam.ldb with the old one
        for partition in ["domain", "configuration", "schema",
                          "dnsdomain", "dnsforest"]:
            cmd = "samba-tool ldapcmp " + " ".join([url1, url2, partition] + args)
            self.check_output(cmd)

    # Test the "samba-tool domain backup" command with ldapcmp
    def provision(self, backend):
        target = tempfile.mkdtemp(dir=self.tempdir)

        # Provision domain.  Use fake ACLs and store xattrs in tdbs so that
        # NTACL backup will work inside the testenv.
        # host-name option must be given because if this test runs on a
        # system with a very long hostname, it will be shortened in certain
        # circumstances, causing the ldapcmp to fail.
        prov_cmd = "samba-tool domain provision " +\
                   "--domain FOO --realm foo.example.com " +\
                   "--targetdir {target} " +\
                   "--backend-store {backend} " +\
                   "--host-name OLDSERVER "+\
                   "--option=\"vfs objects=fake_acls xattr_tdb\""
        prov_cmd = prov_cmd.format(target=target, backend=backend)
        self.check_output(prov_cmd)

        return target

    def backup(self, prov_dir):
        # Run the backup and check we got one backup tar file
        cmd = ("samba-tool domain backup offline --targetdir={prov_dir} "
               "--configfile={prov_dir}/etc/smb.conf").format(prov_dir=prov_dir)
        self.check_output(cmd)

        tar_files = [fn for fn in os.listdir(prov_dir)
                     if fn.startswith("samba-backup-") and
                     fn.endswith(".tar.bz2")]
        if len(tar_files) != 1:
            raise CommandError("expected domain backup to create one tar" +
                               " file but got {0}".format(len(tar_files)))

        backup_file = os.path.join(prov_dir, tar_files[0])
        return backup_file

    def tearDown(self):
        # Remove temporary directories
        shutil.rmtree(self.prov_dir)
        if self.extract_dir:
            shutil.rmtree(self.extract_dir)