Friday, December 1, 2023

Understanding Swift Concurrency’s AsyncStream – Donny Wals


In an earlier post, I wrote about different ways that you can bridge your existing asynchronous code over to Swift’s new Concurrency system that leverages async / await. The mechanisms shown there work great for code that produces a single result that can be modeled as a single value.

However, in some cases this isn’t possible because your existing code will provide multiple values over time. This is the case for things like download progress, the user’s current location, and other similar situations.

Generally speaking, these kinds of patterns would be modeled as AsyncSequence objects that you can iterate over using an asynchronous for loop. A basic example of this would be the lines property on URL:

let url = URL(string: "https://donnywals.com")!

for try await line in url.lines {
    // use line
}

But what’s the best way to build your own async sequences? Implementing the AsyncSequence protocol and building your own AsyncIterator sounds tedious and error-prone. Luckily, there’s no reason for you to be doing any of that.

In this post, I will show you how you can leverage Swift’s AsyncStream to build custom async sequences that produce values whenever you need them to.

Producing a simple async stream

An async stream can be produced in various ways. The easiest way to create an async stream is to use the AsyncStream(unfolding:) initializer. Its usage looks a bit as follows:

let stream = AsyncStream(unfolding: {
    return Int.random(in: 0..<Int.max)
})

Of course, this example isn’t particularly useful on its own but it does show how simple the concept of AsyncStream(unfolding:) is. We use this version of AsyncStream whenever we can produce and return values for our async stream. The closure that’s passed to unfolding is async, which means that we can await asynchronous operations from within our unfolding closure. Your unfolding closure will be called every time you’re expected to begin producing a value for your stream. In practice this means that your closure will be called, you perform some work, you return a value, and then your closure is called again. This repeats until the for loop is cancelled, the task that contains your async for loop is cancelled, or until you return nil from your unfolding closure.
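To make the "return nil to end the stream" rule concrete, here is a minimal sketch of a countdown built with AsyncStream(unfolding:). The Countdown actor, makeCountdown(from:) and collectCountdown(from:) are hypothetical names made up for this illustration; the actor is only there so the mutable counter can be safely captured by the closure.

```swift
// Hypothetical helper: holds the mutable state for the countdown.
actor Countdown {
    private var remaining: Int

    init(start: Int) {
        remaining = start
    }

    // Returns the next number, or nil once the countdown has reached zero.
    func next() -> Int? {
        guard remaining > 0 else { return nil }
        defer { remaining -= 1 }
        return remaining
    }
}

// Hypothetical helper: wraps the actor in an unfolding-based stream.
func makeCountdown(from start: Int) -> AsyncStream<Int> {
    let countdown = Countdown(start: start)
    return AsyncStream<Int>(unfolding: {
        // Called once for every value the consumer asks for.
        // Returning nil ends the stream.
        await countdown.next()
    })
}

// Hypothetical helper: drains the stream into an array.
func collectCountdown(from start: Int) async -> [Int] {
    var values: [Int] = []
    for await value in makeCountdown(from: start) {
        values.append(value)
    }
    return values
}
```

Because the closure returns nil after the last number, the for loop in collectCountdown(from:) ends on its own and the function can return.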

The AsyncStream(unfolding:) way to produce a stream of values is quite convenient but it’s particularly useful in situations where:

  • You want to perform async work that needs to be awaited to produce elements
  • You have a need to handle back pressure when bridging an API you own

If you’re bridging an existing API that’s based on delegates or for APIs that leverage callbacks to communicate results, you probably won’t be able to use AsyncStream(unfolding:). While it’s the simplest and least error-prone way to build an async stream, it’s also the way that I’ve found to be most limiting and it doesn’t often fit well with bridging existing code over to Swift Concurrency.

More flexibility can be found in the continuation based API for AsyncStream.
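Both points in the list above can be sketched with a small example. The PageLoader actor and firstPages(_:from:) function are hypothetical and only simulate a slow API with Task.sleep; the idea is that the unfolding closure awaits its work, and that it only runs when the consuming loop asks for another element.

```swift
// Hypothetical API that we own: loads one "page" per call.
actor PageLoader {
    private(set) var requests = 0

    func loadNextPage() async -> Int? {
        requests += 1
        let page = requests
        // Simulate slow asynchronous work (assumption: 1ms is enough for a demo).
        try? await Task.sleep(nanoseconds: 1_000_000)
        return page
    }
}

// Hypothetical helper: reads `count` pages and then stops iterating.
func firstPages(_ count: Int, from loader: PageLoader) async -> [Int] {
    let stream = AsyncStream<Int>(unfolding: {
        // Only invoked when the for loop below requests the next element.
        await loader.loadNextPage()
    })

    var pages: [Int] = []
    for await page in stream {
        pages.append(page)
        if pages.count == count { break }
    }
    return pages
}
```

After asking for two pages, the loader should have received two requests and no more, since nothing is produced ahead of the consumer.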

Producing an async stream with a continuation

When an asynchronous closure doesn’t quite fit your use case for creating your own async stream, a continuation based approach might be a much better solution for you. With a continuation you have the ability to construct an async stream object and send values over the async stream whenever values become available.

We can do this by creating an AsyncStream using the AsyncStream(build:) initializer:

let stream2 = AsyncStream { cont in
    cont.yield(Int.random(in: 0..<Int.max))
}

The example above creates an AsyncStream that produces a single integer value. This value is produced by calling yield on the continuation. Every time we have a value to send, we should call yield on the continuation with the value that we want to send.

If we’re building an AsyncStream that wraps a delegate based API, we can hold on to our continuation in the delegate object and call yield every time a relevant delegate method is called.

For example, we could call continuation.yield from within a CLLocationManagerDelegate whenever a new user location is made available to us:
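As a small sketch of calling yield more than once, the hypothetical makeNumbers() below sends three values and then calls finish() on the continuation. Without that call, a for loop over the stream would keep waiting for more values after the third one.

```swift
// Hypothetical helper: a stream that yields 1, 2, 3 and then completes.
func makeNumbers() -> AsyncStream<Int> {
    AsyncStream<Int> { continuation in
        for number in 1...3 {
            // Values are buffered until a consumer picks them up.
            continuation.yield(number)
        }
        // Signal that no more values will be produced.
        continuation.finish()
    }
}

// Hypothetical helper: drains the stream into an array.
func collectNumbers() async -> [Int] {
    var numbers: [Int] = []
    for await number in makeNumbers() {
        numbers.append(number)
    }
    return numbers
}
```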

import CoreLocation

class AsyncLocationStream: NSObject, CLLocationManagerDelegate {
    // The continuation is stored when the stream is first accessed.
    lazy var stream: AsyncStream<CLLocation> = {
        AsyncStream { (continuation: AsyncStream<CLLocation>.Continuation) -> Void in
            self.continuation = continuation
        }
    }()
    var continuation: AsyncStream<CLLocation>.Continuation?

    func locationManager(_ manager: CLLocationManager, didUpdateLocations locations: [CLLocation]) {
        // Forward every location we receive to the stream.
        for location in locations {
            continuation?.yield(location)
        }
    }
}

The example above is a very naive starting point for creating an async stream of user locations. There are a couple of things we don’t fully take into account, such as cancelling and starting location observation or asking for location permissions.

At its core though, this example is a great starting point for experimenting with async streams.

Note that this approach will not wait for consumers of your async stream to consume a value fully before you can send your next value down the stream. Instead, all values that you send will be buffered in your async stream by default, which may or may not be what you want.

In practical terms this means that when you send values down your stream faster than the consuming for loop can process these values, you will end up with a buffer filled with values that will be delivered to the consuming for loop with a delay. This might be exactly what you need, but if the values you send are somewhat time sensitive and ephemeral it could make sense to drop values if the consuming for loop isn’t ready to receive values.

We could decide that we never want to hold on to more than 1 location and that we only want to buffer the last known location to avoid processing stale data. We can do this by setting a buffering policy on our async stream:

lazy var stream: AsyncStream<CLLocation> = {
    AsyncStream(bufferingPolicy: .bufferingNewest(1)) { (continuation: AsyncStream<CLLocation>.Continuation) -> Void in
        self.continuation = continuation
    }
}()

This code passes a bufferingPolicy of .bufferingNewest(1) to our AsyncStream. This means that we will only buffer a single value if the consuming for loop isn’t processing items fast enough, and we’ll discard older values in favor of keeping only the latest location.

If our stream comes to a natural close, you can call finish() on your continuation to end the stream of values.

If your stream might fail with an error, you can also choose to create an AsyncThrowingStream instead of an AsyncStream. The key difference is that consumers of a throwing stream must await new values using try await instead of just await. To make your stream throw an error you can either call finish(throwing:) on your continuation or you can call yield(with:) using a Result object that represents a failure.

While the basics of building an AsyncStream aren’t particularly complex, we do need to think carefully about how we manage the lifecycles of the things we create. This matters especially because we’re not supposed to make our continuations outlive our streams, which is an easy mistake to make when you’re bridging existing delegate based code.
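The effect of a buffering policy is easiest to see without Core Location. In the sketch below, the hypothetical bufferedValues(policy:) yields five integers before anybody is consuming the stream, finishes the stream, and then collects whatever is left in the buffer.

```swift
// Hypothetical helper: yields 1...5 up front, then reports what a late consumer sees.
func bufferedValues(
    policy: AsyncStream<Int>.Continuation.BufferingPolicy
) async -> [Int] {
    let stream = AsyncStream(Int.self, bufferingPolicy: policy) { continuation in
        // No consumer is iterating yet, so every value goes through the buffer.
        for value in 1...5 {
            continuation.yield(value)
        }
        continuation.finish()
    }

    var received: [Int] = []
    for await value in stream {
        received.append(value)
    }
    return received
}
```

With .bufferingNewest(1) only the last value (5) survives, with .bufferingOldest(1) only the first value (1) is kept, and with the default .unbounded policy all five values are delivered.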
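A throwing stream could be sketched as follows. DownloadError, makeProgressStream() and consumeProgress() are hypothetical names for this illustration; the stream yields two progress values and then fails through finish(throwing:).

```swift
// Hypothetical error type for the example.
struct DownloadError: Error {}

// Hypothetical helper: reports some progress and then fails.
func makeProgressStream() -> AsyncThrowingStream<Double, Error> {
    AsyncThrowingStream<Double, Error> { continuation in
        continuation.yield(0.25)
        continuation.yield(0.5)
        // Ends the stream by throwing into the consuming loop.
        continuation.finish(throwing: DownloadError())
    }
}

// Hypothetical helper: collects progress values and records whether the stream failed.
func consumeProgress() async -> (values: [Double], failed: Bool) {
    var values: [Double] = []
    do {
        // A throwing stream must be iterated with `try await`.
        for try await progress in makeProgressStream() {
            values.append(progress)
        }
        return (values, false)
    } catch {
        return (values, true)
    }
}
```

The values that were yielded before the failure are still delivered; the error is thrown once the consumer has caught up with them.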

Managing your stream’s lifecycle

There are essentially two ways for an async stream to end. First, the stream might naturally stop producing values because no further values will be produced. You will call finish on your continuation and you can perform any cleanup that you need to do at the same time. For example, you could set the continuation that you’re holding on to to nil to make sure you can’t accidentally use it anymore.

Alternatively, your stream can end because the task that’s used to run your async stream is cancelled. Consider the following:

let locations = AsyncLocationStream()

let task = Task {
    for await location in locations.stream {
        print(location)
    }
}

task.cancel()

When something like the above happens, we will want to make sure that we don’t call yield on our continuation anymore unless we start a new stream with a new, active, continuation.

We can detect and respond to the end of our stream by setting an onTermination handler on our continuation:

self.continuation?.onTermination = { result in
    print(result)
    self.continuation = nil
}

Ideally we set this handler immediately when we first create our async stream.
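One way to set the handler as early as possible is to install it inside the build closure itself. The sketch below is an assumption-laden illustration rather than a drop-in for the location example: TerminationLog and makeLoggedStream(log:) are hypothetical, and the log only exists so the reason for termination can be inspected afterwards.

```swift
import Foundation

// Hypothetical thread-safe log, used only to observe why a stream ended.
final class TerminationLog: @unchecked Sendable {
    private let lock = NSLock()
    private var entries: [String] = []

    func record(_ entry: String) {
        lock.lock()
        entries.append(entry)
        lock.unlock()
    }

    var all: [String] {
        lock.lock()
        defer { lock.unlock() }
        return entries
    }
}

// Hypothetical helper: creates a stream whose termination is recorded in `log`.
func makeLoggedStream(
    log: TerminationLog
) -> (stream: AsyncStream<Int>, continuation: AsyncStream<Int>.Continuation) {
    var captured: AsyncStream<Int>.Continuation?
    let stream = AsyncStream<Int> { continuation in
        // Install the handler before anything else can use the continuation.
        continuation.onTermination = { termination in
            switch termination {
            case .finished: log.record("finished")
            case .cancelled: log.record("cancelled")
            @unknown default: log.record("unknown")
            }
        }
        captured = continuation
    }
    // The build closure runs synchronously, so `captured` is set at this point.
    return (stream, captured!)
}
```

Calling finish() on the returned continuation should record "finished", while cancelling a task that iterates the stream should record "cancelled".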

In addition to the stream being cancelled or otherwise going out of scope, we could break out of our loop, which will eventually cause our task to finish. Generally speaking, this is not something that will end your async stream, so if you want breaking out of your loop to end your stream, you will need to take this into account yourself.

Personally, I’ve found that the easiest way to make sure you do some cleanup is to have some method on your stream producing object to cancel the stream instead of just breaking out of an async for loop. That way, you can perform cleanup and not have a stream that’s sending values even though nobody is listening.

It’s also important to keep in mind that the pattern I showed earlier will only work if one consumer uses your location stream object. You cannot have multiple for loops iterating over a single stream in Swift Concurrency because by default, async sequences lack the ability to share their iterations with multiple loops.
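A rough sketch of such a method is shown below. NumberSource, send(_:) and stop() are hypothetical names, and the class is not thread-safe; it assumes that values are sent and the stream is stopped from a single context.

```swift
// Hypothetical producer that owns both the stream and its continuation.
final class NumberSource {
    let stream: AsyncStream<Int>
    private var continuation: AsyncStream<Int>.Continuation?

    init() {
        var captured: AsyncStream<Int>.Continuation?
        // The build closure runs synchronously, so `captured` is set right away.
        stream = AsyncStream<Int> { captured = $0 }
        continuation = captured
    }

    func send(_ value: Int) {
        // Does nothing once stop() has been called.
        continuation?.yield(value)
    }

    func stop() {
        // End the stream and drop the continuation so it can't be used again.
        continuation?.finish()
        continuation = nil
    }
}
```

A consumer that wants to stop listening would call stop() on the source instead of only breaking out of its loop, so the producer knows it no longer needs to send values.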

In Summary

In this post, you learned a lot about async streams and how you can produce your own async sequences. First, you saw the unfolding approach of building an async stream and you learned that this approach is relatively straightforward but might not be very useful for people that need to bridge existing delegate or callback based APIs.

After exploring unfolding for a bit, we took a look at the build closure for async streams. You learned that this approach leverages a continuation object that can be called to produce values if and when needed.

You saw a very rudimentary example of an object that would bridge a CLLocationManager into async await, and you learned a bit about correctly managing your continuations to prevent sending values into an already completed stream.

If you have any questions or comments for me about this post, please feel free to reach out on Twitter or on Mastodon.
