From 38b62ca7591c0919988f173c954ca2cfd3447d19 Mon Sep 17 00:00:00 2001 From: Sam Thursfield Date: Mon, 7 Jul 2014 17:39:20 +0000 Subject: It works --- morphlib/exts/docker.write | 202 ++++++++++++++++++++++++++++++++------------- 1 file changed, 144 insertions(+), 58 deletions(-) diff --git a/morphlib/exts/docker.write b/morphlib/exts/docker.write index 0e933cc3..23d2c29f 100644 --- a/morphlib/exts/docker.write +++ b/morphlib/exts/docker.write @@ -31,13 +31,107 @@ import contextlib import gzip import logging import os +import Queue import sys import tarfile +import threading +import time import urlparse import morphlib.writeexts +class ChunkedTarfileAdapter(object): + '''File-like object which allows batched writes. + + We need to send an entire system through a HTTP POST request. This might + be big, so it must be streamed in chunks. This object buffers data written + to it so that the 'requests' module can iterate through it and send each + block as a HTTP chunk. + + ''' + + # Some rough profiling with a 256.52MB system over Gb ethernet. + # Morph machine was x86_32 VM with reasonable CPU and RAM. + # + # no compression bufsize=100KB 256.52MB in 9.04 seconds (28.39 MB/sec) + # no compression bufsize=1MB 256.52MB in 11.45 seconds (22.4 MB/sec) + # gzip -1 bufsize=100KB 117.99MB in 19.34 seconds (6.10 MB/sec) + # no compression bufsize=10MB 256.52MB in 65.57 seconds (3.91 MB/sec) + # gzip -1 bufsize=10MB 124.39MB in 77 sec (1.61 MB/sec) + # gzip -5 bufsize=10MB 117.99MB in 84.27 seconds (1.40 MB/sec) + # no compression bufsize=100MB took pretty much forever + # + # Ideally the buffer size would adapt to the available IO speed & free + # memory. For now 100KB is OK. + + EXPECTED_BUFFER_SIZE = 100 * 1024 + + def __init__(self, status_interval=0, status_callback=None): + # Stream headers can involve several small writes (6 for gzip headers, + # for example). Therefore the queue buffers up to 10 blocks here. + self.queue = Queue.Queue(maxsize=10) + self.eof = False + self.exception = None + self.start_time = None + self.bytes_written = 0 + self.status_interval = status_interval + self.status_callback = status_callback + + def __iter__(self): + '''Generator for reading the queued data chunks. + + This should be used from the main thread of the program. + + ''' + while True: + if self.queue.empty() and self.eof: + return + try: + chunk = self.queue.get(block=True, timeout=0.1) + except Queue.Empty: + continue + else: + yield chunk + + def abort(self, exception=None): + '''Mark the transfer as failed.''' + exception = exception or Exception('Unknown exception') + self.exception = exception + + def close(self): + '''Mark the transfer as successfully completed.''' + self.eof = True + + def write(self, data_chunk): + '''Write a data chunk, blocking when the chunk queue is full. + + This can be called from a thread. If abort() is called, the exception + will be passed on and raised to the thread that is calling write(). + + ''' + if self.start_time is None: + self.start_time = time.time() + if len(data_chunk) == 0: + return + while True: + if self.exception is not None: + raise self.exception + try: + self.queue.put(data_chunk, block=True, timeout=0.1) + except Queue.Full: + continue + else: + self.bytes_written += len(data_chunk) + + def status(self): + '''Summarise the status of the transfer.''' + duration = time.time() - self.start_time + megabytes_written = self.bytes_written / float(MEGABYTES) + return 'Wrote %0.2fMB in %0.2f seconds (%0.2f MB/sec)' % ( + megabytes_written, duration, megabytes_written / duration) + + class DockerWriteExtension(morphlib.writeexts.WriteExtension): '''Create a Docker image or container from a Morph deployment. @@ -95,10 +189,6 @@ class DockerWriteExtension(morphlib.writeexts.WriteExtension): docker_client = self.create_docker_client_with_remote_ssh_tunnel( user, host, port) - tar_read_fd, tar_write_fd = os.pipe() - - tar_read_fileobj = os.fdopen(tar_read_fd, 'r') - print docker_client.info() # FIXME: hack! The docker-py library should let us put in a fileobj and @@ -110,54 +200,44 @@ class DockerWriteExtension(morphlib.writeexts.WriteExtension): # https://github.com/swipely/docker-api/blob/master/lib/docker/image.rb import_url = docker_client._url('/images/create') - logging.debug('Open tar write FD') - tar_write_fileobj = os.fdopen(tar_write_fd, 'w') + def display_transfer_status(message): + status(message) + + tar_stream = ChunkedTarfileAdapter( + status_interval=1, status_callback=display_transfer_status) logging.debug('Create tar thread') - tar_bytes = 0 - import threading tar_thread = threading.Thread( - target=self.write_system_as_tar, args=[temp_root, tar_write_fileobj]) + target=self.stream_system_as_tar, args=[temp_root, tar_stream]) + # make the thread respond to ctrl+c + #tar_thread.daemon = True + tar_thread.start() - print tar_thread - print tar_thread.is_alive() - - import select - def batch_fileobj(fileobj, batch_size): - '''Split an fileobj up into batches of 'batch_size' items.''' - i = 0 - # This is hard, we need to signal end ... - while True: - data = fileobj.read(batch_size) - yield data - print "End of fileobj" - yield [] - print "Yielded None, called again ..." - - #logging.debug('Prepare request...') - #import_request_prepped = docker_client.prepare_request(import_request) - logging.debug('Send request...') - # FOR SOME REASON THIS SEEMS NEVER TO EXIT! - - #docker_client.send(import_request_prepped) - docker_client.post( - import_url, - data=batch_fileobj(tar_read_fileobj, 10240), - params={ - 'fromSrc': '-', - 'repo': image_name - }, - headers = { - 'Content-Type': 'application/tar', - 'Transfer-Encoding': 'chunked', - } - ) + + try: + docker_client.post( + import_url, + data=tar_stream, + params={ + 'fromSrc': '-', + 'repo': image_name + }, + headers = { + 'Content-Type': 'application/tar', + 'Transfer-Encoding': 'chunked', + } + ) + except BaseException as e: + logging.debug('Received %r while sending image', e) + tar_stream.abort(e) + raise print "OK! Wow, that surely didn't actually work." ### autostart = self.get_environment_boolean('AUTOSTART') + self.status(msg=tar_stream.status()) self.status( msg='Docker image %(image_name)s has been created', image_name=image_name) @@ -198,26 +278,32 @@ class DockerWriteExtension(morphlib.writeexts.WriteExtension): return docker_client - def write_system_as_tar(self, fs_root, fileobj): + def stream_system_as_tar(self, fs_root, chunked_stream): # Using tarfile.TarFile.gzopen() and passing compresslevel=1 # seems to result in compresslevel=9 anyway. That's completely # unusable on ARM CPUs so it's important to force # compresslevel=1 or something low. - logging.debug('Writing system as a tar!') - #gzip_stream = gzip.GzipFile( - # mode='wb', - # compresslevel=1, - # fileobj=fileobj) - tar_stream = tarfile.TarFile.gzopen( - name='docker.write-temp', - mode='w', - compresslevel=1, - fileobj=fileobj)#gzip_stream) - logging.debug("Creating tar of rootfs") - tar_stream.add(fs_root, recursive=True) - tar_stream.close() - logging.debug('Tar complete') - tar_finished = True + try: + logging.debug('Writing system as a tar!') + #gzip_stream = gzip.GzipFile( + # mode='wb', + # compresslevel=5, + # fileobj=chunked_stream) + tar_stream = tarfile.TarFile.open( + name='docker.write-temp', + mode='w|', + bufsize=chunked_stream.EXPECTED_BUFFER_SIZE, + fileobj=chunked_stream) + #fileobj=gzip_stream) + logging.debug("Creating tar of rootfs") + tar_stream.add(fs_root, recursive=True) + tar_stream.close() + #gzip_stream.close() + logging.debug('Tar complete') + except BaseException as e: + logging.debug('tar thread: Received %r', e) + finally: + chunked_stream.close() DockerWriteExtension().run() -- cgit v1.2.1