Feature request: parallel downloading #112

@zorat321

Some S3 providers are not very fast when downloading only one chunk/offset at a time. I think parallel downloading support would be nice.

If it is useful for getting started, I used an LLM to generate some prototype code and tuned some of the offsets on my network. However, I suggest that you use it as a starting point at most, to reduce the proliferation of AI slop. (There is probably something more elegant than the semaphore, and it would definitely be better to use more dynamic logic for staggering the "workers" than something tuned to my home network.) This code is about 10x faster, which makes it much more useful for me in practice, but it is still about 2x short of my network bandwidth. Perhaps aligning requests with the uploaded parts (for objects uploaded via multipart upload) or something similar would improve it.

import 'dart:async';
import 'dart:math';
import 'dart:typed_data';

import 'package:minio/minio.dart';

class ParallelMinio extends Minio {
  ParallelMinio({
    required super.endPoint,
    required super.accessKey,
    required super.secretKey,
    super.port,
    super.useSSL,
    super.sessionToken,
    super.region,
    super.pathStyle,
    super.enableTrace,
  });

  /// Downloads the whole object in parallel, with staggered chunk starts
  /// to keep the network utilisation smooth.
  ///
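  /// - [chunkSize] size of each individual range request (default 4 MiB)
  /// - [maxConcurrency] maximum number of simultaneous requests
  /// - [staggerStartIndex] index of the first chunk whose start is delayed
  /// - [staggerDelayMs] extra delay (in milliseconds) added per chunk index
  ///   from [staggerStartIndex] up to [maxConcurrency] (default 300 ms)
  /// - [onProgress] called after each chunk with the bytes downloaded so far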
  Future<Uint8List> downloadObject(
    String bucket,
    String object, {
    int chunkSize = 4 * 1024 * 1024,
    int maxConcurrency = 20,
    int staggerStartIndex = 5,
    int staggerDelayMs = 300,
    void Function(int downloaded, int total)? onProgress,
  }) async {
    final stat = await statObject(bucket, object);
    final totalSize = stat.size;
    if (totalSize == null) {
      throw Exception('Unable to determine object size for $object');
    }
    if (totalSize == 0) return Uint8List(0);

    final numChunks = (totalSize / chunkSize).ceil();
    final results = List<Uint8List?>.filled(numChunks, null);

    final semaphore = _Semaphore(maxConcurrency);
    var downloaded = 0;
    final futures = <Future>[];

    // Launch one worker per chunk. Each worker first waits for a permit.
    // Workers in the initial batch (index below maxConcurrency) then sleep
    // for a fixed, index-dependent delay so that the first requests do not
    // all start at exactly the same moment.
    for (var i = 0; i < numChunks; i++) {
      final index = i;
      final start = index * chunkSize;
      final length = min(chunkSize, totalSize - start);

      futures.add(semaphore.withPermit(() async {
        // Stagger the start: chunk (staggerStartIndex + k) waits
        // k * staggerDelayMs before issuing its request.
        if (index >= staggerStartIndex && index < maxConcurrency) {
          final delayMs = (index - staggerStartIndex) * staggerDelayMs;
          await Future.delayed(Duration(milliseconds: delayMs));
        }

        // Download the chunk.
        final stream = await getPartialObject(bucket, object, start, length);
        final builder = await stream.fold<BytesBuilder>(
          BytesBuilder(),
          (b, bytes) => b..add(bytes),
        );
        final data = builder.toBytes();

        results[index] = data;
        downloaded += data.length;
        onProgress?.call(downloaded, totalSize);
      }));
    }

    await Future.wait(futures);

    // Combine chunks in order.
    final resultBuilder = BytesBuilder();
    for (final chunk in results) {
      resultBuilder.add(chunk!);
    }
    return resultBuilder.toBytes();
  }
}

/// A basic semaphore for limiting concurrent async operations.
class _Semaphore {
  final int maxCount;
  int _current = 0;
  final _waiters = <Completer<void>>[];

  _Semaphore(this.maxCount);

  Future<T> withPermit<T>(Future<T> Function() callback) async {
    await acquire();
    try {
      return await callback();
    } finally {
      release();
    }
  }

  Future<void> acquire() {
    if (_current < maxCount) {
      _current++;
      return Future.value();
    }
    final completer = Completer<void>();
    _waiters.add(completer);
    return completer.future;
  }

  void release() {
    _current--;
    if (_waiters.isNotEmpty) {
      final completer = _waiters.removeAt(0);
      _current++;
      completer.complete();
    }
  }
}
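
As one possible alternative to the semaphore, here is a minimal sketch of a fixed worker pool, where each worker pulls the next chunk index from a shared counter. The names `downloadWithWorkerPool` and `FetchChunk` are hypothetical and not part of the minio package; the fetch callback is injected so the sketch stays independent of any particular client. It does not implement the staggered start.

```dart
import 'dart:async';
import 'dart:math';
import 'dart:typed_data';

/// Hypothetical callback: returns `length` bytes starting at offset `start`.
typedef FetchChunk = Future<Uint8List> Function(int start, int length);

/// Downloads [totalSize] bytes in pieces of [chunkSize] using at most
/// [workers] concurrent requests. Sketch only, not a library API.
Future<Uint8List> downloadWithWorkerPool({
  required int totalSize,
  required FetchChunk fetchChunk,
  int chunkSize = 4 * 1024 * 1024,
  int workers = 20,
}) async {
  if (chunkSize <= 0 || workers <= 0) {
    throw ArgumentError('chunkSize and workers must be positive');
  }
  if (totalSize == 0) return Uint8List(0);

  final numChunks = (totalSize + chunkSize - 1) ~/ chunkSize;
  // Each chunk is written straight into its final position, so no
  // per-chunk list and no final concatenation pass is needed.
  final result = Uint8List(totalSize);
  var next = 0;

  Future<void> worker() async {
    // Code within one isolate runs on a single thread, so claiming an
    // index with `next++` between awaits needs no lock.
    while (next < numChunks) {
      final index = next++;
      final start = index * chunkSize;
      final length = min(chunkSize, totalSize - start);
      final data = await fetchChunk(start, length);
      if (data.length != length) {
        throw StateError('Chunk $index: expected $length bytes, '
            'got ${data.length}');
      }
      result.setRange(start, start + length, data);
    }
  }

  // Never start more workers than there are chunks.
  await Future.wait(List.generate(min(workers, numChunks), (_) => worker()));
  return result;
}
```

With the class above, `fetchChunk` could wrap `getPartialObject` and fold the returned stream into a `Uint8List`, as the prototype already does for each chunk.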
