ホームページ >バックエンド開発 >Python チュートリアル >Python プロキシを使用したスケーラブルな外国為替 WebSocket の実装
このガイドでは、Python で WebSocket プロキシ サーバーを作成する方法を説明します。
pip install websockets
import asyncio import websockets import json class WebSocketProxy: def init(self, source_url, symbols): self.source_url = source_url self.clients = set() self.symbols = symbols self.valid_user_key = "yourValidUserKey" # Single valid user key for authentication async def on_open(self, ws): print("Connected to source") symbols_str = ",".join(self.symbols.keys()) init_message = f"{{"userKey":"your_api_key", "symbol":"{symbols_str}"}}" await ws.send(init_message)
async def client_handler(self, websocket, path): try: # Wait for a message that should contain the authentication key auth_message = await asyncio.wait_for(websocket.recv(), timeout=10) auth_data = json.loads(auth_message) user_key = auth_data.get("userKey") if user_key == self.valid_user_key: self.clients.add(websocket) print(f"Client authenticated with key: {user_key}") try: await websocket.wait_closed() finally: self.clients.remove(websocket) else: print("Authentication failed") await websocket.close(reason="Authentication failed") except (asyncio.TimeoutError, json.JSONDecodeError, KeyError): print("Failed to authenticate") await websocket.close(reason="Failed to authenticate")
async def source_handler(self): async with websockets.connect(self.source_url) as websocket: await self.on_open(websocket) async for message in websocket: await self.broadcast(message) async def broadcast(self, message): if self.clients: await asyncio.gather(*(client.send(message) for client in self.clients))
def run(self, host="localhost", port=8765): start_server = websockets.serve(self.client_handler, host, port) asyncio.get_event_loop().run_until_complete(start_server) asyncio.get_event_loop().run_until_complete(self.source_handler()) asyncio.get_event_loop().run_forever() if name == "main": symbols = {"EURUSD": {}, "GBPUSD": {}, "USDJPY": {}, "AUDUSD": {}, "USDCAD": {}} source_url = "ws://example.com/source" proxy = WebSocketProxy(source_url, symbols) proxy.run()
Python ベースの WebSocket プロキシ サーバーの開発に成功しました。このサーバーは、クライアント ID を認証し、指定されたデータ ソースへの永続的な接続を維持し、ソースから受信したメッセージを検証されたすべてのクライアントに効果的に配布できます。この機能は、単一の発信元から多様なユーザー ベースにデータを安全かつ瞬時に配布する必要があるアプリケーションにとって非常に貴重であることがわかります。
最適なパフォーマンスと信頼性を確保するには、徹底的なサーバー テストが不可欠です。接続とメッセージ送信が適切に処理されることを検証します。効率を高めるには、負荷分散メカニズムの実装と接続ヘッダーのカスタマイズを検討してください。最後に、長期間のネットワーク接続に対応するように特別に設計されたクラウド サービスなど、実稼働展開に適した環境にサーバーを展開することをお勧めします。
また、弊社 Web サイトで最初に公開されたチュートリアル: Python プロキシを使用した外国為替 WebSocket のスケーリング
もご覧ください。以上がPython プロキシを使用したスケーラブルな外国為替 WebSocket の実装の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。