搜索
首页数据库mysql教程MariaDB线程池源码分析_MySQL

MariaDB

bitsCN.com

 

MariaDB线程池源码分析

心中无码

 

0  前言

MySQL5.5的Enterprise版本以plugin的方式引入了thread pool,在并发请求数达到一定 数量的时候,性能相比社区版貌似有不少提高, 可以看下这个性能对比。

 

在引入线程池之前,MySQL支持的线程处理方式(thread_handling参数控制)有no-threads和one-thread-per-connection两种方式,no-threads方式是指任一时刻最多只有一个连接可以连接到server,一般用于实验性质。 one-thread-per-connection是指针对每个连接创建一个线程来处理这个连接的所有请求,直到连接断开,线程 结束。是thread_handling的默认方式。

 

one-thread-per-connection存在的问题就是需要为每个连接创建一个新的thread,当并发连接数达到一定 程度,性能会有明显下降,因为过多的线程会导致频繁的上下文切换,CPU cache命中率降低和锁的竞争 更加激烈。

 

解决one-thread-per-connection的方法就是降低线程数,这样就需要多个连接共用线程,这便引入了线程 池的概念。线程池中的线程是针对请求的,而不是针对连接的,也就是说几个连接可能使用相同的线程处理 各自的请求。

 

MariaDB在5.5引入了一个动态的线程池方案,可以根据当前请求的并发情况自动增加或减少线程数,还好 MariaDB完全开源,本文结合MariaDB的代码来介绍下thread pool的实现。这里使用的MariaDB 10.0的 代码树。

 

1  相关参数

MySQL的参数都写在sys_vars.cc文件下。

static Sys_var_uint Sys_threadpool_idle_thread_timeout(  "thread_pool_idle_timeout",  "Timeout in seconds for an idle thread in the thread pool."  "Worker thread will be shut down after timeout",  GLOBAL_VAR(threadpool_idle_timeout), CMD_LINE(REQUIRED_ARG),  VALID_RANGE(1, UINT_MAX), DEFAULT(60), BLOCK_SIZE(1));static Sys_var_uint Sys_threadpool_oversubscribe(  "thread_pool_oversubscribe",  "How many additional active worker threads in a group are allowed.",  GLOBAL_VAR(threadpool_oversubscribe), CMD_LINE(REQUIRED_ARG),  VALID_RANGE(1, 1000), DEFAULT(3), BLOCK_SIZE(1));static Sys_var_uint Sys_threadpool_size( "thread_pool_size", "Number of thread groups in the pool. " "This parameter is roughly equivalent to maximum number of concurrently " "executing threads (threads in a waiting state do not count as executing).",  GLOBAL_VAR(threadpool_size), CMD_LINE(REQUIRED_ARG),  VALID_RANGE(1, MAX_THREAD_GROUPS), DEFAULT(my_getncpus()), BLOCK_SIZE(1),  NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),  ON_UPDATE(fix_threadpool_size));static Sys_var_uint Sys_threadpool_stall_limit( "thread_pool_stall_limit", "Maximum query execution time in milliseconds," "before an executing non-yielding thread is considered stalled." "If a worker thread is stalled, additional worker thread " "may be created to handle remaining clients.",  GLOBAL_VAR(threadpool_stall_limit), CMD_LINE(REQUIRED_ARG),  VALID_RANGE(10, UINT_MAX), DEFAULT(500), BLOCK_SIZE(1),  NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),   ON_UPDATE(fix_threadpool_stall_limit));

这几个参数都有相应的描述,这里再稍微具体介绍一下。
thread_pool_size: 线程池的分组(group)个数。MariaDB的线程池并不是说一整个大池子,而是分成了不同的group,而且是按照到来connection的顺序进行分组的,如第一个connection分配到group[0],那么第二个connection就分配到group[1],是一种Round Robin的轮询分配方式。默认值是CPU core个数。

 

thread_pool_idle_timeout: 线程最大空闲时间,如果某个线程空闲的时间大于这个参数,则线程退出。

 

thread_pool_stall_limit: 监控间隔时间,thread pool有个监控线程,每隔这个时间,会检查每个group的线程可用数等状态,然后进行相应的处理,如wake up或者create thread。

 

thread_pool_oversubscribe: 允许的每个group上的活跃的线程数,注意这并不是每个group上的最大线程数,而只是可以处理请求的线程数。

 

 

2  thread handling设置

thread pool模式其实是新增了一种thread_handling的方式,即在配置文件中设置:

[mysqld]thread_handling=pool-of-threads.....
 

MySQL内部是有一个scheduler_functions结构体,不论thread_handling是哪种方式,都是通过设置这个 结构体中的函数来进行不同的调度。

/** scheduler_functions结构体 */struct scheduler_functions{  uint max_threads, *connection_count;  ulong *max_connections;  bool (*init)(void);  bool (*init_new_connection_thread)(void);  void (*add_connection)(THD *thd);  void (*thd_wait_begin)(THD *thd, int wait_type);  void (*thd_wait_end)(THD *thd);  void (*post_kill_notification)(THD *thd);  bool (*end_thread)(THD *thd, bool cache_thread);  void (*end)(void);};static int get_options(int *argc_ptr, char ***argv_ptr){  ...  /** 根据thread_handling选项的设置,选择不同的处理方式*/if (thread_handling max_threads= threadpool_max_threads;  func->max_connections= arg_max_connections;  func->connection_count= arg_connection_count;  scheduler_init();}

上面可以看到设置了thread_scheduler的处理函数为tp_scheduler_functions,即 为thread pool方式,这种方式对应的初始函数为tp_init, 创建新连接的函数为 tp_add_connection,等待开始函数为tp_wait_begin,等待结束函数为tp_wait_end. 这里说明下等待函数的意义,等待函数一般是在等待磁盘I/O,等待锁资源,SLEEP,或者等待 网络消息的时候,调用wait_begin,在等待结束后调用wait_end,那么为什么要等待的时候 调用等待函数呢?这个在后面进行介绍。

 

上面讲的其实和thread pool关系不是很大,下面开始thread pool流程的介绍。thread pool涉及 到的源码在emphsql/threadpool_common.cc和emphsql/threadpool_unix.cc, 对于windows而言,还有emphsql/threadpool_win.cc.

 

3  线程池初始化——tp_init

 
>tp_init| >thread_group_init| >start_timer

tp_init非常简单,首先是调用了thread_group_init进行组的初始化, 然后调用的start_timer开启了监控线程timer_thread。 至此为止,thread pool里面只有一个监控线程启动,而没有任何工作线程, 直到有新的连接到来。

 

4  添加新连接——tp_add_connection

 
void tp_add_connection(THD *thd){  DBUG_ENTER("tp_add_connection");    threads.append(thd);  mysql_mutex_unlock(&LOCK_thread_count);  connection_t *connection= alloc_connection(thd);  if (connection)  {    thd->event_scheduler.data= connection;          /* Assign connection to a group. */    thread_group_t *group=       &all_groups[thd->thread_id%group_count];        connection->thread_group=group;          mysql_mutex_lock(&group->mutex);    group->connection_count++;    mysql_mutex_unlock(&group->mutex);        /*       Add connection to the work queue.Actual logon        will be done by a worker thread.    */    queue_put(group, connection);  }  else  {    /* Allocation failed */    threadpool_remove_connection(thd);  }   DBUG_VOID_RETURN;}

但server的主监听线程监听到有客户端的connect时,会调用tp_add_connection函数进行处理。 首先根据thread_id对group_count取模,找到其所属的group,然后调用queue_put将此connection 放入到group中的queue中。这里涉及到两个新的结构体,connection_t和thread_group_t。

struct connection_t{  THD *thd;  thread_group_t *thread_group;  connection_t *next_in_queue;  connection_t **prev_in_queue;  ulonglong abs_wait_timeout; //等待超时时间  bool logged_in; //是否进行了登录验证  bool bound_to_poll_descriptor; //是否添加到了epoll进行监听  bool waiting; //是否在等待状态,如I/O, sleep};struct thread_group_t {  mysql_mutex_t mutex;  connection_queue_t queue;  //connection请求链表  worker_list_t waiting_threads; //group中正在等待被唤醒的thread  worker_thread_t *listener;  //当前group中用于监听的线程  pthread_attr_t *pthread_attr;  int  pollfd;  //epoll 文件描述符,用于绑定group中的所有连接  int  thread_count;  //线程数  int  active_thread_count;//活跃线程数  int  connection_count; //连接数  /* Stats for the deadlock detection timer routine.*/  int io_event_count;  //epoll产生的事件数  int queue_event_count; //工作线程消化的事件数  ulonglong last_thread_creation_time;  int  shutdown_pipe[2];  bool shutdown;  bool stalled; // 工作线程是否处于停滞状态  } MY_ALIGNED(512);

上面对这些参数进行了说明,理解这些参数的意义,才能了解这个动态thread pool的管理机制, 因为每个参数都会影响到thread pool的增长或收缩。

 

介绍完结构体,继续回到新的连接到来,这时会调用queue_put函数,将此connection放到 group的队列queue中。

static void queue_put(thread_group_t *thread_group, connection_t *connection){  DBUG_ENTER("queue_put");  mysql_mutex_lock(&thread_group->mutex);  thread_group->queue.push_back(connection);  if (thread_group->active_thread_count == 0)    wake_or_create_thread(thread_group);  mysql_mutex_unlock(&thread_group->mutex);  DBUG_VOID_RETURN;}

注意,这时候有个active_thread_count的判断,如果没有活跃的线程,那么就无法处理 这个新到的请求啊,这时就需要调用wake_or_create_thread,这个函数首先会尝试唤醒group 等待线程链表waiting_threads中的线程,如果没有等待中的线程,则需要创建一个线程。 至此,新到的connection被挂到了group的queue上,这样一个连接算是add进队列了,那么如何 处理这个连接呢?我们继续往下看。

 

5  工作线程——worker_main

由于是第一个连接到来,那么肯定没有waiting_threads,此时会调用create_worker 函数创建一个工作线程。我们直接来看下工作线程。

 
static void *worker_main(void *param){  ...  DBUG_ENTER("worker_main");    thread_group_t *thread_group = (thread_group_t *)param;  /* Run event loop */  for(;;)  {    connection_t *connection;    struct timespec ts;    set_timespec(ts,threadpool_idle_timeout);    connection = get_event(&this_thread, thread_group, &ts);    if (!connection)      break;    this_thread.event_count++;    handle_event(connection);  }  ....  my_thread_end();  return NULL;}

上面是整个工作线程的逻辑,可以看到是一个循环,get_event用来获取新的需要处理的 connection,然后调用handle_event进行处理相应的connection。one thread per connection 中每个线程也是一个循环体,这两者之间的区别就是,thread pool的循环等待的是一个可用的event, 并不局限于某个固定的connection的event,而one thread per connection的循环等待是等待固定的 connection上的event,这就是两者最大的区别。

 

6  事件获取——get_event

 

工作线程通过get_event获取需要处理的connection,

 
connection_t *get_event(worker_thread_t *current_thread,   thread_group_t *thread_group,  struct timespec *abstime){   ...  for(;;)   {  ...      /** 从QUEUE中获取connection */      connection = queue_get(thread_group);      if(connection) {        fprintf(stderr, "Thread %x get a new connection./n", (unsigned int)pthread_self());        break;      }      ...      /**监听epoll */    if(!thread_group->listener)    {      thread_group->listener= current_thread;      thread_group->active_thread_count--;      mysql_mutex_unlock(&thread_group->mutex);      fprintf(stderr, "Thread %x waiting for a new event./n", (unsigned int)pthread_self());      connection = listener(current_thread, thread_group);      fprintf(stderr, "Thread %x get a new event for connection %p./n",              (unsigned int)pthread_self(), connection);      mysql_mutex_lock(&thread_group->mutex);      thread_group->active_thread_count++;      /* There is no listener anymore, it just returned. */      thread_group->listener= NULL;      break;    }    ...}
 

这个get_event的函数逻辑稍微有点多,这里只抽取了获取事件的两个点, 我们接着按照第一个连接到来是的情形进行说明, 第一个连接到来,queue中有了一个connection,这是get_event便会从queue中获取到一个 connection,返回给worker_main线程。worker_main接着调用handle_event进行事件处理。

 

每个新的connection连接到服务器后,其socket会绑定到group的epoll中,所以,如果queue中 没有connection,需要从epool中获取,每个group的所有连接的socket都绑定在group的epool 中,所以任何一个时刻,最多只有一个线程能够监听epoll,如果epoll监听到有event的话,也会返回 相应的connection,然后再调用handle_event进行处理。

 

7  事件处理——handle_event

 

handle_event的逻辑比较简单,就是根据connection_t上是否登录过,进行分支,如果没 登录过,说明是新到的连接,则进行验证,否则直接进行请求处理。

static void handle_event(connection_t *connection){  DBUG_ENTER("handle_event");  int err;  if (!connection->logged_in) //处理登录  {    err= threadpool_add_connection(connection->thd);    connection->logged_in= true;  }  else  //处理请求  {    err= threadpool_process_request(connection->thd);  }  if(err)    goto end;  set_wait_timeout(connection);  /** 设置socket到epoll的监听 */  err= start_io(connection);end:  if (err)    connection_abort(connection);  DBUG_VOID_RETURN;}static int start_io(connection_t *connection){   int fd = mysql_socket_getfd(connection->thd->net.vio->mysql_socket);  ...      /* 绑定到epoll *。  if (!connection->bound_to_poll_descriptor)  {    connection->bound_to_poll_descriptor= true;    return io_poll_associate_fd(group->pollfd, fd, connection);  }    return io_poll_start_read(group->pollfd, fd, connection);}

注意,在handle_event之后,会调用start_io,这个函数很重要,这个函数会将新 到的connection的socket绑定到group的epoll上进行监听。

 

8  线程等待

 

当group中的线程没有任务执行时,所有线程都会在get_event处等待,但是有两种等待方式, 一种是在epoll上等待事件,每个group中只有一个线程会做这个事情,且这个会一直等待,直到有新 的事件到来。另一种就是等待一定的时间, 即参数thread_pool_idle_time这个时间,如果超过了这个时间,那么当前的线程的get_event就会 返回空,然后worker_main线程就会退出。如果在线程等待的过程被唤醒的话,那么就会继续在 get_event中进行循环,等待新的事件。

 

9  唤醒等待线程

 

有两种方式会唤醒等待的线程,一种是监控线程timer_thread,另一种就是一些active的线程碰到 需要等待的时候,会调用tp_wait_begin,这个函数如果判断当前没有active的thread且没有thread监听 epoll,则会调用wake_or_create_thread。

 

监控线程timer_thread用于定期监控group中的thread使用情况,具体的检查函数是check_stall.

void check_stall(thread_group_t *thread_group){  ...  /** 如果没有线程监听epoll且自上次检查到现在没有新的event事件产生,说明所有的  活跃线程都在 忙于执行长任务,则需要唤醒或创建工作线程 */  if (!thread_group->listener && !thread_group->io_event_count)  {    wake_or_create_thread(thread_group);    mysql_mutex_unlock(&thread_group->mutex);    return;  }    /*  Reset io event count */  thread_group->io_event_count= 0;  /** 如果队列queue中有请求,且自上次检查到现在queue中的请求没有被消化,  则说明所有活跃线程忙于执行长任务,需要唤醒或创建工作线程*/  if (!thread_group->queue.is_empty() && !thread_group->queue_event_count)  {    thread_group->stalled= true;    wake_or_create_thread(thread_group);  }    /* Reset queue event count */  thread_group->queue_event_count= 0;    mysql_mutex_unlock(&thread_group->mutex);}
 

10  小结

MariaDB的thread pool的实现相对比较简单,总体上就是将group中所有的connection的socket挂在 group的epoll_fd上进行事件监听,监听到的事件或被当前线程执行,或者被push到group的queue上 被其他线程执行。

 

监控线程timer_thread定期的根据需要去唤醒等待线程或创建新的线程,来达到动态增加的thread的 目的。而thread的收缩则是通过线程等待事件超时来完成的。

 

btw,在跟踪代码的过程中,也发现了使用thread pool时导致server crash的情况,提交了个 bug给MariaDB,发现当天就有回复, 并立刻修复push到source tree上了,看来MariaDB的团队反映够迅速的,赞一个。

 

References

[1]
Thread pool in MariaDB 5.5
[3]
The Thread Pool Plugin
[3]
Thread Pool Worklog
 




File translated fromTEXby TTH,version 4.03.
On 25 May 2013, 01:39.

bitsCN.com
声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
您什么时候应该使用复合索引与多个单列索引?您什么时候应该使用复合索引与多个单列索引?Apr 11, 2025 am 12:06 AM

在数据库优化中,应根据查询需求选择索引策略:1.当查询涉及多个列且条件顺序固定时,使用复合索引;2.当查询涉及多个列但条件顺序不固定时,使用多个单列索引。复合索引适用于优化多列查询,单列索引则适合单列查询。

如何识别和优化MySQL中的慢速查询? (慢查询日志,performance_schema)如何识别和优化MySQL中的慢速查询? (慢查询日志,performance_schema)Apr 10, 2025 am 09:36 AM

要优化MySQL慢查询,需使用slowquerylog和performance_schema:1.启用slowquerylog并设置阈值,记录慢查询;2.利用performance_schema分析查询执行细节,找出性能瓶颈并优化。

MySQL和SQL:开发人员的基本技能MySQL和SQL:开发人员的基本技能Apr 10, 2025 am 09:30 AM

MySQL和SQL是开发者必备技能。1.MySQL是开源的关系型数据库管理系统,SQL是用于管理和操作数据库的标准语言。2.MySQL通过高效的数据存储和检索功能支持多种存储引擎,SQL通过简单语句完成复杂数据操作。3.使用示例包括基本查询和高级查询,如按条件过滤和排序。4.常见错误包括语法错误和性能问题,可通过检查SQL语句和使用EXPLAIN命令优化。5.性能优化技巧包括使用索引、避免全表扫描、优化JOIN操作和提升代码可读性。

描述MySQL异步主奴隶复制过程。描述MySQL异步主奴隶复制过程。Apr 10, 2025 am 09:30 AM

MySQL异步主从复制通过binlog实现数据同步,提升读性能和高可用性。1)主服务器记录变更到binlog;2)从服务器通过I/O线程读取binlog;3)从服务器的SQL线程应用binlog同步数据。

mysql:简单的概念,用于轻松学习mysql:简单的概念,用于轻松学习Apr 10, 2025 am 09:29 AM

MySQL是一个开源的关系型数据库管理系统。1)创建数据库和表:使用CREATEDATABASE和CREATETABLE命令。2)基本操作:INSERT、UPDATE、DELETE和SELECT。3)高级操作:JOIN、子查询和事务处理。4)调试技巧:检查语法、数据类型和权限。5)优化建议:使用索引、避免SELECT*和使用事务。

MySQL:数据库的用户友好介绍MySQL:数据库的用户友好介绍Apr 10, 2025 am 09:27 AM

MySQL的安装和基本操作包括:1.下载并安装MySQL,设置根用户密码;2.使用SQL命令创建数据库和表,如CREATEDATABASE和CREATETABLE;3.执行CRUD操作,使用INSERT,SELECT,UPDATE,DELETE命令;4.创建索引和存储过程以优化性能和实现复杂逻辑。通过这些步骤,你可以从零开始构建和管理MySQL数据库。

InnoDB缓冲池如何工作,为什么对性能至关重要?InnoDB缓冲池如何工作,为什么对性能至关重要?Apr 09, 2025 am 12:12 AM

InnoDBBufferPool通过将数据和索引页加载到内存中来提升MySQL数据库的性能。1)数据页加载到BufferPool中,减少磁盘I/O。2)脏页被标记并定期刷新到磁盘。3)LRU算法管理数据页淘汰。4)预读机制提前加载可能需要的数据页。

MySQL:初学者的数据管理易用性MySQL:初学者的数据管理易用性Apr 09, 2025 am 12:07 AM

MySQL适合初学者使用,因为它安装简单、功能强大且易于管理数据。1.安装和配置简单,适用于多种操作系统。2.支持基本操作如创建数据库和表、插入、查询、更新和删除数据。3.提供高级功能如JOIN操作和子查询。4.可以通过索引、查询优化和分表分区来提升性能。5.支持备份、恢复和安全措施,确保数据的安全和一致性。

See all articles

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

AI Hentai Generator

AI Hentai Generator

免费生成ai无尽的。

热门文章

R.E.P.O.能量晶体解释及其做什么(黄色晶体)
3 周前By尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.最佳图形设置
3 周前By尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.如果您听不到任何人,如何修复音频
3 周前By尊渡假赌尊渡假赌尊渡假赌
WWE 2K25:如何解锁Myrise中的所有内容
3 周前By尊渡假赌尊渡假赌尊渡假赌

热工具

SublimeText3 Mac版

SublimeText3 Mac版

神级代码编辑软件(SublimeText3)

SublimeText3汉化版

SublimeText3汉化版

中文版,非常好用

禅工作室 13.0.1

禅工作室 13.0.1

功能强大的PHP集成开发环境

记事本++7.3.1

记事本++7.3.1

好用且免费的代码编辑器

适用于 Eclipse 的 SAP NetWeaver 服务器适配器

适用于 Eclipse 的 SAP NetWeaver 服务器适配器

将Eclipse与SAP NetWeaver应用服务器集成。