由于blog各种垃圾评论太多,而且本人审核评论周期较长,所以懒得管理评论了,就把评论功能关闭,有问题可以直接qq骚扰我

Netty 实现http数据采集服务

JAVA 西门飞冰 1087℃
[隐藏]

1.前言

本文主要通过Netty实现一个Http协议的数据采集服务,并将Netty接收的请求转换成消息发送给Kafka:

关于采集程序的几个规则:

1、Netty判断请求的合规性通过request header中携带的key和value进行判断,没有携带指定key和value的request请求全部丢弃。

2、采集的数据仅包括request的body全部内容,且request body的内容需要为json格式,如果需要请求的header信息,可以在Channel中扩展,代码中有获取request header的示例。

3、因为这是自己写着练手用的,没有实际的业务数据,所以采集程序并没有对数据合规性进行校验,要是需要对数据合规性进行判断,可以在Channel中扩展,也可以把这个需求交给消费者或者具体使用数据的团队。

采集架构图如下:

image-20221112203229841

说明:Kafka 集群相关内容略过,请各位看官自己搭建。

2.实现代码

代码结构如下:

image-20221112164718236

(1)导入所需的依赖包

<dependencies>
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.74.Final</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>5.5.2</version>
    </dependency>
</dependencies>

(2)添加my.setting配置文件,通过hutool工具包中的Setting类型读取配置文件信息

#netty启动监听的端口号
server.port=8080

#kafka集群服务
kafka.servers=172.16.247.3:9092,172.16.247.4:9092,172.16.247.4:9092

#指定kafka中的topic
kafka.topic=netty-collect

# 设置请求的校验header信息
request.headerkey=check
request.headervalue=haha

添加配置文件获取类

public class MyConfig {
    public static Setting setting;

    static {
        //读取配置文件,读取文件的路径是在classpath下
        setting = new Setting("my.setting");
    }
}

(3)NettyCollectServer类,负责启动Netty服务,添加http相关的编解码器。

import cn.hutool.core.util.StrUtil;
import com.fblinux.config.MyConfig;
import com.fblinux.handler.ServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;

public class NettyCollectServer {
    public static void main(String[] args) throws InterruptedException {
        // 定义BossGroup,用于接收用户的链接请求
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);

        // 定义WorkerGroup,用于业务逻辑的处理,默认线程数:cpu核数*2
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            // Netty启动类
            ServerBootstrap serverBootstrap = new ServerBootstrap();

            serverBootstrap.group(bossGroup,workerGroup)
                    // 指定通道处理类型,使用的是Nio方式
                    .channel(NioServerSocketChannel.class)
                    // 业务逻辑处理
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                    // 对于http协议的解码器
                                    .addLast(new HttpRequestDecoder())
                                    // 对于http协议的编码器,用于数据响应
                                    .addLast(new HttpResponseEncoder())
                                    // 将请求的数据,url中或请求头中聚合在一起,得到FullHttpRequest对象
                                    // 传入参数是最大长度
                                    .addLast(new HttpObjectAggregator(1024 * 128))
                                    // 添加自定义处理器
                                    .addLast(new ServerHandler());
                        }
                    });

            // 启动服务
            int port = MyConfig.setting.getInt("server.port");
            ChannelFuture future = serverBootstrap.bind(port).sync();
            System.out.println(StrUtil.format("Netty服务启动了,端口号为:{}。。。。。", port));

            //等待监听关闭的信号,阻塞当前的线程,等待客户端的请求
            future.channel().closeFuture().sync();
        } finally {
            //优雅的关闭服务
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

(4)ServerHandler类,负责接收用户请求,以及发送kafka消息。

import cn.hutool.core.map.MapUtil;
import cn.hutool.json.JSONUtil;
import com.fblinux.config.MyConfig;
import com.fblinux.service.KafkaService;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;

// http web服务处理器
public class ServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

    private KafkaService kafkaService = new KafkaService();

    // Channel数据读取方法,每一条用户请求都会经过这里
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest fullHttpRequest) throws Exception {

        // 从配置中获取校验的header key和value信息
        String headerkey = MyConfig.setting.get("request.headerkey");
        String headervalue = MyConfig.setting.get("request.headervalue");

        // 获取校验header的值
        String headerCehck = fullHttpRequest.headers().get(headerkey);
        // 判断请求是否配置了指定的请求头和值
        if (headerCehck.equals(headervalue)){
            // 获取请求中的body
            String body = fullHttpRequest.content().toString(CharsetUtil.UTF_8);
            // 发送消息到Kafka
            this.kafkaService.sendMsg(body);
            // 响应客户端请求
            String result = JSONUtil.toJsonStr(MapUtil.builder().put("status", "ok").build());
            this.response(ctx,result);
        }else {
            // 响应客户端请求
            String result = JSONUtil.toJsonStr(MapUtil.builder().put("status", "false").build());
            this.response(ctx,result);
        }
    }

    private void response(ChannelHandlerContext ctx,String result){
        // 给客户端响应
        DefaultFullHttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        // 响应的内容
        httpResponse.content().writeBytes(Unpooled.copiedBuffer(result, CharsetUtil.UTF_8));
        // 设置响应头
        httpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE,"application/json; charset=utf-8");
        //响应完成后需要将Channel关闭掉
        ctx.writeAndFlush(httpResponse).addListener(ChannelFutureListener.CLOSE);
    }
}

(5)KafkaService类,完成发送kafka消息的业务逻辑。

// 实现向Kafka发送消息逻辑
public class KafkaService {
    private KafkaProducer<String, String> producer;

    public KafkaService() {
        // 配置生产者参数
        Properties properties = new Properties();
        // 配置集群地址
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, MyConfig.setting.get("kafka.servers"));
        // 消息key的序列化方式
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
        // 消息value的序列化方式
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);

        this.producer = new KafkaProducer<String, String>(properties);

    }

    // 发送消息到Kafka
    public Boolean sendMsg(String body) {
        // 获取发送数据的topic
        String topic = MyConfig.setting.get("kafka.topic");
        // 发送数据
        this.producer.send(new ProducerRecord<>(topic, body), (metadata, exception) -> {
            if (null != exception) {
                // 发送失败
                System.out.println(exception);
            }
        });
        return true;
    }
}

3.测试

测试正常的请求:即配置了正确的header key和header value

image-20221112165412294

通过postman发送数据,可以看到netty 返回的状态是OK

 image-20221112165356474

在Kafka 上也可以消费到body里面的数据

image-20221112165506160

发送错误的请求:即配置了错误的header key和header value,可以看到netty server直接返回了失败,观察Kafka 也没有数据发送过来

image-20221112165653529

转载请注明:西门飞冰的博客 » Netty 实现http数据采集服务

喜欢 (1)or分享 (0)