Maison >développement back-end >Golang >Stocker les données des appareils IoT
Dans un article précédent, nous avons montré comment recevoir les données d'un appareil IoT d'un courtier MQTT. Dans cet article, nous stockerons les données dans une base de données.
Dans un système robuste, nous pouvons choisir de stocker les événements de données brutes dans un lac de données. Peut-être que nous explorerons cela à l'avenir ; mais pour l'instant, nous allons le stocker dans PostGres pour plus de simplicité.
Le message précédent montrait la réception des données brutes et leur désorganisation dans une structure déjà annotée avec des balises gorm. Gorm est un ORM populaire pour Go. Si vous ne le connaissez pas, vous pouvez pour plus d'informations ici.
type IoTDeviceMessage struct { BaseModel Time time.Time `json:"time" gorm:"index"` DeviceID string `json:"device_id"` DeviceType string `json:"device_type"` DeviceData json.RawMessage `json:"device_data"` }
Il nous suffit donc de configurer la connexion Postgres, puis d'utiliser gorm pour enregistrer les données de l'événement.
func setupPostgres(logger *zerolog.Logger) *Repository { dbHost := os.Getenv("POSTGRES_HOST") dbName := os.Getenv("POSTGRES_DB") dbPort := os.Getenv("POSTGRES_PORT") dbUser := os.Getenv("POSTGRES_USER") dbPassword := os.Getenv("POSTGRES_PASSWORD") dsn := fmt.Sprintf("host=%s user=%s password=%s dbname=%s port=%s sslmode=disable TimeZone=UTC", dbHost, dbUser, dbPassword, dbName, dbPort) logger.Info().Msg(fmt.Sprintf("Connecting to PostgreSQL at %s", dsn)) db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{}) if err != nil { logger.Fatal().Err(err).Msg("failed to connect to database") } // Auto-migrate the schema err = db.AutoMigrate(&IoTDataEvent{}, &IoTRawDeviceMessage{}, &DeviceModel{}, &TempRHDevice{}) if err != nil { logger.Fatal().Err(err).Msg("failed to migrate models") } sqlDB, err := db.DB() sqlDB.SetMaxIdleConns(10) sqlDB.SetMaxOpenConns(100) sqlDB.SetConnMaxLifetime(time.Hour) repo := NewRepository(db, logger) return repo }
Ici, nous configurons la connexion Postgres. Notez que nous utilisons des variables d'environnement pour stocker nos informations sensibles. C'est une bonne pratique pour les systèmes de production, qu'ils soient conteneurisés ou non.
Nous initialisons également une structure appelée Repository. Cette structure contient nos méthodes actuelles de stockage et de récupération. Cela nous offre une certaine séparation de la configuration postgres.
type Repository struct { db *gorm.DB logger *zerolog.Logger } func NewRepository(db *gorm.DB, logger *zerolog.Logger) *Repository { return &Repository{db: db, logger: logger} } func (r *Repository) Close() { sqlDb, err := r.db.DB() if err != nil { r.logger.Error().Err(err).Msg("failed to close database") return } _ = sqlDb.Close() } ... // Message-related functions func (r *Repository) CreateMessage(message *IoTDeviceMessage) error { return r.db.Create(message).Error } func (r *Repository) GetMessagesByDeviceID(deviceID uint, limit int) ([]IoTDeviceMessage, error) { var messages []IoTDeviceMessage err := r.db.Where("device_id = ?", deviceID).Order("timestamp desc").Limit(limit).Find(&messages).Error return messages, err } func (r *Repository) DeleteMessagesByDeviceID(deviceID uint) error { return r.db.Where("device_id = ?", deviceID).Delete(&IoTDeviceMessage{}).Error }
Maintenant, le message doit simplement être conservé. Puisque nous utilisons le modèle de pipeline pour traiter les messages, nous ajouterons l'étape de persistance, une nouvelle étape dans le pipeline.
// pipeline stage to persist the message func persistIoTEvent(ctx context.Context, logger *zerolog.Logger, repo *Repository, input <-chan IoTRawDeviceMessage) chan IoTRawDeviceMessage { out := make(chan IoTRawDeviceMessage) go func() { defer close(out) for iotMsg := range input { logger.Info().Msg(fmt.Sprintf("Persist iot msg for device: %s", iotMsg.DeviceID)) err := repo.CreateMessage(&iotMsg) if err != nil { logger.Error().Err(err).Msg("Error creating IoTRawDeviceMessage") } } }() return out } ... finalChan := persistIoTEvent(ctx, logger, repo, processMsg(ctx, logger, mqttMsgChan)) for iotMsg := range finalChan { // now we have the IoTRawDeviceMessage that has been persisted logger.Info().Msg(fmt.Sprintf("Received iot msg: %+v", iotMsg)) // do something like check for alert conditions }
C'est tout ce qu'il y a à faire.
Le code pour cela peut être trouvé ici. Vous pouvez l'utiliser avec le même code éditeur que celui du post précédent. Assurez-vous de configurer vos paramètres Postgres en tant que variables d'environnement.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!