Parallelizing DB tests
DAGZ rewrites the DB name in each driver-level connect call so every worker reaches a distinct database. Calling code is unchanged.
DAGZ does not manage databases.
An integration sets up its database support by calling configure() and supplying lifecycle callbacks that invoke the project's existing DB tooling at fixed points in the test run.
Overview
DAGZ provides utilities for parallelizing tests:
- DB family support (DbConfig): abstract class defining a DB-agnostic model for rerouting.
- Per-test prepare (prepare(db_name)): callback that runs before each test on the rerouted DB.
- Per-worker setup (worker_init(worker_num)): callback that runs once per worker on its first DB access.
- Manual init (get_manual_init_db_name(name)): resolves to the per-worker form and marks it skip-prepare for the current test.
- Bypass (bypass()): context manager that suppresses rerouting.
- Main-worker marker (@pytest.mark.dagz_main_worker): schedules a test on the main worker, which can be configured to bypass rerouting per DB.
- Node-mutex marker (@pytest.mark.dagz_node_mutex): schedules a test alone on its worker node.
- Marker assertions (assert_main_worker, assert_node_mutex, plus _at patching variants): force-fail tests that should carry a marker but don't. The _at form is a function-patching convenience that injects the assertion at a target call site.
- Packaged drivers: DbConfig implementations for popular databases and drivers.
Subprocesses inherit the worker suffix and connect to the same per-worker DB.
Standard parallelization
DAGZ provides built-in support for most popular databases. See the full list in Supported drivers.
A project configures a DB integration by calling configure() on the database support object (e.g. PG_CONFIG.configure(...)) with these keyword arguments:
- rewrite_db_name(db_name, session) -> str: computes the rerouted name. The default, default_rewrite_db_name, appends the worker suffix.
- should_reroute(db_name) -> bool: filters which names get rerouted. The default, default_should_reroute, skips the family's system DBs.
- prepare(db_name): runs before each test against the rerouted DB; typically calls the integration's truncate helper to empty user tables. Pass None to skip.
- worker_init(worker_num) (optional): runs once per worker on its first DB access; typically creates the per-worker DB and runs migrations. create_worker_init([db, ...], **conn_kwargs) is the factory for the standard CREATE DATABASE flow.
- main_worker_bypass=False, single_worker_disable=False: opt-in flags described under Tests that can't be rerouted and the Reference section.
The config object also exposes helper methods that fixtures and client driver hooks call directly:
- bypass(): context manager that suppresses rerouting for one block (admin DBs, schema dumps, cross-DB queries). Backed by contextvars; safe under threads and async tasks.
- get_manual_init_db_name(name) / get_manual_init_db_url(url): resolve a name (or URL path) to its per-worker form and mark it as already prepared, skipping the prepare callback for the current test.
- truncate(db_name, **connect_kwargs) -> TruncateResult: empties all user tables; returns whether the DB existed and which tables were truncated. Typically called from inside prepare.
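To illustrate why a contextvars-backed flag behaves well under threads and async tasks, here is a minimal sketch of such a context manager. It is not DAGZ's implementation: the module-level _BYPASS variable, the stand-alone bypass() function, and rerouting_bypassed() are hypothetical names used only for this example.

```python
import contextvars
from contextlib import contextmanager

# Hypothetical flag. A value set in one thread or asyncio task is not
# seen by sibling threads or tasks, because each runs in its own context.
_BYPASS = contextvars.ContextVar("bypass", default=False)


@contextmanager
def bypass():
    """Suppress rerouting inside the with-block, then restore the prior state."""
    token = _BYPASS.set(True)
    try:
        yield
    finally:
        # reset() restores whatever value was active before this block,
        # so nested bypass() blocks unwind correctly.
        _BYPASS.reset(token)


def rerouting_bypassed() -> bool:
    """What a connect hook would check before rewriting a DB name."""
    return _BYPASS.get()


with bypass():
    print(rerouting_bypassed())  # True
print(rerouting_bypassed())      # False
```

With a real config object, the call site is the method form described above, for example a with PG_CONFIG.bypass(): block around an admin query.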
Connection names are rewritten by appending the worker suffix. Worker 3 sees myapp as myapp_jw03.
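The naming rule can be illustrated with a small stand-alone function. This is a sketch rather than DAGZ's code: the helper name worker_db_name is hypothetical, and the suffix format (_jw followed by a two-digit, zero-padded worker number) is inferred from the single myapp_jw03 example above.

```python
def worker_db_name(db_name: str, worker_num: int) -> str:
    """Hypothetical helper: append a per-worker suffix to a DB name.

    Assumption: the suffix is "_jw" plus the worker number zero-padded
    to two digits, as suggested by the myapp -> myapp_jw03 example.
    """
    return f"{db_name}_jw{worker_num:02d}"


print(worker_db_name("myapp", 3))  # myapp_jw03
```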
The following Postgres configure() call sets up per-worker CREATE DATABASE and per-test truncate:
```python
from dagz.integ.psycopg2 import PG_CONFIG, create_worker_init


def _prepare(db_name):
    # Empty all user tables in the rerouted DB before each test.
    PG_CONFIG.truncate(
        db_name, host="localhost", port=5432, user="postgres", password="postgres",
    )


PG_CONFIG.configure(
    rewrite_db_name=PG_CONFIG.default_rewrite_db_name,
    should_reroute=PG_CONFIG.default_should_reroute,
    prepare=_prepare,
    # Standard CREATE DATABASE flow, run once per worker on its first DB access.
    worker_init=create_worker_init(
        ["myapp"], host="localhost", port=5432, user="postgres", password="postgres",
    ),
)
```
The full parameter list is in the Python API reference.
Tests that can't be rerouted
Two patterns require explicit marking.
Literal DB names
Some tests use a DB name that DAGZ can't intercept: SQL string literals, subprocesses connecting outside the instrumented driver, queries against the catalog by literal name. Mark these tests with @pytest.mark.dagz_main_worker. With main_worker_bypass=True in configure(), the main worker skips rerouting while other workers continue normally.
To find tests that need the marker systematically, wrap the at-risk function with assert_main_worker_at in conftest.py; calls without the marker raise a DagzError instead of silently producing wrong results.
Resetting the DB service
Some tests wipe or restart the database server, drop all databases, run FLUSHALL, or otherwise mutate state shared across all workers on the node. Mark these tests with @pytest.mark.dagz_node_mutex; DAGZ schedules each of them with no other test running on the same worker node, while other nodes keep running.
assert_node_mutex_at is the systematic check, same shape as assert_main_worker_at.
Customizing the rerouting policy
Two configure() parameters control which DB names get rewritten and how:
- Naming scheme (rewrite_db_name): override to match an external convention (Django's test_ prefix, an xdist _gw0 scheme, a project-specific layout).
- Exclusions (should_reroute): override to leave specific databases untouched. Common case: shared read-only reference data that every worker should query directly.
The full parameter list is in the Python API reference.
Adding driver or DB support
DAGZ separates extension into two layers:
- Database support (DbConfig subclass): defines the rerouting policy for one DB family (suffix scheme, system DBs to skip, truncate helper). Examples: PostgresConfig (singleton PG_CONFIG), MysqlConfig (MYSQL_CONFIG).
- Client driver support: hooks one Python client library and consults the database support. Many client libraries can share one database support (a service layer on psycopg, an analytics path on ADBC, and a Django ORM on psycopg2 all share PG_CONFIG).
Adding a client driver hook in conftest.py is the common case; adding a new DB family is rarer.
A client driver hook captures the library's connect function at conftest module load (not in a fixture, so rerouting is active before any collection-time code runs) and replaces it with an override that calls into the appropriate config, typically via maybe_reroute(name) for kwarg-style libraries or maybe_reroute_uri(uri) for URI-style. See driver-level helpers for the full list.
For a DB family DAGZ doesn't cover, subclass DbConfig and provide a singleton config, a default_should_reroute that knows the family's system DBs, a truncate helper, and the patching for at least one client library. The DAGZ source's pymysql.py is the canonical reference.
Reference
DAGZ's built-in drivers rewrite one parameter each:
| Driver | What gets rewritten |
|---|---|
| psycopg (v3) and psycopg2 | dbname parameter |
| pymysql and aiomysql | database / db parameter |
| cassandra-driver | keyspace |
| redis-py (sync + async) | logical DB number (each worker maps to a distinct slot) |
| RabbitMQ (pika, aiormq) | virtual host |
System databases are skipped automatically: postgres, information_schema, Cassandra's system* keyspaces, and similar.
By default, rerouting applies regardless of worker count: --dagz-workers=1 still rewrites the DB name. Pass single_worker_disable=True in configure() to unhook entirely when DAGZ runs with one worker.