A general communication framework for Massive GPU Infrastructure.
MGI encompasses a set of efficient communication optimizations for performing data processing tasks, e.g., Exchange, GroupBy, and Join, across many GPUs. These optimizations include:
- Pipelining
- Batching
- Multi-path
- NIC-direct
All these techniques are automatically applied behind an intuitive, on-device API.
include/control_plane/
├── gig/ # GIG: Vertex / Edge / Gig,
│ # Path / PathSet / concat,
│ # Edmonds-Karp + decomposition,
│ # FwdTable + §4.4 merge
├── agent/ # local_discover (NVML/hwloc/sysfs)
│ # + Agent runtime
├── controller/ # Controller process + 3 modules
│ ├── td/ # Topology Discovery (assemble v_net)
│ ├── to/ # Transfer Optimizer (Fig 6 EK + cache)
│ └── cc/ # Channel Creation (build FwdTables)
└── common/ # codec (length-prefix framing),
# messages (8 opcodes),
# tcp (server + client base)
src/control_plane/
├── CMakeLists.txt # standalone project; ctest labels
├── gig/, agent/, controller/, common/ # impls of the headers above
└── tests/
├── test_unit.cpp # CPU-only algorithms + codec
├── test_node.cpp # 1-node loopback e2e
├── test_integration.cu # CP → multisession_exchange
├── test_integration_onetothree.cu # CP → onetothree_exchange (relays)
└── cluster_smoke.sh # 2-node ssh-orchestrated e2e
include/data_plane/ # library only (no test fixtures)
├── config.cuh, common.cuh, on_path_helper.cuh # foundations & shared helpers
├── api/ # device-side API (core.cuh)
├── infra/ # endpoint, kbuffering, semaphore
├── channel/ # ChannelRuntime, VirtualChannel,
│ # PhysicalChannel, PathScheduler,
│ # deadlock monitor
├── runtime/ # host_worker_runtime, flow scheduler
├── transport/ # ICopyOp backends: NVLink direct,
│ # NVLink multipath, PCIe, datapath
├── network/ # inter-node RDMA (UCX, multinode)
├── legacy/ # pre-channel-refactor code (kept
│ # for reference / comparison)
└── backup/ # archived host_worker variant
src/data_plane/
├── CMakeLists.txt
└── tests/ # all test fixtures + drivers
├── scenarios/ # exchange/groupby/multisession
│ # orchestration (was in include/)
├── patterns/ # p2p direct/multipath, 1-to-M,
│ # M-to-1 patterns (was in include/)
├── schema/ # TPC-H row types (was in include/)
├── test_exchange_without_worker.cu
├── test_multisession_exchange.cu
├── test_one_to_three_multipath.cu
├── test_tpch_query{3,5,10,12,14,19}.cu
└── tpch_queries/ # per-query kernel implementations
The control plane implements Section 4 of the MGI paper: a central Controller + a per-host Agent that together discover the cluster topology (the GIG — GPU Infrastructure Graph), plan max-flow paths between GPU endpoints, and ship per-GPU forwarding tables to the data plane.
┌─────────────────────────────────────────────────────────────────┐
│ Controller process (one per cluster, port 20000) │
│ Topology Discovery (TD) merges agent subgraphs + adds v_net│
│ Transfer Optimizer (TO) hierarchical Edmonds-Karp + cache │
│ Channel Creation (CC) build per-GPU forwarding tables │
└──────────────┬──────────────────────────────┬──────────────────┘
│ TCP frames │ TCP frames
▼ ▼
┌──────────────────────────┐ ┌──────────────────────────┐
│ Agent process on host A │ │ Agent process on host B │
│ local_discover ─ §4.2 │ │ local_discover ─ §4.2 │
│ NVML / hwloc / sysfs │ │ NVML / hwloc / sysfs │
│ inbound listener │ │ inbound listener │
│ receives FwdTables │ │ receives FwdTables │
│ fires on_fwd_table │ │ fires on_fwd_table │
└──────────────────────────┘ └──────────────────────────┘
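The Transfer Optimizer's max-flow planning can be illustrated with a flat, single-level sketch. This is not the hierarchical implementation in include/control_plane/gig/; the `Matrix` and `FlowPath` types, the function names, and the dense capacity matrix are assumptions made for illustration. It runs Edmonds-Karp (BFS augmenting paths) and then decomposes the resulting net flow into explicit source-to-destination paths with a bandwidth each.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <limits>
#include <queue>
#include <vector>

// Hypothetical types for illustration only.
using Matrix = std::vector<std::vector<double>>;  // g[u][v] = capacity or flow in GB/s
struct FlowPath {
    std::vector<int> vertices;  // src ... dst
    double bw_gbps;             // bandwidth carried along this path
};

constexpr double kEps = 1e-9;

// BFS over edges with positive weight; returns the fewest-hop src->dst path,
// or an empty vector when dst is unreachable.
static std::vector<int> bfs_path(const Matrix& g, int src, int dst) {
    const int n = static_cast<int>(g.size());
    std::vector<int> parent(n, -1);
    parent[src] = src;
    std::queue<int> q;
    q.push(src);
    while (!q.empty() && parent[dst] == -1) {
        const int u = q.front();
        q.pop();
        for (int v = 0; v < n; ++v) {
            if (parent[v] == -1 && g[u][v] > kEps) { parent[v] = u; q.push(v); }
        }
    }
    if (parent[dst] == -1) return {};
    std::vector<int> path;
    for (int v = dst; v != src; v = parent[v]) path.push_back(v);
    path.push_back(src);
    std::reverse(path.begin(), path.end());
    return path;
}

// Smallest edge weight along a path.
static double bottleneck(const Matrix& g, const std::vector<int>& p) {
    double b = std::numeric_limits<double>::infinity();
    for (std::size_t i = 0; i + 1 < p.size(); ++i) b = std::min(b, g[p[i]][p[i + 1]]);
    return b;
}

std::vector<FlowPath> max_flow_paths(const Matrix& cap, int src, int dst) {
    const int n = static_cast<int>(cap.size());
    assert(src >= 0 && src < n && dst >= 0 && dst < n && src != dst);

    // Phase 1: Edmonds-Karp on the residual graph.
    Matrix residual = cap;
    for (std::vector<int> p = bfs_path(residual, src, dst); !p.empty();
         p = bfs_path(residual, src, dst)) {
        const double b = bottleneck(residual, p);
        for (std::size_t i = 0; i + 1 < p.size(); ++i) {
            residual[p[i]][p[i + 1]] -= b;  // consume forward capacity
            residual[p[i + 1]][p[i]] += b;  // allow later cancellation
        }
    }

    // Phase 2: net flow per edge, then peel off one path at a time.
    Matrix flow(n, std::vector<double>(n, 0.0));
    for (int u = 0; u < n; ++u)
        for (int v = 0; v < n; ++v)
            flow[u][v] = std::max(0.0, cap[u][v] - residual[u][v]);

    std::vector<FlowPath> out;
    for (std::vector<int> p = bfs_path(flow, src, dst); !p.empty();
         p = bfs_path(flow, src, dst)) {
        const double b = bottleneck(flow, p);
        for (std::size_t i = 0; i + 1 < p.size(); ++i) flow[p[i]][p[i + 1]] -= b;
        out.push_back(FlowPath{p, b});
    }
    return out;
}
```

On a 4-GPU full mesh with equal link capacities, calling `max_flow_paths(cap, 0, 3)` yields the direct path plus one relay path through each of the other two GPUs, which is the shape the multipath tests exercise.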
Wire protocol (include/control_plane/common/messages.h): three core opcodes drive everything. Each frame is a length prefix followed by a 1 B version, a 1 B opcode, and the payload.
| Direction | Opcode | Carries |
|---|---|---|
| Agent → Controller | `kOpReportSubgraph` | local GIG subgraph + GDR signals + push port |
| Agent → Controller | `kOpCreateChannel` | channel id + src GPU vertex ids + dst vertex ids |
| Controller → Agent | `kOpDeliverFwdTable` | channel id + per-owner-GPU FwdTables slice |
Plus kOpDeleteChannel, kOpHeartbeat, kOpAck, kOpError for lifecycle.
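The framing described above can be sketched as follows. The width and byte order of the length prefix are not stated here, so this sketch assumes a 4-byte big-endian length that counts the version byte, the opcode byte, and the payload. The `Frame` struct and both function names are hypothetical and are not the codec in include/control_plane/common/.

```cpp
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <vector>

// Hypothetical in-memory form of one control-plane message.
struct Frame {
    std::uint8_t version;
    std::uint8_t opcode;
    std::vector<std::uint8_t> payload;
};

// Serialize: [4 B big-endian length][1 B version][1 B opcode][payload].
// ASSUMPTION: the length covers version + opcode + payload.
std::vector<std::uint8_t> encode_frame(const Frame& f) {
    const std::uint32_t body_len = static_cast<std::uint32_t>(2 + f.payload.size());
    std::vector<std::uint8_t> out;
    out.reserve(4 + body_len);
    for (int shift = 24; shift >= 0; shift -= 8)
        out.push_back(static_cast<std::uint8_t>((body_len >> shift) & 0xFF));
    out.push_back(f.version);
    out.push_back(f.opcode);
    out.insert(out.end(), f.payload.begin(), f.payload.end());
    return out;
}

// Try to pop one complete frame from the front of a TCP receive buffer.
// Returns false (and leaves buf untouched) when more bytes are needed.
bool try_decode_frame(std::vector<std::uint8_t>& buf, Frame& out) {
    if (buf.size() < 4) return false;
    std::uint32_t body_len = 0;
    for (int i = 0; i < 4; ++i) body_len = (body_len << 8) | buf[i];
    if (body_len < 2) throw std::runtime_error("frame shorter than version + opcode");
    const std::size_t total = 4 + static_cast<std::size_t>(body_len);
    if (buf.size() < total) return false;
    out.version = buf[4];
    out.opcode = buf[5];
    out.payload.assign(buf.begin() + 6, buf.begin() + total);
    buf.erase(buf.begin(), buf.begin() + total);
    return true;
}
```

Because TCP delivers a byte stream rather than messages, the decoder is written to be called repeatedly as bytes arrive; it only consumes input once a whole frame is buffered.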
Forwarding table semantics (GPU-only, see gig/forwarding.h): each owner GPU's table maps dst_gpu → list of (next_gpu, [physical paths]) bins. The next_gpu is always another GPU; CPU/NIC/v_net hops live inside the subpath of each path so the data plane can replay the bounce (e.g. gpu0 → cpu3 → ibp3s0 → v_net → remote_nic → remote_cpu → remote_gpu for a non-GDR cross-server transfer).
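As a rough picture of that shape, the sketch below models one owner GPU's table with plain standard containers. The type names, the use of strings as vertex ids, and the helper functions are assumptions for illustration; the real definitions are in gig/forwarding.h.

```cpp
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-ins for the forwarding-table types.
struct PhysicalPath {
    std::vector<std::string> subpath;  // every hop, including CPU / NIC / v_net
};
struct Bin {
    std::string next_gpu;              // always a GPU vertex
    std::vector<PhysicalPath> paths;   // physical paths that reach next_gpu
};
using FwdTable = std::map<std::string, std::vector<Bin>>;  // dst_gpu -> bins

// Table owned by gpu0 with the non-GDR cross-server bounce from the text.
FwdTable example_table_for_gpu0() {
    PhysicalPath bounce;
    bounce.subpath = {"gpu0", "cpu3", "ibp3s0", "v_net",
                      "remote_nic", "remote_cpu", "remote_gpu"};
    Bin bin;
    bin.next_gpu = "remote_gpu";
    bin.paths.push_back(bounce);
    FwdTable table;
    table["remote_gpu"].push_back(bin);
    return table;
}

// Hops strictly between the first and last vertex of a path: the part the
// data plane has to replay that is invisible at the GPU-only level.
std::vector<std::string> intermediate_hops(const PhysicalPath& p) {
    if (p.subpath.size() <= 2) return {};
    return std::vector<std::string>(p.subpath.begin() + 1, p.subpath.end() - 1);
}

// GPU-level next hops toward dst; empty when dst is not in the table.
std::vector<std::string> next_gpus(const FwdTable& table, const std::string& dst) {
    std::vector<std::string> out;
    const auto it = table.find(dst);
    if (it == table.end()) return out;
    for (const Bin& b : it->second) out.push_back(b.next_gpu);
    return out;
}
```

In this model the GPU-level view (`next_gpus`) stays small, while each `PhysicalPath` keeps the full hop list needed to replay the bounce.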
Topology detection rules (Section 4.2, agent/local_discover.cu):
- NVSwitch / direct NVLink: distinguished by `nvmlDeviceGetNvLinkRemoteDeviceType` per link
- NIC ↔ GPU direct edge: emitted only when (a) the NIC is RDMA-capable (`/sys/class/infiniband/*` binds the same PCI BDF), (b) the NIC and GPU share a PCIe-switch ancestor in `/sys/bus/pci/devices/<bdf>/../`, AND (c) the admin opts into GDR via `MGI_GDR_FORCE=on`
- NUMA mesh: the SLIT distance matrix scales `xpi_base_bw_gbps` per pair (default 50 GB/s); no QPI clock heuristics
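The three-way condition for a NIC ↔ GPU direct edge is easy to misread as "any of", so here is a minimal sketch of it as a predicate. The struct and function names are hypothetical, the two probe results are assumed to have been gathered from sysfs already, and the exact string comparison for `MGI_GDR_FORCE` is an assumption about how the agent parses the variable.

```cpp
#include <cstdlib>
#include <string>

// Hypothetical summary of what discovery learned about one (NIC, GPU) pair.
struct NicGpuProbe {
    bool nic_is_rdma_capable;           // (a) an infiniband device binds the same PCI BDF
    bool shares_pcie_switch_ancestor;   // (b) common PCIe switch above both devices
};

// (c) admin opt-in. ASSUMPTION: only the exact value "on" enables it.
bool gdr_force_enabled() {
    const char* v = std::getenv("MGI_GDR_FORCE");
    return v != nullptr && std::string(v) == "on";
}

// The direct edge is emitted only when all three conditions hold.
bool should_emit_nic_gpu_edge(const NicGpuProbe& p, bool gdr_forced) {
    return p.nic_is_rdma_capable && p.shares_pcie_switch_ancestor && gdr_forced;
}
```

A caller would evaluate `should_emit_nic_gpu_edge(probe, gdr_force_enabled())` once per pair; when it returns false, traffic between that NIC and GPU is routed through the CPU bounce instead.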
The data plane is built on top of channels. A channel holds the K-buffering slots, path scheduler, and virtual-channel state for a pair of GPUs, and is reused across successive data transfers — once constructed, the same runtime carries multiple transfers (or multiple sessions with different tuple types) without tearing down and re-creating the underlying buffers, streams, or RDMA registrations.
┌────────────────────────────────────────────────────────────────┐
│ Application layer (scenarios/*.cuh, user kernels) │
│ device-side calls only: │
│ mgi::send_direct() write tuples → send buf │
│ mgi::recv_direct_drain() drain a recv buffer │
│ mgi::flush_direct_nb() flush partial send buffer │
│ mgi::eof_send_direct_nb() mark all send slots as EOF │
└──────────────────────────┬─────────────────────────────────────┘
│ shared-memory K-buffers (GDR)
┌──────────────────────────▼─────────────────────────────────────┐
│ Channel layer (channel/channel_runtime.cuh) │
│ ChannelRuntime<GRID_SIZE, TupleType> │
│ ├─ VirtualChannel[dst] N physical paths per dest │
│ │ IPathScheduler (WeightedRR / LeastLoad / custom) │
│ │ └─ IPhysicalChannel → ICopyOp backend │
│ │ NVLink (P2PMemcpyOp) │
│ │ PCIe bounce (PcieBounceSyncOp) │
│ │ Multi-hop relay (P2PRelayOp) │
│ ├─ host worker thread (runtime/host_worker_runtime.cuh) │
│ │ polls send buffers → dispatches to VirtualChannel │
│ └─ deadlock monitor thread (channel/deadlock.cuh) │
│ watches WorkerProgress; aborts on stall │
└────────────────────────────────────────────────────────────────┘
Application kernels never touch channel lifecycle. ChannelRuntime owns everything from init() to finalize(); kernels only call the device-side primitives.
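To give a feel for what a weighted round-robin path scheduler does when a VirtualChannel has several physical paths, here is a small host-side sketch. It uses the "smooth" weighted round-robin variant and a hypothetical `WeightedRoundRobin` class; it does not implement the `IPathScheduler` interface, and the repository's WeightedRR policy may pick paths in a different order.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical scheduler: picks a path index in proportion to its weight.
class WeightedRoundRobin {
public:
    explicit WeightedRoundRobin(std::vector<int> weights)
        : weights_(std::move(weights)), current_(weights_.size(), 0) {
        assert(!weights_.empty());
        for (int w : weights_) {
            assert(w > 0);
            total_ += w;
        }
    }

    // Smooth WRR: raise every path's credit by its weight, pick the path with
    // the most credit, then charge the winner the sum of all weights.
    std::size_t next() {
        std::size_t best = 0;
        for (std::size_t i = 0; i < weights_.size(); ++i) {
            current_[i] += weights_[i];
            if (current_[i] > current_[best]) best = i;
        }
        current_[best] -= total_;
        return best;
    }

private:
    std::vector<int> weights_;
    std::vector<int> current_;
    int total_ = 0;
};
```

With weights `{2, 1, 1}` (say, one direct path and two relays), the scheduler hands out half of the buffers to path 0 and a quarter to each relay, interleaved rather than in bursts.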
| Call | Return | Purpose |
|---|---|---|
| `mgi::send_direct<T, INTRA, INTER>(ptr, n)` | `MGI_STATUS_SUCCESS` / `FAILED` | Scatter `n` tuples into per-partition send slots. `FAILED` = buffer full, retry. |
| `mgi::recv_direct_drain<T>(fn)` | `SUCCESS` / `FAILED` / `EOF` | Drain the next ready recv buffer; invokes `fn(tuple)` per tuple. |
| `mgi::recv_direct_self_drain<T>(fn)` | `SUCCESS` / `FAILED` | Drain the self-partition slot in the send buffer (fast path, no copy). |
| `mgi::send_direct_cell<T, INTRA, INTER>(ptr, n)` | `SUCCESS` / `FAILED` | Cell-based send for tuples ≥ 64B (splits into 64B cells). |
| `mgi::recv_direct_cell<T>(&tuple)` | `SUCCESS` / `FAILED` / `EOF` | Cell-based receive, reassembles tuple from 64B cells. |
| `mgi::flush_direct_nb()` | non-zero when done | Non-blocking: flush partial send buffers. |
| `mgi::eof_send_direct_nb()` | non-zero when done | Non-blocking: mark all send slots `MGI_EOF`. |
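The cell-based calls split a large tuple into 64B cells on send and reassemble it on receive. The host-side sketch below shows that idea only. It assumes cells carry raw tuple bytes with no header, arrive in order, and are zero-padded at the tail; `Cell`, `DemoRow256`, and both function names are hypothetical and say nothing about the actual device-side layout.

```cpp
#include <algorithm>
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <type_traits>
#include <vector>

constexpr std::size_t kCellBytes = 64;
using Cell = std::array<unsigned char, kCellBytes>;

// Hypothetical 256 B row standing in for a wide tuple type.
struct DemoRow256 {
    std::uint64_t key;
    unsigned char payload[248];
};
static_assert(sizeof(DemoRow256) == 256, "demo row is expected to span four cells");

// Split a trivially copyable tuple into ceil(sizeof(T) / 64) cells.
template <typename T>
std::vector<Cell> split_into_cells(const T& tuple) {
    static_assert(std::is_trivially_copyable<T>::value, "tuple must be memcpy-safe");
    constexpr std::size_t n = (sizeof(T) + kCellBytes - 1) / kCellBytes;
    std::vector<Cell> cells(n);  // value-initialized, so padding bytes are zero
    const unsigned char* src = reinterpret_cast<const unsigned char*>(&tuple);
    for (std::size_t i = 0; i < n; ++i) {
        const std::size_t off = i * kCellBytes;
        std::memcpy(cells[i].data(), src + off, std::min(kCellBytes, sizeof(T) - off));
    }
    return cells;
}

// Rebuild the tuple from its cells (assumed to be complete and in order).
template <typename T>
void reassemble_from_cells(const std::vector<Cell>& cells, T& out) {
    static_assert(std::is_trivially_copyable<T>::value, "tuple must be memcpy-safe");
    assert(cells.size() == (sizeof(T) + kCellBytes - 1) / kCellBytes);
    unsigned char* dst = reinterpret_cast<unsigned char*>(&out);
    for (std::size_t i = 0; i < cells.size(); ++i) {
        const std::size_t off = i * kCellBytes;
        std::memcpy(dst + off, cells[i].data(), std::min(kCellBytes, sizeof(T) - off));
    }
}
```

A 256 B row becomes four cells, which matches the 64 B then 256 B session pairing used by the multisession tests.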
Termination is EOF-driven: kernel flushes → signals EOF → host worker broadcasts MGI_EOF to recv side → receiver's recv_direct_drain returns EOF after K_BUFFERING_K markers → finalize() joins threads.
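The receive side of that sequence can be modelled on the host to make the status handling concrete. Everything in this sketch is a mock: `Status`, `RecvBuffer`, `MockReceiver`, and `drain_until_eof` are hypothetical names, and the real `recv_direct_drain` is a device-side call. The mock mirrors only the documented contract: a data buffer returns success, nothing ready returns failed, and EOF is reported once K markers have been seen.

```cpp
#include <cassert>
#include <deque>
#include <utility>
#include <vector>

enum class Status { Success, Failed, Eof };

// One incoming buffer: either a batch of tuples or an EOF marker.
struct RecvBuffer {
    bool is_eof_marker;
    std::vector<int> tuples;
};

// Host-side mock of a receiver that reports EOF after k markers.
class MockReceiver {
public:
    MockReceiver(int k_buffering_k, std::deque<RecvBuffer> incoming)
        : k_(k_buffering_k), incoming_(std::move(incoming)) {
        assert(k_ > 0);
    }

    template <typename Fn>
    Status drain(Fn&& fn) {
        if (eof_seen_ >= k_) return Status::Eof;
        if (incoming_.empty()) return Status::Failed;  // nothing ready yet
        RecvBuffer buf = std::move(incoming_.front());
        incoming_.pop_front();
        if (buf.is_eof_marker) {
            ++eof_seen_;
            return eof_seen_ >= k_ ? Status::Eof : Status::Failed;
        }
        for (int t : buf.tuples) fn(t);
        return Status::Success;
    }

private:
    int k_;
    std::deque<RecvBuffer> incoming_;
    int eof_seen_ = 0;
};

// Consumer loop: keep draining until EOF. The poll bound exists only so the
// mock cannot spin forever; a kernel would simply retry on Failed.
template <typename Fn>
bool drain_until_eof(MockReceiver& rx, Fn&& fn, int max_polls) {
    for (int i = 0; i < max_polls; ++i) {
        if (rx.drain(fn) == Status::Eof) return true;
    }
    return false;
}
```

The point of the model is that a single marker is not enough to stop: data buffers queued behind the first marker are still delivered before EOF is reported.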
module load cuda ucx-cuda # CUDA + UCX toolchain
export CUDA_MODULE_LOADING=EAGER
export MGI_NETWORK_IFACES=eno4,ibp67s0,ibp73s0,ibp3s0,ibp9s0
# actual NIC names from `ip -o link show`

A single-host run needs ≥ 2 GPUs (typically 4× H100 with NVLink). Multi-host runs need each host reachable from the controller's IP. CMake ≥ 3.31.
The top-level CMakeLists.txt composes both planes plus the integration tests. Each plane is also its own standalone CMake project, so you can build only one when you don't need the other.
# (a) Combined — recommended. Top-level build dir.
cmake -S . -B build
cmake --build build -j
# (b) Combined with a plane disabled (skip whole subprojects).
cmake -S . -B build -DMGI_BUILD_DATA_PLANE=OFF
cmake -S . -B build -DMGI_BUILD_CONTROL_PLANE=OFF
cmake -S . -B build -DMGI_BUILD_INTEGRATION_TEST=OFF
cmake --build build -j
# (c) Standalone control plane only. Build dir under src/control_plane/.
cmake -S src/control_plane -B src/control_plane/build
cmake --build src/control_plane/build -j
# (d) Standalone data plane only. Build dir under src/data_plane/.
cmake -S src/data_plane -B src/data_plane/build
cmake --build src/data_plane/build -j

The combined build (a) drops every binary into build/ under the matching subproject path:
build/
├── src/control_plane/
│ ├── mgi_controller ← controller process
│ ├── mgi_agent ← per-host agent
│ ├── test_cp_unit / test_cp_node
│ └── test_cp_integration / test_cp_integration_onetothree
├── src/data_plane/
│ ├── test_exchange_without_worker
│ ├── test_multisession_exchange
│ ├── test_one_to_three_multipath
│ └── test_tpch_query{3,5,10,12,14,19}
└── test_logs/ ← stdout/stderr per ctest run (see Tests)
To deploy MGI as a service for your own application, launch one Controller on a routable host and one Agent on each GPU host:
# Controller — central node, any host with a routable IP. Listens on 0.0.0.0:20000.
./build/src/control_plane/mgi_controller -a 0.0.0.0 -p 20000
# Agent — every GPU host. Connects to the controller, opens an inbound
# port for DeliverFwdTable pushes.
export MGI_NETWORK_IFACES=eno4,ibp67s0,ibp73s0,ibp3s0,ibp9s0
./build/src/control_plane/mgi_agent \
-c <controller_ip> \
--controller-port 20000 \
-p 20001 # this agent's inbound port (controller dials this)
# Optional: admin opt-in to NIC↔GPU direct edges (only when GDR is verified to work).
MGI_GDR_FORCE=on ./build/src/control_plane/mgi_agent ...

Application code links against `mgi_cp` and uses `agent::Agent` directly to call `create_channel` / `on_fwd_table` (see src/control_plane/tests/test_integration.cu for a worked example).
Each scenario is also a standalone executable (no controller required for self-contained tests):
./build/src/data_plane/test_exchange_without_worker # 64 B all-to-all
./build/src/data_plane/test_one_to_three_multipath # broadcast w/ relays
./build/src/data_plane/test_multisession_exchange # 2-session reuse
./build/src/data_plane/test_tpch_query5 # TPC-H Q5

These are the exact same binaries that `ctest -L data` and `ctest -L data_tpch` invoke. Output (Throughput, Path statistics, etc.) goes to stdout.
The runtime reads compile-time constants in include/data_plane/config.cuh:
- Multipath: `PCIE_MULTI_PATH_TEST` toggles 1-to-1 multipath
- GDR: `GDR_COPY` enables gdrcopy paths (default `0`; see `gdr_close_safe`)
- Launch geometry: `USER_KERNEL_GRID_SIZE`, `USER_KERNEL_BLOCK_SIZE`, `WORKER_BLOCK_SIZE`, `WORKER_GRID_SIZE`
- K-buffering: `K_BUFFERING_K`, `KBUFFERING_INTRA_PARTITION_SIZE`, `KBUFFERING_INTER_PARTITION_SIZE`
Keep USER_KERNEL_GRID_SIZE == WORKER_GRID_SIZE for exchange_without_worker.
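One way to keep that constraint from drifting is a compile-time check next to the test. The snippet below is a sketch only: the fallback values are placeholders, not the defaults in include/data_plane/config.cuh, and it assumes the two names are usable in a constant expression.

```cpp
// Placeholder values for a standalone build of this snippet; in the real
// tree these come from include/data_plane/config.cuh.
#ifndef USER_KERNEL_GRID_SIZE
#define USER_KERNEL_GRID_SIZE 64
#endif
#ifndef WORKER_GRID_SIZE
#define WORKER_GRID_SIZE 64
#endif

// Fails the build instead of misbehaving at run time when the sizes diverge.
static_assert(USER_KERNEL_GRID_SIZE == WORKER_GRID_SIZE,
              "exchange_without_worker expects USER_KERNEL_GRID_SIZE == WORKER_GRID_SIZE");
```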
The whole test suite is driven from one CMake build. Configure once at the top level (or per-plane), build, then ctest from the build dir. Tests are split into labels so you can run just the ones you need.
module load cuda ucx-cuda
cmake -S . -B build
cmake --build build -j
cd build
# everything that's runnable in this env:
ctest
# subsets:
ctest -L unit # control-plane algorithms (no GPU, no network)
ctest -L node # control-plane single-node e2e (1 GPU host)
ctest -L cluster # control-plane two-host e2e (needs MGI_TEST_HOSTS)
ctest -L data # data-plane scenarios (4 GPU NVLink)
ctest -L data_tpch # data-plane TPC-H queries (heavier)
ctest -L integration # combined: CP-derived FwdTable -> DP scenario
# one specific test:
ctest -R dp_test_one_to_three_multipath --output-on-failure
ctest -R cp_integration_onetothree --output-on-failure

`ctest --output-on-failure` shows the test's stdout when it fails; pass `-V` to always show it. The runner also tees every test's stdout/stderr into a per-test file under build/test_logs/<test_name>.log (overwritten on each ctest run), so you can grep/diff logs after the fact:
ls build/test_logs/
# cp_unit.log
# cp_node.log
# cp_integration.log
# dp_test_exchange_without_worker.log
# dp_test_multisession_exchange.log
# dp_test_tpch_query5.log
# ...
grep -H Throughput build/test_logs/dp_test_*.log

`CUDA_MODULE_LOADING=EAGER` is set automatically by every ctest entry. Set `MGI_NETWORK_IFACES=eno4,ibp67s0,...` in your shell so `local_discover` knows which NICs to probe.
| Test | Source | Coverage |
|---|---|---|
| `cp_unit` | src/control_plane/tests/test_unit.cpp | 9 cases on the gig algorithms (max-flow + path decomposition + concat + §4.4 forwarding-table merge), assemble_global v_net fanout, TransferOptimizer cache reuse, and codec round-trips. ~50 ms, no GPU, no network. |
| Test | Source | Coverage |
|---|---|---|
| `cp_node` | src/control_plane/tests/test_node.cpp | Spawns Controller + Agent in one process on TCP loopback, runs real discover_local() (NVML + hwloc + sysfs), drives an all-to-all create_channel for the host's GPUs, and waits for DeliverFwdTable push to fire the on-callback. ~3 s. |
| Test | Source | Coverage |
|---|---|---|
| `cp_cluster` | src/control_plane/tests/cluster_smoke.sh | Started on HOST_A: launches Controller + Agent_A locally with `--exchange-channel 1234 --exchange-delay-ms 1500`, ssh-launches Agent_B on HOST_B, polls the controller log for two `reported subgraph` lines and one `pushed DeliverFwdTable channel=1234` line. Skipped (exit 0) if `MGI_TEST_HOSTS=HOST_A,HOST_B` isn't set. ~17 s. |
All run on a 4-GPU NVLink full mesh; each tests a different communication pattern.
| Test | Source | Coverage |
|---|---|---|
| `dp_test_exchange_without_worker` | src/data_plane/tests/test_exchange_without_worker.cu | 64 B all-to-all exchange (no host worker thread). The simplest sanity check that ChannelRuntime + send_direct + recv_direct round-trip on every (src, dst) pair. ~22 s. |
| `dp_test_multisession_exchange` | src/data_plane/tests/test_multisession_exchange.cu | Channel reuse across two sessions: 64 B (LineitemRow) then 256 B (SupplierRow256B, cell path). Same Channel/Endpoint/host-worker carries both. ~5 min, prints aggregate Throughput per session (~1800 GB/s on H100×4). |
| `dp_test_multisession_join` | src/data_plane/tests/test_multisession_join.cu | Channel reuse across two sessions: 64 B build (LineitemRow) then 16 B probe (JoinOrderTuple) with bitmap hit. Same shape as multisession_exchange but the second session is a hash join probe. |
| `dp_test_p2p_chain_direct` | src/data_plane/tests/test_p2p_chain_direct.cu | Single-direction GPU→GPU chain transfer over NVLink. Stress-tests the direct path without aggregation. ~27 s. |
| `dp_test_one_to_three` | src/data_plane/tests/test_one_to_three.cu | One sender broadcasts to three receivers, single direct NVLink path per (sender, dst). Baseline for the multipath variant. |
| `dp_test_one_to_three_multipath` | src/data_plane/tests/test_one_to_three_multipath.cu | Same one-to-three pattern but with relay paths: one direct NVLink + N-2 relays through every intermediate GPU per receiver, scheduled by the WeightedRR path scheduler. Demonstrates P2PRelayOp. ~16 s, ~353 GB/s sender throughput. |
| `dp_test_group_by_direct` | src/data_plane/tests/test_group_by_direct.cu | All-to-all exchange + per-block reduction (a one-stage GroupBy). Validates the recv_direct_drain callback path. |
| `dp_test_p2p_aggregation_direct` | src/data_plane/tests/test_p2p_aggregation_direct.cu | Many-to-one aggregation over a single direct NVLink per source. |
| `dp_test_p2p_aggregation_multipath` | src/data_plane/tests/test_p2p_aggregation_multipath.cu | Many-to-one aggregation, multipath variant: each source uses direct + relays. |
Each query generates synthetic TPC-H tables in-memory (no external dataset needed) and runs a multi-GPU plan over MGI's exchange / join / group-by primitives.
| Test | Source | Query |
|---|---|---|
| `dp_test_tpch_query3` | src/data_plane/tests/test_tpch_query3.cu | Q3: shipping priority (lineitem ⨝ orders ⨝ customer with date filter, group by orderkey + sum revenue). ~50 s. |
| `dp_test_tpch_query5` | src/data_plane/tests/test_tpch_query5.cu | Q5: revenue by nation (lineitem ⨝ orders ⨝ customer ⨝ supplier ⨝ nation ⨝ region). ~50 s. |
| `dp_test_tpch_query10` | src/data_plane/tests/test_tpch_query10.cu | Q10: returned items report (lineitem ⨝ orders ⨝ customer ⨝ nation, group by customer + nation). |
| `dp_test_tpch_query12` | src/data_plane/tests/test_tpch_query12.cu | Q12: shipping mode (lineitem ⨝ orders, group by shipmode with priority case). |
| `dp_test_tpch_query14` | src/data_plane/tests/test_tpch_query14.cu | Q14: promotion effect (lineitem with date filter + part group). |
| `dp_test_tpch_query19` | src/data_plane/tests/test_tpch_query19.cu | Q19: discounted revenue (lineitem ⨝ part with multiple disjunctive predicates). |
| Test | Source | Coverage |
|---|---|---|
| `cp_integration` | src/control_plane/tests/test_integration.cu | Full handoff: in-process Controller + Agent → real CreateChannel → on_fwd_table callback → translate FwdTable into multisession_exchange's fwd_tables_override → run two full sessions of the data-plane scenario. Verifies the FwdTable shape is consumable by data plane and routing decisions are correct (final aggregate sums match expected). ~5 min. |
| `cp_integration_onetothree` | src/control_plane/tests/test_integration_onetothree.cu | Same shape, but the controller picks the relay-mid set per receiver (FwdTable bins where next != dst) and hands it to onetothree_exchange as relay_mids_per_dst. On full mesh this matches the standalone hardcode; on non-mesh topologies the controller picks a strict subset. ~50 s. |
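The "bins where next != dst" rule used by cp_integration_onetothree can be written down in a few lines. This sketch uses simplified, hypothetical types (integer GPU ids, a `Bin` holding only its next hop) rather than the real FwdTable, and the function name merely echoes the `relay_mids_per_dst` parameter mentioned above.

```cpp
#include <map>
#include <vector>

// Simplified, hypothetical view of the sender's forwarding table.
struct Bin {
    int next_gpu;  // GPU the sender forwards to for this bin
};
using FwdTable = std::map<int, std::vector<Bin>>;  // dst_gpu -> bins

// For every destination, collect the next hops that are not the destination
// itself: those are the intermediate GPUs that relay traffic toward it.
std::map<int, std::vector<int>> relay_mids_per_dst(const FwdTable& sender_table) {
    std::map<int, std::vector<int>> mids;
    for (const auto& entry : sender_table) {
        const int dst = entry.first;
        std::vector<int>& out = mids[dst];  // present even when no relay exists
        for (const Bin& bin : entry.second) {
            if (bin.next_gpu != dst) out.push_back(bin.next_gpu);
        }
    }
    return mids;
}
```

On a 4-GPU full mesh with GPU 0 sending, each destination has one direct bin and two relay bins, so every receiver ends up with two relay mids, consistent with the "N-2 relays" description of the multipath test.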
ctest -LE cluster on a single 4×H100 host (cluster requires MGI_TEST_HOSTS=hostA,hostB and is exercised separately):
100% tests passed, 0 tests failed out of 19
Total Test time (real) = 916.17 sec (~15 min)
Label Time Summary:
unit = 0.01 s (1 test )
node = 3.30 s (1 test )
integration = 336.30 s (2 tests)
data = 298.83 s (9 tests)
data_tpch = 277.67 s (6 tests)
Per-test (selected highlights):
cp_unit .................................... 0.01 s
cp_node .................................... 3.30 s discovery (4 GPUs / 13 verts / 42 edges) + in-process e2e
cp_integration ............................. 285.56 s CP-derived FwdTable -> multisession_exchange (2 sessions)
cp_integration_onetothree .................. 50.74 s controller-picked relay set -> onetothree multipath
dp_test_multisession_exchange .............. 123.90 s 2 back-to-back exchange sessions (heaviest scenario)
dp_test_tpch_query{3,5,10,12,14,19} ........ 41-51 s one entry per query
(other dp_* scenarios) ..................... 17-27 s
Cluster reference (2 hosts via MGI_TEST_HOSTS=A,B ctest -L cluster): cp_cluster Passed in 17.10 s — 2 hosts: register × 2 + CreateChannel(1234) + push (4 tables / 2312 B).
A note on gdr_close: the data-plane gdrcopy integration has a known segfault on gdr_close() at process exit. All tests call gdr_close_safe(g) instead, which is a no-op when GDR_COPY=0 (the default in include/data_plane/config.cuh). When you flip GDR_COPY to 1 to actually use gdrcopy, you'll need to debug the segfault separately.
Please follow the README.md in each evaluation subdirectory.