Skip to content

Commit 06a1e78

Browse files
committed
add flink logs collector
1 parent 7a58f03 commit 06a1e78

File tree

15 files changed

+1180
-0
lines changed

15 files changed

+1180
-0
lines changed
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven module "Log4j2KafkaAppender": builds the Log4j2 -> Kafka appender used to
  ship Flink job logs to a Kafka topic. Child of the FlinkLogKafkaAppender parent.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>FlinkLogKafkaAppender</artifactId>
        <groupId>com.zhisheng.flink</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>Log4j2KafkaAppender</artifactId>

    <properties>
        <log4j.version>2.12.1</log4j.version>
        <flink.shaded.version>12.0</flink.shaded.version>
        <jackson.version>2.10.1</jackson.version>
    </properties>

    <dependencies>
        <!--
          NOTE(review): ${scope} is not defined in this POM — presumably declared in the
          parent (e.g. "provided" so the Flink distribution supplies log4j at runtime).
          Verify against the parent POM.
        -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>${scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>${scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>${scope}</scope>
        </dependency>
        <!-- Flink's relocated Jackson, used for JSON-encoding log events. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-jackson</artifactId>
            <version>${jackson.version}-${flink.shaded.version}</version>
            <scope>${scope}</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!--
              Shade only the Kafka client classes into the appender jar so it can be
              dropped into Flink's lib/ without pulling kafka-clients separately.
              NOTE(review): org.apache.kafka:* is shaded but no kafka-clients dependency
              is declared here — presumably inherited from the parent; verify.
            -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <id>shade-flink</id>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <includes>
                                    <include>org.apache.kafka:*</include>
                                </includes>
                            </artifactSet>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
package com.zhisheng.log.appender;
2+
3+
import com.zhisheng.log.model.LogEvent;
import com.zhisheng.log.util.ExceptionUtil;
import com.zhisheng.log.util.JacksonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.ConfigException;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
import org.apache.logging.log4j.core.config.plugins.PluginElement;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;

import java.io.File;
import java.io.Serializable;
import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.regex.Pattern;
29+
30+
@Slf4j
31+
@Plugin(name = "KafkaLog4j2Appender", category = "Core", elementType = "appender", printObject = true)
32+
public class KafkaLog4j2Appender extends AbstractAppender {
33+
34+
private final String source;
35+
36+
private final String topic;
37+
38+
private final String level;
39+
40+
private final Producer<String, String> producer;
41+
42+
private String appId;
43+
44+
private String containerId;
45+
46+
private String containerType;
47+
48+
private final String taskName;
49+
50+
private final String taskId;
51+
52+
private String nodeIp;
53+
54+
protected KafkaLog4j2Appender(String name, Filter filter, Layout<? extends Serializable> layout, boolean ignoreExceptions, Property[] properties, String source, String bootstrapServers, String topic, String level) {
55+
super(name, filter, layout, ignoreExceptions, properties);
56+
this.source = source;
57+
this.topic = topic;
58+
this.level = level;
59+
60+
Properties envProperties = System.getProperties();
61+
Map<String, String> envs = System.getenv();
62+
String clusterId = envs.get("CLUSTER_ID");
63+
if (clusterId != null) {
64+
//k8s cluster
65+
appId = clusterId;
66+
containerId = envs.get("HOSTNAME");
67+
if (envs.get("HOSTNAME").contains("taskmanager")) {
68+
containerType = "taskmanager";
69+
} else {
70+
containerType = "jobmanager";
71+
}
72+
//k8s 物理机器 ip
73+
if (envs.get("_HOST_IP_ADDRESS") != null) {
74+
nodeIp = envs.get("_HOST_IP_ADDRESS");
75+
}
76+
} else {
77+
//yarn cluster
78+
String logFile = envProperties.getProperty("log.file");
79+
String[] values = logFile.split(File.separator);
80+
if (values.length >= 3) {
81+
appId = values[values.length - 3];
82+
containerId = values[values.length - 2];
83+
String log = values[values.length - 1];
84+
if (log.contains("jobmanager")) {
85+
containerType = "jobmanager";
86+
} else if (log.contains("taskmanager")) {
87+
containerType = "taskmanager";
88+
} else {
89+
containerType = "others";
90+
}
91+
} else {
92+
log.error("log.file Property ({}) doesn't contains yarn application id or container id", logFile);
93+
}
94+
}
95+
96+
taskName = envProperties.getProperty("taskName", null);
97+
taskId = envProperties.getProperty("taskId", null);
98+
99+
Properties props = new Properties();
100+
for (Property property : properties) {
101+
props.put(property.getName(), property.getValue());
102+
}
103+
104+
if (bootstrapServers != null) {
105+
props.setProperty("bootstrap.servers", bootstrapServers);
106+
} else {
107+
throw new ConfigException("The bootstrap servers property must be specified");
108+
}
109+
if (this.topic == null) {
110+
throw new ConfigException("Topic must be specified by the Kafka log4j appender");
111+
}
112+
113+
String clientIdPrefix = taskId != null ? taskId : appId;
114+
115+
if (clientIdPrefix != null) {
116+
props.setProperty("client.id", clientIdPrefix + "_log");
117+
}
118+
119+
if (props.getProperty("acks") == null) {
120+
props.setProperty("acks", "0");
121+
}
122+
123+
if (props.getProperty("retries") == null) {
124+
props.setProperty("retries", "0");
125+
}
126+
127+
if (props.getProperty("batch.size") == null) {
128+
props.setProperty("batch.size", "16384");
129+
}
130+
131+
if (props.getProperty("linger.ms") == null) {
132+
props.setProperty("linger.ms", "5");
133+
}
134+
135+
if (props.getProperty("compression.type") == null) {
136+
props.setProperty("compression.type", "lz4");
137+
}
138+
139+
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
140+
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
141+
142+
producer = new KafkaProducer<>(props);
143+
}
144+
145+
146+
@Override
147+
public void append(org.apache.logging.log4j.core.LogEvent event) {
148+
try {
149+
if (level.contains(event.getLevel().toString().toUpperCase()) && !event.getLoggerName().contains("xxx")) { //控制哪些类的日志不收集
150+
producer.send(new ProducerRecord<>(topic, appId, subAppend(event)));
151+
}
152+
} catch (Exception e) {
153+
log.warn("Parsing the log event or send log event to kafka has exception", e);
154+
}
155+
}
156+
157+
private String subAppend(org.apache.logging.log4j.core.LogEvent event) throws JsonProcessingException {
158+
LogEvent logEvent = new LogEvent();
159+
Map<String, String> tags = new HashMap<>();
160+
String logMessage = null;
161+
try {
162+
InetAddress inetAddress = InetAddress.getLocalHost();
163+
tags.put("host_name", inetAddress.getHostName());
164+
tags.put("host_ip", inetAddress.getHostAddress());
165+
} catch (Exception e) {
166+
log.error("Error getting the ip and host name of the node where the job({}) is running", appId, e);
167+
} finally {
168+
try {
169+
logMessage = ExceptionUtil.stacktraceToString(event.getThrown());
170+
logEvent.setContent(logMessage);
171+
} catch (Exception e) {
172+
if (logMessage != null) {
173+
logMessage = logMessage + "\n\t" + e.getMessage();
174+
}
175+
logEvent.setContent(logMessage);
176+
} finally {
177+
logEvent.setId(UUID.randomUUID().toString());
178+
logEvent.setTimestamp(event.getTimeMillis());
179+
logEvent.setSource(source);
180+
if (logMessage != null) {
181+
logMessage = event.getMessage().getFormattedMessage() + "\n" + logMessage;
182+
} else {
183+
logMessage = event.getMessage().getFormattedMessage();
184+
}
185+
logEvent.setContent(logMessage);
186+
187+
StackTraceElement eventSource = event.getSource();
188+
tags.put("class_name", eventSource.getClassName());
189+
tags.put("method_name", eventSource.getMethodName());
190+
tags.put("file_name", eventSource.getFileName());
191+
tags.put("line_number", String.valueOf(eventSource.getLineNumber()));
192+
193+
tags.put("logger_name", event.getLoggerName());
194+
tags.put("level", event.getLevel().toString());
195+
tags.put("thread_name", event.getThreadName());
196+
tags.put("app_id", appId);
197+
tags.put("container_id", containerId);
198+
tags.put("container_type", containerType);
199+
if (taskId != null) {
200+
tags.put("task_id", taskId);
201+
}
202+
if (taskName != null) {
203+
tags.put("task_name", taskName);
204+
}
205+
if (nodeIp != null) {
206+
tags.put("node_ip", nodeIp);
207+
}
208+
logEvent.setTags(tags);
209+
}
210+
}
211+
return JacksonUtil.toJson(logEvent);
212+
}
213+
214+
215+
@PluginFactory
216+
public static KafkaLog4j2Appender createAppender(@PluginElement("Layout") final Layout<? extends Serializable> layout,
217+
@PluginElement("Filter") final Filter filter,
218+
@Required(message = "No name provided for KafkaLog4j2Appender") @PluginAttribute("name") final String name,
219+
@PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions,
220+
@Required(message = "No bootstrapServers provided for KafkaLog4j2Appender") @PluginAttribute("bootstrapServers") final String bootstrapServers,
221+
@Required(message = "No source provided for KafkaLog4j2Appender") @PluginAttribute("source") final String source,
222+
@Required(message = "No topic provided for KafkaLog4j2Appender") @PluginAttribute("topic") final String topic,
223+
@Required(message = "No level provided for KafkaLog4j2Appender") @PluginAttribute("level") final String level,
224+
@PluginElement("Properties") final Property[] properties) {
225+
return new KafkaLog4j2Appender(name, filter, layout, ignoreExceptions, properties, source, bootstrapServers, topic, level);
226+
}
227+
228+
@Override
229+
public void stop() {
230+
super.stop();
231+
if (producer != null) {
232+
producer.close();
233+
}
234+
}
235+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package com.zhisheng.log.model;
2+
3+
import lombok.Data;
4+
5+
import java.util.HashMap;
6+
import java.util.Map;
7+
8+
/**
 * Wire format of a single log record shipped to Kafka by the appender;
 * serialized to JSON via JacksonUtil before being produced.
 * Lombok {@code @Data} generates getters/setters/equals/hashCode/toString.
 */
@Data
public class LogEvent {

    /** Log origin — default is flink; maybe others will use this kafka appender in future. */
    private String source;

    /** Unique log id — default it is a random UUID assigned by the appender. */
    private String id;

    /** Event time in epoch milliseconds (taken from the log4j2 event). */
    private Long timestamp;

    /** Formatted log message, with the throwable's stack trace appended when present. */
    private String content;

    /** Tags of the log, eg: host_name, application_id, job_name etc. Never null. */
    private Map<String, String> tags = new HashMap<>();

}

0 commit comments

Comments
 (0)