网络编程学习笔记

技术学习  ·  2025-05-28

网络编程

TCP编程用到的函数

  • 获取网络地址(ip和端口)

    • 保存地址的结构体

      • struct sockaddr_in{
            sa_family_t sin_family; // 一般填AF_INET
            in_port_t sin_port; // 16bit 网络字节序
            struct in_addr sin_addr; // ip地址,结构体内存放的是按网络字节序(大端)存储的32位地址
        }
        
      • struct sockaddr_in6

      • struct sockaddr

    • 使用DNS服务获取网站信息

函数 函数作用
htons 无符号short型整数,主机字节序转网络字节序(h:主机,n:网络,s:short)
htonl 32位无符号整数(uint32_t),主机字节序转网络字节序(l:long,历史命名)
ntohs 无符号short型整数,网络字节序转主机字节序
ntohl 32位无符号整数(uint32_t),网络字节序转主机字节序
inet_aton 点分十进制字符串转二进制(a:ASCII字符串,n:二进制)
inet_addr 点分十进制字符串转二进制
inet_ntoa 二进制转点分十进制字符串
gethostbyname 根据域名返回该服务器信息(ip,别名,官方名)
  • TCP编程
    • 客户端
    • 服务端
函数 作用
socket 创建通信socket文件对象并返回文件描述符
bind 将服务端ip和端口号绑定到网络套接字上
listen 将套接字转为被动接收状态,准备接收连接请求(将通信socket改为监听socket)
connect 客户端发起TCP请求(TCP第一次握手和第三次握手)
accept 从全连接队列中取出一个已完成的连接并返回文件描述符
send 发送数据(将用户区buf中的内容拷贝到套接字的发送缓冲中)
recv 接收数据(将套接字接收缓冲中的内容拷贝到用户区的buf中)
close 关闭连接
setsockopt 设置SO_REUSEADDR使服务端地址可以复用,避免服务端断开后立即重启时提示地址已被占用
select 可以用来实现服务端和客户端实时通信;用来监听监听socket和通信socket

示例:

服务端

// 实现聊天室服务端,实现私聊
#include <my_header.h>

/* Send a NUL-terminated text to one peer. MSG_NOSIGNAL makes a write to a
 * vanished peer fail with an error instead of raising SIGPIPE. */
static void send_text(int fd,const char* text)
{
    send(fd,text,strlen(text),MSG_NOSIGNAL);
}

/*
 * Chat-room server built on select().
 *
 * Usage: ./3_server <ip> <port>
 *
 * Every accepted client gets a slot index (never reused). A message is
 * broadcast to all other clients unless the sender has selected a private
 * peer with ">:<index>"; ">:-1" switches the sender back to broadcasting.
 */
int main(int argc, char* argv[])
{
    // ./3_server 192.168.157.128 2345
    ARGS_CHECK(argc,3);
    enum { MAX_CLIENTS = 1024, BUF_SIZE = 1024, MSG_SIZE = BUF_SIZE + 64 };

    int sockfd = socket(AF_INET,SOCK_STREAM,0);
    ERROR_CHECK(sockfd,-1,"socket");
    struct sockaddr_in in;
    memset(&in,0,sizeof(in));
    in.sin_family = AF_INET;
    in.sin_port = htons(atoi(argv[2]));
    in.sin_addr.s_addr =inet_addr(argv[1]);
    int flag =1; // allow the local address to be reused
    setsockopt(sockfd,SOL_SOCKET,SO_REUSEADDR,&flag,sizeof(flag));
    int ret = bind(sockfd,(struct sockaddr *)&in,sizeof(in));
    ERROR_CHECK(ret,-1,"bind");
    ret = listen(sockfd,50);
    ERROR_CHECK(ret,-1,"listen");

    fd_set readyset;
    fd_set monitorset;
    FD_ZERO(&monitorset);
    FD_SET(sockfd,&monitorset);
    int netfd[MAX_CLIENTS];         // client descriptors, -1 = slot unused/closed
    int target_client[MAX_CLIENTS]; // private-chat peer per client, -1 = broadcast
    int p_next_client=0;            // next free slot
    for(int i =0;i<MAX_CLIENTS;++i){
        netfd[i]=-1;
        target_client[i]=-1;
    }
    char buf[BUF_SIZE];
    char message[MSG_SIZE]; // large enough for the prefix plus a full buf
    printf("server is waiting to be connected. \n");
    int num_connect = 0; // number of currently connected clients
    while(1){
        memcpy(&readyset,&monitorset,sizeof(readyset));
        ret = select(FD_SETSIZE,&readyset,NULL,NULL,NULL);
        ERROR_CHECK(ret,-1,"select");
        if(FD_ISSET(sockfd,&readyset)){ // a new connection is pending
            int newfd = accept(sockfd,NULL,NULL);
            if(newfd==-1){
                continue;
            }
            // Refuse clients that fit neither the slot table nor an fd_set.
            if(p_next_client>=MAX_CLIENTS || newfd>=FD_SETSIZE){
                close(newfd);
                continue;
            }
            netfd[p_next_client] = newfd;
            printf("Client %d connect, netfd= %d\n",p_next_client,newfd);
            FD_SET(newfd,&monitorset);
            ++p_next_client;
            ++num_connect;
            // announce the new client to everybody (including itself)
            snprintf(message,sizeof(message),"server: Client-%d go online.",p_next_client-1);
            for(int j=0;j<p_next_client;++j){
                if(netfd[j]!=-1){
                    send_text(netfd[j],message);
                }
            }
            continue;
        }
        for(int i =0;i<p_next_client;++i){
            // closed slots hold -1, which must never reach FD_ISSET
            if(netfd[i]==-1 || !FD_ISSET(netfd[i],&readyset)){
                continue;
            }
            // keep one byte free so buf is always NUL-terminated
            ssize_t retrecv = recv(netfd[i],buf,sizeof(buf)-1,0);
            if(retrecv<=0){ // orderly shutdown or receive error
                printf("client %d disconnect.\n",i);
                FD_CLR(netfd[i],&monitorset);
                close(netfd[i]);
                netfd[i]=-1;
                --num_connect;
                // announce the departure to the remaining clients
                snprintf(message,sizeof(message),"server: Client-%d go offline.",i);
                for(int j=0;j<p_next_client;++j){
                    if(j!=i && netfd[j]!=-1){
                        send_text(netfd[j],message);
                    }
                }
                if(0==num_connect){
                    printf("server is waiting to be connected again. \n");
                }
                continue;
            }
            buf[retrecv]='\0';
            // ">:2" -> this client wants to talk privately to client 2
            if(buf[0]=='>' && buf[1]==':'){
                int target_idx = atoi(buf+2);
                // atoi yields 0 for non-numeric text, so 0 only counts when a
                // literal '0' was typed; -1 means "back to broadcast".
                int valid = (target_idx==-1) ||
                            (target_idx>=0 && target_idx<p_next_client &&
                             !(target_idx==0 && buf[2]!='0') &&
                             netfd[target_idx]!=-1);
                if(!valid){
                    send_text(netfd[i],"server: no have the client.\n");
                    continue;
                }
                if(target_idx==i){
                    send_text(netfd[i],"server: it\'s yourself.\n");
                    continue;
                }
                target_client[i]=target_idx;
            }
            // private chat when a peer is selected, broadcast otherwise
            if(target_client[i]!=-1){
                int peerfd = netfd[target_client[i]];
                if(peerfd==-1){ // the selected peer has left meanwhile
                    send_text(netfd[i],"server: no have the client.\n");
                    continue;
                }
                snprintf(message,sizeof(message),"Client-%d private: %s",i,buf);
                send_text(peerfd,message);
                continue;
            }
            snprintf(message,sizeof(message),"Client-%d: %s",i,buf);
            for(int j=0;j<p_next_client;++j){
                if(j!=i && netfd[j]!=-1){
                    send_text(netfd[j],message);
                }
            }
        }
    }
    return 0;
}

客户端

#include <my_header.h>

/*
 * Chat-room client: forwards each line typed on stdin to the server and
 * prints everything the server sends.
 *
 * Usage: ./3_client <server-ip> <port>
 */
int main(int argc, char* argv[])
{
    // ./3_client 192.168.157.128 2345
    // The checking macros are spelled as in the server example of this file.
    ARGS_CHECK(argc,3);
    int sockfd = socket(AF_INET,SOCK_STREAM,0);
    ERROR_CHECK(sockfd,-1,"socket");
    struct sockaddr_in in;
    memset(&in,0,sizeof(in));
    in.sin_family = AF_INET;
    in.sin_port = htons(atoi(argv[2]));
    in.sin_addr.s_addr =inet_addr(argv[1]);
    int ret = connect(sockfd,(struct sockaddr *)&in,sizeof(in));
    ERROR_CHECK(ret,-1,"connect failure.");
    fd_set readyset;
    fd_set monitorset;
    FD_ZERO(&monitorset);
    FD_SET(STDIN_FILENO,&monitorset);
    FD_SET(sockfd,&monitorset);
    char buf[1024];
    while(1){
        memcpy(&readyset,&monitorset,sizeof(readyset));
        ret = select(sockfd+1,&readyset,NULL,NULL,NULL);
        ERROR_CHECK(ret,-1,"select");
        if(FD_ISSET(STDIN_FILENO,&readyset)){
            ssize_t retread =  read(STDIN_FILENO,buf,sizeof(buf));
            if(retread<=0){ // EOF (Ctrl-D) or read error
                printf("client end.\n");
                break;
            }
            // Drop the trailing newline, if any; use the byte count returned
            // by read() because buf is not a NUL-terminated string.
            if(buf[retread-1]=='\n'){
                --retread;
            }
            if(retread>0){
                send(sockfd,buf,retread,0);
            }
        }
        if(FD_ISSET(sockfd,&readyset)){
            // keep one byte free so the text can be NUL-terminated
            ssize_t retrecv  = recv(sockfd,buf,sizeof(buf)-1,0);
            if(retrecv<=0){ // server closed the connection or receive error
                printf("client end.\n");
                break;
            }
            buf[retrecv]='\0';
            printf("%s\n",buf);
        }
    }
    close(sockfd);
    return 0;
}

示例结果

Snipaste_2025-05-28_15-27-05

sendmsg recvmsg实现父子进程间传递文件对象

#include <66func.h>
/*
 * Pass the open file behind fdtosend to the peer of the local socket sockfd
 * using an SCM_RIGHTS control message. The regular payload is the 5 bytes
 * "hello".
 *
 * Returns 0 on success, -1 if the control buffer cannot be allocated or
 * sendmsg() fails.
 */
int sendfd(int sockfd, int fdtosend){
    struct msghdr hdr = {0};
    // regular payload
    char buf[] = "hello";
    struct iovec vec[1];           // a single scatter/gather fragment
    vec[0].iov_base = buf;         // fragment start
    vec[0].iov_len = 5;            // fragment length
    hdr.msg_iov = vec;
    hdr.msg_iovlen = 1;
    // control message carrying the descriptor
    size_t ctrlLen = CMSG_LEN(sizeof(int)); // header plus one int of data
    struct cmsghdr *cmsg = (struct cmsghdr *)calloc(1,ctrlLen);
    if(cmsg == NULL){
        return -1;
    }
    cmsg->cmsg_len = ctrlLen;
    cmsg->cmsg_level = SOL_SOCKET;
    cmsg->cmsg_type = SCM_RIGHTS;  // the data is a file descriptor to pass
    *(int *)CMSG_DATA(cmsg) = fdtosend;
    hdr.msg_control = cmsg;
    hdr.msg_controllen = ctrlLen;
    ssize_t sent = sendmsg(sockfd,&hdr,0);
    free(cmsg);                    // the buffer is only needed during the call
    return sent == -1 ? -1 : 0;
}
/*
 * Receive a file descriptor sent with an SCM_RIGHTS control message over the
 * local socket sockfd and store it in *pfdtorecv. Up to 5 bytes of regular
 * payload are read alongside and printed.
 *
 * Returns 0 on success. Returns -1 (and leaves *pfdtorecv untouched) when
 * allocation fails, recvmsg() fails or reports end-of-file, or the message
 * does not carry exactly one descriptor.
 */
int recvfd(int sockfd, int *pfdtorecv){
    struct msghdr hdr = {0};
    // regular payload
    char buf[6] = {0};
    struct iovec vec[1];           // a single scatter/gather fragment
    vec[0].iov_base = buf;
    vec[0].iov_len = 5;
    hdr.msg_iov = vec;
    hdr.msg_iovlen = 1;
    // room for one control message holding one int
    size_t ctrlLen = CMSG_LEN(sizeof(int));
    struct cmsghdr *ctrl = (struct cmsghdr *)calloc(1,ctrlLen);
    if(ctrl == NULL){
        return -1;
    }
    hdr.msg_control = ctrl;
    hdr.msg_controllen = ctrlLen;
    ssize_t received = recvmsg(sockfd,&hdr,0);
    int result = -1;
    // Only trust the control data if the kernel actually delivered it.
    struct cmsghdr *cmsg = received > 0 ? CMSG_FIRSTHDR(&hdr) : NULL;
    if(cmsg != NULL && cmsg->cmsg_level == SOL_SOCKET &&
       cmsg->cmsg_type == SCM_RIGHTS && cmsg->cmsg_len == CMSG_LEN(sizeof(int))){
        int fd = *(int *)CMSG_DATA(cmsg);
        printf("buf = %s, fdtorecv = %d\n", buf, fd);
        *pfdtorecv = fd;
        result = 0;
    }
    free(ctrl);
    return result;
}
/*
 * Demo: the parent opens "file1", writes "hello" and hands the descriptor to
 * the child over a socketpair; the child then writes "world" through the
 * descriptor it received.
 */
int main(int argc, char *argv[])
{
    int sv[2];
    socketpair(AF_LOCAL,SOCK_STREAM,0,sv);
    pid_t child = fork();
    if(child == 0){
        // child: keeps sv[0]
        close(sv[1]);
        int received;
        recvfd(sv[0],&received);
        printf("child fd = %d\n", received);
        write(received,"world",5);
        return 0;
    }
    // parent: keeps sv[1]
    close(sv[0]);
    int fd = open("file1",O_RDWR);
    printf("parent fd = %d\n", fd);
    write(fd,"hello",5);
    sendfd(sv[1],fd);
    wait(NULL);
    return 0;
}

sendmsg 函数使用流程 构建 msghdr 结构体: msg_name与msg_namelen:设为NULL和0。 msg_iov和msg_iovlen:这是iovec数组,用于存放分散的内存缓冲区。 msg_control和msg_controllen:用来存放文件描述符。 msg_flags:sendmsg会忽略该字段,设为0。 设置 iovec 数组: 把需要发送的数据存放在iovec数组里。 设置control数据: 发送文件描述符,就需要构建 cmsghdr 结构体。 调用 sendmsg 函数。

recvmsg 函数使用流程 构建 msghdr 结构体: msg_name和msg_namelen:设为NULL和0。 msg_iov和msg_iovlen:这是iovec数组,用于存放接收数据的缓冲区。 msg_control和msg_controllen:用于接收文件描述符(文件对象)。 msg_flags:由recvmsg在返回时填写,调用前设为0即可。 设置 iovec 数组: 提前分配好接收数据的缓冲区。 设置control缓冲区: 要确保缓冲区足够大,能够容纳可能接收到的辅助数据。 调用 recvmsg 函数,再从 cmsghdr 中取出文件描述符。

进程池

目录结构

image-20250604140848909

客户端

client.c

#include "head.h"
// v 1.0
// Length-prefixed frame: an int length followed by a byte buffer.
// NOTE(review): not referenced in the client.c code shown here; it looks
// like a copy of the frame type used by the transfer code — confirm.
typedef struct train_s{
    int len;        // presumably the number of valid bytes in buf
    char buf[1000]; // payload storage
}train_t;

/*
 * File-download client: connects to the server given on the command line
 * and hands the connected socket to recvFile().
 */
int main(int argc, char* argv[])
{
    //./client 192.168.157.128 2345
    ARGS_CHECK(argc,3);

    // server address taken from argv
    struct sockaddr_in server;
    memset(&server,0,sizeof(server));
    server.sin_family=AF_INET;
    server.sin_addr.s_addr=inet_addr(argv[1]);
    server.sin_port=htons(atoi(argv[2]));

    int sockfd = socket(AF_INET,SOCK_STREAM,0);
    int ret = connect(sockfd,(struct sockaddr*)&server,sizeof(server));
    ERROR_CHECK(ret,-1,"connect");

    recvFile(sockfd);
    return 0;
}

head.h

#ifndef HEAD_H
#define HEAD_H
#include <my_header.h>
// Worker state
enum{
    FREE,
    BUSY
};
// Bookkeeping record describing one worker process
typedef struct workerdata_s{
    pid_t pid;
    int statu;// worker state (FREE or BUSY)
    int sockfd;// channel between the worker process and the master process
}workerdata_t;

// Prototypes shared by the project. Only recvFile() is called by the client
// code shown next to this header; the other definitions are not part of it.
int makeWorker(int num,workerdata_t* workerArr);
int tcpInit(const char* ip,const char* port);
int epollAdd(int epfd,int fd);
int epollDel(int epfd,int fd);
int sendfd(int sockfd, int fdtosend);
int recvfd(int sockfd, int *pfdtorecv);
int transFile(const char* filename,int netfd);
int recvFile(int sockfd);
#endif

recvfile.c

#include "head.h"
// Length-prefixed frame used for the filename: an int length followed by
// that many bytes of payload.
typedef struct train_s{
    int len;            // number of valid bytes in buf
    char buf[1000];     // payload storage
}train_t;
// v 2.0
/*
 * Receive one file from sockfd and store it under the name sent by the peer.
 *
 * Wire format: int name length, name bytes, off_t file size, file bytes.
 * The content is received straight into a shared mapping of the new file.
 *
 * Returns 0 on success, -1 on any failure (a diagnostic goes to stderr).
 */
int recvFile(int sockfd){
    enum { MAX_NAME_LEN = 1000 }; // payload capacity of the sender's frame

    // recv filename; MSG_WAITALL so a header split across segments is not
    // mistaken for a complete one
    int nameLen = 0;
    ssize_t got = recv(sockfd,&nameLen,sizeof(nameLen),MSG_WAITALL);
    if(got != (ssize_t)sizeof(nameLen)){
        fprintf(stderr,"recvFile: cannot read filename length\n");
        return -1;
    }
    printf("recv filename len =%d\n",nameLen);
    // the length comes from the network: never let it exceed the buffer
    if(nameLen <= 0 || nameLen > MAX_NAME_LEN){
        fprintf(stderr,"recvFile: invalid filename length\n");
        return -1;
    }
    char filename[1024] = {0};
    got = recv(sockfd,filename,nameLen,MSG_WAITALL);
    if(got != nameLen){
        fprintf(stderr,"recvFile: cannot read filename\n");
        return -1;
    }
    // NOTE(review): the name is used as a path exactly as the peer sent it.
    int fd = open(filename,O_RDWR|O_CREAT|O_TRUNC,0666);
    if(fd == -1){
        perror("open");
        return -1;
    }
    printf("recv filename is %s\n",filename);

    // recv fileSize
    off_t fileSize = 0;
    got = recv(sockfd,&fileSize,sizeof(off_t),MSG_WAITALL);
    if(got != (ssize_t)sizeof(off_t) || fileSize < 0){
        fprintf(stderr,"recv 1: cannot read a valid file size\n");
        close(fd);
        return -1;
    }
    printf("fileSize is %ld\n",(long)fileSize);
    if(fileSize == 0){
        // nothing to map: mmap() rejects a zero length
        close(fd);
        return 0;
    }

    // recv file content: grow the file first so the mapping is backed
    if(ftruncate(fd,fileSize) == -1){
        perror("truncate");
        close(fd);
        return -1;
    }
    char* pfile =(char*)mmap(NULL,fileSize,PROT_READ|PROT_WRITE,MAP_SHARED,fd,0);
    if(pfile == MAP_FAILED){
        perror("mmap");
        close(fd);
        return -1;
    }
    got = recv(sockfd,pfile,fileSize,MSG_WAITALL);
    int result = (got == (ssize_t)fileSize) ? 0 : -1;
    if(result == -1){
        fprintf(stderr,"recv 2: file content is incomplete\n");
    }
    munmap(pfile,fileSize);
    close(fd);
    return result;
}

makefile

# Link the client from its two object files (debug info, all warnings).
client:client.o recvfile.o
	gcc client.o recvfile.o -o client -g -Wall
# Each source file is compiled separately so only changed files are rebuilt.
client.o:client.c
	gcc -c client.c -o client.o -g -Wall
recvfile.o:recvfile.c
	gcc -c recvfile.c -o recvfile.o -g -Wall

服务端

main.c

#include "head.h"
int exitFds[2];
/*
 * SIGUSR1 handler: wakes the epoll loop by writing one byte to the exit pipe.
 * Only write() is called here because stdio functions such as printf() are
 * not async-signal-safe; the main loop does the reporting.
 */
void handler(int num){
    (void)num;
    ssize_t ret = write(exitFds[1],"1",1);
    (void)ret; // nothing safe can be done about a failure inside a handler
}
/*
 * Process-pool server.
 *
 * Usage: ./server <ip> <port> <workerNum>
 *
 * The master forks workerNum workers, then multiplexes with epoll over
 *   - the listening socket: accepted connections are handed to a FREE worker,
 *   - each worker channel:  a worker reports that it is FREE again,
 *   - the exit pipe:        SIGUSR1 arrived, tell every worker to quit.
 */
int main(int argc, char* argv[])
{
    // ./server 192.168.157.128 2345 5
    ARGS_CHECK(argc,4);
    // create the worker processes; the count comes from the command line
    int workerNum = atoi(argv[3]);
    if(workerNum <= 0){
        fprintf(stderr,"worker number must be a positive integer\n");
        return -1;
    }
    workerdata_t* workerArr=(workerdata_t*)calloc(workerNum,sizeof(workerdata_t));
    if(workerArr == NULL){
        fprintf(stderr,"calloc workerArr failed\n");
        return -1;
    }
    makeWorker(workerNum,workerArr);
    // set up tcp
    int sockfd = tcpInit(argv[1],argv[2]);
    ERROR_CHECK(sockfd,-1,"tcpInit");
    // set up epoll
    int epfd = epoll_create(1);
    ERROR_CHECK(epfd,-1,"epoll_create");
    epollAdd(epfd,sockfd);
    for(int i=0;i<workerNum;++i){
        epollAdd(epfd,workerArr[i].sockfd);
    }
    struct epoll_event readySet[1024];
    // graceful exit: the signal handler writes to this pipe
    int retPipe = pipe(exitFds);
    ERROR_CHECK(retPipe,-1,"pipe");
    printf("exitFds[0]=%d,exitFds[1]=%d\n",exitFds[0],exitFds[1]);
    signal(SIGUSR1,handler);
    epollAdd(epfd,exitFds[0]);
    while(1){
        // a signal makes epoll_wait return -1; the loop below is then skipped
        int readyNum = epoll_wait(epfd,readySet,1024,-1);
        for(int i=0;i<readyNum;++i){
            if(readySet[i].data.fd==exitFds[0]){
                // tell every worker to exit
                printf("exit begin.\n");
                for(int j=0;j<workerNum;++j){
                    // SCM_RIGHTS needs a descriptor that is really open, so
                    // pass the listening socket instead of a magic number;
                    // the worker ignores it when the exit flag is set.
                    sendfd(workerArr[j].sockfd,sockfd,1);
                }
                for(int j=0;j<workerNum;++j){
                    wait(NULL);
                }
                printf("all worker ended.\n");
                exit(0);
            }
            else if(readySet[i].data.fd == sockfd){
                int netfd = accept(sockfd,NULL,NULL);
                if(netfd == -1){
                    continue;
                }
                printf("a new connect netfd=%d\n",netfd);
                // look for a free worker; if none is free the connection is
                // simply closed below
                for(int j=0;j<workerNum;++j){
                    if(workerArr[j].statu==FREE){
                        sendfd(workerArr[j].sockfd,netfd,0);
                        workerArr[j].statu=BUSY;
                        printf("send netfd=%d to worker%d\n",netfd,j);
                        break;
                    }
                }
                // the worker holds its own descriptor for the same file object
                close(netfd);
            }
            else{
                pid_t pid;
                int workerfd = readySet[i].data.fd;
                for(int j=0;j<workerNum;++j){
                    if(workerArr[j].sockfd!=workerfd){
                        continue;
                    }
                    ssize_t got = read(workerfd,&pid,sizeof(pid));
                    if(got != (ssize_t)sizeof(pid)){
                        // the worker is gone: stop watching its channel and
                        // never schedule it again
                        epollDel(epfd,workerfd);
                        close(workerfd);
                        workerArr[j].sockfd=-1;
                        workerArr[j].statu=BUSY;
                        break;
                    }
                    workerArr[j].statu=FREE;
                    printf("worker%d pid=%d free again\n",j,pid);
                    break;
                }
            }
        }

    }
    return 0;
}

head.h

#ifndef HEAD_H
#define HEAD_H
#include <my_header.h>
// Worker state
enum{
    FREE,
    BUSY
};
// Bookkeeping record the master keeps for one worker process
typedef struct workerdata_s{
    pid_t pid;
    int statu;// worker state (FREE or BUSY)
    int sockfd;// channel between the worker process and the master process
}workerdata_t;

// Fork num workers and fill workerArr with one record per worker.
int makeWorker(int num,workerdata_t* workerArr);
// Create a listening TCP socket bound to ip:port and return it.
int tcpInit(const char* ip,const char* port);
// Register / unregister fd with the epoll instance epfd.
int epollAdd(int epfd,int fd);
int epollDel(int epfd,int fd);
// Receive / send a descriptor plus an int exit flag over a local socket.
int recvfd(int sockfd, int *pfdtorecv,int* pexitFlag);
int sendfd(int sockfd, int fdtosend,int exitFlag);
// Send the file filename to the connected socket netfd.
int transFile(const char* filename,int netfd);
// NOTE(review): declared here but not defined in the server sources shown.
int recvFile(int sockfd);
#endif

worker.c

#include "head.h"
// v 1.0
// Length-prefixed frame: an int length followed by a byte buffer.
// NOTE(review): not referenced by the worker.c code shown here — confirm
// whether it is still needed in this file.
typedef struct train_s{
    int len;        // presumably the number of valid bytes in buf
    char buf[1000]; // payload storage
}train_t;
/*
 * Fork num worker processes and record them in workerArr.
 *
 * Each worker is connected to the master by its own socketpair. A worker
 * loops forever: it receives a client descriptor (or an exit flag) from the
 * master, sends "file1" to that client, then reports its pid back so the
 * master can mark it FREE again.
 *
 * Returns 0 in the master once all workers exist; workers never return.
 */
int makeWorker(int num,workerdata_t* workerArr){
    for(int i=0;i<num;++i){
        // one socket channel per worker
        int fds[2];
        int ret = socketpair(AF_LOCAL,SOCK_STREAM,0,fds);
        ERROR_CHECK(ret,-1,"socketpair");
        pid_t pid = fork();
        ERROR_CHECK(pid,-1,"fork");
        if(pid == 0){
            close(fds[0]);
            // drop the master-side ends of earlier workers that were
            // inherited through fork
            for(int k=0;k<i;++k){
                close(workerArr[k].sockfd);
            }
            while(1){
                int netfd = -1;
                int exitFlag=0;
                ret = recvfd(fds[1],&netfd,&exitFlag);
                if(exitFlag==1){
                    printf("worker going to end.\n");
                    exit(0);
                }
                if(ret != 0){
                    // nothing usable arrived from the master; stop instead
                    // of serving an invalid descriptor
                    exit(1);
                }
                printf("worker recv netfd=%d\n",netfd);
                // business logic: transfer file1
                char path[]="file1";
                int fd = open(path,O_RDONLY);
                ERROR_CHECK(fd,-1,"worker open");
                // the open only verifies the file is readable; transFile
                // opens it again, so release this descriptor right away
                close(fd);
                transFile(path,netfd);
                // task finished
                printf("work over.\n");
                pid = getpid();
                write(fds[1],&pid,sizeof(pid));
                close(netfd);
            }

        }
        close(fds[1]);
        workerArr[i].sockfd = fds[0];
        workerArr[i].pid = pid;
        workerArr[i].statu = FREE;
        printf("worker sockfd = %d\n",fds[0]);
    }
    return 0;
}

epoll.c

#include "head.h"
/*
 * Register fd with the epoll instance epfd for readability (EPOLLIN).
 * Returns the result of epoll_ctl(): 0 on success, -1 on failure.
 */
int epollAdd(int epfd,int fd){
    struct epoll_event event = {0}; // no indeterminate bytes reach the kernel
    event.events = EPOLLIN;
    event.data.fd = fd;
    return epoll_ctl(epfd,EPOLL_CTL_ADD,fd,&event);
}
/*
 * Remove fd from the epoll instance epfd.
 * Returns the result of epoll_ctl(): 0 on success, -1 on failure.
 */
int epollDel(int epfd,int fd){
    return epoll_ctl(epfd,EPOLL_CTL_DEL,fd,NULL);
}

tcpinit.c

#include "head.h"
/*
 * Create a TCP socket, enable SO_REUSEADDR, bind it to ip:port and start
 * listening with a backlog of 50.
 *
 * ip is a dotted-decimal IPv4 string, port a decimal port number.
 * Returns the listening descriptor; every failing step is reported through
 * ERROR_CHECK (defined in my_header.h, not shown here).
 */
int tcpInit(const char* ip,const char* port){
    struct sockaddr_in addr;
    bzero(&addr,sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = inet_addr(ip);
    addr.sin_port = htons(atoi(port));
    int sockfd = socket(AF_INET,SOCK_STREAM,0);
    ERROR_CHECK(sockfd,-1,"socket");
    int flag =1; // allow the address to be reused right after a restart
    int ret = setsockopt(sockfd,SOL_SOCKET,SO_REUSEADDR,&flag,sizeof(flag));
    ERROR_CHECK(ret,-1,"setsockopt");
    ret = bind(sockfd,(struct sockaddr*)&addr,sizeof(addr));
    ERROR_CHECK(ret ,-1,"bind");
    ret = listen(sockfd,50);
    ERROR_CHECK(ret,-1,"listen");
    return sockfd;
}

sendmsg_recvmsg.c

#include "head.h"
/*
 * Send exitFlag as the regular payload and pass the open file behind
 * fdtosend as an SCM_RIGHTS control message over the local socket sockfd.
 *
 * Returns 0 on success, -1 if the control buffer cannot be allocated or
 * sendmsg() fails (for instance when fdtosend is not an open descriptor).
 */
int sendfd(int sockfd, int fdtosend,int exitFlag){
    struct msghdr hdr = {0};
    // regular payload: the exit flag
    struct iovec vec[1];
    vec[0].iov_base = &exitFlag;
    vec[0].iov_len = sizeof(int);
    hdr.msg_iov = vec;
    hdr.msg_iovlen = 1;
    // control message carrying the descriptor
    size_t ctrlLen = CMSG_LEN(sizeof(int));
    struct cmsghdr *cmsg = (struct cmsghdr *)calloc(1,ctrlLen);
    if(cmsg == NULL){
        return -1;
    }
    cmsg->cmsg_len = ctrlLen;
    cmsg->cmsg_level = SOL_SOCKET;
    cmsg->cmsg_type = SCM_RIGHTS;// the data is a file descriptor to pass
    *(int *)CMSG_DATA(cmsg) = fdtosend;

    hdr.msg_control = cmsg;
    hdr.msg_controllen = ctrlLen;
    ssize_t sent = sendmsg(sockfd,&hdr,0);
    free(cmsg); // the buffer is only needed during the call
    return sent == -1 ? -1 : 0;
}
/*
 * Receive one message from the local socket sockfd: the int payload is
 * stored in *pexitFlag and the descriptor passed via SCM_RIGHTS in
 * *pfdtorecv.
 *
 * Returns 0 when a descriptor was received. Returns -1 and sets *pfdtorecv
 * to -1 when allocation fails, recvmsg() fails or reports end-of-file, or
 * the message carries no descriptor; *pexitFlag still holds whatever
 * payload arrived.
 */
int recvfd(int sockfd, int *pfdtorecv,int* pexitFlag){
    struct msghdr hdr = {0};
    // regular payload: the exit flag
    struct iovec vec[1];
    vec[0].iov_base = pexitFlag;
    vec[0].iov_len = sizeof(int);
    hdr.msg_iov = vec;
    hdr.msg_iovlen = 1;
    // room for one control message holding one int
    size_t ctrlLen = CMSG_LEN(sizeof(int));
    struct cmsghdr *ctrl = (struct cmsghdr *)calloc(1,ctrlLen);
    if(ctrl == NULL){
        *pfdtorecv = -1;
        return -1;
    }
    hdr.msg_control = ctrl;
    hdr.msg_controllen = ctrlLen;
    ssize_t received = recvmsg(sockfd,&hdr,0);
    int result = -1;
    *pfdtorecv = -1;
    // Only trust the control data if the kernel actually delivered it.
    struct cmsghdr *cmsg = received > 0 ? CMSG_FIRSTHDR(&hdr) : NULL;
    if(cmsg != NULL && cmsg->cmsg_level == SOL_SOCKET &&
       cmsg->cmsg_type == SCM_RIGHTS && cmsg->cmsg_len == CMSG_LEN(sizeof(int))){
        *pfdtorecv = *(int*)CMSG_DATA(cmsg);
        result = 0;
    }
    free(ctrl);
    return result;
}

transfile.c

#include "head.h"
// Length-prefixed frame used to send the filename: the sender transmits the
// int len followed by the first len bytes of buf.
typedef struct train_s{
    int len;            // number of valid bytes in buf
    char buf[1000];     // payload storage
}train_t;
// v 1.0
/* int transFile(const char* filename,int netfd){ */
/*     // send filename */
/*     train_t train; */
/*     train.len = strlen(filename); */
/*     memcpy(train.buf,filename,train.len); */
/*     /1* printf("send filename before\n"); *1/ */
/*     int ret = send(netfd,&train,sizeof(train.len)+train.len,MSG_NOSIGNAL); */
/*     ERROR_CHECK(ret,-1,"send"); */
/*     /1* printf("send filename after.\n"); *1/ */
/*     int fd = open("file1",O_RDONLY); */
/*     /1* struct stat stat; *1/ */
/*     /1* fstat(fd,&stat); *1/ */
/*     char buf[1024]={0}; */
/*     while(ret!=0){ */
/*         bzero(buf,sizeof(buf)); */
/*         int retRead  = read(fd,buf,1000); */
/*         ret = send(netfd,buf,retRead,MSG_NOSIGNAL); */
/*         if(ret == -1){ */
/*             break; */
/*         } */
/*     } */
/*     return 0; */
/* } */

// v 2.0 mmap
/* Send exactly len bytes of data over netfd, retrying after partial sends.
 * MSG_NOSIGNAL keeps a closed peer from raising SIGPIPE.
 * Returns 0 on success, -1 on error. */
static int sendAll(int netfd,const void* data,size_t len){
    const char* cursor = (const char*)data;
    while(len > 0){
        ssize_t sent = send(netfd,cursor,len,MSG_NOSIGNAL);
        if(sent == -1){
            return -1;
        }
        cursor += sent;
        len -= (size_t)sent;
    }
    return 0;
}

/*
 * Send the file named filename to the connected socket netfd.
 *
 * Wire format: int name length, name bytes, off_t file size, file bytes.
 * The file is opened and measured before anything is sent, so a missing
 * file leaves the connection untouched. The content is sent from a
 * read-only mapping of the file.
 *
 * Returns 0 on success, -1 on failure (a diagnostic goes to stderr).
 */
int transFile(const char* filename,int netfd){
    size_t nameLen = strlen(filename);
    // the receiver stores the name in a 1000-byte frame buffer
    if(nameLen == 0 || nameLen > 1000){
        fprintf(stderr,"transFile: invalid filename length\n");
        return -1;
    }
    // open the file that was asked for, not a hard-coded name
    int fd = open(filename,O_RDONLY);
    if(fd == -1){
        perror("open");
        return -1;
    }
    struct stat st;
    if(fstat(fd,&st) == -1){
        perror("fstat");
        close(fd);
        return -1;
    }
    off_t fileSize = st.st_size;
    int result = -1;

    // send filename
    int len = (int)nameLen;
    if(sendAll(netfd,&len,sizeof(len)) == -1 || sendAll(netfd,filename,nameLen) == -1){
        perror("send");
        goto out;
    }
    // send fileSize
    if(sendAll(netfd,&fileSize,sizeof(off_t)) == -1){
        perror("send3");
        goto out;
    }
    // send file content; an empty file has nothing to map
    if(fileSize > 0){
        char* filep =(char*)mmap(NULL,fileSize,PROT_READ,MAP_SHARED,fd,0);
        if(filep == MAP_FAILED){
            perror("mmap");
            goto out;
        }
        int sent = sendAll(netfd,filep,(size_t)fileSize);
        if(sent == -1){
            perror("trainfile send2");
        }
        munmap(filep,fileSize);
        if(sent == -1){
            goto out;
        }
    }
    result = 0;
out:
    close(fd);
    return result;
}

makefile

# Link the process-pool server from its object files (debug info, all warnings).
server: main.o worker.o tcpinit.o epoll.o sendmsg_recvmsg.o transfile.o
	gcc  main.o worker.o tcpinit.o epoll.o sendmsg_recvmsg.o transfile.o -o server -g -Wall
# One rule per source file so only changed files are recompiled.
main.o: main.c
	gcc -c main.c -o main.o -g -Wall
worker.o: worker.c
	gcc -c worker.c -o worker.o -g -Wall
tcpinit.o: tcpinit.c
	gcc -c tcpinit.c -o tcpinit.o -g -Wall
epoll.o: epoll.c
	gcc -c epoll.c -o epoll.o -g -Wall
sendmsg_recvmsg.o: sendmsg_recvmsg.c
	gcc -c sendmsg_recvmsg.c -o sendmsg_recvmsg.o -g -Wall
transfile.o:transfile.c
	gcc -c transfile.c -o transfile.o -g -Wall
# Stand-alone scratch program; test.c is not part of the listing above.
test:test.c
	gcc test.c -o test -g -Wall

结果

Snipaste_2025-06-04_14-25-12

线程池

目录结构

image-20250605202204080

服务端

main.c

#include "pthreadPool.h"
int pipeFds[2];
/*
 * SIGUSR1 handler of the parent process: closes the parent's read end of the
 * pipe, prints a notice and writes one byte to tell the child to shut down.
 * Only close()/write() are used because stdio functions such as printf()
 * are not async-signal-safe.
 */
void handler(int num){
    (void)num;
    static const char notice[] = "waitting to end.\n";
    close(pipeFds[0]);
    ssize_t ret = write(STDOUT_FILENO,notice,sizeof(notice)-1);
    ret = write(pipeFds[1],"1",1);
    (void)ret; // nothing safe can be done about a failure inside a handler
}
/*
 * Thread-pool server.
 *
 * Usage: ./server <ip> <port> <workerNum>
 *
 * The parent process only waits for SIGUSR1 and forwards it through a pipe.
 * The child owns the thread pool: it accepts connections, queues them for
 * the worker threads, and shuts the pool down when the pipe becomes readable.
 */
int main(int argc, char* argv[])
{
    // ./server 192.168.157.128 12345 5
    ARGS_CHECK(argc,4);
    // pipe between parent and child
    int retPipe = pipe(pipeFds);
    ERROR_CHECK(retPipe,-1,"pipe");
    if(fork()){
        signal(SIGUSR1,handler);
        wait(NULL);
        printf("server end.\n");
        exit(0);
    }else{
        // create pthread pool
        int workerNum = atoi(argv[3]);
        if(workerNum <= 0){
            fprintf(stderr,"worker number must be a positive integer\n");
            exit(1);
        }
        pthreadPool_t pthreadPool;
        pthreadPoolInit(&pthreadPool,workerNum);
        workerCreate(&pthreadPool,workerNum);
        // create tcp
        int sockfd = tcpInit(argv[1],argv[2]);
        // epoll_wait
        int epfd = epoll_create(1);
        epollAdd(epfd,sockfd);
        close(pipeFds[1]); // the child only reads from the pipe
        epollAdd(epfd,pipeFds[0]);
        struct epoll_event readySet[1024];
        bzero(readySet,sizeof(readySet));

        while(1){
            int readyNum = epoll_wait(epfd,readySet,1024,-1);
            for(int i=0;i<readyNum;++i){
                if(readySet[i].data.fd == sockfd){
                    printf("a client connect\n");
                    int netfd = accept(sockfd,NULL,NULL);
                    if(netfd == -1){
                        // never queue an invalid descriptor for the workers
                        continue;
                    }
                    pthread_mutex_lock(&pthreadPool.mutex);
                    enQueue(&pthreadPool.netfdQueue,netfd);
                    pthread_cond_broadcast(&pthreadPool.cond);
                    pthread_mutex_unlock(&pthreadPool.mutex);
                    printf("add netfd\n");
                }else if(readySet[i].data.fd == pipeFds[0]){
                    printf("child end begin.\n");
                    // raise the flag under the lock and wake every worker
                    pthread_mutex_lock(&pthreadPool.mutex);
                    pthreadPool.endFlag=1;
                    pthread_cond_broadcast(&pthreadPool.cond);
                    pthread_mutex_unlock(&pthreadPool.mutex);
                    for(int j =0;j<workerNum;++j){
                        pthread_join(pthreadPool.tidArr.TidArr[j],NULL);
                        printf("one worker pthread end.\n");
                    }
                    printf("child end.\n");
                    exit(0);
                }
            }
        }
    }
    return 0;
}

worker.c

#include "pthreadPool.h"
void* workerFuc(void* arg){
    pthreadPool_t* pthreadPool = (pthreadPool_t*)arg;
    int netfd;
    printf("worker create suceessfully.\n");
    while(1){
        pthread_mutex_lock(&pthreadPool->mutex);
        while(pthreadPool->netfdQueue.size==0 && pthreadPool->endFlag==0){
            pthread_cond_wait(&pthreadPool->cond,&pthreadPool->mutex);    
        }
        if(pthreadPool->endFlag==1){
            pthread_mutex_unlock(&pthreadPool->mutex);
            pthread_exit(NULL);
        }
        netfd = pthreadPool->netfdQueue.head->netfd;
        deQueue(&pthreadPool->netfdQueue);
        pthread_mutex_unlock(&pthreadPool->mutex);
        printf("worker pthread get netfd = %d\n",netfd);
        
        char filename[]="file1";
        transFile(filename,netfd);

        close(netfd);
    }

}
/*
 * Start workerNum worker threads running workerFuc and store their ids in
 * pthreadPool->tidArr.TidArr.
 *
 * Returns 0 when every thread was created. On the first failure the error
 * is reported on stderr and -1 is returned; threads created before that
 * keep running.
 */
int workerCreate(pthreadPool_t* pthreadPool,int workerNum){
    for(int i=0;i<workerNum;++i){
        int ret = pthread_create(&pthreadPool->tidArr.TidArr[i],NULL,workerFuc,pthreadPool);
        // pthread_create returns the error number directly (0 on success)
        if(ret != 0){
            fprintf(stderr,"pthread_create: %s\n",strerror(ret));
            return -1;
        }
    }
    return 0;
}

pthreadPool.h

#ifndef __PTHREAD_POOL__
#define __PTHREAD_POOL__
#include <my_header.h>
#include "tidArr.h"
#include "taskQueue.h"
// Shared state of the thread pool.
typedef struct pthreadPool_s{
    taskQueue_t netfdQueue; // queue of accepted client descriptors
    tidArr_t tidArr;        // ids of the worker threads
    pthread_mutex_t mutex;  // held while netfdQueue or endFlag is accessed
    pthread_cond_t cond;    // signalled when a task is queued or on shutdown
    int endFlag;            // set to 1 to ask the workers to exit
}pthreadPool_t;
// Zero the pool, allocate the tid array and initialise mutex and condition.
int pthreadPoolInit(pthreadPool_t* pthreadPool,int workerNum);
// Start workerNum worker threads.
int workerCreate(pthreadPool_t* pthreadPool,int workerNum);

// Create a listening TCP socket bound to ip:port and return it.
int tcpInit(const char* ip,const char* port);
// Register / unregister fd with the epoll instance epfd.
int epollAdd(int epfd,int fd);
int epollDel(int epfd,int fd);
// Send the file filename to the connected socket netfd.
int transFile(const char* filename,int netfd);

#endif

pthreadPool.c

#include "pthreadPool.h"
/*
 * Initialise a thread pool: zero the whole structure (empty queue,
 * endFlag = 0), allocate room for workerNum thread ids and set up the mutex
 * and the condition variable.
 *
 * Returns 0 on success, -1 if the thread-id array cannot be allocated.
 */
int pthreadPoolInit(pthreadPool_t* pthreadPool,int workerNum){
    bzero(pthreadPool,sizeof(pthreadPool_t));
    pthreadPool->tidArr.TidArr = (pthread_t*)calloc(workerNum,sizeof(pthread_t));
    if(pthreadPool->tidArr.TidArr == NULL){
        // without the array workerCreate would write through a NULL pointer
        fprintf(stderr,"pthreadPoolInit: calloc failed\n");
        return -1;
    }
    pthreadPool->tidArr.workerNum=workerNum;
    pthread_mutex_init(&pthreadPool->mutex,NULL);
    pthread_cond_init(&pthreadPool->cond,NULL);
    printf("pthreadPool init over.\n");
    return 0;
}

tidArr.h

#ifndef __TID_ARR__
#define __TID_ARR__
#include <my_header.h>
// Dynamically sized array of worker thread ids.
typedef struct tidArr_s{
    int workerNum;// number of worker threads
    // pthread_t workerTidArr[workerNum];
    pthread_t* TidArr; // heap array holding the workers' thread ids
}tidArr_t;
// Zero the structure (no array is allocated by this call).
int TidArrInit(tidArr_t* workerTidArr);
#endif

tidArr.c

#include "tidArr.h"
/* Reset a tidArr_t to all zero bytes and log the call. Always returns 0. */
int TidArrInit(tidArr_t* workerTidArr){
    memset(workerTidArr,0,sizeof(*workerTidArr));
    printf("tidArr.c: tidArrInit\n");
    return 0;
}

taskQueue.h

#ifndef __TASK_QUEUE__
#define __TASK_QUEUE__
#include <my_header.h>
// One queued task: a client descriptor plus the link to the next node.
typedef struct queueNode_s{
    int netfd;
    struct queueNode_s* next;
}queueNode_t;
// Singly linked FIFO: nodes are appended at tail and removed from head.
typedef struct taskQueue_s{
    queueNode_t* head;
    queueNode_t* tail;
    int size; // number of nodes currently queued
}taskQueue_t;
// Reset the queue to the empty state.
int taskQueueInit(taskQueue_t* queue);
// Append netfd at the tail.
int enQueue(taskQueue_t* queue,int netfd);
// Remove and free the head node.
int deQueue(taskQueue_t* queue);
// Print the queued descriptors from head to tail.
int printQueue(const taskQueue_t* queue);
#endif

taskQueue.c

#include "taskQueue.h"
/* Put the queue into the empty state by zeroing it. Always returns 0. */
int taskQueueInit(taskQueue_t* queue){
    memset(queue,0,sizeof(*queue));
    return 0;
}
/*
 * Append netfd at the tail of the queue.
 * Returns 0 on success, -1 if no node could be allocated (the queue is left
 * unchanged in that case).
 */
int enQueue(taskQueue_t* queue,int netfd){
    queueNode_t* newNode = (queueNode_t*)calloc(1,sizeof(queueNode_t));
    if(newNode == NULL){
        return -1;
    }
    newNode->netfd=netfd;
    if(queue->size==0){
        // first node: it is both head and tail
        queue->head=newNode;
        queue->tail=newNode;
    }else{
        queue->tail->next=newNode;
        queue->tail=newNode;
    }
    ++queue->size;
    return 0;
}
/*
 * Remove and free the node at the head of the queue.
 * Returns 0 on success, -1 when the queue is empty (nothing is changed).
 */
int deQueue(taskQueue_t* queue){
    if(queue->size==0 || queue->head==NULL){
        return -1;
    }
    queueNode_t* tempNode = queue->head;
    queue->head=queue->head->next;
    --queue->size;
    if(queue->size==0){
        // the removed node was also the tail
        queue->tail=NULL;
    }
    free(tempNode);
    return 0;
}

/* Print every queued descriptor from head to tail as "fd->fd->...null".
 * Always returns 0. */
int printQueue(const taskQueue_t* queue){
    for(const queueNode_t* node=queue->head; node!=NULL; node=node->next){
        printf("%d->",node->netfd);
    }
    printf("null\n");
    return 0;
}

tcpinit.c

#include <my_header.h>
/*
 * Create a TCP socket, enable SO_REUSEADDR, bind it to ip:port and start
 * listening with a backlog of 50.
 *
 * ip is a dotted-decimal IPv4 string, port a decimal port number.
 * Returns the listening descriptor; every failing step is reported through
 * ERROR_CHECK (defined in my_header.h, not shown here).
 */
int tcpInit(const char* ip,const char* port){
    struct sockaddr_in addr;
    bzero(&addr,sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = inet_addr(ip);
    addr.sin_port = htons(atoi(port));
    int sockfd = socket(AF_INET,SOCK_STREAM,0);
    ERROR_CHECK(sockfd,-1,"socket");
    int flag =1; // allow the address to be reused right after a restart
    int ret = setsockopt(sockfd,SOL_SOCKET,SO_REUSEADDR,&flag,sizeof(flag));
    ERROR_CHECK(ret,-1,"setsockopt");
    ret = bind(sockfd,(struct sockaddr*)&addr,sizeof(addr));
    ERROR_CHECK(ret ,-1,"bind");
    ret = listen(sockfd,50);
    ERROR_CHECK(ret,-1,"listen");
    return sockfd;
}

epoll.c

#include <my_header.h>
/*
 * Register fd with the epoll instance epfd for readability (EPOLLIN).
 * Returns the result of epoll_ctl(): 0 on success, -1 on failure.
 */
int epollAdd(int epfd,int fd){
    struct epoll_event event = {0}; // no indeterminate bytes reach the kernel
    event.events = EPOLLIN;
    event.data.fd = fd;
    return epoll_ctl(epfd,EPOLL_CTL_ADD,fd,&event);
}
/*
 * Remove fd from the epoll instance epfd.
 * Returns the result of epoll_ctl(): 0 on success, -1 on failure.
 */
int epollDel(int epfd,int fd){
    return epoll_ctl(epfd,EPOLL_CTL_DEL,fd,NULL);
}

transfile.c

#include <my_header.h>
// Length-prefixed frame used to send the filename: the sender transmits the
// int len followed by the first len bytes of buf.
typedef struct train_s{
    int len;            // number of valid bytes in buf
    char buf[1000];     // payload storage
}train_t;
// v 1.0
/* int transFile(const char* filename,int netfd){ */
/*     // send filename */
/*     train_t train; */
/*     train.len = strlen(filename); */
/*     memcpy(train.buf,filename,train.len); */
/*     /1* printf("send filename before\n"); *1/ */
/*     int ret = send(netfd,&train,sizeof(train.len)+train.len,MSG_NOSIGNAL); */
/*     ERROR_CHECK(ret,-1,"send"); */
/*     /1* printf("send filename after.\n"); *1/ */
/*     int fd = open("file1",O_RDONLY); */
/*     /1* struct stat stat; *1/ */
/*     /1* fstat(fd,&stat); *1/ */
/*     char buf[1024]={0}; */
/*     while(ret!=0){ */
/*         bzero(buf,sizeof(buf)); */
/*         int retRead  = read(fd,buf,1000); */
/*         ret = send(netfd,buf,retRead,MSG_NOSIGNAL); */
/*         if(ret == -1){ */
/*             break; */
/*         } */
/*     } */
/*     return 0; */
/* } */

// v 2.0 mmap
/* Send exactly len bytes of data over netfd, retrying after partial sends.
 * MSG_NOSIGNAL keeps a closed peer from raising SIGPIPE.
 * Returns 0 on success, -1 on error. */
static int sendAll(int netfd,const void* data,size_t len){
    const char* cursor = (const char*)data;
    while(len > 0){
        ssize_t sent = send(netfd,cursor,len,MSG_NOSIGNAL);
        if(sent == -1){
            return -1;
        }
        cursor += sent;
        len -= (size_t)sent;
    }
    return 0;
}

/*
 * Send the file named filename to the connected socket netfd.
 *
 * Wire format: int name length, name bytes, off_t file size, file bytes.
 * The file is opened and measured before anything is sent, so a missing
 * file leaves the connection untouched. The content is sent from a
 * read-only mapping of the file.
 *
 * Returns 0 on success, -1 on failure (a diagnostic goes to stderr).
 */
int transFile(const char* filename,int netfd){
    size_t nameLen = strlen(filename);
    // the receiver stores the name in a 1000-byte frame buffer
    if(nameLen == 0 || nameLen > 1000){
        fprintf(stderr,"transFile: invalid filename length\n");
        return -1;
    }
    // open the file that was asked for, not a hard-coded name
    int fd = open(filename,O_RDONLY);
    if(fd == -1){
        perror("open");
        return -1;
    }
    struct stat st;
    if(fstat(fd,&st) == -1){
        perror("fstat");
        close(fd);
        return -1;
    }
    off_t fileSize = st.st_size;
    int result = -1;

    // send filename
    int len = (int)nameLen;
    if(sendAll(netfd,&len,sizeof(len)) == -1 || sendAll(netfd,filename,nameLen) == -1){
        perror("send");
        goto out;
    }
    // send fileSize
    if(sendAll(netfd,&fileSize,sizeof(off_t)) == -1){
        perror("send3");
        goto out;
    }
    // send file content; an empty file has nothing to map
    if(fileSize > 0){
        char* filep =(char*)mmap(NULL,fileSize,PROT_READ,MAP_SHARED,fd,0);
        if(filep == MAP_FAILED){
            perror("mmap");
            goto out;
        }
        int sent = sendAll(netfd,filep,(size_t)fileSize);
        if(sent == -1){
            perror("trainfile send2");
        }
        munmap(filep,fileSize);
        if(sent == -1){
            goto out;
        }
    }
    result = 0;
out:
    close(fd);
    return result;
}

makefile

# Link the thread-pool server. Every prerequisite object (including tidArr.o)
# is linked, and -pthread selects the POSIX threads library.
server:main.o pthreadPool.o taskQueue.o tidArr.o worker.o tcpinit.o epoll.o transfile.o
	gcc main.o pthreadPool.o taskQueue.o tidArr.o worker.o tcpinit.o epoll.o transfile.o -o server -g -Wall -pthread
# One rule per source file so only changed files are recompiled.
main.o:main.c
	gcc -c main.c -o main.o -g -Wall
pthreadPool.o:pthreadPool.c
	gcc -c pthreadPool.c -o pthreadPool.o -g -Wall
taskQueue.o:taskQueue.c
	gcc -c taskQueue.c -o taskQueue.o -g -Wall
tidArr.o:tidArr.c
	gcc -c tidArr.c -o tidArr.o -g -Wall
worker.o:worker.c
	gcc -c worker.c -o worker.o -g -Wall
epoll.o:epoll.c
	gcc -c epoll.c -o epoll.o -g -Wall
transfile.o:transfile.c
	gcc -c transfile.c -o transfile.o -g -Wall
tcpinit.o:tcpinit.c
	gcc -c tcpinit.c -o tcpinit.o -g -Wall

客户端

client.c

#include <my_header.h>
// Length-prefixed frame used for the filename: an int length followed by
// that many bytes of payload.
typedef struct train_s{
    int len;        // number of valid bytes in buf
    char buf[1000]; // payload storage
}train_t;
// v 2.0
/*
 * Receive one file from sockfd and store it under the name sent by the peer.
 *
 * Wire format: int name length, name bytes, off_t file size, file bytes.
 * The content is received straight into a shared mapping of the new file.
 *
 * Returns 0 on success, -1 on any failure (a diagnostic goes to stderr).
 */
int recvFile(int sockfd){
    enum { MAX_NAME_LEN = 1000 }; // payload capacity of the sender's frame

    // recv filename; MSG_WAITALL so a header split across segments is not
    // mistaken for a complete one
    int nameLen = 0;
    ssize_t got = recv(sockfd,&nameLen,sizeof(nameLen),MSG_WAITALL);
    if(got != (ssize_t)sizeof(nameLen)){
        fprintf(stderr,"recvFile: cannot read filename length\n");
        return -1;
    }
    printf("recv filename len =%d\n",nameLen);
    // the length comes from the network: never let it exceed the buffer
    if(nameLen <= 0 || nameLen > MAX_NAME_LEN){
        fprintf(stderr,"recvFile: invalid filename length\n");
        return -1;
    }
    char filename[1024] = {0};
    got = recv(sockfd,filename,nameLen,MSG_WAITALL);
    if(got != nameLen){
        fprintf(stderr,"recvFile: cannot read filename\n");
        return -1;
    }
    // NOTE(review): the name is used as a path exactly as the peer sent it.
    int fd = open(filename,O_RDWR|O_CREAT|O_TRUNC,0666);
    if(fd == -1){
        perror("open");
        return -1;
    }
    printf("recv filename is %s\n",filename);

    // recv fileSize
    off_t fileSize = 0;
    got = recv(sockfd,&fileSize,sizeof(off_t),MSG_WAITALL);
    if(got != (ssize_t)sizeof(off_t) || fileSize < 0){
        fprintf(stderr,"recv 1: cannot read a valid file size\n");
        close(fd);
        return -1;
    }
    printf("fileSize is %ld\n",(long)fileSize);
    if(fileSize == 0){
        // nothing to map: mmap() rejects a zero length
        close(fd);
        return 0;
    }

    // recv file content: grow the file first so the mapping is backed
    if(ftruncate(fd,fileSize) == -1){
        perror("truncate");
        close(fd);
        return -1;
    }
    char* pfile =(char*)mmap(NULL,fileSize,PROT_READ|PROT_WRITE,MAP_SHARED,fd,0);
    if(pfile == MAP_FAILED){
        perror("mmap");
        close(fd);
        return -1;
    }
    got = recv(sockfd,pfile,fileSize,MSG_WAITALL);
    int result = (got == (ssize_t)fileSize) ? 0 : -1;
    if(result == -1){
        fprintf(stderr,"recv 2: file content is incomplete\n");
    }
    munmap(pfile,fileSize);
    close(fd);
    return result;
}

/*
 * File-download client: connects to the server given on the command line
 * and hands the connected socket to recvFile().
 */
int main(int argc, char* argv[])
{
    //./client 192.168.157.128 2345
    ARGS_CHECK(argc,3);

    // server address taken from argv
    struct sockaddr_in server;
    memset(&server,0,sizeof(server));
    server.sin_family=AF_INET;
    server.sin_addr.s_addr=inet_addr(argv[1]);
    server.sin_port=htons(atoi(argv[2]));

    int sockfd = socket(AF_INET,SOCK_STREAM,0);
    int ret = connect(sockfd,(struct sockaddr*)&server,sizeof(server));
    ERROR_CHECK(ret,-1,"connect");

    recvFile(sockfd);
    return 0;
}

recvfile.c

#include <my_header.h>
// Length-prefixed frame used for the filename: an int length followed by
// that many bytes of payload.
typedef struct train_s{
    int len;            // number of valid bytes in buf
    char buf[1000];     // payload storage
}train_t;
// v 2.0
/*
 * Receive one file from sockfd and store it under the name sent by the peer.
 *
 * Wire format: int name length, name bytes, off_t file size, file bytes.
 * The content is received straight into a shared mapping of the new file.
 *
 * Returns 0 on success, -1 on any failure (a diagnostic goes to stderr).
 */
int recvFile(int sockfd){
    enum { MAX_NAME_LEN = 1000 }; // payload capacity of the sender's frame

    // recv filename; MSG_WAITALL so a header split across segments is not
    // mistaken for a complete one
    int nameLen = 0;
    ssize_t got = recv(sockfd,&nameLen,sizeof(nameLen),MSG_WAITALL);
    if(got != (ssize_t)sizeof(nameLen)){
        fprintf(stderr,"recvFile: cannot read filename length\n");
        return -1;
    }
    printf("recv filename len =%d\n",nameLen);
    // the length comes from the network: never let it exceed the buffer
    if(nameLen <= 0 || nameLen > MAX_NAME_LEN){
        fprintf(stderr,"recvFile: invalid filename length\n");
        return -1;
    }
    char filename[1024] = {0};
    got = recv(sockfd,filename,nameLen,MSG_WAITALL);
    if(got != nameLen){
        fprintf(stderr,"recvFile: cannot read filename\n");
        return -1;
    }
    // NOTE(review): the name is used as a path exactly as the peer sent it.
    int fd = open(filename,O_RDWR|O_CREAT|O_TRUNC,0666);
    if(fd == -1){
        perror("open");
        return -1;
    }
    printf("recv filename is %s\n",filename);

    // recv fileSize
    off_t fileSize = 0;
    got = recv(sockfd,&fileSize,sizeof(off_t),MSG_WAITALL);
    if(got != (ssize_t)sizeof(off_t) || fileSize < 0){
        fprintf(stderr,"recv 1: cannot read a valid file size\n");
        close(fd);
        return -1;
    }
    printf("fileSize is %ld\n",(long)fileSize);
    if(fileSize == 0){
        // nothing to map: mmap() rejects a zero length
        close(fd);
        return 0;
    }

    // recv file content: grow the file first so the mapping is backed
    if(ftruncate(fd,fileSize) == -1){
        perror("truncate");
        close(fd);
        return -1;
    }
    char* pfile =(char*)mmap(NULL,fileSize,PROT_READ|PROT_WRITE,MAP_SHARED,fd,0);
    if(pfile == MAP_FAILED){
        perror("mmap");
        close(fd);
        return -1;
    }
    got = recv(sockfd,pfile,fileSize,MSG_WAITALL);
    int result = (got == (ssize_t)fileSize) ? 0 : -1;
    if(result == -1){
        fprintf(stderr,"recv 2: file content is incomplete\n");
    }
    munmap(pfile,fileSize);
    close(fd);
    return result;
}

makefile

# Build the client directly from client.c, which contains both recvFile()
# and main() (debug info, all warnings).
client:client.c
	gcc client.c -o client -g -Wall

运行结果

image-20250605222757795
 
上一篇:磨山散步
评论
晨钟杂记. All Rights Reserved. Theme Jasmine by Kent Liao.