Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/actions/spelling/allow.txt
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ Medion
MEF
Meizu
meme
memfd
Mertz
messagebird
metakey
Expand Down
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,7 @@ sinks-logs = [
"sinks-http",
"sinks-humio",
"sinks-influxdb",
"sinks-journald",
"sinks-kafka",
"sinks-keep",
"sinks-loki",
Expand Down Expand Up @@ -847,6 +848,7 @@ sinks-honeycomb = []
sinks-http = []
sinks-humio = ["sinks-splunk_hec", "transforms-metric_to_log"]
sinks-influxdb = []
sinks-journald = ["nix/fs", "nix/socket", "nix/uio"]
sinks-kafka = ["dep:rdkafka"]
sinks-keep = []
sinks-mezmo = []
Expand Down
3 changes: 3 additions & 0 deletions changelog.d/19177_add_journald_sink.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Added journald sink.

authors: wiktorsikora
48 changes: 48 additions & 0 deletions config/examples/journald.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Journald Sink Example
# ------------------------------------------------------------------------------
# A simple example showing how to send logs to systemd journal.
# This demonstrates forwarding logs from a dummy source, parsing syslog messages,
# and mapping syslog severity levels to journal priority levels.

sources:
dummy_logs:
type: "demo_logs"
format: "syslog"
interval: 1

transforms:
parse_syslog:
type: "remap"
inputs: ["dummy_logs"]
source: |
. = parse_syslog!(string!(.message))

map_severity_to_priority:
# Map the syslog severity to journal priority levels
type: "remap"
inputs: ["parse_syslog"]
source: |
.priority = if .severity == "emerg" {
0
} else if .severity == "alert" {
1
} else if .severity == "crit" {
2
} else if .severity == "err" {
3
} else if .severity == "warning" {
4
} else if .severity == "notice" {
5
} else if .severity == "info" {
6
} else if .severity == "debug" {
7
} else {
6
}

sinks:
local_journal:
inputs: ["map_severity_to_priority"]
type: "journald"
87 changes: 87 additions & 0 deletions src/sinks/journald/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use futures::{FutureExt, future};
use vector_lib::configurable::configurable_component;

use crate::{
config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
sinks::{Healthcheck, VectorSink, journald::sink::JournaldSink},
};

/// Configuration for the `journald` sink.
#[configurable_component(sink(
"journald",
"Send observability events to the systemd journal for local logging."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct JournaldSinkConfig {
/// Path to the journald socket.
/// If not specified, the default systemd journal socket will be used.
#[configurable(metadata(docs::examples = "\"/run/systemd/journal/socket\".to_string()"))]
#[serde(default = "default_journald_path")]
pub journald_path: String,

#[configurable(derived)]
#[serde(
default,
deserialize_with = "crate::serde::bool_or_struct",
skip_serializing_if = "crate::serde::is_default"
)]
pub acknowledgements: AcknowledgementsConfig,
}

fn default_journald_path() -> String {
"/run/systemd/journal/socket".to_string()
}

impl Default for JournaldSinkConfig {
fn default() -> Self {
Self {
journald_path: default_journald_path(),
acknowledgements: AcknowledgementsConfig::default(),
}
}
}

impl GenerateConfig for JournaldSinkConfig {
fn generate_config() -> toml::Value {
toml::Value::try_from(Self::default()).unwrap()
}
}

#[async_trait::async_trait]
#[typetag::serde(name = "journald")]
impl SinkConfig for JournaldSinkConfig {
async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
let sink = JournaldSink::new(self.clone())?;
let healthcheck = future::ok(()).boxed();

Ok((VectorSink::from_event_streamsink(sink), healthcheck))
}

fn input(&self) -> Input {
Input::log()
}

fn acknowledgements(&self) -> &AcknowledgementsConfig {
&self.acknowledgements
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn generate_config() {
crate::test_util::test_generate_config::<JournaldSinkConfig>();
}

#[test]
fn test_config_default() {
let config = JournaldSinkConfig::default();
assert_eq!(
config.journald_path,
"/run/systemd/journal/socket".to_string()
);
}
}
128 changes: 128 additions & 0 deletions src/sinks/journald/journald_writer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
use nix::fcntl::{FcntlArg, SealFlag};
use nix::sys::socket::{ControlMessage, MsgFlags};
use std::io;
use std::os::fd::AsRawFd;
use std::path::Path;

/// A writer for journald that sends log messages over a Unix domain socket.
/// For protocol details, see [this link](https://systemd.io/JOURNAL_NATIVE_PROTOCOL/)
pub struct JournaldWriter {
socket: tokio::net::UnixDatagram,
buf: Vec<u8>,
}

impl JournaldWriter {
pub fn new(journald_path: impl AsRef<Path>) -> io::Result<Self> {
let socket = tokio::net::UnixDatagram::unbound()?;
socket.connect(journald_path)?;
let writer = Self {
socket,
buf: vec![],
};
Ok(writer)
}

/// Add a string field to the buffer.
pub fn add_str(&mut self, key: &str, value: &str) {
self.write_with_length(key, |w| {
w.buf.extend_from_slice(value.as_bytes());
});
}

/// Add a field with arbitrary bytes to the buffer.
pub fn add_bytes(&mut self, key: &str, value: &[u8]) {
self.write_with_length(key, |w| {
w.buf.extend_from_slice(value);
});
}

/// Write the buffered data to journald.
/// Returns the number of bytes sent.
pub async fn write(&mut self) -> io::Result<usize> {
if self.buf.is_empty() {
return Ok(0);
}
let bytes_sent = self.send_payload(&self.buf).await?;
// Clear the buffer after sending
// We could also keep the buffer for reuse, but by doing this we ensure that
// we don't allocate too much memory for long time in case of rare large payloads.
self.buf = vec![];
Ok(bytes_sent)
Comment on lines +45 to +50
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If send_payload returns an error here, self.buf is not cleared, so the next event may append to the previous failed payload.

Suggested change
let bytes_sent = self.send_payload(&self.buf).await?;
// Clear the buffer after sending
// We could also keep the buffer for reuse, but by doing this we ensure that
// we don't allocate too much memory for long time in case of rare large payloads.
self.buf = vec![];
Ok(bytes_sent)
let payload = std::mem::take(&mut self.buf);
let bytes_sent = self.send_payload(&payload).await?;
Ok(bytes_sent)

}

/// Send the payload to journald.
/// If the payload is too large, it will attempt to send it via a memfd.
/// Returns the number of bytes sent.
async fn send_payload(&self, payload: &[u8]) -> io::Result<usize> {
self.socket.send(payload).await.or_else(|error| {
if Some(nix::libc::EMSGSIZE) == error.raw_os_error() {
self.send_with_memfd(payload)
} else {
Err(error)
}
})
}

/// Send the payload using a memfd if the payload is too large for a direct send.
/// This method uses a blocking call to write the payload to a memfd.
fn send_with_memfd(&self, payload: &[u8]) -> io::Result<usize> {
// If the payload is too large, we should try to send it via a memfd
// This method is described in the journald protocol: https://systemd.io/JOURNAL_NATIVE_PROTOCOL/
let memfd = nix::sys::memfd::memfd_create(
c"journald_payload",
nix::sys::memfd::MemFdCreateFlag::MFD_ALLOW_SEALING,
)?;

// Write the payload to the memfd
let written = nix::unistd::write(memfd, payload)?;
if written != payload.len() {
return Err(io::Error::other("Failed to write all data to memfd"));
}

// Seal the memfd as required by journald protocol
nix::fcntl::fcntl(memfd, FcntlArg::F_ADD_SEALS(SealFlag::all()))?;

// Send the memfd file descriptor to journald
let scm = &[memfd];
let cmsgs = [ControlMessage::ScmRights(scm)];
nix::sys::socket::sendmsg::<()>(
self.socket.as_raw_fd(),
&[],
&cmsgs,
MsgFlags::empty(),
None,
)?;

Ok(written)
}

/// Append a sanitized and length-encoded field into the buffer.
fn write_with_length(&mut self, key: &str, write_cb: impl FnOnce(&mut Self)) {
self.sanitize_key(key);
self.buf.push(b'\n');
self.buf.extend_from_slice(&[0; 8]); // Length tag, to be populated after writing the value
let start = self.buf.len();
write_cb(self);
let end = self.buf.len();
self.buf[start - 8..start].copy_from_slice(&((end - start) as u64).to_le_bytes());
self.buf.push(b'\n');
}

/// Sanitize a key and convert it to uppercase.
fn sanitize_key(&mut self, key: &str) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One small edge case I noticed: this may still produce field names that journald rejects.

The native receiver validates both plain and length-encoded fields with journal_field_valid(..., false): plain fields, length-encoded fields.

A few examples that look worth handling:

  • 🔥 can sanitize to an empty name, but empty names are rejected: l <= 0
  • _SYSTEMD_UNIT keeps the leading _, but client-provided protected fields are rejected: p[0] == '_'
  • 123abc keeps the leading digit, but digit-prefixed names are rejected: ascii_isdigit(p[0])
  • if an escape/fallback prefix is added, the final name still needs to stay within 64 bytes: l > 64

self.buf.extend(
key.bytes()
.map(|c| match c {
// As per journald protocol, '=' and '\n' are illegal in keys so we replace them with '_'.
b'=' | b'\n' => b'_',
// '.' is ignored in keys, so we replace it with '_'.
b'.' => b'_',
_ => c,
})
.filter(|&c| c == b'_' || char::from(c).is_ascii_alphanumeric())
.map(|c| char::from(c).to_ascii_uppercase() as u8)
// Journald keys are limited to 64 bytes
.take(64),
);
}
}
6 changes: 6 additions & 0 deletions src/sinks/journald/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
mod config;
mod sink;

mod journald_writer;

pub use config::JournaldSinkConfig;
Loading
Loading