>백엔드 개발 >Golang >IoT 장치 데이터 저장

IoT 장치 데이터 저장

Susan Sarandon
Susan Sarandon원래의
2024-09-27 10:50:02949검색

Store IoT device data

이전 게시물에서는 MQTT 브로커로부터 IoT 디바이스 데이터를 수신하는 방법을 보여드렸습니다. 이번 포스팅에서는 데이터를 데이터베이스에 저장하겠습니다.

강력한 시스템에서는 원시 데이터 이벤트를 데이터 레이크에 저장하도록 선택할 수 있습니다. 아마도 우리는 미래에 그것을 탐구할 것입니다. 하지만 지금은 단순화를 위해 PostGres에 저장하겠습니다.

이전 게시물에서는 원시 데이터를 수신하고 이미 gorm 태그로 주석이 달린 구조체로 이를 역마샬링하는 방법을 시연했습니다. Gorm은 인기 있는 Go용 ORM입니다. 익숙하지 않은 경우 여기에서 자세한 내용을 확인할 수 있습니다.

type IoTDeviceMessage struct {
    BaseModel
    Time       time.Time       `json:"time" gorm:"index"`
    DeviceID   string          `json:"device_id"`
    DeviceType string          `json:"device_type"`
    DeviceData json.RawMessage `json:"device_data"`
}

따라서 우리가 해야 할 일은 Postgres 연결을 구성한 다음 gorm을 사용하여 이벤트 데이터를 저장하는 것뿐입니다.

func setupPostgres(logger *zerolog.Logger) *Repository {
    dbHost := os.Getenv("POSTGRES_HOST")
    dbName := os.Getenv("POSTGRES_DB")
    dbPort := os.Getenv("POSTGRES_PORT")
    dbUser := os.Getenv("POSTGRES_USER")
    dbPassword := os.Getenv("POSTGRES_PASSWORD")
    dsn := fmt.Sprintf("host=%s user=%s password=%s dbname=%s port=%s sslmode=disable TimeZone=UTC",
        dbHost, dbUser, dbPassword, dbName, dbPort)
    logger.Info().Msg(fmt.Sprintf("Connecting to PostgreSQL at %s", dsn))
    db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{})
    if err != nil {
        logger.Fatal().Err(err).Msg("failed to connect to database")
    }

    // Auto-migrate the schema
    err = db.AutoMigrate(&IoTDataEvent{}, &IoTRawDeviceMessage{}, &DeviceModel{}, &TempRHDevice{})
    if err != nil {
        logger.Fatal().Err(err).Msg("failed to migrate models")
    }

    sqlDB, err := db.DB()
    sqlDB.SetMaxIdleConns(10)
    sqlDB.SetMaxOpenConns(100)
    sqlDB.SetConnMaxLifetime(time.Hour)

    repo := NewRepository(db, logger)
    return repo
}

여기서 Postgres 연결을 설정했습니다. 민감한 정보를 저장하기 위해 환경 변수를 사용하고 있습니다. 이는 컨테이너화 여부에 관계없이 생산 시스템에 적합한 모범 사례입니다.

또한 Repository라는 구조체를 초기화하고 있습니다. 이 구조체에는 실제 저장 및 검색 방법이 포함되어 있습니다. 이는 Postgres 구성과 어느 정도 분리됩니다.

type Repository struct {
    db     *gorm.DB
    logger *zerolog.Logger
}

func NewRepository(db *gorm.DB, logger *zerolog.Logger) *Repository {
    return &Repository{db: db, logger: logger}
}

func (r *Repository) Close() {
    sqlDb, err := r.db.DB()
    if err != nil {
        r.logger.Error().Err(err).Msg("failed to close database")
        return
    }
    _ = sqlDb.Close()
}
...
// Message-related functions

func (r *Repository) CreateMessage(message *IoTDeviceMessage) error {
    return r.db.Create(message).Error
}

func (r *Repository) GetMessagesByDeviceID(deviceID uint, limit int) ([]IoTDeviceMessage, error) {
    var messages []IoTDeviceMessage
    err := r.db.Where("device_id = ?", deviceID).Order("timestamp desc").Limit(limit).Find(&messages).Error
    return messages, err
}

func (r *Repository) DeleteMessagesByDeviceID(deviceID uint) error {
    return r.db.Where("device_id = ?", deviceID).Delete(&IoTDeviceMessage{}).Error
}

이제 메시지가 지속되기만 하면 됩니다. 메시지를 처리하기 위해 파이프라인 패턴을 사용하고 있으므로 지속 단계를 파이프라인의 새로운 단계에 추가하겠습니다.

// pipeline stage to persist the message
func persistIoTEvent(ctx context.Context, logger *zerolog.Logger, repo *Repository, input <-chan IoTRawDeviceMessage) chan IoTRawDeviceMessage {
    out := make(chan IoTRawDeviceMessage)
    go func() {
        defer close(out)
        for iotMsg := range input {
            logger.Info().Msg(fmt.Sprintf("Persist iot msg for device: %s", iotMsg.DeviceID))
            err := repo.CreateMessage(&iotMsg)
            if err != nil {
                logger.Error().Err(err).Msg("Error creating IoTRawDeviceMessage")
            }
        }
    }()
    return out
}
...
finalChan := persistIoTEvent(ctx, logger, repo, processMsg(ctx, logger, mqttMsgChan))
        for iotMsg := range finalChan {
            // now we have the IoTRawDeviceMessage that has been persisted
            logger.Info().Msg(fmt.Sprintf("Received iot msg: %+v", iotMsg))
            // do something like check for alert conditions
        }

그게 전부입니다.

이 코드는 여기에서 확인할 수 있습니다. 이전 게시물과 동일한 게시자 코드로 사용하실 수 있습니다. Postgres 설정을 환경 변수로 구성해야 합니다.

위 내용은 IoT 장치 데이터 저장의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.