author     Sam Thursfield <sam.thursfield@codethink.co.uk>  2014-07-07 17:39:20 +0000
committer  Sam Thursfield <sam@afuera.me.uk>                2014-09-05 13:00:20 +0000
commit     38b62ca7591c0919988f173c954ca2cfd3447d19 (patch)
tree       ddae1012ce58662ee02105d87f0a313a933340fd
parent     44ce57f9aab1e20a74cb35e0efa11ba8a7d40d36 (diff)
download   morph-38b62ca7591c0919988f173c954ca2cfd3447d19.tar.gz

    It works

-rw-r--r--  morphlib/exts/docker.write | 202
1 file changed, 144 insertions(+), 58 deletions(-)
diff --git a/morphlib/exts/docker.write b/morphlib/exts/docker.write
index 0e933cc3..23d2c29f 100644
--- a/morphlib/exts/docker.write
+++ b/morphlib/exts/docker.write
@@ -31,13 +31,107 @@ import contextlib
import gzip
import logging
import os
+import Queue
import sys
import tarfile
+import threading
+import time
import urlparse
import morphlib.writeexts
+MEGABYTES = 1024 * 1024
+
+
+class ChunkedTarfileAdapter(object):
+ '''File-like object which allows batched writes.
+
+    We need to send an entire system through an HTTP POST request. This
+    might be big, so it must be streamed in chunks. This object buffers
+    data written to it so that the 'requests' module can iterate through
+    it and send each block as an HTTP chunk.
+
+ '''
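+
+    # A minimal usage sketch (illustrative only; 'fill' stands for any
+    # producer that calls write() on the adapter and close() when done):
+    #
+    #     stream = ChunkedTarfileAdapter()
+    #     threading.Thread(target=fill, args=[stream]).start()
+    #     requests.post(url, data=stream,
+    #                   headers={'Transfer-Encoding': 'chunked'})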
+
+ # Some rough profiling with a 256.52MB system over Gb ethernet.
+ # Morph machine was x86_32 VM with reasonable CPU and RAM.
+ #
+ # no compression bufsize=100KB 256.52MB in 9.04 seconds (28.39 MB/sec)
+ # no compression bufsize=1MB 256.52MB in 11.45 seconds (22.4 MB/sec)
+ # gzip -1 bufsize=100KB 117.99MB in 19.34 seconds (6.10 MB/sec)
+ # no compression bufsize=10MB 256.52MB in 65.57 seconds (3.91 MB/sec)
+ # gzip -1 bufsize=10MB 124.39MB in 77 sec (1.61 MB/sec)
+ # gzip -5 bufsize=10MB 117.99MB in 84.27 seconds (1.40 MB/sec)
+ # no compression bufsize=100MB took pretty much forever
+ #
+ # Ideally the buffer size would adapt to the available IO speed & free
+ # memory. For now 100KB is OK.
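+    #
+    # One possible adaptation (hypothetical, not implemented here): start
+    # at 100KB, double the chunk size while queue.put() never blocks, and
+    # halve it whenever Queue.Full is seen, clamped to the 100KB-10MB
+    # range measured above.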
+
+ EXPECTED_BUFFER_SIZE = 100 * 1024
+
+    def __init__(self, status_interval=0, status_callback=None):
+        '''Initialise the adapter.
+
+        If status_callback is set, it is called with a progress summary
+        at most once every status_interval seconds during write().
+
+        '''
+        # Stream headers can involve several small writes (6 for gzip headers,
+        # for example). Therefore the queue buffers up to 10 blocks here.
+        self.queue = Queue.Queue(maxsize=10)
+        self.eof = False
+        self.exception = None
+        self.start_time = None
+        self.last_status_time = None
+        self.bytes_written = 0
+ self.status_interval = status_interval
+ self.status_callback = status_callback
+
+ def __iter__(self):
+ '''Generator for reading the queued data chunks.
+
+ This should be used from the main thread of the program.
+
+ '''
+        while True:
+            # Propagate a failure signalled by abort(), so the consumer
+            # does not spin forever waiting for data that will never come.
+            if self.exception is not None:
+                raise self.exception
+            if self.queue.empty() and self.eof:
+                return
+            try:
+                chunk = self.queue.get(block=True, timeout=0.1)
+            except Queue.Empty:
+                continue
+            else:
+                yield chunk
+
+ def abort(self, exception=None):
+ '''Mark the transfer as failed.'''
+ exception = exception or Exception('Unknown exception')
+ self.exception = exception
+
+ def close(self):
+ '''Mark the transfer as successfully completed.'''
+ self.eof = True
+
+ def write(self, data_chunk):
+ '''Write a data chunk, blocking when the chunk queue is full.
+
+ This can be called from a thread. If abort() is called, the exception
+ will be passed on and raised to the thread that is calling write().
+
+ '''
+ if self.start_time is None:
+ self.start_time = time.time()
+ if len(data_chunk) == 0:
+ return
+        while True:
+            if self.exception is not None:
+                raise self.exception
+            try:
+                self.queue.put(data_chunk, block=True, timeout=0.1)
+            except Queue.Full:
+                continue
+            else:
+                self.bytes_written += len(data_chunk)
+                break
+        # Call the status callback at most once every status_interval
+        # seconds, now that the chunk has been queued.
+        if self.status_callback is not None and self.status_interval > 0:
+            now = time.time()
+            if self.last_status_time is None or \
+                    now - self.last_status_time >= self.status_interval:
+                self.last_status_time = now
+                self.status_callback(self.status())
+
+    def status(self):
+        '''Summarise the status of the transfer.'''
+        if self.start_time is None:
+            return 'No data transferred yet'
+        # Avoid a zero division if called right after the first write.
+        duration = max(time.time() - self.start_time, 0.001)
+        megabytes_written = self.bytes_written / float(MEGABYTES)
+        return 'Wrote %0.2fMB in %0.2f seconds (%0.2f MB/sec)' % (
+            megabytes_written, duration, megabytes_written / duration)
+
+
class DockerWriteExtension(morphlib.writeexts.WriteExtension):
'''Create a Docker image or container from a Morph deployment.
@@ -95,10 +189,6 @@ class DockerWriteExtension(morphlib.writeexts.WriteExtension):
docker_client = self.create_docker_client_with_remote_ssh_tunnel(
user, host, port)
- tar_read_fd, tar_write_fd = os.pipe()
-
- tar_read_fileobj = os.fdopen(tar_read_fd, 'r')
-
print docker_client.info()
# FIXME: hack! The docker-py library should let us put in a fileobj and
@@ -110,54 +200,44 @@ class DockerWriteExtension(morphlib.writeexts.WriteExtension):
# https://github.com/swipely/docker-api/blob/master/lib/docker/image.rb
import_url = docker_client._url('/images/create')
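+    # The Docker remote API call this builds up to, shown as the raw
+    # request (a sketch; daemon address and repo name are assumed, not
+    # taken from this deployment):
+    #
+    #   POST /images/create?fromSrc=-&repo=example HTTP/1.1
+    #   Content-Type: application/tar
+    #   Transfer-Encoding: chunked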
- logging.debug('Open tar write FD')
- tar_write_fileobj = os.fdopen(tar_write_fd, 'w')
+    def display_transfer_status(message):
+        self.status(msg=message)
+
+ tar_stream = ChunkedTarfileAdapter(
+ status_interval=1, status_callback=display_transfer_status)
logging.debug('Create tar thread')
- tar_bytes = 0
- import threading
tar_thread = threading.Thread(
- target=self.write_system_as_tar, args=[temp_root, tar_write_fileobj])
+ target=self.stream_system_as_tar, args=[temp_root, tar_stream])
+    # Marking the thread as a daemon would make it respond to ctrl+c;
+    # this is currently left disabled.
+    #tar_thread.daemon = True
+
tar_thread.start()
- print tar_thread
- print tar_thread.is_alive()
-
- import select
- def batch_fileobj(fileobj, batch_size):
- '''Split an fileobj up into batches of 'batch_size' items.'''
- i = 0
- # This is hard, we need to signal end ...
- while True:
- data = fileobj.read(batch_size)
- yield data
- print "End of fileobj"
- yield []
- print "Yielded None, called again ..."
-
- #logging.debug('Prepare request...')
- #import_request_prepped = docker_client.prepare_request(import_request)
- logging.debug('Send request...')
- # FOR SOME REASON THIS SEEMS NEVER TO EXIT!
-
- #docker_client.send(import_request_prepped)
- docker_client.post(
- import_url,
- data=batch_fileobj(tar_read_fileobj, 10240),
- params={
- 'fromSrc': '-',
- 'repo': image_name
- },
- headers = {
- 'Content-Type': 'application/tar',
- 'Transfer-Encoding': 'chunked',
- }
- )
+
+ try:
+ docker_client.post(
+ import_url,
+ data=tar_stream,
+ params={
+ 'fromSrc': '-',
+ 'repo': image_name
+ },
+            headers={
+ 'Content-Type': 'application/tar',
+ 'Transfer-Encoding': 'chunked',
+ }
+ )
+ except BaseException as e:
+ logging.debug('Received %r while sending image', e)
+ tar_stream.abort(e)
+ raise
print "OK! Wow, that surely didn't actually work."
###
autostart = self.get_environment_boolean('AUTOSTART')
+ self.status(msg=tar_stream.status())
self.status(
msg='Docker image %(image_name)s has been created',
image_name=image_name)
@@ -198,26 +278,32 @@ class DockerWriteExtension(morphlib.writeexts.WriteExtension):
return docker_client
- def write_system_as_tar(self, fs_root, fileobj):
+ def stream_system_as_tar(self, fs_root, chunked_stream):
# Using tarfile.TarFile.gzopen() and passing compresslevel=1
# seems to result in compresslevel=9 anyway. That's completely
# unusable on ARM CPUs so it's important to force
# compresslevel=1 or something low.
- logging.debug('Writing system as a tar!')
- #gzip_stream = gzip.GzipFile(
- # mode='wb',
- # compresslevel=1,
- # fileobj=fileobj)
- tar_stream = tarfile.TarFile.gzopen(
- name='docker.write-temp',
- mode='w',
- compresslevel=1,
- fileobj=fileobj)#gzip_stream)
- logging.debug("Creating tar of rootfs")
- tar_stream.add(fs_root, recursive=True)
- tar_stream.close()
- logging.debug('Tar complete')
- tar_finished = True
+ try:
+ logging.debug('Writing system as a tar!')
+ #gzip_stream = gzip.GzipFile(
+ # mode='wb',
+ # compresslevel=5,
+ # fileobj=chunked_stream)
+ tar_stream = tarfile.TarFile.open(
+ name='docker.write-temp',
+ mode='w|',
+ bufsize=chunked_stream.EXPECTED_BUFFER_SIZE,
+ fileobj=chunked_stream)
+ #fileobj=gzip_stream)
+ logging.debug("Creating tar of rootfs")
+ tar_stream.add(fs_root, recursive=True)
+ tar_stream.close()
+ #gzip_stream.close()
+ logging.debug('Tar complete')
+        except BaseException as e:
+            logging.debug('tar thread: Received %r', e)
+            # Signal the failure so the HTTP sender sees an error rather
+            # than mistaking a truncated tar stream for a clean EOF.
+            chunked_stream.abort(e)
+        finally:
+            chunked_stream.close()
DockerWriteExtension().run()