一、问题描述

现在以C/S架构为例:client向server端发送要查找的数字,server端启动线程池中的线程进行相应的查询,并将查询结果显示出来。

二、实现方案

1. 整个project按client、server、lib三个目录组织,如下图所示:

watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvd2FuZ3poaWNoZW5nMTk4Mw==/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center" alt="">

2. 进入lib。

socket.h、socket.c

/**
 * @file socket.h
 * @brief Socket API header file.
 *
 * TCP socket utility functions: simple helpers for building a TCP
 * client/server.
 *
 * @author wangzhicheng
 */
#ifndef SOCKET_H
#define SOCKET_H

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>      /* close() */
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>  /* struct sockaddr_in, htons() */
#include <arpa/inet.h>   /* inet_ntoa(), inet_aton() */
#include <resolv.h>
#include <fcntl.h>

#define MAX_CONNECTION 20

/* Create a TCP socket and bind it to `port`; the fd is stored in *serverfd. */
int TCPServerInit(int port, int *serverfd);
/* Listen and accept one client; fd goes to *clientfd, dotted address to clientaddr. */
int TCPServerWaitConnection(int serverfd, int *clientfd, char *clientaddr);
/* NOTE(review): no definition of this function is visible next to socket.c -- confirm before calling. */
int TCPServerSelect(int* serverfdlist, int num, int *clientfd, char *clientaddr);
/* Create a TCP client socket; the fd is stored in *clientfd. */
int TCPClientInit(int *clientfd);
/* Connect clientfd to addr:port; returns 0 on success, -1 on error. */
int TCPClientConnect(const int clientfd, const char *addr, int port);
/* Switch the socket to non-blocking mode and recv() up to `size` bytes. */
int TCPNonBlockRead(int clientfd, char* buf, int size);
/* Switch the socket to blocking mode and recv() up to `size` bytes. */
int TCPBlockRead(int clientfd, char* buf, int size);
/* Send up to `size` bytes; returns the number of bytes written or -1. */
int TCPWrite(int clientfd, char* buf, int size);
/* Close a socket descriptor. */
void TCPClientClose(int sockfd);
void TCPServerClose(int sockfd);
void TCPConnectionClose(int sockfd);

#endif

socket.c

#include "socket.h"

#include <arpa/inet.h>
#include <netinet/in.h>
#include <unistd.h>
/*
* @brief initialize TCP server: create a TCP socket and bind it to a port
* @port port number for socket (host byte order)
* @serverfd [out] receives the server socket fd, or -1 on failure
* return server socket fd for success, -1 on error (errno describes the failure)
* */
int TCPServerInit(int port, int *serverfd) {
    struct sockaddr_in dest;
    int fd;

    if (serverfd == NULL) {
        return -1;
    }
    *serverfd = -1;

    /* create socket, same as client */
    fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (fd < 0) {
        return -1;
    }

    /* listen on every local interface at the requested port */
    memset(&dest, 0, sizeof(dest));
    dest.sin_family = AF_INET;
    dest.sin_port = htons((unsigned short)port);
    dest.sin_addr.s_addr = htonl(INADDR_ANY);

    /* A failed bind (e.g. port already in use) must be reported, and the
     * half-initialised socket released, instead of being returned as valid. */
    if (bind(fd, (struct sockaddr *)&dest, sizeof(dest)) < 0) {
        int saved_errno = errno;   /* close() may overwrite errno */
        close(fd);
        errno = saved_errno;
        return -1;
    }

    *serverfd = fd;
    return fd;
}
/*
* @brief wait for a client to connect
* @serverfd server socket fd (already bound)
* @clientfd [out] receives the accepted client socket fd, or -1 on failure
* @clientaddr [out] receives the client's dotted-decimal IPv4 address as a
*             NUL-terminated string; the buffer must hold at least 16 bytes.
*             May be NULL if the address is not needed.
* return client fd, -1 on error (errno describes the failure)
* */
int TCPServerWaitConnection(int serverfd, int *clientfd, char *clientaddr) {
    struct sockaddr_in client_addr;
    socklen_t addrlen = sizeof(client_addr);
    int fd;

    if (clientfd == NULL) {
        return -1;
    }
    *clientfd = -1;

    /* put the socket in listening state (backlog of 20 pending connections) */
    if (listen(serverfd, 20) < 0) {
        return -1;
    }

    /* wait for and accept a connection */
    memset(&client_addr, 0, sizeof(client_addr));
    fd = accept(serverfd, (struct sockaddr *)&client_addr, &addrlen);
    if (fd < 0) {
        /* client_addr was not filled in, so there is no address to report */
        return -1;
    }

    if (clientaddr != NULL) {
        strcpy(clientaddr, inet_ntoa(client_addr.sin_addr));
    }
    *clientfd = fd;
    return fd;
}
/*
* @brief create the TCP socket used by a client
* @clientfd [out] receives the value returned by socket()
* return the new socket fd on success, the negative socket() result on error
*/
int TCPClientInit(int *clientfd) {
    int fd = socket(PF_INET, SOCK_STREAM, 0);

    *clientfd = fd;
    return fd;
}
/*
* @brief connect to TCP server
* @clientfd client socket fd
* @addr server IPv4 address in numbers-and-dots notation, e.g. "127.0.0.1"
* @port server port number (host byte order)
* return 0 for success, on error -1 is returned; errno is EINVAL when addr
*        is NULL or cannot be parsed, otherwise it is set by connect()
*/
int TCPClientConnect(const int clientfd, const char *addr, int port) {
    struct sockaddr_in dest;

    if (addr == NULL) {
        errno = EINVAL;
        return -1;
    }

    /* initialize value in dest */
    memset(&dest, 0, sizeof(dest));
    dest.sin_family = AF_INET;
    dest.sin_port = htons((unsigned short)port);

    /* inet_aton() returns 0 for an unparsable address; without this check
     * the zeroed address would be passed on to connect(). */
    if (inet_aton(addr, &dest.sin_addr) == 0) {
        errno = EINVAL;
        return -1;
    }

    /* Connecting to server */
    return connect(clientfd, (struct sockaddr *)&dest, sizeof(dest));
}
/*
* @brief read from a TCP socket without blocking
* @clientfd socket fd; O_NONBLOCK is added to its status flags and stays set
* @buf input buffer
* @size buffer size
* return the value of recv(): number of bytes read, or -1 (for example when
*        no data is available yet)
*/
int TCPNonBlockRead(int clientfd, char* buf, int size) {
    const int flags = fcntl(clientfd, F_GETFL) | O_NONBLOCK;

    fcntl(clientfd, F_SETFL, flags);
    return recv(clientfd, buf, size, 0);
}
/*
* @brief blocking read from a TCP socket
* @clientfd socket fd; O_NONBLOCK is cleared from its status flags
* @buf input buffer
* @size buf size
* return the value of recv(): number of bytes read, or -1 on error
*/
int TCPBlockRead(int clientfd, char* buf, int size) {
    const int flags = fcntl(clientfd, F_GETFL) & ~O_NONBLOCK;

    fcntl(clientfd, F_SETFL, flags);
    return recv(clientfd, buf, size, 0);
}
/*
* @brief write to a TCP socket
* @clientfd socket fd; it is switched to non-blocking mode before sending
* @buf output buf
* @size output buf length
* return the value of send(): number of bytes actually written, or -1 on
*        error. MSG_NOSIGNAL is passed to send().
*/
int TCPWrite(int clientfd, char* buf, int size) {
    /* set socket to nonblock; a failure is only reported, the send still runs */
    const int flags = fcntl(clientfd, F_GETFL) | O_NONBLOCK;

    if (fcntl(clientfd, F_SETFL, flags) < 0) {
        printf("set socket to nonblock fail [%d] !\n", errno);
    }

    return send(clientfd, buf, size, MSG_NOSIGNAL);
}
/*
* @brief close the tcp connection
* @sockfd socket fd
* return none
*/
void TCPConnectionClose(int sockfd) {
    close(sockfd);
}
/*
* @brief close a client socket
* socket.h declares this name, so a definition is supplied here; it simply
* forwards to TCPConnectionClose().
* @sockfd socket fd
* return none
*/
void TCPClientClose(int sockfd) {
    TCPConnectionClose(sockfd);
}
/*
* @brief close a server socket
* socket.h declares this name, so a definition is supplied here; it simply
* forwards to TCPConnectionClose().
* @sockfd socket fd
* return none
*/
void TCPServerClose(int sockfd) {
    TCPConnectionClose(sockfd);
}

threadpool.h

#ifndef THREADPOOL_H
#define THREADPOOL_H

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>

/* One queued unit of work: a callback plus its argument, linked into a list. */
struct job
{
    void* (*callback_function)(void *arg);  /* function run by a worker thread */
    void *arg;                              /* argument passed to the callback */
    struct job *next;                       /* next job in the queue */
};

/* Thread pool state; the fields are shared between producers and workers. */
struct threadpool
{
    int thread_num;                   /* number of worker threads started */
    int queue_max_num;                /* maximum number of queued jobs */
    struct job *head;                 /* first job in the queue */
    struct job *tail;                 /* last job in the queue */
    pthread_t *pthreads;              /* pthread_t of every worker thread */
    pthread_mutex_t mutex;            /* mutex for the pool */
    pthread_cond_t queue_empty;       /* condition: queue is empty */
    pthread_cond_t queue_not_empty;   /* condition: queue is not empty */
    pthread_cond_t queue_not_full;    /* condition: queue is not full */
    int queue_cur_num;                /* current number of queued jobs */
    int queue_close;                  /* non-zero once the queue is closed */
    int pool_close;                   /* non-zero once the pool is closed */
};

//================================================================================================
// Function:    threadpool_init
// Description: initialise the thread pool
// Input:       [in] thread_num     number of worker threads to start
//              [in] queue_max_num  maximum number of jobs in the queue
// Output:      none
// Return:      success: address of the pool   failure: NULL
//================================================================================================
struct threadpool* threadpool_init(int thread_num, int queue_max_num);

//================================================================================================
// Function:    threadpool_add_job
// Description: add a job to the thread pool
// Input:       [in] pool               address of the pool
//              [in] callback_function  callback to run
//              [in] arg                argument for the callback
// Output:      none
// Return:      success: 0   failure: -1
//================================================================================================
int threadpool_add_job(struct threadpool *pool, void* (*callback_function)(void *arg), void *arg);

//================================================================================================
// Function:    threadpool_destroy
// Description: destroy the thread pool
// Input:       [in] pool  address of the pool
// Output:      none
// Return:      success: 0   failure: -1
//================================================================================================
int threadpool_destroy(struct threadpool *pool);

//================================================================================================
// Function:    threadpool_function
// Description: thread function executed by every worker of the pool
// Input:       [in] arg  address of the pool
// Output:      none
// Return:      none
//================================================================================================
void* threadpool_function(void* arg);
#endif

threadpool.c

#include "threadpool.h"

/*
 * Create a thread pool and start its worker threads.
 *
 * thread_num     number of worker threads to start; must be > 0
 * queue_max_num  maximum number of queued jobs; must be > 0
 *
 * Returns the new pool, or NULL on any failure. On failure everything that
 * was already set up (mutex, condition variables, thread array, started
 * workers, the pool itself) is released again.
 */
struct threadpool* threadpool_init(int thread_num, int queue_max_num) {
    struct threadpool *pool = NULL;
    int created = 0;
    int i;

    if (thread_num <= 0 || queue_max_num <= 0)
    {
        printf("invalid threadpool size!\n");
        return NULL;
    }

    pool = malloc(sizeof *pool);
    if (NULL == pool)
    {
        printf("failed to malloc threadpool!\n");
        return NULL;
    }
    pool->thread_num = thread_num;
    pool->queue_max_num = queue_max_num;
    pool->queue_cur_num = 0;
    pool->head = NULL;
    pool->tail = NULL;
    pool->pthreads = NULL;
    pool->queue_close = 0;
    pool->pool_close = 0;

    if (pthread_mutex_init(&(pool->mutex), NULL))
    {
        printf("failed to init mutex!\n");
        goto err_pool;
    }
    if (pthread_cond_init(&(pool->queue_empty), NULL))
    {
        printf("failed to init queue_empty!\n");
        goto err_mutex;
    }
    if (pthread_cond_init(&(pool->queue_not_empty), NULL))
    {
        printf("failed to init queue_not_empty!\n");
        goto err_queue_empty;
    }
    if (pthread_cond_init(&(pool->queue_not_full), NULL))
    {
        printf("failed to init queue_not_full!\n");
        goto err_queue_not_empty;
    }
    pool->pthreads = malloc(sizeof *pool->pthreads * (size_t)thread_num);
    if (NULL == pool->pthreads)
    {
        printf("failed to malloc pthreads!\n");
        goto err_queue_not_full;
    }

    for (i = 0; i < thread_num; ++i)
    {
        if (pthread_create(&(pool->pthreads[i]), NULL, threadpool_function, (void *)pool))
        {
            printf("failed to create thread!\n");
            break;
        }
        ++created;
    }
    if (created == thread_num)
    {
        return pool;
    }

    /* Only some workers started: workers leave their loop once pool_close is
     * set, so flag it, wake them, and join them before tearing down. */
    pthread_mutex_lock(&(pool->mutex));
    pool->pool_close = 1;
    pthread_mutex_unlock(&(pool->mutex));
    pthread_cond_broadcast(&(pool->queue_not_empty));
    for (i = 0; i < created; ++i)
    {
        pthread_join(pool->pthreads[i], NULL);
    }
    free(pool->pthreads);

err_queue_not_full:
    pthread_cond_destroy(&(pool->queue_not_full));
err_queue_not_empty:
    pthread_cond_destroy(&(pool->queue_not_empty));
err_queue_empty:
    pthread_cond_destroy(&(pool->queue_empty));
err_mutex:
    pthread_mutex_destroy(&(pool->mutex));
err_pool:
    free(pool);
    return NULL;
}
/*
 * Append a job to the pool's queue.
 *
 * Returns -1 immediately when pool, callback_function or arg is NULL.
 * Otherwise waits while the queue holds queue_max_num jobs, then returns -1
 * if the queue or the pool has been closed or the job node cannot be
 * allocated, and 0 once the job has been queued.
 */
int threadpool_add_job(struct threadpool* pool, void* (*callback_function)(void *arg), void *arg) {
    struct job *node;

    if (NULL == pool || NULL == callback_function || NULL == arg) {
        return -1;
    }

    pthread_mutex_lock(&pool->mutex);

    /* wait for free space unless the queue/pool is shutting down */
    for (;;) {
        const int closing = pool->queue_close || pool->pool_close;

        if (closing || pool->queue_cur_num != pool->queue_max_num) {
            break;
        }
        pthread_cond_wait(&pool->queue_not_full, &pool->mutex);
    }

    /* queue closed or pool closed: refuse the job */
    if (pool->queue_close || pool->pool_close) {
        pthread_mutex_unlock(&pool->mutex);
        return -1;
    }

    node = malloc(sizeof *node);
    if (NULL == node) {
        pthread_mutex_unlock(&pool->mutex);
        return -1;
    }
    node->callback_function = callback_function;
    node->arg = arg;
    node->next = NULL;

    if (pool->head != NULL) {
        pool->tail->next = node;
        pool->tail = node;
    } else {
        pool->head = node;
        pool->tail = node;
        /* queue was empty: tell the workers there is something to run */
        pthread_cond_broadcast(&pool->queue_not_empty);
    }
    pool->queue_cur_num += 1;

    pthread_mutex_unlock(&pool->mutex);
    return 0;
}

/*
 * Worker thread body; arg is the struct threadpool the worker belongs to.
 *
 * Loops forever: waits for a job, removes it from the head of the queue,
 * runs its callback outside the mutex and frees the job node. The thread
 * leaves through pthread_exit(NULL) once pool_close is set.
 */
void* threadpool_function(void* arg) {
    struct threadpool *pool = arg;

    for (;;) {
        struct job *task;

        pthread_mutex_lock(&pool->mutex);

        /* sleep until there is a job or the pool is being closed */
        while (!pool->pool_close && 0 == pool->queue_cur_num) {
            pthread_cond_wait(&pool->queue_not_empty, &pool->mutex);
        }
        if (pool->pool_close) {
            pthread_mutex_unlock(&pool->mutex);
            pthread_exit(NULL);
        }

        /* detach the head job */
        task = pool->head;
        pool->queue_cur_num -= 1;
        if (pool->queue_cur_num != 0) {
            pool->head = task->next;
        } else {
            pool->head = NULL;
            pool->tail = NULL;
            /* queue drained: lets threadpool_destroy() proceed */
            pthread_cond_signal(&pool->queue_empty);
        }

        /* the queue just dropped below its limit: wake blocked producers */
        if (pool->queue_cur_num == pool->queue_max_num - 1) {
            pthread_cond_broadcast(&pool->queue_not_full);
        }

        pthread_mutex_unlock(&pool->mutex);

        /* the actual work: invoke the callback, then release the node */
        task->callback_function(task->arg);
        free(task);
    }
}
/*
 * Shut the pool down and release it.
 *
 * Closes the queue, waits until the queue is empty, marks the pool closed,
 * wakes all waiters, joins every worker thread and frees all resources.
 * Returns 0 on success, -1 when pool is NULL or the queue/pool is already
 * closed.
 */
int threadpool_destroy(struct threadpool *pool) {
    struct job *pending;
    int idx;

    if (NULL == pool) {
        return -1;
    }

    pthread_mutex_lock(&pool->mutex);
    if (pool->queue_close || pool->pool_close) {
        /* already shut down (or shutting down): nothing to do */
        pthread_mutex_unlock(&pool->mutex);
        return -1;
    }

    pool->queue_close = 1;                 /* no new jobs from now on */
    while (0 != pool->queue_cur_num) {
        /* wait for the queue to become empty */
        pthread_cond_wait(&pool->queue_empty, &pool->mutex);
    }
    pool->pool_close = 1;                  /* workers exit when they see this */
    pthread_mutex_unlock(&pool->mutex);

    /* wake idle workers and producers blocked in threadpool_add_job() */
    pthread_cond_broadcast(&pool->queue_not_empty);
    pthread_cond_broadcast(&pool->queue_not_full);

    /* wait for every worker thread to finish */
    idx = 0;
    while (idx < pool->thread_num) {
        pthread_join(pool->pthreads[idx], NULL);
        ++idx;
    }

    /* release resources */
    pthread_mutex_destroy(&pool->mutex);
    pthread_cond_destroy(&pool->queue_empty);
    pthread_cond_destroy(&pool->queue_not_empty);
    pthread_cond_destroy(&pool->queue_not_full);
    free(pool->pthreads);

    for (pending = pool->head; pending != NULL; pending = pool->head) {
        pool->head = pending->next;
        free(pending);
    }

    free(pool);
    return 0;
}

3.进入client

watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvd2FuZ3poaWNoZW5nMTk4Mw==/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center" alt="">

client.c

/*************************************************************************
> File Name: test.c
> Author: wangzhicheng
> Mail: 2363702560@qq.com
> Created Time: Fri 03 Oct 2014 09:43:59 PM WST
************************************************************************/ #include "socket.h"
/* Address and port of the server this client talks to. */
const char * serveraddr = "127.0.0.1";
#define TCPPORT 4001

/*
 * Client entry point: connect to the server and send the number to look up
 * (the text "1").
 *
 * Exits with EXIT_FAILURE when the socket cannot be created or connected;
 * otherwise reports whether the whole message was sent and returns 0.
 */
int main() {
    int clientfd = -1;
    char buf[256];
    int len;
    int sent;

    strcpy(buf, "1");
    /* perror() appends ": <error text>" itself, so the prefix carries no newline */
    if(TCPClientInit(&clientfd) < 0) {
        perror("client init failed...!");
        exit(EXIT_FAILURE);
    }
    if(TCPClientConnect(clientfd, serveraddr, TCPPORT)) {
        perror("can not connect to server...!");
        exit(EXIT_FAILURE);
    }

    /* The send succeeded only if every byte was written. The length is passed
     * as the size argument and the comparison is done on the return value, so
     * a -1 from TCPWrite is reported as a failure. */
    len = (int)strlen(buf);
    sent = TCPWrite(clientfd, buf, len);
    if(sent == len) {
        printf("send successfully...!\n");
    }
    else {
        printf("send failed...!\n");
    }
    return 0;
}

Makefile

# Build the client from client.c and the shared socket library in ../lib.
CC=gcc
LIBRARY=../lib
CFLAGS=-I$(LIBRARY)
CXXFLAGS=
OBJS1=client.o socket.o

.PHONY: all clean

all: client

client: $(OBJS1)
	$(CC) -o $@ $(OBJS1)

# client.o is built by make's implicit rule, which uses $(CFLAGS).
socket.o: $(LIBRARY)/socket.c
	$(CC) -c $(LIBRARY)/socket.c

clean:
	rm *.o client > /dev/null 2>&1

4. 进入server

watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvd2FuZ3poaWNoZW5nMTk4Mw==/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center" alt="">

server.c

/*************************************************************************
> File Name: server.c
> Author: ma6174
> Mail: ma6174@163.com
> Created Time: Sat 04 Oct 2014 09:46:30 PM WST
************************************************************************/ #include "socket.h"
#include "threadpool.h"

/* TCP port the server listens on. */
#define TCPPORT 4001
#define SIZE 256
#define N 10
/* Lookup table searched by find(); the values are in ascending order. */
int array[N] = {1, 2, 6, 8, 12, 88, 208, 222, 688, 1018};

/*
 * Binary search for m in array[low..high] (both bounds inclusive).
 * Returns 1 when m is present in that range, 0 otherwise.
 */
int find(int low, int high, int m) {
    while (low <= high) {
        const int mid = (low + high) >> 1;

        if (array[mid] == m) {
            return 1;
        }
        if (array[mid] > m) {
            high = mid - 1;   /* continue in the lower half */
        } else {
            low = mid + 1;    /* continue in the upper half */
        }
    }
    return 0;
}
/*
 * Thread-pool job: look up one number in the table and print the result.
 *
 * arg points to a heap-allocated int owned by this job; it is freed here.
 * Always returns NULL.
 */
void* work(void* arg)
{
    int *p = (int *) arg;
    int m = *p;
    free(p);   /* main() allocates one int per job and hands ownership over */
    if(find(0, N - 1, m)) printf("%d has been found...!\n", m);
    else printf("%d has not been found...!\n", m);
    sleep(1);
    return NULL;
}

/*
 * Server entry point: accept clients one at a time, read the number each
 * one sends, and queue a lookup job on the thread pool.
 */
int main() {
    int serverfd = -1, clientfd = -1;
    char clientaddr[SIZE];
    char buf[SIZE];
    int nread;
    int *num;
    struct threadpool *pool = NULL;

    /* perror() appends ": <error text>" itself, so the prefixes carry no newline */
    TCPServerInit(TCPPORT, &serverfd);
    if(serverfd < 0) {
        perror("server init failed...!");
        exit(EXIT_FAILURE);
    }
    pool = threadpool_init(10, 20);
    if(pool == NULL) {
        fprintf(stderr, "thread pool init failed...!\n");
        close(serverfd);
        exit(EXIT_FAILURE);
    }
    while(1) {
        TCPServerWaitConnection(serverfd, &clientfd, clientaddr);
        if(clientfd < 0) {
            perror("can not connect the clients...!");
            exit(EXIT_FAILURE);
        }
        /* leave room for the terminator that atoi() needs */
        nread = TCPBlockRead(clientfd, buf, SIZE - 1);
        if(nread <= 0) {
            perror("can not read from client...!");
            sleep(1);
        }
        else {
            buf[nread] = '\0';
            /* Each job gets its own copy of the number: a single shared
             * variable would be overwritten by the next request before the
             * worker has read it. */
            num = malloc(sizeof *num);
            if(num == NULL) {
                perror("can not allocate job argument...!");
            }
            else {
                *num = atoi(buf);
                if(threadpool_add_job(pool, work, num) != 0) {
                    free(num);   /* job was not queued, so work() will not free it */
                }
            }
        }
        close(clientfd);   /* one request per connection; release the descriptor */
    }
    /* not reached: the accept loop above has no exit */
    threadpool_destroy(pool); return 0;
}

Makefile

# Build the server from server.c plus the socket and thread-pool sources in ../lib.
CC=gcc
LIBRARY=../lib
CFLAGS=-I$(LIBRARY)
CXXFLAGS=
OBJS1=server.o socket.o threadpool.o

.PHONY: all clean

all: server

server: $(OBJS1)
	$(CC) -o $@ $(OBJS1) -lpthread

# server.o is built by make's implicit rule, which uses $(CFLAGS).
socket.o: $(LIBRARY)/socket.c
	$(CC) -c $(LIBRARY)/socket.c

threadpool.o: $(LIBRARY)/threadpool.c
	$(CC) -c $(LIBRARY)/threadpool.c

# remove the binary this Makefile builds (server, not client)
clean:
	rm *.o server > /dev/null 2>&1

三、測试

四、有关线程池的说明

当线程池被创建时,线程池中有若干“空闲”的线程,即不运行任何任务。每当有任务加入时,任务被组织成任务队列,线程按照队头出、队尾进的原则取出队头任务运行。

任务队列中的任务数必须控制在一个上限内,超过上限时,加入任务的操作被阻塞。当全部任务运行完后,销毁线程池。

最新文章

  1. Python 黑帽编程大纲(变化中)
  2. php环境的搭建
  3. CentOs中yum安装LAMP+PHPMYADMIN
  4. Core Data 使用映射模型
  5. net 调用https接口
  6. js window对象
  7. 定做属于自己的Lodop安装程序
  8. mysql备份与还原
  9. .Net的DataGrid的使用
  10. dojo新建widget步骤----主要针对widget路径
  11. 收集些日本的VPS
  12. 删除 GPT 保护分区
  13. rsyslogd配置文件详解
  14. 利用Crowbar抓取网页异步加载的内容 [Python俱乐部]
  15. Android——用户登陆及用户名和密码的保存
  16. listview 拖动item效果实现
  17. Oracle历史记录
  18. node.js-v6新版安装过程
  19. 2018-2019-2 网络对抗技术 20162329 Exp5 MSF基础应用
  20. [CTCI] 最小调整有序

热门文章

  1. Java&amp;Xml教程(四)使用DOM方式生成XML文件
  2. ionic2\ionic3 自定义弹窗
  3. Android基础TOP3:Activity的线性,相对,帧和表格布局的概括
  4. UltraEdit(UE)window破解方法
  5. java网络
  6. cocos自动图集
  7. 调用CAD内的颜色选择对话框
  8. /etc目录常用配置文件
  9. Lua中返回值的丢失问题
  10. SSH技术介绍和Xshell公钥远程登陆