Detailed explanation of Sequence implementation method examples in Mysql

黄舟
2017-09-08 11:31:43

This article describes how to implement a sequence on top of MySQL and is shared here as a reference.

Our team is switching to a new framework. All new business uses the new framework and even a new database: MySQL.

We used Oracle before. Order numbers, serial numbers, batch numbers and the like all came directly from the numeric values provided by Oracle's sequences. Now that the database has been changed to MySQL, the old approach obviously no longer applies.

A new one has to be written that:

•Works in distributed scenarios

•Meets certain concurrency requirements

I looked up some material and found that MySQL-based implementations all rely on a database record whose value is updated continuously, and that most of them do this in a stored function.

Here is the code found online:

Implementation based on a MySQL function

Table structure


CREATE TABLE `t_sequence` (
`sequence_name` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'sequence name' ,
`value` int(11) NULL DEFAULT NULL COMMENT 'current value' ,
PRIMARY KEY (`sequence_name`)
)
ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8 COLLATE=utf8_general_ci
ROW_FORMAT=COMPACT
;

Get the next value


CREATE DEFINER = `root`@`localhost` FUNCTION `nextval`(sequence_name varchar(64))
 RETURNS int(11)
BEGIN
 declare current integer;
 set current = 0;
 
 update t_sequence t set t.value = t.value + 1 where t.sequence_name = sequence_name;
 select t.value into current from t_sequence t where t.sequence_name = sequence_name;

 return current;
end;

Problems may arise under concurrency. Locking can be added at the business layer, but that gives no guarantee in distributed scenarios, and it is not efficient either.

So I implemented one myself, in Java

Principle:

•Read the record and cache a segment of values locally, for example 0-99, while changing the record's current value from 0 to 100

•Update the database with an optimistic lock, and allow retries

•Serve values from the cache, and go back to the database only when the cached segment is used up
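The three steps above can be tried out in isolation before looking at the real classes. The following is only a minimal sketch, not the article's implementation: `SegmentAllocatorSketch`, `Row` and `Allocator` are hypothetical names, and an in-memory `AtomicLong` stands in for the database record, with `compareAndSet` playing the role of an UPDATE that succeeds only if the value is still the one that was read.

```java
import java.util.concurrent.atomic.AtomicLong;

class SegmentAllocatorSketch {

    /** In-memory stand-in for the database record (assumption, for illustration only). */
    static class Row {
        private final AtomicLong value = new AtomicLong(0);

        long read() {
            return value.get();
        }

        /** Mimics an optimistic UPDATE: returns 1 if the row still held oldValue, else 0. */
        int updateIfUnchanged(long oldValue, long newValue) {
            return value.compareAndSet(oldValue, newValue) ? 1 : 0;
        }
    }

    /** Serves numbers from a locally cached segment and reserves a new one when it runs out. */
    static class Allocator {
        private final Row row;
        private final long step;
        private final int maxAttempts;
        private long next;   // next value to hand out
        private long limit;  // first value that is NOT part of the cached segment

        Allocator(Row row, long step, int maxAttempts) {
            this.row = row;
            this.step = step;
            this.maxAttempts = maxAttempts;
        }

        synchronized long nextVal() {
            if (next >= limit) {
                reserveSegment();
            }
            return next++;
        }

        private void reserveSegment() {
            for (int attempt = 0; attempt < maxAttempts; attempt++) {
                long current = row.read();
                if (row.updateIfUnchanged(current, current + step) == 1) {
                    next = current;
                    limit = current + step;
                    return;
                }
                // Someone else moved the row first: read again and retry.
            }
            throw new IllegalStateException("no segment reserved after " + maxAttempts + " attempts");
        }
    }

    public static void main(String[] args) {
        Row row = new Row();
        Allocator a = new Allocator(row, 100, 5);
        Allocator b = new Allocator(row, 100, 5);
        System.out.println(a.nextVal()); // 0   (a reserved 0-99)
        System.out.println(b.nextVal()); // 100 (b reserved 100-199)
        System.out.println(a.nextVal()); // 1   (served from a's cache, no row access)
        System.out.println(row.read());  // 200
    }
}
```

Only the first call of each allocator touches the shared row. Every later call is served from memory until the segment is exhausted, which is where the performance gain comes from.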

Without further ado, here is the code:

Implementation based on Java

Table structure

On every update, SEQ_VALUE is set to SEQ_VALUE + STEP


CREATE TABLE `t_pub_sequence` (
 `SEQ_NAME` varchar(128) CHARACTER SET utf8 NOT NULL COMMENT 'sequence name',
 `SEQ_VALUE` bigint(20) NOT NULL COMMENT 'current sequence value',
 `MIN_VALUE` bigint(20) NOT NULL COMMENT 'minimum value',
 `MAX_VALUE` bigint(20) NOT NULL COMMENT 'maximum value',
 `STEP` bigint(20) NOT NULL COMMENT 'number of values fetched each time',
 `TM_CREATE` datetime NOT NULL COMMENT 'creation time',
 `TM_SMP` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modification time',
 PRIMARY KEY (`SEQ_NAME`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='serial number generation table';

Sequence interface


/**
 * <p></p>
 * @author coderzl
 * @Title MysqlSequence
 * @Description Sequence implemented on top of a MySQL database
 * @date 2017/6/6 23:03
 */
public interface MysqlSequence {
 /**
  * <p>
  * Gets the next number of the given sequence
  * </p>
  * @param seqName sequence name
  * @return String sequence number
  */
 public String nextVal(String seqName);
}

Sequence interval

Used to cache a segment of the sequence locally, covering the interval from min to max


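import java.util.concurrent.atomic.AtomicLong;

import lombok.Data;

/**
 * <p></p>
 *
 * @author coderzl
 * @Title SequenceRange
 * @Description Sequence interval, used to cache a segment of the sequence
 * @date 2017/6/6 22:58
 */
@Data
public class SequenceRange {
 private final long  min;
 private final long  max;
 /** Next value to hand out */
 private final AtomicLong value;
 /** Whether the range has been exhausted */
 private volatile boolean over = false;

 /**
  * Constructor.
  *
  * @param min first value of the range (inclusive)
  * @param max last value of the range (inclusive)
  */
 public SequenceRange(long min, long max) {
  this.min = min;
  this.max = max;
  this.value = new AtomicLong(min);
 }

 /**
  * <p>Gets the current value and increments it</p>
  *
  * @return the current value, or -1 once the range is exhausted
  */
 public long getAndIncrement() {
  long currentValue = value.getAndIncrement();
  if (currentValue > max) {
   over = true;
   return -1;
  }

  return currentValue;
 }

}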

BO

Corresponds to one database record


@Data
public class MysqlSequenceBo {
 /**
  * Sequence name
  */
 private String seqName;
 /**
  * Current value
  */
 private Long seqValue;
 /**
  * Minimum value
  */
 private Long minValue;
 /**
  * Maximum value
  */
 private Long maxValue;
 /**
  * Number of values fetched each time
  */
 private Long step;
 /** Creation time */
 private Date tmCreate;
 /** Modification time */
 private Date tmSmp;

 public boolean validate(){
  // Some simple checks: for example the current value must lie between min and max, and step must not exceed the difference between max and min
  if (StringUtil.isBlank(seqName) || minValue < 0 || maxValue <= 0 || step <= 0 || minValue >= maxValue || maxValue - minValue <= step ||seqValue < minValue || seqValue > maxValue ) {
   return false;
  }
  return true; 
 }
}

DAO

Create, read, update and delete; in practice only update and read are used


public interface MysqlSequenceDAO {
 /**
 * 
 */
 public int createSequence(MysqlSequenceBo bo);

 public int updSequence(@Param("seqName") String seqName, @Param("oldValue") long oldValue ,@Param("newValue") long newValue);

 public int delSequence(@Param("seqName") String seqName);

 public MysqlSequenceBo getSequence(@Param("seqName") String seqName);

 public List<MysqlSequenceBo> getAll();
}

Mapper


<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.xxxxx.core.sequence.impl.dao.MysqlSequenceDAO" >
 <resultMap id="BaseResultMap" type="com.xxxxx.core.sequence.impl.MysqlSequenceBo" >
  <result column="SEQ_NAME" property="seqName" jdbcType="VARCHAR" />
  <result column="SEQ_VALUE" property="seqValue" jdbcType="BIGINT" />
  <result column="MIN_VALUE" property="minValue" jdbcType="BIGINT" />
  <result column="MAX_VALUE" property="maxValue" jdbcType="BIGINT" />
  <result column="STEP" property="step" jdbcType="BIGINT" />
  <result column="TM_CREATE" property="tmCreate" jdbcType="TIMESTAMP" />
  <result column="TM_SMP" property="tmSmp" jdbcType="TIMESTAMP" />
 </resultMap>
 <delete id="delSequence" parameterType="java.lang.String" >
  delete from t_pub_sequence
  where SEQ_NAME = #{seqName,jdbcType=VARCHAR}
 </delete>
 <insert id="createSequence" parameterType="com.xxxxx.core.sequence.impl.MysqlSequenceBo" >
  insert into t_pub_sequence (SEQ_NAME,SEQ_VALUE,MIN_VALUE,MAX_VALUE,STEP,TM_CREATE)
  values (#{seqName,jdbcType=VARCHAR}, #{seqValue,jdbcType=BIGINT},
  #{minValue,jdbcType=BIGINT}, #{maxValue,jdbcType=BIGINT}, #{step,jdbcType=BIGINT},
  now())
 </insert>
 <update id="updSequence" >
  update t_pub_sequence
  set SEQ_VALUE = #{newValue,jdbcType=BIGINT}
  where SEQ_NAME = #{seqName,jdbcType=VARCHAR} and SEQ_VALUE = #{oldValue,jdbcType=BIGINT}
 </update>

 <select id="getAll" resultMap="BaseResultMap" >
  select SEQ_NAME, SEQ_VALUE, MIN_VALUE, MAX_VALUE, STEP
  from t_pub_sequence
 </select>

 <select id="getSequence" resultMap="BaseResultMap" >
  select SEQ_NAME, SEQ_VALUE, MIN_VALUE, MAX_VALUE, STEP
  from t_pub_sequence
  where SEQ_NAME = #{seqName,jdbcType=VARCHAR}
 </select>
</mapper>

Interface implementation


@Repository("mysqlSequence")
public class MysqlSequenceImpl implements MysqlSequence{

 @Autowired
 private MysqlSequenceFactory mysqlSequenceFactory;
 /**
  * <p>
  * Gets the next number of the given sequence
  * </p>
  *
  * @param seqName sequence name
  * @return String sequence number
  * @author coderzl
  */
 @Override
 public String nextVal(String seqName) {
  return Objects.toString(mysqlSequenceFactory.getNextVal(seqName));
 }
}

Factory

The factory does only two things

•When the service starts, it initializes all sequences in the database [which fills the cached interval of each sequence]

•It gets the next value of a sequence
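As a side note on the second task, creating a holder lazily for a name that was not loaded at startup can also be expressed with `ConcurrentHashMap.computeIfAbsent`, which applies the mapping function at most once per key. The sketch below is an alternative under stated assumptions, not the article's factory: `HolderRegistrySketch`, `Holder` and the `loader` function are hypothetical, and the holder is a plain in-memory counter instead of a database-backed one.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

class HolderRegistrySketch {

    /** Hypothetical stand-in for a per-sequence holder. */
    static class Holder {
        private final AtomicLong counter = new AtomicLong(0);

        long getNextVal() {
            return counter.getAndIncrement();
        }
    }

    private final Map<String, Holder> holders = new ConcurrentHashMap<>();
    private final Function<String, Holder> loader;

    HolderRegistrySketch(Function<String, Holder> loader) {
        this.loader = loader;
    }

    long getNextVal(String seqName) {
        // The loader runs only when no holder exists yet for seqName.
        return holders.computeIfAbsent(seqName, loader).getNextVal();
    }

    public static void main(String[] args) {
        AtomicInteger loads = new AtomicInteger();
        HolderRegistrySketch registry = new HolderRegistrySketch(name -> {
            loads.incrementAndGet(); // counts how often a holder is created
            return new Holder();
        });
        System.out.println(registry.getNextVal("ORDER")); // 0
        System.out.println(registry.getNextVal("ORDER")); // 1
        System.out.println(registry.getNextVal("BATCH")); // 0
        System.out.println(loads.get());                  // 2
    }
}
```

One trade-off to keep in mind: the mapping function runs while the map holds an internal lock for that key's bin, so a slow database load inside it can delay other updates to the map. An explicit lock around the load keeps that cost visible and separate.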


@Component
public class MysqlSequenceFactory {

 private final Lock lock = new ReentrantLock();

 /** Sequence name -> holder */
 private Map<String,MysqlSequenceHolder> holderMap = new ConcurrentHashMap<>();

 @Autowired
 private MysqlSequenceDAO msqlSequenceDAO;
 /** Number of retries when the optimistic-lock update fails while initializing a single sequence */
 @Value("${seq.init.retry:5}")
 private int initRetryNum;
 /** Number of retries when the optimistic-lock update fails while fetching a new interval for a single sequence */
 @Value("${seq.get.retry:20}")
 private int getRetryNum;

 @PostConstruct
 private void init(){
  // Initialize all sequences
  initAll();
 }


 /**
  * <p> Loads every sequence in the table and initializes it </p>
  * @author coderzl
  */
 private void initAll(){
  // Acquire the lock before try, so that finally never unlocks a lock that was not acquired
  lock.lock();
  try {
   List<MysqlSequenceBo> boList = msqlSequenceDAO.getAll();
   if (boList == null) {
    throw new IllegalArgumentException("The sequenceRecord is null!");
   }
   for (MysqlSequenceBo bo : boList) {
    MysqlSequenceHolder holder = new MysqlSequenceHolder(msqlSequenceDAO, bo,initRetryNum,getRetryNum);
    holder.init();
    holderMap.put(bo.getSeqName(), holder);
   }
  }finally {
   lock.unlock();
  }
 }


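 /**
  * <p> Gets the next value of the given sequence, loading the sequence first if it is not cached yet </p>
  * @param seqName
  * @return long
  * @author coderzl
  */
 public long getNextVal(String seqName){
  MysqlSequenceHolder holder = holderMap.get(seqName);
  if (holder == null) {
   // Acquire the lock before try, so that finally never unlocks a lock that was not acquired
   lock.lock();
   try {
    // Check again under the lock: another thread may have created the holder in the meantime
    holder = holderMap.get(seqName);
    if (holder != null){
     return holder.getNextVal();
    }
    MysqlSequenceBo bo = msqlSequenceDAO.getSequence(seqName);
    holder = new MysqlSequenceHolder(msqlSequenceDAO, bo,initRetryNum,getRetryNum);
    holder.init();
    holderMap.put(seqName, holder);
   }finally {
    lock.unlock();
   }
  }
  return holder.getNextVal();
 }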

}

Holder for a single sequence

•init() performs initialization: parameter validation, the database record update, and creation of the sequence interval

•getNextVal() gets the next value


public class MysqlSequenceHolder {

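 private final Lock lock    = new ReentrantLock();

 /** seqName */
 private String seqName;

 /** sequenceDao */
 private MysqlSequenceDAO sequenceDAO;

 private MysqlSequenceBo sequenceBo;
 /** Currently cached interval; volatile because getNextVal() reads it outside the lock */
 private volatile SequenceRange sequenceRange;
 /** Whether the holder has been initialized */
 private volatile boolean  isInitialize  = false;
 /** Number of retries when initializing the sequence */
 private int initRetryNum;
 /** Number of retries when fetching a new interval */
 private int getRetryNum;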

 /**
  * <p> Constructor </p>
  * @Title MysqlSequenceHolder
  * @param sequenceDAO 
  * @param sequenceBo
  * @param initRetryNum number of retries after a failed database update during initialization
  * @param getRetryNum number of retries after a failed database update while getting nextVal
  * @author coderzl
  */
 public MysqlSequenceHolder(MysqlSequenceDAO sequenceDAO, MysqlSequenceBo sequenceBo,int initRetryNum,int getRetryNum) {
  this.sequenceDAO = sequenceDAO;
  this.sequenceBo = sequenceBo;
  this.initRetryNum = initRetryNum;
  this.getRetryNum = getRetryNum;
  if(sequenceBo != null)
   this.seqName = sequenceBo.getSeqName();
 }

 /**
  * <p> Initialization </p>
  * @Title init
  * @author coderzl
  */
 public void init(){
  if (isInitialize) {
   throw new SequenceException("[" + seqName + "] the MysqlSequenceHolder has inited");
  }
  if (sequenceDAO == null) {
   throw new SequenceException("[" + seqName + "] the sequenceDao is null");
  }
  if (seqName == null || seqName.trim().length() == 0) {
   throw new SequenceException("[" + seqName + "] the sequenceName is null");
  }
  if (sequenceBo == null) {
   throw new SequenceException("[" + seqName + "] the sequenceBo is null");
  }
  if (!sequenceBo.validate()){
   throw new SequenceException("[" + seqName + "] the sequenceBo validate fail. BO:"+sequenceBo);
  }
  // Initialize this sequence; a SequenceException simply propagates to the caller
  initSequenceRecord(sequenceBo);
  isInitialize = true;
 }

 /**
  * <p> Gets the next sequence number </p>
  * @Title getNextVal
  * @return long
  * @author coderzl
  */
 public long getNextVal(){
  if(!isInitialize){
   throw new SequenceException("[" + seqName + "] the MysqlSequenceHolder not inited");
  }
  if(sequenceRange == null){
   throw new SequenceException("[" + seqName + "] the sequenceRange is null");
  }
  long curValue = sequenceRange.getAndIncrement();

  if(curValue == -1){
   // The cached interval is used up: take the lock and check again, because another thread may already have refilled it
   lock.lock();
   try{
    curValue = sequenceRange.getAndIncrement();
    if(curValue != -1){
     return curValue;
    }
    sequenceRange = retryRange();
    curValue = sequenceRange.getAndIncrement();
   }finally {
    lock.unlock();
   }
  }
  return curValue;
 }

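 /**
  * <p> Initializes the current record </p>
  * @Title initSequenceRecord
  * @param sequenceBo
  * @author coderzl
  */
 private void initSequenceRecord(MysqlSequenceBo sequenceBo){
  // Update the database record with an optimistic lock, at most initRetryNum attempts
  // (the loop starts at 0; starting at 1 would allow one attempt fewer than configured)
  for(int i = 0; i < initRetryNum; i++){
   // Query the current record
   MysqlSequenceBo curBo = sequenceDAO.getSequence(sequenceBo.getSeqName());
   if(curBo == null){
    throw new SequenceException("[" + seqName + "] the current sequenceBo is null");
   }
   if (!curBo.validate()){
    throw new SequenceException("[" + seqName + "] the current sequenceBo validate fail");
   }
   // Compute the new current value and the last value of the cached interval
   long newValue = curBo.getSeqValue()+curBo.getStep();
   long rangeMax = newValue - 1;
   // Check the new value
   if(!checkCurrentValue(newValue,curBo)){
    // Wrap around: cache the remaining values up to max and restart the record from min.
    // Using newValue - 1 here would give an empty interval (min - 1 lies below the current value).
    newValue = resetCurrentValue(curBo);
    rangeMax = curBo.getMaxValue();
   }
   int result = sequenceDAO.updSequence(sequenceBo.getSeqName(),curBo.getSeqValue(),newValue);
   if(result > 0){
    sequenceRange = new SequenceRange(curBo.getSeqValue(),rangeMax);
    curBo.setSeqValue(newValue);
    this.sequenceBo = curBo;
    return;
   }
   // result == 0: another instance updated the record first, so read it again and retry
  }
  // The update did not succeed within the allowed attempts
  throw new SequenceException("[" + seqName + "] sequenceBo update error");
 }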

 /**
  * <p> Checks whether the new current value is valid, that is, whether it lies between the minimum and maximum values </p>
  * @param curValue
  * @param curBo
  * @return boolean
  * @author coderzl
  */
 private boolean checkCurrentValue(long curValue,MysqlSequenceBo curBo){
  if(curValue > curBo.getMinValue() && curValue <= curBo.getMaxValue()){
   return true;
  }
  return false;
 }

 /**
  * <p> Resets the current value of the sequence: when the sequence reaches its maximum, it starts again from the minimum </p>
  * @Title resetCurrentValue
  * @param curBo
  * @return long
  * @author coderzl
  */
 private long resetCurrentValue(MysqlSequenceBo curBo){
  return curBo.getMinValue();
 }

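 /**
  * <p> When the cached interval is used up, reads the database record again and caches a new segment </p>
  * @Title retryRange
  * @return SequenceRange
  * @author coderzl
  */
 private SequenceRange retryRange(){
  // At most getRetryNum attempts (the loop starts at 0; starting at 1 would allow one attempt fewer than configured)
  for(int i = 0; i < getRetryNum; i++){
   // Query the current record
   MysqlSequenceBo curBo = sequenceDAO.getSequence(sequenceBo.getSeqName());
   if(curBo == null){
    throw new SequenceException("[" + seqName + "] the current sequenceBo is null");
   }
   if (!curBo.validate()){
    throw new SequenceException("[" + seqName + "] the current sequenceBo validate fail");
   }
   // Compute the new current value and the last value of the cached interval
   long newValue = curBo.getSeqValue()+curBo.getStep();
   long rangeMax = newValue - 1;
   // Check the new value
   if(!checkCurrentValue(newValue,curBo)){
    // Wrap around: cache the remaining values up to max and restart the record from min.
    // Using newValue - 1 here would give an empty interval (min - 1 lies below the current value).
    newValue = resetCurrentValue(curBo);
    rangeMax = curBo.getMaxValue();
   }
   int result = sequenceDAO.updSequence(sequenceBo.getSeqName(),curBo.getSeqValue(),newValue);
   if(result > 0){
    sequenceRange = new SequenceRange(curBo.getSeqValue(),rangeMax);
    curBo.setSeqValue(newValue);
    this.sequenceBo = curBo;
    return sequenceRange;
   }
   // result == 0: another instance updated the record first, so read it again and retry
  }
  throw new SequenceException("[" + seqName + "] sequenceBo update error");

 }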
}

Summary

•When the service restarts or crashes, the unused numbers cached by that service instance are lost

•In distributed scenarios, when several services initialize at the same time or fetch a new segment at the same time, the optimistic lock ensures that they do not conflict with each other: service A obtains 0-99, service B obtains 100-199, and so on

•When numbers are requested frequently, increasing the step value improves performance, but more numbers are lost when the service fails

•If attributes of the sequence such as step or max are changed in the database, the new values take effect the next time a segment is fetched from the database

•A sequence provides only a limited number of values (at most max - min). After reaching max it starts over from the beginning

•Because the sequence wraps around, values are no longer unique once max has been reached. It is recommended to use the sequence for business serial numbers and to concatenate it with the time, for example: 20170612235101 + sequence number
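The claim that concurrently running services never receive overlapping segments can be checked with a small simulation. This is a sketch under assumptions rather than a test of the classes above: `DisjointRangesSketch`, `reserve` and `simulate` are hypothetical names, threads play the role of services, and an `AtomicLong` with `compareAndSet` stands in for the optimistic-lock update on the database record.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class DisjointRangesSketch {

    /** Reserves [start, start + step) on the shared record, retrying until the update wins; returns start. */
    static long reserve(AtomicLong sharedRecord, long step) {
        while (true) {
            long current = sharedRecord.get();
            if (sharedRecord.compareAndSet(current, current + step)) {
                return current;
            }
            // Lost the race against another service: read again and retry.
        }
    }

    /** Runs `services` threads that each reserve `rounds` segments; returns every value handed out. */
    static Set<Long> simulate(int services, int rounds, long step) {
        AtomicLong sharedRecord = new AtomicLong(0);
        Set<Long> issued = ConcurrentHashMap.newKeySet();
        Thread[] threads = new Thread[services];
        for (int s = 0; s < services; s++) {
            threads[s] = new Thread(() -> {
                for (int r = 0; r < rounds; r++) {
                    long start = reserve(sharedRecord, step);
                    for (long v = start; v < start + step; v++) {
                        issued.add(v); // a duplicate would not increase the set size
                    }
                }
            });
            threads[s].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for services", e);
            }
        }
        return issued;
    }

    public static void main(String[] args) {
        Set<Long> issued = simulate(2, 50, 100);
        System.out.println(issued.size()); // 10000
    }
}
```

Two services that each reserve 50 segments of 100 values hand out 10000 values in total. The set contains exactly 10000 distinct entries, so no value was issued twice.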

Building the business ID


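import java.text.SimpleDateFormat;
import java.util.Calendar;

@Service
public class JrnGeneratorService {
 private static final String SEQ_NAME = "T_SEQ_TEST";

 /** Sequence service */
 @Autowired
 private MysqlSequence mysqlSequence;
 
 public String generateJrn() {
  try {
   // The MysqlSequence interface shown earlier exposes nextVal(String)
   String sequence = mysqlSequence.nextVal(SEQ_NAME);
   sequence = leftPadding(sequence,8);
   Calendar calendar = Calendar.getInstance();
   SimpleDateFormat sDateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
   String nowdate = sDateFormat.format(calendar.getTime());
   // Strings are immutable, so the result of substring must be assigned; dropping the year leaves MMddHHmmss
   nowdate = nowdate.substring(4);
   // 10-digit time + 8-digit sequence + 6-digit random number = 24-digit serial number
   // (RandomUtil is a project utility that is not shown in this article)
   String jrn = nowdate + sequence + RandomUtil.getFixedLengthRandom(6);
   return jrn;
  } catch (Exception e) {
   // Rethrow so that the method never ends without returning a value
   throw new IllegalStateException("failed to generate serial number", e);
  }
 }
 
 /** Pads seq with leading zeros up to len characters; longer input is returned unchanged */
 private String leftPadding(String seq,int len){
  StringBuilder padded = new StringBuilder();
  for(int i = seq.length(); i < len; i++){
   padded.append('0');
  }
  return padded.append(seq).toString();
 }

}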
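The same concatenation can be written more compactly with `java.time` and `String.format`. The sketch below is an alternative, not the article's method: `JrnFormatSketch` and `buildJrn` are hypothetical names, and the random suffix is passed in by the caller so that the result is reproducible. Unlike `SimpleDateFormat`, a `DateTimeFormatter` is immutable and thread-safe, so a single instance can be shared.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

class JrnFormatSketch {

    // Month, day, hour, minute, second: 10 digits, matching the time part used above.
    private static final DateTimeFormatter TIME_PART = DateTimeFormatter.ofPattern("MMddHHmmss");

    /** Builds 10-digit time + sequence zero-padded to 8 digits + caller-supplied random suffix. */
    static String buildJrn(LocalDateTime now, long sequence, String randomSuffix) {
        // %08d pads with leading zeros; Locale.ROOT keeps the digits ASCII on every machine.
        return now.format(TIME_PART) + String.format(Locale.ROOT, "%08d", sequence) + randomSuffix;
    }

    public static void main(String[] args) {
        String jrn = buildJrn(LocalDateTime.of(2017, 6, 12, 23, 51, 1), 42, "123456");
        System.out.println(jrn);          // 061223510100000042123456
        System.out.println(jrn.length()); // 24
    }
}
```

Note that `%08d` pads but never truncates: a sequence value with more than 8 digits makes the ID longer than 24 characters, so the sequence's max value should stay below 100000000 if a fixed length is required.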
