From 9f1c890fc85c19c572d6a285142d06618d50af0b Mon Sep 17 00:00:00 2001 From: Marcus Schiesser <mail@marcusschiesser.de> Date: Fri, 11 Jun 2021 10:53:22 +0200 Subject: [PATCH] consumers of gofabric can read unlimited messages --- src/gofabric/main.go | 52 +++++++++++++++++++++++++++----------------- 1 file changed, 32 insertions(+), 20 deletions(-) diff --git a/src/gofabric/main.go b/src/gofabric/main.go index 58dc6ba..0dc7dce 100644 --- a/src/gofabric/main.go +++ b/src/gofabric/main.go @@ -6,49 +6,61 @@ import ( ) const ( - MESSAGES = 10 - PRODUCERS = 3 - CONSUMERS = 5 + MESSAGES = 6 + PRODUCERS = 2 + CONSUMERS = 4 ) var ( - ch chan string - wg sync.WaitGroup + done chan struct{} + ch chan string + wgProducers sync.WaitGroup + wgConsumers sync.WaitGroup ) func main() { ch = make(chan string) + done = make(chan struct{}) for i := 1; i <= PRODUCERS; i++ { - createProducer(i, MESSAGES/PRODUCERS) + if i != PRODUCERS { + createProducer(i, MESSAGES/PRODUCERS) + } else { + createProducer(i, MESSAGES/PRODUCERS+MESSAGES%PRODUCERS) + } } - createProducer(0, MESSAGES%PRODUCERS) for i := 1; i <= CONSUMERS; i++ { - createConsumer(i, MESSAGES/CONSUMERS) + createConsumer(i) } - createProducer(0, MESSAGES%CONSUMERS) - // wait for producer and consumers - wg.Wait() + // wait for producers + wgProducers.Wait() + close(done) + // wait for consumers + wgConsumers.Wait() fmt.Println("Consumers and producers are finished.") } -func createConsumer(i, messages int) { +func createConsumer(i int) { fmt.Printf("Starting consumer %d\n", i) - wg.Add(1) + wgConsumers.Add(1) go func() { - defer wg.Done() - for m := 1; m <= messages; m++ { - msg := <-ch - fmt.Printf("Consumer %d received message: %v\n", i, msg) + defer wgConsumers.Done() + for { + select { + case <-done: + fmt.Printf("Consumer %d done\n", i) + return + case msg := <-ch: + fmt.Printf("Consumer %d received message: %v\n", i, msg) + } } - fmt.Printf("Consumer %d done\n", i) }() } func createProducer(i, messages int) { fmt.Printf("Starting producer %d\n", i) - wg.Add(1) + wgProducers.Add(1) go func() { - defer wg.Done() + defer wgProducers.Done() for m := 1; m <= messages; m++ { ch <- fmt.Sprintf("Msg %d from producer %d", m, i) } -- GitLab