Go – running a fixed number of concurrent goroutines

This snippet shows how to run a fixed number of worker functions concurrently.

Let's start with a basic version that does no error checking at all and simply panics if one of the concurrent processes crashes.

Snippet 1


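package main

import "fmt"

func main() {
	parallel(func(x int) error {
		fmt.Println("Parameter:", x)
		return nil
	}, []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, 3)
}

// parallel calls fn once for every value in sq, running at most p calls
// at the same time. The error returned by fn is ignored in this version.
func parallel(fn func(int) error, sq []int, p int) error {
	q := make(chan bool, p) // one slot per running worker
	defer close(q)
	for _, s := range sq {
		q <- true // blocks while p workers are running
		go func(s int) {
			fn(s)
			<-q // free the slot
		}(s)
	}
	// Busy-wait until every worker has freed its slot.
	for len(q) > 0 {
	}
	return nil
}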

The function parallel takes the function to run in parallel (fn), a slice of parameters to start the concurrent workers with (sq), and the number of workers you want to run in parallel (p). First, a channel q with capacity p (the number of concurrent processes) is created. The deferred call closes it when parallel returns. The first for loop starts the goroutines with the appropriate values. Before each start it sends a value into q, so the loop blocks once the capacity is exhausted, that is, while p goroutines are running. Each goroutine receives once from q when it returns and thereby lets the loop start the next one, until all parameter values have been handed to the worker function. The first for loop then exits, and the second for loop waits until all goroutines have finished before parallel returns. Note that this second loop is a busy-wait: it polls len(q) and keeps a CPU core busy until the last worker is done.
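
The waiting step described above can also be done without polling. The following is a minimal sketch, not the version used in the snippets: it assumes a hypothetical variant named parallelBlocking that waits by filling the channel up to its capacity, which only succeeds once every worker has given its slot back. The example worker and the squares slice are made up for illustration.

```go
package main

import "fmt"

// parallelBlocking is a hypothetical variant of parallel from Snippet 1.
// It runs fn for every value in sq with at most p goroutines at a time.
func parallelBlocking(fn func(int) error, sq []int, p int) {
	q := make(chan bool, p)
	for _, s := range sq {
		q <- true // blocks while p workers are running
		go func(s int) {
			defer func() { <-q }() // free the slot when the worker returns
			fn(s)
		}(s)
	}
	// Wait without spinning: all p slots can only be filled from here
	// once no worker holds one any more.
	for i := 0; i < cap(q); i++ {
		q <- true
	}
}

func main() {
	in := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	squares := make([]int, len(in)+1)
	parallelBlocking(func(x int) error {
		squares[x] = x * x // each worker writes only its own element
		return nil
	}, in, 3)
	fmt.Println(squares[1:]) // prints "[1 4 9 16 25 36 49 64 81 100]"
}
```

The channel is not closed here, because after the final loop it is full and nobody receives from it again; it is simply left to the garbage collector.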

Make sure the function fn you pass to parallel(fn, sq, p) matches the function type (signature) func(int) error and that the slice of values fits your needs; then you are done.

Note that Snippet 1 panics if any of the concurrent processes crashes. Furthermore, the second for loop runs forever if one of the goroutines never returns, because its '<-q' never happens. Such code works well if the function called concurrently is guaranteed neither to panic nor to block (or if the project is in a state where such panicking helps to improve the logic). Snippet 2 adds recovery and returns general error information as well as the parameter value that caused the crash.

Snippet 2


package main

import "fmt"

func main() {
	err, errlist := parallel(func(x int) error {
		fmt.Println(x, 100/x)
		return nil
	}, []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 0}, 3)
	if err != nil {
		fmt.Printf("Error! -> %v\n", err)
		for _, err := range errlist {
			fmt.Println("  ", err)
		}
	}
}

// parallel runs fn for every value in sq with at most p calls at a time.
// Panics in fn are recovered and reported: err says how many calls crashed,
// errlist holds one error per crashed call.
func parallel(fn func(int) error, sq []int, p int) (err error, errlist []error) {
	q := make(chan bool, p)
	e := make(chan error, len(sq)) // large enough for every call to crash
	defer func() {
		close(q)
		close(e)
	}()
	for _, s := range sq {
		q <- true
		go func(s int) {
			defer func() {
				if P := recover(); P != nil {
					e <- fmt.Errorf("func(%v) crashed: %v", s, P)
				}
				<-q
			}()
			fn(s)
		}(s)
	}
	for len(q) > 0 {
	}
	// Take the count once, before draining e. Building the message inside
	// the drain loop would always end up reporting "1".
	if n := len(e); n > 0 {
		err = fmt.Errorf("%d concurrent process crashed", n)
	}
	for len(e) > 0 {
		errlist = append(errlist, <-e)
	}
	return err, errlist
}

------------------------------
Output from running this code.
3 33
4 25
5 20
6 16
1 100
7 14
2 50
9 11
10 10
8 12
Error! -> 1 concurrent process crashed
   func(0) crashed: runtime error: integer divide by zero

This gives better control over crashes and allows the main process to proceed even if one of the concurrent functions crashes. The order of the output lines varies from run to run because the workers run concurrently. However, it still does not handle a concurrent function that never returns.
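
One way to keep parallel from hanging on a worker that never returns is an overall time limit. The following is only a sketch under assumptions: parallelTimeout and its parameter d are hypothetical names, and the limit applies to the whole batch rather than to a single call. Go cannot stop a goroutine from the outside, so a stuck worker keeps running (and leaks) after the timeout; the function merely stops waiting for it.

```go
package main

import (
	"fmt"
	"time"
)

// parallelTimeout is a hypothetical variant of parallel. It gives the whole
// batch at most d to finish and returns an error if that limit is exceeded.
func parallelTimeout(fn func(int) error, sq []int, p int, d time.Duration) error {
	q := make(chan bool, p)
	deadline := time.After(d)
	for _, s := range sq {
		select {
		case q <- true: // got a free slot
		case <-deadline:
			return fmt.Errorf("timed out after %v waiting for a free slot", d)
		}
		go func(s int) {
			defer func() { <-q }()
			fn(s)
		}(s)
	}
	// Wait for the remaining workers by filling all slots, or give up.
	for i := 0; i < cap(q); i++ {
		select {
		case q <- true:
		case <-deadline:
			return fmt.Errorf("timed out after %v waiting for workers", d)
		}
	}
	return nil
}

func main() {
	err := parallelTimeout(func(x int) error {
		if x == 2 {
			select {} // simulate a worker that never returns
		}
		return nil
	}, []int{1, 2, 3}, 2, 100*time.Millisecond)
	fmt.Println("Result:", err) // reports a timeout error
}
```

The channel q is deliberately not closed here, so that a late worker can still receive from it after parallelTimeout has returned.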

Furthermore, errors are printed only after all processes have finished, which can be inconvenient for long-running processes.

In Snippet 3, errors are channeled out to a concurrently running receiver loop.

Snippet 3


package main

import "fmt"

func main() {
	// Receiver goroutine for concurrent error / result output.
	errChan := make(chan error, 11)
	// ergChan := make(chan int, 11) // enable concurrent result output
	alldone := make(chan bool)
	go func() {
		defer close(alldone) // tell main that all output has been printed
		for {
			select {
			case e, ok := <-errChan:
				if !ok {
					return // errChan is closed and drained
				}
				fmt.Println("Error -> ", e)
				// case ee := <-ergChan: // enable concurrent result output
				//   fmt.Println("Result -> ", ee)
			}
		}
	}()

	parallel(func(x int) error {
		r := 100 / x
		fmt.Println(x, r)
		// ergChan <- r
		return nil
	}, []int{1, 2, 3, 0, 4, 5, 0, 6, 0, 7, 8, 9, 10}, 3, errChan)

	// All workers have returned, so nothing sends on errChan any more.
	// Close it so the receiver prints the remaining errors and exits,
	// then wait for the receiver before going on.
	close(errChan)
	<-alldone

	// ....
}

// parallel runs fn for every value in sq with at most p calls at a time.
// Errors returned by fn and recovered panics are sent to eC.
func parallel(fn func(int) error, sq []int, p int, eC chan<- error) {
	q := make(chan bool, p)
	defer close(q)
	for _, s := range sq {
		q <- true
		go func(s int) {
			defer func() {
				if P := recover(); P != nil {
					eC <- fmt.Errorf("func(%v) crashed: %v", s, P)
				}
				<-q
			}()
			// Forward only real errors; sending nil would make the
			// receiver print "<nil>" for every successful call.
			if err := fn(s); err != nil {
				eC <- err
			}
		}(s)
	}
	for len(q) > 0 {
	}
}

I am aware that there are a number of ways to keep the number of concurrently running processes stable, including patterns that loop over the work in batches with wait groups, monitoring a running counter, and the like. Nonetheless, I think this use of channels is a particularly elegant way to manage the problem.
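
For comparison, here is a minimal sketch of one of the wait-group alternatives mentioned above. It is an assumption about how such a variant might look, not code from the snippets: parallelWG and divide100 are hypothetical names. A buffered channel still limits the number of running workers, while sync.WaitGroup replaces the polling loop, and the errors returned by fn are collected by index instead of being discarded. Panics are not recovered in this sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// parallelWG is a hypothetical variant of parallel. It runs fn for every
// value in sq with at most p goroutines at a time and returns the error of
// each call at the same index as its input.
func parallelWG(fn func(int) error, sq []int, p int) []error {
	sem := make(chan struct{}, p)
	errs := make([]error, len(sq)) // one element per call, so no locking
	var wg sync.WaitGroup
	for i, s := range sq {
		sem <- struct{}{} // blocks while p workers are running
		wg.Add(1)
		go func(i, s int) {
			defer wg.Done()
			defer func() { <-sem }()
			errs[i] = fn(s)
		}(i, s)
	}
	wg.Wait() // blocks until every worker has called Done
	return errs
}

// divide100 is a made-up worker that returns an error instead of panicking.
func divide100(x int) error {
	if x == 0 {
		return fmt.Errorf("cannot divide 100 by %d", x)
	}
	fmt.Println(x, 100/x)
	return nil
}

func main() {
	in := []int{1, 2, 0, 4}
	errs := parallelWG(divide100, in, 2)
	for i, err := range errs {
		if err != nil {
			fmt.Printf("input %d: %v\n", in[i], err)
		}
	}
}
```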