侧边栏壁纸
  • 累计撰写 39 篇文章
  • 累计创建 1 个标签
  • 累计收到 3 条评论
标签搜索

Apisix 源码分析以及手撸一个简单版本的 Apisix

mousycoder
2022-01-20 / 0 评论 / 0 点赞 / 264 阅读 / 20,423 字
温馨提示:
本文最后更新于 2022-01-20,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

APISIX 概述

APISIX 与 Kong 类似,是一个基于 OpenResty 构建的 API 网关,APISIX 不同于 Kong 的地方,例如 etcd 数据变化监听、强大的缓存机制、以及在性能优化上做的尝试。

项目概述

APISIX 是基于 OpenResty 开发的 API 网关,与 OpenResty 的请求生命周期一致,APISIX 利用 Lua Nginx Module 提供的 *_by_lua 添加 Hook
QbKIVf
APISIX 抽象了 Route、Service、Upstream、Plugin、Consumer 等数据模型,与 Kong 网关如出一辙。
cczXLs
YunX6x
基本上可以看作 APISIX 是 Kong 网关的重构——运用大量 LuaJIT、OpenResty 技巧优化性能、简化复杂的数据结构、替换储存引擎为 etcd 等。

生态概述

Kong 网关开源生态有的,APISIX 基本都有或者正在做。包含:Kubernetes Ingress Controller、Mesh、Dashboard。
插件方面比 Kong 开源版本多了 Skywalking APM 数据上报、Traffit 流量拆分、Mirror 流量镜像等功能。

阅读源码目的

  1. 对一个项目有着全局思维,遇到问题能快速找到关联点
  2. 克服心里障碍,遇到坑后查阅各种资料无果

什么时候开始阅读源码

对这个开源项目使用已经很熟练了,用来解决什么问题,设计思想都比较清楚的前提下

源码分析方法论

  1. 整体模块结构
  2. 找到启动类
  3. 分清主干和枝节代码
  4. 充分利用源码项目的测试类
  5. 要学会一些调试技巧
  6. 看 git 提交记录,寻找最小版 demo
  7. 带着问题去思考
  8. 和开发人员直接沟通

基本流程

本节概述 APISIX 的目录结构,以及其启动流程。

目录结构

$ tree -L 2
.
├── apisix
│   ├── admin # Admin API
│   ├── api_router.lua
│   ├── balancer # 负载均衡器
│   ├── balancer.lua
│   ├── cli # CLI, Lua 脚本
│   ├── constants.lua # 常量
│   ├── consumer.lua
│   ├── control
│   ├── core # 主要是封装的公共方法
│   ├── core.lua
│   ├── debug.lua
│   ├── discovery # 服务发现, 支持 consul, eruka, dns
│   ├── http
│   ├── init.lua # _by_lua 函数入口
│   ├── patch.lua
│   ├── plugin_config.lua
│   ├── plugin.lua # 插件
│   ├── plugins
│   ├── router.lua # Router
│   ├── schema_def.lua # jsonschema 定义
│   ├── script.lua
│   ├── ssl
│   ├── ssl.lua
│   ├── stream
│   ├── timers.lua # timer 封装
│   ├── upstream.lua
│   └── utils
├── bin
│   └── apisix # apisix CLI, shell 脚本
├── ci # CI 脚本
├── conf # 默认配置文件
├── deps
├── docs
├── Makefile # 快捷指令
├── rockspec # luarocks 包管理
├── t # Test::Nginx 测试
└── utils # Shell 脚本

启动流程

  • make run 调用 ./bin/apisix start
  • 寻找 luajit 路径 运行 /usr/local/Cellar/openresty/1.19.9.1_2/luajit/bin/luajit ./apisix/cli/apisix.lua start
  • 调用 ops.lua 里的 start 方法,初始化配置,ETCD,执行openresty 启动命令
local function start(env, ...)
    init(env)
    init_etcd(env, args)
    util.execute_cmd(env.openresty_args)
end
  • 初始化 nginx 配置,通过读取 conf/config.yaml 配合模板 ngx_tpl.lua 生成 nginx config 文件。供 openresty(nginx)使用
 local conf_render = template.compile(ngx_tpl)
    local ngxconf = conf_render(sys_conf)

    local ok, err = util.write_file(env.apisix_home .. "/conf/nginx.conf",
                                    ngxconf)
    if not ok then
        util.die("failed to update nginx.conf: ", err, "\n")
    end
  • 初始化 ETCD,读取ETCD集群配置,进行连接
local version_url = host .. "/version"
        local errmsg

        local res, err
        local retry_time = 0
        while retry_time < 2 do
            res, err = request(version_url, yaml_conf)
            -- In case of failure, request returns nil followed by an error message.
            -- Else the first return value is the response body
            -- and followed by the response status code.
            if res then
                break
            end
            retry_time = retry_time + 1
            print(str_format("Warning! Request etcd endpoint \'%s\' error, %s, retry time=%s",
                             version_url, err, retry_time))
        end
  • 执行openresty 启动命令 openresty -p /usr/local/apisix -c /conf/nginx.conf
    local openresty_args = [[openresty -p ]] .. apisix_home .. [[ -c ]]
                           .. apisix_home .. [[/conf/nginx.conf]]

    util.execute_cmd(env.openresty_args)

插件流程

nginx 配置中嵌入的 apisix 流程

 init_by_lua_block {
        require "resty.core"
        apisix = require("apisix")

        local dns_resolver = { "172.19.2.70", "172.19.2.62", }
        local args = {
            dns_resolver = dns_resolver,
        }
        apisix.http_init(args)
}
    
init_worker_by_lua_block {
        apisix.http_init_worker()
}

exit_worker_by_lua_block {
        apisix.http_exit_worker()
}

access_by_lua_block {
                apisix.http_access_phase()
}
header_filter_by_lua_block {
                apisix.http_header_filter_phase()
}
body_filter_by_lua_block {
                apisix.http_body_filter_phase()
}
log_by_lua_block {
                apisix.http_log_phase()
}

proxy_pass      $upstream_scheme://apisix_backend$upstream_uri;

upstream apisix_backend {
        server 0.0.0.1;

        balancer_by_lua_block {
            apisix.http_balancer_phase()
        }

        keepalive 320;
        keepalive_requests 1000;
        keepalive_timeout 60s;
}



  • apisix.http_init
    1.设置 dns resolver
    2.设置实例id
    3.启动 privileged agent
    core.resolver.init_resolver(args)
    core.id.init()
    local process = require("ngx.process")
    local ok, err = process.enable_privileged_agent()
  • apisix.http_init_worker
function _M.http_init_worker()
    local seed, err = core.utils.get_seed_from_urandom()
    if not seed then
        core.log.warn('failed to get seed from urandom: ', err)
        seed = ngx_now() * 1000 + ngx.worker.pid()
    end
    math.randomseed(seed)
    -- for testing only
    core.log.info("random test in [1, 10000]: ", math.random(1, 10000))

    local we = require("resty.worker.events")
    local ok, err = we.configure({shm = "worker-events", interval = 0.1})
    if not ok then
        error("failed to init worker event: " .. err)
    end
    local discovery = require("apisix.discovery.init").discovery
    if discovery and discovery.init_worker then
        discovery.init_worker()
    end
    require("apisix.balancer").init_worker()
    load_balancer = require("apisix.balancer")
    require("apisix.admin.init").init_worker()

    require("apisix.timers").init_worker()

    plugin.init_worker()
    router.http_init_worker()
    require("apisix.http.service").init_worker()
    plugin_config.init_worker()
    require("apisix.consumer").init_worker()

    if core.config == require("apisix.core.config_yaml") then
        core.config.init_worker()
    end

    require("apisix.debug").init_worker()
    apisix_upstream.init_worker()
    require("apisix.plugins.ext-plugin.init").init_worker()

    local_conf = core.config.local_conf()

    if local_conf.apisix and local_conf.apisix.enable_server_tokens == false then
        ver_header = "APISIX"
    end
end

1.初始化 openresty worker event

    local we = require("resty.worker.events")
    local ok, err = we.configure({shm = "worker-events", interval = 0.1})
    if not ok then
        error("failed to init worker event: " .. err)
    end


2.初始化服务发现
3.初始化balancer组件
4.初始化admin组件(会同步一次插件配置到 etcd)

     sync_local_conf_to_etcd(true)

5.初始化timers组件
6.初始化 plugin 组件(清掉旧的 table, 然后把从config-default.yaml 和 config.yaml 文件中读取插件配置放到一个 local_plugins_hash中,并按优先级排序)

local local_conf_path = profile:yaml_path("config-default")
local default_conf_yaml, err = util.read_file(local_conf_path)
local_conf_path = profile:yaml_path("config")
local user_conf_yaml, err = util.read_file(local_conf_path)
 ok, err = merge_conf(default_conf, user_conf)


 local local_plugins = core.table.new(32, 0)

  for name in pairs(local_plugins_hash) do
        unload_plugin(name)
    end

    core.table.clear(local_plugins)
    core.table.clear(local_plugins_hash)

    for name in pairs(processed) do
        load_plugin(name, local_plugins)
    end

    -- sort by plugin's priority
    if #local_plugins > 1 then
        sort_tab(local_plugins, sort_plugin)
    end

    local plugin_metadatas, err = core.config.new("/plugin_metadata",
        {automatic = true}
    )

7.初始化router组件(初始化 etcd /global_rules 数据)

    local global_rules, err = core.config.new("/global_rules", {
            automatic = true,
            item_schema = core.schema.global_rule,
            checker = plugin_checker,
        })

8.初始化 servers 组件(从ETCD抓取 services 配置)

services, err = core.config.new("/services", {
        automatic = true,
        item_schema = core.schema.service,
        checker = plugin_checker,
        filter = filter,
    })

9.初始化sonsumer组件
10.同步 config_yaml 到各个进程

    -- sync data in each non-master process
    ngx.timer.every(1, read_apisix_yaml)

11.初始化upstream组件
12.初始化 ext-plugin 组件

  • apisix.http_exit_worker()
    停止 "privileged agent"
  • apisix.http_access_phase()
function _M.http_access_phase()
    local ngx_ctx = ngx.ctx

    if not verify_tls_client(ngx_ctx.api_ctx) then
        return core.response.exit(400)
    end

    -- always fetch table from the table pool, we don't need a reused api_ctx
    local api_ctx = core.tablepool.fetch("api_ctx", 0, 32)
    ngx_ctx.api_ctx = api_ctx

    core.ctx.set_vars_meta(api_ctx)

    local uri = api_ctx.var.uri
    if local_conf.apisix and local_conf.apisix.delete_uri_tail_slash then
        if str_byte(uri, #uri) == str_byte("/") then
            api_ctx.var.uri = str_sub(api_ctx.var.uri, 1, #uri - 1)
            core.log.info("remove the end of uri '/', current uri: ",
                          api_ctx.var.uri)
        end
    end

    if router.api.has_route_not_under_apisix() or
        core.string.has_prefix(uri, "/apisix/")
    then
        local skip = local_conf and local_conf.apisix.global_rule_skip_internal_api
        local matched = router.api.match(api_ctx, skip)
        if matched then
            return
        end
    end

    router.router_http.match(api_ctx)

    local route = api_ctx.matched_route
    if not route then
        -- run global rule
        plugin.run_global_rules(api_ctx, router.global_rules, nil)

        core.log.info("not find any matched route")
        return core.response.exit(404,
                    {error_msg = "404 Route Not Found"})
    end

    core.log.info("matched route: ",
                  core.json.delay_encode(api_ctx.matched_route, true))

    local enable_websocket = route.value.enable_websocket

    if route.value.plugin_config_id then
        local conf = plugin_config.get(route.value.plugin_config_id)
        if not conf then
            core.log.error("failed to fetch plugin config by ",
                            "id: ", route.value.plugin_config_id)
            return core.response.exit(503)
        end

        route = plugin_config.merge(route, conf)
    end

    if route.value.service_id then
        local service = service_fetch(route.value.service_id)
        if not service then
            core.log.error("failed to fetch service configuration by ",
                           "id: ", route.value.service_id)
            return core.response.exit(404)
        end

        route = plugin.merge_service_route(service, route)
        api_ctx.matched_route = route
        api_ctx.conf_type = "route&service"
        api_ctx.conf_version = route.modifiedIndex .. "&" .. service.modifiedIndex
        api_ctx.conf_id = route.value.id .. "&" .. service.value.id
        api_ctx.service_id = service.value.id
        api_ctx.service_name = service.value.name

        if enable_websocket == nil then
            enable_websocket = service.value.enable_websocket
        end

    else
        api_ctx.conf_type = "route"
        api_ctx.conf_version = route.modifiedIndex
        api_ctx.conf_id = route.value.id
    end
    api_ctx.route_id = route.value.id
    api_ctx.route_name = route.value.name

    -- run global rule
    plugin.run_global_rules(api_ctx, router.global_rules, nil)

    if route.value.script then
        script.load(route, api_ctx)
        script.run("access", api_ctx)

    else
        local plugins = plugin.filter(route)
        api_ctx.plugins = plugins

        plugin.run_plugin("rewrite", plugins, api_ctx)
        if api_ctx.consumer then
            local changed
            route, changed = plugin.merge_consumer_route(
                route,
                api_ctx.consumer,
                api_ctx
            )

            core.log.info("find consumer ", api_ctx.consumer.username,
                          ", config changed: ", changed)

            if changed then
                api_ctx.matched_route = route
                core.table.clear(api_ctx.plugins)
                api_ctx.plugins = plugin.filter(route, api_ctx.plugins)
            end
        end
        plugin.run_plugin("access", plugins, api_ctx)
    end

    local up_id = route.value.upstream_id

    -- used for the traffic-split plugin
    if api_ctx.upstream_id then
        up_id = api_ctx.upstream_id
    end

    if up_id then
        local upstream = get_upstream_by_id(up_id)
        api_ctx.matched_upstream = upstream

    else
        if route.has_domain then
            local err
            route, err = parse_domain_in_route(route)
            if err then
                core.log.error("failed to get resolved route: ", err)
                return core.response.exit(500)
            end

            api_ctx.conf_version = route.modifiedIndex
            api_ctx.matched_route = route
        end

        local route_val = route.value
        if route_val.upstream and route_val.upstream.enable_websocket then
            enable_websocket = true
        end

        api_ctx.matched_upstream = (route.dns_value and
                                    route.dns_value.upstream)
                                   or route_val.upstream
    end

    if enable_websocket then
        api_ctx.var.upstream_upgrade    = api_ctx.var.http_upgrade
        api_ctx.var.upstream_connection = api_ctx.var.http_connection
        core.log.info("enabled websocket for route: ", route.value.id)
    end

    if route.value.service_protocol == "grpc" then
        api_ctx.upstream_scheme = "grpc"
    end

    local code, err = set_upstream(route, api_ctx)
    if code then
        core.log.error("failed to set upstream: ", err)
        core.response.exit(code)
    end

    local server, err = load_balancer.pick_server(route, api_ctx)
    if not server then
        core.log.error("failed to pick server: ", err)
        return core.response.exit(502)
    end

    api_ctx.picked_server = server

    set_upstream_headers(api_ctx, server)

    -- run the before_proxy method in access phase first to avoid always reinit request
    common_phase("before_proxy")

    local ref = ctxdump.stash_ngx_ctx()
    core.log.info("stash ngx ctx: ", ref)
    ngx_var.ctx_ref = ref

    local up_scheme = api_ctx.upstream_scheme
    if up_scheme == "grpcs" or up_scheme == "grpc" then
        return ngx.exec("@grpc_pass")
    end

    if api_ctx.dubbo_proxy_enabled then
        return ngx.exec("@dubbo_pass")
    end
end

1.初始化 api_ctx 上下文
2.client tls 验证
3.是否为 apisix 已经注册的路径,对请求进行匹配,内部使用 Radix tree 进行匹配
4.向上下文注入该请求匹配的 apisix route ,service 等信息用于后续阶段使用
5.向上下文注入该请求相关的插件,例如:请求对应的路由存在插件,若请求存在对应的 service 则加入 service 定义的插件,以及全局插件
6.调用 "rewrite" 阶段的插件
7.调用 "access" 阶段的插件
8.获取 upstream
9.执行 loadbalancer 选择 server
10.调用 "balancer" 阶段插件
11.判断 upstream ,根据 upstream 类型,grpc dubbo 等进入 @grpc_pass @dubbo_pass 等不同的后续处理流程。 这些配置可在 nginx.conf 中查看

  • apisix.http_balancer_phase()
function _M.http_header_filter_phase()
    if ngx_var.ctx_ref ~= '' then
        -- prevent for the table leak
        local stash_ctx = fetch_ctx()

        -- internal redirect, so we should apply the ctx
        if ngx_var.from_error_page == "true" then
            ngx.ctx = stash_ctx
        end
    end

    core.response.set_header("Server", ver_header)

    local up_status = get_var("upstream_status")
    if up_status and #up_status == 3
       and tonumber(up_status) >= 500
       and tonumber(up_status) <= 599
    then
        set_resp_upstream_status(up_status)
    elseif up_status and #up_status > 3 then
        -- the up_status can be "502, 502" or "502, 502 : "
        local last_status
        if str_byte(up_status, -1) == str_byte(" ") then
            last_status = str_sub(up_status, -6, -3)
        else
            last_status = str_sub(up_status, -3)
        end

        if tonumber(last_status) >= 500 and tonumber(last_status) <= 599 then
            set_resp_upstream_status(up_status)
        end
    end

    common_phase("header_filter")
end

1.设置头 "Server", APISIX
2.设置上游状态头:X-APISIX-Upstream-Status
3.执行 “header_filter” 阶段的插件

  • apisix.http_body_filter_phase()
function _M.http_body_filter_phase()
    common_phase("body_filter")
end

执行 “body_filter” 阶段的插件

  • apisix.http_log_phase()
function _M.http_log_phase()
    if ngx_var.ctx_ref ~= '' then
        -- prevent for the table leak
        local stash_ctx = fetch_ctx()

        -- internal redirect, so we should apply the ctx
        if ngx_var.from_error_page == "true" then
            ngx.ctx = stash_ctx
        end
    end

    local api_ctx = common_phase("log")
    if not api_ctx then
        return
    end

    healthcheck_passive(api_ctx)

    if api_ctx.server_picker and api_ctx.server_picker.after_balance then
        api_ctx.server_picker.after_balance(api_ctx, false)
    end

    if api_ctx.uri_parse_param then
        core.tablepool.release("uri_parse_param", api_ctx.uri_parse_param)
    end

    core.ctx.release_vars(api_ctx)
    if api_ctx.plugins then
        core.tablepool.release("plugins", api_ctx.plugins)
    end

    if api_ctx.curr_req_matched then
        core.tablepool.release("matched_route_record", api_ctx.curr_req_matched)
    end

    core.tablepool.release("api_ctx", api_ctx)
end

1.执行 “log” 阶段的插件
2.回收 plugins, matched_route_record,api_ctx 缓存

  • apisix.http_balancer_phase()
function _M.http_balancer_phase()
    local api_ctx = ngx.ctx.api_ctx
    if not api_ctx then
        core.log.error("invalid api_ctx")
        return core.response.exit(500)
    end

    load_balancer.run(api_ctx.matched_route, api_ctx, common_phase)
end

1.设置 balance 的基本参数(超时时间,连接失败重试次数)
2.选择上游服务器,采用一致性hash算法
3.调用 set_current_peer 设置 proxy_pass

自定义插件逻辑

EwBSkx
pEM0oa
XNMkUO
Y2PrOC
通过 common_phase 作为自定义插件的方法的公共入口,被 openresty 各个环节调用

local function common_phase(phase_name)
    local api_ctx = ngx.ctx.api_ctx
    if not api_ctx then
        return
    end

    plugin.run_global_rules(api_ctx, api_ctx.global_rules, phase_name)

    if api_ctx.script_obj then
        script.run(phase_name, api_ctx)
        return api_ctx, true
    end

    return plugin.run_plugin(phase_name, nil, api_ctx)
end

真正执行自定义插件逻辑

 for i = 1, #plugins, 2 do
            local phase_func = plugins[i][phase]
            if phase_func then
                plugin_run = true
                local code, body = phase_func(plugins[i + 1], api_ctx)
                if code or body then
                    if is_http then
                        if code >= 400 then
                            core.log.warn(plugins[i].name, " exits with http status code ", code)
                        end

                        core.response.exit(code, body)
                    else
                        if code >= 400 then
                            core.log.warn(plugins[i].name, " exits with status code ", code)
                        end

                        ngx_exit(1)
                    end
                end
            end
        end

admin api

        location /apisix/admin {
            set $upstream_scheme             'http';
            set $upstream_host               $http_host;
            set $upstream_uri                '';

                allow 127.0.0.0/24;
                deny all;

            content_by_lua_block {
                apisix.http_admin()
            }
        }

自定义插件

入口

local function common_phase(phase_name)
    local api_ctx = ngx.ctx.api_ctx
    if not api_ctx then
        return
    end

    plugin.run_global_rules(api_ctx, api_ctx.global_rules, phase_name)

    if api_ctx.script_obj then
        script.run(phase_name, api_ctx)
        return api_ctx, true
    end

    return plugin.run_plugin(phase_name, nil, api_ctx)
end

已知的阶段
preread
ssl
access
balancer
rewrite
header_filter
body_filter
log

admin api

入口

        location /apisix/admin {
            set $upstream_scheme             'http';
            set $upstream_host               $http_host;
            set $upstream_uri                '';

                allow 127.0.0.0/24;
                deny all;

            content_by_lua_block {
                apisix.http_admin()
            }
        }

根据路由进行转发

    local ok = router:dispatch(get_var("uri"), {method = get_method()})

具体路由 dispatch 逻辑

function _M.dispatch(self, path, opts, ...)
    if type(path) ~= "string" then
        error("invalid argument path", 2)
    end

    local args
    local len = select('#', ...)
    if len > 0 then
        if not self.args then
            self.args = {...}
        else
            clear_tab(self.args)
            for i = 1, len do
                self.args[i] = select(i, ...)
            end
        end

        -- To keep the self.args in safe,
        -- we can't yield until filter_fun is called
        args = self.args
        args[0] = len
    end

    local route, err = match_route(self, path, opts or empty_table, args)
    if not route then
        if err then
            return nil, err
        end
        return nil
    end

    local handler = route.handler
    if not handler or type(handler) ~= "function" then
        return nil, "missing handler"
    end

    handler(...)
    return true
end

路由与hanlder 的关系

local uri_route = {
    {
        paths = [[/apisix/admin/*]],
        methods = {"GET", "PUT", "POST", "DELETE", "PATCH"},
        handler = run,
    },
    {
        paths = [[/apisix/admin/stream_routes/*]],
        methods = {"GET", "PUT", "POST", "DELETE", "PATCH"},
        handler = run_stream,
    },
    {
        paths = [[/apisix/admin/plugins/list]],
        methods = {"GET"},
        handler = get_plugins_list,
    },
    {
        paths = reload_event,
        methods = {"PUT"},
        handler = post_reload_plugins,
    },
}

调用 run 方法


local function run()
    local api_ctx = {}
    core.ctx.set_vars_meta(api_ctx)
    ngx.ctx.api_ctx = api_ctx

    local ok, err = check_token(api_ctx)
    if not ok then
        core.log.warn("failed to check token: ", err)
        core.response.exit(401)
    end

    local uri_segs = core.utils.split_uri(ngx.var.uri)
    core.log.info("uri: ", core.json.delay_encode(uri_segs))

    -- /apisix/admin/schema/route
    local seg_res, seg_id = uri_segs[4], uri_segs[5]
    local seg_sub_path = core.table.concat(uri_segs, "/", 6)
    if seg_res == "schema" and seg_id == "plugins" then
        -- /apisix/admin/schema/plugins/limit-count
        seg_res, seg_id = uri_segs[5], uri_segs[6]
        seg_sub_path = core.table.concat(uri_segs, "/", 7)
    end

    local resource = resources[seg_res]
    if not resource then
        core.response.exit(404)
    end

    local method = str_lower(get_method())
    if not resource[method] then
        core.response.exit(404)
    end

    local req_body, err = core.request.get_body(MAX_REQ_BODY)
    if err then
        core.log.error("failed to read request body: ", err)
        core.response.exit(400, {error_msg = "invalid request body: " .. err})
    end

    if req_body then
        local data, err = core.json.decode(req_body)
        if not data then
            core.log.error("invalid request body: ", req_body, " err: ", err)
            core.response.exit(400, {error_msg = "invalid request body: " .. err,
                                     req_body = req_body})
        end

        req_body = data
    end

    local uri_args = ngx.req.get_uri_args() or {}
    if uri_args.ttl then
        if not tonumber(uri_args.ttl) then
            core.response.exit(400, {error_msg = "invalid argument ttl: "
                                                 .. "should be a number"})
        end
    end

    local code, data = resource[method](seg_id, req_body, seg_sub_path,
                                        uri_args)
    if code then
        data = strip_etcd_resp(data)
        core.response.exit(code, data)
    end
end

根据 resource 定义找到对应的 lua 模板的执行方法

local resources = {
    routes          = require("apisix.admin.routes"),
    services        = require("apisix.admin.services"),
    upstreams       = require("apisix.admin.upstreams"),
    consumers       = require("apisix.admin.consumers"),
    schema          = require("apisix.admin.schema"),
    ssl             = require("apisix.admin.ssl"),
    plugins         = require("apisix.admin.plugins"),
    proto           = require("apisix.admin.proto"),
    global_rules    = require("apisix.admin.global_rules"),
    stream_routes   = require("apisix.admin.stream_routes"),
    plugin_metadata = require("apisix.admin.plugin_metadata"),
    plugin_configs  = require("apisix.admin.plugin_config"),
}

执行对应模块的对应的方法,例如:
GET http://127.0.0.1:9080/apisix/admin/plugin_metadata/whalefin-i18n
就是直接调用 plugin_metadata 的 get 方法

function _M.get(key)

    local path = "/plugin_metadata"
    if key then
        path = path .. "/" .. key
    end

    local res, err = core.etcd.get(path, not key)
    if not res then
        core.log.error("failed to get metadata[", key, "]: ", err)
        return 503, {error_msg = err}
    end

    return res.status, res.body
end

control api

入口 apisix.http_control()

server {
        listen 127.0.0.1:9090;

        access_log off;

        location / {
            content_by_lua_block {
                apisix.http_control()
            }
        }

        location @50x.html {
            set $from_error_page 'true';
            content_by_lua_block {
                require("apisix.error_handling").handle_500()
            }
        }
    }

先注册所有的插件的 control_api 方法,调用 router:dispatch 进行路由分发

function _M.match(uri)
    if cached_version ~= plugin_mod.load_times then
        local err
        router, err = fetch_control_api_router()
        if router == nil then
            core.log.error("failed to fetch valid api router: ", err)
            return false
        end

        cached_version = plugin_mod.load_times
    end

    core.table.clear(match_opts)
    match_opts.method = get_method()

    return router:dispatch(uri, match_opts)
end

例如:server_info 插件,注册路径 /v1/server_info 并指定使用 get_server_info函数进行处理

function _M.control_api()
    return {
        {
            methods = {"GET"},
            uris ={"/v1/server_info"},
            handler = get_server_info,
        }
    }
end

注册 plugin的 control_api 方法

    for _, plugin in ipairs(plugin_mod.plugins) do
        local api_fun = plugin.control_api
        if api_fun then
            local api_route = api_fun()
            register_api_routes(routes, api_route)
        end
    end

通过dispatch 方法调用插件的 handler 方法

   local route, err = match_route(self, path, opts or empty_table, args)
    if not route then
        if err then
            return nil, err
        end
        return nil
    end

    local handler = route.handler
    if not handler or type(handler) ~= "function" then
        return nil, "missing handler"
    end

    handler(...)
0

评论区