>  기사  >  백엔드 개발  >  실시간 데이터를 사용한 ETL 프로세스 이해: 추출, 변환, 로드 및 시각화

실시간 데이터를 사용한 ETL 프로세스 이해: 추출, 변환, 로드 및 시각화

Barbara Streisand
Barbara Streisand원래의
2024-10-04 12:11:02344검색

Understanding the ETL Process with Real-Time Data: Extraction, Transformation, Loading, and Visualization

ETL(Extract, Transform, Load) 프로세스는 특히 실시간 데이터를 기반으로 빠른 의사결정이 필요한 애플리케이션에서 데이터를 효율적으로 관리하기 위한 기본 프로세스입니다. 이 기사에서는 Binance API의 실시간 암호화폐 거래와 관련된 실제 사례를 사용하여 ETL 프로세스를 살펴보겠습니다. 제공된 Python 코드는 거래 데이터를 추출하고, 이를 사용 가능한 형식으로 변환하고, SQLite 데이터베이스에 로드하고, 실시간 플로팅으로 데이터를 시각화하는 방법을 보여줍니다.

샘플 ETL 프로젝트 : https://github.com/vcse59/FeatureEngineering/tree/main/Real-Time-CryptoCurrency-Price-Tracker

1. 추출
ETL 프로세스의 첫 번째 단계는 다양한 소스에서 데이터를 수집하는 추출입니다. 이 경우 Binance Testnet API에 대한 WebSocket 연결을 통해 데이터가 추출됩니다. 이 연결을 통해 BTC/USDT 거래를 실시간 스트리밍할 수 있습니다.

코드에서 추출이 구현되는 방법은 다음과 같습니다.

 with websockets.connect(url) as ws:
    response = await ws.recv()
    trade_data = json.loads(response)

수신된 각 메시지에는 JSON 형식의 가격, 수량, 타임스탬프 등 필수 거래 데이터가 포함되어 있습니다.

2. 변신
데이터가 추출되면 변환 프로세스를 거칩니다. 이 단계에서는 데이터를 정리하고 구조화하여 더욱 유용하게 만듭니다. 이 예에서 변환에는 타임스탬프를 밀리초 단위에서 읽을 수 있는 형식으로 변환하고 추가 처리를 위해 데이터를 적절한 유형으로 구성하는 작업이 포함됩니다.


price = float(trade_data['p'])
quantity = float(trade_data['q'])
timestamp = int(trade_data['T'])

trade_time = datetime.fromtimestamp(timestamp / 1000.0)


이렇게 하면 가격과 수량은 부동 소수점으로 저장되고 타임스탬프는 날짜/시간 개체로 변환되어 더 쉽게 조작하고 분석할 수 있습니다.

3. 로드
마지막 단계는 변환된 데이터가 대상 데이터베이스에 저장되는 로드입니다. 우리 코드에서 SQLite 데이터베이스는 거래 데이터의 저장 매체 역할을 합니다.

로딩 프로세스는 다음 기능으로 관리됩니다.


def save_trade_to_db(price, quantity, timestamp):
    conn = sqlite3.connect('trades.db')
    cursor = conn.cursor()
    # Create a table if it doesn't exist
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS trades (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            price REAL,
            quantity REAL,
            timestamp TEXT
        )
    ''')
    # Insert the trade data
    cursor.execute('''
        INSERT INTO trades (price, quantity, timestamp)
        VALUES (?, ?, ?)
    ''', (price, quantity, trade_time))
    conn.commit()
    conn.close()


이 함수는 SQLite 데이터베이스에 연결하여 테이블이 없으면 테이블을 생성하고 거래 데이터를 삽입합니다.

4. 시각화
데이터를 저장하는 것 외에도 데이터를 시각화하는 것은 더 나은 이해와 의사결정을 위해 필수적입니다. 제공된 코드에는 실시간으로 거래를 표시하는 기능이 포함되어 있습니다.


def plot_trades():
    if len(trades) > 0:
        timestamps, prices, quantities = zip(*trades)

        plt.subplot(2, 1, 1)
        plt.cla()  # Clear the previous plot for real-time updates
        plt.plot(timestamps, prices, label='Price', color='blue')
        plt.ylabel('Price (USDT)')
        plt.legend()
        plt.title('Real-Time BTC/USDT Prices')
        plt.xticks(rotation=45)

        plt.subplot(2, 1, 2)
        plt.cla()  # Clear the previous plot for real-time updates
        plt.plot(timestamps, quantities, label='Quantity', color='orange')
        plt.ylabel('Quantity')
        plt.xlabel('Time')
        plt.legend()
        plt.xticks(rotation=45)

        plt.tight_layout()  # Adjust layout for better spacing
        plt.pause(0.1)  # Pause to update the plot


이 함수는 두 개의 하위 플롯(가격에 대한 하위 플롯과 수량에 대한 하위 플롯)을 생성합니다. matplotlib 라이브러리를 사용하여 데이터를 동적으로 시각화함으로써 사용자가 실시간으로 시장 동향을 관찰할 수 있습니다.

결론
이 예에서는 ETL 프로세스를 강조하여 WebSocket API에서 데이터를 추출하고, 분석을 위해 변환하고, 데이터베이스에 로드하고, 즉각적인 피드백을 위해 시각화하는 방법을 보여줍니다. 이 프레임워크는 거래 플랫폼, 시장 분석 도구 등 실시간 데이터를 기반으로 정보에 근거한 결정을 내려야 하는 애플리케이션을 구축하는 데 매우 중요합니다.

위 내용은 실시간 데이터를 사용한 ETL 프로세스 이해: 추출, 변환, 로드 및 시각화의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
이전 기사:데이 랙킹다음 기사:데이 랙킹