sender.cpp

Go to the documentation of this file.
00001 /*
00002  * sender.cpp
00003  *
00004  *  Created on: Jan 10, 2012
00005  *      Author: sushil
00006  */
00007 
00008 #include "DEBUG.h"
00009 #include <messages.h>
00010 #include <sender.h>
00011 #include <socket.h>
00012 
00013 #include <boost/asio.hpp>
00014 #include <boost/thread.hpp>
00015 
00016 #include<iostream>
00017 
00018 using namespace FastEcslent;
00019 using namespace boost::asio::ip;
00020 
00021 void FastEcslent::Sender::init() { //initialize sockets, public
00022         DEBUG(std::cout << "Initializing Sender before starting sender thread. port: " << port << std::endl;)
00023         quit = false;
00024         sleepTime = new boost::posix_time::milliseconds(500);
00025 
00026 }
00027 
00028 
00029 void FastEcslent::Sender::run() { //start thread, public
00030         senderThread = boost::thread(&FastEcslent::Sender::runThread, this);
00031         DEBUG(std::cout << "Started Sender thread's run method" << senderThread.get_id() << std::endl;)
00032 }
00033 
00034 void FastEcslent::Sender::addMessage(Message *m) {
00035         DEBUG(std::cout << "Sender Added message of Type: " << (int) m->head.msgType << std::endl;)
00036         sendBuffer.push_back(m);
00037 }
00038 
00039 Message *Sender::dequeMessage(){
00040 
00041         if (sendBuffer.empty()) {
00042                 DEBUG(std::cout << "Sender Dequeueing NULL message" << std::endl;)
00043                 return NULL;
00044         } else {
00045                 DEBUG(std::cout << "Sender Dequeueing FULL message" << std::endl;)
00046                 Message *m = sendBuffer.front();
00047                 sendBuffer.pop_front();
00048                 return m;
00049         }
00050 }
00051 
00052 void FastEcslent::Sender::runThread(){ // run the netManaager thread, private (each tick)
00053 
00054         try {
00055                 socket = makeUDPBroadcastSocket();
00056                 //socket->bind(udp::endpoint(boost::asio::ip::address_v4::broadcast( ), port));
00057                 DEBUG(std::cout << "Sender Opened Broadcast Socket" << std::endl;)
00058                 //udp::endpoint broadcastEndpoint(udp::v4(), port);
00059                 udp::endpoint broadcastEndpoint(udp::endpoint(boost::asio::ip::address_v4::broadcast( ), port));
00060                 //socket->bind(broadcastEndpoint);
00061                 int size;
00062                 while(!quit){
00063 
00064                         size_t len;
00065                         Message *m = dequeMessage();
00066                         DEBUG(std::cout << "Sender sending message at time: " << m->head.millisecondsFromStart << std::endl;)
00067                         while (m) {
00068                                 size = (m->head.numberOfStructs * m->head.sizeOfStruct) + HeaderSize;
00069                                 len = socket->send_to(boost::asio::buffer((void *) m, size), broadcastEndpoint);
00070                                 DEBUG(std::cout << "Sender sent message with len: " << len << " and with size: " << size <<std::endl;)
00071                                 delete m;
00072                                 m = dequeMessage();
00073                         }
00074                         boost::this_thread::sleep(*sleepTime);
00075                 }//end while
00076                 } catch (std::exception& e) {
00077                         DEBUG(std::cerr << "Sender exception: " << e.what() << std::endl;)
00078                         quit = true;
00079                 }
00080                 DEBUG(std::cout << "..........................................Sender Dead......................................" << std::endl;)
00081 }
00082 
00083 void FastEcslent::Sender::stop(){ // end thread, public
00084         DEBUG(std::cout << "Sender: Stopping.....:" << senderThread.get_id() << std::endl;)
00085         boost::mutex::scoped_lock scoped_lock(quitLock);
00086         quit = true;
00087 }
00088 
00089 void FastEcslent::Sender::kill(){ // end thread, public
00090         DEBUG(std::cout << "Sender: Killed.....:" << std::endl;)
00091         quit = true;
00092         senderThread.interrupt();
00093 }
00094 
00095 void FastEcslent::Sender::join(){ //wait for thread to die, called by thread creator, public
00096         DEBUG(std::cout << "Sender: Shutting down socket" << std::endl;)
00097         boost::system::error_code ec;
00098         try {
00099                 //socket->shutdown(udp::socket::shutdown_both);
00100                 socket->close();
00101                 //socket->close(ec);
00102                 DEBUG(std::cout << "Sender STOPPING, Sender socket closing" << std::endl;)
00103                 if (ec){
00104                         DEBUG(std::cerr << "Error in closing Sender socket:" << ec.message() << std::endl;)
00105                 }
00106         } catch (std::exception& e) {
00107                 DEBUG(std::cerr << "Exception: in closing Sender socket:" << e.what() << std::endl;)
00108         }
00109         DEBUG(std::cout << "Sender: Socket has been shutdown...waiting for join().....:" << std::endl;)
00110         std::flush(std::cout);
00111         senderThread.join();
00112 
00113 }

Generated on Fri Dec 13 14:54:18 2013 for FastECSLent by  doxygen 1.5.4