Da die Größe der Unternehmenssysteme immer größer wird, werden die Systemprotokolle immer größer. Ohne ein zuverlässiges Protokollerfassungs- und -analysesystem ist es schwierig, das System effektiv zu überwachen und zu warten. In diesem Artikel wird erläutert, wie Sie ein effizientes Protokollerfassungs- und Analysesystem basierend auf Spring Boot und Flume erstellen.
Bevor Sie beginnen, müssen Sie die folgende Software installieren und einrichten:
Zuerst müssen wir eine Spring Boot-Anwendung erstellen und die erforderlichen Abhängigkeiten hinzufügen:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-log4j2</artifactId> </dependency>
in der Datei „application.properties“ hinzufügen die folgende Konfiguration:
# 应用端口号 server.port=8080 # log4j2配置 logging.config=classpath:log4j2.xml # flume配置 flume.agentName=myflume flume.sourceType=avro flume.clientType=load-balancing flume.hosts=localhost:41414 # elasticsearch配置 spring.elasticsearch.rest.uris=http://localhost:9200
In der obigen Konfiguration haben wir die Portnummer der Anwendung, die log4j2-Konfigurationsdatei, die Flume-bezogene Konfiguration und den Elasticsearch-Zugriffs-URI angegeben.
Um Anwendungsprotokolle an Flume zu senden, müssen wir einen benutzerdefinierten log4j2-Appender erstellen.
@Plugin(name = "Flume", category = "Core", elementType = "appender", printObject = true) public class FlumeAppender extends AbstractAppender { private static final ObjectMapper MAPPER = new ObjectMapper(); private final FlumeClient client; private final String sourceType; protected FlumeAppender(String name, Filter filter, Layout<? extends Serializable> layout, FlumeClient client, String sourceType) { super(name, filter, layout, true); this.client = client; this.sourceType = sourceType; } @PluginFactory public static FlumeAppender createAppender(@PluginAttr("name") String name, @PluginElement("Filters") Filter filter, @PluginElement("Layout") Layout<? extends Serializable> layout, @PluginAttr("sourceType") String sourceType, @PluginAttr("hosts") String hosts) { if (name == null) { LOGGER.error("FlumeAppender missing name"); return null; } if (client == null) { LOGGER.error("FlumeAppender missing client"); return null; } return new FlumeAppender(name, filter, layout, createClient(hosts), sourceType); } private static FlumeClient createClient(String hosts) { LoadBalancingRpcClient rpcClient = new LoadBalancingRpcClient(); String[] hostArray = hosts.split(","); for (String host : hostArray) { String[] hostParts = host.split(":"); rpcClient.addHost(new InetSocketAddress(hostParts[0], Integer.parseInt(hostParts[1]))); } Properties props = new Properties(); props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, "default_loadbalance"); props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, hosts); props.setProperty(RpcClientConfigurationConstants.CONFIG_MAX_BACKOFF, "10000"); AvroEventSerializer serializer = new AvroEventSerializer(); serializer.configure(props, false); return new FlumeClient(rpcClient, serializer); } @Override public void append(LogEvent event) { try { byte[] body = ((StringLayout) this.getLayout()).toByteArray(event); Map<String, String> headers = new HashMap<>(); headers.put("timestamp", Long.toString(event.getTimeMillis())); headers.put("source", "log4j"); headers.put("sourceType", sourceType); Event flumeEvent = EventBuilder.withBody(body, headers); client.sendEvent(flumeEvent); } catch (Exception e) { LOGGER.error("Failed to send event to Flume", e); } } }
Im obigen Code haben wir einen log4j2-Appender implementiert, der das Protokollereignis in ein Flume-Ereignis verpackt und an den Flume-Server sendet.
Erstellen Sie eine log4j2-Konfigurationsdatei und konfigurieren Sie FlumeAppender.
<?xml version="1.0" encoding="UTF-8"?> <Configuration> <Appenders> <Flume name="flume" sourceType="spring-boot" hosts="${flume.hosts}"> <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/> </Flume> </Appenders> <Loggers> <Root level="info"> <AppenderRef ref="flume"/> </Root> </Loggers> </Configuration>
In dieser log4j2-Konfigurationsdatei definieren wir einen FlumeAppender und referenzieren ihn im Root Logger.
Wir müssen Flume so konfigurieren, dass es von der Anwendung in Flume Agent gesendete Protokollnachrichten empfängt und an Elasticsearch sendet.
Erstellen Sie eine Flume-Konfigurationsdatei wie unten gezeigt.
# Define the agent name and the agent sources and sinks myflume.sources = mysource myflume.sinks = mysink myflume.channels = channel1 # Define the source myflume.sources.mysource.type = avro myflume.sources.mysource.bind = 0.0.0.0 myflume.sources.mysource.port = 41414 # Define the channel myflume.channels.channel1.type = memory myflume.channels.channel1.capacity = 10000 myflume.channels.channel1.transactionCapacity = 1000 # Define the sink myflume.sinks.mysink.type = org.elasticsearch.hadoop.flume.ElasticsearchSink myflume.sinks.mysink.hostNames = localhost:9200 myflume.sinks.mysink.indexName = ${type}-%{+YYYY.MM.dd} myflume.sinks.mysink.batchSize = 1000 myflume.sinks.mysink.typeName = ${type} # Link the source and sink with the channel myflume.sources.mysource.channels = channel1 myflume.sinks.mysink.channel = channel1
In der Flume-Konfigurationsdatei definieren wir einen Agenten, eine Quelle und eine Senke. Quelle ist ein Avro-Typ, gebunden an Port 41414, Kanal1 ist ein Speichertyp, die Kapazität beträgt 10000 und die Transaktionskapazität beträgt 1000. Die Senke ist ein ElasticsearchSink-Typ, der einen Index mit dem Namen „type“ auf Port 9200 des lokalen Hosts erstellt und ihn stapelweise an Elasticsearch übermittelt, wenn 1000 Ereignisse erreicht sind.
Abschließend müssen wir Elasticsearch und Kibana konfigurieren. In Elasticsearch müssen wir einen Index erstellen, der dem in der Flume-Konfigurationsdatei definierten Indexnamen entspricht.
In Kibana müssen wir ein Indexschema erstellen. Wählen Sie im Hauptmenü von Kibana „Verwaltung“ und dann „Kibana“. Wählen Sie im Kibana-Indexmuster „Indexmuster erstellen“ aus. Geben Sie den in der Flume-Konfigurationsdatei definierten Indexnamen ein und befolgen Sie die Anweisungen zur Konfiguration.
Wir müssen außerdem ein Dashboard für Kibana erstellen, um die Protokollmeldungen der Anwendung anzuzeigen. Wählen Sie im Hauptmenü von Kibana „Dashboard“ und dann „Dashboard erstellen“. Wählen Sie im Reiter „Visualisierungen“ die Option „Visualisierung hinzufügen“ aus. Wählen Sie Datentabelle aus und konfigurieren Sie die erforderlichen Felder und Visualisierungsoptionen.
In diesem Artikel haben wir vorgestellt, wie man mit Spring Boot und Flume ein effizientes System zur Protokollerfassung und -analyse aufbaut. Wir haben einen benutzerdefinierten log4j2-Appender implementiert, um die Protokollereignisse der Anwendung an den Flume-Server zu senden, und haben Elasticsearch und Kibana für die Protokollanalyse und -visualisierung verwendet. Ich hoffe, dieser Artikel kann Ihnen beim Aufbau Ihres eigenen Protokollerfassungs- und Analysesystems helfen.
Das obige ist der detaillierte Inhalt vonErstellen Sie ein Protokollerfassungs- und Analysesystem basierend auf Spring Boot und Flume. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!