ogl_beamformer_lib.c (13667B)
1 /* See LICENSE for license details. */ 2 #include "../util.h" 3 #include "../beamformer_parameters.h" 4 #include "../beamformer_work_queue.c" 5 6 #define PIPE_RETRY_PERIOD_MS (100ULL) 7 8 static BeamformerSharedMemory *g_bp; 9 10 #if defined(__linux__) 11 #include <fcntl.h> 12 #include <linux/futex.h> 13 #include <poll.h> 14 #include <sys/mman.h> 15 #include <sys/stat.h> 16 #include <sys/syscall.h> 17 #include <time.h> 18 #include <unistd.h> 19 20 i64 syscall(i64, ...); 21 22 #define OS_EXPORT_PIPE_NAME "/tmp/beamformer_output_pipe" 23 24 #elif defined(_WIN32) 25 26 #define OS_EXPORT_PIPE_NAME "\\\\.\\pipe\\beamformer_output_fifo" 27 28 #define OPEN_EXISTING 3 29 #define GENERIC_WRITE 0x40000000 30 #define FILE_MAP_ALL_ACCESS 0x000F001F 31 32 #define PIPE_TYPE_BYTE 0x00 33 #define PIPE_ACCESS_INBOUND 0x01 34 35 #define PIPE_WAIT 0x00 36 #define PIPE_NOWAIT 0x01 37 38 #define ERROR_NO_DATA 232L 39 #define ERROR_PIPE_NOT_CONNECTED 233L 40 #define ERROR_PIPE_LISTENING 536L 41 42 #define W32(r) __declspec(dllimport) r __stdcall 43 W32(b32) CloseHandle(iptr); 44 W32(iptr) CreateFileA(c8 *, u32, u32, void *, u32, u32, void *); 45 W32(iptr) CreateNamedPipeA(c8 *, u32, u32, u32, u32, u32, u32, void *); 46 W32(b32) DisconnectNamedPipe(iptr); 47 W32(i32) GetLastError(void); 48 W32(iptr) MapViewOfFile(iptr, u32, u32, u32, u64); 49 W32(iptr) OpenFileMappingA(u32, b32, c8 *); 50 W32(b32) ReadFile(iptr, u8 *, i32, i32 *, void *); 51 W32(void) Sleep(u32); 52 W32(void) UnmapViewOfFile(iptr); 53 W32(b32) WaitOnAddress(void *, void *, uz, i32); 54 W32(b32) WriteFile(iptr, u8 *, i32, i32 *, void *); 55 56 #else 57 #error Unsupported Platform 58 #endif 59 60 #if defined(MATLAB_CONSOLE) 61 #define mexErrMsgIdAndTxt mexErrMsgIdAndTxt_800 62 #define mexWarnMsgIdAndTxt mexWarnMsgIdAndTxt_800 63 void mexErrMsgIdAndTxt(const c8*, c8*, ...); 64 void mexWarnMsgIdAndTxt(const c8*, c8*, ...); 65 #define error_tag "ogl_beamformer_lib:error" 66 #define error_msg(...) mexErrMsgIdAndTxt(error_tag, __VA_ARGS__) 67 #define warning_msg(...) mexWarnMsgIdAndTxt(error_tag, __VA_ARGS__) 68 #else 69 #define error_msg(...) 70 #define warning_msg(...) 71 #endif 72 73 #if defined(__linux__) 74 static OS_WAIT_ON_VALUE_FN(os_wait_on_value) 75 { 76 struct timespec *timeout = 0, timeout_value; 77 if (timeout_ms != -1) { 78 timeout_value.tv_sec = timeout_ms / 1000; 79 timeout_value.tv_nsec = (timeout_ms % 1000) * 1000000; 80 timeout = &timeout_value; 81 } 82 return syscall(SYS_futex, value, FUTEX_WAIT, current, timeout, 0, 0) == 0; 83 } 84 85 static Pipe 86 os_open_read_pipe(char *name) 87 { 88 mkfifo(name, 0660); 89 return (Pipe){.file = open(name, O_RDONLY|O_NONBLOCK), .name = name}; 90 } 91 92 static void 93 os_disconnect_pipe(Pipe p) 94 { 95 } 96 97 static void 98 os_close_pipe(iptr *file, char *name) 99 { 100 if (file) close(*file); 101 if (name) unlink(name); 102 *file = INVALID_FILE; 103 } 104 105 static b32 106 os_wait_read_pipe(Pipe p, void *buf, iz read_size, u32 timeout_ms) 107 { 108 struct pollfd pfd = {.fd = p.file, .events = POLLIN}; 109 iz total_read = 0; 110 if (poll(&pfd, 1, timeout_ms) > 0) { 111 iz r; 112 do { 113 r = read(p.file, (u8 *)buf + total_read, read_size - total_read); 114 if (r > 0) total_read += r; 115 } while (r != 0); 116 } 117 return total_read == read_size; 118 } 119 120 static BeamformerSharedMemory * 121 os_open_shared_memory_area(char *name) 122 { 123 BeamformerSharedMemory *result = 0; 124 i32 fd = shm_open(name, O_RDWR, S_IRUSR|S_IWUSR); 125 if (fd > 0) { 126 void *new = mmap(0, BEAMFORMER_SHARED_MEMORY_SIZE, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0); 127 if (new != MAP_FAILED) 128 result = new; 129 close(fd); 130 } 131 return result; 132 } 133 134 #elif defined(_WIN32) 135 136 static OS_WAIT_ON_VALUE_FN(os_wait_on_value) 137 { 138 /* TODO(rnp): this doesn't work across processes on win32 (return 1 to cause a spin wait) */ 139 return 1; 140 return WaitOnAddress(value, ¤t, sizeof(*value), timeout_ms); 141 } 142 143 static Pipe 144 os_open_read_pipe(char *name) 145 { 146 iptr file = CreateNamedPipeA(name, PIPE_ACCESS_INBOUND, PIPE_TYPE_BYTE|PIPE_NOWAIT, 1, 147 0, 1024UL * 1024UL, 0, 0); 148 return (Pipe){.file = file, .name = name}; 149 } 150 151 static void 152 os_disconnect_pipe(Pipe p) 153 { 154 DisconnectNamedPipe(p.file); 155 } 156 157 static void 158 os_close_pipe(iptr *file, char *name) 159 { 160 if (file) CloseHandle(*file); 161 *file = INVALID_FILE; 162 } 163 164 static b32 165 os_wait_read_pipe(Pipe p, void *buf, iz read_size, u32 timeout_ms) 166 { 167 iz elapsed_ms = 0, total_read = 0; 168 while (elapsed_ms <= timeout_ms && read_size != total_read) { 169 u8 data; 170 i32 read; 171 b32 result = ReadFile(p.file, &data, 0, &read, 0); 172 if (!result) { 173 i32 error = GetLastError(); 174 if (error != ERROR_NO_DATA && 175 error != ERROR_PIPE_LISTENING && 176 error != ERROR_PIPE_NOT_CONNECTED) 177 { 178 /* NOTE: pipe is in a bad state; we will never read anything */ 179 break; 180 } 181 Sleep(PIPE_RETRY_PERIOD_MS); 182 elapsed_ms += PIPE_RETRY_PERIOD_MS; 183 } else { 184 ReadFile(p.file, (u8 *)buf + total_read, read_size - total_read, &read, 0); 185 total_read += read; 186 } 187 } 188 return total_read == read_size; 189 } 190 191 static BeamformerSharedMemory * 192 os_open_shared_memory_area(char *name) 193 { 194 BeamformerSharedMemory *result = 0; 195 iptr h = OpenFileMappingA(FILE_MAP_ALL_ACCESS, 0, name); 196 if (h != INVALID_FILE) { 197 iptr view = MapViewOfFile(h, FILE_MAP_ALL_ACCESS, 0, 0, BEAMFORMER_SHARED_MEMORY_SIZE); 198 result = (BeamformerSharedMemory *)view; 199 CloseHandle(h); 200 } 201 202 return result; 203 } 204 205 #endif 206 207 static b32 208 check_shared_memory(char *name) 209 { 210 if (!g_bp) { 211 g_bp = os_open_shared_memory_area(name); 212 if (!g_bp) { 213 error_msg("failed to open shared memory area"); 214 return 0; 215 } 216 } 217 return 1; 218 } 219 220 b32 221 set_beamformer_pipeline(char *shm_name, i32 *stages, i32 stages_count) 222 { 223 if (stages_count > ARRAY_COUNT(g_bp->compute_stages)) { 224 error_msg("maximum stage count is %lu", ARRAY_COUNT(g_bp->compute_stages)); 225 return 0; 226 } 227 228 if (!check_shared_memory(shm_name)) 229 return 0; 230 231 for (i32 i = 0; i < stages_count; i++) { 232 b32 valid = 0; 233 #define X(en, number, sfn, nh, pn) if (number == stages[i]) valid = 1; 234 COMPUTE_SHADERS 235 #undef X 236 237 if (!valid) { 238 error_msg("invalid shader stage: %d", stages[i]); 239 return 0; 240 } 241 242 g_bp->compute_stages[i] = stages[i]; 243 } 244 g_bp->compute_stages_count = stages_count; 245 246 return 1; 247 } 248 249 b32 250 beamformer_start_compute(char *shm_name, u32 image_plane_tag) 251 { 252 b32 result = image_plane_tag < IPT_LAST && check_shared_memory(shm_name); 253 if (result) { 254 result = !atomic_load(&g_bp->dispatch_compute_sync); 255 if (result) { 256 g_bp->current_image_plane = image_plane_tag; 257 atomic_store(&g_bp->dispatch_compute_sync, 1); 258 } 259 } 260 return result; 261 } 262 263 function b32 264 beamformer_upload_buffer(char *shm_name, void *data, u32 size, i32 store_offset, i32 sync_offset, 265 BeamformerUploadKind kind, i32 timeout_ms) 266 { 267 b32 result = check_shared_memory(shm_name); 268 if (result) { 269 BeamformWork *work = beamform_work_queue_push(&g_bp->external_work_queue); 270 result = work && try_wait_sync((i32 *)((u8 *)g_bp + sync_offset), timeout_ms, os_wait_on_value); 271 if (result) { 272 BeamformerUploadContext *uc = &work->upload_context; 273 uc->shared_memory_offset = store_offset; 274 uc->size = size; 275 uc->kind = kind; 276 work->type = BW_UPLOAD_BUFFER; 277 work->completion_barrier = sync_offset; 278 mem_copy((u8 *)g_bp + store_offset, data, size); 279 beamform_work_queue_push_commit(&g_bp->external_work_queue); 280 } 281 } 282 return result; 283 } 284 285 #define BEAMFORMER_UPLOAD_FNS \ 286 X(channel_mapping, i16, 1, CHANNEL_MAPPING) \ 287 X(sparse_elements, i16, 1, SPARSE_ELEMENTS) \ 288 X(focal_vectors, f32, 2, FOCAL_VECTORS) 289 290 #define X(name, dtype, elements, command) \ 291 b32 beamformer_push_##name (char *shm_id, dtype *data, u32 count, i32 timeout_ms) { \ 292 b32 result = count <= ARRAY_COUNT(g_bp->name); \ 293 if (result) { \ 294 result = beamformer_upload_buffer(shm_id, data, count * elements * sizeof(dtype), \ 295 offsetof(BeamformerSharedMemory, name), \ 296 offsetof(BeamformerSharedMemory, name##_sync), \ 297 BU_KIND_##command, timeout_ms); \ 298 } \ 299 return result; \ 300 } 301 BEAMFORMER_UPLOAD_FNS 302 #undef X 303 304 b32 305 beamformer_push_parameters(char *shm_name, BeamformerParameters *bp, i32 timeout_ms) 306 { 307 b32 result = beamformer_upload_buffer(shm_name, bp, sizeof(*bp), 308 offsetof(BeamformerSharedMemory, parameters), 309 offsetof(BeamformerSharedMemory, parameters_sync), 310 BU_KIND_PARAMETERS, timeout_ms); 311 return result; 312 } 313 314 b32 315 beamformer_push_data(char *shm_name, void *data, u32 data_size, i32 timeout_ms) 316 { 317 b32 result = data_size <= BEAMFORMER_MAX_RF_DATA_SIZE; 318 if (result) { 319 result = beamformer_upload_buffer(shm_name, data, data_size, BEAMFORMER_RF_DATA_OFF, 320 offsetof(BeamformerSharedMemory, raw_data_sync), 321 BU_KIND_RF_DATA, timeout_ms); 322 } 323 return result; 324 } 325 326 b32 327 beamformer_push_parameters_ui(char *shm_name, BeamformerUIParameters *bp, i32 timeout_ms) 328 { 329 b32 result = check_shared_memory(shm_name); 330 if (result) { 331 BeamformWork *work = beamform_work_queue_push(&g_bp->external_work_queue); 332 result = work && try_wait_sync(&g_bp->parameters_ui_sync, timeout_ms, os_wait_on_value); 333 if (result) { 334 BeamformerUploadContext *uc = &work->upload_context; 335 uc->shared_memory_offset = offsetof(BeamformerSharedMemory, parameters); 336 uc->size = sizeof(g_bp->parameters); 337 uc->kind = BU_KIND_PARAMETERS; 338 work->type = BW_UPLOAD_BUFFER; 339 work->completion_barrier = offsetof(BeamformerSharedMemory, parameters_ui_sync); 340 mem_copy(&g_bp->parameters_ui, bp, sizeof(*bp)); 341 beamform_work_queue_push_commit(&g_bp->external_work_queue); 342 } 343 } 344 return result; 345 } 346 347 b32 348 beamformer_push_parameters_head(char *shm_name, BeamformerParametersHead *bp, i32 timeout_ms) 349 { 350 b32 result = check_shared_memory(shm_name); 351 if (result) { 352 BeamformWork *work = beamform_work_queue_push(&g_bp->external_work_queue); 353 result = work && try_wait_sync(&g_bp->parameters_head_sync, timeout_ms, os_wait_on_value); 354 if (result) { 355 BeamformerUploadContext *uc = &work->upload_context; 356 uc->shared_memory_offset = offsetof(BeamformerSharedMemory, parameters); 357 uc->size = sizeof(g_bp->parameters); 358 uc->kind = BU_KIND_PARAMETERS; 359 work->type = BW_UPLOAD_BUFFER; 360 work->completion_barrier = offsetof(BeamformerSharedMemory, parameters_head_sync); 361 mem_copy(&g_bp->parameters_head, bp, sizeof(*bp)); 362 beamform_work_queue_push_commit(&g_bp->external_work_queue); 363 } 364 } 365 return result; 366 } 367 368 b32 369 set_beamformer_parameters(char *shm_name, BeamformerParametersV0 *new_bp) 370 { 371 b32 result = 0; 372 result |= beamformer_push_channel_mapping(shm_name, (i16 *)new_bp->channel_mapping, 373 ARRAY_COUNT(new_bp->channel_mapping), 0); 374 result |= beamformer_push_sparse_elements(shm_name, (i16 *)new_bp->uforces_channels, 375 ARRAY_COUNT(new_bp->uforces_channels), 0); 376 v2 focal_vectors[256]; 377 for (u32 i = 0; i < ARRAY_COUNT(focal_vectors); i++) 378 focal_vectors[i] = (v2){{new_bp->transmit_angles[i], new_bp->focal_depths[i]}}; 379 result |= beamformer_push_focal_vectors(shm_name, (f32 *)focal_vectors, ARRAY_COUNT(focal_vectors), 0); 380 result |= beamformer_push_parameters(shm_name, (BeamformerParameters *)&new_bp->xdc_transform, 0); 381 return result; 382 } 383 384 b32 385 send_data(char *pipe_name, char *shm_name, void *data, u32 data_size) 386 { 387 b32 result = beamformer_push_data(shm_name, data, data_size, 0); 388 if (result) { 389 if (beamformer_start_compute(shm_name, 0)) { 390 /* TODO(rnp): should we just set timeout on acquiring the lock instead of this? */ 391 try_wait_sync(&g_bp->raw_data_sync, -1, os_wait_on_value); 392 atomic_store(&g_bp->raw_data_sync, 1); 393 } else { 394 result = 0; 395 /* TODO(rnp): HACK: this is strictly meant for matlab; we need a real 396 * recovery method. for most (all?) old api uses this won't be hit */ 397 warning_msg("failed to start compute after sending data\n" 398 "library in a borked state\n" 399 "try calling beamformer_start_compute()"); 400 } 401 } 402 return result; 403 } 404 405 b32 406 beamform_data_synchronized(char *pipe_name, char *shm_name, void *data, u32 data_size, 407 uv4 output_points, f32 *out_data, i32 timeout_ms) 408 { 409 if (!check_shared_memory(shm_name)) 410 return 0; 411 412 if (output_points.x == 0) output_points.x = 1; 413 if (output_points.y == 0) output_points.y = 1; 414 if (output_points.z == 0) output_points.z = 1; 415 output_points.w = 1; 416 417 g_bp->parameters.output_points.x = output_points.x; 418 g_bp->parameters.output_points.y = output_points.y; 419 g_bp->parameters.output_points.z = output_points.z; 420 g_bp->export_next_frame = 1; 421 422 s8 export_name = s8(OS_EXPORT_PIPE_NAME); 423 if (export_name.len > ARRAY_COUNT(g_bp->export_pipe_name)) { 424 error_msg("export pipe name too long"); 425 return 0; 426 } 427 428 Pipe export_pipe = os_open_read_pipe(OS_EXPORT_PIPE_NAME); 429 if (export_pipe.file == INVALID_FILE) { 430 error_msg("failed to open export pipe"); 431 return 0; 432 } 433 434 for (u32 i = 0; i < export_name.len; i++) 435 g_bp->export_pipe_name[i] = export_name.data[i]; 436 437 b32 result = send_data(pipe_name, shm_name, data, data_size); 438 if (result) { 439 iz output_size = output_points.x * output_points.y * output_points.z * sizeof(f32) * 2; 440 result = os_wait_read_pipe(export_pipe, out_data, output_size, timeout_ms); 441 if (!result) 442 warning_msg("failed to read full export data from pipe"); 443 } 444 445 os_disconnect_pipe(export_pipe); 446 os_close_pipe(&export_pipe.file, export_pipe.name); 447 448 return result; 449 }