Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,9 @@ private static MemoryRecords constructRecordsFromFile(
return null; // Doesn't cover entire batch range - incomplete batch
}

// All extents cover the batch range, safe to allocate full buffer
// All extents cover the batch range, allocate buffer and copy data.
// Note: We always copy because createMemoryRecords mutates the buffer (setLastOffset,
// setMaxTimestamp), and FileExtent data is cached/shared across concurrent fetches.
final byte[] buffer = new byte[Math.toIntExact(batchRange.bufferSize())];

for (FileExtent file : files) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,27 @@ public FileFetchJob(Time time,

// visible for testing
static FileExtent createFileExtent(ObjectKey object, ByteRange byteRange, ByteBuffer buffer) {
// Handle both heap and direct/read-only ByteBuffers
// buffer.array() returns the entire backing array and ignores position/limit/arrayOffset,
// so we can only use it directly when the buffer spans the entire array.
byte[] data;
if (buffer.hasArray()
&& buffer.arrayOffset() == 0
&& buffer.position() == 0
&& buffer.remaining() == buffer.array().length) {
// Buffer spans the entire backing array - use directly without copy
data = buffer.array();
} else {
// Copy from direct/read-only buffer or buffer with non-zero position/arrayOffset
data = new byte[buffer.remaining()];
buffer.get(data);
}
return new FileExtent()
.setObject(object.value())
.setRange(new FileExtent.ByteRange()
.setOffset(byteRange.offset())
.setLength(buffer.limit()))
.setData(buffer.array());
.setLength(data.length))
.setData(data);
Comment thread
Mwea marked this conversation as resolved.
}

@Override
Expand All @@ -70,7 +85,8 @@ public FileExtent call() throws Exception {
}

/**
 * Fetches {@code range} of {@code key} and wraps the bytes in a {@link FileExtent}.
 */
private FileExtent doWork() throws IOException, StorageBackendException {
    // fetchToByteBuffer lets backends return a ByteBuffer directly (avoids channel/stream overhead);
    // its default implementation reads the fetch() channel and closes it.
    final ByteBuffer byteBuffer = objectFetcher.fetchToByteBuffer(key, range);
    return createFileExtent(key, range, byteBuffer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
Expand Down Expand Up @@ -168,15 +169,18 @@ void commit(final ClosedFile file) throws InterruptedException {
totalBytesInProgress.addAndGet(file.size());

// Start uploading and add to the commit queue (as Runnable).
// This ensures files are uploaded in concurrently, but committed to the control plane sequentially,
// This ensures files are uploaded concurrently, but committed to the control plane sequentially,
// because `executorServiceCommit` is single-threaded.
final FileUploadJob uploadJob = FileUploadJob.createFromByteArray(
// Use ByteBuffer upload path for zero-copy S3 uploads (avoids internal byte[] copy).
// asReadOnlyBuffer() provides defense against accidental modification while
// preserving zero-copy semantics (no memory allocation beyond the view).
final FileUploadJob uploadJob = FileUploadJob.createFromByteBuffer(
objectKeyCreator,
storage,
time,
maxFileUploadAttempts,
fileUploadRetryBackoff,
file.data(),
ByteBuffer.wrap(file.data()).asReadOnlyBuffer(),
metrics::fileUploadFinished
);
final Future<ObjectKey> uploadFuture = executorServiceUpload.submit(uploadJob);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.Callable;
Expand All @@ -50,10 +51,14 @@ public class FileUploadJob implements Callable<ObjectKey> {
private final Time time;
private final int attempts;
private final Duration retryBackoff;
private final Supplier<InputStream> data;
private final Supplier<InputStream> dataStream;
private final ByteBuffer dataBuffer;
private final long length;
private final Consumer<Long> durationCallback;

/**
* Constructor for InputStream-based uploads.
*/
public FileUploadJob(final ObjectKeyCreator objectKeyCreator,
final ObjectUploader objectUploader,
final Time time,
Expand All @@ -70,11 +75,41 @@ public FileUploadJob(final ObjectKeyCreator objectKeyCreator,
}
this.attempts = attempts;
this.retryBackoff = Objects.requireNonNull(retryBackoff, "retryBackoff cannot be null");
this.data = Objects.requireNonNull(data, "data cannot be null");
this.dataStream = Objects.requireNonNull(data, "data cannot be null");
this.dataBuffer = null;
this.length = length;
this.durationCallback = Objects.requireNonNull(durationCallback, "durationCallback cannot be null");
}

/**
* Constructor for ByteBuffer-based uploads (zero-copy path).
* Note: The buffer is stored by reference, not copied. The caller must ensure the buffer
* is not modified between construction and when call() completes. The buffer's position
* and limit are captured at upload time via duplicate() in the ObjectUploader implementation,
* which preserves retry support without modifying the original buffer.
*/
Comment thread
Mwea marked this conversation as resolved.
private FileUploadJob(final ObjectKeyCreator objectKeyCreator,
final ObjectUploader objectUploader,
final Time time,
final int attempts,
final Duration retryBackoff,
final ByteBuffer dataBuffer,
final Consumer<Long> durationCallback) {
this.objectKeyCreator = Objects.requireNonNull(objectKeyCreator, "objectKeyCreator cannot be null");
this.objectUploader = Objects.requireNonNull(objectUploader, "objectUploader cannot be null");
this.time = Objects.requireNonNull(time, "time cannot be null");
if (attempts <= 0) {
throw new IllegalArgumentException("attempts must be positive");
}
this.attempts = attempts;
this.retryBackoff = Objects.requireNonNull(retryBackoff, "retryBackoff cannot be null");
this.dataStream = null;
// Store the buffer reference - position/limit are preserved via duplicate() during upload
this.dataBuffer = Objects.requireNonNull(dataBuffer, "dataBuffer cannot be null");
this.length = dataBuffer.remaining();
this.durationCallback = Objects.requireNonNull(durationCallback, "durationCallback cannot be null");
}

public static FileUploadJob createFromByteArray(final ObjectKeyCreator objectKeyCreator,
final ObjectUploader objectUploader,
final Time time,
Expand All @@ -93,7 +128,32 @@ public static FileUploadJob createFromByteArray(final ObjectKeyCreator objectKey
data.length,
durationCallback
);
}

/**
 * Builds a job that uploads the remaining bytes of {@code data} through the ByteBuffer upload path.
 *
 * <p>The buffer is kept by reference and is not copied. The default
 * {@code ObjectUploader.upload(ObjectKey, ByteBuffer)} reads from a duplicate, so it does not move the
 * buffer's position.
 *
 * @throws NullPointerException if {@code data} is null
 * @throws IllegalArgumentException if {@code data} has no remaining bytes
 */
public static FileUploadJob createFromByteBuffer(final ObjectKeyCreator objectKeyCreator,
                                                 final ObjectUploader objectUploader,
                                                 final Time time,
                                                 final int attempts,
                                                 final Duration retryBackoff,
                                                 final ByteBuffer data,
                                                 final Consumer<Long> durationCallback) {
    final ByteBuffer payload = Objects.requireNonNull(data, "data cannot be null");
    if (!payload.hasRemaining()) {
        throw new IllegalArgumentException("data must have remaining bytes");
    }
    return new FileUploadJob(objectKeyCreator, objectUploader, time, attempts, retryBackoff,
            payload, durationCallback);
}

@Override
Expand All @@ -106,7 +166,17 @@ private ObjectKey callInternal() throws Exception {
final Exception uploadError;
try {
objectKey = objectKeyCreator.create(Uuid.randomUuid().toString());
uploadError = uploadWithRetry(objectKey, data, length);
if (dataBuffer != null) {
LOGGER.debug("Uploading {} via ByteBuffer (zero-copy)", objectKey);
uploadError = uploadWithRetry(objectKey, () -> objectUploader.upload(objectKey, dataBuffer));
} else {
LOGGER.debug("Uploading {} via InputStream", objectKey);
uploadError = uploadWithRetry(objectKey, () -> {
try (InputStream stream = dataStream.get()) {
objectUploader.upload(objectKey, stream, length);
}
});
}
} catch (final Exception e) {
LOGGER.error("Unexpected exception", e);
throw e;
Expand All @@ -119,39 +189,54 @@ private ObjectKey callInternal() throws Exception {
}
}

private Exception uploadWithRetry(final ObjectKey objectKey, final Supplier<InputStream> data, final long length) {
LOGGER.debug("Uploading {}", objectKey);
/**
* Executes the upload operation with retry logic.
* @param objectKey the object key being uploaded (for logging)
* @param uploadOperation the upload operation to execute
* @return null on success, or the last exception on failure after all retries exhausted
*/
private Exception uploadWithRetry(final ObjectKey objectKey, final UploadOperation uploadOperation) {
Exception error = null;
for (int attempt = 0; attempt < attempts; attempt++) {
try (InputStream stream = data.get()) {
objectUploader.upload(objectKey, stream, length);
try {
uploadOperation.execute();
LOGGER.debug("Successfully uploaded {}", objectKey);
return null;
} catch (final StorageBackendException | IOException e) {
error = e;
// Sleep on all attempts but last.
final boolean lastAttempt = attempt == attempts - 1;
if (lastAttempt) {
if (e instanceof StorageBackendTimeoutException) {
LOGGER.error("Error uploading {} due to timeout, giving up: {}", objectKey, safeGetCauseMessage(e));
} else {
LOGGER.error("Error uploading {}, giving up", objectKey, e);
}
} else {
if (e instanceof StorageBackendTimeoutException) {
LOGGER.error("Error uploading {} due to timeout, retrying in {} ms: {}",
objectKey, retryBackoff.toMillis(), safeGetCauseMessage(e));
} else {
LOGGER.error("Error uploading {}, retrying in {} ms",
objectKey, retryBackoff.toMillis(), e);
}
logRetryableError(objectKey, lastAttempt, e);
if (!lastAttempt) {
time.sleep(retryBackoff.toMillis());
}
}
}
return error;
}

/**
 * Logs a failed upload attempt.
 *
 * <p>Timeouts are logged with just the cause's message. Other errors are logged with the full exception.
 * The wording depends on whether the job will retry or give up.
 */
private void logRetryableError(final ObjectKey objectKey, final boolean lastAttempt, final Exception e) {
    final boolean timedOut = e instanceof StorageBackendTimeoutException;
    if (!lastAttempt) {
        if (timedOut) {
            LOGGER.error("Error uploading {} due to timeout, retrying in {} ms: {}",
                    objectKey, retryBackoff.toMillis(), safeGetCauseMessage(e));
        } else {
            LOGGER.error("Error uploading {}, retrying in {} ms",
                    objectKey, retryBackoff.toMillis(), e);
        }
        return;
    }
    if (timedOut) {
        LOGGER.error("Error uploading {} due to timeout, giving up: {}", objectKey, safeGetCauseMessage(e));
    } else {
        LOGGER.error("Error uploading {}, giving up", objectKey, e);
    }
}

/**
 * A single upload attempt.
 *
 * <p>Its checked {@link IOException} and {@link StorageBackendException} failures are the ones that
 * {@code uploadWithRetry} catches and retries.
 */
@FunctionalInterface
private interface UploadOperation {
    void execute() throws IOException, StorageBackendException;
}

/**
 * Returns the message of {@code e}'s cause.
 *
 * <p>Returns an empty string when {@code e} has no cause.
 */
private static String safeGetCauseMessage(final Exception e) {
    final Throwable cause = e.getCause();
    if (cause == null) {
        return "";
    }
    return cause.getMessage();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,23 @@ public interface ObjectFetcher extends Closeable {

/**
 * Opens a channel over the requested byte range of the object identified by {@code key}.
 *
 * <p>The default {@link #fetchToByteBuffer(ObjectKey, ByteRange)} drains the returned channel with
 * {@link #readToByteBuffer(ReadableByteChannel)} and closes it afterwards. Direct callers should
 * presumably close it as well — NOTE(review): confirm the ownership contract with implementations.
 */
ReadableByteChannel fetch(ObjectKey key, ByteRange range) throws StorageBackendException, IOException;

/**
 * Fetches the given range of an object and returns its bytes as a ByteBuffer.
 *
 * <p>Backends that can hand back bytes directly (e.g. S3 getObjectAsBytes) should override this to skip the
 * intermediate channel. This default implementation opens {@link #fetch(ObjectKey, ByteRange)} and drains it
 * with {@link #readToByteBuffer(ReadableByteChannel)}. It always closes the channel, even if reading fails.
 *
 * @param key the object key to fetch
 * @param range the byte range to fetch, or null for the entire object
 * @return ByteBuffer containing the fetched data with position at 0 and limit at data length
 */
default ByteBuffer fetchToByteBuffer(ObjectKey key, ByteRange range) throws StorageBackendException, IOException {
    final ByteBuffer fetched;
    try (ReadableByteChannel source = fetch(key, range)) {
        fetched = readToByteBuffer(source);
    }
    return fetched;
}

default ByteBuffer readToByteBuffer(final ReadableByteChannel readableByteChannel) throws IOException {
final ByteBuffer byteBuffer;
final List<ByteBuffer> buffers = new ArrayList<>(5);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@
*/
package io.aiven.inkless.storage_backend.common;

import org.apache.kafka.common.utils.ByteBufferInputStream;

import java.io.Closeable;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Objects;

import io.aiven.inkless.common.ObjectKey;

Expand All @@ -35,4 +39,21 @@ public interface ObjectUploader extends Closeable {
*/
void upload(ObjectKey key, InputStream inputStream, long length) throws StorageBackendException;

/**
 * Uploads an object to object storage from the remaining bytes of a ByteBuffer.
 *
 * <p>This default implementation reads through a {@link ByteBuffer#duplicate()} of {@code byteBuffer}.
 * The caller's position and limit are therefore left unchanged, and the same buffer can be passed again.
 * It then delegates to {@link #upload(ObjectKey, InputStream, long)}.
 *
 * @param key key of the object to upload.
 * @param byteBuffer data of the object that will be uploaded; must have at least one remaining byte.
 * @throws StorageBackendException if there are errors during the upload.
 * @throws NullPointerException if {@code key} or {@code byteBuffer} is null.
 * @throws IllegalArgumentException if {@code byteBuffer} has no remaining bytes.
 */
default void upload(ObjectKey key, ByteBuffer byteBuffer) throws StorageBackendException {
    Objects.requireNonNull(key, "key cannot be null");
    Objects.requireNonNull(byteBuffer, "byteBuffer cannot be null");
    if (!byteBuffer.hasRemaining()) {
        throw new IllegalArgumentException("byteBuffer must have remaining bytes");
    }
    final ByteBuffer view = byteBuffer.duplicate();
    final long length = view.remaining();
    upload(key, new ByteBufferInputStream(view), length);
}

}
Loading
Loading