ホームページ >バックエンド開発 >Python チュートリアル >Pythonでリモートホストのリアルタイムデータを監視する方法
このプログラムは、TCP プロトコルに基づいたサーバー プログラムであり、クライアントから送信された命令を受け取り、対応する処理を実行します。結果はクライアントに返されます。プログラムはローカルホスト (このマシン) のポート 8888 で実行されます。
主な機能とコマンド:
• CPU 使用率の取得: コマンド "cpu"
• メモリ使用量の取得: コマンド "memory"
# #&bull ; ネットワーク帯域幅情報の取得: コマンド "network"• 現在ログインしているユーザーの取得: コマンド "user"• システム負荷の取得: コマンド "loadavg"• 現在時刻の取得: コマンド "time"• プロセス一覧の取得: コマンド "process"• システム情報の取得: コマンド "system" • ネットワーク接続リストを取得: コマンド "connection"• GPU 使用状況を取得: コマンド "gpu"• ディスク使用状況を取得: コマンド "disk"命令が異なると、プログラムはデータの取得と処理に次のように異なるライブラリ関数を使用します: • 命令「cpu」の場合、psutil ライブラリを使用して CPU 使用率を取得します。 • 命令「memory」については、psutil ライブラリを使用してメモリ使用量を取得します。 • コマンド「network」の場合、speedtest-cli ライブラリを使用してネットワーク帯域幅情報を取得します。 • 命令 "user" の場合、psutil ライブラリを使用して、現在ログインしているユーザーを取得します。 • 命令「loadavg」では、os ライブラリを使用してシステムの負荷状態を取得します。 • 命令「time」の場合、datetime ライブラリを使用して現在時刻を取得します。 • ディレクティブ「process」については、psutil ライブラリを使用してプロセスのリストを取得します。プログラムはメモリ使用量に従ってプロセスを並べ替え、上位 10 のプロセスのみを返します。 • 命令 "system" については、プラットフォーム ライブラリを使用してシステム情報を取得します。 • コマンド「connection」の場合、psutil ライブラリを使用してネットワーク接続リストを取得します。プログラムは接続をプロセス ID でソートし、最初の 10 個の接続のみを返します。 • 命令「gpu」については、nvidia-smi ツールを使用して GPU 使用率を取得します。 • コマンド「disk」の場合、psutil ライブラリを使用してディスク使用量を取得します。 各命令の処理結果は文字列としてクライアントに返されます。命令を処理する際、プログラムは受信したデータが空かどうかを判断します。空の場合は、クライアント ソケットを切断して閉じます。プログラム内の handle_client 関数は、単一のクライアントとの通信を担当するスレッド関数であり、クライアント接続ごとにスレッドが開始されます。 1.2 クライアントこのプログラムは、サーバーに接続してリクエストを送信し、サーバーの応答を待つことができる単純な TCP クライアントです。このプログラムは、Python のソケット モジュールを使用して TCP ソケットを作成し、connect メソッドを使用してサーバー アドレスに接続します。プログラムはループを通じてユーザーがリクエスト データ型を入力するのを継続的に待ち、リクエスト データ型を UTF-8 形式にエンコードし、sendall メソッドを使用してサーバーに送信します。次に、プログラムはサーバーの応答を待ち、応答データを UTF-8 形式にデコードして出力します。最後に、プログラムはクライアント ソケットを閉じます。 プログラムの主なプロセスは次のとおりです: 1. ソケット モジュールをインポートします。 2. TCP ソケット オブジェクトを作成します。 3. 指定されたサーバー アドレスに接続します。 4. ユーザーが要求されたデータ型を入力するのをループで待機します。 5. リクエストのデータ型を UTF-8 形式にエンコードしてサーバーに送信します。 6. サーバーが応答して応答データを受信するまで待ちます。 7. 応答データを UTF-8 形式にデコードし、出力します。 8. クライアントソケットを閉じます。 このプログラムは、特定のデータ型を提供するサーバーと通信するために使用できます。ユーザーは、さまざまなリクエスト データ タイプを入力して、さまざまなタイプのデータを取得できます。サーバーはリクエストの種類に基づいて適切なデータを返します。プログラムの動作はサーバーの可用性と応答速度に依存します。サーバーが応答しない場合、プログラムはサーバーが応答するかプログラムが中断されるまで待機します。 2 コードサーバーimport os import socket import subprocess import threading from datetime import datetime from sys import platform import psutil from speedtest import Speedtest # 创建TCP套接字 server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 绑定IP和端口 server_address = ('localhost', 8888) server_socket.bind(server_address) # 监听连接请求 server_socket.listen(5) def handle_client(client_socket, client_address): while True: # 接收客户端发送的数据 data = client_socket.recv(1024) # 如果接收到空数据,则断开连接 if not data: client_socket.close() print(f"Connection with {client_address} closed") break # 处理接收到的数据 request = data.decode('utf-8') if request == 'cpu': # 使用psutil库获取CPU使用率 cpu_percent = psutil.cpu_percent(interval=1) response_data = f'CPU使用率:{cpu_percent}%'.encode('utf-8') elif request == 'memory': # 使用psutil库获取内存使用率 memory_percent = psutil.virtual_memory().percent response_data = f'内存使用率:{memory_percent}%'.encode('utf-8') elif request == 'network': # 使用speedtest-cli库获取网络带宽信息 st = Speedtest() download_speed = st.download() upload_speed = st.upload() response_data = f'下载速度:{download_speed / 1000000}Mbps,上传速度:{upload_speed / 1000000}Mbps'.encode('utf-8') elif request == 'user': # 使用psutil库获取当前登录用户 username = psutil.users()[0].name response_data = f'当前登录用户:{username}'.encode('utf-8') elif request == 'loadavg': # 使用os库获取系统负载情况 load_avg = os.getloadavg() response_data = f'系统负载情况:{load_avg}'.encode('utf-8') elif request == 'time': # 使用datetime库获取当前时间 current_time = datetime.datetime.now() response_data = f'当前时间:{current_time}'.encode('utf-8') elif request == 'process': # 使用psutil库获取进程列表 process_list = [] for process in psutil.process_iter(['pid', 'name', 'memory_info']): try: process_list.append((process.info['pid'], process.info['name'], process.info['memory_info'].rss)) except (psutil.AccessDenied, psutil.NoSuchProcess): pass process_list.sort(key=lambda x: x[2], reverse=True) response_data = '' for i, (pid, name, memory) in enumerate(process_list[:10]): response_data += f'{i + 1}. 进程名称:{name},进程ID:{pid},占用内存:{memory / 1024 / 1024:.2f}MB\n' response_data = response_data.encode('utf-8') elif request == 'system': # 使用platform库获取系统信息 system_info = f'操作系统:{platform.system()} {platform.release()}\n处理器:{platform.processor()}\nPython版本:{platform.python_version()}' response_data = system_info.encode('utf-8') elif request == 'connection': # 使用psutil库获取网络连接列表 conn_list = [] for conn in psutil.net_connections(): if conn.status == psutil.CONN_ESTABLISHED: conn_list.append((conn.laddr.ip, conn.laddr.port, conn.raddr.ip, conn.raddr.port, conn.pid)) conn_list.sort(key=lambda x: x[4]) response_data = '' for i, (laddr_ip, laddr_port, raddr_ip, raddr_port, pid) in enumerate(conn_list[:10]): response_data += f'{i + 1}. 本地地址:{laddr_ip}:{laddr_port},远程地址:{raddr_ip}:{raddr_port},进程ID:{pid}\n' response_data = response_data.encode('utf-8') elif request == 'disk': # 使用psutil库获取磁盘使用情况 disk_usage = psutil.disk_usage('/') disk_info = f'磁盘总容量:{disk_usage.total / 1024 / 1024 / 1024:.2f}GB,已用容量:{disk_usage.used / 1024 / 1024 / 1024:.2f}GB,可用容量:{disk_usage.free / 1024 / 1024 / 1024:.2f}GB' response_data = disk_info.encode('utf-8') elif request == 'load': # 使用psutil库获取系统负载 load_avg = psutil.getloadavg() load_info = f'1分钟内平均负载:{load_avg[0]:.2f},5分钟内平均负载:{load_avg[1]:.2f},15分钟内平均负载:{load_avg[2]:.2f}' response_data = load_info.encode('utf-8') elif request == 'thread': # 使用psutil库获取进程线程数 thread_info = f'当前进程线程数:{psutil.Process().num_threads()}' response_data = thread_info.encode('utf-8') else: response_data = b'Invalid request' # 发送响应数据 client_socket.sendall(response_data) # 接收多个客户端连接 while True: client_socket, client_address = server_socket.accept() print(f"New connection from {client_address}") # 创建新线程处理客户端连接 client_thread = threading.Thread(target=handle_client, args=(client_socket, client_address)) client_thread.start()クライアント
import socket # 创建TCP套接字 client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 连接服务器 server_address = ('localhost', 8888) client_socket.connect(server_address) while True: # 发送请求数据给服务器 request = input("请输入要请求的数据类型(cpu/memory/network/user/loadavg/time/process/system/connection/disk/load/thread):") client_socket.sendall(request.encode('utf-8')) # 接收服务器响应数据 response_data = client_socket.recv(1024) # 处理接收到的数据 response = response_data.decode('utf-8') print(response) # 关闭客户端套接字 client_socket.close()
以上がPythonでリモートホストのリアルタイムデータを監視する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。