00001 /* 00002 * sender.cpp 00003 * 00004 * Created on: Jan 10, 2012 00005 * Author: sushil 00006 */ 00007 00008 #include <messages.h> 00009 #include <sender.h> 00010 #include <socket.h> 00011 00012 #include <boost/asio.hpp> 00013 #include <boost/thread.hpp> 00014 00015 #include<iostream> 00016 00017 using namespace FastEcslent; 00018 using namespace boost::asio::ip; 00019 00020 void FastEcslent::Sender::init() { //initialize sockets, public 00021 std::cout << "Initializing Sender before starting sender thread. port: " << port << std::endl; 00022 quit = false; 00023 SERVER_SLEEP_TIME = 470; 00024 CLIENT_SLEEP_TIME = 70; 00025 if(isServer) 00026 sleepTime = new boost::posix_time::milliseconds(SERVER_SLEEP_TIME); 00027 else 00028 sleepTime = new boost::posix_time::milliseconds(CLIENT_SLEEP_TIME); 00029 } 00030 00031 00032 void FastEcslent::Sender::run() { //start thread, public 00033 senderThread = boost::thread(&FastEcslent::Sender::runThread, this); 00034 std::cout << "Started Sender thread's run method" << senderThread.get_id() << std::endl; 00035 } 00036 00037 void FastEcslent::Sender::addMessage(Message *m) { 00038 //std::cout << "Sender Added message of Type: " << (int) m->head.msgType << std::endl; 00039 sendBuffer.push_back(m); 00040 } 00041 00042 void FastEcslent::Sender::addPriorityMessage(Message *m) { 00043 //std::cout << "Sender Added message of Type: " << (int) m->head.msgType << std::endl; 00044 sendBuffer.push_front(m); 00045 } 00046 00047 Message *Sender::dequeMessage(){ 00048 00049 if (sendBuffer.empty()) { 00050 //std::cout << "Sender Dequeueing NULL message" << std::endl; 00051 return NULL; 00052 } else { 00053 //std::cout << "Sender Dequeueing FULL message" << std::endl; 00054 Message *m = sendBuffer.front(); 00055 sendBuffer.pop_front(); 00056 return m; 00057 } 00058 } 00059 00060 void FastEcslent::Sender::runThread(){ // run the netManaager thread, private (each tick) 00061 00062 try { 00063 socket = makeUDPBroadcastSocket(); 00064 //socket->bind(udp::endpoint(boost::asio::ip::address_v4::broadcast( ), port)); 00065 std::cout << "Sender Opened Broadcast Socket" << std::endl; 00066 //udp::endpoint broadcastEndpoint(udp::v4(), port); 00067 udp::endpoint broadcastEndpoint(udp::endpoint(boost::asio::ip::address_v4::broadcast( ), port)); 00068 //socket->bind(broadcastEndpoint); 00069 int size; 00070 while(!quit){ 00071 00072 size_t len; 00073 Message *m = dequeMessage(); 00074 //std::cout << "Sender sending message at time: " << m->head.millisecondsFromStart << std::endl; 00075 while (m) { 00076 size = (m->head.numberOfStructs * m->head.sizeOfStruct) + HeaderSize; 00077 00078 len = socket->send_to(boost::asio::buffer((const void *) m, size), broadcastEndpoint); 00079 00080 //std::cout << "Sender sent message with len: " << len << " and with size: " << size <<std::endl; 00081 delete m; 00082 m = dequeMessage(); 00083 } 00084 boost::this_thread::sleep(*sleepTime); 00085 }//end while 00086 } catch (std::exception& e) { 00087 std::cerr << "Sender exception: " << e.what() << std::endl; 00088 quit = true; 00089 } 00090 std::cout << "..........................................Sender Dead......................................" << std::endl; 00091 } 00092 00093 void FastEcslent::Sender::stop(){ // end thread, public 00094 std::cout << "Sender: Stopping.....:" << senderThread.get_id() << std::endl; 00095 boost::mutex::scoped_lock scoped_lock(quitLock); 00096 quit = true; 00097 } 00098 00099 void FastEcslent::Sender::kill(){ // end thread, public 00100 std::cout << "Sender: Killed.....:" << std::endl; 00101 quit = true; 00102 senderThread.interrupt(); 00103 } 00104 00105 void FastEcslent::Sender::join(){ //wait for thread to die, called by thread creator, public 00106 std::cout << "Sender: Shutting down socket" << std::endl; 00107 boost::system::error_code ec; 00108 try { 00109 //socket->shutdown(udp::socket::shutdown_both); 00110 socket->close(); 00111 //socket->close(ec); 00112 std::cout << "Sender STOPPING, Sender socket closing" << std::endl; 00113 if (ec){ 00114 std::cerr << "Error in closing Sender socket:" << ec.message() << std::endl; 00115 } 00116 } catch (std::exception& e) { 00117 std::cerr << "Exception: in closing Sender socket:" << e.what() << std::endl; 00118 } 00119 std::cout << "Sender: Socket has been shutdown...waiting for join().....:" << std::endl; 00120 std::flush(std::cout); 00121 senderThread.join(); 00122 00123 }