はじめに TIG DXユニット真野です。
GCP連載2021 の1日目です。2020年のGCP連載でもCloud Loggingネタでトップバッター 、2021年も先陣を切ることができて光栄です。
本記事では2021年3月に発表されたCloud PubSubのメッセージスキーマを試します。
なお、2021/03/07時点ではプレビュー であるのでご注意ください。今後フィードバックにより挙動なり設定方法が変更される可能性があります。
Cloud PubSubとは
公式ドキュメントから引用します。
Pub/Sub は、イベントを処理するサービスとイベントを生成するサービスを切り離す非同期メッセージング サービスです。メッセージング指向のミドルウェア(ストリーミング分析パイプラインにイベントの取り込みと配信を行う)として、Pub/Sub を使用できます。https://cloud.google.com/pubsub/docs/overview
メッセージングということで、当然1:N(ファンアウト)、N:1(ファンイン)、N:Nなどの通信が可能です。書き込み側をPublisher、読み込み側をSubscriberと呼びます。あとはAt-Least-Onceな配信(つまりSubscriber側が重複したメッセージを受け取ってしまうことがある)であるといったことが重要事項でしょうか。
この技術ブログでは、Go Cloud#2 Pub/Subの概要紹介 や、Go Cloud#7 PubSubドライバー(pubだけ)を実装してみる といった記事があります。
PubSubのメッセージスキーマ機能とは PubSubのあるトピックに対してスキーマを紐付け、Publishする時に検証を行うことができるようになりました。(待望の機能ですね!)
今のところは、以下のスキーマが利用できるとのこと。
Apache Avro
Protocol Buffer
個人的には、JSON Schemaが欲しかったところですが、エンコーディングをJSONにすれば、JSONの検証も可能です。今後の拡張に期待しましょう。
Apache AvroはConfluent Schema Registryを意識しているからでしょうか。Kakfaユーザも乗り換えがしやすいかもしれませんね。Protocol Bufferでの検証が可能ということですが、PubSubはgRPCインターフェースでのPublishが可能です。つまり指定されたProtocol Bufferのスキーマで検証しつつ、gRPCでのやり取りもできそうです。凄い。
スキーマで嬉しいこと 従来のPubSubは(AWS Kinesis DataStreamsなどでも)以下のようにどんなメッセージでもバイナリ形式で登録可能でした。
pubsubのサンプル.go 1 2 3 topic, err := pubsubClient.CreateTopic(context.Background(), "<トピック名>" ) res := topic.Publish(ctx, &pubsub.Message{Data: []byte ("<任意のバイナリ>" )})
なんでもとりあえずPublishできるので楽なのですが、Subscribe側では困ったことになりがちです。 例えば、Subscribe側でスキーマ付きのデータストア(BigQueryやCloudSQLなど)に永続化する場合に、登録が失敗するようなレイアウト違反のデータを扱う必要があります。具体的には、Dead Letter Queue(DLQ)に登録したり、エラーログを出してシステムリカバリ運用に乗せたりを検討する必要がありました。
これがHTTPのAPIであれば、HTTPレスポンスでエラーメッセージを応答することで、クライアント(今回だとPublisher)側に通知することができましたが、PubSubを経由すると、Enterprise Integration PatternでいうRequest-Reply を実装するのであれば別ですが、これはこれで両者がアプリケーションレベルで密結合になりますし、実装もそう簡単ではありません。
今回のメッセージスキーマ機能を用いると、Publisher側が明らかに契約違反のメッセージを連携した場合に早期にNGを返すことができるので、今後はこの手の考慮事項を減らすことできます。嬉しいですね。
試してみる GCPプロジェクトはこちら の手順に従い作成します。また、gcloudコマンドもこちら に従いインストールしておきます。
今回はAvroスキーマを利用します。コードはこちら に上げています。
①スキーマ付きトピックの作成
IAMと管理 からPubSubの管理者権限を付与したサービスアカウントを作成し、鍵(JSONファイル)をダウロードします
ここ の手順に従い、取得した鍵のパスに環境変数を通します
export GOOGLE_APPLICATION_CREDENTIALS=<ダウンロードパス>.json
gcloudコマンドからトピックと、スキーマを作成します
avroスキーマ作成 1 2 3 gcloud beta pubsub schemas create avroschema1 \ --type =AVRO \ --definition={"type" :"record" ,"name" :"Avro" ,"fields" :[{"name" :"StringField" ,"type" :"string" },{"name" :"FloatField" ,"type" :"float" },{"name" :"BooleanField" ,"type" :"boolean" }]}
続いてトピックを作成します。管理コンソールからだと以下のように、スキーマを使用する のチェックボックスが増えていました。
今回はgcloud経由で作成します。こちら の説明に従いオプションを追加します。
1 2 3 4 >gcloud beta pubsub topics create avrotopic \ --message-encoding=JSON \ --schema=avroschema1 Created topic [projects/<YOUR PROJECT ID>/topics/avrotopic].
登録内容をgcloud経由で確認すると、以下の用に設定されています。
1 2 3 4 5 6 7 8 9 10 11 12 $ gcloud pubsub topics describe avrotopic name: projects/<YOUR PROJECT ID>/topics/avrotopic schemaSettings: encoding: JSON schema: projects/<YOUR PROJECT ID>/schemas/avroschema1 >gcloud beta pubsub schemas describe avroschema1 definition: '{"type":"record","name":"Avro","fields":[{"name":"StringField","type":"string"},{"name":"FloatField","type":"float"},{"name":"BooleanField","type":"boolean"}]}' name: projects/<YOUR PROJECT ID>/schemas/avroschema1 type : AVRO
②Publishしてみる Go SDK経由でPublishしてみます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 package mainimport ( "context" "fmt" "log" "cloud.google.com/go/pubsub" ) func main () { ctx := context.Background() c, err := pubsub.NewClient(ctx, "<YOUR PROJECT ID>" ) if err != nil { log.Fatal(err) } topic := c.Topic("avrotopic" ) data := []string { `{"StringField":"hello", "FloatField":123.45, "BooleanField":true}` , `{"StringField":"world", "FloatField":0, "BooleanField":false}` , `{"NGField":"dummy"}` , } for _, v := range data { res := topic.Publish(ctx, &pubsub.Message{ Data: []byte (v), }) if _, err := res.Get(ctx); err != nil { log.Fatal(err) } fmt.Println("success publish" , v) } }
動かしてみると、登録したAvroスキーマ違反な3番目の入力をPublishする時にエラーになります。まだ、go-sdk側にメッセージスキーマのエラーハンドリンクが組み込まれていないのか、色気がない内容ですね。
実行結果 1 2 3 4 >go run main.go success publish {"StringField" :"hello" , "FloatField" :123.45, "BooleanField" :true } success publish {"StringField" :"world" , "FloatField" :0, "BooleanField" :false } 2021/03/07 16:20:20 rpc error: code = InvalidArgument desc = Request contains an invalid argument.
エラーメッセージの表示内容はさておき、メッセージスキーマによってPublisher側が良からぬメッセージを連携してしまうことは防ぐことができました。
Avroスキーマの検証 Avroスキーマの動作検証ですが、gcloudコマンドのpubsub schemas validate-message
で検証することができます。
1 2 3 4 5 6 7 8 9 10 11 12 13 > gcloud beta pubsub schemas validate-message \ --message-encoding=json \ --message={"StringField" :"hello" ,"FloatField" :123.45,"BooleanField" :true } ^ --schema-name=avroschema1 {} > gcloud beta pubsub schemas validate-message \ --message-encoding=json \ --message={"NGField" :"dummy" } \ --schema-name=avroschema1 ERROR: (gcloud.beta.pubsub.schemas.validate-message) INVALID_ARGUMENT: Request contains an invalid argument.
こちらもエラーメッセージに優しさは無いので、今後の拡張に期待です。
どの程度ちゃんとスキーマを検証してくれるかもう少し細かく突っ込んでみます。BooleanField
を文字列xxx
を入れてみます。
1 2 3 4 5 >gcloud beta pubsub schemas validate-message \ --message-encoding=json \ --message={"StringField" :"hello" ,"FloatField" :123.45,"BooleanField" :"xxx" } \ --schema-name=avroschema1 ERROR: (gcloud.beta.pubsub.schemas.validate-message) INVALID_ARGUMENT: Request contains an invalid argument.
無事エラーになりました。同様にFloatField
に数値以外の文字列を入れてもエラー担ってくれました。
スキーママイグレーションの前方互換性で重要そうな、スキーマに存在しないフィールドextra
を追加してみます。
1 2 3 4 5 >gcloud beta pubsub schemas validate-message \ --message-encoding=json \ --message={"StringField" :"hello" ,"FloatField" :123.45,"BooleanField" :true ,"extra" :"aaa" } \ --schema-name=avroschema1 {}
これは実行結果が{}
なので問題ないようです。
エンコードにバイナリを指定する 続いて、スキーマのエンコードにバイナリ
を指定します。
1 2 3 4 >gcloud beta pubsub topics create avrotopic2 \ --message-encoding=BINARY \ --schema=avroschema1 Created topic [projects/<YOUR PROJECT ID>/topics/avrotopic2].
GoでAvroを利用するために、github.com/linkedin/goavro/v2
を利用します
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import ( "github.com/linkedin/goavro/v2" ) var codec, _ = goavro.NewCodec(` { "type" : "record", "name" : "Avro", "fields" : [ {"name" : "StringField", "type" : "string"}, {"name" : "FloatField", "type" : "float"}, {"name" : "BooleanField", "type" : "boolean"} ] } ` )
これを用いて実行します。登録は map[string]interface{}
を先程作成したcodec
で変換して登録します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 func main () { ctx := context.Background() c, err := pubsub.NewClient(ctx, "<YOUR PROJECT ID>" ) if err != nil { log.Fatal(err) } topic := c.Topic("avrotopic2" ) data := []map [string ]interface {}{ {"StringField" : "hello" , "FloatField" : 123.45 , "BooleanField" : true }, {"NGField" : "dummy" }, } for _, v := range data { binary, err := codec.BinaryFromNative(nil , v) if err != nil { log.Fatal("codec.BinaryFromNative" , err) } res := topic.Publish(ctx, &pubsub.Message{ Data: binary, }) if _, err := res.Get(ctx); err != nil { log.Fatal(err) } fmt.Println("success publish" , v) } }
事項すると、1つ目の入力はOK、2つ目はクライアント側のAvro形式への変換でエラーで落ちました。クライアントサイドで落ちた方がエラーメッセージは切り分けしやすいですね。
実行結果 1 2 3 >go run main.go success publish map[BooleanField:true FloatField:123.45 StringField:hello] 2021/03/07 16:48:41 codec.BinaryFromNativecannot encode binary record "Avro" field "StringField" : schema does not specify default value and no value provided
念の為、不正なバイナリ値を送信してみます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func main () { ctx := context.Background() c, err := pubsub.NewClient(ctx, "<YOUR PROJECT ID>" ) if err != nil { log.Fatal(err) } res := c.Topic("avrotopic2" ).Publish(ctx, &pubsub.Message{ Data: []byte ("dummy payload" ), }) if _, err := res.Get(ctx); err != nil { log.Fatal(err) } fmt.Println("success publish" ) }
実行結果 1 2 >go run main.go 2021/03/07 16:55:04 rpc error: code = InvalidArgument desc = Request contains an invalid argument.
こちらもシンプルな例外メッセージですが、メッセージスキーマで不正なPublishを防ぐことに成功しています。
スキーママイグレーションについて メッセージスキーマを付与した後に、アプリケーション要件の変化に併せてスキーマもアップデートしたいものです。2021/03/07時点では、一度作成したスキーマを変更する機能も、トピックに紐づくスキーマを変更する機能も存在しないようです。このあたりの機能追加や利用者側のナレッジも蓄積できると良いですね。
また、現状はスキーマをPub/Subの設定に紐付けるような形式なようです。スキーマはPublish/Subscribeのどちらにも公開されるべきものなので、Schema Registryのマネージドサービスが待ち遠しいですね。現状はGCSにもスキーマファイルを配備するなど、一工夫の設計余地がありそうだなと思いました。
まとめ Cloud Pub/SubでメッセージスキーマでPublish時のペイロードの検証ができるようになりました。今の所、AvroとProtocol Buffersで定義可能で、エンコードJSONにすれば、テキストJSONも検証できます。
スキーマファイルは今の所GCSなどの連携ができず、Pub/Sub側に直接定義するようです。
まだまだ発展途上な機能だと思いますが、不正なデータがPub/Subトピックに混入されることを防ぐことができるのは画期的だと思いますので、ぜひ利用を検討したいと思います。
明日は市川さんの20210310_本番データが急に欲しいアナタに贈る、CloudSQLのサーバレスエクスポートを試してみた です。