Django Source Code Analysis: ORM

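The ORM is a large topic, and walking through it the way the earlier articles did would be both complicated and unsystematic. So this article takes a different angle: we trace a few key questions to understand the design of the Django ORM as a whole: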

  • How does the Django ORM support multiple databases?
  • What is objects in the Django ORM?
  • What is the query flow of the filter method?

1. Preparation

  • Python 3.5.2
  • Django 2.1.2
  • PyCharm 2018.2.1 (Professional Edition)
  • Start the project:
[min:] ~/Desktop/python/Demo$ python manage.py runserver 0.0.0.0:8000

2. Analysis

We now go through the questions above one by one.

2.1 How the Django ORM supports multiple databases

2.1.1 Django db source tree
├── __init__.py
├── backends
│   ├── __init__.py
│   ├── base
│   ├── ddl_references.py
│   ├── dummy
│   ├── mysql
│   ├── oracle
│   ├── postgresql
│   ├── postgresql_psycopg2
│   ├── signals.py
│   ├── sqlite3
│   └── utils.py
├── migrations
│   ├── __init__.py
│   ├── ........
│   └── writer.py
├── models
│   ├── __init__.py
│   ├── ........
│   └── utils.py
├── transaction.py
└── utils.py

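A reasonable guess is that multi-database support needs a wrapper that keeps the outward-facing interface uniform, loads the backend for the configured database type, and delegates to its methods. The Django db source matches this guess: the backend code is collected under backends, and the differences between databases are isolated inside that package.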

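2.1.2 The backend loading flow

Take the database connection check at Django startup as an example:

  1. Part one of this series (on service startup) analyzed how the Django server starts. That flow includes a check on the database connection, located at django.core.management.base.BaseCommand#check_migrations, and this method references django.db.connections: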

    from django.db import DEFAULT_DB_ALIAS, connections  # importing django.db creates the module-level ConnectionHandler
    # ............. omitted ...............
    def check_migrations(self):
        """
        Print a warning if the set of migrations on disk don't match the
        migrations in the database.
        """
        from django.db.migrations.executor import MigrationExecutor
        try:
            # Subscripting invokes the magic method ConnectionHandler.__getitem__
            executor = MigrationExecutor(connections[DEFAULT_DB_ALIAS])
        except ImproperlyConfigured:
            # No databases are configured (or the dummy one)
            return
  2. In django.db we find connections = ConnectionHandler(). The ConnectionHandler class looks like this:

    class ConnectionHandler:
        def __init__(self, databases=None):
            """
            databases is an optional dictionary of database definitions (structured
            like settings.DATABASES).
            """
            self._databases = databases  # only stored here; resolved lazily by the databases property
            self._connections = local()

        @cached_property
        def databases(self):
            if self._databases is None:
                self._databases = settings.DATABASES
            if self._databases == {}:
                self._databases = {
                    DEFAULT_DB_ALIAS: {
                        'ENGINE': 'django.db.backends.dummy',
                    },
                }
            if DEFAULT_DB_ALIAS not in self._databases:
                raise ImproperlyConfigured("You must define a '%s' database." % DEFAULT_DB_ALIAS)
            if self._databases[DEFAULT_DB_ALIAS] == {}:
                self._databases[DEFAULT_DB_ALIAS]['ENGINE'] = 'django.db.backends.dummy'
            return self._databases

        # ............. omitted ...............

        def __getitem__(self, alias):
            if hasattr(self._connections, alias):
                return getattr(self._connections, alias)

            self.ensure_defaults(alias)
            self.prepare_test_settings(alias)
            db = self.databases[alias]
            backend = load_backend(db['ENGINE'])  # Important: ENGINE decides which database backend is used
            conn = backend.DatabaseWrapper(db, alias)
            setattr(self._connections, alias, conn)
            return conn

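    As the code shows, __init__ only stores the databases argument. The databases cached property does the real work on first access: when _databases is None, it falls back to settings.DATABASES. check_migrations then subscripts connections, which calls ConnectionHandler.__getitem__;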

  3. django.db.utils.load_backend

    def load_backend(backend_name):
        """
        Return a database backend's "base" module given a fully qualified database
        backend name, or raise an error if it doesn't exist.
        """
        # This backend was renamed in Django 1.9.
        if backend_name == 'django.db.backends.postgresql_psycopg2':
            backend_name = 'django.db.backends.postgresql'

        try:
            return import_module('%s.base' % backend_name)
        except ImportError as e_user:
            # ............. omitted ...............

    # Example settings.DATABASES
    DATABASES = {
        'default': {
            'ENGINE': 'django.db.backends.mysql',
            'NAME': config.DATABASES_NAME,
            'USER': config.DATABASES_USER,
            'PASSWORD': config.DATABASES_PASSWORD,
            'HOST': config.DATABASES_HOST,
            'PORT': config.DATABASES_PORT,
        }
    }

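    This function loads the backend module that matches the ENGINE value in settings.DATABASES. Each database type has its own DatabaseWrapper, which acts as its proxy and is the concrete object that later operations work on.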
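
The loading path in the three steps above can be sketched without Django. This is a minimal sketch under stated assumptions, not Django's implementation: ToyConnectionHandler, the two wrapper classes, and the BACKENDS registry are hypothetical names, and a plain dictionary lookup stands in for import_module('%s.base' % backend_name).

```python
import threading
from functools import cached_property


class ToySqliteWrapper:
    """Hypothetical stand-in for a backend's DatabaseWrapper."""
    vendor = "sqlite"

    def __init__(self, settings_dict, alias):
        self.settings_dict = settings_dict
        self.alias = alias


class ToyMysqlWrapper(ToySqliteWrapper):
    vendor = "mysql"


# Assumption: a registry replaces the dynamic import of '<ENGINE>.base'.
BACKENDS = {
    "toy.sqlite": ToySqliteWrapper,
    "toy.mysql": ToyMysqlWrapper,
}


def load_backend(engine):
    try:
        return BACKENDS[engine]
    except KeyError:
        raise ImportError("%r is not an available toy backend" % engine) from None


class ToyConnectionHandler:
    def __init__(self, databases=None):
        self._databases = databases            # stored only, nothing resolved yet
        self._connections = threading.local()  # one connection cache per thread

    @cached_property
    def databases(self):
        # Evaluated once, on first access.
        if self._databases is None:
            self._databases = {"default": {"ENGINE": "toy.sqlite"}}
        return self._databases

    def __getitem__(self, alias):
        # Reuse the connection this thread already created for the alias.
        if hasattr(self._connections, alias):
            return getattr(self._connections, alias)
        db = self.databases[alias]
        conn = load_backend(db["ENGINE"])(db, alias)
        setattr(self._connections, alias, conn)
        return conn


connections = ToyConnectionHandler({
    "default": {"ENGINE": "toy.mysql"},
    "replica": {"ENGINE": "toy.sqlite"},
})
```

Callers only ever write connections["default"]. Which wrapper class they get back is decided entirely by the ENGINE string, which is the point of the design.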

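2.2 What objects does

Before analyzing the ORM's filter, we cannot avoid the objects attribute, because virtually every database operation goes through it. The most common example:

    ret = models.Book.objects.filter(title="Django")

So what exactly is objects, and what role does it play in database operations?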

2.2.1 django.db.models.base.ModelBase
    def __new__(cls, name, bases, attrs, **kwargs):
        # ............. omitted ...............
        new_class = super_new(cls, name, bases, new_attrs, **kwargs)
        # ............. omitted ...............
        new_class._prepare()  # calls _prepare

    def _prepare(cls):
        # ............. omitted ...............
        if not opts.managers:
            if any(f.name == 'objects' for f in opts.fields):
                raise ValueError(
                    "Model %s must specify a custom Manager, because it has a "
                    "field named 'objects'." % cls.__name__
                )
            manager = Manager()
            manager.auto_created = True
            cls.add_to_class('objects', manager)  # assigns objects
        # ............. omitted ...............

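From the usage we can see that objects is an attribute of the model class. When is it assigned? Book inherits from Model, and Model uses ModelBase as its metaclass, so ModelBase.__new__ runs when the Book class is created. The two methods above (see the comments) perform the assignment. Note that the assignment goes through add_to_class rather than the usual setattr. What does this method do?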

    def add_to_class(cls, name, value):
        # We should call the contribute_to_class method only if it's bound
        if not inspect.isclass(value) and hasattr(value, 'contribute_to_class'):
            value.contribute_to_class(cls, name)
        else:
            setattr(cls, name, value)

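As the method shows, a value that defines contribute_to_class (and is not itself a class) has that method called instead of a plain setattr. For a Manager, contribute_to_class is defined on BaseManager.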
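
The dispatch in add_to_class can be tried out in isolation. The following is a minimal sketch, not Django's code: ToyModelBase, ToyManager, and ToyModel are hypothetical names, and the toy contribute_to_class stores the manager directly instead of wrapping it in a descriptor.

```python
import inspect


class ToyModelBase(type):
    """Hypothetical metaclass that mimics the add_to_class dispatch."""

    def __new__(mcs, name, bases, attrs):
        new_class = super().__new__(mcs, name, bases, attrs)
        if bases:  # skip the root ToyModel class itself
            new_class.add_to_class("objects", ToyManager())
        return new_class

    def add_to_class(cls, name, value):
        # Objects that know how to install themselves get the chance to do so.
        if not inspect.isclass(value) and hasattr(value, "contribute_to_class"):
            value.contribute_to_class(cls, name)
        else:
            setattr(cls, name, value)


class ToyManager:
    def __init__(self):
        self.name = None
        self.model = None

    def contribute_to_class(self, model, name):
        # The manager learns which model it belongs to while being attached.
        self.name = self.name or name
        self.model = model
        setattr(model, name, self)


class ToyModel(metaclass=ToyModelBase):
    pass


class Book(ToyModel):
    pass


Book.add_to_class("verbose_name", "book")  # plain value: falls back to setattr
```

The benefit over a bare setattr is that the attached object gets a reference back to the class it was attached to, which is how Book.objects knows it should query Book.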

2.2.2 django.db.models.manager.BaseManager
    def contribute_to_class(self, model, name):
        self.name = self.name or name
        self.model = model

        setattr(model, name, ManagerDescriptor(self))

        model._meta.add_manager(self)

Putting this together, what actually gets set as objects is a ManagerDescriptor. What is that?

    class ManagerDescriptor:

        def __init__(self, manager):
            self.manager = manager

        def __get__(self, instance, cls=None):
            if instance is not None:
                raise AttributeError("Manager isn't accessible via %s instances" % cls.__name__)

            if cls._meta.abstract:
                raise AttributeError("Manager isn't available; %s is abstract" % (
                    cls._meta.object_name,
                ))

            if cls._meta.swapped:
                raise AttributeError(
                    "Manager isn't available; '%s.%s' has been swapped for '%s'" % (
                        cls._meta.app_label,
                        cls._meta.object_name,
                        cls._meta.swapped,
                    )
                )

            return cls._meta.managers_map[self.manager.name]

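So when we use Book.objects, what actually does the work is still the Manager instance that was passed in. Why the extra layer?

Django's rule is that only the model class may use objects; model instances may not. Note the distinction between a class and its instances.

This makes sense. Book.objects.filter(id=1) returns a QuerySet, which can be viewed as a collection of model instances (a "book set"). If a model instance could use objects, that would mean "querying books from one book", which is semantically wrong. Books can only be queried from the collection of books (Book), so Django wraps the Manager in a ManagerDescriptor to enforce this check.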
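
The class-only rule relies on Python's descriptor protocol: __get__ receives instance=None when the attribute is read from the class. Here is a minimal sketch of that check alone, with hypothetical names (ClassOnlyDescriptor, ToyManager) and without the abstract and swapped checks:

```python
class ClassOnlyDescriptor:
    """Returns the wrapped manager for class access, refuses instance access."""

    def __init__(self, manager):
        self.manager = manager

    def __get__(self, instance, cls=None):
        if instance is not None:
            raise AttributeError("Manager isn't accessible via %s instances" % cls.__name__)
        return self.manager


class ToyManager:
    def all(self):
        return ["book-1", "book-2"]


class Book:
    objects = ClassOnlyDescriptor(ToyManager())


def instance_access_fails():
    """Return True if reading objects from an instance raises AttributeError."""
    try:
        Book().objects
    except AttributeError:
        return True
    return False
```

Book.objects.all() works as usual, while Book().objects raises AttributeError, which mirrors the first check in ManagerDescriptor.__get__.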

2.2.3 django.db.models.manager.Manager
    class Manager(BaseManager.from_queryset(QuerySet)):
        pass

    @classmethod
    def from_queryset(cls, queryset_class, class_name=None):
        if class_name is None:
            class_name = '%sFrom%s' % (cls.__name__, queryset_class.__name__)
        return type(class_name, (cls,), {
            '_queryset_class': queryset_class,
            **cls._get_queryset_methods(queryset_class),
        })

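These two pieces show that Manager does not literally inherit from QuerySet. from_queryset uses type() to build a new subclass of BaseManager on the fly and copies the public methods of QuerySet onto it as proxy methods, so later calls such as filter and get are all forwarded to a QuerySet.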
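
To make the dynamic class creation concrete, here is a minimal sketch of the same idea with hypothetical classes (ToyQuerySet, ToyBaseManager, ToyManager). It is a simplification: the real _get_queryset_methods applies more rules than the underscore and name-clash checks shown here.

```python
import inspect


class ToyQuerySet:
    def __init__(self, model):
        self.model = model
        self.filters = {}

    def filter(self, **kwargs):
        clone = ToyQuerySet(self.model)
        clone.filters = {**self.filters, **kwargs}
        return clone

    def _private_helper(self):
        return "not proxied"


class ToyBaseManager:
    model = None

    def get_queryset(self):
        return self._queryset_class(self.model)

    @classmethod
    def _get_queryset_methods(cls, queryset_class):
        def create_method(name):
            # Each proxy builds a fresh queryset and forwards the call to it.
            def manager_method(self, *args, **kwargs):
                return getattr(self.get_queryset(), name)(*args, **kwargs)
            manager_method.__name__ = name
            return manager_method

        return {
            name: create_method(name)
            for name, _ in inspect.getmembers(queryset_class, predicate=inspect.isfunction)
            if not name.startswith("_") and not hasattr(cls, name)
        }

    @classmethod
    def from_queryset(cls, queryset_class):
        class_name = "%sFrom%s" % (cls.__name__, queryset_class.__name__)
        return type(class_name, (cls,), {
            "_queryset_class": queryset_class,
            **cls._get_queryset_methods(queryset_class),
        })


class ToyManager(ToyBaseManager.from_queryset(ToyQuerySet)):
    pass
```

ToyManager().filter(title="Django") returns a ToyQuerySet even though ToyManager is not a subclass of ToyQuerySet: the method was copied over as a proxy, not inherited.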

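2.3 The query flow of the filter method

The goal of this section is to show how an object-level call is turned into SQL and how the Django ORM works. To keep the focus, some details are only mentioned and not analyzed at the code level; they can be explored separately if you are interested.

2.3.1 django/db/models/query.py

Calling filter actually calls the _filter_or_exclude method: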

    def filter(self, *args, **kwargs):
        """
        Return a new QuerySet instance with the args ANDed to the existing
        set.
        """
        return self._filter_or_exclude(False, *args, **kwargs)

    def _filter_or_exclude(self, negate, *args, **kwargs):
        if args or kwargs:
            assert self.query.can_filter(), \
                "Cannot filter a query once a slice has been taken."

        clone = self._chain()  # get a new QuerySet object
        if negate:
            clone.query.add_q(~Q(*args, **kwargs))
        else:
            # filter passes a Q object to add_q
            clone.query.add_q(Q(*args, **kwargs))
        return clone

2.3.2 django/db/models/sql/query.py
    def add_q(self, q_object):
        """
        A preprocessor for the internal _add_q(). Responsible for doing final
        join promotion.
        """
        # For join promotion this case is doing an AND for the added q_object
        # and existing conditions. So, any existing inner join forces the join
        # type to remain inner. Existing outer joins can however be demoted.
        # (Consider case where rel_a is LOUTER and rel_a__col=1 is added - if
        # rel_a doesn't produce any rows, then the whole condition must fail.
        # So, demotion is OK.
        existing_inner = {a for a in self.alias_map if self.alias_map[a].join_type == INNER}

        clause, _ = self._add_q(q_object, self.used_aliases)
        if clause:
            self.where.add(clause, AND)
        self.demote_joins(existing_inner)

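add_q adds the current Q object to the existing filters, then inserts the returned where clause into the query's own where tree, joined with AND. The same method also handles the Django ORM's use of the __ separator to traverse foreign keys.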
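
The "AND the new condition onto the existing where" behaviour, together with the ~Q negation used by exclude, can be sketched as follows. ToyQ and ToyQuery are hypothetical and heavily simplified: conditions are plain equality checks against dictionaries, and there are no joins or __ lookups.

```python
class ToyQ:
    """Hypothetical condition object: keyword equality tests, optionally negated."""

    def __init__(self, **conditions):
        self.conditions = conditions
        self.negated = False

    def __invert__(self):
        clone = ToyQ(**self.conditions)
        clone.negated = not self.negated
        return clone


class ToyQuery:
    def __init__(self):
        self.where = []  # every child must hold, i.e. children are joined with AND

    def add_q(self, q_object):
        self.where.append(q_object)

    def matches(self, row):
        for q in self.where:
            hit = all(row.get(key) == value for key, value in q.conditions.items())
            if hit == q.negated:  # a plain Q must hit, a negated Q must not
                return False
        return True


query = ToyQuery()
query.add_q(ToyQ(title="Django"))   # what filter(title="Django") would add
query.add_q(~ToyQ(pages=120))       # what exclude(pages=120) would add
```

Each call only appends to the where structure; nothing is evaluated until matches is called, which is a small-scale version of the laziness discussed next.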

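Normally, once the WHERE clause is built, the query should run. Yet tracing the flow again turns up no place where SQL is executed. The reason is that Django queries are lazy: none of the operations so far run any SQL. Only when the QuerySet is actually consumed, for example through __iter__, does Django compile the SQL statement from the conditions already set on the QuerySet, execute it, and return the results.

Take the following statements as an example:

    ret = models.Book.objects.filter(title="Django")
    books = list(ret)  # or: book01 = ret[1]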

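After filter returns, a breakpoint or a log statement shows that ret is a django.db.models.query.QuerySet object. Only one of the two operations above produces actual Book data, so next we look at what QuerySet does when it is indexed or iterated, and at the __iter__ method that ends up doing the work.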
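
The lazy behaviour can be reproduced in a few lines. This is a sketch with a hypothetical LazyQuery class that filters an in-memory list instead of running SQL; the shared log list exists only to show when the "query" actually runs.

```python
class LazyQuery:
    def __init__(self, rows, conditions=None, log=None):
        self._rows = rows
        self._conditions = conditions or {}
        self._result_cache = None
        self.log = log if log is not None else []

    def filter(self, **kwargs):
        # Building a query only records conditions on a new object.
        return LazyQuery(self._rows, {**self._conditions, **kwargs}, self.log)

    def _fetch_all(self):
        if self._result_cache is None:
            self.log.append("executed")
            self._result_cache = [
                row for row in self._rows
                if all(row.get(key) == value for key, value in self._conditions.items())
            ]

    def __iter__(self):
        self._fetch_all()
        return iter(self._result_cache)


ROWS = [{"title": "Django", "pages": 300}, {"title": "Flask", "pages": 120}]

query = LazyQuery(ROWS).filter(title="Django")
executed_before = list(query.log)  # nothing has run yet
books = list(query)                # iteration triggers the fetch
executed_after = list(query.log)
```

Iterating the same object a second time reuses _result_cache, so the log still contains a single entry.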

2.3.3 django.db.models.query.ModelIterable
    def __getitem__(self, k):
        """Retrieve an item or slice from the set of results."""
        # ............. omitted ...............
        qs = self._chain()
        qs.query.set_limits(k, k + 1)
        qs._fetch_all()  # Important!
        return qs._result_cache[0]

    def _fetch_all(self):
        if self._result_cache is None:
            # Iterating here calls ModelIterable.__iter__
            self._result_cache = list(self._iterable_class(self))
        if self._prefetch_related_lookups and not self._prefetch_done:
            self._prefetch_related_objects()

    class ModelIterable(BaseIterable):
        """Iterable that yields a model instance for each row."""

        def __iter__(self):
            queryset = self.queryset
            db = queryset.db
            # Get the SQL compiler, ready to compile the SQL statement
            compiler = queryset.query.get_compiler(using=db)
            # Execute the query. This will also fill compiler.select, klass_info,
            # and annotations.
            # This is where the SQL is really executed and the results fetched
            results = compiler.execute_sql(chunked_fetch=self.chunked_fetch, chunk_size=self.chunk_size)
            select, klass_info, annotation_col_map = (compiler.select, compiler.klass_info,
                                                      compiler.annotation_col_map)
            # ............. omitted ...............
            for row in compiler.results_iter(results):
                obj = model_cls.from_db(db, init_list, row[model_fields_start:model_fields_end])
                # ............. omitted ...............
                yield obj

2.3.4 django/db/models/sql/compiler.py
    def execute_sql(self, result_type=MULTI, chunked_fetch=False, chunk_size=GET_ITERATOR_CHUNK_SIZE):

        # Run the query against the database and return the result(s):
        # a single row is returned directly, multiple rows are iterated
        result_type = result_type or NO_RESULTS
        try:
            # Get the SQL statement
            sql, params = self.as_sql()
            if not sql:
                raise EmptyResultSet
        except EmptyResultSet:
            if result_type == MULTI:
                return iter([])
            else:
                return
        # Get a cursor: either a regular one or a chunked cursor for reading results in chunks
        if chunked_fetch:
            cursor = self.connection.chunked_cursor()
        else:
            cursor = self.connection.cursor()
        try:
            # Execute the SQL statement
            cursor.execute(sql, params)
        except Exception:
            # Might fail for server-side cursors (e.g. connection closed)
            cursor.close()
            raise
        # Return the cursor itself or a single row
        if result_type == CURSOR:
            # Give the caller the cursor to process and close.
            return cursor
        if result_type == SINGLE:
            try:
                val = cursor.fetchone()
                if val:
                    return val[0:self.col_count]
                return val
            finally:
                # done with the cursor
                cursor.close()
        if result_type == NO_RESULTS:
            cursor.close()
            return
        # Return multiple rows
        result = cursor_iter(
            cursor, self.connection.features.empty_fetchmany_value,
            self.col_count if self.has_extra_select else None,
            chunk_size,
        )
        if not chunked_fetch and not self.connection.features.can_use_chunked_reads:
            try:
                # If we are using non-chunked reads, we return the same data
                # structure as normally, but ensure it is all read into memory
                # before going any further. Use chunked_fetch if requested.
                return list(result)
            finally:
                # done with the cursor
                cursor.close()
        return result

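This method gets the SQL statement and its parameters from self.as_sql(), obtains a cursor, executes the SQL, and then returns the appropriate form of result from the cursor according to the result_type argument.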
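
The result_type branching can be tried against the standard library's sqlite3 module. This is a sketch, not the compiler's code: the function takes the SQL as an argument instead of calling as_sql(), always reads all rows for the multi-row case, and the constant values are chosen for illustration.

```python
import sqlite3

# Illustrative values; only their distinctness matters here.
MULTI, SINGLE, CURSOR, NO_RESULTS = "multi", "single", "cursor", "no results"


def execute_sql(connection, sql, params=(), result_type=MULTI):
    cursor = connection.cursor()
    try:
        cursor.execute(sql, params)
    except Exception:
        cursor.close()
        raise
    if result_type == CURSOR:
        return cursor  # the caller processes and closes it
    if result_type == SINGLE:
        try:
            return cursor.fetchone()
        finally:
            cursor.close()
    if result_type == NO_RESULTS:
        cursor.close()
        return None
    try:
        return cursor.fetchall()  # MULTI: read every row into memory
    finally:
        cursor.close()


connection = sqlite3.connect(":memory:")
execute_sql(connection, "CREATE TABLE book (id INTEGER PRIMARY KEY, title TEXT)",
            result_type=NO_RESULTS)
execute_sql(connection, "INSERT INTO book (title) VALUES (?)", ("Django",), NO_RESULTS)
execute_sql(connection, "INSERT INTO book (title) VALUES (?)", ("Flask",), NO_RESULTS)
```

With this setup, a SINGLE query returns one row tuple or None, a MULTI query returns a list of row tuples, and NO_RESULTS suits statements such as INSERT where nothing is read back.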

    def as_sql(self, with_limits=True, with_col_aliases=False):
        """
        Create the SQL for this query. Return the SQL string and list of
        parameters.

        If 'with_limits' is False, any limit/offset information is not included
        in the query.
        """
        refcounts_before = self.query.alias_refcount.copy()
        try:
            extra_select, order_by, group_by = self.pre_sql_setup()
            for_update_part = None
            # Is a LIMIT/OFFSET clause needed?
            with_limit_offset = with_limits and (self.query.high_mark is not None or self.query.low_mark)
            combinator = self.query.combinator
            features = self.connection.features
            if combinator:
                if not getattr(features, 'supports_select_{}'.format(combinator)):
                    raise NotSupportedError('{} is not supported on this database backend.'.format(combinator))
                result, params = self.get_combinator_sql(combinator, self.query.combinator_all)
            else:
                distinct_fields, distinct_params = self.get_distinct()
                # This must come after 'select', 'ordering', and 'distinct'
                # (see docstring of get_from_clause() for details).
                from_, f_params = self.get_from_clause()
                where, w_params = self.compile(self.where) if self.where is not None else ("", [])
                having, h_params = self.compile(self.having) if self.having is not None else ("", [])
                result = ['SELECT']
                params = []

                if self.query.distinct:
                    distinct_result, distinct_params = self.connection.ops.distinct_sql(
                        distinct_fields,
                        distinct_params,
                    )
                    result += distinct_result
                    params += distinct_params

                out_cols = []
                col_idx = 1
                for _, (s_sql, s_params), alias in self.select + extra_select:
                    if alias:
                        s_sql = '%s AS %s' % (s_sql, self.connection.ops.quote_name(alias))
                    elif with_col_aliases:
                        s_sql = '%s AS %s' % (s_sql, 'Col%d' % col_idx)
                        col_idx += 1
                    params.extend(s_params)
                    out_cols.append(s_sql)

                result += [', '.join(out_cols), 'FROM', *from_]
                params.extend(f_params)

                # ............. omitted ...............

                if where:
                    result.append('WHERE %s' % where)
                    params.extend(w_params)

            # ............. omitted ...............
            # Join the parts into the SQL statement
            return ' '.join(result), tuple(params)
        finally:
            # Finally do cleanup - get rid of the joins we created above.
            self.query.reset_refcounts(refcounts_before)

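If we ignore the many details in this process, such as how the select, where, and order_by parts are obtained, how those parts are joined, and how parameters are validated, the implementation of as_sql comes down to this: store each part in a list in order, then join the list into one string with ' '.join. The parts include (but are not limited to):

  • the select part
  • distinct
  • the where expression
  • the group by expression
  • the having expression
  • whether to add limit or offset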
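
The "collect the parts in a list, then join" pattern is easy to show on its own. This is a minimal sketch with a hypothetical as_sql function: it supports only equality conditions, uses the ? placeholder style of sqlite3, and interpolates table and column names directly, so it is only suitable for trusted identifiers.

```python
import sqlite3


def as_sql(table, columns, where=None, distinct=False, limit=None):
    """Build (sql, params) by appending parts to a list and joining them."""
    result = ["SELECT"]
    params = []
    if distinct:
        result.append("DISTINCT")
    result += [", ".join('"%s"' % column for column in columns), "FROM", '"%s"' % table]
    if where:
        clauses = []
        for column, value in where.items():
            clauses.append('"%s" = ?' % column)
            params.append(value)  # values travel separately from the SQL text
        result.append("WHERE %s" % " AND ".join(clauses))
    if limit is not None:
        result.append("LIMIT %d" % limit)
    return " ".join(result), tuple(params)


sql, params = as_sql("book", ["id", "title"], where={"title": "Django"}, limit=1)
# sql    -> SELECT "id", "title" FROM "book" WHERE "title" = ? LIMIT 1
# params -> ('Django',)
```

Returning the SQL text and the parameters as a pair, as the real as_sql does, lets the database driver handle value quoting when cursor.execute(sql, params) is called.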
