由于blog各种垃圾评论太多,而且本人审核评论周期较长,所以懒得管理评论了,就把评论功能关闭,有问题可以直接qq骚扰我

flume 多路复用+自定义拦截器实践

JAVA 西门飞冰 1203℃
[隐藏]

1.Flume 内部原理

image-20221018153129769

本文主要使用其中的Interceptor和Channel Selector

Interceptor:

对source中的数据在进入channel之前进行拦截做一些处理,比如过滤掉一些数据,或者加上一些key/value等。flume内置了一些拦截器,也可以通过自己编写java代码进行拦截器的自定义。

Channel Selector:

ChannelSelector的作用就是选出Event将要被发往哪个Channel。其共有两种类型,分别是Replicating(复制)和Multiplexing(多路复用)。

ReplicatingSelector会将同一个Event发往所有的Channel,Multiplexing会根据相应的原则,将不同的Event发往不同的Channel。

2.需求

需求:使用Flume 采集服务器本地日志(json格式)到kafka,要求flume采集过程中要是碰到了非json数据,则发送到Kafka的错误队列。

image-20221015225940181

此时会用到Flume的channel selecter中的Multiplexing结构,Multiplexing的原理是,根据event中Header的某个key的值,将不同的event发送到不同的Channel中,所以我们需要自定义一个Interceptor,在采集日志是进行json格式校验,为json数据和非json数据分别赋予不同的header值,再由channel selecter根据自定义header的值进行判断,发往不同的topic

3.环境说明

flume 使用1.9.0 版本

JAVA 1.8.0

4.自定义拦截器实现

1、Maven依赖配置:

<dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
            <!--因为部署的flume 有flume-ng-core这个包,所以使用provided过滤掉-->
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>
    </dependencies>
		<!--引入打包插件,将外部包(fastjson)打进jar包里面-->
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

2、编写json解析工具类代码

public class JSONUtil {
    //通过异常捕捉是否是json  异常:不是   正常: 是
    public static boolean isJSONValidate(String log) {
        try {
            JSONObject.parseObject(log);
            return true;
        } catch (Exception e) {
            return false;
        }
    }
}

3、编写拦截器代码

import com.fblinux.flume.utils.JSONUtil;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class ETLInterceptor implements Interceptor {
    // 初始化资源
    @Override
    public void initialize() {

    }

    // 单event
    @Override
    public Event intercept(Event event) {
        //1 获取header和body当中的数据
        Map<String, String> headers = event.getHeaders();
        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);
        //2 校验数据是否是完整的json
        //3 是:return event并修改header值为json  不是: return event并修改header值为nojson
        if (JSONUtil.isJSONValidate(log)) {
            headers.put("state", "json");
        }else {
            headers.put("state", "nojson");
        }
        return event;
    }
    // 多event方法
    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            // 处理event
            intercept(event);
        }
        return list;
    }

    //关闭资源
    @Override
    public void close() {

    }

    // 定义了一个静态方法
    public static class Builder implements Interceptor.Builder{
        // 构建拦截器
        @Override
        public Interceptor build() {
            return new ETLInterceptor();
        }
        // 配置相关的,不需要管,内部已经封装进去了
        @Override
        public void configure(Context context) {

        }
    }
}

4、打包

image-20221015223757295

5、将打好的包上传到flume服务器lib目录下面

5.flume 配置

1、flume配置文件内容如下

# cat conf/file_to_kafka.conf 
#定义组件
a1.sources = r1
a1.channels = c1 c2

#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /data/log/app.*
a1.sources.r1.positionFile = /data/flume/taildir_position.json
a1.sources.r1.selector.header = state
# 多路复用配置
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.mapping.json = c1
a1.sources.r1.selector.mapping.nojson = c2
# 自定义flume拦截器
a1.sources.r1.interceptors =  i1
a1.sources.r1.interceptors.i1.type = com.fblinux.flume.log.ETLInterceptor$Builder

#配置正常日志channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = kafka01:9092,kafka02:9092,kafka03:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false

# 配置异常日志channel
a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = kafka01:9092,kafka02:9092,kafka03:9092
a1.channels.c2.kafka.topic = topic_errlog
a1.channels.c2.parseAsFlumeEvent = false

#组装 
a1.sources.r1.channels = c1 c2

2、启动flume

# ./bin/flume-ng agent -n a1 -f conf/file_to_kafka.conf -Dflume.root.logger=info,console

这个时候自定义拦截器的代码和flume 集成就配置完成了

6.测试

测试启动Kafka 消费者,手动往/data/log/app.* 文件中造一些错误的非json数据和正确的json数据,验证自定义拦截器是否生效

 

 

转载请注明:西门飞冰的博客 » flume 多路复用+自定义拦截器实践

喜欢 (1)or分享 (0)