Commit 58af7a0b authored by NzSN's avatar NzSN

Add Block based channel.

parent a939bbfb
import { assert } from './utils.js';
import { assert, NEED_TO_IMPLEMENT } from './utils.js';
class CpySchedule {
first = { pos : 0, size: 0 };
......@@ -44,9 +44,10 @@ export class Channel {
* 2.Read Position
*
* Figure:
* --------------------------------------------------------------------------------------------
* | ReadPointer (4 bytes) | WritePointer (4 Bytes) | Private (4 Bytes) | Data Area (N bytes) |
* --------------------------------------------------------------------------------------------
* ----------------------------------------------------------------------------------------------------------------
* | ReadPointer (4 bytes) | WritePointer (4 Bytes) | Private (4 Bytes) | WBlocks (2 Bytes) | RBlocks (2 Bytes) | Data Area (N bytes) |
* ----------------------------------------------------------------------------------------------------------------
*
* where N >= 2
*
* */
......@@ -74,6 +75,8 @@ export class Channel {
#writePhase = WRITE_PHASE.NO_STEP_WRITE;
#writePhaseEndPos = 0;
#scheduler = undefined;
// size's unit is byte
// bufferType parameter is mainly for testability.
constructor(size, bufferType = SharedArrayBuffer, shMem = null) {
......@@ -100,6 +103,7 @@ export class Channel {
this.#totalSize = this.#metaSize + this.#size;
this.#endPos = this.#metaSize + this.#size;
this.#scheduler = new CopyStrategyInByte(this.#size, this.#metaSize);
}
#getReadPointer() {
......@@ -182,6 +186,19 @@ export class Channel {
}
#remain() {
switch (this.#mode) {
case CHANNEL_MODE.BYTE:
return this.#remainBytes();
case CHANNEL_MODE.BLOCK:
return this.#remainBlocks();
}
}
#remainBlocks() {
}
#remainBytes() {
this.#readPointerCache = this.#getReadPointer();
if (this.#writePointerCache == this.#readPointerCache) {
......@@ -202,79 +219,8 @@ export class Channel {
* Caution: Before making a copy schedule you must
* make sure there has enough spaces.*/
#cpySchedule(size) {
switch (this.#mode) {
case CHANNEL_MODE.BYTE:
return this.#cpyScheduleBytes(size);
case CHANNEL_MODE.BLOCK:
return this.#cpyScheduleBlocks(size);
}
}
#cpyScheduleBlocks(size) {
let retSchedule = new CpySchedule();
let blocksRequired = Math.floor(size / this.#blockSize);
let endPos = this.#endPos - 1;
if (blocksRequired == 0) {
// Schedules with all zero size
return retSchedule;
}
if (this.#writePointerCache > this.#readPointerCache) {
/* Make sure block has consecutive bytes */
let spaceBetweenEnd = endPos - this.#writePointerCache;
let numOfBlksFirstAblePlace = Math.floor(spaceBetweenEnd / this.#blockSize);
if (numOfBlksFirstAblePlace == 0 || (spaceBetweenEnd % this.#blockSize) > 0) {
/* Must not happen */
return retSchedule();
}
/* Do first copy schedule */
retSchedule.first.pos = this.#writePointerCache;
if (numOfBlksFirstAblePlace >= blocksRequired) {
retSchedule.first.size = blocksRequired * this.#blockSize;
return retSchedule;
} else {
retSchedule.first.size = numOfBlksFirstAblePlace * this.#blockSize;
blocksRequired -= numOfBlksFirstAblePlace;
}
/* Second one is required */
retSchedule.second.pos = this.#metaSize;
let spaceBetweenRWPtr = this.#readPointerCache - this.#writePointerCache;
/* No enough space */
if (Math.floor(spaceBetweenRWPtr/this.#blockSize) < blocksRequired) {
retSchedule.reset();
return retSchedule;
}
retSchedule.second.size = blocksRequired * this.#blockSize;
return retSchedule;
}
#cpyScheduleBytes(size) {
let firstCpySize = 0, secondCpySize = 0, spaceToTail = 0;
let schedule = new CpySchedule();
let readPos = this.#getReadPointer();
if (this.#writePointerCache >= readPos) {
spaceToTail = this.#endPos - this.#writePointerCache;
firstCpySize = Math.min(size, spaceToTail);
secondCpySize = firstCpySize < size ? size - firstCpySize : 0;
secondCpySize = Math.min(secondCpySize, readPos - this.#metaSize);
schedule.first.pos = this.#writePointerCache;
schedule.first.size = firstCpySize;
schedule.second.pos = secondCpySize > 0 ? this.#metaSize : 0;
schedule.second.size = secondCpySize;
} else {
schedule.first.pos = this.#writePointerCache;
schedule.first.size = Math.min(readPos - this.#writePointerCache - 1, size);
}
return schedule;
return this.#scheduler.schedule(
this.#readPointerCache, this.#writePointerCache , size);
}
dataView() {
......@@ -430,3 +376,113 @@ export class Channel {
this.#view.setUint32(0, pos);
}
}
///////////////////////////////////////////////////////////////////////////////
// Copy Strategies //
///////////////////////////////////////////////////////////////////////////////
class CopyStrategy {
schedule(rPos, wPos) {
return NEED_TO_IMPLEMENT;
}
};
class CopyStrategyInByte extends CopyStrategy {
#size = 0;
#shift = 0;
#endPos = 0;
constructor(size, shift) {
super();
this.#size = size;
this.#shift = shift;
this.#endPos = this.#shift + this.#size;
}
schedule(rPos, wPos, size) {
let firstCpySize = 0, secondCpySize = 0, spaceToTail = 0;
let schedule = new CpySchedule();
if (wPos >= rPos) {
spaceToTail = this.#endPos - wPos;
firstCpySize = Math.min(size, spaceToTail);
secondCpySize = firstCpySize < size ? size - firstCpySize : 0;
secondCpySize = Math.min(secondCpySize, rPos - this.#shift);
schedule.first.pos = wPos;
schedule.first.size = firstCpySize;
schedule.second.pos = secondCpySize > 0 ? this.#shift : 0;
schedule.second.size = secondCpySize;
} else {
schedule.first.pos = wPos;
schedule.first.size = Math.min(rPos - wPos - 1, size);
}
return schedule;
}
};
class CopyStrategyInBlock extends CopyStrategy {
#size = 0;
#shift = 0;
#blockSize = 0;
#endPos = 0;
constructor(size, shift, blockSize) {
super();
this.#size = size;
this.#shift = shift;
this.#endPos = this.#shift + this.#size;
this.#blockSize = blockSize;
}
schedule(rPos, wPos, size) {
let retSchedule = new CpySchedule();
let blocksRequired = Math.floor(size / this.#blockSize);
let endPos = this.#endPos - 1;
if (blocksRequired == 0) {
// Schedules with all zero size
return retSchedule;
}
if (wPos > rPos) {
/* Make sure block has consecutive bytes */
let spaceBetweenEnd = endPos - wPos;
let numOfBlksFirstAblePlace = Math.floor(spaceBetweenEnd / this.#blockSize);
if (numOfBlksFirstAblePlace == 0 || (spaceBetweenEnd % this.#blockSize) > 0) {
/* Must not happen */
return retSchedule();
}
/* Do first copy schedule */
retSchedule.first.pos = wPos;
if (numOfBlksFirstAblePlace >= blocksRequired) {
retSchedule.first.size = blocksRequired * this.#blockSize;
return retSchedule;
} else {
retSchedule.first.size = numOfBlksFirstAblePlace * this.#blockSize;
blocksRequired -= numOfBlksFirstAblePlace;
}
/* Second one is required */
retSchedule.second.pos = this.#shift;
let spaceBetweenRWPtr = rPos - wPos;
/* No enough space */
if (Math.floor(spaceBetweenRWPtr/this.#blockSize) < blocksRequired) {
retSchedule.reset();
return retSchedule;
}
retSchedule.second.size = blocksRequired * this.#blockSize;
}
return retSchedule;
}
}
......@@ -183,8 +183,9 @@ describe("Channel Spec", () => {
arrays = channel.writeStep1(unitSize);
console.log(arrays);
expect(arrays.length).toBe(1);
arrays[0].set(wData, 0);
expect(arrays.length).toBe(2);
arrays[0].set([0], 0);
arrays[1].set([1,2,3,4,5,6,7,8,9], 0);
channel.writeStep2();
datas = channel.readData(unitSize);
......
......@@ -4,17 +4,17 @@ import { sleep, waitCond } from "../src/utils.js";
let paraEnc;
const RGBAFrameSize = 1920*1080*4;
beforeEach(async () => {
let trNum = navigator.hardwareConcurrency;
paraEnc = new ParaEncoder(trNum, {
codec: "H264",
config: {
encchnlsize: RGBAFrameSize * 10,
bridgechnlsize: Math.pow(2, 25),
}
});
await paraEnc.init();
}, 20000);
// beforeEach(async () => {
// let trNum = navigator.hardwareConcurrency;
// paraEnc = new ParaEncoder(trNum, {
// codec: "H264",
// config: {
// encchnlsize: RGBAFrameSize * 10,
// bridgechnlsize: Math.pow(2, 25),
// }
// });
// await paraEnc.init();
// }, 20000);
describe("ParaEncoder", () => {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment