dune-common  2.8.0
variablesizecommunicator.hh
Go to the documentation of this file.
1 // -*- tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*-
2 // vi: set et ts=4 sw=2 sts=2:
3 #ifndef DUNE_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH // Still fits the line!
4 #define DUNE_COMMON_PARALLEL_VARIABLESIZECOMMUNICATOR_HH
5 
6 #if HAVE_MPI
7 
8 #include <algorithm>
9 #include <cassert>
10 #include <cstddef>
11 #include <functional>
12 #include <map>
13 #include <memory>
14 #include <utility>
15 #include <vector>
16 #include <algorithm>
17 
18 #include <mpi.h>
19 
20 #include <dune/common/concept.hh>
23 
36 namespace Dune
37 {
38 
39 namespace Concept {
40 
41 struct HasFixedSize {
42  template <typename H> auto require(H &&h) -> decltype(h.fixedSize());
43 };
44 
45 } // namespace Concept
46 
47 namespace Impl {
48 
49 template <typename H,
50  std::enable_if_t<models<Concept::HasFixedSize, H>(), int> = 0>
51 constexpr bool callFixedSize(H &&handle) {
52  return handle.fixedSize();
53 }
54 
55 template <typename H,
56  std::enable_if_t<not models<Concept::HasFixedSize, H>(), int> = 0>
57 [[deprecated("Using handles with fixedsize() (lower case s) is deprecated and "
58  "will be removed after release 2.8. Implement fixedSize() "
59  "(camelCase) instead!")]]
60 constexpr bool callFixedSize(H &&handle) {
61  return handle.fixedsize();
62 }
63 
64 } // namespace Impl
65 
66 namespace
67 {
72 template<class T, class Allocator=std::allocator<T> >
73 class MessageBuffer
74 {
75 public:
80  explicit MessageBuffer(int size)
81  : buffer_(new T[size]), size_(size), position_(0)
82  {}
87  explicit MessageBuffer(const MessageBuffer& o)
88  : buffer_(new T[o.size_]), size_(o.size_), position_(o.position_)
89  {
90  }
92  ~MessageBuffer()
93  {
94  delete[] buffer_;
95  }
100  void write(const T& data)
101  {
102  buffer_[position_++]=data;
103  }
104 
109  void read(T& data)
110  {
111  data=buffer_[position_++];
112  }
113 
119  void reset()
120  {
121  position_=0;
122  }
123 
128  bool finished()
129  {
130  return position_==size_;
131  }
132 
138  bool hasSpaceForItems(int noItems)
139  {
140  return position_+noItems<=size_;
141  }
146  std::size_t size() const
147  {
148  return size_;
149  }
154  operator T*()
155  {
156  return buffer_;
157  }
158 
159 private:
163  T* buffer_;
167  std::size_t size_;
171  std::size_t position_;
172 };
173 
177 class InterfaceTracker
178 {
179 public:
185  InterfaceTracker(int rank, InterfaceInformation info, std::size_t fixedsize=0,
186  bool allocateSizes=false)
187  : fixedSize(fixedsize),rank_(rank), index_(), interface_(info), sizes_()
188  {
189  if(allocateSizes)
190  {
191  sizes_.resize(info.size());
192  }
193  }
194 
198  void moveToNextIndex()
199  {
200  index_++;
201  assert(index_<=interface_.size());
202  skipZeroIndices();
203  }
208  void increment(std::size_t i)
209  {
210  index_+=i;
211  assert(index_<=interface_.size());
212  }
217  bool finished() const
218  {
219  return index_==interface_.size();
220  }
221 
222  void skipZeroIndices()
223  {
224  // skip indices with size zero!
225  while(sizes_.size() && index_!=interface_.size() &&!size())
226  ++index_;
227  }
228 
233  std::size_t index() const
234  {
235  return interface_[index_];
236  }
240  std::size_t size() const
241  {
242  assert(sizes_.size());
243  return sizes_[index_];
244  }
248  std::size_t* getSizesPointer()
249  {
250  return &sizes_[0];
251  }
256  bool empty() const
257  {
258  return !interface_.size();
259  }
260 
265  std::size_t indicesLeft() const
266  {
267  return interface_.size()-index_;
268  }
272  std::size_t fixedSize;
276  int rank() const
277  {
278  return rank_;
279  }
283  std::size_t offset() const
284  {
285  return index_;
286  }
287 private:
289  int rank_;
291  std::size_t index_;
293  InterfaceInformation interface_;
294  std::vector<std::size_t> sizes_;
295 };
296 
297 
298 } // end unnamed namespace
299 
337 template<class Allocator=std::allocator<std::pair<InterfaceInformation,InterfaceInformation> > >
339 {
340 public:
345  typedef std::map<int,std::pair<InterfaceInformation,InterfaceInformation>,
346  std::less<int>,
347  typename std::allocator_traits<Allocator>::template rebind_alloc< std::pair<const int,std::pair<InterfaceInformation,InterfaceInformation> > > > InterfaceMap;
348 
349 #ifndef DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE
356  VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap& inf)
357  : maxBufferSize_(32768), interface_(&inf)
358  {
359  MPI_Comm_dup(comm, &communicator_);
360  }
366  : maxBufferSize_(32768), interface_(&inf.interfaces())
367  {
368  MPI_Comm_dup(inf.communicator(), &communicator_);
369  }
370 #else
377  VariableSizeCommunicator(MPI_Comm comm, InterfaceMap& inf)
378  : maxBufferSize_(DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE),
379  interface_(&inf)
380  {
381  MPI_Comm_dup(comm, &communicator_);
382  }
387  VariableSizeCommunicator(const Interface& inf)
388  : maxBufferSize_(DUNE_PARALLEL_MAX_COMMUNICATION_BUFFER_SIZE),
389  interface_(&inf.interfaces())
390  {
391  MPI_Comm_dup(inf.communicator(), &communicator_);
392  }
393 #endif
400  VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap& inf, std::size_t max_buffer_size)
401  : maxBufferSize_(max_buffer_size), interface_(&inf)
402  {
403  MPI_Comm_dup(comm, &communicator_);
404  }
405 
411  VariableSizeCommunicator(const Interface& inf, std::size_t max_buffer_size)
412  : maxBufferSize_(max_buffer_size), interface_(&inf.interfaces())
413  {
414  MPI_Comm_dup(inf.communicator(), &communicator_);
415  }
416 
418  {
419  MPI_Comm_free(&communicator_);
420  }
421 
427  maxBufferSize_ = other.maxBufferSize_;
428  interface_ = other.interface_;
429  MPI_Comm_dup(other.communicator_, &communicator_);
430  }
431 
437  if(this == &other) // don't do anything if objects are the same
438  return *this;
439 
440  maxBufferSize_ = other.maxBufferSize_;
441  interface_ = other.interface_;
442  MPI_Comm_free(&communicator_);
443  MPI_Comm_dup(other.communicator_, &communicator_);
444 
445  return *this;
446  }
447 
467  template<class DataHandle>
468  void forward(DataHandle& handle)
469  {
470  communicate<true>(handle);
471  }
472 
492  template<class DataHandle>
493  void backward(DataHandle& handle)
494  {
495  communicate<false>(handle);
496  }
497 
498 private:
499  template<bool FORWARD, class DataHandle>
500  void communicateSizes(DataHandle& handle,
501  std::vector<InterfaceTracker>& recv_trackers);
502 
509  template<bool forward,class DataHandle>
510  void communicate(DataHandle& handle);
520  template<bool FORWARD, class DataHandle>
521  void setupInterfaceTrackers(DataHandle& handle,
522  std::vector<InterfaceTracker>& send_trackers,
523  std::vector<InterfaceTracker>& recv_trackers);
531  template<bool FORWARD, class DataHandle>
532  void communicateFixedSize(DataHandle& handle);
540  template<bool FORWARD, class DataHandle>
541  void communicateVariableSize(DataHandle& handle);
548  std::size_t maxBufferSize_;
556  const InterfaceMap* interface_;
562  MPI_Comm communicator_;
563 };
564 
566 namespace
567 {
571 template<class DataHandle>
572 class SizeDataHandle
573 {
574 public:
575  typedef std::size_t DataType;
576 
577  SizeDataHandle(DataHandle& data,
578  std::vector<InterfaceTracker>& trackers)
579  : data_(data), trackers_(trackers), index_()
580  {}
581  bool fixedSize()
582  {
583  return true;
584  }
585  std::size_t size([[maybe_unused]] std::size_t i)
586  {
587  return 1;
588  }
589  template<class B>
590  void gather(B& buf, int i)
591  {
592  buf.write(data_.size(i));
593  }
594  void setReceivingIndex(std::size_t i)
595  {
596  index_=i;
597  }
598  std::size_t* getSizesPointer()
599  {
600  return trackers_[index_].getSizesPointer();
601  }
602 
603 private:
604  DataHandle& data_;
605  std::vector<InterfaceTracker>& trackers_;
606  int index_;
607 };
608 
609 template<class T>
610 void setReceivingIndex(T&, int)
611 {}
612 
613 template<class T>
614 void setReceivingIndex(SizeDataHandle<T>& t, int i)
615 {
616  t.setReceivingIndex(i);
617 }
618 
619 
625 template<bool FORWARD>
626 struct InterfaceInformationChooser
627 {
631  static const InterfaceInformation&
632  getSend(const std::pair<InterfaceInformation,InterfaceInformation>& info)
633  {
634  return info.first;
635  }
636 
640  static const InterfaceInformation&
641  getReceive(const std::pair<InterfaceInformation,InterfaceInformation>& info)
642  {
643  return info.second;
644  }
645 };
646 
647 template<>
648 struct InterfaceInformationChooser<false>
649 {
650  static const InterfaceInformation&
651  getSend(const std::pair<InterfaceInformation,InterfaceInformation>& info)
652  {
653  return info.second;
654  }
655 
656  static const InterfaceInformation&
657  getReceive(const std::pair<InterfaceInformation,InterfaceInformation>& info)
658  {
659  return info.first;
660  }
661 };
662 
668 template<class DataHandle>
669 struct PackEntries
670 {
671 
672  int operator()(DataHandle& handle, InterfaceTracker& tracker,
673  MessageBuffer<typename DataHandle::DataType>& buffer,
674  [[maybe_unused]] int i) const
675  {
676  return operator()(handle,tracker,buffer);
677  }
678 
686  int operator()(DataHandle& handle, InterfaceTracker& tracker,
687  MessageBuffer<typename DataHandle::DataType>& buffer) const
688  {
689  if(tracker.fixedSize) // fixed size if variable is >0!
690  {
691 
692  std::size_t noIndices=std::min(buffer.size()/tracker.fixedSize, tracker.indicesLeft());
693  for(std::size_t i=0; i< noIndices; ++i)
694  {
695  handle.gather(buffer, tracker.index());
696  tracker.moveToNextIndex();
697  }
698  return noIndices*tracker.fixedSize;
699  }
700  else
701  {
702  int packed=0;
703  tracker.skipZeroIndices();
704  while(!tracker.finished())
705  if(buffer.hasSpaceForItems(handle.size(tracker.index())))
706  {
707  handle.gather(buffer, tracker.index());
708  packed+=handle.size(tracker.index());
709  tracker.moveToNextIndex();
710  }
711  else
712  break;
713  return packed;
714  }
715  }
716 };
717 
723 template<class DataHandle>
724 struct UnpackEntries{
725 
733  bool operator()(DataHandle& handle, InterfaceTracker& tracker,
734  MessageBuffer<typename DataHandle::DataType>& buffer,
735  int count=0)
736  {
737  if(tracker.fixedSize) // fixed size if variable is >0!
738  {
739  std::size_t noIndices=std::min(buffer.size()/tracker.fixedSize, tracker.indicesLeft());
740 
741  for(std::size_t i=0; i< noIndices; ++i)
742  {
743  handle.scatter(buffer, tracker.index(), tracker.fixedSize);
744  tracker.moveToNextIndex();
745  }
746  return tracker.finished();
747  }
748  else
749  {
750  assert(count);
751  for(int unpacked=0;unpacked<count;)
752  {
753  assert(!tracker.finished());
754  assert(buffer.hasSpaceForItems(tracker.size()));
755  handle.scatter(buffer, tracker.index(), tracker.size());
756  unpacked+=tracker.size();
757  tracker.moveToNextIndex();
758  }
759  return tracker.finished();
760  }
761  }
762 };
763 
764 
768 template<class DataHandle>
769 struct UnpackSizeEntries{
770 
778  bool operator()(SizeDataHandle<DataHandle>& handle, InterfaceTracker& tracker,
779  MessageBuffer<typename SizeDataHandle<DataHandle>::DataType>& buffer) const
780  {
781  std::size_t noIndices=std::min(buffer.size(), tracker.indicesLeft());
782  std::copy(static_cast<std::size_t*>(buffer), static_cast<std::size_t*>(buffer)+noIndices,
783  handle.getSizesPointer()+tracker.offset());
784  tracker.increment(noIndices);
785  return noIndices;
786  }
787  bool operator()(SizeDataHandle<DataHandle>& handle, InterfaceTracker& tracker,
788  MessageBuffer<typename SizeDataHandle<DataHandle>::DataType>& buffer, int) const
789  {
790  return operator()(handle,tracker,buffer);
791  }
792 };
793 
801 void sendFixedSize(std::vector<InterfaceTracker>& send_trackers,
802  std::vector<MPI_Request>& send_requests,
803  std::vector<InterfaceTracker>& recv_trackers,
804  std::vector<MPI_Request>& recv_requests,
805  MPI_Comm communicator)
806 {
807  typedef std::vector<InterfaceTracker>::iterator TIter;
808  std::vector<MPI_Request>::iterator mIter=recv_requests.begin();
809 
810  for(TIter iter=recv_trackers.begin(), end=recv_trackers.end(); iter!=end;
811  ++iter, ++mIter)
812  {
813  MPI_Irecv(&(iter->fixedSize), 1, MPITraits<std::size_t>::getType(),
814  iter->rank(), 933881, communicator, &(*mIter));
815  }
816 
817  // Send our size to all neighbours using non-blocking synchronous communication.
818  std::vector<MPI_Request>::iterator mIter1=send_requests.begin();
819  for(TIter iter=send_trackers.begin(), end=send_trackers.end();
820  iter!=end;
821  ++iter, ++mIter1)
822  {
823  MPI_Issend(&(iter->fixedSize), 1, MPITraits<std::size_t>::getType(),
824  iter->rank(), 933881, communicator, &(*mIter1));
825  }
826 }
827 
828 
833 template<class DataHandle>
834 struct SetupSendRequest{
835  void operator()(DataHandle& handle,
836  InterfaceTracker& tracker,
837  MessageBuffer<typename DataHandle::DataType>& buffer,
838  MPI_Request& request,
839  MPI_Comm comm) const
840  {
841  buffer.reset();
842  int size=PackEntries<DataHandle>()(handle, tracker, buffer);
843  // Skip indices of zero size.
844  while(!tracker.finished() && !handle.size(tracker.index()))
845  tracker.moveToNextIndex();
846  if(size)
847  MPI_Issend(buffer, size, MPITraits<typename DataHandle::DataType>::getType(),
848  tracker.rank(), 933399, comm, &request);
849  }
850 };
851 
852 
857 template<class DataHandle>
858 struct SetupRecvRequest{
859  void operator()(DataHandle& /*handle*/,
860  InterfaceTracker& tracker,
861  MessageBuffer<typename DataHandle::DataType>& buffer,
862  MPI_Request& request,
863  MPI_Comm comm) const
864  {
865  buffer.reset();
866  if(tracker.indicesLeft())
867  MPI_Irecv(buffer, buffer.size(), MPITraits<typename DataHandle::DataType>::getType(),
868  tracker.rank(), 933399, comm, &request);
869  }
870 };
871 
875 template<class DataHandle>
876 struct NullPackUnpackFunctor
877 {
878  int operator()(DataHandle&, InterfaceTracker&,
879  MessageBuffer<typename DataHandle::DataType>&, int)
880  {
881  return 0;
882  }
883  int operator()(DataHandle&, InterfaceTracker&,
884  MessageBuffer<typename DataHandle::DataType>&)
885  {
886  return 0;
887  }
888 };
889 
904 template<class DataHandle, class BufferFunctor, class CommunicationFunctor>
905 std::size_t checkAndContinue(DataHandle& handle,
906  std::vector<InterfaceTracker>& trackers,
907  std::vector<MPI_Request>& requests,
908  std::vector<MPI_Request>& requests2,
909  std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
910  MPI_Comm comm,
911  BufferFunctor buffer_func,
912  CommunicationFunctor comm_func,
913  bool valid=true,
914  bool getCount=false)
915 {
916  std::size_t size=requests.size();
917  std::vector<MPI_Status> statuses(size);
918  int no_completed;
919  std::vector<int> indices(size, -1); // the indices for which the communication finished.
920 
921  MPI_Testsome(size, &(requests[0]), &no_completed, &(indices[0]), &(statuses[0]));
922  indices.resize(no_completed);
923  for(std::vector<int>::iterator index=indices.begin(), end=indices.end();
924  index!=end; ++index)
925  {
926  InterfaceTracker& tracker=trackers[*index];
927  setReceivingIndex(handle, *index);
928  if(getCount)
929  {
930  // Get the number of entries received
931  int count;
932  MPI_Get_count(&(statuses[index-indices.begin()]),
934  &count);
935  // Communication completed, we can reuse the buffers, e.g. unpack or repack
936  buffer_func(handle, tracker, buffers[*index], count);
937  }else
938  buffer_func(handle, tracker, buffers[*index]);
939  tracker.skipZeroIndices();
940  if(!tracker.finished()){
941  // Maybe start another communication.
942  comm_func(handle, tracker, buffers[*index], requests2[*index], comm);
943  tracker.skipZeroIndices();
944  if(valid)
945  --no_completed; // communication not finished, decrement counter for finished ones.
946  }
947  }
948  return no_completed;
949 
950 }
951 
961 template<class DataHandle>
962 std::size_t receiveSizeAndSetupReceive(DataHandle& handle,
963  std::vector<InterfaceTracker>& trackers,
964  std::vector<MPI_Request>& size_requests,
965  std::vector<MPI_Request>& data_requests,
966  std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
967  MPI_Comm comm)
968 {
969  return checkAndContinue(handle, trackers, size_requests, data_requests, buffers, comm,
970  NullPackUnpackFunctor<DataHandle>(), SetupRecvRequest<DataHandle>(), false);
971 }
972 
981 template<class DataHandle>
982 std::size_t checkSendAndContinueSending(DataHandle& handle,
983  std::vector<InterfaceTracker>& trackers,
984  std::vector<MPI_Request>& requests,
985  std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
986  MPI_Comm comm)
987 {
988  return checkAndContinue(handle, trackers, requests, requests, buffers, comm,
989  NullPackUnpackFunctor<DataHandle>(), SetupSendRequest<DataHandle>());
990 }
991 
1000 template<class DataHandle>
1001 std::size_t checkReceiveAndContinueReceiving(DataHandle& handle,
1002  std::vector<InterfaceTracker>& trackers,
1003  std::vector<MPI_Request>& requests,
1004  std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
1005  MPI_Comm comm)
1006 {
1007  return checkAndContinue(handle, trackers, requests, requests, buffers, comm,
1008  UnpackEntries<DataHandle>(), SetupRecvRequest<DataHandle>(),
1009  true, !Impl::callFixedSize(handle));
1010 }
1011 
1012 
1013 bool validRecvRequests(const std::vector<MPI_Request> reqs)
1014 {
1015  for(std::vector<MPI_Request>::const_iterator i=reqs.begin(), end=reqs.end();
1016  i!=end; ++i)
1017  if(*i!=MPI_REQUEST_NULL)
1018  return true;
1019  return false;
1020 }
1021 
1032 template<class DataHandle, class Functor>
1033 std::size_t setupRequests(DataHandle& handle,
1034  std::vector<InterfaceTracker>& trackers,
1035  std::vector<MessageBuffer<typename DataHandle::DataType> >& buffers,
1036  std::vector<MPI_Request>& requests,
1037  const Functor& setupFunctor,
1038  MPI_Comm communicator)
1039 {
1040  typedef typename std::vector<InterfaceTracker>::iterator TIter;
1041  typename std::vector<MessageBuffer<typename DataHandle::DataType> >::iterator
1042  biter=buffers.begin();
1043  typename std::vector<MPI_Request>::iterator riter=requests.begin();
1044  std::size_t complete=0;
1045  for(TIter titer=trackers.begin(), end=trackers.end(); titer!=end; ++titer, ++biter, ++riter)
1046  {
1047  setupFunctor(handle, *titer, *biter, *riter, communicator);
1048  complete+=titer->finished();
1049  }
1050  return complete;
1051 }
1052 } // end unnamed namespace
1053 
1054 template<class Allocator>
1055 template<bool FORWARD, class DataHandle>
1056 void VariableSizeCommunicator<Allocator>::setupInterfaceTrackers(DataHandle& handle,
1057  std::vector<InterfaceTracker>& send_trackers,
1058  std::vector<InterfaceTracker>& recv_trackers)
1059 {
1060  if(interface_->size()==0)
1061  return;
1062  send_trackers.reserve(interface_->size());
1063  recv_trackers.reserve(interface_->size());
1064 
1065  int fixedsize=0;
1066  if(Impl::callFixedSize(handle))
1067  ++fixedsize;
1068 
1069 
1070  typedef typename InterfaceMap::const_iterator IIter;
1071  for(IIter inf=interface_->begin(), end=interface_->end(); inf!=end; ++inf)
1072  {
1073 
1074  if(Impl::callFixedSize(handle) && InterfaceInformationChooser<FORWARD>::getSend(inf->second).size())
1075  fixedsize=handle.size(InterfaceInformationChooser<FORWARD>::getSend(inf->second)[0]);
1076  assert(!Impl::callFixedSize(handle)||fixedsize>0);
1077  send_trackers.push_back(InterfaceTracker(inf->first,
1078  InterfaceInformationChooser<FORWARD>::getSend(inf->second), fixedsize));
1079  recv_trackers.push_back(InterfaceTracker(inf->first,
1080  InterfaceInformationChooser<FORWARD>::getReceive(inf->second), fixedsize, fixedsize==0));
1081  }
1082 }
1083 
1084 template<class Allocator>
1085 template<bool FORWARD, class DataHandle>
1086 void VariableSizeCommunicator<Allocator>::communicateFixedSize(DataHandle& handle)
1087 {
1088  std::vector<MPI_Request> size_send_req(interface_->size());
1089  std::vector<MPI_Request> size_recv_req(interface_->size());
1090 
1091  std::vector<InterfaceTracker> send_trackers;
1092  std::vector<InterfaceTracker> recv_trackers;
1093  setupInterfaceTrackers<FORWARD>(handle,send_trackers, recv_trackers);
1094  sendFixedSize(send_trackers, size_send_req, recv_trackers, size_recv_req, communicator_);
1095 
1096  std::vector<MPI_Request> data_send_req(interface_->size(), MPI_REQUEST_NULL);
1097  std::vector<MPI_Request> data_recv_req(interface_->size(), MPI_REQUEST_NULL);
1098  typedef typename DataHandle::DataType DataType;
1099  std::vector<MessageBuffer<DataType> > send_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_)),
1100  recv_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_));
1101 
1102 
1103  setupRequests(handle, send_trackers, send_buffers, data_send_req,
1104  SetupSendRequest<DataHandle>(), communicator_);
1105 
1106  std::size_t no_size_to_recv, no_to_send, no_to_recv, old_size;
1107  no_size_to_recv = no_to_send = no_to_recv = old_size = interface_->size();
1108 
1109  // Skip empty interfaces.
1110  typedef typename std::vector<InterfaceTracker>::const_iterator Iter;
1111  for(Iter i=recv_trackers.begin(), end=recv_trackers.end(); i!=end; ++i)
1112  if(i->empty())
1113  --no_to_recv;
1114  for(Iter i=send_trackers.begin(), end=send_trackers.end(); i!=end; ++i)
1115  if(i->empty())
1116  --no_to_send;
1117 
1118  while(no_size_to_recv+no_to_send+no_to_recv)
1119  {
1120  // Receive the fixedsize and setup receives accordingly
1121  if(no_size_to_recv)
1122  no_size_to_recv -= receiveSizeAndSetupReceive(handle,recv_trackers, size_recv_req,
1123  data_recv_req, recv_buffers,
1124  communicator_);
1125 
1126  // Check send completion and initiate other necessary sends
1127  if(no_to_send)
1128  no_to_send -= checkSendAndContinueSending(handle, send_trackers, data_send_req,
1129  send_buffers, communicator_);
1130  if(validRecvRequests(data_recv_req))
1131  // Receive data and setup new unblocking receives if necessary
1132  no_to_recv -= checkReceiveAndContinueReceiving(handle, recv_trackers, data_recv_req,
1133  recv_buffers, communicator_);
1134  }
1135 
1136  // Wait for completion of sending the size.
1137  //std::vector<MPI_Status> statuses(interface_->size(), MPI_STATUSES_IGNORE);
1138  MPI_Waitall(size_send_req.size(), &(size_send_req[0]), MPI_STATUSES_IGNORE);
1139 
1140 }
1141 
1142 template<class Allocator>
1143 template<bool FORWARD, class DataHandle>
1144 void VariableSizeCommunicator<Allocator>::communicateSizes(DataHandle& handle,
1145  std::vector<InterfaceTracker>& data_recv_trackers)
1146 {
1147  std::vector<InterfaceTracker> send_trackers;
1148  std::vector<InterfaceTracker> recv_trackers;
1149  std::size_t size = interface_->size();
1150  std::vector<MPI_Request> send_requests(size, MPI_REQUEST_NULL);
1151  std::vector<MPI_Request> recv_requests(size, MPI_REQUEST_NULL);
1152  std::vector<MessageBuffer<std::size_t> >
1153  send_buffers(size, MessageBuffer<std::size_t>(maxBufferSize_)),
1154  recv_buffers(size, MessageBuffer<std::size_t>(maxBufferSize_));
1155  SizeDataHandle<DataHandle> size_handle(handle,data_recv_trackers);
1156  setupInterfaceTrackers<FORWARD>(size_handle,send_trackers, recv_trackers);
1157  setupRequests(size_handle, send_trackers, send_buffers, send_requests,
1158  SetupSendRequest<SizeDataHandle<DataHandle> >(), communicator_);
1159  setupRequests(size_handle, recv_trackers, recv_buffers, recv_requests,
1160  SetupRecvRequest<SizeDataHandle<DataHandle> >(), communicator_);
1161 
1162  // Count valid requests that we have to wait for.
1163  auto valid_req_func =
1164  [](const MPI_Request& req) { return req != MPI_REQUEST_NULL; };
1165 
1166  auto size_to_send = std::count_if(send_requests.begin(), send_requests.end(),
1167  valid_req_func);
1168  auto size_to_recv = std::count_if(recv_requests.begin(), recv_requests.end(),
1169  valid_req_func);
1170 
1171  while(size_to_send+size_to_recv)
1172  {
1173  if(size_to_send)
1174  size_to_send -=
1175  checkSendAndContinueSending(size_handle, send_trackers, send_requests,
1176  send_buffers, communicator_);
1177  if(size_to_recv)
1178  // Could have done this using checkSendAndContinueSending
1179  // But the call below is more efficient as UnpackSizeEntries
1180  // uses std::copy.
1181  size_to_recv -=
1182  checkAndContinue(size_handle, recv_trackers, recv_requests, recv_requests,
1183  recv_buffers, communicator_, UnpackSizeEntries<DataHandle>(),
1184  SetupRecvRequest<SizeDataHandle<DataHandle> >());
1185  }
1186 }
1187 
1188 template<class Allocator>
1189 template<bool FORWARD, class DataHandle>
1190 void VariableSizeCommunicator<Allocator>::communicateVariableSize(DataHandle& handle)
1191 {
1192 
1193  std::vector<InterfaceTracker> send_trackers;
1194  std::vector<InterfaceTracker> recv_trackers;
1195  setupInterfaceTrackers<FORWARD>(handle, send_trackers, recv_trackers);
1196 
1197  std::vector<MPI_Request> send_requests(interface_->size(), MPI_REQUEST_NULL);
1198  std::vector<MPI_Request> recv_requests(interface_->size(), MPI_REQUEST_NULL);
1199  typedef typename DataHandle::DataType DataType;
1200  std::vector<MessageBuffer<DataType> >
1201  send_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_)),
1202  recv_buffers(interface_->size(), MessageBuffer<DataType>(maxBufferSize_));
1203 
1204  communicateSizes<FORWARD>(handle, recv_trackers);
1205  // Setup requests for sending and receiving.
1206  setupRequests(handle, send_trackers, send_buffers, send_requests,
1207  SetupSendRequest<DataHandle>(), communicator_);
1208  setupRequests(handle, recv_trackers, recv_buffers, recv_requests,
1209  SetupRecvRequest<DataHandle>(), communicator_);
1210 
1211  // Determine number of valid requests.
1212  auto valid_req_func =
1213  [](const MPI_Request& req) { return req != MPI_REQUEST_NULL;};
1214 
1215  auto no_to_send = std::count_if(send_requests.begin(), send_requests.end(),
1216  valid_req_func);
1217  auto no_to_recv = std::count_if(recv_requests.begin(), recv_requests.end(),
1218  valid_req_func);
1219  while(no_to_send+no_to_recv)
1220  {
1221  // Check send completion and initiate other necessary sends
1222  if(no_to_send)
1223  no_to_send -= checkSendAndContinueSending(handle, send_trackers, send_requests,
1224  send_buffers, communicator_);
1225  if(no_to_recv)
1226  // Receive data and setup new unblocking receives if necessary
1227  no_to_recv -= checkReceiveAndContinueReceiving(handle, recv_trackers, recv_requests,
1228  recv_buffers, communicator_);
1229  }
1230 }
1231 
1232 template<class Allocator>
1233 template<bool FORWARD, class DataHandle>
1234 void VariableSizeCommunicator<Allocator>::communicate(DataHandle& handle)
1235 {
1236  if( interface_->size() == 0)
1237  // Simply return as otherwise we will index an empty container
1238  // either for MPI_Wait_all or MPI_Test_some.
1239  return;
1240 
1241  if(Impl::callFixedSize(handle))
1242  communicateFixedSize<FORWARD>(handle);
1243  else
1244  communicateVariableSize<FORWARD>(handle);
1245 }
1246 } // end namespace Dune
1247 
1248 #endif // HAVE_MPI
1249 
1250 #endif
Infrastructure for concepts.
Traits classes for mapping types onto MPI_Datatype.
std::size_t fixedSize
The number of data items per index if it is fixed, 0 otherwise.
Definition: variablesizecommunicator.hh:272
MPI_Comm communicator() const
Get the MPI Communicator.
Definition: parallel/interface.hh:415
Dune namespace.
Definition: alignedallocator.hh:11
auto min(const AlignedNumber< T, align > &a, const AlignedNumber< T, align > &b)
Definition: debugalign.hh:434
static MPI_Datatype getType()
Definition: mpitraits.hh:46
size_t size() const
Get the number of entries in the interface.
Definition: parallel/interface.hh:106
Communication interface between remote and local indices.
Definition: parallel/interface.hh:207
Definition: variablesizecommunicator.hh:41
auto require(H &&h) -> decltype(h.fixedSize())
A buffered communicator where the amount of data sent does not have to be known a priori.
Definition: variablesizecommunicator.hh:339
VariableSizeCommunicator(const Interface &inf, std::size_t max_buffer_size)
Creates a communicator with a specific maximum buffer size.
Definition: variablesizecommunicator.hh:411
void backward(DataHandle &handle)
Communicate backwards.
Definition: variablesizecommunicator.hh:493
~VariableSizeCommunicator()
Definition: variablesizecommunicator.hh:417
VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap &inf, std::size_t max_buffer_size)
Creates a communicator with a specific maximum buffer size.
Definition: variablesizecommunicator.hh:400
VariableSizeCommunicator(const VariableSizeCommunicator &other)
Copy-constructs a communicator.
Definition: variablesizecommunicator.hh:426
void forward(DataHandle &handle)
Communicate forward.
Definition: variablesizecommunicator.hh:468
VariableSizeCommunicator & operator=(const VariableSizeCommunicator &other)
Copy-assignes a communicator.
Definition: variablesizecommunicator.hh:436
std::map< int, std::pair< InterfaceInformation, InterfaceInformation >, std::less< int >, typename std::allocator_traits< Allocator >::template rebind_alloc< std::pair< const int, std::pair< InterfaceInformation, InterfaceInformation > > > > InterfaceMap
The type of the map from process number to InterfaceInformation for sending and receiving to and from...
Definition: variablesizecommunicator.hh:347
VariableSizeCommunicator(MPI_Comm comm, const InterfaceMap &inf)
Creates a communicator with the default maximum buffer size.
Definition: variablesizecommunicator.hh:356
VariableSizeCommunicator(const Interface &inf)
Creates a communicator with the default maximum buffer size.
Definition: variablesizecommunicator.hh:365
Provides classes for building the communication interface between remote indices.