Skip to content
Snippets Groups Projects
Select Git revision
  • 70cba3fd3e4d4e34591b26c4664ec8b828ae9775
  • main default protected
  • f-structured-query-tree
  • f-remove-chown-check
  • f-better-sss-bin-dir
  • dev protected
  • f-remove-dropoffbox
  • f-sss4grpc
  • f-refactor-compose
  • f-real-id
  • f-doip
  • f-filesystem-import
  • henrik-tmp
  • f-filesystem-link
  • f-filesystem-directory
  • f-filesystem-core
  • f-filesystem-cleanup
  • f-string-ids
  • f-filesystem-main
  • f-linkahead-rename-before
  • f-linkahead-rename
  • v0.13.0 protected
  • v0.12.3 protected
  • v0.12.2 protected
  • v0.12.1 protected
  • v0.12.0 protected
  • v0.11.0 protected
  • v0.10.0 protected
  • v0.9.0 protected
  • v0.8.1 protected
  • v0.8.0 protected
  • v0.7.3 protected
  • v0.7.2 protected
  • v0.7.1 protected
  • v0.6.0 protected
  • v0.5.0 protected
  • v0.4.0 protected
  • v0.3.0 protected
  • working_sss protected
  • v0.1 protected
40 results

move_files.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    operations.py 4.07 KiB
    import pytz
    from django.conf import settings
    from django.db.backends.base.operations import BaseDatabaseOperations
    from django.utils import timezone
    import datetime
    import calendar
    
    
    class DatabaseOperations(BaseDatabaseOperations):
    
        def quote_name(self, name):
            if name.startswith('"') and name.endswith('"'):
                return name
            return '"{}"'.format(name)
    
        def adapt_datefield_value(self, value):
            if value is None:
                return None
    
            if isinstance(value, datetime.datetime) and timezone.is_aware(value):
                raise ValueError(
                    "Djongo backend does not support timezone-aware dates.")
    
            x = calendar.timegm(value.timetuple())
            # this is presuming that UTC info is needed; anything after 'replace'
            # can be deleted for 'dt' if not.
            dt = datetime.datetime(1970, 1, 1).replace(
                tzinfo=datetime.timezone.utc)
            if x < 0:
                return dt + datetime.timedelta(seconds=x)
            else:
                return datetime.datetime.utcfromtimestamp(calendar.timegm(
                    value.timetuple()))
    
        def adapt_datetimefield_value(self, value):
            if value is None:
                return None
    
            if isinstance(value, datetime.datetime) and timezone.is_aware(value):
                if settings.USE_TZ:
                    value = timezone.make_naive(value, self.connection.timezone)
                else:
                    raise ValueError(
                        "Djongo backend does not support timezone-aware datetimes when USE_TZ is False.")
            return value
    
        def adapt_timefield_value(self, value):
            if value is None:
                return None
    
            if isinstance(value, str):
                return datetime.datetime.strptime(value, '%H:%M:%S')
    
            if timezone.is_aware(value):
                raise ValueError(
                    "Djongo backend does not support timezone-aware times.")
    
            return datetime.datetime(1900, 1, 1, value.hour, value.minute,
                                     value.second, value.microsecond)
    
        def convert_datefield_value(self, value, expression, connection):
            if isinstance(value, datetime.datetime):
                if settings.USE_TZ:
                    value = timezone.make_aware(value, self.connection.timezone)
                value = value.date()
            return value
    
        def convert_timefield_value(self, value, expression, connection):
            if isinstance(value, datetime.datetime):
                if settings.USE_TZ:
                    value = timezone.make_aware(value, self.connection.timezone)
                value = value.time()
            return value
    
        def convert_datetimefield_value(self, value, expression, connection):
            if isinstance(value, datetime.datetime):
                if settings.USE_TZ:
                    value = timezone.make_aware(value, self.connection.timezone)
            return value
    
        def get_db_converters(self, expression):
            converters = super(DatabaseOperations,
                               self).get_db_converters(expression)
            internal_type = expression.output_field.get_internal_type()
            if internal_type == 'DateField':
                converters.append(self.convert_datefield_value)
            elif internal_type == 'TimeField':
                converters.append(self.convert_timefield_value)
            elif internal_type == 'DateTimeField':
                converters.append(self.convert_datetimefield_value)
            return converters
    
        def sql_flush(self, style, tables, reset_sequences, allow_cascade=False):
            # TODO: Need to implement this fully
            return [f'ALTER TABLE "{table}" FLUSH'
                    for table in tables]
    
        def max_name_length(self):
            return 60
    
        def no_limit_value(self):
            return None
    
        def bulk_insert_sql(self, fields, placeholder_rows):
            return ' '.join(
                'VALUES (%s)' % ', '.join(row)
                for row in placeholder_rows
            )
    
        def date_extract_sql(self, lookup_type, field_name):
            return "EXTRACT('%s' FROM %s)" % (lookup_type, field_name)
    
        def date_trunc_sql(self, lookup_type, field_name):
            return "DATE_TRUNC('%s', %s)" % (lookup_type, field_name)