>  기사  >  Java  >  Java에서 간단한 긴 폴링을 구현하는 방법은 무엇입니까?

Java에서 간단한 긴 폴링을 구현하는 방법은 무엇입니까?

王林
王林앞으로
2023-04-22 23:22:072283검색

롱 폴링 구현 방법 분석

요즘 주요 미들웨어는 모두 롱 폴링 데이터 상호작용 방법을 사용하고 있으며, 현재 더 많이 사용되는 것은 Nacos Configuration Center, RocketMQ Pull(풀 모드) 메시지 등을 사용하여 구현됩니다. 긴 폴링 방법. 예를 들어, Nacos 구성 센터에서 서버는 어떻게 구성 변경을 감지하고 이를 실시간으로 클라이언트에 푸시할 수 있습니까?

롱 폴링과 쇼트 폴링

롱 폴링이라고 하면 그 반대도 있겠네요. 일단은 쇼트 폴링이라고 부르겠습니다.

숏 폴링도 풀 모드입니다. 이는 서버 데이터의 업데이트 여부에 관계없이 클라이언트가 일정 기간마다 데이터 가져오기를 요청하면 업데이트된 데이터가 반환될 수도 있고 아무것도 없을 수도 있음을 의미합니다. 구성 센터에서 이 방법을 사용하면 다음과 같은 문제가 발생합니다.

구성 데이터가 자주 변경되지 않기 때문에 항상 요청이 전송되면 필연적으로 서버에 많은 부담을 주게 됩니다. 또한 데이터 푸시가 지연될 수 있습니다. 예를 들어 구성이 11초마다 업데이트되면 푸시가 9초 지연되어 다음 요청을 기다리게 됩니다.

균형을 맞출 수 없습니다. 푸시 지연과 서버 압력을 중화하세요. 폴링 간격을 낮추면 지연이 감소하고 압력이 증가합니다. 폴링 간격을 늘리면 압력이 감소하고 지연이 증가합니다.

Long Polling Short Polling 문제를 해결하기 위해 클라이언트는 Long Polling을 시작합니다. 서버의 데이터가 변경되지 않으면 서버의 데이터가 변경될 때까지 요청을 보류하거나 대기합니다. 일정 시간이 지나면 반환됩니다. 반환 후 클라이언트는 수신 대기를 위한 다음 긴 폴링 요청을 시작합니다.

이 디자인의 이점:

  • 낮은 대기 시간에 비해 클라이언트는 긴 폴링을 시작하고 서버는 데이터가 변경되었음을 감지한 후 즉시 클라이언트에 응답을 반환할 수 있습니다.

  • 서버에 대한 부담이 줄어들고 클라이언트는 긴 폴링을 시작합니다. 데이터가 변경되지 않은 경우 서버는 클라이언트의 요청을 보류하며 일반적으로 보류 요청 시간은 30초 또는 60초로 설정됩니다. 요청을 보류해도 서버 측 리소스가 너무 많이 소모되지는 않습니다.

다음 그림은 프로세스를 설명하는 데 사용됩니다.

Java에서 간단한 긴 폴링을 구현하는 방법은 무엇입니까?

  • 먼저 클라이언트가 긴 폴링 요청을 시작하고 서버가 클라이언트의 요청을 받습니다. 이때 클라이언트의 요청은 일시 중지됩니다. 서버 측에서 설계한 경우 30초 이내에 변경 사항이 없으면 서버는 클라이언트에 데이터가 변경되지 않았다고 응답하고 클라이언트는 계속해서 요청을 보냅니다.

  • 서비스 데이터가 30초 이내에 변경되면 서버는 변경된 데이터를 클라이언트에 푸시합니다.

구성 센터 긴 폴링 디자인

Java에서 간단한 긴 폴링을 구현하는 방법은 무엇입니까?

위의 전체 아이디어를 소개했습니다. 코드로 구현해 보겠습니다.

  • 먼저 클라이언트가 서버에 HTTP 요청을 보냅니다. 데이터 변경 사항이 없으면 현재 요청이 일시 중단됩니다(Tomcat에는 200개의 스레드만 있고 긴 폴링은 Tomcat의 비즈니스 스레드를 차단해서는 안 되므로 구성 센터는 긴 폴링을 구현할 때 비동기 응답을 사용하는 경우가 많습니다). 비동기 HTTP를 구현하는 더 편리한 방법은 Servlet3.0에서 제공하는

    AsyncContext 메커니즘입니다.

  • 서버가 설정한 제한 시간 내에 여전히 데이터 변경이 없으면 클라이언트에 변경 사항이 없다고 반환합니다. 심벌 마크. 예를 들어 304 상태 코드에 응답합니다.

  • 서버가 설정한 제한 시간 내에 데이터 변경이 있으면 클라이언트가 변경한 내용이 반환됩니다.

구성 센터 긴 폴링 구현

다음 코드는 긴 폴링을 구현하는 데 사용됩니다.

Clientimplementation

 @Slf4j
 public class ConfigClientWorker {
 
     private final CloseableHttpClient httpClient;
 
     private final ScheduledExecutorService executorService;
 
     public ConfigClientWorker(String url, String dataId) {
         this.executorService = Executors.newSingleThreadScheduledExecutor(runnable -> {
             Thread thread = new Thread(runnable);
             thread.setName("client.worker.executor-%d");
             thread.setDaemon(true);
             return thread;
         });
 
         // ① httpClient 客户端超时时间要大于长轮询约定的超时时间
         RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(40000).build();
         this.httpClient = HttpClientBuilder.create().setDefaultRequestConfig(requestConfig).build();
 
         executorService.execute(new LongPollingRunnable(url, dataId));
     }
 
     class LongPollingRunnable implements Runnable {
 
         private final String url;
         private final String dataId;
 
         public LongPollingRunnable(String url, String dataId) {
             this.url = url;
             this.dataId = dataId;
         }
 
         @SneakyThrows
         @Override
         public void run() {
             String endpoint = url + "?dataId=" + dataId;
             log.info("endpoint: {}", endpoint);
             HttpGet request = new HttpGet(endpoint);
             CloseableHttpResponse response = httpClient.execute(request);
             switch (response.getStatusLine().getStatusCode()) {
                 case 200: {
                     BufferedReader rd = new BufferedReader(new InputStreamReader(response.getEntity()
                             .getContent()));
                     StringBuilder result = new StringBuilder();
                     String line;
                     while ((line = rd.readLine()) != null) {
                         result.append(line);
                     }
                     response.close();
                     String configInfo = result.toString();
                     log.info("dataId: [{}] changed, receive configInfo: {}", dataId, configInfo);
                     break;
                 }
                 // ② 304 响应码标记配置未变更
                 case 304: {
                     log.info("longPolling dataId: [{}] once finished, configInfo is unchanged, longPolling again", dataId);
                     break;
                 }
                 default: {
                     throw new RuntimeException("unExcepted HTTP status code");
                 }
             }
             executorService.execute(this);
         }
     }
 
     public static void main(String[] args) throws IOException {
 
         new ConfigClientWorker("http://127.0.0.1:8080/listener", "user");
         System.in.read();
     }
 }

  • httpClient 클라이언트 시간 초과는 긴 폴링에서 합의된 시간 초과보다 커야 합니다. 그렇지 않으면 서버가 반환되기 전에 클라이언트 자체가 시간 초과됩니다.

  • 304 응답 코드 태그 구성은 변경되지 않았습니다.

  • http://127.0.0.1:8080/listener는 서버 주소입니다.

서버 구현

 @RestController
 @Slf4j
 @SpringBootApplication
 public class ConfigServer {
 
     @Data
     private static class AsyncTask {
         // 长轮询请求的上下文,包含请求和响应体
         private AsyncContext asyncContext;
         // 超时标记
         private boolean timeout;
 
         public AsyncTask(AsyncContext asyncContext, boolean timeout) {
             this.asyncContext = asyncContext;
             this.timeout = timeout;
         }
     }
 
     // guava 提供的多值 Map,一个 key 可以对应多个 value
     private Multimap<String, AsyncTask> dataIdContext = Multimaps.synchronizedSetMultimap(HashMultimap.create());
 
     private ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("longPolling-timeout-checker-%d")
             .build();
     private ScheduledExecutorService timeoutChecker = new ScheduledThreadPoolExecutor(1, threadFactory);
 
     // 配置监听接入点
     @RequestMapping("/listener")
     public void addListener(HttpServletRequest request, HttpServletResponse response) {
 
         String dataId = request.getParameter("dataId");
 
         // 开启异步!!!
         AsyncContext asyncContext = request.startAsync(request, response);
         AsyncTask asyncTask = new AsyncTask(asyncContext, true);
 
         // 维护 dataId 和异步请求上下文的关联
         dataIdContext.put(dataId, asyncTask);
 
         // 启动定时器,30s 后写入 304 响应
         timeoutChecker.schedule(() -> {
             if (asyncTask.isTimeout()) {
                 dataIdContext.remove(dataId, asyncTask);
                 response.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
               // 标志此次异步线程完成结束!!!
                 asyncContext.complete();
             }
         }, 30000, TimeUnit.MILLISECONDS);
     }
 
     // 配置发布接入点
     @RequestMapping("/publishConfig")
     @SneakyThrows
     public String publishConfig(String dataId, String configInfo) {
         log.info("publish configInfo dataId: [{}], configInfo: {}", dataId, configInfo);
         Collection<AsyncTask> asyncTasks = dataIdContext.removeAll(dataId);
         for (AsyncTask asyncTask : asyncTasks) {
             asyncTask.setTimeout(false);
             HttpServletResponse response = (HttpServletResponse)asyncTask.getAsyncContext().getResponse();
             response.setStatus(HttpServletResponse.SC_OK);
             response.getWriter().println(configInfo);
             asyncTask.getAsyncContext().complete();
         }
         return "success";
     }
 
     public static void main(String[] args) {
         SpringApplication.run(ConfigServer.class, args);
     }
 }

  • 클라이언트가 먼저 요청합니다. 비동기 스레드 request.startAsync(request, response);를 시작하여 Tomcat 스레드가 사용되지 않는지 확인하세요. 이때 Tomcat 스레드가 릴리스됩니다. asyncContext.complete()와 함께 사용됩니다.

    request.startAsync(request, response);保证不占用Tomcat线程。此时Tomcat线程以及释放。配合asyncContext.complete()使用。

  • dataIdContext.put(dataId, asyncTask);会将 dataId 和异步请求上下文给关联起来,方便配置发布时,拿到对应的上下文

  • Multimap<string asynctask> dataIdContext</string>它是一个多值 Map,一个 key 可以对应多个 value,你也可以理解为 Map<string>></string>

  • timeoutChecker.schedule()

    🎜dataIdContext.put(dataId, asyncTask);는 dataId를 비동기 요청 컨텍스트와 연결하므로 구성 및 게시 시 해당 컨텍스트를 얻을 수 있습니다🎜🎜🎜🎜Multimap dataIdContext다중 값 Map입니다. 하나의 키가 여러 값에 대응할 수도 있습니다. Map<string>></string>🎜🎜 🎜🎜 timeoutChecker.schedule()은 타이머를 시작하고 30초 후에 304 응답을 작성합니다🎜
  • @RequestMapping("/publishConfig") ,配置发布的入口。配置变更后,根据 dataId 一次拿出所有的长轮询,为之写入变更的响应。

  • asyncTask.getAsyncContext().complete();表示这次异步请求结束了。

启动配置监听

先启动 ConfigServer,再启动 ConfigClient。30s之后控制台打印第一次超时之后收到服务端304的状态码

 16:41:14.824 [client.worker.executor-%d] INFO cn.haoxiaoyong.poll.ConfigClientWorker - longPolling dataId: [user] once finished, configInfo is unchanged, longPolling again

请求一下配置发布,请求localhost:8080/publishConfig?dataId=user&configInfo=helloworld

服务端打印日志:

 2022-08-25 16:45:56.663  INFO 90650 --- [nio-8080-exec-2] cn.haoxiaoyong.poll.ConfigServer         : publish configInfo dataId: [user], configInfo: helloworld

위 내용은 Java에서 간단한 긴 폴링을 구현하는 방법은 무엇입니까?의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 yisu.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제