summaryrefslogtreecommitdiff
path: root/tests/smbserver.py
blob: 3de8d69c0a5ff5bbb287a1951e7ee5d8b67032ca (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
#  Project                     ___| | | |  _ \| |
#                             / __| | | | |_) | |
#                            | (__| |_| |  _ <| |___
#                             \___|\___/|_| \_\_____|
#
# Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution. The terms
# are also available at https://curl.se/docs/copyright.html.
#
# You may opt to use, copy, modify, merge, publish, distribute and/or sell
# copies of the Software, and permit persons to whom the Software is
# furnished to do so, under the terms of the COPYING file.
#
# This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
# KIND, either express or implied.
#
# SPDX-License-Identifier: curl
#
"""Server for testing SMB"""

from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

import argparse
import logging
import os
import signal
import sys
import tempfile
import threading

# Import our curl test data helper
from util import ClosingFileHandler, TestData

if sys.version_info.major >= 3:
    import configparser
else:
    import ConfigParser as configparser

# impacket needs to be installed in the Python environment
try:
    import impacket
except ImportError:
    sys.stderr.write('Python package impacket needs to be installed!\n')
    sys.stderr.write('Use pip or your package manager to install it.\n')
    sys.exit(1)
from impacket import smb as imp_smb
from impacket import smbserver as imp_smbserver
from impacket.nt_errors import (STATUS_ACCESS_DENIED, STATUS_NO_SUCH_FILE,
                                STATUS_SUCCESS)

# Module-level logger shared by everything in this script
log = logging.getLogger(__name__)
# Placeholder "path" configured for the SERVER share; a request resolving to
# it is answered with a generated file instead of a real one
SERVER_MAGIC = "SERVER_MAGIC"
# Placeholder "path" configured for the TESTS share; requests resolving to it
# are answered with data generated from the test input
TESTS_MAGIC = "TESTS_MAGIC"
# The only file name accepted on the SERVER share
VERIFIED_REQ = "verifiedserver"
# Template for the contents returned for VERIFIED_REQ; {pid} is filled in
# with this process's PID
VERIFIED_RSP = "WE ROOLZ: {pid}\n"


class ShutdownHandler(threading.Thread):
    """Cleanly shut down the SMB server

    This can only be done from another thread while the server is in
    serve_forever(), so a thread is spawned here that waits for a shutdown
    signal before doing its thing. Use in a with statement around the
    serve_forever() call.

    The server object passed in must provide a shutdown() method and a
    ``tmpfiles`` list holding the paths of temporary files to delete once
    the server has stopped.
    """

    def __init__(self, server):
        super(ShutdownHandler, self).__init__()
        self.server = server
        self.shutdown_event = threading.Event()

    def __enter__(self):
        """Start the watcher thread and install the signal handlers."""
        self.start()
        signal.signal(signal.SIGINT, self._sighandler)
        signal.signal(signal.SIGTERM, self._sighandler)
        return self

    def __exit__(self, *_):
        """Stop the server, restore signal handling and clean up."""
        # Call for shutdown just in case it wasn't done already
        self.shutdown_event.set()
        # Wait for thread, and therefore also the server, to finish
        self.join()
        # Uninstall our signal handlers
        signal.signal(signal.SIGINT, signal.SIG_DFL)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        # Delete any temporary files created by the server during its run
        log.info("Deleting %d temporary files", len(self.server.tmpfiles))
        self._remove_tmpfiles()

    def _remove_tmpfiles(self):
        """Delete every file listed in server.tmpfiles, best effort.

        One file that cannot be removed must not prevent the remaining
        files from being cleaned up, so failures are reported and skipped
        instead of being raised.
        """
        for path in self.server.tmpfiles:
            try:
                os.unlink(path)
            except OSError:
                # A file that is already gone is the state we want anyway;
                # only report files that are still there.
                if os.path.exists(path):
                    log.warning("Could not delete temporary file %s", path,
                                exc_info=True)

    def _sighandler(self, _signum, _frame):
        # Wake up the cleanup task
        self.shutdown_event.set()

    def run(self):
        """Thread body: block until shutdown is requested, then stop the
        server."""
        # Wait for shutdown signal
        self.shutdown_event.wait()
        # Notify the server to shut down
        self.server.shutdown()


def smbserver(options):
    """Configure and run the TCP SMB test server until it is shut down.

    Writes the PID file when ``options.pidfile`` is set, builds the in-memory
    server configuration (a global section plus the SERVER and TESTS shares),
    then serves on ``(options.host, options.port)`` until the shutdown
    handler stops the server.

    Raises ScriptException when ``options.srcdir`` is unset or is not a
    directory. Returns 0 once the server has stopped.
    """
    pidfile = options.pidfile
    if pidfile:
        # see tests/server/util.c function write_pidfile
        own_pid = os.getpid() + 65536 if os.name == "nt" else os.getpid()
        with open(pidfile, "w") as f:
            f.write(str(own_pid))

    # The mini config for the server, in the order it is written
    layout = (
        ("global", (
            ("server_name", "SERVICE"),
            ("server_os", "UNIX"),
            ("server_domain", "WORKGROUP"),
            ("log_file", ""),
            ("credentials_file", ""),
        )),
        # A share which allows us to test that the server is running
        ("SERVER", (
            ("comment", "server function"),
            ("read only", "yes"),
            ("share type", "0"),
            ("path", SERVER_MAGIC),
        )),
        # A share for tests; its files are autogenerated from the test input
        ("TESTS", (
            ("comment", "tests"),
            ("read only", "yes"),
            ("share type", "0"),
            ("path", TESTS_MAGIC),
        )),
    )
    smb_config = configparser.ConfigParser()
    for section, settings in layout:
        smb_config.add_section(section)
        for key, value in settings:
            smb_config.set(section, key, value)

    if not (options.srcdir and os.path.isdir(options.srcdir)):
        raise ScriptException("--srcdir is mandatory")

    smb_server = TestSmbServer(
        (options.host, options.port),
        config_parser=smb_config,
        test_data_directory=os.path.join(options.srcdir, "data"))
    log.info("[SMB] setting up SMB server on port %s", options.port)
    smb_server.processConfigFile()

    # The handler's thread shuts the server down cleanly on a signal;
    # serve_forever() blocks until smb_server.shutdown() has been called.
    with ShutdownHandler(smb_server):
        smb_server.serve_forever()

    return 0


class TestSmbServer(imp_smbserver.SMBSERVER):
    """
    Test server for SMB which subclasses the impacket SMBSERVER and provides
    test functionality.

    Files requested from the shares whose configured path is SERVER_MAGIC or
    TESTS_MAGIC are generated into temporary files on demand. The paths of
    those temporary files are collected in ``self.tmpfiles``.
    """

    def __init__(self,
                 address,
                 config_parser=None,
                 test_data_directory=None):
        """
        address is the (host, port) pair handed to the impacket server,
        config_parser the server configuration and test_data_directory the
        directory given to TestData for looking up test data.
        """
        imp_smbserver.SMBSERVER.__init__(self,
                                         address,
                                         config_parser=config_parser)
        # Paths of every temporary file generated while serving
        self.tmpfiles = []

        # Set up a test data object so we can get test data later.
        self.ctd = TestData(test_data_directory)

        # Override smbComNtCreateAndX so we can pretend to have files which
        # don't exist.
        self.hookSmbCommand(imp_smb.SMB.SMB_COM_NT_CREATE_ANDX,
                            self.create_and_x)

    def create_and_x(self, conn_id, smb_server, smb_command, recv_packet):
        """
        Our version of smbComNtCreateAndX looks for special test files and
        fools the rest of the framework into opening them as if they were
        normal files.

        Requests whose share path is not one of the magic paths are passed
        on to impacket's own smbComNtCreateAndX and its result is returned
        unchanged. Otherwise the return value is the tuple
        ([response command], None, error code).
        """
        conn_data = smb_server.getConnectionData(conn_id)

        # Wrap processing in a try block which allows us to throw SmbException
        # to control the flow.
        try:
            ncax_parms = imp_smb.SMBNtCreateAndX_Parameters(
                smb_command["Parameters"])

            path = self.get_share_path(conn_data,
                                       ncax_parms["RootFid"],
                                       recv_packet["Tid"])
            log.info("[SMB] Requested share path: %s", path)

            disposition = ncax_parms["Disposition"]
            log.debug("[SMB] Requested disposition: %s", disposition)

            # Currently we only support reading files.
            if disposition != imp_smb.FILE_OPEN:
                raise SmbException(STATUS_ACCESS_DENIED,
                                   "Only support reading files")

            # Check to see if the path we were given is actually a
            # magic path which needs generating on the fly.
            if path not in [SERVER_MAGIC, TESTS_MAGIC]:
                # Pass the command onto the original handler.
                return imp_smbserver.SMBCommands.smbComNtCreateAndX(conn_id,
                                                                    smb_server,
                                                                    smb_command,
                                                                    recv_packet)

            flags2 = recv_packet["Flags2"]
            ncax_data = imp_smb.SMBNtCreateAndX_Data(flags=flags2,
                                                     data=smb_command[
                                                         "Data"])
            requested_file = imp_smbserver.decodeSMBString(
                flags2,
                ncax_data["FileName"])
            log.debug("[SMB] User requested file '%s'", requested_file)

            if path == SERVER_MAGIC:
                fid, full_path = self.get_server_path(requested_file)
            else:
                assert (path == TESTS_MAGIC)
                fid, full_path = self.get_test_path(requested_file)

            self.tmpfiles.append(full_path)

            resp_parms = imp_smb.SMBNtCreateAndXResponse_Parameters()
            resp_data = ""

            # Simple way to generate a fid: one past the highest fid in use.
            # (dict.keys() cannot be indexed on Python 3, and taking the
            # maximum guarantees the new fid is not already taken.)
            opened_files = conn_data["OpenedFiles"]
            if opened_files:
                fakefid = max(opened_files) + 1
            else:
                fakefid = 1
            resp_parms["Fid"] = fakefid
            resp_parms["CreateAction"] = disposition

            if os.path.isdir(path):
                resp_parms[
                    "FileAttributes"] = imp_smb.SMB_FILE_ATTRIBUTE_DIRECTORY
                resp_parms["IsDirectory"] = 1
            else:
                resp_parms["IsDirectory"] = 0
                resp_parms["FileAttributes"] = ncax_parms["FileAttributes"]

            # Get this file's information
            resp_info, error_code = imp_smbserver.queryPathInformation(
                os.path.dirname(full_path), os.path.basename(full_path),
                level=imp_smb.SMB_QUERY_FILE_ALL_INFO)

            if error_code != STATUS_SUCCESS:
                raise SmbException(error_code, "Failed to query path info")

            resp_parms["CreateTime"] = resp_info["CreationTime"]
            resp_parms["LastAccessTime"] = resp_info[
                "LastAccessTime"]
            resp_parms["LastWriteTime"] = resp_info["LastWriteTime"]
            resp_parms["LastChangeTime"] = resp_info[
                "LastChangeTime"]
            resp_parms["FileAttributes"] = resp_info[
                "ExtFileAttributes"]
            resp_parms["AllocationSize"] = resp_info[
                "AllocationSize"]
            resp_parms["EndOfFile"] = resp_info["EndOfFile"]

            # Let's store the fid for the connection
            # smbServer.log("Create file %s, mode:0x%x" % (pathName, mode))
            conn_data["OpenedFiles"][fakefid] = {}
            conn_data["OpenedFiles"][fakefid]["FileHandle"] = fid
            conn_data["OpenedFiles"][fakefid]["FileName"] = path
            conn_data["OpenedFiles"][fakefid]["DeleteOnClose"] = False

        except SmbException as s:
            log.debug("[SMB] SmbException hit: %s", s)
            error_code = s.error_code
            resp_parms = ""
            resp_data = ""

        resp_cmd = imp_smb.SMBCommand(imp_smb.SMB.SMB_COM_NT_CREATE_ANDX)
        resp_cmd["Parameters"] = resp_parms
        resp_cmd["Data"] = resp_data
        smb_server.setConnectionData(conn_id, conn_data)

        return [resp_cmd], None, error_code

    def get_share_path(self, conn_data, root_fid, tid):
        """
        Return the path a request refers to: the file name recorded for
        root_fid when root_fid is positive, otherwise the "path" entry of
        the connected share identified by tid.

        Raises SmbException when tid is not a connected share or the share
        has no "path" entry.
        """
        conn_shares = conn_data["ConnectedShares"]

        if tid in conn_shares:
            if root_fid > 0:
                # If we have a rootFid, the path is relative to that fid
                path = conn_data["OpenedFiles"][root_fid]["FileName"]
                log.debug("RootFid present %s!", path)
            else:
                if "path" in conn_shares[tid]:
                    path = conn_shares[tid]["path"]
                else:
                    raise SmbException(STATUS_ACCESS_DENIED,
                                       "Connection share had no path")
        else:
            raise SmbException(imp_smbserver.STATUS_SMB_BAD_TID,
                               "TID was invalid")

        return path

    def get_server_path(self, requested_filename):
        """
        Create the temporary file answering a request on the SERVER share.

        Only VERIFIED_REQ is known; any other name raises SmbException with
        STATUS_NO_SUCH_FILE. Returns (file descriptor, file name) of the
        temporary file, rewound to its beginning.
        """
        log.debug("[SMB] Get server path '%s'", requested_filename)

        if requested_filename not in [VERIFIED_REQ]:
            raise SmbException(STATUS_NO_SUCH_FILE, "Couldn't find the file")

        fid, filename = tempfile.mkstemp()
        log.debug("[SMB] Created %s (%d) for storing '%s'",
                  filename, fid, requested_filename)

        # os.write() needs bytes, so the default must be bytes as well
        contents = b""

        if requested_filename == VERIFIED_REQ:
            log.debug("[SMB] Verifying server is alive")
            pid = os.getpid()
            # see tests/server/util.c function write_pidfile
            if os.name == "nt":
                pid += 65536
            contents = VERIFIED_RSP.format(pid=pid).encode('utf-8')

        self.write_to_fid(fid, contents)
        return fid, filename

    def write_to_fid(self, fid, contents):
        """Write contents (bytes) to the descriptor fid and rewind it."""
        # Write the contents to file descriptor
        os.write(fid, contents)
        os.fsync(fid)

        # Rewind the file to the beginning so a read gets us the contents
        os.lseek(fid, 0, os.SEEK_SET)

    def get_test_path(self, requested_filename):
        """
        Create the temporary file answering a request on the TESTS share.

        The contents are whatever TestData.get_test_data() returns for
        requested_filename, encoded as UTF-8. Returns (file descriptor,
        file name) of the temporary file, rewound to its beginning. Any
        failure is logged and turned into an SmbException with
        STATUS_NO_SUCH_FILE.
        """
        log.info("[SMB] Get reply data from 'test%s'", requested_filename)

        fid, filename = tempfile.mkstemp()
        log.debug("[SMB] Created %s (%d) for storing test '%s'",
                  filename, fid, requested_filename)

        try:
            contents = self.ctd.get_test_data(requested_filename).encode('utf-8')
            self.write_to_fid(fid, contents)
        except Exception:
            log.exception("Failed to make test file")
            # The caller never gets to see this file, so nobody else would
            # close or delete it: release it here.
            try:
                os.close(fid)
                os.unlink(filename)
            except OSError:
                log.debug("[SMB] Could not clean up %s", filename,
                          exc_info=True)
            raise SmbException(STATUS_NO_SUCH_FILE, "Failed to make test file")

        return fid, filename


class SmbException(Exception):
    """Error carrying the status code to send back in the SMB response."""

    def __init__(self, error_code, error_message):
        # The message becomes the exception text; the code rides along
        Exception.__init__(self, error_message)
        self.error_code = error_code


class ScriptRC(object):
    """Return codes of this script"""
    SUCCESS, FAILURE, EXCEPTION = 0, 1, 2


class ScriptException(Exception):
    """Raised when the script cannot run with the options it was given."""


def get_options(argv=None):
    """Parse the command line options of the SMB test server.

    argv is an optional list of argument strings to parse; when it is None
    argparse falls back to the process's own command line.

    Returns the argparse namespace with the attributes port, host, verbose,
    pidfile, logfile, srcdir, id and ipv4.
    """
    parser = argparse.ArgumentParser()

    parser.add_argument("--port", action="store", default=9017,
                        type=int, help="port to listen on")
    parser.add_argument("--host", action="store", default="127.0.0.1",
                        help="host to listen on")
    parser.add_argument("--verbose", action="store", type=int, default=0,
                        help="verbose output")
    parser.add_argument("--pidfile", action="store",
                        help="file name for the PID")
    parser.add_argument("--logfile", action="store",
                        help="file name for the log")
    parser.add_argument("--srcdir", action="store", help="test directory")
    parser.add_argument("--id", action="store", help="server ID")
    # A store_true flag is a boolean, so its default is False rather than 0
    parser.add_argument("--ipv4", action="store_true", default=False,
                        help="IPv4 flag")

    return parser.parse_args(argv)


def setup_logging(options):
    """
    Configure the root logger from the command line options

    A log file handler is attached when options.logfile is set. A stdout
    handler is attached when no log file was given or when options.verbose
    is set. The root level is DEBUG in verbose mode and INFO otherwise.
    """
    root = logging.getLogger()
    fmt = logging.Formatter("%(asctime)s %(levelname)-5.5s %(message)s")

    def attach(handler):
        # Every handler gets the same format and lets everything through;
        # filtering is done by the root logger's level.
        handler.setFormatter(fmt)
        handler.setLevel(logging.DEBUG)
        root.addHandler(handler)

    # Without a logfile stdout is the only place the log can go
    want_stdout = not options.logfile
    if options.logfile:
        attach(ClosingFileHandler(options.logfile))

    if options.verbose:
        # Verbose mode logs to stdout as well
        root.setLevel(logging.DEBUG)
        want_stdout = True
    else:
        root.setLevel(logging.INFO)

    if want_stdout:
        attach(logging.StreamHandler(sys.stdout))


if __name__ == '__main__':
    # Read the command line, then configure logging from it
    options = get_options()
    setup_logging(options)

    # Run the server; any failure is logged and reported through the
    # return code
    rc = ScriptRC.EXCEPTION
    try:
        rc = smbserver(options)
    except Exception as e:
        log.exception(e)

    # The PID file is removed on the way out
    pidfile = options.pidfile
    if pidfile and os.path.isfile(pidfile):
        os.unlink(pidfile)

    log.info("[SMB] Returning %d", rc)
    sys.exit(rc)