孔乙己:Kotlin生产者消费者问题的八种解法

2025-05-29 0 95

孔乙己:Kotlin生产者消费者问题的八种解法

孔乙己:Kotlin生产者消费者问题的八种解法

生产者消费者问题是线程模型中的经典问题:生产者消费者在同一时间段内共用同一个缓冲区(Buffer),生产者往 Buffer 中添加产品,消费者从 Buffer 中取走产品,当 Buffer 为空时,消费者阻塞,当 Buffer 满时,生产者阻塞。

Kotlin 中有多种方法可以实现多线程的生产/消费模型(大多也适用于Java)

  1. Synchronized
  2. ReentrantLock
  3. BlockingQueue
  4. Semaphore
  5. PipedXXXStream
  6. RxJava
  7. Coroutine
  8. Flow

1. Synchronized

Synchronized 是最最基本的线程同步工具,配合 wait/notify 可以实现实现生产消费问题。

  1. valbuffer=LinkedList<Data>()
  2. valMAX=5//buffer最大size
  3. vallock=Object()
  4. funproduce(data:Data){
  5. sleep(2000)//mockproduce
  6. synchronized(lock){
  7. while(buffer.size>=MAX){
  8. //当buffer满时,停止生产
  9. //注意此处使用while不能使用if,因为有可能是被另一个生产线程而非消费线程唤醒,所以要再次检查buffer状态
  10. //如果生产消费两把锁,则不必担心此问题
  11. lock.wait()
  12. }
  13. buffer.push(data)
  14. //notify方法只唤醒其中一个线程,选择哪个线程取决于操作系统对多线程管理的实现。
  15. //notifyAll会唤醒所有等待中线程,哪一个线程将会第一个处理取决于操作系统的实现,但是都有机会处理。
  16. //此处使用notify有可能唤醒的是另一个生产线程从而造成死锁,所以必须使用notifyAll
  17. lock.notifyAll()
  18. }
  19. }
  20. funconsume(){
  21. synchronized(lock){
  22. while(buffer.isEmpty())
  23. lock.wait()//暂停消费
  24. buffer.removeFirst()
  25. lock.notifyAll()
  26. }
  27. sleep(2000)//mockconsume
  28. }
  29. @Test
  30. funtest(){
  31. //同时启动多个生产、消费线程
  32. repeat(10){
  33. Thread{produce(Data())}.start()
  34. }
  35. repeat(10){
  36. Thread{consume()}.start()
  37. }
  38. }

2. ReentrantLock

Lock 相对于 Synchronized 好处是当有多个生产线/消费线程时,我们可以通过定义多个 condition 精确指定唤醒哪一个。下面的例子展示 Lock 配合 await/single 替换前面 Synchronized 写法。

  1. valbuffer=LinkedList<Data>()
  2. valMAX=5//buffer最大size
  3. vallock=ReentrantLock()
  4. valcondition=lock.newCondition()
  5. funproduce(data:Data){
  6. sleep(2000)//mockproduce
  7. lock.lock()
  8. while(buffer.size>=5)
  9. condition.await()
  10. buffer.push(data)
  11. condition.signalAll()
  12. lock.unlock()
  13. }
  14. funconsume(){
  15. lock.lock()
  16. while(buffer.isEmpty())
  17. condition.await()
  18. buffer.removeFirst()
  19. condition.singleAll()
  20. lock.unlock()
  21. sleep(2000)//mockconsume
  22. }

3. BlockingQueue (阻塞队列)

BlockingQueue在达到临界条件时,再进行读写会自动阻塞当前线程等待锁的释放,天然适合这种生产/消费场景。

  1. valbuffer=LinkedBlockingQueue<Data>(5)
  2. funproduce(data:Data){
  3. sleep(2000)//mockproduce
  4. buffer.put(data)//buffer满时自动阻塞
  5. }
  6. funconsume(){
  7. buffer.take()//buffer空时自动阻塞
  8. sleep(2000)//mockconsume
  9. }

注意 BlockingQueue 的有三组读/写方法,只有一组有阻塞效果,不要用错。

方法 说明
add(o)/remove(o) add 方法在添加元素的时候,若超出了队列的长度会直接抛出异常
offer(o)/poll(o) offer 在添加元素时,如果发现队列已满无法添加的话,会直接返回false
put(o)/take(o) put 向队尾添加元素的时候发现队列已经满了会发生阻塞一直等待空间,以加入元素

4. Semaphore(信号量)

Semaphore 是 JUC 提供的一种共享锁机制,可以进行拥塞控制,此特性可用来控制 buffer 的大小。

  1. //canProduce:可以生产数量(即buffer可用的数量),生产者调用acquire,减少permit数目
  2. valcanProduce=Semaphore(5)
  3. //canConsumer:可以消费数量,生产者调用release,增加permit数目
  4. valcanConsume=Semaphore(5)
  5. //控制buffer访问互斥
  6. valmutex=Semaphore(0)
  7. valbuffer=LinkedList<Data>()
  8. funproduce(data:Data){
  9. if(canProduce.tryAcquire()){
  10. sleep(2000)//mockproduce
  11. mutex.acquire()
  12. buffer.push(data)
  13. mutex.release()
  14. canConsume.release()//通知消费端新增加了一个产品
  15. }
  16. }
  17. funconsume(){
  18. if(canConsume.tryAcquire()){
  19. sleep(2000)//mockconsume
  20. mutex.acquire()
  21. buffer.removeFirst()
  22. mutex.release()
  23. canProduce.release()//通知生产端可以再追加生产
  24. }
  25. }

5. PipedXXXStream (管道)

Java 里的管道输入/输出流 PipedInputStream / PipedOutputStream 实现了类似管道的功能,用于不同线程之间的相互通信,输入流中有一个缓冲数组,当缓冲数组为空的时候,输入流 PipedInputStream 所在的线程将阻塞。

  1. valpis:PipedInputStream=PipedInputStream()
  2. valpos:PipedOutputStreambylazy{
  3. PipedOutputStream().apply{
  4. pis.connect(this)//输入输出流之间建立连接
  5. }
  6. }
  7. funproduce(data:ContactsContract.Data){
  8. while(true){
  9. sleep(2000)
  10. pos.use{//Kotlin使用use方便的进行资源释放
  11. it.write(data.getBytes())
  12. it.flush()
  13. }
  14. }
  15. }
  16. funconsume(){
  17. while(true){
  18. sleep(2000)
  19. pis.use{
  20. valbyteArray=ByteArray(1024)
  21. it.read(byteArray)
  22. }
  23. }
  24. }
  25. @Test
  26. funTest(){
  27. repeat(10){
  28. Thread{produce(Data())}.start()
  29. }
  30. repeat(10){
  31. Thread{consume()}.start()
  32. }
  33. }

6. RxJava

RxJava 从概念上,可以将 Observable/Subject 作为生产者, Subscriber 作为消费者, 但是无论 Subject 或是 Observable 都缺少 Buffer 溢出时的阻塞机制,难以独立实现生产者/消费者模型。

Flowable 的背压机制,可以用来控制 buffer 数量,并在上下游之间建立通信, 配合 Atomic 可以变向实现单生产者/单消费者场景,(不适用于多生产者/多消费者场景)。

  1. classProducer:Flowable<Data>(){
  2. overridefunsubscribeActual(subscriber:org.reactivestreams.Subscriber<inData>){
  3. subscriber.onSubscribe(object:Subscription{
  4. overridefuncancel(){
  5. //…
  6. }
  7. privatevaloutStandingRequests=AtomicLong(0)
  8. overridefunrequest(n:Long){//收到下游通知,开始生产
  9. outStandingRequests.addAndGet(n)
  10. while(outStandingRequests.get()>0){
  11. sleep(2000)
  12. subscriber.onNext(Data())
  13. outStandingRequests.decrementAndGet()
  14. }
  15. }
  16. })
  17. }
  18. }
  19. classConsumer:DefaultSubscriber<Data>(){
  20. overridefunonStart(){
  21. request(1)
  22. }
  23. overridefunonNext(i:Data?){
  24. sleep(2000)//mockconsume
  25. request(1)//通知上游可以增加生产
  26. }
  27. overridefunonError(throwable:Throwable){
  28. //…
  29. }
  30. overridefunonComplete(){
  31. //…
  32. }
  33. }
  34. @Test
  35. funtest_rxjava(){
  36. try{
  37. valtestProducer=Producer)
  38. valtestConsumer=Consumer()
  39. testProducer
  40. .subscribeOn(Schedulers.computation())
  41. .observeOn(Schedulers.single())
  42. .blockingSubscribe(testConsumer)
  43. }catch(t:Throwable){
  44. t.printStackTrace()
  45. }
  46. }

7. Coroutine Channel

协程中的 Channel 具有拥塞控制机制,可以实现生产者消费者之间的通信。可以把 Channel 理解为一个协程版本的阻塞队列,capacity 指定队列容量。

  1. valchannel=Channel<Data>(capacity=5)
  2. suspendfunproduce(data:ContactsContract.Contacts.Data)=run{
  3. delay(2000)//mockproduce
  4. channel.send(data)
  5. }
  6. suspendfunconsume()=run{
  7. delay(2000)//mockconsume
  8. channel.receive()
  9. }
  10. @Test
  11. funtest_channel(){
  12. repeat(10){
  13. GlobalScope.launch{
  14. produce(Data())
  15. }
  16. }
  17. repeat(10){
  18. GlobalScope.launch{
  19. consume()
  20. }
  21. }
  22. }

此外,Coroutine 提供了 produce 方法,在声明 Channel 的同时生产数据,写法上更简单,适合单消费者生产者的场景:

  1. funCoroutineScope.produce():ReceiveChannel<Data>=produce{
  2. repeat(10){
  3. delay(2000)//mockproduce
  4. send(Data())
  5. }
  6. }
  7. @Test
  8. funtest_produce(){
  9. GlobalScope.launch{
  10. produce.consumeEach{
  11. delay(2000)//mockconsume
  12. }
  13. }
  14. }

8. Coroutine Flow

Flow 跟 RxJava 一样,因为缺少 Buffer 溢出时的阻塞机制,不适合处理生产消费问题,其背压机制也比较简单,无法像 RxJava 那样收到下游通知。但是 Flow 后来发布了 SharedFlow, 作为带缓冲的热流,提供了 Buffer 溢出策略,可以用作生产者/消费者之间的同步。

  1. valflow:MutableSharedFlow<Data>=MutableSharedFlow(
  2. extraBufferCapacity=5//缓冲大小
  3. ,onBufferOverflow=BufferOverflow.SUSPEND//缓冲溢出时的策略:挂起
  4. )
  5. @Test
  6. funtest(){
  7. GlobalScope.launch{
  8. repeat(10){
  9. delay(2000)//mockproduce
  10. sharedFlow.emit(Data())
  11. }
  12. }
  13. GlobalScope.launch{
  14. sharedFlow.collect{
  15. delay(2000)//mockconsume
  16. }
  17. }
  18. }

注意 SharedFlow 也只能用在单生产者/单消费者场景。

总结

生产者/消费者问题,其本质核心还是多线程读写共享资源(Buffer)时的同步问题,理论上只要具有同步机制的多线程框架,例如线程锁、信号量、阻塞队列、协程 Channel等,都是可以实现生产消费模型的。

另外,RxJava 和 Flow 虽然也是多线程框架,但是缺少Buffer溢出时的阻塞机制,不适用于生产/消费场景,更适合在纯响应式场景中使用。

原文链接:https://mp.weixin.qq.com/s/zWrSq8JVRkLUilMMgdv1Cw

收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。

快网idc优惠网 建站教程 孔乙己:Kotlin生产者消费者问题的八种解法 https://www.kuaiidc.com/108091.html

相关文章

发表评论
暂无评论