A Lua library for expressing SQL‑like joins and queries over ReactiveX observable streams.
LQR (pronunced like "LiQoR", /ˈlɪk.ər/) sits on top of lua‑reactivex and adds a schema‑aware query layer:
- streaming joins (
inner,left,outer,anti*) with explicit join windows and per‑key buffers; WHERE-style filtering over joined rows;GROUP BY/HAVINGover sliding windows with aggregates;- per‑schema
distinct - expiration / observability streams;
- optional visualization adapters for showcasing or inspecting specific join behavior.
It is designed for in‑process, single‑threaded environments like Lua game runtimes, but works in any Lua 5.1 host that can run lua‑reactivex. Its mental model is “embedded stream processing,” borrowing ideas from Flink/Spark without aiming for their distributed guarantees. Like ReactiveX itself, LiQoR stays local: no clustering, checkpointing, or durability—it’s compositional stream processing inside your Lua app.
LQR.mp4
This example shows how to:
- wrap two in‑memory tables as schema‑tagged observables;
- join them by key with a simple count‑based join window;
- filter joined rows with a
WHERE‑style predicate; and - subscribe to both the joined stream and the expiration side channel.
local LQR = require("LQR")
local Query = LQR.Query
local Schema = LQR.Schema
-- 1) Wrap tables into schema-tagged observables.
local customers = Schema.observableFromTable("customers", {
{ id = 1, name = "Ada" },
{ id = 2, name = "Max" },
}, "id")
local orders = Schema.observableFromTable("orders", {
{ id = 10, customerId = 1, total = 50 },
{ id = 11, customerId = 1, total = 75 },
}, "id")
-- 2) Build a streaming left join: customers ⟕ orders USING id/customerId.
local query =
Query.from(customers, "customers")
:leftJoin(orders, "orders")
:using({ customers = "id", orders = "customerId" })
:joinWindow({ count = 1000 }) -- simple bounded cache per side
:where(function(row)
-- keep only customers that have at least one order
return row.orders.id ~= nil
end)
-- 3) Consume joined results.
query:subscribe(function(result)
local customer = result:get("customers")
local order = result:get("orders") -- may be nil for unmatched left rows
print(("[joined] customer=%s order=%s total=%s"):format(
customer and customer.name or "nil",
order and order.id or "none",
order and order.total or "n/a"
))
end)
-- 4) Observe expirations: when records leave the join window or expire for other reasons. Useful for debugging and query/cache optimization
query:expired():subscribe(function(packet)
print(("[expired] schema=%s key=%s reason=%s"):format(
tostring(packet.schema),
tostring(packet.key),
tostring(packet.reason)
))
end)In a real project you would typically:
- build observables from event buses or subjects instead of static tables; and
- layer additional operators like more complex
:where(...)predicates,:groupBy(...), and:distinct(...)on top of the join.
See examples.lua, tests/unit, and vizualisation/demo for more varied scenarios.
Reach for the LiQoR when you need to relate and group several event sources in real time; use plain reactive streams for simpler live work, and if polling already gives timely answers in a simple enough manner, you don’t need to "go reactive" at all.
- Queries that stay mounted and update incrementally
- Per‑key state that manages itself
- Many views over the same events
- Time is a first class concept
- One place for cross‑stream domain logic
- Helps avoid overload and spikes
- Handles late, missing, and out‑of‑order events
See advantages for the details.
In a nutshell:
- Skip reactive programming when:
- You can poll or call when needed and still meet latency requirements
- correctness/completeness matters more than immediacy
- You need strict ordering or transactions
- Use Plain Rx when:
- You can’t pull all data in one go
- You’re following/observing a single stream/thing
- You're handling primarily scalars/simple values
- “live enough” matters more than full completeness
- Try out LQR when:
- You can’t pull all data in one go
- You need to relate multiple streams/observations
- You need to support more complex shapes and different data rates
- You don't need to create a perfectly complete snapshot.
Quick comparison of what each approach may deliver
| Concern | Non-reactive (poll) | Plain Rx | LQR |
|---|---|---|---|
| Connectedness | High | Low | High |
| Completeness | High | Low | Depends |
| Simplicity | Depends | High | Low |
| Real-time Ability | Limited | High | High |
- Rx before LQR: clean/normalize sources (map/filter/debounce).
- LQR for correlation/state: joins, anti-joins, distinct, grouped aggregates with explicit retention.
- Rx after LQR: map/filter/share results to consumers or side-effects without redoing correlation.
- Game programming: If a player enters zone Z and triggers beacon B within 3s, spawn encounter X; otherwise mark the attempt stale.
LQR handles the keyed, time-bounded join and emits unmatched/expired events for debugging balance.
- Realtime personalization/ads: Show offer O to users who viewed product P and then searched for related term T within 10s; drop the impression if no add-to-cart happens within 30s.
LQR keeps streams keyed by user/session, joins them in windows, and emits expirations for dropped/late flows.
- Security/fraud: Flag logins from device X if they are followed by a high-value transaction from a different IP within 15s; rate-limit per account across sliding 5-minute windows.
LQR joins streams by account/device and provides grouped aggregates for rate controls.
- IoT/telemetry: Join telemetry from sensor D with its latest config/firmware event; if no telemetry arrives for 60s, emit a stale alert; group per site to cap concurrent stale devices.
LQR bring Cross-stream correlation, staleness, and grouped caps.
LQR depends on lua-reactivex, but keeps it as an external checkout. Place our fork where LQR can load it as ./reactivex/ next to ./LQR/):
# Option A (recommended): clone into ./reactivex
git clone https://github.com/christophstrasen/lua-reactivex.git reactivex
# Option B: add as a submodule at ./reactivex
git submodule add https://github.com/christophstrasen/lua-reactivex.git reactivex
git submodule update --init --recursive
Once present, require("reactivex/...") resolves for examples, tests, and the visualization pipeline.
In tick-driven runtimes (games/sims), upstream event sources can arrive in bursts (hundreds/thousands per frame). If you immediately push every event through joins/grouping, you can create frame spikes.
LQR/ingest provides a host-friendly "flow regulator" that implements a “Ingest → Buffer → Drain” boundary:
local Ingest = require("LQR/ingest")
local buffer = Ingest.buffer({
name = "engine.events",
mode = "dedupSet",
capacity = 5000,
key = function(item) return item.id end,
})
-- event handler:
buffer:ingest({ id = 123 })
-- tick:
buffer:drain({
maxItems = 200,
handle = function(item)
-- materialize + emit into schema/query streams here
end,
})Check our docs/concepts/ingest_buffering.md for more details including the handy buffer:advice methods.
- User & API docs: see docs/ (table of contents and links into conceptual guides, how‑tos, and references).
- Development workflow: see docs/development.md.
LQR builds directly on the work of others in the Lua ReactiveX ecosystem:
christophstrasen/lua-reactivex— forked from4O4/lua-reactivex, expected at./reactivexin this repo.bjornbytes/RxLua— the original Rx implementation
If you are new to ReactiveX in Lua, both repositories and especially ReactiveX are worth a look.
Parts of this project’s code, tests and documentation were drafted or refined with the assistance of AI tools (including OpenAI’s ChatGPT).
All code and text have been reviewed and edited by a human before inclusion.
No assets, proprietary content, or copyrighted material were conciously generated, reproduced, or distributed using AI tools.
LQR is provided entirely AS‑IS — still experimental and evolving.
There are no guarantees, no stable releases, and not even a firm notion of versioning at this point.
It may change tomorrow or be abandoned without notice.
By using or modifying this code, you accept that:
- You do so at your own risk.
- The authors take no responsibility for any harm, loss, or unintended side effects
- There is no warranty, express or implied
- Licensed under the MIT License (see the
LICENSEfile for details).