xxxxxxxxxx
351// 跟踪服务器
2// 声明全局变量
3//
4
5
6
7
8
9
10
11
12//
13// 配置信息
14//
15extern char* cfg_appids; // 应用ID表
16extern char* cfg_maddrs; // MySQL地址表
17extern char* cfg_raddrs; // Redis地址表
18extern acl::master_str_tbl cfg_str[]; // 字符串配置表
19
20extern int cfg_interval; // 存储服务器状态检查间隔秒数
21extern int cfg_mtimeout; // MySQL读写超时
22extern int cfg_maxconns; // Redis连接池最大连接数
23extern int cfg_ctimeout; // Redis连接超时
24extern int cfg_rtimeout; // Redis读写超时
25extern int cfg_ktimeout; // Redis键超时
26extern acl::master_int_tbl cfg_int[]; // 整型配置表
27
28extern std::vector<std::string> g_appids; // 应用ID表
29extern std::vector<std::string> g_maddrs; // MySQL地址表
30extern std::vector<std::string> g_raddrs; // Redis地址表
31extern acl::redis_client_pool* g_rconns; // Redis连接池
32extern std::string g_hostname; // 主机名
33extern std::map<std::string,
34 std::list<storage_info_t> > g_groups; // 组表
35extern pthread_mutex_t g_mutex; // 互斥锁
xxxxxxxxxx
391// 跟踪服务器
2// 定义全局变量
3//
4
5//
6// 配置信息
7//
8char* cfg_appids; // 应用ID表
9char* cfg_maddrs; // MySQL地址表
10char* cfg_raddrs; // Redis地址表
11acl::master_str_tbl cfg_str[] = { // 字符串配置表
12 {"tnv_apps_id", "tnvideo", &cfg_appids},
13 {"mysql_addrs", "127.0.0.1", &cfg_maddrs},
14 {"redis_addrs", "127.0.0.1:6379", &cfg_raddrs},
15 {0, 0, 0}};
16
17int cfg_interval; // 存储服务器状态检测间隔秒数
18int cfg_mtimeout; // MySQL读写超时
19int cfg_maxconns; // Redis连接池最大连接数
20int cfg_ctimeout; // Redis连接超时
21int cfg_rtimeout; // Redis读写超时
22int cfg_ktimeout; // Redis键超时
23acl::master_int_tbl cfg_int[] = { // 整型配置表
24 {"check_active_interval", 120, &cfg_interval, 0, 0},
25 {"mysql_rw_timeout", 30, &cfg_mtimeout, 0, 0},
26 {"redis_max_conn_num", 600, &cfg_maxconns, 0, 0},
27 {"redis_conn_timeout", 10, &cfg_ctimeout, 0, 0},
28 {"redis_rw_timeout", 10, &cfg_rtimeout, 0, 0},
29 {"redis_key_timeout", 60, &cfg_ktimeout, 0, 0},
30 {0, 0, 0, 0, 0}};
31
32std::vector<std::string> g_appids; // 应用ID表
33std::vector<std::string> g_maddrs; // MySQL地址表
34std::vector<std::string> g_raddrs; // Redis地址表
35acl::redis_client_pool* g_rconns; // Redis连接池
36std::string g_hostname; // 主机名
37std::map<std::string,
38 std::list<storage_info_t> > g_groups; // 组表
39pthread_mutex_t g_mutex = PTHREAD_MUTEX_INITIALIZER; // 互斥锁
xxxxxxxxxx
201// 跟踪服务器
2// 声明缓存类
3//
4
5
6
7//
8// 缓存类
9//
10class cache_c {
11public:
12 // 根据键获取其值
13 int get(char const* key, acl::string& value) const;
14
15 // 设置指定键的值
16 int set(char const* key, char const* value, int timeout = -1) const;
17
18 // 删除指定键值对
19 int del(char const* key) const;
20};
xxxxxxxxxx
1301// 跟踪服务器
2// 实现缓存类
3//
4
5
6
7// 根据键获取其值
8int cache_c::get(char const* key, acl::string& value) const {
9 // 构造键
10 acl::string tracker_key;
11 tracker_key.format("%s:%s", TRACKER_REDIS_PREFIX, key);
12
13 // 检查Redis连接池
14 if (!g_rconns) {
15 logger_warn("redis connection pool is null, key: %s",
16 tracker_key.c_str());
17 return ERROR;
18 }
19
20 // 从连接池中获取一个Redis连接
21 acl::redis_client* rconn = (acl::redis_client*)g_rconns->peek();
22 if (!rconn) {
23 logger_warn("peek redis connection fail, key: %s",
24 tracker_key.c_str());
25 return ERROR;
26 }
27
28 // 持有此连接的Redis对象即为Redis客户机
29 acl::redis redis;
30 redis.set_client(rconn);
31
32 // 借助Redis客户机根据键获取其值
33 if (!redis.get(tracker_key.c_str(), value)) {
34 logger_warn("get cache fail, key: %s", tracker_key.c_str());
35 g_rconns->put(rconn, false);
36 return ERROR;
37 }
38
39 // 检查空值
40 if (value.empty()) {
41 logger_warn("value is empty, key: %s", tracker_key.c_str());
42 g_rconns->put(rconn, false);
43 return ERROR;
44 }
45
46 logger("get cache ok, key: %s, value: %s",
47 tracker_key.c_str(), value.c_str());
48 g_rconns->put(rconn, true);
49
50 return OK;
51}
52
53// 设置指定键的值
54int cache_c::set(char const* key, char const* value,
55 int timeout /* = -1 */) const {
56 // 构造键
57 acl::string tracker_key;
58 tracker_key.format("%s:%s", TRACKER_REDIS_PREFIX, key);
59
60 // 检查Redis连接池
61 if (!g_rconns) {
62 logger_warn("redis connection pool is null, key: %s",
63 tracker_key.c_str());
64 return ERROR;
65 }
66
67 // 从连接池中获取一个Redis连接
68 acl::redis_client* rconn = (acl::redis_client*)g_rconns->peek();
69 if (!rconn) {
70 logger_warn("peek redis connection fail, key: %s",
71 tracker_key.c_str());
72 return ERROR;
73 }
74
75 // 持有此连接的Redis对象即为Redis客户机
76 acl::redis redis;
77 redis.set_client(rconn);
78
79 // 借助Redis客户机设置指定键的值
80 if (timeout < 0)
81 timeout = cfg_ktimeout;
82 if (!redis.setex(tracker_key.c_str(), value, timeout)) {
83 logger_warn("set cache fail, key: %s, value: %s, timeout: %d",
84 tracker_key.c_str(), value, timeout);
85 g_rconns->put(rconn, false);
86 return ERROR;
87 }
88 logger("set cache ok, key: %s, value: %s, timeout: %d",
89 tracker_key.c_str(), value, timeout);
90 g_rconns->put(rconn, true);
91
92 return OK;
93}
94
95// 删除指定键值对
96int cache_c::del(char const* key) const {
97 // 构造键
98 acl::string tracker_key;
99 tracker_key.format("%s:%s", TRACKER_REDIS_PREFIX, key);
100
101 // 检查Redis连接池
102 if (!g_rconns) {
103 logger_warn("redis connection pool is null, key: %s",
104 tracker_key.c_str());
105 return ERROR;
106 }
107
108 // 从连接池中获取一个Redis连接
109 acl::redis_client* rconn = (acl::redis_client*)g_rconns->peek();
110 if (!rconn) {
111 logger_warn("peek redis connection fail, key: %s",
112 tracker_key.c_str());
113 return ERROR;
114 }
115
116 // 持有此连接的Redis对象即为Redis客户机
117 acl::redis redis;
118 redis.set_client(rconn);
119
120 // 借助Redis客户机删除指定键值对
121 if (!redis.del_one(tracker_key.c_str())) {
122 logger_warn("delete cache fail, key: %s", tracker_key.c_str());
123 g_rconns->put(rconn, false);
124 return ERROR;
125 }
126 logger("delete cache ok, key: %s", tracker_key.c_str());
127 g_rconns->put(rconn, true);
128
129 return OK;
130}
xxxxxxxxxx
321// 跟踪服务器
2// 声明数据库访问类
3//
4
5
6
7
8
9//
10// 数据库访问类
11//
12class db_c {
13public:
14 // 构造函数
15 db_c(void);
16 // 析构函数
17 ~db_c(void);
18
19 // 连接数据库
20 int connect(void);
21
22 // 根据用户ID获取其对应的组名
23 int get(char const* userid, std::string& groupname) const;
24 // 设置用户ID和组名的对应关系
25 int set(char const* appid, char const* userid,
26 char const* groupname) const;
27 // 获取全部组名
28 int get(std::vector<std::string>& groupnames) const;
29
30private:
31 MYSQL* m_mysql; // MySQL对象
32};
xxxxxxxxxx
1381// 跟踪服务器
2// 实现数据库访问类
3//
4
5
6
7
8// 构造函数
9db_c::db_c(void): m_mysql(mysql_init(NULL)) { // 创建MySQL对象
10 if (!m_mysql)
11 logger_error("create dao fail: %s", mysql_error(m_mysql));
12}
13
14// 析构函数
15db_c::~db_c(void) {
16 // 销毁MySQL对象
17 if (m_mysql) {
18 mysql_close(m_mysql);
19 m_mysql = NULL;
20 }
21}
22
23// 连接数据库
24int db_c::connect(void) {
25 MYSQL* mysql = m_mysql;
26
27 // 遍历MySQL地址表,尝试连接数据库
28 for (std::vector<std::string>::const_iterator maddr =
29 g_maddrs.begin(); maddr != g_maddrs.end(); ++maddr)
30 if ((m_mysql = mysql_real_connect(mysql, maddr->c_str(),
31 "root", "123456", "tnv_trackerdb", 0, NULL, 0)))
32 return OK;
33
34 logger_error("connect database fail: %s",
35 mysql_error(m_mysql = mysql));
36 return ERROR;
37}
38
39// 根据用户ID获取其对应的组名
40int db_c::get(char const* userid, std::string& groupname) const {
41 // 先尝试从缓存中获取与用户ID对应的组名
42 cache_c cache;
43 acl::string key;
44 key.format("userid:%s", userid);
45 acl::string value;
46 if (cache.get(key, value) == OK) {
47 groupname = value.c_str();
48 return OK;
49 }
50
51 // 缓存中没有再查询数据库
52 acl::string sql;
53 sql.format("SELECT group_name FROM t_router WHERE userid='%s';",
54 userid);
55 if (mysql_query(m_mysql, sql.c_str())) {
56 logger_error("query database fail: %s, sql: %s",
57 mysql_error(m_mysql), sql.c_str());
58 return ERROR;
59 }
60
61 // 获取查询结果
62 MYSQL_RES* res = mysql_store_result(m_mysql);
63 if (!res) {
64 logger_error("result is null: %s, sql: %s",
65 mysql_error(m_mysql), sql.c_str());
66 return ERROR;
67 }
68
69 // 获取结果记录
70 MYSQL_ROW row = mysql_fetch_row(res);
71 if (!row)
72 logger_warn("result is empty: %s, sql: %s",
73 mysql_error(m_mysql), sql.c_str());
74 else {
75 groupname = row[0];
76
77 // 将用户ID和组名的对应关系保存在缓存中
78 cache.set(key, groupname.c_str());
79 }
80
81 return OK;
82}
83
84// 设置用户ID和组名的对应关系
85int db_c::set(char const* appid, char const* userid,
86 char const* groupname) const {
87 // 插入一条记录
88 acl::string sql;
89 sql.format("INSERT INTO t_router SET "
90 "appid='%s', userid='%s', group_name='%s';",
91 appid, userid, groupname);
92 if (mysql_query(m_mysql, sql.c_str())) {
93 logger_error("insert database fail: %s, sql: %s",
94 mysql_error(m_mysql), sql.c_str());
95 return ERROR;
96 }
97
98 // 检查插入结果
99 MYSQL_RES* res = mysql_store_result(m_mysql);
100 if (!res && mysql_field_count(m_mysql)) {
101 logger_error("insert database fail: %s, sql: %s",
102 mysql_error(m_mysql), sql.c_str());
103 return ERROR;
104 }
105
106 return OK;
107}
108
109// 获取全部组名
110int db_c::get(std::vector<std::string>& groupnames) const {
111 // 查询全部组名
112 acl::string sql;
113 sql.format("SELECT group_name FROM t_groups_info;");
114 if (mysql_query(m_mysql, sql.c_str())) {
115 logger_error("query database fail: %s, sql: %s",
116 mysql_error(m_mysql), sql.c_str());
117 return ERROR;
118 }
119
120 // 获取查询结果
121 MYSQL_RES* res = mysql_store_result(m_mysql);
122 if (!res) {
123 logger_error("result is null: %s, sql: %s",
124 mysql_error(m_mysql), sql.c_str());
125 return ERROR;
126 }
127
128 // 获取结果记录
129 int nrows = mysql_num_rows(res);
130 for (int i = 0; i < nrows; ++i) {
131 MYSQL_ROW row = mysql_fetch_row(res);
132 if (!row)
133 break;
134 groupnames.push_back(row[0]);
135 }
136
137 return OK;
138}
xxxxxxxxxx
481// 跟踪服务器
2// 声明业务服务类
3//
4
5
6
7
8//
9// 业务服务类
10//
11class service_c {
12public:
13 // 业务处理
14 bool business(acl::socket_stream* conn, char const* head) const;
15
16private:
17 // 处理来自存储服务器的加入包
18 bool join(acl::socket_stream* conn, long long bodylen) const;
19 // 处理来自存储服务器的心跳包
20 bool beat(acl::socket_stream* conn, long long bodylen) const;
21 // 处理来自客户机的获取存储服务器地址列表请求
22 bool saddrs(acl::socket_stream* conn, long long bodylen) const;
23 // 处理来自客户机的获取组列表请求
24 bool groups(acl::socket_stream* conn) const;
25
26 // 将存储服务器加入组表
27 int join(storage_join_t const* sj, char const* saddr) const;
28
29 // 将存储服务器标为活动
30 int beat(char const* groupname, char const* hostname,
31 char const* saddr) const;
32
33 // 响应客户机存储服务器地址列表
34 int saddrs(acl::socket_stream* conn,
35 char const* appid, char const* userid) const;
36 // 根据用户ID获取其对应的组名
37 int group_of_user(char const* appid,
38 char const* userid, std::string& groupname) const;
39 // 根据组名获取存储服务器地址列表
40 int saddrs_of_group(char const* groupname,
41 std::string& saddrs) const;
42
43 // 应答成功
44 bool ok(acl::socket_stream* conn) const;
45 // 应答错误
46 bool error(acl::socket_stream* conn, short errnumb,
47 char const* format, ...) const;
48};
xxxxxxxxxx
6401// 跟踪服务器
2// 实现业务服务类
3//
4
5
6
7
8
9
10
11// 业务处理
12bool service_c::business(acl::socket_stream* conn,
13 char const* head) const {
14 // |包体长度|命令|状态| 包体 |
15 // | 8 | 1 | 1 |包体长度|
16 // 解析包头
17 long long bodylen = ntoll(head); // 包体长度
18 if (bodylen < 0) {
19 error(conn, -1, "invalid body length: %lld < 0", bodylen);
20 return false;
21 }
22 int command = head[BODYLEN_SIZE]; // 命令
23 int status = head[BODYLEN_SIZE+COMMAND_SIZE]; // 状态
24 logger("bodylen: %lld, command: %d, status: %d",
25 bodylen, command, status);
26
27 bool result;
28
29 // 根据命令执行具体业务处理
30 switch (command) {
31 case CMD_TRACKER_JOIN:
32 // 处理来自存储服务器的加入包
33 result = join(conn, bodylen);
34 break;
35
36 case CMD_TRACKER_BEAT:
37 // 处理来自存储服务器的心跳包
38 result = beat(conn, bodylen);
39 break;
40
41 case CMD_TRACKER_SADDRS:
42 // 处理来自客户机的获取存储服务器地址列表请求
43 result = saddrs(conn, bodylen);
44 break;
45
46 case CMD_TRACKER_GROUPS:
47 // 处理来自客户机的获取组列表请求
48 result = groups(conn);
49 break;
50
51 default:
52 error(conn, -1, "unknown command: %d", command);
53 return false;
54 }
55
56 return result;
57}
58
59////////////////////////////////////////////////////////////////////////
60
61// 处理来自存储服务器的加入包
62bool service_c::join(acl::socket_stream* conn,
63 long long bodylen) const {
64 // |包体长度|命令|状态|storage_join_body_t|
65 // | 8 | 1 | 1 | 包体长度 |
66 // 检查包体长度
67 long long expected = sizeof(storage_join_body_t); // 期望包体长度
68 if (bodylen != expected) {
69 error(conn, -1, "invalid body length: %lld != %lld",
70 bodylen, expected);
71 return false;
72 }
73
74 // 接收包体
75 char body[bodylen];
76 if (conn->read(body, bodylen) < 0) {
77 logger_error("read fail: %s, bodylen: %lld, from: %s",
78 acl::last_serror(), bodylen, conn->get_peer());
79 return false;
80 }
81
82 // 解析包体
83 storage_join_t sj;
84 storage_join_body_t* sjb = (storage_join_body_t*)body;
85 // 版本
86 strcpy(sj.sj_version, sjb->sjb_version);
87 // 组名
88 strcpy(sj.sj_groupname, sjb->sjb_groupname);
89 if (valid(sj.sj_groupname) != OK) {
90 error(conn, -1, "invalid groupname: %s", sj.sj_groupname);
91 return false;
92 }
93 // 主机名
94 strcpy(sj.sj_hostname, sjb->sjb_hostname);
95 // 端口号
96 sj.sj_port = ntos(sjb->sjb_port);
97 if (!sj.sj_port) {
98 error(conn, -1, "invalid port: %u", sj.sj_port);
99 return false;
100 }
101 // 启动时间
102 sj.sj_stime = ntol(sjb->sjb_stime);
103 // 加入时间
104 sj.sj_jtime = ntol(sjb->sjb_jtime);
105 logger("storage join, version: %s, groupname: %s, "
106 "hostname: %s, port: %u, stime: %s, jtime: %s",
107 sj.sj_version, sj.sj_groupname,
108 sj.sj_hostname, sj.sj_port,
109 std::string(ctime(&sj.sj_stime)).c_str(),
110 std::string(ctime(&sj.sj_jtime)).c_str());
111
112 // 将存储服务器加入组表
113 if (join(&sj, conn->get_peer()) != OK) {
114 error(conn, -1, "join into groups fail");
115 return false;
116 }
117
118 return ok(conn);
119}
120
121// 处理来自存储服务器的心跳包
122bool service_c::beat(acl::socket_stream* conn,
123 long long bodylen) const {
124 // |包体长度|命令|状态|storage_beat_body_t|
125 // | 8 | 1 | 1 | 包体长度 |
126 // 检查包体长度
127 long long expected = sizeof(storage_beat_body_t); // 期望包体长度
128 if (bodylen != expected) {
129 error(conn, -1, "invalid body length: %lld != %lld",
130 bodylen, expected);
131 return false;
132 }
133
134 // 接收包体
135 char body[bodylen];
136 if (conn->read(body, bodylen) < 0) {
137 logger_error("read fail: %s, bodylen: %lld, from: %s",
138 acl::last_serror(), bodylen, conn->get_peer());
139 return false;
140 }
141
142 // 解析包体
143 storage_beat_body_t* sbb = (storage_beat_body_t*)body;
144 // 组名
145 char groupname[STORAGE_GROUPNAME_MAX+1];
146 strcpy(groupname, sbb->sbb_groupname);
147 // 主机名
148 char hostname[STORAGE_HOSTNAME_MAX+1];
149 strcpy(hostname, sbb->sbb_hostname);
150 logger("storage beat, groupname: %s, hostname: %s",
151 groupname, hostname);
152
153 // 将存储服务器标为活动
154 if (beat(groupname, hostname, conn->get_peer()) != OK) {
155 error(conn, -1, "mark storage as active fail");
156 return false;
157 }
158
159 return ok(conn);
160}
161
162// 处理来自客户机的获取存储服务器地址列表请求
163bool service_c::saddrs(acl::socket_stream* conn,
164 long long bodylen) const {
165 // |包体长度|命令|状态|应用ID|用户ID|文件ID|
166 // | 8 | 1 | 1 | 16 | 256 | 128 |
167 // 检查包体长度
168 long long expected = APPID_SIZE + USERID_SIZE + FILEID_SIZE;
169 if (bodylen != expected) {
170 error(conn, -1, "invalid body length: %lld != %lld",
171 bodylen, expected);
172 return false;
173 }
174
175 // 接收包体
176 char body[bodylen];
177 if (conn->read(body, bodylen) < 0) {
178 logger_error("read fail: %s, bodylen: %lld, from: %s",
179 acl::last_serror(), bodylen, conn->get_peer());
180 return false;
181 }
182
183 // 解析包体
184 char appid[APPID_SIZE];
185 strcpy(appid, body);
186 char userid[USERID_SIZE];
187 strcpy(userid, body + APPID_SIZE);
188 char fileid[FILEID_SIZE];
189 strcpy(fileid, body + APPID_SIZE + USERID_SIZE);
190
191 // 响应客户机存储服务器地址列表
192 if (saddrs(conn, appid, userid) != OK)
193 return false;
194
195 return true;
196}
197
198// 处理来自客户机的获取组列表请求
199bool service_c::groups(acl::socket_stream* conn) const {
200 // 互斥锁加锁
201 if ((errno = pthread_mutex_lock(&g_mutex))) {
202 logger_error("call pthread_mutex_lock fail: %s",
203 strerror(errno));
204 return false;
205 }
206
207 acl::string gps; // 全组字符串
208 gps.format(" COUNT OF GROUPS: %lu\n", g_groups.size());
209
210 // 遍历组表中的每一个组
211 for (std::map<std::string, std::list<storage_info_t> >::
212 const_iterator group = g_groups.begin();
213 group != g_groups.end(); ++group) {
214 acl::string grp; // 单组字符串
215 grp.format(" GROUPNAME: %s\n"
216 " COUNT OF STORAGES: %lu\n"
217 "COUNT OF ACTIVE STORAGES: %s\n",
218 group->first.c_str(),
219 group->second.size(),
220 "%d");
221
222 int act = 0; // 活动存储服务器数
223
224 // 遍历该组中的每一台存储服务器
225 for (std::list<storage_info_t>::const_iterator si =
226 group->second.begin(); si != group->second.end(); ++si) {
227 acl::string stg; // 存储服务器字符串
228 stg.format(" VERSION: %s\n"
229 " HOSTNAME: %s\n"
230 " ADDRESS: %s:%u\n"
231 " STARTUP TIME: %s"
232 " JOIN TIME: %s"
233 " BEAT TIME: %s"
234 " STATUS: ",
235 si->si_version,
236 si->si_hostname,
237 si->si_addr, si->si_port,
238 std::string(ctime(&si->si_stime)).c_str(),
239 std::string(ctime(&si->si_jtime)).c_str(),
240 std::string(ctime(&si->si_btime)).c_str());
241
242 switch (si->si_status) {
243 case STORAGE_STATUS_OFFLINE:
244 stg += "OFFLINE";
245 break;
246 case STORAGE_STATUS_ONLINE:
247 stg += "ONLINE";
248 break;
249 case STORAGE_STATUS_ACTIVE:
250 stg += "ACTIVE";
251 ++act;
252 break;
253 default:
254 stg += "UNKNOWN";
255 break;
256 }
257
258 grp += stg + "\n";
259 }
260
261 gps += grp.format(grp, act);
262 }
263
264 gps = gps.left(gps.size() - 1);
265
266 // 互斥锁解锁
267 if ((errno = pthread_mutex_unlock(&g_mutex))) {
268 logger_error("call pthread_mutex_unlock fail: %s",
269 strerror(errno));
270 return false;
271 }
272
273 // |包体长度|命令|状态| 组列表 |
274 // | 8 | 1 | 1 |包体长度|
275 // 构造响应
276 long long bodylen = gps.size() + 1;
277 long long resplen = HEADLEN + bodylen;
278 char resp[resplen] = {};
279 llton(bodylen, resp);
280 resp[BODYLEN_SIZE] = CMD_TRACKER_REPLY;
281 resp[BODYLEN_SIZE+COMMAND_SIZE] = 0;
282 strcpy(resp + HEADLEN, gps.c_str());
283
284 // 发送响应
285 if (conn->write(resp, resplen) < 0) {
286 logger_error("write fail: %s, resplen: %lld, to: %s",
287 acl::last_serror(), resplen, conn->get_peer());
288 return false;
289 }
290
291 return true;
292}
293
294////////////////////////////////////////////////////////////////////////
295
296// 将存储服务器加入组表
297int service_c::join(storage_join_t const* sj, char const* saddr) const {
298 // 互斥锁加锁
299 if ((errno = pthread_mutex_lock(&g_mutex))) {
300 logger_error("call pthread_mutex_lock fail: %s",
301 strerror(errno));
302 return ERROR;
303 }
304
305 // 在组表中查找待加入存储服务器所隶属的组
306 std::map<std::string, std::list<storage_info_t> >::iterator
307 group = g_groups.find(sj->sj_groupname);
308 if (group != g_groups.end()) { // 若找到该组
309 // 遍历该组的存储服务器列表
310 std::list<storage_info_t>::iterator si;
311 for (si = group->second.begin();
312 si != group->second.end(); ++si)
313 // 若待加入存储服务器已在该列表中
314 if (!strcmp(si->si_hostname, sj->sj_hostname) &&
315 !strcmp(si->si_addr, saddr)) {
316 // 更新该列表中的相应记录
317 strcpy(si->si_version, sj->sj_version); // 版本
318 si->si_port = sj->sj_port; // 端口号
319 si->si_stime = sj->sj_stime; // 启动时间
320 si->si_jtime = sj->sj_jtime; // 加入时间
321 si->si_btime = sj->sj_jtime; // 心跳时间
322 si->si_status = STORAGE_STATUS_ONLINE; // 状态
323 break;
324 }
325 // 若待加入存储服务器不在该列表中
326 if (si == group->second.end()) {
327 // 将待加入存储服务器加入该列表
328 storage_info_t si;
329 strcpy(si.si_version, sj->sj_version); // 版本
330 strcpy(si.si_hostname, sj->sj_hostname); // 主机名
331 strcpy(si.si_addr, saddr); // IP地址
332 si.si_port = sj->sj_port; // 端口号
333 si.si_stime = sj->sj_stime; // 启动时间
334 si.si_jtime = sj->sj_jtime; // 加入时间
335 si.si_btime = sj->sj_jtime; // 心跳时间
336 si.si_status = STORAGE_STATUS_ONLINE; // 状态
337 group->second.push_back(si);
338 }
339 }
340 else { // 若没有该组
341 // 将待加入存储服务器所隶属的组加入组表
342 g_groups[sj->sj_groupname] = std::list<storage_info_t>();
343 // 将待加入存储服务器加入该组的存储服务器列表
344 storage_info_t si;
345 strcpy(si.si_version, sj->sj_version); // 版本
346 strcpy(si.si_hostname, sj->sj_hostname); // 主机名
347 strcpy(si.si_addr, saddr); // IP地址
348 si.si_port = sj->sj_port; // 端口号
349 si.si_stime = sj->sj_stime; // 启动时间
350 si.si_jtime = sj->sj_jtime; // 加入时间
351 si.si_btime = sj->sj_jtime; // 心跳时间
352 si.si_status = STORAGE_STATUS_ONLINE; // 状态
353 g_groups[sj->sj_groupname].push_back(si);
354 }
355
356 // 互斥锁解锁
357 if ((errno = pthread_mutex_unlock(&g_mutex))) {
358 logger_error("call pthread_mutex_unlock fail: %s",
359 strerror(errno));
360 return ERROR;
361 }
362
363 return OK;
364}
365
366// 将存储服务器标为活动
367int service_c::beat(char const* groupname, char const* hostname,
368 char const* saddr) const {
369 // 互斥锁加锁
370 if ((errno = pthread_mutex_lock(&g_mutex))) {
371 logger_error("call pthread_mutex_lock fail: %s",
372 strerror(errno));
373 return ERROR;
374 }
375
376 int result = OK;
377
378 // 在组表中查找待标记存储服务器所隶属的组
379 std::map<std::string, std::list<storage_info_t> >::iterator
380 group = g_groups.find(groupname);
381 if (group != g_groups.end()) { // 若找到该组
382 // 遍历该组的存储服务器列表
383 std::list<storage_info_t>::iterator si;
384 for (si = group->second.begin();
385 si != group->second.end(); ++si)
386 // 若待标记存储服务器已在该列表中
387 if (!strcmp(si->si_hostname, hostname) &&
388 !strcmp(si->si_addr, saddr)) {
389 // 更新该列表中的相应记录
390 si->si_btime = time(NULL); // 心跳时间
391 si->si_status = STORAGE_STATUS_ACTIVE; // 状态
392 break;
393 }
394 // 若待标记存储服务器不在该列表中
395 if (si == group->second.end()) {
396 logger_error("storage not found, groupname: %s, "
397 "hostname: %s, saddr: %s", groupname, hostname, saddr);
398 result = ERROR;
399 }
400 }
401 else { // 若没有该组
402 logger_error("group not found, groupname: %s", groupname);
403 result = ERROR;
404 }
405
406 // 互斥锁解锁
407 if ((errno = pthread_mutex_unlock(&g_mutex))) {
408 logger_error("call pthread_mutex_unlock fail: %s",
409 strerror(errno));
410 return ERROR;
411 }
412
413 return result;
414}
415
416// 响应客户机存储服务器地址列表
417int service_c::saddrs(acl::socket_stream* conn,
418 char const* appid, char const* userid) const {
419 // 应用ID是否合法
420 if (valid(appid) != OK) {
421 error(conn, -1, "invalid appid: %s", appid);
422 return ERROR;
423 }
424
425 // 应用ID是否存在
426 if (std::find(g_appids.begin(), g_appids.end(),
427 appid) == g_appids.end()) {
428 error(conn, -1, "unknown appid: %s", appid);
429 return ERROR;
430 }
431
432 // 根据用户ID获取其对应的组名
433 std::string groupname;
434 if (group_of_user(appid, userid, groupname) != OK) {
435 error(conn, -1, "get groupname fail");
436 return ERROR;
437 }
438
439 // 根据组名获取存储服务器地址列表
440 std::string saddrs;
441 if (saddrs_of_group(groupname.c_str(), saddrs) != OK) {
442 error(conn, -1, "get storage address fail");
443 return ERROR;
444 }
445
446 logger("appid: %s, userid: %s, groupname: %s, saddrs: %s",
447 appid, userid, groupname.c_str(), saddrs.c_str());
448
449 // |包体长度|命令|状态|组名|存储服务器地址列表|
450 // | 8 | 1 | 1 | 包体长度 |
451 // 构造响应
452 long long bodylen = STORAGE_GROUPNAME_MAX + 1 + saddrs.size() + 1;
453 long long resplen = HEADLEN + bodylen;
454 char resp[resplen] = {};
455 llton(bodylen, resp);
456 resp[BODYLEN_SIZE] = CMD_TRACKER_REPLY;
457 resp[BODYLEN_SIZE+COMMAND_SIZE] = 0;
458 strncpy(resp + HEADLEN, groupname.c_str(), STORAGE_GROUPNAME_MAX);
459 strcpy(resp + HEADLEN + STORAGE_GROUPNAME_MAX + 1, saddrs.c_str());
460
461 // 发送响应
462 if (conn->write(resp, resplen) < 0) {
463 logger_error("write fail: %s, resplen: %lld, to: %s",
464 acl::last_serror(), resplen, conn->get_peer());
465 return ERROR;
466 }
467
468 return OK;
469}
470
471// 根据用户ID获取其对应的组名
472int service_c::group_of_user(char const* appid,
473 char const* userid, std::string& groupname) const {
474 db_c db; // 数据库访问对象
475
476 // 连接数据库
477 if (db.connect() != OK)
478 return ERROR;
479
480 // 根据用户ID获取其对应的组名
481 if (db.get(userid, groupname) != OK)
482 return ERROR;
483
484 // 组名为空表示该用户没有组,为其随机分配一个
485 if (groupname.empty()) {
486 logger("groupname is empty, appid: %s, userid: %s, allocate one",
487 appid, userid);
488
489 // 获取全部组名
490 std::vector<std::string> groupnames;
491 if (db.get(groupnames) != OK)
492 return ERROR;
493 if (groupnames.empty()) {
494 logger_error("groupnames is empty, appid: %s, userid: %s",
495 appid, userid);
496 return ERROR;
497 }
498
499 // 随机抽取组名
500 srand(time(NULL));
501 groupname = groupnames[rand() % groupnames.size()];
502
503 // 设置用户ID和组名的对应关系
504 if (db.set(appid, userid, groupname.c_str()) != OK)
505 return ERROR;
506 }
507
508 return OK;
509}
510
511// 根据组名获取存储服务器地址列表
512int service_c::saddrs_of_group(char const* groupname,
513 std::string& saddrs) const {
514 // 互斥锁加锁
515 if ((errno = pthread_mutex_lock(&g_mutex))) {
516 logger_error("call pthread_mutex_lock fail: %s",
517 strerror(errno));
518 return ERROR;
519 }
520
521 int result = OK;
522
523 // 根据组名在组表中查找特定组
524 std::map<std::string, std::list<storage_info_t> >::const_iterator
525 group = g_groups.find(groupname);
526 if (group != g_groups.end()) { // 若找到该组
527 if (!group->second.empty()) { // 若该组的存储服务器列表非空
528 // 在该组的存储服务器列表中,从随机位置开
529 // 始最多抽取三台处于活动状态的存储服务器
530 srand(time(NULL));
531 int nsis = group->second.size();
532 int nrand = rand() % nsis;
533 std::list<storage_info_t>::const_iterator si =
534 group->second.begin();
535 int nacts = 0;
536 for (int i = 0; i < nsis + nrand; ++i, ++si) {
537 if (si == group->second.end())
538 si = group->second.begin();
539 logger("i: %d, nrand: %d, addr: %s, port: %u, "
540 "status: %d", i, nrand, si->si_addr, si->si_port,
541 si->si_status);
542 if (i >= nrand && si->si_status ==
543 STORAGE_STATUS_ACTIVE) {
544 char saddr[256];
545 sprintf(saddr, "%s:%d", si->si_addr, si->si_port);
546 saddrs += saddr;
547 saddrs += ";";
548 if (++nacts >= 3)
549 break;
550 }
551 }
552 if (!nacts) { // 若没有处于活动状态的存储服务器
553 logger_error("no active storage in group %s",
554 groupname);
555 result = ERROR;
556 }
557 }
558 else { // 若该组的存储服务器列表为空
559 logger_error("no storage in group %s", groupname);
560 result = ERROR;
561 }
562 }
563 else { // 若没有该组
564 logger_error("not found group %s", groupname);
565 result = ERROR;
566 }
567
568 // 互斥锁解锁
569 if ((errno = pthread_mutex_unlock(&g_mutex))) {
570 logger_error("call pthread_mutex_unlock fail: %s",
571 strerror(errno));
572 return ERROR;
573 }
574
575 return result;
576}
577
578////////////////////////////////////////////////////////////////////////
579
580// 应答成功
581bool service_c::ok(acl::socket_stream* conn) const {
582 // |包体长度|命令|状态|
583 // | 8 | 1 | 1 |
584 // 构造响应
585 long long bodylen = 0;
586 long long resplen = HEADLEN + bodylen;
587 char resp[resplen] = {};
588 llton(bodylen, resp);
589 resp[BODYLEN_SIZE] = CMD_TRACKER_REPLY;
590 resp[BODYLEN_SIZE+COMMAND_SIZE] = 0;
591
592 // 发送响应
593 if (conn->write(resp, resplen) < 0) {
594 logger_error("write fail: %s, resplen: %lld, to: %s",
595 acl::last_serror(), resplen, conn->get_peer());
596 return false;
597 }
598
599 return true;
600}
601
602// 应答错误
603bool service_c::error(acl::socket_stream* conn, short errnumb,
604 char const* format, ...) const {
605 // 错误描述
606 char errdesc[ERROR_DESC_SIZE];
607 va_list ap;
608 va_start(ap, format);
609 vsnprintf(errdesc, ERROR_DESC_SIZE, format, ap);
610 va_end(ap);
611 logger_error("%s", errdesc);
612 acl::string desc;
613 desc.format("[%s] %s", g_hostname.c_str(), errdesc);
614 memset(errdesc, 0, sizeof(errdesc));
615 strncpy(errdesc, desc.c_str(), ERROR_DESC_SIZE - 1);
616 size_t desclen = strlen(errdesc);
617 desclen += desclen != 0;
618
619 // |包体长度|命令|状态|错误号|错误描述|
620 // | 8 | 1 | 1 | 2 | <=1024 |
621 // 构造响应
622 long long bodylen = ERROR_NUMB_SIZE + desclen;
623 long long resplen = HEADLEN + bodylen;
624 char resp[resplen] = {};
625 llton(bodylen, resp);
626 resp[BODYLEN_SIZE] = CMD_TRACKER_REPLY;
627 resp[BODYLEN_SIZE+COMMAND_SIZE] = STATUS_ERROR;
628 ston(errnumb, resp + HEADLEN);
629 if (desclen)
630 strcpy(resp + HEADLEN + ERROR_NUMB_SIZE, errdesc);
631
632 // 发送响应
633 if (conn->write(resp, resplen) < 0) {
634 logger_error("write fail: %s, resplen: %lld, to: %s",
635 acl::last_serror(), resplen, conn->get_peer());
636 return false;
637 }
638
639 return true;
640}
xxxxxxxxxx
271// 跟踪服务器
2// 声明存储服务器状态检查线程类
3//
4
5
6
7//
8// 存储服务器状态检查线程类
9//
10class status_c: public acl::thread {
11public:
12 // 构造函数
13 status_c(void);
14
15 // 终止线程
16 void stop(void);
17
18protected:
19 // 线程过程
20 void* run(void);
21
22private:
23 // 检查存储服务器状态
24 int check(void) const;
25
26 bool m_stop; // 是否终止
27};
xxxxxxxxxx
621// 跟踪服务器
2// 实现存储服务器状态检查线程类
3//
4
5
6
7
8// 构造函数
9status_c::status_c(void): m_stop(false) {
10}
11
12// 终止线程
13void status_c::stop(void) {
14 m_stop = true;
15}
16
17// 线程过程
18void* status_c::run(void) {
19 for (time_t last = time(NULL); !m_stop; sleep(1)) {
20 time_t now = time(NULL); // 现在
21
22 // 若现在距离最近一次检查存储服务器状态已足够久
23 if (now - last >= cfg_interval) {
24 check(); // 检查存储服务器状态
25 last = now; // 更新最近一次检查时间
26 }
27 }
28
29 return NULL;
30}
31
32// 检查存储服务器状态
33int status_c::check(void) const {
34 time_t now = time(NULL); // 现在
35
36 // 互斥锁加锁
37 if ((errno = pthread_mutex_lock(&g_mutex))) {
38 logger_error("call pthread_mutex_lock fail: %s",
39 strerror(errno));
40 return ERROR;
41 }
42
43 // 遍历组表中的每一个组
44 for (std::map<std::string, std::list<storage_info_t> >::iterator
45 group = g_groups.begin(); group != g_groups.end(); ++group)
46 // 遍历该组中的每一台存储服务器
47 for (std::list<storage_info_t>::iterator si =
48 group->second.begin(); si != group->second.end(); ++si)
49 // 若该存储服务器心跳停止太久
50 if (now - si->si_btime >= cfg_interval)
51 // 则将其状态标记为离线
52 si->si_status = STORAGE_STATUS_OFFLINE;
53
54 // 互斥锁解锁
55 if ((errno = pthread_mutex_unlock(&g_mutex))) {
56 logger_error("call pthread_mutex_unlock fail: %s",
57 strerror(errno));
58 return ERROR;
59 }
60
61 return OK;
62}
xxxxxxxxxx
401// 跟踪服务器
2// 声明服务器类
3//
4
5
6
7
8//
9// 服务器类
10//
11class server_c: public acl::master_threads {
12protected:
13 // 进程切换用户后被调用
14 void proc_on_init(void);
15 // 子进程意图退出时被调用
16 // 返回true,子进程立即退出,否则
17 // 若配置项ioctl_quick_abort非0,子进程立即退出,否则
18 // 待所有客户机连接都关闭后,子进程再退出
19 bool proc_exit_timer(size_t nclients, size_t nthreads);
20 // 进程退出前被调用
21 void proc_on_exit(void);
22
23 // 线程获得连接时被调用
24 // 返回true,连接将被用于后续通信,否则
25 // 函数返回后即关闭连接
26 bool thread_on_accept(acl::socket_stream* conn);
27 // 与线程绑定的连接可读时被调用
28 // 返回true,保持长连接,否则
29 // 函数返回后即关闭连接
30 bool thread_on_read(acl::socket_stream* conn);
31 // 线程读写连接超时时被调用
32 // 返回true,继续等待下一次读写,否则
33 // 函数返回后即关闭连接
34 bool thread_on_timeout(acl::socket_stream* conn);
35 // 与线程绑定的连接关闭时被调用
36 void thread_on_close(acl::socket_stream* conn);
37
38private:
39 status_c* m_status; // 存储服务器状态检查线程
40};
xxxxxxxxxx
1461// 跟踪服务器
2// 实现服务器类
3//
4
5
6
7
8
9
10
11// 进程切换用户后被调用
12void server_c::proc_on_init(void) {
13 // 应用ID表
14 if (!cfg_appids || !*cfg_appids)
15 logger_fatal("application ids is null");
16 split(cfg_appids, g_appids);
17 if (g_appids.empty())
18 logger_fatal("application ids is empty");
19
20 // MySQL地址表
21 if (!cfg_maddrs || !*cfg_maddrs)
22 logger_fatal("mysql addresses is null");
23 split(cfg_maddrs, g_maddrs);
24 if (g_maddrs.empty())
25 logger_fatal("mysql addresses is empty");
26
27 // Redis地址表
28 if (!cfg_raddrs || !*cfg_raddrs)
29 logger_error("redis addresses is null");
30 else {
31 split(cfg_raddrs, g_raddrs);
32 if (g_raddrs.empty())
33 logger_error("redis addresses is empty");
34 else {
35 // 遍历Redis地址表,尝试创建连接池
36 for (std::vector<std::string>::const_iterator raddr =
37 g_raddrs.begin(); raddr != g_raddrs.end(); ++raddr)
38 if ((g_rconns = new acl::redis_client_pool(
39 raddr->c_str(), cfg_maxconns))) {
40 // 设置Redis连接超时和读写超时
41 g_rconns->set_timeout(cfg_ctimeout, cfg_rtimeout);
42 break;
43 }
44 if (!g_rconns)
45 logger_error("create redis connection pool fail, cfg_raddrs: %s",
46 cfg_raddrs);
47 }
48 }
49
50 // 主机名
51 char hostname[256+1] = {};
52 if (gethostname(hostname, sizeof(hostname) - 1))
53 logger_error("call gethostname fail: %s", strerror(errno));
54 g_hostname = hostname;
55
56 // 创建并启动存储服务器状态检查线程
57 if ((m_status = new status_c)) {
58 m_status->set_detachable(false);
59 m_status->start();
60 }
61
62 // 打印配置信息
63 logger("cfg_appids: %s, cfg_maddrs: %s, cfg_raddrs: %s, "
64 "cfg_interval: %d, cfg_mtimeout: %d, cfg_maxconns: %d, "
65 "cfg_ctimeout: %d, cfg_rtimeout: %d, cfg_ktimeout: %d",
66 cfg_appids, cfg_maddrs, cfg_raddrs,
67 cfg_interval, cfg_mtimeout, cfg_maxconns,
68 cfg_ctimeout, cfg_rtimeout, cfg_ktimeout);
69}
70
71// 子进程意图退出时被调用
72// 返回true,子进程立即退出,否则
73// 若配置项ioctl_quick_abort非0,子进程立即退出,否则
74// 待所有客户机连接都关闭后,子进程再退出
75bool server_c::proc_exit_timer(size_t nclients, size_t nthreads) {
76 // 终止存储服务器状态检查线程
77 m_status->stop();
78
79 if (!nclients || !nthreads) {
80 logger("nclients: %lu, nthreads: %lu", nclients, nthreads);
81 return true;
82 }
83
84 return false;
85}
86
87// 进程退出前被调用
88void server_c::proc_on_exit(void) {
89 // 回收存储服务器状态检测线程
90 if (!m_status->wait(NULL))
91 logger_error("wait thread #%lu fail", m_status->thread_id());
92
93 // 销毁存储服务器状态检查线程
94 if (m_status) {
95 delete m_status;
96 m_status = NULL;
97 }
98
99 // 销毁Redis连接池
100 if (g_rconns) {
101 delete g_rconns;
102 g_rconns = NULL;
103 }
104}
105
106// 线程获得连接时被调用
107// 返回true,连接将被用于后续通信,否则
108// 函数返回后即关闭连接
109bool server_c::thread_on_accept(acl::socket_stream* conn) {
110 logger("connect, from: %s", conn->get_peer());
111 return true;
112}
113
114// 与线程绑定的连接可读时被调用
115// 返回true,保持长连接,否则
116// 函数返回后即关闭连接
117bool server_c::thread_on_read(acl::socket_stream* conn) {
118 // 接收包头
119 char head[HEADLEN];
120 if (conn->read(head, HEADLEN) < 0) {
121 if (conn->eof())
122 logger("connection has been closed, from: %s",
123 conn->get_peer());
124 else
125 logger_error("read fail: %s, from: %s",
126 acl::last_serror(), conn->get_peer());
127 return false;
128 }
129
130 // 业务处理
131 service_c service;
132 return service.business(conn, head);
133}
134
135// 线程读写连接超时时被调用
136// 返回true,继续等待下一次读写,否则
137// 函数返回后即关闭连接
138bool server_c::thread_on_timeout(acl::socket_stream* conn) {
139 logger("read timeout, from: %s", conn->get_peer());
140 return true;
141}
142
143// 与线程绑定的连接关闭时被调用
144void server_c::thread_on_close(acl::socket_stream* conn) {
145 logger("client disconnect, from: %s", conn->get_peer());
146}
xxxxxxxxxx
191// 跟踪服务器
2// 定义主函数
3//
4
5
6
7int main(void) {
8 // 初始化ACL库
9 acl::acl_cpp_init();
10 acl::log::stdout_open(true);
11
12 // 创建并运行服务器
13 server_c& server = acl::singleton2<server_c>::get_instance();
14 server.set_cfg_str(cfg_str);
15 server.set_cfg_int(cfg_int);
16 server.run_alone("127.0.0.1:21000", "../etc/tracker.cfg");
17
18 return 0;
19}
xxxxxxxxxx
211PROJ = ../../bin/tracker
2OBJS = $(patsubst %.cpp, %.o, $(wildcard ../01_common/*.cpp *.cpp))
3CC = g++
4LINK = g++
5RM = rm -rf
6CFLAGS = -c -Wall \
7 -I/usr/include/acl-lib/acl_cpp \
8 `mysql_config --cflags` \
9 -I../01_common
10LIBS = -pthread -lacl_all `mysql_config --libs`
11
12all: $(PROJ)
13
14$(PROJ): $(OBJS)
15 $(LINK) $^ $(LIBS) -o $@
16
17.cpp.o:
18 $(CC) $(CFLAGS) $^ -o $@
19
20clean:
21 $(RM) $(PROJ) $(OBJS)
xxxxxxxxxx
201service tracker {
2# 应用ID表
3tnv_apps_id = tnvideo
4# MySQL地址表
5mysql_addrs = 127.0.0.1
6# Redis地址表
7redis_addrs = 127.0.0.1:6379
8# 存储服务器状态检查间隔秒数
9check_active_interval = 120
10# MySQL读写超时
11mysql_rw_timeout = 30
12# Redis连接池最大连接数
13redis_max_conn_num = 600
14# Redis连接超时
15redis_conn_timeout = 10
16# Redis读写超时
17redis_rw_timeout = 10
18# Redis键超时
19redis_key_timeout = 60
20}
xxxxxxxxxx
231DROP DATABASE IF EXISTS tnv_trackerdb;
2CREATE DATABASE tnv_trackerdb;
3USE tnv_trackerdb;
4
5CREATE TABLE `t_groups_info` (
6 `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
7 `group_name` varchar(32) DEFAULT NULL,
8 `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
9 `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
10 PRIMARY KEY (`id`)
11) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
12
13INSERT INTO `t_groups_info` (`group_name`) VALUES ('group001');
14
15CREATE TABLE `t_router` (
16 `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
17 `appid` varchar(32) DEFAULT NULL,
18 `userid` varchar(128) DEFAULT NULL,
19 `group_name` varchar(32) DEFAULT NULL,
20 `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
21 `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
22 PRIMARY KEY (`id`)
23) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;