Maison  >  Article  >  développement back-end  >  Mise à l'échelle des capacités de Zensearch pour interroger l'ensemble de la base de données

Mise à l'échelle des capacités de Zensearch pour interroger l'ensemble de la base de données

Linda Hamilton
Linda Hamiltonoriginal
2024-11-14 12:08:02336parcourir

Scaling Zensearch

Auparavant, j'étais capable d'explorer et d'indexer des pages Web pour mon moteur de recherche sans problème, jusqu'à ce que ma base de données grossisse plus que ce que la file d'attente de messages de RabbitMQ était capable de contenir. Si un message dans une file d'attente de messages dépasse sa taille par défaut, RabbitMQ générera une erreur et paniquera. Je pourrais modifier la taille par défaut mais cela ne s'adapterait pas si ma base de données grandissait, donc pour que les utilisateurs puissent explorer les pages Web sans avoir à s'inquiéter. le courtier de messages plante.

Création de segments

J'ai implémenté une fonction pour créer des segments avec une taille de segment maximale ou MSS à partir des mêmes principes de TCP lors de la création de segments, le segment contient un en-tête de 8 octets où chaque 4 octets de l'en-tête de 8 octets est le numéro de séquence et le nombre total de segments, et le reste du corps est la charge utile de la base de données segmentée.

// MSS is number in bytes
function createSegments(
  webpages: Array<Webpage>, // webpages queried from database
  MSS: number,
): Array<ArrayBufferLike> {
  const text_encoder = new TextEncoder();
  const encoded_text = text_encoder.encode(JSON.stringify(webpages));
  const data_length = encoded_text.byteLength;
  let currentIndex = 0;
  let segmentCount = Math.trunc(data_length / MSS) + 1; // + 1 to store the remainder
  let segments: Array<ArrayBufferLike> = [];
  let pointerPosition = MSS;

  for (let i = 0; i < segmentCount; i++) {
    let currentDataLength = Math.abs(currentIndex - data_length);

    let slicedArray = encoded_text.slice(currentIndex, pointerPosition);

    currentIndex += slicedArray.byteLength;
    // Add to offset MSS to point to the next segment in the array
    // manipulate pointerPosition to adjust to lower values using Math.min()

    // Is current data length enough to fit MSS?
    // if so add from current position + MSS
    // else get remaining of the currentDataLength
    pointerPosition += Math.min(MSS, currentDataLength);
    const payload = new Uint8Array(slicedArray.length);
    payload.set(slicedArray);
    segments.push(newSegment(i, segmentCount, Buffer.from(payload)));
  }
  return segments;
}

function newSegment(
  sequenceNum: number,
  segmentCount: number,
  payload: Buffer,
): ArrayBufferLike {
  // 4 bytes for sequenceNum 4 bytes for totalSegmentsCount
  const sequenceNumBuffer = convertIntToBuffer(sequenceNum);
  const segmentCountBuffer = convertIntToBuffer(segmentCount);
  const headerBuffer = new ArrayBuffer(8);
  const header = new Uint8Array(headerBuffer);
  header.set(Buffer.concat([sequenceNumBuffer, segmentCountBuffer]));
  return Buffer.concat([header, payload]);
}

function convertIntToBuffer(int: number): Buffer {
  const bytes = Buffer.alloc(4);
  bytes.writeIntLE(int, 0, 4);
  console.log(bytes);
  return bytes;
}

Analyser les segments entrants

Cette méthode de création de petits segments d'un grand ensemble de données permettrait de faire évoluer la requête de base de données même si la base de données se développe.

Maintenant, comment le moteur de recherche analyse-t-il le tampon et transforme chaque segment en un tableau de page Web ?

Lecture à partir de tampons de segments

Extrayez d'abord l'en-tête du segment, car l'en-tête contient 2 propriétés, à savoir le numéro de séquence et le nombre total de segments,

func GetSegmentHeader(buf []byte) (*SegmentHeader, error) {
    byteReader := bytes.NewBuffer(buf)
    headerOffsets := []int{0, 4}
    newSegmentHeader := SegmentHeader{}

    for i := range headerOffsets {
        buffer := make([]byte, 4)
        _, err := byteReader.Read(buffer)
        if err != nil {
            return &SegmentHeader{}, err
        }
        value := binary.LittleEndian.Uint32(buffer)

        // this feels disgusting but i dont feel like bothering with this
        if i == 0 {
            newSegmentHeader.SequenceNum = value
            continue
        }
        newSegmentHeader.TotalSegments = value
    }
    return &newSegmentHeader, nil
}

func GetSegmentPayload(buf []byte) ([]byte, error) {
    headerOffset := 8
    byteReader := bytes.NewBuffer(buf[headerOffset:])
    return byteReader.Bytes(), nil

}

Gestion de la retransmission et de la remise en file d'attente des segments

Le numéro de séquence sera utilisé pour la retransmission/mise en file d'attente des segments, donc si le numéro de séquence attendu n'est pas celui qui a été reçu, remettez chaque segment en file d'attente en commençant par celui en cours.

    // for retransmission/requeuing
        if segmentHeader.SequenceNum != expectedSequenceNum {
            ch.Nack(data.DeliveryTag, true, true)
            log.Printf("Expected Sequence number %d, got %d\n",
                expectedSequenceNum, segmentHeader.SequenceNum)
            continue
        }

Ajout de charges utiles de segment

Le segment total sera utilisé pour sortir de l'écoute du producteur (service de base de données) si le nombre total de segments reçus par le moteur de recherche est égal à la longueur du total des segments qui doit être envoyé par le service de base de données puis décomposez et analysez le tampon de segment agrégé, sinon continuez à écouter et ajoutez le tampon de charge utile du segment à un tampon de page Web pour contenir les octets de tous les segments entrants.

        segmentCounter++
        fmt.Printf("Total Segments : %d\n", segmentHeader.TotalSegments)
        fmt.Printf("current segments : %d\n", segmentCounter)
        expectedSequenceNum++
        ch.Ack(data.DeliveryTag, false)
        webpageBytes = append(webpageBytes, segmentPayload...)
        fmt.Printf("Byte Length: %d\n", len(webpageBytes))

        if segmentCounter == segmentHeader.TotalSegments {
            log.Printf("Got all segments from Database %d", segmentCounter)
            break
        }

J'utilise vim d'ailleurs

Merci d'être venu à ma conférence Ted, je vais implémenter plus de fonctionnalités et de correctifs pour zensearch.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn