Rumah > Artikel > pembangunan bahagian belakang > Simpan data peranti IoT
Dalam siaran sebelumnya, kami menunjukkan cara menerima data peranti iot daripada broker MQTT. Dalam siaran ini, kami akan menyimpan data ke pangkalan data.
Dalam sistem yang mantap, kami boleh memilih untuk menyimpan peristiwa data mentah dalam tasik data. Mungkin, kita akan meneroka itu pada masa hadapan; tetapi buat masa ini kami akan menyimpannya dalam PostGres untuk memudahkan.
Siaran sebelumnya menunjukkan penerimaan data mentah dan menyahkarangnya ke dalam struct yang telah dianotasi dengan tag gorm. Gorm ialah ORM yang popular untuk Go. Jika anda tidak biasa dengannya, anda boleh mendapatkan maklumat lanjut di sini.
type IoTDeviceMessage struct { BaseModel Time time.Time `json:"time" gorm:"index"` DeviceID string `json:"device_id"` DeviceType string `json:"device_type"` DeviceData json.RawMessage `json:"device_data"` }
Jadi apa yang perlu kita lakukan ialah mengkonfigurasi sambungan Postgres, kemudian gunakan gorm untuk menyimpan data acara.
func setupPostgres(logger *zerolog.Logger) *Repository { dbHost := os.Getenv("POSTGRES_HOST") dbName := os.Getenv("POSTGRES_DB") dbPort := os.Getenv("POSTGRES_PORT") dbUser := os.Getenv("POSTGRES_USER") dbPassword := os.Getenv("POSTGRES_PASSWORD") dsn := fmt.Sprintf("host=%s user=%s password=%s dbname=%s port=%s sslmode=disable TimeZone=UTC", dbHost, dbUser, dbPassword, dbName, dbPort) logger.Info().Msg(fmt.Sprintf("Connecting to PostgreSQL at %s", dsn)) db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{}) if err != nil { logger.Fatal().Err(err).Msg("failed to connect to database") } // Auto-migrate the schema err = db.AutoMigrate(&IoTDataEvent{}, &IoTRawDeviceMessage{}, &DeviceModel{}, &TempRHDevice{}) if err != nil { logger.Fatal().Err(err).Msg("failed to migrate models") } sqlDB, err := db.DB() sqlDB.SetMaxIdleConns(10) sqlDB.SetMaxOpenConns(100) sqlDB.SetConnMaxLifetime(time.Hour) repo := NewRepository(db, logger) return repo }
Di sini kami menyediakan sambungan Postgres. Ambil perhatian bahawa kami menggunakan pembolehubah persekitaran untuk menyimpan maklumat sensitif kami. Ini adalah amalan yang baik untuk sistem pengeluaran sama ada ia dalam bekas atau tidak.
Kami juga memulakan struct yang dipanggil Repositori. Struk ini mengandungi kaedah penyimpanan dan pengambilan sebenar kami. Ini memberikan kita sedikit pemisahan daripada konfigurasi postgres.
type Repository struct { db *gorm.DB logger *zerolog.Logger } func NewRepository(db *gorm.DB, logger *zerolog.Logger) *Repository { return &Repository{db: db, logger: logger} } func (r *Repository) Close() { sqlDb, err := r.db.DB() if err != nil { r.logger.Error().Err(err).Msg("failed to close database") return } _ = sqlDb.Close() } ... // Message-related functions func (r *Repository) CreateMessage(message *IoTDeviceMessage) error { return r.db.Create(message).Error } func (r *Repository) GetMessagesByDeviceID(deviceID uint, limit int) ([]IoTDeviceMessage, error) { var messages []IoTDeviceMessage err := r.db.Where("device_id = ?", deviceID).Order("timestamp desc").Limit(limit).Find(&messages).Error return messages, err } func (r *Repository) DeleteMessagesByDeviceID(deviceID uint) error { return r.db.Where("device_id = ?", deviceID).Delete(&IoTDeviceMessage{}).Error }
Kini mesej hanya perlu diteruskan. Memandangkan kami menggunakan corak saluran paip untuk memproses mesej, kami akan menambah langkah berterusan sebagai peringkat baharu dalam saluran paip.
// pipeline stage to persist the message func persistIoTEvent(ctx context.Context, logger *zerolog.Logger, repo *Repository, input <-chan IoTRawDeviceMessage) chan IoTRawDeviceMessage { out := make(chan IoTRawDeviceMessage) go func() { defer close(out) for iotMsg := range input { logger.Info().Msg(fmt.Sprintf("Persist iot msg for device: %s", iotMsg.DeviceID)) err := repo.CreateMessage(&iotMsg) if err != nil { logger.Error().Err(err).Msg("Error creating IoTRawDeviceMessage") } } }() return out } ... finalChan := persistIoTEvent(ctx, logger, repo, processMsg(ctx, logger, mqttMsgChan)) for iotMsg := range finalChan { // now we have the IoTRawDeviceMessage that has been persisted logger.Info().Msg(fmt.Sprintf("Received iot msg: %+v", iotMsg)) // do something like check for alert conditions }
Itu sahaja yang ada.
Kod untuk ini boleh didapati di sini. Anda boleh menggunakannya dengan kod penerbit yang sama dari siaran sebelumnya. Pastikan anda mengkonfigurasi tetapan Postgres anda sebagai pembolehubah persekitaran.
Atas ialah kandungan terperinci Simpan data peranti IoT. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!