1.
本文主要使用其中的Interceptor和Channel Selector
Interceptor:
对source中的数据在进入channel之前进行拦截做一些处理,比如过滤掉一些数据,或者加上一些key/value等。flume内置了一些拦截器,也可以通过自己编写java代码进行拦截器的自定义。
Channel Selector:
ChannelSelector的作用就是选出Event将要被发往哪个Channel。其共有两种类型,分别是Replicating(复制)和Multiplexing(多路复用)。
ReplicatingSelector会将同一个Event发往所有的Channel,Multiplexing会根据相应的原则,将不同的Event发往不同的Channel。
2.需求
需求:使用Flume 采集服务器本地日志(json格式)到kafka,要求flume采集过程中要是碰到了非json数据,则发送到Kafka的错误队列。
此时会用到Flume的channel selecter中的Multiplexing结构,Multiplexing的原理是,根据event中Header的某个key的值,将不同的event发送到不同的Channel中,所以我们需要自定义一个Interceptor,在采集日志是进行json格式校验,为json数据和非json数据分别赋予不同的header值,再由channel selecter根据自定义header的值进行判断,发往不同的topic
3.环境说明
flume 使用1.9.0 版本
JAVA 1.8.0
4.自定义拦截器实现
1、Maven依赖配置:
<dependencies> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>1.9.0</version> <!--因为部署的flume 有flume-ng-core这个包,所以使用provided过滤掉--> <scope>provided</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.62</version> </dependency> </dependencies> <!--引入打包插件,将外部包(fastjson)打进jar包里面--> <build> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>2.3.2</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
2、编写json解析工具类代码
public class JSONUtil { //通过异常捕捉是否是json 异常:不是 正常: 是 public static boolean isJSONValidate(String log) { try { JSONObject.parseObject(log); return true; } catch (Exception e) { return false; } } }
3、编写拦截器代码
import com.fblinux.flume.utils.JSONUtil; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; public class ETLInterceptor implements Interceptor { // 初始化资源 @Override public void initialize() { } // 单event @Override public Event intercept(Event event) { //1 获取header和body当中的数据 Map<String, String> headers = event.getHeaders(); byte[] body = event.getBody(); String log = new String(body, StandardCharsets.UTF_8); //2 校验数据是否是完整的json //3 是:return event并修改header值为json 不是: return event并修改header值为nojson if (JSONUtil.isJSONValidate(log)) { headers.put("state", "json"); }else { headers.put("state", "nojson"); } return event; } // 多event方法 @Override public List<Event> intercept(List<Event> list) { for (Event event : list) { // 处理event intercept(event); } return list; } //关闭资源 @Override public void close() { } // 定义了一个静态方法 public static class Builder implements Interceptor.Builder{ // 构建拦截器 @Override public Interceptor build() { return new ETLInterceptor(); } // 配置相关的,不需要管,内部已经封装进去了 @Override public void configure(Context context) { } } }
4、打包
5.flume 配置
1、flume配置文件内容如下
# cat conf/file_to_kafka.conf #定义组件 a1.sources = r1 a1.channels = c1 c2 #配置source a1.sources.r1.type = TAILDIR a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /data/log/app.* a1.sources.r1.positionFile = /data/flume/taildir_position.json a1.sources.r1.selector.header = state # 多路复用配置 a1.sources.r1.selector.type = multiplexing a1.sources.r1.selector.mapping.json = c1 a1.sources.r1.selector.mapping.nojson = c2 # 自定义flume拦截器 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = com.fblinux.flume.log.ETLInterceptor$Builder #配置正常日志channel a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel a1.channels.c1.kafka.bootstrap.servers = kafka01:9092,kafka02:9092,kafka03:9092 a1.channels.c1.kafka.topic = topic_log a1.channels.c1.parseAsFlumeEvent = false # 配置异常日志channel a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel a1.channels.c2.kafka.bootstrap.servers = kafka01:9092,kafka02:9092,kafka03:9092 a1.channels.c2.kafka.topic = topic_errlog a1.channels.c2.parseAsFlumeEvent = false #组装 a1.sources.r1.channels = c1 c2
2、启动flume
# ./bin/flume-ng agent -n a1 -f conf/file_to_kafka.conf -Dflume.root.logger=info,console
这个时候自定义拦截器的代码和flume 集成就配置完成了
6.测试
测试启动Kafka 消费者,手动往/data/log/app.* 文件中造一些错误的非json数据和正确的json数据,验证自定义拦截器是否生效
转载请注明:西门飞冰的博客 » flume 多路复用+自定义拦截器实践