summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSam Thursfield <sam.thursfield@codethink.co.uk>2014-07-07 18:17:53 +0000
committerSam Thursfield <sam@afuera.me.uk>2014-09-05 13:00:21 +0000
commit153bab2a8b10dee3d4f961348594e7b24ffea6a8 (patch)
treebbfb68cac81dedbab964184a84618ed44a5f0a81
parentf71ed9f3de7eeffe52dc0e34a521d9ba6aee899c (diff)
downloadmorph-153bab2a8b10dee3d4f961348594e7b24ffea6a8.tar.gz
It still works
-rw-r--r--morphlib/exts/docker.write170
1 files changed, 82 insertions, 88 deletions
diff --git a/morphlib/exts/docker.write b/morphlib/exts/docker.write
index 342fddef..e86f2d17 100644
--- a/morphlib/exts/docker.write
+++ b/morphlib/exts/docker.write
@@ -17,22 +17,16 @@
'''A Morph deployment write extension for deploying to Docker hosts'''
-# bgtunnel: From https://github.com/jmagnusson/bgtunnel
-# (using Paramiko would be better, but it's not in Baserock yet. Its
-# demos/forward.py demonstrates what we need).
-import bgtunnel
# From https://github.com/dotcloud/docker-py
import docker
+#import paramiko
+
import cliapp
-import contextlib
-import gzip
import logging
-import os
import Queue
-import sys
import tarfile
import threading
import time
@@ -49,8 +43,6 @@ class ChunkedTarfileAdapter(object):
to it so that the 'requests' module can iterate through it and send each
block as a HTTP chunk.
- The values probably require some tweaking.
-
'''
# Some rough profiling with a 256.52MB system over Gb ethernet.
@@ -69,79 +61,92 @@ class ChunkedTarfileAdapter(object):
EXPECTED_BUFFER_SIZE = 100 * 1024
- def __init__(self):
- # Maximum size of the queue is 100MB. The max size should be several
- # items because creating e.g. gzip headers involves a 6 small
- # writes.
+ def __init__(self, status_interval=0, status_callback=None):
+ # Stream headers can involve several small writes (6 for gzip headers,
+ # for example). Therefore the queue buffers up to 10 blocks here.
self.queue = Queue.Queue(maxsize=10)
+
self.eof = False
self.exception = None
+
self.start_time = None
- self.bytes_written = 0
+ self.bytes_sent = 0
+
+ self.status_interval = status_interval
+ self.status_callback = status_callback
+
+ self.last_status_time = None
def __iter__(self):
- # KeyboardInterrupt is ignored inside the Queue.get() function, so
- # we block using 'while True'.
+ '''Generator for reading the queued data chunks.
+
+ This should be used from the main thread of the program.
+
+ '''
while True:
try:
- chunk = self.queue.get(block=False)#, True, timeout=1)
+ data_chunk = self.queue.get(block=True, timeout=0.1)
+ yield data_chunk
+ self.bytes_sent += len(data_chunk)
except Queue.Empty:
- if self.eof:
- logging.debug('Complete!')
- break
- logging.debug('Queue empty: waiting 1s')
- time.sleep(1)
+ pass
+
+ if self.queue.empty() and self.eof:
+ logging.debug('All data queued for transfer!')
+ break
else:
- logging.debug('Yield %i bytes', len(chunk))
- try:
- yield chunk
- except BaseException as e:
- logging.debug('Got exception %s in generator', e)
- self.error = e
- raise
- logging.debug('Next iteration')
+ self.maybe_show_status()
- def abort(self, exception=None):
- exception = exception or Exception('Missing exception message')
- logging.debug('ABORT !!! %s', exception)
- self.exception = exception
+ def write(self, data_chunk):
+ '''Write a data chunk, blocking when the chunk queue is full.
- def close(self):
- logging.debug('CLOSE !!!')
- self.eof = True
+ This can be called from a thread. If abort() is called, the exception
+ will be passed on and raised to the thread that is calling write().
- def write(self, data_chunk):
- logging.debug('Write; length %i', len(data_chunk))
- if self.start_time is None:
- self.start_time = time.time()
+ '''
if len(data_chunk) == 0:
return
- # KeyboardInterrupt is ignored inside the Queue.get() function, so
- # we block using 'while True'.
+ if self.start_time is None:
+ self.start_time = self.last_status_time = time.time()
while True:
+ if self.exception is not None:
+ raise self.exception
try:
- self.queue.put(data_chunk, block=False)
+ self.queue.put(data_chunk, block=True, timeout=0.1)
except Queue.Full:
- if self.exception is not None:
- logging.debug('Reader got exception %s', self.exception)
- raise self.exception
- logging.debug('Queue full, waiting 1s ...')
- try:
- time.sleep(1)
- continue
- except BaseException as e:
- logging.debug('Received %s', e)
- raise
+ pass
else:
- self.bytes_written += len(data_chunk)
- logging.debug('Addeded %i bytes to queue' % len(data_chunk))
- return len(data_chunk)
+ return
+
+ def abort(self, exception=None):
+ '''Mark the transfer as failed.'''
+ exception = exception or Exception('Unknown exception')
+ self.exception = exception
+
+ def close(self):
+ '''Mark the transfer as successfully completed.'''
+ self.eof = True
- def status(self):
- duration = time.time() - self.start_time
- megabytes_written = self.bytes_written / float(MEGABYTES)
- return 'Wrote %0.2fMB in %0.2f seconds (%0.2f MB/sec)' % (
- megabytes_written, duration, megabytes_written / duration)
+ def maybe_show_status(self):
+ '''Show status if the status_interval has elapsed.'''
+ if self.status_interval > 0 and self.status_callback is not None:
+ now = time.time()
+ if self.last_status_time + self.status_interval < now:
+ self.last_status_time = now
+ self.show_status()
+
+ def show_status(self):
+ '''Summarise the status of the transfer.'''
+ if self.status_callback is not None:
+ if self.start_time is None:
+ message = 'Starting transfer'
+ else:
+ duration = time.time() - self.start_time
+ megabytes = 1024 * 1024
+ megabytes_written = self.bytes_sent / float(megabytes)
+ message = '%0.2fMB transferred (%0.2f MB/sec)' % (
+ megabytes_written, megabytes_written / duration)
+ self.status_callback(message)
class DockerWriteExtension(morphlib.writeexts.WriteExtension):
@@ -196,13 +201,10 @@ class DockerWriteExtension(morphlib.writeexts.WriteExtension):
user, host, port, image_name = self.parse_location(location)
- # FIXME: is the tunnel cleaned up? do we need a 'with' ?
self.status(msg='Connecting to Docker service at %s:%s' % (host, port))
docker_client = self.create_docker_client_with_remote_ssh_tunnel(
user, host, port)
- print docker_client.info()
-
# FIXME: hack! The docker-py library should let us put in a fileobj and
# have it handle buffering automatically ... I.E. this hack should be
# sent upstream as an improvement, instead. Still, it's kind of cool
@@ -212,18 +214,17 @@ class DockerWriteExtension(morphlib.writeexts.WriteExtension):
# https://github.com/swipely/docker-api/blob/master/lib/docker/image.rb
import_url = docker_client._url('/images/create')
- tar_stream = ChunkedTarfileAdapter()
+ def display_transfer_status(message):
+ self.status(msg=message)
- logging.debug('Create tar thread')
+ tar_stream = ChunkedTarfileAdapter(
+ status_interval=1, status_callback=display_transfer_status)
tar_thread = threading.Thread(
target=self.stream_system_as_tar, args=[temp_root, tar_stream])
- # make the thread respond to ctrl+c
- #tar_thread.daemon = True
-
tar_thread.start()
try:
- docker_client.post(
+ response = docker_client.post(
import_url,
data=tar_stream,
params={
@@ -235,17 +236,18 @@ class DockerWriteExtension(morphlib.writeexts.WriteExtension):
'Transfer-Encoding': 'chunked',
}
)
+ # At this point 'WARNING Connection pool is full, discarding
+ # connection: 127.0.0.1' shows up in the logs. I'm not sure why. It
+ # seems harmless so far.
except BaseException as e:
- logging.debug('Main Received %r', e)
+ logging.debug('Received %r while sending image', e)
tar_stream.abort(e)
raise
- print "OK! Wow, that surely didn't actually work."
-
- ###
- autostart = self.get_environment_boolean('AUTOSTART')
+ tar_stream.show_status()
+ logging.debug('Transfer complete! Response %s', response)
+ print response
- self.status(msg=tar_stream.status())
self.status(
msg='Docker image %(image_name)s has been created',
image_name=image_name)
@@ -292,26 +294,18 @@ class DockerWriteExtension(morphlib.writeexts.WriteExtension):
# unusable on ARM CPUs so it's important to force
# compresslevel=1 or something low.
try:
- logging.debug('Writing system as a tar!')
- #gzip_stream = gzip.GzipFile(
- # mode='wb',
- # compresslevel=5,
- # fileobj=chunked_stream)
tar_stream = tarfile.TarFile.open(
name='docker.write-temp',
mode='w|',
bufsize=chunked_stream.EXPECTED_BUFFER_SIZE,
fileobj=chunked_stream)
- #fileobj=gzip_stream)
logging.debug("Creating tar of rootfs")
tar_stream.add(fs_root, recursive=True)
tar_stream.close()
- #gzip_stream.close()
logging.debug('Tar complete')
except BaseException as e:
- logging.debug('tar thread: Received %r', e)
- raise
- finally:
+ logging.debug('Tar thread: Received %r', e)
+ else:
chunked_stream.close()