The previous vignettes introduced the two central ideas of
featdelta: defining features as ordinary R expressions with
fd_define() and computing them with fd_compute().
This vignette focuses on the database side of the package. It shows
how featdelta decides which rows need features, how it
writes computed features back to the database, and how the full pipeline
is orchestrated by fd_run().
The database pipeline is useful when raw observations arrive repeatedly and the feature table should be kept up to date without rebuilding everything by hand. The package is designed around three stages:
1. fd_fetch() finds raw rows that are missing from the feature table;
2. fd_compute() computes R feature definitions on those rows;
3. fd_upsert() inserts or updates the computed features in the database.

Most users will call fd_run(), which combines these stages. The
individual functions are still useful when you want to inspect, debug,
or test one stage at a time, as the sketch below shows.
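As a preview, here is a sketch that chains the three stages by hand; it
uses the connection, source query, feature definitions, and key that are
introduced later in this vignette.

# Stage 1: raw rows whose keys are not yet in the feature table
rows <- fd_fetch(
  con = con,
  sql = source_sql,
  key = key,
  feat_table_name = "order_features"
)
# Stage 2: apply the R feature definitions to those rows
feats <- fd_compute(data = rows, defs = defs, key = key)
# Stage 3: merge the computed features into the feature table
fd_upsert(
  con = con,
  features_df = feats,
  feat_table_name = "order_features",
  key = key
)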
featdelta uses DBI connections and selects SQL templates
for the database dialect. The supported dialects are SQLite,
PostgreSQL, and MySQL/MariaDB.
In this vignette, the runnable examples use an in-memory SQLite database because it does not require credentials or an external server. The same pipeline is intended to work with PostgreSQL and MySQL through DBI-compatible drivers.
# SQLite
con <- DBI::dbConnect(RSQLite::SQLite(), ":memory:")
# PostgreSQL
con <- DBI::dbConnect(
RPostgres::Postgres(),
host = "localhost",
dbname = "analytics",
user = "analyst",
password = "secret"
)
# MySQL or MariaDB
con <- DBI::dbConnect(
RMariaDB::MariaDB(),
host = "localhost",
dbname = "analytics",
user = "analyst",
password = "secret"
)

Usually the dialect is detected from the connection. If detection is
not possible, you can pass dialect = "sqlite",
dialect = "postgres", or dialect = "mysql" to
fd_run() or fd_upsert().
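For example, forcing the dialect for the SQLite pipeline used below (a
sketch; the other arguments are introduced in the following sections):

fd_run(
  con = con,
  sql = source_sql,
  defs = defs,
  key = key,
  feat_table_name = "order_features",
  dialect = "sqlite"  # explicit override; normally detected from con
)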
We will use a small e-commerce order dataset. The data is intentionally simple, but it gives us a realistic database workflow:
library(DBI)
library(RSQLite)
library(featdelta)
con <- dbConnect(SQLite(), ":memory:")
orders <- data.frame(
order_id = 1:7,
customer_id = c(101, 102, 103, 101, 104, 105, 102),
gross_amount = c(120, 250, 80, 310, 45, 520, 160),
discount_amount = c(0, 25, 5, 30, 0, 60, 10),
shipping_fee = c(8, 0, 6, 0, 5, 0, 7),
order_to_ship_days = c(1, 3, 2, 5, 1, 4, 2),
stringsAsFactors = FALSE
)
day_one <- 1:4
day_two <- 5:7
dbWriteTable(
con,
"raw_orders",
orders[day_one, ],
overwrite = TRUE
)
dbGetQuery(con, "SELECT * FROM raw_orders ORDER BY order_id")
#> order_id customer_id gross_amount discount_amount shipping_fee
#> 1 1 101 120 0 8
#> 2 2 102 250 25 0
#> 3 3 103 80 5 6
#> 4 4 101 310 30 0
#> order_to_ship_days
#> 1 1
#> 2 3
#> 3 2
#> 4 5

The raw table contains day-one observations only. The feature table does not exist yet.
featdelta does not require the source data to come from
one table. The source dataset is defined by a SQL query. In real
projects, this query may join several tables, filter to a modelling
population, or select only specific records.
source_sql <- "
SELECT
order_id,
customer_id,
gross_amount,
discount_amount,
shipping_fee,
order_to_ship_days
FROM raw_orders
ORDER BY order_id
"
key <- "order_id"
dbGetQuery(con, source_sql)
#> order_id customer_id gross_amount discount_amount shipping_fee
#> 1 1 101 120 0 8
#> 2 2 102 250 25 0
#> 3 3 103 80 5 6
#> 4 4 101 310 30 0
#> order_to_ship_days
#> 1 1
#> 2 3
#> 3 2
#> 4 5

There is one important requirement: the query must return the key
column. The key connects the raw dataset to the feature table. It is how
featdelta knows whether a raw row has already been
processed.
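For illustration, a join-based source query could look like the sketch
below. The customers table and its segment column are hypothetical, but
the key column order_id is still selected, so the requirement holds.

# Hypothetical join; customers does not exist in this vignette's database
source_sql_join <- "
SELECT
  o.order_id,
  o.gross_amount,
  c.segment
FROM raw_orders AS o
JOIN customers AS c
  ON o.customer_id = c.customer_id
"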
The feature logic is still ordinary R. Here we define a few transformed variables that are more useful for analytics or reporting than the raw columns.
defs <- fd_define(
net_revenue = gross_amount - discount_amount + shipping_fee,
discount_rate = discount_amount / gross_amount,
free_shipping = shipping_fee == 0,
slow_fulfillment = order_to_ship_days > 3
)
defs
#> <featdelta_defs>
#> Definition steps (4):
#> - [column] net_revenue -> gross_amount - discount_amount + shipping_fee
#> - [column] discount_rate -> discount_amount/gross_amount
#> - [column] free_shipping -> shipping_fee == 0
#> - [column] slow_fulfillment -> order_to_ship_days > 3

The database pipeline does not require you to write these
transformations in SQL. You can keep them in R, test them with
fd_compute(), and then use the same definitions in the
database pipeline.
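For example, the definitions can be checked against the in-memory
day-one rows before any database work:

# Compute features locally; no connection involved
local_features <- fd_compute(
  data = orders[day_one, ],
  defs = defs,
  key = key
)
local_features$net_revenue
# Should be 128, 225, 81, 280 -- the same values written to the
# feature table in the next section.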
On the first run, the feature table is missing. fd_run()
executes the source query, computes the features, creates the feature
table, and inserts the result.
run_day_one <- fd_run(
con = con,
sql = source_sql,
defs = defs,
key = key,
feat_table_name = "order_features",
verbose = FALSE
)
dbGetQuery(con, "SELECT * FROM order_features ORDER BY order_id")
#> order_id net_revenue discount_rate free_shipping slow_fulfillment
#> 1 1 128 0.00000000 0 0
#> 2 2 225 0.10000000 1 0
#> 3 3 81 0.06250000 0 0
#> 4 4 280 0.09677419 1 1

The feature table is now a database table. Other R scripts, dashboards, model training jobs, or monitoring jobs can read it without rerunning the feature engineering code.
Now add more raw orders. This simulates new observations arriving after the first feature run.
dbAppendTable(con, "raw_orders", orders[day_two, ])
#> [1] 3
dbGetQuery(con, "SELECT * FROM raw_orders ORDER BY order_id")
#> order_id customer_id gross_amount discount_amount shipping_fee
#> 1 1 101 120 0 8
#> 2 2 102 250 25 0
#> 3 3 103 80 5 6
#> 4 4 101 310 30 0
#> 5 5 104 45 0 5
#> 6 6 105 520 60 0
#> 7 7 102 160 10 7
#> order_to_ship_days
#> 1 1
#> 2 3
#> 3 2
#> 4 5
#> 5 1
#> 6 4
#> 7 2

The raw table contains seven orders, but the feature table still contains features for only the first four.
dbGetQuery(con, "SELECT * FROM order_features ORDER BY order_id")
#> order_id net_revenue discount_rate free_shipping slow_fulfillment
#> 1 1 128 0.00000000 0 0
#> 2 2 225 0.10000000 1 0
#> 3 3 81 0.06250000 0 0
#> 4 4 280 0.09677419 1 1

When we run the pipeline again with the default
fetch_mode = "new_only", featdelta finds the
raw keys missing from the feature table and computes only those
rows.
run_day_two <- fd_run(
con = con,
sql = source_sql,
defs = defs,
key = key,
feat_table_name = "order_features",
verbose = FALSE
)
dbGetQuery(con, "SELECT * FROM order_features ORDER BY order_id")
#> order_id net_revenue discount_rate free_shipping slow_fulfillment
#> 1 1 128 0.00000000 0 0
#> 2 2 225 0.10000000 1 0
#> 3 3 81 0.06250000 0 0
#> 4 4 280 0.09677419 1 1
#> 5 5 50 0.00000000 0 0
#> 6 6 460 0.11538462 1 1
#> 7 7 157 0.06250000 0 0

The existing feature rows were not duplicated. Only the missing keys were inserted.
Most of the time, fd_run() is enough. During
development, it can be useful to look at the fetch stage directly.
fd_fetch() returns raw rows whose key is not present in the
feature table.
Let’s demonstrate that with an example: first, add one more raw order.
new_order <- data.frame(
order_id = 8,
customer_id = 106,
gross_amount = 275,
discount_amount = 20,
shipping_fee = 0,
order_to_ship_days = 6
)
dbAppendTable(con, "raw_orders", new_order)
#> [1] 1
new_rows <- fd_fetch(
con = con,
sql = source_sql,
key = key,
feat_table_name = "order_features"
)
new_rows
#> order_id customer_id gross_amount discount_amount shipping_fee
#> 1 8 106 275 20 0
#> order_to_ship_days
#> 1 6

This is the row that would be computed on the next incremental run. The fetch result also carries metadata, including the SQL that was executed.
attr(new_rows, "fd_fetch")
#> $key
#> [1] "order_id"
#>
#> $feat_table_name
#> [1] "order_features"
#>
#> $use_max_key
#> [1] FALSE
#>
#> $max_key
#> [1] NA
#>
#> $sql
#> [1] "SELECT\n order_id,\n customer_id,\n gross_amount,\n discount_amount,\n shipping_fee,\n order_to_ship_days\n FROM raw_orders\n ORDER BY order_id"
#>
#> $executed_sql
#> [1] "SELECT r.* FROM (SELECT\n order_id,\n customer_id,\n gross_amount,\n discount_amount,\n shipping_fee,\n order_to_ship_days\n FROM raw_orders\n ORDER BY order_id) AS r LEFT JOIN `order_features` AS f ON r.`order_id` = f.`order_id` WHERE f.`order_id` IS NULL"
#>
#> $n_rows
#> [1] 1

This can help when you want to verify that the package is selecting the rows you expect.
fd_upsert() writes a feature data frame into the
database. It is useful when you already have computed features and want
to use only the persistence part of the package.
new_features <- fd_compute(
data = new_rows,
defs = defs,
key = key
)
upsert_report <- fd_upsert(
con = con,
features_df = new_features,
feat_table_name = "order_features",
key = key,
verbose = FALSE
)
upsert_report
#> $feat_table_name
#> [1] "order_features"
#>
#> $key
#> [1] "order_id"
#>
#> $dialect
#> [1] "sqlite"
#>
#> $n_rows
#> [1] 1
#>
#> $n_chunks
#> [1] 1
#>
#> $table_created
#> [1] FALSE
#>
#> $columns_added
#> character(0)
#>
#> $extra_columns
#> character(0)
#>
#> $counts
#> $counts$would_insert
#> [1] 1
#>
#> $counts$would_update
#> [1] 0
#>
#>
#> $chunk_details
#> chunk n would_insert would_update
#> 1 1 1 1 0
#>
#> attr(,"class")
#> [1] "fd_upsert_report"
dbGetQuery(con, "SELECT * FROM order_features ORDER BY order_id")
#> order_id net_revenue discount_rate free_shipping slow_fulfillment
#> 1 1 128 0.00000000 0 0
#> 2 2 225 0.10000000 1 0
#> 3 3 81 0.06250000 0 0
#> 4 4 280 0.09677419 1 1
#> 5 5 50 0.00000000 0 0
#> 6 6 460 0.11538462 1 1
#> 7 7 157 0.06250000 0 0
#> 8 8 255 0.07272727 1 1

The upsert report summarizes what the write attempted to do. The
would_insert and would_update counts are
existence-based counts computed before the merge. They tell you how many
incoming keys were new and how many already existed.
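A minimal sketch of using those counts as a guard in an append-only job
(the field names match the report printed above):

# Warn if any incoming keys already existed before the merge
if (upsert_report$counts$would_update > 0) {
  warning("Some incoming keys already existed in the feature table.")
}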
Feature tables often evolve. Suppose we add a new feature,
high_value_order. If we run the pipeline with
fetch_mode = "new_only", only new rows would be processed.
Existing rows would not receive the new column values.
When you want to backfill or refresh existing rows, use
fetch_mode = "all".
defs_v2 <- fd_define(
net_revenue = gross_amount - discount_amount + shipping_fee,
discount_rate = discount_amount / gross_amount,
free_shipping = shipping_fee == 0,
slow_fulfillment = order_to_ship_days > 3,
high_value_order = gross_amount >= 250
)
refresh_report <- fd_run(
con = con,
sql = source_sql,
defs = defs_v2,
key = key,
feat_table_name = "order_features",
fetch_mode = "all",
verbose = FALSE
)
dbListFields(con, "order_features")
#> [1] "order_id" "net_revenue" "discount_rate" "free_shipping"
#> [5] "slow_fulfillment" "high_value_order"
dbGetQuery(con, "SELECT * FROM order_features ORDER BY order_id")
#> order_id net_revenue discount_rate free_shipping slow_fulfillment
#> 1 1 128 0.00000000 0 0
#> 2 2 225 0.10000000 1 0
#> 3 3 81 0.06250000 0 0
#> 4 4 280 0.09677419 1 1
#> 5 5 50 0.00000000 0 0
#> 6 6 460 0.11538462 1 1
#> 7 7 157 0.06250000 0 0
#> 8 8 255 0.07272727 1 1
#> high_value_order
#> 1 0
#> 2 1
#> 3 0
#> 4 1
#> 5 0
#> 6 1
#> 7 0
#> 8 1

With the default alter_table = TRUE,
fd_upsert() adds missing feature columns to the target
table. This schema evolution is intentionally conservative: columns can
be added, but columns are not dropped, renamed, or type-changed
automatically.
The default create_table = "auto" creates the feature
table when it does not exist and uses the existing table when it does.
This is convenient for ordinary incremental pipelines.
If you want to be stricter, use:
# Error if the table does not already exist
fd_run(
con = con,
sql = source_sql,
defs = defs,
key = key,
feat_table_name = "order_features",
create_table = FALSE
)
# Error if new feature columns are missing from the existing table
fd_run(
con = con,
sql = source_sql,
defs = defs_v2,
key = key,
feat_table_name = "order_features",
alter_table = FALSE
)

This is useful in production environments where table creation or schema changes must be handled through a separate approval process.
The most important database-pipeline decision is the fetch mode.
Use fetch_mode = "new_only" for ordinary incremental
processing. This mode uses the feature table to identify keys that have
not yet been processed. It is the default.
Use fetch_mode = "all" when existing feature rows should
be recomputed. Common reasons include changed feature definitions and
new feature columns that must be backfilled for existing rows.
fetch_mode = "new_only" is key-based. It does not know
whether your feature logic changed. If the definitions changed and
existing rows should be updated, use
fetch_mode = "all".
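One way to encode that decision in a pipeline script (a sketch;
defs_changed is a hypothetical flag you maintain yourself):

# Set to TRUE after editing the fd_define() definitions
defs_changed <- FALSE

fd_run(
  con = con,
  sql = source_sql,
  defs = defs,
  key = key,
  feat_table_name = "order_features",
  fetch_mode = if (defs_changed) "all" else "new_only"
)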
By default, fd_upsert() updates existing keys and
inserts new keys. This is usually what you want for refreshes and
backfills.
If you want a stricter insert-only workflow, set
update_table = FALSE. In that mode, any incoming key that
already exists in the feature table is treated as a conflict.
fd_run(
con = con,
sql = source_sql,
defs = defs,
key = key,
feat_table_name = "order_features",
fetch_mode = "all",
update_table = FALSE
)

This can be useful when your pipeline is intended only to append new feature rows and you want accidental refreshes to fail loudly.
For larger feature frames, use chunk_size to write the
data in batches. This can reduce memory and packet-size pressure on the
database connection.
fd_run(
con = con,
sql = source_sql,
defs = defs,
key = key,
feat_table_name = "order_features",
chunk_size = 5000
)

Chunking does not change the feature logic. It only changes how computed rows are staged and merged into the target table.
SQLite is convenient for local examples. In production, many users will connect to PostgreSQL, MySQL, or MariaDB.
For PostgreSQL and MySQL, you can use schema-qualified feature table names when the driver supports them through DBI identifiers:
fd_run(
con = con,
sql = "SELECT * FROM raw_schema.orders",
defs = defs,
key = "order_id",
feat_table_name = "feature_schema.order_features",
dialect = "postgres"
)
fd_run(
con = con,
sql = "SELECT * FROM raw_schema.orders",
defs = defs,
key = "order_id",
feat_table_name = "feature_schema.order_features",
dialect = "mysql"
)

Existing target tables used with update_table = TRUE
must have a primary key or unique constraint on the key column. Tables
created by fd_upsert() include that primary key
automatically.
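If you manage the schema yourself, for example together with
create_table = FALSE, that constraint is your responsibility. A sketch
for SQLite; the table name and column types here are illustrative:

# Hand-managed target table: the PRIMARY KEY on order_id is what lets
# update_table = TRUE merge on the key.
DBI::dbExecute(con, "
  CREATE TABLE IF NOT EXISTS order_features_manual (
    order_id         INTEGER PRIMARY KEY,
    net_revenue      REAL,
    discount_rate    REAL,
    free_shipping    INTEGER,
    slow_fulfillment INTEGER
  )
")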
If the dialect of your DBI connection is detected correctly, you
usually do not need the dialect argument. It is available
as an explicit override when automatic detection is not sufficient.
The database table is the main result of the pipeline, but the run report is useful for checking what happened.
names(refresh_report)
#> [1] "success" "stage" "started_at" "finished_at"
#> [5] "time_sec" "key" "feat_table_name" "dialect"
#> [9] "sql" "fetch" "compute" "upsert"
#> [13] "preview" "data" "error"
refresh_report$fetch
#> $mode
#> [1] "all"
#>
#> $source
#> [1] "all"
#>
#> $table_exists
#> [1] TRUE
#>
#> $use_max_key
#> [1] FALSE
#>
#> $limit
#> NULL
#>
#> $limit_applied
#> [1] FALSE
#>
#> $n_rows_before_limit
#> [1] 8
#>
#> $n_rows
#> [1] 8
#>
#> $fd_fetch
#> NULL
refresh_report$compute$feature_names
#> [1] "net_revenue" "discount_rate" "free_shipping" "slow_fulfillment"
#> [5] "high_value_order"
refresh_report$upsert$counts
#> $would_insert
#> [1] 0
#>
#> $would_update
#> [1] 8
refresh_report$upsert$columns_added
#> [1] "high_value_order"The report is especially useful in development and monitoring. It shows which stage ran, how many rows were fetched and computed, and what the upsert stage planned to insert or update.
The database pipeline exists to keep a feature table current while letting the feature logic stay in R.
Use fd_run() for the regular end-to-end workflow. Use
fd_fetch() when you want to inspect which raw rows still
need features. Use fd_upsert() when you already have a
feature data frame and want to persist it safely.
For daily or repeated processing, start with the default
fetch_mode = "new_only". When feature definitions change
and old rows need to be refreshed, switch to
fetch_mode = "all". For production databases, make sure the
feature table has a stable key and choose the dialect that matches the
DBI connection: SQLite, PostgreSQL, or MySQL/MariaDB.