并发编程来说的并行和实现方法1.11.2并行举例

这也是一个学习笔记,加上很多我自己的理解。对于并发编程,做开发的同学一定很熟悉。逻辑控制流在时间上是重叠的,即并发。并发和并行是两个概念。并发多指在一定的时间间隔内同时运行。一般采用时间片和分时操作来完成不同进程的上下文切换。因为CPU运行速度非常快,在我们看来,整个程序是并行运行的,比如在单核cpu机器上,也可以一边听歌一边写文档。并行性是指多个线程同时在不同的内核上同时执行,而不管时间间隔如何。有些书也没有这么准确地定义这两个概念,就知道了,这样的词就不用扣了。本文主要介绍并行,应用级并行。

– 并行的必要性和实现方法1.1 并行执行的必要性1.2 并行实现方法

除了我们常说的并行实现的多线程多进程之外,还有IO复用。我个人认为IO复用只能算是整体并行,而不是微观上真正的并行,因为它是一个共享同一个地址空间的进程。只需将逻辑流程转化为状态机,根据不同的状态进行不同的操作,这在高性能服务器开发领域使用较多。

两个平行的例子

以 Web 服务器为例,更好地说明应用程序的并行性。

2.1 基于进程的并发编程

网络编程就是开发一个网络服务器来服务多个客户端。基于进程的网络编程方法非常简单。每次收到连接时,都可以为客户端服务器打开一个新进程。当终端关闭时,进程被销毁。这种方式的网络服务器一般都比较稳定,但是支持的并发量很少。另外,由于进程的创建和销毁是比较繁重的操作,这样开发的网络服务器的性能一般都不够好。对于进程开销高的问题,也可以通过类似于线程池的方法,为即将到来的客户端服务器提前创建多个进程,比如Apache的prefork方式;而且如果内部连接的数量肯定不会很大。,可以使用此模式。

图片

该模式需要注意的几点是:

在fork进程之后,新进程复制旧进程的数据,包括地址空间、打开的文件描述符、程序计数器以及程序执行的代码。所以父子进程需要关闭它们不需要的套接字。对于监听的父进程,子进程创建后不需要关心连接的socket,直接关闭即可;对于创建的子进程,不需要关心服务socket,所以需要关闭监控socket。子进程执行后,需要回收子进程,通过注册事件处理完成父进程对子进程的资源回收:

signal(SIGCHLD, sigchld_handler);  

子进程执行完毕后,会向父进程发送 SIGCHLD 信号。默认是忽略这个信号而不处理它。如果不回收,子进程就会变成僵尸进程,破坏系统资源,过多会导致系统挂起。


#include "lib/common.h"
#define MAX_LINE 4096
char rot13_char(char c) {
    if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
        return c + 13;
    else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
        return c - 13;
    else
        return c;
}
void child_run(int fd) {
    char outbuf[MAX_LINE + 1];
    size_t outbuf_used = 0;
    ssize_t result;
    while (1) {
        char ch;
        result = recv(fd, &ch, 1, 0);
        if (result == 0) {
            break;
        } else if (result == -1) {
            perror("read");
            break;
        }
        if (outbuf_used < sizeof(outbuf)) {
            outbuf[outbuf_used++] = rot13_char(ch);
        }
        if (ch == '\n') {
            send(fd, outbuf, outbuf_used, 0);
            outbuf_used = 0;
            continue;
        }
    }
}
void sigchld_handler(int sig) {
   // 回收子进程资源
    while (waitpid(-1, 0, WNOHANG) > 0);
    return;
}
int main(int c, char **v) {
    int listener_fd = tcp_server_listen(SERV_PORT);
    signal(SIGCHLD, sigchld_handler);
    while (1) {
        struct sockaddr_storage ss;
        socklen_t slen = sizeof(ss);
        int fd = accept(listener_fd, (struct sockaddr *) &ss, &slen);
        if (fd < 0) {
            error(1, errno, "accept failed");
            exit(1);
        }
        if (fork() == 0) {
            // 子进程关闭服务套接字
            close(listener_fd);
            child_run(fd);
            exit(0);
        } else {
             // 父进程 关闭连接的客户端套接字,因为已经复制给子进程处理了
            close(fd);
        }
    }
    return 0;
}

解释,代码来自极客时代《网络编程实战》

2.2 基于多线程的并发模型

进程的创建比较耗时,而且不同进程之间的通信要复杂得多,所以很多服务器都采用多线程的方式。同一个进程的多个线程共享同一个地址空间,这使得它们之间的通信更加方便。线程由内核自动调度。不同的线程有不同的上下文。当不同线程交错运行时,需要进行在线切换。每个线程都有自己的线程上下文,包括线程 ID、堆栈、堆栈指针、程序计数器、公共目标寄存器和条件代码。多个进程之间也会发生切换,但是上下文比较重,不如线程切换快,而且线程在进程中没有父子进程,所有线程都是平等的。POSIX线程标准接口,

#include 
typedef void *(func)(void *);
// 线程的创建
int pthread_create(pthread t *tid, pthread attr t *attr, func *f, void *arg);
// 获取自身的线程ID
pthread t pthread_self(void);
// 终止线程,thread_return为线程返回值
int pthread_exit(void *thread_return);
// 终止pid的线程
int pthread_cancel(pthread t tid);
// 调用后会阻塞等待,直到线程tid终止,回收线程资源
int pthread_join(pthread t tid, void **thread return);
// 分离线程,默认线程是joinable状态,调用后为detached
int pthread_detach(pthread t tid);
pthread_once_t once_control= PTHREAD_ONCE_INIT;
int  pthread_once(pthread_once_t * once_control,void (*init_routine)(void));

阐明:

创建线程后,默认情况下它是可连接的。可加入线程意味着该线程可以被其他线程回收和杀死。在其他线程回收之前,它的堆栈等内存资源不会被释放。pthread_detach 一个线程后应用编程接口和套接字,线程被分离,不能被其他线程回收和杀死。它的内存资源在终止时由系统自动释放。pthread_once 这个函数我以前很少用。该函数用于多线程初始化。当多个线程在执行时,如果使用同一个once_control调用pthread_once,init_routine只会被调用一次。在初始化多线程时很有用。

与多进程相比应用编程接口和套接字,多线程共享数据更方便。多线程的共享部分包括整个进程的虚拟存储区域,由只读文本、读写数据、堆、共享代码和数据区域以及线程之间组成。所有打开的文件集合也被共享。所有线程上下文数据显然不能共享,比如不同线程的堆栈数据。这种分享带来了方便和麻烦,即如果不控制分享,很容易造成覆盖和混乱。因为共享,多个线程可以同时访问。被另一个线程覆盖。

全局变量和局部静态变量是共享的,就像函数中的局部变量一样,如果它们不通过指针传递给其他线程,它们就不是共享的。不同的线程在不同的函数中有局部变量。

一个简单的多线程服务器如下:

图片[1]-并发编程来说的并行和实现方法1.11.2并行举例-老王博客


#include "lib/common.h"
extern void loop_echo(int);
void thread_run(void *arg) {
    pthread_detach(pthread_self());
    int fd = (int) arg;
    loop_echo(fd);
}
int main(int c, char **v) {
    int listener_fd = tcp_server_listen(SERV_PORT);
    pthread_t tid;
    while (1) {
        struct sockaddr_storage ss;
        socklen_t slen = sizeof(ss);
        int fd = accept(listener_fd, (struct sockaddr *) &ss, &slen);
        if (fd < 0) {
            error(1, errno, "accept failed");
        } else {
            pthread_create(&tid, NULL, &thread_run, (void *) fd);
        }
    }
    return 0;
}
char rot13_char(char c) {
    if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
        return c + 13;
    else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
        return c - 13;
    else
        return c;
}
void loop_echo(int fd) {
    char outbuf[MAX_LINE + 1];
    size_t outbuf_used = 0;
    ssize_t result;
    while (1) {
        char ch;
        result = recv(fd, &ch, 1, 0);
        //断开连接或者出错
        if (result == 0) {
            break;
        } else if (result == -1) {
            error(1, errno, "read error");
            break;
        }
        if (outbuf_used < sizeof(outbuf)) {
            outbuf[outbuf_used++] = rot13_char(ch);
        }
        if (ch == '\n') {
            send(fd, outbuf, outbuf_used, 0);
            outbuf_used = 0;
            continue;
        }
    }
}

这个简单的服务器没什么好说的。实际上,它使用结合队列的线程池。整体架构如下:

图片

这种方式也比较简单,就是客户端连接后,主线程将socket描述符加入socket缓冲队列,工作线程从socket缓冲队列中取出socket,然后提供服务。这是典型的生产者-消费者模式。由于主线程和工作线程之间有一个共享队列,所以需要加锁;同时,由于队列的大小有限,需要通过信号量来控制,比如等待队列为空。,主线程可以放,当队列中有数据时,工作线程可以工作。

2.3 基于IO复用的并发编程

前面的程序我们只用多线程或者多进程的方式来实现,因为有多个socket需要同时服务,而这些socket操作都是阻塞的,没有办法同时处理. 如果能把这些socket放在一起统一监控,就知道任何一个socket都有请求,根据不同的请求时间回调不同的函数进行处理。这就是IO复用,目前高性能的高Server一般都是通过IO复用来实现的。

如果是非阻塞IO,我们可以通过轮询的方式遍历socket组合,找到需要IO处理的socket,进行处理,但是这种方式有个缺点,如果socket里面的数据很多,就遍历一次会很费时间。在遍历的过程中,如果其他客户端发起请求,他们将无法响应,而为了快速响应请求,CPU必须在很短的时间间隔内遍历,消耗了大量无用的cpu资源。

现在我们可以将socket集合交给操作系统,系统可以判断socket上是否有IO事件,如果发生则返回或超时;比如select、poll、epoll等IO分发技术都可以做到这一点。多IO复用。选择函数的说明如下:

#include 
#include 
#include 
int select(int nfds, fd_set *readfds, fd_set *writefds,
           fd_set *exceptfds, struct timeval *timeout);
//从集合中删除指定的fd描述符
void FD_CLR(int fd, fd_set *set); 
//判断指定的fd描述符是否存在于集合之中
int  FD_ISSET(int fd, fd_set *set); 
//将指定的fd添加到集合之中
void FD_SET(int fd, fd_set *set);
//初始化集合
void FD_ZERO(fd_set *set); 

Select 对一个名为 fd_set 的集合进行操作。逻辑上,我们可以把集合看成是一个n位掩码,哪个位设置为1,表示哪个位是描述符集合中的一个元素。使用 select 函数,内核需要暂停进程。当发生一个或多个io事件时,会返回该事件的IO集合,然后我们使用FD_ISSET判断fd是否在集合中。

参数说明:nfds标识最大描述符+1;readfds 允许内核检测该集合上的可读事件;writefds 允许内核检测可写集上的事件;exceptfds 允许内核检测异常的套接字集。


int main(int argc, char **argv) {
    if (argc != 2) {
        error(1, 0, "usage: select01 ");
    }
    int socket_fd = tcp_client(argv[1], SERV_PORT);
    char recv_line[MAXLINE], send_line[MAXLINE];
    int n;
    fd_set readmask;
    fd_set allreads;
    FD_ZERO(&allreads);
    FD_SET(0, &allreads);
    FD_SET(socket_fd, &allreads);
    for (;;) {
        readmask = allreads;
        int rc = select(socket_fd + 1, &readmask, NULL, NULL, NULL);
        if (rc <= 0) {
            error(1, errno, "select failed");
        }
        if (FD_ISSET(socket_fd, &readmask)) {
            n = read(socket_fd, recv_line, MAXLINE);
            if (n < 0) {
                error(1, errno, "read error");
            } else if (n == 0) {
                error(1, 0, "server terminated \n");
            }
            recv_line[n] = 0;
            fputs(recv_line, stdout);
            fputs("\n", stdout);
        }
        if (FD_ISSET(STDIN_FILENO, &readmask)) {
            if (fgets(send_line, MAXLINE, stdin) != NULL) {
                int i = strlen(send_line);
                if (send_line[i - 1] == '\n') {
                    send_line[i - 1] = 0;
                }
                printf("now sending %s\n", send_line);
                size_t rt = write(socket_fd, send_line, strlen(send_line));
                if (rt < 0) {
                    error(1, errno, "write failed ");
                }
                printf("send bytes: %zu \n", rt);
            }
        }
    }
}

最初,fd_set被FD_ZERO初始化,结果如下:

图片

然后添加标准输入socket,如下图:

图片

特别值得注意的是:

       readmask = allreads;

因为每次select返回时,都会返回一组就绪的socket,而且还修改了select中注册的函数,所以每次都需要重新赋值。

select函数虽然可以实现多路复用,但也有很多缺点:

暂时写到这里。下次可以写关于select的改进。

© 版权声明
THE END
喜欢就支持一下吧
点赞0
分享
评论 抢沙发

请登录后发表评论