ホームページ >バックエンド開発 >Golang >Go と WebSocket を使用したリアルタイム コラボレーション プラットフォームの構築

Go と WebSocket を使用したリアルタイム コラボレーション プラットフォームの構築

Susan Sarandon
Susan Sarandonオリジナル
2025-01-05 21:25:46993ブラウズ

Building a Real-time Collaboration Platform with Go and WebSockets

導入

複数のユーザーが同時に共同作業できる分散型リアルタイム コラボレーション プラットフォームを構築しましょう。このプロジェクトでは、Go での WebSocket の処理、競合解決、状態の同期をデモします。

プロジェクト概要:リアルタイムコラボレーションプラットフォーム

コア機能

  • リアルタイムのドキュメント編集
  • カーソル位置の同期
  • 存在認識
  • 運用の変革
  • 紛争の解決
  • チャット機能

技術的な実装

1. Webソケットサーバー

// WebSocket server implementation
type CollaborationServer struct {
    sessions    map[string]*Session
    documents   map[string]*Document
    broadcast   chan Message
    register    chan *Client
    unregister  chan *Client
}

type Client struct {
    id       string
    session  *Session
    conn     *websocket.Conn
    send     chan Message
}

type Message struct {
    Type    MessageType `json:"type"`
    Payload interface{} `json:"payload"`
}

func NewCollaborationServer() *CollaborationServer {
    return &CollaborationServer{
        sessions:   make(map[string]*Session),
        documents:  make(map[string]*Document),
        broadcast:  make(chan Message),
        register:   make(chan *Client),
        unregister: make(chan *Client),
    }
}

func (s *CollaborationServer) Run() {
    for {
        select {
        case client := <-s.register:
            s.handleRegister(client)

        case client := <-s.unregister:
            s.handleUnregister(client)

        case message := <-s.broadcast:
            s.handleBroadcast(message)
        }
    }
}

func (s *CollaborationServer) handleRegister(client *Client) {
    session := s.sessions[client.session.ID]
    if session == nil {
        session = &Session{
            ID:      client.session.ID,
            Clients: make(map[string]*Client),
        }
        s.sessions[session.ID] = session
    }
    session.Clients[client.id] = client
}

2. 運用変革エンジン

// Operational transformation implementation
type Operation struct {
    Type      OperationType
    Position  int
    Content   string
    ClientID  string
    Revision  int
}

type Document struct {
    ID        string
    Content   string
    History   []Operation
    Revision  int
    mu        sync.RWMutex
}

func (d *Document) ApplyOperation(op Operation) error {
    d.mu.Lock()
    defer d.mu.Unlock()

    // Transform operation against concurrent operations
    transformedOp := d.transformOperation(op)

    // Apply the transformed operation
    switch transformedOp.Type {
    case OpInsert:
        d.insertContent(transformedOp.Position, transformedOp.Content)
    case OpDelete:
        d.deleteContent(transformedOp.Position, len(transformedOp.Content))
    }

    // Update revision and history
    d.Revision++
    d.History = append(d.History, transformedOp)

    return nil
}

func (d *Document) transformOperation(op Operation) Operation {
    transformed := op

    // Transform against all concurrent operations
    for _, historical := range d.History[op.Revision:] {
        transformed = transform(transformed, historical)
    }

    return transformed
}

3. プレゼンスシステム

// Real-time presence tracking
type PresenceSystem struct {
    mu       sync.RWMutex
    users    map[string]*UserPresence
    updates  chan PresenceUpdate
}

type UserPresence struct {
    UserID    string
    Document  string
    Cursor    Position
    Selection Selection
    LastSeen  time.Time
}

type Position struct {
    Line   int
    Column int
}

type Selection struct {
    Start Position
    End   Position
}

func (ps *PresenceSystem) UpdatePresence(update PresenceUpdate) {
    ps.mu.Lock()
    defer ps.mu.Unlock()

    user := ps.users[update.UserID]
    if user == nil {
        user = &UserPresence{UserID: update.UserID}
        ps.users[update.UserID] = user
    }

    user.Document = update.Document
    user.Cursor = update.Cursor
    user.Selection = update.Selection
    user.LastSeen = time.Now()

    // Broadcast update to other users
    ps.updates <- update
}

func (ps *PresenceSystem) StartCleanup() {
    ticker := time.NewTicker(30 * time.Second)
    go func() {
        for range ticker.C {
            ps.cleanupInactiveUsers()
        }
    }()
}

4. 紛争の解決

// Conflict resolution system
type ConflictResolver struct {
    strategy ConflictStrategy
}

type ConflictStrategy interface {
    Resolve(a, b Operation) Operation
}

// Last-write-wins strategy
type LastWriteWinsStrategy struct{}

func (s *LastWriteWinsStrategy) Resolve(a, b Operation) Operation {
    if a.Timestamp.After(b.Timestamp) {
        return a
    }
    return b
}

// Three-way merge strategy
type ThreeWayMergeStrategy struct{}

func (s *ThreeWayMergeStrategy) Resolve(base, a, b Operation) Operation {
    // Implement three-way merge logic
    if a.Position == b.Position {
        if a.Type == OpDelete && b.Type == OpDelete {
            return a // Both deleted same content
        }
        if a.Timestamp.After(b.Timestamp) {
            return a
        }
        return b
    }

    // Non-overlapping changes
    if a.Position < b.Position {
        return combineOperations(a, b)
    }
    return combineOperations(b, a)
}

5. 状態の同期

// State synchronization system
type SyncManager struct {
    documents map[string]*DocumentState
    clients   map[string]*ClientState
}

type DocumentState struct {
    Content    string
    Version    int64
    Operations []Operation
    Checksum   string
}

type ClientState struct {
    LastSync    time.Time
    SyncVersion int64
}

func (sm *SyncManager) SynchronizeState(clientID string, docID string) error {
    client := sm.clients[clientID]
    doc := sm.documents[docID]

    if client.SyncVersion == doc.Version {
        return nil // Already in sync
    }

    // Get operations since last sync
    ops := sm.getOperationsSince(docID, client.SyncVersion)

    // Apply operations to client state
    for _, op := range ops {
        if err := sm.applyOperation(clientID, op); err != nil {
            return fmt.Errorf("sync failed: %w", err)
        }
    }

    // Update client sync version
    client.SyncVersion = doc.Version
    client.LastSync = time.Now()

    return nil
}

6. チャットシステム

// Real-time chat implementation
type ChatSystem struct {
    rooms    map[string]*ChatRoom
    history  map[string][]ChatMessage
}

type ChatRoom struct {
    ID        string
    Members   map[string]*Client
    Messages  chan ChatMessage
}

type ChatMessage struct {
    ID        string
    RoomID    string
    UserID    string
    Content   string
    Timestamp time.Time
}

func (cs *ChatSystem) SendMessage(msg ChatMessage) error {
    room := cs.rooms[msg.RoomID]
    if room == nil {
        return fmt.Errorf("room not found: %s", msg.RoomID)
    }

    // Store message in history
    cs.history[msg.RoomID] = append(cs.history[msg.RoomID], msg)

    // Broadcast to room members
    room.Messages <- msg

    return nil
}

高度な機能

1. パフォーマンスの最適化

  • メッセージのバッチ処理
  • 操作圧縮
  • 選択放送
// Message batching implementation
type MessageBatcher struct {
    messages []Message
    timeout  time.Duration
    size     int
    batch    chan []Message
}

func (mb *MessageBatcher) Add(msg Message) {
    mb.messages = append(mb.messages, msg)

    if len(mb.messages) >= mb.size {
        mb.flush()
    }
}

func (mb *MessageBatcher) Start() {
    ticker := time.NewTicker(mb.timeout)
    go func() {
        for range ticker.C {
            mb.flush()
        }
    }()
}

2. スケーリングに関する考慮事項

// Distributed coordination using Redis
type DistributedCoordinator struct {
    client  *redis.Client
    pubsub  *redis.PubSub
}

func (dc *DistributedCoordinator) PublishUpdate(update Update) error {
    return dc.client.Publish(ctx, "updates", update).Err()
}

func (dc *DistributedCoordinator) SubscribeToUpdates() {
    sub := dc.client.Subscribe(ctx, "updates")
    for msg := range sub.Channel() {
        // Handle distributed update
        dc.handleUpdate(msg)
    }
}

テスト戦略

1. 単体テスト

func TestOperationalTransformation(t *testing.T) {
    doc := NewDocument("test")

    // Test concurrent inserts
    op1 := Operation{Type: OpInsert, Position: 0, Content: "Hello"}
    op2 := Operation{Type: OpInsert, Position: 0, Content: "World"}

    doc.ApplyOperation(op1)
    doc.ApplyOperation(op2)

    expected := "WorldHello"
    if doc.Content != expected {
        t.Errorf("expected %s, got %s", expected, doc.Content)
    }
}

2. 結合テスト

func TestRealTimeCollaboration(t *testing.T) {
    server := NewCollaborationServer()
    go server.Run()

    // Create test clients
    client1 := createTestClient()
    client2 := createTestClient()

    // Simulate concurrent editing
    go simulateEditing(client1)
    go simulateEditing(client2)

    // Verify final state
    time.Sleep(2 * time.Second)
    verifyDocumentState(t, server)
}

導入アーキテクチャ

  • ロードバランサーの背後にある複数のサーバーインスタンス
  • Pub/Sub と状態の調整のための Redis
  • WebSocket 接続管理
  • 監視とアラート

結論

リアルタイム コラボレーション プラットフォームの構築は、複雑な分散システムの概念とリアルタイムのデータ同期を示します。このプロジェクトは、Go の強力な同時実行機能と WebSocket 処理機能を紹介します。

追加リソース

  • WebSocket プロトコル RFC
  • 業務の変革
  • Redis Pub/Sub ドキュメント

以下のコメント欄でリアルタイム コラボレーション システム構築の経験を共有してください!


タグ: #golang #websockets #リアルタイム #コラボレーション #分散システム

以上がGo と WebSocket を使用したリアルタイム コラボレーション プラットフォームの構築の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。