Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from functools import partial
- import paramiko
- import stat
- import os
- import os.path as op
- def eprint(verbose=True, *args, **kwargs):
- if verbose:
- print(*args, **kwargs)
- def unix_path_join(parent, child):
- """兼容 *nix/dos 系统版本的 os.path.join"""
- return op.join(parent, child).replace(os.sep, '/')
- class SFTPDownloader:
- def __init__(self, host, username, pkeyfile=None, password=None, port=22, verbose=False):
- self._eprint = partial(eprint, verbose, flush=True)
- self.host = host
- self.username = username
- self.password = password
- self.pkeyfile = pkeyfile
- self.port = port
- self.connect_sftp_server()
- def connect_sftp_server(self):
- """连接 SFTP 服务器"""
- ssh = paramiko.SSHClient()
- ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- if self.pkeyfile is not None:
- try:
- pkey = paramiko.RSAKey.from_private_key_file(
- filename=self.pkeyfile)
- ssh.connect(hostname=self.host,
- username=self.username, pkey=pkey)
- except Exception as e:
- self._eprint('[ERR] failed to connect {} '
- 'using rsa key <{}> | <{}>'
- .format(self.host, self.pkeyfile, e))
- else:
- self._eprint('[INFO] connect to {}@{} '
- 'successfully (using rsa key)'
- .format(self.username, self.host))
- self.sftp = paramiko.SSHClient.open_sftp(ssh)
- if self.password is not None:
- try:
- ts = paramiko.Transport((self.host, self.port))
- ts.connect(username=self.username, password=self.password)
- except Exception as e:
- self._eprint('[ERR] failed to connect {} '
- 'using username/password | <{}>'
- .format(self.host, e))
- else:
- self._eprint('[INFO] connect to {}@{} successfully'
- '(using username/password)'
- .format(self.username, self.host))
- self.sftp = paramiko.SFTPClient.from_transport(ts)
- def download(self, remotefile, localdir='.'):
- """下载单个文件至指定本地目录"""
- if not op.exists(localdir):
- os.makedirs(localdir)
- try:
- self.sftp.get(remotefile, unix_path_join(
- localdir, op.basename(remotefile)))
- except Exception:
- return False
- else:
- return True
- def download_dir(self, rootdir, localdir='.', include=True):
- """
- 递归下载远程目录
- @param rootdir 远程根目录
- @param localdir 本地根目录
- @param include 是否包括远程目录根目录本身
- """
- rootdir = rootdir.rstrip('/\\')
- localdir = localdir.rstrip('/\\')
- if include:
- prefix = len(op.dirname(rootdir)) + 1
- else:
- prefix = len(rootdir) + 1
- def iterdirs(remotedir):
- for child in self.sftp.listdir_attr(remotedir):
- if stat.S_ISDIR(child.st_mode):
- local_dir = unix_path_join(unix_path_join(
- localdir, remotedir[prefix:]), child.filename)
- try:
- os.makedirs(local_dir)
- except Exception:
- pass
- iterdirs(unix_path_join(remotedir, child.filename))
- else:
- remote_file = unix_path_join(remotedir, child.filename)
- local_dir = unix_path_join(localdir, remotedir[prefix:])
- self._eprint('[INFO] downloading <{}> ... '
- .format(remote_file.replace(op.sep, '/')), end='')
- if self.download(remote_file, local_dir):
- self._eprint('success!')
- else:
- self._eprint('fail!')
- iterdirs(rootdir)
- def test():
- # 用户名-密钥文件
- # host = 'localhost'
- # username = 'username'
- # pkeyfile = '/home/xx/.ssh/id_rsa'
- # sftp = SFTPDownloader(host, username, pkeyfile, verbose=True)
- # sftp.download_dir('remote/dir', 'local/dest', False)
- # 用户名-密码
- # host = 'localhost'
- # port = 10086
- # username = 'username'
- # password = 'password'
- # sftp = SFTPDownloader(host, username, password=password, port=port, verbose=True)
- # sftp.download_dir('remote/dir/', 'local/dest', True)
- pass
- if __name__ == "__main__":
- test()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement