Parallelizing DB Tests
When tests that access a database run in parallel, their operations can collide on the same tables: one test truncating while another reads, race conditions on auto-increment IDs, deadlocks under load. The usual fix is to thread contextual information (such as a worker ID) through the tests and adjust connection parameters accordingly.
DAGZ does this automatically. It intercepts DB connection calls and reroutes every connection to a per-worker database, with no changes to your application code.
This is an opt-in feature, so you control exactly how rerouting should work in your test suite. You can customize the naming scheme, which databases get rerouted, and per-test cleanup logic.
DAGZ supports Postgres, MySQL, Cassandra, Redis, and RabbitMQ, with more drivers planned. Adding a new driver is straightforward, and you can add your own integration if your database isn't supported yet.
How it works
Each DAGZ worker is assigned a numeric ID and a matching suffix (e.g. _jw03 for worker 3).
When your app calls psycopg.connect(dbname="myapp"), DAGZ rewrites the name to myapp_jw03 before the call reaches the driver:
worker 0 → myapp_jw00
worker 1 → myapp_jw01
worker 2 → myapp_jw02
...
The rewrite happens at the driver level: DAGZ instruments the DB driver's connect path. Your code, your ORM, your fixtures, and your migrations require no modification.
If your test spawns a subprocess that opens its own connections, it inherits the worker's suffix and connects to the same per-worker DB.
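The default suffix scheme is simple enough to sketch. The function below is an illustrative reimplementation, not DAGZ's actual API:

```python
def default_rewrite_db_name(name: str, worker_num: int) -> str:
    # Append _jw plus the zero-padded two-digit worker number,
    # matching the myapp -> myapp_jw00 / myapp_jw01 / ... scheme above.
    return f"{name}_jw{worker_num:02d}"

print(default_rewrite_db_name("myapp", 3))  # → myapp_jw03
```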
Standard Setup
In your conftest.py, configure each driver you use.
configure() requires you to choose explicitly: pass the bundled default_* implementations for the typical case, or substitute your own.
try:
    from dagz.integ.psycopg import PG_CONFIG
    from dagz.integ.pymysql import MYSQL_CONFIG
except ImportError:
    pass
else:
    PG_CONFIG.configure(
        rewrite_db_name=PG_CONFIG.default_rewrite_db_name,
        should_reroute=PG_CONFIG.default_should_reroute,
        prepare=None,
    )
    MYSQL_CONFIG.configure(
        rewrite_db_name=MYSQL_CONFIG.default_rewrite_db_name,
        should_reroute=MYSQL_CONFIG.default_should_reroute,
        prepare=None,
    )
The setup runs at conftest module load, not inside a session-scoped fixture.
This is deliberate: a session fixture only runs after collection, by which point fixture parameter resolution, parametrize() expressions, or other collection-time code may have already opened a DB connection.
Running configure() at module load makes rerouting active before any of that.
Customize rerouting policy
The full parameter list for configure() is in the Python API reference.
Override rewrite_db_name to match an external naming convention: Django's _test suffix, an existing xdist _gw0 scheme, or any project-specific layout.
Override should_reroute to leave specific databases untouched.
Common case: shared read-only reference data (geo lookups, config tables) that all workers should query directly.
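For example, a policy that reuses an xdist-style naming scheme and skips shared reference databases might look like this. The callback signatures are assumptions based on the bundled defaults; check the Python API reference for the exact shapes, and the database names are illustrative:

```python
def _rewrite(name, worker_num):
    # Hypothetical: reuse an existing xdist-style _gw<N> naming scheme.
    return f"{name}_gw{worker_num}"

def _should_reroute(name):
    # Shared read-only reference data stays un-rerouted (illustrative names).
    return name not in {"geo_reference", "shared_config"}

PG_CONFIG.configure(
    rewrite_db_name=_rewrite,
    should_reroute=_should_reroute,
    prepare=None,
)
```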
Per-test preparation
Tests within a worker share the same per-worker DB, so each test needs to start from a known state.
The prepare(db_name) callback runs once per test, before the first connection of that test, against the rerouted DB.
Use it to bring the DB to a clean baseline: truncate tables, reset sequences, drop temporary objects, or load fixture data.
For the common case of clearing all user tables, each integration exposes a truncate(db_name, **connect_kwargs) method on its config object.
It connects to the named DB and empties every user table.
Call it from prepare:
def _prepare(db_name):
    PG_CONFIG.truncate(
        db_name, host="localhost", port=5432, user="postgres", password="postgres",
    )

PG_CONFIG.configure(
    rewrite_db_name=PG_CONFIG.default_rewrite_db_name,
    should_reroute=PG_CONFIG.default_should_reroute,
    prepare=_prepare,
)
If your tests already wrap themselves in a transaction that rolls back, no preparation is needed: pass prepare=None.
For tests that need different prep than the global callback (or want to take over and skip it), see "Manual preparation" below.
Manual preparation
A test (or its fixtures) can prepare its own DB and tell the framework not to run the auto-prepare callback for that test. This is per-test: the marker is cleared before every test, so other tests still get auto-prepare as configured.
get_manual_init_db_name(name) is the entry point. It does two things:
- Resolves the literal DB name to its per-worker form through the configured policy. get_manual_init_db_name("myapp") returns myapp_jw00 on worker 0, myapp_jw01 on worker 1, etc. Always use this rather than hard-coding the suffix; it picks up any custom rewrite_db_name you have set.
- Marks the resolved name as already initialized for the current test, so the auto-prepare callback skips it on the first connection of this test.
Typical use: a fixture that pre-loads a fixture dataset and does not want the global truncate-everything prepare to wipe it.
@pytest.fixture
def populated_db():
    db = PG_CONFIG.get_manual_init_db_name("myapp")
    with psycopg.connect(dbname=db) as conn:
        # Do whatever this test needs; auto-prepare will not run for this test.
        _seed_reference_data(conn)
        yield conn
The mechanism: _current_inited is a per-test set of "already prepared" DB names. get_manual_init_db_name adds to it. _prepare_db checks it before invoking the auto-prepare callback. DAGZ clears the set at the start of every test (a test_prerun hook), so manual init does not carry across tests.
For URL-based connection strings, the parallel get_manual_init_db_url(url) parses the URL, rewrites the path component, and reassembles. Same per-test skip semantics.
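The path rewrite that a URL variant performs can be sketched with the standard library. This is a hypothetical reimplementation for illustration, not DAGZ's code:

```python
from urllib.parse import urlsplit, urlunsplit

def rewrite_db_url(url: str, worker_num: int) -> str:
    # Replace the database name in the URL path with its per-worker form,
    # leaving scheme, host, port, and credentials untouched.
    parts = urlsplit(url)
    db_name = parts.path.lstrip("/")
    return urlunsplit(parts._replace(path=f"/{db_name}_jw{worker_num:02d}"))

print(rewrite_db_url("postgresql://localhost:5432/myapp", 1))
# → postgresql://localhost:5432/myapp_jw01
```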
If prepare=None (no auto-prepare configured), get_manual_init_db_name is just a name resolver: it returns the per-worker name and adds it to a set the framework will not consult.
Worker initialization
worker_init(worker_num) runs once per worker, on the first DB access of that worker.
Use it for setup that is expensive to repeat per test: running migrations, loading reference data, warming caches.
def _migrate_worker(worker_num):
    db = f"myapp_jw{worker_num:02}"
    subprocess.run(["alembic", "-x", f"db={db}", "upgrade", "head"], check=True)

PG_CONFIG.configure(
    rewrite_db_name=PG_CONFIG.default_rewrite_db_name,
    should_reroute=PG_CONFIG.default_should_reroute,
    prepare=None,
    worker_init=_migrate_worker,
)
Worker init runs after worker DBs exist. If you create them in a setup fixture, that fixture must complete before any test runs.
Bypassing rerouting
When code needs to connect using a literal DB name and not a rerouted one, wrap it in the bypass() context manager.
Setup code that talks to admin databases is the most common case, but the same pattern applies to schema dumps, cross-DB queries during a test, and any other path that already targets the real name.
from dagz.integ.psycopg import PG_CONFIG
import psycopg

with PG_CONFIG.bypass():
    conn = psycopg.connect(dbname="postgres")  # admin, not rerouted
    conn.execute("SELECT datname FROM pg_database")
bypass() is backed by contextvars, so it is thread-safe and async-task-local.
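A minimal sketch of the contextvar mechanism, for readers curious why this is safe under threads and asyncio. The code below is illustrative, not DAGZ's implementation, and the worker-0 suffix is hard-coded for brevity:

```python
import contextvars
from contextlib import contextmanager

_bypass_active = contextvars.ContextVar("bypass_active", default=False)

@contextmanager
def bypass():
    # Each thread and each asyncio task sees its own value of the context
    # var, so bypassing in one task does not affect concurrent connections.
    token = _bypass_active.set(True)
    try:
        yield
    finally:
        _bypass_active.reset(token)

def maybe_reroute(name: str) -> str:
    if _bypass_active.get():
        return name           # literal name, no rewrite
    return f"{name}_jw00"     # normal rerouting (worker 0 shown)

with bypass():
    print(maybe_reroute("postgres"))  # → postgres
print(maybe_reroute("myapp"))         # → myapp_jw00
```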
Special cases
Most tests fit the standard model: each worker gets a private database, and tests use the rerouted name transparently. Two patterns do not fit and need special-case scheduling.
Tests that can't be rerouted
Some tests may use literal DB names that DAGZ cannot intercept. Examples:
- The SQL contains the database name as a string literal: SELECT * FROM other_db.users.
- The test invokes a subprocess (psql, mysqldump, a service binary) that connects without going through the instrumented driver.
- The test reads or writes the catalog using the literal name (pg_database, information_schema.SCHEMATA).
For these tests, set main_worker_bypass=True in configure() and mark each affected test with @pytest.mark.dagz_main_worker:
PG_CONFIG.configure(
    rewrite_db_name=PG_CONFIG.default_rewrite_db_name,
    should_reroute=PG_CONFIG.default_should_reroute,
    prepare=None,
    main_worker_bypass=True,
)

@pytest.mark.dagz_main_worker
def test_cross_db_query(...):
    ...
main_worker_bypass=True makes the main worker skip all rerouting, so the DB name stays as written.
dagz_main_worker schedules the test on the main worker.
Other workers continue to reroute normally, so the rest of the suite still parallelizes.
Tests that reset the DB service
Some tests wipe or restart the database server, drop all databases, run FLUSHALL, or otherwise affect state shared across all worker DBs on the node.
While such a test runs, no other test on the same worker node can safely connect.
Mark these tests with @pytest.mark.dagz_node_mutex:
@pytest.mark.dagz_node_mutex
def test_full_db_reset(...):
    ...
DAGZ schedules a dagz_node_mutex test with no other test running on the same worker node.
Other nodes in a multi-node run keep working in parallel.
Single-worker runs
By default, rerouting applies regardless of worker count.
A --dagz-workers=1 run still rewrites myapp to myapp_jw00.
This keeps the routing behavior consistent across worker counts: a passing test at --dagz-workers=4 runs against the same DB names at --dagz-workers=1.
Pass single_worker_disable=True to opt into a different behavior: when DAGZ is running with one worker, the integration unhooks entirely.
Connections go straight to the driver, no rewrite, no overhead.
Use this when you want a serial test run to behave identically to one without DAGZ.
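For example, extending the standard setup from above:

```python
PG_CONFIG.configure(
    rewrite_db_name=PG_CONFIG.default_rewrite_db_name,
    should_reroute=PG_CONFIG.default_should_reroute,
    prepare=None,
    single_worker_disable=True,  # unhook entirely at --dagz-workers=1
)
```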
Supported drivers
By default, DAGZ reroutes:
| Driver | What gets rewritten |
|---|---|
| psycopg (v3) and psycopg2 | dbname parameter |
| pymysql and aiomysql | database / db parameter |
| cassandra-driver | keyspace |
| redis-py (sync + async) | logical DB number (each worker maps to a distinct slot) |
| RabbitMQ (pika, aiormq) | virtual host |
System databases are skipped automatically: postgres, information_schema, Cassandra's system* keyspaces, and similar.
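The skip amounts to a name check against a per-family deny list. A sketch for Postgres follows; the template0/template1 entries are an assumption beyond the names listed above, and the bundled list may differ:

```python
_PG_SYSTEM_DBS = {"postgres", "template0", "template1", "information_schema"}

def default_should_reroute(name: str) -> bool:
    # System databases keep their literal names; everything else is rerouted.
    return name not in _PG_SYSTEM_DBS

print(default_should_reroute("myapp"))     # → True
print(default_should_reroute("postgres"))  # → False
```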
Adding a new database or driver
DAGZ separates two concerns:
- Database support owns the rerouting policy for one DB family. It defines the suffix scheme, lists the system DBs to skip, and exposes the truncate utility. One per family. Concrete examples: PostgresConfig (singleton PG_CONFIG), MysqlConfig (singleton MYSQL_CONFIG).
- Client driver support hooks one specific Python client library (psycopg, psycopg2, pymysql, aiomysql, adbc_driver_postgresql, ...) and consults the database support to decide what to rewrite. Many client drivers can share one database support.
This split matters when a project uses more than one client library against the same DB, which is common: a service layer on psycopg, an analytics path on ADBC, a Django ORM on psycopg2, all hitting the same Postgres. Each library needs its own hook, but they all share one rerouting policy through PG_CONFIG. Configure once, every client routes consistently.
Both pieces are extension points you can add in your own code (typically conftest.py). The policy configs and helper APIs are public; you do not need DAGZ to ship support for every client library or DB family your project uses. You typically only need to add client driver support: the DB family is already covered by DAGZ, but a new client library needs hooking. Adding database support is rarer, needed only when DAGZ does not yet support that family.
Adding a client driver in your conftest
In your conftest, import your target library, capture its connect function, and replace it with an override that consults an existing config (e.g. PG_CONFIG).
For example, adbc_driver_postgresql.dbapi.connect(uri) takes a URI as its first positional argument. The library reaches the same Postgres server as psycopg, so it shares PG_CONFIG:
try:
    from dagz.integ.psycopg import PG_CONFIG
    from adbc_driver_postgresql import dbapi as _adbc_dbapi
except ImportError:
    pass
else:
    _orig_adbc_connect = _adbc_dbapi.connect

    def _adbc_override_connect(uri, *args, **kwargs):
        uri = PG_CONFIG.maybe_reroute_uri(uri)
        return _orig_adbc_connect(uri, *args, **kwargs)

    _adbc_dbapi.connect = _adbc_override_connect
ADBC connections now route through PG_CONFIG's policy alongside psycopg and psycopg2. The patch sits at conftest module load, not in a fixture, for the same reason as the main configure() block: it must be in place before any collection-time DB code runs.
Client libraries expose the dbname differently. The config provides helpers for the common cases (maybe_reroute(name) for kwarg-style, maybe_reroute_uri(uri) for URI-style); see driver-level helpers for the full list.
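For a kwarg-style library, maybe_reroute(name) takes the place of maybe_reroute_uri. A sketch for pg8000, which passes the database name as a keyword; the exact pg8000 signature details here are assumptions, so check its documentation before relying on them:

```python
try:
    from dagz.integ.psycopg import PG_CONFIG
    import pg8000.dbapi as _pg8000
except ImportError:
    pass
else:
    _orig_pg8000_connect = _pg8000.connect

    def _pg8000_override_connect(user, *args, database=None, **kwargs):
        # Reroute the keyword-style DB name through the shared policy.
        if database is not None:
            database = PG_CONFIG.maybe_reroute(database)
        return _orig_pg8000_connect(user, *args, database=database, **kwargs)

    _pg8000.connect = _pg8000_override_connect
```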
Adding a new database family
For a DB family DAGZ does not yet support, define a DbConfig subclass in your own code. The subclass owns the policy: a singleton config (e.g. MYDB_CONFIG), an implementation of default_should_reroute that knows the family's system DBs, a truncate utility, and the patching for at least one client library.
The DAGZ source's pymysql.py is the canonical reference: it bundles MySQL database support with the pymysql client driver. aiomysql.py then adds aiomysql client support on top of MYSQL_CONFIG. Both follow the same pattern your project would.
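A rough shape of such a subclass is sketched below. The import path, attribute names, and hook names are guesses for illustration; treat pymysql.py in the DAGZ source as the authoritative reference:

```python
from dagz.db_config import DbConfig  # hypothetical import path

class MydbConfig(DbConfig):
    # Illustrative system DB names this family should never reroute.
    SYSTEM_DBS = {"sys", "catalog"}

    def default_should_reroute(self, name):
        return name not in self.SYSTEM_DBS

    def truncate(self, db_name, **connect_kwargs):
        # Connect to db_name and empty every user table (left unimplemented).
        ...

# One singleton per family, mirroring PG_CONFIG / MYSQL_CONFIG.
MYDB_CONFIG = MydbConfig()
```

With the config in place, patch at least one client library's connect function in conftest, as shown for ADBC above.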