什么时候调用这些函数呢?值得思考。其实,每次传输数据到客户端时,都会调用。
- /* This function is called every time we are going to transmit new data
- * to the client. The behavior is the following:
- *
- * If the client should receive new data (normal clients will) the function
- * returns C_OK, and make sure to install the write handler in our event
- * loop so that when the socket is writable new data gets written.
- *
- * If the client should not receive new data, because it is a fake client
- * (used to load AOF in memory), a master or because the setup of the write
- * handler failed, the function returns C_ERR.
- *
- * The function may return C_OK without actually installing the write
- * event handler in the following cases:
- *
- * 1) The event handler should already be installed since the output buffer
- * already contains something.
- * 2) The client is a slave but not yet online, so we want to just accumulate
- * writes in the buffer but not actually sending them yet.
- *
- * Typically gets called every time a reply is built, before adding more
- * data to the clients output buffers. If the function returns C_ERR no
- * data should be appended to the output buffers. */
- int prepareClientToWrite(client *c) {
- /* If it's the Lua client we always return ok without installing any
- * handler since there is no socket at all. */
- if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK;
-
- /* If CLIENT_CLOSE_ASAP flag is set, we need not write anything. */
- if (c->flags & CLIENT_CLOSE_ASAP) return C_ERR;
-
- /* CLIENT REPLY OFF / SKIP handling: don't send replies. */
- if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR;
-
- /* Masters don't receive replies, unless CLIENT_MASTER_FORCE_REPLY flag
- * is set. */
- if ((c->flags & CLIENT_MASTER) &&
- !(c->flags & CLIENT_MASTER_FORCE_REPLY)) return C_ERR;
-
- if (!c->conn) return C_ERR; /* Fake client for AOF loading. */
-
- /* Schedule the client to write the output buffers to the socket, unless
- * it should already be setup to do so (it has already pending data).
- *
- * If CLIENT_PENDING_READ is set, we're in an IO thread and should
- * not install a write handler. Instead, it will be done by
- * handleClientsWithPendingReadsUsingThreads() upon return.
- */
- if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_READ))
- clientInstallWriteHandler(c);
-
- /* Authorize the caller to queue in the output buffer of this client. */
- return C_OK;
- }
-
- /* -----------------------------------------------------------------------------
- * Low level functions to add more data to output buffers.
- * -------------------------------------------------------------------------- */
-
- /* Attempts to add the reply to the static buffer in the client struct.
- * Returns C_ERR if the buffer is full, or the reply list is not empty,
- * in which case the reply must be added to the reply list. */
- int _addReplyToBuffer(client *c, const char *s, size_t len) {
- size_t available = sizeof(c->buf)-c->bufpos;
-
- if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return C_OK;
-
- /* If there already are entries in the reply list, we cannot
- * add anything more to the static buffer. */
- if (listLength(c->reply) > 0) return C_ERR;
-
- /* Check that the buffer has enough space available for this string. */
- if (len > available) return C_ERR;
-
- memcpy(c->buf+c->bufpos,s,len);
- c->bufpos+=len;
- return C_OK;
- }
-
- /* Adds the reply to the reply linked list.
- * Note: some edits to this function need to be relayed to AddReplyFromClient. */
- void _addReplyProtoToList(client *c, const char *s, size_t len) {
- if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return;
-
- listNode *ln = listLast(c->reply);
- clientReplyBlock *tail = ln? listNodeValue(ln): NULL;
-
- /* Note that 'tail' may be NULL even if we have a tail node, because when
- * addReplyDeferredLen() is used, it sets a dummy node to NULL just
- * fo fill it later, when the size of the bulk length is set. */
-
- /* Append to tail string when possible. */
- if (tail) {
- /* Copy the part we can fit into the tail, and leave the rest for a
- * new node */
- size_t avail = tail->size - tail->used;
- size_t copy = avail >= len? len: avail;
- memcpy(tail->buf + tail->used, s, copy);
- tail->used += copy;
- s += copy;
- len -= copy;
- }
- if (len) {
- /* Create a new node, make sure it is allocated to at
- * least PROTO_REPLY_CHUNK_BYTES */
- size_t size = len < PROTO_REPLY_CHUNK_BYTES? PROTO_REPLY_CHUNK_BYTES: len;
- tail = zmalloc(size + sizeof(clientReplyBlock));
- /* take over the allocation's internal fragmentation */
- tail->size = zmalloc_usable_size(tail) - sizeof(clientReplyBlock);
- tail->used = len;
- memcpy(tail->buf, s, len);
- listAddNodeTail(c->reply, tail);
- c->reply_bytes += tail->size;
-
- closeClientOnOutputBufferLimitReached(c, 1);
- }
- }
-
- /* -----------------------------------------------------------------------------
- * Higher level functions to queue data on the client output buffer.
- * The following functions are the ones that commands implementations will call.
- * -------------------------------------------------------------------------- */
-
- /* Add the object 'obj' string representation to the client output buffer. */
- void addReply(client *c, robj *obj) {
- if (prepareClientToWrite(c) != C_OK) return;
-
- if (sdsEncodedObject(obj)) {
- if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
- _addReplyProtoToList(c,obj->ptr,sdslen(obj->ptr));
- } else if (obj->encoding == OBJ_ENCODING_INT) {
- /* For integer encoded strings we just convert it into a string
- * using our optimized function, and attach the resulting string
- * to the output buffer. */
- char buf[32];
- size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
- if (_addReplyToBuffer(c,buf,len) != C_OK)
- _addReplyProtoToList(c,buf,len);
- } else {
- serverPanic("Wrong obj->encoding in addReply()");
- }
- }
-
- /* Add the SDS 's' string to the client output buffer, as a side effect
- * the SDS string is freed. */
- void addReplySds(client *c, sds s) {
- if (prepareClientToWrite(c) != C_OK) {
- /* The caller expects the sds to be free'd. */
- sdsfree(s);
- return;
- }
- if (_addReplyToBuffer(c,s,sdslen(s)) != C_OK)
- _addReplyProtoToList(c,s,sdslen(s));
- sdsfree(s);
- }
-
- /* This low level function just adds whatever protocol you send it to the
- * client buffer, trying the static buffer initially, and using the string
- * of objects if not possible.
- *
- * It is efficient because does not create an SDS object nor an Redis object
- * if not needed. The object will only be created by calling
- * _addReplyProtoToList() if we fail to extend the existing tail object
- * in the list of objects. */
- void addReplyProto(client *c, const char *s, size_t len) {
- if (prepareClientToWrite(c) != C_OK) return;
- if (_addReplyToBuffer(c,s,len) != C_OK)
- _addReplyProtoToList(c,s,len);
- }
-
- /* Low level function called by the addReplyError...() functions.
- * It emits the protocol for a Redis error, in the form:
- *
- * -ERRORCODE Error Message<CR><LF>
- *
- * If the error code is already passed in the string 's', the error
- * code provided is used, otherwise the string "-ERR " for the generic
- * error code is automatically added.
- * Note that 's' must NOT end with \r\n. */
- void addReplyErrorLength(client *c, const char *s, size_t len) {
- /* If the string already starts with "-..." then the error code
- * is provided by the caller. Otherwise we use "-ERR". */
- if (!len || s[0] != '-') addReplyProto(c,"-ERR ",5);
- addReplyProto(c,s,len);
- addReplyProto(c,"\r\n",2);
- }
-
- /* Do some actions after an error reply was sent (Log if needed, updates stats, etc.) */
- void afterErrorReply(client *c, const char *s, size_t len) {
- /* Increment the global error counter */
- server.stat_total_error_replies++;
- /* Increment the error stats
- * If the string already starts with "-..." then the error prefix
- * is provided by the caller ( we limit the search to 32 chars). Otherwise we use "-ERR". */
- if (s[0] != '-') {
- incrementErrorCount("ERR", 3);
- } else {
- char *spaceloc = memchr(s, ' ', len < 32 ? len : 32);
- if (spaceloc) {
- const size_t errEndPos = (size_t)(spaceloc - s);
- incrementErrorCount(s+1, errEndPos-1);
- } else {
- /* Fallback to ERR if we can't retrieve the error prefix */
- incrementErrorCount("ERR", 3);
- }
- }
-
- /* Sometimes it could be normal that a slave replies to a master with
- * an error and this function gets called. Actually the error will never
- * be sent because addReply*() against master clients has no effect...
- * A notable example is:
- *
- * EVAL 'redis.call("incr",KEYS[1]); redis.call("nonexisting")' 1 x
- *
- * Where the master must propagate the first change even if the second
- * will produce an error. However it is useful to log such events since
- * they are rare and may hint at errors in a script or a bug in Redis. */
- int ctype = getClientType(c);
- if (ctype == CLIENT_TYPE_MASTER || ctype == CLIENT_TYPE_SLAVE || c->id == CLIENT_ID_AOF) {
- char *to, *from;
-
- if (c->id == CLIENT_ID_AOF) {
- to = "AOF-loading-client";
- from = "server";
- } else if (ctype == CLIENT_TYPE_MASTER) {
- to = "master";
- from = "replica";
- } else {
- to = "replica";
- from = "master";
- }
-
- if (len > 4096) len = 4096;
- char *cmdname = c->lastcmd ? c->lastcmd->name : "<unknown>";
- serverLog(LL_WARNING,"== CRITICAL == This %s is sending an error "
- "to its %s: '%.*s' after processing the command "
- "'%s'", from, to, (int)len, s, cmdname);
- if (ctype == CLIENT_TYPE_MASTER && server.repl_backlog &&
- server.repl_backlog_histlen > 0)
- {
- showLatestBacklog();
- }
- server.stat_unexpected_error_replies++;
- }
- }
-
- /* The 'err' object is expected to start with -ERRORCODE and end with \r\n.
- * Unlike addReplyErrorSds and others alike which rely on addReplyErrorLength. */
- void addReplyErrorObject(client *c, robj *err) {
- addReply(c, err);
- afterErrorReply(c, err->ptr, sdslen(err->ptr)-2); /* Ignore trailing \r\n */
- }
-
- /* See addReplyErrorLength for expectations from the input string. */
- void addReplyError(client *c, const char *err) {
- addReplyErrorLength(c,err,strlen(err));
- afterErrorReply(c,err,strlen(err));
- }
-
- /* See addReplyErrorLength for expectations from the input string. */
- /* As a side effect the SDS string is freed. */
- void addReplyErrorSds(client *c, sds err) {
- addReplyErrorLength(c,err,sdslen(err));
- afterErrorReply(c,err,sdslen(err));
- sdsfree(err);
- }
-
- /* See addReplyErrorLength for expectations from the formatted string.
- * The formatted string is safe to contain \r and \n anywhere. */
- void addReplyErrorFormat(client *c, const char *fmt, ...) {
- va_list ap;
- va_start(ap,fmt);
- sds s = sdscatvprintf(sdsempty(),fmt,ap);
- va_end(ap);
- /* Trim any newlines at the end (ones will be added by addReplyErrorLength) */
- s = sdstrim(s, "\r\n");
- /* Make sure there are no newlines in the middle of the string, otherwise
- * invalid protocol is emitted. */
- s = sdsmapchars(s, "\r\n", " ", 2);
- addReplyErrorLength(c,s,sdslen(s));
- afterErrorReply(c,s,sdslen(s));
- sdsfree(s);
- }
-
- void addReplyStatusLength(client *c, const char *s, size_t len) {
- addReplyProto(c,"+",1);
- addReplyProto(c,s,len);
- addReplyProto(c,"\r\n",2);
- }
-
- void addReplyStatus(client *c, const char *status) {
- addReplyStatusLength(c,status,strlen(status));
- }
-
- void addReplyStatusFormat(client *c, const char *fmt, ...) {
- va_list ap;
- va_start(ap,fmt);
- sds s = sdscatvprintf(sdsempty(),fmt,ap);
- va_end(ap);
- addReplyStatusLength(c,s,sdslen(s));
- sdsfree(s);
- }
-
- /* Sometimes we are forced to create a new reply node, and we can't append to
- * the previous one, when that happens, we wanna try to trim the unused space
- * at the end of the last reply node which we won't use anymore. */
- void trimReplyUnusedTailSpace(client *c) {
- listNode *ln = listLast(c->reply);
- clientReplyBlock *tail = ln? listNodeValue(ln): NULL;
-
- /* Note that 'tail' may be NULL even if we have a tail node, because when
- * addReplyDeferredLen() is used */
- if (!tail) return;
-
- /* We only try to trim the space is relatively high (more than a 1/4 of the
- * allocation), otherwise there's a high chance realloc will NOP.
- * Also, to avoid large memmove which happens as part of realloc, we only do
- * that if the used part is small. */
- if (tail->size - tail->used > tail->size / 4 &&
- tail->used < PROTO_REPLY_CHUNK_BYTES)
- {
- size_t old_size = tail->size;
- tail = zrealloc(tail, tail->used + sizeof(clientReplyBlock));
- /* take over the allocation's internal fragmentation (at least for
- * memory usage tracking) */
- tail->size = zmalloc_usable_size(tail) - sizeof(clientReplyBlock);
- c->reply_bytes = c->reply_bytes + tail->size - old_size;
- listNodeValue(ln) = tail;
- }
- }
-
- /* Adds an empty object to the reply list that will contain the multi bulk
- * length, which is not known when this function is called. */
- void *addReplyDeferredLen(client *c) {
- /* Note that we install the write event here even if the object is not
- * ready to be sent, since we are sure that before returning to the
- * event loop setDeferredAggregateLen() will be called. */
- if (prepareClientToWrite(c) != C_OK) return NULL;
- trimReplyUnusedTailSpace(c);
- listAddNodeTail(c->reply,NULL); /* NULL is our placeholder. */
- return listLast(c->reply);
- }
-
- void setDeferredReply(client *c, void *node, const char *s, size_t length) {
- listNode *ln = (listNode*)node;
- clientReplyBlock *next, *prev;
-
- /* Abort when *node is NULL: when the client should not accept writes
- * we return NULL in addReplyDeferredLen() */
- if (node == NULL) return;
- serverAssert(!listNodeValue(ln));
-
- /* Normally we fill this dummy NULL node, added by addReplyDeferredLen(),
- * with a new buffer structure containing the protocol needed to specify
- * the length of the array following. However sometimes there might be room
- * in the previous/next node so we can instead remove this NULL node, and
- * suffix/prefix our data in the node immediately before/after it, in order
- * to save a write(2) syscall later. Conditions needed to do it:
- *
- * - The prev node is non-NULL and has space in it or
- * - The next node is non-NULL,
- * - It has enough room already allocated
- * - And not too large (avoid large memmove) */
- if (ln->prev != NULL && (prev = listNodeValue(ln->prev)) &&
- prev->size - prev->used > 0)
- {
- size_t len_to_copy = prev->size - prev->used;
- if (len_to_copy > length)
- len_to_copy = length;
- memcpy(prev->buf + prev->used, s, len_to_copy);
- prev->used += len_to_copy;
- length -= len_to_copy;
- if (length == 0) {
- listDelNode(c->reply, ln);
- return;
- }
- s += len_to_copy;
- }
-
- if (ln->next != NULL && (next = listNodeValue(ln->next)) &&
- next->size - next->used >= length &&
- next->used < PROTO_REPLY_CHUNK_BYTES * 4)
- {
- memmove(next->buf + length, next->buf, next->used);
- memcpy(next->buf, s, length);
- next->used += length;
- listDelNode(c->reply,ln);
- } else {
- /* Create a new node */
- clientReplyBlock *buf = zmalloc(length + sizeof(clientReplyBlock));
- /* Take over the allocation's internal fragmentation */
- buf->size = zmalloc_usable_size(buf) - sizeof(clientReplyBlock);
- buf->used = length;
- memcpy(buf->buf, s, length);
- listNodeValue(ln) = buf;
- c->reply_bytes += buf->size;
-
- closeClientOnOutputBufferLimitReached(c, 1);
- }
- }
-
- /* Populate the length object and try gluing it to the next chunk. */
- void setDeferredAggregateLen(client *c, void *node, long length, char prefix) {
- serverAssert(length >= 0);
-
- /* Abort when *node is NULL: when the client should not accept writes
- * we return NULL in addReplyDeferredLen() */
- if (node == NULL) return;
-
- char lenstr[128];
- size_t lenstr_len = sprintf(lenstr, "%c%ld\r\n", prefix, length);
- setDeferredReply(c, node, lenstr, lenstr_len);
- }
-
- void setDeferredArrayLen(client *c, void *node, long length) {
- setDeferredAggregateLen(c,node,length,'*');
- }
-
- void setDeferredMapLen(client *c, void *node, long length) {
- int prefix = c->resp == 2 ? '*' : '%';
- if (c->resp == 2) length *= 2;
- setDeferredAggregateLen(c,node,length,prefix);
- }
-
- void setDeferredSetLen(client *c, void *node, long length) {
- int prefix = c->resp == 2 ? '*' : '~';
- setDeferredAggregateLen(c,node,length,prefix);
- }
-
- void setDeferredAttributeLen(client *c, void *node, long length) {
- serverAssert(c->resp >= 3);
- setDeferredAggregateLen(c,node,length,'|');
- }
-
- void setDeferredPushLen(client *c, void *node, long length) {
- serverAssert(c->resp >= 3);
- setDeferredAggregateLen(c,node,length,'>');
- }
-
- /* Add a double as a bulk reply */
- void addReplyDouble(client *c, double d) {
- if (isinf(d)) {
- /* Libc in odd systems (Hi Solaris!) will format infinite in a
- * different way, so better to handle it in an explicit way. */
- if (c->resp == 2) {
- addReplyBulkCString(c, d > 0 ? "inf" : "-inf");
- } else {
- addReplyProto(c, d > 0 ? ",inf\r\n" : ",-inf\r\n",
- d > 0 ? 6 : 7);
- }
- } else {
- char dbuf[MAX_LONG_DOUBLE_CHARS+3],
- sbuf[MAX_LONG_DOUBLE_CHARS+32];
- int dlen, slen;
- if (c->resp == 2) {
- dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d);
- slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf);
- addReplyProto(c,sbuf,slen);
- } else {
- dlen = snprintf(dbuf,sizeof(dbuf),",%.17g\r\n",d);
- addReplyProto(c,dbuf,dlen);
- }
- }
- }
-
- void addReplyBigNum(client *c, const char* num, size_t len) {
- if (c->resp == 2) {
- addReplyBulkCBuffer(c, num, len);
- } else {
- addReplyProto(c,"(",1);
- addReplyProto(c,num,len);
- addReply(c,shared.crlf);
- }
- }
-
- /* Add a long double as a bulk reply, but uses a human readable formatting
- * of the double instead of exposing the crude behavior of doubles to the
- * dear user. */
- void addReplyHumanLongDouble(client *c, long double d) {
- if (c->resp == 2) {
- robj *o = createStringObjectFromLongDouble(d,1);
- addReplyBulk(c,o);
- decrRefCount(o);
- } else {
- char buf[MAX_LONG_DOUBLE_CHARS];
- int len = ld2string(buf,sizeof(buf),d,LD_STR_HUMAN);
- addReplyProto(c,",",1);
- addReplyProto(c,buf,len);
- addReplyProto(c,"\r\n",2);
- }
- }