00001
00002
00003
00004
00005
00006
00007
00008 #include "DEBUG.h"
00009 #include <messages.h>
00010 #include <sender.h>
00011 #include <socket.h>
00012
00013 #include <boost/asio.hpp>
00014 #include <boost/thread.hpp>
00015
00016 #include<iostream>
00017
00018 using namespace FastEcslent;
00019 using namespace boost::asio::ip;
00020
00021 void FastEcslent::Sender::init() {
00022 DEBUG(std::cout << "Initializing Sender before starting sender thread. port: " << port << std::endl;)
00023 quit = false;
00024 sleepTime = new boost::posix_time::milliseconds(500);
00025
00026 }
00027
00028
00029 void FastEcslent::Sender::run() {
00030 senderThread = boost::thread(&FastEcslent::Sender::runThread, this);
00031 DEBUG(std::cout << "Started Sender thread's run method" << senderThread.get_id() << std::endl;)
00032 }
00033
00034 void FastEcslent::Sender::addMessage(Message *m) {
00035 DEBUG(std::cout << "Sender Added message of Type: " << (int) m->head.msgType << std::endl;)
00036 sendBuffer.push_back(m);
00037 }
00038
00039 Message *Sender::dequeMessage(){
00040
00041 if (sendBuffer.empty()) {
00042 DEBUG(std::cout << "Sender Dequeueing NULL message" << std::endl;)
00043 return NULL;
00044 } else {
00045 DEBUG(std::cout << "Sender Dequeueing FULL message" << std::endl;)
00046 Message *m = sendBuffer.front();
00047 sendBuffer.pop_front();
00048 return m;
00049 }
00050 }
00051
00052 void FastEcslent::Sender::runThread(){
00053
00054 try {
00055 socket = makeUDPBroadcastSocket();
00056
00057 DEBUG(std::cout << "Sender Opened Broadcast Socket" << std::endl;)
00058
00059 udp::endpoint broadcastEndpoint(udp::endpoint(boost::asio::ip::address_v4::broadcast( ), port));
00060
00061 int size;
00062 while(!quit){
00063
00064 size_t len;
00065 Message *m = dequeMessage();
00066 DEBUG(std::cout << "Sender sending message at time: " << m->head.millisecondsFromStart << std::endl;)
00067 while (m) {
00068 size = (m->head.numberOfStructs * m->head.sizeOfStruct) + HeaderSize;
00069 len = socket->send_to(boost::asio::buffer((void *) m, size), broadcastEndpoint);
00070 DEBUG(std::cout << "Sender sent message with len: " << len << " and with size: " << size <<std::endl;)
00071 delete m;
00072 m = dequeMessage();
00073 }
00074 boost::this_thread::sleep(*sleepTime);
00075 }
00076 } catch (std::exception& e) {
00077 DEBUG(std::cerr << "Sender exception: " << e.what() << std::endl;)
00078 quit = true;
00079 }
00080 DEBUG(std::cout << "..........................................Sender Dead......................................" << std::endl;)
00081 }
00082
00083 void FastEcslent::Sender::stop(){
00084 DEBUG(std::cout << "Sender: Stopping.....:" << senderThread.get_id() << std::endl;)
00085 boost::mutex::scoped_lock scoped_lock(quitLock);
00086 quit = true;
00087 }
00088
00089 void FastEcslent::Sender::kill(){
00090 DEBUG(std::cout << "Sender: Killed.....:" << std::endl;)
00091 quit = true;
00092 senderThread.interrupt();
00093 }
00094
00095 void FastEcslent::Sender::join(){
00096 DEBUG(std::cout << "Sender: Shutting down socket" << std::endl;)
00097 boost::system::error_code ec;
00098 try {
00099
00100 socket->close();
00101
00102 DEBUG(std::cout << "Sender STOPPING, Sender socket closing" << std::endl;)
00103 if (ec){
00104 DEBUG(std::cerr << "Error in closing Sender socket:" << ec.message() << std::endl;)
00105 }
00106 } catch (std::exception& e) {
00107 DEBUG(std::cerr << "Exception: in closing Sender socket:" << e.what() << std::endl;)
00108 }
00109 DEBUG(std::cout << "Sender: Socket has been shutdown...waiting for join().....:" << std::endl;)
00110 std::flush(std::cout);
00111 senderThread.join();
00112
00113 }