Skip to content

Commit

Permalink
add support for continuous patching (#334)
Browse files Browse the repository at this point in the history
Co-authored-by: Matthias Lehner <[email protected]>
  • Loading branch information
loreanvictor and matthiaslehnertum authored Dec 22, 2023
1 parent 62a3a8f commit 0f00b6b
Show file tree
Hide file tree
Showing 8 changed files with 264 additions and 23 deletions.
34 changes: 32 additions & 2 deletions src/main/apollon-editor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -295,14 +295,44 @@ export class ApollonEditor {

/**
* Register callback which is executed when the model changes, receiving the changes to the model
* in [JSONPatch](http://jsonpatch.com/) format.
* in [JSONPatch](http://jsonpatch.com/) format. This callback is only executed for discrete changes to the model.
* Discrete changes are changes that should not be missed and are executed at the end of important user actions.
* @param callback function which is called when the model changes
* @return returns the subscription identifier which can be used to unsubscribe
* @returns the subscription identifier which can be used to unsubscribe
*/
subscribeToModelChangePatches(callback: (patch: Patch) => void): number {
return this.patcher.subscribeToDiscreteChanges(callback);
}

/**
* Registers a callback which is executed when the model changes, receiving the changes to the model
* in [JSONPatch](http://jsonpatch.com/) format. This callback is executed for every change to the model, including
* discrete and continuous changes. Discrete changes are changes that should not be missed and are executed at
* the end of important user actions. Continuous changes are changes that are executed during user actions, and is
* ok to miss some of them. For example: moving of an element is a continuous change, while releasing the element
* is a discrete change.
* @param callback function which is called when the model changes
* @returns the subscription identifier which can be used to unsubscribe using `unsubscribeFromModelChangePatches()`.
*/
subscribeToAllModelChangePatches(callback: (patch: Patch) => void): number {
return this.patcher.subscribe(callback);
}

/**
* Registers a callback which is executed when the model changes, receiving only the continuous changes to the model.
* Continuous changes are changes that are executed during user actions, and is ok to miss some of them. For example:
* moving of an element is a continuous change, while releasing the element is a discrete change.
*
* **IMPORTANT**: If you want to keep proper track of the model, make sure that you subscribe to discrete changes
* as well, either via `subscribeToModelChangePatches()` or `subscribeToAllModelChangePatches()`.
*
* @param callback function which is called when the model changes
* @returns the subscription identifier which can be used to unsubscribe using `unsubscribeFromModelChangePatches()`.
*/
subscribeToModelContinuousChangePatches(callback: (patch: Patch) => void): number {
return this.patcher.subscribeToContinuousChanges(callback);
}

/**
* Remove model change subscription, so that the corresponding callback is no longer executed when the model is changed.
* @param subscriptionId subscription identifier
Expand Down
4 changes: 3 additions & 1 deletion src/main/components/store/model-store.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import { ModelState, PartialModelState } from './model-state';
import {
createPatcherMiddleware,
createPatcherReducer,
isContinuousAction,
isDiscreteAction,
isSelectionAction,
Patcher,
Expand Down Expand Up @@ -70,7 +71,8 @@ export const createReduxStore = (
...(patcher
? [
createPatcherMiddleware<UMLModel, Actions, ModelState>(patcher, {
select: (action) => isDiscreteAction(action) || isSelectionAction(action),
selectDiscrete: (action) => isDiscreteAction(action) || isSelectionAction(action),
selectContinuous: (action) => isContinuousAction(action),
transform: (state) => ModelState.toModel(state, false),
}),
]
Expand Down
20 changes: 15 additions & 5 deletions src/main/services/patcher/patcher-middleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,22 @@ export type PatcherMiddlewareOptions<T, U = T, A = any> = {
transform?: (state: U) => T;

/**
* Selects actions that should trigger a check for changes. This is useful
* Selects actions that should trigger a check for discrete changes. This is useful
* when the patcher should only check for changes when certain actions are dispatched.
*/
select?: (action: A) => boolean;
selectDiscrete?: (action: A) => boolean;

/**
* Selects actions that should trigger a check for continuous changes. Continuous changes
* happen more frequently than discrete changes and are ok to miss a few.
*/
selectContinuous?: (action: A) => boolean;
};

const _DefaultOptions = {
transform: (state: any) => state,
select: () => true,
selectDiscrete: () => true,
selectContinuous: () => false,
};

/**
Expand All @@ -35,16 +42,19 @@ export function createPatcherMiddleware<T, A = any, U = T>(
options: PatcherMiddlewareOptions<T, U, A> = _DefaultOptions,
): PatcherMiddleware<U> {
const transform = options.transform || _DefaultOptions.transform;
const select = options.select || _DefaultOptions.select;
const selectDiscrete = options.selectDiscrete || _DefaultOptions.selectDiscrete;
const selectContinuous = options.selectContinuous || _DefaultOptions.selectContinuous;

return (store) => {
patcher.initialize(transform(store.getState()));

return (next) => (action: A) => {
const res = next(action as any);

if (select(action)) {
if (selectDiscrete(action)) {
patcher.check(transform(store.getState()));
} else if (selectContinuous(action)) {
patcher.checkContinuous(transform(store.getState()));
}

return res;
Expand Down
6 changes: 6 additions & 0 deletions src/main/services/patcher/patcher-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import { Operation } from 'fast-json-patch';
import { Actions } from '../actions';
import { Action } from '../../utils/actions/actions';
import { SelectableActionTypes } from '../uml-element/selectable/selectable-types';
import { MovingActionTypes } from '../uml-element/movable/moving-types';
import { ResizingActionTypes } from '../uml-element/resizable/resizing-types';

/**
* Returns true if the action is discrete, i.e. if it is not in middle of
Expand All @@ -24,6 +26,10 @@ export const isSelectionAction = (action: Actions): boolean => {
return action.type === SelectableActionTypes.SELECT || action.type === SelectableActionTypes.DESELECT;
};

export const isContinuousAction = (action: Actions): boolean => {
return action.type === MovingActionTypes.MOVE || action.type === ResizingActionTypes.RESIZE;
};

/**
* A patch is a list of operations that can be applied to an object
* to change them in some desired manner. See [JSON patch](http://jsonpatch.com/) for more info.
Expand Down
107 changes: 94 additions & 13 deletions src/main/services/patcher/patcher.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { applyReducer } from 'fast-json-patch';
import { buffer, debounceTime, filter, map, Observable, Subject, Subscription } from 'rxjs';
import { buffer, debounceTime, filter, map, merge, Observable, Subject, Subscription, throttleTime } from 'rxjs';

import { compare } from './compare';
import { Patch, PatchListener } from './patcher-types';
Expand All @@ -10,28 +10,63 @@ import { Patch, PatchListener } from './patcher-types';
*/
export type Comparator<T> = (a: T, b: T) => Patch;

export interface PatcherOptions<T> {
/**
* Compares two objects and returns the difference
* in the form of a [JSON patch](http://jsonpatch.com/).
*/
diff: Comparator<T>;

/**
* The maximum frequency of continuous changes emitted by this patcher,
* per second. Defaults to 25. This does not affect discrete changes.
*/
maxFrequency: number;
}

const _DefaultOptions = {
diff: compare,
maxFrequency: 25,
};

/**
* A patcher tracks changes to an object and notifies subscribers.
* It also allows application of patches to the object.
*/
export class Patcher<T> {
private _snapshot: T | undefined;
private subscribers: { [key: number]: Subscription } = {};
private router = new Subject<Patch>();
private discreteRouter = new Subject<Patch>();
private continuousRouter = new Subject<Patch>();
private continuousPatchObservable: Observable<Patch>;
private observable: Observable<Patch>;
readonly options: PatcherOptions<T>;

/**
* @param diff A function that compares two objects and returns the difference
* in the form of a [JSON patch](http://jsonpatch.com/).
*/
constructor(readonly diff: Comparator<T> = compare as Comparator<T>) {
constructor(options: Partial<PatcherOptions<T>> = _DefaultOptions) {
this.options = {
diff: options.diff || _DefaultOptions.diff,
maxFrequency: options.maxFrequency || _DefaultOptions.maxFrequency,
};

//
// throttle continuous patches to handle back-pressure. note that
// unlike discrete changes, it is ok to miss some continuous changes.
//
this.continuousPatchObservable = this.continuousRouter.pipe(throttleTime(1000 / this.options.maxFrequency));

const router = merge(this.discreteRouter, this.continuousPatchObservable);

//
// we might get multiple patches in a single tick,
// for example due to some side effects of some patches being applied.
// to avoid backpressure, we buffer the patches and emit them all at once.
//
this.observable = this.router.pipe(
buffer(this.router.pipe(debounceTime(0))),
this.observable = router.pipe(
buffer(router.pipe(debounceTime(0))),
map((patches) => patches.flat()),
filter((patches) => patches.length > 0),
);
Expand All @@ -49,15 +84,17 @@ export class Patcher<T> {
* @param nextState The next state of the object.
*/
check(nextState: T): void {
this.validate();

const skip = Object.keys(this.subscribers).length === 0;
const patch = !skip && this.diff(this.snapshot, nextState);
this._snapshot = nextState;
this.checkAndUpdate(nextState);
}

if (patch && patch.length) {
this.router.next(patch);
}
/**
* Updates its snapshots, checks for continuous changes and notifies subscribers.
* Continuous changes are changes that happen frequently, such as mouse movement,
* and are ok to miss a few.
* @param nextState The next state of the object.
*/
checkContinuous(nextState: T): void {
this.checkAndUpdate(nextState, false);
}

/**
Expand Down Expand Up @@ -101,6 +138,32 @@ export class Patcher<T> {
return key;
}

/**
* Subscribes to discrete changes to the object. Discrete changes are changes
* that happen infrequently, such as a button click, and should not be missed.
* @param listener A function that will be called when the object changes.
* @returns A subscription ID that can be used to unsubscribe.
*/
subscribeToDiscreteChanges(listener: PatchListener): number {
const key = this.nextKey();
this.subscribers[key] = this.discreteRouter.subscribe(listener);

return key;
}

/**
* Subscribes to continuous changes to the object. Continuous changes are changes
* that happen frequently, such as mouse movement, and are ok to miss a few.
* @param listener A function that will be called when the object changes.
* @returns A subscription ID that can be used to unsubscribe.
*/
subscribeToContinuousChanges(listener: PatchListener): number {
const key = this.nextKey();
this.subscribers[key] = this.continuousPatchObservable.subscribe(listener);

return key;
}

/**
* Unsubscribes from changes to the object.
* @param subscriptionId The subscription ID returned by `subscribe`.
Expand All @@ -110,10 +173,28 @@ export class Patcher<T> {
delete this.subscribers[subscriptionId];
}

// checks for changes and notifies subscribers, using given router
private checkAndUpdate(nextState: T, discreteChange = true): void {
this.validate();

const skip = Object.keys(this.subscribers).length === 0;
const patch = !skip && this.options.diff(this.snapshot, nextState);
if (discreteChange) {
this._snapshot = nextState;
}

if (patch && patch.length) {
const router = discreteChange ? this.discreteRouter : this.continuousRouter;
router.next(patch);
}
}

// generates a unique key for a subscription
private nextKey() {
return Math.max(...Object.keys(this.subscribers).map((k) => parseInt(k, 10)), 0) + 1;
}

// throws if patcher is not initialized
private validate(): asserts this is { snapshot: T } {
if (!this.snapshot) {
throw new Error('Patcher not initialized');
Expand Down
76 changes: 76 additions & 0 deletions src/tests/unit/services/patcher/patcher-middleware-test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import sleep from 'sleep-promise';

import { createPatcherMiddleware } from '../../../../main/services/patcher/patcher-middleware';
import { Patcher } from '../../../../main/services/patcher/patcher';

describe('patcher middleware.', () => {
test('should induce changes to the state to be reflected to the patcher.', async () => {
const cb = jest.fn();
const patcher = new Patcher<{ x: number }>();
let state = { x: 42 };

const middleware = createPatcherMiddleware(patcher);
patcher.subscribe(cb);

middleware({ getState: () => state } as any)((() => {
state = { x: 43 };
return state;
}) as any)('ladida');

await sleep(1);

expect(cb).toHaveBeenCalledTimes(1);
expect(patcher.snapshot).toEqual({ x: 43 });
});

test('should induce continuous changes when the action is continuous.', async () => {
const cb1 = jest.fn();
const cb2 = jest.fn();
const cb3 = jest.fn();

const patcher = new Patcher<{ x: number }>();
patcher.subscribe(cb1);
patcher.subscribeToContinuousChanges(cb2);
patcher.subscribeToDiscreteChanges(cb3);

let state = { x: 42 };

const action1 = { type: 'a1' };
const action2 = { type: 'a2' };
const dispatch = (action: { type: string }) => {
state = { x: state.x + 1 };
return state;
};

const middleware = createPatcherMiddleware(patcher, {
selectDiscrete: (action) => action.type === 'a1',
selectContinuous: (action) => action.type === 'a2',
});

const run = middleware({ getState: () => state } as any);

run(dispatch as any)(action1);
await sleep(1);

expect(cb1).toHaveBeenCalledTimes(1);
expect(cb2).toHaveBeenCalledTimes(0);
expect(cb3).toHaveBeenCalledTimes(1);
expect(patcher.snapshot).toEqual({ x: 43 });

run(dispatch as any)(action2);
await sleep(1);

expect(cb1).toHaveBeenCalledTimes(2);
expect(cb2).toHaveBeenCalledTimes(1);
expect(cb3).toHaveBeenCalledTimes(1);
expect(patcher.snapshot).toEqual({ x: 43 });

run(dispatch as any)(action1);
await sleep(1);

expect(cb1).toHaveBeenCalledTimes(3);
expect(cb2).toHaveBeenCalledTimes(1);
expect(cb3).toHaveBeenCalledTimes(2);
expect(patcher.snapshot).toEqual({ x: 45 });
});
});
Loading

0 comments on commit 0f00b6b

Please sign in to comment.