Connect with Fluentd

This chapter introduces how PG-Strom cooperates with Fluentd, using the Apache Arrow data format to import IoT/M2M log data efficiently.

Overview

In the technological domain known as IoT/M2M, various software has been developed to store and analyze the large amount of log data generated by devices such as cell phones, automobiles, and various sensors, as well as PCs and servers. The data generated moment by moment by a large number of devices tends to grow huge, and a special architecture and technology are required to process it in a practical amount of time.

PG-Strom's features are designed and implemented for high-speed processing of log data at such a scale. On the other hand, transferring and importing data of this size into a database so that it can be searched and summarized tends to be a time-consuming job. To deal with this problem, PG-Strom includes fluent-plugin-arrow-file, a Fluentd module that writes out the collected data in Apache Arrow format.

[Figure: Fluentd with PG-Strom Overview]

Fluentd is a log collection tool developed by Sadayuki Furuhashi. It is the de facto standard for collecting and storing a wide variety of log data, ranging from server logs such as syslog to logs from IoT/M2M devices. Fluentd allows customization of the input, processing, and output of log data through plugins written in Ruby; as of 2022, more than 800 plugins have been introduced on the official website.

PG-Strom can handle two types of data formats: the PostgreSQL Heap format (transactional row data) and the Apache Arrow format (structured columnar data). The Apache Arrow format is suitable for workloads such as IoT/M2M, where huge amounts of data are generated continuously.

arrow-file plugin

This chapter describes how to write out the log data collected by Fluentd in Apache Arrow format and refer to it with PG-Strom. We assume td-agent here, which is a stable distribution of Fluentd provided by Treasure Data.

PG-Strom includes the fluent-plugin-arrow-file module. It allows Fluentd to write out the log data it collects as Apache Arrow format files with a specified schema structure. Using PG-Strom's Arrow_Fdw, such an Apache Arrow format file can be accessed as a foreign table, and GPU-Direct SQL allows these files to be loaded extremely fast if the storage system is appropriately configured.

This has the following advantages:

  • There is no need to import data into the DB because PG-Strom directly accesses the files output by Fluentd.
  • The data readout (I/O load) for the searching and summarizing process can be kept to a minimum because of the columnar data format.
  • You can archive outdated log data simply by moving the files at the OS level.

On the other hand, the Record Batch size needs to be reasonably large to obtain the performance benefits of the Apache Arrow format. Therefore, in cases where it takes a long time to accumulate a certain amount of log data (for example, when logs are generated only rarely), another method, such as writing to a PostgreSQL table, is more suitable if near real-time log analysis is required.

Internals

There are several types of plugins for Fluentd: Input plugins that receive logs from outside, Parser plugins that shape the logs, Buffer plugins that temporarily store the received logs, and Output plugins that write the logs out. The arrow-file plugin is categorized as an Output plugin; it writes out the "chunks" of log data passed from the Buffer plugin in Apache Arrow format, with the schema structure specified in the configuration.

[Figure: Fluentd Components]

The Input/Parser plugin is responsible for converting the received logs into a common format so that the Buffer and Output plugins can handle the input data without being aware of its format.

This common format consists of a tag, an identifier used to route and sort the logs; a time, the log timestamp; and a record, an associative array built from the raw log. The arrow-file plugin writes an Apache Arrow format file whose columns correspond to the tag and time fields and to the elements of the record associative array (some of which may be omitted). Therefore, the output destination file name and the schema definition information (the mapping of associative-array elements to columns and types) are mandatory configuration parameters.
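
For illustration, suppose the Parser plugin produces an event like the following (a hypothetical example; the field names match the Apache Httpd example later in this chapter):

  tag    : "httpd"
  time   : 2022-01-24 06:13:42
  record : {"host"=>"192.168.77.95", "method"=>"GET", "code"=>200}

With a matching schema definition, each key of record (host, method, code) becomes a column of the Apache Arrow file, and the tag and time values can also be written to columns of their own via the tag_column and ts_column parameters described below.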

Installation

Install the td-agent package for the Linux distribution you are using. The rake-compiler module is required to build and install the arrow-file plugin, so install it beforehand.

$ curl -L https://toolbelt.treasuredata.com/sh/install-redhat-td-agent4.sh | sh

$ sudo /opt/td-agent/bin/fluent-gem install rake-compiler

Next, download the PG-Strom source code and build the arrow-file plugin in its fluentd directory.

$ git clone https://github.com/heterodb/pg-strom.git
$ cd pg-strom/fluentd
$ make TD_AGENT=1 gem
$ sudo make TD_AGENT=1 install

To confirm that the Fluentd plugin is installed, run the following command.

$ /opt/td-agent/bin/fluent-gem list | grep arrow
fluent-plugin-arrow-file (0.2)

Configuration

As mentioned above, the arrow-file plugin requires at least the output file path and the schema definition.

In addition, to obtain the best performance for search and aggregation processing, the unit of data inside an Apache Arrow file, called a Record Batch, needs to be reasonably large. The arrow-file plugin creates one Record Batch for each chunk passed from the Buffer plugin, so the buffer size of the Buffer plugin should be set with the desired Record Batch size in mind. By default, the buffer size is set to 256MB.
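
For example, a <buffer> section along the following lines keeps the chunk size close to the 256MB default mentioned above. The values here are purely illustrative; appropriate settings depend on how quickly your logs accumulate and how soon they need to become visible.

<buffer>
  chunk_limit_size 256MB
  flush_interval 600s
</buffer>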

The configuration parameters for the arrow-file plugin are as follows:

path [type: String ] (Required)
Specify the file path of the destination.
This is a required parameter, and the placeholders shown below can be used.
  • %Y ... The current year as a four-digit number.
  • %y ... The last two digits of the current year.
  • %m ... The current month as a two-digit number (01-12).
  • %d ... The current day of the month as a two-digit number (01-31).
  • %H ... The hour of the current time as a two-digit number (00-23).
  • %M ... The minute of the current time as a two-digit number (00-59).
  • %S ... The second of the current time as a two-digit number (00-59).
  • %p ... The PID of the current Fluentd process.

The placeholders are replaced when a chunk is written out. If an Apache Arrow format file of the resulting name already exists, the Record Batch is appended to it; if it does not exist, a new Apache Arrow format file is created and the first Record Batch is written out.

However, if the size of the existing Apache Arrow file exceeds the filesize_threshold setting described below, the existing file is renamed and a new one is created.

(Example) path /tmp/arrow_logs/my_logs_%y%m%d.%p.log

Each time a chunk is written out, the footer area of the output Apache Arrow file is updated to point to all of its Record Batches, so the generated file can be read immediately. However, to avoid access conflicts, exclusive control using lockf(3) is required.

schema_defs [type: String ] (Required)
Specify the schema definition of the Apache Arrow format file output by fluent-plugin-arrow-file.
This is a required parameter; the schema structure is defined using a string in the following format.
  • schema_defs := column_def1[,column_def2 ...]
  • column_def := <column_name>=<column_type>[;<column_attrs>]
    • <column_name> is the name of the column, which must match a key of the associative array passed from Fluentd to the arrow-file plugin.
    • <column_type> is the data type of the column. See the following table.
    • <column_attrs> is an additional attribute of the column. At present, only the following attribute is supported.
      • stat_enabled ... The statistics for the configured columns will be collected and the maximum/minimum values for each Record Batch will be set as custom metadata in the form of max_values=... and min_values=....

(Example) schema_defs "ts=Timestamp;stat_enabled,dev_id=Uint32,temperature=Float32,humidity=Float32"
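
When stat_enabled is specified as in the example above, the min/max values recorded for each Record Batch allow a reader such as PG-Strom's Arrow_Fdw to skip Record Batches whose value range cannot match a search condition. For instance, a range query along the following lines (hypothetical; it assumes the file has been mapped as a foreign table named my_logs) can benefit from these statistics:

postgres=# SELECT * FROM my_logs
            WHERE ts BETWEEN '2022-01-24' AND '2022-01-25';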

Data types supported by the arrow-file plugin

  • Int8, Int16, Int32, Int64 ... Signed integer with the specified bit width.
  • Uint8, Uint16, Uint32, Uint64 ... Unsigned integer with the specified bit width.
  • Float16, Float32, Float64 ... Floating point number with half (16bit), single (32bit), or double (64bit) precision.
  • Decimal, Decimal128 ... 128-bit fixed-point decimal; 256-bit fixed-point decimal is currently not supported.
  • Timestamp, Timestamp[sec], Timestamp[ms], Timestamp[us], Timestamp[ns] ... Timestamp type with the specified precision. If the precision is omitted, it is treated the same as [us].
  • Time, Time[sec], Time[ms], Time[us], Time[ns] ... Time type with the specified precision. If the precision is omitted, it is treated the same as [sec].
  • Date, Date[day], Date[ms] ... Date type with the specified precision. If the precision is omitted, it is treated the same as [day].
  • Utf8 ... String type.
  • Ipaddr4 ... IP address (IPv4) type, represented as a FixedSizeBinary type with byteWidth=4 and custom metadata pg_type=pg_catalog.inet.
  • Ipaddr6 ... IP address (IPv6) type, represented as a FixedSizeBinary type with byteWidth=16 and custom metadata pg_type=pg_catalog.inet.

ts_column [type: String / default: unspecified]
Specify the name of a column that stores the timestamp value (time) of the log passed from Fluentd (rather than a value taken from record).
This column usually has a date-time type such as Timestamp, and the stat_enabled attribute should also be specified to achieve fast searching.
tag_column [type: String / default: unspecified]
Specify the name of a column that stores the tag value of the log passed from Fluentd (rather than a value taken from record).
This column usually has a string type such as Utf8.
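For instance, assuming schema_defs defines a Timestamp column named ts and a Utf8 column named tag (hypothetical column names), the two parameters could be written as:

ts_column "ts"
tag_column "tag"
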
filesize_threshold [type: Integer / default: 10000]
Specify the threshold, in MB, for switching the output destination file.
By default, the output destination is switched when the file size exceeds about 10GB.
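For example, to switch to a new file once the current one grows beyond roughly 20GB (an arbitrary illustrative value), the parameter could be set as:

filesize_threshold 20000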

Example

As a simple example, this chapter shows a configuration for monitoring the log of a local Apache Httpd server, parsing it field by field, and writing it to an Apache Arrow format file.

The <source> setting makes /var/log/httpd/access_log the data source, and the apache2 Parser plugin extracts the fields host, user, time, method, path, code, size, referer, and agent.

These fields are then passed to the arrow-file plugin as an associative array. The schema_defs parameter in <match> defines the columns of the Apache Arrow file corresponding to the fields. For simplicity of explanation, the <buffer> section limits the chunk size to at most 4MB / 200 lines, and chunks are flushed to the Output plugin within 10 seconds at most.

Example configuration of /etc/td-agent/td-agent.conf

<source>
  @type tail
  path /var/log/httpd/access_log
  pos_file /var/log/td-agent/httpd_access.pos
  tag httpd
  format apache2
  <parse>
    @type apache2
    expression /^(?<host>[^ ]*) [^ ]* (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>(?:[^\"]|\\.)*?)(?: +\S*)?)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>(?:[^\"]|\\.)*)" "(?<agent>(?:[^\"]|\\.)*)")?$/
    time_format %d/%b/%Y:%H:%M:%S %z
  </parse>
</source>

<match httpd>
  @type arrow_file
  path /tmp/mytest%Y%m%d.%p.arrow
  schema_defs "ts=Timestamp[sec],host=Utf8,method=Utf8,path=Utf8,code=Int32,size=Int32,referer=Utf8,agent=Utf8"
  ts_column "ts"
  <buffer>
    flush_interval 10s
    chunk_limit_size 4MB
    chunk_limit_records 200
  </buffer>
</match>

Start the td-agent service.

$ sudo systemctl start td-agent

See the following output. The placeholders in the path setting /tmp/mytest%Y%m%d.%p.arrow are replaced, and the Apache Httpd log is written to /tmp/mytest20220124.3206341.arrow.

$ arrow2csv /tmp/mytest20220124.3206341.arrow --head --offset 300 --limit 10
"ts","host","method","path","code","size","referer","agent"
"2022-01-24 06:13:42","192.168.77.95","GET","/docs/ja/js/theme_extra.js",200,195,"http://buri/docs/ja/fluentd/","Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36"
"2022-01-24 06:13:42","192.168.77.95","GET","/docs/ja/js/theme.js",200,4401,"http://buri/docs/ja/fluentd/","Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36"
"2022-01-24 06:13:42","192.168.77.95","GET","/docs/ja/img/fluentd_overview.png",200,121459,"http://buri/docs/ja/fluentd/","Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36"
"2022-01-24 06:13:42","192.168.77.95","GET","/docs/ja/search/main.js",200,3027,"http://buri/docs/ja/fluentd/","Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36"
"2022-01-24 06:13:42","192.168.77.95","GET","/docs/ja/fonts/Lato/lato-regular.woff2",200,182708,"http://buri/docs/ja/css/theme.css","Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36"
"2022-01-24 06:13:42","192.168.77.95","GET","/docs/ja/fonts/fontawesome-webfont.woff2?v=4.7.0",200,77160,"http://buri/docs/ja/css/theme.css","Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36"
"2022-01-24 06:13:42","192.168.77.95","GET","/docs/ja/fonts/RobotoSlab/roboto-slab-v7-bold.woff2",200,67312,"http://buri/docs/ja/css/theme.css","Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36"
"2022-01-24 06:13:42","192.168.77.95","GET","/docs/ja/fonts/Lato/lato-bold.woff2",200,184912,"http://buri/docs/ja/css/theme.css","Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36"
"2022-01-24 06:13:43","192.168.77.95","GET","/docs/ja/search/worker.js",200,3724,"http://buri/docs/ja/fluentd/","Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36"
"2022-01-24 06:13:43","192.168.77.95","GET","/docs/ja/img/favicon.ico",200,1150,"http://buri/docs/ja/fluentd/","Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36"

Let's map the output file to PostgreSQL using PG-Strom's Arrow_Fdw.

postgres=# IMPORT FOREIGN SCHEMA mytest
           FROM SERVER arrow_fdw INTO public
           OPTIONS (file '/tmp/mytest20220124.3206341.arrow');
IMPORT FOREIGN SCHEMA

postgres=# SELECT ts, host, path FROM mytest WHERE code = 404;
         ts          |     host      |         path
---------------------+---------------+----------------------
 2022-01-24 12:02:06 | 192.168.77.73 | /~kaigai/ja/fluentd/
(1 row)

postgres=# EXPLAIN SELECT ts, host, path FROM mytest WHERE code = 404;
                                  QUERY PLAN
------------------------------------------------------------------------------
 Custom Scan (GpuScan) on mytest  (cost=4026.12..4026.12 rows=3 width=72)
   GPU Filter: (code = 404)
   referenced: ts, host, path, code
   files0: /tmp/mytest20220124.3206341.arrow (read: 128.00KB, size: 133.94KB)
(4 rows)

The example above shows how the generated Apache Arrow file can be mapped as a foreign table and accessed with SQL.

Search conditions can be specified on each of the log fields shaped on the Fluentd side. In the example above, logs with HTTP status code 404 are searched, and one matching record is found.
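
As a further illustration of summarization, a query like the following (output omitted, as it depends on the actual log contents) aggregates the number of requests and the total transferred bytes per HTTP status code over the same foreign table:

postgres=# SELECT code, count(*), sum(size)
             FROM mytest
            GROUP BY code
            ORDER BY code;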