Commit 9d20a86f authored by Linshizhi's avatar Linshizhi

update

parent 5b72bcf8
......@@ -134,7 +134,6 @@ async function step() {
continue;
}
/* Write data into wasm */
muxBuffer = muxer._malloc(data.byteLength);
muxer.HEAP8.set(data, muxBuffer);
......
import { MESSAGE_TYPE, typeOfMsg } from './encGrooupMsg';
import { makeMsg, MESSAGE_TYPE, typeOfMsg } from './encGrooupMsg';
import { Observable, filter } from './rxjs';
......@@ -12,8 +12,10 @@ export class WWInitError extends Error {
export const WW_STATE = Object.freeze({
UNINITIALIZED : 0,
INITED : 4,
READY : 1,
RUNNING : 2,
FIN : 3,
});
......@@ -29,6 +31,7 @@ export class WW extends Observable {
#remoteInited = false;
#remoteBridged = false;
#done = false;
#monitorSub = null;
#state = WW_STATE.UNINITIALIZED;
......@@ -87,10 +90,10 @@ export class WW extends Observable {
await preconnect(this, targetWW);
let sub = this.pipe(filter(data => msgPredicate(data)))
.subscribe(data => { targetWW.postMessage(data); });
//let sub = this.pipe(filter(data => { msgPredicate(data);} ))
// .subscribe(data => { targetWW.postMessage(data); });
this.#connected[targetWW.ident] = sub;
//this.#connected[targetWW.ident] = sub;
}
async disconnect(targetWW, uninstall) {
......@@ -98,7 +101,7 @@ export class WW extends Observable {
await uninstall(targetWW);
if (targetWW.ident in this.#connected) {
this.#connected[targetWW.ident].unsubscribe();
//this.#connected[targetWW.ident].unsubscribe();
delete this.#connected[targetWW.ident];
}
}
......@@ -123,12 +126,14 @@ export class WW extends Observable {
switch (typeOfMsg(msg)) {
case MESSAGE_TYPE.INIT:
this.#remoteInited = true;
this.#state = WW_STATE.INITED;
break;
case MESSAGE_TYPE.BRIDGE:
this.#remoteBridged = true;
break;
case MESSAGE_TYPE.DONE:
this.#done = true;
this.#state == WW_STATE.FIN;
break;
}
......@@ -139,5 +144,4 @@ export class WW extends Observable {
});
}
}
......@@ -32,7 +32,7 @@ export class Channel {
#rFieldPosLen = 4;
#wFieldPosLen = 4;
#priFieldLen = 4;
#numOfMetaField = 3
#numOfMetaField = 3;
#fieldSize = 0;
#metaSize = 0;
......
......@@ -52,7 +52,7 @@ export class H264EncWWGroup extends WWGroup {
#terminated = false;
/* Precondition: 2 <= options.numOfWW */
constructor(name, options) {
constructor(name, numOfWW, options = {}) {
super();
......@@ -75,17 +75,8 @@ export class H264EncWWGroup extends WWGroup {
if ('bridgechnlsize' in options)
this.#bridgeChannelSize = options.bridgechnlsize;
if ('numOfWW' in options) {
// Precondition not meet:
// At least two web workers is required, one encode worker
// and one mux worker.
if (options.numOfWW < 2) {
throw new Error("Number of worker is too small (<2)");
}
this.#numOfWorkers = options.numOfWW;
this.#numOfEncWorker = options.numOfWW - 1;
}
this.#numOfWorkers = numOfWW;
this.#numOfEncWorker = this.#numOfWorkers - 1;
}
// H264EncWWGroup has two types of workers:
......@@ -158,11 +149,8 @@ export class H264EncWWGroup extends WWGroup {
}
if (this.#terminated) return;
console.log("Proc WW is " + this.#encWorkers[this.#curProcWW].ident());
// Null is treated as terminated frame.
if (rgbFrame == null) {
console.log("EOF");
for (let i = 0; i < this.#numOfEncWorker; ++i) {
this.#channels[i].setPriv(PRIV_FLAGS.EOF);
}
......@@ -171,21 +159,22 @@ export class H264EncWWGroup extends WWGroup {
return;
}
console.log("Ready to push...");
while (this.#channels[this.#curProcWW].push(rgbFrame) == false) {
console.log("Pushing...");
await sleep(300);
}
if (!this.#channels[this.#curProcWW].isSetPriv(PRIV_FLAGS.EXECUTING)) {
this.#encWorkers[this.#curProcWW].postMessage(
makeMsg(MESSAGE_TYPE.DATA, {}))
}
this.#curProcWW = (this.#curProcWW + 1) % this.#numOfEncWorker;
}
isDone() {
return this.#muxWorker.isDone();
}
async getResult() {
// Muxer is not done the job.
......@@ -201,11 +190,17 @@ export class H264EncWWGroup extends WWGroup {
makeMsg(MESSAGE_TYPE.DATA_REQUIRE, {}));
await waitCond(() => result != null, 500);
sub.unsusbscribe();
sub.unsubscribe();
return result;
}
terminate() {
this.#muxWorker.terminate();
for (let i = 0; i < this.#numOfEncWorker; ++i)
this.#encWorkers[i].terminate();
}
}
......@@ -241,18 +236,11 @@ function syncInitStrategy(init, ww) {
let inited = undefined;
let strategy = async () => {
let sub = ww.subscribe(msg => {
inited = typeOfMsg(msg) == MESSAGE_TYPE.INIT;
console.log("INITED: " + ww.ident());
sub.unsubscribe();
});
if (await init() === false) {
return false;
}
// Waiting 1 Second for Encoder/Muxer
let ret = await waitCond(() => inited === true, 5000);
let ret = await waitCond(() => ww.getState() == WW_STATE.INITED, 5000);
return ret;
};
......
......@@ -197,6 +197,7 @@ int writeToOFormat() {
}
streamWriteFrame(octx, pkt);
printf("Writed\n");
av_packet_free(&pkt);
......
import { sleep } from '../src/utils.js';
import { sleep, waitCond } from '../src/utils.js';
import { H264EncWWGroup, H264GRP_OPTIONS } from '../src/encGroup.js';
describe("H264EncWWGroup Spec", () => {
it("Instantiation", async () => {
let wg = new H264EncWWGroup("h264enc", { numOfWW: 2 });
let wg = new H264EncWWGroup("h264enc", 2);
await wg.start();
await sleep(1000);
......@@ -11,11 +11,10 @@ describe("H264EncWWGroup Spec", () => {
expect(wg.numOfWorker()).toBe(2);
});
fit("Encode by H264EncWWGroup Spec", async () => {
it("Encode by H264EncWWGroup Spec", async () => {
const RGBAFrameSize = 1920*1080*4;
let grp = new H264EncWWGroup("h264enc", {
numOfWW: 11,
let grp = new H264EncWWGroup("h264enc", 7, {
encchnlsize: RGBAFrameSize * 10,
bridgechnlsize: Math.pow(2, 25)
});
......@@ -28,11 +27,16 @@ describe("H264EncWWGroup Spec", () => {
await grp.dispatch(data);
}
console.log("Main Done");
// Terminated
await grp.dispatch(null);
await sleep(100000);
}, 300000)
// Waiting for EncGroup
await waitCond(() => grp.isDone());
let result = await grp.getResult();
console.log(result);
grp.terminate();
}, 3000000)
})
import { ParaEncoder, ENCODE_MODE } from "../src/paraEncode.js";
import { sleep } from "../src/utils.js";
let paraEnc;
beforeEach(() => {
paraEnc = new ParaEncoder(2, "H264", ENCODE_MODE.SAVE_MEMORY);
const RGBAFrameSize = 1920*1080*4;
beforeEach(async () => {
paraEnc = new ParaEncoder(11, {
codec: "H264",
grpcfg: {
encchnlsize: RGBAFrameSize * 10,
bridgechnlsize: Math.pow(2, 25),
}
});
await paraEnc.init();
});
describe("ParaEncoder", () => {
it("ParaEncoder Normal Init", () => {
expect(paraEnc.numOfWW()).toBe(2);
expect(paraEnc.codec()).toBe("H264");
expect(paraEnc.mode()).toBe(ENCODE_MODE.SAVE_MEMORY);
});
fit("Encode With ParaEncoder", async () => {
const data = new Uint8Array([...Array(RGBAFrameSize).keys()]);
it("ParaEncoder Invalid Init", () => {
try {
new ParaEncoder(2, 1, ENCODE_MODE.SAVE_MEMORY);
} catch (err) {}
});
for (let i = 0; i < 3000; ++i) {
await paraEnc.encode(data);
}
it("ParaEncoder Invalid encode mode", () => {
await paraEnc.encode(null);
let WRONG_MODE = 0;
try {
new ParaEncoder(1, "h264", WRONG_MODE);
} catch (err) {}
});
it("Encode With ParaEncoder", () => {
});
await sleep(3000000);
}, 300000);
});
......@@ -114,6 +114,7 @@ FLAGS_ENCODER=(
-s EXPORTED_FUNCTIONS="[_main,_malloc,_free]" # export main and proxy_main funcs
-s EXPORTED_RUNTIME_METHODS="[FS, cwrap, ccall, setValue, writeAsciiToMemory, getValue]" # export preamble funcs
-s INITIAL_MEMORY=268435456 # 64 KB * 1024 * 16 * 2047 = 2146435072 bytes ~= 2 GB, 268435456 =256M, 134,217,728 =128M
-s ASSERTIONS=1
--pre-js $WORKPATH/pre.js
--post-js $WORKPATH/post.js
$ENCODER_OPTIM_FLAGS
......@@ -138,6 +139,7 @@ FLAGS_MUXER=(
-s EXPORTED_FUNCTIONS="[_main,_malloc,_free]" # export main and proxy_main funcs
-s EXPORTED_RUNTIME_METHODS="[FS, cwrap, ccall, setValue, writeAsciiToMemory, getValue]" # export preamble funcs
-s INITIAL_MEMORY=536870912 # 64 KB * 1024 * 16 * 2047 = 2146435072 bytes ~= 2 GB, 268435456 =256M, 134,217,728 =128M
-s ASSERTIONS=1
--pre-js $WORKPATH/pre.js
--post-js $WORKPATH/post.js
$MUXER_OPTIM_FLAGS
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment