次のコラム Redis チュートリアル では、Redis とキューについて詳しく説明します。困っている友人の役に立てば幸いです。
Redis はキャッシュ サーバーとしてだけでなく、メッセージ キューとしても使用できます。そのリスト タイプは本質的にメッセージ キューとしての使用をサポートしています。以下の図に示すように:
Redis リストは二重リンク リストを使用して実装され、先頭ノードと末尾ノードが保存されるため、要素を次の位置に挿入するのが非常に高速です。リストの先頭と末尾。
したがって、lpush と rpop、または rpush と lpop という 2 つの簡単な命令だけで、Redis のリストを直接使用してメッセージ キューを実装できます。簡単な例は次のとおりです。
メッセージの終了 (メッセージ プロデューサー) を格納します:
package org.yamikaze.redis.messsage.queue; import org.yamikaze.redis.test.MyJedisFactory;import redis.clients.jedis.Jedis; import java.util.concurrent.TimeUnit; /** * 消息生产者 * @author yamikaze */public class Producer extends Thread { public static final String MESSAGE_KEY = "message:queue"; private Jedis jedis; private String producerName; private volatile int count; public Producer(String name) { this.producerName = name; init(); } private void init() { jedis = MyJedisFactory.getLocalJedis(); } public void putMessage(String message) { Long size = jedis.lpush(MESSAGE_KEY, message); System.out.println(producerName + ": 当前未被处理消息条数为:" + size); count++; } public int getCount() { return count; } @Override public void run() { try { while (true) { putMessage(StringUtils.generate32Str()); TimeUnit.SECONDS.sleep(1); } } catch (InterruptedException e) { } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws InterruptedException{ Producer producer = new Producer("myProducer"); producer.start(); for(; ;) { System.out.println("main : 已存储消息条数:" + producer.getCount()); TimeUnit.SECONDS.sleep(10); } } }
メッセージの処理の終了 (メッセージ コンシューマ):
package org.yamikaze.redis.messsage.queue; import org.yamikaze.redis.test.MyJedisFactory;import redis.clients.jedis.Jedis; /** * 消息消费者 * @author yamikaze */public class Customer extends Thread{ private String customerName; private volatile int count; private Jedis jedis; public Customer(String name) { this.customerName = name; init(); } private void init() { jedis = MyJedisFactory.getLocalJedis(); } public void processMessage() { String message = jedis.rpop(Producer.MESSAGE_KEY); if(message != null) { count++; handle(message); } } public void handle(String message) { System.out.println(customerName + " 正在处理消息,消息内容是: " + message + " 这是第" + count + "条"); } @Override public void run() { while (true) { processMessage(); } } public static void main(String[] args) { Customer customer = new Customer("yamikaze"); customer.start(); } }
はかなり良いように思えますが、上記の例では、メッセージ コンシューマに問題があります。つまり、リスト内に保留中のメッセージがあるかどうかを確認するために rpop メソッドを常に呼び出す必要があります。呼び出しごとに接続が開始されるため、不必要な無駄が発生します。おそらく、Thread.sleep() やその他のメソッドを使用して、一定期間後にコンシューマー スレッドが再び消費できるようにすることになるでしょうが、これには 2 つの問題があります。
1) プロデューサの速度がコンシューマの消費速度、メッセージ キューの長さは増加し続け、時間の経過とともに多くのメモリ領域を占有します。
2) スリープ時間が長すぎると、一部の時間に敏感なメッセージを処理できなくなります。スリープ時間が短すぎると、接続に比較的大きなオーバーヘッドが発生します。
したがって、brpop 命令を使用できます。この命令は、要素がある場合にのみ返されます。要素がない場合は、タイムアウトになるまでブロックされ、null が返されます。そのため、コンシューマは processMessage を次のように変更できます:
public void processMessage() { /** * brpop支持多个列表(队列) * brpop指令是支持队列优先级的,比如这个例子中MESSAGE_KEY的优先级大于testKey(顺序决定)。 * 如果两个列表中都有元素,会优先返回优先级高的列表中的元素,所以这儿优先返回MESSAGE_KEY * 0表示不限制等待,会一直阻塞在这儿 */ List<String> messages = jedis.brpop(0, Producer.MESSAGE_KEY, "testKey"); if(messages.size() != 0) { //由于该指令可以监听多个Key,所以返回的是一个列表 //列表由2项组成,1) 列表名,2)数据 String keyName = messages.get(0); //如果返回的是MESSAGE_KEY的消息 if(Producer.MESSAGE_KEY.equals(keyName)) { String message = messages.get(1); handle(message); } } System.out.println("======================="); }
その後、Customer を実行してコンソールをクリアすると、プログラムには出力がなく、ここ brpop でブロックされていることがわかります。次に、Redis クライアントを開き、コマンド client list を入力して、現在 2 つの接続があることを確認します。
メッセージ キューのサポートに加えて、Redis はパブリッシュ/サブスクライブ モードをサポートする一連のコマンドも提供します。 Redis のパブリッシュ/サブスクライブ モードを使用すると、1 回生成して複数回消費するキューを実装できます。
1) Publish
PUBLISH 命令を使用すると、PUBLISH チャネル メッセージ形式でメッセージを公開できます。
戻り値は、メッセージの購読者数を示します。
2) サブスクリプション
SUBSCRIBE 命令は、SUBSCRIBE チャネル形式のメッセージを受信するために使用されます
SUBSCRIBE 命令を使用した後、サブスクリプション モードに入ったが、メッセージを受信していないことがわかります。パブリッシュによって送信されたメッセージ。これは、サブスクリプションがメッセージを送信する前にのみ受信するためです。このモードの他のコマンドについては、応答のみが表示されます。応答には 3 つのタイプがあります:
1. サブスクライブの場合、2 番目の値はサブスクライブされたチャネルを示し、3 番目の値はどのチャネルがサブスクライブされているかを示します (シリアル番号として理解されますか?)
2. Ifはメッセージ、2 番目の値はメッセージを生成したチャネル、3 番目の値はメッセージ
3 です。 unsubscribe の場合、2 番目の値は未購読のチャネルを表し、3 番目の値は現在のクライアントのサブスクリプション数を表します。
コマンド UNSUBSCRIBE を使用して購読を解除できます。パラメータを追加しない場合、SUBSCRIBE コマンドによって購読されたすべてのチャネルが購読解除されます。
Redis は、ワイルドカードに基づくメッセージ サブスクリプションもサポートしています。コマンド PSUBSCRIBE (パターン サブスクライブ) を使用します。例:
メッセージを再度プッシュしようとすると、次の結果が得られます:
パブリッシュ コマンドが 2 を返し、サブスクライバがメッセージを 2 回受信したことがわかります。これは、PSUBSCRIBE コマンドが繰り返しチャネルにサブスクライブできるためです。 PSUBSCRIBE コマンドを使用してサブスクライブされたチャネルは、PUNSUBSCRIBE コマンドを使用してサブスクライブ解除する必要もあります。このコマンドは、SUBSCRIBE によってサブスクライブされたチャネルからはサブスクライブ解除できません。同様に、UNSUBSCRIBE は、PSUBSCRIBE コマンドによってサブスクライブされたチャネルからサブスクライブ解除できません。同時に、PUNSUBSCRIBE 命令のワイルドカードは展開されません。
例: PUNSUBSCRIBE * は、channel.* と一致しないため、channel.* の購読を解除するには、次のように PUBSUBSCRIBE channel.* を記述する必要があります。
コードのデモは次のとおりです:
package org.yamikaze.redis.messsage.subscribe; import org.yamikaze.redis.messsage.queue.StringUtils;import org.yamikaze.redis.test.MyJedisFactory;import redis.clients.jedis.Jedis; /** * 消息发布方 * @author yamikaze */public class Publisher { public static final String CHANNEL_KEY = "channel:message"; private Jedis jedis; public Publisher() { jedis = MyJedisFactory.getLocalJedis(); } public void publishMessage(String message) { if(StringUtils.isBlank(message)) { return; } jedis.publish(CHANNEL_KEY, message); } public static void main(String[] args) { Publisher publisher = new Publisher(); publisher.publishMessage("Hello Redis!"); } }
メッセージを送信するだけです。
メッセージ サブスクライバー:
package org.yamikaze.redis.messsage.subscribe; import org.yamikaze.redis.test.MyJedisFactory;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPubSub; import java.util.concurrent.TimeUnit; /** * 消息订阅方客户端 * @author yamikaze */public class SubscribeClient { private Jedis jedis; private static final String EXIT_COMMAND = "exit"; public SubscribeClient() { jedis = MyJedisFactory.getLocalJedis(); } public void subscribe(String ...channel) { if(channel == null || channel.length <= 0) { return; } //消息处理,接收到消息时如何处理 JedisPubSub jps = new JedisPubSub() { /** * JedisPubSub类是一个没有抽象方法的抽象类,里面方法都是一些空实现 * 所以可以选择需要的方法覆盖,这儿使用的是SUBSCRIBE指令,所以覆盖了onMessage * 如果使用PSUBSCRIBE指令,则覆盖onPMessage方法 * 当然也可以选择BinaryJedisPubSub,同样是抽象类,但方法参数为byte[] */ @Override public void onMessage(String channel, String message) { if(Publisher.CHANNEL_KEY.equals(channel)) { System.out.println("接收到消息: channel : " + message); //接收到exit消息后退出 if(EXIT_COMMAND.equals(message)) { System.exit(0); } } } /** * 订阅时 */ @Override public void onSubscribe(String channel, int subscribedChannels) { if(Publisher.CHANNEL_KEY.equals(channel)) { System.out.println("订阅了频道:" + channel); } } }; //可以订阅多个频道 当前线程会阻塞在这儿 jedis.subscribe(jps, channel); } public static void main(String[] args) { SubscribeClient client = new SubscribeClient(); client.subscribe(Publisher.CHANNEL_KEY); //并没有 unsubscribe方法 //相应的也没有punsubscribe方法 } }
まずクライアントを実行し、次にパブリッシャーを実行してメッセージを送信します。出力結果:
# #Redis Pub/sub には、コンシューマーがオフラインになるとプロデューサーのメッセージが失われるという欠点もあります。
遅延キューバックグラウンドビジネス開発のプロセスでは、次のような遅延処理が必要なシナリオがいくつかあります。a.订单下单之后超过30分钟用户未支付,需要取消订单
b.订单一些评论,如果48h用户未对商家评论,系统会自动产生一条默认评论
c.点我达订单下单后,超过一定时间订单未派出,需要超时取消订单等。。。
处理这类需求,比较直接简单的方式就是定时任务轮训扫表。这种处理方式在数据量不大的场景下是完全没问题,但是当数据量大的时候高频的轮训数据库就会比较的耗资源,导致数据库的慢查或者查询超时。所以在处理这类需求时候,采用了延时队列来完成。
延时队列就是一种带有延迟功能的消息队列。下面会介绍几种目前已有的延时队列:
优点:JDK自身实现,使用方便,量小适用
缺点:队列消息处于jvm内存,不支持分布式运行和消息持久化
优点:消息持久化,分布式
缺点:不支持任意时间精度,只支持特定level的延时消息
优点:消息持久化,分布式
缺点:延时相同的消息必须扔在同一个队列
Redis由于其自身的Zset数据结构,本质就是Set结构上加了个排序的功能,除了添加数据value之外,还提供另一属性score,这一属性在添加修改元素时候可以指定,每次指定后,Zset会自动重新按新的值调整顺序。可以理解为有两列字段的数据表,一列存value,一列存顺序编号。操作中key理解为zset的名字,那么对延时队列又有何用呢?
试想如果score代表的是想要执行时间的时间戳,在某个时间将它插入Zset集合中,它变会按照时间戳大小进行排序,也就是对执行时间前后进行排序,这样的话,起一个死循环线程不断地进行取第一个key值,如果当前时间戳大于等于该key值的socre就将它取出来进行消费删除,就可以达到延时执行的目的, 注意不需要遍历整个Zset集合,以免造成性能浪费。
Zset的排列效果如下图:
java代码实现如下:
package cn.chinotan.service.delayQueueRedis;import org.apache.commons.lang3.StringUtils;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPool;import redis.clients.jedis.Tuple;import java.text.SimpleDateFormat;import java.util.Calendar;import java.util.Date;import java.util.Set;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;/** * @program: test * @description: redis实现延时队列 * @author: xingcheng * @create: 2018-08-19 **/public class AppTest { private static final String ADDR = "127.0.0.1"; private static final int PORT = 6379; private static JedisPool jedisPool = new JedisPool(ADDR, PORT); private static CountDownLatch cdl = new CountDownLatch(10); public static Jedis getJedis() { return jedisPool.getResource(); } /** * 生产者,生成5个订单 */ public void productionDelayMessage() { for (int i = 0; i < 5; i++) { Calendar instance = Calendar.getInstance(); // 3秒后执行 instance.add(Calendar.SECOND, 3 + i); AppTest.getJedis().zadd("orderId", (instance.getTimeInMillis()) / 1000, StringUtils.join("000000000", i + 1)); System.out.println("生产订单: " + StringUtils.join("000000000", i + 1) + " 当前时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); System.out.println((3 + i) + "秒后执行"); } } //消费者,取订单 public static void consumerDelayMessage() { Jedis jedis = AppTest.getJedis(); while (true) { Set<Tuple> order = jedis.zrangeWithScores("orderId", 0, 0); if (order == null || order.isEmpty()) { System.out.println("当前没有等待的任务"); try { TimeUnit.MICROSECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } continue; } Tuple tuple = (Tuple) order.toArray()[0]; double score = tuple.getScore(); Calendar instance = Calendar.getInstance(); long nowTime = instance.getTimeInMillis() / 1000; if (nowTime >= score) { String element = tuple.getElement(); Long orderId = jedis.zrem("orderId", element); if (orderId > 0) { System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ":redis消费了一个任务:消费的订单OrderId为" + element); } } } } static class DelayMessage implements Runnable{ @Override public void run() { try { cdl.await(); consumerDelayMessage(); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { AppTest appTest = new AppTest(); appTest.productionDelayMessage(); for (int i = 0; i < 10; i++) { new Thread(new DelayMessage()).start(); cdl.countDown(); } } }
实现效果如下:
以上がRedisとキューの詳しい説明の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。