datatask module

General-purpose data structure and representation format for tasks (stand-alone or part of a larger data workflow) that involve multiple data resources.

class datatask.datatask.datatask(argument: dict)

Bases: dict

Data structure that represents a data task. A data task consists of:

  • an optional collection of data resource name definitions (to allow for use of concise references within the input and output entries),

  • an optional list of input data resources (with optional schemas), and

  • a list of output data resources (with a schema required for each output); this list may be omitted when at least one input is specified.

The resource definitions attribute should be a map of names to individual file paths or URIs.

>>> dt = datatask({
...     "resources": {
...         "abc": "https://examples.org/abc.txt",
...         "xyz": "xyz.txt"
...     },
...     "inputs": {"abc": ["column"]},
...     "outputs": {"xyz": [{"abc": "column"}]}
... })
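To illustrate how resource names allow concise references, the following minimal sketch resolves each input and output reference against the resources map. It works on a plain dictionary with the same structure as the example above; the helper name resolve is hypothetical and is not part of the datatask API.

```python
def resolve(task: dict, reference: str) -> str:
    """Hypothetical helper (not part of the datatask API).

    Return the path or URI registered under a resource name, or the
    reference itself when it is not a defined resource name.
    """
    return task.get("resources", {}).get(reference, reference)

# Plain dictionary with the same structure as the example above.
task = {
    "resources": {
        "abc": "https://examples.org/abc.txt",
        "xyz": "xyz.txt"
    },
    "inputs": {"abc": ["column"]},
    "outputs": {"xyz": [{"abc": "column"}]}
}

resolved_inputs = [resolve(task, name) for name in task["inputs"]]
resolved_outputs = [resolve(task, name) for name in task["outputs"]]
print(resolved_inputs)   # ['https://examples.org/abc.txt']
print(resolved_outputs)  # ['xyz.txt']
```

A reference that is not a defined name (for example "other.csv") is returned unchanged, which mirrors the idea that entries may use a name, a path, or a URI.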

Any attempt to construct an instance with an invalid collection of resource definitions raises an exception.

>>> dt = {"resources": [], "outputs": {"xyz.txt": []}}
>>> datatask(dt)
Traceback (most recent call last):
  ...
TypeError: resources attribute value must be a dictionary that maps resource name strings to file path or URI strings
>>> datatask({"resources": {"abc": 123}, "outputs": {"xyz.txt": []}})
Traceback (most recent call last):
  ...
TypeError: each resource entry value must be a path or URI string

A valid instance must have at least one input entry or one output entry. Having only input entries is acceptable (a data task may be executed for its side effects), as is having only output entries (a data task may be executed that generates data without referencing any input data).

>>> datatask.from_json({})
Traceback (most recent call last):
  ...
ValueError: at least one input or output must be specified
>>> datatask.from_json({"inputs": {}, "outputs": {}})
Traceback (most recent call last):
  ...
ValueError: at least one input or output must be specified
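The rule above can be sketched as a simple predicate over a plain dictionary. The function name has_entries is hypothetical, and this is only an illustration of the rule as stated, not the check performed by the library.

```python
def has_entries(task: dict) -> bool:
    """Hypothetical check: True when some input or output entry is present."""
    # Missing keys and empty collections both count as "no entries".
    return bool(task.get("inputs")) or bool(task.get("outputs"))

print(has_entries({}))                             # False
print(has_entries({"inputs": {}, "outputs": {}}))  # False
print(has_entries({"inputs": {"abc.txt": {}}}))    # True
print(has_entries({"outputs": {"xyz.txt": []}}))   # True
```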

Input entries are specified using a dictionary that maps each input resource name, path, or URI either directly to its schema (a list of column names) or to an input specification dictionary. A specification dictionary can contain a schema definition (optional) and a boolean value indicating whether a header row is present (also optional). As the error messages below indicate, the inputs attribute may also be a plain list of resource names, paths, and/or URIs.

>>> dt = datatask({
...     "inputs": {"abc.txt": ["a", "b", "c"], "def.csv": {}},
...     "outputs": {"xyz.txt": []}
... })
>>> dt = datatask({
...     "inputs": {
...         "abc.txt": {
...             "schema": ["a", "b", "c"],
...             "header": False
...         },
...         "def.csv": {}
...     },
...     "outputs": {"xyz.txt": []}
... })
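The two examples above describe the same kind of entry in a short form and in a long form. As a rough sketch of how the forms relate, the hypothetical helper normalize_inputs below rewrites every entry as a specification dictionary. It operates on plain dictionaries and lists, and it assumes that a bare list of names (the form mentioned in the error messages) carries no schema information.

```python
def normalize_inputs(inputs):
    """Hypothetical helper: map every input to a specification dictionary."""
    if isinstance(inputs, list):
        # Assumption: a bare list of names carries no schema or header details.
        return {name: {} for name in inputs}
    normalized = {}
    for name, entry in inputs.items():
        if isinstance(entry, list):
            # A bare list is shorthand for a specification with only a schema.
            normalized[name] = {"schema": entry}
        else:
            normalized[name] = dict(entry)
    return normalized

short_form = {"abc.txt": ["a", "b", "c"], "def.csv": {}}
print(normalize_inputs(short_form))
# {'abc.txt': {'schema': ['a', 'b', 'c']}, 'def.csv': {}}
```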

Any attempt to construct an instance with an invalid collection of input entries raises an exception.

>>> dt = {"inputs": 123, "outputs": {"xyz.txt": []}}
>>> datatask(dt)
Traceback (most recent call last):
  ...
TypeError: inputs attribute must be a list of resource names, paths, and/or URIs, or a dictionary mapping resource names, paths, and/or URIs to their corresponding specifications or schemas
>>> dt = datatask({"inputs": [123]})
Traceback (most recent call last):
  ...
TypeError: each input resource name, path, or URI must be a string
>>> dt = {"inputs": {123: []}, "outputs": {"xyz.txt": []}}
>>> datatask(dt)
Traceback (most recent call last):
  ...
TypeError: each specified input must be a string corresponding to a defined resource name, a valid path, or a valid URI
>>> datatask({"inputs": {"abc.txt": 123}})
Traceback (most recent call last):
  ...
TypeError: input must be associated with a schema or specification
>>> datatask({"inputs": {"abc.txt": [123]}})
Traceback (most recent call last):
  ...
TypeError: input schema must be a list of strings
>>> dt = datatask({"inputs": {"abc.txt": {"header": 123}}})
Traceback (most recent call last):
  ...
TypeError: input header indicator must be a boolean value
>>> d = {"inputs": {"abc.txt": {"invalid_field": 123}}}
>>> dt = datatask(d)
Traceback (most recent call last):
  ...
ValueError: input specification can only contain a schema definition and/or a header indicator

Output entries are specified using a dictionary that maps each output resource name, path, or URI either directly to its schema or to an output specification dictionary that can contain a header indicator (optional) and a schema definition (required). The schema definition must be a list of dictionaries; no other constraints are enforced on the structure of a schema. A recommended approach is to use a single-entry dictionary such as {"abc.csv": "b"} to reference a named column "b" found in an input resource "abc.csv".

>>> dt = datatask({
...     "resources": {
...         "xyz": "xyz.csv"
...     },
...     "inputs": {"abc.csv": ["a", "b", "c"]},
...     "outputs": {"xyz": [{"abc.csv": "c"}, {"abc.csv": "b"}]}
... })
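Because the structure of a schema is not constrained, how it is interpreted is left to whatever executes the task. The sketch below shows one possible interpretation of the recommended single-entry form, in which each entry names a column of an input resource. The helper select_columns and the sample row are hypothetical.

```python
def select_columns(row, input_name, input_schema, output_schema):
    """Hypothetical helper: build one output row from one input row."""
    selected = []
    for entry in output_schema:
        # Assumption: each entry is a single-entry dictionary of the
        # form {input resource: column name}.
        ((source, column),) = entry.items()
        if source != input_name:
            raise KeyError(source)
        selected.append(row[input_schema.index(column)])
    return selected

# Plain dictionary with the same structure as the example above.
task = {
    "resources": {"xyz": "xyz.csv"},
    "inputs": {"abc.csv": ["a", "b", "c"]},
    "outputs": {"xyz": [{"abc.csv": "c"}, {"abc.csv": "b"}]}
}

row = ["1", "2", "3"]  # made-up values for columns a, b, c
result = select_columns(
    row, "abc.csv", task["inputs"]["abc.csv"], task["outputs"]["xyz"]
)
print(result)  # ['3', '2']
```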

The header indicator must be a boolean value, as the example and the error messages below show.

>>> dt = datatask({
...     "resources": {
...         "xyz": "xyz.csv"
...     },
...     "inputs": {"abc.csv": ["a", "b", "c"]},
...     "outputs": {
...         "xyz": {
...             "schema": [{"z": {"abc.csv": 2}}, {"y": {"abc.csv": 1}}],
...             "header": True
...         }
...     }
... })
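The library does not prescribe what the schema entries in this example mean. Purely as an illustration, the sketch below assumes that each entry has the form {output column name: {input resource: column index}} and that a true header indicator requests a first row of column names. The function render_output and the sample rows are hypothetical.

```python
import csv
import io

def render_output(rows, spec, input_name):
    """Hypothetical helper: render output rows as CSV text."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    # Assumption: each schema entry is a single-entry dictionary
    # {output column name: {input resource: column index}}.
    columns = [next(iter(entry.items())) for entry in spec["schema"]]
    if spec.get("header", False):
        writer.writerow([name for name, _ in columns])
    for row in rows:
        writer.writerow([row[reference[input_name]] for _, reference in columns])
    return buffer.getvalue()

spec = {
    "schema": [{"z": {"abc.csv": 2}}, {"y": {"abc.csv": 1}}],
    "header": True
}
rows = [["1", "2", "3"], ["4", "5", "6"]]  # made-up input rows
text = render_output(rows, spec, "abc.csv")
print(text)
# z,y
# 3,2
# 6,5
```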

Any attempt to construct an instance with an invalid collection of output entries raises an exception.

>>> datatask.from_json({"outputs": []})
Traceback (most recent call last):
  ...
ValueError: at least one input or output must be specified
>>> dt = datatask({"outputs": [123]})
Traceback (most recent call last):
  ...
TypeError: each output resource name, path, or URI must be a string
>>> datatask.from_json({"outputs": None})
Traceback (most recent call last):
  ...
TypeError: outputs attribute must be a dictionary mapping resource names, paths, and/or URIs to their corresponding specifications or schemas
>>> datatask.from_json({"outputs": {123: []}})
Traceback (most recent call last):
  ...
ValueError: each specified output must be a string corresponding to a defined resource name, a valid path, or a valid URI
>>> datatask.from_json({"outputs": {"xyz.txt": 123}})
Traceback (most recent call last):
  ...
TypeError: output must be associated with a schema or specification
>>> datatask.from_json({"outputs": {"xyz.txt": [1, 2, 3]}})
Traceback (most recent call last):
  ...
TypeError: output schema must be a list of dictionaries
>>> dt = datatask({
...     "outputs": {
...         "xyz.csv": {
...             "schema": [{"abc.csv": 2}, {"abc.csv": 1}],
...             "header": 123
...         }
...     }
... })
Traceback (most recent call last):
  ...
TypeError: output header indicator must be a boolean value
>>> d = {"outputs": {"xyz.csv": {"invalid_field": 123}}}
>>> dt = datatask(d)
Traceback (most recent call last):
  ...
ValueError: output specification can only contain a schema definition and a header definition

static __new__(cls, argument: dict) → datatask.datatask.datatask

Create class instance from a dictionary with a compatible structure.

>>> datatask({"inputs": {}, "outputs": {"xyz.txt":[]}})
{'inputs': {}, 'outputs': {'xyz.txt': []}}

static from_json(argument: Union[str, dict]) → datatask.datatask.datatask

Parse a dictionary or JSON string into an instance of this class.

>>> datatask.from_json('{"outputs": {"xyz.txt":[]}}')
{'outputs': {'xyz.txt': []}}
>>> datatask.from_json({"inputs": {}, "outputs": {"xyz.txt":[]}})
{'inputs': {}, 'outputs': {'xyz.txt': []}}

to_json(*args, **kwargs) → str

Convert an instance of this class into a JSON string.

>>> dt = datatask({"outputs": {"abc.text": []}})
>>> dt.to_json()
'{"outputs": {"abc.text": []}}'
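Since the class derives from dict, the same structure can also be round-tripped with the standard json module. The sketch below uses a plain dictionary in place of a datatask instance, so it only illustrates the JSON representation and does not exercise to_json or from_json themselves.

```python
import json

# Plain dictionary standing in for a datatask instance.
task = {"outputs": {"abc.text": []}}

text = json.dumps(task)
print(text)  # {"outputs": {"abc.text": []}}

# Parsing the string yields an equal dictionary.
restored = json.loads(text)
print(restored == task)  # True
```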

All arguments supplied to this method are passed directly to the internal invocation of json.dumps.

>>> print(dt.to_json(indent=2))
{
  "outputs": {
    "abc.text": []
  }
}

resources() → dict

Return a dictionary mapping resource names to resource paths or URIs for this instance.

>>> dt = datatask.from_json({
...     "inputs": {"abc.txt": {}},
...     "outputs": {"xyz.text": [{"abc.txt": 0}]}
... })
>>> dt.resources()
{}

inputs() → dict

Return dictionary of inputs for this instance.

>>> dt = datatask.from_json({
...     "inputs": {"abc.txt": {}},
...     "outputs": {"xyz.text": [{"abc.txt": 0}]}
... })
>>> dt.inputs()
{'abc.txt': {}}

outputs() → dict

Return dictionary of outputs for this instance.

>>> dt = datatask.from_json({
...     "inputs": {"abc.txt": {}},
...     "outputs": {"xyz.text": [{"abc.txt": 0}]}
... })
>>> dt.outputs()
{'xyz.text': [{'abc.txt': 0}]}
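
The three accessors above behave like lookups with an empty default. A minimal sketch of that pattern on a dict subclass follows. The class MiniTask is a hypothetical stand-in that performs no validation; it is not the library's implementation.

```python
class MiniTask(dict):
    """Hypothetical stand-in for a dict-based task; not the datatask class."""

    def resources(self) -> dict:
        # Assumption: a missing "resources" entry is reported as an
        # empty dictionary, as in the resources() example above.
        return self.get("resources", {})

    def inputs(self) -> dict:
        return self.get("inputs", {})

    def outputs(self) -> dict:
        return self.get("outputs", {})

mt = MiniTask({
    "inputs": {"abc.txt": {}},
    "outputs": {"xyz.text": [{"abc.txt": 0}]}
})
print(mt.resources())  # {}
print(mt.inputs())     # {'abc.txt': {}}
print(mt.outputs())    # {'xyz.text': [{'abc.txt': 0}]}
```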