cx_Oracle:각 행을 사전으로 받으려면 어떻게 해야 합니까?
기본적으로 cx_Oracle은 각 행을 튜플로 반환합니다.
>>> import cx_Oracle
>>> conn=cx_Oracle.connect('scott/tiger')
>>> curs=conn.cursor()
>>> curs.execute("select * from foo");
>>> curs.fetchone()
(33, 'blue')
각 행을 사전으로 반환하려면 어떻게 해야 합니까?
커서 위치를 재정의할 수 있습니다.rowfactory
방법.쿼리를 수행할 때마다 이 작업을 수행해야 합니다.
다음은 표준 쿼리인 튜플의 결과입니다.
curs.execute('select * from foo')
curs.fetchone()
(33, 'blue')
명명된 튜플 반환:
def makeNamedTupleFactory(cursor):
columnNames = [d[0].lower() for d in cursor.description]
import collections
Row = collections.namedtuple('Row', columnNames)
return Row
curs.rowfactory = makeNamedTupleFactory(curs)
curs.fetchone()
Row(x=33, y='blue')
사전 반환:
def makeDictFactory(cursor):
columnNames = [d[0] for d in cursor.description]
def createRow(*args):
return dict(zip(columnNames, args))
return createRow
curs.rowfactory = makeDictFactory(curs)
curs.fetchone()
{'Y': 'brown', 'X': 1}
Amoury Forgot d'Arc에 대한 크레딧: http://sourceforge.net/p/cx-oracle/mailman/message/27145597
매우 짧은 버전:
curs.rowfactory = lambda *args: dict(zip([d[0] for d in curs.description], args))
Python 3.7.0 & cx_Oracle 7.1.2에서 테스트됨
오래된 질문이지만 Python 레시피로 유용한 링크를 추가합니다.
에 따르면cx_Oracle
설명서:
Cursor.rowfactory
이 읽기-쓰기 특성은 데이터베이스에서 검색되는 각 행에 대해 호출할 메서드를 지정합니다.일반적으로 각 행에 대해 튜플이 반환되지만 이 특성이 설정된 경우 메서드는 일반적으로 반환되는 튜플과 함께 호출되고 대신 메서드의 결과가 반환됩니다.
cx_Oracle - Python Interface for Oracle Database 또한 많은 유용한 샘플 예제를 위해 GitHub 저장소를 가리킵니다.GenericRowFactory.py 을 확인하십시오.
구글링:이 PPT는 CON6543 Python 및 Oracle Database - RainFocus에서 더욱 유용할 수 있습니다.
요리법
후드 아래 Oracle용 장고 데이터베이스 백엔드에서는 cx_Oracle을 사용합니다.이전 버전(장고 1.11- )에서는 Cx_Oracle의 숫자 데이터 유형을 관련 Python 데이터로 캐스트하고 문자열을 유니코드로 캐스트했습니다.
장고를 설치한 경우 베이스를 확인하십시오.py는 다음과 같습니다.
$ DJANGO_DIR="$(python -c 'import django, os; print(os.path.dirname(django.__file__))')"
$ vim $DJANGO_DIR/db/backends/oracle/base.py
빌릴 수 있습니다_rowfactory()
부터$DJANGO_DIR/db/backends/oracle/base.py
그리고 데코레이터 아래에 적용할 수 있습니다.naming
다시 돌아오게 하기 위해namedtuple
단순한 것이 아니라tuple
.
mybase.py
import functools
from itertools import izip, imap
from operator import itemgetter
from collections import namedtuple
import cx_Oracle as Database
import decimal
def naming(rename=False, case=None):
def decorator(rowfactory):
@functools.wraps(rowfactory)
def decorated_rowfactory(cursor, row, typename="GenericRow"):
field_names = imap(case, imap(itemgetter(0), cursor.description))
return namedtuple(typename, field_names)._make(rowfactory(cursor, row))
return decorated_rowfactory
return decorator
다음과 같이 사용:
@naming(rename=False, case=str.lower)
def rowfactory(cursor, row):
casted = []
....
....
return tuple(casted)
oracle.py
import cx_Oracle as Database
from cx_Oracle import *
import mybase
class Cursor(Database.Cursor):
def execute(self, statement, args=None):
prepareNested = (statement is not None and self.statement != statement)
result = super(self.__class__, self).execute(statement, args or [])
if prepareNested:
if self.description:
self.rowfactory = lambda *row: mybase.rowfactory(self, row)
return result
def close(self):
try:
super(self.__class__, self).close()
except Database.InterfaceError:
"already closed"
class Connection(Database.Connection):
def cursor(self):
Cursor(self)
connect = Connection
이제 사용자 스크립트에서 cx_oracle 가져오기 대신 다음과 같이 오라클 가져오기를 수행합니다.
user.py
import oracle
dsn = oracle.makedsn('HOSTNAME', 1521, service_name='dev_server')
db = connect('username', 'password', dsn)
cursor = db.cursor()
cursor.execute("""
SELECT 'Grijesh' as FirstName,
'Chauhan' as LastName,
CAST('10560.254' AS NUMBER(10, 2)) as Salary
FROM DUAL
""")
row = cursor.fetchone()
print ("First Name is %s" % row.firstname) # => Grijesh
print ("Last Name is %s" % row.lastname) # => Chauhan
print ("Salary is %r" % row.salary) # => Decimal('10560.25')
한번 해보세요!!
@maelcum73의 답변을 기반으로 구축:
curs.rowfactory = lambda *args: dict(zip([d[0] for d in curs.description], args))
이 솔루션의 문제는 실행할 때마다 이를 재설정해야 한다는 것입니다.
한 단계 더 나아가 다음과 같이 커서 개체 주위에 셸을 만들 수 있습니다.
class dictcur(object):
# need to monkeypatch the built-in execute function to always return a dict
def __init__(self, cursor):
self._original_cursor = cursor
def execute(self, *args, **kwargs):
# rowfactory needs to be set AFTER EACH execution!
self._original_cursor.execute(*args, **kwargs)
self._original_cursor.rowfactory = lambda *a: dict(
zip([d[0] for d in self._original_cursor.description], a)
)
# cx_Oracle's cursor's execute method returns a cursor object
# -> return the correct cursor in the monkeypatched version as well!
return self._original_cursor
def __getattr__(self, attr):
# anything other than the execute method: just go straight to the cursor
return getattr(self._original_cursor, attr)
dict_cursor = dictcur(cursor=conn.cursor())
사용dict_cursor
그 후 매번dict_cursor.execute()
호출은 사전을 반환합니다.참고: 실행 메소드를 직접 monkeypatching 해보았지만, 내장 메소드이기 때문에 불가능했습니다.
언급URL : https://stackoverflow.com/questions/35045879/cx-oracle-how-can-i-receive-each-row-as-a-dictionary
'source' 카테고리의 다른 글
2개의 열을 기준으로 SQL 고유 제약 조건을 만들려면 어떻게 해야 합니까? (0) | 2023.06.23 |
---|---|
WiX에서 Oracle ODP.Net에 대한 레지스트리 키(값이 아님)가 있는지 테스트하려면 어떻게 해야 합니까? (0) | 2023.06.23 |
HttpWebRequest 클래스가 보내는 원시 HTTP 요청은 어떻게 볼 수 있습니까? (0) | 2023.06.23 |
안드로이드.os.노출된 파일 URI예외: 파일://storage/emulated/0/test.txt가 앱 이상에서 Intent.getData()를 통해 노출되었습니다. (0) | 2023.06.23 |
Woocommerce 속성별 검색 (0) | 2023.06.23 |