Skip to content

Commit

Permalink
feat(gatsby): PQR: merge data dependencies from workers to the main p…
Browse files Browse the repository at this point in the history
…rocess (gatsbyjs#32305)

* feat(gatsby): PQR: merge data dependencies from workers to the main process

* Move worker state merging to a standalone step

* revert acciental change

* different approach: replay actions vs merging state

* revert unneded changes

* do not use inline snapshot

* Update packages/gatsby/src/utils/worker/__tests__/queries.ts

Co-authored-by: Lennart <[email protected]>

Co-authored-by: Lennart <[email protected]>
  • Loading branch information
vladar and LekoArts authored Jul 19, 2021
1 parent 7df39aa commit bdb9352
Show file tree
Hide file tree
Showing 3 changed files with 197 additions and 2 deletions.
141 changes: 140 additions & 1 deletion packages/gatsby/src/utils/worker/__tests__/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import sourceNodesAndRemoveStaleNodes from "../../source-nodes"
import {
savePartialStateToDisk,
store,
emitter,
loadPartialStateFromDisk,
} from "../../../redux"
import { loadConfigAndPlugins } from "../../../bootstrap/load-config-and-plugins"
Expand Down Expand Up @@ -167,7 +168,6 @@ describeWhenLMDB(`worker (queries)`, () => {
savePartialStateToDisk([`components`, `staticQueryComponents`])

await Promise.all(worker.all.buildSchema())
await worker.single.runQueries(queryIdsSmall)
})

afterAll(() => {
Expand All @@ -180,9 +180,15 @@ describeWhenLMDB(`worker (queries)`, () => {
}
})

// This was the original implementation of state syncing between a worker and the main process.
// We switched to "replaying actions" as a mechanism for state syncing.
// But we can get back to state saving / merging if "replaying actions" proves to be too expensive
// TODO: delete or re-activate depending on results yielded by "replaying actions" approach.
// The logic for `loadPartialStateFromDisk` itself is tested in `share-state` tests
it(`should save worker "queries" state to disk`, async () => {
if (!worker) fail(`worker not defined`)

await worker.single.runQueries(queryIdsSmall)
await Promise.all(worker.all.saveQueries())
// Pass "1" as workerId as the test only have one worker
const result = loadPartialStateFromDisk([`queries`], `1`)
Expand Down Expand Up @@ -233,6 +239,8 @@ describeWhenLMDB(`worker (queries)`, () => {

it(`should execute static queries`, async () => {
if (!worker) fail(`worker not defined`)

await worker.single.runQueries(queryIdsSmall)
const stateFromWorker = await worker.single.getState()

const staticQueryResult = await fs.readJson(
Expand All @@ -250,6 +258,8 @@ describeWhenLMDB(`worker (queries)`, () => {

it(`should execute page queries`, async () => {
if (!worker) fail(`worker not defined`)

await worker.single.runQueries(queryIdsSmall)
const stateFromWorker = await worker.single.getState()

const pageQueryResult = await fs.readJson(
Expand All @@ -265,6 +275,8 @@ describeWhenLMDB(`worker (queries)`, () => {

it(`should execute page queries with context variables`, async () => {
if (!worker) fail(`worker not defined`)

await worker.single.runQueries(queryIdsSmall)
const stateFromWorker = await worker.single.getState()

const pageQueryResult = await fs.readJson(
Expand Down Expand Up @@ -331,4 +343,131 @@ describeWhenLMDB(`worker (queries)`, () => {

spy.mockRestore()
})

it(`should return actions occurred in worker to replay in the main process`, async () => {
const result = await worker.single.runQueries(queryIdsSmall)

const expectedActionShapes = {
QUERY_START: [`componentPath`, `isPage`, `path`],
PAGE_QUERY_RUN: [`componentPath`, `isPage`, `path`, `resultHash`],
CREATE_COMPONENT_DEPENDENCY: [`nodeId`, `path`],
ADD_PENDING_PAGE_DATA_WRITE: [`path`],
}
expect(result).toBeArrayOfSize(11)

for (const action of result) {
expect(action.type).toBeOneOf(Object.keys(expectedActionShapes))
expect(action.payload).toContainKeys(expectedActionShapes[action.type])
}
// Double-check that important actions are actually present
expect(result).toContainValue(
expect.objectContaining({ type: `QUERY_START` })
)
expect(result).toContainValue(
expect.objectContaining({ type: `PAGE_QUERY_RUN` })
)
})

it(`should replay selected worker actions in runQueriesInWorkersQueue`, async () => {
const expectedActions = [
{
payload: {
componentPath: `/static-query-component.js`,
isPage: false,
path: `sq--q1`,
},
type: `QUERY_START`,
},
{
payload: {
nodeId: `ceb8e742-a2ce-5110-a560-94c93d1c71a5`,
path: `sq--q1`,
},
plugin: ``,
type: `CREATE_COMPONENT_DEPENDENCY`,
},
{
payload: {
componentPath: `/static-query-component.js`,
isPage: false,
path: `sq--q1`,
queryHash: `q1-hash`,
resultHash: `Dr5hgCDB+R0S9oRBWeZYj3lB7VI=`,
},
type: `PAGE_QUERY_RUN`,
},
{
payload: {
componentPath: `/foo.js`,
isPage: true,
path: `/foo`,
},
type: `QUERY_START`,
},
{
payload: {
componentPath: `/bar.js`,
isPage: true,
path: `/bar`,
},
type: `QUERY_START`,
},
{
payload: {
nodeId: `ceb8e742-a2ce-5110-a560-94c93d1c71a5`,
path: `/foo`,
},
plugin: ``,
type: `CREATE_COMPONENT_DEPENDENCY`,
},
{
payload: {
nodeId: `ceb8e742-a2ce-5110-a560-94c93d1c71a5`,
path: `/bar`,
},
plugin: ``,
type: `CREATE_COMPONENT_DEPENDENCY`,
},
{
payload: {
path: `/foo`,
},
type: `ADD_PENDING_PAGE_DATA_WRITE`,
},
{
payload: {
componentPath: `/foo.js`,
isPage: true,
path: `/foo`,
resultHash: `8dW7PoqwZNk/0U8LO6kTj1qBCwU=`,
},
type: `PAGE_QUERY_RUN`,
},
{
payload: {
path: `/bar`,
},
type: `ADD_PENDING_PAGE_DATA_WRITE`,
},
{
payload: {
componentPath: `/bar.js`,
isPage: true,
path: `/bar`,
resultHash: `iKmhf9XgbsfK7qJw0tw95pmGwJM=`,
},
type: `PAGE_QUERY_RUN`,
},
]

const actualActions: Array<any> = []
function listenActions(action): void {
actualActions.push(action)
}
emitter.on(`*`, listenActions)
await runQueriesInWorkersQueue(worker, queryIdsSmall)
emitter.off(`*`, listenActions)

expect(actualActions).toContainAllValues(expectedActions)
})
})
40 changes: 39 additions & 1 deletion packages/gatsby/src/utils/worker/child/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ import { GraphQLRunner } from "../../../query/graphql-runner"
import { getDataStore } from "../../../datastore"
import { setState } from "./state"
import { buildSchema } from "./schema"
import {
IAddPendingPageDataWriteAction,
ICreatePageDependencyAction,
IPageQueryRunAction,
IQueryStartAction,
} from "../../../redux/types"

export function setComponents(): void {
setState([`components`, `staticQueryComponents`])
Expand All @@ -29,7 +35,39 @@ function getGraphqlRunner(): GraphQLRunner {
return gqlRunner
}

export async function runQueries(queryIds: IGroupedQueryIds): Promise<void> {
type ActionsToReplay = Array<
| IQueryStartAction
| IPageQueryRunAction
| IAddPendingPageDataWriteAction
| ICreatePageDependencyAction
>

export async function runQueries(
queryIds: IGroupedQueryIds
): Promise<ActionsToReplay> {
const actionsToReplay: ActionsToReplay = []

const unsubscribe = store.subscribe(() => {
const action = store.getState().lastAction
if (
action.type === `QUERY_START` ||
action.type === `PAGE_QUERY_RUN` ||
action.type === `ADD_PENDING_PAGE_DATA_WRITE` ||
action.type === `CREATE_COMPONENT_DEPENDENCY`
) {
actionsToReplay.push(action)
}
})

try {
await doRunQueries(queryIds)
return actionsToReplay
} finally {
unsubscribe()
}
}

async function doRunQueries(queryIds: IGroupedQueryIds): Promise<void> {
const workerStore = store.getState()

// If buildSchema() didn't run yet, execute it
Expand Down
18 changes: 18 additions & 0 deletions packages/gatsby/src/utils/worker/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import { initJobsMessagingInMainProcess } from "../jobs/worker-messaging"
import { initReporterMessagingInMainProcess } from "./reporter"

import { GatsbyWorkerPool } from "./types"
import { store } from "../../redux"
import { ActionsUnion } from "../../redux/types"

export type { GatsbyWorkerPool }

Expand Down Expand Up @@ -48,6 +50,7 @@ export async function runQueriesInWorkersQueue(
promises.push(
pool.single
.runQueries({ pageQueryIds: [], staticQueryIds: segment })
.then(replayWorkerActions)
.then(() => {
activity.tick(segment.length)
})
Expand All @@ -58,6 +61,7 @@ export async function runQueriesInWorkersQueue(
promises.push(
pool.single
.runQueries({ pageQueryIds: segment, staticQueryIds: [] })
.then(replayWorkerActions)
.then(() => {
activity.tick(segment.length)
})
Expand All @@ -68,3 +72,17 @@ export async function runQueriesInWorkersQueue(

activity.end()
}

async function replayWorkerActions(
actions: Array<ActionsUnion>
): Promise<void> {
let i = 1
for (const action of actions) {
store.dispatch(action)

// Give event loop some breath
if (i++ % 100 === 0) {
await new Promise(resolve => process.nextTick(resolve))
}
}
}

0 comments on commit bdb9352

Please sign in to comment.