万字长文解读经典并发模型—多进程、多线程、IO多路复用

万字长文解读经典并发模型—多进程、多线程、IO多路复用

理解并发

如果给CPU一系列任务,CPU按照事件顺序逐一完成这些任务,在这种情况下后续任务必须等待前面的任务完全完成之后才能占用CPU资源,对于简单的任务,计算机使用这种模型就足够了。但是为了提高系统的性能与用户体验以及资源分配的公平性,操作系统引入了并发机制。某些应用场景对实时性有较高的要求,需要在有限的时间内进行响应。通过并发处理,可以在规定的时间内同时处理多个任务,满足实时性需求。例如对于QQ应用程序,同时打开了聊天框、QQ空间等,如果这几个任务是顺序执行的话,QQ空间的加载完毕必须等待聊天框先加载完毕,这样对于用户体验会大打折扣。

并发与并行是不同的两个概念,两个任务在两个CPU上一起执行就是并行,而并发是多个任务在同一个CPU上交替执行,这些任务可以是线程、进程、事件或其他执行单元。也就是说,并发其实就是虚拟化的并行,让事件的处理从用户角度来看是并行执行的,能够在有限的计算资源上较大程度满足用户对于实时响应的要求。同时因为前面任务中频繁的IO操作使CPU处于空闲的状态从而浪费大量的计算资源,所以对于IO密集型应用,并发执行可以有效提高系统吞吐量。但是对于纯CPU密集型的应用,并发并不能提高吞吐量,反而因为频繁的切换导致更大的开销。

多进程模型

多进程C/S模型

对于常见的C/S模型,一个服务端通常需要服务多个客户端。如果使用单行的处理模型,当新的客户端请求服务端的服务时,就必须等待比它先到的客户端的请求全部完成。

因此引入多进程并发服务器模型。多进程并发服务器模型的简单流程图如下所示。父进程创建一个套接字,然后与自己的IP地址、端口号进行绑定。之后调用开始监听来自客户端的敲门,当有客户端来敲门时,accept()接收客户端的连接并创建一个新套接字用于与客户端通信。接下来调用fork()函数,当调用fork()函数时,操作系统会复制当前进程的一个副本,包括进程的代码、数据和状态等信息。如果其返回值为负数,表示创建子进程失败。否则他在父子进程中有不同的返回值:如果返回值为0,表示当前代码正在子进程中执行。如果返回值大于0,表示当前代码正在父进程中执行,返回的值是子进程的进程ID。因此可以使用if-else语句来编写子进程的处理代码。

在子进程中,先关闭从父进程中复制下来监听套接字,这个套接字在子进程中没有用了,纯属浪费资源,之后再进行与客户端的通信。而在父进程中,同理关闭accept()创建的新套接字,然后继续监听客户端的连接请求。

image-20231025151639343

Linux下实现多进程并发C/S模型

服务端完整代码如下所示。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <arpa/inet.h>
#include <unistd.h>

#define PORT 8888
#define MAX_BUFFER_SIZE 2048

void communicate(int *sockfd, int *new_sockfd, struct sockaddr_in *client)
{
    // 接收缓冲区与发送缓冲区
    char recv_buf[MAX_BUFFER_SIZE], send_buf[MAX_BUFFER_SIZE];
    int ret;
    close(*sockfd);
    while (1)
    {
        memset(recv_buf, 0, MAX_BUFFER_SIZE);
        memset(send_buf, 0, MAX_BUFFER_SIZE);
        // 读取客户端发送的消息
        ret = recv(*new_sockfd, recv_buf, MAX_BUFFER_SIZE, 0);
        if (ret < 0)
        {
            perror("recv data error");
            exit(-1);
        }
        recv_buf[ret] = '\0';
        printf("server>>>get msg from %s: %s", inet_ntoa(client->sin_addr), recv_buf);
        // 发送消息
        strcat(send_buf, "client>>>ack!!!n");
        ret = send(*new_sockfd, send_buf, sizeof(send_buf), 0);
        if (ret < 0)
        {
            perror("send data error");
            exit(-1);
        }
    }
    // 关闭套接字
    close(*new_sockfd);
}

int main()
{
    // 监听套接字,新建套接字
    int sockfd, new_sockfd;
    // 服务端和客户端地址结构
    struct sockaddr_in serv, client;
    int opt = SO_REUSEADDR;
    int ret;
    socklen_t client_len;
    int pid;

    // 创建一个TCP套接字
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0)
    {
        // 创建失败
        perror("error in creating socketn");
        exit(-1);
    }

    /**
     * 设置套接字选项,以允许重新绑定(rebinding)已经在使用的地址和端口
     * 允许在关闭套接字之后立即重新绑定使用相同地址和端口的套接字。
     * 这对于服务器程序在重启后快速重新监听之前使用的地址和端口非常有用。
     */
    setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(int));

    // 设置serv的值
    // 设置地址族为IPv4,即使用IPv4地址。
    serv.sin_family = AF_INET;
    /**
     * 将IP地址设置为INADDR_ANY,
     * 表示服务器可以接受来自任意网络接口的连接请求。
     * 这样服务器将监听所有可用的网络接口。
     */
    serv.sin_addr.s_addr = INADDR_ANY;
    // htons()函数用于将主机字节序的端口号转换为网络字节序的端口号。
    serv.sin_port = htons(PORT);

    // 将套接字绑定到serv
    ret = bind(sockfd, (struct sockaddr *)&serv, sizeof(struct sockaddr));
    if (ret < 0)
    {
        perror("error in bindingn");
        exit(-1);
    }
    // 开始监听客户端的连接请求
    if (-1 == listen(sockfd, 5))
    {
        perror("listen errorn");
        exit(-1);
    } // 使用5个客户端的连接队列监听传入的连接
    while (1)
    {
        // 接收客户端请求,新建一个套接字
        client_len = sizeof(struct sockaddr);
        new_sockfd = accept(sockfd, (struct sockaddr *)&client, &client_len);
        if (new_sockfd < 0)
        {
            // 创建失败
            perror("recv error");
            close(sockfd);
            exit(-1);
        }
        // 创建子进程
        pid = fork();
        if (pid == 0)
        {
            // 子进程处理
            communicate(&sockfd, &new_sockfd, &client);
        }
        else
        {
            // 父进程处理
            close(new_sockfd);
        }
    }
    return 0;
}

编译服务端程序,然后开启两个终端使用nc连接服务端,结果如下所示,可知实现了多进程的并发服务器模型。

image-20231025160258457

多进程并发编程注意事项

僵尸进程

下图使用ps aux查看当前系统进程所有信息,STAT列即为进程的状态信息。

img

僵尸进程,顾名思义,指一个进程生命周期结束了,但是它的“尸体”PCB等进程信息却还在内存中被保存,就成为了僵尸进程。当一个进程退出时如果它的父进程没有读取到该进程退出时返回的退出状态码,该进程就会变成僵尸进程。在Linux的shell中,其STAT字段为Z。如果僵尸进程太多,占用进程表中的空间。当进程表被完全占满,系统将无法创建新的进程,导致新进程的创建失败。fork创建进程时出现如下错误fork: Resource temporarily unavailable,表示资源不够用了。所以,在使用多进程模型时,需要避免产生僵尸进程。

所以,当一个子进程退出时需要其父进程为其“收尸”,可以在父进程中使用 waitpid() 函数等待子进程的退出状态。通过检查返回值和 status 变量,可以获取子进程的退出状态、终止原因以及其他相关信息。waitpid() 是一个系统调用函数,用于等待指定的子进程状态改变并获取子进程的退出状态。

pid_t waitpid(pid_t pid, int *status, int options);
/*
参数解释:
pid:指定要等待的子进程的进程ID。有几种可能的取值:
    -1:等待任意子进程,类似于 wait() 函数。
    0:等待与当前调用进程在同一个进程组中的任意子进程。
    大于 0:等待指定进程ID的子进程。
status:用于存储子进程的退出状态或终止原因。可以为 NULL,表示不关心子进程的退出状态。
options:控制函数的行为。常用的选项有:
    WCONTINUED:等待被暂停的子进程继续执行。
    WNOHANG:非阻塞调用,即立即返回,如果没有子进程状态发生变化。
    WUNTRACED:等待被暂停的子进程。
    WEXITED:等待已退出的子进程。
    WSTOPPED:等待已停止的子进程。
    可以通过按位或操作符 | 组合多个选项。
*/

但是当父进程也结束时,子进程的父进程就会变为 init 进程(进程 ID 为 1)。init 进程是系统中的特殊进程,是内核启动的第一个用户级进程,负责接管孤儿进程(即父进程退出的子进程)、僵尸进程,并负责回收它们的资源。因此,当父进程结束后,init进程都会接管并回收僵尸进程。

进程与线程的区别

线程也可以作为并发编程的执行单位。它是轻量级的进程。进程与线程主要有以下区别:

  1. 资源占用:每个进程都拥有独立的地址空间、文件描述符、系统资源等,进程之间的资源相互独立,不共享。而线程是在进程内部创建的,共享进程的地址空间和资源,线程之间可以直接访问同一进程的数据。
  2. 调度和切换:在操作系统中,进程是资源分配的基本单位。进程间的切换涉及上下文切换,需要保存和恢复进程的状态信息,开销较大。而线程是在进程内部调度执行的,线程切换的开销相对较小,因为线程共享进程的上下文。
  3. 通信:多个进程可以同时执行,每个进程有自己的独立执行流。进程之间需要通过进程间通信来进行数据交换和同步。而线程是在同一个进程内部并发执行,它们共享进程的资源,可以直接读写进程内的数据,因此线程之间的通信和同步更加方便。
  4. 创建和销毁:进程的创建和销毁相对比较耗费资源和时间,涉及到地址空间的分配和释放,需要操作系统的支持。而线程的创建和销毁相对轻量,开销较小,可以在进程内动态创建和销毁。

前面我们已经实现了多进程的并发服务器模型,但是使用进程的代价太大了,所以接下来我们使用更加轻量级的线程来实现并发服务器模型。

多线程模型

多线程C/S模型

相比多进程,多线程的优势主要在于线程之间共享同一进程的内存空间,因此线程之间的通信和数据共享更加方便。此外,线程的创建和销毁开销相对较小。

多线程并发服务器模型的简单流程图如下所示。

image-20231025185527410

POSIX线程库

POSIX 线程库(pthread)是一个用于多线程编程的标准库,提供了一组函数和数据类型,用于创建、同步和管理线程。下面是一些常用的 POSIX 线程库函数和参数的释义。

线程管理

创建一个新的线程

  • thread:指向 pthread_t 类型的指针,用于存储新线程的标识符。
  • attr:指向 pthread_attr_t 类型的指针,用于指定线程的属性,可以为 NULL
  • start_routine:指向线程的入口函数的指针,新线程将从该函数开始执行。
  • arg:传递给线程入口函数的参数。
pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void*), void *arg)

等待一个线程的结束,并获取线程的返回值。

  • thread:要等待的线程的标识符。
  • retval:用于存储线程的返回值的指针。
pthread_join(pthread_t thread, void **retval)

退出当前线程,并可选择传递一个返回值。

  • retval:线程的返回值。
pthread_exit(void *retval)

请求取消指定的线程。

  • thread:要取消的线程的标识符。
pthread_cancel(pthread_t thread)

分离一个线程,使其在退出时自动释放资源。

  • thread:要分离的线程的标识符。
pthread_detach(pthread_t thread)
互斥锁

初始化一个互斥锁。

  • mutex:指向 pthread_mutex_t 类型的指针,用于存储互斥锁对象。
  • attr:指向 pthread_mutexattr_t 类型的指针,用于指定互斥锁的属性,可以为 NULL
pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)

对互斥锁进行加锁。

  • mutex:互斥锁对象。
pthread_mutex_lock(pthread_mutex_t *mutex)

对互斥锁进行解锁。

  • mutex:互斥锁对象。
pthread_mutex_unlock(pthread_mutex_t *mutex)
条件变量

初始化一个条件变量。

  • cond:指向 pthread_cond_t 类型的指针,用于存储条件变量对象。
  • attr:指向 pthread_condattr_t 类型的指针,用于指定条件变量的属性,可以为 NULL
pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr)

在条件变量上等待,同时释放互斥锁。

  • cond:条件变量对象。
  • mutex:互斥锁对象。
pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)

唤醒至少一个等待在条件变量上的线程。

  • cond:条件变量对象。
pthread_cond_signal(pthread_cond_t *cond):

唤醒所有等待在条件变量上的线程。

  • cond:条件变量对象。
pthread_cond_broadcast(pthread_cond_t *cond)

Linux下实现多进程并发C/S模型

完整代码如下所示。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/wait.h>

#define PORT 8888
#define MAX_BUFFER_SIZE 2048

struct thread
{
    int sockfd;
    char *src;
};

void *communicate(void *args)
{
    struct thread params;
    // 解引用
    params = *((struct thread *)args);
    // 释放
    free(args);
    int new_sockfd = params.sockfd;
    // 接收缓冲区与发送缓冲区
    char recv_buf[MAX_BUFFER_SIZE],
        send_buf[MAX_BUFFER_SIZE];
    int ret;
    while (1)
    {
        memset(recv_buf, 0, MAX_BUFFER_SIZE);
        memset(send_buf, 0, MAX_BUFFER_SIZE);
        // 读取客户端发送的消息
        ret = recv(new_sockfd, recv_buf, MAX_BUFFER_SIZE, 0);
        if (ret < 0)
        {
            perror("recv data error");
            exit(-1);
        }
        recv_buf[ret] = '\0';
        printf("server>>>get msg from %s: %s", params.src, recv_buf);
        // 发送消息
        strcat(send_buf, "client>>>ack!!!n");
        ret = send(new_sockfd, send_buf, sizeof(send_buf), 0);
        if (ret < 0)
        {
            perror("send data error");
            exit(-1);
        }
    }
    // 关闭套接字
    close(new_sockfd);
    // 退出线程
    pthread_exit(0);
}

int main()
{
    // 监听套接字,新建套接字
    int sockfd, new_sockfd;
    // 服务端和客户端地址结构
    struct sockaddr_in serv, client;
    int opt = SO_REUSEADDR;
    int ret;
    socklen_t client_len;

    // 创建一个TCP套接字
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0)
    {
        // 创建失败
        perror("error in creating socketn");
        exit(-1);
    }

    /**
     * 设置套接字选项,以允许重新绑定(rebinding)已经在使用的地址和端口
     * 允许在关闭套接字之后立即重新绑定使用相同地址和端口的套接字。
     * 这对于服务器程序在重启后快速重新监听之前使用的地址和端口非常有用。
     */
    setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(int));

    // 设置serv的值
    // 设置地址族为IPv4,即使用IPv4地址。
    serv.sin_family = AF_INET;
    /**
     * 将IP地址设置为INADDR_ANY,
     * 表示服务器可以接受来自任意网络接口的连接请求。
     * 这样服务器将监听所有可用的网络接口。
     */
    serv.sin_addr.s_addr = INADDR_ANY;
    // htons()函数用于将主机字节序的端口号转换为网络字节序的端口号。
    serv.sin_port = htons(PORT);

    // 将套接字绑定到serv
    ret = bind(sockfd, (struct sockaddr *)&serv, sizeof(struct sockaddr));
    if (ret < 0)
    {
        perror("error in bindingn");
        exit(-1);
    }
    // 开始监听客户端的连接请求
    // 开始监听客户端的连接请求
    if (-1 == listen(sockfd, 5))
    {
        perror("listen errorn");
        exit(-1);
    } // 使用5个客户端的连接队列监听传入的连接
    while (1)
    {
        // 接收客户端请求,新建一个套接字
        client_len = sizeof(struct sockaddr);
        new_sockfd = accept(sockfd, (struct sockaddr *)&client, &client_len);
        if (new_sockfd < 0)
        {
            // 创建失败
            perror("recv error");
            close(sockfd);
            exit(-1);
        }
        // 要传递的参数
        struct thread *args = (struct thread *)malloc(sizeof(struct thread));
        args->sockfd = new_sockfd;
        args->src = strdup(inet_ntoa(client.sin_addr));
        // 创建线程
        pthread_t thread;
        ret = pthread_create(&thread, NULL, communicate, (void *)args);
        if (0 != ret)
        {
            // 创建失败
            perror("create thread errorn");
            exit(1);
        }
    }
    return 0;
}

运行结果

image-20231025195557149

多线程并发编程注意事项

线程安全

条件竞争指的是当多个线程并发执行时,它们访问和修改共享资源的顺序或时机不确定,从而导致程序的行为出现不可预测的结果。条件竞争通常发生在没有适当同步机制保护共享资源的情况下,多个线程相互竞争访问共享资源,导致数据的读写顺序产生冲突或不一致。条件竞争可能导致数据损坏、逻辑错误、崩溃等问题。线程安全的代码可以同时被多个线程调用,而不会产生竞争条件或导致不一致的结果。

同步与互斥

合理使用同步与互斥机制来解决条件竞争,保证线程安全。确保对共享数据的访问具有正确的同步与互斥机制,例如使用互斥锁(mutex)、条件变量(condition variable)、原子操作(Atomic op)或者信号量(Semaphore)等,以避免条件竞争和并发访问的问题。

死锁避免

两个或多个线程相互等待对方释放资源而无法继续执行的情况。要避免死锁,需要仔细设计和管理锁的获取和释放顺序,避免循环等待,以及使用超时机制和适当的死锁检测和恢复策略。

常见IO模型

所有的IO都分为两个阶段,数据准备阶段(内核等待IO设备的数据)和数据拷贝阶段(内核将IO数据拷贝到用户空间)。

同步IO

阻塞IO

阻塞IO是最常见的IO模型,以socket.recv()为例,应用程序发起一个系统调用,这个时候用户程序会一直阻塞下去,直到内核把数据拷贝到用户空间。所有这种模型在数据准备阶段以及数据拷贝阶段都是被阻塞的,因此CPU利用率较为低,不适用高并发。

image-20231025203034057

非阻塞IO

应用程序可以发起IO操作,但它可以立即返回而不需要等待IO操作完成。如果IO操作没有完成,应用程序可以继续执行其他任务,而不会被阻塞。通过轮询或多次非阻塞IO调用,应用程序可以检查IO操作是否完成,当数据拷贝完成后,应用程序可以继续处理数据,从而完成整个IO过程。

这种模型与阻塞IO模型对比,它在等待内核的过程中,没有阻塞进程,可以做其他的事情。但是需要轮询,比较消耗CPU资源。

image-20231025203749509

IO复用

非阻塞的IO模型,需要不断轮询内核数据是否准备好,当并发量巨大时,轮询消耗的CPU资源时巨大的。就比如一位只允许外带的餐馆的服务员,当客流量少的时候,几个顾客隔段时间就来问我们的菜什么时候上,勉强还能应付,但是当客流量多起来的时候,全部隔段时间就来问我们的菜什么时候上,这样服务员肯定会吃不消了。为了解决这个问题,可以让一群客人派一个人来询问,当菜做好了,由这个人来通知。这就是IO复用的原理。

IO复用不需要所有进程轮询来发起,而是这些进程将这个工作交给select,由它来完成轮询,等待其代理的一个套接字集合可读状态。相比较之前两个模型,这个模型CPU利用率较高,适合并发。

image-20231025205011661

信号驱动IO

进程发起一个IO请求,向内核注册一个信号处理程序,用户进程不会被阻塞。当内核将数据报准备好后,操作系统会通知应用程序,进程调用相应的回调函数来处理事件。这种模型采用回调机制,等待数据阶段不会阻塞,适用于高并发应用程序。

image-20231025205547183

异步IO

经过三轮优化后,信号驱动IO模型在数据等待阶段已经实现了非阻塞。然而,在数据复制节点仍然存在阻塞的情况,这意味着在数据复制的过程中,应用程序仍然需要等待复制操作完成。

为了进一步优化,可以将第二个阶段(数据复制节点)进一步优化为异步。这意味着应用程序发起数据复制操作后,立即返回,而不需要等待复制过程完成。当复制操作完成时,操作系统将通知应用程序,并可以在合适的时间点处理已完成的复制结果。通过将整个IO操作的等待和复制阶段都优化为异步操作,可以实现真正的异步IO。这样,应用程序可以高效地处理多个IO操作,而不需要阻塞等待或消耗额外的线程资源。异步IO模型常用于高性能的网络编程和并发服务器等场景,以提高系统的吞吐量和响应性能。

应用程序发起IO操作后立即返回,而不需要等待IO操作完成。当IO操作完成时,操作系统会通知应用程序,并且应用程序可以在合适的时间点处理已完成的IO结果。异步IO可以通过回调函数、事件循环或Promises等机制来处理IO操作的结果。

image-20231025210335222

以node为例实现一个异步读取文件内容的操作,使用了promise机制。

const fs = require('fs');

// 异步读取文件内容函数,返回一个 Promise 对象
function readFileAsync(filePath) {
  return new Promise((resolve, reject) => {
    fs.readFile(filePath, 'utf8', (err, data) => {
      if (err) {
        reject(err); // 处理错误
      } else {
        resolve(data); // 传递读取到的文件内容
      }
    });
  });
}

// 执行其他任务
console.log('开始执行其他任务');

// 调用异步读取函数
readFileAsync('file.txt')
  .then(data => {
    console.log('文件内容:', data);
    // 在这里可以继续处理文件内容,执行其他操作
  })
  .catch(err => {
    console.error('读取文件发生错误:', err);
  });

// 继续执行其他任务
console.log('继续执行其他任务');

IO多路复用

select, poll, 和 epoll 是在 Linux 系统中用于实现 I/O 多路复用的机制。它们可以用于同时监视多个文件描述符的状态,并在其中至少有一个文件描述符准备就绪时进行相应的处理。

select

select 是最古老的多路复用机制,它使用三个位图来表示文件描述符的状态,然后通过调用系统调用来等待其中之一的状态发生变化。它具有跨平台的特性,但有一些限制,例如最大文件描述符数量的限制和线性扫描带来的性能问题。

  • nfds:监视的最大文件描述符值加 1。
  • readfds:用于监视可读事件的文件描述符集合。
  • writefds:用于监视可写事件的文件描述符集合。
  • exceptfds:用于监视异常事件的文件描述符集合。
  • timeout:超时时间,指定 select 在等待事件发生前最多等待的时间。
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

poll

poll是相对于select更加现代的多路复用机制。与select不同的是,poll使用一个结构数组来表示文件描述符的状态,不受文件描述符数量的限制。它也是跨平台的,但在处理大量文件描述符时性能可能较差,因为它和select一样,使用poll时需要轮询。

  • fds:指向 pollfd 结构体数组的指针,每个结构体描述一个文件描述符的监视信息。
  • nfds:结构体数组的大小,即监视的文件描述符数量。
  • timeout:超时时间,指定 poll 在等待事件发生前最多等待的时间。
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
/*pollfd结构——描述符、监听的事件、发生的事件*/
struct pollfd
 {
    int fd;         /* File descriptor to poll.  */
    short int events;       /* Types of events poller cares about.  */
    short int revents;      /* Types of events that actually occurred.  */
};

// 事件描述
#define POLLIN      0x001       /* There is data to read.  */
#define POLLPRI     0x002       /* There is urgent data to read.  */
#define POLLOUT     0x004       /* Writing now will not block.  */

epoll

epoll是Linux特有的多路复用机制,它使用一个事件驱动的方式来处理文件描述符的状态变化。epoll基于内核事件通知机制,通过注册事件来监听文件描述符,当有事件发生时,内核会通知应用程序。epoll在大量文件描述符和高并发环境下具有较好的性能特点,因为它使用了红黑树和回调机制来避免轮询。

  1. epoll_create:创建一个 epoll 实例并返回一个文件描述符,用于后续的 epoll 操作。size 参数表示期望监视的文件描述符数量,但该值在 epoll 中并没有实际意义。
  2. epoll_ctl:用于向 epoll 实例中添加、修改或删除要监视的文件描述符和事件。epfdepoll 的文件描述符,op 是操作类型,可以是 EPOLL_CTL_ADDEPOLL_CTL_MODEPOLL_CTL_DELfd 是要操作的文件描述符,event 是一个指向 epoll_event 结构体的指针,描述要监视的事件类型。
  3. epoll_wait:等待事件发生并获取就绪的文件描述符。epfdepoll 的文件描述符,events 是一个指向 epoll_event 结构体数组的指针,用于存储就绪的文件描述符和事件信息,maxevents 表示 events 数组的最大容量,timeout 是等待事件的超时时间,单位为毫秒。
#include <sys/epoll.h>
/* 创建一个 epoll 实例。返回一个新实例的文件描述符。
"size" 参数是一个提示,指定与新实例关联的文件描述符的数量。
epoll_create() 返回的文件描述符应该使用 close() 函数关闭。 */
int epoll_create(int size);
/* 和 epoll_create 相同,但增加了 FLAGS 参数。未使用的 SIZE
参数被删除。所以,使用 epoll_create1 创建的 epoll 实例可以根据需要动态地管理大量的文件描述符,
不再受到之前 size 参数的限制。 */
int epoll_create1(int flag);
/*
epfd:epoll 实例的文件描述符,通过 epoll_create 或 epoll_create1 获取。
op:操作类型,可以是以下值之一:
    EPOLL_CTL_ADD:将文件描述符 fd 添加到 epoll 实例 epfd 中进行监听。
    EPOLL_CTL_MOD:修改文件描述符 fd 在 epoll 实例 epfd 中的监听事件。
    EPOLL_CTL_DEL:从 epoll 实例 epfd 中删除文件描述符 fd 的监听。
fd:要添加、修改或删除的文件描述符。
event:指向一个 struct epoll_event 结构体的指针,用于指定监听事件的类型和其他属性。
*/
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
/*
epfd:epoll 实例的文件描述符,通过 epoll_create 或 epoll_create1 获取。
events:指向存储事件的结构体数组的指针。
maxevents:events 数组的长度,表示最多能够存储的事件数量。
timeout:超时时间(以毫秒为单位)。指定为 -1 表示无限阻塞,直到有事件发生;指定为 0 表示非阻塞,立即返回;指定为一个正整数表示等待指定的毫秒数后超时返回。
*/
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
// epoll_event结构
struct epoll_event
{
  uint32_t events;  /* Epoll events */
  epoll_data_t data;    /* User data variable */
} __EPOLL_PACKED;

// epoll_data结构
typedef union epoll_data
{
  void *ptr;
  int fd;
  uint32_t u32;
  uint64_t u64;
} epoll_data_t;

select实现C/S并发模型

使用select函数来监视多个套接字的状态,并根据状态进行相应的处理。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netdb.h>
#include <sys/time.h>

#define PORT 8888
#define MAX_BUFFER_SIZE 2048

int main()
{
    // 监听套接字,新建套接字,最大套接字
    int sockfd, new_sockfd, maxfd;
    // 主套接字集合、拷贝套接字
    fd_set sockset, tmp_sockset;
    // 服务端和客户端地址结构
    struct sockaddr_in serv, client;
    int opt = SO_REUSEADDR;
    int ret;
    socklen_t client_len;
    // 接收缓冲区与发送缓冲区
    char recv_buf[MAX_BUFFER_SIZE], send_buf[MAX_BUFFER_SIZE];

    // 创建一个TCP套接字
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0)
    {
        // 创建失败
        perror("error in creating socketn");
        exit(-1);
    }
    // 初始化最大套接字
    maxfd = sockfd;
    // 初始化套接字集合
    FD_ZERO(&sockset);
    // 将sockfd放入集合
    FD_SET(sockfd, &sockset);

    /**
     * 设置套接字选项,以允许重新绑定(rebinding)已经在使用的地址和端口
     * 允许在关闭套接字之后立即重新绑定使用相同地址和端口的套接字。
     * 这对于服务器程序在重启后快速重新监听之前使用的地址和端口非常有用。
     */
    setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(int));

    // 设置serv的值
    // 设置地址族为IPv4,即使用IPv4地址。
    serv.sin_family = AF_INET;
    /**
     * 将IP地址设置为INADDR_ANY,
     * 表示服务器可以接受来自任意网络接口的连接请求。
     * 这样服务器将监听所有可用的网络接口。
     */
    serv.sin_addr.s_addr = INADDR_ANY;
    // htons()函数用于将主机字节序的端口号转换为网络字节序的端口号。
    serv.sin_port = htons(PORT);

    // 将套接字绑定到serv
    ret = bind(sockfd, (struct sockaddr *)&serv, sizeof(struct sockaddr));
    if (ret < 0)
    {
        perror("error in bindingn");
        exit(-1);
    }
    // 开始监听客户端的连接请求
    if (-1 == listen(sockfd, 5))
    {
        perror("listen errorn");
        exit(-1);
    } // 使用5个客户端的连接队列监听传入的连接
    while (1)
    {
        memset(recv_buf, 0, MAX_BUFFER_SIZE);
        memset(send_buf, 0, MAX_BUFFER_SIZE);
          // 使用拷贝下来的临时套接字集合
        tmp_sockset = sockset;
        /**
         * 监视一组文件描述符的状态变化。它接受五个参数,
         * 其中第一个参数是待监视的最大文件描述符加一,
         * 第二个参数是指向可读文件描述符集合的指针,
         * 第三个参数是指向可写文件描述符集合的指针,
         * 第四个参数是指向异常文件描述符集合的指针,
         * 第五个参数是超时时间。
         */
        if (-1 == select(maxfd + 1, &tmp_sockset, NULL, NULL, NULL))
        {
            perror("select() error!n");
            exit(1);
        }
        // 进行轮询
        for (int i = sockfd; i <= maxfd; i++)
        {
            /**
             * 它接受一个文件描述符和一个指向文件描述符集合的指针作为参数。
             * 如果给定的文件描述符在集合中被设置(即对应的位为 1),
             * FD_ISSET 宏将返回一个非零值(通常为 1),表示文件描述符处于就绪状态。
             * 如果文件描述符未被设置(对应的位为 0),宏将返回零,表示文件描述符不处于就绪状态。
             */
            if (FD_ISSET(i, &tmp_sockset))
            {
                // 如果是来自于监听套接字
                if (i == sockfd)
                {
                    // 接收客户端请求,新建一个套接字
                    client_len = sizeof(struct sockaddr);
                    new_sockfd = accept(sockfd, (struct sockaddr *)&client, &client_len);
                    if (-1 == new_sockfd)
                    {
                        perror("accept() error:");
                        exit(1);
                    }
                    else
                    {
                        FD_SET(new_sockfd, &sockset);
                        if (new_sockfd > maxfd)
                        {
                            maxfd = new_sockfd;
                        }
                    }
                }
                else
                {
                    // 否则从套接字读取消息
                    memset(recv_buf, 0, MAX_BUFFER_SIZE);
                    ret = recv(i, recv_buf, MAX_BUFFER_SIZE, 0);
                    recv_buf[ret] = '\0';
                    if (0 > ret)
                    {
                        perror("recv() error:");
                        exit(1);
                    }
                    printf("server>>>get msg from %s: %s", inet_ntoa(client.sin_addr), recv_buf);
                    // 发送消息
                    strcat(send_buf, "client>>>ack!!!n");
                    ret = send(i, send_buf, sizeof(send_buf), 0);
                    if (ret < 0)
                    {
                        perror("send data error");
                        exit(-1);
                    }
                }
            }
        }
    }
    close(sockfd);
    return 0;
}

运行结果如下

image-20231025213624746

poll实现C/S并发模型

使用了poll函数来实现多路复用。poll函数会一直阻塞,直到有文件描述符准备好进行 I/O 操作或发生错误。程序中将监听套接字和客户端套接字都加入了poll的监视对象中,通过检查revents字段来确定是监听套接字还是客户端套接字准备好进行操作。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netdb.h>
#include <sys/time.h>
#include <poll.h>

#define PORT 8888
#define MAX_BUFFER_SIZE 2048
#define MAX_CLIENTS 10

int main()
{
    int sockfd, new_sockfd, maxfd;
    struct sockaddr_in serv, client;
    int opt = SO_REUSEADDR;
    int ret;
    socklen_t client_len;
    char recv_buf[MAX_BUFFER_SIZE], send_buf[MAX_BUFFER_SIZE];

    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0)
    {
        perror("error in creating socketn");
        exit(-1);
    }

    maxfd = sockfd;
    struct pollfd fds[MAX_CLIENTS + 1];
    memset(fds, 0, sizeof(fds));

    fds[0].fd = sockfd;
    // There is data to read
    fds[0].events = POLLIN;

    setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(int));
    // 设置地址
    serv.sin_family = AF_INET;
    serv.sin_addr.s_addr = INADDR_ANY;
    serv.sin_port = htons(PORT);

    ret = bind(sockfd, (struct sockaddr *)&serv, sizeof(struct sockaddr));
    if (ret < 0)
    {
        perror("error in bindingn");
        exit(-1);
    }

    if (-1 == listen(sockfd, 5))
    {
        perror("listen errorn");
        exit(-1);
    }

    while (1)
    {
        ret = poll(fds, maxfd + 1, -1);
        if (ret < 0)
        {
            perror("poll() error!n");
            exit(1);
        }
        // 监听套接字可读
        if (fds[0].revents & POLLIN)
        {
            // 新建套接字
            client_len = sizeof(struct sockaddr);
            new_sockfd = accept(sockfd, (struct sockaddr *)&client, &client_len);
            if (-1 == new_sockfd)
            {
                perror("accept() error:");
                exit(1);
            }
            else
            {
                // 将套接字放入集合
                fds[new_sockfd].fd = new_sockfd;
                // 事件设置为可读
                fds[new_sockfd].events = POLLIN;
                if (new_sockfd > maxfd)
                {
                    maxfd = new_sockfd;
                }
            }
        }
        // 轮询其他套接字
        for (int i = sockfd + 1; i <= maxfd; i++)
        {
            // 可读
            if (fds[i].revents & POLLIN)
            {
                // 取出消息
                memset(recv_buf, 0, MAX_BUFFER_SIZE);
                ret = recv(i, recv_buf, MAX_BUFFER_SIZE, 0);
                recv_buf[ret] = '\0';
                if (0 > ret)
                {
                    perror("recv() error:");
                    exit(1);
                }
                printf("server>>>get msg from %s: %s", inet_ntoa(client.sin_addr), recv_buf);
               // 发送消息
                strcat(send_buf, "client>>>ack!!!n");
                ret = send(i, send_buf, strlen(send_buf), 0);
                if (ret < 0)
                {
                    perror("send data error");
                    exit(-1);
                }
            }
        }
    }
    close(sockfd);
    return 0;
}

epoll实现C/S并发模型

使用了epoll函数来实现多路复用。epoll函数会一直阻塞,直到有文件描述符准备好进行 I/O 操作或发生错误。程序中将监听套接字和客户端套接字都加入了epoll的监视对象中,通过检查events[i].data.fdevents[i].events字段来确定是监听套接字还是客户端套接字准备好进行操作。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netdb.h>
#include <sys/time.h>

#define PORT 8888
#define MAX_BUFFER_SIZE 2048
#define MAX_EVENTS 10

int main()
{
    int sockfd, new_sockfd;
    struct sockaddr_in serv, client;
    int opt = SO_REUSEADDR;
    int ret;
    socklen_t client_len;
    char recv_buf[MAX_BUFFER_SIZE], send_buf[MAX_BUFFER_SIZE];

    // 创建 TCP 套接字
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0)
    {
        perror("error in creating socketn");
        exit(-1);
    }

    struct epoll_event events[MAX_EVENTS];
    memset(events, 0, sizeof(events));

    // 创建 epoll 实例
    int epoll_fd = epoll_create1(0);
    if (epoll_fd == -1)
    {
        perror("error in creating epolln");
        exit(-1);
    }
    // 设置描述符和监听事件
    events[0].data.fd = sockfd;
    events[0].events = EPOLLIN;

    // 设置 SO_REUSEADDR 选项
    setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(int));

    // 绑定套接字到指定的地址和端口
    serv.sin_family = AF_INET;
    serv.sin_addr.s_addr = INADDR_ANY;
    serv.sin_port = htons(PORT);

    ret = bind(sockfd, (struct sockaddr *)&serv, sizeof(struct sockaddr));
    if (ret < 0)
    {
        perror("error in bindingn");
        exit(-1);
    }

    // 开始监听连接请求
    if (-1 == listen(sockfd, 5))
    {
        perror("listen errorn");
        exit(-1);
    }

    // 将 sockfd 添加到 epoll 实例中进行监听
    ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &events[0]);
    if (ret == -1)
    {
        perror("error in epoll_ctln");
        exit(-1);
    }

    while (1)
    {
        // 等待事件发生
        int num_events = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
        if (num_events == -1)
        {
            perror("error in epoll_waitn");
            exit(-1);
        }

        // 处理事件
        for (int i = 0; i < num_events; i++)
        {
            if (events[i].data.fd == sockfd)
            {
                // 有新的连接请求到达
                client_len = sizeof(struct sockaddr);
                new_sockfd = accept(sockfd, (struct sockaddr *)&client, &client_len);
                if (-1 == new_sockfd)
                {
                    perror("accept() error:");
                    exit(1);
                }
                else
                {
                    // 将新的套接字添加到 epoll 实例中进行监听
                    events[i].data.fd = new_sockfd;
                    events[i].events = EPOLLIN;
                    ret = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_sockfd, &events[i]);
                    if (ret == -1)
                    {
                        perror("error in epoll_ctln");
                        exit(-1);
                    }
                }
            }
            else
            {
                // 有数据到达
                memset(recv_buf, 0, MAX_BUFFER_SIZE);
                ret = recv(events[i].data.fd, recv_buf, MAX_BUFFER_SIZE, 0);
                recv_buf[ret] = '\0';
                if (0 > ret)
                {
                    perror("recv() error:");
                    exit(1);
                }
                printf("server>>>get msg from %s: %s", inet_ntoa(client.sin_addr), recv_buf);

                strcat(send_buf, "client>>>ack!!!n");
                ret = send(events[i].data.fd, send_buf, strlen(send_buf), 0);
                if (ret < 0)
                {
                    perror("send data error");
                    exit(-1);
                }
            }
        }
    }

    close(sockfd);
    return 0;
}
------本页内容已结束,喜欢请分享------

文章作者
能不能吃完饭再说
隐私政策
PrivacyPolicy
用户协议
UseGenerator
许可协议
NC-SA 4.0


© 版权声明
THE END
喜欢就支持一下吧
点赞23赞赏 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

取消
昵称表情代码图片