Maison >Java >javaDidacticiel >Méthode de traitement des messages asynchrones Springboot
Au travail, nous rencontrons souvent des scénarios métiers qui nécessitent un traitement asynchrone des messages. Il existe des méthodes de traitement complètement différentes selon la nature du message.
1. Les messages ne sont pas indépendants
Les messages indépendants ont généralement des dépendances séquentielles. À ce stade, le mécanisme de traitement des messages dégénérera en un mode de traitement de file d'attente linéaire et ne pourra être consommé. par un seul consommateur. Ou accédez à un seul fil de discussion pour traiter les messages.
2. Les messages sont complètement indépendants
Des messages complètement indépendants peuvent être traités simultanément par plusieurs consommateurs (threads) en même temps, obtenant ainsi des capacités de traitement simultanées maximales.
3. Les messages ne sont pas complètement indépendants
Habituellement, c'est le cas où il est nécessaire d'ordonner des messages homologues (du même producteur), et l'ordre des messages hétérogènes est non pertinent.
Le traitement des messages dans ce scénario sera relativement compliqué. Afin de garantir l'ordre des messages provenant de la même source, il est facile de penser à lier des fils de discussion fixes aux messages provenant de la même source. est très simple mais pose beaucoup de problèmes.
Si le nombre de producteurs est important, le nombre de threads liés peut ne pas être suffisant. Bien sûr, vous pouvez réutiliser les ressources du thread et lier plusieurs sources de messages au même thread pour le traitement. problème : interactions des messages entre sources.
Considérez le scénario suivant :
Le producteur P1 génère un grand nombre de messages et entre dans la file d'attente et est affecté au thread consommateur C1 pour le traitement (C1 peut prendre beaucoup de temps pour processus). A ce moment, le producteur P2 a généré un message, mais malheureusement il a également été affecté au thread consommateur C1 pour le traitement
Ensuite, le traitement du message du producteur P2 sera bloqué par le grand nombre. des messages de P1, entraînant un écart entre P1 et P2. L'influence mutuelle et les autres threads de consommation ne peuvent pas être pleinement utilisés, ce qui entraîne un déséquilibre.
Nous devons donc envisager d'éviter de tels problèmes. Atteindre la rapidité du traitement de la consommation (dès que possible), l'isolement (en évitant les interférences mutuelles) et l'équilibre (maximiser le traitement simultané)
Dans la mise en œuvre, il y aura deux modes. Le plus simple à penser est. Modèle de répartition des threads (méthode PUSH), la méthode spécifique est généralement la suivante :
1 Il existe un répartiteur de messages global qui interroge la file d'attente pour récupérer les messages.
2. Selon la source du message, envoyez-le au fil de discussion approprié pour traitement.
Le mécanisme de l'algorithme de distribution peut être aussi simple qu'un hachage basé sur la source du message, ou aussi complexe que la charge actuelle de chaque thread consommateur, la longueur de la file d'attente et la complexité du message, et peuvent être sélectionnés pour être distribués sur la base d’une analyse complète.
Simple Hash rencontrera certainement les problèmes décrits dans le scénario ci-dessus, mais les calculs de distribution complexes sont évidemment très gênants et compliqués à mettre en œuvre, et l'efficacité n'est pas nécessairement bonne. Il est également difficile d'obtenir un résultat parfait. équilibre en termes d'équilibre.
Le deuxième mode utilise la méthode PULL, et le fil extrait à la demande. La méthode spécifique est la suivante :
1. temporaire correspondant à la file d'attente source (comme indiqué ci-dessous, chaque session représente une source de message différente), puis placez la session dans une file d'attente de blocage pour avertir le fil de traitement
2. la file d'attente en même temps pour rivaliser pour les messages (assurez-vous qu'un seul thread l'obtient
3. Vérifiez si l'indicateur de file d'attente est traité par d'autres threads (la mise en œuvre nécessite une synchronisation de détection basée sur les messages de même origine au niveau du thread)
4. S'ils ne sont pas traités par d'autres threads, indiquez l'état dans le traitement des paramètres de la zone de synchronisation et traitez les messages dans la file d'attente temporaire après avoir quitté la zone de synchronisation
# 🎜🎜#5. Une fois le traitement terminé, entrez à nouveau dans la zone de synchronisation. Définissez l'état d'indication de traitement sur inactifCe qui suit est un morceau de code pour décrire le processus de traitement du thread de consommation : #🎜🎜 #public void run() { try { for (AbstractSession s = squeue.take(); s != null; s = squeue.take()) { // first check any worker is processing this session? // if any other worker thread is processing this event with same session, just ignore it. synchronized (s) { if (!s.isEventProcessing()) { s.setEventProcessing(true); } else { continue; } } // fire events with same session fire(s); // last reset processing flag and quit current thread processing s.setEventProcessing(false); // if remaining events, so re-insert to session queue if (s.getEventQueue().size() > 0 && !s.isEventProcessing()) { squeue.offer(s); } } } catch (InterruptedException e) { LOG.warn(e.getMessage(), e); } }
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!