首頁 >後端開發 >Golang >擴展 Zensearch 的能力來查詢整個資料庫

擴展 Zensearch 的能力來查詢整個資料庫

Linda Hamilton
Linda Hamilton原創
2024-11-14 12:08:02404瀏覽

Scaling Zensearch

以前,我能夠毫無問題地為我的搜尋引擎抓取網頁並為其建立索引,直到我的資料庫增長超過 RabbitMQ 訊息佇列的容納能力。如果訊息佇列中的訊息超過其預設大小,RabbitMQ 會拋出錯誤並引發恐慌,我可以更改預設大小,但如果我的資料庫成長,則不會擴展,因此為了讓使用者抓取網頁而不必擔心訊息代理崩潰了。

建立段

我已經實現了一個函數,用於創建具有最大段大小或MSS 的段,其原理與創建段時TCP 的原理相同,該段包含一個8 字節標頭,其中8 字節標頭中的每個4 位元組是序號,分段總數,主體的其餘部分是分段資料庫的有效負載。

// MSS is number in bytes
function createSegments(
  webpages: Array<Webpage>, // webpages queried from database
  MSS: number,
): Array<ArrayBufferLike> {
  const text_encoder = new TextEncoder();
  const encoded_text = text_encoder.encode(JSON.stringify(webpages));
  const data_length = encoded_text.byteLength;
  let currentIndex = 0;
  let segmentCount = Math.trunc(data_length / MSS) + 1; // + 1 to store the remainder
  let segments: Array<ArrayBufferLike> = [];
  let pointerPosition = MSS;

  for (let i = 0; i < segmentCount; i++) {
    let currentDataLength = Math.abs(currentIndex - data_length);

    let slicedArray = encoded_text.slice(currentIndex, pointerPosition);

    currentIndex += slicedArray.byteLength;
    // Add to offset MSS to point to the next segment in the array
    // manipulate pointerPosition to adjust to lower values using Math.min()

    // Is current data length enough to fit MSS?
    // if so add from current position + MSS
    // else get remaining of the currentDataLength
    pointerPosition += Math.min(MSS, currentDataLength);
    const payload = new Uint8Array(slicedArray.length);
    payload.set(slicedArray);
    segments.push(newSegment(i, segmentCount, Buffer.from(payload)));
  }
  return segments;
}

function newSegment(
  sequenceNum: number,
  segmentCount: number,
  payload: Buffer,
): ArrayBufferLike {
  // 4 bytes for sequenceNum 4 bytes for totalSegmentsCount
  const sequenceNumBuffer = convertIntToBuffer(sequenceNum);
  const segmentCountBuffer = convertIntToBuffer(segmentCount);
  const headerBuffer = new ArrayBuffer(8);
  const header = new Uint8Array(headerBuffer);
  header.set(Buffer.concat([sequenceNumBuffer, segmentCountBuffer]));
  return Buffer.concat([header, payload]);
}

function convertIntToBuffer(int: number): Buffer {
  const bytes = Buffer.alloc(4);
  bytes.writeIntLE(int, 0, 4);
  console.log(bytes);
  return bytes;
}

解析傳入的段

這種建立大型資料集的小片段的方法將有助於擴展資料庫查詢,即使資料庫成長也是如此。

現在搜尋引擎如何解析緩衝區並將每個段轉換為網頁數組?

從段緩衝區讀取

先擷取段標頭,因為標頭包含 2 個屬性,即序號和總段數,

func GetSegmentHeader(buf []byte) (*SegmentHeader, error) {
    byteReader := bytes.NewBuffer(buf)
    headerOffsets := []int{0, 4}
    newSegmentHeader := SegmentHeader{}

    for i := range headerOffsets {
        buffer := make([]byte, 4)
        _, err := byteReader.Read(buffer)
        if err != nil {
            return &SegmentHeader{}, err
        }
        value := binary.LittleEndian.Uint32(buffer)

        // this feels disgusting but i dont feel like bothering with this
        if i == 0 {
            newSegmentHeader.SequenceNum = value
            continue
        }
        newSegmentHeader.TotalSegments = value
    }
    return &newSegmentHeader, nil
}

func GetSegmentPayload(buf []byte) ([]byte, error) {
    headerOffset := 8
    byteReader := bytes.NewBuffer(buf[headerOffset:])
    return byteReader.Bytes(), nil

}

處理段的重傳與重新排隊

序號將用於分段的重傳/重新排隊,因此如果預期的序號不是收到的序號,則從目前分段開始重新排隊每個分段。

    // for retransmission/requeuing
        if segmentHeader.SequenceNum != expectedSequenceNum {
            ch.Nack(data.DeliveryTag, true, true)
            log.Printf("Expected Sequence number %d, got %d\n",
                expectedSequenceNum, segmentHeader.SequenceNum)
            continue
        }

附加段有效負載

如果搜尋引擎接收到的段總數等於資料庫服務要傳送的總段長度,則總段將用於中斷監聽生產者(資料庫服務)然後中斷並解析聚合的段緩衝區,如果不是,則繼續偵聽並將段有效負載緩衝區附加到網頁緩衝區以保存來自所有傳入段的位元組。

        segmentCounter++
        fmt.Printf("Total Segments : %d\n", segmentHeader.TotalSegments)
        fmt.Printf("current segments : %d\n", segmentCounter)
        expectedSequenceNum++
        ch.Ack(data.DeliveryTag, false)
        webpageBytes = append(webpageBytes, segmentPayload...)
        fmt.Printf("Byte Length: %d\n", len(webpageBytes))

        if segmentCounter == segmentHeader.TotalSegments {
            log.Printf("Got all segments from Database %d", segmentCounter)
            break
        }

順便說一句,我用 vim

感謝您參加我的 ted 演講,我將為 zensearch 實現更多功能和修復。

以上是擴展 Zensearch 的能力來查詢整個資料庫的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn