Commit ec6b7258 authored by Linshizhi's avatar Linshizhi

Implement WW and WWGroup, H264EncWWGroup.

parent df6015b9
//import { COMMANDS } from '../src/WWOpts';
const COMMANDS = Object.freeze({
INIT: 0,
INIT_DONE: 1,
INIT_FAIL: 2,
});
self.importScripts('../src/WWOpts.js');
let init = false;
// Init
onmessage = e => {
postMessage(1);
if (init == false) {
postMessage(COMMANDS.INIT_DONE);
init = true;
} else {
// Echo
postMessage(e.data);
}
}
const COMMANDS = Object.freeze({
INIT: 0,
INIT_DONE: 1,
INIT_FAIL: 2,
});
let init = false;
// Init
onmessage = e => {
if (init == false) {
postMessage(COMMANDS.INIT_DONE);
init = true;
} else {
// Echo
postMessage(e.data + 1);
}
}
const COMMANDS = Object.freeze({
INIT: 0,
INIT_DONE: 1,
INIT_FAIL: 2,
});
let init = false;
// Init
onmessage = e => {
if (init == false) {
postMessage(COMMANDS.INIT_DONE);
init = true;
} else {
// Echo
postMessage(e.data);
}
}
......@@ -2,61 +2,54 @@
import { Observable } from 'rxjs';
import { COMMANDS } from './WWOpts.js';
export class WW extends Observable {
#ident = undefined;
#init = false;
#healthy = true;
#ww = undefined;
#connected = {};
constructor(name, url) {
constructor(name, path) {
super(subscriber => {
if (this.#init == true) {
throw new Error("WW is not ready");
}
this.#ww.onmessage = (e) => {
subscriber.next(e);
};
this.#ww.onmessage =
e => { subscriber.next(e.data); };
});
this.#ident = name;
this.#ww = new Worker(url);
// onmessage will be overwriten
// after inited.
this.#ww.onmessage = (e) => {
if (e.data == COMMANDS.INIT_DONE) {
this.#init = true;
} else if (e.data == COMMANDS.INIT_FAIL) {
this.#init = false;
}
}
this.#ww = new Worker(
new URL(path, import.meta.url)
);
}
async waitReady() {
async start() {
if (this.#init) {
return;
return this.#init;
}
this.#ww.postMessage("123456");
await new Promise(resolve => {
let p = new Promise(resolve => {
let i = setInterval(() => {
if (this.#init == true) {
clearInterval(i);
resolve();
if (this.#init || !this.#healthy) {
resolve();
}
let sub = this.subscribe(e => {
if (e == COMMANDS.INIT_DONE) {
this.#init = true;
} else if (e == COMMANDS.INIT_FAIL) {
this.#init = false;
this.#healthy = false;
}
}, 1000);
});
sub.unsubscribe();
await p;
}
resolve();
});
this.#ww.postMessage("Initialize");
});
test() {
this.#ww.postMessage("123");
return this.#init;
}
ident() {
......@@ -67,6 +60,39 @@ export class WW extends Observable {
return this.#init;
}
postMessage(msg) {
return this.#ww.postMessage(msg);
}
/* Some types of WW may requires ability to
* get together to build up a WW Chain. So this
* interface has following semantics:
* 1.All datas output from current WW will be streamed
* to targetWW.
* 2.No datas from targetWW will be streamed to current WW.
* 3.If a WW is be connected already then the connection
* between them is remain effective. */
connect(targetWW) {
if (targetWW.ident in this.#connected) {
// Already connected no effect
return;
}
let sub = this.subscribe(data => {
targetWW.postMessage(data);
});
this.#connected[targetWW.ident] = sub;
}
disconnect(targetWW) {
if (targetWW.ident in this.#connected) {
this.#connected[targetWW.ident].unsubscribe();
delete this.#connected[targetWW.ident];
}
}
// FIXME: Observable close is required
terminate() {
return this.#ww.terminate();
}
......
import { OVERRIDE_IS_REQUIRE } from './utils.js';
/* WWGroup is a structure which shaped by WWs and specifications
* about how to communicate with WWs. WWGroup is built for a concrete
* target, WWGrup is destroyed once reach the goal.
*
* Implement WWGroup require concrete WWs and specificatinos about
* how to communicate with such WWs. */
export class WWGroup {
start() {
OVERRIDE_IS_REQUIRE();
}
numOfWorker() {
OVERRIDE_IS_REQUIRE();
}
dispatch() {
OVERRIDE_IS_REQUIRE();
}
/* If none of WWs is in working then the
* WWGroup is idel otherwise not. */
isIdle() {
OVERRIDE_IS_REQUIRE();
}
/* If each of WWs reside in WWGroup is in working
* then it's busy */
isBusy() {
OVERRIDE_IS_REQUIRE();
}
/* WWGroup meet it's gole and all exits. */
isDone() {
OVERRIDE_IS_REQUIRE();
}
}
export const COMMANDS = Object.freeze({
INIT: 0,
INIT_DONE: 1,
......
import { assert } from './utils.js';
import { WWGroup } from './WWGroup.js';
import { WW } from './WW.js';
/* An abstraction that help to
* easily to transfer datas to target worker
* in real time manner */
class Channel {
/* | ReadPointer (4 bytes) | WritePointer (4 Bytes) | Data Area (1-n bytes) */
/* Size of memory area that help to
* maintain Channel. Types of meta info
* are shown below:
*
* 1.Write Position
* 2.Read Position */
#numOfMetaField = 2
#fieldSize = 0;
#metaSize = 0;
#WW = undefined;
#size = 0;
#shMem = undefined;
#view = undefined;
#writePos = 0;
// size's unit is byte
constructor(WW, size) {
this.#WW = WW;
this.#size = size;
// 4 Bytes
this.#fieldSize = 4;
this.#metaSize = this.#numOfMetaField * this.#fieldSize;
this.#shMem = new SharedArrayBuffer(size + this.#metaSize);
this.#view = DataView(this.#shMem);
this.#writePos = this.#metaSize;
this.#size = size;
}
#readPos() {
return this.#view.getUint32(0);
}
#isAbleToWrite(size) {
if (this.#readPos() == writePos) {
/* Empty */
return true;
} else {
return this.#remain() > size;
}
}
#remain() {
let readPos = this.#readPos();
if (this.#writePos == readPos) {
return this.#size;
} else if (this.#writePos > readPos) {
return size - (this.#writePos - readPos) - 1;
} else {
return readPos - this.#writePos - 1;
}
}
push(datas /* Uint8Array */) {
if (!this.#isAbleToWrite(datas.byteLength)) {
return false;
}
/* Write to shared Memory */
}
}
/* WWGroup which target is to encode RGB frames into
* MP4 video */
export class H264EncWWGroup extends WWGroup {
#numOfWorkers = 0;
#numOfEncWorker = 0;
#muxWorker = undefined;
#encWorkers = [];
#channels = {};
/* Number of frames WWGroup can handle at a time */
#maxPayload = 0;
/* Precondition: 2 <= options.numOfWW */
constructor(name, options) {
super();
assert(typeof(name) == "string", "INVALID ARGUMENT");
// Setting options
if (options != undefined) {
if ('max' in options)
this.#maxPayload = options.max;
if ('numOfWW' in options) {
// Precondition not meet:
// At least two web workers is required, one encode worker
// and one mux worker.
if (options.numOfWW < 2) {
throw new Error("Number of worker is too small (<2)");
}
this.#numOfWorkers = options.numOfWW;
this.#numOfEncWorker = options.numOfWW - 1;
}
}
// H264EncWWGroup has two types of workers:
// 1.Encoder: There can be arbitary number of encoders, they are
// encode RGB frame received from caller into H264 frame.
// Once such encode process is done the generated H264 frame
// will hand to muxer.
// 2.Muxer: There is only one Muxer, it's responsibility is to write
// frames that received from Encoder into a digitial container
// in order.
// Create Muxer
this.#muxWorker = new WW("MUX",
new URL('./workers/muxWW.js', import.meta.url)
);
// Create Encoder
for (let i = 0; i < this.#numOfEncWorker; ++i) {
let worker = new WW("ENC_" + i,
new URL('./workers/encWW.js', import.meta.url)
);
this.#encWorkers.push(worker);
}
}
async start() {
// Start Muxer
await this.#muxWorker.start();
for (let i = 0; i < this.#numOfEncWorker; ++i) {
await this.#encWorkers[i].start();
}
// Connect Encoders to the Muxer
for (let i = 0; i < this.#numOfEncWorker; ++i) {
this.#encWorkers[i].connect(this.#muxWorker);
}
}
numOfWorker() {
return this.#numOfWorkers;
}
dispatch(rgbFrame) {
}
}
export function assert(expr, message) {
if (!expr) {
throw new Error(message || 'Assertion failed');
......@@ -11,5 +10,5 @@ export function NEED_TO_IMPLEMENT(message) {
}
export function OVERRIDE_IS_REQUIRE(message) {
throw new Error(message || "Need to override method");
throw new Error(message || "Method require an override");
}
const COMMANDS = Object.freeze({
INIT: 0,
INIT_DONE: 1,
INIT_FAIL: 2,
});
let init = false;
// Init
onmessage = e => {
if (init == false) {
postMessage(COMMANDS.INIT_DONE);
init = true;
} else {
// Echo
postMessage(e.data);
}
}
const COMMANDS = Object.freeze({
INIT: 0,
INIT_DONE: 1,
INIT_FAIL: 2,
});
let init = false;
// Init
onmessage = e => {
if (init == false) {
postMessage(COMMANDS.INIT_DONE);
init = true;
} else {
// Echo
postMessage(e.data);
}
}
import { OVERRIDE_IS_REQUIRE, assert } from './utils.js';
export class WWGroup {
dispatch() {
OVERRIDE_IS_REQUIRE();
}
isDone() {
OVERRIDE_IS_REQUIRE();
}
}
export class VideoEncGroup extends WWGroup {
#numOfInited = 0;
#muxWorker = undefined;
#encWorkers = [];
/* Number of frames WWGroup can handle at a time */
#maxPayload = 0;
/* Precondition: 2 <= numOfWW */
constructor(numOfWW, maxPayload) {
assert(typeof(numOfWW) == "number" &&
typeof(maxPayload) == "number",
"INVALID ARGUMENTs");
this.#maxPayload = maxPayload;
this.#muxWorker = new SharedWorker("./muxRole.js", "mux");
let numOfEncWW = numOfWW - 1;
for (let i = numOfEncWW; i < numOfEncWW; ++i) {
let encWW = new SharedWorker("./encodeRole.js", "enc_" + i);
encWorkers.push(encWW);
}
}
}
import { firstValueFrom } from 'rxjs';
import { WW } from '../src/WW.js';
describe("WWGroup Suite", () => {
// For Webpack 5
let url = new URL('../resources/worker.js', import.meta.url);
describe("WW Spec", () => {
it("Instantiation", async () => {
let url = new URL('../resources/worker.js', import.meta.url);
let ww = new WW('core', url);
await ww.waitReady();
let success = await ww.start();
expect(success).toBe(true);
expect(ww.isReady()).toBe(true);
});
it("EchoWorker", async () => {
let ww = new WW('echo', url);
await ww.start();
expect(ww.isReady()).toBe(true);
let count = 0;
// Echo test
await new Promise(r => {
let sub = ww.subscribe(val => {
expect(val).toBe(1);
count = count + 1;
if (count == 500) {
r();
sub.unsubscribe();
}
});
for (let i = 0; i < 500; i=i+1) {
ww.postMessage(1);
}
});
expect(count).toBe(500);
});
// Conntect two worker to do plus
// of an input number.
it("Worker Connect", async () => {
let srcUrl = new URL('../resources/workerSRC.js', import.meta.url);
let dstUrl = new URL('../resources/workerDST.js', import.meta.url);
let src = new WW('SRC', srcUrl);
let dst = new WW('DST', dstUrl);
src.connect(dst);
let digit = 0;
await new Promise(resolve => {
let sub = dst.subscribe(num => {
digit = num;
if (digit == 100) {
sub.unsubscribe();
resolve();
}
src.postMessage(digit);
});
src.postMessage(digit);
});
expect(digit).toBe(100);
// Assume that disconnect is
// correct.
src.disconnect(dst);
});
});
import { H264EncWWGroup } from '../src/encGroup.js';
describe("H264EncWWGroup Spec", () => {
it("Instantiation", async () => {
let wg = new H264EncWWGroup("h264enc", { numOfWW: 3 });
await wg.start();
expect(wg.numOfWorker()).toBe(3);
});
})
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment