From f6fa2b84bc191d558da8b256aaed3412761d3c5a Mon Sep 17 00:00:00 2001 From: Jef Driesen Date: Fri, 6 Sep 2019 22:11:32 +0200 Subject: [PATCH] Add a poll function to the I/O interface The Linux implementation is very straighforward and just a lightweight wrapper around the select function. But the Windows implementation is much more complex, because the Windows event notification mechanism behaves very different: The WaitCommEvent function does not support a timeout and is always a blocking call. The only way to implement a timeout is to use asynchronous I/O (or overlapped I/O as it's called in the Windows API), to run the operation in the background. This requires some additional book keeping to keep track of the pending background operation. The event mechanism is also edge triggered instead of level triggered, and reading the event with the WaitCommEvent function clears the pending event. Therefore, the state of the input buffer needs to be checked with the ClearCommError function before and after the WaitCommEvent call. The check before is necessary in case the event is already cleared by a previous WaitCommEvent call, while there is still data present in the input buffer. In this case, WaitCommEvent should not be called at all, because it would wait until more data arrives. The check afterwards is necessary in case WaitCommEvent reports a pending event, while the data in the input buffer has already been consumed. In this case, the current event must be ignored and WaitCommEvent needs to be called again, to wait for the next event. --- include/libdivecomputer/custom.h | 1 + include/libdivecomputer/iostream.h | 30 +++++++ src/bluetooth.c | 1 + src/custom.c | 13 +++ src/iostream-private.h | 2 + src/iostream.c | 11 +++ src/irda.c | 1 + src/libdivecomputer.symbols | 1 + src/serial_posix.c | 38 ++++++++ src/serial_win32.c | 138 ++++++++++++++++++++++++++++- src/socket.c | 36 ++++++++ src/socket.h | 3 + src/usbhid.c | 8 ++ 13 files changed, 279 insertions(+), 4 deletions(-) diff --git a/include/libdivecomputer/custom.h b/include/libdivecomputer/custom.h index 4e3f9b0..eca3b1c 100644 --- a/include/libdivecomputer/custom.h +++ b/include/libdivecomputer/custom.h @@ -39,6 +39,7 @@ typedef struct dc_custom_cbs_t { dc_status_t (*get_lines) (void *userdata, unsigned int *value); dc_status_t (*get_available) (void *userdata, size_t *value); dc_status_t (*configure) (void *userdata, unsigned int baudrate, unsigned int databits, dc_parity_t parity, dc_stopbits_t stopbits, dc_flowcontrol_t flowcontrol); + dc_status_t (*poll) (void *userdata, int timeout); dc_status_t (*read) (void *userdata, void *data, size_t size, size_t *actual); dc_status_t (*write) (void *userdata, const void *data, size_t size, size_t *actual); dc_status_t (*flush) (void *userdata); diff --git a/include/libdivecomputer/iostream.h b/include/libdivecomputer/iostream.h index d7d2621..fb0de9a 100644 --- a/include/libdivecomputer/iostream.h +++ b/include/libdivecomputer/iostream.h @@ -211,6 +211,36 @@ dc_iostream_get_available (dc_iostream_t *iostream, size_t *value); dc_status_t dc_iostream_configure (dc_iostream_t *iostream, unsigned int baudrate, unsigned int databits, dc_parity_t parity, dc_stopbits_t stopbits, dc_flowcontrol_t flowcontrol); +/** + * Poll the I/O stream for available data. + * + * There are three distinct modes available: + * + * 1. Blocking (timeout < 0): + * + * The poll operation is blocked until one or more bytes have been + * received. If no bytes are received, the operation will block + * forever. + * + * 2. Non-blocking (timeout == 0): + * + * The poll operation returns immediately, even if no bytes have + * been received. + * + * 3. Timeout (timeout > 0): + * + * The poll operation is blocked until one or more bytes have been + * received. If no bytes are received within the specified amount of + * time, the operation will return with a timeout. + * + * @param[in] iostream A valid I/O stream. + * @param[in] timeout The timeout in milliseconds. + * @returns #DC_STATUS_SUCCESS on success, #DC_STATUS_TIMEOUT on + * timeout, or another #dc_status_t code on failure. + */ +dc_status_t +dc_iostream_poll (dc_iostream_t *iostream, int timeout); + /** * Read data from the I/O stream. * diff --git a/src/bluetooth.c b/src/bluetooth.c index 4630523..649d006 100644 --- a/src/bluetooth.c +++ b/src/bluetooth.c @@ -106,6 +106,7 @@ static const dc_iostream_vtable_t dc_bluetooth_vtable = { NULL, /* get_lines */ dc_socket_get_available, /* get_available */ NULL, /* configure */ + dc_socket_poll, /* poll */ dc_socket_read, /* read */ dc_socket_write, /* write */ NULL, /* flush */ diff --git a/src/custom.c b/src/custom.c index 8db276b..2bdd37c 100644 --- a/src/custom.c +++ b/src/custom.c @@ -35,6 +35,7 @@ static dc_status_t dc_custom_set_rts (dc_iostream_t *abstract, unsigned int valu static dc_status_t dc_custom_get_lines (dc_iostream_t *abstract, unsigned int *value); static dc_status_t dc_custom_get_available (dc_iostream_t *abstract, size_t *value); static dc_status_t dc_custom_configure (dc_iostream_t *abstract, unsigned int baudrate, unsigned int databits, dc_parity_t parity, dc_stopbits_t stopbits, dc_flowcontrol_t flowcontrol); +static dc_status_t dc_custom_poll (dc_iostream_t *abstract, int timeout); static dc_status_t dc_custom_read (dc_iostream_t *abstract, void *data, size_t size, size_t *actual); static dc_status_t dc_custom_write (dc_iostream_t *abstract, const void *data, size_t size, size_t *actual); static dc_status_t dc_custom_flush (dc_iostream_t *abstract); @@ -60,6 +61,7 @@ static const dc_iostream_vtable_t dc_custom_vtable = { dc_custom_get_lines, /* get_lines */ dc_custom_get_available, /* get_available */ dc_custom_configure, /* configure */ + dc_custom_poll, /* poll */ dc_custom_read, /* read */ dc_custom_write, /* write */ dc_custom_flush, /* flush */ @@ -181,6 +183,17 @@ dc_custom_configure (dc_iostream_t *abstract, unsigned int baudrate, unsigned in return custom->callbacks.configure (custom->userdata, baudrate, databits, parity, stopbits, flowcontrol); } +static dc_status_t +dc_custom_poll (dc_iostream_t *abstract, int timeout) +{ + dc_custom_t *custom = (dc_custom_t *) abstract; + + if (custom->callbacks.poll == NULL) + return DC_STATUS_SUCCESS; + + return custom->callbacks.poll (custom->userdata, timeout); +} + static dc_status_t dc_custom_read (dc_iostream_t *abstract, void *data, size_t size, size_t *actual) { diff --git a/src/iostream-private.h b/src/iostream-private.h index f9c0883..4ce8529 100644 --- a/src/iostream-private.h +++ b/src/iostream-private.h @@ -57,6 +57,8 @@ struct dc_iostream_vtable_t { dc_status_t (*configure) (dc_iostream_t *iostream, unsigned int baudrate, unsigned int databits, dc_parity_t parity, dc_stopbits_t stopbits, dc_flowcontrol_t flowcontrol); + dc_status_t (*poll) (dc_iostream_t *iostream, int timeout); + dc_status_t (*read) (dc_iostream_t *iostream, void *data, size_t size, size_t *actual); dc_status_t (*write) (dc_iostream_t *iostream, const void *data, size_t size, size_t *actual); diff --git a/src/iostream.c b/src/iostream.c index e775146..677f744 100644 --- a/src/iostream.c +++ b/src/iostream.c @@ -183,6 +183,17 @@ dc_iostream_configure (dc_iostream_t *iostream, unsigned int baudrate, unsigned return iostream->vtable->configure (iostream, baudrate, databits, parity, stopbits, flowcontrol); } +dc_status_t +dc_iostream_poll (dc_iostream_t *iostream, int timeout) +{ + if (iostream == NULL || iostream->vtable->poll == NULL) + return DC_STATUS_SUCCESS; + + INFO (iostream->context, "Poll: value=%i", timeout); + + return iostream->vtable->poll (iostream, timeout); +} + dc_status_t dc_iostream_read (dc_iostream_t *iostream, void *data, size_t size, size_t *actual) { diff --git a/src/irda.c b/src/irda.c index a78ff31..5ffc162 100644 --- a/src/irda.c +++ b/src/irda.c @@ -98,6 +98,7 @@ static const dc_iostream_vtable_t dc_irda_vtable = { NULL, /* get_lines */ dc_socket_get_available, /* get_available */ NULL, /* configure */ + dc_socket_poll, /* poll */ dc_socket_read, /* read */ dc_socket_write, /* write */ NULL, /* flush */ diff --git a/src/libdivecomputer.symbols b/src/libdivecomputer.symbols index 04e4196..4195d66 100644 --- a/src/libdivecomputer.symbols +++ b/src/libdivecomputer.symbols @@ -43,6 +43,7 @@ dc_iostream_set_rts dc_iostream_get_available dc_iostream_get_lines dc_iostream_configure +dc_iostream_poll dc_iostream_read dc_iostream_write dc_iostream_flush diff --git a/src/serial_posix.c b/src/serial_posix.c index 46ec3de..08cfdc4 100644 --- a/src/serial_posix.c +++ b/src/serial_posix.c @@ -74,6 +74,7 @@ static dc_status_t dc_serial_set_rts (dc_iostream_t *iostream, unsigned int valu static dc_status_t dc_serial_get_lines (dc_iostream_t *iostream, unsigned int *value); static dc_status_t dc_serial_get_available (dc_iostream_t *iostream, size_t *value); static dc_status_t dc_serial_configure (dc_iostream_t *iostream, unsigned int baudrate, unsigned int databits, dc_parity_t parity, dc_stopbits_t stopbits, dc_flowcontrol_t flowcontrol); +static dc_status_t dc_serial_poll (dc_iostream_t *iostream, int timeout); static dc_status_t dc_serial_read (dc_iostream_t *iostream, void *data, size_t size, size_t *actual); static dc_status_t dc_serial_write (dc_iostream_t *iostream, const void *data, size_t size, size_t *actual); static dc_status_t dc_serial_flush (dc_iostream_t *iostream); @@ -123,6 +124,7 @@ static const dc_iostream_vtable_t dc_serial_vtable = { dc_serial_get_lines, /* get_lines */ dc_serial_get_available, /* get_available */ dc_serial_configure, /* configure */ + dc_serial_poll, /* poll */ dc_serial_read, /* read */ dc_serial_write, /* write */ dc_serial_flush, /* flush */ @@ -663,6 +665,42 @@ dc_serial_set_latency (dc_iostream_t *abstract, unsigned int milliseconds) return DC_STATUS_SUCCESS; } +static dc_status_t +dc_serial_poll (dc_iostream_t *abstract, int timeout) +{ + dc_serial_t *device = (dc_serial_t *) abstract; + int rc = 0; + + do { + fd_set fds; + FD_ZERO (&fds); + FD_SET (device->fd, &fds); + + struct timeval tv, *ptv = NULL; + if (timeout > 0) { + tv.tv_sec = (timeout / 1000); + tv.tv_usec = (timeout % 1000) * 1000; + ptv = &tv; + } else if (timeout == 0) { + tv.tv_sec = 0; + tv.tv_usec = 0; + ptv = &tv; + } + + rc = select (device->fd + 1, &fds, NULL, NULL, ptv); + } while (rc < 0 && errno == EINTR); + + if (rc < 0) { + int errcode = errno; + SYSERROR (abstract->context, errcode); + return syserror (errcode); + } else if (rc == 0) { + return DC_STATUS_TIMEOUT; + } else { + return DC_STATUS_SUCCESS; + } +} + static dc_status_t dc_serial_read (dc_iostream_t *abstract, void *data, size_t size, size_t *actual) { diff --git a/src/serial_win32.c b/src/serial_win32.c index 124857a..0f8bd18 100644 --- a/src/serial_win32.c +++ b/src/serial_win32.c @@ -43,6 +43,7 @@ static dc_status_t dc_serial_set_rts (dc_iostream_t *iostream, unsigned int valu static dc_status_t dc_serial_get_lines (dc_iostream_t *iostream, unsigned int *value); static dc_status_t dc_serial_get_available (dc_iostream_t *iostream, size_t *value); static dc_status_t dc_serial_configure (dc_iostream_t *iostream, unsigned int baudrate, unsigned int databits, dc_parity_t parity, dc_stopbits_t stopbits, dc_flowcontrol_t flowcontrol); +static dc_status_t dc_serial_poll (dc_iostream_t *iostream, int timeout); static dc_status_t dc_serial_read (dc_iostream_t *iostream, void *data, size_t size, size_t *actual); static dc_status_t dc_serial_write (dc_iostream_t *iostream, const void *data, size_t size, size_t *actual); static dc_status_t dc_serial_flush (dc_iostream_t *iostream); @@ -75,6 +76,11 @@ typedef struct dc_serial_t { */ DCB dcb; COMMTIMEOUTS timeouts; + + HANDLE hReadWrite, hPoll; + OVERLAPPED overlapped; + DWORD events; + BOOL pending; } dc_serial_t; static const dc_iterator_vtable_t dc_serial_iterator_vtable = { @@ -93,6 +99,7 @@ static const dc_iostream_vtable_t dc_serial_vtable = { dc_serial_get_lines, /* get_lines */ dc_serial_get_available, /* get_available */ dc_serial_configure, /* configure */ + dc_serial_poll, /* poll */ dc_serial_read, /* read */ dc_serial_write, /* write */ dc_serial_flush, /* flush */ @@ -282,18 +289,41 @@ dc_serial_open (dc_iostream_t **out, dc_context_t *context, const char *name) return DC_STATUS_NOMEMORY; } + // Default values. + memset(&device->overlapped, 0, sizeof(device->overlapped)); + device->events = 0; + device->pending = FALSE; + + // Create a manual reset event for I/O. + device->hReadWrite = CreateEvent (NULL, TRUE, FALSE, NULL); + if (device->hReadWrite == INVALID_HANDLE_VALUE) { + DWORD errcode = GetLastError (); + SYSERROR (context, errcode); + status = syserror (errcode); + goto error_free; + } + + // Create a manual reset event for polling. + device->hPoll = CreateEvent (NULL, TRUE, FALSE, NULL); + if (device->hPoll == INVALID_HANDLE_VALUE) { + DWORD errcode = GetLastError (); + SYSERROR (context, errcode); + status = syserror (errcode); + goto error_free_readwrite; + } + // Open the device. device->hFile = CreateFileA (devname, GENERIC_READ | GENERIC_WRITE, 0, NULL, // No security attributes. OPEN_EXISTING, - 0, // Non-overlapped I/O. + FILE_FLAG_OVERLAPPED, NULL); if (device->hFile == INVALID_HANDLE_VALUE) { DWORD errcode = GetLastError (); SYSERROR (context, errcode); status = syserror (errcode); - goto error_free; + goto error_free_poll; } // Retrieve the current communication settings and timeouts, @@ -308,12 +338,24 @@ dc_serial_open (dc_iostream_t **out, dc_context_t *context, const char *name) goto error_close; } + // Enable event monitoring. + if (!SetCommMask (device->hFile, EV_RXCHAR)) { + DWORD errcode = GetLastError (); + SYSERROR (context, errcode); + status = syserror (errcode); + goto error_close; + } + *out = (dc_iostream_t *) device; return DC_STATUS_SUCCESS; error_close: CloseHandle (device->hFile); +error_free_poll: + CloseHandle (device->hPoll); +error_free_readwrite: + CloseHandle (device->hReadWrite); error_free: dc_iostream_deallocate ((dc_iostream_t *) device); return status; @@ -325,6 +367,9 @@ dc_serial_close (dc_iostream_t *abstract) dc_status_t status = DC_STATUS_SUCCESS; dc_serial_t *device = (dc_serial_t *) abstract; + // Disable event monitoring. + SetCommMask (device->hFile, 0); + // Restore the initial communication settings and timeouts. if (!SetCommState (device->hFile, &device->dcb) || !SetCommTimeouts (device->hFile, &device->timeouts)) { @@ -340,6 +385,9 @@ dc_serial_close (dc_iostream_t *abstract) dc_status_set_error(&status, syserror (errcode)); } + CloseHandle (device->hPoll); + CloseHandle (device->hReadWrite); + return status; } @@ -502,6 +550,64 @@ dc_serial_set_latency (dc_iostream_t *abstract, unsigned int value) return DC_STATUS_SUCCESS; } +static dc_status_t +dc_serial_poll (dc_iostream_t *abstract, int timeout) +{ + dc_serial_t *device = (dc_serial_t *) abstract; + + while (1) { + COMSTAT stats; + if (!ClearCommError (device->hFile, NULL, &stats)) { + DWORD errcode = GetLastError (); + SYSERROR (abstract->context, errcode); + return syserror (errcode); + } + + if (stats.cbInQue) + break; + + if (!device->pending) { + memset(&device->overlapped, 0, sizeof(device->overlapped)); + device->overlapped.hEvent = device->hPoll; + device->events = 0; + if (!WaitCommEvent (device->hFile, &device->events, &device->overlapped)) { + DWORD errcode = GetLastError (); + if (errcode != ERROR_IO_PENDING) { + SYSERROR (abstract->context, errcode); + return syserror (errcode); + } + device->pending = TRUE; + } + } + + if (device->pending) { + DWORD errcode = 0; + DWORD rc = WaitForSingleObject (device->hPoll, timeout >= 0 ? (DWORD) timeout : INFINITE); + switch (rc) { + case WAIT_OBJECT_0: + break; + case WAIT_TIMEOUT: + return DC_STATUS_TIMEOUT; + default: + errcode = GetLastError (); + SYSERROR (abstract->context, errcode); + return syserror (errcode); + } + } + + DWORD dummy = 0; + if (!GetOverlappedResult (device->hFile, &device->overlapped, &dummy, TRUE)) { + DWORD errcode = GetLastError (); + SYSERROR (abstract->context, errcode); + return syserror (errcode); + } + + device->pending = FALSE; + } + + return DC_STATUS_SUCCESS; +} + static dc_status_t dc_serial_read (dc_iostream_t *abstract, void *data, size_t size, size_t *actual) { @@ -509,7 +615,19 @@ dc_serial_read (dc_iostream_t *abstract, void *data, size_t size, size_t *actual dc_serial_t *device = (dc_serial_t *) abstract; DWORD dwRead = 0; - if (!ReadFile (device->hFile, data, size, &dwRead, NULL)) { + OVERLAPPED overlapped = {0}; + overlapped.hEvent = device->hReadWrite; + + if (!ReadFile (device->hFile, data, size, NULL, &overlapped)) { + DWORD errcode = GetLastError (); + if (errcode != ERROR_IO_PENDING) { + SYSERROR (abstract->context, errcode); + status = syserror (errcode); + goto out; + } + } + + if (!GetOverlappedResult (device->hFile, &overlapped, &dwRead, TRUE)) { DWORD errcode = GetLastError (); SYSERROR (abstract->context, errcode); status = syserror (errcode); @@ -534,7 +652,19 @@ dc_serial_write (dc_iostream_t *abstract, const void *data, size_t size, size_t dc_serial_t *device = (dc_serial_t *) abstract; DWORD dwWritten = 0; - if (!WriteFile (device->hFile, data, size, &dwWritten, NULL)) { + OVERLAPPED overlapped = {0}; + overlapped.hEvent = device->hReadWrite; + + if (!WriteFile (device->hFile, data, size, NULL, &overlapped)) { + DWORD errcode = GetLastError (); + if (errcode != ERROR_IO_PENDING) { + SYSERROR (abstract->context, errcode); + status = syserror (errcode); + goto out; + } + } + + if (!GetOverlappedResult (device->hFile, &overlapped, &dwWritten, TRUE)) { DWORD errcode = GetLastError (); SYSERROR (abstract->context, errcode); status = syserror (errcode); diff --git a/src/socket.c b/src/socket.c index cbbb684..3bb8718 100644 --- a/src/socket.c +++ b/src/socket.c @@ -186,6 +186,42 @@ dc_socket_get_available (dc_iostream_t *abstract, size_t *value) return DC_STATUS_SUCCESS; } +dc_status_t +dc_socket_poll (dc_iostream_t *abstract, int timeout) +{ + dc_socket_t *socket = (dc_socket_t *) abstract; + int rc = 0; + + do { + fd_set fds; + FD_ZERO (&fds); + FD_SET (socket->fd, &fds); + + struct timeval tv, *ptv = NULL; + if (timeout > 0) { + tv.tv_sec = (timeout / 1000); + tv.tv_usec = (timeout % 1000) * 1000; + ptv = &tv; + } else if (timeout == 0) { + tv.tv_sec = 0; + tv.tv_usec = 0; + ptv = &tv; + } + + rc = select (socket->fd + 1, &fds, NULL, NULL, ptv); + } while (rc < 0 && S_ERRNO == S_EINTR); + + if (rc < 0) { + s_errcode_t errcode = S_ERRNO; + SYSERROR (abstract->context, errcode); + return dc_socket_syserror(errcode); + } else if (rc == 0) { + return DC_STATUS_TIMEOUT; + } else { + return DC_STATUS_SUCCESS; + } +} + dc_status_t dc_socket_read (dc_iostream_t *abstract, void *data, size_t size, size_t *actual) { diff --git a/src/socket.h b/src/socket.h index 280c486..455b945 100644 --- a/src/socket.h +++ b/src/socket.h @@ -108,6 +108,9 @@ dc_socket_set_timeout (dc_iostream_t *iostream, int timeout); dc_status_t dc_socket_get_available (dc_iostream_t *iostream, size_t *value); +dc_status_t +dc_socket_poll (dc_iostream_t *iostream, int timeout); + dc_status_t dc_socket_read (dc_iostream_t *iostream, void *data, size_t size, size_t *actual); diff --git a/src/usbhid.c b/src/usbhid.c index a9563c7..6fa395a 100644 --- a/src/usbhid.c +++ b/src/usbhid.c @@ -94,6 +94,7 @@ static dc_status_t dc_usbhid_iterator_next (dc_iterator_t *iterator, void *item) static dc_status_t dc_usbhid_iterator_free (dc_iterator_t *iterator); static dc_status_t dc_usbhid_set_timeout (dc_iostream_t *iostream, int timeout); +static dc_status_t dc_usbhid_poll (dc_iostream_t *iostream, int timeout); static dc_status_t dc_usbhid_read (dc_iostream_t *iostream, void *data, size_t size, size_t *actual); static dc_status_t dc_usbhid_write (dc_iostream_t *iostream, const void *data, size_t size, size_t *actual); static dc_status_t dc_usbhid_close (dc_iostream_t *iostream); @@ -144,6 +145,7 @@ static const dc_iostream_vtable_t dc_usbhid_vtable = { NULL, /* get_lines */ NULL, /* get_available */ NULL, /* configure */ + dc_usbhid_poll, /* poll */ dc_usbhid_read, /* read */ dc_usbhid_write, /* write */ NULL, /* flush */ @@ -685,6 +687,12 @@ dc_usbhid_set_timeout (dc_iostream_t *abstract, int timeout) return DC_STATUS_SUCCESS; } +static dc_status_t +dc_usbhid_poll (dc_iostream_t *abstract, int timeout) +{ + return DC_STATUS_UNSUPPORTED; +} + static dc_status_t dc_usbhid_read (dc_iostream_t *abstract, void *data, size_t size, size_t *actual) {