Integration Playbook
Hand the raw markdown to your coding agent and let it run the integration.
A walkthrough for bringing an existing pytest suite onto DAGZ parallel execution with a real database, aimed at first-time integrators. The companion reference is Parallelizing DB Tests; this page is the sequence of steps and the diagnostics you reach for when a step does not work.
What you should already have
- A pytest suite that runs serially against one or more real databases.
- A `conftest.py` at the repo root.
- The DB services running locally or in CI. Postgres, MySQL, Cassandra, Redis, and RabbitMQ are supported.
- The `zb` CLI installed (`curl -LsSf https://dagz.run/install.sh | bash`) and the project initialized (`zb init`).
- The `dagz` Python package installed in the project's virtual environment (`uv pip install dagz` or `pip install dagz`). This package contains the pytest plugin (the `--dagz-workers` flag) and the `dagz.integ.*` driver helpers used below. Without it, none of the integration code in this guide is importable.
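Before Step 1, a quick pre-flight check can confirm that the prerequisites are visible from the interpreter that will run pytest. This is a minimal, standard-library-only sketch: the function name is illustrative and not part of DAGZ. It checks that the `dagz` package imports, that `zb` is on `PATH`, and that a root `conftest.py` exists. It does not check that the DB services are up.

```python
import importlib.util
import shutil
from pathlib import Path


def check_prerequisites(repo_root="."):
    """Report which prerequisites from the list above are visible.

    Run this with the project's virtual-environment interpreter; a package
    installed into a different environment shows up as missing.
    """
    return {
        # The pytest plugin and the dagz.integ.* helpers ship in the `dagz` package.
        "dagz package importable": importlib.util.find_spec("dagz") is not None,
        # `zb init`, `zb jobs`, and `zb logs` need the CLI on PATH.
        "zb CLI on PATH": shutil.which("zb") is not None,
        # Step 1 edits the conftest.py at the repo root.
        "root conftest.py": (Path(repo_root) / "conftest.py").is_file(),
    }


if __name__ == "__main__":
    for item, ok in check_prerequisites().items():
        print(f"{'ok     ' if ok else 'MISSING'}  {item}")
```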
Step 1: wire up the integration in conftest.py
In the repo root conftest.py, configure each driver your suite uses. Wrap the imports in try / except ImportError so the file still loads when DAGZ is not installed (serial pytest runs, onprem deployments without DAGZ).
```python
try:
    from dagz.integ.psycopg import PG_CONFIG
    from dagz.integ.redis import REDIS_CONFIG
    # ...one import per driver you use...
except ImportError:
    pass
else:
    PG_CONFIG.configure(
        rewrite_db_name=PG_CONFIG.default_rewrite_db_name,
        should_reroute=PG_CONFIG.default_should_reroute,
        prepare=None,
    )
    REDIS_CONFIG.configure(
        rewrite_db_name=REDIS_CONFIG.default_rewrite_db_name,
    )
```
Two things to notice:
- The block runs at module top level, not inside a fixture. Connections opened during pytest collection (parametrize expressions, factory imports) cannot be rerouted: the worker context does not exist yet, so DAGZ has no per-worker suffix to apply. With `configure()` already in place at module load, DAGZ logs a warning the first time this happens: `Accessing dagz.integ.<driver> DB during import: <db_name>` (see `dagz.integ.db_config.DbConfig.default_rewrite_db_name`). Treat that warning as a signal that an init step is running too early; move it into a session-scoped fixture or DAGZ's `worker_init` callback so it runs inside the worker context.
- `prepare=None` means DAGZ does not run a per-test cleanup callback. Pass `None` if your suite already cleans state between tests, rolls back transactions, or creates fresh state in its existing per-worker init. Add a `prepare` callback later only if you measure that you need it.
:::info Configure every driver you import
When DAGZ is active and one of its supported drivers is imported (psycopg, psycopg2, pymysql, aiomysql, cassandra-driver, redis-py, pika, aiormq), DAGZ auto-installs a connection-patching hook on that driver. If you import the driver but never call <DRIVER>_CONFIG.configure(...), the first connection raises:
```
RuntimeError: Connection attempted in multi-worker dagz without configuration.
Call dagz.integ.<driver>.configure() in conftest.py.
```
The check sits in should_reroute() (dagz.integ.db_config.DbConfig.check_configured), called from the patched connect of every supported driver. The error fails the test immediately rather than letting two workers silently write to the same DB. If a driver appears anywhere in your test stack, configure it; if you do not need rerouting for a particular driver, configure it with should_reroute=lambda db_name: False.
:::
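`should_reroute` does not have to be all-or-nothing. The `lambda db_name: False` form above implies that the callable receives the DB name. Under that assumption, the sketch below leaves one shared, read-only reference DB un-rerouted and defers to the DAGZ default for every other name. `geo_reference` and `make_should_reroute` are hypothetical. Whether `default_should_reroute` can be called directly like this is also an assumption, so check the DAGZ reference first. Share a DB this way only if no test ever writes to it, because every worker connects to the same one.

```python
# Hypothetical: reference DBs that tests only ever read. Every worker shares them,
# so never list a DB here that any test writes to.
SHARED_READONLY_DBS = frozenset({"geo_reference"})


def make_should_reroute(fallback, shared=SHARED_READONLY_DBS):
    """Build a should_reroute(db_name) predicate.

    DBs in `shared` are never rerouted; every other name is decided by `fallback`.
    """
    def should_reroute(db_name):
        if db_name in shared:
            return False
        return fallback(db_name)

    return should_reroute


try:
    from dagz.integ.psycopg import PG_CONFIG
except ImportError:
    pass
else:
    # Replaces the PG_CONFIG.configure(...) call from Step 1.
    PG_CONFIG.configure(
        rewrite_db_name=PG_CONFIG.default_rewrite_db_name,
        # Assumption: default_should_reroute takes the DB name, like the lambda form.
        should_reroute=make_should_reroute(PG_CONFIG.default_should_reroute),
        prepare=None,
    )
```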
Step 2: do not duplicate per-worker setup
Before writing any new per-worker setup, list what your project already does on first DB access. Many suites already have one of the following, and DAGZ either reuses it or does not need it:
- Django suites with a custom `parallel_test_suite.init_worker`. DAGZ invokes that classmethod on every worker after fork. If it sets per-worker DB names, runs migrations, or seeds reference data, the same code runs under DAGZ with the names DAGZ rewrote.
- Suites with an autouse session fixture that creates schema or loads fixtures once per process. These also run inside each DAGZ worker process.
- Suites that create the DB schema in a `conftest` `pytest_sessionstart` hook. Same: each DAGZ worker runs `pytest_sessionstart`.
DAGZ's own `worker_init` parameter is for projects that have no existing hook. If you do have one, do not reimplement its work in `worker_init`; two copies of the same setup drift apart over time. Pass `worker_init=None` and let the existing code run.
The full call chain inside a DAGZ worker, after fork:
```
dagz.pytest.worker.Worker.__init__
  -> project per-worker init (Django parallel_test_suite, etc.)
  -> dagz_worker_init_callbacks (registered via dagz.sensor.callbacks.add_worker_init)
  -> dagz config worker_init callbacks (configured per-driver)
  -> first DB access of each test triggers the per-test prepare callback (if any)
```
Step 3: verify with a small slice
Pick 5 to 10 test files that touch the database, then run them under DAGZ at a worker count higher than 1:
```shell
pytest --dagz-workers=4 \
  path/to/test_a.py \
  path/to/test_b.py \
  path/to/test_c.py
```
A few outcomes are normal at first:
- Some tests fail under contention and pass on retry. DAGZ's flake detection handles this; check the run summary.
- Some tests fail on a clean DB only (they assumed seeded data left by an earlier test). These are real isolation bugs surfaced by parallelism. Fix them or mark them.
- Some tests use literal DB names that bypass the driver hook. Mark them with `@pytest.mark.dagz_main_worker` (see Tests that can't be rerouted).
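Applying the marker works like any other pytest mark. The test name below is hypothetical, and the marker is applied without arguments. Tests that can't be rerouted describes what the marker actually does.

```python
import pytest


# Hypothetical test that connects to a hard-coded database name, which the driver
# hook cannot rewrite.
@pytest.mark.dagz_main_worker
def test_reads_legacy_reporting_db():
    ...


# To mark every test in a file instead, set the module-level variable:
# pytestmark = pytest.mark.dagz_main_worker
```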
Step 4: diagnose a failing or hanging run
Two CLI tools cover almost everything:
```shell
zb jobs         # list recent runs with timing and status
zb logs <id>    # full streamed output from a single run
```
`zb logs` includes per-worker stderr, fork and init lifecycle messages, every DB driver call (if the driver logs at debug level), and the scheduler's per-worker progress. When a worker appears idle, search the logs for that worker's thread tag (`jw0.tNNN`) to see what it was doing last.
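With many workers, it helps to see the last line each worker thread logged. This sketch assumes you saved the output with `zb logs <id> > run.log`, and that thread tags follow the `jw0.tNNN` shape shown above, generalized here to `jw<N>.t<M>`. The script name is hypothetical. Adjust the regular expression if your tags look different.

```python
import re
import sys

# Assumption: thread tags look like the jw0.tNNN example above; this pattern
# accepts any jw<digits>.t<digits>.
THREAD_TAG = re.compile(r"\bjw\d+\.t\d+\b")


def last_line_per_thread(lines):
    """Map each thread tag to the last log line that mentions it."""
    last = {}
    for line in lines:
        for tag in THREAD_TAG.findall(line):
            last[tag] = line.rstrip("\n")
    return last


if __name__ == "__main__" and len(sys.argv) == 2:
    # Usage: zb logs <id> > run.log && python last_seen.py run.log
    with open(sys.argv[1], encoding="utf-8", errors="replace") as log:
        for tag, line in sorted(last_line_per_thread(log).items()):
            print(f"{tag}: {line}")
```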
Common patterns:
| Symptom | Where to look | Common cause |
|---|---|---|
| Workers idle, scheduler's `unfinished=N` count not changing | `zb logs <id>` filtered to the worker thread | A test makes external network calls without a timeout, or a per-worker init blocks on a service that is not running. |
| Tests pass alone, fail under load | Run again with `--dagz-workers=1` | Real isolation bug, or DB resource limits hit (heap, connection pool, file handles). |
| `'dict' object has no attribute X` | The exception trace | The session has a custom row factory (e.g., `dict_factory`); your code paths assume the default `Row` tuple. |
| `... already exists` / `Cannot drop ...` on first run | The DDL call in your per-worker init | The DDL assumes a state the DB is not in, for example the DB or keyspace does not exist yet. Guard creation with `IF NOT EXISTS` (and drops with `IF EXISTS`), or catch the corresponding error. |
| Many transient timeouts under load | Both DAGZ logs and the DB server's logs | Concurrency too high for the DB server's resources. Lower `--dagz-workers` or raise the server's heap or connection limits. |
| Two workers writing to the same DB | Worker init logs for the rewritten DB name | The per-worker name was not actually rewritten; check that `configure()` ran at module top level, not inside a fixture. |
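For the `already exists` row, the usual fix is to make every DDL statement in the per-worker init safe to run twice. This is a minimal sketch in which the `account` table is hypothetical. `sqlite3` stands in for your real driver only so the example runs anywhere; DAGZ does not reroute SQLite. `CREATE TABLE IF NOT EXISTS` is also accepted by Postgres and MySQL, and Cassandra accepts `IF NOT EXISTS` on `CREATE KEYSPACE` and `CREATE TABLE`. Postgres has no `CREATE DATABASE IF NOT EXISTS`, so catch the duplicate-database error there instead.

```python
import sqlite3

# Hypothetical schema. Each statement is safe to re-run: a DB left over from a
# previous run, or a second call in the same worker, finds the objects and skips them.
SCHEMA = (
    "CREATE TABLE IF NOT EXISTS account (id INTEGER PRIMARY KEY, email TEXT NOT NULL)",
    "CREATE INDEX IF NOT EXISTS account_email_idx ON account (email)",
)


def ensure_schema(conn):
    """Apply SCHEMA through a DB-API connection; idempotent while every statement
    in SCHEMA uses IF NOT EXISTS."""
    cur = conn.cursor()
    try:
        for statement in SCHEMA:
            cur.execute(statement)
    finally:
        cur.close()
    conn.commit()


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    ensure_schema(conn)
    ensure_schema(conn)  # second call is a no-op instead of an "already exists" error
    print(sorted(row[0] for row in conn.execute("SELECT name FROM sqlite_master")))
```

Call `ensure_schema` from whichever per-worker init the project already has (Step 2), for example its `pytest_sessionstart` hook.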
If a single test misbehaves only under parallel load, run it alone with --dagz-workers=1 to confirm whether the test or the parallel setup is at fault.
Step 5: optimize the per-test cleanup path
Once the suite passes, look at where time goes per test. Cleanup that runs between tests (truncate, redis flush, key deletions) sits on the hot path: it runs as many times as you have tests, so per-call cost compounds.
A common pattern in legacy code is "list tables, SELECT COUNT(*) per table, truncate only the non-empty ones." This made sense when TRUNCATE was expensive, but on modern Postgres, MySQL, and Cassandra, TRUNCATE on an empty table is cheap and the COUNT round trips dominate. Each driver integration ships a faster cleanup helper:
| Driver | Helper |
|---|---|
| Postgres | PG_CONFIG.truncate(db_name, **connect_kwargs) |
| MySQL | MYSQL_CONFIG.truncate(db_name, **connect_kwargs) |
| Cassandra | CASSANDRA_CONFIG.truncate_session(session, keyspace) |
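To see why the COUNT pattern loses, count statements. The legacy approach issues one query to list the tables and then N `COUNT(*)` queries before any truncation starts. A single multi-table `TRUNCATE` needs only the listing query plus one statement. The sketch below builds that single Postgres statement. It illustrates the idea and is not how `PG_CONFIG.truncate` is implemented, so prefer the DAGZ helper where it is available.

```python
def quote_ident(name):
    """Quote a Postgres identifier: wrap in double quotes, double embedded quotes."""
    return '"' + name.replace('"', '""') + '"'


def single_truncate_statement(tables, restart_identity=False):
    """Build one TRUNCATE covering every table, or None if there are no tables.

    `tables` are unqualified names resolved through search_path, e.g. the rows of
    SELECT tablename FROM pg_tables WHERE schemaname = 'public'.
    Truncating all tables in one command also lets Postgres truncate a table that
    other tables reference by foreign key, as long as those tables are in the list.
    """
    if not tables:
        return None
    statement = "TRUNCATE TABLE " + ", ".join(quote_ident(t) for t in tables)
    if restart_identity:
        # Also resets sequences owned by the truncated tables.
        statement += " RESTART IDENTITY"
    return statement


def truncate_all(conn, tables):
    """Run the single statement on a psycopg or psycopg2 connection."""
    statement = single_truncate_statement(tables)
    if statement is None:
        return
    with conn.cursor() as cur:
        cur.execute(statement)
    conn.commit()
```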
You can swap a project's existing helper without touching the original function. Append a `try` / `except ImportError` / `else:` block at the bottom of the module that defines the helper:
```python
# Purely additive. The original function above stays intact for environments
# without DAGZ.
try:
    from dagz.integ.cassandra import CASSANDRA_CONFIG
except ImportError:
    pass
else:
    def cassandra_truncate_tables():
        # `connection` and `settings` are assumed to be imported at the top of this
        # module already, since the original helper uses them.
        session = connection.get_session()
        session.set_keyspace(settings.MY_KEYSPACE)
        CASSANDRA_CONFIG.truncate_session(session, settings.MY_KEYSPACE)
```
Python rebinds the name at module load. Other modules that do from project.utils import cassandra_truncate_tables get the rebound version. Without DAGZ the original function still wins.
The same shape applies to any cleanup helper: flushdb for Redis, a generic truncate_all_tables for Postgres, and so on.
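The rebinding rule is ordinary Python semantics, so you can check it without DAGZ. The simulation below executes a stand-in module source twice: once without and once with a hypothetical optional package, `fast_cleanup_plugin`, which plays the role of `dagz.integ.<driver>`. It shows that `from project_utils import truncate_tables` picks up whichever definition ran last. All names are illustrative.

```python
import sys
import types

# Stand-in for project/utils.py. `fast_cleanup_plugin` is a hypothetical optional
# package playing the role of dagz.integ.<driver>.
UTILS_SOURCE = '''
def truncate_tables():
    return "original"

try:
    from fast_cleanup_plugin import fast_truncate
except ImportError:
    pass
else:
    def truncate_tables():
        return fast_truncate()
'''


def load_utils():
    """Run the module source top to bottom, as an import would, and register it."""
    module = types.ModuleType("project_utils")
    exec(UTILS_SOURCE, module.__dict__)
    sys.modules["project_utils"] = module
    return module


if __name__ == "__main__":
    load_utils()
    from project_utils import truncate_tables
    print(truncate_tables())  # plugin absent: the original definition is used

    plugin = types.ModuleType("fast_cleanup_plugin")
    plugin.fast_truncate = lambda: "fast"
    sys.modules["fast_cleanup_plugin"] = plugin
    load_utils()
    from project_utils import truncate_tables
    print(truncate_tables())  # plugin present: the else: branch rebound the name
    del sys.modules["fast_cleanup_plugin"]  # undo the simulation
```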
Keeping the diff small
A small diff against the existing test suite is easier to review, easier to roll back, and easier to land in a vendored or onprem fork.
Three guidelines:
- Reuse what exists. If a per-worker init hook already runs setup, do not reimplement it in a DAGZ callback; two copies of the same setup drift apart.
- Override, do not rewrite. A `try: from dagz.integ.X import Y` / `else:` block at the bottom of a file can replace one helper without touching the original. The diff is N additive lines; the original function is the fallback.
- Move generic logic upstream. If you find yourself adding the same retry, the same concurrency cap, or the same skip-empty optimization to your project, that logic belongs in `dagz.integ.<driver>`. Open an issue or a PR; the next consumer benefits, and your local diff shrinks.