首頁 >後端開發 >php教程 >Twitter的分散式自增ID演算法snowflake

Twitter的分散式自增ID演算法snowflake

伊谢尔伦
伊谢尔伦原創
2017-01-24 14:28:462193瀏覽

Twitter-Snowflake演算法產生的背景相當簡單,為了滿足Twitter每秒上萬條訊息的請求,每個訊息都必須分配一條唯一的id,這些id還需要一些大致的順序(方便客戶端排序),並且在分散式系統中不同機器所產生的id必須不同。

Snowflake演算法核心

把時間戳,工作機器id,序號組合在一起。

Twitter的分散式自增ID演算法snowflake

除了最高位bit標示為不可用以外,其餘三組bit佔位均可浮動,看具體的業務需求而定。預設41bit的時間戳可以支援演算法使用到2082年,10bit的工作機器id可以支援1023台機器,序號支援1毫秒產生4095個自增序列id。下文會具體分析。


Snowflake – 時間戳

這裡時間戳的細度是毫秒級,具體代碼如下,建議使用64位linux系統機器,因為有vdso,gettimeofday()在用戶態就可以完成操作,減少了進入內核態的損耗。

uint64_t generateStamp()
{
    timeval tv;
    gettimeofday(&tv, 0);
    return (uint64_t)tv.tv_sec * 1000 + (uint64_t)tv.tv_usec / 1000;
}

預設有41個bit可以供使用,那麼一共有T(1llu

Snowflake – 工作機器id

嚴格意義上來說這個bit段的使用可以是進程級,機器級的話你可以使用MAC位址來唯一標示工作機器,工作進程級可以使用IP+Path來區分工作進程。如果工作機器比較少,可以使用設定檔來設定這個id是一個不錯的選擇,如果機器過多設定檔的維護是一個災難性的事情。

這裡的解決方案是需要一個工作id分配的進程,可以使用自己寫一個簡單進程來記錄分配id,或者利用Mysql auto_increment機制也可以達到效果。

Twitter的分散式自增ID演算法snowflake

工作進程與工作id分配器只是在工作進程啟動的時候交互一次,然後工作進程可以自行將分配的id資料落文件,下一次啟動直接讀取文件裡的id使用。

PS:這個工作機器id的bit段也可以進一步拆分,例如用前5個bit標記進程id,後5個bit標記線程id之類:D

Snowflake – 序號

序號就是一系列的自增id(多線程建議使用atomic),為了處理在同一毫秒內需要給多個訊息分配id,若同一毫秒把序號用完了,則「等待至下一毫秒」。

uint64_t waitNextMs(uint64_t lastStamp)
{
    uint64_t cur = 0;
    do {
        cur = generateStamp();
    } while (cur <= lastStamp);
    return cur;
}

整體來說,是一個很高效很方便的GUID產生演算法,一個int64_t字段就可以勝任,不像現在主流128bit的GUID演算法,即使無法保證嚴格的id序列性,但是對於特定的業務,例如用做遊戲伺服器端的GUID產生會很方便。另外,在多線程的環境下,序號使用atomic可以在程式碼實現上有效減少鎖的密度。

在分散式系統中,需要產生全域UID的場合還是比較多的,twitter的snowflake解決了這種需求,實現也還是很簡單的,除去配置信息,核心代碼就是毫秒級時間41位+機器ID 10位元+毫秒內序列12位元。

核心程式碼為其IdWorker這個類別實現,其原理結構如下,我分別用一個0表示一位,用—分割開部分的作用:

0---0000000000 0000000000 0000000000 0000000000 0 --- 00000 ---00000 ---000000000000

在上面的字符串中,第一位为未使用(实际上也可作为long的符号位),接下来的41位为毫秒级时间,然后5位datacenter标识位,5位机器ID(并不算标识符,实际是为线程标识),然后12位该毫秒内的当前毫秒内的计数,加起来刚好64位,为一个Long型。

这样的好处是,整体上按照时间自增排序,并且整个分布式系统内不会产生ID碰撞(由datacenter和机器ID作区分),并且效率较高,经测试,snowflake每秒能够产生26万ID左右,完全满足需要。

且看其核心代码:

/** Copyright 2010-2012 Twitter, Inc.*/
package com.twitter.service.snowflake

import com.twitter.ostrich.stats.Stats
import com.twitter.service.snowflake.gen._
import java.util.Random
import com.twitter.logging.Logger

/**
 * An object that generates IDs.
 * This is broken into a separate class in case
 * we ever want to support multiple worker threads
 * per process
 */
class IdWorker(val workerId: Long, val datacenterId: Long, private val reporter: Reporter, var sequence: Long = 0L)
extends Snowflake.Iface {
  private[this] def genCounter(agent: String) = {
    Stats.incr("ids_generated")
    Stats.incr("ids_generated_%s".format(agent))
  }
  private[this] val exceptionCounter = Stats.getCounter("exceptions")
  private[this] val log = Logger.get
  private[this] val rand = new Random

  val twepoch = 1288834974657L

 //机器标识位数

  private[this] val workerIdBits = 5L

//数据中心标识位数
  private[this] val datacenterIdBits = 5L

//机器ID最大值
  private[this] val maxWorkerId = -1L ^ (-1L << workerIdBits)

//数据中心ID最大值
  private[this] val maxDatacenterId = -1L ^ (-1L << datacenterIdBits)

//毫秒内自增位
  private[this] val sequenceBits = 12L

//机器ID偏左移12位

  private[this] val workerIdShift = sequenceBits

//数据中心ID左移17位
  private[this] val datacenterIdShift = sequenceBits + workerIdBits

//时间毫秒左移22位
  private[this] val timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits
  private[this] val sequenceMask = -1L ^ (-1L << sequenceBits)

  private[this] var lastTimestamp = -1L

  // sanity check for workerId
  if (workerId > maxWorkerId || workerId < 0) {
    exceptionCounter.incr(1)
    throw new IllegalArgumentException("worker Id can&#39;t be greater than %d or less than 0".format(maxWorkerId))
  }

  if (datacenterId > maxDatacenterId || datacenterId < 0) {
    exceptionCounter.incr(1)
    throw new IllegalArgumentException("datacenter Id can&#39;t be greater than %d or less than 0".format(maxDatacenterId))
  }

  log.info("worker starting. timestamp left shift %d, datacenter id bits %d, worker id bits %d, sequence bits %d, workerid %d",
    timestampLeftShift, datacenterIdBits, workerIdBits, sequenceBits, workerId)

  def get_id(useragent: String): Long = {
    if (!validUseragent(useragent)) {
      exceptionCounter.incr(1)
      throw new InvalidUserAgentError
    }

    val id = nextId()
    genCounter(useragent)

    reporter.report(new AuditLogEntry(id, useragent, rand.nextLong))
    id
  }

  def get_worker_id(): Long = workerId
  def get_datacenter_id(): Long = datacenterId
  def get_timestamp() = System.currentTimeMillis

  protected[snowflake] def nextId(): Long = synchronized {
    var timestamp = timeGen()

 //时间错误

    if (timestamp < lastTimestamp) {
      exceptionCounter.incr(1)
      log.error("clock is moving backwards.  Rejecting requests until %d.", lastTimestamp);
      throw new InvalidSystemClock("Clock moved backwards.  Refusing to generate id for %d milliseconds".format(
        lastTimestamp - timestamp))
    }

    if (lastTimestamp == timestamp) {
//当前毫秒内,则+1
      sequence = (sequence + 1) & sequenceMask
      if (sequence == 0) {
//当前毫秒内计数满了,则等待下一秒
        timestamp = tilNextMillis(lastTimestamp)
      }
    } else {
      sequence = 0
    }

    lastTimestamp = timestamp
//ID偏移组合生成最终的ID,并返回ID   

((timestamp - twepoch) << timestampLeftShift) |
      (datacenterId << datacenterIdShift) |
      (workerId << workerIdShift) |
      sequence
  }

//等待下一个毫秒的到来 

protected def tilNextMillis(lastTimestamp: Long): Long = {
    var timestamp = timeGen()
    while (timestamp <= lastTimestamp) {
      timestamp = timeGen()
    }
    timestamp
  }

  protected def timeGen(): Long = System.currentTimeMillis()

  val AgentParser = """([a-zA-Z][a-zA-Z\-0-9]*)""".r

  def validUseragent(useragent: String): Boolean = useragent match {
    case AgentParser(_) => true
    case _ => false
  }
}

上述为twitter的实现,下面且看一个Java实现,貌似为淘宝的朋友写的。

public class IdWorker {
 private final long workerId;
 private final static long twepoch = 1361753741828L;
 private long sequence = 0L;
 private final static long workerIdBits = 4L;
 public final static long maxWorkerId = -1L ^ -1L << workerIdBits;
 private final static long sequenceBits = 10L;
 private final static long workerIdShift = sequenceBits;
 private final static long timestampLeftShift = sequenceBits + workerIdBits;
 public final static long sequenceMask = -1L ^ -1L << sequenceBits;
 private long lastTimestamp = -1L;
 public IdWorker(final long workerId) {
  super();
  if (workerId > this.maxWorkerId || workerId < 0) {
   throw new IllegalArgumentException(String.format(
     "worker Id can&#39;t be greater than %d or less than 0",
     this.maxWorkerId));
  }
  this.workerId = workerId;
 }
 public synchronized long nextId() {
  long timestamp = this.timeGen();
  if (this.lastTimestamp == timestamp) {
   this.sequence = (this.sequence + 1) & this.sequenceMask;
   if (this.sequence == 0) {
    System.out.println("###########" + sequenceMask);
    timestamp = this.tilNextMillis(this.lastTimestamp);
   }
  } else {
   this.sequence = 0;
  }
  if (timestamp < this.lastTimestamp) {
   try {
    throw new Exception(
      String.format(
        "Clock moved backwards.  Refusing to generate id for %d milliseconds",
        this.lastTimestamp - timestamp));
   } catch (Exception e) {
    e.printStackTrace();
   }
  }
  this.lastTimestamp = timestamp;
  long nextId = ((timestamp - twepoch << timestampLeftShift))
    | (this.workerId << this.workerIdShift) | (this.sequence);
//  System.out.println("timestamp:" + timestamp + ",timestampLeftShift:"
//    + timestampLeftShift + ",nextId:" + nextId + ",workerId:"
//    + workerId + ",sequence:" + sequence);
  return nextId;
 }
 private long tilNextMillis(final long lastTimestamp) {
  long timestamp = this.timeGen();
  while (timestamp <= lastTimestamp) {
   timestamp = this.timeGen();
  }
  return timestamp;
 }
 private long timeGen() {
  return System.currentTimeMillis();
 }
 
 
 public static void main(String[] args){
  IdWorker worker2 = new IdWorker(2);
  System.out.println(worker2.nextId());
  
 }
}

再来看一个php的实现

<?php
class Idwork
{
const debug = 1;
static $workerId;
static $twepoch = 1361775855078;
static $sequence = 0;
const workerIdBits = 4;
static $maxWorkerId = 15;
const sequenceBits = 10;
static $workerIdShift = 10;
static $timestampLeftShift = 14;
static $sequenceMask = 1023;
private  static $lastTimestamp = -1;
function __construct($workId){
if($workId > self::$maxWorkerId || $workId< 0 )
{
throw new Exception("worker Id can&#39;t be greater than 15 or less than 0");
}
self::$workerId=$workId;
echo &#39;logdebug->__construct()->self::$workerId:&#39;.self::$workerId;
echo &#39;</br>&#39;;
}
function timeGen(){
//获得当前时间戳
$time = explode(&#39; &#39;, microtime());
$time2= substr($time[0], 2, 3);
$timestramp = $time[1].$time2;
echo &#39;logdebug->timeGen()->$timestramp:&#39;.$time[1].$time2;
echo &#39;</br>&#39;;
return  $time[1].$time2;
}
function  tilNextMillis($lastTimestamp) {
$timestamp = $this->timeGen();
while ($timestamp <= $lastTimestamp) {
$timestamp = $this->timeGen();
}
echo &#39;logdebug->tilNextMillis()->$timestamp:&#39;.$timestamp;
echo &#39;</br>&#39;;
return $timestamp;
}
function  nextId()
{
$timestamp=$this->timeGen();
echo &#39;logdebug->nextId()->self::$lastTimestamp1:&#39;.self::$lastTimestamp;
echo &#39;</br>&#39;;
if(self::$lastTimestamp == $timestamp) {
self::$sequence = (self::$sequence + 1) & self::$sequenceMask;
if (self::$sequence == 0) {
    echo "###########".self::$sequenceMask;
    $timestamp = $this->tilNextMillis(self::$lastTimestamp);
    echo &#39;logdebug->nextId()->self::$lastTimestamp2:&#39;.self::$lastTimestamp;
    echo &#39;</br>&#39;;
  }
} else {
self::$sequence  = 0;
    echo &#39;logdebug->nextId()->self::$sequence:&#39;.self::$sequence;
    echo &#39;</br>&#39;;
}
if ($timestamp < self::$lastTimestamp) {
   throw new Excwption("Clock moved backwards.  Refusing to generate id for ".(self::$lastTimestamp-$timestamp)." milliseconds");
   }
self::$lastTimestamp  = $timestamp;
echo &#39;logdebug->nextId()->self::$lastTimestamp3:&#39;.self::$lastTimestamp;
echo &#39;</br>&#39;;
echo &#39;logdebug->nextId()->(($timestamp - self::$twepoch << self::$timestampLeftShift )):&#39;.((sprintf(&#39;%.0f&#39;, $timestamp) - sprintf(&#39;%.0f&#39;, self::$twepoch) ));
echo &#39;</br>&#39;;
$nextId = ((sprintf(&#39;%.0f&#39;, $timestamp) - sprintf(&#39;%.0f&#39;, self::$twepoch)  )) | ( self::$workerId << self::$workerIdShift ) | self::$sequence;
echo &#39;timestamp:&#39;.$timestamp.&#39;-----&#39;;
echo &#39;twepoch:&#39;.sprintf(&#39;%.0f&#39;, self::$twepoch).&#39;-----&#39;;
echo &#39;timestampLeftShift =&#39;.self::$timestampLeftShift.&#39;-----&#39;;
echo &#39;nextId:&#39;.$nextId.&#39;----&#39;;
echo &#39;workId:&#39;.self::$workerId.&#39;-----&#39;;
echo &#39;workerIdShift:&#39;.self::$workerIdShift.&#39;-----&#39;;
return $nextId;
}
}
$Idwork = new Idwork(1);
$a= $Idwork->nextId();
$Idwork = new Idwork(2);
$a= $Idwork->nextId();
?>
陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn