之前搭建了tensorRT的基本框架,想着部署个yolo做多路视频的处理,现在没有真实的网络摄像头可供使用,临时采用RTSP+ffmpeg推流、Opencv拉流组个模拟。

推流

RTSP(Real-Time Streaming Protocol)用于在网络中控制流媒体服务器。RTSP 允许客户端(如媒体播放器)与服务器建立和控制流媒体会话,支持播放、暂停和停止等功能。他并不直接提供视频推流,需要结合ffmpeg一起。

RTSP服务器安装

下载mediamtx_v1.5.1_linux_amd64.tar.gz,解压,执行./mediamtx

ffmpeg 安装

直接sudo安装即可:sudo apt install ffmpeg
为了同时推送多路视频,可以写sh脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#!/bin/bash

# MediaMTX 服务器地址
RTSP_URL="rtsp://localhost:8554"

# 视频目录
VIDEO_DIR="/mnt/f/anime/oddtaxi"

# 遍历视频目录中的所有 mp4 文件
for VIDEO_FILE in "$VIDEO_DIR"/*.mp4; do
if [ -f "$VIDEO_FILE" ]; then # 确保是文件
# 获取文件名并替换所有非法字符
STREAM_NAME=$(basename "$VIDEO_FILE" .mp4 | sed -e 's/[^A-Za-z0-9._-]/_/g')

# 启动 ffmpeg 推流
ffmpeg \
-re \
-stream_loop -1 \
-i "$VIDEO_FILE" \
-c copy \
-f rtsp \
-rtsp_transport tcp \
"$RTSP_URL/$STREAM_NAME" \
> /dev/null 2>&1 &

echo "已启动推流: $VIDEO_FILE -> $RTSP_URL/$STREAM_NAME"
fi
done

echo "所有推流已启动!"

此时可以在一些支持拉流的软件中观看“直播”了。

拉流

从tensorRT部署的yolo demo来看,我们的目标是同时处理多路视频,理想状态下,batch_size = 8,每一维都是一路视频,8路视频同时处理。将8路视频帧封装成一个输入,这个过程就是我们想在拉流阶段想要实现的目标。这个过程采用队列似乎更合适一些,队列满时遇到新的帧入队,就pop首元素。

Opencv是可以直接从RTSP拿到视频帧的,既然有多路视频,我们先给视频流定义一些信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
struct StreamData {
int stream_id;
std::string url;
cv::VideoCapture capture;
std::thread capture_thread;
std::atomic<bool> is_running{true};

// 每路流的统计信息
std::atomic<int> frames_captured{0};
std::atomic<int> frames_processed{0};
std::atomic<int> frames_dropped{0};
std::chrono::steady_clock::time_point last_processed_time;
};

根据预定义好的一些信息就可以获取流:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void addStream(const std::string& url, int stream_id) {
auto stream = std::make_unique<StreamData>();
stream->stream_id = stream_id;
stream->url = url;

stream->capture.open(url, cv::CAP_FFMPEG);
stream->capture.set(cv::CAP_PROP_BUFFERSIZE, 1);

if (!stream->capture.isOpened()) {
std::cerr << "Failed to open stream: " << url << std::endl;
return;
}

std::cout << "Successfully opened stream " << stream_id << ": " << url << std::endl;
streams_.push_back(std::move(stream));
}

现在我们把多路视频的信息都放到了streams_数组中,每一路都有自己的url以及相应的信息,此时就应该开始stream->capture.read(frame)读取每一帧了。很显然,如果我们写一个循环去轮询每一路的数据,在进行相应的处理,这是非常慢的,那么一个可以想到的方式就是:有几路视频,就启动几个线程,捕获到的帧,放入到一个共享队列中,队列时调用推理函数。

启动线程

stream->capture.read(frame)shared_frame_queue_.push({frame.clone(), stream->stream_id, capture_time});封装成一个函数:void captureLoop(StreamData* stream),然后启动线程调用函数。回头看给流的定义,我们定义了线程的成员。

这句代码意思为:启动线程执行void captureLoop函数,this指针使得该函数可以访问对象成员,也就是用来存放帧的共享队列,stream.get()作为函数的形参。

1
stream->capture_thread = std::thread(&RTSPBatchProcessor::captureLoop, this, stream.get());

函数主体如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

void captureLoop(StreamData* stream) {
cv::Mat frame;
constexpr int max_queue_size = 50; // 全局队列最大大小

while (stream->is_running && !should_stop_) {
if (stream->capture.read(frame)) {
auto capture_time = std::chrono::steady_clock::now();
++stream->frames_captured;

std::lock_guard<std::mutex> lock(queue_mutex_);

// 如果队列满了,丢弃最旧的帧
if (shared_frame_queue_.size() >= max_queue_size) {
// 找到并丢弃该流最旧的帧
std::queue<FrameData> temp_queue;
bool dropped = false;

while (!shared_frame_queue_.empty()) {
auto& front = shared_frame_queue_.front();
if (!dropped && front.stream_id == stream->stream_id) {
dropped = true;
++stream->frames_dropped;
} else {
temp_queue.push(std::move(front));
}
shared_frame_queue_.pop();
}

shared_frame_queue_ = std::move(temp_queue);
}

// 添加新帧
shared_frame_queue_.push({frame.clone(), stream->stream_id, capture_time});

batch_ready_cv_.notify_one();

}
}
}

启动完捕获视频帧的线程,我们紧接着再启动一个推理线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
inference_thread_ = std::thread(&RTSPBatchProcessor::inferenceLoop, this);

void inferenceLoop() {
while (!should_stop_) {
std::vector<FrameData> batch;
batch.reserve(batch_size_);

// 收集批次
std::unique_lock<std::mutex> lock(queue_mutex_);

// 等待直到有足够的帧或超时
batch_ready_cv_.wait_for(lock, std::chrono::milliseconds(100), [this]() {
return should_stop_ || shared_frame_queue_.size() >= batch_size_ ||
(shared_frame_queue_.size() > 0 &&
std::chrono::steady_clock::now() - last_batch_time_ > std::chrono::milliseconds(200));
});

if (should_stop_) break;

// 收集最多batch_size个帧
while (!shared_frame_queue_.empty() && batch.size() < batch_size_) {
batch.push_back(std::move(shared_frame_queue_.front()));
shared_frame_queue_.pop();
}

last_batch_time_ = std::chrono::steady_clock::now();

// 执行批量推理
if (!batch.empty()) {
processBatch(batch);
}
}
}

流程抽象

每一个线程都在不停的获取新的帧,但是共享队列是一个共享缓冲区 (Shared Buffer),当然,叫他临界资源 (Critical Resource)也没啥问题,最关键的地方在于,他是互斥访问的,同一时间只有一个线程能访问这个资源,访问时要给它上个锁,也就是互斥锁(mutex),除此之外,我们还需要另一个机制:条件变量。用于同步消费者的状态,比如告诉消费者现在可以消费了。

因此我们将视频流捕获的过程以及送去推理的过程进行一个抽象类比:8路视频就是8位生产者,不断拿到新的帧,伺机抢占共享资源,给共享队列上锁,然后放入自己的帧(当然这里面还有丢弃帧的过程),放入帧之后通知消费者,也就是推理线程,看看现在要不要消费。

代码流程 - 生产者

具体在代码层面上,上锁的过程:

1
std::lock_guard<std::mutex> lock(queue_mutex_);

需要单独进行说明的点:

  • 上锁的代码以下,直至这句上锁代码所在的作用域结束,这段代码片中的变量如果是全局变量就会被上锁,锁保护的是临界区代码片段,而不是特定的变量。mutex和被保护的共享资源之间的关联是程序逻辑上的约定,同一个mutex应该用来保护同一组相关的共享资源。在作用域结束时,lock_guard会自动释放锁。

  • 此时有一个疑问:如果上锁失败呢?也就是别人已经上锁做插入图像的工作了,当前线程会怎么办?继续尝试?还是在循环中continue进行下一帧处理? 基于当前的代码,该线程会阻塞等待

在正常结束添加新帧的工作后,生产者线程会通过一个条件变量,通知等待这个条件变量的线程,是时候准备上班了。

1
batch_ready_cv_.notify_one();

代码流程 - 消费者

消费者线程在一个条件循环中一直等待,先尝试上锁,准备访问共享队列,而后将锁的信息一起发给条件变量,由条件变量决定之后继续往下执行,*条件变量在发现当前线程不满足继续运行的条件时,然后执行以下操作:

  • 释放锁(完全释放,其他线程可以获取)
  • 阻塞当前线程
  • 被唤醒后重新获取锁
1
2
3
4
5
6
7
8
std::unique_lock<std::mutex> lock(queue_mutex_);

// 等待直到有足够的帧或超时
batch_ready_cv_.wait_for(lock, std::chrono::milliseconds(100), [this]() {
return should_stop_ || shared_frame_queue_.size() >= batch_size_ ||
(shared_frame_queue_.size() > 0 &&
std::chrono::steady_clock::now() - last_batch_time_ > std::chrono::milliseconds(200));
});

流程总结

生产者:8个视频捕获线程,各自尝试上锁,

  • 上锁失败,阻塞等待,不断尝试。
  • 上锁成功,写入新帧,通过条件变量告知消费者

消费者:推理线程,先上锁,再去检测条件变量是否满足,

  • 满足:锁具有互斥功能,执行推理
  • 不满足:wait_for会释放锁并阻塞等待,不会妨碍生产者线程的读写