summaryrefslogtreecommitdiff
path: root/src/redis-cli.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/redis-cli.c')
-rw-r--r--src/redis-cli.c158
1 files changed, 104 insertions, 54 deletions
diff --git a/src/redis-cli.c b/src/redis-cli.c
index 75845f346..0148964bf 100644
--- a/src/redis-cli.c
+++ b/src/redis-cli.c
@@ -1989,6 +1989,7 @@ static void repl(void) {
if (argv == NULL) {
printf("Invalid argument(s)\n");
+ fflush(stdout);
linenoiseFree(line);
continue;
} else if (argc > 0) {
@@ -6784,10 +6785,53 @@ void sendCapa() {
sendReplconf("capa", "eof");
}
+/* Wrapper around hiredis to allow arbitrary reads and writes.
+ *
+ * We piggybacks on top of hiredis to achieve transparent TLS support,
+ * and use its internal buffers so it can co-exist with commands
+ * previously/later issued on the connection.
+ *
+ * Interface is close to enough to read()/write() so things should mostly
+ * work transparently.
+ */
+
+/* Write a raw buffer through a redisContext. If we already have something
+ * in the buffer (leftovers from hiredis operations) it will be written
+ * as well.
+ */
+static ssize_t writeConn(redisContext *c, const char *buf, size_t buf_len)
+{
+ int done = 0;
+
+ c->obuf = sdscatlen(c->obuf, buf, buf_len);
+ if (redisBufferWrite(c, &done) == REDIS_ERR) {
+ sdsrange(c->obuf, 0, -(buf_len+1));
+ if (!(c->flags & REDIS_BLOCK))
+ errno = EAGAIN;
+ return -1;
+ }
+
+ size_t left = sdslen(c->obuf);
+ sdsclear(c->obuf);
+ if (!done) {
+ return buf_len - left;
+ }
+
+ return buf_len;
+}
+
+/* Read raw bytes through a redisContext. The read operation is not greedy
+ * and may not fill the buffer entirely.
+ */
+static ssize_t readConn(redisContext *c, char *buf, size_t len)
+{
+ return c->funcs->read(c, buf, len);
+}
+
/* Sends SYNC and reads the number of bytes in the payload. Used both by
* slaveMode() and getRDB().
* returns 0 in case an EOF marker is used. */
-unsigned long long sendSync(int fd, char *out_eof) {
+unsigned long long sendSync(redisContext *c, char *out_eof) {
/* To start we need to send the SYNC command and return the payload.
* The hiredis client lib does not understand this part of the protocol
* and we don't want to mess with its buffers, so everything is performed
@@ -6796,7 +6840,7 @@ unsigned long long sendSync(int fd, char *out_eof) {
ssize_t nread;
/* Send the SYNC command. */
- if (write(fd,"SYNC\r\n",6) != 6) {
+ if (writeConn(c, "SYNC\r\n", 6) != 6) {
fprintf(stderr,"Error writing to master\n");
exit(1);
}
@@ -6804,7 +6848,7 @@ unsigned long long sendSync(int fd, char *out_eof) {
/* Read $<payload>\r\n, making sure to read just up to "\n" */
p = buf;
while(1) {
- nread = read(fd,p,1);
+ nread = readConn(c,p,1);
if (nread <= 0) {
fprintf(stderr,"Error reading bulk length while SYNCing\n");
exit(1);
@@ -6825,11 +6869,10 @@ unsigned long long sendSync(int fd, char *out_eof) {
}
static void slaveMode(void) {
- int fd = context->fd;
static char eofmark[RDB_EOF_MARK_SIZE];
static char lastbytes[RDB_EOF_MARK_SIZE];
static int usemark = 0;
- unsigned long long payload = sendSync(fd, eofmark);
+ unsigned long long payload = sendSync(context,eofmark);
char buf[1024];
int original_output = config.output;
@@ -6849,7 +6892,7 @@ static void slaveMode(void) {
while(payload) {
ssize_t nread;
- nread = read(fd,buf,(payload > sizeof(buf)) ? sizeof(buf) : payload);
+ nread = readConn(context,buf,(payload > sizeof(buf)) ? sizeof(buf) : payload);
if (nread <= 0) {
fprintf(stderr,"Error reading RDB payload while SYNCing\n");
exit(1);
@@ -6892,14 +6935,15 @@ static void slaveMode(void) {
/* This function implements --rdb, so it uses the replication protocol in order
* to fetch the RDB file from a remote server. */
static void getRDB(clusterManagerNode *node) {
- int s, fd;
+ int fd;
+ redisContext *s;
char *filename;
if (node != NULL) {
assert(node->context);
- s = node->context->fd;
+ s = node->context;
filename = clusterManagerGetNodeRDBFilename(node);
} else {
- s = context->fd;
+ s = context;
filename = config.rdb_filename;
}
static char eofmark[RDB_EOF_MARK_SIZE];
@@ -6934,7 +6978,7 @@ static void getRDB(clusterManagerNode *node) {
while(payload) {
ssize_t nread, nwritten;
- nread = read(s,buf,(payload > sizeof(buf)) ? sizeof(buf) : payload);
+ nread = readConn(s,buf,(payload > sizeof(buf)) ? sizeof(buf) : payload);
if (nread <= 0) {
fprintf(stderr,"I/O Error reading RDB payload from socket\n");
exit(1);
@@ -6968,7 +7012,7 @@ static void getRDB(clusterManagerNode *node) {
} else {
fprintf(stderr,"Transfer finished with success.\n");
}
- close(s); /* Close the file descriptor ASAP as fsync() may take time. */
+ redisFree(s); /* Close the file descriptor ASAP as fsync() may take time. */
fsync(fd);
close(fd);
fprintf(stderr,"Transfer finished with success.\n");
@@ -6985,11 +7029,9 @@ static void getRDB(clusterManagerNode *node) {
#define PIPEMODE_WRITE_LOOP_MAX_BYTES (128*1024)
static void pipeMode(void) {
- int fd = context->fd;
long long errors = 0, replies = 0, obuf_len = 0, obuf_pos = 0;
- char ibuf[1024*16], obuf[1024*16]; /* Input and output buffers */
+ char obuf[1024*16]; /* Output buffer */
char aneterr[ANET_ERR_LEN];
- redisReader *reader = redisReaderCreate();
redisReply *reply;
int eof = 0; /* True once we consumed all the standard input. */
int done = 0;
@@ -6999,47 +7041,38 @@ static void pipeMode(void) {
srand(time(NULL));
/* Use non blocking I/O. */
- if (anetNonBlock(aneterr,fd) == ANET_ERR) {
+ if (anetNonBlock(aneterr,context->fd) == ANET_ERR) {
fprintf(stderr, "Can't set the socket in non blocking mode: %s\n",
aneterr);
exit(1);
}
+ context->flags &= ~REDIS_BLOCK;
+
/* Transfer raw protocol and read replies from the server at the same
* time. */
while(!done) {
int mask = AE_READABLE;
if (!eof || obuf_len != 0) mask |= AE_WRITABLE;
- mask = aeWait(fd,mask,1000);
+ mask = aeWait(context->fd,mask,1000);
/* Handle the readable state: we can read replies from the server. */
if (mask & AE_READABLE) {
- ssize_t nread;
int read_error = 0;
- /* Read from socket and feed the hiredis reader. */
do {
- nread = read(fd,ibuf,sizeof(ibuf));
- if (nread == -1 && errno != EAGAIN && errno != EINTR) {
- fprintf(stderr, "Error reading from the server: %s\n",
- strerror(errno));
+ if (!read_error && redisBufferRead(context) == REDIS_ERR) {
read_error = 1;
- break;
- }
- if (nread > 0) {
- redisReaderFeed(reader,ibuf,nread);
- last_read_time = time(NULL);
}
- } while(nread > 0);
- /* Consume replies. */
- do {
- if (redisReaderGetReply(reader,(void**)&reply) == REDIS_ERR) {
+ reply = NULL;
+ if (redisGetReply(context, (void **) &reply) == REDIS_ERR) {
fprintf(stderr, "Error reading replies from server\n");
exit(1);
}
if (reply) {
+ last_read_time = time(NULL);
if (reply->type == REDIS_REPLY_ERROR) {
fprintf(stderr,"%s\n", reply->str);
errors++;
@@ -7072,7 +7105,7 @@ static void pipeMode(void) {
while(1) {
/* Transfer current buffer to server. */
if (obuf_len != 0) {
- ssize_t nwritten = write(fd,obuf+obuf_pos,obuf_len);
+ ssize_t nwritten = writeConn(context,obuf+obuf_pos,obuf_len);
if (nwritten == -1) {
if (errno != EAGAIN && errno != EINTR) {
@@ -7088,6 +7121,10 @@ static void pipeMode(void) {
loop_nwritten += nwritten;
if (obuf_len != 0) break; /* Can't accept more data. */
}
+ if (context->err) {
+ fprintf(stderr, "Server I/O Error: %s\n", context->errstr);
+ exit(1);
+ }
/* If buffer is empty, load from stdin. */
if (obuf_len == 0 && !eof) {
ssize_t nread = read(STDIN_FILENO,obuf,sizeof(obuf));
@@ -7138,7 +7175,6 @@ static void pipeMode(void) {
break;
}
}
- redisReaderFree(reader);
printf("errors: %lld, replies: %lld\n", errors, replies);
if (errors)
exit(1);
@@ -7246,7 +7282,9 @@ static void getKeyTypes(dict *types_dict, redisReply *keys, typeinfo **types) {
/* Pipeline TYPE commands */
for(i=0;i<keys->elements;i++) {
- redisAppendCommand(context, "TYPE %s", keys->element[i]->str);
+ const char* argv[] = {"TYPE", keys->element[i]->str};
+ size_t lens[] = {4, keys->element[i]->len};
+ redisAppendCommandArgv(context, 2, argv, lens);
}
/* Retrieve types */
@@ -7292,15 +7330,21 @@ static void getKeySizes(redisReply *keys, typeinfo **types,
if(!types[i] || (!types[i]->sizecmd && !memkeys))
continue;
- if (!memkeys)
- redisAppendCommand(context, "%s %s",
- types[i]->sizecmd, keys->element[i]->str);
- else if (memkeys_samples==0)
- redisAppendCommand(context, "%s %s %s",
- "MEMORY", "USAGE", keys->element[i]->str);
- else
- redisAppendCommand(context, "%s %s %s SAMPLES %u",
- "MEMORY", "USAGE", keys->element[i]->str, memkeys_samples);
+ if (!memkeys) {
+ const char* argv[] = {types[i]->sizecmd, keys->element[i]->str};
+ size_t lens[] = {strlen(types[i]->sizecmd), keys->element[i]->len};
+ redisAppendCommandArgv(context, 2, argv, lens);
+ } else if (memkeys_samples==0) {
+ const char* argv[] = {"MEMORY", "USAGE", keys->element[i]->str};
+ size_t lens[] = {6, 5, keys->element[i]->len};
+ redisAppendCommandArgv(context, 3, argv, lens);
+ } else {
+ sds samplesstr = sdsfromlonglong(memkeys_samples);
+ const char* argv[] = {"MEMORY", "USAGE", keys->element[i]->str, "SAMPLES", samplesstr};
+ size_t lens[] = {6, 5, keys->element[i]->len, 7, sdslen(samplesstr)};
+ redisAppendCommandArgv(context, 5, argv, lens);
+ sdsfree(samplesstr);
+ }
}
/* Retrieve sizes */
@@ -7396,20 +7440,20 @@ static void findBigKeys(int memkeys, unsigned memkeys_samples) {
sampled++;
if(type->biggest<sizes[i]) {
- printf(
- "[%05.2f%%] Biggest %-6s found so far '%s' with %llu %s\n",
- pct, type->name, keys->element[i]->str, sizes[i],
- !memkeys? type->sizeunit: "bytes");
-
/* Keep track of biggest key name for this type */
if (type->biggest_key)
sdsfree(type->biggest_key);
- type->biggest_key = sdsnew(keys->element[i]->str);
+ type->biggest_key = sdscatrepr(sdsempty(), keys->element[i]->str, keys->element[i]->len);
if(!type->biggest_key) {
fprintf(stderr, "Failed to allocate memory for key!\n");
exit(1);
}
+ printf(
+ "[%05.2f%%] Biggest %-6s found so far '%s' with %llu %s\n",
+ pct, type->name, type->biggest_key, sizes[i],
+ !memkeys? type->sizeunit: "bytes");
+
/* Keep track of the biggest size for this type */
type->biggest = sizes[i];
}
@@ -7473,21 +7517,27 @@ static void getKeyFreqs(redisReply *keys, unsigned long long *freqs) {
/* Pipeline OBJECT freq commands */
for(i=0;i<keys->elements;i++) {
- redisAppendCommand(context, "OBJECT freq %s", keys->element[i]->str);
+ const char* argv[] = {"OBJECT", "FREQ", keys->element[i]->str};
+ size_t lens[] = {6, 4, keys->element[i]->len};
+ redisAppendCommandArgv(context, 3, argv, lens);
}
/* Retrieve freqs */
for(i=0;i<keys->elements;i++) {
if(redisGetReply(context, (void**)&reply)!=REDIS_OK) {
+ sds keyname = sdscatrepr(sdsempty(), keys->element[i]->str, keys->element[i]->len);
fprintf(stderr, "Error getting freq for key '%s' (%d: %s)\n",
- keys->element[i]->str, context->err, context->errstr);
+ keyname, context->err, context->errstr);
+ sdsfree(keyname);
exit(1);
} else if(reply->type != REDIS_REPLY_INTEGER) {
if(reply->type == REDIS_REPLY_ERROR) {
fprintf(stderr, "Error: %s\n", reply->str);
exit(1);
} else {
- fprintf(stderr, "Warning: OBJECT freq on '%s' failed (may have been deleted)\n", keys->element[i]->str);
+ sds keyname = sdscatrepr(sdsempty(), keys->element[i]->str, keys->element[i]->len);
+ fprintf(stderr, "Warning: OBJECT freq on '%s' failed (may have been deleted)\n", keyname);
+ sdsfree(keyname);
freqs[i] = 0;
}
} else {
@@ -7558,10 +7608,10 @@ static void findHotKeys(void) {
memmove(hotkeys,hotkeys+1,sizeof(hotkeys[0])*k);
}
counters[k] = freqs[i];
- hotkeys[k] = sdsnew(keys->element[i]->str);
+ hotkeys[k] = sdscatrepr(sdsempty(), keys->element[i]->str, keys->element[i]->len);
printf(
"[%05.2f%%] Hot key '%s' found so far with counter %llu\n",
- pct, keys->element[i]->str, freqs[i]);
+ pct, hotkeys[k], freqs[i]);
}
/* Sleep if we've been directed to do so */