Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/main/java/io/debezium/connector/vitess/TablePrimaryKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,12 @@ public TableLastPrimaryKey(String tableName, Query.QueryResult lastPrimaryKey) {

@JsonIgnore
public Binlogdata.TableLastPK getRawTableLastPrimaryKey() {
return Binlogdata.TableLastPK.newBuilder()
.setTableName(tableName)
.setLastpk(lastPrimaryKey.getRawQueryResult())
.build();
Binlogdata.TableLastPK.Builder builder = Binlogdata.TableLastPK.newBuilder()
.setTableName(tableName);
if (lastPrimaryKey != null) {
builder.setLastpk(lastPrimaryKey.getRawQueryResult());
}
return builder.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ public void onNext(Vtgate.VStreamResponse response) {
String msg = "Received duplicate BEGIN events";
// During a copy operation, we receive the duplicate event once when no record is copied.
String eventTypes = bufferedEvents.stream().map(VEvent::getType).map(Objects::toString).collect(Collectors.joining(", "));
if (eventTypes.equals("BEGIN, FIELD") || eventTypes.equals("BEGIN, FIELD, VGTID") || eventTypes.equals("COPY_COMPLETED, BEGIN, FIELD")) {
if (eventTypes.equals("BEGIN, FIELD") || eventTypes.equals("BEGIN, FIELD, VGTID") || eventTypes.equals("COPY_COMPLETED, BEGIN, FIELD")
|| eventTypes.equals("COPY_COMPLETED, BEGIN, FIELD, VGTID")) {
msg += String.format(" during a copy operation. No harm to skip the buffered events. Buffered event types: %s",
eventTypes);
LOGGER.info(msg);
Expand Down Expand Up @@ -367,6 +368,9 @@ private ManagedChannel newChannel() {
.maxInboundMessageSize(config.getGrpcMaxInboundMessageSize())
.keepAliveTime(config.getKeepaliveInterval().toMillis(), TimeUnit.MILLISECONDS)
.build();
LOGGER.info("Channel configs - {}:{}, load balancer: {}, max inbound message size: {}, keep alive interval: {}",
config.getVtgateHost(), config.getVtgatePort(), config.getGrpcDefaultLoadBalancingPolicy(), config.getGrpcMaxInboundMessageSize(),
config.getKeepaliveInterval());
return channel;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,4 +194,19 @@ public void testTablePKsFromJson() {
TablePrimaryKeys tablePrimaryKeys = TablePrimaryKeys.of(TEST_LAST_PKS_JSON);
JSONAssert.assertEquals(tablePrimaryKeys.toString(), TEST_LAST_PKS_JSON, true);
}

@Test
public void shouldHandleNullLastPkInVgtidJson() {
String vgtidJsonWithNullLastPk = "[{\"keyspace\":\"test_keyspace\",\"shard\":\"-80\"," +
"\"gtid\":\"MySQL56/a790d864-9ba1-11ea-99f6-0242ac11000a:1-1513\"," +
"\"table_p_ks\":[{\"table_name\":\"test_table\",\"lastpk\":null}]}]";

Vgtid vgtid = Vgtid.of(vgtidJsonWithNullLastPk);

assertThat(vgtid.getShardGtids()).hasSize(1);
assertThat(vgtid.getShardGtids().get(0).getTableLastPrimaryKeys()).hasSize(1);
TablePrimaryKeys.TableLastPrimaryKey tableLastPK = vgtid.getShardGtids().get(0).getTableLastPrimaryKeys().get(0);
assertThat(tableLastPK.getTableName()).isEqualTo("test_table");
assertThat(tableLastPK.getLastPrimaryKey()).isNull();
}
}
Loading