diff --git a/command.c b/command.c index e32091b..c4c463f 100644 --- a/command.c +++ b/command.c @@ -46,6 +46,7 @@ redis_arg0(struct cmd *r) case CMD_REQ_REDIS_DECR: case CMD_REQ_REDIS_GET: + case CMD_REQ_REDIS_KEYS: case CMD_REQ_REDIS_INCR: case CMD_REQ_REDIS_STRLEN: @@ -485,6 +486,11 @@ redis_parse_cmd(struct cmd *r) break; case 4: + if (str4icmp(m, 'k', 'e', 'y', 's')) { + r->type = CMD_REQ_REDIS_KEYS; + break; + } + if (str4icmp(m, 'p', 't', 't', 'l')) { r->type = CMD_REQ_REDIS_PTTL; break; diff --git a/command.h b/command.h index b7c388a..37c1e16 100644 --- a/command.h +++ b/command.h @@ -126,7 +126,7 @@ typedef enum cmd_parse_result { ACTION( RSP_REDIS_BULK ) \ ACTION( RSP_REDIS_MULTIBULK ) \ ACTION( SENTINEL ) \ - + ACTION( REQ_REDIS_KEYS ) #define DEFINE_ACTION(_name) CMD_##_name, typedef enum cmd_type { diff --git a/hircluster.c b/hircluster.c index b051936..a7eb700 100644 --- a/hircluster.c +++ b/hircluster.c @@ -3769,6 +3769,33 @@ void *redisClusterCommand(redisClusterContext *cc, const char *format, ...) { return reply; } +void *redisClusterBroadcastCommand(redisClusterContext *cc, const char *format, ...){ + va_list ap; + redisReply *reply = NULL; + char *cmd; + int len; + + if(cc == NULL) + { + return NULL; + } + va_start(ap,format); + len = redisvFormatCommand(&cmd,format,ap); + + if (len == -1) { + __redisClusterSetError(cc,REDIS_ERR_OOM,"Out of memory"); + return NULL; + } else if (len == -2) { + __redisClusterSetError(cc,REDIS_ERR_OTHER,"Invalid format string"); + return NULL; + } + + reply=redisCLusterCommandSendAll(cc,cmd,len); + va_end(ap); + free(cmd); + return reply; +} + void *redisClusterCommandArgv(redisClusterContext *cc, int argc, const char **argv, const size_t *argvlen) { redisReply *reply = NULL; char *cmd; @@ -4030,6 +4057,113 @@ static int redisCLusterSendAll(redisClusterContext *cc) return REDIS_OK; } +redisReply *redisCLusterCommandSendAll(redisClusterContext *cc,char *cmd,size_t len) +{ + dictIterator *di; + dictEntry *de; + struct cluster_node *node; + redisContext *c = NULL; + void *aux = NULL; + int elements = 0; + if(cc == NULL || cc->nodes == NULL) + { + return NULL; + } + + redisReply *reply=NULL; + reply = hi_calloc(1,sizeof(*reply)); + if (reply == NULL) + { + __redisClusterSetError(cc,REDIS_ERR_OOM,"Out of memory"); + return NULL; + } + reply->type = REDIS_REPLY_ARRAY; + di = dictGetIterator(cc->nodes); + while((de = dictNext(di)) != NULL) + { + int wdone = 0; + node = dictGetEntryVal(de); + if(node == NULL) + { + continue; + } + + c = ctx_get_by_node(cc, node); + if(c == NULL) + { + continue; + } + if (__redisAppendCommand(c,cmd,len) != REDIS_OK) + { + continue; + } + + /* Try to read pending replies */ + if (redisGetReplyFromReader(c,&aux) == REDIS_ERR){ + free(reply); + return NULL; + } + + if (c->flags & REDIS_BLOCK) { + /* Write until done */ + do { + if (redisBufferWrite(c,&wdone) == REDIS_ERR) + { + dictReleaseIterator(di); + free(reply); + return NULL; + } + } while (!wdone); + + /* Read until there is a reply */ + do { + if (redisBufferRead(c) == REDIS_ERR){ + free(reply); + return NULL; + } + if (redisGetReplyFromReader(c,&aux) == REDIS_ERR){ + free(reply); + return NULL; + } + } while (aux == NULL); + } + redisReply *local_reply = (redisReply*)aux; + //Memory Allocation + if(reply->element==NULL){ + size_t members=1; + if(local_reply->type==REDIS_REPLY_ARRAY){ + members=local_reply->elements; + } + reply->element = hi_alloc(members*sizeof(*reply)); + } + else{ + if(local_reply->type==REDIS_REPLY_ARRAY){ + reply->element = hi_realloc(reply->element,(elements+local_reply->elements)* sizeof(*reply)); + } + else{ + reply->element = hi_realloc(reply->element,(elements+1)* sizeof(*reply)); + } + + } + //Adding element within array + if(local_reply->type==REDIS_REPLY_ARRAY){ + for(int i=0;ielements;i++){ + reply->element[elements] = local_reply->element[i]; + elements++; + } + local_reply->elements=NULL; + freeReplyObject(local_reply); + } + else{ + reply->element[elements] = local_reply; + elements++; + } + } + reply->elements=elements; + dictReleaseIterator(di); + return reply; +} + static int redisCLusterClearAll(redisClusterContext *cc) { dictIterator *di; @@ -4962,3 +5096,18 @@ void redisClusterAsyncFree(redisClusterAsyncContext *acc) hi_free(acc); } +void print_reply(redisReply *reply){ + switch(reply->type){ + case REDIS_REPLY_STRING:printf("REDIS_REPLY_STRING, Reply:%s\n",reply->str);break; + case REDIS_REPLY_ARRAY:printf("REDIS_REPLY_ARRAY with %d elements\n",reply->elements); + for(int i=0;i<(reply->elements);i++){ + print_reply((reply->element)[i]); + } + break; + case REDIS_REPLY_INTEGER:printf("REDIS_REPLY_INTEGER, Reply:%lld\n",reply->integer);break; + case REDIS_REPLY_NIL:printf("REDIS_REPLY_NIL\n");break; + case REDIS_REPLY_STATUS:printf("REDIS_REPLY_STATUS, Reply:%s\n",reply->str);break; + case REDIS_REPLY_ERROR:printf("REDIS_REPLY_ERROR, Reply:%s\n",reply->str);break; + default: printf("Unknown Reply Type"); + } +} \ No newline at end of file diff --git a/hircluster.h b/hircluster.h index 95585c9..1621bbd 100644 --- a/hircluster.h +++ b/hircluster.h @@ -142,7 +142,9 @@ int test_cluster_update_route(redisClusterContext *cc); struct dict *parse_cluster_nodes(redisClusterContext *cc, char *str, int str_len, int flags); struct dict *parse_cluster_slots(redisClusterContext *cc, redisReply *reply, int flags); - +void *redisClusterBroadcastCommand(redisClusterContext *cc, const char *format, ...); +redisReply *redisCLusterCommandSendAll(redisClusterContext *cc,char *cmd,size_t len); +void print_reply(redisReply *reply); /*############redis cluster async############*/ struct redisClusterAsyncContext;