
How to implement simple long polling in Java?

王林
2023-04-22 23:22:07

Analyze the implementation method of long polling

Many mainstream middleware products now exchange data through long polling. Popular examples include the Nacos configuration center and RocketMQ's pull-mode message consumption, both of which are built on long polling. Take the Nacos configuration center as an example: how does the server detect a configuration change and deliver it to the client in real time?

Long polling and short polling

Long polling has a counterpart, which we will call short polling here. A brief introduction to short polling first:

Short polling is also a pull model: the client requests data at a fixed interval, regardless of whether the data on the server has been updated. A request may return updated data, or nothing at all. If a configuration center used this approach, it would run into the following problems:

Configuration data does not change often, so sending requests constantly puts unnecessary pressure on the server. It also delays the delivery of changes. For example, if the configuration is requested every 10 seconds and it is updated at the 11th second, delivery is delayed by 9 seconds while waiting for the next request;

There is no way to balance delivery delay against server pressure. Shortening the polling interval reduces the delay but increases the pressure; lengthening the interval reduces the pressure but increases the delay.
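The trade-off can be made concrete with a little arithmetic. The sketch below is illustrative only: the class and method names are hypothetical, and it assumes that polls fire at exact multiples of the interval, starting from second 0.

```java
class ShortPollingDelay {

    /** Seconds between an update and the next fixed-interval poll (polls at 0, interval, 2 * interval, ...). */
    static long delaySeconds(long intervalSeconds, long updateAtSecond) {
        long remainder = updateAtSecond % intervalSeconds;
        return remainder == 0 ? 0 : intervalSeconds - remainder;
    }

    /** Requests a single client sends per hour, whether or not anything changed. */
    static long requestsPerHour(long intervalSeconds) {
        return 3600 / intervalSeconds;
    }

    public static void main(String[] args) {
        // The example from the text: poll every 10 s, update at second 11.
        System.out.println(delaySeconds(10, 11));  // 9
        System.out.println(requestsPerHour(10));   // 360
        // A longer interval lowers the request rate but raises the delay.
        System.out.println(delaySeconds(60, 11));  // 49
        System.out.println(requestsPerHour(60));   // 60
    }
}
```

Whichever interval is chosen, one of the two numbers gets worse, which is the problem long polling is meant to address.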

Long polling: to solve the problems of short polling, the client initiates a long-polling request. If the data on the server has not changed, the server holds the request until the data changes or a timeout elapses, and only then responds. After the response returns, the client issues the next long-polling request and continues to listen.

The benefits of this design:

  • Lower latency than short polling: once the client has issued a long poll, the server can respond to the client immediately after it detects a data change.

  • Lower pressure on the server: when the data has not changed, the server holds the client's request, typically for 30s or 60s, and holding a request does not consume many server resources.

The process works as follows:

  • First, the client initiates a long-polling request. When the server receives the request, it suspends it. If nothing changes within the 30s hold time configured on the server, the server tells the client that the data has not changed, and the client sends the next request.

  • If the data on the server changes within the 30s, the server pushes the changed data to the client.
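The hold-until-change-or-timeout behaviour described above can be modelled in a few lines without any HTTP at all. The following is a minimal sketch, not the implementation used later in this article: HeldRequest, publish and await are hypothetical names, and it assumes Java 9 or later for CompletableFuture.completeOnTimeout.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class HeldRequest {

    /** Marker returned when the hold time elapses without a change. */
    static final String UNCHANGED = "NOT_MODIFIED";

    private final CompletableFuture<String> result = new CompletableFuture<>();

    /** Starts holding; if nothing is published within holdMillis, the result becomes UNCHANGED. */
    HeldRequest(long holdMillis) {
        result.completeOnTimeout(UNCHANGED, holdMillis, TimeUnit.MILLISECONDS);
    }

    /** Called when the data changes; returns false if the request was already answered. */
    boolean publish(String newData) {
        return result.complete(newData);
    }

    /** Blocks until the request is answered, either by a change or by the timeout. */
    String await() {
        return result.join();
    }
}
```

A request created with `new HeldRequest(30_000)` would return UNCHANGED after 30s, or the published data as soon as `publish` is called, whichever comes first.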

Configuration center long polling design


With the overall idea covered, let's implement it in code:

  • First, the client sends an HTTP request to the server, and the server starts asynchronous processing for it. If no data has changed, the request is suspended. (Tomcat has 200 worker threads by default, and a long poll should not block Tomcat's business threads, so configuration centers usually respond asynchronously when implementing long polling. A convenient way to implement asynchronous HTTP is the AsyncContext mechanism provided by Servlet 3.0.)

  • If the data still has not changed when the server-side timeout expires, the server returns an "unchanged" marker to the client, for example a 304 status code;

  • If the data changes within the server-side timeout, the changed content is returned to the client.
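Why asynchronous handling matters can be shown with a small in-process simulation. This is only a sketch under stated assumptions: AsyncHoldDemo and its methods are hypothetical, a fixed thread pool stands in for Tomcat's worker threads, and a CompletableFuture stands in for a suspended HTTP response. It does not use AsyncContext itself.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class AsyncHoldDemo {

    // Pending long polls; each future stands in for a suspended response.
    private final List<CompletableFuture<String>> pending = new ArrayList<>();

    /** What a worker thread does: register the request and return immediately. */
    synchronized CompletableFuture<String> handle() {
        CompletableFuture<String> held = new CompletableFuture<>();
        pending.add(held);
        return held;
    }

    synchronized int pendingCount() {
        return pending.size();
    }

    /** A change answers every held request at once. */
    synchronized void publish(String config) {
        for (CompletableFuture<String> held : pending) {
            held.complete(config);
        }
        pending.clear();
    }

    /** Accepts `requests` long polls using only `workers` threads. */
    static List<CompletableFuture<String>> acceptAll(AsyncHoldDemo server, int workers, int requests) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            Callable<CompletableFuture<String>> handler = server::handle;
            List<CompletableFuture<String>> held = new ArrayList<>();
            // invokeAll returns only after every handler call has finished.
            for (Future<CompletableFuture<String>> f : pool.invokeAll(Collections.nCopies(requests, handler))) {
                held.add(f.get());
            }
            return held;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Because each handler returns as soon as the request is registered, `acceptAll(server, 2, 10)` finishes with ten requests still being held even though only two threads exist. Had each handler slept in its thread until a change arrived, two threads could hold at most two requests.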

Configuration center long polling implementation

The following code is used to implement long polling:

Client implementation

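 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.http.client.config.RequestConfig;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClientBuilder;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
 @Slf4j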
 public class ConfigClientWorker {
 
     private final CloseableHttpClient httpClient;
 
     private final ScheduledExecutorService executorService;
 
     public ConfigClientWorker(String url, String dataId) {
         this.executorService = Executors.newSingleThreadScheduledExecutor(runnable -> {
             Thread thread = new Thread(runnable);
             thread.setName("client.worker.executor-%d");
             thread.setDaemon(true);
             return thread;
         });
 
         // ① The httpClient timeout must be greater than the timeout agreed for long polling
         RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(40000).build();
         this.httpClient = HttpClientBuilder.create().setDefaultRequestConfig(requestConfig).build();
 
         executorService.execute(new LongPollingRunnable(url, dataId));
     }
 
     class LongPollingRunnable implements Runnable {
 
         private final String url;
         private final String dataId;
 
         public LongPollingRunnable(String url, String dataId) {
             this.url = url;
             this.dataId = dataId;
         }
 
         @SneakyThrows
         @Override
         public void run() {
             String endpoint = url + "?dataId=" + dataId;
             log.info("endpoint: {}", endpoint);
             HttpGet request = new HttpGet(endpoint);
             // try-with-resources closes the response on every path, including 304
             try (CloseableHttpResponse response = httpClient.execute(request)) {
                 switch (response.getStatusLine().getStatusCode()) {
                     case 200: {
                         BufferedReader rd = new BufferedReader(new InputStreamReader(response.getEntity()
                                 .getContent()));
                         StringBuilder result = new StringBuilder();
                         String line;
                         while ((line = rd.readLine()) != null) {
                             result.append(line);
                         }
                         String configInfo = result.toString();
                         log.info("dataId: [{}] changed, receive configInfo: {}", dataId, configInfo);
                         break;
                     }
                     // ② A 304 status code indicates that the configuration has not changed
                     case 304: {
                         log.info("longPolling dataId: [{}] once finished, configInfo is unchanged, longPolling again", dataId);
                         break;
                     }
                     default: {
                         throw new RuntimeException("unexpected HTTP status code");
                     }
                 }
             }
             executorService.execute(this);
         }
     }
 
     public static void main(String[] args) throws IOException {
 
         new ConfigClientWorker("http://127.0.0.1:8080/listener", "user");
         System.in.read();
     }
 }
  • The httpClient timeout must be greater than the timeout agreed for long polling; otherwise the client times out before the server responds.

  • A 304 response code indicates that the configuration has not changed;

  • http://127.0.0.1:8080/listener is the server address;
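The timeout rule in the first point is not specific to Apache HttpClient. As a hedged illustration, here is the same request built with the JDK's own java.net.http client (Java 11 or later) instead of the library used in the listing. LongPollRequests, SERVER_HOLD, CLIENT_TIMEOUT and describe are hypothetical names; the 30s and 40s values mirror the ones used in this article.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.time.Duration;

class LongPollRequests {

    // Assumed values: the server holds for 30 s, so the client allows 40 s.
    static final Duration SERVER_HOLD = Duration.ofSeconds(30);
    static final Duration CLIENT_TIMEOUT = Duration.ofSeconds(40);

    /** Builds the listener request; the dataId is URL-encoded before it goes into the query string. */
    static HttpRequest listenRequest(String url, String dataId) {
        String query = "?dataId=" + URLEncoder.encode(dataId, StandardCharsets.UTF_8);
        return HttpRequest.newBuilder(URI.create(url + query))
                .timeout(CLIENT_TIMEOUT)
                .GET()
                .build();
    }

    /** Maps a response to a short description: 200 means changed, 304 means unchanged. */
    static String describe(int statusCode, String body) {
        switch (statusCode) {
            case 200:
                return "changed: " + body.trim();
            case 304:
                return "unchanged";
            default:
                throw new IllegalStateException("unexpected HTTP status code " + statusCode);
        }
    }

    /** Performs one long poll; a caller would invoke this in a loop. */
    static String pollOnce(HttpClient client, String url, String dataId)
            throws IOException, InterruptedException {
        HttpResponse<String> response =
                client.send(listenRequest(url, dataId), HttpResponse.BodyHandlers.ofString());
        return describe(response.statusCode(), response.body());
    }
}
```

With a 40s client timeout and a 30s server hold, a 304 always arrives before the client gives up. If the two values were swapped, the client would see a timeout exception rather than a 304.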

Server-side implementation

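 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Multimaps;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import lombok.Data;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
 
 // javax.servlet applies to Spring Boot 2.x; Spring Boot 3.x uses jakarta.servlet instead
 import javax.servlet.AsyncContext;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import java.util.Collection;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
 @RestController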
 @Slf4j
 @SpringBootApplication
 public class ConfigServer {
 
     @Data
     private static class AsyncTask {
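         // Context of the long-polling request, containing the request and the response
         private AsyncContext asyncContext;
         // Timeout flag; written by the publishing thread and read by the timer thread, hence volatile
         private volatile boolean timeout;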
 
         public AsyncTask(AsyncContext asyncContext, boolean timeout) {
             this.asyncContext = asyncContext;
             this.timeout = timeout;
         }
     }
 
     // Multi-value Map provided by Guava: one key can map to multiple values
     private Multimap<String, AsyncTask> dataIdContext = Multimaps.synchronizedSetMultimap(HashMultimap.create());
 
     private ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("longPolling-timeout-checker-%d")
             .build();
     private ScheduledExecutorService timeoutChecker = new ScheduledThreadPoolExecutor(1, threadFactory);
 
     // Entry point for configuration listeners
     @RequestMapping("/listener")
     public void addListener(HttpServletRequest request, HttpServletResponse response) {
 
         String dataId = request.getParameter("dataId");
 
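         // Start asynchronous processing!
         AsyncContext asyncContext = request.startAsync(request, response);
         AsyncTask asyncTask = new AsyncTask(asyncContext, true);
 
         // Maintain the association between the dataId and the asynchronous request context
         dataIdContext.put(dataId, asyncTask);
 
         // Start a timer that writes a 304 response after 30s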
         timeoutChecker.schedule(() -> {
             if (asyncTask.isTimeout()) {
                 dataIdContext.remove(dataId, asyncTask);
                 response.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
                 // Mark this asynchronous request as complete!
                 asyncContext.complete();
             }
         }, 30000, TimeUnit.MILLISECONDS);
     }
 
     // Entry point for publishing a configuration
     @RequestMapping("/publishConfig")
     @SneakyThrows
     public String publishConfig(String dataId, String configInfo) {
         log.info("publish configInfo dataId: [{}], configInfo: {}", dataId, configInfo);
         Collection<AsyncTask> asyncTasks = dataIdContext.removeAll(dataId);
         for (AsyncTask asyncTask : asyncTasks) {
             asyncTask.setTimeout(false);
             HttpServletResponse response = (HttpServletResponse)asyncTask.getAsyncContext().getResponse();
             response.setStatus(HttpServletResponse.SC_OK);
             response.getWriter().println(configInfo);
             asyncTask.getAsyncContext().complete();
         }
         return "success";
     }
 
     public static void main(String[] args) {
         SpringApplication.run(ConfigServer.class, args);
     }
 }
  • When a client request arrives, asynchronous processing is started first with request.startAsync(request, response); this releases the Tomcat thread so that it is not occupied while the request is held. It is used together with asyncContext.complete().

  • dataIdContext.put(dataId, asyncTask); associates the dataId with the asynchronous request context, so that the corresponding context can be retrieved when a configuration is published.

  • Multimap dataIdContext is a multi-value Map in which one key can correspond to multiple values. You can think of it as a Map whose values are collections.

  • timeoutChecker.schedule() starts a timer that writes a 304 response after 30s.

  • @RequestMapping("/publishConfig") is the entry point for publishing a configuration. After a configuration change, all long polls for the dataId are taken out at once and the changed configuration is written to their responses.

  • asyncTask.getAsyncContext().complete(); marks the end of this asynchronous request.
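In the listing above, the timer and publishConfig coordinate through a boolean flag on AsyncTask. One possible alternative, sketched here without Spring or the Servlet API, is to let a CompletableFuture decide which side answers, because its complete() call can succeed only once. Everything in this sketch is an assumption made for illustration: LongPollRegistry, Reply, listen and publish are hypothetical names, and the future stands in for the AsyncContext.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class LongPollRegistry {

    /** Hypothetical reply type: an HTTP-like status code plus a body. */
    static final class Reply {
        final int status;
        final String body;

        Reply(int status, String body) {
            this.status = status;
            this.body = body;
        }
    }

    // dataId -> held requests, playing the role of dataIdContext.
    private final Map<String, Set<CompletableFuture<Reply>>> waiters = new HashMap<>();
    private final long holdMillis;

    LongPollRegistry(long holdMillis) {
        this.holdMillis = holdMillis;
    }

    /** Registers a listener; the future completes with 200 on publish or 304 on timeout. */
    synchronized CompletableFuture<Reply> listen(String dataId) {
        CompletableFuture<Reply> held = new CompletableFuture<>();
        waiters.computeIfAbsent(dataId, k -> new HashSet<>()).add(held);
        held.completeOnTimeout(new Reply(304, ""), holdMillis, TimeUnit.MILLISECONDS);
        // Once answered (either way), drop the request from the registry.
        held.whenComplete((reply, error) -> forget(dataId, held));
        return held;
    }

    /** Answers every listener of dataId and returns how many were still waiting. */
    synchronized int publish(String dataId, String configInfo) {
        Set<CompletableFuture<Reply>> held = waiters.remove(dataId);
        if (held == null) {
            return 0;
        }
        int answered = 0;
        for (CompletableFuture<Reply> request : held) {
            // complete() returns false if the timeout already answered this request.
            if (request.complete(new Reply(200, configInfo))) {
                answered++;
            }
        }
        return answered;
    }

    private synchronized void forget(String dataId, CompletableFuture<Reply> held) {
        Set<CompletableFuture<Reply>> set = waiters.get(dataId);
        if (set != null && set.remove(held) && set.isEmpty()) {
            waiters.remove(dataId);
        }
    }
}
```

With this shape, a request can never receive both a 304 and a 200: whichever of the timeout and publish reaches the future first wins, and the other attempt is ignored.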

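Start configuration listening

Start ConfigServer first, then the client, ConfigClientWorker. After 30s, the client console shows that the first long poll timed out and a 304 status code was received from the server: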

 16:41:14.824 [client.worker.executor-%d] INFO cn.haoxiaoyong.poll.ConfigClientWorker - longPolling dataId: [user] once finished, configInfo is unchanged, longPolling again

Next, publish a configuration by requesting localhost:8080/publishConfig?dataId=user&configInfo=helloworld

The server prints the following log:

 2022-08-25 16:45:56.663  INFO 90650 --- [nio-8080-exec-2] cn.haoxiaoyong.poll.ConfigServer         : publish configInfo dataId: [user], configInfo: helloworld


Statement:
This article is reproduced from yisu.com. If there is any infringement, please contact admin@php.cn to have it deleted.