dune-common  2.2.0
communicator.hh
Go to the documentation of this file.
1 // $Id$
2 #ifndef DUNE_COMMUNICATOR
3 #define DUNE_COMMUNICATOR
4 
5 #include "remoteindices.hh"
6 #include "interface.hh"
10 
11 #if HAVE_MPI
12 // MPI header
13 #include <mpi.h>
14 
15 namespace Dune
16 {
/**
 * @brief Empty tag type.
 *
 * Used in this file as the flag value that selects the SizeOne
 * specialisations of the message size calculator, gatherer and scatterer,
 * i.e. the case where every index contributes exactly one buffer entry.
 */
struct SizeOne {};
98 
105  {};
106 
107 
113  template<class V>
114  struct CommPolicy
115  {
127  typedef V Type;
128 
134  typedef typename V::value_type IndexedType;
135 
141 
150  static const void* getAddress(const V& v, int index);
151 
157  static int getSize(const V&, int index);
158  };
159 
160  template<class K, int n> class FieldVector;
161 
162  template<class B, class A> class VariableBlockVector;
163 
164  template<class K, class A, int n>
165  struct CommPolicy<VariableBlockVector<FieldVector<K, n>, A> >
166  {
167  typedef VariableBlockVector<FieldVector<K, n>, A> Type;
168 
169  typedef typename Type::B IndexedType;
170 
172 
173  static const void* getAddress(const Type& v, int i);
174 
175  static int getSize(const Type& v, int i);
176  };
177 
182  {};
183 
187  template<class T>
189  {
191 
192  static const IndexedType& gather(const T& vec, std::size_t i);
193 
194  static void scatter(T& vec, const IndexedType& v, std::size_t i);
195 
196  };
197 
209  template<typename T>
210  class DatatypeCommunicator : public InterfaceBuilder
211  {
212  public:
213 
217  typedef T ParallelIndexSet;
218 
223 
227  typedef typename RemoteIndices::GlobalIndex GlobalIndex;
228 
232  typedef typename RemoteIndices::Attribute Attribute;
233 
237  typedef typename RemoteIndices::LocalIndex LocalIndex;
238 
242  DatatypeCommunicator();
243 
247  ~DatatypeCommunicator();
248 
275  template<class T1, class T2, class V>
276  void build(const RemoteIndices& remoteIndices, const T1& sourceFlags, V& sendData, const T2& destFlags, V& receiveData);
277 
281  void forward();
282 
286  void backward();
287 
291  void free();
292  private:
293  enum {
297  commTag_ = 234
298  };
299 
303  const RemoteIndices* remoteIndices_;
304 
305  typedef std::map<int,std::pair<MPI_Datatype,MPI_Datatype> >
306  MessageTypeMap;
307 
311  MessageTypeMap messageTypes;
312 
316  void* data_;
317 
318  MPI_Request* requests_[2];
319 
323  bool created_;
324 
328  template<class V, bool FORWARD>
329  void createRequests(V& sendData, V& receiveData);
330 
334  template<class T1, class T2, class V, bool send>
335  void createDataTypes(const T1& source, const T2& destination, V& data);
336 
340  void sendRecv(MPI_Request* req);
341 
345  struct IndexedTypeInformation
346  {
352  void build(int i)
353  {
354  length = new int[i];
355  displ = new MPI_Aint[i];
356  size = i;
357  }
358 
362  void free()
363  {
364  delete[] length;
365  delete[] displ;
366  }
368  int* length;
370  MPI_Aint* displ;
376  int elements;
380  int size;
381  };
382 
388  template<class V>
389  struct MPIDatatypeInformation
390  {
395  MPIDatatypeInformation(const V& data): data_(data)
396  {}
397 
403  void reserve(int proc, int size)
404  {
405  information_[proc].build(size);
406  }
413  void add(int proc, int local)
414  {
415  IndexedTypeInformation& info=information_[proc];
416  assert(info.elements<info.size);
417  MPI_Address( const_cast<void*>(CommPolicy<V>::getAddress(data_, local)),
418  info.displ+info.elements);
419  info.length[info.elements]=CommPolicy<V>::getSize(data_, local);
420  info.elements++;
421  }
422 
427  std::map<int,IndexedTypeInformation> information_;
431  const V& data_;
432 
433  };
434 
435  };
436 
447  {
448 
449  public:
454 
461  template<class Data, class Interface>
463  build(const Interface& interface);
464 
472  template<class Data, class Interface>
473  void build(const Data& source, const Data& target, const Interface& interface);
474 
503  template<class GatherScatter, class Data>
504  void forward(const Data& source, Data& dest);
505 
534  template<class GatherScatter, class Data>
535  void backward(Data& source, const Data& dest);
536 
562  template<class GatherScatter, class Data>
563  void forward(Data& data);
564 
590  template<class GatherScatter, class Data>
591  void backward(Data& data);
592 
596  void free();
597 
602 
603  private:
604 
608  typedef std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
609  InterfaceMap;
610 
611 
/**
 * @brief Functor computing the number of buffer entries of a message.
 *
 * The primary template is intentionally empty; the work is done by the
 * specialisations on the IndexedTypeFlag tag.
 */
template<class Data, typename IndexedTypeFlag>
struct MessageSizeCalculator {};
618 
623  template<class Data>
624  struct MessageSizeCalculator<Data,SizeOne>
625  {
632  inline int operator()(const InterfaceInformation& info) const;
641  inline int operator()(const Data& data, const InterfaceInformation& info) const;
642  };
643 
648  template<class Data>
649  struct MessageSizeCalculator<Data,VariableSize>
650  {
659  inline int operator()(const Data& data, const InterfaceInformation& info) const;
660  };
661 
/**
 * @brief Functor copying the data to send into the send buffer.
 *
 * The primary template is intentionally empty; see the specialisations
 * on the IndexedTypeFlag tag.
 */
template<class Data, class GatherScatter, bool send, typename IndexedTypeFlag>
struct MessageGatherer {};
668 
673  template<class Data, class GatherScatter, bool send>
674  struct MessageGatherer<Data,GatherScatter,send,SizeOne>
675  {
677  typedef typename CommPolicy<Data>::IndexedType Type;
678 
683  typedef GatherScatter Gatherer;
684 
685  enum{
691  forward=send
692  };
693 
701  inline void operator()(const InterfaceMap& interface, const Data& data, Type* buffer, size_t bufferSize) const;
702  };
703 
708  template<class Data, class GatherScatter, bool send>
709  struct MessageGatherer<Data,GatherScatter,send,VariableSize>
710  {
712  typedef typename CommPolicy<Data>::IndexedType Type;
713 
718  typedef GatherScatter Gatherer;
719 
720  enum{
726  forward=send
727  };
728 
736  inline void operator()(const InterfaceMap& interface, const Data& data, Type* buffer, size_t bufferSize) const;
737  };
738 
/**
 * @brief Functor copying received buffer entries back into the data.
 *
 * The primary template is intentionally empty; see the specialisations
 * on the IndexedTypeFlag tag.
 */
template<class Data, class GatherScatter, bool send, typename IndexedTypeFlag>
struct MessageScatterer {};
745 
750  template<class Data, class GatherScatter, bool send>
751  struct MessageScatterer<Data,GatherScatter,send,SizeOne>
752  {
754  typedef typename CommPolicy<Data>::IndexedType Type;
755 
760  typedef GatherScatter Scatterer;
761 
762  enum{
768  forward=send
769  };
770 
778  inline void operator()(const InterfaceMap& interface, Data& data, Type* buffer, const int& proc) const;
779  };
784  template<class Data, class GatherScatter, bool send>
785  struct MessageScatterer<Data,GatherScatter,send,VariableSize>
786  {
788  typedef typename CommPolicy<Data>::IndexedType Type;
789 
794  typedef GatherScatter Scatterer;
795 
796  enum{
802  forward=send
803  };
804 
812  inline void operator()(const InterfaceMap& interface, Data& data, Type* buffer, const int& proc) const;
813  };
814 
/**
 * @brief Position and size of one message inside a communication buffer.
 */
struct MessageInformation
{
  /** @brief Describe an empty message at the start of the buffer. */
  MessageInformation() : start_(0), size_(0) {}

  /**
   * @brief Describe a message.
   * @param start Offset of the first entry of the message.
   * @param size  Size of the message.
   */
  MessageInformation(size_t start, size_t size) : start_(start), size_(size) {}

  /**
   * @brief Offset of the message in the buffer. build() stores a count of
   *        entries here (it is added to a typed buffer pointer in sendRecv()).
   */
  size_t start_;

  /**
   * @brief Size of the message. build() stores a number of bytes here
   *        (it is passed to MPI as a count of MPI_BYTE).
   */
  size_t size_;
};
844 
851  typedef std::map<int,std::pair<MessageInformation,MessageInformation> >
852  InformationMap;
856  InformationMap messageInformation_;
860  char* buffers_[2];
864  size_t bufferSize_[2];
865 
866  enum{
870  commTag_
871  };
872 
876  std::map<int,std::pair<InterfaceInformation,InterfaceInformation> > interfaces_;
877 
878  MPI_Comm communicator_;
879 
883  template<class GatherScatter, bool FORWARD, class Data>
884  void sendRecv(const Data& source, Data& target);
885 
886  };
887 
888 #ifndef DOXYGEN
889 
890  template<class V>
891  inline const void* CommPolicy<V>::getAddress(const V& v, int index)
892  {
893  return &(v[index]);
894  }
895 
896  template<class V>
897  inline int CommPolicy<V>::getSize(const V& v, int index)
898  {
899  return 1;
900  }
901 
902  template<class K, class A, int n>
903  inline const void* CommPolicy<VariableBlockVector<FieldVector<K, n>, A> >::getAddress(const Type& v, int index)
904  {
905  return &(v[index][0]);
906  }
907 
908  template<class K, class A, int n>
909  inline int CommPolicy<VariableBlockVector<FieldVector<K, n>, A> >::getSize(const Type& v, int index)
910  {
911  return v[index].getsize();
912  }
913 
914  template<class T>
915  inline const typename CopyGatherScatter<T>::IndexedType& CopyGatherScatter<T>::gather(const T& vec, std::size_t i)
916  {
917  return vec[i];
918  }
919 
920  template<class T>
921  inline void CopyGatherScatter<T>::scatter(T& vec, const IndexedType& v, std::size_t i)
922  {
923  vec[i]=v;
924  }
925 
926  template<typename T>
927  DatatypeCommunicator<T>::DatatypeCommunicator()
928  : remoteIndices_(0), created_(false)
929  {
930  requests_[0]=0;
931  requests_[1]=0;
932  }
933 
934 
935 
936  template<typename T>
937  DatatypeCommunicator<T>::~DatatypeCommunicator()
938  {
939  free();
940  }
941 
942  template<typename T>
943  template<class T1, class T2, class V>
944  inline void DatatypeCommunicator<T>::build(const RemoteIndices& remoteIndices,
945  const T1& source, V& sendData,
946  const T2& destination, V& receiveData)
947  {
948  remoteIndices_ = &remoteIndices;
949  free();
950  createDataTypes<T1,T2,V,false>(source,destination, receiveData);
951  createDataTypes<T1,T2,V,true>(source,destination, sendData);
952  createRequests<V,true>(sendData, receiveData);
953  createRequests<V,false>(receiveData, sendData);
954  created_=true;
955  }
956 
957  template<typename T>
958  void DatatypeCommunicator<T>::free()
959  {
960  if(created_){
961  delete[] requests_[0];
962  delete[] requests_[1];
963  typedef MessageTypeMap::iterator iterator;
964  typedef MessageTypeMap::const_iterator const_iterator;
965 
966  const const_iterator end=messageTypes.end();
967 
968  for(iterator process = messageTypes.begin(); process != end; ++process){
969  MPI_Datatype *type = &(process->second.first);
970  int finalized=0;
971 #if MPI_2
972  MPI_Finalized(&finalized);
973 #endif
974  if(*type!=MPI_DATATYPE_NULL && !finalized)
975  MPI_Type_free(type);
976  type = &(process->second.second);
977  if(*type!=MPI_DATATYPE_NULL && !finalized)
978  MPI_Type_free(type);
979  }
980  messageTypes.clear();
981  created_=false;
982  }
983 
984  }
985 
986  template<typename T>
987  template<class T1, class T2, class V, bool send>
988  void DatatypeCommunicator<T>::createDataTypes(const T1& sourceFlags, const T2& destFlags, V& data)
989  {
990 
991  MPIDatatypeInformation<V> dataInfo(data);
992  this->template buildInterface<RemoteIndices,T1,T2,MPIDatatypeInformation<V>,send>(*remoteIndices_,sourceFlags, destFlags, dataInfo);
993 
994  typedef typename RemoteIndices::RemoteIndexMap::const_iterator const_iterator;
995  const const_iterator end=this->remoteIndices_->end();
996 
997  // Allocate MPI_Datatypes and deallocate memory for the type construction.
998  for(const_iterator process=this->remoteIndices_->begin(); process != end; ++process){
999  IndexedTypeInformation& info=dataInfo.information_[process->first];
1000  // Shift the displacement
1001  MPI_Aint base;
1002  MPI_Address(const_cast<void *>(CommPolicy<V>::getAddress(data, 0)), &base);
1003 
1004  for(int i=0; i< info.elements; i++){
1005  info.displ[i]-=base;
1006  }
1007 
1008  // Create data type
1009  MPI_Datatype* type = &( send ? messageTypes[process->first].first : messageTypes[process->first].second);
1010  MPI_Type_hindexed(info.elements, info.length, info.displ,
1011  MPITraits<typename CommPolicy<V>::IndexedType>::getType(),
1012  type);
1013  MPI_Type_commit(type);
1014  // Deallocate memory
1015  info.free();
1016  }
1017  }
1018 
1019  template<typename T>
1020  template<class V, bool createForward>
1021  void DatatypeCommunicator<T>::createRequests(V& sendData, V& receiveData)
1022  {
1023  typedef std::map<int,std::pair<MPI_Datatype,MPI_Datatype> >::const_iterator MapIterator;
1024  int rank;
1025  static int index = createForward?1:0;
1026  int noMessages = messageTypes.size();
1027  // allocate request handles
1028  requests_[index] = new MPI_Request[2*noMessages];
1029  const MapIterator end = messageTypes.end();
1030  int request=0;
1031  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1032 
1033  // Set up the requests for receiving first
1034  for(MapIterator process = messageTypes.begin(); process != end;
1035  ++process, ++request){
1036  MPI_Datatype type = createForward ? process->second.second : process->second.first;
1037  void* address = const_cast<void*>(CommPolicy<V>::getAddress(receiveData,0));
1038  MPI_Recv_init(address, 1, type, process->first, commTag_, this->remoteIndices_->communicator(), requests_[index]+request);
1039  }
1040 
1041  // And now the send requests
1042 
1043  for(MapIterator process = messageTypes.begin(); process != end;
1044  ++process, ++request){
1045  MPI_Datatype type = createForward ? process->second.first : process->second.second;
1046  void* address = const_cast<void*>(CommPolicy<V>::getAddress(sendData, 0));
1047  MPI_Ssend_init(address, 1, type, process->first, commTag_, this->remoteIndices_->communicator(), requests_[index]+request);
1048  }
1049  }
1050 
1051  template<typename T>
1052  void DatatypeCommunicator<T>::forward()
1053  {
1054  sendRecv(requests_[1]);
1055  }
1056 
1057  template<typename T>
1058  void DatatypeCommunicator<T>::backward()
1059  {
1060  sendRecv(requests_[0]);
1061  }
1062 
1063  template<typename T>
1064  void DatatypeCommunicator<T>::sendRecv(MPI_Request* requests)
1065  {
1066  int noMessages = messageTypes.size();
1067  // Start the receive calls first
1068  MPI_Startall(noMessages, requests);
1069  // Now the send calls
1070  MPI_Startall(noMessages, requests+noMessages);
1071 
1072  // Wait for completion of the communication send first then receive
1073  MPI_Status* status=new MPI_Status[2*noMessages];
1074  for(int i=0; i<2*noMessages; i++)
1075  status[i].MPI_ERROR=MPI_SUCCESS;
1076 
1077  int send = MPI_Waitall(noMessages, requests+noMessages, status+noMessages);
1078  int receive = MPI_Waitall(noMessages, requests, status);
1079 
1080  // Error checks
1081  int success=1, globalSuccess=0;
1082  if(send==MPI_ERR_IN_STATUS){
1083  int rank;
1084  MPI_Comm_rank(this->remoteIndices_->communicator(), &rank);
1085  std::cerr<<rank<<": Error in sending :"<<std::endl;
1086  // Search for the error
1087  for(int i=noMessages; i< 2*noMessages; i++)
1088  if(status[i].MPI_ERROR!=MPI_SUCCESS){
1089  char message[300];
1090  int messageLength;
1091  MPI_Error_string(status[i].MPI_ERROR, message, &messageLength);
1092  std::cerr<<" source="<<status[i].MPI_SOURCE<<" message: ";
1093  for(int i=0; i< messageLength; i++)
1094  std::cout<<message[i];
1095  }
1096  std::cerr<<std::endl;
1097  success=0;
1098  }
1099 
1100  if(receive==MPI_ERR_IN_STATUS){
1101  int rank;
1102  MPI_Comm_rank(this->remoteIndices_->communicator(), &rank);
1103  std::cerr<<rank<<": Error in receiving!"<<std::endl;
1104  // Search for the error
1105  for(int i=0; i< noMessages; i++)
1106  if(status[i].MPI_ERROR!=MPI_SUCCESS){
1107  char message[300];
1108  int messageLength;
1109  MPI_Error_string(status[i].MPI_ERROR, message, &messageLength);
1110  std::cerr<<" source="<<status[i].MPI_SOURCE<<" message: ";
1111  for(int i=0; i< messageLength; i++)
1112  std::cerr<<message[i];
1113  }
1114  std::cerr<<std::endl;
1115  success=0;
1116  }
1117 
1118  MPI_Allreduce(&success, &globalSuccess, 1, MPI_INT, MPI_MIN, this->remoteIndices_->communicator());
1119 
1120  delete[] status;
1121 
1122  if(!globalSuccess)
1123  DUNE_THROW(CommunicationError, "A communication error occurred!");
1124 
1125  }
1126 
1128  {
1129  buffers_[0]=0;
1130  buffers_[1]=0;
1131  bufferSize_[0]=0;
1132  bufferSize_[1]=0;
1133  }
1134 
1135  template<class Data, class Interface>
1136  typename enable_if<is_same<SizeOne, typename CommPolicy<Data>::IndexedTypeFlag>::value, void>::type
1137  BufferedCommunicator::build(const Interface& interface)
1138  {
1139  interfaces_=interface.interfaces();
1140  communicator_=interface.communicator();
1141  typedef typename std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
1142  ::const_iterator const_iterator;
1143  typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1144  const const_iterator end = interfaces_.end();
1145  int lrank;
1146  MPI_Comm_rank(communicator_, &lrank);
1147 
1148  bufferSize_[0]=0;
1149  bufferSize_[1]=0;
1150 
1151  for(const_iterator interfacePair = interfaces_.begin();
1152  interfacePair != end; ++interfacePair){
1153  int noSend = MessageSizeCalculator<Data,Flag>()(interfacePair->second.first);
1154  int noRecv = MessageSizeCalculator<Data,Flag>()(interfacePair->second.second);
1155  messageInformation_.insert(std::make_pair(interfacePair->first,
1156  std::make_pair(MessageInformation(bufferSize_[0],
1157  noSend*sizeof(typename CommPolicy<Data>::IndexedType)),
1158  MessageInformation(bufferSize_[1],
1159  noRecv*sizeof(typename CommPolicy<Data>::IndexedType)))));
1160  bufferSize_[0] += noSend;
1161  bufferSize_[1] += noRecv;
1162  }
1163 
1164  // allocate the buffers
1165  bufferSize_[0] *= sizeof(typename CommPolicy<Data>::IndexedType);
1166  bufferSize_[1] *= sizeof(typename CommPolicy<Data>::IndexedType);
1167 
1168  buffers_[0] = new char[bufferSize_[0]];
1169  buffers_[1] = new char[bufferSize_[1]];
1170  }
1171 
1172  template<class Data, class Interface>
1173  void BufferedCommunicator::build(const Data& source, const Data& dest, const Interface& interface)
1174  {
1175 
1176  interfaces_=interface.interfaces();
1177  communicator_=interface.communicator();
1178  typedef typename std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
1179  ::const_iterator const_iterator;
1180  typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1181  const const_iterator end = interfaces_.end();
1182 
1183  bufferSize_[0]=0;
1184  bufferSize_[1]=0;
1185 
1186  for(const_iterator interfacePair = interfaces_.begin();
1187  interfacePair != end; ++interfacePair){
1188  int noSend = MessageSizeCalculator<Data,Flag>()(source, interfacePair->second.first);
1189  int noRecv = MessageSizeCalculator<Data,Flag>()(dest, interfacePair->second.second);
1190 
1191  messageInformation_.insert(std::make_pair(interfacePair->first,
1192  std::make_pair(MessageInformation(bufferSize_[0],
1193  noSend*sizeof(typename CommPolicy<Data>::IndexedType)),
1194  MessageInformation(bufferSize_[1],
1195  noRecv*sizeof(typename CommPolicy<Data>::IndexedType)))));
1196  bufferSize_[0] += noSend;
1197  bufferSize_[1] += noRecv;
1198  }
1199 
1200  bufferSize_[0] *= sizeof(typename CommPolicy<Data>::IndexedType);
1201  bufferSize_[1] *= sizeof(typename CommPolicy<Data>::IndexedType);
1202  // allocate the buffers
1203  buffers_[0] = new char[bufferSize_[0]];
1204  buffers_[1] = new char[bufferSize_[1]];
1205  }
1206 
1207  inline void BufferedCommunicator::free()
1208  {
1209  messageInformation_.clear();
1210  if(buffers_[0])
1211  delete[] buffers_[0];
1212 
1213  if(buffers_[1])
1214  delete[] buffers_[1];
1215  buffers_[0]=buffers_[1]=0;
1216  }
1217 
1219  {
1220  free();
1221  }
1222 
1223  template<class Data>
1224  inline int BufferedCommunicator::MessageSizeCalculator<Data,SizeOne>::operator()
1225  (const InterfaceInformation& info) const
1226  {
1227  return info.size();
1228  }
1229 
1230 
1231  template<class Data>
1232  inline int BufferedCommunicator::MessageSizeCalculator<Data,SizeOne>::operator()
1233  (const Data& data, const InterfaceInformation& info) const
1234  {
1235  return operator()(info);
1236  }
1237 
1238 
1239  template<class Data>
1240  inline int BufferedCommunicator::MessageSizeCalculator<Data, VariableSize>::operator()
1241  (const Data& data, const InterfaceInformation& info) const
1242  {
1243  int entries=0;
1244 
1245  for(size_t i=0; i < info.size(); i++)
1246  entries += CommPolicy<Data>::getSize(data,info[i]);
1247 
1248  return entries;
1249  }
1250 
1251 
1252  template<class Data, class GatherScatter, bool FORWARD>
1253  inline void BufferedCommunicator::MessageGatherer<Data,GatherScatter,FORWARD,VariableSize>::operator()(const InterfaceMap& interfaces,const Data& data, Type* buffer, size_t bufferSize) const
1254  {
1255  typedef typename InterfaceMap::const_iterator
1256  const_iterator;
1257 
1258  int rank;
1259  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1260  const const_iterator end = interfaces.end();
1261  size_t index=0;
1262 
1263  for(const_iterator interfacePair = interfaces.begin();
1264  interfacePair != end; ++interfacePair){
1265  int size = forward ? interfacePair->second.first.size() :
1266  interfacePair->second.second.size();
1267 
1268  for(int i=0; i < size; i++){
1269  int local = forward ? interfacePair->second.first[i] :
1270  interfacePair->second.second[i];
1271  for(std::size_t j=0; j < CommPolicy<Data>::getSize(data, local);j++, index++){
1272 
1273 #ifdef DUNE_ISTL_WITH_CHECKING
1274  assert(bufferSize>=(index+1)*sizeof(typename CommPolicy<Data>::IndexedType));
1275 #endif
1276  buffer[index]=GatherScatter::gather(data, local, j);
1277  }
1278 
1279  }
1280  }
1281 
1282  }
1283 
1284 
1285  template<class Data, class GatherScatter, bool FORWARD>
1286  inline void BufferedCommunicator::MessageGatherer<Data,GatherScatter,FORWARD,SizeOne>::operator()(const InterfaceMap& interfaces, const Data& data, Type* buffer, size_t bufferSize)const
1287  {
1288  typedef typename InterfaceMap::const_iterator
1289  const_iterator;
1290  const const_iterator end = interfaces.end();
1291  size_t index = 0;
1292 
1293  int rank;
1294  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1295 
1296  for(const_iterator interfacePair = interfaces.begin();
1297  interfacePair != end; ++interfacePair){
1298  size_t size = FORWARD ? interfacePair->second.first.size() :
1299  interfacePair->second.second.size();
1300 
1301  for(size_t i=0; i < size; i++){
1302 
1303 #ifdef DUNE_ISTL_WITH_CHECKING
1304  assert(bufferSize>=(index+1)*sizeof(typename CommPolicy<Data>::IndexedType));
1305 #endif
1306 
1307  buffer[index++] = GatherScatter::gather(data, FORWARD ? interfacePair->second.first[i] :
1308  interfacePair->second.second[i]);
1309  }
1310  }
1311 
1312  }
1313 
1314 
1315  template<class Data, class GatherScatter, bool FORWARD>
1316  inline void BufferedCommunicator::MessageScatterer<Data,GatherScatter,FORWARD,VariableSize>::operator()(const InterfaceMap& interfaces, Data& data, Type* buffer, const int& proc)const
1317  {
1318  typedef typename InterfaceMap::value_type::second_type::first_type Information;
1319  const typename InterfaceMap::const_iterator infoPair = interfaces.find(proc);
1320 
1321  assert(infoPair!=interfaces.end());
1322 
1323  const Information& info = FORWARD ? infoPair->second.second :
1324  infoPair->second.first;
1325 
1326  for(size_t i=0, index=0; i < info.size(); i++){
1327  for(size_t j=0; j < CommPolicy<Data>::getSize(data, info[i]); j++)
1328  GatherScatter::scatter(data, buffer[index++], info[i], j);
1329  }
1330  }
1331 
1332 
1333  template<class Data, class GatherScatter, bool FORWARD>
1334  inline void BufferedCommunicator::MessageScatterer<Data,GatherScatter,FORWARD,SizeOne>::operator()(const InterfaceMap& interfaces, Data& data, Type* buffer, const int& proc)const
1335  {
1336  typedef typename InterfaceMap::value_type::second_type::first_type Information;
1337  const typename InterfaceMap::const_iterator infoPair = interfaces.find(proc);
1338 
1339  assert(infoPair!=interfaces.end());
1340 
1341  const Information& info = FORWARD ? infoPair->second.second :
1342  infoPair->second.first;
1343 
1344  for(size_t i=0; i < info.size(); i++){
1345  GatherScatter::scatter(data, buffer[i], info[i]);
1346  }
1347  }
1348 
1349 
1350  template<class GatherScatter,class Data>
1351  void BufferedCommunicator::forward(Data& data)
1352  {
1353  this->template sendRecv<GatherScatter,true>(data, data);
1354  }
1355 
1356 
1357  template<class GatherScatter, class Data>
1358  void BufferedCommunicator::backward(Data& data)
1359  {
1360  this->template sendRecv<GatherScatter,false>(data, data);
1361  }
1362 
1363 
1364  template<class GatherScatter, class Data>
1365  void BufferedCommunicator::forward(const Data& source, Data& dest)
1366  {
1367  this->template sendRecv<GatherScatter,true>(source, dest);
1368  }
1369 
1370 
1371  template<class GatherScatter, class Data>
1372  void BufferedCommunicator::backward(Data& source, const Data& dest)
1373  {
1374  this->template sendRecv<GatherScatter,false>(dest, source);
1375  }
1376 
1377 
1378  template<class GatherScatter, bool FORWARD, class Data>
1379  void BufferedCommunicator::sendRecv(const Data& source, Data& dest)
1380  {
1381  int rank, lrank;
1382 
1383  MPI_Comm_rank(MPI_COMM_WORLD,&rank);
1384  MPI_Comm_rank(MPI_COMM_WORLD,&lrank);
1385 
1386  typedef typename CommPolicy<Data>::IndexedType Type;
1387  Type *sendBuffer, *recvBuffer;
1388  size_t sendBufferSize;
1389 #ifndef NDEBUG
1390  size_t recvBufferSize;
1391 #endif
1392 
1393  if(FORWARD){
1394  sendBuffer = reinterpret_cast<Type*>(buffers_[0]);
1395  sendBufferSize = bufferSize_[0];
1396  recvBuffer = reinterpret_cast<Type*>(buffers_[1]);
1397 #ifndef NDEBUG
1398  recvBufferSize = bufferSize_[1];
1399 #endif
1400  }else{
1401  sendBuffer = reinterpret_cast<Type*>(buffers_[1]);
1402  sendBufferSize = bufferSize_[1];
1403  recvBuffer = reinterpret_cast<Type*>(buffers_[0]);
1404 #ifndef NDEBUG
1405  recvBufferSize = bufferSize_[0];
1406 #endif
1407  }
1408  typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1409 
1410  MessageGatherer<Data,GatherScatter,FORWARD,Flag>()(interfaces_, source, sendBuffer, sendBufferSize);
1411 
1412  MPI_Request* sendRequests = new MPI_Request[messageInformation_.size()];
1413  MPI_Request* recvRequests = new MPI_Request[messageInformation_.size()];
1414 
1415  // Setup receive first
1416  typedef typename InformationMap::const_iterator const_iterator;
1417 
1418  const const_iterator end = messageInformation_.end();
1419  size_t i=0;
1420  int* processMap = new int[messageInformation_.size()];
1421 
1422  for(const_iterator info = messageInformation_.begin(); info != end; ++info, ++i){
1423  processMap[i]=info->first;
1424  if(FORWARD){
1425  assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= recvBufferSize );
1426  Dune::dvverb<<rank<<": receiving "<<info->second.second.size_<<" from "<<info->first<<std::endl;
1427  MPI_Irecv(recvBuffer+info->second.second.start_, info->second.second.size_,
1428  MPI_BYTE, info->first, commTag_, communicator_,
1429  recvRequests+i);
1430  }else{
1431  assert(info->second.first.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.first.size_ <= recvBufferSize );
1432  Dune::dvverb<<rank<<": receiving "<<info->second.first.size_<<" to "<<info->first<<std::endl;
1433  MPI_Irecv(recvBuffer+info->second.first.start_, info->second.first.size_,
1434  MPI_BYTE, info->first, commTag_, communicator_,
1435  recvRequests+i);
1436  }
1437  }
1438 
1439  // now the send requests
1440  i=0;
1441  for(const_iterator info = messageInformation_.begin(); info != end; ++info, ++i)
1442  if(FORWARD){
1443  assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= recvBufferSize );
1444  Dune::dvverb<<rank<<": sending "<<info->second.first.size_<<" to "<<info->first<<std::endl;
1445  assert(info->second.first.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.first.size_ <= sendBufferSize );
1446  MPI_Issend(sendBuffer+info->second.first.start_, info->second.first.size_,
1447  MPI_BYTE, info->first, commTag_, communicator_,
1448  sendRequests+i);
1449  }else{
1450  assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= sendBufferSize );
1451  Dune::dvverb<<rank<<": sending "<<info->second.second.size_<<" to "<<info->first<<std::endl;
1452  MPI_Issend(sendBuffer+info->second.second.start_, info->second.second.size_,
1453  MPI_BYTE, info->first, commTag_, communicator_,
1454  sendRequests+i);
1455  }
1456 
1457  // Wait for completion of receive and immediately start scatter
1458  i=0;
1459  //int success = 1;
1460  int finished = MPI_UNDEFINED;
1461  MPI_Status status;//[messageInformation_.size()];
1462  //MPI_Waitall(messageInformation_.size(), recvRequests, status);
1463 
1464  for(i=0;i< messageInformation_.size();i++){
1465  status.MPI_ERROR=MPI_SUCCESS;
1466  MPI_Waitany(messageInformation_.size(), recvRequests, &finished, &status);
1467  assert(finished != MPI_UNDEFINED);
1468 
1469  if(status.MPI_ERROR==MPI_SUCCESS){
1470  int& proc = processMap[finished];
1471  typename InformationMap::const_iterator infoIter = messageInformation_.find(proc);
1472  assert(infoIter != messageInformation_.end());
1473 
1474  MessageInformation info = (FORWARD)? infoIter->second.second : infoIter->second.first;
1475  assert(info.start_+info.size_ <= recvBufferSize);
1476 
1477  MessageScatterer<Data,GatherScatter,FORWARD,Flag>()(interfaces_, dest, recvBuffer+info.start_, proc);
1478  }else{
1479  std::cerr<<rank<<": MPI_Error occurred while receiving message from "<<processMap[finished]<<std::endl;
1480  //success=0;
1481  }
1482  }
1483 
1484  MPI_Status recvStatus;
1485 
1486  // Wait for completion of sends
1487  for(i=0;i< messageInformation_.size();i++)
1488  if(MPI_SUCCESS!=MPI_Wait(sendRequests+i, &recvStatus)){
1489  std::cerr<<rank<<": MPI_Error occurred while sending message to "<<processMap[finished]<<std::endl;
1490  //success=0;
1491  }
1492  /*
1493  int globalSuccess;
1494  MPI_Allreduce(&success, &globalSuccess, 1, MPI_INT, MPI_MIN, interface_->communicator());
1495 
1496  if(!globalSuccess)
1497  DUNE_THROW(CommunicationError, "A communication error occurred!");
1498  */
1499  delete[] processMap;
1500  delete[] sendRequests;
1501  delete[] recvRequests;
1502 
1503  }
1504 
1505 #endif // DOXYGEN
1506 
1508 }
1509 
1510 #endif
1511 
1512 #endif