diff --git a/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java b/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java index 2e3487b1..a77b8319 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java +++ b/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java @@ -337,6 +337,19 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue .withDescription("Control the interval between periodic gPRC keepalive pings for VStream." + " Defaults to Long.MAX_VALUE (disabled)."); + public static final Field TRANSACTION_CHUNK_SIZE_BYTES = Field.create(VITESS_CONFIG_GROUP_PREFIX + "transaction.chunk.size.bytes") + .withDisplayName("Transaction chunk size (bytes)") + .withType(Type.LONG) + .withDefault(0L) + .withWidth(Width.SHORT) + .withImportance(ConfigDef.Importance.HIGH) + .withValidation(VitessConnectorConfig::validateTransactionChunkSize) + .withDescription("Enables transaction chunking on the VTGate VStream when set to a positive value (in bytes). " + + "When a transaction exceeds this threshold, both the VTGate and Debezium will process the transaction in chunks " + + "rather than buffering the entire transaction in memory. This is recommended to prevent out-of-memory errors " + + "when processing large transactions. A value of 0 (default) disables chunking. " + + "See https://vitess.io/docs/24.0/reference/vreplication/vstream/#transactionchunksize for more details."); + public static final Field GRPC_HEADERS = Field.create(VITESS_CONFIG_GROUP_PREFIX + "grpc.headers") .withDisplayName("VStream gRPC headers") .withType(Type.STRING) @@ -512,6 +525,20 @@ private static int validateLoadBalancingPolicy(Configuration config, Field field return 0; } + private static int validateTransactionChunkSize(Configuration config, Field field, ValidationOutput problems) { + long chunkSize = config.getLong(TRANSACTION_CHUNK_SIZE_BYTES, 0L); + if (chunkSize < 0) { + problems.accept(TRANSACTION_CHUNK_SIZE_BYTES, chunkSize, "Transaction chunk size must be >= 0"); + return 1; + } + if (chunkSize == 0) { + LOGGER.warn("Transaction chunk size is disabled (set to 0). Large transactions may cause out-of-memory errors. " + + "Consider setting {} to a value like 134217728 (128MB) to enable transaction chunking.", + TRANSACTION_CHUNK_SIZE_BYTES.name()); + } + return 0; + } + public static final Field SOURCE_INFO_STRUCT_MAKER = CommonConnectorConfig.SOURCE_INFO_STRUCT_MAKER .withDefault(VitessSourceInfoStructMaker.class.getName()); @@ -530,6 +557,7 @@ private static int validateLoadBalancingPolicy(Configuration config, Field field TABLET_TYPE, STOP_ON_RESHARD_FLAG, KEEPALIVE_INTERVAL_MS, + TRANSACTION_CHUNK_SIZE_BYTES, GRPC_HEADERS, GRPC_MAX_INBOUND_MESSAGE_SIZE, BINARY_HANDLING_MODE, @@ -743,6 +771,10 @@ public Duration getKeepaliveInterval() { return getConfig().getDuration(KEEPALIVE_INTERVAL_MS, ChronoUnit.MILLIS); } + public long getTransactionChunkSizeBytes() { + return getConfig().getLong(TRANSACTION_CHUNK_SIZE_BYTES, 0L); + } + public Map getGrpcHeaders() { String grpcHeaders = getConfig().getString(GRPC_HEADERS); diff --git a/src/main/java/io/debezium/connector/vitess/connection/VitessReplicationConnection.java b/src/main/java/io/debezium/connector/vitess/connection/VitessReplicationConnection.java index 94a42c54..f1e7790c 100644 --- a/src/main/java/io/debezium/connector/vitess/connection/VitessReplicationConnection.java +++ b/src/main/java/io/debezium/connector/vitess/connection/VitessReplicationConnection.java @@ -118,10 +118,13 @@ public void startStreaming( StreamObserver responseObserver = new StreamObserver() { private List bufferedEvents = new ArrayList<>(); private Vgtid newVgtid; + private Vgtid lastVgtid = vgtid; private boolean beginEventSeen; private boolean commitEventSeen; - private int numOfRowEvents; private int numResponses; + private final boolean isChunkingEnabled = config.getTransactionChunkSizeBytes() > 0; + private boolean inTransaction = false; + private boolean endedChunkedTransaction = false; private boolean isInVStreamCopy = vgtid.willTriggerVStreamCopy(); @Override @@ -134,7 +137,6 @@ public void onNext(Vtgate.VStreamResponse response) { LOGGER.debug("VEvent: {}", event); switch (event.getType()) { case ROW: - numOfRowEvents++; break; case VGTID: // We always use the latest VGTID if any. @@ -155,7 +157,13 @@ public void onNext(Vtgate.VStreamResponse response) { break; case BEGIN: // We should only see BEGIN before seeing COMMIT. - if (commitEventSeen) { + inTransaction = true; + if (commitEventSeen && isChunkingEnabled) { + LOGGER.info("Transaction chunking: received BEGIN after COMMIT, " + + "indicating a chunked transaction has been committed, and a new transaction" + + "is beginning within the same VStreamResponse"); + } + else if (commitEventSeen) { String msg = "Received BEGIN event after receiving COMMIT event"; setError(msg); return; @@ -179,11 +187,16 @@ public void onNext(Vtgate.VStreamResponse response) { break; case COMMIT: // We should only see COMMIT after seeing BEGIN. - if (!beginEventSeen) { + if (!beginEventSeen && inTransaction && isChunkingEnabled) { + LOGGER.info("Transaction chunking: received COMMIT event in separate chunk"); + endedChunkedTransaction = true; + } + else if (!beginEventSeen) { String msg = "Received COMMIT event before receiving BEGIN event"; setError(msg); return; } + inTransaction = false; if (commitEventSeen) { String msg = "Received duplicate COMMIT events"; setError(msg); @@ -211,6 +224,13 @@ public void onNext(Vtgate.VStreamResponse response) { numResponses++; + // If chunking is enabled, and we are either in a chunked transaction (possibly split across + // several responses) or we just ended a chunked transaction, then send the events now. + if (isChunkingEnabled && (inTransaction || endedChunkedTransaction)) { + LOGGER.info("Transaction chunking: received {} events during this chunk, " + + "transaction chunk size bytes {}", bufferedEvents.size(), transactionChunkSizeBytes); + sendNow = true; + } // We only proceed when we receive a complete transaction after seeing both BEGIN and COMMIT events, // OR if sendNow flag is true (meaning we should process buffered events immediately). if ((!beginEventSeen || !commitEventSeen) && !sendNow) { @@ -221,25 +241,40 @@ public void onNext(Vtgate.VStreamResponse response) { LOGGER.debug("Processing multi-response transaction: number of responses is {}", numResponses); } // If there is a heartbeat event we do not want to skip (we want to send the heartbeat) - if (newVgtid == null && !heartbeatReceived) { + // newVgtid may be null with chunking since events are sent eagerly (not buffered up to VGTID & COMMIT) + if (!isChunkingEnabled && newVgtid == null && !heartbeatReceived) { LOGGER.warn("Skipping because no vgtid is found in buffered event types: {}", bufferedEvents.stream().map(VEvent::getType).map(Objects::toString).collect(Collectors.joining(", "))); reset(); return; } - // Send the buffered events that belong to the same transaction. + // If chunking is enabled, use the previous (trailing) VGTID for all messages in the transaction + // since we can't know what the committed VGTID of this transaction is during the earlier chunks + Vgtid messageVgtid; + if (isChunkingEnabled) { + messageVgtid = lastVgtid; + } + // If chunking is disabled, set all the messages' VGTID to the committed VGTID for this transaction + // (since we buffered the entire transaction in memory before sending, we know the committed VGTID) + else { + messageVgtid = newVgtid; + } + // If a newVgtid has been read (a new VGTID event received) so it is not null, then set the + // lastVgtid to be equal to this newValue ahead of the next response + if (newVgtid != null) { + lastVgtid = newVgtid; + } + + // Send the buffered events that belong to the same transaction (note: it may be only part of the + // transaction if chunking is enabled). try { - int rowEventSeen = 0; for (int i = 0; i < bufferedEvents.size(); i++) { Binlogdata.VEvent event = bufferedEvents.get(i); - if (event.getType() == Binlogdata.VEventType.ROW) { - rowEventSeen++; - } if (isInVStreamCopy && event.getType() == Binlogdata.VEventType.COPY_COMPLETED) { isInVStreamCopy = false; } - messageDecoder.processMessage(bufferedEvents.get(i), processor, newVgtid, isInVStreamCopy); + messageDecoder.processMessage(bufferedEvents.get(i), processor, messageVgtid, isInVStreamCopy); } } catch (InterruptedException e) { @@ -272,7 +307,6 @@ private void reset() { newVgtid = null; beginEventSeen = false; commitEventSeen = false; - numOfRowEvents = 0; numResponses = 0; } @@ -291,6 +325,7 @@ private void setError(String msg) { Vtgate.VStreamFlags.Builder vStreamFlagsBuilder = Vtgate.VStreamFlags.newBuilder() .setStopOnReshard(config.getStopOnReshard()) .setExcludeKeyspaceFromTableName(config.getExcludeKeyspaceFromTableName()) + .setTransactionChunkSize(config.getTransactionChunkSizeBytes()) .setHeartbeatInterval(getHeartbeatSeconds()) .setStreamKeyspaceHeartbeats(config.getStreamKeyspaceHeartbeats()); diff --git a/src/test/java/io/debezium/connector/vitess/AbstractVitessConnectorTest.java b/src/test/java/io/debezium/connector/vitess/AbstractVitessConnectorTest.java index ccaf5d60..08f775fa 100644 --- a/src/test/java/io/debezium/connector/vitess/AbstractVitessConnectorTest.java +++ b/src/test/java/io/debezium/connector/vitess/AbstractVitessConnectorTest.java @@ -15,6 +15,7 @@ import static org.junit.jupiter.api.Assertions.fail; import java.lang.management.ManagementFactory; +import java.math.BigDecimal; import java.nio.ByteBuffer; import java.text.ParseException; import java.text.SimpleDateFormat; @@ -1112,4 +1113,41 @@ private void assertSchema(Struct content) { VerifyRecord.assertConnectSchemasAreEqual(field.name(), field.schema(), this.schema); } } + + protected static class TransactionMetadata { + final Long epoch; + final BigDecimal rank; + final Long totalOrder; + + TransactionMetadata(Long epoch, BigDecimal rank, Long totalOrder) { + this.epoch = epoch; + this.rank = rank; + this.totalOrder = totalOrder; + } + } + + protected TransactionMetadata extractTransactionMetadata(SourceRecord record) { + Struct value = (Struct) record.value(); + Struct transaction = value.getStruct("transaction"); + + Long epoch = transaction.getInt64("transaction_epoch"); + BigDecimal rank = (BigDecimal) transaction.get("transaction_rank"); + Long totalOrder = transaction.getInt64("total_order"); + + return new TransactionMetadata(epoch, rank, totalOrder); + } + + protected int compareTransactionMetadata(TransactionMetadata m1, TransactionMetadata m2) { + int epochCompare = Long.compare(m1.epoch, m2.epoch); + if (epochCompare != 0) { + return epochCompare; + } + + int rankCompare = m1.rank.compareTo(m2.rank); + if (rankCompare != 0) { + return rankCompare; + } + + return Long.compare(m1.totalOrder, m2.totalOrder); + } } diff --git a/src/test/java/io/debezium/connector/vitess/VitessConnectorConfigTest.java b/src/test/java/io/debezium/connector/vitess/VitessConnectorConfigTest.java index 5bfafb48..bd36848f 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessConnectorConfigTest.java +++ b/src/test/java/io/debezium/connector/vitess/VitessConnectorConfigTest.java @@ -266,4 +266,48 @@ public void shouldFilterTablesToCopyWithExactTableName() { assertThat(tablesToCopy).containsExactly("numeric_table"); } + @Test + public void shouldDefaultTransactionChunkSizeToZero() { + Configuration configuration = TestHelper.defaultConfig().build(); + VitessConnectorConfig connectorConfig = new VitessConnectorConfig(configuration); + assertThat(connectorConfig.getTransactionChunkSizeBytes()).isEqualTo(0L); + } + + @Test + public void shouldSetTransactionChunkSizeToPositiveValue() { + Configuration configuration = TestHelper.defaultConfig() + .with(VitessConnectorConfig.TRANSACTION_CHUNK_SIZE_BYTES, 134217728L) + .build(); + VitessConnectorConfig connectorConfig = new VitessConnectorConfig(configuration); + assertThat(connectorConfig.getTransactionChunkSizeBytes()).isEqualTo(134217728L); + } + + @Test + public void shouldNegativeTransactionChunkSizeFailValidation() { + Configuration configuration = TestHelper.defaultConfig() + .with(VitessConnectorConfig.TRANSACTION_CHUNK_SIZE_BYTES, -1L) + .build(); + VitessConnectorConfig connectorConfig = new VitessConnectorConfig(configuration); + List inputs = new ArrayList<>(); + Consumer printConsumer = (input) -> { + inputs.add(input); + }; + connectorConfig.validateAndRecord(List.of(VitessConnectorConfig.TRANSACTION_CHUNK_SIZE_BYTES), printConsumer); + assertThat(inputs.size()).isEqualTo(1); + } + + @Test + public void shouldZeroTransactionChunkSizePassValidation() { + Configuration configuration = TestHelper.defaultConfig() + .with(VitessConnectorConfig.TRANSACTION_CHUNK_SIZE_BYTES, 0L) + .build(); + VitessConnectorConfig connectorConfig = new VitessConnectorConfig(configuration); + List inputs = new ArrayList<>(); + Consumer printConsumer = (input) -> { + inputs.add(input); + }; + connectorConfig.validateAndRecord(List.of(VitessConnectorConfig.TRANSACTION_CHUNK_SIZE_BYTES), printConsumer); + assertThat(inputs.size()).isEqualTo(0); + } + } diff --git a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java index 7a2fcf39..865f1307 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java +++ b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java @@ -41,6 +41,7 @@ import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.LongStream; import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigDef; @@ -1824,6 +1825,165 @@ public void shouldSanitizeDecimalValue() throws Exception { assertInsert(INSERT_NUMERIC_TYPES_STMT, fields, TestHelper.PK_FIELD); } + @Test + public void shouldChunkLargeTransactionsAcrossMultipleShards() throws Exception { + final LogInterceptor logInterceptor = new LogInterceptor(VitessReplicationConnection.class); + TestHelper.executeDDL("vitess_create_tables.ddl", TEST_SHARDED_KEYSPACE); + TestHelper.applyVSchema("vitess_vschema.json"); + startConnector(config -> config + .with(VitessConnectorConfig.TRANSACTION_CHUNK_SIZE_BYTES, 100000L) + .with(VitessConnectorConfig.PROVIDE_TRANSACTION_METADATA, true), + true, + "-80,80-"); + assertConnectorIsRunning(); + + int transactionEvents = 4; + int dataEvents = 50; + int totalEvents = transactionEvents + dataEvents; + consumer = testConsumer(totalEvents); + + String largeString = "x".repeat(10000); + StringBuilder insertStatement = new StringBuilder("INSERT INTO string_table (text_col, mediumtext_col, longtext_col) VALUES "); + for (int i = 0; i < dataEvents; i++) { + if (i > 0) { + insertStatement.append(", "); + } + insertStatement.append("('").append(largeString).append("', '") + .append(largeString).append("', '").append(largeString).append("')"); + } + + executeAndWait(insertStatement.toString(), TEST_SHARDED_KEYSPACE); + + String firstShard = null; + String secondShard = null; + int shardSwitchIndex = -1; + Map> shardToEventIndices = new HashMap<>(); + Map> shardToTotalOrders = new HashMap<>(); + int dataRecordCount = 0; + + for (int i = 0; i < totalEvents; i++) { + SourceRecord record = consumer.remove(); + + if (record.topic().endsWith(".transaction")) { + continue; + } + + Struct value = (Struct) record.value(); + Struct source = value.getStruct("source"); + String currentShard = source.getString("shard"); + + Struct transaction = value.getStruct("transaction"); + long totalOrder = transaction.getInt64("total_order"); + + if (firstShard == null) { + firstShard = currentShard; + } + else if (secondShard == null && !currentShard.equals(firstShard)) { + secondShard = currentShard; + shardSwitchIndex = dataRecordCount; + } + + shardToEventIndices.computeIfAbsent(currentShard, k -> new ArrayList<>()).add(dataRecordCount); + shardToTotalOrders.computeIfAbsent(currentShard, k -> new ArrayList<>()).add(totalOrder); + + dataRecordCount++; + } + + assertThat(dataRecordCount).isEqualTo(dataEvents); + assertThat(shardSwitchIndex).isGreaterThan(0).isLessThan(dataEvents); + + List firstShardIndices = shardToEventIndices.get(firstShard); + List secondShardIndices = shardToEventIndices.get(secondShard); + + assertThat(firstShardIndices).containsExactlyElementsOf( + IntStream.range(0, firstShardIndices.size()).boxed().toList()); + assertThat(secondShardIndices).containsExactlyElementsOf( + IntStream.range(shardSwitchIndex, shardSwitchIndex + secondShardIndices.size()).boxed().toList()); + + List firstShardOrders = shardToTotalOrders.get(firstShard); + List secondShardOrders = shardToTotalOrders.get(secondShard); + + assertThat(firstShardOrders).containsExactlyElementsOf( + LongStream.rangeClosed(1, firstShardOrders.size()).boxed().toList()); + assertThat(secondShardOrders).containsExactlyElementsOf( + LongStream.rangeClosed(1, secondShardOrders.size()).boxed().toList()); + + assertThat(logInterceptor.containsMessage("Transaction chunking")).isTrue(); + } + + @Test + public void shouldMaintainTransactionOrderingWhenEnablingChunking() throws Exception { + TestHelper.executeDDL("vitess_create_tables.ddl", TEST_UNSHARDED_KEYSPACE); + + startConnector(config -> config + .with(VitessConnectorConfig.KEYSPACE, TEST_UNSHARDED_KEYSPACE) + .with(CommonConnectorConfig.TRANSACTION_METADATA_FACTORY, VitessOrderedTransactionMetadataFactory.class) + .with(CommonConnectorConfig.PROVIDE_TRANSACTION_METADATA, true), + false); + assertConnectorIsRunning(); + + int txMetadataEventCount = 2; + int tx1DataEvents = 2; + int tx1TotalEvents = tx1DataEvents + txMetadataEventCount; + consumer = testConsumer(tx1TotalEvents); + executeAndWait("INSERT INTO numeric_table (tinyint_col) VALUES (1), (2)", TEST_UNSHARDED_KEYSPACE); + + List tx1Metadata = new ArrayList<>(); + while (tx1Metadata.size() < tx1DataEvents) { + SourceRecord record = consumer.remove(); + if (record.topic().endsWith(".transaction")) { + continue; + } + tx1Metadata.add(extractTransactionMetadata(record)); + } + + stopConnector(); + assertConnectorNotRunning(); + + startConnector(config -> config + .with(VitessConnectorConfig.KEYSPACE, TEST_UNSHARDED_KEYSPACE) + .with(VitessConnectorConfig.TRANSACTION_CHUNK_SIZE_BYTES, 100000L) + .with(CommonConnectorConfig.TRANSACTION_METADATA_FACTORY, VitessOrderedTransactionMetadataFactory.class) + .with(CommonConnectorConfig.PROVIDE_TRANSACTION_METADATA, true), + false); + assertConnectorIsRunning(); + + int tx2DataEvents = 50; + int tx2TotalEvents = tx2DataEvents + txMetadataEventCount; + consumer = testConsumer(tx2TotalEvents); + + String largeString = "x".repeat(10000); + StringBuilder insertStatement = new StringBuilder("INSERT INTO string_table (text_col, mediumtext_col, longtext_col) VALUES "); + for (int i = 0; i < tx2DataEvents; i++) { + if (i > 0) { + insertStatement.append(", "); + } + insertStatement.append("('").append(largeString).append("', '") + .append(largeString).append("', '").append(largeString).append("')"); + } + executeAndWait(insertStatement.toString(), TEST_UNSHARDED_KEYSPACE); + + List tx2Metadata = new ArrayList<>(); + while (tx2Metadata.size() < tx2DataEvents) { + SourceRecord record = consumer.remove(); + if (record.topic().endsWith(".transaction")) { + continue; + } + tx2Metadata.add(extractTransactionMetadata(record)); + } + + for (TransactionMetadata tx1Meta : tx1Metadata) { + for (TransactionMetadata tx2Meta : tx2Metadata) { + int comparison = compareTransactionMetadata(tx1Meta, tx2Meta); + assertThat(comparison) + .as("tx1 event (epoch=%s, rank=%s, order=%s) should be ordered before tx2 event (epoch=%s, rank=%s, order=%s)", + tx1Meta.epoch, tx1Meta.rank, tx1Meta.totalOrder, + tx2Meta.epoch, tx2Meta.rank, tx2Meta.totalOrder) + .isLessThan(0); + } + } + } + @Test @FixFor("DBZ-3668") public void shouldOutputRecordsInCloudEventsFormat() throws Exception {