Apr 19th, 2019 - written by Kimserey.

In our previous post on mutability, we discussed how time has to be taken into consideration as soon as mutability is involved. We observed an object's lifetime instant by instant, computing and assigning the object's state at each instant.
Another way to look at time is to consider the state as a discrete set of values. From this perspective, we can see time as an infinite sequence, a *stream*, and the state can also be seen as a stream computed out of time.

Today we will look at how to represent a stream, and in particular how to model an infinite stream together with operations working on it. We will also make a connection with stream implementations in common languages.

A stream is an abstract concept representing a sequence of elements over time. If we think of the delta between each time sample as instantaneous, in other words *zero*, we can implement a stream as a list, where all events in time happen instantly.

But as we know, this isn’t the case. Most events don’t occur instantly at the scale where we normally work. Therefore, to represent the delta, a stream relies on the technique of *delayed evaluation*. Delayed evaluation, also called lazy evaluation, postpones the execution of an expression until its value is needed.

For example, if we want to build a range of integers, we can create an `interval` procedure.

(define (interval low high)
  (if (> low high)
      '()
      (cons low (interval (+ low 1) high))))

Executing `interval` yields a complete list.

> (interval 0 10)
'(0 1 2 3 4 5 6 7 8 9 10)

At each recursion, the arguments of `cons` must be evaluated before `cons` can be applied. Therefore the result of `interval` is the fully computed list of integers from 0 to 10.

One problem with having the list fully computed is that even if we need only a subset of it, we still have to compute the whole list.

For example, if we need to *take* three elements from the list, we define `take`, a procedure accepting an integer `n` and a list.

(define (take n xs)
  (cond [(= n 0) '()]
        [else (cons (car xs) (take (- n 1) (cdr xs)))]))

Using `interval`, we are able to get the first three values of the interval.

> (take 3 (interval 0 20))
'(0 1 2)

As we saw earlier, `(interval 0 20)` needs to be fully evaluated before `take` is applied. We can witness the performance problem by creating an interval over an extremely large range.

> (take 3 (interval 0 10000000000000000000000))
. Interactions disabled; out of memory

Even though the resulting subset is composed of *only* the first three values of the list, the program is unable to complete because executing `interval` runs out of memory.

A stream solves this problem by introducing delay.

If we think of a list as a pair composed of an element plus a *rest*, where the rest points to another pair composed of a value plus another rest, and so on until the last pair, composed of its value plus the empty list, we can represent a list as follows.

> (cons 0 (cons 1 (cons 2 (cons 3 '()))))
'(0 1 2 3)

Here `car` provides the first value and `cdr` the rest.

> (car (cons 0 (cons 1 (cons 2 (cons 3 '())))))
0
> (cdr (cons 0 (cons 1 (cons 2 (cons 3 '())))))
'(1 2 3)

A stream is constructed by introducing a delay in the execution of the *rest*. The delay can be implemented using a lambda: evaluating the lambda only creates a procedure, and that procedure still needs to be called for its body to run. The implementation would look as such:

> (cons 0 (λ () (cons 1 (λ () (cons 2 (λ () (cons 3 'empty-stream)))))))
'(0 . #<procedure>)

Each `cdr` is represented by a lambda delaying the execution, so each element of the list is provided on demand. At each iteration, the next value sits behind a delayed `cdr` which needs to be *forced*.
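To make the difference between evaluating a lambda and executing it concrete, here is a minimal sketch. The `calls` counter and the `delayed` name are hypothetical and only serve to observe when the body runs.

```racket
#lang racket

;; Hypothetical counter, used only to observe when the delayed body runs.
(define calls 0)

;; Evaluating the lambda expression builds a procedure; the body is not run yet.
(define delayed
  (λ ()
    (set! calls (+ calls 1))
    (+ 1 2)))

(define calls-before calls)   ; still 0: nothing has been forced
(define result (delayed))     ; calling the procedure forces the body
(define calls-after calls)    ; now 1

(displayln (list calls-before result calls-after)) ; prints (0 3 1)
```

The body runs once per call, which is why a stream element is only computed when its `cdr` is explicitly forced.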

If we want the first value, we use `car`.

> (car (cons 0 (λ () (cons 1 (λ () (cons 2 (λ () (cons 3 'empty-stream))))))))
0

And if we want the next value, we use `cdr`, which returns a procedure that, once executed, yields a pair containing the next value in `car` position.

> (cdr (cons 0 (λ () (cons 1 (λ () (cons 2 (λ () (cons 3 'empty-stream))))))))
#<procedure>

To execute the procedure, we create `force`, a procedure used to force the execution, together with `stream-car` and `stream-cdr`, the counterparts of `car` and `cdr` for streams.

(define (stream-null? xs)
  (eq? xs 'empty-stream))

(define (force expr) (expr))

(define (stream-car xs) (car xs))

(define (stream-cdr xs)
  (if (stream-null? xs)
      'empty-stream
      (force (cdr xs))))

`stream-cdr` forces the execution at the moment the `cdr` is requested. The symbol `'empty-stream` denotes an empty stream, and is therefore used as the last value to mark the end of a stream.

> (stream-cdr (cons 0 (λ () (cons 1 (λ () (cons 2 (λ () (cons 3 'empty-stream))))))))
'(1 . #<procedure>)
> (stream-cdr (stream-cdr (cons 0 (λ () (cons 1 (λ () (cons 2 (λ () (cons 3 'empty-stream)))))))))
'(2 . #<procedure>)

Going back to our implementation of `interval`, we can introduce delay by wrapping the *rest* of the list in a lambda.

(define (stream-interval low high)
  (if (> low high)
      'empty-stream
      (cons low (λ () (stream-interval (+ low 1) high)))))

When we execute `stream-interval`, we now get a pair composed of a value and the delayed rest of the sequence.

> (stream-interval 0 1000000000)
'(0 . #<procedure:...osts/streams.rkt:11:16>)

And using `stream-cdr`, we can access the next value.

> (stream-cdr (stream-interval 0 1000000000))
'(1 . #<procedure:...osts/streams.rkt:11:16>)

We can then define `take-stream`, the stream version of `take`, which wraps the `cdr` of its `cons` in a lambda and uses `stream-cdr`.

(define (take-stream n xs)
  (cond [(= n 0) 'empty-stream]
        [else (cons (stream-car xs)
                    (λ () (take-stream (- n 1) (stream-cdr xs))))]))

Using `take-stream`, we solve the issue by delaying the execution of the next elements. At each execution of `stream-cdr`, we resolve a new element on demand, until `n` elements have been taken, at which point `'empty-stream` is returned.

> (take-stream 3 (stream-interval 0 10000))
'(0 . #<procedure:...osts/streams.rkt:29:20>)
> (stream-cdr (take-stream 3 (stream-interval 0 10000)))
'(1 . #<procedure:...osts/streams.rkt:29:20>)
> (stream-cdr (stream-cdr (take-stream 3 (stream-interval 0 10000))))
'(2 . #<procedure:...osts/streams.rkt:29:20>)
> (stream-cdr (stream-cdr (stream-cdr (take-stream 3 (stream-interval 0 10000)))))
'empty-stream
> (stream-cdr (stream-cdr (stream-cdr (stream-cdr (take-stream 3 (stream-interval 0 10000))))))
'empty-stream
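
The chain of `stream-cdr` calls above can be automated. As a sketch, the hypothetical helper `collect-stream` below (not part of the post's code) forces every element of a finite stream and gathers the values in a regular list. The other definitions are repeated from the post so the block runs on its own, with `force` inlined as a direct call.

```racket
#lang racket

;; Definitions repeated from the post so the sketch is self-contained.
(define (stream-null? xs) (eq? xs 'empty-stream))
(define (stream-car xs) (car xs))
(define (stream-cdr xs)
  (if (stream-null? xs)
      'empty-stream
      ((cdr xs))))                ; call the thunk stored in the cdr

(define (stream-interval low high)
  (if (> low high)
      'empty-stream
      (cons low (λ () (stream-interval (+ low 1) high)))))

(define (take-stream n xs)
  (cond [(= n 0) 'empty-stream]
        [else (cons (stream-car xs)
                    (λ () (take-stream (- n 1) (stream-cdr xs))))]))

;; Hypothetical helper: walk a finite stream, forcing each cdr in turn,
;; and collect the elements in an ordinary list.
(define (collect-stream xs)
  (if (stream-null? xs)
      '()
      (cons (stream-car xs) (collect-stream (stream-cdr xs)))))

(define first-three (collect-stream (take-stream 3 (stream-interval 0 10000))))
first-three ; '(0 1 2)
```

This helper must only be applied to finite streams, since it keeps forcing until it reaches `'empty-stream`.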

Essentially the whole interval no longer needs to be completely evaluated. If we define a `for-each-stream`

(define (for-each-stream proc xs)
  (if (stream-null? xs)
      'done
      (begin
        (proc (stream-car xs))
        (for-each-stream proc (stream-cdr xs)))))

we can display the stream without incurring any performance issue.

> (for-each-stream displayln (take-stream 3 (stream-interval 0 100000000)))
0
1
2
'done

The stream is iterated only for as many elements as needed by the `take-stream` filter.
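
One way to check this claim is to count how many elements are actually constructed. The sketch below uses a hypothetical `counting-interval`, a copy of `stream-interval` that increments a hypothetical `built` counter each time it builds an element. The remaining definitions are repeated from the post.

```racket
#lang racket

(define (stream-null? xs) (eq? xs 'empty-stream))
(define (stream-car xs) (car xs))
(define (stream-cdr xs)
  (if (stream-null? xs) 'empty-stream ((cdr xs))))

;; Hypothetical counter: number of elements constructed so far.
(define built 0)

;; Same shape as stream-interval, plus a counter update per element.
(define (counting-interval low high)
  (if (> low high)
      'empty-stream
      (begin
        (set! built (+ built 1))
        (cons low (λ () (counting-interval (+ low 1) high))))))

(define (take-stream n xs)
  (cond [(= n 0) 'empty-stream]
        [else (cons (stream-car xs)
                    (λ () (take-stream (- n 1) (stream-cdr xs))))]))

(define (for-each-stream proc xs)
  (if (stream-null? xs)
      'done
      (begin
        (proc (stream-car xs))
        (for-each-stream proc (stream-cdr xs)))))

(for-each-stream displayln (take-stream 3 (counting-interval 0 100000000)))
(displayln built) ; prints 4
```

The counter ends at 4 rather than 3 because `take-stream` evaluates `(stream-cdr xs)` as an argument before it checks whether `n` is 0, so one extra element is built. Either way, the count depends on `n` and not on the size of the interval.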

Implementing streams with delay allows us to implement infinite sequences. For example, instead of having an interval, we can implement a procedure constructing a sequence starting from a particular integer.

(define (integers-starting-from n)
  (cons n (λ () (integers-starting-from (+ n 1)))))

If the `cdr` were not delayed, this procedure would recurse infinitely. But because it is delayed, we can use `stream-cdr` to query the next element as necessary.

> (integers-starting-from 1)
'(1 . #<procedure:...osts/streams.rkt:40:10>)
> (stream-cdr (integers-starting-from 1))
'(2 . #<procedure:...osts/streams.rkt:40:10>)
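
Operations can be layered on top of an infinite stream as long as they stay delayed too. The following is a sketch under that assumption: `map-stream`, `filter-stream` and `first-n` are hypothetical helpers, not part of the post's code, written in the same style as `take-stream`.

```racket
#lang racket

(define (stream-null? xs) (eq? xs 'empty-stream))
(define (stream-car xs) (car xs))
(define (stream-cdr xs)
  (if (stream-null? xs) 'empty-stream ((cdr xs))))

(define (integers-starting-from n)
  (cons n (λ () (integers-starting-from (+ n 1)))))

;; Hypothetical helper: apply f to each element, delaying the rest.
(define (map-stream f xs)
  (if (stream-null? xs)
      'empty-stream
      (cons (f (stream-car xs))
            (λ () (map-stream f (stream-cdr xs))))))

;; Hypothetical helper: keep the elements satisfying keep?, delaying the rest.
;; It skips forward eagerly until the next match, so on an infinite stream
;; it only terminates if a matching element exists.
(define (filter-stream keep? xs)
  (cond [(stream-null? xs) 'empty-stream]
        [(keep? (stream-car xs))
         (cons (stream-car xs)
               (λ () (filter-stream keep? (stream-cdr xs))))]
        [else (filter-stream keep? (stream-cdr xs))]))

;; Hypothetical helper: the first n elements of a stream as a regular list.
(define (first-n n xs)
  (if (or (= n 0) (stream-null? xs))
      '()
      (cons (stream-car xs) (first-n (- n 1) (stream-cdr xs)))))

(define even-squares
  (filter-stream even?
                 (map-stream (λ (x) (* x x))
                             (integers-starting-from 1))))

(first-n 5 even-squares) ; '(4 16 36 64 100)
```

Neither `map-stream` nor `filter-stream` ever walks the whole sequence. Each one produces a single pair and leaves the remainder behind a lambda.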

In our previous post about Fibonacci, we saw that Fibonacci was defined as $f_{n}=f_{n-1} + f_{n-2}$.

We can write Fibonacci in terms of a stream:

(define (fib-stream a b)
  (cons a (λ () (fib-stream b (+ a b)))))

(define fibonacci (fib-stream 0 1))

Similarly to `integers-starting-from`, `fib-stream` recurses without end, providing the next Fibonacci number at each iteration. To appreciate this implementation, we define `stream-ref`, a procedure returning the element of the stream at index `n`.

(define (stream-ref n xs)
  (cond
    [(stream-null? xs) 'empty-stream]
    [(= n 0) (car xs)]
    [else (stream-ref (- n 1) (stream-cdr xs))]))

We can then use `stream-ref` to retrieve a specific Fibonacci number from the infinite sequence of Fibonacci numbers.

> (stream-ref 10 fibonacci)
55
> (stream-ref 300 fibonacci)
222232244629420445529739893461909967206666939096499764990979600

The concept of a stream extends to *data streams*. In programming languages, when talking about streams, we usually refer to data being transferred from one system to another.

For example, reading a file in a program is done via a stream. In .NET C#, that would be via a `FileStream` or a `StreamReader`.

using System;
using System.IO;

// Read the file line by line; each ReadLine call pulls the next line on demand.
using (StreamReader stream = File.OpenText("./file.txt"))
{
    while (!stream.EndOfStream)
    {
        Console.WriteLine(stream.ReadLine());
    }
}

The `StreamReader` implementation provides a way to represent the sequence of data composing a file, which can be consumed in multiple steps. With `./file.txt`, we used line breaks as separators and iterated over the whole content line by line. We could instead have iterated block of bytes by block of bytes, which would have been appropriate if there was no clear way of splitting the data.

The same type of data stream is used for communication over transport layer protocols. In applications, we handle all sorts of transmissions, with data transmitted over time, as streams. And we represent the packets of data composing the overall message as elements of the stream.

The delay method gives the receiver a way to read packets of data on demand rather than forcing the whole transmission at once.
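
To tie this back to the stream representation used earlier, here is a sketch of a data source exposed as a delayed stream. `port->line-stream` is a hypothetical helper, and an in-memory string port stands in for a file or a network connection. Each line is read from the port only when the corresponding `cdr` is forced.

```racket
#lang racket

(define (stream-null? xs) (eq? xs 'empty-stream))
(define (stream-car xs) (car xs))
(define (stream-cdr xs)
  (if (stream-null? xs) 'empty-stream ((cdr xs))))

;; Hypothetical helper: read one line now, and delay reading the rest.
(define (port->line-stream in)
  (let ([line (read-line in)])
    (if (eof-object? line)
        'empty-stream
        (cons line (λ () (port->line-stream in))))))

;; An in-memory port stands in for a file or a socket.
(define in (open-input-string "first\nsecond\nthird"))

(define lines (port->line-stream in))   ; only "first" has been read
(define after-first (stream-cdr lines)) ; forcing reads "second"

(stream-car lines)       ; "first"
(stream-car after-first) ; "second"
```

Because the port is stateful, this stream is single-pass: forcing the same `cdr` a second time would read further into the port instead of returning the same element.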

Today we looked into what streams are. We started by looking at how delay can be introduced and saw how time can be modeled with streams. We then looked into creating infinite streams and at one example where they can be used. Lastly, we made the link between the concept and its implementation in common programming languages. I hope you liked this post, and I'll see you in the next one!