Go Cloudはいろいろなドライバーが整備されているものの当然のことながら、この世のすべてのバックエンドに対応しているわけではありません。AWSやGCPやAzureが提供されているサービス以外にも、自前で運用しているミドルウェアにも対応したくなったりするはずです。Go Cloudの中を覗き見るついでに、自分でドライバーを実装してみました。
Go CloudにはPubSubを扱うパッケージ群があります。GCPのPubSub、AWSのSNS/SQS、Azure Service Busというクラウドベンダーのサービス以外に、RabbitMQ、NATS、Kafkaといったオンプレでも使えるミドルウェア(KafkaはAWSのサービスもありますが)もあります。
アプリケーションのログの収集というと、最近はOpenTelemetryとかNew Relic APMのようなパフォーマンスモニタリングや大規模なマイクロサービスのサポートを目的とした分散トレースなどもありますが、ここで対応されているKafka以外にもFluentdをオンプレやクラウドなどで動かしているお客様も多いので、PubSubのバックエンドとして、Fluentdのpublisherを実装してみます。受け側(Subscription)はなく、サーバーへの送信(Publish)だけなので、fluentdpubという名前でApacheライセンスで公開しました。もとからあるKafkaも合わせれば、ログ収集基盤への送信にもGo Cloudが利用できるようになります。
実装は次の3ステップに分けて実装します。
- Fluentd特化の
fluentdpub.OpenTopic()
を実装する fluentdpub.URLOpener
を実装する- Resolverに登録し、
pubsub.OpenTopic()
で使えるようにする
Fluentd特化のfluentdpub.OpenTopic()
を実装してみる
Fluentdに実際に接続するのは、公式の"github.com/fluent/fluent-logger-golang/fluent"
パッケージを利用します。
まず、Fluentd特化のOpenTopic()
を実装しますが、まず、fluent
パッケージのコネクション情報のインスタンスをそのまま受け取るようにします。このようにすることで、fluent
パッケージの機能をすべて利用する、という使い方をかんたんにユーザーに提供できます。すべての機能をラップするのは大変ですので。
Go Cloudのお作法として、インタフェースをあまり使わないというものがあります。PubSubに関しては次のようなコードになります。
gocloud.dev/pubsub/driver
パッケージのTopic
インタフェースを満たす構造体を作る。これが最終的にFluentdへのアクセスを担うことになる。Topic
インタフェースを満たす構造体のポインタをgocloud.dev/pubsub
パッケージのNewTopic()
に渡しし、ユーザーに*pubsub.Topic
のポインタを返す
利用者はgocloud.dev/pubsub
パッケージだけを知っていればいいが、実装者はそれに加えてgocloud.dev/pubsub/driver
パッケージを実装するという区分けになっています。
それでは実装していきます。PubSubのAPIとFluentdを比べると、Fluentdにはタグがあり、PubSubにはBodyがあるという違いがあります。 OpenTopic()
にはこの違いを吸収するオプションを追加します。Bodyで設定されたコンテンツはmessage
の値に設定し、逆に tag
で設定された値はtagPrefix
と組み合わせてタグとなる(tagPrefix.tag
が最終的なタグになる)、というルールにします。このあたりのルールの違いを吸収する方法はここで検討が必要になります。
type TopicOptions struct { |
全メソッドの実装の紹介はしませんが、一番のコアとなるSendBatch()
メソッドだけ紹介します。メッセージの配列が来るので、自分のモジュールが対象としているエクスポート先(ここではFluentd)に情報を流します。
メタタグの一部をタグとして取り出して、構造体のフィールドのtagPrefixなどと組み合わせて、出力先のタグ名を決定します。
func (t topic) SendBatch(ctx context.Context, ms []*driver.Message) error { |
このOpenTopic
も、他のPubSub APIでは公開APIとなっていますし、かんたんですが、これだけでもすでに使えるようになっているはずです。これで、fluentdpub
の背骨が出来上がりました。
fluentdpub.URLOpener
を実装する
次に、URLなどの文字列をパースしてFluentdに接続する部分を実装していきます。これはNATS向けのPubSubでも公開構造体・メソッドとして実装されていますが、ドキュメントには書かれていません。
pubsub
パッケージの共通初期化関数の場合、対象のサービスを選択する方法はURLしかありません。先ほどの関数は挙動を変えるオプションはTopicOptions
構造体を利用していましたが、この追加のオプションも渡せないので、すべてURLの中に情報をもたせる必要があります。
なお、NATS向けの実装では、オペレーターが気にするNATSサーバーの接続先はNATS_SERVER_URL
で指定し、ソースコード中に登場しうるURLにはトピック名などの開発者向けの情報のみという使い分けがされています。これも、Go Cloudの設計思想に従った役割分担と言えます。fluentdpub
も、FLUENTD_UPSTREAM_URL
という環境変数に接続情報(プロトコル、ホスト、ポート)は任せて、URLはタグ名のみとします。それ以外の情報をここでは扱います。
TagPrefixというのはFluentd全体でタグを先頭につけたい場合に使うものとします。というのも、PubとSubが対応した他のPubSubのコードの識別子と比べると、Fluentdの方が分析の方は分析に任せるものとして、なるべく詳細な情報をタグに載せようとする分、識別子は長くなります。prodとかdevみたいな文字列をオペレータ視点で入れたくなるかもしれませんので、これを追加できるようにしておきます。
実装としては以下の通りです。先ほど作ったtopic
構造体に対して、TopicOptions
ではなく、URLをキーとして扱えるようにする機能がこれで実現されました。もう少しです。
type URLOpener struct { |
リゾルバーに登録
それでは最終段階に入ります。URLスキーマを登録することで、共通APIのpubsub.OpenTopic()
などの関数からも使えるようになり、マルチクラウドに一歩近づきます。
const Scheme = "fluentd" |
最終的にはこの内部の先ほど実装したURLOpener.OpenTopicURL()
メソッドを呼んでいます。一点異なるのは、環境変数のパース部分でしょう。URL形式で渡されたものをパースしています。
実装したものの整理
この記事ではストレートに一発でできたかのように書いていますが、実際作成中は何が何をするものか整理ができておらず、結構混乱して手戻りしつつ実装しました。整理をしてみると、コードが少ない薄い機能を先に実装し、徐々にロジックが多い文字列からの初期化に手を出していっていることがわかります。
実装したもの | 接続情報 | タグ設定 | 最後に何をする? | |
---|---|---|---|---|
ステップ1 | OpenTopic() |
*fluent.Fluent | TopicOptions |
pubsub.NewTopic() 呼び出し |
ステップ2 | URLOpener |
*fluent.Fluent | URL をパース |
ステップ1で作ったものを呼び出し |
ステップ3 | defaultDialer |
環境変数をパース | URL をパース |
ステップ2で作ったものを呼び出し |
使い方とまとめ
テスト用にDockerを起動します。
docker run --rm -v ${PWD}/tmp/fluentd:/fluentd/log --name fluentd -p 24224:24224 fluent/fluentd:latest |
よく使うと思われるステップ3は、環境変数を設定し、URLとして接続情報を渡す必要があります。ここでは、すべてのタグのプレフィックスとして、first
を設定しています(省略可能)。タグ名にピリオドを差し込むことで2段階、3段階でも深いタグを設定できます。
export FLUENTD_UPSTREAM_URL=tcp://localhost:24224/first |
pubsub.OpenTopic
にはURLを設定します。プレフィックスの次に設定されるタグも指定します(省略可能)。
topic, err := pubsub.OpenTopic(context.Background(), "fluentd://second") |
あとは送信だけですね。ここでもタグを設定しています(省略可能)。
topic.Send(context.Background(), &pubsub.Message{ |
3か所でそれぞれタグを設定していますが、それらは結合されて、first.second.third
というタグになるという実装にしています。実行してみると、Dockerで起動したFluentdのフォルダにファイルが生成されてログが出力されていることが確認できるでしょう。
まだ全部のコードを読んだわけではないですが、だいたいはこのような階層構造になっているようです。テストもしやすいですし、もし、何かを実装したくなった場合もこのような手順で実装していくとスムーズでしょう。
Go Cloud集中連載の結びの言葉
10月ごろに、社内チャットのGoチャンネルにGo Cloudをみんなで集まって技術ブログに集中連載してみませんか? と軽く声をかけたところ、何人かから声があがり、5人で7本の記事が集まりました(1人はPCトラブルで復旧中)。Go Cloudは幅広いライブラリですし、学ぼうとしても、個人の興味のあるところ以外はどうしても手薄になりがちです。いろいろな興味・仕事のメンバーを集めたことで、それぞれの興味がオーバーラップして、紹介記事としてのカバレッジをあげることができました。
内容としては主要な機能の説明は網羅できたと思いますし、ウェブで情報を見たことがないローカルでのAWS/GCPのエミュレーションと組み合わせたGo Cloudのテスト環境構築、ドライバーの実装と幅も深さも1人では書けない記事をお届けできたと思います。中には、実現のための苦労が滲み出る記事もありましたが、今後も、仕事を通じてGo Cloudのノウハウが溜まったら、不定期で記事を公開していこうと思っています。
- Go Cloud#1 概要とBlobへの活用方法
- Go Cloud#2 Pub/Subの概要紹介
- Go Cloud#3 Go CloudのDocStoreを使う
- Go Cloud#4 URLを編集するパッケージ
- Go Cloud#5 AWSのローカルモック環境であるLocalStackを活用する
- Go Cloud#6 GCPのローカルエミュレータを活用する
- Go Cloud#7 PubSubドライバー(pubだけ)を実装してみる(この記事です)