Maison >développement back-end >Tutoriel Python >Utilisation de python3 pour implémenter un exemple de fonction de service FTP (serveur pour Linux)

Utilisation de python3 pour implémenter un exemple de fonction de service FTP (serveur pour Linux)

高洛峰
高洛峰original
2017-03-26 09:49:362196parcourir

这篇文章主要介绍了python3实现ftp服务功能,服务端 For Linux,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

本文实例为大家分享了python3实现ftp服务功能的具体代码,供大家参考,具体内容如下

功能介绍:

可执行的命令:

ls
pwd
cd
put
rm
get
mkdir

1、用户加密认证

2、允许多用户同时登陆

3、每个用户有自己的家目录,且只可以访问自己的家目录

4、运行在自己家目录下随意切换目录

5、允许上传下载文件,且文件一致

6、传输过程中显示进度条
server main 代码:

# Author by Andy
# _*_ coding:utf-8 _*_
import os, sys, json, hashlib, socketserver, time
base_dir = os.path.dirname(os.path.dirname(os.path.abspath(file)))
sys.path.append(base_dir)
from conf import userdb_set
class Ftp_server(socketserver.BaseRequestHandler):
 user_home_dir = ''
 def auth(self, *args):
  '''验证用户名及密码'''
  cmd_dic = args[0]
  username = cmd_dic["username"]
  password = cmd_dic["password"]
  f = open(userdb_set.userdb_set(), 'r')
  user_info = json.load(f)
  if username in user_info.keys():
   if password == user_info[username]:
    self.request.send('0'.encode())
    os.chdir('/home/%s' % username)
    self.user_home_dir = os.popen('pwd').read().strip()
    data = "%s login successed" % username
    self.loging(data)
   else:
    self.request.send('1'.encode())
    data = "%s login failed" % username
    self.loging(data)
    f.close
  else:
   self.request.send('1'.encode())
   data = "%s login failed" % username
   self.loging(data)
   f.close
   ##########################################
 def get(self, *args):
  '''给客户端传输文件'''
  request_code = {
   '0': 'file is ready to get',
   '1': 'file not found!'
  }
  cmd_dic = args[0]
  self.loging(json.dumps(cmd_dic))
  filename = cmd_dic["filename"]
  if os.path.isfile(filename):
   self.request.send('0'.encode('utf-8')) # 确认文件存在
   self.request.recv(1024)
   self.request.send(str(os.stat(filename).st_size).encode('utf-8'))
   self.request.recv(1024)
   m = hashlib.md5()
   f = open(filename, 'rb')
   for line in f:
    m.update(line)
    self.request.send(line)
   self.request.send(m.hexdigest().encode('utf-8'))
   print('From server:Md5 value has been sended!')
   f.close()
  else:
   self.request.send('1'.encode('utf-8'))
   ###########################################
 def cd(self, *args):
  '''执行cd命令'''
  user_current_dir = os.popen('pwd').read().strip()
  cmd_dic = args[0]
  self.loging(json.dumps(cmd_dic))
  path = cmd_dic['path']
  if path.startswith('/'):
   if self.user_home_dir in path:
    os.chdir(path)
    new_dir = os.popen('pwd').read()
    user_current_dir = new_dir
    self.request.send('Change dir successfully!'.encode("utf-8"))
    data = 'Change dir successfully!'
    self.loging(data)
   elif os.path.exists(path):
    self.request.send('Permission Denied!'.encode("utf-8"))
    data = 'Permission Denied!'
    self.loging(data)
   else:
    self.request.send('Directory not found!'.encode("utf-8"))
    data = 'Directory not found!'
    self.loging(data)
  elif os.path.exists(path):
   os.chdir(path)
   new_dir = os.popen('pwd').read().strip()
   if self.user_home_dir in new_dir:
    self.request.send('Change dir successfully!'.encode("utf-8"))
    user_current_dir = new_dir
    data = 'Change dir successfully!'
    self.loging(data)
   else:
    os.chdir(user_current_dir)
    self.request.send('Permission Denied!'.encode("utf-8"))
    data = 'Permission Denied!'
    self.loging(data)
  else:
   self.request.send('Directory not found!'.encode("utf-8"))
   data = 'Directory not found!'
   self.loging(data)
   ###########################################
 def rm(self, *args):
  request_code = {
   '0': 'file exist,and Please confirm whether to rm',
   '1': 'file not found!'
  }
  cmd_dic = args[0]
  self.loging(json.dumps(cmd_dic))
  filename = cmd_dic['filename']
  if os.path.exists(filename):
   self.request.send('0'.encode("utf-8")) # 确认文件存在
   client_response = self.request.recv(1024).decode()
   if client_response == '0':
    os.popen('rm -rf %s' % filename)
    self.request.send(('File %s has been deleted!' % filename).encode("utf-8"))
    self.loging('File %s has been deleted!' % filename)
   else:
    self.request.send(('File %s not deleted!' % filename).encode("utf-8"))
    self.loging('File %s not deleted!' % filename)
  else:
   self.request.send('1'.encode("utf-8"))
   ########################################
 def pwd(self, *args):
  '''执行pwd命令'''
  cmd_dic = args[0]
  self.loging(json.dumps(cmd_dic))
  server_response = os.popen('pwd').read().strip().encode("utf-8")
  self.request.send(server_response)
 #############################################
 def ls(self, *args):
  '''执行ls命名'''
  cmd_dic = args[0]
  self.loging(json.dumps(cmd_dic))
  path = cmd_dic['path']
  cmd = 'ls -l %s' % path
  server_response = os.popen(cmd).read().encode("utf-8")
  self.request.send(server_response)
 ############################################
 def put(self, *args):
  '''接收客户端文件'''
  cmd_dic = args[0]
  self.loging(json.dumps(cmd_dic))
  filename = cmd_dic["filename"]
  filesize = cmd_dic["size"]
  if os.path.isfile(filename):
   f = open(filename + '.new', 'wb')
  else:
   f = open(filename, 'wb')
  request_code = {
   '200': 'Ready to recceive data!',
   '210': 'Not ready to received data!'
  }
  self.request.send('200'.encode())
  receive_size = 0
  while True:
   if receive_size < filesize:
    data = self.request.recv(1024)
    f.write(data)
    receive_size += len(data)
   else:
    data = "File %s has been uploaded successfully!" % filename
    self.loging(data)
    print(data)
    break
    ################################################
 def mkdir(self, *args):
  request_code = {
   &#39;0&#39;: &#39;Directory has been made!&#39;,
   &#39;1&#39;: &#39;Directory is aleady exist!&#39;
  }
  cmd_dic = args[0]
  self.loging(json.dumps(cmd_dic))
  dir_name = cmd_dic[&#39;dir_name&#39;]
  if os.path.exists(dir_name):
   self.request.send(&#39;1&#39;.encode("utf-8"))
  else:
   os.popen(&#39;mkdir %s&#39; % dir_name)
   self.request.send(&#39;0&#39;.encode("utf-8"))
   #############################################
  def loging(self, data):
  &#39;&#39;&#39;日志记录&#39;&#39;&#39;
  localtime = time.asctime(time.localtime(time.time()))
  log_file = &#39;/root/ftp/ftpserver/log/server.log&#39;
  with open(log_file, &#39;a&#39;, encoding=&#39;utf-8&#39;) as f:
   f.write(&#39;%s-->' % localtime + data + '\n')
   ##############################################
 def handle(self):
  # print("您本次访问使用的IP为:%s" %self.client_address[0])
  # localtime = time.asctime( time.localtime(time.time()))
  # print(localtime)
  while True:
   try:
    self.data = self.request.recv(1024).decode() #
    # print(self.data)
    cmd_dic = json.loads(self.data)
    action = cmd_dic["action"]
    # print("用户请求%s"%action)
    if hasattr(self, action):
     func = getattr(self, action)
     func(cmd_dic)
   except Exception as e:
    self.loging(str(e))
    break
def run():
 HOST, PORT = '0.0.0.0', 6969
 print("The server is started,and listenning at port 6969")
 server = socketserver.ThreadingTCPServer((HOST, PORT), Ftp_server)
 server.serve_forever()
if name == 'main':
 run()

设置用户口令代码:

#Author by Andy
#_*_ coding:utf-8 _*_
import os,json,hashlib,sys
base_dir = os.path.dirname(os.path.dirname(os.path.abspath(file)))
userdb_file = base_dir+"\data\\userdb"
# print(userdb_file)
def userdb_set():
 if os.path.isfile(userdb_file):
  # print(userdb_file)
  return userdb_file
 else:
  print('请先为您的服务器创建用户!')
  user_data = {}
  dict={}
  Exit_flags = True
  while Exit_flags:
   username = input("Please input username:")
   if username != 'exit':
    password = input("Please input passwod:")
    if password != 'exit':
      user_data.update({username:password})
      m = hashlib.md5()
      # m.update('hello')
      # print(m.hexdigest())
      for i in user_data:
       # print(i,user_data[i])
       m.update(user_data[i].encode())
       dict.update({i:m.hexdigest()})
    else:
     break
   else:
    break
  f = open(userdb_file,'w')
  json.dump(dict,f)
  f.close()
 return userdb_file

目录结构

使用python3实现ftp服务功能实例(服务端 For Linux)

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn