Home >Backend Development >Python Tutorial >Tutorial on writing a simple chat program in Python

Tutorial on writing a simple chat program in Python

WBOY
WBOYforward
2023-05-08 18:37:082660browse

Implementation ideas

x01 Establishment of the server

First, on the server side, use socket to accept messages. Every time a socket request is accepted, a socket is opened. A new thread is used to manage the distribution and acceptance of messages. At the same time, there is a handler to manage all threads, thereby realizing the processing of various functions of the chat room

x02 Establishment of the client

The establishment of the client is much simpler than the server. The function of the client is only to send and receive messages, and to enter specific characters according to specific rules to achieve the use of different functions. Therefore, in On the client side, you only need to use two threads, one is dedicated to receiving messages, and the other is dedicated to sending messages. As for why not use one, that is because, if you only use one, then After receiving a message, the one that receives the message is in a blocked state before sending it. Similarly, the same is true for sending a message. If these two functions are implemented in one place, it will make it impossible to continuously send or receive messages.

Implementation method

Server-side implementation

Tutorial on writing a simple chat program in Python

import json
import threading
from socket import *
from time import ctime


class PyChattingServer:
    __socket = socket(AF_INET, SOCK_STREAM, 0)
    __address = ('', 12231)

    __buf = 1024

    def __init__(self):
        self.__socket.bind(self.__address)
        self.__socket.listen(20)
        self.__msg_handler = ChattingHandler()

    def start_session(self):
        print('等待客户连接...\r\n')
        try:
            while True:
                cs, caddr = self.__socket.accept()
                # 利用handler来管理线程,实现线程之间的socket的相互通信
                self.__msg_handler.start_thread(cs, caddr)
        except socket.error:
            pass


class ChattingThread(threading.Thread):
    __buf = 1024

    def __init__(self, cs, caddr, msg_handler):
        super(ChattingThread, self).__init__()
        self.__cs = cs
        self.__caddr = caddr
        self.__msg_handler = msg_handler

    # 使用多线程管理会话
    def run(self):
        try:
            print('...连接来自于:', self.__caddr)
            data = '欢迎你到来PY_CHATTING!请输入你的很cooooool的昵称(不能带有空格哟`)\r\n'
            self.__cs.sendall(bytes(data, 'utf-8'))
            while True:
                data = self.__cs.recv(self.__buf).decode('utf-8')
                if not data:
                    break
                self.__msg_handler.handle_msg(data, self.__cs)
                print(data)
        except socket.error as e:
            print(e.args)
            pass
        finally:
            self.__msg_handler.close_conn(self.__cs)
            self.__cs.close()


class ChattingHandler:
    __help_str = "[ SYSTEM ]\r\n" \
                 "输入/ls,即可获得所有登陆用户信息\r\n" \
                 "输入/h,即可获得帮助\r\n" \
                 "输入@用户名 (注意用户名后面的空格)+消息,即可发动单聊\r\n" \
                 "输入/i,即可屏蔽群聊信息\r\n" \
                 "再次输入/i,即可取消屏蔽\r\n" \
                 "所有首字符为/的信息都不会发送出去"

    __buf = 1024
    __socket_list = []

    __user_name_to_socket = {}
    __socket_to_user_name = {}

    __user_name_to_broadcast_state = {}

    def start_thread(self, cs, caddr):
        self.__socket_list.append(cs)
        chat_thread = ChattingThread(cs, caddr, self)
        chat_thread.start()

    def close_conn(self, cs):
        if cs not in self.__socket_list:
            return
        # 去除socket的记录
        nickname = "SOMEONE"
        if cs in self.__socket_list:
            self.__socket_list.remove(cs)
        # 去除socket与username之间的映射关系
        if cs in self.__socket_to_user_name:
            nickname = self.__socket_to_user_name[cs]
            self.__user_name_to_socket.pop(self.__socket_to_user_name[cs])
            self.__socket_to_user_name.pop(cs)
            self.__user_name_to_broadcast_state.pop(nickname)
        nickname += " "
        # 广播某玩家退出聊天室
        self.broadcast_system_msg(nickname + "离开了PY_CHATTING")

    # 管理用户输入的信息
    def handle_msg(self, msg, cs):
        js = json.loads(msg)
        if js['type'] == "login":
            if js['msg'] not in self.__user_name_to_socket:
                if ' ' in js['msg']:
                    self.send_to(json.dumps({
                        'type': 'login',
                        'success': False,
                        'msg': '账号不能够带有空格'
                    }), cs)
                else:
                    self.__user_name_to_socket[js['msg']] = cs
                    self.__socket_to_user_name[cs] = js['msg']
                    self.__user_name_to_broadcast_state[js['msg']] = True
                    self.send_to(json.dumps({
                        'type': 'login',
                        'success': True,
                        'msg': '昵称建立成功,输入/ls可查看所有在线的人,输入/help可以查看帮助(所有首字符为/的消息都不会发送)'
                    }), cs)
                    # 广播其他人,他已经进入聊天室
                    self.broadcast_system_msg(js['msg'] + "已经进入了聊天室")
            else:
                self.send_to(json.dumps({
                    'type': 'login',
                    'success': False,
                    'msg': '账号已存在'
                }), cs)
        # 若玩家处于屏蔽模式,则无法发送群聊消息
        elif js['type'] == "broadcast":
            if self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]:
                self.broadcast(js['msg'], cs)
            else:
                self.send_to(json.dumps({
                    'type': 'broadcast',
                    'msg': '屏蔽模式下无法发送群聊信息'
                }), cs)
        elif js['type'] == "ls":
            self.send_to(json.dumps({
                'type': 'ls',
                'msg': self.get_all_login_user_info()
            }), cs)
        elif js['type'] == "help":
            self.send_to(json.dumps({
                'type': 'help',
                'msg': self.__help_str
            }), cs)
        elif js['type'] == "sendto":
            self.single_chatting(cs, js['nickname'], js['msg'])
        elif js['type'] == "ignore":
            self.exchange_ignore_state(cs)

    def exchange_ignore_state(self, cs):
        if cs in self.__socket_to_user_name:
            state = self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]
            if state:
                state = False
            else:
                state = True
            self.__user_name_to_broadcast_state.pop(self.__socket_to_user_name[cs])
            self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]] = state
            if self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]:
                msg = "通常模式"
            else:
                msg = "屏蔽模式"
            self.send_to(json.dumps({
                'type': 'ignore',
                'success': True,
                'msg': '[TIME : %s]\r\n[ SYSTEM ] : %s\r\n' % (ctime(), "模式切换成功,现在是" + msg)
            }), cs)
        else:
            self.send_to({
                'type': 'ignore',
                'success': False,
                'msg': '切换失败'
            }, cs)

    def single_chatting(self, cs, nickname, msg):
        if nickname in self.__user_name_to_socket:
            msg = '[TIME : %s]\r\n[ %s CHATTING TO %s ] : %s\r\n' % (
                ctime(), self.__socket_to_user_name[cs], nickname, msg)
            self.send_to_list(json.dumps({
                'type': 'single',
                'msg': msg
            }), self.__user_name_to_socket[nickname], cs)
        else:
            self.send_to(json.dumps({
                'type': 'single',
                'msg': '该用户不存在'
            }), cs)
        print(nickname)

    def send_to_list(self, msg, *cs):
        for i in range(len(cs)):
            self.send_to(msg, cs[i])

    def get_all_login_user_info(self):
        login_list = "[ SYSTEM ] ALIVE USER : \r\n"
        for key in self.__socket_to_user_name:
            login_list += self.__socket_to_user_name[key] + ",\r\n"
        return login_list

    def send_to(self, msg, cs):
        if cs not in self.__socket_list:
            self.__socket_list.append(cs)
        cs.sendall(bytes(msg, 'utf-8'))

    def broadcast_system_msg(self, msg):
        data = '[TIME : %s]\r\n[ SYSTEM ] : %s\r\n' % (ctime(), msg)
        js = json.dumps({
            'type': 'system_msg',
            'msg': data
        })
        # 屏蔽了群聊的玩家也可以获得系统的群发信息
        for i in range(len(self.__socket_list)):
            if self.__socket_list[i] in self.__socket_to_user_name:
                self.__socket_list[i].sendall(bytes(js, 'utf-8'))

    def broadcast(self, msg, cs):
        data = '[TIME : %s]\r\n[%s] : %s\r\n' % (ctime(), self.__socket_to_user_name[cs], msg)
        js = json.dumps({
            'type': 'broadcast',
            'msg': data
        })
        # 没有的登陆的玩家无法得知消息,屏蔽了群聊的玩家也没办法获取信息
        for i in range(len(self.__socket_list)):
            if self.__socket_list[i] in self.__socket_to_user_name \
                    and self.__user_name_to_broadcast_state[self.__socket_to_user_name[self.__socket_list[i]]]:
                self.__socket_list[i].sendall(bytes(js, 'utf-8'))


def main():
    server = PyChattingServer()
    server.start_session()


main()
Tutorial on writing a simple chat program in PythonClient-side implementation

Tutorial on writing a simple chat program in Python

import json
import threading
from socket import *

is_login = False
is_broadcast = True


class ClientReceiveThread(threading.Thread):
    __buf = 1024

    def __init__(self, cs):
        super(ClientReceiveThread, self).__init__()
        self.__cs = cs

    def run(self):
        self.receive_msg()

    def receive_msg(self):
        while True:
            msg = self.__cs.recv(self.__buf).decode('utf-8')
            if not msg:
                break
            js = json.loads(msg)
            if js['type'] == "login":
                if js['success']:
                    global is_login
                    is_login = True
                print(js['msg'])
            elif js['type'] == "ignore":
                if js['success']:
                    global is_broadcast
                    if is_broadcast:
                        is_broadcast = False
                    else:
                        is_broadcast = True
                print(js['msg'])
            else:
                if not is_broadcast:
                    print("[现在处于屏蔽模式]")
                print(js['msg'])


class ClientSendMsgThread(threading.Thread):

    def __init__(self, cs):
        super(ClientSendMsgThread, self).__init__()
        self.__cs = cs

    def run(self):
        self.send_msg()

    # 根据不同的输入格式来进行不同的聊天方式
    def send_msg(self):
        while True:
            js = None
            msg = input()
            if not is_login:
                js = json.dumps({
                    'type': 'login',
                    'msg': msg
                })
            elif msg[0] == "@":
                data = msg.split(' ')
                if not data:
                    print("请重新输入")
                    break
                nickname = data[0]
                nickname = nickname.strip("@")
                if len(data) == 1:
                    data.append(" ")
                js = json.dumps({
                    'type': 'sendto',
                    'nickname': nickname,
                    'msg': data[1]
                })
            elif msg == "/help":
                js = json.dumps({
                    'type': 'help',
                    'msg': None
                })
            elif msg == "/ls":
                js = json.dumps({
                    'type': 'ls',
                    'msg': None
                })
            elif msg == "/i":
                js = json.dumps({
                    'type': 'ignore',
                    'msg': None
                })
            else:
                if msg[0] != '/':
                    js = json.dumps({
                        'type': 'broadcast',
                        'msg': msg
                    })
            if js is not None:
                self.__cs.sendall(bytes(js, 'utf-8'))


def main():
    buf = 1024
    # 改变这个的地址,变成服务器的地址,那么只要部署到服务器上就可以全网使用了
    address = ("127.0.0.1", 12231)
    cs = socket(AF_INET, SOCK_STREAM, 0)
    cs.connect(address)
    data = cs.recv(buf).decode("utf-8")
    if data:
        print(data)
    receive_thread = ClientReceiveThread(cs)
    receive_thread.start()
    send_thread = ClientSendMsgThread(cs)
    send_thread.start()
    while True:
        pass


main()

The above is the detailed content of Tutorial on writing a simple chat program in Python. For more information, please follow other related articles on the PHP Chinese website!

Statement:
This article is reproduced at:yisu.com. If there is any infringement, please contact admin@php.cn delete