41 #include <sys/types.h> 43 #include <sys/socket.h> 46 #include <sys/ioctl.h> 47 #include <sys/param.h> 48 #include <netinet/in.h> 49 #include <arpa/inet.h> 64 #include <qb/qbdefs.h> 65 #include <qb/qbloop.h> 66 #define LOGSYS_UTILS_ONLY 1 73 #define MSG_NOSIGNAL 0 76 #define MCAST_SOCKET_BUFFER_SIZE (TRANSMITS_ALLOWED * FRAME_SIZE_MAX) 77 #define NETIF_STATE_REPORT_UP 1 78 #define NETIF_STATE_REPORT_DOWN 2 80 #define BIND_STATE_UNBOUND 0 81 #define BIND_STATE_REGULAR 1 82 #define BIND_STATE_LOOPBACK 2 99 int local_mcast_loop[2];
113 void (*totemudp_deliver_fn) (
116 unsigned int msg_len,
119 void (*totemudp_iface_change_fn) (
122 unsigned int ring_no);
124 void (*totemudp_target_set_completed) (
void *context);
141 void (*totemudp_log_printf) (
144 const char *
function,
152 struct qb_list_head member_list;
158 struct iovec totemudp_iov_recv;
160 struct iovec totemudp_iov_recv_flush;
176 struct timeval stats_tv_start;
197 unsigned int msg_len;
201 static int totemudp_build_sockets (
231 #define log_printf(level, format, args...) \ 233 instance->totemudp_log_printf ( \ 234 level, instance->totemudp_subsys_id, \ 235 __FUNCTION__, __FILE__, __LINE__, \ 236 (const char *)format, ##args); \ 239 #define LOGSYS_PERROR(err_num, level, fmt, args...) \ 241 char _error_str[LOGSYS_MAX_PERROR_MSG_LEN]; \ 242 const char *_error_ptr = qb_strerror_r(err_num, _error_str, sizeof(_error_str)); \ 243 instance->totemudp_log_printf ( \ 244 level, instance->totemudp_subsys_id, \ 245 __FUNCTION__, __FILE__, __LINE__, \ 246 fmt ": %s (%d)\n", ##args, _error_ptr, err_num); \ 251 const char *cipher_type,
252 const char *hash_type)
259 static inline void ucast_sendmsg (
263 unsigned int msg_len)
265 struct msghdr msg_ucast;
267 struct sockaddr_storage sockaddr;
271 iovec.iov_base = (
void*)msg;
272 iovec.iov_len = msg_len;
277 memset(&msg_ucast, 0,
sizeof(msg_ucast));
280 msg_ucast.msg_name = &sockaddr;
281 msg_ucast.msg_namelen = addrlen;
282 msg_ucast.msg_iov = (
void *)&iovec;
283 msg_ucast.msg_iovlen = 1;
284 #ifdef HAVE_MSGHDR_CONTROL 285 msg_ucast.msg_control = 0;
287 #ifdef HAVE_MSGHDR_CONTROLLEN 288 msg_ucast.msg_controllen = 0;
290 #ifdef HAVE_MSGHDR_FLAGS 291 msg_ucast.msg_flags = 0;
293 #ifdef HAVE_MSGHDR_ACCRIGHTS 294 msg_ucast.msg_accrights = NULL;
296 #ifdef HAVE_MSGHDR_ACCRIGHTSLEN 297 msg_ucast.msg_accrightslen = 0;
309 "sendmsg(ucast) failed (non-critical)");
313 static inline void mcast_sendmsg (
316 unsigned int msg_len)
318 struct msghdr msg_mcast;
321 struct sockaddr_storage sockaddr;
324 iovec.iov_base = (
void *)msg;
325 iovec.iov_len = msg_len;
332 memset(&msg_mcast, 0,
sizeof(msg_mcast));
333 msg_mcast.msg_name = &sockaddr;
334 msg_mcast.msg_namelen = addrlen;
335 msg_mcast.msg_iov = (
void *)&iovec;
336 msg_mcast.msg_iovlen = 1;
337 #ifdef HAVE_MSGHDR_CONTROL 338 msg_mcast.msg_control = 0;
340 #ifdef HAVE_MSGHDR_CONTROLLEN 341 msg_mcast.msg_controllen = 0;
343 #ifdef HAVE_MSGHDR_FLAGS 344 msg_mcast.msg_flags = 0;
346 #ifdef HAVE_MSGHDR_ACCRIGHTS 347 msg_mcast.msg_accrights = NULL;
349 #ifdef HAVE_MSGHDR_ACCRIGHTSLEN 350 msg_mcast.msg_accrightslen = 0;
361 "sendmsg(mcast) failed (non-critical)");
371 msg_mcast.msg_name = NULL;
372 msg_mcast.msg_namelen = 0;
378 "sendmsg(local mcast loop) failed (non-critical)");
416 static int net_deliver_fn (
422 struct msghdr msg_recv;
424 struct sockaddr_storage system_from;
426 int truncated_packet;
438 msg_recv.msg_namelen =
sizeof (
struct sockaddr_storage);
439 msg_recv.msg_iov = iovec;
440 msg_recv.msg_iovlen = 1;
441 #ifdef HAVE_MSGHDR_CONTROL 442 msg_recv.msg_control = 0;
444 #ifdef HAVE_MSGHDR_CONTROLLEN 445 msg_recv.msg_controllen = 0;
447 #ifdef HAVE_MSGHDR_FLAGS 448 msg_recv.msg_flags = 0;
450 #ifdef HAVE_MSGHDR_ACCRIGHTS 451 msg_recv.msg_accrights = NULL;
453 #ifdef HAVE_MSGHDR_ACCRIGHTSLEN 454 msg_recv.msg_accrightslen = 0;
457 bytes_received = recvmsg (fd, &msg_recv,
MSG_NOSIGNAL | MSG_DONTWAIT);
458 if (bytes_received == -1) {
464 truncated_packet = 0;
466 #ifdef HAVE_MSGHDR_FLAGS 467 if (msg_recv.msg_flags & MSG_TRUNC) {
468 truncated_packet = 1;
476 truncated_packet = 1;
480 if (truncated_packet) {
482 "Received too big message. This may be because something bad is happening" 483 "on the network (attack?), or you tried join more nodes than corosync is" 484 "compiled with (%u) or bug in the code (bad estimation of " 489 iovec->iov_len = bytes_received;
504 static int netif_determine (
514 interface_up, interface_num,
526 static void timer_function_netif_check_timeout (
537 netif_determine (instance,
540 &interface_up, &interface_num);
546 interface_up == 0) ||
550 interface_up == 1)) {
556 timer_function_netif_check_timeout,
557 &instance->timer_netif_check_timeout);
585 if (interface_up == 0) {
590 bind_address = &localhost;
599 timer_function_netif_check_timeout,
600 &instance->timer_netif_check_timeout);
611 (void)totemudp_build_sockets (instance,
621 POLLIN, instance, net_deliver_fn);
627 POLLIN, instance, net_deliver_fn);
633 POLLIN, instance, net_deliver_fn);
643 "The network interface [%s] is now up.",
656 timer_function_netif_check_timeout,
657 &instance->timer_netif_check_timeout);
663 "The network interface is down.");
673 static void totemudp_traffic_control_set(
struct totemudp_instance *instance,
int sock)
678 if (setsockopt(sock, SOL_SOCKET, SO_PRIORITY, &prio,
sizeof(
int))) {
684 static int totemudp_build_sockets_ip (
692 struct sockaddr_storage sockaddr;
693 struct ipv6_mreq mreq6;
695 struct sockaddr_storage mcast_ss, boundto_ss;
696 struct sockaddr_in6 *mcast_sin6 = (
struct sockaddr_in6 *)&mcast_ss;
697 struct sockaddr_in *mcast_sin = (
struct sockaddr_in *)&mcast_ss;
698 struct sockaddr_in *boundto_sin = (
struct sockaddr_in *)&boundto_ss;
699 unsigned int sendbuf_size;
700 unsigned int recvbuf_size;
701 unsigned int optlen =
sizeof (sendbuf_size);
702 unsigned int retries;
720 res = fcntl (sockets->
mcast_recv, F_SETFL, O_NONBLOCK);
723 "Could not set non-blocking operation on multicast socket");
731 if ( setsockopt(sockets->
mcast_recv, SOL_SOCKET, SO_REUSEADDR, (
char *)&flag, sizeof (flag)) < 0) {
733 "setsockopt(SO_REUSEADDR) failed");
746 for (i = 0; i < 2; i++) {
751 "Could not set non-blocking operation on multicast socket");
769 res = fcntl (sockets->
mcast_send, F_SETFL, O_NONBLOCK);
772 "Could not set non-blocking operation on multicast socket");
780 if ( setsockopt(sockets->
mcast_send, SOL_SOCKET, SO_REUSEADDR, (
char *)&flag, sizeof (flag)) < 0) {
782 "setsockopt(SO_REUSEADDR) failed");
787 &sockaddr, &addrlen);
791 res = bind (sockets->
mcast_send, (
struct sockaddr *)&sockaddr, addrlen);
796 "Unable to bind the socket to send multicast packets");
813 sockets->
token = socket (bindnet_address->
family, SOCK_DGRAM, 0);
814 if (sockets->
token == -1) {
821 res = fcntl (sockets->
token, F_SETFL, O_NONBLOCK);
824 "Could not set non-blocking operation on token socket");
832 if ( setsockopt(sockets->
token, SOL_SOCKET, SO_REUSEADDR, (
char *)&flag, sizeof (flag)) < 0) {
834 "setsockopt(SO_REUSEADDR) failed");
846 res = bind (sockets->
token, (
struct sockaddr *)&sockaddr, addrlen);
851 "Unable to bind UDP unicast socket");
870 res = setsockopt (sockets->
mcast_recv, SOL_SOCKET, SO_RCVBUF, &recvbuf_size, optlen);
873 "Unable to set SO_RCVBUF size on UDP mcast socket");
876 res = setsockopt (sockets->
mcast_send, SOL_SOCKET, SO_SNDBUF, &sendbuf_size, optlen);
879 "Unable to set SO_SNDBUF size on UDP mcast socket");
882 res = setsockopt (sockets->
local_mcast_loop[0], SOL_SOCKET, SO_RCVBUF, &recvbuf_size, optlen);
885 "Unable to set SO_RCVBUF size on UDP local mcast loop socket");
888 res = setsockopt (sockets->
local_mcast_loop[1], SOL_SOCKET, SO_SNDBUF, &sendbuf_size, optlen);
891 "Unable to set SO_SNDBUF size on UDP local mcast loop socket");
895 res = getsockopt (sockets->
mcast_recv, SOL_SOCKET, SO_RCVBUF, &recvbuf_size, &optlen);
898 "Receive multicast socket recv buffer size (%d bytes).", recvbuf_size);
901 res = getsockopt (sockets->
mcast_send, SOL_SOCKET, SO_SNDBUF, &sendbuf_size, &optlen);
904 "Transmit multicast socket send buffer size (%d bytes).", sendbuf_size);
907 res = getsockopt (sockets->
local_mcast_loop[0], SOL_SOCKET, SO_RCVBUF, &recvbuf_size, &optlen);
910 "Local receive multicast loop socket recv buffer size (%d bytes).", recvbuf_size);
913 res = getsockopt (sockets->
local_mcast_loop[1], SOL_SOCKET, SO_SNDBUF, &sendbuf_size, &optlen);
916 "Local transmit multicast loop socket send buffer size (%d bytes).", sendbuf_size);
927 unsigned int broadcast = 1;
929 if ((setsockopt(sockets->
mcast_recv, SOL_SOCKET,
930 SO_BROADCAST, &broadcast, sizeof (broadcast))) == -1) {
932 "setting broadcast option failed");
935 if ((setsockopt(sockets->
mcast_send, SOL_SOCKET,
936 SO_BROADCAST, &broadcast, sizeof (broadcast))) == -1) {
938 "setting broadcast option failed");
942 switch (bindnet_address->
family) {
944 memset(&mreq, 0,
sizeof(mreq));
945 mreq.imr_multiaddr.s_addr = mcast_sin->sin_addr.s_addr;
946 mreq.imr_interface.s_addr = boundto_sin->sin_addr.s_addr;
947 res = setsockopt (sockets->
mcast_recv, IPPROTO_IP, IP_ADD_MEMBERSHIP,
948 &mreq, sizeof (mreq));
951 "join ipv4 multicast group failed");
956 memset(&mreq6, 0,
sizeof(mreq6));
957 memcpy(&mreq6.ipv6mr_multiaddr, &mcast_sin6->sin6_addr,
sizeof(
struct in6_addr));
958 mreq6.ipv6mr_interface = interface_num;
960 res = setsockopt (sockets->
mcast_recv, IPPROTO_IPV6, IPV6_JOIN_GROUP,
961 &mreq6, sizeof (mreq6));
964 "join ipv6 multicast group failed");
976 switch ( bindnet_address->
family ) {
979 res = setsockopt (sockets->
mcast_send, IPPROTO_IP, IP_MULTICAST_LOOP,
980 &sflag, sizeof (sflag));
983 res = setsockopt (sockets->
mcast_send, IPPROTO_IPV6, IPV6_MULTICAST_LOOP,
984 &flag, sizeof (flag));
988 "Unable to turn off multicast loopback");
996 if (bindnet_address->
family == AF_INET6) {
997 res = setsockopt (sockets->
mcast_send, IPPROTO_IPV6, IPV6_MULTICAST_HOPS,
998 &flag, sizeof (flag));
1001 "set mcast v6 TTL failed");
1006 res = setsockopt(sockets->
mcast_send, IPPROTO_IP, IP_MULTICAST_TTL,
1007 &sflag,
sizeof(sflag));
1010 "set mcast v4 TTL failed");
1018 switch ( bindnet_address->
family ) {
1020 if (setsockopt (sockets->
mcast_send, IPPROTO_IP, IP_MULTICAST_IF,
1021 &boundto_sin->sin_addr, sizeof (boundto_sin->sin_addr)) < 0) {
1023 "cannot select interface for multicast packets (send)");
1026 if (setsockopt (sockets->
mcast_recv, IPPROTO_IP, IP_MULTICAST_IF,
1027 &boundto_sin->sin_addr, sizeof (boundto_sin->sin_addr)) < 0) {
1029 "cannot select interface for multicast packets (recv)");
1034 if (setsockopt (sockets->
mcast_send, IPPROTO_IPV6, IPV6_MULTICAST_IF,
1035 &interface_num, sizeof (interface_num)) < 0) {
1037 "cannot select interface for multicast packets (send v6)");
1040 if (setsockopt (sockets->
mcast_recv, IPPROTO_IPV6, IPV6_MULTICAST_IF,
1041 &interface_num, sizeof (interface_num)) < 0) {
1043 "cannot select interface for multicast packets (recv v6)");
1060 res = bind (sockets->
mcast_recv, (
struct sockaddr *)&sockaddr, addrlen);
1065 "Unable to bind the socket to receive multicast packets");
1082 static int totemudp_build_sockets (
1096 res = netif_determine (instance,
1108 res = totemudp_build_sockets_ip (instance, mcast_address,
1109 bindnet_address, sockets, bound_to, interface_num);
1114 "Unable to create sockets, exiting");
1119 totemudp_traffic_control_set(instance, sockets->
token);
1132 qb_loop_t *poll_handle,
1139 void (*deliver_fn) (
1142 unsigned int msg_len,
1145 void (*iface_change_fn) (
1148 unsigned int ring_no),
1150 void (*mtu_changed) (
1154 void (*target_set_completed) (
1160 if (instance == NULL) {
1164 totemudp_instance_initialize (instance);
1207 100*QB_TIME_NS_IN_MSEC,
1209 timer_function_netif_check_timeout,
1210 &instance->timer_netif_check_timeout);
1212 *udp_context = instance;
1228 int processor_count)
1236 if (processor_count == 1) {
1241 timer_function_netif_check_timeout,
1242 &instance->timer_netif_check_timeout);
1259 for (i = 0; i < 2; i++) {
1271 ufd.events = POLLIN;
1272 nfds = poll (&ufd, 1, 0);
1273 if (nfds == 1 && ufd.revents & POLLIN) {
1274 net_deliver_fn (sock, ufd.revents, instance);
1276 }
while (nfds == 1);
1292 unsigned int msg_len)
1297 ucast_sendmsg (instance, &instance->
token_target, msg, msg_len);
1304 unsigned int msg_len)
1309 mcast_sendmsg (instance, msg, msg_len);
1317 unsigned int msg_len)
1322 mcast_sendmsg (instance, msg, msg_len);
1332 timer_function_netif_check_timeout (instance);
1340 unsigned int *iface_count)
1362 struct qb_list_head *
list;
1366 qb_list_for_each(list, &(instance->
member_list)) {
1367 member = qb_list_entry (list,
1387 struct sockaddr_storage system_from;
1388 struct msghdr msg_recv;
1391 int msg_processed = 0;
1399 msg_recv.msg_namelen =
sizeof (
struct sockaddr_storage);
1401 msg_recv.msg_iovlen = 1;
1402 #ifdef HAVE_MSGHDR_CONTROL 1403 msg_recv.msg_control = 0;
1405 #ifdef HAVE_MSGHDR_CONTROLLEN 1406 msg_recv.msg_controllen = 0;
1408 #ifdef HAVE_MSGHDR_FLAGS 1409 msg_recv.msg_flags = 0;
1411 #ifdef HAVE_MSGHDR_ACCRIGHTS 1412 msg_recv.msg_accrights = NULL;
1414 #ifdef HAVE_MSGHDR_ACCRIGHTSLEN 1415 msg_recv.msg_accrightslen = 0;
1418 for (i = 0; i < 2; i++) {
1430 ufd.events = POLLIN;
1431 nfds = poll (&ufd, 1, 0);
1432 if (nfds == 1 && ufd.revents & POLLIN) {
1433 res = recvmsg (sock, &msg_recv,
MSG_NOSIGNAL | MSG_DONTWAIT);
1440 }
while (nfds == 1);
1443 return (msg_processed);
1458 if (new_member == NULL) {
1462 memset(new_member, 0,
sizeof(*new_member));
1464 qb_list_init (&new_member->
list);
1477 struct qb_list_head *
list;
1484 qb_list_for_each(list, &(instance->
member_list)) {
1485 member = qb_list_entry (list,
1507 unsigned short ip_port,
1508 unsigned int iface_no)
1516 struct totem_config *totem_config)
unsigned int clear_node_high_bit
int totemudp_crypto_set(void *udp_context, const char *cipher_type, const char *hash_type)
int totemudp_processor_count_set(void *udp_context, int processor_count)
int totemip_localhost(int family, struct totem_ip_address *localhost)
char iov_buffer[UDP_RECEIVE_FRAME_SIZE_MAX]
int totemudp_iface_set(void *net_context, const struct totem_ip_address *local_addr, unsigned short ip_port, unsigned int iface_no)
struct totem_ip_address member
struct iovec totemudp_iov_recv_flush
struct totem_interface * interfaces
struct totemudp_instance * instance
The totem_ip_address struct.
const char * totemip_print(const struct totem_ip_address *addr)
void(* totemudp_iface_change_fn)(void *context, const struct totem_ip_address *iface_address, unsigned int ring_no)
void totemudp_net_mtu_adjust(void *udp_context, struct totem_config *totem_config)
int totemudp_token_target_set(void *udp_context, unsigned int nodeid)
void * totemudp_buffer_alloc(void)
int totemip_compare(const void *a, const void *b)
int totemudp_mcast_flush_send(void *udp_context, const void *msg, unsigned int msg_len)
struct totemudp_socket totemudp_sockets
void totemip_copy(struct totem_ip_address *addr1, const struct totem_ip_address *addr2)
struct totem_ip_address mcast_address
unsigned int downcheck_timeout
int totemudp_mcast_noflush_send(void *udp_context, const void *msg, unsigned int msg_len)
int totemudp_log_level_security
#define BIND_STATE_REGULAR
struct totem_config * totem_config
#define NETIF_STATE_REPORT_DOWN
#define totemip_nosigpipe(s)
struct qb_list_head member_list
int totemudp_log_level_error
int totemudp_send_flush(void *udp_context)
int totemudp_recv_flush(void *udp_context)
struct iovec totemudp_iov_recv
int totemip_iface_check(struct totem_ip_address *bindnet, struct totem_ip_address *boundto, int *interface_up, int *interface_num, int mask_high_bit)
#define UDP_RECEIVE_FRAME_SIZE_MAX
char iov_buffer_flush[UDP_RECEIVE_FRAME_SIZE_MAX]
int totemudp_token_send(void *udp_context, const void *msg, unsigned int msg_len)
int totemudp_member_remove(void *udp_context, const struct totem_ip_address *token_target, int ring_no)
struct totem_interface * totem_interface
qb_loop_t * totemudp_poll_handle
unsigned int my_memb_entries
struct totem_ip_address mcast_addr
#define BIND_RETRIES_INTERVAL
void(* totemudp_log_printf)(int level, int subsys, const char *function, const char *file, int line, const char *format,...) __attribute__((format(printf
int totemudp_finalize(void *udp_context)
struct totem_ip_address boundto
#define BIND_STATE_LOOPBACK
size_t totemip_udpip_header_size(int family)
#define NETIF_STATE_REPORT_UP
void(* log_printf)(int level, int subsys, const char *function_name, const char *file_name, int file_line, const char *format,...) __attribute__((format(printf
#define MCAST_SOCKET_BUFFER_SIZE
void(*) void udp_context)
int totemudp_log_level_debug
#define PROCESSOR_COUNT_MAX
int totemudp_log_level_notice
int totemudp_reconfigure(void *udp_context, struct totem_config *totem_config)
unsigned int broadcast_use
int totemudp_member_add(void *udp_context, const struct totem_ip_address *local, const struct totem_ip_address *member, int ring_no)
void(* totemudp_target_set_completed)(void *context)
int totemip_totemip_to_sockaddr_convert(struct totem_ip_address *ip_addr, uint16_t port, struct sockaddr_storage *saddr, int *addrlen)
struct totem_logging_configuration totem_logging_configuration
#define LOGSYS_PERROR(err_num, level, fmt, args...)
struct srp_addr system_from
#define log_printf(level, format, args...)
struct totem_ip_address my_id
int totemudp_recv_mcast_empty(void *udp_context)
int totemudp_iface_check(void *udp_context)
int totemudp_ifaces_get(void *net_context, char ***status, unsigned int *iface_count)
struct totem_ip_address bindnet
void(* totemudp_deliver_fn)(void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from)
uint32_t continuous_sendmsg_failures
int totemudp_initialize(qb_loop_t *poll_handle, void **udp_context, struct totem_config *totem_config, totemsrp_stats_t *stats, void *context, void(*deliver_fn)(void *context, const void *msg, unsigned int msg_len, const struct sockaddr_storage *system_from), void(*iface_change_fn)(void *context, const struct totem_ip_address *iface_address, unsigned int ring_no), void(*mtu_changed)(void *context, int net_mtu), void(*target_set_completed)(void *context))
Create an instance.
struct totem_ip_address token_target
qb_loop_timer_handle timer_netif_check_timeout
void totemudp_buffer_release(void *ptr)
int totemudp_log_level_warning