From dbaf232694ebbbed1046786c46f4332e90d30144 Mon Sep 17 00:00:00 2001
From: Timm Fitschen <t.fitschen@indiscale.com>
Date: Wed, 7 Oct 2020 17:45:35 +0200
Subject: [PATCH] WIP:

---
 djaosdb/caosdb_client.py        |  93 +++---
 djaosdb/operations.py           |   5 +-
 djaosdb/sql2mongo/converters.py |  36 ++-
 djaosdb/sql2mongo/operators.py  |  30 +-
 djaosdb/sql2mongo/query.py      |  91 ++++--
 setup.py                        |   3 +-
 tests/test_new.py               | 530 ++++++++++++++++++++++++++++++++
 7 files changed, 702 insertions(+), 86 deletions(-)
 create mode 100644 tests/test_new.py

diff --git a/djaosdb/caosdb_client.py b/djaosdb/caosdb_client.py
index d323ba7..9c147ec 100644
--- a/djaosdb/caosdb_client.py
+++ b/djaosdb/caosdb_client.py
@@ -98,26 +98,37 @@ class DefaultCaosDBClientDelegate:
     def __init__(self, **kwargs):
         self.cached_record_types = CachedRecordTypes(self)
         self.cached_properties = CachedProperties(self)
-        self._connection = caosdb.configure_connection(**kwargs)
+        if "caosdb" in kwargs:
+            self._caosdb = kwargs["caosdb"]
+        else:
+            self._caosdb = caosdb
+
+        self._caosdb.configure_connection(**kwargs)
 
     def _get_filter_clause(self, fil):
         logger.debug("enter _get_filter_clause(%s)", fil)
-        if "$and" in fil:
-            components = [self._get_filter_clause(comps) for comps in fil["$and"]]
+        if not "type" in fil:
+            raise NotImplementedError("_get_filter_clause(%s)", fil)
+
+        filter_type = fil["type"]
+        if filter_type == "and":
+            components = [self._get_filter_clause(comps) for comps in
+                          fil["elements"]]
             return " AND".join(components)
-        if "reference" in fil:
-            ref = fil["reference"]
-            result = ref["str"]
+        if filter_type in ["reference", "back_reference"]:
+            ref = fil
+            result = " " + ref["str"]
             if "sub" in ref:
-                if not "$in" in ref["sub"]:
-                    raise NotImplementedError("_get_filter_clause(%s)", fil)
-                components = [" ID = " + str(val) for val in ref["sub"]["$in"]]
-                result += " WITH (" + " OR".join(components) + " )"
+                subquery = self._get_filter_clause(ref["sub"])
+                result += " WITH (" + subquery + ")"
             return result
-        if "p" in fil:
-            n = ""
-            if fil["negation"] is True:
-                n = "NOT "
+        if filter_type == "in":
+            p = fil["p"]
+            values = fil["v"]
+            components = [f" {p} = " + str(val) for val in values]
+            return " OR".join(components)
+        if filter_type == "pov":
+            n = "NOT " if fil["negation"] is True else ""
             p = fil["p"]
             o = fil["o"]
             v = fil["v"]
@@ -131,7 +142,7 @@ class DefaultCaosDBClientDelegate:
             filter_clause = " WITH " + self._get_filter_clause(fil)
 
         query = f'FIND RECORD "{record_type}"{filter_clause}'
-        return caosdb.execute_query(query)
+        return self._caosdb.execute_query(query)
 
     def find(self, record_type, *args, **kwargs):
         res = self._find(record_type, *args, **kwargs)
@@ -140,23 +151,23 @@ class DefaultCaosDBClientDelegate:
         return FindResult(rows, projection)
 
     def list_record_type_names(self):
-        res = caosdb.execute_query("SELECT name FROM RECORDTYPE")
+        res = self._caosdb.execute_query("SELECT name FROM RECORDTYPE")
         return [e.name for e in res if e.name is not None]
 
     def list_property_names(self):
-        res1 = caosdb.execute_query("SELECT name FROM PROPERTY")
-        res2 = caosdb.execute_query("SELECT name FROM RECORDTYPE")
+        res1 = self._caosdb.execute_query("SELECT name FROM PROPERTY")
+        res2 = self._caosdb.execute_query("SELECT name FROM RECORDTYPE")
         return [e.name for e in res1 + res2 if e.name is not None]
 
     def create_record_type(self, name : str, properties : list):
-        c = caosdb.Container()
-        rt = caosdb.RecordType(name)
+        c = self._caosdb.Container()
+        rt = self._caosdb.RecordType(name)
         c.append(rt)
         for p in properties:
             name = p["name"]
             datatype = p["datatype"]
             if name not in self.cached_properties:
-                new_property = caosdb.Property(name=name, datatype=datatype)
+                new_property = self._caosdb.Property(name=name, datatype=datatype)
                 c.append(new_property)
 
             rt.add_property(name=name, datatype=datatype)
@@ -164,7 +175,7 @@ class DefaultCaosDBClientDelegate:
         c.insert()
 
     def insert_many(self, record_type : str, records : list):
-        c = caosdb.Container()
+        c = self._caosdb.Container()
         property_names = self.list_property_names()
         for rec in records:
             name, description = None, None
@@ -173,7 +184,7 @@ class DefaultCaosDBClientDelegate:
             if "description" in rec:
                 description = rec["description"]
 
-            new_rec = caosdb.Record(name=name, description=description)
+            new_rec = self._caosdb.Record(name=name, description=description)
             new_rec.add_parent(record_type)
             if "properties" in rec:
                 for prop in rec["properties"]:
@@ -200,8 +211,8 @@ class DefaultCaosDBClientDelegate:
 
 
     def add_foreign_key(self, record_type, *args, **kwargs):
-        c = caosdb.Container()
-        rt = caosdb.RecordType(record_type).retrieve()
+        c = self._caosdb.Container()
+        rt = self._caosdb.RecordType(record_type).retrieve()
         c.append(rt)
         referenced = kwargs["target_record_type"]
         ref_prop_ok = len(kwargs["target_property"]) == 1
@@ -213,34 +224,32 @@ class DefaultCaosDBClientDelegate:
         rt.get_property(kwargs["property"][0][0]).datatype=referenced
         c.update()
 
-    def aggregate(self, record_type, *args, **kwargs):
-        kwargs.update(args[0])
+    def aggregate(self, record_type, **kwargs):
         return self._aggregate(record_type, **kwargs)
 
-    def _aggregate(self, record_type, reference=None, sort=None,
+    def _aggregate(self, record_type, sort=None,
                    projection=None, limit=None, filter=None, count=False):
+        query = self._generate_query(record_type, sort, projection, filter,
+                                     count)
+
+        logger.debug("execute_query(%s)", query)
+        res = self._caosdb.execute_query(query)
+        if count:
+            return CountResult(res, count)
+        rows = res.get_property_values(*projection)
+        return FindResult(rows, projection, sort, limit)
+
+    def _generate_query(self, record_type, sort, projection, filter, count):
         prefix = 'FIND'
         filter_clause = ""
-        if reference is not None:
-            fil = {"reference": reference}
-            filter_clause = " WHICH " + self._get_filter_clause(fil)
         if filter is not None:
-            if reference is not None:
-                filter_clause += " AND"
-            filter_clause += " WITH " + self._get_filter_clause(filter)
+            filter_clause += " WITH" + self._get_filter_clause(filter)
         if count:
             prefix = "COUNT"
         elif sort is not None:
             prefix = 'SELECT ' + ", ".join(list(projection) +
                                            list(sort.keys())) + " FROM"
-        query = f'{prefix} RECORD "{record_type}"{filter_clause}'
-
-        logger.debug("execute_query(%s)", query)
-        res = caosdb.execute_query(query)
-        if count:
-            return CountResult(res, count)
-        rows = res.get_property_values(*projection)
-        return FindResult(rows, projection, sort, limit)
+        return f'{prefix} RECORD "{record_type}"{filter_clause}'
 
 
 
diff --git a/djaosdb/operations.py b/djaosdb/operations.py
index 8dff9ee..7022e6d 100644
--- a/djaosdb/operations.py
+++ b/djaosdb/operations.py
@@ -2,6 +2,7 @@ import pytz
 from django.conf import settings
 from django.db.backends.base.operations import BaseDatabaseOperations
 from django.utils import timezone
+from dateutil.parser import isoparse
 import datetime
 import calendar
 
@@ -73,7 +74,9 @@ class DatabaseOperations(BaseDatabaseOperations):
         return value
 
     def convert_datetimefield_value(self, value, expression, connection):
-        if isinstance(value, datetime.datetime):
+        if isinstance(value, str):
+            value = isoparse(value)
+        if isinstance(value, datetime.datetime) and value.tzinfo is None:
             if settings.USE_TZ:
                 value = timezone.make_aware(value, self.connection.timezone)
         return value
diff --git a/djaosdb/sql2mongo/converters.py b/djaosdb/sql2mongo/converters.py
index f5c711c..e06c13f 100644
--- a/djaosdb/sql2mongo/converters.py
+++ b/djaosdb/sql2mongo/converters.py
@@ -124,7 +124,7 @@ class WhereConverter(Converter):
         )
 
     def to_mongo(self):
-        return {'filter': self.op.to_mongo()}
+        return self.op.to_mongo()
 
 
 class JoinConverter(Converter):
@@ -162,21 +162,23 @@ class JoinConverter(Converter):
 
     def _reference(self):
         if self.left_column == "id":
+            backref = f"{self.right_table} AS {self.right_column}"
             return {
-                "back_reference": {
+                "type": "back_reference",
                 "p": self.right_column,
                 "v": self.right_table,
                 "negation": False,
-                "str":
-                    f"IS REFERENCED BY {self.right_table} AS {self.right_column}"
-                }}
+                "str": f"IS REFERENCED BY {backref}"
+            }
         elif self.right_column == "id":
-            return { "reference": {
+            ref = f"{self.right_table} AS {self.left_column}"
+            return {
+                "type": "reference",
                 "p": self.left_column,
                 "v": self.right_table,
                 "negation": False,
-                "str": f"REFERENCES {self.right_table} AS {self.left_column}"
-            }}
+                "str": f"REFERENCES {ref}",
+            }
         return None
 
     def _lookup(self):
@@ -203,17 +205,18 @@ class InnerJoinConverter(JoinConverter):
         super().__init__(*args)
 
     def to_mongo(self):
-        if self.left_table == self.query.left_table:
-            match_field = self.left_column
-        else:
-            match_field = f'{self.left_table}.{self.left_column}'
-
         # try to construct a references/referenced-by filter
         reference = self._reference()
         if reference is not None:
             return reference
 
+        if self.left_table == self.query.left_table:
+            match_field = self.left_column
+        else:
+            match_field = f'{self.left_table}.{self.left_column}'
+
         lookup = self._lookup()
+
         pipeline = [
             {
                 '$match': {
@@ -246,6 +249,13 @@ class OuterJoinConverter(JoinConverter):
         return fields
 
     def to_mongo(self):
+        # try to construct a references/referenced-by filter
+        reference = self._reference()
+        if reference is not None:
+            # left outer join is equivalent to "REFERENCES ... OR NOT
+            # REFERENCES" - the interessting part is done via the select.
+            return {}
+
         lookup = self._lookup()
         null_fields = self._null_fields(self.right_table)
 
diff --git a/djaosdb/sql2mongo/operators.py b/djaosdb/sql2mongo/operators.py
index 80f0626..3f427df 100644
--- a/djaosdb/sql2mongo/operators.py
+++ b/djaosdb/sql2mongo/operators.py
@@ -118,7 +118,8 @@ class _InNotInOp(_BinaryOp):
             }
 
         else:
-            return {self._field: {op: self._in}}
+            return {"type": op[1:], "p": self._field,
+                    "v": self._in}
 
 
 class NotInOp(_InNotInOp):
@@ -317,12 +318,13 @@ class _AndOrOp(_Op):
 
     def to_mongo(self):
         if self.op_type() == AndOp:
-            oper = '$and'
+            oper = 'and'
         else:
-            oper = '$or'
+            oper = 'or'
 
         docs = [itm.to_mongo() for itm in self._acc]
-        return {oper: docs}
+        return {"type": oper,
+                "elements": docs}
 
 
 class AndOp(_AndOrOp):
@@ -522,12 +524,24 @@ class CmpOp(_Op):
         pass
 
     def to_mongo(self):
+
+        ref = None
         field = self._identifier.field
         if self._field_ext:
-            field += '.' + self._field_ext
-
-        return {"negation": self.is_negated, "p": field, "o":
-                self._operator, "v": self._constant}
+            ref = self._field_ext
+        elif self._identifier.query.left_table != self._identifier.table:
+            ref = self._identifier.table
+            field = self._identifier.column
+
+        result = {"type": "pov",
+                  "negation": self.is_negated,
+                  "p": field,
+                  "o": self._operator,
+                  "v": self._constant}
+        if ref:
+            result["ref"] = ref
+
+        return result
 
 
 OPERATOR_MAP = {
diff --git a/djaosdb/sql2mongo/query.py b/djaosdb/sql2mongo/query.py
index ada223c..75220cc 100644
--- a/djaosdb/sql2mongo/query.py
+++ b/djaosdb/sql2mongo/query.py
@@ -37,6 +37,56 @@ from .converters import (ColumnSelectConverter, AggColumnSelectConverter,
 from djaosdb import base
 logger = getLogger(__name__)
 
+def _conjunction_filters(*fils):
+
+    # remove None and empty things
+    _fils = [fil for fil in fils if fil]
+
+    if not _fils:
+        return {}
+    if len(_fils) == 1:
+        return _fils[0]
+    return {
+        "type": "and",
+        "elements": _fils,
+    }
+
+def _merge_joins_and_where(joins, where):
+    if not joins:
+        return joins, where
+
+    merged_joins = [join for join in joins
+                    if join and not "type" in join]
+    refs = [join for join in joins
+            if "type" in join and join["type"] in ["reference",
+                                                   "back_reference"]]
+
+    if not where:
+        return merged_joins, _conjunction_filters(*refs)
+
+    for ref in refs:
+        # look for subqueries
+        p = ref["p"]
+        if ref["type"] == "reference":
+            if "p" in where and where["p"] == p:
+                # no conjunction or disjuction
+                ref["sub"] = where
+                ref["sub"]["p"] = "ID"
+                where = None
+                break
+        elif ref["type"] == "back_reference":
+            v = ref["v"]
+            if ("ref" in where and where["ref"]):
+                # no conjunction or disjuction
+                ref["sub"] = where
+                where = None
+                break
+        else:
+            raise NotImplementedError("merging conjuction or disjuction "
+                                      "filters with joins")
+
+    merged_where = _conjunction_filters(where, *refs)
+    return merged_joins, merged_where
 
 class ReturnDocument(enum.Enum):
     AFTER = 1
@@ -130,7 +180,9 @@ class SelectQuery(DQLQuery):
         statement = SQLStatement(self.statement)
 
         for tok in statement:
-            if tok.match(tokens.DML, 'SELECT'):
+            if tok.is_whitespace:
+                continue
+            elif tok.match(tokens.DML, 'SELECT'):
                 self.selected_columns = ColumnSelectConverter(self, statement)
 
             elif tok.match(tokens.Keyword, 'FROM'):
@@ -163,7 +215,7 @@ class SelectQuery(DQLQuery):
                 self.where = WhereConverter(self, statement)
 
             else:
-                raise SQLDecodeError(f'Unknown keyword: {tok}')
+                raise SQLDecodeError(f'Unknown token {type(tok)}: "{tok}"')
 
     def __iter__(self):
 
@@ -237,35 +289,29 @@ class SelectQuery(DQLQuery):
             aggregation.update(select)
 
         ## merge join and where clause
-        if joins is not None and where is not None:
-            if len(joins) == 1 and "reference" in joins[0]:
-                ref = joins[0]["reference"]
-                p = ref["p"]
-                if p in where["filter"]:
-                    ref["sub"] = where["filter"][p]
-                    aggregation.update(joins[0])
-                    joins = None
-                    where = None
-        if joins is not None:
+        joins, where = _merge_joins_and_where(joins, where)
+        if joins:
             aggregation["joins"] = joins
-        if where is not None:
-            aggregation.update(where)
+        if where:
+            aggregation["filter"] = where
 
 
         return aggregation
 
+
+
     def _needs_column_selection(self):
         return not(self.distinct or self.groupby) and self.selected_columns
 
-    def _get_cursor(self):
+    def _to_caosdb(self):
+        """return the correct callback and the paramters"""
         if self._needs_aggregation():
             pipeline = self._make_pipeline()
-            cur = self.connection.aggregate(self.left_table, pipeline)
-            logger.debug(f'Aggregation query: {pipeline}')
+            return self.connection.aggregate, pipeline
         else:
             kwargs = {}
             if self.where:
-                kwargs.update(self.where.to_mongo())
+                kwargs["filter"] = self.where.to_mongo()
 
             if self.selected_columns:
                 kwargs.update(self.selected_columns.to_mongo())
@@ -279,8 +325,11 @@ class SelectQuery(DQLQuery):
             if self.offset:
                 kwargs.update(self.offset.to_mongo())
 
-            cur = self.connection.find(self.left_table, **kwargs)
-            logger.debug(f'Find query: {kwargs}')
+            return self.connection.find, kwargs
+
+    def _get_cursor(self):
+        callback, params = self._to_caosdb()
+        cur = callback(self.left_table, **params)
 
         return cur
 
@@ -346,7 +395,7 @@ class UpdateQuery(DMLQuery):
 
         self.kwargs = {}
         if self.where:
-            self.kwargs.update(self.where.to_mongo())
+            self.kwargs["filter"] = self.where.to_mongo()
 
         self.kwargs.update(self.set_columns.to_mongo())
 
diff --git a/setup.py b/setup.py
index ada86ff..78db8ac 100644
--- a/setup.py
+++ b/setup.py
@@ -81,9 +81,10 @@ def find_version(*file_paths):
 
 
 install_requires = [
-    'sqlparse>=0.2.4',
+    'sqlparse==0.2.4',
     'caosdb>=0.4.0',
     'django>=2.1',
+    'python-dateutil>=2.8.1',
 ]
 
 if sys.version_info.major == 3 and sys.version_info.minor < 7:
diff --git a/tests/test_new.py b/tests/test_new.py
new file mode 100644
index 0000000..4cb203e
--- /dev/null
+++ b/tests/test_new.py
@@ -0,0 +1,530 @@
+import datetime
+from pytest import raises
+from collections import OrderedDict
+from caosdb import Container as _Container, Record as _Record
+from sqlparse import parse
+from sqlparse.sql import Statement
+from djaosdb.base import DatabaseWrapper
+from djaosdb.caosdb_client import FindResult, CaosDBClient
+from djaosdb.exceptions import SQLDecodeError
+from djaosdb.sql2mongo.query import (Query, InsertQuery, SelectQuery,
+                                     _merge_joins_and_where)
+from djaosdb.sql2mongo.converters import (AggColumnSelectConverter,
+                                          InnerJoinConverter,
+                                          OuterJoinConverter)
+
+
+class _MockContainer(_Container):
+
+    def insert(self, *args, **kwargs):
+        pass
+
+class _MockConnection(CaosDBClient):
+
+
+    def __init__(self, cached_record_types=None, inserted_ids=None):
+        self.inserted_ids = inserted_ids
+        super().__init__(caosdb=self)
+        if cached_record_types is not None:
+            self.cached_record_types.update(cached_record_types)
+
+    def configure_connection(self, **kwargs):
+        return self
+
+    def execute_query(self, *args, **kwargs):
+        return _Container()
+
+    def Container(self):
+        return _MockContainer()
+
+    def Record(self, *args, **kwargs):
+        return _Record(*args, **kwargs)
+
+
+def test_sqlparse_insert():
+    sql = 'INSERT INTO "django_migrations" ("app", "name", "applied") VALUES (%(0)s, %(1)s, %(2)s,)'
+    statement = parse(sql)[0]
+    assert isinstance(statement, Statement)
+    assert statement.get_type() == "INSERT"
+
+    tokens = list(statement)
+    assert len(tokens) == 11
+
+
+def test_parse_insert():
+    connection = _MockConnection(cached_record_types=["django_migrations"],
+                                 inserted_ids=[99])
+    values = ['contenttypes',
+              '0001_initial',
+              datetime.datetime(2020, 9, 28, 8, 38, 19, 762282)]
+    sql = 'INSERT INTO "django_migrations" ("app", "name", "applied") VALUES (%(0)s, %(1)s, %(2)s,)'
+    q = Query(connection=connection,
+              sql=sql,
+              params=values)
+
+    assert isinstance(q._query, InsertQuery)
+    insert_query = q._query
+    assert insert_query.left_table == "django_migrations"
+    assert insert_query._cols == ["app", "name", "applied"]
+    assert insert_query._values == [values]
+
+def test_parse_select_where_and():
+    sql = 'SELECT "A"."a" FROM "A" WHERE "A"."a" = %(0)s AND "A"."b" = %(1)s'
+    cached_record_types = []
+    params = ["bla", "blub"]
+    connection = _MockConnection(cached_record_types=cached_record_types)
+    q = Query(connection, sql=sql, params=params)
+
+    select_query = q._query
+    assert isinstance(select_query, SelectQuery)
+    assert select_query._needs_aggregation() is False
+    callback, params = q._query._to_caosdb()
+    assert callback == connection.find
+    assert params == {
+        "filter": {
+            "type": "and",
+            "elements": [
+                {'negation': False, 'o': '=', 'p': 'a',
+                 'type': 'pov', 'v': 'bla'},
+                {'negation': False, 'o': '=', 'p': 'b',
+                 'type': 'pov', 'v': 'blub'}]
+            },
+        "projection": ["a"]
+
+    }
+
+def test_merge_joins_and_where_no_where_one_join():
+    joins = [{
+        "type": "reference",
+        "p": "p1",
+        "v": "rt1",
+        "negation": False,
+        "str": "REFERENCES rt1 AS p1",
+    }]
+    where = None
+
+    merged_joins, merged_where = _merge_joins_and_where(joins, where)
+    assert merged_joins == []
+    assert merged_where == {
+        "type": "reference",
+        "p": "p1",
+        "v": "rt1",
+        "negation": False,
+        "str": "REFERENCES rt1 AS p1",
+    }
+
+def test_merge_joins_and_where_no_where_two_join():
+    joins = [{
+        "type": "reference",
+        "p": "p1",
+        "v": "rt1",
+        "negation": False,
+        "str": "REFERENCES rt1 AS p1",
+    },{
+        "type": "reference",
+        "p": "p2",
+        "v": "rt2",
+        "negation": False,
+        "str": "REFERENCES rt2 AS p2",
+    }]
+    where = None
+
+    merged_joins, merged_where = _merge_joins_and_where(joins, where)
+    assert merged_joins == []
+    assert merged_where == {
+        "type": "and",
+        "elements": [
+            {
+                "type": "reference",
+                "p": "p1",
+                "v": "rt1",
+                "negation": False,
+                "str": "REFERENCES rt1 AS p1",
+            },{
+                "type": "reference",
+                "p": "p2",
+                "v": "rt2",
+                "negation": False,
+                "str": "REFERENCES rt2 AS p2",
+            }
+        ]
+    }
+
+def test_merge_joins_and_where_independent_where():
+    joins = [{
+        "type": "reference",
+        "p": "p1",
+        "v": "rt1",
+        "negation": False,
+        "str": "REFERENCES rt1 AS p1",
+    }]
+    where = {
+        "type": "pov",
+        "p": "p2",
+        "o": "=",
+        "v":  "v2",
+        "negation": False,
+    }
+
+    merged_joins, merged_where = _merge_joins_and_where(joins, where)
+    assert merged_joins == []
+    assert merged_where == {
+        "type": "and",
+        "elements": [
+            {
+                "type": "pov",
+                "p": "p2",
+                "o": "=",
+                "v":  "v2",
+                "negation": False,
+            }, {
+                "type": "reference",
+                "p": "p1",
+                "v": "rt1",
+                "negation": False,
+                "str": "REFERENCES rt1 AS p1",
+            }
+        ]
+    }
+
+def test_merge_joins_and_where_backref_sub_query_id():
+    joins = [{
+        "type": "back_reference",
+        "p": "p1",
+        "v": "rt1",
+        "negation": False,
+        "str": "IS REFERENCED BY rt1 AS p1",
+    }]
+    where = {
+        "type": "pov",
+        "ref": "rt1",
+        "p": "p2",
+        "o": "=",
+        "v": "935",
+        "negation": False,
+    }
+
+    merged_joins, merged_where = _merge_joins_and_where(joins, where)
+    assert merged_joins == []
+    print(merged_where)
+    assert merged_where == {
+        "type": "back_reference",
+        "p": "p1",
+        "v": "rt1",
+        "negation": False,
+        "str": "IS REFERENCED BY rt1 AS p1",
+        "sub": {
+            "type": "pov",
+            "ref": "rt1",
+            "p": "p2",
+            "o": "=",
+            "v": "935",
+            "negation": False,
+        }
+    }
+
+
+
+def test_merge_joins_and_where_ref_sub_query_id():
+    joins = [{
+        "type": "reference",
+        "p": "p1",
+        "v": "rt1",
+        "negation": False,
+        "str": "REFERENCES rt1 AS p1",
+    }]
+    where = {
+        "type": "in",
+        "p": "p1",
+        "v":  ["935"],
+    }
+
+    merged_joins, merged_where = _merge_joins_and_where(joins, where)
+    assert merged_joins == []
+    assert merged_where == {
+        "type": "reference",
+        "p": "p1",
+        "v": "rt1",
+        "negation": False,
+        "str": "REFERENCES rt1 AS p1",
+        "sub": {
+            "type": "in",
+            "p": "ID",
+            "v": ["935"],
+        }
+    }
+
+
+def test_parse_select_join():
+    sql = 'SELECT "auth_permission"."content_type_id", "auth_permission"."codename" FROM "auth_permission" INNER JOIN "django_content_type" ON ("auth_permission"."content_type_id" = "django_content_type"."id") WHERE "auth_permission"."content_type_id" IN (%(0)s) ORDER BY "django_content_type"."app_label" ASC, "django_content_type"."model" ASC, "auth_permission"."codename" ASC'
+    connection = _MockConnection(cached_record_types=["auth_permission",
+                                                      "django_content_type"])
+    values = ['935']
+
+    q = Query(connection=connection,
+              sql=sql,
+              params=values)
+    assert isinstance(q._query, SelectQuery)
+    select_query = q._query
+    assert select_query.left_table == "auth_permission"
+
+    assert select_query.where is not None
+    assert select_query.where.to_mongo() == {
+        "type": "in",
+        "p": "content_type_id",
+        "v":  ["935"],
+    }
+
+    assert len(select_query.joins) == 1
+    assert select_query.joins[0].to_mongo() == {
+        "type": "reference",
+        "p": "content_type_id",
+        "v": "django_content_type",
+        "negation": False,
+        "str": "REFERENCES django_content_type AS content_type_id",
+    }
+
+    assert select_query._needs_aggregation() is True
+    pipeline = select_query._make_pipeline()
+    assert "filter" in pipeline
+    ref = pipeline["filter"]
+    assert ref == {
+        "type": "reference",
+        "p": "content_type_id",
+        "v": "django_content_type",
+        "str": "REFERENCES django_content_type AS content_type_id",
+        "negation": False,
+        "sub": {"type": "in",
+                "p": "ID",
+                "v": ["935"]}}
+    assert "sort" in pipeline
+    sort = pipeline["sort"]
+    assert sort == OrderedDict([("django_content_type.app_label", 1),
+                                ("django_content_type.model", 1),
+                                ("codename", 1)])
+
+    assert pipeline["projection"] == ['content_type_id', 'codename']
+
+
+def test_parse_select_limit():
+    sql = 'SELECT (1) AS "a" FROM "django_session" WHERE "django_session"."session_key" = %(0)s LIMIT 1'
+    values = ['qrkt0tcs7a4vub3lw0m4g1wvr3u67dz4']
+    connection = _MockConnection(cached_record_types=["django_session", ])
+    q = Query(connection=connection,
+              sql=sql,
+              params=values)
+    assert isinstance(q._query, SelectQuery)
+    select_query = q._query
+    assert select_query.left_table == "django_session"
+
+    assert select_query.where is not None
+    assert select_query.where.to_mongo() == {
+        "type": "pov",
+        "p": "session_key",
+        "o": "=",
+        "v": values[0],
+        "negation": False,
+    }
+
+    assert len(select_query.joins) == 0
+    assert select_query._needs_aggregation() is True
+    pipeline = select_query._make_pipeline()
+
+
+def test_count_query():
+    sql = 'SELECT COUNT(*) AS "__count" FROM "Model"'
+    connection = _MockConnection(cached_record_types=["Model", ])
+    q = Query(connection=connection,
+              sql=sql,
+              params=[])
+
+    assert isinstance(q._query, SelectQuery)
+    select_query = q._query
+    assert select_query.left_table == "Model"
+
+    assert select_query.where is None
+    assert len(select_query.joins) == 0
+
+    assert select_query._needs_aggregation() is True
+    assert select_query._needs_column_selection()
+    select_query.selected_columns.__class__ = AggColumnSelectConverter
+    assert select_query.selected_columns.to_mongo() == {"count": "__count"}
+    pipeline = select_query._make_pipeline()
+    assert pipeline == {"count": "__count"}
+
+
+def test_multiple_join_conditions():
+    """djaosdb does not support complext ON clauses."""
+
+    sql = """
+    SELECT * FROM A
+    INNER JOIN B
+        ON ("A"."b_id" = "B"."id" AND "A"."b_id2" = "B"."id" )
+    """
+    cached_record_types = []
+    params = []
+    connection = _MockConnection(cached_record_types=cached_record_types)
+
+    with raises(SQLDecodeError):
+        Query(connection=connection, sql=sql, params=params)
+
+
+def test_inner_and_outer_join():
+    sql = """
+    SELECT "A"."id", "A"."p1",
+        "B"."id", "B"."p2",
+        "C"."id", "C"."p3"
+    FROM "A"
+    INNER JOIN "B"
+        ON ("A"."b_id" = "B"."id")
+    LEFT OUTER JOIN "C"
+        ON ("A"."c_id" = "C"."id")
+    WHERE "A"."b_id" = %(0)s
+    ORDER BY "A"."p1" DESC
+    LIMIT 10"""
+    cached_record_types = [
+        "A",
+        "B",
+        "C",
+    ]
+    params = ["195"]
+    connection = _MockConnection(cached_record_types=cached_record_types)
+    q = Query(connection=connection, sql=sql, params=params)
+
+    assert isinstance(q._query, SelectQuery)
+    select_query = q._query
+    assert select_query.left_table == "A"
+
+    assert select_query.where is not None
+    assert select_query.where.to_mongo() == {
+        "type": "pov", 'negation': False,
+        'o': '=', 'p': 'b_id', 'v': '195'}
+
+    assert select_query._needs_aggregation() is True
+
+    assert len(select_query.joins) == 2
+
+    inner_join = select_query.joins[0]
+    left_outer_join = select_query.joins[1]
+
+    assert isinstance(inner_join, InnerJoinConverter)
+    assert isinstance(left_outer_join, OuterJoinConverter)
+
+    assert inner_join.to_mongo() == {
+        "type": "reference",
+        'negation': False, 'p': 'b_id',
+        'str': 'REFERENCES B AS b_id',
+        'v': 'B'}
+
+    assert left_outer_join.to_mongo() == {}
+
+    callback, pipeline = select_query._to_caosdb()
+    assert callback == connection.aggregate
+    assert "joins" not in pipeline
+    assert pipeline["filter"] == {
+        "type": "reference",
+        'negation': False,
+        'p': 'b_id',
+        'str': 'REFERENCES B AS b_id',
+        'v': 'B',
+        "sub": {
+            "type": "pov",
+            'negation': False,
+            'p': 'ID',
+            'o': '=',
+            'v': '195'
+        },
+    }
+
+    assert pipeline["sort"] == OrderedDict([("p1", -1)])
+    assert pipeline["projection"] == ["id", "p1", "B.id", "B.p2", "C.id",
+                                      "C.p3"]
+    # excute
+    select_query._get_cursor()
+
+def test_query_generation_conjuction():
+    connection = _MockConnection()
+    query = connection._delegate._generate_query(
+        record_type = 'auth_permission',
+        sort = OrderedDict([('django_content_type.app_label', 1),
+                            ('django_content_type.model', 1),
+                            ('codename', 1)]),
+        projection = ['id'],
+        filter = {'type': 'and', 'elements': [
+            {'type': 'pov', 'negation': False,
+             'p': 'auth_group_permissions.group_id',
+             'o': '=', 'v': 226
+            }, {'type': 'reference', 'p': 'content_type_id',
+                'v': 'django_content_type', 'negation': False,
+                'str': 'REFERENCES django_content_type AS content_type_id'}]},
+        count=False)
+    assert query == ('SELECT id, django_content_type.app_label, '
+                     'django_content_type.model, codename FROM RECORD '
+                     '"auth_permission" WITH auth_group_permissions.'
+                     'group_id="226" AND REFERENCES django_content_type '
+                     'AS content_type_id')
+
+def test_parse_select_join_with_reverse_on_clause():
+    sql = """
+    SELECT "A"."id", "A"."p1"
+    FROM "A"
+    INNER JOIN "B"
+        ON ("A"."id" = "B"."a_id")
+    WHERE "B"."p2" = %(0)s
+    """
+    cached_record_types = [
+        "A",
+        "B",
+    ]
+    params = ["227"]
+    connection = _MockConnection(cached_record_types=cached_record_types)
+    q = Query(connection=connection, sql=sql, params=params)
+
+    assert isinstance(q._query, SelectQuery)
+    select_query = q._query
+    assert select_query.left_table == "A"
+
+    assert select_query.where is not None
+    assert select_query.where.to_mongo() == {
+        "type": "pov", 'negation': False,
+        'o': '=', 'p': 'p2', 'ref': "B", 'v': '227'}
+
+    assert select_query._needs_aggregation() is True
+
+    assert len(select_query.joins) == 1
+
+    inner_join = select_query.joins[0]
+
+    assert isinstance(inner_join, InnerJoinConverter)
+
+    assert inner_join.to_mongo() == {
+        "type": "back_reference",
+        'negation': False, 'p': 'a_id',
+        'str': 'IS REFERENCED BY B AS a_id',
+        'v': 'B'}
+
+    callback, pipeline = select_query._to_caosdb()
+    assert callback == connection.aggregate
+    assert "joins" not in pipeline
+    print(pipeline["filter"])
+    assert pipeline["filter"] == {
+        "type": "back_reference",
+        'negation': False,
+        'p': 'a_id',
+        'str': 'IS REFERENCED BY B AS a_id',
+        'v': 'B',
+        "sub": {
+            "type": "pov",
+            'negation': False,
+            'ref': "B",
+            'p': 'p2',
+            'o': '=',
+            'v': '227'
+        },
+    }
+
+    assert "sort" not in pipeline
+    assert pipeline["projection"] == ["id", "p1"]
+
+    # excute
+    select_query._get_cursor()
-- 
GitLab