这里不需要重点看，主要看一下 cancelReplicationHandshake 就行：
- #include "server.h"
- #include "cluster.h"
- #include "bio.h"
-
- #include <sys/time.h>
- #include <unistd.h>
- #include <fcntl.h>
- #include <sys/socket.h>
- #include <sys/stat.h>
-
- void replicationDiscardCachedMaster(void);
- void replicationResurrectCachedMaster(connection *conn);
- void replicationSendAck(void);
- void putSlaveOnline(client *slave);
- int cancelReplicationHandshake(int reconnect);
-
/* Global flag recording whether this instance produced an RDB file on
 * behalf of replication. It allows that file to be removed later when the
 * instance is configured to run without persistence. Starts at 0. */
int RDBGeneratedByReplication = 0;
-
- /* --------------------------- Utility functions ---------------------------- */
-
- /* Return the pointer to a string representing the slave ip:listening_port
- * pair. Mostly useful for logging, since we want to log a slave using its
- * IP address and its listening port which is more clear for the user, for
- * example: "Closing connection with replica 10.1.2.3:6380". */
- char *replicationGetSlaveName(client *c) {
- static char buf[NET_HOST_PORT_STR_LEN];
- char ip[NET_IP_STR_LEN];
-
- ip[0] = '\0';
- buf[0] = '\0';
- if (c->slave_addr ||
- connPeerToString(c->conn,ip,sizeof(ip),NULL) != -1)
- {
- char *addr = c->slave_addr ? c->slave_addr : ip;
- if (c->slave_listening_port)
- anetFormatAddr(buf,sizeof(buf),addr,c->slave_listening_port);
- else
- snprintf(buf,sizeof(buf),"%s:<unknown-replica-port>",addr);
- } else {
- snprintf(buf,sizeof(buf),"client id #%llu",
- (unsigned long long) c->id);
- }
- return buf;
- }
-
/* Delete 'filename' while keeping the potentially slow part of the deletion
 * out of the calling thread.
 *
 * A plain unlink() may block for a long time while the filesystem releases
 * the file's storage. To avoid that, the file is opened first: with a
 * descriptor still referencing it, unlink() only removes the name, and the
 * storage is reclaimed when the last reference is closed. That final close
 * is delegated to bioCreateCloseJob().
 *
 * Returns 0 on success. If the file cannot be opened, the result of a plain
 * unlink() is returned. If unlink() fails after a successful open, -1 is
 * returned with errno left as set by unlink(). */
int bg_unlink(const char *filename) {
    int fd = open(filename,O_RDONLY|O_NONBLOCK);

    /* The file can't be opened: fall back to unlinking right here. */
    if (fd == -1) return unlink(filename);

    /* This removes the name only; the contents stay allocated because the
     * descriptor above still references the file. */
    if (unlink(filename) == -1) {
        /* close() may clobber errno, so preserve the unlink() error across
         * the release of our extra reference. */
        int saved_errno = errno;
        close(fd);
        errno = saved_errno;
        return -1;
    }

    bioCreateCloseJob(fd);
    return 0; /* Success. */
}
-
- /* ---------------------------------- MASTER -------------------------------- */
-
- void createReplicationBacklog(void) {
- serverAssert(server.repl_backlog == NULL);
- server.repl_backlog = zmalloc(server.repl_backlog_size);
- server.repl_backlog_histlen = 0;
- server.repl_backlog_idx = 0;
-
- /* We don't have any data inside our buffer, but virtually the first
- * byte we have is the next byte that will be generated for the
- * replication stream. */
- server.repl_backlog_off = server.master_repl_offset+1;
- }
-
- /* This function is called when the user modifies the replication backlog
- * size at runtime. It is up to the function to both update the
- * server.repl_backlog_size and to resize the buffer and setup it so that
- * it contains the same data as the previous one (possibly less data, but
- * the most recent bytes, or the same data and more free space in case the
- * buffer is enlarged). */
- void resizeReplicationBacklog(long long newsize) {
- if (newsize < CONFIG_REPL_BACKLOG_MIN_SIZE)
- newsize = CONFIG_REPL_BACKLOG_MIN_SIZE;
- if (server.repl_backlog_size == newsize) return;
-
- server.repl_backlog_size = newsize;
- if (server.repl_backlog != NULL) {
- /* What we actually do is to flush the old buffer and realloc a new
- * empty one. It will refill with new data incrementally.
- * The reason is that copying a few gigabytes adds latency and even
- * worse often we need to alloc additional space before freeing the
- * old buffer. */
- zfree(server.repl_backlog);
- server.repl_backlog = zmalloc(server.repl_backlog_size);
- server.repl_backlog_histlen = 0;
- server.repl_backlog_idx = 0;
- /* Next byte we have is... the next since the buffer is empty. */
- server.repl_backlog_off = server.master_repl_offset+1;
- }
- }
-
- void freeReplicationBacklog(void) {
- serverAssert(listLength(server.slaves) == 0);
- zfree(server.repl_backlog);
- server.repl_backlog = NULL;
- }
-
- /* Add data to the replication backlog.
- * This function also increments the global replication offset stored at
- * server.master_repl_offset, because there is no case where we want to feed
- * the backlog without incrementing the offset. */
- void feedReplicationBacklog(void *ptr, size_t len) {
- unsigned char *p = ptr;
-
- server.master_repl_offset += len;
-
- /* This is a circular buffer, so write as much data we can at every
- * iteration and rewind the "idx" index if we reach the limit. */
- while(len) {
- size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;
- if (thislen > len) thislen = len;
- memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen);
- server.repl_backlog_idx += thislen;
- if (server.repl_backlog_idx == server.repl_backlog_size)
- server.repl_backlog_idx = 0;
- len -= thislen;
- p += thislen;
- server.repl_backlog_histlen += thislen;
- }
- if (server.repl_backlog_histlen > server.repl_backlog_size)
- server.repl_backlog_histlen = server.repl_backlog_size;
- /* Set the offset of the first byte we have in the backlog. */
- server.repl_backlog_off = server.master_repl_offset -
- server.repl_backlog_histlen + 1;
- }
-
- /* Wrapper for feedReplicationBacklog() that takes Redis string objects
- * as input. */
- void feedReplicationBacklogWithObject(robj *o) {
- char llstr[LONG_STR_SIZE];
- void *p;
- size_t len;
-
- if (o->encoding == OBJ_ENCODING_INT) {
- len = ll2string(llstr,sizeof(llstr),(long)o->ptr);
- p = llstr;
- } else {
- len = sdslen(o->ptr);
- p = o->ptr;
- }
- feedReplicationBacklog(p,len);
- }
-
- int canFeedReplicaReplBuffer(client *replica) {
- /* Don't feed replicas that only want the RDB. */
- if (replica->flags & CLIENT_REPL_RDBONLY) return 0;
-
- /* Don't feed replicas that are still waiting for BGSAVE to start. */
- if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) return 0;
-
- return 1;
- }
-
- /* Propagate write commands to slaves, and populate the replication backlog
- * as well. This function is used if the instance is a master: we use
- * the commands received by our clients in order to create the replication
- * stream. Instead if the instance is a slave and has sub-slaves attached,
- * we use replicationFeedSlavesFromMasterStream() */
- void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
- listNode *ln;
- listIter li;
- int j, len;
- char llstr[LONG_STR_SIZE];
-
- /* If the instance is not a top level master, return ASAP: we'll just proxy
- * the stream of data we receive from our master instead, in order to
- * propagate *identical* replication stream. In this way this slave can
- * advertise the same replication ID as the master (since it shares the
- * master replication history and has the same backlog and offsets). */
- if (server.masterhost != NULL) return;
-
- /* If there aren't slaves, and there is no backlog buffer to populate,
- * we can return ASAP. */
- if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
-
- /* We can't have slaves attached and no backlog. */
- serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
-
- /* Send SELECT command to every slave if needed. */
- if (server.slaveseldb != dictid) {
- robj *selectcmd;
-
- /* For a few DBs we have pre-computed SELECT command. */
- if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS) {
- selectcmd = shared.select[dictid];
- } else {
- int dictid_len;
-
- dictid_len = ll2string(llstr,sizeof(llstr),dictid);
- selectcmd = createObject(OBJ_STRING,
- sdscatprintf(sdsempty(),
- "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
- dictid_len, llstr));
- }
-
- /* Add the SELECT command into the backlog. */
- if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);
-
- /* Send it to slaves. */
- listRewind(slaves,&li);
- while((ln = listNext(&li))) {
- client *slave = ln->value;
-
- if (!canFeedReplicaReplBuffer(slave)) continue;
- addReply(slave,selectcmd);
- }
-
- if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
- decrRefCount(selectcmd);
- }
- server.slaveseldb = dictid;
-
- /* Write the command to the replication backlog if any. */
- if (server.repl_backlog) {
- char aux[LONG_STR_SIZE+3];
-
- /* Add the multi bulk reply length. */
- aux[0] = '*';
- len = ll2string(aux+1,sizeof(aux)-1,argc);
- aux[len+1] = '\r';
- aux[len+2] = '\n';
- feedReplicationBacklog(aux,len+3);
-
- for (j = 0; j < argc; j++) {
- long objlen = stringObjectLen(argv[j]);
-
- /* We need to feed the buffer with the object as a bulk reply
- * not just as a plain string, so create the $..CRLF payload len
- * and add the final CRLF */
- aux[0] = '$';
- len = ll2string(aux+1,sizeof(aux)-1,objlen);
- aux[len+1] = '\r';
- aux[len+2] = '\n';
- feedReplicationBacklog(aux,len+3);
- feedReplicationBacklogWithObject(argv[j]);
- feedReplicationBacklog(aux+len+1,2);
- }
- }
-
- /* Write the command to every slave. */
- listRewind(slaves,&li);
- while((ln = listNext(&li))) {
- client *slave = ln->value;
-
- if (!canFeedReplicaReplBuffer(slave)) continue;
-
- /* Feed slaves that are waiting for the initial SYNC (so these commands
- * are queued in the output buffer until the initial SYNC completes),
- * or are already in sync with the master. */
-
- /* Add the multi bulk length. */
- addReplyArrayLen(slave,argc);
-
- /* Finally any additional argument that was not stored inside the
- * static buffer if any (from j to argc). */
- for (j = 0; j < argc; j++)
- addReplyBulk(slave,argv[j]);
- }
- }