功能介绍
应用鉴权是第三方应用程序/服务 调用云鲸AIoT开发者平台服务的一种签名鉴权机制。本文主要介绍如何进行应用鉴权。
前提条件
您已完成创建应用 的步骤。
逻辑图
鉴权加密算法
- 对请求参数进行排序并加密
// 使用 TreeMap 对请求参数 自然排序,并转为 Json 字符串,然后使用 SHA256 加密并转为16进制字符串。
// 伪代码↓
HexEncode(Hash.SHA256(payloadJson))
目前支持的参数方式:
- HTTP GET 请求:将请求参数以 Key 、Value 形式放入Map中。value 需为字符串类型。
- HTTP POST 请求,请求类型为 application/Json : 支持JSON对象传递,将JSON对象中的第一层级的 Key 、Value 放入Map中,如Value 仍为 JSON对象,也同样需要进行自然排序。其他类型value 需为字符串类型。
- HTTP POST 请求,请求类型为 multipart/form-data : 将普通参数以 Key 、Value 形式放入Map中,文件参数不参与校验过程。如Value 仍为 JSON对象,也同样需要进行自然排序。其他类型value 需为字符串类型。
- 拼接待签名字符串
//按照如下格式进行拼接待签名字符串:
StringToSign = Algorithm + ‘\n’ + date + ‘\n’ + HashedRequestPayload
字段说明:
字段名称 | 解释 | |
---|---|---|
1 | Algorithm | 签名算法,固定为 HMAC-SHA256 |
2 | date | UTC 标准时间的日期,取值需要和请求参数中 timestamp 换算的 UTC 标准时间日期一致 |
3 | HashedRequestPayload | 第一步所得规范请求串的哈希值 |
- 根据以上规则,得到待签名字符串如下:
HMAC-SHA256
2021-06-23 01:11:12
3b6099ce6240a6cfa8a4f77a2d3e5cd723b6fa9d2d43031b340b9362eb3ed434
- 计算签名
计算签名的伪代码如下:
String signature = HexEncode(HMAC_SHA256(Assess key Secret, StringToSign))
signature = signature.toLowerCase()
- 拼接Authorization
按如下格式拼接 Authorization:
Authorization = Algorithm + ‘ ‘ + ‘Signature=’ + Signature ‘ ‘+ ‘AccessKey=’+ AK + ‘ ‘ + ‘Timestamp=’ + timestamp
鉴权加密算法代码示例
JAVA版本
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;
import java.util.TreeMap;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.xml.bind.DatatypeConverter;
public class ApiAuthorizationDemo{
private final static Charset UTF8 = StandardCharsets.UTF_8;
public static byte[] hmac256(byte[] key, String msg) throws Exception {
Mac mac = Mac.getInstance("HmacSHA256");
SecretKeySpec secretKeySpec = new SecretKeySpec(key, mac.getAlgorithm());
mac.init(secretKeySpec);
return mac.doFinal(msg.getBytes(UTF8));
}
public static String sha256Hex(String s) throws Exception {
MessageDigest md = MessageDigest.getInstance("SHA-256");
byte[] d = md.digest(s.getBytes(UTF8));
return DatatypeConverter.printHexBinary(d).toLowerCase();
}
public static void main(String[] args) throws Exception {
String algorithm = "HMAC-SHA256";
String accessKey = "915d957d05ff44f3aeb12413e2247e5cdb5e";
String accessKeySecret = "c7531c2357e2200bb7eee3fe190f138a0febcbec791241417aceac0418f8c662657b97e5fa87b5b4b9bafb";
//组装参数,此处以设备注册为例子
final TreeMap<String, Object> treeMap = Maps.newTreeMap();
treeMap.put("productId","pJabWNSCCU");
final Long timestamp = System.currentTimeMillis();
//参数转为JSON,并进行加密
final JSONObject jsonObject = JSONUtil.parseObj(treeMap,JSONConfig.create().setOrder(true));
final String jsonStr = jsonObject.toString();
final String hashedRequestPayload = sha256Hex(jsonStr);
//拼接待签名字符串
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss" );
sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
String date = sdf.format(new Date(Long.valueOf(timestamp)));
String stringToSign = algorithm + "\n" + date + "\n" + hashedRequestPayload;
//计算签名
final String signature = DatatypeConverter.printHexBinary(hmac256(accessKeySecret.getBytes(UTF8), stringToSign)).toLowerCase();
//拼接Authorization
String authorization = algorithm + " " + "Signature="+signature + " " + "AccessKey="+ accessKey + " " + "Timestamp=" + timestamp ;
}
}
Node JS版本
/* eslint-disable no-restricted-syntax */
import * as crypto from 'crypto';
const {
createHash,
createHmac,
} = crypto;
const algorithm = 'HMAC-SHA256';
type GetAuthorizationOptions = {
payload: {},
accessKey: string,
accessSecret: string,
};
// 自然排序
function sortObj(obj) {
const newObj = {};
Object.assign(newObj, obj);
const keys = Object.keys(newObj); // Cannot convert undefined or null to object
keys.sort();
const sorted = {};
for (const k of keys) {
sorted[k] = obj[k];
}
return sorted;
}
function hashSha256(content: string): string {
return createHash('sha256').update(content).digest('hex');
}
function hmacSha256(content: string, secret: string): string {
return createHmac('sha256', secret).update(content).digest('hex');
}
function getFull(v) {
return v < 10 ? `0${v}` : v;
}
function getUTC(date: Date): string {
const y = date.getUTCFullYear();
const m = getFull(date.getUTCMonth() + 1);
const d = getFull(date.getUTCDate());
const h = getFull(date.getUTCHours());
const M = getFull(date.getUTCMinutes());
const s = getFull(date.getUTCSeconds());
return `${y}-${m}-${d} ${h}:${M}:${s}`;
}
function getAuthorization(options: GetAuthorizationOptions): string {
const { payload, accessKey, accessSecret } = options;
const now = new Date();
// 1:对请求参数进行排序并加密
const sha256Result = hashSha256(JSON.stringify(sortObj(payload)));
// 2: 拼接待签名字符串
const stringToSign = `${algorithm}
${getUTC(now)}
${sha256Result}`;
// 3:计算签名
const hmacSha256Result = hmacSha256(stringToSign, accessSecret);
// 4:拼接 Authorization
const Authorization = `${algorithm} Signature=${hmacSha256Result} AccessKey=${accessKey} Timestamp=${now.valueOf()}`;
return Authorization;
}
// 调用方式
// getAuthorization({
// payload: {},
// accessKey: 'xxx',
// accessSecret: 'xxx',
// });
export default getAuthorization;
Python版本
#! /usr/bin/env python3.9
import os
import hashlib
import time
import hmac
import traceback
import requests
from common import sqlalchemy_mysql_conn
from models import *
from datetime import datetime, timedelta
class sysMonitorMain:
def __init__(self):
self.conf_info = {
"test_cn": {
"host": "https://testcn-openapi.narwaltech.com",
"obs_base_url": "https://narwal-test.obs.cn-south-1.myhuaweicloud.com/",
"access_key": "k3NtuLpLGgRzoEFP1Qen",
"access_key_secret": "fc8c28139c96485d90c816f5c1c2b2d5"
},
"prod_cn": {
"host": "https://cn-openapi.narwaltech.com",
"obs_base_url": "https://narwal-base-robot-logs.obs.cn-south-1.myhuaweicloud.com",
"access_key": "ALzJii8TBcnkTKQuWrEz",
"access_key_secret": "24bd4f4ac1224176b3d1af19545b8884"
}
}
self.api_url = '/api/v1/obs/file'
self.line = {
"disk": 80,
"cpu": 55,
"mem": 60
}
self.MYSQL_OBJ = sqlalchemy_mysql_conn.MySQL()
def get_all_device_list(self):
res = []
device_list = {}
datas = self.MYSQL_OBJ.query_all(sys_device_list)
# print(datas)
for data in datas:
# print(type(data))
# print(data.test_schema())
res.append(data.test_schema())
if not data.test_schema()['device_name'] in device_list.keys():
device_list[data.test_schema()['device_name']] = {"product_id": data.test_schema()['product_id'],
"service_type": data.test_schema()['service_type'],
"contry_code": data.test_schema()['contry_code']}
# print(res)
# print(device_list)
return device_list
def gen_authorization(self, param, access_key, accessKeySecret):
param_json = json.dumps(param, separators=(',', ':')) # 去除默认加的空格
header_hash = hashlib.sha256()
header_hash.update(bytes(param_json, encoding='utf-8'))
Algorithm = "HMAC-SHA256"
tmp_time = time.time()
time_stamp = int((tmp_time) * 1000)
date_tmp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(tmp_time))
strptime = datetime.strptime(date_tmp, "%Y-%m-%d %H:%M:%S")
date = strptime - timedelta(hours=8)
stringtosign = Algorithm + '
' + str(date) + '
' + header_hash.hexdigest()
signaturetohash = hmac.new(accessKeySecret.encode(), stringtosign.encode(),
digestmod="sha256").hexdigest().lower()
# print(signaturetohash)
Authorization = Algorithm + ' ' + 'Signature=' + signaturetohash + ' ' + "AccessKey=" + access_key + ' ' + 'Timestamp=' + str(
int(time_stamp))
header = {
'Content-Type': 'application/json',
'Authorization': Authorization
}
return header
def run(self):
device_list = self.get_all_device_list()
first_signal = True
# sleep_days = 0.25
sleep_secs = 3600
while True:
try:
end_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
if first_signal:
start_time = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d %H:%M:%S")
else:
start_time = (datetime.now() - timedelta(seconds=sleep_secs)).strftime("%Y-%m-%d %H:%M:%S")
for key in device_list.keys():
conf = {}
# 正服/测服判断
if device_list[key]['service_type'] is None or 'test' in device_list[key]['service_type']:
# 国别判断
if device_list[key]['contry_code'] is None:
conf = self.conf_info['prod_cn']
else:
conf = self.conf_info['prod_cn']
elif 'prod' in device_list[key]['service_type']:
# 国别判断
if device_list[key]['contry_code'] is None:
conf = self.conf_info['prod_cn']
else:
conf = self.conf_info['prod_cn']
conf['device_name'] = key
conf['product_id'] = device_list[key]['product_id']
self.get_iot_info(conf, "core", start_time, end_time)
self.get_iot_info(conf, "log", start_time, end_time)
os.system('rm -rf ../tmp/*')
first_signal = False
time.sleep(sleep_secs)
except KeyboardInterrupt:
pass
if __name__ == '__main__':
main_entry = sysMonitorMain()
main_entry.run()
统一请求路径
创建时间:2023-03-28 21:40
最后编辑:admin 更新时间:2024-10-18 16:29
最后编辑:admin 更新时间:2024-10-18 16:29