From f71ed9f3de7eeffe52dc0e34a521d9ba6aee899c Mon Sep 17 00:00:00 2001 From: Sam Thursfield Date: Mon, 7 Jul 2014 17:43:09 +0000 Subject: start to unwind vim undo history to see where it broke --- morphlib/exts/docker.write | 79 ++++++++++++++++++++++++++-------------------- 1 file changed, 44 insertions(+), 35 deletions(-) diff --git a/morphlib/exts/docker.write b/morphlib/exts/docker.write index 23d2c29f..342fddef 100644 --- a/morphlib/exts/docker.write +++ b/morphlib/exts/docker.write @@ -49,6 +49,8 @@ class ChunkedTarfileAdapter(object): to it so that the 'requests' module can iterate through it and send each block as a HTTP chunk. + The values probably require some tweaking. + ''' # Some rough profiling with a 256.52MB system over Gb ethernet. @@ -67,65 +69,75 @@ class ChunkedTarfileAdapter(object): EXPECTED_BUFFER_SIZE = 100 * 1024 - def __init__(self, status_interval=0, status_callback=None): - # Stream headers can involve several small writes (6 for gzip headers, - # for example). Therefore the queue buffers up to 10 blocks here. + def __init__(self): + # Maximum size of the queue is 100MB. The max size should be several + # items because creating e.g. gzip headers involves a 6 small + # writes. self.queue = Queue.Queue(maxsize=10) self.eof = False self.exception = None self.start_time = None self.bytes_written = 0 - self.status_interval = status_interval - self.status_callback = status_callback def __iter__(self): - '''Generator for reading the queued data chunks. - - This should be used from the main thread of the program. - - ''' + # KeyboardInterrupt is ignored inside the Queue.get() function, so + # we block using 'while True'. while True: - if self.queue.empty() and self.eof: - return try: - chunk = self.queue.get(block=True, timeout=0.1) + chunk = self.queue.get(block=False)#, True, timeout=1) except Queue.Empty: - continue + if self.eof: + logging.debug('Complete!') + break + logging.debug('Queue empty: waiting 1s') + time.sleep(1) else: - yield chunk + logging.debug('Yield %i bytes', len(chunk)) + try: + yield chunk + except BaseException as e: + logging.debug('Got exception %s in generator', e) + self.error = e + raise + logging.debug('Next iteration') def abort(self, exception=None): - '''Mark the transfer as failed.''' - exception = exception or Exception('Unknown exception') + exception = exception or Exception('Missing exception message') + logging.debug('ABORT !!! %s', exception) self.exception = exception def close(self): - '''Mark the transfer as successfully completed.''' + logging.debug('CLOSE !!!') self.eof = True def write(self, data_chunk): - '''Write a data chunk, blocking when the chunk queue is full. - - This can be called from a thread. If abort() is called, the exception - will be passed on and raised to the thread that is calling write(). - - ''' + logging.debug('Write; length %i', len(data_chunk)) if self.start_time is None: self.start_time = time.time() if len(data_chunk) == 0: return + # KeyboardInterrupt is ignored inside the Queue.get() function, so + # we block using 'while True'. while True: - if self.exception is not None: - raise self.exception try: - self.queue.put(data_chunk, block=True, timeout=0.1) + self.queue.put(data_chunk, block=False) except Queue.Full: - continue + if self.exception is not None: + logging.debug('Reader got exception %s', self.exception) + raise self.exception + logging.debug('Queue full, waiting 1s ...') + try: + time.sleep(1) + continue + except BaseException as e: + logging.debug('Received %s', e) + raise else: self.bytes_written += len(data_chunk) + logging.debug('Addeded %i bytes to queue' % len(data_chunk)) + return len(data_chunk) def status(self): - '''Summarise the status of the transfer.''' duration = time.time() - self.start_time megabytes_written = self.bytes_written / float(MEGABYTES) return 'Wrote %0.2fMB in %0.2f seconds (%0.2f MB/sec)' % ( @@ -200,11 +212,7 @@ class DockerWriteExtension(morphlib.writeexts.WriteExtension): # https://github.com/swipely/docker-api/blob/master/lib/docker/image.rb import_url = docker_client._url('/images/create') - def display_transfer_status(message): - status(message) - - tar_stream = ChunkedTarfileAdapter( - status_interval=1, status_callback=display_transfer_status) + tar_stream = ChunkedTarfileAdapter() logging.debug('Create tar thread') tar_thread = threading.Thread( @@ -228,7 +236,7 @@ class DockerWriteExtension(morphlib.writeexts.WriteExtension): } ) except BaseException as e: - logging.debug('Received %r while sending image', e) + logging.debug('Main Received %r', e) tar_stream.abort(e) raise @@ -302,6 +310,7 @@ class DockerWriteExtension(morphlib.writeexts.WriteExtension): logging.debug('Tar complete') except BaseException as e: logging.debug('tar thread: Received %r', e) + raise finally: chunked_stream.close() -- cgit v1.2.1