Build a log collection and analysis system based on Spring Boot and Flume
As the scale of enterprise systems continues to expand, system logs are becoming larger and larger. Without a reliable log collection and analysis system, it is difficult to effectively monitor and maintain the system. This article will introduce how to build an efficient log collection and analysis system based on Spring Boot and Flume.
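Before you start, you need to install and set up the software used in this article: a JDK and Maven for the Spring Boot application, Apache Flume, Elasticsearch, and Kibana.
First, we need to create a Spring Boot application and add the required dependencies. Because Spring Boot uses Logback by default, spring-boot-starter-logging has to be excluded for the Log4j2 starter to take effect. The custom appender shown later also needs the Flume client SDK (org.apache.flume:flume-ng-sdk) on the classpath.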
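<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
  <exclusions>
    <exclusion>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-logging</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>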
In the application.properties file, add the following configuration:
# Application port
server.port=8080

# log4j2 configuration
logging.config=classpath:log4j2.xml

# Flume configuration
flume.agentName=myflume
flume.sourceType=avro
flume.clientType=load-balancing
flume.hosts=localhost:41414

# Elasticsearch configuration
spring.elasticsearch.rest.uris=http://localhost:9200
In the above configuration, we specified the port number of the application, log4j2 configuration file, Flume-related configuration and Elasticsearch access URI.
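The flume.hosts value is a comma-separated list of host:port pairs. As a rough illustration of how such a value can be read and split, here is a minimal sketch using only the JDK. The class name FlumeHostsParser is hypothetical and is not part of Spring Boot or Flume.

```java
import java.io.IOException;
import java.io.StringReader;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper for illustration; not part of Spring Boot or Flume.
class FlumeHostsParser {

    // Splits "host1:port1,host2:port2" into unresolved socket addresses.
    static List<InetSocketAddress> parse(String hosts) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : hosts.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate trailing commas
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        // Same key and value as in the application.properties shown above.
        Properties props = new Properties();
        props.load(new StringReader("flume.hosts=localhost:41414\n"));
        for (InetSocketAddress address : parse(props.getProperty("flume.hosts"))) {
            System.out.println(address.getHostString() + " -> " + address.getPort());
        }
    }
}
```

Rejecting malformed entries early keeps a typo in the property from surfacing later as a connection error.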
In order to send application logs to Flume, we need to create a custom log4j2 Appender.
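import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.flume.Event;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientConfigurationConstants;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
import org.apache.logging.log4j.core.config.plugins.PluginElement;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;

@Plugin(name = "Flume", category = "Core", elementType = "appender", printObject = true)
public class FlumeAppender extends AbstractAppender {

    private final RpcClient client;
    private final String sourceType;

    protected FlumeAppender(String name, Filter filter, Layout<? extends Serializable> layout,
                            RpcClient client, String sourceType) {
        super(name, filter, layout, true);
        this.client = client;
        this.sourceType = sourceType;
    }

    @PluginFactory
    public static FlumeAppender createAppender(@PluginAttribute("name") String name,
                                               @PluginElement("Filter") Filter filter,
                                               @PluginElement("Layout") Layout<? extends Serializable> layout,
                                               @PluginAttribute("sourceType") String sourceType,
                                               @PluginAttribute("hosts") String hosts) {
        if (name == null) {
            LOGGER.error("FlumeAppender missing name");
            return null;
        }
        if (layout == null) {
            LOGGER.error("FlumeAppender missing layout");
            return null;
        }
        if (hosts == null || hosts.trim().isEmpty()) {
            LOGGER.error("FlumeAppender missing hosts");
            return null;
        }
        try {
            return new FlumeAppender(name, filter, layout, createClient(hosts), sourceType);
        } catch (RuntimeException e) {
            // The Flume client connects eagerly, so an unreachable agent fails here.
            LOGGER.error("FlumeAppender could not create the Flume client", e);
            return null;
        }
    }

    // Builds a Flume RPC client from a comma-separated list of host:port pairs.
    private static RpcClient createClient(String hosts) {
        String[] hostArray = hosts.trim().split("\\s*,\\s*");
        Properties props = new Properties();
        StringBuilder aliases = new StringBuilder();
        for (int i = 0; i < hostArray.length; i++) {
            String alias = "h" + (i + 1);
            aliases.append(alias).append(' ');
            props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + alias, hostArray[i]);
        }
        props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, aliases.toString().trim());
        // The load-balancing client needs at least two hosts; use the default client for one.
        props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE,
                hostArray.length > 1 ? "default_loadbalance" : "default");
        props.setProperty(RpcClientConfigurationConstants.CONFIG_MAX_BACKOFF, "10000");
        return RpcClientFactory.getInstance(props);
    }

    @Override
    public void append(LogEvent event) {
        try {
            // Format the log event with the configured layout and attach routing headers.
            byte[] body = getLayout().toByteArray(event);
            Map<String, String> headers = new HashMap<>();
            headers.put("timestamp", Long.toString(event.getTimeMillis()));
            headers.put("source", "log4j");
            headers.put("sourceType", sourceType);
            Event flumeEvent = EventBuilder.withBody(body, headers);
            client.append(flumeEvent);
        } catch (Exception e) {
            LOGGER.error("Failed to send event to Flume", e);
        }
    }

    @Override
    public boolean stop(long timeout, TimeUnit timeUnit) {
        // Release the connection to the Flume agent when logging shuts down.
        client.close();
        return super.stop(timeout, timeUnit);
    }
}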
In the above code, we implemented a log4j2 Appender, which packages log events into a Flume Event and sends it to the Flume server.
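To make the shape of the data sent to Flume concrete, the following minimal sketch builds the same three headers and a byte body from an already formatted log line. SketchEvent is a hypothetical stand-in written with the JDK only. In the appender itself the event type comes from the Flume SDK.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a Flume event: string headers plus an opaque byte body.
class SketchEvent {
    final Map<String, String> headers;
    final byte[] body;

    SketchEvent(Map<String, String> headers, byte[] body) {
        this.headers = headers;
        this.body = body;
    }

    // Mirrors the headers the appender attaches to every log event.
    static SketchEvent fromLogLine(String formattedLine, long timeMillis, String sourceType) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("timestamp", Long.toString(timeMillis));
        headers.put("source", "log4j");
        headers.put("sourceType", sourceType);
        return new SketchEvent(headers, formattedLine.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        SketchEvent event = fromLogLine("INFO demo - application started",
                System.currentTimeMillis(), "spring-boot");
        System.out.println(event.headers);
        System.out.println(new String(event.body, StandardCharsets.UTF_8));
    }
}
```

Keeping metadata in headers rather than in the body lets components downstream route or index events without parsing the log text.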
Create a log4j2 configuration file and configure FlumeAppender.
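<?xml version="1.0" encoding="UTF-8"?>
<Configuration>
  <Properties>
    <!-- A plain ${flume.hosts} lookup resolves against properties declared in this file,
         not against application.properties, so the Flume address is declared here. -->
    <Property name="flume.hosts">localhost:41414</Property>
  </Properties>
  <Appenders>
    <Flume name="flume" sourceType="spring-boot" hosts="${flume.hosts}">
      <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
    </Flume>
  </Appenders>
  <Loggers>
    <Root level="info">
      <AppenderRef ref="flume"/>
    </Root>
  </Loggers>
</Configuration>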
In this log4j2 configuration file, we define a FlumeAppender and reference it in the Root Logger.
Next, we need to configure a Flume agent to receive the log messages sent by the application and forward them to Elasticsearch.
Create a Flume configuration file as shown below.
# Define the agent name and the agent sources, sinks and channels
myflume.sources = mysource
myflume.sinks = mysink
myflume.channels = channel1

# Define the source
myflume.sources.mysource.type = avro
myflume.sources.mysource.bind = 0.0.0.0
myflume.sources.mysource.port = 41414

# Define the channel
myflume.channels.channel1.type = memory
myflume.channels.channel1.capacity = 10000
myflume.channels.channel1.transactionCapacity = 1000

# Define the sink (assumes the Elasticsearch sink bundled with Flume,
# module flume-ng-elasticsearch-sink, is available in your installation)
myflume.sinks.mysink.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
# This sink defaults to the transport protocol, whose default port is 9300
myflume.sinks.mysink.hostNames = localhost:9300
# The sink appends the date to the index name, for example spring-boot-yyyy-MM-dd
myflume.sinks.mysink.indexName = spring-boot
myflume.sinks.mysink.indexType = log
myflume.sinks.mysink.batchSize = 1000

# Link the source and sink with the channel
myflume.sources.mysource.channels = channel1
myflume.sinks.mysink.channel = channel1
In the Flume configuration file, we define an agent named myflume with one source, one channel and one sink. The source is of type avro and listens on port 41414 on all interfaces. channel1 is a memory channel that holds up to 10000 events and moves at most 1000 events per transaction. The sink is an Elasticsearch sink that writes events to the Elasticsearch node on the local host in batches of up to 1000 events and stores them in a daily index whose name starts with spring-boot. The bundled sink talks to Elasticsearch over the transport protocol, so check that it is compatible with the Elasticsearch version you run.
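The interplay between the channel capacity and the sink batch size can be pictured with a small model. The sketch below is a simplified illustration built on a JDK bounded queue, and it is not how Flume implements its memory channel. The class name ChannelSketch and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Simplified model of a bounded channel between a source and a sink.
class ChannelSketch {
    private final BlockingQueue<String> queue;

    ChannelSketch(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Source side: returns false when the channel is full (this sketch just drops the event).
    boolean put(String event) {
        return queue.offer(event);
    }

    // Sink side: removes up to batchSize events in one go.
    List<String> takeBatch(int batchSize) {
        List<String> batch = new ArrayList<>();
        queue.drainTo(batch, batchSize);
        return batch;
    }

    int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        ChannelSketch channel = new ChannelSketch(10000);
        for (int i = 0; i < 2500; i++) {
            channel.put("event-" + i);
        }
        List<String> batch;
        while (!(batch = channel.takeBatch(1000)).isEmpty()) {
            System.out.println("sink flushed " + batch.size() + " events");
        }
    }
}
```

With 2500 buffered events and a batch size of 1000, the loop flushes three batches of 1000, 1000 and 500 events. If the application logs faster than the sink drains for long enough, the channel fills up, which is why the capacity should be sized for expected bursts.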
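Finally, we need to configure Elasticsearch and Kibana. The index that Flume writes to must exist in Elasticsearch under the name defined in the Flume configuration file; with Elasticsearch's default settings, the index is created automatically when the first event arrives.
In Kibana, we need to create an index pattern. In Kibana's main menu, select "Management", then "Index Patterns" in the Kibana section, and choose "Create index pattern". Enter the index name defined in the Flume configuration file, ending with a wildcard (*) if the indices carry a date suffix, and follow the prompts to configure it. The exact menu labels vary between Kibana versions.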
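The index name in the Flume configuration carries a date, so a new index appears each day. The following minimal sketch shows how such a name can be derived from an event timestamp. The prefix spring-boot, the UTC time zone and the yyyy-MM-dd format are assumptions made for illustration. The separator and date format actually used depend on the sink and its settings.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper that derives a daily index name from an event timestamp.
class DailyIndexName {
    // Assumed format and time zone; adjust to match the sink in use.
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    static String forTimestamp(String prefix, long epochMillis) {
        return prefix + "-" + DAY.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        System.out.println(forTimestamp("spring-boot", System.currentTimeMillis()));
    }
}
```

Because every day produces a different index, a wildcard such as spring-boot-* is the usual way to address all of them at once when searching.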
We also need to create a dashboard in Kibana to view the application's log messages. In Kibana's main menu, select "Dashboard" and then "Create Dashboard". In the "Visualizations" tab, select "Add a visualization". Select "Data Table" and configure the required fields and visualization options.
In this article, we introduced how to use Spring Boot and Flume to build an efficient log collection and analysis system. We implemented a custom log4j2 Appender to send the application's log events to the Flume server, and used Elasticsearch and Kibana for log analysis and visualization. I hope this article can help you build your own log collection and analysis system.