はじめに
TIG DXユニット真野です。
GCP連載2021の1日目です。2020年のGCP連載でもCloud Loggingネタでトップバッター、2021年も先陣を切ることができて光栄です。
本記事では2021年3月に発表されたCloud PubSubのメッセージスキーマを試します。
なお、2021/03/07時点ではプレビューであるのでご注意ください。今後フィードバックにより挙動なり設定方法が変更される可能性があります。
Cloud PubSubとは
公式ドキュメントから引用します。
Pub/Sub は、イベントを処理するサービスとイベントを生成するサービスを切り離す非同期メッセージング サービスです。メッセージング指向のミドルウェア(ストリーミング分析パイプラインにイベントの取り込みと配信を行う)として、Pub/Sub を使用できます。
https://cloud.google.com/pubsub/docs/overview
メッセージングということで、当然1:N(ファンアウト)、N:1(ファンイン)、N:Nなどの通信が可能です。書き込み側をPublisher、読み込み側をSubscriberと呼びます。あとはAt-Least-Onceな配信(つまりSubscriber側が重複したメッセージを受け取ってしまうことがある)であるといったことが重要事項でしょうか。
この技術ブログでは、Go Cloud#2 Pub/Subの概要紹介や、Go Cloud#7 PubSubドライバー(pubだけ)を実装してみる といった記事があります。
PubSubのメッセージスキーマ機能とは
PubSubのあるトピックに対してスキーマを紐付け、Publishする時に検証を行うことができるようになりました(待望の機能ですね!)
今のところは、以下のスキーマが利用できるとのこと。
個人的には、JSON Schemaが欲しかったところですが、エンコーディングをJSONにすれば、JSONの検証も可能です。今後の拡張に期待しましょう。
Apache AvroはConfluent Schema Registryを意識しているからでしょうか。Kakfaユーザも乗り換えがしやすいかもしれませんね。Protocol Bufferでの検証が可能ということですが、PubSubはgRPCインタフェースでのPublishが可能です。つまり指定されたProtocol Bufferのスキーマで検証しつつ、gRPCでのやり取りもできそうです。凄い。
スキーマで嬉しいこと
従来のPubSubは(AWS Kinesis DataStreamsなどでも)以下のようにどんなメッセージでもバイナリ形式で登録可能でした。
topic, err := pubsubClient.CreateTopic(context.Background(), "<トピック名>") |
なんでもとりあえずPublishできるので楽なのですが、Subscribe側では困ったことになりがちです。
例えば、Subscribe側でスキーマ付きのデータストア(BigQueryやCloudSQLなど)に永続化する場合に、登録が失敗するようなレイアウト違反のデータを扱う必要があります。具体的には、Dead Letter Queue(DLQ)に登録したり、エラーログを出してシステムリカバリ運用に乗せたりを検討する必要がありました。
これがHTTPのAPIであれば、HTTPレスポンスでエラーメッセージを応答することで、クライアント(今回だとPublisher)側に通知できましたが、PubSubを経由すると、Enterprise Integration PatternでいうRequest-Replyを実装するのであれば別ですが、これはこれで両者がアプリケーションレベルで密結合になりますし、実装もそう簡単ではありません。
今回のメッセージスキーマ機能を用いると、Publisher側が明らかに契約違反のメッセージを連携した場合に早期にNGを返すことができるので、今後はこの手の考慮事項を減らすことできます。嬉しいですね。
試してみる
GCPプロジェクトはこちらの手順に従い作成します。また、gcloudコマンドもこちらに従いインストールしておきます。
今回はAvroスキーマを利用します。コードはこちらに上げています。
(1)スキーマ付きトピックの作成
- IAMと管理からPubSubの管理者権限を付与したサービスアカウントを作成し、鍵(JSONファイル)をダウロードします
- ここの手順に従い、取得した鍵のパスに環境変数を通します
export GOOGLE_APPLICATION_CREDENTIALS=<ダウンロードパス>.json
- gcloudコマンドからトピックと、スキーマを作成します
gcloud beta pubsub schemas create avroschema1 \ |
続いてトピックを作成します。管理コンソールからだと以下のように、スキーマを使用する のチェックボックスが増えていました。
今回はgcloud経由で作成します。こちらの説明に従いオプションを追加します。
>gcloud beta pubsub topics create avrotopic \ |
登録内容をgcloud経由で確認すると、以下の用に設定されています。
# トピックの情報 |
(2)Publishしてみる
Go SDK経由でPublishしてみます。
package main |
動かしてみると、登録したAvroスキーマ違反な3番目の入力をPublishする時にエラーになります。まだ、go-sdk側にメッセージスキーマのエラーハンドリンクが組み込まれていないのか、色気がない内容ですね。
>go run main.go |
エラーメッセージの表示内容はさておき、メッセージスキーマによってPublisher側が良からぬメッセージを連携してしまうことは防ぐことができました。
Avroスキーマの検証
Avroスキーマの動作検証ですが、gcloudコマンドのpubsub schemas validate-message
で検証できます。
# 正常なケース |
こちらもエラーメッセージに優しさは無いので、今後の拡張に期待です。
どの程度ちゃんとスキーマを検証してくれるかもう少し細かく突っ込んでみます。BooleanField
を文字列xxx
を入れてみます。
>gcloud beta pubsub schemas validate-message \ |
無事エラーになりました。同様にFloatField
に数値以外の文字列を入れてもエラー担ってくれました。
スキーママイグレーションの前方互換性で重要そうな、スキーマに存在しないフィールドextra
を追加してみます。
>gcloud beta pubsub schemas validate-message \ |
これは実行結果が{}
なので問題ないようです。
エンコードにバイナリを指定する
続いて、スキーマのエンコードにバイナリ
を指定します。
>gcloud beta pubsub topics create avrotopic2 \ |
GoでAvroを利用するために、github.com/linkedin/goavro/v2
を利用します
import ( |
これを用いて実行します。登録は map[string]interface{}
を先程作成したcodec
で変換して登録します。
func main() { |
事項すると、1つ目の入力はOK、2つ目はクライアント側のAvro形式への変換でエラーで落ちました。クライアントサイドで落ちた方がエラーメッセージは切り分けしやすいですね。
>go run main.go |
念の為、不正なバイナリ値を送信してみます。
func main() { |
>go run main.go |
こちらもシンプルな例外メッセージですが、メッセージスキーマで不正なPublishを防ぐことに成功しています。
スキーママイグレーションについて
メッセージスキーマを付与した後に、アプリケーション要件の変化に併せてスキーマもアップデートしたいものです。2021/03/07時点では、一度作成したスキーマを変更する機能も、トピックに紐づくスキーマを変更する機能も存在しないようです。このあたりの機能追加や利用者側のナレッジも蓄積できると良いですね。
また、現状はスキーマをPub/Subの設定に紐付けるような形式なようです。スキーマはPublish/Subscribeのどちらにも公開されるべきものなので、Schema Registryのマネージドサービスが待ち遠しいですね。現状はGCSにもスキーマファイルを配備するなど、一工夫の設計余地がありそうだなと思いました。
まとめ
Cloud Pub/SubでメッセージスキーマでPublish時のペイロードの検証ができるようになりました。今の所、AvroとProtocol Buffersで定義可能で、エンコードJSONにすれば、テキストJSONも検証できます。
スキーマファイルは今の所GCSなどの連携ができず、Pub/Sub側に直接定義するようです。
まだまだ発展途上な機能だと思いますが、不正なデータがPub/Subトピックに混入されることを防ぐことができるのは画期的だと思いますので、ぜひ利用を検討したいと思います。
明日は市川さんの20210310_本番データが急に欲しいアナタに贈る、CloudSQLのサーバレスエクスポートを試してみたです。