import copy
import logging
import traceback
from django.db import models, transaction, router
from django.db.models.deletion import Collector
from django.db.models import sql
from django.db.models.sql.constants import CURSOR
from jcdp.settings import MULTI_WRITE_DB, DATABASES
multi_write_db = MULTI_WRITE_DB
# 重写QuerySet
class BaseQuerySet(models.QuerySet):
def create(self, **kwargs):
return super().create(**kwargs)
def update(self, **kwargs):
try:
rows = super().update(**kwargs)
if multi_write_db in DATABASES:
self._for_write = True
query = self.query.chain(sql.UpdateQuery)
query.add_update_values(kwargs)
with transaction.mark_for_rollback_on_error(using=multi_write_db):
query.get_compiler(multi_write_db).execute_sql(CURSOR)
except Exception:
logging.error(traceback.format_exc())
raise
return rows
def delete(self):
try:
deleted, _rows_count = super().delete()
if multi_write_db in DATABASES:
del_query = self._chain()
del_query._for_write = True
del_query.query.select_for_update = False
del_query.query.select_related = False
collector = Collector(using=multi_write_db)
collector.collect(del_query)
collector.delete()
except Exception:
logging.error(traceback.format_exc())
raise
return deleted, _rows_count
def raw(self, raw_query, params=None, translations=None, using=None):
try:
qs = super().raw(raw_query, params=params, translations=translations, using=using)
if multi_write_db in DATABASES:
super().raw(raw_query, params=params, translations=translations, using=multi_write_db)
except Exception:
logging.error(traceback.format_exc())
raise
return qs
def bulk_create(self, objs, batch_size=None, ignore_conflicts=False):
try:
for obj in objs:
obj.save()
except Exception:
logging.error(traceback.format_exc())
raise
# objs = super().bulk_create(objs, batch_size=batch_size, ignore_conflicts=ignore_conflicts)
# if multi_write_db in DATABASES:
# self._db = multi_write_db
# super().bulk_create(objs, batch_size=batch_size, ignore_conflicts=ignore_conflicts)
return objs
def bulk_update(self, objs, fields, batch_size=None):
try:
super().bulk_update(objs, fields, batch_size=batch_size)
if multi_write_db in DATABASES:
self._db = multi_write_db
super().bulk_update(objs, fields, batch_size=batch_size)
except Exception:
logging.error(traceback.format_exc())
raise
class BaseManager(models.Manager):
_queryset_class = BaseQuerySet
class BaseModel(models.Model):
objects = BaseManager()
class Meta:
abstract = True
def delete(
self, using=None, *args, **kwargs
):
try:
instance = copy.deepcopy(self)
super().delete(using=using, *args, **kwargs)
if multi_write_db in DATABASES:
super(BaseModel, instance).delete(using=multi_write_db, *args, **kwargs)
except Exception:
logging.error(traceback.format_exc())
raise
def save_base(self, raw=False, force_insert=False,
force_update=False, using=None, update_fields=None):
try:
using = using or router.db_for_write(self.__class__, instance=self)
assert not (force_insert and (force_update or update_fields))
assert update_fields is None or update_fields
cls = self.__class__
# Skip proxies, but keep the origin as the proxy model.
if cls._meta.proxy:
cls = cls._meta.concrete_model
meta = cls._meta
# A transaction isn't needed if one query is issued.
if meta.parents:
context_manager = transaction.atomic(using=using, savepoint=False)
else:
context_manager = transaction.mark_for_rollback_on_error(using=using)
with context_manager:
parent_inserted = False
if not raw:
parent_inserted = self._save_parents(cls, using, update_fields)
self._save_table(
raw, cls, force_insert or parent_inserted,
force_update, using, update_fields,
)
if multi_write_db in DATABASES:
super().save_base(raw=raw,
force_insert=raw,
force_update=force_update,
using=multi_write_db,
update_fields=update_fields)
# Store the database on which the object was saved
self._state.db = using
# Once saved, this is no longer a to-be-added instance.
self._state.adding = False
except Exception:
logging.error(traceback.format_exc())
raise
评论