
SpringBoot+Redis Bloom filter prevents malicious traffic from penetrating the cache

2023-05-30 08:16:05
The details are as follows:

What is malicious traffic penetration

Assume that Redis holds the set of registered user emails: each email is stored as a key, and its value corresponds to some fields of the User table in the DB.

Normally, we will first check whether the user is a member in Redis, because it is faster to read data from the cache. If this member does not exist in the cache, then we will query it in the DB.

Now imagine tens of millions of requests arriving from different IPs (do not assume this never happens; we ran into it in 2018 and 2019, because such an attack is very cheap to launch), all asking for keys that do not exist in Redis. Consider what happens to each request:

  • The request arrives at the Web server;

  • The request is dispatched to the application layer->Micro Service layer;

  • The service tries to read the data from Redis, but the key does not exist there;

  • So the request falls through to the DB layer, opens a DB connection, and runs a query.

With tens or hundreds of millions of such requests, it no longer matters whether Redis can cope, because the DB will be overwhelmed almost immediately. This is cache penetration (called "Redis penetration" here, and sometimes also "cache breakdown"). It can take down your cache or even the DB and set off a chain of "avalanche effects".
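
The flow above can be sketched as a small in-memory simulation. This is only an illustration: the class name CacheAsideLookup, its methods, and the two maps standing in for Redis and the DB are assumptions, not code from this project. It shows that keys which do not exist are never cached, so every such request costs one DB query.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative simulation only: two maps stand in for Redis and the DB.
final class CacheAsideLookup {
    private final Map<String, String> cache = new HashMap<>();    // stand-in for Redis
    private final Map<String, String> database = new HashMap<>(); // stand-in for the user table
    private int dbQueries = 0;

    void saveUser(String email, String name) {
        database.put(email, name);
    }

    // Classic cache-aside read: try the cache first, fall back to the DB on a miss.
    String findUser(String email) {
        String cached = cache.get(email);
        if (cached != null) {
            return cached;
        }
        dbQueries++; // every cache miss costs one DB query
        String fromDb = database.get(email);
        if (fromDb != null) {
            cache.put(email, fromDb); // only users that exist are ever cached
        }
        return fromDb;
    }

    int dbQueries() {
        return dbQueries;
    }

    public static void main(String[] args) {
        CacheAsideLookup lookup = new CacheAsideLookup();
        lookup.saveUser("yumi@yahoo.com", "yumi");
        lookup.findUser("yumi@yahoo.com"); // cache miss, loaded from the DB and cached
        lookup.findUser("yumi@yahoo.com"); // served from the cache
        for (int i = 0; i < 1000; i++) {
            lookup.findUser("fake" + i + "@example.com"); // never cached, always reaches the DB
        }
        System.out.println("DB queries: " + lookup.dbQueries());
    }
}
```

In this run the one real user costs a single DB query, while each of the 1,000 made-up emails costs one more, for 1,001 in total.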

How to prevent

You can use a Bloom filter: load every value of the lookup key field of the user table (here, the email) into a Bloom filter in Redis. Some people will say this is crazy: "I have 40 million members!" So what?

Putting 40 million member records directly into Redis would indeed be excessive, and some websites have 80 million or 100 million members. But I am not asking you to put the records themselves into Redis; I am asking you to put them into a Bloom filter.

The Bloom filter does not store the keys and values themselves. What it stores works as follows:

A Bloom filter is a space-efficient probabilistic data structure, proposed by Burton Howard Bloom in 1970. It is usually used to determine whether an element is in a set. Despite its excellent space efficiency, it can produce false positives.

False positives and false negatives
Because a Bloom filter trades some accuracy for space efficiency, it introduces the problem of false positives.

False positive
When a Bloom filter reports that an element is in the set, there is a certain probability that the answer is wrong. This error rate is called the false positive probability, usually abbreviated as fpp.

False negatives
This is the error rate when a Bloom filter reports that an element is not in the set. If a Bloom filter reports that an element is not in the set, the element is definitely not in the set, so the probability of a false negative is 0.

A Bloom filter uses a bit array of length m and k hash functions. To add an element, map it to k positions in the bit array through the k hash functions and set the bit at each of those positions to 1.
To query whether an element exists, hash the element k times to get k positions. If the bits at all k positions are 1, the element is considered to exist; otherwise it is considered not to exist.
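
The add and query steps just described can be sketched with a plain in-memory bit set. This is a minimal sketch under stated assumptions, not the Redis-backed helper used later in this article: the class name SimpleBloomFilter is made up, and the seeded FNV-1a style hash is an arbitrary stand-in for a real hash family. The k positions are derived from two base hashes (double hashing).

```java
import java.nio.charset.StandardCharsets;
import java.util.BitSet;

// Minimal in-memory Bloom filter sketch (hypothetical class, illustrative hash choice).
final class SimpleBloomFilter {
    private final BitSet bits;
    private final int m; // number of bits
    private final int k; // number of hash functions

    SimpleBloomFilter(int m, int k) {
        if (m <= 0 || k <= 0) {
            throw new IllegalArgumentException("m and k must be positive");
        }
        this.m = m;
        this.k = k;
        this.bits = new BitSet(m);
    }

    // Add: set the bit at each of the k positions to 1.
    void add(String item) {
        for (int position : positions(item)) {
            bits.set(position);
        }
    }

    // Query: "possibly present" only if all k bits are 1; a single 0 bit means "definitely absent".
    boolean mightContain(String item) {
        for (int position : positions(item)) {
            if (!bits.get(position)) {
                return false;
            }
        }
        return true;
    }

    // Derives k positions as h1 + i * h2, reduced into [0, m).
    private int[] positions(String item) {
        byte[] data = item.getBytes(StandardCharsets.UTF_8);
        int h1 = seededHash(data, 0x811C9DC5);
        int h2 = seededHash(data, 0x9E3779B9) | 1; // force odd so the step is never zero
        int[] result = new int[k];
        for (int i = 0; i < k; i++) {
            result[i] = Math.floorMod(h1 + i * h2, m);
        }
        return result;
    }

    // FNV-1a style 32-bit hash with a configurable seed (an arbitrary choice for this sketch).
    private static int seededHash(byte[] data, int seed) {
        int hash = seed;
        for (byte b : data) {
            hash ^= (b & 0xFF);
            hash *= 16777619;
        }
        return hash;
    }

    public static void main(String[] args) {
        SimpleBloomFilter filter = new SimpleBloomFilter(1024, 3);
        filter.add("yumi@yahoo.com");
        System.out.println(filter.mightContain("yumi@yahoo.com"));
        System.out.println(filter.mightContain("nobody@example.com"));
    }
}
```

An element that was added always answers "possibly present", because its k bits were all set; an element that was never added usually answers "absent", but can collide with bits set by other elements, which is exactly the false positive case.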

Since it only stores bits, the amount of data is very small. How small? While writing this blog, I inserted 1 million emails into the Redis bloom filter and it occupied less than 3 MB.

A Bloom filter has a few key parameters. From them you can estimate how much memory it needs for a given number of items at a given false positive rate. There is an online calculator for this: https://krisives.github.io/bloom-calculator/. We enter 1 million items and a false positive rate of 0.001%, and it works out how much memory Redis needs to allocate.
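
The same estimate can be reproduced with the standard Bloom filter sizing formulas, m = -n ln(p) / (ln 2)^2 for the number of bits and k = (m / n) ln 2 for the number of hash functions. The sketch below is an illustration only; the class name BloomSizing and its method names are assumptions.

```java
// Sizing sketch based on the standard Bloom filter formulas (hypothetical class name).
final class BloomSizing {

    // m = -n * ln(p) / (ln 2)^2, rounded up to whole bits
    static long optimalBits(long n, double p) {
        return (long) Math.ceil(-n * Math.log(p) / (Math.log(2) * Math.log(2)));
    }

    // k = (m / n) * ln 2, at least 1
    static int optimalHashFunctions(long n, long m) {
        return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
    }

    public static void main(String[] args) {
        long n = 1_000_000L; // expected number of items
        double p = 0.00001;  // false positive rate of 0.001%
        long m = optimalBits(n, p);
        int k = optimalHashFunctions(n, m);
        System.out.printf("bits=%d, megabytes=%.2f, hash functions=%d%n", m, m / 8.0 / 1_000_000, k);
    }
}
```

For 1 million items at 0.001% this comes to roughly 24 million bits, just under 3 MB, with 17 hash functions, which is in line with the memory figure quoted above.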

So how do we deal with false positives? It is simple. When a false positive occurs, the business or operations team will report it, and you only need to maintain a small whitelist. Compared with 1 million entries, a whitelist of 1,000 entries is not a problem. Bloom filter queries are also very fast. Generally, a result telling the caller whether the key exists comes back within 80-100 milliseconds.

Another use scenario of Bloom filter

Suppose I have crawled 400 million URLs with a Python crawler and need to remove the duplicates.

A Bloom filter fits this scenario as well.

Let’s start our Redis BloomFilter journey.

Install Bloom Filter for Redis

Redis supports the Bloom filter module only from version 4.0 onward, so this example uses Redis 5.4.

Redis’ bloom filter download address is here: https://github.com/RedisLabsModules/redisbloom.git

git clone https://github.com/RedisLabsModules/redisbloom.git
cd redisbloom
make # compile

There are two ways to load bloom filter when Redis starts:

Manual loading:

redis-server --loadmodule ./redisbloom/rebloom.so

Self-loading at each startup:

Edit the redis.conf file of Redis and add:

loadmodule /soft/redisbloom/redisbloom.so


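Using the Bloom Filter in Redis


bf.reserve {key} {error_rate} {size}
> bf.reserve userid 0.01 100000


bf.add {key} {item}
> bf.add userid '181920'
(integer) 1


bf.exists {key} {item}
> bf.exists userid '101310299'
(integer) 1

This command checks whether the Bloom filter stored at the given key contains the item. It returns 1 if the item exists (subject to false positives) and 0 if it does not.


Most examples online either drive the filter directly through Jedis or have Java execute an external process to call the Redis bloom filter commands. Much of that code cannot be made to work or only reaches hello-world level, and cannot be used in production-grade applications.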




package org.sky.platform.util;

import com.google.common.base.Preconditions;
import com.google.common.hash.Funnel;
import com.google.common.hash.Hashing;

/**
 * Computes the bit offsets of a value for a Bloom filter stored as a Redis bitmap.
 */
public class BloomFilterHelper<T> {
	private int numHashFunctions;
	private int bitSize;
	private Funnel<T> funnel;

	public BloomFilterHelper(Funnel<T> funnel, int expectedInsertions, double fpp) {
		Preconditions.checkArgument(funnel != null, "funnel must not be null");
		this.funnel = funnel;
		bitSize = optimalNumOfBits(expectedInsertions, fpp);
		numHashFunctions = optimalNumOfHashFunctions(expectedInsertions, bitSize);
	}

	// Derives numHashFunctions offsets from the two halves of one 64-bit murmur3 hash.
	int[] murmurHashOffset(T value) {
		int[] offset = new int[numHashFunctions];
		long hash64 = Hashing.murmur3_128().hashObject(value, funnel).asLong();
		int hash1 = (int) hash64;
		int hash2 = (int) (hash64 >>> 32);
		for (int i = 1; i <= numHashFunctions; i++) {
			int nextHash = hash1 + i * hash2;
			if (nextHash < 0) {
				nextHash = ~nextHash; // flip the bits to get a non-negative value
			}
			offset[i - 1] = nextHash % bitSize;
		}
		return offset;
	}

	/**
	 * Computes the length of the bit array.
	 */
	private int optimalNumOfBits(long n, double p) {
		if (p == 0) {
			p = Double.MIN_VALUE;
		}
		return (int) (-n * Math.log(p) / (Math.log(2) * Math.log(2)));
	}

	/**
	 * Computes the number of hash functions to apply.
	 */
	private int optimalNumOfHashFunctions(long n, long m) {
		return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
	}
}


Setting up the Spring Boot project








package org.sky.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.*;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

// Lines marked "restored" were missing from the listing and are reconstructed from its imports and comments.
@Configuration // restored
@EnableCaching // restored
public class RedisConfig extends CachingConfigurerSupport {

	/**
	 * Use Redis as the default cache.
	 */
	@Bean // restored
	public CacheManager cacheManager(RedisTemplate redisTemplate) {
		RedisCacheManager rcm = new RedisCacheManager(redisTemplate);
		return rcm;
	}

	/**
	 * RedisTemplate configuration.
	 */
	@Bean // restored
	public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
		RedisTemplate<String, Object> template = new RedisTemplate<>();
		// Configure the connection factory
		template.setConnectionFactory(factory); // restored
		// Use Jackson2JsonRedisSerializer to serialize and deserialize Redis values (the default is JDK serialization)
		Jackson2JsonRedisSerializer<Object> jacksonSeial = new Jackson2JsonRedisSerializer<>(Object.class);
		ObjectMapper om = new ObjectMapper();
		// Which members to serialize: fields, getters and setters, with any visibility (private through public)
		om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
		// Specify the type of the serialized input; classes must be non-final, final classes such as String or Integer throw an exception
		om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); // restored
		jacksonSeial.setObjectMapper(om); // restored
		// Values are serialized as JSON
		template.setValueSerializer(jacksonSeial); // restored
		// Use StringRedisSerializer to serialize and deserialize Redis keys
		template.setKeySerializer(new StringRedisSerializer());
		// Serialization for hash keys and hash values
		template.setHashKeySerializer(new StringRedisSerializer());
		template.setHashValueSerializer(jacksonSeial); // restored
		template.afterPropertiesSet(); // restored
		return template;
	}

	/**
	 * Operations on hash data.
	 */
	@Bean // restored
	public HashOperations<String, String, Object> hashOperations(RedisTemplate<String, Object> redisTemplate) {
		return redisTemplate.opsForHash();
	}

	/**
	 * Operations on Redis string data.
	 */
	@Bean // restored
	public ValueOperations<String, Object> valueOperations(RedisTemplate<String, Object> redisTemplate) {
		return redisTemplate.opsForValue();
	}

	/**
	 * Operations on list data.
	 */
	@Bean // restored
	public ListOperations<String, Object> listOperations(RedisTemplate<String, Object> redisTemplate) {
		return redisTemplate.opsForList();
	}

	/**
	 * Operations on unordered set data.
	 */
	@Bean // restored
	public SetOperations<String, Object> setOperations(RedisTemplate<String, Object> redisTemplate) {
		return redisTemplate.opsForSet();
	}

	/**
	 * Operations on sorted set data.
	 */
	@Bean // restored
	public ZSetOperations<String, Object> zSetOperations(RedisTemplate<String, Object> redisTemplate) {
		return redisTemplate.opsForZSet();
	}
}


For this we also wrap a small set of Redis utility components, which live in the sky-common project.


package org.sky.platform.util;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.util.Collection;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.google.common.base.Preconditions;

// Lines marked "restored" were missing from the listing and are reconstructed from its imports and comments.
@Component // restored
public class RedisUtil {
	@Autowired // restored
	private RedisTemplate<String, String> redisTemplate;

	/** Default expiry, in seconds. */
	public static final long DEFAULT_EXPIRE = 60 * 60 * 24;

	/** No expiry. */
	public static final long NOT_EXPIRE = -1;

	public boolean existsKey(String key) {
		return redisTemplate.hasKey(key);
	}

	/**
	 * Renames a key; if newKey already exists, its old value is overwritten.
	 */
	public void renameKey(String oldKey, String newKey) {
		redisTemplate.rename(oldKey, newKey);
	}

	/**
	 * Renames a key only if newKey does not exist.
	 *
	 * @return true if the rename succeeded
	 */
	public boolean renameKeyNotExist(String oldKey, String newKey) {
		return redisTemplate.renameIfAbsent(oldKey, newKey);
	}

	/** Deletes a key. */
	public void deleteKey(String key) {
		redisTemplate.delete(key); // restored
	}

	/** Deletes several keys. */
	public void deleteKey(String... keys) {
		Set<String> kSet = Stream.of(keys).collect(Collectors.toSet());
		redisTemplate.delete(kSet); // restored
	}

	/** Deletes a collection of keys. */
	public void deleteKey(Collection<String> keys) {
		Set<String> kSet = keys.stream().collect(Collectors.toSet());
		redisTemplate.delete(kSet); // restored
	}

	/** Sets the time to live of a key. */
	public void expireKey(String key, long time, TimeUnit timeUnit) {
		redisTemplate.expire(key, time, timeUnit);
	}

	/** Makes a key expire at the given date. */
	public void expireKeyAt(String key, Date date) {
		redisTemplate.expireAt(key, date);
	}

	/** Returns the remaining time to live of a key. */
	public long getKeyExpire(String key, TimeUnit timeUnit) {
		return redisTemplate.getExpire(key, timeUnit);
	}

	/** Makes a key permanent. */
	public void persistKey(String key) {
		redisTemplate.persist(key); // restored
	}

	/**
	 * Adds a value using the given Bloom filter helper: sets one bit per offset.
	 */
	public <T> void addByBloomFilter(BloomFilterHelper<T> bloomFilterHelper, String key, T value) {
		Preconditions.checkArgument(bloomFilterHelper != null, "bloomFilterHelper must not be null");
		int[] offset = bloomFilterHelper.murmurHashOffset(value);
		for (int i : offset) {
			redisTemplate.opsForValue().setBit(key, i, true);
		}
	}

	/**
	 * Checks a value using the given Bloom filter helper: any unset bit means the value is absent.
	 */
	public <T> boolean includeByBloomFilter(BloomFilterHelper<T> bloomFilterHelper, String key, T value) {
		Preconditions.checkArgument(bloomFilterHelper != null, "bloomFilterHelper must not be null");
		int[] offset = bloomFilterHelper.murmurHashOffset(value);
		for (int i : offset) {
			if (!redisTemplate.opsForValue().getBit(key, i)) {
				return false;
			}
		}
		return true;
	}
}


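package org.sky.platform.util;

// The append calls were missing from the listing and are reconstructed from the key formats in the comments.
public class RedisKeyUtil {
	/**
	 * Redis key of the form: tableName:primaryKeyName:primaryKeyValue:columnName
	 *
	 * @param tableName     table name
	 * @param majorKey      primary key name
	 * @param majorKeyValue primary key value
	 * @param column        column name
	 */
	public static String getKeyWithColumn(String tableName, String majorKey, String majorKeyValue, String column) {
		StringBuffer buffer = new StringBuffer();
		buffer.append(tableName).append(":");
		buffer.append(majorKey).append(":");
		buffer.append(majorKeyValue).append(":");
		buffer.append(column);
		return buffer.toString();
	}

	/**
	 * Redis key of the form: tableName:primaryKeyName:primaryKeyValue
	 */
	public static String getKey(String tableName, String majorKey, String majorKeyValue) {
		StringBuffer buffer = new StringBuffer();
		buffer.append(tableName).append(":");
		buffer.append(majorKey).append(":");
		buffer.append(majorKeyValue);
		return buffer.toString();
	}
}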
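
As a quick illustration of the key layout that RedisKeyUtil documents (table name, primary key name, primary key value and, optionally, column name, joined by colons), here is a self-contained sketch. The class RedisKeyExample and the sample table and column names are hypothetical and only mirror the documented format.

```java
// Illustrative only: mirrors the "table:primaryKeyName:primaryKeyValue[:column]" layout.
final class RedisKeyExample {

    static String key(String tableName, String majorKey, String majorKeyValue) {
        return String.join(":", tableName, majorKey, majorKeyValue);
    }

    static String keyWithColumn(String tableName, String majorKey, String majorKeyValue, String column) {
        return key(tableName, majorKey, majorKeyValue) + ":" + column;
    }

    public static void main(String[] args) {
        // prints user:email:yumi@yahoo.com
        System.out.println(key("user", "email", "yumi@yahoo.com"));
        // prints user:email:yumi@yahoo.com:address
        System.out.println(keyWithColumn("user", "email", "yumi@yahoo.com", "address"));
    }
}
```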

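Next comes BloomFilterHelper.java, which implements how the BloomFilter is used with Redis. It also lives in the sky-common folder; its source was already listed above, so it is not repeated here.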



package org.sky.vo;

import java.io.Serializable;

public class UserVO implements Serializable {
	private String name;
	private String address;
	private Integer age;
	private String email = "";

	public String getEmail() {
		return email;
	}
	public void setEmail(String email) {
		this.email = email;
	}

	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}

	public String getAddress() {
		return address;
	}
	public void setAddress(String address) {
		this.address = address;
	}

	public Integer getAge() {
		return age;
	}
	public void setAge(Integer age) {
		this.age = age;
	}
}



<project xmlns="http://maven.apache.org/POM/4.0.0"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<description>Demo project for Spring Boot Dubbo Nacos</description>


<project xmlns="http://maven.apache.org/POM/4.0.0"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">




<project xmlns="http://maven.apache.org/POM/4.0.0"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<description>Demo Redis Advanced Features</description>


package org.sky;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.transaction.annotation.EnableTransactionManagement;

// @EnableAutoConfiguration and @EnableTransactionManagement are restored from the imports; the listing had lost them.
@EnableAutoConfiguration
@ComponentScan(basePackages = { "org.sky" })
@EnableTransactionManagement
public class Application {
	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}
}


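  • public ResponseEntity addUser(@RequestBody String params): accepts an external API POST and puts one email address into the Redis bloom filter;

  • public ResponseEntity findEmailInBloom(@RequestBody String params): accepts an external API POST and checks in the Redis bloom filter whether the email address in the submitted user data exists among the million-plus stored email records;

Together they let us verify how much memory more than a million records occupy in the Redis bloom filter and how fast a single lookup through the bloom filter is.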


package org.sky.controller;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import org.sky.platform.util.BloomFilterHelper;
import org.sky.platform.util.RedisUtil;
import org.sky.vo.UserVO;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Charsets;
import com.google.common.hash.Funnel;

// @RestController and @Resource are restored from the imports; the listing had lost its annotations and braces.
@RestController
public class UserController extends BaseController {

	@Resource
	private RedisTemplate redisTemplate;

	@Resource
	private RedisUtil redisUtil;

	@PostMapping(value = "/addEmailToBloom", produces = "application/json")
	public ResponseEntity<String> addUser(@RequestBody String params) {
		ResponseEntity<String> response = null;
		String returnResultStr;
		HttpHeaders headers = new HttpHeaders();
		Map<String, Object> result = new HashMap<>();
		try {
			JSONObject requestJsonObj = JSON.parseObject(params);
			UserVO inputUser = getUserFromJson(requestJsonObj);
			// Sized for 1.5 million expected insertions at a false positive rate of 0.00001
			BloomFilterHelper<String> myBloomFilterHelper = new BloomFilterHelper<>((Funnel<String>) (from,
					into) -> into.putString(from, Charsets.UTF_8).putString(from, Charsets.UTF_8), 1500000, 0.00001);
			redisUtil.addByBloomFilter(myBloomFilterHelper, "email_existed_bloom", inputUser.getEmail());
			result.put("code", HttpStatus.OK.value());
			result.put("message", "add into bloomFilter successfully");
			result.put("email", inputUser.getEmail());
			returnResultStr = JSON.toJSONString(result);
			logger.info("returnResultStr======>" + returnResultStr);
			response = new ResponseEntity<>(returnResultStr, headers, HttpStatus.OK);
		} catch (Exception e) {
			logger.error("add email to bloom filter with error: " + e.getMessage(), e);
			result.put("message", "add email to bloom filter with error: " + e.getMessage());
			returnResultStr = JSON.toJSONString(result);
			response = new ResponseEntity<>(returnResultStr, headers, HttpStatus.INTERNAL_SERVER_ERROR);
		}
		return response;
	}

	@PostMapping(value = "/checkEmailInBloom", produces = "application/json")
	public ResponseEntity<String> findEmailInBloom(@RequestBody String params) {
		ResponseEntity<String> response = null;
		String returnResultStr;
		HttpHeaders headers = new HttpHeaders();
		Map<String, Object> result = new HashMap<>();
		try {
			JSONObject requestJsonObj = JSON.parseObject(params);
			UserVO inputUser = getUserFromJson(requestJsonObj);
			// Must use the same funnel, capacity and fpp as addUser so the offsets match
			BloomFilterHelper<String> myBloomFilterHelper = new BloomFilterHelper<>((Funnel<String>) (from,
					into) -> into.putString(from, Charsets.UTF_8).putString(from, Charsets.UTF_8), 1500000, 0.00001);
			boolean answer = redisUtil.includeByBloomFilter(myBloomFilterHelper, "email_existed_bloom",
					inputUser.getEmail());
			logger.info("answer=====" + answer);
			result.put("code", HttpStatus.OK.value());
			result.put("email", inputUser.getEmail());
			result.put("exist", answer);
			returnResultStr = JSON.toJSONString(result);
			logger.info("returnResultStr======>" + returnResultStr);
			response = new ResponseEntity<>(returnResultStr, headers, HttpStatus.OK);
		} catch (Exception e) {
			logger.error("check email in bloom filter with error: " + e.getMessage(), e);
			result.put("message", "check email in bloom filter with error: " + e.getMessage());
			returnResultStr = JSON.toJSONString(result);
			response = new ResponseEntity<>(returnResultStr, headers, HttpStatus.INTERNAL_SERVER_ERROR);
		}
		return response;
	}

	private UserVO getUserFromJson(JSONObject requestObj) {
		String userName = requestObj.getString("username");
		String userAddress = requestObj.getString("address");
		String userEmail = requestObj.getString("email");
		// Integer rather than int: a request without "age" would otherwise fail on unboxing null
		Integer userAge = requestObj.getInteger("age");
		UserVO u = new UserVO();
		u.setName(userName);
		u.setAddress(userAddress);
		u.setEmail(userEmail);
		u.setAge(userAge);
		return u;
	}
}





We use "/addEmailToBloom" to insert the email "yumi@yahoo.com" into the Redis bloom filter.


Connecting to our Redis with a Redis client, we can see that this value has indeed been inserted into the bloom filter.

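Feeding 1.2 million records into the Redis Bloomfilter with a load testing tool to see the real effect

Next, we use jmeter to feed about 1.2 million records into "/addEmailToBloom", and then see how our system behaves once 1.2 million emails have been fed into the bloom filter.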






jmeter POST request


jmeter -n -t add_randomemail_to_bloom.jmx -l add_email_to_bloom\report\03-result.csv -j add_email_to_bloom\logs\03-log.log -e -o add_email_to_bloom\html_report_3


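  • -t specifies the path of the jmeter test plan file;

  • -l specifies the file that sample results are written to (03-result.csv here);

  • -j specifies the jmeter run log file (03-log.log here);

  • -e generates the HTML report; it is used together with the -o parameter;

  • -o specifies the output folder for the HTML report; the folder must not exist yet or must be empty;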




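Looking at the jmeter HTML reports generated with -e -o: I ran the test 3 times in total. The first run inserted 70,059 records in 10 minutes, the second more than 400,000 records in 30 minutes, and the third more than 700,000 records in 45 minutes. Altogether I inserted 1,200,790 emails.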


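After the 1.2 million records were inserted, we picked an arbitrary email that logger.info had written to our log4j output, for example egpoghnfjekjajdo@163.com, to see how the redis bloomfilter performs when looking this record up: 76 ms. I ran it several times, and the average was around 80 ms.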




As long as a request for a key that is not in the bloomfilter gets false returned to the client directly, then combined with dynamically scaled nginx, CDN, WAF, and interface-layer caching, it is actually quite simple for the whole website to withstand six-figure or even seven-figure concurrency.
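
The resulting read path (filter first, then cache, then DB) can be sketched as follows. This is a minimal in-memory illustration, not this project's code: the class name GuardedUserLookup is hypothetical, the maps stand in for Redis and the DB, and the membership check is passed in as a Predicate so that any Bloom filter implementation could be plugged in. In the demo an exact key lookup stands in for the filter.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

// Illustrative read path: membership check -> cache -> DB (hypothetical class, in-memory stand-ins).
final class GuardedUserLookup {
    private final Predicate<String> mightExist;                // e.g. a Bloom filter membership check
    private final Map<String, String> cache = new HashMap<>(); // stand-in for Redis
    private final Map<String, String> database;                // stand-in for the user table
    private int dbQueries = 0;

    GuardedUserLookup(Predicate<String> mightExist, Map<String, String> database) {
        this.mightExist = mightExist;
        this.database = database;
    }

    String findUser(String email) {
        if (!mightExist.test(email)) {
            return null; // definitely absent: answer immediately, touch neither cache nor DB
        }
        String cached = cache.get(email);
        if (cached != null) {
            return cached;
        }
        dbQueries++; // reached only by keys the filter let through
        String fromDb = database.get(email);
        if (fromDb != null) {
            cache.put(email, fromDb);
        }
        return fromDb;
    }

    int dbQueries() {
        return dbQueries;
    }

    public static void main(String[] args) {
        Map<String, String> db = new HashMap<>();
        db.put("yumi@yahoo.com", "yumi");
        // An exact key check stands in for the Bloom filter in this demo.
        GuardedUserLookup lookup = new GuardedUserLookup(db::containsKey, db);
        for (int i = 0; i < 1000; i++) {
            lookup.findUser("fake" + i + "@example.com");
        }
        lookup.findUser("yumi@yahoo.com");
        System.out.println("DB queries: " + lookup.dbQueries());
    }
}
```

With a real Bloom filter the check can occasionally let a nonexistent key through (a false positive); that request then costs one DB query and still returns nothing, so the DB load is bounded by the false positive rate rather than by the attack volume.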


This article is reproduced at:yisu.com. If there is any infringement, please contact admin@php.cn delete