現在、主要なミドルウェアはすべてロング ポーリング データ インタラクション方式を採用していますが、現在よく使われているのは Nacos コンフィギュレーション センターや RocketMQ Pull (プル モード) メッセージなどです。 、それらはすべてロングポーリング方式を使用して実装されます。たとえば、Nacos 構成センターでは、サーバーはどのようにして構成の変更を感知し、リアルタイムでクライアントにプッシュできるのでしょうか?
ロング ポーリングと言えば、その逆もあるはずです。ここではショート ポーリングと呼びましょう。ショート ポーリングについて簡単に紹介します。
ショートポーリングもプルモードです。これは、サーバーのデータが更新されているかどうかに関係なく、クライアントは一定期間ごとにデータのプルを要求することを意味し、更新されたデータが返される場合もあれば、何も返されない場合もあります。構成センターがこの方法を使用すると、次のような問題が発生します。
構成データは頻繁には変更されないため、常にリクエストが送信されると、必然的にサーバーに大きな負荷がかかります。また、データのプッシュにも遅延が発生します (例: 構成は 10 秒ごとに要求されます。構成が 11 秒目に更新される場合、プッシュは 9 秒遅れ、次の要求を待機します。
#)## は、サーバーの圧力と 2 つの間のプッシュ Neutralize で遅延することはできません。ポーリング間隔を短くすると、遅延が減少し、圧力が増加します。ポーリング間隔を増加すると、圧力が減少し、遅延が増加します。
ロング ポーリングショート ポーリングの問題を解決するために、クライアントはロング ポーリングを開始します。サーバー上のデータが変更されない場合、リクエストはサーバー上のデータが変更されるまで保留されます。サーバーが変更されるか、一定時間待ってから戻ってください。戻った後、クライアントはリッスンする次のロング ポーリング リクエストを開始します。
この設計の利点:アイデア全体を上で紹介しました。コードで実装してみましょう:
AsyncContext メカニズムです。)
@Slf4j public class ConfigClientWorker { private final CloseableHttpClient httpClient; private final ScheduledExecutorService executorService; public ConfigClientWorker(String url, String dataId) { this.executorService = Executors.newSingleThreadScheduledExecutor(runnable -> { Thread thread = new Thread(runnable); thread.setName("client.worker.executor-%d"); thread.setDaemon(true); return thread; }); // ① httpClient 客户端超时时间要大于长轮询约定的超时时间 RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(40000).build(); this.httpClient = HttpClientBuilder.create().setDefaultRequestConfig(requestConfig).build(); executorService.execute(new LongPollingRunnable(url, dataId)); } class LongPollingRunnable implements Runnable { private final String url; private final String dataId; public LongPollingRunnable(String url, String dataId) { this.url = url; this.dataId = dataId; } @SneakyThrows @Override public void run() { String endpoint = url + "?dataId=" + dataId; log.info("endpoint: {}", endpoint); HttpGet request = new HttpGet(endpoint); CloseableHttpResponse response = httpClient.execute(request); switch (response.getStatusLine().getStatusCode()) { case 200: { BufferedReader rd = new BufferedReader(new InputStreamReader(response.getEntity() .getContent())); StringBuilder result = new StringBuilder(); String line; while ((line = rd.readLine()) != null) { result.append(line); } response.close(); String configInfo = result.toString(); log.info("dataId: [{}] changed, receive configInfo: {}", dataId, configInfo); break; } // ② 304 响应码标记配置未变更 case 304: { log.info("longPolling dataId: [{}] once finished, configInfo is unchanged, longPolling again", dataId); break; } default: { throw new RuntimeException("unExcepted HTTP status code"); } } executorService.execute(this); } } public static void main(String[] args) throws IOException { new ConfigClientWorker("http://127.0.0.1:8080/listener", "user"); System.in.read(); } }
304 応答コード タグの構成は変更されていません;
http://127.0.0.1:8080/listener はサーバー アドレスです。
@RestController @Slf4j @SpringBootApplication public class ConfigServer { @Data private static class AsyncTask { // 长轮询请求的上下文,包含请求和响应体 private AsyncContext asyncContext; // 超时标记 private boolean timeout; public AsyncTask(AsyncContext asyncContext, boolean timeout) { this.asyncContext = asyncContext; this.timeout = timeout; } } // guava 提供的多值 Map,一个 key 可以对应多个 value private Multimap<String, AsyncTask> dataIdContext = Multimaps.synchronizedSetMultimap(HashMultimap.create()); private ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("longPolling-timeout-checker-%d") .build(); private ScheduledExecutorService timeoutChecker = new ScheduledThreadPoolExecutor(1, threadFactory); // 配置监听接入点 @RequestMapping("/listener") public void addListener(HttpServletRequest request, HttpServletResponse response) { String dataId = request.getParameter("dataId"); // 开启异步!!! AsyncContext asyncContext = request.startAsync(request, response); AsyncTask asyncTask = new AsyncTask(asyncContext, true); // 维护 dataId 和异步请求上下文的关联 dataIdContext.put(dataId, asyncTask); // 启动定时器,30s 后写入 304 响应 timeoutChecker.schedule(() -> { if (asyncTask.isTimeout()) { dataIdContext.remove(dataId, asyncTask); response.setStatus(HttpServletResponse.SC_NOT_MODIFIED); // 标志此次异步线程完成结束!!! asyncContext.complete(); } }, 30000, TimeUnit.MILLISECONDS); } // 配置发布接入点 @RequestMapping("/publishConfig") @SneakyThrows public String publishConfig(String dataId, String configInfo) { log.info("publish configInfo dataId: [{}], configInfo: {}", dataId, configInfo); Collection<AsyncTask> asyncTasks = dataIdContext.removeAll(dataId); for (AsyncTask asyncTask : asyncTasks) { asyncTask.setTimeout(false); HttpServletResponse response = (HttpServletResponse)asyncTask.getAsyncContext().getResponse(); response.setStatus(HttpServletResponse.SC_OK); response.getWriter().println(configInfo); asyncTask.getAsyncContext().complete(); } return "success"; } public static void main(String[] args) { SpringApplication.run(ConfigServer.class, args); } }
dataIdContext.put(dataId, asyncTask);
##Multimap<string asynctask> dataIdContext</string>
これは複数値の Map です。1 つのキーは複数の値に対応できます。
timeoutChecker.schedule()
タイマーを開始し、30 秒後に 304 応答を書き込みます
@RequestMapping("/publishConfig")
,配置发布的入口。配置变更后,根据 dataId 一次拿出所有的长轮询,为之写入变更的响应。
asyncTask.getAsyncContext().complete();
表示这次异步请求结束了。
启动配置监听
先启动 ConfigServer,再启动 ConfigClient。30s之后控制台打印第一次超时之后收到服务端304的状态码
16:41:14.824 [client.worker.executor-%d] INFO cn.haoxiaoyong.poll.ConfigClientWorker - longPolling dataId: [user] once finished, configInfo is unchanged, longPolling again
请求一下配置发布,请求localhost:8080/publishConfig?dataId=user&configInfo=helloworld
服务端打印日志:
2022-08-25 16:45:56.663 INFO 90650 --- [nio-8080-exec-2] cn.haoxiaoyong.poll.ConfigServer : publish configInfo dataId: [user], configInfo: helloworld
以上がJavaで単純なロングポーリングを実装するにはどうすればよいですか?の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。