# Select Git revision
# test_converters.py
# Code owners
# Assign users and groups as approvers for specific file changes. Learn more.
# test_new.py 28.07 KiB
import datetime
from pytest import raises
from collections import OrderedDict
from caosdb import Container as _Container, Record as _Record
from sqlparse import parse
from sqlparse.sql import Statement
from djaosdb.base import DatabaseWrapper
from djaosdb.caosdb_client import FindResult, CaosDBClient
from djaosdb.exceptions import SQLDecodeError
from djaosdb.sql2mongo.query import (Query, InsertQuery, SelectQuery,
_merge_joins_and_where)
from djaosdb.sql2mongo.converters import (AggColumnSelectConverter,
InnerJoinConverter,
OuterJoinConverter)
from djaosdb.models import ListOfReferencesField
class _MockContainer(_Container):
    """``_Container`` subclass whose ``insert`` is a no-op."""

    def insert(self, *args, **kwargs):
        """Accept any arguments and return ``None`` without doing anything."""
        return None
class _MockConnection(CaosDBClient):
    """``CaosDBClient`` subclass used as the connection object in these tests.

    It hands itself to the parent constructor as the ``caosdb`` argument and
    supplies ``configure_connection``, ``execute_query``, ``Container`` and
    ``Record`` itself.
    """

    def __init__(self, cached_record_types=None, inserted_ids=None):
        """Store ``inserted_ids`` and optionally extend the record type cache.

        :param cached_record_types: if not ``None``, passed to
            ``self.cached_record_types.update``.
        :param inserted_ids: stored unchanged as ``self.inserted_ids``.
        """
        # NOTE(review): assigned before the parent constructor runs --
        # presumably the parent may read it; confirm before reordering.
        self.inserted_ids = inserted_ids
        super().__init__(caosdb=self)
        if cached_record_types is None:
            return
        self.cached_record_types.update(cached_record_types)

    def configure_connection(self, **kwargs):
        """Ignore all keyword arguments and return this object."""
        return self

    def execute_query(self, *args, **kwargs):
        """Ignore all arguments and return a newly constructed ``_Container``."""
        return _Container()

    def Container(self):
        """Return a new ``_MockContainer``."""
        return _MockContainer()

    def Record(self, *args, **kwargs):
        """Return a ``_Record`` built from the given arguments."""
        return _Record(*args, **kwargs)
def test_sqlparse_insert():
    """sqlparse yields an INSERT ``Statement`` with 11 top-level tokens."""
    sql = 'INSERT INTO "django_migrations" ("app", "name", "applied") VALUES (%(0)s, %(1)s, %(2)s,)'
    parsed = parse(sql)
    first_statement = parsed[0]

    assert isinstance(first_statement, Statement)
    assert first_statement.get_type() == "INSERT"
    assert len(list(first_statement)) == 11
def test_parse_insert():
    """An INSERT statement is parsed into an ``InsertQuery``.

    The table name, the column names and the bound values are checked.
    """
    connection = _MockConnection(
        cached_record_types=["django_migrations"],
        inserted_ids=[99],
    )
    row = [
        'contenttypes',
        '0001_initial',
        datetime.datetime(2020, 9, 28, 8, 38, 19, 762282),
    ]
    sql = 'INSERT INTO "django_migrations" ("app", "name", "applied") VALUES (%(0)s, %(1)s, %(2)s,)'

    q = Query(connection=connection, sql=sql, params=row)

    assert isinstance(q._query, InsertQuery)
    insert_query = q._query
    assert insert_query.left_table == "django_migrations"
    assert insert_query._cols == ["app", "name", "applied"]
    # The single row of parameters is expected wrapped in a list.
    assert insert_query._values == [row]
def test_parse_select_where_and():
    """Two AND-ed equality conditions become an ``and`` filter for ``find``."""
    sql = 'SELECT "A"."a" FROM "A" WHERE "A"."a" = %(0)s AND "A"."b" = %(1)s'
    connection = _MockConnection(cached_record_types=[])
    q = Query(connection, sql=sql, params=["bla", "blub"])

    select_query = q._query
    assert isinstance(select_query, SelectQuery)
    assert select_query._needs_aggregation() is False

    expected_params = {
        "filter": {
            "type": "and",
            "elements": [
                {'type': 'pov', 'negation': False, 'p': prop, 'o': '=',
                 'v': value}
                for prop, value in (('a', 'bla'), ('b', 'blub'))
            ],
        },
        "projection": ["a"],
    }
    callback, _, caosdb_params = q._query._to_caosdb()
    assert callback == connection.find
    assert caosdb_params == expected_params
def test_merge_joins_and_where_no_where_one_join():
    """With no where clause, a single join becomes the merged where filter."""
    joins = [dict(type="reference", p="p1", v="rt1", negation=False)]

    merged_joins, merged_where = _merge_joins_and_where(joins, None)

    assert merged_joins == []
    # Compared against a fresh literal, not the input object.
    assert merged_where == {
        "type": "reference",
        "negation": False,
        "p": "p1",
        "v": "rt1",
    }
def test_merge_joins_and_where_no_where_two_join():
    """With no where clause, two joins are combined into one ``and`` filter."""
    joins = [
        dict(type="reference", p="p1", v="rt1", negation=False),
        dict(type="reference", p="p2", v="rt2", negation=False),
    ]

    merged_joins, merged_where = _merge_joins_and_where(joins, None)

    expected_where = {
        "type": "and",
        "elements": [
            {"type": "reference", "p": "p1", "v": "rt1", "negation": False},
            {"type": "reference", "p": "p2", "v": "rt2", "negation": False},
        ],
    }
    assert merged_joins == []
    assert merged_where == expected_where
def test_merge_joins_and_where_independent_where():
    """A where clause on another property is AND-ed with the join.

    The where filter is expected first, followed by the join filter.
    """
    joins = [dict(type="reference", p="p1", v="rt1", negation=False)]
    where = dict(type="pov", p="p2", o="=", v="v2", negation=False)

    merged_joins, merged_where = _merge_joins_and_where(joins, where)

    expected_where = {
        "type": "and",
        "elements": [
            {"type": "pov", "p": "p2", "o": "=", "v": "v2",
             "negation": False},
            {"type": "reference", "p": "p1", "v": "rt1", "negation": False},
        ],
    }
    assert merged_joins == []
    assert merged_where == expected_where
def test_merge_joins_and_where_backref_sub_query_id():
    """A where clause whose ``ref`` names the joined type becomes the ``sub``
    filter of the back reference."""
    joins = [dict(type="back_reference", p="p1", v="rt1", negation=False)]
    where = dict(type="pov", ref="rt1", p="p2", o="=", v="935",
                 negation=False)

    merged_joins, merged_where = _merge_joins_and_where(joins, where)

    expected_where = {
        "type": "back_reference",
        "p": "p1",
        "v": "rt1",
        "negation": False,
        "sub": {
            "type": "pov",
            "ref": "rt1",
            "p": "p2",
            "o": "=",
            "v": "935",
            "negation": False,
        },
    }
    assert merged_joins == []
    assert merged_where == expected_where
def test_merge_joins_and_where_ref_sub_query_id():
    """An ``in`` filter on the join property becomes an ``ID`` sub filter of
    the reference."""
    joins = [dict(type="reference", p="p1", v="rt1", negation=False)]
    where = dict(type="in", p="p1", v=["935"])

    merged_joins, merged_where = _merge_joins_and_where(joins, where)

    expected_where = {
        "type": "reference",
        "p": "p1",
        "v": "rt1",
        "negation": False,
        # The property of the sub filter is expected to be "ID", not "p1".
        "sub": {"type": "in", "p": "ID", "v": ["935"]},
    }
    assert merged_joins == []
    assert merged_where == expected_where
def test_parse_select_join():
    """Check the where/join converters and the pipeline of an INNER JOIN
    query with an IN filter and an ORDER BY clause."""
    sql = 'SELECT "auth_permission"."content_type_id", "auth_permission"."codename" FROM "auth_permission" INNER JOIN "django_content_type" ON ("auth_permission"."content_type_id" = "django_content_type"."id") WHERE "auth_permission"."content_type_id" IN (%(0)s) ORDER BY "django_content_type"."app_label" ASC, "django_content_type"."model" ASC, "auth_permission"."codename" ASC'
    record_types = ["auth_permission", "django_content_type"]
    connection = _MockConnection(cached_record_types=record_types)
    q = Query(connection=connection, sql=sql, params=['935'])

    assert isinstance(q._query, SelectQuery)
    select_query = q._query
    assert select_query.left_table == "auth_permission"

    # WHERE clause on its own.
    assert select_query.where is not None
    expected_where = {"type": "in", "p": "content_type_id", "v": ["935"]}
    assert select_query.where.to_mongo() == expected_where

    # JOIN clause on its own.
    assert len(select_query.joins) == 1
    expected_join = {
        "type": "reference",
        "p": "content_type_id",
        "v": "django_content_type",
        "negation": False,
    }
    assert select_query.joins[0].to_mongo() == expected_join

    assert select_query._needs_aggregation() is True
    pipeline = select_query._make_pipeline()

    # In the pipeline, the IN filter is expected as "sub" of the reference.
    assert "filter" in pipeline
    expected_filter = {
        "type": "reference",
        "p": "content_type_id",
        "v": "django_content_type",
        "negation": False,
        "sub": {"type": "in", "p": "ID", "v": ["935"]},
    }
    assert pipeline["filter"] == expected_filter

    assert "sort" in pipeline
    expected_sort = OrderedDict([
        ("django_content_type.app_label", 1),
        ("django_content_type.model", 1),
        ("codename", 1),
    ])
    assert pipeline["sort"] == expected_sort

    assert pipeline["projection"] == ['content_type_id', 'codename']
def test_parse_select_limit():
    """A SELECT with LIMIT parses to a ``SelectQuery`` needing aggregation."""
    sql = 'SELECT (1) AS "a" FROM "django_session" WHERE "django_session"."session_key" = %(0)s LIMIT 1'
    session_key = 'qrkt0tcs7a4vub3lw0m4g1wvr3u67dz4'
    values = [session_key]
    connection = _MockConnection(cached_record_types=["django_session"])
    q = Query(connection=connection, sql=sql, params=values)

    assert isinstance(q._query, SelectQuery)
    select_query = q._query
    assert select_query.left_table == "django_session"
    assert select_query.where is not None
    expected_where = {
        "type": "pov",
        "negation": False,
        "p": "session_key",
        "o": "=",
        "v": values[0],
    }
    assert select_query.where.to_mongo() == expected_where
    assert len(select_query.joins) == 0
    assert select_query._needs_aggregation() is True

    # The pipeline is only built here; its content is not asserted.
    select_query._make_pipeline()
def test_count_query():
    """``SELECT COUNT(*) AS "__count"`` yields a ``count`` pipeline entry."""
    sql = 'SELECT COUNT(*) AS "__count" FROM "Model"'
    connection = _MockConnection(cached_record_types=["Model"])
    q = Query(connection=connection, sql=sql, params=[])

    assert isinstance(q._query, SelectQuery)
    select_query = q._query
    assert select_query.left_table == "Model"
    assert select_query.where is None
    assert len(select_query.joins) == 0
    assert select_query._needs_aggregation() is True
    assert select_query._needs_column_selection()

    # NOTE(review): this is an assignment, not an assertion -- the converter's
    # class is replaced before ``to_mongo`` is called. Presumably this mirrors
    # what the query code does when building the pipeline; confirm.
    select_query.selected_columns.__class__ = AggColumnSelectConverter
    expected = {"count": "__count"}
    assert select_query.selected_columns.to_mongo() == expected

    pipeline = select_query._make_pipeline()
    assert pipeline == {"count": "__count"}
def test_multiple_join_conditions():
    """djaosdb does not support complex ON clauses.

    Constructing a ``Query`` whose ON clause joins two conditions with AND
    must raise ``SQLDecodeError``.
    """
    sql = """
SELECT * FROM A
INNER JOIN B
ON ("A"."b_id" = "B"."id" AND "A"."b_id2" = "B"."id" )
"""
    connection = _MockConnection(cached_record_types=[])
    no_params = []
    with raises(SQLDecodeError):
        Query(connection=connection, sql=sql, params=no_params)
def test_inner_and_outer_join():
    """Query with one INNER JOIN and one LEFT OUTER JOIN.

    The inner join is expected as a reference filter (with the WHERE
    condition as its ``sub``), while the outer join converts to an empty
    dict. Sorting and projection of the pipeline are checked as well.
    """
    sql = """
SELECT "A"."id", "A"."p1",
"B"."id", "B"."p2",
"C"."id", "C"."p3"
FROM "A"
INNER JOIN "B"
ON ("A"."b_id" = "B"."id")
LEFT OUTER JOIN "C"
ON ("A"."c_id" = "C"."id")
WHERE "A"."b_id" = %(0)s
ORDER BY "A"."p1" DESC
LIMIT 10"""
    connection = _MockConnection(cached_record_types=["A", "B", "C"])
    q = Query(connection=connection, sql=sql, params=["195"])

    assert isinstance(q._query, SelectQuery)
    select_query = q._query
    assert select_query.left_table == "A"

    assert select_query.where is not None
    expected_where = {
        "type": "pov",
        "negation": False,
        "p": "b_id",
        "o": "=",
        "v": "195",
    }
    assert select_query.where.to_mongo() == expected_where
    assert select_query._needs_aggregation() is True

    assert len(select_query.joins) == 2
    inner_join, left_outer_join = select_query.joins[0], select_query.joins[1]
    assert isinstance(inner_join, InnerJoinConverter)
    assert isinstance(left_outer_join, OuterJoinConverter)
    expected_inner = {
        "type": "reference",
        "negation": False,
        "p": "b_id",
        "v": "B",
    }
    assert inner_join.to_mongo() == expected_inner
    assert left_outer_join.to_mongo() == {}

    callback, _, pipeline = select_query._to_caosdb()
    assert callback == connection.aggregate
    assert "joins" not in pipeline
    expected_filter = {
        "type": "reference",
        "negation": False,
        "p": "b_id",
        "v": "B",
        "sub": {
            "type": "pov",
            "negation": False,
            "p": "ID",
            "o": "=",
            "v": "195",
        },
    }
    assert pipeline["filter"] == expected_filter
    assert pipeline["sort"] == OrderedDict([("p1", -1)])
    expected_projection = ["id", "p1", "B.id", "B.p2", "C.id", "C.p3"]
    assert pipeline["projection"] == expected_projection

    # execute
    select_query._get_cursor()
def test_query_generation_conjuction():
    """``_generate_query`` renders an ``and`` filter, sort keys and
    projection into the expected query string."""
    connection = _MockConnection()
    sort_spec = OrderedDict([
        ('django_content_type.app_label', 1),
        ('django_content_type.model', 1),
        ('codename', 1),
    ])
    pov_filter = {
        'type': 'pov',
        'negation': False,
        'p': 'auth_group_permissions.group_id',
        'o': '=',
        'v': 226,
    }
    reference_filter = {
        'type': 'reference',
        'p': 'content_type_id',
        'v': 'django_content_type',
        'negation': False,
    }
    filter_spec = {'type': 'and', 'elements': [pov_filter, reference_filter]}

    query = connection._delegate._generate_query(
        record_type='auth_permission',
        sort=sort_spec,
        projection=['id'],
        filter=filter_spec,
        count=False)

    expected_query = ('SELECT id, django_content_type.app_label, '
                      'django_content_type.model, codename FROM RECORD '
                      '"auth_permission" WITH ( auth_group_permissions.'
                      'group_id="226" AND ( REFERENCES django_content_type ) )')
    assert query == expected_query
def test_parse_select_join_with_reverse_on_clause():
    """A join whose ON clause is ``"A"."id" = "B"."a"`` is expected as a
    back reference with the WHERE condition as its ``sub`` filter."""
    sql = """
SELECT "A"."id", "A"."p1"
FROM "A"
INNER JOIN "B"
ON ("A"."id" = "B"."a")
WHERE "B"."p2" = %(0)s
"""
    connection = _MockConnection(cached_record_types=["A", "B"])
    q = Query(connection=connection, sql=sql, params=["227"])

    assert isinstance(q._query, SelectQuery)
    select_query = q._query
    assert select_query.left_table == "A"

    assert select_query.where is not None
    expected_where = {
        "type": "pov",
        "negation": False,
        "ref": "B",
        "p": "p2",
        "o": "=",
        "v": "227",
    }
    assert select_query.where.to_mongo() == expected_where
    assert select_query._needs_aggregation() is True

    assert len(select_query.joins) == 1
    inner_join = select_query.joins[0]
    assert isinstance(inner_join, InnerJoinConverter)
    expected_join = {
        "type": "back_reference",
        "negation": False,
        "p": "a",
        "v": "B",
    }
    assert inner_join.to_mongo() == expected_join

    callback, _, pipeline = select_query._to_caosdb()
    assert callback == connection.aggregate
    assert "joins" not in pipeline
    expected_filter = {
        "type": "back_reference",
        "negation": False,
        "p": "a",
        "v": "B",
        "sub": {
            "type": "pov",
            "negation": False,
            "ref": "B",
            "p": "p2",
            "o": "=",
            "v": "227",
        },
    }
    assert pipeline["filter"] == expected_filter
    assert "sort" not in pipeline
    assert pipeline["projection"] == ["id", "p1"]

    # execute
    select_query._get_cursor()
def test_inner_join_with_bad_names():
    """Back-reference join where the column names are ``language_id`` and
    ``material_id``."""
    sql = """SELECT "A"."id", "A"."p1", "A"."p2" FROM
"A" INNER JOIN "B" ON ("A"."id" =
"B"."language_id") WHERE
"B"."material_id" = %(0)s"""
    connection = _MockConnection(cached_record_types=["A", "B"])
    q = Query(connection=connection, sql=sql, params=["227"])

    assert isinstance(q._query, SelectQuery)
    select_query = q._query
    assert select_query.left_table == "A"

    assert select_query.where is not None
    expected_where = {
        "type": "pov",
        "negation": False,
        "ref": "B",
        "p": "material_id",
        "o": "=",
        "v": "227",
    }
    assert select_query.where.to_mongo() == expected_where
    assert select_query._needs_aggregation() is True

    assert len(select_query.joins) == 1
    inner_join = select_query.joins[0]
    assert isinstance(inner_join, InnerJoinConverter)
    expected_join = {
        "type": "back_reference",
        "negation": False,
        "p": "language_id",
        "v": "B",
    }
    assert inner_join.to_mongo() == expected_join

    callback, _, pipeline = select_query._to_caosdb()
    assert callback == connection.aggregate
    assert "joins" not in pipeline
    expected_filter = {
        "type": "back_reference",
        "negation": False,
        "p": "language_id",
        "v": "B",
        "sub": {
            "type": "pov",
            "negation": False,
            "ref": "B",
            "p": "material_id",
            "o": "=",
            "v": "227",
        },
    }
    assert pipeline["filter"] == expected_filter
    assert "sort" not in pipeline
    assert pipeline["projection"] == ["id", "p1", "p2"]

    # execute
    select_query._get_cursor()
def test_count_subquery():
    """``COUNT(*)`` over a ``SELECT DISTINCT`` sub-select.

    The outer pipeline is expected to carry ``count`` (``None``) and
    ``subselect``; the sub-select pipeline only ``distinct`` and ``filter``.
    """
    sql = ('SELECT COUNT(*) FROM (SELECT DISTINCT "A"."id" AS Col1, '
           '"A"."name" AS Col2 FROM "A" WHERE (("A"."Source" = %s OR '
           '"A"."Source" = %s) AND "A"."language" = %s)) subquery')
    params = ('147', '140', '209')
    connection = _MockConnection(cached_record_types=["A"])
    q = Query(connection=connection, sql=sql, params=params)

    assert isinstance(q._query, SelectQuery)
    select_query = q._query
    assert select_query.subselect is not None
    assert select_query.left_table == "subquery"
    assert isinstance(select_query.subselect, SelectQuery)

    # Outer query.
    callback, _, pipeline = select_query._to_caosdb()
    assert callback == connection.aggregate
    assert "subselect" in pipeline
    assert "count" in pipeline
    assert pipeline["count"] is None

    # Inner query.
    sub_callback, _, sub_pipeline = pipeline["subselect"]
    assert sub_callback == connection.aggregate
    assert "distinct" in sub_pipeline
    assert "filter" in sub_pipeline
    # Remove the two known keys; nothing else may be left afterwards.
    sub_pipeline.pop("distinct")
    filters = sub_pipeline.pop("filter")
    assert sub_pipeline == {}

    source_alternatives = [
        {'type': 'pov', 'negation': False, 'p': 'Source', 'o': '=',
         'v': source}
        for source in ('147', '140')
    ]
    language_filter = {
        'type': 'pov',
        'negation': False,
        'p': 'language',
        'o': '=',
        'v': '209',
    }
    assert filters == {
        'type': 'and',
        'elements': [
            {'type': 'or', 'elements': source_alternatives},
            language_filter,
        ],
    }
def test_find_in():
    """An ``IN`` condition with two parameters becomes an ``in`` filter."""
    sql = ('SELECT "Source"."id", "Source"."name", "Source"."description" '
           'FROM "Source" WHERE "Source"."id" IN (%(0)s, %(1)s)')
    params = ('147', '140')
    # NOTE(review): the cached record type is "A" although the query selects
    # from "Source" -- confirm this is intended.
    connection = _MockConnection(cached_record_types=["A"])
    q = Query(connection=connection, sql=sql, params=params)

    assert isinstance(q._query, SelectQuery)
    expected_params = {
        'filter': {'type': 'in', 'p': 'id', 'v': ['147', '140']},
        'projection': ['id', 'name', 'description'],
    }
    # Only the third element of the returned triple is checked.
    assert q._query._to_caosdb()[2] == expected_params
def test_remove_self_join():
    """A join of "A" with itself (alias T3) leaves only plain filters.

    The condition on ``T3."col4"`` is expected as an ordinary ``pov``
    filter on ``col4`` next to the OR-ed ``col3`` conditions.
    """
    sql = """
SELECT "A"."col1" AS Col1,
"A"."col2" AS Col2
FROM "A"
INNER JOIN "A" T3
ON ("A"."id" = T3."id")
WHERE (
(
"A"."col3" = %(0)s
OR "A"."col3" = %(1)s
)
AND T3."col4" = %(2)s
)"""
    params = ('145', '140', '209')
    connection = _MockConnection(cached_record_types=["A"])
    q = Query(connection=connection, sql=sql, params=params)

    assert isinstance(q._query, SelectQuery)
    caosdb_params = q._query._to_caosdb()[2]
    assert caosdb_params["projection"] == ["col1", "col2"]

    # self-join has been removed
    col3_alternatives = [
        {'type': 'pov', 'negation': False, 'p': 'col3', 'o': '=', 'v': value}
        for value in ('145', '140')
    ]
    col4_filter = {
        'type': 'pov',
        'negation': False,
        'p': 'col4',
        'o': '=',
        'v': '209',
    }
    assert caosdb_params["filter"] == {
        'type': 'and',
        'elements': [
            {'type': 'or', 'elements': col3_alternatives},
            col4_filter,
        ],
    }
def test_user():
    """Bare boolean columns in WHERE are expected as ``= "TRUE"`` filters."""
    sql = ('SELECT "caosdb_auth_user"."id", "caosdb_auth_user"."password", '
           '"caosdb_auth_user"."last_login", "caosdb_auth_user"."is_superuser", '
           '"caosdb_auth_user"."username", "caosdb_auth_user"."first_name", '
           '"caosdb_auth_user"."last_name", "caosdb_auth_user"."email", '
           '"caosdb_auth_user"."is_staff", "caosdb_auth_user"."is_active", '
           '"caosdb_auth_user"."date_joined" FROM "caosdb_auth_user" WHERE '
           '("caosdb_auth_user"."is_active" AND "caosdb_auth_user"."is_staff" AND '
           '"caosdb_auth_user"."is_superuser" AND "caosdb_auth_user"."username" = %(0)s) '
           'LIMIT 21')
    params = ('admin',)
    connection = _MockConnection(cached_record_types=["caosdb_auth_user"])
    q = Query(connection=connection, sql=sql, params=params)
    caosdb_params = q._query._to_caosdb()[2]

    # One filter per boolean column, in the order they appear in the SQL.
    flag_filters = [
        {'type': 'pov', 'negation': False, 'p': flag, 'o': "=", 'v': "TRUE"}
        for flag in ('is_active', 'is_staff', 'is_superuser')
    ]
    username_filter = {
        'type': 'pov',
        'negation': False,
        'p': 'username',
        'o': '=',
        'v': 'admin',
    }
    assert caosdb_params["filter"] == {
        'type': 'and',
        'elements': flag_filters + [username_filter],
    }
def test_join_where():
    """An implicit join (two tables in FROM, join condition in WHERE).

    The condition ``"B"."A_id"="A"."id"`` is expected as a back reference
    filter, AND-ed with the ``IN`` filter on ``id``.
    """
    sql = ('SELECT "A"."p1", "A"."p2", "A"."p3" FROM '
           '"A" , "B" WHERE (("B"."A_id"="A"."id") AND '
           '"A"."id" IN (%s))')
    params = ('973',)
    cached_record_types = [
        "A", "B",
    ]
    connection = _MockConnection(cached_record_types=cached_record_types)
    q = Query(connection=connection, sql=sql, params=params)
    caosdb_params = q._query._to_caosdb()[2]

    assert caosdb_params["projection"] == ["p1", "p2", "p3"]
    assert caosdb_params["filter"] == {
        "type": "and",
        "elements": [{
            'type': 'back_reference',
            'negation': False,
            'p': 'A_id',
            'v': 'B',
            'ref': 'B'
        }, {
            'type': 'in',
            'p': 'id',
            'v': ['973']
        }]
    }
def test_select_count_group_by():
    """Placeholder for a COUNT ... GROUP BY query; always fails with "TODO".

    The statement and its parameters are only defined, never used.
    """
    statement = """
SELECT "Language"."id", "Language"."name", "Language"."description",
COUNT("Material"."id") AS "used"
FROM "Language"
LEFT OUTER JOIN "Material"
ON ("Language"."id" = "Material"."language")
WHERE "Language"."id" IN (%(0)s)
GROUP BY "Language"."id", "Language"."name", "Language"."description"
"""
    statement_params = ('973',)
    assert False, "TODO"
def test_select_count_having():
    """Placeholder for a COUNT ... GROUP BY ... HAVING query; always fails
    with "TODO".

    The statement and its parameters are only defined, never used.
    """
    statement = """
SELECT "Language"."id", "Language"."name", "Language"."description",
COUNT("Material"."id") AS "material__count"
FROM "Language"
LEFT OUTER JOIN "Material"
ON ("Language"."id" = "Material"."language")
WHERE "Language"."id" IN (%(0)s)
GROUP BY "Language"."id", "Language"."name", "Language"."description"
HAVING COUNT("Material"."id") > %(1)s
"""
    statement_params = ('973', 0)
    assert False, "TODO"
def test_select_count_having_multi_join():
    """Placeholder for a COUNT/HAVING query with several joins; always fails
    with "TODO".

    The statement and its parameters are only defined, never used.
    """
    statement = """
SELECT "Language"."id", "Language"."name", "Language"."description",
COUNT("Material"."id") AS "used"
FROM "Language"
LEFT OUTER JOIN "Material"
ON ("Language"."id" = "Material"."language")
INNER JOIN "Material" T4
ON ("Language"."id" = T4."language")
INNER JOIN "Material" T5
ON (T4."id" = T5."id")
WHERE ( T5."State" = %(0)s
AND "Language"."id" IN (%(1)s))
GROUP BY "Language"."id", "Language"."name", "Language"."description"
HAVING COUNT("Material"."id") > %(2)s
"""
    statement_params = ('published', '973', 0)
    assert False, "TODO"
    # Not reached while the assertion above fails; the two strings below look
    # like notes on the intended queries (unused).
    count_query_note = "COUNT Material WHICH HAS A state=published AND REFERENCES 973"
    select_query_note = "SELECT id, name, description, {COUNT_RESULT} FROM Language WITH id = 973"
def test_field_not_null():
    """``IS NOT NULL`` on the join column adds no filter of its own.

    The expected filter consists only of the ``IN`` condition and the back
    reference for the join.
    """
    sql = """
SELECT "A"."p1", "A"."p2", "A"."p3"
FROM "A"
INNER JOIN "B"
ON ("A"."id" = "B"."a_id")
WHERE ("B"."a_id" IS NOT NULL
AND "A"."id" IN (%(0)s))
"""
    params = ('973',)
    connection = _MockConnection(cached_record_types=["A", "B"])
    q = Query(connection=connection, sql=sql, params=params)
    caosdb_params = q._query._to_caosdb()[2]

    assert caosdb_params["projection"] == ["p1", "p2", "p3"]

    in_filter = {"type": "in", "p": "id", "v": ["973"]}
    back_reference = {
        "type": "back_reference",
        "negation": False,
        "p": "a_id",
        "v": "B",
    }
    assert caosdb_params["filter"] == {
        "type": "and",
        "elements": [in_filter, back_reference],
    }
def test_query():
    """COUNT query with an equality AND-ed to three OR-ed ``iLIKE`` terms.

    Each ``iLIKE`` is expected with the operator string ``" LIKE "``.
    """
    sql = """
SELECT COUNT(*) AS "__count"
FROM "A"
WHERE ("A"."p1" = %(0)s
AND ("A"."p2" iLIKE %(1)s
OR "A"."p3" iLIKE %(2)s
OR "A"."p4" iLIKE %(3)s))
"""
    params = ('v1', 'v2', 'v3', 'v4')
    connection = _MockConnection(cached_record_types=["A"])
    q = Query(connection=connection, sql=sql, params=params)
    caosdb_params = q._query._to_caosdb()[2]

    assert caosdb_params["count"] == "__count"

    equality = {
        "type": "pov",
        "negation": False,
        "p": "p1",
        "o": "=",
        "v": "v1",
    }
    like_alternatives = [
        {"type": "pov", "negation": False, "p": prop, "o": " LIKE ",
         "v": value}
        for prop, value in (("p2", "v2"), ("p3", "v3"), ("p4", "v4"))
    ]
    assert caosdb_params["filter"] == {
        "type": "and",
        "elements": [
            equality,
            {"type": "or", "elements": like_alternatives},
        ],
    }
def test_not_in():
    """COUNT query mixing ``NOT (... iLIKE ...)`` with an OR group.

    Negated conditions are expected with ``negation`` set to ``True``, and
    the ``%`` wildcards of the parameters as ``*`` in the filter values.
    """
    # NOTE(review): the WHERE clause below opens one more "(" than it closes;
    # confirm whether the unbalanced SQL is intentional.
    sql = """
SELECT COUNT(*) AS "__count"
FROM "A" WHERE (
"A"."State" = %(0)s
AND NOT ( "A"."p1" iLIKE %(1)s)
AND NOT ("A"."p2" iLIKE %(2)s)
AND ("A"."p1" iLIKE %(3)s
OR "A"."p2" iLIKE %(4)s
)"""
    params = ('published', '%term1%', '%term2%', '%term3%', '%term4%')
    connection = _MockConnection(cached_record_types=["A"])
    q = Query(connection=connection, sql=sql, params=params)
    caosdb_params = q._query._to_caosdb()[2]

    assert caosdb_params["count"] == "__count"

    state_filter = {
        'type': 'pov',
        'negation': False,
        'p': 'State',
        'o': '=',
        'v': 'published',
    }
    excluded = [
        {'type': 'pov', 'negation': True, 'p': prop, 'o': ' LIKE ',
         'v': pattern}
        for prop, pattern in (('p1', '*term1*'), ('p2', '*term2*'))
    ]
    included = [
        {'type': 'pov', 'negation': False, 'p': prop, 'o': ' LIKE ',
         'v': pattern}
        for prop, pattern in (('p1', '*term3*'), ('p2', '*term4*'))
    ]
    assert caosdb_params["filter"] == {
        'type': 'and',
        'elements': [state_filter] + excluded + [
            {'type': 'or', 'elements': included},
        ],
    }