Build a log collection and analysis system based on Spring Boot and Flume

WBOY
2023-06-23 08:53:49

As enterprise systems grow, the volume of logs they produce grows with them. Without a reliable log collection and analysis system, it is difficult to monitor and maintain these systems effectively. This article shows how to build a log collection and analysis system based on Spring Boot and Flume.

  1. Prerequisites

Before you start, you need to install and set up the following software:

  • JDK 8 or above
  • Maven 3.3 or above
  • Apache Flume 1.9.0 or above
  • Elasticsearch 7.6.2 or above
  • Kibana 7.6.2 or above

  2. Spring Boot application configuration

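First, we need to create a Spring Boot application and add the required dependencies. Spring Boot logs through Logback by default, so spring-boot-starter-logging has to be excluded from spring-boot-starter-web for the log4j2 starter to take effect. The custom appender shown later also needs the Flume client SDK (flume-ng-sdk):

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-sdk</artifactId>
    <version>1.9.0</version>
</dependency>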

In the application.properties file, add the following configuration:

# Application port
server.port=8080

# log4j2 configuration
logging.config=classpath:log4j2.xml

# Flume configuration
flume.agentName=myflume
flume.sourceType=avro
flume.clientType=load-balancing
flume.hosts=localhost:41414

# Elasticsearch configuration
spring.elasticsearch.rest.uris=http://localhost:9200

In the above configuration, we specify the application's port number, the log4j2 configuration file, the Flume-related settings and the Elasticsearch access URI. The flume.* keys are custom properties rather than built-in Spring Boot settings, so they only take effect where the application or the logging configuration reads them.

  3. Log Collector

In order to send application logs to Flume, we need to create a custom log4j2 Appender.

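import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.flume.Event;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientConfigurationConstants;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
import org.apache.logging.log4j.core.config.plugins.PluginElement;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
import org.apache.logging.log4j.core.layout.PatternLayout;

@Plugin(name = "Flume", category = "Core", elementType = "appender", printObject = true)
public class FlumeAppender extends AbstractAppender {

    // RPC client from the Flume SDK; it delivers events to the agent's Avro source.
    private final RpcClient client;
    private final String sourceType;

    protected FlumeAppender(String name, Filter filter, Layout<? extends Serializable> layout,
                            RpcClient client, String sourceType) {
        super(name, filter, layout, true);
        this.client = client;
        this.sourceType = sourceType;
    }

    @PluginFactory
    public static FlumeAppender createAppender(@PluginAttribute("name") String name,
                                               @PluginElement("Filter") Filter filter,
                                               @PluginElement("Layout") Layout<? extends Serializable> layout,
                                               @PluginAttribute("sourceType") String sourceType,
                                               @PluginAttribute("hosts") String hosts) {
        if (name == null) {
            LOGGER.error("FlumeAppender missing name");
            return null;
        }
        if (hosts == null || hosts.trim().isEmpty()) {
            LOGGER.error("FlumeAppender missing hosts");
            return null;
        }
        if (layout == null) {
            layout = PatternLayout.createDefaultLayout();
        }
        try {
            return new FlumeAppender(name, filter, layout, createClient(hosts), sourceType);
        } catch (RuntimeException e) {
            // The RPC client connects eagerly, so an unreachable agent surfaces here.
            LOGGER.error("FlumeAppender could not create a client for " + hosts, e);
            return null;
        }
    }

    // Builds the client from a comma-separated "host:port" list.
    private static RpcClient createClient(String hosts) {
        String[] hostArray = hosts.trim().split("\\s*,\\s*");
        Properties props = new Properties();
        StringBuilder aliases = new StringBuilder();
        for (int i = 0; i < hostArray.length; i++) {
            // The SDK expects "hosts = h1 h2" plus one "hosts.<alias> = host:port" entry per alias.
            String alias = "h" + (i + 1);
            aliases.append(alias).append(' ');
            props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + alias, hostArray[i]);
        }
        props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, aliases.toString().trim());
        if (hostArray.length > 1) {
            // Load balancing only makes sense with several agents.
            props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, "default_loadbalance");
            props.setProperty(RpcClientConfigurationConstants.CONFIG_MAX_BACKOFF, "10000");
        } else {
            props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, "default");
        }
        return RpcClientFactory.getInstance(props);
    }

    @Override
    public void append(LogEvent event) {
        try {
            // The configured layout renders the log event into the Flume event body.
            byte[] body = getLayout().toByteArray(event);
            Map<String, String> headers = new HashMap<>();
            headers.put("timestamp", Long.toString(event.getTimeMillis()));
            headers.put("source", "log4j");
            if (sourceType != null) {
                headers.put("sourceType", sourceType);
            }
            Event flumeEvent = EventBuilder.withBody(body, headers);
            client.append(flumeEvent);
        } catch (Exception e) {
            LOGGER.error("Failed to send event to Flume", e);
        }
    }

    @Override
    public boolean stop(long timeout, TimeUnit timeUnit) {
        // Close the RPC connection when the logging context shuts down.
        setStopping();
        boolean stopped = super.stop(timeout, timeUnit, false);
        client.close();
        setStopped();
        return stopped;
    }
}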

In the above code, we implemented a log4j2 Appender, which packages log events into a Flume Event and sends it to the Flume server.
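
The Flume client SDK is configured through a Properties object in which every host gets an alias: a hosts key lists the aliases, and one hosts.<alias> key per alias holds the host:port pair. The following is a minimal sketch, using only the JDK, of how a comma-separated list such as the one in flume.hosts could be translated into that shape. The class name FlumeClientProps and the h1, h2 alias scheme are hypothetical, and choosing the load-balancing client only when more than one host is listed is an assumption of this sketch.

```java
import java.util.Properties;

/** Sketch: turn "host1:41414,host2:41414" into Flume RPC client properties. */
final class FlumeClientProps {

    // Property keys used by the Flume client SDK.
    static final String CLIENT_TYPE = "client.type";
    static final String HOSTS = "hosts";
    static final String HOSTS_PREFIX = "hosts.";

    static Properties fromHostList(String hostList) {
        if (hostList == null || hostList.trim().isEmpty()) {
            throw new IllegalArgumentException("hosts must not be empty");
        }
        Properties props = new Properties();
        StringBuilder aliases = new StringBuilder();
        String[] entries = hostList.trim().split("\\s*,\\s*");
        for (int i = 0; i < entries.length; i++) {
            String entry = entries[i];
            int colon = entry.lastIndexOf(':');
            if (colon <= 0 || colon == entry.length() - 1) {
                throw new IllegalArgumentException("expected host:port but got '" + entry + "'");
            }
            // Fails fast with a NumberFormatException on a non-numeric port.
            Integer.parseInt(entry.substring(colon + 1));
            String alias = "h" + (i + 1); // hypothetical alias scheme
            if (aliases.length() > 0) {
                aliases.append(' ');
            }
            aliases.append(alias);
            props.setProperty(HOSTS_PREFIX + alias, entry);
        }
        props.setProperty(HOSTS, aliases.toString());
        // Assumption: use the load-balancing client only for several agents.
        props.setProperty(CLIENT_TYPE, entries.length > 1 ? "default_loadbalance" : "default");
        return props;
    }
}
```

Calling FlumeClientProps.fromHostList("localhost:41414") yields a single alias h1 and the default client type; a Properties object of this shape is what would then be handed to the SDK's client factory.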

Create a log4j2 configuration file and configure FlumeAppender.

<?xml version="1.0" encoding="UTF-8"?>
<Configuration>
    <Appenders>
        <Flume name="flume" sourceType="spring-boot" hosts="${flume.hosts}">
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
        </Flume>
    </Appenders>
    <Loggers>
        <Root level="info">
            <AppenderRef ref="flume"/>
        </Root>
    </Loggers>
</Configuration>

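In this log4j2 configuration file, we define a FlumeAppender and reference it in the Root Logger. Note that Log4j2 does not read application.properties on its own, so the value behind ${flume.hosts} has to be made available to it, for example through a <Properties> block in log4j2.xml or as a JVM system property referenced with ${sys:flume.hosts}.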
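
Because the appender uses the layout output as the event body, the PatternLayout pattern determines what each record looks like when it reaches Flume. The sketch below approximates that pattern with plain JDK formatting to show the resulting line shape. PatternPreview is a hypothetical helper, not part of Log4j2, and it does not model the name abbreviation that %logger{36} applies to long logger names.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Sketch: approximate "%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n". */
final class PatternPreview {

    private static final DateTimeFormatter TIMESTAMP =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    static String render(LocalDateTime time, String thread, String level,
                         String logger, String message) {
        // %-5s left-aligns the level and pads it to five characters, like %-5level.
        return String.format("%s [%s] %-5s %s - %s%n",
                TIMESTAMP.format(time), thread, level, logger, message);
    }

    public static void main(String[] args) {
        System.out.print(render(LocalDateTime.now(), Thread.currentThread().getName(),
                "INFO", "com.example.DemoController", "request handled"));
    }
}
```

A plain-text body like this ends up as a single string in the downstream store, so any field that should be searchable on its own (level, logger, thread) either has to be parsed out later or sent as an event header.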

  4. Flume Configuration

Next, we need to configure a Flume agent that receives the log events sent by the application and forwards them to Elasticsearch.

Create a Flume configuration file as shown below.

# Define the agent name and the agent sources and sinks
myflume.sources = mysource
myflume.sinks = mysink
myflume.channels = channel1

# Define the source
myflume.sources.mysource.type = avro
myflume.sources.mysource.bind = 0.0.0.0
myflume.sources.mysource.port = 41414

# Define the channel
myflume.channels.channel1.type = memory
myflume.channels.channel1.capacity = 10000
myflume.channels.channel1.transactionCapacity = 1000

# Define the sink
myflume.sinks.mysink.type = org.elasticsearch.hadoop.flume.ElasticsearchSink
myflume.sinks.mysink.hostNames = localhost:9200
myflume.sinks.mysink.indexName = ${type}-%{+YYYY.MM.dd}
myflume.sinks.mysink.batchSize = 1000
myflume.sinks.mysink.typeName = ${type}

# Link the source and sink with the channel
myflume.sources.mysource.channels = channel1
myflume.sinks.mysink.channel = channel1

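In the Flume configuration file, we define an agent named myflume with one source, one channel and one sink. The source is of type avro and listens on port 41414 on all interfaces. channel1 is a memory channel that holds up to 10000 events and moves at most 1000 events per transaction. The sink is configured as an ElasticsearchSink that connects to localhost:9200, writes to a daily index whose name is built from the ${type} placeholder and the event date, and submits events to Elasticsearch in batches of 1000. The sink class named here is not in the org.apache.flume packages that ship with Flume, so the JAR that provides it has to be available on the agent's classpath.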
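
The three sizes in this file depend on one another: a sink cannot take more events in one transaction than the channel's transactionCapacity allows, and transactionCapacity cannot exceed the channel capacity. Since a Flume agent file uses the Java properties format, a small JDK-only check can catch a mismatch before the agent is started. This is a sketch only; FlumeConfigCheck is a hypothetical helper and not part of Flume.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Sketch: check batchSize <= transactionCapacity <= capacity in an agent file. */
final class FlumeConfigCheck {

    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    static boolean sizesAreConsistent(Properties props, String agent, String channel, String sink) {
        int capacity = intValue(props, agent + ".channels." + channel + ".capacity");
        int txCapacity = intValue(props, agent + ".channels." + channel + ".transactionCapacity");
        int batchSize = intValue(props, agent + ".sinks." + sink + ".batchSize");
        return batchSize <= txCapacity && txCapacity <= capacity;
    }

    private static int intValue(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null) {
            throw new IllegalArgumentException("missing key: " + key);
        }
        return Integer.parseInt(value.trim());
    }
}
```

For the configuration above, sizesAreConsistent(props, "myflume", "channel1", "mysink") holds, because the batch size of 1000 equals the transaction capacity of 1000, which is below the channel capacity of 10000.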

  5. Elasticsearch and Kibana configuration

Finally, we need to configure Elasticsearch and Kibana. In Elasticsearch, we need to create an index that matches the index name defined in the Flume configuration file.
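
With a date suffix in the index name, each day's events land in a separate index, so it helps to know the concrete names to expect. The sketch below assumes that the ${type} placeholder resolves to the value spring-boot and that the date suffix is rendered as year.month.day; both are assumptions, since the configuration does not show how the sink resolves them. DailyIndexName is a hypothetical helper.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

/** Sketch: expected daily index names under the assumptions stated above. */
final class DailyIndexName {

    private static final DateTimeFormatter SUFFIX = DateTimeFormatter.ofPattern("yyyy.MM.dd");

    // Index name for one day, e.g. type "spring-boot" and a given date.
    static String forDate(String type, LocalDate date) {
        return type + "-" + SUFFIX.format(date);
    }

    // Wildcard that covers all daily indices of one type.
    static String wildcard(String type) {
        return type + "-*";
    }
}
```

Under these assumptions, a wildcard such as the one returned by wildcard("spring-boot") is what would be entered as the Kibana index pattern, so that every daily index is covered by one pattern.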

In Kibana, we need to create an index pattern. In Kibana's main menu, select "Management", then under "Kibana" open "Index Patterns" and select "Create index pattern". Enter the index name defined in the Flume configuration file and follow the prompts to configure it.

We also need to create a dashboard in Kibana to view the application's log messages. In Kibana's main menu, select "Dashboard" and then "Create Dashboard". In the "Visualizations" tab, select "Add a visualization". Select Data Table and configure the required fields and visualization options.

  6. Conclusion

In this article, we introduced how to use Spring Boot and Flume to build an efficient log collection and analysis system. We implemented a custom log4j2 Appender to send the application's log events to the Flume server, and used Elasticsearch and Kibana for log analysis and visualization. I hope this article can help you build your own log collection and analysis system.
