Building a Microservices Gateway on Nginx: Best Practices for an Enterprise-Grade API Gateway from Scratch
Introduction: Why Does Your Microservices Architecture Need a Serious Gateway?
Remember the last production incident? A sudden flood of requests hit one service and, with no rate limiting in place, took the downstream services down with it. Or the security audit that found API endpoints exposed to the public internet with no authentication whatsoever. Or the ops engineer paged in the middle of the night, unable to locate a problem quickly because one service's logs were scattered across a dozen machines...
These pain points all point to the same solution: a unified API gateway.
In this post I'll share how our team built a microservices gateway on Nginx that handles over a billion requests per day, along with the pitfalls we hit on the way. The setup has run stably for more than two years and has survived several major sales events.
1. Architecture Design: More Than Just a Reverse Proxy
1.1 Overall Architecture
Our gateway architecture has four layers:
├── Access layer (DNS + CDN)
├── Gateway layer (Nginx + OpenResty)
├── Service layer (microservice clusters)
└── Data layer (Redis + MySQL + MongoDB)
Core design principles:
•High availability: active-active deployment with automatic failover
•High performance: make full use of Nginx's event-driven model
•Extensibility: Lua scripting on top of OpenResty
•Observability: a complete monitoring and logging stack
1.2 Technology Choices Compared
| Option | Strengths | Weaknesses | Best fit |
|---|---|---|---|
| Nginx + OpenResty | Very high performance, proven stability, mature operations | Relatively bare-bones, requires custom development | High concurrency, low latency |
| Kong | Rich features, strong plugin ecosystem | Noticeable performance overhead, complex to operate | Small to medium scale, quick setup |
| Spring Cloud Gateway | Java-ecosystem friendly, full-featured | Middling performance, high resource usage | Java stacks |
| Envoy | Cloud native, powerful | Steep learning curve, complex configuration | Kubernetes environments |
2. Core Features: From Configuration to Code
2.1 Dynamic Routing
A traditional Nginx configuration needs a reload to pick up changes, which is unacceptable in production. Our solution:
# nginx.conf — core configuration
http {
    # Lua module search path
    lua_package_path "/usr/local/openresty/lualib/?.lua;;";
    lua_shared_dict routes_cache 100m;
    lua_shared_dict upstream_cache 100m;

    # Load routes once at startup
    init_by_lua_block {
        local route_manager = require "gateway.route_manager"
        route_manager.init()
    }

    # Keep the route table fresh
    init_worker_by_lua_block {
        local route_manager = require "gateway.route_manager"
        -- Pull the latest routes from the config center every 10 seconds
        ngx.timer.every(10, route_manager.sync_routes)
    }

    server {
        listen 80;
        server_name api.example.com;

        location / {
            # $upstream must be declared before Lua can assign to it
            set $upstream "";

            # Dynamic route resolution
            access_by_lua_block {
                local router = require "gateway.router"
                router.route()
            }

            # Dynamic upstream selection
            proxy_pass http://$upstream;

            # Standard proxy headers
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Request-Id $request_id;
        }
    }
}
The corresponding Lua routing module:
-- gateway/router.lua
local cjson = require "cjson"

local _M = {}
local routes_cache = ngx.shared.routes_cache

function _M.route()
    local uri = ngx.var.uri
    local method = ngx.var.request_method

    -- Look the route up in the shared-dict cache first
    local route_key = method .. ":" .. uri
    local route_data = routes_cache:get(route_key)

    if not route_data then
        -- Fall back to pattern matching
        route_data = _M.fuzzy_match(uri, method)
    end

    if route_data then
        local route = cjson.decode(route_data)
        -- Select the upstream
        ngx.var.upstream = route.upstream
        -- Inject custom headers
        if route.headers then
            for k, v in pairs(route.headers) do
                ngx.req.set_header(k, v)
            end
        end
        -- Rewrite the path if configured
        if route.rewrite then
            ngx.req.set_uri(route.rewrite)
        end
    else
        ngx.exit(ngx.HTTP_NOT_FOUND)
    end
end

function _M.fuzzy_match(uri, method)
    -- Match path parameters: /api/user/{id} -> /api/user/123
    local all_routes = routes_cache:get("all_routes")
    if not all_routes then
        return nil
    end

    local routes = cjson.decode(all_routes)
    for _, route in ipairs(routes) do
        local pattern = route.path:gsub("{.-}", "([^/]+)")
        local m = ngx.re.match(uri, "^" .. pattern .. "$")
        if m and route.method == method then
            -- Extract path parameters (m[1], m[2], ... are the captures)
            local params = {}
            for i = 1, #m do
                params[route.params[i]] = m[i]
            end
            -- Hand the parameters downstream via the request context
            ngx.ctx.path_params = params
            return cjson.encode(route)
        end
    end
    return nil
end

return _M
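The gateway.route_manager module referenced from init_by_lua_block and init_worker_by_lua_block isn't shown in the original. Below is a minimal sketch of what it could look like; the config-center URL and the JSON route format are assumptions for illustration, not part of the original setup:

-- gateway/route_manager.lua (sketch; the config-center endpoint is hypothetical)
local cjson = require "cjson"

local _M = {}
local routes_cache = ngx.shared.routes_cache

-- Seed the cache at startup. init_by_lua runs before workers fork,
-- so shared-dict writes are allowed here but cosockets are not.
function _M.init()
    routes_cache:set("all_routes", cjson.encode({}))
end

-- Timer callback: pull routes from the config center and refresh the cache.
function _M.sync_routes(premature)
    if premature then return end
    local httpc = require("resty.http").new()
    -- Assumed endpoint returning [{"method":"GET","path":"/api/user/{id}",...}, ...]
    local res, err = httpc:request_uri("http://config-center/routes")
    if not res or res.status ~= 200 then
        ngx.log(ngx.ERR, "route sync failed: ", err or res.status)
        return
    end
    local routes = cjson.decode(res.body)
    routes_cache:set("all_routes", res.body)
    -- Also index exact-match routes for O(1) lookups in router.route()
    for _, route in ipairs(routes) do
        routes_cache:set(route.method .. ":" .. route.path, cjson.encode(route))
    end
end

return _M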
2.2 Smart Load Balancing
We go beyond plain round robin: server weights are adjusted dynamically based on observed response times:
-- gateway/balancer.lua
local cjson = require "cjson"

local _M = {}
local upstream_cache = ngx.shared.upstream_cache

function _M.get_server(upstream_name)
    local servers_key = "servers:" .. upstream_name
    local servers_data = upstream_cache:get(servers_key)
    if not servers_data then
        return nil
    end

    local servers = cjson.decode(servers_data)

    -- Pick a server by response-time-weighted random selection
    local total_weight = 0
    local weighted_servers = {}
    for _, server in ipairs(servers) do
        -- Fetch per-server statistics
        local stats_key = "stats:" .. server.host .. ":" .. server.port
        local stats = upstream_cache:get(stats_key)
        if stats then
            stats = cjson.decode(stats)
            -- Shorter response time => higher weight
            local weight = 1000 / (stats.avg_response_time + 1)
            -- Penalize the error rate
            weight = weight * (1 - stats.error_rate)
            -- Respect the configured base weight
            weight = weight * server.weight
            table.insert(weighted_servers, {
                server = server,
                weight = weight,
                range_start = total_weight,
                range_end = total_weight + weight
            })
            total_weight = total_weight + weight
        else
            -- New server with no stats yet: use its base weight
            table.insert(weighted_servers, {
                server = server,
                weight = server.weight,
                range_start = total_weight,
                range_end = total_weight + server.weight
            })
            total_weight = total_weight + server.weight
        end
    end

    -- Weighted random pick
    local random_weight = math.random() * total_weight
    for _, ws in ipairs(weighted_servers) do
        if random_weight >= ws.range_start and random_weight < ws.range_end then
            return ws.server
        end
    end

    -- Fallback: first server
    return servers[1]
end

-- Update per-server statistics
function _M.update_stats(server, response_time, is_error)
    local stats_key = "stats:" .. server.host .. ":" .. server.port
    local stats = upstream_cache:get(stats_key)
    if not stats then
        stats = {
            total_requests = 0,
            total_response_time = 0,
            avg_response_time = 0,
            error_count = 0,
            error_rate = 0
        }
    else
        stats = cjson.decode(stats)
    end

    -- Update the rolling counters
    stats.total_requests = stats.total_requests + 1
    stats.total_response_time = stats.total_response_time + response_time
    stats.avg_response_time = stats.total_response_time / stats.total_requests
    if is_error then
        stats.error_count = stats.error_count + 1
    end
    stats.error_rate = stats.error_count / stats.total_requests

    -- Persist with a TTL so stale entries don't accumulate
    upstream_cache:set(stats_key, cjson.encode(stats), 300)
end

return _M
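update_stats has to be fed from somewhere; a natural place is the log phase, where $upstream_response_time and $status are final. A minimal wiring sketch (splitting host and port out of ngx.var.upstream_addr is an assumption about how servers are keyed):

log_by_lua_block {
    local balancer = require "gateway.balancer"
    -- upstream_addr looks like "192.168.1.10:8080"; on retries it may
    -- list several addresses, so take the last one
    local addr = ngx.var.upstream_addr
    if addr then
        local host, port = addr:match("([^,%s]+):(%d+)$")
        if host then
            local rt = tonumber(ngx.var.upstream_response_time) or 0
            local is_error = (tonumber(ngx.var.status) or 0) >= 500
            balancer.update_stats({ host = host, port = port }, rt, is_error)
        end
    end
}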
2.3 Rate Limiting and Circuit Breaking
Distributed rate limiting built on the token bucket algorithm:
-- gateway/rate_limiter.lua
-- The circuit breaker expects: lua_shared_dict breaker_cache 10m;
local redis = require "resty.redis"

local _M = {}

-- Token bucket rate limiting (atomic, via a Redis-side Lua script)
function _M.token_bucket_limit(key, rate, capacity)
    local red = redis:new()
    red:set_timeout(1000)

    local ok, err = red:connect("127.0.0.1", 6379)
    if not ok then
        ngx.log(ngx.ERR, "Redis connection failed: ", err)
        return true -- fail open: let the request through
    end

    local script = [[
        local key = KEYS[1]
        local rate = tonumber(ARGV[1])
        local capacity = tonumber(ARGV[2])
        local now = tonumber(ARGV[3])
        local requested = tonumber(ARGV[4] or 1)
        local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
        local tokens = tonumber(bucket[1] or capacity)
        local last_refill = tonumber(bucket[2] or now)
        -- Refill tokens based on elapsed time
        local elapsed = math.max(0, now - last_refill)
        local tokens_to_add = elapsed * rate
        tokens = math.min(capacity, tokens + tokens_to_add)
        -- EXPIRE requires an integer number of seconds
        local ttl = math.ceil(capacity / rate) + 1
        if tokens >= requested then
            tokens = tokens - requested
            redis.call('HMSET', key, 'tokens', tokens, 'last_refill', now)
            redis.call('EXPIRE', key, ttl)
            return 1
        else
            redis.call('HMSET', key, 'tokens', tokens, 'last_refill', now)
            redis.call('EXPIRE', key, ttl)
            return 0
        end
    ]]

    local now = ngx.now()
    local res = red:eval(script, 1, key, rate, capacity, now, 1)
    red:set_keepalive(10000, 100)
    return res == 1
end

-- Circuit breaker
function _M.circuit_breaker(service_name)
    local breaker_key = "breaker:" .. service_name
    local breaker_cache = ngx.shared.breaker_cache

    -- Current breaker state
    local state = breaker_cache:get(breaker_key .. ":state") or "closed"

    if state == "open" then
        -- Is it time to probe with a half-open state?
        local open_time = breaker_cache:get(breaker_key .. ":open_time")
        if ngx.now() - open_time > 30 then -- try half-open after 30 seconds
            breaker_cache:set(breaker_key .. ":state", "half_open")
            state = "half_open"
        else
            return false, "Circuit breaker is open"
        end
    end

    if state == "half_open" then
        -- Half-open: let a handful of probe requests through
        local half_open_count = breaker_cache:incr(breaker_key .. ":half_open_count", 1, 0)
        if half_open_count > 5 then -- at most 5 probes
            return false, "Circuit breaker is half open, limit exceeded"
        end
    end

    return true
end

-- Update breaker state from request outcomes
function _M.update_breaker(service_name, is_success)
    local breaker_key = "breaker:" .. service_name
    local breaker_cache = ngx.shared.breaker_cache

    local state = breaker_cache:get(breaker_key .. ":state") or "closed"

    if state == "closed" then
        if not is_success then
            -- Count failures within a 60-second window (incr's init_ttl)
            local fail_count = breaker_cache:incr(breaker_key .. ":fail_count", 1, 0, 60)
            -- 10 failures within the window open the breaker
            if fail_count >= 10 then
                breaker_cache:set(breaker_key .. ":state", "open")
                breaker_cache:set(breaker_key .. ":open_time", ngx.now())
                ngx.log(ngx.WARN, "Circuit breaker opened for: ", service_name)
            end
        end
    elseif state == "half_open" then
        if is_success then
            -- A successful probe closes the breaker
            breaker_cache:set(breaker_key .. ":state", "closed")
            breaker_cache:delete(breaker_key .. ":fail_count")
            breaker_cache:delete(breaker_key .. ":half_open_count")
            ngx.log(ngx.INFO, "Circuit breaker closed for: ", service_name)
        else
            -- A failed probe reopens it
            breaker_cache:set(breaker_key .. ":state", "open")
            breaker_cache:set(breaker_key .. ":open_time", ngx.now())
        end
    end
end

return _M
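Here is how the limiter and the breaker might be wired into the access phase. This is a sketch; the per-IP key scheme and the 100 req/s rate with a burst capacity of 200 are illustrative numbers, not values from the original article:

access_by_lua_block {
    local limiter = require "gateway.rate_limiter"
    -- Rate-limit per client IP: ~100 tokens/s, bucket capacity 200 (illustrative)
    local key = "rl:" .. ngx.var.remote_addr
    if not limiter.token_bucket_limit(key, 100, 200) then
        ngx.exit(429)
    end
    -- Short-circuit if the target service's breaker is open
    local ok, reason = limiter.circuit_breaker(ngx.var.upstream or "default")
    if not ok then
        ngx.status = 503
        ngx.say(reason)
        ngx.exit(503)
    end
}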
2.4 Unified Authentication and Authorization
JWT authentication plus fine-grained permission control:
-- gateway/auth.lua
local jwt = require "resty.jwt"
local redis = require "resty.redis"
local cjson = require "cjson"

local _M = {}

-- JWT verification
function _M.verify_jwt()
    local auth_header = ngx.var.http_authorization
    if not auth_header then
        return false, "Missing authorization header"
    end

    local _, _, token = string.find(auth_header, "Bearer%s+(.+)")
    if not token then
        return false, "Invalid authorization header format"
    end

    -- Verify the JWT signature and claims
    local jwt_secret = os.getenv("JWT_SECRET") or "your-secret-key"
    local jwt_obj = jwt:verify(jwt_secret, token)
    if not jwt_obj.verified then
        return false, jwt_obj.reason
    end

    -- Check the revocation blacklist (for proactive invalidation);
    -- note resty.redis returns ngx.null, not nil, for missing keys
    local red = redis:new()
    red:set_timeout(1000)
    local ok, err = red:connect("127.0.0.1", 6379)
    if ok then
        local blacklisted = red:get("blacklist:" .. token)
        if blacklisted and blacklisted ~= ngx.null then
            red:set_keepalive(10000, 100)
            return false, "Token has been revoked"
        end
        red:set_keepalive(10000, 100)
    end

    -- Stash user info in the request context
    ngx.ctx.user = jwt_obj.payload
    return true
end

-- Permission check
function _M.check_permission(required_permission)
    local user = ngx.ctx.user
    if not user then
        return false, "User not authenticated"
    end

    -- Fetch the user's permissions from cache or Redis
    local permissions = _M.get_user_permissions(user.user_id)

    -- Wildcard matching supported
    for _, perm in ipairs(permissions) do
        if _M.match_permission(perm, required_permission) then
            return true
        end
    end

    return false, "Permission denied"
end

-- Permission matching (with wildcards)
function _M.match_permission(user_perm, required_perm)
    -- Turn the permission string into a regex:
    -- "user*" matches "userprofile"
    local pattern = user_perm:gsub("*", ".*")
    pattern = "^" .. pattern .. "$"
    return ngx.re.match(required_perm, pattern) ~= nil
end

-- Fetch user permissions (with local caching)
function _M.get_user_permissions(user_id)
    local cache_key = "permissions:" .. user_id
    local permissions_cache = ngx.shared.permissions_cache

    -- Local shared-dict cache first
    local cached = permissions_cache:get(cache_key)
    if cached then
        return cjson.decode(cached)
    end

    -- Fall back to Redis
    local red = redis:new()
    red:set_timeout(1000)
    local ok, err = red:connect("127.0.0.1", 6379)
    if not ok then
        ngx.log(ngx.ERR, "Redis connection failed: ", err)
        return {}
    end

    local permissions = red:smembers("user" .. user_id)
    red:set_keepalive(10000, 100)

    -- Cache for 5 minutes
    permissions_cache:set(cache_key, cjson.encode(permissions), 300)
    return permissions
end

-- API signature verification (anti-replay)
function _M.verify_signature()
    local signature = ngx.var.http_x_signature
    local timestamp = ngx.var.http_x_timestamp
    local nonce = ngx.var.http_x_nonce

    if not signature or not timestamp or not nonce then
        return false, "Missing signature headers"
    end

    -- Timestamp must be within a 5-minute window
    local current_time = ngx.now()
    if math.abs(current_time - tonumber(timestamp)) > 300 then
        return false, "Request expired"
    end

    -- Reject nonces we've already seen
    local red = redis:new()
    red:set_timeout(1000)
    local ok, err = red:connect("127.0.0.1", 6379)
    if ok then
        local nonce_key = "nonce:" .. nonce
        local exists = red:get(nonce_key)
        if exists and exists ~= ngx.null then
            red:set_keepalive(10000, 100)
            return false, "Nonce already used"
        end
        -- Remember the nonce for 5 minutes
        red:setex(nonce_key, 300, "1")
        red:set_keepalive(10000, 100)
    end

    -- Verify the signature itself
    local method = ngx.var.request_method
    local uri = ngx.var.uri
    local body = ngx.req.get_body_data() or ""
    local sign_string = method .. uri .. timestamp .. nonce .. body

    local app_secret = _M.get_app_secret(ngx.var.http_x_app_id)
    -- ngx.hmac_sha1 is the only HMAC built into OpenResty;
    -- use lua-resty-openssl if you need HMAC-SHA256 here
    local expected_signature = ngx.encode_base64(
        ngx.hmac_sha1(app_secret, sign_string)
    )

    if signature ~= expected_signature then
        return false, "Invalid signature"
    end

    return true
end

return _M
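A typical access-phase hook combining the checks above might look like the sketch below; deriving the required permission from the method and URI is an assumption about the permission naming scheme, not something the original prescribes:

access_by_lua_block {
    local auth = require "gateway.auth"
    local ok, err = auth.verify_jwt()
    if not ok then
        ngx.status = 401
        ngx.say(err)
        ngx.exit(401)
    end
    -- e.g. "GET:/api/users" as the required permission (naming scheme assumed)
    local required = ngx.var.request_method .. ":" .. ngx.var.uri
    ok, err = auth.check_permission(required)
    if not ok then
        ngx.exit(403)
    end
}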
2.5 Request and Response Transformation
Handling compatibility across API versions:
-- gateway/transformer.lua
local cjson = require "cjson"

local _M = {}

-- Request transformation (run from the rewrite/access phase)
function _M.transform_request()
    local uri = ngx.var.uri
    local version = ngx.var.http_x_api_version or "v2"

    -- Convert the request according to the caller's API version
    if version == "v1" then
        _M.transform_v1_to_v2_request()
    end

    -- Propagate a trace header
    if not ngx.var.http_x_request_id then
        ngx.req.set_header("X-Request-Id", ngx.var.request_id)
    end

    -- Tag the origin
    ngx.req.set_header("X-Gateway-Time", ngx.now())
    ngx.req.set_header("X-Forwarded-Host", ngx.var.host)
    ngx.req.set_header("X-Forwarded-Proto", ngx.var.scheme)
end

-- V1 -> V2 request conversion
function _M.transform_v1_to_v2_request()
    ngx.req.read_body()
    local body = ngx.req.get_body_data()
    if body then
        local data = cjson.decode(body)
        -- Field renames
        local field_mapping = {
            user_name = "username",
            user_id = "userId",
            create_time = "createdAt"
        }
        for old_field, new_field in pairs(field_mapping) do
            if data[old_field] then
                data[new_field] = data[old_field]
                data[old_field] = nil
            end
        end
        -- Replace the request body
        ngx.req.set_body_data(cjson.encode(data))
    end
end

-- Response transformation (run from the body_filter phase; this
-- assumes the whole JSON body arrives as a single chunk)
function _M.transform_response()
    local version = ngx.var.http_x_api_version or "v2"

    if version == "v1" then
        local resp_body = ngx.arg[1]
        if resp_body then
            local ok, data = pcall(cjson.decode, resp_body)
            if ok then
                -- V2 -> V1 response conversion
                data = _M.transform_v2_to_v1_response(data)
                ngx.arg[1] = cjson.encode(data)
            end
        end
    end
end

-- Response headers (must run from the header_filter phase,
-- before the body starts streaming)
function _M.set_response_headers()
    ngx.header["X-Gateway-Response-Time"] = ngx.now() - (ngx.ctx.start_time or ngx.now())
    ngx.header["X-Request-Id"] = ngx.var.request_id
end

-- V2 -> V1 response conversion
function _M.transform_v2_to_v1_response(data)
    -- Reverse field renames
    local field_mapping = {
        username = "user_name",
        userId = "user_id",
        createdAt = "create_time"
    }

    local function transform_object(obj)
        if type(obj) ~= "table" then
            return obj
        end
        for new_field, old_field in pairs(field_mapping) do
            if obj[new_field] then
                obj[old_field] = obj[new_field]
                obj[new_field] = nil
            end
        end
        -- Recurse into nested objects
        for k, v in pairs(obj) do
            obj[k] = transform_object(v)
        end
        return obj
    end

    return transform_object(data)
end

-- Protocol translation (GraphQL to REST)
function _M.graphql_to_rest()
    ngx.req.read_body()
    local body = ngx.req.get_body_data()
    if not body then
        return
    end

    local graphql_query = cjson.decode(body)
    -- Extract the operation name from the query
    local operation = graphql_query.query:match("(%w+)%s*{")

    -- Map operations to REST endpoints
    local endpoint_mapping = {
        getUser = { method = "GET", path = "/api/users/" },
        createUser = { method = "POST", path = "/api/users" },
        updateUser = { method = "PUT", path = "/api/users/" },
        deleteUser = { method = "DELETE", path = "/api/users/" }
    }

    local mapping = endpoint_mapping[operation]
    if mapping then
        -- Pull out the variables
        local params = graphql_query.variables or {}
        -- Rewrite as a REST request (ngx.HTTP_GET, ngx.HTTP_POST, ...)
        ngx.req.set_method(ngx["HTTP_" .. mapping.method])
        if params.id then
            ngx.req.set_uri(mapping.path .. params.id)
            params.id = nil
        else
            ngx.req.set_uri(mapping.path)
        end
        -- Set the body for non-GET methods
        if mapping.method ~= "GET" then
            ngx.req.set_body_data(cjson.encode(params))
        end
    end
end

return _M
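Because transform_request mutates the request while transform_response mutates ngx.arg[1], the two must run in different Nginx phases. A wiring sketch (the location and upstream names are placeholders):

location /api/ {
    rewrite_by_lua_block {
        require("gateway.transformer").transform_request()
    }
    header_filter_by_lua_block {
        local transformer = require "gateway.transformer"
        transformer.set_response_headers()
        -- The body length changes after transformation, so drop Content-Length
        ngx.header.content_length = nil
    }
    body_filter_by_lua_block {
        require("gateway.transformer").transform_response()
    }
    proxy_pass http://backend;
}

Note that transform_response as written assumes the whole JSON body arrives in one chunk; production code would buffer chunks until ngx.arg[2] (the EOF flag) is true before decoding.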
3. Performance Optimization: Making the Gateway Fly
3.1 Caching Strategy
A multi-level cache architecture that dramatically cuts response times:
# Local proxy cache for API responses
proxy_cache_path /var/cache/nginx/api_cache levels=1:2 keys_zone=api_cache:100m
                 max_size=10g inactive=60m use_temp_path=off;
# Separate cache zone for static assets
proxy_cache_path /var/cache/nginx/static_cache levels=1:2 keys_zone=static_cache:50m
                 max_size=5g inactive=7d use_temp_path=off;

server {
    location /api/ {
        # Cache key definition
        set $cache_key "$scheme$request_method$host$request_uri$is_args$args";

        # Cache handling in Lua
        access_by_lua_block {
            local cache = require "gateway.cache"
            cache.handle_cache()
        }

        # Nginx proxy cache settings
        proxy_cache api_cache;
        proxy_cache_key $cache_key;
        proxy_cache_valid 200 304 5m;
        proxy_cache_valid 404 1m;
        proxy_cache_use_stale error timeout updating http_500 http_502 http_503 http_504;
        proxy_cache_background_update on;
        proxy_cache_lock on;
        proxy_cache_lock_timeout 5s;

        # Expose the cache status to clients
        add_header X-Cache-Status $upstream_cache_status;

        proxy_pass http://backend;
    }

    location /static/ {
        proxy_cache static_cache;
        proxy_cache_valid 200 304 7d;
        proxy_cache_valid any 1h;

        # Support range requests
        proxy_set_header Range $http_range;
        proxy_set_header If-Range $http_if_range;

        proxy_pass http://static_backend;
    }
}
The Lua module driving the smart cache logic:
-- gateway/cache.lua
local redis = require "resty.redis"
local cjson = require "cjson"

local _M = {}

function _M.handle_cache()
    local method = ngx.var.request_method
    local uri = ngx.var.uri

    -- Only GET requests are cacheable
    if method ~= "GET" then
        return
    end

    -- Build a cache key that accounts for the user's identity
    local cache_key = _M.generate_cache_key()

    -- Try Redis first
    local cached_response = _M.get_from_redis(cache_key)
    if cached_response then
        if not _M.is_stale(cached_response) then
            ngx.header["Content-Type"] = cached_response.content_type
            ngx.header["X-Cache-Hit"] = "redis"
            ngx.say(cached_response.body)
            ngx.exit(200)
        else
            -- Stale: refresh asynchronously, then fall through to the backend
            ngx.timer.at(0, _M.refresh_cache, cache_key, uri)
        end
    end

    -- Mark the response for caching downstream
    ngx.ctx.cache_key = cache_key
    ngx.ctx.should_cache = true
end

function _M.generate_cache_key()
    local user = ngx.ctx.user
    local uri = ngx.var.uri
    local args = ngx.var.args or ""

    -- Personalize per user
    local user_id = user and user.user_id or "anonymous"

    return ngx.md5(user_id .. ":" .. uri .. ":" .. args)
end

function _M.get_from_redis(key)
    local red = redis:new()
    red:set_timeout(1000)

    local ok, err = red:connect("127.0.0.1", 6379)
    if not ok then
        return nil
    end

    local res = red:get("cache:" .. key)
    red:set_keepalive(10000, 100)

    if res and res ~= ngx.null then
        return cjson.decode(res)
    end
    return nil
end

function _M.is_stale(cached_response)
    local ttl = cached_response.ttl or 300
    local cached_time = cached_response.cached_at or 0
    return (ngx.now() - cached_time) > ttl
end

-- Timer callback: the first argument is always the premature flag
function _M.refresh_cache(premature, cache_key, uri)
    if premature then
        return
    end
    -- Re-fetch from the backend and refresh the cache
    local httpc = require("resty.http").new()
    local res, err = httpc:request_uri("http://backend" .. uri, {
        method = "GET",
        headers = {
            ["X-Cache-Refresh"] = "true"
        }
    })
    if res and res.status == 200 then
        _M.save_to_redis(cache_key, res.body, res.headers)
    end
end

function _M.save_to_redis(key, body, headers)
    local red = redis:new()
    red:set_timeout(1000)

    local ok, err = red:connect("127.0.0.1", 6379)
    if not ok then
        return
    end

    local cache_data = {
        body = body,
        content_type = headers["Content-Type"],
        cached_at = ngx.now(),
        ttl = 300
    }
    red:setex("cache:" .. key, 300, cjson.encode(cache_data))
    red:set_keepalive(10000, 100)
end

return _M
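handle_cache sets ngx.ctx.should_cache, but nothing above consumes it. One way to close the loop, sketched under the same assumptions about the shared state layout, is a body filter that buffers the response plus a log-phase timer that writes it to Redis:

body_filter_by_lua_block {
    -- Buffer response chunks until EOF so the full body can be cached
    if ngx.ctx.should_cache then
        ngx.ctx.resp_body = (ngx.ctx.resp_body or "") .. (ngx.arg[1] or "")
    end
}
log_by_lua_block {
    if ngx.ctx.should_cache and ngx.status == 200 and ngx.ctx.resp_body then
        local cache = require "gateway.cache"
        local key, body = ngx.ctx.cache_key, ngx.ctx.resp_body
        local ct = ngx.var.sent_http_content_type or "application/json"
        -- Cosockets are unavailable in the log phase, so defer to a timer
        ngx.timer.at(0, function(premature)
            if premature then return end
            cache.save_to_redis(key, body, { ["Content-Type"] = ct })
        end)
    end
}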
3.2 Connection Pool Tuning
# Upstream connection pool configuration
upstream backend {
    # The balancing method must precede the keepalive directive
    least_conn;

    # Dynamic server list
    server 192.168.1.10:8080 max_fails=2 fail_timeout=10s;
    server 192.168.1.11:8080 max_fails=2 fail_timeout=10s;
    server 192.168.1.12:8080 max_fails=2 fail_timeout=10s backup;

    # Keep idle upstream connections alive
    keepalive 256;
    keepalive_requests 1000;
    keepalive_timeout 60s;
}

http {
    # Client-side connection tuning
    keepalive_timeout 65;
    keepalive_requests 100;

    # Proxy buffering and timeouts
    proxy_connect_timeout 5s;
    proxy_send_timeout 60s;
    proxy_read_timeout 60s;
    proxy_buffer_size 32k;
    proxy_buffers 4 64k;
    proxy_busy_buffers_size 128k;
    proxy_temp_file_write_size 256k;

    # HTTP/2 limits
    http2_max_field_size 16k;
    http2_max_header_size 32k;

    # Reuse upstream connections (required for upstream keepalive)
    proxy_http_version 1.1;
    proxy_set_header Connection "";
}
3.3 Memory Management
-- gateway/memory_manager.lua
local _M = {}

-- Periodically evict keys that are about to expire
function _M.cleanup_expired_cache()
    local cache_dict = ngx.shared.routes_cache
    local keys = cache_dict:get_keys(0) -- 0 = all keys

    for _, key in ipairs(keys) do
        local ttl = cache_dict:ttl(key)
        -- ttl == 0 means "no expiry", so only evict keys with a
        -- real TTL under 10 seconds
        if ttl and ttl > 0 and ttl < 10 then
            cache_dict:delete(key)
        end
    end
end

-- Monitor shared-dict memory usage
function _M.monitor_memory()
    local cache_list = {
        "routes_cache",
        "upstream_cache",
        "permissions_cache",
        "breaker_cache"
    }

    local memory_stats = {}
    for _, cache_name in ipairs(cache_list) do
        local cache = ngx.shared[cache_name]
        if cache then
            memory_stats[cache_name] = {
                capacity = cache:capacity(),
                free_space = cache:free_space()
            }
        end
    end

    -- Warn when usage exceeds 80%
    for name, stats in pairs(memory_stats) do
        local usage = (stats.capacity - stats.free_space) / stats.capacity
        if usage > 0.8 then
            ngx.log(ngx.WARN, string.format(
                "Memory usage warning: %s is %.2f%% full",
                name,
                usage * 100
            ))
            -- Trigger a cleanup
            _M.force_cleanup(name)
        end
    end

    return memory_stats
end

-- Forcibly free space in a shared dict
function _M.force_cleanup(cache_name)
    local cache = ngx.shared[cache_name]
    if not cache then
        return
    end

    -- Flush anything already expired
    cache:flush_expired()

    -- Still tight? Drop the oldest 10% of keys
    local keys = cache:get_keys(0)
    local to_delete = math.floor(#keys * 0.1)
    for i = 1, to_delete do
        cache:delete(keys[i])
    end
end

-- Register the periodic tasks (call once per worker)
function _M.init_timers()
    -- Cleanup every minute
    ngx.timer.every(60, function() _M.cleanup_expired_cache() end)
    -- Memory check every 5 minutes
    ngx.timer.every(300, function() _M.monitor_memory() end)
end

return _M
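These timers only run if init_timers is invoked once per worker, for example:

init_worker_by_lua_block {
    require("gateway.memory_manager").init_timers()
}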
4. Monitoring and Alerting: Building Observability
4.1 Log Collection
-- gateway/logger.lua
local cjson = require "cjson"

local _M = {}

-- Structured access logging (run from log_by_lua)
function _M.access_log()
    local log_data = {
        -- Basics
        timestamp = ngx.now(),
        request_id = ngx.var.request_id,
        -- Request
        method = ngx.var.request_method,
        uri = ngx.var.uri,
        args = ngx.var.args,
        host = ngx.var.host,
        -- Client
        client_ip = ngx.var.remote_addr,
        user_agent = ngx.var.http_user_agent,
        referer = ngx.var.http_referer,
        -- Response
        status = ngx.var.status,
        bytes_sent = ngx.var.bytes_sent,
        request_time = ngx.var.request_time,
        upstream_response_time = ngx.var.upstream_response_time,
        -- Upstream
        upstream_addr = ngx.var.upstream_addr,
        upstream_status = ngx.var.upstream_status,
        -- Cache
        cache_status = ngx.var.upstream_cache_status,
        -- User
        user_id = ngx.ctx.user and ngx.ctx.user.user_id or nil,
        -- Tracing
        trace_id = ngx.var.http_x_trace_id,
        span_id = ngx.var.http_x_span_id
    }

    -- Ship the log asynchronously
    _M.write_log(log_data)

    -- Slow-request alert
    if tonumber(ngx.var.request_time) > 3 then
        _M.alert_slow_request(log_data)
    end

    -- Error alert
    if tonumber(ngx.var.status) >= 500 then
        _M.alert_error(log_data)
    end
end

-- Ship logs to Kafka (lua-resty-kafka's async producer buffers
-- internally, so this is safe in the log phase)
function _M.write_log(log_data)
    local kafka_producer = require "resty.kafka.producer"
    local broker_list = {
        { host = "127.0.0.1", port = 9092 }
    }

    local producer = kafka_producer:new(broker_list, {
        producer_type = "async",
        batch_num = 200,
        batch_size = 1048576,
        max_buffering = 50000
    })

    local ok, err = producer:send("gateway-logs", nil, cjson.encode(log_data))
    if not ok then
        ngx.log(ngx.ERR, "Failed to send log to Kafka: ", err)
        -- Fall back to a local file
        _M.write_local_log(log_data)
    end
end

-- Local log fallback
function _M.write_local_log(log_data)
    local file = io.open("/var/log/nginx/gateway_access.log", "a+")
    if file then
        file:write(cjson.encode(log_data) .. "\n")
        file:close()
    end
end

-- Slow-request alert
function _M.alert_slow_request(log_data)
    local alert = {
        type = "SLOW_REQUEST",
        level = "WARNING",
        service = "api-gateway",
        message = string.format(
            "Slow request detected: %s %s took %.2fs",
            log_data.method,
            log_data.uri,
            log_data.request_time
        ),
        details = log_data,
        timestamp = ngx.now()
    }
    _M.send_alert(alert)
end

-- 5xx alert (referenced above but missing in the original; minimal version)
function _M.alert_error(log_data)
    _M.send_alert({
        type = "SERVER_ERROR",
        level = "ERROR",
        service = "api-gateway",
        message = string.format("HTTP %s on %s %s",
            log_data.status, log_data.method, log_data.uri),
        details = log_data,
        timestamp = ngx.now()
    })
end

-- Deliver an alert (cosockets aren't available in the log phase,
-- so the HTTP call happens inside a zero-delay timer)
function _M.send_alert(alert)
    ngx.timer.at(0, function(premature)
        if premature then
            return
        end
        local httpc = require("resty.http").new()
        httpc:request_uri("http://alert-system/api/alerts", {
            method = "POST",
            body = cjson.encode(alert),
            headers = {
                ["Content-Type"] = "application/json"
            }
        })
    end)
end

return _M
4.2 Metrics Collection
-- gateway/metrics.lua
-- Uses knyar/nginx-lua-prometheus; requires:
--   lua_shared_dict prometheus_metrics 10m;
local _M = {}
local prometheus

-- Initialize Prometheus metrics (call from init_worker_by_lua)
function _M.init()
    prometheus = require("prometheus").init("prometheus_metrics")

    -- Metric definitions
    _M.request_count = prometheus:counter(
        "gateway_requests_total",
        "Total number of requests",
        {"method", "path", "status"}
    )
    _M.request_duration = prometheus:histogram(
        "gateway_request_duration_seconds",
        "Request duration in seconds",
        {"method", "path"}
    )
    _M.upstream_duration = prometheus:histogram(
        "gateway_upstream_duration_seconds",
        "Upstream response time in seconds",
        {"upstream", "method", "path"}
    )
    _M.active_connections = prometheus:gauge(
        "gateway_active_connections",
        "Number of active connections"
    )
    _M.rate_limit_hits = prometheus:counter(
        "gateway_rate_limit_hits_total",
        "Number of rate limit hits",
        {"client", "rule"}
    )
    _M.circuit_breaker_state = prometheus:gauge(
        "gateway_circuit_breaker_state",
        "Circuit breaker state (0=closed, 1=open, 2=half-open)",
        {"service"}
    )
end

-- Record per-request metrics (call from log_by_lua)
function _M.log()
    local method = ngx.var.request_method
    local path = ngx.var.uri
    local status = ngx.var.status

    -- Request count
    _M.request_count:inc(1, {method, path, status})

    -- Request latency
    local request_time = tonumber(ngx.var.request_time) or 0
    _M.request_duration:observe(request_time, {method, path})

    -- Upstream latency
    local upstream_time = tonumber(ngx.var.upstream_response_time) or 0
    local upstream = ngx.var.upstream_addr or "unknown"
    _M.upstream_duration:observe(upstream_time, {upstream, method, path})

    -- Active connections ($connections_active from stub_status)
    _M.active_connections:set(ngx.var.connections_active)
end

-- Expose the /metrics endpoint (call from content_by_lua)
function _M.collect()
    prometheus:collect()
end

return _M
4.3 Health Checks
# Health check configuration
upstream backend {
    server 192.168.1.10:8080;
    server 192.168.1.11:8080;

    # Active health checks (requires nginx_upstream_check_module)
    check interval=3000 rise=2 fall=3 timeout=1000 type=http;
    check_http_send "GET /health HTTP/1.0\r\n\r\n";
    check_http_expect_alive http_2xx http_3xx;
}

server {
    # The gateway's own health endpoint
    location /health {
        access_log off;
        content_by_lua_block {
            local health = require "gateway.health"
            health.check()
        }
    }

    # Prometheus metrics endpoint
    location /metrics {
        access_log off;
        content_by_lua_block {
            local metrics = require "gateway.metrics"
            metrics.collect()
        }
    }

    # Upstream health status page
    location /upstream_status {
        access_log off;
        check_status;
        access_by_lua_block {
            -- Simple IP allow list; a plain table lookup cannot match a
            -- CIDR like 10.0.0.0/8, so check the address prefix instead
            local client_ip = ngx.var.remote_addr
            local allowed = client_ip == "127.0.0.1"
                or ngx.re.match(client_ip, [[^10\.]])
            if not allowed then
                ngx.exit(403)
            end
        }
    }
}
The corresponding health-check Lua module:
-- gateway/health.lua
local cjson = require "cjson"

local _M = {}

function _M.check()
    local checks = {}
    local healthy = true

    -- Redis connectivity
    local redis_health = _M.check_redis()
    checks.redis = redis_health
    healthy = healthy and redis_health.healthy

    -- Upstream services
    local upstream_health = _M.check_upstreams()
    checks.upstreams = upstream_health
    healthy = healthy and upstream_health.healthy

    -- Memory usage
    local memory_health = _M.check_memory()
    checks.memory = memory_health
    healthy = healthy and memory_health.healthy

    -- Report
    local status = healthy and 200 or 503
    ngx.status = status
    ngx.header["Content-Type"] = "application/json"
    ngx.say(cjson.encode({
        status = healthy and "UP" or "DOWN",
        timestamp = ngx.now(),
        checks = checks
    }))
end

function _M.check_redis()
    local redis = require "resty.redis"
    local red = redis:new()
    red:set_timeout(1000)

    local ok, err = red:connect("127.0.0.1", 6379)
    if not ok then
        return {
            healthy = false,
            message = "Redis connection failed: " .. err
        }
    end

    -- Round-trip test
    local res = red:ping()
    red:set_keepalive(10000, 100)

    return {
        healthy = res == "PONG",
        message = res == "PONG" and "Redis is healthy" or "Redis ping failed"
    }
end

function _M.check_upstreams()
    local upstream_cache = ngx.shared.upstream_cache
    local all_upstreams = upstream_cache:get("all_upstreams")
    if not all_upstreams then
        return {
            healthy = false,
            message = "No upstreams configured"
        }
    end

    local upstreams = cjson.decode(all_upstreams)
    local healthy_count = 0
    local total_count = 0

    for name, servers in pairs(upstreams) do
        for _, server in ipairs(servers) do
            total_count = total_count + 1
            local stats_key = "stats:" .. server.host .. ":" .. server.port
            local stats = upstream_cache:get(stats_key)
            if stats then
                stats = cjson.decode(stats)
                if stats.error_rate < 0.1 then -- error rate under 10%
                    healthy_count = healthy_count + 1
                end
            end
        end
    end

    local health_ratio = healthy_count / total_count
    return {
        healthy = health_ratio > 0.5, -- more than half healthy is good enough
        message = string.format("%d/%d upstreams healthy", healthy_count, total_count),
        details = {
            healthy = healthy_count,
            total = total_count,
            ratio = health_ratio
        }
    }
end

function _M.check_memory()
    local memory_manager = require "gateway.memory_manager"
    local stats = memory_manager.monitor_memory()

    local max_usage = 0
    for name, stat in pairs(stats) do
        local usage = (stat.capacity - stat.free_space) / stat.capacity
        if usage > max_usage then
            max_usage = usage
        end
    end

    return {
        healthy = max_usage < 0.9, -- memory usage under 90%
        message = string.format("Memory usage: %.2f%%", max_usage * 100),
        details = stats
    }
end

return _M
5. High-Availability Deployment
5.1 Active-Active Architecture
# docker-compose.yml
version: '3.8'
services:
  # Gateway node 1
  gateway-1:
    image: openresty/openresty:alpine
    container_name: gateway-1
    volumes:
      - ./conf/nginx.conf:/usr/local/openresty/nginx/conf/nginx.conf
      - ./lua:/usr/local/openresty/lualib/gateway
      - ./logs/gateway-1:/var/log/nginx
    ports:
      - "8080:80"
    environment:
      - GATEWAY_NODE_ID=node-1
      - REDIS_HOST=redis
      - CONSUL_HOST=consul
    depends_on:
      - redis
      - consul
    networks:
      - gateway-network
    deploy:
      resources:
        limits:
          cpus: '2'
          memory: 2G
        reservations:
          cpus: '1'
          memory: 1G

  # Gateway node 2
  gateway-2:
    image: openresty/openresty:alpine
    container_name: gateway-2
    volumes:
      - ./conf/nginx.conf:/usr/local/openresty/nginx/conf/nginx.conf
      - ./lua:/usr/local/openresty/lualib/gateway
      - ./logs/gateway-2:/var/log/nginx
    ports:
      - "8081:80"
    environment:
      - GATEWAY_NODE_ID=node-2
      - REDIS_HOST=redis
      - CONSUL_HOST=consul
    depends_on:
      - redis
      - consul
    networks:
      - gateway-network
    deploy:
      resources:
        limits:
          cpus: '2'
          memory: 2G

  # HAProxy (paired with Keepalived) for high availability
  haproxy:
    image: haproxy:2.4-alpine
    container_name: haproxy
    volumes:
      - ./conf/haproxy.cfg:/usr/local/etc/haproxy/haproxy.cfg
    ports:
      - "80:80"
      - "443:443"
      - "8404:8404"  # stats page
    depends_on:
      - gateway-1
      - gateway-2
    networks:
      - gateway-network

  # Redis
  redis:
    image: redis:6-alpine
    container_name: redis
    command: redis-server --appendonly yes
    volumes:
      - redis-data:/data
    ports:
      - "6379:6379"
    networks:
      - gateway-network

  # Consul service discovery
  consul:
    image: consul:1.10
    container_name: consul
    command: agent -server -bootstrap-expect=1 -ui -client=0.0.0.0
    ports:
      - "8500:8500"
      - "8600:8600/udp"
    networks:
      - gateway-network

  # Prometheus monitoring
  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    volumes:
      - ./conf/prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus-data:/prometheus
    ports:
      - "9090:9090"
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
    networks:
      - gateway-network

  # Grafana dashboards
  grafana:
    image: grafana/grafana:latest
    container_name: grafana
    volumes:
      - grafana-data:/var/lib/grafana
      - ./conf/grafana/dashboards:/etc/grafana/provisioning/dashboards
      - ./conf/grafana/datasources:/etc/grafana/provisioning/datasources
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    networks:
      - gateway-network

networks:
  gateway-network:
    driver: bridge

volumes:
  redis-data:
  prometheus-data:
  grafana-data:
5.2 Canary Releases
-- gateway/canary.lua
-- Requires: lua_shared_dict canary_cache 10m;
local cjson = require "cjson"

local _M = {}

-- Canary routing strategies
function _M.route_canary()
    local uri = ngx.var.uri
    local headers = ngx.req.get_headers()

    -- Strategy 1: header-based canary
    if headers["X-Canary"] == "true" then
        return _M.get_canary_upstream()
    end

    -- Strategy 2: cookie-based canary
    local cookie_canary = ngx.var.cookie_canary
    if cookie_canary == "true" then
        return _M.get_canary_upstream()
    end

    -- Strategy 3: user-ID-based canary
    local user = ngx.ctx.user
    if user and _M.is_canary_user(user.user_id) then
        return _M.get_canary_upstream()
    end

    -- Strategy 4: percentage-based canary
    local canary_percentage = _M.get_canary_percentage(uri)
    if canary_percentage > 0 then
        local random = math.random(100)
        if random <= canary_percentage then
            return _M.get_canary_upstream()
        end
    end

    -- Default to the stable version
    return _M.get_stable_upstream()
end

-- Is this user in the canary cohort?
function _M.is_canary_user(user_id)
    local canary_users = ngx.shared.canary_cache:get("canary_users")
    if not canary_users then
        return false
    end
    canary_users = cjson.decode(canary_users)
    -- Explicit user list
    for _, id in ipairs(canary_users) do
        if id == user_id then
            return true
        end
    end
    -- User-ID ranges: route 10% of users by ID modulo
    local user_id_num = tonumber(user_id)
    if user_id_num and user_id_num % 100 < 10 then
        return true
    end
    return false
end

-- Canary percentage for a given URI
function _M.get_canary_percentage(uri)
    local canary_rules = ngx.shared.canary_cache:get("canary_rules")
    if not canary_rules then
        return 0
    end
    canary_rules = cjson.decode(canary_rules)
    for _, rule in ipairs(canary_rules) do
        if ngx.re.match(uri, rule.pattern) then
            return rule.percentage
        end
    end
    return 0
end

-- Canary upstream
function _M.get_canary_upstream()
    ngx.header["X-Canary-Version"] = "canary"
    return "canary_backend"
end

-- Stable upstream
function _M.get_stable_upstream()
    ngx.header["X-Canary-Version"] = "stable"
    return "stable_backend"
end

-- Admin API for canary control
function _M.control_api()
    local method = ngx.var.request_method
    local uri = ngx.var.uri
    if uri == "/api/canary/percentage" then
        if method == "GET" then
            _M.get_percentage_api()
        elseif method == "POST" then
            _M.set_percentage_api()
        end
    elseif uri == "/api/canary/users" then
        if method == "GET" then
            _M.get_users_api()
        elseif method == "POST" then
            _M.set_users_api()
        end
    end
end

return _M
6. Real-World Results and Performance Numbers
6.1 Performance Test Results
Gateway performance in our production environment after tuning:
| Metric | Value | Test conditions |
|---|---|---|
| QPS | 100,000+ | Single node, 8 cores / 16 GB |
| P99 latency | < 10 ms | Excluding business processing time |
| P95 latency | < 5 ms | Excluding business processing time |
| CPU usage | 40-60% | At peak |
| Memory usage | 2-4 GB | Including caches |
| Connections | 50,000+ | Concurrent connections |
6.2 Incident Case Studies
Case 1: Downstream service avalanche
• Problem: a core service failed and requests piled up en masse
• Fix: the circuit breaker opened automatically and served degraded responses, containing the blast radius
• Result: overall availability held at 99.9%
Case 2: DDoS attack
• Problem: an attack peaking at millions of requests per second
• Fix: layered rate limiting plus automatic IP blacklisting
• Result: the business was completely unaffected
7. Pitfalls and Best Practices
7.1 Pitfalls We Hit
1. Nginx reload dropping connections
• Problem: configuration updates required a reload, which interrupted in-flight connections
• Fix: dynamic configuration (as in section 2.1), avoiding reloads entirely
2. Memory leaks
• Problem: memory leaks in Lua scripts
• Fix: use connection pools correctly and release references promptly
3. DNS resolution caching
• Problem: upstream IP changes went unnoticed because resolved addresses were cached
• Fix: configure a resolver with a sensible DNS cache TTL, as in the sketch below
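A minimal sketch of that fix; the resolver address, hostname, and TTL are illustrative values, not from the original:

# Re-resolve upstream names instead of caching DNS forever.
# With a variable in proxy_pass, Nginx resolves at request time and
# honors the TTL cap set by the resolver directive.
resolver 10.0.0.2 valid=30s ipv6=off;
set $backend_host "api-service.internal";
proxy_pass http://$backend_host:8080;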
7.2 Best-Practice Recommendations
1. Migrate incrementally: don't rebuild everything at once; roll out in phases
2. Test thoroughly: load testing and failure drills are non-negotiable
3. Monitoring first: solid monitoring is the foundation of stability
4. Keep documentation current: maintain detailed runbooks and incident playbooks
5. Drill regularly: rehearse failures periodically to validate the HA design
Conclusion
Building a microservices gateway on Nginx is a systems-engineering effort that demands careful thought across architecture design, feature implementation, performance tuning, and high availability. The designs and code shared here have all been validated in production; I hope they serve as a useful reference.
The gateway is the choke point of a microservices architecture, and its importance is hard to overstate. A well-designed gateway not only provides a unified entry point but also substantially reduces the complexity of the services behind it and improves the maintainability of the whole system.