Commit 2f83cc36 authored by Linshizhi's avatar Linshizhi

WW as Module Worker if browser support.

parent 86536ce1
...@@ -18,7 +18,8 @@ export class WW extends Observable { ...@@ -18,7 +18,8 @@ export class WW extends Observable {
this.#ident = name; this.#ident = name;
this.#ww = new Worker( this.#ww = new Worker(
new URL(path, import.meta.url) new URL(path, import.meta.url),
{ type: 'module' }
); );
} }
......
...@@ -22,244 +22,6 @@ function makeMessage(msgType, datas) { ...@@ -22,244 +22,6 @@ function makeMessage(msgType, datas) {
return {type:msgType, data: datas}; return {type:msgType, data: datas};
} }
class CpySchedule {
first = { pos: 0, size: 0 }
second = { pos: 0, size: 0 }
}
/* An abstraction that help to
* easily to transfer datas to target worker
* in real time manner */
export class Channel {
/* Size of memory area that help to
* maintain Channel. Types of meta info
* are shown below:
*
* 1.Write Position
* 2.Read Position
*
* Figure:
* --------------------------------------------------------------------------------------------
* | ReadPointer (4 bytes) | WritePointer (4 Bytes) | Private (4 Bytes) | Data Area (N bytes) |
* --------------------------------------------------------------------------------------------
* where N >= 2
*
* */
#rFieldPosLen = 4;
#wFieldPosLen = 4;
#priFieldLen = 4;
#numOfMetaField = 3
#fieldSize = 0;
#metaSize = 0;
#size = 0;
#totalSize = 0;
#WW = undefined;
#shMem = undefined;
#view = undefined;
#buffer = undefined;
#writePos = 0;
#endPos = 0;
// size's unit is byte
// bufferType parameter is mainly for testability.
constructor(size, bufferType = SharedArrayBuffer) {
assert(size >= 2, `Channel require its data area has at least 2 Bytes.`)
this.#WW = WW;
this.#size = size;
// Init shared memory
this.#metaSize = this.#rFieldPosLen + this.#wFieldPosLen + this.#priFieldLen;
this.#shMem = new bufferType(size + this.#metaSize);
this.#view = new DataView(this.#shMem);
this.#buffer = new Uint8Array(this.#shMem);
this.#writePos = this.#metaSize;
this.#size = size;
this.#totalSize = this.#metaSize + this.#size;
this.#endPos = this.#metaSize + this.#size;
// Init readPointer and writePointer to
// the first bytes of data area.
this.#view.setUint32(0, this.#metaSize);
this.#view.setUint32(4, this.#metaSize);
}
#readPos() {
return this.#view.getUint32(0);
}
readPriv() {
return [
this.#view.getUint8(8),
this.#view.getUint8(9),
this.#view.getUint8(10),
this.#view.getUint8(11),
];
}
writePriv(privData) {
if (!(privData instanceof Array) ||
privData.length > this.#priFieldLen) {
throw new Error("Invalid Private datas");
}
for (let i = 8; i < 12; ++i) {
this.#view.setUint8(i);
}
}
/* Semantic: Is able to write 'size' of datas
* into #shMem. */
#isAbleToWrite(size) {
return this.#remain() >= size;
}
getShMem() {
return this.#shMem;
}
#remain() {
let readPos = this.#readPos();
if (this.#writePos == readPos) {
return this.#size - 1;
} else if (this.#writePos > readPos) {
return this.#size - (this.#writePos - readPos) - 1;
} else {
return readPos - this.#writePos - 1;
}
}
/* Channel use an array buffer as
* a circular buffer, so some datas
* may unable to be done in one copy.
* Two times of copy is required to handle
* such kind of situation.
*
* Caution: Before making a copy schedule you must
* make sure there has enough spaces.*/
#cpySchedule(size) {
let firstCpySize = 0, secondCpySize = 0, spaceToTail = 0;
let schedule = new CpySchedule();
let readPos = this.#readPos();
if (this.#writePos >= readPos) {
spaceToTail = this.#endPos - this.#writePos;
firstCpySize = Math.min(size, spaceToTail);
secondCpySize = firstCpySize < size ? size - firstCpySize : 0;
secondCpySize = Math.min(secondCpySize, readPos - this.#metaSize);
schedule.first.pos = this.#writePos;
schedule.first.size = firstCpySize;
schedule.second.pos = secondCpySize > 0 ? this.#metaSize : 0;
schedule.second.size = secondCpySize;
} else {
schedule.first.pos = this.#writePos;
schedule.first.size = Math.min(readPos - this.#writePos - 1, size);
}
return schedule;
}
dataView() {
return this.#buffer;
}
isEmpty() {
return this.#writePos == this.#readPos();
}
// This method is for testing purposes.
readData(size) {
let readPos = this.#readPos(), readSize = 0,
firstReadSize = 0, secondReadSize = 0;
// Empty
if (this.#writePos == readPos) {
return new Uint8Array(0);
} else if (this.#writePos > readPos) {
readSize = Math.min(size, this.#writePos - readPos);
this.#readPosUpdate(readPos+readSize);
return this.#buffer.slice(readPos, readPos+readSize);
} else {
// Read two times
firstReadSize = Math.min(size, this.#totalSize-readPos);
secondReadSize = firstReadSize < size ? size - firstReadSize : 0;
secondReadSize = Math.min(secondReadSize, this.#writePos-this.#metaSize);
let readBuffer = new Uint8Array(firstReadSize+secondReadSize);
// First read
readBuffer.set(this.#buffer.slice(readPos, readPos+firstReadSize), 0);
if (secondReadSize > 0) {
readBuffer.set(this.#buffer.slice(this.#metaSize, this.#metaSize+secondReadSize), firstReadSize);
this.#readPosUpdate(this.#metaSize+secondReadSize);
} else {
let newPos = readPos+firstReadSize;
newPos = newPos == this.#totalSize ? this.#metaSize : newPos;
this.#readPosUpdate(newPos);
}
return readBuffer;
}
}
push(data /* Uint8Array */) {
let writePos = this.#writePos;
if (!this.#isAbleToWrite(data.byteLength)) {
return false;
}
/* Write to shared Memory */
// Get a copy schedule, more details please read
// comment of #cpySchedule.
let schedule = this.#cpySchedule(data.byteLength);
// Perfrom write schedule
let srcPos = 0, plan;
for (let key in schedule) {
plan = schedule[key];
if (plan.size == 0)
continue;
this.#buffer.set(
data.slice(srcPos, srcPos+plan.size), plan.pos)
srcPos += plan.size;
writePos = plan.pos+plan.size;
}
// Caution: 'Write Pointer' must be updated after
// all datas are writed but not before or
// at intermediate of some writes otherwise
// oppsite side may read invalid datas.
this.#writePosUpdate(writePos);
return true;
}
#writePosUpdate(pos) {
this.#writePos = pos;
this.#view.setUint32(4, pos);
}
#readPosUpdate(pos) {
this.#view.setUint32(0, pos);
}
}
async function connectPrepare(WW) { async function connectPrepare(WW) {
......
...@@ -7,6 +7,7 @@ const COMMANDS = Object.freeze({ ...@@ -7,6 +7,7 @@ const COMMANDS = Object.freeze({
INIT_FAIL: 2, INIT_FAIL: 2,
}); });
class ChannelReader { class ChannelReader {
#rFieldPosLen = 4; #rFieldPosLen = 4;
...@@ -58,9 +59,10 @@ class ChannelReader { ...@@ -58,9 +59,10 @@ class ChannelReader {
let writePos = this.#writePos(); let writePos = this.#writePos();
let readTo = 0, readBuffer = null; let readTo = 0, readBuffer = null;
if (this.#readPos == writePos) { if (this.#readPos == writePos) {
return new Uint8Array(0); return new Uint8Array(0);
} else if (this.readPos < writePos) { } else if (this.#readPos < writePos) {
readTo = this.#readPos + Math.min(size, writePos - this.#readPos); readTo = this.#readPos + Math.min(size, writePos - this.#readPos);
readBuffer = this.#buffer.slice(this.#readPos, readTo); readBuffer = this.#buffer.slice(this.#readPos, readTo);
this.#readPosUpdate(readTo); this.#readPosUpdate(readTo);
...@@ -73,17 +75,18 @@ class ChannelReader { ...@@ -73,17 +75,18 @@ class ChannelReader {
readBuffer = new Uint8Array(firstRSize+secondRSize); readBuffer = new Uint8Array(firstRSize+secondRSize);
// First read // First read
readBuffer.set(this.#buffer.slice(this.#readPos, this.#readPos+firstRSize), 0); readBuffer.set(this.#buffer.slice(
this.#readPos, this.#readPos+firstRSize), 0);
// Second Read // Second Read
if (secondRSize > 0) { if (secondRSize > 0) {
readBuffer.set( readBuffer.set(
this.#buffer.slice(this.metaSize, this.#metaSize+secondRSize), this.#buffer.slice(this.#metaSize, this.#metaSize+secondRSize),
firstRSize); firstRSize);
this.#readPosUpdate(this.#metaSize+secondRSize); this.#readPosUpdate(this.#metaSize+secondRSize);
} else { } else {
let newPos = this.#readPos+firstReadSize; let newPos = this.#readPos+firstRSize;
newPos = newPos == this.#buffer.byte.byteLength ? this.#metaSize : newPos; newPos = newPos == this.#buffer.byteLength ? this.#metaSize : newPos;
this.#readPosUpdate(newPos); this.#readPosUpdate(newPos);
} }
} }
...@@ -101,6 +104,7 @@ class ChannelReader { ...@@ -101,6 +104,7 @@ class ChannelReader {
} }
}; };
// Init // Init
onmessage = e => { onmessage = e => {
......
import { sleep } from '../src/utils.js'; import { sleep } from '../src/utils.js';
import { H264EncWWGroup, Channel, CpySchedule } from '../src/encGroup.js'; import { H264EncWWGroup } from '../src/encGroup.js';
import { Obervable, Observable } from 'rxjs'; import { Obervable, Observable } from 'rxjs';
import { Channel } from '../src/channel.js';
const areEqual = (first, second) => const areEqual = (first, second) =>
first.length === second.length && first.length === second.length &&
...@@ -127,19 +128,20 @@ describe("Channel Spec", () => { ...@@ -127,19 +128,20 @@ describe("Channel Spec", () => {
}); });
it("Communicate with WebWorker", async () => { it("Transfer 64 MB to WebWorker", async () => {
let url = new URL('./workers/channelWW.js', import.meta.url), let url = new URL('./workers/channelWW.js', import.meta.url),
ret = true; ret = true;
let channel = new Channel(Math.pow(2,10)); let channel = new Channel(Math.pow(2,20));
let worker = new Worker(url); let worker = new Worker(url, { type: 'module' });
let sended = 0; let sended = 0;
let dataToWrite = new Uint8Array( let dataToWrite = new Uint8Array(
[...Array(getRandomInt(Math.pow(2,10))).keys()]); [...Array(getRandomInt(Math.pow(2,10))).keys()]);
let cur = 0; let cur = 0;
let size = Math.pow(2, 11); let size = Math.pow(2, 26);
let rBuffer = new Uint8Array(Math.pow(2, 12)); let rBuffer = new Uint8Array(Math.pow(2, 27));
let sBuffer = new Uint8Array(Math.pow(2, 12)); let sBuffer = new Uint8Array(Math.pow(2, 27));
worker.postMessage(channel.getShMem()); worker.postMessage(channel.getShMem());
...@@ -157,7 +159,7 @@ describe("Channel Spec", () => { ...@@ -157,7 +159,7 @@ describe("Channel Spec", () => {
ret = channel.push(dataToWrite); ret = channel.push(dataToWrite);
if (ret == false) { if (ret == false) {
await sleep(500); await sleep(100);
continue; continue;
} else { } else {
sBuffer.set(dataToWrite, sended); sBuffer.set(dataToWrite, sended);
...@@ -176,12 +178,12 @@ describe("Channel Spec", () => { ...@@ -176,12 +178,12 @@ describe("Channel Spec", () => {
r(); r();
} }
}, 10); }, 10);
}); }, 1000000);
worker.terminate(); worker.terminate();
expect(areEqual(rBuffer, sBuffer)).toBe(true); expect(areEqual(rBuffer, sBuffer)).toBe(true);
}); }, 10000);
}); });
......
...@@ -14,7 +14,6 @@ class ChannelReader { ...@@ -14,7 +14,6 @@ class ChannelReader {
#view = null; #view = null;
#shMem = null; #shMem = null;
/* Buffer should follow the specification /* Buffer should follow the specification
* of Channel in src/encGroup.js */ * of Channel in src/encGroup.js */
constructor(shMem) { constructor(shMem) {
...@@ -116,10 +115,10 @@ onmessage = async e => { ...@@ -116,10 +115,10 @@ onmessage = async e => {
while (true) { while (true) {
// Read 1K block // Read 1K block
data = chnlReader.readData(rSize); let data = chnlReader.readData(rSize);
if (data.byteLength == 0) { if (data.byteLength == 0) {
await sleep(1000); await sleep(10);
continue; continue;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment