1. 事件处理框架

  libevent 是一个C语言编写,轻量级开源高性能事件框架。事件驱动,支持多种IO多路复用(如epoll),支持注册优先级等

1
2
3
4
5
6
7
8
9
10
11
// 头文件
#include <event2/event.h>
// 创建一个事件处理框架
struct event_base *event_base_new(void);
// 销毁一个事件处理框架
void event_base_free(struct event_base *base);

// 查看底层支持的IO转接模型
const char** event_get_supported_method(void);
// 查看当前实际处理框架使用的IO转接模型
const char* event_base_get_method(const struct event_base *base);

代码示例:创建事件处理框架

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <event2/event.h>

int main()
{
// 1.创建事件处理框架
struct event_base* base = event_base_new();

// 2.查看支持什么IO转接函数
const char** methods = event_get_supported_methods();
printf("支持的IO转接模:\n");
for(int i=0; methods[i]!=NULL; ++i)
{
printf(" %s\n", methods[i]);
}
// 3.查看当前使用的IO转接函数
printf("当前使用的IO转接模型: %s\n", event_base_get_method(base));
// 4.释放资源
event_base_free(base);
return 0;
}

2. 事件循环

  • 开始事件循环
1
2
3
4
5
6
7
8
9
// 头文件
#include <event2/event.h>

// 启动事件处理框架中的事件循环
int event_base_dispatch(struct event_base* base);
// 举例:检测读事件,检测是否有数据发送,等待到事件触发
// 如果有读事件,默认情况下只处理一次,事件循环停止,只能接收一次数据
// 如果要持续接收数据,需要额外设置
// 这个函数被调用,内部会进行事件检测,阻塞在这一行
  • 终止事件循环
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 头文件
#include <event2/event.h>

// 表示一个时间段
struct timeval {
long tv_sec; // 秒
long tv_usec; // 微秒
};

// 事件循环不会马上终止,在tv指定时长后终止
int event_base_loopexit(struct event_base *base, const struct timeval *tv);

// 立即终止事件循环
int event_base_loopbreak(struct event_base *base);

3. 事件

  • 设置事件处理函数
  • 创建需要检测的文件描述符对应的事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 头文件
#include <event2/event.h>

#define EV_TIMEOUT 0x01 // 定时器超时
#define EV_READ 0x02 // 读,检测缓冲区是否有数据
#define EV_WRITE 0x04 // 写,检测是否可写
#define EV_SIGNAL 0x08 // 信号
#define EV_PERSIST 0x10 // 设置事件被重复检测
#define EV_ET 0x20 // 边沿模式

// 事件的处理函数,被libevent框架调用
typedef void(*event_callback_fn)(evutil_socket_t fd, short what, void *arg);
/* 参数
fd: event_new() 的第二个参数
what: 记录了文件描述符触发的事件
arg: event_new() 的最后一个参数 */

// 创建一个需要检测的事件,struct event 就是创建出的事件
// evutil_socket_t == int
// event_new() 函数本质就是对一个文件描述符进行封装
struct event* event_new(struct event_base *base, evutil_socket_t fd, short what, event_callback_fn cb, void *arg);
/* 参数
base: 事件处理框架
fd: 文件描述符(如管道、套接字通信)
what: 要检测的描述符
EV_READ: 读事件
EV_WRITE: 写事件
EV_SIGNAL: 信号事件(Linux)
EV_ET: 设置边沿模式
cb: 函数指针对应的一个回调函数,检测触发时,这个函数被调用
arg: 实参传给cb */

// 释放事件资源
void event_free(struct event *event);
  • 事件的添加和删除
1
2
3
4
5
6
7
8
9
struct timeval {
long tv_sec; // 秒
long tv_usec; // 微秒
};

// 添加事件到事件循环中(如果在tv时间段没有被触发,会强制调用一次)
int event_add(struct *ev, const struct timeval *tv);
// 将事件从事件处理函数删除
int event_del(struct event *ev);

代码示例:管道写端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <event2/event.h>

// 回调函数
void write_cb(evutil_socket_t fd, short what, void *arg)
{
// 事件触发的事件在what里
printf("当前触发的事件是EV_WRITE事件吗: %d\n", what && EV_WRITE);
// 处理动作就是写管道
char buf[1024];
static int num = 0;
sprintf(buf, "我在写数据,%d\n", num++);
int ret = write(fd, buf, strlen(buf)+1);
}
int main()
{
// 1.创建有名管道
int ret = mkfifo("testfifo", 0664);
// 2.打开管道的写端
int wfd = open("testfifo", O_WRONLY);
// 3.创建事件处理框架
struct event_base *base = event_base_new();
// 4.创建事件并添加到时间处理框架
struct event *ev = event_new(base, wfd, EV_WRITE|EV_PERSIST, write_cb, NULL); // EV_PERSIST 才是持续检测,否则只检测一次
event_add(ev, NULL);
// 5.启动事件循环
event_base_dispatch(base);
// 释放资源
event_free(ev);
event_base_free(base);
}

代码示例:管道读端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <event2/event.h>

// 回调函数
void read_cb(evutil_socket_t fd, short what, void *arg)
{
// 事件触发的事件在what里
printf("当前触发的事件是EV_READ事件吗: %d\n", what && EV_READ);
// 处理动作就是写管道
char buf[1024];
memset(buf, 0, sizeof(buf));
int len = read(fd, buf, sizeof(buf));
if(len==0)
{
printf("写端已经关闭\n");
}
else if(len>0)
{
printf("接收到的数据: %s", buf);
}
else
{
printf("失败");
}

}
int main()
{
// 1.打开管道的写端
int rfd = open("testfifo", O_RDONLY);
// 2.创建事件处理框架
struct event_base *base = event_base_new();
// 3.创建事件并添加到时间处理框架
struct event *ev = event_new(base, rfd, EV_READ|EV_PERSIST, read_cb, NULL); // EV_PERSIST 才是持续检测,否则只检测一次
event_add(ev, NULL);
// 4.启动事件循环
event_base_dispatch(base);
// 释放资源
event_free(ev);
event_base_free(base);
}

4. 带缓冲区的事件

使用场景:应用于网络套接字通信(bufferevent)

带缓冲区的事件结构:

  1. 封装了 socket 中的文件描述符,得到了struct bufferevent
  2. 该结构体对文件描述符封装后,使用者不能直接操作 socket 对应的两块读写缓冲区,而是先写入 bufferevent 的读写缓冲区,然后再由该框架写入 socket 中的读写缓冲区
  3. 读写缓冲区分别对应了两个读写事件
    • 读回调是 bufferevent 中读缓冲区的有数据时调用的
    • 写回调是数据写入到 socket 的写缓冲区后调用的
  • 创建带缓冲区的事件
1
2
3
4
5
6
7
struct bufferevent* bufferevent_socket_new(struct event_base *base, evutil_socket_t fd, enum bufferevent_options options);
/* 参数:
base: 事件处理框架
fd: 通信的文件描述符
自己使用 socket() 函数创建的文件描述符
填 -1 自动创建
options: BEV_OPT_CLOSE_ON_FREE: 在销毁对象时自动释放封装的文件描述符 */
  • 销毁带缓冲区的事件
1
void bufferevent_free(struct bufferevent *bev);
  • 在 bufferevent 上启动连接
1
2
3
4
5
int bufferevent_socket_connect(struct bufferevent *bev, struct sockaddr *address, int addrlen);
/* 参数:
bev: 带缓冲区的事件,通过 bufferevent_socket_new() 得到
address: 在这里初始化服务器端的ip和端口信息
addlen: address 参数执行的内存大小 */
  • 数据发送和接收(发送到 bufferevent 的写缓冲区,从 bufferevent 的读缓冲区接收)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
int bufferevent_write(struct bufferevent *bufev, const void *data, size_t size);
/* 参数:
bufev: bufferevent_socket_new() 得到的返回值
data: 要发送的数据
size: 发送数据data的长度
返回值:
>0: 发送的字节数 -1: 失败 */
size_t bufferevent_read(struct bufferevent *bufev, void *data, size_t size);
/* 参数:
bufev: bufferevent_socket_new() 得到的返回值
data: 存储数据的内存
size: data的大小
返回值:
>0: 接收的字节数 -1: 失败 */
  • 设置读写回调函数(读是bufferevent缓冲区,写是socket缓冲区)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void bufferevent_setcb(struct bufferevent *bufev, bufferevent_data_cb readcb, bufferevent_data_cb writecb, bufferevent_event_cb eventcb, void *cbarg);
/* 参数:
bufev: bufferevent_socket_new() 得到的返回值
readcb: 设置读缓冲区对应的回调函数
writecb: 设置写缓冲区对应的回调函数
eventcb: 事件回调函数
cbarg: 回调函数的实参 */

// 读写缓冲区回调函数原型
typedef void (*bufferevent_data_cb)(struct bufferevent *bev, void *ctx);
/* bev: bufferevent_setcb() 第一个参数
ctx: bufferevent_setcb() 最后一个参数 */

// 读写缓冲区回调函数原型
typedef void (*bufferevent_event_cb)(struct bufferevent *bev, short events, void *ctx);
/* bev: bufferevent_setcb() 第一个参数
event: 实际检测到的事件
EV_EVENT_READING: 产生读异常
BEV_EVENT_WRITING: 写异常
BEV_EVENT_ERROR: 操作时发生错误
BEV_EVENT_TIMEOUT: 超时
BEV_EVENT_EOF: 另一端关闭连接
BEV_EVENT_CONNECTED: 客户端判断是否连接完成
ctx: bufferevent_socket_new() 最后一个参数 */
  • 禁用、启用缓冲区
1
2
3
4
5
6
7
8
9
// bufferevent 读缓冲区默认不可用,读事件不会被检测,读回调也不会被调用
// bufferevent 写缓冲区默认可用
void bufferevent_enable(struct bufferevent *bufev, short events);
void bufferevent_disable(struct bufferevent *bufev, short events);
/* event: 读EV_READ 2; 写EV_WRITE 4; */

// 获取当前缓冲区状态
short bufferevent_get_enabled(struct bufferevent *bufev);
/* 返回值2读可用,返回值4写可用,返回值6都可用 */

事件缓冲函数客户端示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <event2/event.h>
#include <event2/bufferevent.h>
void read_cb(struct bufferevent *, void *);
void write_cb(struct bufferevent *, void *);
void events_cb(struct bufferevent *, short , void *);
void send_cb(evutil_socket_t , short , void* );
int main()
{
// 1.创建事件处理框架
struct event_base* base = event_base_new();
// 2.创建带缓冲区的事件
struct bufferevent* bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE);
// 3.连接服务器
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(8989);
inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr.s_addr);
bufferevent_socket_connect(bev, (struct sockaddr*)&addr, sizeof(addr));
// 4.设置回调
bufferevent_setcb(bev, read_cb, write_cb, events_cb, base);
// 5.设置读缓冲区为可用
bufferevent_enable(bev, EV_READ);
struct event *ev = event_new(base, STDIN_FILENO, EV_READ|EV_PERSIST, send_cb, bev);
event_add(ev, NULL);
// 6.启动事件循环
event_base_dispatch(base);
// 7.释放资源
bufferevent_free(bev);
event_base_free(base);
}

void send_cb(evutil_socket_t fd, short what, void* arg)
{
char buf[1024] = {0};
read(fd, buf, sizeof(buf));

struct bufferevent *bev = (struct bufferevent*)arg;
bufferevent_write(bev, buf, strlen(buf)+1);
printf("客户端发送数据完毕\n");
}

// 读回调 - 读缓冲区有数据回调 - 接收数据
void read_cb(struct bufferevent *bev, void *ctx)
{
char buf[1024];
memset(buf, 0, sizeof(buf));
bufferevent_read(bev, buf, sizeof(buf));
printf("接收到服务器回复的数据 : %s\n", buf);
}
// 写回调 - 写入内核缓冲区时回调 - 发送数据
void write_cb(struct bufferevent *bev, void *ctx)
{
printf("数据被写入内核\n");
}

void events_cb(struct bufferevent *bev, short events, void *ctx)
{
struct event_base *base = (struct event_base*)ctx;
if(events & BEV_EVENT_EOF)
{
printf("服务器断开了链接\n");
}
else if(events & BEV_EVENT_ERROR)
{
printf("不可预期错误\n");
}
else if(events & BEV_EVENT_CONNECTED)
{
printf("和服务器成功建立链接\n");
return;
}
// 终止事件循环
event_base_loopbreak(base);
}

5. 链接监听器

  套接字通信服务器需要做以下流程

  创建套接字 -> 绑定ip端口 -> 监听 -> 等待客户端连接 -> 使用对应客户端的通信文件描述符通信 -> 断开连接

  链接监听器可以完成服务器前4步操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include <event2/listener.h>
// 回调函数原型,主要是处理通信流程
typedef void (*evconnlistener_cb)(struct evconnlistener *listener, evutil_socket_t sock, struct sockaddr *addr, int len, void *ptr);
/* listener: evconnlistener_new_bind() 返回值
sock: 连接建立后得到的通信文件描述符
addr: 客户端地址信息
len: addr指向内存的大小
ptr: evconnlistener_new_bind() 传进来的参数 */

// 创建连接监听器,封装监听的文件描述符
struct evconnlistener* evconnlistener_new(struct event_base *base, evconnlistener_cb cb, void *ptr, unsigned flags, int backlog, evutil_socket_t fd);
/* 参数:
base: 事件的处理框架
cb: 回调函数
ptr: 实参传递给cb
flags: 连接监听器的选项设置
LEV_OPT_CLOSE_ON_FREE: 连接监听器释放时关闭封装的监听套接字
LEV_OPT_REUSEABLE: 端口复用
backlog: listen() 函数的第二个参数(该函数可以设置监听,最大值128,-1表示自动设置)
fd: 监听的套接字(绑定ip端口后的) */

// 创建连接监听器,封装监听的文件描述符
struct evconnlistener* evconnlistener_new_bind(struct event_base *base, evconnlistener_cb cb, void *ptr, unsigned flags, int backlog, const struct sockaddr *sa, int socklen);
/* 参数:
base: 事件的处理框架
cb: 回调函数
ptr: 实参传递给cb
flags: 连接监听器的选项设置
LEV_OPT_CLOSE_ON_FREE: 连接监听器释放时关闭封装的监听套接字
LEV_OPT_REUSEABLE: 端口复用
backlog: listen() 函数的第二个参数(该函数可以设置监听,最大值128,-1表示自动设置)
sa: 绑定时需要的ip和端口
socklen: sa指针指向的内存和大小 */
  • 调整 evconnlistener 回调函数
1
void evconnlistener_set_cb(struct evconnlistener *lev, evconnlistener_cb cb, void *arg);
  • 释放
1
evconnlistener_free(struct evconnlistener* lev)

服务器端示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <event2/event.h>
#include <event2/listener.h>
#include <event2/bufferevent.h>
void cb_listener(struct evconnlistener *, evutil_socket_t , struct sockaddr *, int , void *);
void read_cb(struct bufferevent *, void *);
void write_cb(struct bufferevent *, void *);
void events_cb(struct bufferevent *, short , void *);

int main()
{
// 1.创建事件处理框架
struct event_base* base = event_base_new();
// 2.创建带缓冲区的事件
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(8989);
addr.sin_addr.s_addr = INADDR_ANY;
struct evconnlistener* evl;
evl = evconnlistener_new_bind(base, cb_listener, base, LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE, -1, (struct sockaddr*)&addr, sizeof(addr));
// 3.启动事件循环
event_base_dispatch(base);
// 4.释放资源
evconnlistener_free(evl);
event_base_free(base);
}

// 链接监听器的回调
void cb_listener(struct evconnlistener *listener, evutil_socket_t sock, struct sockaddr *addr, int len, void *ptr)
{
struct event_base *base = (struct event_base*)ptr;
struct bufferevent* bev = bufferevent_socket_new(base, sock, BEV_OPT_CLOSE_ON_FREE);
bufferevent_setcb(bev, read_cb, write_cb, events_cb, base);
bufferevent_enable(bev, EV_READ);
}


// 读回调 - 读缓冲区有数据回调 - 接收数据
void read_cb(struct bufferevent *bev, void *ctx)
{
char buf[1024];
memset(buf, 0, sizeof(buf));
bufferevent_read(bev, buf, sizeof(buf));
printf("接收到客户端发送的数据 : %s\n", buf);

bufferevent_write(bev, buf, strlen(buf)+1);
printf("数据已经回复给客户端\n");
}
// 写回调 - 写入内核缓冲区时回调 - 发送数据
void write_cb(struct bufferevent *bev, void *ctx)
{
printf("数据被写入内核\n");
}

void events_cb(struct bufferevent *bev, short events, void *ctx)
{
struct event_base *base = (struct event_base*)ctx;
if(events & BEV_EVENT_EOF)
{
printf("客户端断开了链接\n");
}
else if(events & BEV_EVENT_ERROR)
{
printf("不可预期错误\n");
}
// 终止事件循环
event_base_loopbreak(base);
}