Skip to content
This repository was archived by the owner on Aug 19, 2024. It is now read-only.
This repository was archived by the owner on Aug 19, 2024. It is now read-only.

SQLAlchemy 2.x upgrade #20

@mmssix

Description

@mmssix

I know this isnt the way it should be done, but this project is truly dead otherwise. So to make it easy to find for others that are in need, this is the library upgraded to use SQLAlchemy2.

I dont know if its the correct way to code this or not, but it does seem to work.

from collections import defaultdict, namedtuple
import re
import inspect


BOOLEAN_FIELDS = (
    "search.regex", "searchable", "orderable", "regex"
)


DataColumn = namedtuple("DataColumn", ("name", "model_name", "filter"))


class DataTablesError(ValueError):
    pass


class DataTable(object):
    def __init__(self, params, model, query, columns):
        self.params = params
        self.model = model
        self.query = query
        self.data = {}
        self.columns = []
        self.columns_dict = {}
        self.search_func = lambda qs, s: qs
        self.column_search_func = lambda mc, qs, s: qs

        for col in columns:
            name, model_name, filter_func = None, None, None

            if isinstance(col, DataColumn):
                self.columns.append(col)
                continue
            elif isinstance(col, tuple):
                # col is either 1. (name, model_name), 2. (name, filter) or 3. (name, model_name, filter)
                if len(col) == 3:
                    name, model_name, filter_func = col
                elif len(col) == 2:
                    # Work out the second argument. If it is a function then it's type 2, else it is type 1.
                    if callable(col[1]):
                        name, filter_func = col
                        model_name = name
                    else:
                        name, model_name = col
                else:
                    raise ValueError("Columns must be a tuple of 2 to 3 elements")
            else:
                # It's just a string
                name, model_name = col, col

            d = DataColumn(name=name, model_name=model_name, filter=filter_func)
            self.columns.append(d)
            self.columns_dict[d.name] = d

        for column in (col for col in self.columns if "." in col.model_name):
            parent_table = getattr(self.model, column.model_name.split(".")[0])
            self.query = self.query.join(parent_table)

    def query_into_dict(self, key_start):
        returner = defaultdict(dict)

        # Matches columns[number][key] with an [optional_value] on the end
        pattern = "{}(?:\[(\d+)\])?\[(\w+)\](?:\[(\w+)\])?".format(key_start)

        columns = (param for param in self.params if re.match(pattern, param))

        for param in columns:

            column_id, key, optional_subkey = re.search(pattern, param).groups()

            if column_id is None:
                returner[key] = self.coerce_value(key, self.params[param])
            elif optional_subkey is None:
                returner[int(column_id)][key] = self.coerce_value(key, self.params[param])
            else:
                # Oh baby a triple
                subdict = returner[int(column_id)].setdefault(key, {})
                subdict[optional_subkey] = self.coerce_value("{}.{}".format(key, optional_subkey),
                                                             self.params[param])

        return dict(returner)

    @staticmethod
    def coerce_value(key, value):
        try:
            return int(value)
        except ValueError:
            if key in BOOLEAN_FIELDS:
                return value == "true"

        return value

    def get_integer_param(self, param_name):
        if param_name not in self.params:
            raise DataTablesError("Parameter {} is missing".format(param_name))

        try:
            return int(self.params[param_name])
        except ValueError:
            raise DataTablesError("Parameter {} is invalid".format(param_name))

    def add_data(self, **kwargs):
        self.data.update(**kwargs)

    def json(self):
        try:
            return self._json()
        except DataTablesError as e:
            return {
                "error": str(e)
            }

    def get_column(self, column):
        if "." in column.model_name:
            column_path = column.model_name.split(".")
            relationship = getattr(self.model, column_path[0])
            model_column = getattr(relationship.property.mapper.entity, column_path[1])
        else:
            model_column = getattr(self.model, column.model_name)

        return model_column

    def searchable(self, func):
        self.search_func = func

    def searchable_column(self, func):
        self.column_search_func = func

    def _json(self):
        draw = self.get_integer_param("draw")
        start = self.get_integer_param("start")
        length = self.get_integer_param("length")

        columns = self.query_into_dict("columns")
        ordering = self.query_into_dict("order")
        search = self.query_into_dict("search")

        query = self.query
        total_records = query.count()

        if callable(self.search_func) and search.get("value", None):
            query = self.search_func(query, search["value"])

        for column_data in columns.values():
            search_value = column_data["search"]["value"]
            if (
                not column_data["searchable"]
                or not search_value
                or not callable(self.column_search_func)
            ):
                continue

            column_name = column_data["data"]
            column = self.columns_dict[column_name]

            model_column = self.get_column(column)

            query = self.column_search_func(model_column, query, str(search_value))

        for order in ordering.values():
            direction, column = order["dir"], order["column"]

            if column not in columns:
                raise DataTablesError("Cannot order {}: column not found".format(column))

            if not columns[column]["orderable"]:
                continue

            column_name = columns[column]["data"]
            column = self.columns_dict[column_name]

            model_column = self.get_column(column)

            if isinstance(model_column, property):
                raise DataTablesError("Cannot order by column {} as it is a property".format(column.model_name))

            query = query.order_by(model_column.desc() if direction == "desc" else model_column.asc())

        filtered_records = query.count()

        if length > 0:
            query = query.slice(start, start + length)

        return {
            "draw": draw,
            "recordsTotal": total_records,
            "recordsFiltered": filtered_records,
            "data": [
                self.output_instance(instance) for instance in query.all()
            ]
        }

    def output_instance(self, instance):
        returner = {
            key.name: self.get_value(key, instance) for key in self.columns
        }

        if self.data:
            returner["DT_RowData"] = {
                k: v(instance) for k, v in self.data.items()
            }

        return returner

    def get_value(self, key, instance):
        attr = key.model_name
        if "." in attr:
            tmp_list = attr.split(".")
            attr = tmp_list[-1]
            for sub in tmp_list[:-1]:
                instance = getattr(instance, sub)

        if key.filter is not None:
            r = key.filter(instance)

        else:
            r = getattr(instance, attr)
            try:
                if not inspect.isbuiltin(r):
                    attributes = vars(r)
                    values = {}
                    for attribute in attributes.keys():
                        if attribute != '_sa_instance_state':
                            values[attribute] = getattr(r, attribute)
                    r = values
            except Exception as e:
                pass

        return r() if inspect.isroutine(r) else r

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions