MictlanX Client is a Python SDK for a decentralized, router-backed object storage system.
It lets you PUT/GET large objects reliably across a pool of storage peers through one or more routers, handling chunked transfers, concurrency, retries, and integrity checks for you.
Highlights
- Chunked uploads/downloads for large files with bounded memory use.
- Concurrent I/O for higher throughput.
- Router-aware: work with one or many routers; easy to rotate or fail over.
- End-to-end integrity: SHA-256 checksum verification on writes/reads.
- Metadata & tags: attach and query key/value tags per object (ball) and bucket.
- Client-side caching to speed up repeated reads.
- Simple API & examples to get productive fast.
MictlanX Client targets object storage use cases (store, fetch, list, and replicate objects).
It’s alpha software—interfaces may evolve between minor versions.
- Prerequisites
- Conceptual Architecture
- Getting started
- Project Structure
- Keypair generation (Optional)
- Contributing
- License
- Contact
You must meet the prerequisites to run successfully the MictlanX Client:
-
Clone this repository to run the examples.
git clone git@github.com:jub-ecosystem/mictlanx-client.git && cd mictlanx-client
-
Install poetry
pip3 install poetry
-
Installing dependencies using the following command:
poetry shell # Start the virtualenv poetry install # properly install the dependencies
-
You should create a folder to save the client's log, the default path is at
/mictlanx/client:export CLIENT_LOG_PATH=/mictlanx/client sudo mkdir -p $CLIENT_LOG_PATH && sudo chmod 774 -R $CLIENT_LOG_PATH && sudo chown $USER:$USER $CLIENT_LOG_PATH
⚠️ Make sure yo assign the right permissions -
Deploy a peer (standalone version)
chmod +x ./deploy_peer.sh && ./deploy_peer.sh -
Deploy a router + replica manager + 2 peers:
chmod +x ./deploy_router.sh && ./deploy_router.sh
MictlanX exposes three progressively higher-level ways to talk to the storage layer:
-
Peer – talk directly to a storage peer. You create metadata and then upload the bytes. You can also fetch metadata and download the balls.
-
Router – talk to a router that manages one or more peers for you. The router handles placement, replication, and lookups.
-
MictlanX Client – a high-level client with retries, backoff, load balancing across routers, chunking, integrity checks, and a in-memory cache.
In MictlanX, every payload you store—whether it starts as a memoryview, a file, or an object—is first segmented into fixed-size chunks (e.g., 256KB, 1MB). Each chunk is indexed (0..N-1) so it can be transmitted, retried, and verified independently. These chunks are then assembled into a “ball”, which is the logical object the system manages. A ball is identified by the pair (bucket_id, ball_id) and carries both metadata (size, checksum, content type, tags like fullname, extension, num_chunks, full_checksum) and data (the ordered set of chunks). Individual chunk keys follow the convention {ball_id}_{index} —for example, b1_0, b1_1, …—which makes random access and streaming straightforward.
Multiple balls live inside a bucket, which serves as the namespace and lifecycle boundary for related data. Buckets then reside within a Virtual Storage Space (VSS)—the execution domain that maps and replicates balls across storage peers. This layered design (chunks → balls → bucket → VSS) lets the client pipeline large objects efficiently, recover from partial failures with fine-grained retries, verify integrity end-to-end via checksums, and scale reads/writes across healthy peers without changing application-level semantics.
Use this when you want fine-grained control or are testing a single node.
Assuming you have a compose file named mictlanx-peer.yml that brings up two peers on localhost:24000 and localhost:24001:
docker compose -f mictlanx-peer.yml up -d
# quick health checks (optional)
curl -fsS http://localhost:24000/health
curl -fsS http://localhost:24001/health
# or
chmod +x ./deploy_peer.sh && ./deploy_peer.sh
from mictlanx.services import AsyncPeer
peer = AsyncPeer(
peer_id = "mictlanx-peer-0",
ip_addr = "localhost",
port = 24000,
protocol = "http",
api_version = 4,
)This points the client at a single peer at localhost:24000.
import hashlib
bucket_id = "mictlanx" # logical namespace
ball_id = "b1"
key = "hello-object" # your logical object name
body = b"Hello from AsyncPeer"
checksum = hashlib.sha256(body).hexdigest() # integrity guardThis register the ball metadata (size, checksum, tags)
meta_res = await peer.put_metadata(
key = key,
size = len(body),
checksum = checksum,
producer_id = "client-0",
content_type = "text/plain",
ball_id = ball_id,
bucket_id = bucket_id,
tags = {"fullname": "hello.txt", "extension": "txt"},
)
if meta_res.is_err:
print("PUT_METADATA failed:", meta_res.unwrap_err())
return
task_id = meta_res.unwrap().task_id
print("task_id:", task_id)This example is implemented in examples/peer/01_put_metadata.py.
You can run it directly from the CLI with your chosen arguments:
python3 examples/peer/01_put_metadata.py --bucket_id mictlanx --ball_id b1 --key hello-objectTwo-step PUT lets the server validate your metadata (size, checksum) before accepting data.
data_res = await peer.put_data(
task_id=task_id,
key=key,
value=body,
content_type="text/plain",
)
if data_res.is_err:
print("PUT data failed:", data_res.unwrap_err())
return
print("PUT completed")This example is implemented in examples/peer/02_put_data.py.
You can run it directly from the CLI with your chosen arguments:
python3 examples/peer/02_put_data.py --task_id=t-sdfF3f124f --key hello-object--task_id with the returned task_id from the put_metedata.
After you’ve uploaded a ball (PUT step 1-2), you typically consume it in two moves:
-
Fetch metadata (HEAD-equivalent): size, checksum, content-type, tags, etc.
-
Fetch data (GET): read bytes (optionally in chunks), then use them (write to file, print, process).
metadata = await peer.get_metadata(bucket_id=bucket_id, key=key)
data_res = await peer.get_streaming(bucket_id=bucket_id, key=key)
# body = data_res.dataThis example is implemented in examples/peer/03_get.py.
You can run it directly from the CLI with your chosen arguments:
python3 examples/peer/03_get.py --bucket_id mictlanx --key hello-objectThe figure shows how objects (balls) are represented inside the system:
- Metadata: A lightweight description of the ball. Think of it as the label on the ball.
- Data: The actual bytes of the ball. Can be streamed in ranges/chunks.
- Ball: A combination of Metadata + Data. This is the storage unit that lives in a bucket. Each ball is uniquely identified by its
(bucket_id, ball_id)tuple. - Bucket: A logical namespace that groups many balls together. Equivalent to a folder or container, identified by
bucket_id.
Idea: many balls (objects) can share the same ball_id. That groups them into one logical unit (a set, a timeline, a version, etc.). Each ball still has its own key, metadata, and data.
A bucket stores many balls. If you reuse the same ball_id across several balls, they become a group:
- Each ball has its own
key,size,checksum,content_type, andtags. - All balls with the same
(bucket_id, ball_id)belong to the same collection. - You can list/retrieve them by
ball_idto process them together.
Represent a video (or GIF) as N balls that all share ball_id = "video123":
Each ball = one frame:
- Data: The image bytes for that frame.
- Metadata: frame_number(ordering)
This example is implemented in examples/peer/04_put_frames.py.
You can run it directly from the CLI with your chosen arguments:
python3 examples/peer/04_put_frames.py --bucket_id videos --ball_id video123 --frames_dir ./examples/data/framesOnce the frames have been stored as balls with the same ball_id, you can retrieve them all together.
The get_by_ball_id call returns the list of balls (frames) that belong to the group.
You then sort them using the frame_number tag in their metadata and stream each frame in sequence.
How it works:
- Query by ball_id →
peer.get_by_ball_id(bucket_id, ball_id)returns all frame-balls. - Order frames → sort them using the
frame_numbertag to preserve sequence. - Stream data → fetch each frame’s bytes with
get_streaming. - Preview → display frames in order with a configurable delay (
--delay_ms) or reconstruct into an animation.
This example is implemented in examples/peer/05_get_frames.py.
You can run it directly from the CLI:
python3 examples/peer/05_get_frames.py --bucket_id videos --ball_id video123 --delay_ms 40Notes: Some peer examples require GUI support to display frames using matplotlib. Even if all Python dependencies are installed, the script may fail to show frames if GUI libraries are missing.
Required packages (optional):
- Tkinter (Python GUI toolkit)
- Other system packages needed for GUI support depending on the environment
Use AsyncRouter when you want the router to choose a healthy peer (load balancing, retries, awareness of cluster state or replication) and operate the system as a Virtual Storage Space (VSS) . The API is intentionally very similar to Peer.
A VSS is the logical boundary formed by one router (or more, for HA) plus a set of storage peers. From the client’s point of view, a VSS behaves like one storage service—even though data is actually placed, replicated, and served by multiple peers behind the router.
What the Router does inside a VSS
-
Peer selection & load-balancing: chooses a target peer for each request based on health and load.
-
Retries & failover: transparently retries on other peers if a node is slow/unavailable.
-
Cluster awareness: tracks which peers hold replicas and where to read/write.
-
Policy hooks: can coordinate replication, placement, or future policies without changing client code.
At the “global” or higher-level view of the system, a VSS is represented as a hexagon. Think of the hexagon as the logical boundary that groups:
-
one (or more) Router(s) in front, and
-
a pool of Storage Peers behind it,
so the VSS behaves like a single storage service to your applications.
Inside that hexagon, the Router decides which peer should handle a request, while Peers store and serve balls inside buckets.
-
PUT: Client → Router → chosen Peer(s). Router validates metadata first (size/checksum) and then streams bytes to the selected peer. Replication can be triggered after the first successful write.
-
GET: Client → Router → best Peer for that ball. Router picks a close/healthy replica and streams the data back to the client.
These examples assume you have a router running locally (e.g., from mictlanx-router.yml) at http://127.0.0.1:60666 and two test peers behind it.
router = AsyncRouter(
router_id = "router-0",
ip_addr = "localhost",
port = 60666,
protocol = "http",
api_version = 4,
http2 = False,
)
bucket_id = "mictlanx"
ball_id = "bx" # logical group
key = "hello-object-router" # this concrete object
body = b"Hello from AsyncRouter"
checksum = hashlib.sha256(body).hexdigest()res = router.put_metadata(
bucket_id = bucket_id,
ball_id = ball_id,
key = key,
size = len(body),
checksum = checksum,
producer_id = "client-0",
content_type = "text/plain",
tags = {"fullname": "hello.txt", "extension": "txt"},
)
if res.is_err:
print("PUT_METADATA failed:", res.unwrap_err())
return
tasks_ids:List[str] = res.unwrap().tasks_idsThis example is implemented in examples/01_router_put_metadata.py.
You can run it directly from the CLI with your chosen arguments:
python3 examples/router/01_put_metadata.py --bucket_id bk1 --ball_id b1 --key k1 --rf 1Note: The --rf argument must be provided to indicate how many copies of the object or metadata should be stored across the peers in the distributed system.
Uploads the actual bytes using a task_id from step 2.
body = b"Hello from AsyncRouter" # must match size/checksum sent in metadata
put_data_result = await router.put_data(
task_id = task_id,
key = key,
value = body,
content_type = "text/plain",
)This example is implemented in examples/02_router_put_data.py.
You can run it directly from the CLI with your chosen arguments:
python3 examples/router/02_put_data.py --task_id <PASTE_TASK_ID> --key k1AsyncClient is the high-level, batteries-included interface: it talks to one or more Routers, picks a healthy one (load-balancing), streams data in chunks, applies retries + exponential backoff, and verifies integrity with SHA-256. It also provides convenience helpers such as put_file() and get_to_file().
AsyncClient receives a single uri string and internally builds Router objects.
The format is parsed by MictlanXURI.parse() (shown below) and supports one or more routers.
- Scheme:
mictlanx:// - Routers list: comma-separated specs, each as
router_id@host:port. - Global query (applies to all routers):
protocol,api_version,http2. - The first
/or?separates the router list from the global query block.
Canonical form (what MictlanXURI.build() produces):
mictlanx://<router_id@host:port>[,<router_id@host:port>]/?protocol=<http|https>&api_version=<int>&http2=<0|1>- Single local router:
uri = "mictlanx://mictlanx-router-0@localhost:60666/?protocol=http&api_version=4&http2=0"- Two routees HTTPS + HTTP/2
uri = (
"mictlanx://"
"mictlanx-router-0@alpha.tamps.cinvestav.mx:443,"
"mictlanx-router-1@beta.tamps.cinvestav.mx:443"
"/?protocol=https&api_version=4&http2=1"
)- Programmatically build a URI from
AsyncRouterinstances:
from mictlanx.services import AsyncRouter
from mictlanx.utils.uri import MictlanXURI
routers = [
AsyncRouter(router_id="mictlanx-router-0", ip_addr="localhost", port=60666, protocol="http", api_version=4),
AsyncRouter(router_id="mictlanx-router-1", ip_addr="localhost", port=60667, protocol="http", api_version=4),
]
uri = MictlanXURI.build(routers) # -> "mictlanx://mictlanx-router-0@localhost:60666,.../?protocol=http&api_version=4&http2=0"ValueError: no routers, you likely left an empty entry (e.g. trailing comma) or didn’t pass any routers before the query.
How to deploy it:
chmod +x ./deploy_router.sh && ./deploy_router.shimport asyncio
from mictlanx import AsyncClient
async def main():
uri = "mictlanx://mictlanx-router-0@localhost:60666/?protocol=http&api_version=4&http2=0"
client = AsyncClient(
uri = uri,
client_id = "client-0",
debug = True, # console DEBUG logs
log_output_path = "/mictlanx/client", # file logs live here (rotated)
eviction_policy = "LRU",
capacity_storage = "1GB",
verify = False # set True or a CA bundle path for HTTPS
)
asyncio.run(main())The client cuts your payload into chunks, uploads them in parallel with retries, and stores the checksum in the object’s metadata for integrity verification later.
client.put(bucket_id=bucket_id, key=key, value = data, tags ={},chunk_size = "1KB", )This example is implemented in examples/client/01_put.py.
You can run it directly from the CLI with your chosen arguments:
python3 examples/client/01_put.py \
--bucket_id mictlanx \
--ball_id hello-object \
--path ./samples/hello.txt \
--chunk_size 1MB \
--tag content_type=text/plainAsyncClient.get() looks up the object’s metadata to discover how many chunks exist, downloads them in parallel with retries + exponential backoff, reassembles the bytes, and (by default) verifies integrity against the stored SHA-256.
# fetch bytes back into memory
res = await client.get(
bucket_id = bucket_id,
key = key, # the same logical id you used on PUT
chunk_size = "1MB", # request size hint; peer may adjust
max_retries = 8,
max_paralell_gets = 8, # parallel chunk downloads
)
if res.is_ok:
out = res.unwrap() # AsyncGetResponse
data = out.data.tobytes() # assembled bytes
metas = out.metadatas # per-chunk metadata (list)
print("got", len(data), "bytes")
else:
print("GET failed:", res.unwrap_err())If you prefer to write directly to disk (streamed, ordered), use get_to_file():
path_res = await client.get_to_file(
bucket_id = bucket_id,
ball_id = key, # same value as `key` above
output_path = "./downloads",
fullname = "hello.txt", # optional; defaults from tags if present
chunk_size = "1MB",
)
if path_res.is_ok:
print("saved to:", path_res.unwrap())
else:
print("GET->file failed:", path_res.unwrap_err())This example is implemented in examples/client/02_get.py.
Run it from the CLI (works with the local stack started by deploy_router.sh):
# bytes into memory (prints size)
python3 examples/client/02_get.py \
--bucket_id mictlanx \
--ball_id hello-object \
--chunk_size 1MB
# stream directly to a file
python3 examples/client/02_get.py \
--bucket_id mictlanx \
--ball_id hello-object \
--to_file \
--outdir ./downloads \
--chunk_size 1MB--key must match the logical id you used on PUT.
.
├── CHANGELOG.md # Release notes (latest tracked: 0.1.0a0)
├── CODE_OF_CONDUCT.md # Community standards
├── CONTRIBUTING.md # How to contribute
├── Dockerfile # Base image for building/running the client
├── LICENSE # MIT
├── README.md # This document
├── SECURITY.md # Security reporting policy
├── build.sh # Local build helper (wheel/sdist, etc.)
├── clean_dist.sh # Clean dist artifacts
├── deploy_peer.sh # Helper to run a peer with env/flags (local/dev)
├── deploy_router.sh # Helper to run a router with env/flags (local/dev)
├── docs/ # MkDocs site (source)
│ ├── api.md
│ ├── architecture.md
│ ├── assets/
│ ├── getting-started.md
│ ├── index.md
│ └── prerequisites.md
├── examples/ # Minimal, runnable examples
│ ├── client/ # AsyncClient examples (put/get, files, metadata)
│ ├── data/ # Sample assets for examples
│ ├── peer/ # AsyncPeer examples (direct to peer)
│ ├── router/ # AsyncRouter examples (LB, retries)
│ └── use-cases/ # Higher-level scenarios
├── mictlanx/ # Python package (flattened layout)
│ ├── __init__.py
│ ├── apac/ # (Reserved) Availability/Policy/Control modules
│ ├── asyncx/ # Async helpers: load balancer, async utils
│ ├── caching/ # CacheFactory & in-memory stores (LRU, etc.)
│ ├── errors/ # Typed exceptions & error helpers
│ ├── interfaces/ # Typed dataclasses/DTOs (Peer, Router, Metadata…)
│ ├── ipc/ # Client<->service contracts & envelopes (if any)
│ ├── logger/ # Structured logging utilities
│ ├── retry/ # RetryPolicy, raf() (exponential backoff/jitter)
│ ├── services/ # AsyncPeer / AsyncRouter client implementations
│ ├── types/ # Common type aliases (e.g. VerifyType)
│ └── utils/ # Utilities (URI parser, chunking, compression, etc.)
├── mictlanx-peer.yml # Docker Compose: 1+ peers (localhost)
├── mictlanx-router.yml # Docker Compose: 1 router + 2 peers (localhost)
├── mkdocs.yml # Docs site config (MkDocs Material, nav, theme)
├── poetry.lock # Locked deps (Poetry)
├── publish.sh # Publish helper (testpypi/pypi)
├── pull_request_template.md # PR checklist/template
├── pyproject.toml # Project metadata & dependencies
├── requirements.txt # Runtime deps (pip install -r)
├── run_docs.sh # Serve docs locally (mkdocs serve)
├── scripts/
│ └── gen_ref_pages.py # Script to generate API ref pages into docs/
└── tests/ # Pytest suite (unit/integration)
├── test_chunks.py # Chunking/segmentation tests
├── test_compression.py # Compression roundtrips
├── test_delete.py # Delete-by-key / delete-ball flows
├── test_get_async.py # Async GET streaming & integrity checks
├── test_get_bucket.py # Bucket metadata aggregation
├── test_get_metadata.py # Single object metadata fetch
├── test_peer.py # AsyncPeer smoke tests
├── test_put.py # PUT (bytes/file/chunks)
├── test_raf.py # Retry-After-Function helpers
├── test_router.py # AsyncRouter APIs
├── test_router_lb.py # Router LB selection behavior
├── test_summoner.py # (Reserved/module tests)
├── test_uri.py # MictlanXURI parsing/building
└── test_utils.py # Generic utilities
To create tour keypair you can use the next code, but take into account that the default output path of the keypair generator is mictlanx/xolo/.keys you can change it if assign a different value to the env XOLO_SECRET_PATH for example:
export XOLO_SECRET_PATH = /another/pathfrom xolo.utils.utils import Utils as XoloUtils
bob_keypair = XoloUtils.X25519_key_pair_generator("bob")
# in another computer
alice_keypair = XoloUtils.X25519_key_pair_generator("alice")Now you can create a shared secret and use it to encrypt the data:
bob_priv = XoloUtils.load_private_key(filename="bob").unwrap()
alice_pub = XoloUtils.load_public_key(filename="alice").unwrap()
shared_key = bob_priv.exchange(peer_public_key=alice_pub)Contributions are what make the open source community such an amazing place to learn, inspire, and create. Any contributions you make are greatly appreciated.
If you have a suggestion that would make this better, please fork the repo and create a pull request. You can also simply open an issue with the tag "enhancement". Don't forget to give the project a star! Thanks again!
- Fork the Project
- Create your Feature Branch (
git checkout -b feature/AmazingFeature) - Commit your Changes (
git commit -m 'Add some AmazingFeature') - Push to the Branch (
git push origin feature/AmazingFeature) - Open a Pull Request
Distributed under the MIT License. See LICENSE.txt for more information.
Ignacio Castillo - @NachoCastillo - jesus.castillo.b@cinvestav.mx