关键词搜索

源码搜索 ×
×

漫话Redis源码之二十六

发布2021-12-05浏览1800次

详情内容

这里不需要重点看,主要看下cancelReplicationHandshake就行:

  1. #include "server.h"
  2. #include "cluster.h"
  3. #include "bio.h"
  4. #include <sys/time.h>
  5. #include <unistd.h>
  6. #include <fcntl.h>
  7. #include <sys/socket.h>
  8. #include <sys/stat.h>
  9. void replicationDiscardCachedMaster(void);
  10. void replicationResurrectCachedMaster(connection *conn);
  11. void replicationSendAck(void);
  12. void putSlaveOnline(client *slave);
  13. int cancelReplicationHandshake(int reconnect);
  14. /* We take a global flag to remember if this instance generated an RDB
  15. * because of replication, so that we can remove the RDB file in case
  16. * the instance is configured to have no persistence. */
  17. int RDBGeneratedByReplication = 0;
  18. /* --------------------------- Utility functions ---------------------------- */
  19. /* Return the pointer to a string representing the slave ip:listening_port
  20. * pair. Mostly useful for logging, since we want to log a slave using its
  21. * IP address and its listening port which is more clear for the user, for
  22. * example: "Closing connection with replica 10.1.2.3:6380". */
  23. char *replicationGetSlaveName(client *c) {
  24. static char buf[NET_HOST_PORT_STR_LEN];
  25. char ip[NET_IP_STR_LEN];
  26. ip[0] = '\0';
  27. buf[0] = '\0';
  28. if (c->slave_addr ||
  29. connPeerToString(c->conn,ip,sizeof(ip),NULL) != -1)
  30. {
  31. char *addr = c->slave_addr ? c->slave_addr : ip;
  32. if (c->slave_listening_port)
  33. anetFormatAddr(buf,sizeof(buf),addr,c->slave_listening_port);
  34. else
  35. snprintf(buf,sizeof(buf),"%s:<unknown-replica-port>",addr);
  36. } else {
  37. snprintf(buf,sizeof(buf),"client id #%llu",
  38. (unsigned long long) c->id);
  39. }
  40. return buf;
  41. }
  42. /* Plain unlink() can block for quite some time in order to actually apply
  43. * the file deletion to the filesystem. This call removes the file in a
  44. * background thread instead. We actually just do close() in the thread,
  45. * by using the fact that if there is another instance of the same file open,
  46. * the foreground unlink() will only remove the fs name, and deleting the
  47. * file's storage space will only happen once the last reference is lost. */
  48. int bg_unlink(const char *filename) {
  49. int fd = open(filename,O_RDONLY|O_NONBLOCK);
  50. if (fd == -1) {
  51. /* Can't open the file? Fall back to unlinking in the main thread. */
  52. return unlink(filename);
  53. } else {
  54. /* The following unlink() removes the name but doesn't free the
  55. * file contents because a process still has it open. */
  56. int retval = unlink(filename);
  57. if (retval == -1) {
  58. /* If we got an unlink error, we just return it, closing the
  59. * new reference we have to the file. */
  60. int old_errno = errno;
  61. close(fd); /* This would overwrite our errno. So we saved it. */
  62. errno = old_errno;
  63. return -1;
  64. }
  65. bioCreateCloseJob(fd);
  66. return 0; /* Success. */
  67. }
  68. }
  69. /* ---------------------------------- MASTER -------------------------------- */
  70. void createReplicationBacklog(void) {
  71. serverAssert(server.repl_backlog == NULL);
  72. server.repl_backlog = zmalloc(server.repl_backlog_size);
  73. server.repl_backlog_histlen = 0;
  74. server.repl_backlog_idx = 0;
  75. /* We don't have any data inside our buffer, but virtually the first
  76. * byte we have is the next byte that will be generated for the
  77. * replication stream. */
  78. server.repl_backlog_off = server.master_repl_offset+1;
  79. }
  80. /* This function is called when the user modifies the replication backlog
  81. * size at runtime. It is up to the function to both update the
  82. * server.repl_backlog_size and to resize the buffer and setup it so that
  83. * it contains the same data as the previous one (possibly less data, but
  84. * the most recent bytes, or the same data and more free space in case the
  85. * buffer is enlarged). */
  86. void resizeReplicationBacklog(long long newsize) {
  87. if (newsize < CONFIG_REPL_BACKLOG_MIN_SIZE)
  88. newsize = CONFIG_REPL_BACKLOG_MIN_SIZE;
  89. if (server.repl_backlog_size == newsize) return;
  90. server.repl_backlog_size = newsize;
  91. if (server.repl_backlog != NULL) {
  92. /* What we actually do is to flush the old buffer and realloc a new
  93. * empty one. It will refill with new data incrementally.
  94. * The reason is that copying a few gigabytes adds latency and even
  95. * worse often we need to alloc additional space before freeing the
  96. * old buffer. */
  97. zfree(server.repl_backlog);
  98. server.repl_backlog = zmalloc(server.repl_backlog_size);
  99. server.repl_backlog_histlen = 0;
  100. server.repl_backlog_idx = 0;
  101. /* Next byte we have is... the next since the buffer is empty. */
  102. server.repl_backlog_off = server.master_repl_offset+1;
  103. }
  104. }
  105. void freeReplicationBacklog(void) {
  106. serverAssert(listLength(server.slaves) == 0);
  107. zfree(server.repl_backlog);
  108. server.repl_backlog = NULL;
  109. }
  110. /* Add data to the replication backlog.
  111. * This function also increments the global replication offset stored at
  112. * server.master_repl_offset, because there is no case where we want to feed
  113. * the backlog without incrementing the offset. */
  114. void feedReplicationBacklog(void *ptr, size_t len) {
  115. unsigned char *p = ptr;
  116. server.master_repl_offset += len;
  117. /* This is a circular buffer, so write as much data we can at every
  118. * iteration and rewind the "idx" index if we reach the limit. */
  119. while(len) {
  120. size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;
  121. if (thislen > len) thislen = len;
  122. memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen);
  123. server.repl_backlog_idx += thislen;
  124. if (server.repl_backlog_idx == server.repl_backlog_size)
  125. server.repl_backlog_idx = 0;
  126. len -= thislen;
  127. p += thislen;
  128. server.repl_backlog_histlen += thislen;
  129. }
  130. if (server.repl_backlog_histlen > server.repl_backlog_size)
  131. server.repl_backlog_histlen = server.repl_backlog_size;
  132. /* Set the offset of the first byte we have in the backlog. */
  133. server.repl_backlog_off = server.master_repl_offset -
  134. server.repl_backlog_histlen + 1;
  135. }
  136. /* Wrapper for feedReplicationBacklog() that takes Redis string objects
  137. * as input. */
  138. void feedReplicationBacklogWithObject(robj *o) {
  139. char llstr[LONG_STR_SIZE];
  140. void *p;
  141. size_t len;
  142. if (o->encoding == OBJ_ENCODING_INT) {
  143. len = ll2string(llstr,sizeof(llstr),(long)o->ptr);
  144. p = llstr;
  145. } else {
  146. len = sdslen(o->ptr);
  147. p = o->ptr;
  148. }
  149. feedReplicationBacklog(p,len);
  150. }
  151. int canFeedReplicaReplBuffer(client *replica) {
  152. /* Don't feed replicas that only want the RDB. */
  153. if (replica->flags & CLIENT_REPL_RDBONLY) return 0;
  154. /* Don't feed replicas that are still waiting for BGSAVE to start. */
  155. if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) return 0;
  156. return 1;
  157. }
  158. /* Propagate write commands to slaves, and populate the replication backlog
  159. * as well. This function is used if the instance is a master: we use
  160. * the commands received by our clients in order to create the replication
  161. * stream. Instead if the instance is a slave and has sub-slaves attached,
  162. * we use replicationFeedSlavesFromMasterStream() */
  163. void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
  164. listNode *ln;
  165. listIter li;
  166. int j, len;
  167. char llstr[LONG_STR_SIZE];
  168. /* If the instance is not a top level master, return ASAP: we'll just proxy
  169. * the stream of data we receive from our master instead, in order to
  170. * propagate *identical* replication stream. In this way this slave can
  171. * advertise the same replication ID as the master (since it shares the
  172. * master replication history and has the same backlog and offsets). */
  173. if (server.masterhost != NULL) return;
  174. /* If there aren't slaves, and there is no backlog buffer to populate,
  175. * we can return ASAP. */
  176. if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
  177. /* We can't have slaves attached and no backlog. */
  178. serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
  179. /* Send SELECT command to every slave if needed. */
  180. if (server.slaveseldb != dictid) {
  181. robj *selectcmd;
  182. /* For a few DBs we have pre-computed SELECT command. */
  183. if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS) {
  184. selectcmd = shared.select[dictid];
  185. } else {
  186. int dictid_len;
  187. dictid_len = ll2string(llstr,sizeof(llstr),dictid);
  188. selectcmd = createObject(OBJ_STRING,
  189. sdscatprintf(sdsempty(),
  190. "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
  191. dictid_len, llstr));
  192. }
  193. /* Add the SELECT command into the backlog. */
  194. if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);
  195. /* Send it to slaves. */
  196. listRewind(slaves,&li);
  197. while((ln = listNext(&li))) {
  198. client *slave = ln->value;
  199. if (!canFeedReplicaReplBuffer(slave)) continue;
  200. addReply(slave,selectcmd);
  201. }
  202. if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
  203. decrRefCount(selectcmd);
  204. }
  205. server.slaveseldb = dictid;
  206. /* Write the command to the replication backlog if any. */
  207. if (server.repl_backlog) {
  208. char aux[LONG_STR_SIZE+3];
  209. /* Add the multi bulk reply length. */
  210. aux[0] = '*';
  211. len = ll2string(aux+1,sizeof(aux)-1,argc);
  212. aux[len+1] = '\r';
  213. aux[len+2] = '\n';
  214. feedReplicationBacklog(aux,len+3);
  215. for (j = 0; j < argc; j++) {
  216. long objlen = stringObjectLen(argv[j]);
  217. /* We need to feed the buffer with the object as a bulk reply
  218. * not just as a plain string, so create the $..CRLF payload len
  219. * and add the final CRLF */
  220. aux[0] = '$';
  221. len = ll2string(aux+1,sizeof(aux)-1,objlen);
  222. aux[len+1] = '\r';
  223. aux[len+2] = '\n';
  224. feedReplicationBacklog(aux,len+3);
  225. feedReplicationBacklogWithObject(argv[j]);
  226. feedReplicationBacklog(aux+len+1,2);
  227. }
  228. }
  229. /* Write the command to every slave. */
  230. listRewind(slaves,&li);
  231. while((ln = listNext(&li))) {
  232. client *slave = ln->value;
  233. if (!canFeedReplicaReplBuffer(slave)) continue;
  234. /* Feed slaves that are waiting for the initial SYNC (so these commands
  235. * are queued in the output buffer until the initial SYNC completes),
  236. * or are already in sync with the master. */
  237. /* Add the multi bulk length. */
  238. addReplyArrayLen(slave,argc);
  239. /* Finally any additional argument that was not stored inside the
  240. * static buffer if any (from j to argc). */
  241. for (j = 0; j < argc; j++)
  242. addReplyBulk(slave,argv[j]);
  243. }
  244. }

相关技术文章

点击QQ咨询
开通会员
返回顶部
×
微信扫码支付
微信扫码支付
确定支付下载
请使用微信描二维码支付
×

提示信息

×

选择支付方式

  • 微信支付
  • 支付宝付款
确定支付下载