Heim >Backend-Entwicklung >Golang >Skalierung der Funktionen von Zensearch, um die gesamte Datenbank abzufragen

Skalierung der Funktionen von Zensearch, um die gesamte Datenbank abzufragen

Linda Hamilton
Linda HamiltonOriginal
2024-11-14 12:08:02425Durchsuche

Scaling Zensearch

Zuvor konnte ich problemlos Webseiten für meine Suchmaschine crawlen und indizieren, bis meine Datenbank stärker wuchs, als die Nachrichtenwarteschlange von RabbitMQ aufnehmen konnte. Wenn eine Nachricht in einer Nachrichtenwarteschlange ihre Standardgröße überschreitet, gibt RabbitMQ einen Fehler aus und gerät in Panik. Ich könnte die Standardgröße ändern, aber das würde sich nicht skalieren, wenn meine Datenbank wächst, damit Benutzer Webseiten ohne Bedenken crawlen können Der Nachrichtenbroker stürzt ab.

Segmente erstellen

Ich habe eine Funktion zum Erstellen von Segmenten mit einer maximalen Segmentgröße oder MSS nach den gleichen Prinzipien von TCP implementiert. Beim Erstellen von Segmenten enthält das Segment einen 8-Byte-Header, wobei jedes 4 Byte des 8-Byte-Headers die Sequenznummer ist und die Gesamtsegmentanzahl und der Rest des Körpers ist die Nutzlast der segmentierten Datenbank.

// MSS is number in bytes
function createSegments(
  webpages: Array<Webpage>, // webpages queried from database
  MSS: number,
): Array<ArrayBufferLike> {
  const text_encoder = new TextEncoder();
  const encoded_text = text_encoder.encode(JSON.stringify(webpages));
  const data_length = encoded_text.byteLength;
  let currentIndex = 0;
  let segmentCount = Math.trunc(data_length / MSS) + 1; // + 1 to store the remainder
  let segments: Array<ArrayBufferLike> = [];
  let pointerPosition = MSS;

  for (let i = 0; i < segmentCount; i++) {
    let currentDataLength = Math.abs(currentIndex - data_length);

    let slicedArray = encoded_text.slice(currentIndex, pointerPosition);

    currentIndex += slicedArray.byteLength;
    // Add to offset MSS to point to the next segment in the array
    // manipulate pointerPosition to adjust to lower values using Math.min()

    // Is current data length enough to fit MSS?
    // if so add from current position + MSS
    // else get remaining of the currentDataLength
    pointerPosition += Math.min(MSS, currentDataLength);
    const payload = new Uint8Array(slicedArray.length);
    payload.set(slicedArray);
    segments.push(newSegment(i, segmentCount, Buffer.from(payload)));
  }
  return segments;
}

function newSegment(
  sequenceNum: number,
  segmentCount: number,
  payload: Buffer,
): ArrayBufferLike {
  // 4 bytes for sequenceNum 4 bytes for totalSegmentsCount
  const sequenceNumBuffer = convertIntToBuffer(sequenceNum);
  const segmentCountBuffer = convertIntToBuffer(segmentCount);
  const headerBuffer = new ArrayBuffer(8);
  const header = new Uint8Array(headerBuffer);
  header.set(Buffer.concat([sequenceNumBuffer, segmentCountBuffer]));
  return Buffer.concat([header, payload]);
}

function convertIntToBuffer(int: number): Buffer {
  const bytes = Buffer.alloc(4);
  bytes.writeIntLE(int, 0, 4);
  console.log(bytes);
  return bytes;
}

Analysieren eingehender Segmente

Diese Methode zum Erstellen kleiner Segmente eines großen Datensatzes würde dazu beitragen, die Datenbankabfrage zu skalieren, selbst wenn die Datenbank wächst.

Wie analysiert nun die Suchmaschine den Puffer und wandelt jedes Segment in ein Webseiten-Array um?

Lesen aus Segmentpuffern

Extrahieren Sie zunächst den Segment-Header, da der Header zwei Eigenschaften enthält, nämlich Sequenznummer und Gesamtsegmente,

func GetSegmentHeader(buf []byte) (*SegmentHeader, error) {
    byteReader := bytes.NewBuffer(buf)
    headerOffsets := []int{0, 4}
    newSegmentHeader := SegmentHeader{}

    for i := range headerOffsets {
        buffer := make([]byte, 4)
        _, err := byteReader.Read(buffer)
        if err != nil {
            return &SegmentHeader{}, err
        }
        value := binary.LittleEndian.Uint32(buffer)

        // this feels disgusting but i dont feel like bothering with this
        if i == 0 {
            newSegmentHeader.SequenceNum = value
            continue
        }
        newSegmentHeader.TotalSegments = value
    }
    return &newSegmentHeader, nil
}

func GetSegmentPayload(buf []byte) ([]byte, error) {
    headerOffset := 8
    byteReader := bytes.NewBuffer(buf[headerOffset:])
    return byteReader.Bytes(), nil

}

Abwicklung der erneuten Übertragung und erneuten Warteschlangeneinteilung von Segmenten

Die Sequenznummer wird für die Neuübertragung/Neueinreihung der Segmente verwendet. Wenn also die erwartete Sequenznummer nicht mit der empfangenen übereinstimmt, stellen Sie jedes Segment, beginnend mit dem aktuellen, erneut in die Warteschlange.

    // for retransmission/requeuing
        if segmentHeader.SequenceNum != expectedSequenceNum {
            ch.Nack(data.DeliveryTag, true, true)
            log.Printf("Expected Sequence number %d, got %d\n",
                expectedSequenceNum, segmentHeader.SequenceNum)
            continue
        }

Anhängen von Segmentnutzlasten

Das Gesamtsegment wird zum Ausbrechen des Abhörens des Produzenten (Datenbankdienst) verwendet, wenn die Gesamtzahl der von der Suchmaschine empfangenen Segmente gleich der Länge der Gesamtsegmente ist, die vom Datenbankdienst gesendet werden sollen Brechen Sie dann den aggregierten Segmentpuffer aus und analysieren Sie ihn. Wenn nicht, hören Sie weiter zu und hängen Sie den Segment-Nutzlastpuffer an einen Webseitenpuffer an, um Bytes aus allen eingehenden Segmenten zu speichern.

        segmentCounter++
        fmt.Printf("Total Segments : %d\n", segmentHeader.TotalSegments)
        fmt.Printf("current segments : %d\n", segmentCounter)
        expectedSequenceNum++
        ch.Ack(data.DeliveryTag, false)
        webpageBytes = append(webpageBytes, segmentPayload...)
        fmt.Printf("Byte Length: %d\n", len(webpageBytes))

        if segmentCounter == segmentHeader.TotalSegments {
            log.Printf("Got all segments from Database %d", segmentCounter)
            break
        }

Ich verwende übrigens vim

Vielen Dank, dass Sie zu meinem Ted-Vortrag gekommen sind. Ich werde weitere Funktionen und Korrekturen für Zensearch implementieren.

Das obige ist der detaillierte Inhalt vonSkalierung der Funktionen von Zensearch, um die gesamte Datenbank abzufragen. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn