7.4 读写锁原理

ReentrantReadWriteLock

当读操作远远高于写操作时,这时候使用读写锁读读可以并发,提高性能。类似于数据库中select ... from ... lock in share mode
提供一个数据容器类内部分别使用读锁保护数据的read(),写锁保护数据的write()方法。

@Slf4j(topic = "c.DataContainer")
class DataContainer {
    private Object data;
    private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
    private ReentrantReadWriteLock.ReadLock r = rw.readLock();
    private ReentrantReadWriteLock.WriteLock w = rw.writeLock();

    public Object read() {
        log.debug("获取读锁...");
        r.lock();
        try {
            log.debug("读取");
            Thread.sleep(1000);
            return data;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            log.debug("释放读锁...");
            r.unlock();
        }
    }

    public void write() {
        log.debug("获取写锁...");
        w.lock();
        try {
            log.debug("写入");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            log.debug("释放写锁...");
            w.unlock();
        }
    }
}

测试读锁-读锁可以并发

public class TestReentrantReadWriteLock {
    public static void main(String[] args) {
        DataContainer dataContainer = new DataContainer();
        new Thread(() -> {
            dataContainer.read();
        }, "t1").start();

        new Thread(() -> {
            dataContainer.read();
        }, "t2").start();
    }
}

输出结果,从这里可以看到 Thread-0 锁定期间,Thread-1 的读操作不受影响

17:41:10.773 [t2] - 获取读锁...
17:41:10.773 [t1] - 获取读锁...
17:41:10.774 [t2] - 读取
17:41:10.774 [t1] - 读取
17:41:11.776 [t1] - 释放读锁...
17:41:11.779 [t2] - 释放读锁...

测试读锁-写锁相互阻塞

public class TestReentrantReadWriteLock {
    public static void main(String[] args) throws InterruptedException {
        DataContainer dataContainer = new DataContainer();
        new Thread(() -> {
            dataContainer.read();
        }, "t1").start();

        Thread.sleep(100);
        
        new Thread(() -> {
            dataContainer.write();
        }, "t2").start();
    }
}

输出结果

17:50:57.280 [t1] - 获取读锁...
17:50:57.281 [t1] - 读取
17:50:57.384 [t2] - 获取写锁...
17:50:58.287 [t1] - 释放读锁...
17:50:58.287 [t2] - 写入
17:50:59.291 [t2] - 释放写锁...

写锁-写锁也是相互阻塞的,这里就不测试了
注意事项

  • 读锁不支持条件变量
  • 重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待
r.lock();
try {
    // ...
    w.lock();
    try {
        // ...
    } finally {
        w.unlock();
    }
} finally {
    r.unlock();
}
  • 重入时降级支持:即持有写锁的情况下去获取读锁
class CachedData {
     Object data;
     // 是否有效,如果失效,需要重新计算 data
     volatile boolean cacheValid;
     final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
     void processCachedData() {
         rwl.readLock().lock();
         if (!cacheValid) {
             // 获取写锁前必须释放读锁
             rwl.readLock().unlock();
             rwl.writeLock().lock();
             try {
                 // 判断是否有其它线程已经获取了写锁、更新了缓存, 避免重复更新
                 if (!cacheValid) {
                 data = ...
                 cacheValid = true;
                 }
                 // 降级为读锁, 释放写锁, 这样能够让其它线程读取缓存
                 rwl.readLock().lock();
             } finally {
                 rwl.writeLock().unlock();
             }
         }
         // 自己用完数据, 释放读锁 
         try {
             use(data);
         } finally {
             rwl.readLock().unlock();
         }
     }
}

缓存

缓存更新策略

更新时,是先清缓存还是先更新数据库?
先清缓存image.png
假设有两个线程,线程 B 清空缓存后,其 CPU 时间片到,线程 A 拿到 CPU 控制权,此时线程 A 查询缓存未命中,便去查询数据库,并将查询到的结果放入缓存,此时线程 A 的 CPU 时间片到,线程 B 继续执行,更新数据库。这样就导致线程 A 后续查询到的结果都是旧值。
先更新数据库
image.png
假设有两个线程,线程 B 更新数据库,此时 CPU 时间片到,切换至线程 A,线程 A 查询缓存,为旧数据。线程 A 时间片到,切换至线程 B 继续执行,线程 B 在更新完数据库后,接着清空缓存。线程 A 在重新拿到 CPU 控制权后,便会去查询数据库中的数据,并将结果更新至缓存中。这样后续查询到的结果都是新值。
补充一种情况,假设查询线程 A 查询数据时恰好缓存数据由于时间到期失效,或是第一次查询
image.png
这种情况出现几率非常小。

读写锁实现一致性缓存

使用读写锁实现一个简单的按需加载缓存

public class GenericDao {
    static String URL = "jdbc:mysql://localhost:3306/test";
    static String USERNAME = "root";
    static String PASSWORD = "root";

    public <T> List<T> queryList(Class<T> beanClass, String sql, Object... args) {
        System.out.println("sql: [" + sql + "] params:" + Arrays.toString(args));
        BeanRowMapper<T> mapper = new BeanRowMapper<>(beanClass);
        return queryList(sql, mapper, args);
    }

    public <T> T queryOne(Class<T> beanClass, String sql, Object... args) {
        System.out.println("sql: [" + sql + "] params:" + Arrays.toString(args));
        BeanRowMapper<T> mapper = new BeanRowMapper<>(beanClass);
        return queryOne(sql, mapper, args);
    }

    private <T> List<T> queryList(String sql, RowMapper<T> mapper, Object... args) {
        try (Connection conn = DriverManager.getConnection(URL, USERNAME, PASSWORD)) {
            try (PreparedStatement psmt = conn.prepareStatement(sql)) {
                if (args != null) {
                    for (int i = 0; i < args.length; i++) {
                        psmt.setObject(i + 1, args[i]);
                    }
                }
                List<T> list = new ArrayList<>();
                try (ResultSet rs = psmt.executeQuery()) {
                    while (rs.next()) {
                        T obj = mapper.map(rs);
                        list.add(obj);
                    }
                }
                return list;
            }
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    private <T> T queryOne(String sql, RowMapper<T> mapper, Object... args) {
        List<T> list = queryList(sql, mapper, args);
        return list.size() == 0 ? null : list.get(0);
    }

    public int update(String sql, Object... args) {
        System.out.println("sql: [" + sql + "] params:" + Arrays.toString(args));
        try (Connection conn = DriverManager.getConnection(URL, USERNAME, PASSWORD)) {
            try (PreparedStatement psmt = conn.prepareStatement(sql)) {
                if (args != null) {
                    for (int i = 0; i < args.length; i++) {
                        psmt.setObject(i + 1, args[i]);
                    }
                }
                return psmt.executeUpdate();
            }
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    interface RowMapper<T> {
        T map(ResultSet rs);
    }

    static class BeanRowMapper<T> implements RowMapper<T> {

        private Class<T> beanClass;
        private Map<String, PropertyDescriptor> propertyMap = new HashMap<>();

        public BeanRowMapper(Class<T> beanClass) {
            this.beanClass = beanClass;
            try {
                BeanInfo beanInfo = Introspector.getBeanInfo(beanClass);
                PropertyDescriptor[] propertyDescriptors = beanInfo.getPropertyDescriptors();
                for (PropertyDescriptor pd : propertyDescriptors) {
                    propertyMap.put(pd.getName().toLowerCase(), pd);
                }
            } catch (IntrospectionException e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public T map(ResultSet rs) {
            try {
                ResultSetMetaData metaData = rs.getMetaData();
                int columnCount = metaData.getColumnCount();
                T t = beanClass.newInstance();
                for (int i = 1; i <= columnCount; i++) {
                    String columnLabel = metaData.getColumnLabel(i);
                    PropertyDescriptor pd = propertyMap.get(columnLabel.toLowerCase());
                    if (pd != null) {
                        pd.getWriteMethod().invoke(t, rs.getObject(i));
                    }
                }
                return t;
            } catch (SQLException | InstantiationException | IllegalAccessException | InvocationTargetException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

public class GenericCachedDao extends GenericDao{
    private GenericDao dao = new GenericDao();
    private Map<SqlPair, Object> map = new HashMap<>();
    private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();


    @Override
    public <T> T queryOne(Class<T> beanClass, String sql, Object... args) {
        SqlPair key = new SqlPair(sql, args);
        rw.readLock().lock();
        try {
            // 先从缓存中找,找到直接返回
            T value = (T) map.get(key);
            if(value != null) {
                return value;
            }
        } finally {
            rw.readLock().unlock();
        }
        rw.writeLock().lock();
        try {
            // 可能有多个线程进入查询
            T value = (T) map.get(key);
            if(value == null) {
                // 缓存中没有,查询数据库
                value = dao.queryOne(beanClass, sql, args);
                map.put(key, value);
            }
            return value;
        } finally {
            rw.writeLock().unlock();
        }
    }

    @Override
    public int update(String sql, Object... args) {
        rw.writeLock().lock();
        try {
            int update = dao.update(sql, args);
            // 清空缓存
            map.clear();
            return update;
        } finally {
            rw.writeLock().unlock();
        }
    }
}

:::info
注意:

  • 以上实现体现的是读写锁的应用,保证缓存和数据库的一致性,但有下面的问题没有考虑:
    • 适合读多写少,如果写操作比较频繁,以上实现性能低
    • 没有考虑缓存容量
    • 没有考虑缓存过期
    • 只适合单机
    • 并发性还是低,目前只会用一把锁
    • 更新方法太过简单粗暴,清空了所有 key(考虑按类型分区或重新设计 key)
  • 乐观锁实现:用 CAS 去更新
    :::

StampedLock

该类自 JDK8 加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用
加/解读锁

long stamp = lock.readLock();
lock.unlockRead(stamp);

加/解写锁

long stamp = lock.writeLock();
lock.unlockWrite(stamp);

乐观读,StampedLock 支持tryOptimisticRead()方法(乐观读),读取完毕后需要做一次戳校验。如果校验通过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全。

long stamp = lock.tryOptimisticRead();
// 验戳
if(!lock.validate(stamp)) {
    // 锁升级
}

提供一个数据容器类,内部分别使用读锁保护数据的read()方法,写锁保护数据的write()方法

@Slf4j(topic = "c.DataContainerStamped")
class DataContainerStamped {
    private int data;
    private final StampedLock lock = new StampedLock();

    public DataContainerStamped(int data) {
        this.data = data;
    }

    public int read(int readTime) {
        long stamp = lock.tryOptimisticRead();
        log.debug("optimistic read locking...{}", stamp);
        try {
            Thread.sleep(readTime);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        if (lock.validate(stamp)) {
            log.debug("read finish...{}, data: {}", stamp, data);
            return data;
        }
        // 锁升级 - 读锁
        log.debug("updating to read lock...{}", stamp);
        try {
            stamp = lock.readLock();
            log.debug("read lock {}", stamp);
            Thread.sleep(readTime);
            log.debug("read finish...{}, data: {}", stamp, data);
            return data;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            log.debug("read unlock {}", stamp);
            lock.unlockRead(stamp);
        }
    }

    public void write(int newData) {
        long stamp = lock.writeLock();
        log.debug("write lock {}", stamp);
        try {
            Thread.sleep(2000);
            this.data = newData;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            log.debug("write unlock {}", stamp);
            lock.unlockWrite(stamp);
        }

    }
}

测试读 - 读,可以优化

public class TestStampedLock {
    public static void main(String[] args) throws InterruptedException {
        DataContainerStamped dataContainerStamped = new DataContainerStamped(1);
        new Thread(() -> {
            dataContainerStamped.read(1000);
        }, "t1").start();

        Thread.sleep(500);

        new Thread(() -> {
            dataContainerStamped.read(0);
        }, "t2").start();
    }
}

输出结果,可以看到实际没有加读锁

19:51:41.647 [t1] - optimistic read locking...256
19:51:42.152 [t2] - optimistic read locking...256
19:51:42.152 [t2] - read finish...256, data: 1
19:51:42.654 [t1] - read finish...256, data: 1

测试读 - 写时,优化读补加读锁

public class TestStampedLock {
    public static void main(String[] args) throws InterruptedException {
        DataContainerStamped dataContainerStamped = new DataContainerStamped(1);
        new Thread(() -> {
            dataContainerStamped.read(1000);
        }, "t1").start();

        Thread.sleep(500);

        new Thread(() -> {
            dataContainerStamped.write(100);
        }, "t2").start();
    }
}

输出结果

19:54:36.255 [t1] - optimistic read locking...256
19:54:36.759 [t2] - write lock 384
19:54:37.261 [t1] - updating to read lock...256      // 锁升级
19:54:38.765 [t2] - write unlock 384
19:54:38.765 [t1] - read lock 513   // 戳由 256 变为 513
19:54:39.770 [t1] - read finish...513, data: 100
19:54:39.771 [t1] - read unlock 513

:::info
注意:

  • StampedLock 不支持条件变量
  • StampedLock 不支持可重入
    :::

相关推荐

  1. C++

    2024-07-13 19:16:02       43 阅读
  2. 关于Redission

    2024-07-13 19:16:02       32 阅读
  3. 乐观、悲观、互斥

    2024-07-13 19:16:02       26 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-13 19:16:02       67 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-13 19:16:02       72 阅读
  3. 在Django里面运行非项目文件

    2024-07-13 19:16:02       58 阅读
  4. Python语言-面向对象

    2024-07-13 19:16:02       69 阅读

热门阅读

  1. conda常用命令

    2024-07-13 19:16:02       22 阅读
  2. 卸载docker

    2024-07-13 19:16:02       19 阅读
  3. Redis的一个典型应用

    2024-07-13 19:16:02       16 阅读
  4. Python 列表深度解析:功能强大的数据结构

    2024-07-13 19:16:02       23 阅读
  5. 什么是天使投资

    2024-07-13 19:16:02       20 阅读
  6. C++中的自定义数据类型:类和结构体

    2024-07-13 19:16:02       18 阅读
  7. 【PLC】基本概念

    2024-07-13 19:16:02       19 阅读
  8. package.json 脚本配置使用环境文件

    2024-07-13 19:16:02       22 阅读