本指南將教您如何使用 Python 建立 WebSocket 代理伺服器。
伺服器將執行以下操作:
- 驗證客戶端身分:在允許客戶端連線之前,會檢查每個用戶端是否擁有唯一的「使用者金鑰(API Key)」。
- 連接到另一個 WebSocket: 伺服器將連接到一個單獨的 WebSocket 伺服器。
- 中繼訊息:伺服器將從連接的 WebSocket 接收訊息並將其傳送給所有經過驗證的用戶端。
開始之前:
- 確保您已安裝Python 3.6或更高版本。 WebSockets 需要 Python 3.6 或更高版本。
- 安裝 WebSockets 庫:您可以在終端機中使用以下命令來安裝它。
pip install websockets
1. 入門
- 為您的專案建立一個新資料夾。
- 在資料夾中建立一個新的 Python 檔案並將其命名為「websocket_proxy_server.py」。該文件將保存您伺服器的所有代碼。
2. 建立WebSocket伺服器
- 導入所需的庫。您將需要之前安裝的庫。
- 建構伺服器的基本結構。使用 WebSockets 庫為您的伺服器建立基礎。
import asyncio import websockets import json class WebSocketProxy: def init(self, source_url, symbols): self.source_url = source_url self.clients = set() self.symbols = symbols self.valid_user_key = "yourValidUserKey" # Single valid user key for authentication async def on_open(self, ws): print("Connected to source") symbols_str = ",".join(self.symbols.keys()) init_message = f"{{"userKey":"your_api_key", "symbol":"{symbols_str}"}}" await ws.send(init_message)
3. 連線並驗證客戶端
- 確保伺服器已全部設定為接受來自客戶端的連線。
- 新增檢查以驗證每個客戶的身份。當客戶端嘗試連線時,伺服器應要求提供「使用者金鑰」。只有具有正確密鑰的客戶端才被允許連線。
async def client_handler(self, websocket, path): try: # Wait for a message that should contain the authentication key auth_message = await asyncio.wait_for(websocket.recv(), timeout=10) auth_data = json.loads(auth_message) user_key = auth_data.get("userKey") if user_key == self.valid_user_key: self.clients.add(websocket) print(f"Client authenticated with key: {user_key}") try: await websocket.wait_closed() finally: self.clients.remove(websocket) else: print("Authentication failed") await websocket.close(reason="Authentication failed") except (asyncio.TimeoutError, json.JSONDecodeError, KeyError): print("Failed to authenticate") await websocket.close(reason="Failed to authenticate")
4. 連接到來源並共享訊息
- 建立一個函數,使伺服器保持與原始 WebSocket 的連線。
- 此函數應該會自動將從原始 WebSocket 接收到的訊息傳送到所有成功驗證的用戶端。
async def source_handler(self): async with websockets.connect(self.source_url) as websocket: await self.on_open(websocket) async for message in websocket: await self.broadcast(message) async def broadcast(self, message): if self.clients: await asyncio.gather(*(client.send(message) for client in self.clients))
5. 啟動伺服器
- 建立一個函數來啟動伺服器並監聽連線。
- 新增程式碼來執行此函數,啟動您的 WebSocket 代理伺服器。
def run(self, host="localhost", port=8765): start_server = websockets.serve(self.client_handler, host, port) asyncio.get_event_loop().run_until_complete(start_server) asyncio.get_event_loop().run_until_complete(self.source_handler()) asyncio.get_event_loop().run_forever() if name == "main": symbols = {"EURUSD": {}, "GBPUSD": {}, "USDJPY": {}, "AUDUSD": {}, "USDCAD": {}} source_url = "ws://example.com/source" proxy = WebSocketProxy(source_url, symbols) proxy.run()
總之
您已經成功開發了一個基於Python的WebSocket代理伺服器。此伺服器可以驗證客戶端身份,維護與指定資料來源的持久連接,並將從來源接收的訊息有效地分發給所有經過驗證的客戶端。事實證明,對於需要將資料從單一來源安全即時傳播到不同用戶群的應用程式來說,此功能非常寶貴。
下一步
徹底的伺服器測試對於確保最佳效能和可靠性至關重要。它驗證其對連接和訊息傳輸的正確處理。為了提高效率,請考慮實施負載平衡機制和自訂連線標頭。最後,建議將伺服器部署到適合生產部署的環境,例如專門為容納長期網路連線而設計的雲端服務。
另外,請查看我們網站上最初發布的教學:使用 Python 代理程式擴展外匯 WebSocket
以上是使用 Python 代理實現可擴展的外匯 WebSocket的詳細內容。更多資訊請關注PHP中文網其他相關文章!

Inpython,YouAppendElementStoAlistusingTheAppend()方法。 1)useappend()forsingleelements:my_list.append(4).2)useextend()orextend()或= formultiplelements:my_list.extend.extend(emote_list)ormy_list = [4,5,6] .3)useInsert()forspefificpositions:my_list.insert(1,5).beaware

調試shebang問題的方法包括:1.檢查shebang行確保是腳本首行且無前置空格;2.驗證解釋器路徑是否正確;3.直接調用解釋器運行腳本以隔離shebang問題;4.使用strace或truss跟踪系統調用;5.檢查環境變量對shebang的影響。

pythonlistscanbemanipulationusseveralmethodstoremovelements:1)theremove()MethodRemovestHefirStocCurrenceOfAstePecificiedValue.2)thepop()thepop()methodRemovesandReturnturnturnturnsanaNelementAgivenIndex.3)

pythristssupportnumeroferations:1)addingElementSwithAppend(),Extend(),andInsert()。 2)emovingItemSusingRemove(),pop(),andclear(),and clear()。 3)訪問andModifyingandmodifyingwithIndexingandSlicing.4)

使用NumPy創建多維數組可以通過以下步驟實現:1)使用numpy.array()函數創建數組,例如np.array([[1,2,3],[4,5,6]])創建2D數組;2)使用np.zeros(),np.ones(),np.random.random()等函數創建特定值填充的數組;3)理解數組的shape和size屬性,確保子數組長度一致,避免錯誤;4)使用np.reshape()函數改變數組形狀;5)注意內存使用,確保代碼清晰高效。

播放innumpyisamethodtoperformoperationsonArraySofDifferentsHapesbyAutapityallate AligningThem.itSimplifififiesCode,增強可讀性,和Boostsperformance.Shere'shore'showitworks:1)較小的ArraySaraySaraysAraySaraySaraySaraySarePaddedDedWiteWithOnestOmatchDimentions.2)

forpythondataTastorage,choselistsforflexibilityWithMixedDatatypes,array.ArrayFormeMory-effficityHomogeneousnumericalData,andnumpyArraysForAdvancedNumericalComputing.listsareversareversareversareversArversatilebutlessEbutlesseftlesseftlesseftlessforefforefforefforefforefforefforefforefforefforlargenumerdataSets; arrayoffray.array.array.array.array.array.ersersamiddreddregro


熱AI工具

Undresser.AI Undress
人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover
用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool
免費脫衣圖片

Clothoff.io
AI脫衣器

Video Face Swap
使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱門文章

熱工具

Dreamweaver Mac版
視覺化網頁開發工具

SublimeText3 英文版
推薦:為Win版本,支援程式碼提示!

MinGW - Minimalist GNU for Windows
這個專案正在遷移到osdn.net/projects/mingw的過程中,你可以繼續在那裡關注我們。 MinGW:GNU編譯器集合(GCC)的本機Windows移植版本,可自由分發的導入函式庫和用於建置本機Windows應用程式的頭檔;包括對MSVC執行時間的擴展,以支援C99功能。 MinGW的所有軟體都可以在64位元Windows平台上運作。

Atom編輯器mac版下載
最受歡迎的的開源編輯器

Safe Exam Browser
Safe Exam Browser是一個安全的瀏覽器環境,安全地進行線上考試。該軟體將任何電腦變成一個安全的工作站。它控制對任何實用工具的訪問,並防止學生使用未經授權的資源。