>  기사  >  데이터 베이스  >  Redis를 사용하여 분산 잠금을 구현하는 방법

Redis를 사용하여 분산 잠금을 구현하는 방법

尚
원래의
2019-07-05 16:45:209318검색

Redis를 사용하여 분산 잠금을 구현하는 방법

Redis를 사용하여 분산 잠금 구현

redis 기능 소개

1. String, List, Map, Set, ZSet 등과 같은 유형입니다.

2. 데이터 지속성, RDB 및 AOF 방법 지원

3. 클러스터 작업 모드, 강력한 파티션 내결함성 지원

4. 명령의 순차적 처리

5. 트랜잭션 지원

6. 게시 및 구독 지원

Redis는 SETNX 명령을 사용하여 분산 잠금을 구현합니다. 🎜#

SETNX 키 값

키가 존재하지 않는 경우에만 키 값을 value로 설정합니다.

주어진 키가 이미 존재하는 경우 SETNX는 어떠한 조치도 취하지 않습니다.

SETNX는 "SET if Not eXists"(존재하지 않으면 SET)의 약어입니다.

사용 가능한 버전: >= 1.0.0 시간 복잡도: O(1) 반환 값:

설정에 성공하면 1을 반환합니다.

설정하지 못했습니다. 0을 반환합니다.

redis> EXISTS job                # job 不存在
(integer) 0

redis> SETNX job "programmer"    # job 设置成功
(integer) 1

redis> SETNX job "code-farmer"   # 尝试覆盖 job ,失败
(integer) 0

redis> GET job                   # 没有被覆盖
"programmer"

먼저 공개 Redis 액세스 도구 클래스를 캡슐화해야 합니다. 이 클래스에는 RedisTemplate 인스턴스와 ValueOperations 인스턴스를 삽입해야 합니다. Redis에서 구현한 분산 잠금은 가장 간단한 문자열 유형을 사용하기 때문에 ValueOperations 인스턴스가 사용됩니다. 또한 SETNX, 만료 및 del 명령에 해당하는 setIfObsent(문자열 키, 문자열 값), 만료(문자열 키, 긴 시간 제한, TimeUnit 단위) 및 삭제(문자열 키)라는 세 가지 메서드를 캡슐화해야 합니다. Redis의 각각. 다음은 Redis 액세스 도구 클래스의 구체적인 구현입니다.

@Component
public class RedisDao {

	@Autowired
	private RedisTemplate redisTemplate;
	
	@Resource(name="redisTemplate")
	private ValueOperations<Object, Object> valOpsObj;
	
	/**
	 * 如果key不存在,就存储一个key-value,相当于SETNX命令
	 * @param key      键
	 * @param value    值,可以为空
	 * @return
	 */
	public boolean setIfObsent (String key, String value) {
		return valOpsObj.setIfAbsent(key, value);
	}
	
	/**
	 * 为key设置失效时间
	 * @param key       键
	 * @param timeout   时间大小
	 * @param unit      时间单位
	 */
	public boolean expire (String key, long timeout, TimeUnit unit) {
		return redisTemplate.expire(key, timeout, unit);
	}
	
	/**
	 * 删除key
	 * @param key 键
	 */
	public void delete (String key) {
		redisTemplate.delete(key);
	}
}

Redis 액세스 도구 클래스 구현을 완료한 후 이제 고려해야 할 것은 경쟁 분산 잠금을 시뮬레이션하는 방법입니다. Redis 자체는 분산 클러스터를 지원하므로 멀티 스레드 처리 비즈니스 시나리오만 시뮬레이션하면 됩니다. 여기에서는 스레드 풀을 사용하여 시뮬레이션합니다. 다음은 테스트 클래스의 구체적인 구현입니다.

@RestController
@RequestMapping("test")
public class TestController {

	private static final Logger LOG = LoggerFactory.getLogger(TestController.class);  //日志对象
	@Autowired
	private RedisDao redisDao;
	//定义的分布式锁key
	private static final String LOCK_KEY = "MyTestLock";
	
	@RequestMapping(value={"testRedisLock"}, method=RequestMethod.GET)
	public void testRedisLock () {
		ExecutorService executorService = Executors.newFixedThreadPool(5);
		for (int i = 0; i < 5; i++) {
			executorService.submit(new Runnable() {
				@Override
				public void run() {
				    //获取分布式锁
					boolean flag = redisDao.setIfObsent(LOCK_KEY, "lock");
					if (flag) {
						LOG.info(Thread.currentThread().getName() + ":获取Redis分布式锁成功");
						//获取锁成功后设置失效时间
						redisDao.expire(LOCK_KEY, 2, TimeUnit.SECONDS);
						try {
							LOG.info(Thread.currentThread().getName() + ":处理业务开始");
							Thread.sleep(1000); //睡眠1000ms模拟处理业务
							LOG.info(Thread.currentThread().getName() + ":处理业务结束");
							//处理业务完成后删除锁
							redisDao.delete(LOCK_KEY);
						} catch (InterruptedException e) {
							LOG.error("处理业务异常:", e);
						}
					} else {
						LOG.info(Thread.currentThread().getName() + ":获取Redis分布式锁失败");
					}
				}
			});
		}
	}
}

위 코드를 통해 다음과 같은 질문이 발생할 수 있습니다.

스레드가 잠금이 실패했습니다.

스레드가 분산 잠금을 성공적으로 획득한 후 잠금 만료 시간을 결정하는 방법은 무엇입니까?

스레드 업무 처리가 완료된 후 잠금을 삭제해야 하는 이유는 무엇인가요?

이러한 질문에 대해 논의할 수 있습니다.

첫째, Redis의 SETNX 명령은 키가 이미 존재하는 경우 아무런 작업도 수행하지 않으므로 SETNX에서 구현하는 분산 잠금은 재진입 잠금이 아닙니다. 물론 코드를 통해 또는 분산 잠금을 얻을 때까지 n번 재시도할 수도 있습니다. 그러나 이는 공정한 경쟁을 보장하지 않으며 잠금을 기다리고 있었기 때문에 스레드가 차단됩니다. 따라서 Redis에서 구현한 분산 잠금은 공유 리소스를 한 번 쓰고 여러 번 읽는 시나리오에 더 적합합니다.

둘째, 분산 잠금은 만료 시간을 설정해야 하며, 만료 시간은 비즈니스 처리에 필요한 시간보다 커야 합니다(데이터 일관성 보장). 따라서 테스트 단계에서는 정상적인 업무 처리에 소요되는 시간을 최대한 정확하게 예측해야 하며, 업무 처리 과정에서 특정 원인으로 인한 교착 상태를 방지하기 위해 실패 시간을 설정해야 합니다.

셋째, 업무 처리가 완료된 후에는 잠금을 삭제해야 합니다.

위의 잠금 설정과 잠금 만료 시간 설정은 두 단계로 완료됩니다. 분산 잠금 설정과 잠금 만료 시간 설정을 한 번에 설정하는 것이 더 합리적인 방법입니다. 둘 다 성공하거나 둘 다 실패합니다. 구현 코드는 다음과 같습니다.

/**
* Redis访问工具类
*/
@Component
public class RedisDao {

	private static Logger logger = LoggerFactory.getLogger(RedisDao.class);
	
	@Autowired
	private StringRedisTemplate stringRedisTemplate;
	
	/**
	 * 设置分布式锁    
	 * @param key     键
	 * @param value   值
	 * @param timeout 失效时间
	 * @return
	 */
	public boolean setDistributeLock (String key, String value, long timeout) {
		RedisConnection connection = null;
		boolean flag = false;
		try {
			//获取一个连接
			connection = stringRedisTemplate.getConnectionFactory().getConnection();
			//设置分布式锁的同时为锁设置失效时间
			connection.set(key.getBytes(), value.getBytes(), Expiration.seconds(timeout), RedisStringCommands.SetOption.SET_IF_ABSENT);
			flag = true;
		} catch (Exception e) {
			logger.error("set automic lock error:", e);
		} finally {
			//使用后关闭连接
			connection.close();
		}
		return flag;
	}
	
	/**
	 * 查询key的失效时间
	 * @param key       键
	 * @param timeUnit  时间单位
	 * @return
	 */
	public long ttl (String key, TimeUnit timeUnit) {
		return stringRedisTemplate.getExpire(key, timeUnit);
	}
}

/**
* 单元测试类
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class Demo1ApplicationTests {

	private static final Logger LOG = LoggerFactory.getLogger(Demo1ApplicationTests.class);
		
	@Autowired
	private RedisDao redisDao;
	
	@Test
	public void testDistributeLock () {
		String key = "MyDistributeLock";
		//设置分布式锁,失效时间20s
		boolean result = redisDao.setDistributeLock(key, "1", 20);
		if (result) {
			LOG.info("设置分布式锁成功");
			long ttl = redisDao.ttl(key, TimeUnit.SECONDS);
			LOG.info("{}距离失效还有{}s", key, ttl);
		}
	}
}

단위 테스트 클래스를 실행하면 결과는 다음과 같습니다.

2019-05-15 13:07:10.827 - 设置分布式锁成功
2019-05-15 13:07:10.838 - MyDistributeLock距离失效还有19s

Redis 관련 지식을 더 보려면

을 방문하세요. Redis 사용법 튜토리얼 컬럼 !

위 내용은 Redis를 사용하여 분산 잠금을 구현하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.