Advertisement
wandrake

Untitled

Jul 13th, 2011
288
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C 18.28 KB | None | 0 0
  1. #include "dgraph.h"
  2. #include "shortestpath.h"
  3. #include "comsock.h"
  4. #include "messages.h"
  5. #include "sigutils.h"
  6. #include "mgutils.h"
  7. #include "macro.h"
  8.  
  9. #include <pthread.h>
  10. #include <stdlib.h>
  11. #include <signal.h>
  12. #include <string.h>
  13. #include <errno.h>
  14. #include <unistd.h>
  15.  
  16. #define LOGFILE "./mgcars.log"
  17. #define SOCKNAME "./tmp/cars.sck"
  18.  
  19. graph_t* map;
  20.  
  21. sessionlist_t* sessions;
  22. offerlist_t* offers;
  23. requestlist_t* requests;
  24.  
  25. /* Socket creato all'inizio da mgcars e attivo
  26.  * fino alla chiusura del server */
  27.  
  28. int server_sock;
  29. int thread_terminate = 0;
  30.  
  31. pthread_t request_dispatcher;
  32. pthread_t request_match;
  33.  
  34. FILE* logfile;
  35.  
  36. int find_covering(unsigned int from, unsigned int to, unsigned int* pprec, matchlist_t* covering) {
  37.     debug( fprintf(stderr, "find_covering: from %d to %d\n", from, to); );
  38.     /* Passo base */
  39.     if (from == to) return 1;
  40.     else {
  41.         offer_t* offer = offers->first;
  42.         int found = 0;
  43.  
  44.         /* Scorro tutte le offerte, la prima che mi fa fare almeno
  45.          * uno step del mio percorso da request->from a request->to
  46.          * valutato al contrario per questione di comodità
  47.          * lo inserisco in coda alla lista di match e ricorro sul
  48.          * pezzo di percorso non ancora valutato, se arrivo ad una
  49.          * richiesta con partenza e arrivo uguali vuol dire che ho
  50.          * trovato il match e sono a posto, altrimenti ritorno 0
  51.          * e la matchlist non è significativa (e va liberata) */
  52.  
  53.         while (offer != NULL && !found) {
  54.             int a, b, tmpfrom;
  55.             int count;
  56.  
  57.             a = to;
  58.             b = offer->to;
  59.             count = 0;
  60.  
  61.             /* Mettere offer->num qua è un barbatrucco, ma sennò il codice
  62.              * diventava ancora più illeggibile */
  63.  
  64.             while (b != offer->from && b != a) b = offer->pprec[b];
  65.  
  66.             while (a == b && a >= 0 && b >= 0) {
  67.                 count++;
  68.                 tmpfrom = a;
  69.                 a = pprec[a];
  70.                 b = offer->pprec[b];
  71.             }
  72.  
  73.             if (count > 1) {
  74.                 /* Calcolo la rotta rimanente in termini di from e to */
  75.                 match_t* match_add;
  76.  
  77.                 ec_null( match_add = match_new() );
  78.                 match_add->from = tmpfrom;
  79.                 match_add->to = to;
  80.                 match_add->offer_ref = offer;
  81.                 matchlist_add(covering, match_add);
  82.  
  83.                 if (!find_covering(from, tmpfrom, pprec, covering)) {
  84.                     matchlist_remove(covering, match_add);
  85.                 }
  86.                 else return 1;
  87.             }
  88.  
  89.             offer = offer->next;
  90.         }
  91.         return 0;
  92.  
  93.     }
  94.  
  95. EC_CLEANUP_BGN
  96.     return 0;
  97. EC_CLEANUP_END
  98. }
  99.  
  100. void perform_match() {
  101.     request_t* request = NULL;
  102.     message_t* answer = NULL;
  103.  
  104.     request_t* remove_request;
  105.     offer_t* remove_offer;
  106.  
  107.     matchlist_t* covering;
  108.     char* logwrite;
  109.     char* path;
  110.  
  111.     request = requests->first;
  112.  
  113.     while (request != NULL) {
  114.         int found;
  115.  
  116.         debug( fprintf(stderr, "perform_match(): evaluating\n"); );
  117.         request_print(request);
  118.         remove_request = NULL;
  119.         covering = matchlist_new();
  120.  
  121.         ec_nzero( pthread_mutex_lock(&(offers->mutex)) );
  122.         found = find_covering(request->from, request->to, request->pprec, covering);
  123.         ec_nzero( pthread_mutex_unlock(&(offers->mutex)) );
  124.  
  125.         if (found) {
  126.             match_t* ptr;
  127.  
  128.             remove_request = request;
  129.             ptr = covering->first;
  130.  
  131.             while (ptr != NULL) {
  132.                 int size;
  133.  
  134.                 debug( fprintf(stderr, "perform_match: ptr->from: %d, ptr->to: %d\n", ptr->from, ptr->to); );
  135.  
  136.                 ec_null( path = shpath_to_string(map, ptr->from, ptr->to, request->pprec) );
  137.                 size = strlen(path)+strlen(session_user(request->owner))+strlen(session_user(ptr->offer_ref->owner))+3;
  138.                 ec_null( logwrite = malloc(size*sizeof(char)) );
  139.                 sprintf(logwrite, "%s$%s$%s", session_user(ptr->offer_ref->owner), session_user(request->owner), path);
  140.  
  141.                 fwrite(logwrite, strlen(logwrite), 1, logfile);
  142.                 fwrite("\n", 1, 1, logfile);
  143.  
  144.                 ec_null( answer = create_message(MSG_SHARE, logwrite) );
  145.  
  146.                 ec_meno1( sendMessage(request->owner->client_send, answer) );
  147.  
  148.                 if (ptr->to != ptr->offer_ref->to) {
  149.                     offer_t* tmp;
  150.                     double* dist;
  151.  
  152.                     ec_null( tmp = offer_new() );
  153.                     tmp->from = ptr->to;
  154.                     tmp->to = ptr->offer_ref->to;
  155.                     tmp->num = 1;
  156.                     tmp->pprec = NULL;
  157.  
  158.                     /* cleanup interno */
  159.                     if ( (dist = dijkstra(map, tmp->from, &(tmp->pprec))) == NULL ) {
  160.                         offer_free(&tmp);
  161.                         /* cleanup della funzione */
  162.                         goto ec_cleanup_bgn;
  163.                     }
  164.  
  165.                     free(dist);
  166.                     tmp->owner = ptr->offer_ref->owner;
  167.                     offerlist_add(offers, tmp);
  168.                 }
  169.  
  170.                 if (ptr->from != ptr->offer_ref->from) {
  171.                     offer_t* tmp;
  172.                     double* dist;
  173.  
  174.                     ec_null( tmp = offer_new() );
  175.                     tmp->from = ptr->offer_ref->from;
  176.                     tmp->to = ptr->from;
  177.                     tmp->num = 1;
  178.                     tmp->pprec = NULL;
  179.  
  180.                     /* cleanup interno */
  181.                     if ( (dist = dijkstra(map, tmp->from, &(tmp->pprec))) == NULL ) {
  182.                         offer_free(&tmp);
  183.                         /* cleanup della funzione */
  184.                         goto ec_cleanup_bgn;
  185.                     }
  186.  
  187.                     free(dist);
  188.                     tmp->owner = ptr->offer_ref->owner;
  189.                     offerlist_add(offers, tmp);
  190.                 }
  191.  
  192.                 fflush(logfile);
  193.                 remove_offer = ptr->offer_ref;
  194.                 ptr->offer_ref->num--;
  195.                 if (remove_offer->num == 0) offerlist_remove(offers, remove_offer);
  196.                 ptr = ptr->next;
  197.  
  198.                 free(path);
  199.                 free(logwrite);
  200.  
  201.                 free_message(&answer);
  202.             }
  203.  
  204.             matchlist_free(&covering);
  205.         }
  206.  
  207.         remove_request = request;
  208.         request = request->next;
  209.  
  210. /*        if (remove_request != NULL) {
  211.             remove_request->owner->requests--;
  212.             requestlist_remove(requests, remove_request);
  213.         }*/
  214.         remove_request->owner->requests--;
  215.         requestlist_remove(requests, remove_request);
  216.     }
  217.  
  218.     /* Per ogni sessione, scorro le richieste, e cerco
  219.      * se possono essere soddisfatte totalmente o parzialmente
  220.      * dalle offerte di tutte le altre sessioni.
  221.      * Se una sessione è terminata lato client e non ha più
  222.      * richieste non la cancello finchè non ho consumato tutte
  223.      * le sue offerte */
  224.  
  225.     /* Ora scorro tutte le sessioni a vedere quali non hanno più richieste
  226.      * pendenti e mando MSG_SHAREND e se il client è terminato elimino la
  227.      * sessione */
  228.  
  229.  
  230.     debug( fprintf(stderr, "perform_match: Match performed.\n"); );
  231. EC_CLEANUP_BGN
  232. /* TODO: implementare cleanup */
  233. debug( fprintf(stderr, "perform_match(): Oooops\n"); );
  234. pthread_mutex_unlock(&(offers->mutex));
  235. /*pthread_mutex_unlock(&(sessions->mutex));*/
  236. EC_CLEANUP_END
  237. }
  238.  
  239. static void void_handler(int signum) {
  240. }
  241.  
  242. void* match (void* arg) {
  243.     siginfo_t signum;
  244.     struct timespec ts;
  245.  
  246.     sigset_t set;
  247.  
  248.     ec_meno1( sigfillset(&set) );
  249.  
  250.     ec_meno1( signal_block_all() );
  251.     ec_meno1( signal_view_usr1(void_handler) );
  252.     ec_meno1( signal_view_int(void_handler) );
  253.     ec_meno1( signal_view_term(void_handler) );
  254.     ec_meno1( signal_view_pipe(void_handler) );
  255.  
  256.     ts.tv_sec = (time_t)30;
  257.     ts.tv_nsec = 0;
  258.  
  259.     memset(&signum, 0, sizeof(signum));
  260.  
  261.     while (1) {
  262.         if (sigtimedwait(&set, &signum, &ts) < 0) {
  263.             perform_match();
  264.             ec_nzero( sessionlist_cleanup(sessions) );
  265.         }
  266.         else {
  267.             debug( fprintf(stderr, "Got signal %d\n", signum.si_signo); );
  268.  
  269.             if (signum.si_signo == SIGUSR1) {
  270.                 perform_match();
  271.                 ec_nzero( sessionlist_cleanup(sessions) );
  272.             }
  273.  
  274.             if (signum.si_signo == SIGTERM || signum.si_signo == SIGINT) {
  275.                 debug( fprintf(stderr, "Got SIGINT or SIGTERM, dumping status\n"); );
  276.                 requestlist_print(requests);
  277.                 offerlist_print(offers);
  278.                 perform_match();
  279. /*                sessionlist_cleanup(sessions);*/
  280.                 ec_nzero( pthread_kill(request_dispatcher, SIGUSR2) );
  281.                 return NULL;
  282.             }
  283.         }
  284.     }
  285.  
  286. EC_CLEANUP_BGN
  287.     return NULL;
  288. EC_CLEANUP_END
  289.  
  290. }
  291.  
  292. void* worker (void* arg) {
  293.     session_t* me = (session_t*)arg;
  294.  
  295.     /* Messaggi di errore usati per risposte negative */
  296.  
  297.     const char* err_nocity      = "Unknown city %s";
  298.     const char* err_wrongpass   = "Wrong password";
  299.     const char* err_userlogged  = "User %s logged";
  300.     const char* err_invalid     = "Invalid request";
  301.  
  302.     int close_send = 0;
  303.  
  304.     message_t* message = NULL;
  305.     message_t* answer = NULL;
  306.  
  307.     double* distances = NULL;
  308.     char* errmsg = NULL;
  309.  
  310.     request_t* request = NULL;
  311.     offer_t* offer = NULL;
  312.     unsigned int* pprec = NULL;
  313.  
  314.     ec_meno1( signal_block_all() );
  315.     ec_meno1( signal_view_usr2(void_handler) );
  316.  
  317.     while (!me->exit) {
  318.         errmsg = NULL;
  319.         distances = NULL;
  320.         request = NULL;
  321.         offer = NULL;
  322.         answer = NULL;
  323.         message = NULL;
  324.  
  325.         ec_null( message = (message_t*)malloc(sizeof(message_t)) );
  326.         memset(message, 0, sizeof(message_t));
  327.         ec_meno1( receiveMessage(me->client_recv, message) );
  328.         print_message(message);
  329.  
  330.         /* Gestisco i vari tipi di messaggio */
  331.  
  332.         switch (message->type) {
  333.             char* user = NULL;
  334.             char* pass = NULL;
  335.             char* sock = NULL;
  336.  
  337.             char* offer_from = NULL;
  338.             char* offer_to = NULL;
  339.             char* offer_num = NULL;
  340.  
  341.             char* request_from = NULL;
  342.             char* request_to = NULL;
  343.  
  344.             char* strtok_status = NULL;
  345.  
  346.             case MSG_CONNECT:
  347.                 user = message->buffer;
  348.                 pass = user+strlen(user)+1;
  349.                 sock = pass+strlen(pass)+1;
  350.  
  351.                 me->user = strdup(user);
  352.  
  353.                 /* TODO: macro */
  354.                 /* TODO: VERIFICARE PARAMETRI INVALIDI LATO CLIENT */
  355.  
  356.                 debug( fprintf(stderr, "worker: opening connection for %s...\n", user); )
  357.                 ec_meno1( me->client_send = openConnection(sock) );
  358.  
  359.                 switch (login(user, pass)) {
  360.                     case LOGINOK:
  361.                         ec_null( answer = create_message(MSG_OK, NULL) );
  362.                     break;
  363.  
  364.                     case ERR_WRONGPASS:
  365.                         ec_null( errmsg = (char*)malloc(1024*sizeof(char)) );
  366.                         sprintf(errmsg, err_wrongpass);
  367.                         ec_null( answer = create_message(MSG_NO, errmsg) );
  368.  
  369.                         me->exit = 1;
  370.                         close_send = 1;
  371.                     break;
  372.  
  373.                     case ERR_LOGGED:
  374.                         ec_null( errmsg = (char*)malloc(1024*sizeof(char)) );
  375.                         sprintf(errmsg, err_userlogged, user);
  376.                         ec_null( answer = create_message(MSG_NO, errmsg) );
  377.  
  378.                         me->exit = 1;
  379.                         close_send = 1;
  380.                     break;
  381.                 }
  382.  
  383.             break;
  384.  
  385.             case MSG_EXIT:
  386.                 logout(me->user);
  387.                 me->exit = 1;
  388.             break;
  389.        
  390.             case MSG_OFFER:
  391.                 /* TODO: sistemare */
  392.                 strtok_status = NULL;
  393.  
  394.                 offer_from  = strtok_r(message->buffer, ":", &strtok_status);
  395.                 offer_to    = strtok_r(NULL, ":", &strtok_status);
  396.                 offer_num   = strtok_r(NULL, "\0", &strtok_status);
  397.  
  398.                 if (is_node(map, offer_from) < 0) {
  399.                     ec_null( errmsg = (char*)malloc(1024*sizeof(char)) );
  400.                     sprintf(errmsg, err_nocity, offer_from);
  401.  
  402.                     ec_null( answer = create_message(MSG_NO, errmsg) );
  403.                 }
  404.                 else if (is_node(map, offer_to) < 0) {
  405.                     ec_null( errmsg = (char*)malloc(1024*sizeof(char)) );
  406.                     sprintf(errmsg, err_nocity, offer_to);
  407.  
  408.                     ec_null( answer = create_message(MSG_NO, errmsg) );
  409.                 }
  410.                 else {
  411.                     pprec = NULL;
  412.  
  413.                     ec_null( offer = offer_new() );
  414.                     offer->from = is_node(map, offer_from);
  415.                     offer->to = is_node(map, offer_to);
  416.                     offer->num = strtod(offer_num, NULL);
  417.                     offer->owner = me;
  418.  
  419.                     ec_null( distances = dijkstra(map, offer->from, &pprec) );
  420.                     free(distances);
  421.  
  422.                     offer->pprec = pprec;
  423.                     offerlist_add(offers, offer);
  424.  
  425.                     ec_null( answer = create_message(MSG_OK, NULL) );
  426.                 }
  427.             break;
  428.  
  429.             case MSG_REQUEST:
  430.                 /* TODO: sistemare */
  431.                 strtok_status = NULL;
  432.  
  433.                 request_from    = strtok_r(message->buffer, ":", &strtok_status);
  434.                 request_to      = strtok_r(NULL, ":", &strtok_status);
  435.  
  436.                 if (is_node(map, request_from) < 0) {
  437.                     ec_null( errmsg = (char*)malloc(1024*sizeof(char)) );
  438.                     sprintf(errmsg, err_nocity, request_from);
  439.                     ec_null( answer = create_message(MSG_NO, errmsg) );
  440.                 }
  441.                 else if (is_node(map, request_to) < 0) {
  442.                     ec_null( errmsg = (char*)malloc(1024*sizeof(char)) );
  443.                     sprintf(errmsg, err_nocity, request_to);
  444.                     ec_null( answer = create_message(MSG_NO, errmsg) );
  445.                 }
  446.                 else {
  447.                     me->requests++;
  448.                     pprec = NULL;
  449.  
  450.                     ec_null( request = request_new() );
  451.  
  452.                     request->from = is_node(map, request_from);
  453.                     request->to = is_node(map, request_to);
  454.                     request->owner = me;
  455.  
  456.                     ec_null( distances = dijkstra(map, request->from, &pprec) );
  457.                     free(distances);
  458.  
  459.                     request->pprec = pprec;
  460.  
  461.                     request_print(request);
  462.                     requestlist_add(requests, request);
  463.  
  464.                     ec_null( answer = create_message(MSG_OK, NULL) );
  465.                 }
  466.             break;
  467.  
  468.             default:
  469.                 ec_null( errmsg = (char*)malloc(1024*sizeof(char)) );
  470.                 sprintf(errmsg, err_invalid);
  471.                 ec_null( answer = create_message(MSG_NO, errmsg) );
  472.             break;
  473.         }
  474.  
  475.         print_message(answer);
  476.         ec_meno1( sendMessage(me->client_send, answer) );
  477.  
  478.         free(errmsg);
  479.         free_message(&answer);
  480.         free_message(&message);
  481.     }
  482.  
  483. EC_CLEANUP_BGN
  484.     free(distances);
  485.     free_message(&answer);
  486.     free_message(&message);
  487.     offer_free(&offer);
  488.     request_free(&request);
  489.     closeSocket(me->client_recv);
  490.     if (close_send) closeSocket(me->client_send);
  491.     me->exit = 1;
  492.     return NULL;
  493. EC_CLEANUP_END
  494. }
  495.  
  496. void* dispatcher(void* param) {
  497.     int client_recv;
  498.     session_t* curr_session;
  499.  
  500.     ec_meno1( signal_block_all() );
  501.     ec_meno1( signal_view_usr2(void_handler) );
  502.  
  503.     debug( fprintf(stderr, "dispatcher: started\n"); );
  504.  
  505.     while (1) {
  506.         curr_session = NULL;
  507.         ec_meno1( client_recv = acceptConnection(server_sock) );
  508.  
  509.         /* Attivo un nuovo worker, che ascolterà su client_recv (il socket
  510.          * creato dalla accept(), e invierà sul socket inviato con un messaggio
  511.          * di connessione da parte del client */
  512.  
  513.         ec_null( curr_session = session_new() );
  514.         curr_session->client_recv = client_recv;
  515.  
  516.         ec_nzero( pthread_create(&(curr_session->thread), NULL, worker, (void*)curr_session) );
  517.  
  518.         sessionlist_add(sessions, curr_session);
  519.  
  520.         debug( fprintf(stderr, "dispatcher: new worker created\n"); );
  521.     }
  522.  
  523. EC_CLEANUP_BGN
  524.     session_free(&curr_session);
  525.     pthread_mutex_unlock(&(sessions->mutex));
  526.     return NULL;
  527. EC_CLEANUP_END
  528. }
  529.  
  530. int main (int argc, char* argv[]) {
  531.     FILE* cityfile;
  532.     FILE* edgefile;
  533.  
  534.     int ret = EXIT_FAILURE;
  535.  
  536.     if (argc != 3) return ret;
  537.  
  538.     ec_meno1( signal_block_all() );
  539.  
  540.     ec_null( cityfile = fopen(argv[1], "r") );
  541.     ec_null( edgefile = fopen(argv[2], "r") );
  542.     ec_null( map = load_graph(cityfile, edgefile) );
  543.     ec_null( logfile = fopen(LOGFILE, "w") );
  544.  
  545.     ec_meno1( server_sock = createServerChannel(SOCKNAME) );
  546.     /*ec_null( authlist = authlist_new() );*/
  547.     ec_null( offers = offerlist_new() );
  548.     ec_null( requests = requestlist_new() );
  549.     ec_null( sessions = sessionlist_new() );
  550.  
  551.     login_init();
  552.  
  553.     ec_nzero( pthread_create(&request_dispatcher, NULL, dispatcher, NULL) );
  554.     ec_nzero( pthread_create(&request_match, NULL, match, logfile) );
  555.     ec_nzero( pthread_join(request_match, NULL) );
  556.  
  557.     debug( fprintf(stderr, "Match terminated...\n"); );
  558.  
  559.     ec_nzero( pthread_join(request_dispatcher, NULL) );
  560.  
  561.     debug( fprintf(stderr, "Dispatcher terminated...\n"); );
  562.  
  563.     ret = EXIT_SUCCESS;
  564.  
  565.     sessionlist_print(sessions);
  566.     requestlist_print(requests);
  567.     offerlist_print(offers);
  568.  
  569.     /* Apertura del socket, inizio server */
  570. /*    return 0;*/
  571.  
  572. EC_CLEANUP_BGN
  573.         sessionlist_free(&sessions);
  574.         requestlist_free(&requests);
  575.         offerlist_free(&offers);
  576. /*        authlist_free(&authlist);*/
  577.  
  578.         closeSocket(server_sock);
  579.  
  580.         debug( fprintf(stderr, "Unlinking\n"); );
  581.         unlink(SOCKNAME);
  582.  
  583.         free_graph(&map);
  584.  
  585.         if (logfile != NULL) fclose(logfile);
  586.         if (cityfile != NULL) fclose(cityfile);
  587.         if (edgefile != NULL) fclose(edgefile);
  588.  
  589.         return ret;
  590. EC_CLEANUP_END
  591. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement