With the kafka-c() destination of syslog-ng PE, you can directly publish log messages to the Apache Kafka message bus, where subscribers can access them. The kafka-c() destination has the following options.
Required options
The following options are required:
-
bootstrap-servers()
-
topic().
batch-lines()
Type: | number [lines] |
Default: | 1 |
Description: Specifies how many lines are flushed to a destination in one batch. The syslog-ng PE application waits for this number of lines to accumulate and sends them off in a single batch. Increasing this number increases throughput as more messages are sent in a single batch, but also increases message latency.
For example, if you set batch-lines() to 100, syslog-ng PE waits for 100 messages.
If the batch-timeout() option is disabled, the syslog-ng PE application flushes the messages if it has sent batch-lines() number of messages, or the queue became empty. If you stop or reload syslog-ng PE or in case of network sources, the connection with the client is closed, syslog-ng PE automatically sends the unsent messages to the destination.
If the batch-timeout() option is enabled and the queue becomes empty, syslog-ng PE flushes the messages only if batch-timeout() expires, or the batch reaches the limit set in batch-lines().
For optimal performance, make sure that the syslog-ng PE source that feeds messages to this destination is configured properly: the value of the log-iw-size() option of the source must be higher than the batch-lines()*workers() of the destination. Otherwise, the size of the batches cannot reach the batch-lines() limit.
NOTE: The syslog-ng PE configuration accepts this option with sync-send() set to both "yes" or "no", but the option will only take effect if you set sync-send() to "yes".
NOTE: If you set sync-send() to "yes", the number you specify for batch-lines() affects how many messages syslog-ng PE packs into once transaction.
batch-timeout()
Type: | time [milliseconds] |
Default: | -1 (disabled) |
Description: Specifies the time syslog-ng PE waits for lines to accumulate in the output buffer. The syslog-ng PE application sends batches to the destinations evenly. The timer starts when the first message arrives to the buffer, so if only few messages arrive, syslog-ng PE sends messages to the destination once every batch-timeout() milliseconds at most.
NOTE: The syslog-ng PE configuration accepts this option with sync-send() set to both "yes" or "no", but the option will only take effect if you set sync-send() to "yes".
NOTE: When setting batch-timeout(), consider the value of the transaction.timeout.ms Kafka property. If in case of timeout (that is, if syslog-ng PE does not receive batch-lines() amount of messages) the value of batch-timeout() exceeds the value of transaction.timeout.ms, syslog-ng PE will not send out messages in time.
For more information about the default values of the transaction.timeout.ms Kafka property, see the librdkafka documentation.
bootstrap-servers()
Type: | string |
Default: | N/A |
Description: Required option. Specifies the hostname or IP address of the Kafka server. When specifying an IP address, IPv4 (for example, 192.168.0.1) or IPv6 (for example, [::1]) can be used as well. Use a colon (:) after the address to specify the port number of the server. When specifying multiple addresses, use a comma to separate the addresses, for example, bootstrap-servers("127.0.0.1:2525,remote-server-hostname:6464")
config()
Type: | N/A |
Default: | N/A |
Description: Advanced configuration option to fine-tune all properties of the official Kafka producer. For details, see the librdkafka documentation.
The syntax of the config() option is the following:
config ( "acks" => "all" "compression.type" => "snappy" )
disk-buffer()
Description: This option enables putting outgoing messages into the disk buffer of the destination to avoid message loss in case of a system failure on the destination side. It has the following options:
reliable() | |||
Type: | yes|no | ||
Default: | no | ||
Description: If set to yes, syslog-ng PE cannot lose logs in case of reload/restart, unreachable destination or syslog-ng PE crash. This solution provides a slower, but more reliable disk-buffer option. It is created and initialized at startup and gradually grows as new messages arrive. If set to no, the normal disk-buffer will be used. This provides a faster, but less reliable disk-buffer option.
|
compaction() | |
Type: | yes|no |
Default: | no |
Description: If set to yes, syslog-ng PE prunes the unused space in the LogMessage representation, making the disk queue size smaller at the cost of some CPU time. Setting the compaction() argument to yes is recommended when numerous name-value pairs are unset during processing, or when the same names are set multiple times. |
disk-buf-size() | |
Type: | number (bytes) |
Default: | |
Description: This is a required option. The maximum size of the disk-buffer in bytes. The minimum value is 1048576 bytes. If you set a smaller value, the minimum value will be used automatically. It replaces the old log-disk-fifo-size() option. |
mem-buf-length() | |
Type: | number (messages) |
Default: | 10000 |
Description: Use this option if the option reliable() is set to no. This option contains the number of messages stored in overflow queue. It replaces the old log-fifo-size() option. It inherits the value of the global log-fifo-size() option if provided. If it is not provided, the default value is 10000 messages. Note that this option will be ignored if the option reliable() is set to yes. |
mem-buf-size() | |
Type: | number (bytes) |
Default: | 163840000 |
Description: Use this option if the option reliable() is set to yes. This option contains the size of the messages in bytes that is used in the memory part of the disk buffer. It replaces the old log-fifo-size() option. It does not inherit the value of the global log-fifo-size() option, even if it is provided. Note that this option will be ignored if the option reliable() is set to no. |
qout-size() | |
Type: | number (messages) |
Default: | 1000 |
Description: The number of messages stored in the output buffer of the destination. NOTE: If you change the value of this option and the disk-buffer already exists, the change will take effect when the disk-buffer becomes empty. |
Example: Examples for using disk-buffer()
In the following case reliable disk-buffer() is used.
destination d_demo { network( "127.0.0.1" port(3333) disk-buffer( mem-buf-size(10000) disk-buf-size(2000000) reliable(yes) dir("/tmp/disk-buffer") ) ); };
In the following case normal disk-buffer() is used.
destination d_demo { network( "127.0.0.1" port(3333) disk-buffer( mem-buf-length(10000) disk-buf-size(2000000) reliable(no) dir("/tmp/disk-buffer") ) ); };
truncate-size-ratio() | |||
Type: | number (between 0 and 1) | ||
Default: | 0.1 (10%) | ||
Description: Limits the truncation of the disk-buffer file. Truncating the disk-buffer file can slow down the disk IO operations, but it saves disk space, so syslog-ng only truncates the file, if the possible disk gain is more than truncate-size-ratio() times disk-buf-size().
|
fallback-topic()
Type: | string |
Default: | N/A |
Description: If the resolved topic() template is not a valid Kafka topic , syslog-ng PE will use fallback-topic() to send messages.
NOTE: If instead of strings, you use actual templates (that is, a macro like ${MESSAGE}, or a template function like $(format-json)) in the topic() option, configuring the fallback-topic() option is required.
frac-digits()
Type: | number |
Default: | 0 |
Description: The syslog-ng application can store fractions of a second in the timestamps according to the ISO8601 format. The frac-digits() parameter specifies the number of digits stored. The digits storing the fractions are padded by zeros if the original timestamp of the message specifies only seconds. Fractions can always be stored for the time the message was received.
NOTE: The syslog-ng PE application can add the fractions to non-ISO8601 timestamps as well.
NOTE: As syslog-ng PE is precise up to the microsecond, when the frac-digits() option is set to a value higher than 6, syslog-ng PE will truncate the fraction seconds in the timestamps after 6 digits.
flush-timeout-on-reload()
Type: | integer in milliseconds |
Default: | 1000 |
Description: When syslog-ng PE reloads, the Kafka client will also reload.
The flush-timeout-on-reload() option specifies the number of milliseconds syslog-ng PE waits for the Kafka client to flush out in-flight messages. In-flight messages may be:
-
messages that are passed to the Kafka client for sending, which have been sent, but not delivered
-
messages not yet sent out.
flush-timeout-on-shutdown()
Type: | integer in milliseconds |
Default: | 60000 |
Description: When syslog-ng PE shuts down, the Kafka client will also shut down.
The flush-timeout-on-shutdown() option specifies the number of milliseconds syslog-ng PE waits for the Kafka client to flush out in-flight messages. In-flight messages may be:
-
Messages passed to the Kafka client for sending, already sent, but not yet delivered.
-
Messages not yet sent by the Kafka client.
NOTE: To avoid losing messages, One Identity recommends that you use the sync-send() option set to "yes" in addition to using the disk-buffer() option.
hook-commands()
Description: This option makes it possible to run external programs when the relevant driver is initialized or torn down. The hook-commands() can be used with all source and destination drivers with the exception of the usertty() and internal() drivers.
NOTE: The syslog-ng PE application must be able to start and restart the external program, and have the necessary permissions to do so. For example, if your host is running AppArmor or SELinux, you might have to modify your AppArmor or SELinux configuration to enable syslog-ng PE to run external applications.
Using hook-commands() when syslog-ng PE starts or stops
To run an external program when syslog-ng PE starts or stops, use the following options:
startup() | |
Type: |
string |
Default: |
N/A |
Description: Defines the external program that is run as syslog-ng PE starts. |
shutdown() | |
Type: |
string |
Default: |
N/A |
Description: Defines the external program that is run as syslog-ng PE stops. |
Using hook-commands() when syslog-ng PE reloads
To run an external program when the syslog-ng PE configuration is initiated or torn down, for example, on startup/shutdown or during a syslog-ng PE reload, use the following options:
setup() | |
Type: |
string |
Default: |
N/A |
Description: Defines an external program that is run when the syslog-ng PE configuration is initiated, for example, on startup or during a syslog-ng PE reload. |
teardown() | |
Type: |
string |
Default: |
N/A |
Description: Defines an external program that is run when the syslog-ng PE configuration is stopped or torn down, for example, on shutdown or during a syslog-ng PE reload. |
Example: Using hook-commands() with a network source
In the following example, the hook-commands() is used with the network() driver and it opens an iptables port automatically as syslog-ng PE is started/stopped.
The assumption in this example is that the LOGCHAIN chain is part of a larger ruleset that routes traffic to it. Whenever the syslog-ng PE created rule is there, packets can flow, otherwise the port is closed.
source { network(transport(udp) hook-commands( startup("iptables -I LOGCHAIN 1 -p udp --dport 514 -j ACCEPT") shutdown("iptables -D LOGCHAIN 1") ) ); };
key()
Type: | template |
Default: | empty string |
Description: The key of the partition under which the message is published. You can use templates to change the topic dynamically based on the source or the content of the message, for example, key("${PROGRAM}").
local-time-zone()
Type: | name of the timezone, or the timezone offset |
Default: | The local timezone. |
Description: Sets the timezone used when expanding filename and tablename templates.
The timezone can be specified by using the name, for example, time-zone("Europe/Budapest")), or as the timezone offset in +/-HH:MM format, for example, +01:00). On Linux and UNIX platforms, the valid timezone names are listed under the /usr/share/zoneinfo directory.
message()
Type: | message template |
Default: | $ISODATE $HOST $MSGHDR$MSG |
Description: The message as published to Apache Kafka. You can use templates and template functions (for example, format-json()) to format the message, for example, message("$(format-json --scope rfc5424 --exclude DATE --key ISODATE)").
For details on formatting messages in JSON format, see format-json.
on-error()
Accepted values: |
drop-message|drop-property|fallback-to-string| silently-drop-message|silently-drop-property|silently-fallback-to-string |
Default: | Use the global setting (which defaults to drop-message) |
Description: Controls what happens when type-casting fails and syslog-ng PE cannot convert some data to the specified type. By default, syslog-ng PE drops the entire message and logs the error. Currently the value-pairs() option uses the settings of on-error().
-
drop-message: Drop the entire message and log an error message to the internal() source. This is the default behavior of syslog-ng PE.
-
drop-property: Omit the affected property (macro, template, or message-field) from the log message and log an error message to the internal() source.
-
fallback-to-string: Convert the property to string and log an error message to the internal() source.
-
silently-drop-message: Drop the entire message silently, without logging the error.
-
silently-drop-property: Omit the affected property (macro, template, or message-field) silently, without logging the error.
-
silently-fallback-to-string: Convert the property to string silently, without logging the error.
persist-name()
Type: | string |
Default: |
Description:If you receive the following error message during syslog-ng PE startup, set the persist-name() option of the duplicate drivers:
Error checking the uniqueness of the persist names, please override it with persist-name option. Shutting down.
This error happens if you use identical drivers in multiple sources, for example, if you configure two file sources to read from the same file. In this case, set the persist-name() of the drivers to a custom string, for example, persist-name("example-persist-name1").
poll-timeout()
Type: | integer in milliseconds |
Default: | 1000 |
Description: Specifies the frequency your syslog-ng PE queries the Kafka client about the amount of messages sent since the last poll-timeout (). In case of multithreading, the first syslog-ng PE worker is responsible for poll-timeout().
retries()
Type: | number (of attempts) |
Default: | 3 |
Description: If syslog-ng PE cannot send a message, it will try again until the number of attempts reaches retries().
If the number of attempts reaches retries(), syslog-ng PE will wait for time-reopen() time, then tries sending the message again.
send-time-zone()
Accepted values: | name of the timezone, or the timezone offset |
Default: | local timezone |
Description: Specifies the time zone associated with the messages sent by syslog-ng, if not specified otherwise in the message or in the destination driver. For details, see Timezones and daylight saving.
The timezone can be specified by using the name, for example, time-zone("Europe/Budapest")), or as the timezone offset in +/-HH:MM format, for example, +01:00). On Linux and UNIX platforms, the valid timezone names are listed under the /usr/share/zoneinfo directory.
sync-send()
Type: | yes | no |
Default: | no |
Description: When sync-send() is set to "yes", syslog-ng PE sends the message reliably: it sends a message to the Kafka server, then waits for a reply. In case of failure, syslog-ng PE repeats sending the message, as set in the retries() parameter. If sending the message fails for retries() times, syslog-ng PE will wait for time-reopen() time, then tries sending the message again.
This method ensures reliable message transfer, but is very slow.
When sync-send() is set to "no", syslog-ng PE sends messages asynchronously, and receives the response asynchronously. In case of a problem, syslog-ng PE cannot resend the messages.
NOTE: The underlying Kafka client (that is, librdkafka) may retry sending messages to syslog-ng PE independently several times.
This method is fast, but the transfer is not reliable. Several thousands of messages can be lost before syslog-ng PE recognizes the error.
Caution:
Hazard of data loss! If sync-send() is set to "no", the messages passed to the Kafka client can be lost. To avoid data loss, One Identity recommends that you set sync-send() to "yes", as this setting delivers messages to the Kafka client more reliably. |
NOTE: If you want to use the sync-send() option set to "yes", One Identity recommends that you use a Kafka server with version number 0.11.0 or higher.
throttle()
Type: | number |
Default: | 0 |
Description: Sets the maximum number of messages sent to the destination per second. Use this output-rate-limiting functionality only when using disk-buffer as well to avoid the risk of losing messages. Specifying 0 or a lower value sets the output limit to unlimited.
time-reopen()
Type: | number (seconds) |
Default: | 60 |
Description: Optional parameter.
If message sending fails, syslog-ng PE retries sending the messages for retries() time (3 times by default) before waiting for time-reopen() time to try sending it again.
time-zone()
Type: | name of the timezone, or the timezone offset |
Default: | unspecified |
Description: Convert timestamps to the timezone specified by this option. If this option is not set, then the original timezone information in the message is used. Converting the timezone changes the values of all date-related macros derived from the timestamp, for example, HOUR. For the complete list of such macros, see Date-related macros.
The timezone can be specified by using the name, for example, time-zone("Europe/Budapest")), or as the timezone offset in +/-HH:MM format, for example, +01:00). On Linux and UNIX platforms, the valid timezone names are listed under the /usr/share/zoneinfo directory.
topic()
Type: | string or template |
Default: | N/A |
Description: Required option. The Kafka topic under which the message is published. You can use templates to change the topic dynamically based on the source or the content of the message, for example, topic("${HOST}").
NOTE: Valid topic names for the topic() and fallback-topic() options have the following limitations:
-
The topic name must contain characters within the pattern [-._a-zA-Z0-9].
-
The length of the topic name must be between 1 and 249 characters.
NOTE: If you use templates with the topic() option, configuring the fallback-topic() option is also required.
ts-format()
Type: | rfc3164, bsd, rfc3339, iso |
Default: | rfc3164 |
Description: Override the global timestamp format (set in the global ts-format() parameter) for the specific destination. For details, see ts-format().
NOTE: This option applies only to file and file-like destinations. Destinations that use specific protocols (for example, network(), or syslog()) ignore this option. For protocol-like destinations, use a template locally in the destination, or use the proto-template option.
workers()
Type: | integer |
Default: | 1 |
Description: The workers are only responsible for formatting the messages that need to be delivered to the Kafka clients. Configure this option only if your Kafka clients have many threads and they do not receive enough messages. If you set the sync-send() option to yes, the number of workers is automatically set to 1.
NOTE: Kafka clients have their own threadpool, entirely independent from any syslog-ng PE settings. The workers() option has no effect on this threadpool.