
Parallelizing DB Tests

When tests that access a database run in parallel, their operations can collide on the same tables: one test truncating while another reads, race conditions on auto-increment IDs, deadlocks under load. The usual fix is to thread contextual info (such as a worker ID) through each test and adjust connection parameters accordingly.

DAGZ does this automatically. It intercepts DB connection calls and reroutes every connection to a per-worker database, with no changes to your application code.

This is an opt-in feature, so you control exactly how rerouting should work in your test suite. You can customize the naming scheme, which databases get rerouted, and per-test cleanup logic.

DAGZ supports Postgres, MySQL, Cassandra, Redis, and RabbitMQ, with more drivers planned. Adding a new driver is straightforward, and you can add your own integration if your database isn't supported yet.

How it works

Each DAGZ worker is assigned a numeric ID and a suffix (_jw03 for worker 3). When your app calls psycopg.connect(dbname="myapp"), DAGZ rewrites the name to myapp_jw03 before the call reaches the driver:

worker 0 → myapp_jw00
worker 1 → myapp_jw01
worker 2 → myapp_jw02
...

The rewrite happens at the driver level by instrumenting DB driver code. Your code, your ORM, your fixtures, and your migrations require no modification.
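
For example, a test written against the literal name needs no awareness of DAGZ. A minimal sketch (the myapp database and users table are illustrative):

import psycopg

def test_insert_user():
    # "myapp" is rewritten to myapp_jwNN for the current worker before the call
    # reaches the driver; the test itself never sees the suffix.
    with psycopg.connect(dbname="myapp") as conn:
        conn.execute("INSERT INTO users (name) VALUES ('alice')")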

If your test spawns a subprocess that opens its own connections, the subprocess inherits the worker's suffix and connects to the same per-worker DB.

Standard Setup

In your conftest.py, configure each driver you use. configure() requires you to choose explicitly: pass the bundled default_* implementations for the typical case, or substitute your own.

try:
    from dagz.integ.psycopg import PG_CONFIG
    from dagz.integ.pymysql import MYSQL_CONFIG
except ImportError:
    pass
else:
    PG_CONFIG.configure(
        rewrite_db_name=PG_CONFIG.default_rewrite_db_name,
        should_reroute=PG_CONFIG.default_should_reroute,
        prepare=None,
    )
    MYSQL_CONFIG.configure(
        rewrite_db_name=MYSQL_CONFIG.default_rewrite_db_name,
        should_reroute=MYSQL_CONFIG.default_should_reroute,
        prepare=None,
    )

The setup runs at conftest module load, not inside a session-scoped fixture. This is deliberate: a session fixture only runs after collection, by which point fixture parameter resolution, parametrize() expressions, or other collection-time code may have already opened a DB connection. Running configure() at module load makes rerouting active before any of that.

Customize rerouting policy

The full parameter list for configure() is in the Python API reference.

Override rewrite_db_name to match an external naming convention: Django's _test suffix, an existing xdist _gw0 scheme, or any project-specific layout.

Override should_reroute to leave specific databases untouched. Common case: shared read-only reference data (geo lookups, config tables) that all workers should query directly.
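
As a sketch of both overrides (assuming the callbacks receive the literal database name, plus the worker number for rewrite_db_name; check the Python API reference for the exact signatures, and note that "geo_reference" is an illustrative name):

def _rewrite_db_name(db_name, worker_num):
    # Reuse an existing xdist-style naming scheme instead of the default _jwNN suffix.
    return f"{db_name}_gw{worker_num}"

def _should_reroute(db_name):
    # Shared read-only reference data stays on its literal name for all workers.
    return db_name != "geo_reference"

PG_CONFIG.configure(
    rewrite_db_name=_rewrite_db_name,
    should_reroute=_should_reroute,
    prepare=None,
)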

Per-test preparation

Tests within a worker share the same per-worker DB, so each test needs to start from a known state. The prepare(db_name) callback runs once per test, before the first connection of that test, against the rerouted DB. Use it to bring the DB to a clean baseline: truncate tables, reset sequences, drop temporary objects, or load fixture data.

For the common case of clearing all user tables, each integration exposes a truncate(db_name, **connect_kwargs) method on its config object. It connects to the named DB and empties every user table. Call it from prepare:

def _prepare(db_name):
    PG_CONFIG.truncate(
        db_name, host="localhost", port=5432, user="postgres", password="postgres",
    )

PG_CONFIG.configure(
    rewrite_db_name=PG_CONFIG.default_rewrite_db_name,
    should_reroute=PG_CONFIG.default_should_reroute,
    prepare=_prepare,
)

If your tests already wrap themselves in a transaction that rolls back, no preparation is needed: pass prepare=None.

For tests that need different prep than the global callback (or want to take over and skip it), see "Manual preparation" below.

Manual preparation

A test (or its fixtures) can prepare its own DB and tell the framework not to run the auto-prepare callback for that test. This is per-test: the marker is cleared before every test, so other tests still get auto-prepare as configured.

get_manual_init_db_name(name) is the entry point. It does two things:

  1. Resolves the literal DB name to its per-worker form through the configured policy. get_manual_init_db_name("myapp") returns myapp_jw00 on worker 0, myapp_jw01 on worker 1, etc. Always use this rather than hard-coding the suffix; it picks up any custom rewrite_db_name you have set.
  2. Marks the resolved name as already initialized for the current test, so the auto-prepare callback skips it on the first connection of this test.

Typical use: a fixture that pre-loads a dataset and does not want the global truncate-everything prepare to wipe it.

@pytest.fixture
def populated_db():
    db = PG_CONFIG.get_manual_init_db_name("myapp")
    with psycopg.connect(dbname=db) as conn:
        # Do whatever this test needs; auto-prepare will not run for this test.
        _seed_reference_data(conn)
        yield conn

The mechanism: _current_inited is a per-test set of "already prepared" DB names. get_manual_init_db_name adds to it. _prepare_db checks it before invoking the auto-prepare callback. DAGZ clears the set at the start of every test (a test_prerun hook), so manual init does not carry across tests.
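
In rough pseudocode (names other than _current_inited, _prepare_db, and get_manual_init_db_name are simplified placeholders; this sketches the flow, not DAGZ's actual source):

_current_inited = set()   # cleared by the test_prerun hook before every test

def get_manual_init_db_name(name):
    db = _apply_rewrite_policy(name)     # per-worker name via rewrite_db_name
    _current_inited.add(db)              # auto-prepare will skip this DB for this test
    return db

def _prepare_db(db_name):
    if db_name in _current_inited:
        return                           # already prepared (or manually initialized)
    _current_inited.add(db_name)
    if _configured_prepare is not None:
        _configured_prepare(db_name)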

For URL-based connection strings, the companion get_manual_init_db_url(url) parses the URL, rewrites the path component, and reassembles it. Same per-test skip semantics.
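
Usage mirrors the name-based form (the URL below is illustrative):

url = PG_CONFIG.get_manual_init_db_url("postgresql://app:app@localhost:5432/myapp")
# url now points at .../myapp_jwNN for the current worker,
# and auto-prepare is skipped for that DB in this test.
with psycopg.connect(url) as conn:
    _seed_reference_data(conn)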

If prepare=None (no auto-prepare configured), get_manual_init_db_name is just a name resolver: it returns the per-worker name and adds it to a set the framework will not consult.

Worker initialization

worker_init(worker_num) runs once per worker, on the first DB access of that worker. Use it for setup that is expensive to repeat per test: running migrations, loading reference data, warming caches.

import subprocess

def _migrate_worker(worker_num):
    db = f"myapp_jw{worker_num:02}"
    subprocess.run(["alembic", "-x", f"db={db}", "upgrade", "head"], check=True)

PG_CONFIG.configure(
    rewrite_db_name=PG_CONFIG.default_rewrite_db_name,
    should_reroute=PG_CONFIG.default_should_reroute,
    prepare=None,
    worker_init=_migrate_worker,
)

Worker init assumes the worker DBs already exist. If you create them in a setup fixture, that fixture must complete before any test runs.
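
A minimal sketch of such a fixture, assuming the default _jwNN naming and a worker count you supply yourself (WORKER_COUNT is your own setting, not something DAGZ provides; keep it in sync with --dagz-workers). bypass() is covered in the next section:

import psycopg
import pytest

WORKER_COUNT = 4   # your own setting; match your --dagz-workers value

@pytest.fixture(scope="session", autouse=True)
def create_worker_dbs():
    with PG_CONFIG.bypass():
        with psycopg.connect(dbname="postgres", autocommit=True) as admin:
            for n in range(WORKER_COUNT):
                name = f"myapp_jw{n:02}"
                exists = admin.execute(
                    "SELECT 1 FROM pg_database WHERE datname = %s", (name,)
                ).fetchone()
                if exists is None:
                    admin.execute(f'CREATE DATABASE "{name}"')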

Bypassing rerouting

When code needs to connect using a literal DB name and not a rerouted one, wrap it in the bypass() context manager. Setup code that talks to admin databases is the most common case, but the same pattern applies to schema dumps, cross-DB queries during a test, and any other path that already targets the real name.

from dagz.integ.psycopg import PG_CONFIG
import psycopg

with PG_CONFIG.bypass():
    conn = psycopg.connect(dbname="postgres")  # admin, not rerouted
    conn.execute("SELECT datname FROM pg_database")

bypass() is backed by contextvars, so it is thread-safe and async-task-local.

Special cases

Most tests fit the standard model: each worker gets a private database, and tests use the rerouted name transparently. Two patterns do not fit and need special-case scheduling.

Tests that can't be rerouted

Some tests may use literal DB names that DAGZ cannot intercept. Examples:

  • The SQL contains the database name as a string literal: SELECT * FROM other_db.users.
  • The test invokes a subprocess (psql, mysqldump, a service binary) that connects without going through the instrumented driver.
  • The test reads or writes the catalog using the literal name (pg_database, information_schema.SCHEMATA).

For these tests, set main_worker_bypass=True in configure() and mark each affected test with @pytest.mark.dagz_main_worker:

PG_CONFIG.configure(
    rewrite_db_name=PG_CONFIG.default_rewrite_db_name,
    should_reroute=PG_CONFIG.default_should_reroute,
    prepare=None,
    main_worker_bypass=True,
)

@pytest.mark.dagz_main_worker
def test_cross_db_query(...):
    ...

main_worker_bypass=True makes the main worker skip all rerouting, so the DB name stays as written. dagz_main_worker schedules the test on the main worker. Other workers continue to reroute normally, so the rest of the suite still parallelizes.

Tests that reset the DB service

Some tests wipe or restart the database server, drop all databases, run FLUSHALL, or otherwise affect state shared across all worker DBs on the node. While such a test runs, no other test on the same worker node can safely connect.

Mark these tests with @pytest.mark.dagz_node_mutex:

@pytest.mark.dagz_node_mutex
def test_full_db_reset(...):
    ...

DAGZ schedules a dagz_node_mutex test so that no other test runs on the same worker node while it executes. Other nodes in a multi-node run keep working in parallel.

Single-worker runs

By default, rerouting applies regardless of worker count. A --dagz-workers=1 run still rewrites myapp to myapp_jw00. This keeps the routing behavior consistent across worker counts: a passing test at --dagz-workers=4 runs against the same DB names at --dagz-workers=1.

Pass single_worker_disable=True to opt into a different behavior: when DAGZ is running with one worker, the integration unhooks entirely. Connections go straight to the driver, no rewrite, no overhead. Use this when you want a serial test run to behave identically to one without DAGZ.
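
A sketch, assuming single_worker_disable is passed to configure() alongside the other options:

PG_CONFIG.configure(
    rewrite_db_name=PG_CONFIG.default_rewrite_db_name,
    should_reroute=PG_CONFIG.default_should_reroute,
    prepare=None,
    single_worker_disable=True,   # with --dagz-workers=1, connect to "myapp" directly
)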

Supported drivers

By default, DAGZ reroutes:

Driver                        What gets rewritten
psycopg (v3) and psycopg2     dbname parameter
pymysql and aiomysql          database / db parameter
cassandra-driver              keyspace
redis-py (sync + async)       logical DB number (each worker maps to a distinct slot)
RabbitMQ (pika, aiormq)       virtual host

System databases are skipped automatically: postgres, information_schema, Cassandra's system* keyspaces, and similar.

Adding a new database or driver

DAGZ separates two concerns:

  • Database support owns the rerouting policy for one DB family. It defines the suffix scheme, lists the system DBs to skip, and exposes the truncate utility. One per family. Concrete examples: PostgresConfig (singleton PG_CONFIG), MysqlConfig (singleton MYSQL_CONFIG).
  • Client driver support hooks one specific Python client library (psycopg, psycopg2, pymysql, aiomysql, adbc_driver_postgresql, ...) and consults the database support to decide what to rewrite. Many client drivers can share one database support.

This split matters when a project uses more than one client library against the same DB, which is common: a service layer on psycopg, an analytics path on ADBC, a Django ORM on psycopg2, all hitting the same Postgres. Each library needs its own hook, but they all share one rerouting policy through PG_CONFIG. Configure once, every client routes consistently.

Both pieces are extension points you can add in your own code (typically conftest.py). The policy configs and helper APIs are public; you do not need DAGZ to ship support for every client library or DB family your project uses. You typically only need to add client driver support: the DB family is already covered by DAGZ, but a new client library needs hooking. Adding database support is rarer, needed only when DAGZ does not yet support that family.

Adding a client driver in your conftest

In your conftest, import your target library, capture its connect function, and replace it with an override that consults an existing config (e.g. PG_CONFIG).

For example, adbc_driver_postgresql.dbapi.connect(uri) takes a URI as its first positional argument. The library reaches the same Postgres server as psycopg, so it shares PG_CONFIG:

try:
    from dagz.integ.psycopg import PG_CONFIG
    from adbc_driver_postgresql import dbapi as _adbc_dbapi
except ImportError:
    pass
else:
    _orig_adbc_connect = _adbc_dbapi.connect

    def _adbc_override_connect(uri, *args, **kwargs):
        uri = PG_CONFIG.maybe_reroute_uri(uri)
        return _orig_adbc_connect(uri, *args, **kwargs)

    _adbc_dbapi.connect = _adbc_override_connect

ADBC connections now route through PG_CONFIG's policy alongside psycopg and psycopg2. The patch sits at conftest module load, not in a fixture, for the same reason as the main configure() block: it must be in place before any collection-time DB code runs.

Client libraries expose the dbname differently. The config provides helpers for the common cases (maybe_reroute(name) for kwarg-style, maybe_reroute_uri(uri) for URI-style); see driver-level helpers for the full list.
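
A kwarg-style counterpart to the ADBC patch above, with somedriver standing in for a hypothetical library whose connect() takes a dbname keyword:

import somedriver   # hypothetical kwarg-style client

_orig_somedriver_connect = somedriver.connect

def _somedriver_override_connect(*args, dbname=None, **kwargs):
    if dbname is not None:
        dbname = PG_CONFIG.maybe_reroute(dbname)   # same policy as the other clients
    return _orig_somedriver_connect(*args, dbname=dbname, **kwargs)

somedriver.connect = _somedriver_override_connect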

Adding a new database family

For a DB family DAGZ does not yet support, define a DbConfig subclass in your own code. The subclass owns the policy: a singleton config (e.g. MYDB_CONFIG), an implementation of default_should_reroute that knows the family's system DBs, a truncate utility, and the patching for at least one client library.

The DAGZ source's pymysql.py is the canonical reference: it bundles MySQL database support with the pymysql client driver. aiomysql.py then adds aiomysql client support on top of MYSQL_CONFIG. Both follow the same pattern your project would.
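
A skeleton only; the base-class import path and method signatures below are assumptions, so use pymysql.py in the DAGZ source as the authoritative reference for the real shape:

from dagz import DbConfig   # import path assumed

class MyDbConfig(DbConfig):
    # System DBs for this family that should never be rerouted (illustrative names).
    SYSTEM_DBS = {"system", "admin"}

    def default_should_reroute(self, db_name):
        return db_name not in self.SYSTEM_DBS

    def truncate(self, db_name, **connect_kwargs):
        # Connect to db_name with the family's client and empty every user table.
        raise NotImplementedError

MYDB_CONFIG = MyDbConfig()

# Then hook at least one client library against MYDB_CONFIG, following the same
# pattern as the ADBC example above.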