Incremental DAG Caching for Cohort Generation

Overview

When iterating on cohort definitions, you often change one part of a definition and re-run. Without caching, the entire pipeline re-executes from scratch: concept set expansion, primary events, qualified events, inclusion rules, and final cohort construction – even for the parts that haven’t changed.

atlasCohortGenerator implements incremental DAG caching: a system that persists intermediate computation tables in the database and skips recomputation of unchanged nodes. This is the same principle behind build systems like Make and Bazel – content-addressable caching with automatic invalidation via Merkle-tree hashing.

This vignette explains:

  1. How caching works (Merkle-tree hashing, the registry, and cache-aware execution)
  2. How to enable caching in your workflow
  3. What gets cached vs. what’s always recomputed
  4. Cache management (inspection, garbage collection, clearing)
  5. Correctness guarantees

How It Works

The Execution DAG

Each cohort definition is decomposed into a directed acyclic graph (DAG) of typed computation nodes:

concept_set (CS)
       |
primary_events (PE)
       |
qualified_events (QE)
       |
inclusion_rule (IR) [0..N]
       |
included_events (IE)
       |
cohort_exit (CE)
       |
final_cohort (FC)

Each node is identified by a content hash – a deterministic 16-character hex string derived from the node’s definition. Two nodes with the same definition produce the same hash, regardless of which cohort they came from.
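The vignette does not prescribe a specific hash function. As an illustrative sketch (using the digest package, which the actual implementation may or may not use), a deterministic 16-character content hash could look like this:

```r
# Sketch only: the package's real hashing scheme may differ.
library(digest)

node_content_hash <- function(definition) {
  # Serialize the definition deterministically, hash it, keep 16 hex chars
  substr(digest(definition, algo = "sha256"), 1, 16)
}

# Identical definitions hash identically, regardless of originating cohort:
h1 <- node_content_hash(list(type = "concept_set", concept_id = 123))
h2 <- node_content_hash(list(type = "concept_set", concept_id = 123))
identical(h1, h2)  # TRUE
```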

Merkle-Tree Hashing

The critical property that makes caching safe is Merkle-tree hashing: each node’s hash incorporates the hashes of its dependencies, not just its own definition.

For example, a qualified_events node’s hash is computed from the node’s own definition (its additional qualifying criteria and event limit) together with the hash of its primary_events dependency.

This means that if you change a concept set, the change propagates to every downstream node in the DAG:

CS (concept_id=123) --> hash: a1b2c3d4...
  PE (uses CS hash) --> hash: e5f6a7b8...
    QE (uses PE hash) --> hash: 1234abcd...
      ...

CS (concept_id=999) --> hash: ff00ee11...   <-- changed
  PE (uses CS hash) --> hash: 2233aabb...   <-- also changed (different dep hash)
    QE (uses PE hash) --> hash: ccdd4455... <-- also changed
      ...

No explicit invalidation logic is needed. A change anywhere automatically produces different hashes for all downstream nodes.
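The propagation above can be sketched in a few lines of R (a minimal illustration using the digest package, not the package's actual internals):

```r
# Sketch only: a node's hash covers its own definition AND its
# dependencies' hashes, so any upstream change changes this hash too.
library(digest)

merkle_hash <- function(definition, dep_hashes = character(0)) {
  substr(digest(list(definition, sort(dep_hashes)), algo = "sha256"), 1, 16)
}

cs_a <- merkle_hash(list(concept_id = 123))
cs_b <- merkle_hash(list(concept_id = 999))

# Same primary_events definition, different concept set dependency:
pe_def <- list(domain = "condition_occurrence")
merkle_hash(pe_def, cs_a) == merkle_hash(pe_def, cs_b)  # FALSE
```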

The Cache Registry

The cache registry is a database table (dag_cache_registry) that maps node hashes to materialized table names:

Column        Description
node_hash     16-char hex content hash (primary key)
node_type     Node type (primary_events, qualified_events, etc.)
table_name    Fully qualified table name of the materialized result
created_at    When the node was first materialized
last_used_at  Last time the node was accessed (for GC)
cohort_ids    Comma-separated list of cohort IDs using this node

The registry is created automatically the first time you use cache = TRUE.
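The exact DDL is not shown in this vignette; a plausible shape for the registry, matching the columns described above, might be (hypothetical column sizes, and the package creates this table for you):

```r
# Hypothetical DDL sketch; cache = TRUE creates the real table automatically.
DBI::dbExecute(con, "
  CREATE TABLE IF NOT EXISTS results.dag_cache_registry (
    node_hash    VARCHAR(16) PRIMARY KEY,
    node_type    VARCHAR(32),
    table_name   VARCHAR(255),
    created_at   TIMESTAMP,
    last_used_at TIMESTAMP,
    cohort_ids   VARCHAR(255)
  )
")
```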

Stable Table Naming

When caching is enabled, all node tables use a fixed prefix (dagcache_) instead of the random atlas_<uuid>_ prefix used for ephemeral runs. The same computation therefore always maps to the same table name – a primary_events node with hash a1b2c3d4..., for example, is always materialized as dagcache_pe_a1b2c3d4....

This determinism is what allows cache lookups to work across separate sessions.

Cache-Aware Execution

When you run with cache = TRUE, the execution proceeds as follows:

  1. Build the DAG from cohort definitions (same as non-cached mode)
  2. Compute content hashes for all nodes (Merkle-tree, bottom-up)
  3. Query the registry for each node hash
  4. Validate that cached tables still physically exist in the database
  5. Skip nodes that are valid cache hits
  6. Execute SQL for cache misses only
  7. Register newly computed nodes in the registry
  8. Clean up ephemeral tables (staging, domain-filtered), but preserve cached node tables
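The steps above can be sketched as a loop over the DAG in topological order (an illustrative sketch, not the package's internals; registry_lookup() and registry_insert() are hypothetical helpers):

```r
# Illustrative sketch of steps 3-7; registry_lookup()/registry_insert()
# are hypothetical stand-ins for the package's registry access.
for (node in dag_topological_order) {
  entry <- registry_lookup(con, node$hash)             # step 3: query registry
  if (!is.null(entry) &&
      DBI::dbExistsTable(con, entry$table_name)) {     # step 4: validate table
    next                                               # step 5: valid hit, skip
  }
  DBI::dbExecute(con, node$sql)                        # step 6: compute miss
  registry_insert(con, node$hash, node$table_name)     # step 7: register result
}
```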

Usage

Basic Usage

# First run: all nodes computed from scratch
cdm <- generateCohortSet2(cdm, cohortSet, name = "my_cohorts", cache = TRUE)
#> DAG cache: 0 hits, 12 misses (12 nodes to compute)

# Modify one cohort's inclusion rule and re-run:
cdm <- generateCohortSet2(cdm, cohortSet_v2, name = "my_cohorts", cache = TRUE)
#> DAG cache: 8 hits, 4 misses (4 nodes to compute)

On the second run, only the nodes affected by the change are recomputed. Shared upstream nodes (concept sets, primary events) that haven’t changed are reused from the cache.

SQL-Only Usage

You can also use caching at the SQL generation level:

con <- DBI::dbConnect(...)

result <- atlas_json_to_sql_batch(
  json_inputs = cohortSet,
  cdm_schema = "cdm",
  results_schema = "results",
  target_dialect = "duckdb",
  cache = TRUE,
  con = con,
  resolved_schema = "results"
)

# result$sql         -- SQL string (only computes cache misses)
# result$cache_hits  -- character vector of skipped node hashes
# result$cache_misses -- character vector of nodes to compute
# result$dag         -- the full DAG object
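The hit/miss vectors let you report or inspect the plan before running anything. For example (executing result$sql via DBI::dbExecute is a plausible approach; depending on the driver, multi-statement SQL may first need to be split):

```r
# Report the plan before executing
cat(length(result$cache_hits), "cached nodes reused;",
    length(result$cache_misses), "to compute\n")

# The SQL covers only the cache misses
DBI::dbExecute(con, result$sql)
```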

What Gets Cached vs. Not

Cached (persistent across runs)

These are the intermediate computation tables that are expensive to compute and whose results depend only on their content hash:

Node Type         Table Pattern        Description
primary_events    dagcache_pe_<hash>   Events matching primary criteria
qualified_events  dagcache_qe_<hash>   Events after additional criteria + limit
inclusion_rule    dagcache_ir_<hash>   Per-rule matching person/event pairs
included_events   dagcache_ie_<hash>   Events surviving all inclusion rules
cohort_exit       dagcache_ce_<hash>   Cohort end dates

Not Cached (rebuilt every run)

These are either cheap to rebuild, specific to a single run, or inherently transient:

Table                             Why Not Cached
dagcache_codesets                 Fast to rebuild; shared across all nodes
dagcache_all_concepts             Derived from codesets
Domain-filtered tables            Depend on the full set of concepts in the current batch
Staging tables                    Transient accumulation tables
dagcache_fc_<hash>                The final cohort is inserted into staging; the table itself is then dropped
Auxiliary tables (_ie, _se, _cr)  Intermediate join tables within a node

Why Concept Sets Aren’t Individually Cached

Concept sets are handled via a single global dagcache_codesets table that contains all unique concept set expressions assigned global IDs. This table is rebuilt each run because:

  1. It’s cheap (just vocabulary lookups)
  2. It must contain exactly the concept sets needed by the current batch
  3. Its structure (single table with all sets) doesn’t fit the one-table-per-node caching model

However, concept set hashes are still used in the Merkle tree – they affect downstream node hashes, ensuring correctness.

Cache Management

Inspecting the Cache

# List all cached entries
dag_cache_list(con, schema = "results")

# Get summary statistics
dag_cache_stats(con, schema = "results")
#> $total_entries
#> [1] 15
#>
#> $by_type
#> primary_events qualified_events  inclusion_rule  included_events
#>              4                4               3                2
#>    cohort_exit
#>              2

Garbage Collection

Over time, as cohort definitions evolve, old cached tables go stale – they’re no longer referenced by any current cohort definition. The garbage collector removes these:

# Remove entries not used in the last 30 days
dag_cache_gc(con, schema = "results", max_age_days = 30)

# Preview what would be removed (dry run)
dag_cache_gc(con, schema = "results", max_age_days = 7, dry_run = TRUE)

# Remove everything
dag_cache_gc(con, schema = "results", max_age_days = 0)

The GC also detects orphaned entries – registry rows whose backing table has been dropped externally – and cleans those up regardless of age.
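Orphan detection amounts to checking each registry row against the database catalog. A sketch (assuming dag_cache_list() returns a data frame with a table_name column; the GC does this for you):

```r
# Sketch of orphan detection: registry rows whose backing table is gone.
entries <- dag_cache_list(con, schema = "results")
orphaned <- entries[!vapply(entries$table_name,
                            function(t) DBI::dbExistsTable(con, t),
                            logical(1)), ]
# dag_cache_gc() removes such rows regardless of max_age_days
```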

Clearing the Cache

To remove all cached tables and start fresh:

dag_cache_clear(con, schema = "results")
#> Cleared 15 cache entries.

Correctness Guarantees

The caching system provides these guarantees:

  1. Content-addressed: Two computations with identical inputs always produce the same hash. There are no false cache hits from stale data.

  2. Merkle-tree propagation: Changing any upstream definition (concept set, criteria, observation window, etc.) produces a different hash for every downstream node. This is tested directly:

    # Changing concept_id from 123 to 999 changes ALL downstream hashes
    dag_a <- build_execution_dag(cohort_a, ...)
    dag_b <- build_execution_dag(cohort_b, ...)
    # PE, QE, IE, CE, FC hashes all differ between dag_a and dag_b
  3. Physical validation: Before declaring a cache hit, the system verifies that the backing table still exists in the database. If someone drops a cached table externally, it will be recomputed on the next run.

  4. Immutability: Cached tables are never modified after creation. The hash is a promise that the table contents match the definition.

  5. No cross-contamination: The final_cohort node includes cohort_id in its hash, so two cohorts with identical logic but different IDs produce separate final cohort nodes (which are not cached anyway – they insert into staging and are dropped).

When to Use Caching

Caching is most beneficial when:

  1. You iterate on cohort definitions and re-run the same set repeatedly
  2. Multiple cohorts share concept sets or upstream criteria
  3. Individual nodes are expensive to compute (for example, on large CDM databases)

Caching adds minimal overhead (registry lookups are fast) and can be disabled at any time by omitting cache = TRUE.

Example: Incremental Update

# Day 1: Generate 3 cohorts
cohortSet_v1 <- data.frame(
  cohort_definition_id = c(1, 2, 3),
  cohort = c(json_diabetes, json_hypertension, json_ckd)
)
cdm <- generateCohortSet2(cdm, cohortSet_v1, "cohorts", cache = TRUE)
#> DAG cache: 0 hits, 18 misses (18 nodes to compute)

# Day 2: Replace the CKD definition, keep diabetes and hypertension
cohortSet_v2 <- data.frame(
  cohort_definition_id = c(1, 2, 3),
  cohort = c(json_diabetes, json_hypertension, json_ckd_v2)
)
cdm <- generateCohortSet2(cdm, cohortSet_v2, "cohorts", cache = TRUE)
#> DAG cache: 12 hits, 6 misses (6 nodes to compute)
# Diabetes and hypertension nodes reused; only CKD nodes recomputed

# Day 3: Add a 4th cohort
cohortSet_v3 <- data.frame(
  cohort_definition_id = c(1, 2, 3, 4),
  cohort = c(json_diabetes, json_hypertension, json_ckd_v2, json_stroke)
)
cdm <- generateCohortSet2(cdm, cohortSet_v3, "cohorts", cache = TRUE)
#> DAG cache: 18 hits, 6 misses (6 nodes to compute)
# All 3 existing cohorts reused; only stroke computed

# Inspect what's cached
dag_cache_stats(con, "results")
#> $total_entries
#> [1] 24

# Clean up old entries
dag_cache_gc(con, "results", max_age_days = 90)