diff options
Diffstat (limited to 'src/redis-cli.c')
-rw-r--r-- | src/redis-cli.c | 158 |
1 files changed, 104 insertions, 54 deletions
diff --git a/src/redis-cli.c b/src/redis-cli.c index 75845f346..0148964bf 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -1989,6 +1989,7 @@ static void repl(void) { if (argv == NULL) { printf("Invalid argument(s)\n"); + fflush(stdout); linenoiseFree(line); continue; } else if (argc > 0) { @@ -6784,10 +6785,53 @@ void sendCapa() { sendReplconf("capa", "eof"); } +/* Wrapper around hiredis to allow arbitrary reads and writes. + * + * We piggybacks on top of hiredis to achieve transparent TLS support, + * and use its internal buffers so it can co-exist with commands + * previously/later issued on the connection. + * + * Interface is close to enough to read()/write() so things should mostly + * work transparently. + */ + +/* Write a raw buffer through a redisContext. If we already have something + * in the buffer (leftovers from hiredis operations) it will be written + * as well. + */ +static ssize_t writeConn(redisContext *c, const char *buf, size_t buf_len) +{ + int done = 0; + + c->obuf = sdscatlen(c->obuf, buf, buf_len); + if (redisBufferWrite(c, &done) == REDIS_ERR) { + sdsrange(c->obuf, 0, -(buf_len+1)); + if (!(c->flags & REDIS_BLOCK)) + errno = EAGAIN; + return -1; + } + + size_t left = sdslen(c->obuf); + sdsclear(c->obuf); + if (!done) { + return buf_len - left; + } + + return buf_len; +} + +/* Read raw bytes through a redisContext. The read operation is not greedy + * and may not fill the buffer entirely. + */ +static ssize_t readConn(redisContext *c, char *buf, size_t len) +{ + return c->funcs->read(c, buf, len); +} + /* Sends SYNC and reads the number of bytes in the payload. Used both by * slaveMode() and getRDB(). * returns 0 in case an EOF marker is used. */ -unsigned long long sendSync(int fd, char *out_eof) { +unsigned long long sendSync(redisContext *c, char *out_eof) { /* To start we need to send the SYNC command and return the payload. * The hiredis client lib does not understand this part of the protocol * and we don't want to mess with its buffers, so everything is performed @@ -6796,7 +6840,7 @@ unsigned long long sendSync(int fd, char *out_eof) { ssize_t nread; /* Send the SYNC command. */ - if (write(fd,"SYNC\r\n",6) != 6) { + if (writeConn(c, "SYNC\r\n", 6) != 6) { fprintf(stderr,"Error writing to master\n"); exit(1); } @@ -6804,7 +6848,7 @@ unsigned long long sendSync(int fd, char *out_eof) { /* Read $<payload>\r\n, making sure to read just up to "\n" */ p = buf; while(1) { - nread = read(fd,p,1); + nread = readConn(c,p,1); if (nread <= 0) { fprintf(stderr,"Error reading bulk length while SYNCing\n"); exit(1); @@ -6825,11 +6869,10 @@ unsigned long long sendSync(int fd, char *out_eof) { } static void slaveMode(void) { - int fd = context->fd; static char eofmark[RDB_EOF_MARK_SIZE]; static char lastbytes[RDB_EOF_MARK_SIZE]; static int usemark = 0; - unsigned long long payload = sendSync(fd, eofmark); + unsigned long long payload = sendSync(context,eofmark); char buf[1024]; int original_output = config.output; @@ -6849,7 +6892,7 @@ static void slaveMode(void) { while(payload) { ssize_t nread; - nread = read(fd,buf,(payload > sizeof(buf)) ? sizeof(buf) : payload); + nread = readConn(context,buf,(payload > sizeof(buf)) ? sizeof(buf) : payload); if (nread <= 0) { fprintf(stderr,"Error reading RDB payload while SYNCing\n"); exit(1); @@ -6892,14 +6935,15 @@ static void slaveMode(void) { /* This function implements --rdb, so it uses the replication protocol in order * to fetch the RDB file from a remote server. */ static void getRDB(clusterManagerNode *node) { - int s, fd; + int fd; + redisContext *s; char *filename; if (node != NULL) { assert(node->context); - s = node->context->fd; + s = node->context; filename = clusterManagerGetNodeRDBFilename(node); } else { - s = context->fd; + s = context; filename = config.rdb_filename; } static char eofmark[RDB_EOF_MARK_SIZE]; @@ -6934,7 +6978,7 @@ static void getRDB(clusterManagerNode *node) { while(payload) { ssize_t nread, nwritten; - nread = read(s,buf,(payload > sizeof(buf)) ? sizeof(buf) : payload); + nread = readConn(s,buf,(payload > sizeof(buf)) ? sizeof(buf) : payload); if (nread <= 0) { fprintf(stderr,"I/O Error reading RDB payload from socket\n"); exit(1); @@ -6968,7 +7012,7 @@ static void getRDB(clusterManagerNode *node) { } else { fprintf(stderr,"Transfer finished with success.\n"); } - close(s); /* Close the file descriptor ASAP as fsync() may take time. */ + redisFree(s); /* Close the file descriptor ASAP as fsync() may take time. */ fsync(fd); close(fd); fprintf(stderr,"Transfer finished with success.\n"); @@ -6985,11 +7029,9 @@ static void getRDB(clusterManagerNode *node) { #define PIPEMODE_WRITE_LOOP_MAX_BYTES (128*1024) static void pipeMode(void) { - int fd = context->fd; long long errors = 0, replies = 0, obuf_len = 0, obuf_pos = 0; - char ibuf[1024*16], obuf[1024*16]; /* Input and output buffers */ + char obuf[1024*16]; /* Output buffer */ char aneterr[ANET_ERR_LEN]; - redisReader *reader = redisReaderCreate(); redisReply *reply; int eof = 0; /* True once we consumed all the standard input. */ int done = 0; @@ -6999,47 +7041,38 @@ static void pipeMode(void) { srand(time(NULL)); /* Use non blocking I/O. */ - if (anetNonBlock(aneterr,fd) == ANET_ERR) { + if (anetNonBlock(aneterr,context->fd) == ANET_ERR) { fprintf(stderr, "Can't set the socket in non blocking mode: %s\n", aneterr); exit(1); } + context->flags &= ~REDIS_BLOCK; + /* Transfer raw protocol and read replies from the server at the same * time. */ while(!done) { int mask = AE_READABLE; if (!eof || obuf_len != 0) mask |= AE_WRITABLE; - mask = aeWait(fd,mask,1000); + mask = aeWait(context->fd,mask,1000); /* Handle the readable state: we can read replies from the server. */ if (mask & AE_READABLE) { - ssize_t nread; int read_error = 0; - /* Read from socket and feed the hiredis reader. */ do { - nread = read(fd,ibuf,sizeof(ibuf)); - if (nread == -1 && errno != EAGAIN && errno != EINTR) { - fprintf(stderr, "Error reading from the server: %s\n", - strerror(errno)); + if (!read_error && redisBufferRead(context) == REDIS_ERR) { read_error = 1; - break; - } - if (nread > 0) { - redisReaderFeed(reader,ibuf,nread); - last_read_time = time(NULL); } - } while(nread > 0); - /* Consume replies. */ - do { - if (redisReaderGetReply(reader,(void**)&reply) == REDIS_ERR) { + reply = NULL; + if (redisGetReply(context, (void **) &reply) == REDIS_ERR) { fprintf(stderr, "Error reading replies from server\n"); exit(1); } if (reply) { + last_read_time = time(NULL); if (reply->type == REDIS_REPLY_ERROR) { fprintf(stderr,"%s\n", reply->str); errors++; @@ -7072,7 +7105,7 @@ static void pipeMode(void) { while(1) { /* Transfer current buffer to server. */ if (obuf_len != 0) { - ssize_t nwritten = write(fd,obuf+obuf_pos,obuf_len); + ssize_t nwritten = writeConn(context,obuf+obuf_pos,obuf_len); if (nwritten == -1) { if (errno != EAGAIN && errno != EINTR) { @@ -7088,6 +7121,10 @@ static void pipeMode(void) { loop_nwritten += nwritten; if (obuf_len != 0) break; /* Can't accept more data. */ } + if (context->err) { + fprintf(stderr, "Server I/O Error: %s\n", context->errstr); + exit(1); + } /* If buffer is empty, load from stdin. */ if (obuf_len == 0 && !eof) { ssize_t nread = read(STDIN_FILENO,obuf,sizeof(obuf)); @@ -7138,7 +7175,6 @@ static void pipeMode(void) { break; } } - redisReaderFree(reader); printf("errors: %lld, replies: %lld\n", errors, replies); if (errors) exit(1); @@ -7246,7 +7282,9 @@ static void getKeyTypes(dict *types_dict, redisReply *keys, typeinfo **types) { /* Pipeline TYPE commands */ for(i=0;i<keys->elements;i++) { - redisAppendCommand(context, "TYPE %s", keys->element[i]->str); + const char* argv[] = {"TYPE", keys->element[i]->str}; + size_t lens[] = {4, keys->element[i]->len}; + redisAppendCommandArgv(context, 2, argv, lens); } /* Retrieve types */ @@ -7292,15 +7330,21 @@ static void getKeySizes(redisReply *keys, typeinfo **types, if(!types[i] || (!types[i]->sizecmd && !memkeys)) continue; - if (!memkeys) - redisAppendCommand(context, "%s %s", - types[i]->sizecmd, keys->element[i]->str); - else if (memkeys_samples==0) - redisAppendCommand(context, "%s %s %s", - "MEMORY", "USAGE", keys->element[i]->str); - else - redisAppendCommand(context, "%s %s %s SAMPLES %u", - "MEMORY", "USAGE", keys->element[i]->str, memkeys_samples); + if (!memkeys) { + const char* argv[] = {types[i]->sizecmd, keys->element[i]->str}; + size_t lens[] = {strlen(types[i]->sizecmd), keys->element[i]->len}; + redisAppendCommandArgv(context, 2, argv, lens); + } else if (memkeys_samples==0) { + const char* argv[] = {"MEMORY", "USAGE", keys->element[i]->str}; + size_t lens[] = {6, 5, keys->element[i]->len}; + redisAppendCommandArgv(context, 3, argv, lens); + } else { + sds samplesstr = sdsfromlonglong(memkeys_samples); + const char* argv[] = {"MEMORY", "USAGE", keys->element[i]->str, "SAMPLES", samplesstr}; + size_t lens[] = {6, 5, keys->element[i]->len, 7, sdslen(samplesstr)}; + redisAppendCommandArgv(context, 5, argv, lens); + sdsfree(samplesstr); + } } /* Retrieve sizes */ @@ -7396,20 +7440,20 @@ static void findBigKeys(int memkeys, unsigned memkeys_samples) { sampled++; if(type->biggest<sizes[i]) { - printf( - "[%05.2f%%] Biggest %-6s found so far '%s' with %llu %s\n", - pct, type->name, keys->element[i]->str, sizes[i], - !memkeys? type->sizeunit: "bytes"); - /* Keep track of biggest key name for this type */ if (type->biggest_key) sdsfree(type->biggest_key); - type->biggest_key = sdsnew(keys->element[i]->str); + type->biggest_key = sdscatrepr(sdsempty(), keys->element[i]->str, keys->element[i]->len); if(!type->biggest_key) { fprintf(stderr, "Failed to allocate memory for key!\n"); exit(1); } + printf( + "[%05.2f%%] Biggest %-6s found so far '%s' with %llu %s\n", + pct, type->name, type->biggest_key, sizes[i], + !memkeys? type->sizeunit: "bytes"); + /* Keep track of the biggest size for this type */ type->biggest = sizes[i]; } @@ -7473,21 +7517,27 @@ static void getKeyFreqs(redisReply *keys, unsigned long long *freqs) { /* Pipeline OBJECT freq commands */ for(i=0;i<keys->elements;i++) { - redisAppendCommand(context, "OBJECT freq %s", keys->element[i]->str); + const char* argv[] = {"OBJECT", "FREQ", keys->element[i]->str}; + size_t lens[] = {6, 4, keys->element[i]->len}; + redisAppendCommandArgv(context, 3, argv, lens); } /* Retrieve freqs */ for(i=0;i<keys->elements;i++) { if(redisGetReply(context, (void**)&reply)!=REDIS_OK) { + sds keyname = sdscatrepr(sdsempty(), keys->element[i]->str, keys->element[i]->len); fprintf(stderr, "Error getting freq for key '%s' (%d: %s)\n", - keys->element[i]->str, context->err, context->errstr); + keyname, context->err, context->errstr); + sdsfree(keyname); exit(1); } else if(reply->type != REDIS_REPLY_INTEGER) { if(reply->type == REDIS_REPLY_ERROR) { fprintf(stderr, "Error: %s\n", reply->str); exit(1); } else { - fprintf(stderr, "Warning: OBJECT freq on '%s' failed (may have been deleted)\n", keys->element[i]->str); + sds keyname = sdscatrepr(sdsempty(), keys->element[i]->str, keys->element[i]->len); + fprintf(stderr, "Warning: OBJECT freq on '%s' failed (may have been deleted)\n", keyname); + sdsfree(keyname); freqs[i] = 0; } } else { @@ -7558,10 +7608,10 @@ static void findHotKeys(void) { memmove(hotkeys,hotkeys+1,sizeof(hotkeys[0])*k); } counters[k] = freqs[i]; - hotkeys[k] = sdsnew(keys->element[i]->str); + hotkeys[k] = sdscatrepr(sdsempty(), keys->element[i]->str, keys->element[i]->len); printf( "[%05.2f%%] Hot key '%s' found so far with counter %llu\n", - pct, keys->element[i]->str, freqs[i]); + pct, hotkeys[k], freqs[i]); } /* Sleep if we've been directed to do so */ |