并发中如何保证缓存DB双写一致性(JAVA栗子)
- 2019 年 11 月 5 日
- 筆記
并发场景中大部分处理的是先更新DB,再(删缓、更新)缓存的处理方式,但是在实际场景中有可能DB更新成功了,但是缓存设置失败了,就造成了缓存与DB数据不一致的问题,下面就以实际情况说下怎么解决此类问题。
名词 Cache:本文内指redis,ReadRequest:请求从Cache、Db中拿去数据,WriteRequest:数据写入DB并删除缓存
若要保证数据库与缓存一直,我们需要采用先删缓存,在更新DB的情况,这时候有的同学可能会问,如果缓存删除成功了,而DB更新失败了怎么办,其实仔细考虑一下,DB虽然失败了,那真正是不会产生数据影响的,而当下次一次请求进来的时候,我们重新把DB中未更新的数据重新塞入缓存,从结果上来看是没有影响的。我们把请求分为ReadRequest 、WriteRequest,大部分同学都知道我们在使用Cache时 首先都会去Cache内查一下,如果Cache中没有拿到数据我们在从数据库中去获取数据,这个时候在高并发的场景的踩过坑的同学都知道恰巧在这时候有更新请求把缓存删除了,这时候大量请求进来,Cache内没有此项数据,请求就会直接落在DB上,就很容易造成缓存雪崩,数据库很可能瞬时就挂掉了,所以处理方案就是我们需要对查询写入的缓存进行排队处理,而正确从cache内获取的姿势:
1、每次查询数据的时候我们吧请求数据放入队列,由队列消费者去检查一下cache是否存在,不存在则进行插入,存在就跳过
2、当前readRequest就自循环,我们不断尝试从cache内去获取数据,拿到数据或超时当前线程立即退出
3、如果拿到数据了就返回结果,没有拿到数据我们就从DB去查
而WriteRequest 的处理相对就简单多了我们直接删除缓存后,更新DB即可,下面上代码说明:
消息队列这里我们基于jdk并发包内的BlockingQueue进行实现,使用MQ(Rabbit,Kafka等)的话思想差不多,只是需要交互一次mq的服务端。首先项目启动时我们在程序后台开辟监听线程,从数据共享缓冲区(ArrayBlockingQueue)内监听消息
public class BlockQueueThreadPool { /** * 核心线程数 */ private Integer corePoolSize = 10; /** * 线程池最大线程数 */ private Integer maximumPoolSize = 20; /** * 线程最大存活时间 */ private Long keepAliveTime = 60L; private ExecutorService threadPool = new ThreadPoolExecutor(this.corePoolSize, this.maximumPoolSize, this.keepAliveTime, TimeUnit.SECONDS, new ArrayBlockingQueue(this.corePoolSize)); public BlockQueueThreadPool() { RequestQueue requestQueue = RequestQueue.getInstance(); BlockingQueue<RequestAction> queue = new ArrayBlockingQueue<>(this.corePoolSize); requestQueue.add(queue); this.threadPool.submit(new JobThread(queue)); } }
PS:ArrayBlockingQueue中很好的利用了Condition中的等待和通知功能,这里我们就能实现对共享通道队列的事件监听了。
public class JobThread implements Callable<Boolean> { private BlockingQueue<RequestAction> queue; public JobThread(BlockingQueue<RequestAction> queue) { this.queue = queue; } @Override public Boolean call() throws Exception { try { while (true) { // ArrayBlockingQueue take方法 获取队列排在首位的对象,如果队列为空或者队列满了,则会被阻塞住 RequestAction request = this.queue.take(); RequestQueue requestQueue = RequestQueue.getInstance(); Map<String, Boolean> tagMap = requestQueue.getTagMap(); if (request instanceof ReadRequest) { Boolean tag = tagMap.get(request.getIdentity()); if (null == tag) { tagMap.put(request.getIdentity(), Boolean.FALSE); } if (tag != null && tag) { tagMap.put(request.getIdentity(), Boolean.FALSE); } if (tag != null && !tag) { return Boolean.TRUE; } } else if (request instanceof WriteRequest) { // 如果是更新数据库的操作 tagMap.put(request.getIdentity(), Boolean.TRUE); } // 执行请求处理 log.info("缓存队列执行+++++++++++++++++,{}", request.getIdentity()); request.process(); } } catch (Exception e) { e.printStackTrace(); } return Boolean.TRUE; } }
接下来就要定义我们的WriteRequest、ReadRequest了
@Slf4j public class ReadRequest<TResult> extends BaseRequest { public ReadRequest(String cacheKey, GetDataSourceInterface action) { super(cacheKey, action); } @Override public void process() { TResult result = (TResult) action.exec(); if (Objects.isNull(result)) { //防止缓存击穿 redis.set(cacheKey, "", 10000); } else { redis.set(cacheKey, result, 10000); } } }
public class WriteRequest<TResult> extends BaseRequest { public WriteRequest(String cacheKey, GetDataSourceInterface action) { super(cacheKey, action); } @Override public void process() { redis.del(cacheKey); action.exec(); } }
这里我们需要坐下判断,在数据库内查询数据为空后把“”写入了缓存,这样子是避免有人恶意请求不存在的数据时造成缓存击穿。接下来就是我们针对各项业务场景中需要获取与更新缓存的路由端了
@UtilityClass public class RouteUtils { public static void route(RequestAction requestAction) { try { BlockingQueue<RequestAction> queue = RequestQueue.getInstance().getQueue(0); queue.put(requestAction); } catch (Exception e) { e.printStackTrace(); } } }
public class RequestQueue { private RequestQueue() { } private List<BlockingQueue<RequestAction>> queues = new ArrayList<>(); private Map<String, Boolean> tagMap = new ConcurrentHashMap<>(1); private static class Singleton { private static RequestQueue queue; static { queue = new RequestQueue(); } private static RequestQueue getInstance() { return queue; } } public static RequestQueue getInstance() { return Singleton.getInstance(); } public void add(BlockingQueue<RequestAction> queue) { this.queues.add(queue); } public BlockingQueue<RequestAction> getQueue(int index) { return this.queues.get(index); } public int size() { return this.queues.size(); } public Map<String, Boolean> getTagMap() { return this.tagMap; } }
这里有一个小的知识点,很多时候我们在保证线程安全的时候多数会使用DSL双锁模型,但是我始终觉得这类代码不够美观,所以我们可以利用JVM的类加载原则,使用静态类包裹初始化类,这样子也一定能保证单例模型,并且代码也更美观了。接下来就可以看下Service的代码
@Service public class StudentService { public Student getStudent(String name) { ReadRequest<Student> readRequest = new ReadRequest<>(name, () -> Student.builder().name(name).age(3).build()); return CacheProcessor.builder().build().getData(readRequest); } public void update(Student student) { WriteRequest<Student> writeRequest = new WriteRequest<>(student.getName(), () -> student); CacheProcessor.builder().build().setData(writeRequest); } }
Service内直接调用了Cachce的处理者,我们通过处理者来获取缓存与更新缓存
@Builder public class CacheProcessor { public <TResult> TResult getData(ReadRequest readRequest) { try { RouteUtils.route(readRequest); long startTime = System.currentTimeMillis(); long waitTime = 0L; while (true) { if (waitTime > 3000) { break; } TResult result = (TResult) readRequest.redis.get(readRequest.getIdentity()); if (!Objects.isNull(result)) { return result; } else { Thread.sleep(20); waitTime = System.currentTimeMillis() - startTime; } } return (TResult) readRequest.get(); } catch (Exception e) { return null; } } public void setData(WriteRequest writeRequest){ RouteUtils.route(writeRequest); } }
这里我们就先把请求数据发送到数据共享渠道,消费者端与当前的ReadRequest线程同步执行,拿到数据后ReadRequest就立马退出,超时后我们就从数据库中获取数据。这里面我使用了java8 @FunctionalInterface 标记接口,对各个业务中需要用到缓存的地方统一进行封装方便调用,以上的代码就已经基本说明并发中Db和Cache双休一致性的解决思路,聪明的小伙伴肯定能看出其实还有很多优化的地方,比如说我们栗子中是单线程吞吐量不高,采用多线程与多消费者端的时候我们还需要保证商品的更新和读取请求需要落在同一个消费者端等等问题。或者在使用外部MQ时,我们除了要考虑以上同一商品的读写保证落在一个消费节点上,还需要考虑队列内有插入缓存请求的时候需要跳过的处理等等,更多情况还需要根据实际情况大家自己去发现咯
参考:中华石杉的教程