diff --git a/crates/bifrost/benches/replicated_loglet_serde.rs b/crates/bifrost/benches/replicated_loglet_serde.rs index 5100524995..3f46cf9e19 100644 --- a/crates/bifrost/benches/replicated_loglet_serde.rs +++ b/crates/bifrost/benches/replicated_loglet_serde.rs @@ -21,6 +21,7 @@ use pprof::flamegraph::Options; use prost::Message as _; use rand::distr::Alphanumeric; use rand::{Rng, RngCore, random}; +use restate_types::ReString; use restate_bifrost::InputRecord; use restate_core::network::protobuf::network::message::Body; @@ -68,7 +69,7 @@ fn invoke_cmd() -> Command { let idempotency_key: ByteString = rand_string(15).into(); let request_id = PartitionProcessorRpcRequestId::new(); let inv_source = restate_types::invocation::Source::Ingress(request_id); - let handler: ByteString = format!("aFunction_{}", rand_string(10)).into(); + let handler: ReString = format!("aFunction_{}", rand_string(10)).into(); Command::Invoke(Box::new(ServiceInvocation { invocation_id: InvocationId::generate( @@ -104,7 +105,7 @@ fn invoke_cmd() -> Command { fn invoker_effect_cmd() -> Command { let idempotency_key: ByteString = rand_string(15).into(); - let handler: ByteString = format!("aFunction_{}", rand_string(10)).into(); + let handler: ReString = format!("aFunction_{}", rand_string(10)).into(); let mut data = [0u8; 128]; rand::rng().fill_bytes(&mut data); diff --git a/crates/invoker-impl/src/invocation_task/service_protocol_runner.rs b/crates/invoker-impl/src/invocation_task/service_protocol_runner.rs index 0e24fe2b8d..709bbe01a8 100644 --- a/crates/invoker-impl/src/invocation_task/service_protocol_runner.rs +++ b/crates/invoker-impl/src/invocation_task/service_protocol_runner.rs @@ -525,7 +525,8 @@ where self.invocation_task .invocation_target .key() - .map(|bs| bs.as_bytes().clone()), + .cloned() + .map(Into::into), journal_size, partial_state, state_map, diff --git a/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs b/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs index 7aa391b33a..60ed386ff6 100644 --- a/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs +++ b/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs @@ -86,6 +86,32 @@ const RATE_LIMITED_CODES: [StatusCode; 2] = [ StatusCode::TOO_MANY_REQUESTS, ]; +/// Convert prost-decoded bytes from the SDK into an `InvokeRequest`, validating UTF-8. +#[allow(clippy::too_many_arguments)] +fn try_invoke_request_from_call( + service_name: Bytes, + handler_name: Bytes, + key: Bytes, + parameter: Bytes, + headers: Vec
, + idempotency_key: Option, + scope: Option, + limit_key: Option, + span_relation: SpanRelation, +) -> Result { + Ok(InvokeRequest { + service_name: ReString::try_from(service_name)?, + handler_name: ReString::try_from(handler_name)?, + parameter, + headers, + key: ReString::try_from(key)?, + idempotency_key, + scope: scope.map(ReString::try_from).transpose()?, + limit_key: limit_key.map(ReString::try_from).transpose()?, + span_relation, + }) +} + /// Runs the interaction between the server and the service endpoint. pub struct ServiceProtocolRunner<'a, EE, Schemas> { invocation_task: &'a mut InvocationTask, @@ -706,7 +732,8 @@ where self.invocation_task .invocation_target .key() - .map(|bs| bs.as_bytes().clone()), + .cloned() + .map(Into::into), journal_size, partial_state, state_map, @@ -724,7 +751,8 @@ where self.invocation_task .invocation_target .key() - .map(|bs| bs.as_bytes().clone()), + .cloned() + .map(Into::into), journal_size, partial_state, state_map, @@ -1063,22 +1091,21 @@ where let name = cmd.name; let entry: Entry = OneWayCallCommand { request: shortcircuit!( - resolve_call_request( - self.invocation_task.schemas.live_load(), - InvokeRequest { - service_name: cmd.service_name.into(), - handler_name: cmd.handler_name.into(), - parameter: cmd.parameter, - headers: cmd.headers.into_iter().map(Into::into).collect(), - key: cmd.key.into(), - idempotency_key: cmd.idempotency_key.map(|s| s.into()), - scope: cmd.scope, - limit_key: cmd.limit_key, - span_relation: SpanRelation::Linked( - attempt_span.span_context().clone().into() - ) - } + try_invoke_request_from_call( + cmd.service_name, + cmd.handler_name, + cmd.key, + cmd.parameter, + cmd.headers.into_iter().map(Into::into).collect(), + cmd.idempotency_key.map(|s| s.into()), + cmd.scope, + cmd.limit_key, + SpanRelation::Linked(attempt_span.span_context().clone().into()), ) + .and_then(|req| resolve_call_request( + self.invocation_task.schemas.live_load(), + req + )) .map_err(|e| InvokerError::CommandPrecondition( self.command_index, EntryType::Command(CommandType::OneWayCall), @@ -1105,22 +1132,21 @@ where let name = cmd.name; let entry: Entry = CallCommand { request: shortcircuit!( - resolve_call_request( - self.invocation_task.schemas.live_load(), - InvokeRequest { - service_name: cmd.service_name.into(), - handler_name: cmd.handler_name.into(), - parameter: cmd.parameter, - headers: cmd.headers.into_iter().map(Into::into).collect(), - key: cmd.key.into(), - idempotency_key: cmd.idempotency_key.map(|s| s.into()), - scope: cmd.scope, - limit_key: cmd.limit_key, - span_relation: SpanRelation::Parent( - attempt_span.span_context().clone().into() - ) - } + try_invoke_request_from_call( + cmd.service_name, + cmd.handler_name, + cmd.key, + cmd.parameter, + cmd.headers.into_iter().map(Into::into).collect(), + cmd.idempotency_key.map(|s| s.into()), + cmd.scope, + cmd.limit_key, + SpanRelation::Parent(attempt_span.span_context().clone().into()), ) + .and_then(|req| resolve_call_request( + self.invocation_task.schemas.live_load(), + req + )) .map_err(|e| InvokerError::CommandPrecondition( self.command_index, EntryType::Command(CommandType::Call), @@ -1476,16 +1502,16 @@ where } pub struct InvokeRequest { - service_name: ByteString, - handler_name: ByteString, + service_name: ReString, + handler_name: ReString, headers: Vec
, /// Empty if service call. /// The reason this is not Option is that it cannot be distinguished purely from the message /// whether the key is none or empty. - key: ByteString, + key: ReString, idempotency_key: Option, - scope: Option, - limit_key: Option, + scope: Option, + limit_key: Option, span_relation: SpanRelation, parameter: Bytes, } diff --git a/crates/partition-store/src/tests/invocation_status_table_test/mod.rs b/crates/partition-store/src/tests/invocation_status_table_test/mod.rs index a4dddec643..de70897cb0 100644 --- a/crates/partition-store/src/tests/invocation_status_table_test/mod.rs +++ b/crates/partition-store/src/tests/invocation_status_table_test/mod.rs @@ -18,7 +18,7 @@ use std::collections::HashSet; use std::sync::LazyLock; use std::time::Duration; -use bytestring::ByteString; +use restate_util_string::ReString; use restate_storage_api::invocation_status_table::{ InFlightInvocationMetadata, InvocationStatus, JournalMetadata, ReadInvocationStatusTable, @@ -33,9 +33,9 @@ use restate_types::journal_v2::UnresolvedFuture; use restate_types::time::MillisSinceEpoch; const INVOCATION_TARGET_1: InvocationTarget = InvocationTarget::VirtualObject { - name: ByteString::from_static("abc"), - key: ByteString::from_static("1"), - handler: ByteString::from_static("myhandler"), + name: ReString::from_static("abc"), + key: ReString::from_static("1"), + handler: ReString::from_static("myhandler"), handler_ty: VirtualObjectHandlerType::Exclusive, scope: None, }; @@ -43,9 +43,9 @@ static INVOCATION_ID_1: LazyLock = LazyLock::new(|| InvocationId::mock_generate(&INVOCATION_TARGET_1)); const INVOCATION_TARGET_2: InvocationTarget = InvocationTarget::VirtualObject { - name: ByteString::from_static("abc"), - key: ByteString::from_static("2"), - handler: ByteString::from_static("myhandler"), + name: ReString::from_static("abc"), + key: ReString::from_static("2"), + handler: ReString::from_static("myhandler"), handler_ty: VirtualObjectHandlerType::Exclusive, scope: None, }; @@ -53,9 +53,9 @@ static INVOCATION_ID_2: LazyLock = LazyLock::new(|| InvocationId::mock_generate(&INVOCATION_TARGET_2)); const INVOCATION_TARGET_3: InvocationTarget = InvocationTarget::VirtualObject { - name: ByteString::from_static("abc"), - key: ByteString::from_static("3"), - handler: ByteString::from_static("myhandler"), + name: ReString::from_static("abc"), + key: ReString::from_static("3"), + handler: ReString::from_static("myhandler"), handler_ty: VirtualObjectHandlerType::Exclusive, scope: None, }; @@ -63,9 +63,9 @@ static INVOCATION_ID_3: LazyLock = LazyLock::new(|| InvocationId::mock_generate(&INVOCATION_TARGET_3)); const INVOCATION_TARGET_4: InvocationTarget = InvocationTarget::VirtualObject { - name: ByteString::from_static("abc"), - key: ByteString::from_static("4"), - handler: ByteString::from_static("myhandler"), + name: ReString::from_static("abc"), + key: ReString::from_static("4"), + handler: ReString::from_static("myhandler"), handler_ty: VirtualObjectHandlerType::Exclusive, scope: None, }; diff --git a/crates/partition-store/src/tests/journal_table_test/mod.rs b/crates/partition-store/src/tests/journal_table_test/mod.rs index 2406e2192e..152750686a 100644 --- a/crates/partition-store/src/tests/journal_table_test/mod.rs +++ b/crates/partition-store/src/tests/journal_table_test/mod.rs @@ -13,8 +13,8 @@ use std::sync::LazyLock; use std::time::Duration; use bytes::Bytes; -use bytestring::ByteString; use futures_util::StreamExt; +use restate_util_string::ReString; use restate_rocksdb::RocksDbManager; use restate_storage_api::Transaction; @@ -43,8 +43,8 @@ static MOCK_INVOKE_JOURNAL_ENTRY: LazyLock = LazyLock::new(|| { enrichment_result: Some(CallEnrichmentResult { invocation_id: InvocationId::from_parts(789, InvocationUuid::from_u128(456)), invocation_target: InvocationTarget::Service { - name: ByteString::from_static("MySvc"), - handler: ByteString::from_static("MyHandler"), + name: ReString::from_static("MySvc"), + handler: ReString::from_static("MyHandler"), scope: None, }, completion_retention_time: Some(Duration::from_secs(10)), diff --git a/crates/partition-store/src/tests/journal_table_v2_test/mod.rs b/crates/partition-store/src/tests/journal_table_v2_test/mod.rs index 4e480f9ec1..7848d4e383 100644 --- a/crates/partition-store/src/tests/journal_table_v2_test/mod.rs +++ b/crates/partition-store/src/tests/journal_table_v2_test/mod.rs @@ -31,6 +31,7 @@ use restate_types::journal_v2::{ }; use restate_types::storage::{StoredRawEntry, StoredRawEntryHeader}; use restate_types::time::MillisSinceEpoch; +use restate_util_string::ReString; const MOCK_INVOCATION_ID_1: InvocationId = InvocationId::from_parts(1, InvocationUuid::from_u128(12345678900001)); @@ -55,8 +56,8 @@ fn mock_call_command( request: CallRequest { invocation_id: InvocationId::from_parts(789, InvocationUuid::from_u128(456)), invocation_target: InvocationTarget::Service { - name: ByteString::from_static("MySvc"), - handler: ByteString::from_static("MyHandler"), + name: ReString::from_static("MySvc"), + handler: ReString::from_static("MyHandler"), scope: None, }, span_context: ServiceInvocationSpanContext::empty(), @@ -78,8 +79,8 @@ fn mock_one_way_call_command(invocation_id_completion_id: CompletionId) -> Entry request: CallRequest { invocation_id: InvocationId::from_parts(789, InvocationUuid::from_u128(456)), invocation_target: InvocationTarget::Service { - name: ByteString::from_static("MySvc"), - handler: ByteString::from_static("MyHandler"), + name: ReString::from_static("MySvc"), + handler: ReString::from_static("MyHandler"), scope: None, }, span_context: ServiceInvocationSpanContext::empty(), diff --git a/crates/service-protocol-v4/src/entry_codec.rs b/crates/service-protocol-v4/src/entry_codec.rs index 93e4a75bae..d430db8602 100644 --- a/crates/service-protocol-v4/src/entry_codec.rs +++ b/crates/service-protocol-v4/src/entry_codec.rs @@ -37,7 +37,7 @@ use restate_types::journal_v2::raw::{ RawNotificationResultVariant, }; use restate_types::{LimitKey, Scope, journal_v2::*}; -use restate_util_string::RestateString; +use restate_util_string::{ReString, RestateString}; use crate::proto; use crate::proto::{ @@ -133,20 +133,17 @@ impl Encoder for ServiceProtocolV4Codec { })) => RawCommand::new( CommandType::Call, proto::CallCommandMessage { - service_name: invocation_target.service_name().to_string(), - handler_name: invocation_target.handler_name().to_string(), + service_name: invocation_target.service_name().clone().into(), + handler_name: invocation_target.handler_name().clone().into(), parameter, headers: headers.into_iter().map(Into::into).collect(), - key: invocation_target - .key() - .unwrap_or(&ByteString::new()) - .to_string(), + key: invocation_target.key().cloned().unwrap_or_default().into(), idempotency_key: idempotency_key.map(|s| s.to_string()), - scope: invocation_target.scope().map(ToString::to_string), + scope: invocation_target.scope().map(|s| s.to_restring().into()), limit_key: if limit_key == LimitKey::None { None } else { - Some(limit_key.to_string()) + Some(Bytes::from(limit_key.to_string())) }, name: name.to_string(), invocation_id_notification_idx: invocation_id_completion_id, @@ -184,21 +181,18 @@ impl Encoder for ServiceProtocolV4Codec { })) => RawCommand::new( CommandType::OneWayCall, proto::OneWayCallCommandMessage { - service_name: invocation_target.service_name().to_string(), - handler_name: invocation_target.handler_name().to_string(), + service_name: invocation_target.service_name().clone().into(), + handler_name: invocation_target.handler_name().clone().into(), parameter, invoke_time: invoke_time.as_u64(), headers: headers.into_iter().map(Into::into).collect(), - key: invocation_target - .key() - .unwrap_or(&ByteString::new()) - .to_string(), + key: invocation_target.key().cloned().unwrap_or_default().into(), idempotency_key: idempotency_key.map(|s| s.to_string()), - scope: invocation_target.scope().map(ToString::to_string), + scope: invocation_target.scope().map(|s| s.to_restring().into()), limit_key: if limit_key == LimitKey::None { None } else { - Some(limit_key.to_string()) + Some(Bytes::from(limit_key.to_string())) }, name: name.to_string(), invocation_id_notification_idx: invocation_id_completion_id, @@ -654,6 +648,18 @@ macro_rules! to_string_or_bail { }; } +// Safe Bytes -> ReString conversion. Validates UTF-8 (input from SDK is untrusted). +macro_rules! to_restring_or_bail { + ($field:expr) => { + ReString::try_from($field).map_err(|e| { + DecodingError::from(GenericError::from(BadFieldError( + stringify!($field), + e.into(), + ))) + })? + }; +} + macro_rules! to_invocation_id_or_bail { ($field:ident) => { InvocationId::from_str(&$field).map_err(|e| { @@ -748,7 +754,9 @@ impl Decoder for ServiceProtocolV4Codec { completion_retention_duration: metadata.completion_retention_duration, journal_retention_duration: metadata.journal_retention_duration, limit_key: if let Some(limit_key) = limit_key { - limit_key.parse().map_err(GenericError::from)? + to_restring_or_bail!(limit_key) + .parse() + .map_err(GenericError::from)? } else { LimitKey::None }, @@ -786,7 +794,9 @@ impl Decoder for ServiceProtocolV4Codec { completion_retention_duration: metadata.completion_retention_duration, journal_retention_duration: metadata.journal_retention_duration, limit_key: if let Some(limit_key) = limit_key { - limit_key.parse().map_err(GenericError::from)? + to_restring_or_bail!(limit_key) + .parse() + .map_err(GenericError::from)? } else { LimitKey::None }, @@ -1619,17 +1629,17 @@ impl From for proto::attach_invocation_command_message:: AttachInvocationTarget::InvocationId(id) => Self::InvocationId(id.to_string()), AttachInvocationTarget::IdempotentRequest(id) => { Self::IdempotentRequestTarget(proto::IdempotentRequestTarget { - service_name: id.service_name.into(), - service_key: id.service_key.map(Into::into), - handler_name: id.service_handler.into(), + service_name: id.service_name.into_bytes(), + service_key: id.service_key.map(ByteString::into_bytes), + handler_name: id.service_handler.into_bytes(), idempotency_key: id.idempotency_key.into(), - scope: id.scope.map(|s| s.to_string()), + scope: id.scope.map(|s| s.to_restring().into()), }) } AttachInvocationTarget::Workflow(id) => Self::WorkflowTarget(proto::WorkflowTarget { - workflow_name: id.service_name.into(), - workflow_key: id.key.into(), - scope: id.scope.map(|s| s.to_string()), + workflow_name: id.service_name.into_bytes(), + workflow_key: id.key.into_bytes(), + scope: id.scope.map(|s| s.to_restring().into()), }), } } @@ -1650,31 +1660,41 @@ impl TryFrom for AttachInvocat proto::attach_invocation_command_message::Target::IdempotentRequestTarget( idempotent_request, ) => { - // Safety: Before we accept an idempotent request we validate in - // ServiceProtocolRunner::handle_message that the scope value is valid. let scope = idempotent_request .scope - .as_ref() - .map(|scope| unsafe { Scope::new_unchecked(scope) }); + .map(|scope| { + let s = to_restring_or_bail!(scope); + Scope::try_from_restring(s).map_err(GenericError::from) + }) + .transpose()?; Self::IdempotentRequest(IdempotencyId::new( - idempotent_request.service_name.into(), - idempotent_request.service_key.map(Into::into), - idempotent_request.handler_name.into(), + ByteString::try_from(idempotent_request.service_name) + .map_err(GenericError::from)?, + idempotent_request + .service_key + .map(ByteString::try_from) + .transpose() + .map_err(GenericError::from)?, + ByteString::try_from(idempotent_request.handler_name) + .map_err(GenericError::from)?, idempotent_request.idempotency_key.into(), scope, )) } proto::attach_invocation_command_message::Target::WorkflowTarget(workflow_target) => { - // Safety: Before we accept a workflow target we validate in - // ServiceProtocolRunner::handle_message that the scope value is valid. let scope = workflow_target .scope - .as_ref() - .map(|scope| unsafe { Scope::new_unchecked(scope) }); + .map(|scope| { + let s = to_restring_or_bail!(scope); + Scope::try_from_restring(s).map_err(GenericError::from) + }) + .transpose()?; Self::Workflow(ServiceId::new( scope, - workflow_target.workflow_name, - workflow_target.workflow_key, + ByteString::try_from(workflow_target.workflow_name) + .map_err(GenericError::from)?, + ByteString::try_from(workflow_target.workflow_key) + .map_err(GenericError::from)?, )) } }) @@ -1687,17 +1707,17 @@ impl From for proto::get_invocation_output_command_messa AttachInvocationTarget::InvocationId(id) => Self::InvocationId(id.to_string()), AttachInvocationTarget::IdempotentRequest(id) => { Self::IdempotentRequestTarget(proto::IdempotentRequestTarget { - service_name: id.service_name.into(), - service_key: id.service_key.map(Into::into), - handler_name: id.service_handler.into(), + service_name: id.service_name.into_bytes(), + service_key: id.service_key.map(ByteString::into_bytes), + handler_name: id.service_handler.into_bytes(), idempotency_key: id.idempotency_key.into(), - scope: id.scope.map(|s| s.to_string()), + scope: id.scope.map(|s| s.to_restring().into()), }) } AttachInvocationTarget::Workflow(id) => Self::WorkflowTarget(proto::WorkflowTarget { - workflow_name: id.service_name.into(), - workflow_key: id.key.into(), - scope: id.scope.map(|s| s.to_string()), + workflow_name: id.service_name.into_bytes(), + workflow_key: id.key.into_bytes(), + scope: id.scope.map(|s| s.to_restring().into()), }), } } @@ -1718,15 +1738,23 @@ impl TryFrom for AttachInv proto::get_invocation_output_command_message::Target::IdempotentRequestTarget( idempotent_request, ) => { - // Safety: Before we accept an idempotent request we validate in - // ServiceProtocolRunner::handle_message that the scope value is valid. let scope = idempotent_request .scope - .map(|ref scope| unsafe { Scope::new_unchecked(scope) }); + .map(|scope| { + let s = to_restring_or_bail!(scope); + Scope::try_from_restring(s).map_err(GenericError::from) + }) + .transpose()?; Self::IdempotentRequest(IdempotencyId::new( - idempotent_request.service_name.into(), - idempotent_request.service_key.map(Into::into), - idempotent_request.handler_name.into(), + ByteString::try_from(idempotent_request.service_name) + .map_err(GenericError::from)?, + idempotent_request + .service_key + .map(ByteString::try_from) + .transpose() + .map_err(GenericError::from)?, + ByteString::try_from(idempotent_request.handler_name) + .map_err(GenericError::from)?, idempotent_request.idempotency_key.into(), scope, )) @@ -1734,15 +1762,19 @@ impl TryFrom for AttachInv proto::get_invocation_output_command_message::Target::WorkflowTarget( workflow_target, ) => { - // Safety: Before we accept a workflow target we validate in - // ServiceProtocolRunner::handle_message that the scope value is valid. let scope = workflow_target .scope - .map(|ref scope| unsafe { Scope::new_unchecked(scope) }); + .map(|scope| { + let s = to_restring_or_bail!(scope); + Scope::try_from_restring(s).map_err(GenericError::from) + }) + .transpose()?; Self::Workflow(ServiceId::new( scope, - workflow_target.workflow_name, - workflow_target.workflow_key, + ByteString::try_from(workflow_target.workflow_name) + .map_err(GenericError::from)?, + ByteString::try_from(workflow_target.workflow_key) + .map_err(GenericError::from)?, )) } }) diff --git a/crates/service-protocol-v4/src/message_codec/mod.rs b/crates/service-protocol-v4/src/message_codec/mod.rs index ac330932c2..22b0841a2d 100644 --- a/crates/service-protocol-v4/src/message_codec/mod.rs +++ b/crates/service-protocol-v4/src/message_codec/mod.rs @@ -28,7 +28,7 @@ use restate_types::journal_v2::{ CommandIndex, CommandType, CompletionType, EntryType, NotificationType, }; use restate_types::{LimitKey, Scope}; -use restate_util_string::ReString; +use restate_util_string::{ReString, RestateString}; const CUSTOM_MESSAGE_MASK: u16 = 0xFC00; @@ -401,17 +401,15 @@ impl Message { known_entries, partial_state, state_map, - key: key - .and_then(|b| String::from_utf8(b.to_vec()).ok()) - .unwrap_or_default(), + key: key.unwrap_or_default(), retry_count_since_last_stored_entry, duration_since_last_stored_entry: duration_since_last_stored_entry.as_millis() as u64, random_seed, - scope: scope.map(|scope| scope.to_string()), + scope: scope.map(|scope| scope.to_restring().into()), limit_key: if limit_key == &LimitKey::None { None } else { - Some(limit_key.to_string()) + Some(Bytes::from(limit_key.to_string())) }, idempotency_key: idempotency_key.map(|value| value.to_string()), }) diff --git a/crates/service-protocol/src/codec.rs b/crates/service-protocol/src/codec.rs index d9d065be11..a766619562 100644 --- a/crates/service-protocol/src/codec.rs +++ b/crates/service-protocol/src/codec.rs @@ -259,9 +259,9 @@ mod test_util { enrichment_result: Some(CallEnrichmentResult { invocation_id, invocation_target: InvocationTarget::VirtualObject { - name: entry.request.service_name.clone(), - key: entry.request.key.clone(), - handler: entry.request.handler_name.clone(), + name: entry.request.service_name.clone().into(), + key: entry.request.key.clone().into(), + handler: entry.request.handler_name.clone().into(), handler_ty: VirtualObjectHandlerType::Exclusive, scope: None, }, @@ -297,9 +297,9 @@ mod test_util { enrichment_result: CallEnrichmentResult { invocation_id, invocation_target: InvocationTarget::VirtualObject { - name: entry.request.service_name.clone(), - key: entry.request.key.clone(), - handler: entry.request.handler_name.clone(), + name: entry.request.service_name.clone().into(), + key: entry.request.key.clone().into(), + handler: entry.request.handler_name.clone().into(), handler_ty: VirtualObjectHandlerType::Exclusive, scope: None, }, diff --git a/crates/storage-api/proto/dev/restate/storage/v1/domain.proto b/crates/storage-api/proto/dev/restate/storage/v1/domain.proto index a9f7f7836b..587d4b70b9 100644 --- a/crates/storage-api/proto/dev/restate/storage/v1/domain.proto +++ b/crates/storage-api/proto/dev/restate/storage/v1/domain.proto @@ -23,15 +23,15 @@ message InvocationTarget { bytes name = 2; bytes handler = 3; bytes key = 4; - // Since v1.7.0 — scope for vqueue partitioning. Empty string means no scope. - optional string scope = 5; + // (string) Since v1.7.0 — scope for vqueue partitioning. Empty string means no scope. + optional bytes scope = 5; } message ServiceId { bytes service_name = 1; bytes service_key = 2; - // Since v1.7.0 — scope for vqueue partitioning. Empty string means no scope. - optional string scope = 3; + // (string) Since v1.7.0 — scope for vqueue partitioning. Empty string means no scope. + optional bytes scope = 3; } message IdempotencyId { @@ -39,8 +39,8 @@ message IdempotencyId { optional string service_key = 2; string handler_name = 3; string idempotency_key = 4; - // Since v1.7.0 — scope for vqueue partitioning. Empty string means no scope. - optional string scope = 5; + // (string) Since v1.7.0 — scope for vqueue partitioning. Empty string means no scope. + optional bytes scope = 5; } message GenerationalNodeId { @@ -393,7 +393,8 @@ message ServiceInvocation { // TODO(tillrohrmann): Revisit string serialization of limit_key. Currently serialized as // "level1" or "level1/level2" which requires parsing on read. Check whether a dedicated // Protobuf message would be faster to serialize/deserialize. - string limit_key = 15; + // (string) + bytes limit_key = 15; } message StateMutation { diff --git a/crates/storage-api/src/protobuf_types.rs b/crates/storage-api/src/protobuf_types.rs index 7ea2c8c577..7fcb99aee0 100644 --- a/crates/storage-api/src/protobuf_types.rs +++ b/crates/storage-api/src/protobuf_types.rs @@ -108,7 +108,7 @@ pub mod v1 { use restate_types::service_protocol::ServiceProtocolVersion; use restate_types::time::MillisSinceEpoch; use restate_types::{GenerationalNodeId, Scope, journal_v2}; - use restate_util_string::RestateString; + use restate_util_string::{ReString, RestateString}; use super::dedup_sequence_number::Variant; use super::enriched_entry_header::{ @@ -145,6 +145,14 @@ pub mod v1 { Future, NotificationEntryIndex, NotificationResultVariant, }; + // Unsafe Bytes -> ReString conversion. Skips UTF-8 validation since storage data was + // validated at write-time. + #[inline] + fn bytes_to_restring(mut bytes: Bytes) -> ReString { + // SAFETY: storage payloads are written by the runtime after validating UTF-8. + unsafe { ReString::from_utf8_buf_unchecked(&mut bytes) } + } + impl TryFrom for crate::service_status_table::VirtualObjectStatus { type Error = ConversionError; @@ -225,7 +233,7 @@ pub mod v1 { service_key: value.service_key.map(Into::into), handler_name: value.service_handler.into(), idempotency_key: value.idempotency_key.into(), - scope: value.scope.map(|s| s.to_string()), + scope: value.scope.map(|s| s.to_restring().into()), } } } @@ -234,11 +242,10 @@ pub mod v1 { type Error = ConversionError; fn try_from(value: IdempotencyId) -> Result { - let scope = value - .scope - .map(|scope| Scope::try_new(&scope)) - .transpose() - .map_err(ConversionError::invalid_data)?; + // SAFETY: storage payloads were validated at write-time. + let scope = value.scope.map(|scope| unsafe { + Scope::from_restring_unchecked(bytes_to_restring(scope)) + }); Ok(restate_types::identifiers::IdempotencyId::new( value.service_name.into(), value.service_key.map(Into::into), @@ -1671,7 +1678,9 @@ pub mod v1 { .transpose()?; // Scope is persisted as part of InvocationTarget since v1.7.0 - let limit_key = limit_key.parse().map_err(ConversionError::invalid_data)?; + let limit_key = bytes_to_restring(limit_key) + .parse() + .map_err(ConversionError::invalid_data)?; Ok(restate_types::invocation::ServiceInvocation { invocation_id, @@ -1694,7 +1703,7 @@ pub mod v1 { impl From for ServiceInvocation { fn from(value: restate_types::invocation::ServiceInvocation) -> Self { - let limit_key = value.limit_key.to_string(); + let limit_key = Bytes::from(value.limit_key.to_string()); // Scope is persisted as part of InvocationTarget since v1.7.0 let invocation_target = InvocationTarget::from(value.invocation_target); let span_context = SpanContext::from(value.span_context); @@ -1750,7 +1759,7 @@ pub mod v1 { idempotency_key: value.idempotency_key.as_ref().map(|s| s.to_string()), submit_notification_sink: value.submit_notification_sink.map(Into::into), restate_version: value.restate_version.clone().into_string(), - limit_key: value.limit_key.to_string(), + limit_key: Bytes::from(value.limit_key.to_string()), } } } @@ -1840,15 +1849,12 @@ pub mod v1 { type Error = ConversionError; fn try_from(value: InvocationTarget) -> Result { - let name = - ByteString::try_from(value.name).map_err(ConversionError::invalid_data)?; - let handler = - ByteString::try_from(value.handler).map_err(ConversionError::invalid_data)?; - let scope = if let Some(ref scope) = value.scope { - Some(Scope::try_new(scope).map_err(ConversionError::invalid_data)?) - } else { - None - }; + let name = bytes_to_restring(value.name); + let handler = bytes_to_restring(value.handler); + // SAFETY: storage payloads were validated at write-time. + let scope = value.scope.map(|scope| unsafe { + Scope::from_restring_unchecked(bytes_to_restring(scope)) + }); match invocation_target::Ty::try_from(value.service_and_handler_ty) { Ok(invocation_target::Ty::Service) => { @@ -1862,8 +1868,7 @@ pub mod v1 { Ok(restate_types::invocation::InvocationTarget::VirtualObject { name, handler, - key: ByteString::try_from(value.key) - .map_err(ConversionError::invalid_data)?, + key: bytes_to_restring(value.key), handler_ty: restate_types::invocation::VirtualObjectHandlerType::Exclusive, scope, @@ -1873,8 +1878,7 @@ pub mod v1 { Ok(restate_types::invocation::InvocationTarget::VirtualObject { name, handler, - key: ByteString::try_from(value.key) - .map_err(ConversionError::invalid_data)?, + key: bytes_to_restring(value.key), handler_ty: restate_types::invocation::VirtualObjectHandlerType::Shared, scope, }) @@ -1883,8 +1887,7 @@ pub mod v1 { Ok(restate_types::invocation::InvocationTarget::Workflow { name, handler, - key: ByteString::try_from(value.key) - .map_err(ConversionError::invalid_data)?, + key: bytes_to_restring(value.key), handler_ty: restate_types::invocation::WorkflowHandlerType::Workflow, scope, }) @@ -1893,8 +1896,7 @@ pub mod v1 { Ok(restate_types::invocation::InvocationTarget::Workflow { name, handler, - key: ByteString::try_from(value.key) - .map_err(ConversionError::invalid_data)?, + key: bytes_to_restring(value.key), handler_ty: restate_types::invocation::WorkflowHandlerType::Shared, scope, }) @@ -1909,13 +1911,13 @@ pub mod v1 { impl From for InvocationTarget { fn from(value: restate_types::invocation::InvocationTarget) -> Self { - let scope = value.scope().map(|s| s.to_string()); + let scope = value.scope().map(|s| s.to_restring().into()); match value { restate_types::invocation::InvocationTarget::Service { name, handler, .. } => InvocationTarget { - name: name.into_bytes(), - handler: handler.into_bytes(), + name: name.into(), + handler: handler.into(), service_and_handler_ty: invocation_target::Ty::Service.into(), scope, ..InvocationTarget::default() @@ -1927,9 +1929,9 @@ pub mod v1 { handler_ty, .. } => InvocationTarget { - name: name.into_bytes(), - handler: handler.into_bytes(), - key: key.into_bytes(), + name: name.into(), + handler: handler.into(), + key: key.into(), scope, service_and_handler_ty: match handler_ty { restate_types::invocation::VirtualObjectHandlerType::Shared => { @@ -1948,9 +1950,9 @@ pub mod v1 { handler_ty, .. } => InvocationTarget { - name: name.into_bytes(), - handler: handler.into_bytes(), - key: key.into_bytes(), + name: name.into(), + handler: handler.into(), + key: key.into(), scope, service_and_handler_ty: match handler_ty { restate_types::invocation::WorkflowHandlerType::Shared => { @@ -1968,13 +1970,13 @@ pub mod v1 { impl From<&restate_types::invocation::InvocationTarget> for InvocationTarget { fn from(value: &restate_types::invocation::InvocationTarget) -> Self { - let scope = value.scope().map(|s| s.to_string()); + let scope = value.scope().map(|s| s.to_restring().into()); match value { restate_types::invocation::InvocationTarget::Service { name, handler, .. } => InvocationTarget { - name: name.as_bytes().clone(), - handler: handler.as_bytes().clone(), + name: name.clone().into(), + handler: handler.clone().into(), service_and_handler_ty: invocation_target::Ty::Service.into(), scope, ..InvocationTarget::default() @@ -1986,9 +1988,9 @@ pub mod v1 { handler_ty, .. } => InvocationTarget { - name: name.as_bytes().clone(), - handler: handler.as_bytes().clone(), - key: key.as_bytes().clone(), + name: name.clone().into(), + handler: handler.clone().into(), + key: key.clone().into(), scope, service_and_handler_ty: match handler_ty { restate_types::invocation::VirtualObjectHandlerType::Shared => { @@ -2007,9 +2009,9 @@ pub mod v1 { handler_ty, .. } => InvocationTarget { - name: name.as_bytes().clone(), - handler: handler.as_bytes().clone(), - key: key.as_bytes().clone(), + name: name.clone().into(), + handler: handler.clone().into(), + key: key.clone().into(), scope, service_and_handler_ty: match handler_ty { restate_types::invocation::WorkflowHandlerType::Shared => { @@ -2029,13 +2031,10 @@ pub mod v1 { type Error = ConversionError; fn try_from(service_id: ServiceId) -> Result { - // Safety: In principle, data is meant to be validated _before_ they - // are written to storage. - // RestrictedValue. Therefore, we validate it here. - let scope = service_id - .scope - .as_ref() - .map(|scope| unsafe { Scope::new_unchecked(scope) }); + // SAFETY: storage payloads were validated at write-time. + let scope = service_id.scope.map(|scope| unsafe { + Scope::from_restring_unchecked(bytes_to_restring(scope)) + }); Ok(restate_types::identifiers::ServiceId::new( scope, ByteString::try_from(service_id.service_name) @@ -2051,7 +2050,7 @@ pub mod v1 { ServiceId { service_key: service_id.key.into_bytes(), service_name: service_id.service_name.into_bytes(), - scope: service_id.scope.map(|s| s.to_string()), + scope: service_id.scope.map(|s| s.to_restring().into()), } } } diff --git a/crates/types/Cargo.toml b/crates/types/Cargo.toml index 034a753698..ca5b037ad5 100644 --- a/crates/types/Cargo.toml +++ b/crates/types/Cargo.toml @@ -33,7 +33,7 @@ restate-sharding = { workspace = true, features = ["serde", "bilrost"] } restate-test-util = { workspace = true, optional = true } restate-util-time = { workspace = true, features = ["serde", "serde_with"] } restate-util-bytecount = { workspace = true, features = ["serde"] } -restate-util-string = { workspace = true, features = ["serde", "bilrost"] } +restate-util-string = { workspace = true, features = ["serde", "bilrost", "bytestring"] } restate-utoipa = { workspace = true } adaptive-timeout = { workspace = true, features = ["serde"] } diff --git a/crates/types/src/identifiers.rs b/crates/types/src/identifiers.rs index ce9ce60618..e5e3a5a545 100644 --- a/crates/types/src/identifiers.rs +++ b/crates/types/src/identifiers.rs @@ -719,9 +719,9 @@ impl IdempotencyId { idempotency_key: ByteString, ) -> Self { IdempotencyId { - service_name: invocation_target.service_name().clone(), - service_key: invocation_target.key().cloned(), - service_handler: invocation_target.handler_name().clone(), + service_name: invocation_target.service_name().clone().into(), + service_key: invocation_target.key().cloned().map(Into::into), + service_handler: invocation_target.handler_name().clone().into(), idempotency_key, scope: invocation_target.scope().cloned(), partition_key: invocation_id.partition_key(), @@ -1536,7 +1536,7 @@ mod tests { #[test] fn unscoped_service_invocations_use_consistent_bounded_partition_key_set() { - fn service_partition_key_set(service_name: &ByteString) -> HashSet { + fn service_partition_key_set(service_name: &str) -> HashSet { (0..UNSCOPED_SERVICE_PARTITION_KEY_FANOUT) .map(|bucket| unscoped_service_partition_key(service_name, bucket)) .collect() @@ -1561,7 +1561,7 @@ mod tests { #[test] fn unscoped_services_have_different_partition_key_sets() { - fn service_partition_key_set(service_name: &ByteString) -> HashSet { + fn service_partition_key_set(service_name: &str) -> HashSet { (0..UNSCOPED_SERVICE_PARTITION_KEY_FANOUT) .map(|bucket| unscoped_service_partition_key(service_name, bucket)) .collect() diff --git a/crates/types/src/invocation/mod.rs b/crates/types/src/invocation/mod.rs index a4b1f342fd..53b4f4106f 100644 --- a/crates/types/src/invocation/mod.rs +++ b/crates/types/src/invocation/mod.rs @@ -155,23 +155,23 @@ pub enum Short<'a> { #[derive(Eq, Hash, PartialEq, Clone, Debug, serde::Serialize, serde::Deserialize)] pub enum InvocationTarget { Service { - name: ByteString, - handler: ByteString, + name: ReString, + handler: ReString, #[serde(skip_serializing_if = "Option::is_none")] scope: Option, }, VirtualObject { - name: ByteString, - key: ByteString, - handler: ByteString, + name: ReString, + key: ReString, + handler: ReString, handler_ty: VirtualObjectHandlerType, #[serde(skip_serializing_if = "Option::is_none")] scope: Option, }, Workflow { - name: ByteString, - key: ByteString, - handler: ByteString, + name: ReString, + key: ReString, + handler: ReString, handler_ty: WorkflowHandlerType, #[serde(skip_serializing_if = "Option::is_none")] scope: Option, @@ -179,7 +179,7 @@ pub enum InvocationTarget { } impl InvocationTarget { - pub fn service(name: impl Into, handler: impl Into) -> Self { + pub fn service(name: impl Into, handler: impl Into) -> Self { Self::Service { name: name.into(), handler: handler.into(), @@ -188,8 +188,8 @@ impl InvocationTarget { } pub fn scoped_service( - name: impl Into, - handler: impl Into, + name: impl Into, + handler: impl Into, scope: Scope, ) -> Self { Self::Service { @@ -209,9 +209,9 @@ impl InvocationTarget { } pub fn virtual_object( - name: impl Into, - key: impl Into, - handler: impl Into, + name: impl Into, + key: impl Into, + handler: impl Into, handler_ty: VirtualObjectHandlerType, ) -> Self { Self::VirtualObject { @@ -224,9 +224,9 @@ impl InvocationTarget { } pub fn scoped_virtual_object( - name: impl Into, - key: impl Into, - handler: impl Into, + name: impl Into, + key: impl Into, + handler: impl Into, handler_ty: VirtualObjectHandlerType, scope: Scope, ) -> Self { @@ -240,9 +240,9 @@ impl InvocationTarget { } pub fn workflow( - name: impl Into, - key: impl Into, - handler: impl Into, + name: impl Into, + key: impl Into, + handler: impl Into, handler_ty: WorkflowHandlerType, ) -> Self { Self::Workflow { @@ -255,9 +255,9 @@ impl InvocationTarget { } pub fn scoped_workflow( - name: impl Into, - key: impl Into, - handler: impl Into, + name: impl Into, + key: impl Into, + handler: impl Into, handler_ty: WorkflowHandlerType, scope: Scope, ) -> Self { @@ -288,7 +288,7 @@ impl InvocationTarget { } } - pub fn service_name(&self) -> &ByteString { + pub fn service_name(&self) -> &ReString { match self { InvocationTarget::Service { name, .. } => name, InvocationTarget::VirtualObject { name, .. } => name, @@ -304,10 +304,9 @@ impl InvocationTarget { key, handler_ty, .. - } if handler_ty == &VirtualObjectHandlerType::Exclusive => Some(LockName::new( - ServiceName::new(name.as_ref()), - ReString::new(key), - )), + } if handler_ty == &VirtualObjectHandlerType::Exclusive => { + Some(LockName::new(ServiceName::new(name.as_str()), key.clone())) + } // NOTE: Workflows don't have locks as their invariant (run once per ID) is enforced by // the partition processor at ingestion/creation time (via invocation/entry status) // Therefore, we treat them as normal services when it comes to locking and vqueue @@ -320,7 +319,7 @@ impl InvocationTarget { } } - pub fn key(&self) -> Option<&ByteString> { + pub fn key(&self) -> Option<&ReString> { match self { InvocationTarget::Service { .. } => None, InvocationTarget::VirtualObject { key, .. } => Some(key), @@ -328,7 +327,7 @@ impl InvocationTarget { } } - pub fn handler_name(&self) -> &ByteString { + pub fn handler_name(&self) -> &ReString { match self { InvocationTarget::Service { handler, .. } => handler, InvocationTarget::VirtualObject { handler, .. } => handler, @@ -1402,8 +1401,8 @@ impl InvocationQuery { InvocationQuery::Invocation(iid) => *iid, InvocationQuery::Workflow(wfid) => InvocationId::generate( &InvocationTarget::Workflow { - name: wfid.service_name.clone(), - key: wfid.key.clone(), + name: wfid.service_name.clone().into(), + key: wfid.key.clone().into(), // Doesn't matter handler: Default::default(), // Must be the workflow handler type diff --git a/crates/types/src/lib.rs b/crates/types/src/lib.rs index 9ed192d8e4..19f17f7b8b 100644 --- a/crates/types/src/lib.rs +++ b/crates/types/src/lib.rs @@ -66,9 +66,8 @@ pub use limit_key::LimitKey; pub use locking::*; pub use node_id::*; use restate_encoding::BilrostNewType; -use restate_util_string::{ - Interned, ReString, RestateString, RestrictedValue, RestrictedValueError, -}; +pub use restate_util_string::ReString; +use restate_util_string::{Interned, RestateString, RestrictedValue, RestrictedValueError}; pub use restate_version::*; pub use version::*; diff --git a/crates/worker/src/partition/state_machine/mod.rs b/crates/worker/src/partition/state_machine/mod.rs index 33d92296d4..fd556352b5 100644 --- a/crates/worker/src/partition/state_machine/mod.rs +++ b/crates/worker/src/partition/state_machine/mod.rs @@ -5371,10 +5371,10 @@ impl StateMachineApplyContext<'_, S> { // synthesize a virtual object invocation target so we can generate the vqueue id from. let target = InvocationTarget::VirtualObject { - name: service_id.service_name.clone(), - key: service_id.key.clone(), + name: service_id.service_name.clone().into(), + key: service_id.key.clone().into(), // fake, doesn't matter. - handler: ByteString::from_static("_state_mutation"), + handler: ReString::from_static("_state_mutation"), handler_ty: VirtualObjectHandlerType::Exclusive, scope: service_id.scope.clone(), }; diff --git a/service-protocol/dev/restate/service/protocol.proto b/service-protocol/dev/restate/service/protocol.proto index 4ea09d91a1..f2a981207b 100644 --- a/service-protocol/dev/restate/service/protocol.proto +++ b/service-protocol/dev/restate/service/protocol.proto @@ -73,7 +73,8 @@ message StartMessage { bool partial_state = 5; // If this invocation has a key associated (e.g. for objects and workflows), then this key is filled in. Empty otherwise. - string key = 6; + // (string) + bytes key = 6; // Retry count since the last stored entry. // @@ -92,12 +93,12 @@ message StartMessage { uint64 random_seed = 9; // If this invocation was called within a scope, returns the scope - // Since V7 - optional string scope = 10; + // (string) Since V7 + optional bytes scope = 10; // If this invocation was called with a limit key, returns the limit key - // Since V7 - optional string limit_key = 11; + // (string) Since V7 + optional bytes limit_key = 11; // If this invocation was called with an idempotency key, returns the idempotency key // Since V7 @@ -474,30 +475,33 @@ message SleepCompletionNotificationMessage { // Fallible: Yes // Type: 0x0400 + D message CallCommandMessage { - string service_name = 1; - string handler_name = 2; + // (string) + bytes service_name = 1; + // (string) + bytes handler_name = 2; bytes parameter = 3; repeated Header headers = 4; // If this invocation has a key associated (e.g. for objects and workflows), then this key is filled in. Empty otherwise. - string key = 5; + // (string) + bytes key = 5; // If present, it must be non empty. optional string idempotency_key = 6; // Scope for the invocation target. Empty string means no scope (unscoped invocation). - // Since V7. - optional string scope = 7; + // (string) Since V7. + optional bytes scope = 7; // Limit key for the invocation. Empty string means no limit key. // A limit key is only valid if scope is set. // TODO(tillrohrmann): Revisit string serialization of limit_key. Currently serialized as // "level1" or "level1/level2" which requires parsing on read. Check whether a dedicated // Protobuf message would be faster to serialize/deserialize. - // Since V7. - optional string limit_key = 8; + // (string) Since V7. + optional bytes limit_key = 8; uint32 invocation_id_notification_idx = 10; uint32 result_completion_id = 11; @@ -532,8 +536,10 @@ message CallCompletionNotificationMessage { // Fallible: Yes // Type: 0x0400 + E message OneWayCallCommandMessage { - string service_name = 1; - string handler_name = 2; + // (string) + bytes service_name = 1; + // (string) + bytes handler_name = 2; bytes parameter = 3; @@ -546,22 +552,23 @@ message OneWayCallCommandMessage { repeated Header headers = 5; // If this invocation has a key associated (e.g. for objects and workflows), then this key is filled in. Empty otherwise. - string key = 6; + // (string) + bytes key = 6; // If present, it must be non empty. optional string idempotency_key = 7; // Scope for the invocation target. Empty string means no scope (unscoped invocation). - // Since V7. - optional string scope = 8; + // (string) Since V7. + optional bytes scope = 8; // Limit key for the invocation. Empty string means no limit key. // A limit key is only valid if scope is set. // TODO(tillrohrmann): Revisit string serialization of limit_key. Currently serialized as // "level1" or "level1/level2" which requires parsing on read. Check whether a dedicated // Protobuf message would be faster to serialize/deserialize. - // Since V7. - optional string limit_key = 9; + // (string) Since V7. + optional bytes limit_key = 9; uint32 invocation_id_notification_idx = 10; string name = 12; @@ -742,21 +749,26 @@ message Header { } message WorkflowTarget { - string workflow_name = 1; - string workflow_key = 2; + // (string) + bytes workflow_name = 1; + // (string) + bytes workflow_key = 2; // Scope for the invocation target. Empty string means no scope (unscoped invocation). - // Since V7. - optional string scope = 3; + // (string) Since V7. + optional bytes scope = 3; } message IdempotentRequestTarget { - string service_name = 1; - optional string service_key = 2; - string handler_name = 3; + // (string) + bytes service_name = 1; + // (string) + optional bytes service_key = 2; + // (string) + bytes handler_name = 3; string idempotency_key = 4; // Scope for the invocation target. Empty string means no scope (unscoped invocation). - // Since V7. - optional string scope = 5; + // (string) Since V7. + optional bytes scope = 5; } message Void { diff --git a/util/string/src/string.rs b/util/string/src/string.rs index 7d05e27abe..7a1abfb541 100644 --- a/util/string/src/string.rs +++ b/util/string/src/string.rs @@ -302,6 +302,16 @@ impl From for bytestring::ByteString { } } +#[cfg(feature = "bytestring")] +impl From for ReString { + #[inline] + fn from(value: bytestring::ByteString) -> Self { + let mut bytes: bytes::Bytes = value.into_bytes(); + // SAFETY: ByteString guarantees valid UTF-8. + unsafe { ReString::from_utf8_buf_unchecked(&mut bytes) } + } +} + #[derive(Clone)] enum Inner { RefCounted(Arc), @@ -374,7 +384,7 @@ impl Ord for ReString { impl Eq for ReString {} -impl> PartialEq for ReString { +impl + ?Sized> PartialEq for ReString { #[inline(always)] fn eq(&self, other: &T) -> bool { self.as_str().eq(other.as_ref())