Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include "dgraph.h"
- #include "shortestpath.h"
- #include "comsock.h"
- #include "messages.h"
- #include "sigutils.h"
- #include "mgutils.h"
- #include "macro.h"
- #include <pthread.h>
- #include <stdlib.h>
- #include <signal.h>
- #include <string.h>
- #include <errno.h>
- #include <unistd.h>
- #define LOGFILE "./mgcars.log"
- #define SOCKNAME "./tmp/cars.sck"
- graph_t* map;
- sessionlist_t* sessions;
- offerlist_t* offers;
- requestlist_t* requests;
- /* Socket creato all'inizio da mgcars e attivo
- * fino alla chiusura del server */
- int server_sock;
- int thread_terminate = 0;
- pthread_t request_dispatcher;
- pthread_t request_match;
- FILE* logfile;
- int find_covering(unsigned int from, unsigned int to, unsigned int* pprec, matchlist_t* covering) {
- debug( fprintf(stderr, "find_covering: from %d to %d\n", from, to); );
- /* Passo base */
- if (from == to) return 1;
- else {
- offer_t* offer = offers->first;
- int found = 0;
- /* Scorro tutte le offerte, la prima che mi fa fare almeno
- * uno step del mio percorso da request->from a request->to
- * valutato al contrario per questione di comodità
- * lo inserisco in coda alla lista di match e ricorro sul
- * pezzo di percorso non ancora valutato, se arrivo ad una
- * richiesta con partenza e arrivo uguali vuol dire che ho
- * trovato il match e sono a posto, altrimenti ritorno 0
- * e la matchlist non è significativa (e va liberata) */
- while (offer != NULL && !found) {
- int a, b, tmpfrom;
- int count;
- a = to;
- b = offer->to;
- count = 0;
- /* Mettere offer->num qua è un barbatrucco, ma sennò il codice
- * diventava ancora più illeggibile */
- while (b != offer->from && b != a) b = offer->pprec[b];
- while (a == b && a >= 0 && b >= 0) {
- count++;
- tmpfrom = a;
- a = pprec[a];
- b = offer->pprec[b];
- }
- if (count > 1) {
- /* Calcolo la rotta rimanente in termini di from e to */
- match_t* match_add;
- ec_null( match_add = match_new() );
- match_add->from = tmpfrom;
- match_add->to = to;
- match_add->offer_ref = offer;
- matchlist_add(covering, match_add);
- if (!find_covering(from, tmpfrom, pprec, covering)) {
- matchlist_remove(covering, match_add);
- }
- else return 1;
- }
- offer = offer->next;
- }
- return 0;
- }
- EC_CLEANUP_BGN
- return 0;
- EC_CLEANUP_END
- }
- void perform_match() {
- request_t* request = NULL;
- message_t* answer = NULL;
- request_t* remove_request;
- offer_t* remove_offer;
- matchlist_t* covering;
- char* logwrite;
- char* path;
- request = requests->first;
- while (request != NULL) {
- int found;
- debug( fprintf(stderr, "perform_match(): evaluating\n"); );
- request_print(request);
- remove_request = NULL;
- covering = matchlist_new();
- ec_nzero( pthread_mutex_lock(&(offers->mutex)) );
- found = find_covering(request->from, request->to, request->pprec, covering);
- ec_nzero( pthread_mutex_unlock(&(offers->mutex)) );
- if (found) {
- match_t* ptr;
- remove_request = request;
- ptr = covering->first;
- while (ptr != NULL) {
- int size;
- debug( fprintf(stderr, "perform_match: ptr->from: %d, ptr->to: %d\n", ptr->from, ptr->to); );
- ec_null( path = shpath_to_string(map, ptr->from, ptr->to, request->pprec) );
- size = strlen(path)+strlen(session_user(request->owner))+strlen(session_user(ptr->offer_ref->owner))+3;
- ec_null( logwrite = malloc(size*sizeof(char)) );
- sprintf(logwrite, "%s$%s$%s", session_user(ptr->offer_ref->owner), session_user(request->owner), path);
- fwrite(logwrite, strlen(logwrite), 1, logfile);
- fwrite("\n", 1, 1, logfile);
- ec_null( answer = create_message(MSG_SHARE, logwrite) );
- ec_meno1( sendMessage(request->owner->client_send, answer) );
- if (ptr->to != ptr->offer_ref->to) {
- offer_t* tmp;
- double* dist;
- ec_null( tmp = offer_new() );
- tmp->from = ptr->to;
- tmp->to = ptr->offer_ref->to;
- tmp->num = 1;
- tmp->pprec = NULL;
- /* cleanup interno */
- if ( (dist = dijkstra(map, tmp->from, &(tmp->pprec))) == NULL ) {
- offer_free(&tmp);
- /* cleanup della funzione */
- goto ec_cleanup_bgn;
- }
- free(dist);
- tmp->owner = ptr->offer_ref->owner;
- offerlist_add(offers, tmp);
- }
- if (ptr->from != ptr->offer_ref->from) {
- offer_t* tmp;
- double* dist;
- ec_null( tmp = offer_new() );
- tmp->from = ptr->offer_ref->from;
- tmp->to = ptr->from;
- tmp->num = 1;
- tmp->pprec = NULL;
- /* cleanup interno */
- if ( (dist = dijkstra(map, tmp->from, &(tmp->pprec))) == NULL ) {
- offer_free(&tmp);
- /* cleanup della funzione */
- goto ec_cleanup_bgn;
- }
- free(dist);
- tmp->owner = ptr->offer_ref->owner;
- offerlist_add(offers, tmp);
- }
- fflush(logfile);
- remove_offer = ptr->offer_ref;
- ptr->offer_ref->num--;
- if (remove_offer->num == 0) offerlist_remove(offers, remove_offer);
- ptr = ptr->next;
- free(path);
- free(logwrite);
- free_message(&answer);
- }
- matchlist_free(&covering);
- }
- remove_request = request;
- request = request->next;
- /* if (remove_request != NULL) {
- remove_request->owner->requests--;
- requestlist_remove(requests, remove_request);
- }*/
- remove_request->owner->requests--;
- requestlist_remove(requests, remove_request);
- }
- /* Per ogni sessione, scorro le richieste, e cerco
- * se possono essere soddisfatte totalmente o parzialmente
- * dalle offerte di tutte le altre sessioni.
- * Se una sessione è terminata lato client e non ha più
- * richieste non la cancello finchè non ho consumato tutte
- * le sue offerte */
- /* Ora scorro tutte le sessioni a vedere quali non hanno più richieste
- * pendenti e mando MSG_SHAREND e se il client è terminato elimino la
- * sessione */
- debug( fprintf(stderr, "perform_match: Match performed.\n"); );
- EC_CLEANUP_BGN
- /* TODO: implementare cleanup */
- debug( fprintf(stderr, "perform_match(): Oooops\n"); );
- pthread_mutex_unlock(&(offers->mutex));
- /*pthread_mutex_unlock(&(sessions->mutex));*/
- EC_CLEANUP_END
- }
/*
 * Signal handler that deliberately does nothing. The threads in this file
 * pass it to the signal_view_*() helpers.
 */
static void void_handler(int signum) {
    (void)signum;
}
- void* match (void* arg) {
- siginfo_t signum;
- struct timespec ts;
- sigset_t set;
- ec_meno1( sigfillset(&set) );
- ec_meno1( signal_block_all() );
- ec_meno1( signal_view_usr1(void_handler) );
- ec_meno1( signal_view_int(void_handler) );
- ec_meno1( signal_view_term(void_handler) );
- ec_meno1( signal_view_pipe(void_handler) );
- ts.tv_sec = (time_t)30;
- ts.tv_nsec = 0;
- memset(&signum, 0, sizeof(signum));
- while (1) {
- if (sigtimedwait(&set, &signum, &ts) < 0) {
- perform_match();
- ec_nzero( sessionlist_cleanup(sessions) );
- }
- else {
- debug( fprintf(stderr, "Got signal %d\n", signum.si_signo); );
- if (signum.si_signo == SIGUSR1) {
- perform_match();
- ec_nzero( sessionlist_cleanup(sessions) );
- }
- if (signum.si_signo == SIGTERM || signum.si_signo == SIGINT) {
- debug( fprintf(stderr, "Got SIGINT or SIGTERM, dumping status\n"); );
- requestlist_print(requests);
- offerlist_print(offers);
- perform_match();
- /* sessionlist_cleanup(sessions);*/
- ec_nzero( pthread_kill(request_dispatcher, SIGUSR2) );
- return NULL;
- }
- }
- }
- EC_CLEANUP_BGN
- return NULL;
- EC_CLEANUP_END
- }
- void* worker (void* arg) {
- session_t* me = (session_t*)arg;
- /* Messaggi di errore usati per risposte negative */
- const char* err_nocity = "Unknown city %s";
- const char* err_wrongpass = "Wrong password";
- const char* err_userlogged = "User %s logged";
- const char* err_invalid = "Invalid request";
- int close_send = 0;
- message_t* message = NULL;
- message_t* answer = NULL;
- double* distances = NULL;
- char* errmsg = NULL;
- request_t* request = NULL;
- offer_t* offer = NULL;
- unsigned int* pprec = NULL;
- ec_meno1( signal_block_all() );
- ec_meno1( signal_view_usr2(void_handler) );
- while (!me->exit) {
- errmsg = NULL;
- distances = NULL;
- request = NULL;
- offer = NULL;
- answer = NULL;
- message = NULL;
- ec_null( message = (message_t*)malloc(sizeof(message_t)) );
- memset(message, 0, sizeof(message_t));
- ec_meno1( receiveMessage(me->client_recv, message) );
- print_message(message);
- /* Gestisco i vari tipi di messaggio */
- switch (message->type) {
- char* user = NULL;
- char* pass = NULL;
- char* sock = NULL;
- char* offer_from = NULL;
- char* offer_to = NULL;
- char* offer_num = NULL;
- char* request_from = NULL;
- char* request_to = NULL;
- char* strtok_status = NULL;
- case MSG_CONNECT:
- user = message->buffer;
- pass = user+strlen(user)+1;
- sock = pass+strlen(pass)+1;
- me->user = strdup(user);
- /* TODO: macro */
- /* TODO: VERIFICARE PARAMETRI INVALIDI LATO CLIENT */
- debug( fprintf(stderr, "worker: opening connection for %s...\n", user); )
- ec_meno1( me->client_send = openConnection(sock) );
- switch (login(user, pass)) {
- case LOGINOK:
- ec_null( answer = create_message(MSG_OK, NULL) );
- break;
- case ERR_WRONGPASS:
- ec_null( errmsg = (char*)malloc(1024*sizeof(char)) );
- sprintf(errmsg, err_wrongpass);
- ec_null( answer = create_message(MSG_NO, errmsg) );
- me->exit = 1;
- close_send = 1;
- break;
- case ERR_LOGGED:
- ec_null( errmsg = (char*)malloc(1024*sizeof(char)) );
- sprintf(errmsg, err_userlogged, user);
- ec_null( answer = create_message(MSG_NO, errmsg) );
- me->exit = 1;
- close_send = 1;
- break;
- }
- break;
- case MSG_EXIT:
- logout(me->user);
- me->exit = 1;
- break;
- case MSG_OFFER:
- /* TODO: sistemare */
- strtok_status = NULL;
- offer_from = strtok_r(message->buffer, ":", &strtok_status);
- offer_to = strtok_r(NULL, ":", &strtok_status);
- offer_num = strtok_r(NULL, "\0", &strtok_status);
- if (is_node(map, offer_from) < 0) {
- ec_null( errmsg = (char*)malloc(1024*sizeof(char)) );
- sprintf(errmsg, err_nocity, offer_from);
- ec_null( answer = create_message(MSG_NO, errmsg) );
- }
- else if (is_node(map, offer_to) < 0) {
- ec_null( errmsg = (char*)malloc(1024*sizeof(char)) );
- sprintf(errmsg, err_nocity, offer_to);
- ec_null( answer = create_message(MSG_NO, errmsg) );
- }
- else {
- pprec = NULL;
- ec_null( offer = offer_new() );
- offer->from = is_node(map, offer_from);
- offer->to = is_node(map, offer_to);
- offer->num = strtod(offer_num, NULL);
- offer->owner = me;
- ec_null( distances = dijkstra(map, offer->from, &pprec) );
- free(distances);
- offer->pprec = pprec;
- offerlist_add(offers, offer);
- ec_null( answer = create_message(MSG_OK, NULL) );
- }
- break;
- case MSG_REQUEST:
- /* TODO: sistemare */
- strtok_status = NULL;
- request_from = strtok_r(message->buffer, ":", &strtok_status);
- request_to = strtok_r(NULL, ":", &strtok_status);
- if (is_node(map, request_from) < 0) {
- ec_null( errmsg = (char*)malloc(1024*sizeof(char)) );
- sprintf(errmsg, err_nocity, request_from);
- ec_null( answer = create_message(MSG_NO, errmsg) );
- }
- else if (is_node(map, request_to) < 0) {
- ec_null( errmsg = (char*)malloc(1024*sizeof(char)) );
- sprintf(errmsg, err_nocity, request_to);
- ec_null( answer = create_message(MSG_NO, errmsg) );
- }
- else {
- me->requests++;
- pprec = NULL;
- ec_null( request = request_new() );
- request->from = is_node(map, request_from);
- request->to = is_node(map, request_to);
- request->owner = me;
- ec_null( distances = dijkstra(map, request->from, &pprec) );
- free(distances);
- request->pprec = pprec;
- request_print(request);
- requestlist_add(requests, request);
- ec_null( answer = create_message(MSG_OK, NULL) );
- }
- break;
- default:
- ec_null( errmsg = (char*)malloc(1024*sizeof(char)) );
- sprintf(errmsg, err_invalid);
- ec_null( answer = create_message(MSG_NO, errmsg) );
- break;
- }
- print_message(answer);
- ec_meno1( sendMessage(me->client_send, answer) );
- free(errmsg);
- free_message(&answer);
- free_message(&message);
- }
- EC_CLEANUP_BGN
- free(distances);
- free_message(&answer);
- free_message(&message);
- offer_free(&offer);
- request_free(&request);
- closeSocket(me->client_recv);
- if (close_send) closeSocket(me->client_send);
- me->exit = 1;
- return NULL;
- EC_CLEANUP_END
- }
- void* dispatcher(void* param) {
- int client_recv;
- session_t* curr_session;
- ec_meno1( signal_block_all() );
- ec_meno1( signal_view_usr2(void_handler) );
- debug( fprintf(stderr, "dispatcher: started\n"); );
- while (1) {
- curr_session = NULL;
- ec_meno1( client_recv = acceptConnection(server_sock) );
- /* Attivo un nuovo worker, che ascolterà su client_recv (il socket
- * creato dalla accept(), e invierà sul socket inviato con un messaggio
- * di connessione da parte del client */
- ec_null( curr_session = session_new() );
- curr_session->client_recv = client_recv;
- ec_nzero( pthread_create(&(curr_session->thread), NULL, worker, (void*)curr_session) );
- sessionlist_add(sessions, curr_session);
- debug( fprintf(stderr, "dispatcher: new worker created\n"); );
- }
- EC_CLEANUP_BGN
- session_free(&curr_session);
- pthread_mutex_unlock(&(sessions->mutex));
- return NULL;
- EC_CLEANUP_END
- }
- int main (int argc, char* argv[]) {
- FILE* cityfile;
- FILE* edgefile;
- int ret = EXIT_FAILURE;
- if (argc != 3) return ret;
- ec_meno1( signal_block_all() );
- ec_null( cityfile = fopen(argv[1], "r") );
- ec_null( edgefile = fopen(argv[2], "r") );
- ec_null( map = load_graph(cityfile, edgefile) );
- ec_null( logfile = fopen(LOGFILE, "w") );
- ec_meno1( server_sock = createServerChannel(SOCKNAME) );
- /*ec_null( authlist = authlist_new() );*/
- ec_null( offers = offerlist_new() );
- ec_null( requests = requestlist_new() );
- ec_null( sessions = sessionlist_new() );
- login_init();
- ec_nzero( pthread_create(&request_dispatcher, NULL, dispatcher, NULL) );
- ec_nzero( pthread_create(&request_match, NULL, match, logfile) );
- ec_nzero( pthread_join(request_match, NULL) );
- debug( fprintf(stderr, "Match terminated...\n"); );
- ec_nzero( pthread_join(request_dispatcher, NULL) );
- debug( fprintf(stderr, "Dispatcher terminated...\n"); );
- ret = EXIT_SUCCESS;
- sessionlist_print(sessions);
- requestlist_print(requests);
- offerlist_print(offers);
- /* Apertura del socket, inizio server */
- /* return 0;*/
- EC_CLEANUP_BGN
- sessionlist_free(&sessions);
- requestlist_free(&requests);
- offerlist_free(&offers);
- /* authlist_free(&authlist);*/
- closeSocket(server_sock);
- debug( fprintf(stderr, "Unlinking\n"); );
- unlink(SOCKNAME);
- free_graph(&map);
- if (logfile != NULL) fclose(logfile);
- if (cityfile != NULL) fclose(cityfile);
- if (edgefile != NULL) fclose(edgefile);
- return ret;
- EC_CLEANUP_END
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement