next up previous
Next: Cannon's Matrix-Matrix Multiplication with ... | Up: Introduction to Parallel Algorithms | Previous: Introduction to Parallel Algorithms


Odd-Even Sort

#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>

#include <mpi.h> /* Include MPI's header file */
  3 
  4  main(int argc, char *argv[]) 
  5  { 
  6    int n;         /* The total number of elements to be sorted */ 
  7    int npes;      /* The total number of processes */ 
  8    int myrank;    /* The rank of the calling process */ 
  9    int nlocal;    /* The local number of elements, and the array that stores them */ 
 10    int *elmnts;   /* The array that stores the local elements */ 
 11    int *relmnts;  /* The array that stores the received elements */ 
 12    int oddrank;   /* The rank of the process during odd-phase communication */ 
 13    int evenrank;  /* The rank of the process during even-phase communication */ 
 14    int *wspace;   /* Working space during the compare-split operation */ 
 15    int i; 
 16    MPI_Status status; 
 17 
 18    /* Initialize MPI and get system information */ 
 19    MPI_Init(&argc, &argv); 
 20    MPI_Comm_size(MPI_COMM_WORLD, &npes); 
 21    MPI_Comm_rank(MPI_COMM_WORLD, &myrank); 
 22 
 23    n = atoi(argv[1]); 
 24    nlocal = n/npes; /* Compute the number of elements to be stored locally. */ 
 25 
 26    /* Allocate memory for the various arrays */ 
 27    elmnts  = (int *)malloc(nlocal*sizeof(int)); 
 28    relmnts = (int *)malloc(nlocal*sizeof(int)); 
 29    wspace  = (int *)malloc(nlocal*sizeof(int)); 
 30 
 31    /* Fill-in the elmnts array with random elements */ 
 32    srandom(myrank); 
 33    for (i=0; i<nlocal; i++) 
 34      elmnts[i] = random(); 
 35 
 36    /* Sort the local elements using the built-in quicksort routine */ 
 37    qsort(elmnts, nlocal, sizeof(int), IncOrder); 
 38 
 39    /* Determine the rank of the processors that myrank needs to communicate during */ 
 40    /* the odd and even phases of the algorithm */ 
 41    if (myrank%2 == 0) { 
 42      oddrank  = myrank-1; 
 43      evenrank = myrank+1; 
 44    } 
 45    else { 
 46      oddrank  = myrank+1; 
 47      evenrank = myrank-1; 
 48    } 
 49 
 50    /* Set the ranks of the processors at the end of the linear */ 
 51    if (oddrank == -1 || oddrank == npes) 
 52      oddrank = MPI_PROC_NULL; 
 53    if (evenrank == -1 || evenrank == npes) 
 54      evenrank = MPI_PROC_NULL; 
 55 
 56    /* Get into the main loop of the odd-even sorting algorithm */ 
 57    for (i=0; i<npes-1; i++) { 
 58      if (i%2 == 1) /* Odd phase */ 
 59        MPI_Sendrecv(elmnts, nlocal, MPI_INT, oddrank, 1, relmnts, 
 60            nlocal, MPI_INT, oddrank, 1, MPI_COMM_WORLD, &status); 
 61      else /* Even phase */ 
 62        MPI_Sendrecv(elmnts, nlocal, MPI_INT, evenrank, 1, relmnts, 
 63            nlocal, MPI_INT, evenrank, 1, MPI_COMM_WORLD, &status); 
 64 
 65      CompareSplit(nlocal, elmnts, relmnts, wspace, 
 66                   myrank < status.MPI_SOURCE); 
 67    } 
 68 
 69    free(elmnts); free(relmnts); free(wspace); 
 70    MPI_Finalize(); 
 71  } 
 72 
 73  /* This is the CompareSplit function */ 
 74  CompareSplit(int nlocal, int *elmnts, int *relmnts, int *wspace, 
 75               int keepsmall) 
 76  { 
 77    int i, j, k; 
 78 
 79    for (i=0; i<nlocal; i++) 
 80      wspace[i] = elmnts[i]; /* Copy the elmnts array into the wspace array */ 
 81 
 82    if (keepsmall) { /* Keep the nlocal smaller elements */ 
 83      for (i=j=k=0; k<nlocal; k++) { 
 84        if (j == nlocal || (i < nlocal && wspace[i] < relmnts[j])) 
 85          elmnts[k] = wspace[i++]; 
 86        else 
 87          elmnts[k] = relmnts[j++]; 
 88      } 
 89    } 
 90    else { /* Keep the nlocal larger elements */ 
 91      for (i=k=nlocal-1, j=nlocal-1; k>=0; k--) { 
 92        if (j == 0 || (i >= 0 && wspace[i] >= relmnts[j])) 
 93          elmnts[k] = wspace[i--]; 
 94        else 
 95          elmnts[k] = relmnts[j--]; 
 96      } 
 97    } 
 98  } 
 99 
/*
 * IncOrder: qsort comparison callback that orders ints ascending.
 *
 * e1 and e2 point to the two ints being compared. Returns a negative value
 * if *e1 < *e2, zero if they are equal, and a positive value if *e1 > *e2.
 * The result is built from comparisons rather than a subtraction, because
 * *e1 - *e2 overflows when the operands are far apart.
 */
int IncOrder(const void *e1, const void *e2)
{
  const int a = *(const int *)e1;
  const int b = *(const int *)e2;

  return (a > b) - (a < b);
}


Cem Ozdogan 2006-12-20