继上一篇介绍了skynet的网络部分之后,这一篇以网关gate.lua为例,简单分析下其串接和处理流程。

在官方给出的范例中,是以examples/main.lua作为启动脚本的,在此过程中会创建watchdog服务:

     local watchdog = skynet.newservice("watchdog")
skynet.call(watchdog, "lua", "start", {
port = ,
maxclient = max_client,
nodelay = true,
})

首先加载watchdog.lua脚本。而在watchdog.lua的加载过程中,创建了gate服务。加载gate.lua过程中,调用gateserver.start(gate),gateserver会向skynet注册socket协议的处理:

     skynet.register_protocol {
name = "socket",
id = skynet.PTYPE_SOCKET, -- PTYPE_SOCKET = 6
unpack = function ( msg, sz )
return netpack.filter( queue, msg, sz)
end,
dispatch = function (_, _, q, type, ...)
queue = q
if type then
MSG[type](...)
end
end
}

另外gateserver也会注册lua协议的处理,这里就不展开了。gateserver中会拦截skynet_socket_message;也会拦截部分lua消息(一般是由watchdog转发而来),并调用gate注册进来的回调。注意gateserver才是skynet消息的入口,gate只不过是个回调而已。至此,gate服务加载完毕。

watchdog服务加载完毕后,main.lua中接着调用watchdog的start方法,其参数分别指定了侦听的端口、最大客户端连接数、是否延迟等。看下watchdog的start方法:

 function CMD.start(conf)
skynet.call(gate, "lua", "open" , conf)
end

其紧接着调用gate的open方法,而这个方法在gateserver中被拦截了:

     function CMD.open( source, conf )
assert(not socket)
local address = conf.address or "0.0.0.0"
local port = assert(conf.port)
maxclient = conf.maxclient or
nodelay = conf.nodelay
skynet.error(string.format("Listen on %s:%d", address, port))
socket = socketdriver.listen(address, port)
socketdriver.start(socket)
if handler.open then
return handler.open(source, conf)
end
end

可以看到,在open方法中创建了socket并开始了侦听过程。回忆上篇,socket操作的LuaAPI作为socketdriver被实现在lua-socket.c文件中,看一眼这里的listen是如何交互的:

 static int
llisten(lua_State *L) {
const char * host = luaL_checkstring(L,);
int port = luaL_checkinteger(L,);
int backlog = luaL_optinteger(L,,BACKLOG);
struct skynet_context * ctx = lua_touserdata(L, lua_upvalueindex());
int id = skynet_socket_listen(ctx, host,port,backlog);
if (id < ) {
return luaL_error(L, "Listen error");
} lua_pushinteger(L,id);
return ;
}

析取参数,获取关联的skynet-context之后,调用skynet_socket.c的skynet_socket_listen:

 int
skynet_socket_listen(struct skynet_context *ctx, const char *host, int port, int backlog) {
uint32_t source = skynet_context_handle(ctx);
return socket_server_listen(SOCKET_SERVER, source, host, port, backlog);
}

拿到context-handle,这个handle在后续创建socket时会被关联起来。handle作为参数opaque传递入socket_server.c中的socket_server_listen方法中:

 int
socket_server_listen(struct socket_server *ss, uintptr_t opaque, const char * addr, int port, int backlog) {
int fd = do_listen(addr, port, backlog);
if (fd < ) {
return -;
}
struct request_package request;
int id = reserve_id(ss);
if (id < ) {
close(fd);
return id;
}
request.u.listen.opaque = opaque;
request.u.listen.id = id;
request.u.listen.fd = fd;
send_request(ss, &request, 'L', sizeof(request.u.listen));
return id;
}

在作了bind和listen之后,将此socket描述符打包为request写入socket_server的读管道,由socket_server_poll轮循处理。至此,gate服务中listen的流程已经非常清晰了。

再回到gateserver.CMD.open方法中,在socketdriver.listen之后,紧接着调用socketdriver.start开始网络事件的处理(具体细节请按上述流程参照源码),最后调用gate.lua的回调handler.open。在这个过程中,gate服务的skynet-context-handle已经与对应的socket绑定了,后续关于此socket的SOCKET消息都会转发到gate服务中来,具体到代码则是由gateserver接收过滤后,再做进一步的分发处理。

那么一个新的客户端连接是如何被接收创建的呢?

在socket_server_poll过程中,会检查是否有网络事件产生(以下是简化的代码):

 int socket_server_poll(struct socket_server *ss, struct socket_message * result, int * more) {
for (;;) {
// 管道select
...... // socket event check
...... // s: socket
switch (s->type) {
case SOCKET_TYPE_CONNECTING:
return report_connect(ss, s, result);
case SOCKET_TYPE_LISTEN: {
int ok = report_accept(ss, s, result);
if (ok > ) { return SOCKET_ACCEPT; }
17 if (ok < ) { return SOCKET_ERROR; }
// when ok == 0, retry
break;
}
23 // other
24 ......25 }
26 }

对于正在listen的socket(SOCKET_TYPE_LISTEN),当发生事件(即侦测到有新的连接)时,会在report_accept中accept得连接描述符fd并创建socket结构,然后返回SOCKET_ACCEPT交由上层的skynet_socket.c:skynet_socket_poll处理,后者会封装类型为SKYNET_SOCKET_TYPE_ACCEPT的skynet-message并推入到gate服务的队列中去,最终转发到gateserver中所注册的SOCKET协议入口。

回到gateserver中来(见上述gateserver摘录的代码),在接收到网络消息时,先是在unpack中通过netpack(见lua-netpack.c)合并过滤消息,比如TCP消息粘包等(注意skynet_socket_message如果其padding为true,则表示非数据的命令,比如SKYNET_SOCKET_TYPE_ACCEPT)。对于SKYNET_SOCKET_TYPE_ACCEPT命令,netpack会解析并转换为open命令,最后gateserver会调用到MSG.open:

     function MSG.open(fd, msg)
if client_number >= maxclient then
socketdriver.close(fd)
return
end
if nodelay then
socketdriver.nodelay(fd)
end
connection[fd] = true
client_number = client_number +
handler.connect(fd, msg)
end

回调到gate.lua中的connect:

 function handler.connect(fd, addr)
local c = {
fd = fd,
ip = addr,
}
connection[fd] = c
skynet.send(watchdog, "lua", "socket", "open", fd, addr)
end

watchdog.lua中的SOCKET.open:

 function SOCKET.open(fd, addr)
skynet.error("New client from : " .. addr)
agent[fd] = skynet.newservice("agent")
skynet.call(agent[fd], "lua", "start", { gate = gate, client = fd, watchdog = skynet.self() })
end

此时,会创建玩家agent服务,并调用start方法:

 function CMD.start(conf)
local fd = conf.client
local gate = conf.gate
WATCHDOG = conf.watchdog
-- slot 1,2 set at main.lua
host = sprotoloader.load():host "package"
send_request = host:attach(sprotoloader.load())
skynet.fork(function()
while true do
send_package(send_request "heartbeat")
skynet.sleep()
end
end) client_fd = fd
skynet.call(gate, "lua", "forward", fd)
end

agent拿到gate服务的标识后,调用forward将自己的标识注册到gate服务中来:

 function CMD.forward(source, fd, client, address)
local c = assert(connection[fd])
unforward(c)
c.client = client or
c.agent = address or source
forwarding[c.agent] = c
gateserver.openclient(fd)
end

gateserver.openclient开始侦听此socket的事件:

function gateserver.openclient(fd)
if connection[fd] then
socketdriver.start(fd)
end
end

至此,一个新连接的建立流程就结束了。那么连接建立后网络数据又是如何转发进来的呢?流程依然是一致的,socket_server_poll侦测到READ后读取数据并转发到gateserver中来,后者调用netpack对数据粘包,此时会调用MSG.more或MSG.data将数据转交给gate.message:

 function handler.message(fd, msg, sz)
-- recv a package, forward it
local c = connection[fd]
local agent = c.agent
if agent then
skynet.redirect(agent, c.client, "client", , msg, sz)
else
skynet.send(watchdog, "lua", "socket", "data", fd, netpack.tostring(msg, sz))
end
end

直接将数据转发给之前通过forward注册进来的agent,采用的协议是"client"。agent中需注册此协议的处理,拿到字节流并根据上层的业务协议对数据转码,做进一步的处理。这样,数据接收和转发的流程就结束了。其它方面,比如数据发送,关闭socket等等,流程上都是一致的,具体细节不再详述。

最新文章

  1. 【.net 深呼吸】写入日志文件
  2. HTTP 错误 404.3 - Not Found 由于扩展配置问题而无法提供您请求的页面。如果该页面是脚本 ,请添加处理程序。如果下载文件,请添加 MIME 映射。 IIS站点中添加WCF项目后浏览网站报错解决方法。
  3. win7快捷键
  4. ActiveMQ入门实例
  5. Launching web on MyEclipse Tomcat 问题
  6. HDU 2255 &amp; KM模板
  7. CSS 编码规范
  8. UVa 11468 (AC自动机 概率DP) Substring
  9. 安装ADT Cannot complete the install because one or more required items could not be found.
  10. Android权限Uri.parse
  11. smokeping报错Can&#39;t locate RRDs.pm in @INC (@INC contains
  12. hdu 3917 最大重量封闭图
  13. CDN云主机与传统虚拟主机功能对比
  14. java载入XML文件并解析xml
  15. 分布式一致性算法:Raft 算法(论文翻译)
  16. ●BZOJ 4008 [HNOI2015]亚瑟王
  17. DVWA-暴力破解学习笔记
  18. laravel 图片验证码
  19. SQL Server datetime类型转换超出范围的报错
  20. cmake与autoconf+automake

热门文章

  1. Spring Task Schedule 及多线程
  2. LeetCode OJ--Search in Rotated Sorted Array II
  3. Working with multiple environments
  4. Wannafly挑战赛11 D 题 字符串hash + 卡常
  5. (7)C#连DB2---oledb方式
  6. HDU - 5974 A Simple Math Problem (数论 GCD)
  7. go语言学习之路四:字典
  8. Java8期间及持续时间
  9. POJ 题目3450 Corporate Identity(KMP 暴力)
  10. 谷歌訪问之直接输入ip地址