Maison >développement back-end >Golang >Stocker les données des appareils IoT

Stocker les données des appareils IoT

Susan Sarandon
Susan Sarandonoriginal
2024-09-27 10:50:02952parcourir

Store IoT device data

Dans un article précédent, nous avons montré comment recevoir les données d'un appareil IoT d'un courtier MQTT. Dans cet article, nous stockerons les données dans une base de données.

Dans un système robuste, nous pouvons choisir de stocker les événements de données brutes dans un lac de données. Peut-être que nous explorerons cela à l'avenir ; mais pour l'instant, nous allons le stocker dans PostGres pour plus de simplicité.

Le message précédent montrait la réception des données brutes et leur désorganisation dans une structure déjà annotée avec des balises gorm. Gorm est un ORM populaire pour Go. Si vous ne le connaissez pas, vous pouvez pour plus d'informations ici.

type IoTDeviceMessage struct {
    BaseModel
    Time       time.Time       `json:"time" gorm:"index"`
    DeviceID   string          `json:"device_id"`
    DeviceType string          `json:"device_type"`
    DeviceData json.RawMessage `json:"device_data"`
}

Il nous suffit donc de configurer la connexion Postgres, puis d'utiliser gorm pour enregistrer les données de l'événement.

func setupPostgres(logger *zerolog.Logger) *Repository {
    dbHost := os.Getenv("POSTGRES_HOST")
    dbName := os.Getenv("POSTGRES_DB")
    dbPort := os.Getenv("POSTGRES_PORT")
    dbUser := os.Getenv("POSTGRES_USER")
    dbPassword := os.Getenv("POSTGRES_PASSWORD")
    dsn := fmt.Sprintf("host=%s user=%s password=%s dbname=%s port=%s sslmode=disable TimeZone=UTC",
        dbHost, dbUser, dbPassword, dbName, dbPort)
    logger.Info().Msg(fmt.Sprintf("Connecting to PostgreSQL at %s", dsn))
    db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{})
    if err != nil {
        logger.Fatal().Err(err).Msg("failed to connect to database")
    }

    // Auto-migrate the schema
    err = db.AutoMigrate(&IoTDataEvent{}, &IoTRawDeviceMessage{}, &DeviceModel{}, &TempRHDevice{})
    if err != nil {
        logger.Fatal().Err(err).Msg("failed to migrate models")
    }

    sqlDB, err := db.DB()
    sqlDB.SetMaxIdleConns(10)
    sqlDB.SetMaxOpenConns(100)
    sqlDB.SetConnMaxLifetime(time.Hour)

    repo := NewRepository(db, logger)
    return repo
}

Ici, nous configurons la connexion Postgres. Notez que nous utilisons des variables d'environnement pour stocker nos informations sensibles. C'est une bonne pratique pour les systèmes de production, qu'ils soient conteneurisés ou non.

Nous initialisons également une structure appelée Repository. Cette structure contient nos méthodes actuelles de stockage et de récupération. Cela nous offre une certaine séparation de la configuration postgres.

type Repository struct {
    db     *gorm.DB
    logger *zerolog.Logger
}

func NewRepository(db *gorm.DB, logger *zerolog.Logger) *Repository {
    return &Repository{db: db, logger: logger}
}

func (r *Repository) Close() {
    sqlDb, err := r.db.DB()
    if err != nil {
        r.logger.Error().Err(err).Msg("failed to close database")
        return
    }
    _ = sqlDb.Close()
}
...
// Message-related functions

func (r *Repository) CreateMessage(message *IoTDeviceMessage) error {
    return r.db.Create(message).Error
}

func (r *Repository) GetMessagesByDeviceID(deviceID uint, limit int) ([]IoTDeviceMessage, error) {
    var messages []IoTDeviceMessage
    err := r.db.Where("device_id = ?", deviceID).Order("timestamp desc").Limit(limit).Find(&messages).Error
    return messages, err
}

func (r *Repository) DeleteMessagesByDeviceID(deviceID uint) error {
    return r.db.Where("device_id = ?", deviceID).Delete(&IoTDeviceMessage{}).Error
}

Maintenant, le message doit simplement être conservé. Puisque nous utilisons le modèle de pipeline pour traiter les messages, nous ajouterons l'étape de persistance, une nouvelle étape dans le pipeline.

// pipeline stage to persist the message
func persistIoTEvent(ctx context.Context, logger *zerolog.Logger, repo *Repository, input <-chan IoTRawDeviceMessage) chan IoTRawDeviceMessage {
    out := make(chan IoTRawDeviceMessage)
    go func() {
        defer close(out)
        for iotMsg := range input {
            logger.Info().Msg(fmt.Sprintf("Persist iot msg for device: %s", iotMsg.DeviceID))
            err := repo.CreateMessage(&iotMsg)
            if err != nil {
                logger.Error().Err(err).Msg("Error creating IoTRawDeviceMessage")
            }
        }
    }()
    return out
}
...
finalChan := persistIoTEvent(ctx, logger, repo, processMsg(ctx, logger, mqttMsgChan))
        for iotMsg := range finalChan {
            // now we have the IoTRawDeviceMessage that has been persisted
            logger.Info().Msg(fmt.Sprintf("Received iot msg: %+v", iotMsg))
            // do something like check for alert conditions
        }

C'est tout ce qu'il y a à faire.

Le code pour cela peut être trouvé ici. Vous pouvez l'utiliser avec le même code éditeur que celui du post précédent. Assurez-vous de configurer vos paramètres Postgres en tant que variables d'environnement.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn