summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSam Thursfield <sam.thursfield@codethink.co.uk>2014-07-07 17:43:09 +0000
committerSam Thursfield <sam@afuera.me.uk>2014-09-05 13:00:21 +0000
commitf71ed9f3de7eeffe52dc0e34a521d9ba6aee899c (patch)
tree82af0c08d6b053aaabf30b216cfc473d4234ac7d
parent38b62ca7591c0919988f173c954ca2cfd3447d19 (diff)
downloadmorph-f71ed9f3de7eeffe52dc0e34a521d9ba6aee899c.tar.gz
start to unwind vim undo history to see where it broke
-rw-r--r--morphlib/exts/docker.write79
1 files changed, 44 insertions, 35 deletions
diff --git a/morphlib/exts/docker.write b/morphlib/exts/docker.write
index 23d2c29f..342fddef 100644
--- a/morphlib/exts/docker.write
+++ b/morphlib/exts/docker.write
@@ -49,6 +49,8 @@ class ChunkedTarfileAdapter(object):
to it so that the 'requests' module can iterate through it and send each
block as a HTTP chunk.
+ The values probably require some tweaking.
+
'''
# Some rough profiling with a 256.52MB system over Gb ethernet.
@@ -67,65 +69,75 @@ class ChunkedTarfileAdapter(object):
EXPECTED_BUFFER_SIZE = 100 * 1024
- def __init__(self, status_interval=0, status_callback=None):
- # Stream headers can involve several small writes (6 for gzip headers,
- # for example). Therefore the queue buffers up to 10 blocks here.
+ def __init__(self):
+ # Maximum size of the queue is 100MB. The max size should be several
+ # items because creating e.g. gzip headers involves a 6 small
+ # writes.
self.queue = Queue.Queue(maxsize=10)
self.eof = False
self.exception = None
self.start_time = None
self.bytes_written = 0
- self.status_interval = status_interval
- self.status_callback = status_callback
def __iter__(self):
- '''Generator for reading the queued data chunks.
-
- This should be used from the main thread of the program.
-
- '''
+ # KeyboardInterrupt is ignored inside the Queue.get() function, so
+ # we block using 'while True'.
while True:
- if self.queue.empty() and self.eof:
- return
try:
- chunk = self.queue.get(block=True, timeout=0.1)
+ chunk = self.queue.get(block=False)#, True, timeout=1)
except Queue.Empty:
- continue
+ if self.eof:
+ logging.debug('Complete!')
+ break
+ logging.debug('Queue empty: waiting 1s')
+ time.sleep(1)
else:
- yield chunk
+ logging.debug('Yield %i bytes', len(chunk))
+ try:
+ yield chunk
+ except BaseException as e:
+ logging.debug('Got exception %s in generator', e)
+ self.error = e
+ raise
+ logging.debug('Next iteration')
def abort(self, exception=None):
- '''Mark the transfer as failed.'''
- exception = exception or Exception('Unknown exception')
+ exception = exception or Exception('Missing exception message')
+ logging.debug('ABORT !!! %s', exception)
self.exception = exception
def close(self):
- '''Mark the transfer as successfully completed.'''
+ logging.debug('CLOSE !!!')
self.eof = True
def write(self, data_chunk):
- '''Write a data chunk, blocking when the chunk queue is full.
-
- This can be called from a thread. If abort() is called, the exception
- will be passed on and raised to the thread that is calling write().
-
- '''
+ logging.debug('Write; length %i', len(data_chunk))
if self.start_time is None:
self.start_time = time.time()
if len(data_chunk) == 0:
return
+ # KeyboardInterrupt is ignored inside the Queue.get() function, so
+ # we block using 'while True'.
while True:
- if self.exception is not None:
- raise self.exception
try:
- self.queue.put(data_chunk, block=True, timeout=0.1)
+ self.queue.put(data_chunk, block=False)
except Queue.Full:
- continue
+ if self.exception is not None:
+ logging.debug('Reader got exception %s', self.exception)
+ raise self.exception
+ logging.debug('Queue full, waiting 1s ...')
+ try:
+ time.sleep(1)
+ continue
+ except BaseException as e:
+ logging.debug('Received %s', e)
+ raise
else:
self.bytes_written += len(data_chunk)
+ logging.debug('Addeded %i bytes to queue' % len(data_chunk))
+ return len(data_chunk)
def status(self):
- '''Summarise the status of the transfer.'''
duration = time.time() - self.start_time
megabytes_written = self.bytes_written / float(MEGABYTES)
return 'Wrote %0.2fMB in %0.2f seconds (%0.2f MB/sec)' % (
@@ -200,11 +212,7 @@ class DockerWriteExtension(morphlib.writeexts.WriteExtension):
# https://github.com/swipely/docker-api/blob/master/lib/docker/image.rb
import_url = docker_client._url('/images/create')
- def display_transfer_status(message):
- status(message)
-
- tar_stream = ChunkedTarfileAdapter(
- status_interval=1, status_callback=display_transfer_status)
+ tar_stream = ChunkedTarfileAdapter()
logging.debug('Create tar thread')
tar_thread = threading.Thread(
@@ -228,7 +236,7 @@ class DockerWriteExtension(morphlib.writeexts.WriteExtension):
}
)
except BaseException as e:
- logging.debug('Received %r while sending image', e)
+ logging.debug('Main Received %r', e)
tar_stream.abort(e)
raise
@@ -302,6 +310,7 @@ class DockerWriteExtension(morphlib.writeexts.WriteExtension):
logging.debug('Tar complete')
except BaseException as e:
logging.debug('tar thread: Received %r', e)
+ raise
finally:
chunked_stream.close()