chenwusong 发表于 2024-9-13 15:52:36

回调函数,无法正常解析rtm流传入的图像的数据

我在本机的1935部署了rtmp视频流,使demo.p已经可以正常读取到视频流,并且播放出去。然后我开始启动回调函数对数据进行处理,不过目前demo.py里的回调函数,无法解析出图像数据,这个回调函数,可以正常从/dev/video0里读取的图像,但是无法解析rtmp流的图像。


#!/usr/bin/env python
#coding:utf-8
"""
Author:   陈 --<>
Purpose:
Created: 202409
"""

import argparse
import re
import sys
import os
import stat
import time
import cv2
import copy
import json

import numpy as np

import ff_pymedia as m


def showText(img, text):
    return cv2.putText(img, f'fps:{text}', (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 1, cv2.LINE_AA)

def align(x, a):
    return (x + a - 1) & ~(a - 1)


def call_back1(obj, MediaBuffer):
    print('czj',time.time())
    vb = m.VideoBuffer.from_base(MediaBuffer)
    data = vb.getActiveData()
    try:
      img = data.reshape((vb.getImagePara().vstride, vb.getImagePara().hstride, 3))
      print(img.shape)
    except:
      pass
    #np.save(f'cache/{time.time()}.npy',data)
    print(type(data),data.shape,vb.getImagePara().vstride,vb.getImagePara().hstride,data[:20])
    a = MediaBuffer.getActiveData()
    print(a.shape)
    #obj.write(a)

    return MediaBuffer



def get_parameters():
    parser = argparse.ArgumentParser()
    parser.add_argument("-i", "--input_source", dest='input_source', type=str, help="input source")
    parser.add_argument("-f", "--save_file", dest='save_file', type=str, help="Enable save source output data to file, set filename, default disabled")
    parser.add_argument("-o", "--output", dest='output', type=str, help="Output image size, default same as input")
    parser.add_argument("-b", "--outputfmt", dest='outputfmt', type=str, default="NV12", help="Output image format, default NV12")
    parser.add_argument("-e", "--encodetype", dest='encodetype', type=int, default=-1, help="Encode encode, set encode type, default disabled")
    parser.add_argument("-m", "--enmux", dest='enmux', type=str, help="Enable save encode data to file. Enable package as mp4, mkv, flv, ts, ps or raw stream files, muxer type depends on the filename suffix. default disabled")
    parser.add_argument("-p", "--port", dest='port', type=int, default=0, help="Enable push stream, default rtsp stream, set push port, depend on encode enabled, default disabled\n")
    parser.add_argument("--push_type", dest='push_type', type=int, default=0, help="Set push stream type, default rtsp. e.g. --push_type 1\n")
    parser.add_argument("--rtmp_url", dest='rtmp_url', type=str, help="Set the rtmp client push address. e.g. --rtmp_url rtmp://xxx\n")
    parser.add_argument("--rtsp_transport", dest='rtsp_transport', type=int, default=0, help="Set the rtsp transport type, default 0(udp)")
    parser.add_argument("-s", "--sync", dest="sync", type=int, default=-1, help="Enable synchronization module, default disabled. e.g. -s 1")
    parser.add_argument("--audio", dest='audio', type=bool, default=False, help="Enable audio, default disabled.")
    parser.add_argument("--aplay", dest='aplay', type=str, help="Enable play audio, default disabled. e.g. --aplay plughw:3,0")
    parser.add_argument("--arecord", dest='arecord', type=str, help="Enable record audio, default disabled. e.g. --arecord plughw:3,0")
    parser.add_argument("-r", "--rotate", dest='rotate',type=int, default=0, help="Image rotation degree, default 0" )
    parser.add_argument("-d", "--drmdisplay", dest='drmdisplay', type=int, default=-1, help="Drm display, set display plane, set 0 to auto find plane")
    parser.add_argument("--connector", dest='connector', type=int, default=0, help="Set drm display connector, default 0 to auto find connector")
    parser.add_argument("-z","--zpos", dest='zpos', type=int, default=0xff, help="Drm display plane zpos, default auto select")
    parser.add_argument("-c", "--cvdisplay", dest='cvdisplay', type=int, default=0, help="OpenCv display, set window number, default 0")
    parser.add_argument("-x", "--x11display", dest='x11display', type=int, default=0, help="X11 window displays, render the video using gles. default disabled")
    parser.add_argument("-l", "--loop", dest='loop', action='store_true', help="Loop reads the media file.")
    parser.add_argument("--gb28181_user_id", dest='gb28181_user_id', type=str, help="Enable gb28181 client, default disabled. set user id.")
    parser.add_argument("--gb28181_server_id", dest='gb28181_server_id', type=str, help="Set the server id of gb28181 client.")
    parser.add_argument("--gb28181_server_ip", dest='gb28181_server_ip', type=str, help="Set the server ip of gb28181 client\.")
    parser.add_argument("--gb28181_server_port", dest='gb28181_server_port', type=int, default=5060, help="Set the server port of gb28181 client.")
    return parser.parse_args()

def main():

    args = get_parameters()
    last_audio_module = None
    input_audio_source = None

    if args.input_source is None:
      return 1
    elif args.input_source.startswith("rtsp://"):
      print("input source is a rtsp url")
      input_source = m.ModuleRtspClient(args.input_source, m.RTSP_STREAM_TYPE(args.rtsp_transport), True, args.audio)
    elif args.input_source.startswith("rtmp://"):
      print("input source is a rtmp url")
      input_source = m.ModuleRtmpClient(args.input_source)
    else:
      is_stat = os.stat(args.input_source)
      if stat.S_ISCHR(is_stat.st_mode):
            print("input source is a camera device.")
            input_source = m.ModuleCam(args.input_source)
      elif stat.S_ISREG(is_stat.st_mode):
            print("input source is a regular file.")
            input_source = m.ModuleFileReader(args.input_source, args.loop);
      else:
            print("{} is not support.".format(args.input_source))
            return 1

    ret = input_source.init()
    last_module = input_source
    if ret < 0:
      print("input_source init failed")
      return 1

    if args.sync == -1:
      sync = None
    else:
      sync = m.Synchronize(m.SynchronizeType(args.sync))

    if args.arecord is not None:
      s_info = m.SampleInfo()
      s_info.channels = 2
      s_info.fmt = m.SAMPLE_FMT_S16
      s_info.nb_samples = 1024
      s_info.sample_rate = 48000
      capture = m.ModuleAlsaCapture(args.arecord, s_info)
      ret = capture.init()
      if ret < 0:
            print("Failed to init arecord")
            return ret
      input_audio_source = capture
      last_audio_module = capture

      aac_enc = m.ModuleAacEnc()
      aac_enc.setProductor(last_audio_module)
      ret = aac_enc.init()
      if ret < 0:
            print("Failed to init aac_enc")
            return ret
      last_audio_module = aac_enc


    if args.aplay is not None:
      aac_dec = m.ModuleAacDec()
      if last_audio_module != None:
            aac_dec.setProductor(last_audio_module)
      else:
            aac_dec.setProductor(last_module)
      ret = aac_dec.init()
      if ret < 0:
            print("aac_dec init failed")
            return 1

      aplay = m.ModuleAlsaPlayBack(args.aplay)
      aplay.setProductor(aac_dec)
      aplay.setSynchronize(sync)
      ret = aplay.init()
      if ret < 0:
            print("aplay init failed")
            return 1

    input_para = last_module.getOutputImagePara()
    #input_para = last_module.getOutputImagePara()
    #last_module.setOutputDataCallback({}, call_back1)

    if input_para.v4l2Fmt == m.v4l2GetFmtByName("H264") or \
      input_para.v4l2Fmt == m.v4l2GetFmtByName("MJPEG")or \
      input_para.v4l2Fmt == m.v4l2GetFmtByName("H265"):
      dec = m.ModuleMppDec()
      dec.setProductor(last_module)
      ret = dec.init()
      if ret < 0:
            print("dec init failed")
            return 1
      last_module = dec

    input_para = last_module.getOutputImagePara()

    #last_module.setOutputDataCallback({}, call_back1)

    output_para=m.ImagePara(input_para.width, input_para.height, input_para.hstride, input_para.vstride, m.v4l2GetFmtByName(args.outputfmt))
    if args.output is not None:
      match = re.match(r"(\d+)x(\d+)", args.output)
      if match:
            width, height = map(int, match.groups())
            output_para.width = width
            output_para.height = height

    if args.rotate !=0 or input_para.height != output_para.height or \
      input_para.height != output_para.height or \
      input_para.width != output_para.width or \
      input_para.v4l2Fmt != output_para.v4l2Fmt:
      rotate = m.RgaRotate(args.rotate)

      if rotate == m.RgaRotate.RGA_ROTATE_90 or rotate == m.RgaRotate.RGA_ROTATE_270:
            t = output_para.width
            output_para.width = output_para.height
            output_para.height = t

      # hstride and vstride are aligned to 16 bytes
      output_para.hstride = align(output_para.width, 16)
      output_para.vstride = align(output_para.height, 16)

      rga = m.ModuleRga(output_para, rotate)
      rga.setProductor(last_module)
      rga.setBufferCount(2)
      ret = rga.init()
      if ret < 0:
            print("rga init failed")
            return 1
      last_module = rga

    cv_display = None

    #input_source.setOutputDataCallback({}, call_back1)

    last_module.setOutputDataCallback({}, call_back1)

    if args.drmdisplay != -1:
      input_para = last_module.getOutputImagePara()
      #last_module.setOutputDataCallback({}, call_back1)

      drm_display = m.ModuleDrmDisplay()
      drm_display.setPlanePara(m.v4l2GetFmtByName("NV12"), args.drmdisplay,
                                 m.PLANE_TYPE.PLANE_TYPE_OVERLAY_OR_PRIMARY, args.zpos, 1, args.connector)
      drm_display.setProductor(last_module)
      drm_display.setSynchronize(sync)
      ret = drm_display.init()
      if ret < 0:
            print("drm display init failed")
            return 1
      else:
            t_h = drm_display.getDisplayPlaneH()
            t_w = drm_display.getDisplayPlaneW()
            w = min(t_w, input_para.width)
            h = min(t_h, input_para.height)
            x = (t_w - w) // 2
            y = (t_h - h) // 2
            drm_display.setWindowSize(x, y, w, h)
    else:
      if args.x11display != 0:
            x11_display = m.ModuleRendererVideo(args.input_source)
            x11_display.setProductor(last_module)
            x11_display.setSynchronize(sync)
            ret = x11_display.init()
            if ret < 0:
                print("ModuleRendererVideo init failed")
                return 1

      if args.cvdisplay > 0:
            if not cv2_enable:
                print("Run 'pip3 install opencv-python' to install opencv")
                return 1
            if output_para.v4l2Fmt != m.v4l2GetFmtByName("BGR24"):
                print("Output image format is not 'BGR24', Use the '-b BGR24' option to specify image format.")
                return 1
            cv_display = Cv2Display("Cv2Display", None, sync, args.cvdisplay)
            cv_display.module = last_module.addExternalConsumer("Cv2Display", cv_display, cv2_extcall_back)

    if args.encodetype != -1:
      enc = m.ModuleMppEnc(m.EncodeType(args.encodetype))
      enc.setProductor(last_module)
      enc.setBufferCount(8)
      enc.setDuration(0) #Use the input source timestamp
      ret = enc.init()
      if ret < 0:
            print("ModuleMppEnc init failed")
            return 1
      last_module = enc

      if args.port != 0:
            push_s = None
            if args.push_type == 0:
                push_s = m.ModuleRtspServer("/live/0", args.port)
            else:
                push_s = m.ModuleRtmpServer("/live/0", args.port)
            push_s.setProductor(last_module)
            push_s.setBufferCount(0)
            if args.sync != -1:
                push_s.setSynchronize(m.Synchronize(m.SynchronizeType(args.sync)))

            ret = push_s.init()
            if ret < 0:
                print("push server init failed")
                return 1

            if args.audio == True and args.push_type == 0:
                push_s_a = m.ModuleRtspServerExtend(push_s, "/live/0", args.port)
                if last_audio_module != None:
                  push_s_a.setProductor(last_audio_module)
                else:
                  push_s_a.setProductor(input_source)
                push_s_a.setAudioParameter(m.MEDIA_CODEC_AUDIO_AAC);
                ret = push_s_a.init()
                if ret < 0:
                  print("Failed to init audio push server")
                  return 1

      if args.rtmp_url is not None:
            push_c = m.ModuleRtmpClient(args.rtmp_url, m.ImagePara(), 0)
            push_c.setProductor(last_module)
            if args.sync != -1:
                push_c.setSynchronize(m.Synchronize(m.SynchronizeType(args.sync)))
            ret = push_c.init()
            if ret < 0:
                print("Fail to init rtmp client push")
                return 1

      if args.gb28181_user_id is not None:
            gb28181_c = m.ModuleGB28181Client(args.gb28181_user_id, "ffmedia")
            gb28181_c.setProductor(last_module)
            if args.sync != -1:
                gb28181_c.setSynchronize(m.Synchronize(m.SynchronizeType(args.sync)))

            gb28181_c.setServerConfig(args.gb28181_server_id, args.gb28181_server_ip, \
                                    args.gb28181_server_ip, args.gb28181_server_port, 3600)
            ret = gb28181_c.init()
            if ret < 0:
                print("Failed to init gb28181 client")
                return 1

    if args.enmux is not None:
      enm = m.ModuleFileWriter(args.enmux)
      enm.setProductor(last_module)
      ret = enm.init()
      if ret < 0:
            print("ModuleFileWriter init failed")
            return 1

      if args.audio:
            enm_audio = m.ModuleFileWriterExtend(enm, args.enmux)
            if last_audio_module != None:
                enm_audio.setProductor(last_audio_module)
            else:
                enm_audio.setProductor(input_source)
            enm_audio.setAudioParameter(0, 0, 0, m.MEDIA_CODEC_AUDIO_AAC);
            ret = enm_audio.init()
            if ret < 0:
                print("Failed to init audio writer")
                return 1

    if args.save_file is not None:
      file = open(args.save_file, "wb")
      input_source.setOutputDataCallback(file, call_back)


    input_source.start()
    input_source.dumpPipe()
    if input_audio_source != None:
      input_audio_source.start()
      input_audio_source.dumpPipe()
    text = input("wait...")
    if input_audio_source != None:
      input_audio_source.dumpPipeSummary()
      input_audio_source.stop()
    input_source.dumpPipeSummary()
    input_source.stop()

    if args.save_file is not None:
      file.close()


if __name__ == "__main__":
    main()
程序启动命令是:
sudo /home/firefly/miniconda3/envs/pyev/bin/python3 ./czj_new_v5.py-i rtmp://127.0.0.1:1935/live/stream_key -o 1920x1080 -d 0


然后回调函数里call_back1,无法正常解析出图像。
下面是打印日志。




可以请大佬帮忙看看怎么正常解析rtmp里的图像,处理后再发送走吗?




dengkx 发表于 2024-9-13 17:18:29

解码模块解出来的是nv12格式数据,不是BGR24格式的,你可以添加个RGA模块将nv12格式转成BGR24,然后设置回调在RGA模块上处理数据。和demo.py 的-b BGR24操作一样

chenwusong 发表于 2024-9-14 09:32:47

dengkx 发表于 2024-9-13 17:18
解码模块解出来的是nv12格式数据,不是BGR24格式的,你可以添加个RGA模块将nv12格式转成BGR24,然后设置回 ...

成功了成功了,大佬真厉害,加了这个转换后的数据果然可以正常解析了,多谢大佬指点。
多问一下,现在可以直接从/dev/video0,读取nv24的数据吗? 就是改成   python demo.py -i/dev/video0 -d 0 这样运行的,之前也问过,那会说还没兼容从/dev/video0读取nv24的输入,请问现在有什么办法解决吗?

dengkx 发表于 2024-9-14 11:39:08

chenwusong 发表于 2024-9-14 09:32
成功了成功了,大佬真厉害,加了这个转换后的数据果然可以正常解析了,多谢大佬指点。
多问一下,现在可 ...

读取应该没有问题,只是硬件不支持操作NV24格式的数据,所以他转不了BGR24给你使用
页: [1]
查看完整版本: 回调函数,无法正常解析rtm流传入的图像的数据