关键词搜索

源码搜索 ×
×

漫话Redis源码之三十一

发布2021-12-12浏览626次

详情内容

这里没什么特别的，考大家一个问题：为什么在mstime中需要加long long转换？是为了防止溢出：在long为32位的平台上，tv_sec乘以1000会超出long的表示范围。这里做得非常专业，我曾经踩过这个坑：

  1. typedef struct _client {
  2. redisContext *context;
  3. sds obuf;
  4. char **randptr; /* Pointers to :rand: strings inside the command buf */
  5. size_t randlen; /* Number of pointers in client->randptr */
  6. size_t randfree; /* Number of unused pointers in client->randptr */
  7. char **stagptr; /* Pointers to slot hashtags (cluster mode only) */
  8. size_t staglen; /* Number of pointers in client->stagptr */
  9. size_t stagfree; /* Number of unused pointers in client->stagptr */
  10. size_t written; /* Bytes of 'obuf' already written */
  11. long long start; /* Start time of a request */
  12. long long latency; /* Request latency */
  13. int pending; /* Number of pending requests (replies to consume) */
  14. int prefix_pending; /* If non-zero, number of pending prefix commands. Commands
  15. such as auth and select are prefixed to the pipeline of
  16. benchmark commands and discarded after the first send. */
  17. int prefixlen; /* Size in bytes of the pending prefix commands */
  18. int thread_id;
  19. struct clusterNode *cluster_node;
  20. int slots_last_update;
  21. } *client;
  22. /* Threads. */
  23. typedef struct benchmarkThread {
  24. int index;
  25. pthread_t thread;
  26. aeEventLoop *el;
  27. } benchmarkThread;
  28. /* Cluster. */
  29. typedef struct clusterNode {
  30. char *ip;
  31. int port;
  32. sds name;
  33. int flags;
  34. sds replicate; /* Master ID if node is a slave */
  35. int *slots;
  36. int slots_count;
  37. int current_slot_index;
  38. int *updated_slots; /* Used by updateClusterSlotsConfiguration */
  39. int updated_slots_count; /* Used by updateClusterSlotsConfiguration */
  40. int replicas_count;
  41. sds *migrating; /* An array of sds where even strings are slots and odd
  42. * strings are the destination node IDs. */
  43. sds *importing; /* An array of sds where even strings are slots and odd
  44. * strings are the source node IDs. */
  45. int migrating_count; /* Length of the migrating array (migrating slots*2) */
  46. int importing_count; /* Length of the importing array (importing slots*2) */
  47. struct redisConfig *redis_config;
  48. } clusterNode;
  49. typedef struct redisConfig {
  50. sds save;
  51. sds appendonly;
  52. } redisConfig;
  53. /* Prototypes */
  54. char *redisGitSHA1(void);
  55. char *redisGitDirty(void);
  56. static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask);
  57. static void createMissingClients(client c);
  58. static benchmarkThread *createBenchmarkThread(int index);
  59. static void freeBenchmarkThread(benchmarkThread *thread);
  60. static void freeBenchmarkThreads();
  61. static void *execBenchmarkThread(void *ptr);
  62. static clusterNode *createClusterNode(char *ip, int port);
  63. static redisConfig *getRedisConfig(const char *ip, int port,
  64. const char *hostsocket);
  65. static redisContext *getRedisContext(const char *ip, int port,
  66. const char *hostsocket);
  67. static void freeRedisConfig(redisConfig *cfg);
  68. static int fetchClusterSlotsConfiguration(client c);
  69. static void updateClusterSlotsConfiguration();
  70. int showThroughput(struct aeEventLoop *eventLoop, long long id,
  71. void *clientData);
  72. static sds benchmarkVersion(void) {
  73. sds version;
  74. version = sdscatprintf(sdsempty(), "%s", REDIS_VERSION);
  75. /* Add git commit and working tree status when available */
  76. if (strtoll(redisGitSHA1(),NULL,16)) {
  77. version = sdscatprintf(version, " (git:%s", redisGitSHA1());
  78. if (strtoll(redisGitDirty(),NULL,10))
  79. version = sdscatprintf(version, "-dirty");
  80. version = sdscat(version, ")");
  81. }
  82. return version;
  83. }
  84. /* Dict callbacks */
  85. static uint64_t dictSdsHash(const void *key);
  86. static int dictSdsKeyCompare(void *privdata, const void *key1,
  87. const void *key2);
  88. /* Implementation */
  /* Return the current wall-clock time, as reported by gettimeofday(), in
   * microseconds.
   *
   * The seconds field is widened to long long BEFORE the multiplication:
   * where long is 32 bits, multiplying the current epoch seconds by
   * 1000000 in long arithmetic overflows (undefined behavior for a signed
   * type). mstime() applies the same widening. */
  static long long ustime(void) {
      struct timeval tv;
      long long ust;

      gettimeofday(&tv, NULL);
      ust = ((long long)tv.tv_sec)*1000000;
      ust += tv.tv_usec;
      return ust;
  }
  /* Return the current wall-clock time, as reported by gettimeofday(), in
   * milliseconds. The seconds are converted to long long before scaling so
   * the product is computed in 64-bit arithmetic. */
  static long long mstime(void) {
      struct timeval now;

      gettimeofday(&now, NULL);
      return ((long long)now.tv_sec) * 1000 + now.tv_usec / 1000;
  }
  /* Dict hash callback: hash the key, treated as an sds string, over its
   * full sdslen() length using dictGenHashFunction(). */
  static uint64_t dictSdsHash(const void *key) {
      unsigned char *bytes = (unsigned char*)key;

      return dictGenHashFunction(bytes, sdslen((char*)key));
  }
  108. static int dictSdsKeyCompare(void *privdata, const void *key1,
  109. const void *key2)
  110. {
  111. int l1,l2;
  112. DICT_NOTUSED(privdata);
  113. l1 = sdslen((sds)key1);
  114. l2 = sdslen((sds)key2);
  115. if (l1 != l2) return 0;
  116. return memcmp(key1, key2, l1) == 0;
  117. }
  /* _serverAssert is needed by dict.
   *
   * Prints the failed expression with its file and line to stderr, then
   * deliberately writes through an invalid address to crash the process.
   * Each message now ends with a newline; previously the two messages were
   * emitted back to back on a single unterminated line. */
  void _serverAssert(const char *estr, const char *file, int line) {
      fprintf(stderr, "=== ASSERTION FAILED ===\n");
      fprintf(stderr, "==> %s:%d '%s' is not true\n",file,line,estr);
      *((char*)-1) = 'x';
  }
  124. static redisContext *getRedisContext(const char *ip, int port,
  125. const char *hostsocket)
  126. {
  127. redisContext *ctx = NULL;
  128. redisReply *reply = NULL;
  129. if (hostsocket == NULL)
  130. ctx = redisConnect(ip, port);
  131. else
  132. ctx = redisConnectUnix(hostsocket);
  133. if (ctx == NULL || ctx->err) {
  134. fprintf(stderr,"Could not connect to Redis at ");
  135. char *err = (ctx != NULL ? ctx->errstr : "");
  136. if (hostsocket == NULL)
  137. fprintf(stderr,"%s:%d: %s\n",ip,port,err);
  138. else
  139. fprintf(stderr,"%s: %s\n",hostsocket,err);
  140. goto cleanup;
  141. }
  142. if (config.tls==1) {
  143. const char *err = NULL;
  144. if (cliSecureConnection(ctx, config.sslconfig, &err) == REDIS_ERR && err) {
  145. fprintf(stderr, "Could not negotiate a TLS connection: %s\n", err);
  146. goto cleanup;
  147. }
  148. }
  149. if (config.auth == NULL)
  150. return ctx;
  151. if (config.user == NULL)
  152. reply = redisCommand(ctx,"AUTH %s", config.auth);
  153. else
  154. reply = redisCommand(ctx,"AUTH %s %s", config.user, config.auth);
  155. if (reply != NULL) {
  156. if (reply->type == REDIS_REPLY_ERROR) {
  157. if (hostsocket == NULL)
  158. fprintf(stderr, "Node %s:%d replied with error:\n%s\n", ip, port, reply->str);
  159. else
  160. fprintf(stderr, "Node %s replied with error:\n%s\n", hostsocket, reply->str);
  161. freeReplyObject(reply);
  162. redisFree(ctx);
  163. exit(1);
  164. }
  165. freeReplyObject(reply);
  166. return ctx;
  167. }
  168. fprintf(stderr, "ERROR: failed to fetch reply from ");
  169. if (hostsocket == NULL)
  170. fprintf(stderr, "%s:%d\n", ip, port);
  171. else
  172. fprintf(stderr, "%s\n", hostsocket);
  173. cleanup:
  174. freeReplyObject(reply);
  175. redisFree(ctx);
  176. return NULL;
  177. }
  178. static redisConfig *getRedisConfig(const char *ip, int port,
  179. const char *hostsocket)
  180. {
  181. redisConfig *cfg = zcalloc(sizeof(*cfg));
  182. if (!cfg) return NULL;
  183. redisContext *c = NULL;
  184. redisReply *reply = NULL, *sub_reply = NULL;
  185. c = getRedisContext(ip, port, hostsocket);
  186. if (c == NULL) {
  187. freeRedisConfig(cfg);
  188. return NULL;
  189. }
  190. redisAppendCommand(c, "CONFIG GET %s", "save");
  191. redisAppendCommand(c, "CONFIG GET %s", "appendonly");
  192. int i = 0;
  193. void *r = NULL;
  194. for (; i < 2; i++) {
  195. int res = redisGetReply(c, &r);
  196. if (reply) freeReplyObject(reply);
  197. reply = res == REDIS_OK ? ((redisReply *) r) : NULL;
  198. if (res != REDIS_OK || !r) goto fail;
  199. if (reply->type == REDIS_REPLY_ERROR) {
  200. fprintf(stderr, "ERROR: %s\n", reply->str);
  201. goto fail;
  202. }
  203. if (reply->type != REDIS_REPLY_ARRAY || reply->elements < 2) goto fail;
  204. sub_reply = reply->element[1];
  205. char *value = sub_reply->str;
  206. if (!value) value = "";
  207. switch (i) {
  208. case 0: cfg->save = sdsnew(value); break;
  209. case 1: cfg->appendonly = sdsnew(value); break;
  210. }
  211. }
  212. freeReplyObject(reply);
  213. redisFree(c);
  214. return cfg;
  215. fail:
  216. fprintf(stderr, "ERROR: failed to fetch CONFIG from ");
  217. if (hostsocket == NULL) fprintf(stderr, "%s:%d\n", ip, port);
  218. else fprintf(stderr, "%s\n", hostsocket);
  219. int abort_test = 0;
  220. if (reply && reply->type == REDIS_REPLY_ERROR &&
  221. (!strncmp(reply->str,"NOAUTH",5) ||
  222. !strncmp(reply->str,"WRONGPASS",9) ||
  223. !strncmp(reply->str,"NOPERM",5)))
  224. abort_test = 1;
  225. freeReplyObject(reply);
  226. redisFree(c);
  227. freeRedisConfig(cfg);
  228. if (abort_test) exit(1);
  229. return NULL;
  230. }
  231. static void freeRedisConfig(redisConfig *cfg) {
  232. if (cfg->save) sdsfree(cfg->save);
  233. if (cfg->appendonly) sdsfree(cfg->appendonly);
  234. zfree(cfg);
  235. }
  236. static void freeClient(client c) {
  237. aeEventLoop *el = CLIENT_GET_EVENTLOOP(c);
  238. listNode *ln;
  239. aeDeleteFileEvent(el,c->context->fd,AE_WRITABLE);
  240. aeDeleteFileEvent(el,c->context->fd,AE_READABLE);
  241. if (c->thread_id >= 0) {
  242. int requests_finished = 0;
  243. atomicGet(config.requests_finished, requests_finished);
  244. if (requests_finished >= config.requests) {
  245. aeStop(el);
  246. }
  247. }
  248. redisFree(c->context);
  249. sdsfree(c->obuf);
  250. zfree(c->randptr);
  251. zfree(c->stagptr);
  252. zfree(c);
  253. if (config.num_threads) pthread_mutex_lock(&(config.liveclients_mutex));
  254. config.liveclients--;
  255. ln = listSearchKey(config.clients,c);
  256. assert(ln != NULL);
  257. listDelNode(config.clients,ln);
  258. if (config.num_threads) pthread_mutex_unlock(&(config.liveclients_mutex));
  259. }
  260. static void freeAllClients(void) {
  261. listNode *ln = config.clients->head, *next;
  262. while(ln) {
  263. next = ln->next;
  264. freeClient(ln->value);
  265. ln = next;
  266. }
  267. }

相关技术文章

点击QQ咨询
开通会员
返回顶部
×
微信扫码支付
微信扫码支付
确定支付下载
请使用微信扫描二维码支付
×

提示信息

×

选择支付方式

  • 微信支付
  • 支付宝付款
确定支付下载