Maison  >  Article  >  Java  >  Comment implémenter une interrogation longue et simple en Java ?

Comment implémenter une interrogation longue et simple en Java ?

王林
王林avant
2023-04-22 23:22:072283parcourir

Analyser la méthode d'implémentation de l'interrogation longue

De nos jours, tous les principaux middlewares utilisent la méthode d'interaction des données d'interrogation longue. Actuellement, les plus populaires sont le centre de configuration Nacos, les messages RocketMQ Pull (mode pull), etc., ils utilisent tous Implémenté en utilisant. méthode de sondage longue. Par exemple, dans le centre de configuration Nacos, comment le serveur peut-il détecter les modifications de configuration et les transmettre au client en temps réel ?

Sondage long et sondage court

En parlant de sondage long, il doit y avoir quelque chose de contraire. Appelons-le sondage court pour l'instant. Présentons brièvement le sondage court :

Sondage court aussi le mode pull. Cela signifie que, que les données du serveur soient mises à jour ou non, le client demande d'extraire les données à chaque période de temps fixe. Des données mises à jour peuvent être renvoyées, ou rien. Si le centre de configuration utilise cette méthode, il y aura les problèmes suivants :

Étant donné que les données de configuration ne changent pas fréquemment, si des requêtes sont envoyées tout le temps, cela mettra inévitablement beaucoup de pression sur le serveur. Cela entraînera également des retards dans la transmission des données, par exemple : une configuration est demandée toutes les 10 secondes. Si la configuration est mise à jour à la 11ème seconde, la transmission sera retardée de 9 secondes, en attendant la prochaine demande

ne peut pas être équilibrée ; délai de poussée et pression du serveur neutraliser entre . Réduisez l'intervalle d'interrogation, le délai diminuera et la pression augmentera ; augmentez l'intervalle d'interrogation, la pression diminuera et le délai augmentera.

Interrogation longueAfin de résoudre le problème de l'interrogation courte, le client lance une interrogation longue si les données sur le serveur ne changent pas, la demande sera conservée jusqu'à ce que les données sur le serveur changent, ou il attend une interrogation longue. une certaine période de temps pour revenir. Après son retour, le client lance la prochaine longue demande d'interrogation pour écouter.

Avantages de cette conception :

  • Par rapport à une faible latence, le client lance une longue interrogation et le serveur peut immédiatement renvoyer une réponse au client après avoir détecté que les données ont changé.

  • La pression sur le serveur est réduite et le client lance une longue interrogation. Si les données n'ont pas changé, le serveur conservera la demande du client. Le temps de demande de mise en attente est généralement fixé à 30 s ou 60 s. La conservation de la requête ne consommera pas trop de ressources côté serveur.

Les images suivantes sont utilisées pour illustrer le processus :

Comment implémenter une interrogation longue et simple en Java ?

  • Tout d'abord, le client lance une longue demande d'interrogation et le serveur reçoit la demande du client. À ce moment, la demande du client sera suspendue. S'il est conçu côté serveur, s'il n'y a aucun changement dans les 30 secondes, le serveur répondra au client que les données n'ont pas changé et le client continuera à envoyer des requêtes.

  • Si les données du service changent dans les 30 secondes, le serveur transmettra les données modifiées au client.

Conception d'interrogation longue du centre de configuration

Comment implémenter une interrogation longue et simple en Java ?

Nous avons présenté toute l'idée ci-dessus, implémentons-la avec du code :

  • Tout d'abord, le client envoie une requête HTTP au serveur, le serveur ouvrira un fichier asynchrone ; threads, s'il n'y a aucune modification de données, la requête en cours sera suspendue (un Tomcat n'a que 200 threads, et une interrogation longue ne devrait pas bloquer les threads métier de Tomcat, donc le centre de configuration utilise souvent des réponses asynchrones lors de la mise en œuvre d'une interrogation longue) Pour implémenter, et le moyen le plus pratique d'implémenter HTTP asynchrone est le Mécanisme AsyncContext fourni par Servlet3.0)

  • S'il n'y a toujours pas de changement de données dans le délai d'expiration défini par le serveur, renvoyez au client qu'il n'y a pas de changement .logo. Par exemple, répondez au code d'état 304 ;

  • S'il y a une modification des données dans le délai d'attente défini par le serveur, le contenu modifié par le client sera renvoyé

Mise en œuvre de l'interrogation longue du centre de configuration

 ; Le code suivant est utilisé pour implémenter une interrogation longue :

Implémentation du client

 @Slf4j
 public class ConfigClientWorker {
 
     private final CloseableHttpClient httpClient;
 
     private final ScheduledExecutorService executorService;
 
     public ConfigClientWorker(String url, String dataId) {
         this.executorService = Executors.newSingleThreadScheduledExecutor(runnable -> {
             Thread thread = new Thread(runnable);
             thread.setName("client.worker.executor-%d");
             thread.setDaemon(true);
             return thread;
         });
 
         // ① httpClient 客户端超时时间要大于长轮询约定的超时时间
         RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(40000).build();
         this.httpClient = HttpClientBuilder.create().setDefaultRequestConfig(requestConfig).build();
 
         executorService.execute(new LongPollingRunnable(url, dataId));
     }
 
     class LongPollingRunnable implements Runnable {
 
         private final String url;
         private final String dataId;
 
         public LongPollingRunnable(String url, String dataId) {
             this.url = url;
             this.dataId = dataId;
         }
 
         @SneakyThrows
         @Override
         public void run() {
             String endpoint = url + "?dataId=" + dataId;
             log.info("endpoint: {}", endpoint);
             HttpGet request = new HttpGet(endpoint);
             CloseableHttpResponse response = httpClient.execute(request);
             switch (response.getStatusLine().getStatusCode()) {
                 case 200: {
                     BufferedReader rd = new BufferedReader(new InputStreamReader(response.getEntity()
                             .getContent()));
                     StringBuilder result = new StringBuilder();
                     String line;
                     while ((line = rd.readLine()) != null) {
                         result.append(line);
                     }
                     response.close();
                     String configInfo = result.toString();
                     log.info("dataId: [{}] changed, receive configInfo: {}", dataId, configInfo);
                     break;
                 }
                 // ② 304 响应码标记配置未变更
                 case 304: {
                     log.info("longPolling dataId: [{}] once finished, configInfo is unchanged, longPolling again", dataId);
                     break;
                 }
                 default: {
                     throw new RuntimeException("unExcepted HTTP status code");
                 }
             }
             executorService.execute(this);
         }
     }
 
     public static void main(String[] args) throws IOException {
 
         new ConfigClientWorker("http://127.0.0.1:8080/listener", "user");
         System.in.read();
     }
 }
  • httpClient Le délai d'expiration du client doit être supérieur au délai d'expiration convenu par l'interrogation longue, sinon le client lui-même expirera avant le retour du serveur.

  • La configuration de la balise du code de réponse 304 n'a pas changé ;

  • http://127.0.0.1:8080/listener est l'adresse du serveur

Implémentation du serveur

 @RestController
 @Slf4j
 @SpringBootApplication
 public class ConfigServer {
 
     @Data
     private static class AsyncTask {
         // 长轮询请求的上下文,包含请求和响应体
         private AsyncContext asyncContext;
         // 超时标记
         private boolean timeout;
 
         public AsyncTask(AsyncContext asyncContext, boolean timeout) {
             this.asyncContext = asyncContext;
             this.timeout = timeout;
         }
     }
 
     // guava 提供的多值 Map,一个 key 可以对应多个 value
     private Multimap<String, AsyncTask> dataIdContext = Multimaps.synchronizedSetMultimap(HashMultimap.create());
 
     private ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("longPolling-timeout-checker-%d")
             .build();
     private ScheduledExecutorService timeoutChecker = new ScheduledThreadPoolExecutor(1, threadFactory);
 
     // 配置监听接入点
     @RequestMapping("/listener")
     public void addListener(HttpServletRequest request, HttpServletResponse response) {
 
         String dataId = request.getParameter("dataId");
 
         // 开启异步!!!
         AsyncContext asyncContext = request.startAsync(request, response);
         AsyncTask asyncTask = new AsyncTask(asyncContext, true);
 
         // 维护 dataId 和异步请求上下文的关联
         dataIdContext.put(dataId, asyncTask);
 
         // 启动定时器,30s 后写入 304 响应
         timeoutChecker.schedule(() -> {
             if (asyncTask.isTimeout()) {
                 dataIdContext.remove(dataId, asyncTask);
                 response.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
               // 标志此次异步线程完成结束!!!
                 asyncContext.complete();
             }
         }, 30000, TimeUnit.MILLISECONDS);
     }
 
     // 配置发布接入点
     @RequestMapping("/publishConfig")
     @SneakyThrows
     public String publishConfig(String dataId, String configInfo) {
         log.info("publish configInfo dataId: [{}], configInfo: {}", dataId, configInfo);
         Collection<AsyncTask> asyncTasks = dataIdContext.removeAll(dataId);
         for (AsyncTask asyncTask : asyncTasks) {
             asyncTask.setTimeout(false);
             HttpServletResponse response = (HttpServletResponse)asyncTask.getAsyncContext().getResponse();
             response.setStatus(HttpServletResponse.SC_OK);
             response.getWriter().println(configInfo);
             asyncTask.getAsyncContext().complete();
         }
         return "success";
     }
 
     public static void main(String[] args) {
         SpringApplication.run(ConfigServer.class, args);
     }
 }
  • Le client la demande en premier ; Démarrez un thread asynchrone request.startAsync(request, réponse); pour vous assurer que le thread Tomcat n'est pas occupé. A ce moment, le thread Tomcat est publié. Utilisé avec asyncContext.complete(). request.startAsync(request, response);保证不占用Tomcat线程。此时Tomcat线程以及释放。配合asyncContext.complete()使用。

  • dataIdContext.put(dataId, asyncTask);会将 dataId 和异步请求上下文给关联起来,方便配置发布时,拿到对应的上下文

  • Multimap<string asynctask> dataIdContext</string>它是一个多值 Map,一个 key 可以对应多个 value,你也可以理解为 Map<string>></string>

  • timeoutChecker.schedule()

    🎜🎜dataIdContext.put(dataId, asyncTask); associera dataId au contexte de requête asynchrone, afin que vous puissiez obtenir le contexte correspondant lors de la configuration et de la publication🎜🎜🎜🎜Multimap dataIdContextC'est une carte à valeurs multiples. Une clé peut correspondre à plusieurs valeurs. Vous pouvez également la comprendre comme Map<string>></string>🎜🎜. 🎜🎜 timeoutChecker.schedule() démarre le minuteur et écrit une réponse 304 après 30 secondes🎜
  • @RequestMapping("/publishConfig") ,配置发布的入口。配置变更后,根据 dataId 一次拿出所有的长轮询,为之写入变更的响应。

  • asyncTask.getAsyncContext().complete();表示这次异步请求结束了。

启动配置监听

先启动 ConfigServer,再启动 ConfigClient。30s之后控制台打印第一次超时之后收到服务端304的状态码

 16:41:14.824 [client.worker.executor-%d] INFO cn.haoxiaoyong.poll.ConfigClientWorker - longPolling dataId: [user] once finished, configInfo is unchanged, longPolling again

请求一下配置发布,请求localhost:8080/publishConfig?dataId=user&configInfo=helloworld

服务端打印日志:

 2022-08-25 16:45:56.663  INFO 90650 --- [nio-8080-exec-2] cn.haoxiaoyong.poll.ConfigServer         : publish configInfo dataId: [user], configInfo: helloworld

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer