---
title: "Database pipeline workshop"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{Database pipeline workshop}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---

```{r setup, include = FALSE}
knitr::opts_chunk$set(
  collapse = TRUE,
  comment = "#>"
)
```

## Purpose of this workshop

The previous vignettes introduced the two central ideas of `featdelta`:

1. feature logic can be written and organized in R;
2. the computed features can be stored in a database table and refreshed over
   time.

This workshop focuses on the database side of the package. It shows how
`featdelta` decides which rows need features, how it writes computed features
back to the database, and how the full pipeline is orchestrated by `fd_run()`.

The database pipeline is useful when raw observations arrive repeatedly and the
feature table should be kept up to date without rebuilding everything by hand.
The package is designed around three stages:

1. `fd_fetch()` finds raw rows that are missing from the feature table;
2. `fd_compute()` evaluates the R feature definitions on those rows;
3. `fd_upsert()` inserts or updates the computed features in the database.

Most users will call `fd_run()`, which combines these stages. The individual
functions are still useful when you want to inspect, debug, or test one stage at
a time.

## Supported database dialects

`featdelta` uses DBI connections and selects SQL templates for the database
dialect. The supported dialects are:

* SQLite;
* PostgreSQL;
* MySQL or MariaDB.

In this vignette, the runnable examples use an in-memory SQLite database because
it does not require credentials or an external server. The same pipeline is
intended to work with PostgreSQL and MySQL through DBI-compatible drivers.

```{r, eval = FALSE}
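# Pick the block that matches your database; each one assigns `con`.
# Host, database name, and credentials below are placeholders.

# SQLite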
con <- DBI::dbConnect(RSQLite::SQLite(), ":memory:")

# PostgreSQL
con <- DBI::dbConnect(
  RPostgres::Postgres(),
  host = "localhost",
  dbname = "analytics",
  user = "analyst",
  password = "secret"
)

# MySQL or MariaDB
con <- DBI::dbConnect(
  RMariaDB::MariaDB(),
  host = "localhost",
  dbname = "analytics",
  user = "analyst",
  password = "secret"
)
```

Usually the dialect is detected from the connection. If detection is not
possible, you can pass `dialect = "sqlite"`, `dialect = "postgres"`, or
`dialect = "mysql"` to `fd_run()` or `fd_upsert()`.
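
As a rough illustration of what detection from the connection can look like, the
sketch below maps the class of a DBI connection to a dialect name.
`guess_dialect()` is a hypothetical helper written for this vignette. It is not
part of `featdelta`, and the package's own detection may work differently.

```{r}
library(DBI)

# Hypothetical helper (not a featdelta function): map the class of a DBI
# connection to one of the dialect names used in this vignette.
guess_dialect <- function(conn) {
  if (inherits(conn, "SQLiteConnection")) {
    "sqlite"
  } else if (inherits(conn, "PqConnection")) {
    "postgres"
  } else if (inherits(conn, c("MariaDBConnection", "MySQLConnection"))) {
    "mysql"
  } else {
    NA_character_ # unknown driver: pass `dialect` explicitly
  }
}

demo_con <- dbConnect(RSQLite::SQLite(), ":memory:")
demo_dialect <- guess_dialect(demo_con)
dbDisconnect(demo_con)

demo_dialect
```

When a helper like this returns `NA`, that is the situation in which the
explicit `dialect` argument is useful.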

## Workshop database

We will use a small e-commerce order dataset. The data is intentionally simple,
but it gives us a realistic database workflow:

1. raw orders arrive in a table;
2. R definitions calculate transformed features;
3. features are written to a separate feature table;
4. new raw orders are processed incrementally.

```{r}
library(DBI)
library(RSQLite)
library(featdelta)

con <- dbConnect(SQLite(), ":memory:")

orders <- data.frame(
  order_id = 1:7,
  customer_id = c(101, 102, 103, 101, 104, 105, 102),
  gross_amount = c(120, 250, 80, 310, 45, 520, 160),
  discount_amount = c(0, 25, 5, 30, 0, 60, 10),
  shipping_fee = c(8, 0, 6, 0, 5, 0, 7),
  order_to_ship_days = c(1, 3, 2, 5, 1, 4, 2),
  stringsAsFactors = FALSE
)

day_one <- 1:4
day_two <- 5:7

dbWriteTable(
  con,
  "raw_orders",
  orders[day_one, ],
  overwrite = TRUE
)

dbGetQuery(con, "SELECT * FROM raw_orders ORDER BY order_id")
```

The raw table contains day-one observations only. The feature table does not
exist yet.

## Define the source query

`featdelta` does not require the source data to come from one table. The source
dataset is defined by a SQL query. In real projects, this query may join several
tables, filter to a modelling population, or select only specific records.

```{r}
source_sql <- "
  SELECT
    order_id,
    customer_id,
    gross_amount,
    discount_amount,
    shipping_fee,
    order_to_ship_days
  FROM raw_orders
  ORDER BY order_id
"

key <- "order_id"

dbGetQuery(con, source_sql)
```

There is one important requirement: the query must return the key column. The
key connects the raw dataset to the feature table. It is how `featdelta` knows
whether a raw row has already been processed.
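
A quick pre-flight check of this requirement might look like the sketch below.
`check_key()` is a hypothetical helper, not a `featdelta` function. The check
that keys are unique and non-missing is an assumption on our part, motivated by
the fact that the key identifies one feature row per raw row.

```{r}
library(DBI)

demo_con <- dbConnect(RSQLite::SQLite(), ":memory:")
dbWriteTable(
  demo_con,
  "raw_orders",
  data.frame(order_id = 1:3, gross_amount = c(120, 250, 80))
)

# Hypothetical pre-flight check (not a featdelta function): run the source
# query and confirm that the key column is present, non-missing, and unique.
check_key <- function(conn, sql, key) {
  rows <- dbGetQuery(conn, sql)
  stopifnot(
    key %in% names(rows),
    !anyNA(rows[[key]]),
    anyDuplicated(rows[[key]]) == 0
  )
  invisible(TRUE)
}

demo_key_ok <- check_key(
  demo_con,
  "SELECT order_id, gross_amount FROM raw_orders",
  "order_id"
)

# A query that drops the key column fails the check.
demo_key_missing <- tryCatch(
  check_key(demo_con, "SELECT gross_amount FROM raw_orders", "order_id"),
  error = function(e) FALSE
)
dbDisconnect(demo_con)

c(with_key = demo_key_ok, without_key = demo_key_missing)
```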

## Define features in R

The feature logic is still ordinary R. Here we define a few transformed
variables that are more useful for analytics or reporting than the raw columns.

```{r}
defs <- fd_define(
  net_revenue = gross_amount - discount_amount + shipping_fee,
  discount_rate = discount_amount / gross_amount,
  free_shipping = shipping_fee == 0,
  slow_fulfillment = order_to_ship_days > 3
)

defs
```

The database pipeline does not require you to write these transformations in
SQL. You can keep them in R, test them with `fd_compute()`, and then use the
same definitions in the database pipeline.
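
When testing definitions, it can help to have a hand-computed reference to
compare against. The sketch below evaluates the same four expressions with base
R `transform()` on the day-one orders. It uses no `featdelta` functions, and the
`demo_` names are introduced here only for illustration.

```{r}
# Day-one values copied from the `orders` data frame defined earlier.
demo_orders <- data.frame(
  order_id = 1:4,
  gross_amount = c(120, 250, 80, 310),
  discount_amount = c(0, 25, 5, 30),
  shipping_fee = c(8, 0, 6, 0),
  order_to_ship_days = c(1, 3, 2, 5)
)

# Same expressions as in `defs`, evaluated with base R only.
demo_expected <- transform(
  demo_orders,
  net_revenue = gross_amount - discount_amount + shipping_fee,
  discount_rate = discount_amount / gross_amount,
  free_shipping = shipping_fee == 0,
  slow_fulfillment = order_to_ship_days > 3
)

demo_expected[, c("order_id", "net_revenue", "discount_rate")]
```

For example, order 1 has a net revenue of 120 - 0 + 8 = 128, and order 2 has a
discount rate of 25 / 250 = 0.1.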

## First run: create and populate the feature table

On the first run, the feature table is missing. `fd_run()` executes the source
query, computes the features, creates the feature table, and inserts the result.

```{r}
run_day_one <- fd_run(
  con = con,
  sql = source_sql,
  defs = defs,
  key = key,
  feat_table_name = "order_features",
  verbose = FALSE
)

dbGetQuery(con, "SELECT * FROM order_features ORDER BY order_id")
```

The feature table is now a database table. Other R scripts, dashboards, model
training jobs, or monitoring jobs can read it without rerunning the feature
engineering code.

## Second run: fetch only rows that are missing features

Now add more raw orders. This simulates new observations arriving after
the first feature run.

```{r}
dbAppendTable(con, "raw_orders", orders[day_two, ])

dbGetQuery(con, "SELECT * FROM raw_orders ORDER BY order_id")
```

The raw table contains seven orders, but the feature table still contains
features for only the first four.

```{r}
dbGetQuery(con, "SELECT * FROM order_features ORDER BY order_id")
```

When we run the pipeline again with the default `fetch_mode = "new_only"`,
`featdelta` finds the raw keys missing from the feature table and computes only
those rows.

```{r}
run_day_two <- fd_run(
  con = con,
  sql = source_sql,
  defs = defs,
  key = key,
  feat_table_name = "order_features",
  verbose = FALSE
)

dbGetQuery(con, "SELECT * FROM order_features ORDER BY order_id")
```

The existing feature rows were not duplicated. Only the missing keys were
inserted.
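
Conceptually, finding keys that are in the raw data but not in the feature table
is an anti-join. The sketch below shows one way to express that in SQL on a
separate demo database. It is illustrative only: the query that `featdelta`
generates may be written differently.

```{r}
library(DBI)

demo_con <- dbConnect(RSQLite::SQLite(), ":memory:")
dbWriteTable(demo_con, "raw_orders", data.frame(order_id = 1:7))
dbWriteTable(demo_con, "order_features", data.frame(order_id = 1:4))

# Anti-join: keep raw rows whose key has no match in the feature table.
# Illustrative SQL only; featdelta's generated query may look different.
demo_missing <- dbGetQuery(demo_con, "
  SELECT r.order_id
  FROM raw_orders AS r
  LEFT JOIN order_features AS f
    ON r.order_id = f.order_id
  WHERE f.order_id IS NULL
  ORDER BY r.order_id
")
dbDisconnect(demo_con)

demo_missing$order_id
```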

## Inspect fd_fetch() directly

Most of the time, `fd_run()` is enough. During development, it can be useful to
look at the fetch stage directly. `fd_fetch()` returns raw rows whose key is not
present in the feature table.

To see this in action, first add one more raw order.

```{r}
new_order <- data.frame(
  order_id = 8,
  customer_id = 106,
  gross_amount = 275,
  discount_amount = 20,
  shipping_fee = 0,
  order_to_ship_days = 6
)

dbAppendTable(con, "raw_orders", new_order)

new_rows <- fd_fetch(
  con = con,
  sql = source_sql,
  key = key,
  feat_table_name = "order_features"
)

new_rows
```

This is the row that would be computed on the next incremental run. The fetch
result also carries metadata, including the SQL that was executed.

```{r}
attr(new_rows, "fd_fetch")
```

This can help when you want to verify that the package is selecting the rows you
expect.

## Inspect fd_upsert() directly

`fd_upsert()` writes a feature data frame into the database. It is useful when
you already have computed features and want to use only the persistence part of
the package.

```{r}
new_features <- fd_compute(
  data = new_rows,
  defs = defs,
  key = key
)

upsert_report <- fd_upsert(
  con = con,
  features_df = new_features,
  feat_table_name = "order_features",
  key = key,
  verbose = FALSE
)

upsert_report

dbGetQuery(con, "SELECT * FROM order_features ORDER BY order_id")
```

The upsert report summarizes what the write attempted to do. The `would_insert`
and `would_update` counts are existence-based counts computed before the merge.
They tell you how many incoming keys were new and how many already existed.
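
The idea behind existence-based counts can be sketched in a few lines of base R.
The key values below are made up for illustration, and the code is not how
`fd_upsert()` is implemented; it only shows what the two numbers mean.

```{r}
# Keys already in the feature table and keys in the incoming feature frame
# (made-up values for illustration).
demo_existing_keys <- c(1, 2, 3, 4)
demo_incoming_keys <- c(3, 4, 5)

demo_already_present <- demo_incoming_keys %in% demo_existing_keys

demo_counts <- c(
  would_insert = sum(!demo_already_present), # key 5 is new
  would_update = sum(demo_already_present)   # keys 3 and 4 already exist
)

demo_counts
```

Because the counts depend only on whether a key exists, a `would_update` count
does not by itself say whether any stored value actually changed.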

## Add a new feature column

Feature tables often evolve. Suppose we add a new feature, `high_value_order`.
If we ran the pipeline with `fetch_mode = "new_only"`, only new rows would be
processed. Existing rows would not receive values for the new column.

When you want to backfill or refresh existing rows, use `fetch_mode = "all"`.

```{r}
defs_v2 <- fd_define(
  net_revenue = gross_amount - discount_amount + shipping_fee,
  discount_rate = discount_amount / gross_amount,
  free_shipping = shipping_fee == 0,
  slow_fulfillment = order_to_ship_days > 3,
  high_value_order = gross_amount >= 250
)

refresh_report <- fd_run(
  con = con,
  sql = source_sql,
  defs = defs_v2,
  key = key,
  feat_table_name = "order_features",
  fetch_mode = "all",
  verbose = FALSE
)

dbListFields(con, "order_features")

dbGetQuery(con, "SELECT * FROM order_features ORDER BY order_id")
```

With the default `alter_table = TRUE`, `fd_upsert()` adds missing feature
columns to the target table. This schema evolution is intentionally conservative:
columns can be added, but columns are not dropped, renamed, or type-changed
automatically.
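
An add-only schema change of this kind can be sketched with plain DBI calls on a
separate demo database. This is an assumption about the general technique, not
a description of the statements `fd_upsert()` issues.

```{r}
library(DBI)

demo_con <- dbConnect(RSQLite::SQLite(), ":memory:")
dbWriteTable(
  demo_con,
  "order_features",
  data.frame(order_id = 1:2, net_revenue = c(128, 225))
)

demo_features <- data.frame(
  order_id = 1:2,
  net_revenue = c(128, 225),
  high_value_order = c(FALSE, TRUE)
)

# Columns in the incoming frame that the table does not have yet.
demo_new_cols <- setdiff(
  names(demo_features),
  dbListFields(demo_con, "order_features")
)

# Add each missing column; nothing is dropped, renamed, or retyped.
for (demo_col in demo_new_cols) {
  demo_col_type <- dbDataType(demo_con, demo_features[[demo_col]])
  dbExecute(demo_con, paste(
    "ALTER TABLE", dbQuoteIdentifier(demo_con, "order_features"),
    "ADD COLUMN", dbQuoteIdentifier(demo_con, demo_col), demo_col_type
  ))
}

demo_fields <- dbListFields(demo_con, "order_features")
dbDisconnect(demo_con)

demo_fields
```

Rows that were already in the table hold `NULL` in the added column until they
are recomputed.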

## Understand create_table and alter_table

The default `create_table = "auto"` creates the feature table when it does not
exist and uses the existing table when it does. This is convenient for ordinary
incremental pipelines.

If you want to be stricter, use:

```{r, eval = FALSE}
# Error if the table does not already exist
fd_run(
  con = con,
  sql = source_sql,
  defs = defs,
  key = key,
  feat_table_name = "order_features",
  create_table = FALSE
)

# Error if new feature columns are missing from the existing table
fd_run(
  con = con,
  sql = source_sql,
  defs = defs_v2,
  key = key,
  feat_table_name = "order_features",
  alter_table = FALSE
)
```

This is useful in production environments where table creation or schema changes
must be handled through a separate approval process.

## Choose between new_only and all

The most important database-pipeline decision is the fetch mode.

Use `fetch_mode = "new_only"` for ordinary incremental processing. This mode
uses the feature table to identify keys that have not yet been processed. It is
the default.

Use `fetch_mode = "all"` when existing feature rows should be recomputed. Common
reasons include:

1. you changed a feature definition;
2. you added a new feature and want old rows to receive values;
3. you are performing an explicit backfill;
4. you want to rebuild a feature table from the current source query.

`fetch_mode = "new_only"` is key-based. It does not know whether your feature
logic changed. If the definitions changed and existing rows should be updated,
use `fetch_mode = "all"`.

## Insert-only mode

By default, `fd_upsert()` updates existing keys and inserts new keys. This is
usually what you want for refreshes and backfills.

If you want a stricter insert-only workflow, set `update_table = FALSE`. In that
mode, any incoming key that already exists in the feature table is treated as a
conflict.

```{r, eval = FALSE}
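# fetch_mode = "all" fetches every key again, so keys that already exist in
# order_features are treated as conflicts instead of being updated.
fd_run(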
  con = con,
  sql = source_sql,
  defs = defs,
  key = key,
  feat_table_name = "order_features",
  fetch_mode = "all",
  update_table = FALSE
)
```

This can be useful when your pipeline is intended only to append new feature
rows and you want accidental refreshes to fail loudly.
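
At the database level, the same kind of loud failure is what a plain `INSERT`
gives you when the key column is a primary key. The sketch below demonstrates
that on a separate demo database. How `fd_upsert()` detects and reports
conflicts is not shown here and may differ.

```{r}
library(DBI)

demo_con <- dbConnect(RSQLite::SQLite(), ":memory:")
invisible(dbExecute(demo_con, "
  CREATE TABLE order_features (
    order_id INTEGER PRIMARY KEY,
    net_revenue REAL
  )
"))
invisible(dbExecute(demo_con, "INSERT INTO order_features VALUES (1, 128)"))

# A plain INSERT of an existing key violates the primary key and errors,
# so the stored row is left untouched.
demo_conflict <- tryCatch(
  {
    dbExecute(demo_con, "INSERT INTO order_features VALUES (1, 999)")
    "inserted"
  },
  error = function(e) "conflict"
)

demo_rows <- dbGetQuery(demo_con, "SELECT * FROM order_features")
dbDisconnect(demo_con)

demo_conflict
demo_rows
```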

## Process large writes in chunks

For larger feature frames, use `chunk_size` to write the data in batches. This
can reduce memory and packet-size pressure on the database connection.

```{r, eval = FALSE}
fd_run(
  con = con,
  sql = source_sql,
  defs = defs,
  key = key,
  feat_table_name = "order_features",
  chunk_size = 5000
)
```

Chunking does not change the feature logic. It only changes how computed rows
are staged and merged into the target table.
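
The batching idea itself is simple. The sketch below splits a small data frame
into batches of at most five rows and appends them one at a time with DBI. It
illustrates chunked writing in general and is not the staging and merge logic
that `featdelta` uses.

```{r}
library(DBI)

demo_con <- dbConnect(RSQLite::SQLite(), ":memory:")
demo_features <- data.frame(
  order_id = 1:12,
  net_revenue = seq(10, 120, by = 10)
)
demo_chunk_size <- 5

# Assign each row to a batch: rows 1-5, 6-10, and 11-12.
demo_batch_id <- ceiling(seq_len(nrow(demo_features)) / demo_chunk_size)
demo_batches <- split(demo_features, demo_batch_id)

# Create an empty table with matching columns, then append batch by batch.
dbCreateTable(demo_con, "order_features", demo_features)
for (demo_batch in demo_batches) {
  dbAppendTable(demo_con, "order_features", demo_batch)
}

demo_batch_sizes <- vapply(demo_batches, nrow, integer(1))
demo_total <- dbGetQuery(
  demo_con,
  "SELECT COUNT(*) AS n FROM order_features"
)$n
dbDisconnect(demo_con)

demo_batch_sizes
demo_total
```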

## Dialect notes for PostgreSQL and MySQL

SQLite is convenient for local examples. In production, many users will connect
to PostgreSQL, MySQL, or MariaDB.

For PostgreSQL and MySQL, you can use schema-qualified feature table names when
the driver supports them through DBI identifiers:

```{r, eval = FALSE}
# Here `con` is assumed to be a PostgreSQL connection.
fd_run(
  con = con,
  sql = "SELECT * FROM raw_schema.orders",
  defs = defs,
  key = "order_id",
  feat_table_name = "feature_schema.order_features",
  dialect = "postgres"
)

# Here `con` is assumed to be a MySQL or MariaDB connection.
fd_run(
  con = con,
  sql = "SELECT * FROM raw_schema.orders",
  defs = defs,
  key = "order_id",
  feat_table_name = "feature_schema.order_features",
  dialect = "mysql"
)
```
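
For readers unfamiliar with DBI identifiers, the sketch below shows what a
structured, schema-qualified identifier looks like in DBI itself. How
`featdelta` converts a string such as `"feature_schema.order_features"` into an
identifier is not shown here, so treat this purely as background on the DBI
side.

```{r}
library(DBI)

demo_con <- dbConnect(RSQLite::SQLite(), ":memory:")

# A structured identifier keeps schema and table as separate components,
# so the driver quotes each part on its own.
demo_id <- Id(schema = "feature_schema", table = "order_features")
demo_quoted <- dbQuoteIdentifier(demo_con, demo_id)
dbDisconnect(demo_con)

demo_quoted
```

The quoting characters in the result depend on the driver, which is one reason
to let DBI build the quoted name instead of pasting strings together.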

Existing target tables used with `update_table = TRUE` must have a primary key
or unique constraint on the key column. Tables created by `fd_upsert()` include
that primary key automatically.
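
The reason for this requirement is that database upsert statements need a
uniqueness guarantee to decide whether an incoming row is new. The sketch below
shows SQLite's `ON CONFLICT` form on a separate demo database; PostgreSQL's
`ON CONFLICT` and MySQL's `ON DUPLICATE KEY UPDATE` rely on a primary key or
unique index in the same way. The exact statements `featdelta` issues are not
shown here and may differ.

```{r}
library(DBI)

demo_con <- dbConnect(RSQLite::SQLite(), ":memory:")
invisible(dbExecute(demo_con, "
  CREATE TABLE order_features (
    order_id INTEGER PRIMARY KEY,
    net_revenue REAL
  )
"))
invisible(dbExecute(
  demo_con,
  "INSERT INTO order_features VALUES (1, 128), (2, 225)"
))

# SQLite upsert: key 2 exists and is updated, key 3 is new and is inserted.
# The conflict target must be backed by a primary key or unique index.
invisible(dbExecute(demo_con, "
  INSERT INTO order_features (order_id, net_revenue)
  VALUES (2, 230), (3, 81)
  ON CONFLICT (order_id) DO UPDATE SET net_revenue = excluded.net_revenue
"))

demo_after <- dbGetQuery(
  demo_con,
  "SELECT * FROM order_features ORDER BY order_id"
)
dbDisconnect(demo_con)

demo_after
```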

If your DBI connection is correctly identified, you usually do not need the
`dialect` argument. It is available as an explicit override when automatic
detection is not sufficient.

## Read the run report

The database table is the main result of the pipeline, but the run report is
useful for checking what happened.

```{r}
names(refresh_report)

refresh_report$fetch

refresh_report$compute$feature_names

refresh_report$upsert$counts

refresh_report$upsert$columns_added
```

The report is especially useful in development and monitoring. It shows which
stages ran, how many rows were fetched and computed, and what the upsert stage
planned to insert or update.

## What to remember

The database pipeline exists to keep a feature table current while letting the
feature logic stay in R.

Use `fd_run()` for the regular end-to-end workflow. Use `fd_fetch()` when you
want to inspect which raw rows still need features. Use `fd_upsert()` when you
already have a feature data frame and want to persist it safely.

For daily or repeated processing, start with the default
`fetch_mode = "new_only"`. When feature definitions change and old rows need to
be refreshed, switch to `fetch_mode = "all"`. For production databases, make
sure the feature table has a stable key and choose the dialect that matches the
DBI connection: SQLite, PostgreSQL, or MySQL/MariaDB.

```{r cleanup, include = FALSE}
dbDisconnect(con)
```
