Rumah >pembangunan bahagian belakang >Golang >Simpan data peranti IoT

Simpan data peranti IoT

Susan Sarandon
Susan Sarandonasal
2024-09-27 10:50:02952semak imbas

Store IoT device data

Dalam siaran sebelumnya, kami menunjukkan cara menerima data peranti iot daripada broker MQTT. Dalam siaran ini, kami akan menyimpan data ke pangkalan data.

Dalam sistem yang mantap, kami boleh memilih untuk menyimpan peristiwa data mentah dalam tasik data. Mungkin, kita akan meneroka itu pada masa hadapan; tetapi buat masa ini kami akan menyimpannya dalam PostGres untuk memudahkan.

Siaran sebelumnya menunjukkan penerimaan data mentah dan menyahkarangnya ke dalam struct yang telah dianotasi dengan tag gorm. Gorm ialah ORM yang popular untuk Go. Jika anda tidak biasa dengannya, anda boleh mendapatkan maklumat lanjut di sini.

type IoTDeviceMessage struct {
    BaseModel
    Time       time.Time       `json:"time" gorm:"index"`
    DeviceID   string          `json:"device_id"`
    DeviceType string          `json:"device_type"`
    DeviceData json.RawMessage `json:"device_data"`
}

Jadi apa yang perlu kita lakukan ialah mengkonfigurasi sambungan Postgres, kemudian gunakan gorm untuk menyimpan data acara.

func setupPostgres(logger *zerolog.Logger) *Repository {
    dbHost := os.Getenv("POSTGRES_HOST")
    dbName := os.Getenv("POSTGRES_DB")
    dbPort := os.Getenv("POSTGRES_PORT")
    dbUser := os.Getenv("POSTGRES_USER")
    dbPassword := os.Getenv("POSTGRES_PASSWORD")
    dsn := fmt.Sprintf("host=%s user=%s password=%s dbname=%s port=%s sslmode=disable TimeZone=UTC",
        dbHost, dbUser, dbPassword, dbName, dbPort)
    logger.Info().Msg(fmt.Sprintf("Connecting to PostgreSQL at %s", dsn))
    db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{})
    if err != nil {
        logger.Fatal().Err(err).Msg("failed to connect to database")
    }

    // Auto-migrate the schema
    err = db.AutoMigrate(&IoTDataEvent{}, &IoTRawDeviceMessage{}, &DeviceModel{}, &TempRHDevice{})
    if err != nil {
        logger.Fatal().Err(err).Msg("failed to migrate models")
    }

    sqlDB, err := db.DB()
    sqlDB.SetMaxIdleConns(10)
    sqlDB.SetMaxOpenConns(100)
    sqlDB.SetConnMaxLifetime(time.Hour)

    repo := NewRepository(db, logger)
    return repo
}

Di sini kami menyediakan sambungan Postgres. Ambil perhatian bahawa kami menggunakan pembolehubah persekitaran untuk menyimpan maklumat sensitif kami. Ini adalah amalan yang baik untuk sistem pengeluaran sama ada ia dalam bekas atau tidak.

Kami juga memulakan struct yang dipanggil Repositori. Struk ini mengandungi kaedah penyimpanan dan pengambilan sebenar kami. Ini memberikan kita sedikit pemisahan daripada konfigurasi postgres.

type Repository struct {
    db     *gorm.DB
    logger *zerolog.Logger
}

func NewRepository(db *gorm.DB, logger *zerolog.Logger) *Repository {
    return &Repository{db: db, logger: logger}
}

func (r *Repository) Close() {
    sqlDb, err := r.db.DB()
    if err != nil {
        r.logger.Error().Err(err).Msg("failed to close database")
        return
    }
    _ = sqlDb.Close()
}
...
// Message-related functions

func (r *Repository) CreateMessage(message *IoTDeviceMessage) error {
    return r.db.Create(message).Error
}

func (r *Repository) GetMessagesByDeviceID(deviceID uint, limit int) ([]IoTDeviceMessage, error) {
    var messages []IoTDeviceMessage
    err := r.db.Where("device_id = ?", deviceID).Order("timestamp desc").Limit(limit).Find(&messages).Error
    return messages, err
}

func (r *Repository) DeleteMessagesByDeviceID(deviceID uint) error {
    return r.db.Where("device_id = ?", deviceID).Delete(&IoTDeviceMessage{}).Error
}

Kini mesej hanya perlu diteruskan. Memandangkan kami menggunakan corak saluran paip untuk memproses mesej, kami akan menambah langkah berterusan sebagai peringkat baharu dalam saluran paip.

// pipeline stage to persist the message
func persistIoTEvent(ctx context.Context, logger *zerolog.Logger, repo *Repository, input <-chan IoTRawDeviceMessage) chan IoTRawDeviceMessage {
    out := make(chan IoTRawDeviceMessage)
    go func() {
        defer close(out)
        for iotMsg := range input {
            logger.Info().Msg(fmt.Sprintf("Persist iot msg for device: %s", iotMsg.DeviceID))
            err := repo.CreateMessage(&iotMsg)
            if err != nil {
                logger.Error().Err(err).Msg("Error creating IoTRawDeviceMessage")
            }
        }
    }()
    return out
}
...
finalChan := persistIoTEvent(ctx, logger, repo, processMsg(ctx, logger, mqttMsgChan))
        for iotMsg := range finalChan {
            // now we have the IoTRawDeviceMessage that has been persisted
            logger.Info().Msg(fmt.Sprintf("Received iot msg: %+v", iotMsg))
            // do something like check for alert conditions
        }

Itu sahaja yang ada.

Kod untuk ini boleh didapati di sini. Anda boleh menggunakannya dengan kod penerbit yang sama dari siaran sebelumnya. Pastikan anda mengkonfigurasi tetapan Postgres anda sebagai pembolehubah persekitaran.

Atas ialah kandungan terperinci Simpan data peranti IoT. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn