00001
00002
00003
00004
00005
00006
00007
00008 #include <messages.h>
00009 #include <sender.h>
00010 #include <socket.h>
00011
00012 #include <boost/asio.hpp>
00013 #include <boost/thread.hpp>
00014
00015 #include<iostream>
00016
00017 using namespace FastEcslent;
00018 using namespace boost::asio::ip;
00019
00020 void FastEcslent::Sender::init() {
00021 std::cout << "Initializing Sender before starting sender thread. port: " << port << std::endl;
00022 quit = false;
00023 SERVER_SLEEP_TIME = 470;
00024 CLIENT_SLEEP_TIME = 70;
00025 if(isServer)
00026 sleepTime = new boost::posix_time::milliseconds(SERVER_SLEEP_TIME);
00027 else
00028 sleepTime = new boost::posix_time::milliseconds(CLIENT_SLEEP_TIME);
00029 }
00030
00031
00032 void FastEcslent::Sender::run() {
00033 senderThread = boost::thread(&FastEcslent::Sender::runThread, this);
00034 std::cout << "Started Sender thread's run method" << senderThread.get_id() << std::endl;
00035 }
00036
00037 void FastEcslent::Sender::addMessage(Message *m) {
00038
00039 sendBuffer.push_back(m);
00040 }
00041
00042 void FastEcslent::Sender::addPriorityMessage(Message *m) {
00043
00044 sendBuffer.push_front(m);
00045 }
00046
00047 Message *Sender::dequeMessage(){
00048
00049 if (sendBuffer.empty()) {
00050
00051 return NULL;
00052 } else {
00053
00054 Message *m = sendBuffer.front();
00055 sendBuffer.pop_front();
00056 return m;
00057 }
00058 }
00059
00060 void FastEcslent::Sender::runThread(){
00061
00062 try {
00063 socket = makeUDPBroadcastSocket();
00064
00065 std::cout << "Sender Opened Broadcast Socket" << std::endl;
00066
00067 udp::endpoint broadcastEndpoint(udp::endpoint(boost::asio::ip::address_v4::broadcast( ), port));
00068
00069 int size;
00070 while(!quit){
00071
00072 size_t len;
00073 Message *m = dequeMessage();
00074
00075 while (m) {
00076 size = (m->head.numberOfStructs * m->head.sizeOfStruct) + HeaderSize;
00077
00078 len = socket->send_to(boost::asio::buffer((const void *) m, size), broadcastEndpoint);
00079
00080
00081 delete m;
00082 m = dequeMessage();
00083 }
00084 boost::this_thread::sleep(*sleepTime);
00085 }
00086 } catch (std::exception& e) {
00087 std::cerr << "Sender exception: " << e.what() << std::endl;
00088 quit = true;
00089 }
00090 std::cout << "..........................................Sender Dead......................................" << std::endl;
00091 }
00092
00093 void FastEcslent::Sender::stop(){
00094 std::cout << "Sender: Stopping.....:" << senderThread.get_id() << std::endl;
00095 boost::mutex::scoped_lock scoped_lock(quitLock);
00096 quit = true;
00097 }
00098
00099 void FastEcslent::Sender::kill(){
00100 std::cout << "Sender: Killed.....:" << std::endl;
00101 quit = true;
00102 senderThread.interrupt();
00103 }
00104
00105 void FastEcslent::Sender::join(){
00106 std::cout << "Sender: Shutting down socket" << std::endl;
00107 boost::system::error_code ec;
00108 try {
00109
00110 socket->close();
00111
00112 std::cout << "Sender STOPPING, Sender socket closing" << std::endl;
00113 if (ec){
00114 std::cerr << "Error in closing Sender socket:" << ec.message() << std::endl;
00115 }
00116 } catch (std::exception& e) {
00117 std::cerr << "Exception: in closing Sender socket:" << e.what() << std::endl;
00118 }
00119 std::cout << "Sender: Socket has been shutdown...waiting for join().....:" << std::endl;
00120 std::flush(std::cout);
00121 senderThread.join();
00122
00123 }