Event Store Adapter
Event Store Adapter API
Event Store adapters expose the following API that allows you to communicate with the underlying event store.
Function Name | Description |
---|---|
init | Initializes a database. |
drop | Drops a database. |
describe | Returns information about the event store. |
dispose | Disconnects from a database and disposes unmanaged resources. |
saveEvent | Saves an event to the database. |
loadEvents | Gets an array of events and the next cursor from the store based on the specified filter criteria. |
getLatestEvent | Gets the latest saved event. |
freeze | Freezes the database. |
unfreeze | Unfreezes the database. |
loadSnapshot | Loads a snapshot. |
saveSnapshot | Creates or updates a snapshot. |
dropSnapshot | Deletes a snapshot. |
incrementalImport | Incrementally imports events. |
beginIncrementalImport | Starts to build a batch of events to import. |
pushIncrementalImport | Adds events to an incremental import batch. |
commitIncrementalImport | Commits an incremental import batch to the event store. |
rollbackIncrementalImport | Drops an incremental import batch. |
getNextCursor | Gets the next database cursor used to traverse the data sample returned by the underlying database. |
importEvents | Gets a writable stream used to save events. |
exportEvents | Gets a readable stream used to load events. |
importSecrets | Gets a writable stream used to save secrets. |
exportSecrets | Gets a readable stream used to load secrets. |
init
Initializes the database.
Example
import createEventStoreAdapter from '@resolve-js/eventstore-xxx'
const eventStoreAdapter = createEventStoreAdapter(options)
await eventStoreAdapter.init()
drop
Drops the database.
Example
await eventStoreAdapter.drop()
describe
Returns information about the event store.
Example
const eventCount = (await eventStoreAdapter.describe({ estimateCounts: true })).eventCount
Arguments
Argument Name | Type | Description |
---|---|---|
options | object | Contains options that specify which calculations should be performed. |
options
Contains options that specify which calculations should be performed. The options object has the following structure:
{
  estimateCounts,
  calculateCursor,
}
Field Name | Type | Description |
---|---|---|
estimateCounts? | boolean | Specifies whether or not to use estimated values for the returned eventCount and secretCount. |
calculateCursor? | boolean | Specifies whether or not to return a database cursor used to traverse events. |
info
The estimateCounts option is implemented for the @resolve-js/eventstore-postgresql adapter to optimize performance on large event stores. If set to true, this option specifies that the returned eventCount and secretCount should be estimated based on metadata stored in service tables. The default false value specifies that the exact number of event entries in the database is calculated at the cost of performance.
Result
A promise that resolves to an object of the following structure:
{
  eventCount, // (number) The number of events in the store.
  secretCount, // (number) The number of secrets in the store.
  setSecretCount, // (number) The number of saved secrets.
  deletedSecretCount, // (number) The number of deleted secrets.
  isFrozen, // (boolean) Indicates if the event store is frozen.
  lastEventTimestamp, // (number) The timestamp of the last saved event.
  cursor, // (string or null, optional) The database cursor used to traverse the events.
  resourceNames, // (object)
}
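A minimal usage sketch that reads several of these fields (it assumes the adapter has already been created and initialized; calculateCursor is optional):
// Request a description that also includes the cursor field.
const description = await eventStoreAdapter.describe({ calculateCursor: true })
console.log(`Events: ${description.eventCount}, secrets: ${description.secretCount}`)
console.log(`Frozen: ${description.isFrozen}, last event at: ${description.lastEventTimestamp}`)
console.log(`Cursor past the last stored event: ${description.cursor}`)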
dispose
Disconnects from the database and disposes unmanaged resources.
Example
await eventStoreAdapter.dispose()
saveEvent
Saves an event to the database.
Example
await eventStoreAdapter.saveEvent({
  aggregateId: 'user-id',
  aggregateVersion: 1,
  type: 'USER_CREATED',
  timestamp: Date.now(),
  payload: {
    name: 'user-name',
  },
})
Arguments
Argument Name | Type | Description |
---|---|---|
event | An event object. | An event to save. |
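Subsequent events for the same aggregate are typically saved with an incremented aggregateVersion. A minimal sketch of appending a follow-up event (the USER_RENAMED type and its payload are hypothetical examples):
// Assumes the event with aggregateVersion 1 for 'user-id' was saved earlier (see the example above).
await eventStoreAdapter.saveEvent({
  aggregateId: 'user-id',
  aggregateVersion: 2, // Incremented relative to the previously saved event.
  type: 'USER_RENAMED', // A hypothetical event type.
  timestamp: Date.now(),
  payload: {
    name: 'new-user-name',
  },
})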
loadEvents
Gets an array of events and the next cursor from the store based on the specified filter criteria.
Example
const { events, cursor } = await adapter.loadEvents({
  limit: 100,
  eventTypes: ['COMMENT_CREATED', 'COMMENT_REMOVED'],
  aggregateIds: ['9f81a98a', '063c1ed5'],
  cursor: null,
})
Arguments
Argument Name | Type | Description |
---|---|---|
eventFilter | An event filter object. | Describes criteria used to filter the loaded events. |
Result
A promise that resolves to an object of the following structure:
{
events, cursor
}
The result object contains the following fields:
Field Name | Type | Description |
---|---|---|
events | An array of event objects. | The resulting filtered set of events. |
cursor | string or null | A database cursor used to load the next batch of events. |
The returned cursor points to the position within the resulting dataset past the loaded batch of events. You can use this cursor to chain together loadEvents calls.
caution
If the startTime and/or finishTime filtering criteria are specified, the returned cursor object is invalid and should not be used in subsequent loadEvents calls.
Usage
Filter by event types:
const { events, cursor } = await adapter.loadEvents({
  limit: 100,
  eventTypes: ['COMMENT_CREATED', 'COMMENT_REMOVED'],
  cursor: null,
})
Filter by aggregate IDs:
const { events, cursor } = await adapter.loadEvents({
  limit: 100,
  aggregateIds: ['9f81a98a', '063c1ed5'],
  cursor: null,
})
Combine filter criteria:
const { events, cursor } = await adapter.loadEvents({
  limit: 100,
  eventTypes: ['COMMENT_CREATED', 'COMMENT_REMOVED'],
  aggregateIds: ['9f81a98a', '063c1ed5'],
  cursor: null,
})
Load events from the specified time range:
const startTime = new Date('2021-10-15T09:00:00').getTime()
const finishTime = new Date('2021-11-20T09:30:00').getTime()
const { events } = await adapter.loadEvents({
  limit: 100,
  startTime: startTime,
  finishTime: finishTime,
})
expect(events[0].timestamp).toBeGreaterThanOrEqual(startTime)
expect(events[events.length - 1].timestamp).toBeLessThanOrEqual(finishTime)
Use a cursor to chain loadEvents calls:
const result = await adapter.loadEvents({
  limit: 100,
  cursor: null,
})
expect(result.events.length).toBeGreaterThan(0)
// Use the returned cursor to load the next batch of events.
const nextResult = await adapter.loadEvents({
  limit: 100,
  cursor: result.cursor,
})
if (nextResult.events.length === 0) {
  console.log('No more events found by this filter')
}
getLatestEvent
Gets the latest saved event.
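A minimal usage sketch, assuming getLatestEvent accepts an event filter with the eventTypes and aggregateIds fields (similar to loadEvents, but without limit and cursor):
// USER_CREATED is a hypothetical event type used only for illustration.
const latestEvent = await eventStoreAdapter.getLatestEvent({
  eventTypes: ['USER_CREATED'],
})
if (latestEvent != null) {
  console.log(latestEvent.aggregateId, latestEvent.timestamp)
}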
freeze
Freezes the database.
Example
await eventStoreAdapter.freeze()
Result
A promise that resolves after the event store has been successfully frozen.
unfreeze
Unfreezes the database.
Example
await eventStoreAdapter.unfreeze()
Result
A promise that resolves after the event store has been successfully unfrozen.
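A sketch that combines freeze and unfreeze with describe to verify the frozen state (the isFrozen field is documented above; the maintenance work itself is elided):
await eventStoreAdapter.freeze()
const { isFrozen } = await eventStoreAdapter.describe({})
console.log(isFrozen) // true while the event store is frozen.
// ... perform maintenance work here ...
await eventStoreAdapter.unfreeze()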
loadSnapshot
Loads a snapshot.
Example
const content = await eventStoreAdapter.loadSnapshot(snapshotKey)
if (content == null) {
  throw new Error('SnapshotNotFoundException')
}
Arguments
Argument Name | Type | Description |
---|---|---|
snapshotKey | string | A unique key in the table of snapshots. |
Result
A promise that resolves to a string that is a snapshot in text format, or null if the snapshot was not found.
saveSnapshot
Creates or updates a snapshot.
Example
await eventStoreAdapter.saveSnapshot(snapshotKey, content)
Arguments
Argument Name | Type | Description |
---|---|---|
snapshotKey | string | A unique key in the table of snapshots. |
content | string | A snapshot in text format. |
Result
A promise that resolves after the snapshot has been successfully saved.
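A round-trip sketch that combines saveSnapshot and loadSnapshot (the key and the serialized content are arbitrary examples):
const snapshotKey = 'read-model-users' // A hypothetical unique key.
const content = JSON.stringify({ userCount: 42 }) // A snapshot in text format.
await eventStoreAdapter.saveSnapshot(snapshotKey, content)
const restored = await eventStoreAdapter.loadSnapshot(snapshotKey)
// restored equals content if the snapshot was found.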
dropSnapshot
Deletes a snapshot.
Example
await eventStoreAdapter.dropSnapshot(snapshotKey)
Arguments
Argument Name | Type | Description |
---|---|---|
snapshotKey | string | A unique key in the table of snapshots. |
Result
A promise that resolves after the snapshot has been successfully deleted.
incrementalImport
Incrementally imports events.
Example
await eventStoreAdapter.incrementalImport(events)
Arguments
Argument Name | Type | Description |
---|---|---|
events | An array of event objects. | The events to import. |
Result
A promise that resolves on successful import.
beginIncrementalImport
Starts to build a batch of events to import.
Example
const importId = await eventStoreAdapter.beginIncrementalImport()
Result
A promise that resolves to a string that is the ID of the created import batch.
pushIncrementalImport
Adds events to an incremental import batch.
Example
await eventStoreAdapter.pushIncrementalImport(events, importId)
Arguments
Argument Name | Type | Description |
---|---|---|
events | An array of event objects. | The events to add to the batch. |
importId | string | A unique key of an import batch. |
Result
A promise that resolves on successful import.
commitIncrementalImport
Commits an incremental import batch to the event store.
Example
await eventStoreAdapter.commitIncrementalImport(importId)
Arguments
Argument Name | Type | Description |
---|---|---|
importId | string | A unique key of an import batch. |
Result
A promise that resolves on successful commit.
rollbackIncrementalImport
Drops an incremental import batch.
Example
await eventStoreAdapter.rollbackIncrementalImport()
Result
A promise that resolves on successful rollback.
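The incremental import functions are typically used together: begin a batch, push events into it, and then either commit or roll back. A minimal sketch of that flow (the events variable is assumed to be an array of event objects prepared elsewhere):
const importId = await eventStoreAdapter.beginIncrementalImport()
try {
  await eventStoreAdapter.pushIncrementalImport(events, importId)
  await eventStoreAdapter.commitIncrementalImport(importId)
} catch (error) {
  await eventStoreAdapter.rollbackIncrementalImport()
  throw error
}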
getNextCursor
Gets the next cursor in the event store database based on the previous cursor and an array of events obtained from it.
Arguments
Argument Name | Type | Description |
---|---|---|
prevCursor | string or null | The previous cursor. |
events | An array of event objects. | An array of events obtained from the previous cursor. |
Result
A string that is a new database cursor.
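A minimal sketch that derives the cursor pointing past an already loaded batch of events (it is expected to match the cursor that loadEvents returned for the same batch):
const { events } = await eventStoreAdapter.loadEvents({
  limit: 100,
  cursor: null,
})
// Compute the cursor that points past the loaded batch.
const nextCursor = eventStoreAdapter.getNextCursor(null, events)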
importEvents
Gets a writable stream used to save events.
Example
import { promisify } from 'util'
import { pipeline } from 'stream'
import fs from 'fs'

await promisify(pipeline)(
  fs.createReadStream('path/to/events.txt'),
  eventstoreAdapter.importEvents()
)
Arguments
Argument Name | Type | Description |
---|---|---|
options? | object | Specifies event import options. |
options
Specifies event import options.
{
  byteOffset: number,
  maintenanceMode: MAINTENANCE_MODE
}
Field Name | Type | Description |
---|---|---|
byteOffset | number | A byte offset within the source of event data from which to start reading. |
maintenanceMode | A Maintenance Mode value. | Defines whether or not to switch the event store to maintenance mode during the import. |
Result
An Import Events Stream object.
Usage
Basic import from a file:
import { promisify } from 'util'
import { pipeline } from 'stream'
import fs from 'fs'

await promisify(pipeline)(
  fs.createReadStream('path/to/events.txt'),
  eventstoreAdapter.importEvents()
)
Import with a timeout:
import { promisify } from 'util'
import { pipeline } from 'stream'
import fs from 'fs'
import { MAINTENANCE_MODE_MANUAL } from '@resolve-js/eventstore-base'

const exportedEventsFileName = 'path/to/events.txt'
const exportedEventsFileSize = fs.statSync(exportedEventsFileName).size
let byteOffset = 0 // Initially set to zero (the file start).
let savedEventsCount = 0
while (true) {
  const importStream = eventstoreAdapter.importEvents({
    byteOffset,
    maintenanceMode: MAINTENANCE_MODE_MANUAL,
  })
  const pipelinePromise = promisify(pipeline)(
    fs.createReadStream(exportedEventsFileName, { start: byteOffset }), // Start reading from the beginning or continue from the offset.
    importStream
  ).then(() => false)
  const timeoutPromise = new Promise((resolve) =>
    setTimeout(() => {
      resolve(true)
    }, getInterruptingTimeout())
  )
  const isTimedOut = await Promise.race([timeoutPromise, pipelinePromise])
  if (isTimedOut) {
    importStream.emit('timeout') // Notify that the time is over.
    await pipelinePromise // Still need to make sure all async operations are completed.
  }
  byteOffset = importStream.byteOffset // Save byteOffset for future invocations so it can be passed to fs.createReadStream.
  savedEventsCount += importStream.savedEventsCount
  if (byteOffset >= exportedEventsFileSize) {
    break
  }
}
console.log(`Imported ${savedEventsCount} events`)
exportEvents
Gets a readable stream used to load events.
Example
import { promisify } from 'util'
import { pipeline } from 'stream'

await promisify(pipeline)(
  inputEventstoreAdapter.exportEvents(),
  outputEventstoreAdapter.importEvents()
)
Arguments
Argument Name | Type | Description |
---|---|---|
options? | object | Specifies event export options. |
options
Specifies event export options.
{
  cursor: string or null,
  maintenanceMode: MAINTENANCE_MODE
}
Field Name | Type | Description |
---|---|---|
cursor | string or null | A cursor that specifies the position within the dataset from which to start reading events. If set to null, the events are read starting from the beginning. |
maintenanceMode | A Maintenance Mode value. | Defines whether or not to switch the event store to maintenance mode during the export. |
Result
An Export Events Stream object.
Usage
Basic export to a file:
import fs from 'fs'
import { promisify } from 'util'
import { pipeline } from 'stream'
const exportFilePath = 'exported-events.txt'
const fileStream = fs.createWriteStream(exportFilePath)
await promisify(pipeline)(eventstoreAdapter.exportEvents(), fileStream)
await fileStream.close()
Export with a timeout:
import { promisify } from 'util'
import { pipeline } from 'stream'

let cursor = null
const exportBuffers = []
while (true) {
  const exportStream = eventstoreAdapter.exportEvents({ cursor })
  const tempStream = createStreamBuffer() // Some writable stream.
  const pipelinePromise = promisify(pipeline)(exportStream, tempStream).then(
    () => false
  )
  const timeoutPromise = new Promise((resolve) =>
    setTimeout(() => {
      resolve(true)
    }, getInterruptingTimeout())
  )
  const isJsonStreamTimedOut = await Promise.race([
    timeoutPromise,
    pipelinePromise,
  ])
  if (isJsonStreamTimedOut) {
    exportStream.emit('timeout') // Notify that the time is over.
    await pipelinePromise // Still need to make sure all async operations are completed.
  }
  cursor = exportStream.cursor // Save the cursor so it can be used on the next loop iteration if required.
  const buffer = tempStream.getBuffer().toString('utf8')
  exportBuffers.push(buffer) // Save the events that could be read before the timeout.
  if (exportStream.isEnd) {
    break
  }
}
// Join and parse the obtained event data.
const outputEvents = exportBuffers
  .join('')
  .trim()
  .split('\n')
  .map((eventAsString) => JSON.parse(eventAsString.trim()))
importSecrets
Gets a writable stream used to save secrets.
Example
import { promisify } from 'util'
import { pipeline } from 'stream'
...
await promisify(pipeline)(
  inputEventstoreAdapter.exportSecrets(),
  outputEventstoreAdapter.importSecrets()
)
Arguments
Argument Name | Type | Description |
---|---|---|
options? | object | Specifies secret import options. |
options
Specifies secret import options.
{
  maintenanceMode: MAINTENANCE_MODE
}
Field Name | Type | Description |
---|---|---|
maintenanceMode | A Maintenance Mode value. | Defines whether or not to switch the event store to maintenance mode during the import. |
Result
A writable stream object. Secrets are written as single-row JSON data structures separated by the newline character ('\n'). The JSON structures include the following fields:
Field Name | Description |
---|---|
secret | The secret value. |
id | The secret's unique identifier. |
idx | An index value that is incremented for each subsequent secret in the store. |
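A hedged sketch of importing secrets from a file that contains one JSON record per line in the format described above (the file path is a hypothetical example; such a file can be produced with exportSecrets):
import { promisify } from 'util'
import { pipeline } from 'stream'
import fs from 'fs'

await promisify(pipeline)(
  fs.createReadStream('path/to/exported-secrets.txt'),
  eventstoreAdapter.importSecrets()
)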
exportSecrets
Gets a readable stream used to load secrets.
Example
import fs from 'fs'
import { promisify } from 'util'
import { pipeline } from 'stream'
...
const exportFilePath = 'exported-secrets.txt'
const fileStream = fs.createWriteStream(exportFilePath)
await promisify(pipeline)(eventstoreAdapter.exportSecrets(), fileStream)
await fileStream.close()
Arguments
Argument Name | Type | Description |
---|---|---|
options? | object | Specifies secret export options. |
options
Specifies secret export options.
{
  idx: number or null,
  maintenanceMode: MAINTENANCE_MODE
}
Field Name | Type | Description |
---|---|---|
idx | number or null | The index from which to start exporting secrets. If set to null or 0, the secrets are exported starting from the beginning. |
maintenanceMode | A Maintenance Mode value. | Defines whether or not to switch the event store to maintenance mode during the export. |
Result
A readable stream object. Secrets are read as single-row JSON data structures separated by the newline character ('\n'). The JSON structures include the following fields:
Field Name | Description |
---|---|
secret | The secret value. |
id | The secret's unique identifier. |
idx | An index value that is incremented for each subsequent secret in the store. |
Related Types
Maintenance Mode
A maintenance mode option value defines whether or not to switch the event store to maintenance mode during an import and/or export operation. You can specify this option for the importEvents, exportEvents, importSecrets, and exportSecrets operations.
The @resolve-js/eventstore-base package exports constants that are the allowed values for the maintenance mode option:
import {
  MAINTENANCE_MODE_MANUAL,
  MAINTENANCE_MODE_AUTO,
} from '@resolve-js/eventstore-base'
These values define the following behavior.
On Export:
- MAINTENANCE_MODE_AUTO specifies that the operation should freeze the event store at the start and unfreeze it at the end of the export process.
- MAINTENANCE_MODE_MANUAL specifies that the operation should not perform any implicit actions.
On Import:
- MAINTENANCE_MODE_AUTO - the same as on export, but also specifies that the events/secrets database table should be re-created from scratch.
- MAINTENANCE_MODE_MANUAL specifies that the operation should not perform any implicit actions.
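A sketch of an export with the maintenance mode chosen explicitly; with MAINTENANCE_MODE_MANUAL, freezing and unfreezing the event store is the caller's responsibility:
import fs from 'fs'
import { promisify } from 'util'
import { pipeline } from 'stream'
import { MAINTENANCE_MODE_MANUAL } from '@resolve-js/eventstore-base'

await eventstoreAdapter.freeze()
try {
  await promisify(pipeline)(
    eventstoreAdapter.exportEvents({ maintenanceMode: MAINTENANCE_MODE_MANUAL }),
    fs.createWriteStream('exported-events.txt')
  )
} finally {
  await eventstoreAdapter.unfreeze()
}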
Event Filter
An event filter object is a parameter for the loadEvents function that describes criteria used to filter the loaded events. It can contain the following fields:
limit (required)
Maximum number of events to retrieve in one call.
cursor
The value that specifies an internal position within the event store. loadEvents returns events starting with this position. Cursors can be obtained from previous loadEvents or saveEvent calls.
If this option is set to null, the cursor in the initial position is used. Even if the cursor is null, it should be passed explicitly.
startTime and finishTime
Specify the inclusive start and end of the time range for which to load events. Specified in milliseconds elapsed since January 1, 1970 00:00:00 UTC. Both values can be omitted so that there is no lower and/or upper bound.
caution
If the startTime and/or finishTime options are specified, the cursor should be omitted, otherwise an error will occur.
aggregateIds
Array of included aggregate IDs.
eventTypes
Array of included event types.
Import Events Stream
A writable stream object that the importEvents function returns. This object extends the Node.js fs.WriteStream with the following properties:
Property Name | Description |
---|---|
byteOffset | A byte offset within the source of event data from which to start reading. The new offset is assigned to the property with each imported event. Use this property to resume an interrupted import process. |
savedEventsCount | The number of saved events. This property is incremented as you write events to the stream. |
Events are written as single-row JSON data structures separated by the newline character ('\n').
Export Events Stream
A readable stream object that the exportEvents function returns. This object extends the Node.js fs.ReadStream with the following properties:
Property Name | Description |
---|---|
cursor | A database cursor that indicates the current position within the dataset. The cursor is incremented as you read the stream. |
isEnd | Indicates that all events have been read from the stream. |
Events are read as single-row JSON data structures separated by the newline character ('\n').