cbc-watch/run.py

57 lines
2.1 KiB
Python
Raw Normal View History

2024-09-16 10:39:29 -07:00
import requests
from rocketry import Rocketry
from rocketry.conds import every
2024-09-16 10:44:32 -07:00
from sec import GITLAB_API_TOKEN, DISCORD_WEBHOOK_URL
2024-09-16 10:39:29 -07:00
# REST endpoint listing the project's open issues; the project path is
# URL-encoded ("/" written as "%2F") inside the API path.
GITLAB_API_URL = (
    'https://gitlab.com/api/v4/projects/'
    'codebreaker-challenge%2F2024%2Foutreach-2024'
    '/issues?state=opened'
)
# Web URL of the same project, used as the base for links to individual issues.
GITLAB_PROJECT_URL = (
    'https://gitlab.com/'
    'codebreaker-challenge/2024/outreach-2024'
)
# Request headers sent with every GitLab API call; the token authenticates us.
headers = {'Private-Token': GITLAB_API_TOKEN}
2024-09-16 10:44:32 -07:00
# Maps the id of each issue already announced to the updated_at value it had
# at that time, so an issue is re-announced only when updated_at changes.
reported_issues: dict = {}
2024-09-16 10:39:29 -07:00
# Function to send a message to Discord
def send_discord_notification(title, email, subject, message, issue_url):
    """Post a "New Issue" message to the Discord webhook.

    Args:
        title: Full issue title. It is not included in the message; the
            parameter is kept so existing callers keep working.
        email: Text shown after "Email:".
        subject: Text shown after "Subject:".
        message: Text shown after "Message:".
        issue_url: Link shown after "Link:".

    Raises:
        requests.RequestException: If the request cannot be completed
            (connection error, timeout). HTTP error statuses are only
            printed, not raised.
    """
    data = {
        "content": f"**New Issue**\n**Email:** {email}\n**Subject:** {subject}\n**Message:** {message}\n**Link:** {issue_url}"
    }
    # Without a timeout a stalled connection would block the scheduler forever.
    response = requests.post(DISCORD_WEBHOOK_URL, json=data, timeout=10)
    # Discord answers 204 by default and 200 when the webhook URL carries
    # "wait=true"; both mean the message was accepted.
    if response.status_code in (200, 204):
        print("Notification sent successfully.")
    else:
        print(f"Failed to send notification. Status code: {response.status_code}")
# Function to check for new issues
def check_for_new_issues():
    """Fetch the open GitLab issues and announce new or updated ones on Discord.

    An issue is announced when its id is not yet in ``reported_issues`` or when
    its ``updated_at`` differs from the recorded value. After the notification
    call returns, the current ``updated_at`` is stored so the issue is not
    announced again until it changes.

    Fetch failures and notification network errors are printed instead of
    raised, so one bad pass or one bad issue does not stop the others from
    being processed.
    """
    try:
        # Without a timeout a stalled connection would block the scheduler.
        response = requests.get(GITLAB_API_URL, headers=headers, timeout=10)
    except requests.RequestException as exc:
        print(f"Failed to fetch issues. Error: {exc}")
        return

    if response.status_code != 200:
        print(f"Failed to fetch issues. Status code: {response.status_code}")
        return

    # NOTE(review): only a single request is made, so if the API paginates
    # this listing only the first page is examined - confirm that is enough.
    for issue in response.json():
        issue_id = issue['id']
        updated_at = issue['updated_at']
        if issue_id in reported_issues and reported_issues[issue_id] == updated_at:
            continue  # already announced in its current state

        title = issue['title']
        # Titles are expected to look like "<email> - <subject>". A title
        # without the separator must not abort the whole pass, so fall back
        # to showing the full title as the subject.
        email, separator, subject = title.partition(' - ')
        if not separator:
            email, subject = '(unknown)', title

        # The description can be missing or null; treat that as empty text.
        description = issue.get('description') or ''
        message = description[:50] + '...' if len(description) > 50 else description
        issue_url = f"{GITLAB_PROJECT_URL}/-/issues/{issue['iid']}"

        try:
            send_discord_notification(title, email, subject, message, issue_url)
        except requests.RequestException as exc:
            # Leave the issue unrecorded so the next pass retries it.
            print(f"Failed to send notification. Error: {exc}")
            continue
        reported_issues[issue_id] = updated_at
# Create the Rocketry app that the scheduled task below is registered on
app = Rocketry()
# Schedule the task to run every 20 seconds
@app.task(every('20 seconds'))
def scheduled_task():
    """Run one polling pass by calling check_for_new_issues()."""
    check_for_new_issues()
# Run the Rocketry app only when this file is executed as a script
if __name__ == "__main__":
    app.run()