00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 #include "DEBUG.h"
00009 #include <messages.h>
00010 #include <sender.h>
00011 #include <socket.h>
00012 
00013 #include <boost/asio.hpp>
00014 #include <boost/thread.hpp>
00015 
00016 #include<iostream>
00017 
00018 using namespace FastEcslent;
00019 using namespace boost::asio::ip;
00020 
00021 void FastEcslent::Sender::init() { 
00022         DEBUG(std::cout << "Initializing Sender before starting sender thread. port: " << port << std::endl;)
00023         quit = false;
00024         sleepTime = new boost::posix_time::milliseconds(500);
00025 
00026 }
00027 
00028 
00029 void FastEcslent::Sender::run() { 
00030         senderThread = boost::thread(&FastEcslent::Sender::runThread, this);
00031         DEBUG(std::cout << "Started Sender thread's run method" << senderThread.get_id() << std::endl;)
00032 }
00033 
00034 void FastEcslent::Sender::addMessage(Message *m) {
00035         DEBUG(std::cout << "Sender Added message of Type: " << (int) m->head.msgType << std::endl;)
00036         sendBuffer.push_back(m);
00037 }
00038 
00039 Message *Sender::dequeMessage(){
00040 
00041         if (sendBuffer.empty()) {
00042                 DEBUG(std::cout << "Sender Dequeueing NULL message" << std::endl;)
00043                 return NULL;
00044         } else {
00045                 DEBUG(std::cout << "Sender Dequeueing FULL message" << std::endl;)
00046                 Message *m = sendBuffer.front();
00047                 sendBuffer.pop_front();
00048                 return m;
00049         }
00050 }
00051 
00052 void FastEcslent::Sender::runThread(){ 
00053 
00054         try {
00055                 socket = makeUDPBroadcastSocket();
00056                 
00057                 DEBUG(std::cout << "Sender Opened Broadcast Socket" << std::endl;)
00058                 
00059                 udp::endpoint broadcastEndpoint(udp::endpoint(boost::asio::ip::address_v4::broadcast( ), port));
00060                 
00061                 int size;
00062                 while(!quit){
00063 
00064                         size_t len;
00065                         Message *m = dequeMessage();
00066                         DEBUG(std::cout << "Sender sending message at time: " << m->head.millisecondsFromStart << std::endl;)
00067                         while (m) {
00068                                 size = (m->head.numberOfStructs * m->head.sizeOfStruct) + HeaderSize;
00069                                 len = socket->send_to(boost::asio::buffer((void *) m, size), broadcastEndpoint);
00070                                 DEBUG(std::cout << "Sender sent message with len: " << len << " and with size: " << size <<std::endl;)
00071                                 delete m;
00072                                 m = dequeMessage();
00073                         }
00074                         boost::this_thread::sleep(*sleepTime);
00075                 }
00076                 } catch (std::exception& e) {
00077                         DEBUG(std::cerr << "Sender exception: " << e.what() << std::endl;)
00078                         quit = true;
00079                 }
00080                 DEBUG(std::cout << "..........................................Sender Dead......................................" << std::endl;)
00081 }
00082 
00083 void FastEcslent::Sender::stop(){ 
00084         DEBUG(std::cout << "Sender: Stopping.....:" << senderThread.get_id() << std::endl;)
00085         boost::mutex::scoped_lock scoped_lock(quitLock);
00086         quit = true;
00087 }
00088 
00089 void FastEcslent::Sender::kill(){ 
00090         DEBUG(std::cout << "Sender: Killed.....:" << std::endl;)
00091         quit = true;
00092         senderThread.interrupt();
00093 }
00094 
00095 void FastEcslent::Sender::join(){ 
00096         DEBUG(std::cout << "Sender: Shutting down socket" << std::endl;)
00097         boost::system::error_code ec;
00098         try {
00099                 
00100                 socket->close();
00101                 
00102                 DEBUG(std::cout << "Sender STOPPING, Sender socket closing" << std::endl;)
00103                 if (ec){
00104                         DEBUG(std::cerr << "Error in closing Sender socket:" << ec.message() << std::endl;)
00105                 }
00106         } catch (std::exception& e) {
00107                 DEBUG(std::cerr << "Exception: in closing Sender socket:" << e.what() << std::endl;)
00108         }
00109         DEBUG(std::cout << "Sender: Socket has been shutdown...waiting for join().....:" << std::endl;)
00110         std::flush(std::cout);
00111         senderThread.join();
00112 
00113 }