107 lines
3.2 KiB
Python
107 lines
3.2 KiB
Python
#!/usr/bin/python
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
@author: pengtao
|
|
@file: mftp.py
|
|
@time: 2020/04/07
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import time
|
|
import socket
|
|
import ftplib
|
|
|
|
|
|
class MyFTP:
    """Small convenience wrapper around :mod:`ftplib` for a single FTP server.

    Every public method opens its own connection to ``host:port``, logs in
    with the configured credentials, performs one operation and closes the
    connection again, so an instance holds no open socket between calls.

    Errors raised once the connection exists (login, commands, transfers,
    local file access) are printed as ``FTP error: ...`` and not propagated.
    Failures to establish the connection itself propagate to the caller.
    """

    # Reply code for "closing data connection, file action successful".
    # Only the numeric code is compared because the human-readable text
    # that follows it differs between server implementations.
    _TRANSFER_OK = '226'

    def __init__(self, data):
        """Store the connection settings found in *data*.

        Args:
            data: Mapping with the required keys ``'host'``, ``'ftp_dirs'``,
                ``'user'`` and ``'passwd'``. Optional keys: ``'port'``
                (default 21), ``'ftp_path'`` (remote directory that
                ``upload`` changes into; default ``None`` = stay in the
                login directory) and ``'local'`` (local directory that
                ``upload`` reads from; default: current directory).

        Raises:
            KeyError: If a required key is missing.
        """
        self.host = data['host']
        self.port = data.get('port', 21)
        self.ftp_dirs = data['ftp_dirs']
        self.user = data['user']
        self.passwd = data['passwd']
        # upload() reads these two attributes; they were previously never
        # initialised, which made upload() fail with AttributeError.
        self.ftp_path = data.get('ftp_path')
        self.local = data.get('local', os.curdir)

    def _connect(self):
        """Return a connected (not yet logged-in) ``ftplib.FTP`` object.

        Raises:
            Any of ``ftplib.all_errors`` if the connection cannot be made.
        """
        ftp = ftplib.FTP()
        try:
            ftp.connect(self.host, self.port)
        except ftplib.all_errors:
            # Release a socket that may have been opened before the failure.
            ftp.close()
            raise
        return ftp

    @staticmethod
    def _discard(path):
        """Delete the local file *path* if it exists."""
        if os.path.isfile(path):
            os.remove(path)

    def get_file_size(self, type='text', path=None):
        """Return the size reported by the server for a remote file.

        Args:
            type: ``'text'`` queries in the connection's current transfer
                mode; ``'binary'`` switches to image mode (``TYPE I``)
                first. Any other value returns ``None`` without contacting
                the server.
            path: Remote file to query. Defaults to ``'debian/README'`` for
                ``'text'`` and ``'debian/ls-lR.gz'`` for ``'binary'``.

        Returns:
            The value returned by ``FTP.size``, or ``None`` if *type* is
            unknown or an FTP error occurred (the error is printed).
        """
        if type == 'text':
            target = 'debian/README' if path is None else path
        elif type == 'binary':
            target = 'debian/ls-lR.gz' if path is None else path
        else:
            return None

        # Pre-set so that an FTP error yields None rather than an
        # UnboundLocalError at the return statement.
        size = None
        with self._connect() as ftp:
            try:
                ftp.login(self.user, self.passwd)
                if type == 'binary':
                    # TYPE I selects image (binary) mode; TYPE A is ASCII.
                    ftp.sendcmd('TYPE I')
                size = ftp.size(target)
            except ftplib.all_errors as e:
                print('FTP error:', e)
        return size

    def create_director(self, dirname='debian'):
        """Print the login directory, change into *dirname*, print again.

        Despite its name this method does not create anything on the
        server; it only changes the working directory of a temporary
        connection and prints the directory before and after.

        Args:
            dirname: Remote directory to change into.
        """
        with self._connect() as ftp:
            try:
                ftp.login(self.user, self.passwd)
                print(ftp.pwd())
                ftp.cwd(dirname)
                print(ftp.pwd())
            except ftplib.all_errors as e:
                print('FTP error:', e)

    def upload(self, filename='README'):
        """Upload ``<self.local>/<filename>`` in line mode as *filename*.

        The file is stored in ``self.ftp_path`` when that is set, otherwise
        in the directory the server places the session in after login.
        Prints ``Upload failed`` when the server does not answer with reply
        code 226.

        Args:
            filename: Name of the file, used both locally and remotely.
        """
        local_file = os.path.join(self.local, filename)
        with self._connect() as ftp:
            try:
                ftp.login(self.user, self.passwd)
                if self.ftp_path:
                    ftp.cwd(self.ftp_path)
                ftp.set_pasv(True)
                # storlines() requires a file object opened in binary mode.
                with open(local_file, 'rb') as fp:
                    res = ftp.storlines('STOR ' + filename, fp)
                if not res.startswith(self._TRANSFER_OK):
                    print('Upload failed')
            except ftplib.all_errors as e:
                print('FTP error:', e)

    def download(self, file_orig='/debian/README', file_copy='README'):
        """Download the remote text file *file_orig* to local *file_copy*.

        A partially written local copy is removed when the transfer fails.
        A local file that existed beforehand is left alone if the failure
        happens before it was opened for writing.

        Args:
            file_orig: Remote path of the file to fetch.
            file_copy: Local path to write to.
        """
        with self._connect() as ftp:
            opened = False
            try:
                ftp.login(self.user, self.passwd)
                # Write with the encoding ftplib used to decode the lines so
                # every received character can be encoded again.
                with open(file_copy, 'w', encoding=ftp.encoding) as fp:
                    opened = True
                    # retrlines() strips the line terminator from each line
                    # before calling the callback, so it is added back here.
                    res = ftp.retrlines(
                        'RETR ' + file_orig,
                        lambda line: fp.write(line + '\n'),
                    )
                if not res.startswith(self._TRANSFER_OK):
                    print('Download failed')
                    self._discard(file_copy)
            except ftplib.all_errors as e:
                print('FTP error:', e)
                if opened:
                    self._discard(file_copy)
|
|
|
|
|
|
if __name__ == "__main__":
    # NOTE(review): the password is kept in source as a fallback only so the
    # script behaves as before; set FTP_PASSWD in the environment to override
    # it, and move the literal out of version control when possible.
    data = {
        'host': '10.10.3.10',
        'user': 'ftp',
        'passwd': os.environ.get('FTP_PASSWD', 'kingsome2018'),
        # MyFTP.__init__ requires this key; without it construction raises
        # KeyError before anything else runs.
        'ftp_dirs': '/App/AutoUpload/ouyangpeng/I12/',
    }
    my_ftp = MyFTP(data)

    # Download a single file. MyFTP provides download(); the previously
    # called download_file() does not exist on the class.
    my_ftp.download()

    # Upload a single file.
    # my_ftp.upload()

    # Downloading or uploading whole directory trees is not implemented by
    # MyFTP (there are no download_file_tree / upload_file_tree methods).

    # No explicit close is needed: MyFTP has no close() method and every
    # MyFTP method closes its own connection when it returns.
|