21# pragma warning( disable: 4275 )
22# pragma warning(disable: 4251)
46#define ENABLE_VERBOSE_THREAD_TIMELINE 0
49#define AGX_MAX_NUM_THREADS 1024
151 inline bool joinable();
156 inline unsigned int getThreadState();
190 std::mutex m_handleMutex;
261 static Thread *getThread(
size_t id);
266 static Thread *getMainThread();
297 static bool isMainThread();
322 const char* description,
323 const char* extraDataTitle =
nullptr,
329 Real getOverheadTime()
const;
370 static int log(
const char *format, ...);
394 class MainThreadSingleton;
395 static Thread *initMainThread();
397 static void performNumThreadsChange();
401 void freeDefaultStorages();
402 void spawn(
Job *job);
403 void sortInsertJob(
Job *job);
406 void completeFrame(
Task *task);
408 bool isActive()
const;
411 template <
bool THREAD_TIMELINE_STATISTICS>
415 void blockingDoWork();
418 void wakeupThreads();
419 void pushTargetJob(
Thread *target,
Job *job,
bool activateTarget =
true);
420 static void taskCompleted(
Task *task);
425 Job *getExecutionJob();
427 Index getRandomOtherThreadId(
Index excludeIndex = InvalidIndex);
435 Notify::ThreadData *getNotifyData();
450 static const UInt32 mod = ((1ULL << 32) - 5);
451 static const UInt32 mul = 69070U;
459 FastRandom m_fastRandom;
// Constructs a frame for the given task (null when omitted) with its done flag cleared.
463 inline Frame(
Task *t =
nullptr) : task(t), done(false) {}
// Hand-written assignment: copies the task pointer, then reads the other frame's done
// flag with load() and stores that value here. The two members are copied one after
// the other, so the assignment as a whole is not a single atomic operation.
464 inline Frame& operator=(
const Frame& other) {task = other.task; done = other.done.load();
return *
this;}
466 std::atomic<bool> done;
469 #define AGX_THREAD_FRAME_STACK_MAX_DEPTH 64
472 size_t m_activationDepth;
473 std::atomic<Int32> m_activationCount;
480 return lhs->getCostEstimate() < rhs->getCostEstimate();
487 Thread *m_listNodeNext;
489 using JobQueue = std::priority_queue<Job *, agx::VectorPOD<Job *>, JobCompare>;
491 JobQueue m_sharedJobs;
495 std::atomic<Int32> m_pushCounter;
496 std::atomic<bool> m_running;
507 Timer m_overheadTimer;
511 UInt64 m_savedRegisterState;
516 std::random_device m_randomDevice;
517 RandomGenerator m_mersienneTwister;
522 void registerContainerAllocation(
Container *container);
523 void unregisterContainerAllocation(
Container *container);
524 void *allocateScratchPadBuffer(
size_t numBytes);
525 void deallocateScratchPadBuffer(
void *buffer,
size_t numBytes);
527 struct ScratchPadArea
529 ScratchPadArea() : buffer(nullptr),
end(nullptr), head(nullptr), m_allocator(
"ScratchPad")
532 ~ScratchPadArea() { m_allocator.deallocateBytes(buffer); }
537 ByteAllocator m_allocator;
538 VectorPOD<Container *> m_activeAllocations;
541 ScratchPadArea m_scratchPad;
547 static Thread *s_mainThread;
548 static Callback1<Task *> s_taskCompletionCallback;
550 static bool s_enableJobTimeline;
552 static std::exception_ptr s_unhandledException;
555 struct LocalTimelineEntry
565 , extraDataTitle(nullptr)
569 enum JobType { PRE, DISPATCH, POST,
UNKNOWN };
580 const char* extraDataTitle;
584 Vector<LocalTimelineEntry> m_localTimelineEntries;
588 void exportTimelineEntries();
589 void reportTimelineJob(Job *job);
590 void pushTimelineEntry(
const LocalTimelineEntry& entry);
593 class LogChunk :
public Referenced
596 LogChunk(
size_t numBytes);
601 size_t numAvailable();
610 LogEntry(
agx::UInt64 timestamp_,
const char* message_);
615 int logImplementation(
const char* format, va_list ap);
616 static int cmpLogEntry(
const LogEntry& entry1,
const LogEntry& entry2);
618 LogChunkRef m_logChunk;
619 VectorPOD<LogEntry> m_logEntries;
621 typedef HashTable<agxData::EntityModel*, ref_ptr<Referenced>> DefaultStorageTable;
622 DefaultStorageTable m_defaultStorageTable;
658 return s_threads[id];
// Reports whether this thread is currently active, i.e. whether the value loaded
// from its activation counter is greater than zero.
668 AGX_FORCE_INLINE bool Thread::isActive()
const {
return m_activationCount.load() > 0; }
672 return m_mersienneTwister;
681 va_start(arguments, format);
691 return m_byteAllocator.allocate(numBytes);
696 m_byteAllocator.deallocateBytes(ptr);
699 template <
typename T>
702 return this->getPool<T>()->allocate();
705 template <
typename T>
708 this->getPool<T>()->deallocate(ptr);
712 template <
typename T>
715 return this->getPool<T>()->create();
718 template <
typename T>
721 this->getPool<T>()->destroy(ptr);
724 template <
typename T>
727 uint32_t
id = agxData::getType<T>()->getId();
728 if (
id >= m_pools.size())
729 m_pools.resize(
id+1, 0);
732 m_pools[id] =
new MemoryPool<T>;
734 return static_cast<MemoryPool<T> *
>(m_pools[id]);
737 void setCurrentConstructionObject(
void *ptr);
738 void *getCurrentConstructionObject();
748 if (m_scratchPad.head + numBytes > m_scratchPad.end)
750 size_t currentSize = m_scratchPad.end - m_scratchPad.buffer;
751 const Real growFactor = 1.5;
752 size_t newSize = (size_t)(
Real(currentSize + numBytes) * growFactor );
755 char *newBuffer = (
char *)m_scratchPad.m_allocator.allocateBytes(newSize, 64);
756 agxAssertN(newBuffer,
"Thread %d could not allocate %u bytes for job scratch pad!", this->
getId(), (
unsigned)newSize);
761 size_t numUsed = m_scratchPad.head - m_scratchPad.buffer;
764 memcpy(newBuffer, m_scratchPad.buffer, numUsed);
766 m_scratchPad.m_allocator.deallocateBytes(m_scratchPad.buffer);
769 for (
size_t i = 0; i < m_scratchPad.m_activeAllocations.size(); ++i)
771 Container *container = m_scratchPad.m_activeAllocations[i];
773 if (container->m_buffer)
775 ptrdiff_t offset = (
char *)container->m_buffer - (
char *)m_scratchPad.buffer;
776 container->m_buffer = newBuffer + offset;
780 m_scratchPad.buffer = newBuffer;
781 m_scratchPad.end = m_scratchPad.buffer + newSize;
782 m_scratchPad.head = m_scratchPad.buffer + numUsed;
785 void *mem = m_scratchPad.head;
786 m_scratchPad.head += numBytes;
// Gives back a scratch pad buffer of numBytes bytes.
// In the code visible here, space is only reclaimed when the buffer is the topmost
// allocation, i.e. when it ends exactly at the current scratch pad head.
791 AGX_FORCE_INLINE void Thread::deallocateScratchPadBuffer(
void *buffer,
size_t numBytes)
// A null buffer is accepted; any other pointer must lie inside [buffer, end) of the scratch pad.
793 agxAssert(!buffer || (buffer >= m_scratchPad.buffer && buffer < m_scratchPad.end));
// Rewind the head to the start of this buffer only if it was the last thing handed out.
795 if ((
char *)buffer + numBytes == m_scratchPad.head)
796 m_scratchPad.head = (
char *)buffer;
// Records the container as the newest entry in the scratch pad's list of active allocations.
799 AGX_FORCE_INLINE void Thread::registerContainerAllocation(Container *container)
801 m_scratchPad.m_activeAllocations.push_back(container);
// Removes the container from the scratch pad's list of active allocations.
// Containers must be unregistered in reverse order of registration: the assertion
// requires the given container to be the most recently registered one.
805 AGX_FORCE_INLINE void Thread::unregisterContainerAllocation(Container *container)
810 agxAssert1(!m_scratchPad.m_activeAllocations.empty() && container == m_scratchPad.m_activeAllocations.back(),
"LIFO order required!");
811 m_scratchPad.m_activeAllocations.pop_back();
// With no active allocations left, rewind the head so the whole scratch pad can be reused.
814 if (m_scratchPad.m_activeAllocations.empty())
815 m_scratchPad.head = m_scratchPad.buffer;
821#if ENABLE_VERBOSE_THREAD_TIMELINE
822 #define AGX_BEGIN_TIMELINE_REPORT(variable) \
823 auto variable ## _begin_time = agx::Timer::getCurrentTick()
825 #define AGX_END_TIMELINE_REPORT(variable, title) \
826 auto variable ## _end_time = agx::Timer::getCurrentTick(); \
827 agx::Thread::getCurrentThread()->reportSystemJob( variable ## _begin_time , variable ## _end_time, title);
829 #define AGX_END_TIMELINE_REPORT_DATA(variable, title, title2, data) \
830 auto variable ## _end_time = agx::Timer::getCurrentTick(); \
831 agx::Thread::getCurrentThread()->reportSystemJob( variable ## _begin_time , variable ## _end_time, title, title2, data)
833 #define AGX_BEGIN_TIMELINE_REPORT(variable)
834 #define AGX_END_TIMELINE_REPORT(variable, title)
835 #define AGX_END_TIMELINE_REPORT_DATA(variable, title, title2, data)
#define AGX_DECLARE_POINTER_TYPES(type)
#define AGX_THREAD_FRAME_STACK_MAX_DEPTH
#define AGX_MAX_NUM_THREADS
void agxFlushThreadLogs()
#define AGXPHYSICS_EXPORT
An abstract description of a data entity stored using the SOA (structure of arrays) pattern in an EntitySt...
Data storage for a collection of entity instances of a specified EntityModel.
Basic wrapper class around std::thread.
BasicThread & operator=(const BasicThread &rhs)=delete
BasicThread()
Default constructor.
bool setThreadAffinity(agx::UInt64 cpumask)
Thread Affinity can be used to influence on which logical cores threads are scheduled and allowed to ...
BasicThread(const BasicThread &other)=delete
void detach()
Detaches the thread to the background.
void cancel()
Threads should normally not need to be killed.
virtual ~BasicThread()=default
Destructor.
std::atomic< unsigned int > m_state
bool join()
Joins the thread.
virtual void run()
This method is invoked by start.
bool joinable()
True if thread is joinable.
ThreadState
States for the thread.
unsigned int getThreadState()
Returns the current thread state.
bool start()
Launches the thread.
static std::thread::native_handle_type getCurrentThreadHandle()
Return a native_handle for the current executing thread.
Block synchronization primitive.
The Container is the base class for several of the container classes provided by AGX,...
The object defining a frame of reference and providing transformations operations.
An abstract job/workblock representation, which allows work threads to execute arbitrary tasks.
Inheritance with partial specialization due to bug with ref_ptr containers.
Class for handling logging of messages.
A representation of a generic task.
agx::Thread is a representation of an OS specific implementation of a computational thread.
static void writePerThreadStorage(ThreadStorageKey key, ThreadStorageData data)
Write to the thread-local location owned by the currently executing thread.
HashTable< Thread *, Index > ThreadIdTable
friend void AGXPHYSICS_EXPORT shutdown()
Shutdown of the AGX Dynamics API will be done when the number of shutdown matches the number of calls...
static void resetStartTick()
agx::Uuid generateUuid()
Generates a unique universal identifier.
void stop()
Stop the thread.
static void makeCurrentThreadMainThread()
Register current thread as main thread.
static int log(const char *format,...)
static Thread * getCurrentThread()
static void freePerThreadStorage(ThreadStorageKey key)
Deallocate the storage location.
static void addTask(Task *task)
agxData::EntityStorage * getDefaultStorage(agxData::EntityModel *entity)
static bool isMainThread()
void resetOverheadTime()
Reset the thread overhead time.
bool start()
Start the thread.
Thread(const Thread &)=delete
static void initThreadSystem()
static void shutdown()
Shutdown the threading system.
void reportSystemJob(UInt64 startTick, UInt64 endTick, const char *description, const char *extraDataTitle=nullptr, agx::Real64 extraData=0.0)
Add an entry to the job duration log.
static std::string getCurrentThreadDescription()
static void setEnableJobTimeline(bool flag)
Enable or disable job timeline statistics.
static Thread * registerAsAgxThread()
Register the current thread as an AGX thread.
friend void AGXCORE_EXPORT setNumThreads(size_t numThreads)
Set the number of threads to use (including the main thread).
pthread_key_t ThreadStorageKey
std::mt19937 RandomGenerator
Index getIndex() const
Get an index suitable for use when storing per-thread data in e.g.
static ThreadStorageKey allocatePerThreadStorage()
Allocate a storage location that is unique for each thread.
agxData::EntityStorageRef popTimelineEntryStorage()
RandomGenerator & getRandomGenerator()
Return a reference to the Mersenne Twister used for generating random numbers.
static bool immediateLogging
static void flushTimelineLogs()
static bool isShuttingDown()
static Thread * getThread(size_t id)
static Thread * getMainThread()
static bool getEnableJobTimeline()
static ThreadStorageData readPerThreadStorage(ThreadStorageKey key)
Read the thread-local value for the currently executing thread associated with the given key.
static void unregisterAsAgxThread()
Remove agx attributes from current thread.
Index getId() const
Returns the thread's AGX thread ID, a value between 0 and N-1, for AGX's internal threads,...
Real getOverheadTime() const
static Thread::ThreadIdTable * getPromotedThreads()
static void exportAllTimelines()
Generator of UUID values based on V4 http://en.wikipedia.org/wiki/Universally_unique_identifier.
A UUID, or Universally unique identifier, is intended to uniquely identify information in a distribut...
Vector containing 'raw' data.
#define agxAssert1(expr, msg)
#define agxAssertN(expr, format,...)
#define AGX_STATIC_ASSERT(X)
Contains classes for low level data storage for AGX.
The agx namespace contains the dynamics/math part of the AGX Dynamics API.
agx::VectorPOD< Thread * > ThreadPtrVector
LinearProbingHashSetImplementation< KeyT, HashT >::iterator end(LinearProbingHashSetImplementation< KeyT, HashT > &set)
VectorPOD< class Job * > JobPtrVector
void AGXPHYSICS_EXPORT init()
Initialize AGX Dynamics API including thread resources and must be executed before using the AGX API.