1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
|
from fabric import Connection
from pathlib import Path
import stat
import logging
class FabConnection:
def __init__(self, host, user, password, port=22, mylogger=None):
self._host = host
self._user = user
self._password = password
self._port = port
self._sftp = None
self._conn = None
self.mylogger = mylogger
self.connect()
def connect(self):
# 密钥文件方式
# connect_kwargs={"key_filename": "/home/myuser/.ssh/private.key"},
conn = Connection(f"{self._user}@{self._host}:{self._port}", connect_kwargs={"password": self._password})
self._conn = conn
self._sftp = conn.sftp()
def get(self, remote_path, local_path):
local_path, remote_path = Path(local_path), Path(remote_path)
if self.remote_isdir(remote_path):
filepaths = self.traverse_remote_files(remote_path, local_path)
for filepath in filepaths:
relpath = filepath.relative_to(remote_path)
local = local_path.joinpath(relpath)
try:
self._conn.get(self.normpath(filepath), self.normpath(local))
self.writer(f"download {relpath} successful!")
except FileNotFoundError:
self.writer(f"FileNotFoundError: {filepath} -> {local}", level='error')
else:
if not local_path.suffix: # 如果本地路径没有提供文件名,默认为远程文件名
local_path = local_path.joinpath(remote_path.name)
try:
self._conn.get(self.normpath(remote_path), self.normpath(local_path))
self.writer(f"download {remote_path} successful!")
except FileNotFoundError:
self.writer(f"FileNotFoundError: {remote_path} -> {local_path}", level='error')
def put(self, local_path, remote_path):
local_path, remote_path = Path(local_path), Path(remote_path)
if local_path.is_dir():
for path in local_path.rglob('[!.]*'):
# 拼接远程路径,relative_to获取相对路径
remote = remote_path.joinpath(path.relative_to(local_path))
if path.is_file():
self.check_remote_path(remote.parent, is_mkdir=True) # 目标机器上不存在此路径需要创建
self._conn.put(self.normpath(path), self.normpath(remote))
self.writer(f"upload {path} successful!")
# 上传单个文件
else:
self.check_remote_path(remote_path.parent, is_mkdir=True)
if self.remote_isdir(remote_path): # 若远程路径是一个目录,就将本地文件名作为默认名字
remote_path = remote_path.joinpath(local_path.name)
self._conn.put(self.normpath(local_path), self.normpath(remote_path))
self.writer(f"upload the file {local_path} successful!")
def traverse_remote_files(self, remote_path: Path, local_path: Path):
files_attr = self._sftp.listdir_attr(self.normpath(remote_path))
for file_attr in files_attr:
filename = file_attr.filename
if filename.startswith('.'): # 过滤以点开头的目录或文件
continue
# 此处的local和remote可能为目录,也可能为文件
local, remote = local_path.joinpath(filename), remote_path.joinpath(filename)
# 若为目录,则递归调用
if stat.S_ISDIR(file_attr.st_mode): # st_mode判断文件类型(目录还是文件)
yield from self.traverse_remote_files(remote, local)
else:
yield remote
def close(self):
if self._conn:
self._conn.close()
if self._sftp:
self._sftp.close()
self._conn, self._sftp = None, None
def writer(self, message, level=None):
"""
自定义写入文件方法,同时支持logger或文件对象
:param message:
:param level:
:return:
"""
if self.mylogger:
if isinstance(self.mylogger, logging.Logger):
if not level:
self.mylogger.info(message)
elif level == 'error':
self.mylogger.error(message)
else:
self.mylogger.warning(message)
else:
self.mylogger.write(message + "\n")
else:
print(message + "\n")
@staticmethod
def normpath(path):
"""
由于windows和linux操作系统不同,路径格式会出现不统一的情况,反斜杠不处理的话会出现很多问题
替换windows路径中的\
:param path:
:return:
"""
if isinstance(path, Path):
path = str(path)
return path.replace('\\', '/')
def remote_isdir(self, remote_path):
"""
检查一个远程路径是否为目录
:param remote_path:
:return:
"""
attr = self._sftp.lstat(self.normpath(remote_path))
return stat.S_ISDIR(attr.st_mode)
def check_remote_path(self, remote_path, is_mkdir=False):
"""
判断目标机器路径是否存在
:param remote_path:
:param is_mkdir: 若为True,则会创建此路径
:return:
"""
remote_path = self.normpath(remote_path)
try:
self._sftp.lstat(remote_path)
return True
except FileNotFoundError:
if is_mkdir:
try:
self._sftp.mkdir(remote_path)
except PermissionError:
ret = self.run(f"sudo mkdir {remote_path}", pty=True, watchers=[self.sudo_pass])
return not ret[2]
else:
return False
@property
def sudo_pass(self):
sudopass = Responder(pattern=fr'[sudo] password for {self._user}:', response=f'{self._password}\n', )
return sudopass
def run(self, cmd, hide=False, warn=True, pty=False, watchers=None):
# hide=True控制台不打印运行信息,当out_stream不为空时,hide参数无效
ret = self._conn.run(cmd,
hide=hide,
warn=warn,
pty=pty,
out_stream=self.mylogger,
err_stream=self.mylogger,
watchers=watchers,
encoding="utf8"
)
stdout, stderr = ret.stdout.strip(), ret.stderr.strip()
return stdout, stderr, ret.failed
@property
def conn(self):
return self._conn
demo = FabConnection(host='172.16.101.222',user='xj',password='king')
demo.get('/home/xj/Desktop/dir_a','./')
demo.put('./','/home/xj/Desktop/dir_cc')
demo.run('ls -l')
|