>백엔드 개발 >Golang >전체 데이터베이스를 쿼리할 수 있도록 Zensearch 기능 확장

전체 데이터베이스를 쿼리할 수 있도록 Zensearch 기능 확장

Linda Hamilton
Linda Hamilton원래의
2024-11-14 12:08:02409검색

Scaling Zensearch

이전에는 RabbitMQ의 메시지 대기열이 감당할 수 있는 것보다 데이터베이스가 더 커질 때까지 문제 없이 검색 엔진의 웹 페이지를 크롤링하고 색인을 생성할 수 있었습니다. 메시지 큐의 메시지가 기본 크기를 초과하면 RabbitMQ는 오류와 패닉을 발생시킵니다. 기본 크기를 변경할 수 있지만 데이터베이스가 커지면 크기가 조정되지 않으므로 사용자가 걱정할 필요 없이 웹 페이지를 크롤링할 수 있습니다. 메시지 브로커가 충돌합니다.

세그먼트 생성

세그먼트를 생성할 때 TCP와 동일한 원리로 최대 세그먼트 크기 또는 MSS로 세그먼트를 생성하는 기능을 구현했습니다. 세그먼트에는 8바이트 헤더의 각 4바이트가 시퀀스 번호이고, 전체 세그먼트 수이고 본문의 나머지 부분은 세그먼트화된 데이터베이스의 페이로드입니다.

// MSS is number in bytes
function createSegments(
  webpages: Array<Webpage>, // webpages queried from database
  MSS: number,
): Array<ArrayBufferLike> {
  const text_encoder = new TextEncoder();
  const encoded_text = text_encoder.encode(JSON.stringify(webpages));
  const data_length = encoded_text.byteLength;
  let currentIndex = 0;
  let segmentCount = Math.trunc(data_length / MSS) + 1; // + 1 to store the remainder
  let segments: Array<ArrayBufferLike> = [];
  let pointerPosition = MSS;

  for (let i = 0; i < segmentCount; i++) {
    let currentDataLength = Math.abs(currentIndex - data_length);

    let slicedArray = encoded_text.slice(currentIndex, pointerPosition);

    currentIndex += slicedArray.byteLength;
    // Add to offset MSS to point to the next segment in the array
    // manipulate pointerPosition to adjust to lower values using Math.min()

    // Is current data length enough to fit MSS?
    // if so add from current position + MSS
    // else get remaining of the currentDataLength
    pointerPosition += Math.min(MSS, currentDataLength);
    const payload = new Uint8Array(slicedArray.length);
    payload.set(slicedArray);
    segments.push(newSegment(i, segmentCount, Buffer.from(payload)));
  }
  return segments;
}

function newSegment(
  sequenceNum: number,
  segmentCount: number,
  payload: Buffer,
): ArrayBufferLike {
  // 4 bytes for sequenceNum 4 bytes for totalSegmentsCount
  const sequenceNumBuffer = convertIntToBuffer(sequenceNum);
  const segmentCountBuffer = convertIntToBuffer(segmentCount);
  const headerBuffer = new ArrayBuffer(8);
  const header = new Uint8Array(headerBuffer);
  header.set(Buffer.concat([sequenceNumBuffer, segmentCountBuffer]));
  return Buffer.concat([header, payload]);
}

function convertIntToBuffer(int: number): Buffer {
  const bytes = Buffer.alloc(4);
  bytes.writeIntLE(int, 0, 4);
  console.log(bytes);
  return bytes;
}

수신 세그먼트 구문 분석

대규모 데이터 세트의 작은 세그먼트를 생성하는 이 방법은 데이터베이스가 커지더라도 데이터베이스 쿼리를 확장하는 데 도움이 됩니다.

이제 검색 엔진은 어떻게 버퍼를 구문 분석하고 각 세그먼트를 웹 페이지 배열로 변환합니까?

세그먼트 버퍼에서 읽기

헤더에는 시퀀스 번호와 총 세그먼트라는 두 가지 속성이 포함되어 있으므로 먼저 세그먼트 헤더를 추출합니다.

func GetSegmentHeader(buf []byte) (*SegmentHeader, error) {
    byteReader := bytes.NewBuffer(buf)
    headerOffsets := []int{0, 4}
    newSegmentHeader := SegmentHeader{}

    for i := range headerOffsets {
        buffer := make([]byte, 4)
        _, err := byteReader.Read(buffer)
        if err != nil {
            return &SegmentHeader{}, err
        }
        value := binary.LittleEndian.Uint32(buffer)

        // this feels disgusting but i dont feel like bothering with this
        if i == 0 {
            newSegmentHeader.SequenceNum = value
            continue
        }
        newSegmentHeader.TotalSegments = value
    }
    return &newSegmentHeader, nil
}

func GetSegmentPayload(buf []byte) ([]byte, error) {
    headerOffset := 8
    byteReader := bytes.NewBuffer(buf[headerOffset:])
    return byteReader.Bytes(), nil

}

재전송 처리 및 세그먼트 재큐잉

시퀀스 번호는 세그먼트 재전송/재큐잉에 사용됩니다. 따라서 예상되는 시퀀스 번호가 수신된 번호가 아닌 경우 현재 세그먼트부터 시작하여 모든 세그먼트를 다시 대기열에 추가하세요.

    // for retransmission/requeuing
        if segmentHeader.SequenceNum != expectedSequenceNum {
            ch.Nack(data.DeliveryTag, true, true)
            log.Printf("Expected Sequence number %d, got %d\n",
                expectedSequenceNum, segmentHeader.SequenceNum)
            continue
        }

세그먼트 페이로드 추가

검색 엔진이 수신한 총 세그먼트 수가 데이터베이스 서비스에서 전송하려는 총 세그먼트의 길이와 동일한 경우 총 세그먼트는 생산자(데이터베이스 서비스)의 청취를 중단하는 데 사용됩니다. 그런 다음 집계된 세그먼트 버퍼를 분리하여 구문 분석합니다. 그렇지 않은 경우 계속 수신하고 세그먼트 페이로드 버퍼를 웹 페이지 버퍼에 추가하여 들어오는 모든 세그먼트의 바이트를 보관합니다.

        segmentCounter++
        fmt.Printf("Total Segments : %d\n", segmentHeader.TotalSegments)
        fmt.Printf("current segments : %d\n", segmentCounter)
        expectedSequenceNum++
        ch.Ack(data.DeliveryTag, false)
        webpageBytes = append(webpageBytes, segmentPayload...)
        fmt.Printf("Byte Length: %d\n", len(webpageBytes))

        if segmentCounter == segmentHeader.TotalSegments {
            log.Printf("Got all segments from Database %d", segmentCounter)
            break
        }

저는 vim을 사용합니다

테드 토크에 와주셔서 감사합니다. zensearch에 더 많은 기능과 수정 사항을 구현하겠습니다.

위 내용은 전체 데이터베이스를 쿼리할 수 있도록 Zensearch 기능 확장의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.