ホームページ >バックエンド開発 >Python チュートリアル >Raspberry Pi を使用した車両用カスタム データ ロガーの構築: BNO センサーと ELM センサーの統合
私は自動車業界で働くソフトウェアエンジニアとして、自動運転車、データ測定技術、分析手法に強い関心を持っています。この投稿では、カスタム構築の測定システムについて説明し、そのプロセスを一から詳しく説明し、いくつかの実験結果を示します。私のデータロガーは、Raspberry Pi 3、BNO055 センサー、ELM327 OBD-II アダプターで構成されており、それぞれ計算、加速度と回転速度データの収集、実際の車両からのエンジン情報の取得を担当します。
1.背景
2.アーキテクチャ
3.センサー情報
4.座標系情報
5.データ比較
6.コアロジック
7.結論
8.参考
近年、IoT、人工知能、エッジ コンピューティングの台頭により、自動車分野を含むさまざまな業界に大きな進歩がもたらされました。現代の車両には多数のセンサーが装備されており、それらが連携して高度なシステムを構築し、強化された安全機能から自動運転機能まであらゆるものを可能にしています。
通常、車両の状態を監視するには、正確で信頼性の高い測定を実現するために、高度で高価なセンサーを使用する必要があります。ただし、車両のダイナミクスを一般的に理解するためにサーキット上のデータを取得するなど、特定の状況では、これらのハイエンド センサーが必ずしも必要ではない場合があります。これを認識し、私はより手頃な価格のコンポーネントを使用して車両の物理的状態を測定できる費用対効果の高いシステムを作成することにしました。
私は乗り心地よりも車両運動に関するデータの測定に重点を置くことにしました。これは、乗り心地を解析するには、より多くの高周波信号を適切な精度で捕捉する必要があり、測定時のサンプリング周波数を高く設定する必要があるためです。
具体的には、操舵角に対する横加速度の周波数応答特性において、車速にもよるが、一般に5Hz以下に共振周波数が現れる。一方、乗り心地解析では数十Hzの範囲にまで議論が及ぶことが多い
。したがって、測定システム開発の主な目標は、車両ダイナミクス領域でのデータ分析を容易にする低コストのデータロギングシステムを作成することです。
測定システムは次のハードウェアで構成されています:
Raspberry Pi はメインコンピューターとして機能し、BNO055 と ELM327 からデータを収集します。 Raspberry Pi は、I2C 経由で BNO055 と通信し、Bluetooth 経由で ELM327 と通信します (図 1 を参照)。
BNO055 センサーは、さまざまな種類のモーションとデータを測定できます:★Adafruit BNO055 概要[1]
絶対方位 (オイラーベクトル、100Hz): 360° 球面に基づいた 3 軸方位データを提供します。
絶対方向 (クォータニオン、100Hz): より正確なデータ操作のための 4 ポイント クォータニオン出力を提供します。
角速度ベクトル (100Hz): 3 つの軸にわたる回転速度をラジアン/秒 (rad/s) で測定します。
加速度ベクトル (100Hz): 3 つの軸にわたる重力と直線運動を含む加速度をメートル/秒の 2 乗 (m/s²) でキャプチャします。
磁場強度ベクトル (20Hz): 3 軸にわたる磁場強度をマイクロテスラ (μT) 単位で感知します。
線形加速度ベクトル (100Hz): 3 軸にわたる線形加速度データ (重力を除く) をメートル/秒 2 乗 (m/s²) で記録します。
重力ベクトル (100Hz): 3 軸にわたる重力加速度 (動きを除く) をメートル/秒 2 乗 (m/s²) で測定します。
温度 (1Hz): 周囲温度を摂氏で提供します。
BNO055 はコンパクトで、Adafruit_CircuitPython と呼ばれる Adafruit の Python ライブラリでサポートされています。さらに、Adafruit から C 言語ライブラリを入手できます。
★Adafruit_CircuitPython_BNO055[2]
★Adafruit_BNO055[3]
デモについては、ここでビデオをご覧ください:
Adafruit BNO055 デモ
★Adafruit BNO055 デモ[4]
ELM327 是一种广泛使用的 OBD-II(车载诊断)适配器,允许通过标准接口访问车辆发动机数据。它充当车辆 ECU(电子控制单元)和计算机或智能手机等外部设备之间的桥梁,能够检索诊断和性能数据。 ELM327 的主要特性和功能包括:★ELM327 信息[5]
OBD-II 协议支持:ELM327 支持各种 OBD-II 协议,包括 ISO 9141、ISO 14230 (KWP2000)、ISO 15765 (CAN) 等,使其与广泛的车辆兼容。
诊断故障代码(DTC):它可以读取和清除车辆 ECU 中的诊断故障代码,帮助识别和排除问题。
实时数据流:提供来自车辆传感器的实时数据,例如发动机转速、车速、冷却液温度和燃油油位。
冻结帧数据:在检测到故障时捕获并存储数据,这有助于诊断间歇性问题。
车辆信息:检索有关车辆的详细信息,包括 VIN(车辆识别码)、校准 ID 等。
兼容性:提供多种形式,包括蓝牙、USB、Wi-Fi版本,可以兼容不同的设备和平台。
通常,ELM327 用于诊断车辆状态,特别是发动机状态。然而,它也可以用来从车辆收集广泛的数据。 ELM327 可在亚马逊等平台购买,价格约为 20 美元到 80 美元不等。
使用 python-OBD 库,您可以创建查询以通过 ELM327 适配器收集数据,从而实现可定制和详细的数据收集
由于该系统与多个传感器和实际车辆交互,因此它必须了解每个坐标系。
下图显示了我的数据测量系统的 BNO055 坐标系。我的系统还使用四元数计算旋转角度。传感器坐标系需要与车辆坐标系相匹配。
测量系统基于车辆坐标系运行。
我使用了一款 iPhone 应用程序,它可以测量多个物理数据点,包括加速度、陀螺仪等,用于验证任务。使用iPhone应用程序的目的是通过该应用程序中的数据来确认我的数据测量系统的有效性。图4引自苹果开发者网站。
在本节中,我将展示一些实验结果。第一步,我进行了正弦波任务,以确认我的测量系统输出的读数是否没有错误,例如单位错误。这部分很基础,但对于确保以正确的方式收集数据至关重要。
接下来,我在实车上驾驶时测量了一些物理值。
然后,我检查了ELM327的性能和每个传感器的采样精度。
我进行了正弦波测试,在本节中,我将展示结果。这些测试的目的是确认以下内容:
为了实现这一目标,我使用了“phyphox”★phyphox[8],这是一款可以测量各种物理值的智能手机应用程序。我将 BNO055 传感器安装在 iPhone 上,如下所示 [图 5]。
图6显示了采样频率为10 Hz的正弦波测试结果。图例“VDDM”表示由我开发的测量系统收集的数据,而“iPhone”表示使用“phyphox”收集的数据。
来自 VDDM 和 iPhone 的数据在所有轴上都显示出相似的趋势,表明这两个应用程序正在捕获相似的运动模式。
VDDM 数据似乎稍有变化,特别是在加速度图中,这可能表明与 iPhone 相比具有更高的灵敏度或不同的过滤方法。
两个数据大体吻合,但幅度差异表明,根据精度要求,可能需要进一步校准或调整。
VDDM 可能会捕获额外的噪声或更高频率的分量,尤其是在加速度图中,而 iPhone 数据中不存在这些分量。这可能有利于详细分析或需要进一步过滤。
图 7 显示了针对实际车辆中的 BNO055 数据进行的实验结果。一个名为“phyphox”的 iPhone 应用程序被用作参考。采样频率为50Hz。我观察到,我在车辆上开发的数据记录系统测量的加速度和旋转信号包含噪声、异常值和 NaN 值。因此,我应用了后处理功能,包括 IQR 方法和巴特沃斯低通滤波器。每个图表的图例中的术语“fc”表示截止频率。例如,fc_19.5表示截止频率为19.5Hz。请注意,本实验仅使用 BNO055 传感器进行,并且 ELM327 功能被禁用。
来自 VDDM(原始数据)和 iPhone 的数据在所有轴上都显示出相似的趋势。这表明两个系统正在捕捉相同的运动模式。
与 iPhone 数据相比,VDDM 的数据似乎有更大的变化。这在加速图中尤其明显,可能表明 VDDM 以更高的灵敏度捕获数据或使用不同的过滤方法。
来自 VDDM 的过滤数据(fc 19.5Hz 和 10Hz)与原始数据相比变化较小,并且更接近 iPhone 数据。这表明 VDDM(原始数据)可能包含更多噪声或高频分量。
来自 VDDM 的原始数据包含异常值和缺失值,通过应用过滤器可以减轻这些异常值和缺失值。这在滚动率和俯仰率图表中尤其明显。
与 iPhone 相比,VDDM 似乎包含更多高频成分和噪声。虽然这有利于详细分析,但如果降噪至关重要,则可能需要额外的滤波或硬件级调整。
图 8 显示了有关发动机性能的数据。这些数据是使用 ELM327 扫描仪以 4 Hz 的采样频率收集的。 BNO055 功能在数据收集期间被禁用。
在使用 BNO055 和 ELM327 进行实验期间,我观察到采样延迟。为了找出问题,我通过调整采样频率以及激活和停用 ELM327 和 BNO055 来实施测量。
首先,我给出与采样频率相对应的时间差。此测试涉及使用 BNO055 和 ELM327 传感器
图9显示,当采样频率设置为10Hz时,采样时间的精度明显下降。
ELM327 无法实现与 BNO055 一样高的采样频率,这可能会对整体系统性能产生负面影响。尽管如此,为了更好地了解每个传感器的性能,我进行了单独的测量。
图10和图11说明了采样频率和采样定时精度之间的关系。采样时序精度表示为:
根据图 10,BNO055 可以响应高达 50 Hz 的采样频率。然而,超过 60 Hz 后,采样计时精度急剧下降。
图 11 显示 ELM327 的响应度低于 BNO055。 ELM327 在最高 5 Hz 时仍能保持良好的采样定时精度。这是因为 ELM327 通过 OBD 与车辆通信,而车辆的 ECU 可能不支持对 OBD 请求的高频响应,从而导致通信频率较低。
此外,ELM327 是可通过 OBD-II 从车辆中的多个 ECU 检索数据的传感器中最便宜且质量最低的选项。
这篇文章★挑战你汽车的行车电脑燃油效率数据[9]说
Elm327 需要考虑的一件重要事情是响应时间不容忽视:设备可能需要 0.1 到 0.3 秒的时间来响应消息请求,这意味着快速添加太多信号最终会减少采样率达到无用的程度
更多详细建议,请访问此页面:★选择OBDII适配器[10]
对于性能低下的原因,可以考虑以下原因:
我一直在开发的测量软件假设多个传感器需要的用例。此外,用户可以更改 yaml 文件上的测量设置。用户可以在简单的 GUI 上操作测量应用程序。您可以在这里查看源代码:★VDDM[11]
测量系统的结构如下:
VDDM/ │ ├── fusion/ │ ├── __init__.py │ ├── sensor_fusion.py # Handles data fusion, collects data from sensors │ └── sensors/ │ ├── __init__.py │ ├── bno055_measurement.py # Script for BNO055 sensor data collection │ └── elm327_measurement.py # Script for ELM327 OBD2 data collection │ ├── signalprocessing/ │ ├── __init__.py │ └── filter.py # Contains filtering algorithms (e.g., Butterworth filter) │ ├── config/ │ ├── config_manager.py # Loads configuration from YAML │ └── measurement_system_config.yaml # Configuration file for sensors and system settings │ ├── utils/ │ ├── tools.py # Utility functions (e.g., wait functions) │ └── visualize_data.py # Functions to format and display sensor data │ ├── measurement/ │ ├── __init__.py │ └── measurement_control.py # Controls the measurement process │ ├── gui/ │ ├── __init__.py │ └── main_gui.py # GUI setup for starting/stopping measurement │ ├── main.py # Entry point for starting the system └── requirements.txt # Project dependencies
下图表示 VDDM 的流程。
main.py └── main_gui.py ├── Starts GUI interface and buttons └── measurement_control.py ├── Initializes sensor measurements ├── Controls start/stop of measurements └── sensor_fusion.py ├── Manages sensor data collection and fusion ├── Uses: │ ├── bno055_measurement.py (BNO055 sensor data collection) │ └── elm327_measurement.py (ELM327 OBD2 data collection) └── Processes data with: ├── filter.py (Applies Butterworth low-pass filter) └── visualize_data.py (Formats and visualizes sensor data)
VDDM 的主要组件是 VDDM/fusion/sensor_fusion.py、VDDM/fusion/sensors/bno055_measurement.py 和 VDDM/fusion/sensors/elm327_measurement.py。
# sensor_fusion.py import os import sys import importlib from time import perf_counter from collections import defaultdict import pandas as pd import datetime import asyncio import numpy as np parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) sys.path.append(parent_dir) from config import config_manager from utils.tools import wait_process from utils.visualize_data import format_sensor_fusion_data from signalprocessing.filter import butterlowpass config_path = os.path.join(parent_dir, 'config', 'measurement_system_config.yaml') class SensorFactory: @staticmethod def create_sensor(sensor_type, config): try: # Import the appropriate sensor module based on sensor_type module = importlib.import_module(f"fusion.sensors.{sensor_type}_measurement") # Get a sensor class sensor_class = getattr(module, f"{sensor_type.upper()}") # Create a sensor instance and return return sensor_class(config) except (ImportError, AttributeError) as e: print(f"Error creating sensor {sensor_type}: {e}") return None class Sensors: def __init__(self, config): self.config = config_manager.load_config(config_path) self.sensor_list = tuple(self.config.sensors.keys()) self.sensor_instances = {} self.is_running = False self.SAMPLING_FREQUENCY_HZ = config.sampling_frequency_hz self.SAMPLING_TIME = 1 / self.SAMPLING_FREQUENCY_HZ self.SAVE_DATA_DIR = config.save_data_dir self.SAVE_BUF_CSVDATA_PATH = self.SAVE_DATA_DIR + "/" + "measurement_raw_data.csv" self.SEQUENCE_LENGTH = config.sequence_length # Windows size [s] # Buffer size is determined by the relation of sequence length and sampling frequency # Buffer secures data for SEQUENCE_LENGTH[s] self.MAX_DATA_BUF_LEN = self.SEQUENCE_LENGTH * self.SAMPLING_FREQUENCY_HZ self.FPASS = config.filter_params.fpass self.FSTOP = config.filter_params.fstop self.GPASS = config.filter_params.gpass self.GSTOP = config.filter_params.gstop self.is_filter = config.filter_params.is_filter self.is_show_real_time_data = config.is_show_real_time_data self.TIMEZONE = config.timezone self.all_data_columns_list = () for sensor_name in self.sensor_list: self.all_data_columns_list += tuple(self.config["sensors"][sensor_name]["data_columns"]) self.data_buffer = pd.DataFrame() # data buffer for sensor_type in self.sensor_list: sensor_config = self.config.sensors[sensor_type] sensor_instance = SensorFactory.create_sensor(sensor_type, sensor_config) if sensor_instance: self.sensor_instances[sensor_type] = sensor_instance if os.path.exists(self.SAVE_BUF_CSVDATA_PATH): os.remove(self.SAVE_BUF_CSVDATA_PATH) print(f"File '{self.SAVE_BUF_CSVDATA_PATH}' was deleted for initialization") def get_sensor(self, sensor_type): """ Retrieve the sensor instance corresponding to the specified sensor type. Args: sensor_type (str): The type of the sensor to retrieve. Returns: object: The sensor instance corresponding to the specified sensor type. Returns None if the sensor type does not exist. """ return self.sensor_instances.get(sensor_type) def collect_data(self): """ Collect data from all sensors. This method iterates over all sensor instances and collects data from each sensor. The collected data is stored in a dictionary where the keys are sensor types and the values are the data collected from the corresponding sensors. Returns: dict: A dictionary containing the collected data from all sensors. The keys are sensor types and the values are the data from each sensor. Raises: Exception: If an error occurs while collecting data from any sensor, the exception is caught and printed. """ data = {} try: for sensor_type, sensor in self.sensor_instances.items(): # get data from sensors data[sensor_type] = sensor.get_data_from_sensor() return data except Exception as e: print(e) def on_change_start_measurement(self): """ Start the measurement process. This method sets the is_running flag to True, indicating that the measurement process should start. """ self.is_running = True def on_change_stop_measurement(self): """ Stop the measurement process. This method sets the is_running flag to False, indicating that the measurement process should stop. """ self.is_running = False def filtering(self, df, labellist): """ Apply a low-pass filter to the specified columns in the DataFrame. This method applies a Butterworth low-pass filter to each column specified in the labellist. The "Time" column should be excluded from the labellist as it is not needed for the computation. Args: df (pd.DataFrame): The input DataFrame containing the data to be filtered. labellist (list of str): A list of column names to be filtered. The "Time" column should not be included in this list. Returns: pd.DataFrame: A new DataFrame with the filtered data. """ filtered_df = df.copy() for labelname in labellist: # Ensure the column is converted to a numpy array x = df[labelname].to_numpy() filtered_df[labelname] = butterlowpass( x=x, # Correctly pass the numpy array as 'x' fpass=self.FPASS, fstop=self.FSTOP, gpass=self.GPASS, gstop=self.GSTOP, fs=self.SAMPLING_FREQUENCY_HZ, dt=self.SAMPLING_TIME, checkflag=False, labelname=labelname ) return filtered_df def convert_dictdata(self, current_time, sensor_data_dict): """ Convert nested dictionary data from multiple sensors into a single DataFrame. This method converts nested dictionary data obtained from multiple sensors into a single dictionary and then converts it into a pandas DataFrame. The current_time information is associated with the data. Args: current_time (float): The current time at which the data was obtained. sensor_data_dict (dict): A nested dictionary containing data from multiple sensors. Returns: pd.DataFrame: A DataFrame containing the converted data with the current time information. """ converted_data = {'Time': current_time} for sensor, data in sensor_data_dict.items(): converted_data.update(data) converted_data = pd.DataFrame([converted_data]) return converted_data async def update_data_buffer(self, dict_data): """ Add data from sensors to the buffer and save it if necessary. This method adds the provided sensor data to the internal buffer. If the buffer exceeds the specified maximum length, the oldest data is saved to a CSV file and removed from the buffer. Args: dict_data (dict): The data from sensors to be added to the buffer. """ # Add data to the buffer self.data_buffer = pd.concat([self.data_buffer, dict_data], ignore_index=True) # If the buffer exceeds the specified length, save the oldest data if len(self.data_buffer) > self.MAX_DATA_BUF_LEN: # Save the oldest data to a CSV file old_data = self.data_buffer.head(self.MAX_DATA_BUF_LEN) await self.save_data(old_data, self.SAVE_BUF_CSVDATA_PATH) # Update the buffer self.data_buffer = self.data_buffer.tail(len(self.data_buffer) - self.MAX_DATA_BUF_LEN) async def save_data_async(self, df, path): """ Save the DataFrame to a CSV file asynchronously. This method uses asyncio.to_thread to run the synchronous to_csv method in a separate thread, allowing it to be handled asynchronously. Args: df (pd.DataFrame): The DataFrame to be saved. path (str): The file path where the DataFrame should be saved. """ if not os.path.isfile(path): await asyncio.to_thread(df.to_csv, path, sep=',', encoding='utf-8', index=False, header=True, mode='w') else: await asyncio.to_thread(df.to_csv, path, sep=',', encoding='utf-8', index=False, header=False, mode='a') async def save_data(self, df, path): """ Save the DataFrame to a CSV file asynchronously. This method calls save_data_async to save the DataFrame to a CSV file asynchronously. Args: df (pd.DataFrame): The DataFrame to be saved. path (str): The file path where the DataFrame should be saved. """ await self.save_data_async(df, path) async def finish_measurement_and_save_data(self): """ Finish the measurement process and save the data. This method finalizes the measurement process by saving the buffered data to a CSV file. It also applies filtering if specified and saves the filtered data to a separate CSV file. The method handles time zone settings and generates a timestamp for the file names. The buffered data is saved to a temporary CSV file, which is then read back and saved to a final file path with a timestamp. If filtering is enabled, the filtered data is also saved. The temporary CSV file is deleted after the data is saved. Raises: Exception: If an error occurs during the file operations. """ t_delta = datetime.timedelta(hours=9) TIMEZONE = datetime.timezone(t_delta, self.TIMEZONE)# You have to set your timezone now = datetime.datetime.now(TIMEZONE) timestamp = now.strftime('%Y%m%d%H%M%S') final_file_path = self.SAVE_BUF_CSVDATA_PATH.replace(self.SAVE_BUF_CSVDATA_PATH.split('/')[-1], timestamp + "/" + timestamp + '_' + self.SAVE_BUF_CSVDATA_PATH.split('/')[-1]) await self.save_data_async(self.data_buffer, self.SAVE_BUF_CSVDATA_PATH) raw_df = pd.read_csv(self.SAVE_BUF_CSVDATA_PATH, header=0) os.makedirs(self.SAVE_DATA_DIR + "/" + timestamp, exist_ok=True) raw_df.to_csv(final_file_path, sep=',', encoding='utf-8', index=False, header=True) if self.is_filter: filt_df = self.filtering(df=raw_df, labellist=raw_df.columns[1:]) filt_df.to_csv(final_file_path.replace('_raw_data.csv', '_filt_data.csv'), sep=',', encoding='utf-8', index=False, header=True) if os.path.exists(self.SAVE_BUF_CSVDATA_PATH): os.remove(self.SAVE_BUF_CSVDATA_PATH) print(f"File '{self.SAVE_BUF_CSVDATA_PATH}' was deleted") else: print(f"File '{self.SAVE_BUF_CSVDATA_PATH}' is not existed") async def sensor_fusion_main(): """ Main function for sensor fusion. This function initializes the sensor fusion process, starts the measurement loop, collects data from multiple sensors, updates the data buffer, and handles real-time data display. It also calculates and prints the sampling delay and reliability rate upon termination. The main loop runs until the measurement process is stopped, either by an exception or a keyboard interrupt. Raises: Exception: If an error occurs during the measurement process, it is caught and printed. KeyboardInterrupt: If a keyboard interrupt occurs, the measurement process is stopped and the data is saved. """ print("Start sensor fusion main") config = config_manager.load_config(config_path) sensors = Sensors(config["master"]) print("Called an instance of Sensors class") # sensors.start_all_measurements() sampling_counter = 0 current_time = 0 #sensors.is_running = True sensors.on_change_start_measurement() try: main_loop_start_time = None while sensors.is_running: iteration_start_time = perf_counter() # Start time of each iteration if main_loop_start_time is None: main_loop_start_time = iteration_start_time # initialize main loop start time current_time = perf_counter() - main_loop_start_time # Current time data = sensors.collect_data() # Get data from sensors sampling_counter += 1 # Num of sampling converted_data = sensors.convert_dictdata(current_time, data) # Convert data to dataframe format # Update the data buffer. If it reaches the buffer limit, write the data to a CSV file. await sensors.update_data_buffer(converted_data) # Display data in real time. This process is executed on additional thread. if sensors.is_show_real_time_data: formatted_data = format_sensor_fusion_data(data, sensors.all_data_columns_list) print("--------------------------------------------------------------------") print("Current Time is: {:.3f}".format(current_time)) print(formatted_data) # Wait based on the sampling interval and execution time to maintain the sampling frequency. iteration_end_time = perf_counter() iteration_duration = iteration_end_time - iteration_start_time print("Iteration duration is: {0} [s]".format(iteration_duration)) sleep_time = max(0, sensors.SAMPLING_TIME - iteration_duration) if sleep_time > 0: wait_process(sleep_time) except Exception as e: print(e) except KeyboardInterrupt: sensors.on_change_stop_measurement() print("KeyboardInterrupt") await sensors.finish_measurement_and_save_data() finally: print("finish") # Compute delay of sampling main_loop_end_time = perf_counter() - main_loop_start_time print("Program terminated") print("main loop is ended. current time is: {:.3f}".format(current_time)) print("main loop is ended. end time is: {:.3f}".format(main_loop_end_time)) print("sampling num is: {}".format(sampling_counter)) # Compute ideal sampliing time ideal_time = ((sampling_counter - 1) / sensors.SAMPLING_FREQUENCY_HZ) # Cpmpute a delay delay_time = current_time - ideal_time # reliability rate sampling_reliability_rate = (delay_time / (sampling_counter / sensors.SAMPLING_FREQUENCY_HZ)) * 100 print("sampling delay is: {:.3f} s".format(delay_time)) print("sampling delay rate is: {:.3f} %".format(sampling_reliability_rate)) if __name__ == '__main__': asyncio.run(sensor_fusion_main())
sensor_fusion.py manages sensor data collection and processing using asynchronous operations. It dynamically creates sensor instances, collects data, applies filters, and saves the data to CSV files. The script also displays real-time data if needed and monitors performance metrics like sampling delay and reliability. The main function runs a loop that handles data collection, buffer updates, and performance reporting. It uses asynchronous operations to efficiently manage data saving and ensure smooth performance.
# bno055_measurement.py import time import numpy as np import adafruit_bno055 import board import os import sys parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) sys.path.append(parent_dir) from config.config_manager import load_config config_path = os.path.join(parent_dir, 'config', 'measurement_system_config.yaml') class BNO055: def __init__(self, config): self.COLUMNS = config.data_columns self.SAMPLING_FREQUENCY_HZ = config.sampling_frequency_hz self.SAMPLING_TIME = 1 / self.SAMPLING_FREQUENCY_HZ self.SAVE_DATA_DIR = config.save_data_dir self.SEQUENCE_LENGTH = config.sequence_length self.FPASS = config.filter_params.fpass self.FSTOP = config.filter_params.fstop self.GPASS = config.filter_params.gpass self.GSTOP = config.filter_params.gstop self.Isfilter = config.filter_params.is_filter self.IsStart = False self.IsStop = True self.Is_show_real_time_data = config.is_show_real_time_data i2c_instance = board.I2C() # Create i2c instance self.bno055_sensor = adafruit_bno055.BNO055_I2C(i2c_instance) # create BNO055_I2C instance def calibration(self): print("Start calibration!") while not self.bno055_sensor.calibrated: print('SYS: {0}, Gyro: {1}, Accel: {2}, Mag: {3}'.format(*(self.bno055_sensor.calibration_status))) time.sleep(1) def calcEulerfromQuaternion(self, _w, _x, _y, _z): """ Calculate Euler angles (roll, pitch, yaw) from quaternion components. This method converts quaternion components (_w, _x, _y, _z) into Euler angles (roll, pitch, yaw) in degrees. If any of the quaternion components are None, it returns (0.0, 0.0, 0.0) and prints an error message. Args: _w (float): The w component of the quaternion. _x (float): The x component of the quaternion. _y (float): The y component of the quaternion. _z (float): The z component of the quaternion. Returns: tuple: A tuple containing the roll, pitch, and yaw angles in degrees. If an error occurs, it returns (0.0, 0.0, 0.0) and prints an error message. """ if None in (_w, _x, _y, _z): print(f"Error: One or more quaternion values are None: {_w}, {_x}, {_y}, {_z}") return 0.0, 0.0, 0.0 try: sqw = _w ** 2 sqx = _x ** 2 sqy = _y ** 2 sqz = _z ** 2 COEF_EULER2DEG = 57.2957795131 # Yaw term1 = 2.0 * (_x * _y + _z * _w) term2 = sqx - sqy - sqz + sqw yaw = np.arctan2(term1, term2) # Pitch term1 = -2.0 * (_x * _z - _y * _w) term2 = sqx + sqy + sqz + sqw pitch = np.arcsin(term1 / term2) if -1 <= term1 / term2 <= 1 else 0.0 # Roll term1 = 2.0 * (_y * _z + _x * _w) term2 = -sqx - sqy + sqz + sqw roll = np.arctan2(term1, term2) return COEF_EULER2DEG * roll, COEF_EULER2DEG * pitch, COEF_EULER2DEG * yaw except Exception as e: print(f"Error in calcEulerfromQuaternion: {e}") return 0.0, 0.0, 0.0 def get_data_from_sensor(self): """ Retrieve data from the BNO055 sensor and return it as a dictionary. This method collects various sensor readings from the BNO055 sensor, including Euler angles, gyroscope data, linear acceleration, quaternion, magnetic field, and calibration status. It then constructs a dictionary with these values and returns only the columns specified in self.COLUMNS. Returns: dict: A dictionary containing the sensor data. Only the columns specified in self.COLUMNS are included in the returned dictionary. """ # Get data euler_z, euler_y, euler_x = [val for val in self.bno055_sensor.euler] # X: yaw, Y: pitch, Z: roll gyro_x, gyro_y, gyro_z = [val for val in self.bno055_sensor.gyro] # Gyro[rad/s] linear_accel_x, linear_accel_y, linear_accel_z = [val for val in self.bno055_sensor.linear_acceleration] # Linear acceleration[m/s^2] quaternion_1, quaternion_2, quaternion_3, quaternion_4 = [val for val in self.bno055_sensor.quaternion] # Quaternion quat_roll, quat_pitch, quat_yaw = self.calcEulerfromQuaternion(quaternion_1, quaternion_2, quaternion_3, quaternion_4) # Cal Euler angle from quaternion magnetic_x, magnetic_y, magnetic_z = [val for val in self.bno055_sensor.magnetic] # Magnetic field calibstat_sys, calibstat_gyro, calibstat_accel, calibstat_mag = [val for val in self.bno055_sensor.calibration_status] # Status of calibration data_dict = { "linear_accel_x": linear_accel_x, "linear_accel_y": linear_accel_y, "linear_accel_z": linear_accel_z, "gyro_x": gyro_x, "gyro_y": gyro_y, "gyro_z": gyro_z, "euler_x": euler_x, "euler_y": euler_y, "euler_z": euler_z, "quat_roll": quat_roll, "quat_pitch": quat_pitch, "quat_yaw": quat_yaw, "quaternion_1": quaternion_1, "quaternion_2": quaternion_2, "quaternion_3": quaternion_3, "quaternion_4": quaternion_4, "magnetic_x": magnetic_x, "magnetic_y": magnetic_y, "magnetic_z": magnetic_z, "calibstat_sys": calibstat_sys, "calibstat_gyro": calibstat_gyro, "calibstat_accel": calibstat_accel, "calibstat_mag": calibstat_mag } return {column: data_dict[column] for column in self.COLUMNS if column in data_dict} def format_sensor_data(data, labels): """ Format sensor data into a string for display. This method takes a dictionary of sensor data and a list of labels, and formats the data into a string where each label is followed by its corresponding value. If a value is None, it is replaced with the string "None". Each label-value pair is separated by " / ". Args: data (dict): The sensor data to format. labels (list of str): The list of labels to include in the formatted string. Returns: str: A formatted string containing the sensor data. """ formatted_str = "" if isinstance(data, dict): for label in labels: value = data.get(label, None) if value is None: value = "None" else: value = f"{value:.4f}" formatted_str += f"{label}: {value} / " return formatted_str.rstrip(" / ") def test_main(): """ Main function for testing sensor data collection and display. This function initializes the BNO055 sensor, starts a loop to collect data, formats the data for display, and prints it in real-time. It also calculates and prints the sampling delay and reliability rate upon termination. The main loop runs until interrupted by the user. Raises: KeyboardInterrupt: If a keyboard interrupt occurs, the loop is terminated and the final statistics are printed. """ from utils.tools import wait_process from time import perf_counter import matplotlib.pyplot as plt print("Main start") config = load_config(config_path) meas_bno055 = BNO055(config.sensors['bno055']) start_time = perf_counter() sampling_counter = 0 try: main_loop_start_time = perf_counter() while True: iteration_start_time = perf_counter() # Data acquisition process data = meas_bno055.get_data_from_sensor() current_time = perf_counter() - start_time sampling_counter += 1 if meas_bno055.Is_show_real_time_data: formatted_data = format_sensor_data(data, meas_bno055.COLUMNS) # current time print("--------------------------------------------------------------------") print("Current Time is: {:.3f}".format(current_time)) print(formatted_data) # Wait to meet the sampling frequency based on the sampling interval and execution time elapsed_time = perf_counter() - iteration_start_time sleep_time = meas_bno055.SAMPLING_TIME - elapsed_time if sleep_time > 0: wait_process(sleep_time) except KeyboardInterrupt: print("Interrupted by user") finally: # Calculate the sampling delay from the number of samples and the current time main_loop_end_time = perf_counter() - main_loop_start_time print("Program terminated") print("main loop is ended. current time is: {:.3f}".format(current_time)) print("main loop is ended. end time is: {:.3f}".format(main_loop_end_time)) print("sampling num is: {}".format(sampling_counter)) # Since it is 0-based, the number of samples is current_time + 1 # Calculate the ideal sampling time ideal_time = ((sampling_counter - 1) / meas_bno055.SAMPLING_FREQUENCY_HZ) # Calculate the delay delay_time = current_time - ideal_time # The reliability rate is the delay divided by the sampling time sampling_reliability_rate = (delay_time / (sampling_counter / meas_bno055.SAMPLING_FREQUENCY_HZ)) * 100 print("sampling delay is: {:.3f} s".format(delay_time)) print("sampling delay rate is: {:.3f} %".format(sampling_reliability_rate)) if __name__ == '__main__': test_main()
This script interfaces with the BNO055 sensor to collect and process data. It retrieves sensor readings, such as Euler angles and gyroscope data, formats them for display, and prints real-time updates if configured. The script includes a calibration method and calculates Euler angles from quaternion data. It manages data acquisition with a defined sampling frequency and calculates sampling delay and reliability. The main loop continues until interrupted, reporting performance metrics upon termination.
# elm327_measurement.py import obd import os import time from collections import deque import numpy as np import pandas as pd import datetime import asyncio import scipy from scipy import signal import matplotlib as plt import sys from collections import defaultdict import random parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) sys.path.append(parent_dir) from config.config_manager import load_config config_path = os.path.join(parent_dir, 'config', 'measurement_system_config.yaml') class ELM327: def __init__(self, config): """ Initialize the ELM327 class with configuration parameters. Args: config (dict): Configuration parameters for the ELM327. """ self.COLUMNS = config.data_columns self.SAMPLING_FREQUENCY_HZ = config.sampling_frequency_hz self.SAMPLING_TIME = 1 / self.SAMPLING_FREQUENCY_HZ self.SAVE_DATA_DIR = config.save_data_dir self.SEQUENCE_LENGTH = config.sequence_length self.FPASS = config.filter_params.fpass self.FSTOP = config.filter_params.fstop self.GPASS = config.filter_params.gpass self.GSTOP = config.filter_params.gstop self.Isfilter = config.filter_params.is_filter self.res = self.connect_to_elm327() self.is_offline = config.is_offline self.IsStart = False self.IsStop = True self.Is_show_real_time_data = config.is_show_real_time_data def initialize_BLE(self): """ Initialize Bluetooth Low Energy (BLE) for ELM327 connection. """ os.system('sudo hcitool scan') os.system('sudo hciconfig hci0 up') os.system('sudo rfcomm bind 0 8A:2A:D4:FF:38:F3') os.system('sudo rfcomm listen 0 1 &') def connect_to_elm327(self): """ Establish a connection to the ELM327 device. Returns: res (obd.OBDStatus): The connection status of the ELM327 device. """ res = None try: self.initialize_BLE() self.connection = obd.OBD() print(self.connection.status()) res = self.connection.status() if res == obd.OBDStatus.CAR_CONNECTED: print("----------Connection establishment is successful!----------") return res else: print("----------Connection establishment failed!----------") print("End program. Please check settings of the computer and ELM327") except Exception as e: print("----------Exception!----------") print(e) finally: return res def get_data_from_sensor(self): """ Retrieve data from the sensor. Returns: dict: A dictionary containing sensor data. """ if self.is_offline: data = self.get_data_from_sensor_stub() else: # Retrieve data and save it in dictionary format data = {column: self.get_obd2_value(column) for column in self.COLUMNS} return data def get_obd2_value_debug(self, column): """ Retrieve OBD-II value for a specific column with debug information. Args: column (str): The OBD-II command column. Returns: float or None: The value of the OBD-II command, or None if not available. """ command = getattr(obd.commands, column, None) if command: response = self.connection.query(command) if response: print(f"Response for command '{command}': {response}") if response.value is not None: # Check for None print(f"Response value for command '{command}': {response.value}") return response.value.magnitude else: print(f"No value in response for command '{command}'") else: print(f"No response for command '{command}'") else: print(f"No command found for column '{column}'") return None def get_obd2_value(self, column): """ Retrieve OBD-II value for a specific column. Args: column (str): The OBD-II command column. Returns: float or None: The value of the OBD-II command, or None if not available. """ command = getattr(obd.commands, column, None) if command: response = self.connection.query(command) if response.value is not None: # Check for None return response.value.magnitude return None def get_data_from_sensor_stub(self): """ Generate stub data for the sensor. Returns: dict: A dictionary containing stub sensor data. """ data_stub = {column: np.abs(np.random.randn()).astype(np.float32).item() for column in self.COLUMNS} # Randomly insert None or 0.0 if random.choice([True, False]): random_column = random.choice(self.COLUMNS) if random.choice([True, False]): data_stub[random_column] = None else: data_stub[random_column] = 0.0 return data_stub def format_data_for_display(data, labels): """ Format sensor data for display. Args: data (dict): The sensor data to format. labels (list of str): The list of labels to include in the formatted string. Returns: str: A formatted string containing the sensor data. """ formatted_str = "" for label, value in zip(labels, data.values()): if value is None: value = "None" else: value = f"{value:.4f}" formatted_str += f"{label}: {value} / " return formatted_str.rstrip(" / ") def format_sensor_data(data, labels): """ Format sensor data for display. Args: data (dict or list): The sensor data to format. labels (list of str): The list of labels to include in the formatted string. Returns: str: A formatted string containing the sensor data. """ formatted_str = "" if isinstance(data, dict): for label in labels: value = data.get(label, None) if value is None: value = "None" else: value = f"{value:.4f}" formatted_str += f"{label}: {value} / " else: for label, value in zip(labels, data): if value is None: value = "None" else: value = f"{value:.4f}" formatted_str += f"{label}: {value} / " return formatted_str.rstrip(" / ") def test_main(): """ Main function for testing sensor data collection and display. This function initializes the ELM327 sensor, starts a loop to collect data, formats the data for display, and prints it in real-time. It also calculates and prints the sampling delay and reliability rate upon termination. The main loop runs until interrupted by the user. Raises: KeyboardInterrupt: If a keyboard interrupt occurs, the loop is terminated and the final statistics are printed. """ from utils.tools import wait_process from time import perf_counter import matplotlib.pyplot as plt print("Main start") config = load_config(config_path) meas_elm327 = ELM327(config.sensors['elm327']) # res = meas_elm327.connect_to_elm327() start_time = perf_counter() sampling_counter = 0 try: main_loop_start_time = perf_counter() while True: iteration_start_time = perf_counter() # Data acquisition process data = meas_elm327.get_data_from_sensor() current_time = perf_counter() - start_time sampling_counter += 1 if meas_elm327.Is_show_real_time_data: formatted_data = format_sensor_data(data, meas_elm327.COLUMNS) print("--------------------------------------------------------------------") print("Current Time is: {:.3f}".format(current_time)) print(formatted_data) # Wait to meet the sampling frequency based on the sampling interval and execution time elapsed_time = perf_counter() - iteration_start_time sleep_time = meas_elm327.SAMPLING_TIME - elapsed_time if sleep_time > 0: wait_process(sleep_time) except KeyboardInterrupt: print("Interrupted by user") finally: main_loop_end_time = perf_counter() - main_loop_start_time print("Program terminated") print("main loop is ended. current time is: {:.3f}".format(current_time)) print("main loop is ended. end time is: {:.3f}".format(main_loop_end_time)) print("sampling num is: {}".format(sampling_counter)) # Calculate the ideal sampling time ideal_time = ((sampling_counter - 1) / meas_elm327.SAMPLING_FREQUENCY_HZ) # Calculate the delay delay_time = current_time - ideal_time # The reliability rate is the delay divided by the sampling time sampling_reliability_rate = (delay_time / (sampling_counter / meas_elm327.SAMPLING_FREQUENCY_HZ)) * 100 print("sampling delay is: {:.3f} s".format(delay_time)) print("sampling delay rate is: {:.3f} %".format(sampling_reliability_rate)) if __name__ == '__main__': test_main()
This script defines an ELM327 class for interfacing with an ELM327 device to collect sensor data via OBD-II. It initializes the connection, retrieves data, and formats it for display. The test_main function sets up the ELM327 sensor, collects data in a loop, and prints it in real-time. It also calculates and displays sampling delay and reliability rates upon interruption. The script handles both real-time data acquisition and simulated offline data for testing purposes.
In this post, I described the project focused on developing a measurement system to capture vehicle behavior. While the project is still ongoing, I have recognized the potential for capturing vehicle data with a focus on the vehicle dynamics domain. However, when it comes to measuring signals from vehicle ECUs, some limitations need to be considered, as the ELM327 may not reliably achieve sampling rates beyond 5 Hz with good accuracy. This issue could likely be resolved by using a more advanced OBD scanner, such as the OBDLink MX+.
[1] Adafruit. "Adafruit BNO055 Absolute Orientation Sensor."
https://learn.adafruit.com/adafruit-bno055-absolute-orientation-sensor/overview. Accessed September 3, 2024.
[2] Adafruit. "Adafruit_CircuitPython_BNO055." https://github.com/adafruit/Adafruit_CircuitPython_BNO055. Accessed September 3, 2024.
[3] Adafruit. "Adafruit_BNO055." https://github.com/adafruit/Adafruit_BNO055. Accessed September 3, 2024.
[4] Adafruit. "Adafruit BNO055 Demo." https://cdn-shop.adafruit.com/product-videos/1024x768/2472-04.mp4 Accessed September 3, 2024.
[5] Amazon. "Elm327 Launchh OBD2 Professional Bluetooth Scan Tool and Code Reader for Android and PC, Interface OBDII OBD2 Car Auto Diagnostic Scanner, Not Support J1850 VPW & J1850 PWM". https://a.co/d/5BFn4GN. Accessed September 3, 2024.
[6] MathWorks. "Coordinate Systems in Automated Driving Toolbox." https://www.mathworks.com/help/driving/ug/coordinate-systems.html. Accessed September 3, 2024.
[7] Apple. "Getting raw gyroscope events." https://developer.apple.com/documentation/coremotion/getting_raw_gyroscope_events. Accessed September 3, 2024.
[8] phyphox. "phyphox top page."https://phyphox.org/. Accessed September 3, 2024.
[9] Andrea Patrucco, Vehicle dynamics engineer presso Applus+ IDIADA. "Challenging your car's trip computer fuel efficiency figures." https://www.linkedin.com/pulse/challenging-your-cars-trip-computer-fuel-efficiency-figures-patrucco/. Accessed September 3, 2024.
[10] "Choosing OBDII adapter". https://www.carscanner.info/choosing-obdii-adapter/. Accessed September 3, 2024.
[11] "VDDM". https://github.com/Qooniee/VDDM/tree/master/. Accessed September 3, 2024.
以上がRaspberry Pi を使用した車両用カスタム データ ロガーの構築: BNO センサーと ELM センサーの統合の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。