xxxxxxxxxx
221// ID服务器
2// 声明全局变量
3//
4
5
6
7
8
9//
10// 配置信息
11//
12extern char* cfg_maddrs; // MySQL地址表
13extern acl::master_str_tbl cfg_str[]; // 字符串配置表
14
15extern int cfg_mtimeout; // MySQL读写超时
16extern int cfg_maxoffset; // 最大偏移
17extern acl::master_int_tbl cfg_int[]; // 整型配置表
18
19extern std::vector<std::string> g_maddrs; // MySQL地址表
20extern std::string g_hostname; // 主机名
21extern std::vector<id_pair_t> g_ids; // ID表
22extern pthread_mutex_t g_mutex; // 互斥锁
xxxxxxxxxx
231// ID服务器
2// 定义全局变量
3//
4
5//
6// 配置信息
7//
8char* cfg_maddrs; // MySQL地址表
9acl::master_str_tbl cfg_str[] = { // 字符串配置表
10 {"mysql_addrs", "127.0.0.1", &cfg_maddrs},
11 {0, 0, 0}};
12
13int cfg_mtimeout; // MySQL读写超时
14int cfg_maxoffset; // 最大偏移
15acl::master_int_tbl cfg_int[] = { // 整型配置表
16 {"mysql_rw_timeout", 30, &cfg_mtimeout, 0, 0},
17 {"idinc_max_step", 100, &cfg_maxoffset, 0, 0},
18 {0, 0, 0, 0, 0}};
19
20std::vector<std::string> g_maddrs; // MySQL地址表
21std::string g_hostname; // 主机名
22std::vector<id_pair_t> g_ids; // ID表
23pthread_mutex_t g_mutex = PTHREAD_MUTEX_INITIALIZER; // 互斥锁
xxxxxxxxxx
251// ID服务器
2// 声明数据库访问类
3//
4
5
6
7//
8// 数据库访问类
9//
10class db_c {
11public:
12 // 构造函数
13 db_c(void);
14 // 析构函数
15 ~db_c(void);
16
17 // 连接数据库
18 int connect(void);
19
20 // 获取ID当前值,同时产生下一个值
21 int get(char const* key, int inc, long* value) const;
22
23private:
24 MYSQL* m_mysql; // MySQL对象
25};
xxxxxxxxxx
991// ID服务器
2// 实现数据库访问类
3//
4
5
6
7// 构造函数
8db_c::db_c(void): m_mysql(mysql_init(NULL)) { // 创建MySQL对象
9 if (!m_mysql)
10 logger_error("create dao fail: %s", mysql_error(m_mysql));
11}
12
13// 析构函数
14db_c::~db_c(void) {
15 // 销毁MySQL对象
16 if (m_mysql) {
17 mysql_close(m_mysql);
18 m_mysql = NULL;
19 }
20}
21
22// 连接数据库
23int db_c::connect(void) {
24 MYSQL* mysql = m_mysql;
25
26 // 遍历MySQL地址表,尝试连接数据库
27 for (std::vector<std::string>::const_iterator maddr =
28 g_maddrs.begin(); maddr != g_maddrs.end(); ++maddr)
29 if ((m_mysql = mysql_real_connect(mysql, maddr->c_str(),
30 "root", "123456", "tnv_idsdb", 0, NULL, 0)))
31 return OK;
32
33 logger_error("connect database fail: %s",
34 mysql_error(m_mysql = mysql));
35 return ERROR;
36}
37
38// 获取ID当前值,同时产生下一个值
39int db_c::get(char const* key, int inc, long* value) const {
40 // 关闭自动提交
41 mysql_autocommit(m_mysql, 0);
42
43 // 查询数据库
44 acl::string sql;
45 sql.format("SELECT id_value FROM t_id_gen WHERE id='%s';", key);
46 if (mysql_query(m_mysql, sql.c_str())) {
47 logger_error("query database fail: %s, sql: %s",
48 mysql_error(m_mysql), sql.c_str());
49 mysql_autocommit(m_mysql, 1);
50 return ERROR;
51 }
52
53 // 获取查询结果
54 MYSQL_RES* res = mysql_store_result(m_mysql);
55 if (!res) {
56 logger_error("result is null: %s, sql: %s",
57 mysql_error(m_mysql), sql.c_str());
58 mysql_autocommit(m_mysql, 1);
59 return ERROR;
60 }
61
62 // 获取结果记录
63 MYSQL_ROW row = mysql_fetch_row(res);
64 if (row) { // 有记录
65 // 更新旧记录
66 sql.format("UPDATE t_id_gen SET id_value="
67 "id_value+%d WHERE id='%s';", inc, key);
68 if (mysql_query(m_mysql, sql.c_str())) {
69 logger_error("update database fail: %s, sql: %s",
70 mysql_error(m_mysql), sql.c_str());
71 mysql_autocommit(m_mysql, 1);
72 return ERROR;
73 }
74 // 提交数据库
75 mysql_commit(m_mysql);
76 // 库中当前值
77 *value = atol(row[0]);
78 }
79 else { // 无记录
80 // 插入新记录
81 sql.format("INSERT INTO t_id_gen SET id='%s', id_value='%d';",
82 key, inc);
83 if (mysql_query(m_mysql, sql.c_str())) {
84 logger_error("insert database fail: %s, sql: %s",
85 mysql_error(m_mysql), sql.c_str());
86 mysql_autocommit(m_mysql, 1);
87 return ERROR;
88 }
89 // 提交数据库
90 mysql_commit(m_mysql);
91 // 缺省当前值
92 *value = 0;
93 }
94
95 // 打开自动提交
96 mysql_autocommit(m_mysql, 1);
97
98 return OK;
99}
xxxxxxxxxx
291// ID服务器
2// 声明业务服务类
3//
4
5
6
7//
8// 业务服务类
9//
10class service_c {
11public:
12 // 业务处理
13 bool business(acl::socket_stream* conn, char const* head) const;
14
15private:
16 // 处理来自存储服务器的获取ID请求
17 bool get(acl::socket_stream* conn, long long bodylen) const;
18
19 // 根据ID的键获取其值
20 long get(char const* key) const;
21 // 从数据库中获取ID值
22 long fromdb(char const* key) const;
23
24 // 应答ID
25 bool id(acl::socket_stream* conn, long value) const;
26 // 应答错误
27 bool error(acl::socket_stream* conn, short errnumb,
28 char const* format, ...) const;
29};
xxxxxxxxxx
2051// ID服务器
2// 实现业务服务类
3//
4
5
6
7
8
9
10// 业务处理
11bool service_c::business(acl::socket_stream* conn,
12 char const* head) const {
13 // |包体长度|命令|状态| 包体 |
14 // | 8 | 1 | 1 |包体长度|
15 // 解析包头
16 long long bodylen = ntoll(head); // 包体长度
17 if (bodylen < 0) {
18 error(conn, -1, "invalid body length: %lld < 0", bodylen);
19 return false;
20 }
21 int command = head[BODYLEN_SIZE]; // 命令
22 int status = head[BODYLEN_SIZE+COMMAND_SIZE]; // 状态
23 logger("bodylen: %lld, command: %d, status: %d",
24 bodylen, command, status);
25
26 bool result;
27
28 // 根据命令执行具体业务处理
29 switch (command) {
30 case CMD_ID_GET:
31 // 处理来自存储服务器的获取ID请求
32 result = get(conn, bodylen);
33 break;
34
35 default:
36 error(conn, -1, "unknown command: %d", command);
37 return false;
38 }
39
40 return result;
41}
42
43////////////////////////////////////////////////////////////////////////
44
45// 处理来自存储服务器的获取ID请求
46bool service_c::get(acl::socket_stream* conn, long long bodylen) const {
47 // |包体长度|命令|状态|ID键|
48 // | 8 | 1 | 1 |64+1|
49 // 检查包体长度
50 long long expected = ID_KEY_MAX + 1; // 期望包体长度
51 if (bodylen > expected) {
52 error(conn, -1, "invalid body length: %lld > %lld",
53 bodylen, expected);
54 return false;
55 }
56
57 // 接收包体
58 char body[bodylen];
59 if (conn->read(body, bodylen) < 0) {
60 logger_error("read fail: %s, bodylen: %lld, from: %s",
61 acl::last_serror(), bodylen, conn->get_peer());
62 return false;
63 }
64
65 // 根据ID的键获取其值
66 long value = get(body);
67 if (value < 0) {
68 error(conn, -1, "get id fail, key: %s", body);
69 return false;
70 }
71
72 logger("get id ok, key: %s, value: %ld", body, value);
73
74 return id(conn, value);
75}
76
77////////////////////////////////////////////////////////////////////////
78
79// 根据ID的键获取其值
80long service_c::get(char const* key) const {
81 // 互斥锁加锁
82 if ((errno = pthread_mutex_lock(&g_mutex))) {
83 logger_error("call pthread_mutex_lock fail: %s",
84 strerror(errno));
85 return -1;
86 }
87
88 long value = -1;
89
90 // 在ID表中查找ID
91 std::vector<id_pair_t>::iterator id;
92 for (id = g_ids.begin(); id != g_ids.end(); ++id)
93 if (!strcmp(id->id_key, key))
94 break;
95 if (id != g_ids.end()) { // 找到该ID
96 if (id->id_offset < cfg_maxoffset) { // 该ID的偏移未及上限
97 value = id->id_value + id->id_offset;
98 ++id->id_offset;
99 }
100 else if ((value = fromdb(key)) >= 0) { // 从数据库中获取ID值
101 // 更新ID表中的ID
102 id->id_value = value;
103 id->id_offset = 1;
104 }
105 }
106 else if ((value = fromdb(key)) >= 0) { // 从数据库中获取ID值
107 // 在ID表中添加ID
108 id_pair_t id;
109 strcpy(id.id_key, key);
110 id.id_value = value;
111 id.id_offset = 1;
112 g_ids.push_back(id);
113 }
114
115 // 互斥锁解锁
116 if ((errno = pthread_mutex_unlock(&g_mutex))) {
117 logger_error("call pthread_mutex_unlock fail: %s",
118 strerror(errno));
119 return -1;
120 }
121
122 return value;
123}
124
125// 从数据库中获取ID值
126long service_c::fromdb(char const* key) const {
127 db_c db; // 数据库访问对象
128
129 // 连接数据库
130 if (db.connect() != OK)
131 return -1;
132
133 long value = -1;
134
135 // 获取ID当前值,同时产生下一个值
136 if (db.get(key, cfg_maxoffset, &value) != OK)
137 return -1;
138
139 return value;
140}
141
142////////////////////////////////////////////////////////////////////////
143
144// 应答ID
145bool service_c::id(acl::socket_stream* conn, long value) const {
146 // |包体长度|命令|状态|ID值|
147 // | 8 | 1 | 1 | 8 |
148 // 构造响应
149 long long bodylen = BODYLEN_SIZE;
150 long long resplen = HEADLEN + bodylen;
151 char resp[resplen] = {};
152 llton(bodylen, resp);
153 resp[BODYLEN_SIZE] = CMD_ID_REPLY;
154 resp[BODYLEN_SIZE+COMMAND_SIZE] = 0;
155 lton(value, resp + HEADLEN);
156
157 // 发送响应
158 if (conn->write(resp, resplen) < 0) {
159 logger_error("write fail: %s, resplen: %lld, to: %s",
160 acl::last_serror(), resplen, conn->get_peer());
161 return false;
162 }
163
164 return true;
165}
166
167// 应答错误
168bool service_c::error(acl::socket_stream* conn, short errnumb,
169 char const* format, ...) const {
170 // 错误描述
171 char errdesc[ERROR_DESC_SIZE];
172 va_list ap;
173 va_start(ap, format);
174 vsnprintf(errdesc, ERROR_DESC_SIZE, format, ap);
175 va_end(ap);
176 logger_error("%s", errdesc);
177 acl::string desc;
178 desc.format("[%s] %s", g_hostname.c_str(), errdesc);
179 memset(errdesc, 0, sizeof(errdesc));
180 strncpy(errdesc, desc.c_str(), ERROR_DESC_SIZE - 1);
181 size_t desclen = strlen(errdesc);
182 desclen += desclen != 0;
183
184 // |包体长度|命令|状态|错误号|错误描述|
185 // | 8 | 1 | 1 | 2 | <=1024 |
186 // 构造响应
187 long long bodylen = ERROR_NUMB_SIZE + desclen;
188 long long resplen = HEADLEN + bodylen;
189 char resp[resplen] = {};
190 llton(bodylen, resp);
191 resp[BODYLEN_SIZE] = CMD_ID_REPLY;
192 resp[BODYLEN_SIZE+COMMAND_SIZE] = STATUS_ERROR;
193 ston(errnumb, resp + HEADLEN);
194 if (desclen)
195 strcpy(resp + HEADLEN + ERROR_NUMB_SIZE, errdesc);
196
197 // 发送响应
198 if (conn->write(resp, resplen) < 0) {
199 logger_error("write fail: %s, resplen: %lld, to: %s",
200 acl::last_serror(), resplen, conn->get_peer());
201 return false;
202 }
203
204 return true;
205}
xxxxxxxxxx
341// ID服务器
2// 声明服务器类
3//
4
5
6
7//
8// 服务器类
9//
10class server_c: public acl::master_threads {
11protected:
12 // 进程切换用户后被调用
13 void proc_on_init(void);
14 // 子进程意图退出时被调用
15 // 返回true,子进程立即退出,否则
16 // 若配置项ioctl_quick_abort非0,子进程立即退出,否则
17 // 待所有客户机连接都关闭后,子进程再退出
18 bool proc_exit_timer(size_t nclients, size_t nthreads);
19
20 // 线程获得连接时被调用
21 // 返回true,连接将被用于后续通信,否则
22 // 函数返回后即关闭连接
23 bool thread_on_accept(acl::socket_stream* conn);
24 // 与线程绑定的连接可读时被调用
25 // 返回true,保持长连接,否则
26 // 函数返回后即关闭连接
27 bool thread_on_read(acl::socket_stream* conn);
28 // 线程读写连接超时时被调用
29 // 返回true,继续等待下一次读写,否则
30 // 函数返回后即关闭连接
31 bool thread_on_timeout(acl::socket_stream* conn);
32 // 与线程绑定的连接关闭时被调用
33 void thread_on_close(acl::socket_stream* conn);
34};
xxxxxxxxxx
881// ID服务器
2// 实现服务器类
3//
4
5
6
7
8
9
10
11// 进程切换用户后被调用
12void server_c::proc_on_init(void) {
13 // MySQL地址表
14 if (!cfg_maddrs || !*cfg_maddrs)
15 logger_fatal("mysql addresses is null");
16 split(cfg_maddrs, g_maddrs);
17 if (g_maddrs.empty())
18 logger_fatal("mysql addresses is empty");
19
20 // 主机名
21 char hostname[256+1] = {};
22 if (gethostname(hostname, sizeof(hostname) - 1))
23 logger_error("call gethostname fail: %s", strerror(errno));
24 g_hostname = hostname;
25
26 // 最大偏移不能太小
27 if (cfg_maxoffset < 10)
28 logger_fatal("invalid maximum offset: %d < 10", cfg_maxoffset);
29
30 // 打印配置信息
31 logger("cfg_maddrs: %s, cfg_mtimeout: %d, cfg_maxoffset: %d",
32 cfg_maddrs, cfg_mtimeout, cfg_maxoffset);
33}
34
35// 子进程意图退出时被调用
36// 返回true,子进程立即退出,否则
37// 若配置项ioctl_quick_abort非0,子进程立即退出,否则
38// 待所有客户机连接都关闭后,子进程再退出
39bool server_c::proc_exit_timer(size_t nclients, size_t nthreads) {
40 if (!nclients || !nthreads) {
41 logger("nclients: %lu, nthreads: %lu", nclients, nthreads);
42 return true;
43 }
44
45 return false;
46}
47
48// 线程获得连接时被调用
49// 返回true,连接将被用于后续通信,否则
50// 函数返回后即关闭连接
51bool server_c::thread_on_accept(acl::socket_stream* conn) {
52 logger("connect, from: %s", conn->get_peer());
53 return true;
54}
55
56// 与线程绑定的连接可读时被调用
57// 返回true,保持长连接,否则
58// 函数返回后即关闭连接
59bool server_c::thread_on_read(acl::socket_stream* conn) {
60 // 接收包头
61 char head[HEADLEN];
62 if (conn->read(head, HEADLEN) < 0) {
63 if (conn->eof())
64 logger("connection has been closed, from: %s",
65 conn->get_peer());
66 else
67 logger_error("read fail: %s, from: %s",
68 acl::last_serror(), conn->get_peer());
69 return false;
70 }
71
72 // 业务处理
73 service_c service;
74 return service.business(conn, head);
75}
76
77// 线程读写连接超时时被调用
78// 返回true,继续等待下一次读写,否则
79// 函数返回后即关闭连接
80bool server_c::thread_on_timeout(acl::socket_stream* conn) {
81 logger("read timeout, from: %s", conn->get_peer());
82 return true;
83}
84
85// 与线程绑定的连接关闭时被调用
86void server_c::thread_on_close(acl::socket_stream* conn) {
87 logger("client disconnect, from: %s", conn->get_peer());
88}
xxxxxxxxxx
1191// ID服务器
2// 定义主函数
3//
4
5
6
7int main(void) {
8 // 初始化ACL库
9 acl::acl_cpp_init();
10 acl::log::stdout_open(true);
11
12 // 创建并运行服务器
13 server_c& server = acl::singleton2<server_c>::get_instance();
14 server.set_cfg_str(cfg_str);
15 server.set_cfg_int(cfg_int);
16 server.run_alone("127.0.0.1:22000", "../etc/id.cfg");
17
18 return 0;
19}
xxxxxxxxxx
1211PROJ = ../../bin/id
2OBJS = $(patsubst %.cpp, %.o, $(wildcard ../01_common/*.cpp *.cpp))
3CC = g++
4LINK = g++
5RM = rm -rf
6CFLAGS = -c -Wall \
7 -I/usr/include/acl-lib/acl_cpp \
8 `mysql_config --cflags` \
9 -I../01_common
10LIBS = -pthread -lacl_all `mysql_config --libs`
11
12all: $(PROJ)
13
14$(PROJ): $(OBJS)
15 $(LINK) $^ $(LIBS) -o $@
16
17.cpp.o:
18 $(CC) $(CFLAGS) $^ -o $@
19
20clean:
21 $(RM) $(PROJ) $(OBJS)
xxxxxxxxxx
11service id {
2# MySQL地址表
3mysql_addrs = 127.0.0.1
4# MySQL读写超时
5mysql_rw_timeout = 30
6# 最大偏移
7idinc_max_step = 100
8}
1DROP DATABASE IF EXISTS tnv_idsdb;
2CREATE DATABASE tnv_idsdb;
3USE tnv_idsdb;
4
5CREATE TABLE `t_id_gen` (
6 `id` varchar(64) NOT NULL DEFAULT '',
7 `id_value` bigint(20) DEFAULT NULL,
8 `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
9 `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
10 PRIMARY KEY (`id`)
11) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;