Parallelizing DB tests

DAGZ rewrites the DB name in each driver-level connect call so every worker reaches a distinct database. Calling code is unchanged.

DAGZ does not manage databases. An integration sets up its database support by calling configure() and supplying lifecycle callbacks that invoke the project's existing DB tooling at fixed points in the test run.

Overview

DAGZ provides utilities for parallelizing tests:

  • DB family support (DbConfig): abstract class defining a DB-agnostic model for rerouting.
  • Per-test prepare (prepare(db_name)): callback that runs before each test on the rerouted DB.
  • Per-worker setup (worker_init(worker_num)): callback that runs once per worker on its first DB access.
  • Manual init (get_manual_init_db_name(name)): resolves a DB name to its per-worker form and marks it as already prepared, so the prepare callback is skipped for the current test.
  • Bypass (bypass()): context manager that suppresses rerouting.
  • Main-worker marker (@pytest.mark.dagz_main_worker): schedules a test on the main worker, which can be configured to bypass rerouting per DB.
  • Node-mutex marker (@pytest.mark.dagz_node_mutex): schedules a test alone on its worker node.
  • Marker assertions (assert_main_worker, assert_node_mutex, plus _at patching variants): force-fail tests that should carry a marker but don't. The _at form is a function-patching convenience that injects the assertion at a target call site.
  • Packaged drivers: DbConfig implementations for popular databases and drivers.

Subprocesses inherit the worker suffix and connect to the same per-worker DB.

Standard parallelization

DAGZ provides built-in support for most popular databases. See the full list in Supported drivers.

A project configures a DB integration by calling configure() on the database support object (e.g. PG_CONFIG.configure(...)) with these keyword arguments:

  • rewrite_db_name(db_name, session) -> str: computes the rerouted name. The default default_rewrite_db_name appends the worker suffix.
  • should_reroute(db_name) -> bool: filters which names get rerouted. The default default_should_reroute skips the family's system DBs.
  • prepare(db_name): runs before each test against the rerouted DB; typically calls the integration's truncate helper to empty user tables. Pass None to skip.
  • worker_init(worker_num) (optional): runs once per worker on its first DB access; typically creates the per-worker DB and runs migrations. create_worker_init([db, ...], **conn_kwargs) is the factory for the standard CREATE DATABASE flow.
  • main_worker_bypass=False, single_worker_disable=False: opt-in flags described under Tests that can't be rerouted and the Reference section.

The config object also exposes helper methods that fixtures and client driver hooks call directly:

  • bypass(): context manager that suppresses rerouting for one block (admin DBs, schema dumps, cross-DB queries). Backed by contextvars; safe under threads and async tasks.
  • get_manual_init_db_name(name) / get_manual_init_db_url(url): resolve a name (or URL path) to its per-worker form and mark it as already-prepared, skipping the prepare callback for the current test.
  • truncate(db_name, **connect_kwargs) -> TruncateResult: empty all user tables; returns whether the DB existed and which tables were truncated. Typically called from inside prepare.
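To illustrate why a contextvars-backed flag stays isolated between async tasks, here is a minimal standalone sketch. It is not DAGZ's implementation: `_BYPASS`, `bypass_sketch`, and `maybe_reroute_sketch` are hypothetical names, and the `_jw03` suffix is hard-coded for illustration.

```python
import asyncio
import contextvars
from contextlib import contextmanager

# Hypothetical flag; every asyncio task works on its own copy of the context.
_BYPASS = contextvars.ContextVar("bypass_sketch", default=False)

@contextmanager
def bypass_sketch():
    """Suppress rerouting inside the with-block, then restore the prior state."""
    token = _BYPASS.set(True)
    try:
        yield
    finally:
        _BYPASS.reset(token)

def maybe_reroute_sketch(db_name, suffix="_jw03"):
    """Return the name unchanged while bypassed, otherwise append the suffix."""
    return db_name if _BYPASS.get() else db_name + suffix

async def _admin_task():
    with bypass_sketch():
        await asyncio.sleep(0)  # yield so the other task runs in the meantime
        return maybe_reroute_sketch("myapp")

async def _normal_task():
    await asyncio.sleep(0)
    return maybe_reroute_sketch("myapp")

async def demo():
    # gather() wraps each coroutine in a task with its own context copy,
    # so the bypass set in _admin_task does not leak into _normal_task.
    return await asyncio.gather(_admin_task(), _normal_task())
```

Running `asyncio.run(demo())` yields the unrerouted name for the bypassed task and the suffixed name for the other, even though the two tasks interleave.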

Connection names are rewritten by appending the worker suffix. Worker 3 sees myapp as myapp_jw03.
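The suffix rule can be sketched as a pure function. This is an illustration only: the `_jw` prefix and two-digit zero padding are inferred from the single `myapp_jw03` example above, and `worker_suffix_sketch` and `rewrite_sketch` are hypothetical names, not part of DAGZ.

```python
def worker_suffix_sketch(worker_num: int) -> str:
    # Assumption: "_jw" followed by a zero-padded two-digit worker number.
    return f"_jw{worker_num:02d}"

def rewrite_sketch(db_name: str, worker_num: int) -> str:
    """Append the per-worker suffix to a database name."""
    return db_name + worker_suffix_sketch(worker_num)

print(rewrite_sketch("myapp", 3))  # myapp_jw03
```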

The following Postgres configure() call sets up per-worker CREATE DATABASE and per-test truncate:

from dagz.integ.psycopg2 import PG_CONFIG, create_worker_init

def _prepare(db_name):
    # Empty all user tables in the rerouted DB before each test.
    PG_CONFIG.truncate(
        db_name, host="localhost", port=5432, user="postgres", password="postgres",
    )

PG_CONFIG.configure(
    rewrite_db_name=PG_CONFIG.default_rewrite_db_name,
    should_reroute=PG_CONFIG.default_should_reroute,
    prepare=_prepare,
    # Create the per-worker copy of "myapp" on the worker's first DB access.
    worker_init=create_worker_init(
        ["myapp"], host="localhost", port=5432, user="postgres", password="postgres",
    ),
)

The full parameter list is in the Python API reference.

Tests that can't be rerouted

Two patterns require explicit marking.

Literal DB names

Some tests use a DB name that DAGZ can't intercept: SQL string literals, subprocesses that connect outside the instrumented driver, or catalog queries by literal name. Mark these tests with @pytest.mark.dagz_main_worker. With main_worker_bypass=True in configure(), the main worker skips rerouting while other workers continue normally.

To find tests that need the marker systematically, wrap the at-risk function with assert_main_worker_at in conftest.py; calls without the marker raise a DagzError instead of silently producing wrong results.
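The general pattern behind a patching assertion can be sketched with the standard library alone. This is not DAGZ's implementation or API: `guard_at`, `MarkerMissingError`, `reports`, and `current_test_markers` are hypothetical stand-ins, and how the real helper discovers the running test's markers is not modeled here.

```python
import functools
import types

class MarkerMissingError(RuntimeError):
    """Hypothetical stand-in for the error raised when the marker is absent."""

def guard_at(owner, attr_name, has_marker):
    """Replace owner.attr_name with a wrapper that checks has_marker() first."""
    original = getattr(owner, attr_name)

    @functools.wraps(original)
    def guarded(*args, **kwargs):
        if not has_marker():
            raise MarkerMissingError(
                f"{attr_name} called from a test without the required marker"
            )
        return original(*args, **kwargs)

    setattr(owner, attr_name, guarded)
    return original  # returned so the caller can restore it later

# Stand-in for project code that embeds a literal DB name in SQL.
reports = types.SimpleNamespace(
    count_rows=lambda: "SELECT count(*) FROM myapp.public.orders"
)

# Stand-in for the set of markers on the currently running test.
current_test_markers = set()
guard_at(reports, "count_rows", lambda: "dagz_main_worker" in current_test_markers)
```

With the guard installed, an unmarked caller fails loudly at the call site, which is the point of the check: the literal name would otherwise hit the wrong database without any error.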

Resetting the DB service

Some tests wipe or restart the database server, drop all databases, run FLUSHALL, or otherwise mutate state shared across all workers on the node. Mark these tests with @pytest.mark.dagz_node_mutex; DAGZ schedules each one with no other test running on the same worker node, while other nodes keep running.

assert_node_mutex_at provides the equivalent systematic check and has the same shape as assert_main_worker_at.

Customizing the rerouting policy

Two configure() parameters control which DB names get rewritten and how:

  • Naming scheme (rewrite_db_name): override to match an external convention (Django's test_ prefix, an xdist _gw0 scheme, a project-specific layout).
  • Exclusions (should_reroute): override to leave specific databases untouched. Common case: shared read-only reference data that every worker should query directly.

The full parameter list is in the Python API reference.
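A custom policy might look like the following sketch, which uses the two callback signatures listed earlier. The database names are hypothetical, the system DB set is only a small sample, and the `worker_num` attribute on the session is an assumption: the real session object may expose the worker number differently.

```python
from types import SimpleNamespace

# Hypothetical shared, read-only databases that every worker queries directly.
SHARED_READ_ONLY = {"geo_reference", "currency_rates"}
# Assumption: a small sample of Postgres system DBs, not the packaged default list.
SYSTEM_DBS = {"postgres", "template0", "template1"}

def should_reroute(db_name: str) -> bool:
    """Leave shared reference data and system DBs untouched."""
    return db_name not in SHARED_READ_ONLY and db_name not in SYSTEM_DBS

def rewrite_db_name(db_name: str, session) -> str:
    """Mimic an xdist-style _gwN scheme."""
    # Assumption: session.worker_num exists; the real attribute name may differ.
    return f"{db_name}_gw{session.worker_num}"

# Exercise the callbacks with a stand-in session object.
fake_session = SimpleNamespace(worker_num=0)
print(rewrite_db_name("myapp", fake_session))  # myapp_gw0
print(should_reroute("geo_reference"))         # False
```

Both functions would then be passed to configure() as the rewrite_db_name and should_reroute keyword arguments.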

Adding driver or DB support

DAGZ separates extension into two layers:

  • Database support (DbConfig subclass): defines the rerouting policy for one DB family (suffix scheme, system DBs to skip, truncate helper). Examples: PostgresConfig (singleton PG_CONFIG), MysqlConfig (MYSQL_CONFIG).
  • Client driver support: hooks one Python client library and consults the database support. Many client libraries can share one database support (a service layer on psycopg, an analytics path on ADBC, and a Django ORM on psycopg2 all share PG_CONFIG).

Adding a client driver hook in conftest.py is the common case; adding a new DB family is rarer.

A client driver hook captures the library's connect function at conftest module load (not in a fixture, so rerouting is active before any collection-time code runs) and replaces it with an override that calls into the appropriate config, typically via maybe_reroute(name) for kwarg-style libraries or maybe_reroute_uri(uri) for URI-style. See driver-level helpers for the full list.

For a DB family DAGZ doesn't cover, subclass DbConfig and provide four pieces: a singleton config, a default_should_reroute that knows the family's system DBs, a truncate helper, and the patching for at least one client library. The DAGZ source's pymysql.py is the canonical reference.

Reference

DAGZ's built-in drivers rewrite one parameter each:

Driver                     What gets rewritten
psycopg (v3) and psycopg2  dbname parameter
pymysql and aiomysql       database / db parameter
cassandra-driver           keyspace
redis-py (sync + async)    logical DB number (each worker maps to a distinct slot)
RabbitMQ (pika, aiormq)    virtual host

System databases are skipped automatically: postgres, information_schema, Cassandra's system* keyspaces, and similar.

By default, rerouting applies regardless of worker count: --dagz-workers=1 still rewrites the DB name. Pass single_worker_disable=True in configure() to unhook entirely when DAGZ runs with one worker.