corosync  3.1.10
lib/cpg.c
Go to the documentation of this file.
1 /*
2  * vi: set autoindent tabstop=4 shiftwidth=4 :
3  *
4  * Copyright (c) 2006-2015 Red Hat, Inc.
5  *
6  * All rights reserved.
7  *
8  * Author: Christine Caulfield (ccaulfi@redhat.com)
9  * Author: Jan Friesse (jfriesse@redhat.com)
10  *
11  * This software licensed under BSD license, the text of which follows:
12  *
13  * Redistribution and use in source and binary forms, with or without
14  * modification, are permitted provided that the following conditions are met:
15  *
16  * - Redistributions of source code must retain the above copyright notice,
17  * this list of conditions and the following disclaimer.
18  * - Redistributions in binary form must reproduce the above copyright notice,
19  * this list of conditions and the following disclaimer in the documentation
20  * and/or other materials provided with the distribution.
21  * - Neither the name of the MontaVista Software, Inc. nor the names of its
22  * contributors may be used to endorse or promote products derived from this
23  * software without specific prior written permission.
24  *
25  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
26  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
27  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
28  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
29  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
30  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
31  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
32  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
33  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
34  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
35  * THE POSSIBILITY OF SUCH DAMAGE.
36  */
37 /*
38  * Provides a closed process group API using the coroipcc executive
39  */
40 
41 #include <config.h>
42 
43 #include <stdlib.h>
44 #include <stdio.h>
45 #include <string.h>
46 #include <unistd.h>
47 #include <sys/types.h>
48 #include <sys/socket.h>
49 #include <sys/mman.h>
50 #include <sys/uio.h>
51 #include <sys/stat.h>
52 #include <errno.h>
53 #include <limits.h>
54 
55 #include <qb/qblist.h>
56 #include <qb/qbdefs.h>
57 #include <qb/qbipcc.h>
58 #include <qb/qblog.h>
59 
60 #include <corosync/hdb.h>
61 #include <corosync/corotypes.h>
62 #include <corosync/corodefs.h>
63 #include <corosync/cpg.h>
64 #include <corosync/ipc_cpg.h>
65 
66 #include "util.h"
67 
68 #ifndef MAP_ANONYMOUS
69 #define MAP_ANONYMOUS MAP_ANON
70 #endif
71 
72 /*
73  * Maximum number of times to retry a send when transmitting
74  * a large message fragment
75  */
76 #define MAX_RETRIES 100
77 
78 /*
79  * ZCB files have following umask (umask is same as used in libqb)
80  */
81 #define CPG_MEMORY_MAP_UMASK 077
82 
84 {
85  struct qb_list_head list;
86  uint32_t nodeid;
87  uint32_t pid;
88  char *assembly_buf;
89  uint32_t assembly_buf_ptr;
90 };
91 
92 struct cpg_inst {
93  qb_ipcc_connection_t *c;
94  int finalize;
95  void *context;
96  union {
99  };
100  struct qb_list_head iteration_list_head;
101  uint32_t max_msg_size;
102  struct qb_list_head assembly_list_head;
103 };
104 static void cpg_inst_free (void *inst);
105 
106 DECLARE_HDB_DATABASE(cpg_handle_t_db, cpg_inst_free);
107 
110  qb_ipcc_connection_t *conn;
112  struct qb_list_head list;
113 };
114 
115 DECLARE_HDB_DATABASE(cpg_iteration_handle_t_db,NULL);
116 
117 
118 /*
119  * Internal (not visible by API) functions
120  */
121 
122 static cs_error_t
123 coroipcc_msg_send_reply_receive (
124  qb_ipcc_connection_t *c,
125  const struct iovec *iov,
126  unsigned int iov_len,
127  void *res_msg,
128  size_t res_len)
129 {
130  return qb_to_cs_error(qb_ipcc_sendv_recv(c, iov, iov_len, res_msg, res_len,
132 }
133 
134 static void cpg_iteration_instance_finalize (struct cpg_iteration_instance_t *cpg_iteration_instance)
135 {
136  qb_list_del (&cpg_iteration_instance->list);
137  hdb_handle_destroy (&cpg_iteration_handle_t_db, cpg_iteration_instance->cpg_iteration_handle);
138 }
139 
140 static void cpg_inst_free (void *inst)
141 {
142  struct cpg_inst *cpg_inst = (struct cpg_inst *)inst;
143  qb_ipcc_disconnect(cpg_inst->c);
144 }
145 
146 static void cpg_inst_finalize (struct cpg_inst *cpg_inst, hdb_handle_t handle)
147 {
148  struct qb_list_head *iter, *tmp_iter;
150 
151  /*
152  * Traverse thru iteration instances and delete them
153  */
154  qb_list_for_each_safe(iter, tmp_iter, &(cpg_inst->iteration_list_head)) {
155  cpg_iteration_instance = qb_list_entry (iter, struct cpg_iteration_instance_t, list);
156 
157  cpg_iteration_instance_finalize (cpg_iteration_instance);
158  }
159  hdb_handle_destroy (&cpg_handle_t_db, handle);
160 }
161 
170  cpg_handle_t *handle,
171  cpg_callbacks_t *callbacks)
172 {
173  cpg_model_v1_data_t model_v1_data;
174 
175  memset (&model_v1_data, 0, sizeof (cpg_model_v1_data_t));
176 
177  if (callbacks) {
178  model_v1_data.cpg_deliver_fn = callbacks->cpg_deliver_fn;
179  model_v1_data.cpg_confchg_fn = callbacks->cpg_confchg_fn;
180  }
181 
182  return (cpg_model_initialize (handle, CPG_MODEL_V1, (cpg_model_data_t *)&model_v1_data, NULL));
183 }
184 
186  cpg_handle_t *handle,
187  cpg_model_t model,
188  cpg_model_data_t *model_data,
189  void *context)
190 {
191  cs_error_t error;
192  struct cpg_inst *cpg_inst;
193 
194  if (model != CPG_MODEL_V1) {
195  error = CS_ERR_INVALID_PARAM;
196  goto error_no_destroy;
197  }
198 
199  error = hdb_error_to_cs (hdb_handle_create (&cpg_handle_t_db, sizeof (struct cpg_inst), handle));
200  if (error != CS_OK) {
201  goto error_no_destroy;
202  }
203 
204  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, *handle, (void *)&cpg_inst));
205  if (error != CS_OK) {
206  goto error_destroy;
207  }
208 
209  cpg_inst->c = qb_ipcc_connect ("cpg", IPC_REQUEST_SIZE);
210  if (cpg_inst->c == NULL) {
211  error = qb_to_cs_error(-errno);
212  goto error_put_destroy;
213  }
214 
215  if (model_data != NULL) {
216  switch (model) {
217  case CPG_MODEL_V1:
218  memcpy (&cpg_inst->model_v1_data, model_data, sizeof (cpg_model_v1_data_t));
219  if ((cpg_inst->model_v1_data.flags & ~(CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF)) != 0) {
220  error = CS_ERR_INVALID_PARAM;
221 
222  goto error_destroy;
223  }
224  break;
225  }
226  }
227 
228  /* Allow space for corosync internal headers */
229  cpg_inst->max_msg_size = IPC_REQUEST_SIZE - 1024;
230  cpg_inst->model_data.model = model;
231  cpg_inst->context = context;
232 
233  qb_list_init(&cpg_inst->iteration_list_head);
234 
235  qb_list_init(&cpg_inst->assembly_list_head);
236 
237  hdb_handle_put (&cpg_handle_t_db, *handle);
238 
239  return (CS_OK);
240 
241 error_put_destroy:
242  hdb_handle_put (&cpg_handle_t_db, *handle);
243 error_destroy:
244  hdb_handle_destroy (&cpg_handle_t_db, *handle);
245 error_no_destroy:
246  return (error);
247 }
248 
250  cpg_handle_t handle)
251 {
252  struct cpg_inst *cpg_inst;
253  struct iovec iov;
254  struct req_lib_cpg_finalize req_lib_cpg_finalize;
255  struct res_lib_cpg_finalize res_lib_cpg_finalize;
256  cs_error_t error;
257 
258  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
259  if (error != CS_OK) {
260  return (error);
261  }
262 
263  /*
264  * Another thread has already started finalizing
265  */
266  if (cpg_inst->finalize) {
267  hdb_handle_put (&cpg_handle_t_db, handle);
268  return (CS_ERR_BAD_HANDLE);
269  }
270 
271  cpg_inst->finalize = 1;
272 
273  /*
274  * Send service request
275  */
276  req_lib_cpg_finalize.header.size = sizeof (struct req_lib_cpg_finalize);
277  req_lib_cpg_finalize.header.id = MESSAGE_REQ_CPG_FINALIZE;
278 
279  iov.iov_base = (void *)&req_lib_cpg_finalize;
280  iov.iov_len = sizeof (struct req_lib_cpg_finalize);
281 
282  error = coroipcc_msg_send_reply_receive (cpg_inst->c,
283  &iov,
284  1,
285  &res_lib_cpg_finalize,
286  sizeof (struct res_lib_cpg_finalize));
287 
288  cpg_inst_finalize (cpg_inst, handle);
289  hdb_handle_put (&cpg_handle_t_db, handle);
290 
291  return (error);
292 }
293 
295  cpg_handle_t handle,
296  int *fd)
297 {
298  cs_error_t error;
299  struct cpg_inst *cpg_inst;
300 
301  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
302  if (error != CS_OK) {
303  return (error);
304  }
305 
306  error = qb_to_cs_error (qb_ipcc_fd_get (cpg_inst->c, fd));
307 
308  hdb_handle_put (&cpg_handle_t_db, handle);
309 
310  return (error);
311 }
312 
314  cpg_handle_t handle,
315  uint32_t *size)
316 {
317  cs_error_t error;
318  struct cpg_inst *cpg_inst;
319 
320  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
321  if (error != CS_OK) {
322  return (error);
323  }
324 
325  *size = cpg_inst->max_msg_size;
326 
327  hdb_handle_put (&cpg_handle_t_db, handle);
328 
329  return (error);
330 }
331 
333  cpg_handle_t handle,
334  void **context)
335 {
336  cs_error_t error;
337  struct cpg_inst *cpg_inst;
338 
339  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
340  if (error != CS_OK) {
341  return (error);
342  }
343 
344  *context = cpg_inst->context;
345 
346  hdb_handle_put (&cpg_handle_t_db, handle);
347 
348  return (CS_OK);
349 }
350 
352  cpg_handle_t handle,
353  void *context)
354 {
355  cs_error_t error;
356  struct cpg_inst *cpg_inst;
357 
358  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
359  if (error != CS_OK) {
360  return (error);
361  }
362 
363  cpg_inst->context = context;
364 
365  hdb_handle_put (&cpg_handle_t_db, handle);
366 
367  return (CS_OK);
368 }
369 
371  cpg_handle_t handle,
372  cs_dispatch_flags_t dispatch_types)
373 {
374  int timeout = -1;
375  cs_error_t error;
376  int cont = 1; /* always continue do loop except when set to 0 */
377  struct cpg_inst *cpg_inst;
378  struct res_lib_cpg_confchg_callback *res_cpg_confchg_callback;
379  struct res_lib_cpg_deliver_callback *res_cpg_deliver_callback;
380  struct res_lib_cpg_partial_deliver_callback *res_cpg_partial_deliver_callback;
381  struct res_lib_cpg_totem_confchg_callback *res_cpg_totem_confchg_callback;
382  struct cpg_inst cpg_inst_copy;
383  struct qb_ipc_response_header *dispatch_data;
384  struct cpg_address member_list[CPG_MEMBERS_MAX];
385  struct cpg_address left_list[CPG_MEMBERS_MAX];
386  struct cpg_address joined_list[CPG_MEMBERS_MAX];
387  struct cpg_name group_name;
388  struct cpg_assembly_data *assembly_data;
389  struct qb_list_head *iter, *tmp_iter;
390  mar_cpg_address_t *left_list_start;
391  mar_cpg_address_t *joined_list_start;
392  unsigned int i;
393  struct cpg_ring_id ring_id;
394  uint32_t totem_member_list[CPG_MEMBERS_MAX];
395  int32_t errno_res;
396  char dispatch_buf[IPC_DISPATCH_SIZE];
397 
398  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
399  if (error != CS_OK) {
400  return (error);
401  }
402 
403  /*
404  * Timeout instantly for CS_DISPATCH_ONE_NONBLOCKING or CS_DISPATCH_ALL and
405  * wait indefinitely for CS_DISPATCH_ONE or CS_DISPATCH_BLOCKING
406  */
407  if (dispatch_types == CS_DISPATCH_ALL || dispatch_types == CS_DISPATCH_ONE_NONBLOCKING) {
408  timeout = 0;
409  }
410 
411  dispatch_data = (struct qb_ipc_response_header *)dispatch_buf;
412  do {
413  errno_res = qb_ipcc_event_recv (
414  cpg_inst->c,
415  dispatch_buf,
417  timeout);
418  error = qb_to_cs_error (errno_res);
419  if (error == CS_ERR_BAD_HANDLE) {
420  error = CS_OK;
421  goto error_put;
422  }
423  if (error == CS_ERR_TRY_AGAIN) {
424  if (dispatch_types == CS_DISPATCH_ONE_NONBLOCKING) {
425  /*
426  * Don't mask error
427  */
428  goto error_put;
429  }
430  error = CS_OK;
431  if (dispatch_types == CS_DISPATCH_ALL) {
432  break; /* exit do while cont is 1 loop */
433  } else {
434  continue; /* next poll */
435  }
436  }
437  if (error != CS_OK) {
438  goto error_put;
439  }
440 
441  /*
442  * Make copy of callbacks, message data, unlock instance, and call callback
443  * A risk of this dispatch method is that the callback routines may
444  * operate at the same time that cpgFinalize has been called.
445  */
446  memcpy (&cpg_inst_copy, cpg_inst, sizeof (struct cpg_inst));
447  switch (cpg_inst_copy.model_data.model) {
448  case CPG_MODEL_V1:
449  /*
450  * Dispatch incoming message
451  */
452  switch (dispatch_data->id) {
454  if (cpg_inst_copy.model_v1_data.cpg_deliver_fn == NULL) {
455  break;
456  }
457 
458  res_cpg_deliver_callback = (struct res_lib_cpg_deliver_callback *)dispatch_data;
459 
460  marshall_from_mar_cpg_name_t (
461  &group_name,
462  &res_cpg_deliver_callback->group_name);
463 
464  cpg_inst_copy.model_v1_data.cpg_deliver_fn (handle,
465  &group_name,
466  res_cpg_deliver_callback->nodeid,
467  res_cpg_deliver_callback->pid,
468  &res_cpg_deliver_callback->message,
469  res_cpg_deliver_callback->msglen);
470  break;
471 
473  res_cpg_partial_deliver_callback = (struct res_lib_cpg_partial_deliver_callback *)dispatch_data;
474 
475  marshall_from_mar_cpg_name_t (
476  &group_name,
477  &res_cpg_partial_deliver_callback->group_name);
478 
479  /*
480  * Search for assembly data for current messages (nodeid, pid) pair in list of assemblies
481  */
482  assembly_data = NULL;
483  qb_list_for_each(iter, &(cpg_inst->assembly_list_head)) {
484  struct cpg_assembly_data *current_assembly_data = qb_list_entry (iter, struct cpg_assembly_data, list);
485  if (current_assembly_data->nodeid == res_cpg_partial_deliver_callback->nodeid && current_assembly_data->pid == res_cpg_partial_deliver_callback->pid) {
486  assembly_data = current_assembly_data;
487  break;
488  }
489  }
490 
491  if (res_cpg_partial_deliver_callback->type == LIBCPG_PARTIAL_FIRST) {
492 
493  /*
494  * As this is LIBCPG_PARTIAL_FIRST packet, check that there is no ongoing assembly.
495  * Otherwise the sending of packet must have been interrupted and error should have
496  * been reported to sending client. Therefore here last assembly will be dropped.
497  */
498  if (assembly_data) {
499  qb_list_del (&assembly_data->list);
500  free(assembly_data->assembly_buf);
501  free(assembly_data);
502  // coverity[UNUSED_VALUE:SUPPRESS] defensive programming
503  assembly_data = NULL;
504  }
505 
506  assembly_data = malloc(sizeof(struct cpg_assembly_data));
507  if (!assembly_data) {
508  error = CS_ERR_NO_MEMORY;
509  goto error_put;
510  }
511 
512  assembly_data->nodeid = res_cpg_partial_deliver_callback->nodeid;
513  assembly_data->pid = res_cpg_partial_deliver_callback->pid;
514  assembly_data->assembly_buf = malloc(res_cpg_partial_deliver_callback->msglen);
515  if (!assembly_data->assembly_buf) {
516  free(assembly_data);
517  error = CS_ERR_NO_MEMORY;
518  goto error_put;
519  }
520  assembly_data->assembly_buf_ptr = 0;
521  qb_list_init (&assembly_data->list);
522 
523  qb_list_add (&assembly_data->list, &cpg_inst->assembly_list_head);
524  }
525  if (assembly_data) {
526  memcpy(assembly_data->assembly_buf + assembly_data->assembly_buf_ptr,
527  res_cpg_partial_deliver_callback->message, res_cpg_partial_deliver_callback->fraglen);
528  assembly_data->assembly_buf_ptr += res_cpg_partial_deliver_callback->fraglen;
529 
530  if (res_cpg_partial_deliver_callback->type == LIBCPG_PARTIAL_LAST) {
531  if (cpg_inst_copy.model_v1_data.cpg_deliver_fn != NULL) {
532  cpg_inst_copy.model_v1_data.cpg_deliver_fn (handle,
533  &group_name,
534  res_cpg_partial_deliver_callback->nodeid,
535  res_cpg_partial_deliver_callback->pid,
536  assembly_data->assembly_buf,
537  res_cpg_partial_deliver_callback->msglen);
538  }
539 
540  qb_list_del (&assembly_data->list);
541  free(assembly_data->assembly_buf);
542  free(assembly_data);
543  }
544  }
545  break;
546 
548  if (cpg_inst_copy.model_v1_data.cpg_confchg_fn == NULL) {
549  break;
550  }
551 
552  res_cpg_confchg_callback = (struct res_lib_cpg_confchg_callback *)dispatch_data;
553 
554  for (i = 0; i < res_cpg_confchg_callback->member_list_entries; i++) {
555  marshall_from_mar_cpg_address_t (&member_list[i],
556  &res_cpg_confchg_callback->member_list[i]);
557  }
558  left_list_start = res_cpg_confchg_callback->member_list +
559  res_cpg_confchg_callback->member_list_entries;
560  for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) {
561  marshall_from_mar_cpg_address_t (&left_list[i],
562  &left_list_start[i]);
563  }
564  joined_list_start = res_cpg_confchg_callback->member_list +
565  res_cpg_confchg_callback->member_list_entries +
566  res_cpg_confchg_callback->left_list_entries;
567  for (i = 0; i < res_cpg_confchg_callback->joined_list_entries; i++) {
568  marshall_from_mar_cpg_address_t (&joined_list[i],
569  &joined_list_start[i]);
570  }
571  marshall_from_mar_cpg_name_t (
572  &group_name,
573  &res_cpg_confchg_callback->group_name);
574 
575  cpg_inst_copy.model_v1_data.cpg_confchg_fn (handle,
576  &group_name,
577  member_list,
578  res_cpg_confchg_callback->member_list_entries,
579  left_list,
580  res_cpg_confchg_callback->left_list_entries,
581  joined_list,
582  res_cpg_confchg_callback->joined_list_entries);
583 
584  /*
585  * If member left while his partial packet was being assembled, assembly data must be removed from list
586  */
587  for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) {
588  qb_list_for_each_safe(iter, tmp_iter, &(cpg_inst->assembly_list_head)) {
589  struct cpg_assembly_data *current_assembly_data = qb_list_entry (iter, struct cpg_assembly_data, list);
590  if (current_assembly_data->nodeid != left_list[i].nodeid || current_assembly_data->pid != left_list[i].pid)
591  continue;
592 
593  qb_list_del (&current_assembly_data->list);
594  free(current_assembly_data->assembly_buf);
595  free(current_assembly_data);
596  }
597  }
598 
599  break;
601  if (cpg_inst_copy.model_v1_data.cpg_totem_confchg_fn == NULL) {
602  break;
603  }
604 
605  res_cpg_totem_confchg_callback = (struct res_lib_cpg_totem_confchg_callback *)dispatch_data;
606 
607  marshall_from_mar_cpg_ring_id_t (&ring_id, &res_cpg_totem_confchg_callback->ring_id);
608  for (i = 0; i < res_cpg_totem_confchg_callback->member_list_entries; i++) {
609  totem_member_list[i] = res_cpg_totem_confchg_callback->member_list[i];
610  }
611 
612  cpg_inst_copy.model_v1_data.cpg_totem_confchg_fn (handle,
613  ring_id,
614  res_cpg_totem_confchg_callback->member_list_entries,
615  totem_member_list);
616  break;
617  default:
618  error = CS_ERR_LIBRARY;
619  goto error_put;
620  break;
621  } /* - switch (dispatch_data->id) */
622  break; /* case CPG_MODEL_V1 */
623  } /* - switch (cpg_inst_copy.model_data.model) */
624 
625  if (cpg_inst_copy.finalize || cpg_inst->finalize) {
626  /*
627  * If the finalize has been called then get out of the dispatch.
628  */
629  cpg_inst->finalize = 1;
630  error = CS_ERR_BAD_HANDLE;
631  goto error_put;
632  }
633 
634  /*
635  * Determine if more messages should be processed
636  */
637  if (dispatch_types == CS_DISPATCH_ONE || dispatch_types == CS_DISPATCH_ONE_NONBLOCKING) {
638  cont = 0;
639  }
640  } while (cont);
641 
642 error_put:
643  hdb_handle_put (&cpg_handle_t_db, handle);
644  return (error);
645 }
646 
648  cpg_handle_t handle,
649  const struct cpg_name *group)
650 {
651  cs_error_t error;
652  struct cpg_inst *cpg_inst;
653  struct iovec iov[2];
654  struct req_lib_cpg_join req_lib_cpg_join;
655  struct res_lib_cpg_join response;
656 
657  if (group->length > CPG_MAX_NAME_LENGTH) {
658  return (CS_ERR_NAME_TOO_LONG);
659  }
660 
661  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
662  if (error != CS_OK) {
663  return (error);
664  }
665 
666  /* Now join */
667  req_lib_cpg_join.header.size = sizeof (struct req_lib_cpg_join);
668  req_lib_cpg_join.header.id = MESSAGE_REQ_CPG_JOIN;
669  req_lib_cpg_join.pid = getpid();
670  req_lib_cpg_join.flags = 0;
671 
672  switch (cpg_inst->model_data.model) {
673  case CPG_MODEL_V1:
674  req_lib_cpg_join.flags = cpg_inst->model_v1_data.flags;
675  break;
676  }
677 
678  marshall_to_mar_cpg_name_t (&req_lib_cpg_join.group_name,
679  group);
680 
681  iov[0].iov_base = (void *)&req_lib_cpg_join;
682  iov[0].iov_len = sizeof (struct req_lib_cpg_join);
683 
684  do {
685  error = coroipcc_msg_send_reply_receive (cpg_inst->c, iov, 1,
686  &response, sizeof (struct res_lib_cpg_join));
687 
688  if (error != CS_OK) {
689  goto error_exit;
690  }
691  } while (response.header.error == CS_ERR_BUSY);
692 
693  error = response.header.error;
694 
695 error_exit:
696  hdb_handle_put (&cpg_handle_t_db, handle);
697 
698  return (error);
699 }
700 
702  cpg_handle_t handle,
703  const struct cpg_name *group)
704 {
705  cs_error_t error;
706  struct cpg_inst *cpg_inst;
707  struct iovec iov[2];
708  struct req_lib_cpg_leave req_lib_cpg_leave;
709  struct res_lib_cpg_leave res_lib_cpg_leave;
710 
711  if (group->length > CPG_MAX_NAME_LENGTH) {
712  return (CS_ERR_NAME_TOO_LONG);
713  }
714 
715  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
716  if (error != CS_OK) {
717  return (error);
718  }
719 
720  req_lib_cpg_leave.header.size = sizeof (struct req_lib_cpg_leave);
721  req_lib_cpg_leave.header.id = MESSAGE_REQ_CPG_LEAVE;
722  req_lib_cpg_leave.pid = getpid();
723  marshall_to_mar_cpg_name_t (&req_lib_cpg_leave.group_name,
724  group);
725 
726  iov[0].iov_base = (void *)&req_lib_cpg_leave;
727  iov[0].iov_len = sizeof (struct req_lib_cpg_leave);
728 
729  do {
730  error = coroipcc_msg_send_reply_receive (cpg_inst->c, iov, 1,
731  &res_lib_cpg_leave, sizeof (struct res_lib_cpg_leave));
732 
733  if (error != CS_OK) {
734  goto error_exit;
735  }
736  } while (res_lib_cpg_leave.header.error == CS_ERR_BUSY);
737 
738  error = res_lib_cpg_leave.header.error;
739 
740 error_exit:
741  hdb_handle_put (&cpg_handle_t_db, handle);
742 
743  return (error);
744 }
745 
747  cpg_handle_t handle,
748  struct cpg_name *group_name,
749  struct cpg_address *member_list,
750  int *member_list_entries)
751 {
752  cs_error_t error;
753  struct cpg_inst *cpg_inst;
754  struct iovec iov;
755  struct req_lib_cpg_membership_get req_lib_cpg_membership_get;
756  struct res_lib_cpg_membership_get res_lib_cpg_membership_get;
757  unsigned int i;
758 
759  if (group_name->length > CPG_MAX_NAME_LENGTH) {
760  return (CS_ERR_NAME_TOO_LONG);
761  }
762  if (member_list == NULL) {
763  return (CS_ERR_INVALID_PARAM);
764  }
765  if (member_list_entries == NULL) {
766  return (CS_ERR_INVALID_PARAM);
767  }
768 
769  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
770  if (error != CS_OK) {
771  return (error);
772  }
773 
774  req_lib_cpg_membership_get.header.size = sizeof (struct req_lib_cpg_membership_get);
775  req_lib_cpg_membership_get.header.id = MESSAGE_REQ_CPG_MEMBERSHIP;
776 
777  marshall_to_mar_cpg_name_t (&req_lib_cpg_membership_get.group_name,
778  group_name);
779 
780  iov.iov_base = (void *)&req_lib_cpg_membership_get;
781  iov.iov_len = sizeof (struct req_lib_cpg_membership_get);
782 
783  error = coroipcc_msg_send_reply_receive (cpg_inst->c, &iov, 1,
784  &res_lib_cpg_membership_get, sizeof (res_lib_cpg_membership_get));
785 
786  if (error != CS_OK) {
787  goto error_exit;
788  }
789 
790  error = res_lib_cpg_membership_get.header.error;
791 
792  /*
793  * Copy results to caller
794  */
795  *member_list_entries = res_lib_cpg_membership_get.member_count;
796  if (member_list) {
797  for (i = 0; i < res_lib_cpg_membership_get.member_count; i++) {
798  marshall_from_mar_cpg_address_t (&member_list[i],
799  &res_lib_cpg_membership_get.member_list[i]);
800  }
801  }
802 
803 error_exit:
804  hdb_handle_put (&cpg_handle_t_db, handle);
805 
806  return (error);
807 }
808 
810  cpg_handle_t handle,
811  unsigned int *local_nodeid)
812 {
813  cs_error_t error;
814  struct cpg_inst *cpg_inst;
815  struct iovec iov;
816  struct req_lib_cpg_local_get req_lib_cpg_local_get;
817  struct res_lib_cpg_local_get res_lib_cpg_local_get;
818 
819  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
820  if (error != CS_OK) {
821  return (error);
822  }
823 
824  req_lib_cpg_local_get.header.size = sizeof (struct qb_ipc_request_header);
825  req_lib_cpg_local_get.header.id = MESSAGE_REQ_CPG_LOCAL_GET;
826 
827  iov.iov_base = (void *)&req_lib_cpg_local_get;
828  iov.iov_len = sizeof (struct req_lib_cpg_local_get);
829 
830  error = coroipcc_msg_send_reply_receive (cpg_inst->c, &iov, 1,
831  &res_lib_cpg_local_get, sizeof (res_lib_cpg_local_get));
832 
833  if (error != CS_OK) {
834  goto error_exit;
835  }
836 
837  error = res_lib_cpg_local_get.header.error;
838 
839  *local_nodeid = res_lib_cpg_local_get.local_nodeid;
840 
841 error_exit:
842  hdb_handle_put (&cpg_handle_t_db, handle);
843 
844  return (error);
845 }
846 
848  cpg_handle_t handle,
849  cpg_flow_control_state_t *flow_control_state)
850 {
851  cs_error_t error;
852  struct cpg_inst *cpg_inst;
853 
854  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
855  if (error != CS_OK) {
856  return (error);
857  }
858  *flow_control_state = CPG_FLOW_CONTROL_DISABLED;
859  error = CS_OK;
860 
861  hdb_handle_put (&cpg_handle_t_db, handle);
862 
863  return (error);
864 }
865 
866 static int
867 memory_map (char *path, const char *file, void **buf, size_t bytes)
868 {
869  int32_t fd;
870  void *addr;
871  int32_t res;
872  char *buffer;
873  int32_t i;
874  size_t written;
875  size_t page_size;
876  long int sysconf_page_size;
877  mode_t old_umask;
878 
879  snprintf (path, PATH_MAX, "/dev/shm/%s", file);
880 
881  old_umask = umask(CPG_MEMORY_MAP_UMASK);
882  fd = mkstemp (path);
883  (void)umask(old_umask);
884  if (fd == -1) {
885  snprintf (path, PATH_MAX, LOCALSTATEDIR "/run/%s", file);
886  old_umask = umask(CPG_MEMORY_MAP_UMASK);
887  fd = mkstemp (path);
888  (void)umask(old_umask);
889  if (fd == -1) {
890  return (-1);
891  }
892  }
893 
894  res = ftruncate (fd, bytes);
895  if (res == -1) {
896  goto error_close_unlink;
897  }
898  sysconf_page_size = sysconf(_SC_PAGESIZE);
899  if (sysconf_page_size <= 0) {
900  goto error_close_unlink;
901  }
902  page_size = sysconf_page_size;
903  buffer = malloc (page_size);
904  if (buffer == NULL) {
905  goto error_close_unlink;
906  }
907  memset (buffer, 0, page_size);
908  for (i = 0; i < (bytes / page_size); i++) {
909 retry_write:
910  written = write (fd, buffer, page_size);
911  if (written == -1 && errno == EINTR) {
912  goto retry_write;
913  }
914  if (written != page_size) {
915  free (buffer);
916  goto error_close_unlink;
917  }
918  }
919  free (buffer);
920 
921  addr = mmap (NULL, bytes, PROT_READ | PROT_WRITE,
922  MAP_SHARED, fd, 0);
923 
924  if (addr == MAP_FAILED) {
925  goto error_close_unlink;
926  }
927 #ifdef MADV_NOSYNC
928  madvise(addr, bytes, MADV_NOSYNC);
929 #endif
930 
931  res = close (fd);
932  if (res) {
933  munmap(addr, bytes);
934 
935  return (-1);
936  }
937  *buf = addr;
938 
939  return 0;
940 
941 error_close_unlink:
942  close (fd);
943  unlink(path);
944  return -1;
945 }
946 
948  cpg_handle_t handle,
949  size_t size,
950  void **buffer)
951 {
952  void *buf = NULL;
953  char path[PATH_MAX];
954  mar_req_coroipcc_zc_alloc_t req_coroipcc_zc_alloc;
955  struct qb_ipc_response_header res_coroipcs_zc_alloc;
956  size_t map_size;
957  struct iovec iovec;
958  struct coroipcs_zc_header *hdr;
959  cs_error_t error;
960  struct cpg_inst *cpg_inst;
961 
962  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
963  if (error != CS_OK) {
964  return (error);
965  }
966 
967  map_size = size + sizeof (struct req_lib_cpg_mcast) + sizeof (struct coroipcs_zc_header);
968  assert(memory_map (path, "corosync_zerocopy-XXXXXX", &buf, map_size) != -1);
969 
970  if (strlen(path) >= CPG_ZC_PATH_LEN) {
971  unlink(path);
972  munmap (buf, map_size);
973  return (CS_ERR_NAME_TOO_LONG);
974  }
975 
976  req_coroipcc_zc_alloc.header.size = sizeof (mar_req_coroipcc_zc_alloc_t);
977  req_coroipcc_zc_alloc.header.id = MESSAGE_REQ_CPG_ZC_ALLOC;
978  req_coroipcc_zc_alloc.map_size = map_size;
979  strcpy (req_coroipcc_zc_alloc.path_to_file, path);
980 
981  iovec.iov_base = (void *)&req_coroipcc_zc_alloc;
982  iovec.iov_len = sizeof (mar_req_coroipcc_zc_alloc_t);
983 
984  error = coroipcc_msg_send_reply_receive (
985  cpg_inst->c,
986  &iovec,
987  1,
988  &res_coroipcs_zc_alloc,
989  sizeof (struct qb_ipc_response_header));
990 
991  if (error != CS_OK) {
992  goto error_exit;
993  }
994 
995  hdr = (struct coroipcs_zc_header *)buf;
996  hdr->map_size = map_size;
997  *buffer = ((char *)buf) + sizeof (struct coroipcs_zc_header) + sizeof (struct req_lib_cpg_mcast);
998 
999 error_exit:
1000  hdb_handle_put (&cpg_handle_t_db, handle);
1001  /*
1002  * Coverity correctly reports an error here. We cannot safely munmap and unlink the file, because
1003  * the timing of the failure is the key issue: if a failure occurs before the IPC reply,
1004  * the file should be deleted.
1005  * However, if the failure happens during the IPC reply, Corosync has already deleted the file.
1006  * This means the cpg library could attempt to delete a non-existing file (not a problem) or,
1007  * in a theoretical race condition, delete a new file created by another application.
1008  * There are multiple possible solutions, but none of them are ready to be implemented yet.
1009  */
1010  return (error);
1011 }
1012 
1014  cpg_handle_t handle,
1015  void *buffer)
1016 {
1017  cs_error_t error;
1018  unsigned int res;
1019  struct cpg_inst *cpg_inst;
1020  mar_req_coroipcc_zc_free_t req_coroipcc_zc_free;
1021  struct qb_ipc_response_header res_coroipcs_zc_free;
1022  struct iovec iovec;
1023  struct coroipcs_zc_header *header = (struct coroipcs_zc_header *)((char *)buffer - sizeof (struct coroipcs_zc_header) - sizeof (struct req_lib_cpg_mcast));
1024 
1025  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
1026  if (error != CS_OK) {
1027  return (error);
1028  }
1029 
1030  req_coroipcc_zc_free.header.size = sizeof (mar_req_coroipcc_zc_free_t);
1031  req_coroipcc_zc_free.header.id = MESSAGE_REQ_CPG_ZC_FREE;
1032  req_coroipcc_zc_free.map_size = header->map_size;
1033  req_coroipcc_zc_free.server_address = header->server_address;
1034 
1035  iovec.iov_base = (void *)&req_coroipcc_zc_free;
1036  iovec.iov_len = sizeof (mar_req_coroipcc_zc_free_t);
1037 
1038  error = coroipcc_msg_send_reply_receive (
1039  cpg_inst->c,
1040  &iovec,
1041  1,
1042  &res_coroipcs_zc_free,
1043  sizeof (struct qb_ipc_response_header));
1044 
1045  if (error != CS_OK) {
1046  goto error_exit;
1047  }
1048 
1049  res = munmap ((void *)header, header->map_size);
1050  if (res == -1) {
1051  error = qb_to_cs_error(-errno);
1052 
1053  goto error_exit;
1054  }
1055 
1056 error_exit:
1057  hdb_handle_put (&cpg_handle_t_db, handle);
1058 
1059  return (error);
1060 }
1061 
1063  cpg_handle_t handle,
1065  void *msg,
1066  size_t msg_len)
1067 {
1068  cs_error_t error;
1069  struct cpg_inst *cpg_inst;
1071  struct res_lib_cpg_mcast res_lib_cpg_mcast;
1072  mar_req_coroipcc_zc_execute_t req_coroipcc_zc_execute;
1073  struct coroipcs_zc_header *hdr;
1074  struct iovec iovec;
1075 
1076  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
1077  if (error != CS_OK) {
1078  return (error);
1079  }
1080 
1081  if (msg_len > IPC_REQUEST_SIZE) {
1082  error = CS_ERR_TOO_BIG;
1083  goto error_exit;
1084  }
1085 
1086  req_lib_cpg_mcast = (struct req_lib_cpg_mcast *)(((char *)msg) - sizeof (struct req_lib_cpg_mcast));
1087  req_lib_cpg_mcast->header.size = sizeof (struct req_lib_cpg_mcast) +
1088  msg_len;
1089 
1090  req_lib_cpg_mcast->header.id = MESSAGE_REQ_CPG_MCAST;
1091  req_lib_cpg_mcast->guarantee = guarantee;
1092  req_lib_cpg_mcast->msglen = msg_len;
1093 
1094  hdr = (struct coroipcs_zc_header *)(((char *)req_lib_cpg_mcast) - sizeof (struct coroipcs_zc_header));
1095 
1096  req_coroipcc_zc_execute.header.size = sizeof (mar_req_coroipcc_zc_execute_t);
1097  req_coroipcc_zc_execute.header.id = MESSAGE_REQ_CPG_ZC_EXECUTE;
1098  req_coroipcc_zc_execute.server_address = hdr->server_address;
1099 
1100  iovec.iov_base = (void *)&req_coroipcc_zc_execute;
1101  iovec.iov_len = sizeof (mar_req_coroipcc_zc_execute_t);
1102 
1103  error = coroipcc_msg_send_reply_receive (
1104  cpg_inst->c,
1105  &iovec,
1106  1,
1107  &res_lib_cpg_mcast,
1108  sizeof(res_lib_cpg_mcast));
1109 
1110  if (error != CS_OK) {
1111  goto error_exit;
1112  }
1113 
1114  error = res_lib_cpg_mcast.header.error;
1115 
1116 error_exit:
1117  hdb_handle_put (&cpg_handle_t_db, handle);
1118 
1119  return (error);
1120 }
1121 
1122 static cs_error_t send_fragments (
1123  struct cpg_inst *cpg_inst,
1125  size_t msg_len,
1126  const struct iovec *iovec,
1127  unsigned int iov_len)
1128 {
1129  int i;
1130  cs_error_t error = CS_OK;
1131  struct iovec iov[2];
1134  size_t sent = 0;
1135  size_t iov_sent = 0;
1136  int retry_count;
1137 
1139  req_lib_cpg_mcast.guarantee = guarantee;
1140  req_lib_cpg_mcast.msglen = msg_len;
1141 
1142  iov[0].iov_base = (void *)&req_lib_cpg_mcast;
1143  iov[0].iov_len = sizeof (struct req_lib_cpg_partial_mcast);
1144 
1145  i=0;
1146  iov_sent = 0 ;
1147  qb_ipcc_fc_enable_max_set(cpg_inst->c, 2);
1148 
1149  while (error == CS_OK && sent < msg_len) {
1150 
1151  retry_count = 0;
1152  if ( (iovec[i].iov_len - iov_sent) > cpg_inst->max_msg_size) {
1153  iov[1].iov_len = cpg_inst->max_msg_size;
1154  }
1155  else {
1156  iov[1].iov_len = iovec[i].iov_len - iov_sent;
1157  }
1158 
1159  if (sent == 0) {
1161  }
1162  else if ((sent + iov[1].iov_len) == msg_len) {
1164  }
1165  else {
1167  }
1168 
1169  req_lib_cpg_mcast.fraglen = iov[1].iov_len;
1170  req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_partial_mcast) + iov[1].iov_len;
1171  iov[1].iov_base = (char *)iovec[i].iov_base + iov_sent;
1172 
1173  resend:
1174  error = coroipcc_msg_send_reply_receive (cpg_inst->c, iov, 2,
1176  sizeof (res_lib_cpg_partial_send));
1177 
1178  if (error == CS_ERR_TRY_AGAIN) {
1179  fprintf(stderr, "sleep. counter=%d\n", retry_count);
1180  if (++retry_count > MAX_RETRIES) {
1181  goto error_exit;
1182  }
1183  usleep(10000);
1184  goto resend;
1185  }
1186 
1187  iov_sent += iov[1].iov_len;
1188  sent += iov[1].iov_len;
1189 
1190  /* Next iovec */
1191  if (iov_sent >= iovec[i].iov_len) {
1192  i++;
1193  iov_sent = 0;
1194  }
1195  error = res_lib_cpg_partial_send.header.error;
1196  }
1197 error_exit:
1198  qb_ipcc_fc_enable_max_set(cpg_inst->c, 1);
1199 
1200  return error;
1201 }
1202 
1203 
1205  cpg_handle_t handle,
1206  cpg_guarantee_t guarantee,
1207  const struct iovec *iovec,
1208  unsigned int iov_len)
1209 {
1210  int i;
1211  cs_error_t error;
1212  struct cpg_inst *cpg_inst;
1213  struct iovec iov[64];
1214  struct req_lib_cpg_mcast req_lib_cpg_mcast;
1215  size_t msg_len = 0;
1216 
1217  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
1218  if (error != CS_OK) {
1219  return (error);
1220  }
1221 
1222  for (i = 0; i < iov_len; i++ ) {
1223  msg_len += iovec[i].iov_len;
1224  }
1225 
1226  if (msg_len > cpg_inst->max_msg_size) {
1227  error = send_fragments(cpg_inst, guarantee, msg_len, iovec, iov_len);
1228  goto error_exit;
1229  }
1230 
1231  req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_mcast) +
1232  msg_len;
1233 
1234  req_lib_cpg_mcast.header.id = MESSAGE_REQ_CPG_MCAST;
1235  req_lib_cpg_mcast.guarantee = guarantee;
1236  req_lib_cpg_mcast.msglen = msg_len;
1237 
1238  iov[0].iov_base = (void *)&req_lib_cpg_mcast;
1239  iov[0].iov_len = sizeof (struct req_lib_cpg_mcast);
1240  memcpy (&iov[1], iovec, iov_len * sizeof (struct iovec));
1241 
1242  qb_ipcc_fc_enable_max_set(cpg_inst->c, 2);
1243  error = qb_to_cs_error(qb_ipcc_sendv(cpg_inst->c, iov, iov_len + 1));
1244  qb_ipcc_fc_enable_max_set(cpg_inst->c, 1);
1245 
1246 error_exit:
1247  hdb_handle_put (&cpg_handle_t_db, handle);
1248 
1249  return (error);
1250 }
1251 
1253  cpg_handle_t handle,
1254  cpg_iteration_type_t iteration_type,
1255  const struct cpg_name *group,
1256  cpg_iteration_handle_t *cpg_iteration_handle)
1257 {
1258  cs_error_t error;
1259  struct iovec iov;
1260  struct cpg_inst *cpg_inst;
1261  struct cpg_iteration_instance_t *cpg_iteration_instance;
1262  struct req_lib_cpg_iterationinitialize req_lib_cpg_iterationinitialize;
1263  struct res_lib_cpg_iterationinitialize res_lib_cpg_iterationinitialize;
1264 
1265  if (group && group->length > CPG_MAX_NAME_LENGTH) {
1266  return (CS_ERR_NAME_TOO_LONG);
1267  }
1268  if (cpg_iteration_handle == NULL) {
1269  return (CS_ERR_INVALID_PARAM);
1270  }
1271 
1272  if ((iteration_type == CPG_ITERATION_ONE_GROUP && group == NULL) ||
1273  (iteration_type != CPG_ITERATION_ONE_GROUP && group != NULL)) {
1274  return (CS_ERR_INVALID_PARAM);
1275  }
1276 
1277  if (iteration_type != CPG_ITERATION_NAME_ONLY && iteration_type != CPG_ITERATION_ONE_GROUP &&
1278  iteration_type != CPG_ITERATION_ALL) {
1279 
1280  return (CS_ERR_INVALID_PARAM);
1281  }
1282 
1283  error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
1284  if (error != CS_OK) {
1285  return (error);
1286  }
1287 
1288  error = hdb_error_to_cs (hdb_handle_create (&cpg_iteration_handle_t_db,
1289  sizeof (struct cpg_iteration_instance_t), cpg_iteration_handle));
1290  if (error != CS_OK) {
1291  goto error_put_cpg_db;
1292  }
1293 
1294  error = hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, *cpg_iteration_handle,
1295  (void *)&cpg_iteration_instance));
1296  if (error != CS_OK) {
1297  goto error_destroy;
1298  }
1299 
1300  cpg_iteration_instance->conn = cpg_inst->c;
1301 
1302  qb_list_init (&cpg_iteration_instance->list);
1303 
1304  req_lib_cpg_iterationinitialize.header.size = sizeof (struct req_lib_cpg_iterationinitialize);
1305  req_lib_cpg_iterationinitialize.header.id = MESSAGE_REQ_CPG_ITERATIONINITIALIZE;
1306  req_lib_cpg_iterationinitialize.iteration_type = iteration_type;
1307  if (group) {
1308  marshall_to_mar_cpg_name_t (&req_lib_cpg_iterationinitialize.group_name, group);
1309  }
1310 
1311  iov.iov_base = (void *)&req_lib_cpg_iterationinitialize;
1312  iov.iov_len = sizeof (struct req_lib_cpg_iterationinitialize);
1313 
1314  error = coroipcc_msg_send_reply_receive (cpg_inst->c,
1315  &iov,
1316  1,
1317  &res_lib_cpg_iterationinitialize,
1318  sizeof (struct res_lib_cpg_iterationinitialize));
1319 
1320  if (error != CS_OK) {
1321  goto error_put_destroy;
1322  }
1323 
1324  cpg_iteration_instance->executive_iteration_handle =
1325  res_lib_cpg_iterationinitialize.iteration_handle;
1326  cpg_iteration_instance->cpg_iteration_handle = *cpg_iteration_handle;
1327 
1328  qb_list_add (&cpg_iteration_instance->list, &cpg_inst->iteration_list_head);
1329 
1330  hdb_handle_put (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1331  hdb_handle_put (&cpg_handle_t_db, handle);
1332 
1333  return (res_lib_cpg_iterationinitialize.header.error);
1334 
1335 error_put_destroy:
1336  hdb_handle_put (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1337 error_destroy:
1338  hdb_handle_destroy (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1339 error_put_cpg_db:
1340  hdb_handle_put (&cpg_handle_t_db, handle);
1341 
1342  return (error);
1343 }
1344 
1346  cpg_iteration_handle_t handle,
1347  struct cpg_iteration_description_t *description)
1348 {
1349  cs_error_t error;
1350  struct cpg_iteration_instance_t *cpg_iteration_instance;
1351  struct req_lib_cpg_iterationnext req_lib_cpg_iterationnext;
1352  struct res_lib_cpg_iterationnext res_lib_cpg_iterationnext;
1353 
1354  if (description == NULL) {
1355  return CS_ERR_INVALID_PARAM;
1356  }
1357 
1358  error = hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, handle,
1359  (void *)&cpg_iteration_instance));
1360  if (error != CS_OK) {
1361  goto error_exit;
1362  }
1363 
1364  req_lib_cpg_iterationnext.header.size = sizeof (struct req_lib_cpg_iterationnext);
1365  req_lib_cpg_iterationnext.header.id = MESSAGE_REQ_CPG_ITERATIONNEXT;
1366  req_lib_cpg_iterationnext.iteration_handle = cpg_iteration_instance->executive_iteration_handle;
1367 
1368  error = qb_to_cs_error (qb_ipcc_send (cpg_iteration_instance->conn,
1369  &req_lib_cpg_iterationnext,
1370  req_lib_cpg_iterationnext.header.size));
1371  if (error != CS_OK) {
1372  goto error_put;
1373  }
1374 
1375  error = qb_to_cs_error (qb_ipcc_recv (cpg_iteration_instance->conn,
1376  &res_lib_cpg_iterationnext,
1377  sizeof(struct res_lib_cpg_iterationnext), -1));
1378  if (error != CS_OK) {
1379  goto error_put;
1380  }
1381 
1382  marshall_from_mar_cpg_iteration_description_t(
1383  description,
1384  &res_lib_cpg_iterationnext.description);
1385 
1386  error = res_lib_cpg_iterationnext.header.error;
1387 
1388 error_put:
1389  hdb_handle_put (&cpg_iteration_handle_t_db, handle);
1390 
1391 error_exit:
1392  return (error);
1393 }
1394 
1396  cpg_iteration_handle_t handle)
1397 {
1398  cs_error_t error;
1399  struct iovec iov;
1400  struct cpg_iteration_instance_t *cpg_iteration_instance;
1401  struct req_lib_cpg_iterationfinalize req_lib_cpg_iterationfinalize;
1402  struct res_lib_cpg_iterationfinalize res_lib_cpg_iterationfinalize;
1403 
1404  error = hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, handle,
1405  (void *)&cpg_iteration_instance));
1406  if (error != CS_OK) {
1407  goto error_exit;
1408  }
1409 
1410  req_lib_cpg_iterationfinalize.header.size = sizeof (struct req_lib_cpg_iterationfinalize);
1411  req_lib_cpg_iterationfinalize.header.id = MESSAGE_REQ_CPG_ITERATIONFINALIZE;
1412  req_lib_cpg_iterationfinalize.iteration_handle = cpg_iteration_instance->executive_iteration_handle;
1413 
1414  iov.iov_base = (void *)&req_lib_cpg_iterationfinalize;
1415  iov.iov_len = sizeof (struct req_lib_cpg_iterationfinalize);
1416 
1417  error = coroipcc_msg_send_reply_receive (cpg_iteration_instance->conn,
1418  &iov,
1419  1,
1420  &res_lib_cpg_iterationfinalize,
1421  sizeof (struct req_lib_cpg_iterationfinalize));
1422 
1423  if (error != CS_OK) {
1424  goto error_put;
1425  }
1426 
1427  cpg_iteration_instance_finalize (cpg_iteration_instance);
1428  hdb_handle_put (&cpg_iteration_handle_t_db, cpg_iteration_instance->cpg_iteration_handle);
1429 
1430  return (res_lib_cpg_iterationfinalize.header.error);
1431 
1432 error_put:
1433  hdb_handle_put (&cpg_iteration_handle_t_db, handle);
1434 error_exit:
1435  return (error);
1436 }
1437 
uint32_t pid
Definition: cpg.h:113
struct qb_list_head list
Definition: lib/cpg.c:112
The cpg_ring_id struct.
Definition: cpg.h:140
The cpg_callbacks_t struct.
Definition: cpg.h:182
cs_error_t cpg_iteration_next(cpg_iteration_handle_t handle, struct cpg_iteration_description_t *description)
cpg_iteration_next
Definition: lib/cpg.c:1345
cs_error_t cpg_flow_control_state_get(cpg_handle_t handle, cpg_flow_control_state_t *flow_control_state)
cpg_flow_control_state_get
Definition: lib/cpg.c:847
mar_cpg_address_t member_list[]
Definition: ipc_cpg.h:390
mar_req_coroipcc_zc_free_t struct
Definition: ipc_cpg.h:481
uint32_t pid
Definition: lib/cpg.c:87
#define CPG_MAX_NAME_LENGTH
Definition: cpg.h:117
cpg_deliver_fn_t cpg_deliver_fn
Definition: cpg.h:201
cs_error_t hdb_error_to_cs(int res)
mar_cpg_address_t struct
Definition: ipc_cpg.h:155
cpg_flow_control_state_t
The cpg_flow_control_state_t enum.
Definition: cpg.h:74
#define CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF
Definition: cpg.h:194
cs_error_t cpg_context_set(cpg_handle_t handle, void *context)
Set contexts for a CPG handle.
Definition: lib/cpg.c:351
cpg_confchg_fn_t cpg_confchg_fn
Definition: cpg.h:184
The req_lib_cpg_join struct.
Definition: ipc_cpg.h:251
mar_req_coroipcc_zc_alloc_t struct
Definition: ipc_cpg.h:472
The cpg_name struct.
Definition: cpg.h:121
cs_error_t cpg_local_get(cpg_handle_t handle, unsigned int *local_nodeid)
cpg_local_get
Definition: lib/cpg.c:809
cs_error_t cpg_membership_get(cpg_handle_t handle, struct cpg_name *group_name, struct cpg_address *member_list, int *member_list_entries)
Get membership information from cpg.
Definition: lib/cpg.c:746
char * assembly_buf
Definition: lib/cpg.c:88
The cpg_address struct.
Definition: cpg.h:111
The res_lib_cpg_partial_deliver_callback struct.
Definition: ipc_cpg.h:345
cs_error_t cpg_iteration_finalize(cpg_iteration_handle_t handle)
cpg_iteration_finalize
Definition: lib/cpg.c:1395
The req_lib_cpg_mcast struct.
Definition: ipc_cpg.h:304
cpg_iteration_handle_t cpg_iteration_handle
Definition: lib/cpg.c:109
The res_lib_cpg_membership_get struct.
Definition: ipc_cpg.h:375
#define LOCALSTATEDIR
Definition: config.h:361
The res_lib_cpg_iterationnext struct.
Definition: ipc_cpg.h:449
int guarantee
Definition: totemsrp.c:266
unsigned char addr[TOTEMIP_ADDRLEN]
Definition: coroapi.h:77
The cpg_iteration_description_t struct.
Definition: cpg.h:131
#define CPG_MEMBERS_MAX
Definition: cpg.h:126
The res_lib_cpg_iterationinitialize struct.
Definition: ipc_cpg.h:433
The req_lib_cpg_local_get struct.
Definition: ipc_cpg.h:282
cpg_guarantee_t
The cpg_guarantee_t enum.
Definition: cpg.h:64
cpg_confchg_fn_t cpg_confchg_fn
Definition: cpg.h:202
coroipcs_zc_header struct
Definition: ipc_cpg.h:498
qb_ipcc_connection_t * c
Definition: lib/cpg.c:93
hdb_handle_t executive_iteration_handle
Definition: lib/cpg.c:111
The res_lib_cpg_partial_send struct.
Definition: ipc_cpg.h:297
cs_error_t cpg_fd_get(cpg_handle_t handle, int *fd)
Get a file descriptor on which to poll.
Definition: lib/cpg.c:294
uint64_t server_address
Definition: ipc_cpg.h:500
cpg_model_v1_data_t model_v1_data
Definition: lib/cpg.c:98
#define IPC_DISPATCH_SIZE
Definition: lib/util.h:51
cs_error_t cpg_zcb_alloc(cpg_handle_t handle, size_t size, void **buffer)
cpg_zcb_alloc
Definition: lib/cpg.c:947
The req_lib_cpg_iterationinitialize struct.
Definition: ipc_cpg.h:424
cpg_totem_confchg_fn_t cpg_totem_confchg_fn
Definition: cpg.h:203
#define MAX_RETRIES
Definition: lib/cpg.c:76
unsigned int flags
Definition: cpg.h:204
cs_error_t cpg_zcb_mcast_joined(cpg_handle_t handle, cpg_guarantee_t guarantee, void *msg, size_t msg_len)
cpg_zcb_mcast_joined
Definition: lib/cpg.c:1062
The res_lib_cpg_join struct.
Definition: ipc_cpg.h:261
struct qb_list_head iteration_list_head
Definition: lib/cpg.c:100
struct qb_list_head assembly_list_head
Definition: lib/cpg.c:102
uint64_t cpg_handle_t
cpg_handle_t
Definition: cpg.h:54
cs_error_t cpg_mcast_joined(cpg_handle_t handle, cpg_guarantee_t guarantee, const struct iovec *iovec, unsigned int iov_len)
Multicast to groups joined with cpg_join.
Definition: lib/cpg.c:1204
uint32_t max_msg_size
Definition: lib/cpg.c:101
mar_req_coroipcc_zc_execute_t struct
Definition: ipc_cpg.h:490
The res_lib_cpg_mcast struct.
Definition: ipc_cpg.h:326
#define IPC_REQUEST_SIZE
Definition: lib/util.h:49
cs_error_t
The cs_error_t enum.
Definition: corotypes.h:98
cs_error_t cpg_dispatch(cpg_handle_t handle, cs_dispatch_flags_t dispatch_types)
Dispatch messages and configuration changes.
Definition: lib/cpg.c:370
The req_lib_cpg_leave struct.
Definition: ipc_cpg.h:408
mar_cpg_address_t member_list[PROCESSOR_COUNT_MAX]
Definition: ipc_cpg.h:378
void * context
Definition: lib/cpg.c:95
The req_lib_cpg_iterationfinalize struct.
Definition: ipc_cpg.h:457
cs_dispatch_flags_t
The cs_dispatch_flags_t enum.
Definition: corotypes.h:84
struct qb_list_head list
Definition: lib/cpg.c:85
struct totem_message_header header
Definition: totemsrp.c:260
uint32_t nodeid
Definition: cpg.h:112
cs_error_t cpg_join(cpg_handle_t handle, const struct cpg_name *group)
Join one or more groups.
Definition: lib/cpg.c:647
The res_lib_cpg_finalize struct.
Definition: ipc_cpg.h:275
#define CPG_ZC_PATH_LEN
Definition: ipc_cpg.h:43
cpg_model_data_t model_data
Definition: lib/cpg.c:97
cpg_model_t model
Definition: cpg.h:191
cpg_iteration_type_t
The cpg_iteration_type_t enum.
Definition: cpg.h:95
The res_lib_cpg_local_get struct.
Definition: ipc_cpg.h:289
The req_lib_cpg_finalize struct.
Definition: ipc_cpg.h:268
flow control is disabled - new messages may be sent
Definition: cpg.h:75
qb_handle_t hdb_handle_t
Definition: hdb.h:52
qb_ipcc_connection_t * conn
Definition: lib/cpg.c:110
cs_error_t cpg_finalize(cpg_handle_t handle)
Close the cpg handle.
Definition: lib/cpg.c:249
The res_lib_cpg_iterationfinalize struct.
Definition: ipc_cpg.h:465
The req_lib_cpg_partial_mcast struct.
Definition: ipc_cpg.h:314
cs_error_t cpg_leave(cpg_handle_t handle, const struct cpg_name *group)
Leave one or more groups.
Definition: lib/cpg.c:701
The req_lib_cpg_iterationnext struct.
Definition: ipc_cpg.h:441
The res_lib_cpg_confchg_callback struct.
Definition: ipc_cpg.h:384
cs_error_t cpg_model_initialize(cpg_handle_t *handle, cpg_model_t model, cpg_model_data_t *model_data, void *context)
Create a new cpg connection, initialize with model.
Definition: lib/cpg.c:185
cs_error_t cpg_context_get(cpg_handle_t handle, void **context)
Get contexts for a CPG handle.
Definition: lib/cpg.c:332
uint64_t cpg_iteration_handle_t
cpg_iteration_handle_t
Definition: cpg.h:59
The req_lib_cpg_membership_get struct.
Definition: ipc_cpg.h:367
uint32_t length
Definition: cpg.h:122
cs_error_t cpg_initialize(cpg_handle_t *handle, cpg_callbacks_t *callbacks)
Create a new cpg connection.
Definition: lib/cpg.c:169
int finalize
Definition: lib/cpg.c:94
The cpg_model_v1_data_t struct.
Definition: cpg.h:199
The res_lib_cpg_leave struct.
Definition: ipc_cpg.h:417
cpg_model_t
The cpg_model_t enum.
Definition: cpg.h:104
The cpg_model_data_t struct.
Definition: cpg.h:190
cs_error_t cpg_iteration_initialize(cpg_handle_t handle, cpg_iteration_type_t iteration_type, const struct cpg_name *group, cpg_iteration_handle_t *cpg_iteration_handle)
cpg_iteration_initialize
Definition: lib/cpg.c:1252
#define CS_IPC_TIMEOUT_MS
Definition: corotypes.h:131
uint32_t nodeid
Definition: lib/cpg.c:86
cs_error_t qb_to_cs_error(int result)
qb_to_cs_error
#define CPG_MEMORY_MAP_UMASK
Definition: lib/cpg.c:81
The res_lib_cpg_totem_confchg_callback struct.
Definition: ipc_cpg.h:398
cpg_deliver_fn_t cpg_deliver_fn
Definition: cpg.h:183
cs_error_t cpg_zcb_free(cpg_handle_t handle, void *buffer)
cpg_zcb_free
Definition: lib/cpg.c:1013
cs_error_t cpg_max_atomic_msgsize_get(cpg_handle_t handle, uint32_t *size)
Get maximum size of a message that will not be fragmented.
Definition: lib/cpg.c:313
Message from another node.
Definition: ipc_cpg.h:333
uint32_t assembly_buf_ptr
Definition: lib/cpg.c:89
DECLARE_HDB_DATABASE(cpg_handle_t_db, cpg_inst_free)