Commit c04fa8c0 authored by NzSN's avatar NzSN

Update

parent b16f2f95
......@@ -127,6 +127,13 @@ class Channel {
return this.#shMem;
}
dataSize() {
this.#readPointerCache = this.#getReadPointer();
this.#writePointerCache = this.#getWritePointer();
return this.#size - this.#remain() - 1;
}
#remain() {
let readPos = this.#getReadPointer();
......@@ -291,7 +298,12 @@ const MESSAGE_TYPE = Object.freeze({
* within shared memory */
DATA : 4,
BRIDGE : 5,
MSG_MAX : 5,
BRIDGE_DESTROY : 6,
/* Encoder may be blocked cause of Muxer unable to
* consume packets as fast as Encoder provide */
DATA_REQUIRE : 7,
EOF : 8,
MSG_MAX : 8,
});
......
let encoder = null;
let isInited = false;
let isBridged = false;
let src = null;
let bridge = null;
let size = null;
let encoder = null;
let isInited = false;
let isBridged = false;
let src = null;
let bridge = null;
let wasmMem = null;
let size = null;
// Number of bytes for each read
// from channel
let READ_SIZE = (1920*1080*4) * 10 ;
const RGBFrameSize = 1920*1080*4;
let READ_SIZE = RGBFrameSize;
const SLEEP_INTERVAL = 100;
const SPIN_TIMEOUT = 500;
const ENC_BUF_SIZE = RGBFrameSize * 10
let encBuf = null;
// Read H264 Encode Group definitions
self.importScripts('defs.js');
// Load wasm encoder
self.importScripts('mp4encoder.js');
self.importScripts('./mp4encoder.js');
createMP4Encoder().then(m => {
encoder = m;
});
......@@ -38,37 +45,64 @@ async function main(msg) {
case MESSAGE_TYPE.BRIDGE:
if (!isBridged)
await bridging(msg);
break;
case MESSAGE_TYPE.BRIDGE_DESTROY:
if (isBridged) await deBridging();
break;
case MESSAGE_TYPE.DATA:
await steps();
if (isInited && isBridged) {
await steps();
}
break;
case MESSAGE_TYPE.DESTROY:
if (isInited) await destroy();
break;
case MESSAGE_TYPE.ERROR:
case MESSAGE_TYPE.DATA_REQUIRE:
break;
case MESSAGE_TYPE.EOF:
await EOFProcessing();
break;
}
}
async function bridging(msg) {
let info = getInfoFromMsg(msg);
bridge = new Channel(info.size, SharedArrayBuffer, info.shm);
if (!('size' in info) || !('shm' in info)) {
throw new Error("Incomplete Bridging message");
}
bridge = new Channel(info.size, SharedArrayBuffer, info.shm);
isBridged = true;
}
async function deBridging() {
bridge = null;
isBridged = false;
}
async function init(msg) {
let info = getInfoFromMsg(msg);
let info = getInfoFromMsg(msg), ret;
src = new Channel(info.size, SharedArrayBuffer, info.shm);
// Wait encoder init
while (encoder == null)
await sleep(100);
let size = encoder._malloc(4);
let ret = encoder._encodeInit(1920, 1080, 30);
size = encoder._malloc(4);
wasmMem = encoder._malloc(RGBFrameSize);
encodedFrames = encoder._malloc(ENC_BUF_SIZE);
ret = encoder._encodeInit(1920, 1080, 30);
isInited = true;
}
async function destroy() {
encoder._free(size);
encoder._free(wasmMem);
encoder._free(encodedFrames);
}
// Return value:
// True : Success
// False : Fail
......@@ -85,44 +119,90 @@ async function step() {
return true;
}
/* Steps() process as most datas as possible
* if the time to waiting for datas is more
* than 'SPIN_TIMEOUT' it will turn into sleep. */
async function steps() {
let timeout = SPIN_TIMEOUT;
src.setPriv(MSG.PRIV_FLAGS.EXECUTING);
src.setPriv(PRIV_FLAGS.EXECUTING);
while (timeout > 0) {
if (await step() === false && retryCount >= 0) {
timeout -= SLEEP_INTERVAL;
if (await step() === false) {
await sleep(SLEEP_INTERVAL);
timeout -= SLEEP_INTERVAL;
}
}
src.unsetPriv(MSG.PRIV_FLAGS.EXECUTING);
src.unsetPriv(PRIV_FLAGS.EXECUTING);
}
///////////////////////////////////////////////////////////////////////////////
// RGB Frame Processing Definitions //
///////////////////////////////////////////////////////////////////////////////
// Try to push all frames in Encoder
// to Muxer.
async function EOFProcessing() {
let ret = 0, size_;
while (true) {
ret = encoder._getPackets(encBuf, ENC_BUF_SIZE, size);
size_ = encoder.getValue(size, 'i32');
if (ret == 1) {
/* No more frames here */
return true;
} else if (ret > 0) {
throw new Error("Encoder internal error")
}
sendToMuxerUntilSuccess(size_, 500);
}
}
async function RGBProcessing(frame) {
let ret = 0, size_;
// Encode RGB Frame into H264 Frame
let h264frame = encoder.encode(frame, size);
let frameSize = encoder.getValue(size, 'i32');
encoder.HEAP8.set(frame, wasmMem);
ret = encoder._encode(wasmMem, encBuf, ENC_BUF_SIZE, size);
size_ = encoder.getValue(size, 'i32');
if (ret == 2) {
throw new Error("Buffer is too small");
} else if (ret == 3) {
throw new Error("Unknown error");
}
sendToMuxer(size_);
return true;
}
///////////////////////////////////////////////////////////////////////////////
// Helper Procedures //
///////////////////////////////////////////////////////////////////////////////
let h264frame_ = encoder.HEAP8.subarray(h264frame, h264frame+frameSize);
// Unit of interval is millionsecond
async function sendToMuxerUntilSuccess(size, interval) {
while (!sendToMuxer(size)) {
await sleep(interval);
}
}
async function sendToMuxer(size) {
let h264Frames = encoder.HEAP8.subarray(encBuf, encBuf+size);
// Push to Muxer
bridge.push(h264frame_);
if (bridge.push(h264Frames) == false) {
return false;
}
// To Check that is Muxer in executing
let flag = bridge.readPriv()
if (flag & PRIV_FLAGS.EXECUTING == 0) {
// Muxer is idle, send a Data message to
// Muxer to wake it up.
postMessage(makeMsg(Data, {}));
postMessage(makeMsg(MESSAGE_TYPE.Data, {}));
}
return true;
......
// Status
let isInited = false;
let isBridged = false;
let bridge = null;
let muxer = null;
let numOfEncs = 0;
// Constants
const SLEEP_INTERVAL = 100;
const SPIN_TIMEOUT = 500;
// WASM Objects
let MUX_BUFFER_SIZE = Math.pow(2, 22);
let muxBuffer = null;
// Read H264 Encode Group definitions
self.importScripts('defs.js');
// Load wasm encoder
self.importScripts('mp4encoder.js');
self.importScripts('./mp4encoder.js');
createMP4Encoder().then(m => {
encoder = m;
muxer = m;
});
......@@ -32,17 +40,68 @@ async function main(msg) {
case MESSAGE_TYPE.BRIDGE:
if (!isBridged) await bridging();
break;
case MESSAGE_TYPE.BRIDGE_DESTROY:
if (isBridged) await deBridging();
break;
case MESSAGE_TYPE.DATA:
await steps();
break;
case MESSAGE_TYPE.DESTROY:
if (isInited) await destroy();
break;
}
}
async function init(msg) {}
async function init(msg) {
let info = getInfoFromMsg(msg);
while (muxer == null) await sleep(100);
if ('wasmBufSize' in info)
MUX_BUFFER_SIZE = info.wasmBufSize;
if ('numofEncs' in info)
numOfEncs = info.numOfEncs;
muxBuffer = muxer._malloc(MUX_BUFFER_SIZE);
/* Init Muxer */
muxer.muxInit(numofEncs);
}
async function destroy() {
/* Free all wasm memories */
muxer._free(muxBuffer);
muxBuffer = null
async function bridge(msg) {}
/* Destroy MUXER_WASM */
muxer.muxDestruct();
}
async function bridging(msg) {
let info = getInfoFromMsg(msg);
if (!('size' in info) || !('shm' in info)) {
throw new Error("Incomplete Bridging message");
}
bridge = new Channel(info.size, SharedArrayBuffer, info.shm);
isBridged = true;
}
async function deBridging() {
bridge = null;
isBridged = false;
}
// Return value:
// True : Success
// False : Fail
async function step() {
}
async function steps() {
let timeout = SPIN_TIMEOUT;
......
let i = 0;
let begin = false;
let MP4Encoder = null;
let FS = null;
let bufferPtr = null;
let load = true;
let fps = 30;
let size = null;
let getCStringPtr = (jstr) => {
let lengthBytes = lengthBytesUTF8(jstr) + 1;
let p = MP4Encoder._malloc(lengthBytes);
stringToUTF8(jstr, p, lengthBytes);
return p;
}
let ready = () => {
while (!MP4Encoder || !load);
bufferPtr = MP4Encoder._malloc(1920*1080*4);
stop = false;
let pStr = getCStringPtr("/tmp/demo2.mp4");
let ret = MP4Encoder._createH264(pStr, 1920, 1080, fps);
//ret = MP4Encoder._encodeInit(1920,1080,fps);
//size = MP4Encoder._malloc(4);
console.log("ready =>", ret);
}
onmessage = (e) => {
if (!begin) {
ready();
begin = true;
if (FS == null) {
throw new Error("Failed to get FS");
}
}
if (e.data.data == null) {
console.log("Terminated: " + e.data.name);
MP4Encoder._close();
let buff = FS.readFile('/tmp/demo2.mp4', { encoding: 'binary' });
let generated_file = new Blob([buff]);
postMessage([generated_file]);
} else {
MP4Encoder.HEAP8.set(e.data.data, bufferPtr);
//console.log("RET IS: " + MP4Encoder._encodeFrame(bufferPtr, size));
//console.log("SIZE:" + MP4Encoder.getValue(size, 'i32'));
console.log(MP4Encoder._addFrame(bufferPtr));
}
}
self.importScripts("mp4encoder.js");
createMP4Encoder().then(m => {
console.log(m)
MP4Encoder = m;
FS = MP4Encoder.FS
lengthBytesUTF8 = MP4Encoder.lengthBytesUTF8
stringToUTF8 = MP4Encoder.stringToUTF8
});
import { Observable, filter } from 'rxjs';
import { COMMANDS } from './WWOpts.js';
export class WW extends Observable {
......@@ -52,7 +53,7 @@ export class WW extends Observable {
return;
}
await preconnect(targetWW);
await preconnect(this, targetWW);
let sub = this.pipe(filter(data => msgPredicate(data)))
.subscribe(data => { targetWW.postMessage(data); });
......
......@@ -58,19 +58,13 @@ export class Channel {
this.#view = new DataView(this.#shMem);
this.#buffer = new Uint8Array(this.#shMem);
if (shMem == null) {
this.#writePointerCache = this.#metaSize;
this.#readPointerCache = this.#metaSize;
this.#writePointerCache = this.#metaSize;
this.#readPointerCache = this.#metaSize;
// Init readPointer and writePointer to
// the first bytes of data area.
this.#view.setUint32(0, this.#writePointerCache);
this.#view.setUint32(4, this.#readPointerCache);
} else {
this.#writePointerCache = this.#getWritePointer();
this.#readPointerCache = this.#getReadPointer();
}
// Init readPointer and writePointer to
// the first bytes of data area.
this.#view.setUint32(0, this.#writePointerCache);
this.#view.setUint32(4, this.#readPointerCache);
this.#size = size;
this.#totalSize = this.#metaSize + this.#size;
......@@ -126,6 +120,12 @@ export class Channel {
return this.#shMem;
}
dataSize() {
this.#readPointerCache = this.#getReadPointer();
this.#writePointerCache = this.#getWritePointer();
return this.#size - this.#remain() - 1;
}
#remain() {
let readPos = this.#getReadPointer();
......@@ -217,7 +217,6 @@ export class Channel {
}
push(data /* Uint8Array */) {
let writePos = this.#writePointerCache;
if (!this.#isAbleToWrite(data.byteLength)) {
......@@ -236,11 +235,12 @@ export class Channel {
for (let key in schedule) {
plan = schedule[key];
if (plan.size == 0)
continue;
this.#buffer.set(
data.slice(srcPos, srcPos+plan.size), plan.pos)
let d_ = data.slice(srcPos, srcPos+plan.size);
this.#buffer.set(d_, plan.pos);
srcPos += plan.size;
writePos = plan.pos+plan.size;
}
......
......@@ -24,7 +24,14 @@ export const MESSAGE_TYPE = Object.freeze({
/* Notify to oppsite that there is datas
* within shared memory */
DATA : 4,
MSG_MAX : 5,
BRIDGE : 5,
BRIDGE_DESTROY : 6,
/* Encoder may be blocked cause of Muxer unable to
* consume packets as fast as Encoder provide */
DATA_REQUIRE : 7,
EOF : 8,
MSG_MAX : 8,
});
......
......@@ -116,7 +116,7 @@ export class H264EncWWGroup extends WWGroup {
let enc = this.#encWorkers[i];
enc.connect(
this.#muxWorker,
async ww => { encMuxBridge(this.#bridges, ww, this.#muxWorker) },
async (ww, mux) => { encMuxBridge(this.#bridges, ww, mux) },
async msg => { return isDataMsg(msg); });
}
}
......
#include <stdio.h>
#include <stdint.h>
#include <malloc.h>
#include "wasm.h"
extern "C" {
#include <libavcodec/avcodec.h>
#include <libswscale/swscale.h>
}
int width_ = 0;
int height_ = 0;
int framerate_ = 0;
AVFrame *frame;
AVPacket *packet;
AVCodecContext *cc;
struct SwsContext* swsCtx = NULL;
EM_PORT_API(uint8_t) encodeInit(int width, int height, int fps) {
width_ = width;
height_ = height;
framerate_ = fps;
const AVCodec *encoder = avcodec_find_encoder_by_name("libx264");
if (encoder == NULL) {
fprintf(stderr, "Unable to find H.264 decoder\n");
return 1;
}
cc = avcodec_alloc_context3(encoder);
if (cc == NULL) {
fprintf(stderr, "Unable to alloc codec context\n");
return 2;
}
// Setup encode parameters
cc->width = width;
cc->height = height;
cc->pix_fmt = AV_PIX_FMT_YUV420P;
cc->time_base = (AVRational){1, 90000};
if (avcodec_open2(cc, encoder, NULL) < 0) {
fprintf(stderr, "Unable to open codec context\n");
return 3;
}
packet = av_packet_alloc();
if (packet == NULL) {
fprintf(stderr, "Could not allocate packet\n");
}
frame = av_frame_alloc();
frame->format = cc->pix_fmt;
frame->width = cc->width;
frame->height = cc->height;
int ret = av_frame_get_buffer(frame, 0);
if (ret < 0) {
fprintf(stderr, "Could not allocate the video frame data\n");
return 4;
}
swsCtx = sws_getCachedContext(swsCtx, width, height, AV_PIX_FMT_RGBA,
width, height, AV_PIX_FMT_YUV420P,
SWS_BICUBIC,
NULL, NULL, NULL);
return 0;
}
EM_PORT_API(uint8_t*) encode(uint8_t *data, uint32_t *size) {
int ret = 0;
uint8_t *mem;
if (av_frame_make_writable(frame) < 0) {
fprintf(stderr, "Fail to make frame writable\n");
}
AVFrame *rgbaFrame = av_frame_alloc();
rgbaFrame->format =AV_PIX_FMT_RGBA;
rgbaFrame->height = cc->height;
rgbaFrame->width = cc->width;
avpicture_fill((AVPicture*)rgbaFrame, data, AV_PIX_FMT_RGBA, width_, height_);
//转换的YUV数据存放在frame
int outSliceH = sws_scale(swsCtx, (const uint8_t* const*)rgbaFrame->data, rgbaFrame->linesize, 0, height_,
frame->data, frame->linesize);
if (outSliceH <= 0) {
printf("outSliceH <= 0 \n");
return NULL;
}
frame->pts = AV_NOPTS_VALUE;
frame->pict_type = AV_PICTURE_TYPE_I;
// Encode
ret = avcodec_send_frame(cc, frame);
if (ret < 0) {
fprintf(stderr, "Fail to encoding\n");
}
while (true) {
ret = avcodec_receive_packet(cc, packet);
if (ret) break;
// For video frame avcodec_receive_packet should return
// only once.
mem = (uint8_t*)malloc(packet->size);
memcpy(mem, packet->data, packet->size);
}
av_packet_unref(packet);
av_frame_unref(frame);
*size = packet->size;
return mem;
}
// Trivial main
int main(int argc, char *argv[]) {}
#include "proto.h"
#ifndef PROTOLISTS_H
#define PROTOLISTS_H
namespace IOProto {
}
#endif /* PROTOLISTS_H */
#include "proto.h"
#ifndef SHAREDMEMPROTO_H
#define SHAREDMEMPROTO_H
namespace IOProto {
class SharedMemProto: public IOProtocol<SharedMemProto> {
public:
SharedMemProto(void *priv, RW_FLAG flag):
IOProtocol("", flag, priv) {}
};
}
#endif /* SHAREDMEMPROTO_H */
#include "transientMemProto.h"
#include <memory>
#include <algorithm>
#include <queue>
extern "C" {
#include <malloc.h>
}
using std::queue;
namespace IOProto {
namespace TransientMemProto {
/* External buffer must not be realeased
* by TransientMemProto */
struct BufferInfo {
uint8_t *buffer;
uint8_t *pos;
size_t remain;
};
bool eof = false;
BufferInfo current;
queue<BufferInfo> externalBuffers;
// Configurations
static bool isRleaseExtBuf = false;
static bool debugging = true;
constexpr size_t buffMaxSize = 33554432;
uint64_t dataSize = 0;
uint8_t buff[buffMaxSize];
uint8_t *buffCurrent = buff;
uint8_t *buffEnd = buff + buffMaxSize;
int numOfFrames() {
return externalBuffers.size();
}
void config_releaseExtBuf(bool onoff) {
isRleaseExtBuf = onoff;
}
/* Note: The passed buf should be full filled */
void attachBuffer(uint8_t *buf, size_t bufSize) {
externalBuffers.push({ buf, buf, bufSize });
}
int TransientMemProto::read_packet_internal(void *priv, uint8_t *buf, int bufSize) {
size_t readSize = 0;
if(flag == write || bufSize < 0 || buf == nullptr) {
AVERROR(EINVAL);
}
// No Datas
if (current.remain == 0 && externalBuffers.empty()) {
if (eof)
return AVERROR_EOF;
else
return AVERROR(EAGAIN);
}
if (current.remain == 0) {
current = externalBuffers.front();
externalBuffers.pop();
}
readSize = std::min((size_t)bufSize, current.remain);
// Read data
memcpy(buf, current.pos, readSize);
// Update Buffer status
current.pos += readSize;
current.remain -= readSize;
if (current.remain == 0 && isRleaseExtBuf) {
delete current.buffer;
}
return readSize == 0 ? AVERROR(EAGAIN) : readSize;
}
int TransientMemProto::write_packet_internal(void *priv, uint8_t *buf, int bufSize) {
uint8_t *newBuf = new uint8_t[bufSize];
memcpy(newBuf, buf, bufSize);
attachBuffer(newBuf, bufSize);
return bufSize;
}
int64_t TransientMemProto::seek_packet_internal(void *opaque, int64_t offset, int whence) {
return 0;
}
void TransientMemProto::close_internal() {
eof = true;
}
}
}
#include <stdio.h>
#include <stdint.h>
#include <malloc.h>
#include "wasm.h"
extern "C" {
#include <libavcodec/avcodec.h>
#include <libavformat/avformat.h>
#include <libswscale/swscale.h>
#include <libavutil/avutil.h>
}
///////////////////////////////////////////////////////////////////////////////
// Encoding Part //
///////////////////////////////////////////////////////////////////////////////
static int width_ = 0;
static int height_ = 0;
static int framerate_ = 0;
static int timescale = 90000;
static AVFrame *rgbaFrame;
static unsigned frameIndex = 0;
static AVFrame *frame;
static AVPacket *packet;
static AVCodecContext *cc;
static struct SwsContext* swsCtx = NULL;
static AVPacket *lastPkt = NULL;
// Bitstream Filter
static AVBSFContext *bsCtx = NULL;
static const AVBitStreamFilter *bsFilter = NULL;
EM_PORT_API(int) getPackets(uint8_t *buffer, uint32_t size, uint32_t *osize);
EM_PORT_API(uint8_t) encodeInit(int width, int height, int fps) {
width_ = width;
height_ = height;
framerate_ = fps;
const AVCodec *encoder = avcodec_find_encoder_by_name("libx264");
if (encoder == NULL) {
fprintf(stderr, "Unable to find H.264 decoder\n");
return 1;
}
cc = avcodec_alloc_context3(encoder);
if (cc == NULL) {
fprintf(stderr, "Unable to alloc codec context\n");
return 2;
}
// Setup encode parameters
cc->width = width;
cc->height = height;
cc->pix_fmt = AV_PIX_FMT_YUV420P;
cc->time_base = (AVRational){1, timescale};
cc->gop_size = 0;
if (avcodec_open2(cc, encoder, NULL) < 0) {
fprintf(stderr, "Unable to open codec context\n");
return 3;
}
packet = av_packet_alloc();
if (packet == NULL) {
fprintf(stderr, "Could not allocate packet\n");
}
frame = av_frame_alloc();
frame->format = cc->pix_fmt;
frame->width = cc->width;
frame->height = cc->height;
int ret = av_frame_get_buffer(frame, 0);
if (ret < 0) {
fprintf(stderr, "Could not allocate the video frame data\n");
return 4;
}
swsCtx = sws_getCachedContext(swsCtx, width, height, AV_PIX_FMT_RGBA,
width, height, AV_PIX_FMT_YUV420P,
SWS_BICUBIC,
NULL, NULL, NULL);
// Init bitstream filter
bsFilter = av_bsf_get_by_name("h264_mp4toannexb");
if (bsFilter == NULL) {
}
av_bsf_alloc(bsFilter, &bsCtx);
if (bsCtx == NULL) {
}
return 0;
}
/* Ret Values:
* 0: Success
* 1: AGAIN
* 2: Buffer too small
* 3: ERROR */
EM_PORT_API(int) encode(uint8_t *data, uint8_t *buffer, uint32_t size, uint32_t *osize) {
int ret = 0;
if (av_frame_make_writable(frame) < 0) {
fprintf(stderr, "Fail to make frame writable\n");
}
rgbaFrame = av_frame_alloc();
rgbaFrame->format =AV_PIX_FMT_RGBA;
rgbaFrame->height = cc->height;
rgbaFrame->width = cc->width;
avpicture_fill((AVPicture*)rgbaFrame, data, AV_PIX_FMT_RGBA, width_, height_);
//转换的YUV数据存放在frame
int outSliceH = sws_scale(swsCtx, (const uint8_t* const*)rgbaFrame->data, rgbaFrame->linesize, 0, height_,
frame->data, frame->linesize);
if (outSliceH <= 0) {
printf("outSliceH <= 0 \n");
return 3;
}
frame->pts = timescale / framerate_ * frameIndex;
frame->pict_type = AV_PICTURE_TYPE_I;
++frameIndex;
// Encode
ret = avcodec_send_frame(cc, frame);
if (ret < 0) {
fprintf(stderr, "Fail to encoding\n");
}
return getPackets(buffer, size, osize);
}
/* Ret Values:
* 0: Success
* 1: AGAIN or EOF
* 2: Buffer too small
* 3: ERROR */
EM_PORT_API(int) getPackets(uint8_t *buffer, uint32_t size, uint32_t *osize) {
int ret = 0;
uint8_t *pos = buffer;
int remainSize = size;
*osize = 0;
if (lastPkt != NULL && lastPkt->size > remainSize) {
memcpy(pos, lastPkt->data, lastPkt->size);
remainSize -= lastPkt->size;
av_packet_unref(lastPkt);
lastPkt = NULL;
} else if (lastPkt != NULL) {
/* Buffer is too small to containe the packet */
return 2;
}
while (true) {
ret = avcodec_receive_packet(cc, packet);
if (ret < 0) {
ret = 1;
goto DONE;
}
// Do bitstream modification to let packet can be
// streaming.
// For video frame avcodec_receive_packet should return
// only once.
if (remainSize > packet->size) {
memcpy(pos, packet->data, packet->size);
remainSize -= packet->size;
} else {
lastPkt = packet;
break;
}
av_packet_unref(packet);
}
DONE:
*osize = size - remainSize;
return ret;
}
///////////////////////////////////////////////////////////////////////////////
// Muxing Parts //
///////////////////////////////////////////////////////////////////////////////
#include "ioctx.h"
#include "proto/movMemProto.h"
#include <vector>
#include <queue>
#include <memory>
using IOEndpoint = IOProto::MovMemProto::MovMemProto;
namespace MuxEnv {
class PacketBuffer {
public:
using PacketChain = std::queue<AVPacket*>;
PacketBuffer() = default;
PacketBuffer(int numOfStream) {
for (int i = 0; i < numOfStream; ++i) {
packets.emplace_back(PacketChain());
}
}
void push(int idx, AVPacket *packet) {
packets[idx].push(packet);
}
AVPacket* pop(int idx) {
auto &chain = packets[idx];
if (chain.empty()) {
return nullptr;
}
auto pkt = chain.front();
chain.pop();
return pkt;
}
private:
std::vector<PacketChain> packets;
};
bool ioCtxInited = false;
int numOfStreams = 0;
PacketBuffer pktBuffer;
std::shared_ptr<bool[]> finished;
std::vector<IOEndpoint> protos;
std::vector<IOCtx::InCtx> ctxs;
constexpr size_t PKT_BUF_UPPER_BOUND = 300;
};
EM_PORT_API(int) muxInit(int numOfStreams) {
// Setup MuxEnv Status
MuxEnv::numOfStreams = numOfStreams;
MuxEnv::pktBuffer = MuxEnv::PacketBuffer(numOfStreams);
MuxEnv::finished = std::shared_ptr<bool[]>(new bool[numOfStreams]);
for (int i = 0; i < numOfStreams; ++i) {
// Create IOEncpoint for a IOCtx
MuxEnv::protos.emplace_back(IOEndpoint(nullptr, IOProto::read));
}
return 0;
}
EM_PORT_API(int) muxPush(int sIdx, uint8_t *data, size_t size) {
IOProto::MovMemProto::MemPiece mem;
MuxEnv::protos[sIdx].push(data, size);
return 0;
}
int bufferPackets() {
int ret = 0, total = 0;
AVPacket *pkt = nullptr;
while (true) {
unsigned readed = 0;
// Read out all of packets of all streams
// then buffer such packets.
for (int i = 0; i < MuxEnv::numOfStreams; ++i) {
if (MuxEnv::finished[i])
continue;
IOCtx::InCtx &c = MuxEnv::ctxs[i];
pkt = pkt == nullptr ? av_packet_alloc() : pkt;
ret = c.readFrame_(pkt);
if (ret < 0) {
if (ret == AVERROR_EOF) {
MuxEnv::finished[i] = true;
}
av_packet_unref(pkt);
continue;
}
MuxEnv::pktBuffer.push(i, pkt);
pkt = nullptr;
++readed;
}
total += readed;
// Too few packets to read from Encoder, exit the loop
if (readed < MuxEnv::numOfStreams ||
// Packets reside in PKT Buffer is more than
// 'PKT_BUF_UPPER_BOUND', exit the loop
total >= MuxEnv::PKT_BUF_UPPER_BOUND) {
break;
}
}
return total;
}
EM_PORT_API(int) muxStep() {
// Buffer Encoded frames from
// Encoder into 'MuxEnv::pktBuffer'
bufferPackets();
// Write out to file
return 0;
}
int main(int argc, char *argv[]) {}
......@@ -7,7 +7,7 @@ namespace IOCtx {
// InOutCtx //
///////////////////////////////////////////////////////////////////////////////
AVStream* InOutCtx::getStream(StreamPrediction predict) {
AVStream* InOutCtx::getStream(StreamPredicate predict) {
AVStream *s;
AVFormatContext *fmtCtx = fmt.get();
......@@ -56,6 +56,11 @@ void InCtx::readFrame(AVPacket *packet) {
}
int InCtx::readFrame_(AVPacket *packet) noexcept {
return av_read_frame(fmt.get(), packet);
}
///////////////////////////////////////////////////////////////////////////////
// OutCtx //
///////////////////////////////////////////////////////////////////////////////
......
......@@ -39,7 +39,7 @@ enum IOCTX_ERROR {
ERROR,
};
using StreamPrediction = std::function<bool(AVStream*)>;
using StreamPredicate = std::function<bool(AVStream*)>;
class InOutCtx {
......@@ -48,7 +48,7 @@ public:
InOutCtx(std::string path, bool isCustom):
fmt(nullptr), path(path), isCustomIO(isCustom) {}
AVStream* getStream(StreamPrediction);
AVStream* getStream(StreamPredicate);
bool isReady() noexcept;
void closeCustomIO();
......@@ -83,6 +83,7 @@ public:
}
void readFrame(AVPacket*);
int readFrame_(AVPacket*) noexcept;
};
......
#include "movMemProto.h"
#include <algorithm>
#include <stdexcept>
namespace IOProto {
namespace MovMemProto {
int i = 0;
int MovMemProto::read_packet_internal(void *priv, uint8_t *buf, int bufSize) {
if (flag == write || bufSize < 0 || buf == nullptr) {
return AVERROR(EINVAL);
}
// No Datas
if (trans.mem.data == nullptr && s.empty()) {
return AVERROR(EAGAIN);
}
if (trans.mem.data == nullptr) {
trans.mem = s.front();
// EOF arrive
if (trans.mem.data == nullptr) {
return AVERROR_EOF;
}
trans.pos = trans.mem.data.get();
trans.remain = trans.mem.size;
s.pop();
}
size_t sizeToRead = std::min((size_t)bufSize, trans.remain);
memcpy(buf, trans.pos, sizeToRead);
// Update TransContext
trans.pos += sizeToRead;
trans.remain -= sizeToRead;
// Datas of current memory piece is all
// readed, do cleaning.
if (trans.remain == 0) {
trans.mem.data = nullptr;
trans.pos = nullptr;
trans.remain = 0;
}
return sizeToRead == 0 ? AVERROR(EAGAIN) : sizeToRead;
}
int MovMemProto::write_packet_internal(void *priv, uint8_t *buf, int bufSize) {
throw std::runtime_error("MovMemProto not support write");
}
int64_t MovMemProto::seek_packet_internal(void *opaque, int64_t offset, int whence) {
return 0;
}
void MovMemProto::close_internal() {
trans.mem.data = nullptr;
while (!s.empty()) {
auto mem = s.front();
s.pop();
}
}
}
}
#include "proto.h"
#include <memory>
#include <queue>
#ifndef TRANSIENTMEMPROTO_H
#define TRANSIENTMEMPROTO_H
#ifndef MOVMEMPROTO_H
#define MOVMEMPROTO_H
namespace IOProto {
namespace TransientMemProto {
extern uint64_t dataSize;
namespace MovMemProto {
struct PacketMem {
std::shared_ptr<uint8_t[]> data;
size_t size;
};
void config_releaseExtBuf(bool onoff);
bool isEmpty();
int numOfFrames();
using MemPiece = PacketMem;
using Stream = std::queue<MemPiece>;
/* Note: The passed buf should be full filled */
void attachBuffer(uint8_t *buf, size_t bufSize);
struct TransContext {
MemPiece mem;
uint8_t *pos;
size_t remain;
};
class TransientMemProto: public IOProtocol<TransientMemProto> {
class MovMemProto: public IOProtocol<MovMemProto> {
public:
static constexpr char protoName[] = "MovMemProto";
static constexpr char protoName[] = "TransientMemProto";
TransientMemProto(void *priv, RW_FLAG flag):
MovMemProto(void *priv, RW_FLAG flag):
IOProtocol(protoName, flag, priv) {}
int read_packet_internal(void *priv, uint8_t *buf, int bufSize);
int write_packet_internal(void *priv, uint8_t *buf, int bufSize);
/* MovMemProto temporarily only support Stream so seek function
* is ignored. */
int64_t seek_packet_internal(void *opaque, int64_t offset, int whence);
void close_internal();
};
void push(uint8_t data[], size_t size) {
s.push(MemPiece{ std::shared_ptr<uint8_t[]>(data), size });
}
}
}
void eof() {
s.push(MemPiece{ nullptr, 0 });
}
private:
Stream s;
TransContext trans;
};
}}
#endif /* TRANSIENTMEMPROTO_H */
#endif /* MOVMEMPROTO_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment