Skip to content

Commit

Permalink
Merge pull request Hubs-Foundation#5904 from mozilla/feature/message-…
Browse files Browse the repository at this point in the history
…store

Entity state persistence
  • Loading branch information
johnshaughnessy authored Feb 7, 2023
2 parents acfc970 + 7df9cf2 commit 0bac051
Show file tree
Hide file tree
Showing 23 changed files with 525 additions and 267 deletions.
1 change: 1 addition & 0 deletions src/bit-components.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ Networked.creator[$isStringType] = true;
Networked.owner[$isStringType] = true;

export const Owned = defineComponent();
export const EntityStateDirty = defineComponent();
export const NetworkedMediaFrame = defineComponent({
capturedNid: Types.ui32,
scale: [Types.f32, 3]
Expand Down
6 changes: 5 additions & 1 deletion src/bit-systems/delete-entity-system.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@ import { animate } from "../utils/animate";
import { findAncestorEntity } from "../utils/bit-utils";
import { coroutine } from "../utils/coroutine";
import { easeOutQuadratic } from "../utils/easing";
import { deleteEntityState, hasSavedEntityState } from "../utils/entity-state-utils";

// TODO Move to coroutine.ts when it exists
// TODO Figure out the appropriate type and use it everywhere
export type Coroutine = Generator<Promise<void>, void, unknown>;

const END_SCALE = new Vector3().setScalar(0.001);
function* animateThenRemoveEntity(world: HubsWorld, eid: number): Coroutine {
if (hasSavedEntityState(world, eid)) {
deleteEntityState(APP.hubChannel!, world, eid);
}
const obj = world.eid2obj.get(eid)!;
yield* animate({
properties: [[obj.scale.clone(), END_SCALE]],
Expand All @@ -33,7 +37,7 @@ const hoveredRightQuery = defineQuery([HoveredRemoteRight]);
const hoveredLeftQuery = defineQuery([HoveredRemoteLeft]);
const coroutines = new Map();

function deleteTheDeletableAncestor(world: HubsWorld, eid: number) {
export function deleteTheDeletableAncestor(world: HubsWorld, eid: number) {
const ancestor = findAncestorEntity(world, eid, (e: number) => hasComponent(world, Deletable, e));
if (ancestor && !coroutines.has(ancestor)) {
coroutines.set(ancestor, coroutine(animateThenRemoveEntity(world, ancestor)));
Expand Down
62 changes: 62 additions & 0 deletions src/bit-systems/entity-persistence-system.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import { addComponent, defineQuery, entityExists, exitQuery, hasComponent, removeComponent } from "bitecs";
import { HubsWorld } from "../app";
import { Constraint, EntityStateDirty, Owned } from "../bit-components";
import { coroutine, crNextFrame } from "../utils/coroutine";
import { hasSavedEntityState, updateEntityState } from "../utils/entity-state-utils";
import HubChannel from "../utils/hub-channel";
import { EntityID } from "../utils/networking-types";
import { localClientID } from "./networking";

const timers = new Map<EntityID, number>();

// Throttle calls to saveEntityState so that rapid changes
// (e.g. scrubbing a video playhead) do not flood reticulum.
function* saveEntityStateJob(hubChannel: HubChannel, world: HubsWorld, eid: EntityID, maxDelay: number) {
while (world.time.elapsed < timers.get(eid)! && world.time.elapsed < maxDelay) {
yield crNextFrame();
}

// Don't save entity state if this entity is no longer persistent
if (!hasSavedEntityState(world, eid)) return;

updateEntityState(hubChannel, world, eid);
}

// TODO type for coroutine
type Coroutine = () => IteratorResult<undefined, any>;
const jobs = new Map<EntityID, Coroutine>();
const saveDelayMS = 500;
const maxSaveDelayMS = 3000;

const constraintExitQuery = exitQuery(defineQuery([Constraint, Owned]));
const ownedExitQuery = exitQuery(defineQuery([Owned]));
const entityStateDirtyQuery = defineQuery([EntityStateDirty]);
export function entityPersistenceSystem(world: HubsWorld, hubChannel: HubChannel) {
if (!localClientID) return; // Not connected yet

constraintExitQuery(world).forEach(function (eid) {
if (entityExists(world, eid) && hasComponent(world, Owned, eid) && !hasComponent(world, Constraint, eid)) {
addComponent(world, EntityStateDirty, eid);
}
});

// TODO Is it necessary to duplicate this array (since we are calling removeComponent within)?
Array.from(entityStateDirtyQuery(world)).forEach(function (eid) {
if (hasSavedEntityState(world, eid)) {
timers.set(eid, world.time.elapsed + saveDelayMS);
if (!jobs.has(eid)) {
jobs.set(eid, coroutine(saveEntityStateJob(hubChannel, world, eid, world.time.elapsed + maxSaveDelayMS)));
}
}
removeComponent(world, EntityStateDirty, eid);
});

// Don't bother saving state if we lose ownership
ownedExitQuery(world).forEach(function (eid) {
jobs.delete(eid);
});

jobs.forEach((job, eid) => {
if (job().done) jobs.delete(eid);
});
}
106 changes: 70 additions & 36 deletions src/bit-systems/network-receive-system.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,25 @@ import { addComponent, defineQuery, enterQuery, hasComponent, removeComponent, r
import { HubsWorld } from "../app";
import { Networked, Owned } from "../bit-components";
import { renderAsNetworkedEntity } from "../utils/create-networked-entity";
import { deleteEntityState, hasSavedEntityState } from "../utils/entity-state-utils";
import { networkableComponents, schemas, StoredComponent } from "../utils/network-schemas";
import type { ClientID, CursorBufferUpdateMessage, EntityID, StringID, UpdateMessage } from "../utils/networking-types";
import { hasPermissionToSpawn } from "../utils/permissions";
import { tryUnpin } from "../utils/store-networked-state";
import { takeSoftOwnership } from "../utils/take-soft-ownership";
import {
connectedClientIds,
createMessageDatas,
isPinned,
disconnectedClientIds,
isNetworkInstantiated,
localClientID,
networkedQuery,
pendingCreatorChanges,
pendingMessages,
pendingParts,
softRemovedEntities
} from "./networking";

function isCursorBufferUpdateMessage(update: any): update is CursorBufferUpdateMessage {
export function isCursorBufferUpdateMessage(update: any): update is CursorBufferUpdateMessage {
return !!update.hasOwnProperty("componentIds");
}

Expand All @@ -35,7 +38,6 @@ function isOutdatedMessage(eid: EntityID, updateMessage: UpdateMessage) {
return updateMessage.owner !== breakTie(APP.getString(Networked.owner[eid])!, updateMessage.owner);
}

const partedClientIds = new Set<StringID>();
export const storedUpdates = new Map<StringID, UpdateMessage[]>();
const enteredNetworkedQuery = enterQuery(defineQuery([Networked]));

Expand All @@ -46,8 +48,6 @@ export function networkReceiveSystem(world: HubsWorld) {
// When a user leaves, remove the entities created by that user
const networkedEntities = networkedQuery(world);
pendingParts.forEach(partingClientId => {
partedClientIds.add(partingClientId);

networkedEntities
.filter(eid => Networked.creator[eid] === partingClientId)
.forEach(eid => {
Expand All @@ -60,27 +60,30 @@ export function networkReceiveSystem(world: HubsWorld) {
});
}

// Handle delete entity messages
for (let i = 0; i < pendingMessages.length; i++) {
const message = pendingMessages[i];

for (let j = 0; j < message.deletes.length; j += 1) {
const nid = APP.getSid(message.deletes[j]);
if (world.deletedNids.has(nid)) continue;
world.deletedNids.add(nid);
if (world.deletedNids.has(nid)) continue; // Already done

const eid = world.nid2eid.get(nid);
if (eid && !isNetworkInstantiated(eid)) {
console.warn("Received delete message for a non network-instantiated entity. Ignoring it.");
continue;
}

world.deletedNids.add(nid);

if (eid) {
if (isPinned(eid)) {
// We only expect this to happen if the client who sent the delete
// didn't know it was pinned yet.
console.warn("Told to delete a pinned entity. Unpinning it...");
tryUnpin(world, eid, APP.hubChannel!);
if (hasSavedEntityState(world, eid)) {
console.warn("Received delete message for a persistent entity. Deleting its entity state...");
deleteEntityState(APP.hubChannel!, world, eid);
}

createMessageDatas.delete(eid);
world.nid2eid.delete(nid);
removeEntity(world, eid);
console.log("Deleting ", APP.getString(nid));
}

// TODO: Clear out any stored messages for this entity's children.
Expand All @@ -91,56 +94,90 @@ export function networkReceiveSystem(world: HubsWorld) {
}
}

// Handle create entity messages
for (let i = 0; i < pendingMessages.length; i++) {
const message = pendingMessages[i];

for (let j = 0; j < message.creates.length; j++) {
const [nidString, prefabName, initialData] = message.creates[j];
const { version, networkId: nidString, prefabName, initialData } = message.creates[j];
if (version !== 1) {
console.warn(`Received create message with unsupported version (${version}).`);
continue;
}

const creator = message.fromClientId;
if (!creator) {
// We do not expect to get here.
// We only check because we are synthesizing messages elsewhere;
// They should not have any create messages in them.
throw new Error("Received create message without a fromClientId.");
}

const nid = APP.getSid(nidString);

if (world.deletedNids.has(nid)) {
console.warn(`Received a create message for an entity I've already deleted. Skipping ${nidString}`);
// TODO : Rebroadcast a delete for this nid, because a client must not have known about it.
// This can happen in the unlikely case that the client who created this object disconnected as someone else deleted it.
// The creator will send another create message when it reconnects.
} else if (world.nid2eid.has(nid)) {
console.warn(`Received create message for entity I already created. Skipping ${nidString}.`);
} else if (world.ignoredNids.has(nid)) {
console.warn(`Received a create message for an entity I've already deleted. Skipping ${nidString}`);
continue;
}

if (world.nid2eid.has(nid)) {
// We expect this case to happen often, because saveEntityState
// will rebroadcast create message for saved entities.
// console.log(`Received create message for entity I already created. Skipping ${nidString}.`);
continue;
}

if (world.ignoredNids.has(nid)) {
console.warn(`Received create message for nid I ignored. Skipping ${nidString}.`);
} else if (!hasPermissionToSpawn(creator, prefabName)) {
continue;
}

if (!hasPermissionToSpawn(creator, prefabName)) {
// This should only happen if there is a bug or the sender is maliciously modified.
console.warn(
`Received create from a user who does not have permission to spawn ${prefabName}. Skipping ${nidString}.`
);
world.ignoredNids.add(nid);
} else {
const eid = renderAsNetworkedEntity(world, prefabName, initialData, nidString, creator);
console.log(`Received create message for ${nidString}. (eid: ${eid})`);
continue;
}

renderAsNetworkedEntity(world, prefabName, initialData, nidString, creator);
}
}

{
// If reticulum told us to reassign an entity's creator, do so now
pendingCreatorChanges.forEach(({ nid, creator }) => {
const eid = world.nid2eid.get(APP.getSid(nid));
if (!eid) return; // Nothing to do.

const creatorSid = APP.getSid(creator);
if (creator !== "reticulum" && !connectedClientIds.has(creatorSid)) {
// If we do not recognize the clientId, then the entity is no longer valid.
// This can happen if a client unpins something just before they disconnect.
removeEntity(world, eid);
softRemovedEntities.add(eid);
return;
}

Networked.creator[eid] = APP.getSid(creator);
});
pendingCreatorChanges.length = 0;
}

// If we stored updates for newly created entities, queue them for processing
enteredNetworkedQuery(world).forEach(eid => {
const nid = Networked.id[eid];
if (storedUpdates.has(nid)) {
console.log("Had stored updates for", APP.getString(nid), storedUpdates.get(nid));
const updates = storedUpdates.get(nid)!;

for (let i = 0; i < updates.length; i++) {
const update = updates[i];
if (partedClientIds.has(APP.getSid(update.owner))) {
if (disconnectedClientIds.has(APP.getSid(update.owner))) {
// We missed the frame when we would have taken soft ownership from this owner,
// so modify the message to act as though we had done so.
console.log("Rewriting update message from client who left.", JSON.stringify(update));
// console.log("Rewriting update message from client who left.", JSON.stringify(update));
update.owner = NAF.clientId;
update.lastOwnerTime = update.timestamp + 1;
}
Expand All @@ -160,23 +197,23 @@ export function networkReceiveSystem(world: HubsWorld) {
const nid = APP.getSid(updateMessage.nid);

if (world.ignoredNids.has(nid)) {
console.log(`Ignoring update for ignored entity ${updateMessage.nid}`);
// console.log(`Ignoring update for ignored entity ${updateMessage.nid}`);
continue;
}

if (world.deletedNids.has(nid)) {
console.log(`Ignoring update for deleted entity ${updateMessage.nid}`);
// console.log(`Ignoring update for deleted entity ${updateMessage.nid}`);
continue;
}

if (!world.nid2eid.has(nid)) {
console.log(`Holding onto an update for ${updateMessage.nid} because we don't have it yet.`);
// console.log(`Holding onto an update for ${updateMessage.nid} because we don't have it yet.`);
// TODO What if we will NEVER be able to apply this update?
// Can we use connectedClientIds / disconnectedClientIds / and the nid prefix to figure this out?
// TODO It would be nice if we could squash these updates
const updates = storedUpdates.get(nid) || [];
updates.push(updateMessage);
storedUpdates.set(nid, updates);
console.log(storedUpdates);
continue;
}

Expand All @@ -187,14 +224,11 @@ export function networkReceiveSystem(world: HubsWorld) {
}

if (updateMessage.owner === NAF.clientId) {
console.log("Got a message telling us we are the owner.");
addComponent(world, Owned, eid);
} else if (hasComponent(world, Owned, eid)) {
console.log("Lost ownership: ", updateMessage.nid);
removeComponent(world, Owned, eid);
}

Networked.creator[eid] = APP.getSid(updateMessage.creator);
Networked.owner[eid] = APP.getSid(updateMessage.owner);
Networked.lastOwnerTime[eid] = updateMessage.lastOwnerTime;
Networked.timestamp[eid] = updateMessage.timestamp;
Expand Down
23 changes: 6 additions & 17 deletions src/bit-systems/network-send-system.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { HubsWorld } from "../app";
import { Networked, Owned } from "../bit-components";
import { getServerTime } from "../phoenix-adapter";
import { messageFor } from "../utils/message-for";
import type { Message } from "../utils/networking-types";
import {
createMessageDatas,
isCreatedByMe,
Expand All @@ -23,8 +22,6 @@ const enteredNetworkedQuery = enterQuery(networkedQuery);
const enteredOwnedNetworkedQuery = enterQuery(ownedNetworkedQuery);
const exitedNetworkedQuery = exitQuery(networkedQuery);

export const unpinMessages: Message[] = [];

export function networkSendSystem(world: HubsWorld) {
if (!localClientID) return; // Not connected yet

Expand All @@ -48,7 +45,7 @@ export function networkSendSystem(world: HubsWorld) {
});
}

// Tell joining users about entities I network instantiated, and full updates for entities I own
// Send newly joined clients creates for entities where I am the creator, and full updates for entities I own
{
if (pendingJoins.length) {
const ownedNetworkedEntities = ownedNetworkedQuery(world);
Expand All @@ -60,23 +57,15 @@ export function networkSendSystem(world: HubsWorld) {
[],
false
);
if (message) {
pendingJoins.forEach(clientId => NAF.connection.sendDataGuaranteed(APP.getString(clientId)!, "nn", message));
}
pendingJoins.forEach(clientId => {
if (message) {
NAF.connection.sendDataGuaranteed(APP.getString(clientId)!, "nn", message);
}
});
pendingJoins.length = 0;
}
}

// Tell everyone about entities I unpin
// TODO: Make reticulum broadcast the actual unpin message, like it does for pin messages.
{
for (let i = 0; i < unpinMessages.length; i++) {
const message = unpinMessages[i];
NAF.connection.broadcastDataGuaranteed("nn", message);
}
unpinMessages.length = 0;
}

// Tell everyone about entities I created, entities I own, and entities that I deleted
{
const removedEntities = exitedNetworkedQuery(world).filter(isNetworkInstantiated);
Expand Down
Loading

0 comments on commit 0bac051

Please sign in to comment.