Depending on the format of your incoming message contents, and whether you want to forward these contents in .JSON format instead of a raw message format, you may have to filter, parse, or otherwise modify your incoming message contents.
The following examples illustrate what incoming message contents look like in raw message format and in .JSON message format, and how to handle and process message contents in .JSON format.
If your incoming message contents are in a raw message format, and you do not filter or otherwise modify the contents, syslog-ng PE will automatically forward the contents in the same raw message format, and the output will look similar to this:
Example: incoming Google Pub/Sub Cloud message contents (in raw message format), the resulting $MESSAGE macro contents, and the processed output message contents with a prepended Message header
Incoming message contents in raw message format on the Google Pub/Sub Cloud side:
<38>Feb 25 14:09:07 testhost testapp: test message mytestmessage
The contents of the resulting $MESSAGE macro:
* name='MESSAGE', value='{"data":"<38>Feb 25 14:09:07 testhost testapp: test message mytestmessage"}'
By default, the syslog-ng PE application prepends a Message header to the $MESSAGE macro contents to form an output with a similar structure:
<13>Sep 29 17:03:58 ubuntu {"data":"<38>Feb 25 14:09:07 testhost testapp: test message mytestmessage"}\x0a
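To illustrate the wrapping shown in this example, the following Python sketch recovers the original raw message from such an output line on the receiving side. It is not part of syslog-ng PE. The function name unwrap_pubsub_line is hypothetical, and the sketch assumes that the output line has exactly the structure shown above: a header, followed by a JSON object with a single "data" key, followed by a newline character (shown as \x0a above).

```python
import json

def unwrap_pubsub_line(line: str) -> str:
    """Return the raw Pub/Sub message body stored in the "data" field."""
    # Assumption: everything before the first "{" is the prepended header.
    start = line.index("{")
    payload = json.loads(line[start:].strip())
    return payload["data"]

line = (
    '<13>Sep 29 17:03:58 ubuntu '
    '{"data":"<38>Feb 25 14:09:07 testhost testapp: test message mytestmessage"}\n'
)
raw = unwrap_pubsub_line(line)
print(raw)
```

The sketch only shows how the header, the "data" wrapper, and the original message relate to each other; a real consumer would also have to handle lines that do not contain a JSON object.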
The syslog-ng PE application's google-pubsub() source collects Google Pub/Sub messages in a format that has two message parts (Message body and Message attributes) on the Google Cloud Platform side.
However, depending on how you configure the Google Pub/Sub messaging service on the Platform side, you may receive incoming messages in .JSON format.
Configuring syslog-ng PE to process incoming .JSON message formats
Even if your incoming message contents are originally in .JSON format, syslog-ng PE will store them in a raw message format.
If you want to forward your message contents in a .JSON format along with message attributes, you can use format-json() as a rewrite rule or as a destination template.
Example: configuring syslog-ng PE to transform raw incoming message format to .JSON message format using format-json()
The following configuration example illustrates how you can use format-json() to configure syslog-ng PE to transform the raw message format of the incoming message to a .JSON format message that contains the contents of both the Message body and the Message attributes.
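log {
    source {
        google-pubsub(project("syslog-ng-pubsub-src") subscription("sub") credentials("syslog-ng-pubsub-creds.json"));
    };
    if {
        # The message body is valid JSON: parse its fields under the .gpub.data. prefix.
        parser { json-parser(prefix(".gpub.data.") template("$MESSAGE")); };
    }
    else {
        # The message body is not JSON: store it as-is in the .gpub.data name-value pair.
        rewrite { set("$MESSAGE" value(".gpub.data")); };
    };
    destination {
        # --shift removes the .pubsub. (8 characters) and .gpub. (6 characters) prefixes from the key names.
        file("/tmp/output" template("$(format-json --key .pubsub.* --shift 8 --key .gpub.* --shift 6)\n"));
    };
};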
In this case, the incoming message contents on the Google Pub/Sub Platform side are the following:
{"message_body_json_field1": "value1", "message_body_json_field2": "value2"}
The Pub/Sub attributes of the message are the following:
pubsubmsgattribute1 (value: pubsubattrvalue1)
pubsubmsgattribute2 (value: pubsubattrvalue2)
The output message looks like this:
{"pubsubmsgattribute2":"pubsubattrvalue2","pubsubmsgattribute1":"pubsubattrvalue1","data":{"message_body_json_field2":"value2","message_body_json_field1":"value1"}}
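The relationship between the --key and --shift options and the output message can be sketched in plain Python. This is an emulation for illustration only, not syslog-ng PE code: the function name shift_and_nest is hypothetical, and the name-value pairs below are assumed to be the ones that the configuration example creates (attributes under the default .pubsub. prefix, parsed body fields under .gpub.data.). The attribute values are taken from the output message above.

```python
import json

def shift_and_nest(pairs: dict, selectors: list) -> dict:
    """Emulate '--key <prefix>* --shift <n>' plus dot-to-object nesting."""
    out = {}
    for prefix, shift in selectors:
        for name, value in pairs.items():
            if not name.startswith(prefix):
                continue
            # --shift cuts the first <shift> characters off the key name.
            parts = name[shift:].split(".")
            node = out
            # Each remaining dot in the name becomes one level of nesting.
            for part in parts[:-1]:
                node = node.setdefault(part, {})
            node[parts[-1]] = value
    return out

pairs = {
    ".pubsub.pubsubmsgattribute1": "pubsubattrvalue1",
    ".pubsub.pubsubmsgattribute2": "pubsubattrvalue2",
    ".gpub.data.message_body_json_field1": "value1",
    ".gpub.data.message_body_json_field2": "value2",
}
result = shift_and_nest(pairs, [(".pubsub.", 8), (".gpub.", 6)])
print(json.dumps(result, separators=(",", ":")))
```

The shift values match the prefix lengths: .pubsub. is 8 characters long and .gpub. is 6, so the attributes end up at the top level and the body fields end up under "data". The order of the keys may differ from the syslog-ng PE output, which does not affect the meaning of the JSON object.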
The google-pubsub() source has the following options.
Required parameters
- credentials()
- project()
- subscription()
Optional parameters
- ack-tracker-batch-size()
- ack-tracker-timeout()
- log-fetch-limit()
- prefix()
- time-reopen()
- workers()
The google-pubsub() source options, in more detail:
ack-tracker-batch-size()
Type: string
Default: 100
Required: no
Description: Optional parameter.
The syslog-ng PE application retains acknowledgements on the source side, and sends them either when an ack-tracker-batch-size() number of messages has accumulated in a batch, or when the ack-tracker-timeout() expires, whichever happens first. If the value of your ack-tracker-timeout() is larger than the Acknowledgement deadline of your Google Pub/Sub subscription, Google Pub/Sub may redeliver messages before syslog-ng PE acknowledges them, which results in message duplication.
ack-tracker-timeout()
Type: time [milliseconds]
Default: 3000
Required: no
Description: Optional parameter.
The syslog-ng PE application retains acknowledgements on the source side, and sends them either when an ack-tracker-batch-size() number of messages has accumulated in a batch, or when the ack-tracker-timeout() expires, whichever happens first. If the value of your ack-tracker-timeout() is larger than the Acknowledgement deadline of your Google Pub/Sub subscription, Google Pub/Sub may redeliver messages before syslog-ng PE acknowledges them, which results in message duplication.
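The interplay of ack-tracker-batch-size() and ack-tracker-timeout() can be pictured with a small Python model. This is a toy illustration, not the syslog-ng PE implementation: the class name AckBatcher and its methods are hypothetical, and the model assumes that the timeout is measured from the arrival of the oldest unacknowledged message.

```python
class AckBatcher:
    """Toy model: acknowledge when the batch is full or the timeout expires."""

    def __init__(self, batch_size=100, timeout_ms=3000):
        self.batch_size = batch_size
        self.timeout_ms = timeout_ms
        self.pending = []
        self.first_pending_at = None  # arrival time of the oldest pending message

    def add(self, msg_id, now_ms):
        """Register a received message; return the IDs acknowledged, if any."""
        if not self.pending:
            self.first_pending_at = now_ms
        self.pending.append(msg_id)
        if len(self.pending) >= self.batch_size:
            return self._flush()
        return []

    def tick(self, now_ms):
        """Periodic check: acknowledge the pending messages if the timeout expired."""
        if self.pending and now_ms - self.first_pending_at >= self.timeout_ms:
            return self._flush()
        return []

    def _flush(self):
        acked, self.pending = self.pending, []
        self.first_pending_at = None
        return acked

batcher = AckBatcher(batch_size=3, timeout_ms=3000)
full_batch = [batcher.add(i, now_ms=0) for i in ("a", "b", "c")][-1]
batcher.add("d", now_ms=100)
too_early = batcher.tick(now_ms=2000)   # timeout not reached yet
timed_out = batcher.tick(now_ms=3100)   # 3000 ms after "d" arrived
```

In this model, message "d" stays unacknowledged for the full 3000 milliseconds because the batch never fills up. If the Acknowledgement deadline of the subscription were shorter than that, Google Pub/Sub would consider "d" unacknowledged and deliver it again, which is the duplication scenario described above.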
credentials()
Type: string
Default: n/a
Required: yes
Description: Required parameter.
The credentials of your Google Pub/Sub project.
log-fetch-limit()
Type: number
Default: 100
Required: no
Description: Optional parameter.
The maximum number of messages fetched from a source during a single poll loop.
prefix()
Type: string
Default: .pubsub.
Required: no
Description: Optional parameter.
This prefix will be added to the name of the macros created from the message attributes of the Google Pub/Sub message.
project()
Type: string
Default: n/a
Required: yes
Description: Required parameter.
The ID of your Google Pub/Sub project.
subscription()
Type: string
Default: n/a
Required: yes
Description: Required parameter.
The ID of your Google Pub/Sub subscription.
time-reopen()
Type: number (seconds)
Default: 60
Required: no
Description: Optional parameter.
The time to wait in seconds before a broken connection is reestablished.
workers()
Type: integer
Default: 1
Required: no
Description: Optional parameter.
Specifies the number of worker threads (at least 1) that syslog-ng PE uses to receive messages from the Google Pub/Sub messaging service. Increasing the number of worker threads can drastically improve the performance of the source.