Future Tech Blog
フューチャー開発者ブログ

Go Cloud#2 Pub/Subの概要紹介

  • 辻大志郎

はじめに

こんにちは。TIG DXチーム1の辻です。今回は Go Cloudシリーズの第二弾として、Go Cloud の Pub/Sub を紹介します。

ビジネスニーズとして、今までは AWS のみでアプリケーションを構成していたけど、GCP のサービスも取り入れてマルチクラウド構成にしていきたいという声も聞きます。

また Google Trends で過去 5 年間の検索傾向を見ると、ゆるやかな上昇傾向にあるように見えます。

マルチクラウドなアプリケーションの実装としては、各サービスのインターフェースに依存することが多く、独自に各サービスのインターフェースをラップして実装するのもなかなか大変なのではないでしょうか。

Go Cloud はアプリケーションのポータビリティに役に立つと考えています。

Go Cloud記事はこちらもご参考ください。

Pub/Sub

Producer-Consumer モデルを用いたメッセージングは、サーバーレスアーキテクチャやマイクロサービスアーキテクチャでよく利用されるような、非同期サービス間通信のモデルです。

キューイングサービスとしてよく用いられるサービスとして、AWS だと Amazon Simple Queue Service 、GCP だと Google Cloud Pub/Sub、ミドルウェアだと Kafka などが挙げられます。

現時点(2019/11/08)では以下のサービスをサポートしています。

  • Google Cloud Pub/Sub
  • Amazon Simple Notification Service
  • Amazon Simple Queue Service
  • Azure Service Bus
  • RabbitMQ
  • NATS
  • Kafka
  • In-Memory

本記事では Google Cloud Pub/Sub と Amazon Simple Queue Service と In-Memory について Go Cloud で Publish / Subscribe した例を紹介します。

GCP

Go Cloud を用いて Google Cloud Pub/Sub に対して Publish と Subscribe をしてみます。

先に topic は手動で作成しておきます。クレデンシャルはデフォルトのクレデンシャルを用います。サービスアカウントキーを払い出しておきましょう。
今回は環境変数 GOOGLE_APPLICATION_CREDENTIALS にサービスアカウントキーの json へのパスが設定されているものとします。

Publish

以下のようにして topic に Publish できます。

main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package main

import (
"context"
"log"

"gocloud.dev/pubsub"
_ "gocloud.dev/pubsub/gcppubsub"
)

func main() {
ctx := context.Background()
topic, err := pubsub.OpenTopic(ctx, "gcppubsub://projects/develop-123456/topics/test-topic")
if err != nil {
log.Fatal(err)
}
defer topic.Shutdown(ctx)

err = topic.Send(ctx, &pubsub.Message{
Body: []byte("Hello, World!\n"),
Metadata: map[string]string{
"Env": "test",
},
})
if err != nil {
log.Fatal(err)
}
}

管理コンソールから確認すると topic にメッセージが Publish されていることが分かります。

Subscribe

続いて topic から Subscribe してみます。

以下の実装で topic を監視します。この状態でコンソールからメッセージを publish します。

main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main

import (
"context"
"fmt"
"log"

"gocloud.dev/pubsub"
_ "gocloud.dev/pubsub/gcppubsub"
)

func main() {
ctx := context.Background()
subscription, err := pubsub.OpenSubscription(ctx, "gcppubsub://projects/develop-123456/subscriptions/test-sub")
if err != nil {
fmt.Errorf("could not open topic subscription: %v", err)
}
defer subscription.Shutdown(ctx)

for {
msg, err := subscription.Receive(ctx)
if err != nil {
log.Printf("Receiving message: %v", err)
break
}
fmt.Printf("Got message: %q\n", msg.Body)
msg.Ack()
}
}

メッセージを公開すると、標準出力に

1
Got message: "test-msg"

と出力されることが確認できました。管理コンソールからもメッセージが削除され、取得できていることが確認できました。

SQS

GCP の Pub/Sub と同様に AWS の SQS でも Go Cloud を試してみます。

キューは予め作成しておきました。標準キューです。

Publish

キューにメッセージを送信してみます。

main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main

import (
"context"
"log"

"gocloud.dev/pubsub/awssnssqs"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"

"gocloud.dev/pubsub"
)

func main() {
ctx := context.Background()
sess, err := session.NewSession(&aws.Config{
Region: aws.String("ap-northeast-1"),
})
if err != nil {
log.Fatal(err)
}
const queueURL = "https://sqs.ap-northeast-1.amazonaws.com/123456789012/test-queue"
topic := awssnssqs.OpenSQSTopic(ctx, sess, queueURL, nil)
defer topic.Shutdown(ctx)

err = topic.Send(ctx, &pubsub.Message{
Body: []byte("Hello, World!\n"),
Metadata: map[string]string{
"Env": "test",
},
})
if err != nil {
log.Fatal(err)
}
}

送信できていることがコンソールから確認できました。

Subscribe

続いてキューからメッセージを Subscribe します。以下のようなメッセージをコンソールから送信します。

それでは受信してみます。

main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main

import (
"context"
"fmt"
"log"

"gocloud.dev/pubsub/awssnssqs"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
)

func main() {
ctx := context.Background()
sess, err := session.NewSession(&aws.Config{
Region: aws.String("ap-northeast-1"),
})
if err != nil {
log.Fatal(err)
}
const queueURL = "https://sqs.ap-northeast-1.amazonaws.com/123456789012/test-queue"
subscription := awssnssqs.OpenSubscription(ctx, sess, queueURL, nil)
defer subscription.Shutdown(ctx)

for {
msg, err := subscription.Receive(ctx)
if err != nil {
log.Printf("Receiving message: %v", err)
break
}
fmt.Printf("Got message: %q\n", msg.Body)
msg.Ack()
}
}

コンソールから送信すると、以下のように標準出力されることを確認できました。キューからもメッセージが削除されていました。

1
Got message: "test-msg"

In-Memory

最後に In-Memory を扱ってみます。GoDoc にもあるように本番では使ってはダメです。ローカルでの開発とテストを目的としています。

mempubsub should not be used for production: it is intended for local development and testing.

Publish and Subscribe

In-Memory で Publish と Subscriber の動作は以下のようにして確認できました。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package main

import (
"context"
"fmt"
"gocloud.dev/pubsub"
_ "gocloud.dev/pubsub/mempubsub"
"log"
)

func main() {
ctx := context.Background()

topic, err := pubsub.OpenTopic(ctx, "mem://myTopic")
if err != nil {
log.Fatal(err)
}
defer topic.Shutdown(ctx)

subscription, err := pubsub.OpenSubscription(ctx, "mem://myTopic")
if err != nil {
log.Fatal(err)
}
defer subscription.Shutdown(ctx)

if err := topic.Send(ctx, &pubsub.Message{Body: []byte("test-msg")}); err != nil {
log.Fatal(err)
}

msg, err := subscription.Receive(ctx)
if err != nil {
log.Printf("Receiving message: %v", err)
}

fmt.Printf("Got message: %q\n", msg.Body)
msg.Ack()
}
1
Got message: "test-msg"

ちなみに Google Cloud Pub/Sub や Amazon Simple Queue Service のときは必要ありませんでしたが、In-Memeory の場合は明示的に OpenTopic する必要があります。OpenTopic しないと、以下のようにエラーになります。エラーになる理由は mem.go を見れば分かります。

1
2019/11/11 20:27:50 open subscription mem://myTopic: no topic "myTopic" has been created

所感

Go Cloud の API から Google Cloud Pub/Sub と Amazon Simple Queue Service 、ローカルの In-Memeory での Publish/Subscribe を操作できることが確認できました。GoDoc からもわかりますが API として指定できるパラメータは少ないです。キューイングサービスをエンタープライズで利用する上で、ポーリング間隔や可視性タイムアウト(確認応答期限)などは気になるポイントだと思います。

細かい実装はドキュメントには記載されておらず、pubsub.go を見ると、各ドライバーの SendAcks や ReceiveBatch メソッドなどの実装に依存することがわかります。OSS はコードを確認すれば挙動が把握できるので便利ですね。

おわりに

今回は Go Cloud を用いた Pub/Sub の簡単な例を紹介しました。API としてベンダー中立な汎用 API を提供し、マルチクラウドの展開をサポートする試みは非常に興味深いです。

以下にあるように Go Cloud プロジェクトはアルファ版で枯れておらず、今後より発展していくと考えられます。プロジェクトで検証/導入するチャンスを見つけてフィードバックしていきたいですね。

The APIs are still in alpha, but we think they are production-ready and are actively looking for feedback from early adopters. If you have comments or questions, you can post to the Go Cloud mailing list or email us at go-cdk-feedback@google.com.


Go Cloud記事はこちらもご参考ください。

関連記事: