|
- #include <stdio.h>
- #include <unistd.h>
- #include <stdlib.h>
- #include <string.h>
- #include <pthread.h>
- #include <semaphore.h>
- #include "queue.h"
- void *thread_function(void *arg);
- #define WORK_SIZE 1024
- #define THREAD_NUM 10
- //互斥量work_mutex 将time_to_exit和strQueue管理在其中
- pthread_mutex_t work_mutex;
- queue_t* strQueue;
- int time_to_exit = 0;
- //信号量的作用是不通过轮询让多线程阻塞或者取消阻塞
- sem_t bin_sem;
- int main()
- {
- char *local_work_area;
- int i, res;
- int thread_id[THREAD_NUM];
- pthread_t thread[THREAD_NUM];
- void *thread_result;
- strQueue = init_queue();
- res = pthread_mutex_init(&work_mutex, NULL);
- if (res != 0)
- {
- perror("Mutex initialization failed");
- exit(EXIT_FAILURE);
- }
- res = sem_init(&bin_sem, 0, 0);
- if (res != 0)
- {
- perror("Semaphore initialization failed");
- exit(EXIT_FAILURE);
- }
- for (i = 0; i < THREAD_NUM; i++)
- {
- thread_id[i] = i;
- res = pthread_create(&thread[i], NULL, thread_function, &thread_id[i]);
- if (res != 0)
- {
- perror("Thread creation failed");
- exit(EXIT_FAILURE);
- }
- }
- printf("Input some text. Enter 'end' to finish\n");
- while (1)
- {
- local_work_area = (char*) malloc(WORK_SIZE);
- printf(">");
- fgets(local_work_area, WORK_SIZE, stdin); //gets(local_work_area);
- if (strcmp(local_work_area, "end\n") == 0)
- {
- printf("program ready to exit.\n");
- pthread_mutex_lock(&work_mutex);
- time_to_exit = 1;
- pthread_mutex_unlock(&work_mutex);
- break;
- }
- else if (strcmp(local_work_area, "\n") == 0)
- free(local_work_area);
- else
- {
- //begin input queue
- pthread_mutex_lock(&work_mutex);
- //将字符串的地址 加入到队列中
- enqueue(strQueue, local_work_area);
- pthread_mutex_unlock(&work_mutex);
- //通知其他线程,有字符串加入到队列中了
- sem_post(&bin_sem);
- }
- }
- printf("\nWaiting for thread to finish...\n");
- for (i = 0; i < THREAD_NUM; i++)
- {
- sem_post(&bin_sem); //激活正在阻塞的线程
- }
- //上,下两段for语句,不能组合在一个for中。
- //因为thread[i]完全有可能不把收到sem_post()的信号,就会一无限等待
- for (i = 0; i < THREAD_NUM; i++)
- {
- res = pthread_join(thread[i], &thread_result);
- if (res != 0)
- {
- perror("Thread join failed");
- exit(EXIT_FAILURE);
- }
- }
- pthread_mutex_destroy(&work_mutex);
- sem_destroy(&bin_sem);
- exit(EXIT_SUCCESS);
- }
- /**
- * src转换为大写到dst
- */
- void CopyAndUpper(char *dst, char *src)
- {
- while (((*src) != '\n') && ((*src) != '\0'))
- {
- *dst = toupper(*src);
- src++;
- dst++;
- }
- *dst = '\0';
- }
- void *thread_function(void *arg)
- {
- char strlocal[WORK_SIZE], strtemp[WORK_SIZE];
- int thread_id = *(int*) (arg);
- char *p;
- while (1)
- {
- //排队
- sem_wait(&bin_sem);
- pthread_mutex_lock(&work_mutex);
- if (time_to_exit == 1)
- {
- pthread_mutex_unlock(&work_mutex);
- break;
- }
- //出队,获取到地址后,strcpy拷贝到strtemp
- dequeue(strQueue, &p);
- strcpy(strtemp, p);
- pthread_mutex_unlock(&work_mutex); //互斥量时间越少越好
- CopyAndUpper(strlocal, strtemp);
- if (strlen(strlocal) > 3)
- sleep(strlen(strlocal)); //模拟长时间运算,每字符一秒
- printf("thread-%d: %s len=%d\n", thread_id, strlocal, strlen(strlocal));
- free(p);
- }
- pthread_exit(0);
- }
复制代码 |
|