Skip to Content
[CAIDA - Cooperative Association for Internet Data Analysis logo]
The Cooperative Association for Internet Data Analysis
corsaro.c
Go to the documentation of this file.
1 /*
2  * corsaro
3  *
4  * Alistair King, CAIDA, UC San Diego
5  * corsaro-info@caida.org
6  *
7  * Copyright (C) 2012 The Regents of the University of California.
8  *
9  * This file is part of corsaro.
10  *
11  * corsaro is free software: you can redistribute it and/or modify
12  * it under the terms of the GNU General Public License as published by
13  * the Free Software Foundation, either version 3 of the License, or
14  * (at your option) any later version.
15  *
16  * corsaro is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19  * GNU General Public License for more details.
20  *
21  * You should have received a copy of the GNU General Public License
22  * along with corsaro. If not, see <http://www.gnu.org/licenses/>.
23  *
24  */
25 
26 #include "config.h"
27 #include "corsaro_int.h"
28 
29 #include <assert.h>
30 #include <inttypes.h>
31 #include <stdlib.h>
32 #include <string.h>
33 
34 #include "libtrace.h"
35 
36 #include "corsaro_file.h"
37 #include "corsaro_io.h"
38 #include "corsaro_log.h"
39 #include "utils.h"
40 
49 static corsaro_packet_t *corsaro_packet_alloc(corsaro_t *corsaro)
50 {
51  corsaro_packet_t *pkt;
52 
53  if((pkt = malloc_zero(sizeof(corsaro_packet_t))) == NULL)
54  {
55  corsaro_log(__func__, corsaro, "could not malloc corsaro_packet");
56  return NULL;
57  }
58  /* this corsaro packet still needs a libtrace packet to be loaded... */
59  return pkt;
60 }
61 
62 static void corsaro_packet_state_reset(corsaro_packet_t *packet)
63 {
64  assert(packet != NULL);
65  packet->state.flags = 0;
66 }
67 
68 static void corsaro_packet_free(corsaro_packet_t *packet)
69 {
70  /* we will assume that somebody else is taking care of the libtrace packet */
71  if(packet != NULL)
72  {
73  free(packet);
74  }
75 }
76 
77 static void corsaro_free(corsaro_t *corsaro)
78 {
79  corsaro_plugin_t *p = NULL;
80 
81  if(corsaro == NULL)
82  {
83  /* nothing to be done... */
84  return;
85  }
86 
87  if(corsaro->uridata != NULL)
88  {
89  free(corsaro->uridata);
90  corsaro->uridata = NULL;
91  }
92 
93  if(corsaro->monitorname != NULL)
94  {
95  free(corsaro->monitorname);
96  corsaro->monitorname = NULL;
97  }
98 
99  if(corsaro->template != NULL)
100  {
101  free(corsaro->template);
102  corsaro->template = NULL;
103  }
104 
105  if(corsaro->global_file != NULL)
106  {
107  corsaro_file_close(corsaro, corsaro->global_file);
108  corsaro->global_file = NULL;
109  }
110 
111  corsaro_log_close(corsaro);
112 
113  if(corsaro->packet != NULL)
114  {
115  corsaro_packet_free(corsaro->packet);
116  corsaro->packet = NULL;
117  }
118 
119  if(corsaro->plugin_manager != NULL)
120  {
121  while((p = corsaro_plugin_next(corsaro->plugin_manager, p)) != NULL)
122  {
123  p->close_output(corsaro);
124  }
125 
126  corsaro_plugin_manager_free(corsaro->plugin_manager);
127  corsaro->plugin_manager = NULL;
128  }
129 
130  free(corsaro);
131 
132  return;
133 }
134 
135 static void populate_interval(corsaro_interval_t *interval, uint32_t number,
136  uint32_t time)
137 {
138  interval->corsaro_magic = CORSARO_MAGIC;
139  interval->magic = CORSARO_MAGIC_INTERVAL;
140  interval->number = number;
141  interval->time = time;
142 }
143 
144 static corsaro_t *corsaro_init(char *template, corsaro_file_mode_t mode)
145 {
146  corsaro_t *e;
147 
148  if((e = malloc_zero(sizeof(corsaro_t))) == NULL)
149  {
150  corsaro_log(__func__, NULL, "could not malloc corsaro_t");
151  return NULL;
152  }
153 
154  /* what time is it? */
155  gettimeofday_wrap(&e->init_time);
156 
157  /* uridata doesn't *need* to be set */
158 
159  /* monitorname also doesn't need to be set - we have a default from config */
160 
161  /* template does, however */
162  /* check that it is valid */
163  if(corsaro_io_validate_template(e, template) == 0)
164  {
165  corsaro_log(__func__, e, "invalid template %s", template);
166  goto err;
167  }
168  if((e->template = strdup(template)) == NULL)
169  {
170  corsaro_log(__func__, e,
171  "could not duplicate template string (no memory?)");
172  goto err;
173  }
174 
175  /* as does the mode */
176  e->output_mode = mode;
177 
178  /* check if compression should be used based on the file name */
179  e->compress = corsaro_file_detect_compression(e, e->template);
180 
181  /* use the default compression level for now */
183 
184  /* we have the template and file details, now we can open the global file */
185  if((e->global_file = corsaro_io_prepare_file(e, CORSARO_IO_GLOBAL_NAME)) == NULL)
186  {
187  corsaro_log(__func__, e, "could not open global output file");
188  goto err;
189  }
190 
191  /* initialize the log file */
192  if(corsaro_log_init(e) != 0)
193  {
194  corsaro_log(__func__, e, "could not initialize log file");
195  goto err;
196  }
197 
198  /* lets get us a wrapper packet ready */
199  if((e->packet = corsaro_packet_alloc(e)) == NULL)
200  {
201  corsaro_log(__func__, e, "could not create corsaro packet");
202  goto err;
203  }
204 
205  /* ask the plugin manager to get us some plugins */
206  /* this will init all compiled plugins but not start them, this gives
207  us a chance to wait for the user to choose a subset to enable
208  with corsaro_enable_plugin and then we'll re-init */
209  if((e->plugin_manager = corsaro_plugin_manager_init(e->logfile)) == NULL)
210  {
211  corsaro_log(__func__, e, "could not initialize plugin manager");
212  goto err;
213  }
214 
215  /* interval doesn't need to be actively set, use the default for now */
217 
218  /* initialize the current interval */
219  populate_interval(&e->interval_start, 0, 0);
220 
221  /* the rest are zero, as they should be. */
222 
223  /* ready to rock and roll! */
224 
225  return e;
226 
227  err:
228  corsaro_free(e);
229  return NULL;
230 }
231 
232 static int start_interval(corsaro_t *corsaro, struct timeval int_start)
233 {
234  corsaro_plugin_t *tmp = NULL;
235 
236  /* record this so we know when the interval started */
237  /* the interval number is already incremented by end_interval */
238  corsaro->interval_start.time = int_start.tv_sec;
239 
240  /* ask each plugin to start a new interval */
241  while((tmp = corsaro_plugin_next(corsaro->plugin_manager, tmp)) != NULL)
242  {
243  if(tmp->start_interval(corsaro, &corsaro->interval_start) != 0)
244  {
245  corsaro_log(__func__, corsaro, "%s failed to start interval at %ld",
246  tmp->name, int_start.tv_sec);
247  return -1;
248  }
249  }
250  return 0;
251 }
252 
253 
254 static int end_interval(corsaro_t *corsaro, struct timeval int_end)
255 {
256  corsaro_plugin_t *tmp = NULL;
257 
258  corsaro_interval_t interval_end;
259 
260  populate_interval(&interval_end, corsaro->interval_start.number,
261  int_end.tv_sec);
262 
263  /* write the global interval start header */
264  if(corsaro_io_write_interval_start(corsaro, corsaro->global_file,
265  &corsaro->interval_start) <= 0)
266  {
267  corsaro_log(__func__, corsaro,
268  "could not write global interval start headers at %ld",
269  corsaro->interval_start.time);
270  return -1;
271  }
272  /* ask each plugin to end the current interval */
273  while((tmp = corsaro_plugin_next(corsaro->plugin_manager, tmp)) != NULL)
274  {
275  if(tmp->end_interval(corsaro, &interval_end) != 0)
276  {
277  corsaro_log(__func__, corsaro, "%s failed to end interval at %ld",
278  tmp->name, int_end.tv_sec);
279  return -1;
280  }
281  }
282  /* write the global interval end header */
283  if(corsaro_io_write_interval_end(corsaro, corsaro->global_file,
284  &interval_end) <= 0)
285  {
286  corsaro_log(__func__, corsaro,
287  "could not write global interval end headers at %ld",
288  interval_end.time);
289  return -1;
290  }
291  return 0;
292 }
293 
294 static void corsaro_in_free(corsaro_in_t *corsaro)
295 {
296  if(corsaro == NULL)
297  {
298  /* nothing to be done */
299 
300  corsaro_log_in(__func__, corsaro,
301  "WARNING: corsaro_in_free called on NULL object; "
302  "this could indicate a double-free");
303  return;
304  }
305 
306  /* free the uridata */
307  free(corsaro->uridata);
308  corsaro->uridata = NULL;
309 
310  /* close the plugin */
311  if(corsaro->plugin != NULL)
312  {
313  corsaro->plugin->close_input(corsaro);
314  }
315  /*
316  while((p = corsaro_plugin_next(corsaro->plugin_manager, p)) != NULL)
317  {
318  p->close_input(corsaro);
319  }
320  */
321 
322  /* free the plugin manager */
323  if(corsaro->plugin_manager != NULL)
324  {
325  corsaro_plugin_manager_free(corsaro->plugin_manager);
326  corsaro->plugin_manager = NULL;
327  }
328 
329  /* close the input file */
330  if(corsaro->file != NULL)
331  {
332  corsaro_file_rclose(corsaro, corsaro->file);
333  corsaro->file = NULL;
334  }
335 
336  corsaro->started = 0;
337 
338  free(corsaro);
339 }
340 
341 static corsaro_in_t *corsaro_in_init(char *corsarouri)
342 {
343  corsaro_in_t *e;
344 
345  if((e = malloc_zero(sizeof(corsaro_in_t))) == NULL)
346  {
347  corsaro_log_in(__func__, NULL, "could not malloc corsaro_t");
348  return NULL;
349  }
350 
351  if((e->uridata = strdup(corsarouri)) == NULL)
352  {
353  corsaro_log_in(__func__, e,
354  "could not duplicate uri string (no memory?)");
355  goto err;
356  }
357 
358  /* set to null until we know if this is a global file or a plugin */
360 
361  /* initialize the logging */
362  if(corsaro_log_in_init(e) != 0)
363  {
364  corsaro_log_in(__func__, e, "could not initialize log file");
365  goto err;
366  }
367 
368  /* ask the plugin manager to get us some plugins */
369  if((e->plugin_manager = corsaro_plugin_manager_init(NULL)) == 0)
370  {
371  corsaro_log_in(__func__, e, "could not initialize plugins");
372  goto err;
373  }
374 
375  /* do not init plugins here, we will init only the one needed */
376 
377  /* delay opening the input file until we 'start' */
378 
379  return e;
380 
381  err:
382  corsaro_in_free(e);
383  return NULL;
384 }
385 
386 static int process_packet(corsaro_t *corsaro, corsaro_packet_t *packet)
387 {
388  corsaro_plugin_t *tmp = NULL;
389  while((tmp = corsaro_plugin_next(corsaro->plugin_manager, tmp)) != NULL)
390  {
391  if(tmp->process_packet(corsaro, packet) < 0)
392  {
393  corsaro_log(__func__, corsaro, "%s failed to process packet",
394  tmp->name);
395  return -1;
396  }
397  }
398  return 0;
399 }
400 
401 static int check_global_filename(char *fname)
402 {
403  if(strstr(fname, CORSARO_IO_GLOBAL_NAME) != NULL)
404  {
405  return 1;
406  }
407  return 0;
408 }
409 
410 static int check_global_magic(corsaro_in_t *corsaro, corsaro_file_in_t *file)
411 {
412  char buffer[1024];
413  int len;
414 
415  len = corsaro_file_rpeek(corsaro, file, buffer, sizeof(buffer));
416 
417  /* an corsaro global file will have 'EDGRHEAD' as the first 8 bytes */
418  if(len < 8)
419  {
420  return 0;
421  }
422 
423  /* lets make this easy and just do a string compare */
424  buffer[8] = '\0';
425 
426  if(strncmp(&buffer[0], "EDGRHEAD", 8) == 0)
427  {
428  return 1;
429  }
430  return 0;
431 }
432 
433 static int is_plugin_data_or_interval(corsaro_in_t *corsaro)
434 {
435  char buffer[1024];
436  int len;
437 
438  /* we need to peek and see if there is any plugin data */
439  len = corsaro_file_rpeek(corsaro, corsaro->file, buffer,
440  sizeof(buffer));
441  if(len < sizeof(corsaro_plugin_data_t)
442  && len < sizeof(corsaro_interval_t))
443  {
444  corsaro_log_in(__func__, corsaro,
445  "invalid corsaro global file");
446  return -1;
447  }
448 
449  /* a plugin data record will have 'EDGRDATA' */
450  /* an interval start record will have 'EDGRINTR' */
451  /* either way, use strncmp */
452  buffer[8] = '\0';
453  if(strncmp(buffer, "EDGRDATA", 8) == 0)
454  {
455  return 1;
456  }
457  else if(strncmp(buffer, "EDGRINTR", 8) == 0)
458  {
459  return 0;
460 
461  }
462  else
463  {
464  return -1;
465  }
466 }
467 
468 static int is_trailer_or_interval(corsaro_in_t *corsaro)
469 {
470  char buffer[1024];
471  int len;
472 
473  /* we need to peek and see if there is any plugin data */
474  len = corsaro_file_rpeek(corsaro, corsaro->file, buffer,
475  sizeof(buffer));
476  if(len < sizeof(corsaro_trailer_t)
477  && len < sizeof(corsaro_interval_t))
478  {
479  corsaro_log_in(__func__, corsaro,
480  "invalid corsaro global file");
481  return -1;
482  }
483 
484  /* a plugin data record will have 'EDGRFOOT' */
485  /* an interval start record will have 'EDGRINTR' */
486  /* either way, use strncmp */
487  buffer[8] = '\0';
488  if(strncmp(buffer, "EDGRFOOT", 8) == 0)
489  {
490  return 1;
491  }
492  else if(strncmp(buffer, "EDGRINTR", 8) == 0)
493  {
494  return 0;
495  }
496  else
497  {
498  return -1;
499  }
500 }
501 
502 static off_t read_record(corsaro_in_t *corsaro,
503  corsaro_in_record_type_t *record_type,
504  corsaro_in_record_t *record)
505 {
506  off_t bytes_read = -1;
507  int rc;
508 
509  /* this code is adapted from corsaro_flowtuple.c */
510  switch(corsaro->expected_type)
511  {
513  /* ask the io subsystem to read it for us */
514  bytes_read = corsaro_io_read_header(corsaro, corsaro->file, record_type,
515  record);
516  if(bytes_read > 0)
517  {
519  }
520  break;
521 
523  /* ask the io subsystem to read it for us */
524  bytes_read = corsaro_io_read_interval_start(corsaro, corsaro->file,
525  record_type, record);
526  if(bytes_read == sizeof(corsaro_interval_t))
527  {
528  rc = is_plugin_data_or_interval(corsaro);
529  if(rc == 1)
530  {
532  }
533  else if(rc == 0)
534  {
536  }
537  else
538  {
539  *record_type = CORSARO_IN_RECORD_TYPE_NULL;
540  }
541  }
542  break;
543 
545  /* ask the io subsystem to read it for us */
546  bytes_read = corsaro_io_read_plugin_start(corsaro, corsaro->file,
547  record_type, record);
548  if(bytes_read == sizeof(corsaro_plugin_data_t))
549  {
550  /* which plugin is this? */
551  if((corsaro->plugin = corsaro_plugin_get_by_id(
552  corsaro->plugin_manager,
553  ((corsaro_plugin_data_t*)record->buffer)->plugin_id)
554  )
555  == NULL)
556  {
557  corsaro_log_in(__func__, corsaro, "invalid plugin id detected");
558  corsaro_log_in(__func__, corsaro, "is corsaro built with all "
559  "necessary plugins?");
560  *record_type = CORSARO_IN_RECORD_TYPE_NULL;
561  }
562  else
563  {
564  /* we'll pass these over to the plugin */
566  }
567  }
568  else
569  {
570  corsaro_log_in(__func__, corsaro,
571  "failed to read plugin data start");
572  *record_type = CORSARO_IN_RECORD_TYPE_NULL;
573  }
574  break;
575 
577  /* pass this over to the plugin */
578  assert(corsaro->plugin != NULL);
579  bytes_read = corsaro->plugin->read_global_data_record(corsaro, record_type, record);
580 
581  if(bytes_read >= 0)
582  {
584  }
585  break;
586 
588  /* ask the io subsystem to read it for us */
589  bytes_read = corsaro_io_read_plugin_end(corsaro, corsaro->file,
590  record_type, record);
591  if(bytes_read == sizeof(corsaro_plugin_data_t))
592  {
593  /* check if there is more plugin data */
594  rc = is_plugin_data_or_interval(corsaro);
595  if(rc == 1)
596  {
598  }
599  else if(rc == 0)
600  {
602  }
603  else
604  {
605  *record_type = CORSARO_IN_RECORD_TYPE_NULL;
606  }
607  }
608  break;
609 
611  /* ask the io subsystem to read it for us */
612  bytes_read = corsaro_io_read_interval_end(corsaro, corsaro->file,
613  record_type, record);
614  if(bytes_read == sizeof(corsaro_interval_t))
615  {
616  /* check for an interval start, or for a trailer */
617  rc = is_trailer_or_interval(corsaro);
618  if(rc == 0)
619  {
621  }
622  else if(rc == 1)
623  {
625  }
626  else
627  {
628  *record_type = CORSARO_IN_RECORD_TYPE_NULL;
629  }
630  }
631  break;
632 
634  /* ask the io subsystem to read it for us */
635  bytes_read = corsaro_io_read_trailer(corsaro, corsaro->file, record_type,
636  record);
637  if(bytes_read == sizeof(corsaro_trailer_t))
638  {
640  }
641  break;
642 
643  default:
644  corsaro_log_in(__func__, corsaro, "invalid expected record type");
645  }
646 
647  return bytes_read;
648 }
649 
650 /* == PUBLIC CORSARO API FUNCS BELOW HERE == */
651 
653 {
654  corsaro_t *corsaro;
655 
656  /* quick sanity check that folks aren't trying to write to stdout */
657  if(template == NULL || strcmp(template, "-") == 0)
658  {
659  corsaro_log(__func__, NULL, "writing to stdout not supported");
660  return NULL;
661  }
662 
663  /* initialize the corsaro object */
664  if((corsaro = corsaro_init(template, mode)) == NULL)
665  {
666  corsaro_log(__func__, NULL, "could not initialize corsaro object");
667  return NULL;
668  }
669 
670  return corsaro;
671 }
672 
674 {
675  corsaro_plugin_t *p = NULL;
676 
677  assert(corsaro != NULL);
678 
679  /* ask the plugin manager to start up */
680  /* this allows it to remove disabled plugins */
681  if(corsaro_plugin_manager_start(corsaro->plugin_manager) != 0)
682  {
683  corsaro_log(__func__, corsaro, "could not start plugin manager");
684  corsaro_free(corsaro);
685  return -1;
686  }
687 
688  /* now, ask each plugin to open its output file */
689  /* we need to do this here so that the corsaro object is fully set up
690  that is, the traceuri etc is set */
691  while((p = corsaro_plugin_next(corsaro->plugin_manager, p)) != NULL)
692  {
693  if(p->init_output(corsaro) != 0)
694  {
695  corsaro_log(__func__, corsaro, "plugin could not init output");
696  corsaro_free(corsaro);
697  return -1;
698  }
699  }
700 
701  /* write headers to the global file */
702  if(corsaro_io_write_header(corsaro, corsaro->global_file, NULL) <= 0)
703  {
704  corsaro_log(__func__, corsaro, "could not write global headers");
705  corsaro_free(corsaro);
706  return -1;
707  }
708 
709  corsaro->started = 1;
710 
711  return 0;
712 }
713 
714 void corsaro_set_interval(corsaro_t *corsaro, int i)
715 {
716  assert(corsaro != NULL);
717 
718  /* you can't set the interval once corsaro has been started */
719  assert(corsaro->started == 0);
720 
721  corsaro->interval = i;
722 }
723 
724 int corsaro_set_traceuri(corsaro_t *corsaro, char *uri)
725 {
726  assert(corsaro != NULL);
727 
728  if(corsaro->started != 0)
729  {
730  corsaro_log(__func__, corsaro,
731  "trace uri can only be set before "
732  "corsaro_start_output is called");
733  return -1;
734  }
735 
736  if(corsaro->uridata != NULL)
737  {
738  corsaro_log(__func__, corsaro, "WARNING: updating trace uri from %s to %s",
739  corsaro->uridata, uri);
740  }
741  if((corsaro->uridata = strdup(uri)) == NULL)
742  {
743  corsaro_log(__func__, corsaro,
744  "could not duplicate uri string (no memory?)");
745  return -1;
746  }
747  corsaro_log(__func__, corsaro, "%s", corsaro->uridata);
748  return 0;
749 }
750 
751 int corsaro_enable_plugin(corsaro_t *corsaro, const char *plugin_name)
752 {
753  assert(corsaro != NULL);
754  assert(corsaro->plugin_manager != NULL);
755 
756  if(corsaro->started != 0)
757  {
758  corsaro_log(__func__, corsaro,
759  "trace uri can only be set before "
760  "corsaro_start_output is called");
761  return -1;
762  }
763 
764  return corsaro_plugin_enable_plugin(corsaro->plugin_manager, plugin_name);
765 
766 }
767 
768 const char *corsaro_get_traceuri(corsaro_t *corsaro)
769 {
770  return corsaro->uridata;
771 }
772 
773 int corsaro_set_monitorname(corsaro_t *corsaro, char *name)
774 {
775  assert(corsaro != NULL);
776 
777  if(corsaro->started != 0)
778  {
779  corsaro_log(__func__, corsaro,
780  "monitor name can only be set before "
781  "corsaro_start_output is called");
782  return -1;
783  }
784 
785  if(corsaro->monitorname != NULL)
786  {
787  corsaro_log(__func__, corsaro,
788  "WARNING: updating monitor name from %s to %s",
789  corsaro->monitorname, name);
790  }
791 
792  if((corsaro->monitorname = strdup(name)) == NULL)
793  {
794  corsaro_log(__func__, corsaro,
795  "could not duplicate monitor name string (no memory?)");
796  return -1;
797  }
798  corsaro_log(__func__, corsaro, "%s", corsaro->monitorname);
799  return 0;
800 }
801 
802 const char *corsaro_get_monitorname(corsaro_t *corsaro)
803 {
804  if(corsaro->monitorname != NULL)
805  {
806  return corsaro->monitorname;
807  }
808  else
809  {
810  return STR(CORSARO_MONITOR_NAME);
811  }
812 }
813 
814 int corsaro_per_packet(corsaro_t *corsaro, libtrace_packet_t *ltpacket)
815 {
816  struct timeval ts;
817  struct timeval report;
818 
819  assert(corsaro != NULL);
820 
821  if(corsaro->started != 1)
822  {
823  corsaro_log(__func__, corsaro, "corsaro_start_output must be called before"
824  "packets can be processed");
825  return -1;
826  }
827 
828  /* poke this ltpacket into our corsaro packet */
829  corsaro->packet->ltpacket = ltpacket;
830 
831  /* ensure that the state is clear */
832  corsaro_packet_state_reset(corsaro->packet);
833 
834  /* this is now the latest packet we have seen */
835  corsaro->last_ts = ts = trace_get_timeval(ltpacket);
836 
837  /* if this is the first packet we record, keep the timestamp */
838  if(corsaro->packet_cnt == 0)
839  {
840  corsaro->first_ts = ts;
841  if(start_interval(corsaro, ts) != 0)
842  {
843  corsaro_log(__func__, corsaro, "could not start interval at %ld",
844  ts.tv_sec);
845  return -1;
846  }
847  corsaro->next_report = ts.tv_sec + corsaro->interval;
848  }
849 
850  /* using an interval value of less than zero disables intervals
851  such that only one distribution will be generated upon completion */
852  while(corsaro->interval >= 0 && (uint32_t)ts.tv_sec >= corsaro->next_report)
853  {
854  /* we want to mark the end of the interval such that all pkt times are <=
855  the time of the end of the interval.
856  because we deal in second granularity, we simply subtract one from the
857  time */
858  report.tv_sec = corsaro->next_report-1;
859 
860  if(end_interval(corsaro, report) != 0)
861  {
862  corsaro_log(__func__, corsaro, "could not end interval at %ld",
863  ts.tv_sec);
864  /* we don't free in case the client wants to try to carry on */
865  return -1;
866  }
867 
868  /* we now add the second back on to the time to get the start time */
869  report.tv_sec = corsaro->next_report;
870  if(start_interval(corsaro, report) != 0)
871  {
872  corsaro_log(__func__, corsaro, "could not start interval at %ld",
873  ts.tv_sec);
874  /* we don't free in case the client wants to try to carry on */
875  return -1;
876  }
877  corsaro->interval_start.number++;
878  corsaro->next_report += corsaro->interval;
879  }
880 
881  /* count this packet for our overall packet count */
882  corsaro->packet_cnt++;
883 
884  /* ask each plugin to process this packet */
885  return process_packet(corsaro, corsaro->packet);
886 }
887 
889 {
890  if(corsaro->started != 0)
891  {
892  if(end_interval(corsaro, corsaro->last_ts) != 0)
893  {
894  corsaro_log(__func__, corsaro, "could not end interval at %ld",
895  corsaro->last_ts.tv_sec);
896  corsaro_free(corsaro);
897  return -1;
898  }
899 
900  /* write headers to the global file */
901  if(corsaro_io_write_trailer(corsaro, corsaro->global_file, NULL) <= 0)
902  {
903  corsaro_log(__func__, corsaro, "could not write global trailers");
904  corsaro_free(corsaro);
905  return -1;
906  }
907  }
908 
909  corsaro_free(corsaro);
910  return 0;
911 }
912 
913 /* ===== corsaro_in API functions ===== */
914 
916 {
917  corsaro_in_t *corsaro;
918 
919  /* initialize the corsaro object */
920  if((corsaro = corsaro_in_init(corsarouri)) == NULL)
921  {
922  corsaro_log_in(__func__, NULL, "could not initialize corsaro_in object");
923  return NULL;
924  }
925 
926  return corsaro;
927 }
928 
930 {
931  corsaro_plugin_t *p = NULL;
932 
933  assert(corsaro != NULL);
934  assert(corsaro->started == 0);
935  assert(corsaro->plugin == NULL);
936 
937  /* open the file! */
938  if((corsaro->file = corsaro_file_ropen(corsaro, corsaro->uridata)) == NULL)
939  {
940  corsaro_log_in(__func__, corsaro, "could not open input file %s",
941  corsaro->uridata);
942  /* ak comments the following, leave it up to the caller to free
943  the state object */
944  /*corsaro_in_free(corsaro);*/
945  return -1;
946  }
947 
948  /* determine the plugin which created this file */
949  while((p = corsaro_plugin_next(corsaro->plugin_manager, p)) != NULL &&
950  corsaro->plugin == NULL)
951  {
952  if(p->probe_filename(corsaro->uridata) == 1)
953  {
954  corsaro_log_in(__func__, corsaro,
955  "%s plugin selected to read %s (using file name)",
956  p->name, corsaro->uridata);
957  corsaro->plugin = p;
958  }
959  }
960 
961  /* if the previous method for detection failed, lets try peeking into
962  the file */
963  p = NULL;
964  while(corsaro->plugin == NULL &&
965  (p = corsaro_plugin_next(corsaro->plugin_manager, p)) != NULL)
966  {
967  if(p->probe_magic(corsaro, corsaro->file) == 1)
968  {
969  corsaro_log_in(__func__, corsaro,
970  "%s plugin selected to read %s (using magic)",
971  p->name, corsaro->uridata);
972  corsaro->plugin = p;
973  }
974  }
975 
976  /* if corsaro->plugin is still NULL, see if this is the global output */
977  if(corsaro->plugin == NULL)
978  {
979  if(check_global_filename(corsaro->uridata) != 1 &&
980  check_global_magic(corsaro, corsaro->file) != 1)
981  {
982  /* we have no idea what this file was created by */
983  corsaro_log_in(__func__, corsaro, "unable to find plugin to decode %s\n"
984  " - is this a corsaro file?\n"
985  " - is corsaro compiled with all needed plugins?",
986  corsaro->uridata);
987  return -1;
988  }
989  else
990  {
991  /* this the corsaro global output */
992  /* we initially expect an corsaro header record in a global file */
994  corsaro_log_in(__func__, corsaro, "corsaro_global selected to read %s",
995  corsaro->uridata);
996  }
997  }
998  else
999  {
1000  /* start up the plugin we detected */
1001  if(corsaro->plugin->init_input(corsaro) != 0)
1002  {
1003  corsaro_log_in(__func__, corsaro, "could not initialize %s",
1004  corsaro->plugin->name);
1005  return -1;
1006  }
1007  }
1008 
1009  corsaro->started = 1;
1010 
1011  return 0;
1012 }
1013 
1015 {
1016  corsaro_in_record_t *record = NULL;
1017 
1018  if((record = malloc(sizeof(corsaro_in_record_t))) == NULL)
1019  {
1020  corsaro_log_in(__func__, corsaro, "could not malloc corsaro_in_record_t");
1021  return NULL;
1022  }
1023 
1024  record->corsaro = corsaro;
1025 
1026  /* pre-allocate some memory for the buffer */
1027  if((record->buffer = malloc(sizeof(uint8_t)*
1029  {
1030  corsaro_log_in(__func__, corsaro, "could not malloc record buffer");
1031  corsaro_in_free_record(record);
1032  return NULL;
1033  }
1034 
1036 
1037  record->type = -1;
1038 
1039  return record;
1040 }
1041 
1043 {
1044  if(record == NULL)
1045  {
1046  corsaro_log_file(__func__, NULL, "possible double free of record pointer");
1047  return;
1048  }
1049 
1050  if(record->buffer != NULL)
1051  {
1052  free(record->buffer);
1053  record->buffer = NULL;
1054  }
1055  record->buffer_len = -1;
1056 
1057  record->type = -1;
1058 
1059  free(record);
1060 }
1061 
1063  corsaro_in_record_type_t *record_type,
1064  corsaro_in_record_t *record)
1065 {
1066  /* the first check makes sure we don't have a plugin that we have assigned
1067  the file to, the second ensures that if there is a plugin, we will
1068  only directly use it when we're not in global mode */
1069  if(corsaro->plugin != NULL
1071  {
1072  return corsaro->plugin->read_record(corsaro, record_type, record);
1073  }
1074  else
1075  {
1076  /* this is the global plugin, handle it ourselves */
1077  return read_record(corsaro, record_type, record);
1078  }
1079 
1080  return -1;
1081 }
1082 
1084 {
1085  return record->buffer;
1086 }
1087 
1089 {
1090  corsaro_in_free(corsaro);
1091  return 0;
1092 }