Merged
Changes from 7 commits
5 changes: 4 additions & 1 deletion conf/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ storm.nimbus.zookeeper.acls.fixup: true

storm.auth.simple-white-list.users: []
storm.cluster.state.store: "org.apache.storm.cluster.ZKStateStorageFactory"
storm.meta.serialization.delegate: "org.apache.storm.serialization.GzipThriftSerializationDelegate"
storm.meta.serialization.delegate: "org.apache.storm.serialization.ZstdBridgeThriftSerializationDelegate"
storm.compression.zstd.level: 3
storm.compression.zstd.max.decompressed.bytes: 104857600
storm.compression.gzip.max.decompressed.bytes: 104857600
storm.codedistributor.class: "org.apache.storm.codedistributor.LocalFileSystemCodeDistributor"
storm.workers.artifacts.dir: "workers-artifacts"
storm.health.check.dir: "healthchecks"
Expand Down
124 changes: 124 additions & 0 deletions docs/Cluster-State-Serialization.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
---
title: Cluster State Serialization
layout: documentation
documentation: true
---

This page describes how Storm serializes the *meta* state it persists in
ZooKeeper (and other configured state stores): topology assignments, Nimbus
summaries, `StormBase` records, log configs, credentials, worker heartbeats,
profile requests, errors, and similar records.

It is distinct from
[tuple serialization](Serialization.html), which covers payloads exchanged
between spouts and bolts at runtime via Kryo.

## Background

All cluster state writes go through `Utils.serialize(...)` /
`Utils.deserialize(...)`, which in turn delegate to a pluggable
`SerializationDelegate` selected by the
`storm.meta.serialization.delegate` config.
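
As a rough illustration of this plug-in point, the sketch below mirrors the three methods the delegates in this change implement (`prepare`, `serialize`, `deserialize`) in a local stand-in interface. `SerializationDelegateSketch`, `PlainJavaDelegateSketch`, and `DelegateLoaderSketch` are hypothetical names, and the reflective loading is only an assumption about how a class named in `storm.meta.serialization.delegate` could be instantiated; it is not Storm's actual code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

// Hypothetical loader: picks the delegate class named in the config.
final class DelegateLoaderSketch {
    static final String DELEGATE_KEY = "storm.meta.serialization.delegate";

    // Assumed selection logic: instantiate the configured class, then prepare it.
    static SerializationDelegateSketch load(Map<String, Object> conf) {
        String className = (String) conf.get(DELEGATE_KEY);
        try {
            SerializationDelegateSketch delegate = (SerializationDelegateSketch)
                Class.forName(className).getDeclaredConstructor().newInstance();
            delegate.prepare(conf);
            return delegate;
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Cannot load delegate " + className, e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(DELEGATE_KEY, PlainJavaDelegateSketch.class.getName());
        SerializationDelegateSketch delegate = load(conf);
        byte[] bytes = delegate.serialize("example-state");
        System.out.println(delegate.deserialize(bytes, String.class));
    }
}

// Local stand-in with the same three methods as the delegates in this change.
interface SerializationDelegateSketch {
    void prepare(Map<String, Object> topoConf);

    byte[] serialize(Object object);

    <T> T deserialize(byte[] bytes, Class<T> clazz);
}

// Hypothetical delegate: plain Java serialization, no compression.
final class PlainJavaDelegateSketch implements SerializationDelegateSketch {
    @Override
    public void prepare(Map<String, Object> topoConf) {
        // Nothing to configure in this sketch.
    }

    @Override
    public byte[] serialize(Object object) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(object);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return bos.toByteArray();
    }

    @Override
    public <T> T deserialize(byte[] bytes, Class<T> clazz) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return clazz.cast(ois.readObject());
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }
}
```

Because callers only see the interface, swapping the codec is a configuration change rather than a code change, which is what makes the bridge delegates described below possible.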

## Configuration

| Key | Default | Range | Description |
|---|---|---|---|
| `storm.meta.serialization.delegate` | `org.apache.storm.serialization.ZstdBridgeThriftSerializationDelegate` | any `SerializationDelegate` impl | Class used to (de)serialize cluster state. |
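| `storm.compression.zstd.level` | `3` | `1`–`19` | Zstandard compression level. Higher levels produce smaller output but compress more slowly. Levels 20–22 (ultra mode) are rejected by the validator because they require far more working memory per call. |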
| `storm.compression.zstd.max.decompressed.bytes` | `104857600` (100 MiB) | `> 0` | Hard cap on the size of any zstd-decompressed payload. |
| `storm.compression.gzip.max.decompressed.bytes` | `104857600` (100 MiB) | `> 0` | Hard cap on the size of any gzip-decompressed payload. Also enforced by `GzipSerializationDelegate`. |
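
The constraints in the table can be sketched as a small standalone check. This is not the real `ZstdLevelValidator` or `@IsPositiveNumber` implementation; `CompressionConfigCheckSketch` and its helpers are hypothetical, and only the key names, defaults, and ranges are taken from the table above.

```java
import java.util.HashMap;
import java.util.Map;

final class CompressionConfigCheckSketch {
    static final String ZSTD_LEVEL = "storm.compression.zstd.level";
    static final String ZSTD_MAX_BYTES = "storm.compression.zstd.max.decompressed.bytes";
    static final String GZIP_MAX_BYTES = "storm.compression.gzip.max.decompressed.bytes";

    private static final int DEFAULT_LEVEL = 3;
    private static final int DEFAULT_MAX_BYTES = 100 * 1024 * 1024; // 104857600

    // Throws IllegalArgumentException if any compression setting is out of range.
    static void validate(Map<String, Object> conf) {
        long level = asLong(ZSTD_LEVEL, conf.getOrDefault(ZSTD_LEVEL, DEFAULT_LEVEL));
        if (level < 1 || level > 19) {
            throw new IllegalArgumentException(ZSTD_LEVEL + " must be in 1..19, got " + level);
        }
        requirePositive(conf, ZSTD_MAX_BYTES);
        requirePositive(conf, GZIP_MAX_BYTES);
    }

    private static void requirePositive(Map<String, Object> conf, String key) {
        long value = asLong(key, conf.getOrDefault(key, DEFAULT_MAX_BYTES));
        if (value <= 0) {
            throw new IllegalArgumentException(key + " must be > 0, got " + value);
        }
    }

    // This sketch accepts only Integer and Long values.
    private static long asLong(String key, Object value) {
        if (value instanceof Integer || value instanceof Long) {
            return ((Number) value).longValue();
        }
        throw new IllegalArgumentException(key + " must be an integer, got " + value);
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(ZSTD_LEVEL, 19);
        validate(conf); // highest accepted level
        conf.put(ZSTD_LEVEL, 20);
        try {
            validate(conf);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```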

## Choosing a delegate

* **`ZstdBridgeThriftSerializationDelegate`** *(default)*: recommended.
Writes zstd and reads anything previously written (zstd, gzip, or raw
Thrift). Use this unless you have a specific reason not to.
* **`ZstdThriftSerializationDelegate`**: pure zstd; refuses non-zstd
input. Only safe to deploy after every znode in your state store has
been rewritten by a bridge delegate (e.g. by submitting / killing each
topology, or by force-rewriting Nimbus state). Use only when you want
to *enforce* the new format.
* **`GzipBridgeThriftSerializationDelegate`**: the gzip-era bridge, which
reads gzip or raw Thrift. Still available for clusters that want to
roll forward without changing the codec.
* **`ThriftSerializationDelegate`**: raw Thrift with no compression.

## Migration to Zstandard compression

Starting with Apache Storm 3.X, Zstandard is the default compression
codec for cluster state. It replaces gzip because it compresses and
decompresses faster at comparable or better ratios. Earlier versions
used `GzipThriftSerializationDelegate`, wrapped by
`GzipBridgeThriftSerializationDelegate` to allow rolling upgrades from
clusters that had previously stored raw Thrift bytes; the new
`ZstdBridgeThriftSerializationDelegate` plays the equivalent bridge
role for the gzip-to-zstd transition.

| Area | Before (gzip) | After (Zstandard) |
|---|---------------------------------------------------------|-----------------------------------------------------|
| Default delegate | `GzipThriftSerializationDelegate` (via `GzipBridge...`) | `ZstdBridgeThriftSerializationDelegate` |
| Compression codec | gzip (`java.util.zip`) | Zstandard (via `commons-compress` + `zstd-jni`) |
| Decompression bound | none | bounded (`BoundedInputStream`), default 100 MiB |
| Format detection | gzip magic only | gzip magic *and* zstd magic |
| Config validation | none for compression | `ZstdLevelValidator` (1–19), positive bounds checks |

### Zstandard `SerializationDelegate` implementations

* `ZstdThriftSerializationDelegate`: pure zstd Thrift codec. Serializes
any `TBase` with zstd at the configured level; deserialization
requires the input to begin with the zstd magic number
(`0xFD2FB528`, stored little-endian, so the payload starts with the
bytes `28 B5 2F FD`).
* `ZstdBridgeThriftSerializationDelegate`: the new default, implemented to
allow rolling upgrades from clusters that had previously stored payloads
as gzip-compressed. Always *writes* zstd. On read, dispatches based on a
magic-byte sniff:

```
ZstdBridgeThriftSerializationDelegate.deserialize(bytes)
├── bytes start with zstd magic (0xFD2FB528): delegates to ZstdThriftSerializationDelegate
└── otherwise: delegates to GzipBridgeThriftSerializationDelegate.deserialize(bytes)
    ├── bytes start with gzip magic (0x1F8B): delegates to GzipThriftSerializationDelegate
    └── otherwise: delegates to ThriftSerializationDelegate (raw Thrift)
```

This delegation chain is the key property that makes the new default
rolling-upgrade safe: nodes running the new code can still read every
older payload that may already exist in ZooKeeper, while new writes use
zstd.
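
The magic-byte sniff behind this chain can be sketched with plain byte comparisons. The actual `Utils.ZstdUtils.isZstd` implementation is not shown on this page, so `FormatSniffSketch` below is a hypothetical stand-in; it only assumes the two magic numbers named above (zstd `0xFD2FB528`, which appears on the wire as `28 B5 2F FD`, and gzip `0x1F8B`).

```java
final class FormatSniffSketch {
    enum Format { ZSTD, GZIP, RAW_THRIFT }

    // zstd frame magic 0xFD2FB528 is stored little-endian: 28 B5 2F FD.
    static boolean isZstd(byte[] b) {
        return b != null && b.length >= 4
            && (b[0] & 0xFF) == 0x28 && (b[1] & 0xFF) == 0xB5
            && (b[2] & 0xFF) == 0x2F && (b[3] & 0xFF) == 0xFD;
    }

    // gzip streams start with the two bytes 1F 8B.
    static boolean isGzip(byte[] b) {
        return b != null && b.length >= 2
            && (b[0] & 0xFF) == 0x1F && (b[1] & 0xFF) == 0x8B;
    }

    // Same order as the bridge chain: zstd first, then gzip, then raw Thrift.
    static Format detect(byte[] b) {
        if (isZstd(b)) {
            return Format.ZSTD;
        }
        if (isGzip(b)) {
            return Format.GZIP;
        }
        return Format.RAW_THRIFT;
    }

    public static void main(String[] args) {
        byte[] zstdHeader = {0x28, (byte) 0xB5, 0x2F, (byte) 0xFD, 0x00};
        byte[] gzipHeader = {0x1F, (byte) 0x8B, 0x08, 0x00};
        byte[] other = {0x0B, 0x00, 0x01};
        System.out.println(detect(zstdHeader)); // ZSTD
        System.out.println(detect(gzipHeader)); // GZIP
        System.out.println(detect(other));      // RAW_THRIFT
    }
}
```

Checking zstd first keeps the common case (new writes) on the shortest path, and anything that matches neither magic number falls through to the raw Thrift reader.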

### Zip-bomb protection

`GzipUtils.decompress` and `ZstdUtils.decompress` (both nested in
`org.apache.storm.utils.Utils`) wrap the decompressor stream in an Apache
Commons IO `BoundedInputStream` with `maxCount` set to the configured cap.
After draining the bounded stream, the underlying decompressor is probed
with one extra `read()`; if any byte remains, the call fails with:

```
Decompression threshold exceeded! Possible security risk or invalid data size.
```

The same guard is applied to the legacy `GzipSerializationDelegate` (the
non-Thrift Java-serialization variant).
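
The read-up-to-the-cap-then-probe idea can be sketched with the JDK alone. To stay self-contained, this example swaps Commons IO's `BoundedInputStream` for a hand-rolled byte counter and uses gzip rather than zstd; `BoundedGunzipSketch` and its method names are hypothetical, and only the error message is taken from the text above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

final class BoundedGunzipSketch {
    static byte[] gzip(byte[] data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(bos)) {
            out.write(data);
        }
        return bos.toByteArray();
    }

    // Decompresses at most maxBytes; fails if the stream holds more than that.
    static byte[] gunzipBounded(byte[] compressed, int maxBytes) throws IOException {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[8192];
            int remaining = maxBytes;
            while (remaining > 0) {
                int n = in.read(buf, 0, Math.min(buf.length, remaining));
                if (n == -1) {
                    return out.toByteArray(); // stream ended under the cap
                }
                out.write(buf, 0, n);
                remaining -= n;
            }
            // Cap reached: probe for one extra byte to detect an oversized payload.
            if (in.read() != -1) {
                throw new IOException(
                    "Decompression threshold exceeded! Possible security risk or invalid data size.");
            }
            return out.toByteArray();
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] compressed = gzip(new byte[1000]);
        System.out.println(gunzipBounded(compressed, 1000).length); // exactly at the cap
        try {
            gunzipBounded(compressed, 999);
        } catch (IOException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The extra probe matters because a bounded read that simply stops at the cap would silently return truncated data; probing turns an oversized payload into an explicit failure.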

### Upgrading an existing cluster

1. **Roll Nimbus and Supervisors onto the new build.** The bridge
delegate is the default, so no config change is required for a safe
upgrade.
2. **(Optional) Tune `storm.compression.zstd.level`** if you want a
tighter compression / latency trade-off. Most state writes are
infrequent; level 3 is a good default.
3. **(Optional) Tune `storm.compression.zstd.max.decompressed.bytes`** if
you legitimately persist payloads larger than 100 MiB. The cap
guards against malformed or hostile data, so raise it deliberately.
4. **(Optional) Switch to the strict `ZstdThriftSerializationDelegate`**
*only* after every legacy payload has been rewritten. The bridge
delegate is sufficient for the vast majority of deployments.
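
Before taking step 4, it may help to confirm that no legacy payloads remain. The sketch below assumes the payloads have already been read out of the state store into a map keyed by path; how that is done is outside this sketch, and `StrictReadinessSketch` and the example paths are hypothetical. It flags every entry that does not start with the zstd magic bytes `28 B5 2F FD`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class StrictReadinessSketch {
    // zstd frame magic 0xFD2FB528, little-endian on the wire.
    static boolean isZstd(byte[] b) {
        return b != null && b.length >= 4
            && (b[0] & 0xFF) == 0x28 && (b[1] & 0xFF) == 0xB5
            && (b[2] & 0xFF) == 0x2F && (b[3] & 0xFF) == 0xFD;
    }

    // Returns the paths whose payloads a strict zstd delegate would refuse.
    static List<String> legacyPaths(Map<String, byte[]> payloadsByPath) {
        List<String> legacy = new ArrayList<>();
        for (Map.Entry<String, byte[]> entry : payloadsByPath.entrySet()) {
            if (!isZstd(entry.getValue())) {
                legacy.add(entry.getKey());
            }
        }
        return legacy;
    }

    public static void main(String[] args) {
        Map<String, byte[]> payloads = new LinkedHashMap<>();
        payloads.put("/example/rewritten", new byte[] {0x28, (byte) 0xB5, 0x2F, (byte) 0xFD, 0x00});
        payloads.put("/example/legacy-gzip", new byte[] {0x1F, (byte) 0x8B, 0x08, 0x00});
        List<String> legacy = legacyPaths(payloads);
        System.out.println(legacy.isEmpty()
            ? "safe to switch to the strict delegate"
            : "still legacy: " + legacy);
    }
}
```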

### Dependencies

The zstd codec is provided by Apache Commons Compress
(`org.apache.commons:commons-compress`) backed by the `com.github.luben:zstd-jni`
native binding.
2 changes: 2 additions & 0 deletions docs/Serialization.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ documentation: true
---
This page is about how the serialization system in Storm works for versions 0.6.0 and onwards. Storm used a different serialization system prior to 0.6.0 which is documented on [Serialization (prior to 0.6.0)](Serialization-\(prior-to-0.6.0\).html).

> This page covers **tuple** serialization (data flowing between spouts and bolts). For how Storm serializes the meta state it persists in ZooKeeper and related configuration, see [Cluster State Serialization](Cluster-State-Serialization.html).

Tuples can be comprised of objects of any types. Since Storm is a distributed system, it needs to know how to serialize and deserialize objects when they're passed between tasks.

Storm uses [Kryo](https://github.com/EsotericSoftware/kryo) for serialization. Kryo is a flexible and fast serialization library that produces small serializations.
Expand Down
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@

<!-- dependency versions -->
<commons-compress.version>1.28.0</commons-compress.version>
<zstd-jni.version>1.5.7-8</zstd-jni.version>
<commons-io.version>2.22.0</commons-io.version>
<commons-lang3.version>3.20.0</commons-lang3.version>
<commons-exec.version>1.6.0</commons-exec.version>
Expand Down Expand Up @@ -514,6 +515,11 @@
<artifactId>commons-compress</artifactId>
<version>${commons-compress.version}</version>
</dependency>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
<version>${zstd-jni.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-exec</artifactId>
Expand Down
8 changes: 8 additions & 0 deletions storm-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,14 @@
<artifactId>curator-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</dependency>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
19 changes: 18 additions & 1 deletion storm-client/src/jvm/org/apache/storm/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -1458,7 +1458,24 @@ public class Config extends HashMap<String, Object> {
*/
@IsString
public static final String STORM_META_SERIALIZATION_DELEGATE = "storm.meta.serialization.delegate";

/**
* Maximum number of bytes a gzip payload may decompress to. Defaults to 104857600 (100 MiB).
*/
@IsPositiveNumber(includeZero = false)
public static final String STORM_COMPRESSION_GZIP_MAX_DECOMPRESSED_BYTES = "storm.compression.gzip.max.decompressed.bytes";
/**
* Zstandard compression level.
* Supported range: 1 to 19. Default: 3.
* <b>Prohibited:</b> Levels 20-22 (Ultra mode) are not allowed as they
* require dramatically more working memory per call.
*/
@CustomValidator(validatorClass = ConfigValidation.ZstdLevelValidator.class)
public static final String STORM_COMPRESSION_ZSTD_LEVEL = "storm.compression.zstd.level";
/**
* Maximum number of bytes a Zstandard payload may decompress to. Defaults to 104857600 (100 MiB).
*/
@IsPositiveNumber(includeZero = false)
public static final String STORM_COMPRESSION_ZSTD_MAX_DECOMPRESSED_BYTES = "storm.compression.zstd.max.decompressed.bytes";
/**
* Configure the topology metrics reporters to be used on workers.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
Expand All @@ -20,15 +20,22 @@
import java.util.Map;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import org.apache.storm.Config;
import org.apache.storm.shade.org.apache.commons.io.input.BoundedInputStream;
import org.apache.storm.utils.ObjectReader;

/**
* Note, this assumes it's deserializing a gzip byte stream, and will err if it encounters any other serialization.
*/
public class GzipSerializationDelegate implements SerializationDelegate {

private static final int DEFAULT_MAX_DECOMPRESSED_BYTES = 100 * 1024 * 1024;
private int maxDecompressedBytes;

@Override
public void prepare(Map<String, Object> topoConf) {
// No-op
this.maxDecompressedBytes = ObjectReader.getInt(topoConf.getOrDefault(Config.STORM_COMPRESSION_GZIP_MAX_DECOMPRESSED_BYTES,
DEFAULT_MAX_DECOMPRESSED_BYTES));
}

@Override
Expand All @@ -47,17 +54,21 @@ public byte[] serialize(Object object) {

@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
try {
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
GZIPInputStream gis = new GZIPInputStream(bis);
ObjectInputStream ois = new ObjectInputStream(gis);
try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
GZIPInputStream gis = new GZIPInputStream(bis);
BoundedInputStream lis = BoundedInputStream.builder()
.setMaxCount(this.maxDecompressedBytes)
.setInputStream(gis)
.setPropagateClose(true)
.get();
ObjectInputStream ois = new ObjectInputStream(lis)) {
Object ret = ois.readObject();
ois.close();
return (T) ret;
} catch (IOException ioe) {
throw new RuntimeException(ioe);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
if (gis.read() != -1) {
throw new IOException("Decompression threshold exceeded! Possible security risk or invalid data size.");
}
return clazz.cast(ret);
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException("Deserialization failed: " + e.getMessage(), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,32 @@
package org.apache.storm.serialization;

import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.thrift.TBase;
import org.apache.storm.thrift.TDeserializer;
import org.apache.storm.thrift.TException;
import org.apache.storm.thrift.TSerializer;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Utils;

/**
* Note, this assumes it's deserializing a gzip byte stream, and will err if it encounters any other serialization.
*/
public class GzipThriftSerializationDelegate implements SerializationDelegate {

private static final int DEFAULT_MAX_DECOMPRESSED_BYTES = 100 * 1024 * 1024;
private int maxDecompressedBytes;

@Override
public void prepare(Map<String, Object> topoConf) {
// No-op
this.maxDecompressedBytes = ObjectReader.getInt(topoConf.getOrDefault(Config.STORM_COMPRESSION_GZIP_MAX_DECOMPRESSED_BYTES,
DEFAULT_MAX_DECOMPRESSED_BYTES));
}

@Override
public byte[] serialize(Object object) {
try {
return Utils.gzip(new TSerializer().serialize((TBase) object));
return Utils.GzipUtils.compress(new TSerializer().serialize((TBase) object));
} catch (TException e) {
throw new RuntimeException(e);
}
Expand All @@ -48,7 +54,7 @@ public byte[] serialize(Object object) {
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
try {
TBase instance = (TBase) clazz.newInstance();
new TDeserializer().deserialize(instance, Utils.gunzip(bytes));
new TDeserializer().deserialize(instance, Utils.GzipUtils.decompress(bytes, this.maxDecompressedBytes));
return (T) instance;
} catch (Exception e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/

package org.apache.storm.serialization;

import java.util.Map;
import org.apache.storm.utils.Utils;

/**
* Always writes Zstd out, but tests incoming bytes to determine the format.
* If Zstd magic is found, it uses {@link ZstdThriftSerializationDelegate}.
* If not, it falls back to {@link GzipBridgeThriftSerializationDelegate}, which handles gzip and raw Thrift.
*/
public class ZstdBridgeThriftSerializationDelegate implements SerializationDelegate {

private final GzipBridgeThriftSerializationDelegate defaultDelegate = new GzipBridgeThriftSerializationDelegate();
private final ZstdThriftSerializationDelegate zstdDelegate = new ZstdThriftSerializationDelegate();

@Override
public void prepare(Map<String, Object> topoConf) {
defaultDelegate.prepare(topoConf);
zstdDelegate.prepare(topoConf);
}

@Override
public byte[] serialize(Object object) {
// Always compress new data with Zstd
return zstdDelegate.serialize(object);
}

@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
if (Utils.ZstdUtils.isZstd(bytes)) {
return zstdDelegate.deserialize(bytes, clazz);
} else {
// Fall back to GzipBridgeThriftSerializationDelegate, which delegates to
// the proper SerializationDelegate (GzipThriftSerializationDelegate or ThriftSerializationDelegate)
return defaultDelegate.deserialize(bytes, clazz);
}
}
}