fardatalab/MGI

MGI

A general communication framework for Massive GPU Infrastructure.

MGI encompasses a set of efficient communication optimizations for performing data processing tasks, e.g., Exchange, GroupBy, and Join, across many GPUs. These optimizations include:

  • Pipelining
  • Batching
  • Multi-path
  • NIC-direct

All these techniques are automatically applied behind an intuitive, on-device API.

Project Layout

include/control_plane/
├── gig/                                         # GIG: Vertex / Edge / Gig,
│                                                #   Path / PathSet / concat,
│                                                #   Edmonds-Karp + decomposition,
│                                                #   FwdTable + §4.4 merge
├── agent/                                       # local_discover (NVML/hwloc/sysfs)
│                                                #   + Agent runtime
├── controller/                                  # Controller process + 3 modules
│   ├── td/                                      #   Topology Discovery (assemble v_net)
│   ├── to/                                      #   Transfer Optimizer (Fig 6 EK + cache)
│   └── cc/                                      #   Channel Creation (build FwdTables)
└── common/                                      # codec (length-prefix framing),
                                                 #   messages (8 opcodes),
                                                 #   tcp (server + client base)

src/control_plane/
├── CMakeLists.txt                               # standalone project; ctest labels
├── gig/, agent/, controller/, common/           # impls of the headers above
└── tests/
    ├── test_unit.cpp                            # CPU-only algorithms + codec
    ├── test_node.cpp                            # 1-node loopback e2e
    ├── test_integration.cu                      # CP → multisession_exchange
    ├── test_integration_onetothree.cu           # CP → onetothree_exchange (relays)
    └── cluster_smoke.sh                         # 2-node ssh-orchestrated e2e

include/data_plane/                              # library only (no test fixtures)
├── config.cuh, common.cuh, on_path_helper.cuh   # foundations & shared helpers
├── api/                                         # device-side API (core.cuh)
├── infra/                                       # endpoint, kbuffering, semaphore
├── channel/                                     # ChannelRuntime, VirtualChannel,
│                                                #   PhysicalChannel, PathScheduler,
│                                                #   deadlock monitor
├── runtime/                                     # host_worker_runtime, flow scheduler
├── transport/                                   # ICopyOp backends: NVLink direct,
│                                                #   NVLink multipath, PCIe, datapath
├── network/                                     # inter-node RDMA (UCX, multinode)
├── legacy/                                      # pre-channel-refactor code (kept
│                                                #   for reference / comparison)
└── backup/                                      # archived host_worker variant

src/data_plane/
├── CMakeLists.txt
└── tests/                                       # all test fixtures + drivers
    ├── scenarios/                               # exchange/groupby/multisession
    │                                            #   orchestration (was in include/)
    ├── patterns/                                # p2p direct/multipath, 1-to-M,
    │                                            #   M-to-1 patterns (was in include/)
    ├── schema/                                  # TPC-H row types (was in include/)
    ├── test_exchange_without_worker.cu
    ├── test_multisession_exchange.cu
    ├── test_one_to_three_multipath.cu
    ├── test_tpch_query{3,5,10,12,14,19}.cu
    └── tpch_queries/                            # per-query kernel implementations

Control Plane

The control plane implements Section 4 of the MGI paper: a central Controller + a per-host Agent that together discover the cluster topology (the GIG — GPU Infrastructure Graph), plan max-flow paths between GPU endpoints, and ship per-GPU forwarding tables to the data plane.

┌─────────────────────────────────────────────────────────────────┐
│  Controller process (one per cluster, port 20000)               │
│    Topology Discovery (TD)   merges agent subgraphs + adds v_net│
│    Transfer Optimizer  (TO)  hierarchical Edmonds-Karp + cache  │
│    Channel Creation    (CC)  build per-GPU forwarding tables    │
└──────────────┬──────────────────────────────┬──────────────────┘
               │ TCP frames                   │ TCP frames
               ▼                              ▼
┌──────────────────────────┐    ┌──────────────────────────┐
│  Agent process on host A │    │  Agent process on host B │
│   local_discover ─ §4.2  │    │   local_discover ─ §4.2  │
│     NVML / hwloc / sysfs │    │     NVML / hwloc / sysfs │
│   inbound listener       │    │   inbound listener       │
│     receives FwdTables   │    │     receives FwdTables   │
│     fires on_fwd_table   │    │     fires on_fwd_table   │
└──────────────────────────┘    └──────────────────────────┘
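The Transfer Optimizer is described as a hierarchical Edmonds-Karp with a cache. For reference only, here is a minimal sketch of plain, single-level Edmonds-Karp on a dense capacity matrix. It is not MGI's hierarchical variant, and it does not include MGI's path decomposition. Treating capacities as integer link bandwidths in GB/s is an illustrative assumption.

```cpp
#include <algorithm>
#include <cassert>
#include <climits>
#include <queue>
#include <vector>

// Textbook Edmonds-Karp: repeatedly augment along the shortest (BFS) path in
// the residual graph. cap[u][v] is the capacity from vertex u to vertex v
// (assumed here to be link bandwidth in GB/s). On return, flow[u][v] holds the
// antisymmetric per-edge flow, which a caller could decompose into paths.
long long edmonds_karp(const std::vector<std::vector<long long>>& cap, int s, int t,
                       std::vector<std::vector<long long>>& flow) {
    const int n = static_cast<int>(cap.size());
    flow.assign(n, std::vector<long long>(n, 0));
    if (s == t) return 0;
    long long total = 0;
    while (true) {
        // BFS over edges with positive residual capacity cap - flow.
        std::vector<int> parent(n, -1);
        parent[s] = s;
        std::queue<int> q;
        q.push(s);
        while (!q.empty() && parent[t] == -1) {
            const int u = q.front();
            q.pop();
            for (int v = 0; v < n; ++v) {
                if (parent[v] == -1 && cap[u][v] - flow[u][v] > 0) {
                    parent[v] = u;
                    q.push(v);
                }
            }
        }
        if (parent[t] == -1) break;  // no augmenting path left: flow is maximal
        // Bottleneck residual capacity along the path found.
        long long bottleneck = LLONG_MAX;
        for (int v = t; v != s; v = parent[v])
            bottleneck = std::min(bottleneck, cap[parent[v]][v] - flow[parent[v]][v]);
        // Push the bottleneck forward and record the reverse residual.
        for (int v = t; v != s; v = parent[v]) {
            flow[parent[v]][v] += bottleneck;
            flow[v][parent[v]] -= bottleneck;
        }
        total += bottleneck;
    }
    return total;
}
```

Consider a direct 50 GB/s link from GPU 0 to GPU 3. Add a relay through GPU 1 with 50 GB/s per hop, and a relay through GPU 2 whose second hop is only 20 GB/s. The function returns 120, and the flow matrix splits that total across the three paths.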

Wire protocol (include/control_plane/common/messages.h): three core opcodes drive everything. Each frame is length-prefixed and carries a 1 B version, a 1 B opcode, and the payload.

| Direction | Opcode | Carries |
|---|---|---|
| Agent → Controller | kOpReportSubgraph | local GIG subgraph + GDR signals + push port |
| Agent → Controller | kOpCreateChannel | channel id + src GPU vertex ids + dst vertex ids |
| Controller → Agent | kOpDeliverFwdTable | channel id + per-owner-GPU FwdTables slice |

The remaining opcodes, kOpDeleteChannel, kOpHeartbeat, kOpAck, and kOpError, handle lifecycle.
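The frame layout can be sketched as an encoder and an incremental decoder. Two details are assumptions for illustration: the 4-byte little-endian length prefix (counting the version, opcode, and payload bytes) and the numeric opcode values. The authoritative layout is in include/control_plane/common/messages.h and the codec under common/.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <stdexcept>
#include <vector>

// Hypothetical opcode values; the real ones are defined in messages.h.
enum class Op : std::uint8_t {
    kOpReportSubgraph = 1,
    kOpCreateChannel = 2,
    kOpDeliverFwdTable = 3,
};

struct Frame {
    std::uint8_t version;
    std::uint8_t opcode;
    std::vector<std::uint8_t> payload;
};

// Assumed layout: [u32 LE length][u8 version][u8 opcode][payload...],
// where length = 2 + payload size (everything after the prefix).
std::vector<std::uint8_t> encode(const Frame& f) {
    const std::uint32_t len = static_cast<std::uint32_t>(2 + f.payload.size());
    std::vector<std::uint8_t> out;
    out.reserve(4 + len);
    for (int i = 0; i < 4; ++i) out.push_back(static_cast<std::uint8_t>(len >> (8 * i)));
    out.push_back(f.version);
    out.push_back(f.opcode);
    out.insert(out.end(), f.payload.begin(), f.payload.end());
    return out;
}

// Decodes one frame from the front of `buf` (bytes received so far from the
// TCP stream). Returns std::nullopt until a complete frame is buffered; on
// success, erases the consumed bytes from `buf`.
std::optional<Frame> try_decode(std::vector<std::uint8_t>& buf) {
    if (buf.size() < 4) return std::nullopt;
    std::uint32_t len = 0;
    for (int i = 0; i < 4; ++i) len |= static_cast<std::uint32_t>(buf[i]) << (8 * i);
    if (len < 2) throw std::runtime_error("malformed frame: length < 2");
    if (buf.size() < 4 + static_cast<std::size_t>(len)) return std::nullopt;
    Frame f{buf[4], buf[5],
            std::vector<std::uint8_t>(buf.begin() + 6, buf.begin() + 4 + len)};
    buf.erase(buf.begin(), buf.begin() + 4 + len);
    return f;
}
```

TCP delivers a byte stream, so a receiver appends incoming bytes to a buffer and calls try_decode until it returns std::nullopt. A partially received frame stays in the buffer until the rest of its bytes arrive.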

Forwarding table semantics (GPU-only, see gig/forwarding.h): each owner GPU's table maps dst_gpu → list of (next_gpu, [physical paths]) bins. The next_gpu is always another GPU; CPU/NIC/v_net hops live inside the subpath of each path so the data plane can replay the bounce (e.g. gpu0 → cpu3 → ibp3s0 → v_net → remote_nic → remote_cpu → remote_gpu for a non-GDR cross-server transfer).
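Below is a minimal sketch of the table shape just described. GpuId, PhysicalPath, Bin, FwdTable, and next_hops are hypothetical stand-ins, not the real definitions in gig/forwarding.h. The sketch assumes each bin's paths run from the owner GPU to that bin's next_gpu, with any CPU, NIC, or v_net hops recorded inside the path.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical shapes for illustration only.
using GpuId = int;
// Full hop list of one physical path, so a bounce such as
// {"gpu0", "cpu3", "ibp3s0", "v_net", ...} can be replayed hop by hop.
using PhysicalPath = std::vector<std::string>;

struct Bin {
    GpuId next_gpu;                   // always a GPU, never a CPU/NIC/v_net vertex
    std::vector<PhysicalPath> paths;  // physical paths from the owner toward next_gpu
};

// The table owned by one GPU: dst_gpu -> list of bins.
using FwdTable = std::map<GpuId, std::vector<Bin>>;

// Next-hop GPUs the owner may use toward dst (empty if dst is not in the table).
std::vector<GpuId> next_hops(const FwdTable& table, GpuId dst) {
    std::vector<GpuId> hops;
    const auto it = table.find(dst);
    if (it == table.end()) return hops;
    for (const Bin& b : it->second) hops.push_back(b.next_gpu);
    return hops;
}
```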

Topology detection rules (Section 4.2, agent/local_discover.cu):

  • NVSwitch / direct NVLink: distinguished per link by nvmlDeviceGetNvLinkRemoteDeviceType
  • NIC ↔ GPU direct edge: emitted only when (a) the NIC is RDMA-capable (an entry under /sys/class/infiniband/* is bound to the NIC's PCI BDF), (b) the NIC and GPU share a PCIe-switch ancestor in /sys/bus/pci/devices/<bdf>/../, AND (c) the admin opts into GDR via MGI_GDR_FORCE=on
  • NUMA mesh: the SLIT distance matrix scales xpi_base_bw_gbps (default 50 GB/s) for each NUMA pair; no QPI clock heuristics are used
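The NIC ↔ GPU rule can be sketched as a pure predicate. This is not MGI's local_discover code, and three parts of it are assumptions:

- The caller has already resolved /sys/bus/pci/devices/<bdf> with realpath() into a /sys/devices/pciDDDD:BB/... path.
- "Shares a PCIe-switch ancestor" is approximated as "shares the host bridge and at least one PCI bridge below it". This is a heuristic.
- share_pcie_switch, gdr_forced_by_env, and emit_nic_gpu_edge are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdlib>
#include <sstream>
#include <string>
#include <vector>

// Splits "/sys/devices/pci0000:00/0000:00:01.0/..." into non-empty components.
static std::vector<std::string> split_path(const std::string& path) {
    std::vector<std::string> parts;
    std::stringstream ss(path);
    std::string item;
    while (std::getline(ss, item, '/'))
        if (!item.empty()) parts.push_back(item);
    return parts;
}

// (b) Heuristic: both resolved device paths share the host bridge ("pci...")
// plus at least one bridge below it, and neither device is an ancestor of the
// other. The shared bridge is treated as the PCIe switch.
bool share_pcie_switch(const std::string& nic_path, const std::string& gpu_path) {
    const auto a = split_path(nic_path);
    const auto b = split_path(gpu_path);
    std::size_t common = 0;
    while (common < a.size() && common < b.size() && a[common] == b[common]) ++common;
    std::size_t host = 0;
    while (host < a.size() && a[host].rfind("pci", 0) != 0) ++host;
    if (host == a.size()) return false;  // not a /sys/devices/pci... path
    return common >= host + 2 && common < a.size() && common < b.size();
}

// (c) Admin opt-in read from the environment.
bool gdr_forced_by_env() {
    const char* v = std::getenv("MGI_GDR_FORCE");
    return v != nullptr && std::string(v) == "on";
}

// All three conditions must hold. nic_is_rdma is condition (a): some
// /sys/class/infiniband/* entry is bound to the NIC's BDF.
bool emit_nic_gpu_edge(bool nic_is_rdma, const std::string& nic_path,
                       const std::string& gpu_path, bool gdr_forced) {
    return nic_is_rdma && share_pcie_switch(nic_path, gpu_path) && gdr_forced;
}
```

Passing the opt-in as a parameter, with gdr_forced_by_env() supplying it in production, keeps the topology check testable without touching the environment.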

Data Plane

The data plane is built on top of channels. A channel holds the K-buffering slots, path scheduler, and virtual-channel state for a pair of GPUs, and is reused across successive data transfers — once constructed, the same runtime carries multiple transfers (or multiple sessions with different tuple types) without tearing down and re-creating the underlying buffers, streams, or RDMA registrations.

Channel Architecture

┌────────────────────────────────────────────────────────────────┐
│  Application layer  (scenarios/*.cuh, user kernels)            │
│    device-side calls only:                                     │
│      mgi::send_direct()            write tuples → send buf    │
│      mgi::recv_direct_drain()      drain a recv buffer        │
│      mgi::flush_direct_nb()        flush partial send buffer  │
│      mgi::eof_send_direct_nb()     mark all send slots as EOF │
└──────────────────────────┬─────────────────────────────────────┘
                           │  shared-memory K-buffers (GDR)
┌──────────────────────────▼─────────────────────────────────────┐
│  Channel layer  (channel/channel_runtime.cuh)                  │
│    ChannelRuntime<GRID_SIZE, TupleType>                        │
│      ├─ VirtualChannel[dst]   N physical paths per dest        │
│      │    IPathScheduler (WeightedRR / LeastLoad / custom)     │
│      │      └─ IPhysicalChannel → ICopyOp backend              │
│      │           NVLink (P2PMemcpyOp)                          │
│      │           PCIe bounce (PcieBounceSyncOp)                │
│      │           Multi-hop relay (P2PRelayOp)                  │
│      ├─ host worker thread  (runtime/host_worker_runtime.cuh)  │
│      │    polls send buffers → dispatches to VirtualChannel    │
│      └─ deadlock monitor thread  (channel/deadlock.cuh)        │
│           watches WorkerProgress; aborts on stall              │
└────────────────────────────────────────────────────────────────┘

Application kernels never touch channel lifecycle. ChannelRuntime owns everything from init() to finalize(); kernels only call the device-side primitives.
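A WeightedRR scheduler could spread chunks across a VirtualChannel's physical paths using smooth weighted round-robin, the algorithm nginx uses for weighted upstreams. The sketch below is that generic algorithm, not MGI's IPathScheduler. The SmoothWeightedRR class and the use of path bandwidths as weights are assumptions.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Smooth weighted round-robin: each pick, every path's running score grows by
// its weight; the highest score wins and is then reduced by the total weight.
// Over sum(weights) picks, path i is chosen exactly weights[i] times, and the
// choices are interleaved rather than issued in bursts.
class SmoothWeightedRR {
public:
    explicit SmoothWeightedRR(std::vector<int> weights)
        : weights_(std::move(weights)), current_(weights_.size(), 0) {
        for (int w : weights_) total_ += w;
    }

    // Index of the physical path that should carry the next chunk.
    std::size_t pick() {
        std::size_t best = 0;
        for (std::size_t i = 0; i < weights_.size(); ++i) {
            current_[i] += weights_[i];
            if (current_[i] > current_[best]) best = i;
        }
        current_[best] -= total_;
        return best;
    }

private:
    std::vector<int> weights_;
    std::vector<int> current_;
    int total_ = 0;
};
```

Suppose path 0 is a direct NVLink path with twice the assumed bandwidth of two relay paths, giving weights {2, 1, 1}. Four consecutive picks then return paths 0, 1, 2, 0. The heavier path gets half the chunks, and its turns are spread out rather than back to back.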

Device-side API

| Call | Returns | Purpose |
|---|---|---|
| mgi::send_direct<T, INTRA, INTER>(ptr, n) | MGI_STATUS_SUCCESS / FAILED | Scatter n tuples into per-partition send slots. FAILED = buffer full, retry. |
| mgi::recv_direct_drain<T>(fn) | SUCCESS / FAILED / EOF | Drain the next ready recv buffer; invokes fn(tuple) per tuple. |
| mgi::recv_direct_self_drain<T>(fn) | SUCCESS / FAILED | Drain the self-partition slot in the send buffer (fast path, no copy). |
| mgi::send_direct_cell<T, INTRA, INTER>(ptr, n) | SUCCESS / FAILED | Cell-based send for tuples ≥ 64B (splits into 64B cells). |
| mgi::recv_direct_cell<T>(&tuple) | SUCCESS / FAILED / EOF | Cell-based receive; reassembles the tuple from 64B cells. |
| mgi::flush_direct_nb() | non-zero when done | Non-blocking: flush partial send buffers. |
| mgi::eof_send_direct_nb() | non-zero when done | Non-blocking: mark all send slots MGI_EOF. |
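The retry contract of send_direct (FAILED means the buffer is full, so try again) can be illustrated with a single-threaded, host-side toy. ToySlot, ToyStatus, and send_all are hypothetical. The toy assumes each call is all-or-nothing and drains the slot inline on failure. In MGI, the kernel instead retries while the host worker drains concurrently.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <vector>

enum class ToyStatus { Success, Failed };

struct ToySlot {
    std::size_t capacity;
    std::vector<int> tuples;

    // All-or-nothing: accepts the n tuples only if they all fit.
    ToyStatus send(const int* ptr, std::size_t n) {
        if (tuples.size() + n > capacity) return ToyStatus::Failed;
        tuples.insert(tuples.end(), ptr, ptr + n);
        return ToyStatus::Success;
    }

    // Stand-in for the host worker shipping the slot's contents downstream.
    void drain(std::vector<int>& out) {
        out.insert(out.end(), tuples.begin(), tuples.end());
        tuples.clear();
    }
};

// Sends `data` in batches, retrying each batch until the slot accepts it.
// Returns how many FAILED attempts were observed.
std::size_t send_all(ToySlot& slot, const std::vector<int>& data, std::size_t batch,
                     std::vector<int>& delivered) {
    if (batch == 0 || batch > slot.capacity)
        throw std::invalid_argument("batch must be in [1, capacity]");
    std::size_t retries = 0;
    for (std::size_t off = 0; off < data.size(); off += batch) {
        const std::size_t n = std::min(batch, data.size() - off);
        while (slot.send(data.data() + off, n) == ToyStatus::Failed) {
            ++retries;
            slot.drain(delivered);  // MGI's worker drains concurrently; the toy drains inline
        }
    }
    slot.drain(delivered);  // ship the last, partially filled slot (similar in spirit to a flush)
    return retries;
}
```

With a 4-tuple slot and batches of 3, sending 10 tuples hits FAILED twice. All 10 tuples still arrive in order.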

Termination is EOF-driven: the kernel flushes its partial send buffers → signals EOF → the host worker broadcasts MGI_EOF to the recv side → the receiver's recv_direct_drain returns EOF once it has seen K_BUFFERING_K markers → finalize() joins the threads.
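The EOF rule can be modeled the same way. ToyReceiver, ToyBuffer, and RecvStatus are hypothetical names. In this toy, each of the sender's K slots eventually delivers one EOF marker, and the receiver reports EOF only when the K-th marker arrives. Earlier markers, and any data that arrives between them, are consumed normally.

```cpp
#include <cassert>
#include <deque>
#include <utility>
#include <vector>

enum class RecvStatus { Success, Failed, Eof };

struct ToyBuffer {
    bool eof;                 // true if this buffer is an EOF marker
    std::vector<int> tuples;  // payload when eof == false
};

class ToyReceiver {
public:
    explicit ToyReceiver(int k) : k_(k) {}

    // Analogue of recv_direct_drain: consumes the next ready buffer, if any,
    // and calls fn(tuple) for each tuple it carries.
    template <typename Fn>
    RecvStatus drain(std::deque<ToyBuffer>& ready, Fn&& fn) {
        if (eof_seen_ == k_) return RecvStatus::Eof;   // already finished
        if (ready.empty()) return RecvStatus::Failed;  // nothing ready; poll again
        ToyBuffer buf = std::move(ready.front());
        ready.pop_front();
        if (buf.eof) {
            ++eof_seen_;
            return eof_seen_ == k_ ? RecvStatus::Eof : RecvStatus::Success;
        }
        for (int t : buf.tuples) fn(t);
        return RecvStatus::Success;
    }

private:
    int k_;
    int eof_seen_ = 0;
};
```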

Experiments

Prerequisites

module load cuda ucx-cuda                 # CUDA + UCX toolchain
export CUDA_MODULE_LOADING=EAGER
export MGI_NETWORK_IFACES=eno4,ibp67s0,ibp73s0,ibp3s0,ibp9s0
                                          # actual NIC names from `ip -o link show`

A single-host run needs ≥ 2 GPUs (typically 4× H100 with NVLink). For multi-host runs, every host must be reachable from the controller's IP. CMake ≥ 3.31 is required.

Build

The top-level CMakeLists.txt composes both planes plus the integration tests. Each plane is also its own standalone CMake project, so you can build only one when you don't need the other.

# (a) Combined — recommended. Top-level build dir.
cmake -S . -B build
cmake --build build -j

# (b) Combined with a plane or the integration tests disabled (skips whole
#     subprojects). These are alternatives: pass only the flag(s) you need.
cmake -S . -B build -DMGI_BUILD_DATA_PLANE=OFF
cmake -S . -B build -DMGI_BUILD_CONTROL_PLANE=OFF
cmake -S . -B build -DMGI_BUILD_INTEGRATION_TEST=OFF
cmake --build build -j

# (c) Standalone control plane only. Build dir under src/control_plane/.
cmake -S src/control_plane -B src/control_plane/build
cmake --build src/control_plane/build -j

# (d) Standalone data plane only. Build dir under src/data_plane/.
cmake -S src/data_plane -B src/data_plane/build
cmake --build src/data_plane/build -j

The combined build (a) drops every binary into build/ under the matching subproject path:

build/
├── src/control_plane/
│   ├── mgi_controller            ← controller process
│   ├── mgi_agent                 ← per-host agent
│   ├── test_cp_unit / test_cp_node
│   └── test_cp_integration / test_cp_integration_onetothree
├── src/data_plane/
│   ├── test_exchange_without_worker
│   ├── test_multisession_exchange
│   ├── test_one_to_three_multipath
│   └── test_tpch_query{3,5,10,12,14,19}
└── test_logs/                     ← stdout/stderr per ctest run (see Tests)

Running the MGI runtime (controller + agents)

To deploy MGI as a service for your own application, launch one Controller on a routable host and one Agent on each GPU host:

# Controller — central node, any host with a routable IP. Listens on 0.0.0.0:20000.
./build/src/control_plane/mgi_controller -a 0.0.0.0 -p 20000

# Agent — every GPU host. Connects to the controller, opens an inbound
# port for DeliverFwdTable pushes.
export MGI_NETWORK_IFACES=eno4,ibp67s0,ibp73s0,ibp3s0,ibp9s0
./build/src/control_plane/mgi_agent \
    -c <controller_ip>              \
    --controller-port 20000         \
    -p 20001                        # this agent's inbound port (controller dials this)

# Optional: admin opt-in to NIC↔GPU direct edges (only when GDR is verified to work).
MGI_GDR_FORCE=on ./build/src/control_plane/mgi_agent ...

Application code links against mgi_cp and uses agent::Agent directly to call create_channel / on_fwd_table (see src/control_plane/tests/test_integration.cu for a worked example).

Running data-plane scenarios directly

Each scenario is also a standalone executable (no controller required for self-contained tests):

./build/src/data_plane/test_exchange_without_worker      # 64 B all-to-all
./build/src/data_plane/test_one_to_three_multipath       # broadcast w/ relays
./build/src/data_plane/test_multisession_exchange        # 2-session reuse
./build/src/data_plane/test_tpch_query5                  # TPC-H Q5

These are the exact same binaries that ctest -L data and -L data_tpch invoke. Output (Throughput, Path statistics, etc.) goes to stdout.

Configuration knobs

The runtime reads compile-time constants in include/data_plane/config.cuh:

  • Multipath: PCIE_MULTI_PATH_TEST toggles 1-to-1 multipath
  • GDR: GDR_COPY enables gdrcopy paths (default 0; see gdr_close_safe)
  • Launch geometry: USER_KERNEL_GRID_SIZE, USER_KERNEL_BLOCK_SIZE, WORKER_BLOCK_SIZE, WORKER_GRID_SIZE
  • K-buffering: K_BUFFERING_K, KBUFFERING_INTRA_PARTITION_SIZE, KBUFFERING_INTER_PARTITION_SIZE

Keep USER_KERNEL_GRID_SIZE == WORKER_GRID_SIZE for exchange_without_worker.
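One way to catch a mismatch at build time is a static_assert placed after including include/data_plane/config.cuh. The sketch assumes both names are integral compile-time constants, either macros or constexpr values. The placeholder #define values exist only so the snippet compiles on its own. Remove them when including the real config.cuh, because they would otherwise mask the real constants.

```cpp
#include <cassert>

// Placeholders for standalone compilation only; the real values live in
// include/data_plane/config.cuh.
#ifndef USER_KERNEL_GRID_SIZE
#define USER_KERNEL_GRID_SIZE 4
#endif
#ifndef WORKER_GRID_SIZE
#define WORKER_GRID_SIZE 4
#endif

// Fails compilation, instead of misbehaving at run time, if the two
// grid sizes are edited apart.
static_assert(USER_KERNEL_GRID_SIZE == WORKER_GRID_SIZE,
              "exchange_without_worker requires USER_KERNEL_GRID_SIZE == WORKER_GRID_SIZE");

constexpr int kGridSize = USER_KERNEL_GRID_SIZE;
```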

Tests

The whole test suite is driven from one CMake build. Configure once at the top level (or per-plane), build, then ctest from the build dir. Tests are split into labels so you can run just the ones you need.

module load cuda ucx-cuda
cmake -S . -B build
cmake --build build -j
cd build

# everything that's runnable in this env:
ctest

# subsets:
ctest -L unit          # control-plane algorithms (no GPU, no network)
ctest -L node          # control-plane single-node e2e (1 GPU host)
ctest -L cluster       # control-plane two-host e2e (needs MGI_TEST_HOSTS)
ctest -L data          # data-plane scenarios (4 GPU NVLink)
ctest -L data_tpch     # data-plane TPC-H queries (heavier)
ctest -L integration   # combined: CP-derived FwdTable -> DP scenario

# one specific test:
ctest -R dp_test_one_to_three_multipath --output-on-failure
ctest -R cp_integration_onetothree --output-on-failure

ctest --output-on-failure shows the test's stdout when it fails; pass -V to always show it. The runner also tees every test's stdout/stderr into a per-test file under build/test_logs/<test_name>.log (overwritten on each ctest run), so you can grep/diff logs after the fact:

ls build/test_logs/
# cp_unit.log
# cp_node.log
# cp_integration.log
# dp_test_exchange_without_worker.log
# dp_test_multisession_exchange.log
# dp_test_tpch_query5.log
# ...

grep -H Throughput build/test_logs/dp_test_*.log

CUDA_MODULE_LOADING=EAGER is set automatically by every ctest entry. Set MGI_NETWORK_IFACES=eno4,ibp67s0,... in your shell so local_discover knows which NICs to probe.

Test catalog

Label unit — pure-CPU control plane

| Test | Source | Coverage |
|---|---|---|
| cp_unit | src/control_plane/tests/test_unit.cpp | 9 cases on the gig algorithms (max-flow + path decomposition + concat + §4.4 forwarding-table merge), assemble_global v_net fanout, TransferOptimizer cache reuse, and codec round-trips. ~50 ms, no GPU, no network. |

Label node — single GPU host, control plane only

| Test | Source | Coverage |
|---|---|---|
| cp_node | src/control_plane/tests/test_node.cpp | Spawns Controller + Agent in one process on TCP loopback, runs real discover_local() (NVML + hwloc + sysfs), drives an all-to-all create_channel for the host's GPUs, and waits for the DeliverFwdTable push to fire the on-callback. ~3 s. |

Label cluster — two GPU hosts, control plane only

| Test | Source | Coverage |
|---|---|---|
| cp_cluster | src/control_plane/tests/cluster_smoke.sh | Started on HOST_A: launches Controller + Agent_A locally with --exchange-channel 1234 --exchange-delay-ms 1500, ssh-launches Agent_B on HOST_B, then polls the controller log for two reported subgraph lines and one pushed DeliverFwdTable channel=1234 line. Skipped (exit 0) if MGI_TEST_HOSTS=HOST_A,HOST_B isn't set. ~17 s. |

Label data — data plane scenarios on the local GPUs

All run on a 4-GPU NVLink full mesh; each tests a different communication pattern.

| Test | Source | Coverage |
|---|---|---|
| dp_test_exchange_without_worker | src/data_plane/tests/test_exchange_without_worker.cu | 64 B all-to-all exchange (no host worker thread). The simplest sanity check that ChannelRuntime + send_direct + recv_direct round-trip on every (src, dst) pair. ~22 s. |
| dp_test_multisession_exchange | src/data_plane/tests/test_multisession_exchange.cu | Channel reuse across two sessions: 64 B (LineitemRow) then 256 B (SupplierRow256B, cell path). The same Channel/Endpoint/host-worker carries both. ~5 min; prints aggregate Throughput per session (~1800 GB/s on H100×4). |
| dp_test_multisession_join | src/data_plane/tests/test_multisession_join.cu | Channel reuse across two sessions: 64 B build (LineitemRow) then 16 B probe (JoinOrderTuple) with bitmap hit. Same shape as multisession_exchange, but the second session is a hash join probe. |
| dp_test_p2p_chain_direct | src/data_plane/tests/test_p2p_chain_direct.cu | Single-direction GPU→GPU chain transfer over NVLink. Stress-tests the direct path without aggregation. ~27 s. |
| dp_test_one_to_three | src/data_plane/tests/test_one_to_three.cu | One sender broadcasts to three receivers over a single direct NVLink path per (sender, dst). Baseline for the multipath variant. |
| dp_test_one_to_three_multipath | src/data_plane/tests/test_one_to_three_multipath.cu | Same one-to-three pattern, but with relay paths: one direct NVLink + N-2 relays through every intermediate GPU per receiver, scheduled by the WeightedRR path scheduler. Demonstrates P2PRelayOp. ~16 s, ~353 GB/s sender throughput. |
| dp_test_group_by_direct | src/data_plane/tests/test_group_by_direct.cu | All-to-all exchange + per-block reduction (a one-stage GroupBy). Validates the recv_direct_drain callback path. |
| dp_test_p2p_aggregation_direct | src/data_plane/tests/test_p2p_aggregation_direct.cu | Many-to-one aggregation over a single direct NVLink per source. |
| dp_test_p2p_aggregation_multipath | src/data_plane/tests/test_p2p_aggregation_multipath.cu | Many-to-one aggregation, multipath variant: each source uses direct + relays. |
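The full-mesh path set behind dp_test_one_to_three_multipath can be enumerated as below. mesh_paths is a hypothetical helper, not a function from the repository. It only shows why each receiver gets 1 + (N - 2) candidate paths on an N-GPU full mesh.

```cpp
#include <cassert>
#include <vector>

// Candidate paths from src to dst on an N-GPU full NVLink mesh: the direct
// link first, then one 2-hop relay through every GPU that is neither the
// sender nor this receiver. Each path is a list of GPU ids.
std::vector<std::vector<int>> mesh_paths(int n_gpus, int src, int dst) {
    std::vector<std::vector<int>> paths;
    paths.push_back({src, dst});
    for (int mid = 0; mid < n_gpus; ++mid)
        if (mid != src && mid != dst) paths.push_back({src, mid, dst});
    return paths;
}
```

On the 4-GPU mesh, mesh_paths(4, 0, 2) yields {0, 2}, {0, 1, 2}, and {0, 3, 2}. Each of the three receivers therefore has three candidate paths for the scheduler to choose from.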

Label data_tpch — TPC-H query benchmarks

Each query generates synthetic TPC-H tables in-memory (no external dataset needed) and runs a multi-GPU plan over MGI's exchange / join / group-by primitives.

| Test | Source | Query |
|---|---|---|
| dp_test_tpch_query3 | src/data_plane/tests/test_tpch_query3.cu | Q3: shipping priority (lineitem ⨝ orders ⨝ customer with date filter, group by orderkey + sum revenue). ~50 s. |
| dp_test_tpch_query5 | src/data_plane/tests/test_tpch_query5.cu | Q5: revenue by nation (lineitem ⨝ orders ⨝ customer ⨝ supplier ⨝ nation ⨝ region). ~50 s. |
| dp_test_tpch_query10 | src/data_plane/tests/test_tpch_query10.cu | Q10: returned items report (lineitem ⨝ orders ⨝ customer ⨝ nation, group by customer + nation). |
| dp_test_tpch_query12 | src/data_plane/tests/test_tpch_query12.cu | Q12: shipping mode (lineitem ⨝ orders, group by shipmode with priority case). |
| dp_test_tpch_query14 | src/data_plane/tests/test_tpch_query14.cu | Q14: promotion effect (lineitem with date filter + part group). |
| dp_test_tpch_query19 | src/data_plane/tests/test_tpch_query19.cu | Q19: discounted revenue (lineitem ⨝ part with multiple disjunctive predicates). |

Label integration — CP drives DP

| Test | Source | Coverage |
|---|---|---|
| cp_integration | src/control_plane/tests/test_integration.cu | Full handoff: in-process Controller + Agent → real CreateChannel → on_fwd_table callback → translate FwdTable into multisession_exchange's fwd_tables_override → run two full sessions of the data-plane scenario. Verifies that the FwdTable shape is consumable by the data plane and that routing decisions are correct (final aggregate sums match expected). ~5 min. |
| cp_integration_onetothree | src/control_plane/tests/test_integration_onetothree.cu | Same shape, but the controller picks the relay-mid set per receiver (FwdTable bins where next != dst) and hands it to onetothree_exchange as relay_mids_per_dst. On a full mesh this matches the standalone hardcode; on non-mesh topologies the controller picks a strict subset. ~50 s. |
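The relay-mid selection in cp_integration_onetothree (FwdTable bins where next != dst) reduces to a filter over the sender's table. The sketch uses a hypothetical, simplified view of that table that maps dst_gpu to the next_gpu of each bin. It returns one set of relay GPUs per receiver. The real relay_mids_per_dst argument of onetothree_exchange may have a different type.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <vector>

// Hypothetical simplified view of one sender's FwdTable:
// dst_gpu -> next_gpu of each bin.
using NextHopsByDst = std::map<int, std::vector<int>>;

// For each dst, collect every next_gpu that is not dst itself, i.e. the GPUs
// the controller chose to relay through toward that receiver.
std::map<int, std::set<int>> relay_mids_per_dst(const NextHopsByDst& table) {
    std::map<int, std::set<int>> mids;
    for (const auto& [dst, nexts] : table) {
        auto& s = mids[dst];  // keep an empty entry for direct-only receivers
        for (int next : nexts)
            if (next != dst) s.insert(next);
    }
    return mids;
}
```

On a full mesh, every other GPU appears as a relay for each receiver. On a sparser topology the bins contain fewer next != dst entries, so the resulting sets shrink accordingly.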

Reference output (4 H100, full NVLink mesh)

ctest -LE cluster on a single 4×H100 host (cluster requires MGI_TEST_HOSTS=hostA,hostB and is exercised separately):

100% tests passed, 0 tests failed out of 19
Total Test time (real) = 916.17 sec  (~15 min)

Label Time Summary:
  unit         =   0.01 s   (1 test )
  node         =   3.30 s   (1 test )
  integration  = 336.30 s   (2 tests)
  data         = 298.83 s   (9 tests)
  data_tpch    = 277.67 s   (6 tests)

Per-test (selected highlights):
  cp_unit ....................................   0.01 s
  cp_node ....................................   3.30 s   discovery (4 GPUs / 13 verts / 42 edges) + in-process e2e
  cp_integration ............................. 285.56 s   CP-derived FwdTable -> multisession_exchange (2 sessions)
  cp_integration_onetothree ..................  50.74 s   controller-picked relay set -> onetothree multipath
  dp_test_multisession_exchange .............. 123.90 s   2 back-to-back exchange sessions (heaviest scenario)
  dp_test_tpch_query{3,5,10,12,14,19} ........  41-51 s   one entry per query
  (other dp_* scenarios) .....................  17-27 s

Cluster reference (2 hosts via MGI_TEST_HOSTS=A,B ctest -L cluster): cp_cluster Passed in 17.10 s — 2 hosts: register × 2 + CreateChannel(1234) + push (4 tables / 2312 B).

A note on gdr_close: the data-plane gdrcopy integration has a known segfault on gdr_close() at process exit. All tests call gdr_close_safe(g) instead, which is a no-op when GDR_COPY=0 (the default in include/data_plane/config.cuh). When you flip GDR_COPY to 1 to actually use gdrcopy, you'll need to debug the segfault separately.

Baselines

See the README.md in each evaluation subdirectory.
