Dans le scénario de travail, nous avons rencontré une telle exigence : mettre à jour les informations associées d'autres modèles en fonction de l'adresse IP de l'hôte. Les exigences sont très simples et impliquent uniquement des requêtes générales de liaison de base de données et des opérations de mise à jour. Cependant, au cours du processus de mise en œuvre du codage, il a été découvert qu'en raison du grand nombre d'hôtes, le traitement des requêtes et des mises à jour prend beaucoup de temps. environ 30 à 40 secondes pour appeler une interface. Temps minimum pour terminer l'opération.
Par conséquent, afin de réduire efficacement le temps d'exécution des méthodes d'interface, envisagez d'utiliser des méthodes de programmation simultanée multithread, en tirant parti des capacités d'exécution parallèle des processeurs multicœurs et en traitant les données de manière asynchrone, ce qui peut considérablement réduire le temps d'exécution, améliorer l'efficacité de l'exécution.
Ici, nous utilisons un pool de threads FixedThreadPool
qui peut réutiliser un nombre fixe de threads, et utilisons l'outil de contrôle de processus de concurrence fourni par la concurrence CountDownLatch
classe d'outils sous forme de combinaison. Assurer un fonctionnement normal pendant la programmation simultanée multithread : FixedThreadPool
,并使用 CountDownLatch
并发工具类提供的并发流程控制工具作为配合使用,保证多线程并发编程过程中的正常运行:
首先,通过 Runtime.getRuntime().availableProcessors()
方法获取运行机器的 CPU 线程数,用于后续设置固定线程池的线程数量。
其次,判断任务的特性,如果为计算密集型任务则设置线程数为 CPU 线程数+1
,如果为 IO 密集型任务则设置线程数为 2 * CPU 线程数
,由于在方法中需要与数据库进行频繁的交互,因此属于 IO 密集型任务。
之后,对数据进行分组切割,每个线程处理一个分组的数据,分组的组数与线程数保持一致,并且还要创建计数器对象 CountDownLatch
,调用构造函数,初始化参数值为线程数个数,保证主线程等待所有子线程运行结束后,再进行后续的操作。
然后,调用 executorService.execute()
方法,重写 run
方法编写业务逻辑与数据处理代码,执行完当前线程后记得将计数器减1操作。最后,当所有子线程执行完成后,关闭线程池。
在省略工作场景中的业务逻辑代码后,通用的处理方法示例如下所示:
public ResponseData updateHostDept() { // ... List<Map> hostMapList = mongoTemplate.find(query, Map.class, "host"); // split the hostMapList for the following multi-threads task // return the number of logical CPUs int processorsNum = Runtime.getRuntime().availableProcessors(); // set the threadNum as 2*(the number of logical CPUs) for handling IO Tasks, // if Computing Tasks set the threadNum as (the number of logical CPUs) + 1 int threadNum = processorsNum * 2; // the number of each group data int eachGroupNum = hostMapList.size() / threadNum; List<List<Map>> groupList = new ArrayList<>(); for (int i = 0; i < threadNum; i++) { int start = i * eachGroupNum; if (i == threadNum - 1) { int end = mapList.size(); groupList.add(hostMapList.subList(start, end)); } else { int end = (i+1) * eachGroupNum; groupList.add(hostMapList.subList(start, end)); } } // update data by using multi-threads asynchronously ExecutorService executorService = Executors.newFixedThreadPool(threadNum/2); CountDownLatch countDownLatch = new CountDownLatch(threadNum); for (List<Map> group : groupList) { executorService.execute(()->{ try { for (Map map : group) { // update the data in mongodb } } catch (Exception e) { e.printStackTrace(); } finally { // let counter minus one countDownLatch.countDown(); } }); } try { // main thread donnot execute until all child threads finish countDownLatch.await(); } catch (Exception e) { e.printStackTrace(); } // remember to shutdown the threadPool executorService.shutdown(); return ResponseData.success(); }
那么在使用多线程异步更新的策略后,从当初调用接口所需的大概时间为 30-40 min
下降到了 8-10 min
,大大提高了执行效率。
需要注意的是,这里使用的
newFixedThreadPool
创建线程池,它有一个缺陷就是,它的阻塞队列默认是一个无界队列,默认值为Integer.MAX_VALUE
极有可能会造成 OOM 问题。因此,一般可以使用ThreadPoolExecutor
- Tout d'abord, passez Runtime.getRuntime La méthode ().availableProcessors() obtient le nombre de threads CPU de la machine en cours d'exécution, qui est utilisé pour définir ultérieurement le nombre de threads dans le pool de threads fixe.
Deuxièmement, déterminez les caractéristiques de la tâche. S'il s'agit d'une tâche gourmande en calcul, définissez le nombre de threads sur nombre de. Threads CPU + 1 code>, s'il s'agit d'une tâche gourmande en E/S, définissez le nombre de threads sur <code>2 * Nombre de threads CPU
Étant donné que la méthode nécessite une interaction fréquente avec la base de données, c'est une tâche gourmande en IO.
Après cela, les données sont regroupées et coupées. Chaque thread traite une donnée groupée. Le nombre de groupes groupés est cohérent avec le nombre de. threads, et vous devez également créer un objet compteur CountDownLatch
, appeler le constructeur et initialiser la valeur du paramètre avec le nombre de threads pour vous assurer que le thread principal attend que tous les threads enfants aient fini de s'exécuter avant d'exécuter opérations ultérieures.
Ensuite, appelez la méthode executorService.execute()
et remplacez le run
méthode Écrivez la logique métier et le code de traitement des données, et n'oubliez pas de décrémenter le compteur de 1 après avoir exécuté le thread en cours. Enfin, lorsque tous les threads enfants ont terminé leur exécution, fermez le pool de threads.
Après avoir omis le code de logique métier dans le scénario de travail, l'exemple de méthode de traitement général est le suivant :
public ResponseData updateHostDept() { // ... List<Map> hostMapList = mongoTemplate.find(query, Map.class, "host"); // split the hostMapList for the following multi-threads task // return the number of logical CPUs int processorsNum = Runtime.getRuntime().availableProcessors(); // set the threadNum as 2*(the number of logical CPUs) for handling IO Tasks, // if Computing Tasks set the threadNum as (the number of logical CPUs) + 1 int threadNum = processorsNum * 2; // the number of each group data int eachGroupNum = hostMapList.size() / threadNum; List<List<Map>> groupList = new ArrayList<>(); for (int i = 0; i < threadNum; i++) { int start = i * eachGroupNum; if (i == threadNum - 1) { int end = mapList.size(); groupList.add(hostMapList.subList(start, end)); } else { int end = (i+1) * eachGroupNum; groupList.add(hostMapList.subList(start, end)); } } // update data by using multi-threads asynchronously ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 8, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100)); CountDownLatch countDownLatch = new CountDownLatch(threadNum); for (List<Map> group : groupList) { executor.execute(()->{ try { for (Map map : group) { // update the data in mongodb } } catch (Exception e) { e.printStackTrace(); } finally { // let counter minus one countDownLatch.countDown(); } }); } try { // main thread donnot execute until all child threads finish countDownLatch.await(); } catch (Exception e) { e.printStackTrace(); } // remember to shutdown the threadPool executor.shutdown(); return ResponseData.success(); }#🎜🎜# Ensuite, utilisez le multi-threading Après la mise en œuvre de la stratégie de mise à jour asynchrone, le temps initial approximatif requis pour appeler l'interface a été réduit de
30-40 min
à 8-10 min
, ce qui a considérablement amélioré l'efficacité de l'exécution. . #🎜🎜##🎜🎜#Il convient de noter que lenewFixedThreadPool
utilisé ici crée un pool de threads. Il a le défaut que sa file d'attente de blocage soit une file d'attente illimitée par défaut, et la valeur par défaut. la valeur estInteger.MAX_VALUE
est très susceptible de provoquer des problèmes de MOO. Par conséquent, vous pouvez généralement utiliserThreadPoolExecutor
pour créer un pool de threads et vous pouvez spécifier le nombre de threads dans la file d'attente pour éviter les problèmes de MOO. #🎜🎜##🎜🎜#rrreee#🎜🎜#Dans le code ci-dessus, le nombre de threads principaux et le nombre maximum de threads sont respectivement de 5 et 8, et ne sont pas définis sur des valeurs très élevées, car s'ils sont définis sur une valeur élevée, les changements de contexte fréquents des threads augmenteront également la consommation de temps, mais ne maximiseront pas les avantages du multi-threading. Quant à la manière de choisir les paramètres appropriés, elle doit être déterminée en fonction des paramètres de la machine et du type de tâche. #🎜🎜##🎜🎜#Enfin, si vous souhaitez obtenir le nombre de threads CPU de la machine via une méthode sans codage, c'est également très simple Dans le système Windows, vous pouvez visualiser le nombre de threads CPU via la tâche. manager et sélectionnez "Performance". La situation est telle que montrée dans l'image ci-dessous : #🎜🎜##🎜🎜##🎜🎜##🎜🎜##🎜🎜#Comme vous pouvez le voir sur l'image ci-dessus, le cœur de mon La machine est composée de huit processeurs, mais grâce à la technologie hyper-threading, un cœur de processeur physique peut être simulé en deux threads de processeur logiques, ma machine prend donc en charge 8 cœurs et 16 threads. #🎜🎜#
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!