diff --git a/src/main/java/io/debezium/connector/vitess/TablePrimaryKeys.java b/src/main/java/io/debezium/connector/vitess/TablePrimaryKeys.java index 031fa64b..009b97d9 100644 --- a/src/main/java/io/debezium/connector/vitess/TablePrimaryKeys.java +++ b/src/main/java/io/debezium/connector/vitess/TablePrimaryKeys.java @@ -151,10 +151,12 @@ public TableLastPrimaryKey(String tableName, Query.QueryResult lastPrimaryKey) { @JsonIgnore public Binlogdata.TableLastPK getRawTableLastPrimaryKey() { - return Binlogdata.TableLastPK.newBuilder() - .setTableName(tableName) - .setLastpk(lastPrimaryKey.getRawQueryResult()) - .build(); + Binlogdata.TableLastPK.Builder builder = Binlogdata.TableLastPK.newBuilder() + .setTableName(tableName); + if (lastPrimaryKey != null) { + builder.setLastpk(lastPrimaryKey.getRawQueryResult()); + } + return builder.build(); } @Override diff --git a/src/main/java/io/debezium/connector/vitess/connection/VitessReplicationConnection.java b/src/main/java/io/debezium/connector/vitess/connection/VitessReplicationConnection.java index 94a42c54..bf82ae86 100644 --- a/src/main/java/io/debezium/connector/vitess/connection/VitessReplicationConnection.java +++ b/src/main/java/io/debezium/connector/vitess/connection/VitessReplicationConnection.java @@ -164,7 +164,8 @@ public void onNext(Vtgate.VStreamResponse response) { String msg = "Received duplicate BEGIN events"; // During a copy operation, we receive the duplicate event once when no record is copied. String eventTypes = bufferedEvents.stream().map(VEvent::getType).map(Objects::toString).collect(Collectors.joining(", ")); - if (eventTypes.equals("BEGIN, FIELD") || eventTypes.equals("BEGIN, FIELD, VGTID") || eventTypes.equals("COPY_COMPLETED, BEGIN, FIELD")) { + if (eventTypes.equals("BEGIN, FIELD") || eventTypes.equals("BEGIN, FIELD, VGTID") || eventTypes.equals("COPY_COMPLETED, BEGIN, FIELD") + || eventTypes.equals("COPY_COMPLETED, BEGIN, FIELD, VGTID")) { msg += String.format(" during a copy operation. No harm to skip the buffered events. Buffered event types: %s", eventTypes); LOGGER.info(msg); @@ -367,6 +368,9 @@ private ManagedChannel newChannel() { .maxInboundMessageSize(config.getGrpcMaxInboundMessageSize()) .keepAliveTime(config.getKeepaliveInterval().toMillis(), TimeUnit.MILLISECONDS) .build(); + LOGGER.info("Channel configs - {}:{}, load balancer: {}, max inbound message size: {}, keep alive interval: {}", + config.getVtgateHost(), config.getVtgatePort(), config.getGrpcDefaultLoadBalancingPolicy(), config.getGrpcMaxInboundMessageSize(), + config.getKeepaliveInterval()); return channel; } diff --git a/src/test/java/io/debezium/connector/vitess/TablePrimaryKeysTest.java b/src/test/java/io/debezium/connector/vitess/TablePrimaryKeysTest.java index 71a08ee8..0825491a 100644 --- a/src/test/java/io/debezium/connector/vitess/TablePrimaryKeysTest.java +++ b/src/test/java/io/debezium/connector/vitess/TablePrimaryKeysTest.java @@ -194,4 +194,19 @@ public void testTablePKsFromJson() { TablePrimaryKeys tablePrimaryKeys = TablePrimaryKeys.of(TEST_LAST_PKS_JSON); JSONAssert.assertEquals(tablePrimaryKeys.toString(), TEST_LAST_PKS_JSON, true); } + + @Test + public void shouldHandleNullLastPkInVgtidJson() { + String vgtidJsonWithNullLastPk = "[{\"keyspace\":\"test_keyspace\",\"shard\":\"-80\"," + + "\"gtid\":\"MySQL56/a790d864-9ba1-11ea-99f6-0242ac11000a:1-1513\"," + + "\"table_p_ks\":[{\"table_name\":\"test_table\",\"lastpk\":null}]}]"; + + Vgtid vgtid = Vgtid.of(vgtidJsonWithNullLastPk); + + assertThat(vgtid.getShardGtids()).hasSize(1); + assertThat(vgtid.getShardGtids().get(0).getTableLastPrimaryKeys()).hasSize(1); + TablePrimaryKeys.TableLastPrimaryKey tableLastPK = vgtid.getShardGtids().get(0).getTableLastPrimaryKeys().get(0); + assertThat(tableLastPK.getTableName()).isEqualTo("test_table"); + assertThat(tableLastPK.getLastPrimaryKey()).isNull(); + } }