Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,19 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue
.withDescription("Control the interval between periodic gPRC keepalive pings for VStream." +
" Defaults to Long.MAX_VALUE (disabled).");

public static final Field TRANSACTION_CHUNK_SIZE_BYTES = Field.create(VITESS_CONFIG_GROUP_PREFIX + "transaction.chunk.size.bytes")
.withDisplayName("Transaction chunk size (bytes)")
.withType(Type.LONG)
.withDefault(0L)
.withWidth(Width.SHORT)
.withImportance(ConfigDef.Importance.HIGH)
.withValidation(VitessConnectorConfig::validateTransactionChunkSize)
.withDescription("Enables transaction chunking on the VTGate VStream when set to a positive value (in bytes). " +
"When a transaction exceeds this threshold, both the VTGate and Debezium will process the transaction in chunks " +
"rather than buffering the entire transaction in memory. This is recommended to prevent out-of-memory errors " +
"when processing large transactions. A value of 0 (default) disables chunking. " +
"See https://vitess.io/docs/24.0/reference/vreplication/vstream/#transactionchunksize for more details.");

public static final Field GRPC_HEADERS = Field.create(VITESS_CONFIG_GROUP_PREFIX + "grpc.headers")
.withDisplayName("VStream gRPC headers")
.withType(Type.STRING)
Expand Down Expand Up @@ -512,6 +525,20 @@ private static int validateLoadBalancingPolicy(Configuration config, Field field
return 0;
}

private static int validateTransactionChunkSize(Configuration config, Field field, ValidationOutput problems) {
long chunkSize = config.getLong(TRANSACTION_CHUNK_SIZE_BYTES, 0L);
if (chunkSize < 0) {
problems.accept(TRANSACTION_CHUNK_SIZE_BYTES, chunkSize, "Transaction chunk size must be >= 0");
return 1;
}
if (chunkSize == 0) {
LOGGER.warn("Transaction chunk size is disabled (set to 0). Large transactions may cause out-of-memory errors. " +
"Consider setting {} to a value like 134217728 (128MB) to enable transaction chunking.",
TRANSACTION_CHUNK_SIZE_BYTES.name());
}
return 0;
}

public static final Field SOURCE_INFO_STRUCT_MAKER = CommonConnectorConfig.SOURCE_INFO_STRUCT_MAKER
.withDefault(VitessSourceInfoStructMaker.class.getName());

Expand All @@ -530,6 +557,7 @@ private static int validateLoadBalancingPolicy(Configuration config, Field field
TABLET_TYPE,
STOP_ON_RESHARD_FLAG,
KEEPALIVE_INTERVAL_MS,
TRANSACTION_CHUNK_SIZE_BYTES,
GRPC_HEADERS,
GRPC_MAX_INBOUND_MESSAGE_SIZE,
BINARY_HANDLING_MODE,
Expand Down Expand Up @@ -743,6 +771,10 @@ public Duration getKeepaliveInterval() {
return getConfig().getDuration(KEEPALIVE_INTERVAL_MS, ChronoUnit.MILLIS);
}

public long getTransactionChunkSizeBytes() {
return getConfig().getLong(TRANSACTION_CHUNK_SIZE_BYTES, 0L);
}

public Map<String, String> getGrpcHeaders() {
String grpcHeaders = getConfig().getString(GRPC_HEADERS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,13 @@ public void startStreaming(
StreamObserver<Vtgate.VStreamResponse> responseObserver = new StreamObserver<Vtgate.VStreamResponse>() {
private List<VEvent> bufferedEvents = new ArrayList<>();
private Vgtid newVgtid;
private Vgtid lastVgtid = vgtid;
private boolean beginEventSeen;
private boolean commitEventSeen;
private int numOfRowEvents;
private int numResponses;
private final boolean isChunkingEnabled = config.getTransactionChunkSizeBytes() > 0;
private boolean inTransaction = false;
private boolean endedChunkedTransaction = false;
private boolean isInVStreamCopy = vgtid.willTriggerVStreamCopy();

@Override
Expand All @@ -134,7 +137,6 @@ public void onNext(Vtgate.VStreamResponse response) {
LOGGER.debug("VEvent: {}", event);
switch (event.getType()) {
case ROW:
numOfRowEvents++;
break;
case VGTID:
// We always use the latest VGTID if any.
Expand All @@ -155,7 +157,13 @@ public void onNext(Vtgate.VStreamResponse response) {
break;
case BEGIN:
// We should only see BEGIN before seeing COMMIT.
if (commitEventSeen) {
inTransaction = true;
if (commitEventSeen && isChunkingEnabled) {
LOGGER.info("Transaction chunking: received BEGIN after COMMIT, " +
"indicating a chunked transaction has been committed, and a new transaction" +
"is beginning within the same VStreamResponse");
}
else if (commitEventSeen) {
String msg = "Received BEGIN event after receiving COMMIT event";
setError(msg);
return;
Expand All @@ -179,11 +187,16 @@ public void onNext(Vtgate.VStreamResponse response) {
break;
case COMMIT:
// We should only see COMMIT after seeing BEGIN.
if (!beginEventSeen) {
if (!beginEventSeen && inTransaction && isChunkingEnabled) {
LOGGER.info("Transaction chunking: received COMMIT event in separate chunk");
endedChunkedTransaction = true;
}
else if (!beginEventSeen) {
String msg = "Received COMMIT event before receiving BEGIN event";
setError(msg);
return;
}
inTransaction = false;
if (commitEventSeen) {
String msg = "Received duplicate COMMIT events";
setError(msg);
Expand Down Expand Up @@ -211,6 +224,13 @@ public void onNext(Vtgate.VStreamResponse response) {

numResponses++;

// If chunking is enabled, and we are either in a chunked transaction (possibly split across
// several responses) or we just ended a chunked transaction, then send the events now.
if (isChunkingEnabled && (inTransaction || endedChunkedTransaction)) {
LOGGER.info("Transaction chunking: received {} events during this chunk, " +
"transaction chunk size bytes {}", bufferedEvents.size(), transactionChunkSizeBytes);
sendNow = true;
}
// We only proceed when we receive a complete transaction after seeing both BEGIN and COMMIT events,
// OR if sendNow flag is true (meaning we should process buffered events immediately).
if ((!beginEventSeen || !commitEventSeen) && !sendNow) {
Expand All @@ -221,25 +241,40 @@ public void onNext(Vtgate.VStreamResponse response) {
LOGGER.debug("Processing multi-response transaction: number of responses is {}", numResponses);
}
// If there is a heartbeat event we do not want to skip (we want to send the heartbeat)
if (newVgtid == null && !heartbeatReceived) {
// newVgtid may be null with chunking since events are sent eagerly (not buffered up to VGTID & COMMIT)
if (!isChunkingEnabled && newVgtid == null && !heartbeatReceived) {
LOGGER.warn("Skipping because no vgtid is found in buffered event types: {}",
bufferedEvents.stream().map(VEvent::getType).map(Objects::toString).collect(Collectors.joining(", ")));
reset();
return;
}

// Send the buffered events that belong to the same transaction.
// If chunking is enabled, use the previous (trailing) VGTID for all messages in the transaction
// since we can't know what the committed VGTID of this transaction is during the earlier chunks
Vgtid messageVgtid;
if (isChunkingEnabled) {
messageVgtid = lastVgtid;
}
// If chunking is disabled, set all the messages' VGTID to the committed VGTID for this transaction
// (since we buffered the entire transaction in memory before sending, we know the committed VGTID)
else {
messageVgtid = newVgtid;
}
// If a newVgtid has been read (a new VGTID event received) so it is not null, then set the
// lastVgtid to be equal to this newValue ahead of the next response
if (newVgtid != null) {
lastVgtid = newVgtid;
}

// Send the buffered events that belong to the same transaction (note: it may be only part of the
// transaction if chunking is enabled).
try {
int rowEventSeen = 0;
for (int i = 0; i < bufferedEvents.size(); i++) {
Binlogdata.VEvent event = bufferedEvents.get(i);
if (event.getType() == Binlogdata.VEventType.ROW) {
rowEventSeen++;
}
if (isInVStreamCopy && event.getType() == Binlogdata.VEventType.COPY_COMPLETED) {
isInVStreamCopy = false;
}
messageDecoder.processMessage(bufferedEvents.get(i), processor, newVgtid, isInVStreamCopy);
messageDecoder.processMessage(bufferedEvents.get(i), processor, messageVgtid, isInVStreamCopy);
}
}
catch (InterruptedException e) {
Expand Down Expand Up @@ -272,7 +307,6 @@ private void reset() {
newVgtid = null;
beginEventSeen = false;
commitEventSeen = false;
numOfRowEvents = 0;
numResponses = 0;
}

Expand All @@ -291,6 +325,7 @@ private void setError(String msg) {
Vtgate.VStreamFlags.Builder vStreamFlagsBuilder = Vtgate.VStreamFlags.newBuilder()
.setStopOnReshard(config.getStopOnReshard())
.setExcludeKeyspaceFromTableName(config.getExcludeKeyspaceFromTableName())
.setTransactionChunkSize(config.getTransactionChunkSizeBytes())
.setHeartbeatInterval(getHeartbeatSeconds())
.setStreamKeyspaceHeartbeats(config.getStreamKeyspaceHeartbeats());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import static org.junit.jupiter.api.Assertions.fail;

import java.lang.management.ManagementFactory;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.text.ParseException;
import java.text.SimpleDateFormat;
Expand Down Expand Up @@ -1112,4 +1113,41 @@ private void assertSchema(Struct content) {
VerifyRecord.assertConnectSchemasAreEqual(field.name(), field.schema(), this.schema);
}
}

protected static class TransactionMetadata {
final Long epoch;
final BigDecimal rank;
final Long totalOrder;

TransactionMetadata(Long epoch, BigDecimal rank, Long totalOrder) {
this.epoch = epoch;
this.rank = rank;
this.totalOrder = totalOrder;
}
}

protected TransactionMetadata extractTransactionMetadata(SourceRecord record) {
Struct value = (Struct) record.value();
Struct transaction = value.getStruct("transaction");

Long epoch = transaction.getInt64("transaction_epoch");
BigDecimal rank = (BigDecimal) transaction.get("transaction_rank");
Long totalOrder = transaction.getInt64("total_order");

return new TransactionMetadata(epoch, rank, totalOrder);
}

protected int compareTransactionMetadata(TransactionMetadata m1, TransactionMetadata m2) {
int epochCompare = Long.compare(m1.epoch, m2.epoch);
if (epochCompare != 0) {
return epochCompare;
}

int rankCompare = m1.rank.compareTo(m2.rank);
if (rankCompare != 0) {
return rankCompare;
}

return Long.compare(m1.totalOrder, m2.totalOrder);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -266,4 +266,48 @@ public void shouldFilterTablesToCopyWithExactTableName() {
assertThat(tablesToCopy).containsExactly("numeric_table");
}

@Test
public void shouldDefaultTransactionChunkSizeToZero() {
Configuration configuration = TestHelper.defaultConfig().build();
VitessConnectorConfig connectorConfig = new VitessConnectorConfig(configuration);
assertThat(connectorConfig.getTransactionChunkSizeBytes()).isEqualTo(0L);
}

@Test
public void shouldSetTransactionChunkSizeToPositiveValue() {
Configuration configuration = TestHelper.defaultConfig()
.with(VitessConnectorConfig.TRANSACTION_CHUNK_SIZE_BYTES, 134217728L)
.build();
VitessConnectorConfig connectorConfig = new VitessConnectorConfig(configuration);
assertThat(connectorConfig.getTransactionChunkSizeBytes()).isEqualTo(134217728L);
}

@Test
public void shouldNegativeTransactionChunkSizeFailValidation() {
Configuration configuration = TestHelper.defaultConfig()
.with(VitessConnectorConfig.TRANSACTION_CHUNK_SIZE_BYTES, -1L)
.build();
VitessConnectorConfig connectorConfig = new VitessConnectorConfig(configuration);
List<String> inputs = new ArrayList<>();
Consumer<String> printConsumer = (input) -> {
inputs.add(input);
};
connectorConfig.validateAndRecord(List.of(VitessConnectorConfig.TRANSACTION_CHUNK_SIZE_BYTES), printConsumer);
assertThat(inputs.size()).isEqualTo(1);
}

@Test
public void shouldZeroTransactionChunkSizePassValidation() {
Configuration configuration = TestHelper.defaultConfig()
.with(VitessConnectorConfig.TRANSACTION_CHUNK_SIZE_BYTES, 0L)
.build();
VitessConnectorConfig connectorConfig = new VitessConnectorConfig(configuration);
List<String> inputs = new ArrayList<>();
Consumer<String> printConsumer = (input) -> {
inputs.add(input);
};
connectorConfig.validateAndRecord(List.of(VitessConnectorConfig.TRANSACTION_CHUNK_SIZE_BYTES), printConsumer);
assertThat(inputs.size()).isEqualTo(0);
}

}
Loading
Loading