diff --git a/src/test/java/io/debezium/connector/vitess/VitessConnectorTaskTest.java b/src/test/java/io/debezium/connector/vitess/VitessConnectorTaskTest.java index 032c2672..153e6266 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessConnectorTaskTest.java +++ b/src/test/java/io/debezium/connector/vitess/VitessConnectorTaskTest.java @@ -30,9 +30,10 @@ import io.debezium.config.Configuration; import io.debezium.connector.common.BaseSourceTask; -import io.debezium.embedded.KafkaConnectUtil; import io.debezium.junit.logging.LogInterceptor; import io.debezium.pipeline.ChangeEventSourceCoordinator; +import io.debezium.storage.kafka.KafkaConnectOffsetStoreAdapter; +import io.debezium.storage.kafka.offset.KafkaMemoryOffsetProvider; import io.debezium.util.Collect; import io.debezium.util.Testing; @@ -217,7 +218,7 @@ static class ContextHelper { SourceTaskContext sourceTaskContext; ContextHelper() { - this.offsetStore = KafkaConnectUtil.memoryOffsetBackingStore(); + this.offsetStore = ((KafkaConnectOffsetStoreAdapter) (new KafkaMemoryOffsetProvider()).create(null)).getDelegate(); this.sourceTaskContext = initSourceTaskContext(); } diff --git a/src/test/java/io/debezium/connector/vitess/VitessConnectorTest.java b/src/test/java/io/debezium/connector/vitess/VitessConnectorTest.java index 00375860..a53eeb6f 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessConnectorTest.java +++ b/src/test/java/io/debezium/connector/vitess/VitessConnectorTest.java @@ -52,10 +52,11 @@ import io.debezium.connector.vitess.pipeline.txmetadata.ShardEpochMap; import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionContext; import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionMetadataFactory; -import io.debezium.embedded.KafkaConnectUtil; import io.debezium.junit.logging.LogInterceptor; import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.spi.Offsets; +import io.debezium.storage.kafka.KafkaConnectOffsetStoreAdapter; +import io.debezium.storage.kafka.offset.KafkaMemoryOffsetProvider; import io.debezium.util.Collect; import io.debezium.util.Testing; @@ -1889,7 +1890,7 @@ private Map> getOffsetFromStorage(int numTasks, List private Map> getOffsetFromStorage(int numTasks, List shards, int gen, int prevNumTasks, Map serverOffsets, Map> taskOffsets, Function customConfig) { - final OffsetBackingStore offsetStore = KafkaConnectUtil.memoryOffsetBackingStore(); + final OffsetBackingStore offsetStore = ((KafkaConnectOffsetStoreAdapter) (new KafkaMemoryOffsetProvider()).create(null)).getDelegate(); offsetStore.start(); storeOffsets(offsetStore, serverOffsets, taskOffsets); @@ -1913,7 +1914,7 @@ static class ContextHelper { SourceConnectorContext sourceConnectorContext; ContextHelper() { - this.offsetStore = KafkaConnectUtil.memoryOffsetBackingStore(); + this.offsetStore = ((KafkaConnectOffsetStoreAdapter) (new KafkaMemoryOffsetProvider()).create(null)).getDelegate(); this.sourceConnectorContext = initSourceConnectorContext(); }