Resilience in communication between microservices using the failsafe-go library
Let's start at the beginning. What is resilience? I like the definition in this post:
The intrinsic ability of a system to adjust its functioning prior to, during, or following changes and disturbances, so that it can sustain required operations under both expected and unexpected conditions
Since this is a broad term, I will focus on communication between microservices in this article. To do so, I created two services using Go: serviceA and serviceB (my creativity was not at its peak while writing this post).
The initial code for both was as follows:
    package main // serviceA

    import (
        "encoding/json"
        "io"
        "log/slog"
        "net/http"
        "os"

        "github.com/go-chi/chi/v5"
        slogchi "github.com/samber/slog-chi"
    )

    func main() {
        logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
        r := chi.NewRouter()
        r.Use(slogchi.New(logger))
        r.Get("/", func(w http.ResponseWriter, r *http.Request) {
            type response struct {
                Message string `json:"message"`
            }
            // Call serviceB directly: no timeout, retry, or fallback yet
            resp, err := http.Get("http://localhost:3001")
            if err != nil {
                w.WriteHeader(http.StatusInternalServerError)
                w.Write([]byte(err.Error()))
                return
            }
            defer resp.Body.Close() // close the body on every return path
            body, err := io.ReadAll(resp.Body)
            if err != nil {
                w.WriteHeader(http.StatusInternalServerError)
                w.Write([]byte(err.Error()))
                return
            }
            var data response
            err = json.Unmarshal(body, &data)
            if err != nil {
                w.WriteHeader(http.StatusInternalServerError)
                w.Write([]byte(err.Error()))
                return
            }
            w.Header().Set("Content-Type", "application/json")
            w.Write([]byte(`{"messageA": "hello from service A","messageB": "` + data.Message + `"}`))
        })
        http.ListenAndServe(":3000", r)
    }
    package main // serviceB

    import (
        "net/http"

        "github.com/go-chi/chi/v5"
        "github.com/go-chi/chi/v5/middleware"
    )

    func main() {
        r := chi.NewRouter()
        r.Use(middleware.Logger)
        r.Get("/", func(w http.ResponseWriter, r *http.Request) {
            w.Header().Set("Content-Type", "application/json")
            w.Write([]byte(`{"message": "hello from service B"}`))
        })
        http.ListenAndServe(":3001", r)
    }
As you can see in the code, if serviceB has a problem, it will affect how serviceA works, because serviceA does not handle any communication failure. We will improve this using the failsafe-go library.
According to the documentation on the official website:
Failsafe-go is a library for building resilient, fault tolerant Go applications. It works by wrapping functions with one or more resilience policies, which can be combined and composed as needed.
Let's start by applying some of the available policies and testing their composition.
The first policy we will test is the simplest: a timeout, which ensures that the connection is interrupted if serviceB takes too long to respond, and that the client knows why.
The first step was to change serviceB to include a delay, making the scenario easier to demonstrate:
    package main // serviceB

    import (
        "net/http"
        "time"

        "github.com/go-chi/chi/v5"
        "github.com/go-chi/chi/v5/middleware"
    )

    func main() {
        r := chi.NewRouter()
        r.Use(middleware.Logger)
        r.Get("/", func(w http.ResponseWriter, r *http.Request) {
            time.Sleep(5 * time.Second) // add a delay to simulate a slow service
            w.Header().Set("Content-Type", "application/json")
            w.Write([]byte(`{"message": "hello from service B"}`))
        })
        http.ListenAndServe(":3001", r)
    }
After installing failsafe-go with the commands:
    ❯ cd serviceA
    ❯ go get github.com/failsafe-go/failsafe-go
the code of serviceA/main.go became:
    package main

    import (
        "encoding/json"
        "io"
        "log/slog"
        "net/http"
        "os"
        "time"

        "github.com/failsafe-go/failsafe-go"
        "github.com/failsafe-go/failsafe-go/failsafehttp"
        "github.com/failsafe-go/failsafe-go/timeout"
        "github.com/go-chi/chi/v5"
        slogchi "github.com/samber/slog-chi"
    )

    func main() {
        logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
        r := chi.NewRouter()
        r.Use(slogchi.New(logger))
        r.Get("/", func(w http.ResponseWriter, r *http.Request) {
            type response struct {
                Message string `json:"message"`
            }
            // Create a Timeout for 1 second
            timeout := newTimeout(logger)
            // Use the Timeout with a failsafe RoundTripper
            roundTripper := failsafehttp.NewRoundTripper(nil, timeout)
            client := &http.Client{Transport: roundTripper}
            resp, err := client.Get("http://localhost:3001")
            if err != nil {
                w.WriteHeader(http.StatusInternalServerError)
                w.Write([]byte(err.Error()))
                return
            }
            defer resp.Body.Close() // close the body on every return path
            body, err := io.ReadAll(resp.Body)
            if err != nil {
                w.WriteHeader(http.StatusInternalServerError)
                w.Write([]byte(err.Error()))
                return
            }
            var data response
            err = json.Unmarshal(body, &data)
            if err != nil {
                w.WriteHeader(http.StatusInternalServerError)
                w.Write([]byte(err.Error()))
                return
            }
            w.Header().Set("Content-Type", "application/json")
            w.Write([]byte(`{"messageA": "hello from service A","messageB": "` + data.Message + `"}`))
        })
        http.ListenAndServe(":3000", r)
    }

    // newTimeout builds a 1-second timeout policy that logs when it fires.
    func newTimeout(logger *slog.Logger) timeout.Timeout[*http.Response] {
        return timeout.Builder[*http.Response](1 * time.Second).
            OnTimeoutExceeded(func(e failsafe.ExecutionDoneEvent[*http.Response]) {
                logger.Info("Connection timed out")
            }).Build()
    }
To test it, I used curl to access serviceA:
    ❯ curl -v http://localhost:3000
    * Host localhost:3000 was resolved.
    * IPv6: ::1
    * IPv4: 127.0.0.1
    *   Trying [::1]:3000...
    * Connected to localhost (::1) port 3000
    > GET / HTTP/1.1
    > Host: localhost:3000
    > User-Agent: curl/8.7.1
    > Accept: */*
    >
    * Request completely sent off
    < HTTP/1.1 500 Internal Server Error
    < Date: Fri, 23 Aug 2024 19:43:23 GMT
    < Content-Length: 45
    < Content-Type: text/plain; charset=utf-8
    <
    * Connection #0 to host localhost left intact
    Get "http://localhost:3001": timeout exceeded⏎
And the following output is generated by serviceA:
    go run main.go
    {"time":"2024-08-20T08:37:36.852886-03:00","level":"INFO","msg":"Connection timed out"}
    {"time":"2024-08-20T08:37:36.856079-03:00","level":"ERROR","msg":"500: Internal Server Error","request":{"time":"2024-08-20T08:37:35.851262-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63409","referer":"","length":0},"response":{"time":"2024-08-20T08:37:36.856046-03:00","latency":1004819000,"status":500,"length":45},"id":""}
This way, we can see that the client (curl in this case) received a clear answer after about one second, the configured timeout, instead of waiting out serviceB's five-second delay, and that serviceA was not significantly impacted.
Let's improve our application's resilience by looking at another useful strategy: retry.
Once again, serviceB had to be changed, this time to add random errors:
    package main

    import (
        "math/rand"
        "net/http"
        "strconv"
        "time"

        "github.com/go-chi/chi/v5"
        "github.com/go-chi/chi/v5/middleware"
    )

    func main() {
        r := chi.NewRouter()
        r.Use(middleware.Logger)
        r.Get("/", func(w http.ResponseWriter, r *http.Request) {
            retryAfterDelay := 1 * time.Second
            if fail() {
                w.Header().Add("Retry-After", strconv.Itoa(int(retryAfterDelay.Seconds())))
                w.WriteHeader(http.StatusServiceUnavailable)
                return
            }
            w.Header().Set("Content-Type", "application/json")
            w.Write([]byte(`{"message": "hello from service B"}`))
        })
        http.ListenAndServe(":3001", r)
    }

    // fail reports true roughly half of the time.
    func fail() bool {
        if flipint := rand.Intn(2); flipint == 0 {
            return true
        }
        return false
    }
To make things easier to follow, I am showing one policy at a time, which is why serviceA was reverted to the original version rather than the version with the timeout. Later, we will look at how to compose several policies to make the application more resilient.
The serviceA/main.go code looked like this:
    package main

    import (
        "encoding/json"
        "fmt"
        "io"
        "log/slog"
        "net/http"
        "os"
        "time"

        "github.com/failsafe-go/failsafe-go"
        "github.com/failsafe-go/failsafe-go/failsafehttp"
        "github.com/failsafe-go/failsafe-go/retrypolicy"
        "github.com/go-chi/chi/v5"
        slogchi "github.com/samber/slog-chi"
    )

    func main() {
        logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
        r := chi.NewRouter()
        r.Use(slogchi.New(logger))
        r.Get("/", func(w http.ResponseWriter, r *http.Request) {
            type response struct {
                Message string `json:"message"`
            }
            // Create a RetryPolicy that only handles 503 responses, with backoff delays between retries
            retryPolicy := newRetryPolicy(logger)
            // Use the RetryPolicy with a failsafe RoundTripper
            roundTripper := failsafehttp.NewRoundTripper(nil, retryPolicy)
            client := &http.Client{Transport: roundTripper}
            resp, err := client.Get("http://localhost:3001")
            if err != nil {
                w.WriteHeader(http.StatusInternalServerError)
                w.Write([]byte(err.Error()))
                return
            }
            defer resp.Body.Close() // close the body on every return path
            body, err := io.ReadAll(resp.Body)
            if err != nil {
                w.WriteHeader(http.StatusInternalServerError)
                w.Write([]byte(err.Error()))
                return
            }
            var data response
            err = json.Unmarshal(body, &data)
            if err != nil {
                w.WriteHeader(http.StatusInternalServerError)
                w.Write([]byte(err.Error()))
                return
            }
            w.Header().Set("Content-Type", "application/json")
            w.Write([]byte(`{"messageA": "hello from service A","messageB": "` + data.Message + `"}`))
        })
        http.ListenAndServe(":3000", r)
    }

    // newRetryPolicy retries on 503 responses, backing off from 1s up to 10s.
    func newRetryPolicy(logger *slog.Logger) retrypolicy.RetryPolicy[*http.Response] {
        return retrypolicy.Builder[*http.Response]().
            HandleIf(func(response *http.Response, _ error) bool {
                return response != nil && response.StatusCode == http.StatusServiceUnavailable
            }).
            WithBackoff(time.Second, 10*time.Second).
            OnRetryScheduled(func(e failsafe.ExecutionScheduledEvent[*http.Response]) {
                // %d prints the delay in nanoseconds
                logger.Info(fmt.Sprintf("Retry %d after delay of %d", e.Attempts(), e.Delay))
            }).Build()
    }
This way, if serviceB returns StatusServiceUnavailable (code 503), the request is retried at progressively longer intervals, as configured by the WithBackoff function. The output of serviceA, when accessed via curl, should look like:
    go run main.go
    {"time":"2024-08-20T08:43:38.297621-03:00","level":"INFO","msg":"200: OK","request":{"time":"2024-08-20T08:43:38.283715-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63542","referer":"","length":0},"response":{"time":"2024-08-20T08:43:38.297556-03:00","latency":13840708,"status":200,"length":71},"id":""}
    {"time":"2024-08-20T08:43:39.946562-03:00","level":"INFO","msg":"200: OK","request":{"time":"2024-08-20T08:43:39.943394-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63544","referer":"","length":0},"response":{"time":"2024-08-20T08:43:39.946545-03:00","latency":3151000,"status":200,"length":71},"id":""}
    {"time":"2024-08-20T08:43:40.845862-03:00","level":"INFO","msg":"Retry 1 after delay of 1000000000"}
    {"time":"2024-08-20T08:43:41.85287-03:00","level":"INFO","msg":"Retry 2 after delay of 2000000000"}
    {"time":"2024-08-20T08:43:43.860694-03:00","level":"INFO","msg":"200: OK","request":{"time":"2024-08-20T08:43:40.841468-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63545","referer":"","length":0},"response":{"time":"2024-08-20T08:43:43.860651-03:00","latency":3019287458,"status":200,"length":71},"id":""}
In this example, we can see that errors occurred when accessing serviceB and that the library retried the request until it succeeded. If the request keeps failing, the client receives an error message along the lines of Get "http://localhost:3001": retries exceeded.
Let's go deeper into resilience by adding a circuit breaker to our project.
The circuit breaker is a more advanced policy that offers finer control over access to services. The circuit breaker pattern operates in three states: closed (no errors), open (errors occurred, so requests are interrupted), and half-open (a limited number of requests are sent to the troubled service to test its recovery).
To use this policy, I created a new version of serviceB that can generate more error and delay scenarios:
    package main

    import (
        "math/rand"
        "net/http"
        "strconv"
        "time"

        "github.com/go-chi/chi/v5"
        "github.com/go-chi/chi/v5/middleware"
    )

    func main() {
        r := chi.NewRouter()
        r.Use(middleware.Logger)
        r.Get("/", func(w http.ResponseWriter, r *http.Request) {
            retryAfterDelay := 1 * time.Second
            if fail() {
                w.Header().Add("Retry-After", strconv.Itoa(int(retryAfterDelay.Seconds())))
                w.WriteHeader(http.StatusServiceUnavailable)
                return
            }
            if sleep() {
                time.Sleep(1 * time.Second)
            }
            w.Header().Set("Content-Type", "application/json")
            w.Write([]byte(`{"message": "hello from service B"}`))
        })
        http.ListenAndServe(":3001", r)
    }

    // fail reports true roughly half of the time.
    func fail() bool {
        if flipint := rand.Intn(2); flipint == 0 {
            return true
        }
        return false
    }

    // sleep reports true roughly half of the time.
    func sleep() bool {
        if flipint := rand.Intn(2); flipint == 0 {
            return true
        }
        return false
    }
And the serviceA code:
    package main

    import (
        "encoding/json"
        "fmt"
        "io"
        "log/slog"
        "net/http"
        "os"
        "time"

        "github.com/failsafe-go/failsafe-go/circuitbreaker"
        "github.com/failsafe-go/failsafe-go/failsafehttp"
        "github.com/go-chi/chi/v5"
        slogchi "github.com/samber/slog-chi"
    )

    func main() {
        logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
        r := chi.NewRouter()
        r.Use(slogchi.New(logger))
        r.Get("/", func(w http.ResponseWriter, r *http.Request) {
            type response struct {
                Message string `json:"message"`
            }
            // Create a CircuitBreaker that handles 503 responses and uses a half-open delay based on the Retry-After header
            circuitBreaker := newCircuitBreaker(logger)
            // Use the CircuitBreaker with a failsafe RoundTripper
            roundTripper := failsafehttp.NewRoundTripper(nil, circuitBreaker)
            client := &http.Client{Transport: roundTripper}
            sendGet := func() (*http.Response, error) {
                resp, err := client.Get("http://localhost:3001")
                return resp, err
            }
            maxRetries := 3
            resp, err := sendGet()
            for i := 0; i < maxRetries; i++ {
                if err == nil && resp != nil && resp.StatusCode != http.StatusServiceUnavailable && resp.StatusCode != http.StatusTooManyRequests {
                    break
                }
                time.Sleep(circuitBreaker.RemainingDelay()) // Wait for circuit breaker's delay, provided by the Retry-After header
                resp, err = sendGet()
            }
            if err != nil {
                w.WriteHeader(http.StatusInternalServerError)
                w.Write([]byte(err.Error()))
                return
            }
            defer resp.Body.Close() // close the body on every return path
            body, err := io.ReadAll(resp.Body)
            if err != nil {
                w.WriteHeader(http.StatusInternalServerError)
                w.Write([]byte(err.Error()))
                return
            }
            var data response
            err = json.Unmarshal(body, &data)
            if err != nil {
                w.WriteHeader(http.StatusInternalServerError)
                w.Write([]byte(err.Error()))
                return
            }
            w.Header().Set("Content-Type", "application/json")
            w.Write([]byte(`{"messageA": "hello from service A","messageB": "` + data.Message + `"}`))
        })
        http.ListenAndServe(":3000", r)
    }

    // newCircuitBreaker opens on 503 responses and logs every state change.
    func newCircuitBreaker(logger *slog.Logger) circuitbreaker.CircuitBreaker[*http.Response] {
        return circuitbreaker.Builder[*http.Response]().
            HandleIf(func(response *http.Response, err error) bool {
                return response != nil && response.StatusCode == http.StatusServiceUnavailable
            }).
            WithDelayFunc(failsafehttp.DelayFunc).
            OnStateChanged(func(event circuitbreaker.StateChangedEvent) {
                logger.Info(fmt.Sprintf("circuit breaker state changed from %s to %s", event.OldState.String(), event.NewState.String()))
            }).
            Build()
    }
As we can see in the serviceA output, the circuit breaker is working:
    ❯ go run main.go
    {"time":"2024-08-20T08:51:37.770611-03:00","level":"INFO","msg":"circuit breaker state changed from closed to open"}
    {"time":"2024-08-20T08:51:38.771682-03:00","level":"INFO","msg":"circuit breaker state changed from open to half-open"}
    {"time":"2024-08-20T08:51:38.776743-03:00","level":"INFO","msg":"circuit breaker state changed from half-open to open"}
    {"time":"2024-08-20T08:51:39.777821-03:00","level":"INFO","msg":"circuit breaker state changed from open to half-open"}
    {"time":"2024-08-20T08:51:39.784897-03:00","level":"INFO","msg":"circuit breaker state changed from half-open to open"}
    {"time":"2024-08-20T08:51:40.786209-03:00","level":"INFO","msg":"circuit breaker state changed from open to half-open"}
    {"time":"2024-08-20T08:51:40.792457-03:00","level":"INFO","msg":"circuit breaker state changed from half-open to closed"}
    {"time":"2024-08-20T08:51:40.792733-03:00","level":"INFO","msg":"200: OK","request":{"time":"2024-08-20T08:51:37.756947-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63699","referer":"","length":0},"response":{"time":"2024-08-20T08:51:40.792709-03:00","latency":3036065875,"status":200,"length":71},"id":""}
This policy gives finer control over errors and gives serviceB room to recover if it runs into a problem.
But what can we do when serviceB cannot come back, for whatever reason? In those cases, we can use a fallback.
The idea of this policy is to have an alternative if the desired service has a more severe problem and takes a long time to return. To do this, we will change the serviceA code:
    package main

    import (
        "bytes"
        "encoding/json"
        "io"
        "log/slog"
        "net/http"
        "os"

        "github.com/failsafe-go/failsafe-go"
        "github.com/failsafe-go/failsafe-go/failsafehttp"
        "github.com/failsafe-go/failsafe-go/fallback"
        "github.com/go-chi/chi/v5"
        slogchi "github.com/samber/slog-chi"
    )

    func main() {
        logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
        r := chi.NewRouter()
        r.Use(slogchi.New(logger))
        r.Get("/", func(w http.ResponseWriter, r *http.Request) {
            // Use the Fallback with a failsafe RoundTripper
            fallback := newFallback(logger)
            roundTripper := failsafehttp.NewRoundTripper(nil, fallback)
            client := &http.Client{Transport: roundTripper}
            resp, err := client.Get("http://localhost:3001")
            if err != nil {
                w.WriteHeader(http.StatusInternalServerError)
                w.Write([]byte(err.Error()))
                return
            }
            defer resp.Body.Close() // close the body on every return path
            body, err := io.ReadAll(resp.Body)
            if err != nil {
                w.WriteHeader(http.StatusInternalServerError)
                w.Write([]byte(err.Error()))
                return
            }
            type response struct {
                Message string `json:"message"`
            }
            var data response
            err = json.Unmarshal(body, &data)
            if err != nil {
                w.WriteHeader(http.StatusInternalServerError)
                w.Write([]byte(err.Error()))
                return
            }
            w.Header().Set("Content-Type", "application/json")
            w.Write([]byte(`{"messageA": "hello from service A","messageB": "` + data.Message + `"}`))
        })
        http.ListenAndServe(":3000", r)
    }

    // newFallback substitutes a canned 200 response when serviceB answers 503.
    func newFallback(logger *slog.Logger) fallback.Fallback[*http.Response] {
        resp := &http.Response{
            StatusCode: http.StatusOK,
            Header:     map[string][]string{"Content-Type": {"application/json"}},
            Body:       io.NopCloser(bytes.NewBufferString(`{"message": "error accessing service B"}`)),
        }
        return fallback.BuilderWithResult[*http.Response](resp).
            HandleIf(func(response *http.Response, err error) bool {
                return response != nil && response.StatusCode == http.StatusServiceUnavailable
            }).
            OnFallbackExecuted(func(e failsafe.ExecutionDoneEvent[*http.Response]) {
                logger.Info("Fallback executed result")
            }).
            Build()
    }
In the newFallback function, we can see the creation of an http.Response that the lib will use when serviceB answers with 503 (Service Unavailable), which is the condition checked in HandleIf.
This feature lets us keep answering the client while the team responsible for serviceB gets the service back up.
The serviceA output looks like this:
    ❯ go run main.go
    {"time":"2024-08-20T08:55:27.326475-03:00","level":"INFO","msg":"200: OK","request":{"time":"2024-08-20T08:55:27.31306-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63772","referer":"","length":0},"response":{"time":"2024-08-20T08:55:27.326402-03:00","latency":13343208,"status":200,"length":71},"id":""}
    {"time":"2024-08-20T08:55:31.756765-03:00","level":"INFO","msg":"200: OK","request":{"time":"2024-08-20T08:55:31.754348-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63774","referer":"","length":0},"response":{"time":"2024-08-20T08:55:31.756753-03:00","latency":2404750,"status":200,"length":71},"id":""}
    {"time":"2024-08-20T08:55:34.091845-03:00","level":"INFO","msg":"200: OK","request":{"time":"2024-08-20T08:55:33.086273-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63775","referer":"","length":0},"response":{"time":"2024-08-20T08:55:34.091812-03:00","latency":1005580625,"status":200,"length":71},"id":""}
    {"time":"2024-08-20T08:55:37.386512-03:00","level":"INFO","msg":"Fallback executed result"}
    {"time":"2024-08-20T08:55:37.386553-03:00","level":"INFO","msg":"200: OK","request":{"time":"2024-08-20T08:55:37.38415-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63777","referer":"","length":0},"response":{"time":"2024-08-20T08:55:37.386544-03:00","latency":2393916,"status":200,"length":76},"id":""}
In the next step, we will combine the concepts we've seen to create a more resilient application.
To do this, we need to change the code of serviceA so that it makes use of the policies we have seen so far:
    package main

    import (
        "bytes"
        "encoding/json"
        "fmt"
        "io"
        "log/slog"
        "net/http"
        "os"
        "time"

        "github.com/failsafe-go/failsafe-go"
        "github.com/failsafe-go/failsafe-go/circuitbreaker"
        "github.com/failsafe-go/failsafe-go/failsafehttp"
        "github.com/failsafe-go/failsafe-go/fallback"
        "github.com/failsafe-go/failsafe-go/retrypolicy"
        "github.com/failsafe-go/failsafe-go/timeout"
        "github.com/go-chi/chi/v5"
        slogchi "github.com/samber/slog-chi"
    )

    func main() {
        logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
        r := chi.NewRouter()
        r.Use(slogchi.New(logger))
        r.Get("/", func(w http.ResponseWriter, r *http.Request) {
            type response struct {
                Message string `json:"message"`
            }
            retryPolicy := newRetryPolicy(logger)
            fallback := newFallback(logger)
            circuitBreaker := newCircuitBreaker(logger)
            timeout := newTimeout(logger)
            // Compose all four policies in a single failsafe RoundTripper
            roundTripper := failsafehttp.NewRoundTripper(nil, fallback, retryPolicy, circuitBreaker, timeout)
            client := &http.Client{Transport: roundTripper}
            sendGet := func() (*http.Response, error) {
                resp, err := client.Get("http://localhost:3001")
                return resp, err
            }
            maxRetries := 3
            resp, err := sendGet()
            for i := 0; i < maxRetries; i++ {
                if err == nil && resp != nil && resp.StatusCode != http.StatusServiceUnavailable && resp.StatusCode != http.StatusTooManyRequests {
                    break
                }
                time.Sleep(circuitBreaker.RemainingDelay()) // Wait for circuit breaker's delay, provided by the Retry-After header
                resp, err = sendGet()
            }
            if err != nil {
                w.WriteHeader(http.StatusInternalServerError)
                w.Write([]byte(err.Error()))
                return
            }
            defer resp.Body.Close() // close the body on every return path
            body, err := io.ReadAll(resp.Body)
            if err != nil {
                w.WriteHeader(http.StatusInternalServerError)
                w.Write([]byte(err.Error()))
                return
            }
            var data response
            err = json.Unmarshal(body, &data)
            if err != nil {
                w.WriteHeader(http.StatusInternalServerError)
                w.Write([]byte(err.Error()))
                return
            }
            w.Header().Set("Content-Type", "application/json")
            w.Write([]byte(`{"messageA": "hello from service A","messageB": "` + data.Message + `"}`))
        })
        http.ListenAndServe(":3000", r)
    }

    // newTimeout builds a 10-second timeout policy that logs when it fires.
    func newTimeout(logger *slog.Logger) timeout.Timeout[*http.Response] {
        return timeout.Builder[*http.Response](10 * time.Second).
            OnTimeoutExceeded(func(e failsafe.ExecutionDoneEvent[*http.Response]) {
                logger.Info("Connection timed out")
            }).Build()
    }

    // newFallback substitutes a canned 200 response when serviceB answers 503.
    func newFallback(logger *slog.Logger) fallback.Fallback[*http.Response] {
        resp := &http.Response{
            StatusCode: http.StatusOK,
            Header:     map[string][]string{"Content-Type": {"application/json"}},
            Body:       io.NopCloser(bytes.NewBufferString(`{"message": "error accessing service B"}`)),
        }
        return fallback.BuilderWithResult[*http.Response](resp).
            HandleIf(func(response *http.Response, err error) bool {
                return response != nil && response.StatusCode == http.StatusServiceUnavailable
            }).
            OnFallbackExecuted(func(e failsafe.ExecutionDoneEvent[*http.Response]) {
                logger.Info("Fallback executed result")
            }).
            Build()
    }

    // newRetryPolicy retries on 503 responses, backing off from 1s up to 10s.
    func newRetryPolicy(logger *slog.Logger) retrypolicy.RetryPolicy[*http.Response] {
        return retrypolicy.Builder[*http.Response]().
            HandleIf(func(response *http.Response, _ error) bool {
                return response != nil && response.StatusCode == http.StatusServiceUnavailable
            }).
            WithBackoff(time.Second, 10*time.Second).
            OnRetryScheduled(func(e failsafe.ExecutionScheduledEvent[*http.Response]) {
                // %d prints the delay in nanoseconds
                logger.Info(fmt.Sprintf("Retry %d after delay of %d", e.Attempts(), e.Delay))
            }).Build()
    }

    // newCircuitBreaker opens on 503 responses and logs every state change.
    func newCircuitBreaker(logger *slog.Logger) circuitbreaker.CircuitBreaker[*http.Response] {
        return circuitbreaker.Builder[*http.Response]().
            HandleIf(func(response *http.Response, err error) bool {
                return response != nil && response.StatusCode == http.StatusServiceUnavailable
            }).
            WithDelayFunc(failsafehttp.DelayFunc).
            OnStateChanged(func(event circuitbreaker.StateChangedEvent) {
                logger.Info(fmt.Sprintf("circuit breaker state changed from %s to %s", event.OldState.String(), event.NewState.String()))
            }).
            Build()
    }
In the code:
roundTripper := failsafehttp.NewRoundTripper(nil, fallback, retryPolicy, circuitBreaker, timeout)
we can see all the defined policies in use. The lib composes them so that the first policy listed is the outermost wrapper, which means results are handled starting from the rightmost policy, that is:
timeout -> circuitBreaker -> retryPolicy -> fallback
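One way to picture this ordering is to treat each policy as a wrapper around the next one. The sketch below is not failsafe-go code: `policy`, `compose`, and `tracer` are hypothetical names, and each "policy" does nothing except record the moment it sees the result coming back from the wrapped call.

```go
package main

import "fmt"

// call is the operation being protected (the HTTP request in the article).
type call func() string

// policy wraps a call and returns a new call.
type policy func(next call) call

// compose wraps fn so that the first policy listed is the outermost one.
func compose(fn call, policies ...policy) call {
	for i := len(policies) - 1; i >= 0; i-- {
		fn = policies[i](fn)
	}
	return fn
}

// tracer is a hypothetical policy that only records when it sees the result.
func tracer(name string, seen *[]string) policy {
	return func(next call) call {
		return func() string {
			result := next() // inner policies run first
			*seen = append(*seen, name)
			return result
		}
	}
}

func main() {
	var seen []string
	run := compose(
		func() string { return "hello from service B" },
		tracer("fallback", &seen),
		tracer("retryPolicy", &seen),
		tracer("circuitBreaker", &seen),
		tracer("timeout", &seen),
	)
	run()
	fmt.Println(seen) // [timeout circuitBreaker retryPolicy fallback]
}
```

Read this way, the fallback sits on the outside and only sees what is left after the timeout, the circuit breaker, and the retry policy have each had their turn.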
We can see the execution of the policies by observing the serviceA output:
    go run main.go
    {"time":"2024-08-19T10:15:29.226553-03:00","level":"INFO","msg":"circuit breaker state changed from closed to open"}
    {"time":"2024-08-19T10:15:29.226841-03:00","level":"INFO","msg":"Retry 1 after delay of 1000000000"}
    {"time":"2024-08-19T10:15:30.227941-03:00","level":"INFO","msg":"circuit breaker state changed from open to half-open"}
    {"time":"2024-08-19T10:15:30.234182-03:00","level":"INFO","msg":"circuit breaker state changed from half-open to open"}
    {"time":"2024-08-19T10:15:30.234258-03:00","level":"INFO","msg":"Retry 2 after delay of 2000000000"}
    {"time":"2024-08-19T10:15:32.235282-03:00","level":"INFO","msg":"circuit breaker state changed from open to half-open"}
    {"time":"2024-08-19T10:15:42.23622-03:00","level":"INFO","msg":"Connection timed out"}
    {"time":"2024-08-19T10:15:42.237942-03:00","level":"INFO","msg":"circuit breaker state changed from half-open to closed"}
    {"time":"2024-08-19T10:15:42.238043-03:00","level":"ERROR","msg":"500: Internal Server Error","request":{"time":"2024-08-19T10:15:29.215709-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:52527","referer":"","length":0},"response":{"time":"2024-08-19T10:15:42.238008-03:00","latency":13022704750,"status":500,"length":45},"id":""}
    {"time":"2024-08-19T10:15:56.53476-03:00","level":"INFO","msg":"circuit breaker state changed from closed to open"}
    {"time":"2024-08-19T10:15:56.534803-03:00","level":"INFO","msg":"Retry 1 after delay of 1000000000"}
    {"time":"2024-08-19T10:15:57.535108-03:00","level":"INFO","msg":"circuit breaker state changed from open to half-open"}
    {"time":"2024-08-19T10:15:57.53889-03:00","level":"INFO","msg":"circuit breaker state changed from half-open to open"}
    {"time":"2024-08-19T10:15:57.538911-03:00","level":"INFO","msg":"Retry 2 after delay of 2000000000"}
    {"time":"2024-08-19T10:15:59.539948-03:00","level":"INFO","msg":"circuit breaker state changed from open to half-open"}
    {"time":"2024-08-19T10:15:59.544425-03:00","level":"INFO","msg":"circuit breaker state changed from half-open to open"}
    {"time":"2024-08-19T10:15:59.544575-03:00","level":"ERROR","msg":"500: Internal Server Error","request":{"time":"2024-08-19T10:15:56.5263-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:52542","referer":"","length":0},"response":{"time":"2024-08-19T10:15:59.544557-03:00","latency":3018352000,"status":500,"length":245},"id":""}
    {"time":"2024-08-19T10:16:11.044207-03:00","level":"INFO","msg":"Connection timed out"}
    {"time":"2024-08-19T10:16:11.046026-03:00","level":"ERROR","msg":"500: Internal Server Error","request":{"time":"2024-08-19T10:16:01.043317-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:52544","referer":"","length":0},"response":{"time":"2024-08-19T10:16:11.045601-03:00","latency":10002596334,"status":500,"length":45},"id":""}
One of the advantages of microservices architecture is that we can break a complex domain into smaller, specialized services that communicate with each other to complete the necessary logic. Ensuring that this communication is resilient and will continue to work even in the face of failures and unforeseen events is fundamental. Using libraries such as failsafe-go makes this process easier.
You can find the code presented in this post on my GitHub.
Originally published at https://eltonminetto.dev on August 24, 2024