Skip to content

Commit

Permalink
* Introduced 'label' primitive: it is a variable length string equiva…
Browse files Browse the repository at this point in the history
…lent

  of tag and tag2 primitives. Its value can be set via 'set_label' in a
  pre_tag_map (see examples/pretag.map.example). If, ie. as a result of
  JEQ's in a pre_tag_map, multiple 'set_label' are applied, then default
  operation is append labels and separate by a comma.
* fix, nfacctd: when nfacctd_time_new was set to false (default) plugins
  were reporting incorrect query number (QN) in their concluding line of
  purge reports. Only 1.5.0 CVS releases are affected.
* Updated docs
  • Loading branch information
paololucente authored and paolo committed Jul 17, 2014
1 parent 3cfcee8 commit a61b396
Show file tree
Hide file tree
Showing 20 changed files with 158 additions and 65 deletions.
27 changes: 14 additions & 13 deletions CONFIG-KEYS
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ DESC: daemonizes the process (default: false).
KEY: aggregate (-c)
VALUES: [ src_mac, dst_mac, vlan, cos, etype, src_host, dst_host, src_net, dst_net,
src_mask, dst_mask, src_as, dst_as, src_port, dst_port, tos, proto, none,
sum_mac, sum_host, sum_net, sum_as, sum_port, flows, tag, tag2, class,
tcpflags, in_iface, out_iface, std_comm, ext_comm, as_path, peer_src_ip,
peer_dst_ip, peer_src_as, peer_dst_as, local_pref, med, src_std_comm,
src_ext_comm, src_as_path, src_local_pref, src_med, mpls_vpn_rd,
sum_mac, sum_host, sum_net, sum_as, sum_port, flows, tag, tag2, label,
class, tcpflags, in_iface, out_iface, std_comm, ext_comm, as_path,
peer_src_ip, peer_dst_ip, peer_src_as, peer_dst_as, local_pref, med,
src_std_comm, src_ext_comm, src_as_path, src_local_pref, src_med, mpls_vpn_rd,
mpls_label_top, mpls_label_bottom, mpls_stack_depth, sampling_rate,
src_host_country, dst_host_country, pkt_len_distrib, nat_event,
post_nat_src_host, post_nat_dst_host, post_nat_src_port, post_nat_dst_port,
Expand All @@ -59,10 +59,10 @@ DESC: aggregate captured traffic data by selecting the specified set of primiti
sum_<primitive> are compound primitives which join together both inbound and
outbound traffic into a single aggregate. The 'none' primitive allows to make
an unique aggregate which accounts for the grand total of traffic flowing
through a specific interface. 'tag' and 'tag2' enable generation of tags when
tagging engines (pre_tag_map, post_tag) are in use. 'class' enables reception
of L7 traffic classes when Packet/Flow Classification engine (classifiers) is
in use. (default: src_host).
through a specific interface. 'tag', 'tag2' and 'label' enable generation of
tags when tagging engines (pre_tag_map, post_tag) are in use. 'class' enables
L7 traffic classes when Packet/Flow Classification engine (classifiers) is in
use. (default: src_host).
NOTES: * Some primitives (ie. tag2, timestamp_start, timestamp_end) are not part of
any default SQL table schema shipped. Always check out documentation related
to the RDBMS in use (ie. 'sql/README.mysql') which will point you to extra
Expand Down Expand Up @@ -999,11 +999,12 @@ DESC: both nfacctd and sfacctd check health of incoming NetFlow/sFlow

KEY: pre_tag_map
DESC: full pathname to a file containing tag mappings. Tags can be internal-only (ie. for filtering
purposes, see pre_tag_filter configuration directive) or exposed to users (ie. if 'tag' and/or
'tag2' primitives are part of the aggregation method). Take a look to the examples/ sub-tree
for all supported keys and detailed examples (pretag.map.example). Pre-Tagging is evaluated in
the Core Process and each plugin can be defined a local pre_tag_map. Result of evaluation of
pre_tag_map overrides any tags passed via NetFlow/sFlow by a pmacct nfprobe/sfprobe plugin.
purposes, see pre_tag_filter configuration directive) or exposed to users (ie. if 'tag', 'tag2'
and/or 'label' primitives are part of the aggregation method). Take a look to the examples/
sub-tree for all supported keys and detailed examples (pretag.map.example). Pre-Tagging is
evaluated in the Core Process and each plugin can be defined a local pre_tag_map. Result of
evaluation of pre_tag_map overrides any tags passed via NetFlow/sFlow by a pmacct nfprobe/
sfprobe plugin.

KEY: maps_entries
DESC: defines the maximum number of entries a map (ie. pre_tag_map) can contain. The default value
Expand Down
63 changes: 39 additions & 24 deletions examples/pretag.map.example
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
! pre_tag_map: Pre-Tagging map -- multiplexes various fields into a 8-bytes
! numerical-only ID
! Pre-Tagging map -- upon matching a set of given conditions, pre_tag_map does
! return numerical (set_tag, set_tag2) or string (label) IDs.
!
! File syntax is key-based. Position of keys inside the same row (rule) is not
! relevant; Spaces are not allowed (ie. 'id = 1' is not valid). The first full
! match wins (like in firewall rules). Negative values mean negations (ie. match
! data NOT entering interface 2: 'in=-2'); 'set_tag', 'set_tag2', 'filter' and
! 'ip' keys don't support negative values. 'label', 'jeq', 'return' and 'stack'
! keys can be used to alter the standard rule evaluation flow.
! data NOT entering interface 2: 'in=-2'); 'set_tag', 'set_tag2', 'set_label',
! 'filter' and 'ip' keys don't support negative values. 'label', 'jeq', 'return'
! and 'stack' keys can be used to alter the standard rule evaluation flow.
!
! nfacctd: valid keys: set_tag, set_tag2, set_tos, ip, in, out, engine_type,
! engine_id, flowset_id, nexthop, bgp_nexthop, filter, v8agg, sampling_rate,
! sample_type, direction, src_mac, vlan; mandatory keys for each rule: ip.
! nfacctd: valid keys: set_tag, set_tag2, set_label, set_tos, ip, in, out,
! engine_type, engine_id, flowset_id, nexthop, bgp_nexthop, filter, v8agg,
! sampling_rate, sample_type, direction, src_mac, vlan; mandatory keys for each
! rule: ip.
!
! sfacctd: valid keys: set_tag, set_tag2, set_tos, ip, in, out, nexthop,
! bgp_nexthop, filter, agent_id, sampling_rate, sample_type, src_mac, vlan;
! mandatory keys for each rule: ip.
! sfacctd: valid keys: set_tag, set_tag2, set_label, set_tos, ip, in, out,
! nexthop, bgp_nexthop, filter, agent_id, sampling_rate, sample_type, src_mac,
! vlan; mandatory keys for each rule: ip.
!
! pmacctd: valid keys: set_tag, set_tag2 and filter.
! pmacctd: valid keys: set_tag, set_tag2, set_label and filter.
!
! sfacctd, nfacctd when in 'tee' mode: valid keys: set_tag, set_tag2, ip;
! mandatory keys for each rule: ip.
! sfacctd, nfacctd when in 'tee' mode: valid keys: set_tag, set_tag2, set_label,
! ip; mandatory keys for each rule: ip.
!
! BGP-related keys are independent of the collection method in use, hence apply
! to all daemons (BGP daemon must be enabled): src_as, dst_as, src_comms, comms,
Expand All @@ -29,20 +30,24 @@
!
! 'set_tag' SET: tag assigned to a matching packet, flow or sample;
! tag can be also defined auto-increasing, ie. <tag #>++;
! its use is mutually exclusive to 'set_tag2' within the
! same rule. The resulting value is written to the 'tag'
! field when using memory tables and 'agent_id' when using
! a SQL plugin (unless a schema v9 is used). Legacy name
! for this primitive is 'id'.
! its use is mutually exclusive to set_tag2 and set_label
! within the same rule. The resulting value is written to
! the 'tag' field when using memory tables and 'agent_id'
! when using a SQL plugin (unless a schema v9 is used).
! Legacy name for this primitive is 'id'.
! 'set_tag2' SET: tag assigned to a matching packet, flow or sample;
! tag can be also defined auto-increasing, ie. <tag #>++;
! its use is mutually exclusive to 'set_tag' within the
! same rule. The resulting value is written to the 'tag2'
! field when using memory tables and 'agent_id2' when
! using a SQL plugin (unless a schema v9 is used). If
! using a SQL plugin, read more about the 'agent_id2'
! its use is mutually exclusive to set_tag and set_label
! within the same rule. The resulting value is written to
! the 'tag2' field when using memory tables and 'agent_id2'
! when using a SQL plugin (unless a schema v9 is used).
! If using a SQL plugin, read more about the 'agent_id2'
! field in the 'sql/README.agent_id2' document. Legacy
! name for this primitive is 'id2'.
! 'set_label' SET: string label assigned to a matching packet, flow
! or sample; its use is mutually exclusive to tags within
! the same rule. The resulting value is written to the
! 'label' field.
! 'set_tos' SET: Matching packets are set their 'tos' primitive to
! the specified value. Currently valid only in nfacctd. If
! collecting ingress NetFlow at both trusted and untrusted
Expand Down Expand Up @@ -291,3 +296,13 @@ set_tag2=6 label=sixsix
set_tag2=7 label=sevenseven
!
! ===
!
! === Basic set_label example
! Tag as "blabla,blabla2" all NetFlow/sFlow data received from any exporter.
! If, ie. as a result of JEQ's in a pre_tag_map, multiple 'set_label' are
! applied, then default operation is append labels and separate by a comma.
!
set_label=blabla ip=0.0.0.0/0 jeq=blabla2
set_label=blabla2 ip=0.0.0.0/0 label=blabla2
!
! ===
14 changes: 14 additions & 0 deletions sql/README.label
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
This document doesn't replace documentation relevant to the database software you are
using, ie. README.mysql, README.pgsql or README.sqlite3.

The 'label' field.
Such field carries variable string tags and is equivalent in purpose to tag and tag2,
with the exception tag and tag2 are numerical-only. The guidelines below (typically
in MySQL format) are to add such primitive to the SQL schema:

* label field:
- "label VARCHAR(255) NOT NULL," to declare the field itself
- "PRIMARY KEY (..., label, ...)" to put it in the primary key

The primitive is not declared as part of any default table version; yet will not fail
version checks which are enabled when 'sql_optimize_clauses' feature is disabled.
1 change: 1 addition & 0 deletions sql/README.mysql
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ Similarly, BGP tables:
* tag => agent_id (INT(4) UNSIGNED NOT NULL)
- tag => tag (INT(4) UNSIGNED NOT NULL, if sql_table_version >= 9)
* tag2 => tag2 (INT(4) UNSIGNED NOT NULL, see README.tag2)
* label => label (VARCHAR(255) NOT NULL, see README.label)
* src_as => as_src (INT(4) UNSIGNED NOT NULL)
* dst_as => as_dst (INT(4) UNSIGNED NOT NULL)
* peer_src_as => peer_as_src (INT(4) UNSIGNED NOT NULL)
Expand Down
1 change: 1 addition & 0 deletions sql/README.pgsql
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ CHAR fields because making use of IP prefix labels, transparently to pmacct.
* tag => agent_id (BIGINT NOT NULL DEFAULT 0)
- tag => tag (BIGINT NOT NULL DEFAULT 0, if sql_table_version >= 9)
* tag2 => tag2 (BIGINT NOT NULL DEFAULT 0, see README.tag2)
* label => label (VARCHAR(255) NOT NULL DEFAULT ' ', see README.label)
* src_as => as_src (BIGINT NOT NULL DEFAULT 0)
* dst_as => as_dst (BIGINT NOT NULL DEFAULT 0)
* peer_src_as => peer_as_src (BIGINT NOT NULL DEFAULT 0)
Expand Down
1 change: 1 addition & 0 deletions sql/README.sqlite3
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ Similarly, BGP tables:
* tag => agent_id (INT(8) NOT NULL DEFAULT 0)
- tag => tag (INT(8) NOT NULL DEFAULT 0, if sql_table_version >= 9)
* tag2 => tag2 (INT(8) NOT NULL DEFAULT 0, see README.tag2)
* label => label (VARCHAR(255) NOT NULL DEFAULT ' ', see README.label)
* src_as => as_src (INT(8) NOT NULL DEFAULT 0)
* dst_as => as_dst (INT(8) NOT NULL DEFAULT 0)
* peer_src_as => peer_as_src (INT(8) NOT NULL DEFAULT 0)
Expand Down
4 changes: 2 additions & 2 deletions src/amqp_plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ void amqp_cache_purge(struct chained_cache *queue[], int index)
char *empty_pcust = NULL;
char src_mac[18], dst_mac[18], src_host[INET6_ADDRSTRLEN], dst_host[INET6_ADDRSTRLEN], ip_address[INET6_ADDRSTRLEN];
char rd_str[SRVBUFLEN], misc_str[SRVBUFLEN], dyn_amqp_routing_key[SRVBUFLEN], *orig_amqp_routing_key = NULL;
int i, j, stop, batch_idx, is_routing_key_dyn = FALSE, qn = 0, ret;
int i, j, stop, batch_idx, is_routing_key_dyn = FALSE, qn = 0, ret, saved_index = index;
time_t start, duration;
pid_t writer_pid = getpid();

Expand Down Expand Up @@ -334,7 +334,7 @@ void amqp_cache_purge(struct chained_cache *queue[], int index)

duration = time(NULL)-start;
Log(LOG_INFO, "INFO ( %s/%s ): *** Purging cache - END (PID: %u, QN: %u/%u, ET: %u) ***\n",
config.name, config.type, writer_pid, qn, index, duration);
config.name, config.type, writer_pid, qn, saved_index, duration);

if (config.sql_trigger_exec) P_trigger_exec(config.sql_trigger_exec);
}
Expand Down
4 changes: 2 additions & 2 deletions src/mongodb_plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ void MongoDB_cache_purge(struct chained_cache *queue[], int index)
char rd_str[SRVBUFLEN], misc_str[SRVBUFLEN], tmpbuf[LONGLONGSRVBUFLEN], mongo_database[SRVBUFLEN];
char *as_path, *bgp_comm, default_table[] = "test.acct";
char default_user[] = "pmacct", default_passwd[] = "arealsmartpwd";
int qn = 0, i, j, stop, db_status, batch_idx, go_to_pending;
int qn = 0, i, j, stop, db_status, batch_idx, go_to_pending, saved_index = index;
time_t stamp, start, duration;
char current_table[SRVBUFLEN], elem_table[SRVBUFLEN];
struct primitives_ptrs prim_ptrs;
Expand Down Expand Up @@ -676,7 +676,7 @@ void MongoDB_cache_purge(struct chained_cache *queue[], int index)

duration = time(NULL)-start;
Log(LOG_INFO, "INFO ( %s/%s ): *** Purging cache - END (PID: %u, QN: %u/%u, ET: %u) ***\n",
config.name, config.type, writer_pid, qn, index, duration);
config.name, config.type, writer_pid, qn, saved_index, duration);

if (config.sql_trigger_exec) P_trigger_exec(config.sql_trigger_exec);
}
Expand Down
4 changes: 2 additions & 2 deletions src/mysql_plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ void MY_cache_purge(struct db_cache *queue[], int index, struct insert_data *ida
struct db_cache *LastElemCommitted = NULL;
struct logfile lf;
time_t start;
int j, stop, ret, go_to_pending;
int j, stop, ret, go_to_pending, saved_index = index;
char orig_insert_clause[LONGSRVBUFLEN], orig_update_clause[LONGSRVBUFLEN], orig_lock_clause[LONGSRVBUFLEN];
char tmpbuf[LONGLONGSRVBUFLEN], tmptable[SRVBUFLEN];
struct primitives_ptrs prim_ptrs;
Expand Down Expand Up @@ -492,7 +492,7 @@ void MY_cache_purge(struct db_cache *queue[], int index, struct insert_data *ida

idata->elap_time = time(NULL)-start;
Log(LOG_INFO, "INFO ( %s/%s ): *** Purging cache - END (PID: %u, QN: %u/%u, ET: %u) ***\n",
config.name, config.type, writer_pid, idata->qn, index, idata->elap_time);
config.name, config.type, writer_pid, idata->qn, saved_index, idata->elap_time);

if (config.sql_trigger_exec) {
if (!config.debug) idata->elap_time = time(NULL)-start;
Expand Down
4 changes: 2 additions & 2 deletions src/pgsql_plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ void PG_cache_purge(struct db_cache *queue[], int index, struct insert_data *ida
char orig_insert_clause[LONGSRVBUFLEN], orig_update_clause[LONGSRVBUFLEN], orig_lock_clause[LONGSRVBUFLEN];
char orig_copy_clause[LONGSRVBUFLEN], tmpbuf[LONGLONGSRVBUFLEN], tmptable[SRVBUFLEN];
time_t start;
int j, r, reprocess = 0, stop, go_to_pending, reprocess_idx, bulk_reprocess_idx;
int j, r, reprocess = 0, stop, go_to_pending, reprocess_idx, bulk_reprocess_idx, saved_index = index;
struct primitives_ptrs prim_ptrs;
struct pkt_data dummy_data;
pid_t writer_pid = getpid();
Expand Down Expand Up @@ -552,7 +552,7 @@ void PG_cache_purge(struct db_cache *queue[], int index, struct insert_data *ida

idata->elap_time = time(NULL)-start;
Log(LOG_INFO, "INFO ( %s/%s ): *** Purging cache - END (PID: %u, QN: %u/%u, ET: %u) ***\n",
config.name, config.type, writer_pid, idata->qn, index, idata->elap_time);
config.name, config.type, writer_pid, idata->qn, saved_index, idata->elap_time);

if (config.sql_trigger_exec) {
if (!config.debug) idata->elap_time = time(NULL)-start;
Expand Down
10 changes: 0 additions & 10 deletions src/plugin_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -478,16 +478,6 @@ void P_cache_insert_pending(struct chained_cache *queue[], int index, struct cha

for (j = 0; j < index; j++) {
primptrs_set_all_from_chained_cache(&prim_ptrs, queue[j]);
/*
memset(&pdata, 0, sizeof(pdata));
memcpy(&pdata.primitives, &queue[j]->primitives, pp_size);
prim_ptrs.data = &pdata;
prim_ptrs.pbgp = queue[j]->pbgp;
prim_ptrs.pnat = queue[j]->pnat;
prim_ptrs.pmpls = queue[j]->pmpls;
prim_ptrs.pcust = queue[j]->pcust;
prim_ptrs.pvlen = queue[j]->pvlen;
*/

modulo = P_cache_modulo(&prim_ptrs);
cache_ptr = &cache[modulo];
Expand Down
2 changes: 1 addition & 1 deletion src/pmacct-build.h
Original file line number Diff line number Diff line change
@@ -1 +1 @@
#define PMACCT_BUILD "20140716-00"
#define PMACCT_BUILD "20140717-00"
4 changes: 4 additions & 0 deletions src/pmmyplay.c
Original file line number Diff line number Diff line change
Expand Up @@ -806,3 +806,7 @@ int bgp_rd2str(u_char *str, rd_t *rd)
void custom_primitive_value_print(char *out, int outlen, char *in, struct custom_primitive_ptrs *cp_entry, int formatted)
{
}

void vlen_prims_get(struct pkt_vlen_hdr_primitives *pvlen, pm_cfgreg_t wtc, char **label_ptr)
{
}
4 changes: 4 additions & 0 deletions src/pmpgplay.c
Original file line number Diff line number Diff line change
Expand Up @@ -903,3 +903,7 @@ int bgp_rd2str(u_char *str, rd_t *rd)
void custom_primitive_value_print(char *out, int outlen, char *in, struct custom_primitive_ptrs *cp_entry, int formatted)
{
}

void vlen_prims_get(struct pkt_vlen_hdr_primitives *pvlen, pm_cfgreg_t wtc, char **label_ptr)
{
}
4 changes: 2 additions & 2 deletions src/print_plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ void P_cache_purge(struct chained_cache *queue[], int index)
char *as_path, *bgp_comm, empty_string[] = "", empty_aspath[] = "^$", empty_ip4[] = "0.0.0.0", empty_ip6[] = "::";
char empty_macaddress[] = "00:00:00:00:00:00", empty_rd[] = "0:0";
FILE *f = NULL;
int j, stop, is_event = FALSE, qn = 0, go_to_pending;
int j, stop, is_event = FALSE, qn = 0, go_to_pending, saved_index = index;
time_t start, duration;
char tmpbuf[LONGLONGSRVBUFLEN], current_table[SRVBUFLEN], elem_table[SRVBUFLEN];
struct primitives_ptrs prim_ptrs, elem_prim_ptrs;
Expand Down Expand Up @@ -916,7 +916,7 @@ void P_cache_purge(struct chained_cache *queue[], int index)

duration = time(NULL)-start;
Log(LOG_INFO, "INFO ( %s/%s ): *** Purging cache - END (PID: %u, QN: %u/%u, ET: %u) ***\n",
config.name, config.type, writer_pid, qn, index, duration);
config.name, config.type, writer_pid, qn, saved_index, duration);

if (config.sql_trigger_exec) P_trigger_exec(config.sql_trigger_exec);
}
Expand Down
Loading

0 comments on commit a61b396

Please sign in to comment.