How to: GraphQL subscriptions
At MakerX we often use GraphQL-based APIs because they provide a nicely modelled and decoupled API surface area and a really great consumption experience.
GraphQL defines 3 distinct types of operations:
- query: fetch data
- mutation: change data
- subscription: subscribe to streams of data
Subscriptions are a niche use-case when compared with standard HTTP request/response query and mutation operations. It's quite rare to see subscriptions implemented, but they fulfil an important role in providing data in near-real-time (when it matters).
We recently found a good use-case for subscriptions: surfacing event data received from server-side callbacks. We initially wrote our SPA web client to poll for the expected data.
Polling is OK, but it is always a compromise between efficiency and responsiveness (how fast do you poll?). We decided to add subscription support for API clients that prefer it over polling.
This post will walk you through each part of implementing subscriptions.
The GraphQL WS package represents the bulk of the implementation and it works across many types of GraphQL servers and event sources. So if your tech stack is different to ours, this post should still be useful. Read on.
In our case, we used the following tech stack:
- Apollo Server & Express for running the API and HTTP endpoints
- Redis for pub/sub of event data
- Authentication via JWT verification, authorization via GraphQL Shield
- Code written in Typescript, running on NodeJS
- Deployed to Azure App Service (supports WebSockets)
Architecture
Why did we need subscriptions, and why do we need a pub/sub provider?
If you're not interested in the why, skip straight to the how, below.
Why subscriptions?
Our application issues Verifiable Credentials using the Microsoft Verified ID service.
The sequence diagram below shows that our web app (SPA) needs to display a QR code to start issuance, display a PIN code when issuance has started, then finally navigate to the next screen once issuance is complete.
Two of the web app screen changes are triggered by MS Authenticator calling the MS Verified ID Service which then calls our API. This is where pushing events from the API to the client, via a subscription, is used to trigger the screen change.
The web app:
- Creates the issuance request by calling a mutation
- Displays a QR code (contained in the issuance request response)
- Subscribes to issuance events (specific to this issuance request)
- When the issuance started event is received, displays a PIN code for issuance completion
- When the issuance completed event is received, navigates to 'issuance complete thanks' screen
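To make the screen changes concrete, here's a minimal sketch of the transition logic as a pure function. The screen names, the event shape and the nextScreen helper are all assumptions made for illustration; they are not taken from our client code.

```typescript
// Hypothetical screen and event names; the post does not define these.
type Screen = 'qr-code' | 'pin-code' | 'issuance-complete'
type IssuanceEvent = { requestId: string; status: 'started' | 'completed' }

// Decide which screen to show after an issuance event arrives.
function nextScreen(current: Screen, requestId: string, event: IssuanceEvent): Screen {
  // Ignore events that belong to a different issuance request.
  if (event.requestId !== requestId) return current
  // A completed event always wins, even if the started event was never seen.
  if (event.status === 'completed') return 'issuance-complete'
  // A started event only moves us forward from the QR code screen.
  if (event.status === 'started' && current === 'qr-code') return 'pin-code'
  return current
}
```

Keeping this logic separate from the subscription plumbing means it can be unit tested without a WebSocket connection.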
Why pub/sub?
Why might we need a pub/sub provider, like Redis? Because it offers an easy-to-use and efficient* architecture for broadcasting events to subscribers.
Why do we need to broadcast events at all? Consider the architecture...
If we only had one API server instance running, we wouldn't need a pub/sub provider. We could generate an event for every connected client when the callback endpoint is invoked.
In reality, we have multiple API server instances and the flow of event data looks more like this:
How do we distribute data from the callback received on one server to clients subscribed through any server? That's where the pub/sub provider comes in:
*The pub/sub architecture is efficient because:
- If there are no subscribed clients and we publish an event, nothing happens.
- Subscribed clients might only be interested in certain types of events with specific criteria. With pub/sub providers like Redis, we can optimise publishing and subscribing through careful use of channels (for publishing) and patterns (for subscribing).
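The two points above can be illustrated with a toy in-memory broker. This is only a sketch of the channel and pattern idea; ToyBroker and its methods are hypothetical names, it is not Redis, and it supports only the `*` wildcard rather than the full set of Redis glob-style patterns.

```typescript
type Handler = (channel: string, message: unknown) => void

interface Subscription {
  matches: (channel: string) => boolean
  handler: Handler
}

// Toy broker used only to illustrate exact-channel vs pattern subscriptions.
class ToyBroker {
  private subscriptions: Subscription[] = []

  // Subscribe to one exact channel name.
  subscribe(channel: string, handler: Handler): void {
    this.subscriptions.push({ matches: (c) => c === channel, handler })
  }

  // Subscribe using a pattern where `*` matches any run of characters.
  psubscribe(pattern: string, handler: Handler): void {
    const source = pattern
      .split('*')
      .map((part) => part.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'))
      .join('.*')
    const regex = new RegExp(`^${source}$`)
    this.subscriptions.push({ matches: (c) => regex.test(c), handler })
  }

  // Deliver to every matching subscriber and return how many received the message.
  publish(channel: string, message: unknown): number {
    const targets = this.subscriptions.filter((s) => s.matches(channel))
    targets.forEach((s) => s.handler(channel, message))
    return targets.length
  }
}
```

Publishing to a channel with no matching subscribers returns 0 and does no further work, which is the "nothing happens" case described above.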
Code
Finally, after scrolling all day, some code...
Approach
Our goal was to make the approach for coding subscriptions consistent with queries and mutations.
By that I mean ensuring that authn, authz, context, logging and instrumentation, data access etc. were handled consistently.
Subscriptions are distinct from queries and mutations for a few reasons:
- For a standard query or mutation request, we use Express middleware for authentication. A WebSocket connection is not handled by Express.
- The existing GraphQL context function, providing resolver support for queries and mutations with auth context, services, data loaders etc, is not called for subscription resolvers. This is because the existing context function is called by Apollo Server, which itself is called via Express middleware.
- There is no HTTP request to gather info from. However, there is a request included with the initial connection, see the context code example below.
It was easy to work through these issues. The following steps describe how:
Install dependencies
npm i graphql-ws ws
Create your subscriptions server
The useServer(options, wsServer) function from graphql-ws sets up the server. We'll build this out in subsequent steps. You can read about the options here.
import { GraphQLSchema } from 'graphql'
import { useServer } from 'graphql-ws/lib/use/ws'
import type { Server } from 'http'
import { WebSocketServer } from 'ws'

export function createSubscriptionServer({
  schema,
  httpServer,
  path = '/graphql',
}: {
  schema: GraphQLSchema
  httpServer: Server
  path?: string
}) {
  const wsServer = new WebSocketServer({
    server: httpServer,
    path,
  })

  return useServer(
    {
      schema,
      onConnect: () => {
        // authn, return true or false to allow or deny connection
        // log connection
      },
      onDisconnect() {
        // log disconnect
      },
      context: () => {
        // return a context
      },
      onOperation() {
        // log operation
      },
      onNext() {
        // log data returned
      },
      onError() {
        // log errors
      },
    },
    wsServer,
  )
}
Configure authentication
The existing authentication on our API verifies and decodes an HTTP Authorization Bearer token using the jsonwebtoken lib.
We can do the same thing for subscriptions via the onConnect callback; but instead of consuming the HTTP Authorization header, we will extract the token from connectionParams supplied by the client (example).
Once we've validated and decoded the JWT, we'll add the JwtPayload to the server ctx.extra field for subsequent callbacks to use.
onConnect: async (ctx) => {
  const token = extractTokenFromConnectionParams(ctx.connectionParams)
  if (!token) return false
  try {
    const user = await verifyJwt(token)
    ctx.extra.user = user
    return true
  } catch (err) {
    return false
  }
}
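The extractTokenFromConnectionParams helper isn't shown in this post. Here's a minimal sketch of what it might look like, assuming the client sends its token as `{ Authorization: 'Bearer <token>' }` in connectionParams. That parameter name and format are assumptions; use whatever your client and server agree on.

```typescript
// Hypothetical helper: pulls a bearer token out of the connection params.
// Assumes a key named "authorization" (matched case-insensitively) holding "Bearer <token>".
function extractTokenFromConnectionParams(
  connectionParams: Readonly<Record<string, unknown>> | undefined,
): string | undefined {
  if (!connectionParams) return undefined

  const key = Object.keys(connectionParams).find((k) => k.toLowerCase() === 'authorization')
  if (!key) return undefined

  // connectionParams is client-supplied, so never trust the value's type.
  const value = connectionParams[key]
  if (typeof value !== 'string') return undefined

  const match = /^Bearer\s+(\S+)$/i.exec(value.trim())
  return match ? match[1] : undefined
}
```

Returning undefined for anything unexpected keeps the onConnect callback simple: no token means the connection is denied.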
Create GraphQL context
Building a consistent context for subscriptions is important so that our resolvers for types and fields can remain consistent across subscriptions, queries and mutations.
In our case, we generally load up our context with the current user, a logger which logs some useful request metadata (e.g. the request origin), services, data access repos, data-loader instances.
Here's a simplified example which uses the context callback input to create a GraphQL context, including the user JwtPayload from the onConnect step, plus some extra info, which will be supplied to resolvers.
context: ({ extra: { request, user } }) => ({
  user,
  requestInfo: {
    origin: request.headers.origin,
    clientIp: request.headers['x-forwarded-for'] ?? request.socket.remoteAddress,
  },
}),
This example is not using the second and third context args. They're also available in the onOperation callback, where we can log subscription operation info using those same args.
Note: in our full implementation, we define a type for our GraphQL context to ensure it remains the same for both subscription and non-subscription context creation. We use a single context creation function for both HTTP and WebSocket entry points, with a little bit of mapping code in between. We recommend taking that approach to ensure consistency.
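As a rough illustration of that note, here's a sketch of one context factory with a small mapping function per entry point. Every name here (GraphQLContext, ContextInput, createContext, contextFromHttp, contextFromWebSocket) is hypothetical, and the request shapes are trimmed-down stand-ins rather than the real Express or graphql-ws types.

```typescript
interface JwtUser { sub: string }
interface RequestMeta { origin?: string; clientIp?: string }
interface GraphQLContext { user: JwtUser; requestInfo: RequestMeta }

type Headers = Record<string, string | string[] | undefined>
interface RequestLike { headers: Headers; socket: { remoteAddress?: string } }

// Transport-neutral input that both entry points are mapped into.
interface ContextInput { user: JwtUser; headers: Headers; remoteAddress?: string }

// Header values may be arrays; take the first entry in that case.
function firstValue(value: string | string[] | undefined): string | undefined {
  return Array.isArray(value) ? value[0] : value
}

// The single place where the GraphQL context is built.
function createContext({ user, headers, remoteAddress }: ContextInput): GraphQLContext {
  return {
    user,
    requestInfo: {
      origin: firstValue(headers['origin']),
      clientIp: firstValue(headers['x-forwarded-for']) ?? remoteAddress,
    },
  }
}

// Mapping for the HTTP entry point (assumes auth middleware attached the decoded user).
function contextFromHttp(req: RequestLike & { user: JwtUser }): GraphQLContext {
  return createContext({ user: req.user, headers: req.headers, remoteAddress: req.socket.remoteAddress })
}

// Mapping for the WebSocket entry point (assumes onConnect stored the user on extra).
function contextFromWebSocket(extra: { user: JwtUser; request: RequestLike }): GraphQLContext {
  const { user, request } = extra
  return createContext({ user, headers: request.headers, remoteAddress: request.socket.remoteAddress })
}
```

Because both mappers return the same GraphQLContext type, resolvers don't need to know which transport an operation arrived on.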
Configure pub/sub engine
For local development, we use an in-memory (non-production) engine from the graphql-subscriptions lib. When deployed to Azure, we use the Redis engine from the graphql-redis-subscriptions lib.
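One way to switch engines per environment is a small factory that picks based on configuration. The sketch below is self-contained, so it uses a minimal PubSubLike interface and injected factory functions as stand-ins for the real engines; the REDIS_URL setting name and the createPubSub helper are assumptions, not part of either library.

```typescript
// Minimal stand-in for the part of a pub/sub engine this sketch needs.
interface PubSubLike {
  publish(trigger: string, payload: unknown): Promise<void>
}

interface EngineFactories {
  inMemory: () => PubSubLike
  redis: (url: string) => PubSubLike
}

// Use Redis when a connection URL is configured, otherwise fall back to in-memory.
// REDIS_URL is a hypothetical setting name.
function createPubSub(env: Record<string, string | undefined>, factories: EngineFactories): PubSubLike {
  const redisUrl = env['REDIS_URL']
  return redisUrl ? factories.redis(redisUrl) : factories.inMemory()
}
```

In a real application the inMemory factory would construct the PubSub class from graphql-subscriptions and the redis factory would construct RedisPubSub from graphql-redis-subscriptions, so the rest of the code only depends on the shared engine interface.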
Publish an event, subscribe to events
In Redis, 'messages' are published to a 'channel'. Pattern matching can be used when subscribing, to allow for more specific subscriptions. Pattern matching doesn't have to be used; event data can also be filtered at the resolver.
By default, objects are JSON serialized and deserialized automatically.
In the example below, requestId forms part of the channel, meaning subscribers will only see events for one issuance operation.
const ISSUANCE_CHANNEL = 'issuance'

export function publishIssuanceEvent(data: IssuanceEventData) {
  // return the promise so callers can await (or handle failure of) the publish
  return pubsub.publish(`${ISSUANCE_CHANNEL}.${data.requestId}`, data)
}

export const subscribeToIssuanceEvents = (args: SubscriptionIssuanceEventArgs) =>
  pubsub.asyncIterator<IssuanceEventData>(`${ISSUANCE_CHANNEL}.${args.requestId}`)
If you wanted to subscribe to any issuance event, you could use pattern matching:
pubsub.asyncIterator<IssuanceEventData>(`${ISSUANCE_CHANNEL}.*`, { pattern: true })
Resolve the subscription
A subscription resolver has two members:
- subscribe: takes the resolver args and returns an AsyncIterable<T>
- resolve: (optional) supports mapping from T in AsyncIterable<T> into whatever you need to return from the subscription
export const resolvers = {
  Subscription: {
    issuanceEvent: {
      subscribe: (parent, args) => subscribeToIssuanceEvents(args),
      resolve: (issuanceEventData, args, context) =>
        context.issuanceRepo.findByRequestId(issuanceEventData.requestId),
    },
  },
};
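To show how subscribe and resolve fit together, here's a self-contained sketch that drives a resolver pair by hand, roughly the way a GraphQL executor would: each value from the async iterable is passed to resolve and the result is what the client receives. The fake event source, the Issuance type and runSubscription are illustrative assumptions; no GraphQL library is involved.

```typescript
interface IssuanceEventData { requestId: string; status: string }
interface Issuance { requestId: string; status: string; loadedFromRepo: boolean }

// Stand-in for pubsub.asyncIterator(...): yields a fixed list of events, then ends.
async function* fakeEventSource(events: IssuanceEventData[]): AsyncGenerator<IssuanceEventData> {
  for (const event of events) yield event
}

// Hypothetical resolver pair with the same subscribe/resolve shape as above.
const issuanceEvent = {
  subscribe: (events: IssuanceEventData[]) => fakeEventSource(events),
  // Stand-in for a repository lookup keyed on the event's requestId.
  resolve: async (data: IssuanceEventData): Promise<Issuance> => ({ ...data, loadedFromRepo: true }),
}

// Simplified executor loop: map every event through resolve and collect the results.
async function runSubscription(events: IssuanceEventData[]): Promise<Issuance[]> {
  const results: Issuance[] = []
  for await (const data of issuanceEvent.subscribe(events)) {
    results.push(await issuanceEvent.resolve(data))
  }
  return results
}
```

The useful property is that the event payload can stay small (just identifiers and status), with resolve loading the full object when an event is actually delivered.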
Filtering event data
If you need to filter the event data, for example using arguments to your subscription operation, the withFilter helper makes it easy to pair your subscribe function with a filter:
import { withFilter } from 'graphql-subscriptions'

export const resolvers = {
  Subscription: {
    issuanceEvent: {
      subscribe: withFilter(
        (parent, args) => subscribeToIssuanceEvents(args),
        (issuanceEventData, args) => {
          if (args.status && args.status !== issuanceEventData.status) return false
          return true
        },
      ),
      resolve: (issuanceEventData, args, context) =>
        context.issuanceRepo.findByRequestId(issuanceEventData.requestId),
    },
  },
};
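Conceptually, a filter like this wraps the source async iterable and only passes on values that satisfy a predicate. The sketch below shows that idea with a plain async generator. It is a simplified stand-in to explain the behaviour, not the withFilter implementation, and filterAsync and the sample source are hypothetical names.

```typescript
// Pass through only the values for which the predicate returns true.
async function* filterAsync<T>(
  source: AsyncIterable<T>,
  predicate: (value: T) => boolean | Promise<boolean>,
): AsyncGenerator<T> {
  for await (const value of source) {
    if (await predicate(value)) yield value
  }
}

interface StatusEvent { requestId: string; status: string }

// Sample source standing in for the pub/sub iterator.
async function* sampleEvents(): AsyncGenerator<StatusEvent> {
  yield { requestId: 'r1', status: 'started' }
  yield { requestId: 'r1', status: 'completed' }
}

// Collect only the events whose status matches the requested one.
async function collectByStatus(status: string): Promise<StatusEvent[]> {
  const matched: StatusEvent[] = []
  for await (const event of filterAsync(sampleEvents(), (e) => e.status === status)) {
    matched.push(event)
  }
  return matched
}
```

Filtering at the resolver still means every event is received by the server process, so channel or pattern selection remains the cheaper option when it can express the same condition.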
Logging
If you wish to log from the various WebSocket server callbacks, here's a simplified example which logs some useful info:
import { print } from 'graphql'

// the callbacks below go inside the useServer options
onConnect(ctx) {
  // authn here (sets ctx.extra.user)
  console.info('Subscription connection established', { user: ctx.extra.user })
},
onDisconnect({ extra: { user } }) {
  console.info('Subscription connection disconnected', { user })
},
onOperation({ extra: { user } }, _message, { operationName, variableValues, document }, _result) {
  console.info('GraphQL subscription operation', { operationName, query: print(document), variableValues, user })
},
onNext({ extra: { user } }, _message, { operationName, variableValues, document }, result) {
  console.info('GraphQL subscription data', { operationName, query: print(document), variableValues, user, result })
},
onError(_ctx, message, errors) {
  console.error('GraphQL subscription server error', { message, errors })
},
MakerX open source libraries
We've implemented the approach described above in our @makerx/graphql-core lib, which you are welcome to use, along with others we use for our GraphQL APIs such as @makerx/graphql-apollo-server.
Further reading
Use the following references to build your own solution.
Apollo
The Apollo subscriptions documentation is excellent. It includes:
GraphQL WS
'The Guild' documentation is comprehensive, take a look at:
GraphQL Subscriptions lib
Makes it easy to implement subscriptions across a variety of out-of-the-box event data sources, or to write your own PubSubEngine implementation.
Redis
We use Redis for caching, including server-side caching, as well as a pub/sub provider. It is a mature product with comprehensive documentation and examples:
Delivery semantics
Please note that the described implementation of GraphQL subscriptions using Redis pub/sub will give you at-most-once delivery.
If you're looking for at-least-once or exactly-once delivery, carefully consider whether GraphQL subscriptions is the best fit 🕵🏻. You'd need a durable subscription provider e.g. Redis Streams, Kafka, Rabbit MQ, AWS SNS+SQS.
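A toy example makes the at-most-once behaviour easy to see: a message goes to whoever is listening at the moment of publishing and is never stored. FireAndForgetChannel and demoAtMostOnce are hypothetical names for this sketch, which imitates the behaviour without using Redis.

```typescript
type Listener = (message: string) => void

// Toy fire-and-forget channel: nothing is buffered or replayed.
class FireAndForgetChannel {
  private listeners: Listener[] = []

  // Register a listener and return a function that removes it again.
  subscribe(listener: Listener): () => void {
    this.listeners.push(listener)
    return () => {
      this.listeners = this.listeners.filter((l) => l !== listener)
    }
  }

  // Deliver to current listeners only.
  publish(message: string): void {
    this.listeners.forEach((listener) => listener(message))
  }
}

function demoAtMostOnce(): string[] {
  const channel = new FireAndForgetChannel()
  const received: string[] = []

  channel.publish('issuance started') // nobody is listening yet, so this is lost
  const unsubscribe = channel.subscribe((message) => { received.push(message) })
  channel.publish('issuance completed') // delivered
  unsubscribe()
  channel.publish('late event') // lost again after disconnecting

  return received
}
```

A client that connects late or drops its WebSocket briefly will miss events in the same way, which is worth designing for, for example by letting the client query current state after subscribing.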
GraphQL Yoga
GraphQL Yoga supports an alternative subscription implementation over HTTP Server Sent Events (SSE) in addition to WebSockets using graphql-ws.
GraphQL Shield
We love using GraphQL Shield for authorization because it provides a single, clear, declarative view of rules applied across the schema.
However, rules are not currently applied against the subscribe operations, due to a bug in graphql-middleware which only applies rules to resolve operations. We used a patch (written by someone else) to solve this. We raised a PR based on the patch, which hopefully gets merged at some point.
MakerX
I'm proud to say I work for MakerX. We have extensive experience developing solutions using GraphQL. If you have an idea that you need help with, talk to us!