Skip to content

Commit

Permalink
* NetFlow v9 sampling is now implemented as part of the NetFlow probe
Browse files Browse the repository at this point in the history
  plugin, nfprobe. FLOW_SAMPLER_ID (48), FLOW_SAMPLER_MODE (49) and
  FLOW_SAMPLER_RANDOM_INTERVAL (50) are sent as part of both the
  options template and options data packets. Only system scope is
  supported; multiple plugins off the same IP address sampling at
  different rates can be recognized by tagging them with differen
  source_id (engined_id, engine_type)
* nfprobe, NetFlow v9: minor bug was fixed when counting total flows
  in a datagram. Relevant only to packets carrying templates.
  • Loading branch information
paololucente authored and paolo committed Oct 4, 2009
1 parent c98b0aa commit 5579b0f
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 14 deletions.
5 changes: 0 additions & 5 deletions CONFIG-KEYS
Original file line number Diff line number Diff line change
Expand Up @@ -837,11 +837,6 @@ KEY: sfprobe_receiver
DESC: defines the remote IP address/hostname and port to which sFlow dagagrams are to be exported.
The value is expected to be in the usual form 'address:port'. (default: 127.0.0.1:6343)

KEY: sfprobe_sampling_rate
VALUES: [>= 1]
DESC: defines packet sampling. It expects a number which is the mean ratio of packets to be sampled
(1 out of N). The currently implemented sampling algorithm is a simple randomic one. (default: no sampling)

KEY: sfprobe_agentip
DESC: sets the value of agentIp field inside the sFlow datagram header.

Expand Down
2 changes: 1 addition & 1 deletion EXAMPLES
Original file line number Diff line number Diff line change
Expand Up @@ -512,9 +512,9 @@ b) build sFlow probe configuration, using pmacctd:
daemonize: true
interface: eth0
plugins: sfprobe
sampling_rate: 20
sfprobe_agentsubid: 1402
sfprobe_receiver: 1.2.3.4:6343
sfprobe_sampling_rate: 20
!
! networks_file: /path/to/networks.lst
! classifiers: /path/to/classifiers/
Expand Down
164 changes: 158 additions & 6 deletions src/nfprobe_plugin/netflow9.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ struct NF9_TEMPLATE_FLOWSET_HEADER {
struct NF9_FLOWSET_HEADER_COMMON c;
u_int16_t template_id, count;
} __packed;
struct NF9_OPTIONS_TEMPLATE_FLOWSET_HEADER {
struct NF9_FLOWSET_HEADER_COMMON c;
u_int16_t template_id, scope_len;
u_int16_t option_len;
} __packed;
struct NF9_TEMPLATE_FLOWSET_RECORD {
u_int16_t type, length;
} __packed;
Expand Down Expand Up @@ -77,6 +82,9 @@ struct NF9_DATA_FLOWSET_HEADER {
#define NF9_IPV6_SRC_ADDR 27
#define NF9_IPV6_DST_ADDR 28
/* ... */
#define NF9_FLOW_SAMPLER_ID 48
#define NF9_FLOW_SAMPLER_MODE 49
#define NF9_FLOW_SAMPLER_INTERVAL 50
#define NF9_SRC_MAC 56
#define NF9_DST_MAC 57
#define NF9_SRC_VLAN 58
Expand All @@ -89,24 +97,40 @@ struct NF9_DATA_FLOWSET_HEADER {
#define NF9_CUST_TAG 201
/* CUSTOM TYPES END HERE */

/* OPTION SCOPES */
#define NF9_OPT_SCOPE_SYSTEM 1

/* Stuff pertaining to the templates that softflowd uses */
#define NF9_SOFTFLOWD_TEMPLATE_NRECORDS 21
#define NF9_SOFTFLOWD_TEMPLATE_NRECORDS 22
struct NF9_SOFTFLOWD_TEMPLATE {
struct NF9_TEMPLATE_FLOWSET_HEADER h;
struct NF9_TEMPLATE_FLOWSET_RECORD r[NF9_SOFTFLOWD_TEMPLATE_NRECORDS];
u_int16_t tot_len;
} __packed;

#define NF9_OPTIONS_TEMPLATE_NRECORDS 4
struct NF9_OPTIONS_TEMPLATE {
struct NF9_OPTIONS_TEMPLATE_FLOWSET_HEADER h;
struct NF9_TEMPLATE_FLOWSET_RECORD r[NF9_OPTIONS_TEMPLATE_NRECORDS];
u_int16_t tot_len;
} __packed;

typedef void (*flow_to_flowset_handler) (char *, const struct FLOW *, int, int);
struct NF9_INTERNAL_TEMPLATE_RECORD {
flow_to_flowset_handler handler;
u_int16_t length;
};

struct NF9_INTERNAL_TEMPLATE {
struct NF9_INTERNAL_TEMPLATE_RECORD r[NF9_SOFTFLOWD_TEMPLATE_NRECORDS];
u_int16_t tot_rec_len;
};

struct NF9_INTERNAL_OPTIONS_TEMPLATE {
struct NF9_INTERNAL_TEMPLATE_RECORD r[NF9_OPTIONS_TEMPLATE_NRECORDS];
u_int16_t tot_rec_len;
};

/* softflowd data flowset types */
struct NF9_SOFTFLOWD_DATA_COMMON {
u_int32_t last_switched, first_switched;
Expand All @@ -132,17 +156,21 @@ struct NF9_SOFTFLOWD_DATA_V6 {
#define NF9_SOFTFLOWD_MAX_PACKET_SIZE 512
#define NF9_SOFTFLOWD_V4_TEMPLATE_ID 1024
#define NF9_SOFTFLOWD_V6_TEMPLATE_ID 2048
#define NF9_OPTIONS_TEMPLATE_ID 4096

#define NF9_DEFAULT_TEMPLATE_INTERVAL 18

static struct NF9_SOFTFLOWD_TEMPLATE v4_template;
static struct NF9_INTERNAL_TEMPLATE v4_int_template;
static struct NF9_SOFTFLOWD_TEMPLATE v6_template;
static struct NF9_INTERNAL_TEMPLATE v6_int_template;
static struct NF9_OPTIONS_TEMPLATE options_template;
static struct NF9_INTERNAL_OPTIONS_TEMPLATE options_int_template;
static char ftoft_buf_0[sizeof(struct NF9_SOFTFLOWD_DATA_V6)];
static char ftoft_buf_1[sizeof(struct NF9_SOFTFLOWD_DATA_V6)];

static int nf9_pkts_until_template = -1;
static u_int8_t send_options = FALSE;

static void
flow_to_flowset_flows_handler(char *flowset, const struct FLOW *flow, int idx, int size)
Expand Down Expand Up @@ -270,6 +298,15 @@ flow_to_flowset_tag_handler(char *flowset, const struct FLOW *flow, int idx, int
memcpy(flowset, &rec16, size);
}

static void
flow_to_flowset_sampler_id_handler(char *flowset, const struct FLOW *flow, int idx, int size)
{
u_int8_t rec8;

rec8 = 1;
memcpy(flowset, &rec8, size);
}

static void
nf9_init_template(void)
{
Expand Down Expand Up @@ -433,6 +470,13 @@ nf9_init_template(void)
v4_int_template.r[rcount].length = 2;
rcount++;
}
if (config.sampling_rate) {
v4_template.r[rcount].type = htons(NF9_FLOW_SAMPLER_ID);
v4_template.r[rcount].length = htons(1);
v4_int_template.r[rcount].handler = flow_to_flowset_sampler_id_handler;
v4_int_template.r[rcount].length = 1;
rcount++;
}
v4_template.h.c.flowset_id = htons(0);
v4_template.h.c.length = htons( sizeof(struct NF9_TEMPLATE_FLOWSET_HEADER) + (sizeof(struct NF9_TEMPLATE_FLOWSET_RECORD) * rcount) );
v4_template.h.template_id = htons(NF9_SOFTFLOWD_V4_TEMPLATE_ID + config.nfprobe_id);
Expand Down Expand Up @@ -578,6 +622,13 @@ nf9_init_template(void)
v6_int_template.r[rcount].length = 2;
rcount++;
}
if (config.sampling_rate) {
v6_template.r[rcount].type = htons(NF9_FLOW_SAMPLER_ID);
v6_template.r[rcount].length = htons(1);
v6_int_template.r[rcount].handler = flow_to_flowset_sampler_id_handler;
v6_int_template.r[rcount].length = 1;
rcount++;
}
v6_template.h.c.flowset_id = htons(0);
v6_template.h.c.length = htons( sizeof(struct NF9_TEMPLATE_FLOWSET_HEADER) + (sizeof(struct NF9_TEMPLATE_FLOWSET_RECORD) * rcount) );
v6_template.h.template_id = htons(NF9_SOFTFLOWD_V6_TEMPLATE_ID + config.nfprobe_id);
Expand All @@ -588,6 +639,42 @@ nf9_init_template(void)
v6_int_template.tot_rec_len += v6_int_template.r[idx].length;
}

static void
nf9_init_options_template(void)
{
int rcount, idx;

rcount = 0;
bzero(&options_template, sizeof(options_template));
bzero(&options_int_template, sizeof(options_int_template));

options_template.r[rcount].type = htons(NF9_OPT_SCOPE_SYSTEM);
options_template.r[rcount].length = htons(0);
options_int_template.r[rcount].length = 0;
rcount++;
options_template.r[rcount].type = htons(NF9_FLOW_SAMPLER_ID);
options_template.r[rcount].length = htons(1);
options_int_template.r[rcount].length = 1;
rcount++;
options_template.r[rcount].type = htons(NF9_FLOW_SAMPLER_MODE);
options_template.r[rcount].length = htons(1);
options_int_template.r[rcount].length = 1;
rcount++;
options_template.r[rcount].type = htons(NF9_FLOW_SAMPLER_INTERVAL);
options_template.r[rcount].length = htons(4);
options_int_template.r[rcount].length = 4;
rcount++;
options_template.h.c.flowset_id = htons(1);
options_template.h.c.length = htons( sizeof(struct NF9_OPTIONS_TEMPLATE_FLOWSET_HEADER) + (sizeof(struct NF9_TEMPLATE_FLOWSET_RECORD) * rcount) );
options_template.h.template_id = htons(NF9_OPTIONS_TEMPLATE_ID + config.nfprobe_id );
options_template.h.scope_len = htons(4); /* NF9_OPT_SCOPE_SYSTEM */
options_template.h.option_len = htons(12); /* NF9_FLOW_SAMPLER_ID + NF9_FLOW_SAMPLER_MODE + NF9_FLOW_SAMPLER_INTERVAL */
options_template.tot_len = sizeof(struct NF9_OPTIONS_TEMPLATE_FLOWSET_HEADER) + (sizeof(struct NF9_TEMPLATE_FLOWSET_RECORD) * rcount);

for (idx = 0, options_int_template.tot_rec_len = 0; idx < rcount; idx++)
options_int_template.tot_rec_len += options_int_template.r[idx].length;
}

static int
nf_flow_to_flowset(const struct FLOW *flow, u_char *packet, u_int len,
const struct timeval *system_boot_time, u_int *len_used)
Expand Down Expand Up @@ -680,6 +767,42 @@ nf_flow_to_flowset(const struct FLOW *flow, u_char *packet, u_int len,
return (nflows);
}

static int
nf_options_to_flowset(u_char *packet, u_int len, const struct timeval *system_boot_time, u_int *len_used)
{
u_int freclen, ret_len, nflows;
u_int32_t rec32;
u_int8_t rec8;
char *ftoft_ptr_0 = ftoft_buf_0;

bzero(ftoft_buf_0, sizeof(ftoft_buf_0));
*len_used = nflows = ret_len = 0;

rec8 = 1; /* NF9_FLOW_SAMPLER_ID */
memcpy(ftoft_ptr_0, &rec8, 1);
ftoft_ptr_0 += 1;

rec8 = 0x02; /* NF9_FLOW_SAMPLER_MODE */
memcpy(ftoft_ptr_0, &rec8, 1);
ftoft_ptr_0 += 1;

rec32 = htonl(config.sampling_rate); /* NF9_FLOW_SAMPLER_INTERVAL */
memcpy(ftoft_ptr_0, &rec32, 4);
ftoft_ptr_0 += 4;

freclen = options_int_template.tot_rec_len;

if (ret_len + freclen > len)
return (-1);

memcpy(packet + ret_len, ftoft_buf_0, freclen);
ret_len += freclen;
nflows++;

*len_used = ret_len;
return (nflows);
}

/*
* Given an array of expired flows, send netflow v9 report packets
* Returns number of packets sent or -1 on error
Expand All @@ -692,16 +815,17 @@ send_netflow_v9(struct FLOW **flows, int num_flows, int nfsock,
struct NF9_HEADER *nf9;
struct NF9_DATA_FLOWSET_HEADER *dh;
struct timeval now;
u_int offset, last_af, i, j, num_packets, inc, last_valid;
u_int offset, last_af, j, num_packets, inc, last_valid;
socklen_t errsz;
int err, r;
int err, r, i;
u_char packet[NF9_SOFTFLOWD_MAX_PACKET_SIZE];
u_int8_t *sid_ptr;

gettimeofday(&now, NULL);

if (nf9_pkts_until_template == -1) {
nf9_init_template();
nf9_init_options_template();
nf9_pkts_until_template = 0;
}

Expand All @@ -728,8 +852,17 @@ send_netflow_v9(struct FLOW **flows, int num_flows, int nfsock,
if (nf9_pkts_until_template <= 0) {
memcpy(packet + offset, &v4_template, v4_template.tot_len);
offset += v4_template.tot_len;
nf9->flows++;
memcpy(packet + offset, &v6_template, v6_template.tot_len);
offset += v6_template.tot_len;
nf9->flows++;
if (config.sampling_rate) {
memcpy(packet + offset, &options_template, options_template.tot_len);
offset += options_template.tot_len;
nf9->flows++;
send_options = TRUE;
printf("CI PASSO [OPTIONS TEMPLATE]\n");
}
nf9_pkts_until_template = NF9_DEFAULT_TEMPLATE_INTERVAL;
}

Expand All @@ -753,17 +886,30 @@ send_netflow_v9(struct FLOW **flows, int num_flows, int nfsock,
}
dh = (struct NF9_DATA_FLOWSET_HEADER *)
(packet + offset);
dh->c.flowset_id =
if (send_options) {
dh->c.flowset_id = options_template.h.template_id;
last_af = 0;
printf("CI PASSO [OPTIONS DATA]: ID: %u\n", ntohs(options_template.h.template_id));
}
else {
dh->c.flowset_id =
(flows[i + j]->af == AF_INET) ?
v4_template.h.template_id :
v6_template.h.template_id;
last_af = flows[i + j]->af;
last_af = flows[i + j]->af;
}
last_valid = offset;
dh->c.length = sizeof(*dh); /* Filled as we go */
offset += sizeof(*dh);
}

r = nf_flow_to_flowset(flows[i + j], packet + offset,
if (send_options) {
r = nf_options_to_flowset(packet + offset,
sizeof(packet) - offset, system_boot_time, &inc);
printf("CI PASSO [OPTIONS DATA]: R: %u\n", r);
}
else
r = nf_flow_to_flowset(flows[i + j], packet + offset,
sizeof(packet) - offset, system_boot_time, &inc);
if (r <= 0) {
/* yank off data header, if we had to go back */
Expand All @@ -782,6 +928,12 @@ send_netflow_v9(struct FLOW **flows, int num_flows, int nfsock,
dh->c.flowset_id, dh->c.length,
dh->c.length, nf9->flows);
}

if (send_options) {
printf("CI PASSO [OPTIONS DATA]: CHIUDO\n");
send_options = FALSE;
i--;
}
}
/* Don't finish header if it has already been done */
if (dh != NULL) {
Expand Down
2 changes: 0 additions & 2 deletions src/pmacct-data.h
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,6 @@ static const struct _dictionary_line dictionary[] = {
{"nfprobe_engine", cfg_key_nfprobe_engine},
{"nfprobe_version", cfg_key_nfprobe_version},
{"sfprobe_receiver", cfg_key_sfprobe_receiver},
// {"sfprobe_sampling_rate", cfg_key_sfprobe_sampling_rate},
{"sfprobe_sampling_rate", cfg_key_sampling_rate},
{"sfprobe_agentip", cfg_key_sfprobe_agentip},
{"sfprobe_agentsubid", cfg_key_sfprobe_agentsubid},
{"flow_handling_threads", cfg_key_flow_handling_threads},
Expand Down

0 comments on commit 5579b0f

Please sign in to comment.