本記事は「珠玉のアドベントカレンダー記事をリバイバル公開します」企画のために、以前Qiitaに投稿した記事を改訂したものです。
はじめに
Pub/Sub型のメッセージングアーキテクチャを採用するにあたっては、kafkaなどのブローカーミドルウェアや、Amazon SNS、Google Cloud Pub/Subなどのマネージドサービスを利用するケースが多いかと思います。ところでPostgreSQLでも実はPub/Subができます。
すでに業務でPostgreSQLを使っていれば、新たにPub/Subブローカーを構築しなくても、疎結合なシステム間通信を簡易的に実現できます。
本記事ではこの機能の紹介と、Pub/SubクライアントをJavaで実装する場合の選択肢、考慮点を示しています。
※実行環境はPostgreSQL 16.2とJava 21です
※データベースの文字コードはUTF-8としています
NOTIFY/LISTEN
PostgreSQLのPub/Sub機能に関連するクエリは次の3つです。
- NOTIFY(Publishを実行)
- 構文:
NOTIFY channel [ , payload ]
- 同じ機能の関数として
pg_notify
も用意されている
- 構文:
- LISTEN(Subscribeを開始)
- 構文:
LISTEN channel
- 構文:
- UNLISTEN(Subscribeを終了)
- 構文:
UNLISTEN { channel | * }
- 構文:
基本的な使い方と挙動を見ていきます。
LISTEN foo; |
NOTIFY foo, 'hello'; |
NOTIFY foo; |
UNLISTEN foo; |
とてもシンプルですね。
続いて、本機能の主な仕様を挙げつつ利用時の考慮点を示します。
詳細は公式ドキュメントをご覧いただければと思います。
チャネル
チャネルはPub/Sub通信する際のキーとなる任意の文字列です。LISTEN対象のチャネルとNOTIFYを実行するチャネルが異なるとデータのやり取りができません。
1つのセッションで複数のチャネルをLISTENできます。
チャネルに指定できる文字は、ASCIIの場合英数字とアンダースコア(_)が使用できます。大文字/小文字は区別されません。なお、マルチバイト文字も使用できることを確認しています。
NOTIFY こんにちは, '世界';
-- Asynchronous notification "こんにちは" with payload "世界" received from server process with PID 14728.63バイトを超えるチャネルは登録できません。超えた分は切り捨てて処理されます。この制限はテーブルなど他のデータベースオブジェクトとも共通しています。
スコープ
- Pub/Subを行うDBセッションは、同一データベースに接続し、かつ同じチャネルを通知対象としなければいけません。
- データベースが同じであれば、スキーマが異なっていても通知できます。!
ペイロードのデータ型・サイズ
ペイロードに乗せられるデータはテキストのみで、バイナリは送受信できません。
バイナリデータを乗せる場合は
encode
関数でテキスト形式に変換したり、呼出元アプリでJSON文字列等にシリアライズしてあげる必要があります。ペイロードのサイズ上限は8000バイト未満で、これを超えると次のエラーが返ります。
ERROR: payload string too long
SQL state: 22023
トランザクション
- トランザクション内でNOTIFYしたデータは、COMMITしたタイミングで、LISTENしたセッションに通知されます。ROLLBACKすると通知されません。
- トランザクション内でNOTIFYしたデータの中で、ペイロードが同一のものはまとめられます。送信順序は保証されます。
BEGIN; |
未処理メッセージの蓄積サイズ
- DBインスタンスには、トランザクションが未完了なメッセージをメモリ上に溜めておくことが出来るNotificationキューを持っています。標準インストールの場合サイズは8GBで、使用量が半分になると警告ログが出力されます。
- トランザクションが終了するとキューデータがクリーンアップされます。ペイロードを目一杯使った場合およそ100万件で上限に掛かるため、適当な件数単位でCOMMITしましょう。
- Notificationキューの使用率は
pg_notification_queue_usage
関数で確認できます(0から1までの小数で表現)。
JavaによるPub/Subクライアント実装
これまで記載したPub/Sub通信をJavaで実装するときのパターンを3種類紹介します。
PostgreSQL JDBCドライバ
PostgreSQL本家のJDBCドライバを使った実装例です(本家の実装例はこちら)。
Mavenを使う場合は以下の依存関係を追加します。
<dependency> |
// 事前にLISTEN用コネクションを作成しておく |
PgConnection#getNotifications(int timeoutMillis)
を使うと、通知が来るまで指定の時間ここでブロックするため、ループで囲えばロングポーリング的なロジックになります。- なお
NOTIFY
クエリでペイロードのパラメータバインドを試みるとorg.postgresql.util.PSQLException
が出てしまうので代わりにpg_notify
を実行しています。1
PGJDBC-NG
- PGJDBC-NGはJDBC4.2に準拠し、PostgreSQLの機能をより高度に使うことをめざして開発されているOSSです。
<dependency> |
// 事前にLISTEN用コネクションを作成しておく |
ご覧の通り、こちらは通知受信時の動作をイベントリスナーの形で実装できます。
チャネルを指定してリスナーを登録することも可能です。
R2DBC
R2DBCは、リアクティブプログラミングの観点から新たに開発されたJDBCドライバです。
Reactive Streamsに完全準拠し、I/Oは完全にノンブロッキングであると謳っています。
<dependency> |
// 事前に送受信用のコネクションを設定しておく |
R2DBCを使う際は、依存するProject ReactorのAPIを全面的に使うことになります。
今回は簡単な説明にとどめますが、クエリを実行、結果をハンドリングし、指定のタイミングで動く付帯的なアクションを設定する、という一連のフローを構築して、最後にこのフローが動き出すようにsubscribe()
を呼び出しています。doOnNext()
で通知が届いたときのアクションを、doOnSubscribe()
でsubscribeしたとき(クエリを実行するタイミング)のアクションを設定しており、ここでは単純にログ出力しています。
JavaのStream APIと似たスタイルで非同期・ストリーム処理を作る感じで、私も初見は面食らったのですが、こちらのページがとても勉強になりました。
おわりに
PostgreSQLのNOTIFY/LISTENはリリース9.0で、待ち状態イベントの格納先が従来のシステムテーブルからメモリキューに代わり、通知と一緒にペイロードを送信できるようになったことで、おおよそ現在の形になりました。近年もリリース13.0で性能向上を遂げており、地味な機能ながら進化を続けているようです。
機能自体は古くから搭載されています2がQiitaではこれまで記事化されていないため、社内の技術検証で得た情報整理も兼ねて記事化してみました。
- 1.公式ドキュメントにはNOTIFYのペイロードにはリテラル文字列を設定しなければならず、一方でpg_notifyには不定のチャネル、ペイロードに対応すると謳われているので、NOTIFYはパラメータバインドには対応していない模様です。 ↩
- 2.NOTIFY/LISTENのリリースノートの初出は1995年7月のリリース0.03のバグフィックスなので、本当に最初期から搭載された機能だとわかります。 ↩