Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
# Changelog

## Unreleased
- MongoDB: Improved `MongoDBCrateDBConverter.decode_canonical` to also
decode non-UUID binary values
- Zyp/Moksha/jq: The `to_object` function now respects a `zap` option that
removes the element altogether if it is empty
- Zyp/Moksha/jq: Improve error reporting at `MokshaTransformation.apply`
- MongoDB: Improved `MongoDBCrateDBConverter.decode_extended_json`

## 2024/09/22 v0.0.17
- MongoDB: Fixed edge case when decoding MongoDB Extended JSON elements
Expand Down
24 changes: 24 additions & 0 deletions doc/zyp/backlog.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,28 @@
- [ ] Documentation: Usage (build (API, from_yaml), apply)
- [ ] Documentation: How to extend `function.{jq,py}`

### Documentation
```
- Omit records `and .value.bill_contact.id != ""`
# Only accept `email` elements that are objects.
#and (if (.value | index("emails")) then (.value.emails[].type | type) == "object" else true end)

# Exclude a few specific documents.
# TODO: Review documents once more to discover more edge cases.
.[] |= select(
and ._id != "55d71c8ce4b02210dc47b10f"
)

# Some early `phone` elements have been stored wrongly,
# all others are of type OBJECT.
#and (.value.phone | type) != "array"

# Some early `urls` elements have been stored wrongly,
# all others are of type ARRAY.
#and (.value.urls | type) != "object"
```


## Iteration +2
- [ ] CLI interface
- [ ] Documentation: Add Python example to "Synopsis" section on /index.html
Expand Down Expand Up @@ -58,6 +80,8 @@ Demonstrate more use cases, like...
- https://github.com/meltano/sdk/blob/v0.39.1/singer_sdk/mapper.py
- [ ] Is `jqpy` better than `jq`?
- https://baterflyrity.github.io/jqpy/
- [ ] Load XML via Badgerfish or KDL
https://github.com/kdl-org/kdl

## Done
- [x] Refactor module namespace to `zyp`
Expand Down
104 changes: 75 additions & 29 deletions src/commons_codec/transform/mongodb.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
# Copyright (c) 2021-2024, Crate.io Inc.
# Distributed under the terms of the LGPLv3 license, see LICENSE.
# ruff: noqa: S608
import base64
import calendar
import datetime as dt
import logging
import typing as t
from functools import cached_property
from typing import Iterable

import bson
Expand All @@ -23,18 +25,6 @@
logger = logging.getLogger(__name__)


def date_converter(value):
    """
    Convert a date-like value into milliseconds since the Unix epoch.

    - `int` values are returned unchanged.
    - `datetime` values are converted via their UTC time tuple.
    - `str` and `bytes` values are parsed with `dateparser` first.

    Sub-second precision is discarded, because the conversion goes through
    `calendar.timegm`, which operates on whole seconds.

    :raises ValueError: If the value has an unsupported type, or if a textual
                        value cannot be parsed into a datetime.
    """
    if isinstance(value, int):
        return value

    if isinstance(value, dt.datetime):
        moment = value
    elif isinstance(value, (str, bytes)):
        # `dateparser.parse` is documented for `str` input, so decode bytes first.
        text = value.decode("utf-8") if isinstance(value, bytes) else value
        moment = dateparser.parse(text)
        # The parser signals failure by returning `None`. Report it the same way
        # as any other unconvertible value instead of failing on attribute access.
        if moment is None:
            raise ValueError(f"Unable to convert datetime value: {value}")
    else:
        raise ValueError(f"Unable to convert datetime value: {value}")

    return calendar.timegm(moment.utctimetuple()) * 1000


@define
class MongoDBCrateDBConverter:
"""
Expand All @@ -43,6 +33,9 @@ class MongoDBCrateDBConverter:
Extracted from cratedb-toolkit, earlier migr8.
"""

timestamp_to_epoch: bool = False
timestamp_to_iso8601: bool = False
timestamp_use_milliseconds: bool = False
transformation: t.Any = None

def decode_documents(self, data: t.Iterable[Document]) -> Iterable[Document]:
Expand Down Expand Up @@ -72,7 +65,7 @@ def decode_value(self, value: t.Any) -> t.Any:
if isinstance(value, dict):
# Decode item in BSON CANONICAL format.
if len(value) == 1 and next(iter(value)).startswith("$"):
return self.decode_canonical(value)
return self.decode_extended_json(value)

# Custom adjustments to compensate shape anomalies in source data.
# TODO: Review if it can be removed or refactored.
Expand Down Expand Up @@ -110,28 +103,81 @@ def decode_bson(item: t.Mapping[str, t.Any]) -> t.Mapping[str, t.Any]:
"""
return _json_convert(item)

@staticmethod
def decode_canonical(value: t.Dict[str, t.Any]) -> t.Any:
def decode_extended_json(self, value: t.Dict[str, t.Any]) -> t.Any:
"""
Decode MongoDB Extended JSON CANONICAL representation.
Decode MongoDB Extended JSON representation, canonical and legacy variants.
"""
type_ = list(value.keys())[0]

out: t.Any

# Special handling for datetime representation in NUMBERLONG format (emulated depth-first).
type_ = next(iter(value)) # Get key of first item in dictionary.
is_date_numberlong = type_ == "$date" and "$numberLong" in value["$date"]
if is_date_numberlong:
return int(object_hook(value["$date"]))
out = dt.datetime.fromtimestamp(int(value["$date"]["$numberLong"]) / 1000, tz=dt.timezone.utc)
else:
value = object_hook(value)
is_bson = type(value).__module__.startswith("bson")
if isinstance(value, bson.Binary) and value.subtype == bson.UUID_SUBTYPE:
value = value.as_uuid()
if isinstance(value, bson.Timestamp):
value = value.as_datetime()
if isinstance(value, dt.datetime):
return date_converter(value)
out = object_hook(value)

is_bson = isinstance(out, self.all_bson_types)

# Decode BSON types.
if isinstance(out, bson.Binary) and out.subtype == bson.UUID_SUBTYPE:
out = out.as_uuid()
elif isinstance(out, bson.Binary):
out = base64.b64encode(out).decode()
elif isinstance(out, bson.Timestamp):
out = out.as_datetime()

# Decode Python types.
if isinstance(out, dt.datetime):
if self.timestamp_to_epoch:
out = self.convert_epoch(out)
if self.timestamp_use_milliseconds:
out *= 1000
return out
elif self.timestamp_to_iso8601:
return self.convert_iso8601(out)

# Wrap up decoded BSON types as strings.
if is_bson:
return str(value)
return value
return str(out)

# Return others converted as-is.
return out

@cached_property
def all_bson_types(self) -> t.Tuple[t.Type, ...]:
    """
    Collect the types registered in `bson._ENCODERS` that carry a `_type_marker` attribute.

    The result is a tuple, so it can be handed to `isinstance` directly.
    It is computed once per instance, courtesy of `cached_property`.
    """
    return tuple(candidate for candidate in bson._ENCODERS if hasattr(candidate, "_type_marker"))

@staticmethod
def convert_epoch(value: t.Any) -> float:
if isinstance(value, int):
return value
elif isinstance(value, dt.datetime):
datetime = value
elif isinstance(value, (str, bytes)):
datetime = dateparser.parse(value)
else:
raise ValueError(f"Unable to convert datetime value: {value}")
return calendar.timegm(datetime.utctimetuple())

@staticmethod
def convert_iso8601(value: t.Any) -> str:
if isinstance(value, str):
return value
elif isinstance(value, dt.datetime):
datetime = value
elif isinstance(value, bytes):
return value.decode("utf-8")
elif isinstance(value, int):
datetime = dt.datetime.fromtimestamp(value, tz=dt.timezone.utc)
else:
raise ValueError(f"Unable to convert datetime value: {value}")
return datetime.isoformat()

def apply_special_treatments(self, value: t.Any):
"""
Expand Down Expand Up @@ -171,7 +217,7 @@ class MongoDBTranslatorBase:

def __init__(self, table_name: str, converter: t.Union[MongoDBCrateDBConverter, None] = None):
self.table_name = quote_relation_name(table_name)
self.converter = converter or MongoDBCrateDBConverter()
self.converter = converter or MongoDBCrateDBConverter(timestamp_to_epoch=True, timestamp_use_milliseconds=True)

@property
def sql_ddl(self):
Expand Down
14 changes: 12 additions & 2 deletions src/zyp/function.jq
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,21 @@ def to_array:

def to_object(options):
# Wrap element into object with given key if it isn't an object already.
# When option `zap: true`, remove element altogether because it is
# empty anyway, in the sense that it includes a single item with a
# null value.
if . | type == "object" then
.
else
{(options.key): .}
end;
if . then
{(options.key): .}
end
end
|
if options.zap then
values
end
;

def is_array_of_objects:
# Check if element is an array containing objects.
Expand Down
12 changes: 10 additions & 2 deletions src/zyp/model/moksha.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import collections
import logging
import typing as t

import jmespath
Expand All @@ -11,6 +12,8 @@
from zyp.model.bucket import ConverterBase, MokshaTransformer, TransonTemplate
from zyp.util.expression import compile_expression

logger = logging.getLogger(__name__)


@define
class MokshaRule:
Expand Down Expand Up @@ -69,7 +72,12 @@ def transon(self, expression: TransonTemplate) -> "MokshaTransformation":
self._add_rule(MokshaRule(type="transon", expression=expression))
return self

def apply(self, data: DictOrList) -> DictOrList:
def apply(self, data: t.Any) -> t.Any:
for rule in self._runtime_rules:
data = rule.evaluate(data)
try:
data = rule.evaluate(data)
except Exception:
logger.exception(f"Error evaluating rule: {rule}")
logger.debug(f"Error payload:\n{data}")
raise
return data
15 changes: 8 additions & 7 deletions tests/transform/mongodb/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
A few samples of MongoDB BSON / JSON structures.

Derived from:
https://github.com/mongodb/bson-ruby/tree/v5.0.1/spec/spec_tests/data/corpus
- https://github.com/mongodb/mongo-java-driver/tree/master/bson/src/test/resources/bson
- https://github.com/mongodb/bson-ruby/tree/v5.0.1/spec/spec_tests/data/corpus
"""
# ruff: noqa: ERA001

Expand Down Expand Up @@ -199,13 +200,13 @@
"list_uuid": [
# TODO: TypeError: Object of type bytes is not JSON serializable
# b's\xff\xd2dD\xb3Li\x90\xe8\xe7\xd1\xdf\xc05\xd4',
"b's\\xff\\xd2dD\\xb3Li\\x90\\xe8\\xe7\\xd1\\xdf\\xc05\\xd4'",
"b's\\xff\\xd2dD\\xb3Li\\x90\\xe8\\xe7\\xd1\\xdf\\xc05\\xd4'",
"b's\\xff\\xd2dD\\xb3Li\\x90\\xe8\\xe7\\xd1\\xdf\\xc05\\xd4'",
"c//SZESzTGmQ6OfR38A11A==",
"c//SZESzTGmQ6OfR38A11A==",
"c//SZESzTGmQ6OfR38A11A==",
"73ffc060-30b8-db47-2c20-8dfddbde3cdc",
"b's\\xff\\xc0`0\\xb8\\xdbG, \\x8d\\xfd\\xdb\\xde<\\xdc'",
"b's\\xff\\xc0`0\\xb8\\xdbG, \\x8d\\xfd\\xdb\\xde<\\xdc'",
"b's\\xff\\xc0`0\\xb8\\xdbG, \\x8d\\xfd\\xdb\\xde<\\xdc'",
"c//AYDC420csII3929483A==",
"c//AYDC420csII3929483A==",
"c//AYDC420csII3929483A==",
],
"maxkey": "MaxKey()",
"minkey": "MinKey()",
Expand Down
Loading