ホームページ >Java >&#&チュートリアル >Spring BootとFlumeをベースにしたログ収集・分析システムを構築する

Spring BootとFlumeをベースにしたログ収集・分析システムを構築する

WBOY
WBOYオリジナル
2023-06-23 08:53:491880ブラウズ

企業システムの規模が拡大するにつれ、システムログはますます大規模になり、信頼性の高いログ収集および分析システムがなければ、システムを効果的に監視および保守することが困難になります。この記事では、Spring Boot と Flume をベースにした効率的なログ収集・分析システムの構築方法を紹介します。

  1. 前提条件

開始する前に、次のソフトウェアをインストールして設定する必要があります:

  • JDK 8 以降
  • Maven 3.3 以降
  • Apache Flume 1.9.0 以降
  • Elasticsearch 7.6.2 以降
  • Kibana 7.6.2 以降
  1. Spring Boot アプリケーションの構成

まず、Spring Boot アプリケーションを作成し、必要な依存関係を追加する必要があります。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>

application.properties ファイルに、次の設定:

# 应用端口号
server.port=8080

# log4j2配置
logging.config=classpath:log4j2.xml

# flume配置
flume.agentName=myflume
flume.sourceType=avro
flume.clientType=load-balancing
flume.hosts=localhost:41414

# elasticsearch配置
spring.elasticsearch.rest.uris=http://localhost:9200

上記の設定では、アプリケーションのポート番号、log4j2 設定ファイル、Flume 関連の設定、および Elasticsearch のアクセス URI を指定しました。

  1. ログ コレクター

アプリケーション ログを Flume に送信するには、カスタム log4j2 アペンダーを作成する必要があります。

@Plugin(name = "Flume", category = "Core", elementType = "appender", printObject = true)
public class FlumeAppender extends AbstractAppender {

    private static final ObjectMapper MAPPER = new ObjectMapper();
    private final FlumeClient client;
    private final String sourceType;

    protected FlumeAppender(String name, Filter filter, Layout<? extends Serializable> layout,
                            FlumeClient client, String sourceType) {
        super(name, filter, layout, true);
        this.client = client;
        this.sourceType = sourceType;
    }

    @PluginFactory
    public static FlumeAppender createAppender(@PluginAttr("name") String name,
                                               @PluginElement("Filters") Filter filter,
                                               @PluginElement("Layout") Layout<? extends Serializable> layout,
                                               @PluginAttr("sourceType") String sourceType,
                                               @PluginAttr("hosts") String hosts) {
        if (name == null) {
            LOGGER.error("FlumeAppender missing name");
            return null;
        }
        if (client == null) {
            LOGGER.error("FlumeAppender missing client");
            return null;
        }
        return new FlumeAppender(name, filter, layout, createClient(hosts), sourceType);
    }

    private static FlumeClient createClient(String hosts) {
        LoadBalancingRpcClient rpcClient = new LoadBalancingRpcClient();
        String[] hostArray = hosts.split(",");
        for (String host : hostArray) {
            String[] hostParts = host.split(":");
            rpcClient.addHost(new InetSocketAddress(hostParts[0], Integer.parseInt(hostParts[1])));
        }
        Properties props = new Properties();
        props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, "default_loadbalance");
        props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, hosts);
        props.setProperty(RpcClientConfigurationConstants.CONFIG_MAX_BACKOFF, "10000");
        AvroEventSerializer serializer = new AvroEventSerializer();
        serializer.configure(props, false);
        return new FlumeClient(rpcClient, serializer);
    }

    @Override
    public void append(LogEvent event) {
        try {
            byte[] body = ((StringLayout) this.getLayout()).toByteArray(event);
            Map<String, String> headers = new HashMap<>();
            headers.put("timestamp", Long.toString(event.getTimeMillis()));
            headers.put("source", "log4j");
            headers.put("sourceType", sourceType);
            Event flumeEvent = EventBuilder.withBody(body, headers);
            client.sendEvent(flumeEvent);
        } catch (Exception e) {
            LOGGER.error("Failed to send event to Flume", e);
        }
    }
}

上記のコードでは、ログ イベントを Flume イベントにパッケージ化して Flume サーバーに送信する log4j2 アペンダーを実装しました。

log4j2 設定ファイルを作成し、FlumeAppender を設定します。

<?xml version="1.0" encoding="UTF-8"?>
<Configuration>
    <Appenders>
        <Flume name="flume" sourceType="spring-boot" hosts="${flume.hosts}">
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
        </Flume>
    </Appenders>
    <Loggers>
        <Root level="info">
            <AppenderRef ref="flume"/>
        </Root>
    </Loggers>
</Configuration>

この log4j2 構成ファイルでは、FlumeAppender を定義し、ルート ロガーでそれを参照します。

  1. Flume 構成

Flume エージェント内のアプリケーションから送信されたログ メッセージを受信し、Elasticsearch に送信するように Flume を構成する必要があります。

以下に示すように Flume 設定ファイルを作成します。

# Define the agent name and the agent sources and sinks
myflume.sources = mysource
myflume.sinks = mysink
myflume.channels = channel1

# Define the source
myflume.sources.mysource.type = avro
myflume.sources.mysource.bind = 0.0.0.0
myflume.sources.mysource.port = 41414

# Define the channel
myflume.channels.channel1.type = memory
myflume.channels.channel1.capacity = 10000
myflume.channels.channel1.transactionCapacity = 1000

# Define the sink
myflume.sinks.mysink.type = org.elasticsearch.hadoop.flume.ElasticsearchSink
myflume.sinks.mysink.hostNames = localhost:9200
myflume.sinks.mysink.indexName = ${type}-%{+YYYY.MM.dd}
myflume.sinks.mysink.batchSize = 1000
myflume.sinks.mysink.typeName = ${type}

# Link the source and sink with the channel
myflume.sources.mysource.channels = channel1
myflume.sinks.mysink.channel = channel1

Flume 構成ファイルでは、エージェント、ソース、シンクを定義します。ソースは avro タイプ、ポート 41414 にバインドされ、channel1 はメモリ タイプ、容量は 10000、transactionCapacity は 1000 です。シンクは ElasticsearchSink タイプで、ローカル ホストのポート 9200 に type という名前のインデックスを作成し、イベントが 1000 件に達するとバッチで Elasticsearch に送信します。

  1. Elasticsearch と Kibana の構成

最後に、Elasticsearch と Kibana を構成する必要があります。 Elasticsearch では、Flume 設定ファイルで定義されたインデックス名と一致するインデックスを作成する必要があります。

Kibana では、インデックス スキーマを作成する必要があります。 Kibana のメイン メニューで、[管理]、[Kibana] の順に選択します。 Kibanaのインデックスパターンで「インデックスパターンの作成」を選択します。 Flume 構成ファイルで定義されているインデックス名を入力し、プロンプトに従って構成します。

アプリケーションのログ メッセージを表示するには、Kibana のダッシュボードを作成する必要もあります。 Kibana のメイン メニューで、[ダッシュボード]、[ダッシュボードの作成] の順に選択します。 「ビジュアライゼーション」タブで、「ビジュアライゼーションの追加」を選択します。データ テーブルを選択し、必要なフィールドと視覚化オプションを設定します。

  1. 結論

この記事では、Spring Boot と Flume を使用して効率的なログ収集と分析システムを構築する方法を紹介しました。アプリケーションのログ イベントを Flume サーバーに送信するためにカスタム log4j2 Appender を実装し、ログ分析と視覚化に Elasticsearch と Kibana を使用しました。この記事が独自のログ収集および分析システムの構築に役立つことを願っています。

以上がSpring BootとFlumeをベースにしたログ収集・分析システムを構築するの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。