Maison >Java >javaDidacticiel >Créer un système de collecte et d'analyse de journaux basé sur Spring Boot et Flume

Créer un système de collecte et d'analyse de journaux basé sur Spring Boot et Flume

WBOY
WBOYoriginal
2023-06-23 08:53:491899parcourir

À mesure que l'échelle des systèmes d'entreprise continue de croître, les journaux système deviennent de plus en plus volumineux. Sans un système fiable de collecte et d'analyse des journaux, il est difficile de surveiller et de maintenir efficacement le système. Cet article explique comment créer un système efficace de collecte et d'analyse de journaux basé sur Spring Boot et Flume.

  1. Prerequis pour commencer, vous devez installer et configurer le logiciel suivant:

jdk 8 ou plus

    Maven 3.3 ou au-dessus de
  • apache Flume 1.9.0 ou au-dessus de
  • Lasticsearch 7.6. ou version supérieure
  • Kibana 7.6.2 ou version supérieure
  • Configuration de l'application Spring Boot
  1. Tout d'abord, nous devons créer une application Spring Boot et ajouter les dépendances requises :
  2. <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>
dans le fichier application.properties, ajoutez la configuration suivante :

# 应用端口号
server.port=8080

# log4j2配置
logging.config=classpath:log4j2.xml

# flume配置
flume.agentName=myflume
flume.sourceType=avro
flume.clientType=load-balancing
flume.hosts=localhost:41414

# elasticsearch配置
spring.elasticsearch.rest.uris=http://localhost:9200

Dans la configuration ci-dessus, nous avons spécifié le numéro de port de l'application, le fichier de configuration log4j2, la configuration liée à Flume et l'URI d'accès Elasticsearch.

Log Collector

  1. Afin d'envoyer les journaux d'application à Flume, nous devons créer un appender log4j2 personnalisé.
  2. @Plugin(name = "Flume", category = "Core", elementType = "appender", printObject = true)
    public class FlumeAppender extends AbstractAppender {
    
        private static final ObjectMapper MAPPER = new ObjectMapper();
        private final FlumeClient client;
        private final String sourceType;
    
        protected FlumeAppender(String name, Filter filter, Layout<? extends Serializable> layout,
                                FlumeClient client, String sourceType) {
            super(name, filter, layout, true);
            this.client = client;
            this.sourceType = sourceType;
        }
    
        @PluginFactory
        public static FlumeAppender createAppender(@PluginAttr("name") String name,
                                                   @PluginElement("Filters") Filter filter,
                                                   @PluginElement("Layout") Layout<? extends Serializable> layout,
                                                   @PluginAttr("sourceType") String sourceType,
                                                   @PluginAttr("hosts") String hosts) {
            if (name == null) {
                LOGGER.error("FlumeAppender missing name");
                return null;
            }
            if (client == null) {
                LOGGER.error("FlumeAppender missing client");
                return null;
            }
            return new FlumeAppender(name, filter, layout, createClient(hosts), sourceType);
        }
    
        private static FlumeClient createClient(String hosts) {
            LoadBalancingRpcClient rpcClient = new LoadBalancingRpcClient();
            String[] hostArray = hosts.split(",");
            for (String host : hostArray) {
                String[] hostParts = host.split(":");
                rpcClient.addHost(new InetSocketAddress(hostParts[0], Integer.parseInt(hostParts[1])));
            }
            Properties props = new Properties();
            props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, "default_loadbalance");
            props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, hosts);
            props.setProperty(RpcClientConfigurationConstants.CONFIG_MAX_BACKOFF, "10000");
            AvroEventSerializer serializer = new AvroEventSerializer();
            serializer.configure(props, false);
            return new FlumeClient(rpcClient, serializer);
        }
    
        @Override
        public void append(LogEvent event) {
            try {
                byte[] body = ((StringLayout) this.getLayout()).toByteArray(event);
                Map<String, String> headers = new HashMap<>();
                headers.put("timestamp", Long.toString(event.getTimeMillis()));
                headers.put("source", "log4j");
                headers.put("sourceType", sourceType);
                Event flumeEvent = EventBuilder.withBody(body, headers);
                client.sendEvent(flumeEvent);
            } catch (Exception e) {
                LOGGER.error("Failed to send event to Flume", e);
            }
        }
    }
Dans le code ci-dessus, nous avons implémenté un log4j2 Appender, qui regroupera l'événement de journal dans un événement Flume et l'enverra au serveur Flume.

Créez un fichier de configuration log4j2 et configurez FlumeAppender.

<?xml version="1.0" encoding="UTF-8"?>
<Configuration>
    <Appenders>
        <Flume name="flume" sourceType="spring-boot" hosts="${flume.hosts}">
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
        </Flume>
    </Appenders>
    <Loggers>
        <Root level="info">
            <AppenderRef ref="flume"/>
        </Root>
    </Loggers>
</Configuration>

Dans ce fichier de configuration log4j2, nous définissons un FlumeAppender et le référençons dans le Root Logger.

Configuration Flume

  1. Nous devons configurer Flume pour recevoir les messages de journal envoyés depuis l'application dans Flume Agent et les envoyer à Elasticsearch.
Créez un fichier de configuration Flume comme indiqué ci-dessous.

# Define the agent name and the agent sources and sinks
myflume.sources = mysource
myflume.sinks = mysink
myflume.channels = channel1

# Define the source
myflume.sources.mysource.type = avro
myflume.sources.mysource.bind = 0.0.0.0
myflume.sources.mysource.port = 41414

# Define the channel
myflume.channels.channel1.type = memory
myflume.channels.channel1.capacity = 10000
myflume.channels.channel1.transactionCapacity = 1000

# Define the sink
myflume.sinks.mysink.type = org.elasticsearch.hadoop.flume.ElasticsearchSink
myflume.sinks.mysink.hostNames = localhost:9200
myflume.sinks.mysink.indexName = ${type}-%{+YYYY.MM.dd}
myflume.sinks.mysink.batchSize = 1000
myflume.sinks.mysink.typeName = ${type}

# Link the source and sink with the channel
myflume.sources.mysource.channels = channel1
myflume.sinks.mysink.channel = channel1

Dans le fichier de configuration Flume, nous définissons un agent, une source et un récepteur. La source est de type Avro, liée au port 41414, Channel1 est un type de mémoire, la capacité est de 10 000 et transactionCapacity est de 1 000. Le récepteur est un type ElasticsearchSink qui crée un index nommé type sur le port 9200 de l'hôte local et le soumet à Elasticsearch par lots lorsque 1 000 événements sont atteints.

Configuration d'Elasticsearch et Kibana

  1. Enfin, nous devons configurer Elasticsearch et Kibana. Dans Elasticsearch, nous devons créer un index qui correspond au nom d'index défini dans le fichier de configuration Flume.
Dans Kibana, nous devons créer un schéma d'index. Dans le menu principal de Kibana, sélectionnez « Gestion » puis « Kibana ». Dans le modèle d'index Kibana, sélectionnez « Créer un modèle d'index ». Entrez le nom d'index défini dans le fichier de configuration Flume et suivez les invites pour le configurer.

Nous devons également créer un tableau de bord pour Kibana afin de visualiser les messages du journal de l'application. Dans le menu principal de Kibana, sélectionnez « Tableau de bord » puis « Créer un tableau de bord ». Dans l'onglet "Visualisations", sélectionnez "Ajouter une visualisation". Sélectionnez Table de données et configurez les champs requis et les options de visualisation.

Conclusion

  1. Dans cet article, nous avons présenté comment utiliser Spring Boot et Flume pour créer un système efficace de collecte et d'analyse de journaux. Nous avons implémenté un appender log4j2 personnalisé pour envoyer les événements de journal de l'application au serveur Flume, et utilisé Elasticsearch et Kibana pour l'analyse et la visualisation des journaux. J'espère que cet article pourra vous aider à créer votre propre système de collecte et d'analyse de journaux.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn