From 6444842fed1b12960e99f8409f68bf6cd5ce631e Mon Sep 17 00:00:00 2001 From: Tomasz Szumski Date: Fri, 10 Jan 2025 07:53:27 +0000 Subject: [PATCH 1/4] Add missing unit test file to build --- media-proxy/tests/CMakeLists.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/media-proxy/tests/CMakeLists.txt b/media-proxy/tests/CMakeLists.txt index 979927bf..2bdbdb85 100644 --- a/media-proxy/tests/CMakeLists.txt +++ b/media-proxy/tests/CMakeLists.txt @@ -35,6 +35,7 @@ file(GLOB MEDIA_PROXY_TEST_SOURCES "${CMAKE_CURRENT_SOURCE_DIR}/libfabric_mocks.cc" "${CMAKE_CURRENT_SOURCE_DIR}/libfabric_mr_tests.cc" "${CMAKE_CURRENT_SOURCE_DIR}/proxy_context_tests.cc" + "${CMAKE_CURRENT_SOURCE_DIR}/st2110_tests.cc" ) # Find source files for RDMA-specific tests From c821eaf4cc235361316ebae9ebad2ff5e1ee9e5d Mon Sep 17 00:00:00 2001 From: Tomasz Szumski Date: Thu, 12 Dec 2024 07:26:15 +0000 Subject: [PATCH 2/4] Create JSON sender and receiver applications --- sdk/samples/CMakeLists.txt | 11 +- sdk/samples/recver_app_json.c | 265 +++++++++++++++++++++++++++++ sdk/samples/sender_app_json.c | 306 ++++++++++++++++++++++++++++++++++ 3 files changed, 581 insertions(+), 1 deletion(-) create mode 100644 sdk/samples/recver_app_json.c create mode 100644 sdk/samples/sender_app_json.c diff --git a/sdk/samples/CMakeLists.txt b/sdk/samples/CMakeLists.txt index 764fadc1..054e7cb8 100644 --- a/sdk/samples/CMakeLists.txt +++ b/sdk/samples/CMakeLists.txt @@ -26,4 +26,13 @@ add_executable(pong_app pong_app.c pingpong_common.c) target_include_directories(pong_app PRIVATE ../include) target_link_libraries(pong_app PRIVATE mcm_dp) -install(TARGETS sender_app recver_app ping_app pong_app RUNTIME COMPONENT Runtime) +add_executable(sender_app_json sender_app_json.c) +target_include_directories(sender_app_json PRIVATE ../include) +target_link_libraries(sender_app_json PRIVATE mcm_dp) + +add_executable(recver_app_json recver_app_json.c) +target_include_directories(recver_app_json PRIVATE ../include) +target_link_libraries(recver_app_json PRIVATE mcm_dp) + + +install(TARGETS sender_app recver_app ping_app pong_app sender_app_json recver_app_json RUNTIME COMPONENT Runtime) diff --git a/sdk/samples/recver_app_json.c b/sdk/samples/recver_app_json.c new file mode 100644 index 00000000..25d823f2 --- /dev/null +++ b/sdk/samples/recver_app_json.c @@ -0,0 +1,265 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2024 Intel Corporation + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include +#include +#include +#include +#include +#include +#include "mesh_dp.h" + +#define RECV_LOCAL_FILE "recv.yuv" +#define RECV_JSON_FILE "recv.json" + +static volatile bool keepRunning = true; + +void intHandler(int dummy) { keepRunning = 0; } + +size_t get_file_size(FILE *f) { + if (f == NULL) { + return 0; + } + long prev = ftell(f); + if (prev < 0) { + return 0; + } + if (fseek(f, (long)0, (long)SEEK_END) != 0) { + (void)fseek(f, prev, (long)SEEK_SET); + return 0; + } else { + long size = ftell(f); + (void)fseek(f, prev, (long)SEEK_SET); + if (size < 0) { + return 0; + } + return (size_t)size; + } +} + +/* print a description of all supported options */ +void usage(FILE *fp, const char *path) { + /* take only the last portion of the path */ + const char *basename = strrchr(path, '/'); + basename = basename ? basename + 1 : path; + + fprintf(fp, "Usage: %s [OPTION]\n", basename); + fprintf(fp, "-H, --help\t\t\t" + "Print this help and exit\n"); + fprintf(fp, + "-j, --json=file_name\t" + "JSON file with receiver configuration(example: %s)\n", + RECV_JSON_FILE); + fprintf(fp, + "-o, --outputfile=file_name\t" + "Save stream to local file (example: %s)\n", + RECV_LOCAL_FILE); + fprintf(fp, "\n"); +} + +int main(int argc, char **argv) { + char output_filename[128] = ""; + FILE *outputfile = NULL; + + char json_filename[128] = ""; + FILE *jsonfile = NULL; + char *json_config = NULL; + + MeshClient *client = NULL; + MeshConnection *conn = NULL; + MeshBuffer *buf = NULL; + int err; + + int help_flag = 0; + int opt; + struct option longopts[] = {{"help", no_argument, &help_flag, 'H'}, + {"outputfile", required_argument, NULL, 'o'}, + {"json", required_argument, NULL, 'j'}, + {0}}; + + /* infinite loop, to be broken when we are done parsing options */ + while (1) { + opt = getopt_long(argc, argv, "H:o:j:", longopts, 0); + if (opt == -1) { + break; + } + + switch (opt) { + case 'H': + help_flag = 1; + break; + case 'o': + strlcpy(output_filename, optarg, sizeof(output_filename)); + break; + case 'j': + strlcpy(json_filename, optarg, sizeof(json_filename)); + break; + case '?': + usage(stderr, argv[0]); + return 1; + default: + break; + } + } + + if (help_flag) { + usage(stdout, argv[0]); + return 0; + } + + jsonfile = fopen(json_filename, "rb"); + if (!jsonfile) { + fprintf(stderr, "Invalid json file \n"); + err = -1; + goto fail; + } + + const size_t json_filesize = get_file_size(jsonfile); + if (json_filesize == 0) { + fprintf(stderr, "Json file empty \n"); + err = -1; + goto fail; + } + + json_config = calloc(1, json_filesize + 1); + if (!json_config) { + fprintf(stderr, "Failed to allocate memory for json config \n"); + err = -1; + goto fail; + } + + if (fread(json_config, 1, json_filesize, jsonfile) != json_filesize) { + fprintf(stderr, "Failed to read json file \n"); + err = -1; + goto fail; + } + + if (strlen(output_filename) > 0) { + outputfile = fopen(output_filename, "wb"); + if (!outputfile) { + fprintf(stderr, "Cannot create output file \n"); + err = -1; + goto fail; + } + } + + err = mesh_create_client_json(&client, json_config); + if (err) { + printf("Failed to create a mesh client: %s (%d)\n", mesh_err2str(err), err); + goto fail; + } + + err = mesh_create_rx_connection(client, &conn, json_config); + if (err) { + printf("Failed to create a mesh connection: %s (%d)\n", mesh_err2str(err), err); + goto fail; + } + + signal(SIGINT, intHandler); + + uint32_t frame_count = 0; + uint32_t frm_size = conn->buf_size; + + const uint32_t stat_interval = 10; + double fps = 0.0; + double throughput_MB = 0; + double stat_period_s = 0; + void *ptr = NULL; + int timeout = MESH_TIMEOUT_INFINITE; + bool first_frame = true; + float latency = 0; + struct timespec ts_recv = {}, ts_send = {}; + struct timespec ts_begin = {}, ts_end = {}; + + while (keepRunning) { + /* receive frame */ + if (first_frame) { + /* infinite for the 1st frame. */ + timeout = MESH_TIMEOUT_INFINITE; + } else { + /* 1 second */ + timeout = 1000; + } + + err = mesh_get_buffer_timeout(conn, &buf, timeout); + if (err == -MESH_ERR_CONN_CLOSED) { + printf("Connection closed\n"); + break; + } + if (err) { + printf("Failed to get buffer: %s (%d)\n", mesh_err2str(err), err); + break; + } + + printf("INFO: buf->len = %ld frame size = %u\n", buf->data_len, frm_size); + + clock_gettime(CLOCK_REALTIME, &ts_recv); + if (first_frame) { + ts_begin = ts_recv; + first_frame = false; + } + + if (outputfile) { + fwrite(buf->data, buf->data_len, 1, outputfile); + } else { + // Following code are mainly for test purpose, it requires the sender side to + // pre-set the first several bytes + ptr = buf->data; + if (*(uint32_t *)ptr != frame_count) { + printf("Wrong data content: expected %u, got %u\n", frame_count, *(uint32_t *)ptr); + /* catch up the sender frame count */ + frame_count = *(uint32_t *)ptr; + } + ptr += sizeof(frame_count); + ts_send = *(struct timespec *)ptr; + + latency = 1000.0 * (ts_recv.tv_sec - ts_send.tv_sec); + latency += (ts_recv.tv_nsec - ts_send.tv_nsec) / 1000000.0; + } + + if (frame_count % stat_interval == 0) { + /* calculate FPS */ + clock_gettime(CLOCK_REALTIME, &ts_end); + + stat_period_s = (ts_end.tv_sec - ts_begin.tv_sec); + stat_period_s += (ts_end.tv_nsec - ts_begin.tv_nsec) / 1e9; + fps = stat_interval / stat_period_s; + throughput_MB = fps * frm_size / 1000000; + + clock_gettime(CLOCK_REALTIME, &ts_begin); + } + printf("RX frames: [%u], latency: %0.1f ms, FPS: %0.3f\n", frame_count, latency, fps); + printf("Throughput: %.2lf MB/s, %.2lf Gb/s \n", throughput_MB, throughput_MB * 8 / 1000); + + frame_count++; + + err = mesh_put_buffer(&buf); + if (err) { + printf("Failed to put buffer: %s (%d)\n", mesh_err2str(err), err); + break; + } + + printf("\n"); + } + +fail: + if (conn) { + mesh_delete_connection(&conn); + } + if (client) { + mesh_delete_client(&client); + } + if (json_config) { + free(json_config); + } + if (jsonfile) { + fclose(jsonfile); + } + if (outputfile) { + fclose(outputfile); + } + return err; +} diff --git a/sdk/samples/sender_app_json.c b/sdk/samples/sender_app_json.c new file mode 100644 index 00000000..72067953 --- /dev/null +++ b/sdk/samples/sender_app_json.c @@ -0,0 +1,306 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2024 Intel Corporation + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include "mesh_dp.h" + +#define SENDER_LOCAL_FILE "sender.yuv" +#define SENDER_JSON_FILE "sender.json" + +static volatile bool keepRunning = true; + +void intHandler(int dummy) { keepRunning = 0; } + +size_t get_file_size(FILE *f) { + if (f == NULL) { + return 0; + } + long prev = ftell(f); + if (prev < 0) { + return 0; + } + if (fseek(f, (long)0, (long)SEEK_END) != 0) { + (void)fseek(f, prev, (long)SEEK_SET); + return 0; + } else { + long size = ftell(f); + (void)fseek(f, prev, (long)SEEK_SET); + if (size < 0) { + return 0; + } + return (size_t)size; + } +} + +/* print a description of all supported options */ +void usage(FILE *fp, const char *path) { + /* take only the last portion of the path */ + const char *basename = strrchr(path, '/'); + basename = basename ? basename + 1 : path; + + fprintf(fp, "Usage: %s [OPTION]\n", basename); + fprintf(fp, "-H, --help\t\t\t" + "Print this help and exit\n"); + fprintf(fp, + "-j, --json=file_name\t" + "JSON file with sender configuration(example: %s)\n", + SENDER_JSON_FILE); + fprintf(fp, + "-i, --iputfile=file_name\t" + "Input file to send (example: %s)\n", + SENDER_LOCAL_FILE); + fprintf(fp, "-n, --number=frame_number\t" + "Number of fraems to be sent, (default: -1, infinite)\n"); +} + +int read_test_data(FILE *fp, MeshBuffer *buf, uint32_t frame_size) { + int ret = 0; + + assert(fp != NULL && buf != NULL); + assert(buf->data_len >= frame_size); + + if (fread(buf->data, frame_size, 1, fp) < 1) { + ret = -1; + } + return ret; +} + +int gen_test_data(MeshBuffer *buf, uint32_t frame_count) { + /* operate on the buffer */ + void *ptr = buf->data; + + /* frame counter */ + *(uint32_t *)ptr = frame_count; + ptr += sizeof(frame_count); + + /* timestamp */ + clock_gettime(CLOCK_REALTIME, (struct timespec *)ptr); + + return 0; +} + +int main(int argc, char **argv) { + int32_t frame_num = -1; + + char input_filename[128] = ""; + FILE *inputfile = NULL; + + char json_filename[128] = ""; + FILE *jsonfile = NULL; + char *json_config = NULL; + + MeshClient *client = NULL; + MeshConnection *conn = NULL; + MeshBuffer *buf = NULL; + + int err; + + int help_flag = 0; + int opt; + struct option longopts[] = {{"help", no_argument, &help_flag, 'H'}, + {"iputfile", optional_argument, NULL, 'i'}, + {"json", required_argument, NULL, 'j'}, + {"number", optional_argument, NULL, 'n'}, + {0}}; + + /* infinite loop, to be broken when we are done parsing options */ + while (1) { + opt = getopt_long(argc, argv, "H:i:j:n:", longopts, 0); + if (opt == -1) { + break; + } + + switch (opt) { + case 'H': + help_flag = 1; + break; + case 'i': + strlcpy(input_filename, optarg, sizeof(input_filename)); + break; + case 'j': + strlcpy(json_filename, optarg, sizeof(json_filename)); + break; + case 'n': + frame_num = atoi(optarg); + break; + case '?': + usage(stderr, argv[0]); + return 1; + default: + break; + } + } + + if (help_flag) { + usage(stdout, argv[0]); + return 0; + } + + jsonfile = fopen(json_filename, "rb"); + if (!jsonfile) { + fprintf(stderr, "Invalid json file \n"); + err = -1; + goto fail; + } + + const size_t json_filesize = get_file_size(jsonfile); + if (json_filesize == 0) { + fprintf(stderr, "Json file empty \n"); + err = -1; + goto fail; + } + + json_config = calloc(1, json_filesize + 1); + if (!json_config) { + fprintf(stderr, "Failed to allocate memory for json config \n"); + err = -1; + goto fail; + } + + if (fread(json_config, 1, json_filesize, jsonfile) != json_filesize) { + fprintf(stderr, "Failed to read json file \n"); + err = -1; + goto fail; + } + + if (strlen(input_filename) > 0) { + inputfile = fopen(input_filename, "rb"); + if (!inputfile) { + fprintf(stderr, "Cannot open input file \n"); + err = -1; + goto fail; + } + } else { + fprintf(stderr, "Warning: Input file not provided, generating data\n"); + } + + if (frame_num < 0) { + frame_num = -1; + fprintf(stderr, "Warning: Negative frame count provided, sending infinite\n"); + } + + err = mesh_create_client_json(&client, json_config); + if (err) { + printf("Failed to create a mesh client: %s (%d)\n", mesh_err2str(err), err); + goto fail; + } + + err = mesh_create_tx_connection(client, &conn, json_config); + if (err) { + printf("Failed to create a mesh connection: %s (%d)\n", mesh_err2str(err), err); + goto fail; + } + + uint32_t frame_size = conn->buf_size; + + signal(SIGINT, intHandler); + + uint32_t frames_processed = 0; + const uint32_t stat_interval = 10; + double fps = 0.0; + double throughput_MB = 0; + double stat_period_s = 0; + struct timespec ts_begin = {}, ts_end = {}; + struct timespec ts_frame_begin = {}, ts_frame_end = {}; + + while (keepRunning) { + /* Timestamp for frame start. */ + clock_gettime(CLOCK_REALTIME, &ts_frame_begin); + + err = mesh_get_buffer(conn, &buf); + if (err) { + printf("Failed to get buffer: %s (%d)\n", mesh_err2str(err), err); + break; + } + + printf("INFO: frame_size = %u\n", frame_size); + + if (inputfile == NULL) { + gen_test_data(buf, frames_processed); + } else { + // If we reach the end of the file + if (read_test_data(inputfile, buf, frame_size) < 0) { + // If we loop over the file + if (frame_num == -1) { + // reset file + fseek(inputfile, 0, SEEK_SET); + // read again, if fail, then file does not hold enought data + if (read_test_data(inputfile, buf, frame_size) < 0) { + break; + } + } else { + // if we do not loop over, then we are done + break; + } + } + } + + err = mesh_put_buffer(&buf); + if (err) { + printf("Failed to put buffer: %s (%d)\n", mesh_err2str(err), err); + break; + } + + if (frames_processed % stat_interval == 0) { + /* calculate FPS */ + clock_gettime(CLOCK_REALTIME, &ts_end); + + stat_period_s = (ts_end.tv_sec - ts_begin.tv_sec); + stat_period_s += (ts_end.tv_nsec - ts_begin.tv_nsec) / 1e9; + fps = stat_interval / stat_period_s; + throughput_MB = fps * frame_size / 1000000; + + clock_gettime(CLOCK_REALTIME, &ts_begin); + } + + printf("TX frames: [%d], FPS: %0.2f \n", frames_processed, fps); + printf("Throughput: %.2lf MB/s, %.2lf Gb/s \n", throughput_MB, throughput_MB * 8 / 1000); + + frames_processed++; + + if (frame_num > 0 && frames_processed >= frame_num) { + break; + } + + /* Timestamp for frame end. */ + clock_gettime(CLOCK_REALTIME, &ts_frame_end); + + /* sleep for 1/fps */ + __useconds_t spend = 1000000 * (ts_frame_end.tv_sec - ts_frame_begin.tv_sec) + + (ts_frame_end.tv_nsec - ts_frame_begin.tv_nsec) / 1000; + printf("spend: %d\n", spend); + + printf("\n"); + } + + sleep(2); + +fail: + if (conn) { + mesh_delete_connection(&conn); + } + if (client) { + mesh_delete_client(&client); + } + if (json_config) { + free(json_config); + } + if (jsonfile) { + fclose(jsonfile); + } + if (inputfile) { + fclose(inputfile); + } + + return err; +} From 83153a720b6f852c824f6b64e7590cbc5d0f37fa Mon Sep 17 00:00:00 2001 From: Tomasz Szumski Date: Tue, 14 Jan 2025 08:14:19 +0000 Subject: [PATCH 3/4] Remove local file definitions from JSON sender and receiver applications and update usage instructions; add sample JSON configuration files for sender and receiver --- sdk/samples/recver_app_json.c | 7 ++----- sdk/samples/sender_app_json.c | 7 ++----- test_data/recver.json | 23 +++++++++++++++++++++++ test_data/sender.json | 23 +++++++++++++++++++++++ 4 files changed, 50 insertions(+), 10 deletions(-) create mode 100644 test_data/recver.json create mode 100644 test_data/sender.json diff --git a/sdk/samples/recver_app_json.c b/sdk/samples/recver_app_json.c index 25d823f2..9ad70f95 100644 --- a/sdk/samples/recver_app_json.c +++ b/sdk/samples/recver_app_json.c @@ -12,7 +12,6 @@ #include #include "mesh_dp.h" -#define RECV_LOCAL_FILE "recv.yuv" #define RECV_JSON_FILE "recv.json" static volatile bool keepRunning = true; @@ -53,10 +52,8 @@ void usage(FILE *fp, const char *path) { "-j, --json=file_name\t" "JSON file with receiver configuration(example: %s)\n", RECV_JSON_FILE); - fprintf(fp, - "-o, --outputfile=file_name\t" - "Save stream to local file (example: %s)\n", - RECV_LOCAL_FILE); + fprintf(fp, "-o, --outputfile=file_name\t" + "Save stream to local file (example: data.yuv)\n"); fprintf(fp, "\n"); } diff --git a/sdk/samples/sender_app_json.c b/sdk/samples/sender_app_json.c index 72067953..a9fcc37d 100644 --- a/sdk/samples/sender_app_json.c +++ b/sdk/samples/sender_app_json.c @@ -14,7 +14,6 @@ #include #include "mesh_dp.h" -#define SENDER_LOCAL_FILE "sender.yuv" #define SENDER_JSON_FILE "sender.json" static volatile bool keepRunning = true; @@ -55,10 +54,8 @@ void usage(FILE *fp, const char *path) { "-j, --json=file_name\t" "JSON file with sender configuration(example: %s)\n", SENDER_JSON_FILE); - fprintf(fp, - "-i, --iputfile=file_name\t" - "Input file to send (example: %s)\n", - SENDER_LOCAL_FILE); + fprintf(fp, "-i, --iputfile=file_name\t" + "Input file to send (example: data.yuv)\n"); fprintf(fp, "-n, --number=frame_number\t" "Number of fraems to be sent, (default: -1, infinite)\n"); } diff --git a/test_data/recver.json b/test_data/recver.json new file mode 100644 index 00000000..e9f1531c --- /dev/null +++ b/test_data/recver.json @@ -0,0 +1,23 @@ +{ + "apiVersion": "v1", + "apiConnectionString": "Server=127.0.0.1; Port=8003", + "apiDefaultTimeoutMicroseconds": 100000, + "maxMediaConnections": 32, + "connection": { + "st2110": { + "transport": "st2110-20", + "remoteIpAddr": "192.168.96.10", + "remotePort": 9002, + "pacing": "narrow", + "payloadType": 112 + } + }, + "payload": { + "video": { + "width": 1920, + "height": 1080, + "fps": 30, + "pixelFormat": "yuv422p10le" + } + } +} diff --git a/test_data/sender.json b/test_data/sender.json new file mode 100644 index 00000000..4eb25cc7 --- /dev/null +++ b/test_data/sender.json @@ -0,0 +1,23 @@ +{ + "apiVersion": "v1", + "apiConnectionString": "Server=127.0.0.1; Port=8002", + "apiDefaultTimeoutMicroseconds": 100000, + "maxMediaConnections": 32, + "connection": { + "st2110": { + "transport": "st2110-20", + "remoteIpAddr": "192.168.96.11", + "remotePort": 9002, + "pacing": "narrow", + "payloadType": 112 + } + }, + "payload": { + "video": { + "width": 1920, + "height": 1080, + "fps": 30, + "pixelFormat": "yuv422p10le" + } + } +} From 4d9f7aba0179fb204f8d6cd54644c1ea27af0385 Mon Sep 17 00:00:00 2001 From: Tomasz Szumski Date: Tue, 14 Jan 2025 10:07:17 +0000 Subject: [PATCH 4/4] Rename JSON receiver file from recv.json to recver.json --- sdk/samples/recver_app_json.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/samples/recver_app_json.c b/sdk/samples/recver_app_json.c index 9ad70f95..00b9d4e4 100644 --- a/sdk/samples/recver_app_json.c +++ b/sdk/samples/recver_app_json.c @@ -12,7 +12,7 @@ #include #include "mesh_dp.h" -#define RECV_JSON_FILE "recv.json" +#define RECV_JSON_FILE "recver.json" static volatile bool keepRunning = true;