corosync  3.0.2
totemsrp.c
1 /*
2  * Copyright (c) 2003-2006 MontaVista Software, Inc.
3  * Copyright (c) 2006-2018 Red Hat, Inc.
4  *
5  * All rights reserved.
6  *
7  * Author: Steven Dake (sdake@redhat.com)
8  *
9  * This software licensed under BSD license, the text of which follows:
10  *
11  * Redistribution and use in source and binary forms, with or without
12  * modification, are permitted provided that the following conditions are met:
13  *
14  * - Redistributions of source code must retain the above copyright notice,
15  * this list of conditions and the following disclaimer.
16  * - Redistributions in binary form must reproduce the above copyright notice,
17  * this list of conditions and the following disclaimer in the documentation
18  * and/or other materials provided with the distribution.
19  * - Neither the name of the MontaVista Software, Inc. nor the names of its
20  * contributors may be used to endorse or promote products derived from this
21  * software without specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
33  * THE POSSIBILITY OF SUCH DAMAGE.
34  */
35 
36 /*
37  * The first version of this code was based upon Yair Amir's PhD thesis:
38  * http://www.cs.jhu.edu/~yairamir/phd.ps (ch. 4 and 5).
39  *
40  * The current version of totemsrp implements the Totem protocol specified in:
41  * http://citeseer.ist.psu.edu/amir95totem.html
42  *
43  * The deviations from the above published protocols are:
44  * - token hold mode where the token doesn't rotate on an unused ring - reduces CPU
45  * usage on a 1.6 GHz Xeon from 35% to less than 0.1% as measured by top
46  */
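/*
 * Illustrative sketch (not part of totemsrp.c) of the token hold deviation
 * described above: on an idle ring the token holder delays the next token
 * transmit for a hold timeout instead of circulating the token continuously,
 * which is what keeps an unused ring from burning CPU.  The function below
 * is hypothetical and only shows the decision, not the real timer handling.
 */
static int example_should_hold_token (int messages_pending, int retransmit_requests_pending)
{
	/* hold (delay forwarding) only when the ring has nothing to do */
	return (messages_pending == 0 && retransmit_requests_pending == 0);
}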
47 
48 #include <config.h>
49 
50 #include <assert.h>
51 #ifdef HAVE_ALLOCA_H
52 #include <alloca.h>
53 #endif
54 #include <sys/mman.h>
55 #include <sys/types.h>
56 #include <sys/stat.h>
57 #include <sys/socket.h>
58 #include <netdb.h>
59 #include <sys/un.h>
60 #include <sys/ioctl.h>
61 #include <sys/param.h>
62 #include <netinet/in.h>
63 #include <arpa/inet.h>
64 #include <unistd.h>
65 #include <fcntl.h>
66 #include <stdlib.h>
67 #include <stdio.h>
68 #include <errno.h>
69 #include <sched.h>
70 #include <time.h>
71 #include <sys/time.h>
72 #include <sys/poll.h>
73 #include <sys/uio.h>
74 #include <limits.h>
75 
76 #include <qb/qblist.h>
77 #include <qb/qbdefs.h>
78 #include <qb/qbutil.h>
79 #include <qb/qbloop.h>
80 
81 #include <corosync/swab.h>
82 #include <corosync/sq.h>
83 
84 #define LOGSYS_UTILS_ONLY 1
85 #include <corosync/logsys.h>
86 
87 #include "totemsrp.h"
88 #include "totemnet.h"
89 
90 #include "cs_queue.h"
91 
92 #define LOCALHOST_IP inet_addr("127.0.0.1")
93 #define QUEUE_RTR_ITEMS_SIZE_MAX 16384 /* allow 16384 retransmit items */
94 #define RETRANS_MESSAGE_QUEUE_SIZE_MAX 16384 /* allow 16384 messages to be queued */
95 #define RECEIVED_MESSAGE_QUEUE_SIZE_MAX 500 /* allow 500 messages to be queued */
96 #define MAXIOVS 5
97 #define RETRANSMIT_ENTRIES_MAX 30
98 #define TOKEN_SIZE_MAX 64000 /* bytes */
99 #define LEAVE_DUMMY_NODEID 0
100 
101 /*
102  * SRP address.
103  */
104 struct srp_addr {
105  unsigned int nodeid;
106 };
107 
108 /*
109  * Rollover handling:
110  * SEQNO_START_MSG is the starting sequence number after a new configuration
111  * This should remain zero, unless testing overflow in which case
112  * 0x7ffff000 and 0xfffff000 are good starting values.
113  *
114  * SEQNO_START_TOKEN is the starting sequence number after a new configuration
115  * for a token. This should remain zero, unless testing overflow in which
116  * case 0x7fffff00 or 0xffffff00 are good starting values.
117  */
118 #define SEQNO_START_MSG 0x0
119 #define SEQNO_START_TOKEN 0x0
120 
121 /*
122  * These can be used to test different rollover points
123  * #define SEQNO_START_MSG 0xfffffe00
124  * #define SEQNO_START_TOKEN 0xfffffe00
125  */
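/*
 * Illustrative sketch (not part of totemsrp.c): the rollover test values
 * above only work because sequence number comparisons are done with
 * wrap-aware (serial) arithmetic rather than a plain "<".  The project's
 * own helper for this is sq_lt_compare() from <corosync/sq.h>, used later
 * in this file; the hypothetical function below just shows the idea.
 */
static inline int example_seq_lt (unsigned int a, unsigned int b)
{
	/* a precedes b when the unsigned distance from a to b is in the lower half */
	return (((unsigned int)(b - a) - 1) < 0x7fffffffU);
}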
126 
127 /*
128  * These can be used to test the error recovery algorithms
129  * #define TEST_DROP_ORF_TOKEN_PERCENTAGE 30
130  * #define TEST_DROP_COMMIT_TOKEN_PERCENTAGE 30
131  * #define TEST_DROP_MCAST_PERCENTAGE 50
132  * #define TEST_RECOVERY_MSG_COUNT 300
133  */
134 
135 /*
136  * we compare incoming messages to determine if their endianness is
137  * different - if so, convert them
138  *
139  * do not change
140  */
141 #define ENDIAN_LOCAL 0xff22
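/*
 * Illustrative sketch (not part of totemsrp.c): a sender stores ENDIAN_LOCAL
 * in every message header; a receiver that reads back a different value
 * knows the message came from a host with the opposite byte order and must
 * byte-swap every multi-byte field (see the *_endian_convert functions and
 * <corosync/swab.h>).  The parameter name below is hypothetical.
 */
static inline int example_needs_endian_conversion (unsigned short endian_detector)
{
	return (endian_detector != ENDIAN_LOCAL);
}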
142 
143 enum message_type {
144  MESSAGE_TYPE_ORF_TOKEN = 0, /* Ordering, Reliability, Flow (ORF) control Token */
145  MESSAGE_TYPE_MCAST = 1, /* ring ordered multicast message */
146  MESSAGE_TYPE_MEMB_MERGE_DETECT = 2, /* merge rings if there are available rings */
147  MESSAGE_TYPE_MEMB_JOIN = 3, /* membership join message */
148  MESSAGE_TYPE_MEMB_COMMIT_TOKEN = 4, /* membership commit token */
149  MESSAGE_TYPE_TOKEN_HOLD_CANCEL = 5, /* cancel the holding of the token */
150 };
151 
152 enum encapsulation_type {
153  MESSAGE_ENCAPSULATED = 1,
154  MESSAGE_NOT_ENCAPSULATED = 2,
155 };
156 
157 /*
158  * New membership algorithm local variables
159  */
160 struct consensus_list_item {
161  struct srp_addr addr;
162  int set;
163 };
164 
165 
166 struct token_callback_instance {
167  struct qb_list_head list;
168  int (*callback_fn) (enum totem_callback_token_type type, const void *);
169  enum totem_callback_token_type callback_type;
170  int delete;
171  void *data;
172 };
173 
174 
176  int mcast;
177  int token;
178 };
179 
180 struct mcast {
183  unsigned int seq;
186  unsigned int node_id;
188 } __attribute__((packed));
189 
190 
191 struct rtr_item {
193  unsigned int seq;
194 }__attribute__((packed));
195 
196 
197 struct orf_token {
199  unsigned int seq;
200  unsigned int token_seq;
201  unsigned int aru;
202  unsigned int aru_addr;
204  unsigned int backlog;
205  unsigned int fcc;
208  struct rtr_item rtr_list[0];
209 }__attribute__((packed));
210 
211 
212 struct memb_join {
215  unsigned int proc_list_entries;
216  unsigned int failed_list_entries;
217  unsigned long long ring_seq;
218  unsigned char end_of_memb_join[0];
219 /*
220  * These parts of the data structure are dynamic:
221  * struct srp_addr proc_list[];
222  * struct srp_addr failed_list[];
223  */
224 } __attribute__((packed));
225 
226 
231 } __attribute__((packed));
232 
233 
237 } __attribute__((packed));
238 
239 
242  unsigned int aru;
243  unsigned int high_delivered;
244  unsigned int received_flg;
245 }__attribute__((packed));
246 
247 
250  unsigned int token_seq;
252  unsigned int retrans_flg;
255  unsigned char end_of_commit_token[0];
256 /*
257  * These parts of the data structure are dynamic:
258  *
259  * struct srp_addr addr[PROCESSOR_COUNT_MAX];
260  * struct memb_commit_token_memb_entry memb_list[PROCESSOR_COUNT_MAX];
261  */
262 }__attribute__((packed));
264 struct message_item {
265  struct mcast *mcast;
266  unsigned int msg_len;
267 };
269 struct sort_queue_item {
270  struct mcast *mcast;
271  unsigned int msg_len;
272 };
273 
279 };
280 
283 
285 
286  /*
287  * Flow control mcasts and remcasts on last and current orf_token
288  */
290 
292 
294 
295  struct consensus_list_item consensus_list[PROCESSOR_COUNT_MAX];
296 
298 
300 
301  struct srp_addr my_id;
302 
303  struct totem_ip_address my_addrs[INTERFACE_MAX];
304 
305  struct srp_addr my_proc_list[PROCESSOR_COUNT_MAX];
306 
307  struct srp_addr my_failed_list[PROCESSOR_COUNT_MAX];
308 
309  struct srp_addr my_new_memb_list[PROCESSOR_COUNT_MAX];
310 
311  struct srp_addr my_trans_memb_list[PROCESSOR_COUNT_MAX];
312 
313  struct srp_addr my_memb_list[PROCESSOR_COUNT_MAX];
314 
315  struct srp_addr my_deliver_memb_list[PROCESSOR_COUNT_MAX];
316 
317  struct srp_addr my_left_memb_list[PROCESSOR_COUNT_MAX];
318 
319  unsigned int my_leave_memb_list[PROCESSOR_COUNT_MAX];
320 
322 
324 
326 
328 
330 
332 
334 
336 
337  struct memb_ring_id my_ring_id;
338 
339  struct memb_ring_id my_old_ring_id;
340 
342 
344 
345  unsigned int my_last_aru;
346 
348 
350 
351  unsigned int my_high_seq_received;
352 
353  unsigned int my_install_seq;
354 
356 
358 
360 
362 
364 
365  /*
366  * Queues used to order, deliver, and recover messages
367  */
368  struct cs_queue new_message_queue;
369 
370  struct cs_queue new_message_queue_trans;
371 
372  struct cs_queue retrans_message_queue;
373 
374  struct sq regular_sort_queue;
375 
376  struct sq recovery_sort_queue;
377 
378  /*
379  * Received up to and including
380  */
381  unsigned int my_aru;
382 
383  unsigned int my_high_delivered;
384 
385  struct qb_list_head token_callback_received_listhead;
386 
387  struct qb_list_head token_callback_sent_listhead;
388 
389  char orf_token_retransmit[TOKEN_SIZE_MAX];
390 
392 
393  unsigned int my_token_seq;
394 
395  /*
396  * Timers
397  */
398  qb_loop_timer_handle timer_pause_timeout;
399 
400  qb_loop_timer_handle timer_orf_token_timeout;
401 
402  qb_loop_timer_handle timer_orf_token_warning;
403 
405 
407 
408  qb_loop_timer_handle timer_merge_detect_timeout;
409 
411 
413 
414  qb_loop_timer_handle memb_timer_state_commit_timeout;
415 
416  qb_loop_timer_handle timer_heartbeat_timeout;
417 
418  /*
419  * Function and data used to log messages
420  */
422 
424 
426 
428 
430 
432 
434 
435  void (*totemsrp_log_printf) (
436  int level,
437  int subsys,
438  const char *function,
439  const char *file,
440  int line,
441  const char *format, ...)__attribute__((format(printf, 6, 7)));
442 
444 
445 //TODO struct srp_addr next_memb;
446 
448 
449  struct totem_ip_address mcast_address;
450 
451  void (*totemsrp_deliver_fn) (
452  unsigned int nodeid,
453  const void *msg,
454  unsigned int msg_len,
455  int endian_conversion_required);
456 
457  void (*totemsrp_confchg_fn) (
458  enum totem_configuration_type configuration_type,
459  const unsigned int *member_list, size_t member_list_entries,
460  const unsigned int *left_list, size_t left_list_entries,
461  const unsigned int *joined_list, size_t joined_list_entries,
462  const struct memb_ring_id *ring_id);
463 
464  void (*totemsrp_service_ready_fn) (void);
465 
466  void (*totemsrp_waiting_trans_ack_cb_fn) (
467  int waiting_trans_ack);
468 
469  void (*memb_ring_id_create_or_load) (
470  struct memb_ring_id *memb_ring_id,
471  unsigned int nodeid);
472 
473  void (*memb_ring_id_store) (
474  const struct memb_ring_id *memb_ring_id,
475  unsigned int nodeid);
476 
478 
480 
481  unsigned long long token_ring_id_seq;
482 
483  unsigned int last_released;
484 
485  unsigned int set_aru;
486 
488 
490 
492 
493  unsigned int my_last_seq;
494 
495  struct timeval tv_old;
496 
498 
500 
501  unsigned int use_heartbeat;
502 
503  unsigned int my_trc;
504 
505  unsigned int my_pbl;
506 
507  unsigned int my_cbl;
508 
509  uint64_t pause_timestamp;
510 
512 
514 
516 
518 
520 
522 
523  int flushing;
524 
527  char commit_token_storage[40000];
528 };
529 
530 struct message_handlers {
531  int count;
532  int (*handler_functions[6]) (
533  struct totemsrp_instance *instance,
534  const void *msg,
535  size_t msg_len,
536  int endian_conversion_needed);
537 };
538 
539 enum gather_state_from {
540  TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT = 0,
541  TOTEMSRP_GSFROM_GATHER_MISSING1 = 1,
542  TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE = 2,
543  TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED = 3,
544  TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE = 4,
545  TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE = 5,
546  TOTEMSRP_GSFROM_FAILED_TO_RECEIVE = 6,
547  TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE = 7,
548  TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE = 8,
549  TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE = 9,
550  TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE = 10,
551  TOTEMSRP_GSFROM_MERGE_DURING_JOIN = 11,
552  TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE = 12,
553  TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE = 13,
554  TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY = 14,
555  TOTEMSRP_GSFROM_INTERFACE_CHANGE = 15,
556  TOTEMSRP_GSFROM_MAX = TOTEMSRP_GSFROM_INTERFACE_CHANGE,
557 };
558 
559 const char* gather_state_from_desc [] = {
560  [TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT] = "consensus timeout",
561  [TOTEMSRP_GSFROM_GATHER_MISSING1] = "MISSING",
562  [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE] = "The token was lost in the OPERATIONAL state.",
563  [TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED] = "The consensus timeout expired.",
564  [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE] = "The token was lost in the COMMIT state.",
565  [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE] = "The token was lost in the RECOVERY state.",
566  [TOTEMSRP_GSFROM_FAILED_TO_RECEIVE] = "failed to receive",
567  [TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE] = "foreign message in operational state",
568  [TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE] = "foreign message in gather state",
569  [TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE] = "merge during operational state",
570  [TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE] = "merge during gather state",
571  [TOTEMSRP_GSFROM_MERGE_DURING_JOIN] = "merge during join",
572  [TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE] = "join during operational state",
573  [TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE] = "join during commit state",
574  [TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY] = "join during recovery",
575  [TOTEMSRP_GSFROM_INTERFACE_CHANGE] = "interface change",
576 };
577 
578 /*
579  * forward decls
580  */
581 static int message_handler_orf_token (
582  struct totemsrp_instance *instance,
583  const void *msg,
584  size_t msg_len,
585  int endian_conversion_needed);
586 
587 static int message_handler_mcast (
588  struct totemsrp_instance *instance,
589  const void *msg,
590  size_t msg_len,
591  int endian_conversion_needed);
592 
593 static int message_handler_memb_merge_detect (
594  struct totemsrp_instance *instance,
595  const void *msg,
596  size_t msg_len,
597  int endian_conversion_needed);
598 
599 static int message_handler_memb_join (
600  struct totemsrp_instance *instance,
601  const void *msg,
602  size_t msg_len,
603  int endian_conversion_needed);
604 
605 static int message_handler_memb_commit_token (
606  struct totemsrp_instance *instance,
607  const void *msg,
608  size_t msg_len,
609  int endian_conversion_needed);
610 
611 static int message_handler_token_hold_cancel (
612  struct totemsrp_instance *instance,
613  const void *msg,
614  size_t msg_len,
615  int endian_conversion_needed);
616 
617 static void totemsrp_instance_initialize (struct totemsrp_instance *instance);
618 
619 static void srp_addr_copy (struct srp_addr *dest, const struct srp_addr *src);
620 
621 static void srp_addr_to_nodeid (
622  struct totemsrp_instance *instance,
623  unsigned int *nodeid_out,
624  struct srp_addr *srp_addr_in,
625  unsigned int entries);
626 
627 static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b);
628 
629 static void memb_leave_message_send (struct totemsrp_instance *instance);
630 
631 static void token_callbacks_execute (struct totemsrp_instance *instance, enum totem_callback_token_type type);
632 static void memb_state_gather_enter (struct totemsrp_instance *instance, enum gather_state_from gather_from);
633 static void messages_deliver_to_app (struct totemsrp_instance *instance, int skip, unsigned int end_point);
634 static int orf_token_mcast (struct totemsrp_instance *instance, struct orf_token *oken,
635  int fcc_mcasts_allowed);
636 static void messages_free (struct totemsrp_instance *instance, unsigned int token_aru);
637 
638 static void memb_ring_id_set (struct totemsrp_instance *instance,
639  const struct memb_ring_id *ring_id);
640 static void target_set_completed (void *context);
641 static void memb_state_commit_token_update (struct totemsrp_instance *instance);
642 static void memb_state_commit_token_target_set (struct totemsrp_instance *instance);
643 static int memb_state_commit_token_send (struct totemsrp_instance *instance);
644 static int memb_state_commit_token_send_recovery (struct totemsrp_instance *instance, struct memb_commit_token *memb_commit_token);
645 static void memb_state_commit_token_create (struct totemsrp_instance *instance);
646 static int token_hold_cancel_send (struct totemsrp_instance *instance);
647 static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out);
648 static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out);
649 static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out);
650 static void mcast_endian_convert (const struct mcast *in, struct mcast *out);
651 static void memb_merge_detect_endian_convert (
652  const struct memb_merge_detect *in,
653  struct memb_merge_detect *out);
654 static void srp_addr_copy_endian_convert (struct srp_addr *out, const struct srp_addr *in);
655 static void timer_function_orf_token_timeout (void *data);
656 static void timer_function_orf_token_warning (void *data);
657 static void timer_function_pause_timeout (void *data);
658 static void timer_function_heartbeat_timeout (void *data);
659 static void timer_function_token_retransmit_timeout (void *data);
660 static void timer_function_token_hold_retransmit_timeout (void *data);
661 static void timer_function_merge_detect_timeout (void *data);
662 static void *totemsrp_buffer_alloc (struct totemsrp_instance *instance);
663 static void totemsrp_buffer_release (struct totemsrp_instance *instance, void *ptr);
664 static const char* gsfrom_to_msg(enum gather_state_from gsfrom);
665 
666 void main_deliver_fn (
667  void *context,
668  const void *msg,
669  unsigned int msg_len,
670  const struct sockaddr_storage *system_from);
671 
672 void main_iface_change_fn (
673  void *context,
674  const struct totem_ip_address *iface_address,
675  unsigned int iface_no);
676 
677 struct message_handlers totemsrp_message_handlers = {
678  6,
679  {
680  message_handler_orf_token, /* MESSAGE_TYPE_ORF_TOKEN */
681  message_handler_mcast, /* MESSAGE_TYPE_MCAST */
682  message_handler_memb_merge_detect, /* MESSAGE_TYPE_MEMB_MERGE_DETECT */
683  message_handler_memb_join, /* MESSAGE_TYPE_MEMB_JOIN */
684  message_handler_memb_commit_token, /* MESSAGE_TYPE_MEMB_COMMIT_TOKEN */
685  message_handler_token_hold_cancel /* MESSAGE_TYPE_TOKEN_HOLD_CANCEL */
686  }
687 };
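/*
 * Illustrative sketch (not part of totemsrp.c): the receive path uses the
 * message type carried in each packet header as an index into the handler
 * table above.  The dispatcher below is hypothetical and only shows the
 * indexing and bounds check.
 */
static int example_dispatch_message (
	struct totemsrp_instance *instance,
	const void *msg,
	size_t msg_len,
	int msg_type,
	int endian_conversion_needed)
{
	if (msg_type < 0 || msg_type >= totemsrp_message_handlers.count) {
		return (-1); /* unknown message type, drop it */
	}
	return (totemsrp_message_handlers.handler_functions[msg_type] (
		instance, msg, msg_len, endian_conversion_needed));
}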
688 
689 #define log_printf(level, format, args...) \
690 do { \
691  instance->totemsrp_log_printf ( \
692  level, instance->totemsrp_subsys_id, \
693  __FUNCTION__, __FILE__, __LINE__, \
694  format, ##args); \
695 } while (0)
696 #define LOGSYS_PERROR(err_num, level, fmt, args...) \
697 do { \
698  char _error_str[LOGSYS_MAX_PERROR_MSG_LEN]; \
699  const char *_error_ptr = qb_strerror_r(err_num, _error_str, sizeof(_error_str)); \
700  instance->totemsrp_log_printf ( \
701  level, instance->totemsrp_subsys_id, \
702  __FUNCTION__, __FILE__, __LINE__, \
703  fmt ": %s (%d)\n", ##args, _error_ptr, err_num); \
704  } while(0)
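/*
 * Illustrative usage (not part of totemsrp.c) of the two logging macros
 * above.  Both expand code that dereferences a local `instance` pointer,
 * so they can only be used where one is in scope; the messages below are
 * made up.
 */
static void example_log_macro_usage (struct totemsrp_instance *instance)
{
	log_printf (instance->totemsrp_log_level_error,
		"example message: %d retransmit entries pending", 3);
	LOGSYS_PERROR (errno, instance->totemsrp_log_level_error,
		"example message: socket operation failed");
}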
705 
706 static const char* gsfrom_to_msg(enum gather_state_from gsfrom)
707 {
708  if (gsfrom <= TOTEMSRP_GSFROM_MAX) {
709  return gather_state_from_desc[gsfrom];
710  }
711  else {
712  return "UNKNOWN";
713  }
714 }
715 
716 static void totemsrp_instance_initialize (struct totemsrp_instance *instance)
717 {
718  memset (instance, 0, sizeof (struct totemsrp_instance));
719 
720  qb_list_init (&instance->token_callback_received_listhead);
721 
722  qb_list_init (&instance->token_callback_sent_listhead);
723 
724  instance->my_received_flg = 1;
725 
726  instance->my_token_seq = SEQNO_START_TOKEN - 1;
727 
729 
730  instance->set_aru = -1;
731 
732  instance->my_aru = SEQNO_START_MSG;
733 
735 
737 
738  instance->orf_token_discard = 0;
739 
740  instance->originated_orf_token = 0;
741 
742  instance->commit_token = (struct memb_commit_token *)instance->commit_token_storage;
743 
744  instance->waiting_trans_ack = 1;
745 }
746 
747 static int pause_flush (struct totemsrp_instance *instance)
748 {
749  uint64_t now_msec;
750  uint64_t timestamp_msec;
751  int res = 0;
752 
753  now_msec = (qb_util_nano_current_get () / QB_TIME_NS_IN_MSEC);
754  timestamp_msec = instance->pause_timestamp / QB_TIME_NS_IN_MSEC;
755 
756  if ((now_msec - timestamp_msec) > (instance->totem_config->token_timeout / 2)) {
758  "Process pause detected for %d ms, flushing membership messages.", (unsigned int)(now_msec - timestamp_msec));
759  /*
760  * -1 indicates an error from recvmsg
761  */
762  do {
764  } while (res == -1);
765  }
766  return (res);
767 }
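/*
 * Worked example (illustrative numbers only): with token_timeout = 1000 ms,
 * the pause timer normally refreshes pause_timestamp every 200 ms
 * (token_timeout / 5, see reset_pause_timeout and timer_function_pause_timeout
 * below).  If the process is stalled long enough that more than 500 ms
 * (token_timeout / 2) have passed since the last refresh, pause_flush()
 * treats it as a scheduling pause and drains the queued membership traffic
 * instead of acting on stale state.
 */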
768 
769 static int token_event_stats_collector (enum totem_callback_token_type type, const void *void_instance)
770 {
771  struct totemsrp_instance *instance = (struct totemsrp_instance *)void_instance;
772  uint32_t time_now;
773  unsigned long long nano_secs = qb_util_nano_current_get ();
774 
775  time_now = (nano_secs / QB_TIME_NS_IN_MSEC);
776 
777  if (type == TOTEM_CALLBACK_TOKEN_RECEIVED) {
778  /* increment the latest token index */
779  if (instance->stats.latest_token == (TOTEM_TOKEN_STATS_MAX - 1))
780  instance->stats.latest_token = 0;
781  else
782  instance->stats.latest_token++;
783 
784  if (instance->stats.earliest_token == instance->stats.latest_token) {
785  /* we have filled up the array, start overwriting */
786  if (instance->stats.earliest_token == (TOTEM_TOKEN_STATS_MAX - 1))
787  instance->stats.earliest_token = 0;
788  else
789  instance->stats.earliest_token++;
790 
791  instance->stats.token[instance->stats.earliest_token].rx = 0;
792  instance->stats.token[instance->stats.earliest_token].tx = 0;
793  instance->stats.token[instance->stats.earliest_token].backlog_calc = 0;
794  }
795 
796  instance->stats.token[instance->stats.latest_token].rx = time_now;
797  instance->stats.token[instance->stats.latest_token].tx = 0; /* in case we drop the token */
798  } else {
799  instance->stats.token[instance->stats.latest_token].tx = time_now;
800  }
801  return 0;
802 }
803 
804 static void totempg_mtu_changed(void *context, int net_mtu)
805 {
806  struct totemsrp_instance *instance = context;
807 
808  instance->totem_config->net_mtu = net_mtu - sizeof (struct mcast);
809 
811  "Net MTU changed to %d, new value is %d",
812  net_mtu, instance->totem_config->net_mtu);
813 }
814 
815 /*
816  * Exported interfaces
817  */
818 int totemsrp_initialize (
819  qb_loop_t *poll_handle,
820  void **srp_context,
821  struct totem_config *totem_config,
822  totempg_stats_t *stats,
823 
824  void (*deliver_fn) (
825  unsigned int nodeid,
826  const void *msg,
827  unsigned int msg_len,
828  int endian_conversion_required),
829 
830  void (*confchg_fn) (
831  enum totem_configuration_type configuration_type,
832  const unsigned int *member_list, size_t member_list_entries,
833  const unsigned int *left_list, size_t left_list_entries,
834  const unsigned int *joined_list, size_t joined_list_entries,
835  const struct memb_ring_id *ring_id),
836  void (*waiting_trans_ack_cb_fn) (
837  int waiting_trans_ack))
838 {
839  struct totemsrp_instance *instance;
840  int res;
841 
842  instance = malloc (sizeof (struct totemsrp_instance));
843  if (instance == NULL) {
844  goto error_exit;
845  }
846 
847  totemsrp_instance_initialize (instance);
848 
849  instance->totemsrp_waiting_trans_ack_cb_fn = waiting_trans_ack_cb_fn;
850  instance->totemsrp_waiting_trans_ack_cb_fn (1);
851 
852  stats->srp = &instance->stats;
853  instance->stats.latest_token = 0;
854  instance->stats.earliest_token = 0;
855 
856  instance->totem_config = totem_config;
857 
858  /*
859  * Configure logging
860  */
869 
870  /*
871  * Configure totem store and load functions
872  */
873  instance->memb_ring_id_create_or_load = totem_config->totem_memb_ring_id_create_or_load;
874  instance->memb_ring_id_store = totem_config->totem_memb_ring_id_store;
875 
876  /*
877  * Initialize local variables for totemsrp
878  */
879  totemip_copy (&instance->mcast_address, &totem_config->interfaces[instance->lowest_active_if].mcast_addr);
880 
881  /*
882  * Display totem configuration
883  */
885  "Token Timeout (%d ms) retransmit timeout (%d ms)",
886  totem_config->token_timeout, totem_config->token_retransmit_timeout);
887  if (totem_config->token_warning) {
888  uint32_t token_warning_ms = totem_config->token_warning * totem_config->token_timeout / 100;
890  "Token warning every %d ms (%d%% of Token Timeout)",
891  token_warning_ms, totem_config->token_warning);
892  if (token_warning_ms < totem_config->token_retransmit_timeout)
894  "The token warning interval (%d ms) is less than the token retransmit timeout (%d ms) "
895  "which can lead to spurious token warnings. Consider increasing the token_warning parameter.",
896  token_warning_ms, totem_config->token_retransmit_timeout);
897  } else {
899  "Token warnings disabled");
900  }
902  "token hold (%d ms) retransmits before loss (%d retrans)",
903  totem_config->token_hold_timeout, totem_config->token_retransmits_before_loss_const);
905  "join (%d ms) send_join (%d ms) consensus (%d ms) merge (%d ms)",
906  totem_config->join_timeout,
907  totem_config->send_join_timeout,
908  totem_config->consensus_timeout,
909 
910  totem_config->merge_timeout);
912  "downcheck (%d ms) fail to recv const (%d msgs)",
913  totem_config->downcheck_timeout, totem_config->fail_to_recv_const);
915  "seqno unchanged const (%d rotations) Maximum network MTU %d", totem_config->seqno_unchanged_const, totem_config->net_mtu);
916 
918  "window size per rotation (%d messages) maximum messages per rotation (%d messages)",
919  totem_config->window_size, totem_config->max_messages);
920 
922  "missed count const (%d messages)",
923  totem_config->miss_count_const);
924 
926  "send threads (%d threads)", totem_config->threads);
927 
929  "heartbeat_failures_allowed (%d)", totem_config->heartbeat_failures_allowed);
931  "max_network_delay (%d ms)", totem_config->max_network_delay);
932 
933 
934  cs_queue_init (&instance->retrans_message_queue, RETRANS_MESSAGE_QUEUE_SIZE_MAX,
935  sizeof (struct message_item), instance->threaded_mode_enabled);
936 
937  sq_init (&instance->regular_sort_queue,
938  QUEUE_RTR_ITEMS_SIZE_MAX, sizeof (struct sort_queue_item), 0);
939 
940  sq_init (&instance->recovery_sort_queue,
941  QUEUE_RTR_ITEMS_SIZE_MAX, sizeof (struct sort_queue_item), 0);
942 
943  instance->totemsrp_poll_handle = poll_handle;
944 
945  instance->totemsrp_deliver_fn = deliver_fn;
946 
947  instance->totemsrp_confchg_fn = confchg_fn;
948  instance->use_heartbeat = 1;
949 
950  timer_function_pause_timeout (instance);
951 
952  if ( totem_config->heartbeat_failures_allowed == 0 ) {
954  "HeartBeat is Disabled. To enable set heartbeat_failures_allowed > 0");
955  instance->use_heartbeat = 0;
956  }
957 
958  if (instance->use_heartbeat) {
959  instance->heartbeat_timeout
960  = (totem_config->heartbeat_failures_allowed) * totem_config->token_retransmit_timeout
961  + totem_config->max_network_delay;
962 
963  if (instance->heartbeat_timeout >= totem_config->token_timeout) {
965  "total heartbeat_timeout (%d ms) is not less than token timeout (%d ms)",
966  instance->heartbeat_timeout,
967  totem_config->token_timeout);
969  "heartbeat_timeout = heartbeat_failures_allowed * token_retransmit_timeout + max_network_delay");
971  "heartbeat timeout should be less than the token timeout. Heartbeat is disabled!!");
972  instance->use_heartbeat = 0;
973  }
974  else {
976  "total heartbeat_timeout (%d ms)", instance->heartbeat_timeout);
977  }
978  }
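 /*
  * Worked example (illustrative values only): with
  * heartbeat_failures_allowed = 3, token_retransmit_timeout = 200 ms and
  * max_network_delay = 50 ms, the computation above gives
  * heartbeat_timeout = 3 * 200 + 50 = 650 ms, which must stay below
  * token_timeout or heartbeat support is disabled by the check above.
  */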
979 
980  res = totemnet_initialize (
981  poll_handle,
982  &instance->totemnet_context,
983  totem_config,
984  stats->srp,
985  instance,
986  main_deliver_fn,
987  main_iface_change_fn,
988  totempg_mtu_changed,
989  target_set_completed);
990  if (res == -1) {
991  goto error_exit;
992  }
993 
994  instance->my_id.nodeid = instance->totem_config->interfaces[instance->lowest_active_if].boundto.nodeid;
995 
996  /*
997  * Must have net_mtu adjusted by totemnet_initialize first
998  */
999  cs_queue_init (&instance->new_message_queue,
1001  sizeof (struct message_item), instance->threaded_mode_enabled);
1002 
1003  cs_queue_init (&instance->new_message_queue_trans,
1005  sizeof (struct message_item), instance->threaded_mode_enabled);
1006 
1008  &instance->token_recv_event_handle,
1010  0,
1011  token_event_stats_collector,
1012  instance);
1014  &instance->token_sent_event_handle,
1016  0,
1017  token_event_stats_collector,
1018  instance);
1019  *srp_context = instance;
1020  return (0);
1021 
1022 error_exit:
1023  return (-1);
1024 }
1025 
1026 void totemsrp_finalize (
1027  void *srp_context)
1028 {
1029  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1030 
1031  memb_leave_message_send (instance);
1032  totemnet_finalize (instance->totemnet_context);
1033  cs_queue_free (&instance->new_message_queue);
1034  cs_queue_free (&instance->new_message_queue_trans);
1035  cs_queue_free (&instance->retrans_message_queue);
1036  sq_free (&instance->regular_sort_queue);
1037  sq_free (&instance->recovery_sort_queue);
1038  free (instance);
1039 }
1040 
1041 /*
1042  * Return configured interfaces. interfaces is an array of totem_ip addresses allocated by the caller,
1043  * with interfaces_size items. iface_count is the final number of interfaces filled in by this
1044  * function.
1045  *
1046  * The function returns 0 on success; if the interfaces array is not big enough, -2 is returned,
1047  * and if the interface was not found, -1 is returned.
1048  */
1049 int totemsrp_ifaces_get (
1050  void *srp_context,
1051  unsigned int nodeid,
1052  unsigned int *interface_id,
1053  struct totem_ip_address *interfaces,
1054  unsigned int interfaces_size,
1055  char ***status,
1056  unsigned int *iface_count)
1057 {
1058  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1059  struct totem_ip_address *iface_ptr = interfaces;
1060  int res = 0;
1061  int i,n;
1062  int num_ifs = 0;
1063 
1064  memset(interfaces, 0, sizeof(struct totem_ip_address) * interfaces_size);
1065  *iface_count = INTERFACE_MAX;
1066 
1067  for (i=0; i<INTERFACE_MAX; i++) {
1068  for (n=0; n < instance->totem_config->interfaces[i].member_count; n++) {
1069  if (instance->totem_config->interfaces[i].configured &&
1070  instance->totem_config->interfaces[i].member_list[n].nodeid == nodeid) {
1071  memcpy(iface_ptr, &instance->totem_config->interfaces[i].member_list[n], sizeof(struct totem_ip_address));
1072  interface_id[num_ifs] = i;
1073  iface_ptr++;
1074  if (++num_ifs > interfaces_size) {
1075  res = -2;
1076  break;
1077  }
1078  }
1079  }
1080  }
1081 
1082  totemnet_ifaces_get(instance->totemnet_context, status, iface_count);
1083  *iface_count = num_ifs;
1084  return (res);
1085 }
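/*
 * Illustrative caller sketch (not part of totemsrp.c) for the function
 * above: the caller supplies the address and interface-id arrays and checks
 * for the -2 (array too small) and -1 (node not found) results described in
 * the comment before the function.  The function name below is hypothetical.
 */
static int example_get_node_ifaces (void *srp_context, unsigned int nodeid)
{
	struct totem_ip_address addrs[INTERFACE_MAX];
	unsigned int iface_ids[INTERFACE_MAX];
	unsigned int iface_count = 0;
	char **status;
	int res;

	res = totemsrp_ifaces_get (srp_context, nodeid, iface_ids,
		addrs, INTERFACE_MAX, &status, &iface_count);

	return (res);
}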
1086 
1087 int totemsrp_crypto_set (
1088  void *srp_context,
1089  const char *cipher_type,
1090  const char *hash_type)
1091 {
1092  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1093  int res;
1094 
1095  res = totemnet_crypto_set(instance->totemnet_context, cipher_type, hash_type);
1096 
1097  return (res);
1098 }
1099 
1100 
1101 unsigned int totemsrp_my_nodeid_get (
1102  void *srp_context)
1103 {
1104  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1105  unsigned int res;
1106 
1107  res = instance->my_id.nodeid;
1108 
1109  return (res);
1110 }
1111 
1112 int totemsrp_my_family_get (
1113  void *srp_context)
1114 {
1115  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1116  int res;
1117 
1118  res = instance->totem_config->interfaces[instance->lowest_active_if].boundto.family;
1119 
1120  return (res);
1121 }
1122 
1123 
1124 /*
1125  * Set operations for use by the membership algorithm
1126  */
1127 static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b)
1128 {
1129  if (a->nodeid == b->nodeid) {
1130  return 1;
1131  }
1132  return 0;
1133 }
1134 
1135 static void srp_addr_copy (struct srp_addr *dest, const struct srp_addr *src)
1136 {
1137  dest->nodeid = src->nodeid;
1138 }
1139 
1140 static void srp_addr_to_nodeid (
1141  struct totemsrp_instance *instance,
1142  unsigned int *nodeid_out,
1143  struct srp_addr *srp_addr_in,
1144  unsigned int entries)
1145 {
1146  unsigned int i;
1147 
1148  for (i = 0; i < entries; i++) {
1149  nodeid_out[i] = srp_addr_in[i].nodeid;
1150  }
1151 }
1152 
1153 static void srp_addr_copy_endian_convert (struct srp_addr *out, const struct srp_addr *in)
1154 {
1155  out->nodeid = swab32 (in->nodeid);
1156 }
1157 
1158 static void memb_consensus_reset (struct totemsrp_instance *instance)
1159 {
1160  instance->consensus_list_entries = 0;
1161 }
1162 
1163 static void memb_set_subtract (
1164  struct srp_addr *out_list, int *out_list_entries,
1165  struct srp_addr *one_list, int one_list_entries,
1166  struct srp_addr *two_list, int two_list_entries)
1167 {
1168  int found = 0;
1169  int i;
1170  int j;
1171 
1172  *out_list_entries = 0;
1173 
1174  for (i = 0; i < one_list_entries; i++) {
1175  for (j = 0; j < two_list_entries; j++) {
1176  if (srp_addr_equal (&one_list[i], &two_list[j])) {
1177  found = 1;
1178  break;
1179  }
1180  }
1181  if (found == 0) {
1182  srp_addr_copy (&out_list[*out_list_entries], &one_list[i]);
1183  *out_list_entries = *out_list_entries + 1;
1184  }
1185  found = 0;
1186  }
1187 }
1188 
1189 /*
1190  * Set consensus for a specific processor
1191  */
1192 static void memb_consensus_set (
1193  struct totemsrp_instance *instance,
1194  const struct srp_addr *addr)
1195 {
1196  int found = 0;
1197  int i;
1198 
1199  for (i = 0; i < instance->consensus_list_entries; i++) {
1200  if (srp_addr_equal(addr, &instance->consensus_list[i].addr)) {
1201  found = 1;
1202  break; /* found entry */
1203  }
1204  }
1205  srp_addr_copy (&instance->consensus_list[i].addr, addr);
1206  instance->consensus_list[i].set = 1;
1207  if (found == 0) {
1208  instance->consensus_list_entries++;
1209  }
1210  return;
1211 }
1212 
1213 /*
1214  * Is consensus set for a specific processor
1215  */
1216 static int memb_consensus_isset (
1217  struct totemsrp_instance *instance,
1218  const struct srp_addr *addr)
1219 {
1220  int i;
1221 
1222  for (i = 0; i < instance->consensus_list_entries; i++) {
1223  if (srp_addr_equal (addr, &instance->consensus_list[i].addr)) {
1224  return (instance->consensus_list[i].set);
1225  }
1226  }
1227  return (0);
1228 }
1229 
1230 /*
1231  * Is consensus agreed upon based upon consensus database
1232  */
1233 static int memb_consensus_agreed (
1234  struct totemsrp_instance *instance)
1235 {
1236  struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
1237  int token_memb_entries = 0;
1238  int agreed = 1;
1239  int i;
1240 
1241  memb_set_subtract (token_memb, &token_memb_entries,
1242  instance->my_proc_list, instance->my_proc_list_entries,
1243  instance->my_failed_list, instance->my_failed_list_entries);
1244 
1245  for (i = 0; i < token_memb_entries; i++) {
1246  if (memb_consensus_isset (instance, &token_memb[i]) == 0) {
1247  agreed = 0;
1248  break;
1249  }
1250  }
1251 
1252  if (agreed && instance->failed_to_recv == 1) {
1253  /*
1254  * Both nodes agreed on our failure. We don't care how many proc list items are left because we
1255  * will create a single ring anyway.
1256  */
1257 
1258  return (agreed);
1259  }
1260 
1261  assert (token_memb_entries >= 1);
1262 
1263  return (agreed);
1264 }
1265 
1266 static void memb_consensus_notset (
1267  struct totemsrp_instance *instance,
1268  struct srp_addr *no_consensus_list,
1269  int *no_consensus_list_entries,
1270  struct srp_addr *comparison_list,
1271  int comparison_list_entries)
1272 {
1273  int i;
1274 
1275  *no_consensus_list_entries = 0;
1276 
1277  for (i = 0; i < instance->my_proc_list_entries; i++) {
1278  if (memb_consensus_isset (instance, &instance->my_proc_list[i]) == 0) {
1279  srp_addr_copy (&no_consensus_list[*no_consensus_list_entries], &instance->my_proc_list[i]);
1280  *no_consensus_list_entries = *no_consensus_list_entries + 1;
1281  }
1282  }
1283 }
1284 
1285 /*
1286  * Is set1 equal to set2? Entries can be in different orders.
1287  */
1288 static int memb_set_equal (
1289  struct srp_addr *set1, int set1_entries,
1290  struct srp_addr *set2, int set2_entries)
1291 {
1292  int i;
1293  int j;
1294 
1295  int found = 0;
1296 
1297  if (set1_entries != set2_entries) {
1298  return (0);
1299  }
1300  for (i = 0; i < set2_entries; i++) {
1301  for (j = 0; j < set1_entries; j++) {
1302  if (srp_addr_equal (&set1[j], &set2[i])) {
1303  found = 1;
1304  break;
1305  }
1306  }
1307  if (found == 0) {
1308  return (0);
1309  }
1310  found = 0;
1311  }
1312  return (1);
1313 }
1314 
1315 /*
1316  * Is subset fully contained in fullset
1317  */
1318 static int memb_set_subset (
1319  const struct srp_addr *subset, int subset_entries,
1320  const struct srp_addr *fullset, int fullset_entries)
1321 {
1322  int i;
1323  int j;
1324  int found = 0;
1325 
1326  if (subset_entries > fullset_entries) {
1327  return (0);
1328  }
1329  for (i = 0; i < subset_entries; i++) {
1330  for (j = 0; j < fullset_entries; j++) {
1331  if (srp_addr_equal (&subset[i], &fullset[j])) {
1332  found = 1;
1333  }
1334  }
1335  if (found == 0) {
1336  return (0);
1337  }
1338  found = 0;
1339  }
1340  return (1);
1341 }
1342 /*
1343  * merge subset into fullset taking care not to add duplicates
1344  */
1345 static void memb_set_merge (
1346  const struct srp_addr *subset, int subset_entries,
1347  struct srp_addr *fullset, int *fullset_entries)
1348 {
1349  int found = 0;
1350  int i;
1351  int j;
1352 
1353  for (i = 0; i < subset_entries; i++) {
1354  for (j = 0; j < *fullset_entries; j++) {
1355  if (srp_addr_equal (&fullset[j], &subset[i])) {
1356  found = 1;
1357  break;
1358  }
1359  }
1360  if (found == 0) {
1361  srp_addr_copy (&fullset[*fullset_entries], &subset[i]);
1362  *fullset_entries = *fullset_entries + 1;
1363  }
1364  found = 0;
1365  }
1366  return;
1367 }
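/*
 * Illustrative sketch (not part of totemsrp.c) showing how the set helpers
 * above compose.  Starting from proc = {1,2,3} and failed = {3},
 * memb_set_subtract() yields the surviving members {1,2}; merging {4} into
 * proc with memb_set_merge() then grows it to {1,2,3,4} without duplicating
 * existing entries.  The function and values below are hypothetical.
 */
static void example_set_helpers_usage (void)
{
	struct srp_addr proc[PROCESSOR_COUNT_MAX] = { {1}, {2}, {3} };
	struct srp_addr failed[PROCESSOR_COUNT_MAX] = { {3} };
	struct srp_addr joining[1] = { {4} };
	struct srp_addr survivors[PROCESSOR_COUNT_MAX];
	int proc_entries = 3;
	int failed_entries = 1;
	int survivors_entries = 0;

	memb_set_subtract (survivors, &survivors_entries,
		proc, proc_entries, failed, failed_entries);	/* survivors = {1,2} */

	memb_set_merge (joining, 1, proc, &proc_entries);	/* proc = {1,2,3,4} */
}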
1368 
1369 static void memb_set_and_with_ring_id (
1370  struct srp_addr *set1,
1371  struct memb_ring_id *set1_ring_ids,
1372  int set1_entries,
1373  struct srp_addr *set2,
1374  int set2_entries,
1375  struct memb_ring_id *old_ring_id,
1376  struct srp_addr *and,
1377  int *and_entries)
1378 {
1379  int i;
1380  int j;
1381  int found = 0;
1382 
1383  *and_entries = 0;
1384 
1385  for (i = 0; i < set2_entries; i++) {
1386  for (j = 0; j < set1_entries; j++) {
1387  if (srp_addr_equal (&set1[j], &set2[i])) {
1388  if (memcmp (&set1_ring_ids[j], old_ring_id, sizeof (struct memb_ring_id)) == 0) {
1389  found = 1;
1390  }
1391  break;
1392  }
1393  }
1394  if (found) {
1395  srp_addr_copy (&and[*and_entries], &set1[j]);
1396  *and_entries = *and_entries + 1;
1397  }
1398  found = 0;
1399  }
1400  return;
1401 }
1402 
1403 static void memb_set_log(
1404  struct totemsrp_instance *instance,
1405  int level,
1406  const char *string,
1407  struct srp_addr *list,
1408  int list_entries)
1409 {
1410  char int_buf[32];
1411  char list_str[512];
1412  int i;
1413 
1414  memset(list_str, 0, sizeof(list_str));
1415 
1416  for (i = 0; i < list_entries; i++) {
1417  if (i == 0) {
1418  snprintf(int_buf, sizeof(int_buf), "%u", list[i].nodeid);
1419  } else {
1420  snprintf(int_buf, sizeof(int_buf), ",%u", list[i].nodeid);
1421  }
1422 
1423  if (strlen(list_str) + strlen(int_buf) >= sizeof(list_str)) {
1424  break ;
1425  }
1426  strcat(list_str, int_buf);
1427  }
1428 
1429  log_printf(level, "List '%s' contains %d entries: %s", string, list_entries, list_str);
1430 }
1431 
1432 static void my_leave_memb_clear(
1433  struct totemsrp_instance *instance)
1434 {
1435  memset(instance->my_leave_memb_list, 0, sizeof(instance->my_leave_memb_list));
1436  instance->my_leave_memb_entries = 0;
1437 }
1438 
1439 static unsigned int my_leave_memb_match(
1440  struct totemsrp_instance *instance,
1441  unsigned int nodeid)
1442 {
1443  int i;
1444  unsigned int ret = 0;
1445 
1446  for (i = 0; i < instance->my_leave_memb_entries; i++){
1447  if (instance->my_leave_memb_list[i] == nodeid){
1448  ret = nodeid;
1449  break;
1450  }
1451  }
1452  return ret;
1453 }
1454 
1455 static void my_leave_memb_set(
1456  struct totemsrp_instance *instance,
1457  unsigned int nodeid)
1458 {
1459  int i, found = 0;
1460  for (i = 0; i < instance->my_leave_memb_entries; i++){
1461  if (instance->my_leave_memb_list[i] == nodeid){
1462  found = 1;
1463  break;
1464  }
1465  }
1466  if (found == 1) {
1467  return;
1468  }
1469  if (instance->my_leave_memb_entries < (PROCESSOR_COUNT_MAX - 1)) {
1470  instance->my_leave_memb_list[instance->my_leave_memb_entries] = nodeid;
1471  instance->my_leave_memb_entries++;
1472  } else {
1474  "Cannot set LEAVE nodeid=%d", nodeid);
1475  }
1476 }
1477 
1478 
1479 static void *totemsrp_buffer_alloc (struct totemsrp_instance *instance)
1480 {
1481  assert (instance != NULL);
1482  return totemnet_buffer_alloc (instance->totemnet_context);
1483 }
1484 
1485 static void totemsrp_buffer_release (struct totemsrp_instance *instance, void *ptr)
1486 {
1487  assert (instance != NULL);
1488  totemnet_buffer_release (instance->totemnet_context, ptr);
1489 }
1490 
1491 static void reset_token_retransmit_timeout (struct totemsrp_instance *instance)
1492 {
1493  int32_t res;
1494 
1495  qb_loop_timer_del (instance->totemsrp_poll_handle,
1497  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1498  QB_LOOP_MED,
1499  instance->totem_config->token_retransmit_timeout*QB_TIME_NS_IN_MSEC,
1500  (void *)instance,
1501  timer_function_token_retransmit_timeout,
1503  if (res != 0) {
1504  log_printf(instance->totemsrp_log_level_error, "reset_token_retransmit_timeout - qb_loop_timer_add error : %d", res);
1505  }
1506 
1507 }
1508 
1509 static void start_merge_detect_timeout (struct totemsrp_instance *instance)
1510 {
1511  int32_t res;
1512 
1513  if (instance->my_merge_detect_timeout_outstanding == 0) {
1514  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1515  QB_LOOP_MED,
1516  instance->totem_config->merge_timeout*QB_TIME_NS_IN_MSEC,
1517  (void *)instance,
1518  timer_function_merge_detect_timeout,
1519  &instance->timer_merge_detect_timeout);
1520  if (res != 0) {
1521  log_printf(instance->totemsrp_log_level_error, "start_merge_detect_timeout - qb_loop_timer_add error : %d", res);
1522  }
1523 
1524  instance->my_merge_detect_timeout_outstanding = 1;
1525  }
1526 }
1527 
1528 static void cancel_merge_detect_timeout (struct totemsrp_instance *instance)
1529 {
1530  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_merge_detect_timeout);
1532 }
1533 
1534 /*
1535  * ring_state_* is used to save and restore the sort queue
1536  * state when a recovery operation fails (and enters gather)
1537  */
1538 static void old_ring_state_save (struct totemsrp_instance *instance)
1539 {
1540  if (instance->old_ring_state_saved == 0) {
1541  instance->old_ring_state_saved = 1;
1542  memcpy (&instance->my_old_ring_id, &instance->my_ring_id,
1543  sizeof (struct memb_ring_id));
1544  instance->old_ring_state_aru = instance->my_aru;
1547  "Saving state aru %x high seq received %x",
1548  instance->my_aru, instance->my_high_seq_received);
1549  }
1550 }
1551 
1552 static void old_ring_state_restore (struct totemsrp_instance *instance)
1553 {
1554  instance->my_aru = instance->old_ring_state_aru;
1557  "Restoring instance->my_aru %x my high seq received %x",
1558  instance->my_aru, instance->my_high_seq_received);
1559 }
1560 
1561 static void old_ring_state_reset (struct totemsrp_instance *instance)
1562 {
1564  "Resetting old ring state");
1565  instance->old_ring_state_saved = 0;
1566 }
1567 
1568 static void reset_pause_timeout (struct totemsrp_instance *instance)
1569 {
1570  int32_t res;
1571 
1572  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_pause_timeout);
1573  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1574  QB_LOOP_MED,
1575  instance->totem_config->token_timeout * QB_TIME_NS_IN_MSEC / 5,
1576  (void *)instance,
1577  timer_function_pause_timeout,
1578  &instance->timer_pause_timeout);
1579  if (res != 0) {
1580  log_printf(instance->totemsrp_log_level_error, "reset_pause_timeout - qb_loop_timer_add error : %d", res);
1581  }
1582 }
1583 
1584 static void reset_token_warning (struct totemsrp_instance *instance) {
1585  int32_t res;
1586 
1587  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_warning);
1588  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1589  QB_LOOP_MED,
1590  instance->totem_config->token_warning * instance->totem_config->token_timeout / 100 * QB_TIME_NS_IN_MSEC,
1591  (void *)instance,
1592  timer_function_orf_token_warning,
1593  &instance->timer_orf_token_warning);
1594  if (res != 0) {
1595  log_printf(instance->totemsrp_log_level_error, "reset_token_warning - qb_loop_timer_add error : %d", res);
1596  }
1597 }
1598 
1599 static void reset_token_timeout (struct totemsrp_instance *instance) {
1600  int32_t res;
1601 
1602  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_timeout);
1603  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1604  QB_LOOP_MED,
1605  instance->totem_config->token_timeout*QB_TIME_NS_IN_MSEC,
1606  (void *)instance,
1607  timer_function_orf_token_timeout,
1608  &instance->timer_orf_token_timeout);
1609  if (res != 0) {
1610  log_printf(instance->totemsrp_log_level_error, "reset_token_timeout - qb_loop_timer_add error : %d", res);
1611  }
1612 
1613  if (instance->totem_config->token_warning)
1614  reset_token_warning(instance);
1615 }
1616 
1617 static void reset_heartbeat_timeout (struct totemsrp_instance *instance) {
1618  int32_t res;
1619 
1620  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_heartbeat_timeout);
1621  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1622  QB_LOOP_MED,
1623  instance->heartbeat_timeout*QB_TIME_NS_IN_MSEC,
1624  (void *)instance,
1625  timer_function_heartbeat_timeout,
1626  &instance->timer_heartbeat_timeout);
1627  if (res != 0) {
1628  log_printf(instance->totemsrp_log_level_error, "reset_heartbeat_timeout - qb_loop_timer_add error : %d", res);
1629  }
1630 }
1631 
1632 
1633 static void cancel_token_warning (struct totemsrp_instance *instance) {
1634  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_warning);
1635 }
1636 
1637 static void cancel_token_timeout (struct totemsrp_instance *instance) {
1638  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_timeout);
1639 
1640  if (instance->totem_config->token_warning)
1641  cancel_token_warning(instance);
1642 }
1643 
1644 static void cancel_heartbeat_timeout (struct totemsrp_instance *instance) {
1645  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_heartbeat_timeout);
1646 }
1647 
1648 static void cancel_token_retransmit_timeout (struct totemsrp_instance *instance)
1649 {
1650  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_retransmit_timeout);
1651 }
1652 
1653 static void start_token_hold_retransmit_timeout (struct totemsrp_instance *instance)
1654 {
1655  int32_t res;
1656 
1657  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1658  QB_LOOP_MED,
1659  instance->totem_config->token_hold_timeout*QB_TIME_NS_IN_MSEC,
1660  (void *)instance,
1661  timer_function_token_hold_retransmit_timeout,
1663  if (res != 0) {
1664  log_printf(instance->totemsrp_log_level_error, "start_token_hold_retransmit_timeout - qb_loop_timer_add error : %d", res);
1665  }
1666 }
1667 
1668 static void cancel_token_hold_retransmit_timeout (struct totemsrp_instance *instance)
1669 {
1670  qb_loop_timer_del (instance->totemsrp_poll_handle,
1672 }
1673 
1674 static void memb_state_consensus_timeout_expired (
1675  struct totemsrp_instance *instance)
1676 {
1677  struct srp_addr no_consensus_list[PROCESSOR_COUNT_MAX];
1678  int no_consensus_list_entries;
1679 
1680  instance->stats.consensus_timeouts++;
1681  if (memb_consensus_agreed (instance)) {
1682  memb_consensus_reset (instance);
1683 
1684  memb_consensus_set (instance, &instance->my_id);
1685 
1686  reset_token_timeout (instance); // REVIEWED
1687  } else {
1688  memb_consensus_notset (
1689  instance,
1690  no_consensus_list,
1691  &no_consensus_list_entries,
1692  instance->my_proc_list,
1693  instance->my_proc_list_entries);
1694 
1695  memb_set_merge (no_consensus_list, no_consensus_list_entries,
1696  instance->my_failed_list, &instance->my_failed_list_entries);
1697  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT);
1698  }
1699 }
1700 
1701 static void memb_join_message_send (struct totemsrp_instance *instance);
1702 
1703 static void memb_merge_detect_transmit (struct totemsrp_instance *instance);
1704 
1705 /*
1706  * Timers used for various states of the membership algorithm
1707  */
1708 static void timer_function_pause_timeout (void *data)
1709 {
1710  struct totemsrp_instance *instance = data;
1711 
1712  instance->pause_timestamp = qb_util_nano_current_get ();
1713  reset_pause_timeout (instance);
1714 }
1715 
1716 static void memb_recovery_state_token_loss (struct totemsrp_instance *instance)
1717 {
1718  old_ring_state_restore (instance);
1719  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE);
1720  instance->stats.recovery_token_lost++;
1721 }
1722 
1723 static void timer_function_orf_token_warning (void *data)
1724 {
1725  struct totemsrp_instance *instance = data;
1726  uint64_t tv_diff;
1727 
1728  /* need to protect against the case where token_warning is set to 0 dynamically */
1729  if (instance->totem_config->token_warning) {
1730  tv_diff = qb_util_nano_current_get () / QB_TIME_NS_IN_MSEC -
1731  instance->stats.token[instance->stats.latest_token].rx;
1733  "Token has not been received in %d ms ", (unsigned int) tv_diff);
1734  reset_token_warning(instance);
1735  } else {
1736  cancel_token_warning(instance);
1737  }
1738 }
1739 
1740 static void timer_function_orf_token_timeout (void *data)
1741 {
1742  struct totemsrp_instance *instance = data;
1743 
1744  switch (instance->memb_state) {
1747  "The token was lost in the OPERATIONAL state.");
1749  "A processor failed, forming new configuration.");
1751  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE);
1752  instance->stats.operational_token_lost++;
1753  break;
1754 
1755  case MEMB_STATE_GATHER:
1757  "The consensus timeout expired.");
1758  memb_state_consensus_timeout_expired (instance);
1759  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED);
1760  instance->stats.gather_token_lost++;
1761  break;
1762 
1763  case MEMB_STATE_COMMIT:
1765  "The token was lost in the COMMIT state.");
1766  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE);
1767  instance->stats.commit_token_lost++;
1768  break;
1769 
1770  case MEMB_STATE_RECOVERY:
1772  "The token was lost in the RECOVERY state.");
1773  memb_recovery_state_token_loss (instance);
1774  instance->orf_token_discard = 1;
1775  break;
1776  }
1777 }
1778 
1779 static void timer_function_heartbeat_timeout (void *data)
1780 {
1781  struct totemsrp_instance *instance = data;
1783  "HeartBeat Timer expired Invoking token loss mechanism in state %d ", instance->memb_state);
1784  timer_function_orf_token_timeout(data);
1785 }
1786 
1787 static void memb_timer_function_state_gather (void *data)
1788 {
1789  struct totemsrp_instance *instance = data;
1790  int32_t res;
1791 
1792  switch (instance->memb_state) {
1794  case MEMB_STATE_RECOVERY:
1795  assert (0); /* this should never happen */
1796  break;
1797  case MEMB_STATE_GATHER:
1798  case MEMB_STATE_COMMIT:
1799  memb_join_message_send (instance);
1800 
1801  /*
1802  * Restart the join timeout
1803  */
1804  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
1805 
1806  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1807  QB_LOOP_MED,
1808  instance->totem_config->join_timeout*QB_TIME_NS_IN_MSEC,
1809  (void *)instance,
1810  memb_timer_function_state_gather,
1811  &instance->memb_timer_state_gather_join_timeout);
1812 
1813  if (res != 0) {
1814  log_printf(instance->totemsrp_log_level_error, "memb_timer_function_state_gather - qb_loop_timer_add error : %d", res);
1815  }
1816  break;
1817  }
1818 }
1819 
1820 static void memb_timer_function_gather_consensus_timeout (void *data)
1821 {
1822  struct totemsrp_instance *instance = data;
1823  memb_state_consensus_timeout_expired (instance);
1824 }
1825 
1826 static void deliver_messages_from_recovery_to_regular (struct totemsrp_instance *instance)
1827 {
1828  unsigned int i;
1829  struct sort_queue_item *recovery_message_item;
1830  struct sort_queue_item regular_message_item;
1831  unsigned int range = 0;
1832  int res;
1833  void *ptr;
1834  struct mcast *mcast;
1835 
1837  "recovery to regular %x-%x", SEQNO_START_MSG + 1, instance->my_aru);
1838 
1839  range = instance->my_aru - SEQNO_START_MSG;
1840  /*
1841  * Move messages from recovery to regular sort queue
1842  */
1843 // TODO: should i be initialized to 0 or 1?
1844  for (i = 1; i <= range; i++) {
1845  res = sq_item_get (&instance->recovery_sort_queue,
1846  i + SEQNO_START_MSG, &ptr);
1847  if (res != 0) {
1848  continue;
1849  }
1850  recovery_message_item = ptr;
1851 
1852  /*
1853  * Convert recovery message into regular message
1854  */
1855  mcast = recovery_message_item->mcast;
1856  if (mcast->header.encapsulated == MESSAGE_ENCAPSULATED) {
1857  /*
1858  * Message is a recovery message encapsulated
1859  * in a new ring message
1860  */
1861  regular_message_item.mcast =
1862  (struct mcast *)(((char *)recovery_message_item->mcast) + sizeof (struct mcast));
1863  regular_message_item.msg_len =
1864  recovery_message_item->msg_len - sizeof (struct mcast);
1865  mcast = regular_message_item.mcast;
1866  } else {
1867  /*
1868  * TODO this case shouldn't happen
1869  */
1870  continue;
1871  }
1872 
1874  "comparing if ring id is for this processors old ring seqno %d",
1875  mcast->seq);
1876 
1877  /*
1878  * Only add this message to the regular sort
1879  * queue if it was originated with the same ring
1880  * id as the previous ring
1881  */
1882  if (memcmp (&instance->my_old_ring_id, &mcast->ring_id,
1883  sizeof (struct memb_ring_id)) == 0) {
1884 
1885  res = sq_item_inuse (&instance->regular_sort_queue, mcast->seq);
1886  if (res == 0) {
1887  sq_item_add (&instance->regular_sort_queue,
1888  &regular_message_item, mcast->seq);
1889  if (sq_lt_compare (instance->old_ring_state_high_seq_received, mcast->seq)) {
1890  instance->old_ring_state_high_seq_received = mcast->seq;
1891  }
1892  }
1893  } else {
1895  "-not adding msg with seq no %x", mcast->seq);
1896  }
1897  }
1898 }
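/*
 * Illustrative note (not from the original source comments): an encapsulated
 * recovery message is the original ring's multicast carried verbatim behind
 * an extra struct mcast header belonging to the recovery ring, so skipping
 * sizeof (struct mcast) bytes, as done above, recovers the original message
 * and its original ring id for the comparison that follows.
 */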
1899 
1900 /*
1901  * Change states in the state machine of the membership algorithm
1902  */
1903 static void memb_state_operational_enter (struct totemsrp_instance *instance)
1904 {
1905  struct srp_addr joined_list[PROCESSOR_COUNT_MAX];
1906  int joined_list_entries = 0;
1907  unsigned int aru_save;
1908  unsigned int joined_list_totemip[PROCESSOR_COUNT_MAX];
1909  unsigned int trans_memb_list_totemip[PROCESSOR_COUNT_MAX];
1910  unsigned int new_memb_list_totemip[PROCESSOR_COUNT_MAX];
1911  unsigned int left_list[PROCESSOR_COUNT_MAX];
1912  unsigned int i;
1913  unsigned int res;
1914  char left_node_msg[1024];
1915  char joined_node_msg[1024];
1916  char failed_node_msg[1024];
1917 
1918  instance->originated_orf_token = 0;
1919 
1920  memb_consensus_reset (instance);
1921 
1922  old_ring_state_reset (instance);
1923 
1924  deliver_messages_from_recovery_to_regular (instance);
1925 
1927  "Delivering to app %x to %x",
1928  instance->my_high_delivered + 1, instance->old_ring_state_high_seq_received);
1929 
1930  aru_save = instance->my_aru;
1931  instance->my_aru = instance->old_ring_state_aru;
1932 
1933  messages_deliver_to_app (instance, 0, instance->old_ring_state_high_seq_received);
1934 
1935  /*
1936  * Calculate joined and left list
1937  */
1938  memb_set_subtract (instance->my_left_memb_list,
1939  &instance->my_left_memb_entries,
1940  instance->my_memb_list, instance->my_memb_entries,
1941  instance->my_trans_memb_list, instance->my_trans_memb_entries);
1942 
1943  memb_set_subtract (joined_list, &joined_list_entries,
1944  instance->my_new_memb_list, instance->my_new_memb_entries,
1945  instance->my_trans_memb_list, instance->my_trans_memb_entries);
1946 
1947  /*
1948  * Install new membership
1949  */
1950  instance->my_memb_entries = instance->my_new_memb_entries;
1951  memcpy (&instance->my_memb_list, instance->my_new_memb_list,
1952  sizeof (struct srp_addr) * instance->my_memb_entries);
1953  instance->last_released = 0;
1954  instance->my_set_retrans_flg = 0;
1955 
1956  /*
1957  * Deliver transitional configuration to application
1958  */
1959  srp_addr_to_nodeid (instance, left_list, instance->my_left_memb_list,
1960  instance->my_left_memb_entries);
1961  srp_addr_to_nodeid (instance, trans_memb_list_totemip,
1962  instance->my_trans_memb_list, instance->my_trans_memb_entries);
1964  trans_memb_list_totemip, instance->my_trans_memb_entries,
1965  left_list, instance->my_left_memb_entries,
1966  0, 0, &instance->my_ring_id);
1967  instance->waiting_trans_ack = 1;
1968  instance->totemsrp_waiting_trans_ack_cb_fn (1);
1969 
1970 // TODO we need to filter to ensure we only deliver those
1971 // messages which are part of instance->my_deliver_memb
1972  messages_deliver_to_app (instance, 1, instance->old_ring_state_high_seq_received);
1973 
1974  instance->my_aru = aru_save;
1975 
1976  /*
1977  * Deliver regular configuration to application
1978  */
1979  srp_addr_to_nodeid (instance, new_memb_list_totemip,
1980  instance->my_new_memb_list, instance->my_new_memb_entries);
1981  srp_addr_to_nodeid (instance, joined_list_totemip, joined_list,
1982  joined_list_entries);
1983  instance->totemsrp_confchg_fn (TOTEM_CONFIGURATION_REGULAR,
1984  new_memb_list_totemip, instance->my_new_memb_entries,
1985  0, 0,
1986  joined_list_totemip, joined_list_entries, &instance->my_ring_id);
1987 
1988  /*
1989  * The recovery sort queue now becomes the regular
1990  * sort queue. It is necessary to copy the state
1991  * into the regular sort queue.
1992  */
1993  sq_copy (&instance->regular_sort_queue, &instance->recovery_sort_queue);
1994  instance->my_last_aru = SEQNO_START_MSG;
1995 
1996  /* When making my_proc_list smaller, ensure that the
1997  * now-unused entries are zeroed out. There are some suspect
1998  * asserts that assume that there are always 2 entries in the list.
1999  * These fail when my_proc_list is reduced to 1 entry (and the
2000  * valid [0] entry is the same as the 'unused' [1] entry).
2001  */
2002  memset(instance->my_proc_list, 0,
2003  sizeof (struct srp_addr) * instance->my_proc_list_entries);
2004 
2005  instance->my_proc_list_entries = instance->my_new_memb_entries;
2006  memcpy (instance->my_proc_list, instance->my_new_memb_list,
2007  sizeof (struct srp_addr) * instance->my_memb_entries);
2008 
2009  instance->my_failed_list_entries = 0;
2010  /*
2011  * TODO Not exactly to spec
2012  *
2013  * At the entry to this function all messages without a gap are
2014  * delivered.
2015  *
2016  * This code throws away messages from the last gap in the sort queue
2017  * to my_high_seq_received.
2018  *
2019  * What should really happen is we should deliver all messages up to
2020  * a gap, then deliver the transitional configuration, then deliver
2021  * the messages between the first gap and my_high_seq_received, then
2022  * deliver the regular
2023  * configuration
2024  *
2025  * Unfortunately totempg doesn't appear to like this operating mode
2026  * which needs more inspection
2027  */
2028  i = instance->my_high_seq_received + 1;
2029  do {
2030  void *ptr;
2031 
2032  i -= 1;
2033  res = sq_item_get (&instance->regular_sort_queue, i, &ptr);
2034  if (i == 0) {
2035  break;
2036  }
2037  } while (res);
2038 
2039  instance->my_high_delivered = i;
2040 
2041  for (i = 0; i <= instance->my_high_delivered; i++) {
2042  void *ptr;
2043 
2044  res = sq_item_get (&instance->regular_sort_queue, i, &ptr);
2045  if (res == 0) {
2046  struct sort_queue_item *regular_message;
2047 
2048  regular_message = ptr;
2049  free (regular_message->mcast);
2050  }
2051  }
2052  sq_items_release (&instance->regular_sort_queue, instance->my_high_delivered);
2053  instance->last_released = instance->my_high_delivered;
2054 
2055  if (joined_list_entries) {
2056  int sptr = 0;
2057  sptr += snprintf(joined_node_msg, sizeof(joined_node_msg)-sptr, " joined:");
2058  for (i=0; i< joined_list_entries; i++) {
2059  sptr += snprintf(joined_node_msg+sptr, sizeof(joined_node_msg)-sptr, " %u", joined_list_totemip[i]);
2060  }
2061  }
2062  else {
2063  joined_node_msg[0] = '\0';
2064  }
2065 
2066  if (instance->my_left_memb_entries) {
2067  int sptr = 0;
2068  int sptr2 = 0;
2069  sptr += snprintf(left_node_msg, sizeof(left_node_msg)-sptr, " left:");
2070  for (i=0; i< instance->my_left_memb_entries; i++) {
2071  sptr += snprintf(left_node_msg+sptr, sizeof(left_node_msg)-sptr, " %u", left_list[i]);
2072  }
2073  for (i=0; i< instance->my_left_memb_entries; i++) {
2074  if (my_leave_memb_match(instance, left_list[i]) == 0) {
2075  if (sptr2 == 0) {
2076  sptr2 += snprintf(failed_node_msg, sizeof(failed_node_msg)-sptr2, " failed:");
2077  }
2078  sptr2 += snprintf(failed_node_msg+sptr2, sizeof(left_node_msg)-sptr2, " %u", left_list[i]);
2079  }
2080  }
2081  if (sptr2 == 0) {
2082  failed_node_msg[0] = '\0';
2083  }
2084  }
2085  else {
2086  left_node_msg[0] = '\0';
2087  failed_node_msg[0] = '\0';
2088  }
2089 
2090  my_leave_memb_clear(instance);
2091 
2092  log_printf (instance->totemsrp_log_level_debug,
2093  "entering OPERATIONAL state.");
2094  log_printf (instance->totemsrp_log_level_notice,
2095  "A new membership (%u:%lld) was formed. Members%s%s",
2096  instance->my_ring_id.rep,
2097  instance->my_ring_id.seq,
2098  joined_node_msg,
2099  left_node_msg);
2100 
2101  if (strlen(failed_node_msg)) {
2102  log_printf (instance->totemsrp_log_level_notice,
2103  "Failed to receive the leave message.%s",
2104  failed_node_msg);
2105  }
2106 
2107  instance->memb_state = MEMB_STATE_OPERATIONAL;
2108 
2109  instance->stats.operational_entered++;
2110  instance->stats.continuous_gather = 0;
2111 
2112  instance->my_received_flg = 1;
2113 
2114  reset_pause_timeout (instance);
2115 
2116  /*
2117  * Save ring id information from this configuration to determine
2118  * which processors are transitioning from old regular configuration
2119  * in to new regular configuration on the next configuration change
2120  */
2121  memcpy (&instance->my_old_ring_id, &instance->my_ring_id,
2122  sizeof (struct memb_ring_id));
2123 
2124  return;
2125 }
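/*
 * Illustrative sketch (not part of totemsrp.c): the joined and left lists
 * above are plain set subtractions over membership arrays
 * (left = old membership minus transitional, joined = new membership minus
 * transitional). A minimal standalone version of that idea over bare
 * nodeids; the helper name and parameters are assumptions for illustration.
 */
static void nodeid_set_subtract_sketch (
	unsigned int *out, unsigned int *out_entries,
	const unsigned int *full, unsigned int full_entries,
	const unsigned int *subtract, unsigned int subtract_entries)
{
	unsigned int i, j, found;

	*out_entries = 0;
	for (i = 0; i < full_entries; i++) {
		found = 0;
		for (j = 0; j < subtract_entries; j++) {
			if (full[i] == subtract[j]) {
				found = 1;
				break;
			}
		}
		if (found == 0) {
			out[(*out_entries)++] = full[i];
		}
	}
}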
2126 
2127 static void memb_state_gather_enter (
2128  struct totemsrp_instance *instance,
2129  enum gather_state_from gather_from)
2130 {
2131  int32_t res;
2132 
2133  instance->orf_token_discard = 1;
2134 
2135  instance->originated_orf_token = 0;
2136 
2137  memb_set_merge (
2138  &instance->my_id, 1,
2139  instance->my_proc_list, &instance->my_proc_list_entries);
2140 
2141  memb_join_message_send (instance);
2142 
2143  /*
2144  * Restart the join timeout
2145  */
2146  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
2147 
2148  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
2149  QB_LOOP_MED,
2150  instance->totem_config->join_timeout*QB_TIME_NS_IN_MSEC,
2151  (void *)instance,
2152  memb_timer_function_state_gather,
2153  &instance->memb_timer_state_gather_join_timeout);
2154  if (res != 0) {
2155  log_printf(instance->totemsrp_log_level_error, "memb_state_gather_enter - qb_loop_timer_add error(1) : %d", res);
2156  }
2157 
2158  /*
2159  * Restart the consensus timeout
2160  */
2161  qb_loop_timer_del (instance->totemsrp_poll_handle,
2162  instance->memb_timer_state_gather_consensus_timeout);
2163 
2164  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
2165  QB_LOOP_MED,
2166  instance->totem_config->consensus_timeout*QB_TIME_NS_IN_MSEC,
2167  (void *)instance,
2168  memb_timer_function_gather_consensus_timeout,
2169  &instance->memb_timer_state_gather_consensus_timeout);
2170  if (res != 0) {
2171  log_printf(instance->totemsrp_log_level_error, "memb_state_gather_enter - qb_loop_timer_add error(2) : %d", res);
2172  }
2173 
2174  /*
2175  * Cancel the token loss and token retransmission timeouts
2176  */
2177  cancel_token_retransmit_timeout (instance); // REVIEWED
2178  cancel_token_timeout (instance); // REVIEWED
2179  cancel_merge_detect_timeout (instance);
2180 
2181  memb_consensus_reset (instance);
2182 
2183  memb_consensus_set (instance, &instance->my_id);
2184 
2185  log_printf (instance->totemsrp_log_level_debug,
2186  "entering GATHER state from %d(%s).",
2187  gather_from, gsfrom_to_msg(gather_from));
2188 
2189  instance->memb_state = MEMB_STATE_GATHER;
2190  instance->stats.gather_entered++;
2191 
2192  if (gather_from == TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED) {
2193  /*
2194  * State 3 means gather, so we are continuously gathering.
2195  */
2196  instance->stats.continuous_gather++;
2197  }
2198 
2199  return;
2200 }
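/*
 * Illustrative sketch (not part of totemsrp.c): both the join and the
 * consensus timeouts above are restarted with the same libqb pattern --
 * delete any pending timer, then re-add it with the configured period in
 * milliseconds scaled to nanoseconds. The helper name below is an assumption
 * for illustration; the qb_loop_* calls are the ones used above.
 */
static int32_t restart_msec_timer_sketch (
	qb_loop_t *loop,
	uint64_t period_msec,
	void *data,
	qb_loop_timer_dispatch_fn dispatch_fn,
	qb_loop_timer_handle *handle)
{
	qb_loop_timer_del (loop, *handle);
	return (qb_loop_timer_add (loop, QB_LOOP_MED,
		period_msec * QB_TIME_NS_IN_MSEC, data, dispatch_fn, handle));
}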
2201 
2202 static void timer_function_token_retransmit_timeout (void *data);
2203 
2204 static void target_set_completed (
2205  void *context)
2206 {
2207  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
2208 
2209  memb_state_commit_token_send (instance);
2210 
2211 }
2212 
2213 static void memb_state_commit_enter (
2214  struct totemsrp_instance *instance)
2215 {
2216  old_ring_state_save (instance);
2217 
2218  memb_state_commit_token_update (instance);
2219 
2220  memb_state_commit_token_target_set (instance);
2221 
2222  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
2223 
2224  instance->memb_timer_state_gather_join_timeout = 0;
2225 
2226  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_consensus_timeout);
2227 
2228  instance->memb_timer_state_gather_consensus_timeout = 0;
2229 
2230  memb_ring_id_set (instance, &instance->commit_token->ring_id);
2231 
2232  instance->memb_ring_id_store (&instance->my_ring_id, instance->my_id.nodeid);
2233 
2234  instance->token_ring_id_seq = instance->my_ring_id.seq;
2235 
2236  log_printf (instance->totemsrp_log_level_debug,
2237  "entering COMMIT state.");
2238 
2239  instance->memb_state = MEMB_STATE_COMMIT;
2240  reset_token_retransmit_timeout (instance); // REVIEWED
2241  reset_token_timeout (instance); // REVIEWED
2242 
2243  instance->stats.commit_entered++;
2244  instance->stats.continuous_gather = 0;
2245 
2246  /*
2247  * reset all flow control variables since we are starting a new ring
2248  */
2249  instance->my_trc = 0;
2250  instance->my_pbl = 0;
2251  instance->my_cbl = 0;
2252  /*
2253  * commit token sent after callback that token target has been set
2254  */
2255 }
2256 
2257 static void memb_state_recovery_enter (
2258  struct totemsrp_instance *instance,
2259  struct memb_commit_token *commit_token)
2260 {
2261  int i;
2262  int local_received_flg = 1;
2263  unsigned int low_ring_aru;
2264  unsigned int range = 0;
2265  unsigned int messages_originated = 0;
2266  const struct srp_addr *addr;
2267  struct memb_commit_token_memb_entry *memb_list;
2268  struct memb_ring_id my_new_memb_ring_id_list[PROCESSOR_COUNT_MAX];
2269 
2270  addr = (const struct srp_addr *)commit_token->end_of_commit_token;
2271  memb_list = (struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries);
2272 
2273  log_printf (instance->totemsrp_log_level_debug,
2274  "entering RECOVERY state.");
2275 
2276  instance->orf_token_discard = 0;
2277 
2278  instance->my_high_ring_delivered = 0;
2279 
2280  sq_reinit (&instance->recovery_sort_queue, SEQNO_START_MSG);
2281  cs_queue_reinit (&instance->retrans_message_queue);
2282 
2283  low_ring_aru = instance->old_ring_state_high_seq_received;
2284 
2285  memb_state_commit_token_send_recovery (instance, commit_token);
2286 
2287  instance->my_token_seq = SEQNO_START_TOKEN - 1;
2288 
2289  /*
2290  * Build regular configuration
2291  */
2292  totemnet_processor_count_set (
2293  instance->totemnet_context,
2294  commit_token->addr_entries);
2295 
2296  /*
2297  * Build transitional configuration
2298  */
2299  for (i = 0; i < instance->my_new_memb_entries; i++) {
2300  memcpy (&my_new_memb_ring_id_list[i],
2301  &memb_list[i].ring_id,
2302  sizeof (struct memb_ring_id));
2303  }
2304  memb_set_and_with_ring_id (
2305  instance->my_new_memb_list,
2306  my_new_memb_ring_id_list,
2307  instance->my_new_memb_entries,
2308  instance->my_memb_list,
2309  instance->my_memb_entries,
2310  &instance->my_old_ring_id,
2311  instance->my_trans_memb_list,
2312  &instance->my_trans_memb_entries);
2313 
2314  for (i = 0; i < instance->my_trans_memb_entries; i++) {
2315  log_printf (instance->totemsrp_log_level_debug,
2316  "TRANS [%d] member %u:", i, instance->my_trans_memb_list[i].nodeid);
2317  }
2318  for (i = 0; i < instance->my_new_memb_entries; i++) {
2319  log_printf (instance->totemsrp_log_level_debug,
2320  "position [%d] member %u:", i, addr[i].nodeid);
2321  log_printf (instance->totemsrp_log_level_debug,
2322  "previous ring seq %llx rep %u",
2323  memb_list[i].ring_id.seq,
2324  memb_list[i].ring_id.rep);
2325 
2326  log_printf (instance->totemsrp_log_level_debug,
2327  "aru %x high delivered %x received flag %d",
2328  memb_list[i].aru,
2329  memb_list[i].high_delivered,
2330  memb_list[i].received_flg);
2331 
2332  // assert (totemip_print (&memb_list[i].ring_id.rep) != 0);
2333  }
2334  /*
2335  * Determine if any received flag is false
2336  */
2337  for (i = 0; i < commit_token->addr_entries; i++) {
2338  if (memb_set_subset (&instance->my_new_memb_list[i], 1,
2339  instance->my_trans_memb_list, instance->my_trans_memb_entries) &&
2340 
2341  memb_list[i].received_flg == 0) {
2342  instance->my_deliver_memb_entries = instance->my_trans_memb_entries;
2343  memcpy (instance->my_deliver_memb_list, instance->my_trans_memb_list,
2344  sizeof (struct srp_addr) * instance->my_trans_memb_entries);
2345  local_received_flg = 0;
2346  break;
2347  }
2348  }
2349  if (local_received_flg == 1) {
2350  goto no_originate;
2351  } /* Else originate messages if we should */
2352 
2353  /*
2354  * Calculate my_low_ring_aru, instance->my_high_ring_delivered for the transitional membership
2355  */
2356  for (i = 0; i < commit_token->addr_entries; i++) {
2357  if (memb_set_subset (&instance->my_new_memb_list[i], 1,
2358  instance->my_deliver_memb_list,
2359  instance->my_deliver_memb_entries) &&
2360 
2361  memcmp (&instance->my_old_ring_id,
2362  &memb_list[i].ring_id,
2363  sizeof (struct memb_ring_id)) == 0) {
2364 
2365  if (sq_lt_compare (memb_list[i].aru, low_ring_aru)) {
2366 
2367  low_ring_aru = memb_list[i].aru;
2368  }
2369  if (sq_lt_compare (instance->my_high_ring_delivered, memb_list[i].high_delivered)) {
2370  instance->my_high_ring_delivered = memb_list[i].high_delivered;
2371  }
2372  }
2373  }
2374 
2375  /*
2376  * Copy all old ring messages to instance->retrans_message_queue
2377  */
2378  range = instance->old_ring_state_high_seq_received - low_ring_aru;
2379  if (range == 0) {
2380  /*
2381  * No messages to copy
2382  */
2383  goto no_originate;
2384  }
2385  assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
2386 
2387  log_printf (instance->totemsrp_log_level_debug,
2388  "copying all old ring messages from %x-%x.",
2389  low_ring_aru + 1, instance->old_ring_state_high_seq_received);
2390 
2391  for (i = 1; i <= range; i++) {
2392  struct sort_queue_item *sort_queue_item;
2393  struct message_item message_item;
2394  void *ptr;
2395  int res;
2396 
2397  res = sq_item_get (&instance->regular_sort_queue,
2398  low_ring_aru + i, &ptr);
2399  if (res != 0) {
2400  continue;
2401  }
2402  sort_queue_item = ptr;
2403  messages_originated++;
2404  memset (&message_item, 0, sizeof (struct message_item));
2405  // TODO LEAK
2406  message_item.mcast = totemsrp_buffer_alloc (instance);
2407  assert (message_item.mcast);
2408  message_item.mcast->header.magic = TOTEM_MH_MAGIC;
2409  message_item.mcast->header.version = TOTEM_MH_VERSION;
2410  message_item.mcast->header.type = MESSAGE_TYPE_MCAST;
2411  srp_addr_copy (&message_item.mcast->system_from, &instance->my_id);
2412  message_item.mcast->header.encapsulated = MESSAGE_ENCAPSULATED;
2413 
2414  message_item.mcast->header.nodeid = instance->my_id.nodeid;
2415  assert (message_item.mcast->header.nodeid);
2416  memcpy (&message_item.mcast->ring_id, &instance->my_ring_id,
2417  sizeof (struct memb_ring_id));
2418  message_item.msg_len = sort_queue_item->msg_len + sizeof (struct mcast);
2419  memcpy (((char *)message_item.mcast) + sizeof (struct mcast),
2420  sort_queue_item->mcast,
2421  sort_queue_item->msg_len);
2422  cs_queue_item_add (&instance->retrans_message_queue, &message_item);
2423  }
2424  log_printf (instance->totemsrp_log_level_debug,
2425  "Originated %d messages in RECOVERY.", messages_originated);
2426  goto originated;
2427 
2428 no_originate:
2429  log_printf (instance->totemsrp_log_level_debug,
2430  "Did not need to originate any messages in recovery.");
2431 
2432 originated:
2433  instance->my_aru = SEQNO_START_MSG;
2434  instance->my_aru_count = 0;
2435  instance->my_seq_unchanged = 0;
2436  instance->my_high_seq_received = SEQNO_START_MSG;
2437  instance->my_install_seq = SEQNO_START_MSG;
2438  instance->last_released = SEQNO_START_MSG;
2439 
2440  reset_token_timeout (instance); // REVIEWED
2441  reset_token_retransmit_timeout (instance); // REVIEWED
2442 
2443  instance->memb_state = MEMB_STATE_RECOVERY;
2444  instance->stats.recovery_entered++;
2445  instance->stats.continuous_gather = 0;
2446 
2447  return;
2448 }
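/*
 * Worked example for the re-origination above (figures are illustrative):
 * with low_ring_aru = 0x14 and old_ring_state_high_seq_received = 0x18 the
 * range is 4, so the messages with seqs 0x15..0x18 are copied from the old
 * ring's regular sort queue, re-wrapped in a new mcast header for the new
 * ring id, and queued on retrans_message_queue. Sequence numbers wrap, which
 * is why the aru/high_delivered comparisons use sq_lt_compare rather than a
 * plain '<'.
 */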
2449 
2450 void totemsrp_event_signal (void *srp_context, enum totem_event_type type, int value)
2451 {
2452  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2453 
2454  token_hold_cancel_send (instance);
2455 
2456  return;
2457 }
2458 
2459 int totemsrp_mcast (
2460  void *srp_context,
2461  struct iovec *iovec,
2462  unsigned int iov_len,
2463  int guarantee)
2464 {
2465  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2466  int i;
2467  struct message_item message_item;
2468  char *addr;
2469  unsigned int addr_idx;
2470  struct cs_queue *queue_use;
2471 
2472  if (instance->waiting_trans_ack) {
2473  queue_use = &instance->new_message_queue_trans;
2474  } else {
2475  queue_use = &instance->new_message_queue;
2476  }
2477 
2478  if (cs_queue_is_full (queue_use)) {
2479  log_printf (instance->totemsrp_log_level_debug, "queue full");
2480  return (-1);
2481  }
2482 
2483  memset (&message_item, 0, sizeof (struct message_item));
2484 
2485  /*
2486  * Allocate pending item
2487  */
2488  message_item.mcast = totemsrp_buffer_alloc (instance);
2489  if (message_item.mcast == 0) {
2490  goto error_mcast;
2491  }
2492 
2493  /*
2494  * Set mcast header
2495  */
2496  memset(message_item.mcast, 0, sizeof (struct mcast));
2497  message_item.mcast->header.magic = TOTEM_MH_MAGIC;
2498  message_item.mcast->header.version = TOTEM_MH_VERSION;
2499  message_item.mcast->header.type = MESSAGE_TYPE_MCAST;
2500  message_item.mcast->header.encapsulated = MESSAGE_NOT_ENCAPSULATED;
2501 
2502  message_item.mcast->header.nodeid = instance->my_id.nodeid;
2503  assert (message_item.mcast->header.nodeid);
2504 
2505  message_item.mcast->guarantee = guarantee;
2506  srp_addr_copy (&message_item.mcast->system_from, &instance->my_id);
2507 
2508  addr = (char *)message_item.mcast;
2509  addr_idx = sizeof (struct mcast);
2510  for (i = 0; i < iov_len; i++) {
2511  memcpy (&addr[addr_idx], iovec[i].iov_base, iovec[i].iov_len);
2512  addr_idx += iovec[i].iov_len;
2513  }
2514 
2515  message_item.msg_len = addr_idx;
2516 
2517  log_printf (instance->totemsrp_log_level_trace, "mcasted message added to pending queue");
2518  instance->stats.mcast_tx++;
2519  cs_queue_item_add (queue_use, &message_item);
2520 
2521  return (0);
2522 
2523 error_mcast:
2524  return (-1);
2525 }
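/*
 * Illustrative sketch (not part of totemsrp.c): totemsrp_mcast() above
 * flattens the caller's iovec array into one contiguous buffer directly
 * behind the mcast header. A standalone version of that gather step; the
 * function name is an assumption for illustration.
 */
static size_t iovec_flatten_sketch (
	char *dest,
	size_t offset,
	const struct iovec *iov,
	unsigned int iov_len)
{
	unsigned int i;

	for (i = 0; i < iov_len; i++) {
		memcpy (&dest[offset], iov[i].iov_base, iov[i].iov_len);
		offset += iov[i].iov_len;
	}
	return (offset);
}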
2526 
2527 /*
2528  * Determine if there is room to queue a new message
2529  */
2530 int totemsrp_avail (void *srp_context)
2531 {
2532  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2533  int avail;
2534  struct cs_queue *queue_use;
2535 
2536  if (instance->waiting_trans_ack) {
2537  queue_use = &instance->new_message_queue_trans;
2538  } else {
2539  queue_use = &instance->new_message_queue;
2540  }
2541  cs_queue_avail (queue_use, &avail);
2542 
2543  return (avail);
2544 }
2545 
2546 /*
2547  * ORF Token Management
2548  */
2549 /*
2550  * Recast message to mcast group if it is available
2551  */
2552 static int orf_token_remcast (
2553  struct totemsrp_instance *instance,
2554  int seq)
2555 {
2556  struct sort_queue_item *sort_queue_item;
2557  int res;
2558  void *ptr;
2559 
2560  struct sq *sort_queue;
2561 
2562  if (instance->memb_state == MEMB_STATE_RECOVERY) {
2563  sort_queue = &instance->recovery_sort_queue;
2564  } else {
2565  sort_queue = &instance->regular_sort_queue;
2566  }
2567 
2568  res = sq_in_range (sort_queue, seq);
2569  if (res == 0) {
2570  log_printf (instance->totemsrp_log_level_debug, "sq not in range");
2571  return (-1);
2572  }
2573 
2574  /*
2575  * Get RTR item at seq, if not available, return
2576  */
2577  res = sq_item_get (sort_queue, seq, &ptr);
2578  if (res != 0) {
2579  return -1;
2580  }
2581 
2582  sort_queue_item = ptr;
2583 
2584  totemnet_mcast_noflush_send (
2585  instance->totemnet_context,
2586  sort_queue_item->mcast,
2587  sort_queue_item->msg_len);
2588 
2589  return (0);
2590 }
2591 
2592 
2593 /*
2594  * Free all freeable messages from ring
2595  */
2596 static void messages_free (
2597  struct totemsrp_instance *instance,
2598  unsigned int token_aru)
2599 {
2600  struct sort_queue_item *regular_message;
2601  unsigned int i;
2602  int res;
2603  int log_release = 0;
2604  unsigned int release_to;
2605  unsigned int range = 0;
2606 
2607  release_to = token_aru;
2608  if (sq_lt_compare (instance->my_last_aru, release_to)) {
2609  release_to = instance->my_last_aru;
2610  }
2611  if (sq_lt_compare (instance->my_high_delivered, release_to)) {
2612  release_to = instance->my_high_delivered;
2613  }
2614 
2615  /*
2616  * Ensure we don't try to release before an already released point
2617  */
2618  if (sq_lt_compare (release_to, instance->last_released)) {
2619  return;
2620  }
2621 
2622  range = release_to - instance->last_released;
2623  assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
2624 
2625  /*
2626  * Release retransmit list items if group aru indicates they are transmitted
2627  */
2628  for (i = 1; i <= range; i++) {
2629  void *ptr;
2630 
2631  res = sq_item_get (&instance->regular_sort_queue,
2632  instance->last_released + i, &ptr);
2633  if (res == 0) {
2634  regular_message = ptr;
2635  totemsrp_buffer_release (instance, regular_message->mcast);
2636  }
2637  sq_items_release (&instance->regular_sort_queue,
2638  instance->last_released + i);
2639 
2640  log_release = 1;
2641  }
2642  instance->last_released += range;
2643 
2644  if (log_release) {
2645  log_printf (instance->totemsrp_log_level_debug,
2646  "releasing messages up to and including %x", release_to);
2647  }
2648 }
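/*
 * Illustrative sketch (not part of totemsrp.c): messages_free() releases up
 * to the oldest of three wrap-aware sequence numbers. A minimal version of
 * that "minimum in sequence space" step, assuming a comparator with the same
 * semantics as sq_lt_compare; the names are assumptions for illustration.
 */
static unsigned int seq_release_point_sketch (
	unsigned int token_aru,
	unsigned int my_last_aru,
	unsigned int my_high_delivered,
	int (*seq_lt) (unsigned int a, unsigned int b))
{
	unsigned int release_to = token_aru;

	if (seq_lt (my_last_aru, release_to)) {
		release_to = my_last_aru;
	}
	if (seq_lt (my_high_delivered, release_to)) {
		release_to = my_high_delivered;
	}
	return (release_to);
}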
2649 
2650 static void update_aru (
2651  struct totemsrp_instance *instance)
2652 {
2653  unsigned int i;
2654  int res;
2655  struct sq *sort_queue;
2656  unsigned int range;
2657  unsigned int my_aru_saved = 0;
2658 
2659  if (instance->memb_state == MEMB_STATE_RECOVERY) {
2660  sort_queue = &instance->recovery_sort_queue;
2661  } else {
2662  sort_queue = &instance->regular_sort_queue;
2663  }
2664 
2665  range = instance->my_high_seq_received - instance->my_aru;
2666 
2667  my_aru_saved = instance->my_aru;
2668  for (i = 1; i <= range; i++) {
2669 
2670  void *ptr;
2671 
2672  res = sq_item_get (sort_queue, my_aru_saved + i, &ptr);
2673  /*
2674  * If hole, stop updating aru
2675  */
2676  if (res != 0) {
2677  break;
2678  }
2679  }
2680  instance->my_aru += i - 1;
2681 }
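/*
 * Illustrative sketch (not part of totemsrp.c): update_aru() advances my_aru
 * across the contiguous run of received messages and stops at the first
 * hole. A standalone version over a plain "received" flag array, ignoring
 * the sequence-number wrap that sq_item_get handles in the real code; the
 * names are assumptions for illustration.
 */
static unsigned int advance_aru_sketch (
	unsigned int aru,
	unsigned int high_seq_received,
	const int *received /* indexed by sequence number */)
{
	unsigned int seq;

	for (seq = aru + 1; seq <= high_seq_received; seq++) {
		if (received[seq] == 0) {
			break;
		}
		aru = seq;
	}
	return (aru);
}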
2682 
2683 /*
2684  * Multicasts pending messages onto the ring (requires orf_token possession)
2685  */
2686 static int orf_token_mcast (
2687  struct totemsrp_instance *instance,
2688  struct orf_token *token,
2689  int fcc_mcasts_allowed)
2690 {
2691  struct message_item *message_item = 0;
2692  struct cs_queue *mcast_queue;
2693  struct sq *sort_queue;
2694  struct sort_queue_item sort_queue_item;
2695  struct mcast *mcast;
2696  unsigned int fcc_mcast_current;
2697 
2698  if (instance->memb_state == MEMB_STATE_RECOVERY) {
2699  mcast_queue = &instance->retrans_message_queue;
2700  sort_queue = &instance->recovery_sort_queue;
2701  reset_token_retransmit_timeout (instance); // REVIEWED
2702  } else {
2703  if (instance->waiting_trans_ack) {
2704  mcast_queue = &instance->new_message_queue_trans;
2705  } else {
2706  mcast_queue = &instance->new_message_queue;
2707  }
2708 
2709  sort_queue = &instance->regular_sort_queue;
2710  }
2711 
2712  for (fcc_mcast_current = 0; fcc_mcast_current < fcc_mcasts_allowed; fcc_mcast_current++) {
2713  if (cs_queue_is_empty (mcast_queue)) {
2714  break;
2715  }
2716  message_item = (struct message_item *)cs_queue_item_get (mcast_queue);
2717 
2718  message_item->mcast->seq = ++token->seq;
2719  message_item->mcast->this_seqno = instance->global_seqno++;
2720 
2721  /*
2722  * Build IO vector
2723  */
2724  memset (&sort_queue_item, 0, sizeof (struct sort_queue_item));
2725  sort_queue_item.mcast = message_item->mcast;
2726  sort_queue_item.msg_len = message_item->msg_len;
2727 
2728  mcast = sort_queue_item.mcast;
2729 
2730  memcpy (&mcast->ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
2731 
2732  /*
2733  * Add message to retransmit queue
2734  */
2735  sq_item_add (sort_queue, &sort_queue_item, message_item->mcast->seq);
2736 
2737  totemnet_mcast_noflush_send (
2738  instance->totemnet_context,
2739  message_item->mcast,
2740  message_item->msg_len);
2741 
2742  /*
2743  * Delete item from pending queue
2744  */
2745  cs_queue_item_remove (mcast_queue);
2746 
2747  /*
2748  * If messages mcasted, deliver any new messages to totempg
2749  */
2750  instance->my_high_seq_received = token->seq;
2751  }
2752 
2753  update_aru (instance);
2754 
2755  /*
2756  * Return 1 if more messages are available for single node clusters
2757  */
2758  return (fcc_mcast_current);
2759 }
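/*
 * Note on the loop above: each message dequeued from the pending queue takes
 * its sequence number from the token (++token->seq), so after the loop the
 * token carries the highest sequence number multicast on the ring and
 * my_high_seq_received can simply be set to token->seq before update_aru()
 * recomputes the all-received-up-to point.
 */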
2760 
2761 /*
2762  * Remulticasts messages in orf_token's retransmit list (requires orf_token)
2763  * Modifies orf_token's rtr to include retransmits required by this process
2764  */
2765 static int orf_token_rtr (
2766  struct totemsrp_instance *instance,
2767  struct orf_token *orf_token,
2768  unsigned int *fcc_allowed)
2769 {
2770  unsigned int res;
2771  unsigned int i, j;
2772  unsigned int found;
2773  struct sq *sort_queue;
2774  struct rtr_item *rtr_list;
2775  unsigned int range = 0;
2776  char retransmit_msg[1024];
2777  char value[64];
2778 
2779  if (instance->memb_state == MEMB_STATE_RECOVERY) {
2780  sort_queue = &instance->recovery_sort_queue;
2781  } else {
2782  sort_queue = &instance->regular_sort_queue;
2783  }
2784 
2785  rtr_list = &orf_token->rtr_list[0];
2786 
2787  strcpy (retransmit_msg, "Retransmit List: ");
2788  if (orf_token->rtr_list_entries) {
2789  log_printf (instance->totemsrp_log_level_debug,
2790  "Retransmit List %d", orf_token->rtr_list_entries);
2791  for (i = 0; i < orf_token->rtr_list_entries; i++) {
2792  sprintf (value, "%x ", rtr_list[i].seq);
2793  strcat (retransmit_msg, value);
2794  }
2795  strcat (retransmit_msg, "");
2796  log_printf (instance->totemsrp_log_level_debug,
2797  "%s", retransmit_msg);
2798  }
2799 
2800  /*
2801  * Retransmit messages on orf_token's RTR list from RTR queue
2802  */
2803  for (instance->fcc_remcast_current = 0, i = 0;
2804  instance->fcc_remcast_current < *fcc_allowed && i < orf_token->rtr_list_entries;) {
2805 
2806  /*
2807  * If this retransmit request isn't from this configuration,
2808  * try next rtr entry
2809  */
2810  if (memcmp (&rtr_list[i].ring_id, &instance->my_ring_id,
2811  sizeof (struct memb_ring_id)) != 0) {
2812 
2813  i += 1;
2814  continue;
2815  }
2816 
2817  res = orf_token_remcast (instance, rtr_list[i].seq);
2818  if (res == 0) {
2819  /*
2820  * Multicasted message, so no need to copy to new retransmit list
2821  */
2822  orf_token->rtr_list_entries -= 1;
2823  assert (orf_token->rtr_list_entries >= 0);
2824  memmove (&rtr_list[i], &rtr_list[i + 1],
2825  sizeof (struct rtr_item) * (orf_token->rtr_list_entries - i));
2826 
2827  instance->stats.mcast_retx++;
2828  instance->fcc_remcast_current++;
2829  } else {
2830  i += 1;
2831  }
2832  }
2833  *fcc_allowed = *fcc_allowed - instance->fcc_remcast_current;
2834 
2835  /*
2836  * Add messages to retransmit to RTR list
2837  * but only retry if there is room in the retransmit list
2838  */
2839 
2840  range = orf_token->seq - instance->my_aru;
2841  assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
2842 
2843  for (i = 1; (orf_token->rtr_list_entries < RETRANSMIT_ENTRIES_MAX) &&
2844  (i <= range); i++) {
2845 
2846  /*
2847  * Ensure message is within the sort queue range
2848  */
2849  res = sq_in_range (sort_queue, instance->my_aru + i);
2850  if (res == 0) {
2851  break;
2852  }
2853 
2854  /*
2855  * Find if a message is missing from this processor
2856  */
2857  res = sq_item_inuse (sort_queue, instance->my_aru + i);
2858  if (res == 0) {
2859  /*
2860  * Determine how many times we have missed receiving
2861  * this sequence number. sq_item_miss_count increments
2862  * a counter for the sequence number. The miss count
2863  * will be returned and compared. This allows time for
2864  * delayed multicast messages to be received before
2865  * declaring the message is missing and requesting a
2866  * retransmit.
2867  */
2868  res = sq_item_miss_count (sort_queue, instance->my_aru + i);
2869  if (res < instance->totem_config->miss_count_const) {
2870  continue;
2871  }
2872 
2873  /*
2874  * Determine if missing message is already in retransmit list
2875  */
2876  found = 0;
2877  for (j = 0; j < orf_token->rtr_list_entries; j++) {
2878  if (instance->my_aru + i == rtr_list[j].seq) {
2879  found = 1;
2880  }
2881  }
2882  if (found == 0) {
2883  /*
2884  * Missing message not found in current retransmit list so add it
2885  */
2886  memcpy (&rtr_list[orf_token->rtr_list_entries].ring_id,
2887  &instance->my_ring_id, sizeof (struct memb_ring_id));
2888  rtr_list[orf_token->rtr_list_entries].seq = instance->my_aru + i;
2889  orf_token->rtr_list_entries++;
2890  }
2891  }
2892  }
2893  return (instance->fcc_remcast_current);
2894 }
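/*
 * Illustrative sketch (not part of totemsrp.c): a hole is only added to the
 * token's retransmit list after it has been seen missing miss_count_const
 * times, which gives re-ordered or delayed multicasts time to arrive. A
 * minimal version of that gating, assuming a per-sequence miss counter; the
 * names are assumptions for illustration.
 */
static int should_request_retransmit_sketch (
	unsigned int *miss_count, /* indexed by sequence number */
	unsigned int seq,
	unsigned int miss_count_const)
{
	miss_count[seq] += 1;
	return (miss_count[seq] >= miss_count_const);
}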
2895 
2896 static void token_retransmit (struct totemsrp_instance *instance)
2897 {
2898  totemnet_token_send (instance->totemnet_context,
2899  instance->orf_token_retransmit,
2900  instance->orf_token_retransmit_size);
2901 }
2902 
2903 /*
2904  * Retransmit the regular token to the next processor if no mcast
2905  * or token has been received within the token retransmit
2906  * timeout period
2907  */
2908 static void timer_function_token_retransmit_timeout (void *data)
2909 {
2910  struct totemsrp_instance *instance = data;
2911 
2912  switch (instance->memb_state) {
2913  case MEMB_STATE_GATHER:
2914  break;
2915  case MEMB_STATE_COMMIT:
2916  case MEMB_STATE_OPERATIONAL:
2917  case MEMB_STATE_RECOVERY:
2918  token_retransmit (instance);
2919  reset_token_retransmit_timeout (instance); // REVIEWED
2920  break;
2921  }
2922 }
2923 
2924 static void timer_function_token_hold_retransmit_timeout (void *data)
2925 {
2926  struct totemsrp_instance *instance = data;
2927 
2928  switch (instance->memb_state) {
2929  case MEMB_STATE_GATHER:
2930  break;
2931  case MEMB_STATE_COMMIT:
2932  break;
2933  case MEMB_STATE_OPERATIONAL:
2934  case MEMB_STATE_RECOVERY:
2935  token_retransmit (instance);
2936  break;
2937  }
2938 }
2939 
2940 static void timer_function_merge_detect_timeout(void *data)
2941 {
2942  struct totemsrp_instance *instance = data;
2943 
2944  instance->my_merge_detect_timeout_outstanding = 0;
2945 
2946  switch (instance->memb_state) {
2947  case MEMB_STATE_OPERATIONAL:
2948  if (instance->my_ring_id.rep == instance->my_id.nodeid) {
2949  memb_merge_detect_transmit (instance);
2950  }
2951  break;
2952  case MEMB_STATE_GATHER:
2953  case MEMB_STATE_COMMIT:
2954  case MEMB_STATE_RECOVERY:
2955  break;
2956  }
2957 }
2958 
2959 /*
2960  * Send orf_token to next member (requires orf_token)
2961  */
2962 static int token_send (
2963  struct totemsrp_instance *instance,
2964  struct orf_token *orf_token,
2965  int forward_token)
2966 {
2967  int res = 0;
2968  unsigned int orf_token_size;
2969 
2970  orf_token_size = sizeof (struct orf_token) +
2971  (orf_token->rtr_list_entries * sizeof (struct rtr_item));
2972 
2973  orf_token->header.nodeid = instance->my_id.nodeid;
2974  memcpy (instance->orf_token_retransmit, orf_token, orf_token_size);
2975  instance->orf_token_retransmit_size = orf_token_size;
2976  assert (orf_token->header.nodeid);
2977 
2978  if (forward_token == 0) {
2979  return (0);
2980  }
2981 
2982  totemnet_token_send (instance->totemnet_context,
2983  orf_token,
2984  orf_token_size);
2985 
2986  return (res);
2987 }
2988 
2989 static int token_hold_cancel_send (struct totemsrp_instance *instance)
2990 {
2991  struct token_hold_cancel token_hold_cancel;
2992 
2993  /*
2994  * Only cancel if the token is currently held
2995  */
2996  if (instance->my_token_held == 0) {
2997  return (0);
2998  }
2999  instance->my_token_held = 0;
3000 
3001  /*
3002  * Build message
3003  */
3004  token_hold_cancel.header.magic = TOTEM_MH_MAGIC;
3005  token_hold_cancel.header.version = TOTEM_MH_VERSION;
3006  token_hold_cancel.header.type = MESSAGE_TYPE_TOKEN_HOLD_CANCEL;
3007  token_hold_cancel.header.encapsulated = 0;
3008  token_hold_cancel.header.nodeid = instance->my_id.nodeid;
3009  memcpy (&token_hold_cancel.ring_id, &instance->my_ring_id,
3010  sizeof (struct memb_ring_id));
3011  assert (token_hold_cancel.header.nodeid);
3012 
3013  instance->stats.token_hold_cancel_tx++;
3014 
3015  totemnet_mcast_flush_send (instance->totemnet_context, &token_hold_cancel,
3016  sizeof (struct token_hold_cancel));
3017 
3018  return (0);
3019 }
3020 
3021 static int orf_token_send_initial (struct totemsrp_instance *instance)
3022 {
3023  struct orf_token orf_token;
3024  int res;
3025 
3026  orf_token.header.magic = TOTEM_MH_MAGIC;
3027  orf_token.header.version = TOTEM_MH_VERSION;
3028  orf_token.header.type = MESSAGE_TYPE_ORF_TOKEN;
3029  orf_token.header.encapsulated = 0;
3030  orf_token.header.nodeid = instance->my_id.nodeid;
3031  assert (orf_token.header.nodeid);
3032  orf_token.seq = SEQNO_START_MSG;
3033  orf_token.token_seq = SEQNO_START_TOKEN;
3034  orf_token.retrans_flg = 1;
3035  instance->my_set_retrans_flg = 1;
3036  instance->stats.orf_token_tx++;
3037 
3038  if (cs_queue_is_empty (&instance->retrans_message_queue) == 1) {
3039  orf_token.retrans_flg = 0;
3040  instance->my_set_retrans_flg = 0;
3041  } else {
3042  orf_token.retrans_flg = 1;
3043  instance->my_set_retrans_flg = 1;
3044  }
3045 
3046  orf_token.aru = 0;
3047  orf_token.aru = SEQNO_START_MSG - 1;
3048  orf_token.aru_addr = instance->my_id.nodeid;
3049 
3050  memcpy (&orf_token.ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
3051  orf_token.fcc = 0;
3052  orf_token.backlog = 0;
3053 
3054  orf_token.rtr_list_entries = 0;
3055 
3056  res = token_send (instance, &orf_token, 1);
3057 
3058  return (res);
3059 }
3060 
3061 static void memb_state_commit_token_update (
3062  struct totemsrp_instance *instance)
3063 {
3064  struct srp_addr *addr;
3065  struct memb_commit_token_memb_entry *memb_list;
3066  unsigned int high_aru;
3067  unsigned int i;
3068 
3069  addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3070  memb_list = (struct memb_commit_token_memb_entry *)(addr + instance->commit_token->addr_entries);
3071 
3072  memcpy (instance->my_new_memb_list, addr,
3073  sizeof (struct srp_addr) * instance->commit_token->addr_entries);
3074 
3075  instance->my_new_memb_entries = instance->commit_token->addr_entries;
3076 
3077  memcpy (&memb_list[instance->commit_token->memb_index].ring_id,
3078  &instance->my_old_ring_id, sizeof (struct memb_ring_id));
3079 
3080  memb_list[instance->commit_token->memb_index].aru = instance->old_ring_state_aru;
3081  /*
3082  * TODO high delivered is really instance->my_aru, but with safe this
3083  * could change?
3084  */
3085  instance->my_received_flg =
3086  (instance->my_aru == instance->my_high_seq_received);
3087 
3088  memb_list[instance->commit_token->memb_index].received_flg = instance->my_received_flg;
3089 
3090  memb_list[instance->commit_token->memb_index].high_delivered = instance->my_high_delivered;
3091  /*
3092  * find high aru up to current memb_index for all matching ring ids
3093  * if any ring id matching memb_index has an aru less than the high aru, set the
3094  * received flag for that entry to false
3095  */
3096  high_aru = memb_list[instance->commit_token->memb_index].aru;
3097  for (i = 0; i <= instance->commit_token->memb_index; i++) {
3098  if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
3099  &memb_list[i].ring_id,
3100  sizeof (struct memb_ring_id)) == 0) {
3101 
3102  if (sq_lt_compare (high_aru, memb_list[i].aru)) {
3103  high_aru = memb_list[i].aru;
3104  }
3105  }
3106  }
3107 
3108  for (i = 0; i <= instance->commit_token->memb_index; i++) {
3109  if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
3110  &memb_list[i].ring_id,
3111  sizeof (struct memb_ring_id)) == 0) {
3112 
3113  if (sq_lt_compare (memb_list[i].aru, high_aru)) {
3114  memb_list[i].received_flg = 0;
3115  if (i == instance->commit_token->memb_index) {
3116  instance->my_received_flg = 0;
3117  }
3118  }
3119  }
3120  }
3121 
3122  instance->commit_token->header.nodeid = instance->my_id.nodeid;
3123  instance->commit_token->memb_index += 1;
3124  assert (instance->commit_token->memb_index <= instance->commit_token->addr_entries);
3125  assert (instance->commit_token->header.nodeid);
3126 }
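/*
 * Worked example for the two passes above (figures are illustrative): three
 * commit token entries sharing the same old ring id with arus 0x10, 0x12 and
 * 0x12 give high_aru = 0x12, so only the first entry (aru 0x10) has its
 * received_flg cleared; if that entry is this node's own memb_index, the
 * local my_received_flg is cleared as well.
 */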
3127 
3128 static void memb_state_commit_token_target_set (
3129  struct totemsrp_instance *instance)
3130 {
3131  struct srp_addr *addr;
3132 
3133  addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3134 
3135  /* Totemnet just looks at the node id */
3136  totemnet_token_target_set (
3137  instance->totemnet_context,
3138  addr[instance->commit_token->memb_index %
3139  instance->commit_token->addr_entries].nodeid);
3140 }
3141 
3142 static int memb_state_commit_token_send_recovery (
3143  struct totemsrp_instance *instance,
3144  struct memb_commit_token *commit_token)
3145 {
3146  unsigned int commit_token_size;
3147 
3148  commit_token->token_seq++;
3149  commit_token->header.nodeid = instance->my_id.nodeid;
3150  commit_token_size = sizeof (struct memb_commit_token) +
3151  ((sizeof (struct srp_addr) +
3152  sizeof (struct memb_commit_token_memb_entry)) * commit_token->addr_entries);
3153  /*
3154  * Make a copy for retransmission if necessary
3155  */
3156  memcpy (instance->orf_token_retransmit, commit_token, commit_token_size);
3157  instance->orf_token_retransmit_size = commit_token_size;
3158 
3159  instance->stats.memb_commit_token_tx++;
3160 
3161  totemnet_token_send (instance->totemnet_context,
3162  commit_token,
3163  commit_token_size);
3164 
3165  /*
3166  * Request retransmission of the commit token in case it is lost
3167  */
3168  reset_token_retransmit_timeout (instance);
3169  return (0);
3170 }
3171 
3172 static int memb_state_commit_token_send (
3173  struct totemsrp_instance *instance)
3174 {
3175  unsigned int commit_token_size;
3176 
3177  instance->commit_token->token_seq++;
3178  instance->commit_token->header.nodeid = instance->my_id.nodeid;
3179  commit_token_size = sizeof (struct memb_commit_token) +
3180  ((sizeof (struct srp_addr) +
3181  sizeof (struct memb_commit_token_memb_entry)) * instance->commit_token->addr_entries);
3182  /*
3183  * Make a copy for retransmission if necessary
3184  */
3185  memcpy (instance->orf_token_retransmit, instance->commit_token, commit_token_size);
3186  instance->orf_token_retransmit_size = commit_token_size;
3187 
3188  instance->stats.memb_commit_token_tx++;
3189 
3190  totemnet_token_send (instance->totemnet_context,
3191  instance->commit_token,
3192  commit_token_size);
3193 
3194  /*
3195  * Request retransmission of the commit token in case it is lost
3196  */
3197  reset_token_retransmit_timeout (instance);
3198  return (0);
3199 }
3200 
3201 
3202 static int memb_lowest_in_config (struct totemsrp_instance *instance)
3203 {
3204  struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
3205  int token_memb_entries = 0;
3206  int i;
3207  unsigned int lowest_nodeid;
3208 
3209  memb_set_subtract (token_memb, &token_memb_entries,
3210  instance->my_proc_list, instance->my_proc_list_entries,
3211  instance->my_failed_list, instance->my_failed_list_entries);
3212 
3213  /*
3214  * find representative by searching for smallest identifier
3215  */
3216  assert(token_memb_entries > 0);
3217 
3218  lowest_nodeid = token_memb[0].nodeid;
3219  for (i = 1; i < token_memb_entries; i++) {
3220  if (lowest_nodeid > token_memb[i].nodeid) {
3221  lowest_nodeid = token_memb[i].nodeid;
3222  }
3223  }
3224  return (lowest_nodeid == instance->my_id.nodeid);
3225 }
3226 
3227 static int srp_addr_compare (const void *a, const void *b)
3228 {
3229  const struct srp_addr *srp_a = (const struct srp_addr *)a;
3230  const struct srp_addr *srp_b = (const struct srp_addr *)b;
3231 
3232  if (srp_a->nodeid < srp_b->nodeid) {
3233  return -1;
3234  } else if (srp_a->nodeid > srp_b->nodeid) {
3235  return 1;
3236  } else {
3237  return 0;
3238  }
3239 }
3240 
3241 static void memb_state_commit_token_create (
3242  struct totemsrp_instance *instance)
3243 {
3244  struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
3245  struct srp_addr *addr;
3246  struct memb_commit_token_memb_entry *memb_list;
3247  int token_memb_entries = 0;
3248 
3249  log_printf (instance->totemsrp_log_level_debug,
3250  "Creating commit token because I am the rep.");
3251 
3252  memb_set_subtract (token_memb, &token_memb_entries,
3253  instance->my_proc_list, instance->my_proc_list_entries,
3254  instance->my_failed_list, instance->my_failed_list_entries);
3255 
3256  memset (instance->commit_token, 0, sizeof (struct memb_commit_token));
3257  instance->commit_token->header.magic = TOTEM_MH_MAGIC;
3258  instance->commit_token->header.version = TOTEM_MH_VERSION;
3259  instance->commit_token->header.type = MESSAGE_TYPE_MEMB_COMMIT_TOKEN;
3260  instance->commit_token->header.encapsulated = 0;
3261  instance->commit_token->header.nodeid = instance->my_id.nodeid;
3262  assert (instance->commit_token->header.nodeid);
3263 
3264  instance->commit_token->ring_id.rep = instance->my_id.nodeid;
3265  instance->commit_token->ring_id.seq = instance->token_ring_id_seq + 4;
3266 
3267  /*
3268  * This qsort is necessary to ensure the commit token traverses
3269  * the ring in the proper order
3270  */
3271  qsort (token_memb, token_memb_entries, sizeof (struct srp_addr),
3272  srp_addr_compare);
3273 
3274  instance->commit_token->memb_index = 0;
3275  instance->commit_token->addr_entries = token_memb_entries;
3276 
3277  addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3278  memb_list = (struct memb_commit_token_memb_entry *)(addr + instance->commit_token->addr_entries);
3279 
3280  memcpy (addr, token_memb,
3281  token_memb_entries * sizeof (struct srp_addr));
3282  memset (memb_list, 0,
3283  sizeof (struct memb_commit_token_memb_entry) * token_memb_entries);
3284 }
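/*
 * Illustrative sketch (not part of totemsrp.c): the commit token is laid out
 * as the fixed header, then addr_entries 'struct srp_addr' records, then
 * addr_entries 'struct memb_commit_token_memb_entry' records, which is why
 * memb_list is computed above as (addr + addr_entries). The matching size
 * calculation (as also used by memb_state_commit_token_send below); the
 * helper name is an assumption for illustration.
 */
static size_t commit_token_size_sketch (unsigned int addr_entries)
{
	return (sizeof (struct memb_commit_token) +
		((sizeof (struct srp_addr) +
		sizeof (struct memb_commit_token_memb_entry)) * addr_entries));
}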
3285 
3286 static void memb_join_message_send (struct totemsrp_instance *instance)
3287 {
3288  char memb_join_data[40000];
3289  struct memb_join *memb_join = (struct memb_join *)memb_join_data;
3290  char *addr;
3291  unsigned int addr_idx;
3292  size_t msg_len;
3293 
3294  memb_join->header.magic = TOTEM_MH_MAGIC;
3295  memb_join->header.version = TOTEM_MH_VERSION;
3296  memb_join->header.type = MESSAGE_TYPE_MEMB_JOIN;
3297  memb_join->header.encapsulated = 0;
3298  memb_join->header.nodeid = instance->my_id.nodeid;
3299  assert (memb_join->header.nodeid);
3300 
3301  msg_len = sizeof(struct memb_join) +
3302  ((instance->my_proc_list_entries + instance->my_failed_list_entries) * sizeof(struct srp_addr));
3303 
3304  if (msg_len > sizeof(memb_join_data)) {
3305  log_printf (instance->totemsrp_log_level_error,
3306  "memb_join_message too long. Ignoring message.");
3307 
3308  return ;
3309  }
3310 
3311  memb_join->ring_seq = instance->my_ring_id.seq;
3312  memb_join->proc_list_entries = instance->my_proc_list_entries;
3313  memb_join->failed_list_entries = instance->my_failed_list_entries;
3314  srp_addr_copy (&memb_join->system_from, &instance->my_id);
3315 
3316  /*
3317  * This mess adds the joined and failed processor lists into the join
3318  * message
3319  */
3320  addr = (char *)memb_join;
3321  addr_idx = sizeof (struct memb_join);
3322  memcpy (&addr[addr_idx],
3323  instance->my_proc_list,
3324  instance->my_proc_list_entries *
3325  sizeof (struct srp_addr));
3326  addr_idx +=
3327  instance->my_proc_list_entries *
3328  sizeof (struct srp_addr);
3329  memcpy (&addr[addr_idx],
3330  instance->my_failed_list,
3331  instance->my_failed_list_entries *
3332  sizeof (struct srp_addr));
3333  addr_idx +=
3334  instance->my_failed_list_entries *
3335  sizeof (struct srp_addr);
3336 
3337  if (instance->totem_config->send_join_timeout) {
3338  usleep (random() % (instance->totem_config->send_join_timeout * 1000));
3339  }
3340 
3341  instance->stats.memb_join_tx++;
3342 
3343  totemnet_mcast_flush_send (
3344  instance->totemnet_context,
3345  memb_join,
3346  addr_idx);
3347 }
3348 
3349 static void memb_leave_message_send (struct totemsrp_instance *instance)
3350 {
3351  char memb_join_data[40000];
3352  struct memb_join *memb_join = (struct memb_join *)memb_join_data;
3353  char *addr;
3354  unsigned int addr_idx;
3355  int active_memb_entries;
3356  struct srp_addr active_memb[PROCESSOR_COUNT_MAX];
3357  size_t msg_len;
3358 
3359  log_printf (instance->totemsrp_log_level_debug,
3360  "sending join/leave message");
3361 
3362  /*
3363  * add us to the failed list, and remove us from
3364  * the members list
3365  */
3366  memb_set_merge(
3367  &instance->my_id, 1,
3368  instance->my_failed_list, &instance->my_failed_list_entries);
3369 
3370  memb_set_subtract (active_memb, &active_memb_entries,
3371  instance->my_proc_list, instance->my_proc_list_entries,
3372  &instance->my_id, 1);
3373 
3374  msg_len = sizeof(struct memb_join) +
3375  ((active_memb_entries + instance->my_failed_list_entries) * sizeof(struct srp_addr));
3376 
3377  if (msg_len > sizeof(memb_join_data)) {
3378  log_printf (instance->totemsrp_log_level_error,
3379  "memb_leave message too long. Ignoring message.");
3380 
3381  return ;
3382  }
3383 
3384  memb_join->header.magic = TOTEM_MH_MAGIC;
3385  memb_join->header.version = TOTEM_MH_VERSION;
3386  memb_join->header.type = MESSAGE_TYPE_MEMB_JOIN;
3387  memb_join->header.encapsulated = 0;
3388  memb_join->header.nodeid = LEAVE_DUMMY_NODEID;
3389 
3390  memb_join->ring_seq = instance->my_ring_id.seq;
3391  memb_join->proc_list_entries = active_memb_entries;
3392  memb_join->failed_list_entries = instance->my_failed_list_entries;
3393  srp_addr_copy (&memb_join->system_from, &instance->my_id);
3394 
3395  // TODO: CC Maybe use the actual join send routine.
3396  /*
3397  * This mess adds the joined and failed processor lists into the join
3398  * message
3399  */
3400  addr = (char *)memb_join;
3401  addr_idx = sizeof (struct memb_join);
3402  memcpy (&addr[addr_idx],
3403  active_memb,
3404  active_memb_entries *
3405  sizeof (struct srp_addr));
3406  addr_idx +=
3407  active_memb_entries *
3408  sizeof (struct srp_addr);
3409  memcpy (&addr[addr_idx],
3410  instance->my_failed_list,
3411  instance->my_failed_list_entries *
3412  sizeof (struct srp_addr));
3413  addr_idx +=
3414  instance->my_failed_list_entries *
3415  sizeof (struct srp_addr);
3416 
3417 
3418  if (instance->totem_config->send_join_timeout) {
3419  usleep (random() % (instance->totem_config->send_join_timeout * 1000));
3420  }
3421  instance->stats.memb_join_tx++;
3422 
3423  totemnet_mcast_flush_send (
3424  instance->totemnet_context,
3425  memb_join,
3426  addr_idx);
3427 }
3428 
3429 static void memb_merge_detect_transmit (struct totemsrp_instance *instance)
3430 {
3431  struct memb_merge_detect memb_merge_detect;
3432 
3433  memb_merge_detect.header.magic = TOTEM_MH_MAGIC;
3434  memb_merge_detect.header.version = TOTEM_MH_VERSION;
3435  memb_merge_detect.header.type = MESSAGE_TYPE_MEMB_MERGE_DETECT;
3436  memb_merge_detect.header.encapsulated = 0;
3437  memb_merge_detect.header.nodeid = instance->my_id.nodeid;
3438  srp_addr_copy (&memb_merge_detect.system_from, &instance->my_id);
3439  memcpy (&memb_merge_detect.ring_id, &instance->my_ring_id,
3440  sizeof (struct memb_ring_id));
3441  assert (memb_merge_detect.header.nodeid);
3442 
3443  instance->stats.memb_merge_detect_tx++;
3444  totemnet_mcast_flush_send (instance->totemnet_context,
3445  &memb_merge_detect,
3446  sizeof (struct memb_merge_detect));
3447 }
3448 
3449 static void memb_ring_id_set (
3450  struct totemsrp_instance *instance,
3451  const struct memb_ring_id *ring_id)
3452 {
3453 
3454  memcpy (&instance->my_ring_id, ring_id, sizeof (struct memb_ring_id));
3455 }
3456 
3457 int totemsrp_callback_token_create (
3458  void *srp_context,
3459  void **handle_out,
3460  enum totem_callback_token_type type,
3461  int delete,
3462  int (*callback_fn) (enum totem_callback_token_type type, const void *),
3463  const void *data)
3464 {
3465  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
3466  struct token_callback_instance *callback_handle;
3467 
3468  token_hold_cancel_send (instance);
3469 
3470  callback_handle = malloc (sizeof (struct token_callback_instance));
3471  if (callback_handle == 0) {
3472  return (-1);
3473  }
3474  *handle_out = (void *)callback_handle;
3475  qb_list_init (&callback_handle->list);
3476  callback_handle->callback_fn = callback_fn;
3477  callback_handle->data = (void *) data;
3478  callback_handle->callback_type = type;
3479  callback_handle->delete = delete;
3480  switch (type) {
3481  case TOTEM_CALLBACK_TOKEN_RECEIVED:
3482  qb_list_add (&callback_handle->list, &instance->token_callback_received_listhead);
3483  break;
3484  case TOTEM_CALLBACK_TOKEN_SENT:
3485  qb_list_add (&callback_handle->list, &instance->token_callback_sent_listhead);
3486  break;
3487  }
3488 
3489  return (0);
3490 }
3491 
3492 void totemsrp_callback_token_destroy (void *srp_context, void **handle_out)
3493 {
3494  struct token_callback_instance *h;
3495 
3496  if (*handle_out) {
3497  h = (struct token_callback_instance *)*handle_out;
3498  qb_list_del (&h->list);
3499  free (h);
3500  h = NULL;
3501  *handle_out = 0;
3502  }
3503 }
3504 
3505 static void token_callbacks_execute (
3506  struct totemsrp_instance *instance,
3507  enum totem_callback_token_type type)
3508 {
3509  struct qb_list_head *list, *tmp_iter;
3510  struct qb_list_head *callback_listhead = 0;
3511  struct token_callback_instance *token_callback_instance;
3512  int res;
3513  int del;
3514 
3515  switch (type) {
3516  case TOTEM_CALLBACK_TOKEN_RECEIVED:
3517  callback_listhead = &instance->token_callback_received_listhead;
3518  break;
3519  case TOTEM_CALLBACK_TOKEN_SENT:
3520  callback_listhead = &instance->token_callback_sent_listhead;
3521  break;
3522  default:
3523  assert (0);
3524  }
3525 
3526  qb_list_for_each_safe(list, tmp_iter, callback_listhead) {
3527  token_callback_instance = qb_list_entry (list, struct token_callback_instance, list);
3528  del = token_callback_instance->delete;
3529  if (del == 1) {
3530  qb_list_del (list);
3531  }
3532 
3533  res = token_callback_instance->callback_fn (
3534  token_callback_instance->callback_type,
3535  token_callback_instance->data);
3536  /*
3537  * This callback failed to execute, try it again on the next token
3538  */
3539  if (res == -1 && del == 1) {
3540  qb_list_add (list, callback_listhead);
3541  } else if (del) {
3542  free (token_callback_instance);
3543  }
3544  }
3545 }
3546 
3547 /*
3548  * Flow control functions
3549  */
3550 static unsigned int backlog_get (struct totemsrp_instance *instance)
3551 {
3552  unsigned int backlog = 0;
3553  struct cs_queue *queue_use = NULL;
3554 
3555  if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
3556  if (instance->waiting_trans_ack) {
3557  queue_use = &instance->new_message_queue_trans;
3558  } else {
3559  queue_use = &instance->new_message_queue;
3560  }
3561  } else
3562  if (instance->memb_state == MEMB_STATE_RECOVERY) {
3563  queue_use = &instance->retrans_message_queue;
3564  }
3565 
3566  if (queue_use != NULL) {
3567  backlog = cs_queue_used (queue_use);
3568  }
3569 
3570  instance->stats.token[instance->stats.latest_token].backlog_calc = backlog;
3571  return (backlog);
3572 }
3573 
3574 static int fcc_calculate (
3575  struct totemsrp_instance *instance,
3576  struct orf_token *token)
3577 {
3578  unsigned int transmits_allowed;
3579  unsigned int backlog_calc;
3580 
3581  transmits_allowed = instance->totem_config->max_messages;
3582 
3583  if (transmits_allowed > instance->totem_config->window_size - token->fcc) {
3584  transmits_allowed = instance->totem_config->window_size - token->fcc;
3585  }
3586 
3587  instance->my_cbl = backlog_get (instance);
3588 
3589  /*
3590  * Only do the backlog calculation if there is a backlog, otherwise
3591  * we would divide by zero
3592  */
3593  if (token->backlog + instance->my_cbl - instance->my_pbl) {
3594  backlog_calc = (instance->totem_config->window_size * instance->my_pbl) /
3595  (token->backlog + instance->my_cbl - instance->my_pbl);
3596  if (backlog_calc > 0 && transmits_allowed > backlog_calc) {
3597  transmits_allowed = backlog_calc;
3598  }
3599  }
3600 
3601  return (transmits_allowed);
3602 }
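/*
 * Worked example for fcc_calculate() above (figures are illustrative): with
 * window_size = 50, token->fcc = 30 and max_messages = 17, the window leaves
 * room for 20 messages, so transmits_allowed starts at min(17, 20) = 17. If
 * this node's previous backlog my_pbl is 10 and the ring-wide backlog
 * (token->backlog + my_cbl - my_pbl) is 40, the proportional share is
 * (50 * 10) / 40 = 12, which further limits transmits_allowed to 12.
 */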
3603 
3604 /*
3605  * don't overflow the RTR sort queue
3606  */
3607 static void fcc_rtr_limit (
3608  struct totemsrp_instance *instance,
3609  struct orf_token *token,
3610  unsigned int *transmits_allowed)
3611 {
3612  int check = QUEUE_RTR_ITEMS_SIZE_MAX;
3613  check -= (*transmits_allowed + instance->totem_config->window_size);
3614  assert (check >= 0);
3615  if (sq_lt_compare (instance->last_released +
3616  QUEUE_RTR_ITEMS_SIZE_MAX - *transmits_allowed -
3617  instance->totem_config->window_size,
3618 
3619  token->seq)) {
3620 
3621  *transmits_allowed = 0;
3622  }
3623 }
3624 
3625 static void fcc_token_update (
3626  struct totemsrp_instance *instance,
3627  struct orf_token *token,
3628  unsigned int msgs_transmitted)
3629 {
3630  token->fcc += msgs_transmitted - instance->my_trc;
3631  token->backlog += instance->my_cbl - instance->my_pbl;
3632  instance->my_trc = msgs_transmitted;
3633  instance->my_pbl = instance->my_cbl;
3634 }
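/*
 * Note on fcc_token_update() above: each node replaces its previous
 * contribution to the token's rotating totals with the current one, e.g. if
 * it reported 3 transmissions last rotation (my_trc) and sent 5 this time,
 * token->fcc grows by 2; token->backlog is adjusted the same way using the
 * previous (my_pbl) and current (my_cbl) backlog samples.
 */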
3635 
3636 /*
3637  * Sanity checkers
3638  */
3639 static int check_orf_token_sanity(
3640  const struct totemsrp_instance *instance,
3641  const void *msg,
3642  size_t msg_len,
3643  int endian_conversion_needed)
3644 {
3645  int rtr_entries;
3646  const struct orf_token *token = (const struct orf_token *)msg;
3647  size_t required_len;
3648 
3649  if (msg_len < sizeof(struct orf_token)) {
3650  log_printf (instance->totemsrp_log_level_security,
3651  "Received orf_token message is too short... ignoring.");
3652 
3653  return (-1);
3654  }
3655 
3656  if (endian_conversion_needed) {
3657  rtr_entries = swab32(token->rtr_list_entries);
3658  } else {
3659  rtr_entries = token->rtr_list_entries;
3660  }
3661 
3662  required_len = sizeof(struct orf_token) + rtr_entries * sizeof(struct rtr_item);
3663  if (msg_len < required_len) {
3664  log_printf (instance->totemsrp_log_level_security,
3665  "Received orf_token message is too short... ignoring.");
3666 
3667  return (-1);
3668  }
3669 
3670  return (0);
3671 }
3672 
3673 static int check_mcast_sanity(
3674  struct totemsrp_instance *instance,
3675  const void *msg,
3676  size_t msg_len,
3677  int endian_conversion_needed)
3678 {
3679 
3680  if (msg_len < sizeof(struct mcast)) {
3681  log_printf (instance->totemsrp_log_level_security,
3682  "Received mcast message is too short... ignoring.");
3683 
3684  return (-1);
3685  }
3686 
3687  return (0);
3688 }
3689 
3690 static int check_memb_merge_detect_sanity(
3691  struct totemsrp_instance *instance,
3692  const void *msg,
3693  size_t msg_len,
3694  int endian_conversion_needed)
3695 {
3696 
3697  if (msg_len < sizeof(struct memb_merge_detect)) {
3698  log_printf (instance->totemsrp_log_level_security,
3699  "Received memb_merge_detect message is too short... ignoring.");
3700 
3701  return (-1);
3702  }
3703 
3704  return (0);
3705 }
3706 
3707 static int check_memb_join_sanity(
3708  struct totemsrp_instance *instance,
3709  const void *msg,
3710  size_t msg_len,
3711  int endian_conversion_needed)
3712 {
3713  const struct memb_join *mj_msg = (const struct memb_join *)msg;
3714  unsigned int proc_list_entries;
3715  unsigned int failed_list_entries;
3716  size_t required_len;
3717 
3718  if (msg_len < sizeof(struct memb_join)) {
3719  log_printf (instance->totemsrp_log_level_security,
3720  "Received memb_join message is too short... ignoring.");
3721 
3722  return (-1);
3723  }
3724 
3725  proc_list_entries = mj_msg->proc_list_entries;
3726  failed_list_entries = mj_msg->failed_list_entries;
3727 
3728  if (endian_conversion_needed) {
3729  proc_list_entries = swab32(proc_list_entries);
3730  failed_list_entries = swab32(failed_list_entries);
3731  }
3732 
3733  required_len = sizeof(struct memb_join) + ((proc_list_entries + failed_list_entries) * sizeof(struct srp_addr));
3734  if (msg_len < required_len) {
3735  log_printf (instance->totemsrp_log_level_security,
3736  "Received memb_join message is too short... ignoring.");
3737 
3738  return (-1);
3739  }
3740 
3741  return (0);
3742 }
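/*
 * Illustrative sketch (not part of totemsrp.c): the variable-length sanity
 * checkers in this file all follow the same pattern -- byte-swap the on-wire
 * entry counts when endian conversion is needed, compute the minimum length
 * as fixed header plus entries times per-entry size, and reject anything
 * shorter. A generic version of that check; the names are assumptions for
 * illustration.
 */
static int variable_len_check_sketch (
	size_t msg_len,
	size_t header_len,
	size_t entry_len,
	unsigned int entries)
{
	size_t required_len = header_len + ((size_t)entries * entry_len);

	if (msg_len < required_len) {
		return (-1);
	}
	return (0);
}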
3743 
3744 static int check_memb_commit_token_sanity(
3745  struct totemsrp_instance *instance,
3746  const void *msg,
3747  size_t msg_len,
3748  int endian_conversion_needed)
3749 {
3750  const struct memb_commit_token *mct_msg = (const struct memb_commit_token *)msg;
3751  unsigned int addr_entries;
3752  size_t required_len;
3753 
3754  if (msg_len < sizeof(struct memb_commit_token)) {
3755  log_printf (instance->totemsrp_log_level_security,
3756  "Received memb_commit_token message is too short... ignoring.");
3757 
3758  return (0);
3759  }
3760 
3761  addr_entries= mct_msg->addr_entries;
3762  if (endian_conversion_needed) {
3763  addr_entries = swab32(addr_entries);
3764  }
3765 
3766  required_len = sizeof(struct memb_commit_token) +
3767  (addr_entries * (sizeof(struct srp_addr) + sizeof(struct memb_commit_token_memb_entry)));
3768  if (msg_len < required_len) {
3769  log_printf (instance->totemsrp_log_level_security,
3770  "Received memb_commit_token message is too short... ignoring.");
3771 
3772  return (-1);
3773  }
3774 
3775  return (0);
3776 }
3777 
3778 static int check_token_hold_cancel_sanity(
3779  struct totemsrp_instance *instance,
3780  const void *msg,
3781  size_t msg_len,
3782  int endian_conversion_needed)
3783 {
3784 
3785  if (msg_len < sizeof(struct token_hold_cancel)) {
3786  log_printf (instance->totemsrp_log_level_security,
3787  "Received token_hold_cancel message is too short... ignoring.");
3788 
3789  return (-1);
3790  }
3791 
3792  return (0);
3793 }
3794 
3795 /*
3796  * Message Handlers
3797  */
3798 
3799 unsigned long long int tv_old;
3800 /*
3801  * message handler called when TOKEN message type received
3802  */
3803 static int message_handler_orf_token (
3804  struct totemsrp_instance *instance,
3805  const void *msg,
3806  size_t msg_len,
3807  int endian_conversion_needed)
3808 {
3809  char token_storage[1500];
3810  char token_convert[1500];
3811  struct orf_token *token = NULL;
3812  int forward_token;
3813  unsigned int transmits_allowed;
3814  unsigned int mcasted_retransmit;
3815  unsigned int mcasted_regular;
3816  unsigned int last_aru;
3817 
3818 #ifdef GIVEINFO
3819  unsigned long long tv_current;
3820  unsigned long long tv_diff;
3821 
3822  tv_current = qb_util_nano_current_get ();
3823  tv_diff = tv_current - tv_old;
3824  tv_old = tv_current;
3825 
3826  log_printf (instance->totemsrp_log_level_debug,
3827  "Time since last token %0.4f ms", ((float)tv_diff) / 1000000.0);
3828 #endif
3829 
3830  if (check_orf_token_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
3831  return (0);
3832  }
3833 
3834  if (instance->orf_token_discard) {
3835  return (0);
3836  }
3837 #ifdef TEST_DROP_ORF_TOKEN_PERCENTAGE
3838  if (random()%100 < TEST_DROP_ORF_TOKEN_PERCENTAGE) {
3839  return (0);
3840  }
3841 #endif
3842 
3843  if (endian_conversion_needed) {
3844  orf_token_endian_convert ((struct orf_token *)msg,
3845  (struct orf_token *)token_convert);
3846  msg = (struct orf_token *)token_convert;
3847  }
3848 
3849  /*
3850  * Make copy of token and retransmit list in case we have
3851  * to flush incoming messages from the kernel queue
3852  */
3853  token = (struct orf_token *)token_storage;
3854  memcpy (token, msg, sizeof (struct orf_token));
3855  memcpy (&token->rtr_list[0], (char *)msg + sizeof (struct orf_token),
3856  sizeof (struct rtr_item) * RETRANSMIT_ENTRIES_MAX);
3857 
3858 
3859  /*
3860  * Handle merge detection timeout
3861  */
3862  if (token->seq == instance->my_last_seq) {
3863  start_merge_detect_timeout (instance);
3864  instance->my_seq_unchanged += 1;
3865  } else {
3866  cancel_merge_detect_timeout (instance);
3867  cancel_token_hold_retransmit_timeout (instance);
3868  instance->my_seq_unchanged = 0;
3869  }
3870 
3871  instance->my_last_seq = token->seq;
3872 
3873 #ifdef TEST_RECOVERY_MSG_COUNT
3874  if (instance->memb_state == MEMB_STATE_OPERATIONAL && token->seq > TEST_RECOVERY_MSG_COUNT) {
3875  return (0);
3876  }
3877 #endif
3878  instance->flushing = 1;
 3879  totemnet_recv_flush (instance->totemnet_context);
 3880  instance->flushing = 0;
3881 
3882  /*
3883  * Determine if we should hold (in reality drop) the token
3884  */
3885  instance->my_token_held = 0;
3886  if (instance->my_ring_id.rep == instance->my_id.nodeid &&
3887  instance->my_seq_unchanged > instance->totem_config->seqno_unchanged_const) {
3888  instance->my_token_held = 1;
3889  } else {
3890  if (instance->my_ring_id.rep != instance->my_id.nodeid &&
3891  instance->my_seq_unchanged >= instance->totem_config->seqno_unchanged_const) {
3892  instance->my_token_held = 1;
3893  }
3894  }
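/*
 * Note: the ring representative only holds after strictly more than
 * seqno_unchanged_const idle rotations, while every other processor holds
 * once the threshold is reached; only the representative actually stops
 * forwarding the token (see forward_token below).
 */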
3895 
3896  /*
3897  * Hold onto token when there is no activity on ring and
3898  * this processor is the ring rep
3899  */
3900  forward_token = 1;
3901  if (instance->my_ring_id.rep == instance->my_id.nodeid) {
3902  if (instance->my_token_held) {
3903  forward_token = 0;
3904  }
3905  }
3906 
3907  token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_RECEIVED);
3908 
3909  switch (instance->memb_state) {
3910  case MEMB_STATE_COMMIT:
3911  /* Discard token */
3912  break;
3913 
 3914  case MEMB_STATE_OPERATIONAL:
 3915  messages_free (instance, token->aru);
3916  /*
3917  * Do NOT add break, this case should also execute code in gather case.
3918  */
3919 
3920  case MEMB_STATE_GATHER:
3921  /*
3922  * DO NOT add break, we use different free mechanism in recovery state
3923  */
3924 
3925  case MEMB_STATE_RECOVERY:
3926  /*
3927  * Discard tokens from another configuration
3928  */
3929  if (memcmp (&token->ring_id, &instance->my_ring_id,
3930  sizeof (struct memb_ring_id)) != 0) {
3931 
3932  if ((forward_token)
3933  && instance->use_heartbeat) {
3934  reset_heartbeat_timeout(instance);
3935  }
3936  else {
3937  cancel_heartbeat_timeout(instance);
3938  }
3939 
3940  return (0); /* discard token */
3941  }
3942 
3943  /*
3944  * Discard retransmitted tokens
3945  */
3946  if (sq_lte_compare (token->token_seq, instance->my_token_seq)) {
3947  return (0); /* discard token */
3948  }
3949  last_aru = instance->my_last_aru;
3950  instance->my_last_aru = token->aru;
3951 
3952  transmits_allowed = fcc_calculate (instance, token);
3953  mcasted_retransmit = orf_token_rtr (instance, token, &transmits_allowed);
3954 
3955  if (instance->my_token_held == 1 &&
3956  (token->rtr_list_entries > 0 || mcasted_retransmit > 0)) {
3957  instance->my_token_held = 0;
3958  forward_token = 1;
3959  }
3960 
3961  fcc_rtr_limit (instance, token, &transmits_allowed);
3962  mcasted_regular = orf_token_mcast (instance, token, transmits_allowed);
3963 /*
3964 if (mcasted_regular) {
3965 printf ("mcasted regular %d\n", mcasted_regular);
3966 printf ("token seq %d\n", token->seq);
3967 }
3968 */
3969  fcc_token_update (instance, token, mcasted_retransmit +
3970  mcasted_regular);
3971 
3972  if (sq_lt_compare (instance->my_aru, token->aru) ||
3973  instance->my_id.nodeid == token->aru_addr ||
3974  token->aru_addr == 0) {
3975 
3976  token->aru = instance->my_aru;
3977  if (token->aru == token->seq) {
3978  token->aru_addr = 0;
3979  } else {
3980  token->aru_addr = instance->my_id.nodeid;
3981  }
3982  }
3983  if (token->aru == last_aru && token->aru_addr != 0) {
3984  instance->my_aru_count += 1;
3985  } else {
3986  instance->my_aru_count = 0;
3987  }
3988 
 3989  /*
 3990  * We don't really follow the specification here. In the specification, OTHER nodes
 3991  * detect the failure of one node (based on aru_count) and my_id IS NEVER added
 3992  * to the failed list (so a node never marks itself as failed)
 3993  */
3994  if (instance->my_aru_count > instance->totem_config->fail_to_recv_const &&
3995  token->aru_addr == instance->my_id.nodeid) {
3996 
 3997  log_printf (instance->totemsrp_log_level_error,
 3998  "FAILED TO RECEIVE");
3999 
4000  instance->failed_to_recv = 1;
4001 
4002  memb_set_merge (&instance->my_id, 1,
4003  instance->my_failed_list,
4004  &instance->my_failed_list_entries);
4005 
4006  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FAILED_TO_RECEIVE);
4007  } else {
4008  instance->my_token_seq = token->token_seq;
4009  token->token_seq += 1;
4010 
4011  if (instance->memb_state == MEMB_STATE_RECOVERY) {
4012  /*
4013  * instance->my_aru == instance->my_high_seq_received means this processor
4014  * has recovered all messages it can recover
4015  * (ie: its retrans queue is empty)
4016  */
4017  if (cs_queue_is_empty (&instance->retrans_message_queue) == 0) {
4018 
4019  if (token->retrans_flg == 0) {
4020  token->retrans_flg = 1;
4021  instance->my_set_retrans_flg = 1;
4022  }
4023  } else
4024  if (token->retrans_flg == 1 && instance->my_set_retrans_flg) {
4025  token->retrans_flg = 0;
4026  instance->my_set_retrans_flg = 0;
4027  }
 4028  log_printf (instance->totemsrp_log_level_debug,
 4029  "token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, aru %x",
4030  token->retrans_flg, instance->my_set_retrans_flg,
4031  cs_queue_is_empty (&instance->retrans_message_queue),
4032  instance->my_retrans_flg_count, token->aru);
4033  if (token->retrans_flg == 0) {
4034  instance->my_retrans_flg_count += 1;
4035  } else {
4036  instance->my_retrans_flg_count = 0;
4037  }
4038  if (instance->my_retrans_flg_count == 2) {
4039  instance->my_install_seq = token->seq;
4040  }
 4041  log_printf (instance->totemsrp_log_level_debug,
 4042  "install seq %x aru %x high seq received %x",
4043  instance->my_install_seq, instance->my_aru, instance->my_high_seq_received);
4044  if (instance->my_retrans_flg_count >= 2 &&
4045  instance->my_received_flg == 0 &&
4046  sq_lte_compare (instance->my_install_seq, instance->my_aru)) {
4047  instance->my_received_flg = 1;
4048  instance->my_deliver_memb_entries = instance->my_trans_memb_entries;
4049  memcpy (instance->my_deliver_memb_list, instance->my_trans_memb_list,
4050  sizeof (struct totem_ip_address) * instance->my_trans_memb_entries);
4051  }
4052  if (instance->my_retrans_flg_count >= 3 &&
4053  sq_lte_compare (instance->my_install_seq, token->aru)) {
4054  instance->my_rotation_counter += 1;
4055  } else {
4056  instance->my_rotation_counter = 0;
4057  }
4058  if (instance->my_rotation_counter == 2) {
 4059  log_printf (instance->totemsrp_log_level_debug,
 4060  "retrans flag count %x token aru %x install seq %x aru %x %x",
4061  instance->my_retrans_flg_count, token->aru, instance->my_install_seq,
4062  instance->my_aru, token->seq);
4063 
4064  memb_state_operational_enter (instance);
4065  instance->my_rotation_counter = 0;
4066  instance->my_retrans_flg_count = 0;
4067  }
4068  }
4069 
4071  token_send (instance, token, forward_token);
4072 
4073 #ifdef GIVEINFO
4074  tv_current = qb_util_nano_current_get ();
4075  tv_diff = tv_current - tv_old;
4076  tv_old = tv_current;
 4077  log_printf (instance->totemsrp_log_level_debug,
 4078  "I held %0.4f ms",
4079  ((float)tv_diff) / 1000000.0);
4080 #endif
4081  if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
4082  messages_deliver_to_app (instance, 0,
4083  instance->my_high_seq_received);
4084  }
4085 
4086  /*
4087  * Deliver messages after token has been transmitted
4088  * to improve performance
4089  */
4090  reset_token_timeout (instance); // REVIEWED
4091  reset_token_retransmit_timeout (instance); // REVIEWED
4092  if (instance->my_id.nodeid == instance->my_ring_id.rep &&
4093  instance->my_token_held == 1) {
4094 
4095  start_token_hold_retransmit_timeout (instance);
4096  }
4097 
4098  token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_SENT);
4099  }
4100  break;
4101  }
4102 
4103  if ((forward_token)
4104  && instance->use_heartbeat) {
4105  reset_heartbeat_timeout(instance);
4106  }
4107  else {
4108  cancel_heartbeat_timeout(instance);
4109  }
4110 
4111  return (0);
4112 }
4113 
4114 static void messages_deliver_to_app (
4115  struct totemsrp_instance *instance,
4116  int skip,
4117  unsigned int end_point)
4118 {
4119  struct sort_queue_item *sort_queue_item_p;
4120  unsigned int i;
4121  int res;
4122  struct mcast *mcast_in;
4123  struct mcast mcast_header;
4124  unsigned int range = 0;
4125  int endian_conversion_required;
4126  unsigned int my_high_delivered_stored = 0;
4127 
4128 
4129  range = end_point - instance->my_high_delivered;
4130 
4131  if (range) {
 4132  log_printf (instance->totemsrp_log_level_debug,
 4133  "Delivering %x to %x", instance->my_high_delivered,
4134  end_point);
4135  }
4136  assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
4137  my_high_delivered_stored = instance->my_high_delivered;
4138 
4139  /*
4140  * Deliver messages in order from rtr queue to pending delivery queue
4141  */
4142  for (i = 1; i <= range; i++) {
4143 
4144  void *ptr = 0;
4145 
4146  /*
4147  * If out of range of sort queue, stop assembly
4148  */
4149  res = sq_in_range (&instance->regular_sort_queue,
4150  my_high_delivered_stored + i);
4151  if (res == 0) {
4152  break;
4153  }
4154 
4155  res = sq_item_get (&instance->regular_sort_queue,
4156  my_high_delivered_stored + i, &ptr);
4157  /*
4158  * If hole, stop assembly
4159  */
4160  if (res != 0 && skip == 0) {
4161  break;
4162  }
4163 
4164  instance->my_high_delivered = my_high_delivered_stored + i;
4165 
4166  if (res != 0) {
4167  continue;
4168 
4169  }
4170 
4171  sort_queue_item_p = ptr;
4172 
4173  mcast_in = sort_queue_item_p->mcast;
4174  assert (mcast_in != (struct mcast *)0xdeadbeef);
4175 
4176  endian_conversion_required = 0;
4177  if (mcast_in->header.magic != TOTEM_MH_MAGIC) {
4178  endian_conversion_required = 1;
4179  mcast_endian_convert (mcast_in, &mcast_header);
4180  } else {
4181  memcpy (&mcast_header, mcast_in, sizeof (struct mcast));
4182  }
4183 
4184  /*
4185  * Skip messages not originated in instance->my_deliver_memb
4186  */
4187  if (skip &&
4188  memb_set_subset (&mcast_header.system_from,
4189  1,
4190  instance->my_deliver_memb_list,
4191  instance->my_deliver_memb_entries) == 0) {
4192 
4193  instance->my_high_delivered = my_high_delivered_stored + i;
4194 
4195  continue;
4196  }
4197 
4198  /*
4199  * Message found
4200  */
 4201  log_printf (instance->totemsrp_log_level_debug,
 4202  "Delivering MCAST message with seq %x to pending delivery queue",
4203  mcast_header.seq);
4204 
4205  /*
4206  * Message is locally originated multicast
4207  */
4208  instance->totemsrp_deliver_fn (
4209  mcast_header.header.nodeid,
4210  ((char *)sort_queue_item_p->mcast) + sizeof (struct mcast),
4211  sort_queue_item_p->msg_len - sizeof (struct mcast),
4212  endian_conversion_required);
4213  }
4214 }
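/*
 * Note: delivery advances one sequence number at a time from
 * my_high_delivered towards end_point; a missing item in the sort queue
 * stops delivery unless skip is set, in which case gaps and messages from
 * processors outside my_deliver_memb_list are passed over while
 * my_high_delivered still advances.
 */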
4215 
4216 /*
4217  * recv message handler called when MCAST message type received
4218  */
4219 static int message_handler_mcast (
4220  struct totemsrp_instance *instance,
4221  const void *msg,
4222  size_t msg_len,
4223  int endian_conversion_needed)
4224 {
4225  struct sort_queue_item sort_queue_item;
4226  struct sq *sort_queue;
4227  struct mcast mcast_header;
4228 
4229  if (check_mcast_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4230  return (0);
4231  }
4232 
4233  if (endian_conversion_needed) {
4234  mcast_endian_convert (msg, &mcast_header);
4235  } else {
4236  memcpy (&mcast_header, msg, sizeof (struct mcast));
4237  }
4238 
4239  if (mcast_header.header.encapsulated == MESSAGE_ENCAPSULATED) {
4240  sort_queue = &instance->recovery_sort_queue;
4241  } else {
4242  sort_queue = &instance->regular_sort_queue;
4243  }
4244 
4245  assert (msg_len <= FRAME_SIZE_MAX);
4246 
4247 #ifdef TEST_DROP_MCAST_PERCENTAGE
4248  if (random()%100 < TEST_DROP_MCAST_PERCENTAGE) {
4249  return (0);
4250  }
4251 #endif
4252 
4253  /*
4254  * If the message is foreign execute the switch below
4255  */
4256  if (memcmp (&instance->my_ring_id, &mcast_header.ring_id,
4257  sizeof (struct memb_ring_id)) != 0) {
4258 
4259  switch (instance->memb_state) {
 4260  case MEMB_STATE_OPERATIONAL:
 4261  memb_set_merge (
4262  &mcast_header.system_from, 1,
4263  instance->my_proc_list, &instance->my_proc_list_entries);
4264  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE);
4265  break;
4266 
4267  case MEMB_STATE_GATHER:
4268  if (!memb_set_subset (
4269  &mcast_header.system_from,
4270  1,
4271  instance->my_proc_list,
4272  instance->my_proc_list_entries)) {
4273 
4274  memb_set_merge (&mcast_header.system_from, 1,
4275  instance->my_proc_list, &instance->my_proc_list_entries);
4276  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE);
4277  return (0);
4278  }
4279  break;
4280 
4281  case MEMB_STATE_COMMIT:
4282  /* discard message */
4283  instance->stats.rx_msg_dropped++;
4284  break;
4285 
4286  case MEMB_STATE_RECOVERY:
4287  /* discard message */
4288  instance->stats.rx_msg_dropped++;
4289  break;
4290  }
4291  return (0);
4292  }
4293 
 4294  log_printf (instance->totemsrp_log_level_trace,
 4295  "Received ringid(%u:%lld) seq %x",
4296  mcast_header.ring_id.rep,
4297  mcast_header.ring_id.seq,
4298  mcast_header.seq);
4299 
4300  /*
4301  * Add mcast message to rtr queue if not already in rtr queue
4302  * otherwise free io vectors
4303  */
4304  if (msg_len > 0 && msg_len <= FRAME_SIZE_MAX &&
4305  sq_in_range (sort_queue, mcast_header.seq) &&
4306  sq_item_inuse (sort_queue, mcast_header.seq) == 0) {
4307 
4308  /*
4309  * Allocate new multicast memory block
4310  */
4311 // TODO LEAK
4312  sort_queue_item.mcast = totemsrp_buffer_alloc (instance);
4313  if (sort_queue_item.mcast == NULL) {
4314  return (-1); /* error here is corrected by the algorithm */
4315  }
4316  memcpy (sort_queue_item.mcast, msg, msg_len);
4317  sort_queue_item.msg_len = msg_len;
4318 
4319  if (sq_lt_compare (instance->my_high_seq_received,
4320  mcast_header.seq)) {
4321  instance->my_high_seq_received = mcast_header.seq;
4322  }
4323 
4324  sq_item_add (sort_queue, &sort_queue_item, mcast_header.seq);
4325  }
4326 
4327  update_aru (instance);
4328  if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
4329  messages_deliver_to_app (instance, 0, instance->my_high_seq_received);
4330  }
4331 
4332 /* TODO remove from retrans message queue for old ring in recovery state */
4333  return (0);
4334 }
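/*
 * Note: multicasts carrying a ring id other than the current one only
 * influence membership (they can push the processor into gather); messages
 * for the current ring are copied into the regular or recovery sort queue,
 * depending on whether they are encapsulated, and delivered to the
 * application once the processor is operational.
 */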
4335 
4336 static int message_handler_memb_merge_detect (
4337  struct totemsrp_instance *instance,
4338  const void *msg,
4339  size_t msg_len,
4340  int endian_conversion_needed)
4341 {
4342  struct memb_merge_detect memb_merge_detect;
4343 
4344  if (check_memb_merge_detect_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4345  return (0);
4346  }
4347 
4348  if (endian_conversion_needed) {
4349  memb_merge_detect_endian_convert (msg, &memb_merge_detect);
4350  } else {
4351  memcpy (&memb_merge_detect, msg,
4352  sizeof (struct memb_merge_detect));
4353  }
4354 
4355  /*
4356  * do nothing if this is a merge detect from this configuration
4357  */
4358  if (memcmp (&instance->my_ring_id, &memb_merge_detect.ring_id,
4359  sizeof (struct memb_ring_id)) == 0) {
4360 
4361  return (0);
4362  }
4363 
4364  /*
4365  * Execute merge operation
4366  */
4367  switch (instance->memb_state) {
 4368  case MEMB_STATE_OPERATIONAL:
 4369  memb_set_merge (&memb_merge_detect.system_from, 1,
4370  instance->my_proc_list, &instance->my_proc_list_entries);
4371  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE);
4372  break;
4373 
4374  case MEMB_STATE_GATHER:
4375  if (!memb_set_subset (
4376  &memb_merge_detect.system_from,
4377  1,
4378  instance->my_proc_list,
4379  instance->my_proc_list_entries)) {
4380 
4381  memb_set_merge (&memb_merge_detect.system_from, 1,
4382  instance->my_proc_list, &instance->my_proc_list_entries);
4383  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE);
4384  return (0);
4385  }
4386  break;
4387 
4388  case MEMB_STATE_COMMIT:
4389  /* do nothing in commit */
4390  break;
4391 
4392  case MEMB_STATE_RECOVERY:
4393  /* do nothing in recovery */
4394  break;
4395  }
4396  return (0);
4397 }
4398 
4399 static void memb_join_process (
4400  struct totemsrp_instance *instance,
4401  const struct memb_join *memb_join)
4402 {
4403  struct srp_addr *proc_list;
4404  struct srp_addr *failed_list;
4405  int gather_entered = 0;
4406  int fail_minus_memb_entries = 0;
4407  struct srp_addr fail_minus_memb[PROCESSOR_COUNT_MAX];
4408 
4409  proc_list = (struct srp_addr *)memb_join->end_of_memb_join;
4410  failed_list = proc_list + memb_join->proc_list_entries;
4411 
4412  log_printf(instance->totemsrp_log_level_trace, "memb_join_process");
4413  memb_set_log(instance, instance->totemsrp_log_level_trace,
4414  "proclist", proc_list, memb_join->proc_list_entries);
4415  memb_set_log(instance, instance->totemsrp_log_level_trace,
4416  "faillist", failed_list, memb_join->failed_list_entries);
4417  memb_set_log(instance, instance->totemsrp_log_level_trace,
4418  "my_proclist", instance->my_proc_list, instance->my_proc_list_entries);
4419  memb_set_log(instance, instance->totemsrp_log_level_trace,
4420  "my_faillist", instance->my_failed_list, instance->my_failed_list_entries);
4421 
4422  if (memb_join->header.type == MESSAGE_TYPE_MEMB_JOIN) {
4423  if (instance->flushing) {
4424  if (memb_join->header.nodeid == LEAVE_DUMMY_NODEID) {
 4425  log_printf (instance->totemsrp_log_level_debug,
 4426  "Discarding LEAVE message during flush, nodeid=%u",
4427  memb_join->failed_list_entries > 0 ? failed_list[memb_join->failed_list_entries - 1 ].nodeid : LEAVE_DUMMY_NODEID);
4428  if (memb_join->failed_list_entries > 0) {
4429  my_leave_memb_set(instance, failed_list[memb_join->failed_list_entries - 1 ].nodeid);
4430  }
4431  } else {
 4432  log_printf (instance->totemsrp_log_level_debug,
 4433  "Discarding JOIN message during flush, nodeid=%d", memb_join->header.nodeid);
4434  }
4435  return;
4436  } else {
4437  if (memb_join->header.nodeid == LEAVE_DUMMY_NODEID) {
 4438  log_printf (instance->totemsrp_log_level_debug,
 4439  "Received LEAVE message from %u", memb_join->failed_list_entries > 0 ? failed_list[memb_join->failed_list_entries - 1 ].nodeid : LEAVE_DUMMY_NODEID);
4440  if (memb_join->failed_list_entries > 0) {
4441  my_leave_memb_set(instance, failed_list[memb_join->failed_list_entries - 1 ].nodeid);
4442  }
4443  }
4444  }
4445 
4446  }
4447 
4448  if (memb_set_equal (proc_list,
4449  memb_join->proc_list_entries,
4450  instance->my_proc_list,
4451  instance->my_proc_list_entries) &&
4452 
4453  memb_set_equal (failed_list,
4454  memb_join->failed_list_entries,
4455  instance->my_failed_list,
4456  instance->my_failed_list_entries)) {
4457 
4458  if (memb_join->header.nodeid != LEAVE_DUMMY_NODEID) {
4459  memb_consensus_set (instance, &memb_join->system_from);
4460  }
4461 
4462  if (memb_consensus_agreed (instance) && instance->failed_to_recv == 1) {
4463  instance->failed_to_recv = 0;
4464  srp_addr_copy (&instance->my_proc_list[0],
4465  &instance->my_id);
4466  instance->my_proc_list_entries = 1;
4467  instance->my_failed_list_entries = 0;
4468 
4469  memb_state_commit_token_create (instance);
4470 
4471  memb_state_commit_enter (instance);
4472  return;
4473  }
4474  if (memb_consensus_agreed (instance) &&
4475  memb_lowest_in_config (instance)) {
4476 
4477  memb_state_commit_token_create (instance);
4478 
4479  memb_state_commit_enter (instance);
4480  } else {
4481  goto out;
4482  }
4483  } else
4484  if (memb_set_subset (proc_list,
4485  memb_join->proc_list_entries,
4486  instance->my_proc_list,
4487  instance->my_proc_list_entries) &&
4488 
4489  memb_set_subset (failed_list,
4490  memb_join->failed_list_entries,
4491  instance->my_failed_list,
4492  instance->my_failed_list_entries)) {
4493 
4494  goto out;
4495  } else
4496  if (memb_set_subset (&memb_join->system_from, 1,
4497  instance->my_failed_list, instance->my_failed_list_entries)) {
4498 
4499  goto out;
4500  } else {
4501  memb_set_merge (proc_list,
4502  memb_join->proc_list_entries,
4503  instance->my_proc_list, &instance->my_proc_list_entries);
4504 
4505  if (memb_set_subset (
4506  &instance->my_id, 1,
4507  failed_list, memb_join->failed_list_entries)) {
4508 
4509  memb_set_merge (
4510  &memb_join->system_from, 1,
4511  instance->my_failed_list, &instance->my_failed_list_entries);
4512  } else {
4513  if (memb_set_subset (
4514  &memb_join->system_from, 1,
4515  instance->my_memb_list,
4516  instance->my_memb_entries)) {
4517 
4518  if (memb_set_subset (
4519  &memb_join->system_from, 1,
4520  instance->my_failed_list,
4521  instance->my_failed_list_entries) == 0) {
4522 
4523  memb_set_merge (failed_list,
4524  memb_join->failed_list_entries,
4525  instance->my_failed_list, &instance->my_failed_list_entries);
4526  } else {
4527  memb_set_subtract (fail_minus_memb,
4528  &fail_minus_memb_entries,
4529  failed_list,
4530  memb_join->failed_list_entries,
4531  instance->my_memb_list,
4532  instance->my_memb_entries);
4533 
4534  memb_set_merge (fail_minus_memb,
4535  fail_minus_memb_entries,
4536  instance->my_failed_list,
4537  &instance->my_failed_list_entries);
4538  }
4539  }
4540  }
4541  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_JOIN);
4542  gather_entered = 1;
4543  }
4544 
4545 out:
4546  if (gather_entered == 0 &&
4547  instance->memb_state == MEMB_STATE_OPERATIONAL) {
4548 
4549  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE);
4550  }
4551 }
4552 
4553 static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out)
4554 {
4555  int i;
4556  struct srp_addr *in_proc_list;
4557  struct srp_addr *in_failed_list;
4558  struct srp_addr *out_proc_list;
4559  struct srp_addr *out_failed_list;
4560 
4561  out->header.magic = TOTEM_MH_MAGIC;
 4562  out->header.version = in->header.version;
 4563  out->header.type = in->header.type;
4564  out->header.nodeid = swab32 (in->header.nodeid);
4565  srp_addr_copy_endian_convert (&out->system_from, &in->system_from);
 4566  out->proc_list_entries = swab32 (in->proc_list_entries);
 4567  out->failed_list_entries = swab32 (in->failed_list_entries);
 4568  out->ring_seq = swab64 (in->ring_seq);
4569 
4570  in_proc_list = (struct srp_addr *)in->end_of_memb_join;
4571  in_failed_list = in_proc_list + out->proc_list_entries;
4572  out_proc_list = (struct srp_addr *)out->end_of_memb_join;
4573  out_failed_list = out_proc_list + out->proc_list_entries;
4574 
4575  for (i = 0; i < out->proc_list_entries; i++) {
4576  srp_addr_copy_endian_convert (&out_proc_list[i], &in_proc_list[i]);
4577  }
4578  for (i = 0; i < out->failed_list_entries; i++) {
4579  srp_addr_copy_endian_convert (&out_failed_list[i], &in_failed_list[i]);
4580  }
4581 }
4582 
4583 static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out)
4584 {
4585  int i;
4586  struct srp_addr *in_addr = (struct srp_addr *)in->end_of_commit_token;
4587  struct srp_addr *out_addr = (struct srp_addr *)out->end_of_commit_token;
4588  struct memb_commit_token_memb_entry *in_memb_list;
4589  struct memb_commit_token_memb_entry *out_memb_list;
4590 
4591  out->header.magic = TOTEM_MH_MAGIC;
 4592  out->header.version = in->header.version;
 4593  out->header.type = in->header.type;
4594  out->header.nodeid = swab32 (in->header.nodeid);
4595  out->token_seq = swab32 (in->token_seq);
4596  out->ring_id.rep = swab32(in->ring_id.rep);
4597  out->ring_id.seq = swab64 (in->ring_id.seq);
4598  out->retrans_flg = swab32 (in->retrans_flg);
4599  out->memb_index = swab32 (in->memb_index);
4600  out->addr_entries = swab32 (in->addr_entries);
4601 
4602  in_memb_list = (struct memb_commit_token_memb_entry *)(in_addr + out->addr_entries);
4603  out_memb_list = (struct memb_commit_token_memb_entry *)(out_addr + out->addr_entries);
4604  for (i = 0; i < out->addr_entries; i++) {
4605  srp_addr_copy_endian_convert (&out_addr[i], &in_addr[i]);
4606 
4607  /*
4608  * Only convert the memb entry if it has been set
4609  */
4610  if (in_memb_list[i].ring_id.rep != 0) {
4611  out_memb_list[i].ring_id.rep = swab32(in_memb_list[i].ring_id.rep);
4612 
4613  out_memb_list[i].ring_id.seq =
4614  swab64 (in_memb_list[i].ring_id.seq);
4615  out_memb_list[i].aru = swab32 (in_memb_list[i].aru);
4616  out_memb_list[i].high_delivered = swab32 (in_memb_list[i].high_delivered);
4617  out_memb_list[i].received_flg = swab32 (in_memb_list[i].received_flg);
4618  }
4619  }
4620 }
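/*
 * Note: addr_entries is byte-swapped before the in_memb_list/out_memb_list
 * pointers are computed, because the member entry array starts only after
 * addr_entries srp_addr entries at the end of the commit token.
 */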
4621 
4622 static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out)
4623 {
4624  int i;
4625 
4626  out->header.magic = TOTEM_MH_MAGIC;
 4627  out->header.version = in->header.version;
 4628  out->header.type = in->header.type;
4629  out->header.nodeid = swab32 (in->header.nodeid);
4630  out->seq = swab32 (in->seq);
4631  out->token_seq = swab32 (in->token_seq);
4632  out->aru = swab32 (in->aru);
4633  out->ring_id.rep = swab32(in->ring_id.rep);
4634  out->aru_addr = swab32(in->aru_addr);
4635  out->ring_id.seq = swab64 (in->ring_id.seq);
4636  out->fcc = swab32 (in->fcc);
4637  out->backlog = swab32 (in->backlog);
4638  out->retrans_flg = swab32 (in->retrans_flg);
 4639  out->rtr_list_entries = swab32 (in->rtr_list_entries);
 4640  for (i = 0; i < out->rtr_list_entries; i++) {
4641  out->rtr_list[i].ring_id.rep = swab32(in->rtr_list[i].ring_id.rep);
4642  out->rtr_list[i].ring_id.seq = swab64 (in->rtr_list[i].ring_id.seq);
4643  out->rtr_list[i].seq = swab32 (in->rtr_list[i].seq);
4644  }
4645 }
4646 
4647 static void mcast_endian_convert (const struct mcast *in, struct mcast *out)
4648 {
4649  out->header.magic = TOTEM_MH_MAGIC;
 4650  out->header.version = in->header.version;
 4651  out->header.type = in->header.type;
4652  out->header.nodeid = swab32 (in->header.nodeid);
 4653  out->header.encapsulated = in->header.encapsulated;
 4654 
4655  out->seq = swab32 (in->seq);
4656  out->this_seqno = swab32 (in->this_seqno);
4657  out->ring_id.rep = swab32(in->ring_id.rep);
4658  out->ring_id.seq = swab64 (in->ring_id.seq);
4659  out->node_id = swab32 (in->node_id);
4660  out->guarantee = swab32 (in->guarantee);
4661  srp_addr_copy_endian_convert (&out->system_from, &in->system_from);
4662 }
4663 
4664 static void memb_merge_detect_endian_convert (
4665  const struct memb_merge_detect *in,
4666  struct memb_merge_detect *out)
4667 {
4668  out->header.magic = TOTEM_MH_MAGIC;
 4669  out->header.version = in->header.version;
 4670  out->header.type = in->header.type;
4671  out->header.nodeid = swab32 (in->header.nodeid);
4672  out->ring_id.rep = swab32(in->ring_id.rep);
4673  out->ring_id.seq = swab64 (in->ring_id.seq);
4674  srp_addr_copy_endian_convert (&out->system_from, &in->system_from);
4675 }
4676 
4677 static int ignore_join_under_operational (
4678  struct totemsrp_instance *instance,
4679  const struct memb_join *memb_join)
4680 {
4681  struct srp_addr *proc_list;
4682  struct srp_addr *failed_list;
4683  unsigned long long ring_seq;
4684 
4685  proc_list = (struct srp_addr *)memb_join->end_of_memb_join;
4686  failed_list = proc_list + memb_join->proc_list_entries;
4687  ring_seq = memb_join->ring_seq;
4688 
4689  if (memb_set_subset (&instance->my_id, 1,
4690  failed_list, memb_join->failed_list_entries)) {
4691  return (1);
4692  }
4693 
4694  /*
4695  * In operational state, my_proc_list is exactly the same as
4696  * my_memb_list.
4697  */
4698  if ((memb_set_subset (&memb_join->system_from, 1,
4699  instance->my_memb_list, instance->my_memb_entries)) &&
4700  (ring_seq < instance->my_ring_id.seq)) {
4701  return (1);
4702  }
4703 
4704  return (0);
4705 }
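/*
 * Note: a join received in operational state is ignored when this processor
 * appears on the sender's failed list, or when the sender is already a
 * member and the join carries a ring sequence older than the current ring.
 */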
4706 
4707 static int message_handler_memb_join (
4708  struct totemsrp_instance *instance,
4709  const void *msg,
4710  size_t msg_len,
4711  int endian_conversion_needed)
4712 {
4713  const struct memb_join *memb_join;
4714  struct memb_join *memb_join_convert = alloca (msg_len);
4715 
4716  if (check_memb_join_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4717  return (0);
4718  }
4719 
4720  if (endian_conversion_needed) {
4721  memb_join = memb_join_convert;
4722  memb_join_endian_convert (msg, memb_join_convert);
4723 
4724  } else {
4725  memb_join = msg;
4726  }
 4727  /*
 4728  * If the process paused because it wasn't scheduled in a timely
 4729  * fashion, flush the join messages because they may be stale
 4730  * entries left in the queue
 4731  */
4732  if (pause_flush (instance)) {
4733  return (0);
4734  }
4735 
4736  if (instance->token_ring_id_seq < memb_join->ring_seq) {
4737  instance->token_ring_id_seq = memb_join->ring_seq;
4738  }
4739  switch (instance->memb_state) {
 4740  case MEMB_STATE_OPERATIONAL:
 4741  if (!ignore_join_under_operational (instance, memb_join)) {
4742  memb_join_process (instance, memb_join);
4743  }
4744  break;
4745 
4746  case MEMB_STATE_GATHER:
4747  memb_join_process (instance, memb_join);
4748  break;
4749 
4750  case MEMB_STATE_COMMIT:
4751  if (memb_set_subset (&memb_join->system_from,
4752  1,
4753  instance->my_new_memb_list,
4754  instance->my_new_memb_entries) &&
4755 
4756  memb_join->ring_seq >= instance->my_ring_id.seq) {
4757 
4758  memb_join_process (instance, memb_join);
4759  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE);
4760  }
4761  break;
4762 
4763  case MEMB_STATE_RECOVERY:
4764  if (memb_set_subset (&memb_join->system_from,
4765  1,
4766  instance->my_new_memb_list,
4767  instance->my_new_memb_entries) &&
4768 
4769  memb_join->ring_seq >= instance->my_ring_id.seq) {
4770 
4771  memb_join_process (instance, memb_join);
4772  memb_recovery_state_token_loss (instance);
4773  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY);
4774  }
4775  break;
4776  }
4777  return (0);
4778 }
4779 
4780 static int message_handler_memb_commit_token (
4781  struct totemsrp_instance *instance,
4782  const void *msg,
4783  size_t msg_len,
4784  int endian_conversion_needed)
4785 {
4786  struct memb_commit_token *memb_commit_token_convert = alloca (msg_len);
4787  struct memb_commit_token *memb_commit_token;
4788  struct srp_addr sub[PROCESSOR_COUNT_MAX];
4789  int sub_entries;
4790 
4791  struct srp_addr *addr;
4792 
 4793  log_printf (instance->totemsrp_log_level_debug,
 4794  "got commit token");
4795 
4796  if (check_memb_commit_token_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4797  return (0);
4798  }
4799 
4800  if (endian_conversion_needed) {
4801  memb_commit_token_endian_convert (msg, memb_commit_token_convert);
4802  } else {
4803  memcpy (memb_commit_token_convert, msg, msg_len);
4804  }
4805  memb_commit_token = memb_commit_token_convert;
4806  addr = (struct srp_addr *)memb_commit_token->end_of_commit_token;
4807 
4808 #ifdef TEST_DROP_COMMIT_TOKEN_PERCENTAGE
4809  if (random()%100 < TEST_DROP_COMMIT_TOKEN_PERCENTAGE) {
4810  return (0);
4811  }
4812 #endif
4813  switch (instance->memb_state) {
 4814  case MEMB_STATE_OPERATIONAL:
 4815  /* discard token */
4816  break;
4817 
4818  case MEMB_STATE_GATHER:
4819  memb_set_subtract (sub, &sub_entries,
4820  instance->my_proc_list, instance->my_proc_list_entries,
4821  instance->my_failed_list, instance->my_failed_list_entries);
4822 
4823  if (memb_set_equal (addr,
4824  memb_commit_token->addr_entries,
4825  sub,
4826  sub_entries) &&
4827 
4828  memb_commit_token->ring_id.seq > instance->my_ring_id.seq) {
4829  memcpy (instance->commit_token, memb_commit_token, msg_len);
4830  memb_state_commit_enter (instance);
4831  }
4832  break;
4833 
4834  case MEMB_STATE_COMMIT:
 4835  /*
 4836  * If retransmitted commit tokens are sent on this ring,
 4837  * filter them out and only enter recovery once the
 4838  * commit token has traversed the array. This is
 4839  * determined by:
 4840  * memb_commit_token->memb_index == memb_commit_token->addr_entries
 4841  */
4842  if (memb_commit_token->ring_id.seq == instance->my_ring_id.seq &&
4843  memb_commit_token->memb_index == memb_commit_token->addr_entries) {
4844  memb_state_recovery_enter (instance, memb_commit_token);
4845  }
4846  break;
4847 
4848  case MEMB_STATE_RECOVERY:
4849  if (instance->my_id.nodeid == instance->my_ring_id.rep) {
4850 
4851  /* Filter out duplicated tokens */
4852  if (instance->originated_orf_token) {
4853  break;
4854  }
4855 
4856  instance->originated_orf_token = 1;
4857 
 4858  log_printf (instance->totemsrp_log_level_debug,
 4859  "Sending initial ORF token");
4860 
4861  // TODO convert instead of initiate
4862  orf_token_send_initial (instance);
4863  reset_token_timeout (instance); // REVIEWED
4864  reset_token_retransmit_timeout (instance); // REVIEWED
4865  }
4866  break;
4867  }
4868  return (0);
4869 }
4870 
4871 static int message_handler_token_hold_cancel (
4872  struct totemsrp_instance *instance,
4873  const void *msg,
4874  size_t msg_len,
4875  int endian_conversion_needed)
4876 {
4877  const struct token_hold_cancel *token_hold_cancel = msg;
4878 
4879  if (check_token_hold_cancel_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4880  return (0);
4881  }
4882 
4883  if (memcmp (&token_hold_cancel->ring_id, &instance->my_ring_id,
4884  sizeof (struct memb_ring_id)) == 0) {
4885 
4886  instance->my_seq_unchanged = 0;
4887  if (instance->my_ring_id.rep == instance->my_id.nodeid) {
4888  timer_function_token_retransmit_timeout (instance);
4889  }
4890  }
4891  return (0);
4892 }
4893 
4894 static int check_message_header_validity(
4895  void *context,
4896  const void *msg,
4897  unsigned int msg_len,
4898  const struct sockaddr_storage *system_from)
4899 {
4900  struct totemsrp_instance *instance = context;
4901  const struct totem_message_header *message_header = msg;
4902  const char *guessed_str;
4903  const char *msg_byte = msg;
4904 
4905  if (msg_len < sizeof (struct totem_message_header)) {
 4906  log_printf (instance->totemsrp_log_level_security,
 4907  "Message received from %s is too short... Ignoring %u.",
4908  totemip_sa_print((struct sockaddr *)system_from), (unsigned int)msg_len);
4909  return (-1);
4910  }
4911 
4912  if (message_header->magic != TOTEM_MH_MAGIC &&
4913  message_header->magic != swab16(TOTEM_MH_MAGIC)) {
 4914  /*
 4915  * We've received either Knet, an old version of Corosync,
 4916  * or something else. Do some guessing to display a (hopefully)
 4917  * helpful message
 4918  */
4919  guessed_str = NULL;
4920 
4921  if (message_header->magic == 0xFFFF) {
4922  /*
4923  * Corosync 2.2 used header with two UINT8_MAX
4924  */
4925  guessed_str = "Corosync 2.2";
4926  } else if (message_header->magic == 0xFEFE) {
4927  /*
4928  * Corosync 2.3+ used header with two UINT8_MAX - 1
4929  */
4930  guessed_str = "Corosync 2.3+";
4931  } else if (msg_byte[0] == 0x01) {
4932  /*
4933  * Knet has stable1 with first byte of message == 1
4934  */
4935  guessed_str = "unencrypted Kronosnet";
4936  } else if (msg_byte[0] >= 0 && msg_byte[0] <= 5) {
4937  /*
4938  * Unencrypted Corosync 1.x/OpenAIS has first byte
4939  * 0-5. Collision with Knet (but still worth the try)
4940  */
4941  guessed_str = "unencrypted Corosync 2.0/2.1/1.x/OpenAIS";
4942  } else {
4943  /*
 4944  * An encrypted Kronosnet packet has a hash at the end of
 4945  * the packet and nothing specific at the beginning of the
 4946  * packet (just encrypted data).
 4947  * Encrypted Corosync 1.x/OpenAIS is quite similar but hash_digest
 4948  * is at the beginning of the packet.
 4949  *
 4950  * So it's not possible to reliably detect either of them.
4951  */
4952  guessed_str = "encrypted Kronosnet/Corosync 2.0/2.1/1.x/OpenAIS or unknown";
4953  }
4954 
 4955  log_printf (instance->totemsrp_log_level_security,
 4956  "Message received from %s has bad magic number (probably sent by %s).. Ignoring",
4957  totemip_sa_print((struct sockaddr *)system_from),
4958  guessed_str);
4959 
4960  return (-1);
4961  }
4962 
4963  if (message_header->version != TOTEM_MH_VERSION) {
 4964  log_printf (instance->totemsrp_log_level_security,
 4965  "Message received from %s has unsupported version %u... Ignoring",
4966  totemip_sa_print((struct sockaddr *)system_from),
4967  message_header->version);
4968 
4969  return (-1);
4970  }
4971 
4972  return (0);
4973 }
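/*
 * Note: only the overall length, magic and version are validated here; the
 * per-message-type length checks are done by the check_*_sanity helpers
 * called from the individual message handlers.
 */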
4974 
4975 
 4976  void main_deliver_fn (
 4977  void *context,
4978  const void *msg,
4979  unsigned int msg_len,
4980  const struct sockaddr_storage *system_from)
4981 {
4982  struct totemsrp_instance *instance = context;
4983  const struct totem_message_header *message_header = msg;
4984 
4985  if (check_message_header_validity(context, msg, msg_len, system_from) == -1) {
4986  return ;
4987  }
4988 
4989  switch (message_header->type) {
 4990  case MESSAGE_TYPE_ORF_TOKEN:
 4991  instance->stats.orf_token_rx++;
4992  break;
4993  case MESSAGE_TYPE_MCAST:
4994  instance->stats.mcast_rx++;
4995  break;
 4996  case MESSAGE_TYPE_MEMB_MERGE_DETECT:
 4997  instance->stats.memb_merge_detect_rx++;
4998  break;
 4999  case MESSAGE_TYPE_MEMB_JOIN:
 5000  instance->stats.memb_join_rx++;
5001  break;
 5002  case MESSAGE_TYPE_MEMB_COMMIT_TOKEN:
 5003  instance->stats.memb_commit_token_rx++;
5004  break;
 5005  case MESSAGE_TYPE_TOKEN_HOLD_CANCEL:
 5006  instance->stats.token_hold_cancel_rx++;
5007  break;
5008  default:
 5009  log_printf (instance->totemsrp_log_level_security,
 5010  "Message received from %s has wrong type... ignoring %d.\n",
5011  totemip_sa_print((struct sockaddr *)system_from),
5012  (int)message_header->type);
5013 
5014  instance->stats.rx_msg_dropped++;
5015  return;
5016  }
5017  /*
5018  * Handle incoming message
5019  */
5020  totemsrp_message_handlers.handler_functions[(int)message_header->type] (
5021  instance,
5022  msg,
5023  msg_len,
5024  message_header->magic != TOTEM_MH_MAGIC);
5025 }
5026 
 5027  int totemsrp_iface_set (
 5028  void *context,
5029  const struct totem_ip_address *interface_addr,
5030  unsigned short ip_port,
5031  unsigned int iface_no)
5032 {
5033  struct totemsrp_instance *instance = context;
5034  int res;
5035 
5036  totemip_copy(&instance->my_addrs[iface_no], interface_addr);
5037 
5038  res = totemnet_iface_set (
5039  instance->totemnet_context,
5040  interface_addr,
5041  ip_port,
5042  iface_no);
5043 
5044  return (res);
5045 }
5046 
5047 
 5048  void main_iface_change_fn (
 5049  void *context,
5050  const struct totem_ip_address *iface_addr,
5051  unsigned int iface_no)
5052 {
5053  struct totemsrp_instance *instance = context;
5054  int num_interfaces;
5055  int i;
5056 
5057  if (!instance->my_id.nodeid) {
5058  instance->my_id.nodeid = iface_addr->nodeid;
5059  }
5060  totemip_copy (&instance->my_addrs[iface_no], iface_addr);
5061 
5062  if (instance->iface_changes++ == 0) {
5063  instance->memb_ring_id_create_or_load (&instance->my_ring_id, instance->my_id.nodeid);
5064  instance->token_ring_id_seq = instance->my_ring_id.seq;
5065  log_printf (
5066  instance->totemsrp_log_level_debug,
5067  "Created or loaded sequence id %llx.%u for this ring.",
5068  instance->my_ring_id.seq,
5069  instance->my_ring_id.rep);
5070 
5071  if (instance->totemsrp_service_ready_fn) {
5072  instance->totemsrp_service_ready_fn ();
5073  }
5074 
5075  }
5076 
5077  for (i = 0; i < instance->totem_config->interfaces[iface_no].member_count; i++) {
5078  totemsrp_member_add (instance,
5079  &instance->totem_config->interfaces[iface_no].member_list[i],
5080  iface_no);
5081  }
5082 
5083  num_interfaces = 0;
5084  for (i = 0; i < INTERFACE_MAX; i++) {
5085  if (instance->totem_config->interfaces[i].configured) {
5086  num_interfaces++;
5087  }
5088  }
5089 
5090  if (instance->iface_changes >= num_interfaces) {
5091  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_INTERFACE_CHANGE);
5092  }
5093 }
5094 
5095 void totemsrp_net_mtu_adjust (struct totem_config *totem_config) {
5096  totem_config->net_mtu -= sizeof (struct mcast);
5097 }
5098 
 5099  void totemsrp_service_ready_register (
 5100  void *context,
5101  void (*totem_service_ready) (void))
5102 {
5103  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5104 
5105  instance->totemsrp_service_ready_fn = totem_service_ready;
5106 }
5107 
 5108  int totemsrp_member_add (
 5109  void *context,
5110  const struct totem_ip_address *member,
5111  int iface_no)
5112 {
5113  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5114  int res;
5115 
5116  res = totemnet_member_add (instance->totemnet_context, &instance->my_addrs[iface_no], member, iface_no);
5117 
5118  return (res);
5119 }
5120 
 5121  int totemsrp_member_remove (
 5122  void *context,
5123  const struct totem_ip_address *member,
5124  int iface_no)
5125 {
5126  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5127  int res;
5128 
5129  res = totemnet_member_remove (instance->totemnet_context, member, iface_no);
5130 
5131  return (res);
5132 }
5133 
5134 void totemsrp_threaded_mode_enable (void *context)
5135 {
5136  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5137 
5138  instance->threaded_mode_enabled = 1;
5139 }
5140 
5141 void totemsrp_trans_ack (void *context)
5142 {
5143  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5144 
5145  instance->waiting_trans_ack = 0;
5146  instance->totemsrp_waiting_trans_ack_cb_fn (0);
5147 }
5148 
5149 
5150 int totemsrp_reconfigure (void *context, struct totem_config *totem_config)
5151 {
5152  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5153  int res;
5154 
5155  res = totemnet_reconfigure (instance->totemnet_context, totem_config);
5156  return (res);
5157 }
5158 
5159 void totemsrp_stats_clear (void *context, int flags)
5160 {
5161  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5162 
5163  memset(&instance->stats, 0, sizeof(totemsrp_stats_t));
5164  if (flags & TOTEMPG_STATS_CLEAR_TRANSPORT) {
 5165  totemnet_stats_clear (instance->totemnet_context);
 5166  }
5167 }
5168 
5169 void totemsrp_force_gather (void *context)
5170 {
5171  timer_function_orf_token_timeout(context);
5172 }
void(* totemsrp_service_ready_fn)(void)
Definition: totemsrp.c:464
unsigned int backlog
Definition: totemsrp.c:204
void(* totemsrp_deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required)
Definition: totemsrp.c:451
void(*) enum memb_stat memb_state)
Definition: totemsrp.c:443
unsigned short family
Definition: coroapi.h:113
int totemsrp_reconfigure(void *context, struct totem_config *totem_config)
Definition: totemsrp.c:5150
struct totem_message_header header
Definition: totemsrp.c:181
int totemsrp_iface_set(void *context, const struct totem_ip_address *interface_addr, unsigned short ip_port, unsigned int iface_no)
Definition: totemsrp.c:5027
gather_state_from
Definition: totemsrp.c:539
void main_iface_change_fn(void *context, const struct totem_ip_address *iface_address, unsigned int iface_no)
Definition: totemsrp.c:5048
struct srp_addr system_from
Definition: totemsrp.c:214
unsigned int nodeid
Definition: totem.h:129
struct memb_ring_id ring_id
Definition: totemsrp.c:192
int totemnet_mcast_flush_send(void *net_context, const void *msg, unsigned int msg_len)
Definition: totemnet.c:410
uint32_t waiting_trans_ack
Definition: totemsrp.c:521
struct srp_addr system_from
Definition: totemsrp.c:182
struct memb_ring_id ring_id
Definition: totemsrp.c:251
int totemsrp_log_level_debug
Definition: totemsrp.c:429
struct memb_ring_id my_ring_id
Definition: totemsrp.c:337
Totem Single Ring Protocol.
uint64_t memb_commit_token_rx
Definition: totemstats.h:65
qb_loop_timer_handle timer_orf_token_warning
Definition: totemsrp.c:402
int my_leave_memb_entries
Definition: totemsrp.c:335
unsigned int old_ring_state_high_seq_received
Definition: totemsrp.c:491
unsigned int proc_list_entries
Definition: totemsrp.c:215
uint32_t value
struct totem_interface * interfaces
Definition: totem.h:158
int totemsrp_my_family_get(void *srp_context)
Definition: totemsrp.c:1112
struct qb_list_head list
Definition: totemsrp.c:167
void(* memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition: totemsrp.c:469
void(* totemsrp_confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id)
Definition: totemsrp.c:457
The totem_ip_address struct.
Definition: coroapi.h:111
unsigned int seq
Definition: totemsrp.c:262
totemsrp_token_stats_t token[TOTEM_TOKEN_STATS_MAX]
Definition: totemstats.h:90
int totemsrp_log_level_error
Definition: totemsrp.c:423
int old_ring_state_aru
Definition: totemsrp.c:489
#define LEAVE_DUMMY_NODEID
Definition: totemsrp.c:99
unsigned int seq
Definition: totemsrp.c:199
struct memb_ring_id ring_id
Definition: totemsrp.c:241
int fcc_remcast_current
Definition: totemsrp.c:293
#define TOTEM_MH_MAGIC
Definition: totem.h:121
qb_loop_timer_handle timer_heartbeat_timeout
Definition: totemsrp.c:416
unsigned int failed_list_entries
Definition: totemsrp.c:216
unsigned char end_of_memb_join[0]
Definition: totemsrp.c:265
uint64_t mcast_rx
Definition: totemstats.h:63
unsigned long long int tv_old
Definition: totemsrp.c:3799
int totemsrp_member_add(void *context, const struct totem_ip_address *member, int iface_no)
Definition: totemsrp.c:5108
#define SEQNO_START_TOKEN
Definition: totemsrp.c:119
void totemnet_stats_clear(void *net_context)
Definition: totemnet.c:574
unsigned int token_hold_timeout
Definition: totem.h:180
unsigned int msg_len
Definition: totemsrp.c:266
int member_count
Definition: totem.h:88
struct memb_ring_id ring_id
Definition: totemsrp.c:203
struct totem_ip_address member_list[PROCESSOR_COUNT_MAX]
Definition: totem.h:95
void * token_sent_event_handle
Definition: totemsrp.c:526
int retrans_flg
Definition: totemsrp.c:206
struct srp_addr system_from
Definition: totemsrp.c:229
int my_new_memb_entries
Definition: totemsrp.c:325
totem_configuration_type
The totem_configuration_type enum.
Definition: coroapi.h:132
int addr_entries
Definition: totemsrp.c:265
int totemsrp_log_level_notice
Definition: totemsrp.c:427
unsigned int proc_list_entries
Definition: totemsrp.c:262
unsigned int totemsrp_my_nodeid_get(void *srp_context)
Definition: totemsrp.c:1101
void(* totem_memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition: totem.h:238
unsigned int my_pbl
Definition: totemsrp.c:505
int totemnet_member_remove(void *net_context, const struct totem_ip_address *member, int ring_no)
Definition: totemnet.c:524
#define TOTEM_TOKEN_STATS_MAX
Definition: totemstats.h:89
void totemsrp_net_mtu_adjust(struct totem_config *totem_config)
Definition: totemsrp.c:5095
int totemsrp_log_level_warning
Definition: totemsrp.c:425
int totemsrp_crypto_set(void *srp_context, const char *cipher_type, const char *hash_type)
Definition: totemsrp.c:1087
unsigned int my_aru
Definition: totemsrp.c:381
int totemsrp_member_remove(void *context, const struct totem_ip_address *member, int iface_no)
Definition: totemsrp.c:5121
uint64_t memb_merge_detect_rx
Definition: totemstats.h:58
int guarantee
Definition: totemsrp.c:266
struct cs_queue new_message_queue_trans
Definition: totemsrp.c:370
unsigned char end_of_commit_token[0]
Definition: totemsrp.c:255
unsigned int seq
Definition: totemsrp.c:183
unsigned char addr[TOTEMIP_ADDRLEN]
Definition: coroapi.h:77
char commit_token_storage[40000]
Definition: totemsrp.c:527
The sq struct.
Definition: sq.h:43
unsigned int set_aru
Definition: totemsrp.c:485
struct cs_queue new_message_queue
Definition: totemsrp.c:368
int my_rotation_counter
Definition: totemsrp.c:355
struct srp_addr my_deliver_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:315
uint64_t orf_token_tx
Definition: totemstats.h:55
void totemsrp_callback_token_destroy(void *srp_context, void **handle_out)
Definition: totemsrp.c:3492
uint64_t gather_token_lost
Definition: totemstats.h:71
int totemsrp_log_level_trace
Definition: totemsrp.c:431
void totemip_copy(struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
Definition: totemip.c:123
struct memb_ring_id my_old_ring_id
Definition: totemsrp.c:339
memb_state
Definition: totemsrp.c:274
unsigned int downcheck_timeout
Definition: totem.h:192
struct qb_list_head token_callback_received_listhead
Definition: totemsrp.c:385
struct srp_addr my_new_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:309
#define TOKEN_SIZE_MAX
Definition: totemsrp.c:98
uint64_t memb_commit_token_tx
Definition: totemstats.h:64
int my_deliver_memb_entries
Definition: totemsrp.c:331
unsigned int max_network_delay
Definition: totem.h:208
unsigned int heartbeat_failures_allowed
Definition: totem.h:206
unsigned int my_last_seq
Definition: totemsrp.c:493
int my_left_memb_entries
Definition: totemsrp.c:333
#define swab64(x)
The swab64 macro.
Definition: swab.h:65
struct message_item __attribute__
unsigned long long token_ring_id_seq
Definition: totemsrp.c:481
struct totem_ip_address mcast_address
Definition: totemsrp.c:449
void(* totemsrp_log_printf)(int level, int subsys, const char *function, const char *file, int line, const char *format,...) __attribute__((format(printf
Definition: totemsrp.c:435
int totemsrp_callback_token_create(void *srp_context, void **handle_out, enum totem_callback_token_type type, int delete, int(*callback_fn)(enum totem_callback_token_type type, const void *), const void *data)
Definition: totemsrp.c:3457
unsigned int send_join_timeout
Definition: totem.h:186
unsigned int window_size
Definition: totem.h:210
int guarantee
Definition: totemsrp.c:187
unsigned int seq
Definition: totemsrp.c:193
void totemsrp_service_ready_register(void *context, void(*totem_service_ready)(void))
Definition: totemsrp.c:5099
struct mcast * mcast
Definition: totemsrp.c:270
struct srp_addr my_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:313
uint64_t operational_entered
Definition: totemstats.h:68
void(*) in log_level_security)
Definition: totem.h:106
unsigned long long ring_seq
Definition: totemsrp.c:217
#define INTERFACE_MAX
Definition: coroapi.h:88
int totemsrp_mcast(void *srp_context, struct iovec *iovec, unsigned int iov_len, int guarantee)
Multicast a message.
Definition: totemsrp.c:2459
message_type
Definition: totemsrp.c:143
uint64_t operational_token_lost
Definition: totemstats.h:69
unsigned int received_flg
Definition: totemsrp.c:263
uint64_t consensus_timeouts
Definition: totemstats.h:76
unsigned int aru_addr
Definition: totemsrp.c:202
Totem Network interface - also does encryption/decryption.
unsigned int my_high_delivered
Definition: totemsrp.c:383
struct message_handlers totemsrp_message_handlers
Definition: totemsrp.c:677
qb_loop_timer_handle memb_timer_state_gather_consensus_timeout
Definition: totemsrp.c:412
uint64_t recovery_token_lost
Definition: totemstats.h:75
int totemnet_recv_flush(void *net_context)
Definition: totemnet.c:378
int totemnet_token_target_set(void *net_context, unsigned int nodeid)
Definition: totemnet.c:481
int totemnet_ifaces_get(void *net_context, char ***status, unsigned int *iface_count)
Definition: totemnet.c:468
unsigned int backlog
Definition: totemsrp.c:266
int this_seqno
Definition: totemsrp.c:184
unsigned char end_of_memb_join[0]
Definition: totemsrp.c:218
unsigned int token_retransmits_before_loss_const
Definition: totem.h:182
uint8_t configured
Definition: totem.h:87
totemsrp_stats_t * srp
Definition: totemstats.h:96
struct rtr_item rtr_list[0]
Definition: totemsrp.c:270
unsigned int retrans_flg
Definition: totemsrp.c:252
struct memb_ring_id ring_id
Definition: totemsrp.c:185
unsigned int seqno_unchanged_const
Definition: totem.h:196
uint64_t commit_token_lost
Definition: totemstats.h:73
unsigned int miss_count_const
Definition: totem.h:232
uint64_t token_hold_cancel_rx
Definition: totemstats.h:67
int totemnet_crypto_set(void *net_context, const char *cipher_type, const char *hash_type)
Definition: totemnet.c:276
unsigned int join_timeout
Definition: totem.h:184
unsigned int aru
Definition: totemsrp.c:242
uint32_t originated_orf_token
Definition: totemsrp.c:517
unsigned int rep
Definition: totem.h:148
void * totemnet_buffer_alloc(void *net_context)
Definition: totemnet.c:351
unsigned int nodeid
Definition: coroapi.h:112
uint32_t flags
uint64_t pause_timestamp
Definition: totemsrp.c:509
int my_set_retrans_flg
Definition: totemsrp.c:357
struct totem_ip_address mcast_addr
Definition: totem.h:84
#define MESSAGE_QUEUE_MAX
Definition: coroapi.h:98
int totemnet_recv_mcast_empty(void *net_context)
Definition: totemnet.c:493
unsigned int received_flg
Definition: totemsrp.c:244
unsigned int my_cbl
Definition: totemsrp.c:507
unsigned int last_released
Definition: totemsrp.c:483
int orf_token_retransmit_size
Definition: totemsrp.c:391
int totemsrp_avail(void *srp_context)
Return number of available messages that can be queued.
Definition: totemsrp.c:2530
uint64_t mcast_retx
Definition: totemstats.h:62
int totemnet_initialize(qb_loop_t *loop_pt, void **net_context, struct totem_config *totem_config, totemsrp_stats_t *stats, void *context, void(*deliver_fn)(void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from), void(*iface_change_fn)(void *context, const struct totem_ip_address *iface_address, unsigned int ring_no), void(*mtu_changed)(void *context, int net_mtu), void(*target_set_completed)(void *context))
Definition: totemnet.c:301
unsigned int msg_len
Definition: totemsrp.c:271
#define RETRANS_MESSAGE_QUEUE_SIZE_MAX
Definition: totemsrp.c:94
#define LOGSYS_LEVEL_DEBUG
Definition: logsys.h:76
unsigned int fail_to_recv_const
Definition: totem.h:194
int totemnet_send_flush(void *net_context)
Definition: totemnet.c:388
#define TOTEM_MH_VERSION
Definition: totem.h:122
unsigned int token_seq
Definition: totemsrp.c:200
void totemsrp_stats_clear(void *context, int flags)
Definition: totemsrp.c:5159
int totemnet_mcast_noflush_send(void *net_context, const void *msg, unsigned int msg_len)
Definition: totemnet.c:423
struct mcast * mcast
Definition: totemsrp.c:265
void * token_recv_event_handle
Definition: totemsrp.c:525
struct totem_ip_address boundto
Definition: totem.h:83
unsigned int my_high_seq_received
Definition: totemsrp.c:351
qb_loop_t * totemsrp_poll_handle
Definition: totemsrp.c:447
int totemnet_finalize(void *net_context)
Definition: totemnet.c:290
totem_event_type
Definition: totem.h:259
qb_loop_timer_handle timer_pause_timeout
Definition: totemsrp.c:398
qb_loop_timer_handle timer_merge_detect_timeout
Definition: totemsrp.c:408
int old_ring_state_saved
Definition: totemsrp.c:487
int my_merge_detect_timeout_outstanding
Definition: totemsrp.c:343
uint64_t rx_msg_dropped
Definition: totemstats.h:77
void(* log_printf)(int level, int subsys, const char *function_name, const char *file_name, int file_line, const char *format,...) __attribute__((format(printf
Definition: totem.h:99
int totemsrp_log_level_security
Definition: totemsrp.c:421
qb_loop_timer_handle timer_orf_token_retransmit_timeout
Definition: totemsrp.c:404
struct totem_config * totem_config
Definition: totemsrp.c:499
int(* callback_fn)(enum totem_callback_token_type type, const void *)
Definition: totemsrp.c:168
struct totem_message_header header
Definition: totemsrp.c:260
#define swab32(x)
The swab32 macro.
Definition: swab.h:51
qb_loop_timer_handle timer_orf_token_timeout
Definition: totemsrp.c:400
uint32_t continuous_gather
Definition: totemstats.h:78
void main_deliver_fn(void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from)
Definition: totemsrp.c:4976
void totemsrp_threaded_mode_enable(void *context)
Definition: totemsrp.c:5134
int totemnet_token_send(void *net_context, const void *msg, unsigned int msg_len)
Definition: totemnet.c:398
unsigned int aru
Definition: totemsrp.c:263
encapsulation_type
Definition: totemsrp.c:152
unsigned int net_mtu
Definition: totem.h:202
int totemsrp_ifaces_get(void *srp_context, unsigned int nodeid, unsigned int *interface_id, struct totem_ip_address *interfaces, unsigned int interfaces_size, char ***status, unsigned int *iface_count)
Definition: totemsrp.c:1049
int totemsrp_initialize(qb_loop_t *poll_handle, void **srp_context, struct totem_config *totem_config, totempg_stats_t *stats, void(*deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required), void(*confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id), void(*waiting_trans_ack_cb_fn)(int waiting_trans_ack))
Create a protocol instance.
Definition: totemsrp.c:818
struct totem_message_header header
Definition: totemsrp.c:249
void totemsrp_event_signal(void *srp_context, enum totem_event_type type, int value)
Definition: totemsrp.c:2450
void(* totem_memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition: totem.h:242
unsigned int node_id
Definition: totemsrp.c:186
uint32_t orf_token_discard
Definition: totemsrp.c:515
int my_failed_list_entries
Definition: totemsrp.c:323
struct srp_addr my_id
Definition: totemsrp.c:301
struct srp_addr my_left_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:317
uint64_t token_hold_cancel_tx
Definition: totemstats.h:66
const char * totemip_sa_print(const struct sockaddr *sa)
Definition: totemip.c:242
unsigned int token_timeout
Definition: totem.h:174
Definition: totemsrp.c:240
struct totem_message_header header
Definition: totemsrp.c:198
unsigned int high_delivered
Definition: totemsrp.c:243
unsigned int consensus_timeout
Definition: totem.h:188
totemsrp_stats_t stats
Definition: totemsrp.c:513
void totemsrp_force_gather(void *context)
Definition: totemsrp.c:5169
#define PROCESSOR_COUNT_MAX
Definition: coroapi.h:96
struct totem_ip_address my_addrs[INTERFACE_MAX]
Definition: totemsrp.c:303
uint64_t mcast_tx
Definition: totemstats.h:61
char orf_token_retransmit[TOKEN_SIZE_MAX]
Definition: totemsrp.c:389
unsigned int token_warning
Definition: totem.h:176
struct sq regular_sort_queue
Definition: totemsrp.c:374
int my_retrans_flg_count
Definition: totemsrp.c:359
The memb_ring_id struct.
Definition: coroapi.h:122
#define SEQNO_START_MSG
Definition: totemsrp.c:118
#define swab16(x)
The swab16 macro.
Definition: swab.h:39
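swab16() is the 16-bit counterpart of swab32() shown earlier; again, the macro in corosync/swab.h is the real definition and this is only an illustrative equivalent:

#include <stdint.h>

/* Illustrative only: byte-reverse a 16-bit word, e.g. 0x1122 -> 0x2211. */
static inline uint16_t example_swab16 (uint16_t x)
{
	return (uint16_t)((x << 8) | (x >> 8));
}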
struct totem_message_header header
Definition: totemsrp.c:235
void totemsrp_finalize(void *srp_context)
Definition: totemsrp.c:1026
struct totem_message_header header
Definition: totemsrp.c:228
#define QUEUE_RTR_ITEMS_SIZE_MAX
Definition: totemsrp.c:93
struct srp_addr my_failed_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:307
struct cs_queue retrans_message_queue
Definition: totemsrp.c:372
unsigned int aru
Definition: totemsrp.c:201
const char * gather_state_from_desc[]
Definition: totemsrp.c:559
qb_loop_timer_handle memb_timer_state_gather_join_timeout
Definition: totemsrp.c:410
int my_trans_memb_entries
Definition: totemsrp.c:327
unsigned int my_trc
Definition: totemsrp.c:503
void(* totemsrp_waiting_trans_ack_cb_fn)(int waiting_trans_ack)
Definition: totemsrp.c:466
uint64_t memb_merge_detect_tx
Definition: totemstats.h:57
unsigned int high_delivered
Definition: totemsrp.c:262
struct rtr_item rtr_list[0]
Definition: totemsrp.c:208
int consensus_list_entries
Definition: totemsrp.c:297
uint64_t memb_join_rx
Definition: totemstats.h:60
int totemnet_member_add(void *net_context, const struct totem_ip_address *local, const struct totem_ip_address *member, int ring_no)
Definition: totemnet.c:504
unsigned char end_of_commit_token[0]
Definition: totemsrp.c:266
#define FRAME_SIZE_MAX
Definition: totem.h:52
int rtr_list_entries
Definition: totemsrp.c:207
uint32_t threaded_mode_enabled
Definition: totemsrp.c:519
enum totem_callback_token_type callback_type
Definition: totemsrp.c:169
int my_proc_list_entries
Definition: totemsrp.c:321
unsigned long long ring_seq
Definition: totemsrp.c:264
struct totem_message_header header
Definition: totemsrp.c:213
struct totem_logging_configuration totem_logging_configuration
Definition: totem.h:200
struct memb_ring_id ring_id
Definition: totemsrp.c:236
#define log_printf(level, format, args...)
Definition: totemsrp.c:689
unsigned long long seq
Definition: coroapi.h:124
void totemsrp_trans_ack(void *context)
Definition: totemsrp.c:5141
unsigned int max_messages
Definition: totem.h:212
uint64_t recovery_entered
Definition: totemstats.h:74
qb_loop_timer_handle memb_timer_state_commit_timeout
Definition: totemsrp.c:414
void totemnet_buffer_release(void *net_context, void *ptr)
Definition: totemnet.c:359
int totemnet_reconfigure(void *net_context, struct totem_config *totem_config)
Definition: totemnet.c:560
struct memb_commit_token * commit_token
Definition: totemsrp.c:511
struct consensus_list_item consensus_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:295
struct srp_addr system_from
Definition: totemsrp.c:261
struct srp_addr addr
Definition: totemsrp.c:161
char type
Definition: totem.h:55
struct srp_addr my_proc_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:305
void(* memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, unsigned int nodeid)
Definition: totemsrp.c:473
int(* handler_functions[6])(struct totemsrp_instance *instance, const void *msg, size_t msg_len, int endian_conversion_needed)
Definition: totemsrp.c:532
int totemsrp_subsys_id
Definition: totemsrp.c:433
unsigned int merge_timeout
Definition: totem.h:190
unsigned int use_heartbeat
Definition: totemsrp.c:501
int totemnet_iface_check(void *net_context)
Definition: totemnet.c:436
unsigned int token_retransmit_timeout
Definition: totem.h:178
struct qb_list_head token_callback_sent_listhead
Definition: totemsrp.c:387
int rtr_list_entries
Definition: totemsrp.c:269
struct srp_addr my_trans_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:311
#define RETRANSMIT_ENTRIES_MAX
Definition: totemsrp.c:97
unsigned short magic
Definition: totem.h:125
unsigned int token_seq
Definition: totemsrp.c:250
unsigned int my_token_seq
Definition: totemsrp.c:393
struct memb_ring_id ring_id
Definition: totemsrp.c:264
unsigned int my_last_aru
Definition: totemsrp.c:345
unsigned int my_leave_memb_list[PROCESSOR_COUNT_MAX]
Definition: totemsrp.c:319
unsigned int nodeid
Definition: totemsrp.c:105
void * totemnet_context
Definition: totemsrp.c:497
uint64_t commit_entered
Definition: totemstats.h:72
qb_loop_timer_handle timer_orf_token_hold_retransmit_timeout
Definition: totemsrp.c:406
int totemnet_iface_set(void *net_context, const struct totem_ip_address *interface_addr, unsigned short ip_port, unsigned int iface_no)
Definition: totemnet.c:455
struct memb_ring_id ring_id
Definition: totemsrp.c:230
unsigned int my_install_seq
Definition: totemsrp.c:353
uint64_t orf_token_rx
Definition: totemstats.h:56
unsigned int threads
Definition: totem.h:204
unsigned int failed_list_entries
Definition: totemsrp.c:263
struct sq recovery_sort_queue
Definition: totemsrp.c:376
#define TOTEMPG_STATS_CLEAR_TRANSPORT
Definition: totemstats.h:116
totem_callback_token_type
The totem_callback_token_type enum.
Definition: coroapi.h:142
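Token callbacks registered against the enum above are invoked as the token circulates; a conforming callback has the callback_fn shape listed earlier in this index. A minimal hedged sketch follows; the function name is illustrative, and the registration call itself (part of the totemsrp token-callback API) is not shown here.

/* Hedged sketch of a token callback body; the const void * argument is the
 * opaque data pointer supplied at registration time. */
static int example_token_callback (enum totem_callback_token_type type,
	const void *data)
{
	/* perform per-token work here */
	return (0);
}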
unsigned int my_high_ring_delivered
Definition: totemsrp.c:361
int totemnet_processor_count_set(void *net_context, int processor_count)
Definition: totemnet.c:367
unsigned int fcc
Definition: totemsrp.c:205