Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,18 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue
.withValidation(VitessConnectorConfig::validateLoadBalancingPolicy)
.withDescription("Specify the default load balancing policy used to connect to Vitess, e.g., 'pick_first', 'round_robin'");

public static final Field MAX_STREAM_AGE_SECONDS = Field.create(VITESS_CONFIG_GROUP_PREFIX + "max.stream.age.seconds")
.withDisplayName("VStream max stream age (seconds)")
.withType(Type.INT)
.withWidth(Width.SHORT)
.withDefault(0)
.withImportance(ConfigDef.Importance.MEDIUM)
.withValidation(Field::isNonNegativeInteger)
.withDescription("Maximum duration (in seconds) a VStream runs before the server terminates it with UNAVAILABLE. "
+ "The client reconnects automatically. A jitter of +/-10% is added server-side to spread out reconnections. "
+ "0 means no maximum age (disabled). "
+ "This enables periodic rebalancing for ORCA-aware gRPC load balancing.");

public static final Field INCLUDE_UNKNOWN_DATATYPES = Field.create("include.unknown.datatypes")
.withDisplayName("Include unknown datatypes")
.withType(Type.BOOLEAN)
Expand Down Expand Up @@ -554,7 +566,8 @@ private static int validateLoadBalancingPolicy(Configuration config, Field field
CONNECTOR_GENERATION,
STREAM_KEYSPACE_HEARTBEATS,
EXCLUDE_KEYSPACE_FROM_TABLE_NAME,
EXCLUDE_EMPTY_SHARDS)
EXCLUDE_EMPTY_SHARDS,
MAX_STREAM_AGE_SECONDS)
.events(
INCLUDE_UNKNOWN_DATATYPES,
SOURCE_INFO_STRUCT_MAKER)
Expand Down Expand Up @@ -787,6 +800,10 @@ public String getGrpcDefaultLoadBalancingPolicy() {
return getConfig().getString(GRPC_DEFAULT_LOAD_BALANCING_POLICY);
}

public int getMaxStreamAgeSeconds() {
return getConfig().getInteger(MAX_STREAM_AGE_SECONDS);
}

public boolean includeUnknownDatatypes() {
return getConfig().getBoolean(INCLUDE_UNKNOWN_DATATYPES);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,21 @@ public void execute(ChangeEventSourceContext context, VitessPartition partition,

try {
AtomicReference<Throwable> error = new AtomicReference<>();
replicationConnection.startStreaming(
offsetContext.getRestartVgtid(), newReplicationMessageProcessor(partition, offsetContext), error);

while (context.isRunning() && error.get() == null) {
pauseNoMessage.sleepWhen(true);
replicationConnection.startStreaming(
offsetContext.getRestartVgtid(), newReplicationMessageProcessor(partition, offsetContext), error);

while (context.isRunning() && error.get() == null && !replicationConnection.isStreamRestartNeeded()) {
pauseNoMessage.sleepWhen(true);
}

if (replicationConnection.isStreamRestartNeeded()) {
replicationConnection.setStreamRestartNeeded(false);
continue;
}
}

if (error.get() != null) {
LOGGER.error("Error during streaming", error.get());
throw error.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,17 @@ public interface ReplicationConnection extends AutoCloseable {
*/
void startStreaming(
Vgtid vgtid, ReplicationMessageProcessor processor, AtomicReference<Throwable> error);

/**
* Check if the stream needs to be restarted due to server-initiated termination
* (e.g., max stream age exceeded).
*
* @return true if the stream should be restarted without bubbling up the error
*/
boolean isStreamRestartNeeded();

/**
* Set or clear the stream restart flag.
*/
void setStreamRestartNeeded(boolean needed);
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class VitessReplicationConnection implements ReplicationConnection {
private final VitessConnectorConfig config;
// Channel closing is invoked from the change-event-source-coordinator thread
private final AtomicReference<ManagedChannel> managedChannel = new AtomicReference<>();
private final AtomicReference<Boolean> streamRestartNeeded = new AtomicReference<>(false);

public VitessReplicationConnection(VitessConnectorConfig config, VitessDatabaseSchema schema) {
this.messageDecoder = new VStreamOutputMessageDecoder(schema);
Expand Down Expand Up @@ -94,13 +95,30 @@ public Vtgate.ExecuteResponse execute(String sqlStatement, String shard) {
return newBlockingStub(channel).execute(request);
}

@Override
public boolean isStreamRestartNeeded() {
return streamRestartNeeded.get();
}

@Override
public void setStreamRestartNeeded(boolean needed) {
streamRestartNeeded.set(needed);
}

@Override
public void startStreaming(
Vgtid vgtid, ReplicationMessageProcessor processor, AtomicReference<Throwable> error) {
Objects.requireNonNull(vgtid);

ManagedChannel channel = newChannel();
managedChannel.compareAndSet(null, channel);
ManagedChannel channel = managedChannel.get();
if (channel == null || channel.isShutdown() || channel.isTerminated()) {
LOGGER.info("Creating new gRPC channel for VStream");
channel = newChannel();
managedChannel.set(channel);
}
else {
LOGGER.debug("Reusing existing gRPC channel for VStream");
}

VitessGrpc.VitessStub stub = newStub(channel);

Expand Down Expand Up @@ -255,9 +273,16 @@ public void onNext(Vtgate.VStreamResponse response) {

@Override
public void onError(Throwable t) {
LOGGER.error("VStream streaming onError. Status: {}", Status.fromThrowable(t), t);
// Only propagate the first error
error.compareAndSet(null, t);
Status status = Status.fromThrowable(t);
if (shouldRestartStream(status)) {
LOGGER.info("VStream terminated, restarting stream on existing gRPC channel (preserves ORCA metrics): {}", status);
streamRestartNeeded.set(true);
}
else {
LOGGER.error("VStream streaming onError. Status: {}", Status.fromThrowable(t), t);
// Only propagate the first error
error.compareAndSet(null, t);
}
reset();
}

Expand Down Expand Up @@ -292,7 +317,8 @@ private void setError(String msg) {
.setStopOnReshard(config.getStopOnReshard())
.setExcludeKeyspaceFromTableName(config.getExcludeKeyspaceFromTableName())
.setHeartbeatInterval(getHeartbeatSeconds())
.setStreamKeyspaceHeartbeats(config.getStreamKeyspaceHeartbeats());
.setStreamKeyspaceHeartbeats(config.getStreamKeyspaceHeartbeats())
.setMaxStreamAgeSeconds(config.getMaxStreamAgeSeconds());

if (!Strings.isNullOrEmpty(config.getConfig().getString(CommonConnectorConfig.SNAPSHOT_MODE_TABLES))) {
final List<String> allTables = new VitessMetadata(config).getTables();
Expand Down Expand Up @@ -360,6 +386,20 @@ private <T extends AbstractStub<T>> T withCredentials(T stub) {
return stub;
}

/**
* Determines if the stream should be restarted (reopened) on the existing gRPC channel
* rather than propagating the error. This allows the channel to be preserved, which
* maintains ORCA metrics for weighted load balancing.
*/
private static boolean shouldRestartStream(Status status) {
if (status.getCode() == Status.Code.UNAVAILABLE
&& status.getDescription() != null
&& status.getDescription().contains("vstream exceeded maximum age")) {
return true;
}
return false;
}

private ManagedChannel newChannel() {
ManagedChannel channel = ManagedChannelBuilder.forAddress(config.getVtgateHost(), config.getVtgatePort())
.defaultLoadBalancingPolicy(config.getGrpcDefaultLoadBalancingPolicy())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,4 +282,34 @@ public void shouldGetConnectorGenerationDefaultValue() {
assertThat(connectorConfig.getConnectorGeneration()).isEqualTo(0);
}

@Test
public void shouldGetMaxStreamAgeSeconds() {
Configuration configuration = TestHelper.defaultConfig()
.with(VitessConnectorConfig.MAX_STREAM_AGE_SECONDS, 3600)
.build();
VitessConnectorConfig connectorConfig = new VitessConnectorConfig(configuration);
assertThat(connectorConfig.getMaxStreamAgeSeconds()).isEqualTo(3600);
}

@Test
public void shouldGetMaxStreamAgeSecondsDefaultValue() {
Configuration configuration = TestHelper.defaultConfig().build();
VitessConnectorConfig connectorConfig = new VitessConnectorConfig(configuration);
assertThat(connectorConfig.getMaxStreamAgeSeconds()).isEqualTo(0);
}

@Test
public void shouldNegativeMaxStreamAgeSecondsFailValidation() {
Configuration configuration = TestHelper.defaultConfig()
.with(VitessConnectorConfig.MAX_STREAM_AGE_SECONDS, -1)
.build();
VitessConnectorConfig connectorConfig = new VitessConnectorConfig(configuration);
List<String> inputs = new ArrayList<>();
Consumer<String> printConsumer = (input) -> {
inputs.add(input);
};
connectorConfig.validateAndRecord(List.of(VitessConnectorConfig.MAX_STREAM_AGE_SECONDS), printConsumer);
assertThat(inputs.size()).isEqualTo(1);
}

}
26 changes: 26 additions & 0 deletions src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -2647,6 +2647,32 @@ private void testOffsetStorage(boolean offsetStoragePerTask) throws Exception {
Testing.print("*** Done with verifying without offset.storage.per.task");
}

@Test
public void shouldRestartStreamWhenMaxStreamAgeExceeded() throws Exception {
TestHelper.executeDDL("vitess_create_tables.ddl");
final LogInterceptor replicationLogInterceptor = new LogInterceptor(VitessReplicationConnection.class);

int maxStreamAgeSeconds = 5;
startConnector(config -> config
.with(VitessConnectorConfig.MAX_STREAM_AGE_SECONDS, maxStreamAgeSeconds),
false);
assertConnectorIsRunning();

int expectedRecordsCount = 1;
consumer = testConsumer(expectedRecordsCount);
consumer.expects(expectedRecordsCount);
assertInsert(INSERT_NUMERIC_TYPES_STMT, schemasAndValuesForNumericTypes(), TestHelper.PK_FIELD);

int waitSeconds = maxStreamAgeSeconds * 2 + 10;
Awaitility.await()
.atMost(Duration.ofSeconds(waitSeconds))
.pollInterval(Duration.ofSeconds(1))
.until(() -> replicationLogInterceptor.containsMessage(
"VStream terminated, restarting stream on existing gRPC channel"));

assertConnectorIsRunning();
}

private void waitForGtidAcquiring(final LogInterceptor logInterceptor) {
// The inserts must happen only after GTID to stream from is obtained
Awaitility.await().atMost(Duration.ofSeconds(TestHelper.waitTimeForRecords()))
Expand Down
Loading