ffplay队列分析

ffplay的整体结构是由5个线程和4个队列组成、运转的(不分析字幕)。

整体结构

音视频开发之ffplay队列分析

5个线程

  • 解复用线程 : read_thread,由主线程创建,负责媒体文件的解复用和读取,读取的数据根据流类型放入到对应的编码数据队列中。
  • 音频解码线程 : audio_thread,由read_thread创建,负责将编码数据队列中的数据解码,放入到原始数据队列中。
  • 视频解码线程 : video_thread,由read_thread创建,负责将编码数据队列中的数据解码,放入到原始数据队列中。
  • 音频渲染线程 : 由SDL创建,负责将音频原始数据队列中的数据发送给音频播放设备。
  • 视频渲染线程 : main线程,负责用视频原始数据队列中的数据不断的更新纹理内容,并刷新显示器进行显示。

4个队列

  • FrameQueue pictq : 视频原始数据队列
  • FrameQueue sampq : 音频原始数据队列
  • PacketQueue audioq : 音频编码数据队列
  • PacketQueue videoq : 视频编码数据队列

PacketQueue

Packet队列是基于链表实现的普通队列,由于编码数据帧比较小,所以这是个无限队列。

1
2
3
4
5
6
7
8
9
10
11
typedef struct PacketQueue {
// Packet队列采用的是链表结构
MyAVPacketList *first_pkt, *last_pkt; // 第一个节点和最后一个节点
int nb_packets; // 队列中节点个数
int size; // 队列中所有节点的字节总数
int64_t duration; // 队列中所有节点的总时长
int abort_request; // 是否退出标记 1 退出,0 不退出
int serial; // 序号
SDL_mutex *mutex; // 锁
SDL_cond *cond; // 互斥量
} PacketQueue;

packet_queue_init

初始化队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static int packet_queue_init(PacketQueue *q) {
// 将编码数据队列初始化,清零
memset(q, 0, sizeof(PacketQueue));
// 为队列创建锁
q->mutex = SDL_CreateMutex();
if (!q->mutex) {
av_log(NULL, AV_LOG_FATAL, "SDL_CreateMutex(): %s\n", SDL_GetError());
return AVERROR(ENOMEM);
}
// 为队列创建锁的互斥量
q->cond = SDL_CreateCond();
if (!q->cond) {
av_log(NULL, AV_LOG_FATAL, "SDL_CreateCond(): %s\n", SDL_GetError());
return AVERROR(ENOMEM);
}
// 队列设置为退出状态
q->abort_request = 1;
return 0;
}

packet_queue_start

启动队列,每次启动都会在队列中插入一个分割标记,并将队列序号+1

1
2
3
4
5
6
7
8
9
10
static void packet_queue_start(PacketQueue *q) {
// 加锁
SDL_LockMutex(q->mutex);
// 设置启动标记
q->abort_request = 0;
// 队列中放入一个特殊的flush_pkt,用来做不连续的两段数据的分割标记
packet_queue_put_private(q, &flush_pkt);
// 解锁
SDL_UnlockMutex(q->mutex);
}

packet_queue_destroy

销毁队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
static void packet_queue_flush(PacketQueue *q) {
MyAVPacketList *pkt, *pkt1;
// 加锁
SDL_LockMutex(q->mutex);
// 遍历整个链表,将每个节点的AVPacket对象进行销毁
for (pkt = q->first_pkt; pkt; pkt = pkt1) {
pkt1 = pkt->next;
av_packet_unref(&pkt->pkt);
av_freep(&pkt);
}
// 将队列重置
q->last_pkt = NULL;
q->first_pkt = NULL;
q->nb_packets = 0;
q->size = 0;
q->duration = 0;
// 解锁
SDL_UnlockMutex(q->mutex);
}

static void packet_queue_destroy(PacketQueue *q) {
// 销毁队列
packet_queue_flush(q);
// 销毁锁
SDL_DestroyMutex(q->mutex);
SDL_DestroyCond(q->cond);
}

packet_queue_get

从队列中取出头节点,block参数表明当队列为空时是否需要阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
static int packet_queue_get(PacketQueue *q, AVPacket *pkt, int block,
int *serial) {
MyAVPacketList *pkt1;
int ret;
// 加锁
SDL_LockMutex(q->mutex);

for (;;) {
// 如果退出标记,则退出
if (q->abort_request) {
ret = -1;
break;
}
// 获取队列的头节点
pkt1 = q->first_pkt;
if (pkt1) {
// 将头结点的下一个节点作为新的头结点
q->first_pkt = pkt1->next;
// 如果新头结点为空,则说明当前队列空了,将尾节点也设为空
if (!q->first_pkt) q->last_pkt = NULL;
// 节点数量-1
q->nb_packets--;
// 队列中总字节数减少
q->size -= pkt1->pkt.size + sizeof(*pkt1);
// 队列中总时长减少
q->duration -= pkt1->pkt.duration;
// 获取节点中的AVPacket,赋值给参数中的pkt指针
*pkt = pkt1->pkt;
// 将节点的序号赋值给参数中的serial指针
if (serial) *serial = pkt1->serial;
// 释放节点
av_free(pkt1);
ret = 1;
break;
} else if (!block) {
// 如果头结点为空,而block为0,则不阻塞,直接返回
ret = 0;
break;
} else {
// 如果头结点为空,block为1,则wait等待
SDL_CondWait(q->cond, q->mutex);
}
}
// 解锁
SDL_UnlockMutex(q->mutex);
return ret;
}

packet_queue_put

在队列尾部插入新节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
static int packet_queue_put_private(PacketQueue *q, AVPacket *pkt) {
// 创建节点
MyAVPacketList *pkt1;
// 如果退出,则返回
if (q->abort_request) return -1;

pkt1 = av_malloc(sizeof(MyAVPacketList));
if (!pkt1) return -1;
// 将AVPacket保存到节点中
pkt1->pkt = *pkt;
pkt1->next = NULL;
// 如果加入的数据是flush_pkt标记,则序号+1
if (pkt == &flush_pkt) q->serial++;
pkt1->serial = q->serial;
// 如果队列中尾节点为空,则将当前节点作为头结点,否则,将当前节点作为尾节点的下一个节点
if (!q->last_pkt)
q->first_pkt = pkt1;
else
q->last_pkt->next = pkt1;
// 将当前节点作为队列新的尾节点
q->last_pkt = pkt1;
// 节点数量+1
q->nb_packets++;
// 队列中总字节数增加
q->size += pkt1->pkt.size + sizeof(*pkt1);
// 队列中总时长增加
q->duration += pkt1->pkt.duration;
/* XXX: should duplicate packet data in DV case */
// 唤醒wait线程
SDL_CondSignal(q->cond);
return 0;
}

static int packet_queue_put(PacketQueue *q, AVPacket *pkt) {
int ret;
// 加锁
SDL_LockMutex(q->mutex);
// 入队列
ret = packet_queue_put_private(q, pkt);
// 解锁
SDL_UnlockMutex(q->mutex);
// 如果失败,则释放AVPacket
if (pkt != &flush_pkt && ret < 0) av_packet_unref(pkt);

return ret;
}

FrameQueue

Frame队列与Packet队列不同,因为Frame中存储的是原始数据,是非常庞大的,所以需要设置成有限队列,这里采用的是滚动数组的方式来实现,分别用读写两个索引来记录位置。

由于将数组进行了读写分离,所以在线程加锁时有了优化,不再需要锁住整个函数,可以局部加锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
typedef struct FrameQueue {
// 使用滚动数组来实现的Frame队列
Frame queue
[FRAME_QUEUE_SIZE]; // 数据帧数组,使用数组来实现的队列,默认大小为FRAME_QUEUE_SIZE
int rindex; // 读索引
int windex; // 写索引
int size; // 当前写入的数据大小
int max_size; // 外部设置的队列最大允许存储的大小,该值不超过FRAME_QUEUE_SIZE
int keep_last; // 是否保留最后一个读数据,如果是1,则rindex_shown的值就为1。并且后面rindex所指向的位置实际上是上个次读的索引,而rindex+rindex_shown才是当前读的索引
int rindex_shown; // 用来配合keep_last,如果keep_last为1,则再第一次next后,rindex_shown也会为1
SDL_mutex *mutex; // 队列锁
SDL_cond *cond; // 队列锁互斥量
PacketQueue *pktq; // 关联的Packet队列
} FrameQueue;

frame_queue_init

初始化队列,并设置队列的最大大小,根据最大大小提前创建好每一个Frame对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static int frame_queue_init(FrameQueue *f, PacketQueue *pktq, int max_size,
int keep_last) {
int i;
// 初始化原始数据队列,清零
memset(f, 0, sizeof(FrameQueue));
// 创建原始数据队列的锁
if (!(f->mutex = SDL_CreateMutex())) {
av_log(NULL, AV_LOG_FATAL, "SDL_CreateMutex(): %s\n", SDL_GetError());
return AVERROR(ENOMEM);
}
// 创建原始数据队列的互斥量
if (!(f->cond = SDL_CreateCond())) {
av_log(NULL, AV_LOG_FATAL, "SDL_CreateCond(): %s\n", SDL_GetError());
return AVERROR(ENOMEM);
}
// 关联原始数据队列和编码数据队列
f->pktq = pktq;
// 设置原始数据队列的最大大小
f->max_size = FFMIN(max_size, FRAME_QUEUE_SIZE);
// 是否保留最后一个读的数据
f->keep_last = !!keep_last;
// 提前分配好所有的Frame
for (i = 0; i < f->max_size; i++)
if (!(f->queue[i].frame = av_frame_alloc())) return AVERROR(ENOMEM);
return 0;
}

frame_queue_destory

销毁队列

1
2
3
4
5
6
7
8
9
10
11
12
13
static void frame_queue_destory(FrameQueue *f) {
int i;
// 释放所有的原始数据帧
for (i = 0; i < f->max_size; i++) {
Frame *vp = &f->queue[i];
// 主要是销毁AVFrame和AVSubtitle
frame_queue_unref_item(vp);
av_frame_free(&vp->frame);
}
// 销毁锁和互斥量
SDL_DestroyMutex(f->mutex);
SDL_DestroyCond(f->cond);
}

frame_queue_peek_readable

读取这里也和Packet队列不同,首先通过frame_queue_peek_readable获取到读索引位置的节点,想要获取下一个节点,需要调用frame_queue_next将读索引移动到下一个位置。

读取时如果当前队列为空,则进行阻塞等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 从队列中取出一个可读的帧
* @param 队列
* @return 队列中可读的帧
*/
static Frame *frame_queue_peek_readable(FrameQueue *f) {
/* wait until we have a readable a new frame */
// 同写入,size是读写线程都需要操作的,所以需要加锁
SDL_LockMutex(f->mutex);
// 如果设置了keep_last,则在next后,rindex_shown为1,
// rindex_shown为1其实就是标记rindex少走了一步,也等于size少减了1,所以这里计算size时需要减去1
// 如果队列为空
while (f->size - f->rindex_shown <= 0 && !f->pktq->abort_request) {
// 等待写线程notify
SDL_CondWait(f->cond, f->mutex);
}
// 队列解锁
SDL_UnlockMutex(f->mutex);
// 如果关联的Packet队列退出,则返回null
if (f->pktq->abort_request) return NULL;
// rindex + rindex_shown
// (当rindex_shown为1时,rindex少了一次+1,所以需要加上1)
return &f->queue[(f->rindex + f->rindex_shown) % f->max_size];
}

frame_queue_next中并没有对队列进行判空,所以调用该方法的时候需要先用frame_queue_nb_remaining判断一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

/**
* 通知队列数据已经读取,rindex位置的帧已经被外部使用
* rindex需要向后移动,以便frame_queue_peek_readable获取后续的数据
* @param f
*/
static void frame_queue_next(FrameQueue *f) {
// 如果当前队列设置了保留最后一帧,则rindex少走一步,size少减1
if (f->keep_last && !f->rindex_shown) {
// 标记rindex_shown来弥补这1步
f->rindex_shown = 1;
return;
}
// 如果队列设置了keep_last,则这里的删除操作针对的都是上一帧,而不是当前帧
// 删除rindex位置的帧数据,AVFrame和AVSubtitle
frame_queue_unref_item(&f->queue[f->rindex]);
// rindex+1,向后移动一位,如果超出,则从头开始
if (++f->rindex == f->max_size) f->rindex = 0;
// 加锁
SDL_LockMutex(f->mutex);
// size减1
f->size--;
// 通知写线程有空位写入,唤醒wait
SDL_CondSignal(f->cond);
// 解锁
SDL_UnlockMutex(f->mutex);
}

获取当前队列中可读的节点数量

1
2
3
4
static int frame_queue_nb_remaining(FrameQueue *f) {
// 返回当前队列中已有的帧数量
return f->size - f->rindex_shown;
}

frame_queue_peek_writable

写入的话和读取一样,先通过frame_queue_peek_writable获取当前写索引位置的节点,写入完毕后,调用frame_queue_push通知队列写入完毕,写索引向后移动一位。

写入时如果当前队列已满,那么会进行阻塞等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* 从队列中取出一个可写入的帧
* @param 队列
* @return 队列中可写入的帧
*/
static Frame *frame_queue_peek_writable(FrameQueue *f) {
/* wait until we have space to put a new frame */
// 队列加锁(因为读线程和写线程都会对队列的size进行修改,所以需要加锁)
SDL_LockMutex(f->mutex);
// 如果队列已满,并且关联的Packet队列并没有退出(避免死循环),则循环wait
while (f->size >= f->max_size && !f->pktq->abort_request) {
// wait,等待notify
SDL_CondWait(f->cond, f->mutex);
}
// 队列解锁
SDL_UnlockMutex(f->mutex);
// 只有写线程才需要进行写入,才会改变windex的值,所以获取写入的帧操作并不需要加锁
// 如果关联的Packet队列退出,则返回null
if (f->pktq->abort_request) return NULL;
// 返回windex位置的Frame供外部写入
return &f->queue[f->windex];
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 通知队列数据已经写入,windex位置的帧已经写入了数据
* 数据写入的帧是通过frame_queue_peek_writable获取的
* @param 队列
*/
static void frame_queue_push(FrameQueue *f) {
// 将windex+1,往后移一位,如果后面已经到底了,则从头开始(环状)
if (++f->windex == f->max_size) f->windex = 0;
// 同上,windex只有写线程操作,所以不需要加锁,但是下面的size的操作是读线程和写线程都需要操作的,所以需要加锁
// 加锁
SDL_LockMutex(f->mutex);
// 队列的size+1
f->size++;
// notify通知读线程,唤醒wait
SDL_CondSignal(f->cond);
// 释放锁
SDL_UnlockMutex(f->mutex);
}

结构体

MyAVPacketList

Packet队列中的节点

1
2
3
4
5
typedef struct MyAVPacketList {
AVPacket pkt; // 编码数据
struct MyAVPacketList *next; // 下一个节点
int serial; // 序号
} MyAVPacketList;

Frame

Frame队列中的节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
typedef struct Frame {
// 对数据帧的统一抽象,包含音频帧、视频帧、字幕帧
AVFrame *frame; // 音视频帧
AVSubtitle sub; // 字幕帧
int serial; // 序号
double pts; // pts
double duration; // 显示时长
int64_t pos; // 在流中的位置
int width; // 宽
int height; // 高
int format; // 格式
AVRational sar; // 横纵比
int uploaded; // 是否上传到纹理上
int flip_v; // 是否需要翻转
} Frame;