A Crystal SDK for the Datastar hypermedia framework.
Datastar is a lightweight (~10KB) framework that brings reactive UI updates to server-rendered applications using Server-Sent Events (SSE) and HTML data attributes. This SDK provides a type-safe Crystal API for streaming DOM updates, managing reactive signals, and executing scripts in the browser—all from your server-side code.
Datastar combines the simplicity of server-side rendering with the interactivity of modern frontend frameworks. Instead of sending JSON and rebuilding the UI in JavaScript, Datastar streams HTML fragments directly from the server using SSE.
This SDK implements the Datastar SSE protocol for Crystal, inspired by the official Ruby SDK.
Key features:
- Stream real-time UI updates via SSE
- Concurrent streaming with fiber-based concurrency
- Automatic heartbeat and connection health monitoring
- Built-in adapters for Kemal, Athena, and Blueprint frameworks
- Pub/sub system for multi-session synchronization
- Flexible rendering with the Renderable protocol
The examples/ directory contains working TodoMVC implementations:
- kemal-todomvc - TodoMVC with Kemal, demonstrating pub/sub synchronization across browser sessions
- athena-todomvc - TodoMVC with Athena framework and Blueprint components
Each example demonstrates the full feature set including SSE streaming, pub/sub for multi-session sync, and reactive UI updates.
Add the dependency to your shard.yml:

dependencies:
  datastar:
    github: watzon/datastar.cr

Then run:

shards install

require "datastar"

def handle_events(request, response)
  sse = Datastar::ServerSentEventGenerator.new(request, response)
  sse.stream do |stream|
    stream.patch_elements(%(<div id="greeting">Hello, Datastar!</div>))
  end
end

Use stream for long-lived connections with multiple updates:
sse.stream do |stream|
  10.times do |i|
    sleep 1.second
    stream.patch_elements(%(<div id="count">#{i}</div>))
  end
end

The stream block sets SSE headers, manages concurrency, and handles cleanup automatically.
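To make "sets SSE headers" concrete, here is a minimal sketch of the response headers and wire format that a Server-Sent Events stream uses. It does not call the SDK: `sse_headers` and `sse_frame` are hypothetical helpers written for illustration, and the `datastar-patch-elements` event name and `elements` data prefix are assumptions about the Datastar protocol that may differ between versions.

```crystal
require "http"

# Hypothetical helper: headers an SSE endpoint typically sends before streaming.
def sse_headers : HTTP::Headers
  HTTP::Headers{
    "Content-Type"  => "text/event-stream",
    "Cache-Control" => "no-cache",
  }
end

# Hypothetical helper: formats one SSE event.
# Each payload line becomes its own "data:" field; a blank line ends the event.
def sse_frame(event : String, data_lines : Array(String)) : String
  String.build do |io|
    io << "event: " << event << "\n"
    data_lines.each { |line| io << "data: " << line << "\n" }
    io << "\n"
  end
end

# Assumed event name and data prefix, for illustration only.
frame = sse_frame("datastar-patch-elements", [%(elements <div id="count">1</div>)])
print frame
```

In practice the SDK writes these frames for you; the sketch only shows roughly what travels over the connection when `patch_elements` is called inside a stream block.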
For single updates without persistent connections:
sse.patch_elements(%(<div id="notification">Task completed!</div>))
sse.finish

Patch HTML fragments into the DOM:
# Basic usage
sse.patch_elements(%(<div id="message">Hello!</div>))
# Target a specific element
sse.patch_elements(%(<p>Updated</p>), selector: "#target")
# Append to a list
sse.patch_elements(%(<li>New item</li>), selector: "#list", mode: Datastar::FragmentMergeMode::Append)
# Multiple fragments
sse.patch_elements([%(<div id="a">A</div>), %(<div id="b">B</div>)])

Merge modes: Outer (default), Inner, Replace, Prepend, Append, Before, After, Remove

sse.remove_elements("#notification")

Update reactive signals:

sse.patch_signals(count: 42, user: {name: "Alice"})
sse.patch_signals({enabled: true}, only_if_missing: true)

sse.remove_signals(["user.name", "user.email"])

# As JSON::Any
signals = sse.signals
count = signals["count"].as_i
# As typed struct
user = sse.signals(UserSignals)

sse.execute_script(%(console.log("Hello!")))
sse.execute_script("initWidget()", auto_remove: false)
sse.execute_script(%(import('./mod.js')), attributes: {"type" => "module"})

sse.redirect("/dashboard")

# Lifecycle callbacks
sse.on_connect { puts "Connected" }
sse.on_client_disconnect { puts "Client left" }
sse.on_server_disconnect { puts "Done streaming" }
sse.on_error { |ex| Log.error { ex.message } }
# Manual connection check
sse.check_connection! # Raises IO::Error if closed
# Check connection state
sse.closed?

require "kemal"
require "datastar/adapters/kemal"

# Streaming endpoint
get "/events" do |env|
  env.datastar_stream do |sse|
    10.times do |i|
      sleep 1.second
      sse.patch_elements(%(<div id="count">#{i}</div>))
    end
  end
end

# HTML response
get "/" do |env|
  env.datastar_render("<h1>Hello, Datastar!</h1>")
end

# Check if request is from Datastar
get "/page" do |env|
  if env.datastar_request?
    env.datastar_render("<div>Fragment</div>")
  else
    env.datastar_render("<html><body>Full page</body></html>")
  end
end

# Broadcast to all subscribed clients
post "/update" do |env|
  env.datastar_broadcast("my-topic") do |sse|
    sse.patch_elements("<div id='content'>Updated!</div>")
  end
end

Kemal.run

require "athena"
require "datastar"
require "datastar/adapters/athena"
class EventsController < ATH::Controller
  include Datastar::Athena::LiveController

  @[ARTA::Get("/events")]
  def stream_events(request : ATH::Request) : ATH::StreamedResponse
    datastar_stream(request) do |stream|
      10.times do |i|
        sleep 1.second
        stream.patch_elements(%(<div id="count">#{i}</div>))
      end
    end
  end
end

The LiveController mixin also provides datastar_render for HTML responses and datastar_broadcast for pub/sub:
@[ARTA::Get("/")]
def index : ATH::Response
  datastar_render("<h1>Hello</h1>")
end

@[ARTA::Post("/update")]
def update : ATH::Response
  datastar_broadcast("my-topic") do |sse|
    sse.patch_elements("<div id='content'>Updated!</div>")
  end
  ATH::Response.new(status: :ok)
end

Use Blueprint components with Datastar:
require "datastar"
require "datastar/adapters/blueprint"
class GreetingCard
  include Blueprint::HTML

  def initialize(@name : String); end

  def blueprint
    div id: "greeting" do
      h1 { "Hello, #{@name}!" }
    end
  end
end

sse.patch_elements(GreetingCard.new("World"))

Use Datastar::RequestDetection to tell whether a request came from Datastar:
request = HTTP::Request.new("GET", "/?datastar=%7B%7D")
Datastar.datastar_request?(request) # => true

The Athena adapter exposes the same helper:

if datastar_request?(request)
  datastar_render("<div>Datastar response</div>")
else
  datastar_render("<html>Full page</html>")
end

Implement Datastar::Renderable:
class MyComponent
  include Datastar::Renderable

  def initialize(@title : String); end

  def to_datastar_html : String
    %(<h1>#{@title}</h1>)
  end
end

sse.patch_elements(MyComponent.new("Hello"))

Enable real-time synchronization across multiple browser sessions. When one client makes a change, all clients subscribed to the same topic receive updates automatically.
require "datastar/pubsub"
# Configure at app startup
Datastar::PubSub.configure
# With lifecycle callbacks
Datastar::PubSub.configure do |config|
  config.on_subscribe do |topic, conn_id|
    Log.info { "Client #{conn_id} joined #{topic}" }
  end
  config.on_unsubscribe do |topic, conn_id|
    Log.info { "Client #{conn_id} left #{topic}" }
  end
end

get "/subscribe/:list_id" do |env|
  list_id = env.params.url["list_id"]
  env.datastar_stream do |sse|
    # Subscribe to receive broadcasts for this list
    sse.subscribe("todos:#{list_id}")
    # Send initial state (fragment includes its own ID)
    sse.patch_elements(render_todos(list_id))
    # Connection stays open, broadcasts arrive automatically
  end
end

post "/todos/:list_id" do |env|
  list_id = env.params.url["list_id"]
  todo = create_todo(env.params.json)
  # All subscribed clients receive this update
  Datastar::PubSub.broadcast("todos:#{list_id}") do |sse|
    sse.patch_elements(render_todos(list_id))
  end
  env.response.status_code = 201
end

For multi-server deployments, implement a custom backend:
class RedisBackend < Datastar::PubSub::Backend
  def initialize(@redis : Redis::PooledClient)
  end

  def publish(topic : String, payload : String) : Nil
    @redis.publish("datastar:#{topic}", payload)
  end

  def subscribe(topic : String, &block : String ->) : String
    id = UUID.random.to_s
    spawn do
      @redis.subscribe("datastar:#{topic}") do |on|
        on.message { |_, msg| block.call(msg) }
      end
    end
    id
  end

  def unsubscribe(subscription_id : String) : Nil
    # Cancel the subscription fiber
  end
end
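
The unsubscribe stub above leaves cancellation open. As one possible approach, the sketch below keeps a registry keyed by subscription id, so that unsubscribe has something concrete to remove. `InMemoryBackend` and `Subscription` are hypothetical names; the class mirrors the three method signatures above but deliberately does not inherit from Datastar::PubSub::Backend so that it runs standalone, and it delivers messages synchronously in-process rather than through Redis.

```crystal
require "uuid"

# Hypothetical record pairing a topic with the handler registered for it.
record Subscription, topic : String, handler : Proc(String, Nil)

# Illustrative stand-in, not part of the SDK.
class InMemoryBackend
  def initialize
    # subscription id => subscription
    @subscriptions = Hash(String, Subscription).new
  end

  # Calls every handler currently registered for the topic.
  def publish(topic : String, payload : String) : Nil
    @subscriptions.values.each do |sub|
      sub.handler.call(payload) if sub.topic == topic
    end
  end

  # Registers a handler and returns the id needed to cancel it later.
  def subscribe(topic : String, &block : String ->) : String
    id = UUID.random.to_s
    @subscriptions[id] = Subscription.new(topic, block)
    id
  end

  # Removes the handler; later publishes no longer reach it.
  def unsubscribe(subscription_id : String) : Nil
    @subscriptions.delete(subscription_id)
  end
end

backend = InMemoryBackend.new
received = [] of String

id = backend.subscribe("todos:1") { |msg| received << msg }
backend.publish("todos:1", "first")
backend.publish("todos:2", "other topic") # not delivered: different topic
backend.unsubscribe(id)
backend.publish("todos:1", "second") # not delivered: already unsubscribed

p received
```

A Redis-backed version could use the same id-keyed map to hold whatever handle is needed to stop the listening fiber; how that is done depends on the Redis client in use.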
Datastar::PubSub.configure(backend: RedisBackend.new(redis))

Datastar.configure do |config|
  config.heartbeat = 5.seconds
  config.on_error = ->(ex : Exception) { Log.error { ex.message } }
end

sse = Datastar::ServerSentEventGenerator.new(request, response, heartbeat: 10.seconds)
sse = Datastar::ServerSentEventGenerator.new(request, response, heartbeat: false)

PRs accepted.
- Fork it (https://github.com/watzon/datastar.cr/fork)
- Create your feature branch (git checkout -b my-new-feature)
- Write tests for your changes
- Ensure all tests pass (crystal spec)
- Format your code (crystal tool format)
- Commit your changes (git commit -am 'Add some feature')
- Push to the branch (git push origin my-new-feature)
- Create a new Pull Request
See the Datastar documentation for more information about the protocol.
MIT © Chris Watson