>  기사  >  Java  >  Java가 Redis를 사용하여 메시지 대기열을 구현하는 방법에 대한 구체적인 분석

Java가 Redis를 사용하여 메시지 대기열을 구현하는 방법에 대한 구체적인 분석

黄舟
黄舟원래의
2017-07-24 15:39:242007검색

이 글에서는 주로 Redis를 사용하여 메시지 큐를 구현하는 Java의 샘플 코드를 소개합니다. 편집자는 꽤 좋다고 생각하므로 지금 공유하고 참고용으로 제공하겠습니다. 편집기를 따라 살펴보겠습니다

이 글에서는 Redis를 사용하여 메시지 큐를 구현하는 Java의 샘플 코드를 소개하고 모든 사람과 공유합니다.

바이너리 저장, Java 직렬화 전송, 많은 IO 연결 수, 빈번한 연결

1. 직렬화

여기에는 주로 객체를 바이트 배열로 변환하고 바이트 배열로 역직렬화하는 Java 직렬화 도구가 작성되어 있습니다. .java 객체; ByteArrayOutputStream 및 ByteArrayInputStream이 주로 사용됩니다. 참고: 직렬화해야 하는 모든 객체는 직렬화 가능 인터페이스를 구현해야 합니다.

코드는 다음과 같습니다.

package Utils;
import java.io.*;
/**
 * Created by Kinglf on 2016/10/17.
 */
public class ObjectUtil {
 /**
  * 对象转byte[]
  * @param obj
  * @return
  * @throws IOException
  */
 public static byte[] object2Bytes(Object obj) throws IOException{
  ByteArrayOutputStream bo=new ByteArrayOutputStream();
  ObjectOutputStream oo=new ObjectOutputStream(bo);
  oo.writeObject(obj);
  byte[] bytes=bo.toByteArray();
  bo.close();
  oo.close();
  return bytes;
 }
 /**
  * byte[]转对象
  * @param bytes
  * @return
  * @throws Exception
  */
 public static Object bytes2Object(byte[] bytes) throws Exception{
  ByteArrayInputStream in=new ByteArrayInputStream(bytes);
  ObjectInputStream sIn=new ObjectInputStream(in);
  return sIn.readObject();
 }
}

직렬화 가능한 인터페이스)


package Model;

import java.io.Serializable;

/**
 * Created by Kinglf on 2016/10/17.
 */
public class Message implements Serializable {

 private static final long serialVersionUID = -389326121047047723L;
 private int id;
 private String content;
 public Message(int id, String content) {
  this.id = id;
  this.content = content;
 }
 public int getId() {
  return id;
 }
 public void setId(int id) {
  this.id = id;
 }
 public String getContent() {
  return content;
 }
 public void setContent(String content) {
  this.content = content;
 }
}

3. Redis 작업


redis를 대기열로 사용하여 redis에서 목록의 push 및 pop 작업을 사용합니다. 대기열:

새 요소는 한쪽 끝과 대기열 끝에서만 삽입할 수 있습니다. FIFO: 선입선출 원칙 Redis에서는 lpush 헤드인(rpop tail-out) 또는 rpush tail-in(lpop head-out)은 요구 사항을 충족할 수 있습니다. Redis 목록의 push 또는 pop 개체는 byte[]

Java는 Redis 저장소에 Jedis를 사용합니다. 및 Redis 연결 풀 설정

위 코드:


package Utils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
 * Created by Kinglf on 2016/10/17.
 */
public class JedisUtil {
 private static String JEDIS_IP;
 private static int JEDIS_PORT;
 private static String JEDIS_PASSWORD;
 private static JedisPool jedisPool;
 static {
  //Configuration自行写的配置文件解析类,继承自Properties
  Configuration conf=Configuration.getInstance();
  JEDIS_IP=conf.getString("jedis.ip","127.0.0.1");
  JEDIS_PORT=conf.getInt("jedis.port",6379);
  JEDIS_PASSWORD=conf.getString("jedis.password",null);
  JedisPoolConfig config=new JedisPoolConfig();
  config.setMaxActive(5000);
  config.setMaxIdle(256);
  config.setMaxWait(5000L);
  config.setTestOnBorrow(true);
  config.setTestOnReturn(true);
  config.setTestWhileIdle(true);
  config.setMinEvictableIdleTimeMillis(60000L);
  config.setTimeBetweenEvictionRunsMillis(3000L);
  config.setNumTestsPerEvictionRun(-1);
  jedisPool=new JedisPool(config,JEDIS_IP,JEDIS_PORT,60000);
 }
 /**
  * 获取数据
  * @param key
  * @return
  */
 public static String get(String key){
  String value=null;
  Jedis jedis=null;
  try{
   jedis=jedisPool.getResource();
   value=jedis.get(key);
  }catch (Exception e){
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();
  }finally {
   close(jedis);
  }
  return value;
 }

 private static void close(Jedis jedis) {
  try{
   jedisPool.returnResource(jedis);
  }catch (Exception e){
   if(jedis.isConnected()){
    jedis.quit();
    jedis.disconnect();
   }
  }
 }
 public static byte[] get(byte[] key){
  byte[] value = null;
  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   value = jedis.get(key);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();
  } finally {
   //返还到连接池
   close(jedis);
  }

  return value;
 }

 public static void set(byte[] key, byte[] value) {

  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   jedis.set(key, value);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();
  } finally {
   //返还到连接池
   close(jedis);
  }
 }

 public static void set(byte[] key, byte[] value, int time) {

  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   jedis.set(key, value);
   jedis.expire(key, time);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();
  } finally {
   //返还到连接池
   close(jedis);
  }
 }

 public static void hset(byte[] key, byte[] field, byte[] value) {
  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   jedis.hset(key, field, value);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();
  } finally {
   //返还到连接池
   close(jedis);
  }
 }

 public static void hset(String key, String field, String value) {
  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   jedis.hset(key, field, value);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();
  } finally {
   //返还到连接池
   close(jedis);
  }
 }

 /**
  * 获取数据
  *
  * @param key
  * @return
  */
 public static String hget(String key, String field) {

  String value = null;
  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   value = jedis.hget(key, field);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();
  } finally {
   //返还到连接池
   close(jedis);
  }

  return value;
 }
 /**
  * 获取数据
  *
  * @param key
  * @return
  */
 public static byte[] hget(byte[] key, byte[] field) {

  byte[] value = null;
  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   value = jedis.hget(key, field);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();
  } finally {
   //返还到连接池
   close(jedis);
  }

  return value;
 }
 public static void hdel(byte[] key, byte[] field) {

  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   jedis.hdel(key, field);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();
  } finally {
   //返还到连接池
   close(jedis);
  }
 }
 /**
  * 存储REDIS队列 顺序存储
  * @param key reids键名
  * @param value 键值
  */
 public static void lpush(byte[] key, byte[] value) {

  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   jedis.lpush(key, value);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();
  } finally {
   //返还到连接池
   close(jedis);
  }
 }

 /**
  * 存储REDIS队列 反向存储
  * @param key reids键名
  * @param value 键值
  */
 public static void rpush(byte[] key, byte[] value) {

  Jedis jedis = null;
  try {

   jedis = jedisPool.getResource();
   jedis.rpush(key, value);

  } catch (Exception e) {

   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();

  } finally {

   //返还到连接池
   close(jedis);

  }
 }

 /**
  * 将列表 source 中的最后一个元素(尾元素)弹出,并返回给客户端
  * @param key reids键名
  * @param destination 键值
  */
 public static void rpoplpush(byte[] key, byte[] destination) {

  Jedis jedis = null;
  try {

   jedis = jedisPool.getResource();
   jedis.rpoplpush(key, destination);

  } catch (Exception e) {

   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();

  } finally {

   //返还到连接池
   close(jedis);

  }
 }

 /**
  * 获取队列数据
  * @param key 键名
  * @return
  */
 public static List lpopList(byte[] key) {

  List list = null;
  Jedis jedis = null;
  try {

   jedis = jedisPool.getResource();
   list = jedis.lrange(key, 0, -1);

  } catch (Exception e) {

   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();

  } finally {

   //返还到连接池
   close(jedis);

  }
  return list;
 }
 /**
  * 获取队列数据
  * @param key 键名
  * @return
  */
 public static byte[] rpop(byte[] key) {

  byte[] bytes = null;
  Jedis jedis = null;
  try {

   jedis = jedisPool.getResource();
   bytes = jedis.rpop(key);

  } catch (Exception e) {

   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();

  } finally {

   //返还到连接池
   close(jedis);

  }
  return bytes;
 }
 public static void hmset(Object key, Map hash) {
  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   jedis.hmset(key.toString(), hash);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();

  } finally {
   //返还到连接池
   close(jedis);

  }
 }
 public static void hmset(Object key, Map hash, int time) {
  Jedis jedis = null;
  try {

   jedis = jedisPool.getResource();
   jedis.hmset(key.toString(), hash);
   jedis.expire(key.toString(), time);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();

  } finally {
   //返还到连接池
   close(jedis);

  }
 }
 public static List hmget(Object key, String... fields) {
  List result = null;
  Jedis jedis = null;
  try {

   jedis = jedisPool.getResource();
   result = jedis.hmget(key.toString(), fields);

  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();

  } finally {
   //返还到连接池
   close(jedis);

  }
  return result;
 }

 public static Set hkeys(String key) {
  Set result = null;
  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   result = jedis.hkeys(key);

  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();

  } finally {
   //返还到连接池
   close(jedis);

  }
  return result;
 }
 public static List lrange(byte[] key, int from, int to) {
  List result = null;
  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   result = jedis.lrange(key, from, to);

  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();

  } finally {
   //返还到连接池
   close(jedis);

  }
  return result;
 }
 public static Map hgetAll(byte[] key) {
  Map result = null;
  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   result = jedis.hgetAll(key);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();

  } finally {
   //返还到连接池
   close(jedis);
  }
  return result;
 }

 public static void del(byte[] key) {

  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   jedis.del(key);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();
  } finally {
   //返还到连接池
   close(jedis);
  }
 }

 public static long llen(byte[] key) {

  long len = 0;
  Jedis jedis = null;
  try {
   jedis = jedisPool.getResource();
   jedis.llen(key);
  } catch (Exception e) {
   //释放redis对象
   jedisPool.returnBrokenResource(jedis);
   e.printStackTrace();
  } finally {
   //返还到连接池
   close(jedis);
  }
  return len;
 }
}

只允许在一端插入新元素只能在队列的尾部FIFO:先进先出原则 Redis中lpush头入(rpop尾出)或rpush尾入(lpop头出)可以满足要求,而Redis中list药push或 pop的对象仅需要转换成byte[]即可

java采用Jedis进行Redis的存储和Redis的连接池设置 4. 구성은 주로 Redis 구성 정보를 읽는 데 사용됩니다.


package Utils;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/**
 * Created by Kinglf on 2016/10/17.
 */
public class Configuration extends Properties {

 private static final long serialVersionUID = -2296275030489943706L;
 private static Configuration instance = null;

 public static synchronized Configuration getInstance() {
  if (instance == null) {
   instance = new Configuration();
  }
  return instance;
 }


 public String getProperty(String key, String defaultValue) {
  String val = getProperty(key);
  return (val == null || val.isEmpty()) ? defaultValue : val;
 }

 public String getString(String name, String defaultValue) {
  return this.getProperty(name, defaultValue);
 }

 public int getInt(String name, int defaultValue) {
  String val = this.getProperty(name);
  return (val == null || val.isEmpty()) ? defaultValue : Integer.parseInt(val);
 }

 public long getLong(String name, long defaultValue) {
  String val = this.getProperty(name);
  return (val == null || val.isEmpty()) ? defaultValue : Integer.parseInt(val);
 }

 public float getFloat(String name, float defaultValue) {
  String val = this.getProperty(name);
  return (val == null || val.isEmpty()) ? defaultValue : Float.parseFloat(val);
 }

 public double getDouble(String name, double defaultValue) {
  String val = this.getProperty(name);
  return (val == null || val.isEmpty()) ? defaultValue : Double.parseDouble(val);
 }

 public byte getByte(String name, byte defaultValue) {
  String val = this.getProperty(name);
  return (val == null || val.isEmpty()) ? defaultValue : Byte.parseByte(val);
 }

 public Configuration() {
  InputStream in = ClassLoader.getSystemClassLoader().getResourceAsStream("config.xml");
  try {
   this.loadFromXML(in);
   in.close();
  } catch (IOException ioe) {

  }
 }
}

5. 테스트


import Model.Message;
import Utils.JedisUtil;
import Utils.ObjectUtil;
import redis.clients.jedis.Jedis;

import java.io.IOException;

/**
 * Created by Kinglf on 2016/10/17.
 */
public class TestRedisQueue {
 public static byte[] redisKey = "key".getBytes();
 static {
  try {
   init();
  } catch (IOException e) {
   e.printStackTrace();
  }
 }

 private static void init() throws IOException {
  for (int i = 0; i < 1000000; i++) {
   Message message = new Message(i, "这是第" + i + "个内容");
   JedisUtil.lpush(redisKey, ObjectUtil.object2Bytes(message));
  }

 }

 public static void main(String[] args) {
  try {
   pop();
  } catch (Exception e) {
   e.printStackTrace();
  }
 }

 private static void pop() throws Exception {
  byte[] bytes = JedisUtil.rpop(redisKey);
  Message msg = (Message) ObjectUtil.bytes2Object(bytes);
  if (msg != null) {
   System.out.println(msg.getId() + "----" + msg.getContent());
  }
 }
}

每执行一次pop()方法,结果如下:
<br>1----这是第1个内容
<br>2----这是第2个内容
<br>3----这是第3个内容
<br>4----这是第4个内容

Summary


이제 전체 Redis 메시지 큐의 생산자 및 소비자 코드가 완성되었습니다

1.전송해야 할 메시지 엔터티 클래스(구현해야 함) 직렬화 가능한 인터페이스)2.Configuration Redis 구성 읽기 클래스, Properties에서 상속됨

3.ObjectUtil은 객체와 바이트 배열을 양방향으로 변환하는 도구 클래스

4.Jedis는 첫 번째를 사용하는 도구 클래스 메시지 대기열의 FIFO(입선출) 특성을 가지며 Redis 목록에서 푸시 및 팝 작업을 결합합니다

위 내용은 Java가 Redis를 사용하여 메시지 대기열을 구현하는 방법에 대한 구체적인 분석의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.