Rumah  >  Artikel  >  pembangunan bahagian belakang  >  Membina Logger Data Tersuai dengan Raspberry Pi untuk Kenderaan: Mengintegrasikan Penderia BNO dan ELM

Membina Logger Data Tersuai dengan Raspberry Pi untuk Kenderaan: Mengintegrasikan Penderia BNO dan ELM

WBOY
WBOYasal
2024-09-10 16:30:06839semak imbas

Ringkasan

Sebagai jurutera perisian yang bekerja dalam industri automotif, saya mempunyai minat yang mendalam dalam kenderaan autonomi, teknik pengukuran data dan kaedah analisis. Dalam siaran ini, saya akan menerangkan sistem ukuran tersuai, memperincikan proses dari awal, dan membentangkan beberapa hasil percubaan. Logger data saya terdiri daripada Raspberry Pi 3, sensor BNO055 dan penyesuai ELM327 OBD-II, yang masing-masing bertanggungjawab untuk pengkomputeran, mengumpul data pecutan dan halaju putaran serta mendapatkan maklumat enjin daripada kenderaan sebenar.

Jadual Kandungan

1.Latar Belakang
2.Seni Bina
3.Maklumat sensor
4.Maklumat sistem koordinat
5.Perbandingan Data
6. Logik Teras
7. Kesimpulan
8.Rujukan

1. Latar belakang

Dalam beberapa tahun kebelakangan ini, peningkatan IoT, kecerdasan buatan dan pengkomputeran tepi telah membawa kemajuan yang ketara kepada pelbagai industri, termasuk sektor automotif. Kenderaan moden dilengkapi dengan pelbagai penderia yang bekerjasama untuk mencipta sistem yang canggih, membolehkan segala-galanya daripada ciri keselamatan yang dipertingkatkan kepada keupayaan pemanduan autonomi.

Lazimnya, memantau keadaan kenderaan memerlukan penggunaan sensor canggih dan mahal untuk mencapai ukuran yang tepat dan boleh dipercayai. Walau bagaimanapun, untuk situasi tertentu, seperti mendapatkan data pada litar untuk pemahaman umum tentang dinamik kenderaan, penderia mewah ini mungkin tidak begitu diperlukan. Menyedari perkara ini, saya memutuskan untuk mencipta sistem kos efektif yang mampu mengukur keadaan fizikal kenderaan menggunakan komponen yang lebih berpatutan.

Saya memutuskan untuk menumpukan pada mengukur data yang berkaitan dengan dinamik kenderaan dan bukannya keselesaan memandu. Ini kerana menganalisis keselesaan perjalanan memerlukan menangkap lebih banyak isyarat frekuensi tinggi dengan ketepatan yang sesuai, yang memerlukan menetapkan frekuensi pensampelan yang tinggi semasa pengukuran.

Khususnya, dalam ciri tindak balas frekuensi pecutan sisi berkenaan dengan sudut stereng, frekuensi resonan biasanya muncul di bawah 5 Hz, walaupun ia bergantung pada kelajuan kenderaan. Sebaliknya, dalam analisis keselesaan perjalanan, perbincangan selalunya berlanjutan kepada julat beberapa puluh Hz.

Oleh itu, matlamat utama membangunkan sistem pengukuran adalah untuk mencipta sistem pengelogan data kos rendah yang memudahkan analisis data dalam domain dinamik kenderaan.

2. Seni bina

Sistem pengukuran terdiri daripada perkakasan berikut:

  • Raspberry Pi 3
  • BNO055
  • Pengimbas ELM327 OBD2
  • Penyongsang
  • Pantau
  • papan kekunci / tetikus(jika perlu)

Raspberry Pi bertindak sebagai komputer utama, mengumpul data daripada BNO055 dan ELM327. Raspberry Pi berkomunikasi dengan BNO055 melalui I2C dan dengan ELM327 melalui Bluetooth (lihat Rajah 1).

Building a Custom Data Logger with Raspberry Pi for Vehicles: Integrating BNO and ELM Sensors
Rajah 1: Seni bina perkakasan

3. Maklumat sensor

3-1. Adafruits BNO055

Penderia BNO055 mampu mengukur pelbagai jenis gerakan dan data:★Gambaran Keseluruhan Adafruit BNO055[1]

  • Orientasi Mutlak (Euler Vector, 100Hz): Menyediakan data orientasi tiga paksi berdasarkan sfera 360°.

  • Orientasi Mutlak (Quaternion, 100Hz): Menawarkan output kuaternion empat mata untuk manipulasi data yang lebih tepat.

  • Vektor Halaju Sudut (100Hz): Mengukur kelajuan putaran dalam radian sesaat (rad/s) merentasi tiga paksi.

  • Vektor Pecutan (100Hz): Menangkap pecutan termasuk graviti dan gerakan linear dalam meter sesaat kuasa dua (m/s²) merentas tiga paksi.

  • Vektor Kekuatan Medan Magnetik (20Hz): Mengesan kekuatan medan magnet dalam mikroteslas (µT) merentas tiga paksi.

  • Vektor Pecutan Linear (100Hz): Merekod data pecutan linear (tidak termasuk graviti) dalam meter sesaat kuasa dua (m/s²) merentas tiga paksi.

  • Vektor Graviti (100Hz): Mengukur pecutan graviti (tidak termasuk sebarang pergerakan) dalam meter sesaat kuasa dua (m/s²) merentasi tiga paksi.

  • Suhu (1Hz): Menyediakan suhu ambien dalam darjah Celsius.

BNO055 padat dan disokong oleh perpustakaan Python Adafruit yang dipanggil Adafruit_CircuitPython. Selain itu, perpustakaan bahasa C tersedia daripada Adafruit.

★Adafruit_CircuitPython_BNO055[2]

★Adafruit_BNO055[3]

Untuk demonstrasi, anda boleh melihat video di sini:
Adafruit BNO055 Demo
★Demo Adafruit BNO055[4]

3-2. ELM327

ELM327 是一種廣泛使用的 OBD-II(車載診斷)適配器,可透過標準介面存取車輛引擎資料。它充當車輛 ECU(電子控制單元)和電腦或智慧型手機等外部設備之間的橋樑,能夠檢索診斷和性能數據。 ELM327 的主要特性與功能包括:★ELM327 資訊[5]

  • OBD-II 協議支援:ELM327 支援各種 OBD-II 協議,包括 ISO 9141、ISO 14230 (KWP2000)、ISO 15765 (CAN) 等,使其與廣泛的車輛相容。

  • 診斷故障碼(DTC):它可以讀取和清除車輛 ECU 中的診斷故障碼,幫助識別和排除問題。

  • 即時數據流:提供來自車輛感知器的即時數據,例如引擎轉速、車速、冷卻液溫度和燃油油位。

  • 凍結幀數據:在檢測到故障時捕獲並儲存數據,這有助於診斷間歇性問題。

  • 車輛資訊:檢索有關車輛的詳細信息,包括 VIN(車輛識別碼)、校準 ID 等。

  • 相容性:提供多種形式,包括藍牙、USB、Wi-Fi版本,可以相容於不同的裝置和平台。

通常,ELM327 用於診斷車輛狀態,特別是引擎狀態。然而,它也可以用來從車輛收集廣泛的數據。 ELM327 可在亞馬遜等平台購買,價格約 20 美元到 80 美元不等。

使用 python-OBD 庫,您可以建立查詢以透過 ELM327 適配器收集數據,從而實現可自訂和詳細的數據收集

4.坐標系資訊

由於該系統與多個感測器和實際車輛交互,因此它必須了解每個坐標系。

4-1. BNO055 資料記錄器座標系

下圖顯示了我的資料測量系統的 BNO055 座標系。我的系統也使用四元數來計算旋轉角度。感測器座標系需要與車輛座標系相符。

Building a Custom Data Logger with Raspberry Pi for Vehicles: Integrating BNO and ELM Sensors
圖2:BNO055座標系

4-2.車輛座標系(ISO 8855)

測量系統是基於車輛座標系運作。

Building a Custom Data Logger with Raspberry Pi for Vehicles: Integrating BNO and ELM Sensors
圖3:車輛座標系[6] 資料來源:MathWorks,自動駕駛工具箱中的座標系。

4-3. iPhone 座標系

我使用了 iPhone 應用程序,它可以測量多個物理數據點,包括加速度、陀螺儀等,用於驗證任務。使用iPhone應用程式的目的是透過該應用程式中的數據來確認我的數據測量系統的有效性。圖4引自蘋果開發者網站。

Building a Custom Data Logger with Raspberry Pi for Vehicles: Integrating BNO and ELM Sensors
圖4:iPhone座標系[7] 來源:Apple,取得原始陀螺儀事件。

5. 數據比較

在本節中,我將展示一些實驗結果。第一步,我進行了正弦波任務,以確認我的測量系統輸出的讀數是否沒有錯誤,例如單位錯誤。這部分很基礎,但對於確保以正確的方式收集數據至關重要。
接下來,我在實車上駕駛時測量了一些物理值。
然後,我檢查了ELM327的性能和每個感測器的採樣精度。

5-1.正弦波測試

我進行了正弦波測試,在本節中,我將展示結果。這些測試的目的是確認以下內容:

  • 測量系統的座標係是否正常運作
  • 驗證 iPhone 上測量應用程式的準確性

為了實現這一目標,我使用了「phyphox」★phyphox[8],這是一款可以測量各種物理值的智慧型手機應用程式。我將 BNO055 感應器安裝在 iPhone 上,如下所示 [圖 5]。

Building a Custom Data Logger with Raspberry Pi for Vehicles: Integrating BNO and ELM Sensors
圖 5:用於進行正弦波測試的感測器和 iPhone 安裝方法的圖像

圖6顯示了取樣頻率為10 Hz的正弦波測試結果。圖例「VDDM」代表我開發的測量系統收集的數據,而「iPhone」則表示使用「phyphox」收集的數據。

Building a Custom Data Logger with Raspberry Pi for Vehicles: Integrating BNO and ELM Sensors
圖 6:正弦波測試結果
  • 來自 VDDM 和 iPhone 的數據在所有軸上都顯示出相似的趨勢,表明這兩個應用程式正在捕捉相似的運動模式。

  • VDDM 資料似乎稍有變化,特別是在加速度圖中,這可能表明與 iPhone 相比具有更高的靈敏度或不同的過濾方法。

  • 兩個數據大致吻合,但幅度差異表明,根據精度要求,可能需要進一步校準或調整。

  • VDDM 可能會捕捉額外的雜訊或更高頻率的分量,尤其是在加速度圖中,而 iPhone 資料中不存在這些分量。這可能有利於詳細分析或需要進一步過濾。

5-2.車輛動力學數據評估

圖 7 顯示了針對實際車輛中的 BNO055 數據進行的實驗結果。一個名為「phyphox」的 iPhone 應用程式被用作參考。取樣頻率為50Hz。我觀察到,我在車輛上開發的數據記錄系統測量的加速度和旋轉訊號包含雜訊、異常值和 NaN 值。因此,我應用了後處理功能,包括 IQR 方法和巴特沃斯低通濾波器。每個圖表的圖例中的術語“fc”表示截止頻率。例如,fc_19.5表示截止頻率為19.5Hz。請注意,本實驗僅使用 BNO055 感測器進行,且 ELM327 功能已停用。

Building a Custom Data Logger with Raspberry Pi for Vehicles: Integrating BNO and ELM Sensors
圖7:實車測試結果
  • 來自 VDDM(原始數據)和 iPhone 的數據在所有軸上都顯示出相似的趨勢。這表明兩個系統正在捕捉相同的運動模式。

  • 與 iPhone 資料相比,VDDM 的資料似乎有更大的變化。這在加速圖中尤其明顯,可能表明 VDDM 以更高的靈敏度捕獲資料或使用不同的過濾方法。

  • 來自 VDDM 的過濾資料(fc 19.5Hz 和 10Hz)與原始資料相比變化較小,並且更接近 iPhone 資料。這表示 VDDM(原始資料)可能包含更多雜訊或高頻成分。

  • 來自 VDDM 的原始資料包含異常值和缺失值,透過應用過濾器可以緩解這些異常值和缺失值。這在滾動率和俯仰率圖表中尤其明顯。

  • 與 iPhone 相比,VDDM 似乎包含更多高頻成分和噪音。雖然這有利於詳細分析,但如果降噪至關重要,則可能需要額外的濾波或硬體級調整。

5-3.車輛速度評估

圖 8 顯示了有關引擎性能的數據。這些數據是使用 ELM327 掃描器以 4 Hz 的取樣頻率收集的。 BNO055 功能在資料收集期間已停用。

Building a Custom Data Logger with Raspberry Pi for Vehicles: Integrating BNO and ELM Sensors
圖8:車速與引擎數據
  • 來自 VDDM(原始數據)和 iPhone 的車速數據表現出相似的趨勢。
  • ELM327 可以收集有關引擎轉速和節氣門位置的資訊。這些數據對於了解一般油門踏板操作非常有用。請注意,測試車輛是HEV。

5-4.取樣頻率精度評估

在使用 BNO055 和 ELM327 進行實驗期間,我觀察到取樣延遲。為了找出問題,我透過調整取樣頻率以及啟用和停用 ELM327 和 BNO055 來實作測量。
首先,我給出與取樣頻率相對應的時間差。此測試涉及使用 BNO055 和 ELM327 感測器

Building a Custom Data Logger with Raspberry Pi for Vehicles: Integrating BNO and ELM Sensors
圖9:取樣頻率對應的時間差

圖9顯示,當取樣頻率設定為10Hz時,取樣時間的精確度明顯下降。
ELM327 無法實現與 BNO055 一樣高的取樣頻率,這可能會對整體系統效能產生負面影響。儘管如此,為了更好地了解每個感測器的性能,我進行了單獨的測量。

圖10和圖11說明了取樣頻率和取樣定時精度之間的關係。採樣時序精度表示為:

Building a Custom Data Logger with Raspberry Pi for Vehicles: Integrating BNO and ELM Sensors

Building a Custom Data Logger with Raspberry Pi for Vehicles: Integrating BNO and ELM Sensors
圖10:BNO055_抽樣測試結果

根據圖 10,BNO055 可以回應高達 50 Hz 的取樣頻率。然而,超過 60 Hz 後,採樣計時精度急劇下降。

Building a Custom Data Logger with Raspberry Pi for Vehicles: Integrating BNO and ELM Sensors
圖11:BNO055抽樣測試結果

圖 11 顯示 ELM327 的反應度低於 BNO055。 ELM327 在最高 5 Hz 時仍能保持良好的取樣定時精度。這是因為 ELM327 透過 OBD 與車輛通信,而車輛的 ECU 可能不支援對 OBD 請求的高頻響應,從而導致通信頻率較低。

此外,ELM327 是可透過 OBD-II 從車輛中的多個 ECU 檢索資料的感測器中最便宜且品質最低的選項。
這篇文章★挑戰你汽車的行車電腦燃油效率數據[9]說

Elm327 需要考慮的一件重要事情是回應時間不容忽視:裝置可能需要0.1 到0.3 秒的時間來回應訊息請求,這意味著快速添加太多訊號最終會減少取樣率達到無用的程度

更多詳細建議,請造訪此頁面:★選擇OBDII適配器[10]

對於表現低下的原因,可以考慮以下原因:

  • 車輛ECU規格
  • 對 OBD 掃描器的要求太多
  • 從多個 ECU 請求多個訊號
  • 軟體問題
  • 網路問題
  • OBD-II 協定開銷
  • 掃描器硬體限制
  • 車輛通訊速度
  • 韌體和驅動程式問題
  • 環境因素

6. 核心邏輯

我一直在開發的測量軟體假設多個感測器需要的用例。此外,使用者可以更改 yaml 檔案上的測量設定。使用者可以在簡單的 GUI 上操作測量應用程式。您可以在這裡查看原始程式碼:★VDDM[11]

測量系統的架構如下:

VDDM/
│
├── fusion/
│   ├── __init__.py
│   ├── sensor_fusion.py   # Handles data fusion, collects data from sensors
│   └── sensors/
│       ├── __init__.py
│       ├── bno055_measurement.py   # Script for BNO055 sensor data collection
│       └── elm327_measurement.py   # Script for ELM327 OBD2 data collection
│
├── signalprocessing/
│   ├── __init__.py
│   └── filter.py           # Contains filtering algorithms (e.g., Butterworth filter)
│
├── config/
│   ├── config_manager.py    # Loads configuration from YAML
│   └── measurement_system_config.yaml   # Configuration file for sensors and system settings
│
├── utils/
│   ├── tools.py             # Utility functions (e.g., wait functions)
│   └── visualize_data.py    # Functions to format and display sensor data
│
├── measurement/
│   ├── __init__.py
│   └── measurement_control.py   # Controls the measurement process
│
├── gui/
│   ├── __init__.py
│   └── main_gui.py          # GUI setup for starting/stopping measurement
│
├── main.py                  # Entry point for starting the system
└── requirements.txt         # Project dependencies

下圖表示 VDDM 的流程。

main.py
  └── main_gui.py
        ├── Starts GUI interface and buttons
        └── measurement_control.py
              ├── Initializes sensor measurements
              ├── Controls start/stop of measurements
              └── sensor_fusion.py
                    ├── Manages sensor data collection and fusion
                    ├── Uses:
                    │    ├── bno055_measurement.py  (BNO055 sensor data collection)
                    │    └── elm327_measurement.py  (ELM327 OBD2 data collection)
                    └── Processes data with:
                         ├── filter.py  (Applies Butterworth low-pass filter)
                         └── visualize_data.py  (Formats and visualizes sensor data)

VDDM 的主要元件是 VDDM/fusion/sensor_fusion.py、VDDM/fusion/sensors/bno055_measurement.py 和 VDDM/fusion/sensors/elm327_measurement.py。

# sensor_fusion.py
import os
import sys
import importlib
from time import perf_counter
from collections import defaultdict
import pandas as pd
import datetime
import asyncio
import numpy as np



parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
sys.path.append(parent_dir)

from config import config_manager
from utils.tools import wait_process
from utils.visualize_data import format_sensor_fusion_data
from signalprocessing.filter import butterlowpass

config_path = os.path.join(parent_dir, 'config', 'measurement_system_config.yaml')


class SensorFactory:
    @staticmethod
    def create_sensor(sensor_type, config):
        try:
            # Import the appropriate sensor module based on sensor_type
            module = importlib.import_module(f"fusion.sensors.{sensor_type}_measurement")
            # Get a sensor class
            sensor_class = getattr(module, f"{sensor_type.upper()}")
            # Create a sensor instance and return
            return sensor_class(config)
        except (ImportError, AttributeError) as e:
            print(f"Error creating sensor {sensor_type}: {e}")
            return None


class Sensors:
    def __init__(self, config):
        self.config = config_manager.load_config(config_path)
        self.sensor_list = tuple(self.config.sensors.keys())
        self.sensor_instances = {}
        self.is_running = False


        self.SAMPLING_FREQUENCY_HZ = config.sampling_frequency_hz
        self.SAMPLING_TIME = 1 / self.SAMPLING_FREQUENCY_HZ
        self.SAVE_DATA_DIR = config.save_data_dir
        self.SAVE_BUF_CSVDATA_PATH = self.SAVE_DATA_DIR + "/" + "measurement_raw_data.csv"
        self.SEQUENCE_LENGTH = config.sequence_length # Windows size [s]
        # Buffer size is determined by the relation of sequence length and sampling frequency
        # Buffer secures data for SEQUENCE_LENGTH[s]
        self.MAX_DATA_BUF_LEN = self.SEQUENCE_LENGTH * self.SAMPLING_FREQUENCY_HZ
        self.FPASS = config.filter_params.fpass
        self.FSTOP = config.filter_params.fstop
        self.GPASS = config.filter_params.gpass
        self.GSTOP = config.filter_params.gstop
        self.is_filter = config.filter_params.is_filter
        self.is_show_real_time_data = config.is_show_real_time_data
        self.TIMEZONE = config.timezone
        self.all_data_columns_list = ()
        for sensor_name in self.sensor_list:
            self.all_data_columns_list += tuple(self.config["sensors"][sensor_name]["data_columns"])            




        self.data_buffer = pd.DataFrame()  # data buffer

        for sensor_type in self.sensor_list:
            sensor_config = self.config.sensors[sensor_type]
            sensor_instance = SensorFactory.create_sensor(sensor_type, sensor_config)
            if sensor_instance:
                self.sensor_instances[sensor_type] = sensor_instance


        if os.path.exists(self.SAVE_BUF_CSVDATA_PATH):
            os.remove(self.SAVE_BUF_CSVDATA_PATH)
            print(f"File  '{self.SAVE_BUF_CSVDATA_PATH}' was deleted for initialization")                


    def get_sensor(self, sensor_type):
        """
        Retrieve the sensor instance corresponding to the specified sensor type.

        Args:
            sensor_type (str): The type of the sensor to retrieve.

        Returns:
            object: The sensor instance corresponding to the specified sensor type.
                    Returns None if the sensor type does not exist.
        """
        return self.sensor_instances.get(sensor_type)

    def collect_data(self):
        """
        Collect data from all sensors.

        This method iterates over all sensor instances and collects data from each sensor.
        The collected data is stored in a dictionary where the keys are sensor types and
        the values are the data collected from the corresponding sensors.

        Returns:
            dict: A dictionary containing the collected data from all sensors.
                The keys are sensor types and the values are the data from each sensor.

        Raises:
            Exception: If an error occurs while collecting data from any sensor, the exception
                    is caught and printed.
        """
        data = {}
        try:
            for sensor_type, sensor in self.sensor_instances.items():
                # get data from sensors
                data[sensor_type] = sensor.get_data_from_sensor()
            return data
        except Exception as e:
            print(e)



    def on_change_start_measurement(self):
        """
        Start the measurement process.

        This method sets the is_running flag to True, indicating that the measurement
        process should start.
        """
        self.is_running = True

    def on_change_stop_measurement(self):
        """
        Stop the measurement process.

        This method sets the is_running flag to False, indicating that the measurement
        process should stop.
        """
        self.is_running = False


    def filtering(self, df, labellist):
        """
        Apply a low-pass filter to the specified columns in the DataFrame.

        This method applies a Butterworth low-pass filter to each column specified
        in the labellist. The "Time" column should be excluded from the labellist
        as it is not needed for the computation.

        Args:
            df (pd.DataFrame): The input DataFrame containing the data to be filtered.
            labellist (list of str): A list of column names to be filtered. The "Time"
                                     column should not be included in this list.

        Returns:
            pd.DataFrame: A new DataFrame with the filtered data.
        """
        filtered_df = df.copy()
        for labelname in labellist:
            # Ensure the column is converted to a numpy array
            x = df[labelname].to_numpy()
            filtered_df[labelname] = butterlowpass(
                x=x,  # Correctly pass the numpy array as 'x'
                fpass=self.FPASS,
                fstop=self.FSTOP,
                gpass=self.GPASS,
                gstop=self.GSTOP,
                fs=self.SAMPLING_FREQUENCY_HZ,
                dt=self.SAMPLING_TIME,
                checkflag=False,
                labelname=labelname
            )
        return filtered_df

    def convert_dictdata(self, current_time, sensor_data_dict):
        """
        Convert nested dictionary data from multiple sensors into a single DataFrame.

        This method converts nested dictionary data obtained from multiple sensors
        into a single dictionary and then converts it into a pandas DataFrame. The
        current_time information is associated with the data.

        Args:
            current_time (float): The current time at which the data was obtained.
            sensor_data_dict (dict): A nested dictionary containing data from multiple sensors.

        Returns:
            pd.DataFrame: A DataFrame containing the converted data with the current time information.
        """
        converted_data = {'Time': current_time}
        for sensor, data in sensor_data_dict.items():
            converted_data.update(data)

        converted_data = pd.DataFrame([converted_data])

        return converted_data



    async def update_data_buffer(self, dict_data):
        """
        Add data from sensors to the buffer and save it if necessary.

        This method adds the provided sensor data to the internal buffer. If the buffer
        exceeds the specified maximum length, the oldest data is saved to a CSV file
        and removed from the buffer.

        Args:
            dict_data (dict): The data from sensors to be added to the buffer.
        """

        # Add data to the buffer
        self.data_buffer = pd.concat([self.data_buffer, dict_data], ignore_index=True)

        # If the buffer exceeds the specified length, save the oldest data
        if len(self.data_buffer) > self.MAX_DATA_BUF_LEN:
            # Save the oldest data to a CSV file
            old_data = self.data_buffer.head(self.MAX_DATA_BUF_LEN)

            await self.save_data(old_data, self.SAVE_BUF_CSVDATA_PATH)

            # Update the buffer
            self.data_buffer = self.data_buffer.tail(len(self.data_buffer) - self.MAX_DATA_BUF_LEN)        



    async def save_data_async(self, df, path):
        """
        Save the DataFrame to a CSV file asynchronously.

        This method uses asyncio.to_thread to run the synchronous to_csv method
        in a separate thread, allowing it to be handled asynchronously.

        Args:
            df (pd.DataFrame): The DataFrame to be saved.
            path (str): The file path where the DataFrame should be saved.
        """
        if not os.path.isfile(path):
            await asyncio.to_thread(df.to_csv, path, sep=',', encoding='utf-8', index=False, header=True, mode='w')
        else:
            await asyncio.to_thread(df.to_csv, path, sep=',', encoding='utf-8', index=False, header=False, mode='a')

    async def save_data(self, df, path):
        """
        Save the DataFrame to a CSV file asynchronously.

        This method calls save_data_async to save the DataFrame to a CSV file
        asynchronously.

        Args:
            df (pd.DataFrame): The DataFrame to be saved.
            path (str): The file path where the DataFrame should be saved.
        """
        await self.save_data_async(df, path)



    async def finish_measurement_and_save_data(self):
        """
        Finish the measurement process and save the data.

        This method finalizes the measurement process by saving the buffered data
        to a CSV file. It also applies filtering if specified and saves the filtered
        data to a separate CSV file. The method handles time zone settings and
        generates a timestamp for the file names.

        The buffered data is saved to a temporary CSV file, which is then read back
        and saved to a final file path with a timestamp. If filtering is enabled,
        the filtered data is also saved. The temporary CSV file is deleted after
        the data is saved.

        Raises:
            Exception: If an error occurs during the file operations.
        """
        t_delta = datetime.timedelta(hours=9)
        TIMEZONE = datetime.timezone(t_delta, self.TIMEZONE)# You have to set your timezone
        now = datetime.datetime.now(TIMEZONE)
        timestamp = now.strftime('%Y%m%d%H%M%S')
        final_file_path = self.SAVE_BUF_CSVDATA_PATH.replace(self.SAVE_BUF_CSVDATA_PATH.split('/')[-1], 
                                                   timestamp + "/" + timestamp + '_' + 
                                                   self.SAVE_BUF_CSVDATA_PATH.split('/')[-1])
        await self.save_data_async(self.data_buffer, self.SAVE_BUF_CSVDATA_PATH)
        raw_df = pd.read_csv(self.SAVE_BUF_CSVDATA_PATH, header=0)
        os.makedirs(self.SAVE_DATA_DIR + "/" + timestamp, exist_ok=True)
        raw_df.to_csv(final_file_path, sep=',', encoding='utf-8', index=False, header=True)


        if self.is_filter:
            filt_df = self.filtering(df=raw_df, labellist=raw_df.columns[1:])
            filt_df.to_csv(final_file_path.replace('_raw_data.csv', '_filt_data.csv'), sep=',', encoding='utf-8', index=False, header=True)

        if os.path.exists(self.SAVE_BUF_CSVDATA_PATH):
            os.remove(self.SAVE_BUF_CSVDATA_PATH)
            print(f"File  '{self.SAVE_BUF_CSVDATA_PATH}' was deleted")
        else:
            print(f"File '{self.SAVE_BUF_CSVDATA_PATH}' is not existed")






async def sensor_fusion_main():
    """
    Main function for sensor fusion.

    This function initializes the sensor fusion process, starts the measurement loop,
    collects data from multiple sensors, updates the data buffer, and handles real-time
    data display. It also calculates and prints the sampling delay and reliability rate
    upon termination.

    The main loop runs until the measurement process is stopped, either by an exception
    or a keyboard interrupt.

    Raises:
        Exception: If an error occurs during the measurement process, it is caught and printed.
        KeyboardInterrupt: If a keyboard interrupt occurs, the measurement process is stopped
                           and the data is saved.

    """
    print("Start sensor fusion main")
    config = config_manager.load_config(config_path)
    sensors = Sensors(config["master"])
    print("Called an instance of Sensors class")

    # sensors.start_all_measurements()
    sampling_counter = 0
    current_time = 0
    #sensors.is_running = True
    sensors.on_change_start_measurement()

    try:
        main_loop_start_time = None
        while sensors.is_running:
            iteration_start_time = perf_counter() # Start time of each iteration

            if main_loop_start_time is None:
                    main_loop_start_time = iteration_start_time  # initialize main loop start time

            current_time = perf_counter() - main_loop_start_time # Current time

            data = sensors.collect_data() # Get data from sensors                                        
            sampling_counter += 1 # Num of sampling

            converted_data = sensors.convert_dictdata(current_time, data) # Convert data to dataframe format
            # Update the data buffer. If it reaches the buffer limit, write the data to a CSV file.
            await sensors.update_data_buffer(converted_data)
            # Display data in real time. This process is executed on additional thread.
            if sensors.is_show_real_time_data:
                formatted_data = format_sensor_fusion_data(data, sensors.all_data_columns_list)    
                print("--------------------------------------------------------------------")
                print("Current Time is: {:.3f}".format(current_time))
                print(formatted_data)

            # Wait based on the sampling interval and execution time to maintain the sampling frequency.
            iteration_end_time = perf_counter()
            iteration_duration = iteration_end_time - iteration_start_time 
            print("Iteration duration is: {0} [s]".format(iteration_duration))
            sleep_time = max(0, sensors.SAMPLING_TIME - iteration_duration)
            if sleep_time > 0:
                wait_process(sleep_time)

    except Exception as e:
        print(e)

    except KeyboardInterrupt:
        sensors.on_change_stop_measurement()
        print("KeyboardInterrupt")
        await sensors.finish_measurement_and_save_data()

    finally:
        print("finish")
         # Compute delay of sampling
        main_loop_end_time = perf_counter() - main_loop_start_time
        print("Program terminated")
        print("main loop is ended. current time is: {:.3f}".format(current_time))
        print("main loop is ended. end time is: {:.3f}".format(main_loop_end_time))
        print("sampling num is: {}".format(sampling_counter))



        # Compute ideal sampliing time
        ideal_time = ((sampling_counter - 1) / sensors.SAMPLING_FREQUENCY_HZ)
        # Cpmpute a delay
        delay_time = current_time - ideal_time
        # reliability rate
        sampling_reliability_rate = (delay_time / (sampling_counter / sensors.SAMPLING_FREQUENCY_HZ)) * 100


        print("sampling delay is: {:.3f} s".format(delay_time))
        print("sampling delay rate is: {:.3f} %".format(sampling_reliability_rate))


if __name__ == '__main__':
    asyncio.run(sensor_fusion_main())

sensor_fusion.py manages sensor data collection and processing using asynchronous operations. It dynamically creates sensor instances, collects data, applies filters, and saves the data to CSV files. The script also displays real-time data if needed and monitors performance metrics like sampling delay and reliability. The main function runs a loop that handles data collection, buffer updates, and performance reporting. It uses asynchronous operations to efficiently manage data saving and ensure smooth performance.

# bno055_measurement.py
import time
import numpy as np
import adafruit_bno055
import board
import os
import sys

parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
sys.path.append(parent_dir)

from config.config_manager import load_config
config_path = os.path.join(parent_dir, 'config', 'measurement_system_config.yaml')


class BNO055:
    def __init__(self, config):
        self.COLUMNS = config.data_columns    
        self.SAMPLING_FREQUENCY_HZ = config.sampling_frequency_hz
        self.SAMPLING_TIME = 1 / self.SAMPLING_FREQUENCY_HZ
        self.SAVE_DATA_DIR = config.save_data_dir
        self.SEQUENCE_LENGTH = config.sequence_length
        self.FPASS = config.filter_params.fpass
        self.FSTOP = config.filter_params.fstop
        self.GPASS = config.filter_params.gpass
        self.GSTOP = config.filter_params.gstop
        self.Isfilter = config.filter_params.is_filter

        self.IsStart = False
        self.IsStop = True
        self.Is_show_real_time_data = config.is_show_real_time_data 

        i2c_instance = board.I2C() # Create i2c instance
        self.bno055_sensor = adafruit_bno055.BNO055_I2C(i2c_instance) # create BNO055_I2C instance

    def calibration(self):
        print("Start calibration!")
        while not self.bno055_sensor.calibrated:
            print('SYS: {0}, Gyro: {1}, Accel: {2}, Mag: {3}'.format(*(self.bno055_sensor.calibration_status)))
            time.sleep(1)


    def calcEulerfromQuaternion(self, _w, _x, _y, _z):
        """
        Calculate Euler angles (roll, pitch, yaw) from quaternion components.

        This method converts quaternion components (_w, _x, _y, _z) into Euler angles
        (roll, pitch, yaw) in degrees. If any of the quaternion components are None,
        it returns (0.0, 0.0, 0.0) and prints an error message.

        Args:
            _w (float): The w component of the quaternion.
            _x (float): The x component of the quaternion.
            _y (float): The y component of the quaternion.
            _z (float): The z component of the quaternion.

        Returns:
            tuple: A tuple containing the roll, pitch, and yaw angles in degrees.
                If an error occurs, it returns (0.0, 0.0, 0.0) and prints an error message.
        """
        if None in (_w, _x, _y, _z):
            print(f"Error: One or more quaternion values are None: {_w}, {_x}, {_y}, {_z}")
            return 0.0, 0.0, 0.0

        try:
            sqw = _w ** 2
            sqx = _x ** 2
            sqy = _y ** 2
            sqz = _z ** 2
            COEF_EULER2DEG = 57.2957795131

            # Yaw
            term1 = 2.0 * (_x * _y + _z * _w)
            term2 = sqx - sqy - sqz + sqw
            yaw = np.arctan2(term1, term2)

            # Pitch
            term1 = -2.0 * (_x * _z - _y * _w)
            term2 = sqx + sqy + sqz + sqw
            pitch = np.arcsin(term1 / term2) if -1 <= term1 / term2 <= 1 else 0.0

            # Roll
            term1 = 2.0 * (_y * _z + _x * _w)
            term2 = -sqx - sqy + sqz + sqw
            roll = np.arctan2(term1, term2)

            return COEF_EULER2DEG * roll, COEF_EULER2DEG * pitch, COEF_EULER2DEG * yaw

        except Exception as e:
            print(f"Error in calcEulerfromQuaternion: {e}")
            return 0.0, 0.0, 0.0

    def get_data_from_sensor(self):
        """
        Retrieve data from the BNO055 sensor and return it as a dictionary.

        This method collects various sensor readings from the BNO055 sensor, including
        Euler angles, gyroscope data, linear acceleration, quaternion, magnetic field,
        and calibration status. It then constructs a dictionary with these values and
        returns only the columns specified in self.COLUMNS.

        Returns:
            dict: A dictionary containing the sensor data. Only the columns specified
                in self.COLUMNS are included in the returned dictionary.
        """
        # Get data
        euler_z, euler_y, euler_x = [val for val in self.bno055_sensor.euler]  # X: yaw, Y: pitch, Z: roll
        gyro_x, gyro_y, gyro_z = [val for val in self.bno055_sensor.gyro]  # Gyro[rad/s]
        linear_accel_x, linear_accel_y, linear_accel_z = [val for val in self.bno055_sensor.linear_acceleration]  # Linear acceleration[m/s^2]
        quaternion_1, quaternion_2, quaternion_3, quaternion_4 = [val for val in self.bno055_sensor.quaternion]  # Quaternion
        quat_roll, quat_pitch, quat_yaw = self.calcEulerfromQuaternion(quaternion_1, quaternion_2, quaternion_3, quaternion_4)  # Cal Euler angle from quaternion
        magnetic_x, magnetic_y, magnetic_z = [val for val in self.bno055_sensor.magnetic]  # Magnetic field
        calibstat_sys, calibstat_gyro, calibstat_accel, calibstat_mag = [val for val in self.bno055_sensor.calibration_status]  # Status of calibration


        data_dict = {
            "linear_accel_x": linear_accel_x,
            "linear_accel_y": linear_accel_y,
            "linear_accel_z": linear_accel_z,
            "gyro_x": gyro_x,
            "gyro_y": gyro_y,
            "gyro_z": gyro_z,
            "euler_x": euler_x,
            "euler_y": euler_y,
            "euler_z": euler_z,
            "quat_roll": quat_roll,
            "quat_pitch": quat_pitch,
            "quat_yaw": quat_yaw,
            "quaternion_1": quaternion_1,
            "quaternion_2": quaternion_2,
            "quaternion_3": quaternion_3,
            "quaternion_4": quaternion_4,
            "magnetic_x": magnetic_x,
            "magnetic_y": magnetic_y,
            "magnetic_z": magnetic_z,
            "calibstat_sys": calibstat_sys,
            "calibstat_gyro": calibstat_gyro,
            "calibstat_accel": calibstat_accel,
            "calibstat_mag": calibstat_mag
        }

        return {column: data_dict[column] for column in self.COLUMNS if column in data_dict}


def format_sensor_data(data, labels):
    """
    Format sensor data into a string for display.

    This method takes a dictionary of sensor data and a list of labels, and formats
    the data into a string where each label is followed by its corresponding value.
    If a value is None, it is replaced with the string "None". Each label-value pair
    is separated by " / ".

    Args:
        data (dict): The sensor data to format.
        labels (list of str): The list of labels to include in the formatted string.

    Returns:
        str: A formatted string containing the sensor data.
    """
    formatted_str = ""
    if isinstance(data, dict):
        for label in labels:
            value = data.get(label, None)
            if value is None:
                value = "None"
            else:
                value = f"{value:.4f}"
            formatted_str += f"{label}: {value} / "
    return formatted_str.rstrip(" / ")

def test_main():
    """
    Main function for testing sensor data collection and display.

    This function initializes the BNO055 sensor, starts a loop to collect data,
    formats the data for display, and prints it in real-time. It also calculates
    and prints the sampling delay and reliability rate upon termination.

    The main loop runs until interrupted by the user.

    Raises:
        KeyboardInterrupt: If a keyboard interrupt occurs, the loop is terminated
                           and the final statistics are printed.
    """
    from utils.tools import wait_process
    from time import perf_counter
    import matplotlib.pyplot as plt

    print("Main start")

    config = load_config(config_path)
    meas_bno055 = BNO055(config.sensors['bno055'])

    start_time = perf_counter()
    sampling_counter = 0

    try:
        main_loop_start_time = perf_counter()
        while True:
            iteration_start_time = perf_counter()

            # Data acquisition process
            data = meas_bno055.get_data_from_sensor()
            current_time = perf_counter() - start_time
            sampling_counter += 1

            if meas_bno055.Is_show_real_time_data:
                formatted_data = format_sensor_data(data, meas_bno055.COLUMNS)
                # current time    
                print("--------------------------------------------------------------------")
                print("Current Time is: {:.3f}".format(current_time))
                print(formatted_data)

            # Wait to meet the sampling frequency based on the sampling interval and execution time
            elapsed_time = perf_counter() - iteration_start_time
            sleep_time = meas_bno055.SAMPLING_TIME - elapsed_time
            if sleep_time > 0:
                wait_process(sleep_time)

    except KeyboardInterrupt:
        print("Interrupted by user")

    finally:
        # Calculate the sampling delay from the number of samples and the current time
        main_loop_end_time = perf_counter() - main_loop_start_time
        print("Program terminated")
        print("main loop is ended. current time is: {:.3f}".format(current_time))
        print("main loop is ended. end time is: {:.3f}".format(main_loop_end_time))
        print("sampling num is: {}".format(sampling_counter)) # Since it is 0-based, the number of samples is current_time + 1

        # Calculate the ideal sampling time
        ideal_time = ((sampling_counter - 1) / meas_bno055.SAMPLING_FREQUENCY_HZ)
        # Calculate the delay
        delay_time = current_time - ideal_time
        # The reliability rate is the delay divided by the sampling time
        sampling_reliability_rate = (delay_time / (sampling_counter / meas_bno055.SAMPLING_FREQUENCY_HZ)) * 100

        print("sampling delay is: {:.3f} s".format(delay_time))
        print("sampling delay rate is: {:.3f} %".format(sampling_reliability_rate))




if __name__ == '__main__':
    test_main()

This script interfaces with the BNO055 sensor to collect and process data. It retrieves sensor readings, such as Euler angles and gyroscope data, formats them for display, and prints real-time updates if configured. The script includes a calibration method and calculates Euler angles from quaternion data. It manages data acquisition with a defined sampling frequency and calculates sampling delay and reliability. The main loop continues until interrupted, reporting performance metrics upon termination.

# elm327_measurement.py

import obd
import os
import time
from collections import deque
import numpy as np
import pandas as pd
import datetime
import asyncio
import scipy
from scipy import signal
import matplotlib as plt
import sys
from collections import defaultdict
import random

parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
sys.path.append(parent_dir)

from config.config_manager import load_config
config_path = os.path.join(parent_dir, 'config', 'measurement_system_config.yaml')

class ELM327:
    def __init__(self, config):
        """
        Initialize the ELM327 class with configuration parameters.

        Args:
            config (dict): Configuration parameters for the ELM327.
        """
        self.COLUMNS = config.data_columns    
        self.SAMPLING_FREQUENCY_HZ = config.sampling_frequency_hz
        self.SAMPLING_TIME = 1 / self.SAMPLING_FREQUENCY_HZ
        self.SAVE_DATA_DIR = config.save_data_dir
        self.SEQUENCE_LENGTH = config.sequence_length
        self.FPASS = config.filter_params.fpass
        self.FSTOP = config.filter_params.fstop
        self.GPASS = config.filter_params.gpass
        self.GSTOP = config.filter_params.gstop
        self.Isfilter = config.filter_params.is_filter
        self.res = self.connect_to_elm327()

        self.is_offline = config.is_offline
        self.IsStart = False
        self.IsStop = True
        self.Is_show_real_time_data = config.is_show_real_time_data

    def initialize_BLE(self):
        """
        Initialize Bluetooth Low Energy (BLE) for ELM327 connection.
        """
        os.system('sudo hcitool scan')
        os.system('sudo hciconfig hci0 up')
        os.system('sudo rfcomm bind 0 8A:2A:D4:FF:38:F3')
        os.system('sudo rfcomm listen 0 1 &')

    def connect_to_elm327(self):
        """
        Establish a connection to the ELM327 device.

        Returns:
            res (obd.OBDStatus): The connection status of the ELM327 device.
        """
        res = None
        try:
            self.initialize_BLE()
            self.connection = obd.OBD()
            print(self.connection.status())
            res = self.connection.status()
            if res == obd.OBDStatus.CAR_CONNECTED:
                print("----------Connection establishment is successful!----------")
                return res
            else:
                print("----------Connection establishment failed!----------")
                print("End program. Please check settings of the computer and ELM327")
        except Exception as e:
            print("----------Exception!----------")
            print(e)
        finally:
            return res

    def get_data_from_sensor(self):
        """
        Retrieve data from the sensor.

        Returns:
            dict: A dictionary containing sensor data.
        """
        if self.is_offline:
            data = self.get_data_from_sensor_stub()
        else:
            # Retrieve data and save it in dictionary format
            data = {column: self.get_obd2_value(column) for column in self.COLUMNS}
        return data

    def get_obd2_value_debug(self, column):
        """
        Retrieve OBD-II value for a specific column with debug information.

        Args:
            column (str): The OBD-II command column.

        Returns:
            float or None: The value of the OBD-II command, or None if not available.
        """
        command = getattr(obd.commands, column, None)
        if command:
            response = self.connection.query(command)
            if response:
                print(f"Response for command '{command}': {response}")
                if response.value is not None:  # Check for None
                    print(f"Response value for command '{command}': {response.value}")
                    return response.value.magnitude
                else:
                    print(f"No value in response for command '{command}'")
            else:
                print(f"No response for command '{command}'")
        else:
            print(f"No command found for column '{column}'")
        return None

    def get_obd2_value(self, column):
        """
        Retrieve OBD-II value for a specific column.

        Args:
            column (str): The OBD-II command column.

        Returns:
            float or None: The value of the OBD-II command, or None if not available.
        """
        command = getattr(obd.commands, column, None)
        if command:
            response = self.connection.query(command)
            if response.value is not None:  # Check for None
                return response.value.magnitude
        return None

    def get_data_from_sensor_stub(self):
        """
        Generate stub data for the sensor.

        Returns:
            dict: A dictionary containing stub sensor data.
        """
        data_stub = {column: np.abs(np.random.randn()).astype(np.float32).item() for column in self.COLUMNS}        
        # Randomly insert None or 0.0
        if random.choice([True, False]):
            random_column = random.choice(self.COLUMNS)
            if random.choice([True, False]):
                data_stub[random_column] = None
            else:
                data_stub[random_column] = 0.0

        return data_stub

def format_data_for_display(data, labels):
    """
    Format sensor data for display.

    Args:
        data (dict): The sensor data to format.
        labels (list of str): The list of labels to include in the formatted string.

    Returns:
        str: A formatted string containing the sensor data.
    """
    formatted_str = ""
    for label, value in zip(labels, data.values()):
        if value is None:
            value = "None"
        else:
            value = f"{value:.4f}"
        formatted_str += f"{label}: {value} / "
    return formatted_str.rstrip(" / ")

def format_sensor_data(data, labels):
    """
    Format sensor data for display.

    Args:
        data (dict or list): The sensor data to format.
        labels (list of str): The list of labels to include in the formatted string.

    Returns:
        str: A formatted string containing the sensor data.
    """
    formatted_str = ""
    if isinstance(data, dict):
        for label in labels:
            value = data.get(label, None)
            if value is None:
                value = "None"
            else:
                value = f"{value:.4f}"
            formatted_str += f"{label}: {value} / "
    else:
        for label, value in zip(labels, data):
            if value is None:
                value = "None"
            else:
                value = f"{value:.4f}"
            formatted_str += f"{label}: {value} / "
    return formatted_str.rstrip(" / ")

def test_main():
    """
    Main function for testing sensor data collection and display.

    This function initializes the ELM327 sensor, starts a loop to collect data,
    formats the data for display, and prints it in real-time. It also calculates
    and prints the sampling delay and reliability rate upon termination.

    The main loop runs until interrupted by the user.

    Raises:
        KeyboardInterrupt: If a keyboard interrupt occurs, the loop is terminated
                           and the final statistics are printed.
    """
    from utils.tools import wait_process
    from time import perf_counter
    import matplotlib.pyplot as plt

    print("Main start")
    config = load_config(config_path)
    meas_elm327 = ELM327(config.sensors['elm327'])
    # res = meas_elm327.connect_to_elm327()

    start_time = perf_counter()
    sampling_counter = 0
    try:
        main_loop_start_time = perf_counter()
        while True:
            iteration_start_time = perf_counter()

            # Data acquisition process
            data = meas_elm327.get_data_from_sensor()

            current_time = perf_counter() - start_time
            sampling_counter += 1

            if meas_elm327.Is_show_real_time_data:
                formatted_data = format_sensor_data(data, meas_elm327.COLUMNS)
                print("--------------------------------------------------------------------")
                print("Current Time is: {:.3f}".format(current_time))
                print(formatted_data)

            # Wait to meet the sampling frequency based on the sampling interval and execution time
            elapsed_time = perf_counter() - iteration_start_time
            sleep_time = meas_elm327.SAMPLING_TIME - elapsed_time
            if sleep_time > 0:
                wait_process(sleep_time)

    except KeyboardInterrupt:
        print("Interrupted by user")

    finally:
        main_loop_end_time = perf_counter() - main_loop_start_time
        print("Program terminated")
        print("main loop is ended. current time is: {:.3f}".format(current_time))
        print("main loop is ended. end time is: {:.3f}".format(main_loop_end_time))
        print("sampling num is: {}".format(sampling_counter))

        # Calculate the ideal sampling time
        ideal_time = ((sampling_counter - 1) / meas_elm327.SAMPLING_FREQUENCY_HZ)
        # Calculate the delay
        delay_time = current_time - ideal_time
        # The reliability rate is the delay divided by the sampling time
        sampling_reliability_rate = (delay_time / (sampling_counter / meas_elm327.SAMPLING_FREQUENCY_HZ)) * 100

        print("sampling delay is: {:.3f} s".format(delay_time))
        print("sampling delay rate is: {:.3f} %".format(sampling_reliability_rate))

if __name__ == '__main__':
    test_main()

This script defines an ELM327 class for interfacing with an ELM327 device to collect sensor data via OBD-II. It initializes the connection, retrieves data, and formats it for display. The test_main function sets up the ELM327 sensor, collects data in a loop, and prints it in real-time. It also calculates and displays sampling delay and reliability rates upon interruption. The script handles both real-time data acquisition and simulated offline data for testing purposes.

7. Conclusion

In this post, I described the project focused on developing a measurement system to capture vehicle behavior. While the project is still ongoing, I have recognized the potential for capturing vehicle data with a focus on the vehicle dynamics domain. However, when it comes to measuring signals from vehicle ECUs, some limitations need to be considered, as the ELM327 may not reliably achieve sampling rates beyond 5 Hz with good accuracy. This issue could likely be resolved by using a more advanced OBD scanner, such as the OBDLink MX+.

8. Reference

[1] Adafruit. "Adafruit BNO055 Absolute Orientation Sensor."
https://learn.adafruit.com/adafruit-bno055-absolute-orientation-sensor/overview. Accessed September 3, 2024.

[2] Adafruit. "Adafruit_CircuitPython_BNO055." https://github.com/adafruit/Adafruit_CircuitPython_BNO055. Accessed September 3, 2024.

[3] Adafruit. "Adafruit_BNO055." https://github.com/adafruit/Adafruit_BNO055. Accessed September 3, 2024.

[4] Adafruit. "Adafruit BNO055 Demo." https://cdn-shop.adafruit.com/product-videos/1024x768/2472-04.mp4 Accessed September 3, 2024.

[5] Amazon. "Elm327 Launchh OBD2 Professional Bluetooth Scan Tool and Code Reader for Android and PC, Interface OBDII OBD2 Car Auto Diagnostic Scanner, Not Support J1850 VPW & J1850 PWM". https://a.co/d/5BFn4GN. Accessed September 3, 2024.

[6] MathWorks. "Coordinate Systems in Automated Driving Toolbox." https://www.mathworks.com/help/driving/ug/coordinate-systems.html. Accessed September 3, 2024.

[7] Apple. "Getting raw gyroscope events." https://developer.apple.com/documentation/coremotion/getting_raw_gyroscope_events. Accessed September 3, 2024.

[8] phyphox. "phyphox top page."https://phyphox.org/. Accessed September 3, 2024.

[9] Andrea Patrucco, Vehicle dynamics engineer presso Applus+ IDIADA. "Challenging your car's trip computer fuel efficiency figures." https://www.linkedin.com/pulse/challenging-your-cars-trip-computer-fuel-efficiency-figures-patrucco/. Accessed September 3, 2024.

[10] "Choosing OBDII adapter". https://www.carscanner.info/choosing-obdii-adapter/. Accessed September 3, 2024.

[11] "VDDM". https://github.com/Qooniee/VDDM/tree/master/. Accessed September 3, 2024.

Atas ialah kandungan terperinci Membina Logger Data Tersuai dengan Raspberry Pi untuk Kenderaan: Mengintegrasikan Penderia BNO dan ELM. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn