FlowTuple - STARDUST
See the latest slide deck about Flowtuples presented in 2021.
FlowTuple data is an aggregated representation of traffic captured at the network telescope that enables more efficient processing and analysis for the many research use cases that do not need access to the full packet contents. In addition, the FlowTuple format includes metadata associated with the source IP address of each FlowTuple, such as IP geolocation, IP-to-ASN mapping, or whether we categorize the source address as spoofed.
A STARDUST flowtuple has a slightly different definition than the typical flowtuple (i.e., the 5-tuple of source IP, destination IP, source port, destination port, and transport protocol). The STARDUST flowtuple takes additional header fields (e.g., TCP flags) into account because of the characteristics of the various components found in unsolicited one-way traffic and the most common types of analyses that researchers perform.
FlowTuples are stored in the Avro data format. A single flow entry is distinguished by the timestamp, source IP address, destination IP network (within a /24 subnet mask), destination port, and protocol.
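The flow key described above can be sketched in Python; the `flow_key` helper below is illustrative, not part of any STARDUST API, but it shows how the destination address is aggregated to its /24 network and how addresses are stored as integers:

```python
import ipaddress

def flow_key(timestamp, src_ip, dst_ip, dst_port, protocol):
    """Build a flowtuple key: the destination IP is aggregated to its /24
    network, and addresses are represented as integers (as in the Avro schema)."""
    src = int(ipaddress.IPv4Address(src_ip))
    dst = int(ipaddress.IPv4Address(dst_ip))
    dst_net = dst & 0xFFFFFF00  # mask off the low 8 bits -> /24 network
    return (timestamp, src, dst_net, dst_port, protocol)
```

Two packets sent to different hosts in the same /24, on the same destination port and protocol, within the same interval, would therefore map to the same flow entry.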
Data fields in a FlowTuple record the following:
uniq_<data field>: the number of unique values seen for the data field
common_<data field>: an array containing any data values which occur frequently for the flow
common_<data field>_freqs: an array containing the frequency at which the values shown in the preceding array occurred.
- A note about required frequencies: To be considered a “frequently occurring” value, the value must be seen in at least 20% of packets that are matched to a given flow. For flows where the packet count is low, the ratio is increased to compensate for the low sample size.
| Total Flow Packets | Minimum Ratio |
|---|---|
| 1-4 | 1.0 |
| 5-6 | 0.5 |
| 7-14 | 0.33 |
| 15+ | 0.2 |
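The thresholds in this table can be sketched as a small helper; the function names here are illustrative, not part of any STARDUST tool, but the logic follows the rule described above:

```python
from collections import Counter

def min_ratio(total_packets):
    """Minimum fraction of a flow's packets a value must appear in to
    count as 'frequently occurring', per the table above."""
    if total_packets <= 4:
        return 1.0
    if total_packets <= 6:
        return 0.5
    if total_packets <= 14:
        return 0.33
    return 0.2

def common_values(observations):
    """Return (values, freqs) lists in the style of the common_<data field>
    and common_<data field>_freqs fields."""
    counts = Counter(observations)
    total = len(observations)
    threshold = min_ratio(total)
    common = [(v, c) for v, c in counts.items() if c / total >= threshold]
    common.sort(key=lambda vc: vc[1], reverse=True)
    return [v for v, _ in common], [c for _, c in common]
```

For example, a flow with ten packets uses a 0.33 threshold, so a TTL seen in only two of the ten packets would be excluded from `common_ttls`.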
Contents of a FlowTuple v4 Record
| Field | Type | Category | Description |
|---|---|---|---|
| time | long | Key | Timestamp of the interval that this flowtuple belongs to |
| src_ip | long | Key | Source IP address, as an integer |
| dst_net | long | Key | The destination IP network, as an integer |
| dst_port | long | Key | The destination port for TCP and UDP flows; (type << 8) + code for ICMP flows |
| protocol | int | Key | The transport protocol used, e.g. 6 = TCP, 17 = UDP |
| packet_cnt | long | Counter | Number of packets seen in this interval that match this flow description |
| uniq_dst_ips | long | Counter | Number of unique destination IPs seen for this flow |
| uniq_pkt_sizes | long | Counter | Number of unique packet sizes seen for this flow |
| uniq_ttls | long | Counter | Number of unique IP TTLs seen for this flow |
| uniq_src_ports | long | Counter | Number of unique source ports seen for this flow (TCP and UDP only) |
| uniq_tcp_flags | long | Counter | Number of unique TCP flag combinations seen for this flow (TCP only) |
| first_syn_length | int | First | Only applies to TCP flows; the size of the TCP header (i.e. doff * 4) for the first observed packet |
| first_tcp_rwin | int | First | Only applies to TCP flows; the receive window announced in the first observed TCP SYN packet |
| common_pktsizes | array(long) | Observed Values | Array containing packet sizes that were frequently observed for this flow |
| common_pktsize_freqs | array(long) | Frequencies | Array containing frequencies for packet sizes listed in the common_pktsizes array |
| common_ttls | array(long) | Observed Values | Array containing IP TTLs that were frequently observed for this flow |
| common_ttl_freqs | array(long) | Frequencies | Array containing frequencies for IP TTLs listed in the common_ttls array |
| common_srcports | array(long) | Observed Values | Array containing TCP/UDP source ports that were frequently observed for this flow |
| common_srcport_freqs | array(long) | Frequencies | Array containing frequencies for source ports listed in the common_srcports array |
| common_tcpflags | array(long) | Observed Values | Array containing TCP flag combinations that were frequently observed for this flow |
| common_tcpflag_freqs | array(long) | Frequencies | Array containing frequencies for TCP flag combinations listed in the common_tcpflags array |
| maxmind_continent | string | Derived from Source IP | Geolocation of the source IP address, according to MaxMind (continent level) |
| maxmind_country | string | Derived from Source IP | Geolocation of the source IP address, according to MaxMind (country level) |
| netacq_continent | string | Derived from Source IP | Geolocation of the source IP address, according to Netacq-Edge (continent level) |
| netacq_country | string | Derived from Source IP | Geolocation of the source IP address, according to Netacq-Edge (country level) |
| prefix2asn | long | Derived from Source IP | ASN that the source IP address belongs to, according to a prefix-to-ASN mapping dataset |
| spoofed_packet_cnt | long | Counter | Number of packets where the source IP address was inferred to be spoofed |
| masscan_packet_cnt | long | Counter | Number of packets that were inferred to be sent by the masscan tool |
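Two of the fields above use packed encodings: for ICMP flows, `dst_port` carries `(type << 8) + code`, and (assuming the standard TCP flags byte layout) each value in `common_tcpflags` is a flags bitmask. A decoding sketch, with illustrative helper names:

```python
def decode_icmp_dst_port(dst_port):
    """Split the ICMP dst_port encoding (type << 8) + code into its parts."""
    return dst_port >> 8, dst_port & 0xFF

# Standard TCP flag bit positions (low 6 bits of the flags byte)
TCP_FLAG_BITS = [(0x01, "FIN"), (0x02, "SYN"), (0x04, "RST"),
                 (0x08, "PSH"), (0x10, "ACK"), (0x20, "URG")]

def decode_tcp_flags(flags):
    """Turn a TCP flags bitmask into a list of flag names."""
    return [name for bit, name in TCP_FLAG_BITS if flags & bit]
```

For example, an ICMP destination-unreachable/host-unreachable flow (type 3, code 1) would have `dst_port` 769, and a flags value of 0x12 decodes to SYN+ACK.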
FlowTuple data is stored in OpenStack Swift using the Apache Avro data format. Specifically, data for each year is saved in a separate Swift container.
Users can read the Avro files directly using existing Avro-compatible tools or libraries, if desired, but the best way to work with the flowtuple data is by writing Python scripts using PyAvro-STARDUST or the Pyspark STARDUST API.
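The yearly container used in the Pyspark example later on this page is named `telescope-ucsdnt-avro-flowtuple-v4-2021`, so a script that iterates over several years of data might build container names like this (the naming pattern is inferred from that one example, so verify it against the object store before relying on it):

```python
def flowtuple_containers(first_year, last_year,
                         pattern="telescope-ucsdnt-avro-flowtuple-v4-{year}"):
    """Generate one Swift container name per year, using an assumed
    naming pattern (one container per year, as described above)."""
    return [pattern.format(year=y) for y in range(first_year, last_year + 1)]
```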
Note: Flowtuple File Name Formats
- August 2008 to July 2020:
- August 2020 to August 2021:
- September 2021 to present:
Processing FlowTuples with PyAvro-STARDUST
You can write Python analysis code that processes the flowtuple Avro files in the STARDUST object store directly, using the PyAvro-STARDUST module. This module provides a simple interface that aims to be faster and easier to use than existing general-purpose Python Avro libraries.
Documentation on how to install and use PyAvro-STARDUST can be found in the GitHub repository.
An example script for processing flowtuple v4 data can be found here.
Processing FlowTuples with the STARDUST Pyspark API
For larger-scale processing, we have developed an API for writing Python scripts that run flowtuple analysis jobs on an Apache Spark cluster. The API allows you to specify a time period that you are interested in, then run methods that perform common filtering or data-processing tasks (e.g., filtering flowtuples by source address prefix, or finding the top 10 most frequent values for a metric).
You can also run raw Spark SQL queries directly against a set of flowtuples, if the exact analysis that you want to perform is not implemented as an API method.
This API is still a work in progress, but our implementation thus far is available here.
Some example flowtuple analysis code written using the STARDUST Pyspark API is given below:
```python
from stardust import StardustPysparkHelper

# create a helper instance pointing at the 2021 flowtuple v4 container
sd = StardustPysparkHelper("telescope-ucsdnt-avro-flowtuple-v4-2021",
                           "ucsd-nt", "v3_5min")

# start a spark session with the name "test", using 4 partitions (CPUs)
sd.startSparkSession("test", 4)

# get flowtuple records from a specific time range
range_recs = sd.getFlowtuplesByTimeRange(1612130100, 1612133700)

# filter a set of flowtuples based on a source IP prefix
# NOTE: the prefix must be a unicode string
prefix_df = sd.filterFlowtuplesByPrefix(range_recs, u"188.8.131.52/8")

# get the number of flows that matched our filter prefix
print(prefix_df.count())

# print the first 20 flows that matched our query
# ALWAYS use collect() to convert a data frame into a list
# of rows if you want to inspect the results
results = prefix_df.limit(20).collect()
for r in results:
    # this prints the raw row object, which is fine for debugging but
    # you'll want to do some proper formatting for usable output
    print(r)
print()

# find flowtuples that match a set of filtering criteria:
# base will contain flowtuples with a Russian source IP,
# sect will contain flowtuples with a Russian source IP that
# use multiple TTLs
sect, base = sd.createFlowtupleIntersection(
    range_recs, ["netacq_country == 'RU'", "uniq_ttls > 1"])

# generate aggregated report-style time series data for both the
# intersection and the base set (the label strings are arbitrary)
sect_report = sd.generateReportOutputFromFlowtuples(
    sect, "example", "ftintersect", "RU_multiTTL")
base_report = sd.generateReportOutputFromFlowtuples(
    base, "example", "ftintersect", "RU_all")

# again, use collect() to convert our dataframes into rows that we can
# write as output
sect_results = sect_report.collect()
base_results = base_report.collect()

# dump the time series to stdout as pairs of Row objects
# this is a bit lazy -- if you were doing this for real, you would
# want better error checking and output formatting
for i in range(0, sect_report.count()):
    print(sect_results[i], base_results[i])

# get all flowtuples where the TTL falls between 20 and 30 OR 70 and 80
q_res = sd.filterFlowtuplesByCommonValue(
    range_recs, "common_ttls", [(20, 30), (70, 80)])

# show the top 10 destination ports (by flow count) for Russian source IPs
# setting the last parameter to True will also include an "Other" category
topn = sd.getTopValuesByFlowCount(base, "dst_port", 10, True)
for k, v in topn.items():
    print(k, v)

# there are also other handy methods, see the module documentation
# (can be accessed using help())
```