关键词搜索

源码搜索 ×
×

漫话Redis源码之八十三

发布2022-02-13浏览1107次

详情内容

触发并掩码掉给定的fd:

  1. static int aeApiLookupPending(aeApiState *state, int fd) {
  2. uint_t i;
  3. for (i = 0; i < state->npending; i++) {
  4. if (state->pending_fds[i] == fd)
  5. return (i);
  6. }
  7. return (-1);
  8. }
  9. /*
  10. * Helper function to invoke port_associate for the given fd and mask.
  11. */
  12. static int aeApiAssociate(const char *where, int portfd, int fd, int mask) {
  13. int events = 0;
  14. int rv, err;
  15. if (mask & AE_READABLE)
  16. events |= POLLIN;
  17. if (mask & AE_WRITABLE)
  18. events |= POLLOUT;
  19. if (evport_debug)
  20. fprintf(stderr, "%s: port_associate(%d, 0x%x) = ", where, fd, events);
  21. rv = port_associate(portfd, PORT_SOURCE_FD, fd, events,
  22. (void *)(uintptr_t)mask);
  23. err = errno;
  24. if (evport_debug)
  25. fprintf(stderr, "%d (%s)\n", rv, rv == 0 ? "no error" : strerror(err));
  26. if (rv == -1) {
  27. fprintf(stderr, "%s: port_associate: %s\n", where, strerror(err));
  28. if (err == EAGAIN)
  29. fprintf(stderr, "aeApiAssociate: event port limit exceeded.");
  30. }
  31. return rv;
  32. }
  33. static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
  34. aeApiState *state = eventLoop->apidata;
  35. int fullmask, pfd;
  36. if (evport_debug)
  37. fprintf(stderr, "aeApiAddEvent: fd %d mask 0x%x\n", fd, mask);
  38. /*
  39. * Since port_associate's "events" argument replaces any existing events, we
  40. * must be sure to include whatever events are already associated when
  41. * we call port_associate() again.
  42. */
  43. fullmask = mask | eventLoop->events[fd].mask;
  44. pfd = aeApiLookupPending(state, fd);
  45. if (pfd != -1) {
  46. /*
  47. * This fd was recently returned from aeApiPoll. It should be safe to
  48. * assume that the consumer has processed that poll event, but we play
  49. * it safer by simply updating pending_mask. The fd will be
  50. * re-associated as usual when aeApiPoll is called again.
  51. */
  52. if (evport_debug)
  53. fprintf(stderr, "aeApiAddEvent: adding to pending fd %d\n", fd);
  54. state->pending_masks[pfd] |= fullmask;
  55. return 0;
  56. }
  57. return (aeApiAssociate("aeApiAddEvent", state->portfd, fd, fullmask));
  58. }
  59. static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) {
  60. aeApiState *state = eventLoop->apidata;
  61. int fullmask, pfd;
  62. if (evport_debug)
  63. fprintf(stderr, "del fd %d mask 0x%x\n", fd, mask);
  64. pfd = aeApiLookupPending(state, fd);
  65. if (pfd != -1) {
  66. if (evport_debug)
  67. fprintf(stderr, "deleting event from pending fd %d\n", fd);
  68. /*
  69. * This fd was just returned from aeApiPoll, so it's not currently
  70. * associated with the port. All we need to do is update
  71. * pending_mask appropriately.
  72. */
  73. state->pending_masks[pfd] &= ~mask;
  74. if (state->pending_masks[pfd] == AE_NONE)
  75. state->pending_fds[pfd] = -1;
  76. return;
  77. }
  78. /*
  79. * The fd is currently associated with the port. Like with the add case
  80. * above, we must look at the full mask for the file descriptor before
  81. * updating that association. We don't have a good way of knowing what the
  82. * events are without looking into the eventLoop state directly. We rely on
  83. * the fact that our caller has already updated the mask in the eventLoop.
  84. */
  85. fullmask = eventLoop->events[fd].mask;
  86. if (fullmask == AE_NONE) {
  87. /*
  88. * We're removing *all* events, so use port_dissociate to remove the
  89. * association completely. Failure here indicates a bug.
  90. */
  91. if (evport_debug)
  92. fprintf(stderr, "aeApiDelEvent: port_dissociate(%d)\n", fd);
  93. if (port_dissociate(state->portfd, PORT_SOURCE_FD, fd) != 0) {
  94. perror("aeApiDelEvent: port_dissociate");
  95. abort(); /* will not return */
  96. }
  97. } else if (aeApiAssociate("aeApiDelEvent", state->portfd, fd,
  98. fullmask) != 0) {
  99. /*
  100. * ENOMEM is a potentially transient condition, but the kernel won't
  101. * generally return it unless things are really bad. EAGAIN indicates
  102. * we've reached a resource limit, for which it doesn't make sense to
  103. * retry (counter-intuitively). All other errors indicate a bug. In any
  104. * of these cases, the best we can do is to abort.
  105. */
  106. abort(); /* will not return */
  107. }
  108. }
  109. static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
  110. aeApiState *state = eventLoop->apidata;
  111. struct timespec timeout, *tsp;
  112. uint_t mask, i;
  113. uint_t nevents;
  114. port_event_t event[MAX_EVENT_BATCHSZ];
  115. /*
  116. * If we've returned fd events before, we must re-associate them with the
  117. * port now, before calling port_get(). See the block comment at the top of
  118. * this file for an explanation of why.
  119. */
  120. for (i = 0; i < state->npending; i++) {
  121. if (state->pending_fds[i] == -1)
  122. /* This fd has since been deleted. */
  123. continue;
  124. if (aeApiAssociate("aeApiPoll", state->portfd,
  125. state->pending_fds[i], state->pending_masks[i]) != 0) {
  126. /* See aeApiDelEvent for why this case is fatal. */
  127. abort();
  128. }
  129. state->pending_masks[i] = AE_NONE;
  130. state->pending_fds[i] = -1;
  131. }
  132. state->npending = 0;
  133. if (tvp != NULL) {
  134. timeout.tv_sec = tvp->tv_sec;
  135. timeout.tv_nsec = tvp->tv_usec * 1000;
  136. tsp = &timeout;
  137. } else {
  138. tsp = NULL;
  139. }
  140. /*
  141. * port_getn can return with errno == ETIME having returned some events (!).
  142. * So if we get ETIME, we check nevents, too.
  143. */
  144. nevents = 1;
  145. if (port_getn(state->portfd, event, MAX_EVENT_BATCHSZ, &nevents,
  146. tsp) == -1 && (errno != ETIME || nevents == 0)) {
  147. if (errno == ETIME || errno == EINTR)
  148. return 0;
  149. /* Any other error indicates a bug. */
  150. perror("aeApiPoll: port_get");
  151. abort();
  152. }
  153. state->npending = nevents;
  154. for (i = 0; i < nevents; i++) {
  155. mask = 0;
  156. if (event[i].portev_events & POLLIN)
  157. mask |= AE_READABLE;
  158. if (event[i].portev_events & POLLOUT)
  159. mask |= AE_WRITABLE;
  160. eventLoop->fired[i].fd = event[i].portev_object;
  161. eventLoop->fired[i].mask = mask;
  162. if (evport_debug)
  163. fprintf(stderr, "aeApiPoll: fd %d mask 0x%x\n",
  164. (int)event[i].portev_object, mask);
  165. state->pending_fds[i] = event[i].portev_object;
  166. state->pending_masks[i] = (uintptr_t)event[i].portev_user;
  167. }
  168. return nevents;
  169. }
  170. static char *aeApiName(void) {
  171. return "evport";
  172. }

相关技术文章

点击QQ咨询
开通会员
返回顶部
×
微信扫码支付
微信扫码支付
确定支付下载
请使用微信描二维码支付
×

提示信息

×

选择支付方式

  • 微信支付
  • 支付宝付款
确定支付下载