Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,7 @@ sinks-logs = [
"sinks-http",
"sinks-humio",
"sinks-influxdb",
"sinks-journald",
"sinks-kafka",
"sinks-keep",
"sinks-loki",
Expand Down Expand Up @@ -824,6 +825,7 @@ sinks-honeycomb = []
sinks-http = []
sinks-humio = ["sinks-splunk_hec", "transforms-metric_to_log"]
sinks-influxdb = []
sinks-journald = ["nix/fs", "nix/socket", "nix/uio"]
sinks-kafka = ["dep:rdkafka"]
sinks-keep = []
sinks-mezmo = []
Expand Down
3 changes: 3 additions & 0 deletions changelog.d/19177_add_journald_sink.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Added journald sink.

authors: wiktorsikora
48 changes: 48 additions & 0 deletions config/examples/journald.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Journald Sink Example
# ------------------------------------------------------------------------------
# A simple example showing how to send logs to systemd journal.
# This demonstrates forwarding logs from a dummy source, parsing syslog messages,
# and mapping syslog severity levels to journal priority levels.

sources:
dummy_logs:
type: "demo_logs"
format: "syslog"
interval: 1

transforms:
parse_syslog:
type: "remap"
inputs: ["dummy_logs"]
source: |
. = parse_syslog!(string!(.message))

map_severity_to_priority:
# Map the syslog severity to journal priority levels
type: "remap"
inputs: ["parse_syslog"]
source: |
.priority = if .severity == "emerg" {
0
} else if .severity == "alert" {
1
} else if .severity == "crit" {
2
} else if .severity == "err" {
3
} else if .severity == "warning" {
4
} else if .severity == "notice" {
5
} else if .severity == "info" {
6
} else if .severity == "debug" {
7
} else {
6
}

sinks:
local_journal:
inputs: ["map_severity_to_priority"]
type: "journald"
87 changes: 87 additions & 0 deletions src/sinks/journald/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use futures::{future, FutureExt};
use vector_lib::configurable::configurable_component;

use crate::{
config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
sinks::{journald::sink::JournaldSink, Healthcheck, VectorSink},
};

/// Configuration for the `journald` sink.
#[configurable_component(sink(
"journald",
"Send observability events to the systemd journal for local logging."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct JournaldSinkConfig {
/// Path to the journald socket.
/// If not specified, the default systemd journal socket will be used.
#[configurable(metadata(docs::examples = "\"/run/systemd/journal/socket\".to_string()"))]
#[serde(default = "default_journald_path")]
pub journald_path: String,

#[configurable(derived)]
#[serde(
default,
deserialize_with = "crate::serde::bool_or_struct",
skip_serializing_if = "crate::serde::is_default"
)]
pub acknowledgements: AcknowledgementsConfig,
}

fn default_journald_path() -> String {
"/run/systemd/journal/socket".to_string()
}

impl Default for JournaldSinkConfig {
fn default() -> Self {
Self {
journald_path: default_journald_path(),
acknowledgements: AcknowledgementsConfig::default(),
}
}
}

impl GenerateConfig for JournaldSinkConfig {
fn generate_config() -> toml::Value {
toml::Value::try_from(Self::default()).unwrap()
}
}

#[async_trait::async_trait]
#[typetag::serde(name = "journald")]
impl SinkConfig for JournaldSinkConfig {
async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
let sink = JournaldSink::new(self.clone())?;
let healthcheck = future::ok(()).boxed();

Ok((VectorSink::from_event_streamsink(sink), healthcheck))
}

fn input(&self) -> Input {
Input::log()
}

fn acknowledgements(&self) -> &AcknowledgementsConfig {
&self.acknowledgements
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn generate_config() {
crate::test_util::test_generate_config::<JournaldSinkConfig>();
}

#[test]
fn test_config_default() {
let config = JournaldSinkConfig::default();
assert_eq!(
config.journald_path,
"/run/systemd/journal/socket".to_string()
);
}
}
128 changes: 128 additions & 0 deletions src/sinks/journald/journald_writer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
use nix::fcntl::{FcntlArg, SealFlag};
use nix::sys::socket::{ControlMessage, MsgFlags};
use std::io;
use std::os::fd::AsRawFd;
use std::path::Path;

/// A writer for journald that sends log messages over a Unix domain socket.
/// For protocol details, see [this link](https://systemd.io/JOURNAL_NATIVE_PROTOCOL/)
pub struct JournaldWriter {
socket: tokio::net::UnixDatagram,
buf: Vec<u8>,
}

impl JournaldWriter {
pub fn new(journald_path: impl AsRef<Path>) -> io::Result<Self> {
let socket = tokio::net::UnixDatagram::unbound()?;
socket.connect(journald_path)?;
let writer = Self {
socket,
buf: vec![],
};
Ok(writer)
}

/// Add a string field to the buffer.
pub fn add_str(&mut self, key: &str, value: &str) {
self.write_with_length(key, |w| {
w.buf.extend_from_slice(value.as_bytes());
});
}

/// Add a field with arbitrary bytes to the buffer.
pub fn add_bytes(&mut self, key: &str, value: &[u8]) {
self.write_with_length(key, |w| {
w.buf.extend_from_slice(value);
});
}

/// Write the buffered data to journald.
/// Returns the number of bytes sent.
pub async fn write(&mut self) -> io::Result<usize> {
if self.buf.is_empty() {
return Ok(0);
}
let bytes_sent = self.send_payload(&self.buf).await?;
// Clear the buffer after sending
// We could also keep the buffer for reuse, but by doing this we ensure that
// we don't allocate too much memory for long time in case of rare large payloads.
self.buf = vec![];
Ok(bytes_sent)
Comment on lines +45 to +50
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If send_payload returns an error here, self.buf is not cleared, so the next event may append to the previous failed payload.

Suggested change
let bytes_sent = self.send_payload(&self.buf).await?;
// Clear the buffer after sending
// We could also keep the buffer for reuse, but by doing this we ensure that
// we don't allocate too much memory for long time in case of rare large payloads.
self.buf = vec![];
Ok(bytes_sent)
let payload = std::mem::take(&mut self.buf);
let bytes_sent = self.send_payload(&payload).await?;
Ok(bytes_sent)

}

/// Send the payload to journald.
/// If the payload is too large, it will attempt to send it via a memfd.
/// Returns the number of bytes sent.
async fn send_payload(&self, payload: &[u8]) -> io::Result<usize> {
self.socket.send(payload).await.or_else(|error| {
if Some(nix::libc::EMSGSIZE) == error.raw_os_error() {
self.send_with_memfd(payload)
} else {
Err(error)
}
})
}

/// Send the payload using a memfd if the payload is too large for a direct send.
/// This method uses a blocking call to write the payload to a memfd.
fn send_with_memfd(&self, payload: &[u8]) -> io::Result<usize> {
// If the payload is too large, we should try to send it via a memfd
// This method is described in the journald protocol: https://systemd.io/JOURNAL_NATIVE_PROTOCOL/
let memfd = nix::sys::memfd::memfd_create(
c"journald_payload",
nix::sys::memfd::MemFdCreateFlag::MFD_ALLOW_SEALING,
)?;

// Write the payload to the memfd
let written = nix::unistd::write(memfd, payload)?;
if written != payload.len() {
return Err(io::Error::other("Failed to write all data to memfd"));
}

// Seal the memfd as required by journald protocol
nix::fcntl::fcntl(memfd, FcntlArg::F_ADD_SEALS(SealFlag::all()))?;

// Send the memfd file descriptor to journald
let scm = &[memfd];
let cmsgs = [ControlMessage::ScmRights(scm)];
nix::sys::socket::sendmsg::<()>(
self.socket.as_raw_fd(),
&[],
&cmsgs,
MsgFlags::empty(),
None,
)?;

Ok(written)
}

/// Append a sanitized and length-encoded field into the buffer.
fn write_with_length(&mut self, key: &str, write_cb: impl FnOnce(&mut Self)) {
self.sanitize_key(key);
self.buf.push(b'\n');
self.buf.extend_from_slice(&[0; 8]); // Length tag, to be populated after writing the value
let start = self.buf.len();
write_cb(self);
let end = self.buf.len();
self.buf[start - 8..start].copy_from_slice(&((end - start) as u64).to_le_bytes());
self.buf.push(b'\n');
}

/// Sanitize a key and convert it to uppercase.
fn sanitize_key(&mut self, key: &str) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One small edge case I noticed: this may still produce field names that journald rejects.

The native receiver validates both plain and length-encoded fields with journal_field_valid(..., false): plain fields, length-encoded fields.

A few examples that look worth handling:

  • 🔥 can sanitize to an empty name, but empty names are rejected: l <= 0
  • _SYSTEMD_UNIT keeps the leading _, but client-provided protected fields are rejected: p[0] == '_'
  • 123abc keeps the leading digit, but digit-prefixed names are rejected: ascii_isdigit(p[0])
  • if an escape/fallback prefix is added, the final name still needs to stay within 64 bytes: l > 64

self.buf.extend(
key.bytes()
.map(|c| match c {
// As per journald protocol, '=' and '\n' are illegal in keys so we replace them with '_'.
b'=' | b'\n' => b'_',
// '.' is ignored in keys, so we replace it with '_'.
b'.' => b'_',
_ => c,
})
.filter(|&c| c == b'_' || char::from(c).is_ascii_alphanumeric())
.map(|c| char::from(c).to_ascii_uppercase() as u8)
// Journald keys are limited to 64 bytes
.take(64),
);
}
}
6 changes: 6 additions & 0 deletions src/sinks/journald/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
mod config;
mod sink;

mod journald_writer;

pub use config::JournaldSinkConfig;
Loading
Loading