이전에는 RabbitMQ의 메시지 대기열이 감당할 수 있는 것보다 데이터베이스가 더 커질 때까지 문제 없이 검색 엔진의 웹 페이지를 크롤링하고 색인을 생성할 수 있었습니다. 메시지 큐의 메시지가 기본 크기를 초과하면 RabbitMQ는 오류와 패닉을 발생시킵니다. 기본 크기를 변경할 수 있지만 데이터베이스가 커지면 크기가 조정되지 않으므로 사용자가 걱정할 필요 없이 웹 페이지를 크롤링할 수 있습니다. 메시지 브로커가 충돌합니다.
세그먼트를 생성할 때 TCP와 동일한 원리로 최대 세그먼트 크기 또는 MSS로 세그먼트를 생성하는 기능을 구현했습니다. 세그먼트에는 8바이트 헤더의 각 4바이트가 시퀀스 번호이고, 전체 세그먼트 수이고 본문의 나머지 부분은 세그먼트화된 데이터베이스의 페이로드입니다.
// MSS is number in bytes function createSegments( webpages: Array<Webpage>, // webpages queried from database MSS: number, ): Array<ArrayBufferLike> { const text_encoder = new TextEncoder(); const encoded_text = text_encoder.encode(JSON.stringify(webpages)); const data_length = encoded_text.byteLength; let currentIndex = 0; let segmentCount = Math.trunc(data_length / MSS) + 1; // + 1 to store the remainder let segments: Array<ArrayBufferLike> = []; let pointerPosition = MSS; for (let i = 0; i < segmentCount; i++) { let currentDataLength = Math.abs(currentIndex - data_length); let slicedArray = encoded_text.slice(currentIndex, pointerPosition); currentIndex += slicedArray.byteLength; // Add to offset MSS to point to the next segment in the array // manipulate pointerPosition to adjust to lower values using Math.min() // Is current data length enough to fit MSS? // if so add from current position + MSS // else get remaining of the currentDataLength pointerPosition += Math.min(MSS, currentDataLength); const payload = new Uint8Array(slicedArray.length); payload.set(slicedArray); segments.push(newSegment(i, segmentCount, Buffer.from(payload))); } return segments; } function newSegment( sequenceNum: number, segmentCount: number, payload: Buffer, ): ArrayBufferLike { // 4 bytes for sequenceNum 4 bytes for totalSegmentsCount const sequenceNumBuffer = convertIntToBuffer(sequenceNum); const segmentCountBuffer = convertIntToBuffer(segmentCount); const headerBuffer = new ArrayBuffer(8); const header = new Uint8Array(headerBuffer); header.set(Buffer.concat([sequenceNumBuffer, segmentCountBuffer])); return Buffer.concat([header, payload]); } function convertIntToBuffer(int: number): Buffer { const bytes = Buffer.alloc(4); bytes.writeIntLE(int, 0, 4); console.log(bytes); return bytes; }
대규모 데이터 세트의 작은 세그먼트를 생성하는 이 방법은 데이터베이스가 커지더라도 데이터베이스 쿼리를 확장하는 데 도움이 됩니다.
이제 검색 엔진은 어떻게 버퍼를 구문 분석하고 각 세그먼트를 웹 페이지 배열로 변환합니까?
헤더에는 시퀀스 번호와 총 세그먼트라는 두 가지 속성이 포함되어 있으므로 먼저 세그먼트 헤더를 추출합니다.
func GetSegmentHeader(buf []byte) (*SegmentHeader, error) { byteReader := bytes.NewBuffer(buf) headerOffsets := []int{0, 4} newSegmentHeader := SegmentHeader{} for i := range headerOffsets { buffer := make([]byte, 4) _, err := byteReader.Read(buffer) if err != nil { return &SegmentHeader{}, err } value := binary.LittleEndian.Uint32(buffer) // this feels disgusting but i dont feel like bothering with this if i == 0 { newSegmentHeader.SequenceNum = value continue } newSegmentHeader.TotalSegments = value } return &newSegmentHeader, nil } func GetSegmentPayload(buf []byte) ([]byte, error) { headerOffset := 8 byteReader := bytes.NewBuffer(buf[headerOffset:]) return byteReader.Bytes(), nil }
시퀀스 번호는 세그먼트 재전송/재큐잉에 사용됩니다. 따라서 예상되는 시퀀스 번호가 수신된 번호가 아닌 경우 현재 세그먼트부터 시작하여 모든 세그먼트를 다시 대기열에 추가하세요.
// for retransmission/requeuing if segmentHeader.SequenceNum != expectedSequenceNum { ch.Nack(data.DeliveryTag, true, true) log.Printf("Expected Sequence number %d, got %d\n", expectedSequenceNum, segmentHeader.SequenceNum) continue }
검색 엔진이 수신한 총 세그먼트 수가 데이터베이스 서비스에서 전송하려는 총 세그먼트의 길이와 동일한 경우 총 세그먼트는 생산자(데이터베이스 서비스)의 청취를 중단하는 데 사용됩니다. 그런 다음 집계된 세그먼트 버퍼를 분리하여 구문 분석합니다. 그렇지 않은 경우 계속 수신하고 세그먼트 페이로드 버퍼를 웹 페이지 버퍼에 추가하여 들어오는 모든 세그먼트의 바이트를 보관합니다.
segmentCounter++ fmt.Printf("Total Segments : %d\n", segmentHeader.TotalSegments) fmt.Printf("current segments : %d\n", segmentCounter) expectedSequenceNum++ ch.Ack(data.DeliveryTag, false) webpageBytes = append(webpageBytes, segmentPayload...) fmt.Printf("Byte Length: %d\n", len(webpageBytes)) if segmentCounter == segmentHeader.TotalSegments { log.Printf("Got all segments from Database %d", segmentCounter) break }
저는 vim을 사용합니다
테드 토크에 와주셔서 감사합니다. zensearch에 더 많은 기능과 수정 사항을 구현하겠습니다.
위 내용은 전체 데이터베이스를 쿼리할 수 있도록 Zensearch 기능 확장의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!