Flowgrind
Advanced TCP traffic generator
flowgrind.c
Go to the documentation of this file.
1 
6 /*
7  * Copyright (C) 2013-2014 Alexander Zimmermann <alexander.zimmermann@netapp.com>
8  * Copyright (C) 2010-2013 Arnd Hannemann <arnd@arndnet.de>
9  * Copyright (C) 2010-2013 Christian Samsel <christian.samsel@rwth-aachen.de>
10  * Copyright (C) 2009 Tim Kosse <tim.kosse@gmx.de>
11  * Copyright (C) 2007-2008 Daniel Schaffrath <daniel.schaffrath@mac.com>
12  *
13  * This file is part of Flowgrind.
14  *
15  * Flowgrind is free software: you can redistribute it and/or modify
16  * it under the terms of the GNU General Public License as published by
17  * the Free Software Foundation, either version 3 of the License, or
18  * (at your option) any later version.
19  *
20  * Flowgrind is distributed in the hope that it will be useful,
21  * but WITHOUT ANY WARRANTY; without even the implied warranty of
22  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23  * GNU General Public License for more details.
24  *
25  * You should have received a copy of the GNU General Public License
26  * along with Flowgrind. If not, see <http://www.gnu.org/licenses/>.
27  *
28  */
29 
30 #ifdef HAVE_CONFIG_H
31 #include "config.h"
32 #endif /* HAVE_CONFIG_H */
33 
34 #include <assert.h>
35 #include <errno.h>
36 #include <limits.h>
37 #include <math.h>
38 #include <sys/types.h>
39 /* for AF_INET6 */
40 #include <sys/socket.h>
41 #include <arpa/inet.h>
42 #include <netinet/in.h>
43 #include <netinet/ip.h>
44 /* for CA states (on Linux only) */
45 #include <netinet/tcp.h>
46 #include <signal.h>
47 #include <stdio.h>
48 #include <stdlib.h>
49 #include <string.h>
50 #include <sys/param.h>
51 #include <sys/uio.h>
52 #include <sys/utsname.h>
53 #include <time.h>
54 #include <unistd.h>
55 #include <fcntl.h>
56 #include <syslog.h>
57 /* xmlrpc-c */
58 #include <xmlrpc-c/base.h>
59 #include <xmlrpc-c/client.h>
60 
61 #include "flowgrind.h"
62 #include "common.h"
63 #include "fg_error.h"
64 #include "fg_progname.h"
65 #include "fg_time.h"
66 #include "fg_definitions.h"
67 #include "fg_string.h"
68 #include "debug.h"
69 #include "fg_rpc_client.h"
70 #include "fg_argparser.h"
71 #include "fg_log.h"
72 
/* Make the listed columns visible: forwards to set_column_visibility(true, ...)
 * and lets NARGS() supply the number of column IDs passed. */
74 #define SHOW_COLUMNS(...) \
75  (set_column_visibility(true, NARGS(__VA_ARGS__), __VA_ARGS__))
76 
/* Counterpart of SHOW_COLUMNS: forwards to set_column_visibility(false, ...). */
78 #define HIDE_COLUMNS(...) \
79  (set_column_visibility(false, NARGS(__VA_ARGS__), __VA_ARGS__))
80 
/* Assign the header unit string `unit` to every listed column via
 * set_column_unit(); NARGS() supplies the number of column IDs. */
82 #define SET_COLUMN_UNIT(unit, ...) \
83  (set_column_unit(unit, NARGS(__VA_ARGS__), __VA_ARGS__))
84 
/* Report a command line parsing problem through errx() and then call
 * usage(EXIT_FAILURE), which is declared noreturn in this file. The
 * `##__VA_ARGS__` form lets the macro be used without extra arguments. */
86 #define PARSE_ERR(err_msg, ...) do { \
87  errx(err_msg, ##__VA_ARGS__); \
88  usage(EXIT_FAILURE); \
89 } while (0)
90 
91 /* External global variables */
92 extern const char *progname;
93 
/* Stream that print_output() writes to when file logging is enabled; it is
 * assigned from fopen() in open_logfile() and stays NULL until then. */
95 static FILE *log_stream = NULL;
96 
/* Name of the log file. open_logfile() generates a name with asprintf()
 * when this is still NULL; close_logfile() frees it. */
98 static char *log_filename = NULL;
99 
/*
 * Set by sighandler() once the first interrupt has been received and polled
 * by the RPC loops to stop early. The object is written from a signal
 * handler, so it has to be a volatile sig_atomic_t: with a plain bool the
 * compiler may cache the value across the polling loops and the access is
 * not guaranteed to be atomic. Zero means "no interrupt seen yet"; assigning
 * true/false still works because both convert to 1/0.
 */
static volatile sig_atomic_t sigint_caught = 0;
102 
103 /* XML-RPC environment object that contains any error that has occurred. */
104 static xmlrpc_env rpc_env;
105 
108 
111 
/* Argument parser state (struct arg_parser comes from fg_argparser.h). */
113 static struct arg_parser parser;
114 
/* Controller-wide options; defaults are assigned in init_controller_options(). */
116 static struct controller_options copt;
117 
120 
/* NOTE(review): `parser` is already declared a few lines above. This second
 * tentative definition is legal C but redundant - consider removing it. */
122 static struct arg_parser parser;
123 
/* NOTE(review): presumably the number of flows currently running; no use of
 * this counter is visible in this part of the file - confirm. */
125 static unsigned short active_flows = 0;
126 
127 /* To cover a gcc bug (http://gcc.gnu.org/bugzilla/show_bug.cgi?id=36446) */
128 #pragma GCC diagnostic push
129 #pragma GCC diagnostic ignored "-Wmissing-field-initializers"
130 
/*
 * Table describing every column of the interval report: its type, the header
 * name and unit strings, and whether the column is shown by default.
 *
 * set_column_visibility() and set_column_unit() index this array directly
 * with an enum column_id value, so the entries presumably have to stay in
 * the same order as that enum - confirm against its declaration before
 * adding or reordering rows. The COL_STATUS row only exists in DEBUG builds.
 */
131 static struct column column_info[] = {
132  {.type = COL_FLOW_ID, .header.name = "# ID",
133  .header.unit = "# ", .state.visible = true},
134  {.type = COL_BEGIN, .header.name = "begin",
135  .header.unit = "[s]", .state.visible = true},
136  {.type = COL_END, .header.name = "end",
137  .header.unit = "[s]", .state.visible = true},
138  {.type = COL_THROUGH, .header.name = "through",
139  .header.unit = "[Mbit/s]", .state.visible = true},
140  {.type = COL_TRANSAC, .header.name = "transac",
141  .header.unit = "[#/s]", .state.visible = true},
142  {.type = COL_BLOCK_REQU, .header.name = "requ",
143  .header.unit = "[#]", .state.visible = false},
144  {.type = COL_BLOCK_RESP, .header.name = "resp",
145  .header.unit = "[#]", .state.visible = false},
146  {.type = COL_RTT_MIN, .header.name = "min RTT",
147  .header.unit = "[ms]", .state.visible = false},
148  {.type = COL_RTT_AVG, .header.name = "avg RTT",
149  .header.unit = "[ms]", .state.visible = false},
150  {.type = COL_RTT_MAX, .header.name = "max RTT",
151  .header.unit = "[ms]", .state.visible = false},
152  {.type = COL_IAT_MIN, .header.name = "min IAT",
153  .header.unit = "[ms]", .state.visible = true},
154  {.type = COL_IAT_AVG, .header.name = "avg IAT",
155  .header.unit = "[ms]", .state.visible = true},
156  {.type = COL_IAT_MAX, .header.name = "max IAT",
157  .header.unit = "[ms]", .state.visible = true},
158  {.type = COL_DLY_MIN, .header.name = "min DLY",
159  .header.unit = "[ms]", .state.visible = false},
160  {.type = COL_DLY_AVG, .header.name = "avg DLY",
161  .header.unit = "[ms]", .state.visible = false},
162  {.type = COL_DLY_MAX, .header.name = "max DLY",
163  .header.unit = "[ms]", .state.visible = false},
164  {.type = COL_TCP_CWND, .header.name = "cwnd",
165  .header.unit = "[#]", .state.visible = true},
166  {.type = COL_TCP_SSTH, .header.name = "ssth",
167  .header.unit = "[#]", .state.visible = true},
168  {.type = COL_TCP_UACK, .header.name = "uack",
169  .header.unit = "[#]", .state.visible = true},
170  {.type = COL_TCP_SACK, .header.name = "sack",
171  .header.unit = "[#]", .state.visible = true},
172  {.type = COL_TCP_LOST, .header.name = "lost",
173  .header.unit = "[#]", .state.visible = true},
174  {.type = COL_TCP_RETR, .header.name = "retr",
175  .header.unit = "[#]", .state.visible = true},
176  {.type = COL_TCP_TRET, .header.name = "tret",
177  .header.unit = "[#]", .state.visible = true},
178  {.type = COL_TCP_FACK, .header.name = "fack",
179  .header.unit = "[#]", .state.visible = true},
180  {.type = COL_TCP_REOR, .header.name = "reor",
181  .header.unit = "[#]", .state.visible = true},
182  {.type = COL_TCP_BKOF, .header.name = "bkof",
183  .header.unit = "[#]", .state.visible = true},
184  {.type = COL_TCP_RTT, .header.name = "rtt",
185  .header.unit = "[ms]", .state.visible = true},
186  {.type = COL_TCP_RTTVAR, .header.name = "rttvar",
187  .header.unit = "[ms]", .state.visible = true},
188  {.type = COL_TCP_RTO, .header.name = "rto",
189  .header.unit = "[ms]", .state.visible = true},
190  {.type = COL_TCP_CA_STATE, .header.name = "ca state",
191  .header.unit = "", .state.visible = true},
192  {.type = COL_SMSS, .header.name = "smss",
193  .header.unit = "[B]", .state.visible = true},
194  {.type = COL_PMTU, .header.name = "pmtu",
195  .header.unit = "[B]", .state.visible = true},
196 #ifdef DEBUG
197  {.type = COL_STATUS, .header.name = "status",
198  .header.unit = "", .state.visible = false}
199 #endif /* DEBUG */
200 };
201 #pragma GCC diagnostic pop
202 
203 /* Forward declarations */
/* The three usage*() helpers print help text and terminate the process,
 * hence the noreturn attribute. */
204 static void usage(short status)
205  __attribute__((noreturn));
206 static void usage_sockopt(void)
207  __attribute__((noreturn));
208 static void usage_trafgenopt(void)
209  __attribute__((noreturn));
/* printf-style output helper; the format attribute lets the compiler check
 * the arguments against the format string (argument 1, varargs from 2). */
210 inline static void print_output(const char *fmt, ...)
211  __attribute__((format(printf, 1, 2)));
/* Report handling; the definitions are not part of this section of the file. */
212 static void fetch_reports(xmlrpc_client *);
213 static void report_flow(struct report* report);
214 static void print_interval_report(unsigned short flow_id, enum endpoint_t e,
215  struct report *report);
216 
/*
 * Print the general help text, or a short hint after a syntax error, and
 * terminate the program.
 *
 * If status differs from EXIT_SUCCESS only a "Try '<progname> -h'" hint is
 * written to stderr and the process exits with that status. Otherwise the
 * complete option overview is written to stdout and the process exits with
 * EXIT_SUCCESS. The function never returns.
 *
 * NOTE(review): the format string uses the positional arguments %1$s, %2$d
 * and %3$s, but only progname and copt.dump_prefix are visible in the
 * argument list (separated by two commas). The integer argument for %2$d
 * appears to have been lost from this listing - restore it from the
 * original source before building.
 */
225 static void usage(short status)
226 {
227  /* Syntax error. Emit 'try help' to stderr and exit */
228  if (status != EXIT_SUCCESS) {
229  fprintf(stderr, "Try '%s -h' for more information\n", progname);
230  exit(status);
231  }
232 
233  fprintf(stdout,
234  "Usage: %1$s [OPTION]...\n"
235  "Advanced TCP traffic generator for Linux, FreeBSD, and Mac OS X.\n\n"
236 
237  "Mandatory arguments to long options are mandatory for short options too.\n\n"
238 
239  "General options:\n"
240  " -h, --help[=WHAT]\n"
241  " display help and exit. Optional WHAT can either be 'socket' for\n"
242  " help on socket options or 'traffic' traffic generation help\n"
243  " -v, --version print version information and exit\n\n"
244 
245  "Controller options:\n"
246  " -c, --show-colon=TYPE[,TYPE]...\n"
247  " display intermediated interval report column TYPE in output.\n"
248  " Allowed values for TYPE are: 'interval', 'through', 'transac',\n"
249  " 'iat', 'kernel' (all show per default), and 'blocks', 'rtt',\n"
250 #ifdef DEBUG
251  " 'delay', 'status' (optional)\n"
252 #else /* DEBUG */
253  " 'delay' (optional)\n"
254 #endif /* DEBUG */
255 #ifdef DEBUG
256  " -d, --debug increase debugging verbosity. Add option multiple times to\n"
257  " increase the verbosity\n"
258 #endif /* DEBUG */
259  " -e, --dump-prefix=PRE\n"
260  " prepend prefix PRE to pcap dump filename (default: \"%3$s\")\n"
261  " -i, --report-interval=#.#\n"
262  " reporting interval, in seconds (default: 0.05s)\n"
263  " --log-file[=FILE]\n"
264  " write output to logfile FILE (default: %1$s-'timestamp'.log)\n"
265  " -m report throughput in 2**20 bytes/s (default: 10**6 bit/s)\n"
266  " -n, --flows=# number of test flows (default: 1)\n"
267  " -o overwrite existing log files (default: don't)\n"
268  " -p don't print symbolic values (like INT_MAX) instead of numbers\n"
269  " -q, --quiet be quiet, do not log to screen (default: off)\n"
270  " -s, --tcp-stack=TYPE\n"
271  " don't determine unit of source TCP stacks automatically. Force\n"
272  " unit to TYPE, where TYPE is 'segment' or 'byte'\n"
273  " -w write output to logfile (same as --log-file)\n\n"
274 
275  "Flow options:\n"
276  " Some of these options take the flow endpoint as argument, denoted by 'x' in\n"
277  " the option syntax. 'x' needs to be replaced with either 's' for the source\n"
278  " endpoint, 'd' for the destination endpoint or 'b' for both endpoints. To\n"
279  " specify different values for each endpoints, separate them by comma. For\n"
280  " instance -W s=8192,d=4096 sets the advertised window to 8192 at the source\n"
281  " and 4096 at the destination.\n\n"
282  " -A x use minimal response size needed for RTT calculation\n"
283  " (same as -G s=p,C,%2$d)\n"
284  " -B x=# set requested sending buffer, in bytes\n"
285  " -C x stop flow if it is experiencing local congestion\n"
286  " -D x=DSCP DSCP value for TOS byte\n"
287  " -E enumerate bytes in payload instead of sending zeros\n"
288  " -F #[,#]... flow options following this option apply only to the given flow \n"
289  " IDs. Useful in combination with -n to set specific options\n"
290  " for certain flows. Numbering starts with 0, so -F 1 refers\n"
291  " to the second flow. With -1 all flow are refered\n"
292 #ifdef HAVE_LIBGSL
293  " -G x=(q|p|g):(C|U|E|N|L|P|W):#1:[#2]\n"
294 #else /* HAVE_LIBGSL */
295  " -G x=(q|p|g):(C|U):#1:[#2]\n"
296 #endif /* HAVE_LIBGSL */
297  " activate stochastic traffic generation and set parameters\n"
298  " according to the used distribution. For additional information \n"
299  " see 'flowgrind --help=traffic'\n"
300  " -H x=HOST[/CONTROL[:PORT]]\n"
301  " test from/to HOST. Optional argument is the address and port\n"
302  " for the CONTROL connection to the same host.\n"
303  " An endpoint that isn't specified is assumed to be localhost\n"
304  " -J # use random seed # (default: read /dev/urandom)\n"
305  " -I enable one-way delay calculation (no clock synchronization)\n"
306  " -L call connect() on test socket immediately before starting to\n"
307  " send data (late connect). If not specified the test connection\n"
308  " is established in the preparation phase before the test starts\n"
309  " -M x dump traffic using libpcap. flowgrindd must be run as root\n"
310  " -N shutdown() each socket direction after test flow\n"
311  " -O x=OPT set socket option OPT on test socket. For additional information\n"
312  " see 'flowgrind --help=socket'\n"
313  " -P x do not iterate through select() to continue sending in case\n"
314  " block size did not suffice to fill sending queue (pushy)\n"
315  " -Q summarize only, no intermediated interval reports are\n"
316  " computed (quiet)\n"
317  " -R x=#.#(z|k|M|G)(b|B)\n"
318  " send at specified rate per second, where: z = 2**0, k = 2**10,\n"
319  " M = 2**20, G = 2**30, and b = bits/s (default), B = bytes/s\n"
320  " -S x=# set block (message) size, in bytes (same as -G s=q,C,#)\n"
321  " -T x=#.# set flow duration, in seconds (default: s=10,d=0)\n"
322  " -U x=# set application buffer size, in bytes (default: 8192)\n"
323  " truncates values if used with stochastic traffic generation\n"
324  " -W x=# set requested receiver buffer (advertised window), in bytes\n"
325  " -Y x=#.# set initial delay before the host starts to send, in seconds\n"
326 /* " -Z x=#.# set amount of data to be send, in bytes (instead of -t)\n"*/,
327  progname,
329  , copt.dump_prefix
330  );
331  exit(EXIT_SUCCESS);
332 }
333 
337 static void usage_sockopt(void)
338 {
339  fprintf(stdout,
340  "%s allows to set the following standard and non-standard socket options. \n\n"
341 
342  "All socket options take the flow endpoint as argument, denoted by 'x' in the\n"
343  "option syntax. 'x' needs to be replaced with either 's' for the source endpoint,\n"
344  "'d' for the destination endpoint or 'b' for both endpoints. To specify different\n"
345  "values for each endpoints, separate them by comma. Moreover, it is possible to\n"
346  "repeatedly pass the same endpoint in order to specify multiple socket options\n\n"
347 
348  "Standard socket options:\n"
349  " -O x=TCP_CONGESTION=ALG\n"
350  " set congestion control algorithm ALG on test socket\n"
351  " -O x=TCP_CORK\n"
352  " set TCP_CORK on test socket\n"
353  " -O x=TCP_NODELAY\n"
354  " disable nagle algorithm on test socket\n"
355  " -O x=SO_DEBUG\n"
356  " set SO_DEBUG on test socket\n"
357  " -O x=IP_MTU_DISCOVER\n"
358  " set IP_MTU_DISCOVER on test socket if not already enabled by\n"
359  " system default\n"
360  " -O x=ROUTE_RECORD\n"
361  " set ROUTE_RECORD on test socket\n\n"
362 
363  "Non-standard socket options:\n"
364  " -O x=TCP_MTCP\n"
365  " set TCP_MTCP (15) on test socket\n"
366  " -O x=TCP_ELCN\n"
367  " set TCP_ELCN (20) on test socket\n"
368  " -O x=TCP_LCD set TCP_LCD (21) on test socket\n\n"
369 
370  "Examples:\n"
371  " -O s=TCP_CONGESTION=reno,d=SO_DEBUG\n"
372  " sets Reno TCP as congestion control algorithm at the source and\n"
373  " SO_DEBUG as socket option at the destinatio\n"
374  " -O s=SO_DEBUG,s=TCP_CORK\n"
375  " set SO_DEBUG and TCP_CORK as socket option at the source\n",
376  progname);
377  exit(EXIT_SUCCESS);
378 }
379 
383 static void usage_trafgenopt(void)
384 {
385  fprintf(stdout,
386  "%s supports stochastic traffic generation, which allows to conduct\n"
387  "besides normal bulk also advanced rate-limited and request-response data\n"
388  "transfers.\n\n"
389 
390  "The stochastic traffic generation option '-G' takes the flow endpoint as\n"
391  "argument, denoted by 'x' in the option syntax. 'x' needs to be replaced with\n"
392  "either 's' for the source endpoint, 'd' for the destination endpoint or 'b' for\n"
393  "both endpoints. However, please note that bidirectional traffic generation can\n"
394  "lead to unexpected results. To specify different values for each endpoints,\n"
395  "separate them by comma.\n\n"
396 
397  "Stochastic traffic generation:\n"
398 #ifdef HAVE_LIBGSL
399  " -G x=(q|p|g):(C|U|E|N|L|P|W):#1:[#2]\n"
400 #else /* HAVE_LIBGSL */
401  " -G x=(q|p|g):(C|U):#1:[#2]\n"
402 #endif /* HAVE_LIBGSL */
403  " Flow parameter:\n"
404  " q = request size (in bytes)\n"
405  " p = response size (in bytes)\n"
406  " g = request interpacket gap (in seconds)\n\n"
407 
408  " Distributions:\n"
409  " C = constant (#1: value, #2: not used)\n"
410  " U = uniform (#1: min, #2: max)\n"
411 #ifdef HAVE_LIBGSL
412  " E = exponential (#1: lamba - lifetime, #2: not used)\n"
413  " N = normal (#1: mu - mean value, #2: sigma_square - variance)\n"
414  " L = lognormal (#1: zeta - mean, #2: sigma - std dev)\n"
415  " P = pareto (#1: k - shape, #2 x_min - scale)\n"
416  " W = weibull (#1: lambda - scale, #2: k - shape)\n"
417 #else /* HAVE_LIBGSL */
418  " advanced distributions are only available if compiled with libgsl\n"
419 #endif /* HAVE_LIBGSL */
420  " -U x=# specify a cap for the calculated values for request and response\n"
421  " size (not needed for constant values or uniform distribution),\n"
422  " values over this cap are recalculated\n\n"
423 
424  "Examples:\n"
425  " -G s=q:C:40\n"
426  " use contant request size of 40 bytes\n"
427  " -G s=p:N:2000:50\n"
428  " use normal distributed response size with mean 2000 bytes and\n"
429  " variance 50\n"
430  " -G s=g:U:0.005:0.01\n"
431  " use uniform distributed interpacket gap with minimum 0.005s and\n"
432  " maximum 0.01s\n\n"
433 
434  "Notes: \n"
435  " - The man page contains more explained examples\n"
436  " - Using bidirectional traffic generation can lead to unexpected results\n"
437  " - Usage of -G in conjunction with -A, -R, -S is not recommended, as they\n"
438  " overwrite each other. -A, -R and -S exist as shortcut only\n",
439  progname);
440  exit(EXIT_SUCCESS);
441 }
442 
448 static void sighandler(int sig)
449 {
450  UNUSED_ARGUMENT(sig);
451 
452  DEBUG_MSG(LOG_ERR, "caught %s", strsignal(sig));
453 
454  if (!sigint_caught) {
455  warnx("caught SIGINT, trying to gracefully close flows. "
456  "Press CTRL+C again to force termination \n");
457  sigint_caught = true;
458  } else {
459  exit(EXIT_FAILURE);
460  }
461 }
462 
466 static void init_controller_options(void)
467 {
468  copt.num_flows = 1;
469  copt.reporting_interval = 0.05;
470  copt.log_to_stdout = true;
471  copt.log_to_file = false;
472  copt.dump_prefix = "flowgrind-";
473  copt.clobber = false;
474  copt.mbyte = false;
475  copt.symbolic = true;
476  copt.force_unit = INT_MAX;
477 }
478 
/*
 * Fill every entry of the global cflow[] array (MAX_FLOWS_CONTROLLER
 * entries) with the default flow options.
 *
 * Each flow is set to PROTO_TCP, both endpoints get "localhost" as test
 * address and zeroed per-endpoint settings, the source writes for 10
 * seconds, and IDs, timestamps, finished flags and final reports are
 * cleared. Finally a random seed is read from /dev/urandom for each flow.
 *
 * NOTE(review): the line numbering of this listing skips several lines
 * inside the function, so some default assignments may be missing here -
 * compare with the original source.
 */
486 static void init_flow_options(void)
487 {
488  for (int id = 0; id < MAX_FLOWS_CONTROLLER; id++) {
489 
490  cflow[id].proto = PROTO_TCP;
491 
 /* Per-endpoint defaults, applied to source and destination alike */
492  foreach(int *i, SOURCE, DESTINATION) {
495  cflow[id].settings[*i].delay[WRITE] = 0;
496  cflow[id].settings[*i].maximum_block_size = 8192;
499  cflow[id].settings[*i].route_record = 0;
500  strcpy(cflow[id].endpoint[*i].test_address, "localhost");
501 
502  /* Default daemon is localhost, set in parse_cmdline */
503  cflow[id].endpoint[*i].rpc_info = 0;
504  cflow[id].endpoint[*i].daemon = 0;
505 
506  cflow[id].settings[*i].pushy = 0;
507  cflow[id].settings[*i].cork = 0;
508  cflow[id].settings[*i].cc_alg[0] = 0;
509  cflow[id].settings[*i].elcn = 0;
510  cflow[id].settings[*i].lcd = 0;
511  cflow[id].settings[*i].mtcp = 0;
512  cflow[id].settings[*i].nonagle = 0;
513  cflow[id].settings[*i].traffic_dump = 0;
514  cflow[id].settings[*i].so_debug = 0;
515  cflow[id].settings[*i].dscp = 0;
516  cflow[id].settings[*i].ipmtudiscover = 0;
517 
519  }
520  cflow[id].settings[SOURCE].duration[WRITE] = 10.0;
522 
 /* -1 marks the endpoint IDs as not yet assigned */
523  cflow[id].endpoint_id[0] = cflow[id].endpoint_id[1] = -1;
524  cflow[id].start_timestamp[0].tv_sec = 0;
525  cflow[id].start_timestamp[0].tv_nsec = 0;
526  cflow[id].start_timestamp[1].tv_sec = 0;
527  cflow[id].start_timestamp[1].tv_nsec = 0;
528 
529  cflow[id].finished[0] = 0;
530  cflow[id].finished[1] = 0;
531  cflow[id].final_report[0] = NULL;
532  cflow[id].final_report[1] = NULL;
533 
534  cflow[id].summarize_only = 0;
535  cflow[id].late_connect = 0;
536  cflow[id].shutdown = 0;
537  cflow[id].byte_counting = 0;
538  cflow[id].random_seed = 0;
539 
 /* NOTE(review): the result of open() is not checked, so a failed
  * open is only noticed through read() returning -1; a short read
  * is not detected at all. The device is reopened for every flow. */
540  int data = open("/dev/urandom", O_RDONLY);
541  int rc = read(data, &cflow[id].random_seed, sizeof (int) );
542  close(data);
543  if(rc == -1)
544  crit("read /dev/urandom failed");
545  }
546 }
547 
551 static void open_logfile(void)
552 {
553  if (!copt.log_to_file)
554  return;
555 
556  /* Log filename is not given by cmdline */
557  if (!log_filename) {
558  if (asprintf(&log_filename, "%s-%s.log", progname,
559  ctimenow(false)) == -1)
560  critx("could not allocate memory for log filename");
561  }
562 
563  if (!copt.clobber && access(log_filename, R_OK) == 0)
564  critx("log file exists");
565 
566  log_stream = fopen(log_filename, "w");
567  if (!log_stream)
568  critx("could not open logfile '%s'", log_filename);
569 
570  DEBUG_MSG(LOG_NOTICE, "logging to '%s'", log_filename);
571 }
572 
576 static void close_logfile(void)
577 {
578  if (!copt.log_to_file)
579  return;
580  if (fclose(log_stream) == -1)
581  critx("could not close logfile '%s'", log_filename);
582 
583  free(log_filename);
584 }
585 
592 inline static void print_output(const char *fmt, ...)
593 {
594  va_list ap;
595 
596  va_start(ap, fmt);
597  if (copt.log_to_stdout) {
598  vprintf(fmt, ap);
599  fflush(stdout);
600  }
601  if (copt.log_to_file) {
602  vfprintf(log_stream, fmt, ap);
603  fflush(log_stream);
604  }
605  va_end(ap);
606 }
607 
608 inline static void die_if_fault_occurred(xmlrpc_env *env)
609 {
610  if (env->fault_occurred)
611  critx("XML-RPC Fault: %s (%d)", env->fault_string, env->fault_code);
612 }
613 
614 /* creates an xmlrpc_client for connect to server, uses global env rpc_env */
615 static void prepare_xmlrpc_client(xmlrpc_client **rpc_client)
616 {
617  struct xmlrpc_clientparms clientParms;
618  size_t clientParms_cpsize = XMLRPC_CPSIZE(transport);
619 
620  /* Since version 1.21 xmlrpclib will automatically generate a
621  * rather long user_agent, we will do a lot of RPC calls so let's
622  * spare some bytes and omit this header */
623 #ifdef HAVE_STRUCT_XMLRPC_CURL_XPORTPARMS_DONT_ADVERTISE
624  struct xmlrpc_curl_xportparms curlParms;
625  memset(&curlParms, 0, sizeof(curlParms));
626 
627  curlParms.dont_advertise = 1;
628  clientParms.transportparmsP = &curlParms;
629  clientParms.transportparm_size = XMLRPC_CXPSIZE(dont_advertise);
630  clientParms_cpsize = XMLRPC_CPSIZE(transportparm_size);
631 #endif /* HAVE_STRUCT_XMLRPC_CURL_XPORTPARMS_DONT_ADVERTISE */
632 
633  /* Force usage of curl transport, we require it in configure script
634  * anyway and at least FreeBSD 9.1 will use libwww otherwise */
635  clientParms.transport = "curl";
636 
637  DEBUG_MSG(LOG_WARNING, "prepare xmlrpc client");
638  xmlrpc_client_create(&rpc_env, XMLRPC_CLIENT_NO_FLAGS, "Flowgrind",
639  FLOWGRIND_VERSION, &clientParms,
640  clientParms_cpsize, rpc_client);
641 }
642 
/*
 * Ask every daemon in the unique_daemons list for its version information
 * (RPC method "get_version") and store the reported API version, OS name
 * and OS release in the daemon entry.
 *
 * A daemon whose version string differs from FLOWGRIND_VERSION triggers a
 * warning; if at least one mismatch was seen, our own version is printed
 * and the function sleeps for 5 seconds before returning. The loop is left
 * early once sigint_caught is set.
 *
 * rpc_client is the XML-RPC client used for the calls.
 *
 * NOTE(review): the line numbering of this listing skips a line after the
 * RPC call and one after xmlrpc_decompose_value(), so the fault checks that
 * presumably belong there are not visible - compare with the original.
 */
652 static void check_version(xmlrpc_client *rpc_client)
653 {
654  xmlrpc_value * resultP = 0;
655  char mismatch = 0;
656 
657  const struct list_node *node = fg_list_front(&unique_daemons);
658 
659  while (node) {
660  if (sigint_caught)
661  return;
662 
663  struct daemon *daemon = node->data;
664  node = node->next;
665 
666  xmlrpc_client_call2f(&rpc_env, rpc_client, daemon->url,
667  "get_version", &resultP, "()");
 /* An HTTP 400 answer gets a dedicated, more helpful error message */
668  if ((rpc_env.fault_occurred) && (strcasestr(rpc_env.fault_string,"response code is 400")))
669  critx("node %s could not parse request.You are "
670  "probably trying to use a numeric IPv6 address "
671  "and the node's libxmlrpc is too old, please "
672  "upgrade!", daemon->url);
673 
675 
676  /* Decomposes the xmlrpc value and extract the daemons data in
677  * it into controller local variable */
678  if (resultP) {
679  char* version;
680  int api_version;
681  char* os_name;
682  char* os_release;
683  xmlrpc_decompose_value(&rpc_env, resultP, "{s:s,s:i,s:s,s:s,*}",
684  "version", &version,
685  "api_version", &api_version,
686  "os_name", &os_name,
687  "os_release", &os_release);
689 
690  if (strcmp(version, FLOWGRIND_VERSION)) {
691  mismatch = 1;
692  warnx("node %s uses version %s",
693  daemon->url, version);
694  }
695  /* Store the daemons XML RPC API version,
696  * OS name and release in daemons linked list */
697  daemon->api_version = api_version;
 /* NOTE(review): strncpy() does not NUL-terminate when the source
  * has 256 or more characters, and the size of the destination
  * buffers is not visible here - verify against struct daemon. */
698  strncpy(daemon->os_name, os_name, 256);
699  strncpy(daemon->os_release, os_release, 256);
700  free_all(version, os_name, os_release);
701  xmlrpc_DECREF(resultP);
702  }
703  }
704 
705  if (mismatch) {
706  warnx("our version is %s\n\nContinuing in 5 seconds", FLOWGRIND_VERSION);
707  sleep(5);
708  }
709 }
710 
/*
 * Allocate a zero-initialised struct daemon, copy server_uuid into it and
 * store the daemon_url pointer (the string itself is not duplicated).
 *
 * server_uuid is the UUID string of the daemon; daemon_url is the XML-RPC
 * URL of the daemon and must outlive the returned entry, since only the
 * pointer is stored.
 * Returns the new daemon, or 0 after logging an alert if the allocation
 * failed.
 *
 * NOTE(review): the listing numbering skips a line right before the return,
 * so the statement that presumably inserts the entry into the daemon list
 * is not visible here - compare with the original.
 */
720 static struct daemon * add_daemon_by_uuid(const char* server_uuid,
721  char* daemon_url)
722 {
723  struct daemon *daemon;
724  daemon = malloc((sizeof(struct daemon)));
725 
726  if (!daemon) {
727  logging(LOG_ALERT, "could not allocate memory for daemon");
728  return 0;
729  }
730 
731  memset(daemon, 0, sizeof(struct daemon));
 /* NOTE(review): unbounded copy; the size of daemon->uuid is not visible
  * here - confirm that server_uuid always fits. */
732  strcpy(daemon->uuid, server_uuid);
733  daemon->url = daemon_url;
735  return daemon;
736 }
737 
747 static struct daemon * set_unique_daemon_by_uuid(const char* server_uuid,
748  char* daemon_url)
749 {
750  /* Store the first daemon UUID and XML RPC url connection string.
751  * First daemon is used as reference to avoid the daemon duplication
752  * by their UUID */
753  if (fg_list_size(&unique_daemons) == 0)
754  return add_daemon_by_uuid(server_uuid, daemon_url);
755 
756  /* Compare the incoming daemons UUID with all daemons UUID in
757  * memory in order to prevent dupliclity in storing the daemons.
758  * If the incoming daemon UUID is already present in the daemons list,
759  * then return existing daemon pointer to controller connection.
760  * This is because a single daemons can run and maintain mutliple
761  * data connection */
762  const struct list_node *node = fg_list_front(&unique_daemons);
763  while (node) {
764  struct daemon *daemon = node->data;
765  node = node->next;
766  if (!strcmp(daemon->uuid, server_uuid))
767  return daemon;
768  }
769 
770  return add_daemon_by_uuid(server_uuid, daemon_url);
771 }
772 
779 static void set_flow_endpoint_daemon(const char* server_uuid, char* server_url)
780 {
781  /* Determine the daemon in controller flow data by UUID
782  * This prevent the daemons duplication */
783  for (unsigned id = 0; id < copt.num_flows; id++) {
784  foreach(int *i, SOURCE, DESTINATION) {
785  struct flow_endpoint* e = &cflow[id].endpoint[*i];
786  if(!strcmp(e->rpc_info->server_url, server_url) && !e->daemon) {
787  e->daemon = set_unique_daemon_by_uuid(server_uuid,
788  server_url);
789  }
790  }
791  }
792 }
793 
/*
 * Contact every XML-RPC URL stored in the flows_rpc_info list, ask the
 * daemon behind it for its UUID (RPC method "get_uuid") and pass UUID and
 * URL to set_flow_endpoint_daemon(). The loop is left early once
 * sigint_caught is set.
 *
 * rpc_client is the XML-RPC client used for the calls.
 *
 * NOTE(review): the listing numbering skips a line after the RPC call and
 * one after set_flow_endpoint_daemon(); neither a fault check nor a release
 * of server_uuid is visible here - compare with the original.
 */
805 static void find_daemon(xmlrpc_client *rpc_client)
806 {
807  xmlrpc_value * resultP = 0;
808  const struct list_node *node = fg_list_front(&flows_rpc_info);
809 
810  while (node) {
811  if (sigint_caught)
812  return;
813 
814  struct rpc_info *flow_rpc_info= node->data;
815  node = node->next;
816  /* call daemons by flow option XML-RPC URL connection string */
817  xmlrpc_client_call2f(&rpc_env, rpc_client,
818  flow_rpc_info->server_url,
819  "get_uuid", &resultP, "()");
821 
822  /* Decomposes the xmlrpc_value and extract the daemon UUID
823  * in it into controller local variable */
824  if (resultP) {
825  char* server_uuid = 0;
826 
827  xmlrpc_decompose_value(&rpc_env, resultP, "{s:s,*}",
828  "server_uuid", &server_uuid);
829  set_flow_endpoint_daemon(server_uuid, flow_rpc_info->server_url);
831  xmlrpc_DECREF(resultP);
832  }
833  }
834 }
835 
/*
 * Make sure that all daemons are idle before a test is started.
 *
 * Every daemon in unique_daemons is asked for its status (RPC method
 * "get_status"). If a daemon reports that it is started or that it still
 * has flows, the function aborts through critx(). The loop is left early
 * once sigint_caught is set.
 *
 * rpc_client is the XML-RPC client used for the calls.
 *
 * NOTE(review): the listing numbering skips a line after the RPC call and
 * one after xmlrpc_decompose_value(), so the fault checks that presumably
 * belong there are not visible - compare with the original.
 */
845 static void check_idle(xmlrpc_client *rpc_client)
846 {
847  xmlrpc_value * resultP = 0;
848  const struct list_node *node = fg_list_front(&unique_daemons);
849 
850  while (node) {
851  if (sigint_caught)
852  return;
853 
854  struct daemon *daemon = node->data;
855  node = node->next;
856 
857  xmlrpc_client_call2f(&rpc_env, rpc_client,
858  daemon->url,
859  "get_status", &resultP, "()");
861 
862  /* Decomposes the xmlrpc_value and extract the daemons data
863  * in it into controller local variable */
864  if (resultP) {
865  int started;
866  int num_flows;
867 
868  xmlrpc_decompose_value(&rpc_env, resultP,
869  "{s:i,s:i,*}", "started",
870  &started, "num_flows",
871  &num_flows);
873 
874  /* Daemon start status and number of flows is used to
875  * determine node idle status */
876  if (started || num_flows)
877  critx("node %s is busy. %d flows, started=%d",
878  daemon->url, num_flows,
879  started);
880  xmlrpc_DECREF(resultP);
881  }
882  }
883 }
884 
893 static void set_column_visibility(bool visibility, unsigned nargs, ...)
894 {
895  va_list ap;
896  enum column_id col_id;
897 
898  va_start(ap, nargs);
899  while (nargs--) {
900  col_id = va_arg(ap, enum column_id);
901  column_info[col_id].state.visible = visibility;
902  }
903  va_end(ap);
904 }
905 
914 static void set_column_unit(const char *unit, unsigned nargs, ...)
915 {
916  va_list ap;
917  enum column_id col_id;
918 
919  va_start(ap, nargs);
920  while (nargs--) {
921  col_id = va_arg(ap, enum column_id);
922  column_info[col_id].header.unit = unit;
923  }
924  va_end(ap);
925 }
926 
/*
 * Print the headline of the report (date, controlling host, number of
 * flows, reporting interval and throughput unit) and adapt the column table
 * to the operating systems of the involved daemons.
 *
 * NOTE(review): this listing is missing several source lines inside the
 * function (the line numbering has gaps): the print_output() call is not
 * closed, and the statements guarded by the two OS checks and by the final
 * unit check are only partly visible. The comments below describe what can
 * be seen; restore the missing lines from the original source.
 */
931 static void print_headline(void)
932 {
933  /* Print headline */
934  struct utsname me;
935  int rc = uname(&me);
 /* Falls back to "(unknown)" as host name when uname() fails */
936  print_output("# Date: %s, controlling host = %s, number of flows = %d, "
937  "reporting interval = %.2fs, [through] = %s (%s)\n",
938  ctimenow(false), (rc == -1 ? "(unknown)" : me.nodename),
940  (copt.mbyte ? "2**20 bytes/second": "10**6 bit/second"),
942 
943  /* Prepare column visibility based on involved OSes */
944  bool involved_os[] = {[0 ... NUM_OSes-1] = false};
945  const struct list_node *node = fg_list_front(&unique_daemons);
 /* Record which operating systems the daemons reported in os_name */
946  while (node) {
947  struct daemon *daemon = node->data;
948  node = node->next;
949  if (!strcmp(daemon->os_name, "Linux"))
950  involved_os[LINUX] = true;
951  else if (!strcmp(daemon->os_name, "FreeBSD"))
952  involved_os[FREEBSD] = true;
953  else if (!strcmp(daemon->os_name, "Darwin"))
954  involved_os[DARWIN] = true;
955  }
956 
957  /* No Linux OS is involved in the test */
958  if (!involved_os[LINUX])
962  COL_PMTU);
963 
964  /* No Linux and FreeBSD OS is involved in the test */
965  if (!involved_os[FREEBSD] && !involved_os[LINUX])
968 
 /* NOTE(review): firstnode is dereferenced without a NULL check, so this
  * relies on unique_daemons being non-empty - confirm with the callers. */
969  const struct list_node *firstnode = fg_list_front(&unique_daemons);
970  struct daemon *daemon_firstnode = firstnode->data;
971 
972  /* Set unit for kernel TCP metrics to bytes */
974  strcmp(daemon_firstnode->os_name, "Linux")))
979 }
980 
991 static void prepare_flow(int id, xmlrpc_client *rpc_client)
992 {
993  xmlrpc_value *resultP, *extra_options;
994 
995  int listen_data_port;
996  DEBUG_MSG(LOG_WARNING, "prepare flow %d destination", id);
997 
998  /* Contruct extra socket options array */
999  extra_options = xmlrpc_array_new(&rpc_env);
1000  for (int i = 0; i < cflow[id].settings[DESTINATION].num_extra_socket_options; i++) {
1001  xmlrpc_value *value;
1002  xmlrpc_value *option = xmlrpc_build_value(&rpc_env, "{s:i,s:i}",
1003  "level", cflow[id].settings[DESTINATION].extra_socket_options[i].level,
1004  "optname", cflow[id].settings[DESTINATION].extra_socket_options[i].optname);
1005 
1006  value = xmlrpc_base64_new(&rpc_env, cflow[id].settings[DESTINATION].extra_socket_options[i].optlen, (unsigned char*)cflow[id].settings[DESTINATION].extra_socket_options[i].optval);
1007 
1008  xmlrpc_struct_set_value(&rpc_env, option, "value", value);
1009 
1010  xmlrpc_array_append_item(&rpc_env, extra_options, option);
1011  xmlrpc_DECREF(value);
1012  xmlrpc_DECREF(option);
1013  }
1014  xmlrpc_client_call2f(&rpc_env, rpc_client,
1015  cflow[id].endpoint[DESTINATION].rpc_info->server_url,
1016  "add_flow_destination", &resultP,
1017  "("
1018  "{s:s}"
1019  "{s:i}"
1020  "{s:d,s:d,s:d,s:d,s:d}"
1021  "{s:i,s:i}"
1022  "{s:i}"
1023  "{s:b,s:b,s:b,s:b,s:b}"
1024  "{s:i,s:i}"
1025  "{s:i,s:d,s:d}" /* request */
1026  "{s:i,s:d,s:d}" /* response */
1027  "{s:i,s:d,s:d}" /* interpacket_gap */
1028  "{s:b,s:b,s:i,s:i}"
1029  "{s:s}"
1030  "{s:i,s:i,s:i,s:i,s:i}"
1031  "{s:s}"
1032  "{s:i,s:A}"
1033  ")",
1034 
1035  /* general flow settings */
1036  "bind_address", cflow[id].endpoint[DESTINATION].test_address,
1037 
1038  "flow_id",id,
1039 
1040  "write_delay", cflow[id].settings[DESTINATION].delay[WRITE],
1041  "write_duration", cflow[id].settings[DESTINATION].duration[WRITE],
1042  "read_delay", cflow[id].settings[SOURCE].delay[WRITE],
1043  "read_duration", cflow[id].settings[SOURCE].duration[WRITE],
1044  "reporting_interval", cflow[id].summarize_only ? 0 : copt.reporting_interval,
1045 
1046  "requested_send_buffer_size", cflow[id].settings[DESTINATION].requested_send_buffer_size,
1047  "requested_read_buffer_size", cflow[id].settings[DESTINATION].requested_read_buffer_size,
1048 
1049  "maximum_block_size", cflow[id].settings[DESTINATION].maximum_block_size,
1050 
1051  "traffic_dump", cflow[id].settings[DESTINATION].traffic_dump,
1052  "so_debug", cflow[id].settings[DESTINATION].so_debug,
1053  "route_record", (int)cflow[id].settings[DESTINATION].route_record,
1054  "pushy", cflow[id].settings[DESTINATION].pushy,
1055  "shutdown", (int)cflow[id].shutdown,
1056 
1057  "write_rate", cflow[id].settings[DESTINATION].write_rate,
1058  "random_seed",cflow[id].random_seed,
1059 
1060  "traffic_generation_request_distribution", cflow[id].settings[DESTINATION].request_trafgen_options.distribution,
1061  "traffic_generation_request_param_one", cflow[id].settings[DESTINATION].request_trafgen_options.param_one,
1062  "traffic_generation_request_param_two", cflow[id].settings[DESTINATION].request_trafgen_options.param_two,
1063 
1064  "traffic_generation_response_distribution", cflow[id].settings[DESTINATION].response_trafgen_options.distribution,
1065  "traffic_generation_response_param_one", cflow[id].settings[DESTINATION].response_trafgen_options.param_one,
1066  "traffic_generation_response_param_two", cflow[id].settings[DESTINATION].response_trafgen_options.param_two,
1067 
1068  "traffic_generation_gap_distribution", cflow[id].settings[DESTINATION].interpacket_gap_trafgen_options.distribution,
1069  "traffic_generation_gap_param_one", cflow[id].settings[DESTINATION].interpacket_gap_trafgen_options.param_one,
1070  "traffic_generation_gap_param_two", cflow[id].settings[DESTINATION].interpacket_gap_trafgen_options.param_two,
1071 
1072  "flow_control", cflow[id].settings[DESTINATION].flow_control,
1073  "byte_counting", cflow[id].byte_counting,
1074  "cork", (int)cflow[id].settings[DESTINATION].cork,
1075  "nonagle", cflow[id].settings[DESTINATION].nonagle,
1076 
1077  "cc_alg", cflow[id].settings[DESTINATION].cc_alg,
1078 
1079  "elcn", cflow[id].settings[DESTINATION].elcn,
1080  "lcd", cflow[id].settings[DESTINATION].lcd,
1081  "mtcp", cflow[id].settings[DESTINATION].mtcp,
1082  "dscp", (int)cflow[id].settings[DESTINATION].dscp,
1083  "ipmtudiscover", cflow[id].settings[DESTINATION].ipmtudiscover,
1084  "dump_prefix", copt.dump_prefix,
1085  "num_extra_socket_options", cflow[id].settings[DESTINATION].num_extra_socket_options,
1086  "extra_socket_options", extra_options);
1087 
1089 
1090  xmlrpc_parse_value(&rpc_env, resultP, "{s:i,s:i,s:i,s:i,*}",
1091  "flow_id", &cflow[id].endpoint_id[DESTINATION],
1092  "listen_data_port", &listen_data_port,
1093  "real_listen_send_buffer_size", &cflow[id].endpoint[DESTINATION].send_buffer_size_real,
1094  "real_listen_read_buffer_size", &cflow[id].endpoint[DESTINATION].receive_buffer_size_real);
1096 
1097  if (resultP)
1098  xmlrpc_DECREF(resultP);
1099 
1100  /* Contruct extra socket options array */
1101  extra_options = xmlrpc_array_new(&rpc_env);
1102  for (int i = 0; i < cflow[id].settings[SOURCE].num_extra_socket_options; i++) {
1103 
1104  xmlrpc_value *value;
1105  xmlrpc_value *option = xmlrpc_build_value(&rpc_env, "{s:i,s:i}",
1106  "level", cflow[id].settings[SOURCE].extra_socket_options[i].level,
1107  "optname", cflow[id].settings[SOURCE].extra_socket_options[i].optname);
1108 
1109  value = xmlrpc_base64_new(&rpc_env, cflow[id].settings[SOURCE].extra_socket_options[i].optlen, (unsigned char*)cflow[id].settings[SOURCE].extra_socket_options[i].optval);
1110 
1111  xmlrpc_struct_set_value(&rpc_env, option, "value", value);
1112 
1113  xmlrpc_array_append_item(&rpc_env, extra_options, option);
1114  xmlrpc_DECREF(value);
1115  xmlrpc_DECREF(option);
1116  }
1117  DEBUG_MSG(LOG_WARNING, "prepare flow %d source", id);
1118 
1119  xmlrpc_client_call2f(&rpc_env, rpc_client,
1120  cflow[id].endpoint[SOURCE].rpc_info->server_url,
1121  "add_flow_source", &resultP,
1122  "("
1123  "{s:s}"
1124  "{s:i}"
1125  "{s:d,s:d,s:d,s:d,s:d}"
1126  "{s:i,s:i}"
1127  "{s:i}"
1128  "{s:b,s:b,s:b,s:b,s:b}"
1129  "{s:i,s:i}"
1130  "{s:i,s:d,s:d}" /* request */
1131  "{s:i,s:d,s:d}" /* response */
1132  "{s:i,s:d,s:d}" /* interpacket_gap */
1133  "{s:b,s:b,s:i,s:i}"
1134  "{s:s}"
1135  "{s:i,s:i,s:i,s:i,s:i}"
1136  "{s:s}"
1137  "{s:i,s:A}"
1138  "{s:s,s:i,s:i}"
1139  ")",
1140 
1141  /* general flow settings */
1142  "bind_address", cflow[id].endpoint[SOURCE].test_address,
1143 
1144  "flow_id",id,
1145 
1146  "write_delay", cflow[id].settings[SOURCE].delay[WRITE],
1147  "write_duration", cflow[id].settings[SOURCE].duration[WRITE],
1148  "read_delay", cflow[id].settings[DESTINATION].delay[WRITE],
1149  "read_duration", cflow[id].settings[DESTINATION].duration[WRITE],
1150  "reporting_interval", cflow[id].summarize_only ? 0 : copt.reporting_interval,
1151 
1152  "requested_send_buffer_size", cflow[id].settings[SOURCE].requested_send_buffer_size,
1153  "requested_read_buffer_size", cflow[id].settings[SOURCE].requested_read_buffer_size,
1154 
1155  "maximum_block_size", cflow[id].settings[SOURCE].maximum_block_size,
1156 
1157  "traffic_dump", cflow[id].settings[SOURCE].traffic_dump,
1158  "so_debug", cflow[id].settings[SOURCE].so_debug,
1159  "route_record", (int)cflow[id].settings[SOURCE].route_record,
1160  "pushy", cflow[id].settings[SOURCE].pushy,
1161  "shutdown", (int)cflow[id].shutdown,
1162 
1163  "write_rate", cflow[id].settings[SOURCE].write_rate,
1164  "random_seed",cflow[id].random_seed,
1165 
1166  "traffic_generation_request_distribution", cflow[id].settings[SOURCE].request_trafgen_options.distribution,
1167  "traffic_generation_request_param_one", cflow[id].settings[SOURCE].request_trafgen_options.param_one,
1168  "traffic_generation_request_param_two", cflow[id].settings[SOURCE].request_trafgen_options.param_two,
1169 
1170  "traffic_generation_response_distribution", cflow[id].settings[SOURCE].response_trafgen_options.distribution,
1171  "traffic_generation_response_param_one", cflow[id].settings[SOURCE].response_trafgen_options.param_one,
1172  "traffic_generation_response_param_two", cflow[id].settings[SOURCE].response_trafgen_options.param_two,
1173 
1174  "traffic_generation_gap_distribution", cflow[id].settings[SOURCE].interpacket_gap_trafgen_options.distribution,
1175  "traffic_generation_gap_param_one", cflow[id].settings[SOURCE].interpacket_gap_trafgen_options.param_one,
1176  "traffic_generation_gap_param_two", cflow[id].settings[SOURCE].interpacket_gap_trafgen_options.param_two,
1177 
1178 
1179  "flow_control", cflow[id].settings[SOURCE].flow_control,
1180  "byte_counting", cflow[id].byte_counting,
1181  "cork", (int)cflow[id].settings[SOURCE].cork,
1182  "nonagle", (int)cflow[id].settings[SOURCE].nonagle,
1183 
1184  "cc_alg", cflow[id].settings[SOURCE].cc_alg,
1185 
1186  "elcn", cflow[id].settings[SOURCE].elcn,
1187  "lcd", cflow[id].settings[SOURCE].lcd,
1188  "mtcp", cflow[id].settings[SOURCE].mtcp,
1189  "dscp", (int)cflow[id].settings[SOURCE].dscp,
1190  "ipmtudiscover", cflow[id].settings[SOURCE].ipmtudiscover,
1191  "dump_prefix", copt.dump_prefix,
1192  "num_extra_socket_options", cflow[id].settings[SOURCE].num_extra_socket_options,
1193  "extra_socket_options", extra_options,
1194 
1195  /* source settings */
1196  "destination_address", cflow[id].endpoint[DESTINATION].test_address,
1197  "destination_port", listen_data_port,
1198  "late_connect", (int)cflow[id].late_connect);
1200 
1201  xmlrpc_DECREF(extra_options);
1202 
1203  xmlrpc_parse_value(&rpc_env, resultP, "{s:i,s:i,s:i,*}",
1204  "flow_id", &cflow[id].endpoint_id[SOURCE],
1205  "real_send_buffer_size", &cflow[id].endpoint[SOURCE].send_buffer_size_real,
1206  "real_read_buffer_size", &cflow[id].endpoint[SOURCE].receive_buffer_size_real);
1208 
1209  if (resultP)
1210  xmlrpc_DECREF(resultP);
1211  DEBUG_MSG(LOG_WARNING, "prepare flow %d completed", id);
1212 }
1213 
1219 static void prepare_all_flows(xmlrpc_client *rpc_client)
1220 {
1221  /* prepare flows */
1222  for (unsigned short id = 0; id < copt.num_flows; id++) {
1223  if (sigint_caught)
1224  return;
1225  prepare_flow(id, rpc_client);
1226  }
1227 }
1228 
1240 static void start_all_flows(xmlrpc_client *rpc_client)
1241 {
1242  xmlrpc_value * resultP = 0;
1243 
1244  struct timespec lastreport_end;
1245  struct timespec lastreport_begin;
1246  struct timespec now;
1247 
1248  gettime(&lastreport_end);
1249  gettime(&lastreport_begin);
1250  gettime(&now);
1251 
1252  const struct list_node *node = fg_list_front(&unique_daemons);
1253  while (node) {
1254  if (sigint_caught)
1255  return;
1256  struct daemon *daemon = node->data;
1257  node = node->next;
1258 
1259  DEBUG_MSG(LOG_ERR, "starting flow on server with UUID %s",daemon->uuid);
1260  xmlrpc_client_call2f(&rpc_env, rpc_client,
1261  daemon->url,
1262  "start_flows", &resultP, "({s:i})",
1263  "start_timestamp", now.tv_sec + 2);
1265  if (resultP)
1266  xmlrpc_DECREF(resultP);
1267  }
1268 
1270 
1271  /* Reports are fetched from the daemons based on the
1272  * report interval duration */
1273  while (!sigint_caught) {
1274  if ( time_diff_now(&lastreport_begin) < copt.reporting_interval ) {
1275  usleep(copt.reporting_interval - time_diff(&lastreport_begin,&lastreport_end) );
1276  continue;
1277  }
1278  gettime(&lastreport_begin);
1279  fetch_reports(rpc_client);
1280  gettime(&lastreport_end);
1281 
1282  /* All flows have ended */
1283  if (active_flows < 1)
1284  return;
1285  }
1286 }
1287 
1297 static void fetch_reports(xmlrpc_client *rpc_client)
1298 {
1299 
1300  xmlrpc_value * resultP = 0;
1301  const struct list_node *node = fg_list_front(&unique_daemons);
1302 
1303  while (node) {
1304  struct daemon *daemon = node->data;
1305  node = node->next;
1306  int array_size, has_more;
1307  xmlrpc_value *rv = 0;
1308 
1309 has_more_reports:
1310 
1311  xmlrpc_client_call2f(&rpc_env, rpc_client, daemon->url,
1312  "get_reports", &resultP, "()");
1313  if (rpc_env.fault_occurred) {
1314  errx("XML-RPC fault: %s (%d)", rpc_env.fault_string,
1315  rpc_env.fault_code);
1316  continue;
1317  }
1318 
1319  if (!resultP)
1320  continue;
1321 
1322  array_size = xmlrpc_array_size(&rpc_env, resultP);
1323  if (!array_size) {
1324  warnx("empty array in get_reports reply");
1325  continue;
1326  }
1327 
1328  xmlrpc_array_read_item(&rpc_env, resultP, 0, &rv);
1329  xmlrpc_read_int(&rpc_env, rv, &has_more);
1330  if (rpc_env.fault_occurred) {
1331  errx("XML-RPC fault: %s (%d)", rpc_env.fault_string,
1332  rpc_env.fault_code);
1333  xmlrpc_DECREF(rv);
1334  continue;
1335  }
1336  xmlrpc_DECREF(rv);
1337 
1338  for (int i = 1; i < array_size; i++) {
1339  xmlrpc_value *rv = 0;
1340 
1341  xmlrpc_array_read_item(&rpc_env, resultP, i, &rv);
1342  if (rv) {
1343  struct report report;
1344  int begin_sec, begin_nsec, end_sec, end_nsec;
1345  int tcpi_snd_cwnd;
1346  int tcpi_snd_ssthresh;
1347  int tcpi_unacked;
1348  int tcpi_sacked;
1349  int tcpi_lost;
1350  int tcpi_retrans;
1351  int tcpi_retransmits;
1352  int tcpi_fackets;
1353  int tcpi_reordering;
1354  int tcpi_rtt;
1355  int tcpi_rttvar;
1356  int tcpi_rto;
1357  int tcpi_backoff;
1358  int tcpi_ca_state;
1359  int tcpi_snd_mss;
1360  int bytes_read_low, bytes_read_high;
1361  int bytes_written_low, bytes_written_high;
1362 
1363  xmlrpc_decompose_value(&rpc_env, rv,
1364  "("
1365  "{s:i,s:i,s:i,s:i,s:i,s:i,s:i,*}" /* Report data & timeval */
1366  "{s:i,s:i,s:i,s:i,*}" /* bytes */
1367  "{s:i,s:i,s:i,s:i,*}" /* blocks */
1368  "{s:d,s:d,s:d,s:d,s:d,s:d,s:d,s:d,s:d,*}" /* RTT, IAT, Delay */
1369  "{s:i,s:i,*}" /* MTU */
1370  "{s:i,s:i,s:i,s:i,s:i,*}" /* TCP info */
1371  "{s:i,s:i,s:i,s:i,s:i,*}" /* ... */
1372  "{s:i,s:i,s:i,s:i,s:i,*}" /* ... */
1373  "{s:i,*}"
1374  ")",
1375 
1376  "id", &report.id,
1377  "endpoint", &report.endpoint,
1378  "type", &report.type,
1379  "begin_tv_sec", &begin_sec,
1380  "begin_tv_nsec", &begin_nsec,
1381  "end_tv_sec", &end_sec,
1382  "end_tv_nsec", &end_nsec,
1383 
1384  "bytes_read_high", &bytes_read_high,
1385  "bytes_read_low", &bytes_read_low,
1386  "bytes_written_high", &bytes_written_high,
1387  "bytes_written_low", &bytes_written_low,
1388 
1389  "request_blocks_read", &report.request_blocks_read,
1390  "request_blocks_written", &report.request_blocks_written,
1391  "response_blocks_read", &report.response_blocks_read,
1392  "response_blocks_written", &report.response_blocks_written,
1393 
1394  "rtt_min", &report.rtt_min,
1395  "rtt_max", &report.rtt_max,
1396  "rtt_sum", &report.rtt_sum,
1397  "iat_min", &report.iat_min,
1398  "iat_max", &report.iat_max,
1399  "iat_sum", &report.iat_sum,
1400  "delay_min", &report.delay_min,
1401  "delay_max", &report.delay_max,
1402  "delay_sum", &report.delay_sum,
1403 
1404  "pmtu", &report.pmtu,
1405  "imtu", &report.imtu,
1406 
1407  "tcpi_snd_cwnd", &tcpi_snd_cwnd,
1408  "tcpi_snd_ssthresh", &tcpi_snd_ssthresh,
1409  "tcpi_unacked", &tcpi_unacked,
1410  "tcpi_sacked", &tcpi_sacked,
1411  "tcpi_lost", &tcpi_lost,
1412 
1413  "tcpi_retrans", &tcpi_retrans,
1414  "tcpi_retransmits", &tcpi_retransmits,
1415  "tcpi_fackets", &tcpi_fackets,
1416  "tcpi_reordering", &tcpi_reordering,
1417  "tcpi_rtt", &tcpi_rtt,
1418 
1419  "tcpi_rttvar", &tcpi_rttvar,
1420  "tcpi_rto", &tcpi_rto,
1421  "tcpi_backoff", &tcpi_backoff,
1422  "tcpi_ca_state", &tcpi_ca_state,
1423  "tcpi_snd_mss", &tcpi_snd_mss,
1424 
1425  "status", &report.status
1426  );
1427  xmlrpc_DECREF(rv);
1428 #ifdef HAVE_UNSIGNED_LONG_LONG_INT
1429  report.bytes_read = ((long long)bytes_read_high << 32) + (uint32_t)bytes_read_low;
1430  report.bytes_written = ((long long)bytes_written_high << 32) + (uint32_t)bytes_written_low;
1431 #else /* HAVE_UNSIGNED_LONG_LONG_INT */
1432  report.bytes_read = (uint32_t)bytes_read_low;
1433  report.bytes_written = (uint32_t)bytes_written_low;
1434 #endif /* HAVE_UNSIGNED_LONG_LONG_INT */
1435 
1436  /* FIXME Kernel metrics (tcp_info). Other OS than
1437  * Linux may not send valid values here. For
1438  * the moment we don't care and handle this in
1439  * the output/display routines. However, this
1440  * do not work in heterogeneous environments */
1441  report.tcp_info.tcpi_snd_cwnd = tcpi_snd_cwnd;
1442  report.tcp_info.tcpi_snd_ssthresh = tcpi_snd_ssthresh;
1443  report.tcp_info.tcpi_unacked = tcpi_unacked;
1444  report.tcp_info.tcpi_sacked = tcpi_sacked;
1445  report.tcp_info.tcpi_lost = tcpi_lost;
1446  report.tcp_info.tcpi_retrans = tcpi_retrans;
1447  report.tcp_info.tcpi_retransmits = tcpi_retransmits;
1448  report.tcp_info.tcpi_fackets = tcpi_fackets;
1449  report.tcp_info.tcpi_reordering = tcpi_reordering;
1450  report.tcp_info.tcpi_rtt = tcpi_rtt;
1451  report.tcp_info.tcpi_rttvar = tcpi_rttvar;
1452  report.tcp_info.tcpi_rto = tcpi_rto;
1453  report.tcp_info.tcpi_backoff = tcpi_backoff;
1454  report.tcp_info.tcpi_ca_state = tcpi_ca_state;
1455  report.tcp_info.tcpi_snd_mss = tcpi_snd_mss;
1456 
1457  report.begin.tv_sec = begin_sec;
1458  report.begin.tv_nsec = begin_nsec;
1459  report.end.tv_sec = end_sec;
1460  report.end.tv_nsec = end_nsec;
1461 
1462  report_flow(&report);
1463  }
1464  }
1465  xmlrpc_DECREF(resultP);
1466 
1467  if (has_more)
1468  goto has_more_reports;
1469  }
1470 }
1471 
1487 static void report_flow(struct report* report)
1488 {
1489  int *i = NULL;
1490  unsigned short id;
1491  struct cflow *f = NULL;
1492 
1493  /* Get matching flow for report */
1494  /* TODO Maybe just use compare daemon pointers? */
1495  for (id = 0; id < copt.num_flows; id++) {
1496  f = &cflow[id];
1497 
1498  foreach(i, SOURCE, DESTINATION)
1499  if (f->endpoint_id[*i] == report->id &&
1500  *i == (int)report->endpoint)
1501  goto exit_outer_loop;
1502  }
1503 exit_outer_loop:
1504 
1505  if (f->start_timestamp[*i].tv_sec == 0)
1506  f->start_timestamp[*i] = report->begin;
1507 
1508  if (report->type == FINAL) {
1509  DEBUG_MSG(LOG_DEBUG, "received final report for flow %d", id);
1510  /* Final report, keep it for later */
1511  free(f->final_report[*i]);
1512  f->final_report[*i] = malloc(sizeof(struct report));
1513  *f->final_report[*i] = *report;
1514 
1515  if (!f->finished[*i]) {
1516  f->finished[*i] = 1;
1517  if (f->finished[1 - *i]) {
1518  active_flows--;
1519  DEBUG_MSG(LOG_DEBUG, "remaining active flows: "
1520  "%d", active_flows);
1521  assert(active_flows >= 0);
1522  }
1523  }
1524  return;
1525  }
1526  print_interval_report(id, *i, report);
1527 }
1528 
1538 static void close_all_flows(void)
1539 {
1540  xmlrpc_env env;
1541  xmlrpc_client *client;
1542 
1543  for (unsigned short id = 0; id < copt.num_flows; id++) {
1544  DEBUG_MSG(LOG_WARNING, "closing flow %u", id);
1545 
1546  if (cflow[id].finished[SOURCE] && cflow[id].finished[DESTINATION])
1547  continue;
1548 
1549  /* We use new env and client, old one might be in fault condition */
1550  xmlrpc_env_init(&env);
1551  xmlrpc_client_create(&env, XMLRPC_CLIENT_NO_FLAGS, "Flowgrind", FLOWGRIND_VERSION, NULL, 0, &client);
1552  die_if_fault_occurred(&env);
1553  xmlrpc_env_clean(&env);
1554 
1555  foreach(int *i, SOURCE, DESTINATION) {
1556  xmlrpc_value * resultP = 0;
1557 
1558  if (cflow[id].endpoint_id[*i] == -1 ||
1559  cflow[id].finished[*i])
1560  /* Endpoint does not need closing */
1561  continue;
1562 
1563  cflow[id].finished[*i] = 1;
1564 
1565  xmlrpc_env_init(&env);
1566  xmlrpc_client_call2f(&env, client,
1567  cflow[id].endpoint[*i].rpc_info->server_url,
1568  "stop_flow", &resultP, "({s:i})",
1569  "flow_id", cflow[id].endpoint_id[*i]);
1570  if (resultP)
1571  xmlrpc_DECREF(resultP);
1572 
1573  xmlrpc_env_clean(&env);
1574  }
1575 
1576  if (active_flows > 0)
1577  active_flows--;
1578 
1579  xmlrpc_client_destroy(client);
1580  DEBUG_MSG(LOG_WARNING, "closed flow %u", id);
1581  }
1582 }
1583 
/**
 * Determine the number of digits of the integer part of a value.
 *
 * The sign and the fractional part are not counted; values with a magnitude
 * below 1 have one digit (the leading 0). The value is never converted to an
 * integer type, so magnitudes beyond the range of int are handled as well.
 * For infinity and NaN, which have no digits, 3 is returned - the length of
 * "inf" and "nan".
 *
 * @param[in] value value whose digits are to be counted
 * @return number of digits before the decimal point
 */
inline static size_t det_num_digits(double value)
{
	if (!isfinite(value))
		return 3;

	const double magnitude = value < 0 ? -value : value;
	size_t digits = 1;

	/* One more digit for every power of ten that is reached. The bound
	 * eventually overflows to infinity, which ends the loop for any
	 * finite magnitude */
	for (double bound = 10.0; magnitude >= bound; bound *= 10.0)
		digits++;

	return digits;
}
1598 
1606 inline static double scale_thruput(double thruput)
1607 {
1608  if (copt.mbyte)
1609  return thruput / (1<<20);
1610  return thruput / 1e6 * (1<<3);
1611 }
1612 
1621 static bool update_column_width(struct column *column, unsigned column_width)
1622 {
1623  /* True if column width has changed */
1624  bool has_changed = false;
1625 
1626  if (column->state.last_width < column_width) {
1627  /* Column too small */
1628  has_changed = true;
1629  column->state.last_width = column_width;
1630  column->state.oversized = 0;
1631  } else if (column->state.last_width > 1 + column_width) {
1632  /* Column too big */
1633  if (column->state.oversized >= MAX_COLUM_TOO_LARGE) {
1634  /* Column too big for quite a while */
1635  has_changed = true;
1636  column->state.last_width = column_width;
1637  column->state.oversized = 0;
1638  } else {
1639  (column->state.oversized)++;
1640  }
1641  } else {
1642  /* This size was needed, keep it */
1643  column->state.oversized = 0;
1644  }
1645 
1646  return has_changed;
1647 }
1648 
1663 static bool print_column_str(char **header1, char **header2, char **data,
1664  enum column_id column_id, char* value)
1665 {
1666  /* Only for convenience */
1667  struct column *column = &column_info[column_id];
1668 
1669  if (!column->state.visible)
1670  return false;
1671 
1672  /* Get max column width */
1673  unsigned data_len = strlen(value);
1674  unsigned header_len = MAX(strlen(column->header.name),
1675  strlen(column->header.unit));
1676  unsigned column_width = MAX(data_len, header_len);
1677 
1678  /* Check if column width has changed */
1679  bool has_changed = update_column_width(column, column_width);
1680 
1681  /* Create format specifiers of right length */
1682  char *fmt_str = NULL;
1683  const size_t width = column->state.last_width;
1684  if (asprintf(&fmt_str, "%%%zus", width + GUARDBAND) == -1)
1685  critx("could not allocate memory for interval report");
1686 
1687  /* Print data, 1st and 2nd header row */
1688  asprintf_append(data, fmt_str, value);
1689  asprintf_append(header1, fmt_str, column->header.name);
1690  asprintf_append(header2, fmt_str, column->header.unit);
1691 
1692  free(fmt_str);
1693  return has_changed;
1694 }
1695 
1711 static bool print_column(char **header1, char **header2, char **data,
1712  enum column_id column_id, double value,
1713  unsigned accuracy)
1714 {
1715  /* Print symbolic values instead of numbers */
1716  if (copt.symbolic) {
1717  switch ((int)value) {
1718  case INT_MAX:
1719  return print_column_str(header1, header2, data,
1720  column_id, "INT_MAX");
1721  case USHRT_MAX:
1722  return print_column_str(header1, header2, data,
1723  column_id, "USHRT_MAX");
1724  case UINT_MAX:
1725  return print_column_str(header1, header2, data,
1726  column_id, "UINT_MAX");
1727  }
1728  }
1729 
1730  /* Only for convenience */
1731  struct column *column = &column_info[column_id];
1732 
1733  if (!column->state.visible)
1734  return false;
1735 
1736  /* Get max column width */
1737  unsigned data_len = det_num_digits(value) + (accuracy ? accuracy + 1 : 0);
1738  unsigned header_len = MAX(strlen(column->header.name),
1739  strlen(column->header.unit));
1740  unsigned column_width = MAX(data_len, header_len);
1741 
1742  /* Check if column width has changed */
1743  bool has_changed = update_column_width(column, column_width);
1744 
1745  /* Create format specifiers of right length */
1746  char *fmt_num = NULL, *fmt_str = NULL;
1747  const size_t width = column->state.last_width;
1748  if (asprintf(&fmt_num, "%%%zu.%df", width + GUARDBAND, accuracy) == -1 ||
1749  asprintf(&fmt_str, "%%%zus", width + GUARDBAND) == -1)
1750  critx("could not allocate memory for interval report");
1751 
1752  /* Print data, 1st and 2nd header row */
1753  asprintf_append(data, fmt_num, value);
1754  asprintf_append(header1, fmt_str, column->header.name);
1755  asprintf_append(header2, fmt_str, column->header.unit);
1756 
1757  free_all(fmt_num, fmt_str);
1758  return has_changed;
1759 }
1760 
1771 static void print_interval_report(unsigned short flow_id, enum endpoint_t e,
1772  struct report *report)
1773 {
1774  /* Whether or not column width has been changed */
1775  bool changed = false;
1776  /* 1st header row, 2nd header row, and the actual measured data */
1777  char *header1 = NULL, *header2 = NULL, *data = NULL;
1778 
1779  /* Flow ID and endpoint (source or destination) */
1780  if (asprintf(&header1, "%s", column_info[COL_FLOW_ID].header.name) == -1 ||
1781  asprintf(&header2, "%s", column_info[COL_FLOW_ID].header.unit) == -1 ||
1782  asprintf(&data, "%s%3d", e ? "D" : "S", flow_id) == -1)
1783  critx("could not allocate memory for interval report");
1784 
1785  /* Calculate time */
1786  double diff_first_last = time_diff(&cflow[flow_id].start_timestamp[e],
1787  &report->begin);
1788  double diff_first_now = time_diff(&cflow[flow_id].start_timestamp[e],
1789  &report->end);
1790  changed |= print_column(&header1, &header2, &data, COL_BEGIN,
1791  diff_first_last, 3);
1792  changed |= print_column(&header1, &header2, &data, COL_END,
1793  diff_first_now, 3);
1794 
1795  /* Throughput */
1796  double thruput = (double)report->bytes_written /
1797  (diff_first_now - diff_first_last);
1798  thruput = scale_thruput(thruput);
1799  changed |= print_column(&header1, &header2, &data, COL_THROUGH,
1800  thruput, 6);
1801 
1802  /* Transactions */
1803  double transac = (double)report->response_blocks_read /
1804  (diff_first_now - diff_first_last);
1805  changed |= print_column(&header1, &header2, &data, COL_TRANSAC,
1806  transac, 2);
1807 
1808  /* Blocks */
1809  changed |= print_column(&header1, &header2, &data, COL_BLOCK_REQU,
1810  report->request_blocks_written, 0);
1811  changed |= print_column(&header1, &header2, &data, COL_BLOCK_RESP,
1812  report->response_blocks_written, 0);
1813 
1814  /* RTT */
1815  double rtt_avg = 0.0;
1816  if (report->response_blocks_read && report->rtt_sum)
1817  rtt_avg = report->rtt_sum /
1818  (double)(report->response_blocks_read);
1819  else
1820  report->rtt_min = report->rtt_max = rtt_avg = INFINITY;
1821  changed |= print_column(&header1, &header2, &data, COL_RTT_MIN,
1822  report->rtt_min * 1e3, 3);
1823  changed |= print_column(&header1, &header2, &data, COL_RTT_AVG,
1824  rtt_avg * 1e3, 3);
1825  changed |= print_column(&header1, &header2, &data, COL_RTT_MAX,
1826  report->rtt_max * 1e3, 3);
1827 
1828  /* IAT */
1829  double iat_avg = 0.0;
1830  if (report->request_blocks_read && report->iat_sum)
1831  iat_avg = report->iat_sum /
1832  (double)(report->request_blocks_read);
1833  else
1834  report->iat_min = report->iat_max = iat_avg = INFINITY;
1835  changed |= print_column(&header1, &header2, &data, COL_IAT_MIN,
1836  report->rtt_min * 1e3, 3);
1837  changed |= print_column(&header1, &header2, &data, COL_IAT_AVG,
1838  iat_avg * 1e3, 3);
1839  changed |= print_column(&header1, &header2, &data, COL_IAT_MAX,
1840  report->iat_max * 1e3, 3);
1841 
1842  /* Delay */
1843  double delay_avg = 0.0;
1844  if (report->request_blocks_read && report->delay_sum)
1845  delay_avg = report->delay_sum /
1846  (double)(report->request_blocks_read);
1847  else
1848  report->delay_min = report->delay_max = delay_avg = INFINITY;
1849  changed |= print_column(&header1, &header2, &data, COL_DLY_MIN,
1850  report->delay_min * 1e3, 3);
1851  changed |= print_column(&header1, &header2, &data, COL_DLY_AVG,
1852  delay_avg * 1e3, 3);
1853  changed |= print_column(&header1, &header2, &data, COL_DLY_MAX,
1854  report->delay_max * 1e3, 3);
1855 
1856  /* TCP info struct */
1857  changed |= print_column(&header1, &header2, &data, COL_TCP_CWND,
1858  report->tcp_info.tcpi_snd_cwnd, 0);
1859  changed |= print_column(&header1, &header2, &data, COL_TCP_SSTH,
1860  report->tcp_info.tcpi_snd_ssthresh, 0);
1861  changed |= print_column(&header1, &header2, &data, COL_TCP_UACK,
1862  report->tcp_info.tcpi_unacked, 0);
1863  changed |= print_column(&header1, &header2, &data, COL_TCP_SACK,
1864  report->tcp_info.tcpi_sacked, 0);
1865  changed |= print_column(&header1, &header2, &data, COL_TCP_LOST,
1866  report->tcp_info.tcpi_lost, 0);
1867  changed |= print_column(&header1, &header2, &data, COL_TCP_RETR,
1868  report->tcp_info.tcpi_retrans, 0);
1869  changed |= print_column(&header1, &header2, &data, COL_TCP_TRET,
1870  report->tcp_info.tcpi_retransmits, 0);
1871  changed |= print_column(&header1, &header2, &data, COL_TCP_FACK,
1872  report->tcp_info.tcpi_fackets, 0);
1873  changed |= print_column(&header1, &header2, &data, COL_TCP_REOR,
1874  report->tcp_info.tcpi_reordering, 0);
1875  changed |= print_column(&header1, &header2, &data, COL_TCP_BKOF,
1876  report->tcp_info.tcpi_backoff, 0);
1877  changed |= print_column(&header1, &header2, &data, COL_TCP_RTT,
1878  report->tcp_info.tcpi_rtt / 1e3, 1);
1879  changed |= print_column(&header1, &header2, &data, COL_TCP_RTTVAR,
1880  report->tcp_info.tcpi_rttvar / 1e3, 1);
1881  changed |= print_column(&header1, &header2, &data, COL_TCP_RTO,
1882  report->tcp_info.tcpi_rto / 1e3, 1);
1883 
1884  /* TCP CA state */
1885  char *ca_state = NULL;
1886  switch (report->tcp_info.tcpi_ca_state) {
1887  case TCP_CA_Open:
1888  ca_state = "open";
1889  break;
1890  case TCP_CA_Disorder:
1891  ca_state = "disorder";
1892  break;
1893  case TCP_CA_CWR:
1894  ca_state = "cwr";
1895  break;
1896  case TCP_CA_Recovery:
1897  ca_state = "recover";
1898  break;
1899  case TCP_CA_Loss:
1900  ca_state = "loss";
1901  break;
1902  default:
1903  ca_state = "unknown";
1904  }
1905  changed |= print_column_str(&header1, &header2, &data,
1906  COL_TCP_CA_STATE, ca_state);
1907 
1908  /* SMSS & PMTU */
1909  changed |= print_column(&header1, &header2, &data, COL_SMSS,
1910  report->tcp_info.tcpi_snd_mss, 0);
1911  changed |= print_column(&header1, &header2, &data, COL_PMTU,
1912  report->pmtu, 0);
1913 
1914 /* Internal flowgrind state */
1915 #ifdef DEBUG
1916  int rc = 0;
1917  char *fg_state = NULL;
1918  if (cflow[flow_id].finished[e]) {
1919  rc = asprintf(&fg_state, "(stopped)");
1920  } else {
1921  /* Write status */
1922  char ws = (char)(report->status & 0xFF);
1923  if (ws != 'd' || ws != 'l' || ws != 'o' || ws != 'f' ||
1924  ws != 'c' || ws != 'n')
1925  ws = 'u';
1926 
1927  /* Read status */
1928  char rs = (char)(report->status >> 8);
1929  if (rs != 'd' || rs != 'l' || rs != 'o' || rs != 'f' ||
1930  rs != 'c' || rs != 'n')
1931  rs = 'u';
1932  rc = asprintf(&fg_state, "(%c/%c)", ws, rs);
1933  }
1934 
1935  if (rc == -1)
1936  critx("could not allocate memory for flowgrind status string");
1937 
1938  changed |= print_column_str(&header1, &header2, &data, COL_STATUS,
1939  fg_state);
1940  free(fg_state);
1941 #endif /* DEBUG */
1942 
1943  /* Print interval header again if either the column width has been
1944  * changed or MAX_REPORTS_BEFORE_HEADER reports have been emited
1945  * since last time header was printed */
1946  static unsigned short printed_reports = 0;
1947  if (changed || (printed_reports % MAX_REPORTS_IN_ROW) == 0) {
1948  print_output("%s\n", header1);
1949  print_output("%s\n", header2);
1950  }
1951 
1952  print_output("%s\n", data);
1953  printed_reports++;
1954  free_all(header1, header2, data);
1955 }
1956 
/**
 * Map an MTU to the name of a network technology that commonly uses it.
 *
 * The MTU has to match one of the table entries exactly.
 *
 * @param[in] mtu MTU size to look up
 * @return name of the matching technology, or "unknown" if the MTU is not
 * in the table. The returned string is a constant and must not be modified
 * or freed
 */
static char *guess_topology(unsigned mtu)
{
	struct mtu_hint {
		unsigned mtu;
		char *topology;
	};

	static const struct mtu_hint known_mtus[] = {
		{65535,	"Hyperchannel"},		/* RFC1374 */
		{17914,	"16 MB/s Token Ring"},
		{16436,	"Linux Loopback device"},
		{16384,	"FreeBSD Loopback device"},
		{16352,	"Darwin Loopback device"},
		{9000,	"Gigabit Ethernet (Jumboframes)"},
		{8166,	"802.4 Token Bus"},		/* RFC1042 */
		{4464,	"4 MB/s Token Ring"},
		{4352,	"FDDI"},			/* RFC1390 */
		{1500,	"Ethernet/PPP"},		/* RFC894, RFC1548 */
		{1492,	"PPPoE"},			/* RFC2516 */
		{1472,	"IP-in-IP"},			/* RFC1853 */
		{1280,	"IPv6 Tunnel"},			/* RFC4213 */
		{1006,	"SLIP"},			/* RFC1055 */
		{576,	"X.25 & ISDN"},			/* RFC1356 */
		{296,	"PPP (low delay)"},
	};
	const struct mtu_hint *const end =
		known_mtus + sizeof(known_mtus) / sizeof(known_mtus[0]);

	/* Linear search, the first exact match wins */
	for (const struct mtu_hint *hint = known_mtus; hint != end; hint++) {
		if (hint->mtu == mtu)
			return hint->topology;
	}

	return "unknown";
}
1995 
2002 static void print_final_report(unsigned short flow_id, enum endpoint_t e)
2003 {
2004  /* To store the final report */
2005  char *buf = NULL;
2006 
2007  /* For convenience only */
2008  struct flow_endpoint *endpoint = &cflow[flow_id].endpoint[e];
2009  struct flow_settings *settings = &cflow[flow_id].settings[e];
2010  struct report *report = cflow[flow_id].final_report[e];
2011 
2012  /* Flow ID and endpoint (source or destination) */
2013  if (asprintf(&buf, "# ID %3d %s: ", flow_id, e ? "D" : "S") == -1)
2014  critx("could not allocate memory for final report");;
2015 
2016  /* No final report received. Skip final report line for this endpoint */
2017  if (!report) {
2018  asprintf_append(&buf, "Error: no final report received");
2019  goto out;
2020  }
2021 
2022  /* Infos about the test connections */
2023  asprintf_append(&buf, "%s", endpoint->test_address);
2024 
2025  if (strcmp(endpoint->rpc_info->server_name, endpoint->test_address) != 0)
2026  asprintf_append(&buf, "/%s", endpoint->rpc_info->server_name);
2027  if (endpoint->rpc_info->server_port != DEFAULT_LISTEN_PORT)
2028  asprintf_append(&buf, ":%d", endpoint->rpc_info->server_port);
2029 
2030  /* Infos about the daemon OS */
2031  asprintf_append(&buf, " (%s %s), ",
2032  endpoint->daemon->os_name, endpoint->daemon->os_release);
2033 
2034  /* Random seed */
2035  asprintf_append(&buf, "random seed: %u, ", cflow[flow_id].random_seed);
2036 
2037  /* Sending & Receiving buffer */
2038  asprintf_append(&buf, "sbuf = %d/%d [B] (real/req), ",
2039  endpoint->send_buffer_size_real,
2040  settings->requested_send_buffer_size);
2041  asprintf_append(&buf, "rbuf = %d/%d [B] (real/req), ",
2042  endpoint->receive_buffer_size_real,
2043  settings->requested_read_buffer_size);
2044 
2045  /* SMSS, Path MTU, Interface MTU */
2046  if (report->tcp_info.tcpi_snd_mss > 0)
2047  asprintf_append(&buf, "SMSS = %d [B], ",
2048  report->tcp_info.tcpi_snd_mss);
2049  if (report->pmtu > 0)
2050  asprintf_append(&buf, "PMTU = %d [B], ", report->pmtu);
2051  if (report->imtu > 0)
2052  asprintf_append(&buf, "Interface MTU = %d (%s) [B], ",
2053  report->imtu, guess_topology(report->imtu));
2054 
2055  /* Congestion control algorithms */
2056  if (settings->cc_alg[0])
2057  asprintf_append(&buf, "CC = %s, ", settings->cc_alg);
2058 
2059  /* Calculate time */
2060  double report_time = time_diff(&report->begin, &report->end);
2061  double delta_write = 0.0, delta_read = 0.0;
2062  if (settings->duration[WRITE])
2063  delta_write = report_time - settings->duration[WRITE]
2064  - settings->delay[SOURCE];
2065  if (settings->duration[READ])
2066  delta_read = report_time - settings->duration[READ]
2067  - settings->delay[DESTINATION];
2068 
2069  /* Calculate delta target vs. real report time */
2070  double real_write = settings->duration[WRITE] + delta_write;
2071  double real_read = settings->duration[READ] + delta_read;
2072  if (settings->duration[WRITE])
2073  asprintf_append(&buf, "duration = %.3f/%.3f [s] (real/req), ",
2074  real_write, settings->duration[WRITE]);
2075  if (settings->delay[WRITE])
2076  asprintf_append(&buf, "write delay = %.3f [s], ",
2077  settings->delay[WRITE]);
2078  if (settings->delay[READ])
2079  asprintf_append(&buf, "read delay = %.3f [s], ",
2080  settings->delay[READ]);
2081 
2082  /* Throughput */
2083  double thruput_read = report->bytes_read / MAX(real_read, real_write);
2084  double thruput_write = report->bytes_written / MAX(real_read, real_write);
2085  if (isnan(thruput_read))
2086  thruput_read = 0.0;
2087  if (isnan(thruput_write))
2088  thruput_write = 0.0;
2089 
2090  thruput_read = scale_thruput(thruput_read);
2091  thruput_write = scale_thruput(thruput_write);
2092 
2093  if (copt.mbyte)
2094  asprintf_append(&buf, "through = %.6f/%.6f [MiB/s] (out/in)",
2095  thruput_write, thruput_read);
2096  else
2097  asprintf_append(&buf, "through = " "%.6f/%.6f [Mbit/s] (out/in)",
2098  thruput_write, thruput_read);
2099 
2100  /* Transactions */
2101  double trans = report->response_blocks_read / MAX(real_read, real_write);
2102  if (isnan(trans))
2103  trans = 0.0;
2104  if (trans)
2105  asprintf_append(&buf, ", transactions/s = %.2f [#]", trans);
2106 
2107  /* Blocks */
2108  if (report->request_blocks_written || report->request_blocks_read)
2109  asprintf_append(&buf, ", request blocks = %u/%u [#] (out/in)",
2110  report->request_blocks_written,
2111  report->request_blocks_read);
2112  if (report->response_blocks_written || report->response_blocks_read)
2113  asprintf_append(&buf, ", response blocks = %u/%u [#] (out/in)",
2114  report->response_blocks_written,
2115  report->response_blocks_read);
2116 
2117  /* RTT */
2118  if (report->response_blocks_read) {
2119  double rtt_avg = report->rtt_sum /
2120  (double)(report->response_blocks_read);
2121  asprintf_append(&buf, ", RTT = %.3f/%.3f/%.3f [ms] (min/avg/max)",
2122  report->rtt_min * 1e3, rtt_avg * 1e3,
2123  report->rtt_max * 1e3);
2124  }
2125 
2126  /* IAT */
2127  if (report->request_blocks_read) {
2128  double iat_avg = report->iat_sum /
2129  (double)(report->request_blocks_read);
2130  asprintf_append(&buf, ", IAT = %.3f/%.3f/%.3f [ms] (min/avg/max)",
2131  report->iat_min * 1e3, iat_avg * 1e3,
2132  report->iat_max * 1e3);
2133  }
2134 
2135  /* Delay */
2136  if (report->request_blocks_read) {
2137  double delay_avg = report->delay_sum /
2138  (double)(report->request_blocks_read);
2139  asprintf_append(&buf, ", delay = %.3f/%.3f/%.3f [ms] (min/avg/max)",
2140  report->delay_min * 1e3, delay_avg * 1e3,
2141  report->delay_max * 1e3);
2142  }
2143 
2144  /* Fixed sending rate per second was set */
2145  if (settings->write_rate_str)
2146  asprintf_append(&buf, ", rate = %s", settings->write_rate_str);
2147 
2148  /* Socket options */
2149  if (settings->elcn)
2150  asprintf_append(&buf, ", ELCN");
2151  if (settings->cork)
2152  asprintf_append(&buf, ", TCP_CORK");
2153  if (settings->pushy)
2154  asprintf_append(&buf, ", PUSHY");
2155  if (settings->nonagle)
2156  asprintf_append(&buf, ", TCP_NODELAY");
2157  if (settings->mtcp)
2158  asprintf_append(&buf, ", TCP_MTCP");
2159  if (settings->dscp)
2160  asprintf_append(&buf, ", dscp = 0x%02x", settings->dscp);
2161 
2162  /* Other flow options */
2163  if (cflow[flow_id].late_connect)
2164  asprintf_append(&buf, ", late connecting");
2165  if (cflow[flow_id].shutdown)
2166  asprintf_append(&buf, ", calling shutdown");
2167 
2168 out:
2169  print_output("%s\n", buf);
2170  free(buf);
2171 }
2172 
2176 static void print_all_final_reports(void)
2177 {
2178  for (unsigned id = 0; id < copt.num_flows; id++) {
2179  print_output("\n");
2180  foreach(int *i, SOURCE, DESTINATION) {
2181  print_final_report(id, *i);
2182  free(cflow[id].final_report[*i]);
2183  }
2184  }
2185 }
2186 
2195 static struct rpc_info * add_flow_endpoint_by_url(const char* server_url,
2196  const char* server_name,
2197  unsigned short server_port)
2198 {
2199  struct rpc_info *flow_rpc_info;
2200  flow_rpc_info = malloc((sizeof(struct rpc_info)));
2201 
2202  if (!flow_rpc_info ) {
2203  logging(LOG_ALERT, "could not allocate memory for flows rpc info");
2204  return 0;
2205  }
2206 
2207  memset(flow_rpc_info, 0, sizeof(struct rpc_info));
2208 
2209  strcpy(flow_rpc_info->server_url, server_url);
2210  strcpy(flow_rpc_info->server_name, server_name);
2211  flow_rpc_info->server_port = server_port;
2212  fg_list_push_back(&flows_rpc_info, flow_rpc_info);
2213  return flow_rpc_info;
2214 }
2215 
2224 static struct rpc_info * set_rpc_info(const char* server_url,
2225  const char* server_name,
2226  unsigned short server_port)
2227 {
2228  if(fg_list_size(&flows_rpc_info) == 0)
2229  return add_flow_endpoint_by_url(server_url,server_name, server_port);
2230 
2231  /* If we have already stored flow info for this URL return a pointer to it */
2232  const struct list_node *node = fg_list_front(&flows_rpc_info);
2233  while (node) {
2234  struct rpc_info *flow_rpc_info= node->data;
2235  node = node->next;
2236 
2237  if (!strcmp(flow_rpc_info->server_url, server_url))
2238  return flow_rpc_info;
2239  }
2240  /* didn't find anything, seems to be a new one */
2241  return add_flow_endpoint_by_url(server_url,server_name, server_port);
2242 }
2243 
2251 static void parse_trafgen_option(const char *params, int flow_id, int endpoint_id)
2252 {
2253  int rc;
2254  double param1 = 0, param2 = 0, unused;
2255  char typechar, distchar;
2256  enum distribution_t distr = CONSTANT;
2257 
2258  rc = sscanf(params, "%c:%c:%lf:%lf:%lf", &typechar, &distchar,
2259  &param1, &param2, &unused);
2260  if (rc != 3 && rc != 4)
2261  PARSE_ERR("flow %i: option -G: malformed traffic generation "
2262  "parameters", flow_id);
2263 
2264  switch (distchar) {
2265  case 'N':
2266  distr = NORMAL;
2267  if (!param1 || !param2)
2268  PARSE_ERR("flow %i: option -G: normal distribution "
2269  "needs two non-zero parameters", flow_id);
2270  break;
2271  case 'W':
2272  distr = WEIBULL;
2273  if (!param1 || !param2)
2274  PARSE_ERR("flow %i: option -G: weibull distribution "
2275  "needs two non-zero parameters", flow_id);
2276  break;
2277  case 'U':
2278  distr = UNIFORM;
2279  if (param1 <= 0 || param2 <= 0 || (param1 > param2))
2280  PARSE_ERR("flow %i: option -G: uniform distribution "
2281  "needs two positive parameters", flow_id);
2282  break;
2283  case 'E':
2284  distr = EXPONENTIAL;
2285  if (param1 <= 0)
2286  PARSE_ERR("flow %i: option -G: exponential distribution "
2287  "needs one positive parameter", flow_id);
2288  break;
2289  case 'P':
2290  distr = PARETO;
2291  if (!param1 || !param2)
2292  PARSE_ERR("flow %i: option -G: pareto distribution "
2293  "needs two non-zero parameters", flow_id);
2294  break;
2295  case 'L':
2296  distr = LOGNORMAL;
2297  if (!param1 || !param2)
2298  PARSE_ERR("flow %i: option -G: lognormal distribution "
2299  "needs two non-zero parameters", flow_id);
2300  break;
2301  case 'C':
2302  distr = CONSTANT;
2303  if (param1 <= 0)
2304  PARSE_ERR("flow %i: option -G: constant distribution "
2305  "needs one positive parameters", flow_id);
2306  break;
2307  default:
2308  PARSE_ERR("flow %i: option -G: syntax error: %c is not a "
2309  "distribution", flow_id, distchar);
2310  break;
2311  }
2312 
2313  switch (typechar) {
2314  case 'p':
2315  cflow[flow_id].settings[endpoint_id].response_trafgen_options.distribution = distr;
2316  cflow[flow_id].settings[endpoint_id].response_trafgen_options.param_one = param1;
2317  cflow[flow_id].settings[endpoint_id].response_trafgen_options.param_two = param2;
2318  break;
2319  case 'q':
2320  cflow[flow_id].settings[endpoint_id].request_trafgen_options.distribution = distr;
2321  cflow[flow_id].settings[endpoint_id].request_trafgen_options.param_one = param1;
2322  cflow[flow_id].settings[endpoint_id].request_trafgen_options.param_two = param2;
2323  break;
2324  case 'g':
2325  cflow[flow_id].settings[endpoint_id].interpacket_gap_trafgen_options.distribution = distr;
2326  cflow[flow_id].settings[endpoint_id].interpacket_gap_trafgen_options.param_one = param1;
2327  cflow[flow_id].settings[endpoint_id].interpacket_gap_trafgen_options.param_two = param2;
2328  break;
2329  }
2330 
2331  /* sanity check for max block size */
2332  foreach(int *i, SOURCE, DESTINATION) {
2333  if (distr == CONSTANT &&
2334  cflow[flow_id].settings[*i].maximum_block_size < param1)
2335  cflow[flow_id].settings[*i].maximum_block_size = param1;
2336  if (distr == UNIFORM &&
2337  cflow[flow_id].settings[*i].maximum_block_size < param2)
2338  cflow[flow_id].settings[*i].maximum_block_size = param2;
2339  }
2340 }
2341 
2349 static void parse_rate_option(const char *arg, int flow_id, int endpoint_id)
2350 {
2351  char unit = 0, type = 0;
2352  double optdouble = 0.0;
2353  /* last %c for catching wrong input... this is not nice. */
2354  int rc = sscanf(arg, "%lf%c%c%c",
2355  &optdouble, &unit, &type, &unit);
2356  if (rc < 1 || rc > 4)
2357  PARSE_ERR("flow %i: option -R: malformed rate", flow_id);
2358 
2359  if (optdouble == 0.0)
2360  PARSE_ERR("flow %i: option -R: rate of 0", flow_id);
2361 
2362 
2363  switch (unit) {
2364  case 0:
2365  case 'z':
2366  break;
2367 
2368  case 'k':
2369  optdouble *= 1<<10;
2370  break;
2371 
2372  case 'M':
2373  optdouble *= 1<<20;
2374  break;
2375 
2376  case 'G':
2377  optdouble *= 1<<30;
2378  break;
2379 
2380  default:
2381  PARSE_ERR("flow %i: option -R: illegal unit specifier", flow_id);
2382  break;
2383  }
2384 
2385  if (type != 'b' && type != 'B')
2386  PARSE_ERR("flow %i: option -R: illegal type specifier "
2387  "(either 'b' or 'B')", flow_id);
2388  if (type == 'b')
2389  optdouble /= 8;
2390 
2391  if (optdouble > 5e9)
2392  warnx("rate of flow %d too high", flow_id);
2393 
2394  cflow[flow_id].settings[endpoint_id].write_rate_str = strdup(arg);
2395  cflow[flow_id].settings[endpoint_id].write_rate = optdouble;
2396 }
2397 
2398 
2399 
2410 static void parse_host_option(const char* hostarg, int flow_id, int endpoint_id)
2411 {
2412  struct sockaddr_in6 source_in6;
2413  source_in6.sin6_family = AF_INET6;
2414  int port = DEFAULT_LISTEN_PORT;
2415  bool extra_rpc = false;
2416  bool is_ipv6 = false;
2417  char *rpc_address, *url = 0, *sepptr = 0;
2418  char *arg = strdup(hostarg);
2419  struct flow_endpoint* endpoint = &cflow[flow_id].endpoint[endpoint_id];
2420 
2421  /* extra RPC address ? */
2422  sepptr = strchr(arg, '/');
2423  if (sepptr) {
2424  *sepptr = '\0';
2425  rpc_address = sepptr + 1;
2426  extra_rpc = true;
2427  } else {
2428  rpc_address = arg;
2429  }
2430 
2431  /* IPv6 Address? */
2432  if (strchr(arg, ':')) {
2433  if (inet_pton(AF_INET6, arg, (char*)&source_in6.sin6_addr) <= 0)
2434  PARSE_ERR("flow %i: invalid IPv6 address '%s' for "
2435  "test connection", flow_id, arg);
2436 
2437  if (!extra_rpc)
2438  is_ipv6 = true;
2439  }
2440 
2441  /* optional dedicated rpc address was supplied and needs to be parsed */
2442  if (extra_rpc) {
2443  parse_rpc_address(&rpc_address, &port, &is_ipv6);
2444  if (is_ipv6 && (inet_pton(AF_INET6, rpc_address,
2445  (char*)&source_in6.sin6_addr) <= 0))
2446  PARSE_ERR("flow %i: invalid IPv6 address '%s' for RPC",
2447  flow_id, arg);
2448  if (port < 1 || port > 65535)
2449  PARSE_ERR("flow %i: invalid port for RPC", flow_id);
2450  }
2451 
2452  if (!*arg)
2453  PARSE_ERR("flow %i: no test host given in argument", flow_id);
2454 
2455  int rc = 0;
2456  if (is_ipv6)
2457  rc = asprintf(&url, "http://[%s]:%d/RPC2", rpc_address, port);
2458  else
2459  rc = asprintf(&url, "http://%s:%d/RPC2", rpc_address, port);
2460 
2461  if (rc == -1)
2462  critx("could not allocate memory for RPC URL");
2463 
2464  /* Get flow endpoint server information for each flow */
2465  endpoint->rpc_info = set_rpc_info(url, rpc_address, port);
2466  strcpy(endpoint->test_address, arg);
2467  free_all(arg, url);
2468 }
2469 
/*
 * Parse a flow option that is given per endpoint and store the result in the
 * settings of endpoint endpoint_id of flow flow_id. code is the option
 * character, arg the option argument without the endpoint specifier and
 * opt_string the option name used in error messages.
 *
 * NOTE(review): this listing has gaps (source lines 2493-2495, 2571 and 2577
 * are missing), so the statements of case 'A' and parts of case 'S' are not
 * shown here.
 */
2479 static void parse_flow_option_endpoint(int code, const char* arg,
2480  const char* opt_string, int flow_id,
2481  int endpoint_id)
2482 {
2483  int optint = 0;
2484  double optdouble = 0.0;
2485 
2486  struct flow_settings* settings = &cflow[flow_id].settings[endpoint_id];
2487 
2488  switch (code) {
2489  case 'G':
2490  parse_trafgen_option(arg, flow_id, endpoint_id);
2491  break;
2492  case 'A':
2496  break;
2497  case 'B':
2498  if (sscanf(arg, "%u", &optint) != 1 || optint < 0)
2499  PARSE_ERR("in flow %i: option %s needs positive integer",
2500  flow_id, opt_string);
2501  settings->requested_send_buffer_size = optint;
2502  break;
2503  case 'C':
2504  settings->flow_control= 1;
2505  break;
2506  case 'D':
2507  if (sscanf(arg, "%x", &optint) != 1 || (optint & ~0x3f))
2508  PARSE_ERR("in flow %i: option %s service code point "
2509  "is malformed", flow_id, opt_string);
2510  settings->dscp = optint;
2511  break;
2512  case 'H':
2513  parse_host_option(arg, flow_id, endpoint_id);
2514  break;
2515  case 'M':
2516  settings->traffic_dump = 1;
2517  break;
2518  case 'O':
2519  if (!*arg)
2520  PARSE_ERR("in flow %i: option %s requires a value "
2521  "for each endpoint", flow_id, opt_string);
2522 
2523  if (!strcmp(arg, "TCP_CORK")) {
2524  settings->cork = 1;
2525  } else if (!strcmp(arg, "TCP_ELCN")) {
2526  settings->elcn = 1;
2527  } else if (!strcmp(arg, "TCP_LCD")) {
2528  settings->lcd = 1;
2529  } else if (!strcmp(arg, "TCP_MTCP")) {
2530  settings->mtcp = 1;
2531  } else if (!strcmp(arg, "TCP_NODELAY")) {
2532  settings->nonagle = 1;
2533  } else if (!strcmp(arg, "ROUTE_RECORD")) {
2534  settings->route_record = 1;
2535  /* keep TCP_CONG_MODULE for backward compatibility */
2536  } else if (!memcmp(arg, "TCP_CONG_MODULE=", 16)) {
2537  if (strlen(arg + 16) >= sizeof(cflow[0].settings[SOURCE].cc_alg))
2538  PARSE_ERR("in flow %i: option %s: too large "
2539  "string for TCP_CONG_MODULE",
2540  flow_id, opt_string);
2541  strcpy(settings->cc_alg, arg + 16);
2542  } else if (!memcmp(arg, "TCP_CONGESTION=", 15)) {
 /* The prefix "TCP_CONGESTION=" is 15 characters long, so the value
  * starts at arg + 15. Measuring from arg + 16 let a value one byte
  * too long pass the check and read past the end of an empty value */
2543  if (strlen(arg + 15) >= sizeof(cflow[0].settings[SOURCE].cc_alg))
2544  PARSE_ERR("in flow %i: option %s: too large "
2545  "string for TCP_CONGESTION",
2546  flow_id, opt_string);
2547  strcpy(settings->cc_alg, arg + 15);
2548  } else if (!strcmp(arg, "SO_DEBUG")) {
2549  settings->so_debug = 1;
2550  } else if (!strcmp(arg, "IP_MTU_DISCOVER")) {
2551  settings->ipmtudiscover = 1;
2552  } else {
2553  PARSE_ERR("in flow %i: option %s: unknown socket "
2554  "option or socket option not implemented",
2555  flow_id, opt_string);
2556  }
2557  break;
2558  case 'P':
2559  settings->pushy = 1;
2560  break;
2561  case 'R':
2562  if (!*arg)
2563  PARSE_ERR("in flow %i: option %s requires a value "
2564  "for each given endpoint", flow_id, opt_string);
2565  parse_rate_option(arg, flow_id, endpoint_id);
2566  break;
2567  case 'S':
2568  if (sscanf(arg, "%u", &optint) != 1 || optint < 0)
2569  PARSE_ERR("in flow %i: option %s needs positive integer",
2570  flow_id, opt_string);
2572  settings->request_trafgen_options.param_one = optint;
2573  for (int id = 0; id < MAX_FLOWS_CONTROLLER; id++) {
2574  foreach(int *i, SOURCE, DESTINATION) {
2575  if ((signed)optint >
2576  cflow[id].settings[*i].maximum_block_size)
2578  (signed)optint;
2579  }
2580  }
2581  break;
2582  case 'T':
2583  if (sscanf(arg, "%lf", &optdouble) != 1 || optdouble < 0)
2584  PARSE_ERR("in flow %i: option %s needs positive number",
2585  flow_id, opt_string);
2586  settings->duration[WRITE] = optdouble;
2587  break;
2588  case 'U':
2589  if (sscanf(arg, "%u", &optint) != 1 || optint < 0)
2590  PARSE_ERR("in flow %i: option %s needs positive integer",
2591  flow_id, opt_string);
2592  settings->maximum_block_size = optint;
2593  break;
2594  case 'W':
2595  if (sscanf(arg, "%u", &optint) != 1 || optint < 0)
2596  PARSE_ERR("in flow %i: option %s needs non-negative number",
2597  flow_id, opt_string);
2598  settings->requested_read_buffer_size = optint;
2599  break;
2600  case 'Y':
2601  if (sscanf(arg, "%lf", &optdouble) != 1 || optdouble < 0)
2602  PARSE_ERR("in flow %i: option %s needs non-negative number",
2603  flow_id, opt_string);
2604  settings->delay[WRITE] = optdouble;
2605  break;
2606  }
2607 }
2608 
/*
 * Parse a flow option that carries no endpoint specifier and apply it to
 * flow flow_id. code is the option character, arg the option argument and
 * opt_string the option name used in error messages.
 *
 * NOTE(review): this listing has gaps (source lines 2625, 2628 and 2643 are
 * missing), so the statements of the cases 'E', 'I' and 'Q' are not shown.
 */
2617 static void parse_flow_option(int code, const char* arg, const char* opt_string,
2618  int flow_id)
2619 {
2620  unsigned optunsigned = 0;
2621 
2622  switch (code) {
2623  /* flow options w/o endpoint identifier */
2624  case 'E':
2626  break;
2627  case 'I':
2629  break;
2630  case 'J':
 /* Random seed of the flow, given as unsigned integer */
2631  if (sscanf(arg, "%u", &optunsigned) != 1)
2632  PARSE_ERR("option %s needs an integer argument",
2633  opt_string);
2634  cflow[flow_id].random_seed = optunsigned;
2635  break;
2636  case 'L':
2637  cflow[flow_id].late_connect = 1;
2638  break;
2639  case 'N':
2640  cflow[flow_id].shutdown = 1;
2641  break;
2642  case 'Q':
2644  break;
2645  }
2646 }
2647 
/*
 * Parse the argument of option -c: a comma-separated list of keywords
 * (interval, through, transac, blocks, rtt, iat, delay, kernel and, in DEBUG
 * builds, status). Any other keyword is a parse error.
 *
 * NOTE(review): this listing has many gaps (e.g. source lines 2657-2664 and
 * the statement following each keyword comparison are missing), so what is
 * done per keyword is not shown here.
 */
2654 static void parse_colon_option(const char *arg)
2655 {
2656  /* To make it easy (independent of default values), hide all colons */
2665 #ifdef DEBUG
2667 #endif /* DEBUG */
2668 
2669  /* Set colon visibility according to the option */
 /* strtok() modifies its input, hence work on a private copy */
2670  char *argcpy = strdup(arg);
2671  for (char *token = strtok(argcpy, ","); token;
2672  token = strtok(NULL, ",")) {
2673  if (!strcmp(token, "interval"))
2675  else if (!strcmp(token, "through"))
2677  else if (!strcmp(token, "transac"))
2679  else if (!strcmp(token, "blocks"))
2681  else if (!strcmp(token, "rtt"))
2683  else if (!strcmp(token, "iat"))
2685  else if (!strcmp(token, "delay"))
2687  else if (!strcmp(token, "kernel"))
2693  COL_PMTU);
2694 #ifdef DEBUG
2695  else if (!strcmp(token, "status"))
2697 #endif /* DEBUG */
2698  else
2699  PARSE_ERR("%s", "malformed option '-c'");
2700  }
2701  free(argcpy);
2702 }
2703 
/*
 * Parse a general (controller) option. code is the option character, arg the
 * option argument (checked against NULL for 'h' and the log file option) and
 * opt_string the option name used in error messages.
 *
 * NOTE(review): this listing has gaps (e.g. source lines 2730-2731, 2740,
 * 2763, 2778 and 2780 are missing), so some statements are not shown here.
 */
2711 static void parse_general_option(int code, const char* arg, const char* opt_string)
2712 {
2713 
2714  switch (code) {
2715  case 0:
 /* NOTE(review): no break follows; this relies on PARSE_ERR not
  * returning - confirm against the macro definition */
2716  PARSE_ERR("invalid argument: %s", arg);
2717  /* general options */
2718  case 'h':
 /* Plain help without argument, topic help for "socket"/"traffic" */
2719  if (!arg || !strlen(arg))
2720  usage(EXIT_SUCCESS);
2721  else if (!strcmp(arg, "socket"))
2722  usage_sockopt();
2723  else if (!strcmp(arg, "traffic"))
2724  usage_trafgenopt();
2725  else
2726  PARSE_ERR("invalid argument '%s' for %s", arg, opt_string);
2727  break;
2728  case 'v':
2729  fprintf(stdout, "%s %s\n%s\n%s\n\n%s\n", progname,
2732  exit(EXIT_SUCCESS);
2733 
2734  /* controller options */
2735  case 'c':
2736  parse_colon_option(arg);
2737  break;
2738 #ifdef DEBUG
2739  case 'd':
2741  break;
2742 #endif /* DEBUG */
2743  case 'e':
2744  copt.dump_prefix = strdup(arg);
2745  break;
2746  case 'i':
 /* Reporting interval must be a positive number */
2747  if (sscanf(arg, "%lf", &copt.reporting_interval) != 1 ||
2748  copt.reporting_interval <= 0)
2749  PARSE_ERR("option %s needs a positive number "
2750  "(in seconds)", opt_string);
2751  break;
2752  case LOG_FILE_OPTION:
 /* The file name is optional for this option */
2753  copt.log_to_file = true;
2754  if (arg)
2755  log_filename = strdup(arg);
2756  break;
2757  case 'm':
 /* Switch the unit shown in the throughput column header as well */
2758  copt.mbyte = true;
2759  column_info[COL_THROUGH].header.unit = " [MiB/s]";
2760  break;
2761  case 'n':
2762  if (sscanf(arg, "%hd", &copt.num_flows) != 1 ||
2764  PARSE_ERR("option %s (number of flows) must be within "
2765  "[1..%d]", opt_string, MAX_FLOWS_CONTROLLER);
2766  break;
2767  case 'o':
2768  copt.clobber = true;
2769  break;
2770  case 'p':
2771  copt.symbolic = false;
2772  break;
2773  case 'q':
2774  copt.log_to_stdout = false;
2775  break;
2776  case 's':
2777  if (!strcmp(arg, "segment"))
2779  else if (!strcmp(arg, "byte"))
2781  else
2782  PARSE_ERR("invalid argument '%s' for option %s",
2783  arg, opt_string);
2784  break;
2785  case 'w':
2786  copt.log_to_file = true;
2787  break;
2788  /* unknown option or missing option-argument */
2789  default:
2790  PARSE_ERR("uncaught option: %s", arg);
2791  break;
2792  }
2793 
2794 }
2795 
2807 static void check_mutex(struct ap_Mutex_state ms[],
2808  const enum mutex_context_t context,
2809  const int argind, int flow_id)
2810 {
2811  int mutex_index;
2812  if (context == MUTEX_CONTEXT_CONTROLLER){
2813  if (ap_set_check_mutex(&parser, &ms[context], argind, &mutex_index))
2814  PARSE_ERR("Option %s conflicts with option %s",
2815  ap_opt_string(&parser, argind),
2816  ap_opt_string(&parser, mutex_index));
2817  } else {
2818  if (ap_set_check_mutex(&parser, &ms[context], argind, &mutex_index))
2819  PARSE_ERR("In flow %i: option %s conflicts with option %s",
2820  flow_id, ap_opt_string(&parser, argind),
2821  ap_opt_string(&parser, mutex_index));
2822  }
2823 }
2824 
2838 static void parse_multi_endpoint_option(int code, const char* arg,
2839  const char* opt_string,
2840  struct ap_Mutex_state ms[], int argind,
2841  int flow_id)
2842 {
2843  char *argcpy = strdup(arg);
2844  for (char *token = strtok(argcpy, ","); token;
2845  token = strtok(NULL, ",")) {
2846 
2847  char type = token[0];
2848  char* arg;
2849 
2850  if (token[1] == '=')
2851  arg = token + 2;
2852  else
2853  arg = token + 1;
2854 
2855  if (type != 's' && type != 'd' && type != 'b')
2856  PARSE_ERR("Invalid endpoint specifier in Option %s",
2857  opt_string);
2858 
2859  /* check mutex in context of current endpoint */
2860  if (type == 's' || type == 'b') {
2861  check_mutex(ms, MUTEX_CONTEXT_SOURCE, argind, flow_id);
2862  parse_flow_option_endpoint(code, arg, opt_string,
2863  flow_id, SOURCE);
2864  }
2865  if (type == 'd' || type == 'b') {
2866  check_mutex(ms, MUTEX_CONTEXT_DESTINATION, argind, flow_id);
2867  parse_flow_option_endpoint(code, arg, opt_string,
2868  flow_id, DESTINATION);
2869  }
2870  }
2871  free(argcpy);
2872 }
2873 
/*
 * Parse the command line: set up the option parser, dispatch every parsed
 * option by its tag (controller, flow selector, flow or flow endpoint option)
 * and finally default the RPC info of endpoints without one to localhost.
 *
 * NOTE(review): this listing has gaps (source lines 2940-2941, 2985-2986,
 * 2990, 3040-3043 and 3054-3055 are missing), so some statements, including
 * several loop headers, are not shown here.
 */
2883 static void parse_cmdline(int argc, char *argv[])
2884 {
2885  int rc = 0;
2886  int cur_num_flows = 0;
2887  int current_flow_ids[MAX_FLOWS_CONTROLLER];
2888  int max_flow_specifier = 0;
2889  int optint = 0;
2890 
2891  const struct ap_Option options[] = {
2892  {'c', "show-colon", ap_yes, OPT_CONTROLLER, 0},
2893 #ifdef DEBUG
2894  {'d', "debug", ap_no, OPT_CONTROLLER, 0},
2895 #endif /* DEBUG */
2896  {'e', "dump-prefix", ap_yes, OPT_CONTROLLER, 0},
2897  {'h', "help", ap_maybe, OPT_CONTROLLER, 0},
2898  {'i', "report-interval", ap_yes, OPT_CONTROLLER, 0},
2899  {LOG_FILE_OPTION, "log-file", ap_maybe, OPT_CONTROLLER, 0},
2900  {'m', 0, ap_no, OPT_CONTROLLER, 0},
2901  {'n', "flows", ap_yes, OPT_CONTROLLER, 0},
2902  {'o', 0, ap_no, OPT_CONTROLLER, 0},
2903  {'p', 0, ap_no, OPT_CONTROLLER, 0},
2904  {'q', "quiet", ap_no, OPT_CONTROLLER, 0},
2905  {'s', "tcp-stack", ap_yes, OPT_CONTROLLER, 0},
2906  {'v', "version", ap_no, OPT_CONTROLLER, 0},
2907  {'w', 0, ap_no, OPT_CONTROLLER, 0},
2908  {'A', 0, ap_yes, OPT_FLOW_ENDPOINT, (int[]){1,0}},
2909  {'B', 0, ap_yes, OPT_FLOW_ENDPOINT, 0},
2910  {'C', 0, ap_no, OPT_FLOW_ENDPOINT, 0},
2911  {'D', 0, ap_yes, OPT_FLOW_ENDPOINT, 0},
2912  {'E', 0, ap_no, OPT_FLOW, 0},
2913  {'F', 0, ap_yes, OPT_SELECTOR, 0},
2914  {'G', 0, ap_yes, OPT_FLOW_ENDPOINT, (int[]){1,2,3,0}},
2915  {'H', 0, ap_yes, OPT_FLOW_ENDPOINT, 0},
2916  {'I', 0, ap_no, OPT_FLOW, 0},
2917  {'J', 0, ap_yes, OPT_FLOW, 0},
2918  {'L', 0, ap_no, OPT_FLOW, 0},
2919  {'M', 0, ap_yes, OPT_FLOW_ENDPOINT, 0},
2920  {'N', 0, ap_no, OPT_FLOW, 0},
2921  {'O', 0, ap_yes, OPT_FLOW_ENDPOINT, 0},
2922  {'P', 0, ap_yes, OPT_FLOW_ENDPOINT, 0},
2923  {'Q', 0, ap_no, OPT_FLOW, 0},
2924  {'R', 0, ap_yes, OPT_FLOW_ENDPOINT, (int[]){2,0}},
2925  {'S', 0, ap_yes, OPT_FLOW_ENDPOINT, (int[]){3,0}},
2926  {'T', 0, ap_yes, OPT_FLOW_ENDPOINT, 0},
2927  {'U', 0, ap_yes, OPT_FLOW_ENDPOINT, 0},
2928  {'W', 0, ap_yes, OPT_FLOW_ENDPOINT, 0},
2929  {'Y', 0, ap_yes, OPT_FLOW_ENDPOINT, 0},
2930  {0, 0, ap_no, 0, 0}
2931  };
2932 
2933  if (!ap_init(&parser, argc, (const char* const*) argv, options, 0))
2934  critx("could not allocate memory for option parser");
2935  if (ap_error(&parser))
2936  PARSE_ERR("%s", ap_error(&parser));
2937 
2938  /* initialize 4 mutex contexts (for SOURCE+DESTINATION+CONTROLLER+BOTH ENDPOINTS) */
2939  struct ap_Mutex_state ms[4];
2942  ap_init_mutex_state(&parser, &ms[*i]);
2943 
2944  /* if no option -F is given, configure all flows*/
2945  for (int i = 0; i < MAX_FLOWS_CONTROLLER; i++)
2946  current_flow_ids[i] = i;
2947  cur_num_flows = MAX_FLOWS_CONTROLLER;
2948 
2949  /* parse command line */
2950  for (int argind = 0; argind < ap_arguments(&parser); argind++) {
2951  const int code = ap_code(&parser, argind);
2952  const char *arg = ap_argument(&parser, argind);
2953  const char *opt_string = ap_opt_string(&parser, argind);
2954  int tag = ap_option(&parser, argind)->tag;
2955 
2956  /* distinguish option types by tag first */
2957  switch (tag) {
2958  case OPT_CONTROLLER:
2959  check_mutex(ms, MUTEX_CONTEXT_CONTROLLER, argind, 0);
2960  parse_general_option(code, arg, opt_string);
2961  break;
2962  case OPT_SELECTOR:
2963  cur_num_flows = 0;
2964  char *argcpy = strdup(arg);
2965  for (char *token = strtok(argcpy, ","); token;
2966  token = strtok(NULL, ",")) {
2967  rc = sscanf(token, "%d", &optint);
2968  if (rc != 1)
2969  PARSE_ERR("%s", "Malformed flow specifier");
2970 
2971  /* all flows */
2972  if (optint == -1) {
2973  for (int i = 0; i < MAX_FLOWS_CONTROLLER; i++)
2974  current_flow_ids[i] = i;
2975  cur_num_flows = MAX_FLOWS_CONTROLLER;
2976  break;
2977  }
2978 
 /* The selected ids are used as index into cflow[] by the
  * options that follow, long before the final check against
  * copt.num_flows below. Hence reject ids outside the valid
  * range right away and never store more ids than
  * current_flow_ids[] can hold */
 if (optint < 0 || optint >= MAX_FLOWS_CONTROLLER ||
 cur_num_flows >= MAX_FLOWS_CONTROLLER)
 PARSE_ERR("%s", "flow specifier out of range or "
 "too many flow specifiers");

2979  current_flow_ids[cur_num_flows++] = optint;
2980  ASSIGN_MAX(max_flow_specifier, optint);
2981  }
2982  free(argcpy);
2983  /* reset mutex for each new flow */
2984  foreach(int *i, MUTEX_CONTEXT_SOURCE,
2987  ap_reset_mutex(&ms[*i]);
2988  break;
2989  case OPT_FLOW:
2991  current_flow_ids[0]);
2992  for (int i = 0; i < cur_num_flows; i++)
2993  parse_flow_option(code, arg, opt_string,
2994  current_flow_ids[i]);
2995  break;
2996  case OPT_FLOW_ENDPOINT:
2997  for (int i = 0; i < cur_num_flows; i++)
2998  parse_multi_endpoint_option(code, arg,
2999  opt_string, ms, argind,
3000  current_flow_ids[i]);
3001  break;
3002  default:
3003  PARSE_ERR("%s", "uncaught option tag!");
3004  break;
3005  }
3006  }
3007 
3008  if (copt.num_flows <= max_flow_specifier)
3009  PARSE_ERR("%s", "must not specify option for non-existing flow");
3010 
3011 #if 0
3012  /* Demonstration how to set arbitrary socket options. Note that this is
3013  * only intended for quickly testing new options without having to
3014  * recompile and restart the daemons. To add support for a particular
3015  * options in future flowgrind versions it's recommended to implement
3016  * them like the other options supported by the -O argument.
3017  */
3018  {
3019  assert(cflow[0].settings[SOURCE].num_extra_socket_options < MAX_EXTRA_SOCKET_OPTIONS);
3020  struct extra_socket_options *option = &cflow[0].settings[SOURCE].extra_socket_options[cflow[0].settings[SOURCE].num_extra_socket_options++];
3021  int v;
3022 
3023  /* The value of the TCP_NODELAY constant gets passed to the daemons.
3024  * If daemons use a different system, constants may be different. In this case use
3025  * a value that matches the daemons'. */
3026  option->optname = TCP_NODELAY; /* or option->optname = 12345; as explained above */
3027 
3028  option->level = level_ipproto_tcp; /* See extra_socket_option_level enum in common.h */
3029 
3030  /* Again, value needs to be of correct size for the daemons.
3031  * Particular pitfalls can be differences in integer sizes or endianness.
3032  */
3033  assert(sizeof(v) < MAX_EXTRA_SOCKET_OPTION_VALUE_LENGTH);
3034  option->optlen = sizeof(v);
3035  memcpy(option->optval, &v, sizeof(v));
3036  }
3037 #endif /* 0 */
3038 
3039  for (unsigned short id = 0; id < copt.num_flows; id++) {
3044 
3045  foreach(int *i, SOURCE, DESTINATION) {
3046  /* Default to localhost, if no endpoints were set for a flow */
3047  if (!cflow[id].endpoint[*i].rpc_info) {
3048  cflow[id].endpoint[*i].rpc_info = set_rpc_info(
3049  "http://localhost:5999/RPC2", "localhost", DEFAULT_LISTEN_PORT);
3050  }
3051  }
3052  }
3053 
3056  ap_free_mutex_state(&ms[*i]);
3057 }
3058 
3062 static void sanity_check(void)
3063 {
3064  for (unsigned short id = 0; id < copt.num_flows; id++) {
3065  DEBUG_MSG(LOG_DEBUG, "sanity checking parameter set of flow %d", id);
3066  if (cflow[id].settings[DESTINATION].duration[WRITE] > 0 &&
3067  cflow[id].late_connect &&
3068  cflow[id].settings[DESTINATION].delay[WRITE] <
3069  cflow[id].settings[SOURCE].delay[WRITE]) {
3070  errx("server flow %d starts earlier than client "
3071  "flow while late connecting", id);
3072  exit(EXIT_FAILURE);
3073  }
3074  if (cflow[id].settings[SOURCE].delay[WRITE] > 0 &&
3075  cflow[id].settings[SOURCE].duration[WRITE] == 0) {
3076  errx("client flow %d has a delay but no runtime", id);
3077  exit(EXIT_FAILURE);
3078  }
3079  if (cflow[id].settings[DESTINATION].delay[WRITE] > 0 &&
3080  cflow[id].settings[DESTINATION].duration[WRITE] == 0) {
3081  errx("server flow %d has a delay but no runtime", id);
3082  exit(EXIT_FAILURE);
3083  }
3084  if (!cflow[id].settings[DESTINATION].duration[WRITE] &&
3085  !cflow[id].settings[SOURCE].duration[WRITE]) {
3086  errx("server and client flow have both zero runtime "
3087  "for flow %d", id);
3088  exit(EXIT_FAILURE);
3089  }
3090 
3091  foreach(int *i, SOURCE, DESTINATION) {
3092  if (cflow[id].settings[*i].flow_control &&
3093  !cflow[id].settings[*i].write_rate_str) {
3094  errx("flow %d has flow control enabled but no "
3095  "rate", id);
3096  exit(EXIT_FAILURE);
3097  }
3098 
3099  if (cflow[id].settings[*i].write_rate &&
3100  (cflow[id].settings[*i].write_rate /
3101  cflow[id].settings[*i].maximum_block_size) < 1) {
3102  errx("client block size for flow %u is too big for "
3103  "specified rate", id);
3104  exit(EXIT_FAILURE);
3105  }
3106  }
3107  DEBUG_MSG(LOG_DEBUG, "sanity check parameter set of flow %d completed", id);
3108  }
3109 }
3110 
/*
 * Program entry point: install the SIGINT handler, set up the XML-RPC
 * environment, parse and sanity check the command line, then run the test
 * step by step (find daemons, check versions and idle state, prepare and
 * start the flows, close them, fetch the reports) and clean up afterwards.
 *
 * NOTE(review): this listing has gaps (source lines 3124-3125, 3128-3129,
 * 3164 and 3166-3167 are missing), so some statements are not shown here.
 */
3111 int main(int argc, char *argv[])
3112 {
3113  struct sigaction sa;
3114  sa.sa_handler = sighandler;
3115  sa.sa_flags = 0;
3116  sigemptyset (&sa.sa_mask);
3117  if (sigaction(SIGINT, &sa, NULL))
3118  critx("could not set handler for SIGINT");
3119 
3120  xmlrpc_client *rpc_client = 0;
3121  xmlrpc_env_init(&rpc_env);
3122  xmlrpc_client_setup_global_const(&rpc_env);
3123 
3126 
3127  set_progname(argv[0]);
3130  parse_cmdline(argc, argv);
3131  sanity_check();
3132  open_logfile();
3133  prepare_xmlrpc_client(&rpc_client);
3134 
 /* Each of the following steps is skipped once SIGINT was caught */
3135  DEBUG_MSG(LOG_WARNING, "check daemons in the flows");
3136  if (!sigint_caught)
3137  find_daemon(rpc_client);
3138 
3139  DEBUG_MSG(LOG_WARNING, "check flowgrindds versions");
3140  if (!sigint_caught)
3141  check_version(rpc_client);
3142 
3143  DEBUG_MSG(LOG_WARNING, "check if flowgrindds are idle");
3144  if (!sigint_caught)
3145  check_idle(rpc_client);
3146 
3147  DEBUG_MSG(LOG_WARNING, "prepare all flows");
3148  if (!sigint_caught)
3149  prepare_all_flows(rpc_client);
3150 
3151  DEBUG_MSG(LOG_WARNING, "print headline");
3152  if (!sigint_caught)
3153  print_headline();
3154 
3155  DEBUG_MSG(LOG_WARNING, "start all flows");
3156  if (!sigint_caught)
3157  start_all_flows(rpc_client);
3158 
 /* Closing the flows and fetching the reports is done unconditionally */
3159  DEBUG_MSG(LOG_WARNING, "close all flows");
3160  close_all_flows();
3161 
3162  DEBUG_MSG(LOG_WARNING, "print all final report");
3163  fetch_reports(rpc_client);
3165 
3168 
3169  close_logfile();
3170 
 /* Tear down the XML-RPC client and environment, then the option parser */
3171  xmlrpc_client_destroy(rpc_client);
3172  xmlrpc_env_clean(&rpc_env);
3173  xmlrpc_client_teardown_global_const();
3174 
3175  ap_free(&parser);
3176 
3177  DEBUG_MSG(LOG_WARNING, "bye");
3178 }
static void open_logfile(void)
Create a logfile for measurement output.
Definition: flowgrind.c:551
bool clobber
Overwrite existing log files (option -o).
Definition: flowgrind.h:190
Infos about a flowgrind daemon.
Definition: flowgrind.h:200
double delay_sum
Accumulated one-way delay.
Definition: common.h:319
Infos about the flow including flow options.
Definition: flowgrind.h:239
Metric from the Linux / BSD TCP stack.
Definition: flowgrind.h:126
Metric from the Linux / BSD TCP stack.
Definition: flowgrind.h:135
int maximum_block_size
Application buffer size in bytes (option -U).
Definition: common.h:201
static void usage_sockopt(void)
Print usage or error message and exit.
Definition: flowgrind.c:206
static struct arg_parser parser
Command line option parser.
Definition: flowgrind.c:113
bool mbyte
Report in MByte/s instead of MBit/s (option -m).
Definition: flowgrind.h:192
int mtcp
Set TCP_MTCP (15) on test socket (option -O).
Definition: common.h:242
unsigned random_seed
Random seed for stochastic traffic generation (option -J).
Definition: flowgrind.h:256
Metric from the Linux / BSD TCP stack.
Definition: flowgrind.h:138
static void start_all_flows(xmlrpc_client *rpc_client)
Start test connections for all flows in a test.
Definition: flowgrind.c:1240
static double scale_thruput(double thruput)
Scale the given throughput thruput to either mebibytes per second or megabits per second...
Definition: flowgrind.c:1606
static bool print_column_str(char **header1, char **header2, char **data, enum column_id column_id, char *value)
Append measured data for interval report column column_id to given strings.
Definition: flowgrind.c:1663
int gettime(struct timespec *tp)
Returns the current wall-clock time with nanosecond precision.
Definition: fg_time.c:145
Command line argument parser.
Report interval.
Definition: flowgrind.h:102
Metric from the Linux / BSD TCP stack.
Definition: flowgrind.h:130
static struct linked_list flows_rpc_info
Global linked list to the flow endpoints XML RPC connection information.
Definition: flowgrind.c:107
void set_progname(const char *argv0)
Set global variable 'progname', based on argv[0].
Definition: fg_progname.c:37
int num_extra_socket_options
Definition: common.h:262
int lcd
Set TCP_LCD (21) on test socket (option -O).
Definition: common.h:240
struct rpc_info * rpc_info
Pointer to manage flow endpoint XMLRPC information.
Definition: flowgrind.h:231
enum protocol_t proto
Used transport protocol.
Definition: flowgrind.h:241
void parse_rpc_address(char **rpc_address, int *port, bool *is_ipv6)
Parse RPC address for the xmlrpc control connection.
Definition: fg_rpc_client.c:40
Metric from the Linux / BSD TCP stack.
Definition: flowgrind.h:134
#define MAX_EXTRA_SOCKET_OPTIONS
Max number of arbitrary extra socket options which may be sent to the daemon.
Definition: common.h:68
static void init_controller_options(void)
Initialization of general controller options.
Definition: flowgrind.c:466
Program name management.
static void parse_flow_option(int code, const char *arg, const char *opt_string, int flow_id)
Parse flow options without endpoint.
Definition: flowgrind.c:2617
Debugging routines for Flowgrind controller and daemon.
#define MAX_FLOWS_CONTROLLER
Maximal number of parallel flows supported by one controller.
Definition: common.h:58
enum endpoint_t endpoint
Daemon endpoint - either source or destination.
Definition: common.h:289
const char * ctimenow(bool ns)
Returns the current wall-clock time as null-terminated string.
Definition: fg_time.c:56
static void print_headline(void)
Print headline with various information before the actual measurement begins.
Definition: flowgrind.c:931
static bool sigint_caught
SIGINT (CTRL-C) received?
Definition: flowgrind.c:101
#define PARSE_ERR(err_msg,...)
Print error message, usage string and exit.
Definition: flowgrind.c:86
int requested_send_buffer_size
Request sender buffer in bytes (option -B).
Definition: common.h:196
char test_address[1000]
network address where the actual test connection goes to.
Definition: flowgrind.h:235
double iat_sum
Accumulated inter-arrival time.
Definition: common.h:313
static struct controller_options copt
Controller options.
Definition: flowgrind.c:116
int main(int argc, char *argv[])
Definition: flowgrind.c:3111
void logging(int priority, const char *fmt,...)
Definition: fg_log.c:69
int tcpi_rttvar
Definition: common.h:278
#define unlikely(x)
These macros gain us a few percent of speed.
Flow option without endpoint string.
Definition: flowgrind.h:154
Linux is a segment-based stack.
Definition: flowgrind.h:75
static void parse_flow_option_endpoint(int code, const char *arg, const char *opt_string, int flow_id, int endpoint_id)
Parse flow options with endpoint.
Definition: flowgrind.c:2479
struct timespec begin
Definition: common.h:292
static void parse_host_option(const char *hostarg, int flow_id, int endpoint_id)
Parse argument for option -H, which specifies the endpoints of a flow.
Definition: flowgrind.c:2410
#define MAX_EXTRA_SOCKET_OPTION_VALUE_LENGTH
Ensures extra options are limited in length on both controller and daemon.
Definition: common.h:71
bool log_to_stdout
Write output to screen (option -q).
Definition: flowgrind.h:184
int write_rate
The actual rate we should send.
Definition: common.h:220
int fg_list_init(struct linked_list *const list)
Initializes the list by setting its head and tail to NULL and its size to 0.
Definition: fg_list.c:34
Metric from the Linux / BSD TCP stack.
Definition: flowgrind.h:132
double rtt_sum
Accumulated round-trip time.
Definition: common.h:325
int tcpi_snd_mss
Definition: common.h:281
void ap_reset_mutex(struct ap_Mutex_state *const ms)
Reset a mutex context.
Definition: fg_argparser.c:578
const struct list_node * fg_list_front(struct linked_list *const list)
Returns the first element of the list.
Definition: fg_list.c:49
#define HIDE_COLUMNS(...)
To hide intermediated interval report columns.
Definition: flowgrind.c:78
struct daemon * daemon
Pointer to manage flow endpoint daemon.
Definition: flowgrind.h:233
#define FLOWGRIND_COPYRIGHT
Flowgrind's copyright year.
Definition: common.h:82
char shutdown
shutdown() each socket direction after test flow (option -N).
Definition: flowgrind.h:250
static struct rpc_info * set_rpc_info(const char *server_url, const char *server_name, unsigned short server_port)
Set the flow endpoint XML RPC data for a given server_url.
Definition: flowgrind.c:2224
const struct ap_Option * ap_option(const struct arg_parser *const ap, const int i)
Get the user-defined option for a given record position.
Definition: fg_argparser.c:494
Option without argument (flag).
Definition: fg_argparser.h:37
static void init_flow_options(void)
Initialization of the flow options to default values.
Definition: flowgrind.c:486
Context for flow options on source side.
Definition: flowgrind.h:166
Metric from the Linux / BSD TCP stack.
Definition: flowgrind.h:133
#define errx(...)
To report an error w/o a system error message.
Definition: fg_error.h:47
int asprintf_append(char **strp, const char *fmt,...)
Definition: fg_string.c:98
size_t last_width
Last width of the column.
Definition: flowgrind.h:289
static bool update_column_width(struct column *column, unsigned column_width)
Determines if the current column width column_width is larger or smaller than the old one and updates...
Definition: flowgrind.c:1621
struct timespec end
Definition: common.h:293
static void parse_cmdline(int argc, char *argv[])
The main commandline argument parsing function.
Definition: flowgrind.c:2883
double delay[2]
Delay of flow in seconds (option -Y).
Definition: common.h:188
static unsigned port
Definition: flowgrindd.c:95
struct column_header header
Column header (name and unit).
Definition: flowgrind.h:297
Infos about the flow endpoint.
Definition: flowgrind.h:225
double rtt_min
Minimum round-trip time.
Definition: common.h:321
int id
Definition: common.h:287
#define DEBUG
Definition: config.h:5
int ap_code(const struct arg_parser *const ap, const int i)
Returns the code of a parsed option with given index.
Definition: fg_argparser.c:468
#define FLOWGRIND_COPYING
Standard GPL3 no warranty message.
Definition: common.h:85
Defines a valid command line option.
Definition: fg_argparser.h:45
const char * progname
String containing name the program is called with.
Definition: fg_progname.c:35
#define FLOWGRIND_AUTHORS
Flowgrind's authors in a printable string.
Definition: common.h:91
static FILE * log_stream
Logfile for measurement output.
Definition: flowgrind.c:95
static void parse_colon_option(const char *arg)
Parse argument for option -c to hide/show intermediated interval report columns.
Definition: flowgrind.c:2654
unsigned request_blocks_read
Definition: common.h:301
const char * unit
Second header row: unit of the column.
Definition: flowgrind.h:279
char os_release[257]
Release number of the OS.
Definition: flowgrind.h:209
double iat_min
Minimum inter-arrival time.
Definition: common.h:309
Apple OS X.
Definition: flowgrind.h:67
char byte_counting
Enumerate bytes in payload instead of sending zeros (option -E).
Definition: flowgrind.h:254
int tcpi_unacked
Definition: common.h:270
int tag
User tag for distinction of options.
Definition: fg_argparser.h:53
unsigned short server_port
Port of the XMLRPC server.
Definition: flowgrind.h:221
static struct linked_list unique_daemons
Global linked list to the daemons containing UUID and daemons flowgrind version.
Definition: flowgrind.c:110
int tcpi_retransmits
Definition: common.h:274
char server_name[257]
Name of the XMLRPC server.
Definition: flowgrind.h:219
#define HAVE_LIBGSL
Definition: config.h:50
int fg_list_clear(struct linked_list *const list)
Removes and destroys all elements from the list, leaving it with a size of 0.
Definition: fg_list.c:219
static void close_logfile(void)
Close measurement output file.
Definition: flowgrind.c:576
void ap_free(struct arg_parser *const ap)
Free internal state of arg-parser.
Definition: fg_argparser.c:448
Metric from the Linux / BSD TCP stack.
Definition: flowgrind.h:125
Error-reporting routines used by Flowgrind.
static char * guess_topology(unsigned mtu)
Maps common MTU sizes to network known technologies.
Definition: flowgrind.c:1963
static void print_final_report(unsigned short flow_id, enum endpoint_t e)
Print final report (i.e.
Definition: flowgrind.c:2002
Definition: common.h:286
enum column_id type
Unique column identifier.
Definition: flowgrind.h:295
Metric from the Linux / BSD TCP stack.
Definition: flowgrind.h:136
int tcpi_fackets
Definition: common.h:275
static struct rpc_info * add_flow_endpoint_by_url(const char *server_url, const char *server_name, unsigned short server_port)
Add the flow endpoint XML RPC data to the Global linked list.
Definition: flowgrind.c:2195
Application level round-trip time.
Definition: flowgrind.h:113
double time_diff(const struct timespec *tp1, const struct timespec *tp2)
Returns the time difference between the two specific points in time tp1 and tp2.
Definition: fg_time.c:95
char summarize_only
Summarize only, no intermediated interval reports (option -Q).
Definition: flowgrind.h:252
unsigned response_blocks_written
Definition: common.h:304
struct fg_tcp_info tcp_info
Definition: common.h:329
Context for controller options.
Definition: flowgrind.h:162
bool ap_set_check_mutex(const struct arg_parser *const ap, struct ap_Mutex_state *const ms, const int i, int *conflict)
Check a new option record for mutex and register it at the same time.
Definition: fg_argparser.c:569
#define warnx(...)
To report a warning w/o a system error message.
Definition: fg_error.h:54
Application level one-way delay.
Definition: flowgrind.h:119
Throughput per second.
Definition: flowgrind.h:104
int cork
Sets TCP_CORK on test socket (option -O).
Definition: common.h:232
Number of elements in enum.
Definition: flowgrind.h:69
unsigned pmtu
Discovered Path MTU.
Definition: common.h:332
Application level round-trip time.
Definition: flowgrind.h:112
int tcpi_rto
Definition: common.h:279
static void prepare_xmlrpc_client(xmlrpc_client **rpc_client)
Definition: flowgrind.c:615
mutex_context_t
Mutual exclusion contexts for options.
Definition: flowgrind.h:160
double param_two
Second mathematical parameter of the distribution, if required.
Definition: common.h:172
void ap_free_mutex_state(struct ap_Mutex_state *const ms)
Free a mutex context.
Definition: fg_argparser.c:583
#define DEBUG_MSG(LVL, MSG,...)
Print debug message to standard error.
Definition: debug.h:49
static void sighandler(int sig)
Signal handler to catching signals.
Definition: flowgrind.c:448
char cc_alg[TCP_CA_NAME_MAX]
Set congestion control algorithm ALG on test socket (option -O).
Definition: common.h:236
void * data
Pointer to user defined data stored with this node.
Definition: fg_list.h:38
#define MAX_REPORTS_IN_ROW
Number of emitted reports before interval header is printed again.
Definition: flowgrind.h:50
static void fetch_reports(xmlrpc_client *rpc_client)
Reports are fetched from the flow endpoint daemon.
Definition: flowgrind.c:1297
static void report_flow(struct report *report)
Reports are fetched from the flow endpoint daemon.
Definition: flowgrind.c:1487
static void parse_multi_endpoint_option(int code, const char *arg, const char *opt_string, struct ap_Mutex_state ms[], int argind, int flow_id)
Parse flow options for multiple endpoints.
Definition: flowgrind.c:2838
int ipmtudiscover
Set IP_MTU_DISCOVER on test socket (option -O).
Definition: common.h:246
Linux.
Definition: flowgrind.h:63
#define MAX_COLUM_TOO_LARGE
How often an interval report column can be too large before it gets shrunk.
Definition: flowgrind.h:47
Endpoint that accepts the connection.
Definition: common.h:100
Transactions per second.
Definition: flowgrind.h:106
General controller options.
Definition: flowgrind.h:150
Log Normal distribution.
Definition: common.h:144
unsigned imtu
Interface MTU.
Definition: common.h:334
static void set_column_unit(const char *unit, unsigned nargs,...)
To set the unit in the header of intermediated interval report columns.
Definition: flowgrind.c:914
static void usage(short status) __attribute__((noreturn))
#define GUARDBAND
Number of whitespace characters between two interval report columns.
Definition: flowgrind.h:44
int status
Definition: common.h:336
int pushy
Do not iterate through select() to continue sending in case block size did not suffice to fill sendin...
Definition: common.h:213
Transmission Control Protocol.
Definition: flowgrind.h:55
#define DEFAULT_LISTEN_PORT
Daemon's default listen port.
Definition: common.h:55
static void print_interval_report(unsigned short flow_id, enum endpoint_t e, struct report *report)
Print interval report report for endpoint e of flow flow_id.
Definition: flowgrind.c:1771
static void find_daemon(xmlrpc_client *rpc_client)
Checks all daemons in flow option.
Definition: flowgrind.c:805
Infos about a flowgrind daemon and daemon-controller connection.
Definition: flowgrind.h:215
double rtt_max
Maximum round-trip time.
Definition: common.h:323
FreeBSD.
Definition: flowgrind.h:65
endpoint_t
Flow endpoint types.
Definition: common.h:96
#define MIN_BLOCK_SIZE
Minimum block (message) size we can send.
Definition: common.h:79
int daemon(int, int)
Remap daemon() function.
Flow option with endpoint string.
Definition: flowgrind.h:156
int ap_arguments(const struct arg_parser *const ap)
Number of arguments parsed (may be different from argc).
Definition: fg_argparser.c:463
int send_buffer_size_real
Sending buffer (SO_SNDBUF).
Definition: flowgrind.h:227
static void usage_trafgenopt(void)
Print help on flowgrind's traffic generation facilities and exit with EXIT_SUCCESS.
Definition: flowgrind.c:383
bool symbolic
Don't use symbolic values instead of numbers (option -p).
Definition: flowgrind.h:194
int tcpi_snd_cwnd
Definition: common.h:268
double reporting_interval
Length of reporting interval, in seconds (option -i).
Definition: flowgrind.h:182
struct column_state state
State of the column.
Definition: flowgrind.h:299
struct flow_endpoint endpoint[2]
Infos about flow endpoint.
Definition: flowgrind.h:265
#define UNUSED_ARGUMENT(x)
Suppress warning for unused argument.
Metric from the Linux / BSD TCP stack.
Definition: flowgrind.h:128
int dscp
DSCP value for TOS byte (option -D).
Definition: common.h:244
Endpoint that opens the connection.
Definition: common.h:98
Pseudo short option for option --log-file.
Definition: flowgrind.h:174
Metric from the Linux / BSD TCP stack.
Definition: flowgrind.h:127
int fg_list_push_back(struct linked_list *const list, void *const data)
Inserts a new element at the end of the list.
Definition: fg_list.c:167
void increase_debuglevel()
Increase debug level.
Definition: debug.c:50
Uniform distribution.
Definition: common.h:138
#define SET_COLUMN_UNIT(unit,...)
To set the unit of intermediated interval report columns.
Definition: flowgrind.c:82
Functions to manipulate strings used by Flowgrind.
Optional Argument.
Definition: fg_argparser.h:41
int requested_read_buffer_size
Request receiver buffer, advertised window in bytes (option -W).
Definition: common.h:198
char started
Definition: daemon.c:101
unsigned request_blocks_written
Definition: common.h:302
Exponential distribution.
Definition: common.h:140
Application level inter-arrival time.
Definition: flowgrind.h:117
Flowgrind controller.
int route_record
Sets ROUTE_RECORD on test socket (option -O).
Definition: common.h:208
static void parse_general_option(int code, const char *arg, const char *opt_string)
Parse general controller options given on the cmdline.
Definition: flowgrind.c:2711
static char * log_filename
Name of logfile.
Definition: flowgrind.c:98
struct flow_settings::extra_socket_options extra_socket_options[MAX_EXTRA_SOCKET_OPTIONS]
Controller options.
Definition: flowgrind.h:178
static void check_mutex(struct ap_Mutex_state ms[], const enum mutex_context_t context, const int argind, int flow_id)
Wrapper function for mutex checking and error message printing.
Definition: flowgrind.c:2807
Argument required.
Definition: fg_argparser.h:39
Blocks per second.
Definition: flowgrind.h:109
Contains the state of all mutex.
Definition: fg_argparser.h:91
#define free_all(...)
To free() an arbitrary number of variables.
FreeBSD and OS X stack is a bytes-based stack.
Definition: flowgrind.h:77
unsigned short num_flows
Number of test flows (option -n).
Definition: flowgrind.h:180
enum report_t type
Report type - either INTERVAL or FINAL report.
Definition: common.h:291
Single element in a doubly linked list.
Definition: fg_list.h:36
unsigned oversized
How often the current column width was too high.
Definition: flowgrind.h:287
static void set_flow_endpoint_daemon(const char *server_uuid, char *server_url)
Set the daemon for controller flow endpoint.
Definition: flowgrind.c:779
unsigned response_blocks_read
Definition: common.h:303
double delay_min
Minimum one-way delay.
Definition: common.h:315
char os_name[257]
OS on which this daemon runs.
Definition: flowgrind.h:207
int tcpi_snd_ssthresh
Definition: common.h:269
#define critx(...)
To report a critical error w/o a system error message.
Definition: fg_error.h:40
int tcpi_ca_state
Definition: common.h:282
const char * name
First header row: name of the column.
Definition: flowgrind.h:277
const char * ap_opt_string(const struct arg_parser *const ap, const int i)
Get the real command line option string (may be the short or long version).
Definition: fg_argparser.c:486
#define crit(...)
To report a critical error w/ the corresponding system error message.
Definition: fg_error.h:36
Metric from the Linux / BSD TCP stack.
Definition: flowgrind.h:124
int receive_buffer_size_real
Receiver buffer (SO_RCVBUF).
Definition: flowgrind.h:229
char * url
Pointer to daemon XMLRPC URL.
Definition: flowgrind.h:211
Application level round-trip time.
Definition: flowgrind.h:111
int tcpi_retrans
Definition: common.h:273
Normal distribution.
Definition: common.h:134
char finished[2]
Flag if final report for the flow is received.
Definition: flowgrind.h:269
Metric from the Linux / BSD TCP stack.
Definition: flowgrind.h:123
static unsigned short active_flows
Number of currently active flows.
Definition: flowgrind.c:125
RPC related functions used by the Flowgrind controller flowgrind-stop.
static struct daemon * add_daemon_by_uuid(const char *server_uuid, char *daemon_url)
Add daemon for controller flow by UUID.
Definition: flowgrind.c:720
struct trafgen_options response_trafgen_options
Stochastic traffic generation settings for the response size.
Definition: common.h:251
Report interval.
Definition: flowgrind.h:101
Application level one-way delay.
Definition: flowgrind.h:121
static void check_idle(xmlrpc_client *rpc_client)
Checks that all nodes are currently idle.
Definition: flowgrind.c:845
struct trafgen_options interpacket_gap_trafgen_options
Stochastic traffic generation settings for the interpacket gap.
Definition: common.h:253
unsigned long long bytes_written
Definition: common.h:296
static void set_column_visibility(bool visibility, unsigned nargs,...)
To show/hide intermediated interval report columns.
Definition: flowgrind.c:893
const char * write_rate_str
Send at specified rate per second (option -R).
Definition: common.h:218
#define SHOW_COLUMNS(...)
To show intermediated interval report columns.
Definition: flowgrind.c:74
static xmlrpc_env rpc_env
Definition: flowgrind.c:104
int elcn
Set TCP_ELCN (20) on test socket (option -O).
Definition: common.h:238
struct report * final_report[2]
Final report from the daemon.
Definition: flowgrind.h:271
distribution_t
Stochastic distributions for traffic generation.
Definition: common.h:130
Flow ID.
Definition: flowgrind.h:99
Final report.
Definition: common.h:116
Internal state of the argument parser.
Definition: fg_argparser.h:73
struct list_node * next
Pointer to the next node in the list.
Definition: fg_list.h:40
Weibull distribution.
Definition: common.h:136
static struct daemon * set_unique_daemon_by_uuid(const char *server_uuid, char *daemon_url)
Determine the daemons for controller flow by UUID.
Definition: flowgrind.c:747
column_id
IDs to explicit address an intermediated interval report column.
Definition: flowgrind.h:97
char uuid[38]
UUID of the daemon.
Definition: flowgrind.h:203
static void check_version(xmlrpc_client *rpc_client)
Checks all the daemons flowgrind version.
Definition: flowgrind.c:652
double iat_max
Maximum inter-arrival time.
Definition: common.h:311
int traffic_dump
Dump traffic using libpcap (option -M).
Definition: common.h:204
double delay_max
Maximum one-way delay.
Definition: common.h:317
Read operation.
Definition: common.h:108
bool log_to_file
Write output to logfile (option -w).
Definition: flowgrind.h:186
static void parse_rate_option(const char *arg, int flow_id, int endpoint_id)
Parse argument for option -R, which specifies the rate the endpoint will send.
Definition: flowgrind.c:2349
int so_debug
Sets SO_DEBUG on test socket (option -O).
Definition: common.h:206
unsigned long long bytes_read
Definition: common.h:295
Write operation.
Definition: common.h:106
static void print_all_final_reports(void)
Print final report (i.e.
Definition: flowgrind.c:2176
Data structures used by the Flowgrind daemon and controller.
static void parse_trafgen_option(const char *params, int flow_id, int endpoint_id)
Parse option for stochastic traffic generation (option -G).
Definition: flowgrind.c:2251
Metric from the Linux / BSD TCP stack.
Definition: flowgrind.h:131
static void die_if_fault_occurred(xmlrpc_env *env)
Definition: flowgrind.c:608
Intermediated interval report column.
Definition: flowgrind.h:293
static void prepare_all_flows(xmlrpc_client *rpc_client)
Prepare test connection for all flows in a test.
Definition: flowgrind.c:1219
static void print_output(const char *fmt,...)
Print measurement output to logfile and / or to stdout.
Definition: flowgrind.c:592
Common definitions used by the Flowgrind daemon, controller, and libs.
enum tcp_stack_t force_unit
Force kernel output to specific unit (option -s).
Definition: flowgrind.h:196
int tcpi_lost
Definition: common.h:272
double duration[2]
Duration of flow in seconds (option -T).
Definition: common.h:190
int flow_id
Flow ID maintained by controller.
Definition: common.h:186
No stochastic distribution.
Definition: common.h:132
int flow_control
Stop flow if it is experiencing local congestion (option -C).
Definition: common.h:226
Context for flow options on destination side.
Definition: flowgrind.h:168
struct flow_settings settings[2]
Flow specific options.
Definition: flowgrind.h:267
Application level inter-arrival time.
Definition: flowgrind.h:116
Blocks per second.
Definition: flowgrind.h:108
double param_one
First mathematical parameter of the distribution.
Definition: common.h:170
int endpoint_id[2]
ID used internally by the daemon to distinguish its flows.
Definition: flowgrind.h:261
Selects a subset of flows to apply options to (-F).
Definition: flowgrind.h:152
const char * dump_prefix
Prefix for dumpfile (option -e).
Definition: flowgrind.h:188
Metric from the Linux / BSD TCP stack.
Definition: flowgrind.h:129
enum distribution_t distribution
The stochastic distribution to draw values from.
Definition: common.h:168
double time_diff_now(const struct timespec *tp)
Returns time difference between now and the specific point in time tp.
Definition: fg_time.c:101
Metric from the Linux / BSD TCP stack.
Definition: flowgrind.h:137
#define ASSIGN_MAX(s, c)
Assign value if it's greater than current one.
char server_url[1000]
XMLRPC URL for this daemon.
Definition: flowgrind.h:217
Context for flow options for both endpoints.
Definition: flowgrind.h:164
int tcpi_reordering
Definition: common.h:276
int api_version
Flowgrind API version supported by this daemon.
Definition: flowgrind.h:205
static void close_all_flows(void)
Stop test connections for all flows in a test.
Definition: flowgrind.c:1538
static size_t det_num_digits(double value)
Determines the length of the integer part of a decimal number.
Definition: flowgrind.c:1590
int nonagle
Disable nagle algorithm on test socket (option -O).
Definition: common.h:234
bool visible
Dynamically turn an column on/off.
Definition: flowgrind.h:285
A doubly linked list.
Definition: fg_list.h:46
struct timespec start_timestamp[2]
Timestamp set just before starting flow.
Definition: flowgrind.h:263
Application level one-way delay.
Definition: flowgrind.h:120
size_t fg_list_size(struct linked_list *const list)
Returns the number of elements in the list.
Definition: fg_list.c:211
static void prepare_flow(int id, xmlrpc_client *rpc_client)
Prepare test connection for a flow between source and destination daemons.
Definition: flowgrind.c:991
char late_connect
Call connect() immediately before sending data (option -L).
Definition: flowgrind.h:248
static bool print_column(char **header1, char **header2, char **data, enum column_id column_id, double value, unsigned accuracy)
Append measured data for interval report column column_id to given strings.
Definition: flowgrind.c:1711
static void sanity_check(void)
Sanity checking flow options.
Definition: flowgrind.c:3062
Pareto distribution.
Definition: common.h:142
int tcpi_backoff
Definition: common.h:280
Application level inter-arrival time.
Definition: flowgrind.h:115
Settings that describe a flow from an endpoint's perspective.
Definition: common.h:181
int tcpi_sacked
Definition: common.h:271
bool ap_init(struct arg_parser *const ap, const int argc, const char *const argv[], const struct ap_Option options[], const char in_order)
Initialize the arg-parser given command line and user-defined options.
Definition: fg_argparser.c:374
int tcpi_rtt
Definition: common.h:277
Read / write status.
Definition: flowgrind.h:141
bool ap_init_mutex_state(const struct arg_parser *const ap, struct ap_Mutex_state *const ms)
Initialize a new mutex state table.
Definition: fg_argparser.c:516
const char * ap_error(const struct arg_parser *const ap)
Get the string containing errors encountered during parsing.
Definition: fg_argparser.c:458
Timing related routines used by Flowgrind.
#define FLOWGRIND_VERSION
Flowgrind version number.
Definition: common.h:44
const char * ap_argument(const struct arg_parser *const ap, const int i)
Returns the argument of a parsed option.
Definition: fg_argparser.c:478
struct trafgen_options request_trafgen_options
Stochastic traffic generation settings for the request size.
Definition: common.h:249