Heim >Backend-Entwicklung >Golang >Speichern Sie IoT-Gerätedaten

Speichern Sie IoT-Gerätedaten

Susan Sarandon
Susan SarandonOriginal
2024-09-27 10:50:02968Durchsuche

Store IoT device data

In einem früheren Beitrag haben wir gezeigt, wie man IoT-Gerätedaten von einem MQTT-Broker empfängt. In diesem Beitrag werden wir die Daten in einer Datenbank speichern.

In einem robusten System können wir uns dafür entscheiden, die Rohdatenereignisse in einem Data Lake zu speichern. Vielleicht werden wir das in Zukunft untersuchen; aber der Einfachheit halber speichern wir es vorerst in PostGres.

Im vorherigen Beitrag wurde der Empfang der Rohdaten und deren Unmarshalling in eine Struktur gezeigt, die bereits mit Gorm-Tags annotiert war. Gorm ist ein beliebtes ORM für Go. Wenn Sie damit nicht vertraut sind, finden Sie hier weitere Informationen.

type IoTDeviceMessage struct {
    BaseModel
    Time       time.Time       `json:"time" gorm:"index"`
    DeviceID   string          `json:"device_id"`
    DeviceType string          `json:"device_type"`
    DeviceData json.RawMessage `json:"device_data"`
}

Alles, was wir tun müssen, ist, die Postgres-Verbindung zu konfigurieren und dann Gorm zum Speichern der Ereignisdaten zu verwenden.

func setupPostgres(logger *zerolog.Logger) *Repository {
    dbHost := os.Getenv("POSTGRES_HOST")
    dbName := os.Getenv("POSTGRES_DB")
    dbPort := os.Getenv("POSTGRES_PORT")
    dbUser := os.Getenv("POSTGRES_USER")
    dbPassword := os.Getenv("POSTGRES_PASSWORD")
    dsn := fmt.Sprintf("host=%s user=%s password=%s dbname=%s port=%s sslmode=disable TimeZone=UTC",
        dbHost, dbUser, dbPassword, dbName, dbPort)
    logger.Info().Msg(fmt.Sprintf("Connecting to PostgreSQL at %s", dsn))
    db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{})
    if err != nil {
        logger.Fatal().Err(err).Msg("failed to connect to database")
    }

    // Auto-migrate the schema
    err = db.AutoMigrate(&IoTDataEvent{}, &IoTRawDeviceMessage{}, &DeviceModel{}, &TempRHDevice{})
    if err != nil {
        logger.Fatal().Err(err).Msg("failed to migrate models")
    }

    sqlDB, err := db.DB()
    sqlDB.SetMaxIdleConns(10)
    sqlDB.SetMaxOpenConns(100)
    sqlDB.SetConnMaxLifetime(time.Hour)

    repo := NewRepository(db, logger)
    return repo
}

Hier richten wir die Postgres-Verbindung ein. Beachten Sie, dass wir Umgebungsvariablen zum Speichern unserer vertraulichen Informationen verwenden. Dies ist eine bewährte Vorgehensweise für Produktionssysteme, unabhängig davon, ob sie in Containern vorliegen oder nicht.

Wir initialisieren außerdem eine Struktur namens Repository. Diese Struktur enthält unsere eigentlichen Speicher- und Abrufmethoden. Dies bietet uns eine gewisse Trennung von der Postgres-Konfiguration.

type Repository struct {
    db     *gorm.DB
    logger *zerolog.Logger
}

func NewRepository(db *gorm.DB, logger *zerolog.Logger) *Repository {
    return &Repository{db: db, logger: logger}
}

func (r *Repository) Close() {
    sqlDb, err := r.db.DB()
    if err != nil {
        r.logger.Error().Err(err).Msg("failed to close database")
        return
    }
    _ = sqlDb.Close()
}
...
// Message-related functions

func (r *Repository) CreateMessage(message *IoTDeviceMessage) error {
    return r.db.Create(message).Error
}

func (r *Repository) GetMessagesByDeviceID(deviceID uint, limit int) ([]IoTDeviceMessage, error) {
    var messages []IoTDeviceMessage
    err := r.db.Where("device_id = ?", deviceID).Order("timestamp desc").Limit(limit).Find(&messages).Error
    return messages, err
}

func (r *Repository) DeleteMessagesByDeviceID(deviceID uint) error {
    return r.db.Where("device_id = ?", deviceID).Delete(&IoTDeviceMessage{}).Error
}

Jetzt muss die Nachricht nur noch persistiert werden. Da wir das Pipeline-Muster zum Verarbeiten der Nachrichten verwenden, werden wir den Persistenzschritt als neue Stufe in der Pipeline hinzufügen.

// pipeline stage to persist the message
func persistIoTEvent(ctx context.Context, logger *zerolog.Logger, repo *Repository, input <-chan IoTRawDeviceMessage) chan IoTRawDeviceMessage {
    out := make(chan IoTRawDeviceMessage)
    go func() {
        defer close(out)
        for iotMsg := range input {
            logger.Info().Msg(fmt.Sprintf("Persist iot msg for device: %s", iotMsg.DeviceID))
            err := repo.CreateMessage(&iotMsg)
            if err != nil {
                logger.Error().Err(err).Msg("Error creating IoTRawDeviceMessage")
            }
        }
    }()
    return out
}
...
finalChan := persistIoTEvent(ctx, logger, repo, processMsg(ctx, logger, mqttMsgChan))
        for iotMsg := range finalChan {
            // now we have the IoTRawDeviceMessage that has been persisted
            logger.Info().Msg(fmt.Sprintf("Received iot msg: %+v", iotMsg))
            // do something like check for alert conditions
        }

Das ist alles.

Den Code dafür finden Sie hier. Sie können es mit demselben Herausgebercode wie im vorherigen Beitrag verwenden. Stellen Sie sicher, dass Sie Ihre Postgres-Einstellungen als Umgebungsvariablen konfigurieren.

Das obige ist der detaillierte Inhalt vonSpeichern Sie IoT-Gerätedaten. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn