diff options
author | Sam Thursfield <sam.thursfield@codethink.co.uk> | 2014-07-07 18:17:53 +0000 |
---|---|---|
committer | Sam Thursfield <sam@afuera.me.uk> | 2014-09-05 13:00:21 +0000 |
commit | 153bab2a8b10dee3d4f961348594e7b24ffea6a8 (patch) | |
tree | bbfb68cac81dedbab964184a84618ed44a5f0a81 | |
parent | f71ed9f3de7eeffe52dc0e34a521d9ba6aee899c (diff) | |
download | morph-153bab2a8b10dee3d4f961348594e7b24ffea6a8.tar.gz |
It still works
-rw-r--r-- | morphlib/exts/docker.write | 170 |
1 file changed, 82 insertions, 88 deletions
diff --git a/morphlib/exts/docker.write b/morphlib/exts/docker.write index 342fddef..e86f2d17 100644 --- a/morphlib/exts/docker.write +++ b/morphlib/exts/docker.write @@ -17,22 +17,16 @@ '''A Morph deployment write extension for deploying to Docker hosts''' -# bgtunnel: From https://github.com/jmagnusson/bgtunnel -# (using Paramiko would be better, but it's not in Baserock yet. Its -# demos/forward.py demonstrates what we need). -import bgtunnel # From https://github.com/dotcloud/docker-py import docker +#import paramiko + import cliapp -import contextlib -import gzip import logging -import os import Queue -import sys import tarfile import threading import time @@ -49,8 +43,6 @@ class ChunkedTarfileAdapter(object): to it so that the 'requests' module can iterate through it and send each block as a HTTP chunk. - The values probably require some tweaking. - ''' # Some rough profiling with a 256.52MB system over Gb ethernet. @@ -69,79 +61,92 @@ class ChunkedTarfileAdapter(object): EXPECTED_BUFFER_SIZE = 100 * 1024 - def __init__(self): - # Maximum size of the queue is 100MB. The max size should be several - # items because creating e.g. gzip headers involves a 6 small - # writes. + def __init__(self, status_interval=0, status_callback=None): + # Stream headers can involve several small writes (6 for gzip headers, + # for example). Therefore the queue buffers up to 10 blocks here. self.queue = Queue.Queue(maxsize=10) + self.eof = False self.exception = None + self.start_time = None - self.bytes_written = 0 + self.bytes_sent = 0 + + self.status_interval = status_interval + self.status_callback = status_callback + + self.last_status_time = None def __iter__(self): - # KeyboardInterrupt is ignored inside the Queue.get() function, so - # we block using 'while True'. + '''Generator for reading the queued data chunks. + + This should be used from the main thread of the program. 
+ + ''' while True: try: - chunk = self.queue.get(block=False)#, True, timeout=1) + data_chunk = self.queue.get(block=True, timeout=0.1) + yield data_chunk + self.bytes_sent += len(data_chunk) except Queue.Empty: - if self.eof: - logging.debug('Complete!') - break - logging.debug('Queue empty: waiting 1s') - time.sleep(1) + pass + + if self.queue.empty() and self.eof: + logging.debug('All data queued for transfer!') + break else: - logging.debug('Yield %i bytes', len(chunk)) - try: - yield chunk - except BaseException as e: - logging.debug('Got exception %s in generator', e) - self.error = e - raise - logging.debug('Next iteration') + self.maybe_show_status() - def abort(self, exception=None): - exception = exception or Exception('Missing exception message') - logging.debug('ABORT !!! %s', exception) - self.exception = exception + def write(self, data_chunk): + '''Write a data chunk, blocking when the chunk queue is full. - def close(self): - logging.debug('CLOSE !!!') - self.eof = True + This can be called from a thread. If abort() is called, the exception + will be passed on and raised to the thread that is calling write(). - def write(self, data_chunk): - logging.debug('Write; length %i', len(data_chunk)) - if self.start_time is None: - self.start_time = time.time() + ''' if len(data_chunk) == 0: return - # KeyboardInterrupt is ignored inside the Queue.get() function, so - # we block using 'while True'. 
+ if self.start_time is None: + self.start_time = self.last_status_time = time.time() while True: + if self.exception is not None: + raise self.exception try: - self.queue.put(data_chunk, block=False) + self.queue.put(data_chunk, block=True, timeout=0.1) except Queue.Full: - if self.exception is not None: - logging.debug('Reader got exception %s', self.exception) - raise self.exception - logging.debug('Queue full, waiting 1s ...') - try: - time.sleep(1) - continue - except BaseException as e: - logging.debug('Received %s', e) - raise + pass else: - self.bytes_written += len(data_chunk) - logging.debug('Addeded %i bytes to queue' % len(data_chunk)) - return len(data_chunk) + return + + def abort(self, exception=None): + '''Mark the transfer as failed.''' + exception = exception or Exception('Unknown exception') + self.exception = exception + + def close(self): + '''Mark the transfer as successfully completed.''' + self.eof = True - def status(self): - duration = time.time() - self.start_time - megabytes_written = self.bytes_written / float(MEGABYTES) - return 'Wrote %0.2fMB in %0.2f seconds (%0.2f MB/sec)' % ( - megabytes_written, duration, megabytes_written / duration) + def maybe_show_status(self): + '''Show status if the status_interval has elapsed.''' + if self.status_interval > 0 and self.status_callback is not None: + now = time.time() + if self.last_status_time + self.status_interval < now: + self.last_status_time = now + self.show_status() + + def show_status(self): + '''Summarise the status of the transfer.''' + if self.status_callback is not None: + if self.start_time is None: + message = 'Starting transfer' + else: + duration = time.time() - self.start_time + megabytes = 1024 * 1024 + megabytes_written = self.bytes_sent / float(megabytes) + message = '%0.2fMB transferred (%0.2f MB/sec)' % ( + megabytes_written, megabytes_written / duration) + self.status_callback(message) class DockerWriteExtension(morphlib.writeexts.WriteExtension): @@ -196,13 +201,10 @@ 
class DockerWriteExtension(morphlib.writeexts.WriteExtension): user, host, port, image_name = self.parse_location(location) - # FIXME: is the tunnel cleaned up? do we need a 'with' ? self.status(msg='Connecting to Docker service at %s:%s' % (host, port)) docker_client = self.create_docker_client_with_remote_ssh_tunnel( user, host, port) - print docker_client.info() - # FIXME: hack! The docker-py library should let us put in a fileobj and # have it handle buffering automatically ... I.E. this hack should be # sent upstream as an improvement, instead. Still, it's kind of cool @@ -212,18 +214,17 @@ class DockerWriteExtension(morphlib.writeexts.WriteExtension): # https://github.com/swipely/docker-api/blob/master/lib/docker/image.rb import_url = docker_client._url('/images/create') - tar_stream = ChunkedTarfileAdapter() + def display_transfer_status(message): + self.status(msg=message) - logging.debug('Create tar thread') + tar_stream = ChunkedTarfileAdapter( + status_interval=1, status_callback=display_transfer_status) tar_thread = threading.Thread( target=self.stream_system_as_tar, args=[temp_root, tar_stream]) - # make the thread respond to ctrl+c - #tar_thread.daemon = True - tar_thread.start() try: - docker_client.post( + response = docker_client.post( import_url, data=tar_stream, params={ @@ -235,17 +236,18 @@ class DockerWriteExtension(morphlib.writeexts.WriteExtension): 'Transfer-Encoding': 'chunked', } ) + # At this point 'WARNING Connection pool is full, discarding + # connection: 127.0.0.1' shows up in the logs. I'm not sure why. It + # seems harmless so far. except BaseException as e: - logging.debug('Main Received %r', e) + logging.debug('Received %r while sending image', e) tar_stream.abort(e) raise - print "OK! Wow, that surely didn't actually work." - - ### - autostart = self.get_environment_boolean('AUTOSTART') + tar_stream.show_status() + logging.debug('Transfer complete! 
Response %s', response) + print response - self.status(msg=tar_stream.status()) self.status( msg='Docker image %(image_name)s has been created', image_name=image_name) @@ -292,26 +294,18 @@ class DockerWriteExtension(morphlib.writeexts.WriteExtension): # unusable on ARM CPUs so it's important to force # compresslevel=1 or something low. try: - logging.debug('Writing system as a tar!') - #gzip_stream = gzip.GzipFile( - # mode='wb', - # compresslevel=5, - # fileobj=chunked_stream) tar_stream = tarfile.TarFile.open( name='docker.write-temp', mode='w|', bufsize=chunked_stream.EXPECTED_BUFFER_SIZE, fileobj=chunked_stream) - #fileobj=gzip_stream) logging.debug("Creating tar of rootfs") tar_stream.add(fs_root, recursive=True) tar_stream.close() - #gzip_stream.close() logging.debug('Tar complete') except BaseException as e: - logging.debug('tar thread: Received %r', e) - raise - finally: + logging.debug('Tar thread: Received %r', e) + else: chunked_stream.close() |