Flowgrind
Advanced TCP traffic generator
daemon.c
Go to the documentation of this file.
1 
6 /*
7  * Copyright (C) 2010-2013 Christian Samsel <christian.samsel@rwth-aachen.de>
8  * Copyright (C) 2009 Tim Kosse <tim.kosse@gmx.de>
9  * Copyright (C) 2007-2008 Daniel Schaffrath <daniel.schaffrath@mac.com>
10  *
11  * This file is part of Flowgrind.
12  *
13  * Flowgrind is free software: you can redistribute it and/or modify
14  * it under the terms of the GNU General Public License as published by
15  * the Free Software Foundation, either version 3 of the License, or
16  * (at your option) any later version.
17  *
18  * Flowgrind is distributed in the hope that it will be useful,
19  * but WITHOUT ANY WARRANTY; without even the implied warranty of
20  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21  * GNU General Public License for more details.
22  *
23  * You should have received a copy of the GNU General Public License
24  * along with Flowgrind. If not, see <http://www.gnu.org/licenses/>.
25  *
26  */
27 
28 #ifdef HAVE_CONFIG_H
29 #include "config.h"
30 #endif /* HAVE_CONFIG_H */
31 
32 #include <assert.h>
33 #include <stdio.h>
34 #include <stdlib.h>
35 #include <stdarg.h>
36 #include <stdbool.h>
37 #include <strings.h>
38 #include <signal.h>
39 #include <string.h>
40 #include <fcntl.h>
41 #include <math.h>
42 #include <sys/types.h>
43 #include <sys/socket.h>
44 #include <sys/param.h>
45 #include <sys/select.h>
46 #include <netinet/in.h>
47 #include <arpa/inet.h>
48 #include <unistd.h>
49 #include <sys/wait.h>
50 #include <errno.h>
51 #include <time.h>
52 #include <syslog.h>
53 #include <sys/time.h>
54 #include <netdb.h>
55 #include <pthread.h>
56 #include <inttypes.h>
57 #include <float.h>
58 #include <uuid/uuid.h>
59 
60 #include "common.h"
61 #include "debug.h"
62 #include "fg_error.h"
63 #include "fg_math.h"
64 #include "fg_definitions.h"
65 #include "fg_socket.h"
66 #include "fg_time.h"
67 #include "fg_log.h"
68 #include "daemon.h"
69 #include "source.h"
70 #include "destination.h"
71 #include "trafgen.h"
72 
73 #ifdef HAVE_LIBPCAP
74 #include "fg_pcap.h"
75 #endif /* HAVE_LIBPCAP */
76 
77 #ifndef SOL_TCP
78 #define SOL_TCP IPPROTO_TCP
79 #endif /* SOL_TCP */
80 
81 #ifndef SOL_IP
82 #define SOL_IP IPPROTO_IP
83 #endif /* SOL_IP */
84 
85 #define CONGESTION_LIMIT 10000
86 
/* Pipe towards the daemon thread: prepare_fds() watches the read end
 * (daemon_pipe[0]) and process_requests() drains it. */
87 int daemon_pipe[2];
88 
/* Taken by process_requests(), add_report() and get_reports() while they
 * touch the request list and the report queue below. */
89 pthread_mutex_t mutex;
/* List of pending requests; process_requests() consumes it from the head. */
90 struct request *requests = 0, *requests_last = 0;
91 
/* Descriptor sets rebuilt by prepare_fds() on every loop iteration and
 * handed to pselect() in daemon_main(); maxfd is the highest descriptor
 * placed in them. */
92 fd_set rfds, wfds, efds;
93 int maxfd;
94 
/* Queue of finished reports: add_report() appends at reports_last,
 * get_reports() detaches from reports; pending_reports counts the entries. */
95 struct report* reports = 0;
96 struct report* reports_last = 0;
97 unsigned pending_reports = 0;
98 
/* NOTE(review): the listing numbering skips 99 here, one line appears to be
 * missing from this excerpt. */
100 
/* Set to 1 at the end of start_flows(), reset to 0 by remove_flow() once the
 * flow list is empty. */
101 char started = 0;
102 
103 /* Forward declarations of functions that are called before their
 * definition in this file */
104 static int write_data(struct flow *flow);
105 static int read_data(struct flow *flow);
106 static void process_rtt(struct flow* flow);
107 static void process_iat(struct flow* flow);
108 static void process_delay(struct flow* flow);
109 static void report_flow(struct flow* flow, int type);
110 static void send_response(struct flow* flow,
111  int requested_response_block_size);
/* not static: has external linkage */
112 int get_tcp_info(struct flow *flow, struct fg_tcp_info *info);
113 
114 
115 void flow_error(struct flow *flow, const char *fmt, ...)
116 {
117  char str[1000];
118  va_list ap;
119 
120  va_start(ap, fmt);
121  vsnprintf(str, 1000, fmt, ap);
122  va_end(ap);
123  str[sizeof(str) - 1] = 0;
124  flow->error = malloc(strlen(str) + 1);
125  strcpy(flow->error, str);
126 }
127 
128 void request_error(struct request *request, const char *fmt, ...)
129 {
130  char str[1000];
131  va_list ap;
132 
133  va_start(ap, fmt);
134  vsnprintf(str, 1000, fmt, ap);
135  va_end(ap);
136  str[sizeof(str) - 1] = 0;
137  request->error = malloc(strlen(str) + 1);
138  strcpy(request->error, str);
139 }
140 
141 static inline int flow_in_delay(struct timespec *now, struct flow *flow,
142  int direction)
143 {
144  return time_is_after(&flow->start_timestamp[direction], now);
145 }
146 
147 
148 static inline int flow_sending(struct timespec *now, struct flow *flow,
149  int direction)
150 {
151  return !flow_in_delay(now, flow, direction) &&
152  (flow->settings.duration[direction] < 0 ||
153  time_diff_now(&flow->stop_timestamp[direction]) < 0.0);
154 }
155 
156 static inline int flow_block_scheduled(struct timespec *now, struct flow *flow)
157 {
158  return time_is_after(now, &flow->next_write_block_timestamp);
159 }
160 
161 void uninit_flow(struct flow *flow)
162 {
163  DEBUG_MSG(LOG_DEBUG,"uninit_flow() called for flow %d",flow->id);
164  if (flow->fd != -1)
165  close(flow->fd);
166  if (flow->listenfd_data != -1)
167  close(flow->listenfd_data);
168 #ifdef HAVE_LIBPCAP
169  int rc;
170  if (flow->settings.traffic_dump && flow->pcap_thread) {
171  rc = pthread_cancel(flow->pcap_thread);
172  if (rc)
173  logging(LOG_WARNING, "failed to cancel dump thread: %s",
174  strerror(rc));
175 
176  /* wait for the dump thread to react to the cancellation request */
177  rc = pthread_join(flow->pcap_thread, NULL);
178  if (rc)
179  logging(LOG_WARNING, "failed to join dump thread: %s",
180  strerror(rc));
181  }
182 #endif /* HAVE_LIBPCAP */
183  free_all(flow->read_block, flow->write_block, flow->addr, flow->error);
184  free_math_functions(flow);
185 }
186 
187 void remove_flow(struct flow * const flow)
188 {
189  fg_list_remove(&flows, flow);
190  free(flow);
191  if (!fg_list_size(&flows))
192  started = 0;
193 }
194 
195 static void prepare_wfds(struct timespec *now, struct flow *flow, fd_set *wfds)
196 {
197  int rc = 0;
198 
199  if (flow_in_delay(now, flow, WRITE)) {
200  DEBUG_MSG(LOG_WARNING, "flow %i not started yet (delayed)",
201  flow->id);
202  return;
203  }
204 
205  if (flow_sending(now, flow, WRITE)) {
206  assert(!flow->finished[WRITE]);
207  if (flow_block_scheduled(now, flow)) {
208  DEBUG_MSG(LOG_DEBUG, "adding sock of flow %d to wfds",
209  flow->id);
210  FD_SET(flow->fd, wfds);
211  } else {
212  DEBUG_MSG(LOG_DEBUG, "no block for flow %d scheduled "
213  "yet", flow->id);
214  }
215  } else if (!flow->finished[WRITE]) {
216  flow->finished[WRITE] = 1;
217  if (flow->settings.shutdown) {
218  DEBUG_MSG(LOG_WARNING, "shutting down flow %d (WR)",
219  flow->id);
220  rc = shutdown(flow->fd,SHUT_WR);
221  if (rc == -1)
222  warn("shutdown() SHUT_WR failed");
223  }
224  }
225 
226  return;
227 }
228 
229 static int prepare_rfds(struct timespec *now, struct flow *flow, fd_set *rfds)
230 {
231  int rc = 0;
232 
233  if (!flow_in_delay(now, flow, READ) && !flow_sending(now, flow, READ)) {
234  if (!flow->finished[READ] && flow->settings.shutdown) {
235  warnx("server flow %u missed to shutdown", flow->id);
236  rc = shutdown(flow->fd, SHUT_RD);
237  if (rc == -1)
238  warn("shutdown SHUT_RD failed");
239  flow->finished[READ] = 1;
240  }
241  }
242 
243  if (flow->source_settings.late_connect && !flow->connect_called ) {
244  DEBUG_MSG(LOG_ERR, "late connecting test socket for flow %d "
245  "after %.3fs delay",
246  flow->id, flow->settings.delay[WRITE]);
247  if (do_connect(flow) == -1) {
248  return -1;
249  }
250  }
251 
252  /* Altough the server flow might be finished we keep the socket in
253  * rfd in order to check for buggy servers */
254  if (flow->connect_called && !flow->finished[READ]) {
255  DEBUG_MSG(LOG_DEBUG, "adding sock of flow %d to rfds",
256  flow->id);
257  FD_SET(flow->fd, rfds);
258  }
259 
260  return 0;
261 }
262 
263 static int prepare_fds() {
264 
265  DEBUG_MSG(LOG_DEBUG, "prepare_fds() called, number of flows: %zu",
266  fg_list_size(&flows));
267 
268  FD_ZERO(&rfds);
269  FD_ZERO(&wfds);
270  FD_ZERO(&efds);
271 
272  FD_SET(daemon_pipe[0], &rfds);
273  maxfd = daemon_pipe[0];
274 
275  struct timespec now;
276  gettime(&now);
277 
278  const struct list_node *node = fg_list_front(&flows);
279  while (node) {
280  struct flow *flow = node->data;
281  node = node->next;
282 
283  if (started &&
284  (flow->finished[READ] ||
285  !flow->settings.duration[READ] ||
286  (!flow_in_delay(&now, flow, READ) &&
287  !flow_sending(&now, flow, READ))) &&
288  (flow->finished[WRITE] ||
289  !flow->settings.duration[WRITE] ||
290  (!flow_in_delay(&now, flow, WRITE) &&
291  !flow_sending(&now, flow, WRITE)))) {
292 
293  /* On Other OSes than Linux or FreeBSD, tcp_info will contain all zeroes */
294  flow->statistics[FINAL].has_tcp_info =
295  get_tcp_info(flow,
296  &flow->statistics[FINAL].tcp_info)
297  ? 0 : 1;
298 
299  flow->pmtu = get_pmtu(flow->fd);
300 
301  if (flow->settings.reporting_interval)
302  report_flow(flow, INTERVAL);
303  report_flow(flow, FINAL);
304  uninit_flow(flow);
305  remove_flow(flow);
306  continue;
307  }
308 
309  if (flow->state == GRIND_WAIT_ACCEPT &&
310  flow->listenfd_data != -1) {
311  FD_SET(flow->listenfd_data, &rfds);
312  maxfd = MAX(maxfd, flow->listenfd_data);
313  }
314 
315  if (!started)
316  continue;
317 
318  if (flow->fd != -1) {
319  FD_SET(flow->fd, &efds);
320  maxfd = MAX(maxfd, flow->fd);
321  prepare_wfds(&now, flow, &wfds);
322  prepare_rfds(&now, flow, &rfds);
323  }
324  }
325 
326  return fg_list_size(&flows);
327 }
328 
/* Body of start_flows(): stamps every flow in the flow list with its start
 * and stop times for both directions, initialises the report timestamps and
 * finally sets the global started flag.
 *
 * NOTE(review): the listing numbering skips 329, 350, 364 and 372, so the
 * function header and three statements are missing from this excerpt; the
 * remaining lines are kept untouched. */
330 {
331  struct timespec start;
332  gettime(&start);
333 
334 #if 0
335  if (start.tv_sec < request->start_timestamp) {
336  /* If the clock is synchronized between nodes, all nodes will
337  * start at the same time regardless of any RPC delays */
338  start.tv_sec = request->start_timestamp;
339  start.tv_nsec = 0;
340  }
341 #else /* 0 */
342  UNUSED_ARGUMENT(request);
343 #endif /* 0 */
344 
345  const struct list_node *node = fg_list_front(&flows);
346  while (node) {
347  struct flow *flow = node->data;
348  node = node->next;
349  /* initialize random number generator etc */
351 
352  /* READ and WRITE */
353  for (int j = 0; j < 2; j++) {
 /* start = common start time plus the per-direction delay */
354  flow->start_timestamp[j] = start;
355  time_add(&flow->start_timestamp[j],
356  flow->settings.delay[j]);
 /* a negative duration leaves the stop timestamp unset */
357  if (flow->settings.duration[j] >= 0) {
358  flow->stop_timestamp[j] =
359  flow->start_timestamp[j];
360  time_add(&flow->stop_timestamp[j],
361  flow->settings.duration[j]);
362  }
363  }
365  flow->start_timestamp[WRITE];
366 
367  gettime(&flow->last_report_time);
368  flow->first_report_time = flow->last_report_time;
369  flow->next_report_time = flow->last_report_time;
370 
371  time_add(&flow->next_report_time,
373  }
374 
375  started = 1;
376 }
377 
378 static void stop_flow(struct request_stop_flow *request)
379 {
380  DEBUG_MSG(LOG_DEBUG, "stop_flow forcefully unlocked mutex");
381  pthread_mutex_unlock(&mutex);
382 
383  if (request->flow_id == -1) {
384  /* Stop all flows */
385 
386  const struct list_node *node = fg_list_front(&flows);
387  while (node) {
388  struct flow *flow = node->data;
389  node = node->next;
390 
391  flow->statistics[FINAL].has_tcp_info =
392  get_tcp_info(flow,
393  &flow->statistics[FINAL].tcp_info)
394  ? 0 : 1;
395  flow->pmtu = get_pmtu(flow->fd);
396 
397  if (flow->settings.reporting_interval)
398  report_flow(flow, INTERVAL);
399  report_flow(flow, FINAL);
400 
401  uninit_flow(flow);
402  remove_flow(flow);
403  }
404 
405  return;
406  }
407 
408  const struct list_node *node = fg_list_front(&flows);
409  while (node) {
410  struct flow *flow = node->data;
411  node = node->next;
412 
413  if (flow->id != request->flow_id)
414  continue;
415 
416  /* On Other OSes than Linux or FreeBSD, tcp_info will contain all zeroes */
417  flow->statistics[FINAL].has_tcp_info =
418  get_tcp_info(flow,
419  &flow->statistics[FINAL].tcp_info)
420  ? 0 : 1;
421  flow->pmtu = get_pmtu(flow->fd);
422 
423  if (flow->settings.reporting_interval)
424  report_flow(flow, INTERVAL);
425  report_flow(flow, FINAL);
426 
427  uninit_flow(flow);
428  remove_flow(flow);
429  return;
430  }
431 
432  request_error(&request->r, "Unknown flow id");
433 }
434 
/* Drain the daemon pipe and handle every queued request under the global
 * mutex, dispatching on the request type. Unless a handler sets rc to 1, the
 * request's condition variable is signalled once the request was handled.
 *
 * NOTE(review): the listing numbering skips 462, 464, 469, 483 and 490, so
 * some statements and one case label are missing from this excerpt; the
 * remaining lines are kept untouched. */
442 static void process_requests()
443 {
444  int rc;
445  DEBUG_MSG(LOG_DEBUG, "process_requests trying to lock mutex");
446  pthread_mutex_lock(&mutex);
447  DEBUG_MSG(LOG_DEBUG, "process_requests locked mutex");
448 
 /* empty the pipe in chunks of 100 bytes; a short read (or an error) ends
  * the loop. This rc shadows the outer one. */
449  char tmp[100];
450  for (;;) {
451  int rc = read(daemon_pipe[0], tmp, 100);
452  if (rc != 100)
453  break;
454  }
455 
456  while (requests) {
 /* unlink the head of the request list before handling it */
457  struct request* request = requests;
458  requests = requests->next;
459  rc = 0;
460 
461  switch (request->type) {
463  add_flow_destination((struct
465  *)request);
466  break;
467  case REQUEST_ADD_SOURCE:
468  rc = add_flow_source((struct
470  *)request);
471  break;
472  case REQUEST_START_FLOWS:
473  start_flows((struct request_start_flows *)request);
474  break;
475  case REQUEST_STOP_FLOW:
 /* NOTE(review): stop_flow() operates on the mutex itself; confirm the
  * lock is held again when control returns here */
476  stop_flow((struct request_stop_flow *)request);
477  break;
478  case REQUEST_GET_STATUS:
479  {
480  struct request_get_status *r =
481  (struct request_get_status *)request;
482  r->started = started;
484  }
485  break;
486  case REQUEST_GET_UUID:
487  {
488  struct request_get_uuid *r =
489  (struct request_get_uuid *)request;
491  }
492  break;
493  default:
494  request_error(request, "Unknown request type");
495  break;
496  }
 /* rc == 1 suppresses the wake-up of the waiting requester */
497  if (rc != 1)
498  pthread_cond_signal(request->condition);
499  }
500 
501  pthread_mutex_unlock(&mutex);
502  DEBUG_MSG(LOG_DEBUG, "process_requests unlocked mutex");
503 }
504 
/* Build a report of the given type (INTERVAL or FINAL) from the flow's
 * statistics and queue it with add_report(). An INTERVAL report is dropped
 * when less than 20% of the reporting interval has elapsed; after an
 * INTERVAL report the interval statistics are reset.
 *
 * NOTE(review): the listing numbering skips 547, 549, 551, 553, 622, 623,
 * 625 and 626, so the right-hand sides of the block counter assignments and
 * part of the interval reset are missing from this excerpt; the remaining
 * lines are kept untouched. */
518 static void report_flow(struct flow* flow, int type)
519 {
520  DEBUG_MSG(LOG_DEBUG, "report_flow called for flow %d (type %d)",
521  flow->id, type);
 /* NOTE(review): the allocation result is used without a NULL check */
522  struct report* report =
523  (struct report*)malloc(sizeof(struct report));
524 
525  report->id = flow->id;
526  report->endpoint = flow->endpoint;
527  report->type = type;
528 
 /* interval reports cover the time since the previous report, final
  * reports the time since the first one */
529  if (type == INTERVAL)
530  report->begin = flow->last_report_time;
531  else
532  report->begin = flow->first_report_time;
533 
534  gettime(&report->end);
535  flow->last_report_time = report->end;
536 
537  /* abort if we were scheduled way too early for an interval report */
538  if (time_diff(&report->begin,&report->end) < 0.2 *
539  flow->settings.reporting_interval && type == INTERVAL){
540  free(report);
541  return;
542  }
543 
544  report->bytes_read = flow->statistics[type].bytes_read;
545  report->bytes_written = flow->statistics[type].bytes_written;
546  report->request_blocks_read =
548  report->response_blocks_read =
550  report->request_blocks_written =
552  report->response_blocks_written =
554 
555  report->rtt_min = flow->statistics[type].rtt_min;
556  report->rtt_max = flow->statistics[type].rtt_max;
557  report->rtt_sum = flow->statistics[type].rtt_sum;
558  report->iat_min = flow->statistics[type].iat_min;
559  report->iat_max = flow->statistics[type].iat_max;
560  report->iat_sum = flow->statistics[type].iat_sum;
561  report->delay_min = flow->statistics[type].delay_min;
562  report->delay_max = flow->statistics[type].delay_max;
563  report->delay_sum = flow->statistics[type].delay_sum;
564 
565  /* Currently this will only contain useful information on Linux
566  * and FreeBSD */
567  report->tcp_info = flow->statistics[type].tcp_info;
568 
569  if (flow->fd != -1) {
570  /* Get latest MTU */
571  flow->pmtu = get_pmtu(flow->fd);
572  report->pmtu = flow->pmtu;
573  if (type == FINAL)
574  report->imtu = get_imtu(flow->fd);
575  else
576  report->imtu = 0;
577  } else {
578  report->imtu = 0;
579  report->pmtu = 0;
580  }
581  /* Add status flags to report */
582  report->status = 0;
583 
 /* one status character for the READ direction, shifted up by 8 bits
  * below to make room for the WRITE character */
584  if (flow->statistics[type].bytes_read == 0) {
585  if (flow_in_delay(&report->end, flow, READ))
586  report->status |= 'd';
587  else if (flow_sending(&report->end, flow, READ))
588  report->status |= 'l';
589  else if (flow->settings.duration[READ] == 0)
590  report->status |= 'o';
591  else
592  report->status |= 'f';
593  } else {
 /* NOTE(review): flow->finished is indexed as an array elsewhere in this
  * file (flow->finished[READ]); if so, !flow->finished is always false and
  * 'c' can never be reported. Probably meant flow->finished[READ]. */
594  if (!flow_sending(&report->end, flow, READ) && !flow->finished)
595  report->status |= 'c';
596  else
597  report->status |= 'n';
598  }
599  report->status <<= 8;
600 
 /* same classification for the WRITE direction */
601  if (flow->statistics[type].bytes_written == 0) {
602  if (flow_in_delay(&report->end, flow, WRITE))
603  report->status |= 'd';
604  else if (flow_sending(&report->end, flow, WRITE))
605  report->status |= 'l';
606  else if (flow->settings.duration[WRITE] == 0)
607  report->status |= 'o';
608  else
609  report->status |= 'f';
610  } else {
 /* NOTE(review): same always-false !flow->finished test as above;
  * probably meant flow->finished[WRITE]. */
611  if (!flow_sending(&report->end, flow, WRITE) && !flow->finished)
612  report->status |= 'c';
613  else
614  report->status |= 'n';
615  }
616 
617  /* New report interval, reset old data */
618  if (type == INTERVAL) {
619  flow->statistics[INTERVAL].bytes_read = 0;
620  flow->statistics[INTERVAL].bytes_written = 0;
621 
624 
627 
628  flow->statistics[INTERVAL].rtt_min = FLT_MAX;
629  flow->statistics[INTERVAL].rtt_max = FLT_MIN;
630  flow->statistics[INTERVAL].rtt_sum = 0.0F;
631  flow->statistics[INTERVAL].iat_min = FLT_MAX;
632  flow->statistics[INTERVAL].iat_max = FLT_MIN;
633  flow->statistics[INTERVAL].iat_sum = 0.0F;
634  flow->statistics[INTERVAL].delay_min = FLT_MAX;
635  flow->statistics[INTERVAL].delay_max = FLT_MIN;
636  flow->statistics[INTERVAL].delay_sum = 0.0F;
637  }
638 
639  add_report(report);
640  DEBUG_MSG(LOG_DEBUG, "report_flow finished for flow %d (type %d)",
641  flow->id, type);
642 }
643 
644 /* Fills the given _fg_tcp_info with the values of the OS specific tcp_info,
645  * returns 0 on success */
646 int get_tcp_info(struct flow *flow, struct fg_tcp_info *info)
647 {
648 #ifdef HAVE_TCP_INFO
649  struct tcp_info tmp_info;
650  socklen_t info_len = sizeof(tmp_info);
651  int rc;
652  memset(info, 0, sizeof(struct fg_tcp_info));
653 
654  rc = getsockopt(flow->fd, IPPROTO_TCP, TCP_INFO, &tmp_info, &info_len);
655  if (rc == -1) {
656  warn("getsockopt() failed");
657  return -1;
658  }
659  #define CPY_INFO_MEMBER(a) info->a = (int) tmp_info.a;
660  CPY_INFO_MEMBER(tcpi_snd_cwnd);
661  CPY_INFO_MEMBER(tcpi_snd_ssthresh);
662  CPY_INFO_MEMBER(tcpi_rtt);
663  CPY_INFO_MEMBER(tcpi_rttvar);
664  CPY_INFO_MEMBER(tcpi_rto);
665  CPY_INFO_MEMBER(tcpi_snd_mss);
666 
667  /* TODO FreeBSD 9.1 doesn't fill these members, but maybe FreeBSD 10.0
668  * will fill it, so get rid of this ifdef */
669 #ifdef __LINUX__
670  CPY_INFO_MEMBER(tcpi_backoff);
671  CPY_INFO_MEMBER(tcpi_unacked);
672  CPY_INFO_MEMBER(tcpi_sacked);
673  CPY_INFO_MEMBER(tcpi_lost);
674  CPY_INFO_MEMBER(tcpi_retrans);
675  CPY_INFO_MEMBER(tcpi_retransmits);
676  CPY_INFO_MEMBER(tcpi_fackets);
677  CPY_INFO_MEMBER(tcpi_reordering);
678  CPY_INFO_MEMBER(tcpi_ca_state);
679 #endif /* __LINUX__ */
680 #else /* HAVE_TCP_INFO */
681  UNUSED_ARGUMENT(flow);
682  memset(info, 0, sizeof(struct fg_tcp_info));
683 #endif /* HAVE_TCP_INFO */
684  return 0;
685 }
686 
/* Emit interval reports for every flow whose next report time has passed.
 * Does nothing before the test has been started or for flows without a
 * reporting interval.
 *
 * NOTE(review): the listing numbering skips 711 and 719, so the left-hand
 * side of the tcp_info assignment and the second time_add() argument are
 * missing from this excerpt; the remaining lines are kept untouched. */
687 static void timer_check()
688 {
689  struct timespec now;
690 
691  if (!started)
692  return;
693 
694  gettime(&now);
695  const struct list_node *node = fg_list_front(&flows);
696  while (node) {
697  struct flow *flow = node->data;
698  node = node->next;
699 
700  DEBUG_MSG(LOG_DEBUG, "processing timer_check() for flow %d",
701  flow->id);
702 
703  if (!flow->settings.reporting_interval)
704  continue;
705 
706  if (!time_is_after(&now, &flow->next_report_time))
707  continue;
708 
709  /* On Other OSes than Linux or FreeBSD, tcp_info will contain all zeroes */
710  if (flow->fd != -1)
712  get_tcp_info(flow,
713  &flow->statistics[INTERVAL].tcp_info)
714  ? 0 : 1;
715  report_flow(flow, INTERVAL);
716 
 /* advance the schedule until it lies ahead of now again */
717  do {
718  time_add(&flow->next_report_time,
720  } while (time_is_after(&now, &flow->next_report_time));
721  }
722  DEBUG_MSG(LOG_DEBUG, "finished timer_check()");
723 }
724 
725 static void process_select(fd_set *rfds, fd_set *wfds, fd_set *efds)
726 {
727  const struct list_node *node = fg_list_front(&flows);
728  while (node) {
729  struct flow *flow = node->data;
730  node = node->next;
731 
732  DEBUG_MSG(LOG_DEBUG, "processing pselect() for flow %d",
733  flow->id);
734 
735  if (flow->listenfd_data != -1 &&
736  FD_ISSET(flow->listenfd_data, rfds)) {
737  DEBUG_MSG(LOG_DEBUG, "ready for accept");
738  if (flow->state == GRIND_WAIT_ACCEPT) {
739  if (accept_data(flow) == -1) {
740  DEBUG_MSG(LOG_ERR, "accept_data() "
741  "failed");
742  goto remove;
743  }
744  }
745  }
746 
747  if (flow->fd != -1) {
748  if (FD_ISSET(flow->fd, efds)) {
749  int error_number, rc;
750  socklen_t error_number_size =
751  sizeof(error_number);
752  DEBUG_MSG(LOG_DEBUG, "sock of flow %d in efds",
753  flow->id);
754  rc = getsockopt(flow->fd, SOL_SOCKET,
755  SO_ERROR,
756  (void *)&error_number,
757  &error_number_size);
758  if (rc == -1) {
759  warn("failed to get errno for"
760  "non-blocking connect");
761  goto remove;
762  }
763  if (error_number != 0) {
764  warnc(error_number, "connect");
765  goto remove;
766  }
767  }
768  if (FD_ISSET(flow->fd, wfds))
769  if (write_data(flow) == -1) {
770  DEBUG_MSG(LOG_ERR, "write_data() failed");
771  goto remove;
772  }
773 
774  if (FD_ISSET(flow->fd, rfds))
775  if (read_data(flow) == -1) {
776  DEBUG_MSG(LOG_ERR, "read_data() failed");
777  goto remove;
778  }
779  }
780  continue;
781 remove:
782  if (flow->fd != -1) {
783  flow->statistics[FINAL].has_tcp_info =
784  get_tcp_info(flow,
785  &flow->statistics[FINAL].tcp_info)
786  ? 0 : 1;
787  }
788  flow->pmtu = get_pmtu(flow->fd);
789  report_flow(flow, FINAL);
790  uninit_flow(flow);
791  DEBUG_MSG(LOG_ERR, "removing flow %d", flow->id);
792  remove_flow(flow);
793  }
794 }
795 
/* Main loop of the daemon thread: rebuild the descriptor sets, wait in
 * pselect(), then run the report timer and the per-flow I/O handling.
 * The loop never terminates.
 *
 * NOTE(review): the listing numbering skips 816, so the statement guarded by
 * the FD_ISSET() test on the daemon pipe is missing from this excerpt; the
 * remaining lines are kept untouched. */
796 void* daemon_main(void* ptr __attribute__((unused)))
797 {
798  struct timespec timeout;
799  for (;;) {
 /* prepare_fds() returns the number of flows; with no flows at all
  * pselect() below is called without a timeout */
800  int need_timeout = prepare_fds();
801 
802  timeout.tv_sec = 0;
803  timeout.tv_nsec = DEFAULT_SELECT_TIMEOUT;
804  DEBUG_MSG(LOG_DEBUG, "calling pselect() need_timeout: %i",
805  need_timeout);
806  int rc = pselect(maxfd + 1, &rfds, &wfds, &efds,
807  need_timeout ? &timeout : 0, NULL);
808  if (rc < 0) {
 /* interrupted by a signal: start over with fresh descriptor sets */
809  if (errno == EINTR)
810  continue;
811  crit("pselect() failed");
812  }
813  DEBUG_MSG(LOG_DEBUG, "pselect() finished");
814 
815  if (FD_ISSET(daemon_pipe[0], &rfds))
817 
818  timer_check();
819  process_select(&rfds, &wfds, &efds);
820  }
821 }
822 
823 void add_report(struct report* report)
824 {
825  DEBUG_MSG(LOG_DEBUG, "add_report trying to lock mutex");
826  pthread_mutex_lock(&mutex);
827  DEBUG_MSG(LOG_DEBUG, "add_report aquired mutex");
828  /* Do not keep too much data */
829  if (pending_reports >= 250 && report->type != FINAL) {
830  free(report);
831  pthread_mutex_unlock(&mutex);
832  return;
833  }
834 
835  report->next = 0;
836 
837  if (reports_last)
838  reports_last->next = report;
839  else
840  reports = report;
841 
842  reports_last = report;
843  pending_reports++;
844 
845  pthread_mutex_unlock(&mutex);
846  DEBUG_MSG(LOG_DEBUG, "add_report unlocked mutex");
847 }
848 
849 struct report* get_reports(int *has_more)
850 {
851  const unsigned max_reports = 50;
852 
853  struct report* ret;
854  DEBUG_MSG(LOG_DEBUG, "get_reports trying to lock mutex");
855  pthread_mutex_lock(&mutex);
856  DEBUG_MSG(LOG_DEBUG, "get_reports aquired mutex");
857  ret = reports;
858 
859  if (pending_reports <= max_reports) {
860  *has_more = 0;
861  pending_reports = 0;
862  reports = NULL;
863  reports_last = 0;
864  } else {
865  /* Split off first 50 items */
866  struct report* tmp;
867  for (unsigned i = 0; i < max_reports - 1; i++)
868  reports = reports->next;
869  tmp = reports->next;
870  reports->next = 0;
871  reports = tmp;
872 
873  pending_reports -= max_reports;
874  *has_more = 1;
875  }
876 
877  pthread_mutex_unlock(&mutex);
878  DEBUG_MSG(LOG_DEBUG, "get_reports unlocked mutex");
879  return ret;
880 }
881 
/* Put a flow structure into its initial state: everything zeroed, no id, no
 * sockets, endpoint and state chosen from is_source, and the statistics of
 * both report types reset (minima start at FLT_MAX, maxima at FLT_MIN).
 *
 * NOTE(review): the listing numbering skips 903 and 904, so two lines are
 * missing from this excerpt; the remaining lines are kept untouched. */
892 void init_flow(struct flow* flow, int is_source)
893 {
894  memset(flow, 0, sizeof(struct flow));
895 
896  /* flow id is given by controller */
897  flow->id = -1;
898  flow->endpoint = is_source ? SOURCE : DESTINATION;
899  flow->state = is_source ? GRIND_WAIT_CONNECT : GRIND_WAIT_ACCEPT;
 /* -1 marks a descriptor as not open, see uninit_flow() */
900  flow->fd = -1;
901  flow->listenfd_data = -1;
902 
905 
906  flow->finished[READ] = flow->finished[WRITE] = 0;
907 
908  flow->addr = 0;
909 
910  foreach(int *i, INTERVAL, FINAL) {
911  flow->statistics[*i].bytes_read = 0;
912  flow->statistics[*i].bytes_written = 0;
913 
914  flow->statistics[*i].request_blocks_read = 0;
915  flow->statistics[*i].request_blocks_written = 0;
916  flow->statistics[*i].response_blocks_read = 0;
917  flow->statistics[*i].response_blocks_written = 0;
918 
919  flow->statistics[*i].rtt_min = FLT_MAX;
920  flow->statistics[*i].rtt_max = FLT_MIN;
921  flow->statistics[*i].rtt_sum = 0.0F;
922  flow->statistics[*i].iat_min = FLT_MAX;
923  flow->statistics[*i].iat_max = FLT_MIN;
924  flow->statistics[*i].iat_sum = 0.0F;
925  flow->statistics[*i].delay_min = FLT_MAX;
926  flow->statistics[*i].delay_max = FLT_MIN;
927  flow->statistics[*i].delay_sum = 0.0F;
928  }
929 
930  DEBUG_MSG(LOG_NOTICE, "called init flow %d", flow->id);
931 }
932 
/* Write request data of a flow to its socket. At the start of a block the
 * header (block size, requested response size, timestamp) is written into
 * the write buffer. Returns 0 normally (including when the write would
 * block), the write() result (-1 or 0) if writing failed or sent nothing,
 * and -1 when the congestion counter exceeds CONGESTION_LIMIT with flow
 * control enabled. Loops for further writes only if the flow is "pushy".
 *
 * NOTE(review): the listing numbering skips 942, 943, 966-968, 992, 993,
 * 1017, 1022 and 1029, so several statements and call arguments are missing
 * from this excerpt; the remaining lines are kept untouched. */
933 static int write_data(struct flow *flow)
934 {
935  int rc = 0;
936  int response_block_size = 0;
937  double interpacket_gap = .0;
938  for (;;) {
939 
940  /* fill buffer with new data */
941  if (flow->current_block_bytes_written == 0) {
944  response_block_size = next_response_block_size(flow);
945  /* serialize data:
946  * this_block_size */
947  ((struct block *)flow->write_block)->this_block_size =
948  htonl(flow->current_write_block_size);
949  /* requested_block_size */
950  ((struct block *)flow->write_block)->request_block_size =
951  htonl(response_block_size);
952  /* write rtt data (will be echoed back by the receiver
953  * in the response packet) */
954  gettime((struct timespec *)
955  (flow->write_block + 2 * (sizeof (int32_t))));
956 
957  DEBUG_MSG(LOG_DEBUG, "wrote new request data to out "
958  "buffer bs = %d, rqs = %d, on flow %d",
959  ntohl(((struct block *)flow->write_block)->this_block_size),
960  ntohl(((struct block *)flow->write_block)->request_block_size),
961  flow->id);
962  }
963 
964  rc = write(flow->fd,
965  flow->write_block +
969 
970  if (rc == -1) {
 /* EAGAIN only ends this round, the function still returns 0 */
971  if (errno == EAGAIN) {
972  logging(LOG_WARNING, "write queue limit hit for "
973  "flow %d", flow->id);
974  break;
975  }
976  DEBUG_MSG(LOG_WARNING, "write() returned %d on flow %d, "
977  "fd %d: %s", rc, flow->id, flow->fd,
978  strerror(errno));
979  flow_error(flow, "premature end of test: %s",
980  strerror(errno));
981  return rc;
982  }
983 
984  if (rc == 0) {
985  DEBUG_MSG(LOG_CRIT, "flow %d sent zero bytes. what "
986  "does that mean?", flow->id);
987  return rc;
988  }
989 
990  DEBUG_MSG(LOG_DEBUG, "flow %d sent %d request bytes of %u "
991  "(before = %u)", flow->id, rc,
994 
995  foreach(int *i, INTERVAL, FINAL)
996  flow->statistics[*i].bytes_written += rc;
997 
998  flow->current_block_bytes_written += rc;
999 
1000  if (flow->current_block_bytes_written >=
1001  flow->current_write_block_size) {
1002  assert(flow->current_block_bytes_written ==
1003  flow->current_write_block_size);
1004  /* we just finished writing a block */
1005  flow->current_block_bytes_written = 0;
1006  gettime(&flow->last_block_written);
1007 
1008  foreach(int *i, INTERVAL, FINAL)
1009  flow->statistics[*i].request_blocks_written++;
1010 
1011  interpacket_gap = next_interpacket_gap(flow);
1012 
1013  /* if we calculated a non-zero packet add relative time
1014  * to the next write stamp which is then checked in the
1015  * select call */
1016  if (interpacket_gap) {
1018  interpacket_gap);
 /* the block just written finished after the time scheduled for the
  * next one: count it as a congestion event */
1019  if (time_is_after(&flow->last_block_written,
1020  &flow->next_write_block_timestamp)) {
1021  char timestamp[30] = "";
1023  timestamp, sizeof(timestamp), true);
1024  DEBUG_MSG(LOG_WARNING, "incipient "
1025  "congestion on flow %u new "
1026  "block scheduled for %s, "
1027  "%.6lfs before now",
1028  flow->id, timestamp,
1030  &flow->last_block_written));
1031  flow->congestion_counter++;
1032  if (flow->congestion_counter >
1033  CONGESTION_LIMIT &&
1034  flow->settings.flow_control)
1035  return -1;
1036  }
1037  }
1038  if (flow->settings.cork && toggle_tcp_cork(flow->fd) == -1)
1039  DEBUG_MSG(LOG_NOTICE, "failed to recork test "
1040  "socket for flow %d: %s",
1041  flow->id, strerror(errno));
1042  }
1043 
1044  if (!flow->settings.pushy)
1045  break;
1046  }
1047  return 0;
1048 }
1049 
/* Try to receive up to the given number of bytes into the flow's read
 * buffer with a single recvmsg() call and account for what was received.
 * Returns the number of bytes received, or -1 on error and when the peer
 * has shut down the connection (recvmsg() returned 0).
 *
 * NOTE(review): the listing numbering skips 1063, so the second half of the
 * iov_base expression is missing from this excerpt; the remaining lines are
 * kept untouched. */
1050 static inline int try_read_n_bytes(struct flow *flow, int bytes)
1051 {
1052  int rc;
1053  struct iovec iov;
1054  struct msghdr msg;
1055 /* we only read out of band data for debugging purpose */
1056 #ifdef DEBUG
1057  char cbuf[512];
1058  struct cmsghdr *cmsg;
1059 #else /* DEBUG */
1060  char cbuf[16];
1061 #endif /* DEBUG */
1062  iov.iov_base = flow->read_block +
1064  iov.iov_len = bytes;
1065  /* no name required */
1066  msg.msg_name = NULL;
1067  msg.msg_namelen = 0;
1068  msg.msg_iov = &iov;
1069  msg.msg_iovlen = 1;
1070  msg.msg_control = cbuf;
1071  msg.msg_controllen = sizeof(cbuf);
1072 
1073  rc = recvmsg(flow->fd, &msg, 0);
1074 
1075  DEBUG_MSG(LOG_DEBUG, "tried reading %d bytes, got %d", bytes, rc);
1076 
1077  if (rc == -1) {
 /* NOTE(review): an error message is recorded only when errno is EAGAIN;
  * every other errno returns -1 without setting flow->error. Confirm that
  * the comparison is not inverted. */
1078  if (errno == EAGAIN)
1079  flow_error(flow, "Premature end of test: %s",
1080  strerror(errno));
1081  return -1;
1082  }
1083 
1084  if (rc == 0) {
1085  DEBUG_MSG(LOG_ERR, "server shut down test socket of flow %d",
1086  flow->id);
1087  if (!flow->finished[READ] || !flow->settings.shutdown)
1088  warnx("premature shutdown of server flow");
1089  flow->finished[READ] = 1;
1090  return -1;
1091  }
1092 
1093  DEBUG_MSG(LOG_DEBUG, "flow %d received %u bytes", flow->id, rc);
1094 
1095  flow->current_block_bytes_read += rc;
1096 
1097  foreach(int *i, INTERVAL, FINAL)
1098  flow->statistics[*i].bytes_read += rc;
1099 
1100 #ifdef DEBUG
1101  for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg))
1102  DEBUG_MSG(LOG_NOTICE, "flow %d received cmsg: type = %u, len = %u",
1103  flow->id, cmsg->cmsg_type, (socklen_t) cmsg->cmsg_len);
1104 #endif /* DEBUG */
1105 
1106  return rc;
1107 }
1108 
/* Read block data of a flow from its socket: parse the block header,
 * validate the announced sizes, read the rest of the block and, once a
 * block is complete, update the statistics (RTT for response blocks, IAT and
 * delay for request blocks) and send a response if one was requested.
 * Loops for further blocks only if the flow is "pushy".
 *
 * NOTE(review): the listing numbering skips 1117, 1119, 1120, 1133, 1144,
 * 1153 and 1159, so several conditions and call arguments are missing from
 * this excerpt; the remaining lines are kept untouched. */
1109 static int read_data(struct flow *flow)
1110 {
1111  int rc = 0;
1112  int optint = 0;
1113  int requested_response_block_size = 0;
1114 
1115  for (;;) {
1116  /* make sure to read block header for new block */
1118  rc = try_read_n_bytes(flow,
1121  break;
1122  }
1123  /* parse data and update status */
1124 
1125  /* parse and check current block size for validity */
1126  optint = ntohl( ((struct block *)flow->read_block)->this_block_size );
1127  if (optint >= MIN_BLOCK_SIZE &&
1128  optint <= flow->settings.maximum_block_size )
1129  flow->current_read_block_size = optint;
1130  else
1131  logging(LOG_WARNING, "flow %d parsed illegal cbs %d, "
1132  "ignoring (max: %d)", flow->id, optint,
1134 
1135  /* parse and check current request size for validity */
1136  optint = ntohl( ((struct block *)flow->read_block)->request_block_size );
 /* -1 and 0 are accepted besides regular block sizes; -1 is treated
  * as the marker of a response block further down */
1137  if (optint == -1 || optint == 0 ||
1138  (optint >= MIN_BLOCK_SIZE &&
1139  optint <= flow->settings.maximum_block_size))
1140  requested_response_block_size = optint;
1141  else
1142  logging(LOG_WARNING, "flow %d parsed illegal qbs %d, "
1143  "ignoring (max: %d)", flow->id, optint,
1145 #ifdef DEBUG
1146  if (requested_response_block_size == -1) {
1147  DEBUG_MSG(LOG_NOTICE, "processing response block on "
1148  "flow %d size: %d", flow->id,
1149  flow->current_read_block_size);
1150  } else {
1151  DEBUG_MSG(LOG_NOTICE, "processing request block on "
1152  "flow %d size: %d, request: %d", flow->id,
1154  requested_response_block_size);
1155  }
1156 #endif /* DEBUG */
1157  /* read rest of block, if we have more to read */
1158  if (flow->current_block_bytes_read <
1160  rc += try_read_n_bytes(flow,
1161  flow->current_read_block_size -
1162  flow->current_block_bytes_read);
1163 
1164  if (flow->current_block_bytes_read >=
1165  flow->current_read_block_size ) {
1166  assert(flow->current_block_bytes_read ==
1167  flow->current_read_block_size);
1168  flow->current_block_bytes_read = 0;
1169 
1170  /* TODO process_rtt(), process_iat(), and
1171  * process_delay () call all gettime().
1172  * Quite inefficient... */
1173 
1174  if (requested_response_block_size == -1) {
1175  /* this is a response block, consider DATA as
1176  * RTT */
1177  foreach(int *i, INTERVAL, FINAL)
1178  flow->statistics[*i].response_blocks_read++;
1179  process_rtt(flow);
1180  } else {
1181  /* this is a request block, calculate IAT */
1182  foreach(int *i, INTERVAL, FINAL)
1183  flow->statistics[*i].request_blocks_read++;
1184  process_iat(flow);
1185  process_delay(flow);
1186 
1187  /* send response if requested */
1188  if (requested_response_block_size >=
1189  (signed)MIN_BLOCK_SIZE && !flow->finished[READ])
1190  send_response(flow,
1191  requested_response_block_size);
1192  }
1193  }
1194  if (!flow->settings.pushy)
1195  break;
1196  }
1197  return rc;
1198 }
1199 
1200 static void process_rtt(struct flow* flow)
1201 {
1202  double current_rtt = .0;
1203  struct timespec now;
1204  struct timespec *data = (struct timespec *)
1205  (flow->read_block + 2*(sizeof (int32_t)));
1206 
1207  gettime(&now);
1208  current_rtt = time_diff(data, &now);
1209 
1210  if (current_rtt < 0) {
1211  logging(LOG_CRIT, "received malformed rtt block of flow %d "
1212  "(rtt = %.3lfms), ignoring", flow->id, current_rtt * 1e3);
1213  current_rtt = NAN;
1214  }
1215 
1216  flow->last_block_read = now;
1217 
1218  if (!isnan(current_rtt)) {
1219  foreach(int *i, INTERVAL, FINAL) {
1220  ASSIGN_MIN(flow->statistics[*i].rtt_min, current_rtt);
1221  ASSIGN_MAX(flow->statistics[*i].rtt_max, current_rtt);
1222  flow->statistics[*i].rtt_sum += current_rtt;
1223  }
1224  }
1225 
1226  DEBUG_MSG(LOG_NOTICE, "processed RTT of flow %d (%.3lfms)",
1227  flow->id, current_rtt * 1e3);
1228 }
1229 
1230 static void process_iat(struct flow* flow)
1231 {
1232  double current_iat = .0;
1233  struct timespec now;
1234 
1235  gettime(&now);
1236 
1237  if (flow->last_block_read.tv_sec ||
1238  flow->last_block_read.tv_nsec)
1239  current_iat = time_diff(&flow->last_block_read, &now);
1240  else
1241  current_iat = NAN;
1242 
1243  if (current_iat < 0) {
1244  logging(LOG_CRIT, "calculated malformed iat of flow %d "
1245  "(iat = %.3lfms) (clock skew?), ignoring",
1246  flow->id, current_iat * 1e3);
1247  current_iat = NAN;
1248  }
1249 
1250  flow->last_block_read = now;
1251 
1252  if (!isnan(current_iat)) {
1253  foreach(int *i, INTERVAL, FINAL) {
1254  ASSIGN_MIN(flow->statistics[*i].iat_min, current_iat);
1255  ASSIGN_MAX(flow->statistics[*i].iat_max, current_iat);
1256  flow->statistics[*i].iat_sum += current_iat;
1257  }
1258  }
1259  DEBUG_MSG(LOG_NOTICE, "processed IAT of flow %d (%.3lfms)",
1260  flow->id, current_iat * 1e3);
1261 }
1262 
1263 static void process_delay(struct flow* flow)
1264 {
1265  double current_delay = .0;
1266  struct timespec now;
1267  struct timespec *data = (struct timespec *)
1268  (flow->read_block + 2*(sizeof (int32_t)));
1269 
1270  gettime(&now);
1271  current_delay = time_diff(data, &now);
1272 
1273  if (current_delay < 0) {
1274  logging(LOG_CRIT, "calculated malformed delay of flow "
1275  "%d (rtt = %.3lfms) (clocks out-of-sync?), ignoring",
1276  flow->id, current_delay * 1e3);
1277  current_delay = NAN;
1278  }
1279 
1280  if (!isnan(current_delay)) {
1281  foreach(int *i, INTERVAL, FINAL) {
1282  ASSIGN_MIN(flow->statistics[*i].delay_min,
1283  current_delay);
1284  ASSIGN_MAX(flow->statistics[*i].delay_max,
1285  current_delay);
1286  flow->statistics[*i].delay_sum += current_delay;
1287  }
1288  }
1289 
1290  DEBUG_MSG(LOG_NOTICE, "processed delay of flow %d (%.3lfms)",
1291  flow->id, current_delay * 1e3);
1292 }
1293 
1294 static void send_response(struct flow* flow, int requested_response_block_size)
1295 {
1296  int rc;
1297  int try = 0;
1298 
1299  assert(!flow->current_block_bytes_written);
1300 
1301  /* write requested block size as current size */
1302  ((struct block *)flow->write_block)->this_block_size =
1303  htonl(requested_response_block_size);
1304  /* rqs = -1 indicates response block */
1305  ((struct block *)flow->write_block)->request_block_size = htonl(-1);
1306  /* copy rtt data from received block to response block (echo back) */
1307  ((struct block *)flow->write_block)->data =
1308  ((struct block *)flow->read_block)->data;
1309  /* workaround for 64bit sender and 32bit receiver: we check if the
1310  * timespec is 64bit and then echo the missing 32bit back, too */
1311  if ((((struct block *)flow->write_block)->data.tv_sec) ||
1312  ((struct block *)flow->write_block)->data.tv_nsec)
1313  ((struct block *)flow->write_block)->data2 =
1314  ((struct block *)flow->read_block)->data2;
1315 
1316  DEBUG_MSG(LOG_DEBUG, "wrote new response data to out buffer bs = %d, "
1317  "rqs = %d on flow %d",
1318  ntohl(((struct block *)flow->write_block)->this_block_size),
1319  ntohl(((struct block *)flow->write_block)->request_block_size),
1320  flow->id);
1321 
1322  /* send data out until block is finished (or abort if 0 zero bytes are
1323  * send CONGESTION_LIMIT times) */
1324  for (;;) {
1325  rc = write(flow->fd,
1327  requested_response_block_size -
1329 
1330  DEBUG_MSG(LOG_NOTICE, "send %d bytes response (rqs %d) on flow "
1331  "%d", rc, requested_response_block_size,flow->id);
1332 
1333  if (rc == -1) {
1334  if (errno == EAGAIN) {
1335  DEBUG_MSG(LOG_DEBUG, "%s, still trying to send "
1336  "response block (write queue hit "
1337  "limit)", strerror(errno));
1338  try++;
1339  if (try >= CONGESTION_LIMIT &&
1340  !flow->current_block_bytes_written) {
1341  logging(LOG_WARNING, "tried to send "
1342  "response block %d times without "
1343  "success, dropping (%s)",
1344  try, strerror(errno));
1345  break;
1346  }
1347  } else {
1348  logging(LOG_WARNING, "premature end of test: "
1349  "%s, abort flow", strerror(errno));
1350  flow->finished[READ] = 1;
1351  break;
1352  }
1353  } else {
1354  flow->current_block_bytes_written += rc;
1355  foreach(int *i, INTERVAL, FINAL)
1356  flow->statistics[*i].bytes_written += rc;
1357 
1358  if (flow->current_block_bytes_written >=
1359  (unsigned)requested_response_block_size) {
1360  assert(flow->current_block_bytes_written ==
1361  (unsigned)requested_response_block_size);
1362  /* just finish sending response block */
1363  flow->current_block_bytes_written = 0;
1364  gettime(&flow->last_block_written);
1365  foreach(int *i, INTERVAL, FINAL)
1366  flow->statistics[*i].response_blocks_written++;
1367  break;
1368  }
1369  }
1370  }
1371 }
1372 
1373 
1375 {
1376  for (int i = 0; i < flow->settings.num_extra_socket_options; i++) {
1377  int level, res;
1378  const struct extra_socket_options *option =
1379  &flow->settings.extra_socket_options[i];
1380 
1381  switch (option->level) {
1382  case level_sol_socket:
1383  level = SOL_SOCKET;
1384  break;
1385  case level_sol_tcp:
1386  level = SOL_TCP;
1387  break;
1388  case level_ipproto_ip:
1389  level = IPPROTO_IP;
1390  break;
1391  case level_ipproto_sctp:
1392  level = IPPROTO_SCTP;
1393  break;
1394  case level_ipproto_tcp:
1395  level = IPPROTO_TCP;
1396  break;
1397  case level_ipproto_udp:
1398  level = IPPROTO_UDP;
1399  break;
1400  default:
1401  flow_error(flow, "Unknown socket option level: %d",
1402  option->level);
1403  return -1;
1404  }
1405 
1406  res = setsockopt(flow->fd, level, option->optname,
1407  option->optval, option->optlen);
1408 
1409  if (res == -1) {
1410  flow_error(flow, "Unable to set socket option %d: %s",
1411  option->optname, strerror(errno));
1412  return -1;
1413  }
1414  }
1415 
1416  return 0;
1417 }
1418 
1419 /* Set the TCP options on the data socket */
1421 {
1422  set_non_blocking(flow->fd);
1423 
1424  if (*flow->settings.cc_alg &&
1425  set_congestion_control(flow->fd, flow->settings.cc_alg) == -1) {
1426  flow_error(flow, "Unable to set congestion control "
1427  "algorithm: %s", strerror(errno));
1428  return -1;
1429  }
1430  if (flow->settings.elcn &&
1431  set_so_elcn(flow->fd, flow->settings.elcn) == -1) {
1432  flow_error(flow, "Unable to set TCP_ELCN: %s",
1433  strerror(errno));
1434  return -1;
1435  }
1436  if (flow->settings.lcd && set_so_lcd(flow->fd) == -1) {
1437  flow_error(flow, "Unable to set TCP_LCD: %s",
1438  strerror(errno));
1439  return -1;
1440  }
1441  if (flow->settings.cork && set_tcp_cork(flow->fd) == -1) {
1442  flow_error(flow, "Unable to set TCP_CORK: %s",
1443  strerror(errno));
1444  return -1;
1445  }
1446  if (flow->settings.so_debug && set_so_debug(flow->fd) == -1) {
1447  flow_error(flow, "Unable to set SO_DEBUG: %s",
1448  strerror(errno));
1449  return -1;
1450  }
1451  if (flow->settings.mtcp && set_tcp_mtcp(flow->fd) == -1) {
1452  flow_error(flow, "Unable to set TCP_MTCP: %s",
1453  strerror(errno));
1454  return -1;
1455  }
1456  if (flow->settings.nonagle && set_tcp_nodelay(flow->fd) == -1) {
1457  flow_error(flow, "Unable to set TCP_NODELAY: %s",
1458  strerror(errno));
1459  return -1;
1460  }
1461  if (flow->settings.route_record && set_route_record(flow->fd) == -1) {
1462  flow_error(flow, "Unable to set route record option: %s",
1463  strerror(errno));
1464  return -1;
1465  }
1466  if (flow->settings.dscp &&
1467  set_dscp(flow->fd, flow->settings.dscp) == -1) {
1468  flow_error(flow, "Unable to set DSCP value: %s",
1469  strerror(errno));
1470  return -1;
1471  }
1472  if (flow->settings.ipmtudiscover &&
1473  set_ip_mtu_discover(flow->fd) == -1) {
1474  flow_error(flow, "Unable to set IP_MTU_DISCOVER value: %s",
1475  strerror(errno));
1476  return -1;
1477  }
1478  if (apply_extra_socket_options(flow) == -1)
1479  return -1;
1480 
1481  return 0;
1482 }
1483 
1484 /* Dispatch an incoming request to daemon thread */
1485 int dispatch_request(struct request *request, int type)
1486 {
1487  pthread_cond_t cond;
1488 
1489  request->error = NULL;
1490  request->type = type;
1491  request->next = NULL;
1492 
1493  /* Create synchronization mutex */
1494  if (pthread_cond_init(&cond, NULL)) {
1495  request_error(request, "Could not create synchronization mutex");
1496  return -1;
1497  }
1498  request->condition = &cond;
1499 
1500  pthread_mutex_lock(&mutex);
1501 
1502  if (!requests) {
1503  requests = request;
1504  requests_last = request;
1505  } else {
1506  requests_last->next = request;
1507  requests_last = request;
1508  }
1509  if (write(daemon_pipe[1], &type, 1) != 1) /* Doesn't matter what we write */
1510  return -1;
1511  /* Wait until the daemon thread has processed the request */
1512  pthread_cond_wait(&cond, &mutex);
1513 
1514  pthread_mutex_unlock(&mutex);
1515 
1516  if (request->error)
1517  return -1;
1518 
1519  return 0;
1520 }
1521 
1532 void get_uuid_string(char *uuid_str)
1533 {
1534  uuid_t uuid;
1535  static char server_uuid[38] = "";
1536 
1537  if (!strlen(server_uuid)) {
1538  uuid_generate_time(uuid);
1539  uuid_unparse(uuid,uuid_str);
1540  memset(server_uuid,0,sizeof(server_uuid));
1541  strcpy(server_uuid,uuid_str);
1542  return;
1543  }
1544  strcpy(uuid_str,server_uuid);
1545 }
int has_tcp_info
Definition: daemon.h:156
const char * ctimespec_r(const struct timespec *tp, char *buf, size_t size, bool ns)
Converts timespec struct tp into a null-terminated string.
Definition: fg_time.c:66
double delay_sum
Accumulated one-way delay.
Definition: common.h:319
struct report * reports
Definition: daemon.c:95
double delay_sum
Accumulated one-way delay.
Definition: daemon.h:148
Routines used to setup a Flowgrind destination for a test.
structure for getting the UUID.
Definition: daemon.h:240
static void process_select(fd_set *rfds, fd_set *wfds, fd_set *efds)
Definition: daemon.c:725
fd_set rfds
Definition: daemon.c:92
int maximum_block_size
Application buffer size in bytes (option -U).
Definition: common.h:201
unsigned request_blocks_read
Definition: daemon.h:130
int mtcp
Set TCP_MTCP (15) on test socket (option -O).
Definition: common.h:242
int set_so_debug(int fd)
Definition: fg_socket.c:366
int gettime(struct timespec *tp)
Returns the current wall-clock time with nanosecond precision.
Definition: fg_time.c:145
int set_dscp(int fd, int dscp)
Definition: fg_socket.c:132
int pmtu
Definition: daemon.h:114
int set_so_lcd(int fd)
Definition: fg_socket.c:288
#define REQUEST_ADD_SOURCE
Definition: daemon.h:174
Routines used by the Flowgrind daemon.
void get_uuid_string(char *uuid_str)
To generate daemon UUID.
Definition: daemon.c:1532
int num_extra_socket_options
Definition: common.h:262
unsigned current_read_block_size
Definition: daemon.h:101
void * daemon_main(void *ptr __attribute__((unused)))
Definition: daemon.c:796
int lcd
Set TCP_LCD (21) on test socket (option -O).
Definition: common.h:240
#define warnc(code,...)
To report a warning w/ the system error message 'code'.
Definition: fg_error.h:52
int get_imtu(int fd)
Definition: fg_socket.c:214
Debugging routines for Flowgrind controller and daemon.
enum endpoint_t endpoint
Daemon endpoint - either source or destination.
Definition: common.h:289
static void timer_check()
Definition: daemon.c:687
static int read_data(struct flow *flow)
Definition: daemon.c:1109
double iat_sum
Accumulated inter-arrival time.
Definition: common.h:313
static void process_iat(struct flow *flow)
Definition: daemon.c:1230
void logging(int priority, const char *fmt,...)
Definition: fg_log.c:69
int set_tcp_nodelay(int fd)
Definition: fg_socket.c:358
int set_tcp_mtcp(int fd)
Definition: fg_socket.c:347
struct timespec begin
Definition: common.h:292
int set_flow_tcp_options(struct flow *flow)
Definition: daemon.c:1420
char connect_called
Definition: daemon.h:111
struct report * next
Definition: common.h:338
Routines for statistics and advanced traffic generation.
unsigned request_blocks_written
Definition: daemon.h:131
double rtt_sum
Accumulated round-trip time.
Definition: common.h:325
const struct list_node * fg_list_front(struct linked_list *const list)
Returns the first element of the list.
Definition: fg_list.c:49
int do_connect(struct flow *flow)
Establishes a connection of a flow.
Definition: source.c:153
struct timespec next_write_block_timestamp
Definition: daemon.h:95
struct timespec data2
Used to access 64bit timespec on 32bit arch.
Definition: common.h:162
struct flow_source_settings source_settings
Definition: daemon.h:84
struct request r
Definition: daemon.h:229
static void start_flows(struct request_start_flows *request)
Definition: daemon.c:329
struct timespec end
Definition: common.h:293
double delay[2]
Delay of flow in seconds (option -Y).
Definition: common.h:188
double rtt_min
Minimum round-trip time.
Definition: common.h:321
int id
Definition: common.h:287
#define REQUEST_STOP_FLOW
Definition: daemon.h:176
int set_tcp_cork(int fd)
Definition: fg_socket.c:317
int dispatch_request(struct request *request, int type)
Dispatch a request to daemon loop.
Definition: daemon.c:1485
unsigned request_blocks_read
Definition: common.h:301
double iat_min
Minimum inter-arrival time.
Definition: common.h:309
static int prepare_rfds(struct timespec *now, struct flow *flow, fd_set *rfds)
Definition: daemon.c:229
int maxfd
Definition: daemon.c:93
static int write_data(struct flow *flow)
Definition: daemon.c:933
Error-reporting routines used by Flowgrind.
Definition: common.h:286
int next_response_block_size(struct flow *flow)
Definition: trafgen.c:143
int set_route_record(int fd)
Definition: fg_socket.c:149
struct flow_settings settings
Definition: daemon.h:83
struct request * next
Definition: daemon.h:189
#define CPY_INFO_MEMBER(a)
pthread_mutex_t mutex
Definition: daemon.c:89
struct fg_tcp_info tcp_info
Definition: daemon.h:157
static void prepare_wfds(struct timespec *now, struct flow *flow, fd_set *wfds)
Definition: daemon.c:195
double time_diff(const struct timespec *tp1, const struct timespec *tp2)
Returns the time difference between the two specific points in time tp1 and tp2.
Definition: fg_time.c:95
struct request * requests
Definition: daemon.c:90
unsigned response_blocks_written
Definition: common.h:304
struct fg_tcp_info tcp_info
Definition: common.h:329
enum endpoint_t endpoint
Definition: daemon.h:78
void uninit_flow(struct flow *flow)
Definition: daemon.c:161
static void send_response(struct flow *flow, int requested_response_block_size)
Definition: daemon.c:1294
int get_tcp_info(struct flow *flow, struct fg_tcp_info *info)
Definition: daemon.c:646
#define warnx(...)
To report a warning w/ a system error message.
Definition: fg_error.h:54
static void stop_flow(struct request_stop_flow *request)
Definition: daemon.c:378
char type
Definition: daemon.h:181
static void report_flow(struct flow *flow, int type)
To prepare a report, report type is either INTERVAL or FINAL.
Definition: daemon.c:518
int cork
Sets TCP_CORK on test socket (option -O).
Definition: common.h:232
unsigned pmtu
Discovered Path MTU.
Definition: common.h:332
#define SOL_TCP
Definition: daemon.c:78
fd_set efds
Definition: daemon.c:92
#define DEBUG_MSG(LVL, MSG,...)
Print debug message to standard error.
Definition: debug.h:49
unsigned current_block_bytes_read
Definition: daemon.h:103
char cc_alg[TCP_CA_NAME_MAX]
Set congestion control algorithm ALG on test socket (option -O).
Definition: common.h:236
static xmlrpc_value * add_flow_source(xmlrpc_env *const env, xmlrpc_value *const param_array, void *const user_data)
Prepare data connection for source endpoint.
Definition: fg_rpc_server.c:62
void * data
Pointer to user defined data stored with this node.
Definition: fg_list.h:38
char * read_block
Definition: daemon.h:97
int ipmtudiscover
Set IP_MTU_DISCOVER on test socket (option -O).
Definition: common.h:246
Endpoint that accepts the connection.
Definition: common.h:100
struct timespec last_report_time
Definition: daemon.h:92
#define ASSIGN_MIN(s, c)
Assign value if it is less than the current one.
unsigned imtu
Interface MTU.
Definition: common.h:334
int get_pmtu(int fd)
Definition: fg_socket.c:193
struct timespec data
Sending timestamp for calculating delay and RTT.
Definition: common.h:160
int status
Definition: common.h:336
int pushy
Do not iterate through select() to continue sending in case block size did not suffice to fill sendin...
Definition: common.h:213
static int flow_sending(struct timespec *now, struct flow *flow, int direction)
Definition: daemon.c:148
static void process_delay(struct flow *flow)
Definition: daemon.c:1263
double iat_min
Minimum interarrival time.
Definition: daemon.h:138
double rtt_max
Maximum round-trip time.
Definition: common.h:323
char * error
Definition: daemon.h:187
#define MIN_BLOCK_SIZE
Minimum block (message) size we can send.
Definition: common.h:79
unsigned congestion_counter
Definition: daemon.h:116
char * write_block
Definition: daemon.h:98
unsigned current_write_block_size
Definition: daemon.h:100
double iat_sum
Accumulated interarrival time.
Definition: daemon.h:142
struct report * get_reports(int *has_more)
Definition: daemon.c:849
void remove_flow(struct flow *const flow)
Definition: daemon.c:187
void add_report(struct report *report)
Definition: daemon.c:823
char * error
Definition: daemon.h:170
void time_add(struct timespec *tp, double seconds)
Add an amount of time seconds to a specific point in time tp.
Definition: fg_time.c:136
#define UNUSED_ARGUMENT(x)
Suppress warning for unused argument.
static void process_requests()
To process the request issued from the controller.
Definition: daemon.c:442
static int try_read_n_bytes(struct flow *flow, int bytes)
Definition: daemon.c:1050
int dscp
DSCP value for TOS byte (option -D).
Definition: common.h:244
Endpoint that opens the connection.
Definition: common.h:98
double rtt_min
Minimum round-trip time.
Definition: daemon.h:150
int accept_data(struct flow *flow)
Definition: destination.c:250
int toggle_tcp_cork(int fd)
Definition: fg_socket.c:331
unsigned pending_reports
Definition: daemon.c:97
unsigned response_blocks_read
Definition: daemon.h:132
char started
Definition: daemon.c:101
unsigned request_blocks_written
Definition: common.h:302
#define REQUEST_START_FLOWS
Definition: daemon.h:175
int route_record
Sets ROUTE_RECORD on test socket (option -O).
Definition: common.h:208
struct flow_settings::extra_socket_options extra_socket_options[MAX_EXTRA_SOCKET_OPTIONS]
#define free_all(...)
To free() an arbitrary number of variables.
int listenfd_data
Definition: daemon.h:81
Flowgrind's data block layout.
Definition: common.h:148
#define warn(...)
To report a warning w/ the corresponding system error message.
Definition: fg_error.h:50
enum report_t type
Report type - either INTERVAL or FINAL report.
Definition: common.h:291
struct sockaddr * addr
Definition: daemon.h:119
int set_ip_mtu_discover(int fd)
Definition: fg_socket.c:300
void init_flow(struct flow *flow, int is_source)
To initialize all flows to the default value.
Definition: daemon.c:892
Single element in a doubly linked list.
Definition: fg_list.h:36
unsigned response_blocks_read
Definition: common.h:303
double delay_max
Maximum one-way delay.
Definition: daemon.h:146
double delay_min
Minimum one-way delay.
Definition: common.h:315
struct timespec first_report_time
Definition: daemon.h:91
#define crit(...)
To report a critical error w/ the corresponding system error message.
Definition: fg_error.h:36
unsigned response_blocks_written
Definition: daemon.h:133
unsigned random_seed
Random seed to use (default: read /dev/urandom) (option -J).
Definition: common.h:223
Routines used by Flowgrind to setup the source for a test flow.
struct timespec start_timestamp[2]
Definition: daemon.h:86
#define REQUEST_ADD_DESTINATION
Definition: daemon.h:173
pthread_cond_t * condition
Definition: daemon.h:185
int set_so_elcn(int fd, int val)
Definition: fg_socket.c:278
unsigned long long bytes_written
Definition: common.h:296
int set_congestion_control(int fd, const char *cc_alg)
Definition: fg_socket.c:265
void init_math_functions(struct flow *flow, unsigned long seed)
Definition: fg_math.c:58
unsigned long long bytes_written
Definition: daemon.h:125
struct request * requests_last
Definition: daemon.c:90
int elcn
Set TCP_ELCN (20) on test socket (option -O).
Definition: common.h:238
bool time_is_after(const struct timespec *tp1, const struct timespec *tp2)
Returns true if second point in time tp2 is chronologically after the first point in time tp1...
Definition: fg_time.c:110
static int prepare_fds()
Definition: daemon.c:263
double iat_max
Maximum interarrival time.
Definition: daemon.h:140
Final report.
Definition: common.h:116
struct list_node * next
Pointer to the next node in the list.
Definition: fg_list.h:40
#define REQUEST_GET_UUID
Definition: daemon.h:178
enum flow_state_t state
Definition: daemon.h:77
struct timespec last_block_read
Definition: daemon.h:88
int fd
Definition: daemon.h:80
double iat_max
Maximum inter-arrival time.
Definition: common.h:311
int traffic_dump
Dump traffic using libpcap (option -M).
Definition: common.h:204
double delay_max
Maximum one-way delay.
Definition: common.h:317
Read operation.
Definition: common.h:108
static void process_rtt(struct flow *flow)
Definition: daemon.c:1200
char finished[2]
Definition: daemon.h:112
static int flow_in_delay(struct timespec *now, struct flow *flow, int direction)
Definition: daemon.c:141
int so_debug
Sets SO_DEBUG on test socket (option -O).
Definition: common.h:206
unsigned long long bytes_read
Definition: common.h:295
Write operation.
Definition: common.h:106
Data structures used by the Flowgrind daemon and controller.
char server_uuid[38]
UUID from the daemon.
Definition: daemon.h:244
struct timespec stop_timestamp[2]
Definition: daemon.h:87
double rtt_sum
Accumulated round-trip time.
Definition: daemon.h:154
Routines used to manipulate socket parameters for Flowgrind.
Common definitions used by the Flowgrind daemon, controller, and libs.
struct timespec next_report_time
Definition: daemon.h:93
double duration[2]
Duration of flow in seconds (option -T).
Definition: common.h:190
Routines used by the Flowgrind daemon for advanced traffic generation.
int daemon_pipe[2]
Definition: daemon.c:87
double next_interpacket_gap(struct flow *flow)
Definition: trafgen.c:173
int flow_control
Stop flow if it is experiencing local congestion (option -C).
Definition: common.h:226
struct timespec last_block_written
Definition: daemon.h:89
void request_error(struct request *request, const char *fmt,...)
Definition: daemon.c:128
pthread_t pcap_thread
Definition: daemon.h:161
double time_diff_now(const struct timespec *tp)
Returns time difference between now and the specific point in time tp.
Definition: fg_time.c:101
double delay_min
Minimum one-way delay.
Definition: daemon.h:144
Packet capture support for the Flowgrind daemon.
fd_set wfds
Definition: daemon.c:92
int apply_extra_socket_options(struct flow *flow)
Definition: daemon.c:1374
#define ASSIGN_MAX(s, c)
Assign value if it's greater than the current one.
Definition: daemon.h:73
#define CONGESTION_LIMIT
Definition: daemon.c:85
struct flow::statistics statistics[2]
int next_request_block_size(struct flow *flow)
Definition: trafgen.c:107
#define REQUEST_GET_STATUS
Definition: daemon.h:177
void free_math_functions(struct flow *flow)
Definition: fg_math.c:97
int nonagle
Disable nagle algorithm on test socket (option -O).
Definition: common.h:234
int shutdown
Shutdown socket after test flow (option -N).
Definition: common.h:215
A doubly linked list.
Definition: fg_list.h:46
static int flow_block_scheduled(struct timespec *now, struct flow *flow)
Definition: daemon.c:156
#define DEFAULT_SELECT_TIMEOUT
Time select() will block waiting for a file descriptor to become ready.
Definition: daemon.h:51
void add_flow_destination(struct request_add_flow_destination *request)
To set daemon flow as destination endpoint.
Definition: destination.c:158
size_t fg_list_size(struct linked_list *const list)
Returns the number of elements in the list.
Definition: fg_list.c:211
unsigned current_block_bytes_written
Definition: daemon.h:104
int id
Definition: daemon.h:75
double rtt_max
Maximum round-trip time.
Definition: daemon.h:152
struct request r
Definition: daemon.h:249
int set_non_blocking(int fd)
Definition: fg_socket.c:172
unsigned long long bytes_read
Definition: daemon.h:124
struct linked_list flows
Definition: daemon.c:99
Timing related routines used by Flowgrind.
struct request r
Daemon thread processes the request r.
Definition: daemon.h:242
int fg_list_remove(struct linked_list *const list, const void *const data)
Removes from the list the first element whose data points to data.
Definition: fg_list.c:65
double reporting_interval
Interval to report flow on screen (option -i).
Definition: common.h:193
Intermediate interval report.
Definition: common.h:114
void flow_error(struct flow *flow, const char *fmt,...)
Definition: daemon.c:115