49 KHASH_SET_INIT_INT64(64xx)
56 KHASH_INIT(sixt_int, corsaro_flowtuple_t*,
char, 0,
57 corsaro_flowtuple_hash_func, corsaro_flowtuple_hash_equal);
59 static kh_sixt_map_t *sixt_f = NULL;
60 static kh_sixt_int_t *sixt_v = NULL;
80 #define FIELD_ENABLED 1
82 static char *field_names[] = {
94 static int legacy = 0;
96 static int fields[FIELD_CNT];
98 static int value_field = -1;
136 static int init_corsaro(
char *corsarouri)
141 fprintf(stderr,
"could not alloc corsaro_in\n");
148 fprintf(stderr,
"could not alloc record\n");
156 fprintf(stderr,
"could not start corsaro\n");
165 static int add_inc(
void *h, corsaro_flowtuple_t *t, uint32_t value)
167 kh_sixt_map_t *hash = (kh_sixt_map_t *)h;
170 corsaro_flowtuple_t *new_6t = NULL;
171 kh_64xx_t *val_map = NULL;
173 assert(hash != NULL);
176 assert(value_field != VALUE);
179 if((khiter = kh_get(sixt_map, hash, t)) == kh_end(hash))
182 if((new_6t = malloc(
sizeof(corsaro_flowtuple_t))) == NULL)
184 corsaro_log_file(__func__, NULL,
"malloc failed");
189 memcpy(new_6t, t,
sizeof(corsaro_flowtuple_t));
192 khiter = kh_put(sixt_map, hash, new_6t, &khret);
195 val_map = kh_init(64xx);
198 kh_put(64xx, val_map, value, &khret);
201 kh_value(hash, khiter) = val_map;
206 kh_put(64xx, kh_value(hash, khiter), value, &khret);
211 static void dump_hash_map(kh_sixt_map_t *hash)
214 corsaro_flowtuple_t *key;
217 if(kh_size(hash) > 0)
219 for(k = kh_begin(hash); k != kh_end(hash); ++k)
221 if(kh_exist(hash, k))
223 key = kh_key(hash, k);
225 key->
packet_cnt = htonl(kh_size(kh_val(hash, k)));
228 kh_destroy(64xx, kh_val(hash, k));
235 kh_clear(sixt_map, hash);
238 static void dump_hash_int(kh_sixt_int_t *hash)
241 corsaro_flowtuple_t *key;
244 if(kh_size(hash) > 0)
246 for(k = kh_begin(hash); k != kh_end(hash); ++k)
248 if(kh_exist(hash, k))
250 key = kh_key(hash, k);
259 kh_clear(sixt_int, hash);
262 static void dump_hash()
264 assert(sixt_f || sixt_v);
266 corsaro_io_print_interval_start(&last_dump_end);
270 dump_hash_map(sixt_f);
274 dump_hash_int(sixt_v);
277 corsaro_io_print_interval_end(&last_interval_end);
282 last_interval_end.
number++;
284 last_dump_end.
time = last_interval_end.
time+1;
287 static int process_flowtuple(corsaro_flowtuple_t *tuple)
297 value = ntohl(tuple->
src_ip);
318 value = ntohs(tuple->
ip_len);
324 fprintf(stderr,
"ERROR: invalid value field number\n");
330 for(i = 0; i < FIELD_CNT; i++)
332 if(fields[i] != FIELD_ENABLED)
364 fprintf(stderr,
"ERROR: invalid field number\n");
373 if(value_field == VALUE)
377 fprintf(stderr,
"couldn't increment flowtuple packet_cnt value\n");
383 if(
add_inc(sixt_f, tuple, value) != 0)
385 fprintf(stderr,
"could not add value to map");
393 static void usage(
const char *name)
396 "usage: %s [-l] [-i interval] [-v value_field] -f field [-f field]... file_list\n"
397 " -l treat the input files as containing legacy format data\n"
398 " -i <interval> new distribution interval in seconds. (default: 0)\n"
399 " a value of -1 aggregates to a single interval\n"
400 " a value of 0 uses the original interval\n"
401 " -v <value> field to use as aggregation value (default: packet_cnt)\n"
402 " -f <field> a tuple field to re-aggregate with\n"
404 "Supported field names are:\n"
405 " src_ip, dst_ip, src_port, dst_port, protocol, ttl, tcp_flags, \n"
406 " ip_len, packet_cnt\n",
410 int main(
int argc,
char *argv[])
418 char *flist_name = NULL;
428 corsaro_flowtuple_t *tuple;
430 int wanted_n_fields = 0;
432 while((opt = getopt(argc, argv,
"li:f:v:?")) >= 0)
443 interval = atoi(optarg);
450 for(i = 0; i < FIELD_CNT; i++)
452 if(strcmp(optarg, field_names[i]) == 0)
454 fields[i] = FIELD_ENABLED;
464 fprintf(stderr,
"WARNING: Multiple value fields detected\n"
465 "Last specified will be used\n");
468 for(i = 0; i < FIELD_CNT; i++)
470 if(strcmp(optarg, field_names[i]) == 0)
491 for(i = 0; i < FIELD_CNT; i++)
493 fields[i] = FIELD_ENABLED;
497 if(wanted_n_fields != field_cnt)
499 fprintf(stderr,
"Invalid field name\n");
504 if(optind != argc - 1)
512 fprintf(stderr,
"No value field specified. Defaulting to packet count\n");
517 flist_name = argv[optind];
520 if(strcmp(flist_name,
"-") == 0)
524 else if((flist = fopen(flist_name,
"r")) == NULL)
526 fprintf(stderr,
"failed to open list of input files (%s)\n"
527 "NB: File List MUST be sorted\n", flist_name);
532 if(value_field == VALUE)
534 sixt_v = kh_init(sixt_int);
538 sixt_f = kh_init(sixt_map);
541 while(fgets(file,
sizeof(file), flist) != NULL)
544 file[strlen(file)-1] =
'\0';
546 fprintf(stderr,
"processing %s\n", file);
549 if(init_corsaro(file) != 0)
551 fprintf(stderr,
"failed to init corsaro\n");
557 if(last_interval_end.
time > 0)
559 last_interval_end.
time+=legacy;
570 if(interval_record->
time <= last_dump_end.
time)
572 fprintf(stderr,
"ERROR: decrease in timestamp.\n"
573 "Are the input files sorted properly?\n");
578 if(flowtuple_cnt == 0)
580 last_dump_end.
time = interval_record->
time;
585 if(last_interval_end.
time > 0)
589 last_interval_end.
time-=legacy;
594 else if(interval > 0)
596 while(interval_record->
time >= next_interval)
610 last_interval_end.
time = interval_record->
time;
618 process_flowtuple(tuple);
627 fprintf(stderr,
"corsaro_in_read_record failed to read record\n");
636 if((sixt_f != NULL && kh_size(sixt_f) > 0)
637 || (sixt_v != NULL && kh_size(sixt_v) > 0))
645 kh_destroy(sixt_map, sixt_f);
652 kh_destroy(sixt_int, sixt_v);