How to: GraphQL subscriptions

This post describes implementing GraphQL subscriptions over WebSockets using the GraphQL WS package by The Guild.


At MakerX we often use GraphQL-based APIs because they provide a nicely modelled and decoupled API surface area and a really great consumption experience.

GraphQL defines 3 distinct types of operations:

  • query: fetch data
  • mutation: change data
  • subscription: subscribe to streams of data

Subscriptions are a niche use-case when compared with standard HTTP request/response query and mutation operations. It's quite rare to see subscriptions implemented, but they fulfil an important role in providing data in near-real-time (when it matters).

We recently found a good use-case for subscriptions: surfacing event data received from server-side callbacks. We initially wrote our SPA web client to poll for the expected data.

Polling is OK, but it is always a compromise between efficiency and responsiveness (how fast do you poll?). We decided to add subscription support for API clients that prefer it over polling.

This post will walk you through each part of implementing subscriptions.

💡 An intermediate level of GraphQL knowledge is assumed in this post.

The GraphQL WS package represents the bulk of the implementation, and it works across many types of GraphQL servers and event sources. So if your tech stack is different to ours, this post will still be useful; read on.

In our case, we used the following tech stack:

  • Apollo Server & Express for running the API and HTTP endpoints
  • Redis for pub/sub of event data
  • Authentication via JWT verification, authorization via GraphQL Shield
  • Code written in TypeScript, running on Node.js
  • Deployed to Azure App Service (supports WebSockets)

Architecture

Why did we need subscriptions, and why did we need a pub/sub provider?

If you're not interested in the why, skip straight to the how, below.

Why subscriptions?

Our application issues Verifiable Credentials using the Microsoft Verified ID service.

The sequence diagram below shows that our web app (SPA) needs to display a QR code to start issuance, display a PIN code when issuance has started, then finally navigate to the next screen once issuance is complete.

Two of the web app screen changes are triggered by MS Authenticator calling the MS Verified ID Service which then calls our API. This is where pushing events from the API to the client, via a subscription, is used to trigger the screen change.

The web app:

  • Creates the issuance request by calling a mutation
  • Displays a QR code (contained in the issuance request response)
  • Subscribes to issuance events (specific to this issuance request)
  • When the issuance started event is received, displays a PIN code for issuance completion
  • When the issuance completed event is received, navigates to 'issuance complete thanks' screen
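
To make the subscription step of that flow concrete, here's a minimal client-side sketch using the graphql-ws client. The endpoint URL, the connectionParams shape, and the issuanceEvent field and its arguments are assumptions (they mirror the server-side examples later in this post); adjust them to your own schema and auth handshake.

import { createClient } from 'graphql-ws'

// Assumed: the client authenticates by passing a JWT via connectionParams (see the authentication section below)
const client = createClient({
  url: 'wss://api.example.com/graphql',
  connectionParams: () => ({ authToken: getAccessToken() }), // getAccessToken is a hypothetical helper
})

const dispose = client.subscribe(
  {
    query: /* GraphQL */ `
      subscription IssuanceEvents($requestId: ID!) {
        issuanceEvent(requestId: $requestId) {
          status
        }
      }
    `,
    variables: { requestId }, // requestId comes from the issuance request mutation response
  },
  {
    next: ({ data }) => handleIssuanceEvent(data), // e.g. show the PIN code, or navigate on completion
    error: (err) => console.error('Subscription error', err),
    complete: () => {},
  },
)

// Call dispose() once the screen no longer needs issuance events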

Why pub/sub?

Why might we need a pub/sub provider, like Redis? Because it offers an easy-to-use and efficient* architecture for broadcasting events to subscribers.

Why do we need to broadcast events at all? Consider the architecture...

A logical representation of the flow of event data received from an external service via a callback endpoint (or web hook).

If we only had one API server instance running, we wouldn't need a pub/sub provider. We could generate an event for every connected client when the callback endpoint is invoked.

In reality, we have multiple API server instances and the flow of event data looks more like this:

A more accurate representation showing multiple clients connected to multiple API server instances through a load balancer. The external service's callback likewise goes through the load balancer and hits just one of those server instances.

How do we distribute data from the callback received on one server to clients subscribed through any server? That's where the pub/sub provider comes in:

All the things are joined together ✅

*The pub/sub architecture is efficient because:

  • If there are no subscribed clients and we publish an event, nothing happens.
  • Subscribed clients might only be interested in certain types of events with specific criteria. With pub/sub providers like Redis, we can optimise publishing and subscribing through careful use of channels (for publishing) and patterns (for subscribing).

Code

Finally, after scrolling all day, some code...

Approach

Our goal was to make the approach for coding subscriptions consistent with queries and mutations.

By that I mean ensuring authn, authz, context, logging and instrumentation, data access, etc. were handled consistently.

Subscriptions are distinct from queries and mutations for a few reasons:

  • For a standard query or mutation request, we use Express middleware for authentication. A WebSocket connection is not handled by Express.
  • The existing GraphQL context function, providing resolver support for queries and mutations with auth context, services, data loaders etc, is not called for subscription resolvers. This is because the existing context function is called by Apollo Server, which itself is called via Express middleware.
  • There is no HTTP request to gather info from. However, there is a request included with the initial connection; see the context code example below.

It was easy to work through these issues; the following steps describe how.

Install dependencies

npm i graphql-ws ws
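
If your server code is in TypeScript (ours is), you'll most likely also want the type definitions for ws:

npm i -D @types/ws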

Create your subscriptions server

The useServer(options, wsServer) function from graphql-ws sets up the server. We'll build this out in subsequent steps. You can read about the available options in the graphql-ws docs.

import { GraphQLSchema } from 'graphql'
import { useServer } from 'graphql-ws/lib/use/ws'
import type { Server } from 'http'
import { WebSocketServer } from 'ws'

export function createSubscriptionServer({
  schema,
  httpServer,
  path = '/graphql',
}: {
  schema: GraphQLSchema
  httpServer: Server
  path?: string
}) {
  const wsServer = new WebSocketServer({
    server: httpServer,
    path,
  })

  return useServer(
    {
      schema,
      onConnect: () => {
        // authn, return true or false to allow or deny connection
        // log connection
      },
      onDisconnect() {
        // log disconnect
      },
      context: () => {
        // return a context
      },
      onOperation() {
        // log operation
      },
      onNext() {
        // log data returned
      },
      onError() {
        // log errors
      },
    },
    wsServer,
  )
}
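
For context, here's a hedged sketch of how createSubscriptionServer might be wired into the rest of the API. The exact Apollo/Express setup is omitted (it depends on your Apollo Server version); the important parts are that the WebSocket server shares the same underlying http.Server and path as the HTTP GraphQL endpoint, and that useServer returns a disposable you should clean up on shutdown.

import { createServer } from 'http'
import express from 'express'
import { schema } from './schema' // your executable GraphQLSchema (path assumed)
import { createSubscriptionServer } from './subscription-server' // the function above (path assumed)

const app = express()
const httpServer = createServer(app)

// ... mount Apollo Server / Express GraphQL middleware on `app` for queries and mutations as usual ...

// Attach the WebSocket subscription server to the same HTTP server and path
const subscriptionServer = createSubscriptionServer({ schema, httpServer })

httpServer.listen(4000)

// On shutdown (e.g. from an Apollo drainServer plugin), dispose of the WebSocket server:
// await subscriptionServer.dispose()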

Configure authentication

The existing authentication on our API verifies and decodes an HTTP Authorization Bearer token using the jsonwebtoken lib.

We can do the same thing for subscriptions via the onConnect callback, but instead of consuming the HTTP Authorization header, we extract the token from the connectionParams supplied by the client.

Once we've validated and decoded the JWT, we'll add the JwtPayload to the server ctx.extra field for subsequent callbacks to use.

onConnect: async (ctx) => {
  // Extract the JWT from the connectionParams supplied by the client
  const token = extractTokenFromConnectionParams(ctx.connectionParams)
  if (!token) return false
  try {
    const user = await verifyJwt(token)
    // Stash the decoded payload on ctx.extra for the context and logging callbacks to use
    ctx.extra.user = user
    return true
  } catch (err) {
    // Invalid token: reject the connection
    return false
  }
}
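
The two helpers used above aren't shown in the post's snippets, so here's one possible shape for them. It assumes the client supplies the token in connectionParams as authToken and that verification uses a shared secret via jsonwebtoken; a real implementation may instead verify against a public key or JWKS endpoint and check issuer/audience claims.

import jwt, { JwtPayload } from 'jsonwebtoken'

// Hypothetical: clients connect with connectionParams like { authToken: '<JWT>' }
export function extractTokenFromConnectionParams(params?: Readonly<Record<string, unknown>>): string | undefined {
  const token = params?.authToken
  return typeof token === 'string' ? token : undefined
}

// Hypothetical: verify and decode the token; swap in your own key material and claim checks
export async function verifyJwt(token: string): Promise<JwtPayload> {
  return jwt.verify(token, process.env.JWT_SECRET!) as JwtPayload
}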

Create GraphQL context

Building a consistent context for subscriptions is important so that our resolvers for types and fields can remain consistent across subscriptions, queries and mutations.

In our case, we generally load up our context with the current user, a logger which logs some useful request metadata (e.g. the request origin), services, data access repos, and data-loader instances.

Here's a simplified example which uses the context callback input to create a GraphQL context, including the user JwtPayload from the onConnect step, plus some extra info, which will be supplied to resolvers.

context: ({ extra: { request, user } }) => ({
  user,
  requestInfo: {
    origin: request.headers.origin,
    clientIp: request.headers['x-forwarded-for'] ?? request.socket.remoteAddress,
  },
}),

This example doesn't use the second and third context args; they're also available in the onOperation callback, where we can log subscription operation info using those same args.

Note: in our full implementation, we define a type for our GraphQL context to ensure it remains the same for both subscription and non-subscription context creation. We use a single context creation function for both HTTP and WebSocket entry points, with a little bit of mapping code in between. We recommend taking that approach to ensure consistency.
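
As a rough sketch of what that shared approach can look like (type and function names here are illustrative, not from our actual codebase): one context type and one factory, called from both the Express/Apollo context function and the graphql-ws context callback, with a little mapping code on each side.

import { JwtPayload } from 'jsonwebtoken'

// One context type shared by HTTP (queries/mutations) and WebSocket (subscriptions) entry points
export type GraphQLContext = {
  user: JwtPayload
  requestInfo: { origin?: string; clientIp?: string }
  // plus logger, services, repos, data-loader instances, etc.
}

// One factory; each entry point maps its own request/connection details into this input shape
export function createGraphQLContext(input: { user: JwtPayload; origin?: string; clientIp?: string }): GraphQLContext {
  return {
    user: input.user,
    requestInfo: { origin: input.origin, clientIp: input.clientIp },
  }
}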

Configure pub/sub engine

For local development, we use an in-memory (non-production) engine from the graphql-subscriptions lib. When deployed to Azure, we use the Redis engine from the graphql-redis-subscriptions lib.

import { RedisPubSub } from 'graphql-redis-subscriptions'
import { PubSub } from 'graphql-subscriptions'

export const pubsub = isLocalDev
  ? new PubSub()
  : new RedisPubSub({ connection: `rediss://:${redisKey}@${redisHost}:6380` })

Note: rediss:// is the TLS protocol and 6380 is the standard Azure Redis port.

Publish an event, subscribe to events

In Redis, 'messages' are published to a 'channel'. Pattern matching can be used when subscribing to allow for more specific subscriptions. Pattern matching doesn't have to be used, though; event data can instead be filtered at the resolver.

By default, objects are JSON serialized and deserialized automatically.

In the example below, requestId forms part of the channel, meaning subscribers will only see events for one issuance operation.

const ISSUANCE_CHANNEL = 'issuance'

export function publishIssuanceEvent(data: IssuanceEventData) {
  pubsub.publish(`${ISSUANCE_CHANNEL}.${data.requestId}`, data)
}

export const subscribeToIssuanceEvents = (args: SubscriptionIssuanceEventArgs) => pubsub.asyncIterator<IssuanceEventData>(`${ISSUANCE_CHANNEL}.${args.requestId}`)

If you wanted to subscribe to any issuance event, you could use pattern matching:

pubsub.asyncIterator<IssuanceEventData>(`${ISSUANCE_CHANNEL}.*`, { pattern: true })
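
To tie this back to the architecture diagrams: the publish side is typically called from the callback (webhook) endpoint that the external service invokes. Here's a hedged sketch, with a hypothetical route and payload shape:

// `app` is the Express app; publishIssuanceEvent is the function defined above
app.post('/api/issuance-callback', express.json(), (req, res) => {
  // The real callback body should be validated and mapped to IssuanceEventData here
  publishIssuanceEvent({ requestId: req.body.requestId, status: req.body.status })
  res.sendStatus(200)
})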

Resolve the subscription

A subscription resolver has two members:

  • subscribe takes the resolver args and returns an AsyncIterable<T>
  • resolve (optional) supports mapping from T in AsyncIterable<T> into whatever you need to return from the subscription

export const resolvers = {
  Subscription: {
    issuanceEvent: {
      subscribe: (parent, args) => subscribeToIssuanceEvents(args),
      resolve: (issuanceEventData, args, context) => context.issuanceRepo.findByRequestId(issuanceEventData.requestId)
    },
  },
};
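
For reference, the schema side of this subscription might look something like the fragment below. The type and argument names simply mirror the resolver example; your actual schema (and the Issuance type returned by the repo lookup) will differ.

// Hypothetical schema fragment matching the resolver above
export const subscriptionTypeDefs = /* GraphQL */ `
  enum IssuanceStatus {
    STARTED
    COMPLETED
  }

  type Subscription {
    issuanceEvent(requestId: ID!, status: IssuanceStatus): Issuance!
  }
`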

Filtering event data

If you need to filter the event data, for example using arguments to your subscription operation, the withFilter helper makes it easy to pair your subscribe function with a filter:

import { withFilter } from 'graphql-subscriptions'

export const resolvers = {
  Subscription: {
    issuanceEvent: {
      subscribe: withFilter(
        (parent, args) => subscribeToIssuanceEvents(args),
        (issuanceEventData, args) => {
          if (args.status && args.status !== issuanceEventData.status) return false
          return true
        },
      ),
      resolve: (issuanceEventData, args, context) => context.issuanceRepo.findByRequestId(issuanceEventData.requestId)
    },
  },
};

Logging

If you wish to log from the various WebSocket server callbacks, here's a simplified example which logs some useful info:

import { print } from 'graphql'

onConnect(ctx) {
  // authn here (see the earlier onConnect example), then log
  console.info('Subscription connection established', { user: ctx.extra.user })
},
onDisconnect({ extra: { user } }) {
  console.info('Subscription connection disconnected', { user })
},
onOperation({ extra: { user } }, _message, { operationName, variableValues, document }, _result) {
  console.info('GraphQL subscription operation', { operationName, query: print(document), variableValues, user })
},
onNext({ extra: { user } }, _message, { operationName, variableValues, document }, result) {
  console.info('GraphQL subscription data', { operationName, query: print(document), variableValues, user, result })
},
onError(_ctx, message, errors) {
  console.error('GraphQL subscription server error', { message, errors })
},

MakerX open source libraries

We've implemented the approach described above in our @makerx/graphql-core lib, which you are welcome to use, along with others we use for our GraphQL APIs such as @makerx/graphql-apollo-server.

Further reading

Use the following references to build your own solution.

Apollo

The Apollo subscriptions documentation is excellent.

GraphQL WS

The Guild's graphql-ws documentation is comprehensive and well worth a read.

GraphQL Subscriptions lib

The graphql-subscriptions lib makes it easy to implement subscriptions across a variety of out-of-the-box event data sources, or lets you write your own PubSubEngine implementation.

Redis

We use Redis for caching, including server-side caching, as well as for pub/sub. It is a mature product with comprehensive documentation and examples.

Delivery semantics

Please note that the described implementation of GraphQL subscriptions using Redis pub/sub will give you at-most-once delivery.

If you're looking for at-least-once or exactly-once delivery, carefully consider whether GraphQL subscriptions are the best fit 🕵🏻. You'd need a durable subscription provider, e.g. Redis Streams, Kafka, RabbitMQ, or AWS SNS+SQS.

GraphQL Yoga

GraphQL Yoga supports an alternative subscription implementation over HTTP Server-Sent Events (SSE), in addition to WebSockets via graphql-ws.

GraphQL Shield

We love using GraphQL Shield for authorization because it provides a single, clear, declarative view of rules applied across the schema.

However, rules are not currently applied to the subscribe function of subscription resolvers, due to a bug in graphql-middleware which only applies rules to the resolve function. We used a patch (written by someone else) to work around this, and raised a PR based on that patch which we hope gets merged at some point.

MakerX

I'm proud to say I work for MakerX. We have extensive experience developing solutions using GraphQL. If you have an idea that you need help with, talk to us!