À mesure que l'échelle des systèmes d'entreprise continue de croître, les journaux système deviennent de plus en plus volumineux. Sans un système fiable de collecte et d'analyse des journaux, il est difficile de surveiller et de maintenir efficacement le système. Cet article explique comment créer un système efficace de collecte et d'analyse de journaux basé sur Spring Boot et Flume.
jdk 8 ou plus
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-log4j2</artifactId> </dependency>
# 应用端口号 server.port=8080 # log4j2配置 logging.config=classpath:log4j2.xml # flume配置 flume.agentName=myflume flume.sourceType=avro flume.clientType=load-balancing flume.hosts=localhost:41414 # elasticsearch配置 spring.elasticsearch.rest.uris=http://localhost:9200Dans la configuration ci-dessus, nous avons spécifié le numéro de port de l'application, le fichier de configuration log4j2, la configuration liée à Flume et l'URI d'accès Elasticsearch.
Log Collector
@Plugin(name = "Flume", category = "Core", elementType = "appender", printObject = true) public class FlumeAppender extends AbstractAppender { private static final ObjectMapper MAPPER = new ObjectMapper(); private final FlumeClient client; private final String sourceType; protected FlumeAppender(String name, Filter filter, Layout<? extends Serializable> layout, FlumeClient client, String sourceType) { super(name, filter, layout, true); this.client = client; this.sourceType = sourceType; } @PluginFactory public static FlumeAppender createAppender(@PluginAttr("name") String name, @PluginElement("Filters") Filter filter, @PluginElement("Layout") Layout<? extends Serializable> layout, @PluginAttr("sourceType") String sourceType, @PluginAttr("hosts") String hosts) { if (name == null) { LOGGER.error("FlumeAppender missing name"); return null; } if (client == null) { LOGGER.error("FlumeAppender missing client"); return null; } return new FlumeAppender(name, filter, layout, createClient(hosts), sourceType); } private static FlumeClient createClient(String hosts) { LoadBalancingRpcClient rpcClient = new LoadBalancingRpcClient(); String[] hostArray = hosts.split(","); for (String host : hostArray) { String[] hostParts = host.split(":"); rpcClient.addHost(new InetSocketAddress(hostParts[0], Integer.parseInt(hostParts[1]))); } Properties props = new Properties(); props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, "default_loadbalance"); props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, hosts); props.setProperty(RpcClientConfigurationConstants.CONFIG_MAX_BACKOFF, "10000"); AvroEventSerializer serializer = new AvroEventSerializer(); serializer.configure(props, false); return new FlumeClient(rpcClient, serializer); } @Override public void append(LogEvent event) { try { byte[] body = ((StringLayout) this.getLayout()).toByteArray(event); Map<String, String> headers = new HashMap<>(); headers.put("timestamp", Long.toString(event.getTimeMillis())); headers.put("source", "log4j"); headers.put("sourceType", sourceType); Event flumeEvent = EventBuilder.withBody(body, headers); client.sendEvent(flumeEvent); } catch (Exception e) { LOGGER.error("Failed to send event to Flume", e); } } }
<?xml version="1.0" encoding="UTF-8"?> <Configuration> <Appenders> <Flume name="flume" sourceType="spring-boot" hosts="${flume.hosts}"> <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/> </Flume> </Appenders> <Loggers> <Root level="info"> <AppenderRef ref="flume"/> </Root> </Loggers> </Configuration>Dans ce fichier de configuration log4j2, nous définissons un FlumeAppender et le référençons dans le Root Logger.
Configuration Flume
# Define the agent name and the agent sources and sinks myflume.sources = mysource myflume.sinks = mysink myflume.channels = channel1 # Define the source myflume.sources.mysource.type = avro myflume.sources.mysource.bind = 0.0.0.0 myflume.sources.mysource.port = 41414 # Define the channel myflume.channels.channel1.type = memory myflume.channels.channel1.capacity = 10000 myflume.channels.channel1.transactionCapacity = 1000 # Define the sink myflume.sinks.mysink.type = org.elasticsearch.hadoop.flume.ElasticsearchSink myflume.sinks.mysink.hostNames = localhost:9200 myflume.sinks.mysink.indexName = ${type}-%{+YYYY.MM.dd} myflume.sinks.mysink.batchSize = 1000 myflume.sinks.mysink.typeName = ${type} # Link the source and sink with the channel myflume.sources.mysource.channels = channel1 myflume.sinks.mysink.channel = channel1Dans le fichier de configuration Flume, nous définissons un agent, une source et un récepteur. La source est de type Avro, liée au port 41414, Channel1 est un type de mémoire, la capacité est de 10 000 et transactionCapacity est de 1 000. Le récepteur est un type ElasticsearchSink qui crée un index nommé type sur le port 9200 de l'hôte local et le soumet à Elasticsearch par lots lorsque 1 000 événements sont atteints.
Configuration d'Elasticsearch et Kibana
Conclusion
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!