兰空图床本地上传文件,配合Typora使用

geteshi
2024-01-30 / 0 评论 / 24 阅读 / 正在检测是否收录...
# -*- coding: utf-8 -*-
# @Time    : 2022-05-18 13:53
# @Author  : GeTeShi
# @File    : test6.py
import datetime
import time

import requests
import redis
import json
import re
import os
import sys


rdb = redis.StrictRedis(host='localhost', port=6379, decode_responses=True)
NAME = "test"
forever = 60 * 60 * 24 * 30 * 12 * 100


def get_cache(key, name=None):
    global NAME
    if name:
        NAME = name
    if rdb.hlen(name) and rdb.hexists(name, key):
        res = rdb.hget(NAME, key)
        try:
            res = json.loads(res)
        except json.decoder.JSONDecodeError:
            pass
        return res
    else:
        return False


def get_cache_all_key(name):
    global NAME
    if name:
        NAME = name
    if rdb.hlen(name):
        return rdb.hkeys(name=name)


def set_cache(key, content, timeout=60, name=None):
    global NAME
    if name:
        NAME = name
    if isinstance(content, dict):
        content = json.dumps(content)
    rdb.hset(NAME, key, content)
    rdb.expire(key, timeout)


def del_cache(key, name=None):
    global NAME
    if name:
        NAME = name
    if rdb.hexists(name=name, key=key):
        rdb.hdel(name, key)


def md5(s):
    import hashlib
    m2 = hashlib.md5()
    m2.update(s.encode("utf-8"))
    return m2.hexdigest()


class image_upload(object):

    def __init__(self, email, password):
        self.URL = "https://你的图床的api"
        self.header = {
            "Accept": "application/json",
            "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
                          "Chrome/108.0.0.0 Safari/537.36 Edg/108.0.1462.54 ",

        }
        self.email = email
        self.password = password
        self.TOKEN_KEY = f"{self.email}_{md5(self.password)}_{datetime.datetime.strftime(datetime.datetime.now(), '%Y%m')}"
        self.sess = requests.Session()
        self.sess.get(url="https://你的图床的api")

    def get_requests(self, url, method, data=None, file=None):
        if not self.header.get('Authorization') and "tokens" not in url:
            header = self.header
            self.login()
            self.header.update(header)
        if method == "post":
            response = self.sess.post(url=url, data=data if data else "", headers=self.header)
        elif method == "get":
            response = self.sess.get(url=url, params=data if data else "", headers=self.header)
        elif method == "upload":
            response = self.sess.post(url=url, files=file, headers=self.header, data=data)
        else:
            response = self.sess.delete(url=url, headers=self.header)
        if response.status_code == 200:
            response.encoding = "utf-8"
            response = response.json()
            return response

    def get_token(self):
        self.header = {"Accept": "application/json"}
        token = self.get_requests(url=self.URL + "/tokens", method="post", data={
            "email": self.email,
            "password": self.password
        })
        if token:
            set_cache(self.TOKEN_KEY, token, timeout=60 * 60 * 24, name="upload-image")
            return token

    def upload(self, file):
        self.header.update({
            "Accept": "multipart/form-data"
        })
        filename = file.rsplit("\\", 1)[1].split(".")[0]
        filetype = file.rsplit("\\", 1)[1].split(".")[1]
        files = {
            "file": (f"{filename}.{filetype}", open(file, 'rb'), f"image/{filetype}"),
        }
        # 存储策略id,默认1
        data = {
            "strategy_id": 1,
        }
        url = self.URL + "/upload"
        res = self.get_requests(url=url, method="upload", data=data, file=files)
        if res and res.get("status"):
            data = res.get('data')
            print(data.get('links').get('url'))
            return data.get('links').get('thumbnail_url')
        else:
            raise Exception(f"图片上传失败,失败原因:{res.get('message')}")

    def login(self):
        token = get_cache(self.TOKEN_KEY, name="upload-image")
        if (not token) or (not token.get('data') or not token.get('data').get('token')):
            token = self.get_token()
            # token = get_cache(self.TOKEN_KEY, name="upload-image")
        url = self.URL + "/profile"
        # print(token.get('data'))
        self.header.update({
            "Authorization": "Bearer " + token.get('data').get('token'),
            "Accept": "application/json"
        })
        self.get_requests(url=url, data="", method="get")

    def main(self):
        pass

    def clear(self):
        self.get_requests(url=self.URL + "/tokens", data="", method="delete")
        self.header = {}
        return "已清空tokens"

    def get_strategies(self):
        """获取存储策略"""
        res = self.get_requests(url=self.URL + "/strategies", method="get")
        print(res)


if __name__ == '__main__':
    images = sys.argv[1:]
    for image in images:
        image_upload(email="你的用户名", password="你的密码").upload(file=image)
0

评论 (0)

取消