From 13ca18479ae01381739b1fbfffde25d6006df904 Mon Sep 17 00:00:00 2001 From: Zakir Durumeric Date: Wed, 9 Oct 2013 15:11:23 -0400 Subject: [PATCH] compiling redis support --- lib/redis.c | 88 ++++++++++++++++++++++++++++--- lib/redis.h | 2 +- src/cyclic.c | 3 +- src/output_modules/module_redis.c | 48 +++++++---------- src/zmap.c | 8 +++ 5 files changed, 110 insertions(+), 39 deletions(-) diff --git a/lib/redis.c b/lib/redis.c index 20bf4e7..aeacf08 100644 --- a/lib/redis.c +++ b/lib/redis.c @@ -6,16 +6,18 @@ * of the License at http://www.apache.org/licenses/LICENSE-2.0 */ -#include #include "redis.h" + +#include #include #include -#include "assert.h" -#include "logger.h" #include +#include + #include -#define REDIS_UNIX_PATH "/tmp/redis.sock" +#include "logger.h" + #define REDIS_TIMEOUT 2 #undef MIN @@ -23,13 +25,83 @@ static redisContext *rctx; -static redisContext* redis_connect(void) +#define T_TCP 0 +#define T_LOCAL 1 + +typedef struct redisconf { + int type; + char *path; + char *server; + uint16_t port; + char *list_name; +} redisconf_t; + +static redisconf_t *redis_parse_connstr(char *connstr) { + redisconf_t *retv = malloc(sizeof(redisconf_t)); + if (strcmp("tcp://", connstr) == 6) { + char *servername = malloc(strlen(connstr)); + assert(servername); + char *list_name = malloc(strlen(connstr)); + assert(list_name); + uint16_t port; + if (scanf(connstr, "tcp://%s:%u/%s", servername, + port, list_name) != 3) { + log_fatal("redis", "unable to parse redis connection string. This " + "should be of the form tcp://server:port/list-name " + "for TCP connections. All fields are required."); + } + retv->type = T_TCP; + retv->server = servername; + retv->port = port; + retv->list_name = list_name; + retv->path = NULL; + } else if (strcmp("local://", connstr) == 8) { + char *path = malloc(strlen(connstr)); + assert(path); + char *list_name = malloc(strlen(connstr)); + assert(list_name); + if (scanf(connstr, "local://%s/%s", path, + list_name) != 3) { + log_fatal("redis", "unable to parse redis connection string. This " + "should be of the form tcp://server:port/list-name " + "for TCP connections. All fields are required."); + } + retv->type = T_LOCAL; + retv->list_name = list_name; + retv->path = path; + retv->server = NULL; + retv->port = 0; + } else { + log_fatal("redis", "unable to parse connection string. does not begin with " + "unix:// or tcp:// as expected"); + } +} + +static redisContext* redis_connect(char *connstr) +{ + redisconf_t *c; + // handle old behavior where we only connected to a specific + // socket that we #defined. + if (!connstr) { + c = malloc(sizeof(redisconf_t)); + assert(c); + c->type = T_LOCAL; + c->path = "/tmp/redis.sock"; + } else { + c = redis_parse_connstr(connstr); + assert(c); + } struct timeval timeout; timeout.tv_sec = REDIS_TIMEOUT; timeout.tv_usec = 0; - return (redisContext*) redisConnectUnixWithTimeout(REDIS_UNIX_PATH, + if (c->type == T_LOCAL) { + return (redisContext*) redisConnectUnixWithTimeout(c->path, timeout); + } else { + return (redisContext*) redisConnectWithTimeout(c->server, + c->port, timeout); + } } static int chkerr(redisReply *reply) @@ -47,9 +119,9 @@ static int chkerr(redisReply *reply) return 0; } -int redis_init(void) +int redis_init(char *connstr) { - rctx = redis_connect(); + rctx = redis_connect(connstr); if (!rctx) { return -1; } diff --git a/lib/redis.h b/lib/redis.h index 229136c..e382375 100644 --- a/lib/redis.h +++ b/lib/redis.h @@ -5,7 +5,7 @@ #ifndef REDIS_ZHELPERS_H #define REDIS_ZHELPERS_H -int redis_init(void); +int redis_init(char*); int redis_close(void); diff --git a/src/cyclic.c b/src/cyclic.c index 9f53690..814a49c 100644 --- a/src/cyclic.c +++ b/src/cyclic.c @@ -169,7 +169,8 @@ int cyclic_init(uint32_t primroot_, uint32_t current_) for (uint32_t i=0; i num_addrs) { cur_group = &groups[i]; - log_debug("cyclic", "using prime %lu, known_primroot %lu", cur_group->prime, cur_group->known_primroot); + log_debug("cyclic", "using prime %lu, known_primroot %lu", + cur_group->prime, cur_group->known_primroot); prime = groups[i].prime; break; } diff --git a/src/output_modules/module_redis.c b/src/output_modules/module_redis.c index a6dc8f8..703ad55 100644 --- a/src/output_modules/module_redis.c +++ b/src/output_modules/module_redis.c @@ -22,55 +22,46 @@ #define UNUSED __attribute__((unused)) -typedef struct scannable_t { - in_addr_t ip_address; - uint8_t source; -} scannable_t; - -#define QUEUE_NAME "zmap_results" #define BUFFER_SIZE 500 #define SOURCE_ZMAP 0 -static scannable_t* buffer; +static uint32_t *buffer; static int buffer_fill = 0; +static char *queue_name = NULL; -int redismodule_init(UNUSED struct state_conf *conf) +static int redismodule_init(struct state_conf *conf, char **fields, int fieldlens) { - buffer = calloc(BUFFER_SIZE, sizeof(scannable_t)); + assert(fieldlens == 1); + buffer = calloc(BUFFER_SIZE, sizeof(uint32_t)); assert(buffer); buffer_fill = 0; - return redis_init(); + return redis_init(conf->output_args); } -int redismodule_flush(void) +static int redismodule_flush(void) { - if (redis_lpush((char *)QUEUE_NAME, buffer, - buffer_fill, sizeof(scannable_t))) { + if (redis_lpush((char *)queue_name, buffer, + buffer_fill, sizeof(uint32_t))) { return EXIT_FAILURE; } buffer_fill = 0; return EXIT_SUCCESS; } -int redismodule_newip(ipaddr_n_t saddr, UNUSED ipaddr_n_t daddr, - UNUSED const char *response_type, int is_repeat, - UNUSED int in_cooldown, UNUSED const u_char *packet, UNUSED size_t len) +static int redismodule_process(fieldset_t *fs) { - if (!is_repeat) { - buffer[buffer_fill].ip_address = saddr; - buffer[buffer_fill].source = SOURCE_ZMAP; - - if (++buffer_fill == BUFFER_SIZE) { - if (redismodule_flush()) { - return EXIT_FAILURE; - } + field_t *f = &(fs->fields[0]); + buffer[buffer_fill] = (uint32_t) f->value.num; + if (++buffer_fill == BUFFER_SIZE) { + if (redismodule_flush()) { + return EXIT_FAILURE; } } return EXIT_SUCCESS; } -int redismodule_close(UNUSED struct state_conf* c, - UNUSED struct state_send* s, +static int redismodule_close(UNUSED struct state_conf* c, + UNUSED struct state_send* s, UNUSED struct state_recv* r) { if (redismodule_flush()) { @@ -83,13 +74,12 @@ int redismodule_close(UNUSED struct state_conf* c, } output_module_t module_redis = { - .name = "redis", + .name = "redis-packed", .init = &redismodule_init, .start = NULL, .update = NULL, .update_interval = 0, .close = &redismodule_close, - .success_ip = &redismodule_newip, - .other_ip = NULL + .process_ip = &redismodule_process }; diff --git a/src/zmap.c b/src/zmap.c index 1356489..dee0b9e 100644 --- a/src/zmap.c +++ b/src/zmap.c @@ -429,6 +429,14 @@ int main(int argc, char *argv[]) "seqnum, acknum, cooldown, " "repeat, timestamp-str"; zconf.filter_duplicates = 0; + } else if (!strcmp(args.output_module_arg, "redis")) { + log_warn("zmap", "the redis output interface has been deprecated and " + "will be removed in the future. Users should " + "either use redis-packed or redis-json in the " + "future."); + zconf.output_module = get_output_module_by_name("redis-packed"); + zconf.raw_output_fields = (char*) "saddr"; + zconf.filter_duplicates = 1; } else { zconf.output_module = get_output_module_by_name(args.output_module_arg); if (!zconf.output_module) {