sender.cpp

Go to the documentation of this file.
00001 /*
00002  * sender.cpp
00003  *
00004  *  Created on: Jan 10, 2012
00005  *      Author: sushil
00006  */
00007 
00008 #include <messages.h>
00009 #include <sender.h>
00010 #include <socket.h>
00011 
00012 #include <boost/asio.hpp>
00013 #include <boost/thread.hpp>
00014 
00015 #include<iostream>
00016 
00017 using namespace FastEcslent;
00018 using namespace boost::asio::ip;
00019 
00020 void FastEcslent::Sender::init() { //initialize sockets, public
00021         std::cout << "Initializing Sender before starting sender thread. port: " << port << std::endl;
00022         quit = false;
00023         SERVER_SLEEP_TIME = 470;
00024         CLIENT_SLEEP_TIME = 70;
00025         if(isServer)
00026                 sleepTime = new boost::posix_time::milliseconds(SERVER_SLEEP_TIME);
00027         else
00028                 sleepTime = new boost::posix_time::milliseconds(CLIENT_SLEEP_TIME);
00029 }
00030 
00031 
00032 void FastEcslent::Sender::run() { //start thread, public
00033         senderThread = boost::thread(&FastEcslent::Sender::runThread, this);
00034         std::cout << "Started Sender thread's run method" << senderThread.get_id() << std::endl;
00035 }
00036 
00037 void FastEcslent::Sender::addMessage(Message *m) {
00038         //std::cout << "Sender Added message of Type: " << (int) m->head.msgType << std::endl;
00039         sendBuffer.push_back(m);
00040 }
00041 
00042 void FastEcslent::Sender::addPriorityMessage(Message *m) {
00043         //std::cout << "Sender Added message of Type: " << (int) m->head.msgType << std::endl;
00044         sendBuffer.push_front(m);
00045 }
00046 
00047 Message *Sender::dequeMessage(){
00048 
00049         if (sendBuffer.empty()) {
00050                 //std::cout << "Sender Dequeueing NULL message" << std::endl;
00051                 return NULL;
00052         } else {
00053                 //std::cout << "Sender Dequeueing FULL message" << std::endl;
00054                 Message *m = sendBuffer.front();
00055                 sendBuffer.pop_front();
00056                 return m;
00057         }
00058 }
00059 
00060 void FastEcslent::Sender::runThread(){ // run the netManaager thread, private (each tick)
00061 
00062         try {
00063                 socket = makeUDPBroadcastSocket();
00064                 //socket->bind(udp::endpoint(boost::asio::ip::address_v4::broadcast( ), port));
00065                 std::cout << "Sender Opened Broadcast Socket" << std::endl;
00066                 //udp::endpoint broadcastEndpoint(udp::v4(), port);
00067                 udp::endpoint broadcastEndpoint(udp::endpoint(boost::asio::ip::address_v4::broadcast( ), port));
00068                 //socket->bind(broadcastEndpoint);
00069                 int size;
00070                 while(!quit){
00071 
00072                         size_t len;
00073                         Message *m = dequeMessage();
00074                         //std::cout << "Sender sending message at time: " << m->head.millisecondsFromStart << std::endl;
00075                         while (m) {
00076                                 size = (m->head.numberOfStructs * m->head.sizeOfStruct) + HeaderSize;
00077 
00078                                 len = socket->send_to(boost::asio::buffer((const void *) m, size), broadcastEndpoint);
00079 
00080                                 //std::cout << "Sender sent message with len: " << len << " and with size: " << size <<std::endl;
00081                                 delete m;
00082                                 m = dequeMessage();
00083                         }
00084                         boost::this_thread::sleep(*sleepTime);
00085                 }//end while
00086                 } catch (std::exception& e) {
00087                         std::cerr << "Sender exception: " << e.what() << std::endl;
00088                         quit = true;
00089                 }
00090                 std::cout << "..........................................Sender Dead......................................" << std::endl;
00091 }
00092 
00093 void FastEcslent::Sender::stop(){ // end thread, public
00094         std::cout << "Sender: Stopping.....:" << senderThread.get_id() << std::endl;
00095         boost::mutex::scoped_lock scoped_lock(quitLock);
00096         quit = true;
00097 }
00098 
00099 void FastEcslent::Sender::kill(){ // end thread, public
00100         std::cout << "Sender: Killed.....:" << std::endl;
00101         quit = true;
00102         senderThread.interrupt();
00103 }
00104 
00105 void FastEcslent::Sender::join(){ //wait for thread to die, called by thread creator, public
00106         std::cout << "Sender: Shutting down socket" << std::endl;
00107         boost::system::error_code ec;
00108         try {
00109                 //socket->shutdown(udp::socket::shutdown_both);
00110                 socket->close();
00111                 //socket->close(ec);
00112                 std::cout << "Sender STOPPING, Sender socket closing" << std::endl;
00113                 if (ec){
00114                         std::cerr << "Error in closing Sender socket:" << ec.message() << std::endl;
00115                 }
00116         } catch (std::exception& e) {
00117                 std::cerr << "Exception: in closing Sender socket:" << e.what() << std::endl;
00118         }
00119         std::cout << "Sender: Socket has been shutdown...waiting for join().....:" << std::endl;
00120         std::flush(std::cout);
00121         senderThread.join();
00122 
00123 }

Generated on Fri Dec 13 14:54:18 2013 for FastECSLent by  doxygen 1.5.4