Commit: 5373fda164328953e538ada42fe564c122a63cc7
Parent: 19c2860c80d7773395dab3a00ed3edeaf1bdf375
Author: tkhenry-uofa
Date: Fri, 21 Feb 2025 11:53:18 -0700
lib: add timeouts and retries for pipe errors
Co-Authored-By: Randy Palamar <randy@rnpnr.xyz>
Diffstat:
2 files changed, 163 insertions(+), 90 deletions(-)
diff --git a/helpers/ogl_beamformer_lib.c b/helpers/ogl_beamformer_lib.c
@@ -22,10 +22,17 @@ static Pipe g_pipe = {.file = INVALID_FILE};
#define ARRAY_COUNT(a) (sizeof(a) / sizeof(*a))
+#define MS_TO_S (1000ULL)
+#define NS_TO_S (1000ULL * 1000ULL)
+
+#define PIPE_RETRY_PERIOD_MS (100ULL)
+
#if defined(__unix__)
#include <fcntl.h>
+#include <poll.h>
#include <sys/mman.h>
#include <sys/stat.h>
+#include <time.h>
#include <unistd.h>
#define OS_EXPORT_PIPE_NAME "/tmp/beamformer_output_pipe"
@@ -41,21 +48,51 @@ static Pipe g_pipe = {.file = INVALID_FILE};
#define PIPE_TYPE_BYTE 0x00
#define PIPE_ACCESS_INBOUND 0x01
+#define PIPE_WAIT 0x00
+#define PIPE_NOWAIT 0x01
+
+#define ERROR_NO_DATA 232L
+#define ERROR_PIPE_NOT_CONNECTED 233L
+#define ERROR_PIPE_LISTENING 536L
+
#define W32(r) __declspec(dllimport) r __stdcall
W32(b32) CloseHandle(iptr);
W32(iptr) CreateFileA(c8 *, u32, u32, void *, u32, u32, void *);
W32(iptr) CreateNamedPipeA(c8 *, u32, u32, u32, u32, u32, u32, void *);
+W32(b32) DisconnectNamedPipe(iptr);
+W32(i32) GetLastError(void);
W32(iptr) MapViewOfFile(iptr, u32, u32, u32, u64);
W32(iptr) OpenFileMappingA(u32, b32, c8 *);
W32(b32) ReadFile(iptr, u8 *, i32, i32 *, void *);
+W32(void) Sleep(u32);
+W32(void) UnmapViewOfFile(iptr);
W32(b32) WriteFile(iptr, u8 *, i32, i32 *, void *);
#else
#error Unsupported Platform
#endif
+#if defined(MATLAB_CONSOLE)
+#define mexErrMsgIdAndTxt mexErrMsgIdAndTxt_800
+#define mexWarnMsgIdAndTxt mexWarnMsgIdAndTxt_800
+void mexErrMsgIdAndTxt(const c8*, c8*, ...);
+void mexWarnMsgIdAndTxt(const c8*, c8*, ...);
+#define error_tag "ogl_beamformer_lib:error"
+#define error_msg(...) mexErrMsgIdAndTxt(error_tag, __VA_ARGS__)
+#define warning_msg(...) mexWarnMsgIdAndTxt(error_tag, __VA_ARGS__)
+#else
+#define error_msg(...)
+#define warning_msg(...)
+#endif
+
#if defined(__unix__)
static Pipe
+os_open_named_pipe(char *name)
+{
+ return (Pipe){.file = open(name, O_WRONLY), .name = name};
+}
+
+static Pipe
os_open_read_pipe(char *name)
{
mkfifo(name, 0660);
@@ -63,39 +100,41 @@ os_open_read_pipe(char *name)
}
static void
-os_close_read_pipe(Pipe p)
+os_disconnect_pipe(Pipe p)
{
- close(p.file);
- unlink(p.name);
}
-static b32
-os_read_pipe(Pipe p, void *buf, size read_size)
+static void
+os_close_pipe(Pipe* p)
{
- size r = 0, total_read = 0;
- do {
- if (r != -1)
- total_read += r;
- r = read(p.file, buf + total_read, read_size - total_read);
- } while (r);
- return total_read == read_size;
+ close(p->file);
+ unlink(p->name);
+ p->file = INVALID_FILE;
}
-static Pipe
-os_open_named_pipe(char *name)
+static b32
+os_wait_read_pipe(Pipe p, void *buf, size read_size, u32 timeout_ms)
{
- return (Pipe){.file = open(name, O_WRONLY), .name = name};
+ struct pollfd pfd = {.fd = p.file, .events = POLLIN};
+ size total_read = 0;
+ if (poll(&pfd, 1, timeout_ms) > 0) {
+ size r;
+ do {
+ r = read(p.file, (u8 *)buf + total_read, read_size - total_read);
+ if (r > 0) total_read += r;
+ } while (r != 0);
+ }
+ return total_read == read_size;
}
static size
-os_write_to_pipe(Pipe p, void *data, size len)
+os_write(iptr f, void *data, size data_size)
{
size written = 0, w = 0;
do {
- if (w != -1)
- written += w;
- w = write(p.file, data + written, len - written);
- } while(written != len && w != 0);
+ w = write(f, (u8 *)data + written, data_size - written);
+ if (w != -1) written += w;
+ } while (written != data_size && w != 0);
return written;
}
@@ -116,43 +155,79 @@ os_open_shared_memory_area(char *name)
return new;
}
+static void
+os_release_shared_memory(iptr memory, u64 size)
+{
+ munmap((void *)memory, size);
+}
+
#elif defined(_WIN32)
static Pipe
+os_open_named_pipe(char *name)
+{
+ iptr pipe = CreateFileA(name, GENERIC_WRITE, 0, 0, OPEN_EXISTING, 0, 0);
+ return (Pipe){.file = pipe, .name = name};
+}
+
+static Pipe
os_open_read_pipe(char *name)
{
- iptr file = CreateNamedPipeA(name, PIPE_ACCESS_INBOUND, PIPE_TYPE_BYTE, 1,
+ iptr file = CreateNamedPipeA(name, PIPE_ACCESS_INBOUND, PIPE_TYPE_BYTE|PIPE_NOWAIT, 1,
0, 1024UL * 1024UL, 0, 0);
return (Pipe){.file = file, .name = name};
}
static void
-os_close_read_pipe(Pipe p)
+os_disconnect_pipe(Pipe p)
{
- CloseHandle(p.file);
+ DisconnectNamedPipe(p.file);
}
-static b32
-os_read_pipe(Pipe p, void *buf, size read_size)
+static void
+os_close_pipe(Pipe *p)
{
- i32 total_read = 0;
- ReadFile(p.file, buf, read_size, &total_read, 0);
- return total_read == read_size;
+ CloseHandle(p->file);
+ p->file = INVALID_FILE;
}
-static Pipe
-os_open_named_pipe(char *name)
+static b32
+os_wait_read_pipe(Pipe p, void *buf, size read_size, u32 timeout_ms)
{
- iptr pipe = CreateFileA(name, GENERIC_WRITE, 0, 0, OPEN_EXISTING, 0, 0);
- return (Pipe){.file = pipe, .name = name};
+ size elapsed_ms = 0, total_read = 0;
+ while (elapsed_ms <= timeout_ms && read_size != total_read) {
+ u8 data;
+ i32 read;
+ b32 result = ReadFile(p.file, &data, 0, &read, 0);
+ if (!result) {
+ i32 error = GetLastError();
+ if (error != ERROR_NO_DATA &&
+ error != ERROR_PIPE_LISTENING &&
+ error != ERROR_PIPE_NOT_CONNECTED)
+ {
+ /* NOTE: pipe is in a bad state; we will never read anything */
+ break;
+ }
+ Sleep(PIPE_RETRY_PERIOD_MS);
+ elapsed_ms += PIPE_RETRY_PERIOD_MS;
+ } else {
+ ReadFile(p.file, (u8 *)buf + total_read, read_size - total_read, &read, 0);
+ total_read += read;
+ }
+ }
+ return total_read == read_size;
}
static size
-os_write_to_pipe(Pipe p, void *data, size len)
+os_write(iptr f, void *data, size data_size)
{
- i32 bytes_written;
- WriteFile(p.file, data, len, &bytes_written, 0);
- return bytes_written;
+ i32 written = 0;
+ b32 result = WriteFile(f, (u8 *)data + total_written, data_size - total_written, &written, 0);
+ if (!result) {
+ i32 error = GetLastError();
+ warning_msg("os_write(data_size = %td): error: %d", data_size, error);
+ }
+ return written;
}
static BeamformerParametersFull *
@@ -169,19 +244,13 @@ os_open_shared_memory_area(char *name)
return new;
}
-#endif
-#if defined(MATLAB_CONSOLE)
-#define mexErrMsgIdAndTxt mexErrMsgIdAndTxt_800
-#define mexWarnMsgIdAndTxt mexWarnMsgIdAndTxt_800
-void mexErrMsgIdAndTxt(const c8 *, c8 *, ...);
-void mexWarnMsgIdAndTxt(const c8 *, c8 *, ...);
-#define error_tag "ogl_beamformer_lib:error"
-#define error_msg(...) mexErrMsgIdAndTxt(error_tag, __VA_ARGS__)
-#define warning_msg(...) mexWarnMsgIdAndTxt(error_tag, __VA_ARGS__)
-#else
-#define error_msg(...)
-#define warning_msg(...)
+static void
+os_release_shared_memory(iptr memory, u64 size)
+{
+ UnmapViewOfFile(memory);
+}
+
#endif
static b32
@@ -232,26 +301,40 @@ set_beamformer_pipeline(char *shm_name, i32 *stages, i32 stages_count)
b32
send_data(char *pipe_name, char *shm_name, i16 *data, uv2 data_dim)
{
+ b32 result = 0;
+
if (g_pipe.file == INVALID_FILE) {
g_pipe = os_open_named_pipe(pipe_name);
- if (g_pipe.file == INVALID_FILE) {
+ result = g_pipe.file != INVALID_FILE;
+ if (!result)
error_msg("failed to open pipe");
- return 0;
+ }
+ result &= !check_shared_memory(shm_name);
+
+ if (result) {
+ g_bp->raw.rf_raw_dim = data_dim;
+ g_bp->upload = 1;
+
+ size data_size = data_dim.x * data_dim.y * sizeof(i16);
+ size written = os_write(g_pipe.file, data, data_size);
+ result = written == data_size;
+ if (!result) {
+ warning_msg("failed to write data to pipe: retrying...");
+ os_close_pipe(&g_pipe);
+ os_release_shared_memory((iptr)g_bp, sizeof(*g_bp));
+ g_bp = 0;
+ g_pipe = os_open_named_pipe(pipe_name);
+ result = g_pipe.file != INVALID_FILE && check_shared_memory(shm_name);
+ if (result)
+ written = os_write(g_pipe.file, data, data_size);
+ result = written == data_size;
+ if (!result)
+ warning_msg("failed again, wrote %ld/%ld\ngiving up",
+ written, data_size);
}
}
- if (!check_shared_memory(shm_name))
- return 0;
-
- /* TODO: this probably needs a mutex around it if we want to change it here */
- g_bp->raw.rf_raw_dim = data_dim;
- size data_size = data_dim.x * data_dim.y * sizeof(i16);
- size written = os_write_to_pipe(g_pipe, data, data_size);
- if (written != data_size)
- warning_msg("failed to write full data to pipe: wrote: %ld", written);
- g_bp->upload = 1;
-
- return 1;
+ return result;
}
b32
@@ -270,7 +353,7 @@ set_beamformer_parameters(char *shm_name, BeamformerParameters *new_bp)
void
beamform_data_synchronized(char *pipe_name, char *shm_name, i16 *data, uv2 data_dim,
- uv4 output_points, f32 *out_data)
+ uv4 output_points, f32 *out_data, i32 timeout_ms)
{
if (!check_shared_memory(shm_name))
return;
@@ -280,20 +363,6 @@ beamform_data_synchronized(char *pipe_name, char *shm_name, i16 *data, uv2 data_
if (output_points.z == 0) output_points.z = 1;
output_points.w = 1;
- Pipe pipe = os_open_read_pipe(OS_EXPORT_PIPE_NAME);
- if (pipe.file == INVALID_FILE) {
- error_msg("failed to open export pipe");
- return;
- }
-
- if (g_pipe.file == INVALID_FILE) {
- g_pipe = os_open_named_pipe(pipe_name);
- if (g_pipe.file == INVALID_FILE) {
- error_msg("failed to open data pipe");
- return;
- }
- }
-
g_bp->raw.rf_raw_dim = data_dim;
g_bp->raw.output_points.x = output_points.x;
g_bp->raw.output_points.y = output_points.y;
@@ -306,23 +375,26 @@ beamform_data_synchronized(char *pipe_name, char *shm_name, i16 *data, uv2 data_
return;
}
+ Pipe export_pipe = os_open_read_pipe(OS_EXPORT_PIPE_NAME);
+ if (export_pipe.file == INVALID_FILE) {
+ error_msg("failed to open export pipe");
+ return;
+ }
+
for (u32 i = 0; i < export_name.len; i++)
g_bp->export_pipe_name[i] = export_name.data[i];
g_bp->upload = 1;
- size data_size = data_dim.x * data_dim.y * sizeof(i16);
- size written = os_write_to_pipe(g_pipe, data, data_size);
- if (written != data_size) {
- /* error */
- error_msg("failed to write full data to pipe: wrote: %ld", written);
- return;
+ b32 result = send_data(pipe_name, shm_name, data, data_dim);
+ if (result) {
+ size output_size = output_points.x * output_points.y * output_points.z * sizeof(f32) * 2;
+ result = os_wait_read_pipe(export_pipe, out_data, output_size, timeout_ms);
+ if (!result)
+ warning_msg("failed to read full export data from pipe");
}
- size output_size = output_points.x * output_points.y * output_points.z * 2 * sizeof(f32);
- b32 success = os_read_pipe(pipe, out_data, output_size);
- os_close_read_pipe(pipe);
-
- if (!success)
- warning_msg("failed to read full export data from pipe\n");
+ os_disconnect_pipe(export_pipe);
+ os_close_pipe(&export_pipe);
+ os_close_pipe(&g_pipe);
}
diff --git a/helpers/ogl_beamformer_lib.h b/helpers/ogl_beamformer_lib.h
@@ -41,4 +41,5 @@ LIB_FN b32 send_data(char *pipe_name, char *shm_name, i16 *data, uv2 data_dim);
* out_data: must be allocated by the caller as 2 f32s per output point. */
LIB_FN void beamform_data_synchronized(char *pipe_name, char *shm_name,
i16 *data, uv2 data_dim,
- uv4 output_points, f32 *out_data);
+ uv4 output_points, f32 *out_data,
+ i32 timeout_ms);