Columnar data store (Arrow_Fdw)

Overview

PostgreSQL tables internally consist of 8KB blocks1, and each block contains tuples, a data structure that packs all the attributes and metadata of a row. Because the data of a row is located close together, this layout works well for INSERT/UPDATE-heavy workloads, but it is not suitable for summarizing or analyzing massive data.

It is unusual to reference all the columns of a table in mass-data processing; in most cases only a subset of the columns is referenced. The storage I/O bandwidth consumed by the unreferenced columns is then wasted, but there is no easy way to fetch only the particular columns referenced from a row-oriented data structure.

A column-oriented data structure, in contrast, has a severe disadvantage for INSERT/UPDATE-heavy workloads, but it can pull out the maximum storage I/O performance for mass-data processing workloads because it loads only the referenced columns. From the processor's point of view as well, column-oriented data looks like a flat array, which allows a GPU to extract the maximum bandwidth of the memory subsystem through the special access pattern called coalesced memory access.

Row/Column data structure

What is Apache Arrow?

Apache Arrow is a format for saving structured data in columnar form and for exchanging it with other applications. Several big-data processing applications support the format, and it is easy for self-developed applications to adopt it as well, since libraries are provided for major programming languages like C, C++ and Python.

An Apache Arrow file internally contains a Schema portion that defines the data structure, and one or more RecordBatch portions that store the columnar data according to the schema definition. Supported data types include integers, strings (variable length), date/time types and so on, and each column's data has its own internal representation depending on its data type.

The data representation of Apache Arrow is not identical to the representation in PostgreSQL. For example, the epoch of the Arrow timestamp is 1970-01-01 and it supports multiple precisions, whereas the epoch of the PostgreSQL timestamp is 2000-01-01 and it has microsecond accuracy.

Arrow_Fdw allows Apache Arrow files to be read from PostgreSQL through the foreign table mechanism. If an Arrow file contains 8 record batches with a million items per column, for example, we can access those 8 million rows on the Arrow file through the foreign table.
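
For example, once such a foreign table is defined (the next chapter shows how), it can be queried with ordinary read-only SQL. The sketch below assumes a foreign table named flogdata that maps an Arrow file of sensor logs; the table and column names are only illustrative.

SELECT sensor_id, count(*), avg(signal1)
  FROM flogdata    -- foreign table backed by an Arrow file
 GROUP BY sensor_id;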

Operations

Creation of foreign tables

Usually it takes the 3 steps below to create a foreign table.

  • Define a foreign-data-wrapper using CREATE FOREIGN DATA WRAPPER command
  • Define a foreign server using CREATE SERVER command
  • Define a foreign table using CREATE FOREIGN TABLE command

The first 2 steps above are performed as part of the CREATE EXTENSION pg_strom command, so the only command you need to run individually is the last one, CREATE FOREIGN TABLE.

CREATE FOREIGN TABLE flogdata (
    ts        timestamp,
    sensor_id int,
    signal1   smallint,
    signal2   smallint,
    signal3   smallint,
    signal4   smallint
) SERVER arrow_fdw
  OPTIONS (file '/path/to/logdata.arrow');

The data types of the columns specified by the CREATE FOREIGN TABLE command must match the schema definition of the Arrow files to be mapped.

Arrow_Fdw also supports a convenient way using the IMPORT FOREIGN SCHEMA statement, which automatically generates a foreign table definition from the schema definition of the Arrow files. It takes the name of the foreign table to be created, the schema to import it into, and the path of the Arrow files given in the OPTIONS clause. Since the schema definition of an Arrow file contains the data type and an optional column name for each column, the new foreign table is declared from this information.

IMPORT FOREIGN SCHEMA flogdata
  FROM SERVER arrow_fdw
  INTO public
OPTIONS (file '/path/to/logdata.arrow');

Foreign table options

Arrow_Fdw supports the options below. Right now, all the options are for foreign tables.

Target          Option   Description
foreign table   file     Maps the Arrow file at the specified path onto the foreign table.
foreign table   files    Maps multiple Arrow files, given as a comma (,) separated list of paths, onto the foreign table.
foreign table   dir      Maps all the Arrow files in the specified directory onto the foreign table.
foreign table   suffix   When the dir option is given, maps only the files with the specified suffix, .arrow for example.
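
For instance, the dir and suffix options can map every Arrow file in a directory onto a single foreign table. The sketch below is only an illustration; the directory path and table name are hypothetical, and the column list must still match the schema of the mapped files.

CREATE FOREIGN TABLE flogdata_archive (
    ts        timestamp,
    sensor_id int,
    signal1   smallint
) SERVER arrow_fdw
  OPTIONS (dir '/path/to/arrow_logs', suffix '.arrow');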

Data type mapping

Arrow data types are mapped to PostgreSQL data types as follows.

Arrow data type   PostgreSQL data type   Remarks
Int               int2,int4,int8         is_signed attribute is ignored. bitWidth attribute supports only 16, 32 or 64.
FloatingPoint     float2,float4,float8   float2 is enhanced by PG-Strom.
Binary            bytea
Utf8              text
Decimal           numeric
Date              date                   Adjusted as if unitsz=Day
Time              time                   Adjusted as if unitsz=MicroSecond
Timestamp         timestamp              Adjusted as if unitsz=MicroSecond
Interval          interval
List              array of base type     Only 1-dimensional List is supported (WIP).
Struct            composite type         The PostgreSQL composite type must be defined beforehand.
Union             --------
FixedSizeBinary   char(n)
FixedSizeList     --------
Map               --------
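
As an illustration of the Struct mapping above, the corresponding PostgreSQL composite type must exist before the foreign table references it. The type, column and file names below are hypothetical, and the example assumes the mapped Arrow file actually contains a matching Struct column.

-- the composite type must be defined beforehand
CREATE TYPE geo_point AS (
    lon float8,
    lat float8
);

CREATE FOREIGN TABLE ftracking (
    ts        timestamp,
    device_id int,
    location  geo_point     -- maps an Arrow Struct column
) SERVER arrow_fdw
  OPTIONS (file '/path/to/tracking.arrow');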

How to read EXPLAIN

The EXPLAIN command shows information about how the Arrow files are read.

The example below is the query execution plan of a query on the flineorder foreign table, which maps an Arrow file of 309GB.

=# EXPLAIN
    SELECT sum(lo_extendedprice*lo_discount) as revenue
      FROM flineorder,date1
     WHERE lo_orderdate = d_datekey
       AND d_year = 1993
       AND lo_discount between 1 and 3
       AND lo_quantity < 25;
                                             QUERY PLAN
-----------------------------------------------------------------------------------------------------
 Aggregate  (cost=12632759.02..12632759.03 rows=1 width=32)
   ->  Custom Scan (GpuPreAgg)  (cost=12632754.43..12632757.49 rows=204 width=8)
         Reduction: NoGroup
         Combined GpuJoin: enabled
         GPU Preference: GPU0 (Tesla V100-PCIE-16GB)
         ->  Custom Scan (GpuJoin) on flineorder  (cost=9952.15..12638126.98 rows=572635 width=12)
               Outer Scan: flineorder  (cost=9877.70..12649677.69 rows=4010017 width=16)
               Outer Scan Filter: ((lo_discount >= 1) AND (lo_discount <= 3) AND (lo_quantity < 25))
               Depth 1: GpuHashJoin  (nrows 4010017...572635)
                        HashKeys: flineorder.lo_orderdate
                        JoinQuals: (flineorder.lo_orderdate = date1.d_datekey)
                        KDS-Hash (size: 66.06KB)
               GPU Preference: GPU0 (Tesla V100-PCIE-16GB)
               NVMe-Strom: enabled
               referenced: lo_orderdate, lo_quantity, lo_extendedprice, lo_discount
               files0: /opt/nvme/lineorder_s401.arrow (size: 309.23GB)
               ->  Seq Scan on date1  (cost=0.00..78.95 rows=365 width=4)
                     Filter: (d_year = 1993)
(18 rows)

According to the EXPLAIN output, we can see that Custom Scan (GpuJoin) scans the flineorder foreign table. The files0 item shows the filename (/opt/nvme/lineorder_s401.arrow) mapped by the foreign table and its size. If multiple files are mapped, they are shown individually as files1, files2, and so on. The referenced item shows the list of referenced columns; we can see this query touches the lo_orderdate, lo_quantity, lo_extendedprice and lo_discount columns.

In addition, GPU Preference: GPU0 (Tesla V100-PCIE-16GB) and NVMe-Strom: enabled show that the scan on flineorder uses the SSD-to-GPU Direct SQL mechanism.

The VERBOSE option outputs more detailed information.

=# EXPLAIN VERBOSE
    SELECT sum(lo_extendedprice*lo_discount) as revenue
      FROM flineorder,date1
     WHERE lo_orderdate = d_datekey
       AND d_year = 1993
       AND lo_discount between 1 and 3
       AND lo_quantity < 25;
                              QUERY PLAN
--------------------------------------------------------------------------------
 Aggregate  (cost=12632759.02..12632759.03 rows=1 width=32)
   Output: sum((pgstrom.psum((flineorder.lo_extendedprice * flineorder.lo_discount))))
   ->  Custom Scan (GpuPreAgg)  (cost=12632754.43..12632757.49 rows=204 width=8)
         Output: (pgstrom.psum((flineorder.lo_extendedprice * flineorder.lo_discount)))
         Reduction: NoGroup
         GPU Projection: flineorder.lo_extendedprice, flineorder.lo_discount, pgstrom.psum((flineorder.lo_extendedprice * flineorder.lo_discount))
         Combined GpuJoin: enabled
         GPU Preference: GPU0 (Tesla V100-PCIE-16GB)
         ->  Custom Scan (GpuJoin) on public.flineorder  (cost=9952.15..12638126.98 rows=572635 width=12)
               Output: flineorder.lo_extendedprice, flineorder.lo_discount
               GPU Projection: flineorder.lo_extendedprice::bigint, flineorder.lo_discount::integer
               Outer Scan: public.flineorder  (cost=9877.70..12649677.69 rows=4010017 width=16)
               Outer Scan Filter: ((flineorder.lo_discount >= 1) AND (flineorder.lo_discount <= 3) AND (flineorder.lo_quantity < 25))
               Depth 1: GpuHashJoin  (nrows 4010017...572635)
                        HashKeys: flineorder.lo_orderdate
                        JoinQuals: (flineorder.lo_orderdate = date1.d_datekey)
                        KDS-Hash (size: 66.06KB)
               GPU Preference: GPU0 (Tesla V100-PCIE-16GB)
               NVMe-Strom: enabled
               referenced: lo_orderdate, lo_quantity, lo_extendedprice, lo_discount
               files0: /opt/nvme/lineorder_s401.arrow (size: 309.23GB)
                 lo_orderpriority: 33.61GB
                 lo_extendedprice: 17.93GB
                 lo_ordertotalprice: 17.93GB
                 lo_revenue: 17.93GB
               ->  Seq Scan on public.date1  (cost=0.00..78.95 rows=365 width=4)
                     Output: date1.d_datekey
                     Filter: (date1.d_year = 1993)
(28 rows)

The verbose output additionally displays the amount of column data to be loaded for the referenced columns. The load of the lo_orderdate, lo_quantity, lo_extendedprice and lo_discount columns requires reading 87.4GB in total, which is 28.3% of the file size (309.2GB).

How to make Arrow files

This section introduces how to transform a dataset already stored in a PostgreSQL database into an Apache Arrow file.

Using PyArrow+Pandas

The combination of the PyArrow module, developed by the Apache Arrow developer community, and a Pandas data frame can dump a PostgreSQL database into an Arrow file.

The example below reads all the data in the table t0, then writes it out to /tmp/t0.arrow.

import pyarrow as pa
import pandas as pd

X = pd.read_sql(sql="SELECT * FROM t0", con="postgresql://localhost/postgres")
Y = pa.Table.from_pandas(X)
f = pa.RecordBatchFileWriter('/tmp/t0.arrow', Y.schema)
f.write_table(Y,1000000)      # RecordBatch for each million rows
f.close()

Please note that the operation above keeps the whole query result in memory at once, so pay attention to memory consumption if you want to transfer a massive number of rows.

Using Pg2Arrow

On the other hand, the pg2arrow command, developed by the PG-Strom Development Team, also enables us to write out query results into an Arrow file. This tool is designed to write a massive amount of data onto a storage device such as an NVME-SSD. It fetches query results from the PostgreSQL database and writes out a Record Batch of the Arrow format every time the amount of data specified by the -s|--segment-size option is accumulated, so its memory consumption is relatively reasonable.

The pg2arrow command is distributed with PG-Strom and is installed into the bin directory alongside the other PostgreSQL related utilities.

$ pg2arrow --help
Usage:
  pg2arrow [OPTION]... [DBNAME [USERNAME]]

General options:
  -d, --dbname=DBNAME     database name to connect to
  -c, --command=COMMAND   SQL command to run
  -f, --file=FILENAME     SQL command from file
      (-c and -f are exclusive, either of them must be specified)
  -o, --output=FILENAME   result file in Apache Arrow format
      (default creates a temporary file)

Arrow format options:
  -s, --segment-size=SIZE size of record batch for each
      (default: 256MB)

Connection options:
  -h, --host=HOSTNAME     database server host
  -p, --port=PORT         database server port
  -U, --username=USERNAME database user name
  -w, --no-password       never prompt for password
  -W, --password          force password prompt

Debug options:
      --dump=FILENAME     dump information of arrow file
      --progress          shows progress of the job.

Report bugs to <pgstrom@heterodb.com>.

The -h or -U options specify the connection parameters of PostgreSQL, just like psql or pg_dump. The simplest usage of this command is to run a SQL command specified by the -c|--command option on the PostgreSQL server, then write the results into the file specified by the -o|--output option in Arrow format.

The example below reads all the data in the table t0, then writes it out to the file /tmp/t0.arrow.

$ pg2arrow -U kaigai -d postgres -c "SELECT * FROM t0" -o /tmp/t0.arrow

Although it is an option for developers, --dump <filename> prints the schema definition and the location and size of the record batches of an Arrow file in a human readable form.

Advanced Usage

SSDtoGPU Direct SQL

When all the Arrow files mapped on an Arrow_Fdw foreign table satisfy the terms below, PG-Strom enables SSD-to-GPU Direct SQL to load the columnar data (a configuration sketch follows the list).

  • Arrow files are on an NVME-SSD volume.
  • The NVME-SSD volume is managed by the Ext4 filesystem.
  • The total size of the Arrow files exceeds the pg_strom.nvme_strom_threshold configuration.
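
As a rough sketch, the pg_strom.nvme_strom_threshold parameter can be lowered so that smaller Arrow files also qualify. The value below is only illustrative, and depending on the GUC's context the parameter may have to be set in postgresql.conf rather than per session.

-- illustrative value; adjust to your environment
SET pg_strom.nvme_strom_threshold = '2GB';
SHOW pg_strom.nvme_strom_threshold;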

Partition configuration

Arrow_Fdw foreign tables can be used as partition leafs, and normal PostgreSQL tables can be mixed with Arrow_Fdw foreign tables in one partitioned table. Note, however, that Arrow_Fdw foreign tables do not support any write operations, and it is the responsibility of the database administrator to keep the boundary conditions of the partitions consistent with the contents of the mapped Arrow files.

Example of partition configuration

A typical usage scenario is the processing of log-data accumulated over a long period.

Unlike transactional data, log-data is mostly write-once and is never updated or deleted afterwards. Thus, by migrating log-data after a certain period into an Arrow_Fdw foreign table, which is read-only but fast to process, we can accelerate summarizing and analytics workloads. In addition, log-data usually carries a timestamp, so it is quite easy to add partition leafs periodically, for example monthly or weekly.

The example below defines a partitioned table that mixes a normal PostgreSQL table and Arrow_Fdw foreign tables.

The normal PostgreSQL table, which is read-writable, is specified as the default partition2, so the DBA can migrate only past log-data into Arrow_Fdw foreign tables while the database system stays in operation.

CREATE TABLE lineorder (
    lo_orderkey numeric,
    lo_linenumber integer,
    lo_custkey numeric,
    lo_partkey integer,
    lo_suppkey numeric,
    lo_orderdate integer,
    lo_orderpriority character(15),
    lo_shippriority character(1),
    lo_quantity numeric,
    lo_extendedprice numeric,
    lo_ordertotalprice numeric,
    lo_discount numeric,
    lo_revenue numeric,
    lo_supplycost numeric,
    lo_tax numeric,
    lo_commit_date character(8),
    lo_shipmode character(10)
) PARTITION BY RANGE (lo_orderdate);

CREATE TABLE lineorder__now PARTITION OF lineorder default;

CREATE FOREIGN TABLE lineorder__1993 PARTITION OF lineorder
   FOR VALUES FROM (19930101) TO (19940101)
SERVER arrow_fdw OPTIONS (file '/opt/tmp/lineorder_1993.arrow');

CREATE FOREIGN TABLE lineorder__1994 PARTITION OF lineorder
   FOR VALUES FROM (19940101) TO (19950101)
SERVER arrow_fdw OPTIONS (file '/opt/tmp/lineorder_1994.arrow');

CREATE FOREIGN TABLE lineorder__1995 PARTITION OF lineorder
   FOR VALUES FROM (19950101) TO (19960101)
SERVER arrow_fdw OPTIONS (file '/opt/tmp/lineorder_1995.arrow');

CREATE FOREIGN TABLE lineorder__1996 PARTITION OF lineorder
   FOR VALUES FROM (19960101) TO (19970101)
SERVER arrow_fdw OPTIONS (file '/opt/tmp/lineorder_1996.arrow');

Below is a query execution plan for this table. Because the query condition lo_orderdate between 19950701 and 19960630 matches the boundary conditions of the partitions, the partition leafs lineorder__1993 and lineorder__1994 are pruned, and the plan reads only the remaining (foreign) tables.

=# EXPLAIN
    SELECT sum(lo_extendedprice*lo_discount) as revenue
      FROM lineorder,date1
     WHERE lo_orderdate = d_datekey
       AND lo_orderdate between 19950701 and 19960630
       AND lo_discount between 1 and 3
       AND lo_quantity < 25;

                                 QUERY PLAN
--------------------------------------------------------------------------------
 Aggregate  (cost=172088.90..172088.91 rows=1 width=32)
   ->  Hash Join  (cost=10548.86..172088.51 rows=77 width=64)
         Hash Cond: (lineorder__1995.lo_orderdate = date1.d_datekey)
         ->  Append  (cost=10444.35..171983.80 rows=77 width=67)
               ->  Custom Scan (GpuScan) on lineorder__1995  (cost=10444.35..33671.87 rows=38 width=68)
                     GPU Filter: ((lo_orderdate >= 19950701) AND (lo_orderdate <= 19960630) AND
                                  (lo_discount >= '1'::numeric) AND (lo_discount <= '3'::numeric) AND
                                  (lo_quantity < '25'::numeric))
                     referenced: lo_orderdate, lo_quantity, lo_extendedprice, lo_discount
                     files0: /opt/tmp/lineorder_1995.arrow (size: 892.57MB)
               ->  Custom Scan (GpuScan) on lineorder__1996  (cost=10444.62..33849.21 rows=38 width=68)
                     GPU Filter: ((lo_orderdate >= 19950701) AND (lo_orderdate <= 19960630) AND
                                  (lo_discount >= '1'::numeric) AND (lo_discount <= '3'::numeric) AND
                                  (lo_quantity < '25'::numeric))
                     referenced: lo_orderdate, lo_quantity, lo_extendedprice, lo_discount
                     files0: /opt/tmp/lineorder_1996.arrow (size: 897.87MB)
               ->  Custom Scan (GpuScan) on lineorder__now  (cost=11561.33..104462.33 rows=1 width=18)
                     GPU Filter: ((lo_orderdate >= 19950701) AND (lo_orderdate <= 19960630) AND
                                  (lo_discount >= '1'::numeric) AND (lo_discount <= '3'::numeric) AND
                                  (lo_quantity < '25'::numeric))
         ->  Hash  (cost=72.56..72.56 rows=2556 width=4)
               ->  Seq Scan on date1  (cost=0.00..72.56 rows=2556 width=4)
(16 rows)

The operation below extracts the data of 1997 from the lineorder__now table, then moves it to a new Arrow_Fdw foreign table.

$ pg2arrow -d sample  -o /opt/tmp/lineorder_1997.arrow \
           -c "SELECT * FROM lineorder WHERE lo_orderdate between 19970101 and 19971231"

The pg2arrow command extracts the data of 1997 from the lineorder table into a new Arrow file.

BEGIN;
--
-- remove rows in 1997 from the read-writable table
--
DELETE FROM lineorder WHERE lo_orderdate BETWEEN 19970101 AND 19971231;
--
-- define a new partition leaf which maps log-data in 1997
--
CREATE FOREIGN TABLE lineorder__1997 PARTITION OF lineorder
   FOR VALUES FROM (19970101) TO (19980101)
SERVER arrow_fdw OPTIONS (file '/opt/tmp/lineorder_1997.arrow');

COMMIT;

The series of operations above deletes the data of 1997 from lineorder__now, which is a normal PostgreSQL table, then maps an Arrow file (/opt/tmp/lineorder_1997.arrow) containing the identical contents as the foreign table lineorder__1997.
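
As a quick sanity check (a hedged sketch reusing the pattern shown earlier), a query that touches only the 1997 data should, after partition pruning, scan nothing but the new lineorder__1997 leaf.

EXPLAIN
SELECT count(*)
  FROM lineorder
 WHERE lo_orderdate BETWEEN 19970101 AND 19971231;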


  1. To be precise, the block size is configurable at build time, from 4KB to 32KB.

  2. Supported at PostgreSQL v11 or later.