00001
00002
00003
00004
00005
00006
00007
00008 #include <iostream>
00009
00010 #include "DEBUG.h"
00011 #include <messages.h>
00012 #include <listener.h>
00013 #include <socket.h>
00014
00015
00016 #include <boost/asio.hpp>
00017 #include <boost/thread.hpp>
00018
00019 using namespace FastEcslent;
00020 using namespace boost::asio::ip;
00021
00022 void Listener::init() {
00023 DEBUG(std::cout << "Initializing Listener before starting listener thread. port: " << port << std::endl;)
00024 quit = false;
00025
00026
00027
00028
00029
00030
00031 }
00032
00033
00034
00035
00036
00037
00038
00039 void Listener::addMessage(Message *m) {
00040 DEBUG(std::cout << "Listener Added message of Type: " << (int) m->head.msgType << std::endl;)
00041
00042 recBuffer.push_back(m);
00043 }
00044
00045 Message *Listener::dequeMessage(){
00046 DEBUG(std::cout << "Listener Dequeueing message" << std::endl;)
00047 if (recBuffer.empty()) {
00048 return NULL;
00049 } else {
00050 Message *m = recBuffer.front();
00051 recBuffer.pop_front();
00052 return m;
00053 }
00054 }
00055
00056 void FastEcslent::Listener::run() {
00057 listenerThread = boost::thread(&FastEcslent::Listener::runThread, this);
00058 DEBUG(std::cout << "Running Listener thread: " << listenerThread.get_id() << std::endl;)
00059 }
00060
00061
00062 void FastEcslent::Listener::runThread(){
00063 DEBUG(std::cout << "..........................................Running listener......................................" << std::endl;)
00064 try {
00065
00066 socket = makeUDPBroadcastSocket();
00067
00068
00069
00070
00071
00072
00073
00074
00075 socket->bind(udp::endpoint(boost::asio::ip::address_v4::broadcast( ), port));
00076 DEBUG(std::cout << "Listener Bound Socket" << std::endl;)
00077
00078 udp::endpoint recFrom;
00079 while(!quit){
00080 DEBUG(std::cout << "Running Listener with id: " << listenerThread.get_id() << std::endl;)
00081 size_t len;
00082 Message *m = new Message;
00083 len = socket->receive_from(boost::asio::buffer((char *) m, MaxMessageSize), recFrom);
00084 if (recFrom != myIP ) {
00085 DEBUG(std::cout << "Listener got message with length: " << len << " from " << recFrom.address() << std::endl;)
00086 DEBUG(std::cout << "Type: " << (int) m->head.msgType << std::endl;)
00087 addMessage((Message *) m);
00088 }
00089 }
00090 } catch (std::exception& e) {
00091 DEBUG(std::cerr << "Listener Exception: " << e.what() << std::endl;)
00092
00093 return;
00094 }
00095
00096 DEBUG(std::cout << "..........................................Listener Dead......................................" << std::endl;)
00097 }
00098
00099 void FastEcslent::Listener::stop(){
00100 DEBUG(std::cout << "Listener stopping..." << std::endl;)
00101 boost::mutex::scoped_lock scoped_lock(quitLock);
00102 quit = true;
00103 }
00104
00105 void FastEcslent::Listener::kill(){
00106 DEBUG(std::cout << "Listener killed..." << std::endl;)
00107 quit = true;
00108 listenerThread.interrupt();
00109 }
00110
00111 void FastEcslent::Listener::join(){
00112 DEBUG(std::cout << "Listener shutting down socket..." << quit << std::endl;)
00113 std::flush(std::cout);
00114 listenerThread.join();
00115 try {
00116 boost::system::error_code ec;
00117
00118 if(socket->is_open())
00119 socket->close(ec);
00120
00121 if (ec){
00122 DEBUG(std::cerr << "Error in closing listener socket:" << ec.message() << std::endl;)
00123 }
00124 } catch (std::exception& e) {
00125 DEBUG(std::cerr << "Exception: in closing listener socket:" << e.what() << std::endl;)
00126 }
00127 DEBUG(std::cout << "Listener Socket shutdown." << std::endl;)
00128 }
00129
00130