Ropendal: Abstract Filesystem Access Interface for R via the Rust
crate opendal, using
savvy for R/Rust FFI. We
keep the bottom layer byte-first: filesystem operations move raw bytes,
and serializers or codecs explicitly materialize those bytes into R
objects.
Async work uses Aio handles inspired by
nanonext: we issue work with
fs_*_aio(), wait with call_aio(), and retrieve the payload (or an
opendalErrorValue) with collect_aio().
We expose the same principles to native packages through a pure C API in
inst/include/ropendal.h: submit async work, wait on an Aio, then read
borrowed results or fill caller-owned buffers without routing data
through R raw vectors.
Serializers and codecs are first-class. Serializer/deserializer pairs
control R-object materialization; native codecs such as gzip and
zlib transform raw bytes. Both layers stay explicit through the mode
argument, serial_config(), codec_config(), serialize_raw(), and
deserialize_raw().
Backends are configured explicitly. We pass HTTP headers to
opendal("http") through its headers argument, and we pass credential
provider objects such as credentials_s3(), credentials_gcs(),
credentials_azblob(), and credentials_gdrive() through auth.
Ropendal is available from R-universe:
# Install from R-universe; the CRAN mirror is listed so dependencies resolve.
install.packages(
  "Ropendal",
  repos = c("https://sounkou-bioinfo.r-universe.dev", "https://cloud.r-project.org")
)

R-universe also publishes Linux binaries for Ubuntu noble on x86_64
and arm64 for R-release and R-devel. To prefer those binaries on
Linux, point repos at this universe's binary repository and at the
R-universe CRAN mirror for dependencies:
# Binary repositories are keyed by Ubuntu release, CPU architecture, and the
# R "major.minor" version. Extract major.minor with a regex rather than
# substr(, 1, 3), which would truncate a two-digit minor such as "4.10".
r_minor <- sub("^([0-9]+\\.[0-9]+).*$", "\\1", getRversion())
options(repos = c(
  Ropendal = sprintf(
    "https://sounkou-bioinfo.r-universe.dev/bin/linux/noble-%s/%s/",
    R.version$arch,
    r_minor
  ),
  CRAN = sprintf(
    "https://cran.r-universe.dev/bin/linux/noble-%s/%s/",
    R.version$arch,
    r_minor
  )
))
install.packages("Ropendal")

Source builds can still use explicit OpenDAL feature flags when you need custom provider wiring.
# Keep only local filesystem, HTTP, S3-compatible, and Google Drive support.
install.packages(
  "Ropendal",
  repos = c("https://sounkou-bioinfo.r-universe.dev", "https://cloud.r-project.org"),
  type = "source",
  configure.args = "--without-default-rust-features --with-rust-features=fs,http,s3,gdrive"
)
# Add the current cloud-service feature group explicitly.
install.packages(
  "Ropendal",
  repos = c("https://sounkou-bioinfo.r-universe.dev", "https://cloud.r-project.org"),
  type = "source",
  configure.args = "--enable-cloud"
)

A single filesystem handle gives us byte primitives, vectorized batches, Aio handles, explicit serializers/codecs, and lower-level iterators.
library(Ropendal)
# A local-filesystem handle rooted at a fresh temporary directory.
root <- tempfile("ropendal-readme-")
dir.create(root, recursive = TRUE)
fs <- opendal("fs", root = root)
# Single-object byte round trip.
fs_write(fs, "note.txt", charToRaw("hello ropendal\n"))
#> [1] TRUE
rawToChar(fs_read(fs, "note.txt"))
#> [1] "hello ropendal\n"
fs_stat(fs, "note.txt")[c("path", "type", "size")]
#> $path
#> [1] "note.txt"
#>
#> $type
#> [1] "file"
#>
#> $size
#> [1] 15
vapply(fs_ls(fs), `[[`, character(1), "path")
#> [1] "note.txt"

# Vectorized batch: one payload per path, up to two operations in flight.
paths <- c("batch/one.txt", "batch/two.txt")
fs_write(
  fs,
  paths,
  list(charToRaw("one\n"), charToRaw("two\n")),
  batch_concurrency = 2
)
#> [[1]]
#> [1] TRUE
#>
#> [[2]]
#> [1] TRUE
many <- fs_read(fs, paths, offset = c(0, 0), batch_concurrency = 2)
vapply(many, rawToChar, character(1))
#> [1] "one\n" "two\n"

# Async read: submit, wait, then collect the payload.
aio <- fs_read_aio(fs, "note.txt")
call_aio(aio)
rawToChar(collect_aio(aio))
#> [1] "hello ropendal\n"

A serializer/deserializer can use
nanoarrow to turn a
data frame into Arrow IPC bytes and back. Native codecs such as gzip
remain explicit byte transforms.
# Serializer pair: sfunc turns a data frame into Arrow IPC bytes through an
# in-memory connection; ufunc turns those bytes back into a data frame.
arrow_config <- serial_config(
  "data.frame",
  sfunc = function(x) {
    con <- rawConnection(raw(), "wb")
    on.exit(close(con), add = TRUE)
    nanoarrow::write_nanoarrow(x, con)
    rawConnectionValue(con)
  },
  ufunc = function(raw) as.data.frame(nanoarrow::read_nanoarrow(raw))
)
arrow_tbl <- data.frame(
  id = 1:3,
  sample = c("HG001", "HG002", "HG003"),
  depth = c(32.5, 28.0, 41.25)
)
fs_replace(fs, "tables/depth.arrows", arrow_tbl, mode = "serial", serial_config = arrow_config)
#> [1] TRUE
fs_read(fs, "tables/depth.arrows", mode = "serial", serial_config = arrow_config)
#> id sample depth
#> 1 1 HG001 32.50
#> 2 2 HG002 28.00
#> 3 3 HG003 41.25
# Codecs stay byte-to-byte: gzip on write, gunzip on read.
fs_replace(fs, "objects/message.gz", charToRaw("compressed bytes\n"), mode = "codec", codec = "gzip")
#> [1] TRUE
rawToChar(fs_read(fs, "objects/message.gz", mode = "codec", codec = "gzip"))
#> [1] "compressed bytes\n"

# Streaming writer: append chunks, then close to finish the object.
writer <- fs_write_iter(fs, "stream.txt")
write_iter_write(writer, charToRaw("hello "))
#> [1] TRUE
write_iter_write(writer, charToRaw("iterator\n"))
#> [1] TRUE
write_iter_close(writer)
#> [1] TRUE
# Streaming reader: 5-byte chunks, with tell/seek to skip the space.
reader <- fs_read_iter(fs, "stream.txt", chunk_size = 5)
first <- read_iter_next(reader)
rawToChar(first$data)
#> [1] "hello"
fs_tell(reader)
#> [1] 5
fs_seek(reader, 6)
#> [1] 6
rawToChar(read_iter_collect(reader))
#> [1] "iterator\n"

The same fs_* calls apply to HTTP(S), S3, and Google Drive handles.
We define the headers once, require them on the local fixture, and pass the same headers to the HTTP filesystem handle.
# Serve a temporary directory through the local HTTP fixture.
root <- tempfile("ropendal-http-readme-")
dir.create(root, recursive = TRUE)
writeLines("hello http example", file.path(root, "hello.txt"))
headers <- list(
  Authorization = "Bearer ropendal-readme",
  `X-Ropendal-Example` = "headers"
)
# The fixture requires these headers; the HTTP handle sends the same list.
fixture <- OpendalHttpFixture$start(root, required_headers = headers)
http_fs <- opendal("http", endpoint = fixture$endpoint(), root = "/", headers = headers)
rawToChar(fs_read(http_fs, "hello.txt"))
#> [1] "hello http example\n"
fs_stat(http_fs, "hello.txt")[c("path", "type", "size")]
#> $path
#> [1] "hello.txt"
#>
#> $type
#> [1] "file"
#>
#> $size
#> [1] 19
fixture$stop()
#> [1] TRUE

We set up a temporary MinIO instance behind the scenes, then check that the S3-compatible store supports the same byte API.
We write, read, stat, and list objects with the same fs_* functions.
# s3_fs points at the hidden MinIO setup; the byte API is unchanged.
fs_write(s3_fs, "notes/a.txt", charToRaw("hello s3-compatible store\n"))
#> [1] TRUE
fs_write(s3_fs, "notes/b.txt", charToRaw("another object\n"))
#> [1] TRUE
rawToChar(fs_read(s3_fs, "notes/a.txt"))
#> [1] "hello s3-compatible store\n"
fs_stat(s3_fs, "notes/a.txt")[c("path", "type", "size")]
#> $path
#> [1] "notes/a.txt"
#>
#> $type
#> [1] "file"
#>
#> $size
#> [1] 26
vapply(fs_ls(s3_fs, "notes/"), `[[`, character(1), "path")
#> [1] "notes/a.txt" "notes/b.txt"

The async path returns an Aio handle; we wait and collect when we need the payload.
# Submit, wait, then collect the payload.
aio <- fs_read_aio(s3_fs, "notes/b.txt")
call_aio(aio)
rawToChar(collect_aio(aio))
#> [1] "another object\n"

For a small local comparison, we use a larger object to compare
Ropendal’s default path with its chunked/concurrent path. Local MinIO
removes most network latency, so concurrency helps most when the object
is large or the store is remote; for many independent objects, use
batch_concurrency instead. paws.storage is included as a single-GET
baseline.
# Point the AWS environment and a paws S3 client at the local MinIO.
restore_aws_env <- readme_set_aws_env(minio)
paws_s3 <- paws.storage::s3(
  endpoint = minio$endpoint,
  region = minio$region,
  config = list(s3_force_path_style = TRUE)
)
# 64 MiB payload of repeating 0..255 bytes, moved in 8 MiB chunks.
payload_size <- 64 * 1024 * 1024
payload <- rep(as.raw(0:255), length.out = payload_size)
bench_key <- "bench/payload.bin"
read_chunk <- 8 * 1024 * 1024
write_chunk <- 8 * 1024 * 1024

We first compare upload/replace paths: the default Ropendal call,
Ropendal with explicit write chunking/concurrency, and
paws.storage::put_object().
# Upload the same payload three ways; keep only the summary columns.
bench::mark(
  ropendal_replace = fs_replace(s3_fs, bench_key, payload),
  ropendal_replace_concurrent = fs_replace(
    s3_fs,
    bench_key,
    payload,
    write_concurrency = 4,
    chunk_size = write_chunk
  ),
  paws_put = {
    paws_s3$put_object(Bucket = minio$bucket, Key = bench_key, Body = payload)
    # Return TRUE, like fs_replace(), instead of the paws response.
    TRUE
  },
  iterations = 3,
  check = FALSE
)[, c("expression", "min", "median", "itr/sec", "mem_alloc", "n_gc")]
#> # A tibble: 3 × 5
#> expression min median `itr/sec` mem_alloc
#> <bch:expr> <bch:tm> <bch:tm> <dbl> <bch:byt>
#> 1 ropendal_replace 126.7ms 130.4ms 7.61 0B
#> 2 ropendal_replace_concurrent 81.8ms 83.5ms 12.0 0B
#> 3 paws_put 544.7ms 544.7ms 1.84 67.7MB

Then we compare download paths. The Ropendal rows separate default
reads, chunked/concurrent reads, Aio reads, and Aio plus
chunked/concurrent reads; paws_get remains the single-GET baseline.
# Download the same object via sync/async and default/chunked paths.
bench::mark(
  ropendal_read = fs_read(s3_fs, bench_key),
  ropendal_read_concurrent = fs_read(
    s3_fs,
    bench_key,
    read_concurrency = 4,
    chunk_size = read_chunk
  ),
  ropendal_read_aio = collect_aio(fs_read_aio(s3_fs, bench_key)),
  ropendal_read_aio_concurrent = collect_aio(fs_read_aio(
    s3_fs,
    bench_key,
    read_concurrency = 4,
    chunk_size = read_chunk
  )),
  paws_get = paws_s3$get_object(Bucket = minio$bucket, Key = bench_key)$Body,
  iterations = 3,
  check = FALSE
)[, c("expression", "min", "median", "itr/sec", "mem_alloc", "n_gc")]
#> # A tibble: 5 × 5
#> expression min median `itr/sec` mem_alloc
#> <bch:expr> <bch:tm> <bch:tm> <dbl> <bch:byt>
#> 1 ropendal_read 70.6ms 70.6ms 14.2 64MB
#> 2 ropendal_read_concurrent 34.8ms 35.4ms 28.3 64MB
#> 3 ropendal_read_aio 39.6ms 47.1ms 21.2 64MB
#> 4 ropendal_read_aio_concurrent 31.6ms 31.6ms 31.7 64MB
#> 5 paws_get 55.4ms 56.7ms 17.1 64.3MB

For Google Drive, we pass a credential provider object through auth
and keep secret material out of the filesystem handle's printed output.
The chunk is evaluated only when local credential files are available.
# Credential locations come from environment variables; the token file
# defaults to tokens.json next to the secret JSON file.
secret_json <- Sys.getenv("ROPENDAL_GDRIVE_SECRET_JSON")
tokens_json <- Sys.getenv(
  "ROPENDAL_GDRIVE_TOKENS_JSON",
  unset = file.path(dirname(secret_json), "tokens.json")
)
gdrive_root <- Sys.getenv("ROPENDAL_GDRIVE_ROOT", unset = "Ropendal")
gdrive_file <- Sys.getenv("ROPENDAL_GDRIVE_FILE", unset = "map_catalog.txt")
drive_fs <- opendal(
  "gdrive",
  root = gdrive_root,
  auth = credentials_gdrive3(
    secret_json = secret_json,
    tokens_json = tokens_json,
    scope = "https://www.googleapis.com/auth/drive"
  )
)
# Fetch only a small prefix of the file (size = 64).
drive_head <- rawToChar(fs_read(drive_fs, gdrive_file, size = 64))
drive_head
#> [1] "tr \":\" \" \" < hglft_genome_58b39_637d30_hdl_metal.bed | tr \"-\" \" "

The native API is for other R packages that want OpenDAL-backed byte I/O
without calling R while async work is running. A downstream package can
declare LinkingTo: Ropendal, include <ropendal.h>, submit async
work, wait on Aio handles, and read borrowed results or fill
caller-owned buffers.
We exercise that installed C API in-process with
Rtinycc. We assemble the
C source in pieces, use Rtinycc’s bundled
Protothreads header, and let R resume a
native state machine while doing ordinary R-side work between
resumptions. The Rprintf() calls below are demo instrumentation
executed only when R resumes the C task on the main thread; OpenDAL
background work does not call R.
root <- tempfile("ropendal-c-api-readme-")
dir.create(root, recursive = TRUE)
# Locate the installed Ropendal shared library (under libs/ or libs/<arch>/)
# so the in-memory C code can link against its C API. The pattern is matched
# against file names: anchor it, and escape the leading "." of dynlib.ext so
# it is not a regex wildcard.
ropendal_lib <- list.files(
  system.file("libs", package = "Ropendal"),
  pattern = paste0("^Ropendal\\", .Platform$dynlib.ext, "$"),
  recursive = TRUE,
  full.names = TRUE
)
# Fail with a clear message instead of a later subscript-out-of-bounds error.
if (length(ropendal_lib) == 0L) {
  stop("could not find the installed Ropendal shared library", call. = FALSE)
}

We start with includes, status codes, and a native task struct. State that must survive a protothread yield lives in the struct, not in local C variables.
c_api_state <- r"(
#include <rtinycc/pt.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <R_ext/Print.h>
#include "ropendal.h"

/* Status codes returned to R by ropendal_demo_resume. */
enum { ROPENDAL_DEMO_DONE = 0, ROPENDAL_DEMO_RUNNING = 1, ROPENDAL_DEMO_ERROR = -1 };

/* Bytes written and read back; users subtract 1 from sizeof to drop the NUL. */
static const uint8_t demo_payload[] = "hello native api\n";

/* All state that must survive a protothread yield lives here: local C
   variables are not preserved across PT_YIELD. */
typedef struct ropendal_demo_task {
  struct pt pt;                      /* protothread continuation */
  ropendal_fs_t *fs;                 /* filesystem handle, released on free */
  ropendal_aio_t *aio;               /* current async request, if any */
  ropendal_error_t *err;             /* error detail from the last failing call */
  ropendal_write_options_t write_opts;
  ropendal_read_options_t read_opts;
  ropendal_ls_options_t ls_opts;
  const ropendal_entry_t *entry;     /* stat result borrowed from aio */
  const ropendal_entry_t *entries;   /* listing borrowed from aio */
  size_t nentries;
  size_t nread;                      /* bytes read into dst */
  uint8_t dst[64];                   /* caller-owned read buffer */
  int status;
  int failed;
  int done;
  int tick;                          /* resume counter for progress output */
  const char *step;                  /* label of the current step */
  char message[256];                 /* task-owned copy of the error text */
} ropendal_demo_task_t;
)"

Next we define cleanup and error helpers plus a constructor.
ropendal_fs_open() copies the root configuration during the call, so
the R string does not need to outlive the constructor.
c_api_lifecycle <- r"(
/* Record a failure on the task: copy the most specific message available
   into task->message, mark the task failed, and release any pending error. */
static void demo_set_error(ropendal_demo_task_t *task, const char *fallback) {
  const char *message = fallback ? fallback : "native task failed";
  if (!task) return;
  if (task->err) {
    const char *native_message = ropendal_error_message(task->err);
    if (native_message && native_message[0]) message = native_message;
  }
  snprintf(task->message, sizeof(task->message), "%s", message);
  /* Point step at the task-owned copy. native_message presumably points into
     task->err, which is released just below, so keeping it would dangle. */
  task->step = task->message;
  task->failed = 1;
  if (task->err) {
    ropendal_error_release(task->err);
    task->err = 0;
  }
}

/* Record the failure and leave the protothread; later resumes report ERROR. */
#define DEMO_FAIL(task, message) do { demo_set_error((task), (message)); PT_EXIT(&(task)->pt); } while (0)

/* Allocate a task and open an fs operator rooted at root. Returns NULL only
   when allocation fails; an open failure is recorded on the task and is
   reported by the first resume. */
void *ropendal_demo_open(const char *root) {
  ropendal_demo_task_t *task = (ropendal_demo_task_t *)calloc(1, sizeof(ropendal_demo_task_t));
  if (!task) return NULL;
  PT_INIT(&task->pt);
  task->step = "open filesystem";
  ropendal_kv_t cfg = { sizeof(ropendal_kv_t), "root", root };
  task->status = ropendal_fs_open("fs", &cfg, 1, &task->fs, &task->err);
  if (task->status != ROPENDAL_OK) demo_set_error(task, "ropendal_fs_open failed");
  else task->step = "filesystem ready";
  return task;
}

/* Release the Aio, filesystem handle, and error if present, then the task. */
void ropendal_demo_free(void *ptr) {
  ropendal_demo_task_t *task = (ropendal_demo_task_t *)ptr;
  if (!task) return;
  if (task->aio) ropendal_aio_release(task->aio);
  if (task->fs) ropendal_fs_release(task->fs);
  if (task->err) ropendal_error_release(task->err);
  free(task);
}
)"

The protothread body submits async work, yields to R, polls for completion, and then extracts results. This performs write, stat, listing, and a read into a caller-owned C buffer.
c_api_protothread <- r"(
/* Each step follows the same pattern: submit an async request, yield once
   so R regains control, keep yielding while the Aio is still pending, then
   wait for the final status and read any borrowed result before releasing
   the Aio. DEMO_FAIL records the error and exits the protothread. */
static int ropendal_demo_resume_internal(ropendal_demo_task_t *task) {
  PT_BEGIN(&task->pt);

  /* Step 1: write demo_payload to native.txt. */
  task->step = "submit write";
  memset(&task->write_opts, 0, sizeof(task->write_opts));
  task->write_opts.struct_size = sizeof(task->write_opts);
  task->write_opts.path = "native.txt";
  task->status = ropendal_write_aio(
    task->fs, &task->write_opts, demo_payload, sizeof(demo_payload) - 1, &task->aio, &task->err
  );
  if (task->status != ROPENDAL_OK) DEMO_FAIL(task, "write submit failed");
  task->step = "wait write";
  PT_YIELD(&task->pt);
  while (ropendal_aio_poll(task->aio) == ROPENDAL_AIO_PENDING) PT_YIELD(&task->pt);
  task->status = ropendal_aio_wait(task->aio, -1, &task->err);
  if (task->status != ROPENDAL_OK) DEMO_FAIL(task, "write wait failed");
  ropendal_aio_release(task->aio);
  task->aio = 0;

  /* Step 2: stat the file and check its size against the payload. */
  task->step = "submit stat";
  task->status = ropendal_stat_aio(task->fs, "native.txt", 0, 0, &task->aio, &task->err);
  if (task->status != ROPENDAL_OK) DEMO_FAIL(task, "stat submit failed");
  task->step = "wait stat";
  PT_YIELD(&task->pt);
  while (ropendal_aio_poll(task->aio) == ROPENDAL_AIO_PENDING) PT_YIELD(&task->pt);
  task->status = ropendal_aio_wait(task->aio, -1, &task->err);
  if (task->status != ROPENDAL_OK) DEMO_FAIL(task, "stat wait failed");
  task->status = ropendal_aio_result_entry(task->aio, &task->entry, &task->err);
  if (task->status != ROPENDAL_OK || !task->entry || !task->entry->has_content_length) {
    DEMO_FAIL(task, "stat result failed");
  }
  if (task->entry->content_length != sizeof(demo_payload) - 1) DEMO_FAIL(task, "unexpected stat size");
  ropendal_aio_release(task->aio);
  task->aio = 0;
  /* entry was borrowed from the released Aio; clear it so it cannot dangle. */
  task->entry = 0;

  /* Step 3: list the root and require at least one entry. */
  task->step = "submit list";
  memset(&task->ls_opts, 0, sizeof(task->ls_opts));
  task->ls_opts.struct_size = sizeof(task->ls_opts);
  task->ls_opts.path = "";
  task->status = ropendal_ls_aio(task->fs, &task->ls_opts, &task->aio, &task->err);
  if (task->status != ROPENDAL_OK) DEMO_FAIL(task, "ls submit failed");
  task->step = "wait list";
  PT_YIELD(&task->pt);
  while (ropendal_aio_poll(task->aio) == ROPENDAL_AIO_PENDING) PT_YIELD(&task->pt);
  task->status = ropendal_aio_wait(task->aio, -1, &task->err);
  if (task->status != ROPENDAL_OK) DEMO_FAIL(task, "ls wait failed");
  task->status = ropendal_aio_result_entries(task->aio, &task->entries, &task->nentries, &task->err);
  if (task->status != ROPENDAL_OK || task->nentries == 0) DEMO_FAIL(task, "ls result failed");
  ropendal_aio_release(task->aio);
  task->aio = 0;
  /* entries was borrowed from the released Aio; clear it so it cannot dangle. */
  task->entries = 0;

  /* Step 4: read the file into the caller-owned dst buffer and compare. */
  task->step = "submit read_into";
  memset(&task->read_opts, 0, sizeof(task->read_opts));
  memset(task->dst, 0, sizeof(task->dst));
  task->read_opts.struct_size = sizeof(task->read_opts);
  task->read_opts.path = "native.txt";
  task->status = ropendal_read_into_aio(task->fs, &task->read_opts, task->dst, sizeof(task->dst), &task->aio, &task->err);
  if (task->status != ROPENDAL_OK) DEMO_FAIL(task, "read_into submit failed");
  task->step = "wait read_into";
  PT_YIELD(&task->pt);
  while (ropendal_aio_poll(task->aio) == ROPENDAL_AIO_PENDING) PT_YIELD(&task->pt);
  task->status = ropendal_aio_wait(task->aio, -1, &task->err);
  if (task->status != ROPENDAL_OK) DEMO_FAIL(task, "read_into wait failed");
  task->status = ropendal_aio_result_nread(task->aio, &task->nread, &task->err);
  if (task->status != ROPENDAL_OK) DEMO_FAIL(task, "read_into nread failed");
  if (task->nread != sizeof(demo_payload) - 1 || memcmp(task->dst, demo_payload, task->nread) != 0) {
    DEMO_FAIL(task, "read_into bytes mismatch");
  }
  ropendal_aio_release(task->aio);
  task->aio = 0;

  task->step = "complete";
  task->done = 1;
  PT_END(&task->pt);
}
)"

Finally we expose small C entry points for Rtinycc to call.
c_api_exports <- r"(
/* Human-readable label for a demo status code. */
static const char *ropendal_demo_status_name(int status) {
  if (status == ROPENDAL_DEMO_RUNNING) return "running";
  if (status == ROPENDAL_DEMO_DONE) return "done";
  return "error";
}

/* Print one progress line; this runs only on R's thread, during a resume. */
static int ropendal_demo_report(ropendal_demo_task_t *task, int status) {
  task->tick += 1;
  Rprintf(
    "native request %d: %s; %s\n",
    task->tick,
    ropendal_demo_status_name(status),
    task->step ? task->step : "unknown"
  );
  return status;
}

/* Advance the task by one protothread slice. Returns RUNNING while the
   protothread yields, DONE once it finishes, and ERROR after a failure. */
int ropendal_demo_resume(void *ptr) {
  ropendal_demo_task_t *task = (ropendal_demo_task_t *)ptr;
  if (!task) return ROPENDAL_DEMO_ERROR;
  if (task->failed) return ropendal_demo_report(task, ROPENDAL_DEMO_ERROR);
  if (task->done) return ropendal_demo_report(task, ROPENDAL_DEMO_DONE);
  task->status = ropendal_demo_resume_internal(task);
  if (task->failed) return ropendal_demo_report(task, ROPENDAL_DEMO_ERROR);
  if (task->status == PT_YIELDED || task->status == PT_WAITING) {
    return ropendal_demo_report(task, ROPENDAL_DEMO_RUNNING);
  }
  task->done = 1;
  return ropendal_demo_report(task, ROPENDAL_DEMO_DONE);
}

/* Bytes read into the C buffer, or -1 for a NULL task. */
int ropendal_demo_nread(void *ptr) {
  ropendal_demo_task_t *task = (ropendal_demo_task_t *)ptr;
  return task ? (int)task->nread : -1;
}

/* Task-owned error text; empty until a failure is recorded. */
const char *ropendal_demo_error(void *ptr) {
  ropendal_demo_task_t *task = (ropendal_demo_task_t *)ptr;
  return task ? task->message : "null task";
}
)"

We compile and bind that native code in memory.
c_api_code <- paste(c_api_state, c_api_lifecycle, c_api_protothread, c_api_exports, sep = "\n")
# Compile in memory against Rtinycc's headers (rtinycc/pt.h), Ropendal's
# public header, and the installed Ropendal shared library found earlier.
ffi <- Rtinycc::tcc_ffi() |>
  Rtinycc::tcc_include(system.file("include", package = "Rtinycc")) |>
  Rtinycc::tcc_include(system.file("include", package = "Ropendal")) |>
  Rtinycc::tcc_library(ropendal_lib[[1]]) |>
  Rtinycc::tcc_source(c_api_code) |>
  Rtinycc::tcc_bind(
    ropendal_demo_open = list(args = list("cstring"), returns = "ptr"),
    ropendal_demo_resume = list(args = list("ptr"), returns = "i32"),
    ropendal_demo_nread = list(args = list("ptr"), returns = "i32"),
    ropendal_demo_error = list(args = list("ptr"), returns = "cstring"),
    ropendal_demo_free = list(args = list("ptr"), returns = "void")
  ) |>
  Rtinycc::tcc_compile()

R controls the scheduling loop. Between native resumptions we do ordinary R work; the native code never creates R objects while OpenDAL I/O is running.
task <- ffi$ropendal_demo_open(root)
status <- 1L
r_ticks <- 0L
r_work <- 0L
# Status codes from the C enum: 1 = running, 0 = done, -1 = error.
while (status > 0L) {
  status <- ffi$ropendal_demo_resume(task)
  r_ticks <- r_ticks + 1L
  # Stand-in for ordinary R-side work between native resumptions.
  r_work <- r_work + sum(seq_len(1000L))
  Sys.sleep(0.001)
}
#> native request 1: running; wait write
#> native request 2: running; wait write
#> native request 3: running; wait write
#> native request 4: running; wait stat
#> native request 5: running; wait list
#> native request 6: running; wait read_into
#> native request 7: done; complete
if (status < 0L) {
  # Copy the message out before freeing the task that owns it.
  err_msg <- ffi$ropendal_demo_error(task)
  ffi$ropendal_demo_free(task)
  stop(err_msg, call. = FALSE)
}
nread <- ffi$ropendal_demo_nread(task)
ffi$ropendal_demo_free(task)
#> NULL
c(
  r_ticks = r_ticks,
  r_work = r_work,
  bytes_read_into_c_buffer = nread
)
#> r_ticks r_work bytes_read_into_c_buffer
#> 7 3503500 17

Common targets:
make --no-print-directory help
Common development targets:
make rd regenerate savvy wrappers, roxygen docs, and NAMESPACE
make test-fast install current source and run non-network tinytest
make test-http run opt-in local HTTP fixture tests
make test-s3 run opt-in public read-only S3-compatible tests
make test-s3-minio start local MinIO and run writable S3-compatible tests
make test-gdrive run opt-in Google Drive tests using local gdrive3 JSON defaults
make test-ci run C API checks and CI-only tinytest
make rdm render README.md from README.Rmd
make bench-minio-paws render development MinIO benchmark
make check build and run R CMD check --as-cran --no-manual