DB의 준비
여기서는 postgresql 을 우분투에 설치하였습니다. finance 데이터베이스을 만들었고 해당 데이터베이스를 읽고, 쓸 권한을 가지는 testuser 계정을 만들었습니다.
# (sql)
CREATE USER testuser WITH PASSWORD 'yourpassword';
\c finance
GRANT SELECT ON TABLE asset TO testuser;
\z asset
- testuser를 생성합니다.
- finance 데이터베이스에 접속합니다.
- asset 데이블에 대한 SELECT (조회) 권한을 testuser 에게 부여합니다.
- 권한이 제대로 부여되었는지 확인합니다.
파이썬에서 DB의 조회
파이썬의 psycopg2와 sqlalchemy 패키지를 이용하여 DB를 조회하여 데이터를 가져오는 코드를 작성합니다.
아래 코드는 sqlalchemy의 create_engine을 이용하여 DB 접속하는 방법을 보여줍니다. connection_string을 통해서 접속정보를 텍스트로 저장하고 create_engine()을 통해 DB를 연결합니다.
# (python)
import pandas as pd
from sqlalchemy import create_engine
db_user = "testuser"
db_password = 'yourpassword'
db_host = 'localhost'
db_port = your_port_number
db_name = 'finance'
connection_string = f'postgresql+psycopg2://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}'
engine = create_engine(connection_string)
다음 문자열로 sql 쿼리를 작성하고 pd.read_sql로 데이터를 읽습니다.
# (python)
query = f"""
SELECT *
FROM asset
"""
df = pd.read_sql(query, engine)
DataFrame을 Table에 Insert 하기
pandas DataFrame 을 table에 데이터를 쓰는 것은 먼저 csv 로 파일을 쓰고 그 파일을 SQL코드를 통해 insert 하는 방법을 사용할 수 있습니다. 여기서는 pandas DataFrame을 파일에 쓰지 않고 바로 table에 쓰는 방법을 알아보겠습니다.
finace 데이터베이스에 asset 테이블에 접근하여 데이터를 쓰겠습니다. 먼저 다음을 가정하겠습니다.
- 우리는 pandas DataFrame 인 df 를 가지고
있는데 여기에는 'title_id' 컬럼 'keyword' 컬럼이 있습니다.
- asset 테이블은 finance 데이터베이스에 이미 만들어져 있습니다.
'title_id'는 문자열이며 primary key 필드, 'keyword' 는 문자열 필드입니다.
아래 코드는 다음과 같은 방식으로 작동합니다.
- MetaData() 로 테이블, 열, 데이터 타입 등의
메타데이터를 관리하는 인스턴스 metadata 를 만듭니다.
- Table()로 테이블에 접근하는 인스턴스인 table 을 생성합니다.
- engine.connect()로 DB에 연결합니다. 연결 인스턴스는 conn 이라
하겠습니다.
- df의 행을 하나 불러와서 insert(table).values() 로
table insert 객체를 만듭니다.
- insert_stmt.on_conflict_do_nothing() 을 이용해서
key 필드에 대해 중복된 데이터를 무시하도록 설정합니다.
- conn.execute() 이용해서 insert를 실행합니다.
- 여러 개의 insert 가 실행된 후 commit()을 통해서
데이터베이스에 데이터쓰는 것을 완료합니다.
# (python)
from sqlalchemy import MetaData, Table
from sqlalchemy.dialects.postgresql import insert
metadata = MetaData()
table = Table('asset', metadata, autoload_with=engine)
with engine.connect() as conn:
trans = conn.begin()
try:
for index, row in df.iterrows():
insert_stmt = insert(table).values(
title=row['title_id'],
keyword=row['keyword'])
upsert_stmt = insert_stmt.on_conflict_do_nothing(
index_elements=['title_id']
)
conn.execute(upsert_stmt)
trans.commit()
except SQLAlchemyError as e:
trans.rollback()
print(f"오류가 발생했습니다: {e}")
engine.dispose()