关键词搜索

源码搜索 ×
×

漫话Redis源码之四十一

发布2022-01-02浏览462次

详情内容

什么时候调用这些函数呢?这个问题值得思考。其实,每次向客户端传输数据时,都会调用它们。

  1. /* This function is called every time we are going to transmit new data
  2. * to the client. The behavior is the following:
  3. *
  4. * If the client should receive new data (normal clients will) the function
  5. * returns C_OK, and make sure to install the write handler in our event
  6. * loop so that when the socket is writable new data gets written.
  7. *
  8. * If the client should not receive new data, because it is a fake client
  9. * (used to load AOF in memory), a master or because the setup of the write
  10. * handler failed, the function returns C_ERR.
  11. *
  12. * The function may return C_OK without actually installing the write
  13. * event handler in the following cases:
  14. *
  15. * 1) The event handler should already be installed since the output buffer
  16. * already contains something.
  17. * 2) The client is a slave but not yet online, so we want to just accumulate
  18. * writes in the buffer but not actually sending them yet.
  19. *
  20. * Typically gets called every time a reply is built, before adding more
  21. * data to the clients output buffers. If the function returns C_ERR no
  22. * data should be appended to the output buffers. */
  23. int prepareClientToWrite(client *c) {
  24. /* If it's the Lua client we always return ok without installing any
  25. * handler since there is no socket at all. */
  26. if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK;
  27. /* If CLIENT_CLOSE_ASAP flag is set, we need not write anything. */
  28. if (c->flags & CLIENT_CLOSE_ASAP) return C_ERR;
  29. /* CLIENT REPLY OFF / SKIP handling: don't send replies. */
  30. if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR;
  31. /* Masters don't receive replies, unless CLIENT_MASTER_FORCE_REPLY flag
  32. * is set. */
  33. if ((c->flags & CLIENT_MASTER) &&
  34. !(c->flags & CLIENT_MASTER_FORCE_REPLY)) return C_ERR;
  35. if (!c->conn) return C_ERR; /* Fake client for AOF loading. */
  36. /* Schedule the client to write the output buffers to the socket, unless
  37. * it should already be setup to do so (it has already pending data).
  38. *
  39. * If CLIENT_PENDING_READ is set, we're in an IO thread and should
  40. * not install a write handler. Instead, it will be done by
  41. * handleClientsWithPendingReadsUsingThreads() upon return.
  42. */
  43. if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_READ))
  44. clientInstallWriteHandler(c);
  45. /* Authorize the caller to queue in the output buffer of this client. */
  46. return C_OK;
  47. }
  48. /* -----------------------------------------------------------------------------
  49. * Low level functions to add more data to output buffers.
  50. * -------------------------------------------------------------------------- */
  51. /* Attempts to add the reply to the static buffer in the client struct.
  52. * Returns C_ERR if the buffer is full, or the reply list is not empty,
  53. * in which case the reply must be added to the reply list. */
  54. int _addReplyToBuffer(client *c, const char *s, size_t len) {
  55. size_t available = sizeof(c->buf)-c->bufpos;
  56. if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return C_OK;
  57. /* If there already are entries in the reply list, we cannot
  58. * add anything more to the static buffer. */
  59. if (listLength(c->reply) > 0) return C_ERR;
  60. /* Check that the buffer has enough space available for this string. */
  61. if (len > available) return C_ERR;
  62. memcpy(c->buf+c->bufpos,s,len);
  63. c->bufpos+=len;
  64. return C_OK;
  65. }
  66. /* Adds the reply to the reply linked list.
  67. * Note: some edits to this function need to be relayed to AddReplyFromClient. */
  68. void _addReplyProtoToList(client *c, const char *s, size_t len) {
  69. if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return;
  70. listNode *ln = listLast(c->reply);
  71. clientReplyBlock *tail = ln? listNodeValue(ln): NULL;
  72. /* Note that 'tail' may be NULL even if we have a tail node, because when
  73. * addReplyDeferredLen() is used, it sets a dummy node to NULL just
  74. * fo fill it later, when the size of the bulk length is set. */
  75. /* Append to tail string when possible. */
  76. if (tail) {
  77. /* Copy the part we can fit into the tail, and leave the rest for a
  78. * new node */
  79. size_t avail = tail->size - tail->used;
  80. size_t copy = avail >= len? len: avail;
  81. memcpy(tail->buf + tail->used, s, copy);
  82. tail->used += copy;
  83. s += copy;
  84. len -= copy;
  85. }
  86. if (len) {
  87. /* Create a new node, make sure it is allocated to at
  88. * least PROTO_REPLY_CHUNK_BYTES */
  89. size_t size = len < PROTO_REPLY_CHUNK_BYTES? PROTO_REPLY_CHUNK_BYTES: len;
  90. tail = zmalloc(size + sizeof(clientReplyBlock));
  91. /* take over the allocation's internal fragmentation */
  92. tail->size = zmalloc_usable_size(tail) - sizeof(clientReplyBlock);
  93. tail->used = len;
  94. memcpy(tail->buf, s, len);
  95. listAddNodeTail(c->reply, tail);
  96. c->reply_bytes += tail->size;
  97. closeClientOnOutputBufferLimitReached(c, 1);
  98. }
  99. }
  100. /* -----------------------------------------------------------------------------
  101. * Higher level functions to queue data on the client output buffer.
  102. * The following functions are the ones that commands implementations will call.
  103. * -------------------------------------------------------------------------- */
  104. /* Add the object 'obj' string representation to the client output buffer. */
  105. void addReply(client *c, robj *obj) {
  106. if (prepareClientToWrite(c) != C_OK) return;
  107. if (sdsEncodedObject(obj)) {
  108. if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
  109. _addReplyProtoToList(c,obj->ptr,sdslen(obj->ptr));
  110. } else if (obj->encoding == OBJ_ENCODING_INT) {
  111. /* For integer encoded strings we just convert it into a string
  112. * using our optimized function, and attach the resulting string
  113. * to the output buffer. */
  114. char buf[32];
  115. size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
  116. if (_addReplyToBuffer(c,buf,len) != C_OK)
  117. _addReplyProtoToList(c,buf,len);
  118. } else {
  119. serverPanic("Wrong obj->encoding in addReply()");
  120. }
  121. }
  122. /* Add the SDS 's' string to the client output buffer, as a side effect
  123. * the SDS string is freed. */
  124. void addReplySds(client *c, sds s) {
  125. if (prepareClientToWrite(c) != C_OK) {
  126. /* The caller expects the sds to be free'd. */
  127. sdsfree(s);
  128. return;
  129. }
  130. if (_addReplyToBuffer(c,s,sdslen(s)) != C_OK)
  131. _addReplyProtoToList(c,s,sdslen(s));
  132. sdsfree(s);
  133. }
  134. /* This low level function just adds whatever protocol you send it to the
  135. * client buffer, trying the static buffer initially, and using the string
  136. * of objects if not possible.
  137. *
  138. * It is efficient because does not create an SDS object nor an Redis object
  139. * if not needed. The object will only be created by calling
  140. * _addReplyProtoToList() if we fail to extend the existing tail object
  141. * in the list of objects. */
  142. void addReplyProto(client *c, const char *s, size_t len) {
  143. if (prepareClientToWrite(c) != C_OK) return;
  144. if (_addReplyToBuffer(c,s,len) != C_OK)
  145. _addReplyProtoToList(c,s,len);
  146. }
  147. /* Low level function called by the addReplyError...() functions.
  148. * It emits the protocol for a Redis error, in the form:
  149. *
  150. * -ERRORCODE Error Message<CR><LF>
  151. *
  152. * If the error code is already passed in the string 's', the error
  153. * code provided is used, otherwise the string "-ERR " for the generic
  154. * error code is automatically added.
  155. * Note that 's' must NOT end with \r\n. */
  156. void addReplyErrorLength(client *c, const char *s, size_t len) {
  157. /* If the string already starts with "-..." then the error code
  158. * is provided by the caller. Otherwise we use "-ERR". */
  159. if (!len || s[0] != '-') addReplyProto(c,"-ERR ",5);
  160. addReplyProto(c,s,len);
  161. addReplyProto(c,"\r\n",2);
  162. }
  163. /* Do some actions after an error reply was sent (Log if needed, updates stats, etc.) */
  164. void afterErrorReply(client *c, const char *s, size_t len) {
  165. /* Increment the global error counter */
  166. server.stat_total_error_replies++;
  167. /* Increment the error stats
  168. * If the string already starts with "-..." then the error prefix
  169. * is provided by the caller ( we limit the search to 32 chars). Otherwise we use "-ERR". */
  170. if (s[0] != '-') {
  171. incrementErrorCount("ERR", 3);
  172. } else {
  173. char *spaceloc = memchr(s, ' ', len < 32 ? len : 32);
  174. if (spaceloc) {
  175. const size_t errEndPos = (size_t)(spaceloc - s);
  176. incrementErrorCount(s+1, errEndPos-1);
  177. } else {
  178. /* Fallback to ERR if we can't retrieve the error prefix */
  179. incrementErrorCount("ERR", 3);
  180. }
  181. }
  182. /* Sometimes it could be normal that a slave replies to a master with
  183. * an error and this function gets called. Actually the error will never
  184. * be sent because addReply*() against master clients has no effect...
  185. * A notable example is:
  186. *
  187. * EVAL 'redis.call("incr",KEYS[1]); redis.call("nonexisting")' 1 x
  188. *
  189. * Where the master must propagate the first change even if the second
  190. * will produce an error. However it is useful to log such events since
  191. * they are rare and may hint at errors in a script or a bug in Redis. */
  192. int ctype = getClientType(c);
  193. if (ctype == CLIENT_TYPE_MASTER || ctype == CLIENT_TYPE_SLAVE || c->id == CLIENT_ID_AOF) {
  194. char *to, *from;
  195. if (c->id == CLIENT_ID_AOF) {
  196. to = "AOF-loading-client";
  197. from = "server";
  198. } else if (ctype == CLIENT_TYPE_MASTER) {
  199. to = "master";
  200. from = "replica";
  201. } else {
  202. to = "replica";
  203. from = "master";
  204. }
  205. if (len > 4096) len = 4096;
  206. char *cmdname = c->lastcmd ? c->lastcmd->name : "<unknown>";
  207. serverLog(LL_WARNING,"== CRITICAL == This %s is sending an error "
  208. "to its %s: '%.*s' after processing the command "
  209. "'%s'", from, to, (int)len, s, cmdname);
  210. if (ctype == CLIENT_TYPE_MASTER && server.repl_backlog &&
  211. server.repl_backlog_histlen > 0)
  212. {
  213. showLatestBacklog();
  214. }
  215. server.stat_unexpected_error_replies++;
  216. }
  217. }
  218. /* The 'err' object is expected to start with -ERRORCODE and end with \r\n.
  219. * Unlike addReplyErrorSds and others alike which rely on addReplyErrorLength. */
  220. void addReplyErrorObject(client *c, robj *err) {
  221. addReply(c, err);
  222. afterErrorReply(c, err->ptr, sdslen(err->ptr)-2); /* Ignore trailing \r\n */
  223. }
  224. /* See addReplyErrorLength for expectations from the input string. */
  225. void addReplyError(client *c, const char *err) {
  226. addReplyErrorLength(c,err,strlen(err));
  227. afterErrorReply(c,err,strlen(err));
  228. }
  229. /* See addReplyErrorLength for expectations from the input string. */
  230. /* As a side effect the SDS string is freed. */
  231. void addReplyErrorSds(client *c, sds err) {
  232. addReplyErrorLength(c,err,sdslen(err));
  233. afterErrorReply(c,err,sdslen(err));
  234. sdsfree(err);
  235. }
  236. /* See addReplyErrorLength for expectations from the formatted string.
  237. * The formatted string is safe to contain \r and \n anywhere. */
  238. void addReplyErrorFormat(client *c, const char *fmt, ...) {
  239. va_list ap;
  240. va_start(ap,fmt);
  241. sds s = sdscatvprintf(sdsempty(),fmt,ap);
  242. va_end(ap);
  243. /* Trim any newlines at the end (ones will be added by addReplyErrorLength) */
  244. s = sdstrim(s, "\r\n");
  245. /* Make sure there are no newlines in the middle of the string, otherwise
  246. * invalid protocol is emitted. */
  247. s = sdsmapchars(s, "\r\n", " ", 2);
  248. addReplyErrorLength(c,s,sdslen(s));
  249. afterErrorReply(c,s,sdslen(s));
  250. sdsfree(s);
  251. }
  252. void addReplyStatusLength(client *c, const char *s, size_t len) {
  253. addReplyProto(c,"+",1);
  254. addReplyProto(c,s,len);
  255. addReplyProto(c,"\r\n",2);
  256. }
  257. void addReplyStatus(client *c, const char *status) {
  258. addReplyStatusLength(c,status,strlen(status));
  259. }
  260. void addReplyStatusFormat(client *c, const char *fmt, ...) {
  261. va_list ap;
  262. va_start(ap,fmt);
  263. sds s = sdscatvprintf(sdsempty(),fmt,ap);
  264. va_end(ap);
  265. addReplyStatusLength(c,s,sdslen(s));
  266. sdsfree(s);
  267. }
  268. /* Sometimes we are forced to create a new reply node, and we can't append to
  269. * the previous one, when that happens, we wanna try to trim the unused space
  270. * at the end of the last reply node which we won't use anymore. */
  271. void trimReplyUnusedTailSpace(client *c) {
  272. listNode *ln = listLast(c->reply);
  273. clientReplyBlock *tail = ln? listNodeValue(ln): NULL;
  274. /* Note that 'tail' may be NULL even if we have a tail node, because when
  275. * addReplyDeferredLen() is used */
  276. if (!tail) return;
  277. /* We only try to trim the space is relatively high (more than a 1/4 of the
  278. * allocation), otherwise there's a high chance realloc will NOP.
  279. * Also, to avoid large memmove which happens as part of realloc, we only do
  280. * that if the used part is small. */
  281. if (tail->size - tail->used > tail->size / 4 &&
  282. tail->used < PROTO_REPLY_CHUNK_BYTES)
  283. {
  284. size_t old_size = tail->size;
  285. tail = zrealloc(tail, tail->used + sizeof(clientReplyBlock));
  286. /* take over the allocation's internal fragmentation (at least for
  287. * memory usage tracking) */
  288. tail->size = zmalloc_usable_size(tail) - sizeof(clientReplyBlock);
  289. c->reply_bytes = c->reply_bytes + tail->size - old_size;
  290. listNodeValue(ln) = tail;
  291. }
  292. }
  293. /* Adds an empty object to the reply list that will contain the multi bulk
  294. * length, which is not known when this function is called. */
  295. void *addReplyDeferredLen(client *c) {
  296. /* Note that we install the write event here even if the object is not
  297. * ready to be sent, since we are sure that before returning to the
  298. * event loop setDeferredAggregateLen() will be called. */
  299. if (prepareClientToWrite(c) != C_OK) return NULL;
  300. trimReplyUnusedTailSpace(c);
  301. listAddNodeTail(c->reply,NULL); /* NULL is our placeholder. */
  302. return listLast(c->reply);
  303. }
  304. void setDeferredReply(client *c, void *node, const char *s, size_t length) {
  305. listNode *ln = (listNode*)node;
  306. clientReplyBlock *next, *prev;
  307. /* Abort when *node is NULL: when the client should not accept writes
  308. * we return NULL in addReplyDeferredLen() */
  309. if (node == NULL) return;
  310. serverAssert(!listNodeValue(ln));
  311. /* Normally we fill this dummy NULL node, added by addReplyDeferredLen(),
  312. * with a new buffer structure containing the protocol needed to specify
  313. * the length of the array following. However sometimes there might be room
  314. * in the previous/next node so we can instead remove this NULL node, and
  315. * suffix/prefix our data in the node immediately before/after it, in order
  316. * to save a write(2) syscall later. Conditions needed to do it:
  317. *
  318. * - The prev node is non-NULL and has space in it or
  319. * - The next node is non-NULL,
  320. * - It has enough room already allocated
  321. * - And not too large (avoid large memmove) */
  322. if (ln->prev != NULL && (prev = listNodeValue(ln->prev)) &&
  323. prev->size - prev->used > 0)
  324. {
  325. size_t len_to_copy = prev->size - prev->used;
  326. if (len_to_copy > length)
  327. len_to_copy = length;
  328. memcpy(prev->buf + prev->used, s, len_to_copy);
  329. prev->used += len_to_copy;
  330. length -= len_to_copy;
  331. if (length == 0) {
  332. listDelNode(c->reply, ln);
  333. return;
  334. }
  335. s += len_to_copy;
  336. }
  337. if (ln->next != NULL && (next = listNodeValue(ln->next)) &&
  338. next->size - next->used >= length &&
  339. next->used < PROTO_REPLY_CHUNK_BYTES * 4)
  340. {
  341. memmove(next->buf + length, next->buf, next->used);
  342. memcpy(next->buf, s, length);
  343. next->used += length;
  344. listDelNode(c->reply,ln);
  345. } else {
  346. /* Create a new node */
  347. clientReplyBlock *buf = zmalloc(length + sizeof(clientReplyBlock));
  348. /* Take over the allocation's internal fragmentation */
  349. buf->size = zmalloc_usable_size(buf) - sizeof(clientReplyBlock);
  350. buf->used = length;
  351. memcpy(buf->buf, s, length);
  352. listNodeValue(ln) = buf;
  353. c->reply_bytes += buf->size;
  354. closeClientOnOutputBufferLimitReached(c, 1);
  355. }
  356. }
  357. /* Populate the length object and try gluing it to the next chunk. */
  358. void setDeferredAggregateLen(client *c, void *node, long length, char prefix) {
  359. serverAssert(length >= 0);
  360. /* Abort when *node is NULL: when the client should not accept writes
  361. * we return NULL in addReplyDeferredLen() */
  362. if (node == NULL) return;
  363. char lenstr[128];
  364. size_t lenstr_len = sprintf(lenstr, "%c%ld\r\n", prefix, length);
  365. setDeferredReply(c, node, lenstr, lenstr_len);
  366. }
  367. void setDeferredArrayLen(client *c, void *node, long length) {
  368. setDeferredAggregateLen(c,node,length,'*');
  369. }
  370. void setDeferredMapLen(client *c, void *node, long length) {
  371. int prefix = c->resp == 2 ? '*' : '%';
  372. if (c->resp == 2) length *= 2;
  373. setDeferredAggregateLen(c,node,length,prefix);
  374. }
  375. void setDeferredSetLen(client *c, void *node, long length) {
  376. int prefix = c->resp == 2 ? '*' : '~';
  377. setDeferredAggregateLen(c,node,length,prefix);
  378. }
  379. void setDeferredAttributeLen(client *c, void *node, long length) {
  380. serverAssert(c->resp >= 3);
  381. setDeferredAggregateLen(c,node,length,'|');
  382. }
  383. void setDeferredPushLen(client *c, void *node, long length) {
  384. serverAssert(c->resp >= 3);
  385. setDeferredAggregateLen(c,node,length,'>');
  386. }
  387. /* Add a double as a bulk reply */
  388. void addReplyDouble(client *c, double d) {
  389. if (isinf(d)) {
  390. /* Libc in odd systems (Hi Solaris!) will format infinite in a
  391. * different way, so better to handle it in an explicit way. */
  392. if (c->resp == 2) {
  393. addReplyBulkCString(c, d > 0 ? "inf" : "-inf");
  394. } else {
  395. addReplyProto(c, d > 0 ? ",inf\r\n" : ",-inf\r\n",
  396. d > 0 ? 6 : 7);
  397. }
  398. } else {
  399. char dbuf[MAX_LONG_DOUBLE_CHARS+3],
  400. sbuf[MAX_LONG_DOUBLE_CHARS+32];
  401. int dlen, slen;
  402. if (c->resp == 2) {
  403. dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d);
  404. slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf);
  405. addReplyProto(c,sbuf,slen);
  406. } else {
  407. dlen = snprintf(dbuf,sizeof(dbuf),",%.17g\r\n",d);
  408. addReplyProto(c,dbuf,dlen);
  409. }
  410. }
  411. }
  412. void addReplyBigNum(client *c, const char* num, size_t len) {
  413. if (c->resp == 2) {
  414. addReplyBulkCBuffer(c, num, len);
  415. } else {
  416. addReplyProto(c,"(",1);
  417. addReplyProto(c,num,len);
  418. addReply(c,shared.crlf);
  419. }
  420. }
  421. /* Add a long double as a bulk reply, but uses a human readable formatting
  422. * of the double instead of exposing the crude behavior of doubles to the
  423. * dear user. */
  424. void addReplyHumanLongDouble(client *c, long double d) {
  425. if (c->resp == 2) {
  426. robj *o = createStringObjectFromLongDouble(d,1);
  427. addReplyBulk(c,o);
  428. decrRefCount(o);
  429. } else {
  430. char buf[MAX_LONG_DOUBLE_CHARS];
  431. int len = ld2string(buf,sizeof(buf),d,LD_STR_HUMAN);
  432. addReplyProto(c,",",1);
  433. addReplyProto(c,buf,len);
  434. addReplyProto(c,"\r\n",2);
  435. }
  436. }

相关技术文章

点击QQ咨询
开通会员
返回顶部
×
微信扫码支付
微信扫码支付
确定支付下载
请使用微信扫描二维码支付
×

提示信息

×

选择支付方式

  • 微信支付
  • 支付宝付款
确定支付下载