Maison >développement back-end >Golang >Mise à l'échelle des capacités de Zensearch pour interroger l'ensemble de la base de données
Auparavant, j'étais capable d'explorer et d'indexer des pages Web pour mon moteur de recherche sans problème, jusqu'à ce que ma base de données grossisse plus que ce que la file d'attente de messages de RabbitMQ était capable de contenir. Si un message dans une file d'attente de messages dépasse sa taille par défaut, RabbitMQ générera une erreur et paniquera. Je pourrais modifier la taille par défaut mais cela ne s'adapterait pas si ma base de données grandissait, donc pour que les utilisateurs puissent explorer les pages Web sans avoir à s'inquiéter. le courtier de messages plante.
J'ai implémenté une fonction pour créer des segments avec une taille de segment maximale ou MSS à partir des mêmes principes de TCP lors de la création de segments, le segment contient un en-tête de 8 octets où chaque 4 octets de l'en-tête de 8 octets est le numéro de séquence et le nombre total de segments, et le reste du corps est la charge utile de la base de données segmentée.
// MSS is number in bytes function createSegments( webpages: Array<Webpage>, // webpages queried from database MSS: number, ): Array<ArrayBufferLike> { const text_encoder = new TextEncoder(); const encoded_text = text_encoder.encode(JSON.stringify(webpages)); const data_length = encoded_text.byteLength; let currentIndex = 0; let segmentCount = Math.trunc(data_length / MSS) + 1; // + 1 to store the remainder let segments: Array<ArrayBufferLike> = []; let pointerPosition = MSS; for (let i = 0; i < segmentCount; i++) { let currentDataLength = Math.abs(currentIndex - data_length); let slicedArray = encoded_text.slice(currentIndex, pointerPosition); currentIndex += slicedArray.byteLength; // Add to offset MSS to point to the next segment in the array // manipulate pointerPosition to adjust to lower values using Math.min() // Is current data length enough to fit MSS? // if so add from current position + MSS // else get remaining of the currentDataLength pointerPosition += Math.min(MSS, currentDataLength); const payload = new Uint8Array(slicedArray.length); payload.set(slicedArray); segments.push(newSegment(i, segmentCount, Buffer.from(payload))); } return segments; } function newSegment( sequenceNum: number, segmentCount: number, payload: Buffer, ): ArrayBufferLike { // 4 bytes for sequenceNum 4 bytes for totalSegmentsCount const sequenceNumBuffer = convertIntToBuffer(sequenceNum); const segmentCountBuffer = convertIntToBuffer(segmentCount); const headerBuffer = new ArrayBuffer(8); const header = new Uint8Array(headerBuffer); header.set(Buffer.concat([sequenceNumBuffer, segmentCountBuffer])); return Buffer.concat([header, payload]); } function convertIntToBuffer(int: number): Buffer { const bytes = Buffer.alloc(4); bytes.writeIntLE(int, 0, 4); console.log(bytes); return bytes; }
Cette méthode de création de petits segments d'un grand ensemble de données permettrait de faire évoluer la requête de base de données même si la base de données se développe.
Maintenant, comment le moteur de recherche analyse-t-il le tampon et transforme chaque segment en un tableau de page Web ?
Extrayez d'abord l'en-tête du segment, car l'en-tête contient 2 propriétés, à savoir le numéro de séquence et le nombre total de segments,
func GetSegmentHeader(buf []byte) (*SegmentHeader, error) { byteReader := bytes.NewBuffer(buf) headerOffsets := []int{0, 4} newSegmentHeader := SegmentHeader{} for i := range headerOffsets { buffer := make([]byte, 4) _, err := byteReader.Read(buffer) if err != nil { return &SegmentHeader{}, err } value := binary.LittleEndian.Uint32(buffer) // this feels disgusting but i dont feel like bothering with this if i == 0 { newSegmentHeader.SequenceNum = value continue } newSegmentHeader.TotalSegments = value } return &newSegmentHeader, nil } func GetSegmentPayload(buf []byte) ([]byte, error) { headerOffset := 8 byteReader := bytes.NewBuffer(buf[headerOffset:]) return byteReader.Bytes(), nil }
Le numéro de séquence sera utilisé pour la retransmission/mise en file d'attente des segments, donc si le numéro de séquence attendu n'est pas celui qui a été reçu, remettez chaque segment en file d'attente en commençant par celui en cours.
// for retransmission/requeuing if segmentHeader.SequenceNum != expectedSequenceNum { ch.Nack(data.DeliveryTag, true, true) log.Printf("Expected Sequence number %d, got %d\n", expectedSequenceNum, segmentHeader.SequenceNum) continue }
Le segment total sera utilisé pour sortir de l'écoute du producteur (service de base de données) si le nombre total de segments reçus par le moteur de recherche est égal à la longueur du total des segments qui doit être envoyé par le service de base de données puis décomposez et analysez le tampon de segment agrégé, sinon continuez à écouter et ajoutez le tampon de charge utile du segment à un tampon de page Web pour contenir les octets de tous les segments entrants.
segmentCounter++ fmt.Printf("Total Segments : %d\n", segmentHeader.TotalSegments) fmt.Printf("current segments : %d\n", segmentCounter) expectedSequenceNum++ ch.Ack(data.DeliveryTag, false) webpageBytes = append(webpageBytes, segmentPayload...) fmt.Printf("Byte Length: %d\n", len(webpageBytes)) if segmentCounter == segmentHeader.TotalSegments { log.Printf("Got all segments from Database %d", segmentCounter) break }
J'utilise vim d'ailleurs
Merci d'être venu à ma conférence Ted, je vais implémenter plus de fonctionnalités et de correctifs pour zensearch.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!