When iterating on cohort definitions, you often change one part of a definition and re-run. Without caching, the entire pipeline re-executes from scratch: concept set expansion, primary events, qualified events, inclusion rules, and final cohort construction – even for the parts that haven’t changed.
atlasCohortGenerator implements incremental DAG
caching: a system that persists intermediate computation tables
in the database and skips recomputation of unchanged nodes. This is the
same principle behind build systems like Make and Bazel –
content-addressable caching with automatic invalidation via Merkle-tree
hashing.
This vignette explains how the caching system works, what is and is not cached, and how to manage the cache over time.
Each cohort definition is decomposed into a directed acyclic graph (DAG) of typed computation nodes:
concept_set (CS)
|
primary_events (PE)
|
qualified_events (QE)
|
inclusion_rule (IR) [0..N]
|
included_events (IE)
|
cohort_exit (CE)
|
final_cohort (FC)
Each node is identified by a content hash – a deterministic 16-character hex string derived from the node’s definition. Two nodes with the same definition produce the same hash, regardless of which cohort they came from.
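As an illustration of the idea (not the package's actual implementation — this sketch assumes the `digest` package and a hypothetical normalization of the definition), a 16-character content hash can be derived like so:

```r
library(digest)  # assumed dependency for this sketch

# Hypothetical content hash: serialize the definition, hash it with
# SHA-256, and keep the first 16 hex characters.
node_hash <- function(definition) {
  substr(digest(definition, algo = "sha256"), 1, 16)
}

h1 <- node_hash(list(type = "concept_set", concept_id = 123))
h2 <- node_hash(list(type = "concept_set", concept_id = 123))
identical(h1, h2)  # TRUE: same definition, same hash
nchar(h1)          # 16
```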
The critical property that makes caching safe is Merkle-tree hashing: each node’s hash incorporates the hashes of its dependencies, not just its own definition.
For example, a qualified_events node's hash is computed from its own definition (the additional qualifying criteria and result limit) together with the hash of its upstream primary_events node.

This means that if you change a concept set, the change propagates downstream through the entire DAG:
CS (concept_id=123) --> hash: a1b2c3d4...
PE (uses CS hash) --> hash: e5f6a7b8...
QE (uses PE hash) --> hash: 1234abcd...
...
CS (concept_id=999) --> hash: ff00ee11... <-- changed
PE (uses CS hash) --> hash: 2233aabb... <-- also changed (different dep hash)
QE (uses PE hash) --> hash: ccdd4455... <-- also changed
...
No explicit invalidation logic is needed. A change anywhere automatically produces different hashes for all downstream nodes.
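The propagation can be sketched in a few lines of R (hypothetical helper names, assuming the `digest` package; the package's real scheme may differ in detail):

```r
library(digest)

hash16 <- function(x) substr(digest(x, algo = "sha256"), 1, 16)

# Merkle-style: a node's hash covers its own definition *and* the
# hashes of its dependencies.
merkle_hash <- function(definition, dep_hashes = character(0)) {
  hash16(list(definition = definition, deps = sort(dep_hashes)))
}

cs <- merkle_hash(list(type = "concept_set", concept_id = 123))
pe <- merkle_hash(list(type = "primary_events"), dep_hashes = cs)
qe <- merkle_hash(list(type = "qualified_events"), dep_hashes = pe)

# Swap the concept set: every downstream hash changes, with no
# explicit invalidation logic anywhere.
cs2 <- merkle_hash(list(type = "concept_set", concept_id = 999))
pe2 <- merkle_hash(list(type = "primary_events"), dep_hashes = cs2)
qe2 <- merkle_hash(list(type = "qualified_events"), dep_hashes = pe2)

c(cs != cs2, pe != pe2, qe != qe2)  # TRUE TRUE TRUE
```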
The cache registry is a database table
(dag_cache_registry) that maps node hashes to materialized
table names:
| Column | Description |
|---|---|
| `node_hash` | 16-char hex content hash (primary key) |
| `node_type` | Node type (`primary_events`, `qualified_events`, etc.) |
| `table_name` | Fully qualified table name of the materialized result |
| `created_at` | When the node was first materialized |
| `last_used_at` | Last time the node was accessed (for GC) |
| `cohort_ids` | Comma-separated list of cohort IDs using this node |
The registry is created automatically the first time you use
cache = TRUE.
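For reference, the registry's shape could be created by hand with DDL along these lines (illustrative only — the package issues its own DDL on first use, and column types vary by backend; this sketch assumes an in-memory duckdb connection):

```r
library(DBI)

con <- dbConnect(duckdb::duckdb())  # in-memory example backend

# Illustrative schema; generateCohortSet2(cache = TRUE) creates the
# real table for you the first time caching is enabled.
dbExecute(con, "
  CREATE TABLE IF NOT EXISTS dag_cache_registry (
    node_hash    VARCHAR PRIMARY KEY,
    node_type    VARCHAR,
    table_name   VARCHAR,
    created_at   TIMESTAMP,
    last_used_at TIMESTAMP,
    cohort_ids   VARCHAR
  )
")

dbDisconnect(con, shutdown = TRUE)
```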
When caching is enabled, all node tables use a fixed prefix (`dagcache_`) instead of the random `atlas_<uuid>_` prefix used for ephemeral runs. This means the same computation always maps to the same table name:

- `dagcache_pe_a1b2c3d4` – primary events node with hash starting `a1b2c3d4`
- `dagcache_qe_e5f6a7b8` – qualified events node with hash starting `e5f6a7b8`

This determinism is what allows cache lookups to work across separate sessions.
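The mapping from node to table name is then a pure function of node type and hash. A sketch with a hypothetical helper (the two-letter abbreviations match the `dagcache_` patterns listed elsewhere in this vignette):

```r
# Hypothetical helper: derive a deterministic table name from
# node type + content hash.
node_table_name <- function(node_type, node_hash) {
  abbrev <- c(primary_events   = "pe",
              qualified_events = "qe",
              inclusion_rule   = "ir",
              included_events  = "ie",
              cohort_exit      = "ce")
  paste0("dagcache_", abbrev[[node_type]], "_", node_hash)
}

node_table_name("primary_events", "a1b2c3d4e5f60789")
# "dagcache_pe_a1b2c3d4e5f60789"
```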
When you run with cache = TRUE, the execution proceeds
as follows:
# First run: all nodes computed from scratch
cdm <- generateCohortSet2(cdm, cohortSet, name = "my_cohorts", cache = TRUE)
#> DAG cache: 0 hits, 12 misses (12 nodes to compute)
# Modify one cohort's inclusion rule and re-run:
cdm <- generateCohortSet2(cdm, cohortSet_v2, name = "my_cohorts", cache = TRUE)
#> DAG cache: 8 hits, 4 misses (4 nodes to compute)

On the second run, only the nodes affected by the change are recomputed. Shared upstream nodes (concept sets, primary events) that haven't changed are reused from the cache.
You can also use caching at the SQL generation level:
con <- DBI::dbConnect(...)
result <- atlas_json_to_sql_batch(
json_inputs = cohortSet,
cdm_schema = "cdm",
results_schema = "results",
target_dialect = "duckdb",
cache = TRUE,
con = con,
resolved_schema = "results"
)
# result$sql -- SQL string (only computes cache misses)
# result$cache_hits -- character vector of skipped node hashes
# result$cache_misses -- character vector of nodes to compute
# result$dag -- the full DAG object

The cached node types are the intermediate computation tables that are expensive to compute and whose results depend only on their content hash:
| Node Type | Table Pattern | Description |
|---|---|---|
| `primary_events` | `dagcache_pe_<hash>` | Events matching primary criteria |
| `qualified_events` | `dagcache_qe_<hash>` | Events after additional criteria + limit |
| `inclusion_rule` | `dagcache_ir_<hash>` | Per-rule matching person/event pairs |
| `included_events` | `dagcache_ie_<hash>` | Events surviving all inclusion rules |
| `cohort_exit` | `dagcache_ce_<hash>` | Cohort end dates |
These are either cheap to rebuild, specific to a single run, or inherently transient:
| Table | Why Not Cached |
|---|---|
| `dagcache_codesets` | Fast to rebuild; shared across all nodes |
| `dagcache_all_concepts` | Derived from codesets |
| Domain filtered tables | Depend on the full set of concepts in the current batch |
| Staging tables | Transient accumulation tables |
| `dagcache_fc_<hash>` | Final cohort inserts into staging; table itself is dropped |
| Auxiliary tables (`_ie`, `_se`, `_cr`) | Intermediate join tables within a node |
Concept sets are handled via a single global `dagcache_codesets` table that contains all unique concept set expressions, assigned global IDs. This table is rebuilt each run: it is fast to rebuild, and it is shared across all nodes in the batch rather than belonging to any single node.
However, concept set hashes are still used in the Merkle tree – they affect downstream node hashes, ensuring correctness.
Over time, as cohort definitions evolve, old cached tables become orphaned – they’re no longer referenced by any current cohort definition. The garbage collector removes these:
# Remove entries not used in the last 30 days
dag_cache_gc(con, schema = "results", max_age_days = 30)
# Preview what would be removed (dry run)
dag_cache_gc(con, schema = "results", max_age_days = 7, dry_run = TRUE)
# Remove everything
dag_cache_gc(con, schema = "results", max_age_days = 0)

The GC also detects orphaned entries – registry rows whose backing table has been dropped externally – and cleans those up regardless of age.
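The orphan check can be pictured as follows (a sketch with hypothetical names and a live `DBI` connection, not the package's internals):

```r
library(DBI)

# Sketch: registry rows whose backing table no longer exists are
# cleanup candidates regardless of age.
find_orphans <- function(con, schema) {
  reg <- dbGetQuery(con, sprintf(
    "SELECT node_hash, table_name FROM %s.dag_cache_registry", schema))
  backing_exists <- vapply(reg$table_name, function(qualified) {
    tbl <- sub(".*\\.", "", qualified)  # strip any schema qualifier
    dbExistsTable(con, Id(schema = schema, table = tbl))
  }, logical(1))
  reg$node_hash[!backing_exists]
}
```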
The caching system provides these guarantees:
Content-addressed: Two computations with identical inputs always produce the same hash. There are no false cache hits from stale data.
Merkle-tree propagation: Changing any upstream definition (concept set, criteria, observation window, etc.) produces a different hash for every downstream node. This behavior is exercised directly by the package's test suite.
Physical validation: Before declaring a cache hit, the system verifies that the backing table still exists in the database. If someone drops a cached table externally, it will be recomputed on the next run.
Immutability: Cached tables are never modified after creation. The hash is a promise that the table contents match the definition.
No cross-contamination: The
final_cohort node includes cohort_id in its
hash, so two cohorts with identical logic but different IDs produce
separate final cohort nodes (which are not cached anyway – they insert
into staging and are dropped).
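Putting the guarantees together, a cache lookup can be pictured like this (a hypothetical sketch over a live `DBI` connection; real code would use parameterized queries rather than `sprintf`):

```r
library(DBI)

# Sketch: a hit requires both a registry entry and a physically
# present backing table; anything else is treated as a miss.
cache_lookup <- function(con, schema, node_hash) {
  row <- dbGetQuery(con, sprintf(
    "SELECT table_name FROM %s.dag_cache_registry WHERE node_hash = '%s'",
    schema, node_hash))
  if (nrow(row) == 0) return(NULL)  # miss: never computed
  tbl <- sub(".*\\.", "", row$table_name[[1]])
  if (!dbExistsTable(con, Id(schema = schema, table = tbl))) {
    return(NULL)                    # miss: table dropped externally
  }
  row$table_name[[1]]               # hit: reuse the cached table
}
```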
Caching is most beneficial when you iterate repeatedly on cohort definitions, or when many cohorts share unchanged upstream nodes such as concept sets and primary events.
Caching adds minimal overhead (registry lookups are fast) and can be
disabled at any time by omitting cache = TRUE.
# Day 1: Generate 3 cohorts
cohortSet_v1 <- data.frame(
cohort_definition_id = c(1, 2, 3),
cohort = c(json_diabetes, json_hypertension, json_ckd)
)
cdm <- generateCohortSet2(cdm, cohortSet_v1, "cohorts", cache = TRUE)
#> DAG cache: 0 hits, 18 misses (18 nodes to compute)
# Day 2: Replace the CKD definition, keep diabetes and hypertension
cohortSet_v2 <- data.frame(
cohort_definition_id = c(1, 2, 3),
cohort = c(json_diabetes, json_hypertension, json_ckd_v2)
)
cdm <- generateCohortSet2(cdm, cohortSet_v2, "cohorts", cache = TRUE)
#> DAG cache: 12 hits, 6 misses (6 nodes to compute)
# Diabetes and hypertension nodes reused; only CKD nodes recomputed
# Day 3: Add a 4th cohort
cohortSet_v3 <- data.frame(
cohort_definition_id = c(1, 2, 3, 4),
cohort = c(json_diabetes, json_hypertension, json_ckd_v2, json_stroke)
)
cdm <- generateCohortSet2(cdm, cohortSet_v3, "cohorts", cache = TRUE)
#> DAG cache: 18 hits, 6 misses (6 nodes to compute)
# All 3 existing cohorts reused; only stroke computed
# Inspect what's cached
dag_cache_stats(con, "results")
#> $total_entries
#> [1] 24
# Clean up old entries
dag_cache_gc(con, "results", max_age_days = 90)