#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
@File :MySkfClass.py
"""
from ctypes import *
# SM3 算法定义
# define SGD_SM3 0x00000001
SGD_SM3 = 0x00000001
"""
// ECC 签名结构体
// 信息安全技术 智能密码钥匙应用接口规范 GB/T 35291-2017
#define ECC_MAX_XCOORDINATE_BITS_LEN 512
typedef struct Struct_ECCSIGNATUREBLOB{
BYTE r[ECC_MAX_XCOORDINATE_BITS_LEN/8];
BYTE s[ECC_MAX_XCOORDINATE_BITS_LEN/8];
} ECCSIGNATUREBLOB, *PECCSIGNATUREBLOB
"""
ECC_MAX_XCOORDINATE_BITS_LEN = 512
# ECC 签名结构体
# 信息安全技术 智能密码钥匙应用接口规范 GB/T 35291-2017
class Struct_ECCSIGNATUREBLOB(Structure):
_fields_ = [("r", c_ubyte * int(ECC_MAX_XCOORDINATE_BITS_LEN / 8)),
("s", c_ubyte * int(ECC_MAX_XCOORDINATE_BITS_LEN / 8))]
# ECC 公钥结构体
# 信息安全技术 智能密码钥匙应用接口规范 GB/T 35291-2017
class Struct_ECCPUBLICKEYBLOB(Structure):
"""官方文档是C语言的 ULONG,不管操作系统是32位还是64位,都要求是 4个字节。
但是 ctypes.c_ulong 在64位系统下是8个字节,在32位系统下是 4个字节,
所以这部分的定义我们要改成 c_uint32,而不是 c_ulong 来解决这个问题
"""
_fields_ = [("BitLen", c_uint32),
("XCoordinate", c_ubyte * 64),
("YCoordinate", c_ubyte * 64)]
"""利用Python的动态类生成特性处理变长结构体,目前测试发现 ctypes.memmove 可能在部分ARM系统上闪退
因此,使用动态类的方式进行处理
"""
def EccCipherBlobFactory(size):
# 密文数据结构
# 信息安全技术 智能密码钥匙应用接口规范 GB/T 35291-2017
class Struct_ECCCIPHERBLOB(Structure):
"""官方文档是C语言的 ULONG,不管操作系统是32位还是64位,都要求是 4个字节。
但是 ctypes.c_ulong 在64位系统下是8个字节,在32位系统下是 4个字节,
所以这部分的定义我们要改成 c_uint32,而不是 c_ulong 来解决这个问题
"""
_fields_ = [("XCoordinate", c_ubyte * 64),
("YCoordinate", c_ubyte * 64),
("HASH", c_ubyte * 32),
("CipherLen", c_uint32),
("Cipher", c_ubyte * size)]
def __init__(self):
"""如下代码部分代码存在歧义,由于国标没有明确 CipherLen 是否应当作为传入缓冲区的长度进行校验
这样就造成厂商实现接口的时候出现差异,有的厂商要求必须明确设置这个长度参数,否则认定缓冲区不足(比如飞天诚信)。
有些厂商就没有这个要求(比如:恒宝、握奇)。
"""
self.CipherLen = c_uint32(size)
return Struct_ECCCIPHERBLOB
class MySkfClass:
def __init__(self, dll_path):
self.skf_handle = cdll.LoadLibrary(dll_path) # 加载dll,获取句柄
self.error = 0 # 定义错误码
self.dev_handle = None # 定义设备句柄
self.app_handle = None # 定义应用句柄
self.cnt_handle = None # 定义容器句柄句柄
self.Init_handle() # 获取设备、应用、容器句柄
def Init_handle(self):
"""句柄获取
获取设备、应用、容器句柄
"""
self.dev_name = self.__GetDevName() # 获取设备名
self.dev_handle = self.__ConnectDev() # 获取设备句柄 需要用到设备名
self.app_name = self.__GetAppName() # 获取应用名
self.app_handle = self.__OpenApplication() # 获取应用句柄 需要用到应用名
self.cnt_name = self.__GetCntNames() # 获取容器名
if self.error == 0: # 获取容器名的时候返回的error为0,正常执行
""" 此处选择第一个容器作为ECC密钥对,如果第一个不是ECC密钥对,可能导致应用闪退
请根据实际情况进行相关调整
"""
self.cnt_handle = self.OpenContainer(self.cnt_name[0])
def close_handle(self):
"""句柄释放
释放设备、应用、容器句柄
"""
if self.cnt_handle is not None:
self.skf_handle.SKF_CloseContainer(self.cnt_handle)
if self.app_handle is not None:
self.skf_handle.SKF_CloseApplication(self.app_handle)
if self.dev_handle is not None:
self.skf_handle.SKF_DisConnectDev(self.dev_handle)
def __GetDevName(self):
"""获取设备名
多设备的情况只会获取到第一个设备
"""
pulSize = c_uint32() # 用于获取设备名长度
self.error = self.skf_handle.SKF_EnumDev(True, None, byref(pulSize))
szNameList = create_string_buffer(pulSize.value) # 为szNameList分配内存
self.error = self.skf_handle.SKF_EnumDev(
True, szNameList, byref(pulSize))
return szNameList.value
def __ConnectDev(self):
phDev = c_void_p()
self.error = self.skf_handle.SKF_ConnectDev(
self.dev_name, byref(phDev)) # 连接设备
return phDev
def __GetAppName(self):
pulSize = c_uint32() # 用于获取应用名长度
self.error = self.skf_handle.SKF_EnumApplication(
self.dev_handle, None, byref(pulSize))
szNameList = create_string_buffer(pulSize.value) # 为szNameList分配内存
self.error = self.skf_handle.SKF_EnumApplication(
self.dev_handle, szNameList, byref(pulSize)) # 获取应用名,这里默认返回第一个应用名
return szNameList.value
def __OpenApplication(self):
phApp = c_void_p()
self.error = self.skf_handle.SKF_OpenApplication(
self.dev_handle, self.app_name, byref(phApp)) # 打开应用
return phApp
def __GetCntNames(self):
pulSize = c_uint32() # 用于获取应用名长度:
self.error = self.skf_handle.SKF_EnumContainer(
self.app_handle, None, byref(pulSize))
szNameList = create_string_buffer(pulSize.value) # 为szNameList分配内存
# 由于szNameList是以'\0'为分隔符,获取多个容器名列表切分获取
self.error = self.skf_handle.SKF_EnumContainer(self.app_handle, szNameList,
byref(pulSize))
name_list = str(szNameList.raw)[2:].split('\\x00')[
:-2] # .raw是获取所有,.values是只读取Null结尾的
cnt_name = [bytes(name, 'utf8') for name in name_list] # 容器列表
return cnt_name
def OpenContainer(self, cnt_name):
phCnt = c_void_p() # 容器句柄
self.error = self.skf_handle.SKF_OpenContainer(
self.app_handle, cnt_name, byref(phCnt)) # 打开容器
return phCnt
def ECCEncrypt(self, plain_text):
"""
1-导出ECC的加密密钥对的公钥
2-用公钥加密SKF_ExtECCEncrypt
"""
plain_text = create_string_buffer(
bytes(plain_text, 'utf8')) # 转成byte类型
# 计算字符长度,-1是因为byte长度比实际多1
plain_text_len = c_uint32(len(plain_text) - 1)
# 声明密文结构变量
pCipherText = EccCipherBlobFactory(
plain_text_len.value)()
pulBlobLen = c_uint32(sizeof(Struct_ECCPUBLICKEYBLOB))
# 创建公钥接收对象
pbBlob = Struct_ECCPUBLICKEYBLOB()
self.error = self.skf_handle.SKF_ExportPublicKey(
self.cnt_handle, False, byref(pbBlob), byref(pulBlobLen))
# 用公钥进行加密
self.error = self.skf_handle.SKF_ExtECCEncrypt(self.dev_handle, byref(pbBlob), byref(plain_text),
plain_text_len, byref(pCipherText))
return pCipherText
def ECCEncrypt_Hex(self, plain_text):
"""
1-导出ECC的加密密钥对的公钥
2-用公钥加密SKF_ExtECCEncrypt
3-转成Hex字符串
"""
pCipherText = self.ECCEncrypt(plain_text)
# 将密文结构内容转成Hex字符串
pCipherText_Hex = self.CipherText_ToStrHex(pCipherText)
return pCipherText_Hex
def ECCDecrypt(self, cipher_text):
plain_text_len = c_uint32(0)
# 解密,传None,获得plain_text的长度
self.error = self.skf_handle.SKF_ECCDecrypt(self.cnt_handle, False, byref(cipher_text), None,
byref(plain_text_len))
# plain_text分配空间
plain_text = create_string_buffer(
plain_text_len.value)
# 解密
self.error = self.skf_handle.SKF_ECCDecrypt(self.cnt_handle, False, byref(cipher_text), byref(plain_text),
byref(plain_text_len))
# 返回明文
return plain_text.value.decode('utf8')
def ECCDecrypt_Hex(self, cipher_hex):
# 判断密文是否有问题
try:
cipher_text = self.StrHex_ToCipherText(
cipher_hex) # 先将Hex密文字符串转为ECC密文结构
except:
return "Error: Wrong ciphertext!"
plain_text = self.ECCDecrypt(cipher_text) # 解密
return plain_text
def ECCSignEx(self, plain_text):
# 转成byte类型
plain_text = create_string_buffer(bytes(plain_text, "utf-8"))
# 计算字符长度,-1是因为byte长度比实际多1
plain_text_len = c_uint32(len(plain_text) - 1)
# 签名返回结果
pbBlob = Struct_ECCSIGNATUREBLOB()
# WQ 扩展接口
# self.error = self.skf_handle.SKF_ECCSignDataEx(
# self.cnt_handle, SGD_SM3, byref(plain_text), plain_text_len, byref(pbBlob))
# FT/HB 扩展接口
# self.error = self.skf_handle.SKF_ECCDigestSignData(
# self.cnt_handle, SGD_SM3, byref(plain_text), plain_text_len, byref(pbBlob))
try:
self.error = self.skf_handle.SKF_ECCDigestSignData(
self.cnt_handle, SGD_SM3, byref(plain_text), plain_text_len, byref(pbBlob))
except:
self.error = self.skf_handle.SKF_ECCSignDataEx(
self.cnt_handle, SGD_SM3, byref(plain_text), plain_text_len, byref(pbBlob))
return [pbBlob.r, pbBlob.s]
def ECCSign(self, plain_text):
"""
1-计算原文的SM3
2-使用私钥对SM3结果进行签名 SKF_ECCSignData
要求签名字符串长度不得大于密钥模长
比如 ECC256 不得大于 32个字节(模长(256) / 8)
2-返回[r, s]
"""
"""
在数字签名时,要指定签名所使用的证书。通过遍历本机上的证书,与签名用的证书进行对比,定位到签名证书在USBKEY中的位置,得到设备、应用和容器的句柄,然后使用证书的私钥进行签名。另外,由于数字签名会用到私钥,因此这里需要验证口令。
1.SKF_VerifyPIN(HAPPLICATION hApplication, ULONG ulPINType, LPSTR szPIN, ULONG *pulRetryCount);
此方法用来验证证书所在应用的PIN码,及上面说的口令,为后面的签名取得权限。hApplication是应用句柄;ulPINType是PIN类型,可以为0是管理员账户,1为普通用户,这个参数一般选择1。szPIN值是PIN码,pulRetryCount为出错后返回的重试次数。
2.SKF_ExportPublicKey(HCONTAINER hContainer, BOOL bSignFlag, BYTE* pbBlob, ULONG* pulBlobLen);
这个方法用来导出容器中的签名公钥,hContainer为证书所在容器句柄;bSignFlag 为导出密钥类型,TRUE表示导出签名公钥,FALSE表示导出加密公钥,这里选择TRUE;pbBlob为返回公钥的数据;pulBlobLen为数据的长度。这里这个方法可以不用调用两次,因为公钥结构是已知的,其长度也是固定的,因此可以直接为pbBlob分配固定长度的数据,以返回公钥。
3.SKF_DigestInit(DEVHANDLE hDev, ULONG ulAlgID, ECCPUBLICKEYBLOB *pPubKey, unsigned char *pucID, ULONG ulIDLen, HANDLE *phHash);
此方法进行杂凑(国密标准里把摘要称之为杂凑)运算初始化,并指定计算消息杂凑的算法。hDev为设备句柄;ulAlgID是杂凑算法标识,这里选择SGD_SM3(0x00000001),表明使用SM3算法;pPubKey为签名用证书公钥数据;pucID为签名者的ID值;ulIDLen是签名者的ID值的长度;phHash为返回的杂凑对象句柄。加入签名者ID值是SM2数字签名的一个重要特征,默认使用"1234567812345678"这个字符串值。
4.SKF_Digest(HANDLE hHash, BYTE *pbData, ULONG ulDataLen, BYTE *pbHashData, ULONG *pulHashLen);
初始化后,调用此方法进行数据杂凑运算。hHash是SKF_DigestInit方法返回的杂凑对象句柄; pbData为产生签名的原文,ulDataLen是原文数据的长度,pbHashData返回杂凑数据; pulHashLen返回杂凑结果的长度。同样,因为杂凑数据的长度都是固定的,这里同样可以为pbHashData事先分配固定长度,而不用再调用两遍。
注意,如果进行杂凑的数据是分组的,那就得使用下面两个方法:
SKF_DigestUpdate(HANDLE hHash, BYTE *pbData, ULONG ulDataLen);
SKF_DigestFinal(HANDLE hHash, BYTE *pHashData, ULONG *pulHashLen);
对每一组数据都使用SKF_DigestUpdate,最后调用SKF_DigestFinal返回杂凑值。当然,在数字签名运算中不存在分块计算签名的情况,所以这里也不会把数据分块杂凑。
5.SKF_ECCSignData(HCONTAINER hContainer, BYTE *pbData, ULONG ulDataLen, PECCSIGNATUREBLOB pSignature);
最后调用此方法进行数字签名。hContainer用来签名的私钥所在容器句柄,也就是遍历对比证书得到的容器句柄;pbData是被签名的数据;ulDataLen是被签名数据长度,必须小于密钥模长; pbSignature为返回的签名值。
"""
# 公钥匙对象内存长度
pulBlobLen = c_uint32(sizeof(Struct_ECCPUBLICKEYBLOB))
# 获取公钥对象
eccPubKey = Struct_ECCPUBLICKEYBLOB()
# 导出计算公钥
self.error = self.skf_handle.SKF_ExportPublicKey(
self.cnt_handle, True, byref(eccPubKey), byref(pulBlobLen))
# 转成byte类型
plain_text = create_string_buffer(bytes(plain_text, "utf-8"))
# 计算字符长度,-1是因为byte长度比实际多1
plain_text_len = c_uint32(len(plain_text) - 1)
# 默认签名者ID
signerId = create_string_buffer(b'1234567812345678')
phDigest = c_void_p()
# 初始化计算参数,指定为 SM3
self.error = self.skf_handle.SKF_DigestInit(
self.dev_handle, SGD_SM3, byref(eccPubKey), byref(signerId), len(signerId) - 1, byref(phDigest))
hash = (c_ubyte * 32)()
hashLen = c_uint32(32)
# 计算 SM3
self.error = self.skf_handle.SKF_Digest(
phDigest, byref(plain_text), plain_text_len, byref(hash), byref(hashLen))
# 签名
pbBlob = Struct_ECCSIGNATUREBLOB() # 签名返回结果
# 标准的 SKF_ECCSignData 接口目前测试来看,各个厂商的接口实现很多会报错
# 比如现在很多都是带屏幕显示的二代UKey,交易的时候屏幕上会显示交易内容,要求
# 用户点击确定之后才能完成交易,这种操作一般都是厂商自定义的扩展接口实现的
# 比如最常见的 SKF_ECCSignDataEx 具体的参数可以找厂商沟通获取
self.error = self.skf_handle.SKF_ECCSignData(
self.cnt_handle, SGD_SM3, byref(hash), hashLen, byref(pbBlob))
return [pbBlob.r, pbBlob.s]
def SM2Sign(self, plain_text):
"""
1-对原文进行SM2签名
2-获取签名证书信息
3-返回签名证书字符数组,SM2签名结果 [r, s]
"""
# 签名
[s_r, s_s] = self.ECCSignEx(plain_text)
# 取ECC签名的后 32 位 作为SM2签名,参考
# https://github.com/guanzhi/GmSSL/blob/master/src/skf/skf.c
# SKF_ECCSIGNATUREBLOB_to_SM2_SIGNATURE
sm2_r = bytes(s_r[32:])
sm2_s = bytes(s_s[32:])
# 导出签名证书
pulBlobLen = c_uint32()
self.error = self.skf_handle.SKF_ExportCertificate(
self.cnt_handle, True, None, byref(pulBlobLen))
# 申请内存
cert_der = (c_ubyte * pulBlobLen.value)()
self.error = self.skf_handle.SKF_ExportCertificate(
self.cnt_handle, True, byref(cert_der), byref(pulBlobLen))
# 证书转换为字符数组
cert_der = bytes(cert_der)
return [cert_der, [sm2_r, sm2_s]]
def SM2SignP7DER(self, plain_text):
"""
1-对原文进行签名
2-获取签名证书信息
3-对报文进行P7 DER格式封装
"""
[cert_der, [sm2_r, sm2_s]] = self.SM2Sign(plain_text)
return self.__BuildSm2SignP7DER(plain_text, cert_der, sm2_r, sm2_s)
def SM2SignP7DER_Hex(self, plain_text):
"""
对报文进行签名,并对结果进行 P7 格式封装,返回结果使用HEX格式
"""
res = self.SM2SignP7DER(plain_text)
return res.hex()
def SM2SignP7PEM(self, plain_text):
"""
对报文进行签名,并对结果进行 P7 PEM格式封装
"""
der = self.SM2SignP7DER(plain_text)
import base64
return base64.b64encode(der)
def VerifyPIN(self, user_pin):
"""
PIN校验
user_pin:字符串
return:返回剩余尝试次数
"""
user_pin = create_string_buffer(
bytes(user_pin, 'utf8')) # 将PIN转成byte类型数据
ulPINType = c_uint32(1) # PIN类型,1表示用户PIN
pulRetryCount = c_uint32(0) # PIN剩余尝试次数
# 调用验证PIN接口
self.error = self.skf_handle.SKF_VerifyPIN(
self.app_handle, ulPINType, user_pin, byref(pulRetryCount))
return pulRetryCount.value
def __BuildSm2SignP7DER(self, sig_text, der_cert, sm2_r, sm2_s):
"""
构建 P7 DER 格式的SM2签名报文
1-sig_text 签名原文
2-der_cert DER格式的签名证书
3-sm2_r SM2格式签名返回的 r 32位字符数组
4-sm2_s SM2格式签名返回的 s 32位字符数组
"""
# pip install "asn1crypto>=1.5.1" 版本不低于 1.5.1 否则可能运行异常
from asn1crypto import core, x509, algos, cms
cert = x509.Certificate.load(der_cert)
# 签名证书
cert_set = cms.CertificateChoices({
'certificate': cert # 证书
})
# SM3 签名算法 OID
sm3_digest_oid = algos.DigestAlgorithmId(
"1.2.156.10197.1.401")
# SM3 签名算法
sm3_digest_algo = algos.DigestAlgorithm({
'algorithm': sm3_digest_oid,
'parameters': core.Null()
})
# SM2 签名算法 OID
sm2_sign_oid = algos.SignedDigestAlgorithmId(
"1.2.156.10197.1.301.1")
# SM2 签名算法
sm2_sign_algo = algos.SignedDigestAlgorithm({
'algorithm': sm2_sign_oid,
'parameters': core.Null()
})
# TBSCertificate
org_tbs = cert['tbs_certificate']
# 签名信息
signer_info = cms.SignerInfo({
'version': cms.CMSVersion(1),
'sid': { # 签名证书信息
'issuer_and_serial_number': {
'issuer': org_tbs['issuer'],
'serial_number': org_tbs['serial_number']
}
},
'digest_algorithm': sm3_digest_algo, # 摘要算法
'signature_algorithm': sm2_sign_algo, # 签名算法
# 签名信息 (OCTET STRING)
'signature': core.OctetString(sm2_r + sm2_s)
})
# 构建完整证书的 ASN.1 PKCS#7 SignedData 结构
signedData = cms.SignedData({
'version': cms.CMSVersion(1), # 签名版本 v1
'digest_algorithms': [{ # 签名算法类型
'algorithm': sm3_digest_oid, # SM3withSM2
'parameters': core.Null()
}],
'encap_content_info': { # 签名载核(业务)数据
# 载核(业务)数据类型 (PKCS #7)
'content_type': cms.ContentType('1.2.840.113549.1.7.1'),
# 签名载核(业务)数据
'content': core.OctetString(sig_text.encode('utf-8')),
}, # 签名载核数据
'certificates': [cert_set], # 证书集合
'signer_infos': [signer_info] # 签名信息
})
# 数据传输封装
payloadContent = cms.ContentInfo({
# 数据类型 (PKCS #7)
'content_type': cms.ContentType('1.2.840.113549.1.7.2'),
# 签名数据
'content': signedData
})
# print(payloadContent.dump().hex())
# 转换为 DER 编码
return payloadContent.dump()
def __IntList_ToHexStr(self, int_list):
"""
将int类型的列表转成Hex字符串
int_list:列表
return:返回Hex字符串
"""
result = [] # 声明返回结果
for _int in int_list:
str_hex = (hex(_int)[2:]).upper() # 将结果转成hex字符串,并转成大写
if len(str_hex) % 2 != 0: # 判断长度是否是2的倍数,不是的话签名补一个0
str_hex = "0" + str_hex
result.append(str_hex)
return ''.join(result)
def __HexStr_ToIntList(self, str_hex):
"""
将Hex字符串转成int类型的列表
str_hex:Hex字符串
return:int,列表
"""
result = [] # 声明返回结果
for i in range(0, len(str_hex), 2):
s = '0x' + (str_hex[i:i + 2]) # 每两位hex的字符转成一个int类型
int_s = int(s, 16)
result.append(int_s)
return result
def StrHex_ToCipherText(self, str_hex):
"""
将Hex字符串转成ECC密文结构
str_hex:Hex字符串
return:ECC密文结构
"""
# 密文结构中 Cipher 长度(C3)
cipher_len = int((len(str_hex) - 192) / 2)
pCipherText = EccCipherBlobFactory(
cipher_len)()() # 声明pCipherText密文结构变量
XCoordinate_hex = str_hex[0:64] # 获取密文结构的 XCoordinate 的Hex字符串
YCoordinate_hex = str_hex[64:128] # 获取密文结构的 YCoordinate 的Hex字符串
HASH_hex = str_hex[128:192] # 获取密文结构的 HASH 的Hex字符串
Cipher_hex = str_hex[192:] # C2:密文结构中的Cipher
# 为密文结构中的每部分赋值
pCipherText.XCoordinate = (
c_ubyte * 64)(*((0,) * 32 + tuple(self.__HexStr_ToIntList(XCoordinate_hex))))
pCipherText.YCoordinate = (
c_ubyte * 64)(*((0,) * 32 + tuple(self.__HexStr_ToIntList(YCoordinate_hex))))
pCipherText.HASH = (
c_ubyte * 32)(*tuple(self.__HexStr_ToIntList(HASH_hex)))
# 密文部分转为ctype类型
Cipher = (c_ubyte * cipher_len)(*self.__HexStr_ToIntList(Cipher_hex))
memmove(pCipherText.Cipher, Cipher, cipher_len) # 将Cipher值赋给密文结构
return pCipherText
def CipherText_ToStrHex(self, cipher_text):
"""
将ECC密文结构转成Hex字符串
"""
XCoordinate = cipher_text.XCoordinate[32:] # 获取密文结构中 XCoordinate
YCoordinate = cipher_text.YCoordinate[32:] # 获取密文结构中 YCoordinate
HASH = cipher_text.HASH[:] # 获取密文结构中 HASH
XCoordinate_hex = self.__IntList_ToHexStr(
XCoordinate) # 将 XCoordinate 转成Hex字符串
YCoordinate_hex = self.__IntList_ToHexStr(
YCoordinate) # 将 YCoordinate 转成Hex字符串
HASH_hex = self.__IntList_ToHexStr(
HASH) # 将 HASH 转成Hex字符串
Cipher_hex = self.__IntList_ToHexStr(
cipher_text.Cipher) # 将 Cipher 转成Hex字符串
return XCoordinate_hex + YCoordinate_hex + HASH_hex + Cipher_hex