|
【技术讨论】
回调函数,无法正常解析rtm流传入的图像的数据
发表于 2024-9-13 15:52:36
浏览:645
|
回复:3
打印
只看该作者
[复制链接]
楼主
我在本机的1935部署了rtmp视频流,使demo.p已经可以正常读取到视频流,并且播放出去。然后我开始启动回调函数对数据进行处理,不过目前demo.py里的回调函数,无法解析出图像数据,这个回调函数,可以正常从/dev/video0里读取的图像,但是无法解析rtmp流的图像。
- #!/usr/bin/env python
- #coding:utf-8
- """
- Author: 陈 --<>
- Purpose:
- Created: 202409
- """
- import argparse
- import re
- import sys
- import os
- import stat
- import time
- import cv2
- import copy
- import json
- import numpy as np
- import ff_pymedia as m
- def showText(img, text):
- return cv2.putText(img, f'fps:{text}', (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 1, cv2.LINE_AA)
- def align(x, a):
- return (x + a - 1) & ~(a - 1)
- def call_back1(obj, MediaBuffer):
- print('czj',time.time())
- vb = m.VideoBuffer.from_base(MediaBuffer)
- data = vb.getActiveData()
- try:
- img = data.reshape((vb.getImagePara().vstride, vb.getImagePara().hstride, 3))
- print(img.shape)
- except:
- pass
- #np.save(f'cache/{time.time()}.npy',data)
- print(type(data),data.shape,vb.getImagePara().vstride,vb.getImagePara().hstride,data[:20])
- a = MediaBuffer.getActiveData()
- print(a.shape)
- #obj.write(a)
- return MediaBuffer
- def get_parameters():
- parser = argparse.ArgumentParser()
- parser.add_argument("-i", "--input_source", dest='input_source', type=str, help="input source")
- parser.add_argument("-f", "--save_file", dest='save_file', type=str, help="Enable save source output data to file, set filename, default disabled")
- parser.add_argument("-o", "--output", dest='output', type=str, help="Output image size, default same as input")
- parser.add_argument("-b", "--outputfmt", dest='outputfmt', type=str, default="NV12", help="Output image format, default NV12")
- parser.add_argument("-e", "--encodetype", dest='encodetype', type=int, default=-1, help="Encode encode, set encode type, default disabled")
- parser.add_argument("-m", "--enmux", dest='enmux', type=str, help="Enable save encode data to file. Enable package as mp4, mkv, flv, ts, ps or raw stream files, muxer type depends on the filename suffix. default disabled")
- parser.add_argument("-p", "--port", dest='port', type=int, default=0, help="Enable push stream, default rtsp stream, set push port, depend on encode enabled, default disabled\n")
- parser.add_argument("--push_type", dest='push_type', type=int, default=0, help="Set push stream type, default rtsp. e.g. --push_type 1\n")
- parser.add_argument("--rtmp_url", dest='rtmp_url', type=str, help="Set the rtmp client push address. e.g. --rtmp_url rtmp://xxx\n")
- parser.add_argument("--rtsp_transport", dest='rtsp_transport', type=int, default=0, help="Set the rtsp transport type, default 0(udp)")
- parser.add_argument("-s", "--sync", dest="sync", type=int, default=-1, help="Enable synchronization module, default disabled. e.g. -s 1")
- parser.add_argument("--audio", dest='audio', type=bool, default=False, help="Enable audio, default disabled.")
- parser.add_argument("--aplay", dest='aplay', type=str, help="Enable play audio, default disabled. e.g. --aplay plughw:3,0")
- parser.add_argument("--arecord", dest='arecord', type=str, help="Enable record audio, default disabled. e.g. --arecord plughw:3,0")
- parser.add_argument("-r", "--rotate", dest='rotate',type=int, default=0, help="Image rotation degree, default 0" )
- parser.add_argument("-d", "--drmdisplay", dest='drmdisplay', type=int, default=-1, help="Drm display, set display plane, set 0 to auto find plane")
- parser.add_argument("--connector", dest='connector', type=int, default=0, help="Set drm display connector, default 0 to auto find connector")
- parser.add_argument("-z","--zpos", dest='zpos', type=int, default=0xff, help="Drm display plane zpos, default auto select")
- parser.add_argument("-c", "--cvdisplay", dest='cvdisplay', type=int, default=0, help="OpenCv display, set window number, default 0")
- parser.add_argument("-x", "--x11display", dest='x11display', type=int, default=0, help="X11 window displays, render the video using gles. default disabled")
- parser.add_argument("-l", "--loop", dest='loop', action='store_true', help="Loop reads the media file.")
- parser.add_argument("--gb28181_user_id", dest='gb28181_user_id', type=str, help="Enable gb28181 client, default disabled. set user id.")
- parser.add_argument("--gb28181_server_id", dest='gb28181_server_id', type=str, help="Set the server id of gb28181 client.")
- parser.add_argument("--gb28181_server_ip", dest='gb28181_server_ip', type=str, help="Set the server ip of gb28181 client\.")
- parser.add_argument("--gb28181_server_port", dest='gb28181_server_port', type=int, default=5060, help="Set the server port of gb28181 client.")
- return parser.parse_args()
- def main():
- args = get_parameters()
- last_audio_module = None
- input_audio_source = None
- if args.input_source is None:
- return 1
- elif args.input_source.startswith("rtsp://"):
- print("input source is a rtsp url")
- input_source = m.ModuleRtspClient(args.input_source, m.RTSP_STREAM_TYPE(args.rtsp_transport), True, args.audio)
- elif args.input_source.startswith("rtmp://"):
- print("input source is a rtmp url")
- input_source = m.ModuleRtmpClient(args.input_source)
- else:
- is_stat = os.stat(args.input_source)
- if stat.S_ISCHR(is_stat.st_mode):
- print("input source is a camera device.")
- input_source = m.ModuleCam(args.input_source)
- elif stat.S_ISREG(is_stat.st_mode):
- print("input source is a regular file.")
- input_source = m.ModuleFileReader(args.input_source, args.loop);
- else:
- print("{} is not support.".format(args.input_source))
- return 1
- ret = input_source.init()
- last_module = input_source
- if ret < 0:
- print("input_source init failed")
- return 1
- if args.sync == -1:
- sync = None
- else:
- sync = m.Synchronize(m.SynchronizeType(args.sync))
- if args.arecord is not None:
- s_info = m.SampleInfo()
- s_info.channels = 2
- s_info.fmt = m.SAMPLE_FMT_S16
- s_info.nb_samples = 1024
- s_info.sample_rate = 48000
- capture = m.ModuleAlsaCapture(args.arecord, s_info)
- ret = capture.init()
- if ret < 0:
- print("Failed to init arecord")
- return ret
- input_audio_source = capture
- last_audio_module = capture
- aac_enc = m.ModuleAacEnc()
- aac_enc.setProductor(last_audio_module)
- ret = aac_enc.init()
- if ret < 0:
- print("Failed to init aac_enc")
- return ret
- last_audio_module = aac_enc
- if args.aplay is not None:
- aac_dec = m.ModuleAacDec()
- if last_audio_module != None:
- aac_dec.setProductor(last_audio_module)
- else:
- aac_dec.setProductor(last_module)
- ret = aac_dec.init()
- if ret < 0:
- print("aac_dec init failed")
- return 1
- aplay = m.ModuleAlsaPlayBack(args.aplay)
- aplay.setProductor(aac_dec)
- aplay.setSynchronize(sync)
- ret = aplay.init()
- if ret < 0:
- print("aplay init failed")
- return 1
- input_para = last_module.getOutputImagePara()
- #input_para = last_module.getOutputImagePara()
- #last_module.setOutputDataCallback({}, call_back1)
- if input_para.v4l2Fmt == m.v4l2GetFmtByName("H264") or \
- input_para.v4l2Fmt == m.v4l2GetFmtByName("MJPEG")or \
- input_para.v4l2Fmt == m.v4l2GetFmtByName("H265"):
- dec = m.ModuleMppDec()
- dec.setProductor(last_module)
- ret = dec.init()
- if ret < 0:
- print("dec init failed")
- return 1
- last_module = dec
- input_para = last_module.getOutputImagePara()
- #last_module.setOutputDataCallback({}, call_back1)
- output_para=m.ImagePara(input_para.width, input_para.height, input_para.hstride, input_para.vstride, m.v4l2GetFmtByName(args.outputfmt))
- if args.output is not None:
- match = re.match(r"(\d+)x(\d+)", args.output)
- if match:
- width, height = map(int, match.groups())
- output_para.width = width
- output_para.height = height
- if args.rotate !=0 or input_para.height != output_para.height or \
- input_para.height != output_para.height or \
- input_para.width != output_para.width or \
- input_para.v4l2Fmt != output_para.v4l2Fmt:
- rotate = m.RgaRotate(args.rotate)
- if rotate == m.RgaRotate.RGA_ROTATE_90 or rotate == m.RgaRotate.RGA_ROTATE_270:
- t = output_para.width
- output_para.width = output_para.height
- output_para.height = t
- # hstride and vstride are aligned to 16 bytes
- output_para.hstride = align(output_para.width, 16)
- output_para.vstride = align(output_para.height, 16)
- rga = m.ModuleRga(output_para, rotate)
- rga.setProductor(last_module)
- rga.setBufferCount(2)
- ret = rga.init()
- if ret < 0:
- print("rga init failed")
- return 1
- last_module = rga
- cv_display = None
- #input_source.setOutputDataCallback({}, call_back1)
- last_module.setOutputDataCallback({}, call_back1)
- if args.drmdisplay != -1:
- input_para = last_module.getOutputImagePara()
- #last_module.setOutputDataCallback({}, call_back1)
- drm_display = m.ModuleDrmDisplay()
- drm_display.setPlanePara(m.v4l2GetFmtByName("NV12"), args.drmdisplay,
- m.PLANE_TYPE.PLANE_TYPE_OVERLAY_OR_PRIMARY, args.zpos, 1, args.connector)
- drm_display.setProductor(last_module)
- drm_display.setSynchronize(sync)
- ret = drm_display.init()
- if ret < 0:
- print("drm display init failed")
- return 1
- else:
- t_h = drm_display.getDisplayPlaneH()
- t_w = drm_display.getDisplayPlaneW()
- w = min(t_w, input_para.width)
- h = min(t_h, input_para.height)
- x = (t_w - w) // 2
- y = (t_h - h) // 2
- drm_display.setWindowSize(x, y, w, h)
- else:
- if args.x11display != 0:
- x11_display = m.ModuleRendererVideo(args.input_source)
- x11_display.setProductor(last_module)
- x11_display.setSynchronize(sync)
- ret = x11_display.init()
- if ret < 0:
- print("ModuleRendererVideo init failed")
- return 1
- if args.cvdisplay > 0:
- if not cv2_enable:
- print("Run 'pip3 install opencv-python' to install opencv")
- return 1
- if output_para.v4l2Fmt != m.v4l2GetFmtByName("BGR24"):
- print("Output image format is not 'BGR24', Use the '-b BGR24' option to specify image format.")
- return 1
- cv_display = Cv2Display("Cv2Display", None, sync, args.cvdisplay)
- cv_display.module = last_module.addExternalConsumer("Cv2Display", cv_display, cv2_extcall_back)
- if args.encodetype != -1:
- enc = m.ModuleMppEnc(m.EncodeType(args.encodetype))
- enc.setProductor(last_module)
- enc.setBufferCount(8)
- enc.setDuration(0) #Use the input source timestamp
- ret = enc.init()
- if ret < 0:
- print("ModuleMppEnc init failed")
- return 1
- last_module = enc
- if args.port != 0:
- push_s = None
- if args.push_type == 0:
- push_s = m.ModuleRtspServer("/live/0", args.port)
- else:
- push_s = m.ModuleRtmpServer("/live/0", args.port)
- push_s.setProductor(last_module)
- push_s.setBufferCount(0)
- if args.sync != -1:
- push_s.setSynchronize(m.Synchronize(m.SynchronizeType(args.sync)))
- ret = push_s.init()
- if ret < 0:
- print("push server init failed")
- return 1
- if args.audio == True and args.push_type == 0:
- push_s_a = m.ModuleRtspServerExtend(push_s, "/live/0", args.port)
- if last_audio_module != None:
- push_s_a.setProductor(last_audio_module)
- else:
- push_s_a.setProductor(input_source)
- push_s_a.setAudioParameter(m.MEDIA_CODEC_AUDIO_AAC);
- ret = push_s_a.init()
- if ret < 0:
- print("Failed to init audio push server")
- return 1
- if args.rtmp_url is not None:
- push_c = m.ModuleRtmpClient(args.rtmp_url, m.ImagePara(), 0)
- push_c.setProductor(last_module)
- if args.sync != -1:
- push_c.setSynchronize(m.Synchronize(m.SynchronizeType(args.sync)))
- ret = push_c.init()
- if ret < 0:
- print("Fail to init rtmp client push")
- return 1
- if args.gb28181_user_id is not None:
- gb28181_c = m.ModuleGB28181Client(args.gb28181_user_id, "ffmedia")
- gb28181_c.setProductor(last_module)
- if args.sync != -1:
- gb28181_c.setSynchronize(m.Synchronize(m.SynchronizeType(args.sync)))
- gb28181_c.setServerConfig(args.gb28181_server_id, args.gb28181_server_ip, \
- args.gb28181_server_ip, args.gb28181_server_port, 3600)
- ret = gb28181_c.init()
- if ret < 0:
- print("Failed to init gb28181 client")
- return 1
- if args.enmux is not None:
- enm = m.ModuleFileWriter(args.enmux)
- enm.setProductor(last_module)
- ret = enm.init()
- if ret < 0:
- print("ModuleFileWriter init failed")
- return 1
- if args.audio:
- enm_audio = m.ModuleFileWriterExtend(enm, args.enmux)
- if last_audio_module != None:
- enm_audio.setProductor(last_audio_module)
- else:
- enm_audio.setProductor(input_source)
- enm_audio.setAudioParameter(0, 0, 0, m.MEDIA_CODEC_AUDIO_AAC);
- ret = enm_audio.init()
- if ret < 0:
- print("Failed to init audio writer")
- return 1
- if args.save_file is not None:
- file = open(args.save_file, "wb")
- input_source.setOutputDataCallback(file, call_back)
- input_source.start()
- input_source.dumpPipe()
- if input_audio_source != None:
- input_audio_source.start()
- input_audio_source.dumpPipe()
- text = input("wait...")
- if input_audio_source != None:
- input_audio_source.dumpPipeSummary()
- input_audio_source.stop()
- input_source.dumpPipeSummary()
- input_source.stop()
- if args.save_file is not None:
- file.close()
- if __name__ == "__main__":
- main()
复制代码
程序启动命令是:
sudo /home/firefly/miniconda3/envs/pyev/bin/python3 ./czj_new_v5.py -i rtmp://127.0.0.1:1935/live/stream_key -o 1920x1080 -d 0
然后回调函数里call_back1,无法正常解析出图像。
下面是打印日志。
可以请大佬帮忙看看怎么正常解析rtmp里的图像,处理后再发送走吗?
|
|