summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMitch Garnaat <mitch@garnaat.com>2012-03-28 07:13:10 -0700
committerMitch Garnaat <mitch@garnaat.com>2012-03-28 07:13:10 -0700
commitcde748106613918951212b90a6eec8cb72ca8a57 (patch)
tree9e900f7735d3f8c3b72b23d8bed32d95c5c4e524
parent4f2c222d289c4c16cb21d39a79b5e93aa133a58c (diff)
downloadboto-cde748106613918951212b90a6eec8cb72ca8a57.tar.gz
Removing services which will be broken out as a separate package.
-rw-r--r--boto/services/__init__.py23
-rwxr-xr-xboto/services/bs.py179
-rw-r--r--boto/services/message.py58
-rw-r--r--boto/services/result.py136
-rw-r--r--boto/services/service.py161
-rw-r--r--boto/services/servicedef.py91
-rw-r--r--boto/services/sonofmmm.cfg43
-rw-r--r--boto/services/sonofmmm.py81
-rw-r--r--boto/services/submit.py88
9 files changed, 0 insertions, 860 deletions
diff --git a/boto/services/__init__.py b/boto/services/__init__.py
deleted file mode 100644
index 449bd162..00000000
--- a/boto/services/__init__.py
+++ /dev/null
@@ -1,23 +0,0 @@
-# Copyright (c) 2006,2007 Mitch Garnaat http://garnaat.org/
-#
-# Permission is hereby granted, free of charge, to any person obtaining a
-# copy of this software and associated documentation files (the
-# "Software"), to deal in the Software without restriction, including
-# without limitation the rights to use, copy, modify, merge, publish, dis-
-# tribute, sublicense, and/or sell copies of the Software, and to permit
-# persons to whom the Software is furnished to do so, subject to the fol-
-# lowing conditions:
-#
-# The above copyright notice and this permission notice shall be included
-# in all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
-# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
-# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
-# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
-# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
-# IN THE SOFTWARE.
-#
-
-
diff --git a/boto/services/bs.py b/boto/services/bs.py
deleted file mode 100755
index 3d700315..00000000
--- a/boto/services/bs.py
+++ /dev/null
@@ -1,179 +0,0 @@
-#!/usr/bin/env python
-# Copyright (c) 2006-2008 Mitch Garnaat http://garnaat.org/
-#
-# Permission is hereby granted, free of charge, to any person obtaining a
-# copy of this software and associated documentation files (the
-# "Software"), to deal in the Software without restriction, including
-# without limitation the rights to use, copy, modify, merge, publish, dis-
-# tribute, sublicense, and/or sell copies of the Software, and to permit
-# persons to whom the Software is furnished to do so, subject to the fol-
-# lowing conditions:
-#
-# The above copyright notice and this permission notice shall be included
-# in all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
-# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
-# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
-# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
-# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
-# IN THE SOFTWARE.
-from optparse import OptionParser
-from boto.services.servicedef import ServiceDef
-from boto.services.submit import Submitter
-from boto.services.result import ResultProcessor
-import boto
-import sys, os, StringIO
-
-class BS(object):
-
- Usage = "usage: %prog [options] config_file command"
-
- Commands = {'reset' : 'Clear input queue and output bucket',
- 'submit' : 'Submit local files to the service',
- 'start' : 'Start the service',
- 'status' : 'Report on the status of the service buckets and queues',
- 'retrieve' : 'Retrieve output generated by a batch',
- 'batches' : 'List all batches stored in current output_domain'}
-
- def __init__(self):
- self.service_name = None
- self.parser = OptionParser(usage=self.Usage)
- self.parser.add_option("--help-commands", action="store_true", dest="help_commands",
- help="provides help on the available commands")
- self.parser.add_option("-a", "--access-key", action="store", type="string",
- help="your AWS Access Key")
- self.parser.add_option("-s", "--secret-key", action="store", type="string",
- help="your AWS Secret Access Key")
- self.parser.add_option("-p", "--path", action="store", type="string", dest="path",
- help="the path to local directory for submit and retrieve")
- self.parser.add_option("-k", "--keypair", action="store", type="string", dest="keypair",
- help="the SSH keypair used with launched instance(s)")
- self.parser.add_option("-l", "--leave", action="store_true", dest="leave",
- help="leave the files (don't retrieve) files during retrieve command")
- self.parser.set_defaults(leave=False)
- self.parser.add_option("-n", "--num-instances", action="store", type="string", dest="num_instances",
- help="the number of launched instance(s)")
- self.parser.set_defaults(num_instances=1)
- self.parser.add_option("-i", "--ignore-dirs", action="append", type="string", dest="ignore",
- help="directories that should be ignored by submit command")
- self.parser.add_option("-b", "--batch-id", action="store", type="string", dest="batch",
- help="batch identifier required by the retrieve command")
-
- def print_command_help(self):
- print '\nCommands:'
- for key in self.Commands.keys():
- print ' %s\t\t%s' % (key, self.Commands[key])
-
- def do_reset(self):
- iq = self.sd.get_obj('input_queue')
- if iq:
- print 'clearing out input queue'
- i = 0
- m = iq.read()
- while m:
- i += 1
- iq.delete_message(m)
- m = iq.read()
- print 'deleted %d messages' % i
- ob = self.sd.get_obj('output_bucket')
- ib = self.sd.get_obj('input_bucket')
- if ob:
- if ib and ob.name == ib.name:
- return
- print 'delete generated files in output bucket'
- i = 0
- for k in ob:
- i += 1
- k.delete()
- print 'deleted %d keys' % i
-
- def do_submit(self):
- if not self.options.path:
- self.parser.error('No path provided')
- if not os.path.exists(self.options.path):
- self.parser.error('Invalid path (%s)' % self.options.path)
- s = Submitter(self.sd)
- t = s.submit_path(self.options.path, None, self.options.ignore, None,
- None, True, self.options.path)
- print 'A total of %d files were submitted' % t[1]
- print 'Batch Identifier: %s' % t[0]
-
- def do_start(self):
- ami_id = self.sd.get('ami_id')
- instance_type = self.sd.get('instance_type', 'm1.small')
- security_group = self.sd.get('security_group', 'default')
- if not ami_id:
- self.parser.error('ami_id option is required when starting the service')
- ec2 = boto.connect_ec2()
- if not self.sd.has_section('Credentials'):
- self.sd.add_section('Credentials')
- self.sd.set('Credentials', 'aws_access_key_id', ec2.aws_access_key_id)
- self.sd.set('Credentials', 'aws_secret_access_key', ec2.aws_secret_access_key)
- s = StringIO.StringIO()
- self.sd.write(s)
- rs = ec2.get_all_images([ami_id])
- img = rs[0]
- r = img.run(user_data=s.getvalue(), key_name=self.options.keypair,
- max_count=self.options.num_instances,
- instance_type=instance_type,
- security_groups=[security_group])
- print 'Starting AMI: %s' % ami_id
- print 'Reservation %s contains the following instances:' % r.id
- for i in r.instances:
- print '\t%s' % i.id
-
- def do_status(self):
- iq = self.sd.get_obj('input_queue')
- if iq:
- print 'The input_queue (%s) contains approximately %s messages' % (iq.id, iq.count())
- ob = self.sd.get_obj('output_bucket')
- ib = self.sd.get_obj('input_bucket')
- if ob:
- if ib and ob.name == ib.name:
- return
- total = 0
- for k in ob:
- total += 1
- print 'The output_bucket (%s) contains %d keys' % (ob.name, total)
-
- def do_retrieve(self):
- if not self.options.path:
- self.parser.error('No path provided')
- if not os.path.exists(self.options.path):
- self.parser.error('Invalid path (%s)' % self.options.path)
- if not self.options.batch:
- self.parser.error('batch identifier is required for retrieve command')
- s = ResultProcessor(self.options.batch, self.sd)
- s.get_results(self.options.path, get_file=(not self.options.leave))
-
- def do_batches(self):
- d = self.sd.get_obj('output_domain')
- if d:
- print 'Available Batches:'
- rs = d.query("['type'='Batch']")
- for item in rs:
- print ' %s' % item.name
- else:
- self.parser.error('No output_domain specified for service')
-
- def main(self):
- self.options, self.args = self.parser.parse_args()
- if self.options.help_commands:
- self.print_command_help()
- sys.exit(0)
- if len(self.args) != 2:
- self.parser.error("config_file and command are required")
- self.config_file = self.args[0]
- self.sd = ServiceDef(self.config_file)
- self.command = self.args[1]
- if hasattr(self, 'do_%s' % self.command):
- method = getattr(self, 'do_%s' % self.command)
- method()
- else:
- self.parser.error('command (%s) not recognized' % self.command)
-
-if __name__ == "__main__":
- bs = BS()
- bs.main()
diff --git a/boto/services/message.py b/boto/services/message.py
deleted file mode 100644
index 79f6d19f..00000000
--- a/boto/services/message.py
+++ /dev/null
@@ -1,58 +0,0 @@
-# Copyright (c) 2006,2007 Mitch Garnaat http://garnaat.org/
-#
-# Permission is hereby granted, free of charge, to any person obtaining a
-# copy of this software and associated documentation files (the
-# "Software"), to deal in the Software without restriction, including
-# without limitation the rights to use, copy, modify, merge, publish, dis-
-# tribute, sublicense, and/or sell copies of the Software, and to permit
-# persons to whom the Software is furnished to do so, subject to the fol-
-# lowing conditions:
-#
-# The above copyright notice and this permission notice shall be included
-# in all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
-# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
-# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
-# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
-# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
-# IN THE SOFTWARE.
-
-from boto.sqs.message import MHMessage
-from boto.utils import get_ts
-from socket import gethostname
-import os, mimetypes, time
-
-class ServiceMessage(MHMessage):
-
- def for_key(self, key, params=None, bucket_name=None):
- if params:
- self.update(params)
- if key.path:
- t = os.path.split(key.path)
- self['OriginalLocation'] = t[0]
- self['OriginalFileName'] = t[1]
- mime_type = mimetypes.guess_type(t[1])[0]
- if mime_type == None:
- mime_type = 'application/octet-stream'
- self['Content-Type'] = mime_type
- s = os.stat(key.path)
- t = time.gmtime(s[7])
- self['FileAccessedDate'] = get_ts(t)
- t = time.gmtime(s[8])
- self['FileModifiedDate'] = get_ts(t)
- t = time.gmtime(s[9])
- self['FileCreateDate'] = get_ts(t)
- else:
- self['OriginalFileName'] = key.name
- self['OriginalLocation'] = key.bucket.name
- self['ContentType'] = key.content_type
- self['Host'] = gethostname()
- if bucket_name:
- self['Bucket'] = bucket_name
- else:
- self['Bucket'] = key.bucket.name
- self['InputKey'] = key.name
- self['Size'] = key.size
-
diff --git a/boto/services/result.py b/boto/services/result.py
deleted file mode 100644
index 98c67dcf..00000000
--- a/boto/services/result.py
+++ /dev/null
@@ -1,136 +0,0 @@
-#!/usr/bin/env python
-# Copyright (c) 2006,2007 Mitch Garnaat http://garnaat.org/
-#
-# Permission is hereby granted, free of charge, to any person obtaining a
-# copy of this software and associated documentation files (the
-# "Software"), to deal in the Software without restriction, including
-# without limitation the rights to use, copy, modify, merge, publish, dis-
-# tribute, sublicense, and/or sell copies of the Software, and to permit
-# persons to whom the Software is furnished to do so, subject to the fol-
-# lowing conditions:
-#
-# The above copyright notice and this permission notice shall be included
-# in all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
-# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
-# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
-# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
-# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
-# IN THE SOFTWARE.
-
-import os
-from datetime import datetime, timedelta
-from boto.utils import parse_ts
-import boto
-
-class ResultProcessor:
-
- LogFileName = 'log.csv'
-
- def __init__(self, batch_name, sd, mimetype_files=None):
- self.sd = sd
- self.batch = batch_name
- self.log_fp = None
- self.num_files = 0
- self.total_time = 0
- self.min_time = timedelta.max
- self.max_time = timedelta.min
- self.earliest_time = datetime.max
- self.latest_time = datetime.min
- self.queue = self.sd.get_obj('output_queue')
- self.domain = self.sd.get_obj('output_domain')
-
- def calculate_stats(self, msg):
- start_time = parse_ts(msg['Service-Read'])
- end_time = parse_ts(msg['Service-Write'])
- elapsed_time = end_time - start_time
- if elapsed_time > self.max_time:
- self.max_time = elapsed_time
- if elapsed_time < self.min_time:
- self.min_time = elapsed_time
- self.total_time += elapsed_time.seconds
- if start_time < self.earliest_time:
- self.earliest_time = start_time
- if end_time > self.latest_time:
- self.latest_time = end_time
-
- def log_message(self, msg, path):
- keys = sorted(msg)
- if not self.log_fp:
- self.log_fp = open(os.path.join(path, self.LogFileName), 'a')
- line = ','.join(keys)
- self.log_fp.write(line+'\n')
- values = []
- for key in keys:
- value = msg[key]
- if value.find(',') > 0:
- value = '"%s"' % value
- values.append(value)
- line = ','.join(values)
- self.log_fp.write(line+'\n')
-
- def process_record(self, record, path, get_file=True):
- self.log_message(record, path)
- self.calculate_stats(record)
- outputs = record['OutputKey'].split(',')
- if 'OutputBucket' in record:
- bucket = boto.lookup('s3', record['OutputBucket'])
- else:
- bucket = boto.lookup('s3', record['Bucket'])
- for output in outputs:
- if get_file:
- key_name = output.split(';')[0]
- key = bucket.lookup(key_name)
- file_name = os.path.join(path, key_name)
- print 'retrieving file: %s to %s' % (key_name, file_name)
- key.get_contents_to_filename(file_name)
- self.num_files += 1
-
- def get_results_from_queue(self, path, get_file=True, delete_msg=True):
- m = self.queue.read()
- while m:
- if 'Batch' in m and m['Batch'] == self.batch:
- self.process_record(m, path, get_file)
- if delete_msg:
- self.queue.delete_message(m)
- m = self.queue.read()
-
- def get_results_from_domain(self, path, get_file=True):
- rs = self.domain.query("['Batch'='%s']" % self.batch)
- for item in rs:
- self.process_record(item, path, get_file)
-
- def get_results_from_bucket(self, path):
- bucket = self.sd.get_obj('output_bucket')
- if bucket:
- print 'No output queue or domain, just retrieving files from output_bucket'
- for key in bucket:
- file_name = os.path.join(path, key)
- print 'retrieving file: %s to %s' % (key, file_name)
- key.get_contents_to_filename(file_name)
- self.num_files + 1
-
- def get_results(self, path, get_file=True, delete_msg=True):
- if not os.path.isdir(path):
- os.mkdir(path)
- if self.queue:
- self.get_results_from_queue(path, get_file)
- elif self.domain:
- self.get_results_from_domain(path, get_file)
- else:
- self.get_results_from_bucket(path)
- if self.log_fp:
- self.log_fp.close()
- print '%d results successfully retrieved.' % self.num_files
- if self.num_files > 0:
- self.avg_time = float(self.total_time)/self.num_files
- print 'Minimum Processing Time: %d' % self.min_time.seconds
- print 'Maximum Processing Time: %d' % self.max_time.seconds
- print 'Average Processing Time: %f' % self.avg_time
- self.elapsed_time = self.latest_time-self.earliest_time
- print 'Elapsed Time: %d' % self.elapsed_time.seconds
- tput = 1.0 / ((self.elapsed_time.seconds/60.0) / self.num_files)
- print 'Throughput: %f transactions / minute' % tput
-
diff --git a/boto/services/service.py b/boto/services/service.py
deleted file mode 100644
index e0e987ce..00000000
--- a/boto/services/service.py
+++ /dev/null
@@ -1,161 +0,0 @@
-# Copyright (c) 2006,2007 Mitch Garnaat http://garnaat.org/
-#
-# Permission is hereby granted, free of charge, to any person obtaining a
-# copy of this software and associated documentation files (the
-# "Software"), to deal in the Software without restriction, including
-# without limitation the rights to use, copy, modify, merge, publish, dis-
-# tribute, sublicense, and/or sell copies of the Software, and to permit
-# persons to whom the Software is furnished to do so, subject to the fol-
-# lowing conditions:
-#
-# The above copyright notice and this permission notice shall be included
-# in all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
-# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
-# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
-# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
-# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
-# IN THE SOFTWARE.
-
-import boto
-from boto.services.message import ServiceMessage
-from boto.services.servicedef import ServiceDef
-from boto.pyami.scriptbase import ScriptBase
-from boto.utils import get_ts
-import time
-import os
-import mimetypes
-
-
-class Service(ScriptBase):
-
- # Time required to process a transaction
- ProcessingTime = 60
-
- def __init__(self, config_file=None, mimetype_files=None):
- ScriptBase.__init__(self, config_file)
- self.name = self.__class__.__name__
- self.working_dir = boto.config.get('Pyami', 'working_dir')
- self.sd = ServiceDef(config_file)
- self.retry_count = self.sd.getint('retry_count', 5)
- self.loop_delay = self.sd.getint('loop_delay', 30)
- self.processing_time = self.sd.getint('processing_time', 60)
- self.input_queue = self.sd.get_obj('input_queue')
- self.output_queue = self.sd.get_obj('output_queue')
- self.output_domain = self.sd.get_obj('output_domain')
- if mimetype_files:
- mimetypes.init(mimetype_files)
-
- def split_key(key):
- if key.find(';') < 0:
- t = (key, '')
- else:
- key, type = key.split(';')
- label, mtype = type.split('=')
- t = (key, mtype)
- return t
-
- def read_message(self):
- boto.log.info('read_message')
- message = self.input_queue.read(self.processing_time)
- if message:
- boto.log.info(message.get_body())
- key = 'Service-Read'
- message[key] = get_ts()
- return message
-
- # retrieve the source file from S3
- def get_file(self, message):
- bucket_name = message['Bucket']
- key_name = message['InputKey']
- file_name = os.path.join(self.working_dir, message.get('OriginalFileName', 'in_file'))
- boto.log.info('get_file: %s/%s to %s' % (bucket_name, key_name, file_name))
- bucket = boto.lookup('s3', bucket_name)
- key = bucket.new_key(key_name)
- key.get_contents_to_filename(os.path.join(self.working_dir, file_name))
- return file_name
-
- # process source file, return list of output files
- def process_file(self, in_file_name, msg):
- return []
-
- # store result file in S3
- def put_file(self, bucket_name, file_path, key_name=None):
- boto.log.info('putting file %s as %s.%s' % (file_path, bucket_name, key_name))
- bucket = boto.lookup('s3', bucket_name)
- key = bucket.new_key(key_name)
- key.set_contents_from_filename(file_path)
- return key
-
- def save_results(self, results, input_message, output_message):
- output_keys = []
- for file, type in results:
- if 'OutputBucket' in input_message:
- output_bucket = input_message['OutputBucket']
- else:
- output_bucket = input_message['Bucket']
- key_name = os.path.split(file)[1]
- key = self.put_file(output_bucket, file, key_name)
- output_keys.append('%s;type=%s' % (key.name, type))
- output_message['OutputKey'] = ','.join(output_keys)
-
- # write message to each output queue
- def write_message(self, message):
- message['Service-Write'] = get_ts()
- message['Server'] = self.name
- if 'HOSTNAME' in os.environ:
- message['Host'] = os.environ['HOSTNAME']
- else:
- message['Host'] = 'unknown'
- message['Instance-ID'] = self.instance_id
- if self.output_queue:
- boto.log.info('Writing message to SQS queue: %s' % self.output_queue.id)
- self.output_queue.write(message)
- if self.output_domain:
- boto.log.info('Writing message to SDB domain: %s' % self.output_domain.name)
- item_name = '/'.join([message['Service-Write'], message['Bucket'], message['InputKey']])
- self.output_domain.put_attributes(item_name, message)
-
- # delete message from input queue
- def delete_message(self, message):
- boto.log.info('deleting message from %s' % self.input_queue.id)
- self.input_queue.delete_message(message)
-
- # to clean up any files, etc. after each iteration
- def cleanup(self):
- pass
-
- def shutdown(self):
- on_completion = self.sd.get('on_completion', 'shutdown')
- if on_completion == 'shutdown':
- if self.instance_id:
- time.sleep(60)
- c = boto.connect_ec2()
- c.terminate_instances([self.instance_id])
-
- def main(self, notify=False):
- self.notify('Service: %s Starting' % self.name)
- empty_reads = 0
- while self.retry_count < 0 or empty_reads < self.retry_count:
- try:
- input_message = self.read_message()
- if input_message:
- empty_reads = 0
- output_message = ServiceMessage(None, input_message.get_body())
- input_file = self.get_file(input_message)
- results = self.process_file(input_file, output_message)
- self.save_results(results, input_message, output_message)
- self.write_message(output_message)
- self.delete_message(input_message)
- self.cleanup()
- else:
- empty_reads += 1
- time.sleep(self.loop_delay)
- except Exception:
- boto.log.exception('Service Failed')
- empty_reads += 1
- self.notify('Service: %s Shutting Down' % self.name)
- self.shutdown()
-
diff --git a/boto/services/servicedef.py b/boto/services/servicedef.py
deleted file mode 100644
index 1cb01aa7..00000000
--- a/boto/services/servicedef.py
+++ /dev/null
@@ -1,91 +0,0 @@
-# Copyright (c) 2006,2007 Mitch Garnaat http://garnaat.org/
-#
-# Permission is hereby granted, free of charge, to any person obtaining a
-# copy of this software and associated documentation files (the
-# "Software"), to deal in the Software without restriction, including
-# without limitation the rights to use, copy, modify, merge, publish, dis-
-# tribute, sublicense, and/or sell copies of the Software, and to permit
-# persons to whom the Software is furnished to do so, subject to the fol-
-# lowing conditions:
-#
-# The above copyright notice and this permission notice shall be included
-# in all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
-# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
-# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
-# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
-# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
-# IN THE SOFTWARE.
-
-from boto.pyami.config import Config
-from boto.services.message import ServiceMessage
-import boto
-
-class ServiceDef(Config):
-
- def __init__(self, config_file, aws_access_key_id=None, aws_secret_access_key=None):
- Config.__init__(self, config_file)
- self.aws_access_key_id = aws_access_key_id
- self.aws_secret_access_key = aws_secret_access_key
- script = Config.get(self, 'Pyami', 'scripts')
- if script:
- self.name = script.split('.')[-1]
- else:
- self.name = None
-
-
- def get(self, name, default=None):
- return Config.get(self, self.name, name, default)
-
- def has_option(self, option):
- return Config.has_option(self, self.name, option)
-
- def getint(self, option, default=0):
- try:
- val = Config.get(self, self.name, option)
- val = int(val)
- except:
- val = int(default)
- return val
-
- def getbool(self, option, default=False):
- try:
- val = Config.get(self, self.name, option)
- if val.lower() == 'true':
- val = True
- else:
- val = False
- except:
- val = default
- return val
-
- def get_obj(self, name):
- """
- Returns the AWS object associated with a given option.
-
- The heuristics used are a bit lame. If the option name contains
- the word 'bucket' it is assumed to be an S3 bucket, if the name
- contains the word 'queue' it is assumed to be an SQS queue and
- if it contains the word 'domain' it is assumed to be a SimpleDB
- domain. If the option name specified does not exist in the
- config file or if the AWS object cannot be retrieved this
- returns None.
- """
- val = self.get(name)
- if not val:
- return None
- if name.find('queue') >= 0:
- obj = boto.lookup('sqs', val)
- if obj:
- obj.set_message_class(ServiceMessage)
- elif name.find('bucket') >= 0:
- obj = boto.lookup('s3', val)
- elif name.find('domain') >= 0:
- obj = boto.lookup('sdb', val)
- else:
- obj = None
- return obj
-
-
diff --git a/boto/services/sonofmmm.cfg b/boto/services/sonofmmm.cfg
deleted file mode 100644
index d70d3794..00000000
--- a/boto/services/sonofmmm.cfg
+++ /dev/null
@@ -1,43 +0,0 @@
-#
-# Your AWS Credentials
-# You only need to supply these in this file if you are not using
-# the boto tools to start your service
-#
-#[Credentials]
-#aws_access_key_id = <AWS Access Key Here>
-#aws_secret_access_key = <AWS Secret Key Here>
-
-#
-# Fill out this section if you want emails from the service
-# when it starts and stops
-#
-#[Notification]
-#smtp_host = <your smtp host>
-#smtp_user = <your smtp username, if necessary>
-#smtp_pass = <your smtp password, if necessary>
-#smtp_from = <email address for From: field>
-#smtp_to = <email address for To: field>
-
-[Pyami]
-scripts = boto.services.sonofmmm.SonOfMMM
-
-[SonOfMMM]
-# id of the AMI to be launched
-ami_id = ami-dc799cb5
-# number of times service will read an empty queue before exiting
-# a negative value will cause the service to run forever
-retry_count = 5
-# seconds to wait after empty queue read before reading again
-loop_delay = 10
-# average time it takes to process a transaction
-# controls invisibility timeout of messages
-processing_time = 60
-ffmpeg_args = -y -i %%s -f mov -r 29.97 -b 1200kb -mbd 2 -flags +4mv+trell -aic 2 -cmp 2 -subcmp 2 -ar 48000 -ab 19200 -s 320x240 -vcodec mpeg4 -acodec libfaac %%s
-output_mimetype = video/quicktime
-output_ext = .mov
-input_bucket = <S3 bucket where source videos live>
-output_bucket = <S3 bucket where converted videos should be stored>
-output_domain = <SimpleDB domain to store results - optional>
-output_queue = <SQS queue to store results - optional>
-input_queue = <SQS queue where work to be done will be queued up>
-
diff --git a/boto/services/sonofmmm.py b/boto/services/sonofmmm.py
deleted file mode 100644
index acb7e610..00000000
--- a/boto/services/sonofmmm.py
+++ /dev/null
@@ -1,81 +0,0 @@
-# Copyright (c) 2006,2007 Mitch Garnaat http://garnaat.org/
-#
-# Permission is hereby granted, free of charge, to any person obtaining a
-# copy of this software and associated documentation files (the
-# "Software"), to deal in the Software without restriction, including
-# without limitation the rights to use, copy, modify, merge, publish, dis-
-# tribute, sublicense, and/or sell copies of the Software, and to permit
-# persons to whom the Software is furnished to do so, subject to the fol-
-# lowing conditions:
-#
-# The above copyright notice and this permission notice shall be included
-# in all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
-# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
-# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
-# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
-# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
-# IN THE SOFTWARE.
-
-import boto
-from boto.services.service import Service
-from boto.services.message import ServiceMessage
-import os
-import mimetypes
-
-class SonOfMMM(Service):
-
- def __init__(self, config_file=None):
- Service.__init__(self, config_file)
- self.log_file = '%s.log' % self.instance_id
- self.log_path = os.path.join(self.working_dir, self.log_file)
- boto.set_file_logger(self.name, self.log_path)
- if self.sd.has_option('ffmpeg_args'):
- self.command = '/usr/local/bin/ffmpeg ' + self.sd.get('ffmpeg_args')
- else:
- self.command = '/usr/local/bin/ffmpeg -y -i %s %s'
- self.output_mimetype = self.sd.get('output_mimetype')
- if self.sd.has_option('output_ext'):
- self.output_ext = self.sd.get('output_ext')
- else:
- self.output_ext = mimetypes.guess_extension(self.output_mimetype)
- self.output_bucket = self.sd.get_obj('output_bucket')
- self.input_bucket = self.sd.get_obj('input_bucket')
- # check to see if there are any messages queue
- # if not, create messages for all files in input_bucket
- m = self.input_queue.read(1)
- if not m:
- self.queue_files()
-
- def queue_files(self):
- boto.log.info('Queueing files from %s' % self.input_bucket.name)
- for key in self.input_bucket:
- boto.log.info('Queueing %s' % key.name)
- m = ServiceMessage()
- if self.output_bucket:
- d = {'OutputBucket' : self.output_bucket.name}
- else:
- d = None
- m.for_key(key, d)
- self.input_queue.write(m)
-
- def process_file(self, in_file_name, msg):
- base, ext = os.path.splitext(in_file_name)
- out_file_name = os.path.join(self.working_dir,
- base+self.output_ext)
- command = self.command % (in_file_name, out_file_name)
- boto.log.info('running:\n%s' % command)
- status = self.run(command)
- if status == 0:
- return [(out_file_name, self.output_mimetype)]
- else:
- return []
-
- def shutdown(self):
- if os.path.isfile(self.log_path):
- if self.output_bucket:
- key = self.output_bucket.new_key(self.log_file)
- key.set_contents_from_filename(self.log_path)
- Service.shutdown(self)
diff --git a/boto/services/submit.py b/boto/services/submit.py
deleted file mode 100644
index 89c439c5..00000000
--- a/boto/services/submit.py
+++ /dev/null
@@ -1,88 +0,0 @@
-# Copyright (c) 2006,2007 Mitch Garnaat http://garnaat.org/
-#
-# Permission is hereby granted, free of charge, to any person obtaining a
-# copy of this software and associated documentation files (the
-# "Software"), to deal in the Software without restriction, including
-# without limitation the rights to use, copy, modify, merge, publish, dis-
-# tribute, sublicense, and/or sell copies of the Software, and to permit
-# persons to whom the Software is furnished to do so, subject to the fol-
-# lowing conditions:
-#
-# The above copyright notice and this permission notice shall be included
-# in all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
-# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
-# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
-# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
-# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
-# IN THE SOFTWARE.
-
-import time
-import os
-
-
-class Submitter:
-
- def __init__(self, sd):
- self.sd = sd
- self.input_bucket = self.sd.get_obj('input_bucket')
- self.output_bucket = self.sd.get_obj('output_bucket')
- self.output_domain = self.sd.get_obj('output_domain')
- self.queue = self.sd.get_obj('input_queue')
-
- def get_key_name(self, fullpath, prefix):
- key_name = fullpath[len(prefix):]
- l = key_name.split(os.sep)
- return '/'.join(l)
-
- def write_message(self, key, metadata):
- if self.queue:
- m = self.queue.new_message()
- m.for_key(key, metadata)
- if self.output_bucket:
- m['OutputBucket'] = self.output_bucket.name
- self.queue.write(m)
-
- def submit_file(self, path, metadata=None, cb=None, num_cb=0, prefix='/'):
- if not metadata:
- metadata = {}
- key_name = self.get_key_name(path, prefix)
- k = self.input_bucket.new_key(key_name)
- k.update_metadata(metadata)
- k.set_contents_from_filename(path, replace=False, cb=cb, num_cb=num_cb)
- self.write_message(k, metadata)
-
- def submit_path(self, path, tags=None, ignore_dirs=None, cb=None, num_cb=0, status=False, prefix='/'):
- path = os.path.expanduser(path)
- path = os.path.expandvars(path)
- path = os.path.abspath(path)
- total = 0
- metadata = {}
- if tags:
- metadata['Tags'] = tags
- l = []
- for t in time.gmtime():
- l.append(str(t))
- metadata['Batch'] = '_'.join(l)
- if self.output_domain:
- self.output_domain.put_attributes(metadata['Batch'], {'type' : 'Batch'})
- if os.path.isdir(path):
- for root, dirs, files in os.walk(path):
- if ignore_dirs:
- for ignore in ignore_dirs:
- if ignore in dirs:
- dirs.remove(ignore)
- for file in files:
- fullpath = os.path.join(root, file)
- if status:
- print 'Submitting %s' % fullpath
- self.submit_file(fullpath, metadata, cb, num_cb, prefix)
- total += 1
- elif os.path.isfile(path):
- self.submit_file(path, metadata, cb, num_cb)
- total += 1
- else:
- print 'problem with %s' % path
- return (metadata['Batch'], total)