diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2020-06-28 11:59:34 -0400 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2020-08-05 22:19:46 -0400 |
| commit | 30885744142d89740d459f4dae670ba4775d1d8c (patch) | |
| tree | 0fdd30c0c96778029a48414195f95c5b1fdba30c /examples/sharding | |
| parent | c7b489b25802f7a25ef78d0731411295c611cc1c (diff) | |
| download | sqlalchemy-30885744142d89740d459f4dae670ba4775d1d8c.tar.gz | |
Documentation updates for 1.4
* major additions to 1.4 migration doc; removed additional
verbosity regarding caching methodology and reorganized the
doc to present itself more as a "what's changed" guide
* as we now have a path for asyncio, update that doc so that
we aren't spreading obsolete information
* updates to the 2.0 migration guide with latest info; however,
this is still an architecture doc and not yet a migration
guide, so it will need further rework.
* start really talking about 1.x vs. 2.0 style everywhere. Querying
makes up most of the docs, so this is going to be a prominent
theme; start getting it to fit in
* Add introductory documentation for ORM example sections as these
are too sparse
* new documentation for do_orm_execute(), many separate sections,
adding deprecation notes to before_compile() and similar
* new example suites to illustrate do_orm_execute(),
with_loader_criteria()
* modernized horizontal sharding examples and added a separate
example to distinguish between multiple databases and single
database w/ multiple tables use case
* introducing DEEP ALCHEMY, will use zzzeeksphinx 1.1.6
* no name for the alchemist yet however the dragon's name
is Flambé
Change-Id: Id6b5c03b1ce9ddb7b280f66792212a0ef0a1c541
Diffstat (limited to 'examples/sharding')
| -rw-r--r-- | examples/sharding/__init__.py | 19 | ||||
| -rw-r--r-- | examples/sharding/separate_databases.py (renamed from examples/sharding/attribute_shard.py) | 97 | ||||
| -rw-r--r-- | examples/sharding/separate_tables.py | 316 |
3 files changed, 379 insertions, 53 deletions
diff --git a/examples/sharding/__init__.py b/examples/sharding/__init__.py index eb8e10686..90cf6cc6c 100644 --- a/examples/sharding/__init__.py +++ b/examples/sharding/__init__.py @@ -4,21 +4,28 @@ databases. The basic components of a "sharded" mapping are: -* multiple databases, each assigned a 'shard id' +* multiple :class:`_engine.Engine` instances, each assigned a "shard id". + These :class:`_engine.Engine` instances may refer to different databases, + or different schemas / accounts within the same database, or they can + even be differentiated only by options that will cause them to access + different schemas or tables when used. + * a function which can return a single shard id, given an instance to be saved; this is called "shard_chooser" + * a function which can return a list of shard ids which apply to a particular instance identifier; this is called "id_chooser".If it returns all shard ids, all shards will be searched. + * a function which can return a list of shard ids to try, given a particular Query ("query_chooser"). If it returns all shard ids, all shards will be queried and the results joined together. -In this example, four sqlite databases will store information about weather -data on a database-per-continent basis. We provide example shard_chooser, -id_chooser and query_chooser functions. The query_chooser illustrates -inspection of the SQL expression element in order to attempt to determine a -single shard being requested. +In these examples, different kinds of shards are used against the same basic +example which accommodates weather data on a per-continent basis. We provide +example shard_chooser, id_chooser and query_chooser functions. The +query_chooser illustrates inspection of the SQL expression element in order to +attempt to determine a single shard being requested. The construction of generic sharding routines is an ambitious approach to the issue of organizing instances among multiple databases. 
For a diff --git a/examples/sharding/attribute_shard.py b/examples/sharding/separate_databases.py index 7b8f87d90..95f12fa72 100644 --- a/examples/sharding/attribute_shard.py +++ b/examples/sharding/separate_databases.py @@ -1,3 +1,5 @@ +"""Illustrates sharding using distinct SQLite databases.""" + import datetime from sqlalchemy import Column @@ -7,6 +9,7 @@ from sqlalchemy import Float from sqlalchemy import ForeignKey from sqlalchemy import inspect from sqlalchemy import Integer +from sqlalchemy import select from sqlalchemy import String from sqlalchemy import Table from sqlalchemy.ext.declarative import declarative_base @@ -26,15 +29,15 @@ db4 = create_engine("sqlite://", echo=echo) # create session function. this binds the shard ids # to databases within a ShardedSession and returns it. -create_session = sessionmaker(class_=ShardedSession) - -create_session.configure( +Session = sessionmaker( + class_=ShardedSession, + future=True, shards={ "north_america": db1, "asia": db2, "europe": db3, "south_america": db4, - } + }, ) @@ -54,7 +57,7 @@ ids = Table("ids", Base.metadata, Column("nextid", Integer, nullable=False)) def id_generator(ctx): # in reality, might want to use a separate transaction for this. with db1.connect() as conn: - nextid = conn.scalar(ids.select(for_update=True)) + nextid = conn.scalar(ids.select().with_for_update()) conn.execute(ids.update(values={ids.c.nextid: ids.c.nextid + 1})) return nextid @@ -99,11 +102,11 @@ class Report(Base): # create tables for db in (db1, db2, db3, db4): - Base.metadata.drop_all(db) Base.metadata.create_all(db) # establish initial "id" in db1 -db1.execute(ids.insert(), nextid=1) +with db1.begin() as conn: + conn.execute(ids.insert(), nextid=1) # step 5. define sharding functions. @@ -199,18 +202,7 @@ def _get_query_comparisons(query): def visit_bindparam(bind): # visit a bind parameter. 
- # check in _params for it first - if bind.key in query._params: - value = query._params[bind.key] - elif bind.callable: - # some ORM functions (lazy loading) - # place the bind's value as a - # callable for deferred evaluation. - value = bind.callable() - else: - # just use .value - value = bind.value - + value = bind.effective_value binds[bind] = value def visit_column(column): @@ -230,9 +222,9 @@ def _get_query_comparisons(query): # here we will traverse through the query's criterion, searching # for SQL constructs. We will place simple column comparisons # into a list. - if query._criterion is not None: - visitors.traverse_depthfirst( - query._criterion, + if query.whereclause is not None: + visitors.traverse( + query.whereclause, {}, { "bindparam": visit_bindparam, @@ -244,7 +236,7 @@ def _get_query_comparisons(query): # further configure create_session to use these functions -create_session.configure( +Session.configure( shard_chooser=shard_chooser, id_chooser=id_chooser, query_chooser=query_chooser, @@ -264,36 +256,47 @@ tokyo.reports.append(Report(80.0)) newyork.reports.append(Report(75)) quito.reports.append(Report(85)) -sess = create_session() +with Session() as sess: -sess.add_all([tokyo, newyork, toronto, london, dublin, brasilia, quito]) + sess.add_all([tokyo, newyork, toronto, london, dublin, brasilia, quito]) -sess.commit() + sess.commit() -t = sess.query(WeatherLocation).get(tokyo.id) -assert t.city == tokyo.city -assert t.reports[0].temperature == 80.0 + t = sess.get(WeatherLocation, tokyo.id) + assert t.city == tokyo.city + assert t.reports[0].temperature == 80.0 -north_american_cities = sess.query(WeatherLocation).filter( - WeatherLocation.continent == "North America" -) -assert {c.city for c in north_american_cities} == {"New York", "Toronto"} + north_american_cities = sess.execute( + select(WeatherLocation).filter( + WeatherLocation.continent == "North America" + ) + ).scalars() -asia_and_europe = sess.query(WeatherLocation).filter( - 
WeatherLocation.continent.in_(["Europe", "Asia"]) -) -assert {c.city for c in asia_and_europe} == {"Tokyo", "London", "Dublin"} + assert {c.city for c in north_american_cities} == {"New York", "Toronto"} + + asia_and_europe = sess.execute( + select(WeatherLocation).filter( + WeatherLocation.continent.in_(["Europe", "Asia"]) + ) + ).scalars() -# the Report class uses a simple integer primary key. So across two databases, -# a primary key will be repeated. The "identity_token" tracks in memory -# that these two identical primary keys are local to different databases. -newyork_report = newyork.reports[0] -tokyo_report = tokyo.reports[0] + assert {c.city for c in asia_and_europe} == {"Tokyo", "London", "Dublin"} -assert inspect(newyork_report).identity_key == (Report, (1,), "north_america") -assert inspect(tokyo_report).identity_key == (Report, (1,), "asia") + # the Report class uses a simple integer primary key. So across two + # databases, a primary key will be repeated. The "identity_token" tracks + # in memory that these two identical primary keys are local to different + # databases. 
+ newyork_report = newyork.reports[0] + tokyo_report = tokyo.reports[0] + + assert inspect(newyork_report).identity_key == ( + Report, + (1,), + "north_america", + ) + assert inspect(tokyo_report).identity_key == (Report, (1,), "asia") -# the token representing the originating shard is also available directly + # the token representing the originating shard is also available directly -assert inspect(newyork_report).identity_token == "north_america" -assert inspect(tokyo_report).identity_token == "asia" + assert inspect(newyork_report).identity_token == "north_america" + assert inspect(tokyo_report).identity_token == "asia" diff --git a/examples/sharding/separate_tables.py b/examples/sharding/separate_tables.py new file mode 100644 index 000000000..f24dde288 --- /dev/null +++ b/examples/sharding/separate_tables.py @@ -0,0 +1,316 @@ +"""Illustrates sharding using a single SQLite database, that will however +have multiple tables using a naming convention.""" + +import datetime + +from sqlalchemy import Column +from sqlalchemy import create_engine +from sqlalchemy import DateTime +from sqlalchemy import event +from sqlalchemy import Float +from sqlalchemy import ForeignKey +from sqlalchemy import inspect +from sqlalchemy import Integer +from sqlalchemy import select +from sqlalchemy import String +from sqlalchemy import Table +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.ext.horizontal_shard import ShardedSession +from sqlalchemy.orm import relationship +from sqlalchemy.orm import sessionmaker +from sqlalchemy.sql import operators +from sqlalchemy.sql import visitors + + +echo = True +engine = create_engine("sqlite://", echo=echo) + +db1 = engine.execution_options(table_prefix="north_america") +db2 = engine.execution_options(table_prefix="asia") +db3 = engine.execution_options(table_prefix="europe") +db4 = engine.execution_options(table_prefix="south_america") + + +@event.listens_for(engine, "before_cursor_execute", retval=True) +def 
before_cursor_execute( + conn, cursor, statement, parameters, context, executemany +): + table_prefix = context.execution_options.get("table_prefix", None) + if table_prefix: + statement = statement.replace("_prefix_", table_prefix) + return statement, parameters + + +# create session function. this binds the shard ids +# to databases within a ShardedSession and returns it. +Session = sessionmaker( + class_=ShardedSession, + future=True, + shards={ + "north_america": db1, + "asia": db2, + "europe": db3, + "south_america": db4, + }, +) + + +# mappings and tables +Base = declarative_base() + +# we need a way to create identifiers which are unique across all databases. +# one easy way would be to just use a composite primary key, where one value +# is the shard id. but here, we'll show something more "generic", an id +# generation function. we'll use a simplistic "id table" stored in database +# #1. Any other method will do just as well; UUID, hilo, application-specific, +# etc. + +ids = Table("ids", Base.metadata, Column("nextid", Integer, nullable=False)) + + +def id_generator(ctx): + # in reality, might want to use a separate transaction for this. + with engine.connect() as conn: + nextid = conn.scalar(ids.select().with_for_update()) + conn.execute(ids.update(values={ids.c.nextid: ids.c.nextid + 1})) + return nextid + + +# table setup. we'll store a lead table of continents/cities, and a secondary +# table storing locations. a particular row will be placed in the database +# whose shard id corresponds to the 'continent'. in this setup, secondary rows +# in 'weather_reports' will be placed in the same DB as that of the parent, but +# this can be changed if you're willing to write more complex sharding +# functions. 
+ + +class WeatherLocation(Base): + __tablename__ = "_prefix__weather_locations" + + id = Column(Integer, primary_key=True, default=id_generator) + continent = Column(String(30), nullable=False) + city = Column(String(50), nullable=False) + + reports = relationship("Report", backref="location") + + def __init__(self, continent, city): + self.continent = continent + self.city = city + + +class Report(Base): + __tablename__ = "_prefix__weather_reports" + + id = Column(Integer, primary_key=True) + location_id = Column( + "location_id", Integer, ForeignKey("_prefix__weather_locations.id") + ) + temperature = Column("temperature", Float) + report_time = Column( + "report_time", DateTime, default=datetime.datetime.now + ) + + def __init__(self, temperature): + self.temperature = temperature + + +# create tables +for db in (db1, db2, db3, db4): + Base.metadata.create_all(db) + +# establish initial "id" in db1 +with db1.begin() as conn: + conn.execute(ids.insert(), nextid=1) + + +# step 5. define sharding functions. + +# we'll use a straight mapping of a particular set of "country" +# attributes to shard id. +shard_lookup = { + "North America": "north_america", + "Asia": "asia", + "Europe": "europe", + "South America": "south_america", +} + + +def shard_chooser(mapper, instance, clause=None): + """shard chooser. + + looks at the given instance and returns a shard id + note that we need to define conditions for + the WeatherLocation class, as well as our secondary Report class which will + point back to its WeatherLocation via its 'location' attribute. + + """ + if isinstance(instance, WeatherLocation): + return shard_lookup[instance.continent] + else: + return shard_chooser(mapper, instance.location) + + +def id_chooser(query, ident): + """id chooser. + + given a primary key, returns a list of shards + to search. here, we don't have any particular information from a + pk so we just return all shard ids. 
often, you'd want to do some + kind of round-robin strategy here so that requests are evenly + distributed among DBs. + + """ + if query.lazy_loaded_from: + # if we are in a lazy load, we can look at the parent object + # and limit our search to that same shard, assuming that's how we've + # set things up. + return [query.lazy_loaded_from.identity_token] + else: + return ["north_america", "asia", "europe", "south_america"] + + +def query_chooser(query): + """query chooser. + + this also returns a list of shard ids, which can + just be all of them. but here we'll search into the Query in order + to try to narrow down the list of shards to query. + + """ + ids = [] + + # we'll grab continent names as we find them + # and convert to shard ids + for column, operator, value in _get_query_comparisons(query): + # "shares_lineage()" returns True if both columns refer to the same + # statement column, adjusting for any annotations present. + # (an annotation is an internal clone of a Column object + # and occur when using ORM-mapped attributes like + # "WeatherLocation.continent"). A simpler comparison, though less + # accurate, would be "column.key == 'continent'". + if column.shares_lineage(WeatherLocation.__table__.c.continent): + if operator == operators.eq: + ids.append(shard_lookup[value]) + elif operator == operators.in_op: + ids.extend(shard_lookup[v] for v in value) + + if len(ids) == 0: + return ["north_america", "asia", "europe", "south_america"] + else: + return ids + + +def _get_query_comparisons(query): + """Search an orm.Query object for binary expressions. + + Returns expressions which match a Column against one or more + literal values as a list of tuples of the form + (column, operator, values). "values" is a single value + or tuple of values depending on the operator. + + """ + binds = {} + clauses = set() + comparisons = [] + + def visit_bindparam(bind): + # visit a bind parameter. 
+ + value = bind.effective_value + binds[bind] = value + + def visit_column(column): + clauses.add(column) + + def visit_binary(binary): + if binary.left in clauses and binary.right in binds: + comparisons.append( + (binary.left, binary.operator, binds[binary.right]) + ) + + elif binary.left in binds and binary.right in clauses: + comparisons.append( + (binary.right, binary.operator, binds[binary.left]) + ) + + # here we will traverse through the query's criterion, searching + # for SQL constructs. We will place simple column comparisons + # into a list. + if query.whereclause is not None: + visitors.traverse( + query.whereclause, + {}, + { + "bindparam": visit_bindparam, + "binary": visit_binary, + "column": visit_column, + }, + ) + return comparisons + + +# further configure create_session to use these functions +Session.configure( + shard_chooser=shard_chooser, + id_chooser=id_chooser, + query_chooser=query_chooser, +) + +# save and load objects! + +tokyo = WeatherLocation("Asia", "Tokyo") +newyork = WeatherLocation("North America", "New York") +toronto = WeatherLocation("North America", "Toronto") +london = WeatherLocation("Europe", "London") +dublin = WeatherLocation("Europe", "Dublin") +brasilia = WeatherLocation("South America", "Brasila") +quito = WeatherLocation("South America", "Quito") + +tokyo.reports.append(Report(80.0)) +newyork.reports.append(Report(75)) +quito.reports.append(Report(85)) + +with Session() as sess: + + sess.add_all([tokyo, newyork, toronto, london, dublin, brasilia, quito]) + + sess.commit() + + t = sess.get(WeatherLocation, tokyo.id) + assert t.city == tokyo.city + assert t.reports[0].temperature == 80.0 + + north_american_cities = sess.execute( + select(WeatherLocation).filter( + WeatherLocation.continent == "North America" + ) + ).scalars() + + assert {c.city for c in north_american_cities} == {"New York", "Toronto"} + + asia_and_europe = sess.execute( + select(WeatherLocation).filter( + WeatherLocation.continent.in_(["Europe", 
"Asia"]) + ) + ).scalars() + + assert {c.city for c in asia_and_europe} == {"Tokyo", "London", "Dublin"} + + # the Report class uses a simple integer primary key. So across two + # databases, a primary key will be repeated. The "identity_token" tracks + # in memory that these two identical primary keys are local to different + # databases. + newyork_report = newyork.reports[0] + tokyo_report = tokyo.reports[0] + + assert inspect(newyork_report).identity_key == ( + Report, + (1,), + "north_america", + ) + assert inspect(tokyo_report).identity_key == (Report, (1,), "asia") + + # the token representing the originating shard is also available directly + + assert inspect(newyork_report).identity_token == "north_america" + assert inspect(tokyo_report).identity_token == "asia" |
