首頁 >Java >java教程 >基於Spring Boot和Flume建置日誌收集和分析系統

基於Spring Boot和Flume建置日誌收集和分析系統

WBOY
WBOY原創
2023-06-23 08:53:491876瀏覽

隨著企業系統規模的不斷擴大,系統的日誌越來越龐大,如果沒有一個可靠的日誌收集和分析系統,就很難有效地監控和維護系統。本文將介紹如何基於Spring Boot和Flume建立一個高效的日誌收集和分析系統。

  1. 前置條件

在開始之前,需要安裝並設定以下軟體:

  • JDK 8 以上版本
  • Maven 3.3 或以上版本
  • Apache Flume 1.9.0 或以上版本
  • Elasticsearch 7.6.2 或以上版本
  • Kibana 7.6.2 或以上版本
  1. Spring Boot應用程式設定

首先,我們需要建立一個Spring Boot應用,並且加入所需的依賴:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>

在application.properties文件中,新增以下配置:

# 应用端口号
server.port=8080

# log4j2配置
logging.config=classpath:log4j2.xml

# flume配置
flume.agentName=myflume
flume.sourceType=avro
flume.clientType=load-balancing
flume.hosts=localhost:41414

# elasticsearch配置
spring.elasticsearch.rest.uris=http://localhost:9200

以上配置中,我們指定了應用程式的連接埠號碼、log4j2設定檔、Flume的相關設定和Elasticsearch的存取URI。

  1. 日誌收集器

為了將應用程式日誌傳送到Flume,我們需要建立一個自訂的log4j2 Appender。

@Plugin(name = "Flume", category = "Core", elementType = "appender", printObject = true)
public class FlumeAppender extends AbstractAppender {

    private static final ObjectMapper MAPPER = new ObjectMapper();
    private final FlumeClient client;
    private final String sourceType;

    protected FlumeAppender(String name, Filter filter, Layout<? extends Serializable> layout,
                            FlumeClient client, String sourceType) {
        super(name, filter, layout, true);
        this.client = client;
        this.sourceType = sourceType;
    }

    @PluginFactory
    public static FlumeAppender createAppender(@PluginAttr("name") String name,
                                               @PluginElement("Filters") Filter filter,
                                               @PluginElement("Layout") Layout<? extends Serializable> layout,
                                               @PluginAttr("sourceType") String sourceType,
                                               @PluginAttr("hosts") String hosts) {
        if (name == null) {
            LOGGER.error("FlumeAppender missing name");
            return null;
        }
        if (client == null) {
            LOGGER.error("FlumeAppender missing client");
            return null;
        }
        return new FlumeAppender(name, filter, layout, createClient(hosts), sourceType);
    }

    private static FlumeClient createClient(String hosts) {
        LoadBalancingRpcClient rpcClient = new LoadBalancingRpcClient();
        String[] hostArray = hosts.split(",");
        for (String host : hostArray) {
            String[] hostParts = host.split(":");
            rpcClient.addHost(new InetSocketAddress(hostParts[0], Integer.parseInt(hostParts[1])));
        }
        Properties props = new Properties();
        props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, "default_loadbalance");
        props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, hosts);
        props.setProperty(RpcClientConfigurationConstants.CONFIG_MAX_BACKOFF, "10000");
        AvroEventSerializer serializer = new AvroEventSerializer();
        serializer.configure(props, false);
        return new FlumeClient(rpcClient, serializer);
    }

    @Override
    public void append(LogEvent event) {
        try {
            byte[] body = ((StringLayout) this.getLayout()).toByteArray(event);
            Map<String, String> headers = new HashMap<>();
            headers.put("timestamp", Long.toString(event.getTimeMillis()));
            headers.put("source", "log4j");
            headers.put("sourceType", sourceType);
            Event flumeEvent = EventBuilder.withBody(body, headers);
            client.sendEvent(flumeEvent);
        } catch (Exception e) {
            LOGGER.error("Failed to send event to Flume", e);
        }
    }
}

以上程式碼中,我們實作了一個log4j2 Appender,它會將日誌事件打包成一個Flume Event,並傳送到Flume伺服器。

建立一個log4j2設定文件,設定FlumeAppender。

<?xml version="1.0" encoding="UTF-8"?>
<Configuration>
    <Appenders>
        <Flume name="flume" sourceType="spring-boot" hosts="${flume.hosts}">
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
        </Flume>
    </Appenders>
    <Loggers>
        <Root level="info">
            <AppenderRef ref="flume"/>
        </Root>
    </Loggers>
</Configuration>

在這個log4j2設定檔中,我們定義了一個FlumeAppender,並在Root Logger中引用它。

  1. Flume設定

我們需要設定Flume,在Flume Agent中接收從應用程式傳送的日誌訊息,並將它們傳送到Elasticsearch。

建立一個Flume設定文件,如下所示。

# Define the agent name and the agent sources and sinks
myflume.sources = mysource
myflume.sinks = mysink
myflume.channels = channel1

# Define the source
myflume.sources.mysource.type = avro
myflume.sources.mysource.bind = 0.0.0.0
myflume.sources.mysource.port = 41414

# Define the channel
myflume.channels.channel1.type = memory
myflume.channels.channel1.capacity = 10000
myflume.channels.channel1.transactionCapacity = 1000

# Define the sink
myflume.sinks.mysink.type = org.elasticsearch.hadoop.flume.ElasticsearchSink
myflume.sinks.mysink.hostNames = localhost:9200
myflume.sinks.mysink.indexName = ${type}-%{+YYYY.MM.dd}
myflume.sinks.mysink.batchSize = 1000
myflume.sinks.mysink.typeName = ${type}

# Link the source and sink with the channel
myflume.sources.mysource.channels = channel1
myflume.sinks.mysink.channel = channel1

在Flume設定檔中,我們定義了一個agent,一個source和一個sink。 source是avro類型,綁定到41414埠上,channel1是一個memory類型,capacity為10000,transactionCapacity為1000。 sink是一個ElasticsearchSink類型,在本地主機的9200連接埠上建立一個名為type的索引,在1000個事件達到時批次提交到Elasticsearch。

  1. Elasticsearch和Kibana配置

最後,我們需要設定Elasticsearch和Kibana。在Elasticsearch中,我們需要建立一個與Flume設定檔中定義的索引名稱相符的索引。

在Kibana中,我們需要建立一個索引模式。在Kibana的主選單中,選擇"Management",然後選擇"Kibana"。在Kibana索引模式中,選擇"Create Index Pattern"。輸入Flume設定檔中定義的索引名稱,並依照提示進行設定。

我們還需要為Kibana建立一個Dashboard,以便查看應用程式的日誌訊息。在Kibana的主選單中,選擇"Dashboard",然後選擇"Create Dashboard"。在"Visualizations"標籤中,選擇"Add a visualization"。選擇"Data Table",然後配置所需的欄位和視覺化選項。

  1. 結論

在本文中,我們介紹如何使用Spring Boot和Flume建立一個高效的日誌收集和分析系統。我們實作了一個自訂的log4j2 Appender,將應用程式的日誌事件傳送到Flume伺服器,並使用Elasticsearch和Kibana進行日誌分析和視覺化。希望這篇文章能對你建立自己的日誌收集和分析系統有所幫助。

以上是基於Spring Boot和Flume建置日誌收集和分析系統的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn