Skip to Content
[CAIDA - Cooperative Association for Internet Data Analysis logo]
The Cooperative Association for Internet Data Analysis
cors-ft-aggregate.c
Go to the documentation of this file.
1 /*
2  * corsaro
3  *
4  * Alistair King, CAIDA, UC San Diego
5  * corsaro-info@caida.org
6  *
7  * Copyright (C) 2012 The Regents of the University of California.
8  *
9  * This file is part of corsaro.
10  *
11  * corsaro is free software: you can redistribute it and/or modify
12  * it under the terms of the GNU General Public License as published by
13  * the Free Software Foundation, either version 3 of the License, or
14  * (at your option) any later version.
15  *
16  * corsaro is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19  * GNU General Public License for more details.
20  *
21  * You should have received a copy of the GNU General Public License
22  * along with corsaro. If not, see <http://www.gnu.org/licenses/>.
23  *
24  */
25 
26 #include <assert.h>
27 #include <getopt.h>
28 #include <stdio.h>
29 #include <stdlib.h>
30 #include <string.h>
31 
32 #include "libtrace.h"
33 
34 #include "corsaro.h"
35 #include "corsaro_log.h"
36 #include "corsaro_io.h"
37 
38 #include "corsaro_flowtuple.h"
39 
49 KHASH_SET_INIT_INT64(64xx)
50 
51 /*KHASH_INIT(sixt, corsaro_flowtuple_t*, khint32_t, 1,*/
52 KHASH_INIT(sixt_map, corsaro_flowtuple_t*, kh_64xx_t*, 1,
54 
56 KHASH_INIT(sixt_int, corsaro_flowtuple_t*, char, 0,
57  corsaro_flowtuple_hash_func, corsaro_flowtuple_hash_equal);
58 
/* Aggregation tables: main() allocates exactly one of these -- sixt_v when
 * the value field is packet_cnt, sixt_f for any other value field. */
59 static kh_sixt_map_t *sixt_f = NULL;
60 static kh_sixt_int_t *sixt_v = NULL;
61 
/* Input handle and reusable record for the file currently being read;
 * both are released and reset to NULL by clean(). */
62 static corsaro_in_t *corsaro = NULL;
63 static corsaro_in_record_t *record = NULL;
64 
/* Output interval length in seconds, set by -i: 0 keeps the source
 * intervals, -1 aggregates everything into one interval, >0 re-bins. */
66 static int interval = 0;
67 
/* Indices of the flowtuple fields that can be used as aggregation keys (-f)
 * or as the aggregated value (-v). FIELD_CNT must stay last: it is the
 * number of fields and sizes the tables below. */
enum {
  SRC_IP = 0,
  DST_IP,
  SRC_PORT,
  DST_PORT,
  PROTO,
  TTL,
  TCP_FLAGS,
  IP_LEN,
  VALUE,
  FIELD_CNT
};

/* Marker stored in fields[i] when field i is part of the aggregation key. */
enum { FIELD_ENABLED = 1 };

/* Command-line names of the fields, indexed by the constants above.
 * Designated initializers keep each name bound to its index. */
static const char *const field_names[FIELD_CNT] = {
  [SRC_IP]    = "src_ip",
  [DST_IP]    = "dst_ip",
  [SRC_PORT]  = "src_port",
  [DST_PORT]  = "dst_port",
  [PROTO]     = "protocol",
  [TTL]       = "ttl",
  [TCP_FLAGS] = "tcp_flags",
  [IP_LEN]    = "ip_len",
  [VALUE]     = "packet_cnt",
};

/* Set by -l: the input is old-format data whose interval end is one greater
 * than the convention currently used. */
static int legacy = 0;

/* fields[i] == FIELD_ENABLED when field i is kept in the aggregation key;
 * every other field is zeroed before aggregation. */
static int fields[FIELD_CNT];

/* Index of the field used as the aggregated value; -1 until -v is parsed. */
static int value_field = -1;
99 
/* Number of flowtuple records read so far, across all input files. */
101 static uint64_t flowtuple_cnt = 0;
102 
/* NOTE(review): the head of this initializer is missing from this view --
 * presumably the `last_dump_end` interval record that dump_hash() and
 * main() use; restore it from upstream. */
107  0,
108  0
109 };
110 
/* Time value at which the next re-binned interval is dumped when -i > 0;
 * advanced by `interval` after each dump. */
112 static int next_interval = 0;
/* NOTE(review): the head of this initializer is also missing from this
 * view -- presumably the `last_interval_end` interval record. */
117  0,
118  0
119 };
120 
/* Release the current input record and input handle, if any, and reset the
 * globals to NULL so that clean() is safe to call more than once. */
121 static void clean()
122 {
123  if(record != NULL)
124  {
125  corsaro_in_free_record(record);
126  record = NULL;
127  }
128 
129  if(corsaro != NULL)
130  {
/* NOTE(review): a line is missing from this view here; presumably it
 * finalizes/frees the corsaro_in_t handle. As shown, the handle would
 * leak when the pointer is dropped below. */
132  corsaro = NULL;
133  }
134 }
135 
136 static int init_corsaro(char *corsarouri)
137 {
138  /* get an corsaro_in object */
139  if((corsaro = corsaro_alloc_input(corsarouri)) == NULL)
140  {
141  fprintf(stderr, "could not alloc corsaro_in\n");
142  clean();
143  return -1;
144  }
145 
146  /* get a record */
147  if ((record = corsaro_in_alloc_record(corsaro)) == NULL) {
148  fprintf(stderr, "could not alloc record\n");
149  clean();
150  return -1;
151  }
152 
153  /* start corsaro */
154  if(corsaro_start_input(corsaro) != 0)
155  {
156  fprintf(stderr, "could not start corsaro\n");
157  clean();
158  return -1;
159  }
160 
161  return 0;
162 }
163 
165 static int add_inc(void *h, corsaro_flowtuple_t *t, uint32_t value)
166 {
167  kh_sixt_map_t *hash = (kh_sixt_map_t *)h;
168  int khret;
169  khiter_t khiter;
170  corsaro_flowtuple_t *new_6t = NULL;
171  kh_64xx_t *val_map = NULL;
172 
173  assert(hash != NULL);
174 
175  /* this function must not be used to aggregate for packet counts */
176  assert(value_field != VALUE);
177 
178  /* check if this is in the hash already */
179  if((khiter = kh_get(sixt_map, hash, t)) == kh_end(hash))
180  {
181  /* create a new tuple struct */
182  if((new_6t = malloc(sizeof(corsaro_flowtuple_t))) == NULL)
183  {
184  corsaro_log_file(__func__, NULL, "malloc failed");
185  return -1;
186  }
187 
188  /* fill it */
189  memcpy(new_6t, t, sizeof(corsaro_flowtuple_t));
190 
191  /* add it to the hash */
192  khiter = kh_put(sixt_map, hash, new_6t, &khret);
193 
194  /* create a new map for this key */
195  val_map = kh_init(64xx);
196 
197  /* add this value to the map */
198  kh_put(64xx, val_map, value, &khret);
199 
200  /* set the value in the hash to the map */
201  kh_value(hash, khiter) = val_map;
202  }
203  else
204  {
205  /* simply add this value to the map */
206  kh_put(64xx, kh_value(hash, khiter), value, &khret);
207  }
208  return 0;
209 }
210 
/* Emit every tuple in a value-set map, then empty the map.
 *
 * For each key, packet_cnt is overwritten (in network byte order) with the
 * number of distinct values collected for that tuple, and the key's value
 * set is destroyed. Afterwards kh_free (with corsaro_flowtuple_free,
 * presumably releasing each key) and kh_clear empty the table; the table
 * itself is not destroyed, so it can be reused for the next interval. */
211 static void dump_hash_map(kh_sixt_map_t *hash)
212 {
213  khiter_t k;
214  corsaro_flowtuple_t *key;
215 
216  /* dump the hash */
217  if(kh_size(hash) > 0)
218  {
219  for(k = kh_begin(hash); k != kh_end(hash); ++k)
220  {
221  if(kh_exist(hash, k))
222  {
223  key = kh_key(hash, k);
224  /*key->packet_cnt = htonl(kh_val(hash,k));*/
/* the reported "count" is the number of distinct values for this tuple */
225  key->packet_cnt = htonl(kh_size(kh_val(hash, k)));
/* NOTE(review): a line is missing from this view here; presumably it
 * outputs `key` -- confirm against upstream. */
227  /* free the map while we still have a pointer to it */
228  kh_destroy(64xx, kh_val(hash, k));
229  }
230  }
231  }
232 
233  /* empty the hash */
234  kh_free(sixt_map, hash, &corsaro_flowtuple_free);
235  kh_clear(sixt_map, hash);
236 }
237 
/* Emit every tuple in the packet-count table, then empty it.
 *
 * After the walk, kh_free (with corsaro_flowtuple_free, presumably releasing
 * each key) and kh_clear empty the table; the table itself is not destroyed,
 * so it can be reused for the next interval. */
238 static void dump_hash_int(kh_sixt_int_t *hash)
239 {
240  khiter_t k;
241  corsaro_flowtuple_t *key;
242 
243  /* dump the hash */
244  if(kh_size(hash) > 0)
245  {
246  for(k = kh_begin(hash); k != kh_end(hash); ++k)
247  {
248  if(kh_exist(hash, k))
249  {
250  key = kh_key(hash, k);
251  /*key->packet_cnt = htonl(kh_val(hash,k));*/
/* NOTE(review): a line is missing from this view here; presumably it
 * outputs `key`. As shown, the loop only reads the key. */
253  }
254  }
255  }
256 
257  /* empty the hash */
258  kh_free(sixt_int, hash, &corsaro_flowtuple_free);
259  kh_clear(sixt_int, hash);
260 }
261 
262 static void dump_hash()
263 {
264  assert(sixt_f || sixt_v);
265 
266  corsaro_io_print_interval_start(&last_dump_end);
267 
268  if(sixt_f != NULL)
269  {
270  dump_hash_map(sixt_f);
271  }
272  else
273  {
274  dump_hash_int(sixt_v);
275  }
276 
277  corsaro_io_print_interval_end(&last_interval_end);
278 
279  /* move on to the next interval start */
280  last_dump_end.number++;
281  /* move on to the next interval end */
282  last_interval_end.number++;
283  /* translate from int_end to int_start */
284  last_dump_end.time = last_interval_end.time+1;
285 }
286 
/* Aggregate one flowtuple record.
 *
 * Steps:
 *  1. Read the configured value field (-v) from `tuple`, converting
 *     multi-byte fields to host byte order.
 *  2. Zero, in place, every field not enabled via -f.
 *  3. Add the masked tuple to the active table: sixt_v via
 *     corsaro_flowtuple_add_inc when the value field is packet_cnt,
 *     otherwise sixt_f via add_inc.
 *
 * Returns 0 on success. Returns -1 on an invalid field index (after calling
 * clean()) or when the table update fails. */
287 static int process_flowtuple(corsaro_flowtuple_t *tuple)
288 {
289  int i;
290 
/* NOTE(review): src_ip/dst_ip/packet_cnt values above INT_MAX are stored
 * into this signed int (implementation-defined conversion) before add_inc
 * takes them as uint32_t; a uint32_t here would avoid that. */
291  int value;
292 
293  /* work out which field from the tuple we want to use as the value */
294  switch(value_field)
295  {
296  case SRC_IP:
297  value = ntohl(tuple->src_ip);
298  break;
299  case DST_IP:
300  value = ntohl(CORSARO_FLOWTUPLE_SIXT_TO_IP(tuple));
301  break;
302  case SRC_PORT:
303  value = ntohs(tuple->src_port);
304  break;
305  case DST_PORT:
306  value = ntohs(tuple->dst_port);
307  break;
308  case PROTO:
309  value = tuple->protocol;
310  break;
311  case TTL:
312  value = tuple->ttl;
313  break;
314  case TCP_FLAGS:
315  value = tuple->tcp_flags;
316  break;
317  case IP_LEN:
318  value = ntohs(tuple->ip_len);
319  break;
320  case VALUE:
321  value = ntohl(tuple->packet_cnt);
322  break;
323  default:
324  fprintf(stderr, "ERROR: invalid value field number\n");
/* NOTE(review): clean() releases the global input handle and record, but
 * main() ignores this function's return value and keeps reading. */
325  clean();
326  return -1;
327  }
328 
329  /* zero out the fields we do not care about */
330  for(i = 0; i < FIELD_CNT; i++)
331  {
332  if(fields[i] != FIELD_ENABLED)
333  {
334  switch(i)
335  {
336  case SRC_IP:
337  tuple->src_ip = 0;
338  break;
339  case DST_IP:
/* NOTE(review): the statement that clears the destination IP is missing
 * from this view; as shown, DST_IP is never zeroed. */
341  break;
342  case SRC_PORT:
343  tuple->src_port = 0;
344  break;
345  case DST_PORT:
346  tuple->dst_port = 0;
347  break;
348  case PROTO:
349  tuple->protocol = 0;
350  break;
351  case TTL:
352  tuple->ttl = 0;
353  break;
354  case TCP_FLAGS:
355  tuple->tcp_flags = 0;
356  break;
357  case IP_LEN:
358  tuple->ip_len = 0;
359  break;
360  case VALUE:
361  tuple->packet_cnt = 0;
362  break;
363  default:
364  fprintf(stderr, "ERROR: invalid field number\n");
365  clean();
366  return -1;
367  }
368  }
369  }
370 
371  /* check if this stripped down flowtuple is already in the hash,
372  if not, add it.*/
373  if(value_field == VALUE)
374  {
375  if(corsaro_flowtuple_add_inc(sixt_v, tuple, value) != 0)
376  {
377  fprintf(stderr, "couldn't increment flowtuple packet_cnt value\n");
378  return -1;
379  }
380  }
381  else
382  {
383  if(add_inc(sixt_f, tuple, value) != 0)
384  {
385  fprintf(stderr, "could not add value to map");
386  return -1;
387  }
388  }
389 
390  return 0;
391 }
392 
/*
 * Print command-line help for this tool to stderr.
 *
 * The synopsis shows -f as optional: when no -f is given, main() enables
 * every field. file_list may be "-" to read the list from stdin, and the
 * files it names must be in time order (main() rejects decreasing
 * timestamps).
 *
 * @param name  program name shown in the synopsis (normally argv[0])
 */
static void usage(const char *name)
{
  fprintf(stderr,
          "usage: %s [-l] [-i interval] [-v value_field] [-f field]... file_list\n"
          "  -l            treat the input files as containing legacy format data\n"
          "  -i <interval> new distribution interval in seconds. (default: 0)\n"
          "                a value of -1 aggregates to a single interval\n"
          "                a value of 0 uses the original interval\n"
          "  -v <value>    field to use as aggregation value (default: packet_cnt)\n"
          "  -f <field>    a tuple field to re-aggregate with (may be repeated;\n"
          "                default: all fields)\n"
          "  file_list     file listing the input files, one per line, sorted by\n"
          "                time (use - to read the list from stdin)\n"
          "\n"
          "Supported field names are:\n"
          "  src_ip, dst_ip, src_port, dst_port, protocol, ttl, tcp_flags, \n"
          "  ip_len, packet_cnt\n",
          name);
}
409 
/* Entry point: re-aggregate corsaro flowtuple files.
 *
 * Options:
 *  -f  key fields (default: all fields)
 *  -v  value field (default: packet_cnt)
 *  -i  output interval
 *  -l  legacy interval-end handling
 * The single positional argument names a file ("-" for stdin) that lists the
 * input files, one per line, in time order. Each input file is opened with
 * init_corsaro() and its records are read. The aggregation table is dumped
 * at interval boundaries and once more after the last file.
 *
 * Returns 0 on success and -1 on setup/read errors. Option-parsing errors
 * call exit(-1); "-?" prints usage and calls exit(0).
 *
 * NOTE(review): several lines of this function are missing from this view:
 *  - the declaration of `type`;
 *  - the record-type tests that open the three branches of the read loop;
 *  - the data fetch for interval records;
 *  - the statement that resets `type`.
 * Restore them from upstream before building. */
410 int main(int argc, char *argv[])
411 {
412  int opt;
413  int i;
414 
415  int field_cnt = 0;
416 
418  char *flist_name = NULL;
420  FILE *flist = NULL;
422  char file[1024];
423 
425  off_t len = 0;
426 
427  corsaro_interval_t *interval_record;
428  corsaro_flowtuple_t *tuple;
429 
430  int wanted_n_fields = 0;
431 
432  while((opt = getopt(argc, argv, "li:f:v:?")) >= 0)
433  {
434  switch(opt)
435  {
436  case 'l':
437  /* the user has indicated they're giving us old format data
438  which has the interval end at +1 than we currently use */
439  legacy = 1;
440  break;
441 
442  case 'i':
/* NOTE(review): atoi() cannot report malformed input ("-i abc" yields 0);
 * strtol with end-pointer checking would catch it. */
443  interval = atoi(optarg);
444  break;
445 
446  case 'f':
447  wanted_n_fields++;
448  /* figure out what field they have asked for and then set the
449  appropriate field in the bitmap */
450  for(i = 0; i < FIELD_CNT; i++)
451  {
452  if(strcmp(optarg, field_names[i]) == 0)
453  {
454  fields[i] = FIELD_ENABLED;
455  field_cnt++;
456  break;
457  }
458  }
459  break;
460 
461  case 'v':
462  if(value_field >= 0)
463  {
464  fprintf(stderr, "WARNING: Multiple value fields detected\n"
465  "Last specified will be used\n");
466  }
467  /* figure out what value they have asked for */
468  for(i = 0; i < FIELD_CNT; i++)
469  {
470  if(strcmp(optarg, field_names[i]) == 0)
471  {
472  value_field = i;
473  break;
474  }
475  }
476  break;
477 
/* getopt() also returns '?' for unknown options and missing option
 * arguments, so those print usage and exit with status 0 as well.
 * NOTE(review): consider a non-zero status for them. */
478  case '?':
479  usage(argv[0]);
480  exit(0);
481  break;
482 
483  default:
484  usage(argv[0]);
485  exit(-1);
486  }
487  }
488 
/* no (valid) -f given: aggregate on every field */
489  if(field_cnt < 1)
490  {
491  for(i = 0; i < FIELD_CNT; i++)
492  {
493  fields[i] = FIELD_ENABLED;
494  }
495  }
496 
/* any -f argument that did not match a known name is an error */
497  if(wanted_n_fields != field_cnt)
498  {
499  fprintf(stderr, "Invalid field name\n");
500  usage(argv[0]);
501  exit(-1);
502  }
503 
/* exactly one positional argument (the file list) is required */
504  if(optind != argc - 1)
505  {
506  usage(argv[0]);
507  exit(-1);
508  }
509 
/* NOTE(review): an unrecognised -v name is ignored by the loop in the
 * option parser, so it also lands here and silently falls back to
 * packet_cnt. */
510  if(value_field < 0)
511  {
512  fprintf(stderr, "No value field specified. Defaulting to packet count\n");
513  value_field = VALUE;
514  }
515 
516  /* argv[1] is the list of corsaro files */
517  flist_name = argv[optind];
518 
519  /* read each file in the list */
520  if(strcmp(flist_name, "-") == 0)
521  {
522  flist = stdin;
523  }
524  else if((flist = fopen(flist_name, "r")) == NULL)
525  {
526  fprintf(stderr, "failed to open list of input files (%s)\n"
527  "NB: File List MUST be sorted\n", flist_name);
528  return -1;
529  }
530 
531  /* initialize the hash */
532  if(value_field == VALUE)
533  {
534  sixt_v = kh_init(sixt_int);
535  }
536  else
537  {
538  sixt_f = kh_init(sixt_map);
539  }
540 
541  while(fgets(file, sizeof(file), flist) != NULL)
542  {
543  /* chomp off the newline */
/* NOTE(review): this drops the last character unconditionally. If the final
 * line has no trailing newline, its last character is lost.
 * file[strcspn(file, "\n")] = '\0' avoids that. */
544  file[strlen(file)-1] = '\0';
545 
546  fprintf(stderr, "processing %s\n", file);
547 
548  /* this must be done before corsaro_init_output */
549  if(init_corsaro(file) != 0)
550  {
551  fprintf(stderr, "failed to init corsaro\n");
552  clean();
553  return -1;
554  }
555 
556  /* dirty hack to not -1 on the last interval in the previous file */
557  if(last_interval_end.time > 0)
558  {
559  last_interval_end.time+=legacy;
560  }
561 
562  while ((len = corsaro_in_read_record(corsaro, &type, record)) > 0) {
563  /* we want to know the current time, so we will watch for interval start
564  records */
/* NOTE(review): the condition opening this interval-start branch is missing
 * from this view. */
566  {
567  interval_record = (corsaro_interval_t *)
569 
570  if(interval_record->time <= last_dump_end.time)
571  {
572  fprintf(stderr, "ERROR: decrease in timestamp.\n"
573  "Are the input files sorted properly?\n");
574  clean();
575  return -1;
576  }
577 
/* first interval seen: anchor the output start and the first re-bin edge */
578  if(flowtuple_cnt == 0)
579  {
580  last_dump_end.time = interval_record->time;
581  next_interval = interval_record->time + interval;
582  }
583 
584  /* an interval of 0 means dump at the source interval */
585  if(last_interval_end.time > 0)
586  {
587  /* this was a non-end interval, if it is legacy, subtract
588  one from the last_interval_end time */
589  last_interval_end.time-=legacy;
590  if(interval == 0)
591  {
592  dump_hash();
593  }
594  else if(interval > 0)
595  {
596  while(interval_record->time >= next_interval)
597  {
598  dump_hash();
599  next_interval += interval;
600  }
601  }
602  /* else, if interval < 0, only dump at the end */
603  }
604  }
/* NOTE(review): the `else if` condition for this branch is missing from this
 * view; presumably it matches interval-end records. */
606  {
607  interval_record = (corsaro_interval_t *)
609 
610  last_interval_end.time = interval_record->time;
611 
612  }
/* NOTE(review): the `else if` condition for this flowtuple branch is missing
 * from this view. */
614  {
615  tuple = (corsaro_flowtuple_t *)corsaro_in_get_record_data(corsaro, record);
616  flowtuple_cnt++;
617 
/* NOTE(review): the return value is ignored. On its invalid-field paths,
 * process_flowtuple() calls clean(), which NULLs `corsaro`/`record`, yet
 * this loop keeps passing them to corsaro_in_read_record(). Aggregation
 * failures are also silently dropped. */
618  process_flowtuple(tuple);
619  }
620 
621  /* reset the type to NULL to indicate we don't care */
623  }
624 
625  if(len < 0)
626  {
627  fprintf(stderr, "corsaro_in_read_record failed to read record\n");
628  clean();
629  return -1;
630  }
631 
632  clean();
633  }
634 
635  /* dump again if the hash is not empty */
636  if((sixt_f != NULL && kh_size(sixt_f) > 0)
637  || (sixt_v != NULL && kh_size(sixt_v) > 0))
638  {
639  dump_hash();
640  }
641 
/* release whichever aggregation table was allocated, including its keys */
642  if(sixt_f != NULL)
643  {
644  kh_free(sixt_map, sixt_f, &corsaro_flowtuple_free);
645  kh_destroy(sixt_map, sixt_f);
646  sixt_f = NULL;
647  }
648 
649  if(sixt_v != NULL)
650  {
651  kh_free(sixt_int, sixt_v, &corsaro_flowtuple_free);
652  kh_destroy(sixt_int, sixt_v);
653  sixt_v = NULL;
654  }
655 
656  fclose(flist);
657  return 0;
658 }