Commit 0bf24f43 authored by Linshizhi's avatar Linshizhi

Use classic Web worker.

parent 58e2992e
......@@ -18,8 +18,7 @@ export class WW extends Observable {
this.#ident = name;
this.#ww = new Worker(
new URL(path, import.meta.url),
{ type: 'module' }
new URL(path, import.meta.url)
);
}
......
import { assert } from './utils.js';
import { assert, sleep } from './utils.js';
import { WWGroup } from './WWGroup.js';
import { WW } from './WW.js';
import { Channel } from './channel.js';
......
import { sleep } from '/src/utils.js';
import * as MSG from '/src/encGrooupMsg.js';
import { Channel } from '/src/channel.js';
let encoder = null;
let isInited = false;
let isBridged = false;
let src = null;
let bridge = null;
// Parameters
// Number of bytes for each read
// from channel
let READ_SIZE = (1920*1080*4) * 10 ;
const RETRY_COUNT = 5;
const SLEEP_INTERVAL = 100;
// Load wasm encoder
self.importScripts('/src/mp4encoder.js');
createMP4Encoder().then(m => {
encoder = m;
});
// Init
onmessage = async e => {
let msg = e.data;
......@@ -25,27 +25,37 @@ onmessage = async e => {
async function main(msg) {
if (!MSG.isEncGrpMsg(msg)) {
if (!isEncGrpMsg(msg)) {
return;
}
switch (MSG.typeOfMsg(msg)) {
case MSG.MESSAGE_TYPE.INIT:
if (!isInited) init(msg);
switch (typeOfMsg(msg)) {
case MESSAGE_TYPE.INIT:
if (!isInited)
await init(msg);
break;
case MSG.MESSAGE_TYPE.DATA:
case MESSAGE_TYPE.DATA:
await steps();
break;
case MSG.MESSAGE_TYPE.DESTROY:
case MESSAGE_TYPE.DESTROY:
break;
case MSG.MESSAGE_TYPE.ERROR:
case MESSAGE_TYPE.ERROR:
break;
}
}
function init(msg) {
let info = MSG.getInfoFromMsg(msg);
async function init(msg) {
let info = getInfoFromMsg(msg);
src = new Channel(info.size, SharedArrayBuffer, info.shm);
// Wait encoder init
while (encoder == null)
await sleep(100);
let size = encoder._malloc(4);
let mem = encoder._encode(size);
console.log(encoder.getValue(size, 'i32'));
isInited = true;
}
......@@ -88,4 +98,402 @@ async function steps() {
async function RGBProcessing(frame) {
return false;
}
///////////////////////////////////////////////////////////////////////////////
// Definitions //
///////////////////////////////////////////////////////////////////////////////
class CpySchedule {
first = { pos: 0, size: 0 }
second = { pos: 0, size: 0 }
}
/* An abstraction that help to
* easily to transfer datas to target worker
* in real time manner */
class Channel {
/* Size of memory area that help to
* maintain Channel. Types of meta info
* are shown below:
*
* 1.Write Position
* 2.Read Position
*
* Figure:
* --------------------------------------------------------------------------------------------
* | ReadPointer (4 bytes) | WritePointer (4 Bytes) | Private (4 Bytes) | Data Area (N bytes) |
* --------------------------------------------------------------------------------------------
* where N >= 2
*
* */
#rFieldPosLen = 4;
#wFieldPosLen = 4;
#priFieldLen = 4;
#numOfMetaField = 3
#fieldSize = 0;
#metaSize = 0;
#size = 0;
#totalSize = 0;
#shMem = undefined;
#view = undefined;
#buffer = undefined;
#writePointerCache = 0;
#readPointerCache = 0;
#endPos = 0;
// size's unit is byte
// bufferType parameter is mainly for testability.
constructor(size, bufferType = SharedArrayBuffer, shMem = null) {
assert(size >= 2, `Channel require its data area has at least 2 Bytes.`)
this.#size = size;
// Init shared memory
this.#metaSize = this.#rFieldPosLen + this.#wFieldPosLen + this.#priFieldLen;
this.#shMem = shMem == null ? new bufferType(size + this.#metaSize) : shMem;
this.#view = new DataView(this.#shMem);
this.#buffer = new Uint8Array(this.#shMem);
if (shMem == null) {
this.#writePointerCache = this.#metaSize;
this.#readPointerCache = this.#metaSize;
// Init readPointer and writePointer to
// the first bytes of data area.
this.#view.setUint32(0, this.#writePointerCache);
this.#view.setUint32(4, this.#readPointerCache);
} else {
this.#writePointerCache = this.#getWritePointer();
this.#readPointerCache = this.#getReadPointer();
}
this.#size = size;
this.#totalSize = this.#metaSize + this.#size;
this.#endPos = this.#metaSize + this.#size;
}
#getReadPointer() {
return this.#view.getUint32(0);
}
#getWritePointer() {
return this.#view.getUint32(4);
}
metaSize() {
return this.#metaSize;
}
readPriv() {
return this.#view.getUint32(8);
}
writePriv(privData) {
try {
this.#view.setUint32(8, privData);
} catch (error) {
if (error instanceof RangeError)
return false;
}
return true;
}
setPriv(flag) {
let old = this.readPriv();
this.writePriv(flag | old);
}
unsetPriv(flag) {
let old = this.readPriv();
flag = flag ^ 0x11111111;
this.writePriv(flag & old);
}
/* Semantic: Is able to write 'size' of datas
* into #shMem. */
#isAbleToWrite(size) {
return this.#remain() >= size;
}
getShMem() {
return this.#shMem;
}
#remain() {
let readPos = this.#getReadPointer();
if (this.#writePointerCache == readPos) {
return this.#size - 1;
} else if (this.#writePointerCache > readPos) {
return this.#size - (this.#writePointerCache - readPos) - 1;
} else {
return readPos - this.#writePointerCache - 1;
}
}
/* Channel use an array buffer as
* a circular buffer, so some datas
* may unable to be done in one copy.
* Two times of copy is required to handle
* such kind of situation.
*
* Caution: Before making a copy schedule you must
* make sure there has enough spaces.*/
#cpySchedule(size) {
let firstCpySize = 0, secondCpySize = 0, spaceToTail = 0;
let schedule = new CpySchedule();
let readPos = this.#getReadPointer();
if (this.#writePointerCache >= readPos) {
spaceToTail = this.#endPos - this.#writePointerCache;
firstCpySize = Math.min(size, spaceToTail);
secondCpySize = firstCpySize < size ? size - firstCpySize : 0;
secondCpySize = Math.min(secondCpySize, readPos - this.#metaSize);
schedule.first.pos = this.#writePointerCache;
schedule.first.size = firstCpySize;
schedule.second.pos = secondCpySize > 0 ? this.#metaSize : 0;
schedule.second.size = secondCpySize;
} else {
schedule.first.pos = this.#writePointerCache;
schedule.first.size = Math.min(readPos - this.#writePointerCache - 1, size);
}
return schedule;
}
dataView() {
return this.#buffer;
}
isEmpty() {
return this.#getWritePointer() == this.#getReadPointer();
}
// This method is for testing purposes.
readData(size) {
let writePos = this.#getWritePointer();
let readTo = 0, readBuffer = null;
if (this.#readPointerCache == writePos) {
return new Uint8Array(0);
} else if (this.#readPointerCache < writePos) {
readTo = this.#readPointerCache + Math.min(size, writePos - this.#readPointerCache);
readBuffer = this.#buffer.slice(this.#readPointerCache, readTo);
this.#readPointerUpdate(readTo);
} else {
// To make sure
let firstRSize = Math.min(size, this.#buffer.byteLength-this.#readPointerCache);
let secondRSize = firstRSize < size ? size - firstRSize : 0;
secondRSize = Math.min(secondRSize, writePos-this.#metaSize);
readBuffer = new Uint8Array(firstRSize+secondRSize);
// First read
readBuffer.set(this.#buffer.slice(
this.#readPointerCache, this.#readPointerCache+firstRSize), 0);
// Second Read
if (secondRSize > 0) {
readBuffer.set(
this.#buffer.slice(this.#metaSize, this.#metaSize+secondRSize),
firstRSize);
this.#readPointerUpdate(this.#metaSize+secondRSize);
} else {
let newPos = this.#readPointerCache+firstRSize;
newPos = newPos == this.#buffer.byteLength ? this.#metaSize : newPos;
this.#readPointerUpdate(newPos);
}
}
return readBuffer;
}
push(data /* Uint8Array */) {
let writePos = this.#writePointerCache;
if (!this.#isAbleToWrite(data.byteLength)) {
return false;
}
/* Write to shared Memory */
// Get a copy schedule, more details please read
// comment of #cpySchedule.
let schedule = this.#cpySchedule(data.byteLength);
// Perfrom write schedule
let srcPos = 0, plan;
for (let key in schedule) {
plan = schedule[key];
if (plan.size == 0)
continue;
this.#buffer.set(
data.slice(srcPos, srcPos+plan.size), plan.pos)
srcPos += plan.size;
writePos = plan.pos+plan.size;
}
// Caution: 'Write Pointer' must be updated after
// all datas are writed but not before or
// at intermediate of some writes otherwise
// oppsite side may read invalid datas.
this.#writePointerUpdate(writePos);
return true;
}
#writePointerUpdate(pos) {
this.#writePointerCache = pos;
this.#view.setUint32(4, pos);
}
#readPointerUpdate(pos) {
this.#readPointerCache = pos;
this.#view.setUint32(0, pos);
}
}
///////////////////////////////////////////////////////////////////////////////
// Message Definitions //
///////////////////////////////////////////////////////////////////////////////
const MESSAGE_TYPE = Object.freeze({
ERROR : -1,
MSG_MIN : 0,
INIT : 0,
DESTROY : 1,
/* Once Muxer receive a 'CONNECT_PREPARE' message
* It will use information along with the message to
* change it's status to get ready to be connected by
* Encoder. */
CONNECT_PREPARE : 2,
/* Similar to 'CONNECT_PREPARE' */
DISCONNECT_PREPARE : 3,
/* Notify to oppsite that there is datas
* within shared memory */
DATA : 4,
MSG_MAX : 5,
});
function makeMsg(type, info_) {
assert(type >= MESSAGE_TYPE.MSG_MIN &&
type <= MESSAGE_TYPE.MSG_MAX);
return { type: type, info: info_ };
}
function makeInitMsg(shm, size) {
return makeMsg(MESSAGE_TYPE.INIT, {shm: shm, size:size})
}
function typeOfMsg(msg) {
if ('type' in msg) {
return msg.type;
}
return MESSAGE_TYPE.ERROR;
}
function getInfoFromMsg(msg) {
if ('info' in msg) {
return msg.info;
} else {
throw new Error("Not a encGrp message");
}
}
function isEncGrpMsg(msg) {
return msg instanceof Object &&
'type' in msg &&
'info' in msg &&
msg.type >= MESSAGE_TYPE.MSG_MIN &&
msg.type <= MESSAGE_TYPE.MSG_MAX;
}
function isMsgWithType(msg, type) {
if (type >= MESSAGE_TYPE.MSG_MIN &&
type <= MESSAGE_TYPE.MSG_MAX) {
return false;
}
if (!(msg instanceof Object)) {
return false;
}
if ('type' in msg) {
return msg['type'] == type;
} else {
return false;
}
}
function isInitMsg(msg) {
return isMsgWithType(msg, MESSAGE_TYPE.INIT);
}
function isDestroyMsg(msg) {
return isMsgWithType(msg, MESSAGE_TYPE.DESTROY);
}
function isConnectMsg(msg) {
return isMsgWithType(msg, MESSAGE_TYPE.CONNECT_PREPARE);
}
function isDisconnectMsg(msg) {
return isMsgWithType(msg, MESSAGE_TYPE.DISCONNECT_PREPARE);
}
function isDataMsg(msg) {
return isMsgWithType(msg, MESSAGE_TYPE.DATA);
}
///////////////////////////////////////////////////////////////////////////////
// Private Field of Channel //
///////////////////////////////////////////////////////////////////////////////
/*
* Priv fields
* ----------------------------------------------------------------------------------
* | NAN (1 Bye) | NAN (1 Byte) | NAN (1 Byte) | NAN (7 Bits) | Executing (1 Bit) |
* ----------------------------------------------------------------------------------
* */
let PRIV_FLAGS = Object.freeze({
EXECUTING : 0x00000001,
});
function assert(expr, message) {
if (!expr) {
throw new Error(message || 'Assertion failed');
}
}
async function sleep(ms) {
await new Promise(r => setTimeout(() => r(), ms));
}
import { sleep } from '../src/utils.js';
import { H264EncWWGroup } from '../src/encGroup.js';
import { Obervable, Observable } from 'rxjs';
......@@ -189,12 +188,12 @@ describe("Channel Spec", () => {
describe("H264EncWWGroup Spec", () => {
it("Instantiation", async () => {
let wg = new H264EncWWGroup("h264enc", { numOfWW: 3 });
let wg = new H264EncWWGroup("h264enc", { numOfWW: 2 });
await wg.start();
await sleep(1000);
expect(wg.numOfWorker()).toBe(3);
expect(wg.numOfWorker()).toBe(2);
});
......
......@@ -2,7 +2,7 @@ const path = require('path');
const webpack = require("webpack");
module.exports = {
entry: './tests/specs.js',
entry: './specs.js',
mode: 'development',
output: {
filename: 'spec_bundle.js',
......@@ -16,9 +16,6 @@ module.exports = {
exclude: /node_modules/,
use: ['babel-loader']
},
{
}
]
},
plugins: [
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment