没有所谓的捷径
一切都是时间最平凡的累积

python3 对用户发送给企业微信公众账号的消息加解密代码.支持中文

本文最后更新于2019年8月16日,已超过33天没有更新,如果文章内容失效,请反馈给我们,谢谢!

python3 对用户发送给企业微信公众账号的消息加解密代码.支持中文

企业微信官方提供的python教程是2.x的,在3.x下测试中文有异常, 以下是修改过的,亲测可用

WXBizMsgCrypt.py

#!/usr/bin/env python
# -*- encoding:utf-8 -*-
# python3对公众平台发送给公众账号的消息加解密代码.支持中文.

import base64
import string
import random
import hashlib
import time
import struct
import binascii
from Crypto.Cipher import AES
import xml.etree.cElementTree as ET
import socket
import ierror

""" AES加解密用 pycrypto """


class FormatException(Exception):
    pass


def throw_exception(message, exception_class=FormatException):
    """my define raise exception function"""
    raise exception_class(message)


class SHA1:
    """计算公众平台的消息签名接口"""

    def getSHA1(self, token, timestamp, nonce, encrypt):
        """用SHA1算法生成安全签名
        @param token:  票据
        @param timestamp: 时间戳
        @param encrypt: 密文
        @param nonce: 随机字符串
        @return: 安全签名
        """
        try:
            token = token.decode()
            sortlist = [token, timestamp, nonce, encrypt]
            sortlist.sort()
            sha = hashlib.sha1()
            sha.update("".join(sortlist).encode("utf8"))
            return ierror.WXBizMsgCrypt_OK, sha.hexdigest()
        except Exception as e:
            print(e)
            return ierror.WXBizMsgCrypt_ComputeSignature_Error, None


class XMLParse(object):
    """提供提取消息格式中的密文及生成回复消息格式的接口"""

    # xml消息模板
    AES_TEXT_RESPONSE_TEMPLATE = """<xml>
        <Encrypt><![CDATA[%(msg_encrypt)s]]></Encrypt>
        <MsgSignature><![CDATA[%(msg_signaturet)s]]></MsgSignature>
        <TimeStamp>%(timestamp)s</TimeStamp>
        <Nonce><![CDATA[%(nonce)s]]></Nonce>
        </xml>"""
    # xml消息模版2  接受微信公众号发来的消息
    AES_TEXT_RESPONSE_TEMPLATE2 = """<xml>
        <ToUserName><![CDATA[%(ToUserName)s]]></ToUserName>
        <FromUserName><![CDATA[%(FromUserName)s]]></FromUserName>
        <CreateTime>%(CreateTime)s</CreateTime>
        <MsgType><![CDATA[%(MsgType)s]]></MsgType>
        <Content><![CDATA[%(Content)s]]></Content>
        <MsgId>%(MsgId)s</MsgId>
        <AgentID>%(AgentID)s</AgentID>
        </xml>"""

    def extract(self, xmltext):
        """提取出xml数据包中的加密消息
        @param xmltext: 待提取的xml字符串
        @return: 提取出的加密消息字符串
        """
        try:
            xml_tree = ET.fromstring(xmltext)
            encrypt = xml_tree.find("Encrypt")
            touser_name = xml_tree.find("ToUserName")
            return ierror.WXBizMsgCrypt_OK, encrypt.text, touser_name.text
        except Exception as e:
            print(e)
            return ierror.WXBizMsgCrypt_ParseXml_Error, None, None

    def extract_from_server(self, xmltext):
        """提取出xml数据
        @param xmltext: 待提取的xml字符串
        @return: 提取出数据FromUserName, Content
        """
        try:
            xml_tree = ET.fromstring(xmltext)
            FromUserName = xml_tree.find("FromUserName")
            Content = xml_tree.find("Content")
            ToUserName = xml_tree.find("ToUserName")
            MsgId = xml_tree.find("MsgId")
            AgentID = xml_tree.find("AgentID")
            return ierror.WXBizMsgCrypt_OK, FromUserName.text, Content.text, ToUserName.text, MsgId.text, AgentID.text
        except Exception as e:
            print(e)
            return ierror.WXBizMsgCrypt_ParseXml_Error, None, None, None, None, None

    def generate(self, encrypt, signature, timestamp, nonce):
        """生成xml消息
        @param encrypt: 加密后的消息密文
        @param signature: 安全签名
        @param timestamp: 时间戳
        @param nonce: 随机字符串
        @return: 生成的xml字符串
        """
        resp_dict = {
            'msg_encrypt': encrypt,
            'msg_signaturet': signature,
            'timestamp': timestamp,
            'nonce': nonce,
        }
        resp_xml = self.AES_TEXT_RESPONSE_TEMPLATE % resp_dict
        return resp_xml
    def generate_from_server(self, ToUserName, FromUserName, CreateTime, MsgType, Content, MsgId, AgentID):
        """生成xml消息,回复给公众号用户
        @param encrypt: 加密后的消息密文
        @param signature: 安全签名
        @param timestamp: 时间戳
        @param nonce: 随机字符串
        @return: 生成的xml字符串
        """
        resp_dict = {
            'ToUserName': ToUserName,
            'FromUserName': FromUserName,
            'CreateTime': CreateTime,
            'MsgType': MsgType,
            'Content': Content,
            'MsgId': MsgId,
            'AgentID': AgentID
        }
        resp_xml = self.AES_TEXT_RESPONSE_TEMPLATE2 % resp_dict
        return resp_xml

class PKCS7Encoder(object):
    """提供基于PKCS7算法的加解密接口"""

    block_size = 32

    def encode(self, text):
        """ 对需要加密的明文进行填充补位
        @param text: 需要进行填充补位操作的明文
        @return: 补齐明文字符串
        """
        text_length = len(text)
        # 计算需要填充的位数
        amount_to_pad = self.block_size - (text_length % self.block_size)
        if amount_to_pad == 0:
            amount_to_pad = self.block_size
        # 获得补位所用的字符
        pad = chr(amount_to_pad).encode()
        return text + pad * amount_to_pad

    def decode(self, decrypted):
        """删除解密后明文的补位字符
        @param decrypted: 解密后的明文
        @return: 删除补位字符后的明文
        """
        pad = ord(decrypted[-1])
        if pad < 1 or pad > 32:
            pad = 0
        return decrypted[:-pad]


class Prpcrypt(object):
    """提供接收和推送给公众平台消息的加解密接口"""

    def __init__(self, key):
        # self.key = base64.b64decode(key+"=")
        self.key = key
        # 设置加解密模式为AES的CBC模式
        self.mode = AES.MODE_CBC

    def encrypt(self, text, appid):
        """对明文进行加密
        @param text: 需要加密的明文
        @return: 加密得到的字符串
        """
        # 16位随机字符串添加到明文开头
        len_str = struct.pack("I", socket.htonl(len(text.encode())))
        # text = self.get_random_str() + binascii.b2a_hex(len_str).decode() + text + appid
        text = self.get_random_str() + len_str + text.encode() + appid
        # 使用自定义的填充方式对明文进行补位填充
        pkcs7 = PKCS7Encoder()
        text = pkcs7.encode(text)
        # 加密
        cryptor = AES.new(self.key, self.mode, self.key[:16])
        try:
            ciphertext = cryptor.encrypt(text)
            # 使用BASE64对加密后的字符串进行编码
            return ierror.WXBizMsgCrypt_OK, base64.b64encode(ciphertext).decode('utf8')
        except Exception:
            return ierror.WXBizMsgCrypt_EncryptAES_Error, None

    def decrypt(self, text, appid):
        """对解密后的明文进行补位删除
        @param text: 密文
        @return: 删除填充补位后的明文
        """
        try:
            cryptor = AES.new(self.key, self.mode, self.key[:16])
            # 使用BASE64对密文进行解码,然后AES-CBC解密
            plain_text = cryptor.decrypt(base64.b64decode(text))
        except Exception as e:
            print(e)
            return ierror.WXBizMsgCrypt_DecryptAES_Error, None
        try:
            # pad = ord(plain_text[-1])
            pad = plain_text[-1]
            # 去掉补位字符串
            # pkcs7 = PKCS7Encoder()
            # plain_text = pkcs7.encode(plain_text)
            # 去除16位随机字符串
            content = plain_text[16:-pad]
            xml_len = socket.ntohl(struct.unpack("I", content[: 4])[0])
            xml_content = content[4: xml_len + 4]
            from_appid = content[xml_len + 4:]
        except Exception:
            return ierror.WXBizMsgCrypt_IllegalBuffer, None
        if from_appid != appid:
            return ierror.WXBizMsgCrypt_ValidateAppid_Error, None
        return 0, xml_content.decode()

    def get_random_str(self):
        """ 随机生成16位字符串
        @return: 16位字符串
        """
        rule = string.ascii_letters + string.digits
        str = random.sample(rule, 16)
        return "".join(str).encode()

class WXBizMsgCrypt(object):
    # 构造函数
    # @param sToken: 公众平台上,开发者设置的Token
    # @param sEncodingAESKey: 公众平台上,开发者设置的EncodingAESKey
    # @param sAppId: 企业号的AppId
    def __init__(self, sToken, sEncodingAESKey, sAppId):
        try:
            self.key = base64.b64decode(sEncodingAESKey + "=")
            assert len(self.key) == 32
        except Exception:
            throw_exception("[error]: EncodingAESKey unvalid !", FormatException)
            # return ierror.WXBizMsgCrypt_IllegalAesKey)
        self.token = sToken.encode()
        self.appid = sAppId.encode()

    def VerifyURL(self, sMsgSignature, sTimeStamp, sNonce, sEchoStr):
        sha1 = SHA1()
        ret,signature = sha1.getSHA1(self.token, sTimeStamp, sNonce, sEchoStr)
        if ret  != 0:
            return ret, None
        if not signature == sMsgSignature:
            return ierror.WXBizMsgCrypt_ValidateSignature_Error, None
        pc = Prpcrypt(self.key)
        ret,sReplyEchoStr = pc.decrypt(sEchoStr, self.appid)
        return ret,sReplyEchoStr

    def EncryptMsg(self, sReplyMsg, sNonce, timestamp=None):
        # 将公众号回复用户的消息加密打包
        # @param sReplyMsg: 企业号待回复用户的消息,xml格式的字符串
        # @param sTimeStamp: 时间戳,可以自己生成,也可以用URL参数的timestamp,如为None则自动用当前时间
        # @param sNonce: 随机串,可以自己生成,也可以用URL参数的nonce
        # sEncryptMsg: 加密后的可以直接回复用户的密文,
        # 包括msg_signature, timestamp, nonce, encrypt的xml格式的字符串
        # return:成功0,sEncryptMsg,失败返回对应的错误码None
        pc = Prpcrypt(self.key)
        ret, encrypt = pc.encrypt(sReplyMsg, self.appid)
        if ret != 0:
            return ret, None
        if timestamp is None:
            timestamp = str(int(time.time()))
        # 生成安全签名
        sha1 = SHA1()
        ret, signature = sha1.getSHA1(self.token, timestamp, sNonce, encrypt)

        if ret != 0:
            return ret, None
        xmlParse = XMLParse()
        return ret, xmlParse.generate(encrypt, signature, timestamp, sNonce)

    def DecryptMsg(self, sPostData, sMsgSignature, sTimeStamp, sNonce):
        # 检验消息的真实性,并且获取解密后的明文
        # @param sMsgSignature: 签名串,对应URL参数的msg_signature
        # @param sTimeStamp: 时间戳,对应URL参数的timestamp
        # @param sNonce: 随机串,对应URL参数的nonce
        # @param sPostData: 密文,对应POST请求的数据
        #  xml_content: 解密后的原文,当return返回0时有效
        # @return: 成功0,失败返回对应的错误码
        # 验证安全签名
        xmlParse = XMLParse()
        ret, encrypt, touser_name = xmlParse.extract(sPostData)
        if ret != 0:
            return ret, None
        sha1 = SHA1()
        ret, signature = sha1.getSHA1(self.token, sTimeStamp, sNonce, encrypt)
        if ret != 0:
            return ret, None
        if not signature == sMsgSignature:
            return ierror.WXBizMsgCrypt_ValidateSignature_Error, None
        pc = Prpcrypt(self.key)
        ret, xml_content = pc.decrypt(encrypt, self.appid)
        return ret, xml_content

ierror.py文件

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#########################################################################
# Author: jonyqin
# Created Time: Thu 11 Sep 2014 01:53:58 PM CST
# File Name: ierror.py
# Description:定义错误码含义
#########################################################################
WXBizMsgCrypt_OK = 0
WXBizMsgCrypt_ValidateSignature_Error = -40001
WXBizMsgCrypt_ParseXml_Error = -40002
WXBizMsgCrypt_ComputeSignature_Error = -40003
WXBizMsgCrypt_IllegalAesKey = -40004
WXBizMsgCrypt_ValidateCorpid_Error = -40005
WXBizMsgCrypt_EncryptAES_Error = -40006
WXBizMsgCrypt_DecryptAES_Error = -40007
WXBizMsgCrypt_IllegalBuffer = -40008
WXBizMsgCrypt_EncodeBase64_Error = -40009
WXBizMsgCrypt_DecodeBase64_Error = -40010
WXBizMsgCrypt_GenReturnXml_Error = -40011

调用文件mywx.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from WXBizMsgCrypt import WXBizMsgCrypt, XMLParse
from lxml import etree
from class_sendmessage import weixin
from work import *
from dateandtime import *

sToken = "cpmEznLDvq95nJi*********"
sEncodingAESKey = "UJBNUHqc9W********"
sCorpID = "wwe7e5022*********"

def readLog(file, n=10):
    with open(file, 'r', encoding='utf-8') as f:
        lines = f.readlines()[-n:]
    return ''.join(lines)

def verifyurl(sVerifyMsgSig, sVerifyTimeStamp,sVerifyNonce,sVerifyEchoStr):
    # 微信验证url
    wxcpt = WXBizMsgCrypt(sToken,sEncodingAESKey,sCorpID)
    ret, EchoStr = wxcpt.VerifyURL(sVerifyMsgSig, sVerifyTimeStamp,sVerifyNonce,sVerifyEchoStr)
    return EchoStr

def getMsg(sPostData, sMsgSignature, sTimeStamp, sNonce):
    # 解析微信用户发送的消息内容
    wxcpt = WXBizMsgCrypt(sToken,sEncodingAESKey,sCorpID)
    ret, xml_content = wxcpt.DecryptMsg(sPostData, sMsgSignature, sTimeStamp, sNonce)
    if ret == 0 and 'work.hongyanzi.top' not in xml_content:
        xp = XMLParse()
        WXBizMsgCrypt_OK, FromUserName, Content, ToUserName, MsgId, AgentID = xp.extract_from_server(xml_content)
        return WXBizMsgCrypt_OK, FromUserName, Content, ToUserName, MsgId, AgentID
    else:
        return 1, None, None, None, None, None

def generateMsg(ToUserName, FromUserName, Content, MsgId, AgentID):
    # 封装被动返回给微信用户消息
    wxcpt = WXBizMsgCrypt(sToken,sEncodingAESKey,sCorpID)
    xp = XMLParse()
    resp_xml = xp.generate_from_server(ToUserName, FromUserName, '', 'text', Content, MsgId, AgentID)
    ret, sRespData  = wxcpt.EncryptMsg(resp_xml, '234234234', '2342352362332')
    if ret == 0:
        return sRespData
    else:
        return None

 

赞(0) 打赏
声明:本站发布的内容(图片、视频和文字)以原创、转载和分享网络内容为主,若涉及侵权请及时告知,将会在第一时间删除,联系邮箱:lwarm@qq.com。文章观点不代表本站立场。本站原创内容未经允许不得转载,或转载时需注明出处:红岩子 » python3 对用户发送给企业微信公众账号的消息加解密代码.支持中文
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

今天所做的努力都是在为明天积蓄力量

联系我们赞助我们