Maison >développement back-end >Golang >Streaming des journaux en temps réel dans Go

Streaming des journaux en temps réel dans Go

王林
王林original
2024-07-23 12:30:24850parcourir

Presque une simulation tail -f mais de manière intéressante.

Abordons ce problème en le décomposant en tâches gérables, en fournissant des explications claires pour chaque étape. Nous commencerons par un aperçu, puis approfondirons chaque tâche.

Aperçu

  1. Surveillance des fichiers : surveillez en permanence un fichier journal pour détecter le contenu nouvellement ajouté.
  2. Configuration du serveur : établissez un serveur pour gérer les connexions client entrantes et diffuser les messages.
  3. Gestion des connexions clients : Gérer les connexions et les déconnexions des clients.
  4. Diffusion de messages : diffusez les entrées de journal nouvellement ajoutées à tous les clients connectés.
  5. Tests et optimisation : assurez-vous que la solution est efficace et robuste.

Répartition des tâches

1 - Surveillance des fichiers
Objectif : Mettre en place un mécanisme pour surveiller un fichier journal pour les nouveaux ajouts en temps réel.
Étapes :

  • Utilisez le package OS pour lire et surveiller les fichiers.
  • Lisez continuellement le fichier à partir de la dernière position connue.
  • Détectez et lisez le contenu nouvellement ajouté.

Mise en œuvre :

package main

import (
    "os"
    "time"
    "io"
    "log"
)

func tailFile(filePath string, lines chan<- string) {
    file, err := os.Open(filePath)
    if err != nil {
        log.Fatalf("failed to open file: %s", err)
    }
    defer file.Close()

    fi, err := file.Stat()
    if err != nil {
        log.Fatalf("failed to get file stats: %s", err)
    }

    // Start reading from end of file
    file.Seek(0, io.SeekEnd)
    offset := fi.Size()

    for {
        // Check the file size
        fi, err := file.Stat()
        if err != nil {
            log.Fatalf("failed to get file stats: %s", err)
        }

        if fi.Size() > offset {
            // Seek to the last position
            file.Seek(offset, io.SeekStart)
            buf := make([]byte, fi.Size()-offset)
            _, err := file.Read(buf)
            if err != nil && err != io.EOF {
                log.Fatalf("failed to read file: %s", err)
            }

            lines <- string(buf)
            offset = fi.Size()
        }

        time.Sleep(1 * time.Second)
    }
}

Cette fonction lira le nouveau contenu du fichier spécifié et l'enverra au canal de lignes.

2- Configuration du serveur
Objectif : configurer un serveur de base à l'aide de Gorilla WebSocket pour gérer les connexions client.
Étapes :

  • Utilisez le package github.com/gorilla/websocket.
  • Créez un serveur HTTP qui met à niveau les connexions vers WebSocket.

Mise en œuvre :

package main

import (
    "net/http"
    "github.com/gorilla/websocket"
    "log"
)

var upgrader = websocket.Upgrader{
    CheckOrigin: func(r *http.Request) bool {
        // Allow all connections
        return true
    },
}

func handleConnections(w http.ResponseWriter, r *http.Request, clients map[*websocket.Conn]bool) {
    ws, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        log.Fatalf("failed to upgrade connection: %s", err)
    }
    defer ws.Close()

    // Register the new client
    clients[ws] = true

    // Wait for new messages
    for {
        var msg string
        err := ws.ReadJSON(&msg)
        if err != nil {
            delete(clients, ws)
            break
        }
    }
}

func main() {
    clients := make(map[*websocket.Conn]bool)

    http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        handleConnections(w, r, clients)
    })

    log.Println("Server started on :8080")
    err := http.ListenAndServe(":8080", nil)
    if err != nil {
        log.Fatalf("failed to start server: %s", err)
    }
}

3- Gestion des connexions clients
Objectif : Gérer les connexions et déconnexions des clients, en garantissant une gestion robuste.
Étapes :

  • Maintenir une carte des clients actifs.
  • Ajoutez et supprimez des clients en toute sécurité.

Mise en œuvre :

package main

var clients = make(map[*websocket.Conn]bool)

func handleConnections(w http.ResponseWriter, r *http.Request) {
    ws, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        log.Printf("error upgrading to websocket: %v", err)
        return
    }
    defer ws.Close()

    clients[ws] = true

    for {
        _, _, err := ws.ReadMessage()
        if err != nil {
            delete(clients, ws)
            break
        }
    }
}

4- Diffusion de messages
Objectif : Diffuser les nouvelles lignes de log à tous les clients connectés.
Étapes :

  • Lire depuis le canal des lignes.
  • Diffusion à tous les clients connectés.

Mise en œuvre :

package main

func broadcastMessages(lines <-chan string, clients map[*websocket.Conn]bool) {
    for {
        msg := <-lines
        for client := range clients {
            err := client.WriteMessage(websocket.TextMessage, []byte(msg))
            if err != nil {
                client.Close()
                delete(clients, client)
            }
        }
    }
}

5- Intégration et optimisation
Objectif : Intégrer tous les composants et optimiser les performances.
Étapes :

  • Combinez la surveillance des fichiers, la configuration du serveur et la diffusion de messages.
  • Ajoutez des mécanismes de contrôle de concurrence appropriés (canaux, mutex).

Dans cette étape, nous intégrerons les fonctionnalités de surveillance des fichiers journaux, de configuration du serveur, de gestion des connexions client et de diffusion de messages dans un seul programme cohérent. Nous ajouterons également des mécanismes de contrôle de concurrence pour garantir la sécurité et la robustesse des threads.

Intégration complète du code

package main

import (
    "log"
    "net/http"
    "os"
    "sync"
    "time"

    "github.com/gorilla/websocket"
)

// Upgrade configuration
var upgrader = websocket.Upgrader{
    CheckOrigin: func(r *http.Request) bool {
        // Allow cross-origin requests
        return true
    },
}

var (
    clients = make(map[*websocket.Conn]bool) // Map to store all active clients
    mu      sync.Mutex                       // Mutex to ensure thread safety
)

// handleConnections handles incoming websocket connections.
func handleConnections(w http.ResponseWriter, r *http.Request) {
    ws, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        log.Printf("error upgrading to websocket: %v", err)
        return
    }
    defer ws.Close()

    mu.Lock()
    clients[ws] = true
    mu.Unlock()

    // Keep the connection open
    for {
        if _, _, err := ws.ReadMessage(); err != nil {
            mu.Lock()
            delete(clients, ws)
            mu.Unlock()
            ws.Close()
            break
        }
    }
}

// broadcastMessages reads from the lines channel and sends to all clients.
func broadcastMessages(lines <-chan string) {
    for {
        msg := <-lines
        mu.Lock()
        for client := range clients {
            err := client.WriteMessage(websocket.TextMessage, []byte(msg))
            if err != nil {
                client.Close()
                delete(clients, client)
            }
        }
        mu.Unlock()
    }
}

// tailFile watches the given file for changes and sends new lines to the lines channel.
func tailFile(filePath string, lines chan<- string) {
    file, err := os.Open(filePath)
    if err != nil {
        log.Fatalf("failed to open file: %v", err)
    }
    defer file.Close()

    fi, err := file.Stat()
    if err != nil {
        log.Fatalf("failed to get file stats: %v", err)
    }

    // Start reading from end of file
    file.Seek(0, io.SeekEnd)
    offset := fi.Size()

    for {
        fi, err := file.Stat()
        if err != nil {
            log.Fatalf("failed to get file stats: %v", err)
        }

        if fi.Size() > offset {
            // Seek to the last position
            file.Seek(offset, io.SeekStart)
            buf := make([]byte, fi.Size()-offset)
            _, err := file.Read(buf)
            if err != nil && err != io.EOF {
                log.Fatalf("failed to read file: %v", err)
            }

            lines <- string(buf)
            offset = fi.Size()
        }

        time.Sleep(1 * time.Second)
    }
}

// main function to start the server and initialize goroutines.
func main() {
    lines := make(chan string)

    go tailFile("test.log", lines)       // Start file tailing in a goroutine
    go broadcastMessages(lines)         // Start broadcasting messages in a goroutine

    http.HandleFunc("/ws", handleConnections) // Websocket endpoint

    log.Println("Server started on :8080")
    err := http.ListenAndServe(":8080", nil) // Start HTTP server
    if err != nil {
        log.Fatalf("Failed to start server: %v", err)
    }
}

Image description

Explication du code :

Surveillance des fichiers :

  • La fonction tailFile est exécutée dans une goroutine, surveillant en permanence le fichier journal pour détecter tout nouveau contenu et envoyant de nouvelles lignes à un canal (lignes).

Configuration du serveur :

  • Le serveur HTTP est configuré avec le http.HandleFunc("/ws", handleConnections) qui met à niveau les connexions HTTP vers WebSockets à l'aide de la bibliothèque Gorilla WebSocket.

Gestion des clients :

  • Les clients sont gérés dans handleConnections. Les connexions sont mises à niveau vers WebSocket et chaque connexion est gérée dans une carte appelée clients.
  • Mutex (mu) est utilisé pour garantir la sécurité des threads lors de l'ajout ou de la suppression de clients.

Diffusion de messages :

  • La fonction BroadcastMessages lit le canal des lignes et envoie le contenu à tous les clients connectés.
  • La fonction s'exécute dans sa propre goroutine et utilise le mutex pour garantir la sécurité des threads lors de l'accès à la carte des clients.

Intégration et optimisation :

  • Tous les composants sont intégrés et exécutés simultanément à l'aide de goroutines.
  • La synchronisation est gérée avec un mutex pour garantir que les opérations sur la carte des clients sont thread-safe.

Exécution du programme

1- Enregistrez le code dans un fichier, par exemple main.go.
2- Assurez-vous que le package Gorilla WebSocket est installé :

go get github.com/gorilla/websocket

3- Lancer le programme Go :

go run main.go

4- Utilisez un client WebSocket pour vous connecter à ws://localhost:8080/ws.

  • La création d'un client WebSocket peut être réalisée à l'aide de divers outils et méthodes. Ci-dessous, je fournirai des instructions et des exemples pour créer un client WebSocket en utilisant à la fois un outil CLI (comme websocat)
  • Utilisation d'un outil CLI : websocat
  • websocat est un simple client WebSocket pour la ligne de commande. Vous pouvez l'installer et l'utiliser pour vous connecter à votre serveur WebSocket.

Installation :

  • On macOS, you can install websocat using Homebrew:
brew install websocat
  • On Ubuntu, you can install it via Snap:
sudo snap install websocat

You can also download the binary directly from the GitHub releases page.

Usage:

To connect to your WebSocket server running at ws://localhost:8080/ws, you can use:

websocat ws://localhost:8080/ws

Type a message and hit Enter to send it. Any messages received from the server will also be displayed in the terminal.

WebSockets are a widely used protocol for real-time, bidirectional communication between clients and servers. However, they do come with some limitations. Let's discuss these limitations and explore some alternatives that might be more suitable depending on the use case.

Limitations of Using WebSocket

Scalability: While WebSockets are effective for low to moderate traffic, scaling to handle a large number of concurrent connections can be challenging. This often requires sophisticated load balancing and infrastructure management.

State Management: WebSockets are stateful, which means each connection maintains its own state. This can become complicated when scaling horizontally because you need to ensure that sessions are properly managed across multiple servers (e.g., using sticky sessions or a distributed session store).

Resource Intensive: Each WebSocket connection consumes server resources. If you have many clients, this can rapidly consume memory and processing power, necessitating robust resource management.

Firewalls and Proxies: Some corporate firewalls and proxy servers block WebSocket connections because they don’t conform to the traditional HTTP request-response model. This can limit the accessibility of your application.

Security: Although WebSockets can be used over encrypted connections (wss://), they can still be vulnerable to attacks such as cross-site WebSocket hijacking (CSWSH). Ensuring robust security measures is essential.

Latency: While WebSockets have low latency, they are not always the best option for applications that require ultra-low latency or where the timing of messages is critical.

Alternatives to WebSocket

1- Server-Sent Events (SSE)

SSE is a standard allowing servers to push notifications to clients in a unidirectional stream over HTTP.
It is simpler to implement than WebSockets and works natively in many browsers without requiring additional libraries.
Use Cases:

Real-time updates like live feeds, notifications, or social media updates where the data flow is largely unidirectional (server to client).

  • Pros:
    Simpler protocol and easier to implement.
    Built-in reconnection logic.
    Less resource-intensive than WebSockets for unidirectional data flow.

  • Cons:
    Unidirectional (server-to-client) only.
    Less suitable for applications requiring bi-directional communication.

Example:

const eventSource = new EventSource('http://localhost:8080/events');

eventSource.onmessage = function(event) {
    console.log('New message from server: ', event.data);
};

2- HTTP/2 and HTTP/3

The newer versions of HTTP (HTTP/2 and HTTP/3) support persistent connections and multiplexing, which can effectively simulate real-time communication.
They include features like server push, which allows the server to send data to clients without an explicit request.

Use Cases:
When you need to improve the performance and latency of web applications that already use HTTP for communication.

  • Pros:
    Improved performance and lower latency due to multiplexing.
    Better support and broader compatibility with existing HTTP infrastructure.

  • Cons:
    Requires updating server infrastructure to support HTTP/2 or HTTP/3.
    More complex than HTTP/1.1.

3- WebRTC

WebRTC (Web Real-Time Communication) is a technology designed for peer-to-peer communication, primarily for audio and video streaming.
It can also be used for real-time data transfer.

Use Cases:
Real-time audio and video communication.
Peer-to-peer file sharing or live streaming.

  • Pros:
    Peer-to-peer connections reduce server load.
    Built-in support for NAT traversal and encryption.

  • Cons:
    More complex to implement than WebSockets or SSE.
    Requires good understanding of signaling and peer connection management.

4- Message Brokers (e.g., MQTT, AMQP)

Protocols like MQTT and AMQP are designed for message queuing and are optimized for different use cases.
MQTT is lightweight and commonly used in IoT devices.
AMQP is more robust and feature-rich, suited for enterprise-level messaging.

Cas d'utilisation :
Applications IoT.
Systèmes distribués nécessitant une livraison fiable des messages.
Applications avec des besoins complexes en matière de routage et de mise en file d'attente des messages.

  • Avantages :
    Robuste et riche en fonctionnalités (notamment AMQP).
    Convient aux réseaux peu fiables et contraints (notamment MQTT).

  • Inconvénients :
    Introduit une complexité d’infrastructure supplémentaire.
    Nécessite un serveur de courtier de messages et généralement plus de configuration.

Résumé

En fonction de vos besoins spécifiques, les WebSockets peuvent toujours être un bon choix. Cependant, si vous rencontrez des limites en termes d'évolutivité, de complexité ou d'adéquation, envisager l'une des alternatives telles que les événements envoyés par le serveur (SSE), HTTP/2/3, WebRTC ou des courtiers de messages spécialisés comme MQTT ou AMQP pourrait être plus approprié. . Chacune de ces alternatives a ses propres atouts et ses scénarios d'utilisation optimale, et les comprendre vous aidera à choisir la technologie la plus adaptée à votre application.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn