class SaltAPI(object):
def __init__(self, url=None, user=None, password=None):
self.__url = settings.SALT_API_URL
self.__user = settings.SALT_API_USER
self.__password = settings.SALT_API_PASSWORD
self.__headers = {'Accept': 'application/json', 'Content-type': 'application/json', 'Connection': 'close'}
self.__data = {'client': 'local'}
self.__token = None
def get_token(self):
"""
用戶登陸和獲取token
:return:
"""
params = {'eauth': 'pam', 'username': self.__user, 'password': self.__password}
content = self.postRequest(params, self.__headers, prefix='login')
try:
self.__token = content['return'][0]['token']
self.__headers['X-Auth-Token'] = self.__token
except Exception as e:
logger.error(e)
return content
def get_grains(self, target=None):
"""
獲取系統(tǒng)基礎(chǔ)信息
:return:
"""
data = copy.deepcopy(self.__data)
if target:
data['tgt'] = target
else:
data['tgt'] = '*'
data['fun'] = 'grains.items'
content = self.postRequest(data, self.__headers)
try:
return content['return'][0]
except Exception as e:
logger.error(e)
return content
def get_auth_keys(self):
"""
獲取所有已認(rèn)證的主機(jī)
:return:
"""
data = copy.deepcopy(self.__data)
data['client'] = 'wheel'
data['fun'] = 'key.list_all'
content = self.postRequest(data, self.__headers)
try:
return content['return'][0]['data']['return']['minions']
except Exception as e:
logger.error(e)
return content
def get_minion_status(self):
"""
獲取所有主機(jī)的連接狀態(tài)
:return:
"""
data = copy.deepcopy(self.__data)
data['client'] = 'runner'
data['fun'] = 'manage.status'
content = self.postRequest(data, self.__headers)
try:
return content['return'][0]
except Exception as e:
logger.error(e)
return content
def delete_key(self, minion=None):
'''
刪除指定主機(jī)的認(rèn)證信息
'''
if not minion:
return {'success': False, 'msg': 'minion-id is none'}
data = copy.deepcopy(self.__data)
data['client'] = 'wheel'
data['fun'] = 'key.delete'
data['match'] = minion
content = self.postRequest(data, self.__headers)
try:
return {'success': content['return'][0]['data']['success']}
except Exception as e:
logger.error(e)
return content
def minion_alive(self, minion=None):
'''
Minion主機(jī)存活檢測
'''
data = copy.deepcopy(self.__data)
if minion:
data['tgt'] = minion
result = {minion: False}
else:
data['tgt'] = '*'
result = {'success': False}
data['fun'] = 'test.ping'
content = self.postRequest(data, self.__headers)
try:
return content['return'][0]
except Exception as e:
logger.error(e)
return result
def passwd(self, target=None, user=None, password=None, pass_length=16):
"""
修改密碼
:param target: 目標(biāo)客戶端
:param user: 目標(biāo)客戶端的系統(tǒng)用戶名
:param password: 新的密碼,必須大于等于12位
:return:
"""
if not target:
return {'success': False, 'msg': 'target is none.'}, password
if not user:
return {'success': False, 'msg': 'user is none.'}, password
if password:
if len(password) < pass_length:
return {'success': False, 'msg': 'password must be greater than or equal to {} bits.'.format(pass_length)}, password
if password.isalpha() or password.isdigit() or password.islower() or password.isupper():
return {'success': False, 'msg': 'password must be have lowercase, uppercase and digit.'}, password
else:
password = make_pass(pass_length)
_password = crypt(password, 'cmdb')
try:
self.cmd(target=target, arg='usermod -p "{}" {}'.format(_password, user))
except Exception as e:
logger.debug(e)
res = {'success': True,
'msg': 'Changing password for user {}.all authentication tokens updated successfully.'.format(user),
}
return res, password
def get_users(self, target=None):
"""
獲取系統(tǒng)用戶
:param target: 目標(biāo)客戶端
:return:
"""
if not target: return {'success': False, 'msg': 'target is none.'}
content = self.cmd(target=target, arg="grep /bin/bash /etc/passwd|awk -F ':' '{print $1}'")
return content
def run_cmdb_agent(self, target=None):
"""
運(yùn)行cmdb agent
:param target: 目標(biāo)客戶端
:return:
"""
if not target:
return {'success': False, 'msg': 'target is none.'}
content = self.cmd(target=target, arg="/etc/init.d/vmagent")
return content
def cmd(self, target=None, fun='cmd.run', arg=None, async=False):
"""
遠(yuǎn)程執(zhí)行任務(wù)
:param target: 目標(biāo)客戶端,為空return False
:param fun: 模塊
:param arg: 參數(shù),可為空
:param async: 異步執(zhí)行,默認(rèn)非異步
:return:
"""
data = copy.deepcopy(self.__data)
if not target:
return {'success': False, 'msg': 'target is none'}
if arg:
data['arg'] = arg
if async:
data['client'] = 'local_async'
data['tgt'] = target
data['fun'] = fun
content = self.postRequest(data, self.__headers)
try:
return content['return'][0]
except Exception as e:
logger.error(e)
return content
def jobs(self, fun=None, jid=None):
"""
任務(wù)
:param fun: active,detail
:param jid: Job ID
:return:
"""
data = {'client': 'runner'}
if fun == 'active':
data['fun'] = 'jobs.active'
elif fun == 'detail':
if not jid: return {'success': False, 'msg': 'job id is none'}
data['fun'] = 'jobs.lookup_jid'
data['jid'] = jid
else:
return {'success': False, 'msg': 'fun is active or detail'}
content = self.postRequest(data, self.__headers)
try:
return content['return'][0]
except Exception as e:
logger.error(e)
return content
def postRequest(self, data, headers, prefix=None):
if prefix:
url = '{}/{}'.format(self.__url, prefix)
else:
url = self.__url
try:
s = requests.Session()
s.mount('https://', HTTPAdapter(max_retries=10))
ret = s.post(url, data=json.dumps(data), headers=headers, verify=False, timeout=(30, 60))
if ret.status_code == 401:
logger.error('Salt Unauthorized')
return {'return': [{'success': False, 'msg': 'Salt Unauthorized'}]}
elif ret.status_code == 200:
return ret.json()
else:
return {'return': [{'success': False, 'msg': ret.content}]}
except Exception as e:
logger.error(e)
return {'return': [{'success': False, 'msg': e}]}
def get_pre_auth_keys(self):
"""
獲取未授權(quán)的salt主機(jī)
:return:
"""
data = copy.deepcopy(self.__data)
data['client'] = 'wheel'
data['fun'] = 'key.list_all'
content = self.postRequest(data, self.__headers)
try:
return content['return'][0]['data']['return']['minions_pre']
except Exception as e:
logger.error(e)
return content
def accept_key(self, minion):
"""
授權(quán)salt主機(jī)
:param minion:
:return:
"""
data = copy.deepcopy(self.__data)
data['client'] = 'wheel'
data['fun'] = 'key.accept'
data['match'] = minion
content = self.postRequest(data, self.__headers)
try:
return {'success': content['return'][0]['data']['success']}
except Exception as e:
logger.error(e)
return content
def salt_alive(self, tgt):
'''
Minion主機(jī)存活檢測
'''
data = copy.deepcopy(self.__data)
if tgt:
data['tgt'] = tgt
else:
data['tgt'] = '*'
data['fun'] = 'test.ping'
content = self.postRequest(data, self.__headers)
try:
return content['return'][0]
except Exception as e:
logger.error(e)
return None
def getRequest(self, headers, prefix=None):
if prefix:
url = '{}/{}'.format(self.__url, prefix)
else:
url = self.__url
try:
s = requests.Session()
s.mount('https://', HTTPAdapter(max_retries=10))
ret = s.get(url, headers=headers, verify=False, timeout=(30, 60))
if ret.status_code == 401:
logger.error('Salt Unauthorized')
return {'return': [{'success': False, 'msg': 'Salt Unauthorized'}]}
elif ret.status_code == 200:
return ret.json()
except Exception as e:
logger.error(e)
return {'return': [{'success': False, 'msg': e}]}
def salt_runner_requests(self, jid):
'''
通過jid獲取執(zhí)行結(jié)果
'''
content = self.getRequest(prefix='/jobs/{}'.format(jid), headers=self.__headers)
return content
def salt_runner(self, jid):
"""
獲取job的執(zhí)行結(jié)果
:param jid:
:return:
"""
data = copy.deepcopy(self.__data)
data['client'] = 'runner'
data['fun'] = 'jobs.lookup_jid'
data['jid'] = jid
content = self.postRequest(data, self.__headers)
return content
def salt_running_jobs(self):
"""
獲取在運(yùn)行的job
:return:
"""
data = copy.deepcopy(self.__data)
data['clent'] = 'runner'
data['fun'] = 'jobs.active'
content = self.postRequest(data, self.__headers)
try:
return content['return'][0]
except Exception as e:
logger.error(e)
return content
def remote_execution(self, tgt, fun, arg, expr_form):
"""
異步執(zhí)行遠(yuǎn)程指令
:param tgt:
:param fun:
:param arg:
:param expr_form:
:return:
"""
data = copy.deepcopy(self.__data)
data['client'] = 'local_async'
data['tgt'] = tgt
data['fun'] = fun
data['arg'] = arg
data['expr_form'] = expr_form
content = self.postRequest(data, self.__headers)
try:
return content['return'][0]['jid']
except Exception as e:
logger.error(e)
return content
def remote_module(self, tgt, fun, arg, kwarg, expr_form, client='local_async'):
"""
異步部署模塊
:param tgt:
:param fun:
:param arg:
:param kwarg:
:param expr_form:
:param client local_async 異步 or local 同步
:return:
"""
data = copy.deepcopy(self.__data)
data['client'] = client
data['tgt'] = tgt
data['fun'] = fun
data['arg'] = arg
data['kwarg'] = {"pillar": kwarg}
data['expr_form'] = expr_form
content = self.postRequest(data, self.__headers)
if client == "local_async":
try:
return content['return'][0]['jid']
except Exception as e:
logger.error(e)
return content
else:
return content
def remote_localexec(self, tgt, fun, arg, expr_form):
data = copy.deepcopy(self.__data)
data['client'] = 'local'
data['tgt'] = tgt
data['fun'] = fun
data['arg'] = arg
data['expr_form'] = expr_form
content = self.postRequest(data, self.__headers)
try:
return content['return'][0]['jid']
except Exception as e:
logger.error(e)
return content
def salt_state(self, tgt, arg, expr_form):
"""
sls文件
:param tgt:
:param arg:
:param expr_form:
:return:
"""
data = copy.deepcopy(self.__data)
data['client'] = 'local'
data['tgt'] = tgt
data['fun'] = 'state.sls'
data['arg'] = arg
data['expr_form'] = expr_form
content = self.postRequest(data, self.__headers)
try:
return content['return'][0]
except Exception as e:
logger.error(e)
return content
def project_manage(self, tgt, fun, arg1, arg2, arg3, arg4, arg5, expr_form):
"""
項(xiàng)目管理
:param tgt:
:param fun:
:param arg1:
:param arg2:
:param arg3:
:param arg4:
:param arg5:
:param expr_form:
:return:
"""
data = copy.deepcopy(self.__data)
data['client'] = 'local'
data['tgt'] = tgt
data['fun'] = fun
data['arg'] = arg1
data['arg2'] = arg2
data['arg3'] = arg3
data['arg4'] = arg4
data['arg5'] = arg5
data['expr_form'] = expr_form
content = self.postRequest(data, self.__headers)
try:
return content['return'][0]
except Exception as e:
logger.error(e)
return content
def file_copy(self, tgt, fun, arg1, arg2, expr_form):
"""
文件copy
:param tgt: ./
:param fun: file.manager
:param arg1:
:param arg2:
:param expr_form:
:return:
"""
data = copy.deepcopy(self.__data)
data['client'] = 'local'
data['tgt'] = tgt
data['fun'] = fun
data['arg'] = arg1
data['arg2'] = arg2
data['expr_form'] = expr_form
content = self.postRequest(data, self.__headers)
try:
return content['return'][0]
except Exception as e:
logger.error(e)
return content
def file_bak(self, tgt, fun, arg, expr_form):
"""
文件備份到master上
:param tgt:
:param fun:
:param arg:
:param expr_form:
:return:
"""
data = copy.deepcopy(self.__data)
data['client'] = 'local'
data['tgt'] = tgt
data['fun'] = fun
data['expr_form'] = expr_form
content = self.postRequest(data, self.__headers)
try:
return content['return'][0]
except Exception as e:
logger.error(e)
return content
def file_manage(self, tgt, fun, arg1, arg2, arg3, expr_form):
"""
文件回滾
:param tgt:
:param fun:
:param arg1:
:param arg2:
:param arg3:
:param expr_form:
:return:
"""
data = copy.deepcopy(self.__data)
data['client'] = 'local'
data['tgt'] = tgt
data['fun'] = fun
data['arg'] = arg1
data['arg2'] = arg2
data['arg3'] = arg3
data['expr_form'] = expr_form
content = self.postRequest(data, self.__headers)
try:
return content['return'][0]
except Exception as e:
logger.error(e)
return content
def remote_server_info(self, tgt, fun):
"""
獲取遠(yuǎn)程主機(jī)信息
:param tgt:
:param fun:
:return:
"""
data = copy.deepcopy(self.__data)
data['client'] = 'local'
data['tgt'] = tgt
data['fun'] = fun
content = self.postRequest(data, self.__headers)
try:
return content['return'][0][tgt]
except Exception as e:
logger.error(e)
return content
def get_list_job(self):
"""
獲取正在運(yùn)行的job
:return:
"""
data = copy.deepcopy(self.__data)
data['client'] = 'runner'
data['fun'] = 'jobs.list_jobs'
content = self.postRequest(data, self.__headers)
ret = content['return'][0]
return [{k: v} for k, v in zip(ret.keys(), ret.values())]
def get_running_job(self):
"""
獲取正在運(yùn)行的job
:return:
"""
data = copy.deepcopy(self.__data)
data['client'] = 'runner'
data['fun'] = 'jobs.active'
content = self.postRequest(data, self.__headers)
ret = content['return'][0]
return [{k: v} for k, v in zip(ret.keys(), ret.values())]
def term_running_job(self, jid):
"""
終止正在運(yùn)行的job
:return:
"""
data = copy.deepcopy(self.__data)
data['tgt'] = '*'
data['client'] = 'local'
data['fun'] = 'saltutil.term_job'
data['arg'] = jid
content = self.postRequest(data, self.__headers)
return content['return'][0]
def get_job_info(self, jid):
"""
獲取正在運(yùn)行的job
:return:
"""
data = copy.deepcopy(self.__data)
data['client'] = 'runner'
data['fun'] = 'jobs.lookup_jid'
data['arg'] = jid
content = self.postRequest(data, self.__headers)
return content
def check_job_result(self, jid):
"""
檢查job是否已經(jīng)運(yùn)行完成
:return:
"""
data = copy.deepcopy(self.__data)
data['client'] = 'runner'
data['fun'] = 'jobs.exit_success'
data['arg'] = jid
content = self.postRequest(data, self.__headers)
return content
使用方式
client = SaltAPI()
client.get_token()
然后用client調(diào)用方法
異步執(zhí)行模塊部署
入?yún)?module.module_path # 模塊路徑
"tgt_list": [
"SHTL00706921"
],
"arg": "init.6_env_init"
expr_form = 'list'
調(diào)用并返回jid
jid = sapi.remote_module(tgt_select, 'state.sls', 'module.{}.{}'.format(module.module_path, module.module),
{'SALTSRC': 'module/{}'.format(module.module_path)}, expr_form)