import requests
import json
import time
from dingtalk.crypto import DingTalkCrypto
from django.conf import settings
# settings.BASE_DIR
class Crypto(object):
def __init__(self, token):
# 随便填的字符串
self.token = token
# 自己生成的43位随机字符串
self.aes_key = settings.DINGDING.get("DINGTALK_AES_TOKEN")
# 钉钉企业ID
self.corp_id = settings.DINGDING.get("CorpId") #
print("corp_id:", self.corp_id)
self.nonce = settings.DINGDING.get("nonce")
self.crypto = DingTalkCrypto(
token=self.nonce,
encoding_aes_key=self.aes_key,
corpid_or_suitekey=self.corp_id
)
def encrypt_success(self):
# 返回加密success
result = self.crypto.encrypt_message(
msg="success",
nonce=self.nonce,
timestamp=int(time.time()*1000)
)
return result
class DING(object):
def __init__(self, approve_process):
self.AgentId = settings.DINGDING.get("AgentId")
self.AppKey = settings.DINGDING.get("AppKey")
self.AppSecret = settings.DINGDING.get("AppSecret")
self.dingding_url = settings.DINGDING.get("URL")
self.process_code = settings.DINGDING.get("APPROVE_PROCESS").get(approve_process)['process_code']
self.aes_key = settings.DINGDING.get("DINGTALK_AES_TOKEN")
self.nonce = settings.DINGDING.get("nonce")
def get_token(self):
'''
获取钉钉的token
:return: 钉钉token
'''
url = self.dingding_url + '/gettoken?appkey={}appsecret={}'.format(self.AppKey, self.AppSecret)
req = requests.get(url)
req = json.loads(req.text)
return req['access_token']
# def createCallbackDd():
# '''
# 注册钉钉回调函数
# :return:
# '''
# url = 'https://oapi.dingtalk.com/call_back/register_call_back?access_token=' + self.getToken()
# data = {
# "call_back_tag": ["bpms_task_change", "bpms_instance_change"], #这两个回调种类是审批的
# "token": TOKEN, #自定义的字符串
# "aes_key": AES_KEY, #自定义的43位字符串,密钥
# "url": URL #回调地址
# }
# requests.post(url, data=json.dumps(data))
# return ('OK')
def create_process(self, originator_user_id, dept_id, form_component_value_vo, approvers, cc_list, has_cc=0):
'''
创建钉钉审批
approvers为list 元素为钉钉userid cc_list同理
'''
url = self.dingding_url + '/topapi/processinstance/create?access_token=' + self.get_token()
print("form_component_value_vo:", form_component_value_vo)
if has_cc == 0:
data = {
'agent_id': self.AgentId,
'process_code': self.process_code, #工单id
'originator_user_id': originator_user_id,
'dept_id': dept_id, #创建人的钉钉部门id
'form_component_values': str(form_component_value_vo), #钉钉后台配置的需要填写的字段,
'approvers': approvers,
'cc_list': cc_list,
'cc_position': 'START_FINISH' # 发起和完成时与抄送
}
else:
data = {
'agent_id': self.AgentId,
'process_code': self.process_code, #工单id
'originator_user_id': originator_user_id, #创建人的钉钉userid
'dept_id': dept_id, #创建人的钉钉部门id
'form_component_values': str(form_component_value_vo), #钉钉后台配置的需要填写的字段,
'approvers': approvers,
}
print("dingding_utils:", data)
response = requests.post(url, data=data)
return response.json()
def get_status(self, process_instance_id):
url = self.dingding_url + '/topapi/processinstance/get?access_token=' + self.get_token()
data = {
"process_instance_id": process_instance_id
}
response = requests.post(url, data=data)
return response.json()
def register_callback(self, call_back_url):
# 注册回调
url = self.dingding_url + '/call_back/register_call_back?access_token=' + self.get_token()
print("self.get_token():", self.get_token())
data = {
"call_back_tag": ['bpms_task_change', 'bpms_instance_change'],
"token": self.nonce,
"aes_key": self.aes_key,
"url": call_back_url,
}
response = requests.post(url, data=json.dumps(data))
return response.json()
def get_callback(self):
url = self.dingding_url + '/call_back/get_call_back?access_token=' + self.get_token()
req = requests.get(url)
req = json.loads(req.text)
return req
def create_process_approver_v2(self, originator_user_id, dept_id, form_component_value_vo, approvers, cc_list):
'''
创建钉钉审批
'''
url = self.dingding_url + '/topapi/processinstance/create?access_token=' + self.get_token()
data = {
'agent_id': self.AgentId,
'process_code': self.process_code,
'originator_user_id': originator_user_id,
'dept_id': dept_id,
'form_component_values': str(form_component_value_vo),
'approvers_v2': json.dumps(approvers)
}
if cc_list:
data['cc_list'] = cc_list
data['cc_position'] = 'FINISH'
response = requests.post(url, data=data)
return response.json()
def create_process_approver_v2_test(self, originator_user_id, dept_id, form_component_value_vo):
'''
创建钉钉审批
'''
url = self.dingding_url + '/topapi/processinstance/create?access_token=' + self.get_token()
data = {
'agent_id': self.AgentId,
'process_code': self.process_code,
'originator_user_id': originator_user_id,
'dept_id': dept_id,
'form_component_values': str(form_component_value_vo),
'approvers_v2': json.dumps([
{
"task_action_type": "NONE",
"user_ids": ["dingding_id"], # 单独审批人
},
{
"task_action_type": "OR",
"user_ids": ["dingding_id1", "dingding_id2"], # 或签
},
{
"task_action_type": "AND",
"user_ids": ["dingding_id1", "dingding_id2"], # 会签
}
])
}
response = requests.post(url, data=data)
return response.json()
if __name__ == "__main__":
import django, os, sys
sys.path.append('xxxxxx') # 项目路径
os.environ['DJANGO_SETTINGS_MODULE'] = 'xx.settings'
# print("settings.DINGDING", settings.DINGDING)
ding = DING("create_xx")
# print(ding.get_token())
# info = [{'name': '单行输入框','value': 'testixxxxxxxx'}]
# # print(ding.create_process('11', 11, info))
a = [
{'name': "输入框1", 'value': "value1"},
{'name': "输入框2", 'value': "value2"},
]
# print(ding.create_process_test('11', 11, a))
# print(ding.create_process_approver_v2_test('11', 11, a))
# print(ding.create_process_test2())
# print(ding.get_status('xxx'))
print(ding.get_status('xx'))
# # 验证 回调
# a = ding.get_token()
# print(a)
# c = Crypto(a)
# print(c.encrypt_success())
# 注册回调
# print(ding.register_callback("http://xxxx.vaiwan.com/xxx"))
# print(ding.get_callback())
1 Crypto类用于对接钉钉回调用的。一个公司只有一个corpId,并且一个corpid只能注册一个回调地址。我司有公共组注册好了回调。只要接入公司内的回调即可。所以我实际没有使用到Crypto。
2 在钉钉管理后台中创建应用后会有这三个东西:AgentId、AppKey,AppSecret 。在创建钉钉审批流程,可以从审批流程浏览器中获取到APPROVE_PROCESS。别忘啦给这个流程审批接口权限。这些官方文档有说。
到此这篇关于python公司内项目对接钉钉审批流程的实现的文章就介绍到这了,更多相关python对接钉钉审批内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!