Preface

Concurrent programming is crucial in modern software development for improving performance. Threads, the event loop, and the actor model are the most common approaches to it.

Developing correct, efficient, and fault-tolerant concurrent applications is challenging. It involves managing complexities such as error handling, timeouts, retries, race conditions, deadlocks, etc. Mastery of these aspects is a key skill that distinguishes experienced developers from novices.

The Event Loop

JavaScript is single-threaded, which avoids much of the complexity and many of the pitfalls of multi-threaded programming.

The event loop allows JavaScript to handle multiple IO operations asynchronously without blocking the main execution thread.

The only caveat of this approach is that you should never block the main thread: no compute-intensive work, no blocking IO calls. Instead, offload heavy computations to other threads (workers).

Basic Flow of the Event Loop

Synchronous code executes first, and is placed on the call stack.

Asynchronous tasks (e.g., setTimeout, I/O operations) are handed off to the browser's background processes or to libraries like libuv (Node.js) or tokio (Deno).

Once the asynchronous task is complete, its callback function is placed in the callback queue.

The event loop checks if the call stack is empty. If it is, it will pull the next task from the callback queue and place it on the call stack for execution.
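
A minimal sketch of this ordering: even with a 0ms delay, the timer callback runs only after the synchronous code has cleared the call stack.

console.log('first')
setTimeout(() => console.log('third'), 0)
console.log('second')
// output: first, second, third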

Callback

Callbacks are a fundamental concept in asynchronous programming. In JavaScript, callbacks are functions that are passed as arguments to other functions and executed at a later time, typically when an event occurs or a task completes.

setTimeout(() => console.log('boom'), 2000)

In most cases, data is passed to the callback function. In Node.js, by convention, the callback receives an error as its first argument and the result as its second.

> fs.readdir('/', (err, files) => console.log(files))
undefined
> [
  '.VolumeIcon.icns', '.file',
  '.vol',             'Applications',
  'Library',          'System',
  'Users',            'Volumes',
  'bin',              'cores',
  'dev',              'etc',
  'home',             'opt',
  'private',          'sbin',
  'tmp',              'usr',
  'var'
]

In the browser, callbacks are usually used to listen for DOM events or user interactions:

document.body.onclick = ev => console.log(ev)

// or simply
document.body.onclick = console.log

There is a problem with the code above: what if the onclick property gets overwritten by another line of code elsewhere in the codebase? We will get back to this in the next chapter.

Sometimes callbacks can also be synchronous, that is, called immediately (in the same event loop tick) by the function they are passed to.

const print = (amount, formatFn) => console.log(`amount is ${formatFn(amount)}`)

print(1/3, amount => amount.toFixed(2))

Callbacks are the foundation of other, higher-level asynchronous primitives, which we will introduce in the following chapters.

Event

One problem with callbacks is that they are a one-to-one pattern; it gets a little complicated when multiple participants want to be called. But that's still possible:

const listenForClick = (element, callback) => {
  const onclick = element.onclick
  element.onclick = ev => {
    onclick?.(ev) // or: onclick && onclick(ev)
    callback(ev)
  }
}

Every time one needs to listen for the click event, call this function:

listenForClick(document.body, ev => console.log(ev))

What a brilliant solution!

But there are better options, like events:

document.body.addEventListener('click', ev => console.log(ev))

Here the event is 'click', the listener is the function ev => console.log(ev).

You can add multiple listeners, and there won't be any of the conflicts callbacks had.

And even better, you can remove them later.

const log = ev => console.log(ev)
document.body.addEventListener('click', log)
// to remove
document.body.removeEventListener('click', log)

implementing an event dispatcher using callbacks

type Listener<T> = (data?: T) => void

class EventEmitter<T> {
    private listeners: { [event: string]: Array<Listener<T>> } = {}

    on(event: string, listener: Listener<T>) {
        if (!this.listeners[event]) {
            this.listeners[event] = []
        }
        this.listeners[event].push(listener)
    }

    off(event: string, listener: Listener<T>) {
        if (this.listeners[event]) {
            this.listeners[event] = this.listeners[event].filter(x => x !== listener)
        }
    }

    emit(event: string, data?: T) {
        this.listeners[event]?.forEach(listener => listener(data))
    }
}
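
A quick usage sketch of the emitter above:

const emitter = new EventEmitter<string>()
const greet = (name?: string) => console.log(`hello ${name}`)

emitter.on('greet', greet)
emitter.emit('greet', 'world') // hello world
emitter.off('greet', greet)
emitter.emit('greet', 'world') // nothing happens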

Promise

Promises, together with async/await, have become the go-to tools for handling concurrency in modern JavaScript development.

basics

A Promise can be in one of three distinct states:

  • pending: The operation is still ongoing.
  • fulfilled (resolved): The operation has completed successfully.
  • rejected: The operation failed.

consuming a Promise

promise.then(value => console.log(value), err => console.error(err))

const main = async () => {
  try {
    console.log(await promise)
  } catch (err) {
    console.error(err)
  }
}

const inc = x => x + 1

promise.then(inc).then(inc).then(console.log)

creating a Promise

an async function always returns a Promise:

const asyncValue = async <T>(val: T): Promise<T> => val

asyncValue(1).then(console.log)

creating a Promise using the constructor

const delay = (time: number): Promise<void> => {
  return new Promise(resolve => setTimeout(resolve, time))
}

await delay(1000)
// 1 second later

a simplified Promise implementation

type Callback<T=any> = (value: T) => any

function isPromise(value: any): boolean {
  return typeof value?.then === 'function'
}

export class SimplePromise<T> {

  state: 'pending' | 'fulfilled' | 'rejected' = 'pending'
  value: T
  reason: any
  onFulfilled: Array<Callback<T>> = []
  onRejected: Array<Callback> = []

  constructor(executor: (resolve: Callback<T>, reject: Callback) => void) {
    const resolve = (value: T) => {
      if (this.state === 'pending') {
        this.state = 'fulfilled'
        this.value = value
        this.onFulfilled.forEach(cb => cb(value))
      }
    }

    const reject = (reason: any) => {
      if (this.state === 'pending') {
        this.state = 'rejected'
        this.reason = reason
        this.onRejected.forEach(cb => cb(reason))
      }
    }

    try {
      executor(resolve, reject);
    } catch (error) {
      reject(error)
    }
  }

  then(onFulfilled?: Callback<T>, onRejected?: Callback) {
    return new SimplePromise((resolve, reject) => {
      const handleFulfilled = (value: T) => {
        if (onFulfilled) {
          try {
            const result = onFulfilled(value)
            isPromise(result) ? result.then(resolve, reject) : resolve(result)
          } catch (error) {
            reject(error)
          }
        } else {
          resolve(value)
        }
      }

      const handleRejected = (reason: any) => {
        if (onRejected) {
          try {
            const result = onRejected(reason)
            isPromise(result) ? result.then(resolve, reject) : resolve(result)
          } catch (error) {
            reject(error)
          }
        } else {
          reject(reason)
        }
      }

      if (this.state === 'fulfilled') {
        handleFulfilled(this.value)
      } else if (this.state === 'rejected') {
        handleRejected(this.reason)
      } else if (this.state === 'pending') {
        this.onFulfilled.push(handleFulfilled)
        this.onRejected.push(handleRejected)
      }
    })
  }

  catch(onRejected: Callback) {
    return this.then(undefined, onRejected)
  }
}

Coroutine

Coroutines are implemented using generator functions. A generator function allows you to pause the execution of a function at any point and later resume it from where it left off. This is done using the yield keyword.

function* myGenerator() {
  console.log("Start")
  yield  // Pause till next() is called
  console.log("Resumed")
  yield  // Pause again
  console.log("End")
}

const co = myGenerator()
co.next() // Start
co.next() // Resumed
co.next() // End

receive values from generator functions

function* myGenerator() {
  yield 1
  yield 2
  yield 3
}

const co = myGenerator()
console.log(co.next())
console.log(co.next())
console.log(co.next())
console.log(co.next())

Expected output:

{ value: 1, done: false }
{ value: 2, done: false }
{ value: 3, done: false }
{ value: undefined, done: true }

send values to generator functions

function* myGenerator() {
  console.log(yield 1)
  console.log(yield 2)
}

const co = myGenerator()
const val1 = co.next().value
const val2 = co.next(val1 * 2).value
co.next(val2 * 2)

Expected output:

2
4

what about yielding functions

function* myGenerator() {
  yield (cb) => setTimeout(cb, 1000)
  console.log('done')
}

const co = myGenerator()
const fn = co.next().value
fn(() => co.next())

We just eliminated callback hell!

make it more useful

A simple coroutine runner:

function run(generator) {
  const co = generator()

  function next(value) {
    const { value: fn, done } = co.next(value)
    done || fn((err, result) => next([err, result]))
  }

  next()
}

We can use it like this:

const delay = time => cb =>
  setTimeout(cb, time)

const asyncDouble = value => cb =>
  setTimeout(() => cb(null, value * 2), 1000)

function* myGenerator() {
  yield delay(1000)
  console.log('1 second passed')
  const [_, value] = yield asyncDouble(2)
  console.log(value)
}

run(myGenerator)

you can do more

What about yielding multiple functions?

What about yielding another generator?

What about yielding promises?
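
For promises, the callback-based run from above needs only a small tweak. A sketch (runPromises is an illustrative name), assuming every yielded value is a Promise:

function runPromises(generator) {
  const co = generator()

  function next(value) {
    const { value: promise, done } = co.next(value)
    if (!done) {
      // resume the generator with the resolved value
      promise.then(next)
    }
  }

  next()
}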

That is just async/await:

async function fetchData() {
  const response = await fetch('https://pokeapi.co/api/v2/pokemon/ditto')
  const data = await response.json()
  console.log(data)
}

AbortSignal

An async operation should be abortable. In Go, this is achieved with context.Context; in JavaScript, it can be achieved with AbortSignal.

Abort on timeout

fetch(url, {signal: AbortSignal.timeout(1000)})

Supporting abort signals in your own function:

function myOp({signal}) {
  return new Promise((resolve, reject) => {
    if (signal.aborted) {
      reject(signal.reason)
      return
    }
    signal.addEventListener('abort', () => reject(signal.reason))
    // ... start the actual work here and resolve() when it finishes
  })
}

myOp({signal: AbortSignal.timeout(1000)})

Explicit Abort

AbortController allows you to abort explicitly:

const controller = new AbortController()

fetch('https://github.com', {signal: controller.signal}).catch(console.log)

controller.abort()

Combining multiple signals

like Promise.race()

fetch(url, {signal: AbortSignal.any([signal, signal2])})

Creating an already-aborted signal

like Promise.reject()

AbortSignal.abort(reason)

Concurrency Patterns

waiting for multiple Promises to settle

all settled:

await Promise.allSettled([p1, p2, p3])

console.log(await Promise.allSettled([
  new Promise(resolve => setTimeout(resolve)),
  "value",
  Promise.reject(new Error("error")),
]))

Expected output:

[
  { status: "fulfilled", value: undefined },
  { status: "fulfilled", value: "value" },
  {
    status: "rejected",
    reason: Error: error
  }
]

all fulfilled or first rejected:

await Promise.all([p1, p2, p3])

first fulfilled, reject only when all rejected:

await Promise.any([p1, p2, p3])

first settled:

await Promise.race([p1, p2, p3])

limiting concurrency

runWithConcurrencyLimit executes an array of asynchronous tasks with a specified concurrency limit, ensuring that no more than the given number of tasks run at once, and returns the results once all tasks have completed.

function runWithConcurrencyLimit<T>(
  tasks: Array<() => Promise<T>>,
  concurrency: number
): Promise<Array<T>> {
  return new Promise((resolve, reject) => {
    if (tasks.length === 0) {
      resolve([])
      return
    }

    const results: Array<T> = []
    let currentIndex = 0
    let completed = 0

    const next = () => {
      const index = currentIndex++
      tasks[index]().then(result => {
        results[index] = result
        if (++completed >= tasks.length) {
          resolve(results)
          return
        }
        if (currentIndex < tasks.length) {
          next()
        }
      }, reject)
    }

    for (let i = 0; i < Math.min(concurrency, tasks.length); i++) {
      next()
    }
  })
}
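
A usage sketch, assuming each task resolves after a short delay; at most two tasks are in flight at any moment:

const tasks = [1, 2, 3, 4, 5].map(n => () =>
  new Promise<number>(resolve => setTimeout(() => resolve(n * 2), 100))
)

runWithConcurrencyLimit(tasks, 2).then(console.log) // [2, 4, 6, 8, 10]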

Stream

Streams represent a sequence of data that can be processed incrementally.

Streams let you process data as it arrives, reducing latency and avoiding overwhelming memory. Think of handling a large file upload: with streams, you can start uploading the file to object storage (OSS) while the browser is still sending it.

Node.js has had stream support for a long time. Streams are now also a web standard, but the standard API is not the same as Node's. Let's take a look at the standard streams.

ReadableStream

Readable streams are sources of data that can be read from. Examples include files and network responses.

const stream = new ReadableStream({
  start(controller) {
    controller.enqueue('Hello')
    controller.enqueue('world!')
    controller.close()
  }
})

const reader = stream.getReader()

while (true) {
  const {done, value} = await reader.read()
  if (done) {
    break
  }
  console.log(value)
}

A ReadableStream is also async iterable:

for await (const chunk of stream) {
  console.log(chunk)
}

WritableStream

Writable streams are destinations for data to be written to. Examples include files and network requests.

const readableStream = new ReadableStream({
  start(controller) {
    ['Hello', 'world'].forEach(chunk => controller.enqueue(chunk))
    controller.close()
  }
})

const writableStream = new WritableStream({
  write(chunk) {
    console.log(chunk) 
  }
})

readableStream.pipeTo(writableStream)

TransformStream

Transform streams allow you to modify data as it flows through a stream.

const readableStream = new ReadableStream({
  start(controller) {
    controller.enqueue('hello')
    controller.enqueue('world')
    controller.close()
  }
})

const uppercaseTransformStream = new TransformStream({
  async transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase())
  }
})

const transformedStream = readableStream.pipeThrough(uppercaseTransformStream)

for await (const chunk of transformedStream) {
  console.log(chunk)
}

Cluster

Node.js's Cluster API allows you to improve the performance of your Node.js applications by distributing incoming requests across multiple processes.

import cluster from 'node:cluster'
import http from 'node:http'
import os from 'node:os'

if (cluster.isPrimary) {
  console.log(`Primary ${process.pid} is running`)

  for (let i = 0; i < os.cpus().length; i++) {
    cluster.fork()
  }

  cluster.on('exit', (worker, code, signal) => {
    console.log(`Worker ${worker.process.pid} died`)
    cluster.fork()
  })
} else {
  http.createServer((req, res) => {
    res.writeHead(200)
    res.end(`hello from ${process.pid}`)
  }).listen(3000)
  console.log(`Worker ${process.pid} started`)
}

The child processes aren't really listening on the same port. Incoming socket connections to the primary process are delegated to the child processes.

Web Workers

Workers are the way to run JavaScript off the main thread. This is useful for running compute-heavy code without blocking it.

Workers are a standard, supported by browsers as well as server-side runtimes like Deno. In Node.js, there is the worker_threads module.
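
A minimal worker_threads sketch, assuming the file is an ES module that spawns itself as the worker:

import { Worker, isMainThread, parentPort } from 'node:worker_threads'

if (isMainThread) {
  // main thread: spawn this same file as a worker
  const worker = new Worker(new URL(import.meta.url))
  worker.on('message', msg => console.log(msg)) // pong
} else {
  // worker thread
  parentPort.postMessage('pong')
}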

An Example

Add a worker.ts:

function findPrime(max: number) {
  for (let i = max; i >= 2; i--) { 
    let isPrime = true
    for (let j = 2; j * j <= i; j ++) { 
      if (i % j === 0) {
        isPrime = false
        break 
      }
    }
    if (isPrime) return i
  }
}

self.onmessage = function(event) {
  self.postMessage(findPrime(event.data))
}

Here is how you can invoke it from the main thread:

const worker = new Worker(
  new URL("./worker.ts", import.meta.url).href,
  {type: "module"}
)

worker.postMessage(1_000_000_000_000)

worker.onmessage = function(event) {
  console.log(event.data)
}

$ deno run --allow-read main.ts
999999999989

A Callable Worker

In the previous example, it's hard to get the result of the computation back to the caller. Here is a solution:

class CallableWorker {
  private worker: Worker
  private id = 0
  private calls = new Map()

  constructor(url: string) {
    this.worker = new Worker(url, {type: "module"})
    this.worker.onmessage = (event) => {
      const [id, err, result] = event.data
      const [resolve, reject] = this.calls.get(id)
      err ? reject(err) : resolve(result)
      this.calls.delete(id)
    }
  }

  call(method: string, args: any[]) {
    const id = ++this.id
    this.worker.postMessage([id, method, args]);
    return new Promise((resolve, reject) => {
      this.calls.set(id, [resolve, reject])
    })
  }
}

const worker = new CallableWorker(
  new URL("./worker.ts", import.meta.url).href
)

console.log(await worker.call("findPrime", [1_000_000_000_000]))

In worker.ts:

function findPrime(max: number) {
  for (let i = max; i >= 2; i--) { 
    let isPrime = true
    for (let j = 2; j * j <= i; j ++) { 
      if (i % j === 0) {
        isPrime = false
        break 
      }
    }
    if (isPrime) return i
  }
}

const methods = {
  findPrime
}

self.onmessage = async function(event) {
  const [id, method, args] = event.data
  if (!methods[method]) {
    self.postMessage([id, new Error(`Method not defined: ${method}`)])
    return
  }
  let result, err
  try {
    result = await methods[method](...args)
  } catch (e) {
    err = e
  }
  self.postMessage([id, err, result])
}

Goroutine

Golang was designed with concurrency as a first-class feature. Concurrency is achieved through goroutines and channels. This model is simple yet powerful.

Goroutines are lightweight threads managed by the Go runtime. They are automatically multiplexed onto OS threads as needed, so there is no need to create and manage OS threads manually.

package main

import (
	"fmt"
	"time"
)

func main() {
	go fmt.Print(1)
	go fmt.Print(2)
	fmt.Print(3)
	time.Sleep(time.Millisecond)
}

The output will usually be 312 or 321; the goroutines run concurrently with main, so the exact interleaving is not guaranteed.

WaitGroup

In Node.js, the process exits when there is no more scheduled work. In Go, the program exits when the main function returns.

In the previous example, there is a time.Sleep() at the end of the main function. Without it, main would return immediately and the goroutines would never get a chance to run.

WaitGroup can be used to wait for multiple goroutines to finish.

package main

import (
	"fmt"
	"sync"
)

func main() {
	wg := sync.WaitGroup{}
	wg.Add(2)

	go func() {
		fmt.Print(1)
		wg.Done()
	}()

	go func() {
		fmt.Print(2)
		wg.Done()
	}()

	fmt.Print(3)
	wg.Wait()
}

Channel

Channels are used to communicate between goroutines. They are such an effective abstraction that the Clojure community reimplemented them as core.async.

send and receive

package main

import (
	"fmt"
)

func main() {
	ch := make(chan int)

	go func() {
		fmt.Println("sending 1")
		ch <- 1
		fmt.Println("sending 2")
		ch <- 2
	}()

	fmt.Println("        receiving 1")
	fmt.Println("        received", <-ch, "receiving 2")
	fmt.Println("        received", <-ch)
}

Output:

sending 1
        receiving 1
        received 1 receiving 2
sending 2
        received 2

Or:

        receiving 1
sending 1
sending 2
        received 1 receiving 2
        received 2

buffered channel

Send and receive are both blocking operations, unless the channel is buffered: a buffered channel accepts sends without blocking until it is full.

package main

import (
	"fmt"
)

func main() {
	ch := make(chan int, 1)
	ch <- 1
	fmt.Print(<-ch)
}

close a channel

A channel should be closed when no more values will be sent to it:

package main

import (
	"fmt"
)

func main() {
	ch := make(chan int)

	go func() {
		ch <- 1
		close(ch)
	}()

	val, ok := <-ch
	fmt.Println(val, ok)
	val, ok = <-ch
	fmt.Println(val, ok)
}

Output:

1 true
0 false

iterate over a channel

package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan int)

	go func() {
		for i := range 10 {
			time.Sleep(time.Millisecond * 100)
			ch <- i
		}
		close(ch)
	}()

	for val := range ch {
		fmt.Print(val)
	}
}

select

select allows you to wait on multiple channels, similar to Promise.race() in JavaScript.

receive Timeout

package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan string)

	go func() {
		time.Sleep(time.Second * 1)
		ch <- "1"
	}()

	select {
	case msg := <-ch:
		fmt.Println(msg)
	case <-time.After(time.Second * 2):
		fmt.Println("timeout")
	}
}

Mutex

Mutual exclusion locks are a fundamental synchronization primitive used in many programming languages to protect shared resources from concurrent access.

Go offers sync.Mutex; as we will see below, a mutex can also be implemented easily using a buffered channel.

There is also RWMutex which allows multiple readers to access the shared resource concurrently, and ensures that the writer has exclusive access by blocking other readers and writers.
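
A sketch of typical RWMutex usage, guarding a shared map:

package main

import (
	"fmt"
	"sync"
)

var (
	mu   sync.RWMutex
	data = map[string]int{}
)

func read(key string) int {
	mu.RLock() // many readers may hold the lock at the same time
	defer mu.RUnlock()
	return data[key]
}

func write(key string, value int) {
	mu.Lock() // a writer gets exclusive access
	defer mu.Unlock()
	data[key] = value
}

func main() {
	write("hits", 1)
	fmt.Println(read("hits"))
}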

a naive counter

package main

import (
	"fmt"
	"sync"
)

type Counter struct {
	value int64
}

func (c *Counter) Inc() {
	c.value++
}

func (c *Counter) Value() int64 {
	return c.value
}

func main() {
	counter := &Counter{}
	var wg sync.WaitGroup

	inc := func() {
		counter.Inc()
		wg.Done()
	}

	for i := 0; i < 100; i++ {
		wg.Add(3)
		go inc()
		go inc()
		go inc()
	}
	wg.Wait()
	fmt.Println(counter.Value())
}

$ go run counter.go
267

using Mutex

package main

import (
	"fmt"
	"sync"
)

type Counter struct {
	mu    sync.Mutex
	value int64
}

func (c *Counter) Inc() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.value++
}

func (c *Counter) Value() int64 {
	return c.value
}

func main() {
	counter := &Counter{}
	var wg sync.WaitGroup

	inc := func() {
		counter.Inc()
		wg.Done()
	}

	for i := 0; i < 100; i++ {
		wg.Add(3)
		go inc()
		go inc()
		go inc()
	}
	wg.Wait()
	fmt.Println(counter.Value())
}

$ go run mutex.go
300

implementing Mutex using channel

package main

import (
	"fmt"
	"sync"
)

type Mutex chan struct{}

func NewMutex() Mutex {
	return make(Mutex, 1)
}

func (m Mutex) Lock() {
	m <- struct{}{}
}

func (m Mutex) Unlock() {
	<-m
}

type Counter struct {
	mu    Mutex
	value int64
}

func NewCounter() *Counter {
	return &Counter{mu: NewMutex()}
}

func (c *Counter) Inc() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.value++
}

func (c *Counter) Value() int64 {
	return c.value
}

func main() {
	counter := NewCounter()

	var wg sync.WaitGroup
	inc := func() {
		counter.Inc()
		wg.Done()
	}

	for i := 0; i < 100; i++ {
		wg.Add(3)
		go inc()
		go inc()
		go inc()
	}
	wg.Wait()
	fmt.Println(counter.Value())
}

Atomic Number

For a simple counter, sync/atomic can be used instead of a Mutex.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var d atomic.Uint32
	var wg sync.WaitGroup
	wg.Add(100)

	for range 100 {
		go func() {
			d.Add(1)
			wg.Done()
		}()
	}

	wg.Wait()
	fmt.Println(d.Load())
}

Lock-Free Data Structures

sync/atomic can also be used to implement lock-free data structures.

a lock-free stack

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type Node[T any] struct {
	value T
	next  *Node[T]
}

type Stack[T any] struct {
	head atomic.Pointer[Node[T]]
}

func (s *Stack[T]) Push(value T) {
	node := &Node[T]{value: value}

	for {
		node.next = s.head.Load()
		if s.head.CompareAndSwap(node.next, node) {
			return
		}
	}
}

func (s *Stack[T]) Pop() (T, bool) {
	for {
		head := s.head.Load()
		if head == nil {
			return *new(T), false
		}

		if s.head.CompareAndSwap(head, head.next) {
			return head.value, true
		}
	}
}

func main() {
	stack := &Stack[int]{}

	for i := range 10 {
		stack.Push(i)
	}

	wg := sync.WaitGroup{}
	wg.Add(10)

	for range 10 {
		go func() {
			value, _ := stack.Pop()
			fmt.Printf("%d ", value)
			wg.Done()
		}()
	}

	wg.Wait()
}

Context

The context package provides a mechanism to manage the flow of execution across API boundaries. It can be used for canceling operations, setting deadlines and propagating values.

In Node.js, the rough counterparts are AsyncLocalStorage and AbortSignal.

Go's API is pretty dumb, with no magic: you have to pass the Context down to child function calls explicitly.

It's worth mentioning that the API of the context package takes an immutable approach. When creating a new context with context.WithTimeout, context.WithDeadline, or context.WithValue, a new context is returned, leaving the original context unchanged, which makes the code more predictable.
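
A small sketch of that immutability (the key type here is just an illustration, to avoid key collisions):

package main

import (
	"context"
	"fmt"
)

type key string

func main() {
	ctx := context.Background()
	ctx2 := context.WithValue(ctx, key("user"), "alice")

	fmt.Println(ctx.Value(key("user")))  // <nil>, the original context is unchanged
	fmt.Println(ctx2.Value(key("user"))) // alice
}

And here is cancellation with a timeout: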

package main

import (
	"context"
	"fmt"
	"time"
)

func doWork(ctx context.Context) {
	select {
	case <-ctx.Done():
		fmt.Println("context canceled")
	case <-time.After(time.Second):
		fmt.Println("work done")
	}
}

func foo(ctx context.Context) {
	doWork(ctx)
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
	defer cancel()
	go foo(ctx)
	<-ctx.Done()
}

example: limit http request processing time

package main

import (
	"fmt"
	"net/http"
	"time"
)

func TimeoutMiddleware(timeout time.Duration) func(http.HandlerFunc) http.HandlerFunc {
	return func(next http.HandlerFunc) http.HandlerFunc {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			done := make(chan bool, 1)
			go func() {
				next(w, r)
				done <- true
			}()

			select {
			case <-time.After(timeout):
				http.Error(w, "Request Timeout", http.StatusRequestTimeout)
			case <-done:
			}
		})
	}
}

func Handler(w http.ResponseWriter, r *http.Request) {
	time.Sleep(2 * time.Second)
	fmt.Fprintf(w, "Served on time")
}

func main() {
	http.HandleFunc("GET /timeout/1", TimeoutMiddleware(1*time.Second)(Handler))
	http.HandleFunc("GET /timeout/3", TimeoutMiddleware(3*time.Second)(Handler))
	if err := http.ListenAndServe(":3000", nil); err != nil {
		panic(err)
	}
}

A Brief History

Java has had concurrency support from the beginning.

Java 1 (1996) included the Thread class, the Runnable interface, and the synchronized keyword. These features laid the foundation for multithreading in Java.

Java 5 (2004) introduced the java.util.concurrent package, a comprehensive set of tools for concurrent programming including Future and Callable, Executors, Locks, Semaphore, atomic classes, concurrent collections, and BlockingQueue, making it easier to work with multithreading and reducing the complexity of managing threads manually.

Java 8 (2014): Streams and CompletableFuture. CompletableFuture is currently the primary mechanism for concurrent programming in modern Java.

Java 9 (2017): Reactive Streams (the Flow API).

Java 21 (2023): Virtual Threads. (Java actually had green threads at the beginning, but switched to platform threads in version 1.2 in order to take full advantage of multi-core processors.)

Thread

Threads are lightweight processes that can run concurrently. They can be created by extending the Thread class:

class MyThread extends Thread {
    public void run() {
        System.out.println("My thread is running");
    }
}

public class Main {
    public static void main(String[] args) {
        MyThread thread = new MyThread();
        thread.start();
    }
}

With modern Java you can compile and run it with a single command:

java Main.java

Another way to create a thread is to implement Runnable:

class MyThread implements Runnable {
    public void run() {
        System.out.println("Thread via Runnable");
    }
}

public class Main {
    public static void main(String[] args) {
        Thread thread = new Thread(new MyThread());
        thread.start();
    }
}

It can be further simplified using a lambda:

public class Main {
    public static void main(String[] args) {
        Thread thread = new Thread(() -> {
            System.out.println("Thread as lambda expression");
        });
        thread.start();
    }
}

A Java thread might run on different CPU cores throughout its lifetime.

join()

Unlike a Go program with goroutines, a Java program won't exit until all threads are finished (unless they are daemon threads).
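
For instance, a daemon thread does not keep the JVM alive; a small sketch:

public class Main {
    public static void main(String[] args) {
        Thread daemon = new Thread(() -> {
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("never printed");
        });
        daemon.setDaemon(true); // must be set before start()
        daemon.start();
        System.out.println("Main thread finished"); // the JVM exits here
    }
}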

But if you want to wait for a particular thread to finish, use join():

public class Main {
    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(() -> {
            System.out.println("Thread started");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Thread finished");
        });

        thread.start();

        thread.join();
        System.out.println("Main thread finished");
    }
}

Expected output:

Thread started
Thread finished
Main thread finished

You can also pass a timeout to join():

public class Main {
    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(() -> {
            System.out.println("Thread started");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Thread finished");
        });

        thread.start();

        thread.join(1000);

        System.out.println(thread.isAlive() ? "Thread is alive" : "Thread is dead");

        System.out.println("Main thread finished");
    }
}

Expected output:

Thread started
Thread is alive
Main thread finished
Thread finished

The problem of multithreading

A race condition occurs when multiple threads try to access and modify shared data concurrently in a naive way:

public class Main {
    private static int counter = 0;

    public static void main(String[] args) {

        for (int i = 0; i < 5; i++) {
            Thread thread = new Thread(() -> {
                for (int j = 0; j < 10; j++) {
                    counter++;
                    System.out.println(Thread.currentThread().getName() + " " + counter);
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            thread.start();
        }
    }
}

The final count can be 38, 47, or some other number, because increments are lost when threads interleave.

synchronized

There are several ways to avoid race conditions; one of them is using synchronized methods or blocks. synchronized ensures that only one thread can execute the protected code at a time.

synchronized blocks

public class Main {
    private static int counter = 0;

    public static void main(String[] args) {
        Object lock = new Object();

        for (int i = 0; i < 5; i++) {
            Thread thread = new Thread(() -> {
                synchronized (lock) {
                    for (int j = 0; j < 10; j++) {
                        counter++;
                        System.out.println(
                            Thread.currentThread().getName() + " " + counter);
                        try {
                            Thread.sleep(50);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            thread.start();
        }
    }
}

synchronized methods

This is more efficient than the previous example, because only the critical section is synchronized and the threads can sleep concurrently.

public class Main {
    private static int counter = 0;

    public synchronized static void count() {
        counter++;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            Thread thread = new Thread(() -> {
                for (int j = 0; j < 10; j++) {
                    count();
                    System.out.println(
                        Thread.currentThread().getName() + " " + counter);
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            thread.start();
        }
    }
}

CountDownLatch

CountDownLatch is quite similar to WaitGroup in go. It is used to wait for a set of operations to complete before proceeding.

import java.util.concurrent.CountDownLatch;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(3);
        
        Runnable task = () -> {
            System.out.println(Thread.currentThread().getName() + " is working.");
            latch.countDown();
        };
        
        for (int i = 0; i < 3; i++) {
            new Thread(task).start();
        }

        latch.await();
        System.out.println("All threads are done, proceeding with main thread.");
    }
}

Expected output:

Thread-1 is working.
Thread-0 is working.
Thread-2 is working.
All threads are done, proceeding with main thread.

CyclicBarrier

A CyclicBarrier is like a rendezvous point for threads: all threads need to reach a certain point (the barrier) before any of them can continue.

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.BrokenBarrierException;

public class Main {

    public static void main(String[] args) {

        CyclicBarrier barrier = new CyclicBarrier(3, () -> {
            System.out.println("All threads have reached the barrier");
        });

        Runnable task = () -> {
            String name = Thread.currentThread().getName();
            try {
                System.out.println(name + " is performing task 1.");
                Thread.sleep(1000);
                // signal that the current thread has reached the barrier and wait for the others
                barrier.await();
                System.out.println(name + " has passed the first barrier. starting task 2");
                Thread.sleep(1000);
                barrier.await();
                System.out.println(name + " has passed the second barrier.");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        };

        new Thread(task).start();
        new Thread(task).start();
        new Thread(task).start();
    }
}

Expected output:

Thread-0 is performing task 1.
Thread-1 is performing task 1.
Thread-2 is performing task 1.
All threads have reached the barrier
Thread-1 has passed the first barrier. starting task 2
Thread-2 has passed the first barrier. starting task 2
Thread-0 has passed the first barrier. starting task 2
All threads have reached the barrier
Thread-0 has passed the second barrier.
Thread-1 has passed the second barrier.
Thread-2 has passed the second barrier.

Locks

Locks are more sophisticated than synchronized blocks or methods.

ReentrantLock

ReentrantLock is reentrant, meaning that a thread can acquire the same lock multiple times without deadlocking, as the recursive example below shows. Compared to the synchronized keyword, it offers explicit lock() and unlock() calls as well as extras like tryLock(), interruptible acquisition, and optional fairness. It's useful when it's not easy to keep track of whether you've already acquired a lock.

import java.util.concurrent.locks.ReentrantLock;

public class Main {
    
    private static final ReentrantLock lock = new ReentrantLock();
    private static int result;

    public static void fib(int num) {
        System.out.println("lock " + num);
        lock.lock();
        if (num == 1 || num == 2) {
            result += 1;
        } else {
            fib(num - 1);
            fib(num - 2);
        }
        System.out.println("unlock " + num);
        lock.unlock();
    }

    public static void main(String[] args) {
        result = 0;
        fib(5);
        System.out.println(result);
    }
}

Expected output:

lock 5
lock 4
lock 3
lock 2
unlock 2
lock 1
unlock 1
unlock 3
lock 2
unlock 2
unlock 4
lock 3
lock 2
unlock 2
lock 1
unlock 1
unlock 3
unlock 5
5

ReadWriteLock

ReadWriteLock allows multiple threads to acquire the read lock concurrently but ensures that only one thread can write at a time.

ReentrantReadWriteLock is its standard implementation:

ReadWriteLock rwLock = new ReentrantReadWriteLock();
rwLock.readLock().lock();
rwLock.readLock().unlock();
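
A slightly fuller sketch, assuming a map shared between readers and writers:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class Main {
    private static final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private static final Map<String, Integer> data = new HashMap<>();

    static void write(String key, int value) {
        rwLock.writeLock().lock(); // exclusive
        try {
            data.put(key, value);
        } finally {
            rwLock.writeLock().unlock();
        }
    }

    static Integer read(String key) {
        rwLock.readLock().lock(); // shared among readers
        try {
            return data.get(key);
        } finally {
            rwLock.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        write("a", 1);
        System.out.println(read("a"));
    }
}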

StampedLock

StampedLock provides an optimistic read mode in addition to regular read and write locks.

tryOptimisticRead() allows a thread to read data without blocking, and later verify whether the read was invalidated by a concurrent write.

This reduces contention and improves throughput when writes are infrequent.

import java.util.concurrent.locks.StampedLock;

class Counter {
    private int count = 0;
    private final StampedLock lock = new StampedLock();

    public void increment() {
        long stamp = lock.writeLock();
        count++;
        lock.unlockWrite(stamp);
        System.out.println("incremented: " + count);
    }

    public int getCount() {
        long stamp = lock.tryOptimisticRead();
        int currentCount = count;
        if (!lock.validate(stamp)) {
            System.out.println("invalid read, retry");
            stamp = lock.readLock();
            currentCount = count;
            lock.unlockRead(stamp);
        }
        return currentCount;
    }
}

public class Main {

    public static void main(String[] args) {
        Counter counter = new Counter();

        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                counter.increment();
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }).start();

        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                System.out.println("Read Count: " + counter.getCount());
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }).start();
    }
}

Expected output:

incremented: 1
Read Count: 1
incremented: 2
Read Count: 2
incremented: 3
invalid read, retry
Read Count: 3
Read Count: 3
incremented: 4
Read Count: 4
incremented: 5

Semaphore

A Semaphore is like a Lock, except that while a Lock allows only one thread to access a resource at a time, a Semaphore allows up to a configured number of threads to do so.

A Semaphore is more versatile than a Lock. It can be used to limit access to a resource pool, such as a fixed number of database connections, or to limit the number of concurrent tasks.

import java.util.concurrent.Semaphore;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        Semaphore semaphore = new Semaphore(2);

        Runnable task = () -> {
            try {
                String name = Thread.currentThread().getName();

                System.out.println(name + " is trying to acquire a permit...");
                semaphore.acquire();

                System.out.println(name + " has acquired a permit and is working...");
                Thread.sleep(2000);

                semaphore.release();
                System.out.println(name + " has released a permit.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

        new Thread(task).start();
        new Thread(task).start();
        new Thread(task).start();
        new Thread(task).start();
    }
}

Expected output:

Thread-0 is trying to acquire a permit...
Thread-1 is trying to acquire a permit...
Thread-1 has acquired a permit and is working...
Thread-2 is trying to acquire a permit...
Thread-3 is trying to acquire a permit...
Thread-0 has acquired a permit and is working...
Thread-2 has acquired a permit and is working...
Thread-1 has released a permit.
Thread-3 has acquired a permit and is working...
Thread-0 has released a permit.
Thread-2 has released a permit.
Thread-3 has released a permit.

Exchanger

An Exchanger can be used to exchange data between two threads.

import java.util.concurrent.Exchanger;

public class Main {
    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();

        Runnable task = () -> {
            try {
                String name = Thread.currentThread().getName();
                // send out message and wait for message from others
                String response = exchanger.exchange("hello from " + name);
                System.out.println(name + " received: " + response);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

        new Thread(task).start();
        new Thread(task).start();
    }
}

Expected output:

Thread-0 received: hello from Thread-1
Thread-1 received: hello from Thread-0

BlockingQueue

A BlockingQueue is similar to a buffered channel in Go.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class Main {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);

        // producer
        new Thread(() -> {
            try {
                for (int i = 1; i <= 6; i++) {
                    queue.put(i);
                    System.out.println("< queued: " + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();

        // consumer
        new Thread(() -> {
            try {
                while (true) {
                    Integer item = queue.take();
                    Thread.sleep(500);
                    System.out.println("> consumed: " + item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
    }
}

Expected output:

< queued: 1
< queued: 2
< queued: 3
< queued: 4
> consumed: 1
< queued: 5
> consumed: 2
< queued: 6
> consumed: 3
> consumed: 4
> consumed: 5
> consumed: 6

Atomic Values

Atomic values use compare-and-swap (CAS) techniques to ensure that operations on variables are atomic: an operation either fully succeeds or fully fails.

Atomic values are high-performance, thread-safe alternatives to other synchronization mechanisms like locks.

import java.util.concurrent.atomic.AtomicInteger;

public class Main {

    public static void main(String[] args) throws InterruptedException {

        final AtomicInteger counter = new AtomicInteger(0);

        Thread[] threads = new Thread[10];
        for (int i = 0; i < 10; i++) {
            threads[i] = new Thread(counter::incrementAndGet); // atomic increment
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join(); // wait for all threads before reading the result
        }

        System.out.println(counter.get()); // always 10
    }
}

Executor

Executors make it easier to manage thread execution. Instead of managing threads manually, you submit tasks to an Executor, and the tasks run concurrently within a pool of threads.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);

        executor.submit(() -> {
            for (int i = 0; i < 10; i++) {
                System.out.println(Thread.currentThread().getName() + " " + i);
            }
        });

        executor.submit(() -> {
            for (int i = 0; i < 10; i++) {
                System.out.println(Thread.currentThread().getName() + " " + i);
            }
        });

        executor.shutdown();
    }
}

The output should look like the following:

pool-1-thread-1 0
pool-1-thread-1 1
pool-1-thread-1 2
pool-1-thread-1 3
pool-1-thread-1 4
pool-1-thread-1 5
pool-1-thread-2 0
pool-1-thread-1 6
pool-1-thread-2 1
pool-1-thread-2 2
pool-1-thread-2 3
pool-1-thread-2 4
pool-1-thread-2 5
pool-1-thread-2 6
pool-1-thread-2 7
pool-1-thread-2 8
pool-1-thread-2 9
pool-1-thread-1 7
pool-1-thread-1 8
pool-1-thread-1 9

Submitting more tasks than there are threads:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);

        Runnable task = () -> {
            for (int i = 0; i < 5; i++) {
                System.out.println(Thread.currentThread().getName() + " " + i);
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        executor.submit(task);
        executor.submit(task);
        executor.submit(task);

        executor.shutdown();
    }
}

Still, only two threads run at the same time:

pool-1-thread-2 0
pool-1-thread-1 0
pool-1-thread-1 1
pool-1-thread-2 1
pool-1-thread-2 2
pool-1-thread-1 2
pool-1-thread-2 3
pool-1-thread-1 3
pool-1-thread-1 4
pool-1-thread-2 4
pool-1-thread-2 0
pool-1-thread-2 1
pool-1-thread-2 2
pool-1-thread-2 3
pool-1-thread-2 4

Virtual Threads

Using virtual threads via Executors.newVirtualThreadPerTaskExecutor():

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        Instant startTime = Instant.now(); 
        // ExecutorService executor = Executors.newCachedThreadPool();
        ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();

        AtomicLong memUsage = new AtomicLong(0L);

        for (int i = 0; i < 100_000; i++) {
            executor.submit(() -> {
                try {
                    Thread.sleep(100);
                    Long usage = (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / 1024 / 1024;
                    if (memUsage.get() < usage) {
                        memUsage.set(usage);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        executor.shutdown();
        executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);

        System.out.println("Execution time: " + Duration.between(startTime, Instant.now()) + " Memory: " + memUsage.get());
    }
}

It's obvious that virtual threads are much more efficient than platform threads (first line: newCachedThreadPool, second line: the virtual thread executor):

Execution time: PT4.35937S Memory: 1876
Execution time: PT0.410156S Memory: 203

Future

Futures provide a higher-level abstraction that simplifies asynchronous programming, very similar to Promises in JavaScript.

A Future is usually used with an Executor:

import java.util.concurrent.Future;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutionException;

public class Main {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        
        Future<Integer> future = executor.submit(() -> {
            Thread.sleep(1000);
            return 1;
        });
        
        System.out.println(future.get());
        executor.shutdown();
    }
}

It's also possible to create a Future manually using FutureTask:

import java.util.concurrent.FutureTask;
import java.util.concurrent.ExecutionException;

public class Main {
    public static void main(String[] args) throws InterruptedException, ExecutionException {

        FutureTask<Integer> futureTask = new FutureTask<>(() -> {
            Thread.sleep(1000);
            return 1;
        });

        new Thread(futureTask).start();

        System.out.println(futureTask.get());
    }
}

CompletableFuture

CompletableFuture provides a more composable and flexible way to chain and combine Futures. You can use methods like thenApply, thenCombine, etc., to chain operations.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            return 1;
        });

        CompletableFuture<Integer> result = future.thenApply((num) -> {
            return num * 2;
        });

        System.out.println(result.get());
    }
}
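
thenCombine merges the results of two independent futures; a small sketch:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 1);
        CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> 2);

        // combine both results once both futures complete
        System.out.println(a.thenCombine(b, Integer::sum).get()); // 3
    }
}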

Handling exceptions

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            if (Math.random() > 0.5) {
                throw new RuntimeException("Boom!");
            }
            return 1;
        });

        future
            .exceptionally(e -> {
                System.out.println(e.getMessage());
                return 0;
            })
            .thenAccept(result -> System.out.println("result: " + result))
            .join(); // block until the chain completes, so main doesn't exit early
    }
}

Structured Concurrency

Structured concurrency puts multiple tasks into a scope. If one task fails, the scope automatically cancels the other tasks within it.

import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.ExecutionException;

void main() throws InterruptedException, ExecutionException {

    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {

        var task1 = scope.fork(() -> {
            Thread.sleep(2000);
            System.out.println("Task 1 Completed");
            return 1;
        });

        var task2 = scope.fork(() -> {
            Thread.sleep(1000);
            return 1 / 0;
        });

        var task3 = scope.fork(() -> {
            Thread.sleep(500);
            System.out.println("Task 3 Completed");
            return null;
        });

        scope.join().throwIfFailed();
    } catch (Exception e) {
        System.err.println("Exception while executing tasks: " + e.getMessage());
    }
}

It's still in preview as of Java 23, so you need to pass the --enable-preview flag:

java --enable-preview Main.java

Expected output:

Task 3 Completed
Exception while executing tasks: java.lang.ArithmeticException: / by zero

OTP

OTP (Open Telecom Platform) is a collection of tools and libraries for building fault-tolerant, scalable and concurrent systems in Erlang.

It was originally developed at Ericsson, a telecom company.

Elixir runs on the Erlang VM (BEAM) and can fully leverage OTP as well.

The Actor Model

Like Erlang, Elixir uses the Actor Model. In the Actor Model, actors are the fundamental units of computation.

An actor is an independent, self-contained entity with its own state. There is no shared state between the actors.

An actor can change its internal state and can send messages to and receive messages from other actors.

An actor can also create new actors.

Because actors work independently and interact only by passing messages, this approach makes it much easier to write highly concurrent and fault-tolerant systems.

Process

Elixir processes follow the Actor Model.

Spawn a process

Use spawn to create a new process; it returns a Process Identifier (PID):

defmodule Adder do
  def add(a, b) do
    IO.puts(a + b)
  end
end

iex> pid = spawn(Adder, :add, [1, 1])
2
#PID<0.110.0>

Receiving and sending messages

Use send to send a message to a process:

defmodule Foo do
  def listen do
    receive do
      msg -> IO.puts("received #{msg}")
    end
  end
end

iex> pid = spawn(Foo, :listen, [])
#PID<0.117.0>
iex> send(pid, "hello")
received hello

But if you send a message to the process again, nothing happens: the process is dead.

iex> Process.alive?(pid)
false

To fix this, we need to call listen recursively:

defmodule Foo do
  def listen do
    receive do
      msg -> IO.puts("received #{msg}")
    end
    listen()
  end
end

Linked Process

If you need to make sure a process is spawned successfully, use spawn_link instead of spawn: it links the newly created process to the current one, and two linked processes receive exit signals from one another.

defmodule Foo do
  def boom do
    exit(:boom)
  end
end

pid = spawn_link(Foo, :boom, [])
** (EXIT from #PID<0.104.0>) shell process exited with reason: :boom

By default, the exit signal will crash the linked process.

Trap the exit signal

Use Process.flag(:trap_exit, true) to trap exit signals:

defmodule Foo do

  def boom do
    exit(:boom)
  end

  def run do
    Process.flag(:trap_exit, true)

    spawn_link(Foo, :boom, [])

    receive do
      {:EXIT, _pid, reason} -> IO.puts("Exit reason: #{reason}")
    end
  end
end

iex> Foo.run
Exit reason: boom

Monitoring

Another way is to use spawn_monitor, which delivers a :DOWN message when the spawned process dies:

defmodule Foo do
  def boom do
    exit(:boom)
  end

  def run do
    spawn_monitor(Foo, :boom, [])

    receive do
      {:DOWN, _ref, :process, _from_pid, reason} -> IO.puts("Exit reason: #{reason}")
    end
  end
end

Foo.run

Agent

Agent is used to manage shared state. It is an abstraction over low-level process management.

Actors are entities that encapsulate state and communicate by passing messages. Agents are often seen as "facilitators" or "intermediaries" for managing state and behavior in a controlled way.

The term "agent" is widely used in concurrent and distributed programming models. Other languages like Clojure also use similar terminology.

{:ok, agent} = Agent.start(fn -> 0 end)
Agent.get(agent, fn state -> state end)        # 0
Agent.update(agent, fn state -> state + 1 end) # :ok
Agent.get(agent, fn state -> state end)        # 1

Task

Task provides a way to run jobs in the background and get their results later, similar to a Future or Promise in other languages.

task = Task.async(fn -> 
  :timer.sleep(1000)
  :ok
end)

result = Task.await(task)
IO.puts("Task result: #{result}")

Stream

Unlike Enum, which eagerly evaluates a collection, Stream defers execution until it's absolutely needed — ideal for working with large datasets as it is more efficient and memory-friendly.

lazy enumerables

Stream.iterate(1, fn x -> x + 1 end)
|> Stream.filter(fn x -> rem(x, 2) == 0 end)
|> Enum.take(4)
# => [2, 4, 6, 8]

jsonl processing

{"id": 1, "name": "Alice", "role": "developer"}
{"id": 2, "name": "Bob", "role": "designer"}
{"id": 3, "name": "Charlie", "role": "manager"}
{"id": 4, "name": "Diana", "role": "developer"}
{"id": 5, "name": "Eve", "role": "intern"}
...

File.stream!("data.jsonl")
|> Stream.map(&JSON.decode/1)
|> Stream.map(fn {:ok, record} -> record end)
|> Enum.take(1)
# => [%{"id" => 1, "name" => "Alice", "role" => "developer"}]

GenServer

One of the most widely used OTP abstractions is GenServer. A GenServer (generic server) provides a standardized interface for handling sync and async requests and managing state.

defmodule Counter do
  use GenServer

  def init(initial_value) do
    {:ok, initial_value}
  end

  # sync calls
  def handle_call(:get_count, _from, state) do
    {:reply, state, state}
  end

  # async calls
  def handle_cast(:incr, state) do
    {:noreply, state + 1}
  end

  def handle_cast(:decr, state) do
    {:noreply, state - 1}
  end
end

Use the Counter:

GenServer.start_link(Counter, 0, name: Counter)
GenServer.cast(Counter, :incr)
GenServer.call(Counter, :get_count) # 1
GenServer.cast(Counter, :decr)
GenServer.call(Counter, :get_count) # 0

Add helper functions

defmodule Counter do
  use GenServer
  
  def init(initial_value) do
    {:ok, initial_value}
  end

  # sync calls
  def handle_call(:get_count, _from, state) do
    {:reply, state, state}
  end

  # async calls
  def handle_cast(:incr, state) do
    {:noreply, state + 1}
  end

  def handle_cast(:decr, state) do
    {:noreply, state - 1}
  end

  # helper functions
  def incr, do: GenServer.cast(__MODULE__, :incr)
  def decr, do: GenServer.cast(__MODULE__, :decr)
  def get_count, do: GenServer.call(__MODULE__, :get_count)

  def start_link(initial_value) do
    GenServer.start_link(__MODULE__, initial_value, name: __MODULE__)
  end
end

Now we have a nicer API:

Counter.start_link(0)
Counter.incr
Counter.get_count
Counter.decr
Counter.get_count

GenServers are a higher-level abstraction than processes and can be used with Supervisors.

Supervisor

Supervisors are a fundamental concept in OTP. They monitor and manage the lifecycle of processes, restarting them if they fail.

Create an application using mix:

mix new counter --sup

lib/counter/application.ex

defmodule Counter.Application do

  use Application

  def start(_type, _args) do
    children = [
      {Counter, 0}
    ]

    opts = [strategy: :one_for_one, name: Counter.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Put the counter code into lib/counter.ex:

defmodule Counter do
  use GenServer
  
  def init(initial_value) do
    {:ok, initial_value}
  end

  # sync calls
  def handle_call(:get_count, _from, state) do
    {:reply, state, state}
  end

  # async calls
  def handle_cast(:incr, state) do
    {:noreply, state + 1}
  end

  # helper functions
  def incr, do: GenServer.cast(__MODULE__, :incr)
  def get_count, do: GenServer.call(__MODULE__, :get_count)

  def start_link(initial_value) do
    GenServer.start_link(__MODULE__, initial_value, name: __MODULE__)
  end
end

iex -S mix
iex(1)> Counter.get_count
0
iex(2)> Counter.incr
:ok
iex(3)> Counter.get_count
1
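
To see the supervisor in action, kill the Counter process; the supervisor restarts it with its initial state. A sketch of what the session would look like (assuming the restart completes before the next call, which it normally does):

iex(4)> Process.whereis(Counter) |> Process.exit(:kill)
true
iex(5)> Counter.get_count
0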

Distributed Systems

OTP's support for distributed systems is something that sets Erlang and Elixir apart from other programming languages.

The BEAM VM is designed from the ground up to support distributed computing. With minimal effort, you can have multiple Erlang/Elixir nodes running on different machines that seamlessly communicate with each other.

Those nodes can form a cluster out of the box. Nodes can automatically connect to each other, share state, and communicate as if they were a single system, even if they're running on different machines or across data centers.

In most other languages, setting up a distributed system usually requires external libraries, frameworks, or infrastructure to provide essential services, including:

  • Service Discovery
  • RPC
  • Load balancing
  • Auto-scaling
  • Self-healing
  • Tracing

For example, start two named nodes running the Counter app from the previous section:

iex --sname foo@localhost -S mix
iex --sname bar@localhost -S mix

Then call across nodes; the first reference to :foo@localhost connects the two automatically:

iex(bar@localhost)1> Node.spawn_link(:foo@localhost, fn -> Counter.incr end)
iex(foo@localhost)1> Counter.get_count
1
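
The RPC item in the list above, for instance, is covered by Erlang's built-in :rpc module, assuming the same two connected nodes:

iex(bar@localhost)2> :rpc.call(:foo@localhost, Counter, :get_count, [])
1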

Threads

Because of the GIL (Global Interpreter Lock), threads in Ruby are not very useful for CPU-bound tasks, but they can still be used for IO-bound operations.

thread = Thread.new do
  sleep 1
  puts "Hello"
end

thread.join

Concurrent IO

require 'net/http'
require 'uri'

urls = [
  "https://developer.mozilla.org/",
  "https://elixir-lang.org/",
]

threads = urls.map do |url|
  Thread.new do
    response = Net::HTTP.get_response(URI.parse(url))
    puts "#{response.code} #{url}"
  end
end

threads.each(&:join)

Threads are hard. The standard library provides tools like Mutex and Queue for synchronization and data sharing, as sketched below. But you don't have to work with threads directly; there are better options.
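
A minimal sketch of protecting a shared counter with Mutex; without the synchronize block, concurrent increments could interleave and lose updates:

counter = 0
mutex = Mutex.new

threads = 5.times.map do
  Thread.new do
    1_000.times do
      mutex.synchronize { counter += 1 } # only one thread mutates at a time
    end
  end
end

threads.each(&:join)
puts counter # 5000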

Fiber

Fiber allows you to pause and resume the execution of code explicitly, and can be used to implement generators and coroutines.

yield and resume

fib = Fiber.new do
  puts "2. fib can be paused using yield"
  Fiber.yield
  puts "4. fib resumed again"
end

puts "1. fib are paused when created, resume to start"
fib.resume
puts "3. paused fib can be resumed"
fib.resume
puts "5. done"

yield and resume with values

fib = Fiber.new do
  puts Fiber.yield('1. yielded value')
  '3. done'
end

puts fib.resume
puts fib.resume('2. resumed value')
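
As noted above, fibers can implement generators; a minimal sketch of an infinite Fibonacci generator that computes values only on demand:

fib_gen = Fiber.new do
  a, b = 0, 1
  loop do
    Fiber.yield a # hand the next value to the caller and pause
    a, b = b, a + b
  end
end

puts 5.times.map { fib_gen.resume }.inspect # [0, 1, 1, 2, 3]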

Non-Blocking Fibers

Non-blocking fibers were added in Ruby 3.0. When a fiber scheduler is installed with Fiber.set_scheduler, fibers created with Fiber.schedule yield to the scheduler on blocking operations instead of blocking the whole thread.

The async gem is built on top of non-blocking fibers.
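
A minimal sketch, assuming async 2.x (which exposes its scheduler as Async::Scheduler):

require 'async'

Fiber.set_scheduler(Async::Scheduler.new) # install a scheduler for this thread

Fiber.schedule do
  sleep 1 # yields to the scheduler instead of blocking the thread
  puts "one"
end

Fiber.schedule do
  sleep 1
  puts "two"
end

# both fibers finish after about 1 second in total, not 2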

Async

Async is a gem, not a language feature.

bundle add async

Async and wait

A top-level Async block is needed to create a reactor context that does the scheduling; otherwise the first puts wouldn't execute until the task had finished.

require 'async'

Async do
  task = Async do
    sleep 1
    rand
  end
  
  puts "wait for 1 second"
  puts task.wait
end

Concurrency

require 'async'

Async do
  tasks = (1..5).map do |x|
    Async do
      sleep 1
      x
    end
  end
  puts tasks.map(&:wait)
end

Limiting Concurrency Level

require 'async'
require 'async/semaphore'

Async do
  semaphore = Async::Semaphore.new(2)

  (1..5).each do |x|
    semaphore.async do
      puts "#{x} started"
      sleep 1
      puts "         #{x} finished"
    end
  end
end

Waiting for tasks

Barrier can be used to wait for a group of tasks, but you can also simply do tasks.each(&:wait).

Waiter can be used to wait for a specific number of tasks out of a group, kind of like Promise.race in JS or select in Go, but more flexible.

Barrier and Waiter can be used together.

require 'async'
require 'async/waiter'
require 'async/barrier'

barrier = Async::Barrier.new
waiter = Async::Waiter.new(parent: barrier)

Async do
  [3, 2, 1].each do |x|
    waiter.async do
      sleep x
      x
    end
  end

  puts waiter.wait(2).inspect

  # stop the remaining jobs
  barrier.stop
end

This should print:

[1, 2]

Timeouts

require 'async'

Async do |task|
  task.with_timeout(0.5) do
    sleep rand # sleeps between 0 and 1 second, so this times out about half the time
  rescue Async::TimeoutError
    puts "timeout!"
  end
end

Ractor

Ractor is experimental, and its behavior may change in future versions of Ruby. There are also known implementation issues.

Async is a solution for concurrency; Ractor is for parallelism. Because Ractors don't share mutable state, they can run in parallel on multiple cores.

ractors = (1..6).map do |x|
  Ractor.new(x) do |id|
    result = (1..1_000_000_000).reduce(:+)
    Ractor.yield([id, result])
  end
end

puts ractors.map(&:take).sort_by(&:first).map(&:last).inspect

Message Passing

Ractor also follows the actor model (the name is a blend of "Ruby" and "actor"). Each actor maintains internal state and communicates using messages.

actor = Ractor.new do
  state = 0
  loop do
    message = Ractor.receive
    case message
    when :increment
      state += 1
      Ractor.yield(state)
    when :get_state
      Ractor.yield(state)
    when :exit
      break
    end
  end
end

actor.send(:increment)
puts actor.take
actor.send(:get_state)
puts actor.take
actor.send(:exit)

Thread

Like other scripting languages with a GIL, Python's threads are good for IO-bound tasks, but not ideal for CPU-bound tasks.

import threading
import time

def task(id):
    time.sleep(1)
    print(id)

threads = [threading.Thread(target=task, args=(i,)) for i in range(5)]

for thread in threads:
    thread.start()

for thread in threads:
    thread.join()

Output (the order is nondeterministic):

3
2
1
4
0

asyncio

Use the async/await syntax to declare coroutines, then use asyncio.run() to run them.

import asyncio

async def main():
    await asyncio.sleep(1)
    print('done')

asyncio.run(main())

Wait for multiple tasks to complete

Use asyncio.gather:

import asyncio

async def task(id):
    await asyncio.sleep(1)
    print(id)
    return id

async def main():
    tasks = [task(i) for i in range(5)]

    results = await asyncio.gather(*tasks)
    print(results)

asyncio.run(main())

Output:

0
1
2
3
4
[0, 1, 2, 3, 4]

TaskGroup

TaskGroup (added in Python 3.11) works like WaitGroup or Barrier in other languages, but you don't have to wait explicitly: all tasks are awaited when the async with block exits.

import asyncio
import random

async def task(id):
    await asyncio.sleep(random.random())
    print(id)
    return id

async def main():

    async with asyncio.TaskGroup() as g:
        task1 = g.create_task(task(1))
        task2 = g.create_task(task(2))
    print(f"done: {task1.result()}, {task2.result()}")

asyncio.run(main())

Handling exceptions

asyncio.gather works much like Promise.all: if an exception is raised, it is immediately propagated to the task that awaits gather(), while the other tasks continue to run.
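
A minimal sketch of this behavior (boom and slow are made-up helpers):

import asyncio

async def boom():
    raise RuntimeError("boom")

async def slow():
    await asyncio.sleep(1)
    return "slow finished"

async def main():
    try:
        await asyncio.gather(boom(), slow())
    except RuntimeError as e:
        print(f"caught: {e}")  # propagated immediately
    await asyncio.sleep(1.1)  # slow() is still running in the background

asyncio.run(main())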

asyncio.gather also accepts the keyword argument return_exceptions; when set to True, it works more like Promise.allSettled: exceptions are returned in the results list alongside successful values.

import asyncio

async def task(id):
    await asyncio.sleep(1)
    return 1 / id  # raises ZeroDivisionError when id == 0

async def main():
    tasks = [task(i) for i in range(5)]

    results = await asyncio.gather(*tasks, return_exceptions=True)
    print(results)

asyncio.run(main())

Output:

[ZeroDivisionError('division by zero'), 1.0, 0.5, 0.3333333333333333, 0.25]

Wait for the first task

asyncio.wait gives you more control than asyncio.gather:

import asyncio
import random

async def task(id):
    await asyncio.sleep(random.random())
    print(id)
    return id

async def main():
    # must be a list of Tasks, not coroutines
    tasks = [asyncio.create_task(task(i)) for i in range(5)]

    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

    print("done", [x.result() for x in done])
    print("pending tasks:", len(pending))
    for x in pending:
        x.cancel()

asyncio.run(main())

Output (one possible run):

2
done [2]
pending tasks: 4

Timeout

asyncio.timeout (added in Python 3.11) cancels the running task when the timeout expires and raises TimeoutError.

import asyncio
import random

async def task():
    await asyncio.sleep(random.random())
    print("done")

async def main():
    try:
        async with asyncio.timeout(0.5):
            await task()
    except TimeoutError:
        print("Timed out")

asyncio.run(main())
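
On Pythons older than 3.11, which lack asyncio.timeout, asyncio.wait_for gives the same cancel-on-timeout behavior; a sketch reusing the task above:

import asyncio
import random

async def task():
    await asyncio.sleep(random.random())
    print("done")

async def main():
    try:
        # wait_for cancels the inner task and raises asyncio.TimeoutError
        await asyncio.wait_for(task(), timeout=0.5)
    except asyncio.TimeoutError:
        print("Timed out")

asyncio.run(main())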