From 2699b9c224b1a822cfd607c2c7dd5c0fdd5094b5 Mon Sep 17 00:00:00 2001
From: Thomas Thornton
Date: Fri, 15 May 2026 16:01:23 -0700
Subject: [PATCH] debezium/dbz#1879 Add SMT EnforceRecordSize

Signed-off-by: Thomas Thornton
---
 .../vitess/transforms/EnforceRecordSize.java  | 298 ++++++++++
 .../connector/vitess/VitessConnectorIT.java   |  92 ++++
 .../transforms/EnforceRecordSizeTest.java     | 513 ++++++++++++++++++
 3 files changed, 903 insertions(+)
 create mode 100644 src/main/java/io/debezium/connector/vitess/transforms/EnforceRecordSize.java
 create mode 100644 src/test/java/io/debezium/connector/vitess/transforms/EnforceRecordSizeTest.java

diff --git a/src/main/java/io/debezium/connector/vitess/transforms/EnforceRecordSize.java b/src/main/java/io/debezium/connector/vitess/transforms/EnforceRecordSize.java
new file mode 100644
index 00000000..0eebc66c
--- /dev/null
+++ b/src/main/java/io/debezium/connector/vitess/transforms/EnforceRecordSize.java
@@ -0,0 +1,298 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+
+package io.debezium.connector.vitess.transforms;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.components.Versioned;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.Transformation;
+
+import io.debezium.Module;
+
+/**
+ * A Single Message Transform that enforces a maximum record size.
+ * <p>
+ * This is useful when downstream systems have a maximum message size limit.
+ * The transform estimates the serialized size of the record and, if it exceeds
+ * the configured maximum, applies a size reduction strategy.
+ * <p>
+ * Supported strategies:
+ * <ul>
+ * <li>Proportional truncation: each string and bytes column of the {@code before} and
+ * {@code after} row images gives up a share of the excess proportional to its own size.</li>
+ * </ul>
+ * <p>
+ * Sizes are estimates: a string counts one byte per {@code char}, so multi-byte text is
+ * under-counted. Truncation is applied in place to the value {@link Struct} of the record.
+ *
+ * @author Thomas Thornton
+ */
+public class EnforceRecordSize<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
+
+    public static final String MAX_BYTES_CONF = "max.bytes";
+    public static final String COMPRESSION_RATIO_CONF = "compression.ratio";
+
+    private static final String BEFORE_FIELD = "before";
+    private static final String AFTER_FIELD = "after";
+
+    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(MAX_BYTES_CONF,
+                    ConfigDef.Type.INT,
+                    ConfigDef.NO_DEFAULT_VALUE,
+                    ConfigDef.Importance.HIGH,
+                    "The maximum record size in bytes. Records exceeding this size will have their "
+                            + "string and bytes columns truncated proportionally to fit within this limit.")
+            .define(COMPRESSION_RATIO_CONF,
+                    ConfigDef.Type.DOUBLE,
+                    1.0,
+                    ConfigDef.Importance.MEDIUM,
+                    "Ratio to account for record serialization differences. The estimated record size "
+                            + "is multiplied by this ratio before comparing to the max size. "
+                            + "For example, if your serialization compresses raw record size by 50%, "
+                            + "set this to 0.5. Downstream systems typically provide metrics to discover "
+                            + "the effective ratio, e.g. Kafka exposes "
+                            + "kafka.producer:type=producer-metrics,client-id=/compression-rate-avg. "
+                            + "Default is 1.0 (no adjustment).");
+
+    private int maxBytes;
+    private double compressionRatio;
+
+    /**
+     * Shrinks the string and bytes columns of a change event whose estimated size,
+     * scaled by {@code compression.ratio}, exceeds {@code max.bytes}.
+     *
+     * @param record the record to inspect; may be {@code null}
+     * @return {@code record} itself when it is {@code null}, has no {@link Struct} value or already
+     *         fits; otherwise a new record carrying the (in-place) truncated value
+     */
+    @Override
+    public R apply(R record) {
+        if (record == null || !(record.value() instanceof Struct)) {
+            return record;
+        }
+
+        int rawSize = estimateRecordSizeBytes(record);
+        if (Math.ceil(rawSize * compressionRatio) <= maxBytes) {
+            return record;
+        }
+
+        // Truncation removes raw (unscaled) bytes, so translate the limit back into raw bytes
+        // before working out how much has to go.
+        int rawBudget = (int) Math.floor(maxBytes / compressionRatio);
+        int excess = Math.max(1, rawSize - rawBudget);
+
+        Struct value = (Struct) record.value();
+        // Both row images draw on one budget; otherwise an update gives up the excess twice.
+        List<TruncatableField> candidates = new ArrayList<>();
+        candidates.addAll(findTruncatableFields(value, BEFORE_FIELD));
+        candidates.addAll(findTruncatableFields(value, AFTER_FIELD));
+        truncateProportionally(candidates, excess);
+
+        return record.newRecord(
+                record.topic(),
+                record.kafkaPartition(),
+                record.keySchema(),
+                record.key(),
+                record.valueSchema(),
+                value,
+                record.timestamp());
+    }
+
+    /**
+     * Collects the string and bytes columns of one row image.
+     *
+     * @param envelope the change event value
+     * @param imageName name of the envelope field holding the row image
+     * @return the truncatable columns; empty when the image is missing or {@code null}
+     */
+    private static List<TruncatableField> findTruncatableFields(Struct envelope, String imageName) {
+        List<TruncatableField> result = new ArrayList<>();
+        if (envelope.schema().field(imageName) == null) {
+            return result;
+        }
+        Object image = envelope.get(imageName);
+        if (!(image instanceof Struct)) {
+            return result;
+        }
+
+        Struct row = (Struct) image;
+        for (Field field : row.schema().fields()) {
+            Object value = row.get(field);
+            Schema.Type type = field.schema().type();
+            // Checking the runtime class skips nulls as well as logical types carried by another
+            // Java class (e.g. Decimal over BYTES), which cannot be cut byte-wise.
+            boolean truncatable = (type == Schema.Type.STRING && value instanceof String)
+                    || (type == Schema.Type.BYTES && (value instanceof ByteBuffer || value instanceof byte[]));
+            if (truncatable) {
+                result.add(new TruncatableField(row, field, value, estimateObjectSize(value)));
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Removes {@code excess} bytes from the given columns, each giving up a share proportional to its size.
+     */
+    private static void truncateProportionally(List<TruncatableField> fields, int excess) {
+        long totalBytes = 0;
+        for (TruncatableField field : fields) {
+            totalBytes += field.sizeBytes;
+        }
+        if (totalBytes == 0) {
+            return;
+        }
+
+        for (TruncatableField field : fields) {
+            double proportion = (double) field.sizeBytes / totalBytes;
+            int bytesToRemove = (int) Math.ceil(proportion * excess);
+            int newSizeBytes = Math.max(0, field.sizeBytes - bytesToRemove);
+            if (newSizeBytes < field.sizeBytes) {
+                field.truncateTo(newSizeBytes);
+            }
+        }
+    }
+
+    /**
+     * Estimates the size of a value: strings count one byte per {@code char} (cheap, but low for
+     * multi-byte text), binary data its readable bytes, containers the sum of their parts and
+     * anything else a flat 8 bytes.
+     */
+    private static int estimateObjectSize(Object value) {
+        if (value == null) {
+            return 0;
+        }
+        if (value instanceof String) {
+            return ((String) value).length();
+        }
+        if (value instanceof ByteBuffer) {
+            return ((ByteBuffer) value).remaining();
+        }
+        if (value instanceof byte[]) {
+            return ((byte[]) value).length;
+        }
+        if (value instanceof Struct) {
+            return estimateStructSize((Struct) value);
+        }
+        if (value instanceof List) {
+            int size = 0;
+            for (Object element : (List<?>) value) {
+                size += estimateObjectSize(element);
+            }
+            return size;
+        }
+        if (value instanceof Map) {
+            int size = 0;
+            for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
+                size += estimateObjectSize(entry.getKey()) + estimateObjectSize(entry.getValue());
+            }
+            return size;
+        }
+        return 8;
+    }
+
+    /**
+     * Estimates the size of a record as the sum of the estimates of its key and value.
+     *
+     * @param record the record to measure
+     * @return the estimated size in bytes, before {@code compression.ratio} is applied
+     */
+    public static <R extends ConnectRecord<R>> int estimateRecordSizeBytes(ConnectRecord<R> record) {
+        return estimateObjectSize(record.key()) + estimateObjectSize(record.value());
+    }
+
+    private static int estimateStructSize(Struct struct) {
+        int size = 0;
+        for (Field field : struct.schema().fields()) {
+            // Field names are counted as well (presumably because schemaless formats repeat them per record).
+            size += field.name().length() + estimateObjectSize(struct.get(field));
+        }
+        return size;
+    }
+
+    @Override
+    public String version() {
+        return Module.version();
+    }
+
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+    @Override
+    public void close() {
+    }
+
+    /**
+     * Reads and validates {@code max.bytes} and {@code compression.ratio}.
+     *
+     * @throws ConfigException if {@code max.bytes} is not positive or the ratio is not a positive finite number
+     */
+    @Override
+    public void configure(Map<String, ?> props) {
+        AbstractConfig config = new AbstractConfig(CONFIG_DEF, props);
+        int maxSize = config.getInt(MAX_BYTES_CONF);
+        if (maxSize <= 0) {
+            throw new ConfigException(MAX_BYTES_CONF, maxSize, "Must be a positive integer");
+        }
+
+        double ratio = config.getDouble(COMPRESSION_RATIO_CONF);
+        // Written as !(ratio > 0) so that NaN is rejected as well.
+        if (!(ratio > 0) || Double.isInfinite(ratio)) {
+            throw new ConfigException(COMPRESSION_RATIO_CONF, ratio, "Must be a positive, finite number");
+        }
+
+        this.maxBytes = maxSize;
+        this.compressionRatio = ratio;
+    }
+
+    /** A string or bytes column together with the struct that holds it. */
+    private static class TruncatableField {
+        final Struct owner;
+        final Field field;
+        final Object value;
+        final int sizeBytes;
+
+        TruncatableField(Struct owner, Field field, Object value, int sizeBytes) {
+            this.owner = owner;
+            this.field = field;
+            this.value = value;
+            this.sizeBytes = sizeBytes;
+        }
+
+        /** Replaces the column value with its first {@code newSize} chars or bytes. */
+        void truncateTo(int newSize) {
+            if (value instanceof String) {
+                String text = (String) value;
+                // Never cut between the two halves of a surrogate pair.
+                int end = newSize > 0 && Character.isHighSurrogate(text.charAt(newSize - 1)) ? newSize - 1 : newSize;
+                owner.put(field, text.substring(0, end));
+            }
+            else if (value instanceof ByteBuffer) {
+                // Copy from a duplicate: honours position/offset, works for read-only and direct
+                // buffers, and leaves the caller's buffer untouched.
+                byte[] kept = new byte[newSize];
+                ((ByteBuffer) value).duplicate().get(kept);
+                owner.put(field, ByteBuffer.wrap(kept));
+            }
+            else {
+                owner.put(field, Arrays.copyOf((byte[]) value, newSize));
+            }
+        }
+    }
+}
diff --git a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java
index 321ddb9a..0fff2817 100644
--- a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java
+++ b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java
@@ -718,6 +718,98 @@ public void shouldTruncateByteArray() throws Exception {
         assertInsert(INSERT_BYTES_TYPES_STMT, schemasAndValuesForStringTypesTruncatedBlob(), TestHelper.PK_FIELD);
     }
 
+    @Test
+    public void shouldTruncateProportionallyBasedOnColumnSize() throws Exception {
+        TestHelper.executeDDL("vitess_create_tables.ddl");
+        // Insert one 1000-char column and one 1500-char column
+        String smallText = "a".repeat(1000);
+        String largeText = "b".repeat(1500);
+        String insertStmt = "INSERT INTO string_table (text_col, mediumtext_col) "
+                + "VALUES ('" + smallText + "', '" + largeText + "');";
+        startConnector(builder -> builder
+                .with("transforms", "truncateColumn")
+                .with("transforms.truncateColumn.type", "io.debezium.connector.vitess.transforms.EnforceRecordSize")
+                .with("transforms.truncateColumn.max.bytes", "1000"),
+                false,
+                false, 1, -1, -1, null,
+                VitessConnectorConfig.SnapshotMode.NEVER, "");
+        assertConnectorIsRunning();
+
+        int expectedRecordsCount = 1;
+        consumer = testConsumer(expectedRecordsCount);
+        consumer.expects(expectedRecordsCount);
+        executeAndWait(insertStmt);
+
+        SourceRecord record = assertRecordInserted(topicNameFromInsertStmt(insertStmt), TestHelper.PK_FIELD);
+        Struct value = (Struct) record.value();
+        Struct after = value.getStruct("after");
+        String textResult = after.getString("text_col");
+        String mediumtextResult = after.getString("mediumtext_col");
+        // Both must be truncated from originals
+        assertThat(textResult.length()).isLessThan(1000);
+        assertThat(mediumtextResult.length()).isLessThan(1500);
+        // Proportional: the larger column (1500) should be truncated more in absolute bytes
+        int textBytesRemoved = 1000 - textResult.length();
+        int mediumtextBytesRemoved = 1500 - mediumtextResult.length();
+        assertThat(mediumtextBytesRemoved).isGreaterThan(textBytesRemoved);
+    }
+
+    @Test
+    public void shouldCombineWithStaticColumnTruncateConfig() throws Exception {
+        TestHelper.executeDDL("vitess_create_tables.ddl");
+        // Insert large values: text_col and mediumtext_col both 800 chars
+        String largeText = "c".repeat(800);
+        String insertStmt = "INSERT INTO string_table (text_col, mediumtext_col) "
+                + "VALUES ('" + largeText + "', '" + largeText + "');";
+        // Static truncation: text_col is pre-truncated to 10 chars via column.truncate.to.10.chars
+        // Dynamic truncation: max message size is 600 bytes
+        // Expected: text_col stays at 10 (already small), mediumtext_col gets dynamically truncated
+        startConnector(builder -> builder
+                .with("column.truncate.to.10.chars",
+                        TEST_UNSHARDED_KEYSPACE + ".string_table.text_col")
+                .with("transforms", "truncateColumn")
+                .with("transforms.truncateColumn.type", "io.debezium.connector.vitess.transforms.EnforceRecordSize")
+                .with("transforms.truncateColumn.max.bytes", "600"),
+                false,
+                false, 1, -1, -1, null,
+                VitessConnectorConfig.SnapshotMode.NEVER, "");
+        assertConnectorIsRunning();
+
+        int expectedRecordsCount = 1;
+        consumer = testConsumer(expectedRecordsCount);
+        consumer.expects(expectedRecordsCount);
+        executeAndWait(insertStmt);
+
+        SourceRecord record = assertRecordInserted(topicNameFromInsertStmt(insertStmt), TestHelper.PK_FIELD);
+        Struct value = (Struct) record.value();
+        Struct after = value.getStruct("after");
+        String textResult = after.getString("text_col");
+        String mediumtextResult = after.getString("mediumtext_col");
+        // text_col was statically truncated to 10 chars first, and should remain small
+        assertThat(textResult.length()).isLessThanOrEqualTo(10);
+        // mediumtext_col should be dynamically truncated from 800
+        assertThat(mediumtextResult.length()).isLessThan(800);
+    }
+
+    @Test
+    public void shouldNotTruncateWhenMessageSizeWithinBounds() throws Exception {
+        TestHelper.executeDDL("vitess_create_tables.ddl");
+        startConnector(builder -> builder
+                .with("transforms", "truncateColumn")
+                .with("transforms.truncateColumn.type", "io.debezium.connector.vitess.transforms.EnforceRecordSize")
+                .with("transforms.truncateColumn.max.bytes", "100000"),
+                false,
+                false, 1, -1, -1, null,
+                VitessConnectorConfig.SnapshotMode.NEVER, "");
+        assertConnectorIsRunning();
+
+        int expectedRecordsCount = 1;
+        consumer = testConsumer(expectedRecordsCount);
+        consumer.expects(expectedRecordsCount);
+        // Uses standard string insert which has small values (2-char, etc.)
+        assertInsert(INSERT_STRING_TYPES_STMT, schemasAndValuesForStringTypes(), TestHelper.PK_FIELD);
+    }
+
     @Test
     public void shouldConsumeEventsWithExcludedColumn() throws Exception {
         String columnToExlude = "mediumtext_col";
diff --git a/src/test/java/io/debezium/connector/vitess/transforms/EnforceRecordSizeTest.java b/src/test/java/io/debezium/connector/vitess/transforms/EnforceRecordSizeTest.java
new file mode 100644
index 00000000..8bf0fd4e
--- /dev/null
+++ b/src/test/java/io/debezium/connector/vitess/transforms/EnforceRecordSizeTest.java
@@ -0,0 +1,513 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+
+package io.debezium.connector.vitess.transforms;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class EnforceRecordSizeTest {
+
+    private EnforceRecordSize<SourceRecord> transform;
+
+    @BeforeEach
+    public void setup() {
+        transform = new EnforceRecordSize<>();
+    }
+
+    @AfterEach
+    public void teardown() {
+        transform.close();
+    }
+
+    @Test
+    public void shouldNotTruncateWhenMessageSizeUnderLimit() {
+        Map<String, String> config = new HashMap<>();
+        config.put(EnforceRecordSize.MAX_BYTES_CONF, "10000");
+        transform.configure(config);
+
+        SourceRecord record = createRecordWithStringValue("short");
+        SourceRecord result = transform.apply(record);
+
+        Struct after = getAfterStruct(result);
+        assertThat(after.getString("text_col")).isEqualTo("short");
+    }
+
+    @Test
+    public void shouldTruncateStringColumnWhenMessageExceedsLimit() {
+        Map<String, String> config = new HashMap<>();
+        config.put(EnforceRecordSize.MAX_BYTES_CONF, "200");
+        transform.configure(config);
+
+        String largeValue = "a".repeat(500);
+        SourceRecord record = createRecordWithStringValue(largeValue);
+        SourceRecord result = transform.apply(record);
+
+        Struct after = getAfterStruct(result);
+        String truncatedValue = after.getString("text_col");
+        assertThat(truncatedValue.length()).isLessThan(largeValue.length());
+    }
+
+    @Test
+    public void shouldTruncateBytesColumnWhenMessageExceedsLimit() {
+        Map<String, String> config = new HashMap<>();
+        config.put(EnforceRecordSize.MAX_BYTES_CONF, "200");
+        transform.configure(config);
+
+        byte[] largeValue = new byte[500];
+        SourceRecord record = createRecordWithBytesValue(largeValue);
+        SourceRecord result = transform.apply(record);
+
+        Struct after = getAfterStruct(result);
+        byte[] truncatedValue = after.getBytes("blob_col");
+        assertThat(truncatedValue.length).isLessThan(largeValue.length);
+    }
+
+    @Test
+    public void shouldTruncateLargerColumnMore() {
+        Map<String, String> config = new HashMap<>();
+        config.put(EnforceRecordSize.MAX_BYTES_CONF, "250");
+        transform.configure(config);
+
+        String smallValue = "small";
+        String largeValue = "x".repeat(400);
+        SourceRecord record = createRecordWithTwoStringColumns(smallValue, largeValue);
+        SourceRecord result = transform.apply(record);
+
+        Struct after = getAfterStruct(result);
+        String smallResult = after.getString("small_col");
+        String largeResult = after.getString("large_col");
+        assertThat(largeValue.length() - largeResult.length()).isGreaterThan(smallValue.length() - smallResult.length());
+        assertThat(largeResult.length()).isLessThan(largeValue.length());
+    }
+
+    @Test
+    public void shouldTruncateProportionally() {
+        Map<String, String> config = new HashMap<>();
+        config.put(EnforceRecordSize.MAX_BYTES_CONF, "300");
+        transform.configure(config);
+
+        String col1Value = "a".repeat(200);
+        String col2Value = "b".repeat(400);
+        SourceRecord record = createRecordWithTwoStringColumns(col1Value, col2Value);
+        SourceRecord result = transform.apply(record);
+
+        Struct after = getAfterStruct(result);
+        String col1Result = after.getString("small_col");
+        String col2Result = after.getString("large_col");
+        assertThat(col1Result.length()).isLessThan(col1Value.length());
+        assertThat(col2Result.length()).isLessThan(col2Value.length());
+    }
+
+    @Test
+    public void shouldNotTruncateNonStringColumns() {
+        Map<String, String> config = new HashMap<>();
+        config.put(EnforceRecordSize.MAX_BYTES_CONF, "200");
+        transform.configure(config);
+
+        SourceRecord record = createRecordWithIntAndString(42, "x".repeat(500));
+        SourceRecord result = transform.apply(record);
+
+        Struct after = getAfterStruct(result);
+        assertThat(after.getInt32("int_col")).isEqualTo(42);
+        String truncatedStr = after.getString("text_col");
+        assertThat(truncatedStr.length()).isLessThan(500);
+    }
+
+    @Test
+    public void shouldHandleNullValues() {
+        Map<String, String> config = new HashMap<>();
+        config.put(EnforceRecordSize.MAX_BYTES_CONF, "100");
+        transform.configure(config);
+
+        SourceRecord record = createRecordWithNullColumn();
+        SourceRecord result = transform.apply(record);
+
+        Struct after = getAfterStruct(result);
+        assertThat(after.getString("text_col")).isNull();
+        assertThat(after.getInt32("int_col")).isEqualTo(1);
+    }
+
+    @Test
+    public void shouldHandleNullRecord() {
+        Map<String, String> config = new HashMap<>();
+        config.put(EnforceRecordSize.MAX_BYTES_CONF, "200");
+        transform.configure(config);
+
+        SourceRecord result = transform.apply(null);
+        assertThat(result).isNull();
+    }
+
+    @Test
+    public void shouldTruncateBeforeWhenAfterIsAbsent() {
+        Map<String, String> config = new HashMap<>();
+        config.put(EnforceRecordSize.MAX_BYTES_CONF, "200");
+        transform.configure(config);
+
+        SourceRecord record = createDeleteRecord("x".repeat(500));
+        SourceRecord result = transform.apply(record);
+
+        Struct value = (Struct) result.value();
+        Struct before = value.getStruct("before");
+        assertThat(before.getString("text_col").length()).isLessThan(500);
+    }
+
+    @Test
+    public void shouldTruncateBothBeforeAndAfterForUpdate() {
+        Map<String, String> config = new HashMap<>();
+        config.put(EnforceRecordSize.MAX_BYTES_CONF, "300");
+        transform.configure(config);
+
+        SourceRecord record = createUpdateRecord("x".repeat(400), "y".repeat(400));
+        SourceRecord result = transform.apply(record);
+
+        Struct value = (Struct) result.value();
+        Struct before = value.getStruct("before");
+        Struct after = value.getStruct("after");
+        assertThat(before.getString("text_col").length()).isBetween(1, 399);
+        assertThat(after.getString("text_col").length()).isBetween(1, 399);
+    }
+
+    @Test
+    public void shouldRejectInvalidMaxMessageSize() {
+        Map<String, String> config = new HashMap<>();
+        config.put(EnforceRecordSize.MAX_BYTES_CONF, "0");
+
+        assertThrows(ConfigException.class, () -> transform.configure(config));
+    }
+
+    @Test
+    public void shouldRejectNegativeMaxMessageSize() {
+        Map<String, String> config = new HashMap<>();
+        config.put(EnforceRecordSize.MAX_BYTES_CONF, "-1");
+
+        assertThrows(ConfigException.class, () -> transform.configure(config));
+    }
+
+    @Test
+    public void shouldPassThroughNonStructRecords() {
+        Map<String, String> config = new HashMap<>();
+        config.put(EnforceRecordSize.MAX_BYTES_CONF, "200");
+        transform.configure(config);
+
+        SourceRecord record = new SourceRecord(null, null, "topic", 0, Schema.STRING_SCHEMA, "plaintext");
+        SourceRecord result = transform.apply(record);
+        assertThat(result.value()).isEqualTo("plaintext");
+    }
+
+    @Test
+    public void shouldRespectMessageOverheadInSizeCalculation() {
+        Map<String, String> config = new HashMap<>();
+        config.put(EnforceRecordSize.MAX_BYTES_CONF, "500");
+        transform.configure(config);
+
+        String largeValue = "a".repeat(1000);
+        SourceRecord record = createRecordWithStringValue(largeValue);
+        SourceRecord result = transform.apply(record);
+
+        Struct after = getAfterStruct(result);
+        String truncatedValue = after.getString("text_col");
+        assertThat(truncatedValue.length()).isLessThan(1000);
+        assertThat(estimateSize(result)).isLessThanOrEqualTo(500);
+    }
+
+    @Test
+    public void shouldHandleMultiByteCharacters() {
+        Map<String, String> config = new HashMap<>();
+        config.put(EnforceRecordSize.MAX_BYTES_CONF, "200");
+        transform.configure(config);
+
+        String multiByteValue = "中".repeat(500);
+        SourceRecord record = createRecordWithStringValue(multiByteValue);
+        SourceRecord result = transform.apply(record);
+
+        Struct after = getAfterStruct(result);
+        String truncatedValue = after.getString("text_col");
+        assertThat(truncatedValue.length()).isLessThan(multiByteValue.length());
+    }
+
+    @Test
+    public void shouldApplyCompressionRatioDefaultNoEffect() {
+        Map<String, String> config = new HashMap<>();
+        config.put(EnforceRecordSize.MAX_BYTES_CONF, "200");
+        transform.configure(config);
+
+        String largeValue = "a".repeat(500);
+        SourceRecord record = createRecordWithStringValue(largeValue);
+        SourceRecord result = transform.apply(record);
+
+        Struct after = getAfterStruct(result);
+        assertThat(after.getString("text_col").length()).isLessThan(500);
+    }
+
+    @Test
+    public void shouldApplyCompressionRatioReducesEffectiveSize() {
+        Map<String, String> config = new HashMap<>();
+        config.put(EnforceRecordSize.MAX_BYTES_CONF, "600");
+        config.put(EnforceRecordSize.COMPRESSION_RATIO_CONF, "0.5");
+        transform.configure(config);
+
+        String largeValue = "a".repeat(1000);
+        SourceRecord record = createRecordWithStringValue(largeValue);
+        SourceRecord result = transform.apply(record);
+
+        Struct after = getAfterStruct(result);
+        assertThat(after.getString("text_col")).isEqualTo(largeValue);
+    }
+
+    @Test
+    public void shouldApplyCompressionRatioIncreasesEffectiveSize() {
+        Map<String, String> config = new HashMap<>();
+        config.put(EnforceRecordSize.MAX_BYTES_CONF, "2000");
+        config.put(EnforceRecordSize.COMPRESSION_RATIO_CONF, "2.0");
+        transform.configure(config);
+
+        String largeValue = "a".repeat(1500);
+        SourceRecord record = createRecordWithStringValue(largeValue);
+        SourceRecord result = transform.apply(record);
+
+        Struct after = getAfterStruct(result);
+        assertThat(estimateSize(result)).isEqualTo(1000);
+    }
+
+    @Test
+    public void shouldRejectInvalidCompressionRatio() {
+        Map<String, String> config = new HashMap<>();
+        config.put(EnforceRecordSize.MAX_BYTES_CONF, "200");
+        config.put(EnforceRecordSize.COMPRESSION_RATIO_CONF, "0");
+
+        assertThrows(ConfigException.class, () -> transform.configure(config));
+    }
+
+    @Test
+    public void shouldEstimateSizeReasonablyComparedToJsonSerialization() {
+        org.apache.kafka.connect.json.JsonConverter converter = new org.apache.kafka.connect.json.JsonConverter();
+        converter.configure(Map.of("schemas.enable", "false", "converter.type", "value"), false);
+
+        String largeValue = "a".repeat(1000);
+        Schema recordSchema = SchemaBuilder.struct()
+                .optional()
+                .field("text_col", Schema.OPTIONAL_STRING_SCHEMA)
+                .build();
+        Schema sourceSchema = SchemaBuilder.struct().optional().field("db", Schema.OPTIONAL_STRING_SCHEMA).build();
+        Schema envelopeSchema = SchemaBuilder.struct()
+                .field("before", recordSchema)
+                .field("after", recordSchema)
+                .field("op", Schema.OPTIONAL_STRING_SCHEMA)
+                .field("source", sourceSchema)
+                .build();
+        Struct afterStruct = new Struct(recordSchema).put("text_col", largeValue);
+        Struct sourceStruct = new Struct(sourceSchema).put("db", "test");
+        Struct envelope = new Struct(envelopeSchema)
+                .put("after", afterStruct)
+                .put("op", "c")
+                .put("source", sourceStruct);
+        SourceRecord record = new SourceRecord(null, null, "topic", 0, envelopeSchema, envelope);
+
+        byte[] serialized = converter.fromConnectData(record.topic(), record.valueSchema(), record.value());
+        int actualSerializedSize = serialized.length;
+        int ourEstimate = EnforceRecordSize.estimateRecordSizeBytes(record);
+
+        assertThat(ourEstimate).isGreaterThan(actualSerializedSize / 3);
+        assertThat(ourEstimate).isLessThan(actualSerializedSize * 2);
+    }
+
+    private int estimateSize(SourceRecord record) {
+        return EnforceRecordSize.estimateRecordSizeBytes(record);
+    }
+
+    private Schema createSimpleRecordSchema() {
+        return SchemaBuilder.struct()
+                .field("text_col", Schema.OPTIONAL_STRING_SCHEMA)
+                .build();
+    }
+
+    private Schema createBytesRecordSchema() {
+        return SchemaBuilder.struct()
+                .field("blob_col", Schema.OPTIONAL_BYTES_SCHEMA)
+                .build();
+    }
+
+    private Schema createTwoColumnSchema() {
+        return SchemaBuilder.struct()
+                .field("small_col", Schema.OPTIONAL_STRING_SCHEMA)
+                .field("large_col", Schema.OPTIONAL_STRING_SCHEMA)
+                .build();
+    }
+
+    private Schema createIntAndStringSchema() {
+        return SchemaBuilder.struct()
+                .field("int_col", Schema.OPTIONAL_INT32_SCHEMA)
+                .field("text_col", Schema.OPTIONAL_STRING_SCHEMA)
+                .build();
+    }
+
+    private Struct getAfterStruct(SourceRecord record) {
+        Struct value = (Struct) record.value();
+        return value.getStruct("after");
+    }
+
+    private SourceRecord createRecordWithStringValue(String textValue) {
+        Schema recordSchema = createSimpleRecordSchema();
+        Schema sourceSchema = SchemaBuilder.struct().field("db", Schema.STRING_SCHEMA).build();
+        Schema envelopeSchema = SchemaBuilder.struct()
+                .field("before", recordSchema)
+                .field("after", recordSchema)
+                .field("op", Schema.STRING_SCHEMA)
+                .field("source", sourceSchema)
+                .build();
+
+        Struct afterStruct = new Struct(recordSchema).put("text_col", textValue);
+        Struct sourceStruct = new Struct(sourceSchema).put("db", "test");
+        Struct envelope = new Struct(envelopeSchema)
+                .put("after", afterStruct)
+                .put("op", "c")
+                .put("source", sourceStruct);
+
+        return new SourceRecord(null, null, "topic", 0, envelopeSchema, envelope);
+    }
+
+    private SourceRecord createRecordWithBytesValue(byte[] blobValue) {
+        Schema recordSchema = createBytesRecordSchema();
+        Schema sourceSchema = SchemaBuilder.struct().field("db", Schema.STRING_SCHEMA).build();
+        Schema envelopeSchema = SchemaBuilder.struct()
+                .field("before", recordSchema)
+                .field("after", recordSchema)
+                .field("op", Schema.STRING_SCHEMA)
+                .field("source", sourceSchema)
+                .build();
+
+        Struct afterStruct = new Struct(recordSchema).put("blob_col", ByteBuffer.wrap(blobValue));
+        Struct sourceStruct = new Struct(sourceSchema).put("db", "test");
+        Struct envelope = new Struct(envelopeSchema)
+                .put("after", afterStruct)
+                .put("op", "c")
+                .put("source", sourceStruct);
+
+        return new SourceRecord(null, null, "topic", 0, envelopeSchema, envelope);
+    }
+
+    private SourceRecord createRecordWithTwoStringColumns(String smallValue, String largeValue) {
+        Schema recordSchema = createTwoColumnSchema();
+        Schema sourceSchema = SchemaBuilder.struct().field("db", Schema.STRING_SCHEMA).build();
+        Schema envelopeSchema = SchemaBuilder.struct()
+                .field("before", recordSchema)
+                .field("after", recordSchema)
+                .field("op", Schema.STRING_SCHEMA)
+                .field("source", sourceSchema)
+                .build();
+
+        Struct afterStruct = new Struct(recordSchema)
+                .put("small_col", smallValue)
+                .put("large_col", largeValue);
+        Struct sourceStruct = new Struct(sourceSchema).put("db", "test");
+        Struct envelope = new Struct(envelopeSchema)
+                .put("after", afterStruct)
+                .put("op", "c")
+                .put("source", sourceStruct);
+
+        return new SourceRecord(null, null, "topic", 0, envelopeSchema, envelope);
+    }
+
+    private SourceRecord createRecordWithIntAndString(int intValue, String textValue) {
+        Schema recordSchema = createIntAndStringSchema();
+        Schema sourceSchema = SchemaBuilder.struct().field("db", Schema.STRING_SCHEMA).build();
+        Schema envelopeSchema = SchemaBuilder.struct()
+                .field("before", recordSchema)
+                .field("after", recordSchema)
+                .field("op", Schema.STRING_SCHEMA)
+                .field("source", sourceSchema)
+                .build();
+
+        Struct afterStruct = new Struct(recordSchema)
+                .put("int_col", intValue)
+                .put("text_col", textValue);
+        Struct sourceStruct = new Struct(sourceSchema).put("db", "test");
+        Struct envelope = new Struct(envelopeSchema)
+                .put("after", afterStruct)
+                .put("op", "c")
+                .put("source", sourceStruct);
+
+        return new SourceRecord(null, null, "topic", 0, envelopeSchema, envelope);
+    }
+
+    private SourceRecord createRecordWithNullColumn() {
+        Schema recordSchema = createIntAndStringSchema();
+        Schema sourceSchema = SchemaBuilder.struct().field("db", Schema.STRING_SCHEMA).build();
+        Schema envelopeSchema = SchemaBuilder.struct()
+                .field("before", recordSchema)
+                .field("after", recordSchema)
+                .field("op", Schema.STRING_SCHEMA)
+                .field("source", sourceSchema)
+                .build();
+
+        Struct afterStruct = new Struct(recordSchema)
+                .put("int_col", 1)
+                .put("text_col", null);
+        Struct sourceStruct = new Struct(sourceSchema).put("db", "test");
+        Struct envelope = new Struct(envelopeSchema)
+                .put("after", afterStruct)
+                .put("op", "c")
+                .put("source", sourceStruct);
+
+        return new SourceRecord(null, null, "topic", 0, envelopeSchema, envelope);
+    }
+
+    private SourceRecord createDeleteRecord(String textValue) {
+        Schema recordSchema = createSimpleRecordSchema();
+        Schema sourceSchema = SchemaBuilder.struct().field("db", Schema.STRING_SCHEMA).build();
+        Schema envelopeSchema = SchemaBuilder.struct()
+                .field("before", recordSchema)
+                .field("after", recordSchema)
+                .field("op", Schema.STRING_SCHEMA)
+                .field("source", sourceSchema)
+                .build();
+
+        Struct beforeStruct = new Struct(recordSchema).put("text_col", textValue);
+        Struct sourceStruct = new Struct(sourceSchema).put("db", "test");
+        Struct envelope = new Struct(envelopeSchema)
+                .put("before", beforeStruct)
+                .put("op", "d")
+                .put("source", sourceStruct);
+
+        return new SourceRecord(null, null, "topic", 0, envelopeSchema, envelope);
+    }
+
+    private SourceRecord createUpdateRecord(String beforeValue, String afterValue) {
+        Schema recordSchema = createSimpleRecordSchema();
+        Schema sourceSchema = SchemaBuilder.struct().field("db", Schema.STRING_SCHEMA).build();
+        Schema envelopeSchema = SchemaBuilder.struct()
+                .field("before", recordSchema)
+                .field("after", recordSchema)
+                .field("op", Schema.STRING_SCHEMA)
+                .field("source", sourceSchema)
+                .build();
+
+        Struct beforeStruct = new Struct(recordSchema).put("text_col", beforeValue);
+        Struct afterStruct = new Struct(recordSchema).put("text_col", afterValue);
+        Struct sourceStruct = new Struct(sourceSchema).put("db", "test");
+        Struct envelope = new Struct(envelopeSchema)
+                .put("before", beforeStruct)
+                .put("after", afterStruct)
+                .put("op", "u")
+                .put("source", sourceStruct);
+
+        return new SourceRecord(null, null, "topic", 0, envelopeSchema, envelope);
+    }
+}