Der folgende Editor bringt Ihnen einen Artikel über die Implementierung von Sequence basierend auf MySQL. Der Herausgeber findet es ziemlich gut, deshalb teile ich es jetzt mit Ihnen und gebe es als Referenz. Folgen wir dem Herausgeber, um einen Blick darauf zu werfen
Das Team hat das neue Framework ersetzt. Alle neuen Unternehmen nutzen neue Frameworks und sogar neue Datenbanken – MySQL.
Oracle wurde hier bereits verwendet. Verschiedene Bestellnummern, Seriennummern, Chargennummern usw. verwenden alle direkt die von der Oracle-Sequenz bereitgestellten digitalen Seriennummern. Nachdem die Datenbank nun auf MySQL umgestellt wurde, ist die alte Methode offensichtlich nicht mehr anwendbar.
Es muss ein neues geschrieben werden:
•Wird in verteilten Szenarien verwendet
•Erfüllt bestimmte Parallelitätsanforderungen
Ich habe nach relevanten Informationen gesucht und festgestellt, dass die Implementierung von MySQL auf einem Datenbankeintrag basiert und dessen Wert kontinuierlich aktualisiert wird. Dann verwenden die meisten Implementierungslösungen Funktionen.
Veröffentlichen Sie den Code aus dem Internet:
Implementierung der
tabellenbasiert auf MySQL-Funktionsstruktur
CREATE TABLE `t_sequence` ( `sequence_name` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '序列名称' , `value` int(11) NULL DEFAULT NULL COMMENT '当前值' , PRIMARY KEY (`sequence_name`) ) ENGINE=InnoDB DEFAULT CHARACTER SET=utf8 COLLATE=utf8_general_ci ROW_FORMAT=COMPACT ;
Den nächsten Wert abrufen
CREATE DEFINER = `root`@`localhost` FUNCTION `nextval`(sequence_name varchar(64)) RETURNS int(11) BEGIN declare current integer; set current = 0; update t_sequence t set t.value = t.value + 1 where t.sequence_name = sequence_name; select t.value into current from t_sequence t where t.sequence_name = sequence_name; return current; end;
In gleichzeitigen Szenarien können Probleme auftreten. Obwohl das Sperren auf der Geschäftsebene erfolgen kann, kann dies in verteilten Szenarien nicht garantiert werden, und die Effizienz ist nicht hoch.
Implementieren Sie eine selbst, Java-Version
Prinzip:
•Einen Datensatz lesen, ein Datensegment zwischenspeichern, z. B.: 0-100, den aktuellen Wert ändern Datensatz von 0 auf 100 geändert
•Aktualisierung der optimistischen Sperre der Datenbank, Wiederholungsversuche möglich
•Daten aus dem Cache lesen, Datenbank nach Abschluss erneut lesen
Kein Unsinn, los geht's Code:
Basierend auf Java-Implementierung
Tabellenstruktur
Jedes Mal, wenn es aktualisiert wird, SEQ_VALUE ist auf SEQ_VALUE+STEP
CREATE TABLE `t_pub_sequence` ( `SEQ_NAME` varchar(128) CHARACTER SET utf8 NOT NULL COMMENT '序列名称', `SEQ_VALUE` bigint(20) NOT NULL COMMENT '目前序列值', `MIN_VALUE` bigint(20) NOT NULL COMMENT '最小值', `MAX_VALUE` bigint(20) NOT NULL COMMENT '最大值', `STEP` bigint(20) NOT NULL COMMENT '每次取值的数量', `TM_CREATE` datetime NOT NULL COMMENT '创建时间', `TM_SMP` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', PRIMARY KEY (`SEQ_NAME`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='流水号生成表';
Sequenzschnittstelle
/** * <p></p> * @author coderzl * @Title MysqlSequence * @Description 基于mysql数据库实现的序列 * @date 2017/6/6 23:03 */ public interface MysqlSequence { /** * <p> * 获取指定sequence的序列号 * </p> * @param seqName sequence名 * @return String 序列号 */ public String nextVal(String seqName); }
Sequenzintervall
wird verwendet, um eine Sequenz lokal zwischenzuspeichern, vom minimalen bis zum maximalen Intervall/** * <p></p> * * @author coderzl * @Title SequenceRange * @Description 序列区间,用于缓存序列 * @date 2017/6/6 22:58 */ @Data public class SequenceRange { private final long min; private final long max; /** */ private final AtomicLong value; /** 是否超限 */ private volatile boolean over = false; /** * 构造. * * @param min * @param max */ public SequenceRange(long min, long max) { this.min = min; this.max = max; this.value = new AtomicLong(min); } /** * <p>Gets and increment</p> * * @return */ public long getAndIncrement() { long currentValue = value.getAndIncrement(); if (currentValue > max) { over = true; return -1; } return currentValue; } }BO entspricht dem Datenbankeintrag
@Data public class MysqlSequenceBo { /** * seq名 */ private String seqName; /** * 当前值 */ private Long seqValue; /** * 最小值 */ private Long minValue; /** * 最大值 */ private Long maxValue; /** * 每次取值的数量 */ private Long step; /** */ private Date tmCreate; /** */ private Date tmSmp; public boolean validate(){ //一些简单的校验。如当前值必须在最大最小值之间。step值不能大于max与min的差 if (StringUtil.isBlank(seqName) || minValue < 0 || maxValue <= 0 || step <= 0 || minValue >= maxValue || maxValue - minValue <= step ||seqValue < minValue || seqValue > maxValue ) { return false; } return true; } }DAOHinzufügen, Löschen, Ändern und Überprüfen. Modifikation und Prüfung
public interface MysqlSequenceDAO { /** * */ public int createSequence(MysqlSequenceBo bo); public int updSequence(@Param("seqName") String seqName, @Param("oldValue") long oldValue ,@Param("newValue") long newValue); public int delSequence(@Param("seqName") String seqName); public MysqlSequenceBo getSequence(@Param("seqName") String seqName); public List<MysqlSequenceBo> getAll(); }
<?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > <mapper namespace="com.xxxxx.core.sequence.impl.dao.MysqlSequenceDAO" > <resultMap id="BaseResultMap" type="com.xxxxx.core.sequence.impl.MysqlSequenceBo" > <result column="SEQ_NAME" property="seqName" jdbcType="VARCHAR" /> <result column="SEQ_VALUE" property="seqValue" jdbcType="BIGINT" /> <result column="MIN_VALUE" property="minValue" jdbcType="BIGINT" /> <result column="MAX_VALUE" property="maxValue" jdbcType="BIGINT" /> <result column="STEP" property="step" jdbcType="BIGINT" /> <result column="TM_CREATE" property="tmCreate" jdbcType="TIMESTAMP" /> <result column="TM_SMP" property="tmSmp" jdbcType="TIMESTAMP" /> </resultMap> <delete id="delSequence" parameterType="java.lang.String" > delete from t_pub_sequence where SEQ_NAME = #{seqName,jdbcType=VARCHAR} </delete> <insert id="createSequence" parameterType="com.xxxxx.core.sequence.impl.MysqlSequenceBo" > insert into t_pub_sequence (SEQ_NAME,SEQ_VALUE,MIN_VALUE,MAX_VALUE,STEP,TM_CREATE) values (#{seqName,jdbcType=VARCHAR}, #{seqValue,jdbcType=BIGINT}, #{minValue,jdbcType=BIGINT}, #{maxValue,jdbcType=BIGINT}, #{step,jdbcType=BIGINT}, now()) </insert> <update id="updSequence" parameterType="com.xxxxx.core.sequence.impl.MysqlSequenceBo" > update t_pub_sequence set SEQ_VALUE = #{newValue,jdbcType=BIGINT} where SEQ_NAME = #{seqName,jdbcType=VARCHAR} and SEQ_VALUE = #{oldValue,jdbcType=BIGINT} </update> <select id="getAll" resultMap="BaseResultMap" > select SEQ_NAME, SEQ_VALUE, MIN_VALUE, MAX_VALUE, STEP from t_pub_sequence </select> <select id="getSequence" resultMap="BaseResultMap" > select SEQ_NAME, SEQ_VALUE, MIN_VALUE, MAX_VALUE, STEP from t_pub_sequence where SEQ_NAME = #{seqName,jdbcType=VARCHAR} </select> </mapper>
@Repository("mysqlSequence") public class MysqlSequenceImpl implements MysqlSequence{ @Autowired private MysqlSequenceFactory mysqlSequenceFactory; /** * <p> * 获取指定sequence的序列号 * </p> * * @param seqName sequence名 * @return String 序列号 * @author coderzl */ @Override public String nextVal(String seqName) { return Objects.toString(mysqlSequenceFactory.getNextVal(seqName)); } }
Die Factory macht nur zwei Dinge
•Wenn der Dienst startet, initialisieren Sie alle Sequenzen in der Datenbank [Sequenzintervall-Cache vervollständigen]
•Erhalten Sie den nächsten Wert der Sequenz
@Component public class MysqlSequenceFactory { private final Lock lock = new ReentrantLock(); /** */ private Map<String,MysqlSequenceHolder> holderMap = new ConcurrentHashMap<>(); @Autowired private MysqlSequenceDAO msqlSequenceDAO; /** 单个sequence初始化乐观锁更新失败重试次数 */ @Value("${seq.init.retry:5}") private int initRetryNum; /** 单个sequence更新序列区间乐观锁更新失败重试次数 */ @Value("${seq.get.retry:20}") private int getRetryNum; @PostConstruct private void init(){ //初始化所有sequence initAll(); } /** * <p> 加载表中所有sequence,完成初始化 </p> * @return void * @author coderzl */ private void initAll(){ try { lock.lock(); List<MysqlSequenceBo> boList = msqlSequenceDAO.getAll(); if (boList == null) { throw new IllegalArgumentException("The sequenceRecord is null!"); } for (MysqlSequenceBo bo : boList) { MysqlSequenceHolder holder = new MysqlSequenceHolder(msqlSequenceDAO, bo,initRetryNum,getRetryNum); holder.init(); holderMap.put(bo.getSeqName(), holder); } }finally { lock.unlock(); } } /** * <p> </p> * @param seqName * @return long * @author coderzl */ public long getNextVal(String seqName){ MysqlSequenceHolder holder = holderMap.get(seqName); if (holder == null) { try { lock.lock(); holder = holderMap.get(seqName); if (holder != null){ return holder.getNextVal(); } MysqlSequenceBo bo = msqlSequenceDAO.getSequence(seqName); holder = new MysqlSequenceHolder(msqlSequenceDAO, bo,initRetryNum,getRetryNum); holder.init(); holderMap.put(seqName, holder); }finally { lock.unlock(); } } return holder.getNextVal(); } }
•init()-Initialisierung, einschließlich Parameterüberprüfung , Datenbankdatensatzaktualisierung und Erstellung eines Sequenzintervalls
•getNextVal() ruft den nächsten Wert ab
public class MysqlSequenceHolder { private final Lock lock = new ReentrantLock(); /** seqName */ private String seqName; /** sequenceDao */ private MysqlSequenceDAO sequenceDAO; private MysqlSequenceBo sequenceBo; /** */ private SequenceRange sequenceRange; /** 是否初始化 */ private volatile boolean isInitialize = false; /** sequence初始化重试次数 */ private int initRetryNum; /** sequence获取重试次数 */ private int getRetryNum; /** * <p> 构造方法 </p> * @Title MysqlSequenceHolder * @param sequenceDAO * @param sequenceBo * @param initRetryNum 初始化时,数据库更新失败后重试次数 * @param getRetryNum 获取nextVal时,数据库更新失败后重试次数 * @return * @author coderzl */ public MysqlSequenceHolder(MysqlSequenceDAO sequenceDAO, MysqlSequenceBo sequenceBo,int initRetryNum,int getRetryNum) { this.sequenceDAO = sequenceDAO; this.sequenceBo = sequenceBo; this.initRetryNum = initRetryNum; this.getRetryNum = getRetryNum; if(sequenceBo != null) this.seqName = sequenceBo.getSeqName(); } /** * <p> 初始化 </p> * @Title init * @param * @return void * @author coderzl */ public void init(){ if (isInitialize == true) { throw new SequenceException("[" + seqName + "] the MysqlSequenceHolder has inited"); } if (sequenceDAO == null) { throw new SequenceException("[" + seqName + "] the sequenceDao is null"); } if (seqName == null || seqName.trim().length() == 0) { throw new SequenceException("[" + seqName + "] the sequenceName is null"); } if (sequenceBo == null) { throw new SequenceException("[" + seqName + "] the sequenceBo is null"); } if (!sequenceBo.validate()){ throw new SequenceException("[" + seqName + "] the sequenceBo validate fail. BO:"+sequenceBo); } // 初始化该sequence try { initSequenceRecord(sequenceBo); } catch (SequenceException e) { throw e; } isInitialize = true; } /** * <p> 获取下一个序列号 </p> * @Title getNextVal * @param * @return long * @author coderzl */ public long getNextVal(){ if(isInitialize == false){ throw new SequenceException("[" + seqName + "] the MysqlSequenceHolder not inited"); } if(sequenceRange == null){ throw new SequenceException("[" + seqName + "] the sequenceRange is null"); } long curValue = sequenceRange.getAndIncrement(); if(curValue == -1){ try{ lock.lock(); curValue = sequenceRange.getAndIncrement(); if(curValue != -1){ return curValue; } sequenceRange = retryRange(); curValue = sequenceRange.getAndIncrement(); }finally { lock.unlock(); } } return curValue; } /** * <p> 初始化当前这条记录 </p> * @Title initSequenceRecord * @Description * @param sequenceBo * @return void * @author coderzl */ private void initSequenceRecord(MysqlSequenceBo sequenceBo){ //在限定次数内,乐观锁更新数据库记录 for(int i = 1; i < initRetryNum; i++){ //查询bo MysqlSequenceBo curBo = sequenceDAO.getSequence(sequenceBo.getSeqName()); if(curBo == null){ throw new SequenceException("[" + seqName + "] the current sequenceBo is null"); } if (!curBo.validate()){ throw new SequenceException("[" + seqName + "] the current sequenceBo validate fail"); } //改变当前值 long newValue = curBo.getSeqValue()+curBo.getStep(); //检查当前值 if(!checkCurrentValue(newValue,curBo)){ newValue = resetCurrentValue(curBo); } int result = sequenceDAO.updSequence(sequenceBo.getSeqName(),curBo.getSeqValue(),newValue); if(result > 0){ sequenceRange = new SequenceRange(curBo.getSeqValue(),newValue - 1); curBo.setSeqValue(newValue); this.sequenceBo = curBo; return; }else{ continue; } } //限定次数内,更新失败,抛出异常 throw new SequenceException("[" + seqName + "] sequenceBo update error"); } /** * <p> 检查新值是否合法 新的当前值是否在最大最小值之间</p> * @param curValue * @param curBo * @return boolean * @author coderzl */ private boolean checkCurrentValue(long curValue,MysqlSequenceBo curBo){ if(curValue > curBo.getMinValue() && curValue <= curBo.getMaxValue()){ return true; } return false; } /** * <p> 重置sequence当前值 :当前sequence达到最大值时,重新从最小值开始 </p> * @Title resetCurrentValue * @param curBo * @return long * @author coderzl */ private long resetCurrentValue(MysqlSequenceBo curBo){ return curBo.getMinValue(); } /** * <p> 缓存区间使用完毕时,重新读取数据库记录,缓存新序列段 </p> * @Title retryRange * @param SequenceRange * @author coderzl */ private SequenceRange retryRange(){ for(int i = 1; i < getRetryNum; i++){ //查询bo MysqlSequenceBo curBo = sequenceDAO.getSequence(sequenceBo.getSeqName()); if(curBo == null){ throw new SequenceException("[" + seqName + "] the current sequenceBo is null"); } if (!curBo.validate()){ throw new SequenceException("[" + seqName + "] the current sequenceBo validate fail"); } //改变当前值 long newValue = curBo.getSeqValue()+curBo.getStep(); //检查当前值 if(!checkCurrentValue(newValue,curBo)){ newValue = resetCurrentValue(curBo); } int result = sequenceDAO.updSequence(sequenceBo.getSeqName(),curBo.getSeqValue(),newValue); if(result > 0){ sequenceRange = new SequenceRange(curBo.getSeqValue(),newValue - 1); curBo.setSeqValue(newValue); this.sequenceBo = curBo; return sequenceRange; }else{ continue; } } throw new SequenceException("[" + seqName + "] sequenceBo update error"); } }
• Wenn der Dienst neu gestartet wird oder abnormal ist, geht der aktuelle Wert verloren. Die vom Dienst zwischengespeicherte ungenutzte Sequenz
• In verteilten Szenarien ist es optimistisch, wenn mehrere Dienste gleichzeitig initialisiert werden oder wenn die Sequenz erneut abgerufen wird Durch die Sperrung wird sichergestellt, dass sie nicht miteinander in Konflikt geraten. Dienst A erhält 0–99, Dienst B 100–199 usw.
•Wenn die Sequenz häufiger abgerufen wird, kann eine Erhöhung des Schrittwerts die Leistung verbessern. Wenn der Dienst jedoch abnormal ist, gehen gleichzeitig mehr Sequenzen verloren.
• Ändern Sie einige Attributwerte der Sequenz in der Datenbank, z. B. Schritt, Max usw., und die neuen Parameter werden aktualisiert wird beim nächsten Abrufen aus der Datenbank aktiviert
•sequence stellt nur eine begrenzte Anzahl von Sequenznummern bereit (höchstens Max-Min). Nach Erreichen des Maximums beginnt der Zyklus von vorne.
•Da die Sequenz nach Erreichen des Maximums in einer Schleife läuft, ist sie nicht eindeutig, wenn sie erneut abgerufen wird. Es wird empfohlen, die Reihenfolge für geschäftliche Seriennummern und Verbindungszeiten zu verwenden. Zum Beispiel: 20170612235101+Seriennummer
Unternehmens-ID-Spleißmethode
Das obige ist der detaillierte Inhalt vonDetaillierte Erläuterung der Beispiele für Sequenzimplementierungsmethoden in MySQL. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!