多线程C++更新MYSQL

继续上次那个需求的优化

使用MYSQL CONNECTOR/C++ JDBC接口开发MYSQL 应用程序

MYSQL C++接口
用AI和VSCODE开发MYSQLC++(JDBC)应用脚本
如何优化千倍降低加密耗时?

在这三篇文章介绍使用C++以及接口开发应用程序,或者小工具
最后优化是加密,放弃系统调用,直接用OPENSSL函数库.

今天我们继续优化,使用C++多线程 去并行更新MYSQL
下面先拿单线程数据看看

单线程模型
Total Rows:1
TotalRow1: 1538
TotalRow2: 1538
ColName: count(*)
SQLSTRING_VALUE: 1538
Get SQL LIST Elapsed Time:0s, 11ms, 741us, 658ns 
ENCRY NEW KEY Elapsed Time:0s, 6ms, 950us, 639ns 
UPDATE SQL Elapsed Time:4s, 585ms, 155us, 288ns 
TotalUpdateRows=1538
ALL Elapsed Time:4s, 639ms, 383us, 839ns 
 
 Performance counter stats for './main.exe':

            238.39 msec task-clock                #    0.028 CPUs utilized          
             3,118      context-switches          #    0.013 M/sec                  
                31      cpu-migrations            #    0.130 K/sec                  
             1,392      page-faults               #    0.006 M/sec                  
       564,511,231      cycles                    #    2.368 GHz                    
       274,603,902      instructions              #    0.49  insn per cycle         
        38,662,453      branches                  #  162.179 M/sec                  
         2,288,515      branch-misses             #    5.92% of all branches        

       8.484480222 seconds time elapsed
       0.041685000 seconds user
       0.282866000 seconds sys
单线程模型 
获得数据花费11毫秒,加密1538花费6.9毫秒,更新数据库花费4585毫秒。
CPU工作238.39毫秒;
发生3118次上下文切换;
31次CPU迁移;
5.64511亿CPU周期;
2.7460亿指令;
每周期0.49个指令;
产生3866万个分支;
分支预测失败率5.92%

  
多线程模型
SQLSTRING_VALUE: 1538
Get SQL LIST Elapsed Time:0s, 15ms, 676us, 971ns 
ENCRY NEW KEY Elapsed Time:0s, 9ms, 639us, 807ns 
RUN THREAD:0
RUN THREAD:1
RUN THREAD:2
RUN THREAD:3
Update RemainRow:2  Actaul Update Rows:3
Thread 3:TotalUpdateRows=383
Thread 2:TotalUpdateRows=384
Thread 0:TotalUpdateRows=384
0:线程已经结束执行。
Thread 1:TotalUpdateRows=384
1:线程已经结束执行。
2:线程已经结束执行。
3:线程已经结束执行。
Thread Update Elapsed Time:3s, 148ms, 616us, 18ns 
MAIN FREE:
ALL Elapsed Time:3s, 197ms, 509us, 467ns 


 Performance counter stats for './main.exe':

            163.42 msec task-clock                #    0.048 CPUs utilized          
             1,604      context-switches          #    0.010 M/sec                  
                54      cpu-migrations            #    0.330 K/sec                  
             1,529      page-faults               #    0.009 M/sec                  
       421,362,571      cycles                    #    2.578 GHz                    
       292,511,736      instructions              #    0.69  insn per cycle         
        39,041,804      branches                  #  238.906 M/sec                  
         1,662,566      branch-misses             #    4.26% of all branches        

       3.426073087 seconds time elapsed

       0.041813000 seconds user
       0.187438000 seconds sys

  

这么看来多线程下只提升了1秒 25% 有点鸡骨!

CPU耗时163.42毫秒;

发生1604次上下文切换;

54次CPU迁移;
4.2136亿CPU周期;

产生2.9251亿指令;

每周期0.69个指令;

产生39041万个分支;

分支预测失败率4.26%

绿色是提升,红色代表下降

简单的C++ 多线程

#include <iostream>
#include <thread>
#include <vector>

void threadFunction(int id) 
{
    // 子线程的代码逻辑
    std::cout << "子线程 " << id << " 执行完毕" << std::endl;
}

int main() 
{
    std::vector<std::thread> threads;

    // 创建多个子线程并将它们添加到向量中
    for (int i = 0; i < 5; ++i) 
{
        threads.push_back(std::thread(threadFunction, i));
    }

    // 等待所有子线程完成
    for (auto& t : threads)
 {
        t.join(); 
    }

    std::cout << "所有子线程已结束" << std::endl;

    return 0;
}

      

调用线程库<thread.h> 这是C++的库.
另外还有LINUX系统线程库,一般为POSIX线程库 #include<pthread.h>
 定义了线程数组
   std::vector<std::thread> threads; 
创建线程并插入到数组后面
   threads.push_back(std::thread(threadFunction, i));
创建线程 并分配工作函数,紧跟着函数的参数i
   std::thread(threadFunction, i)

线程创建就立马运行;
下个循环就是主程序空转,等待工作线程全部结束.

编写多线程函数的时候遇到的问题如下
1 线程工作函数的参数不支持引用 报错是无法重载
2 线程参数 无论是引用,指针 都是值COPY方式
3 线程结束后会摧毁一切,包含参数传来的对象
4 JOIN()和ThreadArray[i].detach(); 区别

2和1有点矛盾...... 
3 意味着它会FREE 掉MYSQL的链接,报DUBLE FREE

*** Error in `/home/shark/projects/CPP_Projects/UPDATE_MYSQL_CRYPTO/main.exe': double free or corruption (fasttop): 0x0000000000655150 ****** Error in `/home/shark/projects/CPP_Projects/UPDATE_MYSQL_CRYPTO/main.exe': invalid fastbin entry (free): 0x00000000006680a0 ***

通过GDB堆栈了解到释放了connect对像

(gdb) bt#0  0x00007ffff6195989 in raise () from /lib64/libc.so.6#1  0x00007ffff6197098 in abort () from /lib64/libc.so.6#2  0x00007ffff61d6197 in __libc_message () from /lib64/libc.so.6#3  0x00007ffff61dd56d in _int_free () from /lib64/libc.so.6#4  0x00007ffff75d0bdd in CRYPTO_free () from lib64/libcrypto.so.1.0.0#5  0x00007ffff76973f7 in X509_VERIFY_PARAM_free () from lib64/libcrypto.so.1.0.0#6  0x00007ffff79963c9 in SSL_free () from lib64/libssl.so.1.0.0#7  0x00007ffff6e39fa8 in vio_ssl_delete (vio=0x643c70) at ../../mysql-8.0.11/vio/viossl.cc:350#8  0x00007ffff6e19807 in end_server (mysql=0x642920) at ../../mysql-8.0.11/sql-common/client.cc:1486#9  0x00007ffff6e19b0b in cli_safe_read_with_ok (mysql=mysql@entry=0x642920, parse_ok=parse_ok@entry=false, is_data_packet=is_data_packet@entry=0x0) at ../../mysql-8.0.11/sql-common/client.cc:1010#10 0x00007ffff6e19c9f in cli_safe_read (mysql=mysql@entry=0x642920, is_data_packet=is_data_packet@entry=0x0) at ../../mysql-8.0.11/sql-common/client.cc:1115#11 0x00007ffff6e1abe9 in cli_read_query_result (mysql=0x642920) at ../../mysql-8.0.11/sql-common/client.cc:5209#12 0x00007ffff6e24bbf in execute (stmt=stmt@entry=0x743790, packet=packet@entry=0x7ffff0000a30 "", length=length@entry=171) at ../../mysql-8.0.11/libmysql/libmysql.cc:1933#13 0x00007ffff6e25a15 in cli_stmt_execute (stmt=0x743790) at ../../mysql-8.0.11/libmysql/libmysql.cc:2057#14 0x00007ffff6e26bb4 in mysql_stmt_execute (stmt=0x743790) at ../../mysql-8.0.11/libmysql/libmysql.cc:2392#15 0x00007ffff6df9c2a in sql::mysql::MySQL_Prepared_Statement::do_query() () from lib64/libmysqlcppconn.so.7#16 0x00007ffff6df60de in sql::mysql::MySQL_Prepared_Statement::executeUpdate() () from lib64/libmysqlcppconn.so.7#17 0x0000000000405379 in ProcessUpdateData (RowLimit=100, prep_stmt=0x7436e0, subDataArray=std::vector of length 384, capacity 384 = {...}) at main.cpp:440#18 0x000000000040ba35 in std::_Bind_simple<void (*(int, sql::PreparedStatement*, std::vector<TUpdateData, std::allocator<TUpdateData> >))(int, sql::PreparedStatement*, std::vector<TUpdateData, std::allocator<TUpdateData> >)>::_M_invoke<0ul, 1ul, 2ul>(std::_Index_tuple<0ul, 1ul, 2ul>) (this=0x751d70) at /usr/include/c++/4.8.2/functional:1732#19 0x000000000040b854 in std::_Bind_simple<void (*(int, sql::PreparedStatement*, std::vector<TUpdateData, std::allocator<TUpdateData> >))(int, sql::PreparedStatement*, std::vector<TUpdateData, std::allocator<TUpdateData> >)>::operator()() (this=0x751d70) at /usr/include/c++/4.8.2/functional:1720#20 0x000000000040b7dd in std::thread::_Impl<std::_Bind_simple<void (*(int, sql::PreparedStatement*, std::vector<TUpdateData, std::allocator<TUpdateData> >))(int, sql::PreparedStatement*, std::vector<TUpdateData, std::allocator<TUpdateData> >)> >::_M_run() (this=0x751d58) at /usr/include/c++/4.8.2/thread:115#21 0x00007ffff6aedda0 in ?? () from /lib64/libstdc++.so.6#22 0x00007ffff7bc7df3 in start_thread () from /lib64/libpthread.so.0#23 0x00007ffff62563dd in clone () from /lib64/libc.so.6

为此不得不NEW一个出来

//传给线程工作函数的必须是NEW出新对象,线程退出它会摧毁所有传过去的对象
sql::mysql::MySQL_Driver *driverThread = new sql::mysql::MySQL_Driver();
sql::Connection *connectThread = driverThread->connect(URL, config["DB_USER"], config["DB_PASW"]);
connectThread->setSchema(config["DB_NAME"]);
sql::PreparedStatement *prep_stmtThread = connectThread->prepareStatement(config["UPDATE_SQL"]);
std::cout << "RUN THREAD:" << i << std::endl;
ThreadArray[i] = std::thread(ProcessUpdateData, i, prep_stmtThread, subUpdateDataArray); // 值传给线程函数
      

4 AI 给了错误提示 循环启动线程,然后马上join() 导致主程序等待线程完成,再创建下个线程. 换成detach() 主程序就不等待了,立马把所有线程启动起来了. 可是这样主线程执行完后面的代码就结束了.主程序一结束立马把所有线程都杀了. 为此使用下面语句进行等待一段时间. 这就没有意思了

std::this_thread::sleep_for(std::chrono::seconds(10));

  

所以采用双循环模式,先把所有线程启动,运行,然后再另外个循环等待
其实这英文单词取得很绕 join() 应该是wait() detach()应该是nowait()
 

最后C++相对于C语言来说编程非常便利和友好化
1 动态数组 vercort<>
      可以RESIZE() 后插自动RESIZE()
2 数组动态定义大小 
      std::thread ThreadArray[WorkThreadNum];
3 类型自动推动
    auto i=3; 自动推动i是整型
4 智能指针 
 auto_ptr   std::auto_ptr<int> ap1; 使用完不用delete,自动释放。unique_ptr 唯一指针,禁止指针拷贝; shared_ptr共享指针,通过计数器方式解决释放问题

主程序 多线程调用核心源码

auto start_time_Thread = std::chrono::high_resolution_clock::now();
 u_int32_t RemainRows = 0;                                 // max 42,9496,7295
 u_int32_t PerThreadProcessRow = 0;                        // max 42,9496,7295
 u_int8_t WorkThreadNum = std::stoi(config["WORK_THEAD"]); // max 256
 int ROWLIMIT = std::stoi(config["ROW_LIMIT"]);            // max 65535

 if (Total_row1 % WorkThreadNum > 0)
 {
     PerThreadProcessRow = Total_row1 / WorkThreadNum;
     RemainRows = Total_row1 % WorkThreadNum;
 }
 else
 {
     PerThreadProcessRow = Total_row1 / WorkThreadNum;
 }

// UpdateDataArray.resize(Old_Data_Map.size());
// subUpdateDataArray.resize(PerThreadProcessRow);     
//  两个MAP 对象 合并成一个结构体动态容器
 std::vector<TUpdateData> UpdateDataArray(Old_Data_Map.size()), subUpdateDataArray(PerThreadProcessRow);
 auto Old_it = Old_Data_Map.begin();
 auto New_it = New_Data_Map.begin();
 for (int i = 0; Old_it != Old_Data_Map.end(); i++, ++Old_it, ++New_it)
 {
     UpdateDataArray[i].Card_id = Old_it->first;
     UpdateDataArray[i].Old_Encry = Old_it->second;
     UpdateDataArray[i].New_Encry = New_it->second;
 }

 //生成线程数组,分配数据,启动线程
 std::thread ThreadArray[WorkThreadNum];
 for (int i = 0; i < WorkThreadNum; i++)
 {
     for (int j = 0; j < PerThreadProcessRow; j++)
     {
         int k = i * PerThreadProcessRow + j;
         subUpdateDataArray[j] = UpdateDataArray[k];
     }
                         //传给线程工作函数的必须是NEW出新对象,线程退出它会摧毁所有传过去的对象
     sql::mysql::MySQL_Driver *driverThread = new sql::mysql::MySQL_Driver();
     sql::Connection *connectThread = driverThread->connect(URL, config["DB_USER"], config["DB_PASW"]);
     connectThread->setSchema(config["DB_NAME"]);
     sql::PreparedStatement *prep_stmtThread = connectThread->prepareStatement(config["UPDATE_SQL"]);
     std::cout << "RUN THREAD:" << i << std::endl;
     ThreadArray[i] = std::thread(ProcessUpdateData, i, prep_stmtThread, subUpdateDataArray); // 值传给线程函数
     // ThreadArray[i].detach();
 }

 //使用主线程链接语句对象完成剩余数据的更新 

 sql::PreparedStatement *prep_stmt = nullptr;
 int UpdateCount=0;
 int TotalUpdateRows=0;
 prep_stmt = con->prepareStatement(config["UPDATE_SQL"]);
 if (RemainRows >0 )
 {
    for (int f=WorkThreadNum*PerThreadProcessRow-1;  f < UpdateDataArray.size();f++)
    {
     prep_stmt->setString(1, UpdateDataArray[f].New_Encry);
     prep_stmt->setString(2, UpdateDataArray[f].Card_id);
     prep_stmt->setString(3, UpdateDataArray[f].Old_Encry);
     UpdateCount = prep_stmt->executeUpdate();
     TotalUpdateRows = UpdateCount + TotalUpdateRows;
    }
    std::cout << "Update RemainRow:"<<RemainRows<<"\t Actaul Update Rows:"<<TotalUpdateRows<< std::endl;
    RemainRows = 0;
  }        

 //等待所有线程工作完成

 for (int i = 0; i < WorkThreadNum; ++i)
 {
     if (ThreadArray[i].joinable())
     {
         ThreadArray[i].join();
         std::cout << i << ":线程已经结束执行。" << std::endl;
     }
 }
 auto end_time_Thread = std::chrono::high_resolution_clock::now();
 std::cout << "Thread Update Elapsed Time:" << Calc_time_diff(end_time_Thread, start_time_Thread) << std::endl;     

线程工作函数

void ProcessUpdateData(int RowLimit, sql::PreparedStatement *prep_stmt, std::vector<TUpdateData> subDataArray)
{

    u_int64_t UpdateCount, TotalUpdateRows;
    UpdateCount = 0;
    TotalUpdateRows = 0;
    try
    {
        // prep_stmt->getConnection()->setAutoCommit(false);
        for (auto &pArray : subDataArray)
        {
            prep_stmt->setString(1, pArray.New_Encry);
            prep_stmt->setString(2, pArray.Card_id);
            prep_stmt->setString(3, pArray.Old_Encry);
            UpdateCount = prep_stmt->executeUpdate();
            TotalUpdateRows = UpdateCount + TotalUpdateRows;
        }

        std::cout << "Thread " << RowLimit << ":TotalUpdateRows=" << TotalUpdateRows << std::endl;
    }
    catch (sql::SQLException &e)
    {
        std::cout << "# ERR: SQLException in " << __FILE__; // 打印异常 文件名  函数名 行号  SQL异常 代码 状态等信息
        std::cout << "(" << __FUNCTION__ << ") on line " << __LINE__ << std::endl;
        std::cout << "SQLException: " << e.what() << std::endl;
        std::cout << "MySQL error code: " << e.getErrorCode() << std::endl;
        std::cout << "SQLState: " << e.getSQLState() << std::endl;
    }
}     

   

线程参数不支持引用,否则报错​​​​​​​

In file included from /usr/include/c++/4.8.2/thread:39:0,                 from main.cpp:14:/usr/include/c++/4.8.2/functional: In instantiation of ‘struct std::_Bind_simple<void (*(int, sql::PreparedStatement*, std::vector<TUpdateData>))(int&, sql::PreparedStatement*, std::vector<TUpdateData>)>’:/usr/include/c++/4.8.2/thread:137:47:   required from ‘std::thread::thread(_Callable&&, _Args&& ...) [with _Callable = void (&)(int&, sql::PreparedStatement*, std::vector<TUpdateData>); _Args = {int&, sql::PreparedStatement*&, std::vector<TUpdateData, std::allocator<TUpdateData> >&}]’main.cpp:209:99:   required from here/usr/include/c++/4.8.2/functional:1697:61: 错误:no type named ‘type’ in ‘class std::result_of<void (*(int, sql::PreparedStatement*, std::vector<TUpdateData>))(int&, sql::PreparedStatement*, std::vector<TUpdateData>)>’       typedef typename result_of<_Callable(_Args...)>::type result_type;                                                             ^/usr/include/c++/4.8.2/functional:1727:9: 错误:no type named ‘type’ in ‘class std::result_of<void (*(int, sql::PreparedStatement*, std::vector<TUpdateData>))(int&, sql::PreparedStatement*, std::vector<TUpdateData>)>’         _M_invoke(_Index_tuple<_Indices...>)

这样定义是不对的

void ProcessUpdateData(int& RowLimit, sql::PreparedStatement *prep_stmt, std::vector<TUpdateData> subDataArray)

日后有空研究线程的同步技术...

相关推荐

  1. 线C++更新MYSQL

    2024-03-16 06:32:05       21 阅读
  2. C++ 线

    2024-03-16 06:32:05       26 阅读
  3. C++ 线

    2024-03-16 06:32:05       18 阅读
  4. C#线

    2024-03-16 06:32:05       12 阅读
  5. C++ 线

    2024-03-16 06:32:05       11 阅读
  6. 线C#】

    2024-03-16 06:32:05       8 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-03-16 06:32:05       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-03-16 06:32:05       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-03-16 06:32:05       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-03-16 06:32:05       20 阅读

热门阅读

  1. 前端框架的发展史

    2024-03-16 06:32:05       21 阅读
  2. hive行转列函数stack(int n, v_1, v_2, ..., v_k)

    2024-03-16 06:32:05       19 阅读
  3. WPF实现拖动控件功能(类似从工具箱拖出工具)

    2024-03-16 06:32:05       20 阅读
  4. C++中using 和 typedef 的区别

    2024-03-16 06:32:05       18 阅读
  5. 半监督学习--一起学习吧之人工智能

    2024-03-16 06:32:05       22 阅读