 
はじめに
TIG DXユニットの真野です。
DynamoDBやKinesis Data Streamsなどを利用するサービスをそれなりの期間で稼働させているとポツポツ下記のようなエラーが発生することが分かりました。
| [MY-APP-ERROR-LOG] RequestError: send request failedcaused by: | 
ここで疑問に思ったのは、少なくてもAWS SDK for Goを使っている限りは必要に応じてデフォルトでリトライをしてくれているはずです。下記のドキュメントでは通常は3回のリトライを実施してくれるとあります
では、上記のようなエラーがでるということはリトライを使い果たしても失敗したのでしょうか? そもそも read: connection reset by peer って正確には何だ? という状態だったので調べました。
read: connection reset by peer とは
サーバ側から(今回だとKinesis Data Streamsのエンドポイントのサーバ)から RST(Reset TCP) パケット(正確言うとRSTフラグが1のパケット)が送られて来た時にハンドリングされたエラーメッセージです。これを送信された場合は、接続要求や通信状態が拒否されたものとみなし、通信をリセットして終了する必要があるとのことです。発生条件はサーバ側の処理能力を超えた場合などに発生しうるそうです。
発生箇所は色々考えられますが、 エラーメッセージに read tcp xxxx とある場合はリクエストを送信して、レスポンスを読み込もうとして(read tcpしようとして)発生したと推測できます。
つまり、今回のログで言うと Post "https://kinesis.ap-northeast-1.amazonaws.com/" のリクエストはサーバ側に届いたものの、レスポンスを受信するタイミングでTPCレイヤーで通信に失敗したと見なせると思います(自信が無いので間違っていましたらご指摘下さい)
Go側ではRSTパッケージを送られたかどうかは、エラーの文字列に connection reset by peer が含まれているかどうかでも分かりますし、ガンバるのであれば、syscallパッケージで判定できそうです。
| import ( | 
本題から少し逸れたので、リトライについて話を戻します。
AWS SDK for Go側のリトライハンドリングについて
AWS SDK for Goのリトライ処理についてはカスタマイズ可能です。方法は辻さんが過去にブログを書いてくれています。
デフォルトの仕組みは、DefaultRetryerのShouldRetry で、どのようなエラーが発生した時に、リトライすべきか否か を判定しています。ShoudRetry をさらに追っていくと、IsErrorRetryable という関数からさらに isErrConnectionReset という関数があることに気が付きます。
connection_reset_error.goに実装された関数isErrConnectionResetを見ると、かなり興味深い実装です。
| func isErrConnectionReset(err error) bool { | 
なんと、read: connection reset が含まれている場合は、 リトライを行わない 判定になっていました。read が入っていない connection reset はリトライを行うとは対照的です。
コミットのハッシュ値からこのコードへの補足を探すと、簡潔に説明しているコメントが見つかります。
- https://github.com/aws/aws-sdk-go/pull/2926#issuecomment-553196888
- https://github.com/aws/aws-sdk-go/pull/2926#issuecomment-553637658
書いていることを整理しました。
- (今回で言うとKinesis)へのサービスへのリクエストの書き込みに成功/失敗について、SDK側は分からない- レスポンス読み取りに失敗しただけなので、リクエスト自体は成功した(Kinesisにデータはputできた)かもしれない
 
- とはいえ、失敗した可能性があるのであれば自動でリトライをしても良い気がするが…?- SDKとしては指定された操作が冪等であるか分からないので、デフォルトの挙動としては安全側に倒しリトライしない
 
…なるほど、理由が分かるとスッキリしますね。readがない connection reset をリトライするのは、おそらく書き込み側(リクエストを送信する時)にエラーになったケースなので、その場合は処理が成功することはありえないので、リトライを行うということだったようです。
その上で今回はリトライすべきかどうか
今回の構築したサービスの仕様だと、Kinesis Data Streamsをサブスクライブしているアプリは 冪等 であることを期待しているので、重複してputすることを許容し、そのままリトライさせることにします(ていうかKinesisであればそもそもサービスとしてAt Least Onceなので、SDK側の判断で重複リトライしてもよいのでは..という気もしましたが、ダメなケースがあるのかな)。
リトライ方法ですが、先程のカスタムリトライの記事にあったとおり、DefaultRetryerを拡張して実装します。
カスタムリトライの実装
aws/aws-sdk-go のリポジトリの exampleフォルダにカスタムリトライのサンプルコードがあり参考にできます。
実装を見ると、500番台のエラーは常に リトライしない という拡張なようです。
| type CustomRetryer struct { | 
今回私が実装したいのは、read: connection reset の時も リトライを行いたい ということなのでその条件のときに return true という、ほぼ同じ考えが適用できるステキなサンプルでした。
次に実装をあげますが、元のDefaultRetryerがtemporaryというインタフェースでスイッチしていた実装なのでそれを切り貼りしています。
| type CustomRetryer struct { | 
上記の実装だと、read: connection resetが発生した場合に規定の回数より多くリトライをしてしまうのでは? という懸念が浮かびましたが、ドキュメントを読むと最大リトライの配慮は別処理でなされるので問題ないようです。
// Implementations may consider request attempt count when determining if a
// request is retryable, but the SDK will use MaxRetries to limit the
// number of attempts a request are made
ShouldRetry(*Request) bool
https://docs.aws.amazon.com/sdk-for-go/api/aws/request/#Retryer
それ以外の判定はDefaultRetryerに最終的な判断を移譲させます。
実装したカスタムリトライは aws.session で設定できます。
| var kc = kinesis.New(session.Must(session.NewSession(&aws.Config{ | 
DefaultRetryer側の設定を変えたい場合は、埋め込んでいるためそのまま設定できます。
| var kc = kinesis.New(session.Must( | 
既存のパッケージの機能をそのまま使えるのは安心感があると思います。こういう薄いラッパーが作りやすいのは嬉しい仕組みですね。
errors.Is() で判定できるのでは?
私は試していませんが、 syscall.ECONNRESET を用いて判定もできるかと思います。先ほどのisErrReadConnectionReset()を少し書き換えます。
awserror.Error は %w で下のエラーをラップしていないため、 OrigErr() に対して判定するのがコツだと思われます。
| func isErrReadConnectionReset(err error) bool { | 
さいごに
今まであまり深く気に留めていなかった read: connection reset by peer といったエラーに関しても、SDK実装者側の設計や配慮を抑え、アプリ開発に活かすと不明瞭な点が減り、より自信を持ったコードを書けるようになりました。
AWS SDK for GoはGitHub上でのやり取り含めてちゃんと運用されており、学びになります。ハマったときはコードの内部を追ってみるのもオススメだと思いました。