zmap-freebsd/lib/redis.c
2013-08-16 11:12:47 -04:00

262 lines
5.6 KiB
C

/*
* ZMap Redis Helpers Copyright 2013 Regents of the University of Michigan
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy
* of the License at http://www.apache.org/licenses/LICENSE-2.0
*/
#include <string.h>
#include "redis.h"
#include <stdlib.h>
#include <stdio.h>
#include "assert.h"
#include "logger.h"
#include <stdint.h>
#include <hiredis/hiredis.h>
#define REDIS_UNIX_PATH "/tmp/redis.sock"
#define REDIS_TIMEOUT 2
#undef MIN
#define MIN(X,Y) ((X) < (Y) ? (X) : (Y))
static redisContext *rctx;
static redisContext* redis_connect(void)
{
struct timeval timeout;
timeout.tv_sec = REDIS_TIMEOUT;
timeout.tv_usec = 0;
return (redisContext*) redisConnectUnixWithTimeout(REDIS_UNIX_PATH,
timeout);
}
static int chkerr(redisReply *reply)
{
assert(rctx);
if (reply == NULL || reply->type == REDIS_REPLY_ERROR) {
log_error("redis", "an error occurred when "
"retreiving item from redis: %s",
rctx->errstr);
if (reply) {
freeReplyObject(reply);
}
return -1;
}
return 0;
}
int redis_init(void)
{
rctx = redis_connect();
if (!rctx) {
return -1;
}
return 0;
}
int redis_close(void)
{
redisFree(rctx);
return 0;
}
redisContext* redis_get_context(void)
{
return rctx;
}
int redis_flush(void)
{
redisReply *reply = (redisReply*) redisCommand(rctx, "FLUSHDB");
if (chkerr(reply)) {
return -1;
}
freeReplyObject(reply);
return 0;
}
int redis_existconf(const char *name)
{
assert(rctx);
redisReply *reply = (redisReply*) redisCommand(rctx, "EXISTS %s", name);
if (chkerr(reply)) {
return -1;
}
int v = reply->integer;
freeReplyObject(reply);
return v;
}
int redis_delconf(const char *name)
{
assert(rctx);
redisReply *reply = (redisReply*) redisCommand(rctx, "DEL %s", name);
if (chkerr(reply)) {
return -1;
}
freeReplyObject(reply);
return 0;
}
int redis_setconf(const char *name, char *value)
{
assert(rctx);
redisReply *reply = (redisReply*) redisCommand(rctx, "SET %s %s",
name, value);
if (chkerr(reply)) {
return -1;
}
freeReplyObject(reply);
return 0;
}
int redis_getconf(const char *name, char *buf, size_t maxlen)
{
assert(rctx);
redisReply *reply = (redisReply*) redisCommand(rctx, "GET %s", name);
if (chkerr(reply)) {
return -1;
}
strncpy(buf, reply->str, maxlen);
freeReplyObject(reply);
return 0;
}
uint32_t redis_getconf_uint32_t(const char *key)
{
assert(rctx);
char buf[50];
redis_getconf(key, buf, 50);
return atoi(buf);
}
int redis_setconf_uint32_t(const char *key, uint32_t value)
{
assert(rctx);
char buf[50];
sprintf(buf, "%u", value);
return redis_setconf(key, buf);
}
static long redis_get_sizeof(const char *cmd, const char *name)
{
assert(rctx);
redisReply *reply;
reply = (redisReply*) redisCommand(rctx, "%s %s", cmd, name);
assert(reply);
assert(reply->type == REDIS_REPLY_INTEGER);
long rtr = reply->integer;
freeReplyObject(reply);
return rtr;
}
long redis_get_sizeof_list(const char *name)
{
return redis_get_sizeof("LLEN", name);
}
long redis_get_sizeof_set(const char *name)
{
return redis_get_sizeof("SCARD", name);
}
int redis_pull(char *redisqueuename, void *buf,
int maxload, size_t obj_size, int *numloaded, const char* cmd)
{
assert(rctx);
long elems_in_redis = redis_get_sizeof_list(redisqueuename);
long num_to_add = MIN(elems_in_redis, maxload);
log_info("redis", "INFO: redis load called on %s. Transfering %li "
"of %li elements to in-memory queue.",
redisqueuename,
num_to_add, elems_in_redis);
for(int i=0; i < num_to_add; i++) {
redisAppendCommand(rctx, "%s %s", cmd, redisqueuename);
}
for(int i=0; i < num_to_add; i++) {
redisReply *reply;
int rc = redisGetReply(rctx, (void**) &reply);
if (rc != REDIS_OK) {
log_fatal("redis", "response from redis != REDIS_OK");
return -1;
}
if (!reply) {
log_fatal("redis", "no reply provided by redis.");
return -1;
}
if (reply->type != REDIS_REPLY_STRING) {
log_fatal("redis",
"unxpected reply type from redis.");
return -1;
}
if ((size_t)reply->len != obj_size) {
log_fatal("redis", "ERROR: unexpected lengthed "
"object provided by redis.\n");
return -1;
}
memcpy((void*)((intptr_t)buf+i*obj_size), reply->str, obj_size);
freeReplyObject(reply);
}
*numloaded = num_to_add;
return 0;
}
int redis_lpull(char *redisqueuename, void *buf,
int maxload, size_t obj_size, int *numloaded)
{
return redis_pull(redisqueuename, buf,
maxload, obj_size, numloaded, "LPOP");
}
int redis_spull(char *redisqueuename, void *buf,
int maxload, size_t obj_size, int *numloaded)
{
return redis_pull(redisqueuename, buf,
maxload, obj_size, numloaded, "SRAND");
}
static int redis_push(char *redisqueuename,
void *buf, int num, size_t len, const char *cmd)
{
assert(rctx);
for (int i=0; i < num; i++) {
void* load = (void*)((intptr_t)buf + i*len);
int rc = redisAppendCommand(rctx, "%s %s %b",
cmd, redisqueuename, load, len);
if (rc != REDIS_OK || rctx->err) {
log_fatal("redis", "%s", rctx->errstr);
return -1;
}
}
redisReply *reply;
for (int i=0; i < num; i++) {
if (redisGetReply(rctx, (void**) &reply) != REDIS_OK
|| rctx->err) {
log_fatal("redis","%s", rctx->errstr);
return -1;
}
if (reply->type == REDIS_REPLY_ERROR) {
log_fatal("redis", "%s", rctx->errstr);
return -1;
}
freeReplyObject(reply);
}
return 0;
}
int redis_lpush(char *redisqueuename,
void *buf, int num, size_t len)
{
return redis_push(redisqueuename, buf, num, len, "RPUSH");
}
int redis_spush(char *redisqueuename,
void *buf, int num, size_t len)
{
return redis_push(redisqueuename, buf, num, len, "SADD");
}