handle_connections_methods()
{
if (hPipe != INVALID_HANDLE_VALUE)
{
handler_count++;
if (pthread_create(&hThread,&connection_attrib,
handle_connections_namedpipes, 0))
{
sql_print_warning("Can't create thread to handle named pipes");
handler_count--;
}
}
if (have_tcpip && !opt_disable_networking)
{
handler_count++;
if (pthread_create(&hThread,&connection_attrib,
handle_connections_sockets, 0))
{
sql_print_warning("Can't create thread to handle TCP/IP");
handler_count--;
}
}
if (opt_enable_shared_memory)
{
handler_count++;
if (pthread_create(&hThread,&connection_attrib,
handle_connections_shared_memory, 0))
{
sql_print_warning("Can't create thread to handle shared memory");
handler_count--;
}
}
}
由于对于namepipe和memory share的通信方式不太了解,这里只研究socket的通信方式。从代码中可以看出,handle_connections_sockets便是socket的设置,我们就来看下它。
4.socket管理创建新线程socket管理其实比较简单,直接给出其伪代码:
handle_connections_sockets
{
select; //监视socket文件描述符
new_socket = accept;//处理到来的客户端连接
thd = new THD;创建THD类
vio_tmp = vio_new(new_socket,VIO_TYPE_TCPIP, 0); //初始化VIO结构体
my_net_init(&thd->net, vio_tmp);//初始化thd的net结构体
create_new_thread(thd);//为这个连接创建一个新的线程,如果是单线程模式的话,就不会创建一个新线程
}
首先是select函数进行监视socket端口,如果监控到有连接,则通过accept函数接受客户端的连接,然后新建一个THD类,将连接参数全部设置到THD类的参数上,最后调用create_new_thread函数,这个函数便是重点。 我们进入这个函数,看下做了啥。
create_new_thread
{
++connection_count;//全局连接数自增
thread_count++; //全局线程数自增
thread_scheduler.add_connection(thd);//真正创建线程
}
So easy,首先将全局连接数+1,全局线程数+1,然后调用add_connection函数,这个函数就是我们在上面第一步设置连接的
线程数中,one_thread_scheduler和one_thread_per_connection_scheduler中设置的一个参数。这两者的区别便是是否创建了
一个新的线程来处理到来的连接。one_thread_scheduler是单线程方式,木有新建线程。我们重点研究one_thread_per_connection_scheduler,其设置的add_connection函数为create_thread_to_handle_connection:
create_thread_to_handle_connection(THD *thd)
{
thread_created++;
threads.append(thd); // 创建线程数自增,并加入到threads链表上
pthread_create(&thd->real_id,&connection_attrib,
handle_one_connection,
(void*) thd);//这就是真正创建线程的地方了,函数便是handle_one_connection
}
可见,最后调用了pthread_create函数,这个函数便是创建一个新的线程,新线程的处理函数为handle_one_connection.
5.新线程处理流程
新线程处理函数为handle_one_connection,到此位置,一个新的connection被一个新创建的线程所单独处理。我们看下其中
是如何进行处理的。
handle_one_connection(void *arg)
{
for (;;)
{
lex_start(thd); //初始化词法分析结构体
login_connection(thd); //用户认证,失败报错
prepare_new_connection_state(THD* thd);//Initialize THD to handle queries
while (!net->error && net->vio != 0 && //循环处理command
!(thd->killed == THD::KILL_CONNECTION))
{
if (do_command(thd))
break; //处理失败跳出
}
end_connection(thd); //关闭连接
close_connection(thd, 0, 1);
thread_scheduler.end_thread(thd,1);//结束线程
return 0;
}
}
首先进行了词法分析结构体的初始化,然后进行用户认证,认证成功后通过do_command循环执行客户端发过来的命令。
6.总结
整个connection manager的流