Back
Featured image of post Caffeine缓存(二)

Caffeine缓存(二)

一、主体流程

上篇文章主要介绍了Caffeine缓存的各种策略及Caffeine缓存的配置,本篇文章主要为大家分享一下Caffeine缓存的主体源码部分的流程解析,后续文章为大家分享一下咖啡因缓存的淘汰策略和淘汰算法W-TinyLFU的分析。言归正传我们,下面我们来看一下主体流程图。

通过主体流程图我们可以发现Caffeine缓存主要采用了currentHashMap用来保存有效的数据,并通过异步的操作来调节整体缓存,包括增加缓存频率、过期缓存、淘汰缓存、整理缓存等。通过异步操作能够更加高效的来保证使用Caffeine缓存整体的速度。

二、主体方法详解

在进入主体方法分析之前我们先来看下Caffine有几种缓存结构存储缓存。Caffeine使用建造者模式在build()时生成的缓存结构可以分为有界缓存和无界缓存。

return isBounded() || refreshAfterWrite()
    ? new BoundedLocalCache.BoundedLocalLoadingCache<>(self, loader)
    : new UnboundedLocalCache.UnboundedLocalLoadingCache<>(self, loader);

那什么时候会生成有界缓存什么时候会生成无界缓存呢?进入isBounded()方法中可以看到当你设置了下面这些条件的时候例如: 过期策略、淘汰策略等就会生成有界缓存,在平时的项目开发中基于过期、内存等因素考虑绝大多数也是使用有界缓存的。

boolean isBounded() {
  return (maximumSize != UNSET_INT)
      || (maximumWeight != UNSET_INT)
      || (expireAfterAccessNanos != UNSET_INT)
      || (expireAfterWriteNanos != UNSET_INT)
      || (expiry != null)
      || (keyStrength != null)
      || (valueStrength != null);
}
  • put()

上面提到过Caffeine主要采用currentHashMap的存储结构,那么put()方法也是十分重要的。下面贴出整个put()方法的代码,我们来进行分析:

@Nullable V put(K key, V value, Expiry<K, V> expiry, boolean onlyIfAbsent) {
  requireNonNull(key);
  requireNonNull(value);

  Node<K, V> node = null;
  long now = expirationTicker().read();
  //看这个节点值的权重
  int newWeight = weigher.weigh(key, value);
  //循环的目的是为了让异步操作执行成功
  for (;;) {
    //获取以前的节点
    Node<K, V> prior = data.get(nodeFactory.newLookupKey(key));
    //若以前节点为null
    if (prior == null) {
      if (node == null) {
        //创建节点,根据具体策略工厂创建节点
        node = nodeFactory.newNode(key, keyReferenceQueue(),
            value, valueReferenceQueue(), newWeight, now);
        //设置有效时间
        setVariableTime(node, expireAfterCreate(key, value, expiry, now));
      }
      //查看map中是否存在当前key,若不存在将节点加入到map中
      prior = data.putIfAbsent(node.getKeyReference(), node);
      if (prior == null) {
        //开启异步线程处理节点(包括淘汰、过期、自动调节等..)
        afterWrite(new AddTask(node, newWeight));
        return null;
        //倘若存在prior根据onlyIfAbsent来确定是否更新值 false:更新 true:不更新
      } else if (onlyIfAbsent) {
        // An optimistic fast path to avoid unnecessary locking
        V currentValue = prior.getValue();
        //值不为空或者不过期
        if ((currentValue != null) && !hasExpired(prior, now)) {
          if (!isComputingAsync(prior)) {
            //不是异步提交,设置访问时间,以及是否更新有效期
            tryExpireAfterRead(prior, key, currentValue, expiry(), now);
            setAccessTime(prior, now);
          }
          afterRead(prior, now, /* recordHit */ false);
          return currentValue;
        }
      }
      //倘若存在prior根据onlyIfAbsent来确定是否更新值 false:更新 true:不更新
    } else if (onlyIfAbsent) {
      // An optimistic fast path to avoid unnecessary locking
      V currentValue = prior.getValue();
      if ((currentValue != null) && !hasExpired(prior, now)) {
        if (!isComputingAsync(prior)) {
          tryExpireAfterRead(prior, key, currentValue, expiry(), now);
          setAccessTime(prior, now);
        }
        afterRead(prior, now, /* recordHit */ false);
        return currentValue;
      }
    } else {
      //移除key映射
      discardRefresh(prior.getKeyReference());
    }

    V oldValue;
    long varTime;
    int oldWeight;
    boolean expired = false;
    boolean mayUpdate = true;
    boolean exceedsTolerance = false;
    synchronized (prior) {
      //同步执行需要更新的操作,如果节点的key不是WeakKeyReference,结束当前同步代码块
      if (!prior.isAlive()) {
        continue;
      }
      //获取老的值或权重
      oldValue = prior.getValue();
      oldWeight = prior.getWeight();
      //若旧值为空,代表被回收了
      if (oldValue == null) {
        //都是获取有效期
        varTime = expireAfterCreate(key, value, expiry, now);
        notifyEviction(key, null, RemovalCause.COLLECTED);
        //判断旧值是否过期了
      } else if (hasExpired(prior, now)) {
        expired = true;
        varTime = expireAfterCreate(key, value, expiry, now);
        notifyEviction(key, oldValue, RemovalCause.EXPIRED);
        //判断是不是putIfAbsent()
      } else if (onlyIfAbsent) {
        mayUpdate = false;
        varTime = expireAfterRead(prior, key, value, expiry, now);
      } else {
        varTime = expireAfterUpdate(prior, key, value, expiry, now);
      }
      //更新操作
      if (mayUpdate) {
        exceedsTolerance =
            (expiresAfterWrite() && (now - prior.getWriteTime()) > EXPIRE_WRITE_TOLERANCE)
            || (expiresVariable()
                && Math.abs(varTime - prior.getVariableTime()) > EXPIRE_WRITE_TOLERANCE);

        setWriteTime(prior, now);
        prior.setWeight(newWeight);
        prior.setValue(value, valueReferenceQueue());
      }
      //设置有效期
      setVariableTime(prior, varTime);
      //设置访问时间
      setAccessTime(prior, now);
    }
    //开启一个异步线程,通知监听器
    if (expired) {
      notifyRemoval(key, oldValue, RemovalCause.EXPIRED);
    } else if (oldValue == null) {
      notifyRemoval(key, /* oldValue */ null, RemovalCause.COLLECTED);
    } else if (mayUpdate) {
      notifyOnReplace(key, oldValue, value);
    }
    //重新计算权重和次数是否存在差异
    int weightedDifference = mayUpdate ? (newWeight - oldWeight) : 0;
    //老值过期或权重变化
    if ((oldValue == null) || (weightedDifference != 0) || expired) {
      //开启异步线程
      afterWrite(new UpdateTask(prior, weightedDifference));
    } else if (!onlyIfAbsent && exceedsTolerance) {
      afterWrite(new UpdateTask(prior, weightedDifference));
    } else {
      //更新权重无变化
      if (mayUpdate) {
        setWriteTime(prior, now);
      }
      //命中数+1
      afterRead(prior, now, /* recordHit */ false);
    }
    //是否过期
    return expired ? null : oldValue;
  }
}

首先我们来分析一下put方法的参数:对于keyvalueexpiry就不用多说了,这个onlyIfAbsent参数是一个布尔值它就针对于putIfAbsent()方法如果值存在的话此次操作就相当于一次读操作,大家如果看过HashMap的源码也都清楚这个参数的作用。下面将put方法的主体流程划分为如下阶段:

计算权重->生成节点->设置过期时间->存储节点->开启异步任务

进入put方法中我们会看到int newWeight = weigher.weigh(key, value)这一步操作就是计算当前entry的权重值,在上篇Caffine的配置中提到过配置淘汰策略可以基于key数量进行配置也可以基于权重值的策略进行配置,那这个weigh是如何计算的呢?

Weigher<K1, V1> delegate = (weigher == null) || (weigher == Weigher.singletonWeigher())
    ? Weigher.singletonWeigher()
    : Weigher.boundedWeigher((Weigher<K1, V1>) weigher);
return isAsync ? (Weigher<K1, V1>) new AsyncWeigher(delegate) : delegate;

从它获取权重比较器的源码中我们可以分析得出,当我们没有设置weigher或者weighersingletonWeigher时它默认就会使用singletonWeigher进行计算,而singletonWeigher计算权重时就会将权重设置为1也就相当于当前我们传进的次数。

@Override public int weigh(Object key, Object value) {
  return 1;
}

接下来主体流程我们挑细节来看,当key的节点不存在时就会执行这段代码 node = nodeFactory.newNode(key, keyReferenceQueue(),value, valueReferenceQueue(), newWeight, now)这个方法的作用就是通过节点工厂来创建具体的策略节点,这里Caffine采用了工厂+策略+适配器的模式,每个节点工厂既作为具体的节点也可以用来生成节点,并且在开始也会根据客户端传来的条件初始化成某种具体的节点工厂。我个人觉得也是Caffeine设计结构的一大亮点,我们来看下它初始化节点工厂的代码。

......
if (builder.refreshAfterWrite()) {
  sb.append('R');
}
if (builder.evicts()) {
  sb.append('M');
  if (isAsync || (builder.isWeighted() && (builder.weigher != Weigher.singletonWeigher()))) {
    sb.append('W');
  } else {
    sb.append('S');
  }
}
try {
  Class<?> clazz = Class.forName(sb.toString());
  @SuppressWarnings("unchecked")
  NodeFactory<K, V> factory = (NodeFactory<K, V>) clazz.getDeclaredConstructor().newInstance();
  return factory;
} catch (ReflectiveOperationException e) {
  throw new IllegalStateException(sb.toString(), e);
}
class FD<K, V> extends Node<K, V> implements NodeFactory<K, V> {
......
}

通过根据客户端配置的各种策略,然后通过反射生成具体的节点工厂。

当获取到具体的节点工厂后,通过 setVariableTime(node, expireAfterCreate(key, value, expiry, now))方法设置缓存的有效期,expireAfterCreate()方法中就会根据我们设置不同种过期策略的值来转化成毫秒值,加上当前时间从而设置成有效期。

long expireAfterCreate(@Nullable K key, @Nullable V value,
    Expiry<? super K, ? super V> expiry, long now) {
  if (expiresVariable() && (key != null) && (value != null)) {
    //获取我们设置对应过期策略的毫秒值
    long duration = expiry.expireAfterCreate(key, value, now);
    //将当前时间+毫秒值
    return isAsync ? (now + duration) : (now + Math.min(duration, MAXIMUM_EXPIRY));
  }
  return 0L;
}

注意这里在设置有效期时expireAfterCreate()方法不是我们在配置策略中重写的expireAfterCreate()方法而是内部提供的。设置过期后就要存储节点了,执行这段代码prior = data.putIfAbsent(node.getKeyReference(), node);当节点存储成功后就开启异步任务执行afterWrite(new AddTask(node, newWeight));方法。点进这个开辟异步任务的方法可以看到:

void afterWrite(Runnable task) {
  //循环100次,若写Buffer队列添加异步线程执行
  for (int i = 0; i < WRITE_BUFFER_RETRIES; i++) {
    if (writeBuffer.offer(task)) {
      //写后操作
      scheduleAfterWrite();
      return;
    }
    //调度清理
    scheduleDrainBuffers();
  }
   .....
   lock();
    try {
      //若100次还没成功则加锁,同步挂起,让当前线程处理(过期,。淘汰等)
      maintenance(task);
    } catch (RuntimeException e) {
      logger.log(Level.ERROR, "Exception thrown when performing the maintenance task", e);
    } finally {
      evictionLock.unlock();
    }      

这里writeBuffer采用的是一个Mpsc Queue(高性能无锁)任务队列的结构,使用循环将当次的异步任务添加到任务队列中,若添加成功则开启异步任务,若这一百次还没用添加到任务队列中,那么则同步挂起执行任务。就比如线程并发的同时存储数据,此时writeBuffer已经满了,那么就会同步挂起来执行任务,进行清理调整结束后再来执行异步任务,以保证过期、淘汰策略的准确。这正与我们主流程图所对应。

void scheduleAfterWrite() {
  for (;;) {
    switch (drainStatus()) {
      case IDLE:
        casDrainStatus(IDLE, REQUIRED);
        scheduleDrainBuffers();
        return;
      case REQUIRED:
        scheduleDrainBuffers();
        return;
      case PROCESSING_TO_IDLE:
        if (casDrainStatus(PROCESSING_TO_IDLE, PROCESSING_TO_REQUIRED)) {
          return;
        }
        continue;
      case PROCESSING_TO_REQUIRED:
        return;
      default:
        throw new IllegalStateException();
    }
  }
}

task任务进入到writeBuffer后执行scheduleAfterWrite()方法,该方法通过原子性的设置状态的方式去执行 scheduleDrainBuffers()方法进行调度清理,通过线程池来处理触发异步任务。

void scheduleDrainBuffers() {
    if (drainStatus() >= PROCESSING_TO_IDLE) {
      return;
    }
    if (evictionLock.tryLock()) {
      try {
        int drainStatus = drainStatus();
        if (drainStatus >= PROCESSING_TO_IDLE) {
          return;
        }
        lazySetDrainStatus(PROCESSING_TO_IDLE);
        //drainBuffersTask主要就是执行maintenance方法的逻辑
        executor.execute(drainBuffersTask);
      } catch (Throwable t) {
        logger.log(Level.WARNING, "Exception thrown when submitting maintenance task", t);
        maintenance(/* ignored */ null);
      } finally {
        evictionLock.unlock();
      }
    }
  }

//任务具体处理逻辑
void maintenance(@Nullable Runnable task) {
  setDrainStatusRelease(PROCESSING_TO_IDLE);

  try {
    //消费ReadBuffer()
    drainReadBuffer();
    //消费WriteBuffer()
    drainWriteBuffer();
    if (task != null) {
      task.run();
    }
    //处理key引用
    drainKeyReferences();
    //处理value引用
    drainValueReferences();
    //过期节点
    expireEntries();
    //过期节点
    evictEntries();
    //调整内存
    climb();
  } finally {
    if ((drainStatus() != PROCESSING_TO_IDLE) || !casDrainStatus(PROCESSING_TO_IDLE, IDLE)) {
      setDrainStatusOpaque(REQUIRED);
    }
  }
}

这就是具体所执行的任务的流程,主要做的有如下几步:清理ReadBuffer()、清理WriteBuffer()、执行此次存储的task、处理key引用、处理value引用、过期节点、过期节点、调整内存。那此次放到writeBuffer中的task任务都会做什么呢?下面我们看下task任务中具体的流程:

public void run() {
  if (evicts()) {
    //计算当前内存大小或key数
    long weightedSize = weightedSize();
    //设置新大小
    setWeightedSize(weightedSize + weight);
    //设置window区大小
    setWindowWeightedSize(windowWeightedSize() + weight);
    //设置主存区大小
    node.setPolicyWeight(node.getPolicyWeight() + weight);

    long maximum = maximum();
    if (weightedSize >= (maximum >>> 1)) {
      // Lazily initialize when close to the maximum
       //根据权重或者次数延迟初始化FrequencySketch容量
      long capacity = isWeighted() ? data.mappingCount() : maximum;
      frequencySketch().ensureCapacity(capacity);
    }

    K key = node.getKey();
    if (key != null) {
      //增加key使用频率
      frequencySketch().increment(key);
    }

    setMissesInSample(missesInSample() + 1);
  }

  // ignore out-of-order write operations
  boolean isAlive;
  synchronized (node) {
    isAlive = node.isAlive();
  }
 if (isAlive) {
        //放到writeOrderDeque队列||accessOrderWindowDeque队列||放入时间轮
        if (expiresAfterWrite()) {
          writeOrderDeque().add(node);
        }
        //如果是需要驱逐的
        if (evicts() && (weight > windowMaximum())) {
          accessOrderWindowDeque().offerFirst(node);
          //如果需要驱逐或者访问后过期策略
        } else if (evicts() || expiresAfterAccess()) {
          accessOrderWindowDeque().offerLast(node);
        }
        if (expiresVariable()) {
          //放到当前时间轮的链表下
          timerWheel().schedule(node);
      }
  }

其实在这个task任务中主要做的就是将当前这个节点的大小加到总内存中,并且根据不同过期策略放到不同队列下。这里Window区和主存区是W-TinyLFU的内存模型结构,下篇会详细介绍。而在过期中需要重点关注的是,如果是我们自定义的过期策略(即:在上篇中不同的key设置不同的时间),则需要放到时间轮下,在下篇中也会详细介绍。

  • get()

在源码中我们主要解析computeIfAbsent()方法,该方法对缓存中指定 key 的值进行重新计算,如果不存在这个 key,则添加到缓存中。这也是get()方法最常用的源码方法:

public @Nullable V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction,
    boolean recordStats, boolean recordLoad) {
  requireNonNull(key);
  requireNonNull(mappingFunction);
  long now = expirationTicker().read();
  //从节点工厂中获取当前key的节点
  // An optimistic fast path to avoid unnecessary locking
  Node<K, V> node = data.get(nodeFactory.newLookupKey(key));
  if (node != null) {
    V value = node.getValue();
    //若值不为空或者当前节点并未过期
    if ((value != null) && !hasExpired(node, now)) {
      if (!isComputingAsync(node)) {
        //更新访问后过期的有效期
        tryExpireAfterRead(node, key, value, expiry(), now);
        //更新访问时间
        setAccessTime(node, now);
      }
      //开启异步操作
      var refreshed = afterRead(node, now, /* recordHit */ recordStats);
      //是否是刷新后的值
      return (refreshed == null) ? value : refreshed;
    }
  }
  if (recordStats) {
    //若启用为命中策略设置时间和条数,装饰Function
    mappingFunction = statsAware(mappingFunction, recordLoad);
  }
  //生成key引用
  Object keyRef = nodeFactory.newReferenceKey(key, keyReferenceQueue());
  return doComputeIfAbsent(key, keyRef, mappingFunction, new long[] { now }, recordStats);
}
@Nullable V doComputeIfAbsent(K key, Object keyRef,
    Function<? super K, ? extends V> mappingFunction, long[/* 1 */] now, boolean recordStats) {
  @SuppressWarnings("unchecked")
  V[] oldValue = (V[]) new Object[1];
  @SuppressWarnings("unchecked")
  V[] newValue = (V[]) new Object[1];
  @SuppressWarnings("unchecked")
  K[] nodeKey = (K[]) new Object[1];
  @SuppressWarnings({"unchecked", "rawtypes"})
  Node<K, V>[] removed = new Node[1];

  int[] weight = new int[2]; // old, new
  //创建移除原因的数组
  RemovalCause[] cause = new RemovalCause[1];
  Node<K, V> node = data.compute(keyRef, (k, n) -> {
    if (n == null) {
      //获取mappingFunction中的value
      newValue[0] = mappingFunction.apply(key);
      if (newValue[0] == null) {
        return null;
      }
      now[0] = expirationTicker().read();
      //计算权重
        weight[1] = weigher.weigh(key, newValue[0]);
      n = nodeFactory.newNode(key, keyReferenceQueue(),
          newValue[0], valueReferenceQueue(), weight[1], now[0]);
      //设置有效期
      setVariableTime(n, expireAfterCreate(key, newValue[0], expiry(), now[0]));
      //返回元素
      return n;
    }
    //防止并发操作
    synchronized (n) {
      nodeKey[0] = n.getKey();
      weight[0] = n.getWeight();
      oldValue[0] = n.getValue();
      //若老节点为空,则证明已经被驱逐
      if ((nodeKey[0] == null) || (oldValue[0] == null)) {
        //被驱逐
        cause[0] = RemovalCause.COLLECTED;
      } else if (hasExpired(n, now[0])) {
        //过期
        cause[0] = RemovalCause.EXPIRED;
      } else {
        return n;
      }
      //notify监听策略的监听器
      if (cause[0].wasEvicted()) {
        notifyEviction(nodeKey[0], oldValue[0], cause[0]);
      }
      //获取新值
      newValue[0] = mappingFunction.apply(key);
      if (newValue[0] == null) {
        removed[0] = n;
        n.retire();
        return null;
      }
      //计算权重
      weight[1] = weigher.weigh(key, newValue[0]);
      n.setValue(newValue[0], valueReferenceQueue());
      n.setWeight(weight[1]);

      now[0] = expirationTicker().read();
      //设置有效期
      setVariableTime(n, expireAfterCreate(key, newValue[0], expiry(), now[0]));
      //设置访问时间
      setAccessTime(n, now[0]);
      //设置写时间
      setWriteTime(n, now[0]);
      //动态刷新key引用
      discardRefresh(k);
      return n;
    }
  });

参数中使用Function()函数就对应着我们通过自动加载获取手动加载设置的不存在的key值。将get方法的主体流程为如下阶段:

获取节点->更新过期策略->开启异步任务->若不存在key存储新节点

大体的源码信息相信大家通过put()方法之后都可以看懂,我们主要具体来分析一下afterRead()方法:

@Nullable V afterRead(Node<K, V> node, long now, boolean recordHit) {
  //若存在命中策略,命中数+1
  if (recordHit) {
    statsCounter().recordHits(1);
  }
  //将节点存储进readBuffer中
  boolean delayable = skipReadBuffer() || (readBuffer.offer(node) != Buffer.FULL);
  //设置状态机开启异步任务
  if (shouldDrainBuffers(delayable)) {
    scheduleDrainBuffers();
  }
  //是否需要刷新节点
  return refreshIfNeeded(node, now);
}

这里使用的readBuffer继承自StripedBuffer设计的思想是跟Striped64类似的,通过扩展结构把竞争热点分离。

具体实现是这样的,StripedBuffer维护一个Buffer[]数组,每个元素就是一个RingBuffer,每个线程用自己threadLocalRandomProbe属性作为 hash 值,这样就相当于每个线程都有自己“专属”的RingBuffer,就不会产生竞争了,而不是用 key 的hashCode作为 hash 值,因为会产生热点数据问题,作为线程私有从而减少竞争。

三、总结

这里我们分享了put()方法和get()方法的大体流程,详细的细节点还需要不断的研究,下篇我将主要针对maintenance()这个任务为大家分享一下Caffeine的W-TinyLFU淘汰算法,以及过期算法中使用的时间轮算法。