• 小工具,用于将数据插入到MySQL数据库中,包含5个方法

    • init传入数据库地址、用户名、密码、数据库名以及端口号连接

    • close关闭连接

    • insert

      • 给定vector将其中的元素视为一行进行插入(必须是完整的一行元素)
      • 给定csv路径,将完整的csv文件插入到数据库中。需要指定是否忽略第一行,以及每次读取多少行csv文件(避免内存爆掉)
    • insertBig插入大文件csv,给定csv路径,表名,是否需要忽略第一行(6000万行数据大约需要20分钟)

      • 需要以允许操作本地文件的标志登录,允许加载本地文件,实际调用的SQL命令如下
1
mysql --local-infile=1 -h 127.0.0.1 -P 3306 -u xuan -p
1
2
show global variables like 'local_infile';
set global local_infile=1;
1
2
3
4
LOAD DATA LOCAL INFILE 'path'
INTO TABLE table_name
FIELDS TERMINATED BY ','
;
  • 编译选项 g++ -std=c++11 -L/lib64/mysql -lmysqlclient main.cpp SendMsg2Mysql.cpp

  • SendMsg2Mysql.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#ifndef SendMsg2Mysql__
#define SendMsg2Mysql__
#include <mysql/mysql.h>
#include <string>
#include <vector>
#include <iostream>
class SendMsg2Mysql
{
private:
MYSQL mysql;
int start_line=0;
public:
// 连接数据库, 成功返回0, 失败返回-1
int init(std::string host, std::string user, std::string passwd, std::string db, unsigned int port);

// 插入数据(表名, 给定一行数据)
int insert(std::string table, std::vector<float>& data);

// 插入数据(给定csv路径, 表名, 每次读取csv数据量, 是否需要忽略第一行)
int insert(std::string path, std::string table, int chunk_size, bool ignore1lines);

// 插入大量数据(几百万几千万行, 给定csv路径, 表名, 是否需要忽略第一行)
int insertBig(std::string path, std::string table, bool ignore1lines);
void close();
};
#endif
  • SendMsg2Mysql.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
#include "SendMsg2Mysql.h"
#include <string.h>
#include <iostream>
#include <vector>
#include <fstream>
#include <sstream>

std::vector<std::vector<std::string>> read_csv_chunk(const std::string& filename, size_t chunk_size, size_t start_line) {
std::ifstream file(filename);
std::vector<std::vector<std::string>> data;
std::string line;
size_t current_line = 0;

// 跳过指定行数
while (current_line < start_line && std::getline(file, line)) {
current_line++;
}

// 读取指定行数的数据
while (std::getline(file, line) && data.size() < chunk_size) {
std::istringstream iss(line);
std::vector<std::string> tokens;
std::string token;
while (std::getline(iss, token, ',')) {
tokens.push_back(token);
}
data.push_back(tokens);
current_line++;
}
return data;
}


// 初始化需要传入连接数据库的地址、账户密码、数据库名、端口
int SendMsg2Mysql::init(std::string host, std::string user, std::string passwd, std::string db, unsigned int port)
{
mysql_init(&mysql);
// 连接数据库(mysql对象, 主机, 用户名, 密码, 数据库名, 端口, 套接字(nullptr由系统分配), 客户端标志(默认为0))
if(mysql_real_connect(&mysql, host.c_str(), user.c_str(), passwd.c_str(), db.c_str(), port, NULL, 0) == nullptr)
{
fprintf(stderr, "%s\n", mysql_error(&mysql)); //显示连接时遇到的错误
std::cerr << "Failed to connect to MySQL user!\n";
return -1;
}
std::cout << "Successfully to connect to MySQL user!\n";
// 设置编码格式为8
mysql_set_character_set(&mysql, "utf8");
return 0;
}


// 加载大数据文件(几百万几千万行)
int SendMsg2Mysql::insertBig(std::string path, std::string table, bool ignore1lines)
{
std::string a = "LOAD DATA LOCAL INFILE '";
std::string b = "' INTO TABLE ";
std::string c = " FIELDS TERMINATED BY ','";
std::string d = " IGNORE 1 LINES ";
std::string insert_query = ignore1lines ? (a + path + b + table + c + d) : (a + path + b + table + c);
if (mysql_query(&mysql, insert_query.c_str())) {
std::cerr << "Error: " << mysql_error(&mysql) << std::endl;
mysql_close(&mysql);
return -1;
}
}


// 插入数据(表名, 给定一个以逗号分隔的csv文件)
int SendMsg2Mysql::insert(std::string path, std::string table, int chunk_size, bool ignore1lines)
{
if(ignore1lines) start_line=1;
// 分段读取CSV文件并插入到MySQL表中
while (true) {
// 从CSV文件中读取数据块
std::vector<std::vector<std::string>> csv_data = read_csv_chunk(path, chunk_size, start_line);

if (csv_data.empty()) {
break; // 文件读取完成
}

// 插入数据到MySQL表中
for (const auto& row : csv_data) {
std::string a = "INSERT INTO ";
std::string b = " VALUES (";
std::string insert_query = a + table + b;
for (size_t i = 0; i < row.size(); ++i) {
insert_query += "'" + row[i] + "'";
if (i < row.size() - 1) {
insert_query += ", ";
}
}
insert_query = insert_query + ")";
if (mysql_query(&mysql, insert_query.c_str())) {
std::cerr << "Error: " << mysql_error(&mysql) << std::endl;
mysql_close(&mysql);
return -1;
}
}

// 更新起始行数
start_line += chunk_size;
}
}


// 插入数据(表名, 给定一行数据)
int SendMsg2Mysql::insert(std::string table, std::vector<float>& data)
{
MYSQL_STMT *stmt = mysql_stmt_init(&mysql); // 分配并初始化MySQL语句对象
MYSQL_BIND *bind = new MYSQL_BIND[data.size()]; // MySQL绑定对象数组
if (!stmt) {
fprintf(stderr, "初始化语句失败: %s\n", mysql_error(&mysql)); // 输出错误信息
return -1;
}
// 准备SQL语句,参数分别是:语句对象,SQL语句,语句长度
std::string a = "INSERT INTO ";
std::string c = " VALUES(?";
for(int i=1; i<data.size(); ++i)
{
c = c + ",?";
}
c = c + ")";
std::cout<<"HI"<<std::endl;
std::string query = a + table + c;
if (mysql_stmt_prepare(stmt, query.c_str(), query.length())) {
fprintf(stderr, "准备语句失败: %s\n", mysql_stmt_error(stmt)); // 输出错误信息
return -1;
}

// 绑定参数
unsigned long length = data.size() * sizeof(float);
memset(bind, 0, sizeof(bind)); // 清空绑定对象数组
for (int i = 0; i < data.size(); ++i) {
bind[i].buffer_type = MYSQL_TYPE_FLOAT; // 参数类型为浮点数
bind[i].buffer = &data[i]; // 绑定数据的地址
bind[i].is_null = 0; // 数据非空
}

// 绑定参数到语句对象
if (mysql_stmt_bind_param(stmt, bind)) {
fprintf(stderr, "绑定参数失败: %s\n", mysql_stmt_error(stmt)); // 输出错误信息
return -1;
}

// 执行语句
if (mysql_stmt_execute(stmt)) {
fprintf(stderr, "执行语句失败: %s\n", mysql_stmt_error(stmt)); // 输出错误信息
return -1;
}

// 关闭预处理语句
mysql_stmt_close(stmt);
}


void SendMsg2Mysql::close()
{
mysql_close(&mysql);
}