シンプルで通常の売られすぎのロジック コードを作成すると、複数のユーザーが同じデータを同時に操作して、生じる問題。
データ情報を Redis に保存し、対応するインターフェイスをリクエストして製品数量情報を取得します。
製品数量情報が 0 より大きい場合は、1 を差し引いて Redis に再保存します。
実行コードテストの問題。
/** * Redis数据库操作,超卖问题模拟 * @author * */ @RestController public class RedisController { // 引入String类型redis操作模板 @Autowired private StringRedisTemplate stringRedisTemplate; // 测试数据设置接口 @RequestMapping("/setStock") public String setStock() { stringRedisTemplate.opsForValue().set("stock", "100"); return "ok"; } // 模拟商品超卖代码 @RequestMapping("/deductStock") public String deductStock() { // 获取Redis数据库中的商品数量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 减库存 if(stock > 0) { int realStock = stock -1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣减成功,剩余商品:"+realStock); }else { System.out.println("库存不足....."); } return "end"; } }
単一アプリケーション モードでは、ストレス テストに jmeter
を使用します。
テスト結果:
各リクエストは 1 つのスレッドに相当します。複数のスレッドが同時にデータを取得すると、スレッド A はインベントリを 84 として取得します。このとき、スレッド B もプログラムに入り、CPU を占有し、インベントリに 84 としてアクセスします。最終的に、両方のスレッドはインベントリを 0 ずつ減分します。実際、もう 1 つのアイテムが販売されました。
データ処理がスレッド間で一貫していないため、synchronized
を使用してテストをロックできますか?
引き続き最初に単一サーバーをテストします
// 模拟商品超卖代码, // 设置synchronized同步锁 @RequestMapping("/deductStock1") public String deductStock1() { synchronized (this) { // 获取Redis数据库中的商品数量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 减库存 if(stock > 0) { int realStock = stock -1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣减成功,剩余商品:"+realStock); }else { System.out.println("库存不足....."); } } return "end"; }
数量100
# 再テスト後、取得されたログ情報は次のとおりです。
# スタンドアロン モードでは、同期キーワードを追加すると、確かに商品の売れすぎ現象を回避できます。
しかし、分散マイクロサービスでは、クラスターがサービス用にセットアップされている場合でも、同期によってデータの正確性が保証できるでしょうか?
複数のリクエストが登録センターによって負荷分散され、各マイクロサービスの処理インターフェイスが同期して追加されると仮定すると、
は引き続き同様に表示されます over sold
の問題:
#Redis は分散ロックを実装します
synchronized
単一サーバー
Lockの
JVMのみ、ただしディストリビューション多くの異なるサーバーがあるため、2 つ以上のスレッドが異なるサーバー上の製品数量情報を共同で操作します。
# # setnx key value
処理インターフェイスを変更し、キーを追加します// 模拟商品超卖代码 @RequestMapping("/deductStock2") public String deductStock2() { // 创建一个key,保存至redis String key = "lock"; // setnx // 由于redis是一个单线程,执行命令采取“队列”形式排队! // 优先进入队列的命令先执行,由于是setnx,第一个执行后,其他操作执行失败。 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "this is lock"); // 当不存在key时,可以设置成功,回执true;如果存在key,则无法设置,返回false if (!result) { // 前端监测,redis中存在,则不能让这个抢购操作执行,予以提示! return "err"; } // 获取Redis数据库中的商品数量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 减库存 if(stock > 0) { int realStock = stock -1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣减成功,剩余商品:"+realStock); }else { System.out.println("库存不足....."); } // 程序执行完成,则删除这个key stringRedisTemplate.delete(key); return "end"; }
1. インターフェイスに入るように要求します。キーが Redis に存在しない場合は、新しい setnx が作成されます。存在する場合は存在しません。新たに作成され、同時にエラーコードが返され、爆買いロジックは実行され続けません。
2. 作成が成功したら、スナップアップ ロジックを実行します。3. 急ぎ購入ロジックの実行後、データベース内のを単独で使用するよりもはるかに合理的ですが、スナップアップ操作中に例外が発生した場合、このsetnx
syn
に対応する
keyを削除します。他のリクエストを設定して実行できるようにします。
このロジックは、以前に
key は使用できません。使用済みです。
削除。その結果、他の処理リクエストは
key を取得できず、プログラム ロジックがデッドロック状態になります。
try...finally を使用して
/** * 模拟商品超卖代码 设置 * * @return */ @RequestMapping("/deductStock3") public String deductStock3() { // 创建一个key,保存至redis String key = "lock"; // setnx // 由于redis是一个单线程,执行命令采取队列形式排队!优先进入队列的命令先执行,由于是setnx,第一个执行后,其他操作执行失败 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "this is lock"); // 当不存在key时,可以设置成功,回执true;如果存在key,则无法设置,返回false if (!result) { // 前端监测,redis中存在,则不能让这个抢购操作执行,予以提示! return "err"; } try { // 获取Redis数据库中的商品数量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 减库存 if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣减成功,剩余商品:" + realStock); } else { System.out.println("库存不足....."); } } finally { // 程序执行完成,则删除这个key // 放置于finally中,保证即使上述逻辑出问题,也能del掉 stringRedisTemplate.delete(key); } return "end"; }を操作できます。このロジックは、上記の他のロジックよりも厳密です。 ただし、停電やシステムクラッシュなどにより一連のサーバーが
ダウンタイム
になった場合、実行されるべきfinally のステートメントは正常に実行されません。 ! !また、
key が常に存在するようです その結果、
デッドロック!
タイムアウトによって上記の問題を解決します。
/** * 模拟商品超卖代码 设置setnx保证分布式环境下,数据处理安全行问题;<br> * 但如果某个代码段执行异常,导致key无法清理,出现死锁,添加try...finally;<br> * 如果某个服务因某些问题导致释放key不能执行,导致死锁,此时解决思路为:增加key的有效时间;<br> * 为了保证设置key的值和设置key的有效时间,两条命令构成同一条原子命令,将下列逻辑换成其他代码。 * * @return */ @RequestMapping("/deductStock4") public String deductStock4() { // 创建一个key,保存至redis String key = "lock"; // setnx // 由于redis是一个单线程,执行命令采取队列形式排队!优先进入队列的命令先执行,由于是setnx,第一个执行后,其他操作执行失败 //boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "this is lock"); //让设置key和设置key的有效时间都可以同时执行 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "this is lock", 10, TimeUnit.SECONDS); // 当不存在key时,可以设置成功,回执true;如果存在key,则无法设置,返回false if (!result) { // 前端监测,redis中存在,则不能让这个抢购操作执行,予以提示! return "err"; } // 设置key有效时间 //stringRedisTemplate.expire(key, 10, TimeUnit.SECONDS); try { // 获取Redis数据库中的商品数量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 减库存 if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣减成功,剩余商品:" + realStock); } else { System.out.println("库存不足....."); } } finally { // 程序执行完成,则删除这个key // 放置于finally中,保证即使上述逻辑出问题,也能del掉 stringRedisTemplate.delete(key); } return "end"; }
しかし、上記のコードのロジックには依然として問題が存在します:
処理ロジックに
timeoutの問題がある場合。ロジックが実行され、設定されたキーの有効時間を超えると、この時点でどのような問題が発生しますか?
上の図から問題が明確にわかります:
リクエストの実行時間がキーの有効時間を超えた場合。新しいリクエストが実行されるとき、キーを取得する必要があり、時刻を設定できます;このとき redis に保存されるキーはリクエスト 1 のキーではなく、他のリクエストによって設定されます。
リクエスト 1 の実行が完了したら、ここでキーを削除します。削除されるのは他のリクエストで設定されたキーです。
依然出现了key形同虚设
的问题!如果失效一直存在,超卖问题依旧不会解决。
既然出现key形同虚设的现象,是否可以增加条件,当finally中需要执行删除操作时,获取数据判断值是否是该请求中对应的,如果是则删除,不是则不管!
修改上述代码如下所示:
/** * 模拟商品超卖代码 <br> * 解决`deductStock6`中,key形同虚设的问题。 * * @return */ @RequestMapping("/deductStock5") public String deductStock5() { // 创建一个key,保存至redis String key = "lock"; String lock_value = UUID.randomUUID().toString(); // setnx //让设置key和设置key的有效时间都可以同时执行 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, lock_value, 10, TimeUnit.SECONDS); // 当不存在key时,可以设置成功,回执true;如果存在key,则无法设置,返回false if (!result) { // 前端监测,redis中存在,则不能让这个抢购操作执行,予以提示! return "err"; } try { // 获取Redis数据库中的商品数量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 减库存 if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣减成功,剩余商品:" + realStock); } else { System.out.println("库存不足....."); } } finally { // 程序执行完成,则删除这个key // 放置于finally中,保证即使上述逻辑出问题,也能del掉 // 判断redis中该数据是否是这个接口处理时的设置的,如果是则删除 if(lock_value.equalsIgnoreCase(stringRedisTemplate.opsForValue().get(key))) { stringRedisTemplate.delete(key); } } return "end"; }
由于获得锁的线程必须执行完减库存逻辑才能释放锁,所以在此期间所有其他的线程都会由于没获得锁,而直接结束程序,导致有很多库存根本没有卖出去,所以这里应该可以优化,让没获得锁的线程等待,或者循环检查锁
我们将锁封装到一个实体类中,然后加入两个方法,加锁和解锁
@Component public class RedisLock { private final Logger log = LoggerFactory.getLogger(this.getClass()); private final long acquireTimeout = 10*1000; // 获取锁之前的超时时间(获取锁的等待重试时间) private final int timeOut = 20; // 获取锁之后的超时时间(防止死锁) @Autowired private StringRedisTemplate stringRedisTemplate; // 引入String类型redis操作模板 /** * 获取分布式锁 * @return 锁标识 */ public boolean getRedisLock(String lockName,String lockValue) { // 1.计算获取锁的时间 Long endTime = System.currentTimeMillis() + acquireTimeout; // 2.尝试获取锁 while (System.currentTimeMillis() < endTime) { //3. 获取锁成功就设置过期时间 让设置key和设置key的有效时间都可以同时执行 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockName, lockValue, timeOut, TimeUnit.SECONDS); if (result) { return true; } } return false; } /** * 释放分布式锁 * @param lockName 锁名称 * @param lockValue 锁值 */ public void unRedisLock(String lockName,String lockValue) { if(lockValue.equalsIgnoreCase(stringRedisTemplate.opsForValue().get(lockName))) { stringRedisTemplate.delete(lockName); } } }
@RestController public class RedisController { // 引入String类型redis操作模板 @Autowired private StringRedisTemplate stringRedisTemplate; @Autowired private RedisLock redisLock; @RequestMapping("/setStock") public String setStock() { stringRedisTemplate.opsForValue().set("stock", "100"); return "ok"; } @RequestMapping("/deductStock") public String deductStock() { // 创建一个key,保存至redis String key = "lock"; String lock_value = UUID.randomUUID().toString(); try { boolean redisLock = this.redisLock.getRedisLock(key, lock_value);//获取锁 if (redisLock) { // 获取Redis数据库中的商品数量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 减库存 if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣减成功,剩余商品:" + realStock); } else { System.out.println("库存不足....."); } } } finally { redisLock.unRedisLock(key,lock_value); //释放锁 } return "end"; } }
可以看到失败的线程不会直接结束,而是会尝试重试,一直到重试结束时间,才会结束
实际上这个最终版依然存在3个问题
1、在finally流程中,由于是先判断在处理。如果判断条件结束后,获取到的结果为true。但是在执行del操作前,此时jvm在执行GC操作(为了保证GC操作获取GC roots根完全,会暂停java程序),导致程序暂停。在GC操作完成并恢复后,执行del操作时,当前被加锁的key是否仍然存在?
2、问题如图所示
以上がSpringboot が Redis を統合して過剰販売の問題を解決する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。