A detailed look at how Java uses Redis to implement a message queue
This article walks through sample Java code that uses Redis to implement a message queue. It is shared here as a reference.
Application scenarios
Why use Redis?
Redis stores values as binary data, so Java objects can be transmitted in serialized form, and it copes with a high number of I/O connections and frequent connects.
1. Serialization
A small Java serialization utility is written here. It converts objects into byte arrays and deserializes byte arrays back into Java objects, mainly using ByteArrayOutputStream and ByteArrayInputStream. Note: every object that needs to be serialized must implement the Serializable interface.
The code is as follows:
package Utils;

import java.io.*;

/**
 * Created by Kinglf on 2016/10/17.
 */
public class ObjectUtil {
    /**
     * Converts an object to a byte[].
     *
     * @param obj the object to serialize; it must implement Serializable
     * @return the serialized bytes
     * @throws IOException if serialization fails
     */
    public static byte[] object2Bytes(Object obj) throws IOException {
        ByteArrayOutputStream bo = new ByteArrayOutputStream();
        // Closing the ObjectOutputStream flushes it before the bytes are read.
        try (ObjectOutputStream oo = new ObjectOutputStream(bo)) {
            oo.writeObject(obj);
        }
        return bo.toByteArray();
    }

    /**
     * Converts a byte[] back to an object.
     *
     * @param bytes the serialized bytes
     * @return the deserialized object
     * @throws Exception if deserialization fails
     */
    public static Object bytes2Object(byte[] bytes) throws Exception {
        try (ObjectInputStream sIn = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return sIn.readObject();
        }
    }
}
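The Serializable requirement noted above is easy to check in isolation. The following is a minimal sketch, not the article's own code: the class `SerializationRoundTrip`, its nested `Greeting` type, and the helper names are all hypothetical and use only the JDK. It shows one round trip through a byte array and what happens when the object does not implement Serializable.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical demo class; the names here are illustrative only.
class SerializationRoundTrip {

    // A payload type that opts in to Java serialization.
    static class Greeting implements Serializable {
        private static final long serialVersionUID = 1L;
        final int id;
        final String text;

        Greeting(int id, String text) {
            this.id = id;
            this.text = text;
        }
    }

    // Serializes an object into a byte array.
    static byte[] toBytes(Object obj) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(obj);
        }
        return buffer.toByteArray();
    }

    // Rebuilds an object from a byte array produced by toBytes.
    static Object fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        }
    }

    // Returns false when the object cannot be written because it is not Serializable.
    static boolean canSerialize(Object obj) throws IOException {
        try {
            toBytes(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        Greeting copy = (Greeting) fromBytes(toBytes(new Greeting(7, "hello")));
        System.out.println(copy.id + " " + copy.text);      // prints "7 hello"
        System.out.println(canSerialize(new Object()));     // prints "false"
    }
}
```

A plain `java.lang.Object` is rejected because it does not implement Serializable, which is why the Message class in the next section declares the interface.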
2. Message class (implementing Serializable interface)
package Model;

import java.io.Serializable;

/**
 * Created by Kinglf on 2016/10/17.
 */
public class Message implements Serializable {
    private static final long serialVersionUID = -389326121047047723L;

    private int id;
    private String content;

    public Message(int id, String content) {
        this.id = id;
        this.content = content;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }
}
3. Redis operation
To use Redis as a queue, we use the push and pop operations of the Redis list type. A queue has two defining characteristics:
New elements may only be inserted at one end, the tail of the queue.
FIFO: elements leave in first-in, first-out order.
In Redis, either lpush (head in) combined with rpop (tail out), or rpush (tail in) combined with lpop (head out), meets these requirements. Objects pushed to or popped from a Redis list only need to be converted to byte[].
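The head/tail pairing described above can be tried out without a Redis server. This is a minimal sketch that assumes an in-memory `java.util.ArrayDeque` as a stand-in for a Redis list; the class `ListQueueSketch` and its methods are hypothetical and only borrow the Redis command names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical in-memory stand-in for a Redis list; this is not a Redis client.
class ListQueueSketch {
    private final Deque<String> list = new ArrayDeque<>();

    void lpush(String value) { list.addFirst(value); }  // insert at the head
    void rpush(String value) { list.addLast(value); }   // insert at the tail
    String lpop() { return list.pollFirst(); }          // remove from the head, null if empty
    String rpop() { return list.pollLast(); }           // remove from the tail, null if empty

    // Pushes at the head, then pops from the tail until empty: queue (FIFO) order.
    static List<String> lpushThenRpop(String... values) {
        ListQueueSketch queue = new ListQueueSketch();
        for (String v : values) {
            queue.lpush(v);
        }
        List<String> out = new ArrayList<>();
        for (String v = queue.rpop(); v != null; v = queue.rpop()) {
            out.add(v);
        }
        return out;
    }

    // Pushes and pops at the same end: this behaves like a stack (LIFO), not a queue.
    static List<String> lpushThenLpop(String... values) {
        ListQueueSketch queue = new ListQueueSketch();
        for (String v : values) {
            queue.lpush(v);
        }
        List<String> out = new ArrayList<>();
        for (String v = queue.lpop(); v != null; v = queue.lpop()) {
            out.add(v);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(lpushThenRpop("a", "b", "c")); // prints [a, b, c]
        System.out.println(lpushThenLpop("a", "b", "c")); // prints [c, b, a]
    }
}
```

The point of the comparison is that FIFO behaviour comes from pushing and popping at opposite ends; using the same end for both turns the list into a stack.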
On the Java side, Jedis is used to read from and write to Redis. The utility class below also sets up the Redis connection pool.
package Utils;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/**
 * Created by Kinglf on 2016/10/17.
 */
public class JedisUtil {
    private static String JEDIS_IP;
    private static int JEDIS_PORT;
    private static String JEDIS_PASSWORD;
    private static JedisPool jedisPool;

    static {
        // Configuration is a hand-written config file parser that extends Properties.
        Configuration conf = Configuration.getInstance();
        JEDIS_IP = conf.getString("jedis.ip", "127.0.0.1");
        JEDIS_PORT = conf.getInt("jedis.port", 6379);
        // Note: the password is read here but is not passed to the pool below.
        JEDIS_PASSWORD = conf.getString("jedis.password", null);
        // The pool setters and the returnResource/returnBrokenResource calls follow the
        // Jedis release this article was written against; other releases may differ.
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxActive(5000);
        config.setMaxIdle(256);
        config.setMaxWait(5000L);
        config.setTestOnBorrow(true);
        config.setTestOnReturn(true);
        config.setTestWhileIdle(true);
        config.setMinEvictableIdleTimeMillis(60000L);
        config.setTimeBetweenEvictionRunsMillis(3000L);
        config.setNumTestsPerEvictionRun(-1);
        jedisPool = new JedisPool(config, JEDIS_IP, JEDIS_PORT, 60000);
    }

    /**
     * Borrows a connection, runs the action, and always gives the connection back.
     * On failure the stack trace is printed and null is returned.
     */
    private static <T> T execute(Function<Jedis, T> action) {
        Jedis jedis = null;
        boolean broken = false;
        try {
            jedis = jedisPool.getResource();
            return action.apply(jedis);
        } catch (Exception e) {
            broken = true;
            e.printStackTrace();
            return null;
        } finally {
            release(jedis, broken);
        }
    }

    /** Returns the connection to the pool exactly once, as broken or as healthy. */
    private static void release(Jedis jedis, boolean broken) {
        if (jedis == null) {
            return;
        }
        try {
            if (broken) {
                jedisPool.returnBrokenResource(jedis);
            } else {
                jedisPool.returnResource(jedis);
            }
        } catch (Exception e) {
            if (jedis.isConnected()) {
                jedis.quit();
                jedis.disconnect();
            }
        }
    }

    /** Gets the value stored at key. */
    public static String get(String key) {
        return execute(jedis -> jedis.get(key));
    }

    public static byte[] get(byte[] key) {
        return execute(jedis -> jedis.get(key));
    }

    public static void set(byte[] key, byte[] value) {
        execute(jedis -> jedis.set(key, value));
    }

    /** Sets the value and an expiry of the given number of seconds. */
    public static void set(byte[] key, byte[] value, int time) {
        execute(jedis -> {
            jedis.set(key, value);
            return jedis.expire(key, time);
        });
    }

    public static void hset(byte[] key, byte[] field, byte[] value) {
        execute(jedis -> jedis.hset(key, field, value));
    }

    public static void hset(String key, String field, String value) {
        execute(jedis -> jedis.hset(key, field, value));
    }

    /** Gets the value of a hash field. */
    public static String hget(String key, String field) {
        return execute(jedis -> jedis.hget(key, field));
    }

    /** Gets the value of a hash field. */
    public static byte[] hget(byte[] key, byte[] field) {
        return execute(jedis -> jedis.hget(key, field));
    }

    public static void hdel(byte[] key, byte[] field) {
        execute(jedis -> jedis.hdel(key, field));
    }

    /**
     * Stores a value in the Redis queue by inserting it at the head of the list.
     *
     * @param key   Redis key name
     * @param value value to store
     */
    public static void lpush(byte[] key, byte[] value) {
        execute(jedis -> jedis.lpush(key, value));
    }

    /**
     * Stores a value in the Redis queue by inserting it at the tail of the list.
     *
     * @param key   Redis key name
     * @param value value to store
     */
    public static void rpush(byte[] key, byte[] value) {
        execute(jedis -> jedis.rpush(key, value));
    }

    /**
     * Pops the last (tail) element of the source list and pushes it onto the
     * head of the destination list.
     *
     * @param key         Redis key name of the source list
     * @param destination Redis key name of the destination list
     */
    public static void rpoplpush(byte[] key, byte[] destination) {
        execute(jedis -> jedis.rpoplpush(key, destination));
    }

    /**
     * Gets all elements of the queue. Despite the name, nothing is popped:
     * the list is read with lrange(key, 0, -1).
     *
     * @param key key name
     */
    public static List<byte[]> lpopList(byte[] key) {
        return execute(jedis -> jedis.lrange(key, 0, -1));
    }

    /**
     * Pops one element from the tail of the queue.
     *
     * @param key key name
     * @return the element, or null if the list is empty or the call failed
     */
    public static byte[] rpop(byte[] key) {
        return execute(jedis -> jedis.rpop(key));
    }

    public static void hmset(Object key, Map<String, String> hash) {
        execute(jedis -> jedis.hmset(key.toString(), hash));
    }

    public static void hmset(Object key, Map<String, String> hash, int time) {
        execute(jedis -> {
            jedis.hmset(key.toString(), hash);
            return jedis.expire(key.toString(), time);
        });
    }

    public static List<String> hmget(Object key, String... fields) {
        return execute(jedis -> jedis.hmget(key.toString(), fields));
    }

    public static Set<String> hkeys(String key) {
        return execute(jedis -> jedis.hkeys(key));
    }

    public static List<byte[]> lrange(byte[] key, int from, int to) {
        return execute(jedis -> jedis.lrange(key, from, to));
    }

    public static Map<byte[], byte[]> hgetAll(byte[] key) {
        return execute(jedis -> jedis.hgetAll(key));
    }

    public static void del(byte[] key) {
        execute(jedis -> jedis.del(key));
    }

    /** Returns the length of the list, or 0 if the call failed. */
    public static long llen(byte[] key) {
        // The original discarded the result of jedis.llen(key) and always returned 0.
        Long len = execute(jedis -> jedis.llen(key));
        return len == null ? 0 : len;
    }
}
4. Configuration (mainly used to read the Redis configuration information)
package Utils;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/**
 * Created by Kinglf on 2016/10/17.
 */
public class Configuration extends Properties {
    private static final long serialVersionUID = -2296275030489943706L;
    private static Configuration instance = null;

    public static synchronized Configuration getInstance() {
        if (instance == null) {
            instance = new Configuration();
        }
        return instance;
    }

    // Treats an empty value the same as a missing one.
    public String getProperty(String key, String defaultValue) {
        String val = getProperty(key);
        return (val == null || val.isEmpty()) ? defaultValue : val;
    }

    public String getString(String name, String defaultValue) {
        return this.getProperty(name, defaultValue);
    }

    public int getInt(String name, int defaultValue) {
        String val = this.getProperty(name);
        return (val == null || val.isEmpty()) ? defaultValue : Integer.parseInt(val);
    }

    public long getLong(String name, long defaultValue) {
        String val = this.getProperty(name);
        // Long.parseLong, not Integer.parseInt, so values beyond the int range work.
        return (val == null || val.isEmpty()) ? defaultValue : Long.parseLong(val);
    }

    public float getFloat(String name, float defaultValue) {
        String val = this.getProperty(name);
        return (val == null || val.isEmpty()) ? defaultValue : Float.parseFloat(val);
    }

    public double getDouble(String name, double defaultValue) {
        String val = this.getProperty(name);
        return (val == null || val.isEmpty()) ? defaultValue : Double.parseDouble(val);
    }

    public byte getByte(String name, byte defaultValue) {
        String val = this.getProperty(name);
        return (val == null || val.isEmpty()) ? defaultValue : Byte.parseByte(val);
    }

    public Configuration() {
        // Loads config.xml from the classpath; if it is missing, the defaults apply.
        try (InputStream in = ClassLoader.getSystemClassLoader().getResourceAsStream("config.xml")) {
            if (in != null) {
                this.loadFromXML(in);
            }
        } catch (IOException ioe) {
            ioe.printStackTrace();
        }
    }
}
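Configuration reads config.xml through Properties.loadFromXML, but the article does not show the file itself. The following is a sketch of what such a file might contain, assuming the standard Java XML properties format and the keys that JedisUtil looks up (`jedis.ip`, `jedis.port`). The values are simply the defaults used in JedisUtil, and the class `ConfigXmlSketch` is hypothetical; it parses the XML from a string so it can run without a file.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

// Hypothetical demo class; it parses a sample config from a string instead of a file.
class ConfigXmlSketch {

    // Assumed contents of config.xml, in the XML format accepted by Properties.loadFromXML.
    static final String SAMPLE_XML =
            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
          + "<!DOCTYPE properties SYSTEM \"http://java.sun.com/dtd/properties.dtd\">\n"
          + "<properties>\n"
          + "  <entry key=\"jedis.ip\">127.0.0.1</entry>\n"
          + "  <entry key=\"jedis.port\">6379</entry>\n"
          + "</properties>\n";

    // Loads XML-formatted properties from a string.
    static Properties load(String xml) throws IOException {
        Properties props = new Properties();
        props.loadFromXML(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
        return props;
    }

    public static void main(String[] args) throws IOException {
        Properties props = load(SAMPLE_XML);
        System.out.println(props.getProperty("jedis.ip"));       // prints 127.0.0.1
        System.out.println(props.getProperty("jedis.port"));     // prints 6379
        System.out.println(props.getProperty("jedis.password")); // prints null (no entry)
    }
}
```

Leaving out the `jedis.password` entry means the lookup returns null, which matches the default that JedisUtil passes for that key.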
5. Test
import Model.Message;
import Utils.JedisUtil;
import Utils.ObjectUtil;

import java.io.IOException;

/**
 * Created by Kinglf on 2016/10/17.
 */
public class TestRedisQueue {
    public static byte[] redisKey = "key".getBytes();

    static {
        try {
            init();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Producer: pushes 1,000,000 messages onto the head of the list.
    private static void init() throws IOException {
        for (int i = 0; i < 1000000; i++) {
            Message message = new Message(i, "This is message number " + i);
            JedisUtil.lpush(redisKey, ObjectUtil.object2Bytes(message));
        }
    }

    public static void main(String[] args) {
        try {
            pop();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Consumer: pops one message from the tail of the list and prints it.
    private static void pop() throws Exception {
        byte[] bytes = JedisUtil.rpop(redisKey);
        if (bytes == null) {
            // Nothing to deserialize: the list is empty or the Redis call failed.
            return;
        }
        Message msg = (Message) ObjectUtil.bytes2Object(bytes);
        System.out.println(msg.getId() + "----" + msg.getContent());
    }
}
Each call to the pop() method prints one message. The results look like this:
1----This is message number 1
2----This is message number 2
3----This is message number 3
4----This is message number 4
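The test above pops a single message per run. A consumer usually keeps popping until the queue is empty, and that loop can be sketched without a Redis server. The class `DrainLoopSketch` and its `drain` method are hypothetical; the `Supplier` stands in for a pop call such as `() -> JedisUtil.rpop(redisKey)`, which returns null once the list is empty.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical demo class; the Supplier is a stand-in for a Redis pop call.
class DrainLoopSketch {

    // Calls the pop source repeatedly and collects items until it returns null.
    static <T> List<T> drain(Supplier<T> popSource) {
        List<T> consumed = new ArrayList<>();
        T item;
        while ((item = popSource.get()) != null) {
            consumed.add(item);
        }
        return consumed;
    }

    public static void main(String[] args) {
        // In-memory stand-in for the Redis list: push at the head, pop from the tail.
        Deque<String> pending = new ArrayDeque<>();
        pending.addFirst("first");
        pending.addFirst("second");
        System.out.println(drain(pending::pollLast)); // prints [first, second]
    }
}
```

With JedisUtil.rpop as written, null can also mean the Redis call failed, so a real consumer may want to tell that case apart from an empty queue.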
Summary
At this point, the producer and consumer code for the Redis message queue is complete.
