由于blog各种垃圾评论太多,而且本人审核评论周期较长,所以懒得管理评论了,就把评论功能关闭,有问题可以直接qq骚扰我

nifi 的两个案例实操

大数据 西门飞冰 636℃
[隐藏]

1.简介

nifi 是一个数据同步的框架,像是flume和datax可以完成的操作,nifi都可以完成,本文展示了两个nifi的实际案例,来学习nifi的使用。

案例一:使用nifi离线同步mysql数据到hdfs,模拟datax的常用场景

案例二:使用nifi实时监控Kafka数据到hdfs,模拟flume的常用场景

2.离线同步mysql数据到hdfs

1、添加处理组

image-20230426114655652

2、添加处理器:ExecuteSQL来读取mysql中的数据

image-20230426114844840

2.1.配置ExecuteSQL处理器

(1)添加mysql的连接池

配置之前需要先创建一个连接池

image-20230426115142777

image-20230427102004711

点击箭头进行编辑连接池(mysql 驱动 要保证nifi的每个节点都存在)

image-20230427104130054

然后将state改为Enable。

image-20230427104308980

到这里mysql的连接池就添加完了

(2)编辑executeSQL信息,主要的就是SQL select query,同步的数据就是这条sql读取的内容

image-20230427104825284

选择当前任务的运行节点,配置调度周期(这里配置1s调度一次主要是为了方便验证数据)

image-20230427113454665

2.2.配置HDFS 处理器

添加putHdfs将数据写出到hdfs。

image-20230427105856000

image-20230427111216245

参数解析:

(1)Hadoop Configuration Resources:hadoop配置文件的地址,写core-site.xml和hdfs-site.xml的地址

(2)Directory:写入到hdfs的路径

(3)Conflict Resolution Strategy:文件名冲突解决策略,默认fail报错,同步文件选择append追加写入,同步数据库选择replace

(4)Writing Strategy: 写入策略,默认写入加改名。打到块大小文件滚动

(5)Block Size: 块大小

(6)Compression codec: 压缩格式,这里选择通用性更高的GZIP

2.3.连接处理器

点击上游处理器的箭头拖动到下游即可连接,连接时需要点击将上游哪种情况的数据输出到下游,这里选择success。

image-20230427111327261

在mysql 连接器配置一下,失败了直接终止

image-20230427112124147

添加成功之后,上游处理完毕。Nifi运行要求每一个处理器的数据情况都要有处理。所以PutHDFS需要自己解决自身数据的情况。

配置数据发送失败或者成功都直接终止当前数据flowFile即可。

image-20230427112215010

配置完成之后如果参数没有问题,处理器会出现终止的按钮,表示没有运行。

image-20230427112308766

启动处理器之后就可以看到有数据输出和进入

image-20230427113313544

查看hdfs目录也有了数据

image-20230427113357238

2.4.修改数据格式为json

在读取和写出数据中间添加一个转换json的处理器,这个处理器是不需要配置什么东西的,只需要把读取过来的数据转换为json格式

image-20230427145259375

配置出现非json格式的数据就将转换停掉

image-20230427145435087

在管道中查看数据

image-20230427145636254

image-20230427145609038

2.5.写入动态目录

需求:添加动态参数控制数据写入到hdfs的路径。

默认情况下所有的上传文件都会发送到同一个文件夹,导致文件混乱。UpdateAttribute 可以定义变量信息

这里定义了一个dt变量值是获取当前时间:${now():format(‘yyyy-MM-dd’)}

image-20230427151232560

这样数据在经过管道传输就可以看到新定义的dt变量和值了

image-20230427151717180

在hdfs 处理器配置dt变量

image-20230427151836702

在hdfs下就可以看到对应日期的目录

image-20230427151803526

完整配置如下:

image-20230427151903321

2.6.修改文件名控制文件滚动

NIFI读取数据的文件名称为自动生成的字符串,没有任何含义,可以通过修改Attribute参数来修改fileName。

这个重新修改了filename的值为 ${now():toNumber()}.data

image-20230427153342668

再次运行在HDFS下就可以看到文件名称已经被修改了

image-20230427151803526

filename修改为如下变量${now():format(‘yyyy-MM-dd-HH’)}.data,表示相同小时的数据进入同一个文件

image-20230427153342668

HDFS 根据需求配置成追加还是替换,要是配置成追加就是同一个文件名字的就会不断追加写入到同一个文件里面

image-20230427154157171

最终结果验证:

image-20230427154416656

3.实时监控kafka数据到hdfs

需求:实时监控kafka主题,将数据同步发送到hdfs。

新建组

image-20230427154850664

3.1.配置Kafka处理器

image-20230427161557947

参数说明:
Kafka Brokers:Kafka 服务器地址,多个地址用逗号分隔。
Topic(s):要消费的 Kafka 主题名,多个主题名用逗号分隔。
Group ID:消费者组 ID,用于标识一组消费者。
Auto Commit Enabled:是否启用自动提交偏移量。
Auto Offset Reset:当没有初始偏移量或当前偏移量无效时,如何处理。
Maximum Poll Records:每次拉取的最大记录数。
Poll Interval:拉取间隔,以毫秒为单位。

3.2.配置HDFS 处理器

image-20230427160027428

3.3.配置UpdateAttribute处理器

UpdateAttribute处理器配置一小时生成一个文件解决小文件问题

image-20230427160306188

3.4.运行测试

image-20230427161822067

4.nifi 背压机制

NiFi的背压机制是一种自动调节流量的机制,用于避免数据流过快而导致系统崩溃或数据丢失的问题。当数据流过快时,NiFi会自动减缓数据的流动速度,以避免数据积压和丢失。具体来说,NiFi会在数据流进入某个处理器之前,检查该处理器的输入队列是否已满,如果已满,则会暂停数据流的输入,直到该队列中的数据被处理完成。这样可以保证系统在高负载时仍能正常运行,并且不会丢失数据。

nifi的背压机制配置是在组里面,默认配置是1G:

image-20230427163518482

5.总结

nifi 非常重要的一个特点就是简单好用,对比其他数据同步的框架最大的优势就是有一个web ui,使用和调试都非常方便,可视化的页面也可以直观的看到数据在什么位置。

如果是一些特定的场景,比如监控mysql 里面实时变化的数据,就要使用专业的CDC工具来实现。

转载请注明:西门飞冰的博客 » nifi 的两个案例实操

喜欢 (1)or分享 (0)