pg_clickhouse: ClickHouse's official PostgreSQL extension

John Doe, December 18, 2025

pg_clickhouse is a PostgreSQL extension that runs analytical queries on ClickHouse directly from PostgreSQL, without rewriting any SQL.

It supports PostgreSQL 13 and later and ClickHouse v23 and later. Let's try it out.

Create the table

First, if you don't already have a ClickHouse database, create one. Log in to ClickHouse and create a simple database for the New York City taxi dataset:

CREATE DATABASE taxi;
CREATE TABLE taxi.trips
(
    trip_id UInt32,
    vendor_id Enum8(
        '1'      =  1, '2'      =  2, '3'      =  3, '4'      =  4,
        'CMT'    =  5, 'VTS'    =  6, 'DDS'    =  7, 'B02512' = 10,
        'B02598' = 11, 'B02617' = 12, 'B02682' = 13, 'B02764' = 14,
        ''       = 15
    ),
    pickup_date Date,
    pickup_datetime DateTime,
    dropoff_date Date,
    dropoff_datetime DateTime,
    store_and_fwd_flag UInt8,
    rate_code_id UInt8,
    pickup_longitude Float64,
    pickup_latitude Float64,
    dropoff_longitude Float64,
    dropoff_latitude Float64,
    passenger_count UInt8,
    trip_distance Float64,
    fare_amount Decimal(10, 2),
    extra Decimal(10, 2),
    mta_tax Decimal(10, 2),
    tip_amount Decimal(10, 2),
    tolls_amount Decimal(10, 2),
    ehail_fee Decimal(10, 2),
    improvement_surcharge Decimal(10, 2),
    total_amount Decimal(10, 2),
    payment_type Enum8('UNK' = 0, 'CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4),
    trip_type UInt8,
    pickup FixedString(25),
    dropoff FixedString(25),
    cab_type Enum8('yellow' = 1, 'green' = 2, 'uber' = 3),
    pickup_nyct2010_gid Int8,
    pickup_ctlabel Float32,
    pickup_borocode Int8,
    pickup_ct2010 String,
    pickup_boroct2010 String,
    pickup_cdeligibil String,
    pickup_ntacode FixedString(4),
    pickup_ntaname String,
    pickup_puma UInt16,
    dropoff_nyct2010_gid UInt8,
    dropoff_ctlabel Float32,
    dropoff_borocode UInt8,
    dropoff_ct2010 String,
    dropoff_boroct2010 String,
    dropoff_cdeligibil String,
    dropoff_ntacode FixedString(4),
    dropoff_ntaname String,
    dropoff_puma UInt16
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(pickup_date)
ORDER BY pickup_datetime;
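PARTITION BY toYYYYMM(pickup_date) stores each calendar month of pickups in its own partition, and ORDER BY pickup_datetime sorts rows by pickup time inside each part. The Python sketch below is only an illustration, not ClickHouse code: it mimics how toYYYYMM turns a date into the YYYYMM partition key (year * 100 + month); the helper names are hypothetical.

```python
from datetime import date


def to_yyyymm(d: date) -> int:
    """Mimic ClickHouse's toYYYYMM(): encode a date's year and month as YYYYMM."""
    return d.year * 100 + d.month


def same_partition(a: date, b: date) -> bool:
    # Two rows share a partition exactly when their pickup dates share a YYYYMM key.
    return to_yyyymm(a) == to_yyyymm(b)


if __name__ == "__main__":
    print(to_yyyymm(date(2015, 7, 1)))  # 201507
    print(same_partition(date(2015, 7, 1), date(2015, 7, 31)))  # True
    print(same_partition(date(2015, 7, 31), date(2015, 8, 1)))  # False
```

With monthly partitions, a filter on pickup_date lets ClickHouse skip whole months that cannot match.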

Import the dataset

Run the following command to load the data from S3:

INSERT INTO taxi.trips SELECT * FROM s3(
    'https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/trips_{1..2}.gz',
    'TabSeparatedWithNames',
    "trip_id UInt32,
    vendor_id Enum8(
        '1'      =  1, '2'      =  2, '3'      =  3, '4'      =  4,
        'CMT'    =  5, 'VTS'    =  6, 'DDS'    =  7, 'B02512' = 10,
        'B02598' = 11, 'B02617' = 12, 'B02682' = 13, 'B02764' = 14,
        ''       = 15
    ),
    pickup_date Date,
    pickup_datetime DateTime,
    dropoff_date Date,
    dropoff_datetime DateTime,
    store_and_fwd_flag UInt8,
    rate_code_id UInt8,
    pickup_longitude Float64,
    pickup_latitude Float64,
    dropoff_longitude Float64,
    dropoff_latitude Float64,
    passenger_count UInt8,
    trip_distance Float64,
    fare_amount Decimal(10, 2),
    extra Decimal(10, 2),
    mta_tax Decimal(10, 2),
    tip_amount Decimal(10, 2),
    tolls_amount Decimal(10, 2),
    ehail_fee Decimal(10, 2),
    improvement_surcharge Decimal(10, 2),
    total_amount Decimal(10, 2),
    payment_type Enum8('UNK' = 0, 'CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4),
    trip_type UInt8,
    pickup FixedString(25),
    dropoff FixedString(25),
    cab_type Enum8('yellow' = 1, 'green' = 2, 'uber' = 3),
    pickup_nyct2010_gid Int8,
    pickup_ctlabel Float32,
    pickup_borocode Int8,
    pickup_ct2010 String,
    pickup_boroct2010 String,
    pickup_cdeligibil String,
    pickup_ntacode FixedString(4),
    pickup_ntaname String,
    pickup_puma UInt16,
    dropoff_nyct2010_gid UInt8,
    dropoff_ctlabel Float32,
    dropoff_borocode UInt8,
    dropoff_ct2010 String,
    dropoff_boroct2010 String,
    dropoff_cdeligibil String,
    dropoff_ntacode FixedString(4),
    dropoff_ntaname String,
    dropoff_puma UInt16"
) SETTINGS input_format_try_infer_datetimes = 0;
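The {1..2} in the URL is a ClickHouse glob: {N..M} stands for every integer from N to M inclusive, so this statement reads trips_1.gz and trips_2.gz. The Python sketch below is illustrative only (expand_ranges is a hypothetical helper, not ClickHouse's implementation); it applies that rule to numeric ranges and ignores other glob forms such as * or {a,b}.

```python
import re

# One numeric range of the form {N..M}; other glob syntax is left untouched.
RANGE = re.compile(r"\{(\d+)\.\.(\d+)\}")


def expand_ranges(url: str) -> list:
    """Expand every {N..M} range in url into concrete URLs (assumes N <= M, no zero padding)."""
    m = RANGE.search(url)
    if m is None:
        return [url]
    lo, hi = int(m.group(1)), int(m.group(2))
    urls = []
    for i in range(lo, hi + 1):
        # Substitute the leftmost range, then expand any ranges still to its right.
        urls.extend(expand_ranges(url[: m.start()] + str(i) + url[m.end():]))
    return urls


if __name__ == "__main__":
    base = "https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/"
    for u in expand_ranges(base + "trips_{1..2}.gz"):
        print(u)
```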

Verify that the table can be queried, then exit the client:

SELECT count() FROM taxi.trips;
quit

Connect with pg_clickhouse

Build and install pg_clickhouse from PGXN or GitHub, then connect to Postgres and create the pg_clickhouse extension:

CREATE EXTENSION pg_clickhouse;

Create a foreign server using the ClickHouse host name, port, and database name:

CREATE SERVER taxi_srv FOREIGN DATA WRAPPER clickhouse_fdw
       OPTIONS(driver 'binary', host 'localhost', dbname 'taxi');

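Here we use the binary driver, which speaks ClickHouse's native binary protocol. You can also use the "http" driver, which goes through ClickHouse's HTTP interface.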
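The two drivers connect to different ClickHouse endpoints, whose default non-TLS ports are 9000 for the native protocol and 8123 for HTTP. Below is a hedged Python sketch that builds the CREATE SERVER statement for either driver. server_ddl is a hypothetical helper, and passing a port option is an assumption based on the host/port/database description above, so check the pg_clickhouse documentation for the exact option names.

```python
from typing import Optional

# ClickHouse's default non-TLS ports for each interface.
DEFAULT_PORTS = {"binary": 9000, "http": 8123}


def server_ddl(name: str, driver: str, host: str, dbname: str,
               port: Optional[int] = None) -> str:
    """Build a CREATE SERVER statement for clickhouse_fdw (hypothetical helper).

    Assumes pg_clickhouse accepts a 'port' option and that the values contain
    no single quotes (they are placed into SQL literals without escaping).
    """
    if driver not in DEFAULT_PORTS:
        raise ValueError(f"unknown driver: {driver!r}")
    if port is None:
        port = DEFAULT_PORTS[driver]  # fall back to the driver's default ClickHouse port
    options = [("driver", driver), ("host", host), ("port", str(port)), ("dbname", dbname)]
    opts = ", ".join(f"{key} '{value}'" for key, value in options)
    return f"CREATE SERVER {name} FOREIGN DATA WRAPPER clickhouse_fdw OPTIONS ({opts});"


if __name__ == "__main__":
    print(server_ddl("taxi_srv", "http", "localhost", "taxi"))
```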

Next, map a PostgreSQL user to a ClickHouse user. The simplest way is to map the current PostgreSQL user to a remote user on the foreign server:

CREATE USER MAPPING FOR CURRENT_USER SERVER taxi_srv
       OPTIONS (user 'default');

You can also set a password with the password option.
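When the ClickHouse user has a password, it goes into the same OPTIONS list as a SQL string literal, so any single quote inside it must be doubled (assuming standard_conforming_strings is on, the PostgreSQL default). A minimal Python sketch; user_mapping_ddl is a hypothetical helper:

```python
from typing import Optional


def quote_literal(value: str) -> str:
    """Quote value as a standard SQL string literal, doubling embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def user_mapping_ddl(server: str, remote_user: str, password: Optional[str] = None) -> str:
    """Build CREATE USER MAPPING for the current user, adding a password only if given."""
    options = [f"user {quote_literal(remote_user)}"]
    if password is not None:
        options.append(f"password {quote_literal(password)}")
    return (f"CREATE USER MAPPING FOR CURRENT_USER SERVER {server} "
            f"OPTIONS ({', '.join(options)});")


if __name__ == "__main__":
    print(user_mapping_ddl("taxi_srv", "default", "it's"))
```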

Import the tables in the ClickHouse database into a Postgres schema:

CREATE SCHEMA taxi;
IMPORT FOREIGN SCHEMA taxi FROM SERVER taxi_srv INTO taxi;

Once the tables are imported, query them:

SELECT count(*) FROM taxi.trips;
  count  
---------
 1999657
(1 row)

Note how quickly the query runs: pg_clickhouse pushes the entire query, including the COUNT() aggregate, down to ClickHouse, which returns a single row to Postgres. Use EXPLAIN to see the execution plan:

EXPLAIN select count(*) from taxi.trips;
                   QUERY PLAN                    
-------------------------------------------------
 Foreign Scan  (cost=1.00..-0.90 rows=1 width=8)
   Relations: Aggregate on (trips)
(2 rows)

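The root of the plan is a "Foreign Scan" node, and its "Relations: Aggregate on (trips)" line shows that the aggregate itself ran remotely: the whole query was pushed down to ClickHouse.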
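To check push-down for your own queries, look at the plan's top node: if Postgres still has to aggregate or sort locally, an Aggregate or Sort node appears above the Foreign Scan instead. A small Python sketch that reads plain-text EXPLAIN output, as psql prints it, and reports whether the root node is a Foreign Scan (root_node and fully_pushed_down are hypothetical names):

```python
def root_node(plan: str) -> str:
    """Return the name of the top plan node from psql-formatted EXPLAIN text."""
    for line in plan.splitlines():
        text = line.strip()
        # Skip blank lines, the "QUERY PLAN" header, the dashed rule and the "(N rows)" footer.
        if not text or text == "QUERY PLAN" or set(text) == {"-"} or text.startswith("("):
            continue
        # A node line looks like "Foreign Scan  (cost=...)"; keep the part before the costs.
        return text.split("  (")[0]
    raise ValueError("no plan node found")


def fully_pushed_down(plan: str) -> bool:
    # A plain remote scan may print as "Foreign Scan on trips", so match the prefix.
    return root_node(plan).startswith("Foreign Scan")
```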

Data analysis

Try the following example queries, or write your own SQL:

Average tip amount:

taxi=# \timing
Timing is on.
taxi=# SELECT round(avg(tip_amount), 2) FROM taxi.trips;
 round 
-------
  1.68
(1 row)

Time: 9.438 ms

Average total amount by passenger count:

taxi=# SELECT
        passenger_count,
        avg(total_amount)::NUMERIC(10, 2) AS average_total_amount
    FROM taxi.trips
    GROUP BY passenger_count;
 passenger_count | average_total_amount 
-----------------+----------------------
               0 |                22.68
               1 |                15.96
               2 |                17.14
               3 |                16.75
               4 |                17.32
               5 |                16.34
               6 |                16.03
               7 |                59.79
               8 |                36.40
               9 |                 9.79
(10 rows)

Time: 27.266 ms
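This query has no ORDER BY, so the row order above is not guaranteed; add ORDER BY passenger_count if you need it. The ::NUMERIC(10, 2) cast rounds each average to two decimal places. A Python sketch of the same grouping and rounding over a few hypothetical rows, assuming PostgreSQL's numeric rounding rule (ties away from zero); the function name is also hypothetical:

```python
from collections import defaultdict
from decimal import ROUND_HALF_UP, Decimal


def average_total_by_passengers(rows):
    """rows: iterable of (passenger_count, total_amount) pairs; returns {count: avg}."""
    sums = defaultdict(Decimal)   # running total_amount per passenger_count
    counts = defaultdict(int)     # number of trips per passenger_count
    for passengers, total in rows:
        sums[passengers] += total
        counts[passengers] += 1
    # avg(...)::NUMERIC(10, 2): divide, then round to two places (ties away from zero).
    return {
        p: (sums[p] / counts[p]).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
        for p in sorted(sums)  # sorted explicitly, since the SQL query has no ORDER BY
    }


if __name__ == "__main__":
    sample = [(1, Decimal("10.00")), (1, Decimal("11.25")), (2, Decimal("7.10"))]  # hypothetical rows
    print(average_total_by_passengers(sample))
```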

Daily pickups by neighborhood:

taxi=# SELECT
    pickup_date,
    pickup_ntaname,
    SUM(1) AS number_of_trips
FROM taxi.trips
GROUP BY pickup_date, pickup_ntaname
ORDER BY pickup_date ASC LIMIT 10;
 pickup_date |         pickup_ntaname         | number_of_trips 
-------------+--------------------------------+-----------------
 2015-07-01  | Williamsburg                   |               1
 2015-07-01  | park-cemetery-etc-Queens       |               6
 2015-07-01  | Maspeth                        |               1
 2015-07-01  | Stuyvesant Town-Cooper Village |              44
 2015-07-01  | Rego Park                      |               1
 2015-07-01  | Greenpoint                     |               7
 2015-07-01  | Highbridge                     |               1
 2015-07-01  | Briarwood-Jamaica Hills        |               3
 2015-07-01  | Airport                        |             550
 2015-07-01  | East Harlem North              |              32
(10 rows)

Time: 30.978 ms
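SUM(1) adds 1 for every row in a group, so it gives the same number as count(*). Note also that ORDER BY pickup_date sorts only by date: which ten 2015-07-01 neighborhoods appear, and in what order, is not guaranteed unless pickup_ntaname is added to the ORDER BY. A Python sketch of the grouping with a deterministic ordering, over hypothetical rows (daily_pickups is a hypothetical name):

```python
from collections import Counter
from datetime import date


def daily_pickups(rows, limit=10):
    """rows: iterable of (pickup_date, pickup_ntaname) pairs.

    Mirrors SUM(1) ... GROUP BY pickup_date, pickup_ntaname, then orders by
    date and, to make ties deterministic, by neighborhood name.
    """
    counts = Counter()
    for pickup_date, ntaname in rows:
        counts[(pickup_date, ntaname)] += 1  # +1 per row, exactly what SUM(1) adds
    ordered = sorted(counts.items(), key=lambda item: item[0])  # (date, name) order
    return [(d, name, n) for (d, name), n in ordered[:limit]]


if __name__ == "__main__":
    sample = [(date(2015, 7, 1), "Airport"), (date(2015, 7, 1), "Maspeth"), (date(2015, 7, 1), "Airport")]
    print(daily_pickups(sample))
```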

Compute each trip's duration in minutes and aggregate trips by duration:

taxi=# SELECT
    avg(tip_amount) AS avg_tip,
    avg(fare_amount) AS avg_fare,
    avg(passenger_count) AS avg_passenger,
    count(*) AS count,
    round((date_part('epoch', dropoff_datetime) - date_part('epoch', pickup_datetime)) / 60) as trip_minutes
FROM taxi.trips
WHERE round((date_part('epoch', dropoff_datetime) - date_part('epoch', pickup_datetime)) / 60) > 0
GROUP BY trip_minutes
ORDER BY trip_minutes DESC
LIMIT 5;
      avg_tip      |     avg_fare     |  avg_passenger   | count | trip_minutes 
-------------------+------------------+------------------+-------+--------------
              1.96 |                8 |                1 |     1 |        27512
                 0 |               12 |                2 |     1 |        27500
 0.562727272727273 | 17.4545454545455 | 2.45454545454545 |    11 |         1440
 0.716564885496183 | 14.2786259541985 | 1.94656488549618 |   131 |         1439
  1.00945205479452 | 12.8787671232877 | 1.98630136986301 |   146 |         1438
(5 rows)

Time: 45.477 ms
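date_part('epoch', ts) gives seconds since the Unix epoch, so the expression takes the trip length in seconds, divides by 60 and rounds to whole minutes. It is repeated in WHERE because PostgreSQL does not allow an output-column alias there. A Python sketch of the same arithmetic (trip_minutes is a hypothetical name; it rounds ties away from zero, and the exact tie rule of round() may differ between PostgreSQL and ClickHouse):

```python
from datetime import datetime
from decimal import ROUND_HALF_UP, Decimal


def trip_minutes(pickup: datetime, dropoff: datetime) -> int:
    """Whole minutes between pickup and dropoff, like
    round((date_part('epoch', dropoff) - date_part('epoch', pickup)) / 60)."""
    seconds = Decimal(int((dropoff - pickup).total_seconds()))  # DateTime has 1-second precision
    return int((seconds / 60).quantize(Decimal(1), rounding=ROUND_HALF_UP))


if __name__ == "__main__":
    start = datetime(2015, 7, 1, 8, 0, 0)
    print(trip_minutes(start, datetime(2015, 7, 1, 8, 0, 29)))  # 0, excluded by the WHERE clause
    print(trip_minutes(start, datetime(2015, 7, 1, 8, 1, 30)))  # 2 (1.5 rounds up)
    print(trip_minutes(start, datetime(2015, 7, 2, 8, 0, 0)))   # 1440, exactly 24 hours
```

By the same arithmetic, the two longest trips above (27512 and 27500 minutes) each last about 19 days (27512 / 1440 ≈ 19.1), far longer than a plausible taxi ride, so they are likely data-quality outliers.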

Pickups by neighborhood and hour of day:

taxi=# SELECT
    pickup_ntaname,
    date_part('hour', pickup_datetime) as pickup_hour,
    SUM(1) AS pickups
FROM taxi.trips
WHERE pickup_ntaname != ''
GROUP BY pickup_ntaname, pickup_hour
ORDER BY pickup_ntaname, date_part('hour', pickup_datetime)
LIMIT 5;
 pickup_ntaname | pickup_hour | pickups 
----------------+-------------+---------
 Airport        |           0 |    3509
 Airport        |           1 |    1184
 Airport        |           2 |     401
 Airport        |           3 |     152
 Airport        |           4 |     213
(5 rows)

Time: 36.895 ms

References

pg_clickhouse: https://github.com/ClickHouse/pg_clickhouse