<返回更多

ijkPlayer源码分析 PacketQueue分析

2021-07-13    程序员老z
加入收藏

本文介绍PacketQueue,相对于FrameQueue来说比较简单,可以类比Android中的MessageQueue。

PacketQueue总体介绍

  1. 单向链表结构。first_pkt、last_pkt,是链表的起点和终点结点;recycle_pkt链表用于节点复用;
  2. 是一个多线程安全队列,靠等待唤醒机制保证线程安全;
  3. 当遇到flush_pkt时,serial加1自增,标志着流序列变化,区分是否是连续的流;
typedef struct MyAVPacketList {
    AVPacket pkt;
    struct MyAVPacketList *next;
    int serial;
} MyAVPacketList;

typedef struct PacketQueue {
    MyAVPacketList *first_pkt, *last_pkt;
    int nb_packets;
    int size;
    int64_t duration;
    int abort_request;
    int serial;
    SDL_mutex *mutex;
    SDL_cond *cond;
    MyAVPacketList *recycle_pkt;
    int recycle_count;
    int alloc_count;

    int is_buffer_indicator;
    SDL_Profiler    videoBufferProfiler;
    SDL_Profiler    audioBufferProfiler;
    void *ffp;
} PacketQueue;

 

PacketQueue API介绍

packet_queue_init:初始化;

packet_queue_start:启动队列,设置abort_request为0,先放一个flush_pkt;

packet_queue_put:存入一个节点,;


packet_queue_put_nullpacket:存入一个空节点;

packet_queue_put_private:存入一个节点,后唤醒packet_queue_get等待锁;

packet_queue_get:获取一个节点;


packet_queue_get_or_buffering:去缓冲等待水位后获取一个节点;

packet_queue_abort:中止,设置abort_request=1后唤醒packet_queue_get等待锁;

packet_queue_flush:清除队列内所有的节点;

packet_queue_destroy:销毁;

 

初始化

static int packet_queue_init(PacketQueue *q) {
    memset(q, 0, sizeof(PacketQueue));
    q->mutex = SDL_CreateMutex();
    q->cond = SDL_CreateCond();
    q->abort_request = 1;
    return 0;
}

static void packet_queue_start(PacketQueue *q) {
    SDL_LockMutex(q->mutex);
    q->abort_request = 0;
    packet_queue_put_private(q, &flush_pkt);
    SDL_UnlockMutex(q->mutex);
}

 

put操作

/*
  * 存入null结点,eof和error时候存入,表示流结束
  */
static int packet_queue_put_nullpacket(PacketQueue *q, int stream_index) {
    AVPacket pkt1, *pkt = &pkt1;
    av_init_packet(pkt);
    pkt->data = NULL;
    pkt->size = 0;
    pkt->stream_index = stream_index;
    return packet_queue_put(q, pkt);
}

static int packet_queue_put(PacketQueue *q, AVPacket *pkt) {
    int ret;

    SDL_LockMutex(q->mutex);
    ret = packet_queue_put_private(q, pkt);
    SDL_UnlockMutex(q->mutex);

    if (pkt != &flush_pkt && ret < 0)
        av_packet_unref(pkt);

    return ret;
}

static int packet_queue_put_private(PacketQueue *q, AVPacket *pkt) {
    if (q->abort_request) {
        return -1;
    }

    // 如果有已经回收的就复用该回收的结点,没有就申请一个;
    MyAVPacketList *pkt1 = q->recycle_pkt;
    if (pkt1) {
        q->recycle_pkt = pkt1->next; // 移动到下一个
        q->recycle_count++;
    } else {
        q->alloc_count++;
        pkt1 = av_malloc(sizeof(MyAVPacketList));
    }

    if (!pkt1) {
        return -1;
    }

    pkt1->pkt = *pkt;
    pkt1->next = NULL;

    // 遇到flush_pkt就升级serial序列号,标志刚开始或进行了seek
    if (pkt == &flush_pkt) {
        q->serial++;
    }

    pkt1->serial = q->serial;

    //  赋值first_pkt和last_pkt,定义链表的起点和终点;
    if (!q->last_pkt) { // 条件判断同 !q->first_pkt
        q->first_pkt = pkt1;
    } else {
        q->last_pkt->next = pkt1;
    }

    q->last_pkt = pkt1;
    q->nb_packets++;

    q->size += pkt1->pkt.size + sizeof(*pkt1);
    q->duration += FFMAX(pkt1->pkt.duration, MIN_PKT_DURATION);

    /* XXX: should duplicate packet data in DV case */
    SDL_CondSignal(q->cond);
    return 0;
}

 

get操作

/*
  * block: 是否阻塞
  * 返回1表示获取到了,返回值<=0表示没获取到
  */
static int packet_queue_get(PacketQueue *q, AVPacket *pkt, int block, int *serial) {
    MyAVPacketList *pkt1;
    int ret;

    // 加锁
    SDL_LockMutex(q->mutex);

    for (;;) {
        if (q->abort_request) {
            ret = -1;
            break;
        }

        pkt1 = q->first_pkt;
        if (pkt1) {
            q->first_pkt = pkt1->next;
            if (!q->first_pkt) {
                 // 说明只有一个结点
                q->last_pkt = NULL;
            }

            q->nb_packets--;
            q->size -= pkt1->pkt.size + sizeof(*pkt1);
            q->duration -= FFMAX(pkt1->pkt.duration, MIN_PKT_DURATION);
            *pkt = pkt1->pkt;

            if (serial) {
                *serial = pkt1->serial;
            }
          
            // 把pkt1持有的pkt给出去后进行回收,放到recycle_pkt链表头部
            pkt1->next = q->recycle_pkt;
            q->recycle_pkt = pkt1;
            ret = 1;
            break;
        } else if (!block) {
            ret = 0;
            break;
        } else {
            // wait阻塞,等待put唤醒
            SDL_CondWait(q->cond, q->mutex);
        }
    }
    SDL_UnlockMutex(q->mutex);
    return ret;
}


/*
 *  阻塞等待直到退出或者有AVPacket数据
 *  >= 0 即取到值;
 */
static int packet_queue_get_or_buffering(FFPlayer *ffp, PacketQueue *q, AVPacket *pkt, int *serial,
                                         int *finished) {

    if (!ffp->packet_buffering)
        return packet_queue_get(q, pkt, 1, serial); // queue为空时会阻塞等待

    while (1) {
        int new_packet = packet_queue_get(q, pkt, 0, serial); // 非阻塞,直接返回

        if (new_packet < 0) {
            // abort_request了
            return -1;
        } else if (new_packet == 0) {
            // 队列为空,去缓冲
            if (q->is_buffer_indicator && !*finished) {
                ffp_toggle_buffering(ffp, 1);
            }

           // 再阻塞获取,等待水位填充满
            new_packet = packet_queue_get(q, pkt, 1, serial);

            if (new_packet < 0) {
                // abort_request了
                return -1;
            }
        }

        if (*finished == *serial) {
            av_packet_unref(pkt);
            continue;
        } else {
            break;
        }
    }


    return 1;
}

 

重置、销毁操作

// stream_close时第一个调用它,主要是置abort_request为1,阻断后续所有流程
static void packet_queue_abort(PacketQueue *q) {
    SDL_LockMutex(q->mutex);
    q->abort_request = 1;
    SDL_CondSignal(q->cond);
    SDL_UnlockMutex(q->mutex);
}

// seek或destory时调用
static void packet_queue_flush(PacketQueue *q) {
    SDL_LockMutex(q->mutex);

    // 释放所有pkt
    MyAVPacketList *pkt, *pkt1;
    for (pkt = q->first_pkt; pkt; pkt = pkt1) {
        pkt1 = pkt->next;
        av_packet_unref(&pkt->pkt);

        // 回收,放到链表头部
        pkt->next = q->recycle_pkt;
        q->recycle_pkt = pkt;
    }

    q->last_pkt = NULL;
    q->first_pkt = NULL;
    q->nb_packets = 0;
    q->size = 0;
    q->duration = 0;

    SDL_UnlockMutex(q->mutex);
}

// 清空所有pkt,包括recycle_pkt,stream_close处调用
static void packet_queue_destroy(PacketQueue *q) {
    packet_queue_flush(q);

    SDL_LockMutex(q->mutex);
    while (q->recycle_pkt) {
        MyAVPacketList *pkt = q->recycle_pkt;
        if (pkt)
            q->recycle_pkt = pkt->next;
        av_freep(&pkt);
    }
    SDL_UnlockMutex(q->mutex);

    SDL_DestroyMutex(q->mutex);
    SDL_DestroyCond(q->cond);
}
声明:本站部分内容来自互联网,如有版权侵犯或其他问题请与我们联系,我们将立即删除或处理。
▍相关推荐
更多资讯 >>>