25 October 2022 by Pawel Leszczynski & Michael Robinson
How to get started with the new column lineage feature in Marquez.
We are excited to announce the addition of column-level lineage to Marquez with the release of 0.27.0. One of our most frequently requested new features, column-level lineage makes dataset column inputs and outputs available via the Marquez API. We’re pleased to be able to share this new feature now, and we welcome contributors to this important development in the project.
Our plans for the feature include support beyond the Spark integration as well as UI support for column lineage.
Simply put, column lineage is lineage data about columns. This means that in addition to emitting dataset inputs and outputs, the OpenLineage-Spark integration now emits column inputs and outputs. Thanks to this metadata, users can glean information about the input columns that were used to produce the columns of a dataset.
A major benefit of column lineage is the finer granularity of the data that one gets thanks to the deeper level of insight into a pipeline. For example, column lineage lets you track the usage of sensitive data, such customers’ personal information, by members of your organization. This capability is essential to meeting some requirements of regulatory bodies such as the GDPR, HIPAA, CCPA, BCBS and PCI, who have instituted requirements for data accuracy and integrity that compel companies and organizations to monitor their datasets and pipelines more closely than in the past.
The spec uses a new facet, ColumnLineageDatasetFacet
, to store column lineage. For each column of an output dataset, the facet relays a list of columns from the input datasets that were used to produce the column.
Here’s the new facet in the OpenLineage spec:
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "https://openlineage.io/spec/facets/1-0-1/ColumnLineageDatasetFacet.json",
"$defs": {
"ColumnLineageDatasetFacet": {
"allOf": [{
"$ref": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/DatasetFacet"
}, {
"type": "object",
"properties": {
"fields": {
"description": "Column level lineage that maps output fields into input fields used to evaluate them.",
"type": "object",
"additionalProperties": {
"type": "object",
"properties": {
"inputFields": {
"type": "array",
"items": {
"type": "object",
"properties": {
"namespace": {
"type": "string",
"description": "The input dataset namespace"
},
"name": {
"type": "string",
"description": "The input dataset name"
},
"field": {
"type": "string",
"description": "The input field"
}
},
"additionalProperties": true,
"required": [
"namespace", "name", "field"
]
}
},
"transformationDescription": {
"type": "string",
"description": "a string representation of the transformation applied"
},
"transformationType": {
"type": "string",
"description": "IDENTITY|MASKED reflects a clearly defined behavior. IDENTITY: exact same as input; MASKED: no original data available (like a hash of PII for example)"
}
},
"additionalProperties": true,
"required": ["inputFields"]
}
}
},
"additionalProperties": true,
"required": [
"fields"
]
}],
"type": "object"
}
},
"type": "object",
"properties": {
"columnLineage": {
"$ref": "#/$defs/ColumnLineageDatasetFacet"
}
}
}
As you can see above, two extra fields offer the ability to emit additional information: transformationDescription
and transformationType
. The transformationDescription
field emits a string describing the transformations of input columns that have produced an output column. The transformationType
field, a string field containing either IDENTITY
or MASKED
, indicates whether the column data is exactly the same as the input data or if no original data is available (as in the case of encrypted data).
Support for column lineage is currently limited to the Spark integration, which now detects column lineage out of the box. Also, the Marquez API now contains methods to retrieve column lineage in graph form.
The new API endpoint (at api/src/main/java/marquez/api/ColumnLineageResource.java
):
public class ColumnLineageResource extends BaseResource {
private static final String DEFAULT_DEPTH = "20";
public ColumnLineageResource(@NonNull final ServiceFactory serviceFactory) {
super(serviceFactory);
}
@Timed
@ResponseMetered
@ExceptionMetered
@GET
@Produces(APPLICATION_JSON)
public Response getLineage(
@QueryParam("nodeId") @NotNull NodeId nodeId,
@QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth,
@QueryParam("withDownstream") @DefaultValue("false") boolean withDownstream)
throws ExecutionException, InterruptedException {
return Response.ok(columnLineageService.lineage(nodeId, depth, withDownstream, Instant.now()))
.build();
}
}
A new workshop in the OpenLineage/workshops repository provides an easy way to try out the new feature in a Jupyter Notebook using Git, Docker, and Marquez.
What you’ll need:
What you’ll learn:
Support for column lineage is currently limited to the Spark integration, but we intend to expand the feature. Our plans include:
We would love to help others develop the column-level lineage features they need, and we welcome contributions to this ongoing effort at implementing column-level lineage in Marquez! If you have experience doing frontend development, the UI work might be a good place to start.
Does this sound fun? Check out our new contributor guide to get started.