m1/mftp.py
2020-04-17 11:37:13 +08:00

107 lines
3.2 KiB
Python

#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
@author: pengtao
@file: mftp.py
@time: 2020/04/07
"""
import os
import sys
import time
import socket
import ftplib
class MyFTP:
    """Small convenience wrapper around :mod:`ftplib` for a single FTP server.

    Every public method opens its own connection, logs in, performs one
    operation and closes the connection again, so there is no persistent
    session for the caller to close.

    ``data`` is a dict with these keys:

    * ``host`` (required) -- server name or address to connect to.
    * ``port`` (optional, default 21) -- control-connection port.
    * ``user`` / ``passwd`` (required) -- login credentials.
    * ``ftp_dirs`` (optional, default ``''``) -- remote directory.
    * ``ftp_path`` (optional) -- remote directory :meth:`upload` changes
      into; defaults to ``ftp_dirs``.
    * ``local`` (optional) -- local directory :meth:`upload` reads from;
      defaults to the current directory.

    NOTE(review): earlier revisions passed ``ftp_dirs`` to ``ftplib.FTP()``
    as the server address and never used ``host``/``port``.  It is treated
    here as a remote directory, which is what the name suggests -- confirm
    against real callers.  The ``ftp_path`` and ``local`` keys are inferred
    from the attribute names ``upload`` reads; confirm the intended names.
    """

    def __init__(self, data):
        """Store the connection settings found in ``data`` (see class doc)."""
        self.host = data['host']
        self.port = data.get('port', 21)
        self.ftp_dirs = data.get('ftp_dirs', '')
        self.user = data['user']
        self.passwd = data['passwd']
        # upload() reads these two attributes; they used to be undefined.
        self.ftp_path = data.get('ftp_path', self.ftp_dirs)
        self.local = data.get('local', os.curdir)

    def _open(self):
        """Return an FTP object connected (not yet logged in) to host:port.

        Connection errors propagate to the caller, exactly as they did when
        the connection was made by the ``ftplib.FTP(...)`` constructor.
        """
        ftp = ftplib.FTP()
        ftp.connect(self.host, self.port)
        return ftp

    @staticmethod
    def _transfer_ok(response):
        """Return True if ``response`` is a successful transfer reply.

        RFC 959 allows 226 or 250 on completion, and the text after the
        code is server specific, so only the code is checked.
        """
        return response.startswith(('226', '250'))

    def get_file_size(self, type='text', path=None):
        """Return the size reported by the server for a remote file.

        :param type: ``'text'`` or ``'binary'``; any other value makes the
            method return ``None`` without contacting the server.
        :param path: remote file; defaults to ``'debian/README'`` for text
            and ``'debian/ls-lR.gz'`` for binary.
        :return: the value of ``FTP.size()``, or ``None`` if the type is
            unknown or an FTP error occurred (the error is printed).
        """
        if type == 'text':
            default_path = 'debian/README'
        elif type == 'binary':
            default_path = 'debian/ls-lR.gz'
        else:
            return None
        target = default_path if path is None else path

        # Pre-set so a failed login/SIZE returns None instead of raising
        # UnboundLocalError at the return statement.
        size = None
        with self._open() as ftp:
            try:
                ftp.login(self.user, self.passwd)
                if type == 'binary':
                    # TYPE I selects binary (image) mode; TYPE A would
                    # select ASCII mode.
                    ftp.sendcmd('TYPE I')
                size = ftp.size(target)
            except ftplib.all_errors as e:
                print('FTP error:', e)
        return size

    def create_director(self, dirname='debian'):
        """Print the working directory before and after changing into ``dirname``.

        Despite its name this method does not create anything on the
        server; it only issues PWD, CWD and PWD.  FTP errors are printed.
        """
        with self._open() as ftp:
            try:
                ftp.login(self.user, self.passwd)
                print(ftp.pwd())
                ftp.cwd(dirname)
                print(ftp.pwd())
            except ftplib.all_errors as e:
                print('FTP error:', e)

    def upload(self, filename='README'):
        """Upload ``filename`` from ``self.local`` into ``self.ftp_path``.

        The file is sent in line (ASCII) mode with ``storlines``.  A reply
        other than 226/250 prints ``'Upload failed'``; FTP and OS errors
        (including a missing local file) are printed, not raised.
        """
        local_file = os.path.join(self.local, filename)
        with self._open() as ftp:
            try:
                # Use the configured account, like the other methods do.
                ftp.login(self.user, self.passwd)
                if self.ftp_path:
                    ftp.cwd(self.ftp_path)
                ftp.set_pasv(True)
                with open(local_file, 'rb') as fp:
                    res = ftp.storlines('STOR ' + filename, fp)
                if not self._transfer_ok(res):
                    print('Upload failed')
            except ftplib.all_errors as e:
                print('FTP error:', e)

    def download(self, file_orig='/debian/README', file_copy='README',
                 binary=False):
        """Download remote ``file_orig`` to local ``file_copy``.

        :param file_orig: remote path to retrieve.
        :param file_copy: local path to write.
        :param binary: when true, transfer byte-for-byte with
            ``retrbinary``; otherwise transfer in line mode.

        If the transfer fails or an FTP error occurs, a message is printed
        and ``file_copy`` is removed so no partial file is left behind.
        """
        ok = False
        with self._open() as ftp:
            try:
                ftp.login(self.user, self.passwd)
                if binary:
                    with open(file_copy, 'wb') as fp:
                        res = ftp.retrbinary('RETR ' + file_orig, fp.write)
                else:
                    # Write with the encoding ftplib decoded the lines with,
                    # so every received character can be written back.
                    with open(file_copy, 'w', encoding=ftp.encoding) as fp:

                        def write_line(line):
                            # retrlines() strips the line terminator, so it
                            # has to be restored for each line.
                            fp.write(line + '\n')

                        res = ftp.retrlines('RETR ' + file_orig, write_line)
                ok = self._transfer_ok(res)
                if not ok:
                    print('Download failed')
            except ftplib.all_errors as e:
                print('FTP error:', e)
        # Clean up only after the local file has been closed.
        if not ok and os.path.isfile(file_copy):
            os.remove(file_copy)


if __name__ == "__main__":
    # NOTE(review): credentials are hard-coded in source; consider reading
    # them from the environment or a config file.
    data = {'host': '10.10.3.10', 'user': 'ftp', 'passwd': 'kingsome2018'}
    my_ftp = MyFTP(data)
    # Download a single file.  An APK is binary data, so binary mode is
    # required to keep it intact.
    my_ftp.download(
        "/App/AutoUpload/ouyangpeng/I12/Release/XTCLauncher.apk",
        "G:/ftp_test/XTCLauncher.apk",
        binary=True,
    )
    # TODO: directory (tree) download/upload and single-file upload to an
    # arbitrary remote path are not implemented by MyFTP.
    # No close() call is needed: every MyFTP method closes its own
    # connection when it returns.