Preface
Concurrent programming is crucial for performance in modern software development. Threads, the event loop, and the actor model are the most common approaches to concurrent programming.
Developing correct, efficient, and fault-tolerant concurrent applications is challenging. It involves managing complexities such as error handling, timeouts, retries, race conditions, deadlocks, etc. Mastery of these aspects is a key skill that distinguishes experienced developers from novices.
The Event Loop
JavaScript is single-threaded, which avoids much of the complexity and many of the pitfalls of multi-threaded programming.
The event loop allows JavaScript to handle multiple IO operations asynchronously without blocking the main execution thread.
The only caveat of this approach is that you should never block the main thread: no compute-intensive work or blocking IO calls. Instead, offload heavy computations to other threads (workers).
Basic Flow of the Event Loop
- Synchronous code executes first and is placed on the call stack.
- Asynchronous tasks (e.g., setTimeout, I/O operations) are handed off to the browser's background processes or to libraries like libuv (Node.js) or tokio (Deno).
- Once an asynchronous task completes, its callback function is placed in the callback queue.
- The event loop checks whether the call stack is empty. If it is, it pulls the next task from the callback queue and places it on the call stack for execution.
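Here is a minimal example of this ordering:

console.log('first') // synchronous, runs immediately
setTimeout(() => console.log('third'), 0) // queued, runs once the call stack is empty
console.log('second') // still synchronous
// Output: first, second, third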
Callback
Callbacks are a fundamental concept in asynchronous programming. In JavaScript callbacks are functions that are passed as arguments to other functions and are executed at a later time, typically when a certain event occurs or a task completes.
setTimeout(() => console.log('boom'), 2000)
In most cases, data is passed to the callback function. In Node.js, by convention, an error and a result are passed to the callback:
> fs.readdir('/', (err, files) => console.log(files))
undefined
> [
'.VolumeIcon.icns', '.file',
'.vol', 'Applications',
'Library', 'System',
'Users', 'Volumes',
'bin', 'cores',
'dev', 'etc',
'home', 'opt',
'private', 'sbin',
'tmp', 'usr',
'var'
]
In the browser, callbacks are commonly used to listen for DOM events or user interactions:
document.body.onclick = ev => console.log(ev)
// or simply
document.body.onclick = console.log
There is a problem with the code above: what if the onclick property gets overwritten by another line of code somewhere in the code base? We will get back to this in the next chapter.
Sometimes callbacks can also be synchronous. That is, they get called immediately (in the same event loop tick) after being passed to another function:
const print = (amount, formatFn) => console.log(`amount is ${formatFn(amount)}`)
print(1/3, amount => amount.toFixed(2))
Callbacks are the foundation of other higher-level asynchronous primitives. We will introduce them in the following chapters.
Event
One problem with callbacks is that they are a one-to-one pattern; it gets a little complicated when multiple participants want to be called. But it's still possible:
const listenForClick = (element, callback) => {
const onclick = element.onclick
element.onclick = ev => {
onclick?.(ev) // or: onclick && onclick(ev)
callback(ev)
}
}
Every time one needs to listen for the click event, call this function:
listenForClick(document.body, ev => console.log(ev))
What a brilliant solution!
But there are better ones, like events:
document.body.addEventListener('click', ev => console.log(ev))
Here the event is 'click', the listener is the function ev => console.log(ev).
You can add multiple listeners, and there won't be any conflicts like callbacks had.
And even better, you can remove them later:
const log = ev => console.log(ev)
document.body.addEventListener('click', log)
// to remove
document.body.removeEventListener('click', log)
implementing an event dispatcher using callbacks
type Listener<T> = (data?: T) => void
class EventEmitter<T> {
private listeners: { [event: string]: Array<Listener<T>> } = {}
on(event: string, listener: Listener<T>) {
if (!this.listeners[event]) {
this.listeners[event] = []
}
this.listeners[event].push(listener)
}
off(event: string, listener: Listener<T>) {
if (this.listeners[event]) {
this.listeners[event] = this.listeners[event].filter(x => x !== listener)
}
}
emit(event: string, data?: T) {
this.listeners[event]?.forEach(listener => listener(data))
}
}
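A quick usage sketch of this EventEmitter (the 'greet' event is just an example name):

const emitter = new EventEmitter<string>()
const greet = (name?: string) => console.log(`hello ${name}`)
emitter.on('greet', greet)
emitter.emit('greet', 'world') // hello world
emitter.off('greet', greet)
emitter.emit('greet', 'world') // nothing happens, the listener was removed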
Promise
Promises, together with async/await, have become the go-to tools for handling concurrency in modern JavaScript development.
basics
A Promise can be in one of three distinct states:
- pending: The operation is still ongoing.
- fulfilled (resolved): The operation has completed successfully.
- rejected: The operation failed.
consuming a Promise
promise.then(value => console.log(value), err => console.error(err))
const main = async () => {
try {
console.log(await promise)
} catch (err) {
console.error(err)
}
}
const inc = x => x + 1
promise.then(inc).then(inc).then(console.log)
creating a Promise
an async function always returns a Promise
const asyncValue = async <T>(val: T): Promise<T> => val
asyncValue(1).then(console.log)
creating a Promise using the constructor
const delay = (time: number): Promise<void> => {
return new Promise(resolve => setTimeout(resolve, time))
}
await delay(1000)
// 1 second later
a simplified Promise implementation
type Callback<T=any> = (value: T) => any
function isPromise(value: any): boolean {
return typeof value?.then === 'function'
}
export class SimplePromise<T> {
state: 'pending' | 'fulfilled' | 'rejected' = 'pending'
value: T
reason: any
onFulfilled: Array<Callback<T>> = []
onRejected: Array<Callback> = []
constructor(executor: (resolve: Callback<T>, reject: Callback) => void) {
const resolve = (value: T) => {
if (this.state === 'pending') {
this.state = 'fulfilled'
this.value = value
this.onFulfilled.forEach(cb => cb(value))
}
}
const reject = (reason: any) => {
if (this.state === 'pending') {
this.state = 'rejected'
this.reason = reason
this.onRejected.forEach(cb => cb(reason))
}
}
try {
executor(resolve, reject);
} catch (error) {
reject(error)
}
}
then(onFulfilled?: Callback<T>, onRejected?: Callback) {
return new SimplePromise((resolve, reject) => {
const handleFulfilled = (value: T) => {
if (onFulfilled) {
try {
const result = onFulfilled(value)
isPromise(result) ? result.then(resolve, reject) : resolve(result)
} catch (error) {
reject(error)
}
} else {
resolve(value)
}
}
const handleRejected = (reason: any) => {
if (onRejected) {
try {
const result = onRejected(reason)
isPromise(result) ? result.then(resolve, reject) : resolve(result)
} catch (error) {
reject(error)
}
} else {
reject(reason)
}
}
if (this.state === 'fulfilled') {
handleFulfilled(this.value)
} else if (this.state === 'rejected') {
handleRejected(this.reason)
} else if (this.state === 'pending') {
this.onFulfilled.push(handleFulfilled)
this.onRejected.push(handleRejected)
}
})
}
catch(onRejected: Callback) {
return this.then(undefined, onRejected)
}
}
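A quick sanity check of SimplePromise, mirroring the chaining example from earlier:

const inc = (x: number) => x + 1
new SimplePromise<number>(resolve => setTimeout(() => resolve(1), 100))
  .then(inc)
  .then(console.log) // 2, after roughly 100 ms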
Coroutine
Coroutines are implemented using generator functions. A generator
function allows you to pause the execution of a function at any point
and later resume it from where it left off. This is done using the
yield keyword.
function* myGenerator() {
console.log("Start")
yield // Pause till next() is called
console.log("Resumed")
yield // Pause again
console.log("End")
}
const co = myGenerator()
co.next() // Start
co.next() // Resumed
co.next() // End
receive values from generator functions
function* myGenerator() {
yield 1
yield 2
yield 3
}
const co = myGenerator()
console.log(co.next())
console.log(co.next())
console.log(co.next())
console.log(co.next())
Expected output:
{ value: 1, done: false }
{ value: 2, done: false }
{ value: 3, done: false }
{ value: undefined, done: true }
send values to generator functions
function* myGenerator() {
console.log(yield 1)
console.log(yield 2)
}
const co = myGenerator()
const val1 = co.next().value
const val2 = co.next(val1 * 2).value
co.next(val2 * 2)
Expected output:
2
4
what about yielding functions
function* myGenerator() {
yield (cb) => setTimeout(cb, 1000)
console.log('done')
}
const co = myGenerator()
const fn = co.next().value
fn(() => co.next())
We just eliminated callback hell!
make it more useful
A simple coroutine runner:
function run(generator) {
const co = generator()
function next(value) {
const { value: fn, done } = co.next(value)
done || fn((err, result) => next([err, result]))
}
next()
}
We can use it like this:
const delay = time => cb =>
setTimeout(cb, time)
const asyncDouble = value => cb =>
setTimeout(() => cb(null, value * 2), 1000)
function* myGenerator() {
yield delay(1000)
console.log('1 second passed')
const [_, value] = yield asyncDouble(2)
console.log(value)
}
run(myGenerator)
you can do more
What about yielding multiple functions?
What about yielding another generator?
What about yielding promises?
That is just async/await:
async function fetchData() {
const response = await fetch('https://pokeapi.co/api/v2/pokemon/ditto')
const data = await response.json()
console.log(data)
}
AbortSignal
An async operation should be abortable. In Go, this is achieved using context.Context; in JavaScript, it can be achieved with AbortSignal.
Abort on timeout
fetch(url, {signal: AbortSignal.timeout(1000)})
Supporting an abort signal in your own function:
function myOp({signal}) {
return new Promise((resolve, reject) => {
if (signal.aborted) {
  reject(signal.reason)
  return
}
signal.addEventListener('abort', () => reject(signal.reason))
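// start the actual work here and call resolve(...) when it finishes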
})
}
myOp({signal: AbortSignal.timeout(1000)})
Explicit Abort
AbortController allows you to abort explicitly
const controller = new AbortController()
fetch('https://github.com', {signal: controller.signal}).catch(console.log)
controller.abort()
Combining multiple signals
like Promise.race()
fetch(url, {signal: AbortSignal.any([signal, signal2])})
Creating an already-aborted signal
like Promise.reject()
AbortSignal.abort(reason)
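For example (the reason here is an arbitrary value):

const signal = AbortSignal.abort(new Error('canceled'))
console.log(signal.aborted) // true
console.log(signal.reason) // Error: canceled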
Concurrency Patterns
waiting for multiple Promises to settle
all settled:
await Promise.allSettled([p1, p2, p3])
console.log(await Promise.allSettled([
new Promise(resolve => setTimeout(resolve)),
"value",
Promise.reject(new Error("error")),
]))
Expected output:
[
{ status: "fulfilled", value: undefined },
{ status: "fulfilled", value: "value" },
{
status: "rejected",
reason: Error: error
}
]
all fulfilled or first rejected:
await Promise.all([p1, p2, p3])
first fulfilled, reject only when all rejected:
await Promise.any([p1, p2, p3])
first settled:
await Promise.race([p1, p2, p3])
limiting concurrency
runWithConcurrencyLimit executes an array of asynchronous tasks with
a specified concurrency limit, ensuring that no more than the given
number of tasks run concurrently and returning the results once all
tasks are completed.
function runWithConcurrencyLimit<T>(
tasks: Array<() => Promise<T>>,
concurrency: number
): Promise<Array<T>> {
return new Promise((resolve, reject) => {
if (tasks.length === 0) {
  resolve([])
  return
}
const results: Array<T> = []
let currentIndex = 0
let completed = 0
const next = () => {
const index = currentIndex++
tasks[index]().then(result => {
results[index] = result
if (++completed >= tasks.length) {
resolve(results)
return
}
if (currentIndex < tasks.length) {
next()
}
}, reject)
}
for (let i = 0; i < Math.min(concurrency, tasks.length); i++) {
next()
}
})
}
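A usage sketch, with artificial tasks standing in for real async work:

const makeTask = (n: number) => () =>
  new Promise<number>(resolve => setTimeout(() => resolve(n * 2), 100))

runWithConcurrencyLimit([1, 2, 3, 4, 5].map(makeTask), 2)
  .then(console.log) // [2, 4, 6, 8, 10]

At most two of the five tasks are in flight at any moment.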
Stream
Streams represent a sequence of data that can be processed incrementally.
Streams let you process data as it arrives, reducing latency and avoiding exhausting memory. Think of uploading a large file: with streams, the server can start uploading the file to OSS while the browser is still sending it.
Node.js has had stream support for a long time. Streams are now also a web standard, but the standard streams are a different API. Let's take a look at the standard streams.
ReadableStream
Readable streams are sources of data that can be read from. Examples include files and network responses.
const stream = new ReadableStream({
start(controller) {
controller.enqueue('Hello')
controller.enqueue('world!')
controller.close()
}
})
const reader = stream.getReader()
while (true) {
const {done, value} = await reader.read()
if (done) {
break
}
console.log(value)
}
A ReadableStream is also async iterable:
for await (const chunk of stream) {
console.log(chunk)
}
WritableStream
Writable streams are destinations for data to be written to. Examples include files and network requests.
const readableStream = new ReadableStream({
  start(controller) {
    ['Hello', 'world'].forEach(chunk => controller.enqueue(chunk))
    controller.close()
  }
})
const writableStream = new WritableStream({
write(chunk) {
console.log(chunk)
}
})
readableStream.pipeTo(writableStream)
TransformStream
Transform streams allow you to modify data as it flows through a stream.
const readableStream = new ReadableStream({
start(controller) {
controller.enqueue('hello')
controller.enqueue('world')
controller.close()
}
})
const uppercaseTransformStream = new TransformStream({
async transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase())
}
})
const transformedStream = readableStream.pipeThrough(uppercaseTransformStream)
for await (const chunk of transformedStream) {
console.log(chunk)
}
Cluster
Node.js's Cluster API allows you to improve the performance of your Node.js applications by distributing incoming requests across multiple processes.
import cluster from 'node:cluster'
import http from 'node:http'
import os from 'node:os'
if (cluster.isPrimary) {
  console.log(`Primary ${process.pid} is running`)
for (let i = 0; i < os.cpus().length; i++) {
cluster.fork()
}
cluster.on('exit', (worker, code, signal) => {
console.log(`Worker ${worker.process.pid} died`)
cluster.fork()
})
} else {
http.createServer((req, res) => {
res.writeHead(200)
res.end(`hello from ${process.pid}`)
}).listen(3000)
console.log(`Worker ${process.pid} started`)
}
The child processes aren't really listening on the same port. Incoming socket connections to the primary process are delegated to the child processes.
Web Workers
Workers are the way to run JavaScript off the main thread. This is useful for running compute-heavy code without blocking the main thread.
Workers are a standard, supported by browsers and also by server-side runtimes like Deno. In Node.js, there is the worker_threads module.
An Example
Add a worker.ts:
function findPrime(max) {
for (let i = max; i >= 2; i--) {
let isPrime = true
for (let j = 2; j * j <= i; j ++) {
if (i % j === 0) {
isPrime = false
break
}
}
if (isPrime) return i
}
}
self.onmessage = function(event) {
self.postMessage(findPrime(event.data))
}
Here is how you can invoke it from main thread:
const worker = new Worker(
new URL("./worker.ts", import.meta.url).href,
{type: "module"}
)
worker.postMessage(1_000_000_000_000)
worker.onmessage = function(event) {
console.log(event.data)
}
$ deno run --allow-read main.ts <<<
999999999989
A Callable Worker
In the previous example, it's hard to get the result of the computation back to the caller; here is a solution:
class CallableWorker {
private worker: Worker
private id = 0
private calls = new Map()
constructor(url: string) {
this.worker = new Worker(url, {type: "module"})
this.worker.onmessage = (event) => {
const [id, err, result] = event.data
const [resolve, reject] = this.calls.get(id)
err ? reject(err) : resolve(result)
this.calls.delete(id)
}
}
call(method: string, args: any[]) {
const id = ++this.id
this.worker.postMessage([id, method, args]);
return new Promise((resolve, reject) => {
this.calls.set(id, [resolve, reject])
})
}
}
const worker = new CallableWorker(
new URL("./worker.ts", import.meta.url).href
)
console.log(await worker.call("findPrime", [1_000_000_000_000]))
In worker.ts:
function findPrime(max) {
for (let i = max; i >= 2; i--) {
let isPrime = true
for (let j = 2; j * j <= i; j ++) {
if (i % j === 0) {
isPrime = false
break
}
}
if (isPrime) return i
}
}
const methods = {
findPrime
}
self.onmessage = async function(event) {
const [id, method, args] = event.data
if (!methods[method]) {
self.postMessage([id, new Error(`Method not defined: ${method}`)])
return
}
let result, err
try {
result = await methods[method](...args)
} catch (e) {
err = e
}
self.postMessage([id, err, result])
}
Goroutine
Go was designed with concurrency as a first-class feature, achieved through goroutines and channels. This model is simple yet powerful.
Goroutines are lightweight threads managed by the Go runtime. They are automatically multiplexed onto OS threads when necessary, so there is no need to create and manage OS threads manually.
package main
import (
"fmt"
"time"
)
func main() {
go fmt.Print(1)
go fmt.Print(2)
fmt.Print(3)
time.Sleep(time.Millisecond)
}
The output will most likely be 312 or 321; the exact ordering is not guaranteed.
WaitGroup
In Node.js, the process exits when there is no more scheduled work. In Go, your program exits when the main function returns.
In the previous example, there is a time.Sleep() at the end of the main function. Without it, main would return immediately and the goroutines might never get to run.
WaitGroup can be used to wait for multiple goroutines to finish.
package main
import (
"fmt"
"sync"
)
func main() {
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
fmt.Print(1)
wg.Done()
}()
go func() {
fmt.Print(2)
wg.Done()
}()
fmt.Print(3)
wg.Wait()
}
Channel
Channels are used to communicate between goroutines. They are a highly effective abstraction; the Clojure community even reimplemented them as core.async.
send and receive
package main
import (
"fmt"
)
func main() {
ch := make(chan int)
go func() {
fmt.Println("sending 1")
ch <- 1
fmt.Println("sending 2")
ch <- 2
}()
fmt.Println(" receiving 1")
fmt.Println(" received", <-ch, "receiving 2")
fmt.Println(" received", <-ch)
}
Output:
sending 1
receiving 1
received 1 receiving 2
sending 2
received 2
Or:
receiving 1
sending 1
sending 2
received 1 receiving 2
received 2
buffered channel
Both send and receive are blocking operations, unless the channel is buffered.
package main
import (
"fmt"
)
func main() {
ch := make(chan int, 1)
ch <- 1
fmt.Print(<-ch)
}
close a channel
A channel should be closed if no more values will be sent to it:
package main
import (
"fmt"
)
func main() {
ch := make(chan int)
go func() {
ch <- 1
close(ch)
}()
val, ok := <-ch
fmt.Println(val, ok)
val, ok = <-ch
fmt.Println(val, ok)
}
Output:
1 true
0 false
iterate over a channel
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan int)
go func() {
for i := range 10 {
time.Sleep(time.Millisecond * 100)
ch <- i
}
close(ch)
}()
for val := range ch {
fmt.Print(val)
}
}
select
select allows you to wait on multiple channels, similar to Promise.race() in JavaScript.
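For comparison, here is a rough JavaScript sketch of the same timeout idea using Promise.race() (url is a placeholder):

const timeout = (ms: number) =>
  new Promise<never>((_, reject) =>
    setTimeout(() => reject(new Error('timeout')), ms))

// resolves with the fetch result, or rejects after 2 seconds
await Promise.race([fetch(url), timeout(2000)])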
receive timeout
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan string)
go func() {
time.Sleep(time.Second * 1)
ch <- "1"
}()
select {
case msg := <-ch:
fmt.Println(msg)
case <-time.After(time.Second * 2):
fmt.Println("timeout")
}
}
Mutex
Mutual exclusion locks are a fundamental synchronization primitive used in many programming languages to protect shared resources from concurrent access.
Go also offers sync.Mutex. It can be easily implemented using a
buffered channel.
There is also RWMutex which allows multiple readers to access the
shared resource concurrently, and ensures that the writer has
exclusive access by blocking other readers and writers.
a naive counter
package main
import (
"fmt"
"sync"
)
type Counter struct {
	value int64
}
func (c *Counter) Inc() {
	c.value++
}
func (c *Counter) Value() int64 {
	return c.value
}
func main() {
	counter := &Counter{}
var wg sync.WaitGroup
inc := func() {
counter.Inc()
wg.Done()
}
for i := 0; i < 100; i++ {
wg.Add(3)
go inc()
go inc()
go inc()
}
wg.Wait()
fmt.Println(counter.Value())
}
$ go run counter.go <<<
267
using Mutex
package main
import (
"fmt"
"sync"
)
type Counter struct {
	mu    sync.Mutex
	value int64
}
func (c *Counter) Inc() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.value++
}
func (c *Counter) Value() int64 {
	return c.value
}
func main() {
	counter := &Counter{}
var wg sync.WaitGroup
inc := func() {
counter.Inc()
wg.Done()
}
for i := 0; i < 100; i++ {
wg.Add(3)
go inc()
go inc()
go inc()
}
wg.Wait()
fmt.Println(counter.Value())
}
$ go run mutex.go <<<
300
implementing Mutex using channel
package main
import (
"fmt"
"sync"
)
type Mutex chan struct{}
func NewMutex() Mutex {
return make(Mutex, 1)
}
func (m Mutex) Lock() {
m <- struct{}{}
}
func (m Mutex) Unlock() {
<-m
}
type Counter struct {
mu Mutex
value int64
}
func NewCounter() *Counter {
return &Counter{mu: NewMutex()}
}
func (c *Counter) Inc() {
c.mu.Lock()
defer c.mu.Unlock()
c.value++
}
func (c *Counter) Value() int64 {
return c.value
}
func main() {
counter := NewCounter()
var wg sync.WaitGroup
inc := func() {
counter.Inc()
wg.Done()
}
for i := 0; i < 100; i++ {
wg.Add(3)
go inc()
go inc()
go inc()
}
wg.Wait()
fmt.Println(counter.Value())
}
Atomic Number
For simple counters, sync/atomic can be used instead of a Mutex.
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
var d atomic.Uint32
var wg sync.WaitGroup
wg.Add(100)
for range 100 {
go func() {
d.Add(1)
wg.Done()
}()
}
wg.Wait()
fmt.Println(d.Load())
}
Lock-Free Data Structures
sync/atomic can also be used to implement lock-free data structures.
a lock-free stack
package main
import (
"fmt"
"sync"
"sync/atomic"
)
type Node[T any] struct {
value T
next *Node[T]
}
type Stack[T any] struct {
head atomic.Pointer[Node[T]]
}
func (s *Stack[T]) Push(value T) {
node := &Node[T]{value: value}
for {
node.next = s.head.Load()
if s.head.CompareAndSwap(node.next, node) {
return
}
}
}
func (s *Stack[T]) Pop() (T, bool) {
for {
head := s.head.Load()
if head == nil {
return *new(T), false
}
if s.head.CompareAndSwap(head, head.next) {
return head.value, true
}
}
}
func main() {
stack := &Stack[int]{}
for i := range 10 {
stack.Push(i)
}
wg := sync.WaitGroup{}
wg.Add(10)
for range 10 {
go func() {
value, _ := stack.Pop()
fmt.Printf("%d ", value)
wg.Done()
}()
}
wg.Wait()
}
Context
The context package provides a mechanism to manage the flow of execution across API boundaries. It can be used for canceling operations, setting deadlines and propagating values.
In Node.js, the closest equivalents are AsyncLocalStorage and AbortSignal.
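A minimal AsyncLocalStorage sketch (the requestId store is a hypothetical example):

import { AsyncLocalStorage } from 'node:async_hooks'

const als = new AsyncLocalStorage<{ requestId: string }>()

const handler = () => {
  // any code in this async flow can read the store without it being passed down
  console.log(als.getStore()?.requestId)
}

als.run({ requestId: 'req-1' }, handler) // req-1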
Go's API is plain and explicit, with no magic: you have to pass the Context down to child function calls explicitly.
It's worth mentioning that the API of the context package takes an
immutable approach. When creating a new context with
context.WithTimeout, context.WithDeadline, or context.WithValue,
a new context is returned, leaving the original context
unchanged, which makes the code more predictable.
package main
import (
"context"
"fmt"
"time"
)
func doWork(ctx context.Context) {
select {
case <-ctx.Done():
fmt.Println("context canceled")
case <-time.After(time.Second):
fmt.Println("work done")
}
}
func foo(ctx context.Context) {
doWork(ctx)
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
defer cancel()
go foo(ctx)
<-ctx.Done()
}
example: limit http request processing time
package main
import (
"fmt"
"net/http"
"time"
)
func TimeoutMiddleware(timeout time.Duration) func(http.HandlerFunc) http.HandlerFunc {
return func(next http.HandlerFunc) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
done := make(chan bool, 1)
go func() {
next(w, r)
done <- true
}()
select {
case <-time.After(timeout):
http.Error(w, "Request Timeout", http.StatusRequestTimeout)
case <-done:
}
})
}
}
func Handler(w http.ResponseWriter, r *http.Request) {
time.Sleep(2 * time.Second)
fmt.Fprintf(w, "Served on time")
}
func main() {
http.HandleFunc("GET /timeout/1", TimeoutMiddleware(1*time.Second)(Handler))
http.HandleFunc("GET /timeout/3", TimeoutMiddleware(3*time.Second)(Handler))
if err := http.ListenAndServe(":3000", nil); err != nil {
panic(err)
}
}
A Brief History
Java has had concurrency support from the beginning.
Java 1 (1996) includes the Thread class, the Runnable interface and
the synchronized keyword. These features laid the foundation for
multithreading in Java.
Java 5 (2004) introduced the java.util.concurrent package with a
comprehensive set of tools for concurrent programming including Future
and Callable, Executors, Locks, Semaphore, Atomic Classes, Concurrent
Collections, and BlockingQueue, making it easier to work with
multithreading and reducing the complexity associated with manually
managing threads.
Java 8 (2014): Streams and CompletableFuture. CompletableFuture is currently the primary mechanism for concurrent programming in modern Java.
Java 9 (2017): Reactive Streams (the Flow API)
Java 21 (2023): Virtual Threads. (Actually, Java had green threads in the beginning, but switched to platform threads in version 1.2 in order to take full advantage of multi-core processors.)
Thread
Threads are lightweight processes that can run concurrently. They can be created by extending the Thread class:
class MyThread extends Thread {
public void run() {
System.out.println("My thread is running");
}
}
public class Main {
public static void main(String[] args) {
MyThread thread = new MyThread();
thread.start();
}
}
With modern Java you can compile and run it with a single command:
java Main.java
Another way to create a thread is to implement Runnable:
class MyThread implements Runnable {
public void run() {
System.out.println("Thread via Runnable");
}
}
public class Main {
public static void main(String[] args) {
Thread thread = new Thread(new MyThread());
thread.start();
}
}
It can be further simplified using a lambda:
public class Main {
public static void main(String[] args) {
Thread thread = new Thread(() -> {
System.out.println("Thread as lambda expression");
});
thread.start();
}
}
A Java thread might run on different cores throughout its lifetime.
join()
Unlike goroutines, a Java program won't exit until all threads are finished (unless the threads are daemon threads).
But if you want to wait for a particular thread to finish, use join():
public class Main {
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
System.out.println("Thread started");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread finished");
});
thread.start();
thread.join();
System.out.println("Main thread finished");
}
}
Expected output:
Thread started
Thread finished
Main thread finished
You can also pass a timeout when joining:
public class Main {
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
System.out.println("Thread started");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread finished");
});
thread.start();
thread.join(1000);
System.out.println(thread.isAlive() ? "Thread is alive" : "Thread is dead");
System.out.println("Main thread finished");
}
}
Expected output:
Thread started
Thread is alive
Main thread finished
Thread finished
The problem with multithreading
A race condition occurs when multiple threads try to access and modify shared data concurrently in a naive way:
public class Main {
private static int counter = 0;
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
Thread thread = new Thread(() -> {
for (int j = 0; j < 10; j++) {
counter++;
System.out.println(Thread.currentThread().getName() + " " + counter);
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread.start();
}
}
}
The final count can be 38, 47, or some other number below 50.
synchronized
There are several ways to avoid race conditions; one of them is using synchronized methods or blocks.
synchronized ensures that only one thread can execute it at a time.
synchronized blocks
public class Main {
private static int counter = 0;
public static void main(String[] args) {
Object lock = new Object();
for (int i = 0; i < 5; i++) {
Thread thread = new Thread(() -> {
synchronized (lock) {
for (int j = 0; j < 10; j++) {
counter++;
System.out.println(
Thread.currentThread().getName() + " " + counter);
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
thread.start();
}
}
}
synchronized methods
This is more efficient than the previous example, because only the crucial part is synchronized and the threads can sleep at the same time.
public class Main {
private static int counter = 0;
public synchronized static void count() {
counter++;
}
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
Thread thread = new Thread(() -> {
for (int j = 0; j < 10; j++) {
count();
System.out.println(
Thread.currentThread().getName() + " " + counter);
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread.start();
}
}
}
CountDownLatch
CountDownLatch is quite similar to WaitGroup in Go. It is used to wait for a set of operations to complete before proceeding.
import java.util.concurrent.CountDownLatch;
public class Main {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);
Runnable task = () -> {
System.out.println(Thread.currentThread().getName() + " is working.");
latch.countDown();
};
for (int i = 0; i < 3; i++) {
new Thread(task).start();
}
latch.await();
System.out.println("All threads are done, proceeding with main thread.");
}
}
Expected output:
Thread-1 is working.
Thread-0 is working.
Thread-2 is working.
All threads are done, proceeding with main thread.
CyclicBarrier
A CyclicBarrier is like a rendezvous point for threads: all threads need to reach a certain point (the barrier) before any of them can continue.
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.BrokenBarrierException;
public class Main {
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("All threads have reached the barrier");
});
Runnable task = () -> {
String name = Thread.currentThread().getName();
try {
System.out.println(name + " is performing task 1.");
Thread.sleep(1000);
// signal that the current thread has reached the barrier and wait for the others
barrier.await();
System.out.println(name + " has passed the first barrier. starting task 2");
Thread.sleep(1000);
barrier.await();
System.out.println(name + " has passed the second barrier.");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
};
new Thread(task).start();
new Thread(task).start();
new Thread(task).start();
}
}
Expected output:
Thread-0 is performing task 1.
Thread-1 is performing task 1.
Thread-2 is performing task 1.
All threads have reached the barrier
Thread-1 has passed the first barrier. starting task 2
Thread-2 has passed the first barrier. starting task 2
Thread-0 has passed the first barrier. starting task 2
All threads have reached the barrier
Thread-0 has passed the second barrier.
Thread-1 has passed the second barrier.
Thread-2 has passed the second barrier.
Locks
Locks are more sophisticated than synchronized blocks or methods.
ReentrantLock
ReentrantLock is reentrant, meaning that a thread can lock the same
lock multiple times. This is different from the synchronized keyword,
which would cause a thread to deadlock if it tried to re-enter a
synchronized block it already holds. It's useful when it's not easy to
keep track of whether you've already acquired a lock.
import java.util.concurrent.locks.ReentrantLock;
public class Main {
private static final ReentrantLock lock = new ReentrantLock();
private static int result;
public static void fib(int num) {
System.out.println("lock " + num);
lock.lock();
if (num == 1 || num == 2) {
result += 1;
} else {
fib(num - 1);
fib(num - 2);
}
System.out.println("unlock " + num);
lock.unlock();
}
public static void main(String[] args) {
result = 0;
fib(5);
System.out.println(result);
}
}
Expected output:
lock 5
lock 4
lock 3
lock 2
unlock 2
lock 1
unlock 1
unlock 3
lock 2
unlock 2
unlock 4
lock 3
lock 2
unlock 2
lock 1
unlock 1
unlock 3
unlock 5
5
ReadWriteLock
ReadWriteLock allows multiple threads to acquire read lock concurrently but
ensures that only one thread can write at a time.
There is also ReentrantReadWriteLock.
ReadWriteLock rwLock = new ReentrantReadWriteLock();
rwLock.readLock().lock();
rwLock.readLock().unlock();
StampedLock
StampedLock provides an optimistic read mode besides the usual read and write locks.
tryOptimisticRead() allows threads to optimistically read data without blocking, and later verify whether the read was valid or was invalidated by a concurrent write.
This reduces contention and improves throughput when writes are infrequent.
import java.util.concurrent.locks.StampedLock;
class Counter {
private int count = 0;
private final StampedLock lock = new StampedLock();
public void increment() {
long stamp = lock.writeLock();
count++;
lock.unlockWrite(stamp);
System.out.println("incremented: " + count);
}
public int getCount() {
long stamp = lock.tryOptimisticRead();
int currentCount = count;
if (!lock.validate(stamp)) {
System.out.println("invalid read, retry");
stamp = lock.readLock();
currentCount = count;
lock.unlockRead(stamp);
}
return currentCount;
}
}
public class Main {
public static void main(String[] args) {
Counter counter = new Counter();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
counter.increment();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}).start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
System.out.println("Read Count: " + counter.getCount());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}).start();
}
}
Expected output:
incremented: 1
Read Count: 1
incremented: 2
Read Count: 2
incremented: 3
invalid read, retry
Read Count: 3
Read Count: 3
incremented: 4
Read Count: 4
incremented: 5
Semaphore
A Semaphore is like a Lock, except that where a Lock allows only one thread to access a resource at a time, a semaphore allows up to a specified number of threads.
A Semaphore is more versatile than a Lock. It can be used to limit access to a resource pool, such as a fixed number of database connections. It can also be used to limit the number of concurrent tasks.
import java.util.concurrent.Semaphore;
public class Main {
public static void main(String[] args) throws InterruptedException {
Semaphore semaphore = new Semaphore(2);
Runnable task = () -> {
try {
String name = Thread.currentThread().getName();
System.out.println(name + " is trying to acquire a permit...");
semaphore.acquire();
System.out.println(name + " has acquired a permit and is working...");
Thread.sleep(2000);
semaphore.release();
System.out.println(name + " has released a permit.");
} catch (InterruptedException e) {
e.printStackTrace();
}
};
new Thread(task).start();
new Thread(task).start();
new Thread(task).start();
new Thread(task).start();
}
}
Expected output:
Thread-0 is trying to acquire a permit...
Thread-1 is trying to acquire a permit...
Thread-1 has acquired a permit and is working...
Thread-2 is trying to acquire a permit...
Thread-3 is trying to acquire a permit...
Thread-0 has acquired a permit and is working...
Thread-2 has acquired a permit and is working...
Thread-1 has released a permit.
Thread-3 has acquired a permit and is working...
Thread-0 has released a permit.
Thread-2 has released a permit.
Thread-3 has released a permit.
Exchanger
Exchanger can be used to exchange data between two threads.
import java.util.concurrent.Exchanger;
public class Main {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
Runnable task = () -> {
try {
String name = Thread.currentThread().getName();
// send out message and wait for message from others
String response = exchanger.exchange("hello from " + name);
System.out.println(name + " received: " + response);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
new Thread(task).start();
new Thread(task).start();
}
}
Expected output:
Thread-0 received: hello from Thread-1
Thread-1 received: hello from Thread-0
BlockingQueue
BlockingQueue is like a channel in Go.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class Main {
public static void main(String[] args) {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);
// producer
new Thread(() -> {
try {
for (int i = 1; i <= 6; i++) {
queue.put(i);
System.out.println("< queued: " + i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
// consumer
new Thread(() -> {
try {
while (true) {
Integer item = queue.take();
Thread.sleep(500);
System.out.println("> consumed: " + item);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
Expected output:
< queued: 1
< queued: 2
< queued: 3
< queued: 4
> consumed: 1
< queued: 5
> consumed: 2
< queued: 6
> consumed: 3
> consumed: 4
> consumed: 5
> consumed: 6
Atomic Values
Atomic values use CAS (Compare-and-Swap) techniques to ensure that operations on variables are atomic, meaning an operation either fully succeeds or fully fails.
Atomic values are high-performance, thread-safe alternatives to other synchronization mechanisms like locks.
import java.util.concurrent.atomic.AtomicInteger;
public class Main {
    public static void main(String[] args) throws InterruptedException {
        final AtomicInteger counter = new AtomicInteger(0);
        Thread[] threads = new Thread[10];
        for (int i = 0; i < 10; i++) {
            // incrementAndGet() is a single atomic operation,
            // unlike set(get() + 1), which can lose updates
            threads[i] = new Thread(counter::incrementAndGet);
            threads[i].start();
        }
        for (Thread t : threads) t.join(); // wait, so the final value is complete
        System.out.println(counter.get()); // always 10
    }
}
Executor
Executors make it easier to manage thread execution. Instead of managing threads manually, you submit tasks to an Executor, and the tasks run concurrently within a pool of threads.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.submit(() -> {
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName() + " " + i);
}
});
executor.submit(() -> {
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName() + " " + i);
}
});
executor.shutdown();
}
}
The output should look like the following:
pool-1-thread-1 0
pool-1-thread-1 1
pool-1-thread-1 2
pool-1-thread-1 3
pool-1-thread-1 4
pool-1-thread-1 5
pool-1-thread-2 0
pool-1-thread-1 6
pool-1-thread-2 1
pool-1-thread-2 2
pool-1-thread-2 3
pool-1-thread-2 4
pool-1-thread-2 5
pool-1-thread-2 6
pool-1-thread-2 7
pool-1-thread-2 8
pool-1-thread-2 9
pool-1-thread-1 7
pool-1-thread-1 8
pool-1-thread-1 9
Submit more tasks:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);
Runnable task = () -> {
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + " " + i);
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
executor.submit(task);
executor.submit(task);
executor.submit(task);
executor.shutdown();
}
}
Still, only two threads run at the same time:
pool-1-thread-2 0
pool-1-thread-1 0
pool-1-thread-1 1
pool-1-thread-2 1
pool-1-thread-2 2
pool-1-thread-1 2
pool-1-thread-2 3
pool-1-thread-1 3
pool-1-thread-1 4
pool-1-thread-2 4
pool-1-thread-2 0
pool-1-thread-2 1
pool-1-thread-2 2
pool-1-thread-2 3
pool-1-thread-2 4
Virtual Threads
Using virtual threads via Executors.newVirtualThreadPerTaskExecutor():
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class Main {
public static void main(String[] args) throws InterruptedException {
Instant startTime = Instant.now();
// ExecutorService executor = Executors.newCachedThreadPool();
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
AtomicLong memUsage = new AtomicLong(0L);
for (int i = 0; i < 100_000; i++) {
executor.submit(() -> {
try {
Thread.sleep(100);
Long usage = (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()) / 1024 / 1024;
if (memUsage.get() < usage) {
memUsage.set(usage);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
System.out.println("Execution time: " + Duration.between(startTime, Instant.now()) + " Memory: " + memUsage.get());
}
}
It's obvious that virtual threads (second run) are much more efficient than platform threads (first run):
Execution time: PT4.35937S Memory: 1876
Execution time: PT0.410156S Memory: 203
Future
Futures provide a higher-level abstraction that simplifies asynchronous programming, very similar to Promise in JavaScript.
A Future is usually used with an Executor:
import java.util.concurrent.Future;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutionException;
public class Main {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Integer> future = executor.submit(() -> {
Thread.sleep(1000);
return 1;
});
System.out.println(future.get());
executor.shutdown();
}
}
It's also possible to create a Future manually:
import java.util.concurrent.FutureTask;
import java.util.concurrent.ExecutionException;
public class Main {
public static void main(String[] args) throws InterruptedException, ExecutionException {
FutureTask<Integer> futureTask = new FutureTask<>(() -> {
Thread.sleep(1000);
return 1;
});
new Thread(futureTask).start();
System.out.println(futureTask.get());
}
}
CompletableFuture
CompletableFuture provides a more composable and flexible way to chain
and combine Futures. You can use methods like thenApply,
thenCombine, etc., to chain operations.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 1;
});
CompletableFuture<Integer> result = future.thenApply((num) -> {
return num * 2;
});
System.out.println(result.get());
}
}
Handling exceptions
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("Boom!");
}
return 1;
});
future
.exceptionally(e -> {
System.out.println(e.getMessage());
return 0;
})
.thenAccept(result -> System.out.println("result: " + result));
}
}
Structured Concurrency
Structured concurrency puts multiple tasks into a scope. If a task fails, the scope automatically cancels the other tasks within it.
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.ExecutionException;
void main() throws InterruptedException, ExecutionException {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var task1 = scope.fork(() -> {
Thread.sleep(2000);
System.out.println("Task 1 Completed");
return 1;
});
var task2 = scope.fork(() -> {
Thread.sleep(1000);
return 1 / 0;
});
var task3 = scope.fork(() -> {
Thread.sleep(500);
System.out.println("Task 3 Completed");
return null;
});
scope.join().throwIfFailed();
} catch (Exception e) {
System.err.println("Exception while executing tasks: " + e.getMessage());
}
}
It's still in preview as of Java 23; you need to pass the --enable-preview flag:
java --enable-preview Main.java <<<
Expected output:
Task 3 Completed
Exception while executing tasks: java.lang.ArithmeticException: / by zero
OTP
OTP (Open Telecom Platform) is a collection of tools and libraries for building fault-tolerant, scalable and concurrent systems in Erlang.
It was originally developed at Ericsson, a telecom company.
Elixir runs on the Erlang VM (BEAM) and can fully leverage OTP as well.
The Actor Model
Like Erlang, Elixir uses the Actor Model. In the Actor Model, actors are the fundamental units of computation.
An actor is an independent, self-contained entity with its own state. There is no shared state between the actors.
An actor can change its internal state and can send messages to and receive messages from other actors.
An actor can also create new actors.
Because actors work independently and interact only by passing messages, this approach makes it much easier to write highly concurrent and fault-tolerant systems.
Process
Elixir processes follow the Actor Model.
Spawn a process
Use spawn to create a new process; it returns a Process Identifier (PID):
defmodule Adder do
def add(a, b) do
IO.puts(a + b)
end
end
iex> pid = spawn(Adder, :add, [1, 1])
2
#PID<0.110.0>
Receiving and sending messages
Use send to send a message to a process:
defmodule Foo do
def listen do
receive do
msg -> IO.puts("received #{msg}")
end
end
end
iex> pid = spawn(Foo, :listen, [])
#PID<0.117.0>
iex> send(pid, "hello")
received hello
But if you send a message to the process again, nothing happens: the process is dead.
iex> Process.alive?(pid)
false
To fix this, we need to call listen recursively:
defmodule Foo do
def listen do
receive do
msg -> IO.puts("received #{msg}")
end
listen()
end
end
Linked Process
If you need to make sure a process is spawned successfully, use spawn_link instead of spawn; it links the newly created process to the current one. Two linked processes will receive exit signals from one another.
defmodule Foo do
def boom do
exit(:boom)
end
end
pid = spawn_link(Foo, :boom , [])
** (EXIT from #PID<0.104.0>) shell process exited with reason: :boom
By default the exit signal will crash the process.
Trap the exit signal
Use Process.flag(:trap_exit, true) to trap exit notifications.
defmodule Foo do
def boom do
exit(:boom)
end
def run do
Process.flag(:trap_exit, true)
spawn_link(Foo, :boom, [])
receive do
{:EXIT, _pid, reason} -> IO.puts("Exit reason: #{reason}")
end
end
end
iex> Foo.run
Exit reason: boom
Monitoring
Another way is to use spawn_monitor to receive a process-down message:
defmodule Foo do
def boom do
exit(:boom)
end
def run do
spawn_monitor(Foo, :boom , [])
receive do
{:DOWN, _ref, :process, _from_pid, reason} -> IO.puts("Exit reason: #{reason}")
end
end
end
Foo.run
Agent
Agent is used to manage shared state. It is an abstraction over the low-level process management.
Actors are entities that encapsulate state and communicate by passing messages. Agents are often seen as "facilitators" or "intermediaries" for managing state and behavior in a controlled way.
The term "agent" is widely used in concurrent and distributed programming models. Other languages like Clojure also use similar terminology.
{:ok, agent} = Agent.start(fn -> 0 end)
Agent.get(agent, fn state -> state end)
Agent.update(agent, fn (state) -> state + 1 end)
Agent.get(agent, fn state -> state end)
Task
Task provides a way to run a job in the background and get its result later, similar to a Future or Promise in other languages.
task = Task.async(fn ->
:timer.sleep(1000)
:ok
end)
result = Task.await(task)
IO.puts("Task result: #{result}")
Stream
Unlike Enum, which eagerly evaluates a collection, Stream defers
execution until it's absolutely needed — ideal for working with large
datasets as it is more efficient and memory-friendly.
lazy enumerables
Stream.iterate(1, fn x -> x + 1 end)
|> Stream.filter(fn x -> rem(x, 2) == 0 end)
|> Enum.take(4)
[2, 4, 6, 8]
jsonl processing
{"id": 1, "name": "Alice", "role": "developer"}
{"id": 2, "name": "Bob", "role": "designer"}
{"id": 3, "name": "Charlie", "role": "manager"}
{"id": 4, "name": "Diana", "role": "developer"}
{"id": 5, "name": "Eve", "role": "intern"}
...
File.stream!("data.jsonl")
|> Stream.map(&JSON.decode/1)
|> Stream.map(fn {:ok, record} -> record end)
|> Enum.take(1)
[%{"id" => 1, "name" => "Alice", "role" => "developer"}]
GenServer
One of the most widely used OTP abstractions is GenServer. A GenServer (generic server) provides a standardized interface for handling synchronous and asynchronous requests and managing state.
defmodule Counter do
use GenServer
def init(initial_value) do
{:ok, initial_value}
end
# sync calls
def handle_call(:get_count, _from, state) do
{:reply, state, state}
end
# async calls
def handle_cast(:incr, state) do
{:noreply, state + 1}
end
def handle_cast(:decr, state) do
{:noreply, state - 1}
end
end
Use the Counter:
GenServer.start_link(Counter, 0, name: Counter)
GenServer.cast(Counter, :incr)
GenServer.call(Counter, :get_count) # 1
GenServer.cast(Counter, :decr)
GenServer.call(Counter, :get_count) # 0
Add helper functions
defmodule Counter do
use GenServer
def init(initial_value) do
{:ok, initial_value}
end
# sync calls
def handle_call(:get_count, _from, state) do
{:reply, state, state}
end
# async calls
def handle_cast(:incr, state) do
{:noreply, state + 1}
end
def handle_cast(:decr, state) do
{:noreply, state - 1}
end
# helper functions
def incr, do: GenServer.cast(__MODULE__, :incr)
def decr, do: GenServer.cast(__MODULE__, :decr)
def get_count, do: GenServer.call(__MODULE__, :get_count)
def start_link(initial_value) do
GenServer.start_link(__MODULE__, initial_value, name: __MODULE__)
end
end
Now we can have better APIs:
Counter.start_link(0)
Counter.incr
Counter.get_count
Counter.decr
Counter.get_count
GenServers are a higher-level abstraction than processes and can be used with Supervisors.
Supervisor
Supervisors are a fundamental concept in OTP. They monitor and manage the lifecycle of processes, restarting them if they fail.
Create an application using mix:
mix new counter --sup
lib/counter/application.ex
defmodule Counter.Application do
use Application
def start(_type, _args) do
children = [
{Counter, 0}
]
opts = [strategy: :one_for_one, name: Counter.Supervisor]
Supervisor.start_link(children, opts)
end
end
Put the counter code into lib/counter.ex:
defmodule Counter do
use GenServer
def init(initial_value) do
{:ok, initial_value}
end
# sync calls
def handle_call(:get_count, _from, state) do
{:reply, state, state}
end
# async calls
def handle_cast(:incr, state) do
{:noreply, state + 1}
end
# helper functions
def incr, do: GenServer.cast(__MODULE__, :incr)
def get_count, do: GenServer.call(__MODULE__, :get_count)
def start_link(initial_value) do
GenServer.start_link(__MODULE__, initial_value, name: __MODULE__)
end
end
iex -S mix
iex(1)> Counter.get_count
0
iex(2)> Counter.incr
:ok
iex(3)> Counter.get_count
1
Distributed Systems
OTP's support for distributed systems is something that sets Erlang and Elixir apart from other programming languages.
The BEAM VM is designed from the ground up to support distributed computing. With minimal effort, you can have multiple Erlang/Elixir nodes running on different machines that seamlessly communicate with each other.
Those nodes can form a cluster out of the box. Nodes can automatically connect to each other, share state, and communicate as if they were a single system, even if they're running on different machines or across data centers.
In most other languages, setting up a distributed system usually requires external libraries, frameworks, or infrastructure to provide essential services, including:
- Service Discovery
- RPC
- Load balancing
- Auto-scaling
- Self-healing
- Tracing
iex --sname foo@localhost -S mix
iex --sname bar@localhost -S mix
iex(bar@localhost)1> Node.spawn_link(:foo@localhost, fn -> Counter.incr end)
iex(foo@localhost)1> Counter.get_count
1
Threads
Because of the GIL (Global Interpreter Lock), threads in Ruby are not very useful for CPU-bound tasks. But they can still be used for IO-bound operations.
thread = Thread.new do
sleep 1
puts "Hello"
end
thread.join
Concurrent IO
require 'net/http'
require 'uri'
urls = [
"https://developer.mozilla.org/",
"https://elixir-lang.org/",
]
threads = urls.map do |url|
Thread.new do
response = Net::HTTP.get_response(URI.parse(url))
puts "#{response.code} #{url}"
end
end
threads.each(&:join)
Threads are hard. There are tools like Mutex and Queue in the
thread module to achieve synchronization and data sharing. But you
don't have to work with threads. There are better options.
Fiber
Fiber allows you to pause and resume the execution of code explicitly, and can be used to implement generators and coroutines.
yield and resume
fib = Fiber.new do
puts "2. fib can be paused using yield"
Fiber.yield
puts "4. fib resumed again"
end
puts "1. fib are paused when created, resume to start"
fib.resume
puts "3. paused fib can be resumed"
fib.resume
puts "5. done"
yield and resume with values
fib = Fiber.new do
puts Fiber.yield('1. yielded value')
'3. done'
end
puts fib.resume
puts fib.resume('2. resumed value')
Non-Blocking Fibers
Non-blocking fibers were added in Ruby 3.0.
The async gem is built on top of non-blocking fibers.
Async
Async is a gem, not a language feature.
bundle add async
Async and wait
The top-level Async block is needed to create a reactor context to do the scheduling; otherwise, the first puts won't get executed until the task is finished.
require 'async'
Async do
task = Async do
sleep 1
rand
end
puts "wait for 1 second"
puts task.wait
end
Concurrency
Async do
tasks = (1..5).map do |x|
Async do
sleep 1
x
end
end
puts tasks.map(&:wait)
end
Limiting Concurrency Level
require 'async'
require 'async/semaphore'
Async do
semaphore = Async::Semaphore.new(2)
(1..5).each do |x|
semaphore.async do
puts "#{x} started"
sleep 1
puts " #{x} finished"
end
end
end
Waiting for tasks
Barrier can be used to wait for a group of tasks, but you can also simply do tasks.each(&:wait).
Waiter can be used to wait for a specific number of tasks out of all the tasks, kind of like Promise.race in JS or select in Go, but more flexible.
Barrier and Waiter can be used together.
require 'async'
require 'async/waiter'
require 'async/barrier'
barrier = Async::Barrier.new
waiter = Async::Waiter.new(parent: barrier)
Async do
[3, 2, 1].each do |x|
waiter.async do
sleep x
x
end
end
puts waiter.wait(2).inspect
# stop the remaining jobs
barrier.stop
end
This should print:
[1, 2]
Timeouts
require 'async'
Async do |task|
task.with_timeout(0.5) do
sleep rand
rescue Async::TimeoutError
puts "timeout!"
end
end
Ractor
Ractor is experimental, and its behavior may change in future versions of Ruby. There are also known implementation issues.
The async gem is a solution for concurrency; Ractor is for parallelism.
ractors = (1..6).map do |x|
Ractor.new(x) do |id|
result = (1..1_000_000_000).reduce(:+)
Ractor.yield([id, result])
end
end
puts ractors.map(&:take).sort_by(&:first).map(&:last).inspect
Message Passing
Ractor also follows the actor model (a Ruby actor?). Each actor maintains internal state and communicates using messages.
actor = Ractor.new do
state = 0
loop do
message = Ractor.receive
case message
when :increment
state += 1
Ractor.yield(state)
when :get_state
Ractor.yield(state)
when :exit
break
end
end
end
actor.send(:increment)
puts actor.take
actor.send(:get_state)
puts actor.take
actor.send(:exit)
Thread
As in other scripting languages, Python threads are good for IO-bound tasks, but not ideal for CPU-bound tasks (again, because of a GIL).
import threading
import time
def task(id):
time.sleep(1)
print(id)
threads = [threading.Thread(target=task, args=(i,)) for i in range(5)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
Output:
3
2
1
4
0
asyncio
Use the async/await syntax to declare coroutines, then use asyncio.run() to run them.
import asyncio
async def main():
await asyncio.sleep(1)
print('done')
asyncio.run(main())
Wait for multiple tasks to complete
Use asyncio.gather
import asyncio
async def task(id):
await asyncio.sleep(1)
print(id)
return id
async def main():
tasks = [task(i) for i in range(5)]
results = await asyncio.gather(*tasks)
print(results)
asyncio.run(main())
Output:
0
1
2
3
4
[0, 1, 2, 3, 4]
TaskGroup
TaskGroup works like WaitGroup or Barrier in other languages, but you don't have to explicitly wait for it.
import asyncio
import random
async def task(id):
await asyncio.sleep(random.random())
print(id)
return id
async def main():
async with asyncio.TaskGroup() as g:
task1 = g.create_task(task(1))
task2 = g.create_task(task(2))
print(f"done: {task1.result()}, {task2.result()}")
asyncio.run(main())
Handling exceptions
asyncio.gather works much like Promise.all: if an exception is raised, it is immediately propagated to the task that awaits on gather(), while the other tasks continue to run.
asyncio.gather also accepts the keyword argument return_exceptions; when set to True, it works more like Promise.allSettled.
import asyncio
async def task(id):
    await asyncio.sleep(1)
    return 1 / id  # raises ZeroDivisionError when id == 0
async def main():
tasks = [task(i) for i in range(5)]
results = await asyncio.gather(*tasks, return_exceptions=True)
print(results)
asyncio.run(main())
Output:
[ZeroDivisionError('division by zero'), 1.0, 0.5, 0.3333333333333333, 0.25]
Wait for the first task
asyncio.wait gives you more control than asyncio.gather:
import asyncio
import random
async def task(id):
await asyncio.sleep(random.random())
print(id)
return id
async def main():
# must be a list of Tasks, not coroutines
tasks = [asyncio.create_task(task(i)) for i in range(5)]
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
print("done", [x.result() for x in done])
print("pending tasks:", len(pending))
for x in pending:
x.cancel()
asyncio.run(main())
Output:
2
done [2]
pending tasks: 4
Timeout
When timed out, the task is also canceled.
import asyncio
import random
async def task():
await asyncio.sleep(random.random())
print("done")
async def main():
try:
async with asyncio.timeout(0.5):
await task()
except TimeoutError:
print("Timed out")
asyncio.run(main())