#!/usr/bin/env python import argparse import os import boto from boto.compat import json from boto.dynamodb.schema import Schema DESCRIPTION = """Load data into one or more DynamoDB tables. For each table, data is read from two files: - {table_name}.metadata for the table's name, schema and provisioned throughput (only required if creating the table). - {table_name}.data for the table's actual contents. Both files are searched for in the current directory. To read them from somewhere else, use the --in-dir parameter. This program does not wipe the tables prior to loading data. However, any items present in the data files will overwrite the table's contents. """ def _json_iterload(fd): """Lazily load newline-separated JSON objects from a file-like object.""" buffer = "" eof = False while not eof: try: # Add a line to the buffer buffer += fd.next() except StopIteration: # We can't let that exception bubble up, otherwise the last # object in the file will never be decoded. eof = True try: # Try to decode a JSON object. json_object = json.loads(buffer.strip()) # Success: clear the buffer (everything was decoded). buffer = "" except ValueError: if eof and buffer.strip(): # No more lines to load and the buffer contains something other # than whitespace: the file is, in fact, malformed. raise # We couldn't decode a complete JSON object: load more lines. continue yield json_object def create_table(metadata_fd): """Create a table from a metadata file-like object.""" def load_table(table, in_fd): """Load items into a table from a file-like object.""" for i in _json_iterload(in_fd): # Convert lists back to sets. data = {} for k, v in i.iteritems(): if isinstance(v, list): data[k] = set(v) else: data[k] = v table.new_item(attrs=data).put() def dynamodb_load(tables, in_dir, create_tables): conn = boto.connect_dynamodb() for t in tables: metadata_file = os.path.join(in_dir, "%s.metadata" % t) data_file = os.path.join(in_dir, "%s.data" % t) if create_tables: with open(metadata_file) as meta_fd: metadata = json.load(meta_fd) table = conn.create_table( name=t, schema=Schema(metadata["schema"]), read_units=metadata["read_units"], write_units=metadata["write_units"], ) table.refresh(wait_for_active=True) else: table = conn.get_table(t) with open(data_file) as in_fd: load_table(table, in_fd) if __name__ == "__main__": parser = argparse.ArgumentParser( prog="dynamodb_load", description=DESCRIPTION ) parser.add_argument( "--create-tables", action="store_true", help="Create the tables if they don't exist already (without this flag, attempts to load data into non-existing tables fail)." ) parser.add_argument("--in-dir", default=".") parser.add_argument("tables", metavar="TABLES", nargs="+") namespace = parser.parse_args() dynamodb_load(namespace.tables, namespace.in_dir, namespace.create_tables)