PQ

NAME
SYNOPSIS
DESCRIPTION
EXAMPLES
DIAGNOSTICS
SEE ALSO
BUGS AND RESTRICTIONS

NAME

pq, pq_create, pq_open, pq_close, pq_insert, pqe_new, pqe_discard, pqe_insert, pq_cset, pq_ctimestamp, pq_sequence, pq_seqdel, pq_pagesize, pq_higwater, pq_suspend - LDM product queue inteface

SYNOPSIS

#include "pq.h"

int pq_create(const char *path, mode_t mode, int pflags, size_t align, off_t initialsz, size_t nproducts, pqueue **pqp);

int pq_open(const char *path, int pflags, pqueue **pqp);

int pq_close(pqueue *pq);

int pq_insert(pqueue *pq, const product *prod);

void pq_cset(pqueue *pq, const struct timeval *tvp);

void pq_ctimestamp(const pqueue *pq, struct timeval *tvp);

int pq_sequence(pqueue *pq, pq_match mt, const prod_class_t *class, pq_seqfunc *ifMatch, void *otherargs);

int pq_seqdel(pqueue *pq, pq_match mt, const prod_class_t *clss, size_t *extentp, time_t *timestampp);

int pq_pagesize(const pqueue *pq);

int pq_highwater(pqueue *pq, off_t *highwaterp, size_t *maxproductsp);

int pq_suspend(unsigned int maxsleep);

int pq_get_write_count(const char* path, unsigned* count);

int pq_clear_write_count(const char* path);

DESCRIPTION

The Product Queue interface, PQ, provides access to a persistent and sharable queue of LDM data products. This implementation is a database, a collection of LDM products keyed by the time a product was inserted in the database, by the product offset in the queue, by the product signature (to ensure uniqueness), and by the product size (for efficiently recycling memory).

By "shared" we mean the queue may be accessed by multiple processes. Contention control is handled below the level of this interface by use of POSIX fnctl() file region locking.

By "persistent" we mean the queue outlasts any particular process which might access it. The queue appears as an object in the file system. Products in the queue will not disappear due to a program or system crash.

The library maintains a cursor value for each queue and process which is simply a timestamp (struct timeval). This is used by the scanning functions pq_sequence() and pq_seqdel(). The value of the cursor is set or changed by calls to pq_cset(), pq_sequence(), or pq_seqdel(). It may be queried by a call to pq_ctimestamp().

The ldm product "signature" is used by the package to detect duplicate products. Attempts to insert products whose signature is already in the queue will fail.

Requests for storage when the queue is full will cause the library to delete the oldest products, combining adjacent deleted products when possible, until enough space is available.

All of the functions return ENOERR (0) if successful. If a system call made in the library fails, causing failure of one of these functions, the system errno value is returned. Most internal errors are mapped into system errors as well, particularly EINVAL for invalid argument and EIO for xdr encoding problems. The ulog(3) package is used for error reporting below the interface, logging descriptive information about problems at the point where they occur.

Routines

#include "pq.h"
cc -I/usr/local/ldm/include ... -L/usr/local/ldm/lib -lldm

int pq_create(const char *path, mode_t mode, int pflags, size_t align, off_t initialsz, size_t nproducts, pqueue **pqp);

Creates a new product queue and sets *pqp to reference it.

path is the pathname of the new product queue.

As the third parameter to the open() system call, the access permission bits of the file mode are set to the value of mode.

The pflags parameter may contain the bitwise OR of flag bits which alter the default behavior. These are defined in pq.h. When PQ_NOCLOBBER is set, pqcreate will return EEXIST when attempting to create a file which already exists. The default behavior is to silently overwrite any existing file. When PQ_READONLY is set, the file is opened read-only; the default is read-write. The PQ_NOMAP flag causes the library to use an i/o strategy which uses read(), write() and malloc(). The PQ_MAPRGNS flag causes the library to use an i/o strategy which uses mmap(), but maps regions as as they are needed and unmaps them when no longer needed. The default strategy is to map the whole file. Only one of PQ_NOMAP and PQ_MAPRGNS should be specified. When PQ_NOLOCK is set, locking is disabled. When PQ_PRIVATE is set and mmap() is being used, the mapping is MAP_PRIVATE instead of the default MAP_SHARED.

The align parameter sets the access alignment. Requests for storage in the queue are rounded up to a multiple of this value. A value of zero for this allows the library to set a suitable default.

The parameter initialsz is the amount of storage to initially allocate for data in the queue.

The parameter nproducts sets the initial product capacity of the queue. The size of the file created is a function of this and the initialsz parameter.

If the product queue is opened for writing, then the writer-count in the product queue is incremented and the product queue must be closed by pq_close in order to decrement the writer-count.

int pq_open(const char *path, int pflags, pqueue **pqp);

Opens an existing product queue and sets *pqp to reference it.

path is a pathname of the product queue.

The pflags parameter contains flags as described under pqcreate(), except that PQ_NOCLOBBER is meaningless in this context.

If the product queue is opened for writing, then the writer-count in the product queue is incremented and the product queue must be closed by pq_close in order to decrement the writer-count.

This function will not create a new product queue, but will instead return ENOENT when the queue does not exist. If the file exists but is not a product queue, the open fails and EINVAL is returned. If the product queue is inconsistent, then the open fails and PQ_CORRUPT is returned.

int pq_close(pqueue *pq);

Closes an open product queue pq and frees any associated resources.

If the product queue was opened for writing, then the writer-count in the product queue is decremented.

int pq_insert(pqueue *pq, const product *prod);

Inserts the LDM data product prod into the queue and sends SIGCONT to the process group. Calls to this function for products whose signature is already in the queue fail with an error indication of PQ_DUP.

int pqe_new(pqueue *pq, const prod_info *infop, size_t product_size, void **ptrp, pqe_index *indexp);

This function is used when a product is not yet assembled but we wish to allocate storage in the queue for its assembly, such as upon the receipt of a COMINGSOON remote procedure call in the server. It returns storage in *ptrp suitable for placing the data of the product described by infop and product_size. The value of *indexp should be retained for use in committing the product using pqe_insert() or abandoning it using pqe_discard(). Calls to this function for products whose signature is already in the queue fail with an error indication of PQ_DUP.

int pqe_discard(pqueue *pq, pqe_index index);

Abandon construction of a product which was begun using pqe_new(), freeing up any queue resources allocated to it.

int pqe_insert(pqueue *pq, pqe_index index);

Commit (insert) a completed product which was begun using pqe_new(), sending SIGCONT to the process group. The insertion timestamp of the product will be the time of the call to this function, not pqe_new().

void pq_cset(pqueue *pq, const struct timeval *tvp);

Sets the cursor value to *tvp. Cheap to call.

void pq_ctimestamp(const pqueue *pq, struct timeval *tvp);

Retrives the cursor value as *tvp. Cheap to call.

int pq_sequence(pqueue *pq, pq_match mt, const prod_class_t *class, pq_seqfunc *ifMatch, void *otherargs);

Step thru the queue in the direction specified by mt, relative to the current cursor value, and execute the user supplied function ifMatch if the product in next position "matches" the specification defined by clss. The queue cursor value is set to the insertion time of the product sampled. The interface file prod_class.h (included by pq.h) provides a definition of prod_class_t and pq.h provides a prototype for pq_seqfunc. The product class PQ_CLASS_ALL matches everything.

The mt argument controls the sequencing behavior.

If(mt == TV_LT), pq_sequence() will get a product whose queue insertion timestamp is strictly less than the current cursor value. If the cursor is not set, it is set to TV_ENDT early in the call, so that most recently inserted product in queue is tested. If multiple products have the same queue insertion timestamp, the user-supplied function will be applied to each of them on successive calls to pq_sequence().

If(mt == TV_GT), pq_sequence() will get a product whose queue insertion timestamp is strictly greater than the current cursor value. If the cursor is not set, it is set to TV_ZERO early in the call, so that oldest product in queue is tested.

If(mt == TV_EQ), pq_sequence() will get a product whose queue insertion timestamp is equal to the current cursor value. If the cursor is not set, you will fail an assertion and dump core.

If no product is in the inventory which which meets the above spec, pq_sequence() returns PQ_END.

This function waits for locks it needs.

Using this function is easier than it explaining or understanding it. See pqcat.c in the source distribution.

int pq_seqdel(pqueue *pq, pq_match mt, const prod_class_t *clss, size_t *extentp, time_t *timestampp);

Similar to pq_sequence(), except that the action is to delete any matching product. Upon return, *timestampp is set to creation time of the tested product. When deletion occurs, *extentp is set to the amount of storage made available.

This function does not wait on region lock. It may return EAGAIN or EACCESS.

int pq_pagesize(const pqueue *pq);

If pq is NULL, returns the system page size. Otherwise, returns the queue page size, typically the least common multiple of the system page size and the align parameter to pq_create(). This function might be used for optimizations such as the choice of an align value.

int pq_highwater(pqueue *pq, off_t *highwaterp, size_t *maxproductsp);

Upon success, returns the maximum data segment utilization in bytes as *highwaterp and maximum number of products held in the queue as *maxproductsp. These are suitable for use as the initialsz and nproducts parameters to pq_create(), respectively. Since this information is part of the shared queue state, calling this function is more expensive than one would hope.

int pq_suspend(unsigned int maxsleep);

This function is used in conjunction with the scanning function pq_sequence() when it reaches the end of queue. pq_suspend() is called and the program sleeps until either maxsleep() seconds have elapsed or the process receives the signal SIGCONT, presumably signaling that a product has been inserted by some other process.

int pq_get_write_count(const char* path, unsigned* count);

Returns the number of pq_open()s for writing outstanding on an existing product queue, i.e., this function returns the current value of the writer-counter. If a writing process terminates without calling pq_close(), then the actual number will be less than this number. This function opens the product-queue read-only, so if there are no outstanding product-queue writers, then the returned count will be zero.

path is the pathname of the product-queue. count is the memory to receive the number of writers.

On and only on success, this function returns 0 and *count is set to the number of writers. Other return values are EINVAL, which means that path is NULL or count is NULL; ENOSYS, which means that this function is not supported because the product-queue doesn’t have a writer-counter; PQ_CORRUPT, which means the product-queue is internally inconsistent; and any of the <errno.h> error-codes associated with opening and reading from a file.

int pq_clear_write_count(const char* path);

Sets to zero the number of pq_open()s for writing outstanding on the product-queue, i.e., this function clears the writer-counter in the product-queue. This is a dangerous function and should only be used when it is known that there are no outstanding pq_open()s for writing on the product-queue.

path is the pathname of the product-queue.

On and only on success, this function returns 0. Other return-values are EINVAL, which means that path is NULL; PQ_CORRUPT, which means that the product-queue is internally inconsistent; and any of the <errno.h> error-codes associated with opening and reading from a file.

EXAMPLES

There are numerous examples in ldm source code distribution.

DIAGNOSTICS

This package uses the ulog(3) library to print (hopefully) self-explanatory error-messages.

SEE ALSO

ldm(1), open(2), fnctl(2), mmap(2)

BUGS AND RESTRICTIONS

Chaos will result if time runs backwards.

On many systems, it is an error to mmap a file when that file is not on a local file system. In such a case, the package emits a message and reverts to the PQ_NOMAP strategy. Since locking over the network is _extremely_ slow, use of this feature is strongly discouraged.