Stream
The Stream model implements an append-only log of events or changes. It's authoritative and optimistic, making it suitable for scenarios where the history of changes can be used to derive your state. The stream of events is transparent to all clients.
Choose between Atom, Stream, or Projection models based on your application's requirements. Each model has its own strengths and use cases. More information on the models can be found in the Model Comparison section.
Stream Explained
The Stream Model in Krmx State provides a powerful way to manage state derived from a series of events. This model is particularly useful for applications that need to maintain a history of changes or implement event-sourcing patterns.
Core Concepts
At its heart, this model supports a state that is derived from a stream of JSON-serializable events. This approach allows for complex state management with a clear history of how that state evolved over time.
The Stream Model uses the stream/
prefix for its messages, distinguishing it from other models in Krmx State.
When a new client joins, it receives the entire history of events in the stream through the stream/history
message. This ensures that every client has access to the complete state, regardless of when they connected.
Clients have the ability to append new events to the stream using the stream/append
message. This allows for dynamic updates to the shared state from any connected client. However, the server plays a crucial role in maintaining data integrity.
Server's Role and Event Validation
The server acts as a gatekeeper for the stream. When a client appends an event, the server first validates it. Only events that pass this validation are broadcast to all connected clients using the stream/append
message. This process ensures that only valid changes are incorporated into the shared state.
Client-Side State Management
On the client side, each instance maintains the latest derived state based on the stream of events it has received. This allows for quick access to the current state without needing to reprocess the entire event history each time.
Optimistic Updates
To provide a responsive user experience, the Stream Model implements an optimistic update system. When a client sends an event, it immediately adds this event to its local stream, creating an optimistic state. This allows the client to reflect changes instantly, without waiting for server confirmation.
If the server rejects an event (for example, if it fails validation), it sends a stream/invalidate
message back to the client. This allows the client to remove the invalid event from its optimistic stream, ensuring that the local state remains consistent with the server's authoritative state.
This optimistic approach, combined with server validation, strikes a balance between responsiveness and data integrity in your Krmx-powered application.
Getting Started
This section demonstrates how to use the Stream model on both server-side and client-side of a Krmx application.
Shared Code
This model requires some shared code to define a stream model that can be used by your server and clients. After installing Krmx State in your shared code package, by running npm install @krmx/state
, you can create a stream model like this:
// in shared/index.ts
import { z } from 'zod';
import { StreamModel } from '@krmx/state';
export const claimableCounterModel = new StreamModel({ counter: 0, claim: undefined as (string | undefined) });
export const increment = claimableCounterModel.when('increment', z.number(), (state, dispatcher, payload) => {
if (state.claim !== undefined && state.claim !== dispatcher) {
throw new Error('claimed by someone else');
}
state.counter += payload;
});
export const claim = claimableCounterModel.when('claim', z.undefined(), (state, dispatcher) => {
if (state.claim !== undefined) {
throw new Error('already claimed');
}
state.claim = dispatcher;
});
export const release = claimableCounterModel.when('release', z.undefined(), (state, dispatcher) => {
if (state.claim !== dispatcher) {
throw new Error('claimed by someone else');
}
state.claim = undefined;
});
The code above creates and exports an example model named claimableCounterModel
. It includes corresponding event creators to increment
, claim
, and release
a counter. This example shows how you can only let the person with the claim increment the counter. In this case, when the counter is not claimed, anyone can increment the counter.
Server Side
To use the Stream model on the server, you need to register it with your Krmx server instance. After installing the server extension for Krmx state with npm install @krmx/state-server
, this can be done with a single line of code, and you have the option to provide additional configuration if needed. Here's an example of how to set it up:
// in server/index.ts
import { createServer } from '@krmx/server';
import { registerStream } from '@krmx/state-server';
import { claimableCounterModel } from 'file:../shared'; // reference your shared code package here
const server = createServer();
registerStream(
server,
'my-claimable-counter',
claimableCounterModel,
{ optimisticSeconds: 10 }
); // <-- this line
Client Side (React only)
For client-side usage in React applications, you'll need to register the Stream model with your Krmx client. This process is similar to the server-side setup, first you install the React client extension for Krmx state with npm install @krmx/state-client-react
, then all that is required is one line of code. Here's how to set it up:
// in krmx.ts
import { createClient } from '@krmx/client-react';
import { registerStream } from '@krmx/state-client-react';
import { claimableCounterModel } from 'file:../shared'; // reference your shared code package here
export const { client, useClient } = createClient();
export const { use: useCounter, dispatch: dispatchCounter } = registerStream(
client,
'my-claimable-counter',
claimableCounterModel,
{ optimisticSeconds: 10 }
); // <-- this line!
Once registered on both server and client, you can use the Stream model in your React components. The following example demonstrates how to use the claimable counter model in a React component:
// in my-component.tsx
import { useCounter, dispatchCounter } from './krmx.ts'
import { increment, claim, release } from 'file:../shared'; // reference your shared code package here
export const MyComponent = () => {
const state = useCounter();
return <div>
<button onClick={() => dispatchCounter(increment(3))}>
{state.counter}
</button>
<p>Claimed by {state.claim ?? 'no one'}</p>
<button onClick={() => dispatchCounter(claim())}>
Claim
</button>
<button onClick={() => dispatchCounter(release())}>
Release
</button>
</div>;
};
In this setup, all clients connected to the server will receive updates to the Stream model automatically. This ensures that your application's state remains synchronized across all connected clients.
Using
dispatchCounter(increment(3))
vsclient.send(increment(3))
. Both would work, however by using the dispatchCounter you ensure that you benefit from optimistic updates on this client while the server is still processing the validity of the event. If the server would invalidate the event the optimistic update is automatically pruned from the optimistic state.
More Information
Take a look at the TypeScript SDK reference and source code (opens in a new tab) to learn more about the Stream model.
An example can be found in the Krmx Starter (opens in a new tab) in the Example Alphabet (opens in a new tab).