Maison  >  Article  >  développement back-end  >  Tutoriel sur l'écriture d'un programme de chat simple en Python

Tutoriel sur l'écriture d'un programme de chat simple en Python

WBOY
WBOYavant
2023-05-08 18:37:082597parcourir

Idées d'implémentation

x01 Mise en place du serveur

Tout d'abord, côté serveur, le socket est utilisé pour accepter les messages. Chaque fois qu'une demande de socket est acceptée, un nouveau thread est ouvert pour gérer la distribution et l'acceptation des messages. messages En même temps, il existe un gestionnaire pour gérer tous les threads afin de gérer diverses fonctions de la salle de discussion

x02 Établissement du client

L'établissement du client est beaucoup plus simple que le rôle du serveur. Le client doit uniquement envoyer des messages. Et accepter et saisir des caractères spécifiques selon des règles spécifiques pour obtenir l'utilisation de différentes fonctions. Par conséquent, du côté client, vous n'avez besoin que de deux threads, l'un est dédié à la réception des messages et l'autre. l'autre est dédié à l'envoi

Quant à savoir pourquoi nous n'en utilisons pas, c'est parce que si nous n'en utilisons qu'un, lorsque le message est reçu, celui qui reçoit le message sera bloqué avant de l'envoyer. De même, la même chose est vraie. pour l'envoi de messages, donc si ces deux fonctions sont placées Si elles sont implémentées au même endroit, il n'y aura aucun moyen d'envoyer ou de recevoir des messages en continu

Méthode d'implémentation

Implémentation côté serveur

Tutoriel sur lécriture dun programme de chat simple en Python

Tutoriel sur lécriture dun programme de chat simple en Python

import json
import threading
from socket import *
from time import ctime


class PyChattingServer:
    __socket = socket(AF_INET, SOCK_STREAM, 0)
    __address = ('', 12231)

    __buf = 1024

    def __init__(self):
        self.__socket.bind(self.__address)
        self.__socket.listen(20)
        self.__msg_handler = ChattingHandler()

    def start_session(self):
        print('等待客户连接...\r\n')
        try:
            while True:
                cs, caddr = self.__socket.accept()
                # 利用handler来管理线程,实现线程之间的socket的相互通信
                self.__msg_handler.start_thread(cs, caddr)
        except socket.error:
            pass


class ChattingThread(threading.Thread):
    __buf = 1024

    def __init__(self, cs, caddr, msg_handler):
        super(ChattingThread, self).__init__()
        self.__cs = cs
        self.__caddr = caddr
        self.__msg_handler = msg_handler

    # 使用多线程管理会话
    def run(self):
        try:
            print('...连接来自于:', self.__caddr)
            data = '欢迎你到来PY_CHATTING!请输入你的很cooooool的昵称(不能带有空格哟`)\r\n'
            self.__cs.sendall(bytes(data, 'utf-8'))
            while True:
                data = self.__cs.recv(self.__buf).decode('utf-8')
                if not data:
                    break
                self.__msg_handler.handle_msg(data, self.__cs)
                print(data)
        except socket.error as e:
            print(e.args)
            pass
        finally:
            self.__msg_handler.close_conn(self.__cs)
            self.__cs.close()


class ChattingHandler:
    __help_str = "[ SYSTEM ]\r\n" \
                 "输入/ls,即可获得所有登陆用户信息\r\n" \
                 "输入/h,即可获得帮助\r\n" \
                 "输入@用户名 (注意用户名后面的空格)+消息,即可发动单聊\r\n" \
                 "输入/i,即可屏蔽群聊信息\r\n" \
                 "再次输入/i,即可取消屏蔽\r\n" \
                 "所有首字符为/的信息都不会发送出去"

    __buf = 1024
    __socket_list = []

    __user_name_to_socket = {}
    __socket_to_user_name = {}

    __user_name_to_broadcast_state = {}

    def start_thread(self, cs, caddr):
        self.__socket_list.append(cs)
        chat_thread = ChattingThread(cs, caddr, self)
        chat_thread.start()

    def close_conn(self, cs):
        if cs not in self.__socket_list:
            return
        # 去除socket的记录
        nickname = "SOMEONE"
        if cs in self.__socket_list:
            self.__socket_list.remove(cs)
        # 去除socket与username之间的映射关系
        if cs in self.__socket_to_user_name:
            nickname = self.__socket_to_user_name[cs]
            self.__user_name_to_socket.pop(self.__socket_to_user_name[cs])
            self.__socket_to_user_name.pop(cs)
            self.__user_name_to_broadcast_state.pop(nickname)
        nickname += " "
        # 广播某玩家退出聊天室
        self.broadcast_system_msg(nickname + "离开了PY_CHATTING")

    # 管理用户输入的信息
    def handle_msg(self, msg, cs):
        js = json.loads(msg)
        if js['type'] == "login":
            if js['msg'] not in self.__user_name_to_socket:
                if ' ' in js['msg']:
                    self.send_to(json.dumps({
                        'type': 'login',
                        'success': False,
                        'msg': '账号不能够带有空格'
                    }), cs)
                else:
                    self.__user_name_to_socket[js['msg']] = cs
                    self.__socket_to_user_name[cs] = js['msg']
                    self.__user_name_to_broadcast_state[js['msg']] = True
                    self.send_to(json.dumps({
                        'type': 'login',
                        'success': True,
                        'msg': '昵称建立成功,输入/ls可查看所有在线的人,输入/help可以查看帮助(所有首字符为/的消息都不会发送)'
                    }), cs)
                    # 广播其他人,他已经进入聊天室
                    self.broadcast_system_msg(js['msg'] + "已经进入了聊天室")
            else:
                self.send_to(json.dumps({
                    'type': 'login',
                    'success': False,
                    'msg': '账号已存在'
                }), cs)
        # 若玩家处于屏蔽模式,则无法发送群聊消息
        elif js['type'] == "broadcast":
            if self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]:
                self.broadcast(js['msg'], cs)
            else:
                self.send_to(json.dumps({
                    'type': 'broadcast',
                    'msg': '屏蔽模式下无法发送群聊信息'
                }), cs)
        elif js['type'] == "ls":
            self.send_to(json.dumps({
                'type': 'ls',
                'msg': self.get_all_login_user_info()
            }), cs)
        elif js['type'] == "help":
            self.send_to(json.dumps({
                'type': 'help',
                'msg': self.__help_str
            }), cs)
        elif js['type'] == "sendto":
            self.single_chatting(cs, js['nickname'], js['msg'])
        elif js['type'] == "ignore":
            self.exchange_ignore_state(cs)

    def exchange_ignore_state(self, cs):
        if cs in self.__socket_to_user_name:
            state = self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]
            if state:
                state = False
            else:
                state = True
            self.__user_name_to_broadcast_state.pop(self.__socket_to_user_name[cs])
            self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]] = state
            if self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]:
                msg = "通常模式"
            else:
                msg = "屏蔽模式"
            self.send_to(json.dumps({
                'type': 'ignore',
                'success': True,
                'msg': '[TIME : %s]\r\n[ SYSTEM ] : %s\r\n' % (ctime(), "模式切换成功,现在是" + msg)
            }), cs)
        else:
            self.send_to({
                'type': 'ignore',
                'success': False,
                'msg': '切换失败'
            }, cs)

    def single_chatting(self, cs, nickname, msg):
        if nickname in self.__user_name_to_socket:
            msg = '[TIME : %s]\r\n[ %s CHATTING TO %s ] : %s\r\n' % (
                ctime(), self.__socket_to_user_name[cs], nickname, msg)
            self.send_to_list(json.dumps({
                'type': 'single',
                'msg': msg
            }), self.__user_name_to_socket[nickname], cs)
        else:
            self.send_to(json.dumps({
                'type': 'single',
                'msg': '该用户不存在'
            }), cs)
        print(nickname)

    def send_to_list(self, msg, *cs):
        for i in range(len(cs)):
            self.send_to(msg, cs[i])

    def get_all_login_user_info(self):
        login_list = "[ SYSTEM ] ALIVE USER : \r\n"
        for key in self.__socket_to_user_name:
            login_list += self.__socket_to_user_name[key] + ",\r\n"
        return login_list

    def send_to(self, msg, cs):
        if cs not in self.__socket_list:
            self.__socket_list.append(cs)
        cs.sendall(bytes(msg, 'utf-8'))

    def broadcast_system_msg(self, msg):
        data = '[TIME : %s]\r\n[ SYSTEM ] : %s\r\n' % (ctime(), msg)
        js = json.dumps({
            'type': 'system_msg',
            'msg': data
        })
        # 屏蔽了群聊的玩家也可以获得系统的群发信息
        for i in range(len(self.__socket_list)):
            if self.__socket_list[i] in self.__socket_to_user_name:
                self.__socket_list[i].sendall(bytes(js, 'utf-8'))

    def broadcast(self, msg, cs):
        data = '[TIME : %s]\r\n[%s] : %s\r\n' % (ctime(), self.__socket_to_user_name[cs], msg)
        js = json.dumps({
            'type': 'broadcast',
            'msg': data
        })
        # 没有的登陆的玩家无法得知消息,屏蔽了群聊的玩家也没办法获取信息
        for i in range(len(self.__socket_list)):
            if self.__socket_list[i] in self.__socket_to_user_name \
                    and self.__user_name_to_broadcast_state[self.__socket_to_user_name[self.__socket_list[i]]]:
                self.__socket_list[i].sendall(bytes(js, 'utf-8'))


def main():
    server = PyChattingServer()
    server.start_session()


main()

Client- côté mise en œuvre

Tutoriel sur lécriture dun programme de chat simple en Python

Tutoriel sur lécriture dun programme de chat simple en Python

import json
import threading
from socket import *

is_login = False
is_broadcast = True


class ClientReceiveThread(threading.Thread):
    __buf = 1024

    def __init__(self, cs):
        super(ClientReceiveThread, self).__init__()
        self.__cs = cs

    def run(self):
        self.receive_msg()

    def receive_msg(self):
        while True:
            msg = self.__cs.recv(self.__buf).decode('utf-8')
            if not msg:
                break
            js = json.loads(msg)
            if js['type'] == "login":
                if js['success']:
                    global is_login
                    is_login = True
                print(js['msg'])
            elif js['type'] == "ignore":
                if js['success']:
                    global is_broadcast
                    if is_broadcast:
                        is_broadcast = False
                    else:
                        is_broadcast = True
                print(js['msg'])
            else:
                if not is_broadcast:
                    print("[现在处于屏蔽模式]")
                print(js['msg'])


class ClientSendMsgThread(threading.Thread):

    def __init__(self, cs):
        super(ClientSendMsgThread, self).__init__()
        self.__cs = cs

    def run(self):
        self.send_msg()

    # 根据不同的输入格式来进行不同的聊天方式
    def send_msg(self):
        while True:
            js = None
            msg = input()
            if not is_login:
                js = json.dumps({
                    'type': 'login',
                    'msg': msg
                })
            elif msg[0] == "@":
                data = msg.split(' ')
                if not data:
                    print("请重新输入")
                    break
                nickname = data[0]
                nickname = nickname.strip("@")
                if len(data) == 1:
                    data.append(" ")
                js = json.dumps({
                    'type': 'sendto',
                    'nickname': nickname,
                    'msg': data[1]
                })
            elif msg == "/help":
                js = json.dumps({
                    'type': 'help',
                    'msg': None
                })
            elif msg == "/ls":
                js = json.dumps({
                    'type': 'ls',
                    'msg': None
                })
            elif msg == "/i":
                js = json.dumps({
                    'type': 'ignore',
                    'msg': None
                })
            else:
                if msg[0] != '/':
                    js = json.dumps({
                        'type': 'broadcast',
                        'msg': msg
                    })
            if js is not None:
                self.__cs.sendall(bytes(js, 'utf-8'))


def main():
    buf = 1024
    # 改变这个的地址,变成服务器的地址,那么只要部署到服务器上就可以全网使用了
    address = ("127.0.0.1", 12231)
    cs = socket(AF_INET, SOCK_STREAM, 0)
    cs.connect(address)
    data = cs.recv(buf).decode("utf-8")
    if data:
        print(data)
    receive_thread = ClientReceiveThread(cs)
    receive_thread.start()
    send_thread = ClientSendMsgThread(cs)
    send_thread.start()
    while True:
        pass


main()

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer