diff options
author | Mitch Garnaat <mitch@garnaat.com> | 2012-03-28 07:13:10 -0700 |
---|---|---|
committer | Mitch Garnaat <mitch@garnaat.com> | 2012-03-28 07:13:10 -0700 |
commit | cde748106613918951212b90a6eec8cb72ca8a57 (patch) | |
tree | 9e900f7735d3f8c3b72b23d8bed32d95c5c4e524 | |
parent | 4f2c222d289c4c16cb21d39a79b5e93aa133a58c (diff) | |
download | boto-cde748106613918951212b90a6eec8cb72ca8a57.tar.gz |
Removing services which will be broken out as a separate package.
-rw-r--r-- | boto/services/__init__.py | 23 | ||||
-rwxr-xr-x | boto/services/bs.py | 179 | ||||
-rw-r--r-- | boto/services/message.py | 58 | ||||
-rw-r--r-- | boto/services/result.py | 136 | ||||
-rw-r--r-- | boto/services/service.py | 161 | ||||
-rw-r--r-- | boto/services/servicedef.py | 91 | ||||
-rw-r--r-- | boto/services/sonofmmm.cfg | 43 | ||||
-rw-r--r-- | boto/services/sonofmmm.py | 81 | ||||
-rw-r--r-- | boto/services/submit.py | 88 |
9 files changed, 0 insertions, 860 deletions
diff --git a/boto/services/__init__.py b/boto/services/__init__.py deleted file mode 100644 index 449bd162..00000000 --- a/boto/services/__init__.py +++ /dev/null @@ -1,23 +0,0 @@ -# Copyright (c) 2006,2007 Mitch Garnaat http://garnaat.org/ -# -# Permission is hereby granted, free of charge, to any person obtaining a -# copy of this software and associated documentation files (the -# "Software"), to deal in the Software without restriction, including -# without limitation the rights to use, copy, modify, merge, publish, dis- -# tribute, sublicense, and/or sell copies of the Software, and to permit -# persons to whom the Software is furnished to do so, subject to the fol- -# lowing conditions: -# -# The above copyright notice and this permission notice shall be included -# in all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- -# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT -# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, -# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS -# IN THE SOFTWARE. -# - - diff --git a/boto/services/bs.py b/boto/services/bs.py deleted file mode 100755 index 3d700315..00000000 --- a/boto/services/bs.py +++ /dev/null @@ -1,179 +0,0 @@ -#!/usr/bin/env python -# Copyright (c) 2006-2008 Mitch Garnaat http://garnaat.org/ -# -# Permission is hereby granted, free of charge, to any person obtaining a -# copy of this software and associated documentation files (the -# "Software"), to deal in the Software without restriction, including -# without limitation the rights to use, copy, modify, merge, publish, dis- -# tribute, sublicense, and/or sell copies of the Software, and to permit -# persons to whom the Software is furnished to do so, subject to the fol- -# lowing conditions: -# -# The above copyright notice and this permission notice shall be included -# in all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- -# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT -# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, -# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS -# IN THE SOFTWARE. -from optparse import OptionParser -from boto.services.servicedef import ServiceDef -from boto.services.submit import Submitter -from boto.services.result import ResultProcessor -import boto -import sys, os, StringIO - -class BS(object): - - Usage = "usage: %prog [options] config_file command" - - Commands = {'reset' : 'Clear input queue and output bucket', - 'submit' : 'Submit local files to the service', - 'start' : 'Start the service', - 'status' : 'Report on the status of the service buckets and queues', - 'retrieve' : 'Retrieve output generated by a batch', - 'batches' : 'List all batches stored in current output_domain'} - - def __init__(self): - self.service_name = None - self.parser = OptionParser(usage=self.Usage) - self.parser.add_option("--help-commands", action="store_true", dest="help_commands", - help="provides help on the available commands") - self.parser.add_option("-a", "--access-key", action="store", type="string", - help="your AWS Access Key") - self.parser.add_option("-s", "--secret-key", action="store", type="string", - help="your AWS Secret Access Key") - self.parser.add_option("-p", "--path", action="store", type="string", dest="path", - help="the path to local directory for submit and retrieve") - self.parser.add_option("-k", "--keypair", action="store", type="string", dest="keypair", - help="the SSH keypair used with launched instance(s)") - self.parser.add_option("-l", "--leave", action="store_true", dest="leave", - help="leave the files (don't retrieve) files during retrieve command") - self.parser.set_defaults(leave=False) - self.parser.add_option("-n", "--num-instances", action="store", type="string", dest="num_instances", - help="the number of launched instance(s)") - self.parser.set_defaults(num_instances=1) - self.parser.add_option("-i", "--ignore-dirs", action="append", type="string", dest="ignore", - help="directories that should be ignored by submit command") - self.parser.add_option("-b", "--batch-id", action="store", type="string", dest="batch", - help="batch identifier required by the retrieve command") - - def print_command_help(self): - print '\nCommands:' - for key in self.Commands.keys(): - print ' %s\t\t%s' % (key, self.Commands[key]) - - def do_reset(self): - iq = self.sd.get_obj('input_queue') - if iq: - print 'clearing out input queue' - i = 0 - m = iq.read() - while m: - i += 1 - iq.delete_message(m) - m = iq.read() - print 'deleted %d messages' % i - ob = self.sd.get_obj('output_bucket') - ib = self.sd.get_obj('input_bucket') - if ob: - if ib and ob.name == ib.name: - return - print 'delete generated files in output bucket' - i = 0 - for k in ob: - i += 1 - k.delete() - print 'deleted %d keys' % i - - def do_submit(self): - if not self.options.path: - self.parser.error('No path provided') - if not os.path.exists(self.options.path): - self.parser.error('Invalid path (%s)' % self.options.path) - s = Submitter(self.sd) - t = s.submit_path(self.options.path, None, self.options.ignore, None, - None, True, self.options.path) - print 'A total of %d files were submitted' % t[1] - print 'Batch Identifier: %s' % t[0] - - def do_start(self): - ami_id = self.sd.get('ami_id') - instance_type = self.sd.get('instance_type', 'm1.small') - security_group = self.sd.get('security_group', 'default') - if not ami_id: - self.parser.error('ami_id option is required when starting the service') - ec2 = boto.connect_ec2() - if not self.sd.has_section('Credentials'): - self.sd.add_section('Credentials') - self.sd.set('Credentials', 'aws_access_key_id', ec2.aws_access_key_id) - self.sd.set('Credentials', 'aws_secret_access_key', ec2.aws_secret_access_key) - s = StringIO.StringIO() - self.sd.write(s) - rs = ec2.get_all_images([ami_id]) - img = rs[0] - r = img.run(user_data=s.getvalue(), key_name=self.options.keypair, - max_count=self.options.num_instances, - instance_type=instance_type, - security_groups=[security_group]) - print 'Starting AMI: %s' % ami_id - print 'Reservation %s contains the following instances:' % r.id - for i in r.instances: - print '\t%s' % i.id - - def do_status(self): - iq = self.sd.get_obj('input_queue') - if iq: - print 'The input_queue (%s) contains approximately %s messages' % (iq.id, iq.count()) - ob = self.sd.get_obj('output_bucket') - ib = self.sd.get_obj('input_bucket') - if ob: - if ib and ob.name == ib.name: - return - total = 0 - for k in ob: - total += 1 - print 'The output_bucket (%s) contains %d keys' % (ob.name, total) - - def do_retrieve(self): - if not self.options.path: - self.parser.error('No path provided') - if not os.path.exists(self.options.path): - self.parser.error('Invalid path (%s)' % self.options.path) - if not self.options.batch: - self.parser.error('batch identifier is required for retrieve command') - s = ResultProcessor(self.options.batch, self.sd) - s.get_results(self.options.path, get_file=(not self.options.leave)) - - def do_batches(self): - d = self.sd.get_obj('output_domain') - if d: - print 'Available Batches:' - rs = d.query("['type'='Batch']") - for item in rs: - print ' %s' % item.name - else: - self.parser.error('No output_domain specified for service') - - def main(self): - self.options, self.args = self.parser.parse_args() - if self.options.help_commands: - self.print_command_help() - sys.exit(0) - if len(self.args) != 2: - self.parser.error("config_file and command are required") - self.config_file = self.args[0] - self.sd = ServiceDef(self.config_file) - self.command = self.args[1] - if hasattr(self, 'do_%s' % self.command): - method = getattr(self, 'do_%s' % self.command) - method() - else: - self.parser.error('command (%s) not recognized' % self.command) - -if __name__ == "__main__": - bs = BS() - bs.main() diff --git a/boto/services/message.py b/boto/services/message.py deleted file mode 100644 index 79f6d19f..00000000 --- a/boto/services/message.py +++ /dev/null @@ -1,58 +0,0 @@ -# Copyright (c) 2006,2007 Mitch Garnaat http://garnaat.org/ -# -# Permission is hereby granted, free of charge, to any person obtaining a -# copy of this software and associated documentation files (the -# "Software"), to deal in the Software without restriction, including -# without limitation the rights to use, copy, modify, merge, publish, dis- -# tribute, sublicense, and/or sell copies of the Software, and to permit -# persons to whom the Software is furnished to do so, subject to the fol- -# lowing conditions: -# -# The above copyright notice and this permission notice shall be included -# in all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- -# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT -# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, -# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS -# IN THE SOFTWARE. - -from boto.sqs.message import MHMessage -from boto.utils import get_ts -from socket import gethostname -import os, mimetypes, time - -class ServiceMessage(MHMessage): - - def for_key(self, key, params=None, bucket_name=None): - if params: - self.update(params) - if key.path: - t = os.path.split(key.path) - self['OriginalLocation'] = t[0] - self['OriginalFileName'] = t[1] - mime_type = mimetypes.guess_type(t[1])[0] - if mime_type == None: - mime_type = 'application/octet-stream' - self['Content-Type'] = mime_type - s = os.stat(key.path) - t = time.gmtime(s[7]) - self['FileAccessedDate'] = get_ts(t) - t = time.gmtime(s[8]) - self['FileModifiedDate'] = get_ts(t) - t = time.gmtime(s[9]) - self['FileCreateDate'] = get_ts(t) - else: - self['OriginalFileName'] = key.name - self['OriginalLocation'] = key.bucket.name - self['ContentType'] = key.content_type - self['Host'] = gethostname() - if bucket_name: - self['Bucket'] = bucket_name - else: - self['Bucket'] = key.bucket.name - self['InputKey'] = key.name - self['Size'] = key.size - diff --git a/boto/services/result.py b/boto/services/result.py deleted file mode 100644 index 98c67dcf..00000000 --- a/boto/services/result.py +++ /dev/null @@ -1,136 +0,0 @@ -#!/usr/bin/env python -# Copyright (c) 2006,2007 Mitch Garnaat http://garnaat.org/ -# -# Permission is hereby granted, free of charge, to any person obtaining a -# copy of this software and associated documentation files (the -# "Software"), to deal in the Software without restriction, including -# without limitation the rights to use, copy, modify, merge, publish, dis- -# tribute, sublicense, and/or sell copies of the Software, and to permit -# persons to whom the Software is furnished to do so, subject to the fol- -# lowing conditions: -# -# The above copyright notice and this permission notice shall be included -# in all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- -# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT -# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, -# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS -# IN THE SOFTWARE. - -import os -from datetime import datetime, timedelta -from boto.utils import parse_ts -import boto - -class ResultProcessor: - - LogFileName = 'log.csv' - - def __init__(self, batch_name, sd, mimetype_files=None): - self.sd = sd - self.batch = batch_name - self.log_fp = None - self.num_files = 0 - self.total_time = 0 - self.min_time = timedelta.max - self.max_time = timedelta.min - self.earliest_time = datetime.max - self.latest_time = datetime.min - self.queue = self.sd.get_obj('output_queue') - self.domain = self.sd.get_obj('output_domain') - - def calculate_stats(self, msg): - start_time = parse_ts(msg['Service-Read']) - end_time = parse_ts(msg['Service-Write']) - elapsed_time = end_time - start_time - if elapsed_time > self.max_time: - self.max_time = elapsed_time - if elapsed_time < self.min_time: - self.min_time = elapsed_time - self.total_time += elapsed_time.seconds - if start_time < self.earliest_time: - self.earliest_time = start_time - if end_time > self.latest_time: - self.latest_time = end_time - - def log_message(self, msg, path): - keys = sorted(msg) - if not self.log_fp: - self.log_fp = open(os.path.join(path, self.LogFileName), 'a') - line = ','.join(keys) - self.log_fp.write(line+'\n') - values = [] - for key in keys: - value = msg[key] - if value.find(',') > 0: - value = '"%s"' % value - values.append(value) - line = ','.join(values) - self.log_fp.write(line+'\n') - - def process_record(self, record, path, get_file=True): - self.log_message(record, path) - self.calculate_stats(record) - outputs = record['OutputKey'].split(',') - if 'OutputBucket' in record: - bucket = boto.lookup('s3', record['OutputBucket']) - else: - bucket = boto.lookup('s3', record['Bucket']) - for output in outputs: - if get_file: - key_name = output.split(';')[0] - key = bucket.lookup(key_name) - file_name = os.path.join(path, key_name) - print 'retrieving file: %s to %s' % (key_name, file_name) - key.get_contents_to_filename(file_name) - self.num_files += 1 - - def get_results_from_queue(self, path, get_file=True, delete_msg=True): - m = self.queue.read() - while m: - if 'Batch' in m and m['Batch'] == self.batch: - self.process_record(m, path, get_file) - if delete_msg: - self.queue.delete_message(m) - m = self.queue.read() - - def get_results_from_domain(self, path, get_file=True): - rs = self.domain.query("['Batch'='%s']" % self.batch) - for item in rs: - self.process_record(item, path, get_file) - - def get_results_from_bucket(self, path): - bucket = self.sd.get_obj('output_bucket') - if bucket: - print 'No output queue or domain, just retrieving files from output_bucket' - for key in bucket: - file_name = os.path.join(path, key) - print 'retrieving file: %s to %s' % (key, file_name) - key.get_contents_to_filename(file_name) - self.num_files + 1 - - def get_results(self, path, get_file=True, delete_msg=True): - if not os.path.isdir(path): - os.mkdir(path) - if self.queue: - self.get_results_from_queue(path, get_file) - elif self.domain: - self.get_results_from_domain(path, get_file) - else: - self.get_results_from_bucket(path) - if self.log_fp: - self.log_fp.close() - print '%d results successfully retrieved.' % self.num_files - if self.num_files > 0: - self.avg_time = float(self.total_time)/self.num_files - print 'Minimum Processing Time: %d' % self.min_time.seconds - print 'Maximum Processing Time: %d' % self.max_time.seconds - print 'Average Processing Time: %f' % self.avg_time - self.elapsed_time = self.latest_time-self.earliest_time - print 'Elapsed Time: %d' % self.elapsed_time.seconds - tput = 1.0 / ((self.elapsed_time.seconds/60.0) / self.num_files) - print 'Throughput: %f transactions / minute' % tput - diff --git a/boto/services/service.py b/boto/services/service.py deleted file mode 100644 index e0e987ce..00000000 --- a/boto/services/service.py +++ /dev/null @@ -1,161 +0,0 @@ -# Copyright (c) 2006,2007 Mitch Garnaat http://garnaat.org/ -# -# Permission is hereby granted, free of charge, to any person obtaining a -# copy of this software and associated documentation files (the -# "Software"), to deal in the Software without restriction, including -# without limitation the rights to use, copy, modify, merge, publish, dis- -# tribute, sublicense, and/or sell copies of the Software, and to permit -# persons to whom the Software is furnished to do so, subject to the fol- -# lowing conditions: -# -# The above copyright notice and this permission notice shall be included -# in all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- -# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT -# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, -# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS -# IN THE SOFTWARE. - -import boto -from boto.services.message import ServiceMessage -from boto.services.servicedef import ServiceDef -from boto.pyami.scriptbase import ScriptBase -from boto.utils import get_ts -import time -import os -import mimetypes - - -class Service(ScriptBase): - - # Time required to process a transaction - ProcessingTime = 60 - - def __init__(self, config_file=None, mimetype_files=None): - ScriptBase.__init__(self, config_file) - self.name = self.__class__.__name__ - self.working_dir = boto.config.get('Pyami', 'working_dir') - self.sd = ServiceDef(config_file) - self.retry_count = self.sd.getint('retry_count', 5) - self.loop_delay = self.sd.getint('loop_delay', 30) - self.processing_time = self.sd.getint('processing_time', 60) - self.input_queue = self.sd.get_obj('input_queue') - self.output_queue = self.sd.get_obj('output_queue') - self.output_domain = self.sd.get_obj('output_domain') - if mimetype_files: - mimetypes.init(mimetype_files) - - def split_key(key): - if key.find(';') < 0: - t = (key, '') - else: - key, type = key.split(';') - label, mtype = type.split('=') - t = (key, mtype) - return t - - def read_message(self): - boto.log.info('read_message') - message = self.input_queue.read(self.processing_time) - if message: - boto.log.info(message.get_body()) - key = 'Service-Read' - message[key] = get_ts() - return message - - # retrieve the source file from S3 - def get_file(self, message): - bucket_name = message['Bucket'] - key_name = message['InputKey'] - file_name = os.path.join(self.working_dir, message.get('OriginalFileName', 'in_file')) - boto.log.info('get_file: %s/%s to %s' % (bucket_name, key_name, file_name)) - bucket = boto.lookup('s3', bucket_name) - key = bucket.new_key(key_name) - key.get_contents_to_filename(os.path.join(self.working_dir, file_name)) - return file_name - - # process source file, return list of output files - def process_file(self, in_file_name, msg): - return [] - - # store result file in S3 - def put_file(self, bucket_name, file_path, key_name=None): - boto.log.info('putting file %s as %s.%s' % (file_path, bucket_name, key_name)) - bucket = boto.lookup('s3', bucket_name) - key = bucket.new_key(key_name) - key.set_contents_from_filename(file_path) - return key - - def save_results(self, results, input_message, output_message): - output_keys = [] - for file, type in results: - if 'OutputBucket' in input_message: - output_bucket = input_message['OutputBucket'] - else: - output_bucket = input_message['Bucket'] - key_name = os.path.split(file)[1] - key = self.put_file(output_bucket, file, key_name) - output_keys.append('%s;type=%s' % (key.name, type)) - output_message['OutputKey'] = ','.join(output_keys) - - # write message to each output queue - def write_message(self, message): - message['Service-Write'] = get_ts() - message['Server'] = self.name - if 'HOSTNAME' in os.environ: - message['Host'] = os.environ['HOSTNAME'] - else: - message['Host'] = 'unknown' - message['Instance-ID'] = self.instance_id - if self.output_queue: - boto.log.info('Writing message to SQS queue: %s' % self.output_queue.id) - self.output_queue.write(message) - if self.output_domain: - boto.log.info('Writing message to SDB domain: %s' % self.output_domain.name) - item_name = '/'.join([message['Service-Write'], message['Bucket'], message['InputKey']]) - self.output_domain.put_attributes(item_name, message) - - # delete message from input queue - def delete_message(self, message): - boto.log.info('deleting message from %s' % self.input_queue.id) - self.input_queue.delete_message(message) - - # to clean up any files, etc. after each iteration - def cleanup(self): - pass - - def shutdown(self): - on_completion = self.sd.get('on_completion', 'shutdown') - if on_completion == 'shutdown': - if self.instance_id: - time.sleep(60) - c = boto.connect_ec2() - c.terminate_instances([self.instance_id]) - - def main(self, notify=False): - self.notify('Service: %s Starting' % self.name) - empty_reads = 0 - while self.retry_count < 0 or empty_reads < self.retry_count: - try: - input_message = self.read_message() - if input_message: - empty_reads = 0 - output_message = ServiceMessage(None, input_message.get_body()) - input_file = self.get_file(input_message) - results = self.process_file(input_file, output_message) - self.save_results(results, input_message, output_message) - self.write_message(output_message) - self.delete_message(input_message) - self.cleanup() - else: - empty_reads += 1 - time.sleep(self.loop_delay) - except Exception: - boto.log.exception('Service Failed') - empty_reads += 1 - self.notify('Service: %s Shutting Down' % self.name) - self.shutdown() - diff --git a/boto/services/servicedef.py b/boto/services/servicedef.py deleted file mode 100644 index 1cb01aa7..00000000 --- a/boto/services/servicedef.py +++ /dev/null @@ -1,91 +0,0 @@ -# Copyright (c) 2006,2007 Mitch Garnaat http://garnaat.org/ -# -# Permission is hereby granted, free of charge, to any person obtaining a -# copy of this software and associated documentation files (the -# "Software"), to deal in the Software without restriction, including -# without limitation the rights to use, copy, modify, merge, publish, dis- -# tribute, sublicense, and/or sell copies of the Software, and to permit -# persons to whom the Software is furnished to do so, subject to the fol- -# lowing conditions: -# -# The above copyright notice and this permission notice shall be included -# in all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- -# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT -# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, -# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS -# IN THE SOFTWARE. - -from boto.pyami.config import Config -from boto.services.message import ServiceMessage -import boto - -class ServiceDef(Config): - - def __init__(self, config_file, aws_access_key_id=None, aws_secret_access_key=None): - Config.__init__(self, config_file) - self.aws_access_key_id = aws_access_key_id - self.aws_secret_access_key = aws_secret_access_key - script = Config.get(self, 'Pyami', 'scripts') - if script: - self.name = script.split('.')[-1] - else: - self.name = None - - - def get(self, name, default=None): - return Config.get(self, self.name, name, default) - - def has_option(self, option): - return Config.has_option(self, self.name, option) - - def getint(self, option, default=0): - try: - val = Config.get(self, self.name, option) - val = int(val) - except: - val = int(default) - return val - - def getbool(self, option, default=False): - try: - val = Config.get(self, self.name, option) - if val.lower() == 'true': - val = True - else: - val = False - except: - val = default - return val - - def get_obj(self, name): - """ - Returns the AWS object associated with a given option. - - The heuristics used are a bit lame. If the option name contains - the word 'bucket' it is assumed to be an S3 bucket, if the name - contains the word 'queue' it is assumed to be an SQS queue and - if it contains the word 'domain' it is assumed to be a SimpleDB - domain. If the option name specified does not exist in the - config file or if the AWS object cannot be retrieved this - returns None. - """ - val = self.get(name) - if not val: - return None - if name.find('queue') >= 0: - obj = boto.lookup('sqs', val) - if obj: - obj.set_message_class(ServiceMessage) - elif name.find('bucket') >= 0: - obj = boto.lookup('s3', val) - elif name.find('domain') >= 0: - obj = boto.lookup('sdb', val) - else: - obj = None - return obj - - diff --git a/boto/services/sonofmmm.cfg b/boto/services/sonofmmm.cfg deleted file mode 100644 index d70d3794..00000000 --- a/boto/services/sonofmmm.cfg +++ /dev/null @@ -1,43 +0,0 @@ -# -# Your AWS Credentials -# You only need to supply these in this file if you are not using -# the boto tools to start your service -# -#[Credentials] -#aws_access_key_id = <AWS Access Key Here> -#aws_secret_access_key = <AWS Secret Key Here> - -# -# Fill out this section if you want emails from the service -# when it starts and stops -# -#[Notification] -#smtp_host = <your smtp host> -#smtp_user = <your smtp username, if necessary> -#smtp_pass = <your smtp password, if necessary> -#smtp_from = <email address for From: field> -#smtp_to = <email address for To: field> - -[Pyami] -scripts = boto.services.sonofmmm.SonOfMMM - -[SonOfMMM] -# id of the AMI to be launched -ami_id = ami-dc799cb5 -# number of times service will read an empty queue before exiting -# a negative value will cause the service to run forever -retry_count = 5 -# seconds to wait after empty queue read before reading again -loop_delay = 10 -# average time it takes to process a transaction -# controls invisibility timeout of messages -processing_time = 60 -ffmpeg_args = -y -i %%s -f mov -r 29.97 -b 1200kb -mbd 2 -flags +4mv+trell -aic 2 -cmp 2 -subcmp 2 -ar 48000 -ab 19200 -s 320x240 -vcodec mpeg4 -acodec libfaac %%s -output_mimetype = video/quicktime -output_ext = .mov -input_bucket = <S3 bucket where source videos live> -output_bucket = <S3 bucket where converted videos should be stored> -output_domain = <SimpleDB domain to store results - optional> -output_queue = <SQS queue to store results - optional> -input_queue = <SQS queue where work to be done will be queued up> - diff --git a/boto/services/sonofmmm.py b/boto/services/sonofmmm.py deleted file mode 100644 index acb7e610..00000000 --- a/boto/services/sonofmmm.py +++ /dev/null @@ -1,81 +0,0 @@ -# Copyright (c) 2006,2007 Mitch Garnaat http://garnaat.org/ -# -# Permission is hereby granted, free of charge, to any person obtaining a -# copy of this software and associated documentation files (the -# "Software"), to deal in the Software without restriction, including -# without limitation the rights to use, copy, modify, merge, publish, dis- -# tribute, sublicense, and/or sell copies of the Software, and to permit -# persons to whom the Software is furnished to do so, subject to the fol- -# lowing conditions: -# -# The above copyright notice and this permission notice shall be included -# in all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- -# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT -# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, -# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS -# IN THE SOFTWARE. - -import boto -from boto.services.service import Service -from boto.services.message import ServiceMessage -import os -import mimetypes - -class SonOfMMM(Service): - - def __init__(self, config_file=None): - Service.__init__(self, config_file) - self.log_file = '%s.log' % self.instance_id - self.log_path = os.path.join(self.working_dir, self.log_file) - boto.set_file_logger(self.name, self.log_path) - if self.sd.has_option('ffmpeg_args'): - self.command = '/usr/local/bin/ffmpeg ' + self.sd.get('ffmpeg_args') - else: - self.command = '/usr/local/bin/ffmpeg -y -i %s %s' - self.output_mimetype = self.sd.get('output_mimetype') - if self.sd.has_option('output_ext'): - self.output_ext = self.sd.get('output_ext') - else: - self.output_ext = mimetypes.guess_extension(self.output_mimetype) - self.output_bucket = self.sd.get_obj('output_bucket') - self.input_bucket = self.sd.get_obj('input_bucket') - # check to see if there are any messages queue - # if not, create messages for all files in input_bucket - m = self.input_queue.read(1) - if not m: - self.queue_files() - - def queue_files(self): - boto.log.info('Queueing files from %s' % self.input_bucket.name) - for key in self.input_bucket: - boto.log.info('Queueing %s' % key.name) - m = ServiceMessage() - if self.output_bucket: - d = {'OutputBucket' : self.output_bucket.name} - else: - d = None - m.for_key(key, d) - self.input_queue.write(m) - - def process_file(self, in_file_name, msg): - base, ext = os.path.splitext(in_file_name) - out_file_name = os.path.join(self.working_dir, - base+self.output_ext) - command = self.command % (in_file_name, out_file_name) - boto.log.info('running:\n%s' % command) - status = self.run(command) - if status == 0: - return [(out_file_name, self.output_mimetype)] - else: - return [] - - def shutdown(self): - if os.path.isfile(self.log_path): - if self.output_bucket: - key = self.output_bucket.new_key(self.log_file) - key.set_contents_from_filename(self.log_path) - Service.shutdown(self) diff --git a/boto/services/submit.py b/boto/services/submit.py deleted file mode 100644 index 89c439c5..00000000 --- a/boto/services/submit.py +++ /dev/null @@ -1,88 +0,0 @@ -# Copyright (c) 2006,2007 Mitch Garnaat http://garnaat.org/ -# -# Permission is hereby granted, free of charge, to any person obtaining a -# copy of this software and associated documentation files (the -# "Software"), to deal in the Software without restriction, including -# without limitation the rights to use, copy, modify, merge, publish, dis- -# tribute, sublicense, and/or sell copies of the Software, and to permit -# persons to whom the Software is furnished to do so, subject to the fol- -# lowing conditions: -# -# The above copyright notice and this permission notice shall be included -# in all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- -# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT -# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, -# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS -# IN THE SOFTWARE. - -import time -import os - - -class Submitter: - - def __init__(self, sd): - self.sd = sd - self.input_bucket = self.sd.get_obj('input_bucket') - self.output_bucket = self.sd.get_obj('output_bucket') - self.output_domain = self.sd.get_obj('output_domain') - self.queue = self.sd.get_obj('input_queue') - - def get_key_name(self, fullpath, prefix): - key_name = fullpath[len(prefix):] - l = key_name.split(os.sep) - return '/'.join(l) - - def write_message(self, key, metadata): - if self.queue: - m = self.queue.new_message() - m.for_key(key, metadata) - if self.output_bucket: - m['OutputBucket'] = self.output_bucket.name - self.queue.write(m) - - def submit_file(self, path, metadata=None, cb=None, num_cb=0, prefix='/'): - if not metadata: - metadata = {} - key_name = self.get_key_name(path, prefix) - k = self.input_bucket.new_key(key_name) - k.update_metadata(metadata) - k.set_contents_from_filename(path, replace=False, cb=cb, num_cb=num_cb) - self.write_message(k, metadata) - - def submit_path(self, path, tags=None, ignore_dirs=None, cb=None, num_cb=0, status=False, prefix='/'): - path = os.path.expanduser(path) - path = os.path.expandvars(path) - path = os.path.abspath(path) - total = 0 - metadata = {} - if tags: - metadata['Tags'] = tags - l = [] - for t in time.gmtime(): - l.append(str(t)) - metadata['Batch'] = '_'.join(l) - if self.output_domain: - self.output_domain.put_attributes(metadata['Batch'], {'type' : 'Batch'}) - if os.path.isdir(path): - for root, dirs, files in os.walk(path): - if ignore_dirs: - for ignore in ignore_dirs: - if ignore in dirs: - dirs.remove(ignore) - for file in files: - fullpath = os.path.join(root, file) - if status: - print 'Submitting %s' % fullpath - self.submit_file(fullpath, metadata, cb, num_cb, prefix) - total += 1 - elif os.path.isfile(path): - self.submit_file(path, metadata, cb, num_cb) - total += 1 - else: - print 'problem with %s' % path - return (metadata['Batch'], total) |