Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions tests/wrappers_tests/test_send_errors_and_concurrency.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import base64
from concurrent.futures import ThreadPoolExecutor

import pytest
Expand All @@ -13,6 +14,7 @@
assert_event_invariants,
create_message_bindings,
get_node_multiaddr,
wait_for_connected,
wait_for_propagated,
wait_for_sent,
wait_for_error,
Expand All @@ -30,6 +32,12 @@
ERROR_AFTER_CACHE_EXPIRY_TIMEOUT_S = MAX_TIME_IN_CACHE_S + CACHE_EXPIRY_SLACK_S
RETRY_WINDOW_EXPIRED_MSG = "Unable to send within retry time window"

# Payload above DefaultMaxWakuMessageSize (150KiB), so the relay publish
# rejects it instead of failing with NO_PEERS_TO_RELAY.
OVERSIZED_PAYLOAD_BYTES = 200 * 1024
ERROR_TIMEOUT_S = 30.0
MESSAGE_SIZE_EXCEEDED_MSG = "Message size exceeded"

# S30: concurrent sends on the same content topic during initial auto-subscribe.
S30_CONCURRENT_SENDS = 5
S30_CONTENT_TOPIC = "/test/1/s30-concurrent/proto"
Expand Down Expand Up @@ -160,6 +168,79 @@ def test_s21_error_when_retry_window_expires(self, node_config):
assert_event_invariants(sender_collector, request_id)


class TestS13RelayHardFailureWithoutFallback(StepsCommon):
"""
S13: relay path is reachable (a relay peer is connected, so the publish
gets past NO_PEERS_TO_RELAY), but the relay publish fails for another
reason. An oversized payload is used so the relay processor rejects the
message immediately. No lightpush fallback is configured.
- Expected: Ok(RequestId), then a message_error event.
"""

def test_s13_relay_hard_failure_without_fallback(self, node_config):
sender_collector = EventCollector()

node_config.update(
{
"relay": True,
"numShardsInNetwork": 1,
}
)

sender_result = WrapperManager.create_and_start(
config=node_config,
event_cb=sender_collector.event_callback,
)
assert sender_result.is_ok(), f"Failed to start sender: {sender_result.err()}"

with sender_result.ok_value as sender_node:
relay_config = {
**node_config,
"staticnodes": [get_node_multiaddr(sender_node)],
"portsShift": 1,
}

relay_result = WrapperManager.create_and_start(config=relay_config)
assert relay_result.is_ok(), f"Failed to start relay peer: {relay_result.err()}"

with relay_result.ok_value:
# A connected relay peer means the publish gets past
# NO_PEERS_TO_RELAY and actually reaches the relay processor.
assert wait_for_connected(sender_collector) is not None, (
f"Sender did not reach Connected/PartiallyConnected. " f"Collected events: {sender_collector.events}"
)

oversized_payload = base64.b64encode(b"x" * OVERSIZED_PAYLOAD_BYTES).decode()
message = create_message_bindings(
payload=oversized_payload,
contentTopic="/test/1/s13-relay-hard-failure/proto",
)

send_result = sender_node.send_message(message=message)
assert send_result.is_ok(), f"send() must return Ok(RequestId), got: {send_result.err()}"

request_id = send_result.ok_value
assert request_id, "send() returned an empty RequestId"

error_event = wait_for_error(
collector=sender_collector,
request_id=request_id,
timeout_s=ERROR_TIMEOUT_S,
)
assert error_event is not None, (
f"No message_error event within {ERROR_TIMEOUT_S}s from the " f"relay processor. Collected events: {sender_collector.events}"
)
assert error_event["requestId"] == request_id
assert MESSAGE_SIZE_EXCEEDED_MSG in (error_event.get("error") or ""), (
f"Expected error to contain {MESSAGE_SIZE_EXCEEDED_MSG!r}.\n" f"Got: {error_event.get('error')!r}\n" f"Full event: {error_event}"
)

propagated = wait_for_propagated(sender_collector, request_id, timeout_s=0)
assert propagated is None, f"Unexpected message_propagated event for a failed relay publish: {propagated}"

assert_event_invariants(sender_collector, request_id)


class TestS30ConcurrentSendsDuringAutoSubscribe(StepsCommon):
"""
S30: concurrent sends on the same content topic during initial auto-subscribe.
Expand Down
88 changes: 88 additions & 0 deletions tests/wrappers_tests/test_send_lightpush_and_edge.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,94 @@ def test_s15_lightpush_retryable_error_then_recovery(self):
assert_event_invariants(sender_collector, request_id)


class TestS16LightpushPeerAppearsLater(StepsCommon):
"""
S16 — No delivery peers at T0, lightpush peer appears later.
The edge sender has the lightpush service in its staticnodes, but the
service is stopped before the sender starts, so there is no reachable
delivery peer at T0. send() is called while the service is down. The
service is restarted during the retry window; the sender connects to it
and a later retry delivers the message.
Expected: send() returns Ok(RequestId), then eventually Propagated.
"""

@pytest.mark.xfail(reason="binding cannot restart a node or add peers at runtime")
def test_s16_lightpush_peer_appears_later(self):
sender_collector = EventCollector()

common = {
"store": False,
"filter": False,
"discv5Discovery": False,
"numShardsInNetwork": 1,
}

# Start the lightpush service once to obtain its multiaddr, then stop
# it so the sender has no reachable peer at T0. The same node object
# is restarted later, so the address stays valid.
service_config = build_node_config(relay=True, lightpush=True, **common)
service_result = WrapperManager.create_and_start(config=service_config)
assert service_result.is_ok(), f"Failed to start lightpush peer: {service_result.err()}"

with service_result.ok_value as service:
service_multiaddr = get_node_multiaddr(service)

stop_result = service.stop_node()
assert stop_result.is_ok(), f"Failed to stop lightpush peer: {stop_result.err()}"
delay(SERVICE_DOWN_SETTLE_S)

# Edge sender is a lightpush client; its only peer is the service,
# which is currently down.
edge_config = build_node_config(
mode="Edge",
lightpush=True,
staticnodes=[service_multiaddr],
**common,
)
edge_result = WrapperManager.create_and_start(
config=edge_config,
event_cb=sender_collector.event_callback,
)
assert edge_result.is_ok(), f"Failed to start edge sender: {edge_result.err()}"

with edge_result.ok_value as edge_sender:
# send() is invoked while the service is down.
msg = create_message_bindings(
payload=to_base64("S16 lightpush peer appears later"),
contentTopic="/test/1/s16-late-lightpush/proto",
)
send_result = edge_sender.send_message(message=msg)
assert send_result.is_ok(), f"send() failed: {send_result.err()}"
request_id = send_result.ok_value
assert request_id, "send() returned an empty RequestId"

delay(SERVICE_DOWN_SETTLE_S)

early_propagated = wait_for_propagated(sender_collector, request_id, timeout_s=0)
assert early_propagated is None, f"message_propagated arrived before the lightpush peer was reachable: {early_propagated}"

# The lightpush peer comes back during the retry window.
restart_result = service.start_node()
assert restart_result.is_ok(), f"Failed to restart lightpush peer: {restart_result.err()}"

propagated = wait_for_propagated(
collector=sender_collector,
request_id=request_id,
timeout_s=RECOVERY_TIMEOUT_S,
)
assert propagated is not None, (
f"No message_propagated within {RECOVERY_TIMEOUT_S}s "
f"after the lightpush peer joined. "
f"Collected events: {sender_collector.events}"
)
assert propagated["requestId"] == request_id

error = wait_for_error(sender_collector, request_id, timeout_s=0)
assert error is None, f"Unexpected message_error after recovery: {error}"

assert_event_invariants(sender_collector, request_id)


class TestS26LightpushPeerChurn(StepsCommon):
"""
S26: multiple lightpush peers, the selected one disappears,
Expand Down
Loading