首页 >后端开发 >Golang >扩展 Zensearch 的能力来查询整个数据库

扩展 Zensearch 的能力来查询整个数据库

Linda Hamilton
Linda Hamilton原创
2024-11-14 12:08:02409浏览

Scaling Zensearch

以前,我能够毫无问题地为我的搜索引擎抓取网页并为其建立索引,直到我的数据库增长超过 RabbitMQ 消息队列的容纳能力。如果消息队列中的消息超过其默认大小,RabbitMQ 会抛出错误并引发恐慌,我可以更改默认大小,但如果我的数据库增长,则不会扩展,因此为了让用户抓取网页而不必担心消息代理崩溃了。

创建段

我已经实现了一个函数,用于创建具有最大段大小或 MSS 的段,其原理与创建段时 TCP 的原理相同,该段包含一个 8 字节标头,其中 8 字节标头中的每个 4 字节是序列号,分段总数,主体的其余部分是分段数据库的有效负载。

// MSS is number in bytes
function createSegments(
  webpages: Array<Webpage>, // webpages queried from database
  MSS: number,
): Array<ArrayBufferLike> {
  const text_encoder = new TextEncoder();
  const encoded_text = text_encoder.encode(JSON.stringify(webpages));
  const data_length = encoded_text.byteLength;
  let currentIndex = 0;
  let segmentCount = Math.trunc(data_length / MSS) + 1; // + 1 to store the remainder
  let segments: Array<ArrayBufferLike> = [];
  let pointerPosition = MSS;

  for (let i = 0; i < segmentCount; i++) {
    let currentDataLength = Math.abs(currentIndex - data_length);

    let slicedArray = encoded_text.slice(currentIndex, pointerPosition);

    currentIndex += slicedArray.byteLength;
    // Add to offset MSS to point to the next segment in the array
    // manipulate pointerPosition to adjust to lower values using Math.min()

    // Is current data length enough to fit MSS?
    // if so add from current position + MSS
    // else get remaining of the currentDataLength
    pointerPosition += Math.min(MSS, currentDataLength);
    const payload = new Uint8Array(slicedArray.length);
    payload.set(slicedArray);
    segments.push(newSegment(i, segmentCount, Buffer.from(payload)));
  }
  return segments;
}

function newSegment(
  sequenceNum: number,
  segmentCount: number,
  payload: Buffer,
): ArrayBufferLike {
  // 4 bytes for sequenceNum 4 bytes for totalSegmentsCount
  const sequenceNumBuffer = convertIntToBuffer(sequenceNum);
  const segmentCountBuffer = convertIntToBuffer(segmentCount);
  const headerBuffer = new ArrayBuffer(8);
  const header = new Uint8Array(headerBuffer);
  header.set(Buffer.concat([sequenceNumBuffer, segmentCountBuffer]));
  return Buffer.concat([header, payload]);
}

function convertIntToBuffer(int: number): Buffer {
  const bytes = Buffer.alloc(4);
  bytes.writeIntLE(int, 0, 4);
  console.log(bytes);
  return bytes;
}

解析传入的段

这种创建大型数据集的小片段的方法将有助于扩展数据库查询,即使数据库增长也是如此。

现在搜索引擎如何解析缓冲区并将每个段转换为网页数组?

从段缓冲区读取

首先提取段标头,因为标头包含 2 个属性,即序列号和总段数,

func GetSegmentHeader(buf []byte) (*SegmentHeader, error) {
    byteReader := bytes.NewBuffer(buf)
    headerOffsets := []int{0, 4}
    newSegmentHeader := SegmentHeader{}

    for i := range headerOffsets {
        buffer := make([]byte, 4)
        _, err := byteReader.Read(buffer)
        if err != nil {
            return &SegmentHeader{}, err
        }
        value := binary.LittleEndian.Uint32(buffer)

        // this feels disgusting but i dont feel like bothering with this
        if i == 0 {
            newSegmentHeader.SequenceNum = value
            continue
        }
        newSegmentHeader.TotalSegments = value
    }
    return &newSegmentHeader, nil
}

func GetSegmentPayload(buf []byte) ([]byte, error) {
    headerOffset := 8
    byteReader := bytes.NewBuffer(buf[headerOffset:])
    return byteReader.Bytes(), nil

}

处理段的重传和重新排队

序列号将用于分段的重传/重新排队,因此如果预期的序列号不是收到的序列号,则从当前分段开始重新排队每个分段。

    // for retransmission/requeuing
        if segmentHeader.SequenceNum != expectedSequenceNum {
            ch.Nack(data.DeliveryTag, true, true)
            log.Printf("Expected Sequence number %d, got %d\n",
                expectedSequenceNum, segmentHeader.SequenceNum)
            continue
        }

附加段有效负载

如果搜索引擎接收到的段总数等于数据库服务要发送的总段长度,则总段将用于中断监听生产者(数据库服务)然后中断并解析聚合的段缓冲区,如果不是,则继续侦听并将段有效负载缓冲区附加到网页缓冲区以保存来自所有传入段的字节。

        segmentCounter++
        fmt.Printf("Total Segments : %d\n", segmentHeader.TotalSegments)
        fmt.Printf("current segments : %d\n", segmentCounter)
        expectedSequenceNum++
        ch.Ack(data.DeliveryTag, false)
        webpageBytes = append(webpageBytes, segmentPayload...)
        fmt.Printf("Byte Length: %d\n", len(webpageBytes))

        if segmentCounter == segmentHeader.TotalSegments {
            log.Printf("Got all segments from Database %d", segmentCounter)
            break
        }

顺便说一句,我使用 vim

感谢您参加我的 ted 演讲,我将为 zensearch 实现更多功能和修复。

以上是扩展 Zensearch 的能力来查询整个数据库的详细内容。更多信息请关注PHP中文网其他相关文章!

声明:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn