Maison > Article > développement back-end > Implémenter la file d'attente de messages en utilisant Kafka dans Beego
Dans les applications Web modernes, une messagerie efficace est un élément très important. La file d'attente de messages est une solution de livraison asynchrone de messages entre différents systèmes, qui peut optimiser la livraison des données et l'efficacité du traitement. Dans le langage Go, le framework Beego est un framework web très populaire qui prend en charge le développement d'applications web et d'API. Dans cet article, nous explorerons comment implémenter une file d'attente de messages à l'aide de kafka dans Beego pour une livraison efficace des messages.
1. Introduction à Kafka
Kafka est un système de file d'attente de messages distribué, partitionné et à copies multiples. Il a été initialement développé par LinkedIn et ensuite maintenu par Apache Software Foundation. Kafka est principalement utilisé pour traiter de grandes quantités de données en temps réel, prendre en charge la messagerie à haut débit et également prendre en charge une variété d'applications auprès de plusieurs consommateurs et producteurs.
Les concepts fondamentaux de kafka sont les sujets, les partitions et les compensations. Le sujet fait référence à la classification des messages et chaque message appartient à un sujet spécifique. Une partition est un sous-ensemble d'une rubrique et chaque partition est une file d'attente de messages ordonnée et immuable. Chaque partition peut être répliquée sur plusieurs serveurs pour prendre en charge plusieurs consommateurs traitant simultanément la même partition. Le décalage est une valeur qui identifie de manière unique chaque message. Les consommateurs peuvent spécifier un décalage spécifique à partir duquel commencer à lire les messages.
2. Utiliser Kafka dans Beego
Installer kafka est très simple. Il vous suffit de télécharger le package compressé depuis le site officiel de kafka et de le décompresser dans le répertoire spécifié. L'exemple utilise la version kafka_2.12-2.3.0.
Avant de commencer à utiliser kafka, vous devez créer un nouveau sujet et une nouvelle partition. Vous pouvez utiliser le propre outil de gestion de Kafka (kafka-topics.sh) pour créer des sujets et des partitions. Exécutez la commande suivante dans la ligne de commande :
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Cette commande créera un sujet nommé "test" avec une seule partition et un numéro de sauvegarde de 1. Vous pouvez modifier le nombre de partitions et de sauvegardes selon vos besoins.
Les étapes pour créer un producteur kafka sont les suivantes :
package main import ( "github.com/Shopify/sarama" ) func main() { // 设置kafka配置 config := sarama.NewConfig() config.Producer.Return.Successes = true // 新建生产者 producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config) if err != nil { panic(err) } // 构造消息 message := &sarama.ProducerMessage{ Topic: "test", Value: sarama.StringEncoder("test message"), } // 发送消息 _, _, err = producer.SendMessage(message) if err != nil { panic(err) } producer.Close() }
Parmi elles, sarama est une bibliothèque cliente en langage Go utilisée pour connecter et faire fonctionner un cluster kafka. Dans le code ci-dessus, nous créons un nouvel objet SyncProducer puis envoyons un message au sujet "test".
Les étapes pour créer un consommateur kafka sont les suivantes :
package main import ( "fmt" "github.com/Shopify/sarama" "log" "os" "os/signal" ) func main() { config := sarama.NewConfig() config.Consumer.Return.Errors = true // 新建一个消费者 consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config) if err != nil { panic(err) } // 准备订阅话题 topic := "test" partitionList, err := consumer.Partitions(topic) if err != nil { panic(err) } // 启动goroutine处理消息 for _, partition := range partitionList { // 构造一个partitionConsumer pc, err := consumer.ConsumePartition(topic, partition, sarama.OffsetNewest) if err != nil { panic(err) } go func(partitionConsumer sarama.PartitionConsumer) { defer func() { // 关闭consumer if err := partitionConsumer.Close(); err != nil { log.Fatalln(err) } }() for msg := range partitionConsumer.Messages() { fmt.Printf("Partition:%d Offset:%d Key:%s Value:%s ", msg.Partition, msg.Offset, msg.Key, msg.Value) } }(pc) } // 处理中断信号 sigterm := make(chan os.Signal, 1) signal.Notify(sigterm, os.Interrupt) <-sigterm fmt.Println("Shutdown") consumer.Close() }
Le code ci-dessus crée un nouvel objet consommateur et s'abonne au sujet "test". Ensuite, plusieurs goroutines sont démarrées pour traiter simultanément les messages provenant de différentes partitions. Une fois le message traité, la méthode Close() est appelée pour fermer le consommateur.
3. Résumé
Dans cet article, nous avons présenté comment utiliser kafka pour implémenter des files d'attente de messages dans Beego. Ceci est utile pour les applications Web qui doivent traiter des données à haut débit. En utilisant Kafka, nous pouvons transmettre des messages de manière asynchrone entre plusieurs consommateurs et producteurs afin de maximiser le transfert de données et l'efficacité du traitement. Si vous développez une application Beego et avez besoin d'une messagerie efficace, Kafka est un excellent choix.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!