>  기사  >  백엔드 개발  >  Failsafe-go lib를 사용하는 마이크로서비스 간 통신의 복원력

Failsafe-go lib를 사용하는 마이크로서비스 간 통신의 복원력

PHPz
PHPz원래의
2024-08-27 06:04:02548검색

Resilience in communication between microservices using the failsafe-go lib

처음부터 시작해 보세요. 탄력성이란 무엇입니까? 이 게시물의 정의가 마음에 듭니다.

예상된 조건과 예상치 못한 조건 모두에서 필요한 작동을 유지할 수 있도록 변경 및 교란 이전, 도중 또는 이후에 기능을 조정하는 시스템의 본질적인 능력

광의적인 용어인 만큼 이번 포스팅에서는 마이크로서비스 간의 통신에 중점을 두겠습니다. 이를 위해 Go를 사용하여 serviceA와 serviceB라는 두 가지 서비스를 만들었습니다(이 게시물을 작성할 당시에는 창의성이 높지 않았습니다).

두 가지의 초기 코드는 다음과 같습니다.

package main

// serviceA
import (
    "encoding/json"
    "io"
    "log/slog"
    "net/http"
    "os"

    "github.com/go-chi/chi/v5"
    slogchi "github.com/samber/slog-chi"
)

func main() {
    logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
    r := chi.NewRouter()
    r.Use(slogchi.New(logger))
    r.Get("/", func(w http.ResponseWriter, r *http.Request) {
        type response struct {
            Message string `json:"message"`
        }
        resp, err := http.Get("http://localhost:3001")
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        body, err := io.ReadAll(resp.Body)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        defer resp.Body.Close()
        var data response
        err = json.Unmarshal(body, &data)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"messageA": "hello from service A","messageB": "` + data.Message + `"}`))
    })
    http.ListenAndServe(":3000", r)
}

package main

//serviceB
import (
    "net/http"

    "github.com/go-chi/chi/v5"
    "github.com/go-chi/chi/v5/middleware"
)

func main() {
    r := chi.NewRouter()
    r.Use(middleware.Logger)
    r.Get("/", func(w http.ResponseWriter, r *http.Request) {
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"message": "hello from service B"}`))
    })
    http.ListenAndServe(":3001", r)
}

코드에서 볼 수 있듯이 serviceB에 문제가 발생하면 통신 장애를 처리하지 않으므로 serviceA의 기능에 영향을 미칩니다. lib Failsafe-go를 사용하여 이를 개선하겠습니다.

공식 웹사이트의 문서에 따르면:

Failsafe-go는 탄력적이고 내결함성을 갖춘 Go 애플리케이션을 구축하기 위한 라이브러리입니다. 필요에 따라 결합하고 구성할 수 있는 하나 이상의 복원력 정책으로 기능을 래핑하여 작동합니다.

사용 가능한 정책을 적용하고 구성을 테스트해 보겠습니다.

시간 초과

우리가 테스트할 첫 번째 정책은 serviceB가 응답하는 데 너무 오랜 시간이 걸리고 클라이언트가 그 이유를 알고 있는 경우 연결이 중단되도록 하는 시간 초과를 포함하는 가장 간단한 정책입니다.

첫 번째 단계는 시나리오를 더 쉽게 시연할 수 있도록 지연을 포함하도록 serviceB를 변경하는 것이었습니다.

package main
//serviceB
import (
    "net/http"
    "time"

    "github.com/go-chi/chi/v5"
    "github.com/go-chi/chi/v5/middleware"
)

func main() {
    r := chi.NewRouter()
    r.Use(middleware.Logger)
    r.Get("/", func(w http.ResponseWriter, r *http.Request) {
        time.Sleep(5 * time.Second) //add a delay to simulate a slow service
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"message": "hello from service B"}`))
    })
    http.ListenAndServe(":3001", r)
}

failsafe-go를 설치한 후 다음 명령을 사용하세요.

❯ cd serviceA
❯ go get github.com/failsafe-go/failsafe-go

serviceA/main.go의 코드는 다음과 같습니다.

package main

import (
    "encoding/json"
    "io"
    "log/slog"
    "net/http"
    "os"
    "time"

    "github.com/failsafe-go/failsafe-go"
    "github.com/failsafe-go/failsafe-go/failsafehttp"
    "github.com/failsafe-go/failsafe-go/timeout"
    "github.com/go-chi/chi/v5"
    slogchi "github.com/samber/slog-chi"
)

func main() {
    logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
    r := chi.NewRouter()
    r.Use(slogchi.New(logger))
    r.Get("/", func(w http.ResponseWriter, r *http.Request) {
        type response struct {
            Message string `json:"message"`
        }
        // Create a Timeout for 1 second
        timeout := newTimeout(logger)

        // Use the Timeout with a failsafe RoundTripper
        roundTripper := failsafehttp.NewRoundTripper(nil, timeout)
        client := &http.Client{Transport: roundTripper}
        resp, err := client.Get("http://localhost:3001")
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        body, err := io.ReadAll(resp.Body)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        defer resp.Body.Close()
        var data response
        err = json.Unmarshal(body, &data)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"messageA": "hello from service A","messageB": "` + data.Message + `"}`))
    })
    http.ListenAndServe(":3000", r)
}

func newTimeout(logger *slog.Logger) timeout.Timeout[*http.Response] {
    return timeout.Builder[*http.Response](1 * time.Second).
        OnTimeoutExceeded(func(e failsafe.ExecutionDoneEvent[*http.Response]) {
            logger.Info("Connection timed out")
        }).Build()
}

작동 방식을 테스트하기 위해 컬을 사용하여 서비스에 액세스했습니다.

❯ curl -v http://localhost:3000
* Host localhost:3000 was resolved.
* IPv6: ::1
* IPv4: 127.0.0.1
*   Trying [::1]:3000...
* Connected to localhost (::1) port 3000
> GET / HTTP/1.1
> Host: localhost:3000
> User-Agent: curl/8.7.1
> Accept: */*
>
* Request completely sent off
< HTTP/1.1 500 Internal Server Error
< Date: Fri, 23 Aug 2024 19:43:23 GMT
< Content-Length: 45
< Content-Type: text/plain; charset=utf-8
<
* Connection #0 to host localhost left intact
Get "http://localhost:3001": timeout exceeded⏎

serviceA에서는 다음 출력이 생성됩니다.

go run main.go
{"time":"2024-08-20T08:37:36.852886-03:00","level":"INFO","msg":"Connection timed out"}
{"time":"2024-08-20T08:37:36.856079-03:00","level":"ERROR","msg":"500: Internal Server Error","request":{"time":"2024-08-20T08:37:35.851262-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63409","referer":"","length":0},"response":{"time":"2024-08-20T08:37:36.856046-03:00","latency":1004819000,"status":500,"length":45},"id":""}

이를 통해 클라이언트(이 경우 컬)는 효과적인 응답을 하였고, serviceA는 큰 영향을 미치지 않았음을 알 수 있습니다.

또 다른 유용한 정책인 재시도를 조사하여 애플리케이션의 복원력을 개선해 보겠습니다.

다시 해 보다

다시 한번 무작위 오류를 추가하려면 serviceB를 변경해야 했습니다.

package main

import (
    "math/rand"
    "net/http"
    "strconv"
    "time"

    "github.com/go-chi/chi/v5"
    "github.com/go-chi/chi/v5/middleware"
)

func main() {
    r := chi.NewRouter()
    r.Use(middleware.Logger)
    r.Get("/", func(w http.ResponseWriter, r *http.Request) {
        retryAfterDelay := 1 * time.Second
        if fail() {
            w.Header().Add("Retry-After", strconv.Itoa(int(retryAfterDelay.Seconds())))
            w.WriteHeader(http.StatusServiceUnavailable)
            return
        }
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"message": "hello from service B"}`))
    })
    http.ListenAndServe(":3001", r)
}

func fail() bool {
    if flipint := rand.Intn(2); flipint == 0 {
        return true
    }
    return false
}

이해를 돕기 위해 한 번에 하나의 정책을 보여드리고 있으며, 이로 인해 serviceA가 시간 초과 버전이 아닌 원래 버전으로 변경되었습니다. 나중에 애플리케이션의 탄력성을 높이기 위해 여러 정책을 구성하는 방법을 살펴보겠습니다.

serviceA/main.go 코드는 다음과 같습니다.

package main

import (
    "encoding/json"
    "fmt"
    "io"
    "log/slog"
    "net/http"
    "os"
    "time"

    "github.com/failsafe-go/failsafe-go"
    "github.com/failsafe-go/failsafe-go/failsafehttp"
    "github.com/failsafe-go/failsafe-go/retrypolicy"
    "github.com/go-chi/chi/v5"
    slogchi "github.com/samber/slog-chi"
)

func main() {
    logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
    r := chi.NewRouter()
    r.Use(slogchi.New(logger))
    r.Get("/", func(w http.ResponseWriter, r *http.Request) {
        type response struct {
            Message string `json:"message"`
        }
        // Create a RetryPolicy that only handles 500 responses, with backoff delays between retries
        retryPolicy := newRetryPolicy(logger)

        // Use the RetryPolicy with a failsafe RoundTripper
        roundTripper := failsafehttp.NewRoundTripper(nil, retryPolicy)
        client := &http.Client{Transport: roundTripper}

        resp, err := client.Get("http://localhost:3001")
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        body, err := io.ReadAll(resp.Body)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        defer resp.Body.Close()
        var data response
        err = json.Unmarshal(body, &data)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"messageA": "hello from service A","messageB": "` + data.Message + `"}`))
    })
    http.ListenAndServe(":3000", r)
}

func newRetryPolicy(logger *slog.Logger) retrypolicy.RetryPolicy[*http.Response] {
    return retrypolicy.Builder[*http.Response]().
        HandleIf(func(response *http.Response, _ error) bool {
            return response != nil && response.StatusCode == http.StatusServiceUnavailable
        }).
        WithBackoff(time.Second, 10*time.Second).
        OnRetryScheduled(func(e failsafe.ExecutionScheduledEvent[*http.Response]) {
            logger.Info(fmt.Sprintf("Retry %d after delay of %d", e.Attempts(), e.Delay))
        }).Build()
}

이렇게 하면 serviceB가 StatusServiceUnavailable(코드 503) 상태를 반환하는 경우 WithBackoff 함수 구성 덕분에 점진적인 간격으로 연결이 다시 시도됩니다. 컬을 통해 액세스할 때 serviceA의 출력은 다음과 유사해야 합니다.

go run main.go
{"time":"2024-08-20T08:43:38.297621-03:00","level":"INFO","msg":"200: OK","request":{"time":"2024-08-20T08:43:38.283715-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63542","referer":"","length":0},"response":{"time":"2024-08-20T08:43:38.297556-03:00","latency":13840708,"status":200,"length":71},"id":""}
{"time":"2024-08-20T08:43:39.946562-03:00","level":"INFO","msg":"200: OK","request":{"time":"2024-08-20T08:43:39.943394-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63544","referer":"","length":0},"response":{"time":"2024-08-20T08:43:39.946545-03:00","latency":3151000,"status":200,"length":71},"id":""}
{"time":"2024-08-20T08:43:40.845862-03:00","level":"INFO","msg":"Retry 1 after delay of 1000000000"}
{"time":"2024-08-20T08:43:41.85287-03:00","level":"INFO","msg":"Retry 2 after delay of 2000000000"}
{"time":"2024-08-20T08:43:43.860694-03:00","level":"INFO","msg":"200: OK","request":{"time":"2024-08-20T08:43:40.841468-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63545","referer":"","length":0},"response":{"time":"2024-08-20T08:43:43.860651-03:00","latency":3019287458,"status":200,"length":71},"id":""}

이 예에서는 serviceB에 액세스할 때 오류가 발생했으며 lib는 연결이 성공할 때까지 다시 연결을 실행한 것을 확인할 수 있습니다. 연결에서 계속 오류가 발생하면 클라이언트에 'http://localhost:3001': 재시도가 초과되었습니다.

라는 오류 메시지가 표시됩니다.

프로젝트에 회로 차단기를 추가하여 복원력에 대해 더 깊이 살펴보겠습니다.

회로 차단기

서킷 브레이커 개념은 서비스에 대한 액세스를 더 강력하게 제어할 수 있는 고급 정책입니다. 패턴 회로 차단기는 닫힘(오류 없음), 열림(오류 있음, 전송 중단), 세미 열림(복구 테스트가 어려운 서비스에 제한된 수의 요청 전송)의 세 가지 상태로 작동합니다.

이 정책을 사용하기 위해 더 많은 오류 시나리오와 지연이 발생할 수 있도록 serviceB의 새 버전을 만들었습니다.

package main

import (
    "math/rand"
    "net/http"
    "strconv"
    "time"

    "github.com/go-chi/chi/v5"
    "github.com/go-chi/chi/v5/middleware"
)

func main() {
    r := chi.NewRouter()
    r.Use(middleware.Logger)
    r.Get("/", func(w http.ResponseWriter, r *http.Request) {
        retryAfterDelay := 1 * time.Second
        if fail() {
            w.Header().Add("Retry-After", strconv.Itoa(int(retryAfterDelay.Seconds())))
            w.WriteHeader(http.StatusServiceUnavailable)
            return
        }
        if sleep() {
            time.Sleep(1 * time.Second)
        }
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"message": "hello from service B"}`))
    })
    http.ListenAndServe(":3001", r)
}

func fail() bool {
    if flipint := rand.Intn(2); flipint == 0 {
        return true
    }
    return false
}

func sleep() bool {
    if flipint := rand.Intn(2); flipint == 0 {
        return true
    }
    return false
}

serviceA의 코드는 다음과 같습니다.

package main

import (
    "encoding/json"
    "fmt"
    "io"
    "log/slog"
    "net/http"
    "os"
    "time"

    "github.com/failsafe-go/failsafe-go/circuitbreaker"
    "github.com/failsafe-go/failsafe-go/failsafehttp"
    "github.com/go-chi/chi/v5"
    slogchi "github.com/samber/slog-chi"
)

func main() {
    logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
    r := chi.NewRouter()
    r.Use(slogchi.New(logger))
    r.Get("/", func(w http.ResponseWriter, r *http.Request) {
        type response struct {
            Message string `json:"message"`
        }
        // Create a CircuitBreaker that handles 503 responses and uses a half-open delay based on the Retry-After header
        circuitBreaker := newCircuitBreaker(logger)

        // Use the RetryPolicy with a failsafe RoundTripper
        roundTripper := failsafehttp.NewRoundTripper(nil, circuitBreaker)
        client := &http.Client{Transport: roundTripper}

        sendGet := func() (*http.Response, error) {
            resp, err := client.Get("http://localhost:3001")
            return resp, err
        }
        maxRetries := 3
        resp, err := sendGet()
        for i := 0; i < maxRetries; i++ {
            if err == nil && resp != nil && resp.StatusCode != http.StatusServiceUnavailable && resp.StatusCode != http.StatusTooManyRequests {
                break
            }
            time.Sleep(circuitBreaker.RemainingDelay()) // Wait for circuit breaker's delay, provided by the Retry-After header
            resp, err = sendGet()
        }
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        body, err := io.ReadAll(resp.Body)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        defer resp.Body.Close()
        var data response
        err = json.Unmarshal(body, &data)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"messageA": "hello from service A","messageB": "` + data.Message + `"}`))
    })
    http.ListenAndServe(":3000", r)
}

func newCircuitBreaker(logger *slog.Logger) circuitbreaker.CircuitBreaker[*http.Response] {
    return circuitbreaker.Builder[*http.Response]().
        HandleIf(func(response *http.Response, err error) bool {
            return response != nil && response.StatusCode == http.StatusServiceUnavailable
        }).
        WithDelayFunc(failsafehttp.DelayFunc).
        OnStateChanged(func(event circuitbreaker.StateChangedEvent) {
            logger.Info(fmt.Sprintf("circuit breaker state changed from %s to %s", event.OldState.String(), event.NewState.String()))
        }).
        Build()
}

serviceA의 출력에서 ​​볼 수 있듯이 회로 차단기가 작동 중입니다.

❯ go run main.go
{"time":"2024-08-20T08:51:37.770611-03:00","level":"INFO","msg":"circuit breaker state changed from closed to open"}
{"time":"2024-08-20T08:51:38.771682-03:00","level":"INFO","msg":"circuit breaker state changed from open to half-open"}
{"time":"2024-08-20T08:51:38.776743-03:00","level":"INFO","msg":"circuit breaker state changed from half-open to open"}
{"time":"2024-08-20T08:51:39.777821-03:00","level":"INFO","msg":"circuit breaker state changed from open to half-open"}
{"time":"2024-08-20T08:51:39.784897-03:00","level":"INFO","msg":"circuit breaker state changed from half-open to open"}
{"time":"2024-08-20T08:51:40.786209-03:00","level":"INFO","msg":"circuit breaker state changed from open to half-open"}
{"time":"2024-08-20T08:51:40.792457-03:00","level":"INFO","msg":"circuit breaker state changed from half-open to closed"}
{"time":"2024-08-20T08:51:40.792733-03:00","level":"INFO","msg":"200: OK","request":{"time":"2024-08-20T08:51:37.756947-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63699","referer":"","length":0},"response":{"time":"2024-08-20T08:51:40.792709-03:00","latency":3036065875,"status":200,"length":71},"id":""}

이 정책을 사용하면 오류를 더욱 강력하게 제어할 수 있으므로 serviceB에 문제가 발생한 경우 복구할 수 있습니다.

그런데 어떤 이유로든 serviceB가 더 이상 반환할 수 없는 경우에는 어떻게 하나요? 이러한 경우 대체를 사용할 수 있습니다.

대체

이 정책의 취지는 원하는 서비스에 더 심각한 문제가 있고 반환하는 데 오랜 시간이 걸릴 경우 대안을 마련하는 것입니다. 이를 위해 코드 서비스A를 변경합니다:

package main

import (
    "bytes"
    "encoding/json"
    "io"
    "log/slog"
    "net/http"
    "os"

    "github.com/failsafe-go/failsafe-go"
    "github.com/failsafe-go/failsafe-go/failsafehttp"
    "github.com/failsafe-go/failsafe-go/fallback"
    "github.com/go-chi/chi/v5"
    slogchi "github.com/samber/slog-chi"
)

func main() {
    logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
    r := chi.NewRouter()
    r.Use(slogchi.New(logger))
    r.Get("/", func(w http.ResponseWriter, r *http.Request) {
        fallback := newFallback(logger)

        roundTripper := failsafehttp.NewRoundTripper(nil, fallback)
        client := &http.Client{Transport: roundTripper}

        resp, err := client.Get("http://localhost:3001")
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        body, err := io.ReadAll(resp.Body)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        defer resp.Body.Close()
        type response struct {
            Message string `json:"message"`
        }
        var data response
        err = json.Unmarshal(body, &data)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"messageA": "hello from service A","messageB": "` + data.Message + `"}`))
    })
    http.ListenAndServe(":3000", r)
}

func newFallback(logger *slog.Logger) fallback.Fallback[*http.Response] {
    resp := &http.Response{
        StatusCode: http.StatusOK,
        Header:     map[string][]string{"Content-Type": {"application/json"}},
        Body:       io.NopCloser(bytes.NewBufferString(`{"message": "error accessing service B"}`)),
    }
    return fallback.BuilderWithResult[*http.Response](resp).
        HandleIf(func(response *http.Response, err error) bool {
            return response != nil && response.StatusCode == http.StatusServiceUnavailable
        }).
        OnFallbackExecuted(func(e failsafe.ExecutionDoneEvent[*http.Response]) {
            logger.Info("Fallback executed result")
        }).
        Build()
}

newFallback 함수에서 사용자 serviceB가 응답하지 않는 경우 lib가 사용할 하나의 http.Response가 생성되는 것을 볼 수 있습니다.

이 기능을 사용하면 serviceB 담당 팀이 서비스를 다시 시작하고 실행할 시간을 갖는 동안 우리는 고객에게 응답할 수 있습니다.

serviceA의 출력은 다음과 유사합니다.

❯ go run main.go
{"time":"2024-08-20T08:55:27.326475-03:00","level":"INFO","msg":"200: OK","request":{"time":"2024-08-20T08:55:27.31306-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63772","referer":"","length":0},"response":{"time":"2024-08-20T08:55:27.326402-03:00","latency":13343208,"status":200,"length":71},"id":""}
{"time":"2024-08-20T08:55:31.756765-03:00","level":"INFO","msg":"200: OK","request":{"time":"2024-08-20T08:55:31.754348-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63774","referer":"","length":0},"response":{"time":"2024-08-20T08:55:31.756753-03:00","latency":2404750,"status":200,"length":71},"id":""}
{"time":"2024-08-20T08:55:34.091845-03:00","level":"INFO","msg":"200: OK","request":{"time":"2024-08-20T08:55:33.086273-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63775","referer":"","length":0},"response":{"time":"2024-08-20T08:55:34.091812-03:00","latency":1005580625,"status":200,"length":71},"id":""}
{"time":"2024-08-20T08:55:37.386512-03:00","level":"INFO","msg":"Fallback executed result"}
{"time":"2024-08-20T08:55:37.386553-03:00","level":"INFO","msg":"200: OK","request":{"time":"2024-08-20T08:55:37.38415-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:63777","referer":"","length":0},"response":{"time":"2024-08-20T08:55:37.386544-03:00","latency":2393916,"status":200,"length":76},"id":""}

In the next step, we will combine the concepts we've seen to create a more resilient application.

Policy composition

To do this, we need to change the code of serviceA so that it makes use of the policies we have seen so far:

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "io"
    "log/slog"
    "net/http"
    "os"
    "time"

    "github.com/failsafe-go/failsafe-go"
    "github.com/failsafe-go/failsafe-go/circuitbreaker"
    "github.com/failsafe-go/failsafe-go/failsafehttp"
    "github.com/failsafe-go/failsafe-go/fallback"
    "github.com/failsafe-go/failsafe-go/retrypolicy"
    "github.com/failsafe-go/failsafe-go/timeout"
    "github.com/go-chi/chi/v5"
    slogchi "github.com/samber/slog-chi"
)

func main() {
    logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
    r := chi.NewRouter()
    r.Use(slogchi.New(logger))
    r.Get("/", func(w http.ResponseWriter, r *http.Request) {
        type response struct {
            Message string `json:"message"`
        }
        retryPolicy := newRetryPolicy(logger)
        fallback := newFallback(logger)
        circuitBreaker := newCircuitBreaker(logger)
        timeout := newTimeout(logger)

        roundTripper := failsafehttp.NewRoundTripper(nil, fallback, retryPolicy, circuitBreaker, timeout)
        client := &http.Client{Transport: roundTripper}

        sendGet := func() (*http.Response, error) {
            resp, err := client.Get("http://localhost:3001")
            return resp, err
        }
        maxRetries := 3
        resp, err := sendGet()
        for i := 0; i < maxRetries; i++ {
            if err == nil && resp != nil && resp.StatusCode != http.StatusServiceUnavailable && resp.StatusCode != http.StatusTooManyRequests {
                break
            }
            time.Sleep(circuitBreaker.RemainingDelay()) // Wait for circuit breaker's delay, provided by the Retry-After header
            resp, err = sendGet()
        }
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        body, err := io.ReadAll(resp.Body)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        defer resp.Body.Close()
        var data response
        err = json.Unmarshal(body, &data)
        if err != nil {
            w.WriteHeader(http.StatusInternalServerError)
            w.Write([]byte(err.Error()))
            return
        }
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"messageA": "hello from service A","messageB": "` + data.Message + `"}`))
    })
    http.ListenAndServe(":3000", r)
}

func newTimeout(logger *slog.Logger) timeout.Timeout[*http.Response] {
    return timeout.Builder[*http.Response](10 * time.Second).
        OnTimeoutExceeded(func(e failsafe.ExecutionDoneEvent[*http.Response]) {
            logger.Info("Connection timed out")
        }).Build()
}

func newFallback(logger *slog.Logger) fallback.Fallback[*http.Response] {
    resp := &http.Response{
        StatusCode: http.StatusOK,
        Header:     map[string][]string{"Content-Type": {"application/json"}},
        Body:       io.NopCloser(bytes.NewBufferString(`{"message": "error accessing service B"}`)),
    }
    return fallback.BuilderWithResult[*http.Response](resp).
        HandleIf(func(response *http.Response, err error) bool {
            return response != nil && response.StatusCode == http.StatusServiceUnavailable
        }).
        OnFallbackExecuted(func(e failsafe.ExecutionDoneEvent[*http.Response]) {
            logger.Info("Fallback executed result")
        }).
        Build()
}

func newRetryPolicy(logger *slog.Logger) retrypolicy.RetryPolicy[*http.Response] {
    return retrypolicy.Builder[*http.Response]().
        HandleIf(func(response *http.Response, _ error) bool {
            return response != nil && response.StatusCode == http.StatusServiceUnavailable
        }).
        WithBackoff(time.Second, 10*time.Second).
        OnRetryScheduled(func(e failsafe.ExecutionScheduledEvent[*http.Response]) {
            logger.Info(fmt.Sprintf("Retry %d after delay of %d", e.Attempts(), e.Delay))
        }).Build()
}

func newCircuitBreaker(logger *slog.Logger) circuitbreaker.CircuitBreaker[*http.Response] {
    return circuitbreaker.Builder[*http.Response]().
        HandleIf(func(response *http.Response, err error) bool {
            return response != nil && response.StatusCode == http.StatusServiceUnavailable
        }).
        WithDelayFunc(failsafehttp.DelayFunc).
        OnStateChanged(func(event circuitbreaker.StateChangedEvent) {
            logger.Info(fmt.Sprintf("circuit breaker state changed from %s to %s", event.OldState.String(), event.NewState.String()))
        }).
        Build()
}


In the code:

roundTripper := failsafehttp.NewRoundTripper(nil, fallback, retryPolicy, circuitBreaker, timeout)

It is possible to view the use of all defined policies. The lib will execute it in the "rightmost" order, that is:

timeout -> circuitBreaker -> retryPolicy -> fallback

We can see the execution of the policies by observing the serviceA output:

go run main.go
{"time":"2024-08-19T10:15:29.226553-03:00","level":"INFO","msg":"circuit breaker state changed from closed to open"}
{"time":"2024-08-19T10:15:29.226841-03:00","level":"INFO","msg":"Retry 1 after delay of 1000000000"}
{"time":"2024-08-19T10:15:30.227941-03:00","level":"INFO","msg":"circuit breaker state changed from open to half-open"}
{"time":"2024-08-19T10:15:30.234182-03:00","level":"INFO","msg":"circuit breaker state changed from half-open to open"}
{"time":"2024-08-19T10:15:30.234258-03:00","level":"INFO","msg":"Retry 2 after delay of 2000000000"}
{"time":"2024-08-19T10:15:32.235282-03:00","level":"INFO","msg":"circuit breaker state changed from open to half-open"}
{"time":"2024-08-19T10:15:42.23622-03:00","level":"INFO","msg":"Connection timed out"}
{"time":"2024-08-19T10:15:42.237942-03:00","level":"INFO","msg":"circuit breaker state changed from half-open to closed"}
{"time":"2024-08-19T10:15:42.238043-03:00","level":"ERROR","msg":"500: Internal Server Error","request":{"time":"2024-08-19T10:15:29.215709-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:52527","referer":"","length":0},"response":{"time":"2024-08-19T10:15:42.238008-03:00","latency":13022704750,"status":500,"length":45},"id":""}
{"time":"2024-08-19T10:15:56.53476-03:00","level":"INFO","msg":"circuit breaker state changed from closed to open"}
{"time":"2024-08-19T10:15:56.534803-03:00","level":"INFO","msg":"Retry 1 after delay of 1000000000"}
{"time":"2024-08-19T10:15:57.535108-03:00","level":"INFO","msg":"circuit breaker state changed from open to half-open"}
{"time":"2024-08-19T10:15:57.53889-03:00","level":"INFO","msg":"circuit breaker state changed from half-open to open"}
{"time":"2024-08-19T10:15:57.538911-03:00","level":"INFO","msg":"Retry 2 after delay of 2000000000"}
{"time":"2024-08-19T10:15:59.539948-03:00","level":"INFO","msg":"circuit breaker state changed from open to half-open"}
{"time":"2024-08-19T10:15:59.544425-03:00","level":"INFO","msg":"circuit breaker state changed from half-open to open"}
{"time":"2024-08-19T10:15:59.544575-03:00","level":"ERROR","msg":"500: Internal Server Error","request":{"time":"2024-08-19T10:15:56.5263-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:52542","referer":"","length":0},"response":{"time":"2024-08-19T10:15:59.544557-03:00","latency":3018352000,"status":500,"length":245},"id":""}
{"time":"2024-08-19T10:16:11.044207-03:00","level":"INFO","msg":"Connection timed out"}
{"time":"2024-08-19T10:16:11.046026-03:00","level":"ERROR","msg":"500: Internal Server Error","request":{"time":"2024-08-19T10:16:01.043317-03:00","method":"GET","host":"localhost:3000","path":"/","query":"","params":{},"route":"/","ip":"[::1]:52544","referer":"","length":0},"response":{"time":"2024-08-19T10:16:11.045601-03:00","latency":10002596334,"status":500,"length":45},"id":""}

Conclusion

One of the advantages of microservices architecture is that we can break a complex domain into smaller, specialized services that communicate with each other to complete the necessary logic. Ensuring that this communication is resilient and will continue to work even in the face of failures and unforeseen events is fundamental. Using libraries such as failsafe-go makes this process easier.

You can find the codes presented in this post on my Github.

Originally published at https://eltonminetto.dev on August 24, 2024

위 내용은 Failsafe-go lib를 사용하는 마이크로서비스 간 통신의 복원력의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.