网络编程
TCP编程用到的函数
-
获取网络地址(ip和端口)
-
保存地址的结构体
-
struct sockaddr_in{ sa_family_t sin_family; // 一般填AF_INET in_port_t sin_port; // 16bit 网络字节序 struct in_addr sin_addr; // ip地址,结构体内存的是大端存储的32bit }
-
struct sockaddr_in6
-
struct sockaddr
-
-
使用DNS服务获取网站信息
-
函数 | 函数作用 |
---|---|
htons | 无符号short型整数,主机字节序转网络字节序(h:主机,n:网络,s:short) |
htonl | 无符号long型整数,主机字节序转网络字节序 |
ntohs | 无符号short型整数,网络字节序转主机字节序 |
ntohl | 无符号long型整数,网络字节序转主机字节序 |
inet_aton | 点分十进制字符串转二进制(a:ascall字符串,n:二进制) |
inet_addr | 点分十进制字符串转二进制 |
inet_ntoa | 二进制转点分十进制字符串 |
gethostbyname | 根据域名返回该服务器信息(ip,别名,官方名) |
- TCP编程
- 客户端
- 服务端
函数 | 作用 |
---|---|
socket | 创建通信socket文件对象并返回文件描述符 |
bind | 将服务端ip和端口号绑定到网络套接字上 |
listen | 将套接字转为被动接收状态,准备接收连接请求(将通信socket改为监听socket) |
connect | 客户端发起TCP请求(TCP第一次握手和第三次握手) |
accept | 从全连接队列中取出一个已完成的连接并返回文件描述符 |
send | 发送数据(将用户区buf中的内容拷贝到套接字的发送缓冲中) |
recv | 接收数据(将套接字接收缓冲中的内容拷贝到用户区的buf中) |
close | 关闭连接 |
setsockopt | 是服务端地址可以复用,不会出现在服务端第一次断开后,第二次打开服务端显示地址占用 |
select | 可以用来实现服务端和客户端实时通信;用来监听监听socket和通信socket |
示例:
服务端
// 实现聊天室服务端,实现私聊
#include <my_header.h>
int main(int argc, char* argv[])
{
// ./3_server 192.168.157.128 2345
ARGS_CHECK(argc,3);
int sockfd = socket(AF_INET,SOCK_STREAM,0);
struct sockaddr_in in;
memset(&in,0,sizeof(in));
in.sin_family = AF_INET;
in.sin_port = htons(atoi(argv[2]));
in.sin_addr.s_addr =inet_addr(argv[1]);
int flag =1; // 允许复用本地址
setsockopt(sockfd,SOL_SOCKET,SO_REUSEADDR,&flag,sizeof(flag));
int ret = bind(sockfd,(struct sockaddr *)&in,sizeof(in));
ERROR_CHECK(ret,-1,"bind");
ret = listen(sockfd,50);
ERROR_CHECK(ret,-1,"listen");
fd_set readyset;
fd_set monitorset;
FD_ZERO(&monitorset);
FD_SET(sockfd,&monitorset);
int netfd[1024]; // 记录客户端文件描述符
int target_client[1024]; // 记录对话目标客户端,-1为广播
int p_next_client=0; // 记录下一个client该存的位置
for(int i =0;i<1024;++i){
netfd[i]=-1;
}
for(int i =0;i<1024;++i){
target_client[i]=-1;
}
char buf[1024];
printf("server is waiting to be connected. \n");
int num_connect = 0; // 记录已连接个数
while(1){
memcpy(&readyset,&monitorset,sizeof(readyset));
select(1024,&readyset,NULL,NULL,NULL);
if(FD_ISSET(sockfd,&readyset)){ // 如果有新连接到来
netfd[p_next_client] = accept(sockfd,NULL,NULL);
printf("Client %d connect, netfd= %d\n",p_next_client,netfd[p_next_client]);
FD_SET(netfd[p_next_client],&monitorset);
++p_next_client;
++num_connect;
// 用户上线广播
for(int j=0;j<p_next_client;++j){
char message[1024]={0};
sprintf(message,"server: Client-%d go online.",p_next_client-1);
send(netfd[j],message,strlen(message),0);
}
continue;
}
for(int i =0;i<p_next_client;++i){
if(FD_ISSET(netfd[i],&readyset)){
bzero(buf,sizeof(buf));
ssize_t retrecv = recv(netfd[i],buf,sizeof(buf),0);
if(0==retrecv){
printf("client %d disconnect.\n",i);
FD_CLR(netfd[i],&monitorset);
close(netfd[i]);
netfd[i]=-1;
--num_connect;
// 用户下线广播
for(int j=0;j<p_next_client;++j){
if(j!=i){
char message[1024]={0};
sprintf(message,"server: Client-%d go offline.",i);
send(netfd[j],message,strlen(message),0);
}
}
if(0==num_connect){
printf("server is waiting to be connected again. \n");
}
continue;
}
// >:2 client要和client2私聊
if(buf[0]=='>'){
if(buf[1]==':'){
int target_idx = atoi(buf+2);
if((target_idx<-1) || (target_idx==0 && buf[2]!='0') || netfd[target_idx]==-1){
send(netfd[i],"server: no have the client.\n",28,0);
continue;
}
if(target_idx==i){
send(netfd[i],"server: it\'s yourself.\n",24,0);
continue;
}
target_client[i]=target_idx;
}
}
// 如果target_client[i]的值为-1则广播,否则私聊
if(target_client[i]!=-1){
char message[1024]={0};
sprintf(message,"Client-%d private: ",i);
strncat(message,buf,1024);
send(netfd[target_client[i]],message,strlen(message),0);
continue;
}
for(int j=0;j<p_next_client;++j){
if(j!=i){
char message[1024]={0};
sprintf(message,"Client-%d: ",i);
strncat(message,buf,1024);
send(netfd[j],message,strlen(message),0);
}
}
}
}
}
return 0;
}
客户端
#include <my_header.h>
int main(int argc, char* argv[])
{
// ./3_server 192.168.157.128 2345
ARGS_CHEEK(argc,3);
int sockfd = socket(AF_INET,SOCK_STREAM,0);
struct sockaddr_in in;
memset(&in,0,sizeof(in));
in.sin_family = AF_INET;
in.sin_port = htons(atoi(argv[2]));
in.sin_addr.s_addr =inet_addr(argv[1]);
int ret = connect(sockfd,(struct sockaddr *)&in,sizeof(in));
ERROR_CHEEK(ret,-1,"connect failure.");
fd_set readyset;
fd_set monitorset;
FD_ZERO(&monitorset);
FD_SET(STDIN_FILENO,&monitorset);
FD_SET(sockfd,&monitorset);
char buf[1024];
while(1){
memcpy(&readyset,&monitorset,sizeof(readyset));
select(sockfd+1,&readyset,NULL,NULL,NULL);
if(FD_ISSET(STDIN_FILENO,&readyset)){
bzero(buf,sizeof(buf));
ssize_t retread = read(STDIN_FILENO,buf,sizeof(buf));
if(0==retread){
printf("client end.\n");
break;
}
send(sockfd,buf,strlen(buf)-1,0);
}
if(FD_ISSET(sockfd,&readyset)){
bzero(buf,sizeof(buf));
ssize_t retrecv = recv(sockfd,buf,sizeof(buf),0);
if(0==retrecv){
printf("client end.\n");
break;
}
/* printf("retrecv=%ld,buf = %s\n",retrecv,buf); */
printf("%s\n",buf);
}
}
close(sockfd);
return 0;
}
示例结果
sendmsg recvmsg实现父子进程间传递文件对象
#include <66func.h>
int sendfd(int sockfd, int fdtosend){
struct msghdr hdr; //准备好一个
bzero(&hdr,sizeof(hdr));
//准备正文
char buf[] = "hello";
struct iovec vec[1];//离散的区域只有一个碎片
vec[0].iov_base = buf; //碎片首地址
vec[0].iov_len = 5;//碎片长度
hdr.msg_iov = vec;//将离散区域和hdr扯上关系
hdr.msg_iovlen = 1;
//准备控制字段
struct cmsghdr *cmsg = (struct cmsghdr *)malloc( CMSG_LEN(sizeof(int)) );
cmsg->cmsg_len = CMSG_LEN(sizeof(int)); //已知data长4,求整个结构体长度
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS; // 这个选项说明要发送的数据结构是文件对象
*(int *)CMSG_DATA(cmsg) = fdtosend; // 找到data首地址,强转成int *,再解引用,再赋值
hdr.msg_control = cmsg;
hdr.msg_controllen = CMSG_LEN(sizeof(int));
sendmsg(sockfd,&hdr,0);
return 0;
}
int recvfd(int sockfd, int *pfdtorecv){
struct msghdr hdr; //准备好一个
bzero(&hdr,sizeof(hdr));
//准备正文
char buf[6] = {0};
struct iovec vec[1];//离散的区域只有一个碎片
vec[0].iov_base = buf; //碎片首地址
vec[0].iov_len = 5;//碎片长度
hdr.msg_iov = vec;//将离散区域和hdr扯上关系
hdr.msg_iovlen = 1;
//准备控制字段
struct cmsghdr *cmsg = (struct cmsghdr *)malloc( CMSG_LEN(sizeof(int)) );
cmsg->cmsg_len = CMSG_LEN(sizeof(int)); //已知data长4,求整个结构体长度
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS; // 这个选项说明要发送的数据结构是文件对象
hdr.msg_control = cmsg;
hdr.msg_controllen = CMSG_LEN(sizeof(int));
recvmsg(sockfd,&hdr,0);
printf("buf = %s, fdtorecv = %d\n", buf, *(int *)CMSG_DATA(cmsg));
*pfdtorecv = *(int *)CMSG_DATA(cmsg);
return 0;
}
int main(int argc, char *argv[])
{
int fds[2];
socketpair(AF_LOCAL,SOCK_STREAM,0,fds);
if(fork()){
//父
close(fds[0]);
int fd = open("file1",O_RDWR);
printf("parent fd = %d\n", fd);
write(fd,"hello",5);
sendfd(fds[1],fd);
wait(NULL);
}
else{
//子
close(fds[1]);
int fd;
recvfd(fds[0],&fd);
printf("child fd = %d\n", fd);
write(fd,"world",5);
}
return 0;
}
sendmsg 函数使用流程 构建 msghdr 结构体: msg_name与msg_namelen:设为NULL和0。 msg_iov和msg_iovlen:这是iovec数组,用于存放分散的内存缓冲区。 msg_control和msg_controllen:用来存放文件描述符。msg_flags:无用,设为0。 设置 iovec 数组: 把需要发送的数据存放在iovec数组里。 设置control数据: 发送文件描述符,就需要构建 cmsghdr 结构体。
调用 sendmsg 函数: recvmsg 函数使用流程 构建 msghdr 结构体: msg_name和msg_namelen:设为NULL和0。 msg_iov和msg_iovlen:这是iovec数组,用于存放接收数据的缓冲区。 msg_control和msg_controllen:用于接收文件描述符和文件对象。 msg_flags:无用,设为0。 设置 iovec 数组: 提前分配好接收数据的缓冲区。 设置control缓冲区: 要确保缓冲区足够大,能够容纳可能接收到的辅助数据。
进程池
目录结构

客户端
client.c
#include "head.h"
// v 1.0
typedef struct train_s{
int len;
char buf[1000];
}train_t;
int main(int argc, char* argv[])
{
//./client 192.168.157.128 2345
ARGS_CHECK(argc,3);
int sockfd = socket(AF_INET,SOCK_STREAM,0);
struct sockaddr_in addr;
bzero(&addr,sizeof(addr));
addr.sin_family=AF_INET;
addr.sin_port=htons(atoi(argv[2]));
addr.sin_addr.s_addr=inet_addr(argv[1]);
int ret = connect(sockfd,(struct sockaddr*)&addr,sizeof(addr));
ERROR_CHECK(ret,-1,"connect");
/* sleep(5); */
recvFile(sockfd);
/* while(1){ */
/* sleep(1); */
/* } */
return 0;
}
head.h
#ifndef HEAD_H
#define HEAD_H
#include <my_header.h>
// 工人状态
enum{
FREE,
BUSY
};
typedef struct workerdata_s{
pid_t pid;
int statu;//工人状态
int sockfd;// 工人进程与主人进程间的管道
}workerdata_t;
int makeWorker(int num,workerdata_t* workerArr);
int tcpInit(const char* ip,const char* port);
int epollAdd(int epfd,int fd);
int epollDel(int epfd,int fd);
int sendfd(int sockfd, int fdtosend);
int recvfd(int sockfd, int *pfdtorecv);
int transFile(const char* filename,int netfd);
int recvFile(int sockfd);
#endif
recvfile.c
#include "head.h"
typedef struct train_s{
int len;
char buf[1000];
}train_t;
// v 2.0
int recvFile(int sockfd){
// recv filename
train_t train;
bzero(&train,sizeof(train));
int ret = recv(sockfd,&train.len,sizeof(train.len),0);
printf("recv filename len =%d\n",train.len);
ret = recv(sockfd,train.buf,train.len,0);
char filename[1024];
bzero(filename,sizeof(filename));
memcpy(filename,train.buf,train.len);
int fd = open(filename,O_RDWR|O_CREAT|O_TRUNC,0666);
ERROR_CHECK(fd,-1,"open");
printf("recv filename is %s\n",filename);
// recv fileSize
off_t fileSize;
ret = recv(sockfd,&fileSize,sizeof(off_t),0);
ERROR_CHECK(ret,-1,"recv 1");
printf("fileSize is %ld\n",fileSize);
// recv file content
ret = truncate(filename,fileSize);
ERROR_CHECK(ret,-1,"truncate");
char* pfile =(char*)mmap(NULL,fileSize,PROT_READ|PROT_WRITE,MAP_SHARED,fd,0);
ret = recv(sockfd,pfile,fileSize,MSG_WAITALL);
ERROR_CHECK(ret,-1,"recv 2");
munmap(pfile,fileSize);
return 0;
}
makefile
client:client.o recvfile.o
gcc client.o recvfile.o -o client -g -Wall
client.o:client.c
gcc -c client.c -o client.o -g -Wall
recvfile.o:recvfile.c
gcc -c recvfile.c -o recvfile.o -g -Wall
服务端
main.c
#include "head.h"
int exitFds[2];
void handler(int num){
printf("signal = %d!\n",num);
int ret = write(exitFds[1],"1",1);
ERROR_CHECK(ret,-1,"exit write");
printf("ret = %d\n",ret);
}
int main(int argc, char* argv[])
{
// ./server 192.168.157.128 2345 5
ARGS_CHECK(argc,4);
// 创建worker进程
int workerNum = 5;
workerdata_t* workerArr=(workerdata_t*)calloc(workerNum,sizeof(workerdata_t));
makeWorker(workerNum,workerArr);
// 设置tcp
int sockfd = tcpInit(argv[1],argv[2]);
// 设置epoll
int epfd = epoll_create(1);
epollAdd(epfd,sockfd);
for(int i=0;i<workerNum;++i){
epollAdd(epfd,workerArr[i].sockfd);
}
struct epoll_event readySet[1024];
// 优雅地退出
int retPipe = pipe(exitFds);
ERROR_CHECK(retPipe,-1,"pipe");
printf("exitFds[0]=%d,exitFds[1]=%d\n",exitFds[0],exitFds[1]);
signal(SIGUSR1,handler);
epollAdd(epfd,exitFds[0]);
while(1){
// 只会收到连接请求和worker信息
int readyNum = epoll_wait(epfd,readySet,1024,-1);
for(int i=0;i<readyNum;++i){
if(readySet[i].data.fd==exitFds[0]){
// 挨个通知worker进程退出
printf("exit begin.\n");
for(int j=0;j<workerNum;++j){
sendfd(workerArr[j].sockfd,9,1);
}
for(int j=0;j<workerNum;++j){
wait(NULL);
}
printf("all worker ended.\n");
exit(0);
}
else if(readySet[i].data.fd == sockfd){
int netfd = accept(sockfd,NULL,NULL);
printf("a new connect netfd=%d\n",netfd);
// 找一个空闲的worker进程
for(int j=0;j<workerNum;++j){
if(workerArr[j].statu==FREE){
sendfd(workerArr[j].sockfd,netfd,0);
workerArr[j].statu=BUSY;
printf("send netfd=%d to worker%d\n",netfd,j);
break;
}
}
// worker 那里还有一个文件描述符指向netfd的文件对象
close(netfd);
}
else{
pid_t pid;
int workerfd = readySet[i].data.fd;
for(int j=0;j<workerNum;++j){
if(workerArr[j].sockfd==workerfd){
workerArr[j].statu=FREE;
read(workerfd,&pid,sizeof(pid));
printf("worker%d pid=%d free again\n",j,pid);
break;
}
}
}
}
}
return 0;
}
head.h
#ifndef HEAD_H
#define HEAD_H
#include <my_header.h>
// 工人状态
enum{
FREE,
BUSY
};
typedef struct workerdata_s{
pid_t pid;
int statu;//工人状态
int sockfd;// 工人进程与主人进程间的管道
}workerdata_t;
int makeWorker(int num,workerdata_t* workerArr);
int tcpInit(const char* ip,const char* port);
int epollAdd(int epfd,int fd);
int epollDel(int epfd,int fd);
int recvfd(int sockfd, int *pfdtorecv,int* pexitFlag);
int sendfd(int sockfd, int fdtosend,int exitFlag);
int transFile(const char* filename,int netfd);
int recvFile(int sockfd);
#endif
worker.c
#include "head.h"
// v 1.0
typedef struct train_s{
int len;
char buf[1000];
}train_t;
int makeWorker(int num,workerdata_t* workerArr){
for(int i=0;i<num;++i){
// 每个工人进程创建一个socket管道
int fds[2];
socketpair(AF_LOCAL,SOCK_STREAM,0,fds);
pid_t pid = fork();
if(pid == 0){
close(fds[0]);
int netfd;
while(1){
int exitFlag=0;
recvfd(fds[1],&netfd,&exitFlag);
if(exitFlag==1){
printf("worker going to end.\n");
exit(0);
}
printf("worker recv netfd=%d\n",netfd);
// 业务代码,传输文件 file1
char path[]="file1";
int fd = open(path,O_RDONLY);
ERROR_CHECK(fd,-1,"worker open");
transFile(path,netfd);
// 结束任务
printf("work over.\n");
pid = getpid();
write(fds[1],&pid,sizeof(pid));
close(netfd);
}
}
close(fds[1]);
workerArr[i].sockfd = fds[0];
workerArr[i].pid = pid;
workerArr[i].statu = FREE;
printf("worker sockfd = %d\n",fds[0]);
}
return 0;
}
epoll.c
#include "head.h"
int epollAdd(int epfd,int fd){
struct epoll_event event;
event.events = EPOLLIN;
event.data.fd = fd;
epoll_ctl(epfd,EPOLL_CTL_ADD,fd,&event);
return 0;
}
int epollDel(int epfd,int fd){
epoll_ctl(epfd,EPOLL_CTL_DEL,fd,NULL);
return 0;
}
tcpinit.c
#include "head.h"
int tcpInit(const char* ip,const char* port){
struct sockaddr_in addr;
bzero(&addr,sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = inet_addr(ip);
addr.sin_port = htons(atoi(port));
int sockfd = socket(AF_INET,SOCK_STREAM,0);
int flag =1;
setsockopt(sockfd,SOL_SOCKET,SO_REUSEADDR,&flag,sizeof(flag));
int ret = bind(sockfd,(struct sockaddr*)&addr,sizeof(addr));
ERROR_CHECK(ret ,-1,"bind");
listen(sockfd,50);
return sockfd;
}
sendmsg_recvmsg.c
#include "head.h"
int sendfd(int sockfd, int fdtosend,int exitFlag){
struct msghdr hdr;
bzero(&hdr,sizeof(hdr));
//正文
/* char buf[] = "hello"; */
/* struct iovec vec[1]; */
/* vec[0].iov_base = buf; */
/* vec[0].iov_len = 5; */
/* hdr.msg_iov = vec; */
/* hdr.msg_iovlen = 1; */
struct iovec vec[1];
vec[0].iov_base = &exitFlag;
vec[0].iov_len = sizeof(int);
hdr.msg_iov = vec;
hdr.msg_iovlen = 1;
//控制信息
struct cmsghdr *cmsg = (struct cmsghdr *)malloc(CMSG_LEN(sizeof(int)));
cmsg->cmsg_len = CMSG_LEN(sizeof(int));
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS;// 发送文件对象
*(int *)CMSG_DATA(cmsg) = fdtosend;// fdtosend 赋值到data首地址
hdr.msg_control = cmsg;
hdr.msg_controllen = CMSG_LEN(sizeof(int));
sendmsg(sockfd,&hdr,0);
return 0;
}
int recvfd(int sockfd, int *pfdtorecv,int* pexitFlag){
struct msghdr hdr;
bzero(&hdr,sizeof(hdr));
//正文
/* char buf[6] = {0}; */
/* struct iovec vec[1]; */
/* vec[0].iov_base = buf; */
/* vec[0].iov_len = 5; */
/* hdr.msg_iov = vec; */
/* hdr.msg_iovlen = 1; */
struct iovec vec[1];
vec[0].iov_base = pexitFlag;
vec[0].iov_len = sizeof(int);
hdr.msg_iov = vec;
hdr.msg_iovlen = 1;
//控制字段
struct cmsghdr *cmsg = (struct cmsghdr *)malloc(CMSG_LEN(sizeof(int)));
cmsg->cmsg_len = CMSG_LEN(sizeof(int));
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS; // 发送文件对象
hdr.msg_control = cmsg;
hdr.msg_controllen = CMSG_LEN(sizeof(int));
recvmsg(sockfd,&hdr,0);
/* printf("buf=%s,fdtorecv=%d\n",buf,*(int *)CMSG_DATA(cmsg)); */
*pfdtorecv = *(int*)CMSG_DATA(cmsg);
return 0;
}
transfile.c
#include "head.h"
typedef struct train_s{
int len;
char buf[1000];
}train_t;
// v 1.0
/* int transFile(const char* filename,int netfd){ */
/* // send filename */
/* train_t train; */
/* train.len = strlen(filename); */
/* memcpy(train.buf,filename,train.len); */
/* /1* printf("send filename before\n"); *1/ */
/* int ret = send(netfd,&train,sizeof(train.len)+train.len,MSG_NOSIGNAL); */
/* ERROR_CHECK(ret,-1,"send"); */
/* /1* printf("send filename after.\n"); *1/ */
/* int fd = open("file1",O_RDONLY); */
/* /1* struct stat stat; *1/ */
/* /1* fstat(fd,&stat); *1/ */
/* char buf[1024]={0}; */
/* while(ret!=0){ */
/* bzero(buf,sizeof(buf)); */
/* int retRead = read(fd,buf,1000); */
/* ret = send(netfd,buf,retRead,MSG_NOSIGNAL); */
/* if(ret == -1){ */
/* break; */
/* } */
/* } */
/* return 0; */
/* } */
// v 2.0 mmap
int transFile(const char* filename,int netfd){
// send filename
train_t train;
train.len = strlen(filename);
memcpy(train.buf,filename,train.len);
int ret = send(netfd,&train,sizeof(train.len)+train.len,MSG_NOSIGNAL);
ERROR_CHECK(ret,-1,"send");
int fd = open("file1",O_RDWR);
struct stat stat;
fstat(fd,&stat);
off_t fileSize = stat.st_size;
// send fileSize
ret = send(netfd,&fileSize,sizeof(off_t),0);
ERROR_CHECK(ret,-1,"send3");
// send file content
ret = truncate(filename,fileSize);
ERROR_CHECK(ret,-1,"truncate");
char* filep =(char*)mmap(NULL,fileSize,PROT_READ|PROT_WRITE,MAP_SHARED,fd,0);
ret = send(netfd,filep,fileSize,0);
ERROR_CHECK(ret,-1,"trainfile send2");
munmap(filep,fileSize);
return 0;
}
makefile
server: main.o worker.o tcpinit.o epoll.o sendmsg_recvmsg.o transfile.o
gcc main.o worker.o tcpinit.o epoll.o sendmsg_recvmsg.o transfile.o -o server -g -Wall
main.o: main.c
gcc -c main.c -o main.o -g -Wall
worker.o: worker.c
gcc -c worker.c -o worker.o -g -Wall
tcpinit.o: tcpinit.c
gcc -c tcpinit.c -o tcpinit.o -g -Wall
epoll.o: epoll.c
gcc -c epoll.c -o epoll.o -g -Wall
sendmsg_recvmsg.o: sendmsg_recvmsg.c
gcc -c sendmsg_recvmsg.c -o sendmsg_recvmsg.o -g -Wall
transfile.o:transfile.c
gcc -c transfile.c -o transfile.o -g -Wall
test:test.c
gcc test.c -o test -g -Wall
结果

线程池
目录结构

服务端
main.c
#include "pthreadPool.h"
int pipeFds[2];
void handler(int num){
close(pipeFds[0]);
printf("waitting to end.\n");
int ret = write(pipeFds[1],"1",1);
ERROR_CHECK(ret,-1,"handler write");
}
int main(int argc, char* argv[])
{
// ./server 192.168.157.128 12345 5
ARGS_CHECK(argc,4);
// 父子进程间通信
pipe(pipeFds);
if(fork()){
signal(SIGUSR1,handler);
wait(NULL);
printf("server end.\n");
exit(0);
}else{
// create pthread pool
int workerNum = atoi(argv[3]);
pthreadPool_t pthreadPool;
pthreadPoolInit(&pthreadPool,workerNum);
workerCreate(&pthreadPool,workerNum);
// create tcp
int sockfd = tcpInit(argv[1],argv[2]);
// epoll_wait
int epfd = epoll_create(1);
epollAdd(epfd,sockfd);
close(pipeFds[1]);
epollAdd(epfd,pipeFds[0]);
struct epoll_event readySet[1024];
bzero(readySet,sizeof(readySet));
while(1){
int readyNum = epoll_wait(epfd,readySet,1024,-1);
for(int i=0;i<readyNum;++i){
if(readySet[i].data.fd == sockfd){
printf("a client connect\n");
int netfd = accept(sockfd,NULL,NULL);
pthread_mutex_lock(&pthreadPool.mutex);
enQueue(&pthreadPool.netfdQueue,netfd);
pthread_cond_broadcast(&pthreadPool.cond);
pthread_mutex_unlock(&pthreadPool.mutex);
printf("add netfd\n");
}else if(readySet[i].data.fd == pipeFds[0]){
printf("child end begin.\n");
pthread_mutex_lock(&pthreadPool.mutex);
pthreadPool.endFlag=1;
pthread_cond_broadcast(&pthreadPool.cond);
pthread_mutex_unlock(&pthreadPool.mutex);
for(int j =0;j<workerNum;++j){
pthread_join(pthreadPool.tidArr.TidArr[j],NULL);
printf("one worker pthread end.\n");
}
printf("child end.\n");
exit(0);
}
}
}
}
return 0;
}
worker.c
#include "pthreadPool.h"
void* workerFuc(void* arg){
pthreadPool_t* pthreadPool = (pthreadPool_t*)arg;
int netfd;
printf("worker create suceessfully.\n");
while(1){
pthread_mutex_lock(&pthreadPool->mutex);
while(pthreadPool->netfdQueue.size==0 && pthreadPool->endFlag==0){
pthread_cond_wait(&pthreadPool->cond,&pthreadPool->mutex);
}
if(pthreadPool->endFlag==1){
pthread_mutex_unlock(&pthreadPool->mutex);
pthread_exit(NULL);
}
netfd = pthreadPool->netfdQueue.head->netfd;
deQueue(&pthreadPool->netfdQueue);
pthread_mutex_unlock(&pthreadPool->mutex);
printf("worker pthread get netfd = %d\n",netfd);
char filename[]="file1";
transFile(filename,netfd);
close(netfd);
}
}
int workerCreate(pthreadPool_t* pthreadPool,int workerNum){
for(int i=0;i<workerNum;++i){
int ret = pthread_create(&pthreadPool->tidArr.TidArr[i],NULL,workerFuc,pthreadPool);
fprintf(stderr,"pthread_create: %s\n",strerror(ret));
}
return 0;
}
pthreadPool.h
#ifndef __PTHREAD_POOL__
#define __PTHREAD_POOL__
#include <my_header.h>
#include "tidArr.h"
#include "taskQueue.h"
typedef struct pthreadPool_s{
taskQueue_t netfdQueue;
tidArr_t tidArr;
pthread_mutex_t mutex;
pthread_cond_t cond;
int endFlag;
}pthreadPool_t;
int pthreadPoolInit(pthreadPool_t* pthreadPool,int workerNum);
int workerCreate(pthreadPool_t* pthreadPool,int workerNum);
int tcpInit(const char* ip,const char* port);
int epollAdd(int epfd,int fd);
int epollDel(int epfd,int fd);
int transFile(const char* filename,int netfd);
#endif
pthreadPool.c
#include "pthreadPool.h"
int pthreadPoolInit(pthreadPool_t* pthreadPool,int workerNum){
bzero(pthreadPool,sizeof(pthreadPool_t));
pthreadPool->tidArr.TidArr = (pthread_t*)calloc(workerNum,sizeof(pthread_t));
pthreadPool->tidArr.workerNum=workerNum;
pthread_mutex_init(&pthreadPool->mutex,NULL);
pthread_cond_init(&pthreadPool->cond,NULL);
printf("pthreadPool init over.\n");
return 0;
}
tidArr.h
#ifndef __TID_ARR__
#define __TID_ARR__
#include <my_header.h>
typedef struct tidArr_s{
int workerNum;// worker pthread num
// pthread_t workerTidArr[workerNum];
pthread_t* TidArr; // worker pthread's Tid arr
}tidArr_t;
int TidArrInit(tidArr_t* workerTidArr);
#endif
tidArr.c
#include "tidArr.h"
int TidArrInit(tidArr_t* workerTidArr){
bzero(workerTidArr,sizeof(tidArr_t));
printf("tidArr.c: tidArrInit\n");
return 0;
}
taskQueue.h
#ifndef __TASK_QUEUE__
#define __TASK_QUEUE__
#include <my_header.h>
typedef struct queueNode_s{
int netfd;
struct queueNode_s* next;
}queueNode_t;
typedef struct taskQueue_s{
queueNode_t* head;
queueNode_t* tail;
int size;
}taskQueue_t;
int taskQueueInit(taskQueue_t* queue);
int enQueue(taskQueue_t* queue,int netfd);
int deQueue(taskQueue_t* queue);
int printQueue(const taskQueue_t* queue);
#endif
taskQueue.c
#include "taskQueue.h"
int taskQueueInit(taskQueue_t* queue){
bzero(queue,sizeof(taskQueue_t));
return 0;
}
int enQueue(taskQueue_t* queue,int netfd){
queueNode_t* newNode = (queueNode_t*)calloc(1,sizeof(queueNode_t));
newNode->netfd=netfd;
if(queue->size==0){
queue->head=newNode;
queue->tail=newNode;
}else{
queue->tail->next=newNode;
queue->tail=newNode;
}
++queue->size;
return 0;
}
// 不处理queue为空时出队的情况
int deQueue(taskQueue_t* queue){
queueNode_t* tempNode = queue->head;
queue->head=queue->head->next;
--queue->size;
if(queue->size==0){
queue->tail=NULL;
}
free(tempNode);
return 0;
}
int printQueue(const taskQueue_t* queue){
queueNode_t* ptr=queue->head;
while(ptr!=NULL){
printf("%d->",ptr->netfd);
ptr=ptr->next;
}
printf("null\n");
return 0;
}
tcpinit.c
#include <my_header.h>
int tcpInit(const char* ip,const char* port){
struct sockaddr_in addr;
bzero(&addr,sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = inet_addr(ip);
addr.sin_port = htons(atoi(port));
int sockfd = socket(AF_INET,SOCK_STREAM,0);
int flag =1;
setsockopt(sockfd,SOL_SOCKET,SO_REUSEADDR,&flag,sizeof(flag));
int ret = bind(sockfd,(struct sockaddr*)&addr,sizeof(addr));
ERROR_CHECK(ret ,-1,"bind");
listen(sockfd,50);
return sockfd;
}
epool.c
#include <my_header.h>
int epollAdd(int epfd,int fd){
struct epoll_event event;
event.events = EPOLLIN;
event.data.fd = fd;
epoll_ctl(epfd,EPOLL_CTL_ADD,fd,&event);
return 0;
}
int epollDel(int epfd,int fd){
epoll_ctl(epfd,EPOLL_CTL_DEL,fd,NULL);
return 0;
}
transfile.c
#include <my_header.h>
typedef struct train_s{
int len;
char buf[1000];
}train_t;
// v 1.0
/* int transFile(const char* filename,int netfd){ */
/* // send filename */
/* train_t train; */
/* train.len = strlen(filename); */
/* memcpy(train.buf,filename,train.len); */
/* /1* printf("send filename before\n"); *1/ */
/* int ret = send(netfd,&train,sizeof(train.len)+train.len,MSG_NOSIGNAL); */
/* ERROR_CHECK(ret,-1,"send"); */
/* /1* printf("send filename after.\n"); *1/ */
/* int fd = open("file1",O_RDONLY); */
/* /1* struct stat stat; *1/ */
/* /1* fstat(fd,&stat); *1/ */
/* char buf[1024]={0}; */
/* while(ret!=0){ */
/* bzero(buf,sizeof(buf)); */
/* int retRead = read(fd,buf,1000); */
/* ret = send(netfd,buf,retRead,MSG_NOSIGNAL); */
/* if(ret == -1){ */
/* break; */
/* } */
/* } */
/* return 0; */
/* } */
// v 2.0 mmap
int transFile(const char* filename,int netfd){
// send filename
train_t train;
train.len = strlen(filename);
memcpy(train.buf,filename,train.len);
int ret = send(netfd,&train,sizeof(train.len)+train.len,MSG_NOSIGNAL);
ERROR_CHECK(ret,-1,"send");
int fd = open("file1",O_RDWR);
struct stat stat;
fstat(fd,&stat);
off_t fileSize = stat.st_size;
// send fileSize
ret = send(netfd,&fileSize,sizeof(off_t),0);
ERROR_CHECK(ret,-1,"send3");
// send file content
ret = truncate(filename,fileSize);
ERROR_CHECK(ret,-1,"truncate");
char* filep =(char*)mmap(NULL,fileSize,PROT_READ|PROT_WRITE,MAP_SHARED,fd,0);
ret = send(netfd,filep,fileSize,0);
ERROR_CHECK(ret,-1,"trainfile send2");
munmap(filep,fileSize);
return 0;
}
makefile
server:main.o pthreadPool.o taskQueue.o tidArr.o worker.o tcpinit.o epoll.o transfile.o
gcc main.o pthreadPool.o taskQueue.o worker.o tcpinit.o epoll.o transfile.o -o server
main.o:main.c
gcc -c main.c -o main.o -g -Wall
pthreadPool.o:pthreadPool.c
gcc -c pthreadPool.c -o pthreadPool.o -g -Wall
taskQueue.o:taskQueue.c
gcc -c taskQueue.c -o taskQueue.o -g -Wall
tidArr.o:tidArr.c
gcc -c tidArr.c -o tidArr.o -g -Wall
worker.o:worker.c
gcc -c worker.c -o worker.o -g -Wall
epoll.o:epoll.c
gcc -c epoll.c -o epoll.o -g -Wall
transfile.o:transfile.c
gcc -c transfile.c -o transfile.o -g -Wall
tcpinit.o:tcpinit.c
gcc -c tcpinit.c -o tcpinit.o -g -Wall
客户端
clinet.c
#include <my_header.h>
typedef struct train_s{
int len;
char buf[1000];
}train_t;
// v 2.0
int recvFile(int sockfd){
// recv filename
train_t train;
bzero(&train,sizeof(train));
int ret = recv(sockfd,&train.len,sizeof(train.len),0);
printf("recv filename len =%d\n",train.len);
ret = recv(sockfd,train.buf,train.len,0);
char filename[1024];
bzero(filename,sizeof(filename));
memcpy(filename,train.buf,train.len);
int fd = open(filename,O_RDWR|O_CREAT|O_TRUNC,0666);
ERROR_CHECK(fd,-1,"open");
printf("recv filename is %s\n",filename);
// recv fileSize
off_t fileSize;
ret = recv(sockfd,&fileSize,sizeof(off_t),0);
ERROR_CHECK(ret,-1,"recv 1");
printf("fileSize is %ld\n",fileSize);
// recv file content
ret = truncate(filename,fileSize);
ERROR_CHECK(ret,-1,"truncate");
char* pfile =(char*)mmap(NULL,fileSize,PROT_READ|PROT_WRITE,MAP_SHARED,fd,0);
ret = recv(sockfd,pfile,fileSize,MSG_WAITALL);
ERROR_CHECK(ret,-1,"recv 2");
munmap(pfile,fileSize);
return 0;
}
int main(int argc, char* argv[])
{
//./client 192.168.157.128 2345
ARGS_CHECK(argc,3);
int sockfd = socket(AF_INET,SOCK_STREAM,0);
struct sockaddr_in addr;
bzero(&addr,sizeof(addr));
addr.sin_family=AF_INET;
addr.sin_port=htons(atoi(argv[2]));
addr.sin_addr.s_addr=inet_addr(argv[1]);
int ret = connect(sockfd,(struct sockaddr*)&addr,sizeof(addr));
ERROR_CHECK(ret,-1,"connect");
recvFile(sockfd);
return 0;
}
recvfile.c
#include <my_header.h>
typedef struct train_s{
int len;
char buf[1000];
}train_t;
// v 2.0
int recvFile(int sockfd){
// recv filename
train_t train;
bzero(&train,sizeof(train));
int ret = recv(sockfd,&train.len,sizeof(train.len),0);
printf("recv filename len =%d\n",train.len);
ret = recv(sockfd,train.buf,train.len,0);
char filename[1024];
bzero(filename,sizeof(filename));
memcpy(filename,train.buf,train.len);
int fd = open(filename,O_RDWR|O_CREAT|O_TRUNC,0666);
ERROR_CHECK(fd,-1,"open");
printf("recv filename is %s\n",filename);
// recv fileSize
off_t fileSize;
ret = recv(sockfd,&fileSize,sizeof(off_t),0);
ERROR_CHECK(ret,-1,"recv 1");
printf("fileSize is %ld\n",fileSize);
// recv file content
ret = truncate(filename,fileSize);
ERROR_CHECK(ret,-1,"truncate");
char* pfile =(char*)mmap(NULL,fileSize,PROT_READ|PROT_WRITE,MAP_SHARED,fd,0);
ret = recv(sockfd,pfile,fileSize,MSG_WAITALL);
ERROR_CHECK(ret,-1,"recv 2");
munmap(pfile,fileSize);
return 0;
}
makefile
client:client.c
gcc client.c -o client -g -Wall
运行结果

评论