# -*- coding: utf-8 -*-
# @Time : 2022-05-18 13:53
# @Author : GeTeShi
# @File : test6.py
import datetime
import time
import requests
import redis
import json
import re
import os
import sys
# Shared Redis client used by the cache helpers in this module.
rdb = redis.StrictRedis(
    host='localhost',
    port=6379,
    decode_responses=True,
)

# Name of the Redis hash the cache helpers operate on; they overwrite it
# whenever a caller passes an explicit ``name``.
NAME = "test"

# Seconds in 100 years of twelve 30-day months. Not referenced in the visible
# code -- presumably meant as a "never expires" timeout (TODO confirm).
forever = 100 * 12 * 30 * 24 * 60 * 60
def get_cache(key, name=None):
    """Read one field from a Redis hash, JSON-decoding it when possible.

    Args:
        key: Field name inside the hash.
        name: Hash to read from. When truthy it also replaces the module-level
            ``NAME``, so later calls that omit ``name`` keep using it.

    Returns:
        The JSON-decoded value, the raw stored value when it is not valid
        JSON, or ``False`` when the field does not exist.
    """
    global NAME
    if name:
        NAME = name
    # Address the effective hash (NAME) rather than ``name``, which may be
    # None. A single HGET also covers the "hash or field missing" cases.
    raw = rdb.hget(NAME, key)
    if raw is None:
        return False
    try:
        return json.loads(raw)
    except json.decoder.JSONDecodeError:
        # Not JSON: hand back the stored value untouched.
        return raw
def get_cache_all_key(name):
    """List the field names stored in a Redis hash.

    Args:
        name: Hash to inspect. When truthy it also replaces the module-level
            ``NAME``; when falsy the current ``NAME`` is inspected instead.

    Returns:
        The hash's field names, or ``None`` when the hash has no fields.
    """
    global NAME
    if name:
        NAME = name
    # Query the effective hash (NAME) so a falsy ``name`` is never sent to
    # Redis as the key.
    if rdb.hlen(NAME):
        return rdb.hkeys(name=NAME)
    return None
def set_cache(key, content, timeout=60, name=None):
    """Store one field in a Redis hash and refresh the hash's time-to-live.

    Args:
        key: Field name inside the hash.
        content: Value to store; a ``dict`` is serialized to JSON first.
        timeout: Time-to-live in seconds applied to the hash.
        name: Hash to write to. When truthy it also replaces the module-level
            ``NAME``, so later calls that omit ``name`` keep using it.
    """
    global NAME
    if name:
        NAME = name
    if isinstance(content, dict):
        content = json.dumps(content)
    rdb.hset(NAME, key, content)
    # The TTL has to target the hash itself: ``key`` is only a field name
    # inside NAME, not a top-level Redis key, so expiring ``key`` did nothing.
    # NOTE: this makes every field of the hash expire together.
    rdb.expire(NAME, timeout)
def del_cache(key, name=None):
    """Remove one field from a Redis hash.

    Args:
        key: Field name inside the hash.
        name: Hash to delete from. When truthy it also replaces the
            module-level ``NAME``; when omitted the current ``NAME`` is used.
    """
    global NAME
    if name:
        NAME = name
    # Operate on the effective hash (NAME); ``name`` defaults to None and must
    # not be sent to Redis as the key. HDEL on a missing field is a no-op, so
    # no separate existence check is needed.
    rdb.hdel(NAME, key)
def md5(s):
    """Return the hexadecimal MD5 digest of the UTF-8 encoding of ``s``."""
    import hashlib
    encoded = s.encode("utf-8")
    return hashlib.md5(encoded).hexdigest()
class image_upload(object):
    """Client for an image-hosting HTTP API that uses bearer-token auth.

    A token is requested from ``<URL>/tokens`` with the email/password pair
    and cached through ``set_cache``/``get_cache`` under the
    ``"upload-image"`` hash name.
    """

    def __init__(self, email, password):
        """Store the credentials, build the cache key and open a session.

        Args:
            email: Account email sent to the token endpoint.
            password: Account password sent to the token endpoint.
        """
        self.URL = "https://你的图床的api"
        self.header = {
            "Accept": "application/json",
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
                          "Chrome/108.0.0.0 Safari/537.36 Edg/108.0.1462.54 ",
        }
        self.email = email
        self.password = password
        # The %Y%m suffix makes the cache key change every calendar month.
        self.TOKEN_KEY = f"{self.email}_{md5(self.password)}_{datetime.datetime.strftime(datetime.datetime.now(), '%Y%m')}"
        self.sess = requests.Session()
        self.sess.get(url="https://你的图床的api")

    @staticmethod
    def _token_of(payload):
        """Return the token string inside a token payload, or None if absent."""
        if not isinstance(payload, dict):
            return None
        data = payload.get('data')
        if not isinstance(data, dict):
            return None
        return data.get('token') or None

    def get_requests(self, url, method, data=None, file=None):
        """Send one request through the session, logging in first if needed.

        Args:
            url: Absolute URL to call.
            method: ``"post"``, ``"get"`` or ``"upload"``; any other value
                sends a DELETE.
            data: Form fields (post/upload) or query parameters (get).
            file: ``files`` mapping for a multipart upload.

        Returns:
            The parsed JSON body when the status code is 200, otherwise the
            raw response object.
        """
        if not self.header.get('Authorization') and "tokens" not in url:
            # Take a snapshot, not an alias: login() rewrites self.header, and
            # the caller's headers (e.g. the Accept set by upload) have to be
            # restored afterwards. The snapshot carries no Authorization, so
            # the freshly set bearer token survives the update.
            saved = dict(self.header)
            self.login()
            self.header.update(saved)
        if method == "post":
            response = self.sess.post(url=url, data=data if data else "", headers=self.header)
        elif method == "get":
            response = self.sess.get(url=url, params=data if data else "", headers=self.header)
        elif method == "upload":
            response = self.sess.post(url=url, files=file, headers=self.header, data=data)
        else:
            response = self.sess.delete(url=url, headers=self.header)
        if response.status_code == 200:
            response.encoding = "utf-8"
            response = response.json()
        return response

    def get_token(self):
        """Request a new token with the stored credentials.

        Resets ``self.header`` to a bare JSON ``Accept`` header before calling
        the token endpoint.

        Returns:
            Whatever ``get_requests`` returned for the token request.
        """
        self.header = {"Accept": "application/json"}
        token = self.get_requests(url=self.URL + "/tokens", method="post", data={
            "email": self.email,
            "password": self.password
        })
        # Cache only payloads that really carry a token; error bodies and raw
        # response objects must not be stored.
        if self._token_of(token):
            set_cache(self.TOKEN_KEY, token, timeout=60 * 60 * 24, name="upload-image")
        return token

    def upload(self, file):
        """Upload one image file.

        Args:
            file: Path of the image on disk.

        Returns:
            The ``thumbnail_url`` link reported by the server.

        Raises:
            OSError: If ``file`` cannot be opened.
            Exception: If the server does not report a successful upload.
        """
        self.header.update({
            "Accept": "multipart/form-data"
        })
        # os.path copes with paths that have no backslash and with names that
        # contain several dots; only the last suffix is the file type.
        basename = os.path.basename(file)
        filetype = os.path.splitext(basename)[1].lstrip(".")
        mime = f"image/{filetype}" if filetype else "application/octet-stream"
        # Storage strategy id; defaults to 1.
        data = {
            "strategy_id": 1,
        }
        url = self.URL + "/upload"
        # The context manager closes the handle even when the request fails.
        with open(file, 'rb') as handle:
            files = {
                "file": (basename, handle, mime),
            }
            res = self.get_requests(url=url, method="upload", data=data, file=files)
        if isinstance(res, dict) and res.get("status"):
            data = res.get('data')
            print(data.get('links').get('url'))
            return data.get('links').get('thumbnail_url')
        # A non-200 reply comes back as a raw response object without .get().
        if isinstance(res, dict):
            reason = res.get('message')
        else:
            reason = f"HTTP {getattr(res, 'status_code', None)}"
        raise Exception(f"图片上传失败,失败原因:{reason}")

    def login(self):
        """Put a bearer token into ``self.header`` and call ``/profile``.

        The cached token is used when present; otherwise a new one is
        requested through ``get_token``.

        Raises:
            Exception: If no token could be obtained.
        """
        bearer = self._token_of(get_cache(self.TOKEN_KEY, name="upload-image"))
        if not bearer:
            bearer = self._token_of(self.get_token())
        if not bearer:
            raise Exception("登录失败,未能获取token")
        url = self.URL + "/profile"
        self.header.update({
            "Authorization": "Bearer " + bearer,
            "Accept": "application/json"
        })
        self.get_requests(url=url, data="", method="get")

    def main(self):
        """Placeholder; does nothing."""
        pass

    def clear(self):
        """Send DELETE to ``/tokens`` and forget the locally held token.

        Returns:
            A fixed confirmation string.
        """
        self.get_requests(url=self.URL + "/tokens", data="", method="delete")
        # Drop the cached copy too, so the next login requests a fresh token
        # instead of reusing the one that was just cleared.
        del_cache(self.TOKEN_KEY, name="upload-image")
        self.header = {}
        return "已清空tokens"

    def get_strategies(self):
        """Fetch the storage strategies, print them and return them."""
        res = self.get_requests(url=self.URL + "/strategies", method="get")
        print(res)
        return res
if __name__ == '__main__':
    # Every command-line argument is treated as the path of an image to upload.
    images = sys.argv[1:]
    if images:
        # One client for the whole batch: building a client per image opened a
        # new session and logged in again for every file.
        uploader = image_upload(email="你的用户名", password="你的密码")
        for image in images:
            uploader.upload(file=image)
# Copyright: geteshi
# Licensed under Attribution-NonCommercial-ShareAlike 4.0 International
# (CC BY-NC-SA 4.0).