diff options
author | Mitch Garnaat <mitch@cloudright.com> | 2010-08-22 14:41:24 -0400 |
---|---|---|
committer | Mitch Garnaat <mitch@cloudright.com> | 2010-08-22 14:41:24 -0400 |
commit | 49eb7df1a13454ae1e54661a98dfaeaedda6ad81 (patch) | |
tree | 58eb158b93903e6deef31b91d00c45de19f1adbb | |
parent | c4f8050f3fa8172307ddf84f376ca800f47c728c (diff) | |
download | boto-49eb7df1a13454ae1e54661a98dfaeaedda6ad81.tar.gz |
Removing this module. It's obsolete and gets confused with emr.
-rw-r--r-- | boto/mapreduce/__init__.py | 23 | ||||
-rw-r--r-- | boto/mapreduce/lqs.py | 152 | ||||
-rw-r--r-- | boto/mapreduce/partitiondb.py | 175 | ||||
-rw-r--r-- | boto/mapreduce/pdb_delete | 135 | ||||
-rwxr-xr-x | boto/mapreduce/pdb_describe | 124 | ||||
-rwxr-xr-x | boto/mapreduce/pdb_revert | 135 | ||||
-rwxr-xr-x | boto/mapreduce/pdb_upload | 172 | ||||
-rw-r--r-- | boto/mapreduce/queuetools.py | 66 |
8 files changed, 0 insertions, 982 deletions
diff --git a/boto/mapreduce/__init__.py b/boto/mapreduce/__init__.py deleted file mode 100644 index ac3ddc44..00000000 --- a/boto/mapreduce/__init__.py +++ /dev/null @@ -1,23 +0,0 @@ -# Copyright (c) 2006-2008 Mitch Garnaat http://garnaat.org/ -# -# Permission is hereby granted, free of charge, to any person obtaining a -# copy of this software and associated documentation files (the -# "Software"), to deal in the Software without restriction, including -# without limitation the rights to use, copy, modify, merge, publish, dis- -# tribute, sublicense, and/or sell copies of the Software, and to permit -# persons to whom the Software is furnished to do so, subject to the fol- -# lowing conditions: -# -# The above copyright notice and this permission notice shall be included -# in all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- -# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT -# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, -# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS -# IN THE SOFTWARE. 
-# - - diff --git a/boto/mapreduce/lqs.py b/boto/mapreduce/lqs.py deleted file mode 100644 index fc76e50f..00000000 --- a/boto/mapreduce/lqs.py +++ /dev/null @@ -1,152 +0,0 @@ -import SocketServer, os, datetime, sys, random, time -import simplejson - -class LQSCommand: - - def __init__(self, line): - self.raw_line = line - self.line = self.raw_line.strip() - l = self.line.split(' ') - self.name = l[0] - if len(l) > 1: - self.args = [arg for arg in l[1:] if arg] - else: - self.args = [] - -class LQSMessage(dict): - - def __init__(self, item=None, args=None, jsonvalue=None): - dict.__init__(self) - if jsonvalue: - self.decode(jsonvalue) - else: - self['id'] = '%d_%d' % (int(time.time()), int(random.random()*1000000)) - self['item'] = item - self['args'] = args - - def encode(self): - return simplejson.dumps(self) - - def decode(self, value): - self.update(simplejson.loads(value)) - - def is_empty(self): - if self['item'] == None: - return True - return False - -class LQSServer(SocketServer.UDPServer): - - PORT = 5151 - TIMEOUT = 30 - MAXSIZE = 8192 - - def __init__(self, server_address, RequestHandlerClass, iterator, args=None): - server_address = (server_address, self.PORT) - SocketServer.UDPServer.__init__(self, server_address, RequestHandlerClass) - self.count = 0 - self.iterator = iterator - self.args = args - self.start = datetime.datetime.now() - self.end = None - self.extant = [] - -class LQSHandler(SocketServer.DatagramRequestHandler): - - def get_cmd(self): - return LQSCommand(self.rfile.readline()) - - def build_msg(self): - if not self.server.iterator: - return LQSMessage(None) - try: - item = self.server.iterator.next() - msg = LQSMessage(item, self.server.args) - return msg - except StopIteration: - self.server.iterator = None - return LQSMessage(None) - - def respond(self, msg): - self.wfile.write(msg.encode()) - - def check_extant(self): - if len(self.server.extant) == 0 and not self.server.iterator: - self.server.end = datetime.datetime.now() - delta 
= self.server.end - self.server.start - print 'Total Processing Time: %s' % delta - print 'Total Messages Processed: %d' % self.server.count - - def do_debug(self, cmd): - args = {'extant' : self.server.extant, - 'count' : self.server.count} - msg = LQSMessage('debug', args) - self.respond(msg) - - def do_next(self, cmd): - out_msg = self.build_msg() - if not out_msg.is_empty(): - self.server.count += 1 - self.server.extant.append(out_msg['id']) - self.respond(out_msg) - - def do_delete(self, cmd): - if len(cmd.args) != 1: - self.error(cmd, 'delete command requires message id') - else: - mid = cmd.args[0] - try: - self.server.extant.remove(mid) - except ValueError: - self.error(cmd, 'message id not found') - args = {'deleted' : True} - msg = LQSMessage(mid, args) - self.respond(msg) - self.check_extant() - - def error(self, cmd, error_msg=None): - args = {'error_msg' : error_msg, - 'cmd_name' : cmd.name, - 'cmd_args' : cmd.args} - msg = LQSMessage('error', args) - self.respond(msg) - - def do_stop(self, cmd): - sys.exit(0) - - def handle(self): - cmd = self.get_cmd() - if hasattr(self, 'do_%s' % cmd.name): - method = getattr(self, 'do_%s' % cmd.name) - method(cmd) - else: - self.error(cmd, 'unrecognized command') - -class PersistHandler(LQSHandler): - - def build_msg(self): - if not self.server.iterator: - return LQSMessage(None) - try: - obj = self.server.iterator.next() - msg = LQSMessage(obj.id, self.server.args) - return msg - except StopIteration: - self.server.iterator = None - return LQSMessage(None) - -def test_file(path, args=None): - l = os.listdir(path) - if not args: - args = {} - args['path'] = path - s = LQSServer('', LQSHandler, iter(l), args) - print "Awaiting UDP messages on port %d" % s.PORT - s.serve_forever() - -def test_simple(n): - l = range(0, n) - s = LQSServer('', LQSHandler, iter(l), None) - print "Awaiting UDP messages on port %d" % s.PORT - s.serve_forever() - diff --git a/boto/mapreduce/partitiondb.py b/boto/mapreduce/partitiondb.py 
deleted file mode 100644 index 25cf1353..00000000 --- a/boto/mapreduce/partitiondb.py +++ /dev/null @@ -1,175 +0,0 @@ -# Copyright (c) 2006-2008 Mitch Garnaat http://garnaat.org/ -# -# Permission is hereby granted, free of charge, to any person obtaining a -# copy of this software and associated documentation files (the -# "Software"), to deal in the Software without restriction, including -# without limitation the rights to use, copy, modify, merge, publish, dis- -# tribute, sublicense, and/or sell copies of the Software, and to permit -# persons to whom the Software is furnished to do so, subject to the fol- -# lowing conditions: -# -# The above copyright notice and this permission notice shall be included -# in all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- -# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT -# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, -# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS -# IN THE SOFTWARE. 
-# - -import random -import os -import datetime -from boto.sdb.persist.object import SDBObject -from boto.sdb.persist.property import StringProperty, ObjectProperty, DateTimeProperty, ObjectListProperty, S3KeyProperty - - -HEX_DIGITS = '0123456789abcdef' - -class Identifier(object): - - @staticmethod - def gen(prefix): - suffix = '' - for i in range(0,8): - suffix += random.choice(HEX_DIGITS) - return prefix + '-' + suffix - - -class Version(SDBObject): - - name = StringProperty() - pdb = ObjectProperty(ref_class=SDBObject) - date = DateTimeProperty() - - def __init__(self, id=None, manager=None): - SDBObject.__init__(self, id, manager) - if id == None: - self.name = Identifier.gen('v') - self.date = datetime.datetime.now() - print 'created Version %s' % self.name - - def partitions(self): - """ - Return an iterator containing all Partition objects related to this Version. - - :rtype: iterator of :class:`boto.mapreduce.partitiondb.Partition` - :return: The Partitions in this Version - """ - return self.get_related_objects('version', Partition) - - def add_partition(self, name=None): - """ - Add a new Partition to this Version. 
- - :type name: string - :param name: The name of the new Partition (optional) - - :rtype: :class:`boto.mapreduce.partitiondb.Partition` - :return: The new Partition object - """ - p = Partition(manager=self.manager, name=name) - p.version = self - p.pdb = self.pdb - p.save() - return p - - def get_s3_prefix(self): - if not self.pdb: - raise ValueError, 'pdb attribute must be set to compute S3 prefix' - return self.pdb.get_s3_prefix() + self.name + '/' - -class PartitionDB(SDBObject): - - name = StringProperty() - bucket_name = StringProperty() - versions = ObjectListProperty(ref_class=Version) - - def __init__(self, id=None, manager=None, name='', bucket_name=''): - SDBObject.__init__(self, id, manager) - if id == None: - self.name = name - self.bucket_name = bucket_name - - def get_s3_prefix(self): - return self.name + '/' - - def add_version(self): - """ - Add a new Version to this PartitionDB. The newly added version becomes the - current version. - - :rtype: :class:`boto.mapreduce.partitiondb.Version` - :return: The newly created Version object. - """ - v = Version() - v.pdb = self - v.save() - self.versions.append(v) - return v - - def revert(self): - """ - Revert to the previous version of this PartitionDB. The current version is removed from the - list of Versions and the Version immediately preceeding it becomes the current version. - Note that this method does not delete the Version object or any Partitions related to the - Version object. - - :rtype: :class:`boto.mapreduce.partitiondb.Version` - :return: The previous current Version object. - """ - v = self.current_version() - if v: - self.versions.remove(v) - return v - - def current_version(self): - """ - Get the currently active Version of this PartitionDB object. - - :rtype: :class:`boto.mapreduce.partitiondb.Version` - :return: The current Version object or None if there are no Versions associated - with this PartitionDB object. 
- """ - if self.versions: - if len(self.versions) > 0: - return self.versions[-1] - return None - -class Partition(SDBObject): - - def __init__(self, id=None, manager=None, name=None): - SDBObject.__init__(self, id, manager) - if id == None: - self.name = name - - name = StringProperty() - version = ObjectProperty(ref_class=Version) - pdb = ObjectProperty(ref_class=PartitionDB) - data = S3KeyProperty() - - def get_key_name(self): - return self.version.get_s3_prefix() + self.name - - def upload(self, path, bucket_name=None): - if not bucket_name: - bucket_name = self.version.pdb.bucket_name - s3 = self.manager.get_s3_connection() - bucket = s3.lookup(bucket_name) - directory, filename = os.path.split(path) - self.name = filename - key = bucket.new_key(self.get_key_name()) - key.set_contents_from_filename(path) - self.data = key - self.save() - - def delete(self): - if self.data: - self.data.delete() - SDBObject.delete(self) - - - diff --git a/boto/mapreduce/pdb_delete b/boto/mapreduce/pdb_delete deleted file mode 100644 index b7af9cc0..00000000 --- a/boto/mapreduce/pdb_delete +++ /dev/null @@ -1,135 +0,0 @@ -#!/usr/bin/env python -# Copyright (c) 2006-2008 Mitch Garnaat http://garnaat.org/ -# -# Permission is hereby granted, free of charge, to any person obtaining a -# copy of this software and associated documentation files (the -# "Software"), to deal in the Software without restriction, including -# without limitation the rights to use, copy, modify, merge, publish, dis- -# tribute, sublicense, and/or sell copies of the Software, and to permit -# persons to whom the Software is furnished to do so, subject to the fol- -# lowing conditions: -# -# The above copyright notice and this permission notice shall be included -# in all copies or substantial portions of the Software. 
-# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- -# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT -# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, -# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS -# IN THE SOFTWARE. -import queuetools, os, signal, sys -import subprocess -import time -from optparse import OptionParser -from boto.mapreduce.partitiondb import PartitionDB, Partition, Version -from lqs import LQSServer, PersistHandler -from boto.exception import SDBPersistenceError -from boto.sdb.persist import get_manager, revive_object_from_id - -USAGE = """ - SYNOPSIS - %prog [options] [command] - DESCRIPTION - Delete a PartitionDB and all related data in SimpleDB and S3. -""" -class Client: - - def __init__(self, queue_name): - self.q = queuetools.get_queue(queue_name) - self.q.connect() - self.manager = get_manager() - self.process() - - def process(self): - m = self.q.get() - while m['item']: - print 'Deleting: %s' % m['item'] - obj = revive_object_from_id(m['item'], manager=self.manager) - obj.delete() - self.q.delete(m) - m = self.q.get() - print 'client processing complete' - -class Server: - - def __init__(self, pdb_name, domain_name=None): - self.pdb_name = pdb_name - self.manager = get_manager(domain_name) - self.pdb = PartitionDB.get(name=self.pdb_name) - self.serve() - - def serve(self): - args = {'pdb_id' : self.pdb.id} - rs = self.pdb.get_related_objects('pdb') - self.pdb.delete() - s = LQSServer('', PersistHandler, rs, args) - s.serve_forever() - -class Delete: - - Commands = {'client' : 'Start a Delete client', - 'server' : 'Start a Delete server'} - - def __init__(self): - self.parser = OptionParser(usage=USAGE) - self.parser.add_option("--help-commands", action="store_true", dest="help_commands", - 
help="provides help on the available commands") - self.parser.add_option('-d', '--domain-name', action='store', type='string', - help='name of the SimpleDB domain where PDB objects are stored') - self.parser.add_option('-n', '--num-processes', action='store', type='int', dest='num_processes', - help='the number of client processes launched') - self.parser.set_defaults(num_processes=5) - self.parser.add_option('-p', '--pdb-name', action='store', type='string', - help='name of the PDB in which to store files (will create if necessary)') - self.options, self.args = self.parser.parse_args() - self.prog_name = sys.argv[0] - - def print_command_help(self): - print '\nCommands:' - for key in self.Commands.keys(): - print ' %s\t\t%s' % (key, self.Commands[key]) - - def do_server(self): - if not self.options.pdb_name: - self.parser.error('No PDB name provided') - s = Server(self.options.pdb_name, self.options.domain_name) - - def do_client(self): - c = Client('localhost') - - def main(self): - if self.options.help_commands: - self.print_command_help() - sys.exit(0) - if len(self.args) == 0: - if not self.options.pdb_name: - self.parser.error('No PDB name provided') - server_command = '%s -p %s ' % (self.prog_name, self.options.pdb_name) - server_command += ' server' - client_command = '%s client' % self.prog_name - server = subprocess.Popen(server_command, shell=True) - print 'server pid: %s' % server.pid - time.sleep(5) - clients = [] - for i in range(0, self.options.num_processes): - client = subprocess.Popen(client_command, shell=True) - clients.append(client) - print 'waiting for clients to finish' - for client in clients: - client.wait() - os.kill(server.pid, signal.SIGTERM) - elif len(self.args) == 1: - self.command = self.args[0] - if hasattr(self, 'do_%s' % self.command): - method = getattr(self, 'do_%s' % self.command) - method() - else: - self.parser.error('command (%s) not recognized' % self.command) - else: - self.parser.error('unrecognized commands') - -if 
__name__ == "__main__": - delete = Delete() - delete.main() diff --git a/boto/mapreduce/pdb_describe b/boto/mapreduce/pdb_describe deleted file mode 100755 index d0fa86c6..00000000 --- a/boto/mapreduce/pdb_describe +++ /dev/null @@ -1,124 +0,0 @@ -#!/usr/bin/env python -# Copyright (c) 2006-2008 Mitch Garnaat http://garnaat.org/ -# -# Permission is hereby granted, free of charge, to any person obtaining a -# copy of this software and associated documentation files (the -# "Software"), to deal in the Software without restriction, including -# without limitation the rights to use, copy, modify, merge, publish, dis- -# tribute, sublicense, and/or sell copies of the Software, and to permit -# persons to whom the Software is furnished to do so, subject to the fol- -# lowing conditions: -# -# The above copyright notice and this permission notice shall be included -# in all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- -# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT -# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, -# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS -# IN THE SOFTWARE. -import sys -from optparse import OptionParser -from boto.mapreduce.partitiondb import PartitionDB, Partition, Version -from boto.exception import SDBPersistenceError -from boto.sdb.persist import get_manager, get_domain - -USAGE = """ - SYNOPSIS - %prog [options] - DESCRIPTION - List and describe your PartitionDBs. - Called with no options, all PartitionDB objects defined in your default - domain (as specified in the "default_domain" option in the "[Persist]" - section of your boto config file) will be listed. 
- When called with a particular PartitionDB name (using -p option) all - Version objects of that PartitionDB object will be listed. - When called with the -p option and a particular Version name specified - (using the -v option) all Partitions in that Version object will be listed. -""" -class Describe: - - def __init__(self): - self.parser = OptionParser(usage=USAGE) - self.parser.add_option('-d', '--domain-name', action='store', type='string', - help='name of the SimpleDB domain where PDB objects are stored') - self.parser.add_option('-n', '--num-entries', action='store', type='int', - help='maximum number of entries to print (default 100)') - self.parser.set_defaults(num_entries=100) - self.parser.add_option('-p', '--pdb-name', action='store', type='string', - help='name of the PDB to describe') - self.parser.add_option('-v', '--version-name', action='store', type='string', - help='name of the PDB Version to describe') - self.options, self.args = self.parser.parse_args() - self.prog_name = sys.argv[0] - - def describe_all(self): - print 'Using SimpleDB Domain: %s' % get_domain() - print 'PDBs:' - rs = PartitionDB.list() - i = 0 - for pdb in rs: - print '%s\t%s\t%s' % (pdb.id, pdb.name, pdb.bucket_name) - i += 1 - if i == self.options.num_entries: - break - - def describe_pdb(self, pdb_name): - print 'Using SimpleDB Domain: %s' % get_domain() - print 'PDB: %s' % pdb_name - print 'Versions:' - try: - pdb = PartitionDB.get(name=pdb_name) - i = 0 - for v in pdb.versions: - if v.date: - ds = v.date.isoformat() - else: - ds = 'unknown' - print '%s\t%s\t%s' % (v.id, v.name, ds) - i += 1 - if i == self.options.num_entries: - break - cv = pdb.current_version() - if cv: - print 'Current Version: %s' % cv.name - else: - print 'Current Version: None' - except SDBPersistenceError: - self.parser.error('pdb_name (%s) unknown' % pdb_name) - - def describe_version(self, pdb_name, version_name): - print 'Using SimpleDB Domain: %s' % get_domain() - print 'PDB: %s' % pdb_name - 
print 'Version: %s' % version_name - print 'Partitions:' - try: - pdb = PartitionDB.get(name=pdb_name) - for v in pdb.versions: - if v.name == version_name: - i = 0 - for p in v.partitions(): - print '%s\t%s' % (p.id, p.name) - i += 1 - if i == self.options.num_entries: - break - except SDBPersistenceError: - self.parser.error('pdb_name (%s) unknown' % pdb_name) - - def main(self): - self.options, self.args = self.parser.parse_args() - self.manager = get_manager(self.options.domain_name) - - if self.options.pdb_name: - if self.options.version_name: - self.describe_version(self.options.pdb_name, self.options.version_name) - else: - self.describe_pdb(self.options.pdb_name) - else: - self.describe_all() - -if __name__ == "__main__": - describe = Describe() - describe.main() diff --git a/boto/mapreduce/pdb_revert b/boto/mapreduce/pdb_revert deleted file mode 100755 index daffeef1..00000000 --- a/boto/mapreduce/pdb_revert +++ /dev/null @@ -1,135 +0,0 @@ -#!/usr/bin/env python -# Copyright (c) 2006-2008 Mitch Garnaat http://garnaat.org/ -# -# Permission is hereby granted, free of charge, to any person obtaining a -# copy of this software and associated documentation files (the -# "Software"), to deal in the Software without restriction, including -# without limitation the rights to use, copy, modify, merge, publish, dis- -# tribute, sublicense, and/or sell copies of the Software, and to permit -# persons to whom the Software is furnished to do so, subject to the fol- -# lowing conditions: -# -# The above copyright notice and this permission notice shall be included -# in all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- -# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT -# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, -# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS -# IN THE SOFTWARE. -import queuetools, os, signal, sys -import subprocess -import time -from optparse import OptionParser -from boto.mapreduce.partitiondb import PartitionDB, Partition, Version -from lqs import LQSServer, PersistHandler -from boto.exception import SDBPersistenceError -from boto.sdb.persist import get_manager - -USAGE = """ - SYNOPSIS - %prog [options] [command] - DESCRIPTION - Revert to the previous Version in a PartitionDB. -""" -class Client: - - def __init__(self, queue_name): - self.q = queuetools.get_queue(queue_name) - self.q.connect() - self.manager = get_manager() - self.process() - - def process(self): - m = self.q.get() - while m['item']: - print 'Deleting: %s' % m['item'] - p = Partition(id=m['item'], manager=self.manager) - p.delete() - self.q.delete(m) - m = self.q.get() - print 'client processing complete' - -class Server: - - def __init__(self, pdb_name, domain_name=None): - self.pdb_name = pdb_name - self.manager = get_manager(domain_name) - self.pdb = PartitionDB.get(name=self.pdb_name) - self.serve() - - def serve(self): - v = self.pdb.revert() - args = {'v_id' : v.id} - rs = v.partitions() - s = LQSServer('', PersistHandler, rs, args) - s.serve_forever() - -class Revert: - - Commands = {'client' : 'Start a Revert client', - 'server' : 'Start a Revert server'} - - def __init__(self): - self.parser = OptionParser(usage=USAGE) - self.parser.add_option("--help-commands", action="store_true", dest="help_commands", - help="provides help on the available commands") - self.parser.add_option('-d', '--domain-name', action='store', type='string', - help='name of the SimpleDB domain where PDB objects are stored') - self.parser.add_option('-n', '--num-processes', action='store', type='int', dest='num_processes', - 
help='the number of client processes launched') - self.parser.set_defaults(num_processes=5) - self.parser.add_option('-p', '--pdb-name', action='store', type='string', - help='name of the PDB in which to store files (will create if necessary)') - self.options, self.args = self.parser.parse_args() - self.prog_name = sys.argv[0] - - def print_command_help(self): - print '\nCommands:' - for key in self.Commands.keys(): - print ' %s\t\t%s' % (key, self.Commands[key]) - - def do_server(self): - if not self.options.pdb_name: - self.parser.error('No PDB name provided') - s = Server(self.options.pdb_name, self.options.domain_name) - - def do_client(self): - c = Client('localhost') - - def main(self): - if self.options.help_commands: - self.print_command_help() - sys.exit(0) - if len(self.args) == 0: - if not self.options.pdb_name: - self.parser.error('No PDB name provided') - server_command = '%s -p %s ' % (self.prog_name, self.options.pdb_name) - server_command += ' server' - client_command = '%s client' % self.prog_name - server = subprocess.Popen(server_command, shell=True) - print 'server pid: %s' % server.pid - time.sleep(5) - clients = [] - for i in range(0, self.options.num_processes): - client = subprocess.Popen(client_command, shell=True) - clients.append(client) - print 'waiting for clients to finish' - for client in clients: - client.wait() - os.kill(server.pid, signal.SIGTERM) - elif len(self.args) == 1: - self.command = self.args[0] - if hasattr(self, 'do_%s' % self.command): - method = getattr(self, 'do_%s' % self.command) - method() - else: - self.parser.error('command (%s) not recognized' % self.command) - else: - self.parser.error('unrecognized commands') - -if __name__ == "__main__": - revert = Revert() - revert.main() diff --git a/boto/mapreduce/pdb_upload b/boto/mapreduce/pdb_upload deleted file mode 100755 index 1ca2b6d3..00000000 --- a/boto/mapreduce/pdb_upload +++ /dev/null @@ -1,172 +0,0 @@ -#!/usr/bin/env python -# Copyright (c) 2006-2008 Mitch 
Garnaat http://garnaat.org/ -# -# Permission is hereby granted, free of charge, to any person obtaining a -# copy of this software and associated documentation files (the -# "Software"), to deal in the Software without restriction, including -# without limitation the rights to use, copy, modify, merge, publish, dis- -# tribute, sublicense, and/or sell copies of the Software, and to permit -# persons to whom the Software is furnished to do so, subject to the fol- -# lowing conditions: -# -# The above copyright notice and this permission notice shall be included -# in all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- -# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT -# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, -# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS -# IN THE SOFTWARE. -import queuetools, os, signal, sys -import subprocess -import time -from optparse import OptionParser -from boto.mapreduce.partitiondb import PartitionDB, Partition, Version -from lqs import LQSServer, LQSHandler -from boto.exception import SDBPersistenceError -from boto.sdb.persist import get_manager - -USAGE = """ - SYNOPSIS - %prog [options] - DESCRIPTION - Upload partition files to a PartitionDB. - Called with no options, all PartitionDB objects defined in your default - domain (as specified in the "default_domain" option in the "[Persist]" - section of your boto config file) will be listed. - When called with a particular PartitionDB name (using -p option) all - Version objects of that PartitionDB object will be listed. - When called with the -p option and a particular Version name specified - (using the -v option) all Partitions in that Version object will be listed. 
-""" -class Client: - - def __init__(self, queue_name): - self.q = queuetools.get_queue(queue_name) - self.q.connect() - self.manager = get_manager() - self.process() - - def process(self): - m = self.q.get() - if m['item']: - v = Version(m['args']['v_id'], self.manager) - bucket_name = v.pdb.bucket_name - while m['item']: - print 'Uploading: %s' % m['item'] - p = v.add_partition(name=m['item']) - p.upload(os.path.join(m['args']['path'], m['item']), bucket_name) - self.q.delete(m) - m = self.q.get() - print 'client processing complete' - -class Server: - - def __init__(self, path, pdb_name, bucket_name=None, domain_name=None): - self.path = path - self.pdb_name = pdb_name - self.bucket_name = bucket_name - self.manager = get_manager(domain_name) - self.get_pdb() - self.serve() - - def get_pdb(self): - try: - self.pdb = PartitionDB.get(name=self.pdb_name) - except SDBPersistenceError: - self.pdb = PartitionDB(manager=self.manager, name=self.pdb_name, bucket_name=self.bucket_name) - self.pdb.save() - - def serve(self): - v = self.pdb.add_version() - args = {'path' : self.path, - 'v_id' : v.id} - l = os.listdir(self.path) - s = LQSServer('', LQSHandler, iter(l), args) - s.serve_forever() - -class Upload: - - Usage = "usage: %prog [options] command" - - Commands = {'client' : 'Start an Upload client', - 'server' : 'Start an Upload server'} - - def __init__(self): - self.parser = OptionParser(usage=self.Usage) - self.parser.add_option("--help-commands", action="store_true", dest="help_commands", - help="provides help on the available commands") - self.parser.add_option('-d', '--domain-name', action='store', type='string', - help='name of the SimpleDB domain where PDB objects are stored') - self.parser.add_option('-n', '--num-processes', action='store', type='int', dest='num_processes', - help='the number of client processes launched') - self.parser.set_defaults(num_processes=2) - self.parser.add_option('-i', '--input-path', action='store', type='string', - help='the 
path to directory to upload') - self.parser.add_option('-p', '--pdb-name', action='store', type='string', - help='name of the PDB in which to store files (will create if necessary)') - self.parser.add_option('-b', '--bucket-name', action='store', type='string', - help='name of S3 bucket (only needed if creating new PDB)') - self.options, self.args = self.parser.parse_args() - self.prog_name = sys.argv[0] - - def print_command_help(self): - print '\nCommands:' - for key in self.Commands.keys(): - print ' %s\t\t%s' % (key, self.Commands[key]) - - def do_server(self): - if not self.options.input_path: - self.parser.error('No path provided') - if not os.path.isdir(self.options.input_path): - self.parser.error('Invalid path (%s)' % self.options.input_path) - if not self.options.pdb_name: - self.parser.error('No PDB name provided') - s = Server(self.options.input_path, self.options.pdb_name, - self.options.bucket_name, self.options.domain_name) - - def do_client(self): - c = Client('localhost') - - def main(self): - if self.options.help_commands: - self.print_command_help() - sys.exit(0) - if len(self.args) == 0: - if not self.options.input_path: - self.parser.error('No path provided') - if not os.path.isdir(self.options.input_path): - self.parser.error('Invalid path (%s)' % self.options.input_path) - if not self.options.pdb_name: - self.parser.error('No PDB name provided') - server_command = '%s -p %s -i %s' % (self.prog_name, self.options.pdb_name, self.options.input_path) - if self.options.bucket_name: - server_command += ' -b %s' % self.options.bucket_name - server_command += ' server' - client_command = '%s client' % self.prog_name - server = subprocess.Popen(server_command, shell=True) - print 'server pid: %s' % server.pid - time.sleep(5) - clients = [] - for i in range(0, self.options.num_processes): - client = subprocess.Popen(client_command, shell=True) - clients.append(client) - print 'waiting for clients to finish' - for client in clients: - client.wait() - 
os.kill(server.pid, signal.SIGTERM) - elif len(self.args) == 1: - self.command = self.args[0] - if hasattr(self, 'do_%s' % self.command): - method = getattr(self, 'do_%s' % self.command) - method() - else: - self.parser.error('command (%s) not recognized' % self.command) - else: - self.parser.error('unrecognized commands') - -if __name__ == "__main__": - upload = Upload() - upload.main() diff --git a/boto/mapreduce/queuetools.py b/boto/mapreduce/queuetools.py deleted file mode 100644 index db1e495c..00000000 --- a/boto/mapreduce/queuetools.py +++ /dev/null @@ -1,66 +0,0 @@ -#!/usr/bin/python -import socket -from lqs import LQSServer, LQSMessage -import boto -from boto.sqs.jsonmessage import JSONMessage - -class LQSClient: - - def __init__(self, host): - self.host = host - self.port = LQSServer.PORT - self.timeout = LQSServer.TIMEOUT - self.max_len = LQSServer.MAXSIZE - self.sock = None - - def connect(self): - self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self.sock.settimeout(self.timeout) - self.sock.connect((self.host, self.port)) - - def decode(self, jsonstr): - return LQSMessage(jsonvalue=jsonstr) - - def get(self): - self.sock.send('next') - try: - jsonstr = self.sock.recv(self.max_len) - msg = LQSMessage(jsonvalue=jsonstr) - return msg - except: - print "recv from %s failed" % self.host - - def delete(self, msg): - self.sock.send('delete %s' % msg['id']) - try: - jsonstr = self.sock.recv(self.max_len) - msg = LQSMessage(jsonvalue=jsonstr) - return msg - except: - print "recv from %s failed" % self.host - - def close(self): - self.sock.close() - -class SQSClient: - - def __init__(self, queue_name): - self.queue_name = queue_name - - def connect(self): - self.queue = boto.lookup('sqs', self.queue_name) - self.queue.set_mesasge_class(JSONMessage) - - def get(self): - m = self.queue.read() - return m.get_body() - - def close(self): - pass - -def get_queue(name): - if name == 'localhost': - return LQSClient(name) - else: - return SQSClient(name) - 
|