网站摇奖活动怎么做,网站充值怎么做的,江门蓬江发布,兰州做网站目录 API安全概述设计一个安全的API一个基本的API主要代码调用API的一些问题 BasicAuth认证流程主要代码问题 API Key流程主要代码问题 Bearer auth/Token auth流程 Digest Auth流程主要代码问题 JWT Token流程代码问题 Hmac流程主要代码问题 OAuth比较自定义请求签名身份认证密钥加密防重放请求时效性请求签名算法设计代码 攻击与防御SQL注入敏感信息泄露越权攻击重放攻击 全部代码参考 API安全概述
利用API可进行以下常见的攻击
注入攻击SQL注入、命令注入、XSS等DOS/DDOS攻击SSRF未授权/水平垂直越权敏感数据泄露中间人攻击更改请求方法调用并发攻击重放攻击数据篡改和伪造
有以下常见的防御方式
资源请求限制通过限频等手段来解决DOS、DDOS攻击线程加锁来解决并发攻击权限控制通过ABAC、RBAC等方式解决越权攻击敏感信息防泄露通过分类分级引擎数据库加密存储等方式来解决敏感信息泄露防重放通过API认证解决重放攻击加密例如HTTPS来解决中间人攻击安全产品例如API网关、WAF等来解决大部分攻击
当然有些还是需要API后端代码来进行防御例如命令注入、SSRF等。
本文以API身份认证为主要内容浅谈各种认证的使用场景与优缺点同时穿插部分攻击与防御。
设计一个安全的API
一个基本的API
主要代码
import uuid
import re
import tracebackfrom flask import Flask, request, jsonify
from mysql import MysqlCli
from log import log
from setting import *app Flask(__name__)
log.set_file()# 验证username
def validate_username(username:str)-bool:if len(username) 20:return Falsereturn True# 验证手机号
def validate_phone_number(phone_number:str)-bool:# 使用正则表达式检查手机号格式pattern re.compile(r^1[3456789]\d{9}$)if re.match(pattern, phone_number):return Trueelse:return False# 管理员注册用户接口
app.route(/api/v1.0/admin/add_user, methods[POST])
def add_user():resp {requestid: uuid.uuid4()}if request.is_json:data request.get_json()username data.get(username)if username is None:resp[error] username is requiredlog.logger.error(furl:{request.url},params:{username},resp:{resp})return jsonify(resp),400phone_number data.get(phone_number)if phone_number is None:resp[error] phone_number is requiredlog.logger.error(furl:{request.url},params:{phone_number},resp:{resp})return jsonify(resp),400if not validate_username(username):resp[error] username length should not exceed 10log.logger.error(furl:{request.url},params:{username},resp:{resp})return jsonify(resp),400try:cli MysqlCli(MYSQL_HOST,MYSQL_USER,MYSQL_PASSWORD,MYSQL_DATABASE)cli.insert_one(users,{username: username,phone_number: phone_number})cli.close()resp[message] fsuccess to add user:{username}!log.logger.info(furl:{request.url},params:{username},resp:{resp})return jsonify(resp)except Exception:log.logger.info(furl:{request.url},params:{username},resp:{resp})resp[error] ffailed to insert to mysql:{traceback.format_exc()}return jsonify(resp),500else:resp[error] Invalid JSON format in requestreturn jsonify(resp),400# 用户获取信息接口
app.route(/api/v1.0/get_user_info, methods[POST])
def get_user_info():resp {requestid: uuid.uuid4()}if request.is_json:data request.get_json()username data.get(username)if username is None:resp[error] username is requiredlog.logger.error(furl:{request.url},params:{username},resp:{resp})return jsonify(resp),400if not validate_username(username):resp[error] username length should not exceed 10log.logger.error(furl:{request.url},params:{username},resp:{resp})return jsonify(resp),400try:cli MysqlCli(MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE)sql fselect * from users where username {username} limit 1user cli.select_all(sql)resp[message] fsuccess to get user:{user}.log.logger.info(furl:{request.url},params:{username},resp:{resp})return jsonify(resp)except Exception:resp[error] ffailed to get user, error:{traceback.format_exc()}log.logger.error(furl:{request.url},params:{username},resp:{resp})return jsonify(resp),500else:resp[error] Invalid JSON format in requestreturn jsonify(resp),400if __name__ __main__:app.run()
可以看到共有两个API管理员注册用户接口、用户获取信息接口拥有以下功能或安全措施
版本控制日志记录请求方法校验请求数据校验
调用 API的一些问题
没有身份认证只要有人知道api地址、方法、参数就能调用没有调用次数限制可能被Dos或DDos攻击造成服务器压力没有防止数据篡改或伪造被抓包后可能被篡改或伪造数据没有对重放攻击的防御抓包后可能重放包越权问题管理员接口任何人都可以调用SQL注入没有做参数化查询等防止SQL操作可能被拖库
BasicAuth
认证流程
客户端发送请求头Authorization为Basic username:passwordbase64编码的数据包服务端对请求头Authorization判断解码后从数据库查询判断账号密码是否正确
主要代码
def basic_auth(f):wraps(f)def decorated_function(*args, **kwargs):resp {requestid: uuid.uuid4()}auth_header request.headers.get(Authorization)if not auth_header or not auth_header.startswith(Basic ):resp[error] basic auth is requiredlog.logger.error(furl:{request.url},resp:{resp})return jsonify(resp), 400else:try:encoded_credentials auth_header.split( )[1]decoded_credentials base64.b64decode(encoded_credentials).decode(utf-8)username, password decoded_credentials.split(:)if not check_basic_auth(username, password):resp[error] basic auth failed,check your username or password is rightlog.logger.error(furl:{request.url},resp:{resp})return jsonify(resp), 401except Exception:resp[error] fbasic auth failed,err: {traceback.format_exc()}log.logger.error(furl:{request.url},resp:{resp})return jsonify(resp), 500return f(*args, **kwargs)return decorated_function
# 管理员注册用户接口 v2.0 增加密码
app.route(/api/v2.0/admin/basic_auth/add_user, methods[POST])
basic_auth
def add_user_basic_auth():resp {requestid: uuid.uuid4()}# 请求是json格式if request.is_json:data request.get_json()username data.get(username)# username检查if username is None:resp[error] username is requiredlog.logger.error(furl:{request.url},params:{username},resp:{resp})return jsonify(resp),400if not validate_username(username):resp[error] username length should not exceed 10log.logger.error(furl:{request.url},params:{username},resp:{resp})return jsonify(resp),400# phone_number检查phone_number data.get(phone_number)if phone_number is None:resp[error] phone_number is requiredlog.logger.error(furl:{request.url},params:{phone_number},resp:{resp})return jsonify(resp),400# 插入数据库try:cli MysqlCli(MYSQL_HOST,MYSQL_USER,MYSQL_PASSWORD,MYSQL_DATABASE)cli.insert_one(users,{username: username,phone_number: phone_number})pwd generate_random_password(secrets.choice(range(8, 17)))cli.insert_one(passwords, {username: username,password: pwd})cli.close()resp[message] fsuccess to add user:{username},password {pwd},remember it!log.logger.info(furl:{request.url},params:{username},resp:{resp})# 异常返回except Exception:log.logger.info(furl:{request.url},params:{username},resp:{resp})resp[error] ffailed to insert to mysql:{traceback.format_exc()},500return jsonify(resp)return jsonify(resp)# 请求不是json格式else:resp[error] Invalid JSON format in requestreturn jsonify(resp),400问题
没有调用次数限制可能被Dos或DDos攻击造成服务器压力没有防止数据篡改或伪造被抓包后可能被篡改或伪造数据没有对重放攻击的防御抓包后可能重放包越权问题管理员接口任何人都可以调用SQL注入没有做参数化查询等防止SQL操作可能被拖库
引入问题
请求的密码进行base64编码容易获得并解码获得明文数据库密码明文存储数据库sql语句执行没有按事务处理可能用户插入到数据库但密码没有入库
API Key
流程
客户端发送请求时通过query string、请求头X-API-Key或其他自定义请求头、Cookie中携带apikey服务端根据约定的方式获取后查询数据库判断是否存在
主要代码
def api_key_auth(f):wraps(f)def decorated_function(*args, **kwargs):resp {requestid: uuid.uuid4()}# api key检查if api_key not in request.headers:resp[error] api_key is required in headerslog.logger.error(furl:{request.url},resp:{resp})return jsonify(resp), 400if not check_api_key(request.headers[api_key]):resp[error] fapi_key {request.headers[api_key]} is invalidlog.logger.error(furl:{request.url},resp:{resp})return jsonify(resp), 401return f(*args, **kwargs)return decorated_function
# 管理员注册用户接口 v2.0 增加api token
app.route(/api/v2.0/admin/api_key/add_user, methods[POST])
api_key_auth
def add_user_api_key():resp {requestid: uuid.uuid4()}# request是jsonif request.is_json:data request.get_json()username data.get(username)if username is None:resp[error] username is requiredlog.logger.error(furl:{request.url},params:{username},resp:{resp})return jsonify(resp),400phone_number data.get(phone_number)if phone_number is None:resp[error] phone_number is requiredlog.logger.error(furl:{request.url},params:{phone_number},resp:{resp})return jsonify(resp),400if not validate_username(username):resp[error] username length should not exceed 10log.logger.error(furl:{request.url},params:{username},resp:{resp})return jsonify(resp),400try:cli MysqlCli(MYSQL_HOST,MYSQL_USER,MYSQL_PASSWORD,MYSQL_DATABASE)cli.insert_one(users,{username: username,phone_number: phone_number})key secrets.token_hex(16)cli.insert_one(keys,{username: username,key: key})cli.close()resp[message] fsuccess to add user:{username},key {key},remember it!log.logger.info(furl:{request.url},params:{username},resp:{resp})except Exception:log.logger.info(furl:{request.url},params:{username},resp:{resp})resp[error] ffailed to insert to mysql:{traceback.format_exc()}return jsonify(resp),500return jsonify(resp)else:resp[error] Invalid JSON format in requestreturn jsonify(resp),400博主这里实现时添加了自定义请求头api_key
问题
没有调用次数限制可能被Dos或DDos攻击造成服务器压力没有防止数据篡改或伪造被抓包后可能被篡改或伪造数据没有对重放攻击的防御抓包后可能重放包越权问题管理员接口任何人都可以调用SQL注入没有做参数化查询等防止SQL操作可能被拖库
引入问题
API key明文发送容器抓包获取API key明文存储数据库sql语句执行没有按事务处理可能用户插入到数据库但api key没有入库
Bearer auth/Token auth
流程
请求端通过某种认证机制比如用户名密码登录、OAuth 认证等获取令牌。 在发起 HTTP 请求时客户端将这个令牌添加到Authorization 请求头中格式为 “Bearer token”。服务器接收到请求后会验证这个令牌的有效性如果有效则允许请求继续处理否则拒绝访问。
使用 Bearer authentication 的优势在于令牌本身可以包含更多的信息、具有较长的有效期并且不需要在服务器端保存会话状态这样可以减轻服务器负担并提高安全性。
这里就不实现了后面通过jwt token算是实现其中的一种。
Digest Auth
流程
客户端发送一个未经认证的请求给服务器。 服务器返回一个 401 Unauthorized响应有一个响应头WWW-Authenticate其中包含一个随机数nonce和其他认证需要的信息。客户端收到 401响应后会向用户提示输入用户名和密码然后根据特定的算法通常是 MD5对用户名、密码、随机数nonce、HTTP 方法和请求的URI 进行摘要计算。客户端将计算出的摘要放在 Authorization 请求头中发送给服务器。服务器收到请求后会根据事先约定好的算法再次计算摘要如果两个摘要匹配则验证通过否则拒绝访问。
涉及的几个常见参数如下
realm必选。是一个保护空间的名称用于向用户表明请求的资源属于哪个保护空间。它通常用于表示一组受保护的资源用于构造摘要字符串。nonce必选。是一个唯一的字符串401时由服务器生成并发送给客户端。它用于防止重放攻击replay attack。每次认证请求都会使用一个新的 nonce 值使得每次请求的摘要都是不同的从而提高了安全性。qop(Quality of Protection) 必选。可以是 “auth” 或 “auth-int”。auth 代表身份验证而 auth-int 代表身份验证和消息完整性保护。Qop 的存在使得摘要认证更加灵活和安全。algorithm可选默认MD5。指定了用于计算摘要的哈希算法通常是 MD5。服务器在 WWW-Authenticate 响应头中指定客户端按照这个算法进行摘要计算。还有MD5-sess、SHA、SHA-256、SHA-512等。nc可选是一个计数器用于跟踪特定 nonce 值的使用次数。每次客户端发送请求时Nc 都会递增帮助防止重放攻击。cnonce(Client Nonce) 可选。是客户端生成的随机字符串用于与服务器的 nonce 一起使用以增加请求的独特性和安全性。opaque可选。是服务器生成的字符串客户端在后续请求中必须原样返回。它用来保持服务器状态或防止某些类型的攻击。charset可选。默认utf-8编码方式。userhash可选。默认false。服务端返回的是否支持username哈希。
主要代码
# 将字符串保存到Redis中并设置过期时间
def save_nonce_with_expiry(key, value, expiry_seconds)::param key: 键:param value: 值:param expiry_seconds: 过期时间秒# 连接Redis数据库redis_client redis.StrictRedis(hostREDIS_HOST,passwordREDIS_PASSWORD, portREDIS_PORT, dbREDIS_DB)redis_client.setex(key, expiry_seconds, value)def check_nonce(key):检查字符串是否存在于Redis中:param key: 键:return: 布尔值表示键是否存在# 连接Redis数据库redis_client redis.StrictRedis(hostREDIS_HOST,passwordREDIS_PASSWORD ,portREDIS_PORT, dbREDIS_DB)return redis_client.exists(key)# 校验username 返回密码
def check_username(username):try:cli MysqlCli(MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE)sql fselect * from passwords where username {username} limit 1user cli.select_one(sql)return user[password] if user else except Exception:log.logger.error(fcheck_basic_auth failed,error:{traceback.format_exc()})return False# 校验response
def check_response(response,realm,username,password,method,uri,req_nonce,nc,cnonce,qop):try:log.logger.info(fcheck_response ,response:{response},realm:{realm},username:{username},fpassword:{password},method:{method},uri:{uri},req_nonce:{req_nonce},nc:{nc},fcnonce:{cnonce},qop:{qop})# 校验nonceif not check_nonce(req_nonce):log.logger.error(fcheck_signature failed,error:{req_nonce} not exist!)return Falseha1hashlib.md5(f{username}:{realm}:{password}.encode()).hexdigest()ha2hashlib.md5(f{method}:{uri}.encode()).hexdigest()return response hashlib.md5(f{ha1}:{req_nonce}:{nc}:{cnonce}:{qop}:{ha2}.encode()).hexdigest()except Exception:log.logger.error(fcheck_responce failed,error:{traceback.format_exc()})return False# 添加用户 用于添加用户时无需额外增加用户关联的密钥等信息时的接口
def add_user():resp {requestid: uuid.uuid4()}log.logger.info(request.headers)# request是jsonif request.is_json:data request.get_json()username data.get(username)if username is None:resp[error] username is requiredlog.logger.error(furl:{request.url},params:{username},resp:{resp})return jsonify(resp)phone_number data.get(phone_number)if phone_number is None:resp[error] phone_number is requiredlog.logger.error(furl:{request.url},params:{phone_number},resp:{resp})return jsonify(resp)if not validate_username(username):resp[error] username length should not exceed 10log.logger.error(furl:{request.url},params:{username},resp:{resp})return jsonify(resp)try:cli MysqlCli(MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE)cli.insert_one(users,{username: username,phone_number: phone_number})cli.close()resp[message] fsuccess to add user:{username}!log.logger.info(furl:{request.url},params:{username},resp:{resp})except Exception:log.logger.info(furl:{request.url},params:{username},resp:{resp})resp[error] ffailed to insert to mysql:{traceback.format_exc()}return jsonify(resp)else:resp[error] Invalid JSON format in requestreturn jsonify(resp)# 管理员注册用户接口 v2.0增加digest算法
app.route(/api/v2.0/admin/digest_auth/add_user, methods[POST])
digest_auth
def add_user_digest():return add_user()输入账号密码后登录发起请求
问题
没有调用次数限制可能被Dos或DDos攻击造成服务器压力没有防止数据篡改或伪造被抓包后可能被篡改或伪造数据没有对重放攻击的防御抓包后可能重放包越权问题管理员接口任何人都可以调用SQL注入没有做参数化查询等防止SQL操作可能被拖库
优点
可以防止重放攻击
引入问题
发送两个请求更加消耗资源需要存储nonce这里有设置过期时间实现比较复杂
JWT Token
流程
通常由三个部分组成header、payload 和 signature。
header包含两个部分令牌类型即 “JWT”和所使用的签名算法如 HMAC SHA256 或 RSA。payload包含声明claims。声明是关于实体通常是用户和其他数据的声明。 issIssuer该声明标识了 JWT 的发行者。subSubject该声明标识了 JWT 的主题即所描述的实体。audAudience该声明标识了 JWT 的受众即预期的接收者。expExpiration Time该声明指定了 JWT 的过期时间在此时间之后JWT 将被认为是无效的。nbfNot Before该声明指定了 JWT 的生效时间在此时间之前JWT 将被认为是无效的。iatIssued At该声明指定了 JWT 的签发时间。jtiJWT ID该声明为 JWT 提供了一个唯一标识符。 signature为了确保 JWT 未被篡改需要对编码后的 header 和 payload 使用指定的签名算法和一个密钥进行签名。
代码
def jwt_auth(f):wraps(f)def decorated_function(*args, **kwargs):resp {requestid: uuid.uuid4()}auth_header request.headers.get(Authorization)if not auth_header or not auth_header.startswith(Bearer ):resp[message] no Authorization header or invalid formatreturn jsonify(resp), 400try:token auth_header.split( )[1]jwt.decode(token, JWT_SECRET, algorithms[HS256])except jwt.ExpiredSignatureError:resp[error] token has expiredlog.logger.error(furl:{request.url},token:{token},resp:{resp})return jsonify(resp), 401except jwt.InvalidTokenError:resp[message] finvalid token {token}log.logger.error(furl:{request.url},token:{token},resp:{resp})return jsonify(resp), 401except Exception as e:resp[message] finteral error {e}log.logger.error(furl:{request.url},token:{token},resp:{resp})return jsonify(resp), 500return f(*args, **kwargs)return decorated_function# 用户登录获取jwt token接口
app.route(/api/v2.0/admin/jwt/login, methods[POST])
def get_jwt_token():resp {requestid: uuid.uuid4()}if request.is_json:data request.get_json()username data.get(username)password data.get(password)if username is None or password is None:resp[error] username or password is requiredlog.logger.error(furl:{request.url},params:{username},resp:{resp})return jsonify(resp),400if not validate_username(username):resp[error] username length should not exceed 10log.logger.error(furl:{request.url},params:{username},resp:{resp})return jsonify(resp),400try:cli MysqlCli(MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE)sql fselect * from passwords where username {username} limit 1user cli.select_one(sql)if not user:resp[error] username not foundlog.logger.error(furl:{request.url},params:{username},resp:{resp})return jsonify(resp), 400if user[password] ! password:resp[error] password is not rightlog.logger.error(furl:{request.url},params:{username},resp:{resp})return jsonify(resp), 401payload {iss: lady_killer9,exp: datetime.now() timedelta(seconds5*60),jti: str(uuid.uuid4())}resp[message] fsuccess to login :{user},token:{jwt.encode(payload,JWT_SECRET,algorithmHS256)}log.logger.info(furl:{request.url},params:{username},resp:{resp})return jsonify(resp), 200except Exception:resp[error] ffailed to get user, error:{traceback.format_exc()}log.logger.error(furl:{request.url},params:{username},resp:{resp})return jsonify(resp),500else:resp[error] Invalid JSON format in requestreturn jsonify(resp),400# 管理员注册用户接口 v2.0增加jwt sha256算法
app.route(/api/v2.0/admin/jwt/add_user, methods[POST])
jwt_auth
def add_user_jwt():return add_user()# 用户获取信息接口 v2.0 增加jwt sha256算法
app.route(/api/v2.0/jwt/get_user_info, methods[POST])
jwt_auth
def get_user_info_jwt():return get_user_info()问题
没有调用次数限制可能被Dos或DDos攻击造成服务器压力没有防止数据篡改或伪造被抓包后可能被篡改或伪造数据没有对重放攻击的防御抓包后可能重放包越权问题管理员接口任何人都可以调用SQL注入没有做参数化查询等防止SQL操作可能被拖库
优点
token可以设置时间限制过期后不可调用payload的aud等参数可以用于鉴权可添加自定义payload方便做其他的功能
引入问题
jwt token容易破解
bejson jwt在线解密
Hmac
流程 和Digest Auth差不多可以由客户端生成随机数这样请求一次即可随机数不可重复。
主要代码
# 验证摘要
def check_signature(signture:str,username:str,nonce:int,data:dict):try:cli MysqlCli(MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE)sql fselect * from secrets where username {username} limit 1secret cli.select_one(sql)cli.close()if secret:server_signature hmac.new(str(secret[secret]).encode(utf-8), json.dumps(data).encode(utf-8), hashlib.sha256).hexdigest()log.logger.info(fsecret:{str(secret[secret])},data:{data},server_signature:{server_signature})return server_signature signturelog.logger.error(fcheck_signature failed,error:{username} {secret})return Falseexcept Exception:log.logger.error(fcheck_signature failed,error:{traceback.format_exc()})return Falsedef hmac_auth(f):wraps(f)def decorated_function(*args, **kwargs):resp {requestid: uuid.uuid4()}# query检查 nonce usernamenonce request.args.get(nonce, typeint)username request.args.get(username)if nonce is None or username is None:resp[error] username or nonce not found in query stringlog.logger.error(furl:{request.url},resp:{resp})return jsonify(resp)# 随机数验证if check_nonce(nonce):log.logger.error(fcheck_signature failed,error:{nonce} is in database)resp[error] fcheck_signature failed,error:{nonce} is in databaselog.logger.error(furl:{request.url},resp:{resp})return jsonify(resp)save_nonce_with_expiry(nonce,1,MIN*60)# signature检查if Signature not in request.headers:resp[error] signature is required in headerslog.logger.error(furl:{request.url},resp:{resp})return jsonify(resp)# request是jsonif request.is_json:data request.get_json()if not check_signature(request.headers[Signature], username, nonce, data):resp[error] fsignature {request.headers[Signature]} is invalidlog.logger.error(furl:{request.url},resp:{resp})return jsonify(resp),401else:resp[error] Invalid JSON format in requestreturn jsonify(resp),400return f(*args, **kwargs)return decorated_function
# 管理员注册用户接口 v2.0增加hmac sha256算法
app.route(/api/v2.0/admin/hmac/add_user, methods[POST])
hmac_auth
def add_user_hmac():resp {requestid: uuid.uuid4()}# request是jsonif request.is_json:data request.get_json()username data.get(username)if username is None:resp[error] username is requiredlog.logger.error(furl:{request.url},params:{username},resp:{resp})return jsonify(resp)phone_number data.get(phone_number)if phone_number is None:resp[error] phone_number is requiredlog.logger.error(furl:{request.url},params:{phone_number},resp:{resp})return jsonify(resp)if not validate_username(username):resp[error] username length should not exceed 10log.logger.error(furl:{request.url},params:{username},resp:{resp})return jsonify(resp)try:cli MysqlCli(MYSQL_HOST,MYSQL_USER,MYSQL_PASSWORD,MYSQL_DATABASE)cli.insert_one(users,{username: username,phone_number: phone_number})secret secrets.token_hex(16)cli.insert_one(secrets,{username: username,secret: secret})cli.close()resp[message] fsuccess to add user:{username},secret {secret},remember it!log.logger.info(furl:{request.url},params:{username},resp:{resp})except Exception:log.logger.info(furl:{request.url},params:{username},resp:{resp})resp[error] ffailed to insert to mysql:{traceback.format_exc()}return jsonify(resp)else:resp[error] Invalid JSON format in requestreturn jsonify(resp)# 用户获取信息接口 v2.0 增加hmac
app.route(/api/v2.0/hmac/get_user_info, methods[POST])
hmac_auth
def get_user_info_hmac():return get_user_info()问题
没有调用次数限制可能被Dos或DDos攻击造成服务器压力没有防止数据篡改或伪造被抓包后可能被篡改或伪造数据没有对重放攻击的防御抓包后可能重放包越权问题管理员接口任何人都可以调用SQL注入没有做参数化查询等防止SQL操作可能被拖库
引入问题
需要存储nonce这里有设置过期时间
OAuth
内容太多留坑先不看了
比较
比较项Basic AuthAPI keyBearer Auth/Token authDigest AuthHmac AuthJWT身份认证√√√√√√密钥加密×××√√×服务端存储√√√√√×token可以不存防重放×××√√×token失效前可重放时效性×××√√√自定义××√√√√
通过以上比较如果设计一个签名具有以下优点会比较好
防重放。通过随机数防止请求重放随机数由客户端计算可以减轻服务端压力。密钥加密。使用密钥计算签名。服务端存储。存储随机数设置过期时间。时效性。添加时间戳过期后请求失败。自定义。自定义请求签名当然可以自定义一些东西用于鉴权等。
自定义请求签名 身份认证密钥加密
请求需要验证身份就需要有账密这里就用SecretId、SecretKey其中SecretKey用于进行签名的计算。
防重放
为了防重放生成一个随机数NonceNonce唯一这样服务端收到携带该Nonce的请求后还发送带该Nonce的请求就拒绝掉。因此Nonce需要服务器保存同时为了防止篡改签名时需要。 那么问题来了Nonce需要服务器保存不能一直保存吧随着时间推移存储成本会越来越高因此需要时间限制。
请求时效性
请求应该具有时效性这里使用unix时间戳Timestamp。规定在1分钟内请求有效这样Nonce保存时间在1分钟即可。
请求签名算法设计
密钥加密选择SHA-256算法当然算法可以当做参数由客户端指定就用Algorithm吧 url类似xxx?SecretIdxxxNoncexxxTimestampxxxAlgorithmxxx 添加一个自定义请求头Signature放上签名待签名字符串规定格式如下 {Nonce}:{Timestamp}:{Algorithm}:{HTTPMethod}:{base64(HTTPBody)} HTTPMethodHTTP请求方法例如POSTHTTPBodyHTTP请求体例如{“name”:“lady_killer9”}
当然还可以添加更多到待签名字符串
客户端
生成一个随机字符串不包含:对请求头和请求体做base64编码并按照格式拼接使用算法和SecretKey进行签名生成时间戳发送请求
服务端
从url获取时间戳Timestamp校验是否在时间内从url获取随机字符串Nonce校验是否在数据库中对请求头和请求体做base64编码从url获取SecretId后从数据库查询对应的SecretKey使用SecretKey和Algorithm算法对拼接同样格式字符串进行签名得到ServerSignature比较Signature是否与请求头Signature的值一致
代码
def verify_signature(signature, nonce,timestamp,algorithm,method,body_base64):format_str f{nonce}:{timestamp}:{algorithm}:{method}:{body_base64}server_signature hashlib.sha256(format_str.encode()).hexdigest()log.logger.info(format_str)log.logger.info(server_signature)return signature server_signaturedef require_signature(f):wraps(f)def decorated_function(*args, **kwargs):resp {requestid: uuid.uuid4()}secret_id request.args.get(SecretId, typestr)nonce request.args.get(Nonce, typestr)timestamp request.args.get(Timestamp, typeint)algorithm request.args.get(Algorithm, typestr)if secret_id is None:resp[error] No SecretId in query stringreturn jsonify(resp),400if nonce is None:resp[error] No Nonce in query stringreturn jsonify(resp),400if timestamp is None:resp[error] No Timestamp in query stringreturn jsonify(resp),400if algorithm is None:resp[error] No Algorithm in query stringreturn jsonify(resp),400if algorithm not in [sha256]:resp[error] fcan not support {algorithm}return jsonify(resp), 400if (datetime.now() - timedelta(minutesMIN)).timestamp() timestamp:resp[error] Request is send before 5 mins ago, check Timestampreturn jsonify(resp), 400if : in nonce:resp[error] can not contain : in Nonce, generate a new onereturn jsonify(resp), 400cli redis.Redis.from_url(REDIS_URL)if cli.exists(nonce):resp[error] can not request with same Noncereturn jsonify(resp), 400else:cli.setex(nonce, MIN*60, 1)signature request.headers.get(Signature)if not signature:resp[error] no Signature in headersreturn jsonify(resp), 400# 解析 Authorization header验证签名body_base64 bytes.decode(b64encode(json.dumps(request.get_json(),ensure_asciiFalse).encode()))if not verify_signature(signature, nonce,timestamp,algorithm,request.method,body_base64):resp[error] invalid signaturereturn jsonify(resp), 401return f(*args, **kwargs)return decorated_function攻击与防御
SQL注入
例如在v1.0的get_user_info接口存在将用户输入拼接到sql的漏洞可以被SQL注入。
抓包如下 防御方面可以通过预编译等方式来解决 v3.0已解决
敏感信息泄露
例如在v1.0的get_user_info接口用户手机号被完整返回没有打码。 防御上可以通过加*打码或MFA等来解决 v3.0已解决
越权攻击
例如在v2.0的api key认证接口任意用户都能查询admin用户的信息只需要知道username即可 防御方面可以通过添加RBAC等鉴权来解决
重放攻击
例如v2.0的api key认证接口设置burpsuite代理放到重放器Reapter发送多少次都可以。 v3.0通过自定义请求签名就解决了此类问题。
全部代码
Github-api_history
参考
API-Security Owasp top 10
API 鉴权都有哪些分类这些重点不要错过 best-practices-for-authentication-and-authorization-for-rest-apis/
pyjwt https://github.com/ticarpi/jwt_tool
rfc6750-The OAuth 2.0 Authorization Framework: Bearer Token Usage rfc7616-HTTP Digest Access Authentication rfc2617-HTTP Authentication: Basic and Digest Access Authentication rfc7519-JSON web Token (JWT)
Github-jwt_tool