#!/usr/bin/python
# -*- coding:utf-8 -*-
"""
Small FTP helper built on :mod:`ftplib`.

@author: pengtao
@file: mftp.py
@time: 2020/04/07
"""
import os
import sys
import time
import socket
import ftplib


class MyFTP:
    """Thin wrapper around :class:`ftplib.FTP`.

    Every public method opens its own connection to ``host:port``, logs in
    with the configured credentials, performs one operation and closes the
    connection again, so there is no long-lived session to close.

    Errors raised while *connecting* propagate to the caller.  Errors raised
    after the connection is established (``ftplib.all_errors``) are reported
    with ``print('FTP error:', ...)`` and the method returns normally.
    """

    # Reply codes that mark a finished transfer.  Only the numeric code is
    # compared because the text after it differs between servers.
    _TRANSFER_OK = ('226', '250')

    # get_file_size() targets: kind -> (remote path, command sent before SIZE).
    _SIZE_TARGETS = {
        'text': ('debian/README', None),
        # TYPE I switches the session to binary (image) mode.
        'binary': ('debian/ls-lR.gz', 'TYPE I'),
    }

    def __init__(self, data):
        """Store the connection settings found in *data*.

        Required keys: ``host``, ``user``, ``passwd``.
        Optional keys:

        * ``port``     - server port, default ``21``.
        * ``ftp_dirs`` - remote directory, default ``''``.
        * ``ftp_path`` - remote directory ``upload()`` changes into,
          default: the value of ``ftp_dirs``.
        * ``local``    - local directory ``upload()`` reads from,
          default ``'.'``.

        Raises:
            KeyError: if a required key is missing.
        """
        self.host = data['host']
        self.port = data.get('port', 21)
        self.ftp_dirs = data.get('ftp_dirs', '')
        self.user = data['user']
        self.passwd = data['passwd']
        # upload() reads these two attributes; give them defined values.
        self.ftp_path = data.get('ftp_path', self.ftp_dirs)
        self.local = data.get('local', '.')

    def _open(self):
        """Return an FTP object connected to ``host:port`` (not logged in).

        Connection errors propagate to the caller.
        """
        ftp = ftplib.FTP()
        ftp.connect(self.host, self.port)
        return ftp

    @staticmethod
    def _discard(path):
        """Delete *path* if it is an existing regular file."""
        if os.path.isfile(path):
            os.remove(path)

    def get_file_size(self, type='text'):
        """Return the size reported by the server for a fixed sample file.

        Args:
            type: ``'text'`` queries ``debian/README``; ``'binary'`` switches
                to binary mode and queries ``debian/ls-lR.gz``.  (The name
                shadows the builtin but is kept for existing callers.)

        Returns:
            The value returned by ``FTP.size``, or ``None`` when *type* is
            not recognised or an FTP error occurred.
        """
        target = self._SIZE_TARGETS.get(type)
        if target is None:
            # Unknown kind: nothing to ask the server, so do not connect.
            return None
        remote_path, mode_cmd = target

        with self._open() as ftp:
            try:
                ftp.login(self.user, self.passwd)
                if mode_cmd is not None:
                    ftp.sendcmd(mode_cmd)
                return ftp.size(remote_path)
            except ftplib.all_errors as e:
                print('FTP error:', e)
                return None

    def create_director(self):
        """Print the login directory, change into ``debian`` and print again.

        Despite its name this method does not create anything on the server.
        """
        with self._open() as ftp:
            try:
                ftp.login(self.user, self.passwd)
                print(ftp.pwd())
                ftp.cwd('debian')
                print(ftp.pwd())
            except ftplib.all_errors as e:
                print('FTP error:', e)

    def upload(self):
        """Upload ``README`` from ``self.local`` into ``self.ftp_path``.

        The file is sent line by line (``storlines``) in passive mode.
        Prints ``Upload failed`` when the final reply is not a completion
        code.
        """
        filename = 'README'
        with self._open() as ftp:
            try:
                ftp.login(self.user, self.passwd)
                if self.ftp_path:
                    ftp.cwd(self.ftp_path)
                ftp.set_pasv(True)

                local_file = os.path.join(self.local, filename)
                # storlines() expects a file object opened in binary mode.
                with open(local_file, 'rb') as fp:
                    res = ftp.storlines("STOR " + filename, fp)

                if not res.startswith(self._TRANSFER_OK):
                    print('Upload failed')
            except ftplib.all_errors as e:
                print('FTP error:', e)

    def download(self):
        """Download ``/debian/README`` as text into ``README`` (cwd).

        A partially written local file is removed when the transfer fails.
        """
        file_orig = '/debian/README'
        file_copy = 'README'
        succeeded = False

        with self._open() as ftp:
            try:
                ftp.login(self.user, self.passwd)
                with open(file_copy, 'w') as fp:
                    # retrlines() hands over each line without its line
                    # ending, so the newline has to be written back.
                    res = ftp.retrlines(
                        'RETR ' + file_orig,
                        lambda line: fp.write(line + '\n'),
                    )
                succeeded = res.startswith(self._TRANSFER_OK)
                if not succeeded:
                    print('Download failed')
            except ftplib.all_errors as e:
                print('FTP error:', e)

        # Clean up only after the local file has been closed.
        if not succeeded:
            self._discard(file_copy)

    def download_file(self, local_file, remote_file):
        """Download *remote_file* in binary mode and save it as *local_file*.

        Args:
            local_file: destination path on the local machine.
            remote_file: path of the file on the FTP server.

        Returns:
            ``True`` when the server confirmed the transfer, else ``False``
            (a partially written *local_file* is removed in that case).
        """
        succeeded = False

        with self._open() as ftp:
            try:
                ftp.login(self.user, self.passwd)
                with open(local_file, 'wb') as fp:
                    res = ftp.retrbinary('RETR ' + remote_file, fp.write)
                succeeded = res.startswith(self._TRANSFER_OK)
                if not succeeded:
                    print('Download failed')
            except ftplib.all_errors as e:
                print('FTP error:', e)

        if not succeeded:
            self._discard(local_file)
        return succeeded


if __name__ == "__main__":
    # NOTE(review): credentials are hard-coded here; consider reading them
    # from the environment or a config file instead.
    data = {'host': '10.10.3.10', 'user': 'ftp', 'passwd': 'kingsome2018'}
    my_ftp = MyFTP(data)

    # Download a single file.
    my_ftp.download_file(
        "G:/ftp_test/XTCLauncher.apk",
        "/App/AutoUpload/ouyangpeng/I12/Release/XTCLauncher.apk",
    )

    # The calls below are usage sketches only: MyFTP does not implement
    # download_file_tree, upload_file or upload_file_tree.
    # Download a directory:
    # my_ftp.download_file_tree("G:/ftp_test/", "App/AutoUpload/ouyangpeng/I12/")
    # Upload a single file:
    # my_ftp.upload_file("G:/ftp_test/Release/XTCLauncher.apk", "/App/AutoUpload/ouyangpeng/I12/Release/XTCLauncher.apk")
    # Upload a directory:
    # my_ftp.upload_file_tree("G:/ftp_test/", "/App/AutoUpload/ouyangpeng/I12/")

    # No close() call: every method opens and closes its own connection.