/*
 * rwproblem - readers/writers demonstration synchronised purely through
 * System V message queues.
 *
 * Protocol as implemented below:
 *   - tokenq holds at most one "writing token" message. A writer takes the
 *     token out (msgrcv) while it writes and puts it back afterwards.
 *   - rrq ("running readers queue") holds one message per reader that is
 *     currently reading; the message type is the reader's PID.
 *   - A reader waits until the token is present in tokenq, registers itself
 *     in rrq, re-checks that the token is still present, reads, and finally
 *     removes its own message from rrq.
 *   - A writer that holds the token waits until rrq is empty before writing.
 *
 * NOTE(review): the header names on the #include lines were missing from the
 * reviewed copy of this file; the list below is reconstructed from the APIs
 * the code uses - confirm against the original.
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/wait.h>

#define WCOUNT 10            /* count of writers */
#define RCOUNT 20            /* count of readers */
#define WMSG_COUNT 30        /* token-acquisition attempts per writer */
#define RMSG_COUNT 50        /* successful reads per reader */
#define WTOKENQ 2342         /* writing token queue key */
#define RREADERSQ 2343       /* reading readers queue key */
#define MAX_WSLEEP_TIME 30   /* writers waiting in 50ms interval */
#define MAX_RSLEEP_TIME 20   /* readers waiting in 50ms interval */
#define TOKEN_MSG 1          /* message type of the writing token */
#define MSG_SIZE sizeof(char)
#define PID_STR_SIZE 16      /* fits any int in decimal + '\n' + '\0' */

/* Token message exchanged through tokenq. */
typedef struct {
    long mtype;
    char flag;
} my_msg_type;

/* Reader registration exchanged through rrq; pid doubles as message type. */
typedef struct {
    long pid;
    char flag;
} rr_msg_type;

int wfile, rfile;
int a;
pid_t forkpid;               /* PID returned by fork() */
int tokenq;                  /* token queue; not empty - reading allowed */
int rrq;                     /* running readers queue contains PIDs of reading processes */
my_msg_type my_msg;
unsigned long sleep_time;
struct msqid_ds qparams;

/* Remove both message queues from the system. */
static void remove_queues(void)
{
    msgctl(tokenq, IPC_RMID, NULL);
    msgctl(rrq, IPC_RMID, NULL);
}

/* Report the failed call via perror() and terminate this process with status 1. */
static void die(const char *what)
{
    perror(what);
    exit(1);
}

/*
 * Fork `count` children.
 * Returns 0 in a freshly forked child, 1 in the calling process once all
 * forks succeeded, and -1 in the calling process if fork() failed (errno is
 * left as set by fork()).
 */
static int spawn_children(int count)
{
    for (a = 0; a < count; a++) {
        forkpid = fork();
        if (forkpid == -1)
            return -1;
        if (forkpid == 0)
            return 0;
    }
    return 1;
}

/*
 * Writer process body: WMSG_COUNT times, sleep a random interval, take the
 * writing token, wait until no reader is registered in rrq, append this
 * process' PID as a text line to wfile, and hand the token back.
 */
void writer(void)
{
    const pid_t pid = getpid();   /* current process PID */
    char pid_str[PID_STR_SIZE];   /* string for writing to wfile */
    int length;
    int msg_num;

    srand((unsigned) rand() + pid);
    printf("Writer %d started.\n", pid);

    for (msg_num = 1; msg_num <= WMSG_COUNT; msg_num++) {
        sleep_time = 50000UL * (unsigned long) (rand() % MAX_WSLEEP_TIME);
        if (sleep_time != 0) {
            printf("Writer %d is falling asleep for %lu ms.\n", pid, sleep_time / 1000);
            usleep(sleep_time);
            printf("Writer %d is wakeing up.\n", pid);
        }

        printf("Writer %d is waiting for writing token.\n", pid);
        if (msgrcv(tokenq, &my_msg, MSG_SIZE, TOKEN_MSG, 0) != (ssize_t) MSG_SIZE) {
            printf("Writer %d is not listening.\n", pid);
            continue;
        }
        printf("Writer %d - token has been received.\n", pid);

        /*
         * Busy-wait until every registered reader has left rrq. A failing
         * msgctl() would leave qparams stale and spin forever, so treat it
         * as fatal.
         */
        do {
            if (msgctl(rrq, IPC_STAT, &qparams) == -1)
                die("Error: Querying readers queue");
        } while (qparams.msg_qnum != 0);

        length = snprintf(pid_str, sizeof pid_str, "%d\n", pid);
        if (length < 0 || length >= (int) sizeof pid_str
            || write(wfile, pid_str, (size_t) length) < length)
            printf("Error during writing (%d).\n", pid);
        else
            printf("Writer %d - data %d has been written successfully.\n", pid, msg_num);

        if (msgsnd(tokenq, &my_msg, MSG_SIZE, IPC_NOWAIT) == -1)
            die("Error: Sending message");
        printf("Writer %d - token has been sent.\n", pid);
    }
}

/*
 * Reader process body: until RMSG_COUNT records have been read, sleep a
 * random interval, wait for the writing token to be present, register in
 * rrq, read one PID line from rfile if the token is still present, and
 * unregister from rrq.
 */
void reader(void)
{
    const pid_t pid = getpid();   /* current process PID */
    char pid_str[PID_STR_SIZE];   /* string for reading from rfile */
    struct msqid_ds token_stat;
    rr_msg_type rr_msg = {0};     /* zeroed so no indeterminate byte is sent */
    ssize_t bytesread;
    char *newline;
    int msg_num = 0;

    srand((unsigned) rand() + pid);
    printf("Reader %d started.\n", pid);

    while (msg_num < RMSG_COUNT) {
        sleep_time = 50000UL * (unsigned long) (rand() % MAX_RSLEEP_TIME);
        if (sleep_time != 0) {
            printf("Reader %d is falling asleep for %lu ms.\n", pid, sleep_time / 1000);
            usleep(sleep_time);
            printf("Reader %d is wakeing up.\n", pid);
        }

        rr_msg.pid = pid;
        printf("Reader %d is waiting for writing token.\n", pid);

        /* Busy-wait until the token is in tokenq, i.e. no writer holds it. */
        do {
            if (msgctl(tokenq, IPC_STAT, &token_stat) == -1)
                die("Error: Querying token queue");
        } while (token_stat.msg_qnum == 0);

        if (msgsnd(rrq, &rr_msg, MSG_SIZE, IPC_NOWAIT) == -1)
            die("Error: Sending message");
        printf("Reader %d - PID has been sent.\n", pid);

        /*
         * Re-check after registering: a writer may have taken the token
         * between the wait above and the msgsnd(). If the query itself
         * fails, behave as if the token were gone so that this reader still
         * unregisters from rrq below instead of exiting while registered.
         */
        if (msgctl(tokenq, IPC_STAT, &token_stat) == -1)
            token_stat.msg_qnum = 0;

        if (token_stat.msg_qnum != 0) {   /* writing token is still in queue */
            /* Leave one byte spare so the buffer can always be terminated. */
            bytesread = read(rfile, pid_str, sizeof pid_str - 1);
            if (bytesread == -1) {
                printf("Error during reading (%d).\n", pid);
            } else if (bytesread == 0) {   /* Are we at the end of the file */
                lseek(rfile, 0, SEEK_SET); /* seek to the beginning of the file */
                printf("Reader %d - lseek to the begining of the file.\n", pid);
            } else {
                pid_str[bytesread] = '\0';
                /* Only the bytes actually read are searched for the record end. */
                newline = memchr(pid_str, '\n', (size_t) bytesread);
                if (newline != NULL) {
                    *newline = '\0';
                    /* set file position to next PID */
                    lseek(rfile, (off_t) (newline - pid_str + 1 - bytesread), SEEK_CUR);
                }
                printf("Reader %d - data %d `%s' has been read successfully.\n",
                       pid, ++msg_num, pid_str);
            }
        } else {
            printf("Reader is not reading, writing token has disappeared.\a\n");
        }

        /* Unregister: take back the rrq message whose type is our own PID. */
        if (msgrcv(rrq, &rr_msg, MSG_SIZE, pid, 0) == (ssize_t) MSG_SIZE)
            printf("Reader %d - PID has been received.\n", pid);
    }
}

/*
 * Report a fatal parent-side failure, remove the queues so that children
 * blocked on them can notice and wind down, and exit with status 1.
 */
static void parent_fatal(const char *what)
{
    perror(what);
    remove_queues();
    exit(1);
}

/*
 * Parent process body: put the initial writing token into tokenq, reap all
 * WCOUNT + RCOUNT children while reporting how each ended, then remove the
 * queues and close the files.
 */
void parent(void)
{
    int pid_stat;
    int rwcount;   /* running children still to be reaped */
    pid_t child;

    printf("Parent process %d is sending writing token.\n", getpid());
    if (msgsnd(tokenq, &my_msg, MSG_SIZE, IPC_NOWAIT) == -1)
        parent_fatal("Error: Sending message");

    for (rwcount = WCOUNT + RCOUNT; rwcount > 0; rwcount--) {
        /* get defunct (zombie) process */
        child = wait(&pid_stat);
        if (child == -1) {
            /* pid_stat is not set in this case; stop reaping. */
            perror("Error: Waiting for child");
            break;
        }
        printf("Child %d ", child);
        if (WIFEXITED(pid_stat)) {
            printf("has finished with exit code %d.\n", WEXITSTATUS(pid_stat));
        } else {
            printf("has terminated abnormally.\n");
            /*
             * NOTE(review): a new token is sent for every abnormal
             * termination - presumably to replace a token the dead child
             * may have been holding; confirm that a second token cannot
             * end up in the queue when the child did not hold it.
             */
            if (msgsnd(tokenq, &my_msg, MSG_SIZE, IPC_NOWAIT) == -1)
                parent_fatal("Error: Sending message");
        }
    }

    remove_queues();
    close(wfile);
    close(rfile);
}

/*
 * Entry point. Without arguments: open the data file for appending and for
 * reading, create both queues, fork WCOUNT writers and RCOUNT readers, and
 * run parent() in the original process. With -h/--help: print usage.
 * Any other arguments: print a hint and return 1.
 */
int main(int argc, char *argv[])
{
    int role;

    if (argc != 1) {
        if (argc == 2
            && (strcmp(argv[1], "-h") == 0 || strcmp(argv[1], "--help") == 0)) {
            printf("rwproblem 1.0\n"
                   "rwproblem implements readers and writers problem with\n"
                   "message-passing synchronization.\n"
                   "This program comes with ABSOLUTELY NO WARRANTY.\n"
                   "This is free software, and you are welcome to redistribute it\n"
                   "under the GNU GENERAL PUBLIC LICENCE, version 2.\n"
                   "Look at http://www.gnu.org for the licence.\n"
                   "\nSyntax: rwproblem\n"
                   "\nAuthor: Jiri VERUNEK \n"
                   "project homepage: http://verunek.host.sk\n"
                   " http://verunek.oceany.cz\n"
                   "");
            return 0;
        }
        printf("This is rwproblem.\n"
               "Try --help for more information.\n");
        return 1;
    }

    my_msg.mtype = TOKEN_MSG;

    /* Both descriptors are opened before fork() and inherited by all children. */
    if ((wfile = open("soubor", O_CREAT | O_WRONLY | O_APPEND, 0644)) == -1)
        return (perror("Error: Opening file"), 1);
    if ((rfile = open("soubor", O_RDONLY)) == -1)
        return (perror("Error: Opening file"), 1);

    /*
     * NOTE(review): IPC_CREAT without IPC_EXCL attaches to a queue that
     * already exists under the same key, including any messages left in it
     * by an earlier run that did not clean up - confirm this is acceptable.
     */
    if ((tokenq = msgget((key_t) WTOKENQ, 0666 | IPC_CREAT)) == -1)
        return (perror("Error: Getting writers queue"), 1);
    if ((rrq = msgget((key_t) RREADERSQ, 0666 | IPC_CREAT)) == -1) {
        perror("Error: Getting readers queue");
        msgctl(tokenq, IPC_RMID, NULL);
        return 1;
    }

    role = spawn_children(WCOUNT);
    if (role == 0) {
        writer();
        return 0;
    }
    if (role == 1) {
        role = spawn_children(RCOUNT);
        if (role == 0) {
            reader();
            return 0;
        }
    }
    if (role == -1) {
        /* perror() first: msgctl() in remove_queues() may overwrite errno. */
        perror("Error: Forking processes");
        remove_queues();
        return 1;
    }

    parent();
    return 0;
}