00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 #include <messages.h>
00009 #include <sender.h>
00010 #include <socket.h>
00011 
00012 #include <boost/asio.hpp>
00013 #include <boost/thread.hpp>
00014 
00015 #include<iostream>
00016 
00017 using namespace FastEcslent;
00018 using namespace boost::asio::ip;
00019 
00020 void FastEcslent::Sender::init() { 
00021         std::cout << "Initializing Sender before starting sender thread. port: " << port << std::endl;
00022         quit = false;
00023         SERVER_SLEEP_TIME = 470;
00024         CLIENT_SLEEP_TIME = 70;
00025         if(isServer)
00026                 sleepTime = new boost::posix_time::milliseconds(SERVER_SLEEP_TIME);
00027         else
00028                 sleepTime = new boost::posix_time::milliseconds(CLIENT_SLEEP_TIME);
00029 }
00030 
00031 
00032 void FastEcslent::Sender::run() { 
00033         senderThread = boost::thread(&FastEcslent::Sender::runThread, this);
00034         std::cout << "Started Sender thread's run method" << senderThread.get_id() << std::endl;
00035 }
00036 
00037 void FastEcslent::Sender::addMessage(Message *m) {
00038         
00039         sendBuffer.push_back(m);
00040 }
00041 
00042 void FastEcslent::Sender::addPriorityMessage(Message *m) {
00043         
00044         sendBuffer.push_front(m);
00045 }
00046 
00047 Message *Sender::dequeMessage(){
00048 
00049         if (sendBuffer.empty()) {
00050                 
00051                 return NULL;
00052         } else {
00053                 
00054                 Message *m = sendBuffer.front();
00055                 sendBuffer.pop_front();
00056                 return m;
00057         }
00058 }
00059 
00060 void FastEcslent::Sender::runThread(){ 
00061 
00062         try {
00063                 socket = makeUDPBroadcastSocket();
00064                 
00065                 std::cout << "Sender Opened Broadcast Socket" << std::endl;
00066                 
00067                 udp::endpoint broadcastEndpoint(udp::endpoint(boost::asio::ip::address_v4::broadcast( ), port));
00068                 
00069                 int size;
00070                 while(!quit){
00071 
00072                         size_t len;
00073                         Message *m = dequeMessage();
00074                         
00075                         while (m) {
00076                                 size = (m->head.numberOfStructs * m->head.sizeOfStruct) + HeaderSize;
00077 
00078                                 len = socket->send_to(boost::asio::buffer((const void *) m, size), broadcastEndpoint);
00079 
00080                                 
00081                                 delete m;
00082                                 m = dequeMessage();
00083                         }
00084                         boost::this_thread::sleep(*sleepTime);
00085                 }
00086                 } catch (std::exception& e) {
00087                         std::cerr << "Sender exception: " << e.what() << std::endl;
00088                         quit = true;
00089                 }
00090                 std::cout << "..........................................Sender Dead......................................" << std::endl;
00091 }
00092 
00093 void FastEcslent::Sender::stop(){ 
00094         std::cout << "Sender: Stopping.....:" << senderThread.get_id() << std::endl;
00095         boost::mutex::scoped_lock scoped_lock(quitLock);
00096         quit = true;
00097 }
00098 
00099 void FastEcslent::Sender::kill(){ 
00100         std::cout << "Sender: Killed.....:" << std::endl;
00101         quit = true;
00102         senderThread.interrupt();
00103 }
00104 
00105 void FastEcslent::Sender::join(){ 
00106         std::cout << "Sender: Shutting down socket" << std::endl;
00107         boost::system::error_code ec;
00108         try {
00109                 
00110                 socket->close();
00111                 
00112                 std::cout << "Sender STOPPING, Sender socket closing" << std::endl;
00113                 if (ec){
00114                         std::cerr << "Error in closing Sender socket:" << ec.message() << std::endl;
00115                 }
00116         } catch (std::exception& e) {
00117                 std::cerr << "Exception: in closing Sender socket:" << e.what() << std::endl;
00118         }
00119         std::cout << "Sender: Socket has been shutdown...waiting for join().....:" << std::endl;
00120         std::flush(std::cout);
00121         senderThread.join();
00122 
00123 }