Commit 0e0f0d0d authored by NzSN's avatar NzSN

Test Implement H264WWGroup.

parent 55815cd7
...@@ -67,6 +67,7 @@ async function init(msg) { ...@@ -67,6 +67,7 @@ async function init(msg) {
/* Init Muxer */ /* Init Muxer */
muxer._muxInit(numOfEncs); muxer._muxInit(numOfEncs);
postMessage(makeMsg(MESSAGE_TYPE.INIT, {}));
} }
async function destroy() { async function destroy() {
......
...@@ -2,11 +2,18 @@ ...@@ -2,11 +2,18 @@
import { Observable, filter } from './rxjs'; import { Observable, filter } from './rxjs';
export class WWInitError extends Error {
constructor(WWID) {
super("Web worker \'" + WWID + "\' fail to initialize");
}
}
export class WW extends Observable { export class WW extends Observable {
#ident = undefined; #ident = undefined;
#init = false; #inited = false;
#bridged = false;
#healthy = true; #healthy = true;
#ww = undefined; #ww = undefined;
#connected = {}; #connected = {};
...@@ -21,9 +28,11 @@ export class WW extends Observable { ...@@ -21,9 +28,11 @@ export class WW extends Observable {
this.#ww = new Worker(path); this.#ww = new Worker(path);
} }
async start(initWorks) { async start(initWork) {
this.#init = await initWorks(this); this.#inited = await initWork(this);
return this.#init; if (this.#inited == false)
throw new WWInitError(this.#ident);
return this.#inited;
} }
ident() { ident() {
...@@ -31,7 +40,7 @@ export class WW extends Observable { ...@@ -31,7 +40,7 @@ export class WW extends Observable {
} }
isReady() { isReady() {
return this.#init; return this.#inited;
} }
postMessage(msg) { postMessage(msg) {
...@@ -76,4 +85,9 @@ export class WW extends Observable { ...@@ -76,4 +85,9 @@ export class WW extends Observable {
return this.#ww.terminate(); return this.#ww.terminate();
} }
monitor() {
this.subscribe(msg => {
});
}
} }
export const COMMANDS = Object.freeze({
INIT: 0,
INIT_DONE: 1,
INIT_FAIL: 2,
});
import { assert, sleep } from './utils.js'; import { assert, waitCond } from './utils.js';
import { WWGroup } from './WWGroup.js'; import { WWGroup } from './WWGroup.js';
import { WW } from './WW.js'; import { WW, WWInitError } from './WW.js';
import { Channel } from './channel.js'; import { Channel } from './channel.js';
import { MESSAGE_TYPE, makeMsg, isDataMsg, makeInitMsg } from './encGrooupMsg.js'; import { MESSAGE_TYPE, makeMsg, isDataMsg, makeInitMsg, typeOfMsg } from './encGrooupMsg.js';
async function muxInit(chnls, ww) { export class H264EncWWGroupInitError extends Error {
let channel = new Channel(Math.pow(2, 20)); constructor(gid) {
ww.postMessage( super("H264EncWWGroup \'" + gid + "\' fail to initialize");
makeInitMsg(channel.getShMem(), Math.pow(2, 20))); }
chnls[ww.ident()] = channel;
return true;
}
async function encInit(chnls, ww) {
return await muxInit(chnls, ww);
}
async function encMuxBridge(bridge, enc, mux) {
let channel = new Channel(Math.pow(2, 20));
enc.postMessage(
makeMsg(MESSAGE_TYPE.CONNECT_PREPARE, channel.getShMem()));
mux.postMessage(
makeMsg(MESSAGE_TYPE.CONNECT_PREPARE, channel.getShMem()));
bridge[enc.ident()] = channel;
} }
...@@ -40,6 +24,8 @@ export class H264EncWWGroup extends WWGroup { ...@@ -40,6 +24,8 @@ export class H264EncWWGroup extends WWGroup {
#channels = {}; #channels = {};
#bridges = {}; #bridges = {};
#name = undefined;
/* Options */ /* Options */
/* Number of frames WWGroup can handle at a time */ /* Number of frames WWGroup can handle at a time */
...@@ -58,6 +44,8 @@ export class H264EncWWGroup extends WWGroup { ...@@ -58,6 +44,8 @@ export class H264EncWWGroup extends WWGroup {
assert(typeof(name) == "string", "INVALID ARGUMENT"); assert(typeof(name) == "string", "INVALID ARGUMENT");
this.#name = name;
// Setting options // Setting options
if (options != undefined) { if (options != undefined) {
...@@ -100,15 +88,25 @@ export class H264EncWWGroup extends WWGroup { ...@@ -100,15 +88,25 @@ export class H264EncWWGroup extends WWGroup {
} }
async start() { async start() {
// Start Muxer try {
await this.#muxWorker.start(async ww => { // Start Muxer
return await muxInit(this.#channels, ww); await this.#muxWorker.start(async ww => {
}); return await initStrategies(muxInit, INIT_FLAGS.NONE, ww,
// muxInit parameters
for (let i = 0; i < this.#numOfEncWorker; ++i) { this.#channels, ww)
await this.#encWorkers[i].start(async ww => {
return await encInit(this.#channels, ww);
}); });
for (let i = 0; i < this.#numOfEncWorker; ++i) {
await this.#encWorkers[i].start(async ww => {
return await initStrategies(encInit, INIT_FLAGS.NONE, ww,
// encInit parameters
this.#channels, ww);
});
}
} catch (e) {
if (e instanceof WWInitError) {
throw new H264EncWWGroupInitError(this.#name);
}
} }
// Connect Encoders to the Muxer // Connect Encoders to the Muxer
...@@ -130,3 +128,75 @@ export class H264EncWWGroup extends WWGroup { ...@@ -130,3 +128,75 @@ export class H264EncWWGroup extends WWGroup {
} }
} }
///////////////////////////////////////////////////////////////////////////////
// Misc //
///////////////////////////////////////////////////////////////////////////////
const INIT_FLAGS = Object.freeze({
NONE : 0x00,
ASYNCHRONOUS : 0x01,
});
async function initStrategies(initproc, flags, ww, ...args) {
let f = async () => await initproc(...args);
// Asynchronous
if (flags & INIT_FLAGS.ASYNCHRONOUS) {
f = f;
} else {
// Do Synchronous Initialization
f = syncInitStrategy(f, ww);
}
// Do initialization
let ret = await f();
return ret;
}
function syncInitStrategy(init, ww) {
let inited = undefined;
let strategy = async () => {
let sub = ww.subscribe(msg => {
inited = typeOfMsg(msg) == MESSAGE_TYPE.INIT;
sub.unsubscribe();
});
if (await init() === false)
return false;
// Waiting 1 Second for Encoder/Muxer
let ret = await waitCond(() => inited === true, 1000);
return ret;
};
return strategy;
}
async function muxInit(chnls, ww) {
let channel = new Channel(Math.pow(2, 20));
ww.postMessage(
makeInitMsg(channel.getShMem(), Math.pow(2, 20)));
chnls[ww.ident()] = channel;
return true;
}
async function encInit(chnls, ww) {
return await muxInit(chnls, ww);
}
async function encMuxBridge(bridge, enc, mux) {
let channel = new Channel(Math.pow(2, 20));
enc.postMessage(
makeMsg(MESSAGE_TYPE.CONNECT_PREPARE, channel.getShMem()));
mux.postMessage(
makeMsg(MESSAGE_TYPE.CONNECT_PREPARE, channel.getShMem()));
bridge[enc.ident()] = channel;
}
import { assert, NEED_TO_IMPLEMENT } from './utils.js'; import { assert, NEED_TO_IMPLEMENT, isInObj } from './utils.js';
import { ENC_GRPS } from './WWGroupList.js';
export let SUPPORT_CODECS = {
"h264": 0
};
export const ENCODE_MODE = Object.freeze({ export const ENCODE_MODE = Object.freeze({
URGENT: Symbol("URGENT"), URGENT : 0,
SAVE_MEMORY: Symbol("SAVEMEM"), SAVE_MEMORY : 1,
}); });
/* ParaEncoder use number of WebWorker specified by /* ParaEncoder use number of WebWorker specified by
* caller to encode RGB pictures into a video */ * caller to encode RGB pictures into a video */
export class ParaEncoder { export class ParaEncoder {
#numOfWW = 0;
#codec = undefined;
#mode = undefined
#grp = null;
constructor(numOfWW, codec, mode) { constructor(numOfWW, codec, mode) {
assert(typeof(numOfWW) == 'number', if (typeof(numOfWW) != 'number' ||
"typeof(n umOfTR) == 'number' failed"); !(codec in ENC_GRPS) ||
assert(typeof(codec) == 'string', !isInObj(ENCODE_MODE, mode)) {
"typeof(codec) == 'number' failed");
for (let encodeMode in ENCODE_MODE) { throw new TypeError(
assert(mode != encodeMode, "INVALID ENCODE MODE"); "Mismatch types parameter", "paraEncode.js");
} }
this.numOfWW = numOfWW;
this.codec = codec; this.#numOfWW = numOfWW;
this.mode = mode; this.#codec = codec;
this.#mode = mode;
}
async init() {
this.#grp = new ENC_GRPS[codec]("ENCs", {});
await this.#grp.start();
}
numOfWW() {
return this.#numOfWW;
}
codec() {
return this.#codec;
}
mode() {
return this.#mode;
} }
/* Encode pixels into a video frame, pixels can be generated from /* Encode pixels into a video frame, pixels can be generated from
* WebGLRenderingContext.readPixels() * WebGLRenderingContext.readPixels()
* details: https://developer.mozilla.org/en-US/docs/Web/API/WebGLRenderingContext/readPixels * details: https://developer.mozilla.org/en-US/docs/Web/API/WebGLRenderingContext/readPixels
* *
* Pixels should be passed to worker thread */ * pixels can be null which means there are no more frames to encode,
* after that encoder will rejected any pixels.
* */
encode(pixels) { encode(pixels) {
switch (this.mode) {
case ENCODE_MODE.URGENT:
return this.#encodeUrgent(pixels);
case ENCODE_MODE.SAVE_MEMORY:
return this.#encodeUrgent(pixels);
}
}
#encodeUrgent(pixels) {
NEED_TO_IMPLEMENT(); NEED_TO_IMPLEMENT();
} }
#encodeSaveMem(pixels) { /* Does all frames provide to Encoder be encoded.
*
* Note: isDone() alwasy return false before eof. */
isDone() {
NEED_TO_IMPLEMENT(); NEED_TO_IMPLEMENT();
} }
} }
...@@ -16,3 +16,24 @@ export function OVERRIDE_IS_REQUIRE(message) { ...@@ -16,3 +16,24 @@ export function OVERRIDE_IS_REQUIRE(message) {
export async function sleep(ms) { export async function sleep(ms) {
await new Promise(r => setTimeout(() => r(), ms)); await new Promise(r => setTimeout(() => r(), ms));
} }
export function isInObj(obj, v) {
const asArray = Object.entries(obj);
return asArray.reduce((prev, cur) => prev || cur[1] == v, false);
}
export async function waitCond(cond, timeout = 500, intvl = 200) {
let remain = timeout, ret = false;
await new Promise(resolve => {
let intvler = setInterval(() => {
if ((ret = cond()) || remain == 0) {
clearInterval(intvler);
resolve();
}
remain -= intvl;
}, intvl);
});
return ret;
}
import { ParaEncoder, ENCODE_MODE } from "../src/paraEncode.js"; import { ParaEncoder, ENCODE_MODE } from "../src/paraEncode.js";
let dut = new ParaEncoder(1, "h264", ENCODE_MODE.SAVE_MEMORY);
let paraEnc;
beforeEach(() => {
paraEnc = new ParaEncoder(2, "H264", ENCODE_MODE.SAVE_MEMORY);
});
describe("ParaEncoder", () => { describe("ParaEncoder", () => {
it("ParaEncoder Normal Init", () => { it("ParaEncoder Normal Init", () => {
expect(dut.numOfWW).toBe(1); expect(paraEnc.numOfWW()).toBe(2);
expect(dut.codec).toBe("h264"); expect(paraEnc.codec()).toBe("H264");
expect(paraEnc.mode()).toBe(ENCODE_MODE.SAVE_MEMORY);
}); });
it("ParaEncoder Invalid Init", () => { it("ParaEncoder Invalid Init", () => {
try { try {
new ParaEncoder(1, 1, ENCODE_MODE.SAVE_MEMORY); new ParaEncoder(2, 1, ENCODE_MODE.SAVE_MEMORY);
} catch (err) { } catch (err) {}
expect(err.message)
.toBe("typeof(codec) == 'number' failed");
}
}); });
it("ParaEncoder Invalid encode mode", () => { it("ParaEncoder Invalid encode mode", () => {
let WRONG_MODE = 0;
try { try {
new ParaEncoder(1, "h264", Symbol("URGENT")); new ParaEncoder(1, "h264", WRONG_MODE);
} catch (err) { } catch (err) {}
expect(err.message)
.toBe("INVALID ENCODE MODE");
}
}); });
it("Encode With ParaEncoder", () => {
});
}); });
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment