41 #include <sys/types.h> 42 #include <sys/socket.h> 44 #include <sys/ioctl.h> 45 #include <netinet/in.h> 54 #include <arpa/inet.h> 57 #include <qb/qblist.h> 61 #include <qb/qbipc_common.h> 70 #define MAP_ANONYMOUS MAP_ANON 77 #define GROUP_HASH_SIZE 32 143 static struct qb_list_head joinlist_messages_head;
155 struct qb_list_head iteration_instance_list_head;
156 struct qb_list_head zcb_mapped_list_head;
162 struct qb_list_head items_list_head;
172 static unsigned int my_member_list_entries;
176 static unsigned int my_old_member_list_entries = 0;
202 static int cpg_lib_init_fn (
void *conn);
204 static int cpg_lib_exit_fn (
void *conn);
206 static void message_handler_req_exec_cpg_procjoin (
210 static void message_handler_req_exec_cpg_procleave (
214 static void message_handler_req_exec_cpg_joinlist (
218 static void message_handler_req_exec_cpg_mcast (
222 static void message_handler_req_exec_cpg_partial_mcast (
226 static void message_handler_req_exec_cpg_downlist_old (
230 static void message_handler_req_exec_cpg_downlist (
234 static void exec_cpg_procjoin_endian_convert (
void *msg);
236 static void exec_cpg_joinlist_endian_convert (
void *msg);
238 static void exec_cpg_mcast_endian_convert (
void *msg);
240 static void exec_cpg_partial_mcast_endian_convert (
void *msg);
242 static void exec_cpg_downlist_endian_convert_old (
void *msg);
244 static void exec_cpg_downlist_endian_convert (
void *msg);
246 static void message_handler_req_lib_cpg_join (
void *conn,
const void *message);
248 static void message_handler_req_lib_cpg_leave (
void *conn,
const void *message);
250 static void message_handler_req_lib_cpg_finalize (
void *conn,
const void *message);
252 static void message_handler_req_lib_cpg_mcast (
void *conn,
const void *message);
254 static void message_handler_req_lib_cpg_partial_mcast (
void *conn,
const void *message);
256 static void message_handler_req_lib_cpg_membership (
void *conn,
257 const void *message);
259 static void message_handler_req_lib_cpg_local_get (
void *conn,
260 const void *message);
262 static void message_handler_req_lib_cpg_iteration_initialize (
264 const void *message);
266 static void message_handler_req_lib_cpg_iteration_next (
268 const void *message);
270 static void message_handler_req_lib_cpg_iteration_finalize (
272 const void *message);
274 static void message_handler_req_lib_cpg_zc_alloc (
276 const void *message);
278 static void message_handler_req_lib_cpg_zc_free (
280 const void *message);
282 static void message_handler_req_lib_cpg_zc_execute (
284 const void *message);
286 static int cpg_node_joinleave_send (
unsigned int pid,
const mar_cpg_name_t *group_name,
int fn,
int reason);
288 static int cpg_exec_send_downlist(
void);
290 static int cpg_exec_send_joinlist(
void);
292 static void downlist_inform_clients (
void);
294 static void joinlist_inform_clients (
void);
296 static void joinlist_messages_delete (
void);
298 static void cpg_sync_init (
299 const unsigned int *trans_list,
300 size_t trans_list_entries,
301 const unsigned int *member_list,
302 size_t member_list_entries,
305 static int cpg_sync_process (
void);
307 static void cpg_sync_activate (
void);
309 static void cpg_sync_abort (
void);
311 static void do_proc_join(
317 static void do_proc_leave(
323 static int notify_lib_totem_membership (
325 int member_list_entries,
326 const unsigned int *member_list);
328 static inline int zcb_all_free (
331 static char *cpg_print_group_name (
344 .lib_handler_fn = message_handler_req_lib_cpg_leave,
348 .lib_handler_fn = message_handler_req_lib_cpg_mcast,
352 .lib_handler_fn = message_handler_req_lib_cpg_membership,
356 .lib_handler_fn = message_handler_req_lib_cpg_local_get,
360 .lib_handler_fn = message_handler_req_lib_cpg_iteration_initialize,
364 .lib_handler_fn = message_handler_req_lib_cpg_iteration_next,
368 .lib_handler_fn = message_handler_req_lib_cpg_iteration_finalize,
372 .lib_handler_fn = message_handler_req_lib_cpg_finalize,
376 .lib_handler_fn = message_handler_req_lib_cpg_zc_alloc,
380 .lib_handler_fn = message_handler_req_lib_cpg_zc_free,
384 .lib_handler_fn = message_handler_req_lib_cpg_zc_execute,
388 .lib_handler_fn = message_handler_req_lib_cpg_partial_mcast,
398 .exec_endian_convert_fn = exec_cpg_procjoin_endian_convert
401 .exec_handler_fn = message_handler_req_exec_cpg_procleave,
402 .exec_endian_convert_fn = exec_cpg_procjoin_endian_convert
405 .exec_handler_fn = message_handler_req_exec_cpg_joinlist,
406 .exec_endian_convert_fn = exec_cpg_joinlist_endian_convert
409 .exec_handler_fn = message_handler_req_exec_cpg_mcast,
410 .exec_endian_convert_fn = exec_cpg_mcast_endian_convert
413 .exec_handler_fn = message_handler_req_exec_cpg_downlist_old,
414 .exec_endian_convert_fn = exec_cpg_downlist_endian_convert_old
417 .exec_handler_fn = message_handler_req_exec_cpg_downlist,
418 .exec_endian_convert_fn = exec_cpg_downlist_endian_convert
421 .exec_handler_fn = message_handler_req_exec_cpg_partial_mcast,
422 .exec_endian_convert_fn = exec_cpg_partial_mcast_endian_convert
427 .
name =
"corosync cluster closed process group service v1.01",
430 .private_data_size =
sizeof (
struct cpg_pd),
433 .lib_init_fn = cpg_lib_init_fn,
434 .lib_exit_fn = cpg_lib_exit_fn,
435 .lib_engine = cpg_lib_engine,
437 .exec_init_fn = cpg_exec_init_fn,
438 .exec_dump_fn = NULL,
439 .exec_engine = cpg_exec_engine,
441 .sync_init = cpg_sync_init,
442 .sync_process = cpg_sync_process,
443 .sync_activate = cpg_sync_activate,
444 .sync_abort = cpg_sync_abort
449 return (&cpg_service_engine);
453 struct qb_ipc_request_header header __attribute__((aligned(8)));
460 struct qb_ipc_request_header header __attribute__((aligned(8)));
469 struct qb_ipc_request_header header __attribute__((aligned(8)));
480 struct qb_ipc_request_header header __attribute__((aligned(8)));
486 struct qb_ipc_request_header header __attribute__((aligned(8)));
513 for (i = 0; i < group->length; i++) {
516 if (c >=
' ' && c < 0x7f && c !=
'\\') {
520 res[dest_pos++] =
'\\';
521 res[dest_pos++] =
'\\';
523 snprintf(res + dest_pos,
sizeof(res) - dest_pos,
"\\x%02X", c);
533 static void cpg_sync_init (
534 const unsigned int *trans_list,
535 size_t trans_list_entries,
536 const unsigned int *member_list,
537 size_t member_list_entries,
546 memcpy (my_member_list, member_list, member_list_entries *
547 sizeof (
unsigned int));
548 my_member_list_entries = member_list_entries;
550 last_sync_ring_id.nodeid = ring_id->
nodeid;
551 last_sync_ring_id.seq = ring_id->
seq;
557 for (i = 0; i < my_old_member_list_entries; i++) {
559 for (j = 0; j < trans_list_entries; j++) {
560 if (my_old_member_list[i] == trans_list[j]) {
566 g_req_exec_cpg_downlist.nodeids[entries++] =
567 my_old_member_list[i];
570 g_req_exec_cpg_downlist.left_nodes = entries;
573 static int cpg_sync_process (
void)
578 res = cpg_exec_send_downlist();
585 res = cpg_exec_send_joinlist();
590 static void cpg_sync_activate (
void)
592 memcpy (my_old_member_list, my_member_list,
593 my_member_list_entries *
sizeof (
unsigned int));
594 my_old_member_list_entries = my_member_list_entries;
596 downlist_inform_clients ();
598 joinlist_inform_clients ();
600 joinlist_messages_delete ();
602 notify_lib_totem_membership (NULL, my_member_list_entries, my_member_list);
605 static void cpg_sync_abort (
void)
608 joinlist_messages_delete ();
611 static int notify_lib_totem_membership (
613 int member_list_entries,
614 const unsigned int *member_list)
616 struct qb_list_head *iter;
628 res->member_list_entries = member_list_entries;
629 res->header.size =
size;
631 res->header.error =
CS_OK;
637 qb_list_for_each(iter, &cpg_pd_list_head) {
648 static int notify_lib_joinlist(
651 int joined_list_entries,
653 int left_list_entries,
659 struct qb_list_head *iter;
666 qb_list_for_each(iter, &process_info_list_head) {
668 if (mar_name_compare (&pi->
group, group_name) == 0) {
672 for (i = 0; i < left_list_entries; i++) {
673 if (left_list[i].
nodeid == pi->
nodeid && left_list[i].pid == pi->
pid) {
690 res->joined_list_entries = joined_list_entries;
691 res->left_list_entries = left_list_entries;
692 res->member_list_entries = count;
694 res->header.size =
size;
696 res->header.error =
CS_OK;
699 qb_list_for_each(iter, &process_info_list_head) {
702 if (mar_name_compare (&pi->
group, group_name) == 0) {
706 for (i = 0;i < left_list_entries; i++) {
707 if (left_list[i].
nodeid == pi->
nodeid && left_list[i].pid == pi->
pid) {
713 retgi->nodeid = pi->
nodeid;
714 retgi->pid = pi->
pid;
721 if (left_list_entries) {
723 retgi += left_list_entries;
726 if (joined_list_entries) {
728 retgi += joined_list_entries;
734 qb_list_for_each(iter, &cpg_pd_list_head) {
736 if (mar_name_compare (&cpd->
group_name, group_name) == 0) {
737 assert (joined_list_entries <= 1);
738 if (joined_list_entries) {
739 if (joined_list[0].
pid == cpd->
pid &&
750 if (left_list_entries) {
751 if (left_list[0].
pid == cpd->
pid &&
768 qb_list_for_each(iter, &cpg_pd_list_head) {
774 notify_lib_totem_membership (cpd->
conn, my_old_member_list_entries, my_old_member_list);
784 "%s: members(old:%d left:%d)",
790 static void downlist_inform_clients (
void)
792 struct qb_list_head *iter, *tmp_iter;
800 int left_list_entries;
801 struct qb_list_head
list;
803 qb_map_iter_t *miter;
806 downlist_log(
"my downlist", &g_req_exec_cpg_downlist);
808 group_map = qb_skiplist_create();
815 qb_list_for_each_safe(iter, tmp_iter, &process_info_list_head) {
819 for (i = 0; i < g_req_exec_cpg_downlist.left_nodes; i++) {
821 if (pi->
nodeid == g_req_exec_cpg_downlist.nodeids[i]) {
828 marshall_from_mar_cpg_name_t(&cpg_group, &left_pi->
group);
829 cpg_group.value[cpg_group.length] = 0;
831 pcd = (
struct confchg_data *)qb_map_get(group_map, cpg_group.value);
833 pcd = (
struct confchg_data *)calloc(1,
sizeof(
struct confchg_data));
834 memcpy(&pcd->cpg_group, &cpg_group,
sizeof(
struct cpg_name));
835 qb_map_put(group_map, pcd->cpg_group.value, pcd);
837 size = pcd->left_list_entries;
838 pcd->left_list[
size].nodeid = left_pi->
nodeid;
839 pcd->left_list[
size].pid = left_pi->
pid;
841 pcd->left_list_entries++;
842 qb_list_del (&left_pi->
list);
848 miter = qb_map_iter_create(group_map);
849 while (qb_map_iter_next(miter, (
void **)&pcd)) {
850 marshall_to_mar_cpg_name_t(&group, &pcd->cpg_group);
852 log_printf (LOG_DEBUG,
"left_list_entries:%d", pcd->left_list_entries);
853 for (i=0; i<pcd->left_list_entries; i++) {
854 log_printf (LOG_DEBUG,
"left_list[%d] group:%s, ip:%s, pid:%d",
855 i, cpg_print_group_name(&group),
857 pcd->left_list[i].pid);
861 notify_lib_joinlist(&group, NULL,
863 pcd->left_list_entries,
869 qb_map_iter_free(miter);
870 qb_map_destroy(group_map);
876 static void joinlist_remove_zombie_pi_entries (
void)
878 struct qb_list_head *pi_iter, *tmp_iter;
879 struct qb_list_head *jl_iter;
884 qb_list_for_each_safe(pi_iter, tmp_iter, &process_info_list_head) {
898 qb_list_for_each(jl_iter, &joinlist_messages_head) {
906 pi->
pid == stored_msg->
pid &&
919 static void joinlist_inform_clients (
void)
922 struct qb_list_head *iter;
926 qb_list_for_each(iter, &joinlist_messages_head) {
929 log_printf (LOG_DEBUG,
"joinlist_messages[%u] group:%s, ip:%s, pid:%d",
930 i++, cpg_print_group_name(&stored_msg->
group_name),
943 joinlist_remove_zombie_pi_entries ();
946 static void joinlist_messages_delete (
void)
949 struct qb_list_head *iter, *tmp_iter;
951 qb_list_for_each_safe(iter, tmp_iter, &joinlist_messages_head) {
953 qb_list_del (&stored_msg->
list);
956 qb_list_init (&joinlist_messages_head);
961 qb_list_init (&joinlist_messages_head);
968 struct qb_list_head *iter, *tmp_iter;
971 qb_list_for_each_safe(iter, tmp_iter, &(cpg_iteration_instance->
items_list_head)) {
973 qb_list_del (&pi->
list);
977 qb_list_del (&cpg_iteration_instance->
list);
978 hdb_handle_destroy (&cpg_iteration_handle_t_db, cpg_iteration_instance->
handle);
981 static void cpg_pd_finalize (
struct cpg_pd *cpd)
983 struct qb_list_head *iter, *tmp_iter;
984 struct cpg_iteration_instance *cpii;
988 cpii = qb_list_entry (iter,
struct cpg_iteration_instance,
list);
990 cpg_iteration_instance_finalize (cpii);
993 qb_list_del (&cpd->
list);
996 static int cpg_lib_exit_fn (
void *conn)
1007 cpg_pd_finalize (cpd);
1013 static int cpg_node_joinleave_send (
unsigned int pid,
const mar_cpg_name_t *group_name,
int fn,
int reason)
1016 struct iovec req_exec_cpg_iovec;
1035 static void exec_cpg_procjoin_endian_convert (
void *msg)
1039 req_exec_cpg_procjoin->pid =
swab32(req_exec_cpg_procjoin->pid);
1040 swab_mar_cpg_name_t (&req_exec_cpg_procjoin->group_name);
1041 req_exec_cpg_procjoin->reason =
swab32(req_exec_cpg_procjoin->reason);
1044 static void exec_cpg_joinlist_endian_convert (
void *msg_v)
1047 struct qb_ipc_response_header *res = (
struct qb_ipc_response_header *)msg;
1050 swab_mar_int32_t (&res->size);
1052 while ((
const char*)jle < msg + res->size) {
1059 static void exec_cpg_downlist_endian_convert_old (
void *msg)
1063 static void exec_cpg_downlist_endian_convert (
void *msg)
1068 req_exec_cpg_downlist->left_nodes =
swab32(req_exec_cpg_downlist->left_nodes);
1069 req_exec_cpg_downlist->old_members =
swab32(req_exec_cpg_downlist->old_members);
1071 for (i = 0; i < req_exec_cpg_downlist->left_nodes; i++) {
1072 req_exec_cpg_downlist->nodeids[i] =
swab32(req_exec_cpg_downlist->nodeids[i]);
1077 static void exec_cpg_mcast_endian_convert (
void *msg)
1081 swab_coroipc_request_header_t (&req_exec_cpg_mcast->header);
1082 swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name);
1083 req_exec_cpg_mcast->pid =
swab32(req_exec_cpg_mcast->pid);
1084 req_exec_cpg_mcast->msglen =
swab32(req_exec_cpg_mcast->msglen);
1085 swab_mar_message_source_t (&req_exec_cpg_mcast->source);
1088 static void exec_cpg_partial_mcast_endian_convert (
void *msg)
1092 swab_coroipc_request_header_t (&req_exec_cpg_mcast->header);
1093 swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name);
1094 req_exec_cpg_mcast->pid =
swab32(req_exec_cpg_mcast->pid);
1095 req_exec_cpg_mcast->msglen =
swab32(req_exec_cpg_mcast->msglen);
1096 req_exec_cpg_mcast->fraglen =
swab32(req_exec_cpg_mcast->fraglen);
1097 req_exec_cpg_mcast->type =
swab32(req_exec_cpg_mcast->type);
1098 swab_mar_message_source_t (&req_exec_cpg_mcast->source);
1102 struct qb_list_head *iter;
1104 qb_list_for_each(iter, &process_info_list_head) {
1107 if (pi->
pid == pid && pi->
nodeid == nodeid &&
1108 mar_name_compare (&pi->
group, group_name) == 0) {
1116 static void do_proc_join(
1119 unsigned int nodeid,
1125 struct qb_list_head *
list;
1126 struct qb_list_head *list_to_add = NULL;
1128 if (process_info_find (name, pid, nodeid) != NULL) {
1138 memcpy(&pi->
group, name,
sizeof(*name));
1139 qb_list_init(&pi->
list);
1144 list_to_add = &process_info_list_head;
1145 qb_list_for_each(list, &process_info_list_head) {
1146 pi_entry = qb_list_entry(list,
struct process_info, list);
1154 qb_list_add (&pi->
list, list_to_add);
1156 notify_info.pid = pi->
pid;
1157 notify_info.nodeid =
nodeid;
1158 notify_info.reason = reason;
1160 notify_lib_joinlist(&pi->
group, NULL,
1166 static void do_proc_leave(
1169 unsigned int nodeid,
1173 struct qb_list_head *iter, *tmp_iter;
1176 notify_info.pid = pid;
1177 notify_info.nodeid =
nodeid;
1178 notify_info.reason = reason;
1180 notify_lib_joinlist(name, NULL,
1185 qb_list_for_each_safe(iter, tmp_iter, &process_info_list_head) {
1188 if (pi->
pid == pid && pi->
nodeid == nodeid &&
1189 mar_name_compare (&pi->
group, name)==0) {
1190 qb_list_del (&pi->
list);
1196 static void message_handler_req_exec_cpg_downlist_old (
1197 const void *message,
1198 unsigned int nodeid)
1204 static void message_handler_req_exec_cpg_downlist(
1205 const void *message,
1206 unsigned int nodeid)
1208 const struct req_exec_cpg_downlist *req_exec_cpg_downlist = message;
1211 req_exec_cpg_downlist->left_nodes);
1215 static void message_handler_req_exec_cpg_procjoin (
1216 const void *message,
1217 unsigned int nodeid)
1224 (
unsigned int)req_exec_cpg_procjoin->pid);
1226 do_proc_join (&req_exec_cpg_procjoin->group_name,
1227 req_exec_cpg_procjoin->pid, nodeid,
1231 static void message_handler_req_exec_cpg_procleave (
1232 const void *message,
1233 unsigned int nodeid)
1240 (
unsigned int)req_exec_cpg_procjoin->pid);
1242 do_proc_leave (&req_exec_cpg_procjoin->group_name,
1243 req_exec_cpg_procjoin->pid, nodeid,
1244 req_exec_cpg_procjoin->reason);
1249 static void message_handler_req_exec_cpg_joinlist (
1250 const void *message_v,
1251 unsigned int nodeid)
1253 const char *message = message_v;
1254 const struct qb_ipc_response_header *res = (
const struct qb_ipc_response_header *)message;
1261 while ((
const char*)jle < message + res->size) {
1265 stored_msg->
pid = jle->
pid;
1267 qb_list_init (&stored_msg->
list);
1268 qb_list_add (&stored_msg->
list, &joinlist_messages_head);
1273 static void message_handler_req_exec_cpg_mcast (
1274 const void *message,
1275 unsigned int nodeid)
1279 int msglen = req_exec_cpg_mcast->msglen;
1280 struct qb_list_head *iter, *pi_iter, *tmp_iter;
1282 struct iovec iovec[2];
1296 iovec[1].iov_base = (
char*)message+
sizeof(*req_exec_cpg_mcast);
1297 iovec[1].iov_len = msglen;
1299 qb_list_for_each_safe(iter, tmp_iter, &cpg_pd_list_head) {
1300 cpd = qb_list_entry(iter,
struct cpg_pd, list);
1302 && (mar_name_compare (&cpd->
group_name, &req_exec_cpg_mcast->group_name) == 0)) {
1306 qb_list_for_each(pi_iter, &process_info_list_head) {
1309 if (pi->
nodeid == nodeid &&
1310 mar_name_compare (&pi->
group, &req_exec_cpg_mcast->group_name) == 0) {
1327 static void message_handler_req_exec_cpg_partial_mcast (
1328 const void *message,
1329 unsigned int nodeid)
1333 int msglen = req_exec_cpg_mcast->fraglen;
1334 struct qb_list_head *iter, *pi_iter, *tmp_iter;
1336 struct iovec iovec[2];
1354 iovec[1].iov_base = (
char*)message+
sizeof(*req_exec_cpg_mcast);
1355 iovec[1].iov_len = msglen;
1357 qb_list_for_each_safe(iter, tmp_iter, &cpg_pd_list_head) {
1358 cpd = qb_list_entry(iter,
struct cpg_pd, list);
1361 && (mar_name_compare (&cpd->
group_name, &req_exec_cpg_mcast->group_name) == 0)) {
1365 qb_list_for_each(pi_iter, &process_info_list_head) {
1368 if (pi->
nodeid == nodeid &&
1369 mar_name_compare (&pi->
group, &req_exec_cpg_mcast->group_name) == 0) {
1387 static int cpg_exec_send_downlist(
void)
1392 g_req_exec_cpg_downlist.header.size =
sizeof(
struct req_exec_cpg_downlist);
1394 g_req_exec_cpg_downlist.old_members = my_old_member_list_entries;
1396 iov.iov_base = (
void *)&g_req_exec_cpg_downlist;
1397 iov.iov_len = g_req_exec_cpg_downlist.header.size;
1402 static int cpg_exec_send_joinlist(
void)
1405 struct qb_list_head *iter;
1406 struct qb_ipc_response_header *res;
1409 struct iovec req_exec_cpg_iovec;
1411 qb_list_for_each(iter, &process_info_list_head) {
1423 buf = alloca(
sizeof(
struct qb_ipc_response_header) +
sizeof(
struct join_list_entry) * count);
1429 jle = (
struct join_list_entry *)(buf +
sizeof(
struct qb_ipc_response_header));
1430 res = (
struct qb_ipc_response_header *)buf;
1432 qb_list_for_each(iter, &process_info_list_head) {
1443 res->size =
sizeof(
struct qb_ipc_response_header)+sizeof(struct
join_list_entry) * count;
1445 req_exec_cpg_iovec.iov_base = buf;
1446 req_exec_cpg_iovec.iov_len = res->size;
1451 static int cpg_lib_init_fn (
void *conn)
1454 memset (cpd, 0,
sizeof(
struct cpg_pd));
1456 qb_list_add (&cpd->
list, &cpg_pd_list_head);
1467 static void message_handler_req_lib_cpg_join (
void *conn,
const void *message)
1473 struct qb_list_head *iter;
1476 qb_list_for_each(iter, &cpg_pd_list_head) {
1477 struct cpg_pd *cpd_item = qb_list_entry (iter,
struct cpg_pd, list);
1479 if (cpd_item->
pid == req_lib_cpg_join->pid &&
1480 mar_name_compare(&req_lib_cpg_join->group_name, &cpd_item->
group_name) == 0) {
1492 qb_list_for_each(iter, &process_info_list_head) {
1496 mar_name_compare(&req_lib_cpg_join->group_name, &pi->
group) == 0) {
1512 cpd->
pid = req_lib_cpg_join->pid;
1513 cpd->
flags = req_lib_cpg_join->flags;
1514 memcpy (&cpd->
group_name, &req_lib_cpg_join->group_name,
1517 cpg_node_joinleave_send (req_lib_cpg_join->pid,
1518 &req_lib_cpg_join->group_name,
1540 static void message_handler_req_lib_cpg_leave (
void *conn,
const void *message)
1562 cpg_node_joinleave_send (req_lib_cpg_leave->pid,
1563 &req_lib_cpg_leave->group_name,
1577 static void message_handler_req_lib_cpg_finalize (
1579 const void *message)
1591 qb_list_del (&cpd->
list);
1592 qb_list_init (&cpd->
list);
1612 fd = open (path, O_RDWR, 0600);
1620 res = ftruncate (fd, bytes);
1622 goto error_close_unlink;
1625 addr = mmap (NULL, bytes, PROT_READ | PROT_WRITE,
1628 if (addr == MAP_FAILED) {
1629 goto error_close_unlink;
1632 madvise(addr, bytes, MADV_NOSYNC);
1637 munmap (addr, bytes);
1649 static inline int zcb_alloc (
1651 const char *path_to_file,
1658 zcb_mapped = malloc (
sizeof (
struct zcb_mapped));
1659 if (zcb_mapped == NULL) {
1672 qb_list_init (&zcb_mapped->
list);
1680 static inline int zcb_free (
struct zcb_mapped *zcb_mapped)
1684 res = munmap (zcb_mapped->
addr, zcb_mapped->
size);
1685 qb_list_del (&zcb_mapped->
list);
1690 static inline int zcb_by_addr_free (
struct cpg_pd *cpd,
void *addr)
1692 struct qb_list_head *
list, *tmp_iter;
1693 struct zcb_mapped *zcb_mapped;
1694 unsigned int res = 0;
1697 zcb_mapped = qb_list_entry (list,
struct zcb_mapped, list);
1699 if (zcb_mapped->
addr == addr) {
1700 res = zcb_free (zcb_mapped);
1708 static inline int zcb_all_free (
1711 struct qb_list_head *
list, *tmp_iter;
1712 struct zcb_mapped *zcb_mapped;
1715 zcb_mapped = qb_list_entry (list,
struct zcb_mapped, list);
1717 zcb_free (zcb_mapped);
1727 static uint64_t void2serveraddr (
void *server_ptr)
1735 static void *serveraddr2void (uint64_t
server_addr)
1743 static void message_handler_req_lib_cpg_zc_alloc (
1745 const void *message)
1748 struct qb_ipc_response_header res_header;
1756 res = zcb_alloc (cpd, hdr->path_to_file, hdr->map_size,
1763 res_header.size =
sizeof (
struct qb_ipc_response_header);
1770 static void message_handler_req_lib_cpg_zc_free (
1772 const void *message)
1775 struct qb_ipc_response_header res_header;
1781 addr = serveraddr2void (hdr->server_address);
1783 zcb_by_addr_free (cpd, addr);
1785 res_header.size =
sizeof (
struct qb_ipc_response_header);
1793 static void message_handler_req_lib_cpg_partial_mcast (
void *conn,
const void *message)
1799 struct iovec req_exec_cpg_iovec[2];
1802 int msglen = req_lib_cpg_mcast->fraglen;
1824 res_lib_cpg_partial_send.header.size =
sizeof(res_lib_cpg_partial_send);
1834 if (error ==
CS_OK) {
1835 req_exec_cpg_mcast.header.size =
sizeof(req_exec_cpg_mcast) + msglen;
1838 req_exec_cpg_mcast.pid = cpd->
pid;
1839 req_exec_cpg_mcast.msglen = req_lib_cpg_mcast->msglen;
1840 req_exec_cpg_mcast.type = req_lib_cpg_mcast->type;
1841 req_exec_cpg_mcast.fraglen = req_lib_cpg_mcast->fraglen;
1843 memcpy(&req_exec_cpg_mcast.group_name, &group_name,
1846 req_exec_cpg_iovec[0].iov_base = (
char *)&req_exec_cpg_mcast;
1847 req_exec_cpg_iovec[0].iov_len =
sizeof(req_exec_cpg_mcast);
1848 req_exec_cpg_iovec[1].iov_base = (
char *)&req_lib_cpg_mcast->message;
1849 req_exec_cpg_iovec[1].iov_len = msglen;
1852 assert(result == 0);
1855 conn, group_name.value, cpd->
cpd_state, error);
1858 res_lib_cpg_partial_send.header.error = error;
1860 sizeof (res_lib_cpg_partial_send));
1864 static void message_handler_req_lib_cpg_mcast (
void *conn,
const void *message)
1870 struct iovec req_exec_cpg_iovec[2];
1871 struct req_exec_cpg_mcast req_exec_cpg_mcast;
1872 int msglen = req_lib_cpg_mcast->msglen;
1893 if (error ==
CS_OK) {
1894 req_exec_cpg_mcast.header.size =
sizeof(req_exec_cpg_mcast) + msglen;
1897 req_exec_cpg_mcast.pid = cpd->
pid;
1898 req_exec_cpg_mcast.msglen = msglen;
1900 memcpy(&req_exec_cpg_mcast.group_name, &group_name,
1903 req_exec_cpg_iovec[0].iov_base = (
char *)&req_exec_cpg_mcast;
1904 req_exec_cpg_iovec[0].iov_len =
sizeof(req_exec_cpg_mcast);
1905 req_exec_cpg_iovec[1].iov_base = (
char *)&req_lib_cpg_mcast->message;
1906 req_exec_cpg_iovec[1].iov_len = msglen;
1909 assert(result == 0);
1912 conn, group_name.value, cpd->
cpd_state, error);
1916 static void message_handler_req_lib_cpg_zc_execute (
1918 const void *message)
1921 struct qb_ipc_request_header *
header;
1924 struct iovec req_exec_cpg_iovec[2];
1925 struct req_exec_cpg_mcast req_exec_cpg_mcast;
1932 header = (
struct qb_ipc_request_header *)(((
char *)serveraddr2void(hdr->server_address) + sizeof (
struct coroipcs_zc_header)));
1933 req_lib_cpg_mcast = (
struct req_lib_cpg_mcast *)header;
1950 res_lib_cpg_mcast.header.size =
sizeof(res_lib_cpg_mcast);
1952 if (error ==
CS_OK) {
1953 req_exec_cpg_mcast.header.size =
sizeof(req_exec_cpg_mcast) + req_lib_cpg_mcast->msglen;
1956 req_exec_cpg_mcast.pid = cpd->
pid;
1957 req_exec_cpg_mcast.msglen = req_lib_cpg_mcast->msglen;
1959 memcpy(&req_exec_cpg_mcast.group_name, &cpd->
group_name,
1962 req_exec_cpg_iovec[0].iov_base = (
char *)&req_exec_cpg_mcast;
1963 req_exec_cpg_iovec[0].iov_len =
sizeof(req_exec_cpg_mcast);
1964 req_exec_cpg_iovec[1].iov_base = (
char *)header +
sizeof(
struct req_lib_cpg_mcast);
1965 req_exec_cpg_iovec[1].iov_len = req_exec_cpg_mcast.msglen;
1969 res_lib_cpg_mcast.header.error =
CS_OK;
1974 res_lib_cpg_mcast.header.error = error;
1978 sizeof (res_lib_cpg_mcast));
1982 static void message_handler_req_lib_cpg_membership (
void *conn,
1983 const void *message)
1986 (
struct req_lib_cpg_membership_get *)message;
1988 struct qb_list_head *iter;
1989 int member_count = 0;
1992 res_lib_cpg_membership_get.header.error =
CS_OK;
1993 res_lib_cpg_membership_get.header.size =
1994 sizeof (
struct res_lib_cpg_membership_get);
1996 qb_list_for_each(iter, &process_info_list_head) {
1998 if (mar_name_compare (&pi->
group, &req_lib_cpg_membership_get->group_name) == 0) {
2000 res_lib_cpg_membership_get.
member_list[member_count].pid = pi->
pid;
2004 res_lib_cpg_membership_get.member_count = member_count;
2007 sizeof (res_lib_cpg_membership_get));
2010 static void message_handler_req_lib_cpg_local_get (
void *conn,
2011 const void *message)
2015 res_lib_cpg_local_get.header.size =
sizeof (res_lib_cpg_local_get);
2017 res_lib_cpg_local_get.header.error =
CS_OK;
2021 sizeof (res_lib_cpg_local_get));
2024 static void message_handler_req_lib_cpg_iteration_initialize (
2026 const void *message)
2032 struct qb_list_head *iter, *iter2;
2033 struct cpg_iteration_instance *cpg_iteration_instance;
2046 res = hdb_handle_create (&cpg_iteration_handle_t_db,
sizeof (
struct cpg_iteration_instance),
2047 &cpg_iteration_handle);
2054 res = hdb_handle_get (&cpg_iteration_handle_t_db, cpg_iteration_handle, (
void *)&cpg_iteration_instance);
2061 qb_list_init (&cpg_iteration_instance->items_list_head);
2062 cpg_iteration_instance->handle = cpg_iteration_handle;
2067 qb_list_for_each(iter, &process_info_list_head) {
2077 qb_list_for_each(iter2, &(cpg_iteration_instance->items_list_head)) {
2080 if (mar_name_compare (&pi2->
group, &pi->
group) == 0) {
2096 if (mar_name_compare (&pi->
group, &req_lib_cpg_iterationinitialize->group_name) != 0)
2109 goto error_put_destroy;
2113 qb_list_init (&new_pi->
list);
2125 qb_list_for_each(iter2, &(cpg_iteration_instance->items_list_head)) {
2128 if (mar_name_compare (&pi2->
group, &pi->
group) == 0) {
2133 qb_list_add (&new_pi->
list, iter2);
2143 qb_list_init (&cpg_iteration_instance->list);
2146 cpg_iteration_instance->current_pointer = &cpg_iteration_instance->items_list_head;
2149 hdb_handle_put (&cpg_iteration_handle_t_db, cpg_iteration_handle);
2151 if (error !=
CS_OK) {
2152 hdb_handle_destroy (&cpg_iteration_handle_t_db, cpg_iteration_handle);
2156 res_lib_cpg_iterationinitialize.header.size =
sizeof (res_lib_cpg_iterationinitialize);
2158 res_lib_cpg_iterationinitialize.header.error = error;
2159 res_lib_cpg_iterationinitialize.iteration_handle = cpg_iteration_handle;
2162 sizeof (res_lib_cpg_iterationinitialize));
2165 static void message_handler_req_lib_cpg_iteration_next (
2167 const void *message)
2171 struct cpg_iteration_instance *cpg_iteration_instance;
2178 res = hdb_handle_get (&cpg_iteration_handle_t_db,
2179 req_lib_cpg_iterationnext->iteration_handle,
2180 (
void *)&cpg_iteration_instance);
2187 assert (cpg_iteration_instance);
2189 cpg_iteration_instance->current_pointer = cpg_iteration_instance->current_pointer->next;
2191 if (cpg_iteration_instance->current_pointer == &cpg_iteration_instance->items_list_head) {
2196 pi = qb_list_entry (cpg_iteration_instance->current_pointer,
struct process_info, list);
2201 res_lib_cpg_iterationnext.description.nodeid = pi->
nodeid;
2202 res_lib_cpg_iterationnext.description.pid = pi->
pid;
2203 memcpy (&res_lib_cpg_iterationnext.description.group,
2208 hdb_handle_put (&cpg_iteration_handle_t_db, req_lib_cpg_iterationnext->iteration_handle);
2210 res_lib_cpg_iterationnext.header.size =
sizeof (res_lib_cpg_iterationnext);
2212 res_lib_cpg_iterationnext.header.error = error;
2215 sizeof (res_lib_cpg_iterationnext));
2218 static void message_handler_req_lib_cpg_iteration_finalize (
2220 const void *message)
2224 struct cpg_iteration_instance *cpg_iteration_instance;
2230 res = hdb_handle_get (&cpg_iteration_handle_t_db,
2231 req_lib_cpg_iterationfinalize->iteration_handle,
2232 (
void *)&cpg_iteration_instance);
2239 assert (cpg_iteration_instance);
2241 cpg_iteration_instance_finalize (cpg_iteration_instance);
2242 hdb_handle_put (&cpg_iteration_handle_t_db, cpg_iteration_instance->handle);
2245 res_lib_cpg_iterationfinalize.header.size =
sizeof (res_lib_cpg_iterationfinalize);
2247 res_lib_cpg_iterationfinalize.header.error = error;
2250 sizeof (res_lib_cpg_iterationfinalize));
void *(* ipc_private_data_get)(void *conn)
int initial_totem_conf_sent
mar_cpg_address_t member_list[]
mar_req_coroipcc_zc_free_t struct
#define CPG_MAX_NAME_LENGTH
uint64_t initial_transition_counter
#define LOGSYS_LEVEL_TRACE
mar_uint32_t sender_nodeid
#define CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF
The req_lib_cpg_join struct.
mar_req_coroipcc_zc_alloc_t struct
The corosync_service_engine struct.
int(* ipc_dispatch_iov_send)(void *conn, const struct iovec *iov, unsigned int iov_len)
int(* ipc_response_send)(void *conn, const void *msg, size_t mlen)
struct corosync_service_engine * cpg_get_service_engine_ver0(void)
The res_lib_cpg_partial_deliver_callback struct.
The req_lib_cpg_mcast struct.
The corosync_lib_handler struct.
The res_lib_cpg_membership_get struct.
The res_lib_cpg_iterationnext struct.
The res_lib_cpg_iterationinitialize struct.
The corosync_exec_handler struct.
int(* totem_mcast)(const struct iovec *iovec, unsigned int iov_len, unsigned int guarantee)
uint64_t transition_counter
#define log_printf(level, format, args...)
The res_lib_cpg_partial_send struct.
void(* exec_handler_fn)(const void *msg, unsigned int nodeid)
#define SERVICE_ID_MAKE(a, b)
The req_lib_cpg_iterationinitialize struct.
#define LOGSYS_LEVEL_WARNING
The res_lib_cpg_join struct.
unsigned int(* totem_nodeid_get)(void)
void(* ipc_refcnt_dec)(void *conn)
struct qb_list_head * current_pointer
mar_req_coroipcc_zc_execute_t struct
The res_lib_cpg_mcast struct.
#define LOGSYS_LEVEL_ERROR
mar_uint32_t member_list[]
cs_error_t
The cs_error_t enum.
The req_lib_cpg_leave struct.
#define LOGSYS_LEVEL_DEBUG
mar_cpg_address_t member_list[PROCESSOR_COUNT_MAX]
mar_cpg_name_t group_name
The req_lib_cpg_iterationfinalize struct.
mar_cpg_name_t group_name
The corosync_api_v1 struct.
LOGSYS_DECLARE_SUBSYS("CPG")
DECLARE_HDB_DATABASE(cpg_iteration_handle_t_db, NULL)
struct totem_message_header header
#define swab32(x)
The swab32 macro.
The res_lib_cpg_finalize struct.
struct qb_list_head items_list_head
struct qb_list_head zcb_mapped_list_head
The res_lib_cpg_local_get struct.
#define PROCESSOR_COUNT_MAX
The res_lib_cpg_iterationfinalize struct.
struct corosync_service_engine cpg_service_engine
The req_lib_cpg_partial_mcast struct.
The req_lib_cpg_iterationnext struct.
const char *(* totem_ifaces_print)(unsigned int nodeid)
The res_lib_cpg_confchg_callback struct.
mar_cpg_name_t group_name
struct qb_list_head iteration_instance_list_head
void(* lib_handler_fn)(void *conn, const void *msg)
The req_lib_cpg_membership_get struct.
QB_LIST_DECLARE(cpg_pd_list_head)
int(* ipc_dispatch_send)(void *conn, const void *msg, size_t mlen)
The res_lib_cpg_leave struct.
struct memb_ring_id ring_id
void(* ipc_source_set)(mar_message_source_t *source, void *conn)
The res_lib_cpg_totem_confchg_callback struct.
Message from another node.
The mar_message_source_t struct.
void(* ipc_refcnt_inc)(void *conn)