Event Store Adapter

Event Store Adapter API

Event Store adapters expose the following API that allows you to communicate with the underlying event store.

Function Name | Description
init | Initializes a database.
drop | Drops a database.
describe | Returns information about the event store.
dispose | Disconnects from a database and disposes unmanaged resources.
saveEvent | Saves an event to the database.
loadEvents | Gets an array of events and the next cursor from the store based on the specified filter criteria.
getLatestEvent | Gets the latest saved event.
freeze | Freezes the database.
unfreeze | Unfreezes the database.
loadSnapshot | Loads a snapshot.
saveSnapshot | Creates or updates a snapshot.
dropSnapshot | Deletes a snapshot.
incrementalImport | Incrementally imports events.
beginIncrementalImport | Starts to build a batch of events to import.
pushIncrementalImport | Adds events to an incremental import batch.
commitIncrementalImport | Commits an incremental import batch to the event store.
rollbackIncrementalImport | Drops an incremental import batch.
getNextCursor | Gets the next database cursor used to traverse the data sample returned by the underlying database.
importEvents | Gets a writable stream used to save events.
exportEvents | Gets a readable stream used to load events.
importSecrets | Gets a writable stream used to save secrets.
exportSecrets | Gets a readable stream used to load secrets.

init

Initializes the database.

Example

import createEventStoreAdapter from '@resolve-js/eventstore-xxx'

const eventStoreAdapter = createEventStoreAdapter(options)

await eventStoreAdapter.init()

drop

Drops the database.

Example

await eventStoreAdapter.drop()

describe

Returns information about the event store.

Example

const eventCount = (await eventStoreAdapter.describe({ estimateCounts: true })).eventCount

Arguments

Argument Name | Type | Description
options | object | Contains options that specify what calculations should be performed.

options

Contains options that specify what calculations should be performed. The options object has the following structure:

Object Structure
{
  estimateCounts,
  calculateCursor,
}

Field Name | Type | Description
estimateCounts? | boolean | Specifies whether or not to use estimated values for the returned eventCount and secretCount.
calculateCursor? | boolean | Specifies whether or not to get a database cursor used to traverse events.
info

The estimateCounts option is implemented for the @resolve-js/eventstore-postgresql adapter to optimize performance on large event stores. If set to true, this option specifies that the returned eventCount and secretCount should be estimated based on metadata stored in service tables.

The default false value specifies that the exact number of event entries in the database is calculated at the cost of performance.

Result

A promise that resolves to an object of the following structure:

{
  eventCount, // (number) The number of events in the store.
  secretCount, // (number) The number of secrets in the store.
  setSecretCount, // (number) The number of saved secrets.
  deletedSecretCount, // (number) The number of deleted secrets.
  isFrozen, // (boolean) Indicates if the event store is frozen.
  lastEventTimestamp, // (number) The timestamp of the last saved event.
  cursor, // (string or null, optional) The database cursor used to traverse the events.
  resourceNames, // (object)
}

dispose

Disconnects from the database and disposes unmanaged resources.

Example

await eventStoreAdapter.dispose()

saveEvent

Saves an event to the database.

Example

await eventStoreAdapter.saveEvent({
  aggregateId: 'user-id',
  aggregateVersion: 1,
  type: 'USER_CREATED',
  timestamp: Date.now(),
  payload: {
    name: 'user-name',
  },
})

Arguments

Argument Name | Type | Description
event | An event object. | An event to save.

loadEvents

Gets an array of events and the next cursor from the store based on the specified filter criteria.

Example

const { events, cursor } = await adapter.loadEvents({
  limit: 100,
  eventTypes: ['COMMENT_CREATED', 'COMMENT_REMOVED'],
  aggregateIds: ['9f81a98a', '063c1ed5'],
  cursor: null,
})

Arguments

Argument Name | Type | Description
eventFilter | An event filter object. | Describes criteria used to filter the loaded events.

Result

A promise that resolves to an object of the following structure:

{
  events, cursor
}

The result object contains the following fields:

Field Name | Type | Description
events | An array of event objects. | The resulting filtered set of events.
cursor | string or null | A database cursor used to load the next batch of events.

The returned cursor points to the position within the resulting dataset past the loaded batch of events. You can use this cursor to chain together loadEvents calls.

caution

If the startTime and/or finishTime filtering criteria are specified, the returned cursor object is invalid and should not be used in subsequent loadEvents calls.

Usage

Filter by event types:

const { events, cursor } = await adapter.loadEvents({
  limit: 100,
  eventTypes: ['COMMENT_CREATED', 'COMMENT_REMOVED'],
  cursor: null,
})

Filter by aggregate IDs:

const { events, cursor } = await adapter.loadEvents({
  limit: 100,
  aggregateIds: ['9f81a98a', '063c1ed5'],
  cursor: null,
})

Combine filter criteria:

const { events, cursor } = await adapter.loadEvents({
  limit: 100,
  eventTypes: ['COMMENT_CREATED', 'COMMENT_REMOVED'],
  aggregateIds: ['9f81a98a', '063c1ed5'],
  cursor: null,
})

Load events from the specified time range:

const startTime = new Date('2021-10-15T09:00:00').getTime()
const finishTime = new Date('2021-11-20T09:30:00').getTime()

const { events } = await adapter.loadEvents({
  limit: 100,
  startTime: startTime,
  finishTime: finishTime,
})

expect(events[0].timestamp).toBeGreaterThanOrEqual(startTime)
expect(events[events.length - 1].timestamp).toBeLessThanOrEqual(finishTime)

Use a cursor to chain loadEvents calls:

const result = await adapter.loadEvents({
  limit: 100,
  cursor: null,
})

expect(result.events.length).toBeGreaterThan(0)

// Use the returned cursor to load the next batch of events.
const nextResult = await adapter.loadEvents({
  limit: 100,
  cursor: result.cursor,
})

if (nextResult.events.length === 0) {
  console.log('No more events found by this filter')
}

getLatestEvent

Gets the latest saved event.
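
Example

The exact signature is not detailed in this document; the following is a minimal sketch that assumes getLatestEvent accepts the same eventTypes and aggregateIds filter fields as loadEvents and resolves to the latest matching event, or null if none is found:

// Assumption: the filter shape mirrors the loadEvents event filter.
const latestEvent = await eventStoreAdapter.getLatestEvent({
  eventTypes: ['USER_CREATED'],
})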

freeze

Freezes the database.

Example

await eventStoreAdapter.freeze()

Result

A promise that resolves after the event store has been successfully frozen.

unfreeze

Unfreezes the database.

Example

await eventStoreAdapter.unfreeze()

Result

A promise that resolves after the event store has been successfully unfrozen.

loadSnapshot

Loads a snapshot.

Example

const content = await eventStoreAdapter.loadSnapshot(snapshotKey)
if (content == null) {
  throw new Error('SnapshotNotFoundException')
}

Arguments

Argument Name | Type | Description
snapshotKey | string | A unique key in the table of snapshots.

Result

A promise that resolves to a string that is a snapshot in text format or null if the snapshot was not found.

saveSnapshot

Creates or updates a snapshot.

Example

await eventStoreAdapter.saveSnapshot(snapshotKey, content)

Arguments

Argument Name | Type | Description
snapshotKey | string | A unique key in the table of snapshots.
content | string | A snapshot in text format.

Result

A promise that resolves after the snapshot has been successfully saved.

dropSnapshot

Deletes a snapshot.

Example

await eventStoreAdapter.dropSnapshot(snapshotKey)

Arguments

Argument Name | Type | Description
snapshotKey | string | A unique key in the table of snapshots.

Result

A promise that resolves after the snapshot has been successfully deleted.

incrementalImport

Incrementally imports events.

Example

await eventStoreAdapter.incrementalImport(events)

Arguments

Argument Name | Type | Description
events | An array of event objects. | The events to import.

Result

A promise that resolves on the successful import.

beginIncrementalImport

Starts to build a batch of events to import.

Example

const importId = await eventStoreAdapter.beginIncrementalImport()

Result

A promise that resolves to a string that is the ID of the created import batch.

pushIncrementalImport

Adds events to an incremental import batch.

Example

await eventStoreAdapter.pushIncrementalImport(events, importId)

Arguments

Argument Name | Type | Description
events | An array of event objects. | The events to add to the batch.
importId | string | A unique key of an import batch.

Result

A promise that resolves on the successful import.

commitIncrementalImport

Commits an incremental import batch to the event store.

Example

await eventStoreAdapter.commitIncrementalImport(importId)

Arguments

Argument Name | Type | Description
importId | string | A unique key of an import batch.

Result

A promise that resolves on the successful commit.

rollbackIncrementalImport

Drops an incremental import batch.

Example

await eventStoreAdapter.rollbackIncrementalImport()

Result

A promise that resolves on the successful rollback.
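
Usage

The incremental import functions are typically used together. The following is a minimal sketch of a complete batch import that rolls the batch back on failure; the event arrays (firstEventBatch, secondEventBatch) are placeholders used only for illustration:

const importId = await eventStoreAdapter.beginIncrementalImport()
try {
  // Add events to the batch in one or more calls.
  await eventStoreAdapter.pushIncrementalImport(firstEventBatch, importId)
  await eventStoreAdapter.pushIncrementalImport(secondEventBatch, importId)

  // Commit the accumulated batch to the event store.
  await eventStoreAdapter.commitIncrementalImport(importId)
} catch (error) {
  // Drop the partially built batch if anything goes wrong.
  await eventStoreAdapter.rollbackIncrementalImport()
  throw error
}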

getNextCursor

Gets the next cursor in the event store database based on the previous cursor and an array of events obtained from it.
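
Example

A minimal sketch of how getNextCursor can be combined with loadEvents; the events passed in are the batch previously loaded from the prevCursor position:

const { events } = await eventStoreAdapter.loadEvents({
  limit: 100,
  cursor: null,
})

// Compute the cursor that points past the loaded batch.
const nextCursor = eventStoreAdapter.getNextCursor(null, events)

const nextBatch = await eventStoreAdapter.loadEvents({
  limit: 100,
  cursor: nextCursor,
})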

Arguments

Argument Name | Type | Description
prevCursor | string or null | The previous cursor.
events | An array of event objects. | An array of events obtained from the previous cursor.

Result

A string that is a new database cursor.

importEvents

Gets a writable stream used to save events.

Example

import { promisify } from 'util'
import { pipeline } from 'stream'
import fs from 'fs'

await promisify(pipeline)(
  fs.createReadStream('path/to/events.txt'),
  eventstoreAdapter.importEvents()
)

Arguments

Argument Name | Type | Description
options? | object | Specifies event import options.

options

Specifies event import options.

Object Structure
{
  byteOffset: number,
  maintenanceMode: MAINTENANCE_MODE
}

Field Name | Type | Description
byteOffset | number | A byte offset within the source of event data from which to start reading.
maintenanceMode | A Maintenance Mode value. | Defines whether or not to switch the event store to maintenance mode during the import.

Result

An Import Events Stream object.

Usage

Basic import from a file:

import { promisify } from 'util'
import { pipeline } from 'stream'
import fs from 'fs'

await promisify(pipeline)(
  fs.createReadStream('path/to/events.txt'),
  eventstoreAdapter.importEvents()
)

Import with a timeout:

import { promisify } from 'util'
import { pipeline } from 'stream'
import fs from 'fs'
import { MAINTENANCE_MODE_MANUAL } from '@resolve-js/eventstore-base'

const exportedEventsFileName = 'path/to/events.txt'
const exportedEventsFileSize = fs.statSync(exportedEventsFileName).size

let byteOffset = 0 // Initially set to zero (the file start).
let savedEventsCount = 0

while (true) {
  const importStream = eventstoreAdapter.importEvents({
    byteOffset,
    maintenanceMode: MAINTENANCE_MODE_MANUAL,
  })

  const pipelinePromise = promisify(pipeline)(
    fs.createReadStream(exportedEventsFileName, { start: byteOffset }), // Start reading from the beginning or continue from the offset.
    importStream
  ).then(() => false)

  const timeoutPromise = new Promise((resolve) =>
    setTimeout(() => {
      resolve(true)
    }, getInterruptingTimeout())
  )

  const isTimedOut = await Promise.race([timeoutPromise, pipelinePromise])

  if (isTimedOut) {
    importStream.emit('timeout') // Notify the stream that the time is over.
    await pipelinePromise // Still need to make sure all async operations are completed.
  }

  byteOffset = importStream.byteOffset // Save byteOffset for future invocations so it can be passed to fs.createReadStream.
  savedEventsCount += importStream.savedEventsCount

  if (byteOffset >= exportedEventsFileSize) {
    break
  }
}
console.log(`Imported ${savedEventsCount} events`)

exportEvents

Gets a readable stream used to load events.

Example

import { promisify } from 'util'
import { pipeline } from 'stream'

await promisify(pipeline)(
  inputEventstoreAdapter.exportEvents(),
  outputEventstoreAdapter.importEvents()
)

Arguments

Argument Name | Type | Description
options? | object | Specifies event export options.

options

Specifies event export options.

Object Structure
{
  cursor: string or null,
  maintenanceMode: MAINTENANCE_MODE
}

Field Name | Type | Description
cursor | string or null | A cursor that specifies the position within the dataset from which to start reading events. If set to null, the events are read starting from the beginning.
maintenanceMode | A Maintenance Mode value. | Defines whether or not to switch the event store to maintenance mode during the export.

Result

An Export Events Stream object.

Usage

Basic export to a file:

import fs from 'fs'
import { promisify } from 'util'
import { pipeline } from 'stream'

const exportFilePath = 'exported-events.txt'
const fileStream = fs.createWriteStream(exportFilePath)
await promisify(pipeline)(eventstoreAdapter.exportEvents(), fileStream)
await fileStream.close()

Export with a timeout:

import { promisify } from 'util'
import { pipeline } from 'stream'

let cursor = null
const exportBuffers = []
while (true) {
  const exportStream = eventstoreAdapter.exportEvents({ cursor })
  const tempStream = createStreamBuffer() // Some writable stream.
  const pipelinePromise = promisify(pipeline)(exportStream, tempStream).then(
    () => false
  )

  const timeoutPromise = new Promise((resolve) =>
    setTimeout(() => {
      resolve(true)
    }, getInterruptingTimeout())
  )

  const isJsonStreamTimedOut = await Promise.race([
    timeoutPromise,
    pipelinePromise,
  ])

  if (isJsonStreamTimedOut) {
    exportStream.emit('timeout') // Notify the stream that the time is over.
    await pipelinePromise // Still need to make sure all async operations are completed.
  }

  cursor = exportStream.cursor // Save the cursor so it can be used on the next loop iteration if required.

  const buffer = tempStream.getBuffer().toString('utf8')

  exportBuffers.push(buffer) // Save the data that could be read before the timeout.
  if (exportStream.isEnd) {
    break
  }
}

// Join and parse the obtained event data.
const outputEvents = exportBuffers
  .join('')
  .trim()
  .split('\n')
  .map((eventAsString) => JSON.parse(eventAsString.trim()))

importSecrets

Gets a writable stream used to save secrets.

Example

import { promisify } from 'util'
import { pipeline } from 'stream'
...
await promisify(pipeline)(
  inputEventstoreAdapter.exportSecrets(),
  outputEventstoreAdapter.importSecrets()
)

Arguments

Argument Name | Type | Description
options? | object | Specifies secret import options.

options

Specifies secret import options.

Object Structure
{
  maintenanceMode: MAINTENANCE_MODE
}

Field Name | Type | Description
maintenanceMode | A Maintenance Mode value. | Defines whether or not to switch the event store to maintenance mode during the import.

Result

A writable stream object. Secrets are written as single-row JSON data structures separated by the newline character ('\n'). The JSON structures include the following fields:

Field Name | Description
secret | The secret value.
id | The secret's unique identifier.
idx | An index value that is incremented for each subsequent secret in the store.

exportSecrets

Gets a readable stream used to load secrets.

Example

import fs from 'fs'
import { promisify } from 'util'
import { pipeline } from 'stream'
...
const exportFilePath = 'exported-secrets.txt'
const fileStream = fs.createWriteStream(exportFilePath)
await promisify(pipeline)(eventstoreAdapter.exportSecrets(), fileStream)
await fileStream.close()

Arguments

Argument Name | Type | Description
options? | object | Specifies secret export options.

options

Specifies secret export options.

Object Structure
{
  idx: number or null,
  maintenanceMode: MAINTENANCE_MODE
}

Field Name | Type | Description
idx | number or null | The index from which to start exporting secrets. If set to null or 0, the secrets are exported starting from the beginning.
maintenanceMode | A Maintenance Mode value. | Defines whether or not to switch the event store to maintenance mode during the export.

Result

A readable stream object. Secrets are read as single-row JSON data structures separated by the newline character ('\n'). The JSON structures include the following fields:

Field Name | Description
secret | The secret value.
id | The secret's unique identifier.
idx | An index value that is incremented for each subsequent secret in the store.
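
Usage

Since each secret is exported as a single-row JSON structure, the exported data can be parsed line by line. The following is a minimal sketch under that assumption; the file path is an arbitrary example:

import fs from 'fs'

// Each non-empty line of the exported file is one JSON record with secret, id, and idx fields.
const secrets = fs
  .readFileSync('exported-secrets.txt', 'utf8')
  .trim()
  .split('\n')
  .map((line) => JSON.parse(line.trim()))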

Maintenance Mode

A maintenance mode option value defines whether or not to switch the event store to maintenance mode during an import and/or export operation. You can specify this option for the following operations: importEvents, exportEvents, importSecrets, and exportSecrets.

The @resolve-js/eventstore-base package exports constants that are the allowed values for the maintenance mode option:

import {
  MAINTENANCE_MODE_MANUAL,
  MAINTENANCE_MODE_AUTO,
} from '@resolve-js/eventstore-base'

These values define the following behavior.

On Export:

  • MAINTENANCE_MODE_AUTO specifies that the operation should freeze the event store at the start and unfreeze it at the end of the export process.
  • MAINTENANCE_MODE_MANUAL specifies that the operation should not do any implicit actions.

On Import:

  • MAINTENANCE_MODE_AUTO specifies the same behavior as on export, and additionally specifies that the events/secrets database table should be re-created from scratch.
  • MAINTENANCE_MODE_MANUAL specifies that the operation should not do any implicit actions.
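
For example, with MAINTENANCE_MODE_MANUAL the caller is responsible for freezing and unfreezing the event store. The following is a minimal sketch of an export wrapped in explicit freeze/unfreeze calls; the file path and adapter instance are placeholders:

import fs from 'fs'
import { promisify } from 'util'
import { pipeline } from 'stream'
import { MAINTENANCE_MODE_MANUAL } from '@resolve-js/eventstore-base'

// Freeze the event store explicitly before the export.
await eventStoreAdapter.freeze()
try {
  await promisify(pipeline)(
    eventStoreAdapter.exportEvents({ maintenanceMode: MAINTENANCE_MODE_MANUAL }),
    fs.createWriteStream('exported-events.txt')
  )
} finally {
  // Unfreeze the event store even if the export fails.
  await eventStoreAdapter.unfreeze()
}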

Event Filter

An event filter object is a parameter for the loadEvents function that describes criteria used to filter the loaded events. It can contain the following fields:

limit (required)

Maximum number of events to retrieve in one call.

cursor

The value that specifies an internal position within the event store. loadEvents returns events starting with this position. Cursors can be obtained from the previous loadEvents or saveEvent calls.

If this option is set to null, the cursor in the initial position is used. Even if the cursor is null, it should be passed explicitly.

startTime and finishTime

Specify the inclusive start and end of the time range for which to load events. Specified in milliseconds elapsed since January 1, 1970 00:00:00 UTC. Both values can be omitted so that there is no lower and/or upper bound.

caution

If the startTime and/or finishTime options are specified, the cursor should be omitted, otherwise an error will occur.

aggregateIds

Array of included aggregate IDs.

eventTypes

Array of included event types.

Import Events Stream

A writable stream object that the importEvents function returns. This object extends the Node.js fs.WriteStream with the following properties:

Property Name | Description
byteOffset | A byte offset within the source of event data from which to start reading. The new offset is assigned to the property with each imported event. Use this property to resume an interrupted import process.
savedEventsCount | The number of saved events. This property is incremented as you write events to the stream.

Events are written as single-row JSON data structures separated by the newline character ('\n').

Export Events Stream

A readable stream object that the exportEvents function returns. This object extends the Node.js fs.ReadStream with the following properties:

Property Name | Description
cursor | A database cursor that indicates the current position within the dataset. The cursor is incremented as you read the stream.
isEnd | Indicates that all events have been read from the stream.

Events are read as single-row JSON data structures separated by the newline character ('\n').