ETL(擷取、轉換、載入)流程是有效管理資料的基礎,特別是在需要基於即時資料快速決策的應用程式中。在本文中,我們將使用涉及幣安 API 的即時加密貨幣交易的實際範例來探索 ETL 流程。提供的 Python 程式碼說明如何提取交易資料、將其轉換為可用格式、將其載入到 SQLite 資料庫中,以及透過即時繪圖將資料視覺化。
範例 ETL 專案: https://github.com/vcse59/FeatureEngineering/tree/main/Real-Time-CryptoCurrency-Price-Tracker
1。萃取物
ETL 過程的第一步是提取,其中涉及從各種來源收集資料。在這種情況下,資料是透過與 Binance Testnet API 的 WebSocket 連線提取的。此連接允許即時傳輸 BTC/USDT 交易。
程式碼中擷取的實作方式如下:
with websockets.connect(url) as ws: response = await ws.recv() trade_data = json.loads(response)
收到的每條訊息都包含重要的交易數據,包括價格、數量和時間戳,格式為 JSON。
2。變形
提取資料後,它會經歷轉換過程。此步驟清理並結構化資料以使其更有用。在我們的範例中,轉換包括將時間戳記從毫秒轉換為可讀格式,並將資料組織成適當的類型以進行進一步處理。
price = float(trade_data['p']) quantity = float(trade_data['q']) timestamp = int(trade_data['T']) trade_time = datetime.fromtimestamp(timestamp / 1000.0)
這確保了價格和數量儲存為浮點數,並且時間戳被轉換為日期時間對象,以便於操作和分析。
3。載入
最後一步是加載,將轉換後的資料儲存在目標資料庫中。在我們的程式碼中,SQLite 資料庫作為交易資料的儲存媒體。
載入過程由下列函數管理:
def save_trade_to_db(price, quantity, timestamp): conn = sqlite3.connect('trades.db') cursor = conn.cursor() # Create a table if it doesn't exist cursor.execute(''' CREATE TABLE IF NOT EXISTS trades ( id INTEGER PRIMARY KEY AUTOINCREMENT, price REAL, quantity REAL, timestamp TEXT ) ''') # Insert the trade data cursor.execute(''' INSERT INTO trades (price, quantity, timestamp) VALUES (?, ?, ?) ''', (price, quantity, trade_time)) conn.commit() conn.close()
此函數連接到 SQLite 資料庫,如果不存在則建立一個表,並插入交易資料。
4。可視化
除了儲存資料之外,將資料視覺化以便更好地理解和決策也很重要。提供的程式碼包含一個即時繪製交易的函數:
def plot_trades(): if len(trades) > 0: timestamps, prices, quantities = zip(*trades) plt.subplot(2, 1, 1) plt.cla() # Clear the previous plot for real-time updates plt.plot(timestamps, prices, label='Price', color='blue') plt.ylabel('Price (USDT)') plt.legend() plt.title('Real-Time BTC/USDT Prices') plt.xticks(rotation=45) plt.subplot(2, 1, 2) plt.cla() # Clear the previous plot for real-time updates plt.plot(timestamps, quantities, label='Quantity', color='orange') plt.ylabel('Quantity') plt.xlabel('Time') plt.legend() plt.xticks(rotation=45) plt.tight_layout() # Adjust layout for better spacing plt.pause(0.1) # Pause to update the plot
此函數產生兩個子圖:一個用於價格,另一個用於數量。它使用matplotlib庫動態視覺化數據,讓使用者即時觀察市場趨勢。
結論
此範例重點介紹了 ETL 流程,示範如何從 WebSocket API 中提取資料、進行轉換以進行分析、載入到資料庫中以及如何進行視覺化以獲取即時回饋。該框架對於建立需要基於即時數據做出明智決策的應用程式至關重要,例如交易平台和市場分析工具。
以上是了解即時資料的 ETL 流程:提取、轉換、載入和視覺化的詳細內容。更多資訊請關注PHP中文網其他相關文章!