フューチャー技術ブログ

Go Cloud#7 PubSubドライバー(pubだけ)を実装してみる

Go Cloudはいろいろなドライバーが整備されているものの当然のことながら、この世のすべてのバックエンドに対応しているわけではありません。AWSやGCPやAzureが提供されているサービス以外にも、自前で運用しているミドルウェアにも対応したくなったりするはずです。Go Cloudの中を覗き見るついでに、自分でドライバーを実装してみました。

Go CloudにはPubSubを扱うパッケージ群があります。GCPのPubSub、AWSのSNS/SQS、Azure Service Busというクラウドベンダーのサービス以外に、RabbitMQ、NATS、Kafkaといったオンプレでも使えるミドルウェア(KafkaはAWSのサービスもありますが)もあります。

アプリケーションのログの収集というと、最近はOpenTelemetryとかNew Relic APMのようなパフォーマンスモニタリングや大規模なマイクロサービスのサポートを目的とした分散トレースなどもありますが、ここで対応されているKafka以外にもFluentdをオンプレやクラウドなどで動かしているお客様も多いので、PubSubのバックエンドとして、Fluentdのpublisherを実装してみます。受け側(Subscription)はなく、サーバーへの送信(Publish)だけなので、fluentdpubという名前でApacheライセンスで公開しました。もとからあるKafkaも合わせれば、ログ収集基盤への送信にもGo Cloudが利用できるようになります。

実装は次の3ステップに分けて実装します。

  • Fluentd特化のfluentdpub.OpenTopic()を実装する
  • fluentdpub.URLOpenerを実装する
  • Resolverに登録し、pubsub.OpenTopic()で使えるようにする

Fluentd特化のfluentdpub.OpenTopic()を実装してみる

Fluentdに実際に接続するのは、公式の"github.com/fluent/fluent-logger-golang/fluent"パッケージを利用します。

まず、Fluentd特化のOpenTopic()を実装しますが、まず、fluentパッケージのコネクション情報のインスタンスをそのまま受け取るようにします。このようにすることで、fluentパッケージの機能をすべて利用する、という使い方をかんたんにユーザーに提供できます。すべての機能をラップするのは大変ですので。

Go Cloudのお作法として、インタフェースをあまり使わないというものがあります。PubSubに関しては次のようなコードになります。

  • gocloud.dev/pubsub/driverパッケージのTopicインタフェースを満たす構造体を作る。これが最終的にFluentdへのアクセスを担うことになる。
  • Topicインタフェースを満たす構造体のポインタをgocloud.dev/pubsubパッケージのNewTopic()に渡しし、ユーザーに*pubsub.Topicのポインタを返す

利用者はgocloud.dev/pubsubパッケージだけを知っていればいいが、実装者はそれに加えてgocloud.dev/pubsub/driverパッケージを実装するという区分けになっています。

それでは実装していきます。PubSubのAPIとFluentdを比べると、Fluentdにはタグがあり、PubSubにはBodyがあるという違いがあります。 OpenTopic() にはこの違いを吸収するオプションを追加します。Bodyで設定されたコンテンツはmessageの値に設定し、逆に tag で設定された値はtagPrefixと組み合わせてタグとなる(tagPrefix.tagが最終的なタグになる)、というルールにします。このあたりのルールの違いを吸収する方法はここで検討が必要になります。

type TopicOptions struct {
BodyKey string
TagKey string
}

func OpenTopic(f *fluent.Fluent, tagPrefix string, opt TopicOptions) (*pubsub.Topic, error) {
if f == nil {
return nil, errors.New("fluentdpub: fluent.Fluent is required")
}
if opt.BodyKey == "" {
opt.BodyKey = "message"
}
if opt.TagKey == "" {
opt.TagKey = "tag"
}
return pubsub.NewTopic(&topic{
f: f,
tagPrefix: tagPrefix,
bodyKey: opt.BodyKey,
tagKey: opt.TagKey,
}, nil), nil
}

全メソッドの実装の紹介はしませんが、一番のコアとなるSendBatch()メソッドだけ紹介します。メッセージの配列が来るので、自分のモジュールが対象としているエクスポート先(ここではFluentd)に情報を流します。

メタタグの一部をタグとして取り出して、構造体のフィールドのtagPrefixなどと組み合わせて、出力先のタグ名を決定します。

func (t topic) SendBatch(ctx context.Context, ms []*driver.Message) error {
for _, msg := range ms {
var fullTag string
tag, ok := msg.Metadata[t.tagKey]
if ok {
delete(msg.Metadata, t.tagKey)
if t.tagPrefix != "" {
fullTag = t.tagPrefix + "." + tag
} else {
fullTag = tag
}
} else {
fullTag = t.tagPrefix
}
if fullTag == "" {
return fmt.Errorf("Message %v doesn't have tag", msg.AckID)
}
msg.Metadata[t.bodyKey] = string(msg.Body)
err := t.f.Post(fullTag, msg.Metadata)
if err != nil {
return err
}
}
return nil
}

このOpenTopicも、他のPubSub APIでは公開APIとなっていますし、かんたんですが、これだけでもすでに使えるようになっているはずです。これで、fluentdpubの背骨が出来上がりました。

fluentdpub.URLOpenerを実装する

次に、URLなどの文字列をパースしてFluentdに接続する部分を実装していきます。これはNATS向けのPubSubでも公開構造体・メソッドとして実装されていますが、ドキュメントには書かれていません。

pubsubパッケージの共通初期化関数の場合、対象のサービスを選択する方法はURLしかありません。先ほどの関数は挙動を変えるオプションはTopicOptions構造体を利用していましたが、この追加のオプションも渡せないので、すべてURLの中に情報をもたせる必要があります。

なお、NATS向けの実装では、オペレーターが気にするNATSサーバーの接続先はNATS_SERVER_URLで指定し、ソースコード中に登場しうるURLにはトピック名などの開発者向けの情報のみという使い分けがされています。これも、Go Cloudの設計思想に従った役割分担と言えます。fluentdpubも、FLUENTD_UPSTREAM_URLという環境変数に接続情報(プロトコル、ホスト、ポート)は任せて、URLはタグ名のみとします。それ以外の情報をここでは扱います。

TagPrefixというのはFluentd全体でタグを先頭につけたい場合に使うものとします。というのも、PubとSubが対応した他のPubSubのコードの識別子と比べると、Fluentdの方が分析の方は分析に任せるものとして、なるべく詳細な情報をタグに載せようとする分、識別子は長くなります。prodとかdevみたいな文字列をオペレータ視点で入れたくなるかもしれませんので、これを追加できるようにしておきます。

実装としては以下の通りです。先ほど作ったtopic構造体に対して、TopicOptionsではなく、URLをキーとして扱えるようにする機能がこれで実現されました。もう少しです。

type URLOpener struct {
Connection *fluent.Fluent
TagPrefix string
}

func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic, error) {
var opt TopicOptions
key := u.Query().Get("bodykey")
if key != "" {
opt.BodyKey = key
u.Query().Del("bodykey")
}
tagkey := u.Query().Get("tagkey")
if tagkey != "" {
opt.TagKey = tagkey
u.Query().Del("tagkey")
}
for param := range u.Query() {
return nil, fmt.Errorf("open topic %v: invalid query parameter %s", u, param)
}
var subject string
if o.TagPrefix != "" && u.Hostname() != "" {
subject = o.TagPrefix + "." + u.Hostname()
} else {
subject = o.TagPrefix + u.Hostname()
}
return OpenTopic(o.Connection, subject, opt)
}

リゾルバーに登録

それでは最終段階に入ります。URLスキーマを登録することで、共通APIのpubsub.OpenTopic()などの関数からも使えるようになり、マルチクラウドに一歩近づきます。

const Scheme = "fluentd"

func init() {
o := new(defaultDialer)
pubsub.DefaultURLMux().RegisterTopic(Scheme, o)
}

type defaultDialer struct {
opener *URLOpener
err error
}

func (d defaultDialer) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic, error) {
c, tag, err := parseEnvVar(os.Getenv("FLUENTD_UPSTREAM_URL"))
if err != nil {
return nil, err
}
conn, err := fluent.New(*c)
if err != nil {
return nil, err
}
o := URLOpener{
Connection: conn,
TagPrefix: tag,
}
return o.OpenTopicURL(ctx, u)
}

最終的にはこの内部の先ほど実装したURLOpener.OpenTopicURL()メソッドを呼んでいます。一点異なるのは、環境変数のパース部分でしょう。URL形式で渡されたものをパースしています

実装したものの整理

この記事ではストレートに一発でできたかのように書いていますが、実際作成中は何が何をするものか整理ができておらず、結構混乱して手戻りしつつ実装しました。整理をしてみると、コードが少ない薄い機能を先に実装し、徐々にロジックが多い文字列からの初期化に手を出していっていることがわかります。

実装したもの 接続情報 タグ設定 最後に何をする?
ステップ1 OpenTopic() *fluent.Fluent TopicOptions pubsub.NewTopic()呼び出し
ステップ2 URLOpener *fluent.Fluent URLをパース ステップ1で作ったものを呼び出し
ステップ3 defaultDialer 環境変数をパース URLをパース ステップ2で作ったものを呼び出し

使い方とまとめ

テスト用にDockerを起動します。

$ docker run --rm -v ${PWD}/tmp/fluentd:/fluentd/log --name fluentd -p 24224:24224 fluent/fluentd:latest

よく使うと思われるステップ3は、環境変数を設定し、URLとして接続情報を渡す必要があります。ここでは、すべてのタグのプリフィックスとして、firstを設定しています(省略可能)。タグ名にピリオドを差し込むことで2段階、3段階でも深いタグを設定できます。

$ export FLUENTD_UPSTREAM_URL=tcp://localhost:24224/first

pubsub.OpenTopicにはURLを設定します。プリフィックスの次に設定されるタグも指定します(省略可能)。

topic, err := pubsub.OpenTopic(context.Background(), "fluentd://second")

あとは送信だけですね。ここでもタグを設定しています(省略可能)。

topic.Send(context.Background(), &pubsub.Message{
Body: []byte("Hello, World!\n"),
Metadata: map[string]string{
"tag": "third",
"language": "en",
"importance": "high",
},
})

3か所でそれぞれタグを設定していますが、それらは結合されて、first.second.thirdというタグになるという実装にしています。実行してみると、Dockerで起動したFluentdのフォルダにファイルが生成されてログが出力されていることが確認できるでしょう。

まだ全部のコードを読んだわけではないですが、だいたいはこのような階層構造になっているようです。テストもしやすいですし、もし、何かを実装したくなった場合もこのような手順で実装していくとスムーズでしょう。

Go Cloud集中連載の結びの言葉

10月ごろに、社内チャットのGoチャンネルにGo Cloudをみんなで集まって技術ブログに集中連載してみませんか?と軽く声をかけたところ、何人かから声があがり、5人で7本の記事が集まりました(1人はPCトラブルで復旧中)。Go Cloudは幅広いライブラリですし、学ぼうとしても、個人の興味のあるところ以外はどうしても手薄になりがちです。いろいろな興味・仕事のメンバーを集めたことで、それぞれの興味がオーバーラップして、紹介記事としてのカバレッジをあげることができました。

内容としては主要な機能の説明は網羅できたと思いますし、ウェブで情報を見たことがないローカルでのAWS/GCPのエミュレーションと組み合わせたGo Cloudのテスト環境構築、ドライバーの実装と幅も深さも1人では書けない記事をお届けできたと思います。中には、実現のための苦労が滲み出る記事もありましたが、今後も、仕事を通じてGo Cloudのノウハウが溜まったら、不定期で記事を公開していこうと思っています。