ホームページ >バックエンド開発 >Golang >Zensearch の機能を拡張してデータベース全体をクエリする

Zensearch の機能を拡張してデータベース全体をクエリする

Linda Hamilton
Linda Hamiltonオリジナル
2024-11-14 12:08:02409ブラウズ

Scaling Zensearch

これまでは、データベースが RabbitMQ のメッセージ キューが保持できる容量を超えるまでは、問題なく検索エンジンの Web ページをクロールしてインデックスを付けることができました。メッセージ キュー内のメッセージがデフォルトのサイズを超えると、RabbitMQ はエラーをスローしてパニックになります。デフォルトのサイズを変更することもできますが、データベースが大きくなると拡張できなくなります。そのため、ユーザーが心配することなく Web ページをクロールできるようにするためです。メッセージ ブローカーがクラッシュしています。

セグメントの作成

セグメントを作成するときに TCP と同じ原則に基づいて、最大セグメント サイズまたは MSS を使用してセグメントを作成する関数を実装しました。セグメントには 8 バイトのヘッダーが含まれており、8 バイトのヘッダーの各 4 バイトはシーケンス番号であり、合計セグメント数、本体の残りの部分はセグメント化されたデータベースのペイロードです。

// MSS is number in bytes
function createSegments(
  webpages: Array<Webpage>, // webpages queried from database
  MSS: number,
): Array<ArrayBufferLike> {
  const text_encoder = new TextEncoder();
  const encoded_text = text_encoder.encode(JSON.stringify(webpages));
  const data_length = encoded_text.byteLength;
  let currentIndex = 0;
  let segmentCount = Math.trunc(data_length / MSS) + 1; // + 1 to store the remainder
  let segments: Array<ArrayBufferLike> = [];
  let pointerPosition = MSS;

  for (let i = 0; i < segmentCount; i++) {
    let currentDataLength = Math.abs(currentIndex - data_length);

    let slicedArray = encoded_text.slice(currentIndex, pointerPosition);

    currentIndex += slicedArray.byteLength;
    // Add to offset MSS to point to the next segment in the array
    // manipulate pointerPosition to adjust to lower values using Math.min()

    // Is current data length enough to fit MSS?
    // if so add from current position + MSS
    // else get remaining of the currentDataLength
    pointerPosition += Math.min(MSS, currentDataLength);
    const payload = new Uint8Array(slicedArray.length);
    payload.set(slicedArray);
    segments.push(newSegment(i, segmentCount, Buffer.from(payload)));
  }
  return segments;
}

function newSegment(
  sequenceNum: number,
  segmentCount: number,
  payload: Buffer,
): ArrayBufferLike {
  // 4 bytes for sequenceNum 4 bytes for totalSegmentsCount
  const sequenceNumBuffer = convertIntToBuffer(sequenceNum);
  const segmentCountBuffer = convertIntToBuffer(segmentCount);
  const headerBuffer = new ArrayBuffer(8);
  const header = new Uint8Array(headerBuffer);
  header.set(Buffer.concat([sequenceNumBuffer, segmentCountBuffer]));
  return Buffer.concat([header, payload]);
}

function convertIntToBuffer(int: number): Buffer {
  const bytes = Buffer.alloc(4);
  bytes.writeIntLE(int, 0, 4);
  console.log(bytes);
  return bytes;
}

受信セグメントの解析

大規模なデータセットの小さなセグメントを作成するこの方法は、データベースが拡大した場合でもデータベース クエリを拡張するのに役立ちます。

では、検索エンジンはどのようにしてバッファを解析し、各セグメントを Web ページの配列に変換するのでしょうか?

セグメントバッファからの読み取り

ヘッダーにはシーケンス番号と合計セグメントという 2 つのプロパティが含まれているため、最初にセグメント ヘッダーを抽出します。

func GetSegmentHeader(buf []byte) (*SegmentHeader, error) {
    byteReader := bytes.NewBuffer(buf)
    headerOffsets := []int{0, 4}
    newSegmentHeader := SegmentHeader{}

    for i := range headerOffsets {
        buffer := make([]byte, 4)
        _, err := byteReader.Read(buffer)
        if err != nil {
            return &SegmentHeader{}, err
        }
        value := binary.LittleEndian.Uint32(buffer)

        // this feels disgusting but i dont feel like bothering with this
        if i == 0 {
            newSegmentHeader.SequenceNum = value
            continue
        }
        newSegmentHeader.TotalSegments = value
    }
    return &newSegmentHeader, nil
}

func GetSegmentPayload(buf []byte) ([]byte, error) {
    headerOffset := 8
    byteReader := bytes.NewBuffer(buf[headerOffset:])
    return byteReader.Bytes(), nil

}

セグメントの再送信と再キューイングの処理

シーケンス番号はセグメントの再送信/再キューイングに使用されるため、予期したシーケンス番号が受信したものではない場合は、現在のセグメントから始まるすべてのセグメントを再キューイングします。

    // for retransmission/requeuing
        if segmentHeader.SequenceNum != expectedSequenceNum {
            ch.Nack(data.DeliveryTag, true, true)
            log.Printf("Expected Sequence number %d, got %d\n",
                expectedSequenceNum, segmentHeader.SequenceNum)
            continue
        }

セグメントペイロードの追加

検索エンジンが受信したセグメントの合計数がデータベース サービスによって送信されるセグメントの合計の長さと等しい場合、セグメントの合計はプロデューサー (データベース サービス) のリスニングを中断するために使用されます。次に、集約されたセグメント バッファを分割して解析し、そうでない場合はリスニングを続け、セグメント ペイロード バッファを Web ページ バッファに追加して、すべての受信セグメントからのバイトを保持します。

        segmentCounter++
        fmt.Printf("Total Segments : %d\n", segmentHeader.TotalSegments)
        fmt.Printf("current segments : %d\n", segmentCounter)
        expectedSequenceNum++
        ch.Ack(data.DeliveryTag, false)
        webpageBytes = append(webpageBytes, segmentPayload...)
        fmt.Printf("Byte Length: %d\n", len(webpageBytes))

        if segmentCounter == segmentHeader.TotalSegments {
            log.Printf("Got all segments from Database %d", segmentCounter)
            break
        }

ところで vim を使っています

私の ted トークにお越しいただきありがとうございます。zensearch にはさらに多くの機能と修正を実装する予定です。

以上がZensearch の機能を拡張してデータベース全体をクエリするの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。