ホームページ >Java >&#&チュートリアル >Java が Redis を使用してメッセージ キューを実装する方法の具体的な分析

Java が Redis を使用してメッセージ キューを実装する方法の具体的な分析

黄舟
黄舟オリジナル
2017-07-24 15:39:242045ブラウズ

この記事では、Redis を使用してメッセージ キューを実装する Java のサンプル コードを主に紹介します。編集者が非常に優れていると考えたので、参考として共有します。エディターをフォローして見てみましょう

この記事では、Redis を使用してメッセージキューを実装する Java のサンプルコードを紹介し、それを皆さんと共有します。詳細は次のとおりです:

アプリケーションシナリオ

なぜ Redis を使用するのですか?

バイナリストレージ、Javaシリアル化送信、多数のIO接続、頻繁な接続

1.シリアル化

Javaシリアル化ツールがここに書かれており、主にオブジェクトをバイト配列に変換し、それらをバイト配列に逆シリアル化します。 . Java オブジェクト; ByteArrayOutputStream と ByteArrayInputStream が主に使用されます。 注: シリアル化する必要があるすべてのオブジェクトは、Serializable インターフェイスを実装する必要があります。

package Utils;
import java.io.*;
/**
 * Created by Kinglf on 2016/10/17.
 */
public class ObjectUtil {
 /**
  * 对象转byte[]
  * @param obj
  * @return
  * @throws IOException
  */
 public static byte[] object2Bytes(Object obj) throws IOException{
  ByteArrayOutputStream bo=new ByteArrayOutputStream();
  ObjectOutputStream oo=new ObjectOutputStream(bo);
  oo.writeObject(obj);
  byte[] bytes=bo.toByteArray();
  bo.close();
  oo.close();
  return bytes;
 }
 /**
  * byte[]转对象
  * @param bytes
  * @return
  * @throws Exception
  */
 public static Object bytes2Object(byte[] bytes) throws Exception{
  ByteArrayInputStream in=new ByteArrayInputStream(bytes);
  ObjectInputStream sIn=new ObjectInputStream(in);
  return sIn.readObject();
 }
}

2. メッセージ クラス (シリアル化可能なインターフェース)

package Model;

import java.io.Serializable;

/**
 * Created by Kinglf on 2016/10/17.
 */
public class Message implements Serializable {

 private static final long serialVersionUID = -389326121047047723L;
 private int id;
 private String content;
 public Message(int id, String content) {
  this.id = id;
  this.content = content;
 }
 public int getId() {
  return id;
 }
 public void setId(int id) {
  this.id = id;
 }
 public String getContent() {
  return content;
 }
 public void setContent(String content) {
  this.content = content;
 }
}

3. Redis の操作

Redis をキューとして使用し、Redis 内のリストのプッシュおよびポップ操作を使用します

の特性と組み合わせます。キュー:


新しい要素は、キューの一端とキューの最後にのみ挿入できます。FIFO: 先入れ先出し原則Redis では、lpush ヘッドイン (rpop) Redis では、リスト内のプッシュ オブジェクトまたはポップ オブジェクトを byte[] に変換するだけで済みます。
java では、Jedis を使用します。 Redis ストレージと Redis 接続プールの設定

上記のコード:只允许在一端插入新元素只能在队列的尾部FIFO:先进先出原则 Redis中lpush头入(rpop尾出)或rpush尾入(lpop头出)可以满足要求,而Redis中list药push或 pop的对象仅需要转换成byte[]即可

java采用Jedis进行Redis的存储和Redis的连接池设置

package Utils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
 * Created by Kinglf on 2016/10/17.
 */
public class JedisUtil {
 private static String JEDIS_IP;
 private static int JEDIS_PORT;
 private static String JEDIS_PASSWORD;
 private static JedisPool jedisPool;
 static {
  //Configuration自行写的配置文件解析类,继承自Properties
  Configuration conf=Configuration.getInstance();
  JEDIS_IP=conf.getString("jedis.ip","127.0.0.1");
  JEDIS_PORT=conf.getInt("jedis.port",6379);
  JEDIS_PASSWORD=conf.getString("jedis.password",null);
  JedisPoolConfig config=new JedisPoolConfig();
  config.setMaxActive(5000);
  config.setMaxIdle(256);
  config.setMaxWait(5000L);
  config.setTestOnBorrow(true);
  config.setTestOnReturn(true);
  config.setTestWhileIdle(true);
  config.setMinEvictableIdleTimeMillis(60000L);
  config.setTimeBetweenEvictionRunsMillis(3000L);
  config.setNumTestsPerEvictionRun(-1);
  jedisPool=new JedisPool(config,JEDIS_IP,JEDIS_PORT,60000);
 }
 /**
  * 获取数据
  * @param key
  * @return
  */
 public static String get(String key){
  String value=null;
  Jedis jedis=null;
  try{
   jedis=jedisPool.getResource();
   value=jedis.get(key);
  }catch (Exception e){
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();
  }finally {
   close(jedis);
  }
  return value;
 }

 private static void close(Jedis jedis) {
  try{
   jedisPool.returnResource(jedis);
  }catch (Exception e){
   if(jedis.isConnected()){
    jedis.quit();
    jedis.disconnect();
   }
  }
 }
 public static byte[] get(byte[] key){
  byte[] value = null;
  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   value = jedis.get(key);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();
  } finally {
   //返还到连接池
   close(jedis);
  }

  return value;
 }

 public static void set(byte[] key, byte[] value) {

  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   jedis.set(key, value);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();
  } finally {
   //返还到连接池
   close(jedis);
  }
 }

 public static void set(byte[] key, byte[] value, int time) {

  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   jedis.set(key, value);
   jedis.expire(key, time);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();
  } finally {
   //返还到连接池
   close(jedis);
  }
 }

 public static void hset(byte[] key, byte[] field, byte[] value) {
  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   jedis.hset(key, field, value);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();
  } finally {
   //返还到连接池
   close(jedis);
  }
 }

 public static void hset(String key, String field, String value) {
  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   jedis.hset(key, field, value);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();
  } finally {
   //返还到连接池
   close(jedis);
  }
 }

 /**
  * 获取数据
  *
  * @param key
  * @return
  */
 public static String hget(String key, String field) {

  String value = null;
  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   value = jedis.hget(key, field);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();
  } finally {
   //返还到连接池
   close(jedis);
  }

  return value;
 }
 /**
  * 获取数据
  *
  * @param key
  * @return
  */
 public static byte[] hget(byte[] key, byte[] field) {

  byte[] value = null;
  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   value = jedis.hget(key, field);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();
  } finally {
   //返还到连接池
   close(jedis);
  }

  return value;
 }
 public static void hdel(byte[] key, byte[] field) {

  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   jedis.hdel(key, field);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();
  } finally {
   //返还到连接池
   close(jedis);
  }
 }
 /**
  * 存储REDIS队列 顺序存储
  * @param key reids键名
  * @param value 键值
  */
 public static void lpush(byte[] key, byte[] value) {

  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   jedis.lpush(key, value);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();
  } finally {
   //返还到连接池
   close(jedis);
  }
 }

 /**
  * 存储REDIS队列 反向存储
  * @param key reids键名
  * @param value 键值
  */
 public static void rpush(byte[] key, byte[] value) {

  Jedis jedis = null;
  try {

   jedis = jedisPool.getResource();
   jedis.rpush(key, value);

  } catch (Exception e) {

   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();

  } finally {

   //返还到连接池
   close(jedis);

  }
 }

 /**
  * 将列表 source 中的最后一个元素(尾元素)弹出,并返回给客户端
  * @param key reids键名
  * @param destination 键值
  */
 public static void rpoplpush(byte[] key, byte[] destination) {

  Jedis jedis = null;
  try {

   jedis = jedisPool.getResource();
   jedis.rpoplpush(key, destination);

  } catch (Exception e) {

   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();

  } finally {

   //返还到连接池
   close(jedis);

  }
 }

 /**
  * 获取队列数据
  * @param key 键名
  * @return
  */
 public static List lpopList(byte[] key) {

  List list = null;
  Jedis jedis = null;
  try {

   jedis = jedisPool.getResource();
   list = jedis.lrange(key, 0, -1);

  } catch (Exception e) {

   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();

  } finally {

   //返还到连接池
   close(jedis);

  }
  return list;
 }
 /**
  * 获取队列数据
  * @param key 键名
  * @return
  */
 public static byte[] rpop(byte[] key) {

  byte[] bytes = null;
  Jedis jedis = null;
  try {

   jedis = jedisPool.getResource();
   bytes = jedis.rpop(key);

  } catch (Exception e) {

   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();

  } finally {

   //返还到连接池
   close(jedis);

  }
  return bytes;
 }
 public static void hmset(Object key, Map hash) {
  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   jedis.hmset(key.toString(), hash);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();

  } finally {
   //返还到连接池
   close(jedis);

  }
 }
 public static void hmset(Object key, Map hash, int time) {
  Jedis jedis = null;
  try {

   jedis = jedisPool.getResource();
   jedis.hmset(key.toString(), hash);
   jedis.expire(key.toString(), time);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();

  } finally {
   //返还到连接池
   close(jedis);

  }
 }
 public static List hmget(Object key, String... fields) {
  List result = null;
  Jedis jedis = null;
  try {

   jedis = jedisPool.getResource();
   result = jedis.hmget(key.toString(), fields);

  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();

  } finally {
   //返还到连接池
   close(jedis);

  }
  return result;
 }

 public static Set hkeys(String key) {
  Set result = null;
  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   result = jedis.hkeys(key);

  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();

  } finally {
   //返还到连接池
   close(jedis);

  }
  return result;
 }
 public static List lrange(byte[] key, int from, int to) {
  List result = null;
  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   result = jedis.lrange(key, from, to);

  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();

  } finally {
   //返还到连接池
   close(jedis);

  }
  return result;
 }
 public static Map hgetAll(byte[] key) {
  Map result = null;
  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   result = jedis.hgetAll(key);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();

  } finally {
   //返还到连接池
   close(jedis);
  }
  return result;
 }

 public static void del(byte[] key) {

  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   jedis.del(key);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();
  } finally {
   //返还到连接池
   close(jedis);
  }
 }

 public static long llen(byte[] key) {

  long len = 0;
  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   jedis.llen(key);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();
  } finally {
   //返还到连接池
   close(jedis);
  }
  return len;
 }
}


4. 構成は主に Redis 構成情報を読み取るために使用されます

package Utils;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/**
 * Created by Kinglf on 2016/10/17.
 */
public class Configuration extends Properties {

 private static final long serialVersionUID = -2296275030489943706L;
 private static Configuration instance = null;

 public static synchronized Configuration getInstance() {
  if (instance == null) {
   instance = new Configuration();
  }
  return instance;
 }


 public String getProperty(String key, String defaultValue) {
  String val = getProperty(key);
  return (val == null || val.isEmpty()) ? defaultValue : val;
 }

 public String getString(String name, String defaultValue) {
  return this.getProperty(name, defaultValue);
 }

 public int getInt(String name, int defaultValue) {
  String val = this.getProperty(name);
  return (val == null || val.isEmpty()) ? defaultValue : Integer.parseInt(val);
 }

 public long getLong(String name, long defaultValue) {
  String val = this.getProperty(name);
  return (val == null || val.isEmpty()) ? defaultValue : Integer.parseInt(val);
 }

 public float getFloat(String name, float defaultValue) {
  String val = this.getProperty(name);
  return (val == null || val.isEmpty()) ? defaultValue : Float.parseFloat(val);
 }

 public double getDouble(String name, double defaultValue) {
  String val = this.getProperty(name);
  return (val == null || val.isEmpty()) ? defaultValue : Double.parseDouble(val);
 }

 public byte getByte(String name, byte defaultValue) {
  String val = this.getProperty(name);
  return (val == null || val.isEmpty()) ? defaultValue : Byte.parseByte(val);
 }

 public Configuration() {
  InputStream in = ClassLoader.getSystemClassLoader().getResourceAsStream("config.xml");
  try {
   this.loadFromXML(in);
   in.close();
  } catch (IOException ioe) {

  }
 }
}


5。

import Model.Message;
import Utils.JedisUtil;
import Utils.ObjectUtil;
import redis.clients.jedis.Jedis;

import java.io.IOException;

/**
 * Created by Kinglf on 2016/10/17.
 */
public class TestRedisQueue {
 public static byte[] redisKey = "key".getBytes();
 static {
  try {
   init();
  } catch (IOException e) {
   e.printStackTrace();
  }
 }

 private static void init() throws IOException {
  for (int i = 0; i < 1000000; i++) {
   Message message = new Message(i, "这是第" + i + "个内容");
   JedisUtil.lpush(redisKey, ObjectUtil.object2Bytes(message));
  }

 }

 public static void main(String[] args) {
  try {
   pop();
  } catch (Exception e) {
   e.printStackTrace();
  }
 }

 private static void pop() throws Exception {
  byte[] bytes = JedisUtil.rpop(redisKey);
  Message msg = (Message) ObjectUtil.bytes2Object(bytes);
  if (msg != null) {
   System.out.println(msg.getId() + "----" + msg.getContent());
  }
 }
}

每执行一次pop()方法,结果如下:
<br>1----这是第1个内容
<br>2----这是第2个内容
<br>3----这是第3个内容
<br>4----这是第4个内容

概要

以上で、Redis メッセージキュー全体のプロデューサーコードとコンシューマーコードが完成しました1.送信する必要があるメッセージエンティティクラス(送信する必要があります) Serializableインターフェイスを実装します)


2.Propertiesから継承したConfiguration Redis構成読み取りクラス


3.ObjectUtilは、オブジェクトとバイト配列を両方向に変換するツールクラスです


4.Jedisは、を使用するツールクラスですメッセージキューの先入れ先出し (FIFO) 特性を利用し、Redis リストでプッシュ操作とポップ操作を組み合わせます

以上がJava が Redis を使用してメッセージ キューを実装する方法の具体的な分析の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。