import copyimport loggingimport tracebackfrom django.db import models, transaction, routerfrom django.db.models.deletion import Collectorfrom django.db.models import sqlfrom django.db.models.sql.constants import CURSORfrom jcdp.settings import MULTI_WRITE_DB, DATABASES
# Database alias that receives a second copy of each write. The classes in
# this module mirror a write only when this alias is a key of DATABASES.
multi_write_db = MULTI_WRITE_DB
# Override QuerySet so that write operations are repeated on the mirror database.
class BaseQuerySet(models.QuerySet):
    """QuerySet that repeats its write operations on ``multi_write_db``.

    The mirror step of each method runs only when ``multi_write_db`` is a key
    of ``DATABASES``. When any step raises, the traceback is logged with
    ``logging.error`` and the exception is re-raised unchanged.
    """

    def create(self, **kwargs):
        """Create an object by delegating to ``QuerySet.create``.

        No mirroring is done in this method itself.
        """
        return super().create(**kwargs)

    def update(self, **kwargs):
        """Apply ``kwargs`` as an UPDATE on this queryset's database, then on the mirror.

        Returns:
            The value returned by ``QuerySet.update`` for the primary
            database; the mirror's result is discarded.
        """
        try:
            rows = super().update(**kwargs)
            if multi_write_db in DATABASES:
                # Rebuild the same UPDATE query and compile it against the
                # mirror alias instead of self.db.
                self._for_write = True
                mirror_query = self.query.chain(sql.UpdateQuery)
                mirror_query.add_update_values(kwargs)
                with transaction.mark_for_rollback_on_error(using=multi_write_db):
                    mirror_query.get_compiler(multi_write_db).execute_sql(CURSOR)
        except Exception:
            logging.error(traceback.format_exc())
            raise
        return rows

    def delete(self):
        """Delete the matched rows on this queryset's database, then on the mirror.

        Returns:
            The ``(deleted, rows_count)`` pair returned by ``QuerySet.delete``
            for the primary database; the mirror's result is discarded.
        """
        try:
            deleted, _rows_count = super().delete()
            if multi_write_db in DATABASES:
                # Collect from a fresh chain of this queryset, using a
                # Collector bound to the mirror alias.
                del_query = self._chain()
                del_query._for_write = True
                del_query.query.select_for_update = False
                del_query.query.select_related = False
                collector = Collector(using=multi_write_db)
                collector.collect(del_query)
                collector.delete()
        except Exception:
            logging.error(traceback.format_exc())
            raise
        return deleted, _rows_count

    def raw(self, raw_query, params=None, translations=None, using=None):
        """Build a raw queryset for ``raw_query`` and return it.

        Returns:
            The object returned by ``QuerySet.raw`` for the requested
            ``using`` alias.
        """
        try:
            qs = super().raw(raw_query, params=params, translations=translations, using=using)
            if multi_write_db in DATABASES:
                # NOTE(review): the mirror raw queryset is built but never
                # iterated or returned, so this call presumably issues no SQL
                # on the mirror — confirm whether mirroring was intended here.
                super().raw(raw_query, params=params, translations=translations, using=multi_write_db)
        except Exception:
            logging.error(traceback.format_exc())
            raise
        return qs

    def bulk_create(self, objs, batch_size=None, ignore_conflicts=False):
        """Save every object in ``objs`` one by one and return them as a list.

        ``batch_size`` and ``ignore_conflicts`` are accepted for signature
        compatibility but are not used: each object goes through its own
        ``save()`` rather than a batched insert. An earlier variant that
        called ``super().bulk_create`` once per database was abandoned in
        favour of this.
        """
        try:
            # Materialise first so a one-shot iterable (e.g. a generator) is
            # not returned exhausted to the caller.
            objs = list(objs)
            for obj in objs:
                obj.save()
        except Exception:
            logging.error(traceback.format_exc())
            raise
        return objs

    def bulk_update(self, objs, fields, batch_size=None):
        """Run ``QuerySet.bulk_update`` on this queryset's database, then on the mirror.

        Returns:
            Whatever ``QuerySet.bulk_update`` returns for the primary
            database; the mirror's result is discarded.
        """
        try:
            # Both passes need the full sequence, so materialise a one-shot
            # iterable before the first pass consumes it.
            objs = tuple(objs)
            rows = super().bulk_update(objs, fields, batch_size=batch_size)
            if multi_write_db in DATABASES:
                primary_db = self._db
                self._db = multi_write_db
                try:
                    super().bulk_update(objs, fields, batch_size=batch_size)
                finally:
                    # Restore the alias so later operations on this queryset
                    # do not silently target the mirror database.
                    self._db = primary_db
        except Exception:
            logging.error(traceback.format_exc())
            raise
        return rows
class BaseManager(models.Manager):
    """Manager configured with ``BaseQuerySet`` as its queryset class."""

    # Presumably the class that Manager.get_queryset() instantiates, which
    # would route manager calls through BaseQuerySet — confirm against the
    # Django version in use, since this is a private attribute.
    _queryset_class = BaseQuerySet
class BaseModel(models.Model):
    """Abstract model whose ``save_base`` and ``delete`` also write to ``multi_write_db``.

    The mirror step runs only when ``multi_write_db`` is a key of
    ``DATABASES``. When any step raises, the traceback is logged with
    ``logging.error`` and the exception is re-raised unchanged.
    """

    objects = BaseManager()

    class Meta:
        abstract = True

    def delete(self, using=None, *args, **kwargs):
        """Delete this instance on ``using``, then on the mirror database.

        Args:
            using: Database alias forwarded to ``Model.delete``.
            *args: Extra positional arguments forwarded to ``Model.delete``.
            **kwargs: Extra keyword arguments forwarded to ``Model.delete``.

        Returns:
            Whatever ``Model.delete`` returns for the primary database; the
            mirror's result is discarded.
        """
        try:
            # Copy before the primary delete so the mirror delete works on an
            # instance the primary delete has not touched.
            mirror_instance = copy.deepcopy(self)
            # Forward ``using`` positionally: passing it by keyword ahead of
            # *args makes any extra positional argument collide with it.
            result = super().delete(using, *args, **kwargs)
            if multi_write_db in DATABASES:
                super(BaseModel, mirror_instance).delete(multi_write_db, *args, **kwargs)
        except Exception:
            logging.error(traceback.format_exc())
            raise
        return result

    def save_base(self, raw=False, force_insert=False, force_update=False,
                  using=None, update_fields=None):
        """Save this instance on the primary database, then on the mirror.

        The primary write is done here directly through ``_save_parents`` and
        ``_save_table``; the mirror write goes through ``Model.save_base``
        with ``using=multi_write_db``.

        NOTE(review): the primary write does not call ``Model.save_base``, so
        anything the stock implementation does beyond ``_save_parents`` /
        ``_save_table`` (for example signal dispatch) presumably happens only
        through the mirror call — confirm that this is intended.
        """
        try:
            using = using or router.db_for_write(self.__class__, instance=self)
            assert not (force_insert and (force_update or update_fields))
            assert update_fields is None or update_fields
            cls = self.__class__
            # Skip proxies, but keep the origin as the proxy model.
            if cls._meta.proxy:
                cls = cls._meta.concrete_model
            meta = cls._meta

            # A transaction isn't needed if one query is issued.
            if meta.parents:
                context_manager = transaction.atomic(using=using, savepoint=False)
            else:
                context_manager = transaction.mark_for_rollback_on_error(using=using)
            with context_manager:
                parent_inserted = False
                if not raw:
                    parent_inserted = self._save_parents(cls, using, update_fields)
                self._save_table(
                    raw, cls, force_insert or parent_inserted,
                    force_update, using, update_fields,
                )

            # NOTE(review): nesting was reconstructed from whitespace-collapsed
            # source; confirm the mirror save belongs after the ``with`` block.
            if multi_write_db in DATABASES:
                # NOTE(review): ``force_insert`` receives the value of ``raw``
                # rather than ``force_insert`` — confirm this is deliberate.
                super().save_base(raw=raw, force_insert=raw, force_update=force_update,
                                  using=multi_write_db, update_fields=update_fields)

            # Store the database on which the object was saved
            self._state.db = using
            # Once saved, this is no longer a to-be-added instance.
            self._state.adding = False
        except Exception:
            logging.error(traceback.format_exc())
            raise
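# 评论 ("Comments") — stray trailing text, apparently not part of the module; kept as a comment so it is not evaluated as a name.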