複数のユーザーが同時に共同作業できる、分散型のリアルタイム コラボレーション プラットフォームを構築しましょう。このプロジェクトでは、Go による WebSocket の処理、競合の解決、状態の同期を実演します。
// WebSocket server implementation type CollaborationServer struct { sessions map[string]*Session documents map[string]*Document broadcast chan Message register chan *Client unregister chan *Client } type Client struct { id string session *Session conn *websocket.Conn send chan Message } type Message struct { Type MessageType `json:"type"` Payload interface{} `json:"payload"` } func NewCollaborationServer() *CollaborationServer { return &CollaborationServer{ sessions: make(map[string]*Session), documents: make(map[string]*Document), broadcast: make(chan Message), register: make(chan *Client), unregister: make(chan *Client), } } func (s *CollaborationServer) Run() { for { select { case client := <-s.register: s.handleRegister(client) case client := <-s.unregister: s.handleUnregister(client) case message := <-s.broadcast: s.handleBroadcast(message) } } } func (s *CollaborationServer) handleRegister(client *Client) { session := s.sessions[client.session.ID] if session == nil { session = &Session{ ID: client.session.ID, Clients: make(map[string]*Client), } s.sessions[session.ID] = session } session.Clients[client.id] = client }
// Operational transformation implementation type Operation struct { Type OperationType Position int Content string ClientID string Revision int } type Document struct { ID string Content string History []Operation Revision int mu sync.RWMutex } func (d *Document) ApplyOperation(op Operation) error { d.mu.Lock() defer d.mu.Unlock() // Transform operation against concurrent operations transformedOp := d.transformOperation(op) // Apply the transformed operation switch transformedOp.Type { case OpInsert: d.insertContent(transformedOp.Position, transformedOp.Content) case OpDelete: d.deleteContent(transformedOp.Position, len(transformedOp.Content)) } // Update revision and history d.Revision++ d.History = append(d.History, transformedOp) return nil } func (d *Document) transformOperation(op Operation) Operation { transformed := op // Transform against all concurrent operations for _, historical := range d.History[op.Revision:] { transformed = transform(transformed, historical) } return transformed }
// Real-time presence tracking type PresenceSystem struct { mu sync.RWMutex users map[string]*UserPresence updates chan PresenceUpdate } type UserPresence struct { UserID string Document string Cursor Position Selection Selection LastSeen time.Time } type Position struct { Line int Column int } type Selection struct { Start Position End Position } func (ps *PresenceSystem) UpdatePresence(update PresenceUpdate) { ps.mu.Lock() defer ps.mu.Unlock() user := ps.users[update.UserID] if user == nil { user = &UserPresence{UserID: update.UserID} ps.users[update.UserID] = user } user.Document = update.Document user.Cursor = update.Cursor user.Selection = update.Selection user.LastSeen = time.Now() // Broadcast update to other users ps.updates <- update } func (ps *PresenceSystem) StartCleanup() { ticker := time.NewTicker(30 * time.Second) go func() { for range ticker.C { ps.cleanupInactiveUsers() } }() }
// Conflict resolution system type ConflictResolver struct { strategy ConflictStrategy } type ConflictStrategy interface { Resolve(a, b Operation) Operation } // Last-write-wins strategy type LastWriteWinsStrategy struct{} func (s *LastWriteWinsStrategy) Resolve(a, b Operation) Operation { if a.Timestamp.After(b.Timestamp) { return a } return b } // Three-way merge strategy type ThreeWayMergeStrategy struct{} func (s *ThreeWayMergeStrategy) Resolve(base, a, b Operation) Operation { // Implement three-way merge logic if a.Position == b.Position { if a.Type == OpDelete && b.Type == OpDelete { return a // Both deleted same content } if a.Timestamp.After(b.Timestamp) { return a } return b } // Non-overlapping changes if a.Position < b.Position { return combineOperations(a, b) } return combineOperations(b, a) }
// State synchronization system type SyncManager struct { documents map[string]*DocumentState clients map[string]*ClientState } type DocumentState struct { Content string Version int64 Operations []Operation Checksum string } type ClientState struct { LastSync time.Time SyncVersion int64 } func (sm *SyncManager) SynchronizeState(clientID string, docID string) error { client := sm.clients[clientID] doc := sm.documents[docID] if client.SyncVersion == doc.Version { return nil // Already in sync } // Get operations since last sync ops := sm.getOperationsSince(docID, client.SyncVersion) // Apply operations to client state for _, op := range ops { if err := sm.applyOperation(clientID, op); err != nil { return fmt.Errorf("sync failed: %w", err) } } // Update client sync version client.SyncVersion = doc.Version client.LastSync = time.Now() return nil }
// Real-time chat implementation type ChatSystem struct { rooms map[string]*ChatRoom history map[string][]ChatMessage } type ChatRoom struct { ID string Members map[string]*Client Messages chan ChatMessage } type ChatMessage struct { ID string RoomID string UserID string Content string Timestamp time.Time } func (cs *ChatSystem) SendMessage(msg ChatMessage) error { room := cs.rooms[msg.RoomID] if room == nil { return fmt.Errorf("room not found: %s", msg.RoomID) } // Store message in history cs.history[msg.RoomID] = append(cs.history[msg.RoomID], msg) // Broadcast to room members room.Messages <- msg return nil }
// Message batching implementation type MessageBatcher struct { messages []Message timeout time.Duration size int batch chan []Message } func (mb *MessageBatcher) Add(msg Message) { mb.messages = append(mb.messages, msg) if len(mb.messages) >= mb.size { mb.flush() } } func (mb *MessageBatcher) Start() { ticker := time.NewTicker(mb.timeout) go func() { for range ticker.C { mb.flush() } }() }
// Distributed coordination using Redis type DistributedCoordinator struct { client *redis.Client pubsub *redis.PubSub } func (dc *DistributedCoordinator) PublishUpdate(update Update) error { return dc.client.Publish(ctx, "updates", update).Err() } func (dc *DistributedCoordinator) SubscribeToUpdates() { sub := dc.client.Subscribe(ctx, "updates") for msg := range sub.Channel() { // Handle distributed update dc.handleUpdate(msg) } }
func TestOperationalTransformation(t *testing.T) { doc := NewDocument("test") // Test concurrent inserts op1 := Operation{Type: OpInsert, Position: 0, Content: "Hello"} op2 := Operation{Type: OpInsert, Position: 0, Content: "World"} doc.ApplyOperation(op1) doc.ApplyOperation(op2) expected := "WorldHello" if doc.Content != expected { t.Errorf("expected %s, got %s", expected, doc.Content) } }
func TestRealTimeCollaboration(t *testing.T) { server := NewCollaborationServer() go server.Run() // Create test clients client1 := createTestClient() client2 := createTestClient() // Simulate concurrent editing go simulateEditing(client1) go simulateEditing(client2) // Verify final state time.Sleep(2 * time.Second) verifyDocumentState(t, server) }
リアルタイム コラボレーション プラットフォームの構築は、複雑な分散システムの概念とリアルタイムのデータ同期を学ぶための好例です。このプロジェクトでは、Go の強力な並行処理機能と WebSocket 処理機能を紹介しました。
以下のコメント欄でリアルタイム コラボレーション システム構築の経験を共有してください!
タグ: #golang #websockets #リアルタイム #コラボレーション #分散システム
以上が、Go と WebSocket を使用したリアルタイム コラボレーション プラットフォームの構築についての詳細です。さらに詳しい情報については、PHP 中国語 Web サイトの他の関連記事を参照してください。