Home  >  Article  >  Backend Development  >  How to use message queue in Go?

How to use message queue in Go?

PHPz
PHPzOriginal
2023-05-11 15:46:541577browse

Message queue is a common system architecture pattern, which plays an extremely important role in handling high concurrency and asynchronous task processing. In the Go language, through some open source message queue libraries and tools, using message queues has become very convenient and simple.

This article will introduce how to use message queues in Go, including the following:

  1. Understanding message queues
  2. Common message queues
  3. Advantages and applicable scenarios of using message queues in Go
  4. Message queue library in Go language
  5. Demonstrate how to use message queues in Go through an example
Understanding Message Queue

Message queue is an architectural pattern that uses queues to cache, asynchronously transmit and store messages. Message queues are generally divided into three parts: producers, consumers and queues. The producer sends the message to the queue, and the consumer takes the message from the queue for processing. The purpose of the message queue is to decouple the time and space dependencies between producers and consumers and implement asynchronous task processing.

The message queue can cache data, implement asynchronous processing, peak load shaving (to cope with high concurrent requests in a short period of time), load balancing and other tasks. It is an important part to support the design of large-scale distributed systems.

Common message queues

There are many message queue libraries and tools on the market that support various programming languages. Among the more common ones are the following:

  1. RabbitMQ: RabbitMQ is an open source message queue system that supports multiple protocols and programming languages, such as AMQP, STOMP, MQTT, etc. Developers can access it through various language clients, such as Go, Java, Python, etc. RabbitMQ is written in Erlang language and is widely used to support real-time processing scenarios such as IoT, group chat, and monitoring.
  2. Apache Kafka: Apache Kafka is a message queuing system based on the publish/subscribe model, developed by LinkedIn, and is mainly used to handle continuous streaming data processing. Kafka distributes messages through multiple partitions to support high throughput and high scalability.
  3. ActiveMQ: ActiveMQ is a popular JMS-based message queuing system that supports multiple transmission protocols and programming language access, such as AMQP, STOMP, Openwire, etc.
  4. NSQ: NSQ is a real-time distributed message processing platform, consisting of two components: nsq and nsqd. nsq is a TCP proxy server for client interaction, while nsqd is a service for persistent messages and queues.
Advantages and applicable scenarios of using message queues in Go

Go language natively supports coroutines, so using message queues to handle asynchronous tasks is particularly suitable. Go language provides a lot of open source libraries and tools for message queues, which are also relatively convenient to use.

In addition, because the message queue processes messages asynchronously, tasks can be offloaded to avoid high concurrency on a single machine. Therefore, the message queue can be used in the following scenarios:

  1. Processing of large amounts of data: such as processing of large amounts of server data in website logs, stress testing, etc.;
  2. Asynchronous processing and task distribution: Such as email sending, SMS notification, etc.;
  3. Distributed task queue: such as 0 queue, backlog queue, etc.;
  4. Multiple consumer concurrency scenarios: such as e-commerce flash sales, high concurrent comments, etc.;
  5. Application decoupling and expansion: such as integrating external message service notifications and separating data interactions between systems.
Message queue library in Go language

In Go language, there are many open source message queue libraries available, such as:

  1. RabbitMQ AMQP client library: https://github.com/streadway/amqp;
  2. Apache Kafka client library: https://github.com/confluentinc/confluent-kafka-go;
  3. NSQ client library: https://github.com/nsqio/go-nsq.

Using these open source libraries can easily connect to different message queue systems, allowing developers to focus more on logic development on the business line, improving development efficiency and code readability.

Show how to use the message queue in Go through an example

Below we will show how to use the message queue in Go through a simple example.

Suppose we want to crawl image data from some websites and save it locally. We can use go to complete this program. In order to implement asynchronous downloading of some pictures, we use RabbitMQ as the message queue and complete the following steps in Go:

Install RabbitMQ

  1. Install RabbitMQ, official website download address: https:/ /www.rabbitmq.com/download.html;
  2. Configure RabbitMQ. After installation, enter the bin directory (please ignore the .bat suffix for non-Windows platforms) and execute: ./rabbitmqctl start to start RabbitMQ;
  3. Create an MQ virtual host and execute: ./rabbitmqctl add_vhost test;
  4. Add users and assign permissions, execute: ./rabbitmqctl add_user test test, ./rabbitmqctl set_permissions -p test test "." "." ".*";
  5. Start the RabbitMQ web management interface, execute: ./rabbitmq-plugins enable rabbitmq_management, enter the address http://localhost:15672 in the browser to enter Management interface.

Writing code

We can use the github.com/streadway/amqp library to interact with RabbitMQ. Below is the code.

First write the crawler code to crawl the image address that needs to be downloaded and send it to RabbitMQ:

func main() {
    spider()
}

func spider() {
    url := "https://www.example.com"
    doc, _ := goquery.NewDocument(url)
    doc.Find(".img_wrapper img").Each(func(i int, s *goquery.Selection) {
        imgUrl, _ := s.Attr("src")
        publishToMQ(imgUrl)
    })
}

func publishToMQ(msg string) {
    conn, err := amqp.Dial("amqp://test:test@localhost:5672/test")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "image_downloader", // name
        true,               // durable
        false,              // delete when unused
        false,              // exclusive
        false,              // no-wait
        nil,                // arguments
    )
    failOnError(err, "Failed to declare a queue")

    err = ch.Publish(
        "",     // exchange
        q.Name, // routing key
        false,  // mandatory
        false,  // immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(msg),
        })
    failOnError(err, "Failed to publish a message")

    log.Printf(" [x] Sent %s", msg)
}

Then write the image downloader. By listening to the message queue of RabbitMQ, asynchronous image downloading is achieved:

func main() {
    consumeMQ()
}

func consumeMQ() {
    conn, err := amqp.Dial("amqp://test:test@localhost:5672/test")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "image_downloader", // name
        true,               // durable
        false,              // delete when unused
        false,              // exclusive
        false,              // no-wait
        nil,                // arguments
    )
    failOnError(err, "Failed to declare a queue")

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
            downloadImage(string(d.Body))
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
}

func downloadImage(url string) {
    resp, err := http.Get(url)
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()

    file, err := os.Create(uuid.New().String() + ".jpg")
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    _, err = io.Copy(file, resp.Body)
    if err != nil {
        log.Fatal(err)
    }

    log.Printf("Downloaded an image: %s", url)
}

In the above code, we created a work queue "image-downloader". After the producer parses the image address of the html page, it goes to the work queue. Send a message. The consumer will listen to the work queue, and after receiving the message, call the downloadImage function to download the image file.

The above example is a simple use case using RabbitMQ. Using other message queue libraries is similar, you just need to implement connections and operations through different APIs.

Overview

In this article we introduce and explain what a message queue is. In a large amount of data processing scenarios, asynchronous consumption is essential. The Go language makes asynchronous task processing simple and efficient due to its own coroutine mechanism. Coupled with the rich open source libraries of the Go language itself, it becomes extremely easy to use message queues to implement asynchronous message processing.

Through the above examples, we can see that when implementing asynchronous task processing, using message queues can greatly improve processing efficiency, and using message queues in Go language is also very convenient. In projects, it is recommended to use open source message queue libraries, such as RabbitMQ or Apache Kafka.

The above is the detailed content of How to use message queue in Go?. For more information, please follow other related articles on the PHP Chinese website!

Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn