57 #include <sys/types.h> 59 #include <sys/socket.h> 62 #include <sys/ioctl.h> 63 #include <sys/param.h> 64 #include <netinet/in.h> 65 #include <arpa/inet.h> 78 #include <qb/qbdefs.h> 79 #include <qb/qbutil.h> 80 #include <qb/qbloop.h> 86 #define LOGSYS_UTILS_ONLY 1 95 #define LOCALHOST_IP inet_addr("127.0.0.1") 96 #define QUEUE_RTR_ITEMS_SIZE_MAX 16384 97 #define RETRANS_MESSAGE_QUEUE_SIZE_MAX 16384 98 #define RECEIVED_MESSAGE_QUEUE_SIZE_MAX 500 100 #define RETRANSMIT_ENTRIES_MAX 30 101 #define TOKEN_SIZE_MAX 64000 102 #define LEAVE_DUMMY_NODEID 0 114 #define SEQNO_START_MSG 0x0 115 #define SEQNO_START_TOKEN 0x0 137 #define ENDIAN_LOCAL 0xff22 374 struct sq regular_sort_queue;
376 struct sq recovery_sort_queue;
/*
 * Logging callback, reached through instance->totemsrp_log_printf by the
 * log_printf and LOGSYS_PERROR macros in this file.  Those macros pass a
 * level, a subsystem id, __FUNCTION__, __FILE__ and __LINE__ ahead of the
 * format string, which is why the format attribute names argument 6 as the
 * printf format and argument 7 as the first variadic argument to check.
 *
 * The declaration ends with a single ';' - a second one would form an empty
 * declaration, which pedantic compilers reject.
 */
433 void (*totemsrp_log_printf) (
436 const char *
function,
439 const char *format, ...)
__attribute__((format(printf, 6, 7)));
449 void (*totemsrp_deliver_fn) (
452 unsigned int msg_len,
453 int endian_conversion_required);
455 void (*totemsrp_confchg_fn) (
457 const unsigned int *member_list,
size_t member_list_entries,
458 const unsigned int *left_list,
size_t left_list_entries,
459 const unsigned int *joined_list,
size_t joined_list_entries,
462 void (*totemsrp_service_ready_fn) (void);
464 void (*totemsrp_waiting_trans_ack_cb_fn) (
465 int waiting_trans_ack);
467 void (*memb_ring_id_create_or_load) (
471 void (*memb_ring_id_store) (
472 const struct memb_ring_id *memb_ring_id,
525 char commit_token_storage[40000];
530 int (*handler_functions[6]) (
534 int endian_conversion_needed);
579 static int message_handler_orf_token (
583 int endian_conversion_needed);
585 static int message_handler_mcast (
589 int endian_conversion_needed);
591 static int message_handler_memb_merge_detect (
595 int endian_conversion_needed);
597 static int message_handler_memb_join (
601 int endian_conversion_needed);
603 static int message_handler_memb_commit_token (
607 int endian_conversion_needed);
609 static int message_handler_token_hold_cancel (
613 int endian_conversion_needed);
617 static unsigned int main_msgs_missing (
void);
619 static void main_token_seqid_get (
622 unsigned int *token_is);
624 static void srp_addr_copy (
struct srp_addr *dest,
const struct srp_addr *src);
626 static void srp_addr_to_nodeid (
627 unsigned int *nodeid_out,
629 unsigned int entries);
631 static int srp_addr_equal (
const struct srp_addr *a,
const struct srp_addr *b);
637 static void messages_deliver_to_app (
struct totemsrp_instance *instance,
int skip,
unsigned int end_point);
639 int fcc_mcasts_allowed);
640 static void messages_free (
struct totemsrp_instance *instance,
unsigned int token_aru);
644 static void target_set_completed (
void *context);
646 static void memb_state_commit_token_target_set (
struct totemsrp_instance *instance);
651 static void orf_token_endian_convert (
const struct orf_token *in,
struct orf_token *out);
652 static void memb_commit_token_endian_convert (
const struct memb_commit_token *in,
struct memb_commit_token *out);
653 static void memb_join_endian_convert (
const struct memb_join *in,
struct memb_join *out);
654 static void mcast_endian_convert (
const struct mcast *in,
struct mcast *out);
655 static void memb_merge_detect_endian_convert (
658 static void srp_addr_copy_endian_convert (
struct srp_addr *out,
const struct srp_addr *in);
659 static void timer_function_orf_token_timeout (
void *data);
660 static void timer_function_pause_timeout (
void *data);
661 static void timer_function_heartbeat_timeout (
void *data);
662 static void timer_function_token_retransmit_timeout (
void *data);
663 static void timer_function_token_hold_retransmit_timeout (
void *data);
664 static void timer_function_merge_detect_timeout (
void *data);
666 static void totemsrp_buffer_release (
struct totemsrp_instance *instance,
void *ptr);
672 unsigned int msg_len);
677 unsigned int iface_no);
682 message_handler_orf_token,
683 message_handler_mcast,
684 message_handler_memb_merge_detect,
685 message_handler_memb_join,
686 message_handler_memb_commit_token,
687 message_handler_token_hold_cancel
691 #define log_printf(level, format, args...) \ 693 instance->totemsrp_log_printf ( \ 694 level, instance->totemsrp_subsys_id, \ 695 __FUNCTION__, __FILE__, __LINE__, \ 698 #define LOGSYS_PERROR(err_num, level, fmt, args...) \ 700 char _error_str[LOGSYS_MAX_PERROR_MSG_LEN]; \ 701 const char *_error_ptr = qb_strerror_r(err_num, _error_str, sizeof(_error_str)); \ 702 instance->totemsrp_log_printf ( \ 703 level, instance->totemsrp_subsys_id, \ 704 __FUNCTION__, __FILE__, __LINE__, \ 705 fmt ": %s (%d)\n", ##args, _error_ptr, err_num); \ 711 return gather_state_from_desc[gsfrom];
751 static void main_token_seqid_get (
754 unsigned int *token_is)
766 static unsigned int main_msgs_missing (
void)
775 uint64_t timestamp_msec;
778 now_msec = (qb_util_nano_current_get () / QB_TIME_NS_IN_MSEC);
783 "Process pause detected for %d ms, flushing membership messages.", (
unsigned int)(now_msec - timestamp_msec));
798 unsigned long long nano_secs = qb_util_nano_current_get ();
800 time_now = (nano_secs / QB_TIME_NS_IN_MSEC);
833 qb_loop_t *poll_handle,
841 unsigned int msg_len,
842 int endian_conversion_required),
846 const unsigned int *member_list,
size_t member_list_entries,
847 const unsigned int *left_list,
size_t left_list_entries,
848 const unsigned int *joined_list,
size_t joined_list_entries,
850 void (*waiting_trans_ack_cb_fn) (
857 if (instance == NULL) {
861 totemsrp_instance_initialize (instance);
899 "Token Timeout (%d ms) retransmit timeout (%d ms)",
902 "token hold (%d ms) retransmits before loss (%d retrans)",
905 "join (%d ms) send_join (%d ms) consensus (%d ms) merge (%d ms)",
912 "downcheck (%d ms) fail to recv const (%d msgs)",
918 "window size per rotation (%d messages) maximum messages per rotation (%d messages)",
922 "missed count const (%d messages)",
926 "send threads (%d threads)", totem_config->
threads);
928 "RRP token expired timeout (%d ms)",
931 "RRP token problem counter (%d ms)",
934 "RRP threshold (%d problem count)",
937 "RRP multicast threshold (%d problem count)",
940 "RRP automatic recovery check timeout (%d ms)",
967 timer_function_pause_timeout (instance);
971 "HeartBeat is Disabled. To enable set heartbeat_failures_allowed > 0");
982 "total heartbeat_timeout (%d ms) is not less than token timeout (%d ms)",
986 "heartbeat_timeout = heartbeat_failures_allowed * token_retransmit_timeout + max_network_delay");
988 "heartbeat timeout should be less than the token timeout. Heartbeat is disabled!!");
1004 main_iface_change_fn,
1005 main_token_seqid_get,
1007 target_set_completed);
1027 token_event_stats_collector,
1033 token_event_stats_collector,
1035 *srp_context = instance;
1048 memb_leave_message_send (instance);
1068 unsigned int nodeid,
1070 unsigned int interfaces_size,
1072 unsigned int *iface_count)
1076 unsigned int found = 0;
1089 if (interfaces_size >= *iface_count) {
1109 if (interfaces_size >= *iface_count) {
1126 const char *cipher_type,
1127 const char *hash_type)
/*
 * Equality predicate over two srp_addr values.  Callers in this file use the
 * result directly as a boolean condition.
 */
1176 static int srp_addr_equal (
const struct srp_addr *a,
const struct srp_addr *b)
/*
 * The loop bound is the literal 1, so only index 0 is visited.
 * NOTE(review): the comparison performed inside the loop is not visible in
 * this excerpt - confirm which fields are compared.
 */
1181 for (i = 0; i < 1; i++) {
1190 static void srp_addr_copy (
struct srp_addr *dest,
const struct srp_addr *src)
/*
 * Flattens an array of srp_addr into an array of node ids.
 *
 * nodeid_out  destination array; must hold at least `entries` elements
 * entries     number of elements to convert
 *
 * The source array is referenced as srp_addr_in in the body.
 */
1201 static void srp_addr_to_nodeid (
1202 unsigned int *nodeid_out,
1204 unsigned int entries)
1208 for (i = 0; i < entries; i++) {
/* Only the first address (addr[0]) of each entry supplies the node id. */
1209 nodeid_out[i] = srp_addr_in[i].
addr[0].
nodeid;
1213 static void srp_addr_copy_endian_convert (
struct srp_addr *out,
const struct srp_addr *in)
/*
 * Builds out_list from entries of one_list, comparing each of them against
 * every entry of two_list with srp_addr_equal.
 *
 * out_list / out_list_entries   result array and its element count (output)
 * one_list / one_list_entries   set that candidates are taken from
 * two_list / two_list_entries   set that candidates are compared against
 *
 * NOTE(review): the condition guarding the copy is not visible in this
 * excerpt; going by the name, entries matched in two_list are presumably the
 * ones left out - confirm.  No capacity check on out_list is visible either.
 */
1227 static void memb_set_subtract (
1228 struct srp_addr *out_list,
int *out_list_entries,
1229 struct srp_addr *one_list,
int one_list_entries,
1230 struct srp_addr *two_list,
int two_list_entries)
/* The result always starts empty, whatever the caller left in the counter. */
1236 *out_list_entries = 0;
1238 for (i = 0; i < one_list_entries; i++) {
1239 for (j = 0; j < two_list_entries; j++) {
1240 if (srp_addr_equal (&one_list[i], &two_list[j])) {
/* Append one_list[i] at the current end of out_list and grow the count. */
1246 srp_addr_copy (&out_list[*out_list_entries], &one_list[i]);
1247 *out_list_entries = *out_list_entries + 1;
1256 static void memb_consensus_set (
1283 static int memb_consensus_isset (
1300 static int memb_consensus_agreed (
1303 struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
1304 int token_memb_entries = 0;
1308 memb_set_subtract (token_memb, &token_memb_entries,
1312 for (i = 0; i < token_memb_entries; i++) {
1313 if (memb_consensus_isset (instance, &token_memb[i]) == 0) {
1328 assert (token_memb_entries >= 1);
1333 static void memb_consensus_notset (
1335 struct srp_addr *no_consensus_list,
1336 int *no_consensus_list_entries,
1338 int comparison_list_entries)
1342 *no_consensus_list_entries = 0;
1345 if (memb_consensus_isset (instance, &instance->
my_proc_list[i]) == 0) {
1346 srp_addr_copy (&no_consensus_list[*no_consensus_list_entries], &instance->
my_proc_list[i]);
1347 *no_consensus_list_entries = *no_consensus_list_entries + 1;
/*
 * Compares two srp_addr sets.
 *
 * set1 / set1_entries   first set and its element count
 * set2 / set2_entries   second set and its element count
 *
 * NOTE(review): the values returned on each path are not visible in this
 * excerpt.
 */
1355 static int memb_set_equal (
1356 struct srp_addr *set1,
int set1_entries,
1357 struct srp_addr *set2,
int set2_entries)
/* Differing element counts are handled before any element is compared. */
1364 if (set1_entries != set2_entries) {
/* Each element of set2 is searched for in set1 using srp_addr_equal. */
1367 for (i = 0; i < set2_entries; i++) {
1368 for (j = 0; j < set1_entries; j++) {
1369 if (srp_addr_equal (&set1[j], &set2[i])) {
/*
 * Checks the elements of `subset` against `fullset`.
 *
 * subset / subset_entries     candidate subset and its element count
 * fullset / fullset_entries   reference set and its element count
 *
 * NOTE(review): the values returned on each path are not visible in this
 * excerpt.
 */
1385 static int memb_set_subset (
1386 const struct srp_addr *subset,
int subset_entries,
1387 const struct srp_addr *fullset,
int fullset_entries)
/* A candidate larger than the reference set is handled without scanning. */
1393 if (subset_entries > fullset_entries) {
/* Each subset element is searched for in fullset using srp_addr_equal. */
1396 for (i = 0; i < subset_entries; i++) {
1397 for (j = 0; j < fullset_entries; j++) {
1398 if (srp_addr_equal (&subset[i], &fullset[j])) {
/*
 * Merges entries of `subset` into `fullset`, growing *fullset_entries for
 * every entry appended.
 *
 * subset / subset_entries     entries to merge in (read only)
 * fullset / fullset_entries   destination set and its element count (in/out)
 *
 * NOTE(review): no capacity check on fullset is visible in this excerpt; the
 * caller presumably guarantees room for the appended entries - confirm.
 */
1412 static void memb_set_merge (
1413 const struct srp_addr *subset,
int subset_entries,
1414 struct srp_addr *fullset,
int *fullset_entries)
1420 for (i = 0; i < subset_entries; i++) {
/* Look for subset[i] among the entries currently in fullset. */
1421 for (j = 0; j < *fullset_entries; j++) {
1422 if (srp_addr_equal (&fullset[j], &subset[i])) {
/* Append subset[i] at the end of fullset and bump the element count. */
1428 srp_addr_copy (&fullset[*fullset_entries], &subset[i]);
1429 *fullset_entries = *fullset_entries + 1;
/*
 * Builds the `and` output set from entries of set1 that also appear in set2,
 * with a ring id comparison against old_ring_id for each match.
 *
 * NOTE(review): the parameter list is not visible in this excerpt; the names
 * used below (set1, set1_ring_ids, set2, old_ring_id, and, and_entries) are
 * taken from the body.
 */
1436 static void memb_set_and_with_ring_id (
1452 for (i = 0; i < set2_entries; i++) {
1453 for (j = 0; j < set1_entries; j++) {
1454 if (srp_addr_equal (&set1[j], &set2[i])) {
/* Byte-wise comparison of the matched entry's ring id with old_ring_id. */
1455 if (memcmp (&set1_ring_ids[j], old_ring_id,
sizeof (
struct memb_ring_id)) == 0) {
/* Append the matching set1 entry to the result and grow its count. */
1462 srp_addr_copy (&and[*and_entries], &set1[j]);
1463 *and_entries = *and_entries + 1;
/*
 * Debug helper, compiled only when CODE_COVERAGE is defined: dumps a list of
 * srp_addr entries to stdout with printf.
 */
1470 #ifdef CODE_COVERAGE 1471 static void memb_set_print (
/* Header line: the caller-supplied label and the number of entries. */
1478 printf (
"List '%s' contains %d entries:\n",
string, list_entries);
1480 for (i = 0; i < list_entries; i++) {
/* Per entry: its index and how many addresses (no_addrs) it carries. */
1481 printf (
"Address %d with %d rings\n", i, list[i].
no_addrs);
1482 for (j = 0; j < list[i].
no_addrs; j++) {
/* Per address: printable form from totemip_print, then the family. */
1483 printf (
"\tiface %d %s\n", j,
totemip_print (&list[i].addr[j]));
1484 printf (
"\tfamily %d\n", list[i].addr[j].
family);
1489 static void my_leave_memb_clear(
1496 static unsigned int my_leave_memb_match(
1498 unsigned int nodeid)
1501 unsigned int ret = 0;
1512 static void my_leave_memb_set(
1514 unsigned int nodeid)
1531 "Cannot set LEAVE nodeid=%d", nodeid);
1538 assert (instance != NULL);
1542 static void totemsrp_buffer_release (
struct totemsrp_instance *instance,
void *ptr)
1544 assert (instance != NULL);
1558 timer_function_token_retransmit_timeout,
1561 log_printf(instance->totemsrp_log_level_error,
"reset_token_retransmit_timeout - qb_loop_timer_add error : %d", res);
1575 timer_function_merge_detect_timeout,
1578 log_printf(instance->totemsrp_log_level_error,
"start_merge_detect_timeout - qb_loop_timer_add error : %d", res);
1581 instance->my_merge_detect_timeout_outstanding = 1;
1604 "Saving state aru %x high seq received %x",
1614 "Restoring instance->my_aru %x my high seq received %x",
1621 "Resetting old ring state");
1634 timer_function_pause_timeout,
1637 log_printf(instance->totemsrp_log_level_error,
"reset_pause_timeout - qb_loop_timer_add error : %d", res);
1649 timer_function_orf_token_timeout,
1652 log_printf(instance->totemsrp_log_level_error,
"reset_token_timeout - qb_loop_timer_add error : %d", res);
1664 timer_function_heartbeat_timeout,
1667 log_printf(instance->totemsrp_log_level_error,
"reset_heartbeat_timeout - qb_loop_timer_add error : %d", res);
1680 static void cancel_token_retransmit_timeout (
struct totemsrp_instance *instance)
1685 static void start_token_hold_retransmit_timeout (
struct totemsrp_instance *instance)
1693 timer_function_token_hold_retransmit_timeout,
1696 log_printf(instance->totemsrp_log_level_error,
"start_token_hold_retransmit_timeout - qb_loop_timer_add error : %d", res);
1700 static void cancel_token_hold_retransmit_timeout (
struct totemsrp_instance *instance)
1706 static void memb_state_consensus_timeout_expired (
1709 struct srp_addr no_consensus_list[PROCESSOR_COUNT_MAX];
1710 int no_consensus_list_entries;
1713 if (memb_consensus_agreed (instance)) {
1714 memb_consensus_reset (instance);
1716 memb_consensus_set (instance, &instance->
my_id);
1718 reset_token_timeout (instance);
1720 memb_consensus_notset (
1723 &no_consensus_list_entries,
1727 memb_set_merge (no_consensus_list, no_consensus_list_entries,
1740 static void timer_function_pause_timeout (
void *data)
1745 reset_pause_timeout (instance);
1750 old_ring_state_restore (instance);
1755 static void timer_function_orf_token_timeout (
void *data)
1762 "The token was lost in the OPERATIONAL state.");
1764 "A processor failed, forming new configuration.");
1772 "The consensus timeout expired.");
1773 memb_state_consensus_timeout_expired (instance);
1780 "The token was lost in the COMMIT state.");
1787 "The token was lost in the RECOVERY state.");
1788 memb_recovery_state_token_loss (instance);
1794 static void timer_function_heartbeat_timeout (
void *data)
1798 "HeartBeat Timer expired Invoking token loss mechanism in state %d ", instance->
memb_state);
1799 timer_function_orf_token_timeout(data);
1802 static void memb_timer_function_state_gather (
void *data)
1814 memb_join_message_send (instance);
1825 memb_timer_function_state_gather,
1826 &instance->memb_timer_state_gather_join_timeout);
1829 log_printf(instance->totemsrp_log_level_error,
"memb_timer_function_state_gather - qb_loop_timer_add error : %d", res);
1835 static void memb_timer_function_gather_consensus_timeout (
void *data)
1838 memb_state_consensus_timeout_expired (instance);
/*
 * Walks a range of sequence numbers and, for each recovery message item
 * found, derives a regular_message_item from it.
 *
 * instance   the totemsrp instance being operated on
 *
 * Fix: the argument in front of mcast->seq had been mangled into
 * "(R)egular_message_item" (an HTML "&reg" entity substitution).  It is the
 * address of the local regular_message_item built just above.
 */
1841 static void deliver_messages_from_recovery_to_regular (
struct totemsrp_instance *instance)
1846 unsigned int range = 0;
/* Sequence offsets are 1-based and inclusive of `range`. */
1859 for (i = 1; i <= range; i++) {
1865 recovery_message_item = ptr;
1870 mcast = recovery_message_item->
mcast;
/*
 * The regular item points sizeof (struct mcast) bytes past the start of the
 * recovery item's buffer, and its length shrinks by the same amount, so the
 * leading struct mcast of the recovery message is skipped.
 */
1876 regular_message_item.
mcast =
1877 (
struct mcast *)(((
char *)recovery_message_item->
mcast) +
sizeof (
struct mcast));
1878 regular_message_item.
msg_len =
1879 recovery_message_item->
msg_len -
sizeof (
struct mcast);
/* From here on `mcast` refers to the inner (regular) message header. */
1880 mcast = regular_message_item.
mcast;
1889 "comparing if ring id is for this processors old ring seqno %d",
/* Hand over the address of the local item together with its sequence number. */
1903 &regular_message_item, mcast->
seq);
1910 "-not adding msg with seq no %x", mcast->
seq);
1920 struct srp_addr joined_list[PROCESSOR_COUNT_MAX];
1921 int joined_list_entries = 0;
1922 unsigned int aru_save;
1929 char left_node_msg[1024];
1930 char joined_node_msg[1024];
1931 char failed_node_msg[1024];
1935 memb_consensus_reset (instance);
1937 old_ring_state_reset (instance);
1939 deliver_messages_from_recovery_to_regular (instance);
1942 "Delivering to app %x to %x",
1945 aru_save = instance->
my_aru;
1958 memb_set_subtract (joined_list, &joined_list_entries,
1986 srp_addr_to_nodeid (trans_memb_list_totemip,
1999 instance->
my_aru = aru_save;
2009 joined_list, joined_list_entries,
2014 srp_addr_to_nodeid (new_memb_list_totemip,
2016 srp_addr_to_nodeid (joined_list_totemip, joined_list,
2017 joined_list_entries);
2021 joined_list_totemip, joined_list_entries, &instance->
my_ring_id);
2083 regular_message = ptr;
2084 free (regular_message->
mcast);
2090 if (joined_list_entries) {
2092 sptr += snprintf(joined_node_msg,
sizeof(joined_node_msg)-sptr,
" joined:");
2093 for (i=0; i< joined_list_entries; i++) {
2094 sptr += snprintf(joined_node_msg+sptr,
sizeof(joined_node_msg)-sptr,
" %u", joined_list_totemip[i]);
2098 joined_node_msg[0] =
'\0';
2104 sptr += snprintf(left_node_msg,
sizeof(left_node_msg)-sptr,
" left:");
2106 sptr += snprintf(left_node_msg+sptr,
sizeof(left_node_msg)-sptr,
" %u", left_list[i]);
2109 if (my_leave_memb_match(instance, left_list[i]) == 0) {
/* Write the " failed:" prefix at the start of failed_node_msg. */
2111 sptr2 += snprintf(failed_node_msg,
sizeof(failed_node_msg)-sptr2,
" failed:");
/*
 * Append this node id after what is already in failed_node_msg.  The bound
 * must come from the buffer being written (failed_node_msg); it previously
 * used sizeof(left_node_msg), which only worked because both arrays are
 * declared with the same size.
 *
 * NOTE(review): snprintf returns the length that would have been written, so
 * after a truncation sptr2 may exceed the buffer size and the
 * "sizeof - sptr2" bound would no longer be valid - confirm the list cannot
 * grow that long, or clamp sptr2.
 */
2113 sptr2 += snprintf(failed_node_msg+sptr2,
sizeof(failed_node_msg)-sptr2,
" %u", left_list[i]);
2117 failed_node_msg[0] =
'\0';
2121 left_node_msg[0] =
'\0';
2122 failed_node_msg[0] =
'\0';
2125 my_leave_memb_clear(instance);
2128 "entering OPERATIONAL state.");
2130 "A new membership (%s:%lld) was formed. Members%s%s",
2136 if (strlen(failed_node_msg)) {
2138 "Failed to receive the leave message.%s",
2149 reset_pause_timeout (instance);
2162 static void memb_state_gather_enter (
2173 &instance->
my_id, 1,
2176 memb_join_message_send (instance);
2187 memb_timer_function_state_gather,
2188 &instance->memb_timer_state_gather_join_timeout);
2190 log_printf(instance->totemsrp_log_level_error,
"memb_state_gather_enter - qb_loop_timer_add error(1) : %d", res);
2196 qb_loop_timer_del (instance->totemsrp_poll_handle,
2197 instance->memb_timer_state_gather_consensus_timeout);
2199 res = qb_loop_timer_add (instance->totemsrp_poll_handle,
2201 instance->totem_config->consensus_timeout*QB_TIME_NS_IN_MSEC,
2203 memb_timer_function_gather_consensus_timeout,
2204 &instance->memb_timer_state_gather_consensus_timeout);
2206 log_printf(instance->totemsrp_log_level_error,
"memb_state_gather_enter - qb_loop_timer_add error(2) : %d", res);
2212 cancel_token_retransmit_timeout (instance);
2213 cancel_token_timeout (instance);
2214 cancel_merge_detect_timeout (instance);
2216 memb_consensus_reset (instance);
2218 memb_consensus_set (instance, &instance->my_id);
2220 log_printf (instance->totemsrp_log_level_debug,
2221 "entering GATHER state from %d(%s).",
2222 gather_from, gsfrom_to_msg(gather_from));
2225 instance->stats.gather_entered++;
2231 instance->stats.continuous_gather++;
2237 static void timer_function_token_retransmit_timeout (
void *data);
2239 static void target_set_completed (
2244 memb_state_commit_token_send (instance);
2248 static void memb_state_commit_enter (
2251 old_ring_state_save (instance);
2253 memb_state_commit_token_update (instance);
2255 memb_state_commit_token_target_set (instance);
2271 "entering COMMIT state.");
2274 reset_token_retransmit_timeout (instance);
2275 reset_token_timeout (instance);
2291 static void memb_state_recovery_enter (
2296 int local_received_flg = 1;
2297 unsigned int low_ring_aru;
2298 unsigned int range = 0;
2299 unsigned int messages_originated = 0;
2302 struct memb_ring_id my_new_memb_ring_id_list[PROCESSOR_COUNT_MAX];
2308 "entering RECOVERY state.");
2319 memb_state_commit_token_send_recovery (instance, commit_token);
2334 memcpy (&my_new_memb_ring_id_list[i],
2335 &memb_list[i].ring_id,
2338 memb_set_and_with_ring_id (
2340 my_new_memb_ring_id_list,
2354 "position [%d] member %s:", i,
totemip_print (&addr[i].addr[0]));
2356 "previous ring seq %llx rep %s",
2361 "aru %x high delivered %x received flag %d",
2379 local_received_flg = 0;
2383 if (local_received_flg == 1) {
2399 if (sq_lt_compare (memb_list[i].
aru, low_ring_aru)) {
2401 low_ring_aru = memb_list[i].
aru;
2422 "copying all old ring messages from %x-%x.",
2425 for (i = 1; i <= range; i++) {
2432 low_ring_aru + i, &ptr);
2436 sort_queue_item = ptr;
2437 messages_originated++;
2438 memset (&message_item, 0,
sizeof (
struct message_item));
2440 message_item.
mcast = totemsrp_buffer_alloc (instance);
2441 assert (message_item.
mcast);
2451 memcpy (((
char *)message_item.
mcast) + sizeof (
struct mcast),
2452 sort_queue_item->
mcast,
2457 "Originated %d messages in RECOVERY.", messages_originated);
2462 "Did not need to originate any messages in recovery.");
2472 reset_token_timeout (instance);
2473 reset_token_retransmit_timeout (instance);
2486 token_hold_cancel_send (instance);
2493 struct iovec *iovec,
2494 unsigned int iov_len,
2501 unsigned int addr_idx;
2510 if (cs_queue_is_full (queue_use)) {
2515 memset (&message_item, 0,
sizeof (
struct message_item));
2520 message_item.
mcast = totemsrp_buffer_alloc (instance);
2521 if (message_item.
mcast == 0) {
2528 memset(message_item.
mcast, 0, sizeof (
struct mcast));
2538 addr = (
char *)message_item.
mcast;
2539 addr_idx = sizeof (
struct mcast);
2540 for (i = 0; i < iov_len; i++) {
2541 memcpy (&addr[addr_idx], iovec[i].iov_base, iovec[i].iov_len);
2542 addr_idx += iovec[i].iov_len;
2545 message_item.
msg_len = addr_idx;
2549 cs_queue_item_add (queue_use, &message_item);
2571 cs_queue_avail (queue_use, &avail);
2582 static int orf_token_remcast (
2590 struct sq *sort_queue;
2598 res = sq_in_range (sort_queue, seq);
2607 res = sq_item_get (sort_queue, seq, &ptr);
2612 sort_queue_item = ptr;
2616 sort_queue_item->
mcast,
/*
 * Releases message buffers held for messages up to a release point derived
 * from the token's aru.
 *
 * token_aru   the aru value carried by the token
 */
2626 static void messages_free (
2628 unsigned int token_aru)
2633 int log_release = 0;
2634 unsigned int release_to;
2635 unsigned int range = 0;
/*
 * Start from the token's aru; instance->my_last_aru is then compared against
 * it with sq_lt_compare.
 * NOTE(review): the adjustment made inside that branch is not visible in
 * this excerpt.
 */
2637 release_to = token_aru;
2638 if (sq_lt_compare (instance->
my_last_aru, release_to)) {
/* Sequence offsets are 1-based and inclusive of `range`. */
2658 for (i = 1; i <= range; i++) {
2664 regular_message = ptr;
/* Return the message's buffer via totemsrp_buffer_release. */
2665 totemsrp_buffer_release (instance, regular_message->
mcast);
2676 "releasing messages up to and including %x", release_to);
/*
 * Advances instance->my_aru based on which items are present in the sort
 * queue.
 */
2680 static void update_aru (
2685 struct sq *sort_queue;
2687 unsigned int my_aru_saved = 0;
/*
 * Probe from a snapshot of my_aru, so the indices used in the loop do not
 * depend on the update made after it.
 */
2697 my_aru_saved = instance->
my_aru;
2698 for (i = 1; i <= range; i++) {
2702 res = sq_item_get (sort_queue, my_aru_saved + i, &ptr);
/*
 * NOTE(review): presumably the loop stops at the first missing item, so that
 * i - 1 is the count of consecutive items found - the exit condition is not
 * visible in this excerpt.
 */
2710 instance->
my_aru += i - 1;
/*
 * Moves up to fcc_mcasts_allowed messages from the pending multicast queue
 * into the sort queue and returns how many were handled.
 *
 * fcc_mcasts_allowed   upper bound on messages to process in this call
 *
 * Returns the loop counter fcc_mcast_current.
 *
 * NOTE(review): fcc_mcast_current is unsigned while fcc_mcasts_allowed is
 * int, so the loop comparison is done in unsigned arithmetic; a negative
 * limit would act as a very large one - confirm callers never pass one.
 */
2716 static int orf_token_mcast (
2719 int fcc_mcasts_allowed)
2723 struct sq *sort_queue;
2726 unsigned int fcc_mcast_current;
2731 reset_token_retransmit_timeout (instance);
2742 for (fcc_mcast_current = 0; fcc_mcast_current < fcc_mcasts_allowed; fcc_mcast_current++) {
/* Nothing left to send: handled before any item is fetched. */
2743 if (cs_queue_is_empty (mcast_queue)) {
2746 message_item = (
struct message_item *)cs_queue_item_get (mcast_queue);
/* Start from a zeroed sort queue item. */
2754 memset (&sort_queue_item, 0,
sizeof (
struct sort_queue_item));
2758 mcast = sort_queue_item.
mcast;
/* The item is filed under the sequence number stored in the message. */
2765 sq_item_add (sort_queue, &sort_queue_item, message_item->
mcast->
seq);
2769 message_item->
mcast,
/* The queue entry is removed only after it was added to the sort queue. */
2775 cs_queue_item_remove (mcast_queue);
2783 update_aru (instance);
2788 return (fcc_mcast_current);
/*
 * Processes the retransmit list carried in an ORF token: logs it, re-sends
 * requested messages through orf_token_remcast, and scans the sequence range
 * between instance->my_aru and the token's seq.
 *
 * fcc_allowed   in/out budget of transmissions (how it is updated is not
 *               visible in this excerpt)
 */
2795 static int orf_token_rtr (
2798 unsigned int *fcc_allowed)
2803 struct sq *sort_queue;
2805 unsigned int range = 0;
2806 char retransmit_msg[1024];
2815 rtr_list = &orf_token->
rtr_list[0];
/*
 * Build a human readable "Retransmit List: <seq> <seq> ..." line.
 * NOTE(review): strcpy/sprintf/strcat are unbounded; this relies on the
 * number of list entries being small enough for the 1024 byte buffer -
 * confirm the entry count is validated before this point.
 */
2817 strcpy (retransmit_msg,
"Retransmit List: ");
2822 sprintf (value,
"%x ", rtr_list[i].seq);
2823 strcat (retransmit_msg, value);
/* Appending an empty string leaves retransmit_msg unchanged. */
2825 strcat (retransmit_msg,
"");
2827 "%s", retransmit_msg);
/* Each entry's ring id is compared with this instance's ring id. */
2840 if (memcmp (&rtr_list[i].ring_id, &instance->
my_ring_id,
2847 res = orf_token_remcast (instance, rtr_list[i].seq);
/* Close the gap in rtr_list by shifting the following entries down. */
2854 memmove (&rtr_list[i], &rtr_list[i + 1],
/* Distance between the token's seq and my_aru, in unsigned arithmetic. */
2870 range = orf_token->
seq - instance->
my_aru;
2874 (i <= range); i++) {
2879 res = sq_in_range (sort_queue, instance->
my_aru + i);
2887 res = sq_item_inuse (sort_queue, instance->
my_aru + i);
2898 res = sq_item_miss_count (sort_queue, instance->
my_aru + i);
/* Check whether this sequence number is already present in rtr_list. */
2908 if (instance->
my_aru + i == rtr_list[j].
seq) {
2938 static void timer_function_token_retransmit_timeout (
void *data)
2948 token_retransmit (instance);
2949 reset_token_retransmit_timeout (instance);
2954 static void timer_function_token_hold_retransmit_timeout (
void *data)
2965 token_retransmit (instance);
2970 static void timer_function_merge_detect_timeout(
void *data)
2979 memb_merge_detect_transmit (instance);
2992 static int token_send (
2994 struct orf_token *orf_token,
2998 unsigned int orf_token_size;
3000 orf_token_size =
sizeof (
struct orf_token) +
3001 (orf_token->rtr_list_entries *
sizeof (
struct rtr_item));
3008 if (forward_token == 0) {
3045 sizeof (
struct token_hold_cancel));
3052 struct orf_token orf_token;
3084 res = token_send (instance, &orf_token, 1);
3089 static void memb_state_commit_token_update (
3094 unsigned int high_aru;
3130 if (sq_lt_compare (high_aru, memb_list[i].
aru)) {
3131 high_aru = memb_list[i].
aru;
3141 if (sq_lt_compare (memb_list[i].
aru, high_aru)) {
3156 static void memb_state_commit_token_target_set (
3173 static int memb_state_commit_token_send_recovery (
3175 struct memb_commit_token *commit_token)
3177 unsigned int commit_token_size;
3181 commit_token_size =
sizeof (
struct memb_commit_token) +
3182 ((sizeof (struct srp_addr) +
3183 sizeof (struct memb_commit_token_memb_entry)) * commit_token->
addr_entries);
3199 reset_token_retransmit_timeout (instance);
3203 static int memb_state_commit_token_send (
3206 unsigned int commit_token_size;
3210 commit_token_size =
sizeof (
struct memb_commit_token) +
3211 ((sizeof (struct srp_addr) +
3228 reset_token_retransmit_timeout (instance);
3235 struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
3236 int token_memb_entries = 0;
3240 memb_set_subtract (token_memb, &token_memb_entries,
3248 lowest_addr = &token_memb[0].
addr[0];
3249 for (i = 1; i < token_memb_entries; i++) {
3257 static int srp_addr_compare (
const void *a,
const void *b)
3265 static void memb_state_commit_token_create (
3268 struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
3271 int token_memb_entries = 0;
3274 "Creating commit token because I am the rep.");
3276 memb_set_subtract (token_memb, &token_memb_entries,
3280 memset (instance->
commit_token, 0, sizeof (
struct memb_commit_token));
3295 qsort (token_memb, token_memb_entries,
sizeof (
struct srp_addr),
3304 memcpy (addr, token_memb,
3305 token_memb_entries *
sizeof (
struct srp_addr));
3306 memset (memb_list, 0,
3312 char memb_join_data[40000];
3315 unsigned int addr_idx;
3324 msg_len =
sizeof(
struct memb_join) +
3325 ((instance->my_proc_list_entries + instance->my_failed_list_entries) *
sizeof(
struct srp_addr));
3327 if (msg_len >
sizeof(memb_join_data)) {
3328 log_printf (instance->totemsrp_log_level_error,
3329 "memb_join_message too long. Ignoring message.");
3334 memb_join->
ring_seq = instance->my_ring_id.seq;
3337 srp_addr_copy (&memb_join->
system_from, &instance->my_id);
3343 addr = (
char *)memb_join;
3344 addr_idx =
sizeof (
struct memb_join);
3345 memcpy (&addr[addr_idx],
3346 instance->my_proc_list,
3347 instance->my_proc_list_entries *
3350 instance->my_proc_list_entries *
3352 memcpy (&addr[addr_idx],
3353 instance->my_failed_list,
3354 instance->my_failed_list_entries *
3357 instance->my_failed_list_entries *
3360 if (instance->totem_config->send_join_timeout) {
3361 usleep (random() % (instance->totem_config->send_join_timeout * 1000));
3364 instance->stats.memb_join_tx++;
3367 instance->totemrrp_context,
3374 char memb_join_data[40000];
3377 unsigned int addr_idx;
3378 int active_memb_entries;
3379 struct srp_addr active_memb[PROCESSOR_COUNT_MAX];
3383 "sending join/leave message");
3390 &instance->
my_id, 1,
3393 memb_set_subtract (active_memb, &active_memb_entries,
3395 &instance->
my_id, 1);
3397 msg_len =
sizeof(
struct memb_join) +
3398 ((active_memb_entries + instance->my_failed_list_entries) *
sizeof(
struct srp_addr));
3400 if (msg_len >
sizeof(memb_join_data)) {
3401 log_printf (instance->totemsrp_log_level_error,
3402 "memb_leave message too long. Ignoring message.");
3412 memb_join->
ring_seq = instance->my_ring_id.seq;
3415 srp_addr_copy (&memb_join->
system_from, &instance->my_id);
3423 addr = (
char *)memb_join;
3424 addr_idx =
sizeof (
struct memb_join);
3425 memcpy (&addr[addr_idx],
3427 active_memb_entries *
3430 active_memb_entries *
3432 memcpy (&addr[addr_idx],
3433 instance->my_failed_list,
3434 instance->my_failed_list_entries *
3437 instance->my_failed_list_entries *
3441 if (instance->totem_config->send_join_timeout) {
3442 usleep (random() % (instance->totem_config->send_join_timeout * 1000));
3444 instance->stats.memb_join_tx++;
3447 instance->totemrrp_context,
3468 sizeof (
struct memb_merge_detect));
3471 static void memb_ring_id_set (
3490 token_hold_cancel_send (instance);
3493 if (callback_handle == 0) {
3496 *handle_out = (
void *)callback_handle;
3497 list_init (&callback_handle->
list);
3499 callback_handle->
data = (
void *) data;
3501 callback_handle->
delete =
delete;
3520 list_del (&h->
list);
/*
 * Runs the registered token callbacks found on a callback list.
 */
3527 static void token_callbacks_execute (
3533 struct list_head *callback_listhead = 0;
3549 for (list = callback_listhead->
next; list != callback_listhead;
3552 token_callback_instance =
list_entry (list,
struct token_callback_instance, list);
/*
 * The successor pointer and the delete flag are read before the callback is
 * invoked, so they stay available afterwards.
 */
3554 list_next = list->
next;
3555 del = token_callback_instance->
delete;
3562 token_callback_instance->
data);
/*
 * A callback that returned -1 while flagged for deletion is put back on the
 * list.
 * NOTE(review): the branch structure around free() is not visible in this
 * excerpt.
 */
3566 if (res == -1 && del == 1) {
3567 list_add (list, callback_listhead);
3569 free (token_callback_instance);
3593 if (queue_use != NULL) {
3594 backlog = cs_queue_used (queue_use);
/*
 * Computes how many messages may be transmitted for this token and returns
 * that count.
 *
 * token   the ORF token being processed
 *
 * NOTE(review): the value returned is an unsigned int converted to the int
 * return type.
 */
3601 static int fcc_calculate (
3603 struct orf_token *token)
3605 unsigned int transmits_allowed;
3606 unsigned int backlog_calc;
/* Refresh this instance's recorded backlog from backlog_get. */
3614 instance->
my_cbl = backlog_get (instance);
/* A non-zero backlog_calc caps the allowance when it is the smaller value. */
3623 if (backlog_calc > 0 && transmits_allowed > backlog_calc) {
3624 transmits_allowed = backlog_calc;
3628 return (transmits_allowed);
/*
 * Adjusts *transmits_allowed for a token.
 *
 * token               the ORF token being processed
 * transmits_allowed   in/out allowance
 *
 * NOTE(review): how `check` is computed and the condition under which the
 * allowance is zeroed are not visible in this excerpt.
 */
3634 static void fcc_rtr_limit (
3636 struct orf_token *token,
3637 unsigned int *transmits_allowed)
3641 assert (check >= 0);
3648 *transmits_allowed = 0;
/*
 * Folds this node's latest transmit count into the token's fcc field.
 *
 * token              the ORF token to update
 * msgs_transmitted   number of messages transmitted for this token
 */
3652 static void fcc_token_update (
3654 struct orf_token *token,
3655 unsigned int msgs_transmitted)
/*
 * Only the change relative to the previously recorded count (my_trc) is
 * added to the token's fcc.
 */
3657 token->
fcc += msgs_transmitted - instance->
my_trc;
/* Remember the new count for the next update. */
3659 instance->
my_trc = msgs_transmitted;
/*
 * Sanity check for a single address taken from a received message.  Callers
 * in this file treat a return value of -1 as failure.
 *
 * endian_conversion_needed   non-zero when the message needs endian
 *                            conversion before its fields are read
 */
3666 static int check_totemip_sanity(
3669 int endian_conversion_needed)
3674 if (endian_conversion_needed) {
/* Any address family other than AF_INET / AF_INET6 is reported as corrupt. */
3678 if (family != AF_INET && family != AF_INET6) {
3680 "Received message corrupted... ignoring.");
/*
 * Sanity check for an srp_addr from a received message: every contained
 * address is run through check_totemip_sanity.
 *
 * endian_conversion_needed   forwarded unchanged to check_totemip_sanity
 *
 * NOTE(review): any range check on addr->no_addrs before this loop is not
 * visible in this excerpt - confirm it is bounded by the size of addr[].
 */
3688 static int check_srpaddr_sanity(
3691 int endian_conversion_needed)
3699 for (i = 0; i < addr->
no_addrs; i++) {
3701 if (check_totemip_sanity(instance, &addr->
addr[i], endian_conversion_needed) == -1) {
/*
 * Validates a received ORF token before it is processed: the message must be
 * long enough for the fixed header and for all retransmit entries it claims
 * to carry, and the embedded addresses must pass check_totemip_sanity.
 *
 * endian_conversion_needed   non-zero when the sender's byte order differs
 */
3710 static int check_orf_token_sanity(
3714 int endian_conversion_needed)
3717 const struct orf_token *token = (
const struct orf_token *)msg;
3718 size_t required_len;
/* The fixed-size header must be complete before any field is read. */
3721 if (msg_len <
sizeof(
struct orf_token)) {
3723 "Received orf_token message is too short... ignoring.");
3728 if (check_totemip_sanity(instance, &token->
ring_id.
rep, endian_conversion_needed) == -1) {
3732 if (endian_conversion_needed) {
/* Full length: header plus rtr_entries trailing rtr_item records. */
3738 required_len =
sizeof(
struct orf_token) + rtr_entries *
sizeof(
struct rtr_item);
3739 if (msg_len < required_len) {
3741 "Received orf_token message is too short... ignoring.");
/* Every retransmit entry is checked as well. */
3746 for (i = 0; i < rtr_entries; i++) {
3748 endian_conversion_needed) == -1) {
/*
 * Validates a received mcast message: it must be at least as long as
 * struct mcast, and its ring id representative and sender address must pass
 * their sanity checks.
 *
 * endian_conversion_needed   forwarded to the address checks
 */
3756 static int check_mcast_sanity(
3760 int endian_conversion_needed)
3762 const struct mcast *mcast_msg = (
const struct mcast *)msg;
/* The header must be complete before any of its fields is read. */
3764 if (msg_len <
sizeof(
struct mcast)) {
3766 "Received mcast message is too short... ignoring.");
/* Either address failing its check rejects the message. */
3771 if ((check_totemip_sanity(instance, &mcast_msg->
ring_id.
rep, endian_conversion_needed) == -1) ||
3772 (check_srpaddr_sanity(instance, &mcast_msg->
system_from, endian_conversion_needed) == -1)) {
/*
 * Validates a received memb_merge_detect message: a length check (logged as
 * "too short" on failure) followed by sanity checks of the ring id
 * representative and the sender address.
 *
 * endian_conversion_needed   forwarded to the address checks
 */
3779 static int check_memb_merge_detect_sanity(
3783 int endian_conversion_needed)
3789 "Received memb_merge_detect message is too short... ignoring.");
/* Either address failing its check rejects the message. */
3794 if ((check_totemip_sanity(instance, &mmd_msg->
ring_id.
rep, endian_conversion_needed) == -1) ||
3795 (check_srpaddr_sanity(instance, &mmd_msg->
system_from, endian_conversion_needed) == -1)) {
/*
 * Validates a received memb_join message: the fixed header, the sender
 * address, the total length implied by the two entry counts, and every
 * address in the proc and failed lists.
 *
 * endian_conversion_needed   non-zero when the sender's byte order differs
 */
3802 static int check_memb_join_sanity(
3806 int endian_conversion_needed)
3811 size_t required_len;
3813 const struct srp_addr *failed_list;
/* The fixed-size header must be complete before any field is read. */
3816 if (msg_len <
sizeof(
struct memb_join)) {
3818 "Received memb_join message is too short... ignoring.");
3823 if (check_srpaddr_sanity(instance, &mj_msg->
system_from, endian_conversion_needed) == -1) {
/* The entry counts come from the wire; byte-swap them when needed. */
3830 if (endian_conversion_needed) {
3831 proc_list_entries =
swab32(proc_list_entries);
3832 failed_list_entries =
swab32(failed_list_entries);
/*
 * Full length: header plus one srp_addr per proc and per failed entry.
 * NOTE(review): the declared types of the two counts are not visible here;
 * if they are 32-bit, their sum can wrap before the multiplication and make
 * required_len too small - confirm the counts are range-checked.
 */
3835 required_len =
sizeof(
struct memb_join) + ((proc_list_entries + failed_list_entries) *
sizeof(
struct srp_addr));
3836 if (msg_len < required_len) {
3838 "Received memb_join message is too short... ignoring.");
/* The failed list directly follows the proc list in the message. */
3844 failed_list = proc_list + proc_list_entries;
3847 if (check_srpaddr_sanity(instance, &proc_list[i], endian_conversion_needed) == -1) {
3853 if (check_srpaddr_sanity(instance, &failed_list[i], endian_conversion_needed) == -1) {
3861 static int check_memb_commit_token_sanity(
3865 int endian_conversion_needed)
3867 const struct memb_commit_token *mct_msg = (
const struct memb_commit_token *)msg;
3871 size_t required_len;
3874 if (msg_len <
sizeof(
struct memb_commit_token)) {
3876 "Received memb_commit_token message is too short... ignoring.");
3881 if (check_totemip_sanity(instance, &mct_msg->
ring_id.
rep, endian_conversion_needed) == -1) {
3886 if (endian_conversion_needed) {
3887 addr_entries =
swab32(addr_entries);
3890 required_len =
sizeof(
struct memb_commit_token) +
3891 (addr_entries * (
sizeof(
struct srp_addr) + sizeof(struct memb_commit_token_memb_entry)));
3892 if (msg_len < required_len) {
3894 "Received memb_commit_token message is too short... ignoring.");
3900 memb_list = (
const struct memb_commit_token_memb_entry *)(addr +
addr_entries);
3903 if (check_srpaddr_sanity(instance, &addr[i], endian_conversion_needed) == -1) {
3907 if (memb_list[i].ring_id.
rep.
family != 0) {
3908 if (check_totemip_sanity(instance, &memb_list[i].ring_id.
rep,
3909 endian_conversion_needed) == -1) {
3918 static int check_token_hold_cancel_sanity(
3922 int endian_conversion_needed)
3928 "Received token_hold_cancel message is too short... ignoring.");
3933 if (check_totemip_sanity(instance, &thc_msg->
ring_id.
rep, endian_conversion_needed) == -1) {
3948 static int message_handler_orf_token (
3952 int endian_conversion_needed)
3954 char token_storage[1500];
3955 char token_convert[1500];
3956 struct orf_token *token = NULL;
3958 unsigned int transmits_allowed;
3959 unsigned int mcasted_retransmit;
3960 unsigned int mcasted_regular;
3961 unsigned int last_aru;
3964 unsigned long long tv_current;
3965 unsigned long long tv_diff;
3967 tv_current = qb_util_nano_current_get ();
3968 tv_diff = tv_current -
tv_old;
3969 tv_old = tv_current;
3972 "Time since last token %0.4f ms", ((
float)tv_diff) / 1000000.0);
3975 if (check_orf_token_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
3982 #ifdef TEST_DROP_ORF_TOKEN_PERCENTAGE 3983 if (random()%100 < TEST_DROP_ORF_TOKEN_PERCENTAGE) {
3988 if (endian_conversion_needed) {
3989 orf_token_endian_convert ((
struct orf_token *)msg,
3990 (
struct orf_token *)token_convert);
3991 msg = (
struct orf_token *)token_convert;
3998 token = (
struct orf_token *)token_storage;
3999 memcpy (token, msg,
sizeof (
struct orf_token));
4000 memcpy (&token->
rtr_list[0], (
char *)msg + sizeof (
struct orf_token),
4008 start_merge_detect_timeout (instance);
4011 cancel_merge_detect_timeout (instance);
4012 cancel_token_hold_retransmit_timeout (instance);
4018 #ifdef TEST_RECOVERY_MSG_COUNT 4059 messages_free (instance, token->
aru);
4078 reset_heartbeat_timeout(instance);
4081 cancel_heartbeat_timeout(instance);
4096 transmits_allowed = fcc_calculate (instance, token);
4097 mcasted_retransmit = orf_token_rtr (instance, token, &transmits_allowed);
4105 fcc_rtr_limit (instance, token, &transmits_allowed);
4106 mcasted_regular = orf_token_mcast (instance, token, transmits_allowed);
4113 fcc_token_update (instance, token, mcasted_retransmit +
4116 if (sq_lt_compare (instance->
my_aru, token->
aru) ||
4121 if (token->
aru == token->
seq) {
4127 if (token->
aru == last_aru && token->
aru_addr != 0) {
4142 "FAILED TO RECEIVE");
4146 memb_set_merge (&instance->
my_id, 1,
4173 "token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, aru %x",
4186 "install seq %x aru %x high seq received %x",
4204 "retrans flag count %x token aru %x install seq %x aru %x %x",
4208 memb_state_operational_enter (instance);
4215 token_send (instance, token, forward_token);
4218 tv_current = qb_util_nano_current_get ();
4219 tv_diff = tv_current -
tv_old;
4220 tv_old = tv_current;
4223 ((
float)tv_diff) / 1000000.0);
4226 messages_deliver_to_app (instance, 0,
4234 reset_token_timeout (instance);
4235 reset_token_retransmit_timeout (instance);
4239 start_token_hold_retransmit_timeout (instance);
4249 reset_heartbeat_timeout(instance);
4252 cancel_heartbeat_timeout(instance);
4258 static void messages_deliver_to_app (
4261 unsigned int end_point)
4266 struct mcast *mcast_in;
4267 struct mcast mcast_header;
4268 unsigned int range = 0;
4269 int endian_conversion_required;
4270 unsigned int my_high_delivered_stored = 0;
4286 for (i = 1; i <= range; i++) {
4294 my_high_delivered_stored + i);
4300 my_high_delivered_stored + i, &ptr);
4304 if (res != 0 && skip == 0) {
4315 sort_queue_item_p = ptr;
4317 mcast_in = sort_queue_item_p->
mcast;
4318 assert (mcast_in != (
struct mcast *)0xdeadbeef);
4320 endian_conversion_required = 0;
4322 endian_conversion_required = 1;
4323 mcast_endian_convert (mcast_in, &mcast_header);
4325 memcpy (&mcast_header, mcast_in,
sizeof (
struct mcast));
4346 "Delivering MCAST message with seq %x to pending delivery queue",
4354 ((
char *)sort_queue_item_p->
mcast) + sizeof (
struct mcast),
4356 endian_conversion_required);
4363 static int message_handler_mcast (
4367 int endian_conversion_needed)
4370 struct sq *sort_queue;
4371 struct mcast mcast_header;
4373 if (check_mcast_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4377 if (endian_conversion_needed) {
4378 mcast_endian_convert (msg, &mcast_header);
4380 memcpy (&mcast_header, msg,
sizeof (
struct mcast));
4391 #ifdef TEST_DROP_MCAST_PERCENTAGE 4392 if (random()%100 < TEST_DROP_MCAST_PERCENTAGE) {
4412 if (!memb_set_subset (
4439 "Received ringid(%s:%lld) seq %x",
4449 sq_in_range (sort_queue, mcast_header.
seq) &&
4450 sq_item_inuse (sort_queue, mcast_header.
seq) == 0) {
4456 sort_queue_item.
mcast = totemsrp_buffer_alloc (instance);
4457 if (sort_queue_item.
mcast == NULL) {
4460 memcpy (sort_queue_item.
mcast, msg, msg_len);
4461 sort_queue_item.
msg_len = msg_len;
4464 mcast_header.
seq)) {
4468 sq_item_add (sort_queue, &sort_queue_item, mcast_header.
seq);
4471 update_aru (instance);
4480 static int message_handler_memb_merge_detect (
4484 int endian_conversion_needed)
4488 if (check_memb_merge_detect_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4492 if (endian_conversion_needed) {
4493 memb_merge_detect_endian_convert (msg, &memb_merge_detect);
4495 memcpy (&memb_merge_detect, msg,
4496 sizeof (
struct memb_merge_detect));
4513 memb_set_merge (&memb_merge_detect.
system_from, 1,
4519 if (!memb_set_subset (
4525 memb_set_merge (&memb_merge_detect.
system_from, 1,
4543 static void memb_join_process (
4549 int gather_entered = 0;
4550 int fail_minus_memb_entries = 0;
4551 struct srp_addr fail_minus_memb[PROCESSOR_COUNT_MAX];
4567 "Discarding LEAVE message during flush, nodeid=%u",
4574 "Discarding JOIN message during flush, nodeid=%d", memb_join->
header.
nodeid);
4589 if (memb_set_equal (proc_list,
4594 memb_set_equal (failed_list,
4599 memb_consensus_set (instance, &memb_join->
system_from);
4601 if (memb_consensus_agreed (instance) && instance->
failed_to_recv == 1) {
4608 memb_state_commit_token_create (instance);
4610 memb_state_commit_enter (instance);
4613 if (memb_consensus_agreed (instance) &&
4614 memb_lowest_in_config (instance)) {
4616 memb_state_commit_token_create (instance);
4618 memb_state_commit_enter (instance);
4623 if (memb_set_subset (proc_list,
4628 memb_set_subset (failed_list,
4640 memb_set_merge (proc_list,
4644 if (memb_set_subset (
4645 &instance->
my_id, 1,
4652 if (memb_set_subset (
4657 if (memb_set_subset (
4662 memb_set_merge (failed_list,
4666 memb_set_subtract (fail_minus_memb,
4667 &fail_minus_memb_entries,
4673 memb_set_merge (fail_minus_memb,
4674 fail_minus_memb_entries,
4685 if (gather_entered == 0 &&
4692 static void memb_join_endian_convert (
const struct memb_join *in,
struct memb_join *out)
4714 srp_addr_copy_endian_convert (&out_proc_list[i], &in_proc_list[i]);
4717 srp_addr_copy_endian_convert (&out_failed_list[i], &in_failed_list[i]);
4721 static void memb_commit_token_endian_convert (
const struct memb_commit_token *in,
struct memb_commit_token *out)
4742 srp_addr_copy_endian_convert (&out_addr[i], &in_addr[i]);
4747 if (in_memb_list[i].ring_id.
rep.
family != 0) {
4760 static void orf_token_endian_convert (
const struct orf_token *in,
struct orf_token *out)
4784 static void mcast_endian_convert (
const struct mcast *in,
struct mcast *out)
4800 static void memb_merge_detect_endian_convert (
4812 static int ignore_join_under_operational (
4814 const struct memb_join *memb_join)
4824 if (memb_set_subset (&instance->
my_id, 1,
4842 static int message_handler_memb_join (
4846 int endian_conversion_needed)
4848 const struct memb_join *memb_join;
4849 struct memb_join *memb_join_convert = alloca (msg_len);
4851 if (check_memb_join_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4855 if (endian_conversion_needed) {
4856 memb_join = memb_join_convert;
4857 memb_join_endian_convert (msg, memb_join_convert);
4867 if (pause_flush (instance)) {
4876 if (!ignore_join_under_operational (instance, memb_join)) {
4877 memb_join_process (instance, memb_join);
4882 memb_join_process (instance, memb_join);
4893 memb_join_process (instance, memb_join);
4906 memb_join_process (instance, memb_join);
4907 memb_recovery_state_token_loss (instance);
4915 static int message_handler_memb_commit_token (
4919 int endian_conversion_needed)
4921 struct memb_commit_token *memb_commit_token_convert = alloca (msg_len);
4922 struct memb_commit_token *memb_commit_token;
4923 struct srp_addr sub[PROCESSOR_COUNT_MAX];
4929 "got commit token");
4931 if (check_memb_commit_token_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4935 if (endian_conversion_needed) {
4936 memb_commit_token_endian_convert (msg, memb_commit_token_convert);
4938 memcpy (memb_commit_token_convert, msg, msg_len);
4940 memb_commit_token = memb_commit_token_convert;
4943 #ifdef TEST_DROP_COMMIT_TOKEN_PERCENTAGE
4944 if (random()%100 < TEST_DROP_COMMIT_TOKEN_PERCENTAGE) {
4954 memb_set_subtract (sub, &sub_entries,
4958 if (memb_set_equal (addr,
4964 memcpy (instance->
commit_token, memb_commit_token, msg_len);
4965 memb_state_commit_enter (instance);
4979 memb_state_recovery_enter (instance, memb_commit_token);
4994 "Sending initial ORF token");
4997 orf_token_send_initial (instance);
4998 reset_token_timeout (instance);
4999 reset_token_retransmit_timeout (instance);
5006 static int message_handler_token_hold_cancel (
5010 int endian_conversion_needed)
5014 if (check_token_hold_cancel_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
5023 timer_function_token_retransmit_timeout (instance);
5032 unsigned int msg_len)
5037 if (msg_len <
sizeof (
struct message_header)) {
5039 "Received message is too short... ignoring %u.",
5040 (
unsigned int)msg_len);
5045 switch (message_header->
type) {
5066 printf (
"wrong message type\n");
5083 unsigned int iface_no)
5099 "Created or loaded sequence id %llx.%s for this ring.",
5126 void (*totem_service_ready) (
void))
void(* totemsrp_service_ready_fn)(void)
void(* totemsrp_deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required)
void(*) enum memb_stat memb_state)
int totemrrp_iface_check(void *rrp_context)
void main_iface_change_fn(void *context, const struct totem_ip_address *iface_address, unsigned int iface_no)
void totemip_copy_endian_convert(struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
struct srp_addr system_from
struct memb_ring_id ring_id
uint32_t waiting_trans_ack
struct srp_addr system_from
struct memb_ring_id ring_id
int totemsrp_log_level_debug
struct memb_ring_id my_ring_id
Totem Single Ring Protocol.
uint64_t memb_commit_token_rx
int my_leave_memb_entries
struct message_header header
unsigned int old_ring_state_high_seq_received
unsigned int proc_list_entries
struct totem_interface * interfaces
unsigned int interface_count
int totemsrp_my_family_get(void *srp_context)
void(* totemsrp_confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id)
The totem_ip_address struct.
totemsrp_token_stats_t token[TOTEM_TOKEN_STATS_MAX]
const char * totemip_print(const struct totem_ip_address *addr)
int totemsrp_log_level_error
#define LEAVE_DUMMY_NODEID
struct memb_ring_id ring_id
qb_loop_timer_handle timer_heartbeat_timeout
unsigned int failed_list_entries
unsigned char end_of_memb_join[0]
unsigned long long int tv_old
#define SEQNO_START_TOKEN
void(* memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, const struct totem_ip_address *addr)
unsigned int token_hold_timeout
struct memb_ring_id ring_id
struct totem_ip_address member_list[PROCESSOR_COUNT_MAX]
int totemip_compare(const void *a, const void *b)
int totemsrp_member_add(void *context, const struct totem_ip_address *member, int ring_no)
void * token_sent_event_handle
struct srp_addr system_from
totem_configuration_type
The totem_configuration_type enum.
int totemsrp_log_level_notice
unsigned int proc_list_entries
unsigned int totemsrp_my_nodeid_get(void *srp_context)
char rrp_mode[TOTEM_RRP_MODE_BYTES]
void totemsrp_net_mtu_adjust(struct totem_config *totem_config)
int totemsrp_log_level_warning
int totemsrp_crypto_set(void *srp_context, const char *cipher_type, const char *hash_type)
void totemrrp_membership_changed(void *rrp_context, enum totem_configuration_type configuration_type, const struct srp_addr *member_list, size_t member_list_entries, const struct srp_addr *left_list, size_t left_list_entries, const struct srp_addr *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id)
struct message_header header
uint64_t memb_merge_detect_rx
int totemsrp_ifaces_get(void *srp_context, unsigned int nodeid, struct totem_ip_address *interfaces, unsigned int interfaces_size, char ***status, unsigned int *iface_count)
struct cs_queue new_message_queue_trans
struct message_header header
unsigned char end_of_commit_token[0]
char commit_token_storage[40000]
unsigned int rrp_problem_count_timeout
struct list_head token_callback_sent_listhead
struct cs_queue new_message_queue
struct srp_addr my_deliver_memb_list[PROCESSOR_COUNT_MAX]
void totemsrp_callback_token_destroy(void *srp_context, void **handle_out)
uint64_t gather_token_lost
int totemsrp_log_level_trace
void totemip_copy(struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
int totemrrp_ifaces_get(void *rrp_context, char ***status, unsigned int *iface_count)
struct memb_ring_id my_old_ring_id
void * totemrrp_buffer_alloc(void *rrp_context)
unsigned int downcheck_timeout
struct srp_addr my_new_memb_list[PROCESSOR_COUNT_MAX]
uint64_t memb_commit_token_tx
int my_deliver_memb_entries
unsigned int max_network_delay
unsigned int heartbeat_failures_allowed
#define TOTEM_TOKEN_STATS_MAX
#define swab64(x)
The swab64 macro.
struct message_item __attribute__
unsigned long long token_ring_id_seq
struct totem_ip_address mcast_address
int totemsrp_callback_token_create(void *srp_context, void **handle_out, enum totem_callback_token_type type, int delete, int(*callback_fn)(enum totem_callback_token_type type, const void *), const void *data)
unsigned int send_join_timeout
void totemsrp_service_ready_register(void *context, void(*totem_service_ready)(void))
unsigned int rrp_problem_count_threshold
struct srp_addr my_memb_list[PROCESSOR_COUNT_MAX]
uint64_t operational_entered
void(*) in log_level_security)
unsigned long long ring_seq
int totemsrp_mcast(void *srp_context, struct iovec *iovec, unsigned int iov_len, int guarantee)
Multicast a message.
uint64_t operational_token_lost
unsigned int received_flg
uint64_t consensus_timeouts
Totem Network interface - also does encryption/decryption.
unsigned int my_high_delivered
struct message_handlers totemsrp_message_handlers
qb_loop_timer_handle memb_timer_state_gather_consensus_timeout
uint64_t recovery_token_lost
unsigned char end_of_memb_join[0]
unsigned int token_retransmits_before_loss_const
struct message_header header
int totemrrp_finalize(void *rrp_context)
struct list_head token_callback_received_listhead
int totemrrp_member_remove(void *rrp_context, const struct totem_ip_address *member, int iface_no)
struct rtr_item rtr_list[0]
int totemsrp_ring_reenable(void *srp_context)
struct memb_ring_id ring_id
unsigned int seqno_unchanged_const
uint64_t commit_token_lost
unsigned int miss_count_const
int totemrrp_crypto_set(void *rrp_context, const char *cipher_type, const char *hash_type)
uint64_t token_hold_cancel_rx
unsigned int join_timeout
uint32_t originated_orf_token
int totemrrp_send_flush(void *rrp_context)
struct message_header header
struct totem_ip_address mcast_addr
#define MESSAGE_QUEUE_MAX
int totemrrp_member_add(void *rrp_context, const struct totem_ip_address *member, int iface_no)
unsigned int received_flg
struct totem_ip_address rep
unsigned int last_released
int orf_token_retransmit_size
int totemsrp_avail(void *srp_context)
Return number of available messages that can be queued.
unsigned int rrp_autorecovery_check_timeout
#define RETRANS_MESSAGE_QUEUE_SIZE_MAX
void(* memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, const struct totem_ip_address *addr)
unsigned int fail_to_recv_const
void * token_recv_event_handle
struct totem_ip_address boundto
unsigned int my_high_seq_received
int totemrrp_initialize(qb_loop_t *poll_handle, void **rrp_context, struct totem_config *totem_config, totemsrp_stats_t *stats, void *context, void(*deliver_fn)(void *context, const void *msg, unsigned int msg_len), void(*iface_change_fn)(void *context, const struct totem_ip_address *iface_addr, unsigned int iface_no), void(*token_seqid_get)(const void *msg, unsigned int *seqid, unsigned int *token_is), unsigned int(*msgs_missing)(void), void(*target_set_completed)(void *context))
Create an instance.
qb_loop_t * totemsrp_poll_handle
qb_loop_timer_handle timer_pause_timeout
qb_loop_timer_handle timer_merge_detect_timeout
int my_merge_detect_timeout_outstanding
void(* log_printf)(int level, int subsys, const char *function_name, const char *file_name, int file_line, const char *format,...) __attribute__((format(printf
int totemsrp_log_level_security
qb_loop_timer_handle timer_orf_token_retransmit_timeout
struct totem_config * totem_config
int(* callback_fn)(enum totem_callback_token_type type, const void *)
#define swab32(x)
The swab32 macro.
qb_loop_timer_handle timer_orf_token_timeout
uint32_t continuous_gather
void totemsrp_threaded_mode_enable(void *context)
int totemsrp_initialize(qb_loop_t *poll_handle, void **srp_context, struct totem_config *totem_config, totemmrp_stats_t *stats, void(*deliver_fn)(unsigned int nodeid, const void *msg, unsigned int msg_len, int endian_conversion_required), void(*confchg_fn)(enum totem_configuration_type configuration_type, const unsigned int *member_list, size_t member_list_entries, const unsigned int *left_list, size_t left_list_entries, const unsigned int *joined_list, size_t joined_list_entries, const struct memb_ring_id *ring_id), void(*waiting_trans_ack_cb_fn)(int waiting_trans_ack))
Create a protocol instance.
void totemsrp_event_signal(void *srp_context, enum totem_event_type type, int value)
int totemrrp_recv_flush(void *rrp_context)
uint32_t orf_token_discard
int my_failed_list_entries
struct srp_addr my_left_memb_list[PROCESSOR_COUNT_MAX]
uint64_t token_hold_cancel_tx
void(* totem_memb_ring_id_create_or_load)(struct memb_ring_id *memb_ring_id, const struct totem_ip_address *addr)
unsigned int token_timeout
unsigned int high_delivered
unsigned int consensus_timeout
void main_deliver_fn(void *context, const void *msg, unsigned int msg_len)
#define PROCESSOR_COUNT_MAX
unsigned short endian_detector
void totemrrp_buffer_release(void *rrp_context, void *ptr)
Totem Network interface - also does encryption/decryption.
char orf_token_retransmit[TOKEN_SIZE_MAX]
struct message_header header
struct sq regular_sort_queue
#define swab16(x)
The swab16 macro.
void totemsrp_finalize(void *srp_context)
#define QUEUE_RTR_ITEMS_SIZE_MAX
struct srp_addr my_failed_list[PROCESSOR_COUNT_MAX]
struct cs_queue retrans_message_queue
const char * gather_state_from_desc[]
qb_loop_timer_handle memb_timer_state_gather_join_timeout
int my_trans_memb_entries
void(* totemsrp_waiting_trans_ack_cb_fn)(int waiting_trans_ack)
uint64_t memb_merge_detect_tx
unsigned int high_delivered
struct rtr_item rtr_list[0]
int consensus_list_entries
unsigned int rrp_problem_count_mcast_threshold
int totemrrp_processor_count_set(void *rrp_context, unsigned int processor_count)
int totemrrp_mcast_noflush_send(void *rrp_context, const void *msg, unsigned int msg_len)
unsigned char end_of_commit_token[0]
uint32_t threaded_mode_enabled
enum totem_callback_token_type callback_type
int totemrrp_mcast_recv_empty(void *rrp_context)
#define list_entry(ptr, type, member)
unsigned long long ring_seq
struct totem_logging_configuration totem_logging_configuration
int totemrrp_mcast_flush_send(void *rrp_context, const void *msg, unsigned int msg_len)
struct memb_ring_id ring_id
#define log_printf(level, format, args...)
void totemsrp_trans_ack(void *context)
unsigned int max_messages
uint64_t recovery_entered
qb_loop_timer_handle memb_timer_state_commit_timeout
struct memb_commit_token * commit_token
struct consensus_list_item consensus_list[PROCESSOR_COUNT_MAX]
struct srp_addr system_from
struct srp_addr my_proc_list[PROCESSOR_COUNT_MAX]
int(* handler_functions[6])(struct totemsrp_instance *instance, const void *msg, size_t msg_len, int endian_conversion_needed)
unsigned int merge_timeout
unsigned int use_heartbeat
struct message_header header
int totemsrp_member_remove(void *context, const struct totem_ip_address *member, int ring_no)
unsigned int token_retransmit_timeout
struct srp_addr my_trans_memb_list[PROCESSOR_COUNT_MAX]
#define RETRANSMIT_ENTRIES_MAX
void(* totemsrp_log_printf)(int level, int sybsys, const char *function, const char *file, int line, const char *format,...) __attribute__((format(printf
int totemip_equal(const struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
unsigned int my_token_seq
struct memb_ring_id ring_id
int totemrrp_ring_reenable(void *rrp_context, unsigned int iface_no)
unsigned int my_leave_memb_list[PROCESSOR_COUNT_MAX]
qb_loop_timer_handle timer_orf_token_hold_retransmit_timeout
struct totem_ip_address addr[INTERFACE_MAX]
unsigned int rrp_token_expired_timeout
struct memb_ring_id ring_id
unsigned int my_install_seq
int totemrrp_token_send(void *rrp_context, const void *msg, unsigned int msg_len)
unsigned int failed_list_entries
void(* totem_memb_ring_id_store)(const struct memb_ring_id *memb_ring_id, const struct totem_ip_address *addr)
struct sq recovery_sort_queue
int totemrrp_token_target_set(void *rrp_context, struct totem_ip_address *addr, unsigned int iface_no)
totem_callback_token_type
The totem_callback_token_type enum.
unsigned int my_high_ring_delivered