Maison >développement back-end >Golang >Synchronisez les requêtes entre deux points de terminaison de limitation de débit distincts
Dans le développement Web, nous rencontrons souvent des situations où nous devons effectuer des requêtes synchrones entre deux points de terminaison de limitation de débit distincts. À ce stade, nous devons trouver un moyen de garantir que les demandes sont envoyées dans les délais appropriés et attendre lorsque la limite de débit est atteinte. Dans cet article, l'éditeur PHP Apple présentera une solution pour vous aider à implémenter cette fonction de requête synchrone et garantir l'exactitude et la stabilité des données. Jetons un coup d'œil à la mise en œuvre spécifique de cette solution !
J'utilise des API tierces et chaque API a sa propre limite de débit. Le point de terminaison 1 a une limite de débit de 10/s et le point de terminaison 2 a une limite de débit de 20/s.
Je dois traiter les données via le point de terminaison 1 qui renverra un tableau d'objets (entre 2 et 3 000 objets). Je dois ensuite récupérer chaque objet et envoyer des données à un deuxième point de terminaison tout en respectant la limite de débit du deuxième point de terminaison.
Je prévois d'envoyer par lots 10 requêtes à la fois dans ma routine go, en m'assurant que si les 10 requêtes se terminent en
À terme, j'aimerais pouvoir limiter le nombre de réponses simultanées que chaque point de terminaison envoie en même temps. Surtout si je dois réessayer une demande qui a échoué en raison de quelque chose comme plus de 500 réponses du serveur.
Pour les besoins de la question, j'utilise des requêtes httpbin pour simuler le scénario suivant :
package main import ( "bytes" "encoding/json" "fmt" "io" "net/http" "sync" "time" ) type HttpBinGetRequest struct { url string } type HttpBinGetResponse struct { Uuid string `json:"uuid"` StatusCode int } type HttpBinPostRequest struct { url string uuid string // Item to post to API } type HttpBinPostResponse struct { Data string `json:"data"` StatusCode int } func main() { // Prepare GET requests for 500 requests var requests []*HttpBinGetRequest for i := 0; i < 500; i++ { uri := "https://httpbin.org/uuid" request := &HttpBinGetRequest{ url: uri, } requests = append(requests, request) } // Create semaphore and rate limit for the GET endpoint getSemaphore := make(chan struct{}, 10) getRate := make(chan struct{}, 10) for i := 0; i < cap(getRate); i++ { getRate <- struct{}{} } go func() { // ticker corresponding to 1/10th of a second ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() for range ticker.C { _, ok := <-getRate if !ok { return } } }() // Send our GET requests to obtain a random UUID var wg sync.WaitGroup for _, request := range requests { wg.Add(1) // Go func to make request and receive the response go func(r *HttpBinGetRequest) { defer wg.Done() // Check the rate limiter and block if it is empty getRate <- struct{}{} // Add a token to the semaphore getSemaphore <- struct{}{} // Remove token when function is complete defer func() { <-getSemaphore }() resp, _ := get(r) fmt.Printf("%+v\n", resp) }(request) } wg.Wait() // I need to add code that obtains the response data from the above for loop // then sends the UUID it to its own go routines for a POST request, following a similar pattern above // To not violate the rate limit of the second endpoint which is 20 calls per second // postSemaphore := make(chan struct{}, 20) // postRate := make(chan struct{}, 20) // for i := 0; i < cap(postRate); i++ { // postRate <- struct{}{} // } } func get(hbgr *HttpBinGetRequest) (*HttpBinGetResponse, error) { httpResp := &HttpBinGetResponse{} client := &http.Client{} req, err := http.NewRequest("GET", hbgr.url, nil) if err != nil { fmt.Println("error making request") return httpResp, err } req.Header = http.Header{ "accept": {"application/json"}, } resp, err := client.Do(req) if err != nil { fmt.Println(err) fmt.Println("error getting response") return httpResp, err } // Read Response body, err := io.ReadAll(resp.Body) if err != nil { fmt.Println("error reading response body") return httpResp, err } json.Unmarshal(body, &httpResp) httpResp.StatusCode = resp.StatusCode return httpResp, nil } // Method to post data to httpbin func post(hbr *HttpBinPostRequest) (*HttpBinPostResponse, error) { httpResp := &HttpBinPostResponse{} client := &http.Client{} req, err := http.NewRequest("POST", hbr.url, bytes.NewBuffer([]byte(hbr.uuid))) if err != nil { fmt.Println("error making request") return httpResp, err } req.Header = http.Header{ "accept": {"application/json"}, } resp, err := client.Do(req) if err != nil { fmt.Println("error getting response") return httpResp, err } if resp.StatusCode == 429 { fmt.Println(resp.Header.Get("Retry-After")) } // Read Response body, err := io.ReadAll(resp.Body) if err != nil { fmt.Println("error reading response body") return httpResp, err } json.Unmarshal(body, &httpResp) httpResp.StatusCode = resp.StatusCode fmt.Printf("%+v", httpResp) return httpResp, nil }
C'est le modèle producteur/consommateur. Vous pouvez utiliser chan pour les connecter.
Pour le limiteur de débit, j'utiliserais le package golang.org/x/time/rate
.
Depuis que nous avons décidé d'utiliser le chan pour connecter les producteurs et les consommateurs, il est naturel d'envoyer les tâches ayant échoué au même chan afin que les consommateurs puissent réessayer.
J'ai résumé la logique en scheduler[t]
types. Voir la démo ci-dessous. Veuillez noter que cette démo a été écrite à la hâte et est destinée uniquement à illustrer l'idée. Pas complètement testé.
package main import ( "context" "fmt" "io" "log" "math/rand" "net/http" "net/http/httptest" "sort" "sync" "time" "golang.org/x/time/rate" ) type task[t any] struct { param t failedcount int } type scheduler[t any] struct { name string limit int maxtries int wg sync.waitgroup tasks chan task[t] action func(param t) error } // newscheduler creates a scheduler that runs the action with the specified rate limit. // it will retry the action if the action returns a non-nil error. func newscheduler[t any](name string, limit, maxtries, chansize int, action func(param t) error) *scheduler[t] { return &scheduler[t]{ name: name, limit: limit, maxtries: maxtries, tasks: make(chan task[t], chansize), action: action, } } func (s *scheduler[t]) addtask(param t) { s.wg.add(1) s.tasks <- task[t]{param: param} } func (s *scheduler[t]) retrylater(t task[t]) { s.wg.add(1) s.tasks <- t } func (s *scheduler[t]) run() { lim := rate.newlimiter(rate.limit(s.limit), 1) for t := range s.tasks { t := t if err := lim.wait(context.background()); err != nil { log.fatalf("wait: %s", err) return } go func() { defer s.wg.done() err := s.action(t.param) if err != nil { log.printf("task %s, param %v failed: %v", s.name, t.param, err) t.failedcount++ if t.failedcount == s.maxtries { log.printf("task %s, param %v failed with %d tries", s.name, t.param, s.maxtries) return } s.retrylater(t) } }() } } func (s *scheduler[t]) wait() { s.wg.wait() close(s.tasks) } func main() { s := &server{} ts := httptest.newserver(s) defer ts.close() schedulerpost := newscheduler("post", 20, 3, 1, func(param string) error { return post(fmt.sprintf("%s/%s", ts.url, param)) }) go schedulerpost.run() schedulerget := newscheduler("get", 10, 3, 1, func(param int) error { id, err := get(fmt.sprintf("%s/%d", ts.url, param)) if err != nil { return err } schedulerpost.addtask(id) return nil }) go schedulerget.run() for i := 0; i < 100; i++ { schedulerget.addtask(i) } schedulerget.wait() schedulerpost.wait() s.printstats() } func get(url string) (string, error) { resp, err := http.get(url) if err != nil { return "", err } defer resp.body.close() if resp.statuscode != 200 { return "", fmt.errorf("unexpected status code: %d", resp.statuscode) } body, err := io.readall(resp.body) if err != nil { return "", err } return string(body), nil } func post(url string) error { resp, err := http.post(url, "", nil) if err != nil { return err } defer resp.body.close() if resp.statuscode != 200 { return fmt.errorf("unexpected status code: %d", resp.statuscode) } return nil } type server struct { gmu sync.mutex gets []int64 pmu sync.mutex posts []int64 } func (s *server) servehttp(w http.responsewriter, r *http.request) { log.printf("%s: %s", r.method, r.url.path) // collect request stats. if r.method == http.methodget { s.gmu.lock() s.gets = append(s.gets, time.now().unixmilli()) s.gmu.unlock() } else { s.pmu.lock() s.posts = append(s.posts, time.now().unixmilli()) s.pmu.unlock() } n := rand.intn(1000) // simulate latency. time.sleep(time.duration(n) * time.millisecond) // simulate errors. if n%10 == 0 { w.writeheader(http.statusinternalservererror) return } if r.method == http.methodget { fmt.fprintf(w, "%s", r.url.path[1:]) return } } func (s *server) printstats() { log.printf("gets (total: %d):\n", len(s.gets)) printstats(s.gets) log.printf("posts (total: %d):\n", len(s.posts)) printstats(s.posts) } func printstats(ts []int64) { sort.slice(ts, func(i, j int) bool { return ts[i] < ts[j] }) count := 0 to := ts[0] + 1000 for i := 0; i < len(ts); i++ { if ts[i] < to { count++ } else { fmt.printf(" %d: %d\n", to, count) i-- // push back the current item count = 0 to += 1000 } } if count > 0 { fmt.printf(" %d: %d\n", to, count) } }
Le résultat ressemble à ceci :
... 2023/03/25 21:03:30 GETS (total: 112): 1679749398998: 10 1679749399998: 10 1679749400998: 10 1679749401998: 10 1679749402998: 10 1679749403998: 10 1679749404998: 10 1679749405998: 10 1679749406998: 10 1679749407998: 10 1679749408998: 10 1679749409998: 2 2023/03/25 21:03:30 POSTS (total: 111): 1679749399079: 8 1679749400079: 8 1679749401079: 12 1679749402079: 8 1679749403079: 10 1679749404079: 9 1679749405079: 9 1679749406079: 8 1679749407079: 14 1679749408079: 12 1679749409079: 9 1679749410079: 4
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!