Apache Arrow (Columnar Store)
PostgreSQL tables internally consist of 8KB blocks, and each block contains tuples, a data structure that holds all the attributes and metadata of one row. Because the data of a row is located closely together, this layout works well for INSERT/UPDATE-heavy workloads, but it is not suitable for summarizing or analyzing large volumes of data.
It is unusual to reference all the columns of a table in large-scale data processing; in most cases we reference only a subset of the columns. The storage I/O bandwidth consumed by the unreferenced columns is then wasted, but there is no easy way to fetch only the referenced columns from a row-oriented data structure.
A column-oriented data structure, in contrast, has a serious disadvantage for INSERT/UPDATE-heavy workloads, but it can pull out the maximum storage I/O performance for large-scale data processing because it loads only the referenced columns. From the standpoint of processor efficiency as well, a column-oriented data structure looks like a flat array, which lets a GPU pull out the maximum bandwidth of the memory subsystem through a special memory access pattern called Coalesced Memory Access.
What is Apache Arrow?
Apache Arrow is a data format for saving structured data in columnar form and for exchanging it with other applications. Several big-data processing applications support the format, and it is easy for self-developed applications to use it as well, since libraries are provided for major programming languages such as C, C++ and Python.
An Apache Arrow file internally contains a Schema portion that defines the data structure, and one or more RecordBatch chunks that store columnar data according to the schema definition. Supported data types include integers, strings (variable length), date/time types and so on. Each columnar data item has an internal representation that depends on its data type.
The data representation in Apache Arrow is not identical to the representation in PostgreSQL. For example, the epoch of the Arrow timestamp type is 1970-01-01 and multiple precisions are supported, whereas the epoch of the PostgreSQL timestamp type is 2000-01-01 with microsecond accuracy.
Arrow_Fdw allows PostgreSQL to read Apache Arrow files through the foreign table mechanism. If an Arrow file contains 8 record batches with a million items per column, for example, we can access the 8 million rows in the Arrow file through the foreign table.
Creation of foreign tables
Usually it takes the 3 steps below to create a foreign table.
- Define a foreign data wrapper, using the CREATE FOREIGN DATA WRAPPER command
- Define a foreign server, using the CREATE SERVER command
- Define a foreign table, using the CREATE FOREIGN TABLE command
The first two steps above are already performed as part of the CREATE EXTENSION pg_strom command, so all you need to run individually is the final CREATE FOREIGN TABLE command.
CREATE FOREIGN TABLE flogdata (
    ts        timestamp,
    sensor_id int,
    signal1   smallint,
    signal2   smallint,
    signal3   smallint,
    signal4   smallint
) SERVER arrow_fdw
  OPTIONS (file '/path/to/logdata.arrow');
The data types of the columns specified by the CREATE FOREIGN TABLE command must match the schema definition of the Arrow files to be mapped.
Arrow_Fdw also supports a convenient alternative using the IMPORT FOREIGN SCHEMA statement, which automatically generates a foreign table definition from the schema definition of the Arrow files. You specify the foreign table name, the schema to import into, and the path of the Arrow files in the OPTIONS clause. The schema definition of an Arrow file contains the data type and an optional column name for each column, and the statement declares a new foreign table using this information.
IMPORT FOREIGN SCHEMA flogdata
  FROM SERVER arrow_fdw
  INTO public
OPTIONS (file '/path/to/logdata.arrow');
Foreign table options
Arrow_Fdw supports the options below; right now, all of them are foreign table options. A combined usage example follows the list.
- file=PATHNAME - maps the specified Arrow file on the foreign table.
- files=PATHNAME1[,PATHNAME2...] - maps multiple Arrow files, given as a comma (,) separated list, on the foreign table.
- dir=DIRNAME - maps all the Arrow files in the specified directory on the foreign table.
- suffix=SUFFIX - when the dir option is given, maps only the files with the specified suffix, like .arrow for example.
- parallel_workers=N_WORKERS - tells the number of workers that should be used to assist a parallel scan of this foreign table; equivalent to the parallel_workers storage parameter of normal tables.
- writable=(true|false) - allows execution of the INSERT command on the foreign table. See the section "Writable Arrow_Fdw".
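For example, a foreign table that maps every file with the .arrow suffix under a directory, scanned by up to four parallel workers, might look like the sketch below. The directory path and columns are hypothetical, chosen only for illustration:

CREATE FOREIGN TABLE flogdata_all (
    ts        timestamp,
    sensor_id int
) SERVER arrow_fdw
  OPTIONS (dir '/opt/arrow/logdata',
           suffix '.arrow',
           parallel_workers '4');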
Data type mapping
Arrow data types are mapped to PostgreSQL data types as follows; a small example of this mapping follows the list.
- Int - mapped to one of int1, int2, int4 or int8 according to the bitWidth attribute; the is_signed attribute shall be ignored. int1 is an enhanced data type added by PG-Strom.
- FloatingPoint - mapped to one of float2, float4 or float8 according to the precision attribute. float2 is an enhanced data type added by PG-Strom.
- Utf8 - mapped to the text data type.
- Decimal - mapped to the numeric data type.
- Date - mapped to the date data type; adjusted as if it has Day precision.
- Time - mapped to the time data type; adjusted as if it has MicroSecond precision.
- Timestamp - mapped to the timestamp data type; adjusted as if it has MicroSecond precision.
- Interval - mapped to the interval data type.
- List - mapped to a 1-dimensional array of the element data type.
- Struct - mapped to a compatible composite data type, which must be defined beforehand.
- FixedSizeBinary - mapped to the char(n) data type according to the byteWidth attribute. If pg_type=TYPENAME is configured, PG-Strom may assign the configured data type instead.
- Union, Map - right now, PG-Strom cannot map these Arrow data types onto any PostgreSQL data type.
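As a small illustration of this mapping: an Arrow file whose schema is, hypothetically, (ts: Timestamp[unit=MicroSecond], id: Int[bitWidth=32], label: Utf8) would be declared as the following foreign table (the path name is also hypothetical):

CREATE FOREIGN TABLE ftypes (
    ts    timestamp,   -- Arrow Timestamp, unit=MicroSecond
    id    int4,        -- Arrow Int, bitWidth=32
    label text         -- Arrow Utf8
) SERVER arrow_fdw
  OPTIONS (file '/path/to/types.arrow');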
How to read EXPLAIN
The EXPLAIN command shows us information about how Arrow files are read. The example below is the query execution plan of a query on the flineorder foreign table, which maps a 309GB Arrow file.
=# EXPLAIN
   SELECT sum(lo_extendedprice*lo_discount) as revenue
     FROM flineorder,date1
    WHERE lo_orderdate = d_datekey
      AND d_year = 1993
      AND lo_discount between 1 and 3
      AND lo_quantity < 25;
                                              QUERY PLAN
-----------------------------------------------------------------------------------------------------
 Aggregate  (cost=12632759.02..12632759.03 rows=1 width=32)
   ->  Custom Scan (GpuPreAgg)  (cost=12632754.43..12632757.49 rows=204 width=8)
         Reduction: NoGroup
         Combined GpuJoin: enabled
         GPU Preference: GPU0 (Tesla V100-PCIE-16GB)
         ->  Custom Scan (GpuJoin) on flineorder  (cost=9952.15..12638126.98 rows=572635 width=12)
               Outer Scan: flineorder  (cost=9877.70..12649677.69 rows=4010017 width=16)
               Outer Scan Filter: ((lo_discount >= 1) AND (lo_discount <= 3) AND (lo_quantity < 25))
               Depth 1: GpuHashJoin  (nrows 4010017...572635)
                        HashKeys: flineorder.lo_orderdate
                        JoinQuals: (flineorder.lo_orderdate = date1.d_datekey)
                        KDS-Hash (size: 66.06KB)
               GPU Preference: GPU0 (Tesla V100-PCIE-16GB)
               NVMe-Strom: enabled
               referenced: lo_orderdate, lo_quantity, lo_extendedprice, lo_discount
               files0: /opt/nvme/lineorder_s401.arrow (size: 309.23GB)
               ->  Seq Scan on date1  (cost=0.00..78.95 rows=365 width=4)
                     Filter: (d_year = 1993)
(18 rows)
According to the EXPLAIN output, we can see that Custom Scan (GpuJoin) scans the flineorder foreign table. The files0 item shows the filename (/opt/nvme/lineorder_s401.arrow) mapped by the foreign table and its size; if multiple files are mapped, they are shown individually as files1, files2, and so on. The referenced item shows the list of referenced columns; we can see this query touches the lo_orderdate, lo_quantity, lo_extendedprice and lo_discount columns. In addition, GPU Preference: GPU0 (Tesla V100-PCIE-16GB) and NVMe-Strom: enabled show us that the scan on flineorder uses the SSD-to-GPU Direct SQL mechanism.
The VERBOSE option outputs more detailed information.
=# EXPLAIN VERBOSE
   SELECT sum(lo_extendedprice*lo_discount) as revenue
     FROM flineorder,date1
    WHERE lo_orderdate = d_datekey
      AND d_year = 1993
      AND lo_discount between 1 and 3
      AND lo_quantity < 25;
                                   QUERY PLAN
--------------------------------------------------------------------------------
 Aggregate  (cost=12632759.02..12632759.03 rows=1 width=32)
   Output: sum((pgstrom.psum((flineorder.lo_extendedprice * flineorder.lo_discount))))
   ->  Custom Scan (GpuPreAgg)  (cost=12632754.43..12632757.49 rows=204 width=8)
         Output: (pgstrom.psum((flineorder.lo_extendedprice * flineorder.lo_discount)))
         Reduction: NoGroup
         GPU Projection: flineorder.lo_extendedprice, flineorder.lo_discount, pgstrom.psum((flineorder.lo_extendedprice * flineorder.lo_discount))
         Combined GpuJoin: enabled
         GPU Preference: GPU0 (Tesla V100-PCIE-16GB)
         ->  Custom Scan (GpuJoin) on public.flineorder  (cost=9952.15..12638126.98 rows=572635 width=12)
               Output: flineorder.lo_extendedprice, flineorder.lo_discount
               GPU Projection: flineorder.lo_extendedprice::bigint, flineorder.lo_discount::integer
               Outer Scan: public.flineorder  (cost=9877.70..12649677.69 rows=4010017 width=16)
               Outer Scan Filter: ((flineorder.lo_discount >= 1) AND (flineorder.lo_discount <= 3) AND (flineorder.lo_quantity < 25))
               Depth 1: GpuHashJoin  (nrows 4010017...572635)
                        HashKeys: flineorder.lo_orderdate
                        JoinQuals: (flineorder.lo_orderdate = date1.d_datekey)
                        KDS-Hash (size: 66.06KB)
               GPU Preference: GPU0 (Tesla V100-PCIE-16GB)
               NVMe-Strom: enabled
               referenced: lo_orderdate, lo_quantity, lo_extendedprice, lo_discount
               files0: /opt/nvme/lineorder_s401.arrow (size: 309.23GB)
                 lo_orderpriority: 33.61GB
                 lo_extendedprice: 17.93GB
                 lo_ordertotalprice: 17.93GB
                 lo_revenue: 17.93GB
               ->  Seq Scan on public.date1  (cost=0.00..78.95 rows=365 width=4)
                     Output: date1.d_datekey
                     Filter: (date1.d_year = 1993)
(28 rows)
The verbose output additionally displays the amount of column data to be loaded for the referenced columns. Loading these columns needs to read 87.4GB in total (33.61GB + 17.93GB x 3), which is 28.3% of the file size (309.2GB).
How to make Arrow files
This section introduces how to transform a dataset already stored in a PostgreSQL database into an Apache Arrow file.
The PyArrow module, developed by the Arrow developers community, combined with a Pandas data frame, can dump a PostgreSQL dataset into an Arrow file.
The example below reads all the data in table t0, then writes it out to the file /tmp/t0.arrow.
import pyarrow as pa
import pandas as pd

X = pd.read_sql(sql="SELECT * FROM t0",
                con="postgresql://localhost/postgres")
Y = pa.Table.from_pandas(X)
f = pa.RecordBatchFileWriter('/tmp/t0.arrow', Y.schema)
f.write_table(Y, 1000000)    # one RecordBatch for each million rows
f.close()
Please note that the above operation keeps the entire query result in memory at once, so pay attention to memory consumption if you want to transfer a massive number of rows.
On the other hand, the pg2arrow command, developed by the PG-Strom Development Team, enables us to write out a query result into an Arrow file. This tool is designed to write a massive amount of data to a storage device such as an NVME-SSD: it fetches the query results from the PostgreSQL database and writes out a Record Batch of the Arrow format every time the fetched data reaches the size specified by the -s|--segment-size option, so its memory consumption stays reasonable.
The pg2arrow command is distributed with PG-Strom, and shall be installed in the bin directory together with the other PostgreSQL-related utilities.
$ ./pg2arrow --help
Usage:
  pg2arrow [OPTION]... [DBNAME [USERNAME]]

General options:
  -d, --dbname=DBNAME      database name to connect to
  -c, --command=COMMAND    SQL command to run
  -f, --file=FILENAME      SQL command from file
      (-c and -f are exclusive, either of them must be specified)
  -o, --output=FILENAME    result file in Apache Arrow format
      --append=FILENAME    result file to be appended
      (--output and --append are exclusive to use at the same time.
       If neither of them are specified, it creates a temporary file.)

Arrow format options:
  -s, --segment-size=SIZE  size of record batch for each (default: 256MB)

Connection options:
  -h, --host=HOSTNAME      database server host
  -p, --port=PORT          database server port
  -U, --username=USERNAME  database user name
  -w, --no-password        never prompt for password
  -W, --password           force password prompt

Other options:
      --dump=FILENAME      dump information of arrow file
      --progress           shows progress of the job
      --set=NAME:VALUE     GUC option to set before SQL execution

Report bugs to <pgstrom@heterodb.com>.
The connection options such as -U specify the connection parameters of PostgreSQL, just like pg_dump. The simplest usage of this command is to run a SQL command specified by the -c|--command option on the PostgreSQL server, then write the result into the file specified by the -o|--output option in Arrow format.
The --append option is available instead of the -o|--output option; it appends data to an existing Apache Arrow file. In this case, the target Apache Arrow file must have a schema definition fully identical to the result of the specified SQL command.
The example below reads all the data in table t0, then writes it out to the file /tmp/t0.arrow.
$ pg2arrow -U kaigai -d postgres -c "SELECT * FROM t0" -o /tmp/t0.arrow
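If you later want to add more rows to the same file, the --append option can be used instead of -o. A minimal sketch, assuming the query result has a schema identical to /tmp/t0.arrow:

$ pg2arrow -U kaigai -d postgres -c "SELECT * FROM t0" --append /tmp/t0.arrow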
Although it is an option for developers, --dump <filename> prints the schema definition and the location and size of the record batches of an Arrow file in human-readable form.
The --progress option shows the progress of the task; it is useful when a huge table is transformed into Apache Arrow format.
Writable Arrow_Fdw
Arrow_Fdw foreign tables that have the writable option allow appending data with the INSERT command, and erasing the entire contents of the foreign table (that is, of the Apache Arrow file behind the foreign table) with the pgstrom.arrow_fdw_truncate() function. On the other hand, UPDATE and DELETE commands are not supported.
When the writable option is enabled on an Arrow_Fdw foreign table, it accepts only one pathname specified by the files option; you cannot specify multiple pathnames, and the option is exclusive to the dir option.
The Apache Arrow file does not have to exist at the specified path when the foreign table is declared; on the other hand, the PostgreSQL server needs permission to create a new file at that path.
Regarding the internal layout of Apache Arrow files: in addition to metadata such as the header and footer, a file can contain multiple DictionaryBatch (dictionary data for dictionary compression) and RecordBatch (user data) chunks. The sketch below outlines this layout.
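A simplified sketch of the layout, following the Arrow IPC file format:

ARROW1                      (magic header)
Schema
DictionaryBatch #1 .. #n    (optional)
RecordBatch #1
   ...
RecordBatch #k
Footer                      (offsets of the chunks above)
footer length, ARROW1       (magic trailer)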
A RecordBatch is a unit of columnar data that holds a particular number of rows. For example, if a RecordBatch of an Apache Arrow file contains 2,500 rows, then 2,500 items of every field defined in the schema are located in that RecordBatch in columnar format; likewise, a RecordBatch that contains 4,000 rows holds 4,000 items of every field. Therefore, appending user data to an Apache Arrow file means adding a new RecordBatch.
In an Apache Arrow file, the file offsets of the DictionaryBatch and RecordBatch chunks are held by the Footer chunk, which follows the last RecordBatch. So, when an INSERT command appends new data, the (k+1)-th RecordBatch can overwrite the original Footer chunk, and a new Footer is reconstructed after it.
Due to this data format, a newly appended RecordBatch holds only the rows processed by a single INSERT command, so file usage becomes very inefficient if an INSERT command adds only a few rows. We recommend inserting as many rows as possible with a single INSERT command when you add data to an Arrow_Fdw foreign table.
Write operations to Arrow_Fdw follow the transaction control of PostgreSQL. Concurrent transactions cannot reference the newly appended rows until the writing transaction commits, and the user can roll back the pending, uncommitted writes.
For implementation reasons, writes to an Arrow_Fdw foreign table acquire a lock stricter than the RowExclusiveLock that INSERT/UPDATE on regular PostgreSQL tables acquire. This means only one transaction at a time can write to a particular Arrow_Fdw foreign table.
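One way to observe this is to inspect pg_locks while a write is pending; a minimal sketch, reusing the ftest foreign table from the session example below:

=# BEGIN;
=# INSERT INTO ftest VALUES (1);
=# SELECT locktype, mode, granted
     FROM pg_locks
    WHERE relation = 'ftest'::regclass;
=# ROLLBACK;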
This is usually not a problem, because the workloads Arrow_Fdw expects are mostly bulk data loading. If your design requires many concurrent transactions writing to an Arrow_Fdw foreign table, we recommend buffering the many small writes in a temporary table first, as sketched below.
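A minimal sketch of that pattern (the buffer table name is hypothetical): small writes within the session accumulate in a temporary table, and one INSERT flushes them so they land in a single RecordBatch.

CREATE TEMPORARY TABLE ftest_buf (x int);
-- ... many small writes go into the temporary table ...
INSERT INTO ftest_buf VALUES (101);
INSERT INTO ftest_buf VALUES (102), (103);
-- flush the buffered rows into the Arrow_Fdw foreign table at once
INSERT INTO ftest (SELECT * FROM ftest_buf);
DROP TABLE ftest_buf;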
postgres=# CREATE FOREIGN TABLE ftest (x int)
               SERVER arrow_fdw
               OPTIONS (file '/dev/shm/ftest.arrow', writable 'true');
CREATE FOREIGN TABLE
postgres=# INSERT INTO ftest (SELECT * FROM generate_series(1,100));
INSERT 0 100
postgres=# BEGIN;
BEGIN
postgres=# INSERT INTO ftest (SELECT * FROM generate_series(1,50));
INSERT 0 50
postgres=# SELECT count(*) FROM ftest;
 count
-------
   150
(1 row)

-- By the transaction rollback, the above INSERT shall be reverted.
postgres=# ROLLBACK;
ROLLBACK
postgres=# SELECT count(*) FROM ftest;
 count
-------
   100
(1 row)
Right now, PostgreSQL does not support the TRUNCATE statement on foreign tables. As an alternative, Arrow_Fdw provides the pgstrom.arrow_fdw_truncate(regclass) function, which eliminates all the contents of the Apache Arrow file behind the foreign table.
postgres=# SELECT count(*) FROM ftest;
 count
-------
   100
(1 row)

postgres=# SELECT pgstrom.arrow_fdw_truncate('ftest');
 arrow_fdw_truncate
--------------------

(1 row)

postgres=# SELECT count(*) FROM ftest;
 count
-------
     0
(1 row)
SSD-to-GPU Direct SQL
When all the Arrow files mapped by an Arrow_Fdw foreign table satisfy the conditions below, PG-Strom enables SSD-to-GPU Direct SQL to load the columnar data.
- The Arrow files are on an NVME-SSD volume.
- The NVME-SSD volume is managed by the Ext4 filesystem.
- The total size of the Arrow files exceeds the pg_strom.nvme_strom_threshold setting; a quick way to check it is sketched below.
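To check the configured threshold, a one-line sketch (the GUC name follows the PG-Strom 2.x series):

=# SHOW pg_strom.nvme_strom_threshold;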
Partition configuration
Arrow_Fdw foreign tables can be used as partition leaves, mixed with normal PostgreSQL tables. Pay attention, however, that Arrow_Fdw foreign tables do not accept write operations unless the writable option is set, and that keeping the boundary conditions of the partitions consistent with the contents of the mapped Arrow files is the responsibility of the database administrators.
A typical usage scenario is the processing of long-accumulated log data. Unlike transactional data, log data is mostly write-once and never updated or deleted. Thus, by migrating log data past a certain age into an Arrow_Fdw foreign table, which is read-only but fast to process, we can accelerate summarizing and analytics workloads. In addition, log data usually has a timestamp, so it is quite natural to design partition leaves that are added periodically, e.g. monthly or weekly.
The example below defines a partitioned table that mixes a normal PostgreSQL table and Arrow_Fdw foreign tables. The normal PostgreSQL table, which is read-writable, is specified as the default partition, so the DBA can migrate just the past log data into Arrow_Fdw foreign tables while the database system stays in operation.
CREATE TABLE lineorder (
    lo_orderkey numeric,
    lo_linenumber integer,
    lo_custkey numeric,
    lo_partkey integer,
    lo_suppkey numeric,
    lo_orderdate integer,
    lo_orderpriority character(15),
    lo_shippriority character(1),
    lo_quantity numeric,
    lo_extendedprice numeric,
    lo_ordertotalprice numeric,
    lo_discount numeric,
    lo_revenue numeric,
    lo_supplycost numeric,
    lo_tax numeric,
    lo_commit_date character(8),
    lo_shipmode character(10)
) PARTITION BY RANGE (lo_orderdate);

CREATE TABLE lineorder__now PARTITION OF lineorder default;

CREATE FOREIGN TABLE lineorder__1993 PARTITION OF lineorder
   FOR VALUES FROM (19930101) TO (19940101)
SERVER arrow_fdw OPTIONS (file '/opt/tmp/lineorder_1993.arrow');

CREATE FOREIGN TABLE lineorder__1994 PARTITION OF lineorder
   FOR VALUES FROM (19940101) TO (19950101)
SERVER arrow_fdw OPTIONS (file '/opt/tmp/lineorder_1994.arrow');

CREATE FOREIGN TABLE lineorder__1995 PARTITION OF lineorder
   FOR VALUES FROM (19950101) TO (19960101)
SERVER arrow_fdw OPTIONS (file '/opt/tmp/lineorder_1995.arrow');

CREATE FOREIGN TABLE lineorder__1996 PARTITION OF lineorder
   FOR VALUES FROM (19960101) TO (19970101)
SERVER arrow_fdw OPTIONS (file '/opt/tmp/lineorder_1996.arrow');
Below is a query execution plan for this table. The query condition lo_orderdate between 19950701 and 19960630 matches the boundary conditions of the partitions, so the partition leaves lineorder__1993 and lineorder__1994 are pruned, and the resulting plan reads only the remaining (foreign) tables.
=# EXPLAIN
   SELECT sum(lo_extendedprice*lo_discount) as revenue
     FROM lineorder,date1
    WHERE lo_orderdate = d_datekey
      AND lo_orderdate between 19950701 and 19960630
      AND lo_discount between 1 and 3
      AND lo_quantity < 25;
                                   QUERY PLAN
--------------------------------------------------------------------------------
 Aggregate  (cost=172088.90..172088.91 rows=1 width=32)
   ->  Hash Join  (cost=10548.86..172088.51 rows=77 width=64)
         Hash Cond: (lineorder__1995.lo_orderdate = date1.d_datekey)
         ->  Append  (cost=10444.35..171983.80 rows=77 width=67)
               ->  Custom Scan (GpuScan) on lineorder__1995  (cost=10444.35..33671.87 rows=38 width=68)
                     GPU Filter: ((lo_orderdate >= 19950701) AND (lo_orderdate <= 19960630) AND (lo_discount >= '1'::numeric) AND (lo_discount <= '3'::numeric) AND (lo_quantity < '25'::numeric))
                     referenced: lo_orderdate, lo_quantity, lo_extendedprice, lo_discount
                     files0: /opt/tmp/lineorder_1995.arrow (size: 892.57MB)
               ->  Custom Scan (GpuScan) on lineorder__1996  (cost=10444.62..33849.21 rows=38 width=68)
                     GPU Filter: ((lo_orderdate >= 19950701) AND (lo_orderdate <= 19960630) AND (lo_discount >= '1'::numeric) AND (lo_discount <= '3'::numeric) AND (lo_quantity < '25'::numeric))
                     referenced: lo_orderdate, lo_quantity, lo_extendedprice, lo_discount
                     files0: /opt/tmp/lineorder_1996.arrow (size: 897.87MB)
               ->  Custom Scan (GpuScan) on lineorder__now  (cost=11561.33..104462.33 rows=1 width=18)
                     GPU Filter: ((lo_orderdate >= 19950701) AND (lo_orderdate <= 19960630) AND (lo_discount >= '1'::numeric) AND (lo_discount <= '3'::numeric) AND (lo_quantity < '25'::numeric))
         ->  Hash  (cost=72.56..72.56 rows=2556 width=4)
               ->  Seq Scan on date1  (cost=0.00..72.56 rows=2556 width=4)
(16 rows)
The operation below extracts the data of 1997 from the lineorder__now table, then moves it to a new Arrow_Fdw foreign table.
$ pg2arrow -d sample -o /opt/tmp/lineorder_1997.arrow \ -c "SELECT * FROM lineorder WHERE lo_orderdate between 19970101 and 19971231"
The pg2arrow command extracts the data of 1997 from the lineorder table into a new Arrow file.
BEGIN;
--
-- remove rows in 1997 from the read-writable table
--
DELETE FROM lineorder WHERE lo_orderdate BETWEEN 19970101 AND 19971231;
--
-- define a new partition leaf which maps log-data in 1997
--
CREATE FOREIGN TABLE lineorder__1997 PARTITION OF lineorder
   FOR VALUES FROM (19970101) TO (19980101)
SERVER arrow_fdw OPTIONS (file '/opt/tmp/lineorder_1997.arrow');
COMMIT;
The series of operations above deletes the data of 1997 from lineorder__now, which is a normal PostgreSQL table, then maps an Arrow file (/opt/tmp/lineorder_1997.arrow) with identical contents as the new foreign table lineorder__1997.
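After the commit, partition pruning should route queries on 1997 to the new foreign table only; a quick sanity check might look like this (a hedged sketch, output omitted):

=# EXPLAIN SELECT count(*) FROM lineorder
    WHERE lo_orderdate BETWEEN 19970101 AND 19971231;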