[concurrency-interest] ForkJoinPool deadlock - bug?

Matthias Schmalz matthias.schmalz at digitalasset.com
Wed Oct 21 10:52:25 EDT 2020


Dear experts,

We are using ForkJoinPool as part of a software project. In our integration
tests, we occasionally observe deadlocks with the following characteristics:
- The ForkJoinPool stops executing new tasks.
- All worker threads are waiting inside of ForkJoinPool.managedBlock.
- The number of running threads is 0, the number of active threads is 1.
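The counters in the list above are the ones exposed by the public accessors of ForkJoinPool. A minimal sketch of how such a snapshot can be taken, assuming nothing beyond the public API (PoolSnapshot and describe are hypothetical names used only for illustration):

```scala
import java.util.concurrent.ForkJoinPool

object PoolSnapshot {

  /** Formats the public counters of `pool` on a single line. */
  def describe(pool: ForkJoinPool): String =
    s"parallelism=${pool.getParallelism} " +
      s"poolSize=${pool.getPoolSize} " +
      s"running=${pool.getRunningThreadCount} " +
      s"active=${pool.getActiveThreadCount} " +
      s"queuedSubmissions=${pool.getQueuedSubmissionCount} " +
      s"queuedTasks=${pool.getQueuedTaskCount}"
}
```

Logging PoolSnapshot.describe(pool) when a test times out shows the running and active counts next to the queue lengths, which helps to tell a stuck pool from an idle one.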

After increasing the "parallelism" parameter, the problem still occurred.
Increasing the "minimumRunnable" parameter from 1 to 2 made the problem
disappear. So we have a workaround for the moment.
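For illustration, the workaround amounts to using the ForkJoinPool constructor that takes "minimumRunnable" (available since Java 9). The sketch below is not our production setup: WorkaroundPool and create are hypothetical names, and the values for corePoolSize, maximumPoolSize and the keep-alive time are illustrative assumptions.

```scala
import java.util.concurrent.{ForkJoinPool, TimeUnit}

object WorkaroundPool {

  /** Creates a pool whose minimumRunnable is raised from the default 1 to 2. */
  def create(parallelism: Int): ForkJoinPool =
    new ForkJoinPool(
      parallelism,
      ForkJoinPool.defaultForkJoinWorkerThreadFactory,
      null,             // no UncaughtExceptionHandler in this sketch
      true,             // asyncMode, as in the test
      parallelism,      // corePoolSize (illustrative assumption)
      256,              // maximumPoolSize (illustrative assumption)
      2,                // minimumRunnable: the actual workaround
      null,             // saturate: no predicate supplied
      60L,              // keepAliveTime (illustrative assumption)
      TimeUnit.SECONDS
    )
}
```

With minimumRunnable = 2, the pool tries to keep at least two threads that are not blocked by a join or a ManagedBlocker, so it compensates for blocked workers earlier than with the default of 1.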

Further investigations revealed that worker threads may be SIGNALLED or
UNSIGNALLED. When a worker thread is UNSIGNALLED, it will be parked soon
after. However, an UNSIGNALLED thread scans the work queues before being
parked; so UNSIGNALLED worker threads may actually execute tasks.

It seems that if an UNSIGNALLED thread executes "managedBlock", the RC
variable may temporarily be too high by one, which ultimately results in
the deadlock we have observed.
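To make the RC remark concrete: as far as I can tell from the OpenJDK 11 sources, RC occupies the top 16 bits of the pool's 64-bit "ctl" field and holds the number of released workers minus the parallelism, and getActiveThreadCount is derived from it. The sketch below only mimics that arithmetic on plain Long values and does not touch a real pool. CtlSketch, withRc and activeCount are hypothetical names, and the bit layout is an assumption that may not hold for other JDK versions.

```scala
object CtlSketch {

  // Assumption (JDK 11): RC lives in bits 48..63 of ctl.
  val RcShift = 48

  /** Builds a ctl value in which only the RC field is set. */
  def withRc(rc: Int): Long = rc.toLong << RcShift

  /** Mimics how the active count appears to be derived: parallelism + RC, clamped at 0. */
  def activeCount(ctl: Long, parallelism: Int): Int =
    math.max(parallelism + (ctl >> RcShift).toInt, 0)
}
```

Under these assumptions and with parallelism = 1, an RC of -1 gives an active count of 0 and an RC of 0 gives an active count of 1. An RC that is too high by one would therefore be consistent with an active count of 1 while every worker is blocked.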

Below, you can find a unit test that reproduces the issue quite reliably
(on my machine). It has been compiled with Scala 2.12.11 (I hope Scala is
ok, if not I can convert it to Java) and executed with OpenJDK 11.0.6.

I kindly ask you to critically review my observations and let me know if
they are correct. Also, if there is a bug in ForkJoinPool, how can I
request a fix? Lastly, if this is not the right place to discuss such
observations, I would be glad if you could point me to the right place.

Thank you very much in advance and have a nice day!

Matthias Schmalz

import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{ForkJoinPool, ForkJoinWorkerThread, Semaphore, TimeUnit}

import org.scalatest.{Matchers, WordSpec}

/**
  * Demonstrates a deadlock in ForkJoinPool.
  *
  * Compiled with Scala 2.12.11
  * Executed with OpenJDK 11.0.6
  */
class ForkJoinPoolTest extends WordSpec with Matchers {

  lazy val forkJoinPool = new ForkJoinPool(
    1, // The problem can also be reproduced with parallelism = 2.
    ForkJoinPool.defaultForkJoinWorkerThreadFactory,
    ((_, t) => t.printStackTrace()): Thread.UncaughtExceptionHandler,
    true
  )

  val MinDelayCycles = 50000
  val MaxDelayCycles = 100000
  val Iterations     = MaxDelayCycles - MinDelayCycles

  "A task may run on an UNSIGNALLED work queue" in {
    val completedTasks               = new Semaphore(0)
    val tasksOnUnsignalledWorkQueues = new AtomicInteger()

    for (delay <- MinDelayCycles until MaxDelayCycles) {
      forkJoinPool.execute(() => {
        if (!isSignalled) {
          tasksOnUnsignalledWorkQueues.getAndIncrement()
        }
        completedTasks.release()
      })

      busyWait(delay)
    }

    // All tasks scheduled. Awaiting termination.
    completedTasks.tryAcquire(Iterations, 10, TimeUnit.SECONDS) shouldBe true

    Console.err.println(s"Hits: ${tasksOnUnsignalledWorkQueues.get()}, Total: $Iterations")
    tasksOnUnsignalledWorkQueues.get() should be > 0
  }

  "If several blocking tasks run on SIGNALLED work queues, all tasks complete" in {
    val startedTasks = new Semaphore(0)
    val blocker      = new Semaphore(0)
    val blockLimit   = new AtomicInteger(100)

    for (delay <- MinDelayCycles until MaxDelayCycles) {
      forkJoinPool.execute(() => {
        startedTasks.release()
        if (isSignalled && blockLimit.getAndDecrement() > 0) {
          blocking {
            blocker.acquire()
          }
        }
      })

      busyWait(delay)
    }

    // As ForkJoinPool.managedBlock will create extra threads,
    // all tasks will eventually be started.
    startedTasks.tryAcquire(Iterations, 10, TimeUnit.SECONDS) shouldBe true

    // Cleanup for next test
    blocker.release(100)
  }

  "If several blocking tasks run on UNSIGNALLED work queues, the pool may deadlock" in {
    val runningTasks = new Semaphore(0)
    val blockers     = new Semaphore(0)
    val blockLimit   = new AtomicInteger(10)

    for (delay <- MinDelayCycles until MaxDelayCycles) {
      forkJoinPool.execute(() => {
        runningTasks.release()
        if (!isSignalled && blockLimit.getAndDecrement() > 0) {
          blocking {
            blockers.acquire()
          }
        }
      })

      busyWait(delay)
    }

    // PROBLEM: runningTasks can no longer be acquired, so
    // ForkJoinPool is running out of threads.
    runningTasks.tryAcquire(Iterations, 10, TimeUnit.SECONDS) shouldBe false

    // Looking at the output, all threads are parked or inside of
    // ForkJoinPool.managedBlock, so apparently it failed to release
    // an extra thread.
    printStackTraces()

    // The activeThreadCount seems to be off by one.
    forkJoinPool.getRunningThreadCount shouldBe 0
    forkJoinPool.getActiveThreadCount shouldBe 1
  }

  def busyWait(delay: Int): Long = {
    var result = 0L
    for (i <- 0 until delay) {
      result += i * i
    }
    result
  }

  def isSignalled: Boolean = {
    val workQueueField = classOf[ForkJoinWorkerThread].getDeclaredField("workQueue")
    workQueueField.setAccessible(true)
    val q = workQueueField.get(Thread.currentThread())

    val phaseField =
      Class.forName("java.util.concurrent.ForkJoinPool$WorkQueue").getDeclaredField("phase")
    phaseField.setAccessible(true)
    val phase = phaseField.get(q).asInstanceOf[Int]

    phase >= 0
  }

  def blocking(thunk: => Unit): Unit = {
    val blocker: ForkJoinPool.ManagedBlocker = new ForkJoinPool.ManagedBlocker {
      private[this] var done: Boolean = false
      override def block(): Boolean = {
        try {
          if (!done) {
            thunk
          }
        } finally {
          done = true
        }

        true
      }

      override def isReleasable: Boolean = done
    }
    ForkJoinPool.managedBlock(blocker)
  }

  private def printStackTraces(): Unit = {
    val traces = {
      import scala.collection.JavaConverters._
      Thread.getAllStackTraces.asScala.toMap
        .collect {
          case (thread, stackTrace) if thread.getName.startsWith("ForkJoinPool") =>
            s"$thread state=${thread.getState.toString}" +
              stackTrace.mkString("\n\t", "\n\t", "\n")
        }
        .mkString("\n")
    }
    Console.out.println(s"Here are the stack-traces of ForkJoinPool worker threads:\n$traces")
  }
}

-- 
Dr. Matthias Schmalz
Senior Software Engineer
e: matthias.schmalz at digitalasset.com 
Digital Asset (Switzerland) GmbH
Luggwegstrasse 9
8048 Zurich, Switzerland
digitalasset.com <http://www.digitalasset.com/>
