这里主要讲述 Redis 模块系统(module.c)中与池分配器(pool allocator)相关的数据结构和操作。
在阅读源码的时候,需要大致注意下结构体的含义,以及嵌套在其中的逻辑关系。
一般来说,我看这些代码,只看大概意思,不注重细节。
- /* AutoMemEntry type field values: they tell which kind of object an entry of
- * the auto memory queue (RedisModuleCtx.amqueue) refers to. */
- #define REDISMODULE_AM_KEY 0
- #define REDISMODULE_AM_STRING 1
- #define REDISMODULE_AM_REPLY 2
- #define REDISMODULE_AM_FREED 3 /* Explicitly freed by user already. */
- #define REDISMODULE_AM_DICT 4
- #define REDISMODULE_AM_INFO 5
-
- /* The pool allocator block. Redis Modules can allocate memory via this special
- * allocator that will automatically release it all once the callback returns.
- * This means that it can only be used for ephemeral allocations. However
- * there are two advantages for modules to use this API:
- *
- * 1) The memory is automatically released when the callback returns.
- * 2) This allocator is faster for many small allocations since whole blocks
- * are allocated, and small pieces returned to the caller just advancing
- * the index of the allocation.
- *
- * Allocations are always rounded to the size of the void pointer in order
- * to always return aligned memory chunks.
- *
- * NOTE(review): RM_PoolAlloc() only applies pointer-size alignment to requests
- * of at least pointer size; smaller requests are aligned to the next power
- * of two of the requested size. */
-
- #define REDISMODULE_POOL_ALLOC_MIN_SIZE (1024*8) /* Minimum payload size, in bytes, of a newly created block. */
- #define REDISMODULE_POOL_ALLOC_ALIGN (sizeof(void*)) /* Largest alignment applied by RM_PoolAlloc(). */
-
- typedef struct RedisModulePoolAllocBlock {
- uint32_t size; /* Capacity of memory[] in bytes. */
- uint32_t used; /* Offset inside memory[] of the next free byte. */
- struct RedisModulePoolAllocBlock *next; /* Next (older) block of the chain, or NULL. */
- char memory[]; /* Payload, allocated together with the header. */
- } RedisModulePoolAllocBlock;
-
- /* This structure represents the context in which Redis modules operate.
- * Most APIs module can access, get a pointer to the context, so that the API
- * implementation can hold state across calls, or remember what to free after
- * the call and so forth.
- *
- * Note that not all the context structure is always filled with actual values
- * but only the fields needed in a given context.
- *
- * The REDISMODULE_CTX_INIT initializer is positional: when adding, removing
- * or reordering fields here, update it accordingly. */
-
- struct RedisModuleBlockedClient; /* Forward declaration. */
-
- struct RedisModuleCtx {
- void *getapifuncptr; /* NOTE: Must be the first field. */
- struct RedisModule *module; /* Module reference. */
- client *client; /* Client calling a command. */
- struct RedisModuleBlockedClient *blocked_client; /* Blocked client for
- thread safe context. */
- struct AutoMemEntry *amqueue; /* Auto memory queue of objects to free. */
- int amqueue_len; /* Number of slots in amqueue. */
- int amqueue_used; /* Number of used slots in amqueue. */
- int flags; /* REDISMODULE_CTX_... flags. */
- void **postponed_arrays; /* To set with RM_ReplySetArrayLength(). */
- int postponed_arrays_count; /* Number of entries in postponed_arrays. */
- void *blocked_privdata; /* Privdata set when unblocking a client. */
- RedisModuleString *blocked_ready_key; /* Key ready when the reply callback
- gets called for clients blocked
- on keys. */
-
- /* Used if there is the REDISMODULE_CTX_KEYS_POS_REQUEST flag set. */
- getKeysResult *keys_result;
-
- struct RedisModulePoolAllocBlock *pa_head; /* Head of the pool allocator
- block chain: filled by RM_PoolAlloc()
- and freed by poolAllocRelease(). */
- redisOpArray saved_oparray; /* When propagating commands in a callback
- we reallocate the "also propagate" op
- array. Here we save the old one to
- restore it later. */
- };
- typedef struct RedisModuleCtx RedisModuleCtx;
-
- #define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, NULL, NULL, {0}} /* Positional initializer: one value per RedisModuleCtx field, in declaration order. */
- #define REDISMODULE_CTX_AUTO_MEMORY (1<<0) /* This and the following are bits of RedisModuleCtx.flags. */
- #define REDISMODULE_CTX_KEYS_POS_REQUEST (1<<1) /* When set, RedisModuleCtx.keys_result is used. */
- #define REDISMODULE_CTX_BLOCKED_REPLY (1<<2)
- #define REDISMODULE_CTX_BLOCKED_TIMEOUT (1<<3)
- #define REDISMODULE_CTX_THREAD_SAFE (1<<4)
- #define REDISMODULE_CTX_BLOCKED_DISCONNECTED (1<<5)
- #define REDISMODULE_CTX_MODULE_COMMAND_CALL (1<<6) /* Module command context: the "also propagate" array is emitted by call(). */
- #define REDISMODULE_CTX_MULTI_EMITTED (1<<7) /* The context was used to propagate commands. */
-
- /* This represents a Redis key opened with RM_OpenKey(). Besides the generic
- * fields, it embeds type specific iteration state in the 'u' union. */
- struct RedisModuleKey {
- RedisModuleCtx *ctx;
- redisDb *db; /* Database the key lives in (target of dbAdd() / dbDelete()). */
- robj *key; /* Key name object. */
- robj *value; /* Value object, or NULL if the key was not found. */
- void *iter; /* Iterator. */
- int mode; /* Opening mode. */
-
- union {
- struct {
- /* Zset iterator, use only if value->type == OBJ_ZSET */
- uint32_t type; /* REDISMODULE_ZSET_RANGE_* */
- zrangespec rs; /* Score range. */
- zlexrangespec lrs; /* Lex range. */
- uint32_t start; /* Start pos for positional ranges. */
- uint32_t end; /* End pos for positional ranges. */
- void *current; /* Zset iterator current node. */
- int er; /* Zset iterator end reached flag
- (true if end was reached). */
- } zset;
- struct {
- /* Stream, use only if value->type == OBJ_STREAM */
- streamID currentid; /* Current entry while iterating. */
- int64_t numfieldsleft; /* Fields left to fetch for current entry. */
- int signalready; /* Flag that signalKeyAsReady() is needed. */
- } stream;
- } u; /* Type specific state: use only the member matching value->type. */
- };
- typedef struct RedisModuleKey RedisModuleKey;
-
- /* Values of the RedisModuleKey 'u.zset.type' field: the kind of sorted set
- * range iteration in progress. */
- #define REDISMODULE_ZSET_RANGE_NONE 0 /* This must always be 0. */
- #define REDISMODULE_ZSET_RANGE_LEX 1
- #define REDISMODULE_ZSET_RANGE_SCORE 2
- #define REDISMODULE_ZSET_RANGE_POS 3
-
- /* Function pointer type of a function representing a command inside
- * a Redis module: it receives the module context plus the command arguments
- * as an argv/argc pair, and returns an int.
- *
- * RedisModuleDisconnectFunc is the type of the callback stored in the
- * 'disconnect_callback' field of RedisModuleBlockedClient. */
- struct RedisModuleBlockedClient; /* Forward declaration (repeated, harmless). */
- typedef int (*RedisModuleCmdFunc) (RedisModuleCtx *ctx, void **argv, int argc);
- typedef void (*RedisModuleDisconnectFunc) (RedisModuleCtx *ctx, struct RedisModuleBlockedClient *bc);
-
- /* This struct holds the information about a command registered by a module. */
- struct RedisModuleCommandProxy {
- struct RedisModule *module; /* Module that registered the command. */
- RedisModuleCmdFunc func; /* Module function implementing the command. */
- struct redisCommand *rediscmd; /* NOTE(review): presumably the matching entry
- of the Redis command table; confirm. */
- };
- typedef struct RedisModuleCommandProxy RedisModuleCommandProxy;
-
- #define REDISMODULE_REPLYFLAG_NONE 0 /* Values for RedisModuleCallReply.flags. */
- #define REDISMODULE_REPLYFLAG_TOPARSE (1<<0) /* Protocol must be parsed. */
- #define REDISMODULE_REPLYFLAG_NESTED (1<<1) /* Nested reply object. No proto
- or struct free. */
-
- /* Reply of RM_Call() function. The structure is filled in a lazy
- * way depending on the function called on the reply structure. By default
- * only the type, proto and protolen are filled. */
- typedef struct RedisModuleCallReply {
- RedisModuleCtx *ctx;
- int type; /* REDISMODULE_REPLY_... */
- int flags; /* REDISMODULE_REPLYFLAG_... */
- size_t len; /* Len of strings or num of elements of arrays. */
- char *proto; /* Raw reply protocol. An SDS string at top-level object. */
- size_t protolen;/* Length of protocol. */
- union {
- const char *str; /* String pointer for string and error replies. This
- does not need to be freed, always points inside
- a reply->proto buffer of the reply object or, in
- case of array elements, of parent reply objects. */
- long long ll; /* Reply value for integer reply. */
- struct RedisModuleCallReply *array; /* Array of sub-reply elements. */
- } val; /* Reply value: the valid member depends on 'type'. */
- } RedisModuleCallReply;
-
- /* Structure representing a blocked client. We get a pointer to such
- * an object when blocking from modules. It groups the client, the module
- * callbacks to run on completion, timeout and disconnection, and the
- * private data handed to those callbacks. */
- typedef struct RedisModuleBlockedClient {
- client *client; /* Pointer to the blocked client, or NULL if the client
- was destroyed during the life of this object. */
- RedisModule *module; /* Module blocking the client. */
- RedisModuleCmdFunc reply_callback; /* Reply callback on normal completion.*/
- RedisModuleCmdFunc timeout_callback; /* Reply callback on timeout. */
- RedisModuleDisconnectFunc disconnect_callback; /* Called on disconnection.*/
- void (*free_privdata)(RedisModuleCtx*,void*);/* privdata cleanup callback.*/
- void *privdata; /* Module private data that may be used by the reply
- or timeout callback. It is set via the
- RedisModule_UnblockClient() API. */
- client *reply_client; /* Fake client used to accumulate replies
- in thread safe contexts. */
- int dbid; /* Database number selected by the original client. */
- int blocked_on_keys; /* If blocked via RM_BlockClientOnKeys(). */
- int unblocked; /* Already on the moduleUnblocked list. */
- monotime background_timer; /* Timer tracking the start of background work */
- uint64_t background_duration; /* Current command background time duration.
- Used for measuring latency of blocking cmds */
- } RedisModuleBlockedClient;
-
- static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER; /* NOTE(review): presumably protects moduleUnblockedClients; confirm at the usage sites. */
- static list *moduleUnblockedClients; /* List of unblocked clients (see RedisModuleBlockedClient.unblocked). */
-
- /* We need a mutex that is unlocked / relocked in beforeSleep() in order to
- * allow thread safe contexts to execute commands at a safe moment. */
- static pthread_mutex_t moduleGIL = PTHREAD_MUTEX_INITIALIZER;
-
-
- /* Function pointer type for keyspace event notification subscriptions from
- * modules. The callback receives the module context, the event type, the
- * event name and the name of the key involved. */
- typedef int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key);
-
- /* Keyspace notification subscriber information.
- * See RM_SubscribeToKeyspaceEvents() for more information. */
- typedef struct RedisModuleKeyspaceSubscriber {
- /* The module subscribed to the event */
- RedisModule *module;
- /* Notification callback in the module */
- RedisModuleNotificationFunc notify_callback;
- /* A bit mask of the events the module is interested in */
- int event_mask;
- /* Active flag set on entry, to avoid reentrant subscribers
- * calling themselves */
- int active;
- } RedisModuleKeyspaceSubscriber;
-
- /* The module keyspace notification subscribers list: the nodes are
- * presumably RedisModuleKeyspaceSubscriber structures (confirm where the
- * list is populated). */
- static list *moduleKeyspaceSubscribers;
-
- /* Static client recycled for when we need to provide a context with a client
- * in a situation where there is no client to provide. This avoids allocating
- * a new client per round. For instance this is used in the keyspace
- * notifications, timers and cluster messages callbacks. */
- static client *moduleFreeContextReusedClient;
-
- /* Data structures related to the exported dictionary data structure.
- * RedisModuleDict wraps a radix tree, RedisModuleDictIter pairs a
- * dictionary with a radix tree iterator. */
- typedef struct RedisModuleDict {
- rax *rax; /* The radix tree. */
- } RedisModuleDict;
-
- typedef struct RedisModuleDictIter {
- RedisModuleDict *dict; /* Dictionary the iterator belongs to. */
- raxIterator ri; /* Underlying radix tree iterator. */
- } RedisModuleDictIter;
-
- typedef struct RedisModuleCommandFilterCtx {
- RedisModuleString **argv; /* Argument vector handed to the filter callback. */
- int argc; /* Number of elements in argv. */
- } RedisModuleCommandFilterCtx;
-
- typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCommandFilterCtx *filter);
-
- typedef struct RedisModuleCommandFilter {
- /* The module that registered the filter */
- RedisModule *module;
- /* Filter callback function */
- RedisModuleCommandFilterFunc callback;
- /* REDISMODULE_CMDFILTER_* flags */
- int flags;
- } RedisModuleCommandFilter;
-
- /* Registered filters: list of the command filters installed by modules. */
- static list *moduleCommandFilters;
-
- typedef void (*RedisModuleForkDoneHandler) (int exitcode, int bysignal, void *user_data);
-
- static struct RedisModuleForkInfo {
- RedisModuleForkDoneHandler done_handler; /* Handler receiving exitcode, bysignal and user data. */
- void* done_handler_user_data; /* Opaque pointer passed to done_handler as 'user_data'. */
- } moduleForkInfo = {0}; /* Zero initialized: no handler registered at startup. */
-
- typedef struct RedisModuleServerInfoData {
- rax *rax; /* Parsed info data, stored in a radix tree. */
- } RedisModuleServerInfoData;
-
- /* Flags for moduleCreateArgvFromUserFormat(): bits reported through the
- * function 'flags' output argument. */
- #define REDISMODULE_ARGV_REPLICATE (1<<0)
- #define REDISMODULE_ARGV_NO_AOF (1<<1)
- #define REDISMODULE_ARGV_NO_REPLICAS (1<<2)
-
- /* Determine whether Redis should signalModifiedKey implicitly.
- * In case 'ctx' has no 'module' member (and therefore no module->options),
- * we assume default behavior, that is, Redis signals.
- * (see RM_GetThreadSafeContext)
- *
- * The macro evaluates to 0 only when ctx->module is set and its options
- * include REDISMODULE_OPTION_NO_IMPLICIT_SIGNAL_MODIFIED, otherwise to 1.
- *
- * Both the argument and the whole expansion are parenthesized, so that the
- * macro can be negated or combined with other operators, and can be called
- * with an arbitrary pointer expression. Note that 'ctx' is evaluated more
- * than once: do not pass expressions with side effects. */
- #define SHOULD_SIGNAL_MODIFIED_KEYS(ctx) \
- ((ctx)->module? !((ctx)->module->options & REDISMODULE_OPTION_NO_IMPLICIT_SIGNAL_MODIFIED) : 1)
-
- /* Server events hooks data structures and defines: this part of the modules
- * API allows modules to subscribe to certain events in Redis, such as
- * the start and end of an RDB or AOF save, the change of role in replication,
- * and similar other events. */
-
- typedef struct RedisModuleEventListener {
- RedisModule *module; /* Module that subscribed to the event. */
- RedisModuleEvent event; /* Event the module subscribed to. */
- RedisModuleEventCallback callback; /* Module callback registered for the event. */
- } RedisModuleEventListener;
-
- list *RedisModule_EventListeners; /* Global list of all the active events. */
- unsigned long long ModulesInHooks = 0; /* Total number of modules in hooks
- callbacks right now. */
-
- /* Data structures related to the Redis module users. */
-
- /* This is the object returned by RM_CreateModuleUser(). The module API is
- * able to create users, set ACLs to such users, and later authenticate
- * clients using such newly created users. The object is a thin wrapper
- * around the real Redis user. */
- typedef struct RedisModuleUser {
- user *user; /* Reference to the real redis user */
- } RedisModuleUser;
-
-
- /* --------------------------------------------------------------------------
- * Prototypes
- *
- * Forward declarations, so that these functions can be referenced before
- * their definition appears in the file.
- * -------------------------------------------------------------------------- */
-
- void RM_FreeCallReply(RedisModuleCallReply *reply);
- void RM_CloseKey(RedisModuleKey *key);
- void autoMemoryCollect(RedisModuleCtx *ctx);
- robj **moduleCreateArgvFromUserFormat(const char *cmdname, const char *fmt, int *argcp, int *flags, va_list ap);
- void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx);
- void RM_ZsetRangeStop(RedisModuleKey *kp);
- static void zsetKeyReset(RedisModuleKey *key);
- static void moduleInitKeyTypeSpecific(RedisModuleKey *key); /* Called by moduleCreateEmptyKey(). */
- void RM_FreeDict(RedisModuleCtx *ctx, RedisModuleDict *d);
- void RM_FreeServerInfo(RedisModuleCtx *ctx, RedisModuleServerInfoData *data);
-
- /* --------------------------------------------------------------------------
- * ## Heap allocation raw functions
- *
- * Memory allocated with these functions is taken into account by Redis key
- * eviction algorithms and is reported in Redis memory usage information.
- * -------------------------------------------------------------------------- */
-
- /* malloc() replacement for modules. The request is served by zmalloc(),
- * so the memory is accounted as allocated by Redis: it is reported in
- * Redis INFO memory and is considered for keys eviction according to the
- * maxmemory settings.
- *
- * 'bytes' is the requested size. The function returns whatever zmalloc()
- * returns. Modules should avoid calling malloc() directly. */
- void *RM_Alloc(size_t bytes) {
- void *mem = zmalloc(bytes);
- return mem;
- }
-
- /* Use like calloc(). Memory allocated with this function is reported in
- * Redis INFO memory, used for keys eviction according to maxmemory settings
- * and in general is taken into account as memory allocated by Redis.
- * You should avoid using calloc() directly.
- *
- * 'nmemb' is the number of elements and 'size' the size of each element:
- * a zero initialized buffer of nmemb*size bytes is obtained via zcalloc().
- *
- * Like calloc(), the function returns NULL when nmemb*size does not fit
- * in a size_t, instead of silently wrapping around and handing out a buffer
- * smaller than the one requested. */
- void *RM_Calloc(size_t nmemb, size_t size) {
- /* (size_t)-1 is the largest value representable in a size_t. */
- if (size != 0 && nmemb > (size_t)-1 / size) return NULL;
- return zcalloc(nmemb*size);
- }
-
- /* realloc() replacement for memory obtained with RedisModule_Alloc().
- * 'ptr' is the block to resize and 'bytes' the new size: the call is
- * forwarded to zrealloc() and its result returned to the caller. */
- void* RM_Realloc(void *ptr, size_t bytes) {
- void *resized = zrealloc(ptr,bytes);
- return resized;
- }
-
- /* Use like free() for memory obtained by RedisModule_Alloc() and
- * RedisModule_Realloc(). However you should never try to free with
- * RedisModule_Free() memory allocated with malloc() inside your module.
- *
- * The pointer is handed to zfree() as it is. */
- void RM_Free(void *ptr) {
- zfree(ptr);
- }
-
- /* strdup() replacement: the copy of 'str' is created by zstrdup(), so
- * the returned memory is of the same kind returned by RedisModule_Alloc(). */
- char *RM_Strdup(const char *str) {
- char *copy = zstrdup(str);
- return copy;
- }
-
- /* --------------------------------------------------------------------------
- * Pool allocator
- * -------------------------------------------------------------------------- */
-
- /* Free every block of the pool allocator chain owned by 'ctx', then
- * reset the chain head so that the context has no pool memory left. */
- void poolAllocRelease(RedisModuleCtx *ctx) {
- RedisModulePoolAllocBlock *blk, *following;
-
- /* The 'next' link is saved before the block is released. */
- for (blk = ctx->pa_head; blk != NULL; blk = following) {
- following = blk->next;
- zfree(blk);
- }
- ctx->pa_head = NULL;
- }
-
- /* Return 'bytes' bytes of heap memory taken from the pool of 'ctx': the
- * memory is released automatically, all at once, when the module callback
- * function returns. The pool is mostly suitable for small, short living
- * allocations that must be released when the callback returns anyway.
- *
- * Alignment: requests of at least word size are aligned to the
- * architecture word size, smaller requests are aligned to the next power
- * of two of the requested size. For example a 3 bytes request is 4 bytes
- * aligned, while a 2 bytes request is 2 bytes aligned.
- *
- * There is no realloc style function since when this is needed to use the
- * pool allocator is not a good idea.
- *
- * The function returns NULL if `bytes` is 0. */
- void *RM_PoolAlloc(RedisModuleCtx *ctx, size_t bytes) {
- if (bytes == 0) return NULL;
-
- RedisModulePoolAllocBlock *blk = ctx->pa_head;
- size_t avail = 0;
- if (blk != NULL) avail = blk->size - blk->used;
-
- /* The head block may fit the request: align the write offset first,
- * then check again how much room is left after the padding. */
- if (avail >= bytes) {
- size_t align = REDISMODULE_POOL_ALLOC_ALIGN;
- for (; bytes < align && bytes <= align/2; align /= 2);
- size_t misaligned = blk->used % align;
- if (misaligned != 0) blk->used += align - misaligned;
- if (blk->used > blk->size)
- avail = 0;
- else
- avail = blk->size - blk->used;
- }
-
- /* Not enough room (or no block at all): push a new block, large enough
- * for the request, at the head of the chain. */
- if (avail < bytes) {
- size_t blocksize = bytes;
- if (blocksize <= REDISMODULE_POOL_ALLOC_MIN_SIZE)
- blocksize = REDISMODULE_POOL_ALLOC_MIN_SIZE;
- blk = zmalloc(sizeof(*blk) + blocksize);
- blk->size = blocksize;
- blk->used = 0;
- blk->next = ctx->pa_head;
- ctx->pa_head = blk;
- }
-
- /* Hand out the chunk by just advancing the offset of the block. */
- char *chunk = blk->memory + blk->used;
- blk->used += bytes;
- return chunk;
- }
-
- /* --------------------------------------------------------------------------
- * Helpers for modules API implementation
- * -------------------------------------------------------------------------- */
-
- /* Populate a non existing key with an empty value of the given type.
- * `key` must be a key object opened for writing, having the `.value`
- * member set to NULL because the key was not found.
- *
- * Supported types are REDISMODULE_KEYTYPE_LIST, REDISMODULE_KEYTYPE_ZSET,
- * REDISMODULE_KEYTYPE_HASH and REDISMODULE_KEYTYPE_STREAM.
- *
- * On success the new value is added to the database, stored in the key
- * object, and REDISMODULE_OK is returned. REDISMODULE_ERR is returned,
- * without touching the key, if:
- *
- * 1. The key is not open for writing.
- * 2. The key is not empty.
- * 3. The specified type is unknown.
- */
- int moduleCreateEmptyKey(RedisModuleKey *key, int type) {
- /* Only a non existing key opened for writing can be populated. */
- int writable = key->mode & REDISMODULE_WRITE;
- if (!writable || key->value != NULL) return REDISMODULE_ERR;
-
- robj *created;
- if (type == REDISMODULE_KEYTYPE_LIST) {
- created = createQuicklistObject();
- quicklistSetOptions(created->ptr, server.list_max_ziplist_size,
- server.list_compress_depth);
- } else if (type == REDISMODULE_KEYTYPE_ZSET) {
- created = createZsetZiplistObject();
- } else if (type == REDISMODULE_KEYTYPE_HASH) {
- created = createHashObject();
- } else if (type == REDISMODULE_KEYTYPE_STREAM) {
- created = createStreamObject();
- } else {
- return REDISMODULE_ERR;
- }
-
- dbAdd(key->db,key->key,created);
- key->value = created;
- moduleInitKeyTypeSpecific(key);
- return REDISMODULE_OK;
- }
-
- /* Low-level API helper to call after an operation that removed elements
- * from an aggregate data type: it checks if the value associated with the
- * key was left empty.
- *
- * In that case the key is deleted from the DB and the `.value` member of
- * the key object is set to NULL, so that following write operations can
- * target the key again, possibly recreating it.
- *
- * Nothing is done for keys not opened for writing, keys without a value,
- * and value types other than list, set, sorted set, hash and stream.
- *
- * The function returns 1 if the key value object is found empty and is
- * deleted, otherwise 0 is returned. */
- int moduleDelKeyIfEmpty(RedisModuleKey *key) {
- robj *val = key->value;
- if (val == NULL || !(key->mode & REDISMODULE_WRITE)) return 0;
-
- int empty = 0;
- switch(val->type) {
- case OBJ_LIST:
- if (listTypeLength(val) == 0) empty = 1;
- break;
- case OBJ_SET:
- if (setTypeSize(val) == 0) empty = 1;
- break;
- case OBJ_ZSET:
- if (zsetLength(val) == 0) empty = 1;
- break;
- case OBJ_HASH:
- if (hashTypeLength(val) == 0) empty = 1;
- break;
- case OBJ_STREAM:
- if (streamLength(val) == 0) empty = 1;
- break;
- default:
- break;
- }
- if (!empty) return 0;
-
- dbDelete(key->db,key->key);
- key->value = NULL;
- return 1;
- }
-
- /* --------------------------------------------------------------------------
- * Service API exported to modules
- *
- * Note that all the exported APIs are called RM_<funcname> in the core
- * and RedisModule_<funcname> in the module side (defined as function
- * pointers in redismodule.h). In this way the dynamic linker does not
- * mess with our global function pointers, overriding it with the symbols
- * defined in the main executable having the same names.
- * -------------------------------------------------------------------------- */
-
- /* Look up the module API called 'funcname' in server.moduleapi and, when
- * found, store the associated function pointer into '*targetPtrPtr'.
- *
- * The function returns REDISMODULE_OK on success, or REDISMODULE_ERR if
- * there is no such named API: in the latter case '*targetPtrPtr' is left
- * untouched.
- *
- * This function is not meant to be used by modules developer, it is only
- * used implicitly by including redismodule.h. */
- int RM_GetApi(const char *funcname, void **targetPtrPtr) {
- dictEntry *entry = dictFind(server.moduleapi, funcname);
- if (entry != NULL) {
- *targetPtrPtr = dictGetVal(entry);
- return REDISMODULE_OK;
- }
- return REDISMODULE_ERR;
- }
-
- /* Helper to call after a command callback was executed with 'ctx', in
- * order to handle the details needed to correctly replicate commands:
- * it propagates the final EXEC and, for contexts that are not module
- * command calls, emits and releases the "also propagate" operations. */
- void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) {
- client *caller = ctx->client;
-
- /* There is nothing to do when:
- * - the context was never used in order to propagate commands;
- * - the server isn't inside a transaction;
- * - the command is executed from within Lua or MULTI/EXEC, since in
- * that case we do not need to propagate EXEC. */
- if (!(ctx->flags & REDISMODULE_CTX_MULTI_EMITTED) ||
- !server.propagate_in_transaction ||
- server.in_eval || server.in_exec) return;
-
- /* Whatever a command emits is always wrapped around MULTI/EXEC:
- * replicate the final EXEC. */
- alsoPropagate(server.execCommand,caller->db->id,&shared.exec,1,
- PROPAGATE_AOF|PROPAGATE_REPL);
- afterPropagateExec();
-
- /* In a module command call the "also propagate" array is handled
- * directly by call(). Only for a simple callback context we have to
- * emit it here, provided that there is something to emit. */
- if ((ctx->flags & REDISMODULE_CTX_MODULE_COMMAND_CALL) ||
- !server.also_propagate.numops) return;
-
- int idx = 0;
- while (idx < server.also_propagate.numops) {
- redisOp *op = &server.also_propagate.ops[idx++];
- /* Operations without a target are skipped. */
- if (op->target)
- propagate(op->cmd,op->dbid,op->argv,op->argc,op->target);
- }
- redisOpArrayFree(&server.also_propagate);
-
- /* Restore the previous oparray in case of nested use of the API, then
- * invalidate saved_oparray since we are done with it. */
- server.also_propagate = ctx->saved_oparray;
- redisOpArrayInit(&ctx->saved_oparray);
- }