#include #include #include #include #include #include const char* procname = 0; #define n_printf(FORMAT, ARGS...) { if( procname ) printf("%-10s: "FORMAT, procname, ##ARGS); else printf("%-10i: "FORMAT, n, ##ARGS); } float randF( float min = 0, float max = 1.0f ){ return random() / float(RAND_MAX) * (max-min) + min; } int randI( int min = 0 , int max = 1){ return int((random() / float(RAND_MAX)) * ((max-min)+1) + min); } #define roll(PROB) if(randF(0,1) < PROB ) int n; //my rank / processor id int N; //total number of processors int myrank, numProcs; enum tasks { //MESSAGE TAGS EVALUATE, //MASTER -> SLAVE here is a bitstring to evaluate DIE, //MASTER -> SLAVE stop waiting for evaluations and close FITNESS, //SLAVE -> MASTER I am done evaluating and here is the fitness RUNNING, //SLAVE -> MASTER I am ready to do work }; //GA vars int gen; //current generation const int numgens = 40; //number of generations to run const int popsize = 40; //population size const int l = 10; //genome length == numbits float crossoverProb = .75; //prob of crossover happenning float mutationProb = .1; //prob of each bit being mutated float fitnessTotal; //total fitness for the current generation #include "genome.h" Genome pop[popsize]; //the population Genome newpop[popsize]; //new individuals are created in here, copied to pop inside nextGeneration() /******************************************************************** GA STUFF */ void initializePopulation(); //setup the populations void release(); //frees up memory void showPopulation(); //prints out the current population void crossover(); //performs crossover void mutation(); //performs mutation void selection(); //performs roullete wheel selection void nextGeneration(); //readies the next generation void evaluatePopulation(); //MASTER -> evaluates the entire population, making lots of parallel calls /******************************************************************** PARALLEL STUFF */ void waitForSlaves(); //MASTER -> waits for all slaves to call signalMaster(); void signalMaster(); //SLAVE -> signals the master that I am ready for work // MASTER -> SENDS A GENOME TO BE EVALUATED void evaluateExternal( const Genome& g, //the genome to be evaluated int procSendingTo, //the processor who should evaluate it float* fits, //the array of floats that fitness' returned arestored into MPI_Request* reqs ); //the array of requests that track which fitness have been done / evaluations have finished. void slave(); //wait for genomes to be sent by the master until a DIE message is recieved. /******************************************************************** GA STUFF */ void initializePopulation(){ for( int i = 0; i < popsize; i ++ ){ pop[i].initialize(); newpop[i].initialize(); pop[i].randomize(); } } void release(){ for( int i = 1; i < N; i ++ ){ MPI_Send( 0,0, MPI_INT, i, DIE, MPI_COMM_WORLD ); } } void showPopulation(){ printf("******************************************************\n"); printf("Gen: %i\n", gen ); for( int i = 0; i < popsize; i++ ){ printf("%s\n", pop[i].toString()); } } void crossover(){ for( int i = 0; i < popsize-1; i+=2 ){ newpop[i].crossover(newpop[i+1]); } } void mutation(){ for( int i = 0; i < popsize; i ++ ) newpop[i].mutate(); } void selection(){ for( int i = 0; i < popsize; i ++ ){ float choice = randF( 0, fitnessTotal ); int chosen = -1; int low = 0; int high = popsize - 1; int mid = 0; int mid0 = -1; while( mid!= mid0 ){ mid0 = mid; mid = (high + low)/2; if( pop[mid].lastSlot > choice ) if( mid == 0 || pop[mid-1].lastSlot < choice ){ chosen = mid; break; } else high = mid; else low = mid; } if( chosen == -1 ) chosen = popsize-1; newpop[i] = pop[chosen]; } } void nextGeneration(){ for( int i = 0; i < popsize; i ++ ) pop[i] = newpop[i]; } /******************************************************************** PARALLEL STUFF */ void waitForSlaves(){ int j; //just needed because i dont know how to send an emtpy message (just a tag) MPI_Status status; for( int i = 1; i < N; i ++ ){ //wait until they send a message message with the tag RUNNING MPI_Recv( &j, 1, MPI_INT, i, RUNNING, MPI_COMM_WORLD, &status ); assert( j == 1 ); n_printf("recived startup from %i\n", i ); } } void signalMaster(){ int i = 1; MPI_Send( &i, 1, MPI_INT, 0, RUNNING, MPI_COMM_WORLD ); } void evaluateExternal( const Genome& g, int procSendingTo, float* fits, MPI_Request* reqs ){ /* to evaluate 1) send the bitstring to a slave to proccess sending is done synchonously with MPI_Send, since there is little delay involved. the only data needed is the actual bitstring, so we just pack the array of floats and send it 2) wait for it to send back a fitness this needs to be asynchonously, so the proccessors do not need to finish in order this requires using MPI_Irecv, which doesn't block, it works through a request the request keeps track of whether or not that command hsa been completed. */ MPI_Send( g.bits, //the array we are sending from l, //the size of the array MPI_INT, //the array type procSendingTo, //the processor to send the message to EVALUATE, //the message tag MPI_COMM_WORLD ); //the group of processors we are working with (all proccessors) MPI_Irecv( fits, //the array we are recieving into 1, //number of elements being recieved MPI_FLOAT, //the type being recieved procSendingTo, //the processor that is sending the message (we get the fitness back from the processor we told to evaluate) FITNESS, //only accept messages with tag FITNESS MPI_COMM_WORLD, //we are working with all processors reqs ); //the array of requests to track } void evaluatePopulation(){ MPI_Request* reqs = new MPI_Request[N]; MPI_Status* stats = new MPI_Status[N]; int* sentTo = new int[N]; float* fits = new float[N]; int lastPopSent = 0; int genomeFinished; float fitness; fitnessTotal = 0.0f; /*********** send out individuals to every member in the population ************/ for( int i = 1; i < N && lastPopSent < popsize; i ++ ){ n_printf("sending genome %i -> proc(%i)\n", lastPopSent, i ); evaluateExternal( pop[lastPopSent], i, &fits[i], &reqs[i] ); sentTo[i] = lastPopSent; lastPopSent++; } /*********** as processors finish processing send them other work to do ************/ while( lastPopSent < popsize ){ int index; //the index of the proccessor that finished in wait any, relative to the start of the array. - unused int proc; //rank of the processor that has finished //****** wait for a proc to finish MPI_Waitany( N-1, &reqs[1], &index, &stats[0] ); //****** save the work it has done proc = stats[0].MPI_SOURCE; //identify which processor is done assert( proc == index + 1 ); //confirm that we have the right processor idea fitness = fits[proc]; //the fitness from proc will be saved to fits[proc] genomeFinished = sentTo[proc]; //which genome it evaluated n_printf("recieving evaluation from %i, genome %i fitness = %f\n", proc, genomeFinished, fits[proc] ); pop[genomeFinished].setFitness( fitness , fitnessTotal ); //save the fitness /*********** send it more work ************/ n_printf("sending genome %i -> proc(%i)\n", lastPopSent, proc ); evaluateExternal( pop[lastPopSent], proc, &fits[proc], &reqs[proc] ); sentTo[proc] = lastPopSent; lastPopSent++; } /*********** once everything has been sent out, wait for all the proccessors to finish. ************/ MPI_Waitall( N-1, &reqs[1], stats ); // LET THEM ALL FINISH for( int i = 1; i < N; i ++ ){ //SAVE ALL THEIR RETURNS INTO THE GENOMES genomeFinished = sentTo[i]; fitness = fits[i]; pop[genomeFinished].setFitness( fitness , fitnessTotal ); } } void slave(){ float fitness = 0.0f; //calculated fitness int* bits = new int[l]; //local array for the bitstring MPI_Status status; //message info while(1) { //wait for a bitstring to evaluate MPI_Recv( bits, l, MPI_INT, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status ); //check the message tag switch( status.MPI_TAG ){ case DIE: n_printf("DYING\n"); delete[] bits; return; case EVALUATE: fitness = 0.0f; usleep(int(500000 * (1.0 + drand48()))); //wait for a bit, to simulate a more complicated evaluation n_printf("recived " ); for( int i = 0; i < l; i ++ ) printf("%i", bits[i]); printf("\t"); for( int i = 0; i < l; i ++ ) //onemax fitness += bits[i]; printf("fitness : %f\n", fitness ); //FINISHED evaluating, send fitness back to the master MPI_Send( &fitness, 1, MPI_FLOAT, 0, FITNESS, MPI_COMM_WORLD ); break; default: n_printf("failed to recognize tag : %i\n", status.MPI_TAG ); } } n_printf("slave escaped\n"); } int main( int argc, char* argv[] ){ srand(time(0)); MPI_Init( &argc, &argv ); MPI_Comm_rank( MPI_COMM_WORLD, &n ); MPI_Comm_size( MPI_COMM_WORLD, &N ); n_printf( "Hello parallel worlds! It's me: Processor %i out of %i\n", n, N); if( n == 0 ){ procname = "MASTER"; waitForSlaves(); initializePopulation(); for( gen = 0; gen < numgens; gen ++ ){ evaluatePopulation(); showPopulation(); selection(); crossover(); mutation(); nextGeneration(); } evaluatePopulation(); showPopulation(); release(); } else { char * string = new char[1024]; sprintf( string, "SLAVE %i", n ); procname = string; signalMaster(); slave(); } MPI_Finalize(); return 0; }