ogl_beamforming

Ultrasound Beamforming Implemented with OpenGL
git clone anongit@rnpnr.xyz:ogl_beamforming.git
Log | Files | Refs | Feed | Submodules | README | LICENSE

Commit: 60996fa80869d983137b589ff9e6448f5591bbf9
Parent: 1a82dd8b381564b70d50af5303188e09280bac3c
Author: Randy Palamar
Date:   Sun,  9 Nov 2025 18:13:41 -0700

core: upload is now multi core by default

see https://www.rfleury.com/p/multi-core-by-default or the
raddebugger source code for references.

NOTE: since the upload is entirely bound by memory access
uploading over multiple threads does not give any measurable
performance difference; for now lane count gets hardcoded to 1.
This may no longer be true once we start doing some actual work on
the CPU.

This commit was mostly about setting up the base infrastructure
and this concept will likely move into the other parts of the code
as well.

Diffstat:
Mbeamformer.c | 101++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------------------
Mbeamformer.h | 4+++-
Mbeamformer_shared_memory.c | 4++--
Mbuild.c | 5+++++
Mhelpers/ogl_beamformer_lib.c | 3++-
Mmain_linux.c | 5++++-
Mmain_w32.c | 6++++--
Mmath.c | 15+++++++++++++++
Mos_linux.c | 92+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------------
Mos_win32.c | 144++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----------------
Mstatic.c | 76++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----------
Mtests/throughput.c | 10+---------
Athreads.c | 32++++++++++++++++++++++++++++++++
Mutil.c | 10++++++++--
Mutil.h | 33+++++++++++++++++++++++++++++++++
15 files changed, 433 insertions(+), 107 deletions(-)

diff --git a/beamformer.c b/beamformer.c @@ -1397,7 +1397,7 @@ DEBUG_EXPORT BEAMFORMER_COMPLETE_COMPUTE_FN(beamformer_complete_compute) } function void -beamformer_rf_buffer_allocate(BeamformerRFBuffer *rf, u32 rf_size, Arena arena) +beamformer_rf_buffer_allocate(BeamformerRFBuffer *rf, u32 rf_size) { assert((rf_size % 64) == 0); glDeleteBuffers(1, &rf->ssbo); @@ -1411,50 +1411,87 @@ beamformer_rf_buffer_allocate(BeamformerRFBuffer *rf, u32 rf_size, Arena arena) DEBUG_EXPORT BEAMFORMER_RF_UPLOAD_FN(beamformer_rf_upload) { - BeamformerSharedMemory *sm = ctx->shared_memory->region; - + struct load_context { + u8 *buffer; + void *data; + u32 channel_count; + u32 channel_stride_bytes; + } load_context_store = {0}; + struct load_context *lctx = 0; + + BeamformerSharedMemory *sm = 0; BeamformerSharedMemoryLockKind scratch_lock = BeamformerSharedMemoryLockKind_ScratchSpace; BeamformerSharedMemoryLockKind upload_lock = BeamformerSharedMemoryLockKind_UploadRF; - u32 scratch_rf_size; - if (atomic_load_u32(sm->locks + upload_lock) && - (scratch_rf_size = atomic_swap_u32(&sm->scratch_rf_size, 0)) && - os_shared_memory_region_lock(ctx->shared_memory, sm->locks, (i32)scratch_lock, (u32)-1)) - { - BeamformerRFBuffer *rf = ctx->rf_buffer; - rf->active_rf_size = (u32)round_up_to(scratch_rf_size, 64); - if (rf->size < rf->active_rf_size) - beamformer_rf_buffer_allocate(rf, rf->active_rf_size, arena); - u32 slot = rf->insertion_index++ % countof(rf->compute_syncs); + u32 insertion_slot = 0; + if (lane_index() == 0) { + sm = ctx->shared_memory->region; + lctx = &load_context_store; + u32 scratch_rf_size; - /* NOTE(rnp): if the rest of the code is functioning then the first - * time the compute thread processes an upload it must have gone - * through this path. therefore it is safe to spin until it gets processed */ - spin_wait(atomic_load_u64(rf->upload_syncs + slot)); - - if (atomic_load_u64(rf->compute_syncs + slot)) { - GLenum sync_result = glClientWaitSync(rf->compute_syncs[slot], 0, 1000000000); - if (sync_result == GL_TIMEOUT_EXPIRED || sync_result == GL_WAIT_FAILED) { - // TODO(rnp): what do? + if (atomic_load_u32(sm->locks + upload_lock) && + (scratch_rf_size = atomic_swap_u32(&sm->rf_meta.size, 0)) && + os_shared_memory_region_lock(ctx->shared_memory, sm->locks, (i32)scratch_lock, (u32)-1)) + { + BeamformerRFBuffer *rf = ctx->rf_buffer; + rf->active_rf_size = (u32)round_up_to(scratch_rf_size, 64); + if (rf->size < rf->active_rf_size) + beamformer_rf_buffer_allocate(rf, rf->active_rf_size); + + insertion_slot = rf->insertion_index++ % countof(rf->compute_syncs); + + /* NOTE(rnp): if the rest of the code is functioning then the first + * time the compute thread processes an upload it must have gone + * through this path. therefore it is safe to spin until it gets processed */ + spin_wait(atomic_load_u64(rf->upload_syncs + insertion_slot)); + + if (atomic_load_u64(rf->compute_syncs + insertion_slot)) { + GLenum sync_result = glClientWaitSync(rf->compute_syncs[insertion_slot], 0, 1000000000); + if (sync_result == GL_TIMEOUT_EXPIRED || sync_result == GL_WAIT_FAILED) { + // TODO(rnp): what do? + } + glDeleteSync(rf->compute_syncs[insertion_slot]); } - glDeleteSync(rf->compute_syncs[slot]); - } - /* NOTE(rnp): nVidia's drivers really don't play nice with persistant mapping, - * at least when it is a big as this one wants to be. mapping and unmapping the - * desired range each time doesn't seem to introduce any performance hit */ - u32 access = GL_MAP_WRITE_BIT|GL_MAP_FLUSH_EXPLICIT_BIT|GL_MAP_UNSYNCHRONIZED_BIT; - u8 *buffer = glMapNamedBufferRange(rf->ssbo, slot * rf->active_rf_size, (i32)rf->active_rf_size, access); + /* NOTE(rnp): nVidia's drivers really don't play nice with persistant mapping, + * at least when it is a big as this one wants to be. mapping and unmapping the + * desired range each time doesn't seem to introduce any performance hit */ + u32 access = GL_MAP_WRITE_BIT|GL_MAP_FLUSH_EXPLICIT_BIT|GL_MAP_UNSYNCHRONIZED_BIT; + lctx->buffer = glMapNamedBufferRange(rf->ssbo, insertion_slot * rf->active_rf_size, + (i32)rf->active_rf_size, access); + lctx->data = beamformer_shared_memory_scratch_arena(sm).beg; + + BeamformerParameterBlock *b = beamformer_parameter_block(sm, atomic_load_u32(&sm->rf_meta.block)); + BeamformerParameters *bp = &b->parameters; + BeamformerDataKind data_kind = b->pipeline.data_kind; + + u32 size = bp->acquisition_count * bp->sample_count * beamformer_data_kind_byte_size[data_kind]; + lctx->channel_count = bp->channel_count; + lctx->channel_stride_bytes = size; + } + } + lane_sync_u64(&lctx, 0); + + if (lctx->buffer) { + RangeU64 range = lane_range(lctx->channel_count); + for (u64 channel = range.start; channel < range.stop; channel++) { + u8 *out = lctx->buffer + channel * lctx->channel_stride_bytes; + u8 *in = lctx->data + channel * lctx->channel_stride_bytes; + mem_copy(out, in, lctx->channel_stride_bytes); + } + } + lane_sync(); - mem_copy(buffer, beamformer_shared_memory_scratch_arena(sm).beg, rf->active_rf_size); + if (lctx->buffer && lane_index() == 0) { os_shared_memory_region_unlock(ctx->shared_memory, sm->locks, (i32)scratch_lock); post_sync_barrier(ctx->shared_memory, upload_lock, sm->locks); + BeamformerRFBuffer *rf = ctx->rf_buffer; glFlushMappedNamedBufferRange(rf->ssbo, 0, (i32)rf->active_rf_size); glUnmapNamedBuffer(rf->ssbo); - atomic_store_u64(rf->upload_syncs + slot, glFenceSync(GL_SYNC_GPU_COMMANDS_COMPLETE, 0)); - atomic_store_u64(rf->compute_syncs + slot, 0); + atomic_store_u64(rf->upload_syncs + insertion_slot, glFenceSync(GL_SYNC_GPU_COMMANDS_COMPLETE, 0)); + atomic_store_u64(rf->compute_syncs + insertion_slot, 0); os_wake_waiters(ctx->compute_worker_sync); diff --git a/beamformer.h b/beamformer.h @@ -13,6 +13,7 @@ /////////////////// // REQUIRED OS API +function void os_barrier_wait(Barrier); function iptr os_error_handle(void); function s8 os_path_separator(void); function OS_READ_WHOLE_FILE_FN(os_read_whole_file); @@ -21,6 +22,7 @@ function OS_SHARED_MEMORY_UNLOCK_REGION_FN(os_shared_memory_region_unlock); function OS_WAKE_WAITERS_FN(os_wake_waiters); function OS_WRITE_FILE_FN(os_write_file); +#include "threads.c" #include "util_gl.c" enum gl_vendor_ids { @@ -368,7 +370,7 @@ typedef BEAMFORMER_FRAME_STEP_FN(beamformer_frame_step_fn); #define BEAMFORMER_COMPLETE_COMPUTE_FN(name) void name(iptr user_context, Arena *arena, iptr gl_context) typedef BEAMFORMER_COMPLETE_COMPUTE_FN(beamformer_complete_compute_fn); -#define BEAMFORMER_RF_UPLOAD_FN(name) void name(BeamformerUploadThreadContext *ctx, Arena arena) +#define BEAMFORMER_RF_UPLOAD_FN(name) void name(BeamformerUploadThreadContext *ctx) typedef BEAMFORMER_RF_UPLOAD_FN(beamformer_rf_upload_fn); #define BEAMFORMER_RELOAD_SHADER_FN(name) b32 name(s8 path, ShaderReloadContext *src, \ diff --git a/beamformer_shared_memory.c b/beamformer_shared_memory.c @@ -1,5 +1,5 @@ /* See LICENSE for license details. */ -#define BEAMFORMER_SHARED_MEMORY_VERSION (19UL) +#define BEAMFORMER_SHARED_MEMORY_VERSION (20UL) typedef struct BeamformerFrame BeamformerFrame; @@ -167,7 +167,7 @@ typedef struct { u32 reserved_parameter_blocks; /* TODO(rnp): this is really sucky. we need a better way to communicate this */ - u32 scratch_rf_size; + alignas(8) union { struct {u32 block, size;}; u64 U64; } rf_meta; BeamformerLiveImagingParameters live_imaging_parameters; BeamformerLiveImagingDirtyFlags live_imaging_dirty_flags; diff --git a/build.c b/build.c @@ -528,6 +528,9 @@ cmd_base(Arena *a, Options *o) if (o->debug) cmd_append(a, &result, DEBUG_FLAGS); else cmd_append(a, &result, OPTIMIZED_FLAGS); + /* NOTE: glibc devs are actually buffoons who never write any real code */ + if (is_unix) cmd_append(a, &result, "-D_XOPEN_SOURCE=600"); + /* NOTE: ancient gcc bug: https://gcc.gnu.org/bugzilla/show_bug.cgi?id=80454 */ if (is_gcc) cmd_append(a, &result, "-Wno-missing-braces"); @@ -2861,6 +2864,8 @@ metagen_file_direct(Arena arena, char *filename) i32 main(i32 argc, char *argv[]) { + os_common_init(); + u64 start_time = os_get_timer_counter(); g_argv0 = argv[0]; diff --git a/helpers/ogl_beamformer_lib.c b/helpers/ogl_beamformer_lib.c @@ -442,7 +442,8 @@ beamformer_push_data_base(void *data, u32 data_size, i32 timeout_ms, u32 block) lib_release_lock(BeamformerSharedMemoryLockKind_ScratchSpace); /* TODO(rnp): need a better way to communicate this */ - atomic_store_u32(&g_beamformer_library_context.bp->scratch_rf_size, size); + typeof(g_beamformer_library_context.bp->rf_meta) meta = {.block = block, .size = size}; + atomic_store_u64(&g_beamformer_library_context.bp->rf_meta.U64, meta.U64); result = 1; } } diff --git a/main_linux.c b/main_linux.c @@ -71,11 +71,14 @@ dispatch_file_watch_events(FileWatchDirectoryList *fwctx, Arena arena) extern i32 main(void) { - Arena program_memory = os_alloc_arena(MB(16)); + os_common_init(); + + Arena program_memory = os_alloc_arena(MB(16) + KB(4)); BeamformerCtx *ctx = 0; BeamformerInput *input = 0; + os_linux_context.arena = sub_arena(&program_memory, KB(4), KB(4)); os_linux_context.inotify_handle = inotify_init1(IN_NONBLOCK|IN_CLOEXEC); setup_beamformer(&program_memory, &ctx, &input); diff --git a/main_w32.c b/main_w32.c @@ -100,14 +100,16 @@ clear_io_queue(BeamformerInput *input, Arena arena) extern i32 main(void) { - Arena program_memory = os_alloc_arena(MB(16)); + os_common_init(); + + Arena program_memory = os_alloc_arena(MB(16) + KB(4)); BeamformerCtx *ctx = 0; BeamformerInput *input = 0; + os_w32_context.arena = sub_arena(&program_memory, KB(4), KB(4)); os_w32_context.error_handle = GetStdHandle(STD_ERROR_HANDLE); os_w32_context.io_completion_handle = CreateIoCompletionPort(INVALID_FILE, 0, 0, 0); - os_w32_context.timer_frequency = os_get_timer_frequency(); setup_beamformer(&program_memory, &ctx, &input); diff --git a/math.c b/math.c @@ -128,6 +128,21 @@ u128_equal(u128 a, u128 b) return result; } +function RangeU64 +subrange_n_from_n_m_count(u64 n, u64 n_count, u64 m) +{ + assert(n < n_count); + + u64 per_lane = m / n_count; + u64 leftover = m - per_lane * n_count; + u64 leftovers_before_n = MIN(leftover, n); + u64 base_index = n * per_lane + leftovers_before_n; + u64 one_past_last_index = base_index + per_lane + ((n < leftover) ? 1 : 0); + + RangeU64 result = {base_index, one_past_last_index}; + return result; +} + function b32 iv2_equal(iv2 a, iv2 b) { diff --git a/os_linux.c b/os_linux.c @@ -19,10 +19,14 @@ #include <sys/mman.h> #include <sys/stat.h> #include <sys/syscall.h> +#include <sys/sysinfo.h> #include <unistd.h> typedef struct { + Arena arena; + i32 arena_lock; i32 inotify_handle; + OS_SystemInfo system_info; } OS_LinuxContext; global OS_LinuxContext os_linux_context; @@ -33,6 +37,7 @@ global OS_LinuxContext os_linux_context; i32 ftruncate(i32, i64); i64 syscall(i64, ...); i32 clock_gettime(i32, struct timespec *); +int getpagesize(void); #ifdef _DEBUG function void * @@ -99,10 +104,17 @@ os_get_timer_counter(void) return result; } +function void +os_common_init(void) +{ + os_linux_context.system_info.logical_processor_count = (u32)get_nprocs(); + os_linux_context.system_info.page_size = (u32)getpagesize(); +} + function iz os_round_up_to_page_size(iz value) { - iz result = round_up_to(value, sysconf(_SC_PAGESIZE)); + iz result = round_up_to(value, os_linux_context.system_info.page_size); return result; } @@ -260,16 +272,6 @@ function OS_ADD_FILE_WATCH_FN(os_add_file_watch) fw->hash = u64_hash_from_s8(s8_cut_head(path, dir->name.len + 1)); } -i32 pthread_setname_np(pthread_t, char *); -function iptr -os_create_thread(Arena arena, iptr user_context, s8 name, os_thread_entry_point_fn *fn) -{ - pthread_t result; - pthread_create(&result, 0, (void *)fn, (void *)user_context); - pthread_setname_np(result, (char *)name.data); - return (iptr)result; -} - function OS_WAIT_ON_VALUE_FN(os_wait_on_value) { struct timespec *timeout = 0, timeout_value; @@ -289,23 +291,81 @@ function OS_WAKE_WAITERS_FN(os_wake_waiters) } } -function OS_SHARED_MEMORY_LOCK_REGION_FN(os_shared_memory_region_lock) +function b32 +os_take_lock(i32 *lock, i32 timeout_ms) { b32 result = 0; for (;;) { i32 current = 0; - if (atomic_cas_u32(locks + lock_index, &current, 1)) + if (atomic_cas_u32(lock, &current, 1)) result = 1; - if (result || !timeout_ms || !os_wait_on_value(locks + lock_index, current, timeout_ms)) + if (result || !timeout_ms || !os_wait_on_value(lock, current, (u32)timeout_ms)) break; } return result; } -function OS_SHARED_MEMORY_UNLOCK_REGION_FN(os_shared_memory_region_unlock) +function void +os_release_lock(i32 *lock) { - i32 *lock = locks + lock_index; assert(atomic_load_u32(lock)); atomic_store_u32(lock, 0); os_wake_waiters(lock); } + +function OS_SHARED_MEMORY_LOCK_REGION_FN(os_shared_memory_region_lock) +{ + b32 result = os_take_lock(locks + lock_index, (i32)timeout_ms); + return result; +} + +function OS_SHARED_MEMORY_UNLOCK_REGION_FN(os_shared_memory_region_unlock) +{ + os_release_lock(locks + lock_index); +} + +function OS_SystemInfo * +os_get_system_info(void) +{ + return &os_linux_context.system_info; +} + +function Barrier +os_barrier_alloc(u32 count) +{ + Barrier result = {0}; + DeferLoop(os_take_lock(&os_linux_context.arena_lock, -1), + os_release_lock(&os_linux_context.arena_lock)) + { + pthread_barrier_t *barrier = push_struct(&os_linux_context.arena, pthread_barrier_t); + pthread_barrier_init(barrier, 0, count); + result.value[0] = (u64)barrier; + } + return result; +} + +function void +os_barrier_wait(Barrier barrier) +{ + pthread_barrier_t *b = (pthread_barrier_t *)barrier.value[0]; + if (b) pthread_barrier_wait(b); +} + +function iptr +os_create_thread(iptr user_context, os_thread_entry_point_fn *fn) +{ + pthread_t result; + pthread_create(&result, 0, (void *)fn, (void *)user_context); + return (iptr)result; +} + +i32 pthread_setname_np(pthread_t, char *); +function void +os_set_thread_name(iptr thread, s8 name) +{ + char buffer[16]; + u64 length = (u64)CLAMP(name.len, 0, countof(buffer) - 1); + mem_copy(buffer, name.data, length); + buffer[length] = 0; + pthread_setname_np((pthread_t)thread, buffer); +} diff --git a/os_win32.c b/os_win32.c @@ -58,6 +58,28 @@ typedef struct { } w32_file_notify_info; typedef struct { + u32 reserved1; + u32 reserved2; + u64 Reserved3[2]; + u32 reserved4; + u32 reserved5; +} w32_synchronization_barrier; + +typedef struct { + u16 architecture; + u16 _pad1; + u32 page_size; + iz minimum_application_address; + iz maximum_application_address; + u64 active_processor_mask; + u32 number_of_processors; + u32 processor_type; + u32 allocation_granularity; + u16 processor_level; + u16 processor_revision; +} w32_system_info; + +typedef struct { uptr internal, internal_high; union { struct {u32 off, off_high;}; @@ -90,6 +112,7 @@ W32(iptr) CreateIoCompletionPort(iptr, iptr, uptr, u32); W32(iptr) CreateSemaphoreA(iptr, i32, i32, c8 *); W32(iptr) CreateThread(iptr, uz, iptr, iptr, u32, u32 *); W32(b32) DeleteFileA(c8 *); +W32(b32) EnterSynchronizationBarrier(w32_synchronization_barrier *, u32); W32(void) ExitProcess(i32); W32(b32) FreeLibrary(void *); W32(i32) GetFileAttributesA(c8 *); @@ -99,7 +122,8 @@ W32(void *) GetModuleHandleA(c8 *); W32(void *) GetProcAddress(void *, c8 *); W32(b32) GetQueuedCompletionStatus(iptr, u32 *, uptr *, w32_overlapped **, u32); W32(iptr) GetStdHandle(i32); -W32(void) GetSystemInfo(void *); +W32(void) GetSystemInfo(w32_system_info *); +W32(b32) InitializeSynchronizationBarrier(w32_synchronization_barrier *, i32, i32); W32(void *) LoadLibraryA(c8 *); W32(void *) MapViewOfFile(iptr, u32, u32, u32, u64); W32(b32) QueryPerformanceCounter(u64 *); @@ -116,9 +140,12 @@ W32(b32) WriteFile(iptr, u8 *, i32, i32 *, void *); W32(void *) VirtualAlloc(u8 *, iz, u32, u32); typedef struct { - iptr error_handle; - iptr io_completion_handle; - u64 timer_frequency; + Arena arena; + i32 arena_lock; + iptr error_handle; + iptr io_completion_handle; + u64 timer_frequency; + OS_SystemInfo system_info; } OS_W32Context; global OS_W32Context os_w32_context; @@ -173,8 +200,7 @@ os_path_separator(void) function u64 os_get_timer_frequency(void) { - u64 result; - QueryPerformanceFrequency(&result); + u64 result = os_w32_context.timer_frequency; return result; } @@ -186,24 +212,22 @@ os_get_timer_counter(void) return result; } +function void +os_common_init(void) +{ + w32_system_info info = {0}; + GetSystemInfo(&info); + + os_w32_context.system_info.page_size = info.page_size; + os_w32_context.system_info.logical_processor_count = info.number_of_processors; + + QueryPerformanceFrequency(&os_w32_context.timer_frequency); +} + function iz os_round_up_to_page_size(iz value) { - struct { - u16 architecture; - u16 _pad1; - u32 page_size; - iz minimum_application_address; - iz maximum_application_address; - u64 active_processor_mask; - u32 number_of_processors; - u32 processor_type; - u32 allocation_granularity; - u16 processor_level; - u16 processor_revision; - } info; - GetSystemInfo(&info); - iz result = round_up_to(value, info.page_size); + iz result = round_up_to(value, os_w32_context.system_info.page_size); return result; } @@ -386,14 +410,6 @@ function OS_ADD_FILE_WATCH_FN(os_add_file_watch) fw->hash = u64_hash_from_s8(s8_cut_head(path, dir->name.len + 1)); } -function iptr -os_create_thread(Arena arena, iptr user_context, s8 name, os_thread_entry_point_fn *fn) -{ - iptr result = CreateThread(0, 0, (iptr)fn, user_context, 0, 0); - SetThreadDescription(result, s8_to_s16(&arena, name).data); - return result; -} - function OS_WAIT_ON_VALUE_FN(os_wait_on_value) { return WaitOnAddress(value, &current, sizeof(*value), timeout_ms); @@ -407,6 +423,28 @@ function OS_WAKE_WAITERS_FN(os_wake_waiters) } } +function b32 +os_take_lock(i32 *lock, i32 timeout_ms) +{ + b32 result = 0; + for (;;) { + i32 current = 0; + if (atomic_cas_u32(lock, &current, 1)) + result = 1; + if (result || !timeout_ms || !os_wait_on_value(lock, current, (u32)timeout_ms)) + break; + } + return result; +} + +function void +os_release_lock(i32 *lock) +{ + assert(atomic_load_u32(lock)); + atomic_store_u32(lock, 0); + os_wake_waiters(lock); +} + function OS_SHARED_MEMORY_LOCK_REGION_FN(os_shared_memory_region_lock) { w32_shared_memory_context *ctx = (typeof(ctx))sm->os_context; @@ -418,7 +456,51 @@ function OS_SHARED_MEMORY_LOCK_REGION_FN(os_shared_memory_region_lock) function OS_SHARED_MEMORY_UNLOCK_REGION_FN(os_shared_memory_region_unlock) { w32_shared_memory_context *ctx = (typeof(ctx))sm->os_context; - assert(atomic_load_u32(locks + lock_index)); - os_wake_waiters(locks + lock_index); + os_release_lock(locks + lock_index); ReleaseSemaphore(ctx->semaphores[lock_index], 1, 0); } + +function OS_SystemInfo * +os_get_system_info(void) +{ + return &os_w32_context.system_info; +} + +function Barrier +os_barrier_alloc(u32 count) +{ + Barrier result = {0}; + DeferLoop(os_take_lock(&os_w32_context.arena_lock, -1), + os_release_lock(&os_w32_context.arena_lock)) + { + w32_synchronization_barrier *barrier = push_struct(&os_w32_context.arena, w32_synchronization_barrier); + InitializeSynchronizationBarrier(barrier, (i32)count, -1); + result.value[0] = (u64)barrier; + } + return result; +} + +function void +os_barrier_wait(Barrier barrier) +{ + w32_synchronization_barrier *b = (w32_synchronization_barrier *)barrier.value[0]; + if (b) EnterSynchronizationBarrier(b, 0); +} + +function iptr +os_create_thread(iptr user_context, os_thread_entry_point_fn *fn) +{ + iptr result = CreateThread(0, 0, (iptr)fn, user_context, 0, 0); + return result; +} + +function void +os_set_thread_name(iptr thread, s8 name) +{ + DeferLoop(os_take_lock(&os_w32_context.arena_lock, -1), + os_release_lock(&os_w32_context.arena_lock)) + { + Arena arena = os_w32_context.arena; + SetThreadDescription(thread, s8_to_s16(&arena, name).data); + } +} diff --git a/static.c b/static.c @@ -278,6 +278,7 @@ worker_thread_sleep(GLWorkerThreadContext *ctx, BeamformerSharedMemory *sm) break; } + /* TODO(rnp): clean this crap up; we shouldn't need two values to communicate this */ atomic_store_u32(&ctx->asleep, 1); os_wait_on_value(&ctx->sync_variable, 1, (u32)-1); atomic_store_u32(&ctx->asleep, 0); @@ -306,6 +307,32 @@ function OS_THREAD_ENTRY_POINT_FN(compute_worker_thread_entry_point) return 0; } +function OS_THREAD_ENTRY_POINT_FN(beamformer_upload_entry_point) +{ + GLWorkerThreadContext *gl_thread_context = 0; + BeamformerUploadThreadContext *up = 0; + { + ThreadContext *ctx = (ThreadContext *)_ctx; + lane_context(ctx); + + if (lane_index() == 0) { + gl_thread_context = (GLWorkerThreadContext *)ctx->lane_context.broadcast_memory[0]; + up = (BeamformerUploadThreadContext *)gl_thread_context->user_context; + } + } + + for (;;) { + if (lane_index() == 0) + worker_thread_sleep(gl_thread_context, up->shared_memory->region); + + lane_sync(); + + beamformer_rf_upload(up); + } + + unreachable(); +} + function OS_THREAD_ENTRY_POINT_FN(upload_worker_thread_entry_point) { GLWorkerThreadContext *ctx = (GLWorkerThreadContext *)_ctx; @@ -317,12 +344,41 @@ function OS_THREAD_ENTRY_POINT_FN(upload_worker_thread_entry_point) /* NOTE(rnp): start this here so we don't have to worry about it being started or not */ glQueryCounter(up->rf_buffer->data_timestamp_query, GL_TIMESTAMP); - for (;;) { - worker_thread_sleep(ctx, up->shared_memory->region); - asan_poison_region(ctx->arena.beg, ctx->arena.end - ctx->arena.beg); - beamformer_rf_upload(up, ctx->arena); + u64 lane_broadcast_value = 0; + + ThreadContext *threads; + { + u32 main_threads = 3 - 1; + u32 async_threads_count = os_get_system_info()->logical_processor_count; + u32 main_threads_clamped = MIN(async_threads_count, main_threads); + async_threads_count -= main_threads_clamped; + + /* NOTE(rnp): always memory bound right now so more threads don't help anything */ + async_threads_count = 1; + + //Barrier barrier = os_barrier_alloc(async_threads_count); + Barrier barrier = {0}; + threads = push_array(&ctx->arena, ThreadContext, (iz)async_threads_count); + + for (u64 index = 0; index < async_threads_count; index++) { + Stream name = stream_from_buffer(threads[index].name, countof(threads[index].name)); + stream_append_s8(&name, s8("[upload_")); + stream_append_u64(&name, index); + stream_append_s8(&name, s8("]")); + threads[index].lane_context.index = index; + threads[index].lane_context.count = async_threads_count; + threads[index].lane_context.barrier = barrier; + threads[index].lane_context.broadcast_memory = &lane_broadcast_value; + if (index != 0) { + iptr thread = os_create_thread((iptr)(threads + index), beamformer_upload_entry_point); + os_set_thread_name(thread, stream_to_s8(&name)); + } + } } + threads[0].lane_context.broadcast_memory[0] = (u64)ctx; + beamformer_upload_entry_point((iptr)threads); + unreachable(); return 0; @@ -331,8 +387,8 @@ function OS_THREAD_ENTRY_POINT_FN(upload_worker_thread_entry_point) function void setup_beamformer(Arena *memory, BeamformerCtx **o_ctx, BeamformerInput **o_input) { - Arena compute_arena = sub_arena(memory, MB(2), KB(4)); - Arena upload_arena = sub_arena(memory, KB(64), KB(4)); + Arena compute_arena = sub_arena(memory, MB(2), KB(4)); + Arena upload_arena = sub_arena(memory, KB(4), KB(4)); Stream error = stream_alloc(memory, MB(1)); Arena ui_arena = sub_arena(memory, MB(2), KB(4)); @@ -393,8 +449,8 @@ setup_beamformer(Arena *memory, BeamformerCtx **o_ctx, BeamformerInput **o_input /* TODO(rnp): we should lock this down after we have something working */ worker->user_context = (iptr)ctx; worker->window_handle = glfwCreateWindow(1, 1, "", 0, raylib_window_handle); - worker->handle = os_create_thread(*memory, (iptr)worker, s8("[compute]"), - compute_worker_thread_entry_point); + worker->handle = os_create_thread((iptr)worker, compute_worker_thread_entry_point); + os_set_thread_name(worker->handle, s8("[compute]")); GLWorkerThreadContext *upload = &ctx->upload_worker; BeamformerUploadThreadContext *upctx = push_struct(memory, typeof(*upctx)); @@ -404,8 +460,8 @@ setup_beamformer(Arena *memory, BeamformerCtx **o_ctx, BeamformerInput **o_input upctx->compute_timing_table = ctx->compute_timing_table; upctx->compute_worker_sync = &ctx->compute_worker.sync_variable; upload->window_handle = glfwCreateWindow(1, 1, "", 0, raylib_window_handle); - upload->handle = os_create_thread(*memory, (iptr)upload, s8("[upload]"), - upload_worker_thread_entry_point); + upload->handle = os_create_thread((iptr)upload, upload_worker_thread_entry_point); + os_set_thread_name(worker->handle, s8("[upload_0]")); glfwMakeContextCurrent(raylib_window_handle); diff --git a/tests/throughput.c b/tests/throughput.c @@ -74,8 +74,6 @@ die_(char *function_name, char *format, ...) #include <sys/stat.h> #include <unistd.h> -function void os_init_timer(void) { } - function f64 os_get_time(void) { @@ -111,12 +109,6 @@ os_read_file_simp(char *fname) #elif OS_WINDOWS -function void -os_init_timer(void) -{ - os_w32_context.timer_frequency = os_get_timer_frequency(); -} - function f64 os_get_time(void) { @@ -470,7 +462,7 @@ main(i32 argc, char *argv[]) if (!BETWEEN(options.remaining_count, 1, 2)) usage(argv[0]); - os_init_timer(); + os_common_init(); signal(SIGINT, sigint); diff --git a/threads.c b/threads.c @@ -0,0 +1,32 @@ +/* See LICENSE for license details. */ +thread_static ThreadContext *thread_context_local = 0; + +#define lane_context(ctx) thread_context_select((ctx)) +#define lane_index() (thread_context_local->lane_context.index) +#define lane_count() (thread_context_local->lane_context.count) +#define lane_sync() thread_context_barrier_wait(0, 0, 0) +#define lane_sync_u64(ptr, src_lane) thread_context_barrier_wait((ptr), sizeof(*(ptr)), (src_lane)) +#define lane_range(count) subrange_n_from_n_m_count(lane_index(), lane_count(), (count)) + +function void +thread_context_select(ThreadContext *tctx) +{ + thread_context_local = tctx; +} + +function void +thread_context_barrier_wait(void *broadcast, u64 broadcast_size, u64 broadcast_lane_index) +{ + ThreadContext *ctx = thread_context_local; + u64 broadcast_size_clamped = MIN(broadcast_size, sizeof(ctx->lane_context.broadcast_memory[0])); + if (broadcast && lane_index() == broadcast_lane_index) + mem_copy(ctx->lane_context.broadcast_memory, broadcast, broadcast_size_clamped); + + os_barrier_wait(ctx->lane_context.barrier); + + if (broadcast && lane_index() != broadcast_lane_index) + mem_copy(broadcast, ctx->lane_context.broadcast_memory, broadcast_size_clamped); + + if (broadcast) + os_barrier_wait(ctx->lane_context.barrier); +} diff --git a/util.c b/util.c @@ -203,10 +203,16 @@ utf16_encode(u16 *out, u32 cp) } function Stream +stream_from_buffer(u8 *buffer, u32 capacity) +{ + Stream result = {.data = buffer, .cap = (i32)capacity}; + return result; +} + +function Stream stream_alloc(Arena *a, i32 cap) { - Stream result = {.cap = cap}; - result.data = arena_commit(a, cap); + Stream result = stream_from_buffer(arena_commit(a, cap), (u32)cap); return result; } diff --git a/util.h b/util.h @@ -65,6 +65,14 @@ #define global static #define local_persist static +#if COMPILER_MSVC + #define thread_static __declspec(thread) +#elif COMPILER_CLANG || COMPILER_GCC + #define thread_static __thread +#else + #error thread_static not defined for this compiler +#endif + #define alignof _Alignof #define static_assert _Static_assert @@ -92,6 +100,9 @@ #define f32_cmp(x, y) (ABS((x) - (y)) <= F32_EPSILON * MAX(1.0f, MAX(ABS(x), ABS(y)))) +#define DeferLoop(begin, end) for (i32 _i_ = ((begin), 0); !_i_; _i_ += 1, (end)) +#define DeferLoopTag(begin, end, tag) for (i32 __##tag = ((begin), 0); !__##tag ; __##tag += 1, (end)) + #define EachBit(a, it) (u64 it = ctz_u64(a); it != 64; a &= ~(1u << (it)), it = ctz_u64(a)) #define EachElement(array, it) (u64 it = 0; it < countof(array); it += 1) #define EachEnumValue(type, it) (type it = (type)0; it < type##_Count; it = (type)(it + 1)) @@ -190,6 +201,8 @@ typedef struct { f32 x, y; } Vector2; typedef struct { f32 x, y, w, h; } Rectangle; #endif +typedef struct { u64 start, stop; } RangeU64; + typedef union { struct { i32 x, y; }; struct { i32 w, h; }; @@ -314,6 +327,22 @@ typedef struct { iptr os_context; } SharedMemoryRegion; +typedef struct { u64 value[1]; } Barrier; + +typedef struct { + u64 index; + u64 count; + Barrier barrier; + u64 *broadcast_memory; +} LaneContext; + +typedef struct { + u8 name[16]; + u64 name_length; + + LaneContext lane_context; +} ThreadContext; + #define OS_ALLOC_ARENA_FN(name) Arena name(iz capacity) typedef OS_ALLOC_ARENA_FN(os_alloc_arena_fn); @@ -362,6 +391,10 @@ typedef alignas(16) u8 RenderDocAPI[216]; #define RENDERDOC_START_FRAME_CAPTURE(a) (renderdoc_start_frame_capture_fn *)RENDERDOC_API_FN_ADDR(a, 152) #define RENDERDOC_END_FRAME_CAPTURE(a) (renderdoc_end_frame_capture_fn *) RENDERDOC_API_FN_ADDR(a, 168) +typedef struct { + u32 logical_processor_count; + u32 page_size; +} OS_SystemInfo; #define LABEL_GL_OBJECT(type, id, s) {s8 _s = (s); glObjectLabel(type, id, (i32)_s.len, (c8 *)_s.data);}