フューチャー技術ブログ

PubSubのメッセージスキーマを試してみる

はじめに

TIG DXユニット真野です。

GCP連載2021の1日目です。2020年のGCP連載でもCloud Loggingネタでトップバッター、2021年も先陣を切ることができて光栄です。

本記事では2021年3月に発表されたCloud PubSubのメッセージスキーマを試します。

なお、2021/03/07時点ではプレビューであるのでご注意ください。今後フィードバックにより挙動なり設定方法が変更される可能性があります。

Cloud PubSubとは

公式ドキュメントから引用します。

Pub/Sub は、イベントを処理するサービスとイベントを生成するサービスを切り離す非同期メッセージング サービスです。メッセージング指向のミドルウェア(ストリーミング分析パイプラインにイベントの取り込みと配信を行う)として、Pub/Sub を使用できます。
https://cloud.google.com/pubsub/docs/overview

メッセージングということで、当然1:N(ファンアウト)、N:1(ファンイン)、N:Nなどの通信が可能です。書き込み側をPublisher、読み込み側をSubscriberと呼びます。あとはAt-Least-Onceな配信(つまりSubscriber側が重複したメッセージを受け取ってしまうことがある)であるといったことが重要事項でしょうか。

この技術ブログでは、Go Cloud#2 Pub/Subの概要紹介や、Go Cloud#7 PubSubドライバー(pubだけ)を実装してみる といった記事があります。

PubSubのメッセージスキーマ機能とは

PubSubのあるトピックに対してスキーマを紐付け、Publishする時に検証を行うことができるようになりました。(待望の機能ですね!)

今のところは、以下のスキーマが利用できるとのこと。

  1. Apache Avro
  2. Protocol Buffer

個人的には、JSON Schemaが欲しかったところですが、エンコーディングをJSONにすれば、JSONの検証も可能です。今後の拡張に期待しましょう。

Apache AvroはConfluent Schema Registryを意識しているからでしょうか。Kakfaユーザも乗り換えがしやすいかもしれませんね。Protocol Bufferでの検証が可能ということですが、PubSubはgRPCインターフェースでのPublishが可能です。つまり指定されたProtocol Bufferのスキーマで検証しつつ、gRPCでのやり取りもできそうです。凄い。

スキーマで嬉しいこと

従来のPubSubは(AWS Kinesis DataStreamsなどでも)以下のようにどんなメッセージでもバイナリ形式で登録可能でした。

pubsubのサンプル.go
topic, err := pubsubClient.CreateTopic(context.Background(), "<トピック名>")

res := topic.Publish(ctx, &pubsub.Message{Data: []byte("<任意のバイナリ>")})

なんでもとりあえずPublishできるので楽なのですが、Subscribe側では困ったことになりがちです。
例えば、Subscribe側でスキーマ付きのデータストア(BigQueryやCloudSQLなど)に永続化する場合に、登録が失敗するようなレイアウト違反のデータを扱う必要があります。具体的には、Dead Letter Queue(DLQ)に登録したり、エラーログを出してシステムリカバリ運用に乗せたりを検討する必要がありました。

これがHTTPのAPIであれば、HTTPレスポンスでエラーメッセージを応答することで、クライアント(今回だとPublisher)側に通知することができましたが、PubSubを経由すると、Enterprise Integration PatternでいうRequest-Replyを実装するのであれば別ですが、これはこれで両者がアプリケーションレベルで密結合になりますし、実装もそう簡単ではありません。

今回のメッセージスキーマ機能を用いると、Publisher側が明らかに契約違反のメッセージを連携した場合に早期にNGを返すことができるので、今後はこの手の考慮事項を減らすことできます。嬉しいですね。

試してみる

GCPプロジェクトはこちらの手順に従い作成します。また、gcloudコマンドもこちらに従いインストールしておきます。

今回はAvroスキーマを利用します。コードはこちらに上げています。

①スキーマ付きトピックの作成

  1. IAMと管理からPubSubの管理者権限を付与したサービスアカウントを作成し、鍵(JSONファイル)をダウロードします
  2. ここの手順に従い、取得した鍵のパスに環境変数を通します
    • export GOOGLE_APPLICATION_CREDENTIALS=<ダウンロードパス>.json
  3. gcloudコマンドからトピックと、スキーマを作成します
avroスキーマ作成
gcloud beta pubsub schemas create avroschema1 \
--type=AVRO \
--definition={"type":"record","name":"Avro","fields":[{"name":"StringField","type":"string"},{"name":"FloatField","type":"float"},{"name":"BooleanField","type":"boolean"}]}

続いてトピックを作成します。管理コンソールからだと以下のように、スキーマを使用する のチェックボックスが増えていました。

今回はgcloud経由で作成します。こちらの説明に従いオプションを追加します。

>gcloud beta pubsub topics create avrotopic \
--message-encoding=JSON \
--schema=avroschema1
Created topic [projects/<YOUR PROJECT ID>/topics/avrotopic].

登録内容をgcloud経由で確認すると、以下の用に設定されています。

# トピックの情報
$ gcloud pubsub topics describe avrotopic
name: projects/<YOUR PROJECT ID>/topics/avrotopic
schemaSettings:
encoding: JSON
schema: projects/<YOUR PROJECT ID>/schemas/avroschema1

# スキーマの状態
>gcloud beta pubsub schemas describe avroschema1
definition: '{"type":"record","name":"Avro","fields":[{"name":"StringField","type":"string"},{"name":"FloatField","type":"float"},{"name":"BooleanField","type":"boolean"}]}'
name: projects/<YOUR PROJECT ID>/schemas/avroschema1
type: AVRO

②Publishしてみる

Go SDK経由でPublishしてみます。

package main

import (
"context"
"fmt"
"log"

"cloud.google.com/go/pubsub"
)

func main() {
ctx := context.Background()

c, err := pubsub.NewClient(ctx, "<YOUR PROJECT ID>")
if err != nil {
log.Fatal(err)
}
topic := c.Topic("avrotopic") // 今回作成したトピック名

data := []string{
`{"StringField":"hello", "FloatField":123.45, "BooleanField":true}`,
`{"StringField":"world", "FloatField":0, "BooleanField":false}`,
`{"NGField":"dummy"}`,
}

for _, v := range data {
res := topic.Publish(ctx, &pubsub.Message{
Data: []byte(v),
})
if _, err := res.Get(ctx); err != nil {
log.Fatal(err)
}
fmt.Println("success publish", v)
}

}

動かしてみると、登録したAvroスキーマ違反な3番目の入力をPublishする時にエラーになります。まだ、go-sdk側にメッセージスキーマのエラーハンドリンクが組み込まれていないのか、色気がない内容ですね。

実行結果
>go run main.go
success publish {"StringField":"hello", "FloatField":123.45, "BooleanField":true}
success publish {"StringField":"world", "FloatField":0, "BooleanField":false}
2021/03/07 16:20:20 rpc error: code = InvalidArgument desc = Request contains an invalid argument.

エラーメッセージの表示内容はさておき、メッセージスキーマによってPublisher側が良からぬメッセージを連携してしまうことは防ぐことができました。

Avroスキーマの検証

Avroスキーマの動作検証ですが、gcloudコマンドのpubsub schemas validate-messageで検証することができます。

# 正常なケース
> gcloud beta pubsub schemas validate-message \
--message-encoding=json \
--message={"StringField":"hello","FloatField":123.45,"BooleanField":true} ^
--schema-name=avroschema1
{}

# 明らかにNGなケース
> gcloud beta pubsub schemas validate-message \
--message-encoding=json \
--message={"NGField":"dummy"} \
--schema-name=avroschema1
ERROR: (gcloud.beta.pubsub.schemas.validate-message) INVALID_ARGUMENT: Request contains an invalid argument.

こちらもエラーメッセージに優しさは無いので、今後の拡張に期待です。

どの程度ちゃんとスキーマを検証してくれるかもう少し細かく突っ込んでみます。BooleanFieldを文字列xxxを入れてみます。

>gcloud beta pubsub schemas validate-message \
--message-encoding=json \
--message={"StringField":"hello","FloatField":123.45,"BooleanField":"xxx"} \
--schema-name=avroschema1
ERROR: (gcloud.beta.pubsub.schemas.validate-message) INVALID_ARGUMENT: Request contains an invalid argument.

無事エラーになりました。同様にFloatFieldに数値以外の文字列を入れてもエラー担ってくれました。

スキーママイグレーションの前方互換性で重要そうな、スキーマに存在しないフィールドextraを追加してみます。

>gcloud beta pubsub schemas validate-message \
--message-encoding=json \
--message={"StringField":"hello","FloatField":123.45,"BooleanField":true,"extra":"aaa"} \
--schema-name=avroschema1
{}

これは実行結果が{}なので問題ないようです。

エンコードにバイナリを指定する

続いて、スキーマのエンコードにバイナリを指定します。

>gcloud beta pubsub topics create avrotopic2 \
--message-encoding=BINARY \
--schema=avroschema1
Created topic [projects/<YOUR PROJECT ID>/topics/avrotopic2].

GoでAvroを利用するために、github.com/linkedin/goavro/v2 を利用します

import (
// 略
"github.com/linkedin/goavro/v2"
)

// Goのデータ構造をAvro形式に変換するコーデック
var codec, _ = goavro.NewCodec(`
{
"type" : "record",
"name" : "Avro",
"fields" : [
{"name" : "StringField", "type" : "string"},
{"name" : "FloatField", "type" : "float"},
{"name" : "BooleanField", "type" : "boolean"}
]
}
`)

これを用いて実行します。登録は map[string]interface{} を先程作成したcodecで変換して登録します。

func main() {
ctx := context.Background()
c, err := pubsub.NewClient(ctx, "<YOUR PROJECT ID>")
if err != nil {
log.Fatal(err)
}
topic := c.Topic("avrotopic2") // 先程作成したトピック

data := []map[string]interface{}{
{"StringField": "hello", "FloatField": 123.45, "BooleanField": true},
{"NGField": "dummy"},
}

for _, v := range data {
binary, err := codec.BinaryFromNative(nil, v)
if err != nil {
log.Fatal("codec.BinaryFromNative", err)
}

res := topic.Publish(ctx, &pubsub.Message{
Data: binary,
})
if _, err := res.Get(ctx); err != nil {
log.Fatal(err)
}
fmt.Println("success publish", v)
}

}

事項すると、1つ目の入力はOK、2つ目はクライアント側のAvro形式への変換でエラーで落ちました。クライアントサイドで落ちた方がエラーメッセージは切り分けしやすいですね。

実行結果
>go run main.go
success publish map[BooleanField:true FloatField:123.45 StringField:hello]
2021/03/07 16:48:41 codec.BinaryFromNativecannot encode binary record "Avro" field "StringField": schema does not specify default value and no value provided

念の為、不正なバイナリ値を送信してみます。

func main() {
ctx := context.Background()
c, err := pubsub.NewClient(ctx, "<YOUR PROJECT ID>")
if err != nil {
log.Fatal(err)
}

res := c.Topic("avrotopic2").Publish(ctx, &pubsub.Message{
Data: []byte("dummy payload"),
})
if _, err := res.Get(ctx); err != nil {
log.Fatal(err)
}
fmt.Println("success publish")

}
実行結果
>go run main.go
2021/03/07 16:55:04 rpc error: code = InvalidArgument desc = Request contains an invalid argument.

こちらもシンプルな例外メッセージですが、メッセージスキーマで不正なPublishを防ぐことに成功しています。

スキーママイグレーションについて

メッセージスキーマを付与した後に、アプリケーション要件の変化に併せてスキーマもアップデートしたいものです。2021/03/07時点では、一度作成したスキーマを変更する機能も、トピックに紐づくスキーマを変更する機能も存在しないようです。このあたりの機能追加や利用者側のナレッジも蓄積できると良いですね。

また、現状はスキーマをPub/Subの設定に紐付けるような形式なようです。スキーマはPublish/Subscribeのどちらにも公開されるべきものなので、Schema Registryのマネージドサービスが待ち遠しいですね。現状はGCSにもスキーマファイルを配備するなど、一工夫の設計余地がありそうだなと思いました。

まとめ

Cloud Pub/SubでメッセージスキーマでPublish時のペイロードの検証ができるようになりました。今の所、AvroとProtocol Buffersで定義可能で、エンコードJSONにすれば、テキストJSONも検証できます。

スキーマファイルは今の所GCSなどの連携ができず、Pub/Sub側に直接定義するようです。

まだまだ発展途上な機能だと思いますが、不正なデータがPub/Subトピックに混入されることを防ぐことができるのは画期的だと思いますので、ぜひ利用を検討したいと思います。

明日は市川さんの20210310_本番データが急に欲しいアナタに贈る、CloudSQLのサーバレスエクスポートを試してみたです。