요즘 주요 미들웨어는 모두 롱 폴링 데이터 상호작용 방법을 사용하고 있으며, 현재 더 많이 사용되는 것은 Nacos Configuration Center, RocketMQ Pull(풀 모드) 메시지 등을 사용하여 구현됩니다. 긴 폴링 방법. 예를 들어, Nacos 구성 센터에서 서버는 어떻게 구성 변경을 감지하고 이를 실시간으로 클라이언트에 푸시할 수 있습니까?
롱 폴링이라고 하면 그 반대도 있겠네요. 일단은 쇼트 폴링이라고 부르겠습니다.
숏 폴링도 풀 모드입니다. 이는 서버 데이터의 업데이트 여부에 관계없이 클라이언트가 일정 기간마다 데이터 가져오기를 요청하면 업데이트된 데이터가 반환될 수도 있고 아무것도 없을 수도 있음을 의미합니다. 구성 센터에서 이 방법을 사용하면 다음과 같은 문제가 발생합니다.
구성 데이터가 자주 변경되지 않기 때문에 항상 요청이 전송되면 필연적으로 서버에 많은 부담을 주게 됩니다. 또한 데이터 푸시가 지연될 수 있습니다. 예를 들어 구성이 11초마다 업데이트되면 푸시가 9초 지연되어 다음 요청을 기다리게 됩니다.
균형을 맞출 수 없습니다. 푸시 지연과 서버 압력을 중화하세요. 폴링 간격을 낮추면 지연이 감소하고 압력이 증가합니다. 폴링 간격을 늘리면 압력이 감소하고 지연이 증가합니다.
Long Polling Short Polling 문제를 해결하기 위해 클라이언트는 Long Polling을 시작합니다. 서버의 데이터가 변경되지 않으면 서버의 데이터가 변경될 때까지 요청을 보류하거나 대기합니다. 일정 시간이 지나면 반환됩니다. 반환 후 클라이언트는 수신 대기를 위한 다음 긴 폴링 요청을 시작합니다.
이 디자인의 이점:AsyncContext 메커니즘입니다.
@Slf4j public class ConfigClientWorker { private final CloseableHttpClient httpClient; private final ScheduledExecutorService executorService; public ConfigClientWorker(String url, String dataId) { this.executorService = Executors.newSingleThreadScheduledExecutor(runnable -> { Thread thread = new Thread(runnable); thread.setName("client.worker.executor-%d"); thread.setDaemon(true); return thread; }); // ① httpClient 客户端超时时间要大于长轮询约定的超时时间 RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(40000).build(); this.httpClient = HttpClientBuilder.create().setDefaultRequestConfig(requestConfig).build(); executorService.execute(new LongPollingRunnable(url, dataId)); } class LongPollingRunnable implements Runnable { private final String url; private final String dataId; public LongPollingRunnable(String url, String dataId) { this.url = url; this.dataId = dataId; } @SneakyThrows @Override public void run() { String endpoint = url + "?dataId=" + dataId; log.info("endpoint: {}", endpoint); HttpGet request = new HttpGet(endpoint); CloseableHttpResponse response = httpClient.execute(request); switch (response.getStatusLine().getStatusCode()) { case 200: { BufferedReader rd = new BufferedReader(new InputStreamReader(response.getEntity() .getContent())); StringBuilder result = new StringBuilder(); String line; while ((line = rd.readLine()) != null) { result.append(line); } response.close(); String configInfo = result.toString(); log.info("dataId: [{}] changed, receive configInfo: {}", dataId, configInfo); break; } // ② 304 响应码标记配置未变更 case 304: { log.info("longPolling dataId: [{}] once finished, configInfo is unchanged, longPolling again", dataId); break; } default: { throw new RuntimeException("unExcepted HTTP status code"); } } executorService.execute(this); } } public static void main(String[] args) throws IOException { new ConfigClientWorker("http://127.0.0.1:8080/listener", "user"); System.in.read(); } }
@RestController @Slf4j @SpringBootApplication public class ConfigServer { @Data private static class AsyncTask { // 长轮询请求的上下文,包含请求和响应体 private AsyncContext asyncContext; // 超时标记 private boolean timeout; public AsyncTask(AsyncContext asyncContext, boolean timeout) { this.asyncContext = asyncContext; this.timeout = timeout; } } // guava 提供的多值 Map,一个 key 可以对应多个 value private Multimap<String, AsyncTask> dataIdContext = Multimaps.synchronizedSetMultimap(HashMultimap.create()); private ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("longPolling-timeout-checker-%d") .build(); private ScheduledExecutorService timeoutChecker = new ScheduledThreadPoolExecutor(1, threadFactory); // 配置监听接入点 @RequestMapping("/listener") public void addListener(HttpServletRequest request, HttpServletResponse response) { String dataId = request.getParameter("dataId"); // 开启异步!!! AsyncContext asyncContext = request.startAsync(request, response); AsyncTask asyncTask = new AsyncTask(asyncContext, true); // 维护 dataId 和异步请求上下文的关联 dataIdContext.put(dataId, asyncTask); // 启动定时器,30s 后写入 304 响应 timeoutChecker.schedule(() -> { if (asyncTask.isTimeout()) { dataIdContext.remove(dataId, asyncTask); response.setStatus(HttpServletResponse.SC_NOT_MODIFIED); // 标志此次异步线程完成结束!!! asyncContext.complete(); } }, 30000, TimeUnit.MILLISECONDS); } // 配置发布接入点 @RequestMapping("/publishConfig") @SneakyThrows public String publishConfig(String dataId, String configInfo) { log.info("publish configInfo dataId: [{}], configInfo: {}", dataId, configInfo); Collection<AsyncTask> asyncTasks = dataIdContext.removeAll(dataId); for (AsyncTask asyncTask : asyncTasks) { asyncTask.setTimeout(false); HttpServletResponse response = (HttpServletResponse)asyncTask.getAsyncContext().getResponse(); response.setStatus(HttpServletResponse.SC_OK); response.getWriter().println(configInfo); asyncTask.getAsyncContext().complete(); } return "success"; } public static void main(String[] args) { SpringApplication.run(ConfigServer.class, args); } }
request.startAsync(request, response);
를 시작하여 Tomcat 스레드가 사용되지 않는지 확인하세요. 이때 Tomcat 스레드가 릴리스됩니다. asyncContext.complete()
와 함께 사용됩니다. request.startAsync(request, response);
保证不占用Tomcat线程。此时Tomcat线程以及释放。配合asyncContext.complete()
使用。
dataIdContext.put(dataId, asyncTask);
会将 dataId 和异步请求上下文给关联起来,方便配置发布时,拿到对应的上下文
Multimap<string asynctask> dataIdContext</string>
它是一个多值 Map,一个 key 可以对应多个 value,你也可以理解为 Map<string>></string>
timeoutChecker.schedule()
dataIdContext.put(dataId, asyncTask);
는 dataId를 비동기 요청 컨텍스트와 연결하므로 구성 및 게시 시 해당 컨텍스트를 얻을 수 있습니다🎜🎜🎜🎜Multimap dataIdContext
다중 값 Map입니다. 하나의 키가 여러 값에 대응할 수도 있습니다. Map<string>></string>
🎜🎜 🎜🎜 timeoutChecker.schedule()
은 타이머를 시작하고 30초 후에 304 응답을 작성합니다🎜@RequestMapping("/publishConfig")
,配置发布的入口。配置变更后,根据 dataId 一次拿出所有的长轮询,为之写入变更的响应。
asyncTask.getAsyncContext().complete();
表示这次异步请求结束了。
启动配置监听
先启动 ConfigServer,再启动 ConfigClient。30s之后控制台打印第一次超时之后收到服务端304的状态码
16:41:14.824 [client.worker.executor-%d] INFO cn.haoxiaoyong.poll.ConfigClientWorker - longPolling dataId: [user] once finished, configInfo is unchanged, longPolling again
请求一下配置发布,请求localhost:8080/publishConfig?dataId=user&configInfo=helloworld
服务端打印日志:
2022-08-25 16:45:56.663 INFO 90650 --- [nio-8080-exec-2] cn.haoxiaoyong.poll.ConfigServer : publish configInfo dataId: [user], configInfo: helloworld
위 내용은 Java에서 간단한 긴 폴링을 구현하는 방법은 무엇입니까?의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!