Add a poll function to the I/O interface

The Linux implementation is very straighforward and just a lightweight
wrapper around the select function. But the Windows implementation is
much more complex, because the Windows event notification mechanism
behaves very different:

The WaitCommEvent function does not support a timeout and is always a
blocking call. The only way to implement a timeout is to use
asynchronous I/O (or overlapped I/O as it's called in the Windows API),
to run the operation in the background. This requires some additional
book keeping to keep track of the pending background operation.

The event mechanism is also edge triggered instead of level triggered,
and reading the event with the WaitCommEvent function clears the pending
event. Therefore, the state of the input buffer needs to be checked with
the ClearCommError function before and after the WaitCommEvent call.

The check before is necessary in case the event is already cleared by a
previous WaitCommEvent call, while there is still data present in the
input buffer. In this case, WaitCommEvent should not be called at all,
because it would wait until more data arrives.

The check afterwards is necessary in case WaitCommEvent reports a
pending event, while the data in the input buffer has already been
consumed. In this case, the current event must be ignored and
WaitCommEvent needs to be called again, to wait for the next event.
This commit is contained in:
Jef Driesen 2019-09-06 22:11:32 +02:00
parent ef4bd94717
commit f6fa2b84bc
13 changed files with 279 additions and 4 deletions

View File

@ -39,6 +39,7 @@ typedef struct dc_custom_cbs_t {
dc_status_t (*get_lines) (void *userdata, unsigned int *value);
dc_status_t (*get_available) (void *userdata, size_t *value);
dc_status_t (*configure) (void *userdata, unsigned int baudrate, unsigned int databits, dc_parity_t parity, dc_stopbits_t stopbits, dc_flowcontrol_t flowcontrol);
dc_status_t (*poll) (void *userdata, int timeout);
dc_status_t (*read) (void *userdata, void *data, size_t size, size_t *actual);
dc_status_t (*write) (void *userdata, const void *data, size_t size, size_t *actual);
dc_status_t (*flush) (void *userdata);

View File

@ -211,6 +211,36 @@ dc_iostream_get_available (dc_iostream_t *iostream, size_t *value);
dc_status_t
dc_iostream_configure (dc_iostream_t *iostream, unsigned int baudrate, unsigned int databits, dc_parity_t parity, dc_stopbits_t stopbits, dc_flowcontrol_t flowcontrol);
/**
* Poll the I/O stream for available data.
*
* There are three distinct modes available:
*
* 1. Blocking (timeout < 0):
*
* The poll operation is blocked until one or more bytes have been
* received. If no bytes are received, the operation will block
* forever.
*
* 2. Non-blocking (timeout == 0):
*
* The poll operation returns immediately, even if no bytes have
* been received.
*
* 3. Timeout (timeout > 0):
*
* The poll operation is blocked until one or more bytes have been
* received. If no bytes are received within the specified amount of
* time, the operation will return with a timeout.
*
* @param[in] iostream A valid I/O stream.
* @param[in] timeout The timeout in milliseconds.
* @returns #DC_STATUS_SUCCESS on success, #DC_STATUS_TIMEOUT on
* timeout, or another #dc_status_t code on failure.
*/
dc_status_t
dc_iostream_poll (dc_iostream_t *iostream, int timeout);
/**
* Read data from the I/O stream.
*

View File

@ -106,6 +106,7 @@ static const dc_iostream_vtable_t dc_bluetooth_vtable = {
NULL, /* get_lines */
dc_socket_get_available, /* get_available */
NULL, /* configure */
dc_socket_poll, /* poll */
dc_socket_read, /* read */
dc_socket_write, /* write */
NULL, /* flush */

View File

@ -35,6 +35,7 @@ static dc_status_t dc_custom_set_rts (dc_iostream_t *abstract, unsigned int valu
static dc_status_t dc_custom_get_lines (dc_iostream_t *abstract, unsigned int *value);
static dc_status_t dc_custom_get_available (dc_iostream_t *abstract, size_t *value);
static dc_status_t dc_custom_configure (dc_iostream_t *abstract, unsigned int baudrate, unsigned int databits, dc_parity_t parity, dc_stopbits_t stopbits, dc_flowcontrol_t flowcontrol);
static dc_status_t dc_custom_poll (dc_iostream_t *abstract, int timeout);
static dc_status_t dc_custom_read (dc_iostream_t *abstract, void *data, size_t size, size_t *actual);
static dc_status_t dc_custom_write (dc_iostream_t *abstract, const void *data, size_t size, size_t *actual);
static dc_status_t dc_custom_flush (dc_iostream_t *abstract);
@ -60,6 +61,7 @@ static const dc_iostream_vtable_t dc_custom_vtable = {
dc_custom_get_lines, /* get_lines */
dc_custom_get_available, /* get_available */
dc_custom_configure, /* configure */
dc_custom_poll, /* poll */
dc_custom_read, /* read */
dc_custom_write, /* write */
dc_custom_flush, /* flush */
@ -181,6 +183,17 @@ dc_custom_configure (dc_iostream_t *abstract, unsigned int baudrate, unsigned in
return custom->callbacks.configure (custom->userdata, baudrate, databits, parity, stopbits, flowcontrol);
}
static dc_status_t
dc_custom_poll (dc_iostream_t *abstract, int timeout)
{
dc_custom_t *custom = (dc_custom_t *) abstract;
if (custom->callbacks.poll == NULL)
return DC_STATUS_SUCCESS;
return custom->callbacks.poll (custom->userdata, timeout);
}
static dc_status_t
dc_custom_read (dc_iostream_t *abstract, void *data, size_t size, size_t *actual)
{

View File

@ -57,6 +57,8 @@ struct dc_iostream_vtable_t {
dc_status_t (*configure) (dc_iostream_t *iostream, unsigned int baudrate, unsigned int databits, dc_parity_t parity, dc_stopbits_t stopbits, dc_flowcontrol_t flowcontrol);
dc_status_t (*poll) (dc_iostream_t *iostream, int timeout);
dc_status_t (*read) (dc_iostream_t *iostream, void *data, size_t size, size_t *actual);
dc_status_t (*write) (dc_iostream_t *iostream, const void *data, size_t size, size_t *actual);

View File

@ -183,6 +183,17 @@ dc_iostream_configure (dc_iostream_t *iostream, unsigned int baudrate, unsigned
return iostream->vtable->configure (iostream, baudrate, databits, parity, stopbits, flowcontrol);
}
dc_status_t
dc_iostream_poll (dc_iostream_t *iostream, int timeout)
{
if (iostream == NULL || iostream->vtable->poll == NULL)
return DC_STATUS_SUCCESS;
INFO (iostream->context, "Poll: value=%i", timeout);
return iostream->vtable->poll (iostream, timeout);
}
dc_status_t
dc_iostream_read (dc_iostream_t *iostream, void *data, size_t size, size_t *actual)
{

View File

@ -98,6 +98,7 @@ static const dc_iostream_vtable_t dc_irda_vtable = {
NULL, /* get_lines */
dc_socket_get_available, /* get_available */
NULL, /* configure */
dc_socket_poll, /* poll */
dc_socket_read, /* read */
dc_socket_write, /* write */
NULL, /* flush */

View File

@ -43,6 +43,7 @@ dc_iostream_set_rts
dc_iostream_get_available
dc_iostream_get_lines
dc_iostream_configure
dc_iostream_poll
dc_iostream_read
dc_iostream_write
dc_iostream_flush

View File

@ -74,6 +74,7 @@ static dc_status_t dc_serial_set_rts (dc_iostream_t *iostream, unsigned int valu
static dc_status_t dc_serial_get_lines (dc_iostream_t *iostream, unsigned int *value);
static dc_status_t dc_serial_get_available (dc_iostream_t *iostream, size_t *value);
static dc_status_t dc_serial_configure (dc_iostream_t *iostream, unsigned int baudrate, unsigned int databits, dc_parity_t parity, dc_stopbits_t stopbits, dc_flowcontrol_t flowcontrol);
static dc_status_t dc_serial_poll (dc_iostream_t *iostream, int timeout);
static dc_status_t dc_serial_read (dc_iostream_t *iostream, void *data, size_t size, size_t *actual);
static dc_status_t dc_serial_write (dc_iostream_t *iostream, const void *data, size_t size, size_t *actual);
static dc_status_t dc_serial_flush (dc_iostream_t *iostream);
@ -123,6 +124,7 @@ static const dc_iostream_vtable_t dc_serial_vtable = {
dc_serial_get_lines, /* get_lines */
dc_serial_get_available, /* get_available */
dc_serial_configure, /* configure */
dc_serial_poll, /* poll */
dc_serial_read, /* read */
dc_serial_write, /* write */
dc_serial_flush, /* flush */
@ -663,6 +665,42 @@ dc_serial_set_latency (dc_iostream_t *abstract, unsigned int milliseconds)
return DC_STATUS_SUCCESS;
}
static dc_status_t
dc_serial_poll (dc_iostream_t *abstract, int timeout)
{
dc_serial_t *device = (dc_serial_t *) abstract;
int rc = 0;
do {
fd_set fds;
FD_ZERO (&fds);
FD_SET (device->fd, &fds);
struct timeval tv, *ptv = NULL;
if (timeout > 0) {
tv.tv_sec = (timeout / 1000);
tv.tv_usec = (timeout % 1000) * 1000;
ptv = &tv;
} else if (timeout == 0) {
tv.tv_sec = 0;
tv.tv_usec = 0;
ptv = &tv;
}
rc = select (device->fd + 1, &fds, NULL, NULL, ptv);
} while (rc < 0 && errno == EINTR);
if (rc < 0) {
int errcode = errno;
SYSERROR (abstract->context, errcode);
return syserror (errcode);
} else if (rc == 0) {
return DC_STATUS_TIMEOUT;
} else {
return DC_STATUS_SUCCESS;
}
}
static dc_status_t
dc_serial_read (dc_iostream_t *abstract, void *data, size_t size, size_t *actual)
{

View File

@ -43,6 +43,7 @@ static dc_status_t dc_serial_set_rts (dc_iostream_t *iostream, unsigned int valu
static dc_status_t dc_serial_get_lines (dc_iostream_t *iostream, unsigned int *value);
static dc_status_t dc_serial_get_available (dc_iostream_t *iostream, size_t *value);
static dc_status_t dc_serial_configure (dc_iostream_t *iostream, unsigned int baudrate, unsigned int databits, dc_parity_t parity, dc_stopbits_t stopbits, dc_flowcontrol_t flowcontrol);
static dc_status_t dc_serial_poll (dc_iostream_t *iostream, int timeout);
static dc_status_t dc_serial_read (dc_iostream_t *iostream, void *data, size_t size, size_t *actual);
static dc_status_t dc_serial_write (dc_iostream_t *iostream, const void *data, size_t size, size_t *actual);
static dc_status_t dc_serial_flush (dc_iostream_t *iostream);
@ -75,6 +76,11 @@ typedef struct dc_serial_t {
*/
DCB dcb;
COMMTIMEOUTS timeouts;
HANDLE hReadWrite, hPoll;
OVERLAPPED overlapped;
DWORD events;
BOOL pending;
} dc_serial_t;
static const dc_iterator_vtable_t dc_serial_iterator_vtable = {
@ -93,6 +99,7 @@ static const dc_iostream_vtable_t dc_serial_vtable = {
dc_serial_get_lines, /* get_lines */
dc_serial_get_available, /* get_available */
dc_serial_configure, /* configure */
dc_serial_poll, /* poll */
dc_serial_read, /* read */
dc_serial_write, /* write */
dc_serial_flush, /* flush */
@ -282,18 +289,41 @@ dc_serial_open (dc_iostream_t **out, dc_context_t *context, const char *name)
return DC_STATUS_NOMEMORY;
}
// Default values.
memset(&device->overlapped, 0, sizeof(device->overlapped));
device->events = 0;
device->pending = FALSE;
// Create a manual reset event for I/O.
device->hReadWrite = CreateEvent (NULL, TRUE, FALSE, NULL);
if (device->hReadWrite == INVALID_HANDLE_VALUE) {
DWORD errcode = GetLastError ();
SYSERROR (context, errcode);
status = syserror (errcode);
goto error_free;
}
// Create a manual reset event for polling.
device->hPoll = CreateEvent (NULL, TRUE, FALSE, NULL);
if (device->hPoll == INVALID_HANDLE_VALUE) {
DWORD errcode = GetLastError ();
SYSERROR (context, errcode);
status = syserror (errcode);
goto error_free_readwrite;
}
// Open the device.
device->hFile = CreateFileA (devname,
GENERIC_READ | GENERIC_WRITE, 0,
NULL, // No security attributes.
OPEN_EXISTING,
0, // Non-overlapped I/O.
FILE_FLAG_OVERLAPPED,
NULL);
if (device->hFile == INVALID_HANDLE_VALUE) {
DWORD errcode = GetLastError ();
SYSERROR (context, errcode);
status = syserror (errcode);
goto error_free;
goto error_free_poll;
}
// Retrieve the current communication settings and timeouts,
@ -308,12 +338,24 @@ dc_serial_open (dc_iostream_t **out, dc_context_t *context, const char *name)
goto error_close;
}
// Enable event monitoring.
if (!SetCommMask (device->hFile, EV_RXCHAR)) {
DWORD errcode = GetLastError ();
SYSERROR (context, errcode);
status = syserror (errcode);
goto error_close;
}
*out = (dc_iostream_t *) device;
return DC_STATUS_SUCCESS;
error_close:
CloseHandle (device->hFile);
error_free_poll:
CloseHandle (device->hPoll);
error_free_readwrite:
CloseHandle (device->hReadWrite);
error_free:
dc_iostream_deallocate ((dc_iostream_t *) device);
return status;
@ -325,6 +367,9 @@ dc_serial_close (dc_iostream_t *abstract)
dc_status_t status = DC_STATUS_SUCCESS;
dc_serial_t *device = (dc_serial_t *) abstract;
// Disable event monitoring.
SetCommMask (device->hFile, 0);
// Restore the initial communication settings and timeouts.
if (!SetCommState (device->hFile, &device->dcb) ||
!SetCommTimeouts (device->hFile, &device->timeouts)) {
@ -340,6 +385,9 @@ dc_serial_close (dc_iostream_t *abstract)
dc_status_set_error(&status, syserror (errcode));
}
CloseHandle (device->hPoll);
CloseHandle (device->hReadWrite);
return status;
}
@ -502,6 +550,64 @@ dc_serial_set_latency (dc_iostream_t *abstract, unsigned int value)
return DC_STATUS_SUCCESS;
}
static dc_status_t
dc_serial_poll (dc_iostream_t *abstract, int timeout)
{
dc_serial_t *device = (dc_serial_t *) abstract;
while (1) {
COMSTAT stats;
if (!ClearCommError (device->hFile, NULL, &stats)) {
DWORD errcode = GetLastError ();
SYSERROR (abstract->context, errcode);
return syserror (errcode);
}
if (stats.cbInQue)
break;
if (!device->pending) {
memset(&device->overlapped, 0, sizeof(device->overlapped));
device->overlapped.hEvent = device->hPoll;
device->events = 0;
if (!WaitCommEvent (device->hFile, &device->events, &device->overlapped)) {
DWORD errcode = GetLastError ();
if (errcode != ERROR_IO_PENDING) {
SYSERROR (abstract->context, errcode);
return syserror (errcode);
}
device->pending = TRUE;
}
}
if (device->pending) {
DWORD errcode = 0;
DWORD rc = WaitForSingleObject (device->hPoll, timeout >= 0 ? (DWORD) timeout : INFINITE);
switch (rc) {
case WAIT_OBJECT_0:
break;
case WAIT_TIMEOUT:
return DC_STATUS_TIMEOUT;
default:
errcode = GetLastError ();
SYSERROR (abstract->context, errcode);
return syserror (errcode);
}
}
DWORD dummy = 0;
if (!GetOverlappedResult (device->hFile, &device->overlapped, &dummy, TRUE)) {
DWORD errcode = GetLastError ();
SYSERROR (abstract->context, errcode);
return syserror (errcode);
}
device->pending = FALSE;
}
return DC_STATUS_SUCCESS;
}
static dc_status_t
dc_serial_read (dc_iostream_t *abstract, void *data, size_t size, size_t *actual)
{
@ -509,7 +615,19 @@ dc_serial_read (dc_iostream_t *abstract, void *data, size_t size, size_t *actual
dc_serial_t *device = (dc_serial_t *) abstract;
DWORD dwRead = 0;
if (!ReadFile (device->hFile, data, size, &dwRead, NULL)) {
OVERLAPPED overlapped = {0};
overlapped.hEvent = device->hReadWrite;
if (!ReadFile (device->hFile, data, size, NULL, &overlapped)) {
DWORD errcode = GetLastError ();
if (errcode != ERROR_IO_PENDING) {
SYSERROR (abstract->context, errcode);
status = syserror (errcode);
goto out;
}
}
if (!GetOverlappedResult (device->hFile, &overlapped, &dwRead, TRUE)) {
DWORD errcode = GetLastError ();
SYSERROR (abstract->context, errcode);
status = syserror (errcode);
@ -534,7 +652,19 @@ dc_serial_write (dc_iostream_t *abstract, const void *data, size_t size, size_t
dc_serial_t *device = (dc_serial_t *) abstract;
DWORD dwWritten = 0;
if (!WriteFile (device->hFile, data, size, &dwWritten, NULL)) {
OVERLAPPED overlapped = {0};
overlapped.hEvent = device->hReadWrite;
if (!WriteFile (device->hFile, data, size, NULL, &overlapped)) {
DWORD errcode = GetLastError ();
if (errcode != ERROR_IO_PENDING) {
SYSERROR (abstract->context, errcode);
status = syserror (errcode);
goto out;
}
}
if (!GetOverlappedResult (device->hFile, &overlapped, &dwWritten, TRUE)) {
DWORD errcode = GetLastError ();
SYSERROR (abstract->context, errcode);
status = syserror (errcode);

View File

@ -186,6 +186,42 @@ dc_socket_get_available (dc_iostream_t *abstract, size_t *value)
return DC_STATUS_SUCCESS;
}
dc_status_t
dc_socket_poll (dc_iostream_t *abstract, int timeout)
{
dc_socket_t *socket = (dc_socket_t *) abstract;
int rc = 0;
do {
fd_set fds;
FD_ZERO (&fds);
FD_SET (socket->fd, &fds);
struct timeval tv, *ptv = NULL;
if (timeout > 0) {
tv.tv_sec = (timeout / 1000);
tv.tv_usec = (timeout % 1000) * 1000;
ptv = &tv;
} else if (timeout == 0) {
tv.tv_sec = 0;
tv.tv_usec = 0;
ptv = &tv;
}
rc = select (socket->fd + 1, &fds, NULL, NULL, ptv);
} while (rc < 0 && S_ERRNO == S_EINTR);
if (rc < 0) {
s_errcode_t errcode = S_ERRNO;
SYSERROR (abstract->context, errcode);
return dc_socket_syserror(errcode);
} else if (rc == 0) {
return DC_STATUS_TIMEOUT;
} else {
return DC_STATUS_SUCCESS;
}
}
dc_status_t
dc_socket_read (dc_iostream_t *abstract, void *data, size_t size, size_t *actual)
{

View File

@ -108,6 +108,9 @@ dc_socket_set_timeout (dc_iostream_t *iostream, int timeout);
dc_status_t
dc_socket_get_available (dc_iostream_t *iostream, size_t *value);
dc_status_t
dc_socket_poll (dc_iostream_t *iostream, int timeout);
dc_status_t
dc_socket_read (dc_iostream_t *iostream, void *data, size_t size, size_t *actual);

View File

@ -94,6 +94,7 @@ static dc_status_t dc_usbhid_iterator_next (dc_iterator_t *iterator, void *item)
static dc_status_t dc_usbhid_iterator_free (dc_iterator_t *iterator);
static dc_status_t dc_usbhid_set_timeout (dc_iostream_t *iostream, int timeout);
static dc_status_t dc_usbhid_poll (dc_iostream_t *iostream, int timeout);
static dc_status_t dc_usbhid_read (dc_iostream_t *iostream, void *data, size_t size, size_t *actual);
static dc_status_t dc_usbhid_write (dc_iostream_t *iostream, const void *data, size_t size, size_t *actual);
static dc_status_t dc_usbhid_close (dc_iostream_t *iostream);
@ -144,6 +145,7 @@ static const dc_iostream_vtable_t dc_usbhid_vtable = {
NULL, /* get_lines */
NULL, /* get_available */
NULL, /* configure */
dc_usbhid_poll, /* poll */
dc_usbhid_read, /* read */
dc_usbhid_write, /* write */
NULL, /* flush */
@ -685,6 +687,12 @@ dc_usbhid_set_timeout (dc_iostream_t *abstract, int timeout)
return DC_STATUS_SUCCESS;
}
static dc_status_t
dc_usbhid_poll (dc_iostream_t *abstract, int timeout)
{
return DC_STATUS_UNSUPPORTED;
}
static dc_status_t
dc_usbhid_read (dc_iostream_t *abstract, void *data, size_t size, size_t *actual)
{