关键词搜索

源码搜索 ×
×

漫话Redis源码之五十四

发布2022-01-09浏览404次

详情内容

这里主要讲解 Redis 惰性释放(lazyfree.c)中的几组分配与释放操作,逻辑比较简单:后台线程负责真正的释放,并维护待释放/已释放对象的计数。

  1. #include "server.h"
  2. #include "bio.h"
  3. #include "atomicvar.h"
  4. #include "cluster.h"
/* Global bookkeeping for the lazy-free machinery: number of objects still
 * pending release, and number of objects already released. Both are
 * redisAtomic since they are updated from the bio lazy-free thread and
 * read from the main thread. */
static redisAtomic size_t lazyfree_objects = 0;
static redisAtomic size_t lazyfreed_objects = 0;
  7. /* Release objects from the lazyfree thread. It's just decrRefCount()
  8. * updating the count of objects to release. */
  9. void lazyfreeFreeObject(void *args[]) {
  10. robj *o = (robj *) args[0];
  11. decrRefCount(o);
  12. atomicDecr(lazyfree_objects,1);
  13. atomicIncr(lazyfreed_objects,1);
  14. }
  15. /* Release a database from the lazyfree thread. The 'db' pointer is the
  16. * database which was substituted with a fresh one in the main thread
  17. * when the database was logically deleted. */
  18. void lazyfreeFreeDatabase(void *args[]) {
  19. dict *ht1 = (dict *) args[0];
  20. dict *ht2 = (dict *) args[1];
  21. size_t numkeys = dictSize(ht1);
  22. dictRelease(ht1);
  23. dictRelease(ht2);
  24. atomicDecr(lazyfree_objects,numkeys);
  25. atomicIncr(lazyfreed_objects,numkeys);
  26. }
  27. /* Release the skiplist mapping Redis Cluster keys to slots in the
  28. * lazyfree thread. */
  29. void lazyfreeFreeSlotsMap(void *args[]) {
  30. rax *rt = args[0];
  31. size_t len = rt->numele;
  32. raxFree(rt);
  33. atomicDecr(lazyfree_objects,len);
  34. atomicIncr(lazyfreed_objects,len);
  35. }
  36. /* Release the key tracking table. */
  37. void lazyFreeTrackingTable(void *args[]) {
  38. rax *rt = args[0];
  39. size_t len = rt->numele;
  40. freeTrackingRadixTree(rt);
  41. atomicDecr(lazyfree_objects,len);
  42. atomicIncr(lazyfreed_objects,len);
  43. }
  44. void lazyFreeLuaScripts(void *args[]) {
  45. dict *lua_scripts = args[0];
  46. long long len = dictSize(lua_scripts);
  47. dictRelease(lua_scripts);
  48. atomicDecr(lazyfree_objects,len);
  49. atomicIncr(lazyfreed_objects,len);
  50. }
  51. /* Return the number of currently pending objects to free. */
  52. size_t lazyfreeGetPendingObjectsCount(void) {
  53. size_t aux;
  54. atomicGet(lazyfree_objects,aux);
  55. return aux;
  56. }
  57. /* Return the number of objects that have been freed. */
  58. size_t lazyfreeGetFreedObjectsCount(void) {
  59. size_t aux;
  60. atomicGet(lazyfreed_objects,aux);
  61. return aux;
  62. }
  63. /* Return the amount of work needed in order to free an object.
  64. * The return value is not always the actual number of allocations the
  65. * object is composed of, but a number proportional to it.
  66. *
  67. * For strings the function always returns 1.
  68. *
  69. * For aggregated objects represented by hash tables or other data structures
  70. * the function just returns the number of elements the object is composed of.
  71. *
  72. * Objects composed of single allocations are always reported as having a
  73. * single item even if they are actually logical composed of multiple
  74. * elements.
  75. *
  76. * For lists the function returns the number of elements in the quicklist
  77. * representing the list. */
  78. size_t lazyfreeGetFreeEffort(robj *key, robj *obj) {
  79. if (obj->type == OBJ_LIST) {
  80. quicklist *ql = obj->ptr;
  81. return ql->len;
  82. } else if (obj->type == OBJ_SET && obj->encoding == OBJ_ENCODING_HT) {
  83. dict *ht = obj->ptr;
  84. return dictSize(ht);
  85. } else if (obj->type == OBJ_ZSET && obj->encoding == OBJ_ENCODING_SKIPLIST){
  86. zset *zs = obj->ptr;
  87. return zs->zsl->length;
  88. } else if (obj->type == OBJ_HASH && obj->encoding == OBJ_ENCODING_HT) {
  89. dict *ht = obj->ptr;
  90. return dictSize(ht);
  91. } else if (obj->type == OBJ_STREAM) {
  92. size_t effort = 0;
  93. stream *s = obj->ptr;
  94. /* Make a best effort estimate to maintain constant runtime. Every macro
  95. * node in the Stream is one allocation. */
  96. effort += s->rax->numnodes;
  97. /* Every consumer group is an allocation and so are the entries in its
  98. * PEL. We use size of the first group's PEL as an estimate for all
  99. * others. */
  100. if (s->cgroups && raxSize(s->cgroups)) {
  101. raxIterator ri;
  102. streamCG *cg;
  103. raxStart(&ri,s->cgroups);
  104. raxSeek(&ri,"^",NULL,0);
  105. /* There must be at least one group so the following should always
  106. * work. */
  107. serverAssert(raxNext(&ri));
  108. cg = ri.data;
  109. effort += raxSize(s->cgroups)*(1+raxSize(cg->pel));
  110. raxStop(&ri);
  111. }
  112. return effort;
  113. } else if (obj->type == OBJ_MODULE) {
  114. moduleValue *mv = obj->ptr;
  115. moduleType *mt = mv->type;
  116. if (mt->free_effort != NULL) {
  117. size_t effort = mt->free_effort(key,mv->value);
  118. /* If the module's free_effort returns 0, it will use asynchronous free
  119. memory by default */
  120. return effort == 0 ? ULONG_MAX : effort;
  121. } else {
  122. return 1;
  123. }
  124. } else {
  125. return 1; /* Everything else is a single allocation. */
  126. }
  127. }
  128. /* Delete a key, value, and associated expiration entry if any, from the DB.
  129. * If there are enough allocations to free the value object may be put into
  130. * a lazy free list instead of being freed synchronously. The lazy free list
  131. * will be reclaimed in a different bio.c thread. */
  132. #define LAZYFREE_THRESHOLD 64
  133. int dbAsyncDelete(redisDb *db, robj *key) {
  134. /* Deleting an entry from the expires dict will not free the sds of
  135. * the key, because it is shared with the main dictionary. */
  136. if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
  137. /* If the value is composed of a few allocations, to free in a lazy way
  138. * is actually just slower... So under a certain limit we just free
  139. * the object synchronously. */
  140. dictEntry *de = dictUnlink(db->dict,key->ptr);
  141. if (de) {
  142. robj *val = dictGetVal(de);
  143. /* Tells the module that the key has been unlinked from the database. */
  144. moduleNotifyKeyUnlink(key,val);
  145. size_t free_effort = lazyfreeGetFreeEffort(key,val);
  146. /* If releasing the object is too much work, do it in the background
  147. * by adding the object to the lazy free list.
  148. * Note that if the object is shared, to reclaim it now it is not
  149. * possible. This rarely happens, however sometimes the implementation
  150. * of parts of the Redis core may call incrRefCount() to protect
  151. * objects, and then call dbDelete(). In this case we'll fall
  152. * through and reach the dictFreeUnlinkedEntry() call, that will be
  153. * equivalent to just calling decrRefCount(). */
  154. if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) {
  155. atomicIncr(lazyfree_objects,1);
  156. bioCreateLazyFreeJob(lazyfreeFreeObject,1, val);
  157. dictSetVal(db->dict,de,NULL);
  158. }
  159. }
  160. /* Release the key-val pair, or just the key if we set the val
  161. * field to NULL in order to lazy free it later. */
  162. if (de) {
  163. dictFreeUnlinkedEntry(db->dict,de);
  164. if (server.cluster_enabled) slotToKeyDel(key->ptr);
  165. return 1;
  166. } else {
  167. return 0;
  168. }
  169. }
  170. /* Free an object, if the object is huge enough, free it in async way. */
  171. void freeObjAsync(robj *key, robj *obj) {
  172. size_t free_effort = lazyfreeGetFreeEffort(key,obj);
  173. if (free_effort > LAZYFREE_THRESHOLD && obj->refcount == 1) {
  174. atomicIncr(lazyfree_objects,1);
  175. bioCreateLazyFreeJob(lazyfreeFreeObject,1,obj);
  176. } else {
  177. decrRefCount(obj);
  178. }
  179. }
  180. /* Empty a Redis DB asynchronously. What the function does actually is to
  181. * create a new empty set of hash tables and scheduling the old ones for
  182. * lazy freeing. */
  183. void emptyDbAsync(redisDb *db) {
  184. dict *oldht1 = db->dict, *oldht2 = db->expires;
  185. db->dict = dictCreate(&dbDictType,NULL);
  186. db->expires = dictCreate(&dbExpiresDictType,NULL);
  187. atomicIncr(lazyfree_objects,dictSize(oldht1));
  188. bioCreateLazyFreeJob(lazyfreeFreeDatabase,2,oldht1,oldht2);
  189. }
  190. /* Release the radix tree mapping Redis Cluster keys to slots asynchronously. */
  191. void freeSlotsToKeysMapAsync(rax *rt) {
  192. atomicIncr(lazyfree_objects,rt->numele);
  193. bioCreateLazyFreeJob(lazyfreeFreeSlotsMap,1,rt);
  194. }
  195. /* Free an object, if the object is huge enough, free it in async way. */
  196. void freeTrackingRadixTreeAsync(rax *tracking) {
  197. atomicIncr(lazyfree_objects,tracking->numele);
  198. bioCreateLazyFreeJob(lazyFreeTrackingTable,1,tracking);
  199. }
  200. /* Free lua_scripts dict, if the dict is huge enough, free it in async way. */
  201. void freeLuaScriptsAsync(dict *lua_scripts) {
  202. if (dictSize(lua_scripts) > LAZYFREE_THRESHOLD) {
  203. atomicIncr(lazyfree_objects,dictSize(lua_scripts));
  204. bioCreateLazyFreeJob(lazyFreeLuaScripts,1,lua_scripts);
  205. } else {
  206. dictRelease(lua_scripts);
  207. }
  208. }

相关技术文章

点击QQ咨询
开通会员
返回顶部
×
微信扫码支付
微信扫码支付
确定支付下载
请使用微信描二维码支付
×

提示信息

×

选择支付方式

  • 微信支付
  • 支付宝付款
确定支付下载