由于blog各种垃圾评论太多,而且本人审核评论周期较长,所以懒得管理评论了,就把评论功能关闭,有问题可以直接qq骚扰我

Zookeeper分布式锁实现

JAVA 西门飞冰 835℃
[隐藏]

1.为什么需要分布式锁

在分布式架构中,多个程序访问统一资源的时候,传统的synchronized是无效的,它只针对一个JVM进程内多个线程起到同步作用,对跨进程无效。

解决方案:

1、利用数据库select … for update 语句对库存进行锁定,依赖数据库自身特性,遇到跨库(分库分表)处理起来比较麻烦

2、利用Zookeeper、Redis实现分布式锁特性,通过分布式锁调度进程处理,数据程序级别控制,处理更为灵活。

2.“锁”带来的问题

无论是数据库排它锁,还是ZK、Redis的分布式锁都属于“悲观锁”的范畴,虽然以阻塞的方式保证数据的一致性,但并发量也会直线下降,也是要付出的代价。适用分布式锁有以下几个场景:

  • 数据价值大,必须要保证一致性的。例如:金融业务系统间的转账汇款等。
  • 并发量底但重要的业务系统。比如:各种大宗商品的分布式交易。

总结下:重要的但对并发要求高的系统可以使用分布式锁,对于并发量高、数据价值小、对一致性要求没那么高的系统可以进行最终一致性(BASE)处理,保证并发的前提下通过重试、程序矫正、人工补录的方式进行处理。

3.Zookeeper分布式锁的实现原理

image-20221008161959465

3.1.Znode分为四种类型

持久节点:默认的节点类型。创建节点的客户端与zookeeper断开连接后,该节点依旧存在。

持久顺序节点:所谓顺序节点,就是在创建节点时,zookeeper根据创建的时间顺序给该节点名称进行编号。

临时节点:和持久节点相反,当创建节点的客户端与zookeeper断开连接后,临时节点会被删除。

临时顺序节点:顾名思义,临时顺序节点结合临时节点和顺序节点的特点:在创建节点时,Zookeeper根据创建的时间顺序给该节点名称进行编号;当创建节点的客户端与zookeeper断开连接后,临时节点会被删除。

3.2.为什么是临时顺序节点

image-20221008162939078

利用临时顺序节点可以避免”惊群效应”,当某一个客户端释放锁以后,其他客户端不会一窝蜂的涌入争抢锁资源,而是按时间顺序一个一个来获取锁进行处理。

3.3.临时顺序节点的实现原理

image-20221008163818550

  • 每一个客户端在尝试获取锁时都会自动由ZK创建该顺序节点的“子节点”,按0001、0002这样的编号标识访问顺序
  • 从第二个客户端开始,ZK不但创建0002子节点,还会监听前一个0001节点。当客户端1处理完毕或者其他原因释放0001节点后,ZK会通过Watch监听机制通知客户端2进行后续处理,以此保证处理的有序性,避免“惊群效应”发生。

4.代码实现ZK分布式锁

需求:通过ZK分布式锁,解决商品库存超卖问题

4.1.Zookeeper安装

docker run --privileged=true -d --name zookeeper -p 2181:2181 -p 8080:8080 -d zookeeper:latest

4.2.创建Spring Boot工程,引入依赖项

Apache Curator(https://curator.apache.org/)是一个比较完善的ZooKeeper客户端框架,通 过封装的一套高级API 简化了ZooKeeper的操作。

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.13.0</version>
</dependency>

4.3.开发Service,引入ZK分布式锁

@Service
public class WarehouseService {
    public static int shoe = 10;

    public int outOfWarehouseWithLock() throws Exception {
        //设置ZK客户端重试策略, 每隔5秒重试一次,最多重试10次
        RetryPolicy policy = new ExponentialBackoffRetry(5000, 10);

        //创建ZK客户端,连接到Zookeeper服务器
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.31.103:2181").retryPolicy(policy).build();
        //创建与Zookeeper服务器的连接
        client.start();

        //声明锁对象,本质就是ZK顺序临时节点
        final InterProcessMutex mutex = new InterProcessMutex(client, "/locks/wh-shoe");
        try {
            //请求锁,创建锁
            mutex.acquire();
            //处理业务逻辑
            if (WarehouseService.shoe &amp;amp;gt; 0) {
                Thread.sleep(1000);
                //扣减库存
                return --WarehouseService.shoe;
            } else {
                //库存不足,
                throw new RuntimeException("运动鞋库存不足");
            }
        } finally {
            //释放锁
            mutex.release();
        }
    }

    public int outOfWarehouse() throws Exception {
        //处理业务逻辑
        if (WarehouseService.shoe &amp;amp;gt; 0) {
            Thread.sleep(1000);
            //扣减库存
            return --WarehouseService.shoe;
        } else {
            throw new RuntimeException("运动鞋库存不足");
        }
    }
}

4.4.开发控制器,对外暴露服务

@RestController
public class OrderController {
    @Resource
    private WarehouseService warehouseService;
    @GetMapping("/create_order")
    public String createOrder(String name){
        try {
            //创建订单
            int i = warehouseService.outOfWarehouseWithLock();
            System.out.println("[" + Thread.currentThread().getName() + "]商品出库成功,剩余库存:" + i);
            return "{\"code\":\"0\"}";
        } catch (Exception e) {
            //e.printStackTrace();
            System.out.println("[" + Thread.currentThread().getName() + "]商品出库失败,异常信息:" + e.getMessage());
            return "{\"code\":\"500\"}";
        }
    }

    @GetMapping("/reset")
    public String reset(){
        WarehouseService.shoe = 10;
        return "success";
    }
}

转载请注明:西门飞冰的博客 » Zookeeper分布式锁实现

喜欢 (0)or分享 (0)