Skip to content

Commit

Permalink
DBZ-6416 Add initial snapshot notifications
Browse files Browse the repository at this point in the history
  • Loading branch information
mfvitale authored and jpechane committed Jun 22, 2023
1 parent d7e4063 commit 34e28ac
Show file tree
Hide file tree
Showing 26 changed files with 540 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ public MongoDbChangeEventSourceFactory(MongoDbConnectorConfig configuration, Err
}

@Override
public SnapshotChangeEventSource<MongoDbPartition, MongoDbOffsetContext> getSnapshotChangeEventSource(SnapshotProgressListener<MongoDbPartition> snapshotProgressListener) {
public SnapshotChangeEventSource<MongoDbPartition, MongoDbOffsetContext> getSnapshotChangeEventSource(SnapshotProgressListener<MongoDbPartition> snapshotProgressListener,
NotificationService<MongoDbPartition, MongoDbOffsetContext> notificationService) {
return new MongoDbSnapshotChangeEventSource(
configuration,
taskContext,
Expand All @@ -70,7 +71,8 @@ public SnapshotChangeEventSource<MongoDbPartition, MongoDbOffsetContext> getSnap
dispatcher,
clock,
snapshotProgressListener,
errorHandler);
errorHandler,
notificationService);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ protected void sendEvent(EventDispatcher<MongoDbPartition, CollectionId> dispatc
context.sendEvent(keyFromRow(row));

MongoDbOffsetContext mongoDbOffsetContext = getMongoDbOffsetContext(offsetContext);
ReplicaSet replicaSet = replicaSets.getIncrementalSnapshotReplicaSet();
ReplicaSet replicaSet = replicaSets.getSnapshotReplicaSet();
ReplicaSetOffsetContext replicaSetOffsetContext = mongoDbOffsetContext.getReplicaSetOffsetContext(replicaSet);
ReplicaSetPartition replicaSetPartition = mongoDbOffsetContext.getReplicaSetPartition(replicaSet);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,25 +33,27 @@

import io.debezium.DebeziumException;
import io.debezium.connector.SnapshotRecord;
import io.debezium.connector.mongodb.connection.ConnectionContext;
import io.debezium.connector.mongodb.connection.MongoDbConnection;
import io.debezium.connector.mongodb.connection.ReplicaSet;
import io.debezium.connector.mongodb.recordemitter.MongoDbSnapshotRecordEmitter;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.EventDispatcher.SnapshotReceiver;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.util.Clock;
import io.debezium.util.Strings;
import io.debezium.util.Threads;

/**
* A {@link SnapshotChangeEventSource} that performs multi-threaded snapshots of replica sets.
* A {@link SnapshotChangeEventSource} that performs multithreaded snapshots of replica sets.
*
* @author Chris Cranford
*/
Expand All @@ -62,30 +64,60 @@ public class MongoDbSnapshotChangeEventSource extends AbstractSnapshotChangeEven
private final MongoDbConnectorConfig connectorConfig;
private final MongoDbTaskContext taskContext;
private final MongoDbConnection.ChangeEventSourceConnectionFactory connections;
private final ConnectionContext connectionContext;
private final ReplicaSets replicaSets;
private final EventDispatcher<MongoDbPartition, CollectionId> dispatcher;
private final Clock clock;
private final SnapshotProgressListener<MongoDbPartition> snapshotProgressListener;
private final ErrorHandler errorHandler;
private AtomicBoolean aborted = new AtomicBoolean(false);
private final AtomicBoolean aborted = new AtomicBoolean(false);

public MongoDbSnapshotChangeEventSource(MongoDbConnectorConfig connectorConfig, MongoDbTaskContext taskContext,
MongoDbConnection.ChangeEventSourceConnectionFactory connections, ReplicaSets replicaSets,
EventDispatcher<MongoDbPartition, CollectionId> dispatcher, Clock clock,
SnapshotProgressListener<MongoDbPartition> snapshotProgressListener, ErrorHandler errorHandler) {
super(connectorConfig, snapshotProgressListener);
SnapshotProgressListener<MongoDbPartition> snapshotProgressListener, ErrorHandler errorHandler,
NotificationService<MongoDbPartition, MongoDbOffsetContext> notificationService) {
super(connectorConfig, snapshotProgressListener, notificationService);
this.connectorConfig = connectorConfig;
this.taskContext = taskContext;
this.connections = connections;
this.connectionContext = taskContext.getConnectionContext();
this.replicaSets = replicaSets;
this.dispatcher = dispatcher;
this.clock = clock;
this.snapshotProgressListener = snapshotProgressListener;
this.errorHandler = errorHandler;
}

/*
* This is required because MongoDbPartition and MongoDbOffsetContext are not managed well for MongoDB. They are only correctly initialized just before starting CDC streaming
* In the future only ReplicaSetPartition and ReplicaSetOffset should be present and initialized in the MongoDbConnectorTask
*/
@Override
protected Offsets<MongoDbPartition, OffsetContext> getOffsets(SnapshotContext<MongoDbPartition, MongoDbOffsetContext> snapshotContext,
MongoDbOffsetContext mongoDbOffsetContext, SnapshottingTask snapshottingTask) {

final MongoDbSnapshottingTask mongoDbSnapshottingTask = (MongoDbSnapshottingTask) snapshottingTask;
final MongoDbSnapshotContext mongoDbSnapshotContext = (MongoDbSnapshotContext) snapshotContext;

ReplicaSet replicaSet = getReplicaSet(mongoDbSnapshottingTask);
if (mongoDbOffsetContext == null) {
initSnapshotStartOffsets(mongoDbSnapshotContext);
return Offsets.of(snapshotContext.offset.getReplicaSetPartition(replicaSet),
snapshotContext.offset.getReplicaSetOffsetContext(replicaSet));
}

return Offsets.of(mongoDbOffsetContext.getReplicaSetPartition(replicaSet),
mongoDbOffsetContext.getReplicaSetOffsetContext(replicaSet));
}

private ReplicaSet getReplicaSet(MongoDbSnapshottingTask mongoDbSnapshottingTask) {

// In case of a Sharded Cluster, for snapshot, only the connection.mode=sharded is supported. In this case only one ReplicaSet is present.
if (mongoDbSnapshottingTask.getReplicaSetsToSnapshot().isEmpty()) { // When snapshot mode is never
return replicaSets.getSnapshotReplicaSet();
}
return mongoDbSnapshottingTask.getReplicaSetsToSnapshot().get(0);
}

@Override
protected SnapshotResult<MongoDbOffsetContext> doExecute(ChangeEventSourceContext context,
MongoDbOffsetContext prevOffsetCtx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,12 @@ public List<ReplicaSet> all() {
}

/**
* Get the ReplicaSet for the incremental snapshot
* Get the ReplicaSet for the snapshot
*
* @return in case of a ReplicaSet deployments return the only ReplicaSet available.
* In case of a Sharded Cluster, for incremental snapshot, only the connection.mode=sharded is supported. In this case only one ReplicaSet is present.
*/
public ReplicaSet getIncrementalSnapshotReplicaSet() {
public ReplicaSet getSnapshotReplicaSet() {
return all().get(0);
}

Expand Down
Loading

0 comments on commit 34e28ac

Please sign in to comment.