由于blog各种垃圾评论太多,而且本人审核评论周期较长,所以懒得管理评论了,就把评论功能关闭,有问题可以直接qq骚扰我

Flume自定义拦截器解决数据漂移问题

JAVA 西门飞冰 1696℃
[隐藏]

1.场景介绍

在大数据业务采集场景中,经常会通过Flume把Kafka中的数据落地到HDFS进行持久保存和数据计算。为了数据计算和运维方便,通常会把每天的数据在HDFS通过天分区独立存储。

在数据落入HDFS 天分区目录的过程中,会出现数据跨天存储的问题,本来是2022年6月16日的数据,结果存储到了2022年6月17日的目录。这就是数据漂移。

下面就来分析一下数据漂移产生的原因和解决办法。

2.数据漂移问题产生原因

flume数据漂移问题产生的原因就是:

(1)数据落地到磁盘文件会产生数据的时间,这个时间是业务实际的时间

(2)数据通过采集工具收集到Kafka 会有一定的时间间隔

(3)flume 从Kafka 消费数据,也会有一定的时间间隔,并且flume消费Kafka数据会把当前消费数据的具体时间记录到自己event的header中,这个时间和业务实际时间一定是存在一定长短的不同的。

(4)Flume HDFS Sink默认基于Flume event当中的timestamp时间戳落盘,这个时候要是Flume event时间和业务时间跨天就产生了数据漂移。

image-20221017164315883

3.如何解决数据漂移问题

既然知道了数据漂移问题产生的原因是:业务数据实际时间和Flume event时间不同。

那我们就可以用 Flume 提供的自定义拦截器,来对event中的数据进行ETL处理。通过自己编写拦截器代码将Flume event body中的时间戳赋值给Flume header中的时间戳。

image-20221017164348315

注意:Flume 拦截器是针对source添加的,要是没有source就不能添加拦截器。

所以从Flume 从 Kafka读取数据落地HDFS,处理流程必须是Flume Kafka Source,不能是Flume kafka channel。

4.拦截器代码

日志格式说明:我这边业务输出的日志格式为json格式,时间字段存在ts字段中。不同的日志格式需要解析的代码不同。

1、Maven 依赖配置

		<dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
            <!--因为部署的flume 有flume-ng-core这个包,所以使用provided过滤掉-->
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>
    </dependencies>
		<!--引入打包插件,将外部包(fastjson)打进jar包里面-->
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

2、自定义拦截器代码

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class TimestampInterceptor implements Interceptor {
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        //1 获取header和body
        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);
        Map<String, String> headers = event.getHeaders();
        //2 解析body获取ts字段,放到header的timestamp字段当中
        JSONObject jsonObject = JSONObject.parseObject(log);
        String ts = jsonObject.getString("ts");
        headers.put("timestamp", ts);
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        Iterator<Event> iterator = list.iterator();
        while (iterator.hasNext()) {
            Event event = iterator.next();
            intercept(event);
        }
        return list;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new TimestampInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

3、将程序进行打包,并上传到flume服务器lib目录下吗

5.flume 配置文件

# vim conf/kafka_to_hdfs_log.conf
#定义组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1

#配置source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = kafka01:9092,kafka02:9092,kafka03:9092
a1.sources.r1.kafka.topics=kafka_log
# 添加自定义的flume拦截器类
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.fblinux.flume.log.TimestampInterceptor$Builder

#配置channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

#配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /data/log/kafka_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

#控制输出文件类型
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

#组装 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

然后启动flume,数据漂移的问题就解决了

转载请注明:西门飞冰的博客 » Flume自定义拦截器解决数据漂移问题

喜欢 (2)or分享 (0)