Skip to content
Snippets Groups Projects
Commit 9f1c890f authored by Marcus Schiesser's avatar Marcus Schiesser
Browse files

consumers of gofabric can read unlimited messages

parent b872100e
No related branches found
No related tags found
No related merge requests found
...@@ -6,49 +6,61 @@ import ( ...@@ -6,49 +6,61 @@ import (
) )
const ( const (
MESSAGES = 10 MESSAGES = 6
PRODUCERS = 3 PRODUCERS = 2
CONSUMERS = 5 CONSUMERS = 4
) )
var ( var (
ch chan string done chan struct{}
wg sync.WaitGroup ch chan string
wgProducers sync.WaitGroup
wgConsumers sync.WaitGroup
) )
func main() { func main() {
ch = make(chan string) ch = make(chan string)
done = make(chan struct{})
for i := 1; i <= PRODUCERS; i++ { for i := 1; i <= PRODUCERS; i++ {
createProducer(i, MESSAGES/PRODUCERS) if i != PRODUCERS {
createProducer(i, MESSAGES/PRODUCERS)
} else {
createProducer(i, MESSAGES/PRODUCERS+MESSAGES%PRODUCERS)
}
} }
createProducer(0, MESSAGES%PRODUCERS)
for i := 1; i <= CONSUMERS; i++ { for i := 1; i <= CONSUMERS; i++ {
createConsumer(i, MESSAGES/CONSUMERS) createConsumer(i)
} }
createProducer(0, MESSAGES%CONSUMERS) // wait for producers
// wait for producer and consumers wgProducers.Wait()
wg.Wait() close(done)
// wait for consumers
wgConsumers.Wait()
fmt.Println("Consumers and producers are finished.") fmt.Println("Consumers and producers are finished.")
} }
func createConsumer(i, messages int) { func createConsumer(i int) {
fmt.Printf("Starting consumer %d\n", i) fmt.Printf("Starting consumer %d\n", i)
wg.Add(1) wgConsumers.Add(1)
go func() { go func() {
defer wg.Done() defer wgConsumers.Done()
for m := 1; m <= messages; m++ { for {
msg := <-ch select {
fmt.Printf("Consumer %d received message: %v\n", i, msg) case <-done:
fmt.Printf("Consumer %d done\n", i)
return
case msg := <-ch:
fmt.Printf("Consumer %d received message: %v\n", i, msg)
}
} }
fmt.Printf("Consumer %d done\n", i)
}() }()
} }
func createProducer(i, messages int) { func createProducer(i, messages int) {
fmt.Printf("Starting producer %d\n", i) fmt.Printf("Starting producer %d\n", i)
wg.Add(1) wgProducers.Add(1)
go func() { go func() {
defer wg.Done() defer wgProducers.Done()
for m := 1; m <= messages; m++ { for m := 1; m <= messages; m++ {
ch <- fmt.Sprintf("Msg %d from producer %d", m, i) ch <- fmt.Sprintf("Msg %d from producer %d", m, i)
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment