source

cx_Oracle:각 행을 사전으로 받으려면 어떻게 해야 합니까?

nicesource 2023. 6. 23. 22:17
반응형

cx_Oracle:각 행을 사전으로 받으려면 어떻게 해야 합니까?

기본적으로 cx_Oracle은 각 행을 튜플로 반환합니다.

>>> import cx_Oracle
>>> conn=cx_Oracle.connect('scott/tiger')
>>> curs=conn.cursor()
>>> curs.execute("select * from foo");
>>> curs.fetchone()
(33, 'blue')

각 행을 사전으로 반환하려면 어떻게 해야 합니까?

커서 위치를 재정의할 수 있습니다.rowfactory방법.쿼리를 수행할 때마다 이 작업을 수행해야 합니다.

다음은 표준 쿼리인 튜플의 결과입니다.

curs.execute('select * from foo')
curs.fetchone()
    (33, 'blue')

명명된 튜플 반환:

def makeNamedTupleFactory(cursor):
    columnNames = [d[0].lower() for d in cursor.description]
    import collections
    Row = collections.namedtuple('Row', columnNames)
    return Row

curs.rowfactory = makeNamedTupleFactory(curs)
curs.fetchone()
    Row(x=33, y='blue')

사전 반환:

def makeDictFactory(cursor):
    columnNames = [d[0] for d in cursor.description]
    def createRow(*args):
        return dict(zip(columnNames, args))
    return createRow

curs.rowfactory = makeDictFactory(curs)
curs.fetchone()
    {'Y': 'brown', 'X': 1}

Amoury Forgot d'Arc에 대한 크레딧: http://sourceforge.net/p/cx-oracle/mailman/message/27145597

매우 짧은 버전:

curs.rowfactory = lambda *args: dict(zip([d[0] for d in curs.description], args))

Python 3.7.0 & cx_Oracle 7.1.2에서 테스트됨

오래된 질문이지만 Python 레시피로 유용한 링크를 추가합니다.

에 따르면cx_Oracle설명서:

Cursor.rowfactory

이 읽기-쓰기 특성은 데이터베이스에서 검색되는 각 행에 대해 호출할 메서드를 지정합니다.일반적으로 각 행에 대해 튜플반환되지만 이 특성이 설정된 경우 메서드는 일반적으로 반환되는 튜플과 함께 호출되고 대신 메서드의 결과가 반환됩니다.

cx_Oracle - Python Interface for Oracle Database 또한 많은 유용한 샘플 예제를 위해 GitHub 저장소를 가리킵니다.GenericRowFactory.py 을 확인하십시오.

구글링:이 PPT는 CON6543 PythonOracle Database - RainFocus에서 더욱 유용할 수 있습니다.

요리법

후드 아래 Oracle용 장고 데이터베이스 백엔드에서는 cx_Oracle을 사용합니다.이전 버전(장고 1.11- )에서는 Cx_Oracle의 숫자 데이터 유형을 관련 Python 데이터로 캐스트하고 문자열을 유니코드로 캐스트했습니다.

장고를 설치한 경우 베이스를 확인하십시오.py는 다음과 같습니다.

$ DJANGO_DIR="$(python -c 'import django, os; print(os.path.dirname(django.__file__))')"
$ vim $DJANGO_DIR/db/backends/oracle/base.py

빌릴 수 있습니다_rowfactory()부터$DJANGO_DIR/db/backends/oracle/base.py그리고 데코레이터 아래에 적용할 수 있습니다.naming다시 돌아오게 하기 위해namedtuple단순한 것이 아니라tuple.

mybase.py

import functools
from itertools import izip, imap
from operator import itemgetter
from collections import namedtuple
import cx_Oracle as Database
import decimal

def naming(rename=False, case=None):
    def decorator(rowfactory):
        @functools.wraps(rowfactory)
        def decorated_rowfactory(cursor, row, typename="GenericRow"):
            field_names = imap(case, imap(itemgetter(0), cursor.description))
            return namedtuple(typename, field_names)._make(rowfactory(cursor, row))
        return decorated_rowfactory
    return decorator

다음과 같이 사용:

@naming(rename=False, case=str.lower)
def rowfactory(cursor, row):
   casted = []
   ....
   ....
   return tuple(casted)

oracle.py

import cx_Oracle as Database
from cx_Oracle import *
import mybase

class Cursor(Database.Cursor):

    def execute(self, statement, args=None):
        prepareNested = (statement is not None and self.statement != statement)
        result = super(self.__class__, self).execute(statement, args or [])
        if prepareNested:
            if self.description:
                self.rowfactory = lambda *row: mybase.rowfactory(self, row)
        return result

    def close(self):
        try:
            super(self.__class__, self).close()
        except Database.InterfaceError:
            "already closed"

class Connection(Database.Connection):

    def cursor(self):
        Cursor(self)

connect = Connection

이제 사용자 스크립트에서 cx_oracle 가져오기 대신 다음과 같이 오라클 가져오기를 수행합니다.

user.py

import oracle

dsn = oracle.makedsn('HOSTNAME', 1521, service_name='dev_server')
db = connect('username', 'password', dsn)
cursor = db.cursor()
cursor.execute("""
  SELECT 'Grijesh' as FirstName, 
         'Chauhan' as LastName,
         CAST('10560.254' AS NUMBER(10, 2)) as Salary
  FROM DUAL
""")
row = cursor.fetchone()
print ("First Name is %s" % row.firstname) # => Grijesh
print ("Last Name is %s" % row.lastname) # => Chauhan
print ("Salary is %r" % row.salary) # => Decimal('10560.25')

한번 해보세요!!

@maelcum73의 답변을 기반으로 구축:

curs.rowfactory = lambda *args: dict(zip([d[0] for d in curs.description], args))

이 솔루션의 문제는 실행할 때마다 이를 재설정해야 한다는 것입니다.

한 단계 더 나아가 다음과 같이 커서 개체 주위에 셸을 만들 수 있습니다.

    class dictcur(object):
        # need to monkeypatch the built-in execute function to always return a dict
        def __init__(self, cursor):
            self._original_cursor = cursor

        def execute(self, *args, **kwargs):
            # rowfactory needs to be set AFTER EACH execution!
            self._original_cursor.execute(*args, **kwargs)
            self._original_cursor.rowfactory = lambda *a: dict(
                zip([d[0] for d in self._original_cursor.description], a)
            )
            # cx_Oracle's cursor's execute method returns a cursor object
            # -> return the correct cursor in the monkeypatched version as well!
            return self._original_cursor

        def __getattr__(self, attr):
            # anything other than the execute method: just go straight to the cursor
            return getattr(self._original_cursor, attr)
    dict_cursor = dictcur(cursor=conn.cursor())

사용dict_cursor그 후 매번dict_cursor.execute()호출은 사전을 반환합니다.참고: 실행 메소드를 직접 monkeypatching 해보았지만, 내장 메소드이기 때문에 불가능했습니다.

언급URL : https://stackoverflow.com/questions/35045879/cx-oracle-how-can-i-receive-each-row-as-a-dictionary

반응형