Rumah >Java >javaTutorial >Bagaimana untuk melaksanakan pengundian panjang mudah di Jawa?
Kini, semua perisian tengah utama menggunakan kaedah interaksi data tinjauan panjang Pada masa ini, yang lebih popular ialah mesej pusat konfigurasi Nacos dan RocketMQ Pull (mod tarik). , semuanya dilaksanakan menggunakan kaedah pengundian panjang. Contohnya, dalam pusat konfigurasi Nacos, bagaimanakah konfigurasi deria pelayan boleh berubah dan menolaknya kepada klien dalam masa nyata?
Bercakap tentang tinjauan panjang, mesti ada sebaliknya >Pengundian ringkas
juga merupakan mod tarik. Ini bermakna tidak kira sama ada data pelayan dikemas kini atau tidak, pelanggan meminta untuk menarik data setiap tempoh masa yang tetap mungkin ada data yang dikemas kini yang dikembalikan, atau mungkin tiada apa-apa. Jika pusat konfigurasi menggunakan kaedah ini, masalah berikut akan berlaku:Memandangkan data konfigurasi tidak kerap berubah, jika permintaan dihantar sepanjang masa, ia pasti akan memberi banyak tekanan pada pelayan. Ia juga akan menyebabkan kelewatan dalam menolak data, contohnya: konfigurasi diminta setiap 10 saat Jika konfigurasi dikemas kini pada saat ke-11, tolakan akan ditangguhkan selama 9 saat, menunggu permintaan seterusnya; 🎜> tidak boleh ditangguhkan dalam tolakan dan tekanan pelayan dinetralkan
. Kurangkan selang pengundian, kelewatan akan berkurangan, dan tekanan akan meningkat, tekanan akan berkurangan dan kelewatan akan meningkat.Tinjauan panjangUntuk menyelesaikan masalah undian pendek, pelanggan memulakan tinjauan panjang Jika data pada pelayan tidak berubah, permintaan akan ditahan sehingga data pada pelayan berubah , atau tunggu untuk tempoh masa tertentu sebelum kembali. Selepas kembali, pelanggan memulakan permintaan pengundian panjang seterusnya untuk mendengar.
Faedah reka bentuk ini:Berbanding dengan kependaman rendah, pelanggan memulakan tinjauan panjang dan pelayan boleh segera mengembalikan respons selepas merasakan bahawa data telah berubah pelanggan.
Pertama, pelanggan memulakan proses yang panjang permintaan pengundian Apabila pelayan menerima permintaan pelanggan, ia akan menggantung permintaan pelanggan Jika tiada perubahan dalam tempoh 30-an yang direka oleh pelayan, pelayan akan membalas semula kepada pelanggan bahawa data tidak berubah, dan pelanggan akan. terus menghantar permintaan.
mekanisme AsyncContext yang disediakan oleh Servlet3.0
.)Jika masih tiada perubahan data dalam tempoh tamat masa yang ditetapkan oleh pelayan, maka pengecam yang tidak berubah akan dikembalikan kepada klien. Contohnya, balas kod status 304;
Tamat masa klien httpClient Ia mestilah lebih besar daripada tamat masa yang dipersetujui oleh tinjauan panjang, jika tidak, pelanggan akan tamat masa sebelum pelayan kembali.
@Slf4j public class ConfigClientWorker { private final CloseableHttpClient httpClient; private final ScheduledExecutorService executorService; public ConfigClientWorker(String url, String dataId) { this.executorService = Executors.newSingleThreadScheduledExecutor(runnable -> { Thread thread = new Thread(runnable); thread.setName("client.worker.executor-%d"); thread.setDaemon(true); return thread; }); // ① httpClient 客户端超时时间要大于长轮询约定的超时时间 RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(40000).build(); this.httpClient = HttpClientBuilder.create().setDefaultRequestConfig(requestConfig).build(); executorService.execute(new LongPollingRunnable(url, dataId)); } class LongPollingRunnable implements Runnable { private final String url; private final String dataId; public LongPollingRunnable(String url, String dataId) { this.url = url; this.dataId = dataId; } @SneakyThrows @Override public void run() { String endpoint = url + "?dataId=" + dataId; log.info("endpoint: {}", endpoint); HttpGet request = new HttpGet(endpoint); CloseableHttpResponse response = httpClient.execute(request); switch (response.getStatusLine().getStatusCode()) { case 200: { BufferedReader rd = new BufferedReader(new InputStreamReader(response.getEntity() .getContent())); StringBuilder result = new StringBuilder(); String line; while ((line = rd.readLine()) != null) { result.append(line); } response.close(); String configInfo = result.toString(); log.info("dataId: [{}] changed, receive configInfo: {}", dataId, configInfo); break; } // ② 304 响应码标记配置未变更 case 304: { log.info("longPolling dataId: [{}] once finished, configInfo is unchanged, longPolling again", dataId); break; } default: { throw new RuntimeException("unExcepted HTTP status code"); } } executorService.execute(this); } } public static void main(String[] args) throws IOException { new ConfigClientWorker("http://127.0.0.1:8080/listener", "user"); System.in.read(); } }konfigurasi teg kod respons tidak berubah; 🎜 >
Pelaksanaan sisi pelayan
@RestController @Slf4j @SpringBootApplication public class ConfigServer { @Data private static class AsyncTask { // 长轮询请求的上下文,包含请求和响应体 private AsyncContext asyncContext; // 超时标记 private boolean timeout; public AsyncTask(AsyncContext asyncContext, boolean timeout) { this.asyncContext = asyncContext; this.timeout = timeout; } } // guava 提供的多值 Map,一个 key 可以对应多个 value private Multimap<String, AsyncTask> dataIdContext = Multimaps.synchronizedSetMultimap(HashMultimap.create()); private ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("longPolling-timeout-checker-%d") .build(); private ScheduledExecutorService timeoutChecker = new ScheduledThreadPoolExecutor(1, threadFactory); // 配置监听接入点 @RequestMapping("/listener") public void addListener(HttpServletRequest request, HttpServletResponse response) { String dataId = request.getParameter("dataId"); // 开启异步!!! AsyncContext asyncContext = request.startAsync(request, response); AsyncTask asyncTask = new AsyncTask(asyncContext, true); // 维护 dataId 和异步请求上下文的关联 dataIdContext.put(dataId, asyncTask); // 启动定时器,30s 后写入 304 响应 timeoutChecker.schedule(() -> { if (asyncTask.isTimeout()) { dataIdContext.remove(dataId, asyncTask); response.setStatus(HttpServletResponse.SC_NOT_MODIFIED); // 标志此次异步线程完成结束!!! asyncContext.complete(); } }, 30000, TimeUnit.MILLISECONDS); } // 配置发布接入点 @RequestMapping("/publishConfig") @SneakyThrows public String publishConfig(String dataId, String configInfo) { log.info("publish configInfo dataId: [{}], configInfo: {}", dataId, configInfo); Collection<AsyncTask> asyncTasks = dataIdContext.removeAll(dataId); for (AsyncTask asyncTask : asyncTasks) { asyncTask.setTimeout(false); HttpServletResponse response = (HttpServletResponse)asyncTask.getAsyncContext().getResponse(); response.setStatus(HttpServletResponse.SC_OK); response.getWriter().println(configInfo); asyncTask.getAsyncContext().complete(); } return "success"; } public static void main(String[] args) { SpringApplication.run(ConfigServer.class, args); } }
Ia ialah Peta berbilang nilai Satu kunci boleh sepadan dengan berbilang nilai Anda juga boleh memahaminya sebagai request.startAsync(request, response);
asyncContext.complete()
Mulakan pemasa dan tulis 304. tindak balas selepas 30 saatdataIdContext.put(dataId, asyncTask);
@RequestMapping("/publishConfig")
,配置发布的入口。配置变更后,根据 dataId 一次拿出所有的长轮询,为之写入变更的响应。
asyncTask.getAsyncContext().complete();
表示这次异步请求结束了。
启动配置监听
先启动 ConfigServer,再启动 ConfigClient。30s之后控制台打印第一次超时之后收到服务端304的状态码
16:41:14.824 [client.worker.executor-%d] INFO cn.haoxiaoyong.poll.ConfigClientWorker - longPolling dataId: [user] once finished, configInfo is unchanged, longPolling again
请求一下配置发布,请求localhost:8080/publishConfig?dataId=user&configInfo=helloworld
服务端打印日志:
2022-08-25 16:45:56.663 INFO 90650 --- [nio-8080-exec-2] cn.haoxiaoyong.poll.ConfigServer : publish configInfo dataId: [user], configInfo: helloworld
Atas ialah kandungan terperinci Bagaimana untuk melaksanakan pengundian panjang mudah di Jawa?. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!