Commit 6ff90b93 authored by Linshizhi's avatar Linshizhi

Add direct write to H264EncGroup.

parent 216d0dcf
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
import { assert, waitCond, sleep } from './utils.js'; import { assert, waitCond, sleep } from './utils.js';
import { WWGroup } from './WWGroup.js'; import { WWGroup } from './WWGroup.js';
import { WW, WWInitError, WW_STATE } from './WW.js'; import { WW, WWInitError, WW_STATE } from './WW.js';
import { Channel } from './channel.js'; import { Channel, ChnlSetupConflictError, ChnlStepConflictError } from './channel.js';
import { MESSAGE_TYPE, makeMsg, isDataMsg, makeInitMsg, typeOfMsg, PRIV_FLAGS } from './encGrooupMsg.js'; import { MESSAGE_TYPE, makeMsg, isDataMsg, makeInitMsg, typeOfMsg, PRIV_FLAGS } from './encGrooupMsg.js';
export class H264EncWWGroupInitError extends Error { export class H264EncWWGroupInitError extends Error {
...@@ -17,6 +17,7 @@ export const H264GRP_OPTIONS = Object.freeze({ ...@@ -17,6 +17,7 @@ export const H264GRP_OPTIONS = Object.freeze({
MUXER_CHNL_SIZE: "muxchnlsize", MUXER_CHNL_SIZE: "muxchnlsize",
ENC_CHNL_SIZE: "encchnlsize", ENC_CHNL_SIZE: "encchnlsize",
BRIDGE_CHNL_SIZE: "bridgechnlsize", BRIDGE_CHNL_SIZE: "bridgechnlsize",
BLOCK_SIZE: "blocksize",
}); });
/* WWGroup which target is to encode RGB frames into /* WWGroup which target is to encode RGB frames into
...@@ -51,6 +52,8 @@ export class H264EncWWGroup extends WWGroup { ...@@ -51,6 +52,8 @@ export class H264EncWWGroup extends WWGroup {
#begin = false; #begin = false;
#terminated = false; #terminated = false;
#blockSize = 0;
/* Precondition: 2 <= options.numOfWW */ /* Precondition: 2 <= options.numOfWW */
constructor(name, numOfWW, options = {}) { constructor(name, numOfWW, options = {}) {
...@@ -75,6 +78,9 @@ export class H264EncWWGroup extends WWGroup { ...@@ -75,6 +78,9 @@ export class H264EncWWGroup extends WWGroup {
if ('bridgechnlsize' in options) if ('bridgechnlsize' in options)
this.#bridgeChannelSize = options.bridgechnlsize; this.#bridgeChannelSize = options.bridgechnlsize;
if ('blocksize' in options)
this.#blockSize = options.blocksize;
this.#numOfWorkers = numOfWW; this.#numOfWorkers = numOfWW;
this.#numOfEncWorker = this.#numOfWorkers - 1; this.#numOfEncWorker = this.#numOfWorkers - 1;
} }
...@@ -89,11 +95,11 @@ export class H264EncWWGroup extends WWGroup { ...@@ -89,11 +95,11 @@ export class H264EncWWGroup extends WWGroup {
// in order. // in order.
// Create Muxer // Create Muxer
this.#muxWorker = new WW("MUX", '/paraencode/resources/workers/muxWW.js'); this.#muxWorker = new WW("MUX", '../resources/workers/muxWW.js');
// Create Encoder // Create Encoder
for (let i = 0; i < this.#numOfEncWorker; ++i) { for (let i = 0; i < this.#numOfEncWorker; ++i) {
let worker = new WW("ENC_" + i, '/paraencode/resources/workers/encWW.js'); let worker = new WW("ENC_" + i, '../resources/workers/encWW.js');
this.#encWorkers.push(worker); this.#encWorkers.push(worker);
} }
} }
...@@ -111,7 +117,7 @@ export class H264EncWWGroup extends WWGroup { ...@@ -111,7 +117,7 @@ export class H264EncWWGroup extends WWGroup {
await this.#encWorkers[i].start(async ww => { await this.#encWorkers[i].start(async ww => {
return await initStrategies(encInit, INIT_FLAGS.NONE, ww, return await initStrategies(encInit, INIT_FLAGS.NONE, ww,
// encInit parameters // encInit parameters
this.#channels, this.#encChannelSize, ww); this.#channels, this.#encChannelSize, ww, this.#blockSize);
}); });
} }
} catch (e) { } catch (e) {
...@@ -140,8 +146,7 @@ export class H264EncWWGroup extends WWGroup { ...@@ -140,8 +146,7 @@ export class H264EncWWGroup extends WWGroup {
return this.#numOfWorkers; return this.#numOfWorkers;
} }
async dispatch(rgbFrame) { async #writeInternal(frame, writeMethod, frameCheck = true) {
if (!this.#begin) { if (!this.#begin) {
this.#muxWorker.postMessage( this.#muxWorker.postMessage(
makeMsg(MESSAGE_TYPE.DATA, {})); makeMsg(MESSAGE_TYPE.DATA, {}));
...@@ -150,7 +155,7 @@ export class H264EncWWGroup extends WWGroup { ...@@ -150,7 +155,7 @@ export class H264EncWWGroup extends WWGroup {
if (this.#terminated) return; if (this.#terminated) return;
// Null is treated as terminated frame. // Null is treated as terminated frame.
if (rgbFrame == null) { if (frameCheck && frame == null) {
for (let i = 0; i < this.#numOfEncWorker; ++i) { for (let i = 0; i < this.#numOfEncWorker; ++i) {
this.#channels[i].setPriv(PRIV_FLAGS.EOF); this.#channels[i].setPriv(PRIV_FLAGS.EOF);
} }
...@@ -159,9 +164,7 @@ export class H264EncWWGroup extends WWGroup { ...@@ -159,9 +164,7 @@ export class H264EncWWGroup extends WWGroup {
return; return;
} }
while (this.#channels[this.#curProcWW].push(rgbFrame) == false) { await writeMethod(frame);
await sleep(50);
}
if (!this.#channels[this.#curProcWW].isSetPriv(PRIV_FLAGS.EXECUTING)) { if (!this.#channels[this.#curProcWW].isSetPriv(PRIV_FLAGS.EXECUTING)) {
this.#encWorkers[this.#curProcWW].postMessage( this.#encWorkers[this.#curProcWW].postMessage(
...@@ -171,6 +174,34 @@ export class H264EncWWGroup extends WWGroup { ...@@ -171,6 +174,34 @@ export class H264EncWWGroup extends WWGroup {
this.#curProcWW = (this.#curProcWW + 1) % this.#numOfEncWorker; this.#curProcWW = (this.#curProcWW + 1) % this.#numOfEncWorker;
} }
async dispatch(rgbFrame) {
await this.#writeInternal(rgbFrame, async () => {
while (this.#channels[this.#curProcWW].push(rgbFrame) == false) {
await sleep(50);
}
});
}
directWPrepare(size) {
try {
return this.#channels[this.#curProcWW].writeStep1(size);
} catch (e) {
if (e instanceof ChnlStepConflictError)
throw new Error("Previous direct write need to be confirmed");
}
}
async directWConfirm() {
await this.#writeInternal(undefined, async () => {
try {
this.#channels[this.#curProcWW].writeStep2();
} catch (e) {
if (e instanceof ChnlSetupConflictError)
throw new Error("Need to call directWPrepare() before directWConfirm");
}
}, false);
}
isDone() { isDone() {
return this.#muxWorker.isDone(); return this.#muxWorker.isDone();
} }
...@@ -247,11 +278,21 @@ function syncInitStrategy(init, ww) { ...@@ -247,11 +278,21 @@ function syncInitStrategy(init, ww) {
return strategy; return strategy;
} }
async function encInit(chnls, size, ww) { async function encInit(chnls, size, ww, blockSize) {
let channel = new Channel(size); let channel = new Channel(size);
if (blockSize > 0) {
channel.enableBlockMode(blockSize);
}
ww.postMessage( ww.postMessage(
makeInitMsg(channel.getShMem(), size, ww.ident())); makeMsg(MESSAGE_TYPE.INIT, {
shm: channel.getShMem(),
size: size,
ident: ww.ident(),
blocksize: blockSize,
}));
chnls.push(channel); chnls.push(channel);
return true; return true;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment