Data processing via parquet and duckdb

Einar Hjörleifsson

… this is work in progress

… and I am not a software engineer

I feel like I have been given the keys to a Ferrari without knowing anything about what is under the hood.

Our real world is a mess

What are parquet files

  • Platform and program agnostic (R, Python, duckdb, …) data files.
  • Binary, fast to read, and they preserve column types.
  • Can be partitioned (e.g. by type, year, fleet, …)
  • Column-oriented rather than row-based (on disk, and also in memory when read via Arrow)

Think of them as CSV on steroids: a bit like .RData, but accessible from multiple programs.
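
A minimal sketch (not from the talk itself): write a data frame out as a year-partitioned parquet dataset with {arrow} and read it back. The dataset name and path are made up; the point is that column types survive the round trip.

library(dplyr)
mtcars |>
  mutate(year = rep(2023:2024, length.out = n())) |>            # fake a partitioning column
  arrow::write_dataset("demo_parquet", partitioning = "year")   # one sub-folder per year
arrow::open_dataset("demo_parquet") |> collect() |> glimpse()   # types preserved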

What is duckdb

  • Fast, reliable, portable, and easy to use in-process SQL database
  • Can be stuffed with tables or linked to files on disk/cloud on-the-fly

Think of it as your own personal database: like SQLite, but on steroids. There is no cumbersome database setup or management, beyond looking after your own files.

Has (some) spatial capabilities.
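
A minimal sketch of the "no setup" claim: DuckDB runs in-process, so a connection is just a function call, and parquet files can be queried in place with read_parquet() (the path below matches the trail dataset shown later).

library(DBI)
con <- dbConnect(duckdb::duckdb())                    # in-memory, in-process database
dbGetQuery(con, "
  SELECT COUNT(*) AS n
  FROM read_parquet('data/ais/trail/*/*.parquet')     -- query the files directly
")
dbDisconnect(con, shutdown = TRUE)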

Where does R come into it?

library(dplyr)                      # your code
library(duckdb)                     # the database
library(duckdbfs)                   # wrapper for connecting to files, loading spatial libraries
library(ggplot2)                    # plotting
trail <- open_dataset("data/ais/trail")
trail |> select(.rid:speed) |> glimpse()
Rows: ??
Columns: 7
Database: DuckDB 1.4.0 [einarhj@Linux 6.16.9-200.fc42.x86_64:R 4.5.1/:memory:]
$ .rid  <int> 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, …
$ vid   <int> 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, …
$ mmsi  <chr> NA, NA, NA, NA, NA, NA, NA, NA, NA, NA, NA, NA, NA, NA, NA, NA, …
$ time  <dttm> 2007-06-04 14:13:39, 2007-06-04 14:24:10, 2007-06-04 14:35:08, …
$ lon   <dbl> -21.33652, -21.32895, -21.35017, -21.40607, -21.46295, -21.51467…
$ lat   <dbl> 63.85507, 63.83912, 63.81995, 63.81123, 63.80352, 63.79633, 63.7…
$ speed <dbl> 6.6, 5.2, 8.8, 11.8, 9.3, 8.9, 9.8, 11.0, 10.6, 8.6, 7.2, 10.3, …
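
Note that trail is lazy: the dplyr verbs only build a SQL query (hence Rows: ?? above), and nothing lands in R memory until collect() is called. A small sketch:

trail |>
  filter(year == 2024) |>   # year is the partition column, see the file tree below
  count(vid) |>             # still just building SQL at this point
  collect()                 # only now does DuckDB run the query and return a tibble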

What does the file system look like?

fs::dir_tree("data/ais/trail")
data/ais/trail
├── year=2007
│   └── part-0.parquet
├── year=2008
│   └── part-0.parquet
├── year=2009
│   └── part-0.parquet
├── year=2010
│   └── part-0.parquet
├── year=2011
│   └── part-0.parquet
├── year=2012
│   └── part-0.parquet
├── year=2013
│   └── part-0.parquet
├── year=2014
│   └── part-0.parquet
├── year=2015
│   └── part-0.parquet
├── year=2016
│   └── part-0.parquet
├── year=2017
│   └── part-0.parquet
├── year=2018
│   └── part-0.parquet
├── year=2019
│   └── part-0.parquet
├── year=2020
│   └── part-0.parquet
├── year=2021
│   └── part-0.parquet
├── year=2022
│   └── part-0.parquet
├── year=2023
│   └── part-0.parquet
├── year=2024
│   └── part-0.parquet
└── year=2025
    └── part-0.parquet
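
The year=YYYY folders are hive-style partitions: DuckDB exposes year as an ordinary column and, when a query filters on it, only scans the matching folders (the spatial example further down touches 1 of 19 files). A sketch of how such a layout might be written, assuming the raw pings sit in a data frame trail_df (not the original export code):

trail_df |>
  arrow::write_dataset("data/ais/trail",    # hive-style: year=2007/, year=2008/, ...
                       partitioning = "year")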

What can one do?

Example: Spatial grid of plaice catch at 0.01 x 0.005 degree resolution

# Use (almost) any {dplyr}-verbs, automatically translated to SQL
dx <- 0.01; dy <- dx / 2   # my grid resolution, this is where we are going
catch <- open_dataset("data/logbooks/catch-for-ais.parquet")
q <- 
  trail |> 
  filter(.cid > 0,                       # points out of harbour (trips)
         whack == FALSE,                 # no whackies
         between(lon, -28, -20),         # zoom into an area of interest
         between(lat, 65, 67.5),
         between(speed, s1, s2),         # metier 4 speed filter
         between(time,  t1, t2)) |>      # event filter
  mutate(x = lon %/% dx * dx + dx/2,     # My poor man's gridding machine
         y = lat %/% dy * dy + dy/2) |>
  select(x, y, .sid, lb_base) |> 
  inner_join(catch |> filter(sid == 23),      # only plaice
             by = join_by(.sid, lb_base)) |>  # think of this as joining by event
  group_by(.sid, lb_base) |> 
  mutate(catch = catch / n()) |>              # split among pings
  ungroup() |> 
  group_by(x, y) |>                           # summarise by 0.01 x 0.005, all years
  summarise(catch = sum(catch, na.rm = TRUE),   # remember - only plaice catch
            .groups = "drop")
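
The "poor man's gridding machine" just snaps each coordinate to the centre of its grid cell using integer division - the same arithmetic that FDIV performs in the translated SQL below. A worked example in plain R:

dx <- 0.01
lon <- -21.33652
lon %/% dx                # -2134    which cell (floor division)
lon %/% dx * dx           # -21.34   western edge of that cell
lon %/% dx * dx + dx / 2  # -21.335  cell centre, used as x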

Explain

q |> explain()
<SQL>
SELECT x, y, SUM(catch) AS catch
FROM (
  SELECT
    x,
    y,
    ".sid",
    lb_base,
    sid,
    catch / COUNT(*) OVER (PARTITION BY ".sid", lb_base) AS catch
  FROM (
    SELECT LHS.*, sid, catch
    FROM (
      SELECT
        (FDIV(lon, 0.01) * 0.01) + 0.01 / 2.0 AS x,
        (FDIV(lat, 0.005) * 0.005) + 0.005 / 2.0 AS y,
        ".sid",
        lb_base
      FROM rrdnbirrxhacidr
      WHERE
        (".cid" > 0.0) AND
        (whack = FALSE) AND
        (lon BETWEEN -28.0 AND -20.0) AND
        (lat BETWEEN 65.0 AND 67.5) AND
        (speed BETWEEN s1 AND s2) AND
        ("time" BETWEEN t1 AND t2)
    ) LHS
    INNER JOIN (
      SELECT irpvmlvuqgycyru.*
      FROM irpvmlvuqgycyru
      WHERE (sid = 23.0)
    ) RHS
      ON (LHS.".sid" = RHS.".sid" AND LHS.lb_base = RHS.lb_base)
  ) q01
) q01
GROUP BY x, y

Results - as plot
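
A sketch of how the gridded catch could be drawn once pulled into R (assuming the q query above; this is not the original plotting code):

q |>
  collect() |>                      # execute in DuckDB, return a tibble
  ggplot(aes(x, y, fill = catch)) +
  geom_tile() +                     # one tile per 0.01 x 0.005 cell
  coord_quickmap() +
  scale_fill_viridis_c()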

Spatial acrobatics??

Points-in-polygon (here tried with FAO areas)

fao <- open_dataset("data/auxillary/fao.gpkg")  # connect to a gpkg-file
                                                # only some 87k datapoints
q <- 
  trail |>     # ~ 500 million rows
  filter(year == 2024) |>  # here just a test of 32 million records
  mutate(geom = ST_POINT(lon, lat)) |>  # a bit of SQL lingo here
  duckdbfs::spatial_join(fao, by = "st_intersects", join = "left") |> 
  count(fao)
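
ST_POINT is not a dplyr function: dbplyr passes verbs it does not recognise straight through to SQL, so DuckDB's spatial extension (loaded via {duckdbfs}) evaluates it. The generated SQL can be inspected before anything runs:

trail |>
  filter(year == 2024) |>
  mutate(geom = ST_POINT(lon, lat)) |>   # unknown to dplyr, passed through as-is
  show_query()                           # print the SQL without executing it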

Explain

q |> explain()
<SQL>
SELECT fao, COUNT(*) AS n
FROM plxhnfoswotidfc
GROUP BY fao

<PLAN>
physical_plan        
┌───────────────────────────┐
│       HASH_GROUP_BY       │
│    ────────────────────   │
│         Groups: #0        │
│                           │
│        Aggregates:        │
│        count_star()       │
│                           │
│      ~1,818,642 rows      │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│         PROJECTION        │
│    ────────────────────   │
│            fao            │
│                           │
│      ~2,877,050 rows      │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│         PROJECTION        │
│    ────────────────────   │
│            fao            │
│                           │
│      ~2,877,050 rows      │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│        SPATIAL_JOIN       │
│    ────────────────────   │
│      Join Type: LEFT      │
│                           │
│        Conditions:        ├──────────────┐
│ ST_Intersects(geom, geom) │              │
│                           │              │
│      ~2,877,050 rows      │              │
└─────────────┬─────────────┘              │
┌─────────────┴─────────────┐┌─────────────┴─────────────┐
│         PROJECTION        ││          ST_READ          │
│    ────────────────────   ││    ────────────────────   │
│            geom           ││     Function: ST_READ     │
│                           ││                           │
│                           ││        Projections:       │
│                           ││            fao            │
│                           ││            geom           │
│                           ││                           │
│      ~2,877,050 rows      ││         ~235 rows         │
└─────────────┬─────────────┘└───────────────────────────┘
┌─────────────┴─────────────┐
│       PARQUET_SCAN        │
│    ────────────────────   │
│         Function:         │
│        PARQUET_SCAN       │
│                           │
│        Projections:       │
│            lon            │
│            lat            │
│                           │
│       File Filters:       │
│       (year = 2024)       │
│                           │
│    Scanning Files: 1/19   │
│                           │
│      ~2,877,050 rows      │
└───────────────────────────┘

Get data into R - timing

tictoc::tic()
points_in_polygon <-
  q |> 
  collect()
points_in_polygon |> arrange(-n)
# A tibble: 38 × 2
   fao               n
   <chr>         <dbl>
 1 27.5.a.2   31325890
 2 27.5.b.1.b   143221
 3 27.2.a.2     136157
 4 27.14.b.2     76705
 5 27.4.a        48632
 6 27.2.a.1      30232
 7 27.3.c.22     28919
 8 27.3.a.21     27102
 9 27.5.b.2      25450
10 27.14.a       10585
# ℹ 28 more rows
tictoc::toc()
217.11 sec elapsed

At a minimum, I avoided getting fried within R: the heavy lifting stayed in DuckDB.

Bottom line

  • I can start off by connecting to all the AIS year tables

  • I can pass all the heavy workload to duckdb, via R

  • I use my existing {dplyr}-coding, no need to learn SQL

  • I do not get fried within R

  • Mostly tested on a compute server, but (as done here) it also seems to work on a personal computer

  • Things just seem to work

Issues

  • Novelty means risk
    • Stability of {duckdb} and {duckdbfs}
    • People may still be interested in the above approach prior to feeding things into the datacall workflow.
  • Change in official workflow has huge risks
    • The current workflow is a mish-mash of base R, data.table and dplyr
    • Going duckdb/SQL means changing the c-square stuff, allocate … and the split-among-pings approach
  • {duckdb} thinking requires sticking to dplyr/SQL lingo
    • Going fully {dplyr/duckdb/SQL} may alienate important contributors (i.e. data.table users)

Questions