Commit 522461ca authored by Linshizhi's avatar Linshizhi

update

parent 0e206d09
......@@ -91,7 +91,7 @@ module.exports = function(config) {
// start these browsers
// available browser launchers: https://www.npmjs.com/search?q=keywords:karma-launcher
browsers: ['ChromeHeadless'],
// browsers: ['ChromeHeadless'],
// Continuous Integration mode
// if true, Karma captures browsers, runs the tests and exits
......
......@@ -14,6 +14,8 @@
"babel-loader": "^8.2.5",
"babelify": "^10.0.0",
"browserify": "^17.0.0",
"file-save": "^0.2.0",
"file-saver": "^2.0.5",
"jasmine": "^4.1.0",
"jasmine-browser-runner": "^1.0.0",
"jasmine-core": "^4.1.0",
......
......@@ -50,7 +50,7 @@ class Channel {
assert(size >= 2, `Channel require its data area has at least 2 Bytes.`)
this.#size = size + 1;
this.#size = size+1;
// Init shared memory
this.#metaSize = this.#rFieldPosLen + this.#wFieldPosLen + this.#priFieldLen;
......@@ -69,6 +69,7 @@ class Channel {
this.#totalSize = this.#metaSize + this.#size;
this.#endPos = this.#metaSize + this.#size;
}
#getReadPointer() {
......@@ -125,16 +126,15 @@ class Channel {
return this.#size - this.#remain() - 1;
}
#remain() {
let readPos = this.#getReadPointer();
this.#readPointerCache = this.#getReadPointer();
if (this.#writePointerCache == readPos) {
if (this.#writePointerCache == this.#readPointerCache) {
return this.#size - 1;
} else if (this.#writePointerCache > readPos) {
return this.#size - (this.#writePointerCache - readPos) - 1;
} else if (this.#writePointerCache > this.#readPointerCache) {
return this.#size - (this.#writePointerCache - this.#readPointerCache) - 1;
} else {
return readPos - this.#writePointerCache - 1;
return this.#readPointerCache - this.#writePointerCache - 1;
}
}
......@@ -186,7 +186,7 @@ class Channel {
return new Uint8Array(0);
} else if (this.#readPointerCache < writePos) {
readTo = this.#readPointerCache + Math.min(size, writePos - this.#readPointerCache);
readBuffer = this.#buffer.slice(this.#readPointerCache, readTo);
readBuffer = this.#buffer.subarray(this.#readPointerCache, readTo);
this.#readPointerUpdate(readTo);
} else {
// To make sure
......@@ -197,13 +197,13 @@ class Channel {
readBuffer = new Uint8Array(firstRSize+secondRSize);
// First read
readBuffer.set(this.#buffer.slice(
readBuffer.set(this.#buffer.subarray(
this.#readPointerCache, this.#readPointerCache+firstRSize), 0);
// Second Read
if (secondRSize > 0) {
readBuffer.set(
this.#buffer.slice(this.#metaSize, this.#metaSize+secondRSize),
this.#buffer.subarray(this.#metaSize, this.#metaSize+secondRSize),
firstRSize);
this.#readPointerUpdate(this.#metaSize+secondRSize);
} else {
......@@ -217,8 +217,8 @@ class Channel {
}
push(data /* Uint8Array */) {
let writePos = this.#writePointerCache;
this.#readPointerCache = this.#getReadPointer();
if (!this.#isAbleToWrite(data.byteLength)) {
return false;
......@@ -233,18 +233,24 @@ class Channel {
// Perfrom write schedule
let srcPos = 0, plan;
let length = 0;
for (let key in schedule) {
plan = schedule[key];
if (plan.size == 0)
continue;
this.#buffer.set(
data.slice(srcPos, srcPos+plan.size), plan.pos)
let d_ = data.subarray(srcPos, srcPos+plan.size);
this.#buffer.set(d_, plan.pos);
srcPos += plan.size;
writePos = plan.pos+plan.size;
length += plan.size;
}
console.log("CHANNEL: WRITE " + length, data.byteLength);
// Caution: 'Write Pointer' must be updated after
// all datas are writed but not before or
// at intermediate of some writes otherwise
......@@ -266,7 +272,6 @@ class Channel {
}
///////////////////////////////////////////////////////////////////////////////
// Message Definitions //
///////////////////////////////////////////////////////////////////////////////
......@@ -295,7 +300,8 @@ const MESSAGE_TYPE = Object.freeze({
* consume packets as fast as Encoder provide */
DATA_REQUIRE : 7,
EOF : 8,
MSG_MAX : 8,
DONE : 9,
MSG_MAX : 9,
});
......@@ -397,3 +403,7 @@ function assert(expr, message) {
async function sleep(ms) {
await new Promise(r => setTimeout(() => r(), ms));
}
const areEqual = (first, second) =>
first.length === second.length &&
first.every((value, index) => value === second[index]);
......@@ -7,6 +7,7 @@ let src = null;
let bridge = null;
let wasmMem = null;
let size = null;
let eof = false;
// Number of bytes for each read
// from channel
......@@ -58,7 +59,6 @@ async function eventloop(msg) {
case MESSAGE_TYPE.DATA_REQUIRE:
break;
case MESSAGE_TYPE.EOF:
await EOFProcessing();
break;
}
}
......@@ -71,7 +71,7 @@ async function bridging(msg) {
}
bridge = new Channel(info.size, SharedArrayBuffer, info.shm);
postMessage(makeMsg(MESSAGE_TYPE.BRIDGE, {}));
postMessage(makeMsg(MESSAGE_TYPE.BRIDGE, { here: 1 }));
isBridged = true;
}
......@@ -110,7 +110,14 @@ async function destroy() {
async function step() {
// Read RGB Frame from Channel.
let data = src.readData(READ_SIZE);
if (data.byteLength == 0) {
// Check eof flag
let flag = src.readPriv();
if (flag & PRIV_FLAGS.EOF) {
await EOFProcessing();
return true;
}
return false;
}
......@@ -130,6 +137,9 @@ async function steps() {
src.setPriv(PRIV_FLAGS.EXECUTING);
while (timeout > 0) {
if (eof) break;
if (await step() === false) {
await sleep(SLEEP_INTERVAL);
timeout -= SLEEP_INTERVAL;
......@@ -151,19 +161,27 @@ async function steps() {
async function EOFProcessing() {
let ret = 0, size_;
console.log(ident + " EOF");
while (true) {
ret = encoder._getPackets(encBuf, ENC_BUF_SIZE, size);
size_ = encoder.getValue(size, 'i32');
if (ret == 1) {
/* No more frames here */
return true;
break;
} else if (ret > 0) {
throw new Error("Encoder internal error")
}
sendToMuxerUntilSuccess(size_, 500);
}
// Notify muxer this stream is all transfered.
bridge.setPriv(PRIV_FLAGS.EOF);
eof = true;
return true;
}
async function RGBProcessing(frame) {
......@@ -176,14 +194,17 @@ async function RGBProcessing(frame) {
ret = encoder._encode(wasmMem, encBuf, ENC_BUF_SIZE, size);
size_ = encoder.getValue(size, 'i32');
if (size_ == 0)
return true;
if (ret == 2) {
throw new Error("Buffer is too small");
} else if (ret == 3) {
throw new Error("Unknown error");
}
console.log("Encoder Send size: " + size_);
sendToMuxer(size_);
await sendToMuxerUntilSuccess(size_, 300);
return true;
}
......
......@@ -4,10 +4,14 @@ let ident = undefined;
let isInited = false;
let isBridged = false;
let bridge = null;
let bridgeCount = 0;
let muxer = null;
let numOfEncs = 1;
let channels = [];
let eof = [];
let eofCount = 0;
let done = false;
// Constants
const SLEEP_INTERVAL = 100;
......@@ -41,7 +45,7 @@ async function main(msg) {
if (!isInited) await init(msg);
break;
case MESSAGE_TYPE.BRIDGE:
if (!isBridged) await bridging(msg);
await bridging(msg);
break;
case MESSAGE_TYPE.BRIDGE_DESTROY:
if (isBridged) await deBridging();
......@@ -52,6 +56,10 @@ async function main(msg) {
case MESSAGE_TYPE.DESTROY:
if (isInited) await destroy();
break;
case MESSAGE_TYPE.DATA_REQUIRE:
let buff = muxer.FS.readFile('/tmp/output.mp4', { encoding: 'binary' });
postMessage(buff);
break;
}
}
......@@ -62,11 +70,12 @@ async function init(msg) {
if ('wasmBufSize' in info)
MUX_BUFFER_SIZE = info.wasmBufSize;
if ('numofEncs' in info)
numOfEncs = info.numofEncs;
if ('numOfEncs' in info)
numOfEncs = info.numOfEncs;
/* Init Muxer */
muxer._muxInit(numOfEncs);
eof = [...Array(numOfEncs).keys()].map(() => false);
postMessage(makeMsg(MESSAGE_TYPE.INIT, {}));
}
......@@ -85,13 +94,14 @@ async function bridging(msg) {
}
bridge = new Channel(info.size, SharedArrayBuffer, info.shm);
channels.push(bridge);
isBridged = true;
bridgeCount++;
}
async function deBridging() {
bridge = null;
isBridged = false;
}
......@@ -105,28 +115,37 @@ async function step() {
// Read Datas
for (let i = 0; i < numOfEncs; ++i) {
if (eof[i])
continue;
chn = channels[i];
let data = chn.readData(MUX_BUFFER_SIZE);
console.log("Mux Receive (" + data.byteLength + " Bytes ) " + data)
if (data.byteLength == 0) {
// Check EOF
let flag = chn.readPriv();
if (flag & PRIV_FLAGS.EOF) {
console.log("MUXER: EOF " + i)
muxer._eof(i);
++eofCount;
eof[i] = true;
}
continue;
}
console.log("MUXWW Push to " + i);
/* Write data into wasm */
muxBuffer = muxer._malloc(MUX_BUFFER_SIZE);
muxBuffer = muxer._malloc(data.byteLength);
muxer.HEAP8.set(data, muxBuffer);
muxer._muxPush(i, muxBuffer, data.byteLength);
}
// Handle Datas
muxer._muxStep();
done = muxer._muxStep();
console.log("Mux step done")
return true;
......@@ -134,7 +153,13 @@ async function step() {
async function steps() {
while (await step()) {
console.log("Done: " + done);
if (done)
break;
await sleep(SLEEP_INTERVAL);
}
// Notify to main thread that all frames
// has been written into format.
postMessage(makeMsg(MESSAGE_TYPE.DONE, {}));
}
......@@ -28,6 +28,7 @@ export class WW extends Observable {
#remoteInited = false;
#remoteBridged = false;
#done = false;
#state = WW_STATE.UNINITIALIZED;
......@@ -111,6 +112,10 @@ export class WW extends Observable {
return this.#state;
}
isDone() {
return this.#done;
}
// Enable status tracking for WW,
// status of WW will map to local variables.
#monitor() {
......@@ -122,6 +127,9 @@ export class WW extends Observable {
case MESSAGE_TYPE.BRIDGE:
this.#remoteBridged = true;
break;
case MESSAGE_TYPE.DONE:
this.#done = true;
break;
}
// Update State
......
......@@ -6,6 +6,10 @@ class CpySchedule {
second = { pos: 0, size: 0 }
}
export function createChnlBuffer(size, bufferType=SharedArrayBuffer) {
return new bufferType(size+12+1);
}
/* An abstraction that help to
* easily to transfer datas to target worker
* in real time manner */
......@@ -98,6 +102,11 @@ export class Channel {
return true;
}
isSetPriv(flag) {
let flags = this.readPriv();
return flags & flag;
}
setPriv(flag) {
let old = this.readPriv();
this.writePriv(flag | old);
......
......@@ -30,7 +30,8 @@ export const MESSAGE_TYPE = Object.freeze({
* consume packets as fast as Encoder provide */
DATA_REQUIRE : 7,
EOF : 8,
MSG_MAX : 8,
DONE : 9,
MSG_MAX : 9,
});
......@@ -43,7 +44,7 @@ export function makeMsg(type, info_) {
}
export function makeInitMsg(shm, size, ident) {
return makeMsg(MESSAGE_TYPE.INIT, {shm: shm, size:size, ident: ident})
return makeMsg(MESSAGE_TYPE.INIT, { shm: shm, size:size, ident: ident})
}
export function typeOfMsg(msg) {
......@@ -71,7 +72,6 @@ export function isEncGrpMsg(msg) {
}
function isMsgWithType(msg, type) {
if (type >= MESSAGE_TYPE.MSG_MIN &&
type <= MESSAGE_TYPE.MSG_MAX) {
return false;
......
import { assert, waitCond } from './utils.js';
import { assert, waitCond, sleep } from './utils.js';
import { WWGroup } from './WWGroup.js';
import { WW, WWInitError } from './WW.js';
import { WW, WWInitError, WW_STATE } from './WW.js';
import { Channel } from './channel.js';
import { MESSAGE_TYPE, makeMsg, isDataMsg, makeInitMsg, typeOfMsg } from './encGrooupMsg.js';
import { MESSAGE_TYPE, makeMsg, isDataMsg, makeInitMsg, typeOfMsg, PRIV_FLAGS } from './encGrooupMsg.js';
export class H264EncWWGroupInitError extends Error {
constructor(gid) {
......@@ -11,6 +11,13 @@ export class H264EncWWGroupInitError extends Error {
}
}
export const H264GRP_OPTIONS = Object.freeze({
MAX_PAYLOAD: "max",
NUM_OF_WW: "numOfWW",
MUXER_CHNL_SIZE: "muxchnlsize",
ENC_CHNL_SIZE: "encchnlsize",
BRIDGE_CHNL_SIZE: "bridgechnlsize",
});
/* WWGroup which target is to encode RGB frames into
* MP4 video */
......@@ -21,21 +28,28 @@ export class H264EncWWGroup extends WWGroup {
#muxWorker = undefined;
#encWorkers = [];
#channels = {};
#channels = [];
#bridges = {};
#name = undefined;
#curProcWW = 0;
/* Options */
/* Number of frames WWGroup can handle at a time */
#maxPayload = 0;
/* Muxer Channel Size */
#muxChannelSize = Math.pow(2, 20);
#muxChannelSize = Math.pow(2, 22);
/* Encoder Channel Size */
#encChannelSize = Math.pow(2, 20);
#encChannelSize = Math.pow(2, 22);
#bridgeChannelSize = Math.pow(2, 22);
#begin = false;
#terminated = false;
/* Precondition: 2 <= options.numOfWW */
constructor(name, options) {
......@@ -51,10 +65,16 @@ export class H264EncWWGroup extends WWGroup {
if ('max' in options)
this.#maxPayload = options.max;
if ('muxchnlsize' in options)
this.#muxChannelSize = options.muxchnlsize;
if ('encchnlsize' in options)
this.#encChannelSize = options.encChanlsize;
this.#encChannelSize = options.encchnlsize;
if ('bridgechnlsize' in options)
this.#bridgeChannelSize = options.bridgechnlsize;
if ('numOfWW' in options) {
// Precondition not meet:
// At least two web workers is required, one encode worker
......@@ -93,14 +113,14 @@ export class H264EncWWGroup extends WWGroup {
await this.#muxWorker.start(async ww => {
return await initStrategies(muxInit, INIT_FLAGS.NONE, ww,
// muxInit parameters
this.#channels, ww)
this.#numOfEncWorker, Math.pow(2, 22), ww)
});
for (let i = 0; i < this.#numOfEncWorker; ++i) {
await this.#encWorkers[i].start(async ww => {
return await initStrategies(encInit, INIT_FLAGS.NONE, ww,
// encInit parameters
this.#channels, ww);
this.#channels, this.#encChannelSize, ww);
});
}
} catch (e) {
......@@ -114,9 +134,15 @@ export class H264EncWWGroup extends WWGroup {
let enc = this.#encWorkers[i];
enc.connect(
this.#muxWorker,
async (ww, mux) => { encMuxBridge(this.#bridges, ww, mux) },
async msg => { return isDataMsg(msg); });
async (ww, mux) => { encMuxBridge(this.#bridges, this.#bridgeChannelSize, ww, mux) },
msg => { return isDataMsg(msg); });
}
waitCond(() => {
const workers = this.#encWorkers.concat([this.#muxWorker]);
return workers.map(ww => ww.getState() == WW_STATE.READY)
.reduce((l,r) => l && r);
});
}
numOfWorker() {
......@@ -125,6 +151,58 @@ export class H264EncWWGroup extends WWGroup {
async dispatch(rgbFrame) {
if (!this.#begin) {
this.#muxWorker.postMessage(
makeMsg(MESSAGE_TYPE.DATA, {}));
this.#begin = true;
}
if (this.#terminated) return;
console.log("Proc WW is " + this.#encWorkers[this.#curProcWW].ident());
// Null is treated as terminated
// frame.
if (rgbFrame == null) {
for (let i = 0; i < this.#numOfEncWorker; ++i) {
this.#channels[i].setPriv(PRIV_FLAGS.EOF);
}
this.#terminated = true;
return;
}
console.log("Ready to push...");
if (!this.#channels[this.#curProcWW].isSetPriv(PRIV_FLAGS.EXECUTING)) {
this.#encWorkers[this.#curProcWW].postMessage(
makeMsg(MESSAGE_TYPE.DATA, {}))
}
while (this.#channels[this.#curProcWW].push(rgbFrame) == false) {
console.log("Pushing...");
await sleep(300);
}
this.#curProcWW = (this.#curProcWW + 1) % this.#numOfEncWorker;
}
async getResult() {
// Muxer is not done the job.
if (!this.#muxWorker.isDone()) {
return null;
}
let result = null;
let sub = this.#muxWorker.subscribe(buff => {
result = buff
});
this.#muxWorker.postMessage(
makeMsg(MESSAGE_TYPE.DATA_REQUIRE, {}));
await waitCond(() => result != null, 500);
sub.unsusbscribe();
return result;
}
}
......@@ -164,39 +242,42 @@ function syncInitStrategy(init, ww) {
let strategy = async () => {
let sub = ww.subscribe(msg => {
inited = typeOfMsg(msg) == MESSAGE_TYPE.INIT;
console.log("INITED: " + ww.ident());
sub.unsubscribe();
});
if (await init() === false)
if (await init() === false) {
return false;
}
// Waiting 1 Second for Encoder/Muxer
let ret = await waitCond(() => inited === true, 1000);
let ret = await waitCond(() => inited === true, 5000);
return ret;
};
return strategy;
}
async function muxInit(chnls, ww) {
let channel = new Channel(Math.pow(2, 20));
async function encInit(chnls, size, ww) {
let channel = new Channel(size);
ww.postMessage(
makeInitMsg(channel.getShMem(), Math.pow(2, 20), ww.ident()));
chnls[ww.ident()] = channel;
makeInitMsg(channel.getShMem(), size, ww.ident()));
chnls.push(channel);
return true;
}
async function encInit(chnls, ww) {
return await muxInit(chnls, ww);
async function muxInit(numOfEncs, size, ww) {
ww.postMessage(
makeMsg(MESSAGE_TYPE.INIT, { wasmBufSize: size, numOfEncs: numOfEncs }));
}
async function encMuxBridge(bridge, enc, mux) {
let channel = new Channel(Math.pow(2, 20));
async function encMuxBridge(bridge, size, enc, mux) {
let channel = new Channel(size);
enc.postMessage(
makeMsg(MESSAGE_TYPE.CONNECT_PREPARE, channel.getShMem()));
makeMsg(MESSAGE_TYPE.BRIDGE, { shm: channel.getShMem(), size: size }));
mux.postMessage(
makeMsg(MESSAGE_TYPE.CONNECT_PREPARE, channel.getShMem()));
makeMsg(MESSAGE_TYPE.BRIDGE, { shm: channel.getShMem(), size: size }));
bridge[enc.ident()] = channel;
}
......@@ -22,7 +22,7 @@ export function isInObj(obj, v) {
return asArray.reduce((prev, cur) => prev || cur[1] == v, false);
}
export async function waitCond(cond, timeout = 500, intvl = 200) {
export async function waitCond(cond, timeout = 60000, intvl = 200) {
let remain = timeout, ret = false;
await new Promise(resolve => {
......
......@@ -250,10 +250,13 @@ public:
}
void push(int idx, AVPacket *packet) {
printf("PacketBuffer: Push\n");
packets[idx].push(packet);
printf("size of packet in %d is %d\n", idx, packets[idx].size());
}
AVPacket* pop(int idx) {
printf("PacketBuffer: POP\n");
auto &chain = packets[idx];
if (chain.empty()) {
return nullptr;
......@@ -264,6 +267,10 @@ public:
return pkt;
}
int size(int idx) {
return packets[idx].size();
}
private:
std::vector<PacketChain> packets;
};
......@@ -272,8 +279,10 @@ bool ioCtxInited = false;
int numOfStreams = 0;
PacketBuffer pktBuffer;
int finishedCount = 0;
int initedCount = 0;
std::shared_ptr<bool[]> finished;
std::shared_ptr<bool[]> inited;
std::shared_ptr<bool[]> failed;
bool oCtxInited = false;
std::vector<IOEndpoint> protos;
IOCtx::InCtx **ctxs = nullptr;
......@@ -285,6 +294,11 @@ int currentChannel = 0;
const char *outputPath = "/tmp/output.mp4";
IOCtx::OutCtx oCtx { outputPath };
bool done = false;
/* Timestam parameters */
int timescale = 90000;
AVRational timebase = av_make_q(1, timescale);
int framerate = 30;
};
EM_PORT_API(int) muxInit(int numOfStreams) {
......@@ -294,8 +308,11 @@ EM_PORT_API(int) muxInit(int numOfStreams) {
MuxEnv::pktBuffer = MuxEnv::PacketBuffer(numOfStreams);
MuxEnv::finished = std::shared_ptr<bool[]>(new bool[numOfStreams]{false});
MuxEnv::inited = std::shared_ptr<bool[]>(new bool[numOfStreams]{false});
MuxEnv::failed = std::shared_ptr<bool[]>(new bool[numOfStreams]{false});
MuxEnv::ctxs = (IOCtx::InCtx**)malloc(sizeof(IOCtx::InCtx*));
printf("Numb of Streams %d\n", numOfStreams);
for (int i = 0; i < numOfStreams; ++i) {
// Create IOEncpoint for a IOCtx
......@@ -306,9 +323,8 @@ EM_PORT_API(int) muxInit(int numOfStreams) {
}
EM_PORT_API(int) muxPush(int sIdx, uint8_t *data, size_t size) {
IOProto::MovMemProto::MemPiece mem;
printf("Push to %d in wasm\n", sIdx);
MuxEnv::protos[sIdx].push(data, size);
return 0;
}
......@@ -323,38 +339,52 @@ int bufferPackets() {
// then buffer such packets.
for (int i = 0; i < MuxEnv::numOfStreams; ++i) {
if (MuxEnv::finished[i] || !MuxEnv::inited[i])
if (MuxEnv::failed[i] || MuxEnv::finished[i] || !MuxEnv::inited[i]) {
printf("SKIP %d\n", i);
continue;
}
printf("IOCTX %d in processing...\n", i);
IOCtx::InCtx *c = MuxEnv::ctxs[i];
pkt = pkt == nullptr ? av_packet_alloc() : pkt;
printf("Read Frame...\n");
ret = c->readFrame_(pkt);
printf("Read Frame...Done\n");
if (ret < 0) {
if (ret == AVERROR_EOF) {
MuxEnv::finishedCount++;
MuxEnv::finished[i] = true;
printf("EOF\n");
printf("WASM: EOF %d\n", i);
if (MuxEnv::finishedCount == MuxEnv::numOfStreams) {
printf("ALL STREAM EOF DONE\n");
AVPacket *nullpkt = av_packet_alloc();
nullpkt->data = nullptr;
nullpkt->size = 0;
MuxEnv::finished[i] = false;
MuxEnv::pktBuffer.push(i, nullpkt);
}
goto END_LOOP;
} else if (ret == AVERROR(EAGAIN)) {
printf("READ AGAIN\n");
} else {
MuxEnv::failed[i] = true;
printf("Failed\n");
}
printf("Failed to read frame\n");
av_packet_unref(pkt);
continue;
}
printf("PKT SIZE %d\n", pkt->size);
printf("Buffer into buffer %d\n", i);
MuxEnv::pktBuffer.push(i, pkt);
printf("Buffer into buffer %d Done\n", i);
pkt = nullptr;
++readed;
......@@ -379,12 +409,14 @@ int bufferPackets() {
void ioCtxInitialize() {
printf("Size of Buffer: %ld\n", MuxEnv::protos[0].size());
for (int i = 0; i < MuxEnv::numOfStreams; ++i) {
printf("Size of Buffer %d: %ld\n", i, MuxEnv::protos[i].size());
// IOProto require 20KB to probe stream informations.
if (!MuxEnv::inited[i] && MuxEnv::protos[i].size() > 65000) {
MuxEnv::ctxs[i] = new IOCtx::InCtx{"", &MuxEnv::protos[0]};
MuxEnv::ctxs[i] = new IOCtx::InCtx{"", &MuxEnv::protos[i]};
MuxEnv::inited[i] = true;
if (!MuxEnv::oCtxInited) {
......@@ -395,52 +427,103 @@ void ioCtxInitialize() {
MuxEnv::oCtx.newStream(s->codecpar);
MuxEnv::oCtx.writeHeader();
}
printf("INIT DONE\n");
printf("INIT DONE: %d\n", i);
++MuxEnv::initedCount;
}
}
}
EM_PORT_API(int) muxStep() {
void timestampSetup(AVPacket *pkt) {
static int ptsVal = 0, dtsVal = 0, duration = 0;
int i;
if (duration == 0) {
duration = MuxEnv::timescale / MuxEnv::framerate;
}
// Buffer Encoded frames from
// Encoder into 'MuxEnv::pktBuffer'
bufferPackets();
if (ptsVal == 0 && dtsVal == 0) {
dtsVal = -duration;
}
// Initialize IOCtx
ioCtxInitialize();
pkt->pts = ptsVal;
pkt->dts = dtsVal;
pkt->duration = duration;
pkt->stream_index = 0;
pkt->pos = -1;
printf("PTS: %d\n, DTS: %d\n", ptsVal, dtsVal);
ptsVal += duration;
dtsVal += duration;
}
int writeToOutput() {
int i = MuxEnv::currentChannel;
// Write out to file
while (true)
for (i = MuxEnv::currentChannel; i < MuxEnv::numOfStreams; ++i) {
while (true) {
AVPacket *p = MuxEnv::pktBuffer.pop(i);
if (p == nullptr)
goto END_LOOP;
if (p->data == nullptr && p->size == 0) {
av_packet_free(&p);
if (MuxEnv::finished[i]) {
printf("SKIP Cause of FIN %d\n", i);
goto NEXT;
} else {
printf("P is nullptr in %d\n", i);
goto END_LOOP;
}
else if (p->data == nullptr && p->size == 0) {
goto MUX_DONE;
}
timestampSetup(p);
// Write to output context
//MuxEnv::oCtx.writeFrame(p);
printf("Write Done\n");
MuxEnv::oCtx.writeFrame(p);
printf("Write Done %d\n", i);
// Cleaning
av_packet_free(&p);
NEXT:
i = (i + 1) % MuxEnv::numOfStreams;
}
END_LOOP:
printf("SKIP %d", i);
MuxEnv::currentChannel = i;
return 0;
MUX_DONE:
printf("WASM MUXER DONE\n");
for (int i = 0; i < MuxEnv::numOfStreams; ++i) {
printf("Remain in Proto %d is %d\n", i, MuxEnv::protos[i].size());
printf("Remain in packet buffer %d is %d\n\n", MuxEnv::pktBuffer.size(i));
}
MuxEnv::done = true;
MuxEnv::oCtx.writeTrailer();
// Cleaning
return 1;
}
return 0;
EM_PORT_API(int) muxStep() {
printf("Step Buffer\n");
// Buffer Encoded frames from
// Encoder into 'MuxEnv::pktBuffer'
bufferPackets();
printf("Init\n");
// Initialize IOCtx
if (MuxEnv::initedCount < MuxEnv::numOfStreams)
ioCtxInitialize();
printf("WriteOut\n");
// Try to write to output file
return writeToOutput();
}
EM_PORT_API(int) isDone() {
......
......@@ -16,7 +16,7 @@ describe("H264EncWWGroup Spec", () => {
const RGBAFrameSize = 1920*1080*4;
let grp = new H264EncWWGroup("h264enc", {
numOfWW: 8,
numOfWW: 3,
encchnlsize: RGBAFrameSize * 10,
bridgechnlsize: Math.pow(2, 25)
});
......
import { WW, WW_STATE } from '../src/WW.js';
import { Channel } from '../src/channel.js';
import { makeMsg, MESSAGE_TYPE } from '../src/encGrooupMsg.js';
import { Channel, createChnlBuffer } from '../src/channel.js';
import { makeMsg, MESSAGE_TYPE, PRIV_FLAGS } from '../src/encGrooupMsg.js';
import { sleep, assert } from '../src/utils.js';
import { saveAs } from 'file-save';
// For Webpack 5
let url = '../resources/tests/worker.js';
......@@ -191,12 +192,12 @@ describe("EncWW Specifications", () => {
await sleep(1000);
});
fit("Encoder and Muxer", async () => {
xit("Encoder and Muxer", async () => {
const RGBAFrameSize = 1920*1080*4;
const BRIDGE_SIZE = Math.pow(2, 26);
const INPUT_SIZE = RGBAFrameSize*30;
// Create Encoder
// Create Encoder 1
let enc = new WW("ENC", "../resources/workers/encWW.js");
let mem = new SharedArrayBuffer(INPUT_SIZE+1+12);
enc.postMessage(makeMsg(
......@@ -247,4 +248,111 @@ describe("EncWW Specifications", () => {
await sleep(100000);
}, 100000);
it("Muxer and Multiple Encoder", async () => {
const RGBAFrameSize = 1920*1080*4;
const BRIDGE_SIZE = Math.pow(2, 26);
const INPUT_SIZE = RGBAFrameSize*30;
// Create Enc 1
let enc_1 = new WW("ENC1", "../resources/workers/encWW.js");
let mem1 = new createChnlBuffer(INPUT_SIZE);
enc_1.postMessage(makeMsg(
MESSAGE_TYPE.INIT, { shm:mem1, size: INPUT_SIZE, ident: "ENC1" }));
// Create Enc 2
let enc_2 = new WW("ENC2", "../resources/workers/encWW.js");
let mem2 = new createChnlBuffer(INPUT_SIZE);
enc_2.postMessage(makeMsg(
MESSAGE_TYPE.INIT, { shm:mem2, size: INPUT_SIZE, ident: "ENC2" }));
// Create Muxer
let mux = new WW("Mux", "../resources/workers/muxWW.js");
mux.postMessage(makeMsg(
MESSAGE_TYPE.INIT, { numOfEncs: 2 }
));
// Bridge Encoder 1 and Muxer
let bridgeMem1 = new SharedArrayBuffer(BRIDGE_SIZE);
enc_1.postMessage(makeMsg(
MESSAGE_TYPE.BRIDGE, { shm:bridgeMem1, size: BRIDGE_SIZE }
))
mux.postMessage(makeMsg(
MESSAGE_TYPE.BRIDGE, { shm:bridgeMem1, size: BRIDGE_SIZE }
))
// Bridge Encoder 2 and Muxer
let bridgeMem2 = new SharedArrayBuffer(BRIDGE_SIZE);
enc_2.postMessage(makeMsg(
MESSAGE_TYPE.BRIDGE, { shm:bridgeMem2, size: BRIDGE_SIZE }
))
mux.postMessage(makeMsg(
MESSAGE_TYPE.BRIDGE, { shm:bridgeMem2, size: BRIDGE_SIZE }
))
await new Promise(r => {
let intvler = setInterval(() => {
if (enc_1.getState() == WW_STATE.READY &&
enc_2.getState() == WW_STATE.READY) {
clearInterval(intvler);
r();
}
}, 100);
});
console.log("Ready");
const data = new Uint8Array([...Array(RGBAFrameSize).keys()]);
let input_1 = new Channel(INPUT_SIZE, SharedArrayBuffer, mem1);
let input_2 = new Channel(INPUT_SIZE, SharedArrayBuffer, mem2);
/* Send Data */
enc_1.postMessage(makeMsg(
MESSAGE_TYPE.DATA, {}
));
enc_2.postMessage(makeMsg(
MESSAGE_TYPE.DATA, {}
));
mux.postMessage(makeMsg(
MESSAGE_TYPE.DATA, {}
))
let inputs = [ input_1, input_2 ];
for (let i = 0; i < 300; ++i) {
while (inputs[i%2].push(data) == false) {
console.log("Sleep");
await sleep(100);
}
console.log("PUSH IN MAIN\n");
}
input_1.setPriv(PRIV_FLAGS.EOF);
input_2.setPriv(PRIV_FLAGS.EOF);
await new Promise(r => {
let intvler = setInterval(() => {
if (mux.isDone()) {
clearInterval(intvler);
r();
}
}, 1000);
});
console.log("ALL DONE");
let done = false;
mux.subscribe(msg => {
done = true;
});
mux.postMessage(makeMsg(MESSAGE_TYPE.DATA_REQUIRE, {}));
await sleep(3000);
expect(done).toBe(true);
}, 100000);
});
import { sleep } from '../src/utils.js';
import { Observable } from 'rxjs';
import { H264EncWWGroup } from '../src/encGroup.js';
import { Obervable, Observable } from 'rxjs';
import { Channel } from '../src/channel.js';
const areEqual = (first, second) =>
......@@ -28,10 +29,6 @@ describe("Channel Spec", () => {
expect(buffer.byteLength == 0).toBe(true);
});
it("Write Until Full", () => {
let channel = new Channel(16, ArrayBuffer);
});
it("Write datas", () => {
let channel = new Channel(10, ArrayBuffer);
......@@ -65,7 +62,7 @@ describe("Channel Spec", () => {
it("Write Until Full", () => {
let channel = new Channel(1024, ArrayBuffer);
let buffer = new Uint8Array([...Array(1023).keys()]);
let buffer = new Uint8Array([...Array(1024).keys()]);
let ret = channel.push(buffer);
expect(ret).toBe(true);
......@@ -75,7 +72,7 @@ describe("Channel Spec", () => {
ret = channel.push(new Uint8Array([1]));
expect(ret).toBe(false);
let readData = channel.readData(1023);
let readData = channel.readData(1024);
expect(areEqual(buffer, readData))
.withContext("Data Mismatch")
.toBe(true);
......@@ -114,16 +111,19 @@ describe("Channel Spec", () => {
// Push should be failed cause
// there are no more space.
ret = channel.push(buffer);
expect(ret).toBe(true);
ret = channel.push(buffer);
expect(ret).toBe(false);
let data = channel.readData(1);
expect(areEqual(buffer, data)).toBe(true);
expect(channel.isEmpty()).toBe(true);
});
it("Channel in WebWorker", () => {
data = channel.readData(1);
expect(areEqual(buffer, data)).toBe(true);
expect(channel.isEmpty()).toBe(true);
});
it("Transfer 64 MB to WebWorker", async () => {
......
......@@ -18,8 +18,8 @@ THIRD_DIR=${WORKPATH}/lib/third/build
FFMPEG_PROTO=${WORKPATH}/src/protos/src
WASM_DIR=${WORKPATH}/src/wasms
DEBUG="-O3"
#DEBUG="-O1 -g -fno-inline -gseparate-dwarf=/src/demo2/temp.debug.wasm -s SEPARATE_DWARF_URL=http://localhost:5000/temp.debug.wasm"
DEBUG="-O2"
#DEBUG="-O1 -g -fno-inline -gseparate-dwarf=./temp.debug.wasm -s SEPARATE_DWARF_URL=http://localhost:9678/temp.debug.wasm"
BUILD_DIR=${WORKPATH}/Build
......@@ -101,8 +101,9 @@ FLAGS=(
-s EXPORTED_FUNCTIONS="[_main,_malloc,_free]" # export main and proxy_main funcs
-s EXPORTED_RUNTIME_METHODS="[FS, cwrap, ccall, setValue, writeAsciiToMemory, getValue]" # export preamble funcs
-s INITIAL_MEMORY=268435456 # 64 KB * 1024 * 16 * 2047 = 2146435072 bytes ~= 2 GB, 268435456 =256M, 134,217,728 =128M
-s ALLOW_MEMORY_GROWTH=1
-s ASSERTIONS=1
-s SAFE_HEAP=1
-s WARN_UNALIGNED=1
--pre-js $WORKPATH/pre.js
--post-js $WORKPATH/post.js
$OPTIM_FLAGS
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment