Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we consider moving this CacheStoreJob refactoring to a separate PR? Its changes are not crucial to this change.
We can make it implement Consumer<ObjectKey> and rename storeToCache to accept.

Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.function.Consumer;

import io.aiven.inkless.TimeUtils;
Expand All @@ -32,32 +31,43 @@
import io.aiven.inkless.generated.CacheKey;
import io.aiven.inkless.generated.FileExtent;

public class CacheStoreJob implements Runnable {
/**
* Handles caching of uploaded file data to the object cache.
*
* <p>Implements {@link Consumer} for use with {@code CompletableFuture.thenAcceptAsync()}.
* When the upload completes successfully, the data is stored to cache.
*
* <p><b>Thread Safety:</b> The {@link #accept} method is safe to call from any thread.
* The caller controls the execution context via {@code thenAcceptAsync(..., executor)}.
*/
Comment thread
Mwea marked this conversation as resolved.
class CacheStoreJob implements Consumer<ObjectKey> {

private final Time time;
private final ObjectCache cache;
private final KeyAlignmentStrategy keyAlignmentStrategy;
private final byte[] data;
private final Future<ObjectKey> uploadFuture;
private final Consumer<Long> cacheStoreDurationCallback;

public CacheStoreJob(Time time, ObjectCache cache, KeyAlignmentStrategy keyAlignmentStrategy, byte[] data, Future<ObjectKey> uploadFuture, Consumer<Long> cacheStoreDurationCallback) {
public CacheStoreJob(Time time,
ObjectCache cache,
KeyAlignmentStrategy keyAlignmentStrategy,
byte[] data,
Consumer<Long> cacheStoreDurationCallback) {
this.time = time;
this.cache = cache;
this.keyAlignmentStrategy = keyAlignmentStrategy;
this.data = data;
this.uploadFuture = uploadFuture;
this.cacheStoreDurationCallback = cacheStoreDurationCallback;
}

/**
* Stores the uploaded file data to cache.
*
* <p>This method is only called when upload succeeds (via thenAcceptAsync).
*/
@Override
public void run() {
try {
ObjectKey objectKey = uploadFuture.get();
storeToCache(objectKey);
} catch (final Throwable e) {
// If the upload failed there's nothing to cache and we succeed vacuously.
}
public void accept(ObjectKey objectKey) {
storeToCache(objectKey);
}

private void storeToCache(ObjectKey objectKey) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.function.Function;

import io.aiven.inkless.TimeUtils;
import io.aiven.inkless.common.ObjectFormat;
Expand All @@ -40,32 +38,34 @@
/**
* The job of committing the already uploaded file to the control plane.
*
* <p>If the file was uploaded successfully, commit to the control plane happens. Otherwise, it doesn't.
* <p>This class implements {@link Function} for use with {@code CompletableFuture.thenApplyAsync()}.
* When the upload completes successfully, {@link #apply} is invoked to perform the actual commit.
* This eliminates blocking wait on upload completion, allowing the commit executor to
* only do actual commit work instead of waiting for S3 latency.
*
* <p>Upload failures are handled separately in the future chain and don't reach this job.
*/
class FileCommitJob implements Supplier<List<CommitBatchResponse>> {
class FileCommitJob implements Function<ObjectKey, List<CommitBatchResponse>> {
private static final Logger LOGGER = LoggerFactory.getLogger(FileCommitJob.class);

private final int brokerId;
private final ClosedFile file;
private final Future<ObjectKey> uploadFuture;
private final Time time;
private final ControlPlane controlPlane;
private final ObjectDeleter objectDeleter;
private final long commitSubmitTimeMs;
private volatile long commitSubmitTimeMs;
private final Consumer<Long> durationCallback;
private final Consumer<Long> commitWaitDurationCallback;

FileCommitJob(final int brokerId,
final ClosedFile file,
final Future<ObjectKey> uploadFuture,
final Time time,
final ControlPlane controlPlane,
final ObjectDeleter objectDeleter,
final Consumer<Long> durationCallback,
final Consumer<Long> commitWaitDurationCallback) {
this.brokerId = brokerId;
this.file = file;
this.uploadFuture = uploadFuture;
this.controlPlane = controlPlane;
this.time = time;
this.objectDeleter = objectDeleter;
Expand All @@ -75,48 +75,44 @@ class FileCommitJob implements Supplier<List<CommitBatchResponse>> {
this.commitWaitDurationCallback = commitWaitDurationCallback;
}

/**
 * Restarts the commit-wait measurement by recording the current time as the submit time.
 *
 * <p>Intended to be invoked once the commit can actually run (e.g., after the upload has
 * completed in async mode), so that the reported wait covers only the executor queue time
 * and not the time spent behind earlier commits in the chain.
 */
void markReadyToCommit() {
    // Sample the clock once, then publish it through the volatile field.
    final long readyAtMs = time.milliseconds();
    this.commitSubmitTimeMs = readyAtMs;
}

/**
* {@inheritDoc}
*
* <p>Commits the uploaded file to the control plane.
* This method is only called when upload succeeds (via thenApplyAsync).
*/
@Override
public List<CommitBatchResponse> get() {
// The wait for upload is already measured, and should take the upload time or less if it was already completed.
final UploadResult uploadResult = waitForUpload();
// Measure the duration from the commit job submission to the moment we start committing.
// and should account for the wait time to execute the commit job on a single-threaded executor.
public List<CommitBatchResponse> apply(ObjectKey objectKey) {
// Measure the duration from markReadyToCommit() to the moment we start committing.
// markReadyToCommit() is called after upload completion and chain wait,
// so this measures only the executor queue wait time.
commitWaitDurationCallback.accept(time.milliseconds() - commitSubmitTimeMs);
return TimeUtils.measureDurationMsSupplier(time, () -> doCommit(uploadResult), durationCallback);
}

private UploadResult waitForUpload() {
try {
final ObjectKey objectKey = uploadFuture.get();
return new UploadResult(objectKey, null);
} catch (final ExecutionException e) {
LOGGER.error("Failed upload", e);
return new UploadResult(null, e.getCause());
} catch (final InterruptedException e) {
// This is not expected as we try to shut down the executor gracefully.
LOGGER.error("Interrupted", e);
throw new RuntimeException(e);
}
return TimeUtils.measureDurationMsSupplier(time, () -> doCommit(objectKey), durationCallback);
}

private List<CommitBatchResponse> doCommit(final UploadResult result) {
if (result.objectKey != null) {
LOGGER.debug("Uploaded {} successfully, committing", result.objectKey);
try {
final var commitBatchResponses = controlPlane.commitFile(result.objectKey.value(), ObjectFormat.WRITE_AHEAD_MULTI_SEGMENT, brokerId, file.size(), file.commitBatchRequests());
LOGGER.debug("Committed successfully");
return commitBatchResponses;
} catch (final Exception e) {
LOGGER.error("Commit failed", e);
if (e instanceof ControlPlaneException) {
// only attempt to remove the uploaded file if it is a control plane error
tryDeleteFile(result.objectKey(), e);
}
throw e;
private List<CommitBatchResponse> doCommit(final ObjectKey objectKey) {
LOGGER.debug("Uploaded {} successfully, committing", objectKey);
try {
final var commitBatchResponses = controlPlane.commitFile(objectKey.value(), ObjectFormat.WRITE_AHEAD_MULTI_SEGMENT, brokerId, file.size(), file.commitBatchRequests());
LOGGER.debug("Committed successfully");
return commitBatchResponses;
} catch (final Exception e) {
LOGGER.error("Commit failed", e);
if (e instanceof ControlPlaneException) {
// only attempt to remove the uploaded file if it is a control plane error
tryDeleteFile(objectKey, e);
}
} else {
// no need to log here, it was already logged in waitForUpload
throw new FileUploadException(result.uploadError);
throw e;
}
}

Expand All @@ -141,6 +137,4 @@ private void tryDeleteFile(ObjectKey objectKey, Exception e) {
}
}

private record UploadResult(ObjectKey objectKey, Throwable uploadError) {
}
}
Loading
Loading