Each fiber owns a stack and manages its execution state, including all registers and CPU flags, the instruction pointer and the stack pointer. That means, in general, a fiber is not bound to a specific thread.[3],[4]
Migrating a fiber from a logical CPU with heavy workload to another logical CPU with a lighter workload might speed up the overall execution. Note that in the case of NUMA-architectures, it is not always advisable to migrate data between threads. Suppose fiber f is running on logical CPU cpu0 which belongs to NUMA node node0. The data of f are allocated on the physical memory located at node0. Migrating the fiber from cpu0 to another logical CPU cpuX which is part of a different NUMA node nodeX might reduce the performance of the application due to increased latency of memory access.
Only fibers that are contained in algorithm’s ready queue can migrate between threads. You cannot migrate a running fiber, nor one that is blocked. You cannot migrate a fiber if its context::is_context() method returns true for pinned_context.
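The rule above can be written as a single predicate. What follows is a minimal standalone model in standard C++, not Boost.Fiber code: ToyState, ToyType, ToyContext and can_migrate are hypothetical names, and the flag values are chosen only for illustration.

```cpp
#include <cstdint>

// Hypothetical stand-ins for illustration; these are not Boost.Fiber types.
enum class ToyState { ready, running, blocked };

enum class ToyType : std::uint8_t {
    worker_fiber     = 1 << 0,
    main_fiber       = 1 << 1,
    dispatcher_fiber = 1 << 2,
    // Assumption of this model: "pinned" covers main and dispatcher fibers.
    pinned           = main_fiber | dispatcher_fiber
};

struct ToyContext {
    ToyType  type;
    ToyState state;

    // True if this context carries any of the flags in `t`.
    bool is_context(ToyType t) const noexcept {
        return (static_cast<std::uint8_t>(type) &
                static_cast<std::uint8_t>(t)) != 0;
    }
};

// A fiber may migrate only while it is ready (queued) and not pinned.
inline bool can_migrate(const ToyContext& ctx) noexcept {
    return ctx.state == ToyState::ready && !ctx.is_context(ToyType::pinned);
}
```

In this model a ready worker fiber passes the check, while a running or blocked worker, or any main or dispatcher fiber, does not.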
In Boost.Fiber a fiber is migrated by invoking context::detach() on the thread from which the fiber migrates and context::attach() on the thread to which the fiber migrates.
Thus, fiber migration is accomplished by sharing state between instances of a user-coded algorithm implementation running on different threads. The fiber’s original thread calls algorithm::awakened(), passing the fiber’s context*. The awakened() implementation calls context::detach().
At some later point, when the same or a different thread calls algorithm::pick_next(), the pick_next() implementation selects a ready fiber and calls context::attach() on it before returning it.
As stated above, a context for which is_context(pinned_context) == true must never be passed to either context::detach() or context::attach(). It may only be returned from pick_next() called by the same thread that passed that context to awakened().
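The detach/attach handoff described above can be pictured as a change of ownership. The sketch below is a simplified model under stated assumptions: ToyScheduler, ToyContext and migrate are hypothetical names, and the member signatures are invented for the model and do not mirror the signatures of Boost.Fiber's context::detach() and context::attach().

```cpp
#include <cassert>

// Hypothetical model: one ToyScheduler stands for the scheduler of one thread.
struct ToyScheduler {
    int id;
};

struct ToyContext {
    bool          pinned = false;    // main or dispatcher fiber in this model
    ToyScheduler* owner  = nullptr;  // scheduler the fiber is attached to

    // Runs on the thread the fiber is leaving.
    void detach() noexcept {
        assert(!pinned && "a pinned context must never be detached");
        assert(owner != nullptr);
        owner = nullptr;
    }

    // Runs on the thread the fiber is moving to.
    void attach(ToyScheduler* target) noexcept {
        assert(!pinned && "a pinned context must never be attached");
        assert(owner == nullptr && target != nullptr);
        owner = target;
    }
};

// Moves a ready, unpinned fiber to scheduler `to`.
// In a real scheduler the two steps happen at different times:
// detach inside awakened(), attach inside pick_next().
inline void migrate(ToyContext& ctx, ToyScheduler& to) noexcept {
    ctx.detach();
    ctx.attach(&to);
}
```

Between the two calls the fiber belongs to no scheduler, which is why only a fiber sitting in a ready queue can be handed over this way.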
In the example work_sharing.cpp multiple worker fibers are created on the main thread. Each fiber receives a character as a parameter at construction and uses it to label its output. The fiber loops ten times, calling this_fiber::yield() on each iteration. That puts the fiber in the ready queue of the fiber-scheduler shared_ready_queue (installed in the code below as boost::fibers::algo::shared_work), running in the current thread. The next fiber ready to be executed is dequeued from the shared ready queue and resumed by the shared_ready_queue instance running on any participating thread.
All instances of shared_ready_queue share one global concurrent queue, used as ready queue. This mechanism shares all worker fibers between all instances of shared_ready_queue, thus between all participating threads.
In main() the fiber-scheduler is installed and the worker fibers and the threads are launched.
    // Install the scheduling algorithm boost::fibers::algo::shared_work.
    boost::fibers::use_scheduling_algorithm< boost::fibers::algo::shared_work >();
    // Launch a number of worker fibers; each worker fiber picks up a character
    // that is passed as parameter to the fiber-function.
    for ( char c : std::string("abcdefghijklmnopqrstuvwxyz")) {
        boost::fibers::fiber([c](){ whatevah( c); }).detach();
        ++fiber_count; // Increment fiber counter for each new fiber.
    }
    boost::fibers::detail::thread_barrier b( 4);
    // Launch a couple of threads that join the work sharing.
    std::thread threads[] = {
        std::thread( thread, & b),
        std::thread( thread, & b),
        std::thread( thread, & b)
    };
    b.wait(); // Sync with other threads: allow them to start processing.
    {
        lock_type lk( mtx_count);
        // Suspend main fiber and resume worker fibers in the meanwhile.
        // Main fiber gets resumed (e.g. returns from cnd_count.wait())
        // once the predicate 0 == fiber_count holds.
        cnd_count.wait( lk, [](){ return 0 == fiber_count; } );
    }
    // Releasing lock of mtx_count is required before joining the threads,
    // otherwise the other threads would be blocked inside
    // condition_variable::wait() and would never return (deadlock).
    BOOST_ASSERT( 0 == fiber_count);
    for ( std::thread & t : threads) { // Wait for threads to terminate.
        t.join();
    }
The start of the threads is synchronized with a barrier. The main fiber of each thread (including main thread) is suspended until all worker fibers are complete. When the main fiber returns from condition_variable::wait(), the thread terminates: the main thread joins all other threads.
    void thread( boost::fibers::detail::thread_barrier * b) {
        std::ostringstream buffer;
        buffer << "thread started " << std::this_thread::get_id() << std::endl;
        std::cout << buffer.str() << std::flush;
        // Install the scheduling algorithm boost::fibers::algo::shared_work.
        boost::fibers::use_scheduling_algorithm< boost::fibers::algo::shared_work >();
        b->wait(); // Sync with other threads: allow them to start processing.
        lock_type lk( mtx_count);
        // Suspend main fiber and resume worker fibers in the meanwhile.
        // Main fiber gets resumed (e.g. returns from cnd_count.wait())
        // once the predicate 0 == fiber_count holds.
        cnd_count.wait( lk, [](){ return 0 == fiber_count; } );
        BOOST_ASSERT( 0 == fiber_count);
    }
Each worker fiber executes function whatevah() with character me as parameter. The fiber yields in a loop and prints out a message if it was migrated to another thread.
    void whatevah( char me) {
        try {
            std::thread::id my_thread = std::this_thread::get_id(); // Get ID of initial thread.
            {
                std::ostringstream buffer;
                buffer << "fiber " << me << " started on thread " << my_thread << '\n';
                std::cout << buffer.str() << std::flush;
            }
            for ( unsigned i = 0; i < 10; ++i) { // Loop ten times.
                boost::this_fiber::yield(); // Yield to other fibers.
                std::thread::id new_thread = std::this_thread::get_id(); // Get ID of current thread.
                if ( new_thread != my_thread) { // Test if fiber was migrated to another thread.
                    my_thread = new_thread;
                    std::ostringstream buffer;
                    buffer << "fiber " << me << " switched to thread " << my_thread << '\n';
                    std::cout << buffer.str() << std::flush;
                }
            }
        } catch ( ... ) {
        }
        lock_type lk( mtx_count);
        if ( 0 == --fiber_count) { // Decrement fiber counter for each completed fiber.
            lk.unlock();
            cnd_count.notify_all(); // Notify all fibers waiting on cnd_count.
        }
    }
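The counting scheme shared by main(), thread() and whatevah() (decrement a counter under a lock, notify when it reaches zero, and release the waiter's lock before joining) can be tried in isolation. The sketch below is an illustration with plain std::thread workers standing in for fibers; run_and_wait is a hypothetical helper. Unlike the example, where the callouts describe the wait as suspending only the main fiber, std::condition_variable blocks the whole calling thread.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Starts `workers` threads that each decrement a shared counter, waits until
// the counter reaches zero, and returns the counter value seen by the waiter.
inline int run_and_wait(int workers) {
    std::mutex              mtx_count;
    std::condition_variable cnd_count;
    int                     count = workers;  // set before any worker starts

    std::vector<std::thread> pool;
    for (int i = 0; i < workers; ++i) {
        pool.emplace_back([&] {
            std::unique_lock<std::mutex> lk(mtx_count);
            if (0 == --count) {
                lk.unlock();             // release first, then notify
                cnd_count.notify_all();
            }
        });
    }

    int seen = -1;
    {
        std::unique_lock<std::mutex> lk(mtx_count);
        cnd_count.wait(lk, [&] { return 0 == count; });
        seen = count;
    }   // the lock is released here, before any join()

    for (std::thread& t : pool) {
        t.join();
    }
    return seen;
}
```

The inner scope around the wait plays the same role as the braces in main(): the lock on mtx_count is gone by the time the threads are joined.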
The fiber scheduler shared_ready_queue is like round_robin, except that it shares a common ready queue among all participating threads. A thread participates in this pool by executing use_scheduling_algorithm() before any other Boost.Fiber operation.
The important point about the ready queue is that it’s a class static, common to all instances of shared_ready_queue. Fibers that are enqueued via algorithm::awakened() (fibers that are ready to be resumed) are thus available to all threads. A separate, scheduler-specific queue must be reserved for the thread’s main fiber and dispatcher fibers: these may not be shared between threads! When awakened() is passed either of these fibers, it pushes the fiber onto that local queue instead of the shared queue: it would be Bad News for thread B to retrieve and attempt to execute thread A’s main fiber.
[awakened_ws]
When algorithm::pick_next() gets called inside one thread, a fiber is dequeued from rqueue_ and will be resumed in that thread.
[pick_next_ws]
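The queue routing described in this section can be modelled in a few lines of standard C++. This is a sketch under stated assumptions, not the code from work_sharing.cpp: ToyFiber and ToySharedScheduler are hypothetical, only the name rqueue_ comes from the text, lqueue_ and rqueue_mtx_ are names invented here, and preferring the shared queue over the local one is a choice made for this model.

```cpp
#include <deque>
#include <mutex>

// Hypothetical model of a fiber; `pinned` marks a main or dispatcher fiber.
struct ToyFiber {
    int  id;
    bool pinned;
};

class ToySharedScheduler {
    // Class statics: one ready queue and one mutex for all instances.
    static std::deque<ToyFiber*> rqueue_;
    static std::mutex            rqueue_mtx_;
    // Per-instance queue for fibers that must stay on this thread.
    std::deque<ToyFiber*>        lqueue_;

public:
    // A fiber became ready: pinned fibers stay local, all others are shared.
    void awakened(ToyFiber* f) {
        if (f->pinned) {
            lqueue_.push_back(f);
        } else {
            std::lock_guard<std::mutex> lk(rqueue_mtx_);
            rqueue_.push_back(f);
        }
    }

    // Take shared work if there is any; otherwise fall back to the local queue.
    ToyFiber* pick_next() {
        {
            std::lock_guard<std::mutex> lk(rqueue_mtx_);
            if (!rqueue_.empty()) {
                ToyFiber* f = rqueue_.front();
                rqueue_.pop_front();
                return f;
            }
        }
        if (!lqueue_.empty()) {
            ToyFiber* f = lqueue_.front();
            lqueue_.pop_front();
            return f;
        }
        return nullptr;  // nothing is ready
    }
};

// Definitions of the class statics.
std::deque<ToyFiber*> ToySharedScheduler::rqueue_;
std::mutex            ToySharedScheduler::rqueue_mtx_;
```

With two instances a and b, a worker fiber handed to a.awakened() can come back from b.pick_next(), while a pinned fiber handed to a.awakened() can only come back from a.pick_next().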
The source code above is found in work_sharing.cpp.
[3] The “main” fiber on each thread, that is, the fiber on which the thread is launched, cannot migrate to any other thread. Also Boost.Fiber implicitly creates a dispatcher fiber for each thread — this cannot migrate either.
[4] Of course it would be problematic to migrate a fiber that relies on thread-local storage.