最近看了slave IO的源码,发现slave IO的写relay log貌似是单线程单连接的,这让我有点小失望。

slave IO的主函数是handle_slave_io,处理流程如下:

图1 handle_slave_io处理流程

我们这次主要要完成safe_connect以及try_to_reconnet用到的核心函数 mysql_real_connect流程的探索。

一、mysql_real_connect流程

在这之前我们需要弄明白连接mysql需要那几步操作,参考自官网的文档(http://dev.mysql.com/doc/internals/en/plain-handshake.html),据说连接时需要以下操作:

图2 mysql_real_connect操作流程

1.建立与mysql的连接

对于需要连接的建立一个监听端口,然后建立与链表中的所有服务端建立连接,并绑定到监听端口

   if (!net->vio &&
(!mysql->options.protocol ||
mysql->options.protocol == MYSQL_PROTOCOL_SOCKET) &&
(unix_socket || mysql_unix_port) &&
(!host || !strcmp(host,LOCAL_HOST)))
{
my_socket sock= socket(AF_UNIX, SOCK_STREAM, );
DBUG_PRINT("info", ("Using socket"));
if (sock == SOCKET_ERROR)
{
set_mysql_extended_error(mysql, CR_SOCKET_CREATE_ERROR,
unknown_sqlstate,
ER(CR_SOCKET_CREATE_ERROR),
socket_errno);
goto error;
} net->vio= vio_new(sock, VIO_TYPE_SOCKET,
VIO_LOCALHOST | VIO_BUFFERED_READ);
if (!net->vio)
{
DBUG_PRINT("error",("Unknow protocol %d ", mysql->options.protocol));
set_mysql_error(mysql, CR_CONN_UNKNOW_PROTOCOL, unknown_sqlstate);
closesocket(sock);
goto error;
} host= LOCAL_HOST;
if (!unix_socket)
unix_socket= mysql_unix_port;
host_info= (char*) ER(CR_LOCALHOST_CONNECTION);
DBUG_PRINT("info", ("Using UNIX sock '%s'", unix_socket)); memset(&UNIXaddr, , sizeof(UNIXaddr));
UNIXaddr.sun_family= AF_UNIX;
strmake(UNIXaddr.sun_path, unix_socket, sizeof(UNIXaddr.sun_path)-); if (vio_socket_connect(net->vio, (struct sockaddr *) &UNIXaddr,
sizeof(UNIXaddr), get_vio_connect_timeout(mysql)))
{
DBUG_PRINT("error",("Got error %d on connect to local server",
socket_errno));
set_mysql_extended_error(mysql, CR_CONNECTION_ERROR,
unknown_sqlstate,
ER(CR_CONNECTION_ERROR),
unix_socket, socket_errno);
vio_delete(net->vio);
net->vio= ;
goto error;
}
mysql->options.protocol=MYSQL_PROTOCOL_SOCKET;
}
 for (t_res= res_lst; t_res; t_res= t_res->ai_next)
{
DBUG_PRINT("info", ("Create socket, family: %d type: %d proto: %d",
t_res->ai_family, t_res->ai_socktype,
t_res->ai_protocol)); sock= socket(t_res->ai_family, t_res->ai_socktype, t_res->ai_protocol);
if (sock == SOCKET_ERROR)
{
DBUG_PRINT("info", ("Socket created was invalid"));
/* Try next address if there is one */
saved_error= socket_errno;
continue;
} if (client_bind_ai_lst)
{
struct addrinfo* curr_bind_ai= NULL;
DBUG_PRINT("info", ("Attempting to bind socket to bind address(es)")); /*
We'll attempt to bind to each of the addresses returned, until
we find one that works.
If none works, we'll try the next destination host address
(if any)
*/
curr_bind_ai= client_bind_ai_lst; while (curr_bind_ai != NULL)
{
/* Attempt to bind the socket to the given address */
bind_result= bind(sock,
curr_bind_ai->ai_addr,
curr_bind_ai->ai_addrlen);
if (!bind_result)
break; /* Success */ DBUG_PRINT("info", ("bind failed, attempting another bind address"));
/* Problem with the bind, move to next address if present */
curr_bind_ai= curr_bind_ai->ai_next;
} if (bind_result)
{
/*
Could not bind to any client-side address with this destination
Try the next destination address (if any)
*/
DBUG_PRINT("info", ("All bind attempts with this address failed"));
saved_error= socket_errno;
closesocket(sock);
continue;
}
DBUG_PRINT("info", ("Successfully bound client side of socket"));
} /* Create a new Vio object to abstract the socket. */
if (!net->vio)
{
if (!(net->vio= vio_new(sock, VIO_TYPE_TCPIP, flags)))
{
set_mysql_error(mysql, CR_OUT_OF_MEMORY, unknown_sqlstate);
closesocket(sock);
freeaddrinfo(res_lst);
if (client_bind_ai_lst)
freeaddrinfo(client_bind_ai_lst);
goto error;
}
}
/* Just reinitialize if one is already allocated. */
else if (vio_reset(net->vio, VIO_TYPE_TCPIP, sock, NULL, flags))
{
set_mysql_error(mysql, CR_UNKNOWN_ERROR, unknown_sqlstate);
closesocket(sock);
freeaddrinfo(res_lst);
if (client_bind_ai_lst)
freeaddrinfo(client_bind_ai_lst);
goto error;
} DBUG_PRINT("info", ("Connect socket"));
status= vio_socket_connect(net->vio, t_res->ai_addr,
(socklen_t)t_res->ai_addrlen,
get_vio_connect_timeout(mysql));
/*
Here we rely on vio_socket_connect() to return success only if
the connect attempt was really successful. Otherwise we would
stop trying another address, believing we were successful.
*/
if (!status)
break; /*
Save either the socket error status or the error code of
the failed vio_connection operation. It is necessary to
avoid having it overwritten by later operations.
*/
saved_error= socket_errno; DBUG_PRINT("info", ("No success, try next address."));
}

2.读取初始握手报文

  if ((pkt_length=cli_safe_read(mysql, NULL)) == packet_error)
{
if (mysql->net.last_errno == CR_SERVER_LOST)
set_mysql_extended_error(mysql, CR_SERVER_LOST, unknown_sqlstate,
ER(CR_SERVER_LOST_EXTENDED),
"reading initial communication packet",
socket_errno);
goto error;
}
pkt_end= (char*)net->read_pos + pkt_length;
/* Check if version of protocol matches current one */
mysql->protocol_version= net->read_pos[];
DBUG_DUMP("packet",(uchar*) net->read_pos,);
DBUG_PRINT("info",("mysql protocol version %d, server=%d",
PROTOCOL_VERSION, mysql->protocol_version));
if (mysql->protocol_version != PROTOCOL_VERSION)
{
set_mysql_extended_error(mysql, CR_VERSION_ERROR, unknown_sqlstate,
ER(CR_VERSION_ERROR), mysql->protocol_version,
PROTOCOL_VERSION);
goto error;
}
server_version_end= end= strend((char*) net->read_pos+);
mysql->thread_id=uint4korr((uchar*) end + );
end+=;
/*
Scramble is split into two parts because old clients do not understand
long scrambles; here goes the first part.
*/
scramble_data= end;
scramble_data_len= AUTH_PLUGIN_DATA_PART_1_LENGTH + ;
scramble_plugin= NULL;
end+= scramble_data_len; if (pkt_end >= end + )
mysql->server_capabilities=uint2korr((uchar*) end);
if (pkt_end >= end + )
{
/* New protocol with 16 bytes to describe server characteristics */
mysql->server_language=end[];
mysql->server_status=uint2korr((uchar*) end + );
mysql->server_capabilities|= uint2korr((uchar*) end + ) << ;
pkt_scramble_len= end[];
if (pkt_scramble_len < )
{
set_mysql_error(mysql, CR_MALFORMED_PACKET,
unknown_sqlstate); /* purecov: inspected */
goto error;
}
}
end+= ; if (mysql_init_character_set(mysql))
goto error;

3.发送回复握手报文

通过run_plugin_auth发送回复握手报文

   mpvio.mysql_change_user= data_plugin == ;
mpvio.cached_server_reply.pkt= (uchar*)data;
mpvio.cached_server_reply.pkt_len= data_len;
mpvio.read_packet= client_mpvio_read_packet;
mpvio.write_packet= client_mpvio_write_packet;
mpvio.info= client_mpvio_info;
mpvio.mysql= mysql;
mpvio.packets_read= mpvio.packets_written= ;
mpvio.db= db;
mpvio.plugin= auth_plugin; MYSQL_TRACE(AUTH_PLUGIN, mysql, (auth_plugin->name)); res= auth_plugin->authenticate_user((struct st_plugin_vio *)&mpvio, mysql);

 static int native_password_auth_client(MYSQL_PLUGIN_VIO *vio, MYSQL *mysql)
{
int pkt_len;
uchar *pkt; DBUG_ENTER("native_password_auth_client"); if (((MCPVIO_EXT *)vio)->mysql_change_user)
{
/*
in mysql_change_user() the client sends the first packet.
we use the old scramble.
*/
pkt= (uchar*)mysql->scramble;
pkt_len= SCRAMBLE_LENGTH + ;
}
else
{
/* read the scramble */
if ((pkt_len= vio->read_packet(vio, &pkt)) < )
DBUG_RETURN(CR_ERROR); if (pkt_len != SCRAMBLE_LENGTH + )
DBUG_RETURN(CR_SERVER_HANDSHAKE_ERR); /* save it in MYSQL */
memcpy(mysql->scramble, pkt, SCRAMBLE_LENGTH);
mysql->scramble[SCRAMBLE_LENGTH] = ;
} if (mysql->passwd[])
{
char scrambled[SCRAMBLE_LENGTH + ];
DBUG_PRINT("info", ("sending scramble"));
scramble(scrambled, (char*)pkt, mysql->passwd);
if (vio->write_packet(vio, (uchar*)scrambled, SCRAMBLE_LENGTH))
DBUG_RETURN(CR_ERROR);
}
else
{
DBUG_PRINT("info", ("no password"));
if (vio->write_packet(vio, , )) /* no password */
DBUG_RETURN(CR_ERROR);
} DBUG_RETURN(CR_OK);
}

先通过read_packet获得挑战码,再通过scramble加密,然后通过write_packet发送回复握手报文。

client_mpvio_write_packet->send_client_reply_packet,该函数是发送回复握手报文。

4.读入认证回复报文

  /* read the OK packet (or use the cached value in mysql->net.read_pos */
if (res == CR_OK)
pkt_length= (*mysql->methods->read_change_user_result)(mysql);
else /* res == CR_OK_HANDSHAKE_COMPLETE */
pkt_length= mpvio.last_read_packet_len;

最后通过cli_read_change_user_result即cli_safe_read读取ok报文

5.选择数据库

 int STDCALL
mysql_select_db(MYSQL *mysql, const char *db)
{
int error;
DBUG_ENTER("mysql_select_db");
DBUG_PRINT("enter",("db: '%s'",db)); if ((error=simple_command(mysql,COM_INIT_DB, (const uchar*) db,
(ulong) strlen(db),)))
DBUG_RETURN(error);
my_free(mysql->db);
mysql->db=my_strdup(key_memory_MYSQL,
db,MYF(MY_WME));
DBUG_RETURN();
}

以command报文的形式发送命令数据

二、 登陆阶段所用到的报文格式

1.初始握手报文

               [0a] protocol version
string[NUL] server version
connection id
string[] auth-plugin-data-part-
[] filler
capability flags (lower bytes)
if more data in the packet:
character set
status flags
capability flags (upper bytes)
if capabilities & CLIENT_PLUGIN_AUTH {
length of auth-plugin-data
} else {
[]
}
string[] reserved (all [])
if capabilities & CLIENT_SECURE_CONNECTION {
string[$len] auth-plugin-data-part- ($len=MAX(, length of auth-plugin-data - ))
if capabilities & CLIENT_PLUGIN_AUTH {
string[NUL] auth-plugin name
}

(1)协议的版本

(2)协议的版本名

(3)连接id其实是线程的id

(4)挑战码的第一部分(用于登陆密码加密)

(5)不用关注

(6)标志位的最低两个,该标志会确定较多信息后面的capabilities就是该标志为

(8)字符集编号,其实就是采用什么样的字符集,如utf8等等

(9)服务器状态编码

(10)标志位的较高两位

(12)挑战码总长度(用于登陆密码加密,是一个可选项)

(16)都是0,不用关注

(18)第二段挑战码(用于登陆密码加密,是一个可选项)

(20)挑战码生成名(是一个可选项)

2.回复握手报文

               capability flags, CLIENT_PROTOCOL_41 always set
max-packet size
character set
string[] reserved (all [])
string[NUL] username
if capabilities & CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA {
lenenc-int length of auth-response
string[n] auth-response
} else if capabilities & CLIENT_SECURE_CONNECTION {
length of auth-response
string[n] auth-response
} else {
string[NUL] auth-response
}
if capabilities & CLIENT_CONNECT_WITH_DB {
string[NUL] database
}
if capabilities & CLIENT_PLUGIN_AUTH {
string[NUL] auth plugin name
}
if capabilities & CLIENT_CONNECT_ATTRS {
lenenc-int length of all key-values
lenenc-str key
lenenc-str value
if-more data in 'length of all key-values', more keys and value pairs
}

(1)收到的capability flags

(2)最大报文长度,这是与mysql服务器协商的

(3)字符集

(4)不需要关注

(5)登陆的用户名

(7)(8)一般选项为此选项,即加密的密码报文

下面的报文在本文件发送中没有用到

3.认证回复报文

 Type    Name    Description
int<> header [] or [fe] the OK packet header
int<lenenc> affected_rows affected rows
int<lenenc> last_insert_id last insert-id
if capabilities & CLIENT_PROTOCOL_41 {
int<> status_flags Status Flags
int<> warnings number of warnings
} elseif capabilities & CLIENT_TRANSACTIONS {
int<> status_flags Status Flags
}
if capabilities & CLIENT_SESSION_TRACK {
string<lenenc> info human readable status information
if status_flags & SERVER_SESSION_STATE_CHANGED {
string<lenenc> session_state_changes session state info
}
} else {
string<EOF> info human readable status information
}

最新文章

  1. Twentydaysgone
  2. Oracle 11g系列:约束
  3. time
  4. 温故知新---重读C#InDepth(二)
  5. Java基础-继承-子类与父类执行顺序
  6. Python学习总结14:时间模块datetime &amp; time &amp; calendar (一)
  7. android EditText中的inputType
  8. 验证码 Captcha 之大插件
  9. jQuery整理笔记文件夹
  10. [APUE]进程控制(中)
  11. JS判断当前使用设备是pc端还是web端(转MirageFireFox)
  12. 爬取知名社区技术文章_pipelines_4
  13. Dapper查询返回Datatable
  14. Scala--模式匹配和样例类
  15. Linux版本Membase无法写入default bucket的问题分析
  16. SAX解析器
  17. redis集群搭建教程(以3.2.2为例)
  18. CSS3性能体验
  19. 【转】MySQL执行计划分析
  20. 轻量级IOC框架:Ninject (上)

热门文章

  1. 基于dubbo的分布式项目实例应用
  2. java中注解的使用与实例(一)
  3. 转载:安装ie driver和chrome driver
  4. 【BZOJ3943】[Usaco2015 Feb]SuperBull 最小生成树
  5. 深入理解JavaScript运行机制
  6. NOIP2016普及
  7. BZOJ2908: 又是nand
  8. React-Native需要css和布局-20160902
  9. Myeclipse 不能保存汉字
  10. gkENGINE重开!