RSP-RS
A high-performance RDF Stream Processing engine in Rust, built on Oxigraph.
Installation
[dependencies]
rsp-rs = "0.3.5"
Or:
cargo add rsp-rs
Quick Start
use rsp_rs::RSPEngine;
use oxigraph::model::*;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Define RSP-QL query
    let query = r#"
        PREFIX ex: <https://rsp.rs/>
        REGISTER RStream <output> AS
        SELECT *
        FROM NAMED WINDOW ex:w1 ON STREAM ex:stream1 [RANGE 10000 STEP 2000]
        WHERE {
            WINDOW ex:w1 { ?s ?p ?o }
        }
    "#;

    // Initialize engine
    let mut engine = RSPEngine::new(query.to_string());
    engine.initialize()?;

    // Get stream and start processing
    let stream = engine.get_stream("https://rsp.rs/stream1").unwrap();
    let results = engine.start_processing();

    // Add data with timestamps
    let quad = Quad::new(
        NamedNode::new("https://rsp.rs/subject")?,
        NamedNode::new("https://rsp.rs/predicate")?,
        NamedNode::new("https://rsp.rs/object")?,
        GraphName::DefaultGraph,
    );
    stream.add_quads(vec![quad], 1000)?;

    // Close stream to get final results
    engine.close_stream("https://rsp.rs/stream1", 10000)?;

    Ok(())
}
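The quick start keeps the value returned by start_processing() in results but never reads it. The API section describes that value as a result receiver; the sketch below shows one way such a receiver could be drained, assuming it behaves like std::sync::mpsc::Receiver. The WindowResult type and the spawn_fake_engine producer are hypothetical stand-ins (not part of rsp-rs) so the example compiles with the standard library alone.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;

/// Hypothetical stand-in for one result emitted by the engine.
#[derive(Debug, Clone, PartialEq)]
struct WindowResult {
    /// Event-time timestamp attached to the result.
    timestamp: i64,
    /// Variable bindings rendered as text.
    bindings: String,
}

/// Drain a receiver until every sender has been dropped,
/// collecting the results in arrival order.
fn drain_results(results: Receiver<WindowResult>) -> Vec<WindowResult> {
    // Iterating a Receiver blocks for the next message and ends on disconnect.
    results.into_iter().collect()
}

/// Simulate an engine (assumption, for illustration only): a background
/// thread that emits `n` results and then exits.
fn spawn_fake_engine(n: i64) -> Receiver<WindowResult> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        for i in 0..n {
            let result = WindowResult {
                timestamp: (i + 1) * 2000,
                bindings: format!("?s = <https://rsp.rs/subject{}>", i),
            };
            // Stop early if the consumer went away.
            if tx.send(result).is_err() {
                break;
            }
        }
        // `tx` is dropped here, which ends the consumer's iteration.
    });
    rx
}
```

Calling drain_results(spawn_fake_engine(3)) returns three results in emission order. With a real engine, a loop like this would usually run on its own thread, since iteration only ends once the sending side is dropped.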
Key Concepts
Window Closure & Results
Results are emitted when windows close, and closure is triggered by event timestamps (not wall-clock time):
stream.add_quads(vec![quad1], 0)?;    // Added to window
stream.add_quads(vec![quad2], 1000)?; // Added to window
stream.add_quads(vec![quad3], 2000)?; // Closes window - results emitted!
Important: Always call close_stream() after your last event to trigger final window closures.
Timestamps vs Wall-Clock
The system is timestamp-driven:
You can add all events instantly
Only the timestamp parameter matters
Windows close when an event's timestamp exceeds the window's end time
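The closing rule in the list above can be sketched without the engine. The Window type and split_closed function below are hypothetical and only illustrate the idea that the event timestamp, not the clock, decides which windows close. Whether the real engine treats the boundary as strict (>) or inclusive (>=) is an assumption here.

```rust
/// An event-time window with a start and an end time, in the same units
/// as the timestamps passed to add_quads. Hypothetical type for illustration.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Window {
    open: i64,
    close: i64,
}

/// Split the active windows into (closed, still_active) for an incoming
/// event timestamp. A window counts as closed once the event timestamp
/// exceeds its end time. No wall-clock time is consulted anywhere.
fn split_closed(active: &[Window], event_ts: i64) -> (Vec<Window>, Vec<Window>) {
    active.iter().copied().partition(|w| event_ts > w.close)
}
```

With windows ending at 10000 and 12000, an event stamped 10000 closes nothing under this rule, while an event stamped 10001 closes the first window, no matter how quickly the two events were added.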
Important: Large Timestamp Support (v0.3.5+)
rsp-rs v0.3.5 fixes a critical precision issue with large timestamps (e.g., Unix milliseconds).
Before v0.3.5: Large timestamps (~1.76 trillion) would cause silent failures or incorrect results
v0.3.5+: All timestamp ranges work correctly, from 0 to i64::MAX
No workarounds needed: Use Unix timestamps directly without normalization
See LARGE_TIMESTAMP_FIX.md for details and a migration guide.
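For reference, a Unix-millisecond timestamp of the kind mentioned above can be produced with the standard library alone. This is a minimal sketch; the helper names are made up for illustration, and the choice of i64 is an assumption based on the stated range of 0 to i64::MAX.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Convert a SystemTime to Unix milliseconds as i64.
/// Returns None for times before the epoch or beyond i64::MAX milliseconds.
fn to_unix_millis(t: SystemTime) -> Option<i64> {
    let since_epoch = t.duration_since(UNIX_EPOCH).ok()?;
    i64::try_from(since_epoch.as_millis()).ok()
}

/// Build a SystemTime from Unix milliseconds (handy for tests and replays).
fn from_unix_millis(ms: u64) -> SystemTime {
    UNIX_EPOCH + Duration::from_millis(ms)
}
```

A value such as to_unix_millis(SystemTime::now()) could then be passed as the timestamp argument of add_quads without subtracting a base offset first.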
Features
RSP-QL Support - Full RSP-QL syntax for continuous queries
Sliding Windows - Time-based windows with configurable range and step
SPARQL Aggregations - COUNT, AVG, MIN, MAX, SUM with GROUP BY
Stream-Static Joins - Join streaming data with static knowledge
Multi-threaded - Efficient concurrent processing
Cloneable Streams - No lifetime issues, easy API
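To make range and step concrete, the sketch below lists which sliding windows contain a given timestamp. It assumes windows are aligned to timestamp 0, start every step units, cover [start, start + range), and never start before 0. The engine's actual alignment and boundary rules may differ, and windows_containing is a hypothetical helper, not an rsp-rs function.

```rust
/// Return the [start, end) bounds of every sliding window that contains `ts`,
/// for windows of length `range` that start every `step` units from time 0.
fn windows_containing(ts: i64, range: i64, step: i64) -> Vec<(i64, i64)> {
    assert!(
        ts >= 0 && range > 0 && step > 0,
        "expects a non-negative timestamp and positive range/step"
    );
    // Latest window start that is <= ts.
    let mut start = ts - ts % step;
    let mut out = Vec::new();
    // Walk backwards while the window still covers ts (start + range > ts).
    while start >= 0 && start + range > ts {
        out.push((start, start + range));
        start -= step;
    }
    out.reverse(); // oldest window first
    out
}
```

With RANGE 10000 and STEP 2000, as in the quick start query, a timestamp of 5000 falls into three windows under these assumptions, and a timestamp of 11000 falls into five, which is the steady-state overlap of range / step.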
API
RSPEngine
new(query) - Create engine with RSP-QL query
initialize() - Initialize windows and streams
start_processing() - Start processing, returns result receiver
get_stream(name) - Get stream for adding data
close_stream(uri, timestamp) - Trigger final window closures
add_static_data(quad) - Add static background data
RDFStream
add_quads(quads, timestamp) - Add quads with event timestamp
Cloneable - can be stored and reused
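Why a cloneable handle avoids lifetime issues can be shown with a toy type. ToyStream below is a hypothetical stand-in, not the real RDFStream, and says nothing about rsp-rs internals. It only illustrates the general pattern in which each clone shares the same state through an Arc, so clones can be moved into threads freely.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for a stream handle: cloning it copies an Arc
/// pointer, so every clone appends to the same underlying buffer.
#[derive(Clone, Default)]
struct ToyStream {
    events: Arc<Mutex<Vec<(String, i64)>>>,
}

impl ToyStream {
    /// Record one item together with its event timestamp.
    fn add(&self, item: &str, timestamp: i64) {
        self.events.lock().unwrap().push((item.to_string(), timestamp));
    }

    /// Number of items recorded so far across all clones.
    fn len(&self) -> usize {
        self.events.lock().unwrap().len()
    }
}

/// Spawn `producers` threads, each pushing `per_thread` items through its
/// own clone of the handle, and return the total number recorded.
fn produce_concurrently(stream: &ToyStream, producers: usize, per_thread: usize) -> usize {
    let handles: Vec<_> = (0..producers)
        .map(|p| {
            let s = stream.clone(); // owned clone moves into the thread
            thread::spawn(move || {
                for i in 0..per_thread {
                    s.add(&format!("producer{}-item{}", p, i), i as i64);
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    stream.len()
}
```

Four producers adding 25 items each leave 100 items in the shared buffer, and no borrow of the original handle has to outlive any thread.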
Debugging
let window = engine.get_window("window_name").unwrap();
let mut w = window.lock().unwrap();
println!("Active windows: {}", w.get_active_window_count());
w.set_debug_mode(true); // Enable verbose logging
Performance
Throughput: Up to 1.28M quads/second
Latency: ~400-700 µs query execution on 30 s windows
Memory: ~2.5 KB per quad in window
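The per-quad memory figure allows a rough capacity estimate. The sketch below is back-of-the-envelope arithmetic only: it takes 2.5 KB as 2,500 bytes, assumes a steady input rate, and ignores any sharing or duplication between overlapping windows. The function name is made up for illustration.

```rust
/// Approximate per-quad memory cost quoted above (~2.5 KB),
/// taken here as 2,500 bytes.
const BYTES_PER_QUAD: u64 = 2_500;

/// Estimate the bytes held by one window, given a steady input rate and
/// the window RANGE in milliseconds.
fn estimated_window_bytes(quads_per_second: u64, range_ms: u64) -> u64 {
    let quads_in_window = quads_per_second * range_ms / 1_000;
    quads_in_window * BYTES_PER_QUAD
}
```

For example, 1,000 quads/second into a 30-second window gives 30,000 quads, or roughly 75 MB under these assumptions.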
Run benchmarks:
cargo bench
Examples
See examples/streaming_lifecycle.rs and tests/integration/ for more examples.
Documentation
License
MIT License - Copyright Ghent University - imec
Acknowledgments
Rust port of RSP-JS. Thanks to the original authors for their excellent work.
Contact
Reach out to Kush Bisen or create an issue on GitHub.