Commit 37a23efe authored by Linshizhi's avatar Linshizhi

Implement Encoder.

parent faa64552
...@@ -23,33 +23,8 @@ export class WW extends Observable { ...@@ -23,33 +23,8 @@ export class WW extends Observable {
); );
} }
async start() { async start(initWorks) {
this.#init = await initWorks(this);
if (this.#init) {
return this.#init;
}
await new Promise(resolve => {
if (this.#init || !this.#healthy) {
resolve();
}
let sub = this.subscribe(e => {
if (e == COMMANDS.INIT_DONE) {
this.#init = true;
} else if (e == COMMANDS.INIT_FAIL) {
this.#init = false;
this.#healthy = false;
}
sub.unsubscribe();
resolve();
});
this.#ww.postMessage("Initialize");
});
return this.#init; return this.#init;
} }
......
...@@ -10,7 +10,7 @@ import { OVERRIDE_IS_REQUIRE } from './utils.js'; ...@@ -10,7 +10,7 @@ import { OVERRIDE_IS_REQUIRE } from './utils.js';
* how to communicate with such WWs. */ * how to communicate with such WWs. */
export class WWGroup { export class WWGroup {
start() { async start() {
OVERRIDE_IS_REQUIRE(); OVERRIDE_IS_REQUIRE();
} }
...@@ -18,24 +18,28 @@ export class WWGroup { ...@@ -18,24 +18,28 @@ export class WWGroup {
OVERRIDE_IS_REQUIRE(); OVERRIDE_IS_REQUIRE();
} }
dispatch() { async dispatch() {
OVERRIDE_IS_REQUIRE();
}
async isReady() {
OVERRIDE_IS_REQUIRE(); OVERRIDE_IS_REQUIRE();
} }
/* If none of WWs is in working then the /* If none of WWs is in working then the
* WWGroup is idel otherwise not. */ * WWGroup is idel otherwise not. */
isIdle() { async isIdle() {
OVERRIDE_IS_REQUIRE(); OVERRIDE_IS_REQUIRE();
} }
/* If each of WWs reside in WWGroup is in working /* If each of WWs reside in WWGroup is in working
* then it's busy */ * then it's busy */
isBusy() { async isBusy() {
OVERRIDE_IS_REQUIRE(); OVERRIDE_IS_REQUIRE();
} }
/* WWGroup meet it's gole and all exits. */ /* WWGroup meet it's gole and all exits. */
isDone() { async isDone() {
OVERRIDE_IS_REQUIRE(); OVERRIDE_IS_REQUIRE();
} }
} }
......
...@@ -86,24 +86,34 @@ export class Channel { ...@@ -86,24 +86,34 @@ export class Channel {
return this.#view.getUint32(4); return this.#view.getUint32(4);
} }
metaSize() {
return this.#metaSize;
}
readPriv() { readPriv() {
return [ return this.#view.getUint32(8);
this.#view.getUint8(8),
this.#view.getUint8(9),
this.#view.getUint8(10),
this.#view.getUint8(11),
];
} }
writePriv(privData) { writePriv(privData) {
if (!(privData instanceof Array) || try {
privData.length > this.#priFieldLen) { this.#view.setUint32(8, privData);
throw new Error("Invalid Private datas"); } catch (error) {
if (error instanceof RangeError)
return false;
} }
for (let i = 8; i < 12; ++i) { return true;
this.#view.setUint8(i); }
}
setPriv(flag) {
let old = this.readPriv();
this.writePriv(flag | old);
}
unsetPriv(flag) {
let old = this.readPriv();
flag = flag ^ 0x11111111;
this.writePriv(flag & old);
} }
/* Semantic: Is able to write 'size' of datas /* Semantic: Is able to write 'size' of datas
...@@ -164,7 +174,7 @@ export class Channel { ...@@ -164,7 +174,7 @@ export class Channel {
} }
isEmpty() { isEmpty() {
return this.#writePointerCache == this.#readPointerCache; return this.#getWritePointer() == this.#getReadPointer();
} }
// This method is for testing purposes. // This method is for testing purposes.
...@@ -179,7 +189,7 @@ export class Channel { ...@@ -179,7 +189,7 @@ export class Channel {
readBuffer = this.#buffer.slice(this.#readPointerCache, readTo); readBuffer = this.#buffer.slice(this.#readPointerCache, readTo);
this.#readPointerUpdate(readTo); this.#readPointerUpdate(readTo);
} else { } else {
// Read two times // To make sure
let firstRSize = Math.min(size, this.#buffer.byteLength-this.#readPointerCache); let firstRSize = Math.min(size, this.#buffer.byteLength-this.#readPointerCache);
let secondRSize = firstRSize < size ? size - firstRSize : 0; let secondRSize = firstRSize < size ? size - firstRSize : 0;
secondRSize = Math.min(secondRSize, writePos-this.#metaSize); secondRSize = Math.min(secondRSize, writePos-this.#metaSize);
......
...@@ -2,29 +2,29 @@ ...@@ -2,29 +2,29 @@
import { assert } from './utils.js'; import { assert } from './utils.js';
import { WWGroup } from './WWGroup.js'; import { WWGroup } from './WWGroup.js';
import { WW } from './WW.js'; import { WW } from './WW.js';
import { Channel } from './channel.js';
import { MESSAGE_TYPE, makeMsg, isDataMsg, makeInitMsg } from './encGrooupMsg.js';
/////////////////////////////////////////////////////////////////////////////// async function muxInit(chnls, ww) {
// Message Definitions // let channel = new Channel(Math.pow(2, 20));
/////////////////////////////////////////////////////////////////////////////// ww.postMessage(
makeInitMsg(channel.getShMem(), Math.pow(2, 20)));
const MESSAGE_TYPE = Object.freeze({ chnls[ww.ident()] = channel;
/* Once Muxer receive a 'CONNECT_PREPARE' message
* It will use information along with the message to
* change it's status to get ready to be connected by
* Encoder. */
CONNECT_PREPARE : 0,
/* Similar to 'CONNECT_PREPARE' */
DISCONNECT_PREPARE : 1,
DATA : 2,
});
function makeMessage(msgType, datas) {
return {type:msgType, data: datas};
}
return true;
}
async function connectPrepare(WW) { async function encInit(chnls, ww) {
return await muxInit(chnls, ww);
}
async function encMuxBridge(bridge, enc, mux) {
let channel = new Channel(Math.pow(2, 20));
enc.postMessage(
makeMsg(MESSAGE_TYPE.CONNECT_PREPARE, channel.getShMem()));
mux.postMessage(
makeMsg(MESSAGE_TYPE.CONNECT_PREPARE, channel.getShMem()));
bridge[enc.ident()] = channel;
} }
...@@ -38,10 +38,19 @@ export class H264EncWWGroup extends WWGroup { ...@@ -38,10 +38,19 @@ export class H264EncWWGroup extends WWGroup {
#encWorkers = []; #encWorkers = [];
#channels = {}; #channels = {};
#bridges = {};
/* Options */
/* Number of frames WWGroup can handle at a time */ /* Number of frames WWGroup can handle at a time */
#maxPayload = 0; #maxPayload = 0;
/* Muxer Channel Size */
#muxChannelSize = Math.pow(2, 20);
/* Encoder Channel Size */
#encChannelSize = Math.pow(2, 20);
/* Precondition: 2 <= options.numOfWW */ /* Precondition: 2 <= options.numOfWW */
constructor(name, options) { constructor(name, options) {
...@@ -54,8 +63,11 @@ export class H264EncWWGroup extends WWGroup { ...@@ -54,8 +63,11 @@ export class H264EncWWGroup extends WWGroup {
if ('max' in options) if ('max' in options)
this.#maxPayload = options.max; this.#maxPayload = options.max;
if ('muxchnlsize' in options)
this.#muxChannelSize = options.muxchnlsize;
if ('encchnlsize' in options)
this.#encChannelSize = options.encChanlsize;
if ('numOfWW' in options) { if ('numOfWW' in options) {
// Precondition not meet: // Precondition not meet:
// At least two web workers is required, one encode worker // At least two web workers is required, one encode worker
// and one mux worker. // and one mux worker.
...@@ -93,20 +105,23 @@ export class H264EncWWGroup extends WWGroup { ...@@ -93,20 +105,23 @@ export class H264EncWWGroup extends WWGroup {
async start() { async start() {
// Start Muxer // Start Muxer
await this.#muxWorker.start(); await this.#muxWorker.start(async ww => {
return await muxInit(this.#channels, ww);
});
for (let i = 0; i < this.#numOfEncWorker; ++i) { for (let i = 0; i < this.#numOfEncWorker; ++i) {
await this.#encWorkers[i].start(); await this.#encWorkers[i].start(async ww => {
return await encInit(this.#channels, ww);
});
} }
// Connect Encoders to the Muxer // Connect Encoders to the Muxer
for (let i = 0; i < this.#numOfEncWorker; ++i) { for (let i = 0; i < this.#numOfEncWorker; ++i) {
this.#encWorkers[i].connect( let enc = this.#encWorkers[i];
enc.connect(
this.#muxWorker, this.#muxWorker,
connectPrepare, async ww => { encMuxBridge(this.#bridges, ww, this.#muxWorker) },
async msg => { async msg => { return isDataMsg(msg); });
return msg.type == MESSAGE_TYPE.DATA
});
} }
} }
...@@ -114,7 +129,7 @@ export class H264EncWWGroup extends WWGroup { ...@@ -114,7 +129,7 @@ export class H264EncWWGroup extends WWGroup {
return this.#numOfWorkers; return this.#numOfWorkers;
} }
dispatch(rgbFrame) { async dispatch(rgbFrame) {
} }
......
let init = false; import { sleep } from '../src/utils.js';
const COMMANDS = Object.freeze({ import * as MSG from '../src/encGrooupMsg.js';
INIT: 0, import { Channel } from '../src/channel.js';
INIT_DONE: 1,
INIT_FAIL: 2,
});
let isInited = false;
let isBridged = false;
let src = null;
let bridge = null;
class ChannelReader { // Parameters
#rFieldPosLen = 4; // Number of bytes for each read
#wFieldPosLen = 4; // from channel
#privFieldLen = 4; let READ_SIZE = (1920*1080*4) * 10 ;
const RETRY_COUNT = 5;
#metaSize = this.#rFieldPosLen + const SLEEP_INTERVAL = 100;
this.#wFieldPosLen +
this.#privFieldLen;
#readPos = 0;
#buffer = null;
#view = null;
#shMem = null;
// Init
onmessage = async e => {
let msg = e.data;
await main(msg);
}
/* Buffer should follow the specification async function main(msg) {
* of Channel in src/encGroup.js */
constructor(shMem) {
this.#shMem = shMem;
this.#buffer = new Uint8Array(this.#shMem);
this.#view = new DataView(this.#shMem);
this.#readPos = this.#view.getUint32(0); if (!MSG.isEncGrpMsg(msg)) {
return;
} }
readPriv() { switch (MSG.typeOfMsg(msg)) {
return [ case MSG.MESSAGE_TYPE.INIT:
this.#view.getUint8(8), init(msg);
this.#view.getUint8(9), break;
this.#view.getUint8(10), case MSG.MESSAGE_TYPE.DATA:
this.#view.getUint8(11), await steps();
]; break;
case MSG.MESSAGE_TYPE.DESTROY:
break;
case MSG.MESSAGE_TYPE.ERROR:
break;
} }
}
writePriv(privData) { function init(msg) {
if (!(privData instanceof Array) || let info = MSG.getInfoFromMsg(msg);
privData.length > this.#privFieldLen) { src = new Channel(info.size, SharedArrayBuffer, info.shm);
throw new Error("Invalid Private datas"); isInited = true;
} }
for (let i = 8; i < 12; ++i) { // Return value:
this.#view.setUint8(i); // True : Success
} // False : Fail
async function step() {
// Read RGB Frame from Channel.
let data = src.readData(READ_SIZE);
} /* RGB Frame Processing */
return await RGBProcessing(data);
}
readData(size) { async function steps() {
let writePos = this.#writePos();
let readTo = 0, readBuffer = null;
if (this.#readPos == writePos) {
return new Uint8Array(0);
} else if (this.#readPos < writePos) {
readTo = this.#readPos + Math.min(size, writePos - this.#readPos);
readBuffer = this.#buffer.slice(this.#readPos, readTo);
this.#readPosUpdate(readTo);
} else {
// Read two times
let firstRSize = Math.min(size, this.#buffer.byteLength-this.#readPos);
let secondRSize = firstRSize < size ? size - firstRSize : 0;
secondRSize = Math.min(secondRSize, writePos-this.#metaSize);
readBuffer = new Uint8Array(firstRSize+secondRSize);
// First read
readBuffer.set(this.#buffer.slice(
this.#readPos, this.#readPos+firstRSize), 0);
// Second Read
if (secondRSize > 0) {
readBuffer.set(
this.#buffer.slice(this.#metaSize, this.#metaSize+secondRSize),
firstRSize);
this.#readPosUpdate(this.#metaSize+secondRSize);
} else {
let newPos = this.#readPos+firstRSize;
newPos = newPos == this.#buffer.byteLength ? this.#metaSize : newPos;
this.#readPosUpdate(newPos);
}
}
return readBuffer; let retryCount = RETRY_COUNT;
}
#writePos() { assert(retryCount > 0);
return this.#view.getUint32(4);
}
#readPosUpdate(pos) { src.setPriv(MSG.PRIV_FLAGS.EXECUTING);
this.#readPos = pos;
this.#view.setUint32(0, pos);
}
};
while (true) {
if (await step() === false && retryCount >= 0) {
// Init if (retryCount == 0)
onmessage = e => { break;
if (init == false) { await sleep(SLEEP_INTERVAL);
postMessage(COMMANDS.INIT_DONE); --retryCount;
init = true; }
} else {
// Echo
postMessage(e.data);
} }
src.unsetPriv(MSG.PRIV_FLAGS.EXECUTING);
}
///////////////////////////////////////////////////////////////////////////////
// RGB Processing Definitions //
///////////////////////////////////////////////////////////////////////////////
async function RGBProcessing(frame) {
} }
const COMMANDS = Object.freeze({
INIT: 0,
INIT_DONE: 1,
INIT_FAIL: 2,
});
let init = false;
// Init // Init
onmessage = e => { onmessage = e => {
if (init == false) {
postMessage(COMMANDS.INIT_DONE);
init = true;
} else {
// Echo
postMessage(e.data);
}
} }
...@@ -7,7 +7,8 @@ describe("WW Spec", () => { ...@@ -7,7 +7,8 @@ describe("WW Spec", () => {
it("Instantiation", async () => { it("Instantiation", async () => {
let ww = new WW('core', url); let ww = new WW('core', url);
let success = await ww.start(); let success = await ww.start(
async w => { return true });
expect(success).toBe(true); expect(success).toBe(true);
expect(ww.isReady()).toBe(true); expect(ww.isReady()).toBe(true);
...@@ -16,7 +17,7 @@ describe("WW Spec", () => { ...@@ -16,7 +17,7 @@ describe("WW Spec", () => {
it("EchoWorker", async () => { it("EchoWorker", async () => {
let ww = new WW('echo', url); let ww = new WW('echo', url);
await ww.start(); await ww.start(async w => { return true });
expect(ww.isReady()).toBe(true); expect(ww.isReady()).toBe(true);
let count = 0; let count = 0;
......
...@@ -82,7 +82,7 @@ describe("Channel Spec", () => { ...@@ -82,7 +82,7 @@ describe("Channel Spec", () => {
.withContext("Data Mismatch") .withContext("Data Mismatch")
.toBe(true); .toBe(true);
expect(channel.isEmpty()).toBe(true); expect(channel.isEmpty()).toBe(true);
}) });
it("32 KB Datas Write and Read", () => { it("32 KB Datas Write and Read", () => {
let ret = false; let ret = false;
...@@ -131,7 +131,7 @@ describe("Channel Spec", () => { ...@@ -131,7 +131,7 @@ describe("Channel Spec", () => {
it("Transfer 64 MB to WebWorker", async () => { it("Transfer 64 MB to WebWorker", async () => {
let url = new URL('./workers/channelWW.js', import.meta.url), let url = new URL('./workers/channelWW.js', import.meta.url),
ret = true; ret = true;
let channel = new Channel(Math.pow(2,20)); let channel = new Channel(Math.pow(2,16));
let worker = new Worker(url, { type: 'module' }); let worker = new Worker(url, { type: 'module' });
let sended = 0; let sended = 0;
...@@ -139,9 +139,9 @@ describe("Channel Spec", () => { ...@@ -139,9 +139,9 @@ describe("Channel Spec", () => {
[...Array(getRandomInt(Math.pow(2,10))).keys()]); [...Array(getRandomInt(Math.pow(2,10))).keys()]);
let cur = 0; let cur = 0;
let size = Math.pow(2, 26); let size = Math.pow(2, 16);
let rBuffer = new Uint8Array(Math.pow(2, 27)); let rBuffer = new Uint8Array(Math.pow(2, 17));
let sBuffer = new Uint8Array(Math.pow(2, 27)); let sBuffer = new Uint8Array(Math.pow(2, 17));
worker.postMessage(channel.getShMem()); worker.postMessage(channel.getShMem());
...@@ -191,7 +191,11 @@ describe("H264EncWWGroup Spec", () => { ...@@ -191,7 +191,11 @@ describe("H264EncWWGroup Spec", () => {
it("Instantiation", async () => { it("Instantiation", async () => {
let wg = new H264EncWWGroup("h264enc", { numOfWW: 3 }); let wg = new H264EncWWGroup("h264enc", { numOfWW: 3 });
await wg.start(); await wg.start();
await sleep(1000);
expect(wg.numOfWorker()).toBe(3); expect(wg.numOfWorker()).toBe(3);
}); });
}) })
import { Channel } from '../src/channel.js'; import { Channel } from '../src/channel.js';
async function sleep(ms) { async function sleep(ms) {
await new Promise(r => setTimeout(() => r(), ms)); await new Promise(r => setTimeout(() => r(), ms));
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment