Heim >Java >javaLernprogramm >Erstellen Sie ein Protokollerfassungs- und Analysesystem basierend auf Spring Boot und Flume

Erstellen Sie ein Protokollerfassungs- und Analysesystem basierend auf Spring Boot und Flume

WBOY
WBOYOriginal
2023-06-23 08:53:491882Durchsuche

Da die Größe der Unternehmenssysteme immer größer wird, werden die Systemprotokolle immer größer. Ohne ein zuverlässiges Protokollerfassungs- und -analysesystem ist es schwierig, das System effektiv zu überwachen und zu warten. In diesem Artikel wird erläutert, wie Sie ein effizientes Protokollerfassungs- und Analysesystem basierend auf Spring Boot und Flume erstellen.

  1. Preroquisites

Bevor Sie beginnen, müssen Sie die folgende Software installieren und einrichten:

  • jdk 8 oder über
  • Maven 3.3 oder über
  • apache Flume 1.9.0 oder über
  • elasticsearch 7.6 oder höhere Version
  • Kibana 7.6.2 oder höhere Version
  1. Spring Boot-Anwendungskonfiguration

Zuerst müssen wir eine Spring Boot-Anwendung erstellen und die erforderlichen Abhängigkeiten hinzufügen:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>

in der Datei „application.properties“ hinzufügen die folgende Konfiguration:

# 应用端口号
server.port=8080

# log4j2配置
logging.config=classpath:log4j2.xml

# flume配置
flume.agentName=myflume
flume.sourceType=avro
flume.clientType=load-balancing
flume.hosts=localhost:41414

# elasticsearch配置
spring.elasticsearch.rest.uris=http://localhost:9200

In der obigen Konfiguration haben wir die Portnummer der Anwendung, die log4j2-Konfigurationsdatei, die Flume-bezogene Konfiguration und den Elasticsearch-Zugriffs-URI angegeben.

  1. Log Collector

Um Anwendungsprotokolle an Flume zu senden, müssen wir einen benutzerdefinierten log4j2-Appender erstellen.

@Plugin(name = "Flume", category = "Core", elementType = "appender", printObject = true)
public class FlumeAppender extends AbstractAppender {

    private static final ObjectMapper MAPPER = new ObjectMapper();
    private final FlumeClient client;
    private final String sourceType;

    protected FlumeAppender(String name, Filter filter, Layout<? extends Serializable> layout,
                            FlumeClient client, String sourceType) {
        super(name, filter, layout, true);
        this.client = client;
        this.sourceType = sourceType;
    }

    @PluginFactory
    public static FlumeAppender createAppender(@PluginAttr("name") String name,
                                               @PluginElement("Filters") Filter filter,
                                               @PluginElement("Layout") Layout<? extends Serializable> layout,
                                               @PluginAttr("sourceType") String sourceType,
                                               @PluginAttr("hosts") String hosts) {
        if (name == null) {
            LOGGER.error("FlumeAppender missing name");
            return null;
        }
        if (client == null) {
            LOGGER.error("FlumeAppender missing client");
            return null;
        }
        return new FlumeAppender(name, filter, layout, createClient(hosts), sourceType);
    }

    private static FlumeClient createClient(String hosts) {
        LoadBalancingRpcClient rpcClient = new LoadBalancingRpcClient();
        String[] hostArray = hosts.split(",");
        for (String host : hostArray) {
            String[] hostParts = host.split(":");
            rpcClient.addHost(new InetSocketAddress(hostParts[0], Integer.parseInt(hostParts[1])));
        }
        Properties props = new Properties();
        props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, "default_loadbalance");
        props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, hosts);
        props.setProperty(RpcClientConfigurationConstants.CONFIG_MAX_BACKOFF, "10000");
        AvroEventSerializer serializer = new AvroEventSerializer();
        serializer.configure(props, false);
        return new FlumeClient(rpcClient, serializer);
    }

    @Override
    public void append(LogEvent event) {
        try {
            byte[] body = ((StringLayout) this.getLayout()).toByteArray(event);
            Map<String, String> headers = new HashMap<>();
            headers.put("timestamp", Long.toString(event.getTimeMillis()));
            headers.put("source", "log4j");
            headers.put("sourceType", sourceType);
            Event flumeEvent = EventBuilder.withBody(body, headers);
            client.sendEvent(flumeEvent);
        } catch (Exception e) {
            LOGGER.error("Failed to send event to Flume", e);
        }
    }
}

Im obigen Code haben wir einen log4j2-Appender implementiert, der das Protokollereignis in ein Flume-Ereignis verpackt und an den Flume-Server sendet.

Erstellen Sie eine log4j2-Konfigurationsdatei und konfigurieren Sie FlumeAppender.

<?xml version="1.0" encoding="UTF-8"?>
<Configuration>
    <Appenders>
        <Flume name="flume" sourceType="spring-boot" hosts="${flume.hosts}">
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
        </Flume>
    </Appenders>
    <Loggers>
        <Root level="info">
            <AppenderRef ref="flume"/>
        </Root>
    </Loggers>
</Configuration>

In dieser log4j2-Konfigurationsdatei definieren wir einen FlumeAppender und referenzieren ihn im Root Logger.

  1. Flume-Konfiguration

Wir müssen Flume so konfigurieren, dass es von der Anwendung in Flume Agent gesendete Protokollnachrichten empfängt und an Elasticsearch sendet.

Erstellen Sie eine Flume-Konfigurationsdatei wie unten gezeigt.

# Define the agent name and the agent sources and sinks
myflume.sources = mysource
myflume.sinks = mysink
myflume.channels = channel1

# Define the source
myflume.sources.mysource.type = avro
myflume.sources.mysource.bind = 0.0.0.0
myflume.sources.mysource.port = 41414

# Define the channel
myflume.channels.channel1.type = memory
myflume.channels.channel1.capacity = 10000
myflume.channels.channel1.transactionCapacity = 1000

# Define the sink
myflume.sinks.mysink.type = org.elasticsearch.hadoop.flume.ElasticsearchSink
myflume.sinks.mysink.hostNames = localhost:9200
myflume.sinks.mysink.indexName = ${type}-%{+YYYY.MM.dd}
myflume.sinks.mysink.batchSize = 1000
myflume.sinks.mysink.typeName = ${type}

# Link the source and sink with the channel
myflume.sources.mysource.channels = channel1
myflume.sinks.mysink.channel = channel1

In der Flume-Konfigurationsdatei definieren wir einen Agenten, eine Quelle und eine Senke. Quelle ist ein Avro-Typ, gebunden an Port 41414, Kanal1 ist ein Speichertyp, die Kapazität beträgt 10000 und die Transaktionskapazität beträgt 1000. Die Senke ist ein ElasticsearchSink-Typ, der einen Index mit dem Namen „type“ auf Port 9200 des lokalen Hosts erstellt und ihn stapelweise an Elasticsearch übermittelt, wenn 1000 Ereignisse erreicht sind.

  1. Elasticsearch- und Kibana-Konfiguration

Abschließend müssen wir Elasticsearch und Kibana konfigurieren. In Elasticsearch müssen wir einen Index erstellen, der dem in der Flume-Konfigurationsdatei definierten Indexnamen entspricht.

In Kibana müssen wir ein Indexschema erstellen. Wählen Sie im Hauptmenü von Kibana „Verwaltung“ und dann „Kibana“. Wählen Sie im Kibana-Indexmuster „Indexmuster erstellen“ aus. Geben Sie den in der Flume-Konfigurationsdatei definierten Indexnamen ein und befolgen Sie die Anweisungen zur Konfiguration.

Wir müssen außerdem ein Dashboard für Kibana erstellen, um die Protokollmeldungen der Anwendung anzuzeigen. Wählen Sie im Hauptmenü von Kibana „Dashboard“ und dann „Dashboard erstellen“. Wählen Sie im Reiter „Visualisierungen“ die Option „Visualisierung hinzufügen“ aus. Wählen Sie Datentabelle aus und konfigurieren Sie die erforderlichen Felder und Visualisierungsoptionen.

  1. Fazit

In diesem Artikel haben wir vorgestellt, wie man mit Spring Boot und Flume ein effizientes System zur Protokollerfassung und -analyse aufbaut. Wir haben einen benutzerdefinierten log4j2-Appender implementiert, um die Protokollereignisse der Anwendung an den Flume-Server zu senden, und haben Elasticsearch und Kibana für die Protokollanalyse und -visualisierung verwendet. Ich hoffe, dieser Artikel kann Ihnen beim Aufbau Ihres eigenen Protokollerfassungs- und Analysesystems helfen.

Das obige ist der detaillierte Inhalt vonErstellen Sie ein Protokollerfassungs- und Analysesystem basierend auf Spring Boot und Flume. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn