关键词搜索

源码搜索 ×
×

漫话Redis源码之八

发布2021-11-21浏览548次

详情内容

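下面代码中有一行的判断条件,我没太看明白为什么要这样写:`group_inserted == 1 && consumer_inserted == 0`。一种可能的理解是:同一个 NACK 会先后被插入消费组的 PEL 和当前消费者的 PEL;如果在消费组的 PEL 中插入成功(说明这个 ID 之前不在组的 PEL 里),却在消费者的 PEL 中插入失败(说明这个 ID 已经存在于该消费者的 PEL 里),就意味着两个 PEL 的内容不一致,NACK 只"创建了一半",所以代码直接调用 serverPanic。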

有看懂的欢迎交流。

  1. if (!(flags & STREAM_RWR_RAWENTRIES))
  2. arraylen_ptr = addReplyDeferredLen(c);
  3. streamIteratorStart(&si,s,start,end,rev);
  4. while(streamIteratorGetID(&si,&id,&numfields)) {
  5. /* Update the group last_id if needed. */
  6. if (group && streamCompareID(&id,&group->last_id) > 0) {
  7. group->last_id = id;
  8. /* Group last ID should be propagated only if NOACK was
  9. * specified, otherwise the last id will be included
  10. * in the propagation of XCLAIM itself. */
  11. if (noack) propagate_last_id = 1;
  12. }
  13. /* Emit a two elements array for each item. The first is
  14. * the ID, the second is an array of field-value pairs. */
  15. addReplyArrayLen(c,2);
  16. addReplyStreamID(c,&id);
  17. addReplyArrayLen(c,numfields*2);
  18. /* Emit the field-value pairs. */
  19. while(numfields--) {
  20. unsigned char *key, *value;
  21. int64_t key_len, value_len;
  22. streamIteratorGetField(&si,&key,&value,&key_len,&value_len);
  23. addReplyBulkCBuffer(c,key,key_len);
  24. addReplyBulkCBuffer(c,value,value_len);
  25. }
  26. /* If a group is passed, we need to create an entry in the
  27. * PEL (pending entries list) of this group *and* this consumer.
  28. *
  29. * Note that we cannot be sure about the fact the message is not
  30. * already owned by another consumer, because the admin is able
  31. * to change the consumer group last delivered ID using the
  32. * XGROUP SETID command. So if we find that there is already
  33. * a NACK for the entry, we need to associate it to the new
  34. * consumer. */
  35. if (group && !noack) {
  36. unsigned char buf[sizeof(streamID)];
  37. streamEncodeID(buf,&id);
  38. /* Try to add a new NACK. Most of the time this will work and
  39. * will not require extra lookups. We'll fix the problem later
  40. * if we find that there is already a entry for this ID. */
  41. streamNACK *nack = streamCreateNACK(consumer);
  42. int group_inserted =
  43. raxTryInsert(group->pel,buf,sizeof(buf),nack,NULL);
  44. int consumer_inserted =
  45. raxTryInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
  46. /* Now we can check if the entry was already busy, and
  47. * in that case reassign the entry to the new consumer,
  48. * or update it if the consumer is the same as before. */
  49. if (group_inserted == 0) {
  50. streamFreeNACK(nack);
  51. nack = raxFind(group->pel,buf,sizeof(buf));
  52. serverAssert(nack != raxNotFound);
  53. raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
  54. /* Update the consumer and NACK metadata. */
  55. nack->consumer = consumer;
  56. nack->delivery_time = mstime();
  57. nack->delivery_count = 1;
  58. /* Add the entry in the new consumer local PEL. */
  59. raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
  60. } else if (group_inserted == 1 && consumer_inserted == 0) {
  61. serverPanic("NACK half-created. Should not be possible.");
  62. }
  63. /* Propagate as XCLAIM. */
  64. if (spi) {
  65. robj *idarg = createObjectFromStreamID(&id);
  66. streamPropagateXCLAIM(c,spi->keyname,group,spi->groupname,idarg,nack);
  67. decrRefCount(idarg);
  68. }
  69. }
  70. arraylen++;
  71. if (count && count == arraylen) break;
  72. }
  73. if (spi && propagate_last_id)
  74. streamPropagateGroupID(c,spi->keyname,group,spi->groupname);
  75. streamIteratorStop(&si);
  76. if (arraylen_ptr) setDeferredArrayLen(c,arraylen_ptr,arraylen);
  77. return arraylen;
  78. }

相关技术文章

点击QQ咨询
开通会员
返回顶部
×
微信扫码支付
微信扫码支付
确定支付下载
请使用微信扫描二维码支付
×

提示信息

×

选择支付方式

  • 微信支付
  • 支付宝付款
确定支付下载