zmap-freebsd/lib/redis.c
2013-10-09 15:11:23 -04:00

334 lines
7.5 KiB
C

/*
* ZMap Redis Helpers Copyright 2013 Regents of the University of Michigan
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy
* of the License at http://www.apache.org/licenses/LICENSE-2.0
*/
#include "redis.h"
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <stdint.h>
#include <assert.h>
#include <hiredis/hiredis.h>
#include "logger.h"
#define REDIS_TIMEOUT 2
#undef MIN
#define MIN(X,Y) ((X) < (Y) ? (X) : (Y))
static redisContext *rctx;
#define T_TCP 0
#define T_LOCAL 1
typedef struct redisconf {
int type;
char *path;
char *server;
uint16_t port;
char *list_name;
} redisconf_t;
static redisconf_t *redis_parse_connstr(char *connstr)
{
redisconf_t *retv = malloc(sizeof(redisconf_t));
if (strcmp("tcp://", connstr) == 6) {
char *servername = malloc(strlen(connstr));
assert(servername);
char *list_name = malloc(strlen(connstr));
assert(list_name);
uint16_t port;
if (scanf(connstr, "tcp://%s:%u/%s", servername,
port, list_name) != 3) {
log_fatal("redis", "unable to parse redis connection string. This "
"should be of the form tcp://server:port/list-name "
"for TCP connections. All fields are required.");
}
retv->type = T_TCP;
retv->server = servername;
retv->port = port;
retv->list_name = list_name;
retv->path = NULL;
} else if (strcmp("local://", connstr) == 8) {
char *path = malloc(strlen(connstr));
assert(path);
char *list_name = malloc(strlen(connstr));
assert(list_name);
if (scanf(connstr, "local://%s/%s", path,
list_name) != 3) {
log_fatal("redis", "unable to parse redis connection string. This "
"should be of the form tcp://server:port/list-name "
"for TCP connections. All fields are required.");
}
retv->type = T_LOCAL;
retv->list_name = list_name;
retv->path = path;
retv->server = NULL;
retv->port = 0;
} else {
log_fatal("redis", "unable to parse connection string. does not begin with "
"unix:// or tcp:// as expected");
}
}
static redisContext* redis_connect(char *connstr)
{
redisconf_t *c;
// handle old behavior where we only connected to a specific
// socket that we #defined.
if (!connstr) {
c = malloc(sizeof(redisconf_t));
assert(c);
c->type = T_LOCAL;
c->path = "/tmp/redis.sock";
} else {
c = redis_parse_connstr(connstr);
assert(c);
}
struct timeval timeout;
timeout.tv_sec = REDIS_TIMEOUT;
timeout.tv_usec = 0;
if (c->type == T_LOCAL) {
return (redisContext*) redisConnectUnixWithTimeout(c->path,
timeout);
} else {
return (redisContext*) redisConnectWithTimeout(c->server,
c->port, timeout);
}
}
static int chkerr(redisReply *reply)
{
assert(rctx);
if (reply == NULL || reply->type == REDIS_REPLY_ERROR) {
log_error("redis", "an error occurred when "
"retreiving item from redis: %s",
rctx->errstr);
if (reply) {
freeReplyObject(reply);
}
return -1;
}
return 0;
}
int redis_init(char *connstr)
{
rctx = redis_connect(connstr);
if (!rctx) {
return -1;
}
return 0;
}
int redis_close(void)
{
redisFree(rctx);
return 0;
}
redisContext* redis_get_context(void)
{
return rctx;
}
int redis_flush(void)
{
redisReply *reply = (redisReply*) redisCommand(rctx, "FLUSHDB");
if (chkerr(reply)) {
return -1;
}
freeReplyObject(reply);
return 0;
}
int redis_existconf(const char *name)
{
assert(rctx);
redisReply *reply = (redisReply*) redisCommand(rctx, "EXISTS %s", name);
if (chkerr(reply)) {
return -1;
}
int v = reply->integer;
freeReplyObject(reply);
return v;
}
int redis_delconf(const char *name)
{
assert(rctx);
redisReply *reply = (redisReply*) redisCommand(rctx, "DEL %s", name);
if (chkerr(reply)) {
return -1;
}
freeReplyObject(reply);
return 0;
}
int redis_setconf(const char *name, char *value)
{
assert(rctx);
redisReply *reply = (redisReply*) redisCommand(rctx, "SET %s %s",
name, value);
if (chkerr(reply)) {
return -1;
}
freeReplyObject(reply);
return 0;
}
int redis_getconf(const char *name, char *buf, size_t maxlen)
{
assert(rctx);
redisReply *reply = (redisReply*) redisCommand(rctx, "GET %s", name);
if (chkerr(reply)) {
return -1;
}
strncpy(buf, reply->str, maxlen);
freeReplyObject(reply);
return 0;
}
uint32_t redis_getconf_uint32_t(const char *key)
{
assert(rctx);
char buf[50];
redis_getconf(key, buf, 50);
return atoi(buf);
}
int redis_setconf_uint32_t(const char *key, uint32_t value)
{
assert(rctx);
char buf[50];
sprintf(buf, "%u", value);
return redis_setconf(key, buf);
}
static long redis_get_sizeof(const char *cmd, const char *name)
{
assert(rctx);
redisReply *reply;
reply = (redisReply*) redisCommand(rctx, "%s %s", cmd, name);
assert(reply);
assert(reply->type == REDIS_REPLY_INTEGER);
long rtr = reply->integer;
freeReplyObject(reply);
return rtr;
}
long redis_get_sizeof_list(const char *name)
{
return redis_get_sizeof("LLEN", name);
}
long redis_get_sizeof_set(const char *name)
{
return redis_get_sizeof("SCARD", name);
}
int redis_pull(char *redisqueuename, void *buf,
int maxload, size_t obj_size, int *numloaded, const char* cmd)
{
assert(rctx);
long elems_in_redis = redis_get_sizeof_list(redisqueuename);
long num_to_add = MIN(elems_in_redis, maxload);
log_info("redis", "INFO: redis load called on %s. Transfering %li "
"of %li elements to in-memory queue.",
redisqueuename,
num_to_add, elems_in_redis);
for(int i=0; i < num_to_add; i++) {
redisAppendCommand(rctx, "%s %s", cmd, redisqueuename);
}
for(int i=0; i < num_to_add; i++) {
redisReply *reply;
int rc = redisGetReply(rctx, (void**) &reply);
if (rc != REDIS_OK) {
log_fatal("redis", "response from redis != REDIS_OK");
return -1;
}
if (!reply) {
log_fatal("redis", "no reply provided by redis.");
return -1;
}
if (reply->type != REDIS_REPLY_STRING) {
log_fatal("redis",
"unxpected reply type from redis.");
return -1;
}
if ((size_t)reply->len != obj_size) {
log_fatal("redis", "ERROR: unexpected lengthed "
"object provided by redis.\n");
return -1;
}
memcpy((void*)((intptr_t)buf+i*obj_size), reply->str, obj_size);
freeReplyObject(reply);
}
*numloaded = num_to_add;
return 0;
}
int redis_lpull(char *redisqueuename, void *buf,
int maxload, size_t obj_size, int *numloaded)
{
return redis_pull(redisqueuename, buf,
maxload, obj_size, numloaded, "LPOP");
}
int redis_spull(char *redisqueuename, void *buf,
int maxload, size_t obj_size, int *numloaded)
{
return redis_pull(redisqueuename, buf,
maxload, obj_size, numloaded, "SRAND");
}
static int redis_push(char *redisqueuename,
void *buf, int num, size_t len, const char *cmd)
{
assert(rctx);
for (int i=0; i < num; i++) {
void* load = (void*)((intptr_t)buf + i*len);
int rc = redisAppendCommand(rctx, "%s %s %b",
cmd, redisqueuename, load, len);
if (rc != REDIS_OK || rctx->err) {
log_fatal("redis", "%s", rctx->errstr);
return -1;
}
}
redisReply *reply;
for (int i=0; i < num; i++) {
if (redisGetReply(rctx, (void**) &reply) != REDIS_OK
|| rctx->err) {
log_fatal("redis","%s", rctx->errstr);
return -1;
}
if (reply->type == REDIS_REPLY_ERROR) {
log_fatal("redis", "%s", rctx->errstr);
return -1;
}
freeReplyObject(reply);
}
return 0;
}
int redis_lpush(char *redisqueuename,
void *buf, int num, size_t len)
{
return redis_push(redisqueuename, buf, num, len, "RPUSH");
}
int redis_spush(char *redisqueuename,
void *buf, int num, size_t len)
{
return redis_push(redisqueuename, buf, num, len, "SADD");
}