フューチャー技術ブログ

PostgreSQLのPub/Sub機能とJavaのクライアント実装

本記事は「珠玉のアドベントカレンダー記事をリバイバル公開します」企画のために、以前Qiitaに投稿した記事を改訂したものです。

はじめに

Pub/Sub型のメッセージングアーキテクチャを採用するにあたっては、kafkaなどのブローカーミドルウェアや、Amazon SNS、Google Cloud Pub/Subなどのマネージドサービスを利用するケースが多いかと思います。ところでPostgreSQLでも実はPub/Subができます。

すでに業務でPostgreSQLを使っていれば、新たにPub/Subブローカーを構築しなくても、疎結合なシステム間通信を簡易的に実現できます。

本記事ではこの機能の紹介と、Pub/SubクライアントをJavaで実装する場合の選択肢、考慮点を示しています。
※実行環境はPostgreSQL 16.2とJava 21です
※データベースの文字コードはUTF-8としています

NOTIFY/LISTEN

PostgreSQLのPub/Sub機能に関連するクエリは次の3つです。

  • NOTIFY(Publishを実行)
    • 構文:NOTIFY channel [ , payload ]
    • 同じ機能の関数としてpg_notifyも用意されている
  • LISTEN(Subscribeを開始)
    • 構文:LISTEN channel
  • UNLISTEN(Subscribeを終了)
    • 構文:UNLISTEN { channel | * }

基本的な使い方と挙動を見ていきます。

チャネル"foo"のSubscribeを開始
LISTEN foo;
チャネル"foo"に"hello"というデータをPublish
NOTIFY foo, 'hello';
-- または
SELECT pg_notify('foo', 'hello');

-- "foo"をSubscribe済みのセッションには次の通知が届く
-- Asynchronous notification "foo" with payload "hello" received from server process with PID 14728.
payload無しの通知も可能
NOTIFY foo;

-- Asynchronous notification "foo" received from server process with PID 14728.
チャネル"foo"のSuscribeを終了する
UNLISTEN foo;

とてもシンプルですね。
続いて、本機能の主な仕様を挙げつつ利用時の考慮点を示します。
詳細は公式ドキュメントをご覧いただければと思います。

チャネル

  • チャネルはPub/Sub通信する際のキーとなる任意の文字列です。LISTEN対象のチャネルとNOTIFYを実行するチャネルが異なるとデータのやり取りができません。

  • 1つのセッションで複数のチャネルをLISTENできます。

  • チャネルに指定できる文字は、ASCIIの場合英数字とアンダースコア(_)が使用できます。大文字/小文字は区別されません。なお、マルチバイト文字も使用できることを確認しています。

    NOTIFY こんにちは, '世界';

    -- Asynchronous notification "こんにちは" with payload "世界" received from server process with PID 14728.
  • 63バイトを超えるチャネルは登録できません。超えた分は切り捨てて処理されます。この制限はテーブルなど他のデータベースオブジェクトとも共通しています。

スコープ

  • Pub/Subを行うDBセッションは、同一データベースに接続し、かつ同じチャネルを通知対象としなければいけません。
  • データベースが同じであれば、スキーマが異なっていても通知できます。!
68747470733a2f2.png

ペイロードのデータ型・サイズ

  • ペイロードに乗せられるデータはテキストのみで、バイナリは送受信できません。

  • バイナリデータを乗せる場合はencode関数でテキスト形式に変換したり、呼出元アプリでJSON文字列等にシリアライズしてあげる必要があります。

  • ペイロードのサイズ上限は8000バイト未満で、これを超えると次のエラーが返ります。

    ERROR:  payload string too long
    SQL state: 22023

トランザクション

  • トランザクション内でNOTIFYしたデータは、COMMITしたタイミングで、LISTENしたセッションに通知されます。ROLLBACKすると通知されません。
  • トランザクション内でNOTIFYしたデータの中で、ペイロードが同一のものはまとめられます。送信順序は保証されます。
まとめられた通知
BEGIN;
NOTIFY foo, 'a';
NOTIFY foo, 'a';
NOTIFY foo, 'a';
NOTIFY foo, 'b';
NOTIFY foo, 'c';
COMMIT;

-- Asynchronous notification "foo" with payload "a" received from server process with PID 14728.
-- Asynchronous notification "foo" with payload "b" received from server process with PID 14728.
-- Asynchronous notification "foo" with payload "c" received from server process with PID 14728.

未処理メッセージの蓄積サイズ

  • DBインスタンスには、トランザクションが未完了なメッセージをメモリ上に溜めておくことが出来るNotificationキューを持っています。標準インストールの場合サイズは8GBで、使用量が半分になると警告ログが出力されます。
  • トランザクションが終了するとキューデータがクリーンアップされます。ペイロードを目一杯使った場合およそ100万件で上限に掛かるため、適当な件数単位でCOMMITしましょう。
  • Notificationキューの使用率はpg_notification_queue_usage関数で確認できます(0から1までの小数で表現)。

JavaによるPub/Subクライアント実装

これまで記載したPub/Sub通信をJavaで実装するときのパターンを3種類紹介します。

PostgreSQL JDBCドライバ

PostgreSQL本家のJDBCドライバを使った実装例です(本家の実装例はこちら)。
Mavenを使う場合は以下の依存関係を追加します。

pom.xml
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.7.3</version>
</dependency>
// 事前にLISTEN用コネクションを作成しておく
private final org.postgresql.jdbc.PgConnection listenerConn = DriverManager.getConnection(URL, USERNAME, PASSWORD).unwrap(PgConnection.class);

/**
* 通知の受信を開始します。
*
* @param channel チャネル
*/
private void startListen(final String channel) {
try {
try (var stmt = this.listenerConn.createStatement()) {
stmt.execute("LISTEN " + channel);
while (true) {
var notifications = pgconn.getNotifications(10 * 1000);
if (this.terminated) {
return;
}
if (notifications == null) {
continue;
}
for (var n : notifications) {
LOG.info("Received Notification: pid={}, channel={}, payload={}", n.getPID(), n.getName(), n.getParameter());
}
}
}
} catch (SQLException e) {
LOG.error("exception thrown {}", e.getMessage());
}
}

/**
* PostgreSQLサーバに通知を行います。
*
* @param channel チャネル
* @param payload メッセージペイロード
*/
private void notify(final String channel, final String payload) {
try {
var conn = DriverManager.getConnection(URL, USERNAME, PASSWORD).unwrap(PgConnection.class);
var pstmt = conn.prepareStatement("select pg_notify(?, ?)");
try (conn; pstmt) {
pstmt.setString(1, channel);
pstmt.setString(2, payload);
pstmt.execute();
}
LOG.info("Notified: pid={}, channel={}, payload={}", pgconn.getBackendPID(), channel, payload);
} catch (SQLException e) {
LOG.error("exception thrown", e);
}
}
  • PgConnection#getNotifications(int timeoutMillis)を使うと、通知が来るまで指定の時間ここでブロックするため、ループで囲えばロングポーリング的なロジックになります。
  • なおNOTIFYクエリでペイロードのパラメータバインドを試みるとorg.postgresql.util.PSQLExceptionが出てしまうので代わりにpg_notifyを実行しています。1

PGJDBC-NG

  • PGJDBC-NGはJDBC4.2に準拠し、PostgreSQLの機能をより高度に使うことをめざして開発されているOSSです。
pom.xml
<dependency>
<groupId>com.impossibl.pgjdbc-ng</groupId>
<artifactId>pgjdbc-ng</artifactId>
<version>0.8.9</version>
</dependency>
// 事前にLISTEN用コネクションを作成しておく
private final com.impossibl.postgres.api.jdbc.PGConnection listenerConn = DriverManager.getConnection(URL, USERNAME, PASSWORD).unwrap(PGConnection.class);

/**
* 通知の受信を開始します。
*
* @param channel チャネル
*/
private void startListen(final String channel) {
try {
this.listenerConn.addNotificationListener(new PGNotificationListener() {
@Override
public void notification(final int pid, final String channel, final String payload) {
LOG.info("Received Notification: {}, {}, {}", pid, channel, payload);
}
});
try (var stmt = this.listenerConn.createStatement()) {
stmt.execute("LISTEN " + channel);
}
} catch (SQLException e) {
LOG.error("exception thrown {}", e.getMessage());
}
}

// notify()は、PostgreSQL JDBCドライバと同様

ご覧の通り、こちらは通知受信時の動作をイベントリスナーの形で実装できます。
チャネルを指定してリスナーを登録することも可能です。

R2DBC

R2DBCは、リアクティブプログラミングの観点から新たに開発されたJDBCドライバです。
Reactive Streamsに完全準拠し、I/Oは完全にノンブロッキングであると謳っています。

pom.xml
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>r2dbc-postgresql</artifactId>
<version>1.0.5.RELEASE</version>
</dependency>
// 事前に送受信用のコネクションを設定しておく
private Mono<PostgresqlConnection> receiver;
private Mono<PostgresqlConnection> sender;

var connFactory = new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
.host("...")
.port(5432)
.username("...")
.password("...")
.database("...")
.build());
this.receiver = connFactory.create();
this.sender = connFactory.create();

/**
* 通知の受信を開始します。
*
* @param channel チャネル
*/
private void startListen(final String channel) {
this.receiver.map(pgconn -> {
return pgconn.createStatement("LISTEN " + channel)
.execute()
.flatMap(PostgresqlResult::getRowsUpdated)
.thenMany(pgconn.getNotifications())
.doOnNext(notification -> LOG.info("Received Notification: {}, {}, {}", notification.getProcessId(), notification.getName(), notification.getParameter()))
.doOnSubscribe(s -> LOG.info("listen start"))
.subscribe();
}).subscribe();
}

/**
* PostgreSQLサーバに通知を行います。
*
* @param channel チャネル
* @param payload メッセージペイロード
*/
private void notify(final String channel, final String payload) {
this.sender.map(pgconn -> {
return pgconn.createStatement("NOTIFY " + channel + ", '" + payload + "'")
.execute()
.flatMap(PostgresqlResult::getRowsUpdated)
.then()
.doOnSubscribe(s -> LOG.info("Notified: channel={}, payload={}", channel, payload))
.subscribe();
}).subscribe();
}

R2DBCを使う際は、依存するProject ReactorのAPIを全面的に使うことになります。

今回は簡単な説明にとどめますが、クエリを実行、結果をハンドリングし、指定のタイミングで動く付帯的なアクションを設定する、という一連のフローを構築して、最後にこのフローが動き出すようにsubscribe()を呼び出しています。doOnNext()で通知が届いたときのアクションを、doOnSubscribe()でsubscribeしたとき(クエリを実行するタイミング)のアクションを設定しており、ここでは単純にログ出力しています。
JavaのStream APIと似たスタイルで非同期・ストリーム処理を作る感じで、私も初見は面食らったのですが、こちらのページがとても勉強になりました。

おわりに

PostgreSQLのNOTIFY/LISTENはリリース9.0で、待ち状態イベントの格納先が従来のシステムテーブルからメモリキューに代わり、通知と一緒にペイロードを送信できるようになったことで、おおよそ現在の形になりました。近年もリリース13.0で性能向上を遂げており、地味な機能ながら進化を続けているようです。

機能自体は古くから搭載されています2がQiitaではこれまで記事化されていないため、社内の技術検証で得た情報整理も兼ねて記事化してみました。


  1. 1.公式ドキュメントにはNOTIFYのペイロードにはリテラル文字列を設定しなければならず、一方でpg_notifyには不定のチャネル、ペイロードに対応すると謳われているので、NOTIFYはパラメータバインドには対応していない模様です。
  2. 2.NOTIFY/LISTENのリリースノートの初出は1995年7月のリリース0.03のバグフィックスなので、本当に最初期から搭載された機能だとわかります。