01MySQL內核分析-The Skeleton of the Server Code

摘要

這個官方文檔一段對MySQL內核分析的一個嚮導。是對MySQL一條insert語句寫入到MySQL資料庫的分析。
但是,對於MySQL 5.7版本來說,基本上都是寫入到innodb引擎。但也還是有借鑒意義,大的框架沒有太大變化。
後面的文檔,會通過mysqld –debug 和gdb等工具,通過分析mysqld.trace來分析insert語句在MySQL 5.7中怎麼寫入資料庫。

官方文檔給出的一段結構,如下:

/sql/mysqld.cc
/sql/sql_parse.cc
/sql/sql_prepare.cc
/sql/sql_insert.cc
/sql/ha_myisam.cc
/myisam/mi_write.c

上述梳理一個過程,是說從客戶段執行一條簡單的insert語句,然後到達MySQL伺服器端,並通過MyISAM存儲層。寫入到MyISAM文件的過程。

由於,我們現在的主流都是InnoDB存儲引擎,所以我們分析的寫入到存儲層應該是InnoDB的源程式碼。但是上述的一個框架也有借鑒意義。雖然,走的是InnoDB存儲引擎插入數據,但是也還是需要通過SQL層的ha_*這樣的介面進行接入。

正題開始!!!!!!!!!!!!!!!!!!!!!!!

第一步,進入MySQL大門的地方。夢開始的地方。眾所周知,C語言都是需要main方法作為主入口。而MySQL的主入口如下:

程式碼位置 /sql/mysqld.cc

  int main(int argc, char **argv)
  {
    _cust_check_startup();
    (void) thr_setconcurrency(concurrency);
    init_ssl();
    server_init();                             // 'bind' + 'listen'
    init_server_components();
    start_signal_handler();
    acl_init((THD *)0, opt_noacl);
    init_slave();
    create_shutdown_thread();
    create_maintenance_thread();
    handle_connections_sockets(0);             // !  這裡也代表著我們進入下一個門的地方
    DBUG_PRINT("quit",("Exiting main thread"));
    exit(0);
  }

這裡可以看到很多的init_*或者server_init()。通過名字我們可以猜測出,這裡做了很多初始化的工作。例如:啟動過程中一些初始化的檢查和MySQL配置變數的載入和一些組件的初始化等。

這裡重要的函數是handle_connections_sockets

繼續跟蹤 /sql/mysqld.cc

 handle_connections_sockets (arg __attribute__((unused))
  {
     if (ip_sock != INVALID_SOCKET)
     {
       FD_SET(ip_sock,&clientFDs);
       DBUG_PRINT("general",("Waiting for connections."));
       while (!abort_loop)
       {
         new_sock = accept(sock, my_reinterpret_cast(struct sockaddr*)
           (&cAddr),             &length);
         thd= new THD;
         if (sock == unix_sock)
         thd->host=(char*) localhost;
         create_new_thread(thd);            // !
         }

從簡易的思維,忽視其他的判斷語句。可以看到這裡做的是典型的client/server架構。伺服器有一個主執行緒,它總是偵聽來自新客戶機的請求。一旦它接收到這樣的請求,它將分配資源。特別是,主執行緒將生成一個新執行緒來處理連接。然後主伺服器將循環並偵聽新連接——但我們將保留它並跟蹤新執行緒。

這裡創建新執行緒的方法是:create_new_thread(thd);

繼續跟蹤 /sql/mysqld.cc

  create_new_thread(THD *thd)
  {
    pthread_mutex_lock(&LOCK_thread_count);
    pthread_create(&thd->real_id,&connection_attrib,
        handle_one_connection,                        // !
        (void*) thd));
    pthread_mutex_unlock(&LOCK_thread_count);
  }

可以看到這裡獲得一個新執行緒加入一個互斥鎖,避免衝突。

繼續跟蹤 /sql/mysqld.cc

handle_one_connection(THD *thd)
  {
    init_sql_alloc(&thd->mem_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC);
    while (!net->error && net->vio != 0 && !thd->killed)
    {
      if (do_command(thd))            // !
        break;
    }
    close_connection(net);
    end_thread(thd,1);
    packet=(char*) net->read_pos;

從這裡開始,我們即將脫離mysqld.cc文件,因為我們獲得了thread,且分配一小段記憶體資源,給與我們來處理我們的SQL語句了。

我們會走向何方呢,可以開始觀察do_command(thd)方法。

繼續跟蹤/sql/sql_parse.cc

bool do_command(THD *thd)
{
  net_new_transaction(net);
  packet_length=my_net_read(net);
  packet=(char*) net->read_pos;
  command = (enum enum_server_command) (uchar) packet[0];
  dispatch_command(command,thd, packet+1, (uint) packet_length);
// !
}

其中從這裡可以看到,do_command(THD *thd)把它串聯起來的是一個叫作THD的東西,也就是thread。所以後面的工作和行為,基本都是通過thread進行牽線搭橋的。

my_net_read函數位於另一個名為net_servlet .cc的文件中。該函數從客戶端獲取一個包,解壓縮它,並去除頭部。

一旦完成,我們就得到了一個名為packet的多位元組變數,它包含客戶端發送的內容。第一個位元組很重要,因為它包含標識消息類型的程式碼。

說明了packet第一個位元組很重要。debug也有證據進行一個佐證。

packet_header: Memory: 0x7f7fc000a4b0  Bytes: (4)
21 00 00 00

然後把packet第一個位元組和餘下的部分傳遞給dispatch_command

繼續跟蹤/sql/sql_parse.cc

bool dispatch_command(enum enum_server_command command, THD *thd,
       char* packet, uint packet_length)
{
  switch (command) {
    case COM_INIT_DB:          ...
    case COM_REGISTER_SLAVE:   ...
    case COM_TABLE_DUMP:       ...
    case COM_CHANGE_USER:      ...
    case COM_EXECUTE:
         mysql_stmt_execute(thd,packet);
    case COM_LONG_DATA:        ...
    case COM_PREPARE:
         mysql_stmt_prepare(thd, packet, packet_length);   // !
    /* and so on for 18 other cases */
    default:
     send_error(thd, ER_UNKNOWN_COM_ERROR);
     break;
    }

這裡sql_parser .cc中有一個非常大的switch語句

switch語句中程式碼有:code for prepare, close statement, query, quit, create database, drop database, dump binary log, refresh, statistics, get process info, kill process, sleep, connect, and several minor commands

除了COM_EXECUTE和COM_PREPARE兩種情況外,我們刪除了所有情況下的程式碼細節。

可以看到

  • COM_EXECUTE 會調用mysql_stmt_execute(thd,packet);

  • COM_PREPARE 會調用mysql_stmt_prepare(thd, packet, packet_length);

這裡就像一個中轉站一般,看我們去向什麼地方。這裡去的門是:COM_PREPARE:mysql_stmt_prepare

跟蹤 /sql/sql_prepare.cc

下面是一段prepare的注釋

"Prepare:
Parse the query
Allocate a new statement, keep it in 'thd->prepared statements' pool
Return to client the total number of parameters and result-set
metadata information (if any)"

繼續回到主線COM_EXECUTE

跟蹤/sql/sql_parse.cc

  bool dispatch_command(enum enum_server_command command, THD *thd,
       char* packet, uint packet_length)
  {
  switch (command) {
    case COM_INIT_DB:          ...
    case COM_REGISTER_SLAVE:   ...
    case COM_TABLE_DUMP:       ...
    case COM_CHANGE_USER:      ...
    case COM_EXECUTE:
         mysql_stmt_execute(thd,packet);                   // !
    case COM_LONG_DATA:        ...
    case COM_PREPARE:
         mysql_stmt_prepare(thd, packet, packet_length);
    /* and so on for 18 other cases */
    default:
     send_error(thd, ER_UNKNOWN_COM_ERROR);
     break;
    }

現在“COM_EXECUTE 中的mysql_stmt_execute`是我們關注的重點,我們來看看

跟蹤/sql/sql_prepare.cc程式碼

  void mysql_stmt_execute(THD *thd, char *packet)
  {
    if (!(stmt=find_prepared_statement(thd, stmt_id, "execute")))
    {
      send_error(thd);
      DBUG_VOID_RETURN;
    }
    init_stmt_execute(stmt);
    mysql_execute_command(thd);           // !
  }

這裡做一個判斷,看是否是execute,然後初始化語句,並開始執行mysql_execute_command(thd);可以看到,是通過thread來調用動作。

跟蹤/sql/sql_parse.cc程式碼

  void mysql_execute_command(THD *thd)
       switch (lex->sql_command) {
       case SQLCOM_SELECT: ...
       case SQLCOM_SHOW_ERRORS: ...
       case SQLCOM_CREATE_TABLE: ...
       case SQLCOM_UPDATE: ...
       case SQLCOM_INSERT: ...                   // !
       case SQLCOM_DELETE: ...
       case SQLCOM_DROP_TABLE: ...
       }

lex 解析sql語句。然後進入SQLCOM_INSERT。

跟蹤/sql/sql_parse.cc程式碼

case SQLCOM_INSERT:
{
  my_bool update=(lex->value_list.elements ? UPDATE_ACL : 0);
  ulong privilege= (lex->duplicates == DUP_REPLACE ?
                    INSERT_ACL | DELETE_ACL : INSERT_ACL | update);
  if (check_access(thd,privilege,tables->db,&tables->grant.privilege))
    goto error;
  if (grant_option && check_grant(thd,privilege,tables))
    goto error;
  if (select_lex->item_list.elements != lex->value_list.elements)
  {
    send_error(thd,ER_WRONG_VALUE_COUNT);
    DBUG_VOID_RETURN;
  }
  res = mysql_insert(thd,tables,lex->field_list,lex->many_values,
                     select_lex->item_list, lex->value_list,
                     (update ? DUP_UPDATE : lex->duplicates));
// !
  if (thd->net.report_error)
    res= -1;
  break;
}

對於插入數據,我們要做的第一件事情是:檢查用戶是否具有對錶進行插入的適當特權,伺服器通過調用check_access和check_grant函數在這裡進行檢查。

有了許可權才可以做【插入】動作。

我們可以導航 /sql 目錄,如下:

Program Name          SQL statement type
------------          ------------------
sql_delete.cc         DELETE
sql_do.cc             DO
sql_handler.cc        HANDLER
sql_help.cc           HELP
sql_insert.cc         INSERT            // !
sql_load.cc           LOAD
sql_rename.cc         RENAME
sql_select.cc         SELECT
sql_show.cc           SHOW
sql_update.cc         UPDATE

sql_insert.cc是具體執行插入的操作。

上面的mysql_insert() 的方法具體實現,在sql_insert.cc文件中。

跟蹤 /sql/sql_insert.cc程式碼

 int mysql_insert(THD *thd,TABLE_LIST *table_list, List<Item> &fields,
        List<List_item> &values_list,enum_duplicates duplic)
  {
    table = open_ltable(thd,table_list,lock_type);
    if (check_insert_fields(thd,table,fields,*values,1) ||
      setup_tables(table_list) ||
      setup_fields(thd,table_list,*values,0,0,0))
      goto abort;
    fill_record(table->field,*values);
    error=write_record(table,&info);                 // !
    query_cache_invalidate3(thd, table_list, 1);
    if (transactional_table)
      error=ha_autocommit_or_rollback(thd,error);
    query_cache_invalidate3(thd, table_list, 1);
    mysql_unlock_tables(thd, thd->lock);
    }

這裡就要開始,打開一張表。然後各種檢查,看插入表的欄位是否有問題。不行就abort。

然後,開始填充記錄數據。最終調用write_record 寫記錄的方法。

由於write_record 會對應不同的存儲引擎,所以這裡有分支的。我這裡講解兩種

繼續跟蹤/sql/sql_insert.cc

  int write_record(TABLE *table,COPY_INFO *info)
  {
    table->file->write_row(table->record[0];           // !
  }

終於,要寫文件了。調用那個存儲引擎呢?看handler.h

  /* The handler for a table type.
     Will be included in the TABLE structure */

  handler(TABLE *table_arg) :
table(table_arg),active_index(MAX_REF_PARTS),
    ref(0),ref_length(sizeof(my_off_t)),
block_size(0),records(0),deleted(0),
    data_file_length(0), max_data_file_length(0),
index_file_length(0),
    delete_length(0), auto_increment_value(0), raid_type(0),
    key_used_on_scan(MAX_KEY),
    create_time(0), check_time(0), update_time(0), mean_rec_length(0),
    ft_handler(0)
    {}
  ...
  virtual int write_row(byte * buf)=0;

寫入之MyISAM的程式碼路徑

官方文檔默認調用的是 ha_myisam::write_row

程式碼 /sql/ha_myisam.cc

如下:

int ha_myisam::write_row(byte * buf)
{
  statistic_increment(ha_write_count,&LOCK_status);
   /* If we have a timestamp column, update it to the current time */
   if (table->time_stamp)
    update_timestamp(buf+table->time_stamp-1);
   /*
  If we have an auto_increment column and we are writing a changed row
    or a new row, then update the auto_increment value in the record.
  */
  if (table->next_number_field && buf == table->record[0])
    update_auto_increment();
  return mi_write(file,buf);     // !
}

這些以字母ha開頭的程式是處理程式的介面,而這個程式是myisam處理程式的介面。我們這裡就開始調用MyISAM了。

可以看到這裡調用了mi_write(file,buf);

跟蹤/myisam/mi_write.c

int mi_write(MI_INFO *info, byte *record)
{
  _mi_readinfo(info,F_WRLCK,1);
  _mi_mark_file_changed(info);
  /* Calculate and check all unique constraints */
  for (i=0 ; i < share->state.header.uniques ; i++)
  {
    mi_check_unique(info,share->uniqueinfo+i,record,
      mi_unique_hash(share->uniqueinfo+i,record),
      HA_OFFSET_ERROR);
  }

  ... to be continued in next snippet

這裡有很多唯一性的校驗,繼續看下面

 ... continued from previous snippet

  /* Write all keys to indextree */
  for (i=0 ; i < share->base.keys ; i++)
  {
    share->keyinfo[i].ck_insert(info,i,buff,
      _mi_make_key(info,i,buff,record,filepos)
  }
  (*share->write_record)(info,record);
  if (share->base.auto_key)
    update_auto_increment(info,record);
}

這裡就是我們寫入到文件的地方。至此,MySQL的插入操作結束。

路徑為:

main in /sql/mysqld.cc
handle_connections_sockets in /sql/mysqld.cc
create_new_thread in /sql/mysqld.cc
handle_one_connection in /sql/sql_parse.cc
do_command in /sql/sql_parse.cc
dispatch_command in /sql/sql_parse.cc
mysql_stmt_execute in /sql/sql_prepare.cc
mysql_execute_command in /sql/sql_parse.cc
mysql_insert in /sql/mysql_insert.cc
write_record in /sql/mysql_insert.cc
ha_myisam::write_row in /sql/ha_myisam.cc
mi_write in /myisam/mi_write.c

1.進入主函數入口

2.建立socket connection的請求

3.創建一個新的執行緒

4.處理執行緒,分配記憶體資源

5.do_command,是獲取packet第一位元組,看做什麼操作,並接受餘下位元組。

6.dispatch_command,分發操作,這裡分發的是insert。

7.mysql_stmt_execute,檢查是否為execute,初始化,準備做execute動作。

8.mysql_execute_command ,lex解析SQL語句,進入到SQLCOM_INSERT

9.mysql_insert ,開始做插入操作。調用write_record

10.write_record,準備寫入,看調用哪個存儲引擎,寫入前期準備工作

11.ha_myisam::write_row,ha_myisam進行插入寫入。

12.mi_write,最後做寫入操作。

文獻參考://dev.mysql.com/doc/internals/en/guided-tour-skeleton.html