-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
167 lines (146 loc) · 5.5 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
from typing import Optional
from fastapi import FastAPI, Depends, HTTPException, status, Header
from pydantic import BaseModel
from fastapi.middleware.cors import CORSMiddleware
from datetime import datetime, timedelta
from jose import JWTError, jwt
import os
import secrets
from dotenv import load_dotenv
app = FastAPI()
# CORS configuration
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:3000","http://127.0.0.1:3000"], # Your frontend URL
allow_credentials=True,
allow_methods=["*"], # Allow all HTTP methods
allow_headers=["*"], # Allow all headers
)
# Dummy user storage
fake_users_db = {
"[email protected]": {
"username": "[email protected]",
"password": "123456", # In reality, hash your passwords
"full_name": "Example User",
}
}
SECRET_KEY = os.getenv("SECRET_KEY", "your_super_secret_key")
ALGORITHM = os.getenv("ALGORITHM", "HS256")
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", 15))
REFRESH_TOKEN_EXPIRE_DAYS = int(os.getenv("REFRESH_TOKEN_EXPIRE_DAYS", 7))
class Token(BaseModel):
access_token: str
refresh_token: Optional[str] = None
token_type: str = "bearer"
class TokenData(BaseModel):
username: Optional[str] = None
class User(BaseModel):
username: str
full_name: str
class UserInDB(User):
password: str
class LoginRequest(BaseModel):
username: str
password: str
def create_access_token(data: dict):
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def create_refresh_token(data: dict):
to_encode = data.copy()
to_encode.update({"type": "refresh"}) # Specify this is a refresh token
expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def verify_access_token(authorization: str = Header(...)):
try:
# Extract the token from the "Bearer <token>" format
scheme, token = authorization.split()
if scheme.lower() != "bearer":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication scheme",
)
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return TokenData(username=payload.get("sub"))
except JWTError as e:
print(f"Access token verification error: {str(e)}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token",
)
except ValueError:
# This handles cases where Authorization header is not correctly formatted
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authorization header format",
)
def verify_refresh_token(authorization: str = Header(None)):
if authorization is None or not authorization.startswith("Bearer "):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authorization header"
)
token = authorization.split(" ")[1]
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
if payload.get("type") != "refresh": # Ensure it's a refresh token
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token type for refresh"
)
return TokenData(username=payload.get("sub"))
except JWTError as e:
print(f"Refresh token verification error: {str(e)}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid refresh token"
)
def authenticate_user(username: str, password: str):
user = fake_users_db.get(username)
if not user or user["password"] != password:
return False
return user
@app.post("/login", response_model=Token)
def login(login_req: LoginRequest):
user = authenticate_user(login_req.username, login_req.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid credentials",
)
access_token = create_access_token({"sub": user["username"]})
refresh_token = create_refresh_token({"sub": user["username"]})
return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer",
}
@app.post("/refresh", response_model=Token)
def refresh_token(token_data: TokenData = Depends(verify_refresh_token)):
if not token_data or not token_data.username:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid refresh token",
)
username = token_data.username
new_access_token = create_access_token({"sub": username})
return {
"access_token": new_access_token,
"refresh_token": None, # Optionally issue a new refresh token if needed
"token_type": "bearer",
}
@app.get("/protected")
def protected_route(token_data: TokenData = Depends(verify_access_token)):
if not token_data:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token",
)
return f"Hello, {token_data.username}!"
@app.get("/")
def root():
return "Hello World1234!"