The basis of this library was written the old-fashioned way in 2023 as modules within another project. I adulterated them with Claude in 2025, inside another toy project, and the first upload of it as a library, including the initial version of this README, is the result of my asking Claude Sonnet to extract it into a library. As of this writing, I haven't even checked if it works.
A comprehensive Elixir client library for interacting with Corrosion database clusters.
Corrosion is a SQLite-based distributed database that provides strong consistency with multi-master replication. This library provides a complete interface for working with Corrosion clusters from Elixir applications.
- Full SQL Support - Execute arbitrary queries and transactions with parameterization
- Cluster Operations - Inspect cluster members, peers, and system state
- Real-time Subscriptions - Stream database changes with robust reconnection logic
- Connection Pooling - Built on Req/Finch with configurable pooling and timeouts
- Comprehensive Error Handling - Detailed error reporting and graceful degradation
- Zero Phoenix Dependencies - Pure OTP/GenServer implementation
# Connect to a Corrosion node
{:ok, conn} = CorroClient.connect("http://localhost:8081")
# Execute queries with parameters
{:ok, users} = CorroClient.query(conn, "SELECT * FROM users WHERE active = ?", [true])
# With named parameters
{:ok, users} = CorroClient.query(conn, "SELECT * FROM users WHERE id = :id", %{id: 42})
# Execute atomic transactions
{:ok, _} = CorroClient.transaction(conn, [
"INSERT INTO users (name, email) VALUES ('Alice', 'alice@example.com')",
{"UPDATE stats SET user_count = user_count + 1", []},
{"INSERT INTO audit_log (user_id, action) VALUES (:id, :action)", %{id: 1, action: "created"}}
])
# Get cluster information
{:ok, cluster_info} = CorroClient.get_cluster_info(conn)
IO.puts("Active nodes: #{cluster_info.total_active_nodes}")
# Subscribe to real-time changes
{:ok, sub_pid} = CorroClient.subscribe(conn, "SELECT * FROM messages ORDER BY created_at DESC",
on_event: fn
{:new_row, msg} -> IO.puts("New message: #{msg["content"]}")
{:change, "UPDATE", msg} -> IO.puts("Updated: #{msg["content"]}")
event -> IO.inspect(event)
end,
on_connect: fn watch_id -> IO.puts("Subscription connected: #{watch_id}") end
)Both /v1/queries and /v1/transactions use Corrosion's Statement enum, so
the client accepts every format supported by the API:
- Simple string:
"SELECT * FROM users" - Positional params:
{ "SELECT * FROM users WHERE id = ?", [123] } - Named params:
{ "SELECT * FROM users WHERE id = :id", %{id: 123} } - Verbose map:
%{query: "SELECT * FROM users", params: [...], named_params: %{...}}
Transactions take a list of statements and you can mix formats freely:
statements = [
"INSERT INTO users (name) VALUES ('Alice')",
{"UPDATE stats SET count = count + 1", []},
{"INSERT INTO audit (user_id) VALUES (:id)", %{id: 1}},
%{query: "INSERT INTO log(message) VALUES (?)", params: ["created"]}
]
CorroClient.transaction(conn, statements)Subscriptions accept the same statement formats:
CorroClient.subscribe(conn,
{"SELECT * FROM messages WHERE channel_id = :channel", %{channel: 42}},
on_event: &handle_message/1
)Add corro_client to your list of dependencies in mix.exs:
def deps do
[
{:corro_client, "~> 0.1.0"}
]
end# Simple connection
conn = CorroClient.connect("http://localhost:8081")
# With custom timeouts
conn = CorroClient.connect("http://localhost:8081",
receive_timeout: 10_000
)CorroClient uses Req (built on Finch) for HTTP transport. You can configure connection pooling:
conn = CorroClient.connect("http://localhost:8081",
finch_options: [
pool_timeout: 5_000,
pool_max_idle_time: 30_000,
pool_size: 10
]
)For cluster deployments, create separate connections to different nodes:
nodes = [
CorroClient.connect("http://node1:8081"),
CorroClient.connect("http://node2:8081"),
CorroClient.connect("http://node3:8081")
]
# Round-robin queries across nodes
conn = Enum.random(nodes)
{:ok, result} = CorroClient.query(conn, "SELECT * FROM users")CorroClient.connect/2- Create connection to a Corrosion nodeCorroClient.ping/1- Test connectivityCorroClient.query/3- Execute SQL queries with optional parametersCorroClient.transaction/2- Execute atomic transactions
CorroClient.get_cluster_info/1- Comprehensive cluster stateCorroClient.get_cluster_members/1- Active cluster membersCorroClient.get_tracked_peers/1- Replication peer statusCorroClient.get_database_info/1- Schema and table information
CorroClient.subscribe/3- Start streaming subscriptionCorroClient.subscription_status/1- Check subscription healthCorroClient.restart_subscription/1- Manually restart subscriptionCorroClient.stop_subscription/1- Stop subscription
Subscriptions receive these event types via the :on_event callback:
{:subscription_ready}- Initial data loaded, now streaming live changes{:columns_received, columns}- Column information received{:initial_row, row_data}- Initial data row (during startup){:new_row, row_data}- New row inserted{:change, change_type, row_data}- Row updated ("UPDATE", "DELETE", etc.)
The library provides comprehensive error handling:
case CorroClient.query(conn, "SELECT * FROM users") do
{:ok, users} ->
# Process results
IO.puts("Found #{length(users)} users")
{:error, {:connection_error, reason}} ->
# Network/connectivity issues
Logger.error("Connection failed: #{inspect(reason)}")
{:error, {:http_error, status, body}} ->
# HTTP-level errors (4xx, 5xx)
Logger.error("HTTP #{status}: #{inspect(body)}")
endSubscriptions include automatic reconnection with exponential backoff:
{:ok, pid} = CorroClient.subscribe(conn, "SELECT * FROM events",
on_event: &handle_event/1,
on_error: fn error -> Logger.error("Subscription error: #{inspect(error)}") end,
max_reconnect_attempts: 10,
reconnect_delays: [1_000, 2_000, 5_000, 10_000, 30_000]
)CorroClient is built with clean separation of concerns:
CorroClient.Client- Low-level HTTP transport and SQL executionCorroClient.Cluster- High-level cluster operations and data parsingCorroClient.Subscriber- Real-time streaming with reconnection logic
The main CorroClient module provides a unified interface that delegates to these specialized modules. You can also use the individual modules directly for more control.
Run the test suite:
mix testThe tests include unit tests for parsing logic, connection management, and subscription handling. Integration tests require a running Corrosion instance.
- Fork the repository
- Create a feature branch (
git checkout -b feature/amazing-feature) - Make your changes with tests
- Run
mix testandmix format - Commit your changes (
git commit -am 'Add amazing feature') - Push to the branch (
git push origin feature/amazing-feature) - Open a Pull Request
This project is licensed under the MIT License - see the LICENSE file for details.