Commit 778c87a3 authored by Linshizhi's avatar Linshizhi

update

parent fb0b05d0
import { Observable } from 'rxjs'; import { Observable, filter } from 'rxjs';
import { COMMANDS } from './WWOpts.js'; import { COMMANDS } from './WWOpts.js';
export class WW extends Observable { export class WW extends Observable {
...@@ -72,24 +72,24 @@ export class WW extends Observable { ...@@ -72,24 +72,24 @@ export class WW extends Observable {
* 2.No datas from targetWW will be streamed to current WW. * 2.No datas from targetWW will be streamed to current WW.
* 3.If a WW is be connected already then the connection * 3.If a WW is be connected already then the connection
* between them is remain effective. */ * between them is remain effective. */
connect(targetWW, install, predict) { async connect(targetWW, preconnect, msgPredicate) {
if (targetWW.ident in this.#connected) { if (targetWW.ident in this.#connected) {
// Already connected no effect // Already connected no effect
return; return;
} }
install(targetWW); await preconnect(targetWW);
let sub = this.pipe(filter(data => predict(data))) let sub = this.pipe(filter(data => msgPredicate(data)))
.subscribe(data => { targetWW.postMessage(data); }); .subscribe(data => { targetWW.postMessage(data); });
this.#connected[targetWW.ident] = sub; this.#connected[targetWW.ident] = sub;
} }
disconnect(targetWW, uninstall) { async disconnect(targetWW, uninstall) {
uninstall(targetWW); await uninstall(targetWW);
if (targetWW.ident in this.#connected) { if (targetWW.ident in this.#connected) {
this.#connected[targetWW.ident].unsubscribe(); this.#connected[targetWW.ident].unsubscribe();
......
...@@ -3,12 +3,29 @@ import { assert } from './utils.js'; ...@@ -3,12 +3,29 @@ import { assert } from './utils.js';
import { WWGroup } from './WWGroup.js'; import { WWGroup } from './WWGroup.js';
import { WW } from './WW.js'; import { WW } from './WW.js';
///////////////////////////////////////////////////////////////////////////////
// Message Definitions //
///////////////////////////////////////////////////////////////////////////////
const MESSAGE_TYPE = Object.freeze({
/* Once Muxer receive a 'CONNECT_PREPARE' message
* It will use information along with the message to
* change it's status to get ready to be connected by
* Encoder. */
CONNECT_PREPARE : 0,
/* Similar to 'CONNECT_PREPARE' */
DISCONNECT_PREPARE : 1,
DATA : 2,
});
function makeMessage(msgType, datas) {
return {type:msgType, data: datas};
}
/* An abstraction that help to /* An abstraction that help to
* easily to transfer datas to target worker * easily to transfer datas to target worker
* in real time manner */ * in real time manner */
class Channel { export class Channel {
/* | ReadPointer (4 bytes) | WritePointer (4 Bytes) | Data Area (1-n bytes) */ /* | ReadPointer (4 bytes) | WritePointer (4 Bytes) | Data Area (1-n bytes) */
...@@ -22,65 +39,186 @@ class Channel { ...@@ -22,65 +39,186 @@ class Channel {
#fieldSize = 0; #fieldSize = 0;
#metaSize = 0; #metaSize = 0;
#size = 0;
#totalSize = 0;
#WW = undefined; #WW = undefined;
#size = 0;
#shMem = undefined; #shMem = undefined;
#view = undefined; #view = undefined;
#buffer = undefined;
#writePos = 0; #writePos = 0;
#endPos = 0;
// size's unit is byte // size's unit is byte
constructor(WW, size) { // bufferType parameter is mainly for testability.
constructor(WW, size, bufferType = SharedArrayBuffer) {
this.#WW = WW; this.#WW = WW;
this.#size = size; this.#size = size;
// 4 Bytes // 4 Bytes
this.#fieldSize = 4; this.#fieldSize = 4;
this.#metaSize = this.#numOfMetaField * this.#fieldSize; this.#metaSize = this.#numOfMetaField * this.#fieldSize;
this.#shMem = new SharedArrayBuffer(size + this.#metaSize); this.#shMem = new bufferType(size + this.#metaSize);
this.#view = DataView(this.#shMem); this.#view = new DataView(this.#shMem);
this.#buffer = new Uint8Array(this.#shMem);
this.#writePos = this.#metaSize; this.#writePos = this.#metaSize;
this.#size = size; this.#size = size;
this.#totalSize = this.#metaSize + this.#size;
this.#endPos = this.#metaSize + this.#size;
this.#view.setUint32(0, this.#metaSize);
this.#view.setUint32(4, this.#metaSize);
} }
#readPos() { #readPos() {
return this.#view.getUint32(0); return this.#view.getUint32(0);
} }
/* Semantic: Is able to write 'size' of datas
* into #shMem. */
#isAbleToWrite(size) { #isAbleToWrite(size) {
if (this.#readPos() == writePos) { return this.#remain() >= size;
/* Empty */
return true;
} else {
return this.#remain() > size;
}
} }
#remain() { #remain() {
let readPos = this.#readPos(); let readPos = this.#readPos();
if (this.#writePos == readPos) { if (this.#writePos == readPos) {
return this.#size; return this.#size - 1;
} else if (this.#writePos > readPos) { } else if (this.#writePos > readPos) {
return size - (this.#writePos - readPos) - 1; return this.#size - (this.#writePos - readPos) - 1;
} else { } else {
return readPos - this.#writePos - 1; return readPos - this.#writePos - 1;
} }
} }
push(datas /* Uint8Array */) { /* Channel use an array buffer as
if (!this.#isAbleToWrite(datas.byteLength)) { * a circular buffer, so some datas
* may unable to be done in one copy.
* Two times of copy is required to handle
* such kind of situation.
*
* Caution: Before making a copy schedule you must
* make sure there has enough spaces.*/
#cpySchedule(size) {
let firstCpySize = 0, secondCpySize = 0, spaceToTail = 0;
let schedule;
if (this.#writePos >= this.#readPos()) {
spaceToTail = this.#endPos - this.#writePos;
firstCpySize = Math.min(size, spaceToTail);
secondCpySize = firstCpySize < size ? size - firstCpySize : 0;
schedule = {
first : { pos: this.#writePos, size: firstCpySize },
second: { pos: secondCpySize > 0 ? this.#metaSize : 0, size: secondCpySize }
};
} else {
schedule = { first : { pos: this.#writePos, size: firstCpySize },
second: { pos: 0, size: 0 } };
}
return schedule;
}
dataView() {
return this.#buffer;
}
isEmpty() {
return this.#writePos == this.#readPos();
}
// This method is for testing purposes.
readData(size) {
let readPos = this.#readPos(), readSize = 0,
firstReadSize = 0, secondReadSize = 0;
// Empty
if (this.#writePos == readPos) {
return new Uint8Array(0);
} else if (this.#writePos > readPos) {
readSize = Math.min(size, this.#writePos - readPos);
this.#readPosUpdate(readPos+readSize);
return this.#buffer.slice(readPos, readPos+readSize);
} else {
// Read two times
firstReadSize = Math.min(size, this.#totalSize-readPos);
secondReadSize = firstReadSize < size ? size - firstReadSize : 0;
secondReadSize = Math.min(secondReadSize, this.#writePos-this.#metaSize);
let readBuffer = new Uint8Array(firstReadSize+secondReadSize);
// First read
readBuffer.set(this.#buffer.slice(readPos, readPos+firstReadSize), 0);
if (secondReadSize > 0) {
readBuffer.set(this.#buffer.slice(this.#metaSize, this.#metaSize+secondReadSize), firstReadSize);
this.#readPosUpdate(this.#metaSize+secondReadSize);
} else {
let newPos = readPos+firstReadSize;
newPos = newPos == this.#totalSize ? this.#metaSize : newPos;
this.#readPosUpdate(newPos);
}
return readBuffer;
}
}
push(data /* Uint8Array */) {
let writePos = this.#writePos;
if (!this.#isAbleToWrite(data.byteLength)) {
return false; return false;
} }
/* Write to shared Memory */ /* Write to shared Memory */
// Get a copy schedule, more details please read
// comment of #cpySchedule.
let schedule = this.#cpySchedule(data.byteLength);
// Perfrom write schedule
let srcPos = 0;
for (let key in schedule) {
let plan = schedule[key];
if (plan.size == 0)
continue;
let d_ = data.slice(srcPos, srcPos+plan.size);
this.#buffer.set(d_, plan.pos)
srcPos = srcPos + plan.size;
writePos = plan.pos+plan.size;
}
// Caution: 'Write Pointer' must be updated after
// all datas are writed but not before or
// at intermediate of some writes otherwise
// oppsite side may read invalid datas.
this.#writePosUpdate(writePos);
return true;
}
#writePosUpdate(pos) {
this.#writePos = pos;
this.#view.setUint32(4, pos);
}
#readPosUpdate(pos) {
this.#view.setUint32(0, pos);
} }
} }
async function connectPrepare(WW) {
}
/* WWGroup which target is to encode RGB frames into /* WWGroup which target is to encode RGB frames into
* MP4 video */ * MP4 video */
export class H264EncWWGroup extends WWGroup { export class H264EncWWGroup extends WWGroup {
...@@ -156,10 +294,10 @@ export class H264EncWWGroup extends WWGroup { ...@@ -156,10 +294,10 @@ export class H264EncWWGroup extends WWGroup {
for (let i = 0; i < this.#numOfEncWorker; ++i) { for (let i = 0; i < this.#numOfEncWorker; ++i) {
this.#encWorkers[i].connect( this.#encWorkers[i].connect(
this.#muxWorker, this.#muxWorker,
// Install connectPrepare,
w => { }, async msg => {
// Data filter return msg.type == MESSAGE_TYPE.DATA
data => { } ); });
} }
} }
...@@ -170,4 +308,5 @@ export class H264EncWWGroup extends WWGroup { ...@@ -170,4 +308,5 @@ export class H264EncWWGroup extends WWGroup {
dispatch(rgbFrame) { dispatch(rgbFrame) {
} }
} }
...@@ -52,7 +52,9 @@ describe("WW Spec", () => { ...@@ -52,7 +52,9 @@ describe("WW Spec", () => {
let src = new WW('SRC', srcUrl); let src = new WW('SRC', srcUrl);
let dst = new WW('DST', dstUrl); let dst = new WW('DST', dstUrl);
src.connect(dst, ()=>{}, (data)=>{ typeof(data) == 'number'; }); src.connect(dst, async ()=>{}, (data)=>{
return typeof(data) == 'number';
});
let digit = 0; let digit = 0;
...@@ -64,6 +66,8 @@ describe("WW Spec", () => { ...@@ -64,6 +66,8 @@ describe("WW Spec", () => {
resolve(); resolve();
} }
src.postMessage(digit); src.postMessage(digit);
// Should be filtered out
src.postMessage('abc');
}); });
src.postMessage(digit); src.postMessage(digit);
...@@ -73,7 +77,7 @@ describe("WW Spec", () => { ...@@ -73,7 +77,7 @@ describe("WW Spec", () => {
// Assume that disconnect is // Assume that disconnect is
// correct. // correct.
src.disconnect(dst); src.disconnect(dst, async ()=>{});
}); });
}); });
import { H264EncWWGroup } from '../src/encGroup.js'; import { WW } from '../src/WW.js';
import { H264EncWWGroup, Channel } from '../src/encGroup.js';
const areEqual = (first, second) =>
first.length === second.length &&
first.every((value, index) => value === second[index]);
function getRandomInt(max) {
return Math.floor(Math.random() * max);
}
describe("Channel Spec", () => {
it("Write datas bigger than channel", () => {
let channel = new Channel(WW, 10, ArrayBuffer);
// A buffer with length 11
let buffer = new Uint8Array([1,2,3,4,5,6,7,8,9,0,1]);
let ret = channel.push(buffer);
expect(ret).toBe(false);
});
it("Read Empty channel", () => {
let channel = new Channel(WW, 10, ArrayBuffer);
let buffer = channel.readData(5);
expect(buffer.byteLength == 0).toBe(true);
});
it("Write Until Full", () => {
let channel = new Channel(WW, 16, ArrayBuffer);
});
it("Write datas", () => {
let channel = new Channel(WW, 10, ArrayBuffer);
let buffer = new Uint8Array([1,2,3,4,5]);
let ret = channel.push(buffer);
let data = channel.readData(5);
expect(areEqual(buffer, data)).toBe(true);
expect(ret).toBe(true);
});
it("Write More Datas", () => {
let channel = new Channel(WW, 10, ArrayBuffer), ret = true,
data = undefined;
// Let read pointer move forward 5 bytes
let buffer = new Uint8Array([0,1,2,3,4]);
ret = channel.push(buffer);
expect(ret).toBe(true);
data = channel.readData(5);
expect(areEqual(data, buffer)).toBe(true);
// Then write 7 bytes
buffer = new Uint8Array([0,1,2,3,4,5,6]);
ret = channel.push(buffer);
expect(ret).toBe(true);
data = channel.readData(7);
expect(areEqual(buffer, data)).toBe(true);
});
it("Write Until Full", () => {
let channel = new Channel(WW, 1024, ArrayBuffer);
let buffer = new Uint8Array([...Array(1023).keys()]);
let ret = channel.push(buffer);
expect(ret).toBe(true);
let readData = channel.readData(1023);
expect(areEqual(buffer, readData))
.withContext("Data Mismatch")
.toBe(true);
expect(channel.isEmpty()).toBe(true);
})
it("32 KB Datas Write and Read", () => {
let ret = false;
let remainToWrite = Math.pow(2, 15), readData = null;
let channel = new Channel(WW, Math.pow(2, 10), ArrayBuffer);
while (remainToWrite > 0) {
let dataToWrite = new Uint8Array([...Array(getRandomInt(Math.pow(2,10)))]);
ret = channel.push(dataToWrite);
expect(ret).toBe(true);
readData = channel.readData(dataToWrite.byteLength);
ret = areEqual(dataToWrite, readData);
expect(ret).withContext("Data Mismatch").toBe(true);
remainToWrite -= readData.byteLength;
}
expect(channel.isEmpty()).toBe(true);
})
});
describe("H264EncWWGroup Spec", () => { describe("H264EncWWGroup Spec", () => {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment