本篇文章给大家带来了关于Redis的相关知识,其中主要介绍的是分布式缓存和本地缓存的使用技巧,包括缓存种类介绍,各种的使用场景,以及如何使用,最后再给出实战案例,下面一起来看一下,希望对大家有帮助。
推荐学习:Redis视频教程
众所周知,缓存最主要的目的就是加速访问,缓解数据库压力。最常用的缓存就是分布式缓存,比如redis,在面对大部分并发场景或者一些中小型公司流量没有那么高的情况,使用redis基本都能解决了。但是在流量较高的情况下可能得使用到本地缓存了,比如guava的LoadingCache和快手开源的ReloadableCache。
这部分会介绍redis,比如guava的LoadingCache和快手开源的ReloadableCache的使用场景和局限,通过这一部分的介绍就能知道在怎样的业务场景下应该使用哪种缓存,以及为什么。
如果宽泛的说redis何时使用,那么自然就是用户访问量过高的地方使用,从而加速访问,并且缓解数据库压力。如果细分的话,还得分为单节点问题和非单节点问题。
如果一个页面用户访问量比较高,但是访问的不是同一个资源。比如用户详情页,访问量比较高,但是每个用户的数据都是不一样的,这种情况显然只能用分布式缓存了,如果使用redis,key为用户唯一键,value则是用户信息。
redis导致的缓存击穿。
但是需要注意一点,一定要设置过期时间,而且不能设置到同一时间点过期。举个例子,比如用户又个活动页,活动页能看到用户活动期间获奖数据,粗心的人可能会设置用户数据的过期时间点为活动结束,这样会
单(热)点问题
单节点问题说的是redis的单个节点的并发问题,因为对于相同的key会落到redis集群的同一个节点上,那么如果对这个key的访问量过高,那么这个redis节点就存在并发隐患,这个key就称为热key。
如果所有用户访问的都是同一个资源,比如小爱同学app首页对所有用户展示的内容都一样(初期),服务端给h5返回的是同一个大json,显然得使用到缓存。首先我们考虑下用redis是否可行,由于redis存在单点问题,如果流量过大的话,那么所有用户的请求到达redis的同一个节点,需要评估该节点能否抗住这么大流量。我们的规则是,如果单节点qps达到了千级别就要解决单点问题了(即使redis号称能抗住十万级别的qps),最常见的做法就是使用本地缓存。显然小爱app首页流量不过百,使用redis是没问题的。
对于这上面说的热key问题,我们最直接的做法就是使用本地缓存,比如你最熟悉的guava的LoadingCache,但是使用本地缓存要求能够接受一定的脏数据,因为如果你更新了首页,本地缓存是不会更新的,它只会根据一定的过期策略来重新加载缓存,不过在我们这个场景是完全没问题的,因为一旦在后台推送了首页后就不会再去改变了。即使改变了也没问题,可以设置写过期为半小时,超过半小时重新加载缓存,这种短时间内的脏数据我们是可以接受的。
LoadingCache导致的缓存击穿
虽然说本地缓存和机器上强相关的,虽然代码层面写的是半小时过期,但由于每台机器的启动时间不同,导致缓存的加载时间不同,过期时间也就不同,也就不会所有机器上的请求在同一时间缓存失效后都去请求数据库。但是对于单一一台机器也是会导致缓存穿透的,假如有10台机器,每台1000的qps,只要有一台缓存过期就可能导致这1000个请求同时打到了数据库。这种问题其实比较好解决,但是容易被忽略,也就是在设置LoadingCache的时候使用LoadingCache的load-miss方法,而不是直接判断cache.getIfPresent()== null然后去请求db;前者会加虚拟机层面的锁,保证只有一个请求打到数据库去,从而完美的解决了这个问题。
但是,如果对于实时性要求较高的情况,比如有段时间要经常做活动,我要保证活动页面能近实时更新,也就是运营在后台配置好了活动信息后,需要在C端近实时展示这次配置的活动信息,此时使用LoadingCache肯定就不能满足了。
对于上面说的LoadingCache不能解决的实时问题,可以考虑使用ReloadableCache,这是快手开源的一个本地缓存框架,最大的特点是支持多机器同时更新缓存,假设我们修改了首页信息,然后请求打到的是A机器,这个时候重新加载ReloadableCache,然后它会发出通知,监听了同一zk节点的其他机器收到通知后重新更新缓存。使用这个缓存一般的要求是将全量数据加载到本地缓存,所以如果数据量过大肯定会对gc造成压力,这种情况就不能使用了。由于小爱同学首页这个首页是带有状态的,一般online状态的就那么两个,所以完全可以使用ReloadableCache来只装载online状态的首页。
到这里三种缓存基本都介绍完了,做个小结:
不管哪种本地缓存虽然都带有虚拟机层面的加锁来解决击穿问题,但是意外总有可能以你意想不到的方式发生,保险起见你可以使用两级缓存的方式即本地缓存 redis db。
这里redis的使用就不再多说了,相信很多人对api的使用比我还熟悉
这个是guava提供的网上一抓一大把,但是给两点注意事项
V get(K key, Callabled1336e9e686742fb22c1860557381905 loader)
;要么使用build的时候使用的是build(CacheLoader2ffe6bce707a67f84d4df961dbeb183a loader)
这个时候可以直接使用get()了。此外建议使用load-miss,而不是getIfPresent==null的时候再去查数据库,这可能导致缓存击穿;LoadingCache<String, String> cache = CacheBuilder.newBuilder() .maximumSize(1000L) .expireAfterAccess(Duration.ofHours(1L)) // 多久不访问就过期 .expireAfterWrite(Duration.ofHours(1L)) // 多久这个key没修改就过期 .build(new CacheLoader<String, String>() { @Override public String load(String key) throws Exception { // 数据装载方式,一般就是loadDB return key + " world"; } }); String value = cache.get("hello"); // 返回hello world
导入三方依赖
<dependency> <groupId>com.github.phantomthief</groupId> <artifactId>zknotify-cache</artifactId> <version>0.1.22</version> </dependency>
需要看文档,不然无法使用,有兴趣自己写一个也行的。
public interface ReloadableCache<T> extends Supplier<T> { /** * 获取缓存数据 */ @Override T get(); /** * 通知全局缓存更新 * 注意:如果本地缓存没有初始化,本方法并不会初始化本地缓存并重新加载 * * 如果需要初始化本地缓存,请先调用 {@link ReloadableCache#get()} */ void reload(); /** * 更新本地缓存的本地副本 * 注意:如果本地缓存没有初始化,本方法并不会初始化并刷新本地的缓存 * * 如果需要初始化本地缓存,请先调用 {@link ReloadableCache#get()} */ void reloadLocal(); }
这三个真的是亘古不变的问题,如果流量大确实需要考虑。
简单说就是缓存失效,导致大量请求同一时间打到了数据库。对于缓存击穿问题上面已经给出了很多解决方案了。
1.2和都说过,主要来看3。假如业务愿意只能使用redis而无法使用本地缓存,比如数据量过大,实时性要求比较高。那么当缓存失效的时候就得想办法保证只有少量的请求打到数据库。很自然的就想到了使用分布式锁,理论上说是可行的,但实际上存在隐患。我们的分布式锁相信很多人都是使用redis lua的方式实现的,并且在while中进行了轮训,这样请求量大,数据多的话会导致无形中让redis成了隐患,并且占了太多业务线程,其实仅仅是引入了分布式锁就加大了复杂度,我们的原则就是能不用就不用。
那么我们是不是可以设计一个类似分布式锁,但是更可靠的rpc服务呢?当调用get方法的时候这个rpc服务保证相同的key打到同一个节点,并且使用synchronized来进行加锁,之后完成数据的加载。在快手提供了一个叫cacheSetter的框架。下面提供一个简易版,自己写也很容易实现。
import com.google.common.collect.Lists; import org.apache.commons.collections4.CollectionUtils; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; /** * @Description 分布式加载缓存的rpc服务,如果部署了多台机器那么调用端最好使用id做一致性hash保证相同id的请求打到同一台机器。 **/ public abstract class AbstractCacheSetterService implements CacheSetterService { private final ConcurrentMap<String, CountDownLatch> loadCache = new ConcurrentHashMap<>(); private final Object lock = new Object(); @Override public void load(Collection<String> needLoadIds) { if (CollectionUtils.isEmpty(needLoadIds)) { return; } CountDownLatch latch; Collection<CountDownLatch> loadingLatchList; synchronized (lock) { loadingLatchList = excludeLoadingIds(needLoadIds); needLoadIds = Collections.unmodifiableCollection(needLoadIds); latch = saveLatch(needLoadIds); } System.out.println("needLoadIds:" + needLoadIds); try { if (CollectionUtils.isNotEmpty(needLoadIds)) { loadCache(needLoadIds); } } finally { release(needLoadIds, latch); block(loadingLatchList); } } /** * 加锁 * @param loadingLatchList 需要加锁的id对应的CountDownLatch */ protected void block(Collection<CountDownLatch> loadingLatchList) { if (CollectionUtils.isEmpty(loadingLatchList)) { return; } System.out.println("block:" + loadingLatchList); loadingLatchList.forEach(l -> { try { l.await(); } catch (InterruptedException e) { e.printStackTrace(); } }); } /** * 释放锁 * @param needLoadIds 需要释放锁的id集合 * @param latch 通过该CountDownLatch来释放锁 */ private void release(Collection<String> needLoadIds, CountDownLatch latch) { if (CollectionUtils.isEmpty(needLoadIds)) { return; } synchronized (lock) { needLoadIds.forEach(id -> loadCache.remove(id)); } if (latch != null) { latch.countDown(); } } /** * 加载缓存,比如根据id从db查询数据,然后设置到redis中 * @param needLoadIds 加载缓存的id集合 */ protected abstract void loadCache(Collection<String> needLoadIds); /** * 对需要加载缓存的id绑定CountDownLatch,后续相同的id请求来了从map中找到CountDownLatch,并且await,直到该线程加载完了缓存 * @param needLoadIds 能够正在去加载缓存的id集合 * @return 公用的CountDownLatch */ protected CountDownLatch saveLatch(Collection<String> needLoadIds) { if (CollectionUtils.isEmpty(needLoadIds)) { return null; } CountDownLatch latch = new CountDownLatch(1); needLoadIds.forEach(loadId -> loadCache.put(loadId, latch)); System.out.println("loadCache:" + loadCache); return latch; } /** * 哪些id正在加载数据,此时持有相同id的线程需要等待 * @param ids 需要加载缓存的id集合 * @return 正在加载的id所对应的CountDownLatch集合 */ private Collection<CountDownLatch> excludeLoadingIds(Collection<String> ids) { List<CountDownLatch> loadingLatchList = Lists.newArrayList(); Iterator<String> iterator = ids.iterator(); while (iterator.hasNext()) { String id = iterator.next(); CountDownLatch latch = loadCache.get(id); if (latch != null) { loadingLatchList.add(latch); iterator.remove(); } } System.out.println("loadingLatchList:" + loadingLatchList); return loadingLatchList; } }
业务实现
import java.util.Collection; public class BizCacheSetterRpcService extends AbstractCacheSetterService { @Override protected void loadCache(Collection<String> needLoadIds) { // 读取db进行处理 // 设置缓存 } }
简单来说就是请求的数据在数据库不存在,导致无效请求打穿数据库。
解法也很简单,从db获取数据的方法(getByKey(K key))一定要给个默认值。
比如我有个奖池,金额上限是1W,用户完成任务的时候给他发笔钱,并且使用redis记录下来,并且落表,用户在任务页面能实时看到奖池剩余金额,在任务开始的时候显然奖池金额是不变的,redis和db里面都没有发放金额的记录,这就导致每次必然都去查db,对于这种情况,从db没查出来数据应该缓存个值0到缓存。
就是大量缓存集中失效打到了db,当然肯定都是一类的业务缓存,归根到底是代码写的有问题。可以将缓存失效的过期时间打散,别让其集中失效就可以了。
推荐学习:Redis视频教程
以上是高并发技巧之Redis和本地缓存使用技巧分享的详细内容。更多信息请关注PHP中文网其他相关文章!