
Data processing very often involves synchronization problems. In this topic, you will learn about possible solutions to synchronization tasks using channels.

This topic covers the following synchronization options:

  • between the parent goroutine and the child (when the goroutine is declared in the parent scope);
  • between independent goroutines;
  • synchronization by time;
  • synchronization of access to shared resources.

Await the result

The simplest case of synchronization is waiting for a result. For example, suppose you have a file with more than a million values. You need to find the empty values in this file and fill them in. The missing data is stored in a file of the same type on a remote server. This gives you two parallel and independent processes: loading the data and finding the empty values. However, before the data can be compared, both processes must be completed. Let's consider an example of such synchronization:

package main

import (
    "fmt"
    "time"
)

func main() {
    sync := make(chan bool, 1)

    go func() {
        fmt.Println("request the data")
        time.Sleep(time.Second)

        sync <- true
    }()

    fmt.Println("find empty values")

    fmt.Println("await the request")
    <-sync

    fmt.Println("compare data")
}

// Output:
// find empty values
// await the request
// request the data
// compare data

This synchronization approach is based on the fact that a goroutine blocks on a channel read until there is data to receive. As soon as the child goroutine sends a signal to the sync channel, the parent goroutine knows that the data has been received and work can continue.

This principle is fundamental to synchronization with channels: the child goroutine signals completion, and the parent goroutine waits for that signal before moving on.
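
The same waiting pattern is often expressed with an unbuffered channel of empty structs, since the signal carries no data of its own. Below is a minimal sketch of that variation; the channel name done is just an illustration.

package main

import (
    "fmt"
    "time"
)

func main() {
    // an empty struct carries no data, so it is a common choice for pure signals
    done := make(chan struct{})

    go func() {
        fmt.Println("request the data")
        time.Sleep(time.Second)

        done <- struct{}{} // signal completion; no data is carried
    }()

    fmt.Println("find empty values")

    fmt.Println("await the request")
    <-done // blocks until the child goroutine sends the signal

    fmt.Println("compare data")
}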

Synchronization of multiple goroutines

In the next task, you will synchronize the work of several goroutines. Let's call them workers. There is a single channel from which all workers receive data. The job of each worker is to receive the data and process it.

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func main() {
    tasksPipe := make(chan int)

    workers := 2
    done := make(chan bool, workers)
    for worker := 0; worker < workers; worker++ {
        go func(id int) {
            doWork(tasksPipe, done, id)
        }(worker)
    }

    tasks := 5
    for task := 0; task < tasks; task++ {
        fmt.Printf("task #%d is initialized\n", task)
        tasksPipe <- task
    }

    fmt.Println("tasks are over")
    close(tasksPipe)

    // wait until all workers have finished
    for ; workers > 0; workers-- {
        <-done
    }
}

func doWork(tasksPipe <-chan int, done chan<- bool, workerID int) {
    for task := range tasksPipe {
        fmt.Printf("worker %d process a task #%d\n", workerID, task)
        time.Sleep(time.Millisecond * time.Duration(rand.Intn(500)))
    }

    fmt.Printf("worker %d finished\n", workerID)
    done <- true
}

// Output:
// task #0 is initialized
// task #1 is initialized
// task #2 is initialized
// worker 1 processes task #1
// worker 0 processes task #0
// worker 1 processes task #2
// task #3 is initialized
// task #4 is initialized
// worker 0 processes task #3
// worker 1 processes task #4
// tasks are over
// worker 1 finished
// worker 0 finished

In this case, the synchronization signal is closing the task channel with close(tasksPipe). The for task := range tasksPipe {...} loop in each worker keeps receiving values until the channel is closed, and only then does the worker send its done signal.

This approach is effective for distributing CPU-bound work among CPU cores. Matching the number of workers to the number of available cores helps make full use of system resources and complete the work faster, as the sketch below illustrates.
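
As a rough sketch of that idea, the worker pool below is sized with runtime.NumCPU(), which reports the number of logical CPUs available to the process; the tasks themselves are only simulated by printing.

package main

import (
    "fmt"
    "runtime"
)

func main() {
    tasksPipe := make(chan int)

    workers := runtime.NumCPU() // one worker per logical CPU
    done := make(chan bool, workers)
    for worker := 0; worker < workers; worker++ {
        go func(id int) {
            for task := range tasksPipe {
                fmt.Printf("worker %d processes task #%d\n", id, task)
            }
            done <- true
        }(worker)
    }

    for task := 0; task < 20; task++ {
        tasksPipe <- task
    }
    close(tasksPipe)

    // wait until all workers have finished
    for ; workers > 0; workers-- {
        <-done
    }
}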

The semaphore principle

In the previous example, there is a queue of tasks that are executed as resources become free. Let's consider another example, where there is a queue of workers but the number of available resources is limited.

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func main() {
    workers := 10
    done := make(chan bool, workers)

    limit := 2
    sources := make(chan bool, limit)

    for worker := 0; worker < workers; worker++ {
        go func(id int) {
            doWork(sources, done, id)
        }(worker)
    }

    // wait until all workers have finished
    for ; workers > 0; workers-- {
        <-done
    }
}

func doWork(sources chan bool, done chan<- bool, workerID int) {
    sources <- true // acquiring a resource

    fmt.Printf("worker %d is working\n", workerID)
    time.Sleep(time.Millisecond * time.Duration(rand.Intn(200)))

    <-sources // releasing the resource
    done <- true
}

// Output:
// worker 0 is working
// worker 9 is working
// worker 4 is working
// worker 3 is working
// worker 1 is working
// worker 7 is working
// worker 6 is working
// worker 8 is working
// worker 5 is working
// worker 2 is working

In this example, the sources channel is used for synchronization. Each worker acquires a resource by writing true to the sources channel before starting its work. If the channel's buffer is full (its capacity is set by the limit variable), the goroutine blocks until the write becomes possible, that is, until another worker releases a resource.

As an example, consider a file system. Imagine you have a file that collects logs from various applications in the system. Each time an application wants to write a log entry, it must lock the file or wait until the file becomes available for writing, as the sketch below shows.
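
Below is a minimal sketch of that idea: a buffered channel with capacity 1 plays the role of a file lock. The file name app.log and the log messages are made up for illustration, and errors are simply skipped for brevity.

package main

import (
    "fmt"
    "os"
)

func main() {
    // capacity 1 means at most one application holds the "lock" at a time
    fileLock := make(chan bool, 1)

    apps := 3
    done := make(chan bool, apps)

    for app := 0; app < apps; app++ {
        go func(id int) {
            fileLock <- true // acquire the file

            // the file name is illustrative
            f, err := os.OpenFile("app.log", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
            if err == nil {
                fmt.Fprintf(f, "log entry from application %d\n", id)
                f.Close()
            }

            <-fileLock // release the file
            done <- true
        }(app)
    }

    // wait until all applications have written their entries
    for ; apps > 0; apps-- {
        <-done
    }
}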

Rate limit

The last synchronization type covered in this topic is synchronization by time. Let's say you have a pool of tasks that need to be executed at a specific interval. One way to achieve this is with a time ticker (time.Tick), which delivers a periodic event on a channel.

package main

import (
    "fmt"
    "time"
)

func main() {
    tasksCount := 5
    sync := time.Tick(500 * time.Millisecond)

    for ; tasksCount > 0; tasksCount-- {
        <-sync
        fmt.Printf("tasks left: %d\n", tasksCount)
    }
}

// Output:
// tasks left: 5
// tasks left: 4
// tasks left: 3
// tasks left: 2
// tasks left: 1

In the example, each task is processed with an interval of half a second. When using this approach, it is important to keep in mind that the interval of the synchronization timer should be greater than or equal to the time taken to process a task.

This approach can be used for network requests. For example, some websites limit the number of requests they accept per second or, for technical reasons, cannot handle many simultaneous requests. In that case, a time ticker lets you manage the rate of outgoing requests and avoid overloading the receiving service.
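
For instance, here is a minimal sketch of throttling outgoing HTTP requests to roughly two per second; the URLs are placeholders, and failed requests are simply reported.

package main

import (
    "fmt"
    "net/http"
    "time"
)

func main() {
    // at most one request every 500 ms, that is, about two requests per second
    limiter := time.Tick(500 * time.Millisecond)

    urls := []string{
        "https://example.com/a", // placeholder URLs
        "https://example.com/b",
        "https://example.com/c",
    }

    for _, url := range urls {
        <-limiter // wait for the next tick before sending the request
        resp, err := http.Get(url)
        if err != nil {
            fmt.Println("request failed:", err)
            continue
        }
        fmt.Println(url, resp.Status)
        resp.Body.Close()
    }
}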

Conclusion

Today you learned about four types of synchronization using Go channels. Here are the main points of the topic:

  • you can use channel read-write operations for synchronization (if a read operation from the channel is impossible, the goroutine will wait until the operation becomes available; the same holds true for write operations);
  • you can use the close() operation on the channel as a synchronization signal (in cases where goroutines continuously wait for data from the channel);
  • you can block access to resources using a channel (before using a resource, write to the channel, and after releasing the resource, read from the channel);
  • you can use a timer or ticker channel for synchronization by time.