#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>
#include <mpi.h>
#include <assert.h>
#include <sys/types.h>
#include <unistd.h>

#define NUM_MSGS	20
#define MSG_SIZE	2048
#define MSG_TAG		48

// int my_proc;
// char buffer[4*NUM_MSGS][MSG_SIZE];
// MPI_Request 	requests[4*NUM_MSGS];

/*
 * Receiver loop.
 *
 * Repeatedly probes MPI_COMM_WORLD for a message from any source with any
 * tag, receives it as a single MPI_INT, and logs the value together with a
 * running count of messages received. The loop ends after the first
 * negative value has been received and logged.
 *
 * arg is unused; the signature matches what pthread_create() expects.
 * Always returns NULL.
 */
void* thread_func(void* arg) {
  MPI_Status probe_info;
  int rank;
  int payload;
  int received = 0;

  (void)arg;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  do {
    if (MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &probe_info) != MPI_SUCCESS) {
      assert(0);
    }
    ++received;

    /* Receive exactly the message that was just probed. */
    const int msg_tag = probe_info.MPI_TAG;
    const int msg_src = probe_info.MPI_SOURCE;
    if (MPI_Recv(&payload, 1, MPI_INT, msg_src, msg_tag, MPI_COMM_WORLD, &probe_info) != MPI_SUCCESS) {
      assert(0);
    }

    fprintf(stderr, "%i: MPI_Recv OK <-- %d num_recvd: %d\n", rank, payload, received);
  } while (payload >= 0);  /* a negative value is the stop signal */

  return NULL;
}

int main(int argc, char* argv[])
{
  pthread_t thread;
  void* stat;
  int tgt, i; 
   
  int cee,cee1,cee2,numOfNodes;
  int my_proc; 
  int num_recvd = 0;
  int flag, src, len, tag;
  //   int* buf;
  char* buf;
  MPI_Status status;
  MPI_Request request1;
  
  MPI_Init(&argc, &argv);
  MPI_Comm_rank(MPI_COMM_WORLD, &my_proc);
  MPI_Comm_size(MPI_COMM_WORLD, &numOfNodes);
  if(my_proc==0)
    { 
      pthread_create(&thread, NULL, thread_func, NULL);
      i = (numOfNodes)*NUM_MSGS-1;
      while (1){
	cee=i;
	cee2=cee%numOfNodes;
// 	printf("***********************\n");
// 	fflush(stdout);
	if(cee < 0)
	  {
	    cee2=numOfNodes+cee;
	  }

// 	printf("i:%d cee2=%d  \n",cee,cee2);  
// 	fflush(stdout);
    		
	if (i == (0 - numOfNodes-1)){
	  // 		pthread_cancel(thread1);
	  printf("ii=%d --> ended \n",i);
	  fflush(stdout);
	  break;
	}
	if(MPI_Isend(&cee, 1, MPI_INT, cee2, MSG_TAG, MPI_COMM_WORLD, &request1)!=MPI_SUCCESS)
	  assert(0);   
	--i;
      }
    }
  else{
    printf("hold on \n");
    fflush(stdout);
    while (1) {
      // //    if(MPI_Isend(&my_proc, 1, MPI_INT, 0, MSG_TAG, MPI_COMM_WORLD, &requests[i])!=MPI_SUCCESS)
      // //       assert(0);
      if(MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status)!=MPI_SUCCESS)
	assert(0);
      num_recvd++;
      tag = status.MPI_TAG;
      src = status.MPI_SOURCE;
      if(MPI_Recv(&cee1, 1, MPI_INT, src, tag, MPI_COMM_WORLD, &status)!=MPI_SUCCESS) 
	assert(0);
      fprintf(stderr, "%i: MPI_Recv OK <-- %d num_recvd: %d\n", my_proc,cee1,num_recvd);
      if (cee1 < 0){
	break;
      }
    }
  }
  //   pthread_create(&thread, NULL, thread_func, NULL);
  if(my_proc==0){
    pthread_join(thread, &stat);
    fprintf(stderr, "JOINED!\n");
  }
  MPI_Barrier(MPI_COMM_WORLD);
  MPI_Finalize();
  
  return 0;
}
